Coverage for mcp/mission/engine.py: 89%

572 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-15 15:07 +0000

1"""Five-phase iteration loop driver for the Mission goal-directed loop. 

2 

3The :class:`MissionEngine` owns one ``run_iteration`` lifecycle per call: it 

4loads the persisted session, walks the iteration through propose → execute 

5→ observe → evaluate → decide, persists the resulting record, and writes a 

6Final_Report when the verdict is terminal. Every external dependency is 

7injected at construction time so unit tests can supply mocks for the tool 

8dispatcher, the sampling callable, and the script sandbox runner. 

9 

10Why a class rather than a free function? Two reasons. 

11 

12* Each phase needs the same handful of dependencies (the backend, the 

13 tool dispatcher, the cost-estimator map, the clock). Threading them 

14 through every method as positional arguments would be tedious and 

15 error-prone; the dataclass shape gives every phase one place to look 

16 for them. 

17* A test that exercises a single phase in isolation needs to construct 

18 a ``MissionEngine`` with stubbed dependencies and call the private 

19 method directly. Having the dependencies on the instance — rather 

20 than as module-level singletons — keeps the engine pure and free of 

21 process-global state. 

22 

23Phase contract: 

24 

25* Each ``_*_phase`` method is wrapped in a try/finally that emits 

26 exactly one ``audit.emit_phase_event`` regardless of whether the body 

27 succeeded or raised. The matching :class:`PhaseRecord` is appended to 

28 ``record["phases"]`` in the same finally block, so a failed phase 

29 still produces a structured record on the iteration. 

30* Any phase that raises propagates the exception out of 

31 ``run_iteration`` after the engine marks the session as ``failed``, 

32 appends the partial iteration, and persists. Subsequent calls to 

33 ``run_iteration`` on a ``failed`` session refuse with 

34 ``session_failed``. 

35 

36Determinism: only the Decide_Phase consults a clock, and it does so by 

37calling ``self.now()`` exactly once per call so the value is observable 

38and pinnable from tests. The Propose_Phase's deterministic fallback uses 

39no clock and no random source. The Execute_Phase reads the clock for 

40the per-phase ``started_at`` / ``ended_at`` timestamps but its outputs 

41(the tool-call records) do not depend on those values. 

42 

43The ``Context`` type is from FastMCP and brings a heavy dependency tree 

44(MCP transport, ``contextvars``, etc.) that unit tests do not need. We 

45type ``ctx`` as ``Any | None`` so the engine module imports cleanly in 

46isolation; the production wiring threads a real :class:`fastmcp.Context` 

47through the dispatcher and sampler callables, where its concrete type 

48matters. This is the same trade-off the existing ``mcp/tools/*.py`` 

49modules make for tools that take an injected context. 

50""" 

51 

52from __future__ import annotations 

53 

54import contextlib 

55from collections.abc import Awaitable, Callable, Mapping 

56from dataclasses import dataclass, field 

57from datetime import UTC, datetime 

58from typing import Any, cast 

59 

60from . import audit, decide, final_report 

61from .checkpoints import mark_checkpoint 

62from .predicate import PredicateRejected, evaluate_predicate, parse_predicate 

63from .sampling import SamplingFallback, SamplingUsed 

64from .types import ( 

65 TERMINAL_STATES, 

66 TERMINAL_VERDICTS, 

67 Criterion, 

68 CriterionResult, 

69 IterationRecord, 

70 Observation, 

71 PhaseRecord, 

72 SessionState, 

73 Strategy, 

74 ToolCallRecord, 

75 VerdictLabel, 

76 VerdictReason, 

77) 

78 

79# <pyflowchart-code-diagram> BEGIN - auto-inserted, do not edit 

80# Flowchart(s) generated from this file: 

81# * ``MissionEngine.run_iteration`` -> ``diagrams/code_diagrams/mcp/mission/engine.MissionEngine_run_iteration.html`` 

82# (PNG: ``diagrams/code_diagrams/mcp/mission/engine.MissionEngine_run_iteration.png``) 

83# Regenerate with ``python diagrams/code_diagrams/generate.py``. 

84# <pyflowchart-code-diagram> END 

85 

86 

# Public API of this module: the iteration-loop driver and its
# code-keyed lifecycle error. Everything else here is an internal helper.
__all__ = [
    "MissionEngine",
    "MissionEngineError",
]

91 

92 

93# --------------------------------------------------------------------------- 

94# Error 

95# --------------------------------------------------------------------------- 

96 

97 

class MissionEngineError(Exception):
    """Engine lifecycle error identified by a short, stable ``code``.

    ``code`` is a machine-readable key such as ``"session_not_found"``,
    ``"session_terminal"``, ``"session_paused"`` or ``"session_failed"``
    that callers can branch on. When no explicit ``message`` is supplied,
    the exception's message (its ``args`` and ``str()`` form) is the code
    itself, so a log line that only prints the exception still shows the
    key.

    Args:
        code: Stable error key; stored on :attr:`code`.
        message: Optional human-readable text. When not ``None`` it is
            used as the exception message instead of ``code``.
    """

    def __init__(self, code: str, *, message: str | None = None) -> None:
        text = code if message is None else message
        super().__init__(text)
        self.code: str = code

112 

113 

114# --------------------------------------------------------------------------- 

115# Phase-name constants (typed) 

116# --------------------------------------------------------------------------- 

117 

# Centralised so the audit emitter and PhaseRecord constructor share one
# spelling for each phase. Matches the ``Literal`` shape declared on
# :class:`PhaseRecord` and on :func:`audit.emit_phase_event`.
# Declared in the order ``run_iteration`` runs the phases; the phase
# methods pass them to ``_run_phase`` as ``phase_name`` (e.g. ``_PROPOSE``
# from the Propose_Phase, ``_EXECUTE`` from the Execute_Phase).
_PROPOSE = "propose"
_EXECUTE = "execute"
_OBSERVE = "observe"
_EVALUATE = "evaluate"
_DECIDE = "decide"

126 

127 

128# --------------------------------------------------------------------------- 

129# Defaults 

130# --------------------------------------------------------------------------- 

131 

132 

133def _default_now() -> Callable[[], datetime]: 

134 """Return a clock callable that yields the current UTC datetime. 

135 

136 Used as the ``default_factory`` for :attr:`MissionEngine.now`. Wrapping 

137 the lambda in a function keeps ``mypy --strict`` happy with the 

138 ``Callable[[], datetime]`` annotation while preserving the 

139 "constructed once per engine, called many times" semantics. 

140 """ 

141 return lambda: datetime.now(UTC) 

142 

143 

144# --------------------------------------------------------------------------- 

145# Engine 

146# --------------------------------------------------------------------------- 

147 

148 

# Type aliases for the injected callables. Loose on purpose: the precise
# shapes settle in later slices (sampling in slice 6, sandbox in slice 5).

# Invoked as ``dispatcher(tool_name, args, ctx)`` and awaited for the
# tool's result.
ToolDispatcher = Callable[[str, dict[str, Any], Any], Awaitable[Any]]
# Arbitrary-signature async callable. The Propose_Phase calls
# ``sampling_callable(session=..., ctx=...)``; ``final_lessons_callable``
# reuses this alias.
SamplingCallable = Callable[..., Awaitable[Any]]
# Invoked as ``runner(script, ctx, tool_dispatcher)`` and awaited for
# ``(observation_dict, script_call_log)``.
SandboxRunner = Callable[
    [str, Any, ToolDispatcher],
    Awaitable[tuple[dict[str, Any], list[ToolCallRecord]]],
]

157 

158 

159@dataclass 

160class MissionEngine: 

161 """Driver for the Mission five-phase iteration loop. 

162 

163 Construction takes every external dependency the engine needs: 

164 

165 * ``backend`` — the persistence layer (filesystem, DynamoDB, …). 

166 Engine never reaches outside this protocol for state I/O. 

167 * ``tool_dispatcher`` — async callable that invokes one MCP tool. 

168 Signature ``(tool_name, args, ctx) -> result``. The engine routes 

169 every direct ``tool_calls`` invocation through this callable so 

170 tests can swap in a stub that returns canned results. 

171 * ``sampling_callable`` — optional async callable that produces an 

172 LLM-derived next Strategy when the prior Verdict was ``adjust`` 

173 and the session has ``use_sampling=true``. Loose signature; slice 

174 6 finalises it. ``None`` (or any failure inside it) routes the 

175 engine to the deterministic fallback strategy. 

176 * ``sandbox_runner`` — optional async callable that runs a scripted 

177 Strategy in the Mission sandbox. Signature ``(script, ctx, 

178 tool_dispatcher) -> (observation_dict, script_call_log)``. ``None`` 

179 means the engine refuses any scripted strategy with a clear 

180 error instead of silently executing operator-supplied code. 

181 * ``now`` — injectable clock. Defaults to a UTC clock; tests pin it 

182 so deterministic verdicts (the Decide_Phase consults the clock 

183 for budget caps and for the cadence resolver) are reproducible. 

184 """ 

185 

186 backend: Any 

187 tool_dispatcher: ToolDispatcher 

188 sampling_callable: SamplingCallable | None 

189 sandbox_runner: SandboxRunner | None 

190 now: Callable[[], datetime] = field(default_factory=_default_now) 

191 # Optional async callable that drives the Final_Report's 

192 # ``lessons`` and ``recommended_followups`` overlay. Loose typing 

193 # mirrors :attr:`sampling_callable` so legacy tests that wire a 

194 # plain async stub keep working. Production wiring binds it to a 

