Coverage for mcp/tools/_long_task.py: 90%

130 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-15 15:07 +0000

1"""Shared async subprocess runner for long-running MCP tools. 

2 

3Streams progress through FastMCP's Progress dependency, emits a periodic 

4heartbeat when the underlying process goes quiet, captures the tail of 

5stderr for failure surfacing, and raises ``ToolError`` on non-zero exit 

6with a structured error message so MCP clients can render failures 

7properly instead of treating them as opaque success-shaped JSON blobs. 

8 

9In parallel with the MCP wire, every invocation also writes a JSON 

10status file plus a raw log file under ``~/.gco/tasks/`` via 

11``_task_status.TaskStatusWriter``. That gives operators (and other 

12agents) an out-of-band view of what's happening even when the MCP 

13client drops or buries the streamed notifications. The disk surface 

14is read by the ``task_status`` / ``task_tail`` MCP tools and by 

15``gco tasks list/tail``. 

16 

17Used by every long-running stack-lifecycle tool (deploy_stack, deploy_all, 

18bootstrap_cdk, destroy_stack, destroy_all) and by images_build / images_push. 

19""" 

20 

21from __future__ import annotations 

22 

23import asyncio 

24import contextlib 

25import json 

26import re 

27import signal 

28import time 

29from collections import deque 

30from collections.abc import Sequence 

31from typing import Any 

32 

33from fastmcp.exceptions import ToolError 

34 

35from tools._task_status import TaskStatusWriter, make_task_id 

36 

37# <pyflowchart-code-diagram> BEGIN - auto-inserted, do not edit 

38# Flowchart(s) generated from this file: 

39# * ``_run_long_task`` -> ``diagrams/code_diagrams/mcp/tools/_long_task._run_long_task.html`` 

40# (PNG: ``diagrams/code_diagrams/mcp/tools/_long_task._run_long_task.png``) 

41# Regenerate with ``python diagrams/code_diagrams/generate.py``. 

42# <pyflowchart-code-diagram> END 

43 

44 

# Detects a CloudFormation failure status (``CREATE_FAILED``,
# ``UPDATE_FAILED`` or ``DELETE_FAILED``) anywhere in an output line.
_CFN_FAILED_RE = re.compile(r"(CREATE|UPDATE|DELETE)_FAILED")
# Matches a ``gco-…`` stack name anywhere in a line. CDK prints the name
# in forms such as `` ✅ gco-global``, `` ❌ gco-us-east-1``, or
# ``gco-us-east-1: deploying...``.
_CDK_STACK_LINE_RE = re.compile(r"\b(gco-[a-z0-9-]+)\b")
# Recognises CDK's per-stack completion signal: ``✅ gco-global`` (deploy)
# or ``✅ gco-global: destroyed`` (destroy). A leading ``✨`` is accepted
# as well; a ``❌`` line is deliberately NOT matched. One match per stack →
# one progress increment so the client's percentage tracks stack
# completion, not individual CFN resource events (which jump from 0
# to dozens mid-stack and reset on the next stack).
_CDK_STACK_DONE_RE = re.compile(r"[✅✨]\s+(gco-[a-z0-9-]+)\b")
# Seconds to wait after SIGTERM before escalating to SIGKILL when a run
# is cancelled.
_CANCEL_GRACE_SECONDS = 10
# How often to emit a heartbeat status when the subprocess is silent. CDK
# can be quiet for minutes during EKS cluster creation or VPC tear-down
# while CloudFormation churns away. Without a heartbeat, MCP clients
# render the tool as wedged.
_HEARTBEAT_INTERVAL_SECONDS = 30
# How many tail lines of stderr to retain for the failure payload. CDK
# failure summaries typically fit in 30-50 lines; we cap at 80 to avoid
# blowing up the ToolError message size.
_STDERR_TAIL_LINES = 80
# Warning attached to failed or cancelled stack operations, where AWS-side
# state may not match what the local run observed.
_PARTIAL_STATE_DISCLAIMER = (
    "Partial CloudFormation state may remain — inspect via stack_status or the AWS console."
)

68 

69 

70def _argv_has_traversal(argv: Sequence[str]) -> tuple[int, str] | None: 

