Coverage for mcp/tools/mission.py: 96%

180 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-15 15:07 +0000

1"""Mission goal-directed iteration loop tools. 

2 

3The whole module body is gated by :data:`feature_flags.FLAG_MISSION` so 

4the nine ``mission_*`` tool decorators only fire when 

5``GCO_ENABLE_MISSION=true``. With the flag unset, this module imports 

6cleanly and FastMCP never sees the tools. 

7 

8[gated by GCO_ENABLE_MISSION] 

9""" 

10 

11from __future__ import annotations 

12 

13import contextlib 

14import json 

15import secrets 

16import sys 

17from collections.abc import Mapping, Sequence 

18from datetime import UTC, datetime 

19from pathlib import Path 

20from typing import Any, cast 

21 

22from audit import audit_logged 

23from feature_flags import FLAG_MISSION, is_enabled 

24from server import mcp 

25 

26# Mission package lives under ``mcp/mission/``; the path-injection 

27# pattern matches the rest of the MCP module surface so ``import 

28# mission.*`` resolves without making the ``mcp`` directory a package. 

29sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) 

30 

31 

32def _try_get_context() -> Any | None: 

33 """Return the active FastMCP Context if inside a request, else ``None``. 

34 

35 Mirrors :func:`mcp.tools.jobs._ctx_warning`: wraps the optional 

36 ``fastmcp.server.dependencies.get_context`` import so the helper 

37 works in unit tests that don't go through an MCP request — those 

38 raise ``RuntimeError`` from ``get_context()``, which we swallow. 

39 """ 

40 try: 

41 from fastmcp.server.dependencies import get_context 

42 

43 return get_context() 

44 except Exception: 

45 return None 

46 

47 

48# Module body is entirely gated by the feature flag. When the flag is 

49# unset, none of the tool decorators below fire and FastMCP never sees 

50# the registrations. 

51if is_enabled(FLAG_MISSION): 

52 from mission import ( 

53 sampling as mission_sampling, 

54 ) 

55 from mission import ( 

56 state as mission_state, 

57 ) 

58 from mission import ( 

59 validation as mission_validation, 

60 ) 

61 from mission.decide import decide_verdict 

62 from mission.engine import MissionEngineError 

63 from mission.types import SCHEMA_VERSION, TERMINAL_STATES, SessionState 

64 from mission.validation import MissionValidationError 

65 

66 # ------------------------------------------------------------------ # 

67 # Registry introspection helpers 

68 # ------------------------------------------------------------------ # 

69 

async def _registered_tools_dict() -> dict[str, Any]:
    """Build a ``{tool name: Tool object}`` map of the registered tools.

    The lookup goes through ``mcp._list_tools()`` so the result is the
    underlying registry rather than the catalog as rewritten by the
    catalog-replacement transforms (BM25 / Code Mode). Any exception
    raised by that call is swallowed and an empty mapping is returned
    instead, which keeps this helper tolerant of FastMCP API drift;
    downstream validators read an empty dict as "nothing registered".
    """
    try:
        listed = await mcp._list_tools()
    except Exception:
        return {}

    by_name: dict[str, Any] = {}
    for tool in listed:
        by_name[tool.name] = tool
    return by_name

86 

async def _registered_tool_tags() -> dict[str, set[str]]:
    """Build a ``{tool name: set of tags}`` map of the registered tools.

    Built on top of :func:`_registered_tools_dict`, so it inherits the
    same "empty mapping on failure" behaviour. A tool whose ``tags``
    attribute is missing or falsy maps to an empty set, which the
    budget validator treats as non-cost-incurring.
    """
    tools_by_name = await _registered_tools_dict()
    return {
        tool_name: set(getattr(tool, "tags", None) or ())
        for tool_name, tool in tools_by_name.items()
    }

100 

