Coverage for mcp/mission/final_report.py: 90%

131 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-15 15:07 +0000

1"""Mission Final_Report writer. 

2 

3Builds and persists the durable JSON artifact that ends a Mission_Session. 

4The report captures the directive, criteria, budget, allowlist, cadence, 

5the full iteration history (with private parser caches stripped), and the 

6terminal verdict. Two surfaces: 

7 

8* :func:`build_deterministic_report` — pure: takes a session and the 

9 terminal ``(verdict, reason)`` tuple, returns a dict containing only 

10 fields that can be derived from the session payload without consulting 

11 any LLM. The ``lessons`` and ``recommended_followups`` slots are 

12 pre-populated with templated text so a Mission running with sampling 

13 disabled — or with a sampling backend that fails — still produces a 

14 complete, useful report. 

15* :func:`write_final_report` — calls :func:`build_deterministic_report`, 

16 optionally overlays the sampler-supplied ``lessons`` / 

17 ``recommended_followups``, persists the report, and updates 

18 ``session["final_report_path"]``. Returns the persisted-path identifier. 

19 

20The writer is deliberately backend-aware. :class:`FilesystemBackend` writes 

21the report as a sibling file at ``<root>/<session_id>.report.json`` using 

22the same temp-file + ``fsync`` + ``os.replace`` atomic pattern that 

23:meth:`FilesystemBackend.save_session` uses, so a reader concurrent with a 

24writer never sees a partial JSON document. Other backends (today, the 

25:class:`DynamoDBBackend` stub) embed the report on the session under a 

26``final_report`` key and re-save the session — DynamoDB's single-item 

27``put_item`` is atomic, so no separate dance is needed. The synthetic 

28identifier returned in that case is ``"dynamodb://{session_id}/report"`` so 

29callers always have a stable string to record on 

30``session["final_report_path"]``. 

31""" 

32 

33from __future__ import annotations 

34 

35import contextlib 

36import copy 

37import json 

38import logging 

39import os 

40import tempfile 

41from collections.abc import Callable 

42from datetime import UTC, datetime 

43from typing import Any, cast 

44 

45from .state import FilesystemBackend 

46from .types import IterationRecord, SessionState, VerdictLabel, VerdictReason 

47 

# Names exported by ``from <module> import *``; every other definition in
# this module is a private helper.
__all__ = [
    "build_deterministic_report",
    "write_final_report",
]

# Module-level logger; used to report sampler failures at WARNING.
logger = logging.getLogger(__name__)


# --------------------------------------------------------------------------
# Type aliases
# --------------------------------------------------------------------------

# A sampler callable supplies LLM-derived ``lessons`` /
# ``recommended_followups`` overlays for the report. It receives the
# session and the terminal verdict tuple, and returns a dict carrying the
# two keys — or ``None`` when the call failed and the deterministic
# templates should be kept. The return type is written as a string so the
# ``|`` union is not evaluated when this module-level alias is built.
Sampler = Callable[
    [SessionState, VerdictLabel, VerdictReason],
    "dict[str, Any] | None",
]


# Private cache key written by ``validate_criteria`` onto every
# ``predicate`` Criterion. We strip it from anything that lands in the
# report so the artifact stays portable JSON.
_PARSED_AST_KEY = "_parsed_ast"

75 

76 

77# -------------------------------------------------------------------------- 

78# Public surface 

79# -------------------------------------------------------------------------- 

80 

81 

