Coverage for mcp/mission/_engine_factory.py: 86%
137 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
1"""Shared :class:`MissionEngine` factory used by the MCP tool and the CLI.
3Both the ``mcp/tools/mission.py`` MCP tool surface and the
4``cli/commands/mission_cmd.py`` Click subcommands need to build a
5:class:`mcp.mission.engine.MissionEngine` with production-wired
6dependencies — a real tool dispatcher that routes through the live
7FastMCP registry, a sampling callable that runs the
8``Strategy_Revision`` prompt against the resolved backend, and an
9optional sandbox runner for scripted strategies. The wiring used to
10live only inside the MCP tool module, which left the CLI with a stub
11dispatcher and ``sampling_callable=None``. That made
12``gco mission run`` and ``gco mission iterate`` useful for smoke-
13testing the engine bookkeeping but unable to converge on goals that
14depend on actual tool-result content.
16This module hosts the shared factory. The MCP tool surface and the
17CLI both call :func:`build_engine_dependencies` to obtain the same
18``(tool_dispatcher, sampling_callable, sandbox_runner)`` triple, then
19hand them to :class:`mcp.mission.engine.MissionEngine`. The CLI also
20uses :func:`make_stub_dispatcher` to opt into the canned-response
21behaviour explicitly through ``--dry-run`` for the smoke-test use
22case the original stub was designed for.
24Why a separate module rather than living inside the MCP tool? The
25MCP tool's body is gated by ``GCO_ENABLE_MISSION``; the CLI must be
26able to import the factory regardless of the flag, because the
27flag-gating happens in the Click group, not at import time. Splitting
28the factory out keeps the MCP tool body lean and lets the CLI reach
29the same wiring without crossing a feature-flag boundary.
30"""
32from __future__ import annotations
34import json
35import sys
36from collections.abc import Awaitable, Callable, Mapping
37from datetime import UTC, datetime
38from pathlib import Path
39from typing import TYPE_CHECKING, Any, cast
41# <pyflowchart-code-diagram> BEGIN - auto-inserted, do not edit
42# Flowchart(s) generated from this file:
43# * ``build_engine_dependencies`` -> ``diagrams/code_diagrams/mcp/mission/_engine_factory.build_engine_dependencies.html``
44# (PNG: ``diagrams/code_diagrams/mcp/mission/_engine_factory.build_engine_dependencies.png``)
45# Regenerate with ``python diagrams/code_diagrams/generate.py``.
46# <pyflowchart-code-diagram> END
49# The mission package and the FastMCP server module both live under
50# ``mcp/``; the path-injection pattern matches the rest of the MCP
51# surface so ``import server`` and ``import mission.*`` resolve
52# without making the ``mcp`` directory a package.
53sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
55from mission import sampling as mission_sampling # noqa: E402
56from mission import state as mission_state # noqa: E402
57from mission.engine import MissionEngine, SandboxRunner, ToolDispatcher # noqa: E402
59if TYPE_CHECKING: # pragma: no cover - import only for type checkers
60 from mission.types import SessionState, ToolCallRecord
63__all__ = [
64 "EngineDependencies",
65 "build_engine_dependencies",
66 "build_mission_engine",
67 "fetch_registered_tool_metadata",
68 "make_stub_dispatcher",
69 "remaining_wall_clock_seconds",
70]
73# ---------------------------------------------------------------------------
74# Public types
75# ---------------------------------------------------------------------------
class EngineDependencies:
    """Bundle of callables consumed by :class:`MissionEngine`.

    Holds the collaborators :class:`MissionEngine` needs at construction
    time so callers can build them once and pass them through: the tool
    dispatcher, the optional sampling callable, the optional sandbox
    runner and the optional final-lessons callable. A plain slotted
    class rather than a NamedTuple so fields can be optional without
    the boilerplate.

    All constructor arguments are keyword-only. ``final_lessons_callable``
    defaults to ``None``; the other three must always be supplied
    (``None`` is accepted for the sampling callable and sandbox runner).
    """

    __slots__ = ("final_lessons_callable", "sampling_callable", "sandbox_runner", "tool_dispatcher")

    def __init__(
        self,
        *,
        tool_dispatcher: ToolDispatcher,
        sampling_callable: Callable[..., Awaitable[Any]] | None,
        sandbox_runner: SandboxRunner | None,
        final_lessons_callable: Callable[..., Awaitable[Any]] | None = None,
    ) -> None:
        # Callable the engine uses to run a named tool.
        self.tool_dispatcher = tool_dispatcher
        # Async strategy sampler, or ``None`` when sampling is not wired.
        self.sampling_callable = sampling_callable
        # Async runner for scripted strategies, or ``None`` when not wired.
        self.sandbox_runner = sandbox_runner
        # Async final-lessons producer, or ``None`` when not wired.
        self.final_lessons_callable = final_lessons_callable

    def __repr__(self) -> str:
        """Return a debugging representation listing every wired dependency."""
        return (
            f"{type(self).__name__}("
            f"tool_dispatcher={self.tool_dispatcher!r}, "
            f"sampling_callable={self.sampling_callable!r}, "
            f"sandbox_runner={self.sandbox_runner!r}, "
            f"final_lessons_callable={self.final_lessons_callable!r})"
        )
