Coverage for mcp/resources/tasks.py: 99%
53 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
1"""Task status resources (tasks://gco/...) for the GCO MCP server.
3Reads through FastMCP's task protocol surface to surface the status of
4a long-running tool invocation as JSON. Returns a graceful error stub
5when the FastMCP build in use doesn't expose a task-status query — the
6protocol surface lives under ``fastmcp.server.tasks`` and its public
7shape can shift between minor versions.
8"""
10from __future__ import annotations
12import json
13import re
14from typing import Any
# Task IDs are client-controlled strings (FastMCP forwards whatever the
# client passed). Restrict them to a generous alphanumeric-plus-punctuation
# set so a malformed URI expansion can't sneak shell metacharacters into
# downstream lookups: one leading alphanumeric followed by up to 255
# characters drawn from letters, digits, ".", "_", ":" and "-".
#
# The pattern is anchored with ``\Z`` rather than ``$`` because ``$`` also
# matches just before a trailing newline, which would let an ID ending in
# a line feed pass validation when checked with ``.match()``.
_TASK_ID_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._:-]{0,255}\Z")
def _lookup_task_state(task_id: str) -> dict[str, Any] | None:
    """Best-effort lookup of a task's current state through FastMCP's API.

    FastMCP exposes task introspection via the docket store registered
    on the server instance. The exact accessor moves between minor
    versions, so this helper tries the known paths in order:

    1. ``mcp.get_task(task_id)`` on the server object itself;
    2. ``get_task`` / ``get`` / ``fetch_task`` on ``mcp._docket`` (or
       ``mcp.docket`` when the private attribute is missing or falsy).

    Args:
        task_id: Identifier of the task to look up.

    Returns:
        The first non-``None`` record, converted with ``_coerce_to_dict``,
        or ``None`` when the ``server`` module cannot be imported, no
        accessor exists, or every accessor raised / returned ``None`` /
        returned an awaitable. The caller turns ``None`` into a graceful
        error JSON.
    """
    try:
        # Imported lazily so that a missing server module degrades to
        # "no state available" instead of failing at import time.
        from server import mcp as _mcp
    except ImportError:
        return None

    # Newer FastMCP exposes a direct ``get_task`` accessor.
    getter = getattr(_mcp, "get_task", None)
    if callable(getter):
        record = _call_task_accessor(getter, task_id)
        if record is not None:
            return _coerce_to_dict(record)

    # Older builds keep state on the docket adapter. When neither attribute
    # exists ``docket`` is None and every getattr below yields None.
    docket = getattr(_mcp, "_docket", None) or getattr(_mcp, "docket", None)
    for attr in ("get_task", "get", "fetch_task"):
        accessor = getattr(docket, attr, None)
        if not callable(accessor):
            continue
        record = _call_task_accessor(accessor, task_id)
        if record is not None:
            return _coerce_to_dict(record)

    return None


def _call_task_accessor(accessor: Any, task_id: str) -> Any:
    """Call ``accessor(task_id)`` and return a usable record or ``None``.

    Any exception raised by the accessor is swallowed on purpose: the
    lookup is best-effort and the next candidate should still be tried.

    An awaitable result (for example from an ``async def`` accessor) is
    treated as "no record". This function is synchronous and cannot await
    it, and passing the un-awaited object on would surface its ``repr`` as
    if it were task state.
    """
    import inspect  # local import: only this helper needs it

    try:
        record = accessor(task_id)
    except Exception:  # noqa: BLE001
        return None

    if inspect.isawaitable(record):
        # Close un-started coroutines so Python does not emit a
        # "coroutine ... was never awaited" RuntimeWarning.
        close = getattr(record, "close", None)
        if callable(close):
            try:
                close()
            except Exception:  # noqa: BLE001
                pass
        return None

    return record
def _coerce_to_dict(record: object) -> dict[str, Any]:
    """Convert an opaque task record into a dict, loosest strategy last.

    Strategies, in order:

    1. A ``dict`` is returned as-is (the same object, not a copy).
    2. The first of ``model_dump`` / ``dict`` / ``to_dict`` / ``_asdict``
       that is callable, does not raise, and yields a ``dict`` wins.
    3. An object with ``__dict__`` contributes its attributes whose names
       do not start with an underscore.
    4. Anything else becomes ``{"value": str(record)}``.
    """
    if isinstance(record, dict):
        return record

    for converter_name in ("model_dump", "dict", "to_dict", "_asdict"):
        converter = getattr(record, converter_name, None)
        if not callable(converter):
            continue
        try:
            converted = converter()
        except Exception:  # noqa: BLE001
            # A failing converter is not fatal; fall through to the next.
            continue
        if isinstance(converted, dict):
            return converted

    if not hasattr(record, "__dict__"):
        return {"value": str(record)}

    public_attrs = {}
    for name, value in vars(record).items():
        if name.startswith("_"):
            continue
        public_attrs[name] = value
    return public_attrs
def _task_resource(task_id: str) -> str:
    """Return the current status of ``task_id`` as a JSON string.

    Args:
        task_id: Client-supplied task identifier taken from the resource URI.

    Returns:
        One of three JSON documents:

        * ``{"error": "invalid task_id", "value": ...}`` when the ID does
          not satisfy ``_TASK_ID_RE``;
        * ``{"error": "task protocol not available", ...}`` when
          ``_lookup_task_state`` returns ``None``;
        * ``{"task_id": ..., "state": ...}`` (pretty-printed) otherwise.
    """
    # ``fullmatch`` requires the whole string to be consumed. ``match``
    # with a ``$`` anchor would also accept an ID ending in a newline,
    # because ``$`` matches just before a trailing line feed.
    if _TASK_ID_RE.fullmatch(task_id) is None:
        return json.dumps({"error": "invalid task_id", "value": task_id})

    state = _lookup_task_state(task_id)
    if state is None:
        return json.dumps(
            {
                "error": "task protocol not available",
                "detail": (
                    "this build of FastMCP does not expose a task-status accessor "
                    "this resource handler can call"
                ),
                "task_id": task_id,
            }
        )

    # ``default=str`` stringifies any value in the record that ``json``
    # cannot encode natively instead of raising.
    return json.dumps({"task_id": task_id, "state": state}, indent=2, default=str)
def register(mcp_instance: Any) -> None:
    """Attach the task-status handler to ``mcp_instance``.

    Calls ``mcp_instance.resource`` with the ``tasks://gco/{task_id}`` URI
    template and applies the returned decorator to ``_task_resource``.
    """
    bind_resource = mcp_instance.resource("tasks://gco/{task_id}")
    bind_resource(_task_resource)