195 # closure over :func:`mcp.mission.sampling.maybe_sample_final_lessons` 

196 # — see :meth:`_maybe_sample_final_lessons`. ``None`` (the default) 

197 # disables the overlay; the deterministic templates from 

198 # :func:`mcp.mission.final_report.build_deterministic_report` stand 

199 # on their own in that case. 

200 final_lessons_callable: SamplingCallable | None = None 

201 

202 # ------------------------------------------------------------------ # 

203 # Public surface 

204 # ------------------------------------------------------------------ # 

205 

async def run_iteration(
    self,
    session_id: str,
    ctx: Any | None = None,
) -> IterationRecord:
    """Run one full iteration for ``session_id`` and return its record.

    Lifecycle:

    1. Load the session via ``self.backend.load_session``; raise
       ``MissionEngineError("session_not_found")`` when it returns
       ``None``.
    2. Refuse a session whose status is ``failed`` (``session_failed``),
       any other status in ``TERMINAL_STATES`` (``session_terminal``),
       or ``paused`` (``session_paused``).
    3. On the ``pending`` → ``running`` edge (first iteration), stamp
       ``session["started_at"]`` with this iteration's start time.
    4. Allocate a fresh :class:`IterationRecord` and run the five phases
       in order: propose, execute, observe, evaluate, decide. Each phase
       method is expected to record itself via ``_run_phase``.
    5. If a phase raises an ``Exception``: stamp ``ended_at``, append the
       partial iteration to ``session["iterations"]``, mark the session
       ``failed``, attempt a best-effort save (save errors are
       suppressed), and re-raise the original exception. No verdict
       event is emitted on this path. ``BaseException`` subclasses that
       are not ``Exception`` (e.g. cancellation) are not caught here and
       propagate without this failure bookkeeping.
    6. On success: stamp verdict, reason and ``ended_at`` on the record,
       append it, call ``_update_session_post_iteration``, and save.
    7. When the verdict is in ``TERMINAL_VERDICTS``: await
       ``_finalise_terminal_session`` (the step that writes the
       Final_Report, per the module docstring) and save again.
    8. Emit exactly one ``audit.emit_verdict_event`` for every iteration
       that reached step 6, whether its verdict is terminal or not.
    9. Return the iteration record.

    Args:
        session_id: Identifier passed to ``backend.load_session``.
        ctx: Optional caller context forwarded to the Propose and Execute
            phases; ``None`` is allowed (e.g. in unit tests).

    Raises:
        MissionEngineError: ``session_not_found``, ``session_failed``,
            ``session_terminal`` or ``session_paused`` before any phase
            runs.
        Exception: Whatever a phase raised, re-raised after the failure
            record has been appended and a save attempted.
    """
    session = self.backend.load_session(session_id)
    if session is None:
        raise MissionEngineError("session_not_found")

    # The terminal-state check distinguishes ``failed`` from the
    # other terminal states because callers (and the tool-error
    # table) treat them differently — a failed session needs manual
    # inspection via ``mission_history``, while a completed /
    # terminated session is simply done. ``failed`` is tested first so
    # it maps to ``session_failed`` even if TERMINAL_STATES contains it.
    status = session["status"]
    if status == "failed":
        raise MissionEngineError("session_failed")
    if status in TERMINAL_STATES:
        raise MissionEngineError("session_terminal")
    if status == "paused":
        raise MissionEngineError("session_paused")

    iteration_start = self.now()

    # First-iteration transition. ``started_at`` is the wall-clock
    # anchor for the wall-clock-budget computation in Decide_Phase
    # so we set it exactly once, on the pending → running edge.
    if session["status"] == "pending":
        session["status"] = "running"
        session["started_at"] = iteration_start.isoformat()

    # The new iteration's index equals the number already persisted.
    iteration_index = len(session["iterations"])
    record = self._make_iteration_record(iteration_index, iteration_start)

    try:
        strategy = await self._propose_phase(session, ctx, record)
        executed_calls = await self._execute_phase(session, strategy, ctx, record)
        await self._observe_phase(session, strategy, executed_calls, record)
        await self._evaluate_phase(session, record)
        verdict, reason = await self._decide_phase(session, record)
    except Exception:
        # Persist a failure record so the session JSON remains a
        # complete history of everything the loop attempted. The
        # verdict stays at its placeholder value because no
        # Decide_Phase actually fired; consumers detect the failure
        # through ``session["status"] == "failed"`` and the failed
        # phase entry in ``record["phases"]``.
        record["ended_at"] = self.now().isoformat()
        session["iterations"].append(record)
        session["status"] = "failed"
        session["ended_at"] = record["ended_at"]
        with contextlib.suppress(Exception):
            # A save failure during a failure path must not shadow
            # the original phase exception — the operator's first
            # need is to see what actually went wrong, not what
            # went wrong while reporting what went wrong.
            self.backend.save_session(session)
        raise

    # Stamp the verdict on the iteration record before append; the
    # decide cascade inspects ``len(session["iterations"])`` (i.e.
    # iterations *before* the current one) so we deliberately
    # append after Decide_Phase rather than before.
    record["verdict"] = verdict
    record["verdict_reason"] = reason
    record["ended_at"] = self.now().isoformat()
    session["iterations"].append(record)

    self._update_session_post_iteration(session, record)
    self.backend.save_session(session)

    if verdict in TERMINAL_VERDICTS:
        await self._finalise_terminal_session(session, record, verdict, reason)
        # Second save captures whatever the finaliser changed.
        self.backend.save_session(session)

    # One verdict event per completed iteration regardless of terminal
    # vs in-progress, so audit consumers see a uniform stream. (Failed
    # iterations re-raise above and never reach this call.)
    audit.emit_verdict_event(
        session_id=session_id,
        iteration_index=iteration_index,
        verdict=verdict,
        verdict_reason=reason,
        revision_rationale=record.get("revision_rationale"),
    )

    return record

315 

316 # ------------------------------------------------------------------ # 

317 # Iteration record bootstrap 

318 # ------------------------------------------------------------------ # 

319 

@staticmethod
def _make_iteration_record(iteration_index: int, started_at: datetime) -> IterationRecord:
    """Return a fresh, placeholder-filled :class:`IterationRecord`.

    ``started_at`` is serialised with ``isoformat()``. The remaining
    fields start as placeholders: ``ended_at`` is ``""``, ``verdict`` /
    ``verdict_reason`` are ``"continue"`` / ``"in_progress"``,
    ``strategy`` and ``observation`` are empty dicts, the list fields are
    empty and ``checkpoint_evaluated`` is ``False``. ``run_iteration``
    rewrites ``ended_at`` (and, on success, the verdict fields) before it
    appends the record to the session.
    """
    empty_strategy = cast(Strategy, {})
    empty_observation = cast(Observation, {})
    return {
        "iteration_index": iteration_index,
        "started_at": started_at.isoformat(),
        "ended_at": "",
        "phases": [],
        "strategy": empty_strategy,
        "observation": empty_observation,
        "criteria_evaluation": [],
        "verdict": "continue",
        "verdict_reason": "in_progress",
        "checkpoint_evaluated": False,
    }

342 

343 # ------------------------------------------------------------------ # 

344 # Phase wrapper 

345 # ------------------------------------------------------------------ # 

346 

async def _run_phase(
    self,
    session: SessionState,
    record: IterationRecord,
    phase_name: str,
    body: Callable[[], Awaitable[Any]],
) -> Any:
    """Run ``body`` as one phase, with record bookkeeping and audit.

    The wrapper:

    * stamps ``started_at`` from :attr:`now`,
    * awaits ``body()`` and returns its result,
    * stamps ``ended_at`` from :attr:`now` again,
    * appends a :class:`PhaseRecord` to ``record["phases"]``,
    * emits exactly one ``audit.emit_phase_event``.

    The last three steps run in a ``finally`` block, so they happen
    whether ``body`` returned or raised.

    Args:
        session: Session being iterated; only ``session["session_id"]``
            is read here (for the audit event).
        record: Iteration record that receives the :class:`PhaseRecord`.
        phase_name: One of the module's phase-name constants.
        body: Zero-argument coroutine function holding the phase logic.

    Returns:
        Whatever ``body()`` returned.

    Raises:
        Any exception raised by ``body`` is re-raised unchanged after the
        phase has been recorded with ``status="failed"`` and an
        ``error_message`` of ``"<ExceptionType>: <message>"`` truncated to
        200 characters. This includes ``BaseException`` subclasses that
        are not ``Exception`` (``asyncio.CancelledError``,
        ``KeyboardInterrupt``), so an interrupted phase is never recorded
        as ``succeeded``.
    """
    started_at = self.now().isoformat()
    status: str = "succeeded"
    error_message: str | None = None
    try:
        return await body()
    except BaseException as exc:
        # Deliberately broader than ``Exception``: with ``except
        # Exception`` a cancellation (``asyncio.CancelledError`` derives
        # from ``BaseException``) skipped this handler and the finally
        # block recorded the interrupted phase as "succeeded". We only
        # annotate and re-raise, so cancellation still propagates.
        status = "failed"
        error_message = f"{type(exc).__name__}: {exc}"[:200]
        raise
    finally:
        ended_at = self.now().isoformat()
        phase_record: PhaseRecord = {
            "phase": cast(Any, phase_name),
            "status": cast(Any, status),
            "started_at": started_at,
            "ended_at": ended_at,
        }
        if error_message:
            phase_record["error_message"] = error_message
        record["phases"].append(phase_record)
        audit.emit_phase_event(
            session_id=session["session_id"],
            iteration_index=record["iteration_index"],
            phase=cast(Any, phase_name),
            status=cast(Any, status),
            started_at=started_at,
            ended_at=ended_at,
            error_message=error_message,
        )

