Coverage for mcp/mission/engine.py: 89%
572 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
"""Five-phase iteration loop driver for the Mission goal-directed loop.

The :class:`MissionEngine` owns one ``run_iteration`` lifecycle per call: it
loads the persisted session, walks the iteration through propose → execute
→ observe → evaluate → decide, persists the resulting record, and writes a
Final_Report when the verdict is terminal. Every external dependency is
injected at construction time so unit tests can supply mocks for the tool
dispatcher, the sampling callable, and the script sandbox runner.

Why a class rather than a free function? Two reasons.

* Each phase needs the same handful of dependencies (the backend, the
  tool dispatcher, the sampling callable / sandbox runner, the clock).
  Threading them through every method as positional arguments would be
  tedious and error-prone; the dataclass shape gives every phase one
  place to look for them.
* A test that exercises a single phase in isolation needs to construct
  a ``MissionEngine`` with stubbed dependencies and call the private
  method directly. Having the dependencies on the instance — rather
  than as module-level singletons — keeps the engine pure and free of
  process-global state.

Phase contract:

* Each ``_*_phase`` method is wrapped in a try/finally (the shared
  ``_run_phase`` helper) that emits exactly one
  ``audit.emit_phase_event`` regardless of whether the body succeeded
  or raised. The matching :class:`PhaseRecord` is appended to
  ``record["phases"]`` in the same finally block, so a failed phase
  still produces a structured record on the iteration.
* Any phase that raises propagates the exception out of
  ``run_iteration`` after the engine appends the partial iteration,
  marks the session as ``failed``, and attempts to persist it (a save
  error on that path is suppressed so it cannot mask the phase error).
  Subsequent calls to ``run_iteration`` on a ``failed`` session refuse
  with ``session_failed``.

Determinism: every clock read goes through the injectable ``self.now``.
``run_iteration`` reads it for the iteration's ``started_at`` /
``ended_at`` (and for the session's ``started_at`` on the first
iteration), and ``_run_phase`` reads it for each phase's ``started_at``
/ ``ended_at``. Those bookkeeping timestamps do not feed into the
Strategy or the tool-call records. The Propose_Phase's deterministic
fallback uses no clock and no random source. The Decide_Phase is the
phase whose verdict logic consults the clock (budget caps, cadence
resolver), so pinning ``now`` in tests makes verdicts reproducible.

The ``Context`` type is from FastMCP and brings a heavy dependency tree
(MCP transport, ``contextvars``, etc.) that unit tests do not need. We
type ``ctx`` as ``Any | None`` so the engine module imports cleanly in
isolation; the production wiring threads a real :class:`fastmcp.Context`
through the dispatcher and sampler callables, where its concrete type
matters. This is the same trade-off the existing ``mcp/tools/*.py``
modules make for tools that take an injected context.
"""
52from __future__ import annotations
54import contextlib
55from collections.abc import Awaitable, Callable, Mapping
56from dataclasses import dataclass, field
57from datetime import UTC, datetime
58from typing import Any, cast
60from . import audit, decide, final_report
61from .checkpoints import mark_checkpoint
62from .predicate import PredicateRejected, evaluate_predicate, parse_predicate
63from .sampling import SamplingFallback, SamplingUsed
64from .types import (
65 TERMINAL_STATES,
66 TERMINAL_VERDICTS,
67 Criterion,
68 CriterionResult,
69 IterationRecord,
70 Observation,
71 PhaseRecord,
72 SessionState,
73 Strategy,
74 ToolCallRecord,
75 VerdictLabel,
76 VerdictReason,
77)
79# <pyflowchart-code-diagram> BEGIN - auto-inserted, do not edit
80# Flowchart(s) generated from this file:
81# * ``MissionEngine.run_iteration`` -> ``diagrams/code_diagrams/mcp/mission/engine.MissionEngine_run_iteration.html``
82# (PNG: ``diagrams/code_diagrams/mcp/mission/engine.MissionEngine_run_iteration.png``)
83# Regenerate with ``python diagrams/code_diagrams/generate.py``.
84# <pyflowchart-code-diagram> END
# Public API of this module: the engine and its code-keyed lifecycle
# error. The phase-name constants, the default-clock factory, and the
# callable type aliases are not re-exported by ``from ... import *``.
__all__ = [
    "MissionEngine",
    "MissionEngineError",
]
93# ---------------------------------------------------------------------------
94# Error
95# ---------------------------------------------------------------------------
98class MissionEngineError(Exception):
99 """Raised by the engine for stable, code-keyed lifecycle errors.
101 The :attr:`code` attribute carries a short stable string (e.g.
102 ``"session_not_found"``, ``"session_terminal"``, ``"session_paused"``,
103 ``"session_failed"``) that the MCP tool wrappers and the CLI render
104 as a structured tool error. The exception's string form falls back
105 to ``code`` so logs always show something meaningful even when the
106 caller does not pull the attribute out explicitly.
107 """
109 def __init__(self, code: str, *, message: str | None = None) -> None:
110 self.code: str = code
111 super().__init__(message if message is not None else code)
114# ---------------------------------------------------------------------------
115# Phase-name constants (typed)
116# ---------------------------------------------------------------------------
# One spelling per phase, shared by the audit emitter and the PhaseRecord
# builder so the two can never drift apart. The values must stay in sync
# with the ``Literal`` phase names declared on :class:`PhaseRecord` and on
# :func:`audit.emit_phase_event`.
_PROPOSE, _EXECUTE, _OBSERVE, _EVALUATE, _DECIDE = (
    "propose",
    "execute",
    "observe",
    "evaluate",
    "decide",
)
128# ---------------------------------------------------------------------------
129# Defaults
130# ---------------------------------------------------------------------------
133def _default_now() -> Callable[[], datetime]:
134 """Return a clock callable that yields the current UTC datetime.
136 Used as the ``default_factory`` for :attr:`MissionEngine.now`. Wrapping
137 the lambda in a function keeps ``mypy --strict`` happy with the
138 ``Callable[[], datetime]`` annotation while preserving the
139 "constructed once per engine, called many times" semantics.
140 """
141 return lambda: datetime.now(UTC)
144# ---------------------------------------------------------------------------
145# Engine
146# ---------------------------------------------------------------------------
# Type aliases for the injected callables. Loose on purpose: the precise
# shapes settle in later slices (sampling in slice 6, sandbox in slice 5).
#
# ToolDispatcher: ``(tool_name, args, ctx) -> awaitable result``. The
# engine routes every direct ``tool_calls`` invocation through it.
ToolDispatcher = Callable[[str, dict[str, Any], Any], Awaitable[Any]]
# SamplingCallable: any call shape returning an awaitable. The
# Propose_Phase invokes it with keyword arguments (``session=``, ``ctx=``).
SamplingCallable = Callable[..., Awaitable[Any]]
# SandboxRunner: ``(script, ctx, tool_dispatcher) -> awaitable
# (observation_dict, script_call_log)``. It receives the engine's own
# dispatcher so the sandboxed script can invoke tools.
SandboxRunner = Callable[
    [str, Any, ToolDispatcher],
    Awaitable[tuple[dict[str, Any], list[ToolCallRecord]]],
]
159@dataclass
160class MissionEngine:
161 """Driver for the Mission five-phase iteration loop.
163 Construction takes every external dependency the engine needs:
165 * ``backend`` — the persistence layer (filesystem, DynamoDB, …).
166 Engine never reaches outside this protocol for state I/O.
167 * ``tool_dispatcher`` — async callable that invokes one MCP tool.
168 Signature ``(tool_name, args, ctx) -> result``. The engine routes
169 every direct ``tool_calls`` invocation through this callable so
170 tests can swap in a stub that returns canned results.
171 * ``sampling_callable`` — optional async callable that produces an
172 LLM-derived next Strategy when the prior Verdict was ``adjust``
173 and the session has ``use_sampling=true``. Loose signature; slice
174 6 finalises it. ``None`` (or any failure inside it) routes the
175 engine to the deterministic fallback strategy.
176 * ``sandbox_runner`` — optional async callable that runs a scripted
177 Strategy in the Mission sandbox. Signature ``(script, ctx,
178 tool_dispatcher) -> (observation_dict, script_call_log)``. ``None``
179 means the engine refuses any scripted strategy with a clear
180 error instead of silently executing operator-supplied code.
181 * ``now`` — injectable clock. Defaults to a UTC clock; tests pin it
182 so deterministic verdicts (the Decide_Phase consults the clock
183 for budget caps and for the cadence resolver) are reproducible.
184 """
186 backend: Any
187 tool_dispatcher: ToolDispatcher
188 sampling_callable: SamplingCallable | None
189 sandbox_runner: SandboxRunner | None
190 now: Callable[[], datetime] = field(default_factory=_default_now)
191 # Optional async callable that drives the Final_Report's
192 # ``lessons`` and ``recommended_followups`` overlay. Loose typing
193 # mirrors :attr:`sampling_callable` so legacy tests that wire a
194 # plain async stub keep working. Production wiring binds it to a
195 # closure over :func:`mcp.mission.sampling.maybe_sample_final_lessons`
196 # — see :meth:`_maybe_sample_final_lessons`. ``None`` (the default)
197 # disables the overlay; the deterministic templates from
198 # :func:`mcp.mission.final_report.build_deterministic_report` stand
199 # on their own in that case.
200 final_lessons_callable: SamplingCallable | None = None
202 # ------------------------------------------------------------------ #
203 # Public surface
204 # ------------------------------------------------------------------ #
    async def run_iteration(
        self,
        session_id: str,
        ctx: Any | None = None,
    ) -> IterationRecord:
        """Run one full iteration for ``session_id`` and return its record.

        Lifecycle:

        1. Load the session; raise ``session_not_found`` when missing.
        2. Refuse a session in any terminal state (``failed`` →
           ``session_failed``; any other ``TERMINAL_STATES`` member, e.g.
           ``completed`` / ``terminated`` → ``session_terminal``) or in
           ``paused`` (``session_paused``).
        3. On the ``pending → running`` edge (the very first iteration),
           stamp ``session["started_at"]`` with the iteration start time.
        4. Allocate a fresh :class:`IterationRecord` and run the five
           phases in order. Each phase emits exactly one
           ``audit.emit_phase_event`` regardless of outcome.
        5. On any phase exception: stamp ``ended_at``, append the partial
           iteration to ``session["iterations"]``, mark the session
           ``failed``, save (best-effort: a save error is suppressed so it
           cannot shadow the phase error), and re-raise. No verdict event
           is emitted on this path.
        6. On success: stamp the verdict on the iteration, append it,
           apply the post-iteration session bookkeeping
           (``_update_session_post_iteration``), save the session.
        7. On a terminal verdict (``TERMINAL_VERDICTS``, e.g. ``complete``
           / ``terminate``): hand off to ``_finalise_terminal_session``
           (status transition + Final_Report), then save again.
        8. Emit one ``audit.emit_verdict_event`` for every iteration that
           reached a verdict, terminal or in-progress alike.
        9. Return the iteration record.

        Args:
            session_id: Identifier passed to ``backend.load_session``.
            ctx: Optional FastMCP context threaded to the phases.

        Returns:
            The iteration record that was appended to the session.

        Raises:
            MissionEngineError: ``session_not_found``, ``session_failed``,
                ``session_terminal`` or ``session_paused`` before any
                phase runs.
            Exception: whatever a phase raised, re-raised after the
                failure record has been (best-effort) persisted.
        """
        session = self.backend.load_session(session_id)
        if session is None:
            raise MissionEngineError("session_not_found")

        # The terminal-state check distinguishes ``failed`` from the
        # other terminal states because callers (and the tool-error
        # table) treat them differently — a failed session needs manual
        # inspection via ``mission_history``, while a completed /
        # terminated session is simply done.
        status = session["status"]
        if status == "failed":
            raise MissionEngineError("session_failed")
        if status in TERMINAL_STATES:
            raise MissionEngineError("session_terminal")
        if status == "paused":
            raise MissionEngineError("session_paused")

        iteration_start = self.now()

        # First-iteration transition. ``started_at`` is the wall-clock
        # anchor for the wall-clock-budget computation in Decide_Phase
        # so we set it exactly once, on the pending → running edge.
        if session["status"] == "pending":
            session["status"] = "running"
            session["started_at"] = iteration_start.isoformat()

        # The new iteration's index is the count of iterations already
        # persisted (zero-based).
        iteration_index = len(session["iterations"])
        record = self._make_iteration_record(iteration_index, iteration_start)

        try:
            strategy = await self._propose_phase(session, ctx, record)
            executed_calls = await self._execute_phase(session, strategy, ctx, record)
            await self._observe_phase(session, strategy, executed_calls, record)
            await self._evaluate_phase(session, record)
            verdict, reason = await self._decide_phase(session, record)
        except Exception:
            # Persist a failure record so the session JSON remains a
            # complete history of everything the loop attempted. The
            # verdict stays at its placeholder value because no
            # Decide_Phase actually fired; consumers detect the failure
            # through ``session["status"] == "failed"`` and the failed
            # phase entry in ``record["phases"]``.
            record["ended_at"] = self.now().isoformat()
            session["iterations"].append(record)
            session["status"] = "failed"
            session["ended_at"] = record["ended_at"]
            with contextlib.suppress(Exception):
                # A save failure during a failure path must not shadow
                # the original phase exception — the operator's first
                # need is to see what actually went wrong, not what
                # went wrong while reporting what went wrong.
                self.backend.save_session(session)
            raise

        # Stamp the verdict on the iteration record before append; the
        # decide cascade inspects ``len(session["iterations"])`` (i.e.
        # iterations *before* the current one) so we deliberately
        # append after Decide_Phase rather than before.
        record["verdict"] = verdict
        record["verdict_reason"] = reason
        record["ended_at"] = self.now().isoformat()
        session["iterations"].append(record)

        self._update_session_post_iteration(session, record)
        self.backend.save_session(session)

        if verdict in TERMINAL_VERDICTS:
            await self._finalise_terminal_session(session, record, verdict, reason)
            # Second save captures whatever the finalisation changed.
            self.backend.save_session(session)

        # One verdict event per iteration regardless of terminal vs
        # in-progress, so audit consumers see a uniform stream.
        audit.emit_verdict_event(
            session_id=session_id,
            iteration_index=iteration_index,
            verdict=verdict,
            verdict_reason=reason,
            revision_rationale=record.get("revision_rationale"),
        )

        return record
