Coverage for mcp/mission/_engine_factory.py: 86%

137 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-15 15:07 +0000

1"""Shared :class:`MissionEngine` factory used by the MCP tool and the CLI. 

2 

3Both the ``mcp/tools/mission.py`` MCP tool surface and the 

4``cli/commands/mission_cmd.py`` Click subcommands need to build a 

5:class:`mcp.mission.engine.MissionEngine` with production-wired 

6dependencies — a real tool dispatcher that routes through the live 

7FastMCP registry, a sampling callable that runs the 

8``Strategy_Revision`` prompt against the resolved backend, and an 

9optional sandbox runner for scripted strategies. The wiring used to 

10live only inside the MCP tool module, which left the CLI with a stub 

11dispatcher and ``sampling_callable=None``. That made 

12``gco mission run`` and ``gco mission iterate`` useful for smoke- 

13testing the engine bookkeeping but unable to converge on goals that 

14depend on actual tool-result content. 

15 

16This module hosts the shared factory. The MCP tool surface and the 

17CLI both call :func:`build_engine_dependencies` to obtain the same 

18``(tool_dispatcher, sampling_callable, sandbox_runner)`` triple, then 

19hand them to :class:`mcp.mission.engine.MissionEngine`. The CLI also 

20uses :func:`make_stub_dispatcher` to opt into the canned-response 

21behaviour explicitly through ``--dry-run`` for the smoke-test use 

22case the original stub was designed for. 

23 

24Why a separate module rather than living inside the MCP tool? The 

25MCP tool's body is gated by ``GCO_ENABLE_MISSION``; the CLI must be 

26able to import the factory regardless of the flag, because the 

27flag-gating happens in the Click group, not at import time. Splitting 

28the factory out keeps the MCP tool body lean and lets the CLI reach 

29the same wiring without crossing a feature-flag boundary. 

30""" 

31 

32from __future__ import annotations 

33 

34import json 

35import sys 

36from collections.abc import Awaitable, Callable, Mapping 

37from datetime import UTC, datetime 

38from pathlib import Path 

39from typing import TYPE_CHECKING, Any, cast 

40 

41# <pyflowchart-code-diagram> BEGIN - auto-inserted, do not edit 

42# Flowchart(s) generated from this file: 

43# * ``build_engine_dependencies`` -> ``diagrams/code_diagrams/mcp/mission/_engine_factory.build_engine_dependencies.html`` 

44# (PNG: ``diagrams/code_diagrams/mcp/mission/_engine_factory.build_engine_dependencies.png``) 

45# Regenerate with ``python diagrams/code_diagrams/generate.py``. 

46# <pyflowchart-code-diagram> END 

47 

48 

# The mission package and the FastMCP server module both live under
# ``mcp/`` (this file's grandparent directory). Prepending that directory
# to ``sys.path`` lets ``import server`` and ``import mission.*`` resolve
# without turning ``mcp`` into a package — the same path-injection pattern
# the rest of the MCP surface uses. Note: this runs at import time, and
# because the entry is inserted at index 0 it takes precedence over any
# installed top-level modules with the same names.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

54 

55from mission import sampling as mission_sampling # noqa: E402 

56from mission import state as mission_state # noqa: E402 

57from mission.engine import MissionEngine, SandboxRunner, ToolDispatcher # noqa: E402 

58 

59if TYPE_CHECKING: # pragma: no cover - import only for type checkers 

60 from mission.types import SessionState, ToolCallRecord 

61 

62 

# Public surface used by both the MCP tool (``mcp/tools/mission.py``) and
# the CLI (``cli/commands/mission_cmd.py``). Underscore-prefixed helpers in
# this module (the ``_build_*`` wiring and ``_live_dispatch_tool``) are
# intentionally not exported.
__all__ = [
    "EngineDependencies",
    "build_engine_dependencies",
    "build_mission_engine",
    "fetch_registered_tool_metadata",
    "make_stub_dispatcher",
    "remaining_wall_clock_seconds",
]

71 

72 

73# --------------------------------------------------------------------------- 

74# Public types 

75# --------------------------------------------------------------------------- 

76 

77 

