Coverage for cli/commands/tasks_cmd.py: 72%

158 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-15 15:07 +0000

1"""Long-running task observability commands. 

2 

3Mirrors the ``task_status`` / ``task_tail`` MCP tools so operators can 

4inspect the same on-disk status records from a terminal: 

5 

6* ``gco tasks list`` — newest-first table of recent invocations 

7* ``gco tasks show TASK_ID`` — full record for one task 

8* ``gco tasks tail TASK_ID -n 100 [-f]`` — last N lines of raw output, 

9 with ``-f`` polling like ``tail -f`` 

10* ``gco tasks prune`` — drop all but the most recent N records 

11 

12Long-running MCP tools (``deploy_all``, ``destroy_all``, 

13``bootstrap_cdk``, ``deploy_stack``, ``destroy_stack``, 

14``images_build``, ``images_push``) record progress to 

15``~/.gco/tasks/{task_id}.json`` and the raw subprocess output to 

16``~/.gco/tasks/{task_id}.log`` on every line. This module reads 

17those artifacts and never writes them (``gco tasks prune`` only deletes old ones) — the writer lives in 

18``mcp/tools/_task_status.py``. 

19""" 

20 

21import contextlib 

22import json 

23import sys 

24import time 

25from pathlib import Path 

26from typing import Any 

27 

28import click 

29 

30 

def _status_dir() -> Path:
    """Resolve the directory that holds task status records and logs.

    A non-empty ``GCO_TASK_STATUS_DIR`` environment variable takes
    precedence (used by tests); otherwise ``~/.gco/tasks`` is returned.
    This deliberately duplicates ``mcp.tools._task_status.status_dir``
    instead of importing it: ``cli/`` and ``mcp/`` are separate top-level
    packages and this command must work without the MCP install.
    """
    import os

    # An empty override is treated the same as an unset one.
    return Path(os.environ.get("GCO_TASK_STATUS_DIR") or Path.home() / ".gco" / "tasks")

45 

46 

47def _is_pid_alive(pid: int | None) -> bool: 

48 """Best-effort liveness check via ``os.kill(pid, 0)``. 

49 

50 Returns ``False`` when the PID is missing/zero or the OS reports 

51 the process gone. Returns ``True`` for live processes including 

52 those owned by other users (``PermissionError``). Anything else is 

53 treated as not-alive so an unexpected ``OSError`` can't strand a 

54 task in ``running`` forever. 

55 """ 

56 import os 

57 

58 if pid is None or pid <= 0: 58 ↛ 59line 58 didn't jump to line 59 because the condition on line 58 was never true

59 return False 

60 try: 

61 os.kill(pid, 0) 

62 return True 

63 except ProcessLookupError: 

64 return False 

65 except PermissionError: 

66 return True 

67 except OSError: 

68 return False 

69 

70 

71def _read_status(path: Path) -> dict[str, Any] | None: 

72 """Load one status JSON, applying the orphan rewrite. 

73 

74 Identical semantics to ``mcp.tools._task_status._read_status_file``: 

75 re-checks the PID and rewrites ``state=running`` to ``orphaned`` 

76 when the recorded process is dead. Kept as a local copy so the 

77 CLI doesn't need ``mcp/`` on the import path. 

78 """ 

79 try: 

80 record = json.loads(path.read_text(encoding="utf-8")) 

81 except OSError, ValueError: 

82 return None 

83 if not isinstance(record, dict): 83 ↛ 84line 83 didn't jump to line 84 because the condition on line 83 was never true

84 return None 

85 pid = record.get("pid") 

86 is_alive = _is_pid_alive(pid if isinstance(pid, int) else None) 

87 record["is_alive"] = is_alive 

88 if record.get("state") == "running" and not is_alive: 88 ↛ 89line 88 didn't jump to line 89 because the condition on line 88 was never true

89 record["state"] = "orphaned" 

90 return record 

91 

92 

def _list_records(directory: Path) -> list[dict[str, Any]]:
    """Return all readable task records in *directory*, newest-first.

    Records are ordered by the status file's modification time. A file
    that disappears between ``glob()`` and ``stat()`` (the MCP runner
    prunes old records when a new task starts) is skipped instead of
    raising. Malformed records are dropped because ``_read_status``
    returns ``None`` for them.

    Args:
        directory: Status directory, normally ``_status_dir()``.

    Returns:
        Decoded records, newest first; ``[]`` when the directory is absent.
    """
    if not directory.exists():
        return []
    stamped: list[tuple[float, Path]] = []
    for path in directory.glob("*.json"):
        try:
            stamped.append((path.stat().st_mtime, path))
        except OSError:
            continue
    # Sort on the timestamp only (stable), never comparing Path objects.
    stamped.sort(key=lambda item: item[0], reverse=True)
    out: list[dict[str, Any]] = []
    for _mtime, path in stamped:
        record = _read_status(path)
        if record is not None:
            out.append(record)
    return out

