Coverage for mcp/audit.py: 94%
122 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
"""
Audit logging infrastructure for the GCO MCP server.

Provides:
- ``_sanitize_arguments`` — redacts sensitive keys, truncates large values.
- ``audit_logged`` — decorator that emits structured JSON audit entries for
  every MCP tool invocation (success or failure). Dispatches on
  ``inspect.iscoroutinefunction`` so async tools work transparently.
- ``audit_messages_var`` / ``audit_elicitations_var`` — ContextVars populated
  by ``mcp/audit_middleware.py`` to surface ``ctx.warning``/``info``/``error``
  /``elicit`` calls in the audit entry.
- Startup audit log entry emitted at import time.
"""
15import contextvars
16import functools
17import inspect
18import json
19import logging
20import os
21import re
22import time
23from collections.abc import Callable
24from datetime import UTC, datetime
25from typing import Any
27import feature_flags
28from version import get_project_version
30# <pyflowchart-code-diagram> BEGIN - auto-inserted, do not edit
31# Flowchart(s) generated from this file:
32# * ``audit_logged`` -> ``diagrams/code_diagrams/mcp/audit.audit_logged.html``
33# (PNG: ``diagrams/code_diagrams/mcp/audit.audit_logged.png``)
34# Regenerate with ``python diagrams/code_diagrams/generate.py``.
35# <pyflowchart-code-diagram> END
# =============================================================================
# AUDIT LOGGING
# =============================================================================

# Project version, looked up once when this module is imported; it is reported
# in the ``mcp.server.startup`` audit entry.
_MCP_SERVER_VERSION = get_project_version()

# Named logger that every audit entry in this module is written to.
audit_logger: logging.Logger = logging.getLogger("gco.mcp.audit")
# Argument key names that mark a value as sensitive. Matching ignores case;
# the leading/trailing ``.*`` lets the word appear anywhere in the key.
_SENSITIVE_KEY_PATTERNS = [
    re.compile(expression, re.IGNORECASE)
    for expression in (
        r".*token.*",
        r".*secret.*",
        r".*password.*",
        r".*key.*",
    )
]

# Size threshold, measured in UTF-8 bytes of the value's string form, above
# which an argument value is truncated in the audit entry.
_MAX_ARG_VALUE_BYTES = 1024  # 1KB
56# Per-invocation capture buffers populated by the audit middleware. The
57# middleware sets fresh lists at the start of every tool call; the audit
58# decorator reads them at the end and includes them in the entry when
59# non-empty. Default ``None`` means "no capture in scope" — the patched
60# Context methods short-circuit to the originals without recording.
61audit_messages_var: contextvars.ContextVar[list[dict[str, str]] | None] = contextvars.ContextVar(
62 "gco_audit_messages", default=None
63)
64audit_elicitations_var: contextvars.ContextVar[list[dict[str, object]] | None] = (
65 contextvars.ContextVar("gco_audit_elicitations", default=None)
66)
def _sanitize_arguments(kwargs: dict[str, Any]) -> dict[str, Any]:
    """Sanitize tool arguments for audit logging.

    Each keyword argument is handled by the first rule that applies:

    - Redact values whose key name matches sensitive patterns (token, secret,
      password, key) — the value becomes ``"[REDACTED]"``.
    - Truncate values whose string form is longer than 1KB (UTF-8 bytes) to
      the first 100 chars + ``'[truncated]'``.
    - Replace values that can't be emitted as strict JSON (e.g. FastMCP
      ``Context`` and ``Progress`` dependencies injected as keyword
      arguments, or NaN/Infinity floats) with a type-only placeholder so
      ``json.dumps(_build_audit_entry(...))`` can't raise ``TypeError``
      mid-tool and never writes non-standard tokens such as ``NaN``.
      Without this guard, tools that take ``ctx``/``progress`` crash the
      wrapper with ``Object of type Context is not JSON serializable``.

    Args:
        kwargs: Keyword arguments the tool was invoked with.

    Returns:
        A new dict with the same keys; the input mapping is not modified.
    """
    sanitized: dict[str, Any] = {}
    for k, v in kwargs.items():
        # Sensitive key names win over every other rule: the value itself is
        # never inspected or stringified.
        if any(pattern.match(k) for pattern in _SENSITIVE_KEY_PATTERNS):
            sanitized[k] = "[REDACTED]"
            continue

        # The size check is in encoded bytes, while the retained prefix is
        # counted in characters.
        str_val = v if isinstance(v, str) else str(v)
        if len(str_val.encode("utf-8", errors="replace")) > _MAX_ARG_VALUE_BYTES:
            sanitized[k] = str_val[:100] + "[truncated]"
            continue

        # Probe JSON-serializability so injected dependencies (FastMCP
        # Context / Progress, dataclasses, etc.) don't blow up the audit
        # emission. ``allow_nan=False`` makes the probe strict: NaN and
        # +/-Infinity raise ``ValueError`` here instead of later being
        # written as ``NaN``/``Infinity``, which are not valid JSON.
        try:
            json.dumps(v, allow_nan=False)
        except Exception:
            # Best-effort: any serialization failure (TypeError on unknown
            # types, ValueError on circular refs or non-finite floats) falls
            # through to a type-only placeholder so the audit log always
            # emits valid JSON.
            sanitized[k] = f"<unserializable: {type(v).__name__}>"
        else:
            sanitized[k] = v
    return sanitized
