Coverage for cli / commands / jobs_cmd.py: 88%

488 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 21:47 +0000

1"""Job management commands.""" 

2 

3import logging 

4import sys 

5from typing import Any 

6 

7import click 

8 

9from ..config import GCOConfig 

10from ..jobs import get_job_manager 

11from ..output import format_job_table, get_output_formatter 

12 

13logger = logging.getLogger(__name__) 

14 

15pass_config = click.make_pass_decorator(GCOConfig, ensure=True) 

16 

17 

18def _resolve_result_namespace(result: dict[str, Any], fallback: str) -> str: 

19 """Pick the right namespace to poll for a submitted job. 

20 

21 The manifest API response includes per-resource status dicts with a 

22 ``namespace`` field reflecting where the resource actually landed 

23 (which may differ from the CLI's ``--namespace`` flag if the manifest 

24 declared its own ``metadata.namespace``). Prefer that, then the 

25 top-level response envelope, then the CLI-provided fallback. 

26 """ 

27 resources = result.get("resources") or [] 

28 for resource in resources: 28 ↛ 29line 28 didn't jump to line 29 because the loop on line 28 never started

29 ns = resource.get("namespace") 

30 if ns: 

31 return str(ns) 

32 envelope_ns = result.get("namespace") 

33 if envelope_ns: 33 ↛ 34line 33 didn't jump to line 34 because the condition on line 33 was never true

34 return str(envelope_ns) 

35 return fallback 

36 

37 

@click.group()
@pass_config
def jobs(config: Any) -> None:
    """Manage jobs across GCO clusters."""
    # Group entry point only; subcommands attach via @jobs.command(...).

43 

44 

@jobs.command("submit")
@click.argument("manifest_path", type=click.Path(exists=True))
@click.option(
    "--namespace",
    "-n",
    help="Fallback namespace for manifests that don't declare their own",
)
@click.option("--region", "-r", "target_region", help="Target specific region")
@click.option("--dry-run", is_flag=True, help="Validate without applying")
@click.option("--label", "-l", multiple=True, help="Add labels (key=value)")
@click.option("--wait", "-w", is_flag=True, help="Wait for job completion")
@click.option("--timeout", default=3600, help="Wait timeout in seconds")
@pass_config
def submit_job(
    config: Any,
    manifest_path: Any,
    namespace: Any,
    target_region: Any,
    dry_run: Any,
    label: Any,
    wait: Any,
    timeout: Any,
) -> None:
    """Submit a job to GCO.

    MANIFEST_PATH can be a YAML file or directory containing YAML files.
    """
    formatter = get_output_formatter(config)
    job_manager = get_job_manager(config)

    # Collapse repeated --label key=value flags into a dict; entries
    # without "=" are silently dropped, matching previous behavior.
    parsed_labels = dict(item.split("=", 1) for item in label if "=" in item)

    try:
        result = job_manager.submit_job(
            manifests=manifest_path,
            namespace=namespace,
            target_region=target_region,
            dry_run=dry_run,
            labels=parsed_labels or None,
        )

        if dry_run:
            formatter.print_success("Dry run successful - manifests are valid")
        else:
            formatter.print_success("Job submitted successfully")
            # Bubble up per-resource rename / still-running notices.
            for resource in result.get("resources", []):
                msg = resource.get("message", "")
                lowered = msg.lower()
                if "renamed" in lowered or "still running" in lowered:
                    formatter.print_warning(msg)

        formatter.print(result)

        # Optionally block until the job finishes.
        if wait and not dry_run:
            job_name = result.get("job_name") or result.get("name")
            if job_name:
                # The API response knows where the resource actually
                # landed (the manifest's own metadata.namespace beats
                # --namespace); fall back to the CLI flag or the config
                # default only when the response has no namespace.
                resolved_ns = _resolve_result_namespace(
                    result, fallback=namespace or config.default_namespace
                )
                formatter.print_info(f"Waiting for job {job_name} to complete...")
                final_job = job_manager.wait_for_job(
                    job_name=job_name,
                    namespace=resolved_ns,
                    region=target_region,
                    timeout_seconds=timeout,
                )
                formatter.print_success(f"Job completed with status: {final_job.status}")

    except Exception as exc:
        formatter.print_error(f"Failed to submit job: {exc}")
        sys.exit(1)

127 

128 

