Coverage for mcp/mission/sampling.py: 91%
456 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
"""Mission sampling — prompt builders for the advisory LLM path.

The Mission engine routes optional model-driven advice
(Strategy_Revision rationales / next-strategy proposals on ``adjust``,
Final_Report ``lessons`` / ``recommended_followups`` on ``complete`` and
``terminate``) through a small, transport-agnostic plumbing pipe. This
module opens with the **prompt-assembly half** of that pipe: pure
Python, sync, and free of MCP / boto3 / fastmcp imports at module import
time (``fastmcp`` is imported only under ``TYPE_CHECKING``). Backends,
capability detection, response validation, and orchestration helpers
live in later sections of this same file.

The two render methods on :class:`SamplingPrompt` produce a
deterministic ``str`` payload from the bare data the caller passes in:

* :meth:`SamplingPrompt.assemble` — the Strategy_Revision prompt. Includes
  the directive, the Success_Criteria with current per-criterion status,
  the resolved Tool_Allowlist with each tool's docstring (and input
  schema when one is supplied), an explicit budget context block, the
  optional environment-context block, the last
  :data:`RECENT_ITERATIONS_LIMIT` (five) Iteration summaries (Observation
  fields larger than :data:`OBSERVATION_FIELD_BYTE_CAP` truncated to
  :data:`OBSERVATION_FIELD_TRUNCATE_TO` bytes plus the marker
  :data:`TRUNCATION_MARKER`, with the original byte lengths recorded
  under the ``_original_bytes`` map), and the JSON Schema instruction
  block built around :data:`STRATEGY_REVISION_SCHEMA`.
* :meth:`SamplingPrompt.assemble_final_lessons` — the Final_Report
  prompt. Reuses the directive / criteria / allowlist / budget /
  environment assembly but emits :data:`FINAL_LESSONS_SCHEMA` instead of
  the strategy-revision schema. It replaces the Observation summaries
  with a short ``verdict`` / ``verdict_reason`` summary for every
  Iteration the caller passes (no five-Iteration slice), because the
  Final_Report path does not need raw Observation history.

Both render methods aim to cap total output at :data:`PROMPT_BYTE_BUDGET`
bytes (UTF-8). When the assembled prompt exceeds the cap, the oldest
Iteration summary is dropped and the prompt is re-rendered. This repeats
until the prompt fits or no Iteration summaries remain. The fixed
sections are never trimmed, so an oversized directive or criteria list
can still yield a prompt above the cap. Truncation and dropping are
deterministic — the same inputs always produce a byte-identical output.
This is the property the tests under ``tests/test_mission_sampling.py``
pin down.
"""
40from __future__ import annotations
42import asyncio
43import json
44import os
45from collections.abc import Mapping, Sequence
46from dataclasses import dataclass, field
47from typing import TYPE_CHECKING, Any, Literal, Protocol, cast, runtime_checkable
49from . import validation as _validation
50from .types import Criterion, CriterionResult, IterationRecord, Observation, Strategy
51from .validation import MissionValidationError
53# <pyflowchart-code-diagram> BEGIN - auto-inserted, do not edit
54# Flowchart(s) generated from this file:
55# * ``maybe_sample_strategy_revision`` -> ``diagrams/code_diagrams/mcp/mission/sampling.maybe_sample_strategy_revision.html``
56# (PNG: ``diagrams/code_diagrams/mcp/mission/sampling.maybe_sample_strategy_revision.png``)
57# Regenerate with ``python diagrams/code_diagrams/generate.py``.
58# <pyflowchart-code-diagram> END
if TYPE_CHECKING:  # pragma: no cover - import-time only
    # ``fastmcp.Context`` is the concrete type expected by
    # :class:`MCPSamplingBackend`. Kept behind ``TYPE_CHECKING`` so the
    # runtime import surface stays pure-stdlib; the backend itself
    # accepts any object that exposes a compatible ``sample`` method.
    from fastmcp import Context as _FastMCPContext  # noqa: F401

# Explicit public surface of the module. ``MissionValidationError`` is
# not defined here; it is re-exported from ``.validation`` so callers
# can import it alongside the sampling API.
__all__ = [
    "BEDROCK_MAX_TOKENS",
    "BEDROCK_TEMPERATURE",
    "DEFAULT_BEDROCK_MODEL_ID",
    "DEFAULT_BEDROCK_REGION",
    "ENV_BEDROCK_MODEL_ID",
    "ENV_BEDROCK_REGION",
    "ENVIRONMENT_CONTEXT_BYTE_CAP",
    "FINAL_LESSONS_SCHEMA",
    "BedrockSamplingBackend",
    "MCPSamplingBackend",
    "MissionValidationError",
    "OBSERVATION_FIELD_BYTE_CAP",
    "OBSERVATION_FIELD_TRUNCATE_TO",
    "PROMPT_BYTE_BUDGET",
    "RECENT_ITERATIONS_LIMIT",
    "STRATEGY_REVISION_SCHEMA",
    "STRATEGY_SHAPE_SCHEMA",
    "SamplingBackend",
    "SamplingFallback",
    "SamplingPrompt",
    "SamplingTransportError",
    "SamplingUsed",
    "TRUNCATION_MARKER",
    "maybe_sample_final_lessons",
    "maybe_sample_strategy_revision",
    "resolve_sampling_state",
    "select_sampling_backend",
    "validate_strategy_against_catalog",
]
100# ---------------------------------------------------------------------------
101# Tunables (named so tests can reference them without hard-coding magic)
102# ---------------------------------------------------------------------------
#: Per-Observation-field byte cap. Fields whose JSON-serialised UTF-8
#: byte length exceeds this value are truncated. A field whose byte
#: length is exactly equal to this value is **not** truncated — the
#: comparison uses strict greater-than to keep the boundary stable.
OBSERVATION_FIELD_BYTE_CAP: int = 4096

#: Target byte length after truncation. The truncated string is the
#: first ``OBSERVATION_FIELD_TRUNCATE_TO`` bytes of the JSON-serialised
#: form, decoded with ``errors="ignore"`` so a multi-byte boundary in
#: the middle of a UTF-8 codepoint cannot raise, with
#: :data:`TRUNCATION_MARKER` appended. The marker's bytes come on top of
#: this target. The clipped value is also re-serialised as a JSON string
#: when the prompt is rendered, so escaping can add a few more bytes.
OBSERVATION_FIELD_TRUNCATE_TO: int = 2048

#: Marker appended to every truncated field so the reader can see at a
#: glance the field was clipped.
TRUNCATION_MARKER: str = "... [truncated]"

#: Total prompt byte budget. The render methods drop the oldest
#: Iteration summary one at a time until ``len(prompt.encode("utf-8"))
#: <= PROMPT_BYTE_BUDGET`` or no summaries remain. The non-iteration
#: sections are never trimmed to meet it.
PROMPT_BYTE_BUDGET: int = 32768

#: Maximum number of Iteration summaries the Strategy_Revision prompt
#: (``SamplingPrompt.assemble``) includes, even if the byte budget is
#: plentiful. The caller is expected to pass at most this many already;
#: the builder slices defensively. The Final_Report prompt does not
#: apply this limit.
RECENT_ITERATIONS_LIMIT: int = 5

#: Per-Environment-context byte cap. The optional environment context
#: block (``=== Environment context ===``) is its own truncation
#: domain so the section can never grow without bound and push the
#: rest of the prompt over :data:`PROMPT_BYTE_BUDGET`. The cap mirrors
#: :data:`OBSERVATION_FIELD_BYTE_CAP` because both surfaces hold the
#: same flavour of structured live signal (cluster + queue snapshots
#: in this case) and the same truncation marker convention applies.
ENVIRONMENT_CONTEXT_BYTE_CAP: int = 4096
141# ---------------------------------------------------------------------------
142# Environment context summarisation
143# ---------------------------------------------------------------------------
def _summarise_environment_context(env: Mapping[str, Any]) -> dict[str, Any]:
    """Return a JSON-safe, byte-capped summary of the environment context.

    The result is rendered into the sampling prompt under the
    ``=== Environment context (slow-moving live signals) ===`` header.
    It is intended to carry small, slow-moving live signals (per-region
    queue depths, GPU utilisation, deployed region list, reservation
    counts, ...) that the model would otherwise have to spend tool calls
    to discover.

    Guarantees on the output:

    1. Top-level keys are emitted in sorted order, so two callers
       passing semantically identical dicts produce a byte-identical
       block.
    2. When the full mapping serialises to more than
       :data:`ENVIRONMENT_CONTEXT_BYTE_CAP` UTF-8 bytes, the largest
       top-level field is dropped repeatedly. Ties are broken by sorted
       key order. The dropped keys are listed, sorted, under
       ``"_dropped_fields"`` so the operator can see what was pruned.
       The size check *includes* that marker list, so the returned dict
       fits the cap. The only exception is when every field had to be
       dropped and the marker list alone is still oversized.

    Args:
        env: Top-level mapping of live signals. Keys must be mutually
            sortable and values JSON-serialisable. An input key named
            ``"_dropped_fields"`` is overwritten whenever pruning occurs.

    Returns:
        A new dict. The input mapping is never mutated.

    Raises:
        TypeError: If a value is not JSON-serialisable, or if the keys
            cannot be sorted.
    """
    # Defensive copy + sort so insertion order doesn't leak.
    ordered: dict[str, Any] = {key: env[key] for key in sorted(env.keys())}
    if _utf8_len(_dumps(ordered)) <= ENVIRONMENT_CONTEXT_BYTE_CAP:
        return ordered

    # Over the cap: drop the largest top-level field, repeating until the
    # candidate *including* the ``_dropped_fields`` marker fits. Checking
    # before the marker is attached would let the marker itself push the
    # result back over the cap.
    dropped: list[str] = []
    working = dict(ordered)
    while working:
        biggest_key = max(working, key=lambda k: _utf8_len(_dumps(working[k])))
        dropped.append(biggest_key)
        del working[biggest_key]
        # Sort the dropped list so its rendering is stable regardless of
        # which key happened to be biggest first.
        candidate = {**working, "_dropped_fields": sorted(dropped)}
        if _utf8_len(_dumps(candidate)) <= ENVIRONMENT_CONTEXT_BYTE_CAP:
            return candidate

    # Every field was dropped; only the (possibly oversized) marker list
    # remains, which still tells the operator what was pruned.
    return {"_dropped_fields": sorted(dropped)}