103 

104 

def _format_state(state: str) -> str:
    """Wrap a known *state* in an ANSI colour escape when stdout is a TTY.

    Output that is piped or redirected, and any state name without a
    colour assigned, is returned unchanged.
    """
    if sys.stdout.isatty():
        sgr_codes = {
            "running": 36,  # cyan
            "succeeded": 32,  # green
            "failed": 31,  # red
            "cancelled": 33,  # yellow
            "orphaned": 35,  # magenta
        }
        code = sgr_codes.get(state)
        if code is not None:
            return f"\x1b[{code}m{state}\x1b[0m"
    return state

117 

118 

119def _format_elapsed(seconds: int | None) -> str: 

120 """Render an integer second count compactly: ``s`` / ``MmSs`` / ``HhMm``.""" 

121 if seconds is None: 121 ↛ 122line 121 didn't jump to line 122 because the condition on line 121 was never true

122 return "-" 

123 if seconds < 60: 123 ↛ 124line 123 didn't jump to line 124 because the condition on line 123 was never true

124 return f"{seconds}s" 

125 minutes, sec = divmod(seconds, 60) 

126 if minutes < 60: 126 ↛ 128line 126 didn't jump to line 128 because the condition on line 126 was always true

127 return f"{minutes}m{sec:02d}s" 

128 hours, mins = divmod(minutes, 60) 

129 return f"{hours}h{mins:02d}m" 

130 

131 

@click.group()
def tasks() -> None:
    """Inspect long-running MCP / CLI task status.

    Commands like ``gco stacks deploy-all`` and ``gco images push`` write
    progress records and raw subprocess logs to ``~/.gco/tasks/`` so you
    can observe them without parsing terminal scrollback. ``gco tasks
    list/show/tail`` read those files and ``gco tasks prune`` deletes old
    ones; the writer is in the MCP tool runner.
    """
    # No group-level setup: each subcommand registered via
    # ``@tasks.command(...)`` resolves the status directory on its own.

142 

143 

@tasks.command("list")
@click.option("-n", "--limit", type=int, default=20, show_default=True, help="Maximum records to display (newest first).")
@click.option("--json", "as_json", is_flag=True, help="Emit raw JSON instead of the table.")
def tasks_list(limit: int, as_json: bool) -> None:
    """List recent task invocations newest-first.

    Shows tool, state (with orphan rewriting for dead PIDs), elapsed
    wall-clock, stacks completed, and the last stack name observed.
    Pass ``--json`` for machine-readable output you can pipe to ``jq``.
    """
    # A non-positive --limit means "no limit".
    every_record = _list_records(_status_dir())
    records = every_record[:limit] if limit > 0 else every_record

    if as_json:
        click.echo(json.dumps({"tasks": records}, indent=2, sort_keys=True))
        return

    if not records:
        click.echo(
            "No tasks recorded yet. Run a long-running command (e.g. 'gco stacks deploy-all') to populate ~/.gco/tasks/."
        )
        return

    title_row = f"{'TASK ID':<40} {'TOOL':<18} {'STATE':<11} {'ELAPSED':<8} {'STACKS':<10} LAST STACK"
    click.echo(title_row)
    click.echo("-" * len(title_row))

    for rec in records:
        ident = (rec.get("task_id") or "")[:40]
        tool_name = (rec.get("tool") or "")[:18]
        raw_state = rec.get("state") or "?"
        shown_state = _format_state(raw_state)
        elapsed_text = _format_elapsed(rec.get("elapsed_seconds"))
        done = rec.get("stacks_completed") or 0
        total = rec.get("stacks_total")
        if total:
            progress = f"{done}/{total}"
        else:
            progress = f"{done}"
        last_seen = rec.get("last_stack") or "-"
        # ANSI escapes take no columns, so pad by the uncoloured state's width.
        padding = " " * max(0, 11 - len(raw_state))
        click.echo(
            f"{ident:<40} {tool_name:<18} {shown_state}{padding} {elapsed_text:<8} {progress:<10} {last_seen}"
        )

198 

199 