def build_deterministic_report(
    session: SessionState,
    verdict: VerdictLabel,
    reason: VerdictReason,
) -> dict[str, Any]:
    """Assemble the Final_Report dict from the session payload alone.

    No sampler is consulted. The report is built in groups, and the key
    order below is the order the keys appear in the resulting dict:

    * Identification: ``session_id`` and ``directive_text``, both read
      with ``[]`` so a session missing either raises ``KeyError``.
    * Configuration snapshot: ``criteria`` (top-level ``_parsed_ast``
      entries removed), ``budget``, ``tool_allowlist``,
      ``checkpoint_cadence`` (each a shallow copy, empty when absent)
      and ``stagnation_threshold``.
    * Lifecycle: ``created_at`` and ``started_at`` as stored on the
      session (``None`` when absent) and ``ended_at``, the current UTC
      time in ISO-8601 form.
    * Outcome: ``iterations_run``, ``final_verdict``,
      ``final_verdict_reason`` and ``final_criteria_evaluation`` (the
      last iteration's results, or ``None``).
    * Narrative: templated ``lessons`` and ``recommended_followups``;
      :func:`write_final_report` may overlay these with sampler output.
    * History: ``iterations``, deep-copied with ``_parsed_ast`` keys
      stripped at every nesting level.

    Apart from the ``ended_at`` timestamp, the result depends only on
    the arguments.
    """
    ended_at = datetime.now(UTC).isoformat()
    history = session.get("iterations") or []

    # Identification.
    report: dict[str, Any] = {
        "session_id": session["session_id"],
        "directive_text": session["directive_text"],
    }

    # Configuration snapshot.
    raw_criteria = cast("list[dict[str, Any]]", list(session.get("criteria") or []))
    report.update(
        criteria=_strip_parsed_ast_from_criteria(raw_criteria),
        budget=dict(session.get("budget") or {}),
        tool_allowlist=list(session.get("tool_allowlist") or []),
        checkpoint_cadence=dict(session.get("checkpoint_cadence") or {}),
        stagnation_threshold=session.get("stagnation_threshold"),
    )

    # Lifecycle timestamps.
    report.update(
        created_at=session.get("created_at"),
        started_at=session.get("started_at"),
        ended_at=ended_at,
    )

    # Outcome.
    report.update(
        iterations_run=len(history),
        final_verdict=verdict,
        final_verdict_reason=reason,
        final_criteria_evaluation=_final_criteria_evaluation(session),
    )

    # Templated narrative, then the sanitised iteration history last.
    report.update(
        lessons=_build_lessons_template(session, verdict, reason),
        recommended_followups=_build_followups_template(session, verdict, reason),
        iterations=_strip_parsed_ast_from_iterations(history),
    )
    return report

141 

142 

def write_final_report(
    backend: Any,
    session: SessionState,
    verdict: VerdictLabel,
    reason: VerdictReason,
    sampler: Sampler | None = None,
) -> str:
    """Build the Final_Report, overlay sampler output, persist it, return its location.

    Steps, in order:

    1. :func:`build_deterministic_report` produces the complete report
       with templated ``lessons`` / ``recommended_followups``.
    2. If ``sampler`` is given it is invoked once through
       :func:`_safely_invoke_sampler`. A dict result is handed to
       :func:`_apply_sampler_overlay`; a ``None`` result (including the
       case where the sampler raised) leaves the templates untouched.
    3. Persistence depends on the backend type:

       * a :class:`FilesystemBackend` gets a sibling file written by
         :func:`_write_report_to_filesystem`, and the returned value is
         that file's path;
       * any other backend gets the report attached to the session and
         saved via :func:`_attach_report_to_session`, and the returned
         value is the synthetic ``"dynamodb://{session_id}/report"``
         identifier.

    4. ``session["final_report_path"]`` is set to the returned value.
       This happens after persistence, so it is the caller's next
       ``save_session`` that records it.
    """
    report = build_deterministic_report(session, verdict, reason)

    # The sampler is optional; ``None`` here means "keep the templates".
    overlay = (
        None
        if sampler is None
        else _safely_invoke_sampler(sampler, session, verdict, reason)
    )
    if overlay is not None:
        _apply_sampler_overlay(report, overlay)

    if not isinstance(backend, FilesystemBackend):
        location = _attach_report_to_session(backend, session, report)
    else:
        location = _write_report_to_filesystem(backend, session["session_id"], report)

    session["final_report_path"] = location
    return location

196 

197 

198# -------------------------------------------------------------------------- 

199# Templated narrative 

200# -------------------------------------------------------------------------- 

201 

202 