316 # ------------------------------------------------------------------ #
317 # Iteration record bootstrap
318 # ------------------------------------------------------------------ #
320 @staticmethod
321 def _make_iteration_record(iteration_index: int, started_at: datetime) -> IterationRecord:
322 """Build the empty :class:`IterationRecord` for a new iteration.
324 Verdict and reason are placeholder values; the Decide_Phase
325 overwrites them before the record is appended. ``ended_at``
326 is set to the empty string and rewritten just before append
327 so the persisted shape always carries an ISO-8601 timestamp.
328 """
329 record: IterationRecord = {
330 "iteration_index": iteration_index,
331 "started_at": started_at.isoformat(),
332 "ended_at": "",
333 "phases": [],
334 "strategy": cast(Strategy, {}),
335 "observation": cast(Observation, {}),
336 "criteria_evaluation": [],
337 "verdict": "continue",
338 "verdict_reason": "in_progress",
339 "checkpoint_evaluated": False,
340 }
341 return record
343 # ------------------------------------------------------------------ #
344 # Phase wrapper
345 # ------------------------------------------------------------------ #
347 async def _run_phase(
348 self,
349 session: SessionState,
350 record: IterationRecord,
351 phase_name: str,
352 body: Callable[[], Awaitable[Any]],
353 ) -> Any:
354 """Execute ``body`` with phase audit + record bookkeeping.
356 Centralises the try/finally that every phase needs:
358 * stamps ``started_at`` from the engine clock,
359 * runs the body,
360 * stamps ``ended_at`` from the engine clock again,
361 * appends a :class:`PhaseRecord` to ``record["phases"]``,
362 * emits exactly one ``audit.emit_phase_event``.
364 On exception, the finally block records ``status="failed"``
365 with the exception's name + message (truncated to 200 chars to
366 match the audit module's existing convention) and re-raises so
367 ``run_iteration`` can drive the failure path.
368 """
369 started_at = self.now().isoformat()
370 status: str = "succeeded"
371 error_message: str | None = None
372 try:
373 return await body()
374 except Exception as exc:
375 status = "failed"
376 error_message = f"{type(exc).__name__}: {exc}"[:200]
377 raise
378 finally:
379 ended_at = self.now().isoformat()
380 phase_record: PhaseRecord = {
381 "phase": cast(Any, phase_name),
382 "status": cast(Any, status),
383 "started_at": started_at,
384 "ended_at": ended_at,
385 }
386 if error_message:
387 phase_record["error_message"] = error_message
388 record["phases"].append(phase_record)
389 audit.emit_phase_event(
390 session_id=session["session_id"],
391 iteration_index=record["iteration_index"],
392 phase=cast(Any, phase_name),
393 status=cast(Any, status),
394 started_at=started_at,
395 ended_at=ended_at,
396 error_message=error_message,
397 )
399 # ------------------------------------------------------------------ #
400 # Phase 1 — propose
401 # ------------------------------------------------------------------ #
403 async def _propose_phase(
404 self,
405 session: SessionState,
406 ctx: Any | None,
407 record: IterationRecord,
408 ) -> Strategy:
409 """Build the Strategy for this iteration.
411 Two paths:
413 * **Sampling path** — when the prior verdict was ``adjust`` AND
414 the session has ``use_sampling=true`` AND a sampling callable
415 is wired, await the callable and adopt its return value as
416 the Strategy. Any exception from the callable, or any return
417 shape that does not look like a Strategy, falls through to
418 the deterministic path. Slice 6 will replace this with a
419 richer prompt-builder + validator.
420 * **Deterministic path** — re-run the most recent successful
421 tool call (using the same args) when one exists, otherwise
422 invoke the first tool in the session's allowlist with empty
423 args. Pure: no clock, no randomness, no external I/O. The
424 resulting Strategy is always a single ``tool_calls`` entry.
426 The chosen Strategy is stored on ``record["strategy"]`` and
427 returned for the Execute_Phase to consume.
428 """
430 async def body() -> Strategy:
431 strategy = await self._build_strategy(session, ctx, record)
432 record["strategy"] = strategy
433 return strategy
435 return cast(Strategy, await self._run_phase(session, record, _PROPOSE, body))
437 async def _build_strategy(
438 self, session: SessionState, ctx: Any | None, record: IterationRecord
439 ) -> Strategy:
440 """Pick the Propose_Phase Strategy via the sampling-or-fallback rule."""
441 if self._should_attempt_sampling(session):
442 sampled = await self._try_sample_strategy(session, ctx, record)
443 if sampled is not None:
444 return sampled
445 return self._deterministic_strategy(session)
447 def _should_attempt_sampling(self, session: SessionState) -> bool:
448 """True iff the prior verdict was ``adjust`` and sampling is wired."""
449 if self.sampling_callable is None:
450 return False
451 if not session.get("use_sampling"): 451 ↛ 452line 451 didn't jump to line 452 because the condition on line 451 was never true
452 return False
453 iterations = session.get("iterations") or []
454 if not iterations: 454 ↛ 455line 454 didn't jump to line 455 because the condition on line 454 was never true
455 return False
456 return iterations[-1].get("verdict") == "adjust"
458 async def _try_sample_strategy(
459 self, session: SessionState, ctx: Any | None, record: IterationRecord
460 ) -> Strategy | None:
461 """Call the sampling callable and adopt its return as a Strategy.
463 Returns ``None`` on any failure (exception, non-dict return, dict
464 missing both ``tool_calls`` and ``script``) so the caller can
465 fall back to the deterministic strategy. Shape validation here
466 is intentionally tight: the engine cannot run a ``script``
467 strategy without a sandbox, and we never want a malformed sampler
468 result to cascade into Execute_Phase as an opaque error.
470 Three return shapes are recognised:
472 * :class:`mcp.mission.sampling.SamplingUsed` — the production
473 orchestration helper's accepted-output type. The Strategy is
474 read from ``parsed["next_strategy"]``; the sampler's
475 ``revision_rationale`` is stamped on the iteration ``record``
476 so :func:`mcp.mission.audit.emit_verdict_event` surfaces it
477 in the next iteration's audit trail.
478 * :class:`mcp.mission.sampling.SamplingFallback` — the
479 orchestration helper's rejection / fallback type. The engine
480 treats this exactly like a missing return: ``None`` so the
481 deterministic-fallback path runs. The fallback's own
482 rationale stays on the audit event the helper already emitted;
483 we deliberately do *not* override the engine's deterministic
484 rationale-template here so the verdict path stays fully
485 deterministic when sampling rejects.
486 * Raw ``dict`` (the legacy / test pattern) — kept verbatim so
487 existing engine tests that pass simple async lambdas returning
488 ``{"tool_calls": [...]}`` continue to work without churn.
489 """
490 assert self.sampling_callable is not None # narrowed by caller
491 try:
492 result = await self.sampling_callable(session=session, ctx=ctx)
493 except Exception:
494 return None
496 # Phase 6.7 result types — the orchestration helper returns
497 # either ``SamplingUsed`` (accept) or ``SamplingFallback``
498 # (reject). The engine maps them onto its existing
499 # "Strategy or fall back" surface.
500 if isinstance(result, SamplingUsed):
501 next_strategy = result.parsed.get("next_strategy")
502 if not isinstance(next_strategy, dict): 502 ↛ 503line 502 didn't jump to line 503 because the condition on line 502 was never true
503 return None
504 self._capture_sampled_rationale(record, result.parsed)
505 return self._coerce_strategy_dict(next_strategy)
507 if isinstance(result, SamplingFallback):
508 # The fallback's own deterministic rationale is already on
509 # the emitted audit event. The engine routes through its
510 # own deterministic-fallback path so the verdict path stays
511 # fully deterministic — returning ``None`` is the signal.
512 return None
514 # Legacy raw-dict return — preserved verbatim so older tests
515 # and callers that wire a simple ``async def: return {...}``
516 # stub continue to work unchanged.
517 if not isinstance(result, dict): 517 ↛ 518line 517 didn't jump to line 518 because the condition on line 517 was never true
518 return None
519 return self._coerce_strategy_dict(result)
521 def _coerce_strategy_dict(self, candidate: dict[str, Any]) -> Strategy | None:
522 """Adopt ``candidate`` as a :class:`Strategy` if it is well-shaped.
524 Centralises the structural check (exactly one of ``tool_calls``
525 or ``script`` populated and well-typed) so both the
526 :class:`SamplingUsed` path and the legacy raw-dict path land
527 through one validator. Returns ``None`` for any malformed
528 shape; callers fall back to the deterministic strategy.
529 """
530 if "tool_calls" in candidate:
531 tool_calls = candidate["tool_calls"]
532 if isinstance(tool_calls, list) and tool_calls:
533 return cast(Strategy, dict(candidate))
534 return None
535 if "script" in candidate:
536 script = candidate["script"]
537 if isinstance(script, str) and script:
538 # The sandbox runner is the only thing that can
539 # safely execute a script. If it isn't wired, the
540 # sampled script is unusable — fall back.
541 if self.sandbox_runner is None: 541 ↛ 543line 541 didn't jump to line 543 because the condition on line 541 was always true
542 return None
543 return cast(Strategy, dict(candidate))
544 return None
545 return None
547 @staticmethod
548 def _capture_sampled_rationale(record: IterationRecord, parsed_payload: dict[str, Any]) -> None:
549 """Stamp the sampler's ``revision_rationale`` on ``record`` if present.
551 The advisory model's rationale lives at
552 ``parsed_payload["revision_rationale"]`` per the
553 Strategy_Revision schema. Recording it on the iteration record
554 means :func:`mcp.mission.audit.emit_verdict_event` (which the
555 engine calls at the end of ``run_iteration`` with
556 ``record.get("revision_rationale")``) emits the model-derived
557 text instead of the deterministic template that the
558 Decide_Phase synthesises for ``adjust`` verdicts. The engine
559 only ever calls this from the sampling-success path, so a
560 rejection / fallback never overrides the deterministic
561 template the Decide_Phase set.
562 """
563 rationale = parsed_payload.get("revision_rationale")
564 if isinstance(rationale, str) and rationale: 564 ↛ exitline 564 didn't return from function '_capture_sampled_rationale' because the condition on line 564 was always true
565 record["revision_rationale"] = rationale
567 def _deterministic_strategy(self, session: SessionState) -> Strategy:
568 """Build the fallback Strategy when sampling is off or unusable.
570 Re-runs the most recent successful tool call. When the prior
571 call used empty args and there are unmet criteria, the
572 widening rule injects a ``query`` parameter derived from the
573 unmet criterion IDs — this gives catalog-search tools
574 (``find_examples``, ``find_docs``) a chance to return
575 relevant content without needing the sampler. When no
576 successful call exists yet, the first tool in the allowlist
577 runs with widened args if possible, empty args otherwise.
578 """
579 prior_call = self._find_most_recent_successful_call(session)
580 if prior_call is not None:
581 tool_name, args = prior_call
582 widened = self._widen_args(args, session)
583 rationale_suffix = " with widened args" if widened != args else " with prior args"
584 return cast(
585 Strategy,
586 {
587 "tool_calls": [{"tool_name": tool_name, "args": dict(widened)}],
588 "rationale": f"deterministic fallback: re-run {tool_name}{rationale_suffix}",
589 },
590 )
591 allowlist = session.get("tool_allowlist") or []
592 if not allowlist:
593 raise MissionEngineError("propose_no_tool_available")
594 widened = self._widen_args({}, session)
595 return cast(
596 Strategy,
597 {
598 "tool_calls": [{"tool_name": allowlist[0], "args": widened}],
599 "rationale": (
600 "deterministic fallback: invoking first allowlisted "
601 "tool with widened args from unmet criteria"
602 if widened
603 else "deterministic fallback: invoking first allowlisted "
604 "tool with empty args (no prior successful call)"
605 ),
606 },
607 )
609 @staticmethod
610 def _widen_args(args: dict[str, Any], session: SessionState) -> dict[str, Any]:
611 """Inject a ``query`` parameter from unmet criteria when args are empty.
613 The widening rule is intentionally simple: if the args dict
614 has no ``query`` key (or it's empty), extract keywords from
615 the IDs of unmet criteria (splitting on ``_``) and join them
616 as a space-separated query string. This gives catalog-search
617 tools a chance to return relevant content on the deterministic
618 path without needing the sampler.
620 Returns the original args unchanged when:
621 - ``query`` is already populated (don't override operator intent)
622 - No unmet criteria exist (nothing to widen toward)
623 - The session has no iteration history (first call, no eval yet)
624 """
625 # Don't override an existing query.
626 if args.get("query"):
627 return args
629 # Find unmet criteria from the latest iteration's evaluation.
630 iterations = session.get("iterations") or []
631 if not iterations:
632 return args
633 latest = iterations[-1]
634 criteria_eval = latest.get("criteria_evaluation") or []
635 unmet_ids: list[str] = []
636 for result in criteria_eval:
637 if isinstance(result, dict) and result.get("status") == "unmet":
638 cid = result.get("criterion_id", "")
639 if cid: 639 ↛ 636line 639 didn't jump to line 636 because the condition on line 639 was always true
640 unmet_ids.append(cid)
641 if not unmet_ids:
642 return args
644 # Build a query from the unmet criterion IDs by splitting on
645 # underscores and deduplicating. Skip generic words.
646 skip_words = {"called", "succeeded", "found", "present", "no", "errors", "occurred"}
647 keywords: list[str] = []
648 seen: set[str] = set()
649 for cid in unmet_ids:
650 for word in cid.split("_"):
651 word_lower = word.lower()
652 if word_lower not in skip_words and word_lower not in seen and len(word) > 2:
653 keywords.append(word_lower)
654 seen.add(word_lower)
655 if not keywords:
656 return args
658 widened = dict(args)
659 widened["query"] = " ".join(keywords[:5]) # Cap at 5 keywords
660 return widened
662 @staticmethod
663 def _find_most_recent_successful_call(
664 session: SessionState,
665 ) -> tuple[str, dict[str, Any]] | None:
666 """Walk the iteration history backwards for a successful tool call.
668 Returns ``(tool_name, args)`` for the most recent call whose
669 :class:`ToolCallRecord` has ``status="ok"``. Looks at both the
670 recorded executed-call list (for tool_calls strategies — kept
671 on the iteration's Strategy under ``tool_calls`` after execute)
672 and the script call log (for scripted strategies). Skips
673 iterations whose strategy was a script with no successful
674 embedded tool call.
676 Returns ``None`` when no prior successful call exists across
677 the entire history.
678 """
679 for iteration in reversed(session.get("iterations") or []):
680 # Scripted strategies record their inner calls on
681 # ``script_call_log``; direct tool_calls strategies don't
682 # have that key.
683 for source_key in ("script_call_log",):
684 log = iteration.get(source_key)
685 if not log: 685 ↛ 687line 685 didn't jump to line 687 because the condition on line 685 was always true
686 continue
687 for call in reversed(log):
688 if (
689 isinstance(call, dict)
690 and call.get("status") == "ok"
691 and isinstance(call.get("tool_name"), str)
692 and isinstance(call.get("args"), dict)
693 ):
694 return call["tool_name"], dict(call["args"])
695 # Direct tool_calls strategies write the executed records
696 # back onto the Strategy under ``tool_calls`` (each entry
697 # carrying the same status / args fields the script log
698 # would carry). This keeps a single lookup path here.
699 strategy = iteration.get("strategy") or {}
700 tool_calls = strategy.get("tool_calls") or []
701 for tc in reversed(tool_calls): 701 ↛ 679line 701 didn't jump to line 679 because the loop on line 701 didn't complete
702 if ( 702 ↛ 701line 702 didn't jump to line 701 because the condition on line 702 was always true
703 isinstance(tc, dict)
704 and tc.get("status") == "ok"
705 and isinstance(tc.get("tool_name"), str)
706 and isinstance(tc.get("args"), dict)
707 ):
708 return tc["tool_name"], dict(tc["args"])
709 return None
711 # ------------------------------------------------------------------ #
712 # Phase 2 — execute
713 # ------------------------------------------------------------------ #
async def _execute_phase(
    self,
    session: SessionState,
    strategy: Strategy,
    ctx: Any | None,
    record: IterationRecord,
) -> list[ToolCallRecord]:
    """Execute the Strategy inside the Execute_Phase wrapper.

    The Strategy's shape picks the executor:

    * a ``script`` key routes to :meth:`_execute_script`, which hands the
      script, ``ctx`` and :attr:`tool_dispatcher` to :attr:`sandbox_runner`
      and stores the sandbox's Observation and ``script_call_log`` on
      ``record``;
    * otherwise :meth:`_execute_tool_calls` walks ``tool_calls`` in order,
      gating each entry by the session allowlist. A failed call is recorded
      and the loop moves on to the next entry.

    The chosen executor runs through :meth:`_run_phase` under the
    ``_EXECUTE`` label. The list of :class:`ToolCallRecord` it produces is
    returned.
    """

    async def run_strategy() -> list[ToolCallRecord]:
        # Only the selected executor is looked up; the other is never touched.
        executor = (
            self._execute_script if "script" in strategy else self._execute_tool_calls
        )
        return await executor(session, strategy, ctx, record)

    phase_output = await self._run_phase(session, record, _EXECUTE, run_strategy)
    return cast(list[ToolCallRecord], phase_output)
