Coverage for mcp/audit.py: 94%

122 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-15 15:07 +0000

1""" 

2Audit logging infrastructure for the GCO MCP server. 

3 

4Provides: 

5- ``_sanitize_arguments`` — redacts sensitive keys, truncates large values. 

6- ``audit_logged`` — decorator that emits structured JSON audit entries for 

7 every MCP tool invocation (success or failure). Dispatches on 

8 ``inspect.iscoroutinefunction`` so async tools work transparently. 

9- ``audit_messages_var`` / ``audit_elicitations_var`` — ContextVars populated 

10 by ``mcp/audit_middleware.py`` to surface ``ctx.warning``/``info``/``error`` 

11 /``elicit`` calls in the audit entry. 

12- Startup audit log entry emitted at import time. 

13""" 

14 

15import contextvars 

16import functools 

17import inspect 

18import json 

19import logging 

20import os 

21import re 

22import time 

23from collections.abc import Callable 

24from datetime import UTC, datetime 

25from typing import Any 

26 

27import feature_flags 

28from version import get_project_version 

29 

30# <pyflowchart-code-diagram> BEGIN - auto-inserted, do not edit 

31# Flowchart(s) generated from this file: 

32# * ``audit_logged`` -> ``diagrams/code_diagrams/mcp/audit.audit_logged.html`` 

33# (PNG: ``diagrams/code_diagrams/mcp/audit.audit_logged.png``) 

34# Regenerate with ``python diagrams/code_diagrams/generate.py``. 

35# <pyflowchart-code-diagram> END 

36 

37 

38# ============================================================================= 

39# AUDIT LOGGING 

40# ============================================================================= 

41 

42_MCP_SERVER_VERSION = get_project_version() 

43 

44audit_logger = logging.getLogger("gco.mcp.audit") 

45 

46# Patterns for sensitive argument key names (case-insensitive) 

47_SENSITIVE_KEY_PATTERNS = [ 

48 re.compile(r".*token.*", re.IGNORECASE), 

49 re.compile(r".*secret.*", re.IGNORECASE), 

50 re.compile(r".*password.*", re.IGNORECASE), 

51 re.compile(r".*key.*", re.IGNORECASE), 

52] 

53 

54_MAX_ARG_VALUE_BYTES = 1024 # 1KB 

55 

56# Per-invocation capture buffers populated by the audit middleware. The 

57# middleware sets fresh lists at the start of every tool call; the audit 

58# decorator reads them at the end and includes them in the entry when 

59# non-empty. Default ``None`` means "no capture in scope" — the patched 

60# Context methods short-circuit to the originals without recording. 

61audit_messages_var: contextvars.ContextVar[list[dict[str, str]] | None] = contextvars.ContextVar( 

62 "gco_audit_messages", default=None 

63) 

64audit_elicitations_var: contextvars.ContextVar[list[dict[str, object]] | None] = ( 

65 contextvars.ContextVar("gco_audit_elicitations", default=None) 

66) 

67 

68 

69def _sanitize_arguments(kwargs: dict[str, Any]) -> dict[str, Any]: 

70 """Sanitize tool arguments for audit logging. 

71 

72 - Redact values whose key name matches sensitive patterns (token, secret, password, key). 

73 - Truncate string values longer than 1KB to first 100 chars + '[truncated]'. 

74 - Replace values that aren't JSON-serializable (e.g. FastMCP ``Context`` 

75 and ``Progress`` dependencies injected as keyword arguments) with a 

76 type-only placeholder so ``json.dumps(_build_audit_entry(...))`` can't 

77 raise ``TypeError`` mid-tool. Without this guard, every long-running 

78 tool that takes ``ctx``/``progress`` (deploy_all, destroy_all, 

79 bootstrap_cdk, deploy_stack, destroy_stack, images_build, 

80 images_push) crashes the wrapper with 

81 ``Object of type Context is not JSON serializable`` before the 

82 underlying CLI ever runs. 

83 """ 

84 sanitized = {} 

85 for k, v in kwargs.items(): 

86 # Check if the key name matches any sensitive pattern 

87 if any(pattern.match(k) for pattern in _SENSITIVE_KEY_PATTERNS): 

88 sanitized[k] = "[REDACTED]" 

89 continue 

90 

91 # Truncate large string values 

92 str_val = str(v) if not isinstance(v, str) else v 

93 if len(str_val.encode("utf-8", errors="replace")) > _MAX_ARG_VALUE_BYTES: 

