Coverage for cli / commands / queue_cmd.py: 92%
169 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 21:47 +0000
1"""Global job queue commands."""
3import sys
4from typing import Any
6import click
8from ..config import GCOConfig
9from ..output import get_output_formatter
# Decorator that hands each command the GCOConfig object from the click
# context; ensure=True means a GCOConfig is created there if none exists yet.
pass_config = click.make_pass_decorator(GCOConfig, ensure=True)
@click.group()
@pass_config
def queue(config: Any) -> None:
    """Manage the global job queue (DynamoDB-backed).

    The job queue provides centralized job submission and tracking:
    - Submit jobs to any region from anywhere
    - Track job status globally
    - View job history and statistics
    """
    # Intentionally empty: this group only namespaces the subcommands below.
    # Click uses the docstring above as the group's --help text, so its
    # wording is user-facing output.
@queue.command("submit")
@click.argument("manifest_path", type=click.Path(exists=True))
@click.option("--region", "-r", required=True, help="Target region for job execution")
@click.option("--namespace", "-n", default="gco-jobs", help="Kubernetes namespace")
@click.option(
    "--priority",
    "-p",
    default=0,
    # Enforce the range the help text promises instead of forwarding any int.
    type=click.IntRange(0, 100),
    help="Job priority (0-100, higher = more important)",
)
@click.option("--label", "-l", multiple=True, help="Add labels (key=value)")
@pass_config
def queue_submit(
    config: Any, manifest_path: Any, region: Any, namespace: Any, priority: Any, label: Any
) -> None:
    """Submit a job to the global queue for regional pickup.

    Jobs are stored in DynamoDB and picked up by the target region's
    manifest processor. This enables global job submission with
    centralized tracking.

    Examples:
        gco queue submit job.yaml --region us-east-1
        gco queue submit job.yaml -r us-west-2 --priority 50
        gco queue submit job.yaml -r us-east-1 -l team=ml -l project=training
    """
    from gco.services.manifest_processor import safe_load_yaml

    formatter = get_output_formatter(config)

    # Parse labels before the try block so a malformed label surfaces as a
    # click usage error rather than being swallowed by the generic handler.
    # Previously a label without "=" was silently dropped from the request.
    labels = {}
    for lbl in label:
        key, sep, value = lbl.partition("=")
        if not sep or not key:
            raise click.BadParameter(
                f"expected key=value, got {lbl!r}", param_hint="'--label' / '-l'"
            )
        labels[key] = value

    try:
        # Load manifest; aliases are disallowed when parsing the user's YAML.
        with open(manifest_path, encoding="utf-8") as f:
            manifest = safe_load_yaml(f, allow_aliases=False)

        # Submit via API
        from ..aws_client import get_aws_client

        aws_client = get_aws_client(config)

        result = aws_client.call_api(
            method="POST",
            path="/api/v1/queue/jobs",
            region=region,
            body={
                "manifest": manifest,
                "target_region": region,
                "namespace": namespace,
                "priority": priority,
                # Send null rather than an empty mapping when no labels given.
                "labels": labels or None,
            },
        )

        formatter.print_success(f"Job queued for {region}")
        formatter.print(result)

    except Exception as e:
        # CLI boundary: report any failure and exit non-zero.
        formatter.print_error(f"Failed to queue job: {e}")
        sys.exit(1)
@queue.command("list")
@click.option("--region", "-r", help="Filter by target region")
@click.option(
    "--status",
    "-s",
    type=click.Choice(["queued", "claimed", "running", "succeeded", "failed", "cancelled"]),
    help="Filter by status",
)
@click.option("--namespace", "-n", help="Filter by namespace")
@click.option("--limit", "-l", default=50, help="Maximum results")
@pass_config
def queue_list(config: Any, region: Any, status: Any, namespace: Any, limit: Any) -> None:
    """List jobs in the global queue.

    Examples:
        gco queue list
        gco queue list --region us-east-1 --status queued
        gco queue list -s running
    """
    formatter = get_output_formatter(config)

    try:
        from ..aws_client import get_aws_client

        aws_client = get_aws_client(config)

        # Build query params; only send the filters the user supplied.
        params = {"limit": limit}
        if region:
            params["target_region"] = region
        if status:
            params["status"] = status
        if namespace:
            params["namespace"] = namespace

        # Use any region to query (DynamoDB is global)
        query_region = region or config.default_region
        result = aws_client.call_api(
            method="GET",
            path="/api/v1/queue/jobs",
            region=query_region,
            params=params,
        )

        if config.output_format == "table":
            jobs = result.get("jobs", [])
            if not jobs:
                formatter.print_info("No jobs found")
                return

            # One format string for header and rows so the columns always
            # line up (the hand-typed header previously drifted from them).
            row_fmt = "  {:<36} {:<23} {:<15} {}"

            def cell(job: Any, key: str, width: int) -> str:
                # `or ""` also covers keys present with a null value, which
                # would otherwise raise TypeError when sliced.
                return str(job.get(key) or "")[:width]

            print(f"\n  Queued Jobs ({result.get('count', 0)} total)")
            print("  " + "-" * 90)
            print(row_fmt.format("JOB ID", "NAME", "REGION", "STATUS"))
            print("  " + "-" * 90)
            for job in jobs:
                print(
                    row_fmt.format(
                        cell(job, "job_id", 36),
                        cell(job, "job_name", 22),
                        cell(job, "target_region", 14),
                        cell(job, "status", 10),
                    )
                )
        else:
            formatter.print(result)

    except Exception as e:
        formatter.print_error(f"Failed to list queued jobs: {e}")
        sys.exit(1)