async def _execute_tool_calls(
    self,
    session: SessionState,
    strategy: Strategy,
    ctx: Any | None,
    record: IterationRecord,
) -> list[ToolCallRecord]:
    """Run each ``tool_calls`` entry of the Strategy, gated by the allowlist.

    Every entry yields exactly one :class:`ToolCallRecord`, in order:

    * malformed entry (not a dict, or a missing / empty / non-string
      ``tool_name``) -> ``status="failed"`` with
      ``error_message="tool_name_missing_or_invalid"``;
    * tool not in ``session["tool_allowlist"]`` ->
      ``status="skipped_not_allowed"``;
    * otherwise -> the record returned by :meth:`_dispatch_one_call`.

    A non-dict ``args`` value is replaced by ``{}``. One bad entry never
    aborts the loop. Shallow copies of the resulting records are also
    written to ``record["strategy"]["tool_calls"]``, so the persisted
    Strategy carries the executed outcomes.
    """
    permitted = set(session.get("tool_allowlist") or [])
    outcomes: list[ToolCallRecord] = []

    for item in strategy.get("tool_calls", []) or []:
        if isinstance(item, dict):
            name = item.get("tool_name")
            call_args = item.get("args")
        else:
            name, call_args = None, {}
        if not isinstance(call_args, dict):
            call_args = {}

        if not isinstance(name, str) or not name:
            # An operator bug in a single entry is recorded as a failed
            # call instead of failing the whole iteration.
            outcomes.append(
                {
                    "tool_name": str(name) if name else "<unknown>",
                    "args": call_args,
                    "status": "failed",
                    "result_summary": None,
                    "duration_ms": 0,
                    "error_message": "tool_name_missing_or_invalid",
                }
            )
        elif name not in permitted:
            outcomes.append(
                {
                    "tool_name": name,
                    "args": call_args,
                    "status": "skipped_not_allowed",
                    "result_summary": None,
                    "duration_ms": 0,
                }
            )
        else:
            outcomes.append(await self._dispatch_one_call(session, name, call_args, ctx))

    # The propose fallback looks up "the most recent successful call" on the
    # persisted Strategy, so mirror the executed records there.
    record["strategy"]["tool_calls"] = [dict(outcome) for outcome in outcomes]
    return outcomes