@jobs.command("submit-direct")
@click.argument("manifest_path", type=click.Path(exists=True))
@click.option("--region", "-r", required=True, help="Target region for direct submission")
@click.option(
    "--namespace",
    "-n",
    help="Fallback namespace for manifests that don't declare their own",
)
@click.option("--dry-run", is_flag=True, help="Validate without applying")
@click.option("--label", "-l", multiple=True, help="Add labels (key=value)")
@click.option("--wait", "-w", is_flag=True, help="Wait for job completion")
@click.option("--timeout", default=3600, help="Wait timeout in seconds")
@pass_config
def submit_job_direct(
    config: Any,
    manifest_path: Any,
    region: Any,
    namespace: Any,
    dry_run: Any,
    label: Any,
    wait: Any,
    timeout: Any,
) -> None:
    """Submit a job directly to a regional cluster using kubectl.

    This bypasses the API Gateway and submits directly to the EKS cluster.

    REQUIREMENTS:
    - kubectl installed and in PATH
    - EKS access entry configured for your IAM principal
    - AWS credentials with eks:DescribeCluster permission

    To configure EKS access, run:

        aws eks create-access-entry --cluster-name gco-REGION --principal-arn YOUR_ARN

        aws eks associate-access-policy --cluster-name gco-REGION \\
            --principal-arn YOUR_ARN \\
            --policy-arn arn:aws:eks::aws:cluster-access-policy/AmazonEKSClusterAdminPolicy \\
            --access-scope type=cluster

    Examples:
        gco jobs submit-direct job.yaml --region us-east-1
        gco jobs submit-direct job.yaml -r us-west-2 -n gco-jobs --wait
    """
    formatter = get_output_formatter(config)
    job_manager = get_job_manager(config)

    # Collapse repeated --label key=value flags into a dict; entries
    # without "=" are silently dropped, matching previous behavior.
    parsed_labels = dict(item.split("=", 1) for item in label if "=" in item)

    try:
        formatter.print_info(f"Submitting directly to cluster in {region} via kubectl...")

        result = job_manager.submit_job_direct(
            manifests=manifest_path,
            region=region,
            namespace=namespace,
            dry_run=dry_run,
            labels=parsed_labels or None,
        )

        if dry_run:
            formatter.print_success("Dry run successful - manifests are valid")
        else:
            formatter.print_success(f"Job submitted directly to {region}")
            # Surface warnings (e.g. a rename after a name collision);
            # pop() removes them so they are not repeated in the dump below.
            for warning in result.pop("warnings", []):
                formatter.print_warning(warning)

        formatter.print(result)

        # Optionally block until the job finishes.
        if wait and not dry_run:
            job_name = result.get("job_name") or result.get("name")
            if job_name:
                # Prefer the namespace reported by the response over the
                # CLI flag / config default (manifest metadata wins).
                resolved_ns = _resolve_result_namespace(
                    result, fallback=namespace or config.default_namespace
                )
                formatter.print_info(f"Waiting for job {job_name} to complete...")
                final_job = job_manager.wait_for_job(
                    job_name=job_name,
                    namespace=resolved_ns,
                    region=region,
                    timeout_seconds=timeout,
                )
                formatter.print_success(f"Job completed with status: {final_job.status}")

    except Exception as exc:
        formatter.print_error(f"Failed to submit job directly: {exc}")
        sys.exit(1)

225 

226 

@jobs.command("submit-sqs")
@click.argument("manifest_path", type=click.Path(exists=True))
@click.option("--region", "-r", help="Target region (auto-selects optimal if not specified)")
@click.option(
    "--namespace",
    "-n",
    help="Fallback namespace for manifests that don't declare their own",
)
@click.option("--label", "-l", multiple=True, help="Add labels (key=value)")
@click.option("--priority", "-p", default=0, help="Job priority (higher = more important)")
@click.option("--auto-region", is_flag=True, help="Auto-select optimal region based on capacity")
@pass_config
def submit_job_sqs(
    config: Any,
    manifest_path: Any,
    region: Any,
    namespace: Any,
    label: Any,
    priority: Any,
    auto_region: Any,
) -> None:
    """Submit a job to a regional SQS queue for processing.

    This is the recommended way to submit jobs as it:
    - Decouples submission from processing
    - Enables KEDA-based autoscaling
    - Provides better fault tolerance

    If --auto-region is specified, the CLI will analyze capacity across all
    regions and submit to the optimal one.

    Examples:
        gco jobs submit-sqs job.yaml --region us-east-1
        gco jobs submit-sqs job.yaml --auto-region
        gco jobs submit-sqs job.yaml -r us-west-2 --priority 10
    """
    formatter = get_output_formatter(config)
    job_manager = get_job_manager(config)

    # Collapse repeated --label key=value flags into a dict; entries
    # without "=" are silently dropped, matching previous behavior.
    parsed_labels = dict(item.split("=", 1) for item in label if "=" in item)

    try:
        if auto_region and not region:
            # Let the capacity checker pick the best region for us.
            formatter.print_info("Analyzing capacity across regions...")
            from ..capacity import get_capacity_checker

            checker = get_capacity_checker(config)
            recommendation = checker.recommend_region_for_job()
            region = recommendation["region"]
            formatter.print_info(f"Selected region: {region} ({recommendation['reason']})")
        elif not region:
            # No explicit region and no auto-select: use the configured default.
            region = config.default_region

        formatter.print_info(f"Submitting job to SQS queue in {region}...")

        result = job_manager.submit_job_sqs(
            manifests=manifest_path,
            region=region,
            namespace=namespace,
            labels=parsed_labels or None,
            priority=priority,
        )

        formatter.print_success(f"Job queued successfully in {region}")
        formatter.print(result)

    except Exception as exc:
        formatter.print_error(f"Failed to submit job to SQS: {exc}")
        sys.exit(1)

