Coverage for mcp/resources/jobs.py: 100%
23 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
1"""Live job state resources (gco://jobs/...) for the GCO MCP server.
3Each handler is a thin wrapper around ``kubectl get job`` so the
4resource layer never re-implements Kubernetes plumbing. Handler
5returns the raw YAML (or a structured error string) — pinning a job
6URI is a cheap way for an LLM to keep the latest manifest in context
7across turns.
8"""
10from __future__ import annotations
12import json
13import re
14from typing import Any
16import cli_runner
18# RFC 1123 label format. Job names live in the same namespace as pod
19# names, so the same rule applies. Bounded length stops accidental
20# command-line stuffing through a malformed URI template expansion.
21_JOB_NAME_RE = re.compile(r"^[a-z0-9](?:[-a-z0-9]{0,251}[a-z0-9])?$")
22_DEFAULT_NAMESPACE = "gco-jobs"
23_KUBECTL_TIMEOUT_SECONDS = 30
26def _job_resource(job_name: str) -> str:
27 """Return the live YAML for ``job_name`` in the GCO jobs namespace."""
28 if not _JOB_NAME_RE.match(job_name):
29 return json.dumps(
30 {
31 "error": "invalid job_name",
32 "detail": "must match ^[a-z0-9](?:[-a-z0-9]{0,251}[a-z0-9])?$",
33 "value": job_name,
34 }
35 )
36 try:
37 result = cli_runner.subprocess.run( # type: ignore[attr-defined] # nosemgrep: dangerous-subprocess-use-audit - shell=False; argv is a literal list with a validated job_name
38 [
39 "kubectl",
40 "get",
41 "job",
42 job_name,
43 "-n",
44 _DEFAULT_NAMESPACE,
45 "-o",
46 "yaml",
47 ],
48 capture_output=True,
49 text=True,
50 timeout=_KUBECTL_TIMEOUT_SECONDS,
51 )
52 except FileNotFoundError:
53 return json.dumps({"error": "kubectl not found"})
54 except cli_runner.subprocess.TimeoutExpired: # type: ignore[attr-defined]
55 return json.dumps({"error": f"kubectl timed out after {_KUBECTL_TIMEOUT_SECONDS}s"})
56 if result.returncode != 0:
57 err = (result.stderr or result.stdout or "").strip()
58 return json.dumps(
59 {"error": err or "kubectl command failed", "exit_code": result.returncode}
60 )
61 return str(result.stdout)
64def register(mcp_instance: Any) -> None:
65 """Register live job-state resources against the shared MCP server."""
66 mcp_instance.resource("gco://jobs/{job_name}")(_job_resource)