Coverage for mcp/tools/mission.py: 96%
180 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
1"""Mission goal-directed iteration loop tools.
3The whole module body is gated by :data:`feature_flags.FLAG_MISSION` so
4the nine ``mission_*`` tool decorators only fire when
5``GCO_ENABLE_MISSION=true``. With the flag unset, this module imports
6cleanly and FastMCP never sees the tools.
8[gated by GCO_ENABLE_MISSION]
9"""
11from __future__ import annotations
13import contextlib
14import json
15import secrets
16import sys
17from collections.abc import Mapping, Sequence
18from datetime import UTC, datetime
19from pathlib import Path
20from typing import Any, cast
22from audit import audit_logged
23from feature_flags import FLAG_MISSION, is_enabled
24from server import mcp
# Mission package lives under ``mcp/mission/``; the path-injection
# pattern matches the rest of the MCP module surface so ``import
# mission.*`` resolves without making the ``mcp`` directory a package.
29sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
32def _try_get_context() -> Any | None:
33 """Return the active FastMCP Context if inside a request, else ``None``.
35 Mirrors :func:`mcp.tools.jobs._ctx_warning`: wraps the optional
36 ``fastmcp.server.dependencies.get_context`` import so the helper
37 works in unit tests that don't go through an MCP request — those
38 raise ``RuntimeError`` from ``get_context()``, which we swallow.
39 """
40 try:
41 from fastmcp.server.dependencies import get_context
43 return get_context()
44 except Exception:
45 return None
# Module body is entirely gated by the feature flag. When the flag is
# unset, none of the tool decorators below fire and FastMCP never sees
# the registrations.
51if is_enabled(FLAG_MISSION):
52 from mission import (
53 sampling as mission_sampling,
54 )
55 from mission import (
56 state as mission_state,
57 )
58 from mission import (
59 validation as mission_validation,
60 )
61 from mission.decide import decide_verdict
62 from mission.engine import MissionEngineError
63 from mission.types import SCHEMA_VERSION, TERMINAL_STATES, SessionState
64 from mission.validation import MissionValidationError
66 # ------------------------------------------------------------------ #
67 # Registry introspection helpers
68 # ------------------------------------------------------------------ #
async def _registered_tools_dict() -> dict[str, Any]:
    """Build a ``{tool name: Tool object}`` mapping of every registered tool.

    The listing comes from ``mcp._list_tools()``, which bypasses the
    catalog-replacement transforms (BM25 / Code Mode) and exposes the
    underlying registry. If that call raises for any reason (for
    example FastMCP API drift), an empty mapping is returned instead;
    downstream validators read an empty dict as "nothing registered",
    which is benign for sessions that do not rely on tag-based cost
    gating.
    """
    try:
        listed = await mcp._list_tools()
    except Exception:
        # Degrade to an empty registry rather than failing the caller.
        return {}

    by_name: dict[str, Any] = {}
    for tool in listed:
        by_name[tool.name] = tool
    return by_name
async def _registered_tool_tags() -> dict[str, set[str]]:
    """Build a ``{tool name: set of tags}`` mapping for every registered tool.

    Built on :func:`_registered_tools_dict`, so it inherits that helper's
    empty-mapping fallback. A tool whose ``tags`` attribute is missing or
    falsy maps to an empty set, so the budget validator treats it as
    non-cost-incurring.
    """
    registry = await _registered_tools_dict()
    return {
        tool_name: set(getattr(tool, "tags", None) or ())
        for tool_name, tool in registry.items()
    }
async def _tool_docstrings_dict() -> dict[str, str]:
    """Build a ``{tool name: description}`` mapping for sampling prompts.

    A tool with a missing or falsy ``description`` attribute maps to an
    empty string.
    """
    registry = await _registered_tools_dict()
    descriptions: dict[str, str] = {}
    for tool_name, tool in registry.items():
        text = getattr(tool, "description", "")
        descriptions[tool_name] = text if text else ""
    return descriptions
106 # ------------------------------------------------------------------ #
107 # Session helpers
108 # ------------------------------------------------------------------ #
def _strip_private_fields(session: Mapping[str, Any]) -> dict[str, Any]:
    """Produce a copy of ``session`` without private criterion keys.

    Delegates directly to :func:`mission.validation.strip_private_fields`.
    The short module-private alias keeps call sites in this file
    (``mission_start``, ``mission_complete``, and the other tools that
    persist a session) readable without the fully qualified helper name.
    """
    cleaned: dict[str, Any] = mission_validation.strip_private_fields(session)
    return cleaned