class EngineDependencies:
    """Bundle of the callables :class:`MissionEngine` needs at construction.

    Callers build the bundle once (normally through
    :func:`build_engine_dependencies`) and pass its fields straight to
    ``MissionEngine``. It carries four fields:

    * ``tool_dispatcher`` — routes tool calls; always required.
    * ``sampling_callable`` — the Strategy_Revision sampler, or ``None``
      to keep the engine on its deterministic propose path.
    * ``sandbox_runner`` — runner for scripted strategies, or ``None``.
    * ``final_lessons_callable`` — Final_Report lessons sampler, or
      ``None`` (the report then falls back to deterministic templates).

    A plain ``__slots__`` class rather than a NamedTuple so optional
    fields need no boilerplate. Unlike a dataclass it keeps identity-based
    equality and hashing.
    """

    __slots__ = ("final_lessons_callable", "sampling_callable", "sandbox_runner", "tool_dispatcher")

    def __init__(
        self,
        *,
        tool_dispatcher: ToolDispatcher,
        sampling_callable: Callable[..., Awaitable[Any]] | None,
        sandbox_runner: SandboxRunner | None,
        final_lessons_callable: Callable[..., Awaitable[Any]] | None = None,
    ) -> None:
        self.tool_dispatcher = tool_dispatcher
        self.sampling_callable = sampling_callable
        self.sandbox_runner = sandbox_runner
        self.final_lessons_callable = final_lessons_callable

    def __repr__(self) -> str:
        # Explicit field order (dispatcher first) reads better in logs than
        # the alphabetical ``__slots__`` order.
        names = ("tool_dispatcher", "sampling_callable", "sandbox_runner", "final_lessons_callable")
        rendered = ", ".join(f"{name}={getattr(self, name)!r}" for name in names)
        return f"{type(self).__name__}({rendered})"

101 

102 

103# --------------------------------------------------------------------------- 

104# Helpers shared between the MCP tool and the CLI 

105# --------------------------------------------------------------------------- 

106 

107 

108def remaining_wall_clock_seconds(session: Mapping[str, Any]) -> float | None: 

109 """Return remaining wall-clock seconds for ``session``, or ``None``. 

110 

111 Mirrors the behaviour the MCP tool surface used to keep private: 

112 sessions that haven't started yet report the full cap; sessions 

113 whose ``max_wall_clock_seconds`` is the ``-1`` "uncapped" sentinel 

114 return ``None`` so the sampling prompt's budget context renders 

115 ``"remaining_wall_clock_seconds": null``; running sessions return 

116 the difference between cap and elapsed time clamped at zero. 

117 """ 

118 cap = session.get("budget", {}).get("max_wall_clock_seconds") 

119 if cap is None or cap == -1: 

120 return None 

121 started_raw = session.get("started_at") 

122 if not started_raw: 

123 return float(cap) 

124 try: 

125 started = datetime.fromisoformat(started_raw) 

126 except TypeError, ValueError: 

127 return float(cap) 

128 elapsed = (datetime.now(UTC) - started).total_seconds() 

129 return max(0.0, float(cap) - elapsed) 

130 

131 

async def fetch_registered_tool_metadata() -> tuple[dict[str, Any], dict[str, str]]:
    """Return ``(registered_tools, tool_docstrings)`` from the live FastMCP registry.

    The registry is read through ``server.mcp._list_tools()``. The result is
    two parallel dicts keyed by tool name: the tool objects themselves, and
    each tool's ``description`` (empty string when missing or falsy). Callers
    building the sampler closure therefore need only one introspection pass.

    Any exception while importing ``server`` or listing tools is swallowed
    and ``({}, {})`` is returned. The engine factory can then still produce
    a usable dispatcher when the registry is not populated yet (the CLI path
    before ``register_all_tools`` ran, a test harness with a stub mcp
    instance, etc.).
    """
    try:
        from server import mcp  # noqa: PLC0415 - lazy

        tool_list = await mcp._list_tools()
    except Exception:
        return {}, {}

    by_name: dict[str, Any] = {}
    for tool in tool_list:
        by_name[tool.name] = tool
    descriptions = {name: getattr(tool, "description", "") or "" for name, tool in by_name.items()}
    return by_name, descriptions