async def _dispatch_one_call(
    self,
    session: SessionState,
    tool_name: str,
    args: dict[str, Any],
    ctx: Any | None,
) -> ToolCallRecord:
    """Send one allowlisted call to :attr:`tool_dispatcher` and record it.

    The dispatcher is awaited as ``tool_dispatcher(tool_name, args, ctx)``.
    On success the record has ``status="ok"`` and the dispatcher's return
    value as ``result_summary``. If the dispatcher raises any
    :class:`Exception`, the call is recorded as ``status="failed"`` with an
    ``error_message`` of ``"<ExcType>: <message>"`` clipped to 200
    characters, rather than being re-raised. ``duration_ms`` comes from
    :meth:`_elapsed_ms` in both cases.
    """
    del session  # kept in the signature to mirror the other execute helpers
    started_at = self.now()
    try:
        payload = await self.tool_dispatcher(tool_name, args, ctx)
    except Exception as exc:
        # A failing tool is data for Observe_Phase, not an iteration abort.
        elapsed = self._elapsed_ms(started_at)
        return {
            "tool_name": tool_name,
            "args": args,
            "status": "failed",
            "result_summary": None,
            "duration_ms": elapsed,
            "error_message": f"{type(exc).__name__}: {exc}"[:200],
        }
    elapsed = self._elapsed_ms(started_at)
    return {
        "tool_name": tool_name,
        "args": args,
        "status": "ok",
        "result_summary": payload,
        "duration_ms": elapsed,
    }