94 sanitized[k] = str_val[:100] + "[truncated]" 

95 continue 

96 

97 # Probe JSON-serializability so injected dependencies (FastMCP 

98 # Context / Progress, dataclasses without ``default``, etc.) don't 

99 # blow up the audit emission. Bare primitives short-circuit the 

100 # try/except since ``json.dumps`` on str/int/float/bool/None/list/ 

101 # dict-of-primitives is ~free. 

102 try: 

103 json.dumps(v) 

104 sanitized[k] = v 

105 except Exception: 

106 # Best-effort: any serialization failure (TypeError on unknown 

107 # types, ValueError on circular refs / NaN with allow_nan=False) 

108 # falls through to a type-only placeholder so the audit log 

109 # always emits valid JSON. 

110 sanitized[k] = f"<unserializable: {type(v).__name__}>" 

111 return sanitized 

112 

113 

114def _try_get_fastmcp_context() -> Any | None: 

115 """Return the active FastMCP Context if inside a request, else None. 

116 

117 Wrapping the import lets ``audit_logged`` work in unit tests that don't 

118 go through an MCP request — ``get_context()`` raises ``RuntimeError`` in 

119 that case, which we swallow. 

120 """ 

121 try: 

122 from fastmcp.server.dependencies import get_context 

123 

124 return get_context() 

125 except Exception: 

126 return None 

127 

128 

129def _try_get_task_id(ctx: Any | None) -> str | None: 

130 """Extract the FastMCP task ID from request meta when present. 

131 

132 Every attribute access is wrapped in ``getattr(..., None)`` so a missing 

133 intermediate (no request_context, no meta, no task_id) yields ``None`` 

134 rather than raising. 

135 """ 

136 if ctx is None: 136 ↛ 137line 136 didn't jump to line 137 because the condition on line 136 was never true

137 return None 

138 rc = getattr(ctx, "request_context", None) 

139 if rc is None: 139 ↛ 140line 139 didn't jump to line 140 because the condition on line 139 was never true

140 return None 

141 meta = getattr(rc, "meta", None) 

142 if meta is None: 142 ↛ 143line 142 didn't jump to line 143 because the condition on line 142 was never true

143 return None 

144 return getattr(meta, "task_id", None) 

145 

146 

147def _build_audit_entry( 

148 func_name: str, 

149 sanitized_args: dict[str, Any], 

150 status: str, 

151 duration_ms: float, 

152 error: str | None, 

153 result: Any, # noqa: ARG001 -- reserved for future result-shape capture 

154) -> dict[str, Any]: 

155 """Build the JSON dict for a single tool-invocation audit entry. 

156 

157 Optional fields (``error``, ``request_id``, ``client_id``, ``task_id``, 

158 ``client_messages``, ``elicitations``) are omitted when their values 

159 are missing or empty. Existing sync-tool entries that don't trigger 

160 any new field look identical to the pre-refactor shape. 

161 """ 

162 entry: dict[str, Any] = { 

163 "event": "mcp.tool.invocation", 

164 "tool": func_name, 

165 "arguments": sanitized_args, 

166 "status": status, 

167 "duration_ms": round(duration_ms, 2), 

168 "timestamp": datetime.now(UTC).isoformat(), 

169 } 

170 if error: 

171 entry["error"] = error[:200] 

172 

173 ctx = _try_get_fastmcp_context() 

174 if ctx is not None: 

175 # ``request_id`` raises if no request_context is set; guard with 

176 # request_context first so the access is safe. 

177 if getattr(ctx, "request_context", None) is not None: 177 ↛ 184line 177 didn't jump to line 184 because the condition on line 177 was always true

178 try: 

179 rid = ctx.request_id 

180 except Exception: 

181 rid = None 

182 if rid: 182 ↛ 184line 182 didn't jump to line 184 because the condition on line 182 was always true

183 entry["request_id"] = rid 

184 cid = getattr(ctx, "client_id", None) 

185 if cid: 

186 entry["client_id"] = cid 

187 tid = _try_get_task_id(ctx) 

188 if tid: 

189 entry["task_id"] = tid 

190 

191 msgs = audit_messages_var.get() 

192 if msgs: 

193 entry["client_messages"] = list(msgs) 

194 elics = audit_elicitations_var.get() 

195 if elics: 

196 entry["elicitations"] = list(elics) 

197 

198 return entry 