192# ---------------------------------------------------------------------------
193# Bedrock backend tunables
194# ---------------------------------------------------------------------------
#: Default Bedrock model identifier. Mirrors
#: ``cli.capacity.advisor.BedrockCapacityAdvisor.DEFAULT_MODEL`` so an
#: operator who has cleared Bedrock model access for the capacity
#: advisor automatically gets the same model for Mission sampling.
#: NOTE(review): nothing in this section enforces that the two values
#: stay equal — confirm both when bumping either.
DEFAULT_BEDROCK_MODEL_ID: str = "us.anthropic.claude-sonnet-4-5-20250929-v1:0"

#: Default Bedrock region. The capacity advisor pins ``us-east-1`` for
#: the same reason: cross-region inference profiles routinely surface
#: in ``us-east-1`` first and our installations have it whitelisted.
DEFAULT_BEDROCK_REGION: str = "us-east-1"

#: Env var that overrides :data:`DEFAULT_BEDROCK_MODEL_ID` at runtime.
#: NOTE(review): the lookup is not part of this section; presumably the
#: Bedrock backend reads it via ``os.environ`` — confirm.
ENV_BEDROCK_MODEL_ID: str = "GCO_MISSION_BEDROCK_MODEL_ID"

#: Env var that overrides :data:`DEFAULT_BEDROCK_REGION` at runtime.
#: NOTE(review): as above, the lookup lives outside this section.
ENV_BEDROCK_REGION: str = "GCO_MISSION_BEDROCK_REGION"

#: Maximum response tokens for the Bedrock Converse call. Sized for the
#: criteria-array prompt's worst case: a five-criterion array with
#: predicates can be 800-1500 output tokens by itself, and reasoning
#: models (DeepSeek R1, Claude reasoning variants) consume an
#: additional 1500-3000 think tokens that count against the same
#: budget. 8192 leaves comfortable headroom on every visible CRIS
#: profile while staying well under any model's context window.
#: NOTE(review): this rationale refers to a criteria-array prompt rather
#: than the Strategy_Revision / Final_Report prompts built in this
#: module — confirm the value is also appropriate for them.
BEDROCK_MAX_TOKENS: int = 8192

#: Sampling temperature for the Bedrock Converse call. Low — the
#: scaffolder asks for a strict JSON-array shape, not creative
#: variety; high temperatures invite the model to wander into
#: rejected attribute / method shapes. The sampling prompts in this
#: module likewise demand a single strict JSON object.
BEDROCK_TEMPERATURE: float = 0.2
229# ---------------------------------------------------------------------------
230# JSON Schemas — embedded as module-level constants
231# ---------------------------------------------------------------------------
# The Strategy shape mirrors the ``Strategy`` TypedDict from
# ``mcp/mission/types.py``: every key is optional in isolation and the
# validator enforces the mutual-exclusivity invariant (exactly one of
# ``tool_calls`` or ``script`` populated). The schema below mirrors
# that with a ``oneOf`` clause. The model-response validator is expected
# to perform the same check on parsed responses; this schema is the
# textual instruction the prompt embeds for the model.
STRATEGY_SHAPE_SCHEMA: dict[str, Any] = {
    "type": "object",
    "additionalProperties": False,
    "properties": {
        "tool_calls": {
            "type": "array",
            "minItems": 1,
            "items": {
                "type": "object",
                "additionalProperties": True,
                "required": ["tool_name", "args"],
                "properties": {
                    "tool_name": {"type": "string", "minLength": 1},
                    "args": {"type": "object"},
                },
            },
        },
        "script": {"type": "string", "minLength": 1},
        "expected_observation_keys": {
            "type": "array",
            "items": {"type": "string"},
        },
        "rationale": {"type": "string"},
    },
    "oneOf": [
        {"required": ["tool_calls"]},
        {"required": ["script"]},
    ],
}

#: JSON Schema for the model's response when called for a
#: Strategy_Revision. The model must return exactly these three keys.
#: NOTE: ``next_strategy`` embeds :data:`STRATEGY_SHAPE_SCHEMA` by
#: reference (the same dict object), so an in-place mutation of either
#: constant is visible through the other — treat all schemas here as
#: read-only.
STRATEGY_REVISION_SCHEMA: dict[str, Any] = {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "Mission strategy revision",
    "type": "object",
    "additionalProperties": False,
    "required": ["revision_rationale", "next_strategy", "confidence"],
    "properties": {
        "revision_rationale": {"type": "string", "minLength": 1},
        "next_strategy": STRATEGY_SHAPE_SCHEMA,
        "confidence": {
            "type": "number",
            "minimum": 0.0,
            "maximum": 1.0,
        },
    },
}

#: JSON Schema for the model's response when called from the
#: Final_Report writer. ``lessons`` must be non-empty;
#: ``recommended_followups`` may be an empty list.
FINAL_LESSONS_SCHEMA: dict[str, Any] = {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "Mission final lessons",
    "type": "object",
    "additionalProperties": False,
    "required": ["lessons", "recommended_followups"],
    "properties": {
        "lessons": {
            "type": "array",
            "minItems": 1,
            "items": {"type": "string", "minLength": 1},
        },
        "recommended_followups": {
            "type": "array",
            "items": {"type": "string", "minLength": 1},
        },
    },
}
311# ---------------------------------------------------------------------------
312# JSON helpers — every dump in this module routes through ``_dumps``
313# so the byte-counting and the rendered prompt agree on the encoding.
314# ---------------------------------------------------------------------------
317def _dumps(value: Any, *, indent: int | None = None) -> str:
318 """Deterministic JSON encoder used everywhere in this module.
320 ``sort_keys=True`` is the source of determinism — Python dicts are
321 insertion-ordered, but the Hypothesis strategies that drive the
322 determinism tests build dicts via ``fixed_dictionaries`` whose
323 insertion order is implementation-defined, so sorting is the only
324 way to get byte-identical output across two draws of the same
325 abstract dict shape. ``ensure_ascii=False`` keeps non-ASCII text
326 intact so the byte-budget bookkeeping matches what the LLM sees.
327 """
328 return json.dumps(
329 value,
330 sort_keys=True,
331 ensure_ascii=False,
332 indent=indent,
333 separators=(",", ": ") if indent is not None else (",", ":"),
334 )
337def _utf8_len(s: str) -> int:
338 """UTF-8 byte length of ``s`` — the only "size" the budget cares about."""
339 return len(s.encode("utf-8"))
def _truncate_serialised(serialised: str) -> str:
    """Clip ``serialised`` to ``OBSERVATION_FIELD_TRUNCATE_TO`` UTF-8 bytes
    and append :data:`TRUNCATION_MARKER`.

    The cut is made on bytes, not codepoints, because the cap is a byte
    budget. Decoding with ``errors="ignore"`` discards a partial codepoint
    left at the cut. The input is valid UTF-8, so only the tail can be
    partial and at most three bytes are lost. The result is therefore
    always a valid ``str``.
    """
    head = serialised.encode("utf-8")[:OBSERVATION_FIELD_TRUNCATE_TO]
    clipped = head.decode("utf-8", errors="ignore")
    return f"{clipped}{TRUNCATION_MARKER}"
357# ---------------------------------------------------------------------------
358# Observation summarisation
359# ---------------------------------------------------------------------------
def _summarise_observation(obs: Mapping[str, Any] | Observation) -> dict[str, Any]:
    """Summarise an Observation for the prompt, clipping oversized fields.

    Every top-level key of ``obs`` is copied in sorted order, so the
    rendered prompt does not depend on the caller's insertion order. A
    value whose compact JSON form is larger than
    :data:`OBSERVATION_FIELD_BYTE_CAP` bytes is replaced by its clipped
    serialisation (see :func:`_truncate_serialised`). Its original byte
    size is recorded under ``summary["_original_bytes"][key]``. Values at
    or under the cap are passed through as-is.

    Any ``_original_bytes`` key already present on the input is skipped.
    The output only has ``_original_bytes`` when at least one field was
    clipped.
    """
    source: Mapping[str, Any] = cast("Mapping[str, Any]", obs)
    summary: dict[str, Any] = {}
    clipped_sizes: dict[str, int] = {}
    for key in sorted(source.keys()):
        # Never carry a marker map through from a previous summarisation.
        if key == "_original_bytes":
            continue
        raw = source[key]
        encoded = _dumps(raw)
        size = _utf8_len(encoded)
        if size <= OBSERVATION_FIELD_BYTE_CAP:
            summary[key] = raw
            continue
        summary[key] = _truncate_serialised(encoded)
        clipped_sizes[key] = size
    if clipped_sizes:
        summary["_original_bytes"] = clipped_sizes
    return summary
def _summarise_iteration(iteration: Mapping[str, Any] | IterationRecord) -> dict[str, Any]:
    """Condense one Iteration record for the Strategy_Revision prompt.

    Only what the model needs is kept:
    - the iteration index;
    - the strategy that was tried, with ``{}`` when it is missing or falsy;
    - the verdict and its reason;
    - a size-capped Observation summary, with ``{}`` standing in for a
      missing or falsy observation.

    Phase timestamps and the criteria-evaluation list are left out to
    keep each Iteration's footprint small under the prompt byte budget.
    """
    observation = iteration.get("observation") or {}
    strategy = iteration.get("strategy") or {}
    summary: dict[str, Any] = {
        "iteration_index": iteration.get("iteration_index"),
        "strategy": strategy,
        "verdict": iteration.get("verdict"),
        "verdict_reason": iteration.get("verdict_reason"),
    }
    summary["observation_summary"] = _summarise_observation(observation)
    return summary