async def _execute_script(
    self,
    session: SessionState,
    strategy: Strategy,
    ctx: Any | None,
    record: IterationRecord,
) -> list[ToolCallRecord]:
    """Run a scripted strategy through the wired sandbox runner.

    The runner is awaited as ``sandbox_runner(script, ctx, tool_dispatcher)``
    and returns ``(observation_dict, script_call_log)``. On success both are
    stored on ``record`` (``record["observation"]`` and
    ``record["script_call_log"]``), and a copy of the call log is returned.

    Failure handling:

    * ``sandbox_runner is None``: the engine was built without a sandbox,
      so the script cannot run. Raised as
      ``MissionEngineError("script_rejected")``.
    * :class:`ScriptRejected` from the runner: the runner's own AST gate
      fired. Re-raised as ``MissionEngineError("script_rejected")`` with
      the original exception chained.
    * :class:`SandboxTerminated` from the runner: a duration / memory cap
      killed the script. This is a budget cap, not a phase failure, so the
      exception is swallowed. The partial ``script_call_log``, a partial
      Observation built from the exception's ``partial_*`` fields, and the
      ``sandbox_terminated_reason = "max_wall_clock"`` sentinel are stored
      on ``record``, and the partial calls are returned. The verdict
      cascade reads that sentinel to terminate on the budget-cap path.
    * Any other exception propagates unchanged.

    The sandbox module is imported lazily, only after the runner has
    raised, so this module stays importable without the sandbox's
    ``pydantic_monty`` dependency. If that import fails with
    :class:`ImportError`, no translation is attempted and the runner's
    original exception is re-raised, not the ImportError.

    Raises:
        MissionEngineError: ``script_rejected`` in the cases listed above.
    """
    del session  # accepted for symmetry with the execute path; unused
    if self.sandbox_runner is None:
        raise MissionEngineError("script_rejected")
    script = strategy["script"]
    try:
        observation_dict, script_call_log = await self.sandbox_runner(
            script, ctx, self.tool_dispatcher
        )
    except Exception as exc:
        # Importing the sandbox at module top would pull in
        # ``pydantic_monty``, which the engine does not require (an
        # operator can run ``mission_validate`` on a stored session without
        # a working sandbox). Resolve the exception classes lazily instead.
        try:
            from .sandbox import (
                SandboxTerminated,
                ScriptRejected,
            )
        except ImportError:
            # Without the sandbox there is nothing to translate against.
            # Fall through to the bare ``raise`` below so the runner's
            # original exception propagates, instead of letting the
            # ImportError mask it.
            pass
        else:
            if isinstance(exc, ScriptRejected):
                raise MissionEngineError("script_rejected") from exc
            if isinstance(exc, SandboxTerminated):
                # Budget cap, not a phase failure: keep whatever the script
                # collected before being killed. The sentinel below is what
                # lets the verdict cascade short-circuit to
                # ``("terminate", "max_wall_clock")``; without it the
                # cascade would fall through to ``("continue",
                # "in_progress")``.
                record["script_call_log"] = cast(
                    "list[ToolCallRecord]", list(exc.partial_script_call_log)
                )
                # Give Evaluate_Phase the same Observation shape a
                # successful run produces; Observe_Phase fills timestamps.
                partial_observation: dict[str, Any] = {
                    "tool_results": [
                        self._annotate_tool_result(call)
                        for call in exc.partial_script_call_log
                    ],
                    "metrics": {},
                    "events": list(exc.partial_events),
                }
                if exc.partial_observations:
                    partial_observation["metrics"]["observations"] = {
                        entry["key"]: entry["value"]
                        for entry in exc.partial_observations
                        if isinstance(entry, dict) and "key" in entry
                    }
                record["observation"] = cast(Observation, partial_observation)
                record["sandbox_terminated_reason"] = "max_wall_clock"
                return cast("list[ToolCallRecord]", list(exc.partial_script_call_log))
        # Re-raises ``exc``: the nested import handler has already exited.
        raise
    # The sandbox already produced a normalized Observation; cache it so
    # Observe_Phase can use it directly (the only pre-built Observation path).
    record["script_call_log"] = cast("list[ToolCallRecord]", list(script_call_log))
    record["observation"] = cast(Observation, dict(observation_dict))
    return list(script_call_log)
def _elapsed_ms(self, started: datetime) -> int:
    """Return whole milliseconds from ``started`` to :meth:`now`.

    Fractions of a millisecond are truncated. A clock reading earlier than
    ``started`` yields ``0`` instead of a negative duration.
    """
    elapsed_seconds = (self.now() - started).total_seconds()
    return max(0, int(elapsed_seconds * 1000))
963 # ------------------------------------------------------------------ #
964 # Phase 3 — observe
965 # ------------------------------------------------------------------ #
async def _observe_phase(
    self,
    session: SessionState,
    strategy: Strategy,
    executed_calls: list[ToolCallRecord],
    record: IterationRecord,
) -> None:
    """Populate ``record["observation"]`` inside the Observe_Phase wrapper.

    * Tool-calls strategy: the Observation is built from
      ``executed_calls`` by :meth:`_build_observation`.
    * Script strategy (``"script"`` in the Strategy): the sandbox's
      Observation is already on ``record``. Only missing slots are filled:
      empty ``tool_results`` / ``metrics`` / ``events``, plus
      ``phase_started_at`` / ``phase_ended_at`` ISO timestamps. Values the
      sandbox supplied are kept.

    The work runs through :meth:`_run_phase` under the ``_OBSERVE`` label.
    """

    async def normalise() -> None:
        started = self.now()
        if "script" not in strategy:
            record["observation"] = self._build_observation(executed_calls, started)
            return
        # Script mode: fill gaps without overwriting sandbox-supplied keys.
        obs = cast(dict[str, Any], record.get("observation") or {})
        fallbacks = {
            "tool_results": [],
            "metrics": {},
            "events": [],
            "phase_started_at": started.isoformat(),
            "phase_ended_at": self.now().isoformat(),
        }
        for slot, value in fallbacks.items():
            obs.setdefault(slot, value)
        record["observation"] = cast(Observation, obs)

    await self._run_phase(session, record, _OBSERVE, normalise)
@staticmethod
def _annotate_tool_result(call: ToolCallRecord | dict[str, Any]) -> Any:
    """Return the call's ``result_summary`` tagged with call-level markers.

    Predicate criteria and the ``tool_call_succeeded`` evaluator identify
    entries in ``observation["tool_results"]`` by ``_status`` and
    ``tool_name``. Tools do not reliably emit these keys, so the engine
    adds them here:

    * dict ``result_summary``: a shallow copy is returned with
      ``_status`` (the call's ``status``, or ``"unknown"`` if falsy) and
      ``tool_name`` added only where the tool did not already set them;
    * anything else (None, list, scalar): a new dict
      ``{"_status", "tool_name", "result"}`` whose ``result`` holds the
      original payload.

    The input ``call`` is never mutated.
    """
    payload = call.get("result_summary")
    markers = {
        "_status": call.get("status") or "unknown",
        "tool_name": call.get("tool_name"),
    }
    if not isinstance(payload, dict):
        return {**markers, "result": payload}
    annotated = dict(payload)
    for key, value in markers.items():
        # Tool-supplied markers win; ours only fill the gaps.
        annotated.setdefault(key, value)
    return annotated