397 ) 

398 

399 # ------------------------------------------------------------------ # 

400 # Phase 1 — propose 

401 # ------------------------------------------------------------------ # 

402 

async def _propose_phase(
    self,
    session: SessionState,
    ctx: Any | None,
    record: IterationRecord,
) -> Strategy:
    """Propose_Phase: choose this iteration's Strategy and record it.

    Selection is delegated to :meth:`_build_strategy`, which tries the
    sampling callable when :meth:`_should_attempt_sampling` allows it and
    otherwise (or when sampling yields nothing usable) uses
    :meth:`_deterministic_strategy`. The chosen Strategy is written to
    ``record["strategy"]`` and returned for the Execute_Phase. The whole
    step runs inside :meth:`_run_phase` under the ``propose`` phase name,
    so it produces one PhaseRecord and one phase audit event.
    """

    async def choose_and_store() -> Strategy:
        chosen = await self._build_strategy(session, ctx, record)
        record["strategy"] = chosen
        return chosen

    proposed = await self._run_phase(session, record, _PROPOSE, choose_and_store)
    return cast(Strategy, proposed)

436 

async def _build_strategy(
    self, session: SessionState, ctx: Any | None, record: IterationRecord
) -> Strategy:
    """Choose the Propose_Phase Strategy: sampled when eligible, else deterministic.

    The sampler is consulted only when :meth:`_should_attempt_sampling`
    says so; a ``None`` from :meth:`_try_sample_strategy` (or an
    ineligible session) yields :meth:`_deterministic_strategy`.
    """
    if not self._should_attempt_sampling(session):
        return self._deterministic_strategy(session)
    sampled = await self._try_sample_strategy(session, ctx, record)
    return self._deterministic_strategy(session) if sampled is None else sampled

446 

def _should_attempt_sampling(self, session: SessionState) -> bool:
    """Decide whether the Propose_Phase should consult the sampler first.

    All three conditions must hold: :attr:`sampling_callable` is wired,
    the session's ``use_sampling`` flag is truthy, and the most recent
    iteration's verdict was ``"adjust"``. A session with no iterations
    yet therefore never samples.
    """
    if self.sampling_callable is None or not session.get("use_sampling"):
        return False
    history = session.get("iterations") or []
    return bool(history) and history[-1].get("verdict") == "adjust"

457 

async def _try_sample_strategy(
    self, session: SessionState, ctx: Any | None, record: IterationRecord
) -> Strategy | None:
    """Ask the sampler for a Strategy; return ``None`` to request the fallback.

    ``None`` is returned when the sampling callable is not wired, when it
    raises, or when its result is not usable; the caller then builds the
    deterministic strategy. The sampler is invoked as
    ``sampling_callable(session=session, ctx=ctx)``. Three result shapes
    are recognised:

    * :class:`mcp.mission.sampling.SamplingUsed` — the Strategy candidate
      is read from ``parsed["next_strategy"]`` and validated with
      :meth:`_coerce_strategy_dict`. Only when that candidate is accepted
      is the sampler's ``revision_rationale`` stamped on ``record`` (via
      :meth:`_capture_sampled_rationale`), which ``run_iteration`` then
      forwards to ``audit.emit_verdict_event``. A rejected candidate
      leaves ``record`` untouched: the iteration runs the deterministic
      strategy, and a rationale describing a strategy that never ran
      would mislead the audit trail.
    * :class:`mcp.mission.sampling.SamplingFallback` — treated like a
      missing result (``None``) so the deterministic path runs; the
      fallback's own rationale is not copied onto ``record``.
    * Raw ``dict`` (the legacy / test pattern) — validated as-is with
      :meth:`_coerce_strategy_dict`, so simple async stubs returning
      ``{"tool_calls": [...]}`` keep working.

    Any other return type yields ``None``.
    """
    sampler = self.sampling_callable
    if sampler is None:
        # Callers gate on ``_should_attempt_sampling``; an explicit check
        # (instead of ``assert``) still holds under ``python -O``.
        return None
    try:
        result = await sampler(session=session, ctx=ctx)
    except Exception:
        # Any sampler failure routes to the deterministic fallback.
        return None

    if isinstance(result, SamplingUsed):
        next_strategy = result.parsed.get("next_strategy")
        if not isinstance(next_strategy, dict):
            return None
        strategy = self._coerce_strategy_dict(next_strategy)
        if strategy is None:
            # Malformed candidate: fall back without adopting the
            # sampler's rationale for a strategy that will not run.
            return None
        self._capture_sampled_rationale(record, result.parsed)
        return strategy

    if isinstance(result, SamplingFallback):
        # Returning ``None`` keeps the verdict path fully deterministic.
        return None

    # Legacy raw-dict return.
    if not isinstance(result, dict):
        return None
    return self._coerce_strategy_dict(result)

520 

def _coerce_strategy_dict(self, candidate: dict[str, Any]) -> Strategy | None:
    """Adopt ``candidate`` as a :class:`Strategy` if it is well-shaped.

    Exactly one of ``tool_calls`` / ``script`` must be present:

    * ``tool_calls`` must be a non-empty ``list``;
    * ``script`` must be a non-empty ``str`` and a :attr:`sandbox_runner`
      must be wired, since nothing else can execute a script.

    A candidate carrying both keys is rejected. The Execute_Phase
    dispatches on the presence of ``script`` before ``tool_calls``, so
    such a dict would be accepted here as a ``tool_calls`` strategy yet
    executed as a script — bypassing the ``sandbox_runner`` check.

    Returns:
        A shallow copy of ``candidate`` cast to :class:`Strategy`, or
        ``None`` for any malformed shape (callers then fall back to the
        deterministic strategy).
    """
    has_tool_calls = "tool_calls" in candidate
    has_script = "script" in candidate
    if has_tool_calls and has_script:
        # Ambiguous: see the docstring for why this cannot be accepted.
        return None
    if has_tool_calls:
        tool_calls = candidate["tool_calls"]
        if isinstance(tool_calls, list) and tool_calls:
            return cast(Strategy, dict(candidate))
        return None
    if has_script:
        script = candidate["script"]
        if not (isinstance(script, str) and script):
            return None
        if self.sandbox_runner is None:
            # No sandbox: a sampled script is unusable, fall back.
            return None
        return cast(Strategy, dict(candidate))
    return None

546 

547 @staticmethod 

548 def _capture_sampled_rationale(record: IterationRecord, parsed_payload: dict[str, Any]) -> None: 

549 """Stamp the sampler's ``revision_rationale`` on ``record`` if present. 

550 

551 The advisory model's rationale lives at 

552 ``parsed_payload["revision_rationale"]`` per the 

553 Strategy_Revision schema. Recording it on the iteration record 

554 means :func:`mcp.mission.audit.emit_verdict_event` (which the 

555 engine calls at the end of ``run_iteration`` with 

556 ``record.get("revision_rationale")``) emits the model-derived 

557 text instead of the deterministic template that the 

558 Decide_Phase synthesises for ``adjust`` verdicts. The engine 

559 only ever calls this from the sampling-success path, so a 

560 rejection / fallback never overrides the deterministic 

561 template the Decide_Phase set. 

562 """ 

563 rationale = parsed_payload.get("revision_rationale") 

564 if isinstance(rationale, str) and rationale: 564 ↛ exitline 564 didn't return from function '_capture_sampled_rationale' because the condition on line 564 was always true

565 record["revision_rationale"] = rationale 

566 

def _deterministic_strategy(self, session: SessionState) -> Strategy:
    """Build the sampler-free fallback Strategy (always one ``tool_calls`` entry).

    * If :meth:`_find_most_recent_successful_call` finds a prior ``ok``
      call, re-run that tool with its args passed through
      :meth:`_widen_args`; the rationale says whether widening changed
      them.
    * Otherwise invoke the first entry of ``session["tool_allowlist"]``
      with whatever :meth:`_widen_args` derives from an empty dict (which
      may itself be empty).

    Raises:
        MissionEngineError: ``propose_no_tool_available`` when no prior
            successful call exists and the allowlist is missing or empty.
    """
    previous = self._find_most_recent_successful_call(session)
    if previous is None:
        allowlist = session.get("tool_allowlist") or []
        if not allowlist:
            raise MissionEngineError("propose_no_tool_available")
        first_args = self._widen_args({}, session)
        if first_args:
            why = (
                "deterministic fallback: invoking first allowlisted "
                "tool with widened args from unmet criteria"
            )
        else:
            why = (
                "deterministic fallback: invoking first allowlisted "
                "tool with empty args (no prior successful call)"
            )
        return cast(
            Strategy,
            {
                "tool_calls": [{"tool_name": allowlist[0], "args": first_args}],
                "rationale": why,
            },
        )

    tool_name, prior_args = previous
    rerun_args = self._widen_args(prior_args, session)
    if rerun_args == prior_args:
        suffix = " with prior args"
    else:
        suffix = " with widened args"
    return cast(
        Strategy,
        {
            "tool_calls": [{"tool_name": tool_name, "args": dict(rerun_args)}],
            "rationale": f"deterministic fallback: re-run {tool_name}{suffix}",
        },
    )

608 

609 @staticmethod 

610 def _widen_args(args: dict[str, Any], session: SessionState) -> dict[str, Any]: 

