Coverage for mcp/mission/state.py: 92%
128 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
1"""Persistence backends for the Mission goal-directed iteration loop.

This module defines the :class:`MissionStateBackend` protocol — the narrow
interface the engine and tool wrappers depend on for loading, saving,
listing, and deleting :class:`~mcp.mission.types.SessionState` records.
The concrete implementations (:class:`FilesystemBackend` and
:class:`DynamoDBBackend`) and the :func:`get_backend` resolver are defined
further down in this file. The protocol is declared with
:func:`typing.runtime_checkable` so tests can assert backend conformance
with ``isinstance`` rather than relying on duck-typed call sites.
11"""
13from __future__ import annotations
15import contextlib
16import json
17import logging
18import os
19import tempfile
20from pathlib import Path
21from typing import Any, Protocol, cast, runtime_checkable
23from . import SCHEMA_VERSION
24from .types import SessionState
26logger = logging.getLogger(__name__)
@runtime_checkable
class MissionStateBackend(Protocol):
    """Persistence interface every Mission state backend implements.

    A backend stores complete :class:`SessionState` payloads under their
    ``session_id``. How a payload is serialised, how atomic a write is,
    and who may access the store are the implementation's concern;
    callers only see key-value storage plus a listing operation that
    yields small metadata dicts instead of whole session bodies.
    """

    def load_session(self, session_id: str) -> SessionState | None:
        """Look up ``session_id`` and return its record, or ``None``.

        ``None`` covers two cases the caller cannot tell apart: no record
        is stored under ``session_id``, or the stored record's
        ``version`` differs from the current
        :data:`mcp.mission.SCHEMA_VERSION`. Both are treated as a
        missing session.
        """
        ...

    def save_session(self, session: SessionState) -> None:
        """Store ``session`` under the value of its ``session_id`` field.

        A write should look atomic to readers: a :meth:`load_session`
        running at the same time observes either the previous record or
        the new one, never a half-written one.
        """
        ...

    def list_sessions(self, filter: dict[str, Any] | None = None) -> list[dict[str, Any]]:
        """List lightweight metadata for the sessions selected by ``filter``.

        Every entry is a dict of identifying fields (``session_id``,
        ``status``, ``created_at``, and similar), not a full
        :class:`SessionState`. The meaning of ``filter`` is defined by
        the implementation; ``None`` means "every session this backend
        can see".
        """
        ...

    def delete_session(self, session_id: str) -> bool:
        """Delete ``session_id``; return ``True`` only if a record was removed.

        A missing key yields ``False`` instead of an exception, which
        lets callers use ``delete_session`` as an idempotent cleanup
        primitive.
        """
        ...
