Coverage for mcp/resources/cluster.py: 100%
41 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
"""Cluster topology resources (gco://cluster/...) for the GCO MCP server.

Aggregates two views of regional cluster state into a single JSON
payload an LLM can pin: the Karpenter NodePool inventory (via the
``gco nodepools list`` CLI surface) and the list of pods currently
in ``Pending`` phase (via ``kubectl get pods``). The combination is
the cheapest read that answers "what shape is this cluster in right
now and what's stuck waiting for room to schedule".

The aggregate is registered as ``gco://cluster/{region}/topology``. The
``{region}`` value is checked against an AWS region-ID pattern before any
command runs; an invalid value yields an ``{"error": "invalid region"}``
payload instead.
"""
11from __future__ import annotations
13import json
14import re
15from typing import Any
17import cli_runner
# AWS region IDs: lowercase letters, digits, and hyphens, shaped like
# ``us-east-1`` or ``us-gov-east-1``. That is a two-letter prefix, one or
# more alphabetic segments, and a trailing numeric segment. The pattern is
# anchored with ``\Z`` rather than ``$`` because ``$`` also matches just
# before a trailing newline, which would let ``"us-east-1\n"`` through into
# the CLI/kubectl argv. No explicit length cap is applied; the hyphen-
# delimited segments keep matching linear-time.
_REGION_RE = re.compile(r"^[a-z]{2}(?:-[a-z]+)+-[0-9]+\Z")
# Upper bound, in seconds, passed as ``timeout`` to each kubectl invocation.
_KUBECTL_TIMEOUT_SECONDS = 30
def _list_nodepools(region: str) -> dict[str, Any]:
    """Run ``gco nodepools list`` for ``region`` and return its parsed JSON.

    Args:
        region: Region ID forwarded to the CLI via ``-r``. The caller in this
            module validates it before calling.

    Returns:
        The decoded payload when it is a JSON object; ``{"value": ...}``
        wrapping any other JSON value (e.g. a list); or an error stub
        ``{"error": ..., "raw": ...}`` carrying output that failed to decode.
    """
    raw = cli_runner._run_cli("nodepools", "list", "-r", region)
    # json.JSONDecodeError subclasses ValueError, so a single clause covers
    # both. The unparenthesised ``except A, B:`` form is only valid syntax on
    # Python 3.14+ (PEP 758); this spelling works on every Python 3.
    try:
        parsed = json.loads(raw)
    except ValueError:
        return {"error": "failed to parse nodepools output", "raw": raw}
    if isinstance(parsed, dict):
        return parsed
    # Keep the resource section a JSON object even for non-object payloads.
    return {"value": parsed}
def _pending_pods(region: str) -> dict[str, Any]:
    """Return pods currently in ``Pending`` phase across the regional cluster.

    Runs ``kubectl get pods --all-namespaces --field-selector
    status.phase=Pending -o json`` against the ``gco-<region>`` cluster
    context, bounded by ``_KUBECTL_TIMEOUT_SECONDS``.

    Args:
        region: Validated region ID used to build the cluster/context name.

    Returns:
        One of the following:

        - The decoded kubectl JSON, when it is an object.
        - ``{"value": ...}``, for any other JSON value.
        - ``{"error": ...}``, when kubectl is missing, cannot be launched,
          or times out.
        - ``{"error": ..., "exit_code": ...}``, on a non-zero exit.
        - ``{"error": ..., "raw": ...}``, when stdout is not valid JSON.
    """
    cluster_name = f"gco-{region}"
    try:
        result = cli_runner.subprocess.run(  # type: ignore[attr-defined] # nosemgrep: dangerous-subprocess-use-audit - shell=False; argv built from validated region literal
            [
                "kubectl",
                "get",
                "pods",
                "--all-namespaces",
                "--field-selector",
                "status.phase=Pending",
                "-o",
                "json",
                "--context",
                # NOTE(review): an EKS cluster ARN is normally
                # ``arn:aws:eks:<region>:<account-id>:cluster/<name>``. This
                # context name omits the account-id field, so it only resolves
                # if the kubeconfig defines a context with exactly this name
                # (e.g. via ``update-kubeconfig --alias``). Confirm.
                f"arn:aws:eks:{region}:cluster/{cluster_name}",
            ],
            capture_output=True,
            text=True,
            timeout=_KUBECTL_TIMEOUT_SECONDS,
        )
    except FileNotFoundError:
        return {"error": "kubectl not found"}
    except cli_runner.subprocess.TimeoutExpired:  # type: ignore[attr-defined]
        return {"error": f"kubectl timed out after {_KUBECTL_TIMEOUT_SECONDS}s"}
    except OSError as exc:
        # Other launch failures (e.g. PermissionError on a non-executable
        # kubectl) are reported in the payload like the cases above, rather
        # than escaping and failing the whole topology read.
        return {"error": f"failed to run kubectl: {exc}"}
    if result.returncode != 0:
        err = (result.stderr or result.stdout or "").strip()
        return {"error": err or "kubectl command failed", "exit_code": result.returncode}
    # json.JSONDecodeError subclasses ValueError; one clause is portable to
    # every Python 3, unlike the 3.14-only ``except A, B:`` form.
    try:
        parsed = json.loads(result.stdout)
    except ValueError:
        return {"error": "failed to parse kubectl output", "raw": result.stdout}
    if isinstance(parsed, dict):
        return parsed
    return {"value": parsed}
def _topology_resource(region: str) -> str:
    """Return a JSON snapshot of nodepools plus pending pods for ``region``.

    Args:
        region: The ``{region}`` segment of the resource URI.

    Returns:
        An indented JSON string with ``region``, ``nodepools`` and
        ``pending_pods`` keys. If ``region`` does not look like an AWS
        region ID, it instead returns
        ``{"error": "invalid region", "value": region}``, and no command
        is run. The nodepool and pending-pod helpers report the failures
        they handle as error stubs inside their own keys.
    """
    # fullmatch (not match) so a trailing newline cannot satisfy a ``$``
    # anchor and slip into the CLI/kubectl argv.
    if not _REGION_RE.fullmatch(region):
        return json.dumps({"error": "invalid region", "value": region})
    summary = {
        "region": region,
        "nodepools": _list_nodepools(region),
        "pending_pods": _pending_pods(region),
    }
    # default=str keeps serialisation from failing on any non-JSON value
    # that ends up in a payload or error stub.
    return json.dumps(summary, indent=2, default=str)
def register(mcp_instance: Any) -> None:
    """Expose the cluster topology aggregator on the shared MCP server.

    Registers ``_topology_resource`` under the URI template
    ``gco://cluster/{region}/topology`` via ``mcp_instance.resource``.
    """
    topology_uri = "gco://cluster/{region}/topology"
    attach = mcp_instance.resource(topology_uri)
    attach(_topology_resource)