Coverage for mcp/tools/_task_status.py: 87%

219 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-15 15:07 +0000

1""" 

2Disk-backed status reporting for long-running MCP tools. 

3 

4The MCP spec lets tools stream progress and log notifications back to the 

5calling client, but client-side rendering of those notifications is 

6inconsistent. Some clients drop them, some bury them in a debug panel, 

7some never surface them at all. The result: a 30-60 minute deploy_all 

8looks "wedged" to the user even though the underlying CLI is producing 

9output every few seconds. 

10 

11This module gives every long-running tool a parallel observability 

12channel that doesn't depend on the MCP wire at all: 

13 

14* A JSON ``status`` file at ``~/.gco/tasks/{task_id}.json`` updated on 

15 every progress event (atomic via tempfile + ``os.replace``). 

16* A raw ``log`` file at ``~/.gco/tasks/{task_id}.log`` containing the 

17 full interleaved stdout+stderr of the subprocess, so operators have 

18 the same forensic record CDK would have left in their terminal. 

19* Orphan detection on read: when the status reports ``state=running`` 

20 but the recorded PID is no longer alive, the returned dict is 

21 re-stamped to ``state=orphaned`` so callers see honest data even 

22 when the MCP wrapper crashed without a final write. 

23 

24Two MCP tools (``task_status`` / ``task_tail``) and one CLI group 

25(``gco tasks list/tail/prune``) read these files. Both surfaces are 

26read-only — the writer always lives in ``_run_long_task``. 

27 

28The status directory is configurable via ``GCO_TASK_STATUS_DIR`` so 

29unit tests can isolate to ``tmp_path``. ``GCO_DISABLE_TASK_STATUS=1`` 

30skips file emission entirely (kept as an escape hatch for sandboxed 

31environments where ``~/.gco`` isn't writable). 

32""" 

33 

34from __future__ import annotations 

35 

36import json 

37import os 

38import tempfile 

39import threading 

40import time 

41from collections import deque 

42from collections.abc import Iterable 

43from datetime import UTC, datetime 

44from pathlib import Path 

45from typing import IO, Any 

46 

47# Keep at most this many task files (status + log) in the directory. 

48# When a new task starts, anything older than the most recent N gets 

49# pruned. 50 is enough for a couple of full deploy cycles plus a 

50# handful of one-off image pushes. 

51_TASK_RETENTION = 50 

52 

53# Tail buffer kept in the status file. Larger than what any single 

54# message line will need, but capped so the JSON file stays small. 

55_STATUS_TAIL_LINES = 20 

56 

57# Debounce window for atomic status writes. Per-line writes would do 

58# hundreds of fsyncs during a noisy CDK phase; this batches them so 

59# we write at most ~2 times per second under sustained output. 

60_STATUS_WRITE_DEBOUNCE_SECONDS = 0.5 

61 

62# Signal number handed to ``os.kill`` as a liveness probe. Signal 0 

63# delivers nothing; it only checks that the PID exists and whether we 

64# may signal it. This cannot detect PID recycling: a stale PID reused 

65# by an unrelated process still reads as alive. ``ps``-style 

66# verification would need command-line matching and is out of scope. 

67_PID_ALIVE_SIGNAL = 0 

68 

69 

def status_dir() -> Path:
    """Return the directory that holds task status and log files.

    A non-empty ``GCO_TASK_STATUS_DIR`` environment variable wins;
    otherwise the location is ``~/.gco/tasks`` under the current
    user's home directory. The directory is not created here.
    """
    custom = os.environ.get("GCO_TASK_STATUS_DIR")
    # An empty string counts as "not set", same as a missing variable.
    return Path(custom) if custom else Path.home() / ".gco" / "tasks"

80 

81 

def task_status_enabled() -> bool:
    """Report whether disk-backed status emission is switched on.

    Emission is on by default. It is off only when the
    ``GCO_DISABLE_TASK_STATUS`` environment variable holds ``1``,
    ``true`` or ``yes`` (compared case-insensitively); any other value,
    including an unset variable, leaves it enabled.
    """
    flag = os.environ.get("GCO_DISABLE_TASK_STATUS", "")
    opted_out = flag.lower() in {"1", "true", "yes"}
    return not opted_out

91 

92 

93def _now_iso() -> str: 

94 """ISO-8601 UTC timestamp with second precision.""" 