199 

200 

201def audit_logged(func: Callable[..., Any]) -> Callable[..., Any]: 

202 """Decorator that emits structured JSON audit entries for tool invocations. 

203 

204 Dispatches on ``inspect.iscoroutinefunction(func)``: async tools get an 

205 async wrapper that ``await``s the call, sync tools keep the existing 

206 sync path. Both wrappers share ``_build_audit_entry``. 

207 """ 

208 if inspect.iscoroutinefunction(func): 

209 

210 @functools.wraps(func) 

211 async def async_wrapper(*args: Any, **kwargs: Any) -> Any: 

212 start = time.time() 

213 sanitized_args = _sanitize_arguments(kwargs) 

214 try: 

215 result = await func(*args, **kwargs) 

216 duration_ms = (time.time() - start) * 1000 

217 audit_logger.info( 

218 json.dumps( 

219 _build_audit_entry( 

220 func.__name__, 

221 sanitized_args, 

222 "success", 

223 duration_ms, 

224 None, 

225 result, 

226 ) 

227 ) 

228 ) 

229 return result 

230 except Exception as e: 

231 duration_ms = (time.time() - start) * 1000 

232 audit_logger.info( 

233 json.dumps( 

234 _build_audit_entry( 

235 func.__name__, 

236 sanitized_args, 

237 "error", 

238 duration_ms, 

239 str(e), 

240 None, 

241 ) 

242 ) 

243 ) 

244 raise 

245 

246 return async_wrapper 

247 

248 @functools.wraps(func) 

249 def sync_wrapper(*args: Any, **kwargs: Any) -> Any: 

250 start = time.time() 

251 sanitized_args = _sanitize_arguments(kwargs) 

252 try: 

253 result = func(*args, **kwargs) 

254 duration_ms = (time.time() - start) * 1000 

255 audit_logger.info( 

256 json.dumps( 

257 _build_audit_entry( 

258 func.__name__, 

259 sanitized_args, 

260 "success", 

261 duration_ms, 

262 None, 

263 result, 

264 ) 

265 ) 

266 ) 

267 return result 

268 except Exception as e: 

269 duration_ms = (time.time() - start) * 1000 

270 audit_logger.info( 

271 json.dumps( 

272 _build_audit_entry( 

273 func.__name__, 

274 sanitized_args, 

275 "error", 

276 duration_ms, 

277 str(e), 

278 None, 

279 ) 

280 ) 

281 ) 

282 raise 

283 

284 return sync_wrapper 

285 

286 

287# ============================================================================= 

288# STARTUP LOG 

289# ============================================================================= 

290 

291# Recognised values for the ``GCO_MCP_TOOL_SEARCH`` env var. Anything outside 

292# this set normalises to ``"bm25"`` — the same fallback rule that 

293# ``mcp/server.py`` uses when wiring the catalog-replacement transform. 

294_TOOL_SEARCH_VALUES = ("bm25", "regex", "code_mode", "off") 

295 

296 

297def _resolve_tool_search() -> str: 

298 """Return the effective ``GCO_MCP_TOOL_SEARCH`` value after normalisation. 

299 

300 Mirrors the resolution in ``mcp/server.py``: read the env var, strip and 

301 lowercase, then fall back to ``"bm25"`` for unset, empty, or unknown 

302 values so the audit entry reports what was actually wired. 

303 """ 

304 raw = os.environ.get("GCO_MCP_TOOL_SEARCH", "bm25").strip().lower() 

305 return raw if raw in _TOOL_SEARCH_VALUES else "bm25" 

306 

307 

308def emit_startup_log() -> None: 

309 """Emit the startup audit log entry.""" 

310 entry: dict[str, Any] = { 

311 "event": "mcp.server.startup", 

312 "version": _MCP_SERVER_VERSION, 

313 "audit_log_level": logging.getLevelName(audit_logger.getEffectiveLevel()), 

314 "timestamp": datetime.now(UTC).isoformat(), 

315 } 

316 if feature_flags.all_tools_enabled(): 

317 entry["all_tools_enabled"] = True 

318 if feature_flags.is_enabled(feature_flags.FLAG_MISSION): 

319 entry["mission_enabled"] = True 

320 tool_search = _resolve_tool_search() 

321 entry["tool_search"] = tool_search 

322 if tool_search == "code_mode": 

323 entry["code_mode_experimental"] = True 

324 audit_logger.info(json.dumps(entry))