Coverage for mcp/mission/state.py: 92%

128 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-15 15:07 +0000

1"""Persistence backends for the Mission goal-directed iteration loop. 

2 

3This module defines the :class:`MissionStateBackend` protocol — the narrow 

4interface the engine and tool wrappers depend on for loading, saving, 

5listing, and deleting :class:`~mcp.mission.types.SessionState` records. 

6Concrete implementations (:class:`FilesystemBackend`, :class:`DynamoDBBackend`) and the 

7:func:`get_backend` resolver are defined later in this module. The 

8protocol is declared with :func:`typing.runtime_checkable` so tests can 

9assert backend conformance with ``isinstance`` rather than relying on 

10duck-typed call sites. 

11""" 

12 

13from __future__ import annotations 

14 

15import contextlib 

16import json 

17import logging 

18import os 

19import tempfile 

20from pathlib import Path 

21from typing import Any, Protocol, cast, runtime_checkable 

22 

23from . import SCHEMA_VERSION 

24from .types import SessionState 

25 

# Module-scoped logger; records are emitted under this module's dotted name.
logger: logging.Logger = logging.getLogger(__name__)

27 

28 

@runtime_checkable
class MissionStateBackend(Protocol):
    """Narrow storage interface for Mission session records.

    Every operation works on complete :class:`SessionState` payloads
    identified by their ``session_id``. How records are serialised, how
    writes are made atomic, and who may read them is up to each concrete
    backend; callers only see an opaque key-value store plus a listing
    call that yields small metadata dicts instead of full session bodies.
    """

    def load_session(self, session_id: str) -> SessionState | None:
        """Look up ``session_id``; give back the record, or ``None``.

        ``None`` is returned both when no such session exists and when
        the stored record's ``version`` differs from
        :data:`mcp.mission.SCHEMA_VERSION`. Callers cannot tell these
        cases apart and handle both as "session not found".
        """
        ...

    def save_session(self, session: SessionState) -> None:
        """Store ``session`` under the key found in its ``session_id`` field.

        From a reader's point of view the write must be atomic: a
        :meth:`load_session` racing with this call observes either the
        old record or the new one, never a half-written one.
        """
        ...

    def list_sessions(self, filter: dict[str, Any] | None = None) -> list[dict[str, Any]]:
        """List metadata dicts for the sessions selected by ``filter``.

        Entries hold identifying fields (``session_id``, ``status``,
        ``created_at`` and the like), not full :class:`SessionState`
        bodies. The meaning of ``filter`` is backend-specific; ``None``
        asks for every session visible to the backend.
        """
        ...

    def delete_session(self, session_id: str) -> bool:
        """Delete ``session_id``; report whether a record was actually removed.

        A missing key yields ``False`` rather than an exception, which
        lets callers invoke this freely as an idempotent cleanup step.
        """
        ...

79 

80 

