Coverage for mcp/tools/jobs.py: 95%

76 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-15 15:07 +0000

1"""Job management MCP tools.""" 

2 

3import asyncio 

4import contextlib 

5 

6import cli_runner 

7from audit import audit_logged 

8from feature_flags import FLAG_DESTRUCTIVE_OPERATIONS, is_enabled 

9from server import mcp 

10 

11 

12async def _ctx_warning(message: str) -> None: 

13 """Emit ``ctx.warning(...)`` from inside a tool body, no-op when no Context. 

14 

15 The destructive ``delete_job`` tool runs short — we don't need the 

16 full long-task progress stack, just an audited warning back to the 

17 operator (and the audit log via the middleware spy). 

18 """ 

19 try: 

20 from fastmcp.server.dependencies import get_context 

21 

22 ctx = get_context() 

23 except Exception: 

24 return 

25 with contextlib.suppress(Exception): 

26 await ctx.warning(message) 

27 

28 

@mcp.tool(tags={"safe", "jobs"})
@audit_logged
def list_jobs(
    region: str | None = None, namespace: str | None = None, status: str | None = None
) -> str:
    """List jobs across GCO clusters.

    Args:
        region: AWS region (e.g. us-east-1). If omitted, lists across all regions.
        namespace: Filter by Kubernetes namespace.
        status: Filter by job status (pending, running, completed, succeeded, failed).
    """
    # Region scope: "-r <region>" when a region is given, "--all-regions" otherwise.
    region_scope = ("-r", region) if region else ("--all-regions",)
    cli_args = ["jobs", "list", *region_scope]

    # Optional filters are forwarded only when the caller supplied a non-empty value.
    for flag, value in (("-n", namespace), ("-s", status)):
        if value:
            cli_args.extend((flag, value))

    return cli_runner._run_cli(*cli_args)

51 

52 

@mcp.tool(tags={"low-risk", "jobs"})
@audit_logged
def submit_job_sqs(
    manifest_path: str, region: str, namespace: str | None = None, priority: int | None = None
) -> str:
    """Submit a job via SQS queue (recommended for production).

    Args:
        manifest_path: Path to the YAML manifest file (relative to project root).
        region: Target AWS region for the SQS queue.
        namespace: Override the namespace in the manifest.
        priority: Job priority (0-100, higher = more important).
    """
    # Each optional option group collapses to an empty list when not requested.
    namespace_opts = ["-n", namespace] if namespace else []
    # Compared against None (not truthiness) so an explicit priority of 0 is still sent.
    priority_opts = [] if priority is None else ["--priority", str(priority)]

    return cli_runner._run_cli(
        "jobs", "submit-sqs", manifest_path, "-r", region, *namespace_opts, *priority_opts
    )

72 

73 

@mcp.tool(tags={"low-risk", "jobs"})
@audit_logged
def submit_job_api(manifest_path: str, namespace: str | None = None) -> str:
    """Submit a job via the authenticated API Gateway (SigV4).

    Args:
        manifest_path: Path to the YAML manifest file.
        namespace: Override the namespace in the manifest.
    """
    # The namespace override is appended only when a non-empty value was given.
    namespace_opts = ("-n", namespace) if namespace else ()
    return cli_runner._run_cli("jobs", "submit", manifest_path, *namespace_opts)

87 

88 

@mcp.tool(tags={"safe", "jobs"})
@audit_logged
def get_job(job_name: str, region: str, namespace: str = "gco-jobs") -> str:
    """Get details of a specific job.

    Args:
        job_name: Name of the job.
        region: AWS region where the job is running.
        namespace: Kubernetes namespace.
    """
    # Region and namespace are always passed explicitly to the CLI.
    cli_args = ["jobs", "get", job_name, "-r", region, "-n", namespace]
    return cli_runner._run_cli(*cli_args)

100 

101 

@mcp.tool(tags={"safe", "jobs"})
@audit_logged
def get_job_logs(job_name: str, region: str, namespace: str = "gco-jobs", tail: int = 100) -> str:
    """Get logs from a job.

    Args:
        job_name: Name of the job.
        region: AWS region.
        namespace: Kubernetes namespace.
        tail: Number of log lines to return.
    """
    # CLI arguments are strings, so the line count is stringified before the call.
    tail_arg = str(tail)
    cli_args = ["jobs", "logs", job_name, "-r", region, "-n", namespace, "--tail", tail_arg]
    return cli_runner._run_cli(*cli_args)

116 

117 

# The destructive tool is only defined (and therefore only registered) when the
# destructive-operations feature flag is enabled at import time.
if is_enabled(FLAG_DESTRUCTIVE_OPERATIONS):

    @mcp.tool(tags={"destructive", "jobs"})
    @audit_logged
    async def delete_job(job_name: str, region: str, namespace: str = "gco-jobs") -> str:
        """[gated by GCO_ENABLE_DESTRUCTIVE_OPERATIONS] destructive.

        Delete a job. Cannot be undone — the Kubernetes Job and its pods
        are removed and any pod logs not yet shipped to CloudWatch are lost.

        Args:
            job_name: Name of the job to delete.
            region: AWS region.
            namespace: Kubernetes namespace.
        """
        # Surface a best-effort warning to the operator before anything is removed.
        warning_text = (
            f"Deleting job {job_name!r} in {region}/{namespace} — this cannot be undone."
        )
        await _ctx_warning(warning_text)

        # "-y" is always passed to the CLI; the blocking call runs in a worker
        # thread so the event loop is not held up.
        cli_args = ("jobs", "delete", job_name, "-r", region, "-n", namespace, "-y")
        return await asyncio.to_thread(cli_runner._run_cli, *cli_args)

139 

140 

@mcp.tool(tags={"safe", "jobs"})
@audit_logged
def get_job_events(job_name: str, region: str, namespace: str = "gco-jobs") -> str:
    """Get Kubernetes events for a job (useful for debugging).

    Args:
        job_name: Name of the job.
        region: AWS region.
        namespace: Kubernetes namespace.
    """
    # Region and namespace are always passed explicitly to the CLI.
    cli_args = ["jobs", "events", job_name, "-r", region, "-n", namespace]
    return cli_runner._run_cli(*cli_args)

152 

153 

@mcp.tool(tags={"safe", "jobs"})
@audit_logged
def cluster_health(region: str | None = None) -> str:
    """Get health status of GCO clusters.

    Args:
        region: Specific region, or omit for all regions.
    """
    # "-r <region>" when a region is given, "--all-regions" otherwise.
    region_scope = ["-r", region] if region else ["--all-regions"]
    return cli_runner._run_cli("jobs", "health", *region_scope)

168 

169 

@mcp.tool(tags={"safe", "jobs"})
@audit_logged
def queue_status(region: str | None = None) -> str:
    """View SQS queue status (pending, in-flight, DLQ counts).

    Args:
        region: Specific region, or omit for all regions.
    """
    # No (or empty) region: ask the CLI for every region.
    if not region:
        return cli_runner._run_cli("jobs", "queue-status", "--all-regions")

    # Otherwise restrict the query to the requested region.
    return cli_runner._run_cli("jobs", "queue-status", "-r", region)

183 return cli_runner._run_cli(*args)