Coverage for mcp/mission/audit.py: 95%
108 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
1"""Mission-specific audit emitters.
Four thin wrappers over the existing ``audit_logger`` from ``mcp/audit.py``.
Each emitter builds a structured dict matching the Mission audit-fields
schema and writes it as a single ``audit_logger.info(json.dumps(entry))``
call — the same pattern that ``_build_audit_entry`` and ``emit_startup_log``
use in ``mcp/audit.py``.
Four event types, one helper each:

* ``emit_phase_event`` — one per phase (propose / execute / observe /
  evaluate / decide), regardless of success or failure. Carries the
  per-phase ``started_at`` / ``ended_at`` timestamps that the engine
  measures around the phase body, plus an ``error_message`` field whenever
  one is supplied (typically on ``failed`` events).
* ``emit_verdict_event`` — one per Decide_Phase outcome. Carries the
  ``verdict`` label and ``verdict_reason``, plus a ``revision_rationale``
  field whenever the caller supplies one (it is meaningful for the
  ``adjust`` verdict).
* ``emit_sampling_event`` — one per sampling call (whether the call was
  used, rejected, fell back, was unavailable, or was disabled). The second
  positional argument is ``iteration_index_or_purpose`` so callers can
  pass either an integer iteration index (during the loop) or a string /
  ``None`` for sampling that happens outside any single iteration (e.g.
  Final_Report fill-in).
* ``emit_script_call_event`` — one per operator-allowlisted tool call made
  from inside a Mission script. Tagged ``via_script=True`` and carries
  ``tool_name``, ``tool_status`` and ``duration_ms``, plus a truncated
  ``error_message`` when one is supplied.
26Every entry carries a fresh ``timestamp`` set to ``datetime.now(UTC).isoformat()``
27at emit time — the supplied phase ``started_at`` / ``ended_at`` are recorded
28in their own dedicated fields and are not used as the entry timestamp.
30The Mission package is loaded via ``sys.path``-on-``mcp/`` (the same trick
31``run_mcp.py`` uses for the rest of the MCP modules), so ``audit_logger`` is
32imported as ``from audit import audit_logger`` — matching every
33``mcp/tools/*.py`` module.
In-process audit ring buffer
============================
Mission also installs a bounded in-process collector
(``MissionAuditCollectorHandler``) on the shared ``gco.mcp.audit``
logger so the ``mission://sessions/{session_id}/audit-replay``
resource has a source of phase / verdict entries to feed
:func:`replay_audit_entries`. The buffer is capped at 5000 entries
(FIFO eviction) so a long-running process cannot exhaust memory
through the audit channel. Each iteration produces at least six
entries (five phase events and one verdict, plus any sampling /
script-call events), so the window holds roughly 800 iterations at
most. It is also shared by every session in the process, so replay
of a very long or older session may be missing its earliest entries.
46"""
48from __future__ import annotations
50import json
51import logging
52from collections import deque
53from datetime import UTC, datetime
54from typing import Any, Literal
56from audit import audit_logger
# ---------------------------------------------------------------------------
# Event-type tags
# ---------------------------------------------------------------------------

# ``event_type`` discriminators stamped on every Mission audit entry. Audit
# consumers filter on these literals without parsing the rest of the entry,
# so the string values form part of the public audit contract — tests,
# dashboards, and the reconstruction test in ``tests/test_mission_audit.py``
# match on them — and must not be changed casually.
EVENT_TYPE_PHASE: str = "mission_phase_event"
EVENT_TYPE_VERDICT: str = "mission_verdict_event"
EVENT_TYPE_SAMPLING: str = "mission_sampling_event"
EVENT_TYPE_SCRIPT_CALL: str = "mission_script_call_event"
72# ---------------------------------------------------------------------------
73# Helpers
74# ---------------------------------------------------------------------------
77def _now_iso() -> str:
78 """Return the current UTC time as an ISO 8601 string.
80 Centralised so every Mission emitter records ``timestamp`` in the same
81 format. Matches the pattern used by ``_build_audit_entry`` and
82 ``emit_startup_log`` in ``mcp/audit.py``.
83 """
84 return datetime.now(UTC).isoformat()
def _emit(entry: dict[str, Any]) -> None:
    """Serialise ``entry`` to JSON and hand it to the shared audit logger.

    All public emitters funnel through here, so there is exactly one log
    path; a later switch to a structured handler only has to touch this
    function.
    """
    serialised = json.dumps(entry)
    audit_logger.info(serialised)
