Coverage for cli / commands / queue_cmd.py: 92%

169 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 21:47 +0000

1"""Global job queue commands.""" 

2 

3import sys 

4from typing import Any 

5 

6import click 

7 

8from ..config import GCOConfig 

9from ..output import get_output_formatter 

10 

# Decorator that injects the shared GCOConfig from the click context into a
# command callback; ensure=True makes click create one if none exists yet.
pass_config = click.make_pass_decorator(object_type=GCOConfig, ensure=True)

12 

13 

@click.group()
@pass_config
def queue(config: Any) -> None:
    """Manage the global job queue (DynamoDB-backed).

    The job queue provides centralized job submission and tracking:
    - Submit jobs to any region from anywhere
    - Track job status globally
    - View job history and statistics
    """
    # Group entry point only: the docstring above is the click help text and
    # is the whole body; the subcommands below do the actual work.

25 

26 

@queue.command("submit")
@click.argument("manifest_path", type=click.Path(exists=True))
@click.option("--region", "-r", required=True, help="Target region for job execution")
@click.option("--namespace", "-n", default="gco-jobs", help="Kubernetes namespace")
@click.option(
    "--priority",
    "-p",
    default=0,
    # Enforce the documented 0-100 range at the CLI instead of sending
    # arbitrary integers to the API.
    type=click.IntRange(0, 100),
    help="Job priority (0-100, higher = more important)",
)
@click.option("--label", "-l", multiple=True, help="Add labels (key=value)")
@pass_config
def queue_submit(
    config: Any, manifest_path: Any, region: Any, namespace: Any, priority: Any, label: Any
) -> None:
    """Submit a job to the global queue for regional pickup.

    Jobs are stored in DynamoDB and picked up by the target region's
    manifest processor. This enables global job submission with
    centralized tracking.

    Examples:
        gco queue submit job.yaml --region us-east-1
        gco queue submit job.yaml -r us-west-2 --priority 50
        gco queue submit job.yaml -r us-east-1 -l team=ml -l project=training
    """
    # Reads the manifest file, POSTs it to /api/v1/queue/jobs in the target
    # region and prints the API response; any failure prints an error and
    # exits with status 1.
    from gco.services.manifest_processor import safe_load_yaml

    formatter = get_output_formatter(config)

    # Parse repeated --label key=value options (value may itself contain "=").
    # Malformed labels used to be dropped silently, so a typo such as
    # "team:ml" lost the label without notice; reject them instead.
    labels = {}
    for lbl in label:
        key, sep, value = lbl.partition("=")
        if not sep or not key:
            raise click.BadParameter(
                f"expected key=value, got {lbl!r}", param_hint="'--label'"
            )
        labels[key] = value

    try:
        # Load manifest; aliases are disabled when parsing.
        with open(manifest_path, encoding="utf-8") as f:
            manifest = safe_load_yaml(f, allow_aliases=False)
        # NOTE(review): assumes safe_load_yaml yields a falsy value for an
        # empty document — confirm. Submitting nothing would only fail later.
        if not manifest:
            raise ValueError(f"manifest {manifest_path} is empty")

        # Submit via API
        from ..aws_client import get_aws_client

        aws_client = get_aws_client(config)

        result = aws_client.call_api(
            method="POST",
            path="/api/v1/queue/jobs",
            region=region,
            body={
                "manifest": manifest,
                "target_region": region,
                "namespace": namespace,
                "priority": priority,
                # Omit labels (send null) when none were given.
                "labels": labels if labels else None,
            },
        )

        formatter.print_success(f"Job queued for {region}")
        formatter.print(result)

    except Exception as e:
        formatter.print_error(f"Failed to queue job: {e}")
        sys.exit(1)

89 

90 