def _build_lessons_template(
    session: SessionState,
    verdict: VerdictLabel,
    reason: VerdictReason,
) -> str:
    """Return the templated ``lessons`` paragraph used when no sampler overlay applies.

    The paragraph is a single line built only from the session and the
    verdict tuple: the verdict and reason, the number of iterations, the
    directive (cut to 237 characters plus ``...`` when longer than 240),
    and the ids of criteria whose status in the final evaluation is
    ``unmet`` or ``inconclusive`` (``none`` when there are no such ids).
    """
    run_count = len(session.get("iterations") or [])

    # Keep a long directive from swamping the summary.
    shown_directive = session.get("directive_text", "")
    if len(shown_directive) > 240:
        shown_directive = shown_directive[:237] + "..."

    outstanding = []
    for outcome in _final_criteria_evaluation(session) or []:
        if outcome.get("status") in ("unmet", "inconclusive"):
            outstanding.append(outcome["criterion_id"])
    outstanding_text = ", ".join(outstanding) if outstanding else "none"

    sentences = [
        f"Mission ended with verdict {verdict!r} (reason {reason!r}) after "
        f"{run_count} iteration(s).",
        f"Directive: {shown_directive!r}.",
        f"Outstanding criteria at termination: {outstanding_text}.",
        "This summary is templated text — re-run with sampling enabled to "
        "replace it with a model-derived narrative.",
    ]
    return " ".join(sentences)

240 

241 

242def _build_followups_template( 

243 session: SessionState, 

244 verdict: VerdictLabel, 

245 reason: VerdictReason, 

246) -> list[str]: 

247 """Deterministic ``recommended_followups`` for templated reports. 

248 

249 Returns 1–3 generic next-step suggestions chosen from the verdict 

250 reason. Pure: same inputs → same outputs. Wording stays short so 

251 callers can render the list as bullet points in a CLI summary. 

252 

253 The ``session`` argument is unused today but kept on the signature 

254 so a future enhancement that consults the iteration history (e.g. 

255 naming the most-used tool) can be added without changing every 

256 call site. 

257 """ 

258 del session # currently unused; kept for signature stability 

259 

260 suggestions: list[str] = [] 

261 

262 if verdict == "complete": 

263 suggestions.append( 

264 "Persist any artefacts produced by the final iteration so the " 

265 "outcome survives beyond the session JSON." 

266 ) 

267 suggestions.append( 

268 "Re-run with tighter criteria thresholds to confirm the result " 

269 "was not a borderline match." 

270 ) 

271 elif reason == "max_iterations": 

272 suggestions.append( 

273 "Re-run with a higher max_iterations cap if more iterations " 

274 "would plausibly close the remaining gap." 

275 ) 

276 suggestions.append( 

277 "Inspect the iteration history for repeated tool sequences and " 

278 "consider tightening the strategy revision heuristic." 

279 ) 

280 elif reason == "max_wall_clock": 

281 suggestions.append( 

282 "Re-run with a higher max_wall_clock_seconds budget, or split " 

283 "the directive into smaller sub-goals." 

284 ) 

285 elif reason == "no_progress": 

286 suggestions.append( 

287 "Re-evaluate criteria thresholds — sustained no-progress may " 

288 "indicate the targets are unreachable with the current tool " 

289 "allowlist." 

290 ) 

291 suggestions.append( 

292 "Widen the tool allowlist or supply a richer directive so the " 

293 "loop can explore alternative strategies." 

294 ) 

295 elif reason == "user_abort": 

296 suggestions.append( 

297 "Resume the session with mission_resume once the manual intervention is complete." 

298 ) 

299 else: 

300 suggestions.append( 

301 "Inspect the iteration history for the last verdict and adjust " 

302 "the directive, criteria, or allowlist accordingly." 

303 ) 

304 

305 suggestions.append( 

306 "These suggestions are templated — re-run with sampling enabled to " 

307 "replace them with model-derived followups." 

308 ) 

309 return suggestions[:3] 

310 

311 

312# -------------------------------------------------------------------------- 

313# Strip helpers — pure 

314# -------------------------------------------------------------------------- 

315 

316 

