Coverage for cli / jobs.py: 94%

386 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 21:47 +0000

1""" 

2Job management for GCO CLI. 

3 

4Provides functionality to submit, query, and manage jobs across GCO clusters. 

5""" 

6 

import logging
from collections.abc import Callable
from dataclasses import dataclass, field
from datetime import UTC, datetime
from pathlib import Path
from typing import Any

import yaml

from gco.services.manifest_processor import safe_load_all_yaml

from .aws_client import get_aws_client
from .config import GCOConfig, get_config

19 

# Module-level logger named after this module, per stdlib logging convention.
# (Replaces the obscure ``__import__("logging")`` one-liner with a plain import.)
logger = logging.getLogger(__name__)

21 

22 

23def _format_duration(seconds: int) -> str: 

24 """Format seconds into a human-readable duration string.""" 

25 if seconds < 60: 

26 return f"{seconds}s" 

27 minutes, secs = divmod(seconds, 60) 

28 if minutes < 60: 

29 return f"{minutes}m{secs:02d}s" 

30 hours, mins = divmod(minutes, 60) 

31 return f"{hours}h{mins:02d}m{secs:02d}s" 

32 

33 

34def _first_manifest_namespace(manifests: list[dict[str, Any]]) -> str | None: 

35 """Return the first explicit ``metadata.namespace`` found in a manifest list. 

36 

37 Used by the SQS submission path to populate the envelope ``namespace`` 

38 field (informational — the queue processor reads each manifest's own 

39 namespace for validation). Returns None if no manifest declares one. 

40 """ 

41 for manifest in manifests: 

42 ns = manifest.get("metadata", {}).get("namespace") if isinstance(manifest, dict) else None 

43 if ns: 43 ↛ 44line 43 didn't jump to line 44 because the condition on line 43 was never true

44 return str(ns) 

45 return None 

46 

47 

48@dataclass 

49class JobInfo: 

50 """Information about a Kubernetes job.""" 

51 

52 name: str 

53 namespace: str 

54 region: str 

55 status: str # "pending", "running", "succeeded", "failed" 

56 created_time: datetime | None = None 

57 start_time: datetime | None = None 

58 completion_time: datetime | None = None 

59 active_pods: int = 0 

60 succeeded_pods: int = 0 

61 failed_pods: int = 0 

62 parallelism: int = 1 

63 completions: int = 1 

64 labels: dict[str, str] = field(default_factory=dict) 

65 

66 @property 

67 def is_complete(self) -> bool: 

68 return self.status in ("succeeded", "failed") 

69 

70 @property 

71 def duration_seconds(self) -> int | None: 

72 if self.start_time and self.completion_time: 

73 return int((self.completion_time - self.start_time).total_seconds()) 

74 if self.start_time: 

75 return int((datetime.now(UTC) - self.start_time).total_seconds()) 

76 return None 

77 

78 

79class JobManager: 

80 """ 

81 Manages jobs across GCO clusters. 

82 

83 Provides: 

84 - Job submission with region targeting 

85 - Job status queries across regions 

86 - Job logs retrieval 

87 - Job deletion 

88 """ 

89 

90 def __init__(self, config: GCOConfig | None = None): 

91 self.config = config or get_config() 

92 self._aws_client = get_aws_client(config) 

93 # Configure regional API mode if enabled 

94 if hasattr(self.config, "use_regional_api") and self.config.use_regional_api: 

95 self._aws_client.set_use_regional_api(True) 

96 

97 def load_manifests(self, path: str) -> list[dict[str, Any]]: 

98 """ 

99 Load Kubernetes manifests from a file or directory. 

100 

101 Args: 

102 path: Path to YAML file or directory containing YAML files 

103 

104 Returns: 

105 List of manifest dictionaries 

106 """ 

107 manifests = [] 

108 path_obj = Path(path) 

109 

110 if path_obj.is_file(): 

111 manifests.extend(self._load_yaml_file(path_obj)) 

112 elif path_obj.is_dir(): 

113 for yaml_file in sorted(path_obj.glob("*.yaml")): 

114 manifests.extend(self._load_yaml_file(yaml_file)) 

115 for yaml_file in sorted(path_obj.glob("*.yml")): 

116 manifests.extend(self._load_yaml_file(yaml_file)) 

117 else: 

118 raise FileNotFoundError(f"Path not found: {path}") 

119 

120 return manifests 

121 

122 def _load_yaml_file(self, path: Path) -> list[dict[str, Any]]: 

123 """Load manifests from a single YAML file.""" 

124 with open(path, encoding="utf-8") as f: 

125 return safe_load_all_yaml(f, allow_aliases=False) 

126 

127 def submit_job( 

128 self, 

129 manifests: str | list[dict[str, Any]], 

130 namespace: str | None = None, 

131 target_region: str | None = None, 

132 dry_run: bool = False, 

133 labels: dict[str, str] | None = None, 

134 ) -> dict[str, Any]: 