156 

157 

158# --------------------------------------------------------------------------- 

159# Tool dispatcher 

160# --------------------------------------------------------------------------- 

161 

162 

async def _live_dispatch_tool(
    tool_name: str,
    args: dict[str, Any],
    ctx_inner: Any | None,
) -> Any:
    """Dispatch ``tool_name`` against the live FastMCP registry.

    Looks the tool up via ``server.mcp.get_tool`` and invokes it with
    ``args``. The raw FastMCP ``ToolResult`` model is not
    JSON-serialisable, so the result is unwrapped:

    * ``structured_content`` is returned when it is a dict.
    * Otherwise the first content block's ``text`` field is used. It is
      JSON-decoded when it parses, so string-returning tools that emit
      JSON round-trip as structured values, and returned raw otherwise.
    * Anything else returns ``None``, so the engine records a benign
      placeholder rather than a non-serialisable object.

    Args:
        tool_name: Name of the registered FastMCP tool to run.
        args: Arguments forwarded to the tool's ``run``.
        ctx_inner: Accepted for dispatcher-signature compatibility and
            unused. FastMCP tracks the active request context through
            contextvars, so nothing needs threading through here.

    Raises:
        RuntimeError: If ``mcp.get_tool`` returns ``None`` for
            ``tool_name``. The engine's per-call try/except then records a
            ``failed`` outcome instead of silently invoking nothing. Errors
            raised by ``get_tool`` or the tool itself propagate unchanged.
    """
    # The previous implementation resolved ``get_context()`` here and then
    # immediately discarded the value; that dead lookup has been removed.
    del ctx_inner

    from server import mcp  # noqa: PLC0415 - lazy; see sys.path note at module top

    tool_obj = await mcp.get_tool(tool_name)
    if tool_obj is None:
        raise RuntimeError(f"tool {tool_name!r} not registered")
    result = await tool_obj.run(args)

    structured = getattr(result, "structured_content", None)
    if isinstance(structured, dict):
        return structured

    content_blocks = getattr(result, "content", None) or []
    if not content_blocks:
        return None
    text_payload = getattr(content_blocks[0], "text", None)
    if not isinstance(text_payload, str):
        return None
    try:
        return json.loads(text_payload)
    except (TypeError, ValueError):
        # Not JSON: hand the plain text back unchanged.
        return text_payload

222 

223 

def make_stub_dispatcher() -> ToolDispatcher:
    """Build a dispatcher that answers every tool call with a canned success.

    Intended for ``--dry-run`` smoke testing and for unit tests that should
    not depend on a live registry. Each call yields
    ``{"_status": "ok", "_stub": True, "tool_name": ..., "args": ...}``
    (with ``args`` shallow-copied). The engine bookkeeping can therefore
    converge without a real tool ever running, and without spending Bedrock
    or AWS credits.

    Production paths go through :func:`build_engine_dependencies`, so this
    stub only fires when the operator explicitly opts in.
    """

    async def _canned_ok(tool_name: str, args: dict[str, Any], ctx: Any) -> dict[str, Any]:
        payload: dict[str, Any] = {"_status": "ok", "_stub": True}
        payload["tool_name"] = tool_name
        payload["args"] = dict(args)
        return payload

    return _canned_ok

246 

247 

248# --------------------------------------------------------------------------- 

249# Sandbox runner 

250# --------------------------------------------------------------------------- 

251 

252 

253def _build_sandbox_runner(session: Mapping[str, Any]) -> SandboxRunner | None: 

254 """Wire a real sandbox runner when the session permits scripted strategies.""" 

255 if not session.get("allow_scripted_strategies"): 

256 return None 

257 try: 

258 from mission.sandbox import MissionSandbox # noqa: PLC0415 

259 except ImportError: 

260 return None 

261 

262 sandbox = MissionSandbox( 

263 list(session.get("tool_allowlist") or []), 

264 cast("SessionState", session), 

265 ) 

266 