611 """Inject a ``query`` parameter from unmet criteria when args are empty. 

612 

613 The widening rule is intentionally simple: if the args dict 

614 has no ``query`` key (or it's empty), extract keywords from 

615 the IDs of unmet criteria (splitting on ``_``) and join them 

616 as a space-separated query string. This gives catalog-search 

617 tools a chance to return relevant content on the deterministic 

618 path without needing the sampler. 

619 

620 Returns the original args unchanged when: 

621 - ``query`` is already populated (don't override operator intent) 

622 - No unmet criteria exist (nothing to widen toward) 

623 - The session has no iteration history (first call, no eval yet) 

624 """ 

625 # Don't override an existing query. 

626 if args.get("query"): 

627 return args 

628 

629 # Find unmet criteria from the latest iteration's evaluation. 

630 iterations = session.get("iterations") or [] 

631 if not iterations: 

632 return args 

633 latest = iterations[-1] 

634 criteria_eval = latest.get("criteria_evaluation") or [] 

635 unmet_ids: list[str] = [] 

636 for result in criteria_eval: 

637 if isinstance(result, dict) and result.get("status") == "unmet": 

638 cid = result.get("criterion_id", "") 

639 if cid: 639 ↛ 636line 639 didn't jump to line 636 because the condition on line 639 was always true

640 unmet_ids.append(cid) 

641 if not unmet_ids: 

642 return args 

643 

644 # Build a query from the unmet criterion IDs by splitting on 

645 # underscores and deduplicating. Skip generic words. 

646 skip_words = {"called", "succeeded", "found", "present", "no", "errors", "occurred"} 

647 keywords: list[str] = [] 

648 seen: set[str] = set() 

649 for cid in unmet_ids: 

650 for word in cid.split("_"): 

651 word_lower = word.lower() 

652 if word_lower not in skip_words and word_lower not in seen and len(word) > 2: 

653 keywords.append(word_lower) 

654 seen.add(word_lower) 

655 if not keywords: 

656 return args 

657 

658 widened = dict(args) 

659 widened["query"] = " ".join(keywords[:5]) # Cap at 5 keywords 

660 return widened 

661 

662 @staticmethod 

663 def _find_most_recent_successful_call( 

664 session: SessionState, 

665 ) -> tuple[str, dict[str, Any]] | None: 

666 """Walk the iteration history backwards for a successful tool call. 

667 

668 Returns ``(tool_name, args)`` for the most recent call whose 

669 :class:`ToolCallRecord` has ``status="ok"``. Looks at both the 

670 recorded executed-call list (for tool_calls strategies — kept 

671 on the iteration's Strategy under ``tool_calls`` after execute) 

672 and the script call log (for scripted strategies). Skips 

673 iterations whose strategy was a script with no successful 

674 embedded tool call. 

675 

676 Returns ``None`` when no prior successful call exists across 

677 the entire history. 

678 """ 

679 for iteration in reversed(session.get("iterations") or []): 

680 # Scripted strategies record their inner calls on 

681 # ``script_call_log``; direct tool_calls strategies don't 

682 # have that key. 

683 for source_key in ("script_call_log",): 

684 log = iteration.get(source_key) 

685 if not log: 685 ↛ 687line 685 didn't jump to line 687 because the condition on line 685 was always true

686 continue 

687 for call in reversed(log): 

688 if ( 

689 isinstance(call, dict) 

690 and call.get("status") == "ok" 

691 and isinstance(call.get("tool_name"), str) 

692 and isinstance(call.get("args"), dict) 

693 ): 

694 return call["tool_name"], dict(call["args"]) 

695 # Direct tool_calls strategies write the executed records 

696 # back onto the Strategy under ``tool_calls`` (each entry 

697 # carrying the same status / args fields the script log 

698 # would carry). This keeps a single lookup path here. 

699 strategy = iteration.get("strategy") or {} 

700 tool_calls = strategy.get("tool_calls") or [] 

701 for tc in reversed(tool_calls): 701 ↛ 679line 701 didn't jump to line 679 because the loop on line 701 didn't complete

702 if ( 702 ↛ 701line 702 didn't jump to line 701 because the condition on line 702 was always true

703 isinstance(tc, dict) 

704 and tc.get("status") == "ok" 

705 and isinstance(tc.get("tool_name"), str) 

706 and isinstance(tc.get("args"), dict) 

707 ): 

708 return tc["tool_name"], dict(tc["args"]) 

709 return None 

710 

711 # ------------------------------------------------------------------ # 

712 # Phase 2 — execute 

713 # ------------------------------------------------------------------ # 

714 

async def _execute_phase(
    self,
    session: SessionState,
    strategy: Strategy,
    ctx: Any | None,
    record: IterationRecord,
) -> list[ToolCallRecord]:
    """Run the Strategy inside the Execute_Phase wrapper and return its call records.

    The execution mode is picked from the Strategy's shape:

    * A Strategy carrying a ``script`` key is handed to
      :meth:`_execute_script`, which runs it through the wired sandbox
      runner and stores ``record["script_call_log"]`` plus a pre-built
      ``record["observation"]``.
    * Any other Strategy goes through :meth:`_execute_tool_calls`, which
      dispatches each ``tool_calls`` entry in order with allowlist gating
      (``ok`` / ``failed`` / ``skipped_not_allowed``). A single failing call
      does not stop later calls. The executed records are written back onto
      ``record["strategy"]["tool_calls"]``.

    The mode check lives inside the phase body so that it, too, runs under
    :meth:`_run_phase`.

    Returns:
        The list of :class:`ToolCallRecord` produced by the selected path.
    """

    async def body() -> list[ToolCallRecord]:
        if "script" not in strategy:
            return await self._execute_tool_calls(session, strategy, ctx, record)
        return await self._execute_script(session, strategy, ctx, record)

    executed = await self._run_phase(session, record, _EXECUTE, body)
    return cast(list[ToolCallRecord], executed)

755 

async def _execute_tool_calls(
    self,
    session: SessionState,
    strategy: Strategy,
    ctx: Any | None,
    record: IterationRecord,
) -> list[ToolCallRecord]:
    """Dispatch the Strategy's ``tool_calls`` entries in declaration order.

    Each entry lands as exactly one record in the returned list:

    * No usable ``tool_name`` (missing, empty, non-string, or the entry is
      not a dict): recorded as ``failed`` with
      ``error_message="tool_name_missing_or_invalid"``. The loop carries on
      instead of failing the whole iteration.
    * ``tool_name`` not in ``session["tool_allowlist"]``: recorded as
      ``skipped_not_allowed`` without ever reaching the dispatcher.
    * Otherwise: dispatched through :meth:`_dispatch_one_call`.

    Non-dict ``args`` are coerced to ``{}``. After the loop, shallow copies
    of every record replace ``record["strategy"]["tool_calls"]``, so the
    persisted iteration carries the executed outcomes.
    """
    permitted = set(session.get("tool_allowlist") or [])
    outcomes: list[ToolCallRecord] = []

    for item in strategy.get("tool_calls", []) or []:
        is_mapping = isinstance(item, dict)
        name = item.get("tool_name") if is_mapping else None
        call_args = item.get("args") if is_mapping else {}

        if not (isinstance(name, str) and name):
            # Malformed entry: the operator's bug, but one bad entry should
            # not sink the iteration, so record it as a failure and move on.
            outcomes.append(
                {
                    "tool_name": str(name) if name else "<unknown>",
                    "args": call_args if isinstance(call_args, dict) else {},
                    "status": "failed",
                    "result_summary": None,
                    "duration_ms": 0,
                    "error_message": "tool_name_missing_or_invalid",
                }
            )
            continue

        if not isinstance(call_args, dict):
            call_args = {}

        if name in permitted:
            outcomes.append(await self._dispatch_one_call(session, name, call_args, ctx))
        else:
            outcomes.append(
                {
                    "tool_name": name,
                    "args": call_args,
                    "status": "skipped_not_allowed",
                    "result_summary": None,
                    "duration_ms": 0,
                }
            )

    # Copy the outcomes onto the Strategy so the propose-fallback lookup of
    # "most recent successful call" reads from one place per iteration.
    record["strategy"]["tool_calls"] = [dict(outcome) for outcome in outcomes]
    return outcomes

805 

async def _dispatch_one_call(
    self,
    session: SessionState,
    tool_name: str,
    args: dict[str, Any],
    ctx: Any | None,
) -> ToolCallRecord:
    """Await ``self.tool_dispatcher`` for one allowlisted call and record the outcome.

    Success yields a ``status="ok"`` record whose ``result_summary`` is the
    dispatcher's return value unchanged. Any ``Exception`` from the
    dispatcher is converted into a ``status="failed"`` record with
    ``result_summary=None``. Its ``error_message`` is
    ``"<ExceptionType>: <message>"``, truncated to 200 characters. Both
    shapes carry ``duration_ms`` measured with :meth:`_elapsed_ms`.
    """
    del session  # kept for signature symmetry with the other execute helpers
    started = self.now()
    try:
        payload = await self.tool_dispatcher(tool_name, args, ctx)
    except Exception as exc:
        return {
            "tool_name": tool_name,
            "args": args,
            "status": "failed",
            "result_summary": None,
            "duration_ms": self._elapsed_ms(started),
            "error_message": f"{type(exc).__name__}: {exc}"[:200],
        }
    return {
        "tool_name": tool_name,
        "args": args,
        "status": "ok",
        "result_summary": payload,
        "duration_ms": self._elapsed_ms(started),
    }

837 

