Coverage for mcp/mission/audit.py: 95%

108 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-15 15:07 +0000

1"""Mission-specific audit emitters. 

2 

Four thin wrappers over the existing ``audit_logger`` from ``mcp/audit.py``.
Each emitter builds a structured dict matching the Mission audit-fields
schema and writes it as a single ``audit_logger.info(json.dumps(entry))``
call — the same pattern that ``_build_audit_entry`` and ``emit_startup_log``
use in ``mcp/audit.py``.

8 

Four event types, one helper each:

* ``emit_phase_event`` — one per phase (propose / execute / observe /
  evaluate / decide), regardless of success or failure. Carries the
  per-phase ``started_at`` / ``ended_at`` timestamps that the engine
  measures around the phase body, plus an ``error_message`` field on
  ``failed`` events.
* ``emit_verdict_event`` — one per Decide_Phase outcome. Carries the
  ``verdict`` label and ``verdict_reason``, plus a ``revision_rationale``
  field whenever the caller supplies one (the engine populates it for the
  ``adjust`` verdict).
* ``emit_sampling_event`` — one per sampling call (whether the call was
  used, rejected, fell back, was unavailable, or was disabled). The second
  positional argument is ``iteration_index_or_purpose`` so callers can
  pass either an integer iteration index (during the loop) or a string /
  ``None`` for sampling that happens outside any single iteration (e.g.
  Final_Report fill-in).
* ``emit_script_call_event`` — one per operator-allowlisted tool call made
  from inside a Mission script. Carries ``tool_name``, ``tool_status``,
  ``duration_ms`` and ``via_script=True``, plus a truncated
  ``error_message`` when one is supplied.

25 

26Every entry carries a fresh ``timestamp`` set to ``datetime.now(UTC).isoformat()`` 

27at emit time — the supplied phase ``started_at`` / ``ended_at`` are recorded 

28in their own dedicated fields and are not used as the entry timestamp. 

29 

30The Mission package is loaded via ``sys.path``-on-``mcp/`` (the same trick 

31``run_mcp.py`` uses for the rest of the MCP modules), so ``audit_logger`` is 

32imported as ``from audit import audit_logger`` — matching every 

33``mcp/tools/*.py`` module. 

34 

35In-process audit ring buffer 

36============================ 

Mission also installs a bounded in-process collector
(``MissionAuditCollectorHandler``) on the shared ``gco.mcp.audit``
logger so the ``mission://sessions/{session_id}/audit-replay``
resource has a source of Mission entries (phase, verdict, sampling and
script-call events) to feed :func:`replay_audit_entries`. The buffer is
capped at 5000 entries with FIFO eviction so a long-running process
cannot exhaust memory through the audit channel. The cap is shared by
every session in the process: each iteration adds five phase events and
one verdict event (plus any sampling / script-call events), so a single
session of roughly 800 iterations — fewer when other sessions run
concurrently — begins evicting its own earliest entries, and its replay
then starts mid-stream.

46""" 

47 

48from __future__ import annotations 

49 

50import json 

51import logging 

52from collections import deque 

53from datetime import UTC, datetime 

54from typing import Any, Literal 

55 

56from audit import audit_logger 

57 

58# --------------------------------------------------------------------------- 

59# Event-type tags 

60# --------------------------------------------------------------------------- 

61 

# Stable ``event_type`` strings so audit consumers can filter Mission events
# without parsing the rest of the entry. The values are part of the public
# audit contract — tests, dashboards, and the reconstruction test in
# ``tests/test_mission_audit.py`` match on these literals.
# Each tag is written by exactly one public emitter in this module (named
# in the trailing comment), and ``MissionAuditCollectorHandler`` keeps only
# entries carrying one of these four values.
EVENT_TYPE_PHASE = "mission_phase_event"  # emit_phase_event
EVENT_TYPE_VERDICT = "mission_verdict_event"  # emit_verdict_event
EVENT_TYPE_SAMPLING = "mission_sampling_event"  # emit_sampling_event
EVENT_TYPE_SCRIPT_CALL = "mission_script_call_event"  # emit_script_call_event