@queue.command("get")
@click.argument("job_id")
@click.option("--region", "-r", help="Region to query (any region works)")
@pass_config
def queue_get(config: Any, job_id: Any, region: Any) -> None:
    """Get details of a queued job including status history.

    Examples:
        gco queue get abc123-def456
        gco queue get abc123-def456 --region us-east-1
    """
    formatter = get_output_formatter(config)

    try:
        from ..aws_client import get_aws_client

        aws_client = get_aws_client(config)

        query_region = region or config.default_region
        result = aws_client.call_api(
            method="GET",
            path=f"/api/v1/queue/jobs/{job_id}",
            region=query_region,
        )

        if config.output_format == "table":
            # A null "job" in the response would otherwise fail on .get().
            job = result.get("job") or {}

            def show_field(label: str, value: Any) -> None:
                # Pad the label to a fixed width so the values line up.
                print(f"  {label:<16}{value}")

            print(f"\n  Job: {job.get('job_id')}")
            print("  " + "-" * 50)
            show_field("Name:", job.get("job_name"))
            show_field("Target Region:", job.get("target_region"))
            show_field("Namespace:", job.get("namespace"))
            show_field("Status:", job.get("status"))
            show_field("Priority:", job.get("priority"))
            show_field("Submitted:", job.get("submitted_at"))

            # These fields are only printed when they hold a truthy value.
            for label, key in (
                ("Claimed By:", "claimed_by"),
                ("Completed:", "completed_at"),
                ("Error:", "error_message"),
            ):
                if job.get(key):
                    show_field(label, job.get(key))

            # Show status history
            history = job.get("status_history") or []
            if history:
                print("\n  Status History:")
                for entry in history:
                    # `or ""` guards against keys present with a null value,
                    # which would raise TypeError when sliced.
                    ts = str(entry.get("timestamp") or "")[:19]
                    st = entry.get("status") or ""
                    msg = str(entry.get("message") or "")[:40]
                    print(f"    [{ts}] {st}: {msg}")
        else:
            formatter.print(result)

    except Exception as e:
        formatter.print_error(f"Failed to get job: {e}")
        sys.exit(1)
@queue.command("cancel")
@click.argument("job_id")
@click.option("--reason", help="Cancellation reason")
@click.option("--region", "-r", help="Region to query (any region works)")
@click.option("--yes", "-y", is_flag=True, help="Skip confirmation")
@pass_config
def queue_cancel(config: Any, job_id: Any, reason: Any, region: Any, yes: Any) -> None:
    """Cancel a queued job (only works for jobs not yet running).

    Examples:
        gco queue cancel abc123-def456
        gco queue cancel abc123-def456 --reason "No longer needed"
    """
    out = get_output_formatter(config)

    # Ask first unless --yes was passed; abort=True makes click stop the
    # command when the user declines.
    if not yes:
        click.confirm(f"Cancel job {job_id}?", abort=True)

    try:
        from ..aws_client import get_aws_client

        client = get_aws_client(config)

        # The reason is sent only when one was supplied.
        target_region = region if region else config.default_region
        query = {"reason": reason} if reason else {}

        response = client.call_api(
            method="DELETE",
            path=f"/api/v1/queue/jobs/{job_id}",
            region=target_region,
            params=query,
        )

        out.print_success(f"Job {job_id} cancelled")
        out.print(response)

    except Exception as exc:
        out.print_error(f"Failed to cancel job: {exc}")
        sys.exit(1)
@queue.command("stats")
@click.option("--region", "-r", help="Region to query (any region works)")
@pass_config
def queue_stats(config: Any, region: Any) -> None:
    """Get job queue statistics by region and status.

    Examples:
        gco queue stats
    """
    formatter = get_output_formatter(config)

    try:
        from ..aws_client import get_aws_client

        aws_client = get_aws_client(config)

        query_region = region or config.default_region
        result = aws_client.call_api(
            method="GET",
            path="/api/v1/queue/stats",
            region=query_region,
        )

        if config.output_format == "table":
            # `or {}` tolerates sections that are present but null.
            summary = result.get("summary") or {}
            by_region = result.get("by_region") or {}

            print("\n  Job Queue Statistics")
            print("  " + "-" * 50)
            # Pad labels to a common width so the totals line up.
            print(f"  {'Total Jobs:':<13}{summary.get('total_jobs', 0)}")
            print(f"  {'Queued:':<13}{summary.get('total_queued', 0)}")
            print(f"  {'Running:':<13}{summary.get('total_running', 0)}")

            if by_region:
                # Shared format for header and rows keeps the columns aligned.
                row_fmt = "  {:<15} {:>6} {:>7} {:>9} {:>6}"
                print("\n  By Region:")
                print(row_fmt.format("REGION", "QUEUED", "RUNNING", "SUCCEEDED", "FAILED"))
                print("  " + "-" * 55)
                for reg, statuses in by_region.items():
                    counts = statuses or {}
                    print(
                        row_fmt.format(
                            reg,
                            counts.get("queued", 0),
                            counts.get("running", 0),
                            counts.get("succeeded", 0),
                            counts.get("failed", 0),
                        )
                    )
        else:
            formatter.print(result)

    except Exception as e:
        formatter.print_error(f"Failed to get queue stats: {e}")
        sys.exit(1)