135 """ 

136 Submit a job to GCO. 

137 

138 Args: 

139 manifests: Path to manifest file/directory or list of manifest dicts 

140 namespace: Fallback namespace for manifests that don't declare 

141 their own. When set, each manifest's ``metadata.namespace`` is 

142 filled in only if missing — existing values are preserved so 

143 users who've declared a target namespace in the manifest can 

144 rely on it reaching the server untouched. Server-side 

145 validation enforces the allowlist. 

146 target_region: Force job to specific region 

147 dry_run: Validate without applying 

148 labels: Additional labels to add to manifests 

149 

150 Returns: 

151 Submission result dictionary 

152 """ 

153 # Load manifests if path provided 

154 manifest_list = self.load_manifests(manifests) if isinstance(manifests, str) else manifests 

155 

156 # Apply namespace as a fallback only — preserve any namespace the 

157 # manifest declared itself. 

158 if namespace: 

159 for manifest in manifest_list: 

160 if "metadata" not in manifest: 

161 manifest["metadata"] = {} 

162 manifest["metadata"].setdefault("namespace", namespace) 

163 

164 # Apply additional labels 

165 if labels: 

166 for manifest in manifest_list: 

167 if "metadata" not in manifest: 

168 manifest["metadata"] = {} 

169 if "labels" not in manifest["metadata"]: 169 ↛ 171line 169 didn't jump to line 171 because the condition on line 169 was always true

170 manifest["metadata"]["labels"] = {} 

171 manifest["metadata"]["labels"].update(labels) 

172 

173 # Submit via API 

174 return self._aws_client.submit_manifests( 

175 manifests=manifest_list, 

176 namespace=namespace, 

177 target_region=target_region, 

178 dry_run=dry_run, 

179 ) 

180 

    def submit_job_direct(
        self,
        manifests: str | list[dict[str, Any]],
        region: str,
        namespace: str | None = None,
        dry_run: bool = False,
        labels: dict[str, str] | None = None,
    ) -> dict[str, Any]:
        """
        Submit a job directly to a regional cluster using kubectl.

        This bypasses the API Gateway and submits directly to the EKS cluster
        using kubectl. Requires:
        - kubectl installed and in PATH
        - EKS access entry configured for your IAM principal
        - AWS credentials with eks:DescribeCluster permission

        Args:
            manifests: Path to manifest file/directory or list of manifest dicts
            region: Target region for direct submission (required)
            namespace: Fallback namespace for manifests that don't declare
                their own. When set, each manifest's ``metadata.namespace`` is
                filled in only if missing — existing values are preserved so
                users who've declared a target namespace in the manifest can
                rely on it reaching ``kubectl apply`` untouched.
            dry_run: Validate without applying
            labels: Additional labels to add to manifests

        Returns:
            Submission result dictionary (includes a "warnings" key when an
            active Job collision forced a rename)

        Raises:
            ValueError: If no GCO stack exists in *region*.
            RuntimeError: If ``kubectl apply`` exits non-zero.
        """
        import subprocess
        import tempfile
        import uuid

        # Load manifests if path provided
        manifest_list = self.load_manifests(manifests) if isinstance(manifests, str) else manifests

        # Apply namespace as a fallback only — preserve any namespace the
        # manifest declared itself.
        if namespace:
            for manifest in manifest_list:
                if "metadata" not in manifest:
                    manifest["metadata"] = {}
                manifest["metadata"].setdefault("namespace", namespace)

        # Apply additional labels
        if labels:
            for manifest in manifest_list:
                if "metadata" not in manifest:
                    manifest["metadata"] = {}
                if "labels" not in manifest["metadata"]:
                    manifest["metadata"]["labels"] = {}
                manifest["metadata"]["labels"].update(labels)

        # Get cluster name from stack
        stack = self._aws_client.get_regional_stack(region)
        if not stack:
            raise ValueError(f"No GCO stack found in region {region}")

        cluster_name = stack.cluster_name

        # Update kubeconfig for the cluster
        from .kubectl_helpers import update_kubeconfig

        update_kubeconfig(cluster_name, region)

        # Handle existing Job resources before applying: Jobs are immutable,
        # so a name collision with an existing Job would make apply fail.
        warnings: list[str] = []
        if not dry_run:
            for manifest in manifest_list:
                if manifest.get("kind") != "Job":
                    continue
                job_name = manifest.get("metadata", {}).get("name")
                job_ns = manifest.get("metadata", {}).get("namespace", namespace or "default")
                if not job_name:
                    continue

                existing_status = self._get_kubectl_job_status(job_name, job_ns)
                if existing_status is None:
                    # No existing job — nothing to do
                    continue

                if existing_status in ("complete", "failed"):
                    # Finished job — safe to delete and replace
                    # (best-effort: the delete's exit status is not checked).
                    subprocess.run(
                        ["kubectl", "delete", "job", job_name, "-n", job_ns],
                        capture_output=True,
                        text=True,
                    )
                else:
                    # Job is still active — auto-rename to avoid collision
                    suffix = uuid.uuid4().hex[:5]
                    new_name = f"{job_name}-{suffix}"
                    original_name = job_name
                    manifest["metadata"]["name"] = new_name
                    warnings.append(
                        f"Job '{original_name}' is still running in namespace "
                        f"'{job_ns}'. Renamed new submission to '{new_name}'."
                    )
                    logger.warning(
                        "Job %s is active in %s, renamed to %s",
                        original_name,
                        job_ns,
                        new_name,
                    )

        # Write manifests to temp file (delete=False: kubectl reads it after
        # the ``with`` block closes the handle; removed in the finally below).
        with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f:
            yaml.dump_all(manifest_list, f)
            f.flush()  # Ensure content is written before using f.name
            temp_path = f.name  # nosemgrep: tempfile-without-flush

        try:
            # Build kubectl command
            kubectl_cmd = ["kubectl", "apply", "-f", temp_path]

            if dry_run:
                kubectl_cmd.extend(["--dry-run=client"])

            # Run kubectl apply
            result = subprocess.run(
                kubectl_cmd, capture_output=True, text=True
            )  # nosemgrep: dangerous-subprocess-use-audit - kubectl_cmd is a list ["kubectl","apply","-f",temp_path]; temp_path is a secure tempfile, not user input

            if result.returncode != 0:
                raise RuntimeError(f"kubectl apply failed: {result.stderr}")

            # Parse output to get job name
            output_lines = result.stdout.strip().split("\n")
            created_resources = []
            for line in output_lines:
                if line:
                    created_resources.append(line)

            # Get job name from first manifest (may have been renamed)
            job_name = None
            for manifest in manifest_list:
                if manifest.get("kind") == "Job":
                    job_name = manifest.get("metadata", {}).get("name")
                    break

            response: dict[str, Any] = {
                "status": "success",
                "method": "kubectl",
                "cluster": cluster_name,
                "region": region,
                "namespace": namespace or "default",
                "job_name": job_name,
                "dry_run": dry_run,
                "resources": created_resources,
                "output": result.stdout,
            }
            if warnings:
                response["warnings"] = warnings
            return response

        finally:
            # Clean up temp file
            import os

            os.unlink(temp_path)