def _build_observation(
    self,
    executed_calls: list[ToolCallRecord],
    phase_started: datetime,
) -> Observation:
    """Fold executed :class:`ToolCallRecord` entries into an :class:`Observation`.

    * ``tool_results``: one :meth:`_annotate_tool_result` entry per call,
      failed and skipped calls included, so indices line up with
      ``executed_calls``.
    * ``metrics`` / ``events``: for ``ok`` calls whose result is a dict,
      a top-level ``metrics`` dict is merged in (later calls win) and
      the dict members of a top-level ``events`` list are appended.
    * ``errors``: present only when some call was not ``ok``. It holds one
      ``{"tool_name", "status", "error_message"}`` dict per such call; the
      decide heuristics compare errors across Observations, so the shape
      must stay stable.
    * ``phase_started_at`` / ``phase_ended_at``: ISO timestamps from
      ``phase_started`` and the current clock.
    """
    annotated: list[Any] = []
    merged_metrics: dict[str, Any] = {}
    merged_events: list[dict[str, Any]] = []
    failures: list[dict[str, Any]] = []

    for call in executed_calls:
        annotated.append(self._annotate_tool_result(call))
        if call.get("status") != "ok":
            failures.append(
                {
                    "tool_name": call.get("tool_name"),
                    "status": call.get("status"),
                    "error_message": call.get("error_message"),
                }
            )
            continue
        payload = call.get("result_summary")
        if not isinstance(payload, dict):
            # Non-dict results live only in ``tool_results``.
            continue
        lifted_metrics = payload.get("metrics")
        if isinstance(lifted_metrics, dict):
            merged_metrics.update(lifted_metrics)
        lifted_events = payload.get("events")
        if isinstance(lifted_events, list):
            merged_events.extend(e for e in lifted_events if isinstance(e, dict))

    observation: Observation = {
        "tool_results": annotated,
        "metrics": merged_metrics,
        "events": merged_events,
        "phase_started_at": phase_started.isoformat(),
        "phase_ended_at": self.now().isoformat(),
    }
    if failures:
        observation["errors"] = failures
    return observation
1114 # ------------------------------------------------------------------ #
1115 # Phase 4 — evaluate
1116 # ------------------------------------------------------------------ #
async def _evaluate_phase(self, session: SessionState, record: IterationRecord) -> None:
    """Score every session Criterion and store the rows on the record.

    ``record["criteria_evaluation"]`` receives one :class:`CriterionResult`
    per entry of ``session["criteria"]``, in declared order, so audit
    consumers can pair results with criteria by position. Each row comes
    from :meth:`_evaluate_one_criterion`, which is given both this
    iteration's Observation and the cumulative view from
    :meth:`_build_cumulative_observation`. That view concatenates
    ``tool_results`` across iterations and adds ``metric_history``, while
    metrics and events remain per-iteration.

    The work runs through :meth:`_run_phase` under the ``_EVALUATE`` label.
    """

    async def score_criteria() -> None:
        current = cast(dict[str, Any], record.get("observation") or {})
        history_view = self._build_cumulative_observation(current, session)
        record["criteria_evaluation"] = [
            self._evaluate_one_criterion(criterion, current, history_view, session)
            for criterion in session.get("criteria") or []
        ]

    await self._run_phase(session, record, _EVALUATE, score_criteria)
@staticmethod
def _build_cumulative_observation(
    current_obs: dict[str, Any], session: SessionState
) -> dict[str, Any]:
    """Return a copy of ``current_obs`` extended with session-wide history.

    Observations are read oldest-to-newest: each prior iteration's
    ``observation`` from ``session["iterations"]``, then ``current_obs``.
    Two keys on the returned view are cumulative:

    * ``tool_results``: every list-valued ``tool_results`` concatenated in
      that order, so predicates can see results from earlier iterations;
    * ``metric_history``: metric name -> list of its float readings in the
      same order, with the current reading last. Only int/float values
      count; bools and non-numerics are skipped so one stray reading cannot
      break a trend series.

    All other keys (``metrics``, ``events``, ``errors``, ...) are copied
    from ``current_obs`` unchanged. ``current_obs`` itself is not mutated.
    """
    observations: list[Mapping[str, Any]] = [
        prior.get("observation") or {} for prior in session.get("iterations") or []
    ]
    observations.append(current_obs)

    combined_results: list[Any] = []
    history: dict[str, list[float]] = {}
    for obs in observations:
        batch = obs.get("tool_results")
        if isinstance(batch, list):
            combined_results.extend(batch)
        readings = obs.get("metrics")
        if not isinstance(readings, dict):
            continue
        for name, reading in readings.items():
            if isinstance(reading, bool) or not isinstance(reading, (int, float)):
                continue
            history.setdefault(name, []).append(float(reading))

    view: dict[str, Any] = dict(current_obs)
    view["tool_results"] = combined_results
    view["metric_history"] = history
    return view
def _evaluate_one_criterion(
    self,
    criterion: Criterion,
    observation: dict[str, Any],
    cumulative_obs: dict[str, Any],
    session: SessionState,
) -> CriterionResult:
    """Route one Criterion to its evaluator and wrap the outcome as a row.

    Inputs by ``kind``:

    * ``metric_threshold`` and ``event`` read the current ``observation``;
    * ``metric_trend`` and ``predicate`` read ``cumulative_obs``, which
      carries ``metric_history`` and cross-iteration ``tool_results``;
    * ``tool_call_succeeded`` reads ``observation`` plus ``session``.

    An unrecognised ``kind`` becomes an ``inconclusive`` row with
    ``unknown_criterion_kind:<kind>`` evidence instead of raising, so a
    malformed session cannot tear down the iteration. ``evaluated_at`` is
    the clock reading taken before evaluation.
    """
    row_id = criterion["criterion_id"]
    kind = criterion["kind"]
    stamp = self.now().isoformat()

    if kind == "metric_threshold":
        verdict = self._evaluate_metric_threshold(criterion, observation)
    elif kind == "metric_trend":
        verdict = self._evaluate_metric_trend(criterion, cumulative_obs)
    elif kind == "event":
        verdict = self._evaluate_event(criterion, observation)
    elif kind == "predicate":
        verdict = self._evaluate_predicate(criterion, cumulative_obs)
    elif kind == "tool_call_succeeded":
        verdict = self._evaluate_tool_call_succeeded(criterion, observation, session)
    else:
        # Only reachable when the validator was bypassed.
        verdict = ("inconclusive", f"unknown_criterion_kind:{kind!r}")

    status, evidence = verdict
    return {
        "criterion_id": row_id,
        "status": cast(Any, status),
        "evidence": evidence,
        "evaluated_at": stamp,
    }
@staticmethod
def _evaluate_metric_threshold(
    criterion: Criterion, observation: dict[str, Any]
) -> tuple[str, Any]:
    """Compare the metric at the criterion's dot-path against ``target``.

    The ``metric`` path is walked segment by segment through nested dicts
    of ``observation``. Outcomes:

    * a segment cannot be resolved ->
      ``("inconclusive", "metric_path_missing:<path>")``;
    * the resolved value is not an int/float (bools excluded), or
      ``_compare_numbers`` raises ``ValueError`` -> ``("inconclusive",
      value)``;
    * otherwise ``("met" | "unmet", value)`` using the criterion's ``op``.

    NOTE(review): a ``TypeError`` from ``_compare_numbers`` (e.g. a null
    ``target``) is not caught here; whether that can happen depends on
    ``_compare_numbers`` and the validator - confirm.
    """
    path = criterion.get("metric") or ""
    cursor: Any = observation
    for segment in path.split("."):
        if not isinstance(cursor, dict) or segment not in cursor:
            return "inconclusive", f"metric_path_missing:{path!r}"
        cursor = cursor[segment]

    is_numeric = isinstance(cursor, (int, float)) and not isinstance(cursor, bool)
    if not is_numeric:
        return "inconclusive", cursor
    try:
        met = _compare_numbers(
            cursor, cast(str, criterion.get("op")), cast(float, criterion.get("target"))
        )
    except ValueError:
        return "inconclusive", cursor
    return ("met" if met else "unmet"), cursor
