Coverage for cli/commands/tasks_cmd.py: 72%
158 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
1"""Long-running task observability commands.
3Mirrors the ``task_status`` / ``task_tail`` MCP tools so operators can
4inspect the same on-disk status records from a terminal:
6* ``gco tasks list`` — newest-first table of recent invocations
7* ``gco tasks show TASK_ID`` — full record for one task
8* ``gco tasks tail TASK_ID -n 100 [-f]`` — last N lines of raw output,
9 with ``-f`` polling like ``tail -f``
10* ``gco tasks prune`` — drop all but the most recent N records
12Long-running MCP tools (``deploy_all``, ``destroy_all``,
13``bootstrap_cdk``, ``deploy_stack``, ``destroy_stack``,
14``images_build``, ``images_push``) record progress to
15``~/.gco/tasks/{task_id}.json`` and the raw subprocess output to
16``~/.gco/tasks/{task_id}.log`` on every line. This module reads
17those artifacts and never writes them — the writer lives in
18``mcp/tools/_task_status.py``.
19"""
21import contextlib
22import json
23import sys
24import time
25from pathlib import Path
26from typing import Any
28import click
31def _status_dir() -> Path:
32 """Honour ``GCO_TASK_STATUS_DIR`` for tests, fall back to ``~/.gco/tasks``.
34 Mirrors ``mcp.tools._task_status.status_dir`` so the CLI and the MCP
35 server always read from the same place. We don't import the MCP
36 helper directly because ``cli/`` and ``mcp/`` are separate top-level
37 packages and we want this command to work without the MCP install.
38 """
39 import os
41 override = os.environ.get("GCO_TASK_STATUS_DIR")
42 if override: 42 ↛ 44line 42 didn't jump to line 44 because the condition on line 42 was always true
43 return Path(override)
44 return Path.home() / ".gco" / "tasks"
47def _is_pid_alive(pid: int | None) -> bool:
48 """Best-effort liveness check via ``os.kill(pid, 0)``.
50 Returns ``False`` when the PID is missing/zero or the OS reports
51 the process gone. Returns ``True`` for live processes including
52 those owned by other users (``PermissionError``). Anything else is
53 treated as not-alive so an unexpected ``OSError`` can't strand a
54 task in ``running`` forever.
55 """
56 import os
58 if pid is None or pid <= 0: 58 ↛ 59line 58 didn't jump to line 59 because the condition on line 58 was never true
59 return False
60 try:
61 os.kill(pid, 0)
62 return True
63 except ProcessLookupError:
64 return False
65 except PermissionError:
66 return True
67 except OSError:
68 return False
71def _read_status(path: Path) -> dict[str, Any] | None:
72 """Load one status JSON, applying the orphan rewrite.
74 Identical semantics to ``mcp.tools._task_status._read_status_file``:
75 re-checks the PID and rewrites ``state=running`` to ``orphaned``
76 when the recorded process is dead. Kept as a local copy so the
77 CLI doesn't need ``mcp/`` on the import path.
78 """
79 try:
80 record = json.loads(path.read_text(encoding="utf-8"))
81 except OSError, ValueError:
82 return None
83 if not isinstance(record, dict): 83 ↛ 84line 83 didn't jump to line 84 because the condition on line 83 was never true
84 return None
85 pid = record.get("pid")
86 is_alive = _is_pid_alive(pid if isinstance(pid, int) else None)
87 record["is_alive"] = is_alive
88 if record.get("state") == "running" and not is_alive: 88 ↛ 89line 88 didn't jump to line 89 because the condition on line 88 was never true
89 record["state"] = "orphaned"
90 return record
93def _list_records(directory: Path) -> list[dict[str, Any]]:
94 """Return all records newest-first."""
95 if not directory.exists(): 95 ↛ 96line 95 didn't jump to line 96 because the condition on line 95 was never true
96 return []
97 out: list[dict[str, Any]] = []
98 for path in sorted(directory.glob("*.json"), key=lambda p: p.stat().st_mtime, reverse=True):
99 record = _read_status(path)
100 if record is not None: 100 ↛ 98line 100 didn't jump to line 98 because the condition on line 100 was always true
101 out.append(record)
102 return out
105def _format_state(state: str) -> str:
106 """Colour-code state for terminal output. Plain text when not a TTY."""
107 if not sys.stdout.isatty(): 107 ↛ 109line 107 didn't jump to line 109 because the condition on line 107 was always true
108 return state
109 palette = {
110 "running": "\x1b[36mrunning\x1b[0m", # cyan
111 "succeeded": "\x1b[32msucceeded\x1b[0m", # green
112 "failed": "\x1b[31mfailed\x1b[0m", # red
113 "cancelled": "\x1b[33mcancelled\x1b[0m", # yellow
114 "orphaned": "\x1b[35morphaned\x1b[0m", # magenta
115 }
116 return palette.get(state, state)
119def _format_elapsed(seconds: int | None) -> str:
120 """Render an integer second count compactly: ``s`` / ``MmSs`` / ``HhMm``."""
121 if seconds is None: 121 ↛ 122line 121 didn't jump to line 122 because the condition on line 121 was never true
122 return "-"
123 if seconds < 60: 123 ↛ 124line 123 didn't jump to line 124 because the condition on line 123 was never true
124 return f"{seconds}s"
125 minutes, sec = divmod(seconds, 60)
126 if minutes < 60: 126 ↛ 128line 126 didn't jump to line 128 because the condition on line 126 was always true
127 return f"{minutes}m{sec:02d}s"
128 hours, mins = divmod(minutes, 60)
129 return f"{hours}h{mins:02d}m"
132@click.group()
133def tasks() -> None:
134 """Inspect long-running MCP / CLI task status.
136 Commands like ``gco stacks deploy-all`` and ``gco images push`` write
137 progress records and raw subprocess logs to ``~/.gco/tasks/`` so you
138 can observe them without parsing terminal scrollback. ``gco tasks
139 list/show/tail`` read those files; the writer is in the MCP tool
140 runner.
141 """
144@tasks.command("list")
145@click.option(
146 "-n",
147 "--limit",
148 type=int,
149 default=20,
150 show_default=True,
151 help="Maximum records to display (newest first).",
152)
153@click.option(
154 "--json",
155 "as_json",
156 is_flag=True,
157 help="Emit raw JSON instead of the table.",
158)
159def tasks_list(limit: int, as_json: bool) -> None:
160 """List recent task invocations newest-first.
162 Shows tool, state (with orphan rewriting for dead PIDs), elapsed
163 wall-clock, stacks completed, and the last stack name observed.
164 Pass ``--json`` for machine-readable output you can pipe to ``jq``.
165 """
166 records = _list_records(_status_dir())[:limit] if limit > 0 else _list_records(_status_dir())
168 if as_json:
169 click.echo(json.dumps({"tasks": records}, indent=2, sort_keys=True))
170 return
172 if not records:
173 click.echo(
174 "No tasks recorded yet. Run a long-running command (e.g. 'gco stacks deploy-all') to populate ~/.gco/tasks/."
175 )
176 return
178 header = (
179 f"{'TASK ID':<40} {'TOOL':<18} {'STATE':<11} {'ELAPSED':<8} {'STACKS':<10} LAST STACK"
180 )
181 click.echo(header)
182 click.echo("-" * len(header))
183 for r in records:
184 task_id = (r.get("task_id") or "")[:40]
185 tool = (r.get("tool") or "")[:18]
186 state = _format_state(r.get("state") or "?")
187 elapsed = _format_elapsed(r.get("elapsed_seconds"))
188 stacks_completed = r.get("stacks_completed") or 0
189 stacks_total = r.get("stacks_total")
190 stacks = f"{stacks_completed}/{stacks_total}" if stacks_total else f"{stacks_completed}"
191 last_stack = r.get("last_stack") or "-"
192 # State string may include ANSI codes — pad on the visible width.
193 visible_state = r.get("state") or "?"
194 state_pad = " " * max(0, 11 - len(visible_state))
195 click.echo(
196 f"{task_id:<40} {tool:<18} {state}{state_pad} {elapsed:<8} {stacks:<10} {last_stack}"
197 )
200@tasks.command("show")
201@click.argument("task_id")
202def tasks_show(task_id: str) -> None:
203 """Print the full JSON record for one task.
205 Useful when ``gco tasks list`` shows a task that needs deeper
206 inspection — argv, exit code, stderr tail, etc.
207 """
208 path = _status_dir() / f"{task_id}.json"
209 record = _read_status(path)
210 if record is None:
211 click.echo(f"Task not found: {task_id}", err=True)
212 sys.exit(1)
213 click.echo(json.dumps(record, indent=2, sort_keys=True))
216@tasks.command("tail")
217@click.argument("task_id")
218@click.option(
219 "-n",
220 "--lines",
221 type=int,
222 default=100,
223 show_default=True,
224 help="Lines to show.",
225)
226@click.option(
227 "-f",
228 "--follow",
229 is_flag=True,
230 help="Follow the log file (poll for new lines, like 'tail -f').",
231)
232def tasks_tail(task_id: str, lines: int, follow: bool) -> None:
233 """Print the last N lines of a task's raw output log.
235 Each line is prefixed with ``[stdout]`` or ``[stderr]`` so you can
236 tell which stream produced it. ``--follow`` keeps polling the file
237 until interrupted, mirroring ``tail -f``.
238 """
239 log_path = _status_dir() / f"{task_id}.log"
240 if not log_path.exists():
241 click.echo(f"No log for task: {task_id}", err=True)
242 sys.exit(1)
244 # Initial tail.
245 from collections import deque
247 try:
248 with open(log_path, encoding="utf-8", errors="replace") as fp:
249 buf: deque[str] = deque(fp, maxlen=lines if lines > 0 else 0)
250 except OSError as e:
251 click.echo(f"Failed to read log: {e}", err=True)
252 sys.exit(1)
254 for line in buf:
255 click.echo(line.rstrip("\n"))
257 if not follow: 257 ↛ 261line 257 didn't jump to line 261 because the condition on line 257 was always true
258 return
260 # Follow mode: poll the file every 500ms.
261 try:
262 with open(log_path, encoding="utf-8", errors="replace") as fp:
263 fp.seek(0, 2) # end of file
264 while True:
265 chunk = fp.read()
266 if chunk:
267 click.echo(chunk, nl=False)
268 else:
269 # Stop following once the task is no longer running.
270 record = _read_status(_status_dir() / f"{task_id}.json")
271 if record is not None and record.get("state") not in {"running"}:
272 return
273 time.sleep(0.5)
274 except KeyboardInterrupt:
275 return
278@tasks.command("prune")
279@click.option(
280 "-k",
281 "--keep",
282 type=int,
283 default=50,
284 show_default=True,
285 help="Keep the N most recent tasks. Older are deleted.",
286)
287@click.option("-y", "--yes", is_flag=True, help="Skip confirmation.")
288def tasks_prune(keep: int, yes: bool) -> None:
289 """Delete old task records, keeping the most recent N.
291 Useful if ``~/.gco/tasks/`` has accumulated stale records and you
292 want a manual sweep. The MCP runner also auto-prunes on every new
293 task start, so this is purely for ad-hoc cleanup.
294 """
295 directory = _status_dir()
296 if not directory.exists(): 296 ↛ 297line 296 didn't jump to line 297 because the condition on line 296 was never true
297 click.echo("No task directory yet — nothing to prune.")
298 return
300 json_files = sorted(directory.glob("*.json"), key=lambda p: p.stat().st_mtime, reverse=True)
301 stale = json_files[keep:]
302 if not stale: 302 ↛ 303line 302 didn't jump to line 303 because the condition on line 302 was never true
303 click.echo(f"Already at or below {keep} task(s). Nothing to do.")
304 return
306 if not yes: 306 ↛ 307line 306 didn't jump to line 307 because the condition on line 306 was never true
307 click.confirm(
308 f"Delete {len(stale)} task record(s) older than the {keep} most recent?",
309 abort=True,
310 )
312 removed = 0
313 for path in stale:
314 with contextlib.suppress(OSError):
315 path.unlink()
316 removed += 1
317 log_path = path.with_suffix(".log")
318 if log_path.exists(): 318 ↛ 319line 318 didn't jump to line 319 because the condition on line 318 was never true
319 with contextlib.suppress(OSError):
320 log_path.unlink()
322 click.echo(f"Removed {removed} task record(s).")