343 

344 def _get_kubectl_job_status(self, job_name: str, namespace: str) -> str | None: 

345 """Check the status of an existing Job via kubectl. 

346 

347 Returns: 

348 "complete", "failed", "active", or None if the job doesn't exist. 

349 """ 

350 import json 

351 import subprocess 

352 

353 result = subprocess.run( 

354 [ 

355 "kubectl", 

356 "get", 

357 "job", 

358 job_name, 

359 "-n", 

360 namespace, 

361 "-o", 

362 "json", 

363 ], 

364 capture_output=True, 

365 text=True, 

366 ) 

367 if result.returncode != 0: 

368 return None # Job doesn't exist 

369 

370 try: 

371 job_data = json.loads(result.stdout) 

372 except json.JSONDecodeError, KeyError: 

373 return None 

374 

375 conditions = job_data.get("status", {}).get("conditions") or [] 

376 for condition in conditions: 

377 cond_type = condition.get("type", "") 

378 cond_status = condition.get("status", "") 

379 if cond_type == "Complete" and cond_status == "True": 

380 return "complete" 

381 if cond_type == "Failed" and cond_status == "True": 

382 return "failed" 

383 return "active" 

384 

385 def list_jobs( 

386 self, 

387 region: str | None = None, 

388 namespace: str | None = None, 

389 status: str | None = None, 

390 all_regions: bool = False, 

391 ) -> list[JobInfo]: 

392 """ 

393 List jobs across GCO clusters. 

394 

395 Args: 

396 region: Specific region to query 

397 namespace: Filter by namespace 

398 status: Filter by status 

399 all_regions: Query all discovered regions 

400 

401 Returns: 

402 List of JobInfo objects 

403 """ 

404 jobs = [] 

405 

406 if all_regions: 

407 # Query all discovered regional stacks 

408 stacks = self._aws_client.discover_regional_stacks() 

409 for stack_region in stacks: 

410 try: 

411 region_jobs = self._query_jobs_in_region(stack_region, namespace, status) 

412 jobs.extend(region_jobs) 

413 except Exception as e: 

414 logger.warning("Failed to query jobs in %s: %s", stack_region, e) 

415 continue 

416 elif region: 

417 jobs = self._query_jobs_in_region(region, namespace, status) 

418 else: 

419 # Use default region 

420 jobs = self._query_jobs_in_region(self.config.default_region, namespace, status) 

421 

422 return jobs 

423 

424 def _query_jobs_in_region( 

425 self, region: str, namespace: str | None, status: str | None 

426 ) -> list[JobInfo]: 

427 """Query jobs in a specific region.""" 

428 try: 

429 response = self._aws_client.get_jobs(region=region, namespace=namespace, status=status) 

430 

431 jobs = [] 

432 # response is a list, but we expect a dict with "jobs" key from the API 

433 job_list = response.get("jobs", []) if isinstance(response, dict) else response 

434 for job_data in job_list: 

