Coverage for mcp/tools/_task_status.py: 87%
219 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
"""
Disk-backed status reporting for long-running MCP tools.

The MCP spec lets tools stream progress and log notifications back to the
calling client, but client-side rendering of those notifications is
inconsistent. Some clients drop them, some bury them in a debug panel,
some never surface them at all. The result: a 30-60 minute deploy_all
looks "wedged" to the user even though the underlying CLI is producing
output every few seconds.

This module gives every long-running tool a parallel observability
channel that doesn't depend on the MCP wire at all:

* A JSON ``status`` file at ``~/.gco/tasks/{task_id}.json``, rewritten
  atomically (tempfile + ``os.replace``) as the task progresses. Writes
  triggered by output lines are debounced to at most ~2 per second;
  stack milestones and the terminal state are written immediately.
* A raw ``log`` file at ``~/.gco/tasks/{task_id}.log`` containing the
  full interleaved stdout+stderr of the subprocess (each line prefixed
  with ``[stdout]`` or ``[stderr]``), so operators have the same
  forensic record CDK would have left in their terminal.
* Orphan detection on read: when the status reports ``state=running``
  but the recorded PID is no longer alive, the returned dict is
  re-stamped to ``state=orphaned`` so callers see honest data even
  when the MCP wrapper crashed without a final write.

Two MCP tools (``task_status`` / ``task_tail``) and one CLI group
(``gco tasks list/tail/prune``) read these files. Apart from
``gco tasks prune``, which deletes old task files, both surfaces are
read-only — the writer always lives in ``_run_long_task``.

The status directory is configurable via ``GCO_TASK_STATUS_DIR`` so
unit tests can isolate to ``tmp_path``. ``GCO_DISABLE_TASK_STATUS=1``
(also ``true``/``yes``, case-insensitive) skips file emission entirely.
It is kept as an escape hatch for sandboxed environments where
``~/.gco`` isn't writable.
"""
34from __future__ import annotations
36import json
37import os
38import tempfile
39import threading
40import time
41from collections import deque
42from collections.abc import Iterable
43from datetime import UTC, datetime
44from pathlib import Path
45from typing import IO, Any
# Keep at most this many task files (status + log) in the directory.
# When a new task starts, everything beyond the most recent N (ranked
# by the status file's mtime) gets pruned. 50 is enough for a couple of
# full deploy cycles plus a handful of one-off image pushes.
_TASK_RETENTION = 50

# Number of most-recent output lines mirrored into the status file's
# ``tail`` field. Capped so the JSON file stays small; the complete
# output lives in the ``.log`` file.
_STATUS_TAIL_LINES = 20

# Debounce window for status writes triggered by output lines. Each
# write is an fsync + rename, so per-line writes would do hundreds of
# fsyncs during a noisy CDK phase; this batches them so line-driven
# writes happen at most ~2 times per second under sustained output.
_STATUS_WRITE_DEBOUNCE_SECONDS = 0.5

# Signal number used for liveness probes via ``os.kill(pid, sig)``.
# Signal 0 is never delivered: the kernel only checks that the PID
# exists and that we are permitted to signal it. NOTE: this does NOT
# detect PID recycling — a dead task's PID reused by an unrelated
# process reads as "alive". Guarding against that would need
# command-line matching (``ps``-style) and is out of scope.
_PID_ALIVE_SIGNAL = 0
def status_dir() -> Path:
    """Return the directory that holds task status and log files.

    A non-empty ``GCO_TASK_STATUS_DIR`` takes precedence (tests point it
    at ``tmp_path`` so nothing lands in the developer's real home dir);
    otherwise the default is ``~/.gco/tasks``.
    """
    if custom := os.environ.get("GCO_TASK_STATUS_DIR"):
        return Path(custom)
    return Path.home().joinpath(".gco", "tasks")
def task_status_enabled() -> bool:
    """Report whether disk-backed task status emission is switched on.

    Emission is on by default. Setting ``GCO_DISABLE_TASK_STATUS`` to
    ``1``, ``true`` or ``yes`` (case-insensitive) turns it off — useful
    for sandboxed CI runs and container builds with a read-only home
    directory. MCP wire-side observability is unaffected either way.
    """
    opt_out = os.environ.get("GCO_DISABLE_TASK_STATUS", "").lower()
    return opt_out not in ("1", "true", "yes")