# ---------------------------------------------------------------------------
# Public emitters
# ---------------------------------------------------------------------------


def emit_phase_event(
    session_id: str,
    iteration_index: int,
    phase: Literal["propose", "execute", "observe", "evaluate", "decide"],
    status: Literal["succeeded", "failed"],
    started_at: str,
    ended_at: str,
    error_message: str | None = None,
) -> None:
    """Emit one ``mission_phase_event`` audit entry.

    ``MissionEngine`` calls this exactly once per phase from a try/finally
    block, so a phase that raised still yields an entry with
    ``phase_status="failed"`` and the exception's ``error_message``.

    ``started_at`` / ``ended_at`` describe the phase body and are stored in
    ``phase_started_at`` / ``phase_ended_at``. The entry's own ``timestamp``
    is taken fresh at call time, so the log reflects true emission order.
    ``error_message`` is added only when non-empty, truncated to 200
    characters.
    """
    entry: dict[str, Any] = dict(
        event_type=EVENT_TYPE_PHASE,
        mission_session_id=session_id,
        iteration_index=iteration_index,
        phase=phase,
        phase_status=status,
        phase_started_at=started_at,
        phase_ended_at=ended_at,
        timestamp=_now_iso(),
    )
    # Same 200-char cap that ``_build_audit_entry`` applies to its ``error``
    # field, so a long traceback summary cannot bloat the log line.
    if error_message:
        entry["error_message"] = error_message[:200]
    _emit(entry)
def emit_verdict_event(
    session_id: str,
    iteration_index: int,
    verdict: str,
    verdict_reason: str,
    revision_rationale: str | None = None,
) -> None:
    """Emit one ``mission_verdict_event`` audit entry.

    ``MissionEngine`` calls this once per Decide_Phase outcome.
    ``revision_rationale`` only carries meaning for the ``adjust`` verdict
    (it explains why the next iteration should revise the strategy), but
    this helper does not police that. Any non-empty rationale the caller
    passes is recorded, and the engine decides when to supply one, which
    keeps this function a pure formatter.
    """
    # Appended after ``timestamp`` (and only when non-empty) so key order
    # in the serialised JSON matches the conditional-append layout.
    optional: dict[str, Any] = (
        {"revision_rationale": revision_rationale} if revision_rationale else {}
    )
    _emit(
        {
            "event_type": EVENT_TYPE_VERDICT,
            "mission_session_id": session_id,
            "iteration_index": iteration_index,
            "verdict": verdict,
            "verdict_reason": verdict_reason,
            "timestamp": _now_iso(),
            **optional,
        }
    )
def emit_sampling_event(
    session_id: str,
    iteration_index_or_purpose: int | str | None,
    sampling_purpose: str,
    sampling_status: str,
    sampling_backend: str,
    sampling_model_id: str | None = None,
    model_output_bytes: int | None = None,
    validation_error: str | None = None,
    input_tokens: int | None = None,
    output_tokens: int | None = None,
) -> None:
    """Emit one ``mission_sampling_event`` audit entry.

    The second positional argument holds the iteration index for sampling
    calls made inside the loop body. For out-of-loop calls (e.g. the
    final-report ``lessons`` fill-in) it holds a string label or ``None``.
    It is routed as follows:

    * ``int`` (but not ``bool``) → ``iteration_index``, matching the
      design's audit-fields table for in-iteration sampling events.
    * non-empty ``str`` → ``sampling_context``.
    * ``None``, an empty string, or a ``bool`` → neither field.

    The remaining optional fields are written only when they carry data:

    * ``sampling_model_id`` when truthy and ``model_output_bytes`` when not
      ``None``, i.e. typically when the sampler produced output
      (``sampling_status="used"``).
    * ``validation_error`` when truthy (typically
      ``sampling_status="rejected"``), truncated to 200 characters.
    * ``input_tokens`` / ``output_tokens`` when not ``None``.

    Omitting empty fields saves downstream consumers from filtering out
    null values.
    """
    entry: dict[str, Any] = {
        "event_type": EVENT_TYPE_SAMPLING,
        "mission_session_id": session_id,
        "sampling_purpose": sampling_purpose,
        "sampling_status": sampling_status,
        "sampling_backend": sampling_backend,
        "timestamp": _now_iso(),
    }

    routed = iteration_index_or_purpose
    if isinstance(routed, str):
        if routed:
            entry["sampling_context"] = routed
    # ``bool`` subclasses ``int``; reject it so ``True`` never becomes an
    # iteration index.
    elif isinstance(routed, int) and not isinstance(routed, bool):
        entry["iteration_index"] = routed

    if sampling_model_id:
        entry["sampling_model_id"] = sampling_model_id
    if model_output_bytes is not None:
        entry["model_output_bytes"] = model_output_bytes
    if validation_error:
        entry["validation_error"] = validation_error[:200]
    for token_key, token_count in (
        ("input_tokens", input_tokens),
        ("output_tokens", output_tokens),
    ):
        if token_count is not None:
            entry[token_key] = token_count

    _emit(entry)
