Coverage for mcp/resources/jobs.py: 100%

23 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-15 15:07 +0000

1"""Live job state resources (gco://jobs/...) for the GCO MCP server. 

2 

3Each handler is a thin wrapper around ``kubectl get job`` so the 

4resource layer never re-implements Kubernetes plumbing. Handler 

5returns the raw YAML (or a structured error string) — pinning a job 

6URI is a cheap way for an LLM to keep the latest manifest in context 

7across turns. 

8""" 

9 

10from __future__ import annotations 

11 

12import json 

13import re 

14from typing import Any 

15 

16import cli_runner 

17 

# Job-name validation. The character rules follow the RFC 1123 label format
# (lowercase alphanumerics and "-", alphanumeric at both ends); job names
# live in the same namespace as pod names, so the same rule applies. The
# length is capped at 253 characters (1 + up to 251 + 1), which stops
# accidental command-line stuffing through a malformed URI template
# expansion.
#
# The pattern is anchored with ``\Z`` rather than ``$``: in Python ``$`` also
# matches just before a trailing newline, so "job\n" would otherwise be
# accepted as a valid name.
_JOB_NAME_RE = re.compile(r"^[a-z0-9](?:[-a-z0-9]{0,251}[a-z0-9])?\Z")
# Namespace passed to ``kubectl -n`` for every job lookup.
_DEFAULT_NAMESPACE = "gco-jobs"
# Upper bound, in seconds, on a single kubectl invocation.
_KUBECTL_TIMEOUT_SECONDS = 30

24 

25 

def _job_resource(job_name: str) -> str:
    """Return the live YAML for ``job_name`` in the GCO jobs namespace.

    Runs ``kubectl get job <job_name> -n <namespace> -o yaml`` and returns
    its standard output on success. Every failure is reported as a JSON
    string rather than an exception:

    * invalid ``job_name`` -> ``{"error": "invalid job_name", ...}``
    * kubectl binary missing -> ``{"error": "kubectl not found"}``
    * kubectl exceeded the timeout -> ``{"error": "kubectl timed out ..."}``
    * non-zero exit -> ``{"error": <stderr or stdout>, "exit_code": <code>}``
    """
    # fullmatch (not match): the whole string must satisfy the pattern. With
    # ``match`` a ``$``-anchored pattern also accepts a trailing newline, so
    # "job\n" would slip through validation and reach the kubectl argv.
    if not _JOB_NAME_RE.fullmatch(job_name):
        return json.dumps(
            {
                "error": "invalid job_name",
                "detail": "must match ^[a-z0-9](?:[-a-z0-9]{0,251}[a-z0-9])?$",
                "value": job_name,
            }
        )
    try:
        result = cli_runner.subprocess.run(  # type: ignore[attr-defined]  # nosemgrep: dangerous-subprocess-use-audit - shell=False; argv is a literal list with a validated job_name
            [
                "kubectl",
                "get",
                "job",
                job_name,
                "-n",
                _DEFAULT_NAMESPACE,
                "-o",
                "yaml",
            ],
            capture_output=True,
            text=True,
            timeout=_KUBECTL_TIMEOUT_SECONDS,
        )
    except FileNotFoundError:
        # Raised when the kubectl executable cannot be located.
        return json.dumps({"error": "kubectl not found"})
    except cli_runner.subprocess.TimeoutExpired:  # type: ignore[attr-defined]
        return json.dumps({"error": f"kubectl timed out after {_KUBECTL_TIMEOUT_SECONDS}s"})
    if result.returncode != 0:
        # Prefer stderr, fall back to stdout, then to a generic message.
        err = (result.stderr or result.stdout or "").strip()
        return json.dumps(
            {"error": err or "kubectl command failed", "exit_code": result.returncode}
        )
    return str(result.stdout)

62 

63 

def register(mcp_instance: Any) -> None:
    """Attach the live job-state resource handler to the shared MCP server.

    ``mcp_instance.resource(...)`` is called with the ``gco://jobs/{job_name}``
    URI template, and the callable it returns is applied to
    ``_job_resource``.
    """
    bind_job_handler = mcp_instance.resource("gco://jobs/{job_name}")
    bind_job_handler(_job_resource)