async def _execute_script(
    self,
    session: SessionState,
    strategy: Strategy,
    ctx: Any | None,
    record: IterationRecord,
) -> list[ToolCallRecord]:
    """Run a scripted strategy through the wired sandbox runner.

    The runner is awaited as ``sandbox_runner(script, ctx, tool_dispatcher)``
    and must return ``(observation_dict, script_call_log)``. On success both
    are copied onto the iteration record: ``record["script_call_log"]`` and a
    pre-built ``record["observation"]`` for Observe_Phase.

    Three failure modes get translated into stable engine error codes, so
    the MCP tool wrappers and the CLI render them as structured rejections
    rather than opaque tracebacks:

    * ``sandbox_runner is None`` — the engine was constructed without a
      sandbox. The session-start validator should already have rejected
      any script-bearing Strategy, but a sampled-then-injected script could
      still arrive here. This raises ``MissionEngineError("script_rejected")``.
    * :class:`ScriptRejected` from inside the runner — the runner
      re-validated the script just before execution and the AST gate
      fired. This is re-raised as ``MissionEngineError("script_rejected")``,
      chained from the original.
    * :class:`SandboxTerminated` from inside the runner — the sandbox killed
      the script for exceeding a cap. The cap is a budget cap, not a
      code-quality failure, so the exception is *swallowed*. The engine
      stores the partial ``script_call_log``, a partial Observation and a
      ``sandbox_terminated_reason="max_wall_clock"`` sentinel on the record,
      and returns the partial calls. Decide_Phase reads the sentinel and
      routes the verdict through the budget-cap path.

    Any other runner exception propagates unchanged.

    The sandbox module is imported lazily so the engine stays importable
    where the ``pydantic_monty`` dependency is absent. If that lazy import
    fails, translation is skipped and the runner's *original* exception,
    not the import error, is re-raised to the ``run_iteration`` failure
    path.

    Raises:
        MissionEngineError: ``"script_rejected"`` as described above.
    """
    del session  # accepted for symmetry with the execute path; unused
    if self.sandbox_runner is None:
        raise MissionEngineError("script_rejected")
    script = strategy["script"]
    try:
        observation_dict, script_call_log = await self.sandbox_runner(
            script, ctx, self.tool_dispatcher
        )
    except Exception as exc:
        # Late-resolved class lookup: importing the sandbox module at the
        # top of the file would pull in ``pydantic_monty``, which the engine
        # deliberately does not require (``mission_validate`` must work
        # without a working sandbox). Translation is therefore best-effort.
        try:
            from .sandbox import (
                SandboxTerminated,
                ScriptRejected,
            )
        except Exception:
            translation_available = False
        else:
            translation_available = True
        if not translation_available:
            # The inner handler has exited, so a bare ``raise`` re-raises the
            # runner's exception (``exc``), not the failed import. A bare
            # ``raise`` *inside* the inner handler would surface the
            # ImportError and hide the real cause.
            raise
        if isinstance(exc, ScriptRejected):
            raise MissionEngineError("script_rejected") from exc
        if not isinstance(exc, SandboxTerminated):
            raise
        # Budget cap, not a phase failure. Capture whatever the script
        # collected before it was killed and route the verdict through the
        # budget-cap path via the sentinel set below. Without the sentinel,
        # ``decide_verdict`` would fall through to
        # ``("continue", "in_progress")``.
        #
        # Materialize the partial log once. It is consumed twice (the
        # Observation's tool_results and the return value), and a one-shot
        # iterable would otherwise leave the second consumer empty.
        partial_calls = list(exc.partial_script_call_log)
        record["script_call_log"] = cast("list[ToolCallRecord]", list(partial_calls))
        # Minimal Observation with the same shape a successful sandbox run
        # produces. Observe_Phase fills in any timestamps that are missing.
        partial_observation: dict[str, Any] = {
            "tool_results": [self._annotate_tool_result(call) for call in partial_calls],
            "metrics": {},
            "events": list(exc.partial_events),
        }
        if exc.partial_observations:
            partial_observation["metrics"]["observations"] = {
                entry["key"]: entry["value"]
                for entry in exc.partial_observations
                if isinstance(entry, dict) and "key" in entry
            }
        record["observation"] = cast(Observation, partial_observation)
        record["sandbox_terminated_reason"] = "max_wall_clock"
        return cast("list[ToolCallRecord]", list(partial_calls))
    # The sandbox already produced a normalized Observation dict. Cache it
    # on the record for Observe_Phase; this is the only path where
    # Observe_Phase sees a pre-built Observation.
    record["script_call_log"] = cast("list[ToolCallRecord]", list(script_call_log))
    record["observation"] = cast(Observation, dict(observation_dict))
    return list(script_call_log)

957 

def _elapsed_ms(self, started: datetime) -> int:
    """Return the whole milliseconds between ``started`` and ``self.now()``.

    The fractional part is truncated with ``int()``. A negative interval (a
    clock that stepped backwards) is clamped to ``0``.
    """
    elapsed_seconds = (self.now() - started).total_seconds()
    milliseconds = int(elapsed_seconds * 1000)
    return milliseconds if milliseconds > 0 else 0

962 

963 # ------------------------------------------------------------------ # 

964 # Phase 3 — observe 

965 # ------------------------------------------------------------------ # 

966 

async def _observe_phase(
    self,
    session: SessionState,
    strategy: Strategy,
    executed_calls: list[ToolCallRecord],
    record: IterationRecord,
) -> None:
    """Normalise the executed calls into ``record["observation"]``.

    * **Tool-calls strategy** (no ``script`` key): the Observation is built
      from ``executed_calls`` by :meth:`_build_observation`. That fills
      ``tool_results`` (one annotated entry per call, failures included),
      lifts top-level ``metrics`` / ``events`` from successful results, and
      adds ``errors`` for non-``ok`` calls.
    * **Script strategy**: Execute_Phase already stored the sandbox's
      Observation on the record. This phase only fills the slots it left
      empty (``tool_results``, ``metrics``, ``events``, ``phase_started_at``,
      ``phase_ended_at``), so evaluators can rely on a uniform shape. A
      missing Observation becomes a fresh dict with those defaults.

    The work runs inside :meth:`_run_phase` under the observe phase label.
    """

    async def body() -> None:
        phase_started = self.now()
        if "script" not in strategy:
            record["observation"] = self._build_observation(executed_calls, phase_started)
            return
        # Sandbox-built Observation: only backfill what it didn't provide.
        obs = cast(dict[str, Any], record.get("observation") or {})
        backfill = (
            ("tool_results", []),
            ("metrics", {}),
            ("events", []),
            ("phase_started_at", phase_started.isoformat()),
            ("phase_ended_at", self.now().isoformat()),
        )
        for slot, fallback in backfill:
            obs.setdefault(slot, fallback)
        record["observation"] = cast(Observation, obs)

    await self._run_phase(session, record, _OBSERVE, body)

1012 

@staticmethod
def _annotate_tool_result(call: ToolCallRecord | dict[str, Any]) -> Any:
    """Return a call's ``result_summary`` tagged with per-call markers.

    The Observation's ``tool_results`` list is the canonical input to
    predicate criteria and to the ``tool_call_succeeded`` evaluator. Both
    read ``r.get("_status")`` and ``r.get("tool_name")`` to learn which tool
    produced an entry and whether the call succeeded. The engine attaches
    those markers here instead of relying on each tool to inject them,
    because the live dispatcher returns whatever shape the underlying tool
    produces (often a ``{"result": [...]}`` dict without call-level
    metadata).

    Args:
        call: A :class:`ToolCallRecord`-shaped mapping. Its ``status``
            (``"unknown"`` when missing or falsy), ``tool_name`` and
            ``result_summary`` are read.

    Returns:
        A *new* dict; the call's own ``result_summary`` is never mutated.

        * **Dict result_summary** — a shallow copy with ``_status`` and
          ``tool_name`` added only when those keys are absent, so a marker
          a tool supplied itself wins.
        * **Non-dict result_summary** (``None``, list, primitive) — a fresh
          dict ``{"_status", "tool_name", "result"}``, where ``result``
          holds the original payload so predicates can still walk into it.
    """
    result = call.get("result_summary")
    status = call.get("status") or "unknown"
    tool_name = call.get("tool_name")
    if isinstance(result, dict):
        # Copy rather than mutate: the same dict is also stored on the
        # ToolCallRecord, which must stay as the tool returned it.
        annotated = dict(result)
        annotated.setdefault("_status", status)
        annotated.setdefault("tool_name", tool_name)
        return annotated
    return {
        "_status": status,
        "tool_name": tool_name,
        "result": result,
    }

1053 