95 return datetime.now(UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z") 

96 

97 

98def _atomic_write_json(target: Path, payload: dict[str, Any]) -> None: 

99 """Write ``payload`` to ``target`` atomically. 

100 

101 Uses ``tempfile.NamedTemporaryFile`` in the same directory so 

102 ``os.replace`` is a same-filesystem rename (atomic on POSIX). 

103 Readers always see either the previous file or the new one, 

104 never a partial write. 

105 """ 

106 target.parent.mkdir(parents=True, exist_ok=True) 

107 # delete=False so we can rename it before the context manager closes. 

108 fd = tempfile.NamedTemporaryFile( # noqa: SIM115 - explicit close+replace below 

109 mode="w", 

110 encoding="utf-8", 

111 dir=str(target.parent), 

112 prefix=f".{target.name}.", 

113 suffix=".tmp", 

114 delete=False, 

115 ) 

116 try: 

117 json.dump(payload, fd, indent=2, sort_keys=True) 

118 fd.write("\n") 

119 fd.flush() 

120 os.fsync(fd.fileno()) 

121 finally: 

122 fd.close() 

123 os.replace(fd.name, target) 

124 

125 

126def _is_pid_alive(pid: int | None) -> bool: 

127 """Best-effort liveness check via signal 0. 

128 

129 ``os.kill(pid, 0)`` raises ``ProcessLookupError`` when the PID is 

130 free, ``PermissionError`` when the PID belongs to a process we 

131 don't own (still alive), and returns ``None`` on success. Any 

132 other ``OSError`` we treat conservatively as "not alive" so an 

133 edge case can't strand a task in ``running`` forever. 

134 """ 

135 if pid is None or pid <= 0: 135 ↛ 136line 135 didn't jump to line 136 because the condition on line 135 was never true

136 return False 

137 try: 

138 os.kill(pid, _PID_ALIVE_SIGNAL) 

139 return True 

140 except ProcessLookupError: 

141 return False 

142 except PermissionError: 

143 return True 

144 except OSError: 

145 return False 

146 

147 

def _prune_old_tasks(directory: Path, keep: int = _TASK_RETENTION) -> None:
    """Best-effort retention sweep run when a new task starts.

    Delegates to ``prune_tasks`` so there is a single implementation of
    the "keep the newest ``keep`` status files, drop the rest together
    with their ``.log`` siblings" rule. Unlike ``prune_tasks`` this
    wrapper returns nothing and swallows any ``OSError``: pruning must
    never break a live task's status emission.

    Args:
        directory: Directory holding the ``*.json`` / ``*.log`` pairs.
        keep: Number of most recent status files to retain.
    """
    try:
        prune_tasks(keep=keep, directory=directory)
    except OSError:
        # Pruning is best-effort. Never raise from here.
        return

174 

175 

176class _suppress_oserror: 

177 """Compact context manager that swallows OSError only. 

178 

179 ``contextlib.suppress(OSError)`` would do, but we use the dedicated 

180 type so future readers can grep for the intentional swallows. 

181 """ 

182 

183 def __enter__(self) -> _suppress_oserror: 

184 return self 

185 

186 def __exit__( 

187 self, 

188 exc_type: type[BaseException] | None, 

189 exc: BaseException | None, 

190 tb: object, 

191 ) -> bool: 

192 return exc_type is not None and issubclass(exc_type, OSError) 

193 

194 

def make_task_id(tool_name: str) -> str:
    """Build a task ID of the form ``{tool_name}-{epoch_millis}-{counter}``.

    ``epoch_millis`` is the wall-clock time in whole milliseconds and
    ``counter`` comes from ``_next_task_counter``, the process-local
    thread-safe sequence. The counter keeps IDs distinct when several
    calls land in the same millisecond within one process.
    """
    sequence = _next_task_counter()
    epoch_millis = int(time.time() * 1000)
    return f"{tool_name}-{epoch_millis}-{sequence}"

207 

208 

209_TASK_COUNTER_LOCK = threading.Lock() 

210_TASK_COUNTER = 0 

211 

212 

213def _next_task_counter() -> int: 

214 """Return a monotonically-increasing counter, thread-safe.""" 

215 global _TASK_COUNTER 

216 with _TASK_COUNTER_LOCK: 

217 _TASK_COUNTER += 1 

218 return _TASK_COUNTER 

219 

220 

class TaskStatusWriter:
    """Disk-backed status emitter for one long-running tool invocation.

    Owns a ``{task_id}.json`` status file and a ``{task_id}.log`` raw
    log inside ``status_dir()``. When ``task_status_enabled()`` is
    false at construction time, every public method is a no-op and
    nothing is written.

    Call ``finish`` to stamp the terminal record, or use the writer as
    a context manager: leaving the ``with`` block while the state is
    still ``running`` stamps ``succeeded`` (clean exit), ``failed`` (an
    ``Exception`` escaped) or ``cancelled`` (a non-``Exception``
    ``BaseException`` such as ``KeyboardInterrupt`` escaped). An
    explicit ``finish`` inside the block always wins.

    Thread-safety: one lock guards the in-memory state, the status
    write and the log write, so ``record_line`` may be called
    concurrently for stdout and stderr and cannot race with ``finish``
    closing the log. ``_write_status_now`` / ``_build_payload`` never
    take the lock themselves (it is not re-entrant).
    """

    def __init__(
        self,
        task_id: str,
        tool: str,
        argv: list[str],
        *,
        pid: int | None,
        total_units: int | None = None,
    ) -> None:
        """Create the writer and, if enabled, emit the first status record.

        Args:
            task_id: Identifier used as the stem of both file names.
            tool: Tool name recorded in the status payload.
            argv: Command line recorded in the payload (copied).
            pid: PID recorded in the payload; readers use it for the
                liveness check. May be ``None``.
            total_units: Optional total; emitted as ``stacks_total``
                only when positive.
        """
        self.task_id = task_id
        self.tool = tool
        self._argv = list(argv)
        self._pid = pid
        self._total_units = total_units
        self._enabled = task_status_enabled()

        self._dir = status_dir()
        self._status_path = self._dir / f"{task_id}.json"
        self._log_path = self._dir / f"{task_id}.log"

        self._started_at_iso = _now_iso()
        self._started_monotonic = time.monotonic()
        self._stacks_completed = 0
        self._last_stack: str | None = None
        self._last_message: str | None = None
        self._tail: deque[str] = deque(maxlen=_STATUS_TAIL_LINES)
        self._state = "running"
        self._exit_code: int | None = None
        self._error: str | None = None

        self._lock = threading.Lock()
        self._last_write_ts = 0.0
        self._log_fp: IO[str] | None = None

        if self._enabled:
            try:
                self._dir.mkdir(parents=True, exist_ok=True)
                _prune_old_tasks(self._dir)
                # buffering=1 -> line-buffered text file.
                self._log_fp = open(  # noqa: SIM115 - long-lived handle, closed in finish()
                    self._log_path, "w", encoding="utf-8", buffering=1
                )
            except OSError:
                # If we can't open the log, fall back to status-only.
                self._log_fp = None
            self._write_status_now()

    # --- context manager -------------------------------------------------

    def __enter__(self) -> TaskStatusWriter:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: object,
    ) -> None:
        """Stamp a terminal state if ``finish`` was not called explicitly.

        Never suppresses the exception that is propagating.
        """
        if self._state != "running":
            # finish() already ran inside the block; keep its verdict.
            return
        if exc_type is None:
            self.finish(state="succeeded")
        elif issubclass(exc_type, Exception):
            self.finish(state="failed", error=f"{exc_type.__name__}: {exc}")
        else:
            # KeyboardInterrupt, SystemExit and similar non-Exception
            # BaseExceptions are reported as a cancellation.
            self.finish(state="cancelled", error=exc_type.__name__)

    # --- recording ------------------------------------------------------

    def record_line(self, line: str, *, stream: str) -> None:
        """Append one output line to the tail, status file and raw log.

        ``stream`` (e.g. "stdout" or "stderr") is written as a
        ``[stream]`` prefix in the log so interleaved output stays
        attributable. The status file is rewritten at most once per
        ``_STATUS_WRITE_DEBOUNCE_SECONDS``; the log (line-buffered) gets
        every line while it is open. Log write errors are swallowed.
        """
        if not self._enabled:
            return
        with self._lock:
            self._tail.append(line)
            self._last_message = line
            now = time.monotonic()
            if now - self._last_write_ts >= _STATUS_WRITE_DEBOUNCE_SECONDS:
                self._last_write_ts = now
                self._write_status_now()
            # Written under the lock so finish() can't close the handle
            # between the None check and the write.
            if self._log_fp is not None:
                with _suppress_oserror():
                    self._log_fp.write(f"[{stream}] {line}\n")

    def increment_stacks(self, stack_name: str) -> None:
        """Record that one more stack finished and write status immediately.

        The write bypasses the debounce (and resets its timer) so the
        ``stacks_completed`` counter on disk is fresh right away.
        """
        if not self._enabled:
            return
        with self._lock:
            self._stacks_completed += 1
            self._last_stack = stack_name
            self._last_write_ts = time.monotonic()
            self._write_status_now()

    def set_last_stack(self, stack_name: str) -> None:
        """Update the last-seen stack name without bumping the counter.

        Does not trigger a status write; the value appears in the next one.
        """
        if not self._enabled:
            return
        with self._lock:
            self._last_stack = stack_name

    def finish(
        self,
        *,
        state: str,
        exit_code: int | None = None,
        error: str | None = None,
    ) -> None:
        """Stamp a terminal state, write the status file and close the log.

        Args:
            state: Terminal state to record, e.g. "succeeded", "failed"
                or "cancelled". The value is stored as given.
            exit_code: Optional process exit code for the payload.
            error: Optional error description for the payload.

        After this call the log handle is closed, so later
        ``record_line`` calls no longer reach the log file.
        """
        if not self._enabled:
            return
        with self._lock:
            self._state = state
            self._exit_code = exit_code
            self._error = error
            self._write_status_now()
            # Detach under the lock; close outside it.
            log_fp, self._log_fp = self._log_fp, None
        if log_fp is not None:
            # Separate suppress blocks: a failing flush must not skip
            # the close and leak the file handle.
            with _suppress_oserror():
                log_fp.flush()
            with _suppress_oserror():
                log_fp.close()

    # --- internals ------------------------------------------------------

    def _build_payload(self) -> dict[str, Any]:
        """Snapshot the current state as the JSON-ready status payload.

        ``stacks_total``, ``exit_code`` and ``error`` are included only
        when set (and, for the total, positive).
        """
        elapsed = int(time.monotonic() - self._started_monotonic)
        payload: dict[str, Any] = {
            "task_id": self.task_id,
            "tool": self.tool,
            "argv": self._argv,
            "pid": self._pid,
            "started_at": self._started_at_iso,
            "updated_at": _now_iso(),
            "elapsed_seconds": elapsed,
            "state": self._state,
            "stacks_completed": self._stacks_completed,
            "last_stack": self._last_stack,
            "last_message": self._last_message,
            "tail": list(self._tail),
            "log_path": str(self._log_path),
        }
        if self._total_units is not None and self._total_units > 0:
            payload["stacks_total"] = self._total_units
        if self._exit_code is not None:
            payload["exit_code"] = self._exit_code
        if self._error is not None:
            payload["error"] = self._error
        return payload

    def _write_status_now(self) -> None:
        """Write the status file; ``OSError`` is swallowed.

        Does not take ``self._lock``: callers other than ``__init__``
        already hold it.
        """
        try:
            _atomic_write_json(self._status_path, self._build_payload())
        except OSError:
            # Disk emission is best-effort — never let a write failure
            # crash the live tool invocation.
            return