def _strip_private_fields_iterations(
    iterations: Sequence[Mapping[str, Any]],
) -> list[dict[str, Any]]:
    """Produce iteration records with private ``criteria_evaluation`` keys removed.

    Delegates directly to
    :func:`mission.validation.strip_private_fields_iterations`.
    """
    cleaned: list[dict[str, Any]] = mission_validation.strip_private_fields_iterations(
        iterations
    )
    return cleaned
131 # ------------------------------------------------------------------ #
132 # Engine wiring
133 # ------------------------------------------------------------------ #
135 # The engine factory itself lives in :mod:`mcp.mission._engine_factory`
136 # so the CLI can reuse the same wiring without crossing the
137 # ``GCO_ENABLE_MISSION`` gate. We keep a thin alias here so call
138 # sites in this module stay readable without having to spell out
139 # the long import path.
140 from mission._engine_factory import build_mission_engine as _build_engine # noqa: PLC0415
142 # ------------------------------------------------------------------ #
143 # mission_start
144 # ------------------------------------------------------------------ #
@mcp.tool(tags={"low-risk", "mission"})
@audit_logged
async def mission_start(
    directive: str,
    criteria: list[dict[str, Any]],
    budget: dict[str, Any],
    tool_allowlist: list[str] | None = None,
    checkpoint_cadence: dict[str, Any] | None = None,
    stagnation_threshold: int = 3,
    use_sampling: bool | None = None,
    allow_scripted_strategies: bool = False,
    sampling_model_preferences: dict[str, Any] | None = None,
    allow_all_tools: bool = False,
) -> str:
    """[gated by GCO_ENABLE_MISSION] Start a new Mission session.

    Args:
        directive: Natural-language goal description.
        criteria: List of success criterion dicts (``metric_threshold``,
            ``event``, or ``predicate`` kinds).
        budget: Budget controls dict with ``max_iterations`` and
            ``max_wall_clock_seconds``. Cost guardrails live
            out-of-band via AWS Budgets and Cost Anomaly Detection.
        tool_allowlist: List of tool names the session may invoke.
            Optional; omit it when ``allow_all_tools`` is set.
        checkpoint_cadence: Optional cadence dict (default
            ``{"kind": "every_iteration"}``).
        stagnation_threshold: Iterations of no progress before
            terminating (default 3).
        use_sampling: Three-state opt-in. ``None`` auto-detects,
            ``True`` opts in explicitly, ``False`` opts out.
        allow_scripted_strategies: When True, the session permits
            scripted strategies (validated via the sandbox AST).
        sampling_model_preferences: Optional FastMCP
            ``ModelPreferences`` payload forwarded to MCP sampling.
        allow_all_tools: When True (default ``False``), resolve the
            session's allowlist to every currently-registered tool
            minus the ``mission_*`` control tools, instead of an
            explicit ``tool_allowlist``. Mutually exclusive with a
            non-empty ``tool_allowlist``.

    Returns a JSON string with the new ``session_id`` and the
    resolved sampling state, or an error envelope.
    """
    # Fall back to the documented default cadence when none was supplied.
    cadence_input = (
        {"kind": "every_iteration"} if checkpoint_cadence is None else checkpoint_cadence
    )

    # Validation runs in a fixed order; the first failure becomes the
    # error envelope returned to the caller.
    try:
        clean_directive = mission_validation.validate_directive(directive)
        clean_criteria = mission_validation.validate_criteria(criteria)
        tools_by_name = await _registered_tools_dict()
        tags_by_name = await _registered_tool_tags()
        # Tools tagged "mission" are the control tools kept out of the
        # allow-all expansion.
        mission_control = {
            name for name, tags in tags_by_name.items() if "mission" in tags
        }
        effective_allowlist = mission_validation.resolve_effective_allowlist(
            allow_all_tools=allow_all_tools,
            explicit_allowlist=tool_allowlist,
            registered_tools=tools_by_name,
            control_tools=mission_control,
        )
        clean_budget = mission_validation.validate_budget(
            budget, effective_allowlist, tags_by_name
        )
        clean_cadence = mission_validation.validate_cadence(cadence_input)
    except MissionValidationError as err:
        return json.dumps({"code": err.code, "details": err.details})

    request_ctx = _try_get_context()
    sampling_enabled, sampling_backend = mission_sampling.resolve_sampling_state(
        request_ctx, use_sampling
    )

    session_id = f"mission-{secrets.token_hex(8)}"
    session: dict[str, Any] = {
        "version": SCHEMA_VERSION,
        "session_id": session_id,
        "directive_text": clean_directive,
        "criteria": clean_criteria,
        "budget": clean_budget,
        "tool_allowlist": effective_allowlist,
        "checkpoint_cadence": clean_cadence,
        "stagnation_threshold": stagnation_threshold,
        "use_sampling": sampling_enabled,
        "sampling_backend_resolved": sampling_backend,
        "allow_scripted_strategies": bool(allow_scripted_strategies),
        "status": "pending",
        "created_at": datetime.now(UTC).isoformat(),
        "iterations": [],
        "no_progress_counter": 0,
    }
    if sampling_model_preferences is not None:
        session["sampling_model_preferences"] = sampling_model_preferences

    backend = mission_state.get_backend()
    # Private fields are stripped before persistence: the existing note
    # explains that validated predicate criteria carry a cached
    # ``_parsed_ast`` node that is not JSON-serialisable, and that the
    # engine re-parses from the ``expression`` string on load.
    persistable = cast("SessionState", _strip_private_fields(session))
    backend.save_session(persistable)

    response = {
        "session_id": session_id,
        "status": "pending",
        "use_sampling": sampling_enabled,
        "sampling_backend_resolved": sampling_backend,
    }
    return json.dumps(response)