def _build_observation(
    self,
    executed_calls: list[ToolCallRecord],
    phase_started: datetime,
) -> Observation:
    """Fold executed :class:`ToolCallRecord` entries into one :class:`Observation`.

    For every call, in order:

    * ``tool_results`` gets :meth:`_annotate_tool_result` of the call, for
      failures too, so indices stay aligned with ``executed_calls``.
    * If the call's ``status`` is ``"ok"`` and its ``result_summary`` is a
      dict, a top-level ``metrics`` dict is merged into the Observation's
      metrics (later calls overwrite earlier keys). Each dict inside a
      top-level ``events`` list is appended to the events.
    * Any other status (``failed``, ``skipped_not_allowed``, ...) adds a
      ``{"tool_name", "status", "error_message"}`` row to ``errors``.
      ``errors`` is only present when at least one row exists.

    ``phase_started_at`` comes from ``phase_started``. ``phase_ended_at`` is
    read from :meth:`now` when the Observation is assembled.
    """
    tool_results: list[Any] = []
    merged_metrics: dict[str, Any] = {}
    collected_events: list[dict[str, Any]] = []
    failures: list[dict[str, Any]] = []

    for call in executed_calls:
        tool_results.append(self._annotate_tool_result(call))

        if call.get("status") != "ok":
            # A stable per-error shape matters: decide.py compares these
            # against the prior Observation's errors to drive "adjust".
            failures.append(
                {
                    "tool_name": call.get("tool_name"),
                    "status": call.get("status"),
                    "error_message": call.get("error_message"),
                }
            )
            continue

        payload = call.get("result_summary")
        if not isinstance(payload, dict):
            # Non-dict payloads live only in tool_results.
            continue
        payload_metrics = payload.get("metrics")
        if isinstance(payload_metrics, dict):
            merged_metrics.update(payload_metrics)
        payload_events = payload.get("events")
        if isinstance(payload_events, list):
            collected_events.extend(ev for ev in payload_events if isinstance(ev, dict))

    observation: Observation = {
        "tool_results": tool_results,
        "metrics": merged_metrics,
        "events": collected_events,
        "phase_started_at": phase_started.isoformat(),
        "phase_ended_at": self.now().isoformat(),
    }
    if failures:
        observation["errors"] = failures
    return observation

1113 

1114 # ------------------------------------------------------------------ # 

1115 # Phase 4 — evaluate 

1116 # ------------------------------------------------------------------ # 

1117 

async def _evaluate_phase(self, session: SessionState, record: IterationRecord) -> None:
    """Evaluate every session Criterion against this iteration's Observation.

    Each Criterion is handed to :meth:`_evaluate_one_criterion` together
    with two views:

    * the current ``record["observation"]`` (``{}`` when absent), and
    * a cumulative view from :meth:`_build_cumulative_observation`, in which
      ``tool_results`` spans all prior iterations plus this one and
      ``metric_history`` holds each numeric metric's readings over time.

    The resulting :class:`CriterionResult` rows are stored on
    ``record["criteria_evaluation"]`` in the same order as
    ``session["criteria"]``, so audit consumers can pair them positionally.
    The work runs inside :meth:`_run_phase` under the evaluate phase label.
    """

    async def body() -> None:
        observation = cast(dict[str, Any], record.get("observation") or {})
        # Predicates and trends need history; thresholds and events use only
        # the point-in-time Observation. Build the history view once.
        cumulative_obs = self._build_cumulative_observation(observation, session)
        record["criteria_evaluation"] = [
            self._evaluate_one_criterion(criterion, observation, cumulative_obs, session)
            for criterion in session.get("criteria") or []
        ]

    await self._run_phase(session, record, _EVALUATE, body)

1153 

@staticmethod
def _build_cumulative_observation(
    current_obs: dict[str, Any], session: SessionState
) -> dict[str, Any]:
    """Return a copy of ``current_obs`` enriched with cross-iteration history.

    Observations are visited oldest→newest: every ``session["iterations"]``
    entry's ``observation`` (``{}`` when falsy), then ``current_obs`` last.

    * ``tool_results`` on the returned view is the concatenation of every
      visited Observation's ``tool_results`` list (non-list values are
      ignored). This lets predicate criteria see results from earlier
      iterations, so multi-tool goals can converge.
    * ``metric_history`` maps each metric name to the ordered list of its
      numeric readings, each converted to ``float``. Booleans and
      non-numeric values contribute no point, so a stray reading cannot
      poison a ``metric_trend`` series.

    Every other key (``metrics``, ``events``, ``errors``, ...) is taken
    unchanged from ``current_obs``, because those describe point-in-time
    state. ``current_obs`` itself is not modified.
    """
    history_results: list[Any] = []
    series_by_metric: dict[str, list[float]] = {}

    def _extend_history(obs: Mapping[str, Any]) -> None:
        readings = obs.get("metrics")
        if not isinstance(readings, dict):
            return
        for metric_name, reading in readings.items():
            # Same Numeric_Value rule the readers apply: int/float, not bool.
            if isinstance(reading, (int, float)) and not isinstance(reading, bool):
                series_by_metric.setdefault(metric_name, []).append(float(reading))

    ordered_observations = [
        prior.get("observation") or {} for prior in session.get("iterations") or []
    ]
    # The current reading goes last, so each series ends with "now".
    ordered_observations.append(current_obs)

    for obs in ordered_observations:
        results = obs.get("tool_results")
        if isinstance(results, list):
            history_results.extend(results)
        _extend_history(obs)

    view: dict[str, Any] = dict(current_obs)
    view["tool_results"] = history_results
    view["metric_history"] = series_by_metric
    return view

1215 

def _evaluate_one_criterion(
    self,
    criterion: Criterion,
    observation: dict[str, Any],
    cumulative_obs: dict[str, Any],
    session: SessionState,
) -> CriterionResult:
    """Evaluate one Criterion and wrap the verdict in a :class:`CriterionResult`.

    ``criterion["kind"]`` selects the evaluator and the view it receives:

    * ``metric_threshold`` and ``event`` use the current Observation.
    * ``metric_trend`` and ``predicate`` use the cumulative view, which
      carries ``metric_history`` and the cross-iteration ``tool_results``.
    * ``tool_call_succeeded`` uses the current Observation plus ``session``.

    An unrecognised kind yields ``inconclusive`` with evidence
    ``"unknown_criterion_kind:<repr>"``, so one malformed criterion cannot
    tear down the iteration. ``evaluated_at`` is captured from :meth:`now`
    before the evaluator runs.
    """
    criterion_id = criterion["criterion_id"]
    kind = criterion["kind"]
    evaluated_at = self.now().isoformat()

    # Ordered (kind, thunk) pairs. Each thunk binds its evaluator to the
    # observation view that kind is judged against.
    evaluators = (
        ("metric_threshold", lambda: self._evaluate_metric_threshold(criterion, observation)),
        ("metric_trend", lambda: self._evaluate_metric_trend(criterion, cumulative_obs)),
        ("event", lambda: self._evaluate_event(criterion, observation)),
        ("predicate", lambda: self._evaluate_predicate(criterion, cumulative_obs)),
        (
            "tool_call_succeeded",
            lambda: self._evaluate_tool_call_succeeded(criterion, observation, session),
        ),
    )
    for candidate_kind, evaluate in evaluators:
        if kind == candidate_kind:
            status, evidence = evaluate()
            break
    else:
        # Unreachable once the validator has run; stay defensive anyway.
        status, evidence = "inconclusive", f"unknown_criterion_kind:{kind!r}"

    return {
        "criterion_id": criterion_id,
        "status": cast(Any, status),
        "evidence": evidence,
        "evaluated_at": evaluated_at,
    }

1256 

@staticmethod
def _evaluate_metric_threshold(
    criterion: Criterion, observation: dict[str, Any]
) -> tuple[str, Any]:
    """Resolve the criterion's dot-path metric and compare it with ``target``.

    ``criterion["metric"]`` (e.g. ``"metrics.loss"``) is walked segment by
    segment through nested dicts of the Observation. The resolved value is
    compared via ``_compare_numbers(value, op, target)``.

    Returns:
        ``(status, evidence)``:

        * ``("inconclusive", "metric_path_missing:<repr>")`` when the path is
          not a string or any segment is absent.
        * ``("inconclusive", value)`` when the resolved value is not a real
          number (``bool`` is excluded), or when the comparison raises
          ``ValueError`` / ``TypeError``. A malformed ``op`` or a missing /
          non-numeric ``target`` (e.g. a hand-edited session on disk) thus
          lands as inconclusive instead of crashing Evaluate_Phase.
        * ``("met" | "unmet", value)`` otherwise.
    """
    path = criterion.get("metric") or ""
    op = criterion.get("op")
    target = criterion.get("target")
    if not isinstance(path, str):
        # A non-string path cannot be split into segments; treat it like an
        # unresolvable path rather than raising AttributeError.
        return "inconclusive", f"metric_path_missing:{path!r}"
    value: Any = observation
    for segment in path.split("."):
        if isinstance(value, dict) and segment in value:
            value = value[segment]
        else:
            return "inconclusive", f"metric_path_missing:{path!r}"
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return "inconclusive", value
    try:
        met = _compare_numbers(value, cast(str, op), cast(float, target))
    except (TypeError, ValueError):
        # ``cast`` does not convert: a None / str target reaches the
        # comparison as-is and may raise TypeError rather than ValueError.
        return "inconclusive", value
    return ("met" if met else "unmet"), value

1278 