103# ---------------------------------------------------------------------------
104# Helpers shared between the MCP tool and the CLI
105# ---------------------------------------------------------------------------
108def remaining_wall_clock_seconds(session: Mapping[str, Any]) -> float | None:
109 """Return remaining wall-clock seconds for ``session``, or ``None``.
111 Mirrors the behaviour the MCP tool surface used to keep private:
112 sessions that haven't started yet report the full cap; sessions
113 whose ``max_wall_clock_seconds`` is the ``-1`` "uncapped" sentinel
114 return ``None`` so the sampling prompt's budget context renders
115 ``"remaining_wall_clock_seconds": null``; running sessions return
116 the difference between cap and elapsed time clamped at zero.
117 """
118 cap = session.get("budget", {}).get("max_wall_clock_seconds")
119 if cap is None or cap == -1:
120 return None
121 started_raw = session.get("started_at")
122 if not started_raw:
123 return float(cap)
124 try:
125 started = datetime.fromisoformat(started_raw)
126 except TypeError, ValueError:
127 return float(cap)
128 elapsed = (datetime.now(UTC) - started).total_seconds()
129 return max(0.0, float(cap) - elapsed)
async def fetch_registered_tool_metadata() -> tuple[dict[str, Any], dict[str, str]]:
    """Return ``(registered_tools, tool_docstrings)`` from the live registry.

    Lazily imports ``server.mcp`` and awaits its ``_list_tools()``. The
    first dict maps each tool's ``name`` to the tool object; the second
    maps the same names to the tool's ``description`` attribute, with a
    missing or falsy description normalised to ``""``.

    Any ``Exception`` raised while importing ``server`` or listing the
    tools yields ``({}, {})`` instead of propagating, so callers still
    get a usable (empty) result when the registry is unavailable.
    """
    try:
        from server import mcp  # noqa: PLC0415 - lazy

        listed = await mcp._list_tools()
    except Exception:
        return {}, {}

    by_name = {tool.name: tool for tool in listed}
    descriptions = {
        tool_name: getattr(tool, "description", "") or ""
        for tool_name, tool in by_name.items()
    }
    return by_name, descriptions