class FilesystemBackend:
    """JSON-on-disk implementation of :class:`MissionStateBackend`.

    Each session is persisted as ``<root>/<session_id>.json`` with its
    matching :class:`~mcp.mission.types.SessionState` payload; the
    Final_Report (when present) lives alongside it as
    ``<root>/<session_id>.report.json``. Writes go through the standard
    "temp file in the same directory, ``fsync``, then ``os.replace``"
    pattern so a reader concurrent with a writer always sees either the
    prior version of the file or the new one — never a partial JSON
    document. The temp file lives in the same directory as the final
    target so ``os.replace`` is a same-filesystem rename and therefore
    atomic on POSIX.

    Session IDs become file names verbatim, so every public method first
    checks that the ID is a single plain path component (no directory
    separators, not ``.``/``..``, no NUL byte). An unsafe ID is treated
    as "no such session" by :meth:`load_session` and
    :meth:`delete_session` and rejected with :class:`ValueError` by
    :meth:`save_session`, so a crafted ID can never read, overwrite, or
    delete a file outside ``root``.

    On POSIX systems the root directory is created (and re-asserted) at
    mode ``0o700`` and every session file written by :meth:`save_session`
    gets mode ``0o600`` so persisted state is unreadable to other local
    users. Permission calls are gated on ``os.name != "nt"`` because the
    POSIX permission model does not apply on Windows; the backend still
    works on Windows, just without the explicit mode tightening.
    """

    def __init__(self, root: Path | str | None = None) -> None:
        """Configure the backend; no filesystem access happens here.

        Args:
            root: Directory holding the session files. Accepts a
                :class:`~pathlib.Path` or a string path; defaults to
                ``~/.gco/missions``. The directory is only created on the
                first write (see :meth:`_ensure_root`).
        """
        # Normalise to Path so the ``/`` joins in the path helpers also
        # work when a caller passes a plain string.
        self.root = Path(root) if root is not None else Path.home() / ".gco" / "missions"
        self._root_initialized = False

    # ------------------------------------------------------------------ #
    # internals
    # ------------------------------------------------------------------ #

    def _ensure_root(self) -> None:
        """Create the root directory on first use, idempotently.

        We defer the ``mkdir`` to the first write so simply constructing
        a backend (e.g. in the resolver in :func:`get_backend`) does not
        eagerly create ``~/.gco/missions`` on a host that ends up using
        a different backend.
        """
        if self._root_initialized:
            return
        self.root.mkdir(parents=True, exist_ok=True)
        if os.name != "nt":
            with contextlib.suppress(OSError):
                # Best-effort tightening: a directory we already own with
                # different permissions is still safer to use than to
                # refuse the write outright. 0o700 (owner-only) is
                # intentional for ~/.gco/missions: session JSON contains
                # operator-supplied directives, criteria, observations,
                # and tool-call results that should not be readable by
                # other local users.
                # nosemgrep: python.lang.security.audit.insecure-file-permissions.insecure-file-permissions
                os.chmod(self.root, 0o700)
        self._root_initialized = True

    @staticmethod
    def _is_safe_session_id(session_id: str) -> bool:
        """Return ``True`` if ``session_id`` is usable as a bare file name.

        Rejects the empty string, ``.`` and ``..``, anything containing a
        NUL byte (which the OS path APIs refuse with :class:`ValueError`),
        and anything with a directory component (``/``, or ``\\`` and
        drive prefixes on Windows). Such IDs would otherwise let
        ``<root>/<session_id>.json`` resolve outside ``root``.
        """
        name = str(session_id)
        if name in ("", ".", "..") or "\x00" in name:
            return False
        # A plain file name is its own final path component.
        return Path(name).name == name

    def _session_path(self, session_id: str) -> Path:
        return self.root / f"{session_id}.json"

    def _report_path(self, session_id: str) -> Path:
        return self.root / f"{session_id}.report.json"

    # ------------------------------------------------------------------ #
    # protocol methods
    # ------------------------------------------------------------------ #

    def load_session(self, session_id: str) -> SessionState | None:
        """Return the persisted session or ``None`` for missing/unsupported.

        Returns ``None`` when ``session_id`` is not a safe file name, when
        the file does not exist or cannot be read (including when the
        root directory has not been created yet), when the file is not
        valid UTF-8 or not valid JSON, when the JSON is not an object, or
        when the on-disk ``version`` field does not match
        :data:`mcp.mission.SCHEMA_VERSION`. Version mismatches log a
        single warning naming the unsupported value so an operator can
        spot stale state without having to grep the directory by hand.
        """
        if not self._is_safe_session_id(session_id):
            return None

        path = self._session_path(session_id)
        try:
            text = path.read_text(encoding="utf-8")
        except (OSError, ValueError):
            # OSError: missing file/root, permission denied, etc.
            # ValueError: UnicodeDecodeError on non-UTF-8 content.
            return None

        try:
            payload = json.loads(text)
        except ValueError:
            return None

        if not isinstance(payload, dict):
            return None

        version = payload.get("version")
        if version != SCHEMA_VERSION:
            logger.warning(
                "Refusing to load Mission session %s: unsupported schema version %r",
                session_id,
                version,
            )
            return None

        return cast("SessionState", payload)

    def save_session(self, session: SessionState) -> None:
        """Persist ``session`` atomically to ``<root>/<session_id>.json``.

        Opens a temp file in the same directory, dumps JSON, flushes and
        ``fsync``s, applies POSIX mode ``0o600`` (when supported), then
        ``os.replace``s onto the final path. If any step fails, the temp
        file is removed and the original exception propagates unchanged
        (keeping its concrete :class:`OSError` subclass, ``errno`` and
        ``filename``). The existing final file is never touched by a
        failed write, so the previously-persisted state remains loadable.

        Defense-in-depth strip. The validators in
        :mod:`mcp.mission.validation` attach a cached
        :class:`ast.Expression` under ``_parsed_ast`` on every
        ``predicate`` criterion. That object is not JSON-serialisable;
        a caller that hands a freshly-validated session straight to
        ``save_session`` without first stripping the cache would
        raise :class:`TypeError` at ``json.dump`` time. We strip
        unconditionally here so every persistence path stays correct
        regardless of which caller forgot. The strip is cheap and
        idempotent on already-clean inputs.

        Raises:
            KeyError: ``session`` has no ``session_id`` key.
            ValueError: ``session_id`` is not a safe single-component
                file name (see :meth:`_is_safe_session_id`).
            OSError: the directory could not be created or the file
                could not be written/replaced.
        """
        # Local import: ``mission.validation`` is part of the same
        # package so this isn't a cross-package edge, just a
        # dependency-direction kept lazy to keep the eager import
        # surface of ``state`` minimal.
        from .validation import strip_private_fields

        session_id = session["session_id"]
        if not self._is_safe_session_id(session_id):
            raise ValueError(f"Unsafe Mission session_id for filesystem storage: {session_id!r}")

        self._ensure_root()
        final = self._session_path(session_id)
        cleaned = cast("SessionState", strip_private_fields(session))

        tmp_name: str | None = None
        try:
            with tempfile.NamedTemporaryFile(
                mode="w",
                encoding="utf-8",
                dir=str(self.root),
                prefix=f"{session_id}.",
                suffix=".json.tmp",
                delete=False,
            ) as tmp:
                tmp_name = tmp.name
                json.dump(cleaned, tmp)
                tmp.flush()
                os.fsync(tmp.fileno())

            if os.name != "nt":
                with contextlib.suppress(OSError):
                    # Same rationale as in ``_ensure_root`` — proceed
                    # with the replace rather than abandoning a write
                    # we already fsynced.
                    os.chmod(tmp_name, 0o600)

            os.replace(tmp_name, final)
        except BaseException:
            # Don't leave orphaned ``*.json.tmp`` files behind; re-raise
            # the original exception so callers see the real cause.
            if tmp_name is not None:
                with contextlib.suppress(OSError):
                    os.unlink(tmp_name)
            raise

    def list_sessions(self, filter: dict[str, Any] | None = None) -> list[dict[str, Any]]:
        """Return summary dicts for every loadable session under ``root``.

        Each entry has the shape ``{"session_id", "status", "created_at",
        "iteration_count"}``. Summary fields absent from the payload are
        reported as ``None``, except ``session_id``, which falls back to
        the file stem. ``iteration_count`` is the length of the
        ``iterations`` list, or ``0`` when that field is missing or not a
        list. Files that cannot be read or parsed, that do not hold a
        JSON object, or whose version is unsupported are skipped (one
        debug-log line per skip) so a single corrupt file cannot block
        listing the rest. Entries are ordered by file name.

        ``filter`` currently supports the ``status`` key only; callers
        pass ``{"status": "running"}`` to narrow the list.
        """
        if not self.root.exists():
            return []

        results: list[dict[str, Any]] = []
        for path in sorted(self.root.glob("*.json")):
            # Skip the sibling report files — they share the directory
            # but are not session payloads.
            if path.name.endswith(".report.json"):
                continue
            try:
                payload = json.loads(path.read_text(encoding="utf-8"))
            except (OSError, ValueError):
                logger.debug("Skipping unreadable Mission file: %s", path)
                continue
            if not isinstance(payload, dict):
                logger.debug("Skipping non-object Mission file: %s", path)
                continue
            if payload.get("version") != SCHEMA_VERSION:
                logger.debug(
                    "Skipping Mission file %s with unknown version %r",
                    path,
                    payload.get("version"),
                )
                continue

            iterations = payload.get("iterations")
            results.append(
                {
                    "session_id": payload.get("session_id", path.stem),
                    "status": payload.get("status"),
                    "created_at": payload.get("created_at"),
                    # Guard against malformed payloads: len() on a
                    # non-list would either raise or count nonsense.
                    "iteration_count": len(iterations) if isinstance(iterations, list) else 0,
                }
            )

        if filter and "status" in filter:
            wanted = filter["status"]
            results = [r for r in results if r.get("status") == wanted]

        return results

    def delete_session(self, session_id: str) -> bool:
        """Remove the session JSON and any matching report file.

        Returns ``True`` when at least one of the two files existed and
        was removed; ``False`` when neither was present, when the root
        directory has never been created, or when ``session_id`` is not
        a safe file name. The two removals are independent so a stale
        ``.report.json`` left behind by an earlier crash is still cleaned
        up even when the session JSON has already been deleted.

        Raises:
            OSError: a file exists but could not be removed (anything
                other than :class:`FileNotFoundError`), so an
                undeletable record never masquerades as "no record".
        """
        if not self._is_safe_session_id(session_id) or not self.root.exists():
            return False

        removed = False
        for path in (self._session_path(session_id), self._report_path(session_id)):
            try:
                os.remove(path)
            except FileNotFoundError:
                continue
            removed = True
        return removed