async def _tool_docstrings_dict() -> dict[str, str]:
    """Build a ``{tool name: description}`` map for sampling prompts.

    A tool whose ``description`` attribute is missing or falsy maps to
    the empty string.
    """
    descriptions: dict[str, str] = {}
    for tool_name, tool in (await _registered_tools_dict()).items():
        descriptions[tool_name] = getattr(tool, "description", "") or ""
    return descriptions

105 

106 # ------------------------------------------------------------------ # 

107 # Session helpers 

108 # ------------------------------------------------------------------ # 

109 

def _strip_private_fields(session: Mapping[str, Any]) -> dict[str, Any]:
    """Produce a JSON-safe copy of ``session`` without private criterion keys.

    Pure delegation to :func:`mission.validation.strip_private_fields`.
    The short module-private name exists so the tool bodies in this
    file can call it without spelling out the ``mission.validation``
    path each time.
    """
    cleaned: dict[str, Any] = mission_validation.strip_private_fields(session)
    return cleaned

120 

def _strip_private_fields_iterations(
    iterations: Sequence[Mapping[str, Any]],
) -> list[dict[str, Any]]:
    """Drop private keys from every iteration's ``criteria_evaluation`` shape.

    Pure delegation to
    :func:`mission.validation.strip_private_fields_iterations`.
    """
    cleaned: list[dict[str, Any]] = mission_validation.strip_private_fields_iterations(
        iterations
    )
    return cleaned

130 

131 # ------------------------------------------------------------------ # 

132 # Engine wiring 

133 # ------------------------------------------------------------------ # 

134 

135 # The engine factory itself lives in :mod:`mcp.mission._engine_factory` 

136 # so the CLI can reuse the same wiring without crossing the 

137 # ``GCO_ENABLE_MISSION`` gate. We keep a thin alias here so call 

138 # sites in this module stay readable without having to spell out 

139 # the long import path. 

140 from mission._engine_factory import build_mission_engine as _build_engine # noqa: PLC0415 

141 

142 # ------------------------------------------------------------------ # 

143 # mission_start 

144 # ------------------------------------------------------------------ # 

145 