423def _summarise_iteration_for_lessons(
424 iteration: Mapping[str, Any] | IterationRecord,
425) -> dict[str, Any]:
426 """Final_Report summary — verdict + reason only, no Observation.
428 The Final_Report path needs to reason about *what happened* across
429 the run, not the per-iteration tool output. Dropping the Observation
430 keeps the prompt small enough that the byte budget never bites in
431 practice for sessions of any reasonable length.
432 """
433 return {
434 "iteration_index": iteration.get("iteration_index"),
435 "verdict": iteration.get("verdict"),
436 "verdict_reason": iteration.get("verdict_reason"),
437 }
440# ---------------------------------------------------------------------------
441# Criteria status pairing
442# ---------------------------------------------------------------------------
445def _pair_criteria_with_status(
446 criteria: Sequence[Criterion],
447 statuses: Sequence[CriterionResult],
448) -> list[dict[str, Any]]:
449 """Return ``criteria`` annotated with their most recent status entry.
451 Each entry in the result mirrors the Criterion definition (the kind,
452 the required-flag, the kind-specific payload keys) and adds a
453 nested ``status`` block populated from the matching ``CriterionResult``
454 by ``criterion_id``. Criteria with no matching status entry get
455 ``status`` set to ``{"status": "inconclusive", "evidence": null}``
456 so the model always sees a stable shape.
458 The ``_parsed_ast`` private key on a ``predicate`` criterion is
459 stripped — it is a Python ``ast.Expression`` object that is not
460 JSON-serialisable and that the model has no use for.
461 """
462 by_id: dict[str, CriterionResult] = {}
463 for s in statuses:
464 cid = s.get("criterion_id")
465 if cid is None: 465 ↛ 466line 465 didn't jump to line 466 because the condition on line 465 was never true
466 continue
467 # If the caller passes duplicates (older then newer), prefer the
468 # last entry — that's the most-recent-wins convention the engine
469 # uses everywhere else.
470 by_id[cid] = s
472 out: list[dict[str, Any]] = []
473 for c in criteria:
474 cid = c.get("criterion_id")
475 # Strip private cached AST and surface only the prompt-relevant fields.
476 public = {k: v for k, v in c.items() if not k.startswith("_")}
477 match = by_id.get(cid) if cid is not None else None
478 if match is None:
479 public["status"] = {
480 "status": "inconclusive",
481 "evidence": None,
482 }
483 else:
484 public["status"] = {
485 "status": match.get("status"),
486 "evidence": match.get("evidence"),
487 "evaluated_at": match.get("evaluated_at"),
488 }
489 out.append(public)
490 return out
493# ---------------------------------------------------------------------------
494# Tool allowlist rendering
495# ---------------------------------------------------------------------------
498def _render_tool_allowlist(
499 allowlist: Sequence[str],
500 docstrings: Mapping[str, str],
501 schemas: Mapping[str, Any] | None = None,
502) -> list[dict[str, Any]]:
503 """Pair every allowlisted tool name with its docstring and input schema.
505 Tools without a registered docstring get an empty string — the
506 prompt remains valid; the model just sees a tool name with no
507 inline description. Tools without a schema get ``null`` so the
508 model knows no args are required. Names are emitted in the
509 caller's allowlist order so the prompt is identical for two
510 callers that pass the same list.
511 """
512 rendered: list[dict[str, Any]] = []
513 for name in allowlist:
514 entry: dict[str, Any] = {
515 "tool_name": name,
516 "docstring": str(docstrings.get(name, "")),
517 }
518 if schemas:
519 schema = schemas.get(name)
520 if schema is not None: 520 ↛ 522line 520 didn't jump to line 522 because the condition on line 520 was always true
521 entry["input_schema"] = schema
522 rendered.append(entry)
523 return rendered
526# ---------------------------------------------------------------------------
527# Budget context rendering
528# ---------------------------------------------------------------------------
531def _render_budget_context(
532 *,
533 remaining_iterations: int,
534 remaining_wall_clock_secs: float | None,
535 allow_scripts: bool,
536) -> dict[str, Any]:
537 """Render the budget context block. Stable shape regardless of inputs.
539 ``None`` for the wall-clock cap is rendered verbatim as JSON
540 ``null`` so the model can disambiguate "unbounded" from "0".
541 """
542 return {
543 "remaining_iterations": int(remaining_iterations),
544 "remaining_wall_clock_seconds": (
545 float(remaining_wall_clock_secs) if remaining_wall_clock_secs is not None else None
546 ),
547 "allow_scripted_strategies": bool(allow_scripts),
548 }
551# ---------------------------------------------------------------------------
552# SamplingPrompt — the public class
553# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class SamplingPrompt:
    """Bundle the bare data needed to assemble a sampling prompt string.

    The dataclass is ``frozen=True``, so fields cannot be reassigned
    between an :meth:`assemble` call and an :meth:`assemble_final_lessons`
    call. Both methods produce deterministic outputs from the same bound
    state, which is the property the determinism tests pin down. Freezing
    does not copy the containers the caller passes in, so mutating those
    in place still changes the next render.

    All inputs are required positionally or by keyword; defaults are
    only provided where the design spec defines a default.
    """

    directive: str
    success_criteria: Sequence[Criterion]
    criteria_status: Sequence[CriterionResult]
    recent_iterations: Sequence[IterationRecord]
    tool_allowlist: Sequence[str]
    tool_docstrings: Mapping[str, str]
    remaining_iterations: int
    remaining_wall_clock_secs: float | None
    allow_scripts: bool = field(default=False)
    #: Per-tool JSON Schema for the input parameters. Keyed by tool
    #: name; values are the JSON-serialisable schema dict (or ``None``
    #: for tools that take no args). Included in the prompt so the
    #: Strategy_Revision model can propose valid ``args`` dicts.
    tool_schemas: Mapping[str, Any] = field(default_factory=dict)
    #: Optional snapshot of slow-moving live signals (per-region queue
    #: depth, GPU utilisation, deployed-region list, reservation
    #: counts, etc.) gathered once at session start and reused on
    #: every iteration's prompt. ``None`` (the default) suppresses the
    #: ``=== Environment context ===`` section entirely so the prompt
    #: stays byte-identical to the pre-environment-context shape —
    #: that's what every existing determinism test pins down.
    environment_context: Mapping[str, Any] | None = field(default=None)

    # ---- Strategy_Revision rendering --------------------------------------

    def assemble(self) -> str:
        """Return the Strategy_Revision prompt string.

        Only the most recent :data:`RECENT_ITERATIONS_LIMIT` Iterations
        are considered. Each is summarised once with
        :func:`_summarise_iteration`, which size-caps Observation fields.
        The prompt is then rendered. While the rendering exceeds
        :data:`PROMPT_BYTE_BUDGET` UTF-8 bytes, the oldest summary is
        dropped and the prompt re-rendered.

        The loop always terminates, because each retry renders one fewer
        summary. The fixed sections are never trimmed: directive,
        criteria, allowlist, budget, environment context, and schema
        instruction. If they alone exceed the budget, the prompt is
        returned with no Iteration summaries and is *over* the budget.
        """
        # Defensive slice — the caller is asked to pass at most five,
        # but if they pass more, take the most recent five.
        recent = self.recent_iterations[-RECENT_ITERATIONS_LIMIT:]
        return self._render_within_budget(
            schema=STRATEGY_REVISION_SCHEMA,
            summaries=[_summarise_iteration(it) for it in recent],
            schema_purpose="strategy_revision",
        )

    # ---- Final_Report rendering -------------------------------------------

    def assemble_final_lessons(self) -> str:
        """Return the Final_Report ``lessons`` prompt string.

        The shape parallels :meth:`assemble`, with three differences:

        - it emits :data:`FINAL_LESSONS_SCHEMA`;
        - it uses per-Iteration **verdict summaries only** instead of
          Observation summaries;
        - it covers every Iteration in ``recent_iterations``, with no
          five-Iteration slice.

        The same :data:`PROMPT_BYTE_BUDGET` cap applies, with the same
        oldest-first drop policy and the same caveat when the fixed
        sections alone exceed it.
        """
        return self._render_within_budget(
            schema=FINAL_LESSONS_SCHEMA,
            summaries=[_summarise_iteration_for_lessons(it) for it in self.recent_iterations],
            schema_purpose="final_lessons",
        )

    # ---- Budget enforcement -----------------------------------------------

    def _render_within_budget(
        self,
        *,
        schema: dict[str, Any],
        summaries: Sequence[Mapping[str, Any]],
        schema_purpose: str,
    ) -> str:
        """Render the prompt, dropping the oldest summaries until it fits.

        Both render methods share this, so the drop policy lives in one
        place. The summaries are computed once by the caller and each
        retry renders a shorter suffix of them. Previously every retry
        re-serialised and re-truncated every remaining Observation.

        Returns:
            The first rendering within :data:`PROMPT_BYTE_BUDGET` bytes,
            or, if none fits, the rendering with zero summaries.
        """
        start = 0
        while True:
            text = self._render(
                schema=schema,
                iterations=summaries[start:],
                schema_purpose=schema_purpose,
            )
            if _utf8_len(text) <= PROMPT_BYTE_BUDGET or start >= len(summaries):
                return text
            # Drop the oldest remaining summary and try again.
            start += 1

    # ---- Internal renderer ------------------------------------------------

    def _render(
        self,
        *,
        schema: dict[str, Any],
        iterations: Sequence[Mapping[str, Any]],
        schema_purpose: str,
    ) -> str:
        """Format the full prompt from the section blocks.

        The text layout is fixed — every section is delimited by a
        ``=== <name> ===`` header so the model can latch onto a
        predictable structure. Section bodies are JSON wherever the
        content is structured; the directive itself is rendered as
        free text because that is how the operator wrote it.

        ``schema_purpose == "strategy_revision"`` selects the
        Strategy_Revision preamble and header. Any other value selects
        the Final_Report ones. ``iterations`` must already be summarised.
        """
        criteria_block = _pair_criteria_with_status(self.success_criteria, self.criteria_status)
        tool_block = _render_tool_allowlist(
            self.tool_allowlist, self.tool_docstrings, self.tool_schemas
        )
        budget_block = _render_budget_context(
            remaining_iterations=self.remaining_iterations,
            remaining_wall_clock_secs=self.remaining_wall_clock_secs,
            allow_scripts=self.allow_scripts,
        )

        if schema_purpose == "strategy_revision":
            preamble = (
                "You are advising a Mission goal-directed iteration loop. "
                "Propose the next Strategy that moves the Mission toward "
                "satisfying its Success_Criteria. The Verdict label, "
                "budget enforcement, and Criteria evaluation are all "
                "computed server-side and are unaffected by your output. "
                "Your role is advisory: the rationale and next_strategy "
                "you produce are validated against the Tool_Allowlist and "
                "the remaining budget before being adopted.\n\n"
                "IMPORTANT: You may propose MULTIPLE tool calls in a "
                "single iteration by including multiple entries in the "
                "tool_calls array. This is especially useful when the "
                "unmet criteria require results from different tools — "
                "calling them all in one iteration lets the evaluator "
                "see all results together. Use the input_schema in the "
                "Tool allowlist section to construct valid args for each "
                "tool call."
            )
            recent_header = "Recent iterations (oldest first)"
        else:
            preamble = (
                "You are advising a Mission goal-directed iteration loop "
                "that has just reached a terminal Verdict. Produce the "
                "lessons learned and the recommended follow-ups for the "
                "operator. Your output is merged into the Final_Report; "
                "the Verdict label, budget bookkeeping, and Criteria "
                "evaluation that produced the terminal state are "
                "deterministic server-side outputs and are not under "
                "review."
            )
            recent_header = "Iteration verdict summary (oldest first)"

        sections: list[str] = []
        sections.append(preamble)
        sections.append("")
        sections.append("=== Mission directive ===")
        sections.append(self.directive)
        sections.append("")
        sections.append("=== Success criteria with current status ===")
        sections.append(_dumps(criteria_block, indent=2))
        sections.append("")
        sections.append("=== Tool allowlist ===")
        sections.append(_dumps(tool_block, indent=2))
        sections.append("")
        sections.append("=== Budget context ===")
        sections.append(_dumps(budget_block, indent=2))
        sections.append("")
        if self.environment_context is not None:
            # Truncated + key-sorted — see :func:`_summarise_environment_context`.
            # Emitting a header even for an empty dict means a session
            # that opted in but had a probe failure still surfaces
            # "we tried" so the operator can act on the gap.
            env_summary = _summarise_environment_context(self.environment_context)
            sections.append("=== Environment context (slow-moving live signals) ===")
            sections.append(_dumps(env_summary, indent=2))
            sections.append("")
        sections.append(f"=== {recent_header} ===")
        sections.append(_dumps(list(iterations), indent=2))
        sections.append("")
        sections.append("=== Output schema ===")
        sections.append(
            "Respond with a single JSON object that validates against "
            "the JSON Schema below. Do not include any prose outside "
            "the JSON object."
        )
        sections.append(_dumps(schema, indent=2))

        return "\n".join(sections)