@queue.command("list")
@click.option("--region", "-r", help="Filter by target region")
@click.option(
    "--status",
    "-s",
    type=click.Choice(["queued", "claimed", "running", "succeeded", "failed", "cancelled"]),
    help="Filter by status",
)
@click.option("--namespace", "-n", help="Filter by namespace")
@click.option("--limit", "-l", default=50, help="Maximum results")
@pass_config
def queue_list(config: Any, region: Any, status: Any, namespace: Any, limit: Any) -> None:
    """List jobs in the global queue.

    Examples:
        gco queue list
        gco queue list --region us-east-1 --status queued
        gco queue list -s running
    """
    # GETs /api/v1/queue/jobs with the chosen filters and prints either a
    # fixed-width table (output_format == "table") or the raw response.
    formatter = get_output_formatter(config)

    try:
        from ..aws_client import get_aws_client

        aws_client = get_aws_client(config)

        # Build query params; only filters the user actually set are sent.
        params = {"limit": limit}
        if region:
            params["target_region"] = region
        if status:
            params["status"] = status
        if namespace:
            params["namespace"] = namespace

        # Use any region to query (DynamoDB is global)
        query_region = region or config.default_region
        result = aws_client.call_api(
            method="GET",
            path="/api/v1/queue/jobs",
            region=query_region,
            params=params,
        )

        if config.output_format == "table":
            jobs = result.get("jobs") or []
            if not jobs:
                formatter.print_info("No jobs found")
                return

            # One format string for header and rows keeps the columns aligned.
            row = " {:<36} {:<23} {:<15} {}"

            def _cell(job: dict, key: str, width: int) -> str:
                # A key present with a null value must not crash slicing.
                return str(job.get(key) or "")[:width]

            print(f"\n Queued Jobs ({result.get('count', len(jobs))} total)")
            print(" " + "-" * 90)
            print(row.format("JOB ID", "NAME", "REGION", "STATUS"))
            print(" " + "-" * 90)
            for job in jobs:
                print(
                    row.format(
                        _cell(job, "job_id", 36),
                        _cell(job, "job_name", 22),
                        _cell(job, "target_region", 14),
                        _cell(job, "status", 10),
                    )
                )
        else:
            formatter.print(result)

    except Exception as e:
        formatter.print_error(f"Failed to list queued jobs: {e}")
        sys.exit(1)

159 

160 

@queue.command("get")
@click.argument("job_id")
@click.option("--region", "-r", help="Region to query (any region works)")
@pass_config
def queue_get(config: Any, job_id: Any, region: Any) -> None:
    """Get details of a queued job including status history.

    Examples:
        gco queue get abc123-def456
        gco queue get abc123-def456 --region us-east-1
    """
    # GETs /api/v1/queue/jobs/<job_id> and prints the job fields plus its
    # status history (table mode) or the raw response (other formats).
    formatter = get_output_formatter(config)

    try:
        from ..aws_client import get_aws_client

        aws_client = get_aws_client(config)

        query_region = region or config.default_region
        result = aws_client.call_api(
            method="GET",
            path=f"/api/v1/queue/jobs/{job_id}",
            region=query_region,
        )

        # Tolerate "job": null in the response instead of crashing on .get().
        job = result.get("job") or {}

        if config.output_format == "table":
            print(f"\n Job: {job.get('job_id')}")
            print(" " + "-" * 50)
            print(f" Name: {job.get('job_name')}")
            print(f" Target Region: {job.get('target_region')}")
            print(f" Namespace: {job.get('namespace')}")
            print(f" Status: {job.get('status')}")
            print(f" Priority: {job.get('priority')}")
            print(f" Submitted: {job.get('submitted_at')}")
            # Optional fields are only shown when present and non-empty.
            if job.get("claimed_by"):
                print(f" Claimed By: {job.get('claimed_by')}")
            if job.get("completed_at"):
                print(f" Completed: {job.get('completed_at')}")
            if job.get("error_message"):
                print(f" Error: {job.get('error_message')}")

            # Show status history; null fields (e.g. an entry without a
            # message) or non-string timestamps must not break slicing.
            history = job.get("status_history") or []
            if history:
                print("\n Status History:")
                for entry in history:
                    ts = str(entry.get("timestamp") or "")[:19]
                    st = entry.get("status") or ""
                    msg = str(entry.get("message") or "")[:40]
                    print(f" [{ts}] {st}: {msg}")
        else:
            formatter.print(result)

    except Exception as e:
        formatter.print_error(f"Failed to get job: {e}")
        sys.exit(1)