93def _now_iso() -> str:
94 """ISO-8601 UTC timestamp with second precision."""
95 return datetime.now(UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z")
def _atomic_write_json(target: Path, payload: dict[str, Any]) -> None:
    """Write ``payload`` to ``target`` as pretty-printed JSON, atomically.

    The JSON (indented, keys sorted, trailing newline) is written to a
    ``.{target.name}.*.tmp`` file created with
    ``tempfile.NamedTemporaryFile`` in the target's own directory. The
    file is flushed and fsync'd, and only then renamed over ``target``
    with ``os.replace``; because it is a same-filesystem rename, the swap
    is atomic on POSIX. Readers therefore always see either the previous
    file or the complete new one, never a partial write.

    Raises:
        TypeError / ValueError: ``payload`` is not JSON-serialisable.
        OSError: creating, writing, syncing or renaming failed.
        In every failure case the temp file is removed and ``target`` is
        left untouched.
    """
    target.parent.mkdir(parents=True, exist_ok=True)
    # delete=False: we rename (or unlink) the file ourselves below.
    tmp = tempfile.NamedTemporaryFile(  # noqa: SIM115 - explicit close+replace/unlink below
        mode="w",
        encoding="utf-8",
        dir=str(target.parent),
        prefix=f".{target.name}.",
        suffix=".tmp",
        delete=False,
    )
    try:
        with tmp:
            json.dump(payload, tmp, indent=2, sort_keys=True)
            tmp.write("\n")
            tmp.flush()
            os.fsync(tmp.fileno())
        # Rename only after the write fully succeeded. Renaming in a
        # ``finally`` would install a truncated file over the good one
        # whenever the dump or write fails part-way.
        os.replace(tmp.name, target)
    except BaseException:
        try:
            os.unlink(tmp.name)
        except OSError:
            pass  # Already gone or not removable; the original error matters more.
        raise
126def _is_pid_alive(pid: int | None) -> bool:
127 """Best-effort liveness check via signal 0.
129 ``os.kill(pid, 0)`` raises ``ProcessLookupError`` when the PID is
130 free, ``PermissionError`` when the PID belongs to a process we
131 don't own (still alive), and returns ``None`` on success. Any
132 other ``OSError`` we treat conservatively as "not alive" so an
133 edge case can't strand a task in ``running`` forever.
134 """
135 if pid is None or pid <= 0: 135 ↛ 136line 135 didn't jump to line 136 because the condition on line 135 was never true
136 return False
137 try:
138 os.kill(pid, _PID_ALIVE_SIGNAL)
139 return True
140 except ProcessLookupError:
141 return False
142 except PermissionError:
143 return True
144 except OSError:
145 return False
def _prune_old_tasks(directory: Path, keep: int = _TASK_RETENTION) -> None:
    """Best-effort retention sweep run when a new task starts.

    Delegates to ``prune_tasks`` so the automatic sweep and the manual
    ``gco tasks prune`` CLI share one implementation of the retention
    rule: keep the ``keep`` most recent ``{task_id}.json`` files by
    mtime and delete the rest, together with their ``{task_id}.log``
    partners.

    Any ``OSError`` is swallowed. Pruning is housekeeping and must never
    break a live task's status emission.
    """
    try:
        prune_tasks(keep=keep, directory=directory)
    except OSError:
        # Pruning is best-effort. Never raise from here.
        return
class _suppress_oserror:
    """Context manager that swallows ``OSError`` (and its subclasses) only.

    It behaves like ``contextlib.suppress(OSError)``. The dedicated type
    exists so every intentional best-effort swallow in this module can
    be found by grepping for its name. Any other exception type
    propagates unchanged.
    """

    def __enter__(self) -> _suppress_oserror:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: object,
    ) -> bool:
        if exc_type is None:
            return False
        # A truthy return tells the interpreter to drop the in-flight exception.
        return issubclass(exc_type, OSError)
def make_task_id(tool_name: str) -> str:
    """Build a unique ID for one invocation of ``tool_name``.

    Shape: ``{tool_name}-{millis_since_epoch}-{counter}``. The wall-clock
    millisecond stamp orders IDs roughly by start time. The process-wide
    counter is bumped on every call, so IDs minted by this process never
    collide, even in a tight loop or a multi-threaded burst within one
    millisecond.

    NOTE: order IDs by their numeric fields, not as raw strings. The
    counter is not zero-padded, so lexical ordering breaks when it gains
    a digit (``...-9`` sorts after ``...-10``). Uniqueness is only
    guaranteed within a single process.
    """
    sequence = _next_task_counter()
    millis = int(time.time() * 1000)
    return f"{tool_name}-{millis}-{sequence}"


# Process-local sequence state for make_task_id. The lock makes the
# read-increment-return step atomic across threads.
_TASK_COUNTER_LOCK = threading.Lock()
_TASK_COUNTER = 0