742# ---------------------------------------------------------------------------
743# Backend protocol and transport-error type
744# ---------------------------------------------------------------------------
@runtime_checkable
class SamplingBackend(Protocol):
    """Structural interface for whatever transport carries the advisory LLM call.

    A concrete backend wraps one transport and exposes one coroutine,
    :meth:`sample`. Examples of transports are the MCP ``Context.sample``
    capability and ``bedrock-runtime:Converse``. Because the protocol is
    ``runtime_checkable``, ``isinstance(obj, SamplingBackend)`` works on
    any duck-typed object exposing these members. Note that the runtime
    check verifies member presence only, not signatures or types.

    Attributes:
        backend_name: Stable transport label that the audit pipeline
            emits as ``sampling_backend``; limited to ``"mcp"`` or
            ``"bedrock"``.
        model_id: Identifier of the model the prompt is routed to,
            echoed into audit events so a replay can reproduce the
            request.
    """

    backend_name: Literal["mcp", "bedrock"]
    model_id: str

    async def sample(self, prompt: SamplingPrompt) -> str:
        """Send ``prompt`` over the bound transport; return the raw model text.

        Implementations are expected to raise
        :class:`SamplingTransportError` with a transport-tagged ``code``
        on any transport-layer failure. This lets the engine's fallback
        policy branch on one well-typed exception.
        """
        ...
780class SamplingTransportError(Exception):
781 """Transport-layer failure raised by a :class:`SamplingBackend`.
783 The mandatory ``code`` attribute tags the failure class so the
784 engine's deterministic-fallback path can branch on a stable string
785 without parsing the message. The convention is
786 ``"<backend>_<error_class>"`` for backend-specific failures and a
787 short, snake-cased label for backend-agnostic failures.
789 Documented codes (used elsewhere in the Mission stack):
791 * ``"bedrock_AccessDeniedException"`` — IAM denied
792 ``bedrock:InvokeModel`` for the resolved model.
793 * ``"mcp_unavailable"`` — the MCP transport reported no sampling
794 capability or the in-flight call was cancelled.
795 * ``"bedrock_malformed_response"`` — Converse returned a payload
796 that did not have the expected ``output.message.content[0].text``
797 shape.
798 * ``"bedrock_no_credentials"`` — the local ``boto3`` session could
799 not resolve credentials.
801 Args:
802 code: Mandatory failure tag (see examples above).
803 message: Optional human-readable detail. When present, it is
804 joined to ``code`` with ``": "`` for the string
805 representation; when absent, ``str(self)`` is just the
806 ``code``.
807 """
809 def __init__(self, code: str, message: str | None = None) -> None:
810 self.code: str = code
811 self.message: str | None = message
812 # Forward the most useful single-line representation to
813 # ``Exception.__init__`` so ``logging`` / ``traceback`` modules
814 # show the same string ``str(self)`` produces below.
815 if message is None:
816 super().__init__(code)
817 else:
818 super().__init__(f"{code}: {message}")
820 def __str__(self) -> str:
821 if self.message is None:
822 return self.code
823 return f"{self.code}: {self.message}"
826# ---------------------------------------------------------------------------
827# MCPSamplingBackend — routes the prompt through a FastMCP Context
828# ---------------------------------------------------------------------------
class MCPSamplingBackend:
    """Sampling backend that calls ``ctx.sample`` on a FastMCP-style Context.

    The constructor accepts any object exposing an awaitable ``sample``
    method — typically ``fastmcp.Context``. The type is intentionally
    duck-typed so ``fastmcp`` is not a runtime import requirement of
    this module.

    Two FastMCP API quirks are absorbed here:

    1. ``ctx.sample`` returns either a bare string or an object with a
       ``.text`` attribute, depending on the FastMCP version. Both
       shapes are accepted; anything else surfaces as
       :class:`SamplingTransportError` with code
       ``"mcp_unexpected_response_type"``.
    2. The keyword carrying ``ModelPreferences`` has been spelled both
       ``modelPreferences`` (camelCase, MCP wire format) and
       ``model_preferences`` (snake_case, older Python binding). The
       backend tries the camelCase form first. It falls back to the
       snake_case form only when *calling* ``ctx.sample`` raises
       ``TypeError``. For a coroutine function, that is where argument
       binding fails, before any request is in flight. A ``TypeError``
       raised later, while the request is being awaited, is treated as
       a transport failure and never triggers a second request.

    Any other exception from the transport is re-raised as
    :class:`SamplingTransportError` with code
    ``"mcp_<ExceptionClassName>"`` and the original exception preserved
    via ``__cause__``.
    """

    backend_name: Literal["mcp", "bedrock"] = "mcp"

    def __init__(
        self,
        ctx: Any,
        model_id: str | None = None,
        prefs: dict[str, Any] | None = None,
    ) -> None:
        """Bind the Context, the optional model id, and the preferences dict.

        Args:
            ctx: A FastMCP ``Context`` (or any duck-compatible object
                exposing an awaitable ``sample(text, **kwargs)``
                method). Stored verbatim.
            model_id: Optional concrete model identifier. Echoed in
                audit events. Defaults to the empty string when not
                provided so the attribute is always present.
            prefs: Optional FastMCP ``ModelPreferences`` payload. Passed
                straight through to ``ctx.sample`` when not ``None``;
                omitted from the call entirely when ``None``. The
                payload is not validated here — that is the transport's
                job.
        """
        self._ctx = ctx
        self.model_id: str = model_id if model_id is not None else ""
        self._prefs = prefs

    def _start_sample(self, text: str) -> Any:
        """Call ``ctx.sample`` and return its awaitable without awaiting it.

        Keeping the call separate from the ``await`` confines the
        keyword-spelling fallback to binding-time ``TypeError``s, so a
        ``TypeError`` raised by an in-flight request cannot cause a
        duplicate request under the snake_case spelling.
        """
        if self._prefs is None:
            return self._ctx.sample(text)
        try:
            return self._ctx.sample(text, modelPreferences=self._prefs)
        except TypeError:
            # Binding-version mismatch: this ctx only knows snake_case.
            return self._ctx.sample(text, model_preferences=self._prefs)

    async def sample(self, prompt: SamplingPrompt) -> str:
        """Render ``prompt`` through ``ctx.sample`` and return the raw text.

        Raises:
            SamplingTransportError: On any transport-level failure
                (including a ``TypeError`` that survives the keyword
                fallback, tagged ``"mcp_TypeError"``) or when the
                transport returns an unexpected response shape. Wrapped
                exceptions are chained via ``__cause__``.
        """
        text = prompt.assemble()
        try:
            result = await self._start_sample(text)
            if hasattr(result, "text"):
                return str(result.text)
            if isinstance(result, str):
                return result
            raise SamplingTransportError(
                "mcp_unexpected_response_type",
                message=f"got {type(result).__name__}",
            )
        except SamplingTransportError:
            # Already tagged by us — let it propagate untouched so the
            # ``code`` attribute is preserved.
            raise
        except Exception as err:  # noqa: BLE001 - intentional broad catch
            raise SamplingTransportError(f"mcp_{type(err).__name__}") from err