158# ---------------------------------------------------------------------------
159# Tool dispatcher
160# ---------------------------------------------------------------------------
163async def _live_dispatch_tool(
164 tool_name: str,
165 args: dict[str, Any],
166 ctx_inner: Any | None,
167) -> Any:
168 """Dispatch ``tool_name`` against the live FastMCP registry.
170 Looks the tool up via ``server.mcp.get_tool`` and invokes it with
171 ``args``. The raw FastMCP ``ToolResult`` Pydantic model is not
172 JSON-serialisable, so the helper unwraps it:
174 * Prefer ``structured_content`` when present — every FastMCP tool
175 with a typed return surfaces a JSON-able dict here.
176 * Fall back to the first content block's ``text`` field;
177 best-effort JSON-parse so structured string-returning tools
178 round-trip as dicts.
179 * Anything else returns ``None`` so the engine records a benign
180 placeholder rather than a non-serialisable object.
182 ``RuntimeError`` propagates for unknown tool names so the engine's
183 per-call try/except records a ``failed`` outcome rather than
184 silently invoking nothing.
185 """
186 # Reuse the active request context when one exists so tools that
187 # introspect ``get_context()`` see the right one. Fall back to
188 # ``ctx_inner`` when no request is active (CLI path / unit-test
189 # path).
190 context: Any | None
191 try:
192 from fastmcp.server.dependencies import get_context # noqa: PLC0415
194 try:
195 context = get_context()
196 except Exception:
197 context = ctx_inner
198 except Exception:
199 context = ctx_inner
200 del context # FastMCP uses contextvars internally
202 from server import mcp # noqa: PLC0415
204 tool_obj = await mcp.get_tool(tool_name)
205 if tool_obj is None:
206 raise RuntimeError(f"tool {tool_name!r} not registered")
207 result = await tool_obj.run(args)
209 structured = getattr(result, "structured_content", None)
210 if isinstance(structured, dict):
211 return structured
212 content_blocks = getattr(result, "content", None) or []
213 if content_blocks:
214 first = content_blocks[0]
215 text_payload = getattr(first, "text", None)
216 if isinstance(text_payload, str): 216 ↛ 221line 216 didn't jump to line 221 because the condition on line 216 was always true
217 try:
218 return json.loads(text_payload)
219 except TypeError, ValueError:
220 return text_payload
221 return None
def make_stub_dispatcher() -> ToolDispatcher:
    """Build a dispatcher that answers every call with a canned success.

    Meant for ``--dry-run`` smoke testing and for unit tests that do not
    want a live registry: no real tool is invoked. Each call resolves to
    ``{"_status": "ok", "_stub": True, "tool_name": ..., "args": ...}``,
    echoing the requested tool name and a shallow copy of its arguments.

    Production code paths use :func:`build_engine_dependencies`, which
    only hands this stub out when the caller asks for it explicitly.
    """

    async def _dispatch(tool_name: str, args: dict[str, Any], ctx: Any) -> dict[str, Any]:
        canned: dict[str, Any] = {"_status": "ok", "_stub": True}
        canned["tool_name"] = tool_name
        # Shallow copy so the response does not alias the caller's dict.
        canned["args"] = dict(args)
        return canned

    return _dispatch
248# ---------------------------------------------------------------------------
249# Sandbox runner
250# ---------------------------------------------------------------------------
def _build_sandbox_runner(session: Mapping[str, Any]) -> SandboxRunner | None:
    """Return a sandbox runner for ``session``, or ``None`` when unavailable.

    ``None`` is returned when the session's ``allow_scripted_strategies``
    flag is falsy, or when ``mission.sandbox`` cannot be imported.
    Otherwise a single ``MissionSandbox`` is created from the session's
    ``tool_allowlist`` (copied into a list) and the session itself, and
    the returned coroutine function delegates every call to its ``run``.
    """
    if not session.get("allow_scripted_strategies"):
        return None
    try:
        from mission.sandbox import MissionSandbox  # noqa: PLC0415
    except ImportError:
        return None

    allowlist = list(session.get("tool_allowlist") or [])
    sandbox = MissionSandbox(allowlist, cast("SessionState", session))

    async def _sandbox_runner(
        script: str,
        ctx_arg: Any,
        dispatcher: ToolDispatcher,
    ) -> tuple[dict[str, Any], list[ToolCallRecord]]:
        observations, call_log = await sandbox.run(script, ctx_arg, dispatcher)
        # The cast only narrows the static type; the list is passed through.
        return observations, cast("list[ToolCallRecord]", call_log)

    return _sandbox_runner