302 

303 

@jobs.command("queue-status")
@click.option("--region", "-r", help="Specific region to check")
@click.option("--all-regions", "-a", is_flag=True, help="Check all regions")
@pass_config
def queue_status(config: Any, region: Any, all_regions: Any) -> None:
    """Show job queue status across regions.

    Displays the number of pending, in-flight, and failed messages
    in the job queues.

    Examples:
        gco jobs queue-status --region us-east-1
        gco jobs queue-status --all-regions
    """
    formatter = get_output_formatter(config)
    job_manager = get_job_manager(config)

    try:
        if all_regions:
            from ..aws_client import get_aws_client

            aws_client = get_aws_client(config)
            stacks = aws_client.discover_regional_stacks()

            # Collect per-region status; regions that error out are
            # logged at debug level and skipped rather than aborting.
            results = []
            for stack_region in stacks:
                try:
                    status = job_manager.get_queue_status(stack_region)
                    results.append(status)
                except Exception as e:
                    logger.debug("Failed to get queue status for %s: %s", stack_region, e)
                    continue

            if not results:
                formatter.print_warning("No queue status available")
                return

            # Format as table. FIX: use .get() with defaults for every
            # field — previously only dlq_messages was fetched defensively
            # while region/messages_* were indexed directly, so a single
            # region missing one count key raised KeyError and killed the
            # whole table.
            print("\n REGION PENDING IN-FLIGHT DELAYED DLQ")
            print(" " + "-" * 55)
            for r in results:
                print(
                    f" {r.get('region', ''):<15} {r.get('messages_available', 0):>7} "
                    f"{r.get('messages_in_flight', 0):>9} "
                    f"{r.get('messages_delayed', 0):>7} {r.get('dlq_messages', 0):>3}"
                )
        else:
            # Single-region query: fall back to the configured default.
            target_region = region or config.default_region
            status = job_manager.get_queue_status(target_region)
            formatter.print(status)

    except Exception as e:
        formatter.print_error(f"Failed to get queue status: {e}")
        sys.exit(1)

358 

359 