def emit_script_call_event(
    session_id: str,
    iteration_index: int,
    tool_name: str,
    status: str,
    duration_ms: int,
    error_message: str | None = None,
) -> None:
    """Emit one ``mission_script_call_event`` audit entry.

    The in-script tool wrapper calls this after each invocation of an
    operator-allowlisted tool from inside a Mission script. The tool itself
    has already produced its own ``@audit_logged`` entry; this adds a second,
    distinct row tagged ``via_script=True``. Consumers can therefore tell
    script-driven calls apart from direct ``tool_calls`` strategies.

    ``status`` is the call's terminal state (``ok`` / ``failed`` /
    ``skipped_not_allowed``) and is stored as ``tool_status``.
    ``duration_ms`` mirrors the timing the wrapper records in its own
    ``script_call_log``. ``error_message`` is added only when non-empty,
    truncated to 200 characters like the other emitters.
    """
    entry: dict[str, Any] = {}
    entry["event_type"] = EVENT_TYPE_SCRIPT_CALL
    entry["via_script"] = True
    entry["mission_session_id"] = session_id
    entry["iteration_index"] = iteration_index
    entry["tool_name"] = tool_name
    entry["tool_status"] = status
    entry["duration_ms"] = duration_ms
    entry["timestamp"] = _now_iso()
    if error_message:
        entry["error_message"] = error_message[:200]
    _emit(entry)
# Public surface of this module, kept in sorted order.
__all__ = [
    # Event-type tags.
    "EVENT_TYPE_PHASE",
    "EVENT_TYPE_SAMPLING",
    "EVENT_TYPE_SCRIPT_CALL",
    "EVENT_TYPE_VERDICT",
    # In-process collector class.
    "MissionAuditCollectorHandler",
    # Emitters.
    "emit_phase_event",
    "emit_sampling_event",
    "emit_script_call_event",
    "emit_verdict_event",
    # Collector access and replay.
    "get_collector",
    "install_collector",
    "replay_audit_entries",
]
# ---------------------------------------------------------------------------
# Audit-replay helper
# ---------------------------------------------------------------------------