387 

388 

389# --------------------------------------------------------------------------- 

390# Read-side helpers for the task_status / task_tail tools and the CLI. 

391# --------------------------------------------------------------------------- 

392 

393 

def list_tasks(directory: Path | None = None) -> list[dict[str, Any]]:
    """Return all readable task status records, newest first.

    Every ``*.json`` file in the directory is ordered by modification
    time (most recent first) and loaded through ``_read_status_file``,
    which adds ``is_alive`` and rewrites a stale ``running`` state to
    ``"orphaned"``. Files that vanish between listing and ``stat`` (for
    example because another process pruned them) and files that cannot
    be parsed are skipped rather than raised.

    Args:
        directory: Directory to scan; defaults to ``status_dir()``.

    Returns:
        A list of record dicts; empty when the directory is missing.
    """
    directory = directory or status_dir()
    if not directory.exists():
        return []
    stamped: list[tuple[float, Path]] = []
    for candidate in directory.glob("*.json"):
        try:
            stamped.append((candidate.stat().st_mtime, candidate))
        except OSError:
            # Pruned or otherwise gone since glob() saw it.
            continue
    stamped.sort(key=lambda item: item[0], reverse=True)
    records: list[dict[str, Any]] = []
    for _mtime, path in stamped:
        record = _read_status_file(path)
        if record is not None:
            records.append(record)
    return records

