Coverage for mcp/resources/cluster.py: 100%

41 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-15 15:07 +0000

1"""Cluster topology resources (gco://cluster/...) for the GCO MCP server. 

2 

3Aggregates two views of regional cluster state into a single JSON 

4payload an LLM can pin: the Karpenter NodePool inventory (via the 

5``gco nodepools list`` CLI surface) and the list of pods currently 

6in ``Pending`` phase (via ``kubectl get pods``). The combination is 

7the cheapest read that answers "what shape is this cluster in right 

8now and what's stuck waiting for room to schedule". 

9""" 

10 

11from __future__ import annotations 

12 

13import json 

14import re 

15from typing import Any 

16 

17import cli_runner 

18 

19# AWS region IDs: lowercase letters, digits, and hyphens. The bounded 

20# length is generous (current AWS regions max out near 14 characters, 

21# but local-region constructs like ``gov-east-1`` may grow). 

22_REGION_RE = re.compile(r"^[a-z]{2}(?:-[a-z]+)+-[0-9]+$") 

23_KUBECTL_TIMEOUT_SECONDS = 30 

24 

25 

def _list_nodepools(region: str) -> dict[str, Any]:
    """Run ``gco nodepools list`` and return the parsed payload (or an error stub).

    Args:
        region: Region identifier passed to the CLI via ``-r``.

    Returns:
        The decoded JSON when it is an object; ``{"value": <decoded>}`` when
        the CLI printed any other JSON type; or an ``{"error": ..., "raw": ...}``
        stub carrying the unparsed output when decoding fails.
    """
    raw = cli_runner._run_cli("nodepools", "list", "-r", region)
    try:
        parsed = json.loads(raw)
    except ValueError:
        # json.JSONDecodeError is a ValueError subclass, so one clause covers
        # both and stays valid syntax on interpreters older than 3.14.
        return {"error": "failed to parse nodepools output", "raw": raw}
    if isinstance(parsed, dict):
        return parsed
    # Wrap lists/scalars so callers always receive a mapping.
    return {"value": parsed}

36 

37 

def _pending_pods(region: str) -> dict[str, Any]:
    """Return pods currently in ``Pending`` phase across the regional cluster.

    Runs ``kubectl get pods --all-namespaces`` with a
    ``status.phase=Pending`` field selector against the kubeconfig context
    derived from ``region``, and decodes the JSON it prints.

    Args:
        region: Region identifier. It is interpolated into the ``--context``
            argument, so it is expected to have been validated by the caller.

    Returns:
        The decoded kubectl JSON when it is an object; ``{"value": <decoded>}``
        for any other JSON type; or an ``{"error": ...}`` stub when kubectl is
        not installed, exceeds the timeout, exits non-zero (``exit_code`` is
        included), or prints output that cannot be decoded (``raw`` is
        included).
    """
    cluster_name = f"gco-{region}"
    try:
        result = cli_runner.subprocess.run(  # type: ignore[attr-defined]  # nosemgrep: dangerous-subprocess-use-audit - shell=False; argv built from validated region literal
            [
                "kubectl",
                "get",
                "pods",
                "--all-namespaces",
                "--field-selector",
                "status.phase=Pending",
                "-o",
                "json",
                "--context",
                # NOTE(review): a full EKS cluster ARN also carries an
                # account-ID segment; confirm the kubeconfig context is
                # really registered under this shorter name.
                f"arn:aws:eks:{region}:cluster/{cluster_name}",
            ],
            capture_output=True,
            text=True,
            timeout=_KUBECTL_TIMEOUT_SECONDS,
        )
    except FileNotFoundError:
        return {"error": "kubectl not found"}
    except cli_runner.subprocess.TimeoutExpired:  # type: ignore[attr-defined]
        return {"error": f"kubectl timed out after {_KUBECTL_TIMEOUT_SECONDS}s"}
    if result.returncode != 0:
        # Prefer stderr, fall back to stdout, then to a generic message.
        err = (result.stderr or result.stdout or "").strip()
        return {"error": err or "kubectl command failed", "exit_code": result.returncode}
    try:
        parsed = json.loads(result.stdout)
    except ValueError:
        # json.JSONDecodeError is a ValueError subclass, so one clause covers
        # both and stays valid syntax on interpreters older than 3.14.
        return {"error": "failed to parse kubectl output", "raw": result.stdout}
    if isinstance(parsed, dict):
        return parsed
    # Wrap lists/scalars so callers always receive a mapping.
    return {"value": parsed}

73 

74 

def _topology_resource(region: str) -> str:
    """Return a structured snapshot of nodepools plus pending pods for ``region``.

    Args:
        region: Region identifier taken from the resource URI. It is checked
            against ``_REGION_RE`` before being handed to any subprocess.

    Returns:
        A JSON string. For a rejected region it is
        ``{"error": "invalid region", "value": <region>}``; otherwise it is an
        indented object with ``region``, ``nodepools`` and ``pending_pods``
        keys.
    """
    # fullmatch() requires the entire string to match, so a value with a
    # trailing newline is rejected regardless of how the pattern's end anchor
    # is written; match() alone would accept "us-east-1\n" with a ``$`` anchor.
    if not _REGION_RE.fullmatch(region):
        return json.dumps({"error": "invalid region", "value": region})
    summary = {
        "region": region,
        "nodepools": _list_nodepools(region),
        "pending_pods": _pending_pods(region),
    }
    # default=str stringifies any value json cannot encode natively.
    return json.dumps(summary, indent=2, default=str)

85 

86 

def register(mcp_instance: Any) -> None:
    """Attach the cluster topology resource to ``mcp_instance``.

    Calls ``mcp_instance.resource`` with the
    ``gco://cluster/{region}/topology`` URI template and applies the
    returned decorator to ``_topology_resource``.
    """
    bind_topology = mcp_instance.resource("gco://cluster/{region}/topology")
    bind_topology(_topology_resource)