Coverage for cli/jobs.py: 94%

404 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-15 15:07 +0000

1""" 

2Job management for GCO CLI. 

3 

4Provides functionality to submit, query, and manage jobs across GCO clusters. 

5""" 

6 

import logging
from collections.abc import Callable
from dataclasses import dataclass, field
from datetime import UTC, datetime
from pathlib import Path
from typing import Any

import yaml

from gco.services.manifest_processor import safe_load_all_yaml

from .aws_client import get_aws_client
from .config import GCOConfig, get_config

19 

20# <pyflowchart-code-diagram> BEGIN - auto-inserted, do not edit 

21# Flowchart(s) generated from this file: 

22# * ``JobManager.submit_job`` -> ``diagrams/code_diagrams/cli/jobs.JobManager_submit_job.html`` 

23# (PNG: ``diagrams/code_diagrams/cli/jobs.JobManager_submit_job.png``) 

24# * ``JobManager.submit_job_sqs`` -> ``diagrams/code_diagrams/cli/jobs.JobManager_submit_job_sqs.html`` 

25# (PNG: ``diagrams/code_diagrams/cli/jobs.JobManager_submit_job_sqs.png``) 

26# Regenerate with ``python diagrams/code_diagrams/generate.py``. 

27# <pyflowchart-code-diagram> END 

28 

29 

# Module-level logger named after this module, so CLI log output can be
# filtered per component.
logger = logging.getLogger(__name__)

31 

32 

33def _format_duration(seconds: int) -> str: 

34 """Format seconds into a human-readable duration string.""" 

35 if seconds < 60: 

36 return f"{seconds}s" 

37 minutes, secs = divmod(seconds, 60) 

38 if minutes < 60: 

39 return f"{minutes}m{secs:02d}s" 

40 hours, mins = divmod(minutes, 60) 

41 return f"{hours}h{mins:02d}m{secs:02d}s" 

42 

43 

44def _first_manifest_namespace(manifests: list[dict[str, Any]]) -> str | None: 

45 """Return the first explicit ``metadata.namespace`` found in a manifest list. 

46 

47 Used by the SQS submission path to populate the envelope ``namespace`` 

48 field (informational — the queue processor reads each manifest's own 

49 namespace for validation). Returns None if no manifest declares one. 

50 """ 

51 for manifest in manifests: 

52 ns = manifest.get("metadata", {}).get("namespace") if isinstance(manifest, dict) else None 

53 if ns: 53 ↛ 54line 53 didn't jump to line 54 because the condition on line 53 was never true

54 return str(ns) 

55 return None 

56 

57 

58def _extract_image_refs(spec: dict[str, Any]) -> list[str]: 

59 """Extract container image refs from a parsed Job spec. 

60 

61 The API surface for a Job carries ``spec.template.spec.containers`` and 

62 ``spec.template.spec.initContainers`` lists, each entry of which has 

63 a ``name`` and an ``image`` URI. Returns an alphabetically-sorted, 

64 deduplicated list so the output is stable across calls — orphan-image 

65 cross-references rely on set equality. 

66 """ 

67 refs: set[str] = set() 

68 template = spec.get("template") if isinstance(spec, dict) else None 

69 pod_spec = template.get("spec") if isinstance(template, dict) else None 

70 if not isinstance(pod_spec, dict): 

71 return [] 

72 for key in ("containers", "initContainers"): 

73 items = pod_spec.get(key, []) 

74 if not isinstance(items, list): 74 ↛ 75line 74 didn't jump to line 75 because the condition on line 74 was never true

75 continue 

76 for entry in items: 

77 if not isinstance(entry, dict): 

78 continue 

79 image = entry.get("image") 

80 if isinstance(image, str) and image: 

81 refs.add(image) 

82 return sorted(refs) 

83 

84 

85@dataclass 

86class JobInfo: 

87 """Information about a Kubernetes job.""" 

88 

89 name: str 

90 namespace: str 

91 region: str 

92 status: str # "pending", "running", "succeeded", "failed" 

93 created_time: datetime | None = None 

94 start_time: datetime | None = None 

95 completion_time: datetime | None = None 

96 active_pods: int = 0 

97 succeeded_pods: int = 0 

98 failed_pods: int = 0 

99 parallelism: int = 1 

100 completions: int = 1 

101 labels: dict[str, str] = field(default_factory=dict) 

102 image_refs: list[str] = field(default_factory=list) 

103 

104 @property 

105 def is_complete(self) -> bool: 

106 return self.status in ("succeeded", "failed") 

107 

108 @property 

109 def duration_seconds(self) -> int | None: 

110 if self.start_time and self.completion_time: 

111 return int((self.completion_time - self.start_time).total_seconds()) 

112 if self.start_time: 

113 return int((datetime.now(UTC) - self.start_time).total_seconds()) 

114 return None 

115 

116 

117class JobManager: 

118 """ 

119 Manages jobs across GCO clusters. 

120 

121 Provides: 

122 - Job submission with region targeting 

123 - Job status queries across regions 

124 - Job logs retrieval 

125 - Job deletion 

126 """ 

127 

128 def __init__(self, config: GCOConfig | None = None): 

129 self.config = config or get_config() 

130 self._aws_client = get_aws_client(config) 

131 # Configure regional API mode if enabled 

132 if hasattr(self.config, "use_regional_api") and self.config.use_regional_api: 

133 self._aws_client.set_use_regional_api(True) 

134 

135 def load_manifests(self, path: str) -> list[dict[str, Any]]: 