71 """Return (index, offending_value) for the first non-flag arg with a ``..`` 

72 segment, else None. 

73 

74 A non-flag element is one that does not start with ``-``. The check matches 

75 the convention used by ``mcp.cli_runner._run_cli`` so the rejection shape 

76 is consistent across short- and long-running tools. 

77 """ 

78 for i, v in enumerate(argv): 

79 if v.startswith("-"): 

80 continue 

81 if ".." in v.split("/") or ".." in v.split("\\"): 

82 return (i, v[:100]) 

83 return None 

84 

85 

async def _run_long_task(
    argv: Sequence[str],
    *,
    ctx: Any,
    progress: Any,
    is_stack_op: bool = True,
    total_units: int | None = None,
) -> str:
    """Run a long-running subprocess with rich status reporting.

    Spawns ``argv`` via :func:`asyncio.create_subprocess_exec`, drains stdout
    and stderr concurrently, and streams progress through three channels:

    * ``progress.set_message(...)`` for every meaningful line (≤ 200 chars)
    * ``progress.increment()`` once per stack completion (CDK's ``✅ gco-…``
      / ``✨ gco-…`` lines), so the progress counter tracks AWS-side stack
      milestones rather than per-resource CFN events
    * ``ctx.info(...)`` for stderr lines so MCP clients with a Context
      surface render them through the standard MCP logging channel

    These client-facing notifications are best-effort: if one of them
    raises, the failure is recorded and draining continues. A drain task
    that dies stops reading its pipe, which both silences the disk-backed
    status channel and can block the child forever once the OS pipe buffer
    fills. The first few failures are reported under the optional
    ``notification_errors`` key of the result / error payload.

    When ``total_units`` is supplied, it's forwarded to
    ``progress.set_total(total_units)`` once at startup so the client
    can render a proper percentage. Callers that know how many stacks
    they're processing (e.g. ``deploy_all`` knowing the cdk.json
    deployment_regions count) should pass it; callers that don't can
    omit it and the client falls back to indeterminate progress.

    A periodic heartbeat fires every ``_HEARTBEAT_INTERVAL_SECONDS`` when
    no output has been seen, so the client never sees a stalled progress
    state during quiet CDK phases (EKS cluster creation, VPC teardown).
    The heartbeat carries the elapsed wall-clock and the most recent
    stack name observed in the output, in the form
    ``"still running … 4m12s elapsed (last: gco-us-east-1)"``.

    On cancellation, sends ``SIGTERM``, waits up to ``_CANCEL_GRACE_SECONDS``
    for graceful shutdown, sends ``SIGKILL`` on timeout, cancels the drain
    tasks, and re-raises ``CancelledError``. When ``is_stack_op`` is True,
    the re-raised exception's message includes a CloudFormation
    partial-state disclaimer so operators know AWS state may be inconsistent.

    Args:
        argv: Command line to execute; ``argv[0]`` is the program.
        ctx: MCP context object; only its awaitable ``info(msg)`` is used.
        progress: Progress reporter; ``set_message``, ``increment`` and
            (optionally) ``set_total`` are awaited on it.
        is_stack_op: Whether the command mutates CloudFormation stacks;
            controls whether the partial-state disclaimer is attached to
            failures and cancellations.
        total_units: Expected number of stack completions, if known.

    Returns:
        A JSON string on success:

        * ``{"status": "ok", "task_id": <str>, "stacks_completed": <n>,
          "duration_seconds": <s>, "last_stack": <str|null>}``, plus
          ``"notification_errors": [...]`` only when a client notification
          failed during the run.

        When an argument fails the path-traversal check, no subprocess is
        spawned and the JSON stub ``{"error": "path_traversal_detected",
        "argv_index": <i>, "value": <str>}`` is returned instead.

    Raises:
        fastmcp.exceptions.ToolError: On non-zero exit. ``args[0]`` is a JSON
            string carrying the exit code, the task id, the number of stacks
            completed before failure, the last stack seen in the output, up
            to 10 ``*_FAILED`` lines, the tail of stderr (up to
            ``_STDERR_TAIL_LINES`` lines), and the partial-state disclaimer
            (when ``is_stack_op=True``).
        asyncio.CancelledError: When the surrounding task is cancelled.
    """
    hit = _argv_has_traversal(argv)
    if hit is not None:
        idx, val = hit
        return json.dumps({"error": "path_traversal_detected", "argv_index": idx, "value": val})

    started = time.monotonic()
    if total_units is not None and total_units > 0:
        # set_total may not be implemented by every Progress impl on
        # every fastmcp version — best-effort.
        with contextlib.suppress(AttributeError, NotImplementedError):
            await progress.set_total(int(total_units))

    # ``asyncio.create_subprocess_exec`` itself can be cancelled mid-
    # call (the inner ``_make_subprocess_transport`` awaits a waiter).
    # When that happens for a stack-op invocation, surface the same
    # partial-state disclaimer the post-creation cancel path emits —
    # AWS state may already be in flight even if our local subprocess
    # never got a PID. Without this guard a fast cancellation lands as
    # a bare ``CancelledError`` and the operator sees no warning that
    # CloudFormation may need inspection.
    try:
        proc = await asyncio.create_subprocess_exec(
            *argv,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
    except asyncio.CancelledError:
        if is_stack_op:
            raise asyncio.CancelledError(_PARTIAL_STATE_DISCLAIMER) from None
        raise

    # Disk-backed status: parallel observability channel that doesn't
    # depend on the MCP client surfacing notifications. Reads land in
    # the task_status / task_tail tools and in ``gco tasks``.
    #
    # Derive a human-readable tool name from the argv. ``gco stacks
    # deploy-all`` → ``stacks_deploy_all``; the dash is normalized to
    # an underscore so the IDs are filesystem-safe and grep-friendly.
    # Plain ``argv[0]`` (the bare interpreter or finch path) is the
    # fallback for non-gco invocations like ``images_build`` which
    # may shell out directly.
    if len(argv) >= 3 and argv[0] == "gco":
        tool_name = f"{argv[1]}_{argv[2].replace('-', '_')}"
    elif len(argv) >= 2 and argv[0] == "gco":
        tool_name = argv[1]
    else:
        tool_name = argv[0] if argv else "task"
    task_id = make_task_id(tool_name)
    status_writer = TaskStatusWriter(
        task_id=task_id,
        tool=tool_name,
        argv=list(argv),
        pid=proc.pid,
        total_units=total_units,
    )

    stacks_completed = 0
    failed_lines: list[str] = []
    stderr_tail: deque[str] = deque(maxlen=_STDERR_TAIL_LINES)
    last_activity = time.monotonic()
    last_stack: str | None = None
    completed_stacks_set: set[str] = set()
    # First few client-notification failures, surfaced in the final payload
    # so they are not lost. Capped so a permanently broken client channel
    # cannot bloat the payload.
    notify_errors: list[str] = []
    max_notify_errors = 5

    async def _notify(send: Any, *args: Any) -> None:
        """Await ``send(*args)``; record instead of raise on failure.

        Cancellation is a ``BaseException`` and therefore still propagates.
        """
        try:
            await send(*args)
        except Exception as exc:  # noqa: BLE001 - the client channel is best-effort
            if len(notify_errors) < max_notify_errors:
                notify_errors.append(f"{type(exc).__name__}: {exc}"[:200])

    async def _drain(stream: asyncio.StreamReader | None, label: str) -> None:
        """Consume one pipe line by line, updating counters and all channels."""
        nonlocal stacks_completed, last_activity, last_stack
        assert stream is not None  # PIPE is set above for both streams
        async for raw in stream:
            line = raw.decode("utf-8", errors="replace").rstrip()
            if not line:
                continue
            last_activity = time.monotonic()
            await _notify(progress.set_message, line[:200])
            status_writer.record_line(line, stream=label)
            stack_done = _CDK_STACK_DONE_RE.search(line)
            if stack_done is not None:
                # Increment once per unique stack — CDK can echo the
                # done line on stdout and stderr, double-counting
                # would mis-report progress.
                name = stack_done.group(1)
                if name not in completed_stacks_set:
                    completed_stacks_set.add(name)
                    stacks_completed += 1
                    await _notify(progress.increment)
                    status_writer.increment_stacks(name)
            if _CFN_FAILED_RE.search(line):
                failed_lines.append(line[:200])
            stack_match = _CDK_STACK_LINE_RE.search(line)
            if stack_match:
                last_stack = stack_match.group(1)
                status_writer.set_last_stack(last_stack)
            if label == "stderr":
                stderr_tail.append(line)
                await _notify(ctx.info, f"stderr: {line[:200]}")

    async def _heartbeat() -> None:
        """Emit a periodic 'still running' status when the subprocess is quiet.

        The heartbeat does NOT call ``progress.increment()`` — that's
        reserved for AWS-side milestones. It only updates the message
        so the client sees activity, and emits a parallel ``ctx.info``
        so clients without a Progress observer still get the signal.
        Each channel is notified independently, so a failure on one
        does not stop the other or end the heartbeat loop.
        """
        while True:
            await asyncio.sleep(_HEARTBEAT_INTERVAL_SECONDS)
            silent_for = time.monotonic() - last_activity
            if silent_for < _HEARTBEAT_INTERVAL_SECONDS:
                continue
            elapsed = int(time.monotonic() - started)
            stack_part = f" (last: {last_stack})" if last_stack else ""
            msg = f"still running … {_format_duration(elapsed)} elapsed{stack_part}"
            await _notify(progress.set_message, msg)
            await _notify(ctx.info, msg)

    drains = [
        asyncio.create_task(_drain(proc.stdout, "stdout")),
        asyncio.create_task(_drain(proc.stderr, "stderr")),
    ]
    heartbeat = asyncio.create_task(_heartbeat())

    try:
        rc = await proc.wait()
        # Let both drains reach EOF so the counters and stderr tail are
        # complete before the result is composed.
        await asyncio.gather(*drains, return_exceptions=True)
    except asyncio.CancelledError:
        if proc.returncode is None:
            # Graceful first: give the child a chance to clean up, then
            # escalate to SIGKILL if it outlives the grace period.
            proc.send_signal(signal.SIGTERM)
            try:
                await asyncio.wait_for(proc.wait(), timeout=_CANCEL_GRACE_SECONDS)
            except TimeoutError:
                if proc.returncode is None:
                    proc.send_signal(signal.SIGKILL)
                    await proc.wait()
        for d in drains:
            d.cancel()
        heartbeat.cancel()
        status_writer.finish(
            state="cancelled",
            exit_code=proc.returncode,
            error=_PARTIAL_STATE_DISCLAIMER if is_stack_op else "cancelled",
        )
        if is_stack_op:
            raise asyncio.CancelledError(_PARTIAL_STATE_DISCLAIMER) from None
        raise
    finally:
        heartbeat.cancel()

    duration = int(time.monotonic() - started)

    if rc != 0:
        # Compose a structured failure payload so the client sees the
        # full error context (which stack failed, the stderr tail, the
        # partial-state disclaimer) rather than an opaque exit-code stub.
        payload: dict[str, Any] = {
            "error": f"exit_code={rc}",
            "exit_code": rc,
            "task_id": task_id,
            "stacks_completed": stacks_completed,
            "duration_seconds": duration,
            "last_stack": last_stack,
            "failed_events": failed_lines[:10],
            "stderr_tail": list(stderr_tail),
        }
        if is_stack_op:
            payload["disclaimer"] = _PARTIAL_STATE_DISCLAIMER
        if notify_errors:
            payload["notification_errors"] = list(notify_errors)
        status_writer.finish(state="failed", exit_code=rc, error=f"exit_code={rc}")
        # ToolError surfaces the payload to the client as a tool-level
        # failure (CallToolResult.is_error=True). Inline callers see it
        # via the FastMCP transport; agent clients render it as a
        # tool-error message rather than success-shaped data.
        raise ToolError(json.dumps(payload))

    status_writer.finish(state="succeeded", exit_code=rc)
    result: dict[str, Any] = {
        "status": "ok",
        "task_id": task_id,
        "stacks_completed": stacks_completed,
        "duration_seconds": duration,
        "last_stack": last_stack,
    }
    if notify_errors:
        # Additive key, present only when something went wrong, so
        # existing consumers of the success payload are unaffected.
        result["notification_errors"] = list(notify_errors)
    return json.dumps(result)

328 

329 

330def _format_duration(seconds: int) -> str: 

331 """Render an integer second count as ``HhMmSs`` / ``MmSs`` / ``Ss``.""" 

332 if seconds < 60: 

333 return f"{seconds}s" 

334 minutes, sec = divmod(seconds, 60) 

335 if minutes < 60: 

336 return f"{minutes}m{sec:02d}s" 

337 hours, mins = divmod(minutes, 60) 

338 return f"{hours}h{mins:02d}m{sec:02d}s"