def _next_task_counter() -> int:
    """Atomically bump and return the process-wide task counter (first value is 1)."""
    global _TASK_COUNTER
    with _TASK_COUNTER_LOCK:
        _TASK_COUNTER = _TASK_COUNTER + 1
        current = _TASK_COUNTER
    return current
class TaskStatusWriter:
    """Disk-backed status emitter for one long-running tool invocation.

    Owns the lifecycle of a ``{task_id}.json`` + ``{task_id}.log`` pair.
    Either call :meth:`finish` with the terminal state, or use the writer
    as a context manager. On exit it stamps ``succeeded`` (clean exit),
    ``cancelled`` (``asyncio.CancelledError`` / ``KeyboardInterrupt``) or
    ``failed`` (any other exception), unless :meth:`finish` already ran.
    Either way observers always see a terminal record. Exceptions are
    never suppressed.

    Thread-safety: one lock guards the in-memory tail buffer, the
    debounce timestamp, the finished flag and the log file handle. So
    ``record_line`` may be called from the stdout and stderr drains
    concurrently, and a line arriving while :meth:`finish` runs can never
    be written to an already-closed log handle. The underlying file ops
    are single-writer per task by construction.

    When ``task_status_enabled()`` is false, nothing is written to disk
    and every recording method is a no-op.
    """

    def __init__(
        self,
        task_id: str,
        tool: str,
        argv: list[str],
        *,
        pid: int | None,
        total_units: int | None = None,
    ) -> None:
        self.task_id = task_id
        self.tool = tool
        self._argv = list(argv)  # Defensive copy; the caller may mutate its list.
        self._pid = pid
        self._total_units = total_units
        self._enabled = task_status_enabled()

        self._dir = status_dir()
        self._status_path = self._dir / f"{task_id}.json"
        self._log_path = self._dir / f"{task_id}.log"

        self._started_at_iso = _now_iso()
        self._started_monotonic = time.monotonic()
        self._stacks_completed = 0
        self._last_stack: str | None = None
        self._last_message: str | None = None
        self._tail: deque[str] = deque(maxlen=_STATUS_TAIL_LINES)
        self._state = "running"
        self._exit_code: int | None = None
        self._error: str | None = None
        # Set by finish(); afterwards progress calls no longer touch disk.
        self._finished = False

        self._lock = threading.Lock()
        self._last_write_ts = 0.0
        self._log_fp: IO[str] | None = None

        if self._enabled:
            try:
                self._dir.mkdir(parents=True, exist_ok=True)
                _prune_old_tasks(self._dir)
                self._log_fp = open(  # noqa: SIM115 - long-lived file handle, closed in finish()
                    self._log_path, "w", encoding="utf-8", buffering=1
                )
            except OSError:
                # If we can't open the log, fall back to status-only.
                self._log_fp = None
            # Initial "running" record so readers see the task right away.
            # Kept inside the enabled branch: the opt-out promises no file
            # emission at all.
            self._write_status_now()

    # --- context manager -------------------------------------------------

    def __enter__(self) -> TaskStatusWriter:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: object,
    ) -> bool:
        """Stamp a terminal state unless :meth:`finish` already ran; never suppress."""
        if self._finished:
            return False
        if exc_type is None:
            self.finish(state="succeeded")
            return False
        import asyncio

        if issubclass(exc_type, (asyncio.CancelledError, KeyboardInterrupt)):
            self.finish(state="cancelled")
        else:
            self.finish(state="failed", error=f"{exc_type.__name__}: {exc}")
        return False

    # --- recording ------------------------------------------------------

    def record_line(self, line: str, *, stream: str) -> None:
        """Append a single output line and (debounced) refresh the status file.

        ``stream`` is "stdout" or "stderr"; it becomes a ``[stream]``
        prefix in the log file so readers can tell the streams apart in
        interleaved order. Status writes are debounced to at most one per
        ``_STATUS_WRITE_DEBOUNCE_SECONDS`` to avoid hundreds of fsyncs on
        noisy phases. The log file is line-buffered, so each line reaches
        the OS as it is written. Lines recorded after :meth:`finish` are
        ignored.
        """
        if not self._enabled:
            return
        with self._lock:
            if self._finished:
                return
            self._tail.append(line)
            self._last_message = line
            now = time.monotonic()
            if now - self._last_write_ts >= _STATUS_WRITE_DEBOUNCE_SECONDS:
                self._last_write_ts = now
                self._write_status_now()
            # Under the lock so finish() cannot close the handle between
            # the None check and the write.
            if self._log_fp is not None:
                with _suppress_oserror():
                    self._log_fp.write(f"[{stream}] {line}\n")

    def increment_stacks(self, stack_name: str) -> None:
        """Record that one more stack finished.

        Triggers an immediate (un-debounced) write so the
        ``stacks_completed`` counter is fresh for any reader polling
        between stack milestones. Ignored after :meth:`finish`.
        """
        if not self._enabled:
            return
        with self._lock:
            if self._finished:
                return
            self._stacks_completed += 1
            self._last_stack = stack_name
            self._last_write_ts = time.monotonic()
            self._write_status_now()

    def set_last_stack(self, stack_name: str) -> None:
        """Update the last-seen stack name without bumping the counter or writing."""
        if not self._enabled:
            return
        with self._lock:
            if self._finished:
                return
            self._last_stack = stack_name

    def finish(
        self,
        *,
        state: str,
        exit_code: int | None = None,
        error: str | None = None,
    ) -> None:
        """Stamp a terminal state, write it immediately, and close the log.

        ``state`` is one of "succeeded", "failed", "cancelled". Once
        finished, ``record_line`` / ``increment_stacks`` no longer touch
        the status file. Readers rely on the timestamp + state to know
        they have the final record.
        """
        if not self._enabled:
            return
        with self._lock:
            self._state = state
            self._exit_code = exit_code
            self._error = error
            self._finished = True
            self._write_status_now()
            if self._log_fp is not None:
                with _suppress_oserror():
                    try:
                        self._log_fp.flush()
                    finally:
                        # Close even if flush failed so the descriptor is not leaked.
                        self._log_fp.close()
                self._log_fp = None

    # --- internals ------------------------------------------------------

    def _build_payload(self) -> dict[str, Any]:
        """Snapshot the current in-memory state as the JSON status record."""
        elapsed = int(time.monotonic() - self._started_monotonic)
        payload: dict[str, Any] = {
            "task_id": self.task_id,
            "tool": self.tool,
            "argv": self._argv,
            "pid": self._pid,
            "started_at": self._started_at_iso,
            "updated_at": _now_iso(),
            "elapsed_seconds": elapsed,
            "state": self._state,
            "stacks_completed": self._stacks_completed,
            "last_stack": self._last_stack,
            "last_message": self._last_message,
            "tail": list(self._tail),
            "log_path": str(self._log_path),
        }
        # Optional keys are only present when they carry information.
        if self._total_units is not None and self._total_units > 0:
            payload["stacks_total"] = self._total_units
        if self._exit_code is not None:
            payload["exit_code"] = self._exit_code
        if self._error is not None:
            payload["error"] = self._error
        return payload

    def _write_status_now(self) -> None:
        """Atomically rewrite the status file; OSErrors are swallowed."""
        try:
            _atomic_write_json(self._status_path, self._build_payload())
        except OSError:
            # Disk emission is best-effort — never let a write failure
            # crash the live tool invocation.
            return