136 """ 

137 Load Kubernetes manifests from a file or directory. 

138 

139 Args: 

140 path: Path to YAML file or directory containing YAML files 

141 

142 Returns: 

143 List of manifest dictionaries 

144 """ 

145 manifests = [] 

146 path_obj = Path(path) 

147 

148 if path_obj.is_file(): 

149 manifests.extend(self._load_yaml_file(path_obj)) 

150 elif path_obj.is_dir(): 

151 for yaml_file in sorted(path_obj.glob("*.yaml")): 

152 manifests.extend(self._load_yaml_file(yaml_file)) 

153 for yaml_file in sorted(path_obj.glob("*.yml")): 

154 manifests.extend(self._load_yaml_file(yaml_file)) 

155 else: 

156 raise FileNotFoundError(f"Path not found: {path}") 

157 

158 return manifests 

159 

160 def _load_yaml_file(self, path: Path) -> list[dict[str, Any]]: 

161 """Load manifests from a single YAML file.""" 

162 with open(path, encoding="utf-8") as f: 

163 return safe_load_all_yaml(f, allow_aliases=False) 

164 

165 def submit_job( 

166 self, 

167 manifests: str | list[dict[str, Any]], 

168 namespace: str | None = None, 

169 target_region: str | None = None, 

170 dry_run: bool = False, 

171 labels: dict[str, str] | None = None, 

172 ) -> dict[str, Any]: 

173 """ 

174 Submit a job to GCO. 

175 

176 Args: 

177 manifests: Path to manifest file/directory or list of manifest dicts 

178 namespace: Fallback namespace for manifests that don't declare 

179 their own. When set, each manifest's ``metadata.namespace`` is 

180 filled in only if missing — existing values are preserved so 

181 users who've declared a target namespace in the manifest can 

182 rely on it reaching the server untouched. Server-side 

183 validation enforces the allowlist. 

184 target_region: Force job to specific region 

185 dry_run: Validate without applying 

186 labels: Additional labels to add to manifests 

187 

188 Returns: 

189 Submission result dictionary 

190 """ 

191 # Load manifests if path provided 

192 manifest_list = self.load_manifests(manifests) if isinstance(manifests, str) else manifests 

193 

194 # Apply namespace as a fallback only — preserve any namespace the 

195 # manifest declared itself. 

196 if namespace: 

197 for manifest in manifest_list: 

198 if "metadata" not in manifest: 

199 manifest["metadata"] = {} 

200 manifest["metadata"].setdefault("namespace", namespace) 

201 

202 # Apply additional labels 

203 if labels: 

204 for manifest in manifest_list: 

205 if "metadata" not in manifest: 

206 manifest["metadata"] = {} 

207 if "labels" not in manifest["metadata"]: 207 ↛ 209line 207 didn't jump to line 209 because the condition on line 207 was always true

208 manifest["metadata"]["labels"] = {} 

209 manifest["metadata"]["labels"].update(labels) 

210 

211 # Submit via API 

212 return self._aws_client.submit_manifests( 

213 manifests=manifest_list, 

214 namespace=namespace, 

215 target_region=target_region, 

216 dry_run=dry_run, 

217 ) 

218 

219 def submit_job_direct( 

220 self, 

221 manifests: str | list[dict[str, Any]], 

222 region: str, 

223 namespace: str | None = None, 

224 dry_run: bool = False, 

225 labels: dict[str, str] | None = None, 

226 ) -> dict[str, Any]: 

227 """ 

228 Submit a job directly to a regional cluster using kubectl. 

229 

230 This bypasses the API Gateway and submits directly to the EKS cluster 

231 using kubectl. Requires: 

232 - kubectl installed and in PATH 

233 - EKS access entry configured for your IAM principal 

234 - AWS credentials with eks:DescribeCluster permission 

235 

236 Args: 

237 manifests: Path to manifest file/directory or list of manifest dicts 

238 region: Target region for direct submission (required) 

239 namespace: Fallback namespace for manifests that don't declare 

240 their own. When set, each manifest's ``metadata.namespace`` is 

241 filled in only if missing — existing values are preserved so 

242 users who've declared a target namespace in the manifest can 

243 rely on it reaching ``kubectl apply`` untouched. 

244 dry_run: Validate without applying 

245 labels: Additional labels to add to manifests 

246 

247 Returns: 

248 Submission result dictionary 

249 """ 

250 import subprocess 

251 import tempfile 

252 import uuid 

253 

254 # Load manifests if path provided 

255 manifest_list = self.load_manifests(manifests) if isinstance(manifests, str) else manifests 

256 

257 # Apply namespace as a fallback only — preserve any namespace the 

258 # manifest declared itself. 

259 if namespace: 

260 for manifest in manifest_list: 

261 if "metadata" not in manifest: 

262 manifest["metadata"] = {} 

263 manifest["metadata"].setdefault("namespace", namespace) 

264 

265 # Apply additional labels 

266 if labels: 

267 for manifest in manifest_list: 

268 if "metadata" not in manifest: 

269 manifest["metadata"] = {} 

270 if "labels" not in manifest["metadata"]: 270 ↛ 272line 270 didn't jump to line 272 because the condition on line 270 was always true

271 manifest["metadata"]["labels"] = {} 

272 manifest["metadata"]["labels"].update(labels) 

273 

274 # Get cluster name from stack 

275 stack = self._aws_client.get_regional_stack(region) 

276 if not stack: 

277 raise ValueError(f"No GCO stack found in region {region}") 