925# ---------------------------------------------------------------------------
926# BedrockSamplingBackend — routes the prompt through bedrock-runtime:Converse
927# ---------------------------------------------------------------------------
class BedrockSamplingBackend:
    """Sampling backend that calls ``bedrock-runtime:Converse``.

    The backend resolves its model id and region at construction time.
    For each, it uses the first available of: the explicit constructor
    argument, the matching environment variable
    (:data:`ENV_BEDROCK_MODEL_ID` / :data:`ENV_BEDROCK_REGION`), and the
    module-level default (:data:`DEFAULT_BEDROCK_MODEL_ID` /
    :data:`DEFAULT_BEDROCK_REGION`).

    The ``boto3`` client is built lazily on the first :meth:`sample`
    call. This keeps ``boto3`` out of the import graph of
    ``import mission.sampling``, and lets tests substitute the
    dependency before any real session is created.

    Failure modes — each surfaces as :class:`SamplingTransportError`, so
    the engine's fallback policy never sees a raw botocore exception:

    * Missing or partial AWS credentials surface with code
      ``"bedrock_no_credentials"``. This applies whether they are
      detected while the client is built or when the ``Converse``
      request is signed, which is where botocore typically reports
      "no credentials".
    * Any other ``botocore.exceptions.BotoCoreError`` (unknown profile,
      unreachable endpoint, timeouts, parameter validation) surfaces
      with code ``"bedrock_<ExceptionClassName>"``.
    * A ``botocore.exceptions.ClientError`` from the ``Converse`` call
      surfaces with code ``"bedrock_<ErrorCode>"``. ``<ErrorCode>`` is
      read from the error envelope, defaulting to ``"Unknown"`` when
      the envelope is malformed.
    * A response that does not have the expected
      ``output.message.content[0].text`` shape — including a
      non-dict response, ``None`` content, and empty ``content``
      lists — surfaces with code ``"bedrock_malformed_response"``.

    Wrapped exceptions are chained via ``__cause__``.

    Attributes:
        last_input_tokens: ``usage.inputTokens`` from the most recent
            ``Converse`` response, or ``None`` before the first call,
            after a failed call, or when the response carried no usage.
        last_output_tokens: Same, for ``usage.outputTokens``.
    """

    backend_name: Literal["mcp", "bedrock"] = "bedrock"

    def __init__(
        self,
        model_id: str | None = None,
        region: str | None = None,
    ) -> None:
        """Resolve the model id and region; defer client construction.

        Args:
            model_id: Optional explicit model id. When ``None``, falls
                back to the :data:`ENV_BEDROCK_MODEL_ID` environment
                variable, then to :data:`DEFAULT_BEDROCK_MODEL_ID`.
            region: Optional explicit region. When ``None``, falls back
                to :data:`ENV_BEDROCK_REGION`, then to
                :data:`DEFAULT_BEDROCK_REGION`.
        """
        self.model_id: str = (
            model_id
            if model_id is not None
            else os.environ.get(ENV_BEDROCK_MODEL_ID, DEFAULT_BEDROCK_MODEL_ID)
        )
        self._region: str = (
            region
            if region is not None
            else os.environ.get(ENV_BEDROCK_REGION, DEFAULT_BEDROCK_REGION)
        )
        # The boto3 client is built on first ``sample`` call. ``None``
        # here is the sentinel for "not yet constructed".
        self._client: Any = None
        # Defined up front so callers can read them before any call.
        self.last_input_tokens: int | None = None
        self.last_output_tokens: int | None = None

    def _get_client(self) -> Any:
        """Return the cached ``bedrock-runtime`` client, building it on first use.

        ``boto3`` and ``botocore.exceptions`` are imported here rather
        than at module top-level, so pure-Python consumers of this
        module (the prompt builder, the protocol, the error type) never
        pay for the ``boto3`` import. The import is resolved through
        ``sys.modules`` at first use, so tests can substitute the
        dependency (e.g. by patching ``boto3.Session``) before the first
        :meth:`sample` call.

        Raises:
            SamplingTransportError: ``"bedrock_no_credentials"`` for
                missing/partial credentials, or
                ``"bedrock_<ExceptionClassName>"`` for any other
                botocore error raised while building the session/client.
        """
        if self._client is not None:
            return self._client
        # Local import — keeps the module's import surface boto3-free.
        import boto3
        from botocore.exceptions import (
            BotoCoreError,
            NoCredentialsError,
            PartialCredentialsError,
        )

        try:
            self._client = boto3.Session().client("bedrock-runtime", region_name=self._region)
        except (NoCredentialsError, PartialCredentialsError) as err:
            # Must precede BotoCoreError: both are subclasses of it.
            raise SamplingTransportError("bedrock_no_credentials") from err
        except BotoCoreError as err:
            raise SamplingTransportError(f"bedrock_{type(err).__name__}") from err
        return self._client

    async def sample(self, prompt: SamplingPrompt) -> str:
        """Render ``prompt`` through ``Converse`` and return the response text.

        Also records ``usage.inputTokens`` / ``usage.outputTokens`` on
        :attr:`last_input_tokens` / :attr:`last_output_tokens`. Both are
        reset to ``None`` at the start of every call.

        Raises:
            SamplingTransportError: On any transport-level failure.
                * ``bedrock_no_credentials`` — credentials could not be
                  resolved, at client construction or request signing.
                * ``bedrock_<ExceptionClassName>`` — any other botocore
                  (non-``ClientError``) failure.
                * ``bedrock_<ErrorCode>`` — the ``Converse`` call raised
                  a ``ClientError``; ``<ErrorCode>`` is the AWS error
                  code from the envelope.
                * ``bedrock_malformed_response`` — the response did not
                  contain the expected
                  ``output.message.content[0].text`` shape.
        """
        # Local import — see ``_get_client`` for the rationale.
        from botocore.exceptions import (
            BotoCoreError,
            ClientError,
            NoCredentialsError,
            PartialCredentialsError,
        )

        # Clear the previous call's usage first, so a failure anywhere
        # below never leaves stale counts for the audit trail.
        self.last_input_tokens = None
        self.last_output_tokens = None

        client = self._get_client()

        text = prompt.assemble()
        try:
            response = await asyncio.to_thread(
                client.converse,
                modelId=self.model_id,
                messages=[{"role": "user", "content": [{"text": text}]}],
                inferenceConfig={
                    "maxTokens": BEDROCK_MAX_TOKENS,
                    "temperature": BEDROCK_TEMPERATURE,
                },
            )
        except ClientError as err:
            # ``e.response`` is documented to be present on ClientError
            # but the envelope shape can vary; defend against missing
            # keys so the audit pipeline always sees a tagged code.
            envelope = getattr(err, "response", None) or {}
            error_block = envelope.get("Error", {}) if isinstance(envelope, dict) else {}
            code = (
                error_block.get("Code", "Unknown") if isinstance(error_block, dict) else "Unknown"
            )
            raise SamplingTransportError(f"bedrock_{code}") from err
        except (NoCredentialsError, PartialCredentialsError) as err:
            # botocore typically detects absent credentials only when it
            # signs the request, so this is the common "no creds" site.
            raise SamplingTransportError("bedrock_no_credentials") from err
        except BotoCoreError as err:
            # Endpoint/connection errors, timeouts, parameter validation…
            raise SamplingTransportError(f"bedrock_{type(err).__name__}") from err

        if not isinstance(response, dict):
            raise SamplingTransportError("bedrock_malformed_response")

        # Capture token usage for the audit trail when the response
        # carries a well-formed ``usage`` block.
        usage = response.get("usage")
        if isinstance(usage, dict):
            self.last_input_tokens = usage.get("inputTokens")
            self.last_output_tokens = usage.get("outputTokens")

        try:
            return str(response["output"]["message"]["content"][0]["text"])
        except (KeyError, IndexError, TypeError) as err:
            raise SamplingTransportError("bedrock_malformed_response") from err
1075# ---------------------------------------------------------------------------
1076# Backend resolver
1077# ---------------------------------------------------------------------------
1080def _ctx_has_sampling_capability(ctx: Any) -> bool:
1081 """Return ``True`` when the MCP context advertises sampling support.
1083 FastMCP exposes the negotiated client capabilities under a couple
1084 of attribute paths depending on its release: the modern
1085 ``ctx.session_capabilities.sampling`` and the older
1086 ``ctx.fastmcp.client_capabilities.sampling``. Either path may be
1087 missing or set to ``None`` on a context that has not finished
1088 capability negotiation. The probe walks both paths defensively
1089 using ``getattr`` so a missing attribute never raises.
1090 """
1091 caps = getattr(ctx, "session_capabilities", None)
1092 if caps is None:
1093 # Older FastMCP versions surface capabilities via fastmcp.client_capabilities.
1094 fastmcp_attr = getattr(ctx, "fastmcp", None)
1095 caps = (
1096 getattr(fastmcp_attr, "client_capabilities", None) if fastmcp_attr is not None else None
1097 )
1098 if caps is None:
1099 return False
1100 sampling = getattr(caps, "sampling", None)
1101 return bool(sampling)
def select_sampling_backend(
    ctx: Any | None,
    model_id: str | None,
    prefs: dict[str, Any] | None,
) -> SamplingBackend | None:
    """Choose the sampling backend appropriate for a call site.

    The outcome depends only on ``ctx``:

    * ``ctx is None`` (CLI path) — returns a
      :class:`BedrockSamplingBackend` built with ``model_id``. The
      backend's constructor only resolves configuration; credentials
      are not looked up until its first ``sample`` call.
    * ``ctx`` advertises sampling capability (MCP path) — returns an
      :class:`MCPSamplingBackend` bound to ``ctx``, with ``model_id``
      and ``prefs`` forwarded.
    * ``ctx`` is present but lacks sampling capability — returns
      ``None``; only the deterministic fallback is available.

    Args:
        ctx: FastMCP-style ``Context`` for the MCP path, or ``None`` for
            the CLI path. Any other object goes through the same
            duck-typed capability probe.
        model_id: Optional concrete model identifier, forwarded verbatim
            to whichever backend is constructed.
        prefs: Optional FastMCP ``ModelPreferences`` payload. Forwarded
            only to :class:`MCPSamplingBackend`; the Bedrock backend
            uses its own pinned inference config.

    Returns:
        A :class:`SamplingBackend` instance, or ``None`` when the
        deterministic fallback is the only option.
    """
    if ctx is not None:
        if not _ctx_has_sampling_capability(ctx):
            # A context without negotiated sampling leaves nothing to call.
            return None
        return MCPSamplingBackend(ctx, model_id=model_id, prefs=prefs)
    return BedrockSamplingBackend(model_id=model_id)