@staticmethod
def _evaluate_metric_trend(
    criterion: Criterion, cumulative_obs: dict[str, Any]
) -> tuple[str, Any]:
    """Judge a metric's direction from its accumulated history.

    The series is ``cumulative_obs["metric_history"][key]``. ``key`` is the
    criterion's ``metric`` with any leading ``"metrics."`` removed, so
    ``"metrics.loss"`` (the metric_threshold convention) and ``"loss"``
    address the same series. Only int/float points count; bools are
    dropped. An int ``window`` > 0 keeps only the newest ``window`` points.
    An int ``min_points`` sets the required count, which never drops
    below 2.

    Returns ``"inconclusive"`` when the history map is missing, the series
    is absent or empty, too few points remain (a trend needs history, and
    lack of it must never fail the loop), or ``direction`` is unknown.
    Otherwise the last windowed point is compared with the first:

    * ``decreasing``: last < first
    * ``increasing``: last > first
    * ``non_increasing``: last <= first
    * ``non_decreasing``: last >= first

    The result is ``"met"`` or ``"unmet"``, with evidence
    ``{"direction", "points", "first", "last", "delta"}`` where
    ``delta = last - first``.
    """
    metric_path = criterion.get("metric") or ""
    direction = criterion.get("direction")
    history = cumulative_obs.get("metric_history")
    if not isinstance(history, dict):
        return "inconclusive", "metric_history_missing"

    prefix = "metrics."
    key = metric_path[len(prefix):] if metric_path.startswith(prefix) else metric_path
    series = history.get(key)
    if not isinstance(series, list) or not series:
        return "inconclusive", f"metric_history_empty:{key!r}"

    # The accumulator already filters, but a hand-built view might not.
    points: list[float] = [
        float(v) for v in series if isinstance(v, (int, float)) and not isinstance(v, bool)
    ]

    window = criterion.get("window")
    if isinstance(window, int) and not isinstance(window, bool) and window > 0:
        points = points[-window:]

    min_points = criterion.get("min_points")
    has_custom_min = isinstance(min_points, int) and not isinstance(min_points, bool)
    required_points = max(2, min_points if has_custom_min else 2)
    if len(points) < required_points:
        return "inconclusive", {
            "reason": "insufficient_history",
            "points": points,
            "required_points": required_points,
        }

    first, last = points[0], points[-1]
    rules = (
        ("decreasing", last < first),
        ("increasing", last > first),
        ("non_increasing", last <= first),
        ("non_decreasing", last >= first),
    )
    for rule_name, holds in rules:
        if direction == rule_name:
            break
    else:
        # Only reachable when the validator was bypassed.
        return "inconclusive", f"unknown_direction:{direction!r}"

    evidence = {
        "direction": direction,
        "points": points,
        "first": first,
        "last": last,
        "delta": last - first,
    }
    return ("met" if holds else "unmet"), evidence
@staticmethod
def _evaluate_event(criterion: Criterion, observation: dict[str, Any]) -> tuple[str, Any]:
    """Look for the criterion's ``event_name`` in ``observation["events"]``.

    Returns ``("met", event)`` for the first dict event whose
    ``event_name`` matches, and ``("unmet", None)`` when none does.
    Returns ``"inconclusive"`` when ``events`` is absent or not a list.
    """
    if "events" not in observation:
        return "inconclusive", "events_field_missing"
    events = observation.get("events")
    if not isinstance(events, list):
        return "inconclusive", "events_field_not_a_list"
    wanted = criterion.get("event_name")
    hit = next(
        (ev for ev in events if isinstance(ev, dict) and ev.get("event_name") == wanted),
        None,
    )
    if hit is None:
        return "unmet", None
    return "met", hit
@staticmethod
def _evaluate_predicate(criterion: Criterion, observation: dict[str, Any]) -> tuple[str, Any]:
    """Evaluate the criterion's predicate against ``observation``.

    The parsed AST is taken from ``criterion["_parsed_ast"]`` when cached.
    Persistence drops that key (it is not JSON-safe), so a reloaded
    criterion is re-parsed from its ``expression`` instead. Outcomes:

    * no cache and no non-empty string ``expression`` ->
      ``("inconclusive", "predicate_ast_not_cached")``;
    * the re-parse is rejected (e.g. the expression was edited after
      load) -> ``("inconclusive", "predicate_rejected_post_load: ...")``;
    * evaluation raises -> ``("inconclusive", "<ExcType>: <message>")``,
      so a malformed predicate cannot crash the loop;
    * otherwise ``("met" | "unmet", value)`` by the value's truthiness.
    """
    tree = criterion.get("_parsed_ast")
    if tree is None:
        source = criterion.get("expression")
        if not (isinstance(source, str) and source):
            return "inconclusive", "predicate_ast_not_cached"
        try:
            tree = parse_predicate(source)
        except PredicateRejected as rejection:
            return "inconclusive", f"predicate_rejected_post_load: {rejection.reason}"

    try:
        outcome = evaluate_predicate(tree, observation)
    except Exception as failure:
        return "inconclusive", f"{type(failure).__name__}: {failure}"
    verdict = "met" if outcome else "unmet"
    return verdict, outcome
@staticmethod
def _evaluate_tool_call_succeeded(
    criterion: Criterion,
    observation: dict[str, Any],
    session: SessionState | dict[str, Any] | None = None,
) -> tuple[str, Any]:
    """Count successful results of one tool across the whole session.

    Gathers the dict entries of ``tool_results`` from every prior
    iteration's observation in ``session["iterations"]`` (when ``session``
    is given), plus the current ``observation``. It counts the entries
    whose ``tool_name`` equals the criterion's ``tool_name`` and whose
    ``_status`` is ``"ok"``. Because prior iterations are included, a
    multi-tool goal whose tools run in different iterations can still
    converge.

    ``min_count`` defaults to 1. A null, boolean or non-numeric value also
    falls back to 1, mirroring how ``metric_trend`` treats a bad
    ``min_points``. That way a malformed criterion gets a normal verdict
    instead of a ``TypeError`` on the comparison, which would fail the
    Evaluate_Phase.

    Returns:
        ``("inconclusive", "tool_results_field_missing")`` when there are
        no dict results at all. Otherwise ``"met"`` or ``"unmet"``, with
        evidence ``{"tool_name", "min_count", "successful_call_count"}``.
    """
    target_tool = criterion.get("tool_name")
    min_count = criterion.get("min_count", 1)
    if isinstance(min_count, bool) or not isinstance(min_count, (int, float)):
        min_count = 1

    # Oldest first: prior iterations' tool_results, then the current ones.
    batches: list[Any] = []
    if session is not None:
        for prior_iteration in session.get("iterations") or []:
            prior_obs = prior_iteration.get("observation") or {}
            batches.append(prior_obs.get("tool_results"))
    batches.append(observation.get("tool_results"))

    all_results: list[dict[str, Any]] = [
        r for batch in batches if isinstance(batch, list) for r in batch if isinstance(r, dict)
    ]
    if not all_results:
        return "inconclusive", "tool_results_field_missing"

    successful = [
        r for r in all_results if r.get("tool_name") == target_tool and r.get("_status") == "ok"
    ]
    evidence = {
        "tool_name": target_tool,
        "min_count": min_count,
        "successful_call_count": len(successful),
    }
    if len(successful) >= min_count:
        return "met", evidence
    return "unmet", evidence
1473 # ------------------------------------------------------------------ #
1474 # Phase 5 — decide
1475 # ------------------------------------------------------------------ #
async def _decide_phase(
    self, session: SessionState, record: IterationRecord
) -> tuple[VerdictLabel, VerdictReason]:
    """Compute the verdict and stamp the record inside the Decide_Phase wrapper.

    ``decide.decide_verdict(session, record, now)`` produces the
    ``(verdict, reason)`` pair. Then:

    * ``record["checkpoint_evaluated"]`` is ``False`` only for the
      ``"cadence_skip"`` reason;
    * an ``"adjust"`` verdict also stores
      ``decide.build_revision_rationale_template(session, record)`` under
      ``record["revision_rationale"]``;
    * an evaluated checkpoint calls ``mark_checkpoint(session, now)``,
      which advances the anchor for the ``every_t_seconds`` cadence.
      Skips never move that anchor.

    The work runs through :meth:`_run_phase` under the ``_DECIDE`` label,
    and the ``(verdict, reason)`` pair is returned.
    """

    async def decide_and_stamp() -> tuple[VerdictLabel, VerdictReason]:
        decided_at = self.now()
        verdict, reason = decide.decide_verdict(session, record, decided_at)
        evaluated = reason != "cadence_skip"
        record["checkpoint_evaluated"] = evaluated
        if verdict == "adjust":
            record["revision_rationale"] = decide.build_revision_rationale_template(
                session, record
            )
        if evaluated:
            mark_checkpoint(session, decided_at)
        return verdict, reason

    outcome = await self._run_phase(session, record, _DECIDE, decide_and_stamp)
    return cast(tuple[VerdictLabel, VerdictReason], outcome)