@mcp.tool(tags={"low-risk", "mission"})
@audit_logged
async def mission_start(
    directive: str,
    criteria: list[dict[str, Any]],
    budget: dict[str, Any],
    tool_allowlist: list[str] | None = None,
    checkpoint_cadence: dict[str, Any] | None = None,
    stagnation_threshold: int = 3,
    use_sampling: bool | None = None,
    allow_scripted_strategies: bool = False,
    sampling_model_preferences: dict[str, Any] | None = None,
    allow_all_tools: bool = False,
) -> str:
    """[gated by GCO_ENABLE_MISSION] Start a new Mission session.

    Args:
        directive: Natural-language goal description.
        criteria: List of success criterion dicts (``metric_threshold``,
            ``event``, or ``predicate`` kinds).
        budget: Budget controls dict with ``max_iterations`` and
            ``max_wall_clock_seconds``. Cost guardrails live
            out-of-band via AWS Budgets and Cost Anomaly Detection.
        tool_allowlist: Tool names the session may invoke. Optional;
            omit it when ``allow_all_tools`` is set.
        checkpoint_cadence: Optional cadence dict; when omitted,
            ``{"kind": "every_iteration"}`` is validated and stored.
        stagnation_threshold: Iterations of no progress before
            terminating (default 3).
        use_sampling: Three-state opt-in. ``None`` auto-detects,
            ``True`` opts in explicitly, ``False`` opts out.
        allow_scripted_strategies: When True, the session permits
            scripted strategies (validated via the sandbox AST).
        sampling_model_preferences: Optional FastMCP
            ``ModelPreferences`` payload forwarded to MCP sampling.
            Stored on the session only when it is not ``None``.
        allow_all_tools: When True (default ``False``), resolve the
            session's allowlist to every currently-registered tool
            minus the ``mission_*`` control tools, instead of an
            explicit ``tool_allowlist``. Mutually exclusive with a
            non-empty ``tool_allowlist``.

    Returns a JSON string with the new ``session_id``, a ``pending``
    status and the resolved sampling state, or a ``{code, details}``
    error envelope when any validator raises
    ``MissionValidationError``.
    """
    try:
        clean_directive = mission_validation.validate_directive(directive)
        clean_criteria = mission_validation.validate_criteria(criteria)
        tools_by_name = await _registered_tools_dict()
        tags_by_name = await _registered_tool_tags()

        # Every tool tagged "mission" is a control tool and is handed to
        # the allowlist resolver separately from the registry.
        mission_control: set[str] = set()
        for tool_name, tool_tags in tags_by_name.items():
            if "mission" in tool_tags:
                mission_control.add(tool_name)

        clean_allowlist = mission_validation.resolve_effective_allowlist(
            allow_all_tools=allow_all_tools,
            explicit_allowlist=tool_allowlist,
            registered_tools=tools_by_name,
            control_tools=mission_control,
        )
        clean_budget = mission_validation.validate_budget(
            budget, clean_allowlist, tags_by_name
        )

        if checkpoint_cadence is None:
            cadence_input: dict[str, Any] = {"kind": "every_iteration"}
        else:
            cadence_input = checkpoint_cadence
        clean_cadence = mission_validation.validate_cadence(cadence_input)
    except MissionValidationError as err:
        return json.dumps({"code": err.code, "details": err.details})

    request_ctx = _try_get_context()
    sampling_on, sampling_backend = mission_sampling.resolve_sampling_state(
        request_ctx, use_sampling
    )

    new_id = f"mission-{secrets.token_hex(8)}"
    session: dict[str, Any] = {
        "version": SCHEMA_VERSION,
        "session_id": new_id,
        "directive_text": clean_directive,
        "criteria": clean_criteria,
        "budget": clean_budget,
        "tool_allowlist": clean_allowlist,
        "checkpoint_cadence": clean_cadence,
        "stagnation_threshold": stagnation_threshold,
        "use_sampling": sampling_on,
        "sampling_backend_resolved": sampling_backend,
        "allow_scripted_strategies": bool(allow_scripted_strategies),
        "status": "pending",
        "created_at": datetime.now(UTC).isoformat(),
        "iterations": [],
        "no_progress_counter": 0,
    }
    if sampling_model_preferences is not None:
        session["sampling_model_preferences"] = sampling_model_preferences

    # Predicate criteria carry the validator's cached ``_parsed_ast``
    # nodes. ``ast.Expression`` is not JSON-serialisable and the
    # FilesystemBackend persists via ``json.dump``, so the private
    # fields are stripped before saving. The engine re-parses from the
    # ``expression`` string on the next load, so nothing is lost.
    # Mirrors ``_strip_private_criteria`` in
    # ``cli/commands/mission_cmd.py``.
    persistable = cast("SessionState", _strip_private_fields(session))
    mission_state.get_backend().save_session(persistable)

    response = {
        "session_id": new_id,
        "status": "pending",
        "use_sampling": sampling_on,
        "sampling_backend_resolved": sampling_backend,
    }
    return json.dumps(response)

257 

258 # ------------------------------------------------------------------ # 

259 # mission_status 

260 # ------------------------------------------------------------------ # 

261 

@mcp.tool(tags={"safe", "mission"})
@audit_logged
async def mission_status(session_id: str) -> str:
    """[gated by GCO_ENABLE_MISSION] Get the full state of a Mission session.

    Args:
        session_id: The session identifier returned by
            :func:`mission_start`.

    Returns the full session as JSON with private criterion fields
    stripped (non-JSON values are stringified), or a
    ``session_not_found`` error envelope when the session is unknown.
    """
    session = mission_state.get_backend().load_session(session_id)
    if session is None:
        not_found = {
            "code": "session_not_found",
            "details": {"session_id": session_id},
        }
        return json.dumps(not_found)
    return json.dumps(_strip_private_fields(session), default=str)

285 

286 # ------------------------------------------------------------------ # 

287 # mission_iterate 

288 # ------------------------------------------------------------------ # 