416 

417 

def get_task(task_id: str, directory: Path | None = None) -> dict[str, Any] | None:
    """Return one task record by ID, post-processed by ``_read_status_file``.

    Args:
        task_id: Bare task identifier (the status file's stem). IDs that
            are empty or contain a path component (``../x``, ``a/b``,
            absolute paths) are rejected so a lookup can never leave
            the status directory.
        directory: Directory to look in; defaults to ``status_dir()``.

    Returns:
        The record dict, or ``None`` when the ID is rejected, the file
        is missing, or ``_read_status_file`` cannot load it.
    """
    if not task_id or Path(task_id).name != task_id:
        return None
    directory = directory or status_dir()
    path = directory / f"{task_id}.json"
    if not path.exists():
        return None
    return _read_status_file(path)

429 

430 

def tail_log(task_id: str, lines: int = 100, directory: Path | None = None) -> list[str]:
    """Return the last ``lines`` lines of a task's raw log.

    Args:
        task_id: Bare task identifier (the log file's stem). IDs that
            are empty or contain a path component (``../x``, ``a/b``,
            absolute paths) are rejected so the read can never leave
            the status directory.
        lines: Maximum number of trailing lines to return.
        directory: Directory to look in; defaults to ``status_dir()``.

    Returns:
        The lines without their trailing newline, oldest first. Empty
        when ``lines`` is not positive, the ID is rejected, or the log
        is missing, empty or unreadable. Undecodable bytes are replaced
        rather than raised.
    """
    if lines <= 0:
        return []
    if not task_id or Path(task_id).name != task_id:
        return []
    directory = directory or status_dir()
    path = directory / f"{task_id}.log"
    try:
        # A bounded deque keeps only the newest ``lines`` entries while
        # the file is streamed, so memory stays constant.
        with open(path, encoding="utf-8", errors="replace") as fp:
            newest: deque[str] = deque(fp, maxlen=lines)
    except OSError:
        # Covers a missing file as well as permission/IO problems.
        return []
    return [entry.rstrip("\n") for entry in newest]