258 # ------------------------------------------------------------------ #
259 # mission_status
260 # ------------------------------------------------------------------ #
@mcp.tool(tags={"safe", "mission"})
@audit_logged
async def mission_status(session_id: str) -> str:
    """[gated by GCO_ENABLE_MISSION] Get the full state of a Mission session.

    Args:
        session_id: The session identifier returned by
            :func:`mission_start`.

    Returns the full session JSON or an error envelope when the
    session is unknown.
    """
    session = mission_state.get_backend().load_session(session_id)
    if session is not None:
        # ``default=str`` stringifies any value json cannot encode natively.
        return json.dumps(_strip_private_fields(session), default=str)

    not_found = {"code": "session_not_found", "details": {"session_id": session_id}}
    return json.dumps(not_found)
286 # ------------------------------------------------------------------ #
287 # mission_iterate
288 # ------------------------------------------------------------------ #
@mcp.tool(tags={"low-risk", "mission"})
@audit_logged
async def mission_iterate(
    session_id: str,
    max_iterations_this_call: int = 1,
) -> str:
    """[gated by GCO_ENABLE_MISSION] Run iteration(s) on a Mission session.

    Args:
        session_id: The session to iterate.
        max_iterations_this_call: How many iterations to run before
            returning (default 1). The loop exits early on a
            terminal verdict (``complete`` or ``terminate``).

    Returns a JSON object with a ``session_id`` and an
    ``iterations`` list of iteration summaries (verdict, reason,
    iteration index). On engine errors, returns an error envelope
    with whatever summaries had accumulated before the failure.
    """
    # Reject a non-positive iteration count before touching the backend.
    if max_iterations_this_call < 1:
        invalid = {
            "code": "invalid_argument",
            "details": {"reason": "max_iterations_this_call must be >= 1"},
        }
        return json.dumps(invalid)

    ctx = _try_get_context()
    session = mission_state.get_backend().load_session(session_id)
    if session is None:
        return json.dumps(
            {"code": "session_not_found", "details": {"session_id": session_id}}
        )

    engine = await _build_engine(session, ctx)

    summary_keys = ("iteration_index", "verdict", "verdict_reason")
    stop_verdicts = ("complete", "terminate")
    collected: list[dict[str, Any]] = []
    for _ in range(max_iterations_this_call):
        try:
            record = await engine.run_iteration(session_id, ctx=ctx)
        except MissionEngineError as err:
            # Surface the engine's error code together with the
            # summaries gathered before the failure.
            failure = {
                "code": err.code,
                "details": {"session_id": session_id},
                "iterations": collected,
            }
            return json.dumps(failure)
        collected.append({key: record[key] for key in summary_keys})
        if record["verdict"] in stop_verdicts:
            break

    return json.dumps({"session_id": session_id, "iterations": collected})