289 

@mcp.tool(tags={"low-risk", "mission"})
@audit_logged
async def mission_iterate(
    session_id: str,
    max_iterations_this_call: int = 1,
) -> str:
    """[gated by GCO_ENABLE_MISSION] Run iteration(s) on a Mission session.

    Args:
        session_id: The session to iterate.
        max_iterations_this_call: Upper bound on iterations run before
            returning (default 1). Values below 1 are rejected with an
            ``invalid_argument`` envelope. The loop stops early on a
            terminal verdict (``complete`` or ``terminate``).

    Returns a JSON object with a ``session_id`` and an ``iterations``
    list of iteration summaries (iteration index, verdict, reason).
    When the engine raises ``MissionEngineError`` mid-loop, returns an
    error envelope that still carries the summaries accumulated before
    the failure. Unknown sessions yield ``session_not_found``.
    """
    if max_iterations_this_call < 1:
        bad_argument = {
            "code": "invalid_argument",
            "details": {"reason": "max_iterations_this_call must be >= 1"},
        }
        return json.dumps(bad_argument)

    request_ctx = _try_get_context()
    store = mission_state.get_backend()

    session = store.load_session(session_id)
    if session is None:
        not_found = {
            "code": "session_not_found",
            "details": {"session_id": session_id},
        }
        return json.dumps(not_found)

    engine = await _build_engine(session, request_ctx)

    completed: list[dict[str, Any]] = []
    for _attempt in range(max_iterations_this_call):
        try:
            record = await engine.run_iteration(session_id, ctx=request_ctx)
        except MissionEngineError as err:
            # Report the failure together with whatever already ran.
            failure = {
                "code": err.code,
                "details": {"session_id": session_id},
                "iterations": completed,
            }
            return json.dumps(failure)

        summary = {
            "iteration_index": record["iteration_index"],
            "verdict": record["verdict"],
            "verdict_reason": record["verdict_reason"],
        }
        completed.append(summary)
        if summary["verdict"] in ("complete", "terminate"):
            break

    return json.dumps({"session_id": session_id, "iterations": completed})

354 

355 # ------------------------------------------------------------------ # 

356 # mission_checkpoint 

357 # ------------------------------------------------------------------ # 

358 

@mcp.tool(tags={"safe", "mission"})
@audit_logged
async def mission_checkpoint(session_id: str) -> str:
    """[gated by GCO_ENABLE_MISSION] Re-run the verdict cascade on the latest iteration.

    Args:
        session_id: The session whose latest iteration should be
            re-evaluated.

    Returns a JSON object with the freshly-computed verdict and
    reason for the last stored iteration. Only the deterministic
    decide cascade runs; the propose / execute / observe phases do
    not, and nothing is written back to the backend. Returns a
    ``session_not_found`` envelope when the session is missing and a
    ``no_iterations`` envelope when it has no iterations yet.
    """
    session = mission_state.get_backend().load_session(session_id)
    if session is None:
        not_found = {
            "code": "session_not_found",
            "details": {"session_id": session_id},
        }
        return json.dumps(not_found)

    history = session.get("iterations") or []
    if not history:
        nothing_yet = {
            "code": "no_iterations",
            "details": {"session_id": session_id},
        }
        return json.dumps(nothing_yet)

    last_iteration = history[-1]
    verdict, verdict_reason = decide_verdict(session, last_iteration, datetime.now(UTC))
    outcome = {
        "session_id": session_id,
        "iteration_index": last_iteration["iteration_index"],
        "verdict": verdict,
        "verdict_reason": verdict_reason,
    }
    return json.dumps(outcome)

401 

402 # ------------------------------------------------------------------ # 

403 # mission_complete 

404 # ------------------------------------------------------------------ # 

405 

