Coverage for mcp/tools/_long_task.py: 90%
130 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
1"""Shared async subprocess runner for long-running MCP tools.
3Streams progress through FastMCP's Progress dependency, emits a periodic
4heartbeat when the underlying process goes quiet, captures the tail of
5stderr for failure surfacing, and raises ``ToolError`` on non-zero exit
6with a structured error message so MCP clients can render failures
7properly instead of treating them as opaque success-shaped JSON blobs.
9In parallel with the MCP wire, every invocation also writes a JSON
10status file plus a raw log file under ``~/.gco/tasks/`` via
11``_task_status.TaskStatusWriter``. That gives operators (and other
12agents) an out-of-band view of what's happening even when the MCP
13client drops or buries the streamed notifications. The disk surface
14is read by the ``task_status`` / ``task_tail`` MCP tools and by
15``gco tasks list/tail``.
17Used by every long-running stack-lifecycle tool (deploy_stack, deploy_all,
18bootstrap_cdk, destroy_stack, destroy_all) and by images_build / images_push.
19"""
21from __future__ import annotations
23import asyncio
24import contextlib
25import json
26import re
27import signal
28import time
29from collections import deque
30from collections.abc import Sequence
31from typing import Any
33from fastmcp.exceptions import ToolError
35from tools._task_status import TaskStatusWriter, make_task_id
37# <pyflowchart-code-diagram> BEGIN - auto-inserted, do not edit
38# Flowchart(s) generated from this file:
39# * ``_run_long_task`` -> ``diagrams/code_diagrams/mcp/tools/_long_task._run_long_task.html``
40# (PNG: ``diagrams/code_diagrams/mcp/tools/_long_task._run_long_task.png``)
41# Regenerate with ``python diagrams/code_diagrams/generate.py``.
42# <pyflowchart-code-diagram> END
45_CFN_FAILED_RE = re.compile(r"(CREATE|UPDATE|DELETE)_FAILED")
46# Matches the CDK stack-progress prefix it prints in the form
47# `` ✅ gco-global``, `` ❌ gco-us-east-1``, or ``gco-us-east-1: deploying...``.
48_CDK_STACK_LINE_RE = re.compile(r"\b(gco-[a-z0-9-]+)\b")
49# Recognises CDK's per-stack done/fail signal: ``✅ gco-global`` (deploy)
50# or ``✅ gco-global: destroyed`` (destroy). One match per stack →
51# one progress increment so the client's percentage tracks stack
52# completion, not individual CFN resource events (which jump from 0
53# to dozens mid-stack and reset on the next stack).
54_CDK_STACK_DONE_RE = re.compile(r"[✅✨]\s+(gco-[a-z0-9-]+)\b")
55_CANCEL_GRACE_SECONDS = 10
56# How often to emit a heartbeat status when the subprocess is silent. CDK
57# can be quiet for minutes during EKS cluster creation or VPC tear-down
58# while CloudFormation churns away. Without a heartbeat, MCP clients
59# render the tool as wedged.
60_HEARTBEAT_INTERVAL_SECONDS = 30
61# How many tail lines of stderr to retain for the failure payload. CDK
62# failure summaries typically fit in 30-50 lines; we cap at 80 to avoid
63# blowing up the ToolError message size.
64_STDERR_TAIL_LINES = 80
65_PARTIAL_STATE_DISCLAIMER = (
66 "Partial CloudFormation state may remain — inspect via stack_status or the AWS console."
67)
70def _argv_has_traversal(argv: Sequence[str]) -> tuple[int, str] | None:
71 """Return (index, offending_value) for the first non-flag arg with a ``..``
72 segment, else None.
74 A non-flag element is one that does not start with ``-``. The check matches
75 the convention used by ``mcp.cli_runner._run_cli`` so the rejection shape
76 is consistent across short- and long-running tools.
77 """
78 for i, v in enumerate(argv):
79 if v.startswith("-"):
80 continue
81 if ".." in v.split("/") or ".." in v.split("\\"):
82 return (i, v[:100])
83 return None
def _tool_name_from_argv(argv: Sequence[str]) -> str:
    """Derive the human-readable tool name recorded in the task status file.

    ``gco stacks deploy-all`` → ``stacks_deploy_all`` (dashes in the verb
    become underscores so IDs are grep-friendly); ``gco status`` →
    ``status``. Anything not invoked as bare ``gco`` falls back to
    ``argv[0]``, and an empty argv falls back to ``"task"``.
    """
    if argv and argv[0] == "gco":
        if len(argv) >= 3:
            return f"{argv[1]}_{argv[2].replace('-', '_')}"
        if len(argv) == 2:
            return argv[1]
    # NOTE(review): argv[0] may be a full interpreter / finch path here;
    # confirm make_task_id sanitises path separators before using it in
    # a filename.
    return argv[0] if argv else "task"