def _strip_parsed_ast_from_criteria(criteria: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Return a new list of criteria without the private ``_parsed_ast`` cache.

    Each dict entry is replaced by a shallow copy that omits the
    top-level ``_parsed_ast`` key; nested values are shared with the
    input, not copied. Entries that are not dicts are passed through
    unchanged. Removing the cache keeps the report plain JSON: the
    report is later serialised with ``json.dump``, which cannot encode a
    parsed-AST object.
    """
    return [
        {key: val for key, val in entry.items() if key != _PARSED_AST_KEY}
        if isinstance(entry, dict)
        else entry
        for entry in criteria
    ]

334 

335 

def _strip_parsed_ast_from_iterations(
    iterations: list[IterationRecord],
) -> list[dict[str, Any]]:
    """Return a deep copy of ``iterations`` with every ``_parsed_ast`` key removed.

    The input is never mutated: the history is deep-copied first and the
    copy is then cleaned at every nesting level of dicts and lists. The
    sweep is deliberately broad so a cache object that ends up anywhere
    inside an iteration record cannot reach the report.
    """
    snapshot = copy.deepcopy(list(iterations))
    # The helper walks a list element by element, so one call on the
    # copied list cleans every record.
    _strip_parsed_ast_in_place(snapshot)
    return cast(list[dict[str, Any]], snapshot)

351 

352 

def _strip_parsed_ast_in_place(value: Any) -> None:
    """Remove ``_parsed_ast`` from ``value`` and from everything nested inside it.

    Mutates ``value``. Only dicts and lists are descended into; any
    other type is left untouched.
    """
    if isinstance(value, dict):
        # ``pop`` with a default deletes the key when present, else no-op.
        value.pop(_PARSED_AST_KEY, None)
        children = value.values()
    elif isinstance(value, list):
        children = value
    else:
        return
    for child in children:
        _strip_parsed_ast_in_place(child)

363 

364 

def _final_criteria_evaluation(session: SessionState) -> list[dict[str, Any]] | None:
    """Return a copy of the last iteration's ``criteria_evaluation``, or ``None``.

    ``None`` is returned when the session has no iterations, or when the
    last iteration has a missing or empty ``criteria_evaluation``.
    Otherwise each result is shallow-copied into a fresh dict, so the
    caller can alter the returned list and its top-level entries without
    touching the session.
    """
    history = session.get("iterations")
    if history:
        latest = history[-1].get("criteria_evaluation")
        if latest:
            return list(map(dict, latest))
    return None

383 

384 

385# -------------------------------------------------------------------------- 

386# Sampler overlay 

387# -------------------------------------------------------------------------- 

388 

389 

def _safely_invoke_sampler(
    sampler: Sampler,
    session: SessionState,
    verdict: VerdictLabel,
    reason: VerdictReason,
) -> dict[str, Any] | None:
    """Invoke ``sampler`` once and return its dict, or ``None`` when unusable.

    The sampler must never stop the report from being written, so:

    * an exception raised by the sampler is logged at WARNING (with
      traceback) and swallowed;
    * a ``None`` result is passed through silently;
    * any other non-dict result is logged at WARNING and discarded.

    In every ``None`` case the caller keeps the templated ``lessons`` /
    ``recommended_followups``.
    """
    try:
        outcome = sampler(session, verdict, reason)
    except Exception:
        # Deliberately broad: this is the boundary that shields report
        # writing from arbitrary sampler failures.
        logger.warning(
            "Mission sampler raised while building Final_Report for session %s; "
            "keeping templated lessons / recommended_followups.",
            session.get("session_id"),
            exc_info=True,
        )
        return None

    if outcome is None or isinstance(outcome, dict):
        return outcome

    logger.warning(
        "Mission sampler returned a non-dict (%s) for session %s; "
        "keeping templated lessons / recommended_followups.",
        type(outcome).__name__,
        session.get("session_id"),
    )
    return None

425 

426 

def _apply_sampler_overlay(report: dict[str, Any], overlay: dict[str, Any]) -> None:
    """Copy well-typed ``lessons`` / ``recommended_followups`` from ``overlay`` into ``report``.

    The two fields are validated and applied independently, so one
    malformed field does not block the other:

    * ``lessons`` replaces the template only when it is a non-empty
      ``str``;
    * ``recommended_followups`` replaces the template only when it is a
      ``list`` whose items are all ``str`` (an empty list qualifies). A
      copy of the list is stored, not the sampler's own object.

    Anything else is ignored and the templated value stays in place.
    ``report`` is mutated; nothing is returned.
    """
    if isinstance(text := overlay.get("lessons"), str) and text:
        report["lessons"] = text

    candidates = overlay.get("recommended_followups")
    if not isinstance(candidates, list):
        return
    if any(not isinstance(entry, str) for entry in candidates):
        return
    report["recommended_followups"] = [*candidates]

444 

445 

446# -------------------------------------------------------------------------- 

447# Persistence 

448# -------------------------------------------------------------------------- 

449 

450 

def _write_report_to_filesystem(
    backend: FilesystemBackend,
    session_id: str,
    report: dict[str, Any],
) -> str:
    """Persist ``report`` as ``<root>/<session_id>.report.json`` atomically.

    The report is serialised into a temporary file created in the
    backend root, flushed and ``fsync``-ed, then moved over the final
    name with ``os.replace``. The final path therefore only ever holds a
    complete JSON document: either the previous report or the new one.

    Args:
        backend: Provides ``root`` (the target directory) and
            ``_ensure_root()``, which is called first so the directory
            exists before the temp file is created.
        session_id: Used verbatim in both the temp-file prefix and the
            final file name.
            NOTE(review): it is not sanitised here; presumably ids are
            generated internally — confirm they cannot contain path
            separators.
        report: JSON-serialisable report dict.

    Returns:
        The final report path as a string (``str(backend.root / name)``).

    Raises:
        OSError: Any I/O failure, re-raised unchanged so the concrete
            subclass (``PermissionError``, ``FileNotFoundError``, ...)
            plus ``errno`` and ``filename`` reach the caller.
        TypeError, ValueError: Propagated from ``json.dump`` when
            ``report`` is not serialisable.

    On any failure after the temp file has been created, the temp file
    is removed (best effort) before the exception propagates, so failed
    writes do not accumulate ``*.json.tmp`` files in the backend root.
    """
    backend._ensure_root()
    final = backend.root / f"{session_id}.report.json"

    tmp = tempfile.NamedTemporaryFile(  # noqa: SIM115 - closed by the ``with`` below
        mode="w",
        encoding="utf-8",
        dir=str(backend.root),
        prefix=f"{session_id}.report.",
        suffix=".json.tmp",
        delete=False,
    )
    try:
        # ``with`` closes the handle on success and on failure; the file
        # itself survives the close because ``delete=False``.
        with tmp:
            json.dump(report, tmp)
            tmp.flush()
            os.fsync(tmp.fileno())
        if os.name != "nt":
            with contextlib.suppress(OSError):
                # A successful fsync is too valuable to abandon over a
                # permission tightening that the filesystem refused.
                os.chmod(tmp.name, 0o600)
        os.replace(tmp.name, final)
    except BaseException:
        # Remove the orphaned temp file, then let the original exception
        # propagate untouched so operators see the real cause.
        with contextlib.suppress(OSError):
            os.unlink(tmp.name)
        raise
    return str(final)

499 

500 

def _attach_report_to_session(
    backend: Any,
    session: SessionState,
    report: dict[str, Any],
) -> str:
    """Store ``report`` on the session itself and save it through ``backend``.

    This is the persistence path for backends that do not write a
    sibling report file. The session is mutated in place: its
    ``final_report`` key is set to ``report`` (the same object, not a
    copy) and ``backend.save_session(session)`` is then called once.

    Returns the synthetic identifier ``"dynamodb://{session_id}/report"``
    so callers have a path-like string to record as the report location.
    """
    # ``final_report`` is not a declared ``SessionState`` key, so write
    # through a plain-dict view of the very same object.
    untyped_session = cast(dict[str, Any], session)
    untyped_session["final_report"] = report

    backend.save_session(session)

    return "dynamodb://{}/report".format(session["session_id"])