1143# ---------------------------------------------------------------------------
1144# Strategy-against-catalog validator
1145# ---------------------------------------------------------------------------
1148def _resolve_input_schema(tool: Any) -> Any:
1149 """Return the registered Pydantic input model for a Tool, or None.
1151 FastMCP exposes the model under ``input_schema`` in newer releases
1152 and ``inputSchema`` in older ones. Tolerate both. Tools that genuinely
1153 take no args (or test catalog mocks that omit the attribute) yield
1154 ``None``, in which case the caller skips per-call args validation.
1155 """
1156 schema = getattr(tool, "input_schema", None)
1157 if schema is None:
1158 schema = getattr(tool, "inputSchema", None)
1159 return schema
def _extract_tool_json_schemas(
    allowlist: Sequence[str],
    registered_tools: Mapping[str, Any],
) -> dict[str, Any]:
    """Map each allowlisted tool to the JSON Schema of its input model.

    For every name in ``allowlist``, the registered tool's input model
    (found via :func:`_resolve_input_schema`) is asked for
    ``.model_json_schema()``. A tool is simply left out of the result
    when:

    * it is not registered,
    * it exposes no input model, or
    * schema extraction raises for any reason (a non-Pydantic schema,
      or a mock that does not support it).

    The caller renders the result into the Strategy_Revision prompt so
    the model can propose valid ``args`` dicts.
    """
    json_schemas: dict[str, Any] = {}
    for tool_name in allowlist:
        tool = registered_tools.get(tool_name)
        input_model = None if tool is None else _resolve_input_schema(tool)
        if input_model is None:
            continue
        try:
            # Pydantic v2 models expose model_json_schema() as a classmethod.
            json_schemas[tool_name] = input_model.model_json_schema()
        except Exception:  # noqa: BLE001 - best-effort; omit this tool
            continue
    return json_schemas
def _collect_args_errors(exc: BaseException) -> Any:
    """Return structured error details for a failed ``model_validate`` call.

    Pydantic v2's ``ValidationError`` exposes ``.errors()`` as a list of
    structured dicts. Any other exception type (older Pydantic, custom
    validators), or an ``.errors()`` that itself raises, degrades to a
    single ``{"type": "unknown", "msg": str(exc)}`` entry.
    """
    errors_method = getattr(exc, "errors", None)
    if callable(errors_method):
        try:
            return errors_method()
        except Exception:  # noqa: BLE001 - defensive
            pass
    return [{"type": "unknown", "msg": str(exc)}]


def validate_strategy_against_catalog(
    strategy: Strategy,
    allowlist: list[str],
    registered_tools: dict[str, Any],
    allow_scripts: bool,
) -> None:
    """Validate a Strategy against the live tool catalog.

    Returns ``None`` on accept; raises :class:`MissionValidationError`
    with a structured ``details.reason`` enum on reject. The function
    layers catalog-aware checks on top of the structural validation in
    :func:`mission.validation.validate_strategy`:

    1. Mutual-exclusivity (exactly one of ``tool_calls`` / ``script``).
    2. Per-call ``tool_name`` is in ``allowlist``.
    3. Per-call ``args`` validates against the registered Pydantic model
       exposed under ``Tool.input_schema`` (or the older
       ``Tool.inputSchema``). Some calls skip args validation instead
       of being rejected with an opaque ``AttributeError`` message:

       * calls whose tool exposes no schema;
       * calls whose schema has no callable ``model_validate``, e.g. a
         plain JSON Schema dict.

       This matches the tolerance :func:`_extract_tool_json_schemas`
       applies when rendering the prompt.
    4. For scripted strategies, ``allow_scripts`` is True and the
       script's AST passes :func:`mission.sandbox.validate_script_ast`.

    Checks 2 and 3 run as two separate passes over ``tool_calls``. An
    allowlist violation anywhere in the list is therefore reported
    before any args failure.

    Args:
        strategy: The Strategy dict to validate.
        allowlist: The session's resolved Tool_Allowlist.
        registered_tools: Mapping from tool name to a registered tool
            object (typed ``Any`` so the module imports without
            FastMCP). Read-only — only ``input_schema`` /
            ``inputSchema`` is consulted.
        allow_scripts: Session-level flag gating scripted strategies.

    Raises:
        MissionValidationError: For the checks made here,
            ``details.reason`` is one of ``"tool_calls_empty"``,
            ``"tool_not_allowlisted"``, ``"tool_not_registered"`` or
            ``"tool_args_invalid"``. Structural rejections come from
            :func:`mission.validation.validate_strategy`.
    """
    # 1. Structural validation: mutual exclusivity, script-allow gating,
    #    and AST validation for scripts. Reuses the existing validator
    #    so error shapes for those rejection classes stay aligned with
    #    the rest of the input pipeline.
    _validation.validate_strategy(cast("dict[str, Any]", strategy), allowlist, allow_scripts)

    if "tool_calls" not in strategy:
        # Scripted strategy: validate_strategy already ran allow_scripts
        # gating and the AST validator. Script-side enforcement happens
        # at execute time via the in-script tool callable wrappers.
        return

    tool_calls = strategy["tool_calls"]
    # Empty list is rejected by validate_strategy; this is a defence
    # in depth for callers that might bypass that path.
    if not tool_calls:
        raise MissionValidationError(
            "validation_error",
            details={
                "field": "strategy",
                "subfield": "tool_calls",
                "reason": "tool_calls_empty",
            },
        )

    # 2. Per-call name-in-allowlist check.
    for call in tool_calls:
        name = call.get("tool_name")
        if name not in allowlist:
            raise MissionValidationError(
                "validation_error",
                details={
                    "field": "strategy",
                    "subfield": "tool_calls",
                    "tool_name": name,
                    "reason": "tool_not_allowlisted",
                    "allowlist": list(allowlist),
                },
            )

    # 3. Per-call args validation against the tool's Pydantic model.
    for call in tool_calls:
        name = call["tool_name"]
        tool = registered_tools.get(name)
        if tool is None:
            # Catalog could have a name in the allowlist that is not
            # currently registered (gating, dynamic load). Mirror the
            # unknown-tool shape used elsewhere.
            raise MissionValidationError(
                "validation_error",
                details={
                    "field": "strategy",
                    "subfield": "tool_calls",
                    "tool_name": name,
                    "reason": "tool_not_registered",
                },
            )
        schema = _resolve_input_schema(tool)
        if schema is None or not callable(getattr(schema, "model_validate", None)):
            # No schema, or one we cannot validate against: "trust the
            # dispatcher" rather than reject every call to this tool.
            continue
        args = call.get("args", {})
        if not isinstance(args, dict):
            raise MissionValidationError(
                "validation_error",
                details={
                    "field": "strategy",
                    "subfield": "tool_calls",
                    "tool_name": name,
                    "reason": "tool_args_invalid",
                    "errors": [
                        {
                            "type": "args_not_a_dict",
                            "actual_type": type(args).__name__,
                        }
                    ],
                },
            )
        try:
            schema.model_validate(args)
        except Exception as exc:  # noqa: BLE001 - pydantic ValidationError + similar
            raise MissionValidationError(
                "validation_error",
                details={
                    "field": "strategy",
                    "subfield": "tool_calls",
                    "tool_name": name,
                    "reason": "tool_args_invalid",
                    "errors": _collect_args_errors(exc),
                },
            ) from exc

    # Cost estimation against remaining budget was removed — cost
    # guardrails live out-of-band via AWS Budgets / Cost Anomaly
    # Detection rather than in the Mission cascade.
1338# ---------------------------------------------------------------------------
1339# Orchestration helpers — bind a backend to a SessionState and return either
1340# a used result or a deterministic fallback.
1341# ---------------------------------------------------------------------------
1343# Local imports kept inside this section so the prompt-builder /
1344# backend half above stays free of audit / decide dependencies.
1345from . import audit as _mission_audit # noqa: E402
1346from . import decide as _decide # noqa: E402
1348# Type alias used by the helpers below. ``SessionState`` is a TypedDict
1349# whose runtime value is just ``dict``; the alias keeps the signatures
1350# expressive without forcing the import to leak through ``__all__``.
1351from .types import SessionState as _SessionState # noqa: E402
@dataclass(frozen=True)
class SamplingUsed:
    """Accepted output of a successful sampling call (immutable)."""

    #: Raw text the bound backend returned for the prompt.
    output_text: str

    #: JSON payload parsed from ``output_text`` that passed the schema
    #: and catalog checks.
    parsed: dict[str, Any]

    #: Stable identifier of the backend that produced the output.
    backend_name: Literal["mcp", "bedrock"]

    #: Concrete model id the backend routed the prompt to.
    model_id: str