@jobs.command("list")
@click.option("--namespace", "-n", help="Filter by namespace")
@click.option("--region", "-r", help="Target region (required unless --all-regions)")
@click.option("--status", "-s", type=click.Choice(["pending", "running", "succeeded", "failed"]))
@click.option("--all-regions", "-a", is_flag=True, help="Query all regions via global API")
@click.option("--limit", "-l", default=50, help="Maximum jobs to return")
@pass_config
def list_jobs(
    config: Any, namespace: Any, region: Any, status: Any, all_regions: Any, limit: Any
) -> None:
    """List jobs in GCO clusters.

    You must specify either --region for a specific cluster or --all-regions
    to query all clusters via the global aggregation API.

    Examples:
        gco jobs list --region us-east-1
        gco jobs list --all-regions
        gco jobs list -r us-west-2 -n gco-jobs --status running
    """
    formatter = get_output_formatter(config)
    job_manager = get_job_manager(config)

    # A target scope is mandatory: one explicit region, or all of them.
    if not region and not all_regions:
        formatter.print_error("You must specify --region or --all-regions")
        formatter.print_info(" Use --region/-r to query a specific cluster")
        formatter.print_info(" Use --all-regions/-a to query all clusters")
        sys.exit(1)

    try:
        if not all_regions:
            # Single-region path: query the cluster directly.
            jobs_list = job_manager.list_jobs(
                region=region, namespace=namespace, status=status, all_regions=False
            )
            if config.output_format == "table":
                print(format_job_table(jobs_list))
            else:
                formatter.print(jobs_list)
            return

        # All-regions path: fan out through the global aggregation API.
        result = job_manager.list_jobs_global(
            namespace=namespace,
            status=status,
            limit=limit,
        )

        if config.output_format != "table":
            formatter.print(result)
            return

        # Summary header.
        print("\n Global Jobs Summary")
        print(" " + "-" * 50)
        print(f" Total jobs: {result.get('total', 0)}")
        print(f" Regions queried: {result.get('regions_queried', 0)}")
        print(f" Regions successful: {result.get('regions_successful', 0)}")

        # Per-region roll-up, when present.
        region_summaries = result.get("region_summaries")
        if region_summaries:
            print("\n REGION COUNT TOTAL")
            print(" " + "-" * 35)
            for summary in region_summaries:
                print(f" {summary['region']:<15} {summary['count']:>5} {summary['total']:>5}")

        # Individual job rows, truncated to the configured limit.
        jobs_data = result.get("jobs", [])
        if jobs_data:
            print(
                "\n NAME NAMESPACE REGION STATUS"
            )
            print(" " + "-" * 75)
            for job in jobs_data[:limit]:
                meta = job.get("metadata", {})
                name = meta.get("name", "")[:30]
                ns = meta.get("namespace", "")[:14]
                job_region = job.get("_source_region", "")[:14]
                job_status = job.get("computed_status", "unknown")[:10]
                print(f" {name:<30} {ns:<15} {job_region:<15} {job_status}")

        # Per-region failures, when any occurred.
        if result.get("errors"):
            print("\n Errors:")
            for err in result["errors"]:
                formatter.print_warning(f" {err['region']}: {err['error']}")

    except Exception as exc:
        formatter.print_error(f"Failed to list jobs: {exc}")
        sys.exit(1)

449 

450 

@jobs.command("get")
@click.argument("job_name")
@click.option("--namespace", "-n", default="gco-jobs", help="Job namespace")
@click.option("--region", "-r", required=True, help="Job region (required)")
@pass_config
def get_job(config: Any, job_name: Any, namespace: Any, region: Any) -> None:
    """Get details of a specific job.

    Examples:
        gco jobs get my-job --region us-east-1
        gco jobs get training-job -r us-west-2 -n ml-jobs
    """
    formatter = get_output_formatter(config)
    job_manager = get_job_manager(config)

    try:
        job = job_manager.get_job(job_name, namespace, region)
        # A falsy result means the job does not exist; exit non-zero.
        if not job:
            formatter.print_error(f"Job {job_name} not found")
            sys.exit(1)
        formatter.print(job)
    except Exception as exc:
        formatter.print_error(f"Failed to get job: {exc}")
        sys.exit(1)

476 

477 

@jobs.command("logs")
@click.argument("job_name")
@click.option("--namespace", "-n", default="gco-jobs", help="Job namespace")
@click.option("--region", "-r", required=True, help="Job region (required)")
@click.option("--tail", "-t", default=100, help="Number of lines to show")
@click.option(
    "--since", "-s", default=24, type=int, help="Hours to look back in CloudWatch (default: 24)"
)
@click.option("--container", "-c", help="Container name (for multi-container pods)")
@pass_config
def get_logs(
    config: Any, job_name: Any, namespace: Any, region: Any, tail: Any, since: Any, container: Any
) -> None:
    """Get logs from a job.

    Fetches logs from the Kubernetes API if the pod is still running.
    If the pod is gone, falls back to CloudWatch Logs automatically.
    Use --since to control how far back CloudWatch searches.

    Examples:
        gco jobs logs my-job --region us-east-1
        gco jobs logs training-job -r us-west-2 -n ml-jobs --tail 500
        gco jobs logs old-job -r us-east-1 --since 72
        gco jobs logs multi-container-job -r us-east-1 --container sidecar
    """
    formatter = get_output_formatter(config)
    job_manager = get_job_manager(config)

    try:
        # FIX: --container/-c was declared (and advertised in the help
        # example above) but never forwarded, so it was silently ignored.
        # Forward it the way the `pod-logs` command already does.
        # NOTE(review): assumes get_job_logs accepts a container= keyword
        # like get_pod_logs — confirm against the job manager signature.
        logs = job_manager.get_job_logs(
            job_name,
            namespace,
            region,
            tail_lines=tail,
            since_hours=since,
            container=container,
        )
        print(logs)
    except Exception as e:
        formatter.print_error(f"Failed to get logs: {e}")
        sys.exit(1)