@staticmethod
def _evaluate_metric_trend(
    criterion: Criterion, cumulative_obs: dict[str, Any]
) -> tuple[str, Any]:
    """Judge a metric's direction across the accumulated readings.

    The series comes from ``cumulative_obs["metric_history"]``, the
    oldest→newest numeric readings built by
    :meth:`_build_cumulative_observation`. A leading ``metrics.`` prefix on
    ``criterion["metric"]`` is stripped, so ``"metrics.loss"`` and
    ``"loss"`` address the same series (matching ``metric_threshold``).

    Processing:

    1. Missing history map → ``("inconclusive", "metric_history_missing")``.
       A missing or empty series →
       ``("inconclusive", "metric_history_empty:<repr>")``.
    2. Non-numeric / bool points are dropped. A positive int ``window``
       keeps only that many most-recent points.
    3. At least ``max(2, min_points)`` points are required (``min_points``
       defaults to 2). Fewer yields ``inconclusive`` with an
       ``insufficient_history`` evidence dict; a trend is undefined on a
       single reading, and missing history must never fail the loop.
    4. The last point is compared with the first per ``direction``:
       ``decreasing`` (<), ``increasing`` (>), ``non_increasing`` (<=),
       ``non_decreasing`` (>=). Any other direction →
       ``("inconclusive", "unknown_direction:<repr>")``.

    Evidence on a verdict is ``{direction, points, first, last, delta}``.
    """
    path = criterion.get("metric") or ""
    direction = criterion.get("direction")

    history = cumulative_obs.get("metric_history")
    if not isinstance(history, dict):
        return "inconclusive", "metric_history_missing"
    prefix = "metrics."
    key = path[len(prefix):] if path.startswith(prefix) else path
    series = history.get(key)
    if not isinstance(series, list) or not series:
        return "inconclusive", f"metric_history_empty:{key!r}"

    # The accumulator already filters, but hand-built views (tests) may not.
    points: list[float] = [
        float(v) for v in series if isinstance(v, (int, float)) and not isinstance(v, bool)
    ]

    window = criterion.get("window")
    if isinstance(window, int) and not isinstance(window, bool) and window > 0:
        points = points[-window:]

    min_points = criterion.get("min_points")
    if isinstance(min_points, int) and not isinstance(min_points, bool):
        required_points = max(2, min_points)
    else:
        required_points = 2
    if len(points) < required_points:
        return "inconclusive", {
            "reason": "insufficient_history",
            "points": points,
            "required_points": required_points,
        }

    first, last = points[0], points[-1]
    verdicts = {
        "decreasing": last < first,
        "increasing": last > first,
        "non_increasing": last <= first,
        "non_decreasing": last >= first,
    }
    # Only string directions can name a known verdict; anything else
    # (None, lists, ...) falls to the defensive unknown-direction result.
    met = verdicts.get(direction) if isinstance(direction, str) else None
    if met is None:
        return "inconclusive", f"unknown_direction:{direction!r}"

    evidence = {
        "direction": direction,
        "points": points,
        "first": first,
        "last": last,
        "delta": last - first,
    }
    return ("met" if met else "unmet"), evidence

1367 

@staticmethod
def _evaluate_event(criterion: Criterion, observation: dict[str, Any]) -> tuple[str, Any]:
    """Look for an event whose ``event_name`` equals ``criterion["event_name"]``.

    Returns ``("met", <first matching event dict>)`` on a hit and
    ``("unmet", None)`` otherwise. A missing ``events`` key or a non-list
    value is reported as ``inconclusive`` with a short reason string.
    Non-dict entries in the list are ignored.
    """
    if "events" not in observation:
        return "inconclusive", "events_field_missing"
    events = observation["events"]
    if not isinstance(events, list):
        return "inconclusive", "events_field_not_a_list"
    wanted = criterion.get("event_name")
    hit = next(
        (ev for ev in events if isinstance(ev, dict) and ev.get("event_name") == wanted),
        None,
    )
    if hit is None:
        return "unmet", None
    return "met", hit

1381 

@staticmethod
def _evaluate_predicate(criterion: Criterion, observation: dict[str, Any]) -> tuple[str, Any]:
    """Evaluate the criterion's predicate AST against ``observation``.

    ``validate_criteria`` caches the parsed expression under
    ``_parsed_ast``. Persistence strips that key (AST nodes are not JSON
    safe), so a reloaded session arrives without it and the expression is
    re-parsed from ``expression`` on demand.

    Returns:
        * ``("inconclusive", "predicate_ast_not_cached")`` when there is no
          cached AST and no non-empty string ``expression``.
        * ``("inconclusive", "predicate_rejected_post_load: <reason>")`` when
          re-parsing raises :class:`PredicateRejected`, e.g. the expression
          was tampered with after load.
        * ``("inconclusive", "<ExcType>: <msg>")`` when evaluation raises, so
          a malformed predicate cannot crash the loop.
        * Otherwise ``("met" | "unmet", value)`` based on the truthiness of
          the evaluated value.
    """
    parsed = criterion.get("_parsed_ast")
    if parsed is None:
        source = criterion.get("expression")
        if not (isinstance(source, str) and source):
            return "inconclusive", "predicate_ast_not_cached"
        try:
            parsed = parse_predicate(source)
        except PredicateRejected as exc:
            return "inconclusive", f"predicate_rejected_post_load: {exc.reason}"
    try:
        outcome = evaluate_predicate(parsed, observation)
    except Exception as exc:
        return "inconclusive", f"{type(exc).__name__}: {exc}"
    verdict = "met" if outcome else "unmet"
    return verdict, outcome

1413 

@staticmethod
def _evaluate_tool_call_succeeded(
    criterion: Criterion,
    observation: dict[str, Any],
    session: SessionState | dict[str, Any] | None = None,
) -> tuple[str, Any]:
    """Count successful tool_results for the named tool across the whole session.

    Dict entries are collected from every prior iteration's
    ``observation["tool_results"]`` (when ``session`` is given) plus the
    current ``observation["tool_results"]``. Each entry with ``tool_name``
    equal to ``criterion["tool_name"]`` and ``_status == "ok"`` counts. The
    count is cumulative, so a multi-tool goal whose tools ran in different
    iterations can still converge.

    ``min_count`` defaults to 1. An explicit ``None`` (e.g. ``null`` in a
    persisted session) also means the default. A non-numeric or boolean
    value yields ``inconclusive`` instead of raising ``TypeError`` at the
    comparison.

    Returns:
        * ``("inconclusive", "tool_results_field_missing")`` when no dict
          tool_results exist anywhere.
        * ``("inconclusive", "min_count_invalid:<repr>")`` for an unusable
          ``min_count``.
        * Otherwise ``("met" | "unmet", evidence)``, where ``evidence`` is
          ``{"tool_name", "min_count", "successful_call_count"}``.
    """
    target_tool = criterion.get("tool_name")
    min_count = criterion.get("min_count", 1)

    # Collect tool_results from all prior iterations, then the current one.
    all_results: list[dict[str, Any]] = []
    if session is not None:
        for prior_iteration in session.get("iterations") or []:
            prior_obs = prior_iteration.get("observation") or {}
            prior_results = prior_obs.get("tool_results")
            if isinstance(prior_results, list):
                all_results.extend(r for r in prior_results if isinstance(r, dict))
    current_results = observation.get("tool_results")
    if isinstance(current_results, list):
        all_results.extend(r for r in current_results if isinstance(r, dict))

    if not all_results:
        return "inconclusive", "tool_results_field_missing"

    if min_count is None:
        # ``criterion.get("min_count", 1)`` only defaults a *missing* key; an
        # explicit null would otherwise reach ``>=`` and raise TypeError.
        min_count = 1
    elif isinstance(min_count, bool) or not isinstance(min_count, (int, float)):
        # Same "malformed criterion -> inconclusive" policy as the other
        # evaluators; bool is excluded like the file's numeric guards.
        return "inconclusive", f"min_count_invalid:{min_count!r}"

    successful_call_count = sum(
        1 for r in all_results if r.get("tool_name") == target_tool and r.get("_status") == "ok"
    )
    evidence = {
        "tool_name": target_tool,
        "min_count": min_count,
        "successful_call_count": successful_call_count,
    }
    if successful_call_count >= min_count:
        return "met", evidence
    return "unmet", evidence

1472 

1473 # ------------------------------------------------------------------ # 

1474 # Phase 5 — decide 

1475 # ------------------------------------------------------------------ # 

1476 

async def _decide_phase(
    self, session: SessionState, record: IterationRecord
) -> tuple[VerdictLabel, VerdictReason]:
    """Compute the iteration's verdict via the deterministic cascade.

    The work runs inside ``self._run_phase`` under the ``_DECIDE`` phase
    label. It reads the engine clock once and asks
    ``decide.decide_verdict`` for ``(verdict, reason)``. It then marks the
    record as evaluated unless the reason is ``"cadence_skip"``. When the
    verdict is ``"adjust"`` it attaches a revision-rationale template.
    Only for evaluated checkpoints does it advance the session checkpoint
    via ``mark_checkpoint``.
    """

    async def _decide_and_stamp() -> tuple[VerdictLabel, VerdictReason]:
        clock_now = self.now()
        outcome = decide.decide_verdict(session, record, clock_now)
        verdict, reason = outcome
        was_evaluated = reason != "cadence_skip"
        record["checkpoint_evaluated"] = was_evaluated
        if verdict == "adjust":
            rationale = decide.build_revision_rationale_template(session, record)
            record["revision_rationale"] = rationale
        if was_evaluated:
            # Only genuine (non-skip) verdicts move ``last_checkpoint_at``,
            # which is the anchor for the every_t_seconds cadence.
            mark_checkpoint(session, clock_now)
        return verdict, reason

    return cast(
        tuple[VerdictLabel, VerdictReason],
        await self._run_phase(session, record, _DECIDE, _decide_and_stamp),
    )

1501 

1502 # ------------------------------------------------------------------ # 

1503 # Post-iteration housekeeping 

1504 # ------------------------------------------------------------------ # 

1505 