278 

279 cluster_name = stack.cluster_name 

280 

281 # Update kubeconfig for the cluster 

282 from .kubectl_helpers import update_kubeconfig 

283 

284 update_kubeconfig(cluster_name, region) 

285 

286 # Handle existing Job resources before applying 

287 warnings: list[str] = [] 

288 if not dry_run: 

289 for manifest in manifest_list: 

290 if manifest.get("kind") != "Job": 290 ↛ 291line 290 didn't jump to line 291 because the condition on line 290 was never true

291 continue 

292 job_name = manifest.get("metadata", {}).get("name") 

293 job_ns = manifest.get("metadata", {}).get("namespace", namespace or "default") 

294 if not job_name: 

295 continue 

296 

297 existing_status = self._get_kubectl_job_status(job_name, job_ns) 

298 if existing_status is None: 

299 # No existing job — nothing to do 

300 continue 

301 

302 if existing_status in ("complete", "failed"): 

303 # Finished job — safe to delete and replace 

304 subprocess.run( 

305 ["kubectl", "delete", "job", job_name, "-n", job_ns], 

306 capture_output=True, 

307 text=True, 

308 ) 

309 else: 

310 # Job is still active — auto-rename to avoid collision 

311 suffix = uuid.uuid4().hex[:5] 

312 new_name = f"{job_name}-{suffix}" 

313 original_name = job_name 

314 manifest["metadata"]["name"] = new_name 

315 warnings.append( 

316 f"Job '{original_name}' is still running in namespace " 

317 f"'{job_ns}'. Renamed new submission to '{new_name}'." 

318 ) 

319 logger.warning( 

320 "Job %s is active in %s, renamed to %s", 

321 original_name, 

322 job_ns, 

323 new_name, 

324 ) 

325 

326 # Write manifests to temp file 

327 with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f: 

328 yaml.dump_all(manifest_list, f) 

329 f.flush() # Ensure content is written before using f.name 

330 temp_path = f.name # nosemgrep: tempfile-without-flush 

331 

332 try: 

333 # Build kubectl command 

334 kubectl_cmd = ["kubectl", "apply", "-f", temp_path] 

335 

336 if dry_run: 

337 kubectl_cmd.extend(["--dry-run=client"]) 

338 

339 # Run kubectl apply 

340 result = subprocess.run( 

341 kubectl_cmd, capture_output=True, text=True 

342 ) # nosemgrep: dangerous-subprocess-use-audit - kubectl_cmd is a list ["kubectl","apply","-f",temp_path]; temp_path is a secure tempfile, not user input 

343 

344 if result.returncode != 0: 

345 raise RuntimeError(f"kubectl apply failed: {result.stderr}") 

346 

347 # Parse output to get job name 

348 output_lines = result.stdout.strip().split("\n") 

349 created_resources = [] 

350 for line in output_lines: 

351 if line: 351 ↛ 350line 351 didn't jump to line 350 because the condition on line 351 was always true

352 created_resources.append(line) 

353 

354 # Get job name from first manifest (may have been renamed) 

355 job_name = None 

356 for manifest in manifest_list: 356 ↛ 361line 356 didn't jump to line 361 because the loop on line 356 didn't complete

357 if manifest.get("kind") == "Job": 357 ↛ 356line 357 didn't jump to line 356 because the condition on line 357 was always true

358 job_name = manifest.get("metadata", {}).get("name") 

359 break 

360 

361 response: dict[str, Any] = { 

362 "status": "success", 

363 "method": "kubectl", 

364 "cluster": cluster_name, 

365 "region": region, 

366 "namespace": namespace or "default", 

367 "job_name": job_name, 

368 "dry_run": dry_run, 

369 "resources": created_resources, 

370 "output": result.stdout, 

371 } 

372 if warnings: 

373 response["warnings"] = warnings 

374 return response 

375 

376 finally: 

377 # Clean up temp file 

378 import os 

379 

380 os.unlink(temp_path) 

381 

382 def _get_kubectl_job_status(self, job_name: str, namespace: str) -> str | None: 

383 """Check the status of an existing Job via kubectl. 

384 

385 Returns: 

386 "complete", "failed", "active", or None if the job doesn't exist. 

387 """ 

388 import json 

389 import subprocess 

390 

391 result = subprocess.run( 

392 [ 

393 "kubectl", 

394 "get", 

395 "job", 

396 job_name, 

397 "-n", 

398 namespace, 

399 "-o", 

400 "json", 

401 ], 

402 capture_output=True, 

403 text=True, 

404 ) 

405 if result.returncode != 0: 

406 return None # Job doesn't exist 

407 

408 try: 

409 job_data = json.loads(result.stdout) 

410 except json.JSONDecodeError, KeyError: 

411 return None 

412 

413 conditions = job_data.get("status", {}).get("conditions") or [] 

414 for condition in conditions: 

415 cond_type = condition.get("type", "") 

416 cond_status = condition.get("status", "") 

417 if cond_type == "Complete" and cond_status == "True": 

418 return "complete" 

419 if cond_type == "Failed" and cond_status == "True": 

420 return "failed" 

421 return "active" 

422 

423 def list_jobs( 

424 self, 

425 region: str | None = None, 

426 namespace: str | None = None, 

427 status: str | None = None, 

428 all_regions: bool = False, 

429 ) -> list[JobInfo]: 