319 

320 

class DynamoDBBackend:
    """DynamoDB-backed implementation of :class:`MissionStateBackend`.

    Stub implementation: this class declares the protocol shape so the
    global stack's CDK wiring can reference the backend type and so the
    resolver (:func:`get_backend`) can construct it, but no automated
    test exercises any code path that touches AWS; those methods are
    marked ``# pragma: no cover``. A separate, AWS-credentialed smoke
    test is intended to validate the real behaviour.

    Item schema mirrors the :class:`SessionState` TypedDict one-to-one:
    the partition key is ``session_id`` and ``status`` plus ``created_at``
    feed a ``status-index`` GSI so :meth:`list_sessions` can filter by
    status without a full table scan. ``put_item`` is atomic by virtue
    of DynamoDB's single-item write semantics, so the temp-file dance
    used by :class:`FilesystemBackend` is unnecessary here.

    Table-name resolution is lazy: when the constructor's ``table_name``
    argument is ``None``, the table name is fetched from SSM at
    ``/{project_name}/missions-table-name`` on the first call that
    needs it (not at construction time). This matches the precedent
    pattern in ``cli/models.py`` and lets unit tests construct a
    ``DynamoDBBackend()`` on a host without AWS credentials without
    triggering an SSM call. ``project_name`` is read from the
    ``GCO_PROJECT_NAME`` environment variable, defaulting to ``"gco"``
    so a fresh checkout (or CI run without the env var set) lines up
    with the default project name in ``cli/config.py``.

    The SSM lookup goes through :func:`gco.services.aws_ssm.get_ssm_parameter`,
    the shared helper that consolidates the pattern previously duplicated
    across ``cli/models.py``, ``cli/analytics_user_mgmt.py``, and
    ``gco/services/health_monitor.py``. Putting the helper under
    ``gco/services/`` (rather than ``cli/aws_client.py``) keeps
    ``mcp/`` free of the forbidden ``mcp -> cli`` import edge while
    still letting every backend share one implementation.
    """

    def __init__(self, table_name: str | None = None) -> None:
        self._table_name: str | None = table_name
        self._table: Any = None  # boto3 Table resource, lazily constructed

    # ------------------------------------------------------------------ #
    # internals
    # ------------------------------------------------------------------ #

    def _resolve_table_name(self) -> str:  # pragma: no cover - boto3 / SSM
        """Return the cached table name, fetching from SSM on first call.

        Reads ``GCO_PROJECT_NAME`` (default ``"gco"``) to build the SSM
        parameter path ``/{project_name}/missions-table-name``. The
        value is cached on the instance so subsequent method calls do
        not re-hit SSM.
        """
        if self._table_name is not None:
            return self._table_name

        from gco.services.aws_ssm import get_ssm_parameter

        project_name = os.environ.get("GCO_PROJECT_NAME", "gco")
        param_name = f"/{project_name}/missions-table-name"

        self._table_name = get_ssm_parameter(param_name)
        return self._table_name

    def _get_table(self) -> Any:  # pragma: no cover - boto3 resource
        """Return the cached ``boto3`` Table resource, building it lazily."""
        if self._table is not None:
            return self._table

        import boto3

        self._table = boto3.resource("dynamodb").Table(self._resolve_table_name())
        return self._table

    @staticmethod
    def _collect_pages(operation: Any, **kwargs: Any) -> list[dict[str, Any]]:  # pragma: no cover - DynamoDB
        """Run a ``Table.query``/``Table.scan`` call to completion.

        DynamoDB returns results one page at a time and signals that more
        remain via ``LastEvaluatedKey``; passing it back as
        ``ExclusiveStartKey`` fetches the next page. Reading only the
        first response would silently truncate the listing.

        Args:
            operation: bound ``table.query`` or ``table.scan``.
            **kwargs: request parameters forwarded on every page.

        Returns:
            Every item from every page, in the order DynamoDB returned them.
        """
        items: list[dict[str, Any]] = []
        while True:
            response = operation(**kwargs)
            items.extend(response.get("Items", []))
            last_key = response.get("LastEvaluatedKey")
            if not last_key:
                return items
            kwargs["ExclusiveStartKey"] = last_key

    # ------------------------------------------------------------------ #
    # protocol methods
    # ------------------------------------------------------------------ #

    def load_session(self, session_id: str) -> SessionState | None:  # pragma: no cover - DynamoDB
        """Fetch the session via ``get_item`` keyed on ``session_id``.

        Returns ``None`` when no item exists or its ``version`` does not
        match :data:`mcp.mission.SCHEMA_VERSION` (logging a warning in the
        latter case).

        NOTE(review): boto3 deserialises DynamoDB numbers as
        :class:`decimal.Decimal`; confirm callers that JSON-encode the
        returned session handle that.
        """
        table = self._get_table()
        response = table.get_item(Key={"session_id": session_id})
        item = response.get("Item")
        if item is None:
            return None
        if item.get("version") != SCHEMA_VERSION:
            logger.warning(
                "Refusing to load Mission session %s: unsupported schema version %r",
                session_id,
                item.get("version"),
            )
            return None
        return cast("SessionState", item)

    def save_session(self, session: SessionState) -> None:  # pragma: no cover - DynamoDB
        """Persist the session via ``put_item`` (atomic single-item write).

        Defense-in-depth strip — same rationale as
        :meth:`FilesystemBackend.save_session`. DynamoDB serialises
        through boto3's own type-converter, which raises
        :class:`TypeError` on an :class:`ast.Expression` just like
        the JSON path; stripping here keeps both backends symmetric.

        NOTE(review): boto3's type serializer also rejects Python
        ``float`` values (it requires :class:`decimal.Decimal`); if
        :class:`SessionState` can carry floats they must be converted
        before ``put_item`` — confirm against the schema.
        """
        from .validation import strip_private_fields

        table = self._get_table()
        table.put_item(Item=strip_private_fields(session))

    def list_sessions(
        self, filter: dict[str, Any] | None = None
    ) -> list[dict[str, Any]]:  # pragma: no cover - DynamoDB
        """Return summary dicts via the ``status-index`` GSI.

        When ``filter`` provides a ``status`` key, the call uses the GSI
        partition key directly. With no filter (or any other filter
        shape), this stub falls back to a table ``scan`` so that the
        method still returns the same summary shape as
        :meth:`FilesystemBackend.list_sessions`. Both paths follow
        ``LastEvaluatedKey`` so every matching item is returned, not just
        the first page.
        """
        from boto3.dynamodb.conditions import Key

        table = self._get_table()
        if filter and "status" in filter:
            items = self._collect_pages(
                table.query,
                IndexName="status-index",
                KeyConditionExpression=Key("status").eq(filter["status"]),
            )
        else:
            items = self._collect_pages(table.scan)

        return [
            {
                "session_id": item.get("session_id"),
                "status": item.get("status"),
                "created_at": item.get("created_at"),
                "iteration_count": len(item.get("iterations", []) or []),
            }
            for item in items
        ]

    def delete_session(self, session_id: str) -> bool:  # pragma: no cover - DynamoDB
        """Delete the session via ``delete_item`` (idempotent).

        Uses ``ReturnValues="ALL_OLD"`` so the call can distinguish a
        successful deletion from a no-op on a missing key, matching the
        :class:`FilesystemBackend` semantics where the return value
        signals whether anything was actually removed.
        """
        table = self._get_table()
        response = table.delete_item(
            Key={"session_id": session_id},
            ReturnValues="ALL_OLD",
        )
        return response.get("Attributes") is not None