70 

71 

72# --------------------------------------------------------------------------- 

73# Helpers 

74# --------------------------------------------------------------------------- 

75 

76 

77def _now_iso() -> str: 

78 """Return the current UTC time as an ISO 8601 string. 

79 

80 Centralised so every Mission emitter records ``timestamp`` in the same 

81 format. Matches the pattern used by ``_build_audit_entry`` and 

82 ``emit_startup_log`` in ``mcp/audit.py``. 

83 """ 

84 return datetime.now(UTC).isoformat() 

85 

86 

def _emit(entry: dict[str, Any]) -> None:
    """Serialise ``entry`` to JSON and log it on the shared audit logger.

    Every public Mission emitter (phase, verdict, sampling and script-call)
    funnels through here, so they all share one log path — a future swap to
    a structured handler only needs to change this site. The entry is
    written as a single ``audit_logger.info`` message whose text is the
    JSON document, which is the form ``MissionAuditCollectorHandler``
    parses back with ``json.loads(record.getMessage())``.

    Args:
        entry: Audit fields to serialise. ``json.dumps`` raises
            ``TypeError`` if any value is not JSON-serialisable.
    """
    audit_logger.info(json.dumps(entry))

94 

95 

96# --------------------------------------------------------------------------- 

97# Public emitters 

98# --------------------------------------------------------------------------- 

99 

100 

def emit_phase_event(
    session_id: str,
    iteration_index: int,
    phase: Literal["propose", "execute", "observe", "evaluate", "decide"],
    status: Literal["succeeded", "failed"],
    started_at: str,
    ended_at: str,
    error_message: str | None = None,
) -> None:
    """Write one ``mission_phase_event`` entry to the audit log.

    ``MissionEngine`` calls this once per phase from a try/finally block,
    so a failed phase is still logged (``phase_status="failed"`` plus the
    exception's message).

    Args:
        session_id: Recorded as ``mission_session_id``.
        iteration_index: Loop iteration the phase belongs to.
        phase: Phase name, recorded as ``phase``.
        status: Recorded as ``phase_status``.
        started_at: When the phase body began; recorded verbatim as
            ``phase_started_at``.
        ended_at: When the phase body finished; recorded verbatim as
            ``phase_ended_at``.
        error_message: Optional failure text. Omitted when ``None`` or
            empty; otherwise only its first 200 characters are kept.

    The entry's own ``timestamp`` is sampled at emit time via
    ``_now_iso()`` and is independent of ``started_at`` / ``ended_at``, so
    the log keeps a faithful emission ordering.
    """
    record: dict[str, Any] = dict(
        event_type=EVENT_TYPE_PHASE,
        mission_session_id=session_id,
        iteration_index=iteration_index,
        phase=phase,
        phase_status=status,
        phase_started_at=started_at,
        phase_ended_at=ended_at,
        timestamp=_now_iso(),
    )
    # Clip to 200 characters — the same cap ``_build_audit_entry`` applies
    # to its own ``error`` field — so a long traceback summary cannot blow
    # up the log line.
    if error_message:
        record["error_message"] = error_message[:200]
    _emit(record)

137 

138 

def emit_verdict_event(
    session_id: str,
    iteration_index: int,
    verdict: str,
    verdict_reason: str,
    revision_rationale: str | None = None,
) -> None:
    """Write one ``mission_verdict_event`` entry to the audit log.

    ``MissionEngine`` calls this once per Decide_Phase outcome. This helper
    is a pure formatter: it does not inspect ``verdict``. The engine decides
    when a ``revision_rationale`` applies (it is meaningful for the
    ``adjust`` verdict), and the field is written whenever a non-empty
    string is passed.

    Args:
        session_id: Recorded as ``mission_session_id``.
        iteration_index: Iteration the verdict closes.
        verdict: Verdict label.
        verdict_reason: Human-readable justification for the verdict.
        revision_rationale: Optional; omitted when ``None`` or empty.
    """
    payload: dict[str, Any] = dict(
        event_type=EVENT_TYPE_VERDICT,
        mission_session_id=session_id,
        iteration_index=iteration_index,
        verdict=verdict,
        verdict_reason=verdict_reason,
        timestamp=_now_iso(),
    )
    if revision_rationale:
        payload["revision_rationale"] = revision_rationale
    _emit(payload)