@tasks.command("show")
@click.argument("task_id")
def tasks_show(task_id: str) -> None:
    """Print the full JSON record for one task.

    Useful when ``gco tasks list`` shows a task that needs deeper
    inspection — argv, exit code, stderr tail, etc.
    """
    # Goes through _read_status so the orphan rewrite matches ``list``.
    record = _read_status(_status_dir() / f"{task_id}.json")
    if record is not None:
        click.echo(json.dumps(record, indent=2, sort_keys=True))
        return
    click.echo(f"Task not found: {task_id}", err=True)
    sys.exit(1)

214 

215 

@tasks.command("tail")
@click.argument("task_id")
@click.option(
    "-n",
    "--lines",
    type=int,
    default=100,
    show_default=True,
    help="Lines to show.",
)
@click.option(
    "-f",
    "--follow",
    is_flag=True,
    help="Follow the log file (poll for new lines, like 'tail -f').",
)
def tasks_tail(task_id: str, lines: int, follow: bool) -> None:
    """Print the last N lines of a task's raw output log.

    Each line is prefixed with ``[stdout]`` or ``[stderr]`` so you can
    tell which stream produced it. ``--follow`` keeps polling the file
    until interrupted, mirroring ``tail -f``.
    """
    # One handle serves both the initial tail and --follow, so output
    # appended between the two phases is not skipped. A separate open
    # followed by a seek to EOF would lose it.
    from collections import deque

    status_dir = _status_dir()
    log_path = status_dir / f"{task_id}.log"
    if not log_path.exists():
        click.echo(f"No log for task: {task_id}", err=True)
        sys.exit(1)

    with contextlib.ExitStack() as stack:
        try:
            fp = stack.enter_context(open(log_path, encoding="utf-8", errors="replace"))
            # A bounded deque keeps only the last ``lines`` lines; a
            # non-positive count prints nothing.
            buf: deque[str] = deque(fp, maxlen=max(lines, 0))
        except OSError as e:
            click.echo(f"Failed to read log: {e}", err=True)
            sys.exit(1)

        for line in buf:
            click.echo(line.rstrip("\n"))

        if not follow:
            return

        # Follow mode: continue from where the initial read stopped and
        # poll every 500ms.
        status_path = status_dir / f"{task_id}.json"
        try:
            while True:
                chunk = fp.read()
                if chunk:
                    click.echo(chunk, nl=False)
                    continue
                # Stop following once the task is no longer running.
                record = _read_status(status_path)
                if record is not None and record.get("state") != "running":
                    # Drain output written between the last empty read
                    # and the state change before exiting.
                    final = fp.read()
                    if final:
                        click.echo(final, nl=False)
                    return
                time.sleep(0.5)
        except KeyboardInterrupt:
            return

276 

277 

@tasks.command("prune")
@click.option(
    "-k",
    "--keep",
    type=click.IntRange(min=0),
    default=50,
    show_default=True,
    help="Keep the N most recent tasks. Older are deleted.",
)
@click.option("-y", "--yes", is_flag=True, help="Skip confirmation.")
def tasks_prune(keep: int, yes: bool) -> None:
    """Delete old task records, keeping the most recent N.

    Useful if ``~/.gco/tasks/`` has accumulated stale records and you
    want a manual sweep. The MCP runner also auto-prunes on every new
    task start, so this is purely for ad-hoc cleanup.
    """
    # --keep is range-checked above. A negative value would otherwise slice
    # from the end and silently delete the oldest |N| records.
    directory = _status_dir()
    if not directory.exists():
        click.echo("No task directory yet — nothing to prune.")
        return

    # Newest-first by mtime. A record can disappear between glob() and
    # stat() because the MCP runner prunes too; skip it rather than crash.
    stamped: list[tuple[float, Path]] = []
    for candidate in directory.glob("*.json"):
        try:
            stamped.append((candidate.stat().st_mtime, candidate))
        except OSError:
            continue
    stamped.sort(key=lambda item: item[0], reverse=True)
    stale = [path for _mtime, path in stamped[keep:]]
    if not stale:
        click.echo(f"Already at or below {keep} task(s). Nothing to do.")
        return

    if not yes:
        click.confirm(
            f"Delete {len(stale)} task record(s) older than the {keep} most recent?",
            abort=True,
        )

    removed = 0
    for path in stale:
        # Count only records this call actually deleted.
        with contextlib.suppress(OSError):
            path.unlink()
            removed += 1
        # The raw output log shares the record's stem. missing_ok avoids
        # an exists()/unlink() race.
        with contextlib.suppress(OSError):
            path.with_suffix(".log").unlink(missing_ok=True)

    click.echo(f"Removed {removed} task record(s).")