1502 # ------------------------------------------------------------------ #
1503 # Post-iteration housekeeping
1504 # ------------------------------------------------------------------ #
def _update_session_post_iteration(
    self, session: SessionState, record: IterationRecord
) -> None:
    """Advance or reset ``session["no_progress_counter"]`` after an iteration.

    * Iterations whose ``checkpoint_evaluated`` is falsy (cadence skips)
      leave the counter untouched, so a sparse evaluation cadence cannot
      drive the session into the stagnation cap by itself.
    * Otherwise the current ``criteria_evaluation`` is compared against the
      most recent earlier evaluated iteration (found by
      :meth:`_previous_evaluated_iteration`). If
      :meth:`_criteria_improved` reports progress, the counter resets to 0.
      In every other case, including when there is no earlier evaluated
      iteration, it increments by 1, starting from 0 when the counter is
      absent or ``None``.

    The caller is expected to have appended ``record`` to
    ``session["iterations"]`` already.
    """
    if not record.get("checkpoint_evaluated"):
        return

    baseline = self._previous_evaluated_iteration(session)
    # Short-circuits: with no baseline there is nothing to compare, and
    # ``record["criteria_evaluation"]`` is not read.
    progressed = baseline is not None and self._criteria_improved(
        baseline, record["criteria_evaluation"]
    )
    if progressed:
        session["no_progress_counter"] = 0
    else:
        session["no_progress_counter"] = (session.get("no_progress_counter", 0) or 0) + 1
1546 @staticmethod
1547 def _previous_evaluated_iteration(
1548 session: SessionState,
1549 ) -> list[CriterionResult] | None:
1550 """Return the criteria evaluation of the most recent evaluated iteration.
1552 "Evaluated" here means ``checkpoint_evaluated=True``. Skipping
1553 cadence-skip iterations is what makes the no-progress counter
1554 immune to the cadence configuration: it only ever measures
1555 movement between *real* checkpoints. The current iteration is
1556 already appended to ``session["iterations"]`` by the caller,
1557 so we look strictly before it.
1558 """
1559 iterations = session.get("iterations") or []
1560 # Skip the just-appended current iteration (last index).
1561 for prior in reversed(iterations[:-1]):
1562 if prior.get("checkpoint_evaluated"):
1563 return list(prior.get("criteria_evaluation") or [])
1564 return None
1566 @staticmethod
1567 def _criteria_improved(
1568 prior: list[CriterionResult],
1569 current: list[CriterionResult],
1570 ) -> bool:
1571 """Return True iff any criterion went from not-met to met."""
1572 prior_status = {
1573 result["criterion_id"]: result["status"]
1574 for result in prior
1575 if isinstance(result, dict) and "criterion_id" in result
1576 }
1577 for result in current:
1578 if not isinstance(result, dict): 1578 ↛ 1579line 1578 didn't jump to line 1579 because the condition on line 1578 was never true
1579 continue
1580 current_status = result.get("status")
1581 if current_status != "met":
1582 continue
1583 prior_value = prior_status.get(result.get("criterion_id"))
1584 if prior_value in ("unmet", "inconclusive", None):
1585 # ``None`` covers a criterion that did not appear in
1586 # the prior evaluation — treating it as "not met
1587 # before" is consistent with first-time-met being an
1588 # improvement.
1589 return True
1590 return False
1592 # ------------------------------------------------------------------ #
1593 # Terminal-verdict finalisation
1594 # ------------------------------------------------------------------ #
async def _finalise_terminal_session(
    self,
    session: SessionState,
    record: IterationRecord,
    verdict: VerdictLabel,
    reason: VerdictReason,
) -> None:
    """Move the session into its terminal status and write the Final_Report.

    ``run_iteration`` calls this only for verdicts in
    :data:`TERMINAL_VERDICTS`. The verdict-to-status mapping is fixed:
    ``complete`` becomes ``completed`` and ``terminate`` becomes
    ``terminated``. The session's ``ended_at`` is copied from the
    iteration's own ``ended_at``, so the session lifecycle window matches
    the last persisted iteration without an extra clock read.
    ``final_verdict`` records the verdict label.

    When :attr:`final_lessons_callable` is wired, its overlay is awaited
    once up front. A small synchronous closure that just returns that
    pre-fetched overlay is then passed as ``sampler`` to
    :func:`mcp.mission.final_report.write_final_report`. This bridge exists
    because the production helper
    (:func:`mcp.mission.sampling.maybe_sample_final_lessons`) is async while
    ``write_final_report`` keeps a sync-callable contract.
    """
    # Any non-"complete" verdict reaching here is a terminate verdict.
    session["status"] = "completed" if verdict == "complete" else "terminated"
    session["ended_at"] = record["ended_at"]
    session["final_verdict"] = verdict

    # Resolve the optional lessons/followups overlay before the sync writer
    # runs, so it never has to await anything.
    overlay = await self._maybe_sample_final_lessons(session, verdict, reason)

    sampler: Callable[..., dict[str, Any] | None] | None = None
    if overlay is not None:

        def _replay_overlay(
            _session: SessionState,
            _verdict: VerdictLabel,
            _reason: VerdictReason,
        ) -> dict[str, Any] | None:
            return overlay

        sampler = _replay_overlay

    # A single helper owns the persistence path for the durable exit
    # artifact (filesystem sibling vs. embedded-on-session).
    final_report.write_final_report(self.backend, session, verdict, reason, sampler=sampler)
1650 async def _maybe_sample_final_lessons(
1651 self,
1652 session: SessionState,
1653 verdict: VerdictLabel,
1654 reason: VerdictReason,
1655 ) -> dict[str, Any] | None:
1656 """Fetch the optional Final_Report ``lessons`` / ``followups`` overlay.
1658 Calls :attr:`final_lessons_callable` once (when wired and when
1659 the session opted into sampling) and adapts the return value
1660 into the ``{"lessons": str, "recommended_followups": list[str]}``
1661 shape that
1662 :func:`mcp.mission.final_report.write_final_report` expects.
1664 Three return shapes are recognised:
1666 * :class:`mcp.mission.sampling.SamplingUsed` — production path.
1667 ``parsed["lessons"]`` is a list of strings; the engine joins
1668 them with double newlines so the report's ``lessons`` field
1669 stays a single string. ``parsed["recommended_followups"]`` is
1670 forwarded as a list verbatim.
1671 * Raw ``dict`` (legacy / test pattern) — passed straight
1672 through. The downstream sampler-overlay code in
1673 ``write_final_report`` already validates and silently drops
1674 malformed fields.
1675 * Anything else (including :class:`SamplingFallback`,
1676 ``None``, exceptions) — returns ``None`` so the
1677 deterministic templates from
1678 :func:`mcp.mission.final_report.build_deterministic_report`
1679 stand on their own.
1681 The method swallows any exception raised by the callable
1682 because the Final_Report is the durable exit artifact and a
1683 flaky sampler must not block it from landing.
1684 """
1685 del verdict, reason # forwarded only for symmetry with the legacy Sampler shape
1686 if self.final_lessons_callable is None:
1687 return None
1688 if not session.get("use_sampling"): 1688 ↛ 1689line 1688 didn't jump to line 1689 because the condition on line 1688 was never true
1689 return None
1690 try:
1691 result = await self.final_lessons_callable(session=session)
1692 except Exception:
1693 return None
1694 if isinstance(result, SamplingUsed): 1694 ↛ 1706line 1694 didn't jump to line 1706 because the condition on line 1694 was always true
1695 lessons = result.parsed.get("lessons")
1696 followups = result.parsed.get("recommended_followups")
1697 overlay: dict[str, Any] = {}
1698 if isinstance(lessons, list) and all(isinstance(item, str) for item in lessons): 1698 ↛ 1703line 1698 didn't jump to line 1703 because the condition on line 1698 was always true
1699 # write_final_report expects ``lessons`` as a single
1700 # string; join with blank lines so multi-bullet output
1701 # from the model stays readable on render.
1702 overlay["lessons"] = "\n\n".join(lessons)
1703 if isinstance(followups, list) and all(isinstance(item, str) for item in followups): 1703 ↛ 1705line 1703 didn't jump to line 1705 because the condition on line 1703 was always true
1704 overlay["recommended_followups"] = list(followups)
1705 return overlay or None
1706 if isinstance(result, SamplingFallback):
1707 return None
1708 if isinstance(result, dict):
1709 # Legacy raw-dict path — let the downstream overlay
1710 # validator do the structural check.
1711 return result
1712 return None
1715# ---------------------------------------------------------------------------
1716# Comparison helper
1717# ---------------------------------------------------------------------------
1720def _compare_numbers(value: float, op: str, target: float) -> bool:
1721 """Apply one of the six allowed numeric comparison operators."""
1722 if op == "<":
1723 return value < target
1724 if op == "<=":
1725 return value <= target
1726 if op == ">":
1727 return value > target
1728 if op == ">=":
1729 return value >= target
1730 if op == "==":
1731 return value == target
1732 if op == "!=":
1733 return value != target
1734 raise ValueError(f"unknown comparison operator: {op!r}")