166 

167 

def emit_sampling_event(
    session_id: str,
    iteration_index_or_purpose: int | str | None,
    sampling_purpose: str,
    sampling_status: str,
    sampling_backend: str,
    sampling_model_id: str | None = None,
    model_output_bytes: int | None = None,
    validation_error: str | None = None,
    input_tokens: int | None = None,
    output_tokens: int | None = None,
) -> None:
    """Write one ``mission_sampling_event`` entry to the audit log.

    The second positional argument says where the sampling call happened,
    and is routed to a field by type:

    * ``int`` (but not ``bool``) → ``iteration_index``, for calls made
      inside an iteration of the loop.
    * non-empty ``str`` → ``sampling_context``, an out-of-loop label such
      as the final-report ``lessons`` fill-in.
    * ``None``, ``""`` or a ``bool`` → neither field is written.

    Optional fields are written only when they carry information, so
    consumers never see empty / null placeholders:

    * ``sampling_model_id`` and ``validation_error`` when truthy (the
      latter clipped to 200 characters). Typically ``sampling_model_id``
      accompanies ``sampling_status="used"`` and ``validation_error``
      accompanies ``"rejected"``.
    * ``model_output_bytes``, ``input_tokens`` and ``output_tokens``
      whenever they are not ``None`` (``0`` is kept).
    """
    record: dict[str, Any] = dict(
        event_type=EVENT_TYPE_SAMPLING,
        mission_session_id=session_id,
        sampling_purpose=sampling_purpose,
        sampling_status=sampling_status,
        sampling_backend=sampling_backend,
        timestamp=_now_iso(),
    )

    where = iteration_index_or_purpose
    # ``bool`` subclasses ``int``; exclude it so ``True`` / ``False`` are
    # never recorded as an ``iteration_index``.
    is_index = isinstance(where, int) and not isinstance(where, bool)
    if is_index:
        record["iteration_index"] = where
    elif isinstance(where, str) and where:
        record["sampling_context"] = where

    if sampling_model_id:
        record["sampling_model_id"] = sampling_model_id
    if model_output_bytes is not None:
        record["model_output_bytes"] = model_output_bytes
    if validation_error:
        record["validation_error"] = validation_error[:200]
    for key, count in (("input_tokens", input_tokens), ("output_tokens", output_tokens)):
        if count is not None:
            record[key] = count

    _emit(record)

233 

234 

def emit_script_call_event(
    session_id: str,
    iteration_index: int,
    tool_name: str,
    status: str,
    duration_ms: int,
    error_message: str | None = None,
) -> None:
    """Write one ``mission_script_call_event`` entry to the audit log.

    The in-script tool wrapper calls this after each invocation of an
    operator-allowlisted tool from inside a Mission script. The tool call
    itself already produced its own ``@audit_logged`` row; this one is a
    second, distinct row flagged ``via_script=True`` so consumers can tell
    script-driven calls apart from a direct ``tool_calls`` strategy.

    Args:
        session_id: Recorded as ``mission_session_id``.
        iteration_index: Iteration whose script made the call.
        tool_name: Name of the invoked tool.
        status: Terminal state (``ok`` / ``failed`` /
            ``skipped_not_allowed``), recorded as ``tool_status``.
        duration_ms: Per-call timing, mirroring the wrapper's own
            ``script_call_log`` entries.
        error_message: Optional; omitted when ``None`` or empty, otherwise
            clipped to 200 characters like the other Mission emitters.
    """
    row: dict[str, Any] = {"event_type": EVENT_TYPE_SCRIPT_CALL, "via_script": True}
    row.update(
        mission_session_id=session_id,
        iteration_index=iteration_index,
        tool_name=tool_name,
        tool_status=status,
        duration_ms=duration_ms,
        timestamp=_now_iso(),
    )
    if error_message:
        row["error_message"] = error_message[:200]
    _emit(row)