@mcp.tool(tags={"low-risk", "mission"})
@audit_logged
async def mission_complete(session_id: str, reason: str = "forced_complete") -> str:
    """[gated by GCO_ENABLE_MISSION] Force a Mission session into completed status.

    Args:
        session_id: The session to complete.
        reason: Free-form reason (default ``forced_complete``).
            Currently informational only: it is accepted for interface
            compatibility but is neither persisted on the session nor
            echoed in the response. The synthetic final verdict is
            always ``complete``.

    Sets ``status`` to ``completed``, stamps a synthetic ``complete``
    final verdict and an ``ended_at`` UTC timestamp, and saves the
    session. Returns ``{"session_id", "status": "completed"}`` as
    JSON, a ``session_not_found`` envelope for unknown sessions, or a
    ``session_terminal`` envelope for sessions already in a terminal
    state.
    """
    del reason  # currently informational; the synthetic verdict is fixed
    store = mission_state.get_backend()
    session = store.load_session(session_id)
    if session is None:
        not_found = {
            "code": "session_not_found",
            "details": {"session_id": session_id},
        }
        return json.dumps(not_found)

    current_status = session["status"]
    if current_status in TERMINAL_STATES:
        already_terminal = {
            "code": "session_terminal",
            "details": {
                "session_id": session_id,
                "status": current_status,
            },
        }
        return json.dumps(already_terminal)

    session["status"] = "completed"
    session["final_verdict"] = "complete"
    session["ended_at"] = datetime.now(UTC).isoformat()
    # Defensive strip: sessions loaded from the backend are already free
    # of private fields, but if a future change re-attaches
    # ``_parsed_ast`` somewhere in the flow it must not silently break
    # persistence. The strip is cheap and idempotent on clean input.
    store.save_session(cast("SessionState", _strip_private_fields(session)))
    return json.dumps({"session_id": session_id, "status": "completed"})

450 

451 # ------------------------------------------------------------------ # 

452 # mission_abort 

453 # ------------------------------------------------------------------ # 

454 

@mcp.tool(tags={"low-risk", "mission"})
@audit_logged
async def mission_abort(session_id: str, pause: bool = False) -> str:
    """[gated by GCO_ENABLE_MISSION] Pause or terminate a Mission session.

    Args:
        session_id: The session to transition.
        pause: When True, transition to ``paused``. When False
            (default), transition to ``terminated`` with a
            synthetic ``terminate`` final verdict and an ``ended_at``
            UTC timestamp.

    Refuses sessions already in a terminal state
    (``session_terminal`` envelope) and unknown sessions
    (``session_not_found`` envelope). Best-effort emits a
    ``ctx.warning`` naming the transition that was applied (paused or
    terminated) when the active request context is available, so the
    operator sees the side-effect; failures of that warning are
    suppressed. Returns ``{"session_id", "status"}`` as JSON.
    """
    store = mission_state.get_backend()
    session = store.load_session(session_id)
    if session is None:
        not_found = {
            "code": "session_not_found",
            "details": {"session_id": session_id},
        }
        return json.dumps(not_found)

    if session["status"] in TERMINAL_STATES:
        already_terminal = {
            "code": "session_terminal",
            "details": {
                "session_id": session_id,
                "status": session["status"],
            },
        }
        return json.dumps(already_terminal)

    if pause:
        session["status"] = "paused"
    else:
        session["status"] = "terminated"
        session["final_verdict"] = "terminate"
        session["ended_at"] = datetime.now(UTC).isoformat()

    ctx = _try_get_context()
    if ctx is not None:
        # Report the status actually applied. The message previously said
        # "terminated" even when the session had only been paused.
        with contextlib.suppress(Exception):
            await ctx.warning(
                f"Mission session {session_id} {session['status']} by operator."
            )

    # Defensive strip: loaded sessions are already free of private
    # fields, but stripping again keeps persistence safe if private keys
    # are ever re-attached; it is idempotent on clean input.
    store.save_session(cast("SessionState", _strip_private_fields(session)))
    return json.dumps({"session_id": session_id, "status": session["status"]})

502 

503 # ------------------------------------------------------------------ # 

504 # mission_resume 

505 # ------------------------------------------------------------------ # 