430 """ 

431 List jobs across GCO clusters. 

432 

433 Args: 

434 region: Specific region to query 

435 namespace: Filter by namespace 

436 status: Filter by status 

437 all_regions: Query all discovered regions 

438 

439 Returns: 

440 List of JobInfo objects 

441 """ 

442 jobs = [] 

443 

444 if all_regions: 

445 # Query all discovered regional stacks 

446 stacks = self._aws_client.discover_regional_stacks() 

447 for stack_region in stacks: 

448 try: 

449 region_jobs = self._query_jobs_in_region(stack_region, namespace, status) 

450 jobs.extend(region_jobs) 

451 except Exception as e: 

452 logger.warning("Failed to query jobs in %s: %s", stack_region, e) 

453 continue 

454 elif region: 

455 jobs = self._query_jobs_in_region(region, namespace, status) 

456 else: 

457 # Use default region 

458 jobs = self._query_jobs_in_region(self.config.default_region, namespace, status) 

459 

460 return jobs 

461 

462 def _query_jobs_in_region( 

463 self, region: str, namespace: str | None, status: str | None 

464 ) -> list[JobInfo]: 

465 """Query jobs in a specific region.""" 

466 try: 

467 response = self._aws_client.get_jobs(region=region, namespace=namespace, status=status) 

468 

469 jobs = [] 

470 # response is a list, but we expect a dict with "jobs" key from the API 

471 job_list = response.get("jobs", []) if isinstance(response, dict) else response 

472 for job_data in job_list: 

473 jobs.append(self._parse_job_info(job_data, region)) 

474 

475 return jobs 

476 except Exception as exc: 

477 logger.warning("Failed to query jobs in %s: %s", region, exc) 

478 return [] 

479 

480 def _parse_job_info(self, job_data: dict[str, Any], region: str) -> JobInfo: 

481 """Parse job data into JobInfo object.""" 

482 metadata = job_data.get("metadata", {}) 

483 status_data = job_data.get("status", {}) 

484 spec = job_data.get("spec", {}) 

485 

486 # Determine job status 

487 conditions = status_data.get("conditions", []) 

488 job_status = "pending" 

489 for condition in conditions: 

490 if condition.get("type") == "Complete" and condition.get("status") == "True": 

491 job_status = "succeeded" 

492 break 

493 if condition.get("type") == "Failed" and condition.get("status") == "True": 493 ↛ 489line 493 didn't jump to line 489 because the condition on line 493 was always true

494 job_status = "failed" 

495 break 

496 

497 if job_status == "pending" and status_data.get("active", 0) > 0: 

498 job_status = "running" 

499 

500 # Parse timestamps 

501 created_time = None 

502 if metadata.get("creationTimestamp"): 

503 created_time = datetime.fromisoformat( 

504 metadata["creationTimestamp"].replace("Z", "+00:00") 

505 ) 

506 

507 start_time = None 

508 if status_data.get("startTime"): 

509 start_time = datetime.fromisoformat(status_data["startTime"].replace("Z", "+00:00")) 

510 

511 completion_time = None 

512 if status_data.get("completionTime"): 

513 completion_time = datetime.fromisoformat( 

514 status_data["completionTime"].replace("Z", "+00:00") 

515 ) 

516 

517 return JobInfo( 

518 name=metadata.get("name", ""), 

519 namespace=metadata.get("namespace", "default"), 

520 region=region, 

521 status=job_status, 

522 created_time=created_time, 

523 start_time=start_time, 

524 completion_time=completion_time, 

525 active_pods=status_data.get("active", 0), 

526 succeeded_pods=status_data.get("succeeded", 0), 

527 failed_pods=status_data.get("failed", 0), 

528 parallelism=spec.get("parallelism", 1), 

529 completions=spec.get("completions", 1), 

530 labels=metadata.get("labels", {}), 

531 image_refs=_extract_image_refs(spec), 

532 ) 

533 

534 def get_job(self, job_name: str, namespace: str, region: str | None = None) -> JobInfo | None: 

535 """ 

536 Get detailed information about a specific job. 

537 

538 Args: 

539 job_name: Name of the job 

540 namespace: Namespace of the job 

541 region: Region where the job is running 

542 

543 Returns: 

544 JobInfo or None if not found 

545 """ 

546 try: 

547 response = self._aws_client.get_job_details( 

548 job_name=job_name, namespace=namespace, region=region or self.config.default_region 

549 ) 

550 return self._parse_job_info(response, region or self.config.default_region) 

551 except Exception as e: 

552 logger.debug("Failed to get job details for %s: %s", job_name, e) 

553 return None 

554 

555 def get_job_logs( 

556 self, 

557 job_name: str, 

558 namespace: str, 

559 region: str | None = None, 

560 tail_lines: int = 100, 

561 follow: bool = False, 

562 since_hours: int = 24, 

563 ) -> str: 

564 """ 

565 Get logs from a job. 

566 

567 Tries the Kubernetes API first (via the GCO API). If the pod is no 

568 longer available (completed/deleted), falls back to CloudWatch Logs 

569 where Container Insights stores application logs. 

570 

571 Args: 

572 job_name: Name of the job 

573 namespace: Namespace of the job 

574 region: Region where the job is running 

575 tail_lines: Number of lines to return 

576 follow: Stream logs (not implemented yet) 

577 since_hours: Hours to look back in CloudWatch (default 24) 

578 

579 Returns: 

580 Log content as string 

581 """ 

582 if follow: 

583 raise NotImplementedError("Log streaming not yet implemented") 

584 

585 target_region = region or self.config.default_region 

586 

587 try: 