114def _try_get_fastmcp_context() -> Any | None:
115 """Return the active FastMCP Context if inside a request, else None.
117 Wrapping the import lets ``audit_logged`` work in unit tests that don't
118 go through an MCP request — ``get_context()`` raises ``RuntimeError`` in
119 that case, which we swallow.
120 """
121 try:
122 from fastmcp.server.dependencies import get_context
124 return get_context()
125 except Exception:
126 return None
129def _try_get_task_id(ctx: Any | None) -> str | None:
130 """Extract the FastMCP task ID from request meta when present.
132 Every attribute access is wrapped in ``getattr(..., None)`` so a missing
133 intermediate (no request_context, no meta, no task_id) yields ``None``
134 rather than raising.
135 """
136 if ctx is None: 136 ↛ 137line 136 didn't jump to line 137 because the condition on line 136 was never true
137 return None
138 rc = getattr(ctx, "request_context", None)
139 if rc is None: 139 ↛ 140line 139 didn't jump to line 140 because the condition on line 139 was never true
140 return None
141 meta = getattr(rc, "meta", None)
142 if meta is None: 142 ↛ 143line 142 didn't jump to line 143 because the condition on line 142 was never true
143 return None
144 return getattr(meta, "task_id", None)
def _build_audit_entry(
    func_name: str,
    sanitized_args: dict[str, Any],
    status: str,
    duration_ms: float,
    error: str | None,
    result: Any,  # noqa: ARG001 -- reserved for future result-shape capture
) -> dict[str, Any]:
    """Assemble the dict that is serialised as one tool-invocation audit entry.

    The core fields (``event``, ``tool``, ``arguments``, ``status``,
    ``duration_ms`` rounded to two decimals, UTC ``timestamp``) are always
    present. The optional fields — ``error`` (cut to 200 characters),
    ``request_id``, ``client_id``, ``task_id``, ``client_messages`` and
    ``elicitations`` — are added only when they have a non-empty value, so a
    call that triggers none of them yields just the core fields.

    ``result`` is accepted but not used.
    """
    record: dict[str, Any] = {
        "event": "mcp.tool.invocation",
        "tool": func_name,
        "arguments": sanitized_args,
        "status": status,
        "duration_ms": round(duration_ms, 2),
        "timestamp": datetime.now(UTC).isoformat(),
    }
    if error:
        record["error"] = error[:200]

    active_ctx = _try_get_fastmcp_context()
    if active_ctx is not None:
        # ``request_id`` raises if no request_context is set, so only touch
        # it once request_context is known to be there — and still tolerate
        # a failure from the access itself.
        if getattr(active_ctx, "request_context", None) is not None:
            try:
                request_id = active_ctx.request_id
            except Exception:
                request_id = None
            if request_id:
                record["request_id"] = request_id
        if client_id := getattr(active_ctx, "client_id", None):
            record["client_id"] = client_id
        if task_id := _try_get_task_id(active_ctx):
            record["task_id"] = task_id

    # Copy the per-invocation capture buffers so the entry does not alias the
    # lists held in the ContextVars.
    captured_messages = audit_messages_var.get()
    if captured_messages:
        record["client_messages"] = list(captured_messages)
    captured_elicitations = audit_elicitations_var.get()
    if captured_elicitations:
        record["elicitations"] = list(captured_elicitations)

    return record
def _emit_audit_entry(
    func_name: str,
    sanitized_args: dict[str, Any],
    status: str,
    started: float,
    error: str | None,
    result: Any,
) -> None:
    """Write one tool-invocation audit entry to ``audit_logger`` as JSON.

    ``started`` is the ``time.perf_counter()`` reading taken before the tool
    ran; the elapsed time is converted to milliseconds here.
    """
    duration_ms = (time.perf_counter() - started) * 1000
    audit_logger.info(
        json.dumps(
            _build_audit_entry(
                func_name,
                sanitized_args,
                status,
                duration_ms,
                error,
                result,
            )
        )
    )