435 jobs.append(self._parse_job_info(job_data, region)) 

436 

437 return jobs 

438 except Exception as exc: 

439 logger.warning("Failed to query jobs in %s: %s", region, exc) 

440 return [] 

441 

442 def _parse_job_info(self, job_data: dict[str, Any], region: str) -> JobInfo: 

443 """Parse job data into JobInfo object.""" 

444 metadata = job_data.get("metadata", {}) 

445 status_data = job_data.get("status", {}) 

446 spec = job_data.get("spec", {}) 

447 

448 # Determine job status 

449 conditions = status_data.get("conditions", []) 

450 job_status = "pending" 

451 for condition in conditions: 

452 if condition.get("type") == "Complete" and condition.get("status") == "True": 

453 job_status = "succeeded" 

454 break 

455 if condition.get("type") == "Failed" and condition.get("status") == "True": 455 ↛ 451line 455 didn't jump to line 451 because the condition on line 455 was always true

456 job_status = "failed" 

457 break 

458 

459 if job_status == "pending" and status_data.get("active", 0) > 0: 

460 job_status = "running" 

461 

462 # Parse timestamps 

463 created_time = None 

464 if metadata.get("creationTimestamp"): 

465 created_time = datetime.fromisoformat( 

466 metadata["creationTimestamp"].replace("Z", "+00:00") 

467 ) 

468 

469 start_time = None 

470 if status_data.get("startTime"): 

471 start_time = datetime.fromisoformat(status_data["startTime"].replace("Z", "+00:00")) 

472 

473 completion_time = None 

474 if status_data.get("completionTime"): 

475 completion_time = datetime.fromisoformat( 

476 status_data["completionTime"].replace("Z", "+00:00") 

477 ) 

478 

479 return JobInfo( 

480 name=metadata.get("name", ""), 

481 namespace=metadata.get("namespace", "default"), 

482 region=region, 

483 status=job_status, 

484 created_time=created_time, 

485 start_time=start_time, 

486 completion_time=completion_time, 

487 active_pods=status_data.get("active", 0), 

488 succeeded_pods=status_data.get("succeeded", 0), 

489 failed_pods=status_data.get("failed", 0), 

490 parallelism=spec.get("parallelism", 1), 

491 completions=spec.get("completions", 1), 

492 labels=metadata.get("labels", {}), 

493 ) 

494 

495 def get_job(self, job_name: str, namespace: str, region: str | None = None) -> JobInfo | None: 

496 """ 

497 Get detailed information about a specific job. 

498 

499 Args: 

500 job_name: Name of the job 

501 namespace: Namespace of the job 

502 region: Region where the job is running 

503 

504 Returns: 

505 JobInfo or None if not found 

506 """ 

507 try: 

508 response = self._aws_client.get_job_details( 

509 job_name=job_name, namespace=namespace, region=region or self.config.default_region 

510 ) 

511 return self._parse_job_info(response, region or self.config.default_region) 

512 except Exception as e: 

513 logger.debug("Failed to get job details for %s: %s", job_name, e) 

514 return None 

515 

516 def get_job_logs( 

517 self, 

518 job_name: str, 

519 namespace: str, 

520 region: str | None = None, 

521 tail_lines: int = 100, 

522 follow: bool = False, 

523 since_hours: int = 24, 

524 ) -> str: 

525 """ 

526 Get logs from a job. 

527 

528 Tries the Kubernetes API first (via the GCO API). If the pod is no 

529 longer available (completed/deleted), falls back to CloudWatch Logs 

530 where Container Insights stores application logs. 

531 

532 Args: 

533 job_name: Name of the job 

534 namespace: Namespace of the job 

535 region: Region where the job is running 

536 tail_lines: Number of lines to return 

537 follow: Stream logs (not implemented yet) 

538 since_hours: Hours to look back in CloudWatch (default 24) 

539 

540 Returns: 

541 Log content as string 

542 """ 

543 if follow: 

544 raise NotImplementedError("Log streaming not yet implemented") 

545 

546 target_region = region or self.config.default_region 

547 

548 try: 

549 return self._aws_client.get_job_logs( 

550 job_name=job_name, 

551 namespace=namespace, 

552 region=target_region, 

553 tail_lines=tail_lines, 

554 ) 

555 except RuntimeError as e: 

556 error_msg = str(e) 

557 # If the pod is gone or pending, try CloudWatch 

558 if any( 

559 hint in error_msg.lower() 

560 for hint in ["not found", "pending", "no pods", "terminated", "completed"] 

561 ): 

562 logger.info("Pod not available, falling back to CloudWatch Logs") 

563 try: 

564 return self._get_cloudwatch_logs( 

565 job_name=job_name, 

566 region=target_region, 

567 tail_lines=tail_lines, 

568 since_hours=since_hours, 

569 ) 

570 except Exception as cw_err: 

571 logger.debug("CloudWatch fallback failed: %s", cw_err) 