class FilesystemBackend:
    """JSON-on-disk implementation of :class:`MissionStateBackend`.

    Each session is persisted as ``<root>/<session_id>.json`` with its
    matching :class:`~mcp.mission.types.SessionState` payload; the
    Final_Report (when present) lives alongside it as
    ``<root>/<session_id>.report.json``. Writes go through the standard
    "temp file in the same directory, ``fsync``, then ``os.replace``"
    pattern so a reader concurrent with a writer always sees either the
    prior version of the file or the new one — never a partial JSON
    document. The temp file lives in the same directory as the final
    target so ``os.replace`` is a same-filesystem rename and therefore
    atomic on POSIX.

    On POSIX systems the root directory is created (and re-asserted) at
    mode ``0o700`` and every session file is written at mode ``0o600``
    so persisted state is unreadable to other local users. Permission
    calls are gated on ``os.name != "nt"`` because the POSIX permission
    model does not apply on Windows; the backend still works on
    Windows, just without the explicit mode tightening.
    """

    def __init__(self, root: Path | None = None) -> None:
        """Remember the storage root without touching the filesystem.

        Args:
            root: Directory that holds the session files. Defaults to
                ``~/.gco/missions`` when ``None``.
        """
        self.root = root if root is not None else Path.home() / ".gco" / "missions"
        self._root_initialized = False

    # ------------------------------------------------------------------ #
    # internals
    # ------------------------------------------------------------------ #

    def _ensure_root(self) -> None:
        """Create the root directory on first use, idempotently.

        We defer the ``mkdir`` to the first write so simply constructing
        a backend (e.g. in the resolver in :func:`get_backend`) does not
        eagerly create ``~/.gco/missions`` on a host that ends up using
        a different backend.
        """
        if self._root_initialized:
            return
        # Ask for 0o700 at creation time so a freshly created leaf is
        # never briefly readable by others; the chmod below re-asserts
        # the mode on a directory that already existed.
        self.root.mkdir(mode=0o700, parents=True, exist_ok=True)
        if os.name != "nt":
            with contextlib.suppress(OSError):
                # Best-effort tightening: a directory we already own with
                # different permissions is still safer to use than to
                # refuse the write outright. 0o700 (owner-only) is
                # intentional for ~/.gco/missions: session JSON contains
                # operator-supplied directives, criteria, observations,
                # and tool-call results that should not be readable by
                # other local users.
                # nosemgrep: python.lang.security.audit.insecure-file-permissions.insecure-file-permissions
                os.chmod(self.root, 0o700)
        self._root_initialized = True

    def _session_path(self, session_id: str) -> Path:
        """Return the path of the session JSON for ``session_id``."""
        # NOTE(review): ``session_id`` is interpolated into the file name
        # without sanitisation, so a value containing path separators
        # would resolve outside ``root`` — confirm callers validate it.
        return self.root / f"{session_id}.json"

    def _report_path(self, session_id: str) -> Path:
        """Return the path of the sibling report file for ``session_id``."""
        return self.root / f"{session_id}.report.json"

    # ------------------------------------------------------------------ #
    # protocol methods
    # ------------------------------------------------------------------ #

    def load_session(self, session_id: str) -> SessionState | None:
        """Return the persisted session or ``None`` for missing/unsupported.

        Returns ``None`` when the file does not exist or cannot be read
        (including when the root directory has not been created yet),
        when its bytes are not valid UTF-8 or not valid JSON, when the
        JSON document is not an object, or when the on-disk ``version``
        field does not match :data:`mcp.mission.SCHEMA_VERSION`. Version
        mismatches log a single warning naming the unsupported value so
        an operator can spot stale state without having to grep the
        directory by hand.
        """
        path = self._session_path(session_id)
        try:
            payload = json.loads(path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # OSError covers a missing or unreadable file. ValueError
            # covers both ``json.JSONDecodeError`` and the
            # ``UnicodeDecodeError`` that ``read_text`` raises on bytes
            # that are not valid UTF-8.
            return None

        if not isinstance(payload, dict):
            return None

        version = payload.get("version")
        if version != SCHEMA_VERSION:
            logger.warning(
                "Refusing to load Mission session %s: unsupported schema version %r",
                session_id,
                version,
            )
            return None

        return cast("SessionState", payload)

    def save_session(self, session: SessionState) -> None:
        """Persist ``session`` atomically to ``<root>/<session_id>.json``.

        Opens a temp file in the same directory, dumps JSON, flushes and
        ``fsync``s, applies POSIX mode ``0o600`` (when supported), then
        ``os.replace``s onto the final path. A failure at any point
        before the replace removes the temp file (best effort) and never
        touches the existing final file, so the previously-persisted
        state remains loadable.

        Defense-in-depth strip. The validators in
        :mod:`mcp.mission.validation` attach a cached
        :class:`ast.Expression` under ``_parsed_ast`` on every
        ``predicate`` criterion. That object is not JSON-serialisable;
        a caller that hands a freshly-validated session straight to
        ``save_session`` without first stripping the cache would
        raise :class:`TypeError` at ``json.dump`` time. We strip
        unconditionally here so every persistence path stays correct
        regardless of which caller forgot.

        Raises:
            OSError: The underlying filesystem error (disk full,
                permission denied, ...), propagated unchanged so its
                ``errno``, ``filename`` and concrete subclass survive.
        """
        # Local import: ``mission.validation`` is part of the same
        # package so this isn't a cross-package edge, just a
        # dependency-direction kept lazy to keep the eager import
        # surface of ``state`` minimal.
        from .validation import strip_private_fields

        self._ensure_root()
        session_id = session["session_id"]
        final = self._session_path(session_id)
        cleaned = cast("SessionState", strip_private_fields(session))

        tmp = tempfile.NamedTemporaryFile(  # noqa: SIM115 - closed by the with-block below
            mode="w",
            encoding="utf-8",
            dir=str(self.root),
            prefix=f"{session_id}.",
            suffix=".json.tmp",
            delete=False,
        )
        replaced = False
        try:
            with tmp:
                json.dump(cleaned, tmp)
                tmp.flush()
                os.fsync(tmp.fileno())

            if os.name != "nt":
                with contextlib.suppress(OSError):
                    # Same rationale as in ``_ensure_root`` — proceed
                    # with the replace rather than abandoning a write
                    # we already fsynced.
                    os.chmod(tmp.name, 0o600)

            os.replace(tmp.name, final)
            replaced = True
        finally:
            if not replaced:
                # Do not leave ``*.json.tmp`` debris behind after a
                # failed write; the original exception keeps propagating.
                with contextlib.suppress(OSError):
                    os.unlink(tmp.name)

    def list_sessions(self, filter: dict[str, Any] | None = None) -> list[dict[str, Any]]:
        """Return summary dicts for every parseable session under ``root``.

        Each entry has the shape ``{"session_id", "status", "created_at",
        "iteration_count"}``. Files that cannot be read or parsed, that
        do not hold a JSON object, whose version is unsupported, or
        whose ``iterations`` value is not a list are skipped (one
        debug-log line per skip) so a single corrupt file cannot block
        listing the rest. A missing ``session_id`` falls back to the
        file stem; missing ``status`` / ``created_at`` are reported as
        ``None``. Entries are returned in sorted file-name order.

        ``filter`` currently supports the ``status`` key only; callers
        pass ``{"status": "running"}`` to narrow the list.
        """
        if not self.root.exists():
            return []

        results: list[dict[str, Any]] = []
        for path in sorted(self.root.glob("*.json")):
            # Skip the sibling report files — they share the directory
            # but are not session payloads.
            if path.name.endswith(".report.json"):
                continue
            try:
                payload = json.loads(path.read_text(encoding="utf-8"))
            except (OSError, ValueError):
                logger.debug("Skipping unreadable Mission file: %s", path)
                continue
            if not isinstance(payload, dict):
                logger.debug("Skipping non-object Mission file: %s", path)
                continue
            if payload.get("version") != SCHEMA_VERSION:
                logger.debug(
                    "Skipping Mission file %s with unknown version %r",
                    path,
                    payload.get("version"),
                )
                continue

            # A missing or null ``iterations`` counts as zero; any other
            # non-list value marks the file as malformed.
            iterations = payload.get("iterations") or []
            if not isinstance(iterations, list):
                logger.debug("Skipping Mission file %s with malformed iterations", path)
                continue

            results.append(
                {
                    "session_id": payload.get("session_id", path.stem),
                    "status": payload.get("status"),
                    "created_at": payload.get("created_at"),
                    "iteration_count": len(iterations),
                }
            )

        if filter and "status" in filter:
            wanted = filter["status"]
            results = [entry for entry in results if entry.get("status") == wanted]

        return results

    def delete_session(self, session_id: str) -> bool:
        """Remove the session JSON and any matching report file.

        Returns ``True`` when at least one of the two files existed and
        was removed; ``False`` when neither was present (including when
        the root directory has never been created). The two removals are
        independent so a stale ``.report.json`` left behind by an
        earlier crash is still cleaned up even when the session JSON has
        already been deleted.

        Raises:
            OSError: Any removal failure other than "file not found"; a
                present-but-undeletable file must not masquerade as
                "no record existed".
        """
        if not self.root.exists():
            return False

        removed = False
        for path in (self._session_path(session_id), self._report_path(session_id)):
            try:
                os.remove(path)
            except FileNotFoundError:
                continue
            removed = True
        return removed
class DynamoDBBackend:
    """DynamoDB-backed implementation of :class:`MissionStateBackend`.

    Stub implementation: this class declares the protocol shape so the
    global stack's CDK wiring can reference the backend type and so the
    resolver in slice 3.4 can construct it, but no automated test in
    this slice exercises any code path that touches AWS. Every method
    that talks to AWS is excluded from coverage with a
    ``# pragma: no cover`` marker and the corresponding tests are
    skipped (see slice 3.6). A separate, AWS-credentialed smoke test
    validates the real behaviour.

    Item schema mirrors the :class:`SessionState` TypedDict one-to-one:
    the partition key is ``session_id`` and ``status`` plus ``created_at``
    feed a ``status-index`` GSI so :meth:`list_sessions` can filter by
    status without a full table scan. ``put_item`` is atomic by virtue
    of DynamoDB's single-item write semantics, so the temp-file dance
    used by :class:`FilesystemBackend` is unnecessary here.

    Table-name resolution is lazy: when the constructor's ``table_name``
    argument is ``None``, the table name is fetched from SSM at
    ``/{project_name}/missions-table-name`` on the first call that
    needs it (not at construction time). This matches the precedent
    pattern in ``cli/models.py`` and lets unit tests construct a
    ``DynamoDBBackend()`` on a host without AWS credentials without
    triggering an SSM call. ``project_name`` is read from the
    ``GCO_PROJECT_NAME`` environment variable, defaulting to ``"gco"``
    so a fresh checkout (or CI run without the env var set) lines up
    with the default project name in ``cli/config.py``.

    The SSM lookup goes through :func:`gco.services.aws_ssm.get_ssm_parameter`,
    the shared helper that consolidates the pattern previously duplicated
    across ``cli/models.py``, ``cli/analytics_user_mgmt.py``, and
    ``gco/services/health_monitor.py``. Putting the helper under
    ``gco/services/`` (rather than ``cli/aws_client.py``) keeps
    ``mcp/`` free of the forbidden ``mcp -> cli`` import edge while
    still letting every backend share one implementation.
    """

    def __init__(self, table_name: str | None = None) -> None:
        """Store the optional table name; no AWS call is made here.

        Args:
            table_name: Explicit DynamoDB table name. When ``None`` the
                name is resolved from SSM on first use.
        """
        self._table_name: str | None = table_name
        self._table: Any = None  # boto3 Table resource, lazily constructed

    # ------------------------------------------------------------------ #
    # internals
    # ------------------------------------------------------------------ #

    def _resolve_table_name(self) -> str:  # pragma: no cover - boto3 / SSM
        """Return the cached table name, fetching from SSM on first call.

        Reads ``GCO_PROJECT_NAME`` (default ``"gco"``) to build the SSM
        parameter path ``/{project_name}/missions-table-name``. The
        value is cached on the instance so subsequent method calls do
        not re-hit SSM.
        """
        if self._table_name is not None:
            return self._table_name

        from gco.services.aws_ssm import get_ssm_parameter

        project_name = os.environ.get("GCO_PROJECT_NAME", "gco")
        param_name = f"/{project_name}/missions-table-name"

        self._table_name = get_ssm_parameter(param_name)
        return self._table_name

    def _get_table(self) -> Any:  # pragma: no cover - boto3 resource
        """Return the cached ``boto3`` Table resource, building it lazily."""
        if self._table is not None:
            return self._table

        import boto3

        self._table = boto3.resource("dynamodb").Table(self._resolve_table_name())
        return self._table

    @staticmethod
    def _fetch_all_items(fetch_page: Any, **request: Any) -> list[dict[str, Any]]:
        """Drain a paginated ``query``/``scan`` call into one item list.

        ``fetch_page`` is called with ``request`` repeatedly; each
        response's ``LastEvaluatedKey`` (when present) is passed back as
        ``ExclusiveStartKey`` until a response comes back without one.
        """
        items: list[dict[str, Any]] = []
        while True:
            response = fetch_page(**request)
            items.extend(response.get("Items", []))
            last_key = response.get("LastEvaluatedKey")
            if not last_key:
                return items
            request["ExclusiveStartKey"] = last_key

    # ------------------------------------------------------------------ #
    # protocol methods
    # ------------------------------------------------------------------ #

    def load_session(self, session_id: str) -> SessionState | None:  # pragma: no cover - DynamoDB
        """Fetch the session via ``get_item`` keyed on ``session_id``.

        Returns ``None`` when no item exists or when the item's
        ``version`` does not match :data:`mcp.mission.SCHEMA_VERSION`
        (the latter also logs a warning naming the stored version).
        """
        table = self._get_table()
        response = table.get_item(Key={"session_id": session_id})
        item = response.get("Item")
        if item is None:
            return None
        if item.get("version") != SCHEMA_VERSION:
            logger.warning(
                "Refusing to load Mission session %s: unsupported schema version %r",
                session_id,
                item.get("version"),
            )
            return None
        return cast("SessionState", item)

    def save_session(self, session: SessionState) -> None:  # pragma: no cover - DynamoDB
        """Persist the session via ``put_item`` (atomic single-item write).

        Defense-in-depth strip — same rationale as
        :meth:`FilesystemBackend.save_session`. DynamoDB serialises
        through boto3's own type-converter, which raises
        :class:`TypeError` on an :class:`ast.Expression` just like
        the JSON path; stripping here keeps both backends symmetric.
        """
        from .validation import strip_private_fields

        table = self._get_table()
        table.put_item(Item=strip_private_fields(session))

    def list_sessions(
        self, filter: dict[str, Any] | None = None
    ) -> list[dict[str, Any]]:  # pragma: no cover - DynamoDB
        """Return summary dicts, using the ``status-index`` GSI when possible.

        When ``filter`` provides a ``status`` key, the call queries the
        GSI on that partition key. With no filter (or any other filter
        shape), it falls back to a table ``scan`` so that the method
        still returns the same summary shape as
        :meth:`FilesystemBackend.list_sessions`. Both paths follow
        ``LastEvaluatedKey`` so results are not cut off at the first
        response page.
        """
        table = self._get_table()
        if filter and "status" in filter:
            # Imported only on the path that needs it, so the scan path
            # does not require ``boto3.dynamodb.conditions``.
            from boto3.dynamodb.conditions import Key

            items = self._fetch_all_items(
                table.query,
                IndexName="status-index",
                KeyConditionExpression=Key("status").eq(filter["status"]),
            )
        else:
            items = self._fetch_all_items(table.scan)

        return [
            {
                "session_id": item.get("session_id"),
                "status": item.get("status"),
                "created_at": item.get("created_at"),
                "iteration_count": len(item.get("iterations", []) or []),
            }
            for item in items
        ]

    def delete_session(self, session_id: str) -> bool:  # pragma: no cover - DynamoDB
        """Delete the session via ``delete_item`` (idempotent).

        Uses ``ReturnValues="ALL_OLD"`` so the call can distinguish a
        successful deletion from a no-op on a missing key, matching the
        :class:`FilesystemBackend` semantics where the return value
        signals whether anything was actually removed.
        """
        table = self._get_table()
        response = table.delete_item(
            Key={"session_id": session_id},
            ReturnValues="ALL_OLD",
        )
        return response.get("Attributes") is not None
# ---------------------------------------------------------------------- #
# resolver
# ---------------------------------------------------------------------- #

# Recognised values for the ``GCO_MISSION_STATE_BACKEND`` env var. Anything
# outside this set normalises to ``"filesystem"`` — same fallback rule as
# the ``GCO_MCP_TOOL_SEARCH`` precedent in ``mcp/server.py``.
_BACKEND_VALUES = frozenset({"filesystem", "dynamodb"})

# Cached backend instance, populated on the first call to ``get_backend()``.
# ``GCO_MISSION_STATE_BACKEND`` is read once, at that first call (not at
# import time), and the resulting instance is reused for every subsequent
# call. Env vars do not change at runtime in practice, and a shared
# instance keeps the ``FilesystemBackend._root_initialized`` cache hot
# across callers — the same resolve-once pattern used for
# ``GCO_MCP_TOOL_SEARCH`` in ``mcp/server.py``.
_BACKEND_INSTANCE: MissionStateBackend | None = None
def get_backend() -> MissionStateBackend:
    """Return the process-wide Mission state backend, creating it on demand.

    The first call reads ``GCO_MISSION_STATE_BACKEND`` (trimmed and
    lower-cased; default ``"filesystem"``). ``"dynamodb"`` selects
    :class:`DynamoDBBackend` and ``"filesystem"`` selects
    :class:`FilesystemBackend`. Any other value is reported once through
    a warning that names it and then handled as ``"filesystem"``,
    matching the unknown-value handling for ``GCO_MCP_TOOL_SEARCH`` in
    ``mcp/server.py``. The instance is stored at module scope, so later
    calls hand back the same object.
    """
    global _BACKEND_INSTANCE
    if _BACKEND_INSTANCE is None:
        choice = os.environ.get("GCO_MISSION_STATE_BACKEND", "filesystem").strip().lower()
        if choice not in _BACKEND_VALUES:
            logger.warning(
                "Unrecognised GCO_MISSION_STATE_BACKEND value %r; falling back to filesystem",
                choice,
            )
            choice = "filesystem"

        if choice == "dynamodb":
            _BACKEND_INSTANCE = DynamoDBBackend()  # pragma: no cover - boto3 path
        else:
            _BACKEND_INSTANCE = FilesystemBackend()
    return _BACKEND_INSTANCE