267 async def _sandbox_runner( 

268 script: str, 

269 ctx_arg: Any, 

270 dispatcher: ToolDispatcher, 

271 ) -> tuple[dict[str, Any], list[ToolCallRecord]]: 

272 obs, calls = await sandbox.run(script, ctx_arg, dispatcher) 

273 return obs, cast("list[ToolCallRecord]", calls) 

274 

275 return _sandbox_runner 

276 

277 

278# --------------------------------------------------------------------------- 

279# Sampling callable 

280# --------------------------------------------------------------------------- 

281 

282 

283def _build_sampling_callable( 

284 session: Mapping[str, Any], 

285 ctx: Any | None, 

286 *, 

287 registered_tools: dict[str, Any], 

288 tool_docstrings: dict[str, str], 

289) -> Callable[..., Awaitable[Any]] | None: 

290 """Wire the Strategy_Revision sampler when the session opted in.""" 

291 if not session.get("use_sampling"): 

292 return None 

293 if session.get("sampling_backend_resolved") == "none": 

294 return None 

295 

296 backend_obj = mission_sampling.select_sampling_backend( 

297 ctx, 

298 model_id=session.get("bedrock_model_id"), 

299 prefs=session.get("sampling_model_preferences"), 

300 ) 

301 if backend_obj is None: 

302 return None 

303 

304 # Slow-moving live signals (per-region queue depth, GPU utilisation, 

305 # deployed-region list, reservation counts). Cached on the closure 

306 # so one ``build_engine_dependencies`` call — which spans the 

307 # multi-iteration drive of one CLI invocation or one MCP request — 

308 # only pays the AWS round-trip once. Outer-list trick keeps the 

309 # cache mutable through the inner closure without ``nonlocal``. 

310 from mission._environment import gather_session_environment # noqa: PLC0415 

311 

312 env_cache: list[Mapping[str, Any] | None] = [] 

313 

314 async def _sampler(*, session: dict[str, Any], ctx: Any | None) -> Any: 

315 iterations = session.get("iterations") or [] 

316 latest = iterations[-1] if iterations else None 

317 if latest is None: 

318 return None 

319 budget = session.get("budget") or {} 

320 # ``max_iterations=-1`` is the "uncapped" sentinel; the prompt 

321 # expects an informational remaining-iterations count, so we 

322 # report zero in that mode (the model is told nothing about 

323 # the iteration axis when there's no cap to count down from). 

324 # Finite caps subtract the count of recorded iterations and 

325 # clamp at zero. 

326 cap = int(budget.get("max_iterations", 0)) 

327 remaining_iters = 0 if cap == -1 else max(0, cap - len(iterations)) 

328 if not env_cache: 

329 try: 

330 env_cache.append(gather_session_environment(session)) 

331 except Exception: # noqa: BLE001 

332 env_cache.append(None) 

333 env_ctx = env_cache[0] 

334 return await mission_sampling.maybe_sample_strategy_revision( 

335 backend=backend_obj, 

336 session=cast("SessionState", session), 

337 iteration=latest, 

338 allowlist=list(session.get("tool_allowlist") or []), 

339 registered_tools=registered_tools, 

340 tool_docstrings=tool_docstrings, 

341 remaining_iterations=remaining_iters, 

342 remaining_wall_clock_secs=remaining_wall_clock_seconds(session), 

343 allow_scripts=bool(session.get("allow_scripted_strategies", False)), 

344 environment_context=env_ctx, 

345 ) 

346 

347 return _sampler 

348 

349 

350# --------------------------------------------------------------------------- 

351# Final lessons callable 

352# --------------------------------------------------------------------------- 

353 

354 

355def _build_final_lessons_callable( 

356 session: Mapping[str, Any], 

357 ctx: Any | None, 

358 tool_docstrings: dict[str, str], 

359) -> Callable[..., Awaitable[Any]] | None: 

360 """Wire the Final_Report lessons overlay when sampling is enabled. 

361 

362 When the session opted into sampling and a backend resolves, the 

363 engine calls this after a terminal verdict to produce model-derived 

364 ``lessons`` and ``recommended_followups`` for the Final_Report. 

365 Without it, the report uses deterministic templates. 

366 """ 