219 

220 

@queue.command("cancel")
@click.argument("job_id")
@click.option("--reason", help="Cancellation reason")
@click.option("--region", "-r", help="Region to query (any region works)")
@click.option("--yes", "-y", is_flag=True, help="Skip confirmation")
@pass_config
def queue_cancel(config: Any, job_id: Any, reason: Any, region: Any, yes: Any) -> None:
    """Cancel a queued job (only works for jobs not yet running).

    Examples:
        gco queue cancel abc123-def456
        gco queue cancel abc123-def456 --reason "No longer needed"
    """
    # Sends DELETE /api/v1/queue/jobs/<job_id>, optionally with a "reason"
    # query parameter, and prints the API response.
    formatter = get_output_formatter(config)

    # Prompt unless --yes was given; answering "no" aborts via click
    # (abort=True) before any API call is made.
    if not yes:
        click.confirm(f"Cancel job {job_id}?", abort=True)

    try:
        from ..aws_client import get_aws_client

        client = get_aws_client(config)

        target_region = region or config.default_region
        # The reason is only sent when one was supplied.
        cancel_params = {"reason": reason} if reason else {}

        response = client.call_api(
            method="DELETE",
            path=f"/api/v1/queue/jobs/{job_id}",
            region=target_region,
            params=cancel_params,
        )

        formatter.print_success(f"Job {job_id} cancelled")
        formatter.print(response)

    except Exception as exc:
        formatter.print_error(f"Failed to cancel job: {exc}")
        sys.exit(1)

262 

263 

@queue.command("stats")
@click.option("--region", "-r", help="Region to query (any region works)")
@pass_config
def queue_stats(config: Any, region: Any) -> None:
    """Get job queue statistics by region and status.

    Examples:
        gco queue stats
    """
    # GETs /api/v1/queue/stats and prints a summary plus a per-region table
    # (table mode) or the raw response (other formats).
    formatter = get_output_formatter(config)

    try:
        from ..aws_client import get_aws_client

        aws_client = get_aws_client(config)

        query_region = region or config.default_region
        result = aws_client.call_api(
            method="GET",
            path="/api/v1/queue/stats",
            region=query_region,
        )

        if config.output_format == "table":
            # Tolerate null sections in the response.
            summary = result.get("summary") or {}
            by_region = result.get("by_region") or {}

            print("\n Job Queue Statistics")
            print(" " + "-" * 50)
            print(f" Total Jobs: {summary.get('total_jobs', 0)}")
            print(f" Queued: {summary.get('total_queued', 0)}")
            print(f" Running: {summary.get('total_running', 0)}")

            if by_region:
                # One format string for header and rows keeps the columns aligned.
                row = " {:<15} {:>6} {:>7} {:>9} {:>6}"
                columns = ("queued", "running", "succeeded", "failed")

                print("\n By Region:")
                print(row.format("REGION", "QUEUED", "RUNNING", "SUCCEEDED", "FAILED"))
                print(" " + "-" * 55)
                for reg, statuses in by_region.items():
                    counts = statuses or {}
                    # A null count would make the width format spec raise
                    # TypeError; show it as 0.
                    values = [counts.get(col) or 0 for col in columns]
                    print(row.format(str(reg), *values))
        else:
            formatter.print(result)

    except Exception as e:
        formatter.print_error(f"Failed to get queue stats: {e}")
        sys.exit(1)