588 return self._aws_client.get_job_logs( 

589 job_name=job_name, 

590 namespace=namespace, 

591 region=target_region, 

592 tail_lines=tail_lines, 

593 ) 

594 except RuntimeError as e: 

595 error_msg = str(e) 

596 # If the pod is gone or pending, try CloudWatch 

597 if any( 

598 hint in error_msg.lower() 

599 for hint in ["not found", "pending", "no pods", "terminated", "completed"] 

600 ): 

601 logger.info("Pod not available, falling back to CloudWatch Logs") 

602 try: 

603 return self._get_cloudwatch_logs( 

604 job_name=job_name, 

605 region=target_region, 

606 tail_lines=tail_lines, 

607 since_hours=since_hours, 

608 ) 

609 except Exception as cw_err: 

610 logger.debug("CloudWatch fallback failed: %s", cw_err) 

611 raise RuntimeError( 

612 f"{error_msg}\n\n" 

613 f"CloudWatch Logs fallback also failed: {cw_err}\n" 

614 f"Tip: Container logs appear in CloudWatch within a few minutes. " 

615 f"If the job just finished, try again shortly." 

616 ) from e 

617 raise 

618 

619 def _get_cloudwatch_logs( 

620 self, 

621 job_name: str, 

622 region: str, 

623 tail_lines: int = 100, 

624 since_hours: int = 24, 

625 ) -> str: 

626 """ 

627 Fetch job logs from CloudWatch Logs (Container Insights). 

628 

629 The CloudWatch Observability addon ships container stdout/stderr to: 

630 /aws/containerinsights/{cluster_name}/application 

631 

632 Args: 

633 job_name: Name of the job (used to filter log streams) 

634 region: AWS region 

635 tail_lines: Number of log lines to return 

636 since_hours: Hours to look back (default 24) 

637 

638 Returns: 

639 Log content as string 

640 """ 

641 cluster_name = f"{self.config.project_name}-{region}" 

642 log_group = f"/aws/containerinsights/{cluster_name}/application" 

643 

644 logs_client = self._aws_client._session.client("logs", region_name=region) 

645 

646 import time 

647 

648 now = int(time.time()) 

649 start_time = now - (since_hours * 3600) 

650 

651 query = ( 

652 f"fields @timestamp, @message " 

653 f'| filter @logStream like "{job_name}" ' 

654 f"| sort @timestamp asc " 

655 f"| limit {tail_lines}" 

656 ) 

657 

658 start_query = logs_client.start_query( 

659 logGroupName=log_group, 

660 startTime=start_time, 

661 endTime=now, 

662 queryString=query, 

663 ) 

664 query_id = start_query["queryId"] 

665 

666 # Poll for results (CloudWatch Insights is async) 

667 result = None 

668 for _ in range(30): # up to 30 seconds 

669 time.sleep(1) 

670 result = logs_client.get_query_results(queryId=query_id) 

671 if result["status"] in ("Complete", "Failed", "Cancelled"): 

672 break 

673 

674 if result is None or result["status"] != "Complete": 

675 status = result["status"] if result else "unknown" 

676 raise RuntimeError( 

677 f"CloudWatch Logs query did not complete (status: {status}). Try again in a moment." 

678 ) 

679 

680 if not result["results"]: 

681 raise RuntimeError( 

682 f"No logs found in CloudWatch for job '{job_name}' " 

683 f"in the last {since_hours} hours (log group: {log_group}). " 

684 f"Logs may take 1-2 minutes to appear after a pod runs. " 

685 f"Use --since to search further back, or check the job name " 

686 f"with: gco jobs list -r {region}" 

687 ) 

688 

689 # Extract log messages from results. 

690 # CloudWatch Container Insights wraps logs in a JSON envelope: 

691 # {"time":"...","stream":"stdout","log":"actual message","kubernetes":{...}} 

692 # We parse out the "log" field for clean output, falling back to the 

693 # raw message if it's not JSON. 

694 import json as _json 

695 

696 lines = [] 

697 for row in result["results"]: 

698 for entry in row: 698 ↛ 697line 698 didn't jump to line 697 because the loop on line 698 didn't complete

699 if entry["field"] == "@message": 

700 raw = entry["value"].rstrip() 

701 try: 

702 parsed = _json.loads(raw) 

703 lines.append(parsed.get("log", raw).rstrip()) 

704 except ValueError, TypeError: 

705 lines.append(raw) 

706 break 

707 

708 header = f"[CloudWatch Logs — {log_group}]\n" 

709 return header + "\n".join(lines) 

710 

711 def delete_job( 

712 self, job_name: str, namespace: str, region: str | None = None 

713 ) -> dict[str, Any]: 

714 """ 

715 Delete a job. 

716 

717 Args: 

718 job_name: Name of the job 

719 namespace: Namespace of the job 

720 region: Region where the job is running 

721 

722 Returns: 

723 Deletion result 

724 """ 

725 return self._aws_client.delete_job( 

726 job_name=job_name, namespace=namespace, region=region or self.config.default_region 

727 ) 

728 

729 def wait_for_job( 

730 self, 

731 job_name: str, 

732 namespace: str, 

733 region: str | None = None, 

734 timeout_seconds: int = 3600, 

735 poll_interval: int = 10, 

736 progress_callback: Callable[[JobInfo, int], None] | None = None, 

737 ) -> JobInfo: 