# Default size of the in-process collector's ring buffer. At about six
# entries per iteration (five phase events plus one verdict), 5000 entries
# cover roughly 800 iterations. The cap also keeps a long-running process
# from exhausting memory through the audit channel. The buffer is shared by
# every session in the process, so this is a process-wide window, not a
# per-session budget.
_DEFAULT_COLLECTOR_CAPACITY: int = 5_000
class MissionAuditCollectorHandler(logging.Handler):
    """Bounded ring-buffer logging handler that captures Mission audit JSON.

    :func:`install_collector` attaches it to the shared ``gco.mcp.audit``
    logger, giving the ``mission://sessions/{session_id}/audit-replay``
    resource a source of entries for :func:`replay_audit_entries`.

    Only records whose message parses as a JSON object with a Mission
    ``event_type`` are kept. Other emitters on the same logger (the standard
    MCP tool-invocation decorator, the startup-log helper) therefore never
    reach the buffer.

    Storage is a ``collections.deque`` with ``maxlen=capacity``. Once it is
    full, each new entry evicts the oldest, so memory stays bounded in a
    long-running process. Pass ``capacity=`` here, or to
    :func:`install_collector`, to size the window.
    """

    # ``event_type`` values this handler accepts; everything else is ignored.
    _MISSION_EVENT_TYPES = frozenset(
        {
            EVENT_TYPE_PHASE,
            EVENT_TYPE_VERDICT,
            EVENT_TYPE_SAMPLING,
            EVENT_TYPE_SCRIPT_CALL,
        }
    )

    def __init__(self, capacity: int = _DEFAULT_COLLECTOR_CAPACITY) -> None:
        """Create an empty collector that retains at most ``capacity`` entries."""
        super().__init__(level=logging.INFO)
        self._buffer: deque[dict[str, Any]] = deque(maxlen=capacity)

    def emit(self, record: logging.LogRecord) -> None:
        """Capture ``record`` into the buffer if it is a Mission audit entry.

        Non-JSON messages, JSON that is not an object, and objects without a
        Mission ``event_type`` are dropped silently. This method must not
        raise, because any exception here would propagate back into the
        caller's ``audit_logger.info(...)`` call.
        """
        # Parenthesised tuple: the bare ``except A, B:`` form only parses on
        # Python >= 3.14 (PEP 758); this spelling works on every Python 3.
        try:
            payload = json.loads(record.getMessage())
        except (TypeError, ValueError):
            return
        if not isinstance(payload, dict):
            return
        event_type = payload.get("event_type")
        # Guard the type before the set lookup: an unhashable ``event_type``
        # (e.g. a list) would make ``in frozenset`` raise TypeError.
        if not isinstance(event_type, str) or event_type not in self._MISSION_EVENT_TYPES:
            return
        self._buffer.append(payload)

    def entries_for(self, session_id: str) -> list[dict[str, Any]]:
        """Return shallow copies of every captured entry for ``session_id``.

        The buffer is snapshotted with ``list(...)`` before filtering, and
        each entry is copied so callers cannot mutate the buffered dicts.
        """
        return [dict(e) for e in list(self._buffer) if e.get("mission_session_id") == session_id]

    def clear(self) -> None:
        """Drop every captured entry. Useful for test isolation."""
        self._buffer.clear()
# Process-wide collector singleton. It stays ``None`` until
# :func:`install_collector` runs; from then on, every Mission audit entry
# emitted through ``audit_logger`` during the process lifetime is captured
# and reachable from the audit-replay resource. The package wiring in
# ``resources/__init__.py`` is expected to install it once at import time.
_COLLECTOR: MissionAuditCollectorHandler | None = None
def install_collector(
    capacity: int = _DEFAULT_COLLECTOR_CAPACITY,
) -> MissionAuditCollectorHandler:
    """Attach a :class:`MissionAuditCollectorHandler` to the audit logger.

    Idempotent. The first call creates the handler and attaches it to
    ``audit_logger``; every later call returns that same handler unchanged.
    ``capacity`` only takes effect on the first call, and later values are
    ignored. Test fixtures that need isolation between cases should call
    :meth:`MissionAuditCollectorHandler.clear` on the returned handler.

    Logger level floor. Python's stdlib ``logging`` defaults the root
    threshold to ``WARNING``. An unconfigured host (no
    ``logging.basicConfig(level=logging.INFO)``) would therefore drop every
    ``audit_logger.info(...)`` call before it reaches any handler, including
    this collector. The ``mission://sessions/{id}/audit-replay`` resource
    needs entries regardless of the host's logging setup, so the logger's
    *effective* level is floored at ``INFO``.

    The check uses ``getEffectiveLevel()`` rather than the logger's own
    ``level``. A logger left at ``NOTSET`` inherits its threshold from its
    ancestors, so a host that configured ``DEBUG`` on the root logger keeps
    ``DEBUG`` here. Only an effective threshold above ``INFO`` (including
    the stdlib default ``WARNING``) is lowered; the threshold is never
    raised.

    Args:
        capacity: Ring-buffer size used when the handler is first created.

    Returns:
        The process-wide collector handler.
    """
    global _COLLECTOR
    if _COLLECTOR is None:
        _COLLECTOR = MissionAuditCollectorHandler(capacity=capacity)
        audit_logger.addHandler(_COLLECTOR)
        # Checking ``audit_logger.level`` would misread an inherited DEBUG
        # (own level NOTSET) as "unconfigured" and raise it to INFO.
        if audit_logger.getEffectiveLevel() > logging.INFO:
            audit_logger.setLevel(logging.INFO)
    return _COLLECTOR
def get_collector() -> MissionAuditCollectorHandler | None:
    """Look up the process-wide Mission audit collector.

    Returns:
        The handler installed by :func:`install_collector`, or ``None`` when
        no collector has been attached yet.
    """
    installed = _COLLECTOR
    return installed