506 

@mcp.tool(tags={"low-risk", "mission"})
@audit_logged
async def mission_resume(session_id: str) -> str:
    """[gated by GCO_ENABLE_MISSION] Resume a paused Mission session.

    Args:
        session_id: The session to resume.

    Transitions ``paused -> running`` and saves the session. Returns
    ``{"session_id", "status": "running"}`` as JSON, a
    ``session_not_found`` envelope when the session is missing, or a
    ``not_paused`` envelope (carrying the current status) when the
    session is in any other state.
    """
    store = mission_state.get_backend()
    session = store.load_session(session_id)
    if session is None:
        not_found = {
            "code": "session_not_found",
            "details": {"session_id": session_id},
        }
        return json.dumps(not_found)

    current_status = session["status"]
    if current_status != "paused":
        wrong_state = {
            "code": "not_paused",
            "details": {
                "session_id": session_id,
                "status": current_status,
            },
        }
        return json.dumps(wrong_state)

    session["status"] = "running"
    # Defensive strip: loaded sessions are already free of private
    # fields, but stripping again keeps persistence safe if private keys
    # are ever re-attached; it is idempotent on clean input.
    store.save_session(cast("SessionState", _strip_private_fields(session)))
    return json.dumps({"session_id": session_id, "status": "running"})

541 

542 # ------------------------------------------------------------------ # 

543 # mission_history 

544 # ------------------------------------------------------------------ # 

545 

@mcp.tool(tags={"safe", "mission"})
@audit_logged
async def mission_history(session_id: str, format: str = "summary") -> str:
    """[gated by GCO_ENABLE_MISSION] Get iteration history for a Mission session.

    Args:
        session_id: The session whose history to retrieve.
        format: ``"summary"`` (default) returns a compact list of
            ``{iteration_index, verdict, verdict_reason,
            started_at, ended_at}`` dicts; ``"full"`` returns the
            complete iteration record dicts with private fields
            stripped. Any value other than ``"full"`` is treated as
            ``"summary"``.

    Returns a JSON object with an ``iterations`` list, or a
    ``session_not_found`` error envelope when the session is unknown.
    Non-JSON values are stringified in both formats.
    """
    session = mission_state.get_backend().load_session(session_id)
    if session is None:
        not_found = {
            "code": "session_not_found",
            "details": {"session_id": session_id},
        }
        return json.dumps(not_found)

    iterations = session.get("iterations") or []
    if format == "full":
        payload: dict[str, Any] = {
            "iterations": _strip_private_fields_iterations(iterations)
        }
    else:
        summary_keys = (
            "iteration_index",
            "verdict",
            "verdict_reason",
            "started_at",
            "ended_at",
        )
        payload = {
            "iterations": [
                {key: record.get(key) for key in summary_keys}
                for record in iterations
            ]
        }
    # ``default=str`` now applies to both formats. The summary path used
    # to serialise the same backend-sourced values without it, unlike
    # the full path and ``mission_status``.
    return json.dumps(payload, default=str)

587 

588 # ------------------------------------------------------------------ # 

589 # mission_list 

590 # ------------------------------------------------------------------ # 

591 

@mcp.tool(tags={"safe", "mission"})
@audit_logged
async def mission_list(status: str | None = None) -> str:
    """[gated by GCO_ENABLE_MISSION] List Mission sessions.

    Args:
        status: Optional filter. Recognised values are ``running``,
            ``completed``, ``terminated``, ``failed``, ``paused``,
            ``pending``. Omit (or pass an empty string) to list every
            known session.

    Returns a JSON object with a ``sessions`` list of summary dicts
    (``session_id``, ``status``, ``created_at``,
    ``iteration_count``).
    """
    # A falsy status means "no filter"; the backend receives ``None``.
    status_filter = None
    if status:
        status_filter = {"status": status}
    listed = mission_state.get_backend().list_sessions(status_filter)
    return json.dumps({"sessions": listed})

609 return json.dumps({"sessions": sessions})