355 # ------------------------------------------------------------------ #
356 # mission_checkpoint
357 # ------------------------------------------------------------------ #
@mcp.tool(tags={"safe", "mission"})
@audit_logged
async def mission_checkpoint(session_id: str) -> str:
    """[gated by GCO_ENABLE_MISSION] Re-run the verdict cascade on the latest iteration.

    Args:
        session_id: The session whose latest iteration should be
            re-evaluated.

    Returns a JSON object with the freshly-computed verdict and
    reason. Does not run the propose / execute / observe phases —
    only the deterministic decide cascade. Returns an error envelope
    when the session is missing or has no iterations yet.
    """
    session = mission_state.get_backend().load_session(session_id)
    if session is None:
        return json.dumps(
            {"code": "session_not_found", "details": {"session_id": session_id}}
        )

    # A missing or empty ``iterations`` entry means nothing to re-evaluate.
    recorded = session.get("iterations") or []
    if not recorded:
        return json.dumps(
            {"code": "no_iterations", "details": {"session_id": session_id}}
        )

    newest = recorded[-1]
    verdict, verdict_reason = decide_verdict(session, newest, datetime.now(UTC))
    payload = {
        "session_id": session_id,
        "iteration_index": newest["iteration_index"],
        "verdict": verdict,
        "verdict_reason": verdict_reason,
    }
    return json.dumps(payload)
402 # ------------------------------------------------------------------ #
403 # mission_complete
404 # ------------------------------------------------------------------ #
@mcp.tool(tags={"low-risk", "mission"})
@audit_logged
async def mission_complete(session_id: str, reason: str = "forced_complete") -> str:
    """[gated by GCO_ENABLE_MISSION] Force a Mission session into completed status.

    Args:
        session_id: The session to complete.
        reason: Free-form reason for the forced completion (default
            ``forced_complete``). Currently informational only: it is
            accepted for interface compatibility but is neither
            persisted on the session nor included in the response.

    Stamps a synthetic ``complete`` final verdict and an
    ``ended_at`` timestamp. Refuses sessions already in a terminal
    state.

    Returns a JSON object with the ``session_id`` and the new
    ``completed`` status, or an error envelope (``session_not_found``
    or ``session_terminal``).
    """
    del reason  # currently informational; the synthetic verdict is fixed
    backend = mission_state.get_backend()
    session = backend.load_session(session_id)
    if session is None:
        return json.dumps(
            {
                "code": "session_not_found",
                "details": {"session_id": session_id},
            }
        )
    if session["status"] in TERMINAL_STATES:
        # Terminal sessions are immutable; report the current status back.
        return json.dumps(
            {
                "code": "session_terminal",
                "details": {
                    "session_id": session_id,
                    "status": session["status"],
                },
            }
        )
    session["status"] = "completed"
    session["final_verdict"] = "complete"
    session["ended_at"] = datetime.now(UTC).isoformat()
    # Defensive strip — sessions loaded from the backend were
    # already private-field-clean, but a future change that
    # re-attaches ``_parsed_ast`` somewhere in the flow shouldn't
    # silently break persistence. ``_strip_private_fields`` is
    # cheap and idempotent on already-clean inputs.
    backend.save_session(cast("SessionState", _strip_private_fields(session)))
    return json.dumps({"session_id": session_id, "status": "completed"})
451 # ------------------------------------------------------------------ #
452 # mission_abort
453 # ------------------------------------------------------------------ #
@mcp.tool(tags={"low-risk", "mission"})
@audit_logged
async def mission_abort(session_id: str, pause: bool = False) -> str:
    """[gated by GCO_ENABLE_MISSION] Pause or terminate a Mission session.

    Args:
        session_id: The session to transition.
        pause: When True, transition to ``paused``. When False
            (default), transition to ``terminated`` with a
            synthetic ``terminate`` final verdict.

    Refuses sessions already in a terminal state. Best-effort
    emits a ``ctx.warning`` when the active request context is
    available so the operator sees the side-effect; the warning
    names the status the session actually moved to (``paused`` or
    ``terminated``).

    Returns a JSON object with the ``session_id`` and the resulting
    status, or an error envelope (``session_not_found`` or
    ``session_terminal``).
    """
    backend = mission_state.get_backend()
    session = backend.load_session(session_id)
    if session is None:
        return json.dumps(
            {
                "code": "session_not_found",
                "details": {"session_id": session_id},
            }
        )
    if session["status"] in TERMINAL_STATES:
        return json.dumps(
            {
                "code": "session_terminal",
                "details": {
                    "session_id": session_id,
                    "status": session["status"],
                },
            }
        )
    if pause:
        # Pausing is non-terminal: no final verdict or end timestamp.
        session["status"] = "paused"
    else:
        session["status"] = "terminated"
        session["final_verdict"] = "terminate"
        session["ended_at"] = datetime.now(UTC).isoformat()
    ctx = _try_get_context()
    if ctx is not None:
        # Best-effort notification; a failing ``ctx.warning`` must not
        # block the state transition. The message is derived from the
        # new status so a pause is no longer reported as a termination.
        with contextlib.suppress(Exception):
            await ctx.warning(
                f"Mission session {session_id} {session['status']} by operator."
            )
    # Defensive strip before persisting: idempotent on sessions that are
    # already free of private fields.
    backend.save_session(cast("SessionState", _strip_private_fields(session)))
    return json.dumps({"session_id": session_id, "status": session["status"]})