1371@dataclass(frozen=True)
1372class SamplingFallback:
1373 """A rejected or unavailable sampling call's deterministic substitute.
1375 Returned when the bound backend was ``None``, the transport raised,
1376 the model output failed to parse / validate, or any catalog or
1377 budget check rejected the proposed strategy. The ``rationale`` is
1378 a pure function of the bound :class:`SessionState` and the most
1379 recent :class:`IterationRecord`, so the engine can replay or
1380 reproduce a fallback exactly from persisted state.
1381 """
1383 rationale: str
1384 """Deterministic fallback text. Empty string for ``final_lessons``
1385 — the final-report writer fills in its own deterministic text in
1386 that case."""
1388 reason: str
1389 """Stable token tagging *why* the fallback fired. Examples:
1390 ``"transport_error"``, ``"json_parse"``, ``"schema_mismatch"``,
1391 ``"tool_not_allowlisted"``, ``"tool_args_invalid"``,
1392 ``"over_budget"``, ``"script_rejected"``,
1393 ``"no_backend_resolved"``, ``"disabled"``."""
1395 backend_name: Literal["mcp", "bedrock", "none"]
1396 """The bound backend's tag, or ``"none"`` when no backend was
1397 resolved at the call site."""
1399 model_id: str | None
1400 """The bound backend's model id, or ``None`` when no backend was
1401 resolved."""
1404# ---------------------------------------------------------------------------
1405# JSON / schema helpers (private to the orchestration layer)
1406# ---------------------------------------------------------------------------
1409def _extract_json_object(text: str) -> dict[str, Any]:
1410 """Parse the first JSON object embedded in ``text``.
1412 Models routinely wrap JSON in prose. The implementation slices from
1413 the first ``{`` to the last ``}`` and feeds the result to
1414 :func:`json.loads`. When no balanced braces are present, or the
1415 sliced substring is not valid JSON, the function raises
1416 :class:`json.JSONDecodeError` so the caller can branch on a single
1417 well-typed exception.
1418 """
1419 start = text.find("{")
1420 end = text.rfind("}")
1421 if start == -1 or end == -1 or end < start:
1422 # No braces at all → treat as a parse error so the calling
1423 # branch surfaces ``reason="json_parse"``.
1424 raise json.JSONDecodeError("no JSON object found", text, 0)
1425 candidate = text[start : end + 1]
1426 parsed = json.loads(candidate)
1427 if not isinstance(parsed, dict): 1427 ↛ 1430line 1427 didn't jump to line 1430 because the condition on line 1427 was never true
1428 # The sliced substring parsed but is not an object — surface as
1429 # a parse error too, since downstream code requires a dict.
1430 raise json.JSONDecodeError("top-level JSON value is not an object", candidate, 0)
1431 return parsed
1434def _validate_revision_schema(parsed: dict[str, Any]) -> None:
1435 """Reject a parsed payload that is not a valid Strategy_Revision.
1437 Required keys: ``revision_rationale`` (non-empty str),
1438 ``next_strategy`` (dict), ``confidence`` (number in [0, 1]).
1439 """
1440 rationale = parsed.get("revision_rationale")
1441 if not isinstance(rationale, str) or not rationale:
1442 raise ValueError("schema_mismatch: revision_rationale must be non-empty str")
1443 next_strategy = parsed.get("next_strategy")
1444 if not isinstance(next_strategy, dict): 1444 ↛ 1445line 1444 didn't jump to line 1445 because the condition on line 1444 was never true
1445 raise ValueError("schema_mismatch: next_strategy must be a dict")
1446 confidence = parsed.get("confidence")
1447 # ``bool`` is excluded explicitly — it is a subclass of ``int`` in
1448 # Python and would otherwise sneak past the numeric check.
1449 if isinstance(confidence, bool) or not isinstance(confidence, (int, float)): 1449 ↛ 1450line 1449 didn't jump to line 1450 because the condition on line 1449 was never true
1450 raise ValueError("schema_mismatch: confidence must be a number")
1451 if not (0.0 <= float(confidence) <= 1.0): 1451 ↛ 1452line 1451 didn't jump to line 1452 because the condition on line 1451 was never true
1452 raise ValueError("schema_mismatch: confidence must be in [0, 1]")
1455def _validate_lessons_schema(parsed: dict[str, Any]) -> None:
1456 """Reject a parsed payload that is not a valid final-lessons dict.
1458 Required keys: ``lessons`` (non-empty list of non-empty str),
1459 ``recommended_followups`` (list of str — may be empty).
1460 """
1461 lessons = parsed.get("lessons")
1462 if not isinstance(lessons, list) or not lessons: 1462 ↛ 1463line 1462 didn't jump to line 1463 because the condition on line 1462 was never true
1463 raise ValueError("schema_mismatch: lessons must be a non-empty list")
1464 for item in lessons:
1465 if not isinstance(item, str) or not item: 1465 ↛ 1466line 1465 didn't jump to line 1466 because the condition on line 1465 was never true
1466 raise ValueError("schema_mismatch: each lesson must be a non-empty str")
1467 followups = parsed.get("recommended_followups")
1468 if not isinstance(followups, list): 1468 ↛ 1469line 1468 didn't jump to line 1469 because the condition on line 1468 was never true
1469 raise ValueError("schema_mismatch: recommended_followups must be a list")
1470 for item in followups:
1471 if not isinstance(item, str): 1471 ↛ 1472line 1471 didn't jump to line 1472 because the condition on line 1471 was never true
1472 raise ValueError("schema_mismatch: each follow-up must be a str")
1475# ---------------------------------------------------------------------------
1476# maybe_sample_strategy_revision
1477# ---------------------------------------------------------------------------
1480async def maybe_sample_strategy_revision(
1481 *,
1482 backend: SamplingBackend | None,
1483 session: _SessionState,
1484 iteration: IterationRecord,
1485 allowlist: list[str],
1486 registered_tools: dict[str, Any],
1487 tool_docstrings: dict[str, str],
1488 remaining_iterations: int,
1489 remaining_wall_clock_secs: float | None,
1490 allow_scripts: bool,
1491 environment_context: Mapping[str, Any] | None = None,
1492) -> SamplingUsed | SamplingFallback:
1493 """Consult the advisory LLM for a Strategy_Revision, or fall back.
1495 Returns a :class:`SamplingUsed` when the bound backend produces a
1496 JSON object that clears schema validation and the catalog checks.
1497 Returns a :class:`SamplingFallback` carrying the deterministic
1498 rationale from
1499 :func:`mission.decide.build_revision_rationale_template` on every
1500 rejection class. Emits exactly one
1501 :func:`mission.audit.emit_sampling_event` per call.
1502 """
1503 session_id = session["session_id"]
1504 iteration_index = iteration["iteration_index"]
1505 template = _decide.build_revision_rationale_template(session, iteration)
1507 # ---- No backend resolved: short-circuit. ------------------------------
1508 if backend is None:
1509 _mission_audit.emit_sampling_event(
1510 session_id,
1511 iteration_index,
1512 sampling_purpose="strategy_revision",
1513 sampling_status="disabled",
1514 sampling_backend="none",
1515 )
1516 return SamplingFallback(
1517 rationale=template,
1518 reason="no_backend_resolved",
1519 backend_name="none",
1520 model_id=None,
1521 )
1523 backend_name = backend.backend_name
1524 model_id = backend.model_id
1526 # ---- Build the prompt. ------------------------------------------------
1527 # The in-progress iteration that triggered ``adjust`` is already in
1528 # ``session["iterations"][-1]``, so the most-recent-five window is a
1529 # plain slice; ``RECENT_ITERATIONS_LIMIT`` is enforced inside the
1530 # prompt builder as a defensive cap.
1531 recent_iterations = list(session["iterations"][-RECENT_ITERATIONS_LIMIT:])
1532 tool_schemas = _extract_tool_json_schemas(allowlist, registered_tools)
1533 prompt = SamplingPrompt(
1534 directive=session["directive_text"],
1535 success_criteria=session["criteria"],
1536 criteria_status=iteration["criteria_evaluation"],
1537 recent_iterations=recent_iterations,
1538 tool_allowlist=allowlist,
1539 tool_docstrings=tool_docstrings,
1540 remaining_iterations=remaining_iterations,
1541 remaining_wall_clock_secs=remaining_wall_clock_secs,
1542 allow_scripts=allow_scripts,
1543 tool_schemas=tool_schemas,
1544 environment_context=environment_context,
1545 )
1547 # ---- Transport: backend.sample. --------------------------------------
1548 try:
1549 output_text = await backend.sample(prompt)
1550 except SamplingTransportError as err:
1551 _mission_audit.emit_sampling_event(
1552 session_id,
1553 iteration_index,
1554 sampling_purpose="strategy_revision",
1555 sampling_status="rejected",
1556 sampling_backend=backend_name,
1557 sampling_model_id=model_id or None,
1558 validation_error=err.code,
1559 )
1560 return SamplingFallback(
1561 rationale=template,
1562 reason="transport_error",
1563 backend_name=backend_name,
1564 model_id=model_id,
1565 )
1567 # ---- Parse the output as JSON. ---------------------------------------
1568 try:
1569 parsed = _extract_json_object(output_text)
1570 except json.JSONDecodeError:
1571 _mission_audit.emit_sampling_event(
1572 session_id,
1573 iteration_index,
1574 sampling_purpose="strategy_revision",
1575 sampling_status="rejected",
1576 sampling_backend=backend_name,
1577 sampling_model_id=model_id or None,
1578 validation_error="json_parse",
1579 )
1580 return SamplingFallback(
1581 rationale=template,
1582 reason="json_parse",
1583 backend_name=backend_name,
1584 model_id=model_id,
1585 )
1587 # ---- Schema validation. ----------------------------------------------
1588 try:
1589 _validate_revision_schema(parsed)
1590 except ValueError:
1591 _mission_audit.emit_sampling_event(
1592 session_id,
1593 iteration_index,
1594 sampling_purpose="strategy_revision",
1595 sampling_status="rejected",
1596 sampling_backend=backend_name,
1597 sampling_model_id=model_id or None,
1598 validation_error="schema_mismatch",
1599 )
1600 return SamplingFallback(
1601 rationale=template,
1602 reason="schema_mismatch",
1603 backend_name=backend_name,
1604 model_id=model_id,
1605 )
1607 # ---- Catalog validation on the proposed next_strategy. ---------------
1608 try:
1609 validate_strategy_against_catalog(
1610 parsed["next_strategy"],
1611 allowlist,
1612 registered_tools,
1613 allow_scripts,
1614 )
1615 except MissionValidationError as err:
1616 # ``err.details["reason"]`` carries the structured rejection
1617 # token (e.g. ``"tool_not_allowlisted"``,
1618 # ``"tool_args_invalid"``). Fall back to a generic label when
1619 # the validator emits a rejection without a ``reason`` key.
1620 details = err.details or {}
1621 reason = details.get("reason", "validation_error")
1622 _mission_audit.emit_sampling_event(
1623 session_id,
1624 iteration_index,
1625 sampling_purpose="strategy_revision",
1626 sampling_status="rejected",
1627 sampling_backend=backend_name,
1628 sampling_model_id=model_id or None,
1629 validation_error=str(reason),
1630 )
1631 return SamplingFallback(
1632 rationale=template,
1633 reason=str(reason),
1634 backend_name=backend_name,
1635 model_id=model_id,
1636 )
1638 # ---- Success path. ---------------------------------------------------
1639 # Extract token usage from the backend if available (Bedrock backend
1640 # stores it as a side-channel after each sample() call).
1641 _input_tokens = getattr(backend, "last_input_tokens", None)
1642 _output_tokens = getattr(backend, "last_output_tokens", None)
1643 _mission_audit.emit_sampling_event(
1644 session_id,
1645 iteration_index,
1646 sampling_purpose="strategy_revision",
1647 sampling_status="used",
1648 sampling_backend=backend_name,
1649 sampling_model_id=model_id or None,
1650 model_output_bytes=len(output_text.encode("utf-8")),
1651 input_tokens=_input_tokens,
1652 output_tokens=_output_tokens,
1653 )
1654 return SamplingUsed(
1655 output_text=output_text,
1656 parsed=parsed,
1657 backend_name=backend_name,
1658 model_id=model_id,
1659 )
1662# ---------------------------------------------------------------------------
1663# maybe_sample_final_lessons
1664# ---------------------------------------------------------------------------
async def maybe_sample_final_lessons(
    *,
    backend: SamplingBackend | None,
    session: _SessionState,
    remaining_iterations: int = 0,
    remaining_wall_clock_secs: float | None = None,
    allow_scripts: bool = False,
    tool_docstrings: dict[str, str] | None = None,
    environment_context: Mapping[str, Any] | None = None,
) -> SamplingUsed | SamplingFallback:
    """Consult the advisory LLM for final lessons, or fall back.

    Returns a :class:`SamplingUsed` when the bound backend produces a
    JSON object that clears the lessons schema. Returns a
    :class:`SamplingFallback` with an *empty* rationale on every
    rejection class — the final-report writer is responsible for the
    deterministic-text path when sampling does not produce usable
    output.

    Every call that returns normally emits exactly one
    :func:`mission.audit.emit_sampling_event`, passing ``None`` in the
    iteration-index position because the call happens outside the
    iteration loop. Exceptions other than the ones handled below (for
    example a non-:class:`SamplingTransportError` raised by the backend)
    propagate to the caller without an audit event.

    Args:
        backend: The resolved sampling backend, or ``None`` when no
            backend is available for the session.
        session: Session state. Reads ``session_id``, ``directive_text``,
            ``criteria``, ``iterations`` and, if present,
            ``tool_allowlist``.
        remaining_iterations: Budget context forwarded to the prompt.
        remaining_wall_clock_secs: Budget context forwarded to the prompt.
        allow_scripts: Forwarded to the prompt builder.
        tool_docstrings: Per-tool docstrings; ``None`` is treated as an
            empty mapping.
        environment_context: Optional extra context for the prompt.

    Returns:
        :class:`SamplingUsed` on success. Otherwise a
        :class:`SamplingFallback` whose ``reason`` is one of
        ``"no_backend_resolved"``, ``"transport_error"``,
        ``"json_parse"`` or ``"schema_mismatch"``.
    """
    session_id = session["session_id"]

    # ---- No backend resolved: short-circuit. ------------------------------
    if backend is None:
        _mission_audit.emit_sampling_event(
            session_id,
            None,
            sampling_purpose="final_lessons",
            sampling_status="disabled",
            sampling_backend="none",
        )
        return SamplingFallback(
            rationale="",
            reason="no_backend_resolved",
            backend_name="none",
            model_id=None,
        )

    backend_name = backend.backend_name
    model_id = backend.model_id

    def _reject(validation_error: Any, reason: str) -> SamplingFallback:
        """Audit one rejected sample and return the empty-rationale fallback."""
        _mission_audit.emit_sampling_event(
            session_id,
            None,
            sampling_purpose="final_lessons",
            sampling_status="rejected",
            sampling_backend=backend_name,
            sampling_model_id=model_id or None,
            validation_error=validation_error,
        )
        return SamplingFallback(
            rationale="",
            reason=reason,
            backend_name=backend_name,
            model_id=model_id,
        )

    # ---- Build and render the prompt. -------------------------------------
    # Pass *all* iterations; the prompt builder trims / drops as needed
    # to fit the byte budget.
    iterations = session["iterations"]
    prompt = SamplingPrompt(
        directive=session["directive_text"],
        success_criteria=session["criteria"],
        # The lessons prompt has no per-iteration criteria status; the
        # builder still expects the field, so reuse the most recent
        # iteration's evaluation when available, else an empty list.
        criteria_status=(list(iterations[-1]["criteria_evaluation"]) if iterations else []),
        recent_iterations=list(iterations),
        tool_allowlist=session.get("tool_allowlist", []),
        tool_docstrings=tool_docstrings or {},
        remaining_iterations=remaining_iterations,
        remaining_wall_clock_secs=remaining_wall_clock_secs,
        allow_scripts=allow_scripts,
        environment_context=environment_context,
    )
    # Render the lessons-shaped prompt here so the byte-cap bookkeeping uses
    # the lessons schema header. A backend handed ``prompt`` directly would
    # call ``prompt.assemble()`` and render the strategy-revision shape, so
    # the pre-rendered text is forwarded through a thin shim instead.
    rendered = prompt.assemble_final_lessons()

    # ---- Transport: the only step that can raise SamplingTransportError. ---
    try:
        output_text = await _sample_with_assembled_text(backend, rendered)
    except SamplingTransportError as err:
        return _reject(err.code, "transport_error")

    # ---- Parse the output as JSON. ---------------------------------------
    try:
        parsed = _extract_json_object(output_text)
    except json.JSONDecodeError:
        return _reject("json_parse", "json_parse")

    # ---- Schema validation. ----------------------------------------------
    try:
        _validate_lessons_schema(parsed)
    except ValueError:
        return _reject("schema_mismatch", "schema_mismatch")

    # ---- Success path. ---------------------------------------------------
    # Token counts are optional side-channel attributes on the backend;
    # ``None`` is recorded when a backend does not expose them.
    _input_tokens = getattr(backend, "last_input_tokens", None)
    _output_tokens = getattr(backend, "last_output_tokens", None)
    _mission_audit.emit_sampling_event(
        session_id,
        None,
        sampling_purpose="final_lessons",
        sampling_status="used",
        sampling_backend=backend_name,
        sampling_model_id=model_id or None,
        model_output_bytes=len(output_text.encode("utf-8")),
        input_tokens=_input_tokens,
        output_tokens=_output_tokens,
    )
    return SamplingUsed(
        output_text=output_text,
        parsed=parsed,
        backend_name=backend_name,
        model_id=model_id,
    )