514 

515 

@jobs.command("delete")
@click.argument("job_name")
@click.option("--namespace", "-n", default="gco-jobs", help="Job namespace")
@click.option("--region", "-r", required=True, help="Job region (required)")
@click.option("--yes", "-y", is_flag=True, help="Skip confirmation")
@pass_config
def delete_job(config: Any, job_name: Any, namespace: Any, region: Any, yes: Any) -> None:
    """Delete a job.

    Examples:
        gco jobs delete my-job --region us-east-1
        gco jobs delete old-job -r us-west-2 -n ml-jobs -y
    """
    formatter = get_output_formatter(config)
    job_manager = get_job_manager(config)

    # Interactive safety prompt unless -y/--yes was supplied;
    # click aborts the command on a negative answer.
    if not yes:
        click.confirm(f"Delete job {job_name} in namespace {namespace} ({region})?", abort=True)

    try:
        job_manager.delete_job(job_name, namespace, region)
        formatter.print_success(f"Job {job_name} deleted")
    except Exception as exc:
        formatter.print_error(f"Failed to delete job: {exc}")
        sys.exit(1)

541 

542 

@jobs.command("events")
@click.argument("job_name")
@click.option("--namespace", "-n", default="gco-jobs", help="Job namespace")
@click.option("--region", "-r", required=True, help="Job region (required)")
@pass_config
def get_job_events(config: Any, job_name: Any, namespace: Any, region: Any) -> None:
    """Get Kubernetes events for a job.

    Shows events related to the job and its pods, useful for debugging
    scheduling issues, resource problems, or startup failures.

    Examples:
        gco jobs events my-job --region us-east-1
        gco jobs events training-job -n ml-jobs -r us-west-2
    """
    formatter = get_output_formatter(config)
    job_manager = get_job_manager(config)

    try:
        result = job_manager.get_job_events(job_name, namespace, region)

        if config.output_format != "table":
            formatter.print(result)
            return

        events = result.get("events", [])
        if not events:
            formatter.print_info("No events found for this job")
            return

        print(f"\n Events for {job_name} ({result.get('count', 0)} total)")
        print(" " + "-" * 70)
        for evt in events:
            # Missing fields are treated as empty; long text is truncated
            # to keep the table readable.
            evt_type = evt.get("type") or "Normal"
            reason = (evt.get("reason") or "")[:20]
            message = (evt.get("message") or "")[:50]
            timestamp = (evt.get("lastTimestamp") or evt.get("firstTimestamp") or "")[:19]
            marker = "⚠" if evt_type == "Warning" else "✓"
            print(f" {marker} [{timestamp}] {reason:<20} {message}")

    except Exception as exc:
        formatter.print_error(f"Failed to get job events: {exc}")
        sys.exit(1)

585 

586 

@jobs.command("pods")
@click.argument("job_name")
@click.option("--namespace", "-n", default="gco-jobs", help="Job namespace")
@click.option("--region", "-r", required=True, help="Job region (required)")
@pass_config
def get_job_pods(config: Any, job_name: Any, namespace: Any, region: Any) -> None:
    """Get pod details for a job.

    Shows all pods created by the job with their status, node placement,
    and container information.

    Examples:
        gco jobs pods my-job -r us-east-1
        gco jobs pods training-job -n ml-jobs -r us-west-2
    """
    formatter = get_output_formatter(config)
    job_manager = get_job_manager(config)

    try:
        result = job_manager.get_job_pods(job_name, namespace, region)

        if config.output_format != "table":
            formatter.print(result)
            return

        pods = result.get("pods", [])
        if not pods:
            formatter.print_info("No pods found for this job")
            return

        print(f"\n Pods for {job_name} ({result.get('count', 0)} total)")
        print(" " + "-" * 80)
        print(
            " NAME NODE STATUS RESTARTS"
        )
        print(" " + "-" * 80)
        for pod in pods:
            meta = pod.get("metadata", {})
            spec = pod.get("spec", {})
            pod_status = pod.get("status", {})
            name = (meta.get("name") or "")[:40]
            node = (spec.get("nodeName") or "")[:22]
            phase = (pod_status.get("phase") or "Unknown")[:10]
            # Total restarts across all containers in the pod.
            restarts = sum(
                c.get("restartCount", 0)
                for c in (pod_status.get("containerStatuses") or [])
            )
            print(f" {name:<40} {node:<23} {phase:<10} {restarts}")

    except Exception as exc:
        formatter.print_error(f"Failed to get job pods: {exc}")
        sys.exit(1)