278# ---------------------------------------------------------------------------
279# Sampling callable
280# ---------------------------------------------------------------------------
def _build_sampling_callable(
    session: Mapping[str, Any],
    ctx: Any | None,
    *,
    registered_tools: dict[str, Any],
    tool_docstrings: dict[str, str],
) -> Callable[..., Awaitable[Any]] | None:
    """Return the Strategy_Revision sampler for ``session``, or ``None``.

    ``None`` is returned when the session did not set ``use_sampling``,
    when ``sampling_backend_resolved`` is the string ``"none"``, or when
    ``mission_sampling.select_sampling_backend`` yields no backend.
    Otherwise the result is an async callable taking keyword-only
    ``session`` and ``ctx`` that forwards the latest recorded iteration
    to ``mission_sampling.maybe_sample_strategy_revision``.
    """
    sampling_disabled = (
        not session.get("use_sampling")
        or session.get("sampling_backend_resolved") == "none"
    )
    if sampling_disabled:
        return None

    backend_obj = mission_sampling.select_sampling_backend(
        ctx,
        model_id=session.get("bedrock_model_id"),
        prefs=session.get("sampling_model_preferences"),
    )
    if backend_obj is None:
        return None

    from mission._environment import gather_session_environment  # noqa: PLC0415

    # Single-slot cache shared by every call of the closure below: the
    # environment is gathered on the first sampling call only, and a
    # failed gather is remembered as ``None`` rather than retried. A list
    # is used so the closure can fill it without ``nonlocal``.
    env_cache: list[Mapping[str, Any] | None] = []

    async def _sampler(*, session: dict[str, Any], ctx: Any | None) -> Any:
        history = session.get("iterations") or []
        latest = history[-1] if history else None
        if latest is None:
            # Nothing recorded yet, so there is nothing to revise.
            return None

        budget = session.get("budget") or {}
        cap = int(budget.get("max_iterations", 0))
        if cap == -1:
            # ``-1`` is the "uncapped" sentinel; report zero remaining
            # instead of counting down from a cap that does not exist.
            remaining_iters = 0
        else:
            # Finite cap: subtract what has been recorded, floor at zero.
            remaining_iters = max(0, cap - len(history))

        if not env_cache:
            try:
                gathered = gather_session_environment(session)
            except Exception:  # noqa: BLE001
                gathered = None
            env_cache.append(gathered)

        return await mission_sampling.maybe_sample_strategy_revision(
            backend=backend_obj,
            session=cast("SessionState", session),
            iteration=latest,
            allowlist=list(session.get("tool_allowlist") or []),
            registered_tools=registered_tools,
            tool_docstrings=tool_docstrings,
            remaining_iterations=remaining_iters,
            remaining_wall_clock_secs=remaining_wall_clock_seconds(session),
            allow_scripts=bool(session.get("allow_scripted_strategies", False)),
            environment_context=env_cache[0],
        )

    return _sampler