def replay_audit_entries(
    session_id: str,
    entries: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Reconstruct iteration history from a stream of Mission audit entries.

    Pure function: ``entries`` is only read. In emission order, each
    iteration produces five ``mission_phase_event`` entries (one per phase)
    followed by one ``mission_verdict_event`` that closes it out.

    The walker proceeds as follows:

    * It keeps only dict entries whose ``mission_session_id`` equals
      ``session_id``.
    * It accumulates phase events into the active iteration's ``phases``
      list.
    * It stamps the verdict fields from the closing verdict event, then
      appends the completed record.

    Other Mission event types (sampling, script-call) are skipped.

    Returns a list of dicts shaped like
    ``{"iteration_index": int, "phases": [{"phase", "status",
    "started_at", "ended_at"[, "error_message"]}, ...], "verdict": str
    | None, "verdict_reason": str | None, "revision_rationale": str
    | None}``. ``error_message`` appears on a phase record only when the
    source event carried one. The shape is intentionally narrow: it covers
    only the fields the audit stream fully describes. It omits the strategy
    / observation / criteria-evaluation fields the engine persists
    separately to the session backend.

    Malformed streams surface as visible sentinels rather than silent merges:

    * A phase event whose ``iteration_index`` differs from the active
      iteration's flushes the active iteration with ``verdict`` /
      ``verdict_reason`` / ``revision_rationale`` all ``None``.
    * A verdict event whose ``iteration_index`` differs from the active
      iteration's flushes the active iteration the same way. The verdict is
      then recorded as its own phase-less iteration instead of being
      attached to the wrong phases.
    * A verdict event with no open phases (e.g. a synthetic
      ``cadence_skip``) yields a phase-less record under the verdict's own
      ``iteration_index``.
    * An iteration still open at end-of-stream is appended with ``None``
      verdict fields.
    """
    iterations: list[dict[str, Any]] = []
    current_index: int | None = None
    current_phases: list[dict[str, Any]] = []

    def _flush_current(verdict: str | None, reason: str | None, rationale: str | None) -> None:
        """Append the active iteration (if any) to ``iterations``."""
        # Reads the enclosing ``current_index`` / ``current_phases`` bindings
        # as they are at call time.
        if current_index is None:
            return
        iterations.append(
            {
                "iteration_index": current_index,
                "phases": list(current_phases),
                "verdict": verdict,
                "verdict_reason": reason,
                "revision_rationale": rationale,
            }
        )

    for entry in entries:
        if not isinstance(entry, dict) or entry.get("mission_session_id") != session_id:
            continue
        event_type = entry.get("event_type")
        iteration_index = entry.get("iteration_index")

        if event_type == EVENT_TYPE_PHASE:
            if current_index is not None and current_index != iteration_index:
                # New iteration arrived before the prior one closed: flush
                # the prior with a sentinel verdict so its orphaned phases
                # remain visible to the caller.
                _flush_current(None, None, None)
                current_phases = []
            current_index = iteration_index
            phase_record: dict[str, Any] = {
                "phase": entry.get("phase"),
                "status": entry.get("phase_status"),
                "started_at": entry.get("phase_started_at"),
                "ended_at": entry.get("phase_ended_at"),
            }
            if "error_message" in entry:
                phase_record["error_message"] = entry["error_message"]
            current_phases.append(phase_record)
        elif event_type == EVENT_TYPE_VERDICT:
            if (
                current_index is not None
                and iteration_index is not None
                and current_index != iteration_index
            ):
                # The verdict belongs to a different iteration than the open
                # phases. Close those phases with a sentinel, then give the
                # verdict its own phase-less record rather than mislabelling
                # the orphaned phases with it.
                _flush_current(None, None, None)
                current_phases = []
                current_index = iteration_index
            elif current_index is None:
                # Verdict-only iteration: take the index from the verdict.
                current_index = iteration_index
            _flush_current(
                entry.get("verdict"),
                entry.get("verdict_reason"),
                entry.get("revision_rationale"),
            )
            current_index = None
            current_phases = []

    # Stream ended mid-iteration: flush the unclosed iteration with a null
    # verdict so the caller sees the partial record.
    if current_index is not None:
        _flush_current(None, None, None)

    return iterations