503 # ------------------------------------------------------------------ #
504 # mission_resume
505 # ------------------------------------------------------------------ #
@mcp.tool(tags={"low-risk", "mission"})
@audit_logged
async def mission_resume(session_id: str) -> str:
    """[gated by GCO_ENABLE_MISSION] Resume a paused Mission session.

    Args:
        session_id: The session to resume.

    Transitions ``paused -> running``. Returns an error envelope
    when the session is missing or not in ``paused`` state.
    """
    backend = mission_state.get_backend()
    session = backend.load_session(session_id)
    if session is None:
        return json.dumps(
            {"code": "session_not_found", "details": {"session_id": session_id}}
        )

    # Only a paused session may be resumed; anything else is reported back.
    current_status = session["status"]
    if current_status != "paused":
        rejection = {
            "code": "not_paused",
            "details": {"session_id": session_id, "status": current_status},
        }
        return json.dumps(rejection)

    session["status"] = "running"
    # Defensive strip before persisting: idempotent on sessions that are
    # already free of private fields.
    persistable = cast("SessionState", _strip_private_fields(session))
    backend.save_session(persistable)
    return json.dumps({"session_id": session_id, "status": "running"})
542 # ------------------------------------------------------------------ #
543 # mission_history
544 # ------------------------------------------------------------------ #
@mcp.tool(tags={"safe", "mission"})
@audit_logged
async def mission_history(session_id: str, format: str = "summary") -> str:
    """[gated by GCO_ENABLE_MISSION] Get iteration history for a Mission session.

    Args:
        session_id: The session whose history to retrieve.
        format: ``"summary"`` (default) returns a compact list of
            ``{iteration_index, verdict, verdict_reason,
            started_at, ended_at}`` dicts; ``"full"`` returns the
            complete iteration record dicts.

    Returns a JSON object with an ``iterations`` list, or an error
    envelope when the session is unknown.
    """
    session = mission_state.get_backend().load_session(session_id)
    if session is None:
        return json.dumps(
            {"code": "session_not_found", "details": {"session_id": session_id}}
        )

    records = session.get("iterations") or []

    # Any value other than "full" falls through to the compact summary.
    if format != "full":
        summary_fields = (
            "iteration_index",
            "verdict",
            "verdict_reason",
            "started_at",
            "ended_at",
        )
        compact = [
            {field: record.get(field) for field in summary_fields} for record in records
        ]
        return json.dumps({"iterations": compact})

    full_records = _strip_private_fields_iterations(records)
    return json.dumps({"iterations": full_records}, default=str)
588 # ------------------------------------------------------------------ #
589 # mission_list
590 # ------------------------------------------------------------------ #
@mcp.tool(tags={"safe", "mission"})
@audit_logged
async def mission_list(status: str | None = None) -> str:
    """[gated by GCO_ENABLE_MISSION] List Mission sessions.

    Args:
        status: Optional filter. Recognised values are ``running``,
            ``completed``, ``terminated``, ``failed``, ``paused``,
            ``pending``. Omit to list every known session.

    Returns a JSON object with a ``sessions`` list of summary dicts
    (``session_id``, ``status``, ``created_at``,
    ``iteration_count``).
    """
    backend = mission_state.get_backend()
    # A ``None`` or empty status means "no filter".
    if status:
        listing = backend.list_sessions({"status": status})
    else:
        listing = backend.list_sessions(None)
    return json.dumps({"sessions": listing})