389# ---------------------------------------------------------------------------
390# Read-side helpers for the task_status / task_tail tools and the CLI.
391# ---------------------------------------------------------------------------
def list_tasks(directory: Path | None = None) -> list[dict[str, Any]]:
    """Return all known task status records, newest first.

    Records are ordered by the status file's mtime. Each record gets
    ``is_alive`` re-computed from the recorded PID, and ``state`` is
    rewritten to ``"orphaned"`` when a record claims ``running`` but the
    PID is dead. This is the canonical way for callers to detect tasks
    whose MCP wrapper exited unexpectedly while the underlying CDK kept
    going.

    Files that are unreadable or malformed are skipped. So are files
    deleted mid-scan (e.g. by a task's start-up prune running
    concurrently); they are dropped instead of aborting the whole
    listing. A missing directory yields ``[]``.
    """
    directory = directory or status_dir()
    if not directory.exists():
        return []
    # Stat each file once up front. A file can vanish between glob() and
    # stat(); that must not raise out of a read-only listing.
    stamped: list[tuple[float, Path]] = []
    for path in directory.glob("*.json"):
        try:
            stamped.append((path.stat().st_mtime, path))
        except OSError:
            continue
    stamped.sort(key=lambda item: item[0], reverse=True)
    records: list[dict[str, Any]] = []
    for _mtime, path in stamped:
        record = _read_status_file(path)
        if record is not None:
            records.append(record)
    return records