def audit_logged(func: Callable[..., Any]) -> Callable[..., Any]:
    """Decorator that emits structured JSON audit entries for tool invocations.

    Dispatches on ``inspect.iscoroutinefunction(func)``: async tools get an
    async wrapper that ``await``s the call, sync tools get a sync wrapper.
    Both wrappers share ``_emit_audit_entry`` / ``_build_audit_entry``.

    Behaviour of either wrapper:

    - Only keyword arguments are sanitized and recorded.
    - On success, a ``"success"`` entry is logged and the result is returned.
    - On an ``Exception`` from the tool, an ``"error"`` entry carrying
      ``str(exc)`` is logged and the exception is re-raised unchanged.
      Exceptions that are not ``Exception`` subclasses propagate without an
      entry.
    - The success entry is emitted *outside* the ``try`` that guards the tool
      call, so a failure while building or serialising the audit entry is
      not mistaken for a tool failure (it propagates as itself, without a
      spurious ``"error"`` entry for a call that succeeded).
    - Durations are measured with ``time.perf_counter()``, so wall-clock
      adjustments cannot skew or negate them.

    Args:
        func: The tool function (sync or async) to wrap.

    Returns:
        The wrapped function, with ``func``'s metadata preserved via
        ``functools.wraps``.
    """
    if inspect.iscoroutinefunction(func):

        @functools.wraps(func)
        async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
            started = time.perf_counter()
            sanitized_args = _sanitize_arguments(kwargs)
            try:
                result = await func(*args, **kwargs)
            except Exception as e:
                _emit_audit_entry(func.__name__, sanitized_args, "error", started, str(e), None)
                raise
            _emit_audit_entry(func.__name__, sanitized_args, "success", started, None, result)
            return result

        return async_wrapper

    @functools.wraps(func)
    def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
        started = time.perf_counter()
        sanitized_args = _sanitize_arguments(kwargs)
        try:
            result = func(*args, **kwargs)
        except Exception as e:
            _emit_audit_entry(func.__name__, sanitized_args, "error", started, str(e), None)
            raise
        _emit_audit_entry(func.__name__, sanitized_args, "success", started, None, result)
        return result

    return sync_wrapper
# =============================================================================
# STARTUP LOG
# =============================================================================

# Values of the ``GCO_MCP_TOOL_SEARCH`` env var that are accepted as-is. Any
# other value is normalised to ``"bm25"`` — the same fallback rule that
# ``mcp/server.py`` uses when wiring the catalog-replacement transform.
_TOOL_SEARCH_VALUES = ("bm25", "regex", "code_mode", "off")


def _resolve_tool_search() -> str:
    """Return the normalised, effective ``GCO_MCP_TOOL_SEARCH`` setting.

    The env var is read (defaulting to ``"bm25"`` when unset), stripped of
    surrounding whitespace and lowercased. A value that is not one of
    ``_TOOL_SEARCH_VALUES`` — including the empty string — resolves to
    ``"bm25"``, mirroring ``mcp/server.py`` so the audit entry reports what
    was actually wired.
    """
    configured = os.environ.get("GCO_MCP_TOOL_SEARCH", "bm25")
    normalised = configured.strip().lower()
    if normalised not in _TOOL_SEARCH_VALUES:
        return "bm25"
    return normalised
def emit_startup_log() -> None:
    """Log the ``mcp.server.startup`` audit entry as a JSON string.

    The entry always carries the server version, the audit logger's effective
    level name, a UTC timestamp and the resolved ``tool_search`` mode. The
    boolean markers ``all_tools_enabled``, ``mission_enabled`` and
    ``code_mode_experimental`` are included only when true.
    """
    level_name = logging.getLevelName(audit_logger.getEffectiveLevel())
    startup_record: dict[str, Any] = {
        "event": "mcp.server.startup",
        "version": _MCP_SERVER_VERSION,
        "audit_log_level": level_name,
        "timestamp": datetime.now(UTC).isoformat(),
    }

    # Feature-flag markers are recorded only when switched on.
    if feature_flags.all_tools_enabled():
        startup_record["all_tools_enabled"] = True
    if feature_flags.is_enabled(feature_flags.FLAG_MISSION):
        startup_record["mission_enabled"] = True

    search_mode = _resolve_tool_search()
    startup_record["tool_search"] = search_mode
    if search_mode == "code_mode":
        startup_record["code_mode_experimental"] = True

    audit_logger.info(json.dumps(startup_record))