572 raise RuntimeError( 

573 f"{error_msg}\n\n" 

574 f"CloudWatch Logs fallback also failed: {cw_err}\n" 

575 f"Tip: Container logs appear in CloudWatch within a few minutes. " 

576 f"If the job just finished, try again shortly." 

577 ) from e 

578 raise 

579 

580 def _get_cloudwatch_logs( 

581 self, 

582 job_name: str, 

583 region: str, 

584 tail_lines: int = 100, 

585 since_hours: int = 24, 

586 ) -> str: 

587 """ 

588 Fetch job logs from CloudWatch Logs (Container Insights). 

589 

590 The CloudWatch Observability addon ships container stdout/stderr to: 

591 /aws/containerinsights/{cluster_name}/application 

592 

593 Args: 

594 job_name: Name of the job (used to filter log streams) 

595 region: AWS region 

596 tail_lines: Number of log lines to return 

597 since_hours: Hours to look back (default 24) 

598 

599 Returns: 

600 Log content as string 

601 """ 

602 cluster_name = f"{self.config.project_name}-{region}" 

603 log_group = f"/aws/containerinsights/{cluster_name}/application" 

604 

605 logs_client = self._aws_client._session.client("logs", region_name=region) 

606 

607 import time 

608 

609 now = int(time.time()) 

610 start_time = now - (since_hours * 3600) 

611 

612 query = ( 

613 f"fields @timestamp, @message " 

614 f'| filter @logStream like "{job_name}" ' 

615 f"| sort @timestamp asc " 

616 f"| limit {tail_lines}" 

617 ) 

618 

619 start_query = logs_client.start_query( 

620 logGroupName=log_group, 

621 startTime=start_time, 

622 endTime=now, 

623 queryString=query, 

624 ) 

625 query_id = start_query["queryId"] 

626 

627 # Poll for results (CloudWatch Insights is async) 

628 result = None 

629 for _ in range(30): # up to 30 seconds 

630 time.sleep(1) 

631 result = logs_client.get_query_results(queryId=query_id) 

632 if result["status"] in ("Complete", "Failed", "Cancelled"): 

633 break 

634 

635 if result is None or result["status"] != "Complete": 

636 status = result["status"] if result else "unknown" 

637 raise RuntimeError( 

638 f"CloudWatch Logs query did not complete (status: {status}). Try again in a moment." 

639 ) 

640 

641 if not result["results"]: 

642 raise RuntimeError( 

643 f"No logs found in CloudWatch for job '{job_name}' " 

644 f"in the last {since_hours} hours (log group: {log_group}). " 

645 f"Logs may take 1-2 minutes to appear after a pod runs. " 

646 f"Use --since to search further back, or check the job name " 

647 f"with: gco jobs list -r {region}" 

648 ) 

649 

650 # Extract log messages from results. 

651 # CloudWatch Container Insights wraps logs in a JSON envelope: 

652 # {"time":"...","stream":"stdout","log":"actual message","kubernetes":{...}} 

653 # We parse out the "log" field for clean output, falling back to the 

654 # raw message if it's not JSON. 

655 import json as _json 

656 

657 lines = [] 

658 for row in result["results"]: 

659 for entry in row: 659 ↛ 658line 659 didn't jump to line 658 because the loop on line 659 didn't complete

660 if entry["field"] == "@message": 

661 raw = entry["value"].rstrip() 

662 try: 

663 parsed = _json.loads(raw) 

664 lines.append(parsed.get("log", raw).rstrip()) 

665 except ValueError, TypeError: 

666 lines.append(raw) 

667 break 

668 

669 header = f"[CloudWatch Logs — {log_group}]\n" 

670 return header + "\n".join(lines) 

671 

672 def delete_job( 

673 self, job_name: str, namespace: str, region: str | None = None 

674 ) -> dict[str, Any]: 

675 """ 

676 Delete a job. 

677 

678 Args: 

679 job_name: Name of the job 

680 namespace: Namespace of the job 

681 region: Region where the job is running 

682 

683 Returns: 

684 Deletion result 

685 """ 

686 return self._aws_client.delete_job( 

687 job_name=job_name, namespace=namespace, region=region or self.config.default_region 

688 ) 

689 

690 def wait_for_job( 

691 self, 

692 job_name: str, 

693 namespace: str, 

694 region: str | None = None, 

695 timeout_seconds: int = 3600, 

696 poll_interval: int = 10, 

697 progress_callback: Callable[[JobInfo, int], None] | None = None, 

698 ) -> JobInfo: 

699 """ 

700 Wait for a job to complete with progress reporting. 

701 

702 Args: 

703 job_name: Name of the job 

704 namespace: Namespace of the job 

705 region: Region where the job is running 

706 timeout_seconds: Maximum time to wait 

707 poll_interval: Seconds between status checks 

708 progress_callback: Optional callable(JobInfo, elapsed_seconds) for progress updates. 

709 If None, a default stderr progress line is printed. 

710 

711 Returns: 

712 Final JobInfo 

713 

714 Raises: 

715 TimeoutError: If job doesn't complete within timeout 

716 """ 

717 import sys 