274 

275 

# Public API of this module: the four event-type tags, the four emitters,
# and the in-process collector helpers (the handler class, its installer,
# its accessor and the replay walker), which are defined below this list.
__all__ = [
    "EVENT_TYPE_PHASE",
    "EVENT_TYPE_SAMPLING",
    "EVENT_TYPE_SCRIPT_CALL",
    "EVENT_TYPE_VERDICT",
    "MissionAuditCollectorHandler",
    "emit_phase_event",
    "emit_sampling_event",
    "emit_script_call_event",
    "emit_verdict_event",
    "get_collector",
    "install_collector",
    "replay_audit_entries",
]

290 

291 

292# --------------------------------------------------------------------------- 

293# Audit-replay helper 

294# --------------------------------------------------------------------------- 

295 

296 

# Default cap on the in-process collector ring buffer. The cap exists so a
# long-running process cannot grow the audit buffer without bound. It is a
# process-wide budget, not a per-session guarantee: the single buffer is
# shared by every Mission session and also holds sampling and script-call
# events. Each iteration adds five phase events plus one verdict event
# (more with sampling / script calls), so a session of roughly 800
# iterations — fewer when other sessions are active — starts losing its
# earliest entries to FIFO eviction.
_DEFAULT_COLLECTOR_CAPACITY = 5000

302 

303 

class MissionAuditCollectorHandler(logging.Handler):
    """Bounded ring-buffer logging handler that captures Mission audit JSON.

    :func:`install_collector` attaches it to the shared audit logger so the
    ``mission://sessions/{session_id}/audit-replay`` resource has a source
    of entries to feed :func:`replay_audit_entries`. Only records whose
    message parses as a JSON object carrying one of the four Mission
    ``event_type`` values (phase, verdict, sampling, script-call) are kept.
    Other emitters on the same logger (e.g. the standard MCP
    tool-invocation decorator or the startup-log helper) therefore do not
    pollute the buffer.

    Storage is a :class:`collections.deque` with ``maxlen=capacity``. The
    one buffer is shared by every session that logs through the logger.
    Once it holds ``capacity`` entries, each new entry evicts the oldest
    one, regardless of which session that entry belongs to. To use a
    different window, construct the handler with ``capacity=`` or pass it
    to the first :func:`install_collector` call.

    Args:
        capacity: Maximum number of entries retained.
    """

    # ``event_type`` values this handler keeps; everything else is ignored.
    _MISSION_EVENT_TYPES = frozenset(
        {
            EVENT_TYPE_PHASE,
            EVENT_TYPE_VERDICT,
            EVENT_TYPE_SAMPLING,
            EVENT_TYPE_SCRIPT_CALL,
        }
    )

    def __init__(self, capacity: int = _DEFAULT_COLLECTOR_CAPACITY) -> None:
        # Handler level INFO matches the ``audit_logger.info(...)`` calls the
        # Mission emitters make.
        super().__init__(level=logging.INFO)
        self._buffer: deque[dict[str, Any]] = deque(maxlen=capacity)

    def emit(self, record: logging.LogRecord) -> None:
        """Append ``record``'s JSON payload if it is a Mission audit entry.

        A message is dropped without error when it:

        * is not valid JSON;
        * decodes to something other than a JSON object;
        * has an ``event_type`` that is not one of the Mission tags,
          including non-string values.
        """
        try:
            payload = json.loads(record.getMessage())
        except (TypeError, ValueError):
            # Parenthesised: the bare ``except A, B:`` form is only valid from
            # Python 3.14 (PEP 758) and is a SyntaxError on earlier versions.
            # ValueError covers json.JSONDecodeError; TypeError covers a
            # %-format / args mismatch raised by getMessage().
            return
        if not isinstance(payload, dict):
            return
        event_type = payload.get("event_type")
        # Check the type first: an unhashable JSON value (list / object) would
        # make the frozenset membership test raise TypeError out of emit() and
        # into the unrelated caller's logging call.
        if not isinstance(event_type, str) or event_type not in self._MISSION_EVENT_TYPES:
            return
        self._buffer.append(payload)

    def entries_for(self, session_id: str) -> list[dict[str, Any]]:
        """Return shallow copies of the buffered entries for ``session_id``.

        Entries come back in capture order. Each dict is copied so callers
        can mutate the result without touching the buffer.
        """
        return [dict(e) for e in list(self._buffer) if e.get("mission_session_id") == session_id]

    def clear(self) -> None:
        """Drop every captured entry. Useful for test isolation."""
        self._buffer.clear()