635 

636 

@jobs.command("pod-logs")
@click.argument("job_name")
@click.argument("pod_name")
@click.option("--namespace", "-n", default="gco-jobs", help="Job namespace")
@click.option("--region", "-r", required=True, help="Job region (required)")
@click.option("--tail", "-t", default=100, help="Number of lines to show")
@click.option("--container", "-c", help="Container name (for multi-container pods)")
@pass_config
def get_pod_logs_cmd(
    config: Any,
    job_name: Any,
    pod_name: Any,
    namespace: Any,
    region: Any,
    tail: Any,
    container: Any,
) -> None:
    """Get logs from a specific pod of a job.

    Use 'gco jobs pods' first to list available pods, then use this
    command to get logs from a specific pod.

    Examples:
        gco jobs pod-logs my-job my-job-abc123 -r us-east-1
        gco jobs pod-logs training-job training-job-xyz789 -r us-west-2 --tail 500
        gco jobs pod-logs multi-job multi-job-pod1 -r us-east-1 --container sidecar
    """
    formatter = get_output_formatter(config)
    job_manager = get_job_manager(config)

    try:
        response = job_manager.get_pod_logs(
            job_name=job_name,
            pod_name=pod_name,
            namespace=namespace,
            region=region,
            tail_lines=tail,
            container=container,
        )

        # Emit the raw log text; note when nothing came back.
        log_text = response.get("logs", "")
        if log_text:
            print(log_text)
        else:
            formatter.print_info("No logs available")

    except Exception as exc:
        formatter.print_error(f"Failed to get pod logs: {exc}")
        sys.exit(1)

687 

688 

@jobs.command("metrics")
@click.argument("job_name")
@click.option("--namespace", "-n", default="gco-jobs", help="Job namespace")
@click.option("--region", "-r", required=True, help="Job region (required)")
@pass_config
def get_job_metrics(config: Any, job_name: Any, namespace: Any, region: Any) -> None:
    """Get resource usage metrics for a job.

    Shows CPU and memory usage for all pods in the job. Requires
    metrics-server to be installed in the cluster.

    Examples:
        gco jobs metrics my-job --region us-east-1
        gco jobs metrics training-job -n ml-jobs -r us-west-2
    """
    formatter = get_output_formatter(config)
    job_manager = get_job_manager(config)

    try:
        result = job_manager.get_job_metrics(job_name, namespace, region)

        if config.output_format != "table":
            formatter.print(result)
            return

        summary = result.get("summary", {})
        pods = result.get("pods", [])

        # Job-wide totals first.
        print(f"\n Resource Metrics for {job_name}")
        print(" " + "-" * 50)
        print(f" Total CPU: {summary.get('total_cpu_millicores', 0)}m")
        print(f" Total Memory: {summary.get('total_memory_mib', 0):.1f} MiB")
        print(f" Pod Count: {summary.get('pod_count', 0)}")

        # Then a per-pod breakdown, summing across containers.
        if pods:
            print("\n POD CPU(m) MEMORY(MiB)")
            print(" " + "-" * 65)
            for pod in pods:
                containers = pod.get("containers", [])
                pod_name = pod.get("pod_name", "")[:40]
                cpu_total = sum(c.get("cpu_millicores", 0) for c in containers)
                mem_total = sum(c.get("memory_mib", 0) for c in containers)
                print(f" {pod_name:<40} {cpu_total:>6} {mem_total:>10.1f}")

    except Exception as exc:
        formatter.print_error(f"Failed to get job metrics: {exc}")
        sys.exit(1)

734 

735 