718 import time 

719 

720 start_time = time.time() 

721 

722 while True: 

723 job = self.get_job(job_name, namespace, region) 

724 

725 if job is None: 

726 raise ValueError(f"Job {job_name} not found in namespace {namespace}") 

727 

728 elapsed = time.time() - start_time 

729 elapsed_str = _format_duration(int(elapsed)) 

730 

731 if job.is_complete: 

732 # Clear the progress line and return 

733 sys.stderr.write("\r\033[K") 

734 sys.stderr.flush() 

735 return job 

736 

737 # Build progress message 

738 pods_info = ( 

739 f"{job.active_pods} active, {job.succeeded_pods}/{job.completions} succeeded" 

740 ) 

741 if job.failed_pods: 741 ↛ 742line 741 didn't jump to line 742 because the condition on line 741 was never true

742 pods_info += f", {job.failed_pods} failed" 

743 

744 status_line = f"{job.status.capitalize()}{pods_info}{elapsed_str} elapsed" 

745 

746 if progress_callback: 746 ↛ 750line 746 didn't jump to line 750 because the condition on line 746 was always true

747 progress_callback(job, int(elapsed)) 

748 else: 

749 # Overwrite the same line on stderr 

750 sys.stderr.write(f"\r\033[K{status_line}") 

751 sys.stderr.flush() 

752 

753 if elapsed >= timeout_seconds: 

754 sys.stderr.write("\r\033[K") 

755 sys.stderr.flush() 

756 raise TimeoutError( 

757 f"Job {job_name} did not complete within {timeout_seconds} seconds " 

758 f"(last status: {job.status}, pods: {pods_info})" 

759 ) 

760 

761 time.sleep(poll_interval) # nosemgrep: arbitrary-sleep - intentional polling delay 

762 

    def submit_job_sqs(
        self,
        manifests: str | list[dict[str, Any]],
        region: str,
        namespace: str | None = None,
        labels: dict[str, str] | None = None,
        priority: int = 0,
    ) -> dict[str, Any]:
        """
        Submit a job to a regional SQS queue for processing.

        This is the recommended way to submit jobs as it:
        - Decouples submission from processing
        - Enables KEDA-based autoscaling
        - Provides better fault tolerance

        Args:
            manifests: Path to manifest file/directory or list of manifest dicts
            region: Target region for job submission (required)
            namespace: Fallback namespace for manifests that don't declare
                their own. When set, each manifest's ``metadata.namespace`` is
                filled in only if missing — existing values are preserved so
                users who've declared a target namespace in the manifest can
                rely on it reaching the queue processor untouched. Server-side
                validation enforces the allowlist.
            labels: Additional labels to add to manifests
            priority: Job priority (higher = more important)

        Returns:
            Submission result dictionary with message_id and queue info

        Raises:
            ValueError: If no GCO stack or job queue exists in *region*.
        """
        import json
        import uuid

        import boto3

        # Load manifests if path provided
        manifest_list = self.load_manifests(manifests) if isinstance(manifests, str) else manifests

        # Apply namespace as a fallback only — preserve any namespace the
        # manifest declared itself.
        if namespace:
            for manifest in manifest_list:
                if "metadata" not in manifest:
                    manifest["metadata"] = {}
                manifest["metadata"].setdefault("namespace", namespace)

        # Apply additional labels
        if labels:
            for manifest in manifest_list:
                if "metadata" not in manifest:
                    manifest["metadata"] = {}
                if "labels" not in manifest["metadata"]:
                    manifest["metadata"]["labels"] = {}
                manifest["metadata"]["labels"].update(labels)

        # Get queue URL from stack
        stack = self._aws_client.get_regional_stack(region)
        if not stack:
            raise ValueError(f"No GCO stack found in region {region}")

        # Get queue URL from CloudFormation outputs
        cfn = boto3.client("cloudformation", region_name=region)
        response = cfn.describe_stacks(StackName=stack.stack_name)
        outputs = {
            o["OutputKey"]: o["OutputValue"] for o in response["Stacks"][0].get("Outputs", [])
        }
        queue_url = outputs.get("JobQueueUrl")

        if not queue_url:
            raise ValueError(f"Job queue not found in stack {stack.stack_name}")

        # Create SQS message. The ``namespace`` field in the envelope is
        # informational only — the queue processor reads each manifest's
        # own ``metadata.namespace`` for validation and application. Report
        # the first manifest's namespace here so the submission response
        # matches reality when the user doesn't pass ``--namespace``.
        job_id = str(uuid.uuid4())[:8]
        envelope_namespace = namespace or _first_manifest_namespace(manifest_list) or "gco-jobs"
        message_body = {
            "job_id": job_id,
            "manifests": manifest_list,
            "namespace": envelope_namespace,
            "priority": priority,
            "submitted_at": datetime.now(UTC).isoformat(),
        }

        # Send to SQS
        sqs = boto3.client("sqs", region_name=region)
        response = sqs.send_message(
            QueueUrl=queue_url,
            MessageBody=json.dumps(message_body),
            MessageAttributes={
                "Priority": {"DataType": "Number", "StringValue": str(priority)},
                "JobId": {"DataType": "String", "StringValue": job_id},
            },
        )

        # Get job name from first manifest
        job_name = None
        for manifest in manifest_list:
            if manifest.get("kind") == "Job":
                job_name = manifest.get("metadata", {}).get("name")
                break

        return {
            "status": "queued",
            "method": "sqs",
            "message_id": response["MessageId"],
            "job_id": job_id,
            "job_name": job_name,
            "queue_url": queue_url,
            "region": region,
            "namespace": envelope_namespace,
            "priority": priority,
        }