354 

355 

# Module-level collector singleton. ``None`` until :func:`install_collector`
# creates it; later calls to that function reuse this instance, and
# :func:`get_collector` returns it.
# NOTE(review): the resources/__init__.py wiring is expected to install it
# once at import time, so every Mission entry emitted during the process
# lifetime reaches the audit-replay resource. That wiring lives outside this
# module — confirm there.
_COLLECTOR: MissionAuditCollectorHandler | None = None

361 

362 

def install_collector(
    capacity: int = _DEFAULT_COLLECTOR_CAPACITY,
) -> MissionAuditCollectorHandler:
    """Attach the process-wide :class:`MissionAuditCollectorHandler` to the audit logger.

    The first call creates the collector; every later call returns that
    same instance. ``capacity`` therefore only takes effect on the first
    call and is ignored afterwards.

    The handler is (re-)attached on every call. ``Logger.addHandler``
    skips a handler that is already attached, so this stays idempotent,
    while a test fixture that detached the collector with
    ``audit_logger.removeHandler(...)`` can restore it by calling this
    again. Use ``get_collector().clear()`` to drop captured entries between
    cases.

    Logger level floor. Python's stdlib ``logging`` defaults the root
    threshold to ``WARNING``. An unconfigured host would therefore
    silently drop every ``audit_logger.info(...)`` call before it reaches
    any handler, including this collector. The
    ``mission://sessions/{id}/audit-replay`` resource needs entries to flow
    regardless of the host's logging setup.

    So if the logger's *effective* threshold is above ``INFO``, it is
    lowered to ``INFO``. That covers both the unconfigured case and an
    explicitly configured ``WARNING``. The threshold is never raised: a
    logger left at ``NOTSET`` whose ancestors are at ``DEBUG`` keeps
    inheriting ``DEBUG``.

    Args:
        capacity: Ring-buffer size used only when the collector is created.

    Returns:
        The installed collector.
    """
    global _COLLECTOR
    if _COLLECTOR is None:
        _COLLECTOR = MissionAuditCollectorHandler(capacity=capacity)
    # No-op when already attached; re-attaches after an external removeHandler.
    audit_logger.addHandler(_COLLECTOR)
    # Compare the effective level, not ``.level``: a NOTSET logger inherits
    # its threshold from its ancestors, and forcing INFO on it would raise a
    # DEBUG threshold the host configured on the root.
    if audit_logger.getEffectiveLevel() > logging.INFO:
        audit_logger.setLevel(logging.INFO)
    return _COLLECTOR

395 

396 

def get_collector() -> MissionAuditCollectorHandler | None:
    """Look up the process-wide audit collector.

    Returns:
        The handler created by :func:`install_collector`, or ``None`` when
        no collector has been installed in this process yet.
    """
    collector = _COLLECTOR
    return collector

400 

401 

402def replay_audit_entries( 

403 session_id: str, 

404 entries: list[dict[str, Any]], 

405) -> list[dict[str, Any]]: 