1818async def _sample_with_assembled_text(backend: SamplingBackend, rendered: str) -> str:
1819 """Route a pre-assembled prompt string through a backend.
1821 Both shipped backends accept a :class:`SamplingPrompt` and call
1822 ``assemble`` themselves to render the strategy-revision shape. For
1823 the final-lessons path we render the lessons-shaped prompt here
1824 and need to deliver that exact text to the transport. The shim
1825 builds a tiny prompt-shaped wrapper whose :meth:`assemble` returns
1826 the pre-rendered text and forwards it to the backend.
1827 """
1828 pre_rendered = rendered
1830 class _PreRendered:
1831 """Thin :class:`SamplingPrompt` look-alike with a fixed assemble()."""
1833 def assemble(self) -> str:
1834 return pre_rendered
1836 # The two shipped backends only call ``prompt.assemble()`` so the
1837 # duck-typed wrapper above is enough to drive either of them.
1838 return await backend.sample(_PreRendered()) # type: ignore[arg-type]
1841# ---------------------------------------------------------------------------
1842# Session-start sampling-state resolver
1843# ---------------------------------------------------------------------------
1846def _bedrock_credentials_available() -> bool:
1847 """Lightweight probe: do local AWS credentials resolve?
1849 Instantiates a ``boto3.Session()`` and asks for ``get_credentials()``
1850 without making any network call. ``boto3`` is imported inside the
1851 function so the module's top-level import surface stays free of
1852 SDK dependencies — and so a host that has no ``boto3`` installed
1853 (or any other unexpected import-time failure) cleanly degrades to
1854 "no credentials available" rather than crashing the helper.
1855 """
1856 try:
1857 import boto3
1859 session = boto3.Session()
1860 creds = session.get_credentials()
1861 return creds is not None
1862 except Exception:
1863 return False
def resolve_sampling_state(
    ctx: Any | None,
    use_sampling_param: bool | None,
) -> tuple[bool, Literal["mcp", "bedrock", "none"]]:
    """Resolve, at session start, whether sampling is on and which backend serves it.

    Rules are checked in order and the first one that applies decides:

    1. ``use_sampling_param is False``: ``(False, "none")``. An explicit
       opt-out beats any capability the environment advertises.
    2. ``ctx`` is not ``None`` and advertises MCP sampling capability:
       ``(True, "mcp")``. The host performs sampling on the caller's behalf.
    3. ``ctx`` is ``None`` (CLI path) and local AWS credentials resolve:
       ``(True, "bedrock")``.
    4. Otherwise no concrete backend is available. This covers the CLI
       path without credentials and a ``ctx`` without sampling capability.
       The result is ``(True, "none")`` when the caller explicitly opted
       in with ``use_sampling_param is True``, leaving it to the caller to
       error out or continue deterministic-only. It is ``(False, "none")``
       when the flag was left at ``None``.

    Args:
        ctx: A FastMCP-style Context on the MCP path, or ``None`` on the
            CLI path. Capability detection is duck-typed, so any object
            exposing the attributes :func:`_ctx_has_sampling_capability`
            inspects is accepted.
        use_sampling_param: Three-state opt-in flag. ``None`` requests
            auto-detection. ``False`` disables sampling outright. ``True``
            is an explicit opt-in; the backend is still auto-detected, and
            ``"none"`` is allowed when nothing concrete resolves.

    Returns:
        A ``(use_sampling, backend)`` pair. The caller stores both on its
        ``SessionState`` so later audit events can be stamped with the
        resolved backend.
    """
    # Rule 1: an explicit opt-out short-circuits everything else.
    if use_sampling_param is False:
        return (False, "none")

    if ctx is None:
        # Rule 3: CLI path; Bedrock is usable only with local credentials.
        if _bedrock_credentials_available():
            return (True, "bedrock")
    elif _ctx_has_sampling_capability(ctx):
        # Rule 2: the MCP host advertises sampling support.
        return (True, "mcp")

    # Rule 4: no concrete backend; stay enabled only on an explicit opt-in.
    opted_in = use_sampling_param is True
    return (opted_in, "none")