879 

880 def get_queue_status(self, region: str) -> dict[str, Any]: 

881 """ 

882 Get the status of the job queue in a region. 

883 

884 Args: 

885 region: AWS region 

886 

887 Returns: 

888 Queue status including message counts 

889 """ 

890 import boto3 

891 

892 stack = self._aws_client.get_regional_stack(region) 

893 if not stack: 

894 raise ValueError(f"No GCO stack found in region {region}") 

895 

896 # Get queue URLs from CloudFormation outputs 

897 cfn = boto3.client("cloudformation", region_name=region) 

898 response = cfn.describe_stacks(StackName=stack.stack_name) 

899 outputs = { 

900 o["OutputKey"]: o["OutputValue"] for o in response["Stacks"][0].get("Outputs", []) 

901 } 

902 

903 queue_url = outputs.get("JobQueueUrl") 

904 dlq_url = outputs.get("JobDlqUrl") 

905 

906 if not queue_url: 

907 raise ValueError(f"Job queue not found in stack {stack.stack_name}") 

908 

909 sqs = boto3.client("sqs", region_name=region) 

910 

911 # Get main queue attributes 

912 queue_attrs = sqs.get_queue_attributes( 

913 QueueUrl=queue_url, 

914 AttributeNames=[ 

915 "ApproximateNumberOfMessages", 

916 "ApproximateNumberOfMessagesNotVisible", 

917 "ApproximateNumberOfMessagesDelayed", 

918 ], 

919 )["Attributes"] 

920 

921 result = { 

922 "region": region, 

923 "queue_url": queue_url, 

924 "messages_available": int(queue_attrs.get("ApproximateNumberOfMessages", 0)), 

925 "messages_in_flight": int(queue_attrs.get("ApproximateNumberOfMessagesNotVisible", 0)), 

926 "messages_delayed": int(queue_attrs.get("ApproximateNumberOfMessagesDelayed", 0)), 

927 } 

928 

929 # Get DLQ attributes if available 

930 if dlq_url: 930 ↛ 938line 930 didn't jump to line 938 because the condition on line 930 was always true

931 dlq_attrs = sqs.get_queue_attributes( 

932 QueueUrl=dlq_url, 

933 AttributeNames=["ApproximateNumberOfMessages"], 

934 )["Attributes"] 

935 result["dlq_url"] = dlq_url 

936 result["dlq_messages"] = int(dlq_attrs.get("ApproximateNumberOfMessages", 0)) 

937 

938 return result 

939 

940 def list_jobs_global( 

941 self, 

942 namespace: str | None = None, 

943 status: str | None = None, 

944 limit: int = 50, 

945 ) -> dict[str, Any]: 

946 """ 

947 List jobs across all regions via the global API endpoint. 

948 

949 This uses the cross-region aggregator Lambda to query all regional 

950 clusters in parallel and return a unified view. 

951 

952 Args: 

953 namespace: Filter by namespace 

954 status: Filter by status 

955 limit: Maximum jobs to return 

956 

957 Returns: 

958 Aggregated job list with region information 

959 """ 

960 return self._aws_client.get_global_jobs( 

961 namespace=namespace, 

962 status=status, 

963 limit=limit, 

964 ) 

965 

966 def get_global_health(self) -> dict[str, Any]: 

967 """ 

968 Get health status across all regions. 

969 

970 Returns: 

971 Aggregated health status from all regional clusters 

972 """ 

973 return self._aws_client.get_global_health() 

974 

975 def get_global_status(self) -> dict[str, Any]: 

976 """ 

977 Get cluster status across all regions. 

978 

979 Returns: 

980 Aggregated status from all regional clusters 

981 """ 

982 return self._aws_client.get_global_status() 

983 

984 def bulk_delete_global( 

985 self, 

986 namespace: str | None = None, 

987 status: str | None = None, 

988 older_than_days: int | None = None, 

989 dry_run: bool = True, 

990 ) -> dict[str, Any]: 

991 """ 

992 Bulk delete jobs across all regions. 

993 

994 Args: 

995 namespace: Filter by namespace 

996 status: Filter by status 

997 older_than_days: Delete jobs older than N days 

998 dry_run: If True, only return what would be deleted 

999 

1000 Returns: 

1001 Deletion results from all regions 

1002 """ 