738 """ 

739 Wait for a job to complete with progress reporting. 

740 

741 Args: 

742 job_name: Name of the job 

743 namespace: Namespace of the job 

744 region: Region where the job is running 

745 timeout_seconds: Maximum time to wait 

746 poll_interval: Seconds between status checks 

747 progress_callback: Optional callable(JobInfo, elapsed_seconds) for progress updates. 

748 If None, a default stderr progress line is printed. 

749 

750 Returns: 

751 Final JobInfo 

752 

753 Raises: 

754 TimeoutError: If job doesn't complete within timeout 

755 """ 

756 import sys 

757 import time 

758 

759 start_time = time.time() 

760 

761 while True: 

762 job = self.get_job(job_name, namespace, region) 

763 

764 if job is None: 

765 raise ValueError(f"Job {job_name} not found in namespace {namespace}") 

766 

767 elapsed = time.time() - start_time 

768 elapsed_str = _format_duration(int(elapsed)) 

769 

770 if job.is_complete: 

771 # Clear the progress line and return 

772 sys.stderr.write("\r\033[K") 

773 sys.stderr.flush() 

774 return job 

775 

776 # Build progress message 

777 pods_info = ( 

778 f"{job.active_pods} active, {job.succeeded_pods}/{job.completions} succeeded" 

779 ) 

780 if job.failed_pods: 780 ↛ 781line 780 didn't jump to line 781 because the condition on line 780 was never true

781 pods_info += f", {job.failed_pods} failed" 

782 

783 status_line = f"{job.status.capitalize()}{pods_info}{elapsed_str} elapsed" 

784 

785 if progress_callback: 785 ↛ 789line 785 didn't jump to line 789 because the condition on line 785 was always true

786 progress_callback(job, int(elapsed)) 

787 else: 

788 # Overwrite the same line on stderr 

789 sys.stderr.write(f"\r\033[K{status_line}") 

790 sys.stderr.flush() 

791 

792 if elapsed >= timeout_seconds: 

793 sys.stderr.write("\r\033[K") 

794 sys.stderr.flush() 

795 raise TimeoutError( 

796 f"Job {job_name} did not complete within {timeout_seconds} seconds " 

797 f"(last status: {job.status}, pods: {pods_info})" 

798 ) 

799 

800 time.sleep(poll_interval) # nosemgrep: arbitrary-sleep - intentional polling delay 

801 

802 def submit_job_sqs( 

803 self, 

804 manifests: str | list[dict[str, Any]], 

805 region: str, 

806 namespace: str | None = None, 

807 labels: dict[str, str] | None = None, 

808 priority: int = 0, 

809 ) -> dict[str, Any]: 

810 """ 

811 Submit a job to a regional SQS queue for processing. 

812 

813 This is the recommended way to submit jobs as it: 

814 - Decouples submission from processing 

815 - Enables KEDA-based autoscaling 

816 - Provides better fault tolerance 

817 

818 Args: 

819 manifests: Path to manifest file/directory or list of manifest dicts 

820 region: Target region for job submission (required) 

821 namespace: Fallback namespace for manifests that don't declare 

822 their own. When set, each manifest's ``metadata.namespace`` is 

823 filled in only if missing — existing values are preserved so 

824 users who've declared a target namespace in the manifest can 

825 rely on it reaching the queue processor untouched. Server-side 

826 validation enforces the allowlist. 

827 labels: Additional labels to add to manifests 

828 priority: Job priority (higher = more important) 

829 

830 Returns: 

831 Submission result dictionary with message_id and queue info 

832 """ 

833 import json 

834 import uuid 

835 

836 import boto3 

837 

838 # Load manifests if path provided 

839 manifest_list = self.load_manifests(manifests) if isinstance(manifests, str) else manifests 

840 

841 # Apply namespace as a fallback only — preserve any namespace the 

842 # manifest declared itself. 

843 if namespace: 

844 for manifest in manifest_list: 

845 if "metadata" not in manifest: 

846 manifest["metadata"] = {} 

847 manifest["metadata"].setdefault("namespace", namespace) 

848 

849 # Apply additional labels 

850 if labels: 

851 for manifest in manifest_list: 

852 if "metadata" not in manifest: 852 ↛ 853line 852 didn't jump to line 853 because the condition on line 852 was never true

853 manifest["metadata"] = {} 

854 if "labels" not in manifest["metadata"]: 854 ↛ 856line 854 didn't jump to line 856 because the condition on line 854 was always true

855 manifest["metadata"]["labels"] = {} 

856 manifest["metadata"]["labels"].update(labels) 

857 

858 # Get queue URL from stack 

859 stack = self._aws_client.get_regional_stack(region) 

860 if not stack: 

861 raise ValueError(f"No GCO stack found in region {region}") 

862 

863 # Get queue URL from CloudFormation outputs 

864 cfn = boto3.client("cloudformation", region_name=region) 

865 response = cfn.describe_stacks(StackName=stack.stack_name) 

866 outputs = { 

867 o["OutputKey"]: o["OutputValue"] for o in response["Stacks"][0].get("Outputs", []) 

868 } 

869 queue_url = outputs.get("JobQueueUrl") 

870 

871 if not queue_url: 

872 raise ValueError(f"Job queue not found in stack {stack.stack_name}") 

873 

874 # Create SQS message. The ``namespace`` field in the envelope is 

875 # informational only — the queue processor reads each manifest's 

876 # own ``metadata.namespace`` for validation and application. Report 

877 # the first manifest's namespace here so the submission response 

878 # matches reality when the user doesn't pass ``--namespace``. 

879 job_id = str(uuid.uuid4())[:8] 