@jobs.command("retry")
@click.argument("job_name")
@click.option("--namespace", "-n", default="gco-jobs", help="Job namespace")
@click.option("--region", "-r", required=True, help="Job region (required)")
@click.option("--yes", "-y", is_flag=True, help="Skip confirmation")
@pass_config
def retry_job(config: Any, job_name: Any, namespace: Any, region: Any, yes: Any) -> None:
    """Retry a failed job.

    Creates a new job from the failed job's spec with a new name.
    The original job is preserved for debugging.

    Examples:
        gco jobs retry failed-job --region us-east-1
        gco jobs retry training-job -n ml-jobs -r us-west-2 -y
    """
    formatter = get_output_formatter(config)
    job_manager = get_job_manager(config)

    # Interactive safety prompt unless -y/--yes was supplied.
    if not yes:
        click.confirm(f"Retry job {job_name} in namespace {namespace} ({region})?", abort=True)

    try:
        result = job_manager.retry_job(job_name, namespace, region)

        # Guard clause: a non-success response exits immediately
        # (sys.exit raises SystemExit, which the except below ignores).
        if not result.get("success"):
            formatter.print_error(f"Failed to retry job: {result.get('message')}")
            sys.exit(1)

        formatter.print_success(f"Job retry created: {result.get('new_job')}")
        formatter.print(result)

    except Exception as exc:
        formatter.print_error(f"Failed to retry job: {exc}")
        sys.exit(1)

772 

773 

@jobs.command("bulk-delete")
@click.option("--namespace", "-n", help="Filter by namespace")
@click.option("--status", "-s", type=click.Choice(["completed", "succeeded", "failed"]))
@click.option("--older-than-days", "-d", type=int, help="Delete jobs older than N days")
@click.option("--label-selector", "-l", help="Kubernetes label selector")
@click.option("--region", "-r", help="Target region (required unless --all-regions)")
@click.option("--all-regions", "-a", is_flag=True, help="Delete across all regions")
@click.option("--dry-run", is_flag=True, default=True, help="Only show what would be deleted")
@click.option("--execute", is_flag=True, help="Actually delete (disables dry-run)")
@click.option("--yes", "-y", is_flag=True, help="Skip confirmation")
@pass_config
def bulk_delete_jobs(
    config: Any,
    namespace: Any,
    status: Any,
    older_than_days: Any,
    label_selector: Any,
    region: Any,
    all_regions: Any,
    dry_run: Any,
    execute: Any,
    yes: Any,
) -> None:
    """Bulk delete jobs based on filters.

    You must specify either --region for a specific cluster or --all-regions
    to delete across all clusters.

    By default runs in dry-run mode. Use --execute to actually delete.

    Examples:
        gco jobs bulk-delete --region us-east-1 --status completed --older-than-days 7
        gco jobs bulk-delete -r us-west-2 -n gco-jobs -s failed --execute -y
        gco jobs bulk-delete --all-regions --status failed --older-than-days 30 --execute
    """
    formatter = get_output_formatter(config)
    job_manager = get_job_manager(config)

    # A target scope is mandatory: one explicit region, or all of them.
    if not region and not all_regions:
        formatter.print_error("You must specify --region or --all-regions")
        formatter.print_info(" Use --region/-r to delete from a specific cluster")
        formatter.print_info(" Use --all-regions/-a to delete across all clusters")
        sys.exit(1)

    # --execute overrides the dry-run default.
    dry_run = dry_run and not execute

    # A real (non-dry-run) delete requires confirmation unless -y/--yes.
    if not dry_run and not yes:
        scope = f"region {region}" if region else "ALL regions"
        click.confirm(
            f"This will permanently delete matching jobs in {scope}. Continue?", abort=True
        )

    try:
        if region:
            # Scoped to a single cluster.
            result = job_manager.bulk_delete_jobs(
                namespace=namespace,
                status=status,
                older_than_days=older_than_days,
                label_selector=label_selector,
                region=region,
                dry_run=dry_run,
            )
        else:
            # Fan out across every region.
            result = job_manager.bulk_delete_global(
                namespace=namespace,
                status=status,
                older_than_days=older_than_days,
                dry_run=dry_run,
            )

        if dry_run:
            formatter.print_info("DRY RUN - No jobs were deleted")
            formatter.print_info(f"Would delete {result.get('total_matched', 0)} jobs")
        else:
            formatter.print_success(
                f"Deleted {result.get('deleted_count', result.get('total_deleted', 0))} jobs"
            )

        formatter.print(result)

    except Exception as exc:
        formatter.print_error(f"Failed to bulk delete jobs: {exc}")
        sys.exit(1)

862 

863 