1003 return self._aws_client.bulk_delete_global( 

1004 namespace=namespace, 

1005 status=status, 

1006 older_than_days=older_than_days, 

1007 dry_run=dry_run, 

1008 ) 

1009 

1010 def get_job_events( 

1011 self, 

1012 job_name: str, 

1013 namespace: str, 

1014 region: str | None = None, 

1015 ) -> dict[str, Any]: 

1016 """ 

1017 Get Kubernetes events for a job. 

1018 

1019 Args: 

1020 job_name: Name of the job 

1021 namespace: Namespace of the job 

1022 region: Region where the job is running 

1023 

1024 Returns: 

1025 Events related to the job 

1026 """ 

1027 return self._aws_client.get_job_events( 

1028 job_name=job_name, 

1029 namespace=namespace, 

1030 region=region or self.config.default_region, 

1031 ) 

1032 

1033 def get_job_pods( 

1034 self, 

1035 job_name: str, 

1036 namespace: str, 

1037 region: str | None = None, 

1038 ) -> dict[str, Any]: 

1039 """ 

1040 Get pods for a job. 

1041 

1042 Args: 

1043 job_name: Name of the job 

1044 namespace: Namespace of the job 

1045 region: Region where the job is running 

1046 

1047 Returns: 

1048 Pod details for the job 

1049 """ 

1050 return self._aws_client.get_job_pods( 

1051 job_name=job_name, 

1052 namespace=namespace, 

1053 region=region or self.config.default_region, 

1054 ) 

1055 

1056 def get_pod_logs( 

1057 self, 

1058 job_name: str, 

1059 pod_name: str, 

1060 namespace: str, 

1061 region: str | None = None, 

1062 tail_lines: int = 100, 

1063 container: str | None = None, 

1064 ) -> dict[str, Any]: 

1065 """ 

1066 Get logs from a specific pod of a job. 

1067 

1068 Args: 

1069 job_name: Name of the job 

1070 pod_name: Name of the pod 

1071 namespace: Namespace of the job 

1072 region: Region where the job is running 

1073 tail_lines: Number of lines to return 

1074 container: Container name (for multi-container pods) 

1075 

1076 Returns: 

1077 Pod logs response 

1078 """ 

1079 return self._aws_client.get_pod_logs( 

1080 job_name=job_name, 

1081 pod_name=pod_name, 

1082 namespace=namespace, 

1083 region=region or self.config.default_region, 

1084 tail_lines=tail_lines, 

1085 container=container, 

1086 ) 

1087 

1088 def get_job_metrics( 

1089 self, 

1090 job_name: str, 

1091 namespace: str, 

1092 region: str | None = None, 

1093 ) -> dict[str, Any]: 

1094 """ 

1095 Get resource metrics for a job. 

1096 

1097 Args: 

1098 job_name: Name of the job 

1099 namespace: Namespace of the job 

1100 region: Region where the job is running 

1101 

1102 Returns: 

1103 Resource usage metrics for the job's pods 

1104 """ 

1105 return self._aws_client.get_job_metrics( 

1106 job_name=job_name, 

1107 namespace=namespace, 

1108 region=region or self.config.default_region, 

1109 ) 

1110 

1111 def retry_job( 

1112 self, 

1113 job_name: str, 

1114 namespace: str, 

1115 region: str | None = None, 

1116 ) -> dict[str, Any]: 

1117 """ 

1118 Retry a failed job. 

1119 

1120 Creates a new job from the failed job's spec with a new name. 

1121 

1122 Args: 

1123 job_name: Name of the failed job 

1124 namespace: Namespace of the job 

1125 region: Region where the job is running 

1126 

1127 Returns: 

1128 Result with new job name 

1129 """ 

1130 return self._aws_client.retry_job( 

1131 job_name=job_name, 

1132 namespace=namespace, 

1133 region=region or self.config.default_region, 

1134 ) 

1135 

1136 def bulk_delete_jobs( 

1137 self, 

1138 namespace: str | None = None, 

1139 status: str | None = None, 

1140 older_than_days: int | None = None, 

1141 label_selector: str | None = None, 

1142 region: str | None = None, 

1143 dry_run: bool = True, 

1144 ) -> dict[str, Any]: 

1145 """ 

1146 Bulk delete jobs in a region. 

1147 

1148 Args: 

1149 namespace: Filter by namespace 

1150 status: Filter by status 

1151 older_than_days: Delete jobs older than N days 

1152 label_selector: Kubernetes label selector 

1153 region: Target region 

1154 dry_run: If True, only return what would be deleted 

1155 

1156 Returns: 

1157 Deletion results 

1158 """ 

1159 return self._aws_client.bulk_delete_jobs( 

1160 namespace=namespace, 

1161 status=status, 

1162 older_than_days=older_than_days, 

1163 label_selector=label_selector, 

1164 region=region or self.config.default_region, 

1165 dry_run=dry_run, 

1166 ) 

1167 

1168 

def get_job_manager(config: GCOConfig | None = None) -> JobManager:
    """Build a :class:`JobManager` for the given config (or the default one)."""
    manager = JobManager(config)
    return manager