880 envelope_namespace = namespace or _first_manifest_namespace(manifest_list) or "gco-jobs" 

881 message_body = { 

882 "job_id": job_id, 

883 "manifests": manifest_list, 

884 "namespace": envelope_namespace, 

885 "priority": priority, 

886 "submitted_at": datetime.now(UTC).isoformat(), 

887 } 

888 

889 # Send to SQS 

890 sqs = boto3.client("sqs", region_name=region) 

891 response = sqs.send_message( 

892 QueueUrl=queue_url, 

893 MessageBody=json.dumps(message_body), 

894 MessageAttributes={ 

895 "Priority": {"DataType": "Number", "StringValue": str(priority)}, 

896 "JobId": {"DataType": "String", "StringValue": job_id}, 

897 }, 

898 ) 

899 

900 # Get job name from first manifest 

901 job_name = None 

902 for manifest in manifest_list: 902 ↛ 907line 902 didn't jump to line 907 because the loop on line 902 didn't complete

903 if manifest.get("kind") == "Job": 903 ↛ 902line 903 didn't jump to line 902 because the condition on line 903 was always true

904 job_name = manifest.get("metadata", {}).get("name") 

905 break 

906 

907 return { 

908 "status": "queued", 

909 "method": "sqs", 

910 "message_id": response["MessageId"], 

911 "job_id": job_id, 

912 "job_name": job_name, 

913 "queue_url": queue_url, 

914 "region": region, 

915 "namespace": envelope_namespace, 

916 "priority": priority, 

917 } 

918 

919 def get_queue_status(self, region: str) -> dict[str, Any]: 

920 """ 

921 Get the status of the job queue in a region. 

922 

923 Args: 

924 region: AWS region 

925 

926 Returns: 

927 Queue status including message counts 

928 """ 

929 import boto3 

930 

931 stack = self._aws_client.get_regional_stack(region) 

932 if not stack: 

933 raise ValueError(f"No GCO stack found in region {region}") 

934 

935 # Get queue URLs from CloudFormation outputs 

936 cfn = boto3.client("cloudformation", region_name=region) 

937 response = cfn.describe_stacks(StackName=stack.stack_name) 

938 outputs = { 

939 o["OutputKey"]: o["OutputValue"] for o in response["Stacks"][0].get("Outputs", []) 

940 } 

941 

942 queue_url = outputs.get("JobQueueUrl") 

943 dlq_url = outputs.get("JobDlqUrl") 

944 

945 if not queue_url: 

946 raise ValueError(f"Job queue not found in stack {stack.stack_name}") 

947 

948 sqs = boto3.client("sqs", region_name=region) 

949 

950 # Get main queue attributes 

951 queue_attrs = sqs.get_queue_attributes( 

952 QueueUrl=queue_url, 

953 AttributeNames=[ 

954 "ApproximateNumberOfMessages", 

955 "ApproximateNumberOfMessagesNotVisible", 

956 "ApproximateNumberOfMessagesDelayed", 

957 ], 

958 )["Attributes"] 

959 

960 result = { 

961 "region": region, 

962 "queue_url": queue_url, 

963 "messages_available": int(queue_attrs.get("ApproximateNumberOfMessages", 0)), 

964 "messages_in_flight": int(queue_attrs.get("ApproximateNumberOfMessagesNotVisible", 0)), 

965 "messages_delayed": int(queue_attrs.get("ApproximateNumberOfMessagesDelayed", 0)), 

966 } 

967 

968 # Get DLQ attributes if available 

969 if dlq_url: 969 ↛ 977line 969 didn't jump to line 977 because the condition on line 969 was always true

970 dlq_attrs = sqs.get_queue_attributes( 

971 QueueUrl=dlq_url, 

972 AttributeNames=["ApproximateNumberOfMessages"], 

973 )["Attributes"] 

974 result["dlq_url"] = dlq_url 

975 result["dlq_messages"] = int(dlq_attrs.get("ApproximateNumberOfMessages", 0)) 

976 

977 return result 

978 

979 def list_jobs_global( 

980 self, 

981 namespace: str | None = None, 

982 status: str | None = None, 

983 limit: int = 50, 

984 ) -> dict[str, Any]: 

985 """ 

986 List jobs across all regions via the global API endpoint. 

987 

988 This uses the cross-region aggregator Lambda to query all regional 

989 clusters in parallel and return a unified view. 

990 

991 Args: 

992 namespace: Filter by namespace 

993 status: Filter by status 

994 limit: Maximum jobs to return 

995 

996 Returns: 

997 Aggregated job list with region information 

998 """ 

999 return self._aws_client.get_global_jobs( 

1000 namespace=namespace, 

1001 status=status, 

1002 limit=limit, 

1003 ) 

1004 

1005 def get_global_health(self) -> dict[str, Any]: 

1006 """ 

1007 Get health status across all regions. 

1008 

1009 Returns: 

1010 Aggregated health status from all regional clusters 

1011 """ 

1012 return self._aws_client.get_global_health() 

1013 

1014 def get_global_status(self) -> dict[str, Any]: 

1015 """ 

1016 Get cluster status across all regions. 

1017 

1018 Returns: 

1019 Aggregated status from all regional clusters 

1020 """ 

1021 return self._aws_client.get_global_status() 

1022 

1023 def bulk_delete_global( 

1024 self, 

1025 namespace: str | None = None, 

1026 status: str | None = None, 

1027 older_than_days: int | None = None, 

1028 dry_run: bool = True, 

1029 ) -> dict[str, Any]: 