def _update_session_post_iteration(
    self, session: SessionState, record: IterationRecord
) -> None:
    """Advance or reset ``session["no_progress_counter"]`` after an iteration.

    Counter semantics:

    * Iterations whose ``checkpoint_evaluated`` is falsy (synthetic
      ``cadence_skip`` iterations) leave the counter alone. Otherwise a
      session whose cadence is ``every_n_iterations`` could reach
      ``stagnation_threshold`` purely because most iterations skip the
      criteria check.
    * On evaluated iterations, the current ``criteria_evaluation`` is
      compared with that of the most recent *earlier evaluated* iteration,
      as found by ``_previous_evaluated_iteration``. That is not
      necessarily index ``-2``, because intervening cadence-skip
      iterations are passed over. Any criterion moving from
      ``unmet``/``inconclusive``/absent to ``met`` counts as improvement
      and resets the counter to 0. Otherwise the counter increments by 1.
    * When no earlier evaluated iteration exists, there is nothing to
      measure against, so the iteration counts as no-improvement rather
      than forcing a reset. The counter then uniformly tracks "how long
      since we made measurable progress".

    The caller has already appended ``record`` to ``session["iterations"]``.
    A missing or ``None`` counter is treated as 0.
    """
    if not record.get("checkpoint_evaluated"):
        return

    prior_eval = self._previous_evaluated_iteration(session)
    # Short-circuit keeps ``record["criteria_evaluation"]`` unread when
    # there is no baseline to compare against.
    improved = prior_eval is not None and self._criteria_improved(
        prior_eval, record["criteria_evaluation"]
    )
    if improved:
        session["no_progress_counter"] = 0
    else:
        session["no_progress_counter"] = (session.get("no_progress_counter", 0) or 0) + 1

1545 

@staticmethod
def _previous_evaluated_iteration(
    session: SessionState,
) -> list[CriterionResult] | None:
    """Find the ``criteria_evaluation`` of the latest earlier real checkpoint.

    A "real" checkpoint is an iteration with a truthy
    ``checkpoint_evaluated``. Cadence-skip iterations are ignored, so the
    no-progress counter only measures movement between genuine
    evaluations, whatever the cadence configuration. The final element
    of ``session["iterations"]`` is the current iteration (the caller has
    already appended it) and is therefore excluded. Returns a fresh list
    copy, or ``None`` when no earlier evaluated iteration exists.
    """
    history = session.get("iterations") or []
    earlier = history[:-1]  # drop the just-appended current iteration
    latest_checkpoint = next(
        (entry for entry in reversed(earlier) if entry.get("checkpoint_evaluated")),
        None,
    )
    if latest_checkpoint is None:
        return None
    return list(latest_checkpoint.get("criteria_evaluation") or [])

1565 

@staticmethod
def _criteria_improved(
    prior: list[CriterionResult],
    current: list[CriterionResult],
) -> bool:
    """Report whether at least one criterion newly became ``met``.

    A current result counts as an improvement when its status is ``met``
    and the same ``criterion_id`` was ``unmet`` or ``inconclusive`` in
    ``prior``, or did not appear there at all. First-time-met is treated
    as progress. Non-dict entries are ignored, and prior entries without
    a ``criterion_id`` are left out of the lookup.
    """
    not_met_before = ("unmet", "inconclusive", None)
    earlier_status = {}
    for entry in prior:
        if isinstance(entry, dict) and "criterion_id" in entry:
            earlier_status[entry["criterion_id"]] = entry["status"]
    return any(
        isinstance(entry, dict)
        and entry.get("status") == "met"
        and earlier_status.get(entry.get("criterion_id")) in not_met_before
        for entry in current
    )

1591 

1592 # ------------------------------------------------------------------ # 

1593 # Terminal-verdict finalisation 

1594 # ------------------------------------------------------------------ # 

1595 

async def _finalise_terminal_session(
    self,
    session: SessionState,
    record: IterationRecord,
    verdict: VerdictLabel,
    reason: VerdictReason,
) -> None:
    """Move the session into its terminal status and write the Final_Report.

    ``run_iteration`` calls this only for verdicts in
    :data:`TERMINAL_VERDICTS`. ``complete`` maps to ``completed``; every
    other verdict (i.e. ``terminate``) maps to ``terminated``. The
    session's ``ended_at`` is copied from the iteration record's
    ``ended_at``, so no extra clock read is needed. ``final_verdict``
    records the verdict.

    Bridging async sampling into the sync report writer: the optional
    lessons/followups overlay is fetched here with
    ``await self._maybe_sample_final_lessons(...)``. When an overlay
    exists it is wrapped in a plain synchronous closure that returns it
    unchanged. That closure is passed as ``sampler`` to
    :func:`mcp.mission.final_report.write_final_report`, which keeps its
    sync-callable contract. With no overlay, ``sampler`` is ``None``.
    """
    session["status"] = "completed" if verdict == "complete" else "terminated"
    session["ended_at"] = record["ended_at"]
    session["final_verdict"] = verdict

    # Fetch up front so the synchronous report writer never has to await.
    overlay = await self._maybe_sample_final_lessons(session, verdict, reason)
    sampler: Callable[..., dict[str, Any] | None] | None = None
    if overlay is not None:

        def _replay_overlay(
            _session: SessionState,
            _verdict: VerdictLabel,
            _reason: VerdictReason,
        ) -> dict[str, Any] | None:
            return overlay

        sampler = _replay_overlay

    # The report helper owns where the durable exit artifact lands
    # (filesystem sibling vs. embedded on the session for other backends).
    final_report.write_final_report(self.backend, session, verdict, reason, sampler=sampler)

1649 

async def _maybe_sample_final_lessons(
    self,
    session: SessionState,
    verdict: VerdictLabel,
    reason: VerdictReason,
) -> dict[str, Any] | None:
    """Fetch the optional Final_Report ``lessons`` / ``followups`` overlay.

    Returns ``None`` without calling anything when
    :attr:`final_lessons_callable` is not wired or the session did not
    opt into sampling (``use_sampling`` falsy). Otherwise the callable is
    awaited once with ``session=session``. Its result is adapted into the
    ``{"lessons": str, "recommended_followups": list[str]}`` shape that
    :func:`mcp.mission.final_report.write_final_report` expects.

    Recognised return shapes:

    * :class:`mcp.mission.sampling.SamplingUsed` — the production path.
      ``parsed["lessons"]`` must be a non-empty list of strings. It is
      joined with blank lines into one string. An empty list is skipped
      so it cannot replace the deterministic lessons with ``""``.
      ``parsed["recommended_followups"]`` must be a list of strings and
      is copied as a list. If ``parsed`` itself is not a dict, it is
      treated as empty. Returns ``None`` when neither field survives.
    * Raw ``dict`` (legacy / test pattern) — returned unchanged. The
      overlay handling in ``write_final_report`` does the structural
      check.
    * Anything else (including :class:`SamplingFallback` and ``None``),
      and any exception raised by the callable — returns ``None``, so the
      deterministic templates from
      :func:`mcp.mission.final_report.build_deterministic_report` stand
      on their own.

    This is deliberately best-effort. The Final_Report is the durable
    exit artifact, so a flaky or malformed sampler result must never
    block it from landing. ``verdict`` and ``reason`` are unused; they
    are accepted only for symmetry with the legacy Sampler shape.
    """
    del verdict, reason  # forwarded only for symmetry with the legacy Sampler shape
    if self.final_lessons_callable is None:
        return None
    if not session.get("use_sampling"):
        return None
    try:
        result = await self.final_lessons_callable(session=session)
    except Exception:
        # Best-effort by design: a failing sampler degrades to the
        # deterministic report instead of aborting finalisation.
        return None
    if isinstance(result, SamplingUsed):
        raw_parsed = getattr(result, "parsed", None)
        # Guard: a non-dict ``parsed`` would otherwise raise AttributeError
        # here and block the report, contradicting the best-effort contract.
        parsed: dict[str, Any] = raw_parsed if isinstance(raw_parsed, dict) else {}
        lessons = parsed.get("lessons")
        followups = parsed.get("recommended_followups")
        overlay: dict[str, Any] = {}
        if (
            isinstance(lessons, list)
            and lessons
            and all(isinstance(item, str) for item in lessons)
        ):
            # write_final_report expects ``lessons`` as a single string;
            # blank-line separation keeps multi-bullet model output readable.
            overlay["lessons"] = "\n\n".join(lessons)
        if isinstance(followups, list) and all(isinstance(item, str) for item in followups):
            overlay["recommended_followups"] = list(followups)
        return overlay or None
    if isinstance(result, SamplingFallback):
        return None
    if isinstance(result, dict):
        # Legacy raw-dict path: the downstream overlay validator does the
        # structural check.
        return result
    return None

1713 

1714 

1715# --------------------------------------------------------------------------- 

1716# Comparison helper 

1717# --------------------------------------------------------------------------- 

1718 

1719 

def _compare_numbers(value: float, op: str, target: float) -> bool:
    """Evaluate ``value <op> target`` for one of the six permitted operators.

    Supported operators are ``<``, ``<=``, ``>``, ``>=``, ``==`` and
    ``!=``; ``op`` is matched with ``==`` in that order.

    Raises:
        ValueError: when ``op`` is none of the supported operators.
    """
    comparisons = (
        ("<", lambda lhs, rhs: lhs < rhs),
        ("<=", lambda lhs, rhs: lhs <= rhs),
        (">", lambda lhs, rhs: lhs > rhs),
        (">=", lambda lhs, rhs: lhs >= rhs),
        ("==", lambda lhs, rhs: lhs == rhs),
        ("!=", lambda lhs, rhs: lhs != rhs),
    )
    # A linear scan (rather than a dict lookup) keeps an unhashable ``op``
    # on the ValueError path instead of raising TypeError.
    for symbol, compare in comparisons:
        if op == symbol:
            return compare(value, target)
    raise ValueError(f"unknown comparison operator: {op!r}")