479 

480 

# ---------------------------------------------------------------------- #
# resolver
# ---------------------------------------------------------------------- #

# Accepted values for the ``GCO_MISSION_STATE_BACKEND`` environment
# variable. Any other value falls back to ``"filesystem"`` — the same
# fallback rule as the ``GCO_MCP_TOOL_SEARCH`` precedent in
# ``mcp/server.py``.
_BACKEND_VALUES = frozenset(("filesystem", "dynamodb"))

# Process-wide backend singleton, filled in by the first ``get_backend()``
# call. ``GCO_MISSION_STATE_BACKEND`` is therefore read once, on first use
# rather than at import time, and later changes to the variable are not
# picked up. Reusing one instance also keeps
# ``FilesystemBackend._root_initialized`` warm across callers.
_BACKEND_INSTANCE: MissionStateBackend | None = None

498 

499 

def get_backend() -> MissionStateBackend:
    """Return the configured Mission state backend, lazily constructed.

    Reads ``GCO_MISSION_STATE_BACKEND`` on first call (whitespace-stripped
    and lower-cased). Recognised values are those in ``_BACKEND_VALUES``:
    ``"filesystem"`` (default) and ``"dynamodb"``. Any other value logs a
    single warning naming the unrecognised input and falls back to
    :class:`FilesystemBackend`, matching the unknown-value handling for
    ``GCO_MCP_TOOL_SEARCH`` in ``mcp/server.py``. The resolved backend is
    cached at module scope so subsequent calls return the same instance.
    """
    global _BACKEND_INSTANCE
    if _BACKEND_INSTANCE is not None:
        return _BACKEND_INSTANCE

    raw = os.environ.get("GCO_MISSION_STATE_BACKEND", "filesystem").strip().lower()
    # Validate against the single declared set of accepted values instead
    # of re-listing them as literals here.
    if raw not in _BACKEND_VALUES:
        logger.warning(
            "Unrecognised GCO_MISSION_STATE_BACKEND value %r; falling back to filesystem",
            raw,
        )
        raw = "filesystem"

    backend: MissionStateBackend
    if raw == "dynamodb":
        backend = DynamoDBBackend()  # pragma: no cover - boto3 path
    else:
        backend = FilesystemBackend()

    _BACKEND_INSTANCE = backend
    return backend