Coverage for mcp/resources/tasks.py: 99%

53 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-15 15:07 +0000

1"""Task status resources (tasks://gco/...) for the GCO MCP server. 

2 

3Reads through FastMCP's task protocol surface to expose the status of 

4a long-running tool invocation as JSON. Returns a graceful error stub 

5when the FastMCP build in use doesn't expose a task-status query — the 

6protocol surface lives under ``fastmcp.server.tasks`` and its public 

7shape can shift between minor versions. 

8""" 

9 

10from __future__ import annotations 

11 

12import json 

13import re 

14from typing import Any 

15 

# Task IDs are client-controlled strings (FastMCP forwards whatever the
# client passed). Restrict them to a generous alphanumeric + punctuation
# set so a malformed URI expansion can't sneak shell metacharacters into
# downstream lookups: one leading alphanumeric character, followed by up
# to 255 more characters drawn from ``[A-Za-z0-9._:-]``.
#
# The pattern ends with ``\Z`` rather than ``$`` because ``$`` also matches
# just before a trailing newline, which would let ``"abc\n"`` through.
_TASK_ID_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._:-]{0,255}\Z")

21 

22 

23def _lookup_task_state(task_id: str) -> dict[str, Any] | None: 

24 """Best-effort lookup of a task's current state through FastMCP's API. 

25 

26 FastMCP exposes task introspection via the docket store registered 

27 on the server instance. The exact accessor moves between minor 

28 versions, so this helper tries the documented paths in order and 

29 returns ``None`` if none of them work — the caller turns that into 

30 a graceful error JSON. 

31 """ 

32 try: 

33 from server import mcp as _mcp 

34 except ImportError: 

35 return None 

36 

37 # Newer FastMCP exposes a direct ``get_task`` accessor. 

38 getter = getattr(_mcp, "get_task", None) 

39 if callable(getter): 

40 try: 

41 record = getter(task_id) 

42 except Exception: # noqa: BLE001 

43 record = None 

44 if record is not None: 

45 return _coerce_to_dict(record) 

46 

47 # Older builds keep state on the docket adapter. 

48 docket = getattr(_mcp, "_docket", None) or getattr(_mcp, "docket", None) 

49 for attr in ("get_task", "get", "fetch_task"): 

50 accessor = getattr(docket, attr, None) 

51 if callable(accessor): 

52 try: 

53 record = accessor(task_id) 

54 except Exception: # noqa: BLE001 

55 continue 

56 if record is not None: 56 ↛ 49line 56 didn't jump to line 49 because the condition on line 56 was always true

57 return _coerce_to_dict(record) 

58 

59 return None 

60 

61 

def _coerce_to_dict(record: object) -> dict[str, Any]:
    """Best-effort conversion of an opaque task record to a JSON-friendly dict.

    Conversion strategies are tried in this order; the first to succeed wins:

    1. ``record`` is already a ``dict``: returned as-is (not copied).
    2. ``record`` has a callable ``model_dump``, ``dict``, ``to_dict`` or
       ``_asdict`` that returns a ``dict``. A method that raises, or returns
       something else, is skipped.
    3. ``record`` is any other ``Mapping``: copied into a plain ``dict``.
    4. ``record`` has a ``__dict__``: its attributes, minus names starting
       with an underscore.
    5. Otherwise ``{"value": str(record)}``.

    Args:
        record: The object returned by a task accessor.

    Returns:
        A dict representation of ``record``; never raises for lack of a
        matching strategy because step 5 always applies.
    """
    from collections.abc import Mapping

    if isinstance(record, dict):
        return record
    for attr in ("model_dump", "dict", "to_dict", "_asdict"):
        method = getattr(record, attr, None)
        if callable(method):
            try:
                payload = method()
            except Exception:  # noqa: BLE001 - a broken converter just means "try the next one"
                continue
            if isinstance(payload, dict):
                return payload
    # Read-only or custom mappings (e.g. MappingProxyType) have no converter
    # method; without this step they would be reduced to their attributes or
    # to a string. Checked after the converters so objects offering an
    # explicit conversion method keep using it.
    if isinstance(record, Mapping):
        return dict(record)
    if hasattr(record, "__dict__"):
        return {k: v for k, v in vars(record).items() if not k.startswith("_")}
    return {"value": str(record)}

78 

79 

def _task_resource(task_id: str) -> str:
    """Return the current status of ``task_id`` as a JSON string.

    Args:
        task_id: Client-supplied task identifier taken from the resource URI.

    Returns:
        One of three JSON documents:

        * ``{"error": "invalid task_id", "value": ...}`` when ``task_id``
          does not satisfy ``_TASK_ID_RE``;
        * ``{"error": "task protocol not available", ...}`` when
          ``_lookup_task_state`` returns ``None``;
        * ``{"task_id": ..., "state": ...}`` (indented) otherwise. Values
          that ``json`` cannot serialize natively are rendered via ``str``.
    """
    # fullmatch() rather than match(): with match(), a pattern ending in
    # ``$`` also accepts an ID followed by a trailing newline, because ``$``
    # matches just before it. fullmatch() requires the whole string to be
    # consumed, so such IDs are rejected.
    if _TASK_ID_RE.fullmatch(task_id) is None:
        return json.dumps({"error": "invalid task_id", "value": task_id})

    state = _lookup_task_state(task_id)
    if state is None:
        return json.dumps(
            {
                "error": "task protocol not available",
                "detail": (
                    "this build of FastMCP does not expose a task-status accessor "
                    "this resource handler can call"
                ),
                "task_id": task_id,
            }
        )
    return json.dumps({"task_id": task_id, "state": state}, indent=2, default=str)

97 

98 

def register(mcp_instance: Any) -> None:
    """Attach the task-status handler to the shared MCP server.

    Calls ``mcp_instance.resource`` with the ``tasks://gco/{task_id}`` URI
    template and applies the result to ``_task_resource``.

    Args:
        mcp_instance: Server object exposing a ``resource(uri_template)``
            method whose return value is callable.
    """
    bind_handler = mcp_instance.resource("tasks://gco/{task_id}")
    bind_handler(_task_resource)