451 

452 

def prune_tasks(keep: int = _TASK_RETENTION, directory: Path | None = None) -> int:
    """Remove all but the most recent ``keep`` task files.

    Status files (``*.json``) are ordered by modification time, newest
    first; everything past the first ``keep`` is deleted together with
    its ``.log`` sibling. Files that vanish between listing and
    ``stat`` (another process pruning at the same time) are skipped
    instead of aborting the sweep, and individual unlink failures are
    swallowed.

    Args:
        keep: Number of most recent status files to retain.
        directory: Directory to sweep; defaults to ``status_dir()``.

    Returns:
        The number of status files actually removed. A JSON+log pair
        counts once; a pair whose JSON could not be removed counts zero.
    """
    directory = directory or status_dir()
    if not directory.exists():
        return 0
    stamped: list[tuple[float, Path]] = []
    for candidate in directory.glob("*.json"):
        try:
            stamped.append((candidate.stat().st_mtime, candidate))
        except OSError:
            # Already gone (or unreadable): nothing for us to prune.
            continue
    stamped.sort(key=lambda item: item[0], reverse=True)
    removed = 0
    for _mtime, stale in stamped[keep:]:
        with _suppress_oserror():
            stale.unlink()
            # Only reached when unlink succeeded.
            removed += 1
        log = stale.with_suffix(".log")
        with _suppress_oserror():
            if log.exists():
                log.unlink()
    return removed

478 

479 

480def _read_status_file(path: Path) -> dict[str, Any] | None: 

481 """Load a single status JSON, applying liveness/orphan post-processing. 

482 

483 Returns ``None`` only when the file is unreadable or malformed — 

484 a transient half-written file (which atomic writes shouldn't 

485 produce, but defence in depth) is treated as missing rather than 

486 raised. 

487 """ 

488 try: 

489 text = path.read_text(encoding="utf-8") 

490 record = json.loads(text) 

491 except OSError, ValueError: 

492 return None 

493 if not isinstance(record, dict): 493 ↛ 494line 493 didn't jump to line 494 because the condition on line 493 was never true

494 return None 

495 pid = record.get("pid") 

496 is_alive = _is_pid_alive(pid if isinstance(pid, int) else None) 

497 record["is_alive"] = is_alive 

498 if record.get("state") == "running" and not is_alive: 

499 record["state"] = "orphaned" 

500 return record 

501 

502 

def task_ids_for(records: Iterable[dict[str, Any]]) -> list[str]:
    """Collect the ``task_id`` of every record that has one, in order.

    Records without a ``task_id`` key are skipped (helper for tests).
    """
    ids: list[str] = []
    for entry in records:
        if "task_id" in entry:
            ids.append(entry["task_id"])
    return ids