406 """Reconstruct iteration history from a stream of Mission audit entries. 

407 

408 Pure function. Each iteration produces five 

409 ``mission_phase_event`` entries (one per phase) plus one 

410 ``mission_verdict_event`` entry that closes it out, in emission 

411 order. This walker filters ``entries`` to the events whose 

412 ``mission_session_id`` matches ``session_id``, accumulates phase 

413 events into the active iteration's ``phases`` list, and stamps 

414 the verdict + reason from the matching verdict event before 

415 appending the completed record. 

416 

417 Returns a list of dicts shaped like 

418 ``{"iteration_index": int, "phases": [{"phase", "status", 

419 "started_at", "ended_at", "error_message"}, ...], "verdict": str 

420 | None, "verdict_reason": str | None, "revision_rationale": str 

421 | None}``. The shape is intentionally narrow — it covers only 

422 the fields the audit stream is expected to fully describe, not 

423 the strategy / observation / criteria-evaluation fields the 

424 engine persists separately to the session backend. 

425 

426 A phase event whose ``iteration_index`` jumps ahead of the 

427 active iteration before its verdict event has landed flushes 

428 the active iteration with ``verdict=None`` / ``verdict_reason 

429 =None`` so a malformed audit stream surfaces as a visible 

430 sentinel rather than a silent merge. An iteration with no 

431 closing verdict event at end-of-stream is appended the same way. 

432 """ 

433 matching = [ 

434 e for e in entries if isinstance(e, dict) and e.get("mission_session_id") == session_id 

435 ] 

436 

437 iterations: list[dict[str, Any]] = [] 

438 current_index: int | None = None 

439 current_phases: list[dict[str, Any]] = [] 

440 

441 def _flush_current(verdict: str | None, reason: str | None, rationale: str | None) -> None: 

442 """Append the active iteration to the result list.""" 

443 if current_index is None: 443 ↛ 444line 443 didn't jump to line 444 because the condition on line 443 was never true

444 return 

445 iterations.append( 

446 { 

447 "iteration_index": current_index, 

448 "phases": list(current_phases), 

449 "verdict": verdict, 

450 "verdict_reason": reason, 

451 "revision_rationale": rationale, 

452 } 

453 ) 

454 

455 for entry in matching: 

456 event_type = entry.get("event_type") 

457 iteration_index = entry.get("iteration_index") 

458 

459 if event_type == EVENT_TYPE_PHASE: 

460 if current_index is not None and current_index != iteration_index: 

461 # New iteration arrived before the prior closed — 

462 # flush the prior with sentinel verdict / reason so 

463 # the caller can see the orphaned phases. 

464 _flush_current(None, None, None) 

465 current_phases = [] 

466 current_index = iteration_index 

467 phase_record: dict[str, Any] = { 

468 "phase": entry.get("phase"), 

469 "status": entry.get("phase_status"), 

470 "started_at": entry.get("phase_started_at"), 

471 "ended_at": entry.get("phase_ended_at"), 

472 } 

473 if "error_message" in entry: 

474 phase_record["error_message"] = entry["error_message"] 

475 current_phases.append(phase_record) 

476 elif event_type == EVENT_TYPE_VERDICT: 476 ↛ 455line 476 didn't jump to line 455 because the condition on line 476 was always true

477 # ``current_index`` may legitimately be ``None`` when a 

478 # verdict-only iteration arrives (e.g. a synthetic 

479 # ``cadence_skip``); in that case stamp the iteration 

480 # index from the verdict event itself. 

481 if current_index is None: 

482 current_index = iteration_index 

483 _flush_current( 

484 entry.get("verdict"), 

485 entry.get("verdict_reason"), 

486 entry.get("revision_rationale"), 

487 ) 

488 current_index = None 

489 current_phases = [] 

490 

491 # Stream ended mid-iteration — flush the unclosed iteration with 

492 # null verdict so the caller sees the partial record. 

493 if current_index is not None: 

494 _flush_current(None, None, None) 

495 

496 return iterations