350# ---------------------------------------------------------------------------
351# Final lessons callable
352# ---------------------------------------------------------------------------
def _build_final_lessons_callable(
    session: Mapping[str, Any],
    ctx: Any | None,
    tool_docstrings: dict[str, str],
) -> Callable[..., Awaitable[Any]] | None:
    """Return the Final_Report lessons callable for ``session``, or ``None``.

    ``None`` is returned when the session did not set ``use_sampling``,
    when ``sampling_backend_resolved`` is the string ``"none"``, or when
    ``mission_sampling.select_sampling_backend`` yields no backend.
    Otherwise the result is an async callable taking keyword-only
    ``session`` and ``ctx`` that delegates to
    ``mission_sampling.maybe_sample_final_lessons`` with the resolved
    backend and ``tool_docstrings``; its ``ctx`` argument is accepted
    but not forwarded.
    """
    sampling_disabled = (
        not session.get("use_sampling")
        or session.get("sampling_backend_resolved") == "none"
    )
    if sampling_disabled:
        return None

    model_id = session.get("bedrock_model_id")
    preferences = session.get("sampling_model_preferences")
    backend_obj = mission_sampling.select_sampling_backend(
        ctx,
        model_id=model_id,
        prefs=preferences,
    )
    if backend_obj is None:
        return None

    async def _final_lessons(*, session: dict[str, Any], ctx: Any | None) -> Any:
        typed_session = cast("SessionState", session)
        return await mission_sampling.maybe_sample_final_lessons(
            backend=backend_obj,
            session=typed_session,
            tool_docstrings=tool_docstrings,
        )

    return _final_lessons
390# ---------------------------------------------------------------------------
391# Public factory
392# ---------------------------------------------------------------------------
async def build_engine_dependencies(
    session: Mapping[str, Any],
    ctx: Any | None,
    *,
    use_stub_dispatcher: bool = False,
) -> EngineDependencies:
    """Assemble the :class:`EngineDependencies` bundle for ``session``.

    With ``use_stub_dispatcher=True`` (the CLI's ``--dry-run`` path) the
    bundle carries the canned stub dispatcher, no sampling callable and
    no final-lessons callable; the sandbox runner is still built from
    the session. The live registry is not queried on this path.

    Otherwise the registered-tool metadata is fetched once and the
    bundle carries the live dispatcher plus whichever of the sampler,
    sandbox runner and final-lessons callable their builders return for
    this session.
    """
    if use_stub_dispatcher:
        # With the stub, real tools never run, so there is nothing to
        # inform the sampling prompt: the sampler is left out. The
        # session's own ``use_sampling`` flag is not modified here.
        stub = make_stub_dispatcher()
        return EngineDependencies(
            tool_dispatcher=stub,
            sampling_callable=None,
            sandbox_runner=_build_sandbox_runner(session),
        )

    registered_tools, tool_docstrings = await fetch_registered_tool_metadata()
    # Keyword arguments are evaluated top to bottom: sampler, then
    # sandbox runner, then final-lessons callable.
    return EngineDependencies(
        tool_dispatcher=_live_dispatch_tool,
        sampling_callable=_build_sampling_callable(
            session,
            ctx,
            registered_tools=registered_tools,
            tool_docstrings=tool_docstrings,
        ),
        sandbox_runner=_build_sandbox_runner(session),
        final_lessons_callable=_build_final_lessons_callable(session, ctx, tool_docstrings),
    )
async def build_mission_engine(
    session: Mapping[str, Any],
    ctx: Any | None,
    *,
    use_stub_dispatcher: bool = False,
) -> MissionEngine:
    """Return a :class:`MissionEngine` wired up to drive ``session``.

    Convenience wrapper: builds the dependency bundle through
    :func:`build_engine_dependencies` (forwarding ``use_stub_dispatcher``),
    obtains the persistence backend from ``mission_state.get_backend()``
    and passes both to the :class:`MissionEngine` constructor. Callers
    that need a custom backend can call :func:`build_engine_dependencies`
    directly and instantiate the engine themselves.
    """
    deps = await build_engine_dependencies(
        session,
        ctx,
        use_stub_dispatcher=use_stub_dispatcher,
    )
    # The backend is resolved only after the dependencies were built.
    return MissionEngine(
        backend=mission_state.get_backend(),
        tool_dispatcher=deps.tool_dispatcher,
        sampling_callable=deps.sampling_callable,
        sandbox_runner=deps.sandbox_runner,
        final_lessons_callable=deps.final_lessons_callable,
    )