@jobs.command("health")
@click.option("--region", "-r", help="Target region (required unless --all-regions)")
@click.option("--all-regions", "-a", is_flag=True, help="Get health across all regions")
@pass_config
def job_health(config: Any, region: Any, all_regions: Any) -> None:
    """Get health status of GCO clusters.

    You must specify either --region for a specific cluster or --all-regions
    to get health status across all clusters.

    Examples:
        gco jobs health --region us-east-1
        gco jobs health --all-regions
    """
    formatter = get_output_formatter(config)
    job_manager = get_job_manager(config)

    # An explicit scope is mandatory; refuse to guess which cluster to probe.
    if not (region or all_regions):
        formatter.print_error("You must specify --region or --all-regions")
        formatter.print_info(" Use --region/-r to check a specific cluster")
        formatter.print_info(" Use --all-regions/-a to check all clusters")
        sys.exit(1)

    try:
        if not all_regions:
            # Single-region probe hits the regional health API directly.
            # NOTE(review): reaches into the manager's private _aws_client —
            # consider exposing a public health accessor on the job manager.
            formatter.print(job_manager._aws_client.get_health(region=region))
            return

        result = job_manager.get_global_health()

        # Non-table formats (json/yaml/...) get the raw payload.
        if config.output_format != "table":
            formatter.print(result)
            return

        overall = result.get("overall_status", "unknown").upper()
        divider = " " + "-" * 50
        print(f"\n Global Health Status: {overall}")
        print(divider)
        healthy = result.get("healthy_regions", 0)
        total = result.get("total_regions", 0)
        print(f" Healthy regions: {healthy}/{total}")

        region_rows = result.get("regions", [])
        if region_rows:
            print("\n REGION STATUS CLUSTER")
            print(divider)
            for row in region_rows:
                icon = "✓" if row.get("status") == "healthy" else "✗"
                print(
                    f" {icon} {row.get('region', ''):<13} {row.get('status', ''):<12} {row.get('cluster_id', '')}"
                )

    except Exception as e:
        formatter.print_error(f"Failed to get health status: {e}")
        sys.exit(1)

920 

921 

@jobs.command("submit-queue")
@click.argument("manifest_path", type=click.Path(exists=True))
@click.option("--region", "-r", required=True, help="Target region for job execution")
@click.option("--namespace", "-n", default="gco-jobs", help="Kubernetes namespace")
@click.option("--priority", "-p", default=0, help="Job priority (0-100, higher = more important)")
@click.option("--label", "-l", multiple=True, help="Add labels (key=value)")
@pass_config
def submit_job_queue(
    config: Any, manifest_path: Any, region: Any, namespace: Any, priority: Any, label: Any
) -> None:
    """Submit a job to the global DynamoDB queue for regional pickup.

    Jobs are stored in DynamoDB and picked up by the target region's
    manifest processor. This enables global job submission with
    centralized tracking and status history.

    This is different from submit-sqs which uses regional SQS queues.
    The DynamoDB queue provides:
    - Global visibility of all queued jobs
    - Status tracking and history
    - Priority-based scheduling
    - Cross-region job management

    Use 'gco queue list' to view queued jobs and their status.

    Examples:
        gco jobs submit-queue job.yaml --region us-east-1
        gco jobs submit-queue job.yaml -r us-west-2 --priority 50
        gco jobs submit-queue job.yaml -r us-east-1 -l team=ml -l project=training
    """

    from gco.services.manifest_processor import safe_load_yaml

    formatter = get_output_formatter(config)

    # Parse --label values into a dict. Fail fast on a malformed entry:
    # previously a value without "=" (e.g. "-l team") was silently dropped,
    # so a typo would lose the label with no feedback.
    labels: dict[str, str] = {}
    for lbl in label:
        if "=" not in lbl:
            formatter.print_error(f"Invalid label '{lbl}': expected key=value")
            sys.exit(1)
        k, v = lbl.split("=", 1)
        labels[k] = v

    try:
        # Load the manifest; allow_aliases=False rejects YAML aliases in the
        # user-supplied file (presumably a guard against alias-expansion
        # bombs — confirm in manifest_processor).
        with open(manifest_path, encoding="utf-8") as f:
            manifest = safe_load_yaml(f, allow_aliases=False)

        # Submit via the regional API, which persists the job into the
        # global DynamoDB queue for pickup.
        from ..aws_client import get_aws_client

        aws_client = get_aws_client(config)

        result = aws_client.call_api(
            method="POST",
            path="/api/v1/queue/jobs",
            region=region,
            body={
                "manifest": manifest,
                "target_region": region,
                "namespace": namespace,
                "priority": priority,
                # Omit the key entirely when no labels were supplied.
                "labels": labels if labels else None,
            },
        )

        formatter.print_success(f"Job queued for {region}")
        formatter.print_info("Use 'gco queue list' or 'gco queue get <job_id>' to track status")
        formatter.print(result)

    except Exception as e:
        formatter.print_error(f"Failed to queue job: {e}")
        sys.exit(1)