def get_task(task_id: str, directory: Path | None = None) -> dict[str, Any] | None:
    """Return one task record by ID, with ``is_alive`` / orphan rewriting.

    Returns ``None`` when the file is missing; callers can use that to
    distinguish "no such task" from "task ran and finished". ``None`` is
    also returned for IDs that would resolve outside the status
    directory (``../x``, absolute paths, ``a/b``). ``task_id`` arrives
    from MCP / CLI callers, and IDs from ``make_task_id`` never contain a
    path separator.
    """
    directory = directory or status_dir()
    path = directory / f"{task_id}.json"
    if path.parent != directory:
        # Path traversal / absolute path in the ID: refuse rather than
        # read an arbitrary JSON file and hand it back to the client.
        return None
    if not path.exists():
        return None
    return _read_status_file(path)
def tail_log(task_id: str, lines: int = 100, directory: Path | None = None) -> list[str]:
    """Return the last ``lines`` lines of the task's raw log.

    Returns an empty list in these cases:

    * ``lines <= 0``;
    * the log file is missing or empty;
    * the file or directory is unreadable;
    * ``task_id`` would resolve outside the status directory (``../x``,
      absolute paths, ``a/b``).

    Lines do NOT include the trailing newline, so callers don't have to
    strip them. Undecodable bytes are replaced rather than raising.
    """
    if lines <= 0:
        return []
    directory = directory or status_dir()
    path = directory / f"{task_id}.log"
    if path.parent != directory:
        # task_id comes from MCP / CLI callers; never tail files outside
        # the status directory.
        return []
    try:
        # deque maxlen does the bounded read for us with constant memory.
        with open(path, encoding="utf-8", errors="replace") as fp:
            buf: deque[str] = deque(fp, maxlen=lines)
    except OSError:
        # Covers FileNotFoundError for a missing log as well as permission errors.
        return []
    return [line.rstrip("\n") for line in buf]
def prune_tasks(keep: int = _TASK_RETENTION, directory: Path | None = None) -> int:
    """Remove all but the most-recent ``keep`` task files.

    Recency is the mtime of each ``{task_id}.json`` status file. For
    every status file beyond the newest ``keep``, the JSON is deleted
    along with its ``{task_id}.log`` partner if present. Returns the
    number of task IDs whose status file was removed (a JSON+log pair
    counts once). Useful for the ``gco tasks prune`` CLI when an operator
    wants a manual sweep.

    ``keep`` is expected to be non-negative; it is used directly as a
    slice start. Individual delete failures are skipped silently. Files
    that disappear while the directory is being scanned (e.g. a
    concurrent prune) are ignored rather than raised.
    """
    directory = directory or status_dir()
    if not directory.exists():
        return 0
    dated: list[tuple[float, Path]] = []
    for candidate in directory.glob("*.json"):
        try:
            dated.append((candidate.stat().st_mtime, candidate))
        except OSError:
            # Vanished between glob() and stat(); nothing left to remove.
            continue
    dated.sort(key=lambda item: item[0], reverse=True)
    removed = 0
    for _mtime, stale in dated[keep:]:
        with _suppress_oserror():
            stale.unlink()
            removed += 1  # Only counted when the unlink actually succeeded.
        log = stale.with_suffix(".log")
        with _suppress_oserror():
            if log.exists():
                log.unlink()
    return removed
def _read_status_file(path: Path) -> dict[str, Any] | None:
    """Load a single status JSON, applying liveness/orphan post-processing.

    Returns ``None`` only when the file is unreadable, is not valid
    UTF-8 JSON, or does not contain a JSON object. A transient
    half-written file is treated as missing rather than raised; atomic
    writes shouldn't produce one, but this is defence in depth.
    Otherwise the record gets a fresh ``is_alive`` flag from the stored
    ``pid``, and ``state`` is rewritten from ``running`` to ``orphaned``
    when that PID is dead.
    """
    try:
        record = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # Parenthesised form: the bare ``except A, B:`` spelling is a
        # SyntaxError before Python 3.14. UnicodeDecodeError and
        # JSONDecodeError are both ValueError subclasses.
        return None
    if not isinstance(record, dict):
        return None
    pid = record.get("pid")
    # bool is an int subclass; a stray JSON ``true`` must not probe PID 1.
    valid_pid = pid if isinstance(pid, int) and not isinstance(pid, bool) else None
    is_alive = _is_pid_alive(valid_pid)
    record["is_alive"] = is_alive
    if record.get("state") == "running" and not is_alive:
        record["state"] = "orphaned"
    return record
def task_ids_for(records: Iterable[dict[str, Any]]) -> list[str]:
    """Collect the ``task_id`` of every record that has one, in input order.

    Records without a ``task_id`` key are skipped. Mainly a convenience
    for tests.
    """
    ids: list[str] = []
    for rec in records:
        if "task_id" in rec:
            ids.append(rec["task_id"])
    return ids