367 if not session.get("use_sampling"): 367 ↛ 369line 367 didn't jump to line 369 because the condition on line 367 was always true

368 return None 

369 if session.get("sampling_backend_resolved") == "none": 

370 return None 

371 

372 backend_obj = mission_sampling.select_sampling_backend( 

373 ctx, 

374 model_id=session.get("bedrock_model_id"), 

375 prefs=session.get("sampling_model_preferences"), 

376 ) 

377 if backend_obj is None: 

378 return None 

379 

380 async def _final_lessons(*, session: dict[str, Any], ctx: Any | None) -> Any: 

381 return await mission_sampling.maybe_sample_final_lessons( 

382 backend=backend_obj, 

383 session=cast("SessionState", session), 

384 tool_docstrings=tool_docstrings, 

385 ) 

386 

387 return _final_lessons 

388 

389 

390# --------------------------------------------------------------------------- 

391# Public factory 

392# --------------------------------------------------------------------------- 

393 

394 

async def build_engine_dependencies(
    session: Mapping[str, Any],
    ctx: Any | None,
    *,
    use_stub_dispatcher: bool = False,
) -> EngineDependencies:
    """Assemble the :class:`MissionEngine` dependencies for ``session``.

    Two wiring modes:

    * ``use_stub_dispatcher=True`` — used by the CLI only on its
      ``--dry-run`` path. It wires the canned-response dispatcher from
      :func:`make_stub_dispatcher`, no Strategy_Revision sampler, no
      final-lessons callable, and whatever sandbox runner the session's
      ``allow_scripted_strategies`` flag yields. The live registry is not
      consulted.
    * Otherwise — always used by the MCP tool surface. It reads tool
      metadata from the live FastMCP registry, then wires:
      the live dispatcher; the sampler and final-lessons callable (each
      ``None`` unless sampling resolved to a real backend); and the
      sandbox runner.

    Args:
        session: Session mapping the wiring helpers read their flags from.
        ctx: Request context forwarded to sampling-backend selection; may
            be ``None``.
        use_stub_dispatcher: Select the dry-run wiring described above.

    Returns:
        A populated :class:`EngineDependencies`.
    """
    if not use_stub_dispatcher:
        registered_tools, tool_docstrings = await fetch_registered_tool_metadata()
        return EngineDependencies(
            tool_dispatcher=_live_dispatch_tool,
            sampling_callable=_build_sampling_callable(
                session,
                ctx,
                registered_tools=registered_tools,
                tool_docstrings=tool_docstrings,
            ),
            sandbox_runner=_build_sandbox_runner(session),
            final_lessons_callable=_build_final_lessons_callable(session, ctx, tool_docstrings),
        )

    # Dry run: no real tool ever executes, so there is no tool output for
    # the sampling prompt to reason about; use the deterministic propose
    # path instead. The session's ``use_sampling`` flag is left as the
    # operator set it, so the criteria-scaffold path still runs through
    # Bedrock.
    return EngineDependencies(
        tool_dispatcher=make_stub_dispatcher(),
        sampling_callable=None,
        sandbox_runner=_build_sandbox_runner(session),
    )

441 

442 

async def build_mission_engine(
    session: Mapping[str, Any],
    ctx: Any | None,
    *,
    use_stub_dispatcher: bool = False,
) -> MissionEngine:
    """Return a :class:`MissionEngine` ready to drive ``session``.

    This is the convenience entry point most callers want. It calls
    :func:`build_engine_dependencies` first, then resolves the persistence
    backend through :func:`mcp.mission.state.get_backend`. Use the
    lower-level :func:`build_engine_dependencies` directly when you need to
    construct the engine yourself, e.g. with a custom backend.
    """
    dependencies = await build_engine_dependencies(
        session,
        ctx,
        use_stub_dispatcher=use_stub_dispatcher,
    )
    persistence = mission_state.get_backend()
    return MissionEngine(
        backend=persistence,
        tool_dispatcher=dependencies.tool_dispatcher,
        sampling_callable=dependencies.sampling_callable,
        sandbox_runner=dependencies.sandbox_runner,
        final_lessons_callable=dependencies.final_lessons_callable,
    )