1030 """ 

1031 Bulk delete jobs across all regions. 

1032 

1033 Args: 

1034 namespace: Filter by namespace 

1035 status: Filter by status 

1036 older_than_days: Delete jobs older than N days 

1037 dry_run: If True, only return what would be deleted 

1038 

1039 Returns: 

1040 Deletion results from all regions 

1041 """ 

1042 return self._aws_client.bulk_delete_global( 

1043 namespace=namespace, 

1044 status=status, 

1045 older_than_days=older_than_days, 

1046 dry_run=dry_run, 

1047 ) 

1048 

1049 def get_job_events( 

1050 self, 

1051 job_name: str, 

1052 namespace: str, 

1053 region: str | None = None, 

1054 ) -> dict[str, Any]: 

1055 """ 

1056 Get Kubernetes events for a job. 

1057 

1058 Args: 

1059 job_name: Name of the job 

1060 namespace: Namespace of the job 

1061 region: Region where the job is running 

1062 

1063 Returns: 

1064 Events related to the job 

1065 """ 

1066 return self._aws_client.get_job_events( 

1067 job_name=job_name, 

1068 namespace=namespace, 

1069 region=region or self.config.default_region, 

1070 ) 

1071 

1072 def get_job_pods( 

1073 self, 

1074 job_name: str, 

1075 namespace: str, 

1076 region: str | None = None, 

1077 ) -> dict[str, Any]: 

1078 """ 

1079 Get pods for a job. 

1080 

1081 Args: 

1082 job_name: Name of the job 

1083 namespace: Namespace of the job 

1084 region: Region where the job is running 

1085 

1086 Returns: 

1087 Pod details for the job 

1088 """ 

1089 return self._aws_client.get_job_pods( 

1090 job_name=job_name, 

1091 namespace=namespace, 

1092 region=region or self.config.default_region, 

1093 ) 

1094 

1095 def get_pod_logs( 

1096 self, 

1097 job_name: str, 

1098 pod_name: str, 

1099 namespace: str, 

1100 region: str | None = None, 

1101 tail_lines: int = 100, 

1102 container: str | None = None, 

1103 ) -> dict[str, Any]: 

1104 """ 

1105 Get logs from a specific pod of a job. 

1106 

1107 Args: 

1108 job_name: Name of the job 

1109 pod_name: Name of the pod 

1110 namespace: Namespace of the job 

1111 region: Region where the job is running 

1112 tail_lines: Number of lines to return 

1113 container: Container name (for multi-container pods) 

1114 

1115 Returns: 

1116 Pod logs response 

1117 """ 

1118 return self._aws_client.get_pod_logs( 

1119 job_name=job_name, 

1120 pod_name=pod_name, 

1121 namespace=namespace, 

1122 region=region or self.config.default_region, 

1123 tail_lines=tail_lines, 

1124 container=container, 

1125 ) 

1126 

1127 def get_job_metrics( 

1128 self, 

1129 job_name: str, 

1130 namespace: str, 

1131 region: str | None = None, 

1132 ) -> dict[str, Any]: 

1133 """ 

1134 Get resource metrics for a job. 

1135 

1136 Args: 

1137 job_name: Name of the job 

1138 namespace: Namespace of the job 

1139 region: Region where the job is running 

1140 

1141 Returns: 

1142 Resource usage metrics for the job's pods 

1143 """ 

1144 return self._aws_client.get_job_metrics( 

1145 job_name=job_name, 

1146 namespace=namespace, 

1147 region=region or self.config.default_region, 

1148 ) 

1149 

1150 def retry_job( 

1151 self, 

1152 job_name: str, 

1153 namespace: str, 

1154 region: str | None = None, 

1155 ) -> dict[str, Any]: 

1156 """ 

1157 Retry a failed job. 

1158 

1159 Creates a new job from the failed job's spec with a new name. 

1160 

1161 Args: 

1162 job_name: Name of the failed job 

1163 namespace: Namespace of the job 

1164 region: Region where the job is running 

1165 

1166 Returns: 

1167 Result with new job name 

1168 """ 

1169 return self._aws_client.retry_job( 

1170 job_name=job_name, 

1171 namespace=namespace, 

1172 region=region or self.config.default_region, 

1173 ) 

1174 

1175 def bulk_delete_jobs( 

1176 self, 

1177 namespace: str | None = None, 

1178 status: str | None = None, 

1179 older_than_days: int | None = None, 

1180 label_selector: str | None = None, 

1181 region: str | None = None, 

1182 dry_run: bool = True, 

1183 ) -> dict[str, Any]: 

1184 """ 

1185 Bulk delete jobs in a region. 

1186 

1187 Args: 

1188 namespace: Filter by namespace 

1189 status: Filter by status 

1190 older_than_days: Delete jobs older than N days 

1191 label_selector: Kubernetes label selector 

1192 region: Target region 

1193 dry_run: If True, only return what would be deleted 

1194 

1195 Returns: 

1196 Deletion results 

1197 """ 

1198 return self._aws_client.bulk_delete_jobs( 

1199 namespace=namespace, 

1200 status=status, 

1201 older_than_days=older_than_days, 

1202 label_selector=label_selector, 

1203 region=region or self.config.default_region, 

1204 dry_run=dry_run, 

1205 ) 

1206 

1207 

1208def get_job_manager(config: GCOConfig | None = None) -> JobManager: 

1209 """Get a configured job manager instance.""" 

1210 return JobManager(config)