async def _terminate_process(proc: asyncio.subprocess.Process) -> None:
    """Send SIGTERM to ``proc``, wait up to ``_CANCEL_GRACE_SECONDS``, then SIGKILL.

    Returns once the process has exited. ``ProcessLookupError`` is
    suppressed around each signal. The child can exit, and asyncio can close
    its transport, between the ``returncode`` check and ``send_signal``.
    """
    if proc.returncode is not None:
        return
    with contextlib.suppress(ProcessLookupError):
        proc.send_signal(signal.SIGTERM)
    try:
        await asyncio.wait_for(proc.wait(), timeout=_CANCEL_GRACE_SECONDS)
    except TimeoutError:
        if proc.returncode is None:
            with contextlib.suppress(ProcessLookupError):
                proc.send_signal(signal.SIGKILL)
        await proc.wait()


async def _run_long_task(
    argv: Sequence[str],
    *,
    ctx: Any,
    progress: Any,
    is_stack_op: bool = True,
    total_units: int | None = None,
) -> str:
    """Run a long-running subprocess with rich status reporting.

    Spawns ``argv`` via :func:`asyncio.create_subprocess_exec`, drains stdout
    and stderr concurrently, and streams progress through three channels:

    * ``progress.set_message(...)`` for every non-empty line (≤ 200 chars)
    * ``progress.increment()`` once per unique stack completion (CDK's
      ``✅ gco-…`` / ``✨ gco-…`` lines), so the progress counter tracks
      AWS-side stack milestones rather than per-resource CFN events
    * ``ctx.info(...)`` for stderr lines so MCP clients with a Context
      surface render them through the standard MCP logging channel

    Every line is also recorded to a disk-backed ``TaskStatusWriter`` so
    ``task_status`` / ``task_tail`` / ``gco tasks`` can observe the run.

    Reporting is best-effort, but draining is not. Each pipe is read to EOF
    even if a reporting call raises or a line exceeds the ``StreamReader``
    buffer limit (such lines are skipped). A drain that stopped early would
    leave its pipe unread; the child would block once the OS pipe buffer
    filled, and the run would never finish.

    When ``total_units`` is a positive int, it's forwarded to
    ``progress.set_total(total_units)`` once at startup so the client
    can render a proper percentage. Callers that know how many stacks
    they're processing (e.g. ``deploy_all`` knowing the cdk.json
    deployment_regions count) should pass it; callers that don't can
    omit it and the client falls back to indeterminate progress.

    A periodic heartbeat fires every ``_HEARTBEAT_INTERVAL_SECONDS`` when
    no output has been seen, so the client never sees a stalled progress
    state during quiet CDK phases (EKS cluster creation, VPC teardown).
    The heartbeat carries the elapsed wall-clock and the most recent
    stack name observed in the output, in the form
    ``"still running … 4m12s elapsed (last: gco-us-east-1)"``.

    On cancellation:

    * stops the heartbeat
    * sends ``SIGTERM`` and waits up to ``_CANCEL_GRACE_SECONDS`` for a
      graceful exit, escalating to ``SIGKILL`` on timeout
    * cancels the drain tasks and marks the status file ``cancelled``
    * re-raises ``CancelledError``

    When ``is_stack_op`` is True, the re-raised exception's message is a
    CloudFormation partial-state disclaimer so operators know AWS state may
    be inconsistent.

    Returns a JSON string on success:

    * ``{"status": "ok", "task_id": <str>, "stacks_completed": <n>,
      "duration_seconds": <s>, "last_stack": <str|null>}``

    Raises :class:`fastmcp.exceptions.ToolError` on non-zero exit with a
    JSON message containing:

    * ``error`` (``"exit_code=<rc>"``), ``exit_code`` and ``task_id``
    * ``stacks_completed`` and ``duration_seconds``
    * ``last_stack``: the last stack name parsed from output
    * ``failed_events``: up to 10 ``*_FAILED`` lines
    * ``stderr_tail``: up to ``_STDERR_TAIL_LINES`` lines
    * ``disclaimer``: the partial-state disclaimer (when ``is_stack_op=True``)

    The structured payload lives in the ToolError ``args[0]`` as a JSON
    string so MCP clients can deserialize and render it; FastMCP forwards
    the message to the client's ``CallToolResult.content`` automatically.

    Path traversal in a non-flag argument does not raise. Instead it returns
    ``{"error": "path_traversal_detected", "argv_index": ..., "value": ...}``
    without spawning a subprocess.
    """
    hit = _argv_has_traversal(argv)
    if hit is not None:
        idx, val = hit
        return json.dumps({"error": "path_traversal_detected", "argv_index": idx, "value": val})

    started = time.monotonic()
    if total_units is not None and total_units > 0:
        # set_total may not be implemented by every Progress impl on
        # every fastmcp version — best-effort.
        with contextlib.suppress(AttributeError, NotImplementedError):
            await progress.set_total(int(total_units))

    # ``asyncio.create_subprocess_exec`` itself can be cancelled mid-call.
    # For a stack op, surface the same partial-state disclaimer the
    # post-creation cancel path emits: AWS state may already be in flight
    # even if our local subprocess never got a PID. Without this guard a
    # fast cancellation lands as a bare ``CancelledError``, and the operator
    # sees no warning that CloudFormation may need inspection.
    try:
        proc = await asyncio.create_subprocess_exec(
            *argv,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
    except asyncio.CancelledError:
        if is_stack_op:
            raise asyncio.CancelledError(_PARTIAL_STATE_DISCLAIMER) from None
        raise

    # Disk-backed status: parallel observability channel that doesn't
    # depend on the MCP client surfacing notifications. Reads land in
    # the task_status / task_tail tools and in ``gco tasks``.
    tool_name = _tool_name_from_argv(argv)
    task_id = make_task_id(tool_name)
    status_writer = TaskStatusWriter(
        task_id=task_id,
        tool=tool_name,
        argv=list(argv),
        pid=proc.pid,
        total_units=total_units,
    )

    # Only this many ``*_FAILED`` lines are reported, so stop collecting there.
    max_failed_events = 10
    stacks_completed = 0
    failed_lines: list[str] = []
    stderr_tail: deque[str] = deque(maxlen=_STDERR_TAIL_LINES)
    last_activity = time.monotonic()
    last_stack: str | None = None
    completed_stacks_set: set[str] = set()

    async def _handle_line(line: str, label: str) -> None:
        """Fan one non-empty output line out to every reporting channel."""
        nonlocal stacks_completed, last_activity, last_stack
        last_activity = time.monotonic()
        await progress.set_message(line[:200])
        status_writer.record_line(line, stream=label)
        stack_done = _CDK_STACK_DONE_RE.search(line)
        if stack_done is not None:
            # Increment once per unique stack — CDK can echo the done
            # line on stdout and stderr, and double-counting would
            # mis-report progress.
            name = stack_done.group(1)
            if name not in completed_stacks_set:
                completed_stacks_set.add(name)
                stacks_completed += 1
                await progress.increment()
                status_writer.increment_stacks(name)
        if len(failed_lines) < max_failed_events and _CFN_FAILED_RE.search(line):
            failed_lines.append(line[:200])
        stack_match = _CDK_STACK_LINE_RE.search(line)
        if stack_match:
            last_stack = stack_match.group(1)
            status_writer.set_last_stack(last_stack)
        if label == "stderr":
            stderr_tail.append(line)
            await ctx.info(f"stderr: {line[:200]}")

    async def _drain(stream: asyncio.StreamReader | None, label: str) -> None:
        """Consume ``stream`` to EOF, reporting each line best-effort."""
        assert stream is not None  # PIPE is set above for both streams
        report_error: Exception | None = None
        while True:
            try:
                raw = await stream.readline()
            except ValueError:
                # The line exceeded the StreamReader limit. asyncio has
                # already discarded the oversized data, so the next read
                # makes progress. Skip it rather than abandon the pipe.
                continue
            if not raw:
                break  # EOF
            if report_error is not None:
                # Reporting is broken. Keep the pipe flowing so the child
                # never blocks on a full buffer.
                continue
            line = raw.decode("utf-8", errors="replace").rstrip()
            if not line:
                continue
            try:
                await _handle_line(line, label)
            except Exception as exc:  # reporting is best-effort
                report_error = exc
        if report_error is not None:
            # Attach the reporting failure to this task. The
            # ``gather(..., return_exceptions=True)`` after ``proc.wait()``
            # collects it without masking the exit code.
            raise report_error

    async def _heartbeat() -> None:
        """Emit a periodic 'still running' status when the subprocess is quiet.

        The heartbeat does NOT call ``progress.increment()`` — that's
        reserved for AWS-side milestones. It only updates the message
        so the client sees activity, and emits a parallel ``ctx.info``
        so clients without a Progress observer still get the signal.
        """
        while True:
            await asyncio.sleep(_HEARTBEAT_INTERVAL_SECONDS)
            silent_for = time.monotonic() - last_activity
            if silent_for < _HEARTBEAT_INTERVAL_SECONDS:
                continue
            elapsed = int(time.monotonic() - started)
            stack_part = f" (last: {last_stack})" if last_stack else ""
            msg = f"still running … {_format_duration(elapsed)} elapsed{stack_part}"
            await progress.set_message(msg)
            await ctx.info(msg)

    drains = [
        asyncio.create_task(_drain(proc.stdout, "stdout")),
        asyncio.create_task(_drain(proc.stderr, "stderr")),
    ]
    heartbeat = asyncio.create_task(_heartbeat())

    try:
        rc = await proc.wait()
        await asyncio.gather(*drains, return_exceptions=True)
    except asyncio.CancelledError:
        # Silence "still running" messages before the SIGTERM grace
        # period, which can last up to _CANCEL_GRACE_SECONDS.
        heartbeat.cancel()
        try:
            # Drains stay alive during shutdown so the child never blocks
            # writing to a full pipe while exiting gracefully.
            await _terminate_process(proc)
        finally:
            # Runs even if termination itself is interrupted, so the status
            # file never stays "running" for a cancelled task.
            for d in drains:
                d.cancel()
            status_writer.finish(
                state="cancelled",
                exit_code=proc.returncode,
                error=_PARTIAL_STATE_DISCLAIMER if is_stack_op else "cancelled",
            )
        if is_stack_op:
            raise asyncio.CancelledError(_PARTIAL_STATE_DISCLAIMER) from None
        raise
    finally:
        heartbeat.cancel()

    duration = int(time.monotonic() - started)

    if rc != 0:
        # Compose a structured failure payload so the client sees the
        # full error context (which stack failed, the stderr tail, the
        # partial-state disclaimer) rather than an opaque exit-code stub.
        payload: dict[str, Any] = {
            "error": f"exit_code={rc}",
            "exit_code": rc,
            "task_id": task_id,
            "stacks_completed": stacks_completed,
            "duration_seconds": duration,
            "last_stack": last_stack,
            "failed_events": list(failed_lines),
            "stderr_tail": list(stderr_tail),
        }
        if is_stack_op:
            payload["disclaimer"] = _PARTIAL_STATE_DISCLAIMER
        status_writer.finish(state="failed", exit_code=rc, error=f"exit_code={rc}")
        # ToolError surfaces the payload to the client as a tool-level
        # failure (CallToolResult.is_error=True) rather than as
        # success-shaped data.
        raise ToolError(json.dumps(payload))

    status_writer.finish(state="succeeded", exit_code=rc)
    return json.dumps(
        {
            "status": "ok",
            "task_id": task_id,
            "stacks_completed": stacks_completed,
            "duration_seconds": duration,
            "last_stack": last_stack,
        }
    )
330def _format_duration(seconds: int) -> str:
331 """Render an integer second count as ``HhMmSs`` / ``MmSs`` / ``Ss``."""
332 if seconds < 60:
333 return f"{seconds}s"
334 minutes, sec = divmod(seconds, 60)
335 if minutes < 60:
336 return f"{minutes}m{sec:02d}s"
337 hours, mins = divmod(minutes, 60)
338 return f"{hours}h{mins:02d}m{sec:02d}s"