Coverage for cli/jobs.py: 94%
404 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
1"""
2Job management for GCO CLI.
4Provides functionality to submit, query, and manage jobs across GCO clusters.
5"""
import logging
from collections.abc import Callable
from dataclasses import dataclass, field
from datetime import UTC, datetime
from pathlib import Path
from typing import Any

import yaml

from gco.services.manifest_processor import safe_load_all_yaml

from .aws_client import get_aws_client
from .config import GCOConfig, get_config
20# <pyflowchart-code-diagram> BEGIN - auto-inserted, do not edit
21# Flowchart(s) generated from this file:
22# * ``JobManager.submit_job`` -> ``diagrams/code_diagrams/cli/jobs.JobManager_submit_job.html``
23# (PNG: ``diagrams/code_diagrams/cli/jobs.JobManager_submit_job.png``)
24# * ``JobManager.submit_job_sqs`` -> ``diagrams/code_diagrams/cli/jobs.JobManager_submit_job_sqs.html``
25# (PNG: ``diagrams/code_diagrams/cli/jobs.JobManager_submit_job_sqs.png``)
26# Regenerate with ``python diagrams/code_diagrams/generate.py``.
27# <pyflowchart-code-diagram> END
# Module logger, named after this module (``__name__``).
logger = logging.getLogger(__name__)
33def _format_duration(seconds: int) -> str:
34 """Format seconds into a human-readable duration string."""
35 if seconds < 60:
36 return f"{seconds}s"
37 minutes, secs = divmod(seconds, 60)
38 if minutes < 60:
39 return f"{minutes}m{secs:02d}s"
40 hours, mins = divmod(minutes, 60)
41 return f"{hours}h{mins:02d}m{secs:02d}s"
44def _first_manifest_namespace(manifests: list[dict[str, Any]]) -> str | None:
45 """Return the first explicit ``metadata.namespace`` found in a manifest list.
47 Used by the SQS submission path to populate the envelope ``namespace``
48 field (informational — the queue processor reads each manifest's own
49 namespace for validation). Returns None if no manifest declares one.
50 """
51 for manifest in manifests:
52 ns = manifest.get("metadata", {}).get("namespace") if isinstance(manifest, dict) else None
53 if ns: 53 ↛ 54line 53 didn't jump to line 54 because the condition on line 53 was never true
54 return str(ns)
55 return None
58def _extract_image_refs(spec: dict[str, Any]) -> list[str]:
59 """Extract container image refs from a parsed Job spec.
61 The API surface for a Job carries ``spec.template.spec.containers`` and
62 ``spec.template.spec.initContainers`` lists, each entry of which has
63 a ``name`` and an ``image`` URI. Returns an alphabetically-sorted,
64 deduplicated list so the output is stable across calls — orphan-image
65 cross-references rely on set equality.
66 """
67 refs: set[str] = set()
68 template = spec.get("template") if isinstance(spec, dict) else None
69 pod_spec = template.get("spec") if isinstance(template, dict) else None
70 if not isinstance(pod_spec, dict):
71 return []
72 for key in ("containers", "initContainers"):
73 items = pod_spec.get(key, [])
74 if not isinstance(items, list): 74 ↛ 75line 74 didn't jump to line 75 because the condition on line 74 was never true
75 continue
76 for entry in items:
77 if not isinstance(entry, dict):
78 continue
79 image = entry.get("image")
80 if isinstance(image, str) and image:
81 refs.add(image)
82 return sorted(refs)
85@dataclass
86class JobInfo:
87 """Information about a Kubernetes job."""
89 name: str
90 namespace: str
91 region: str
92 status: str # "pending", "running", "succeeded", "failed"
93 created_time: datetime | None = None
94 start_time: datetime | None = None
95 completion_time: datetime | None = None
96 active_pods: int = 0
97 succeeded_pods: int = 0
98 failed_pods: int = 0
99 parallelism: int = 1
100 completions: int = 1
101 labels: dict[str, str] = field(default_factory=dict)
102 image_refs: list[str] = field(default_factory=list)
104 @property
105 def is_complete(self) -> bool:
106 return self.status in ("succeeded", "failed")
108 @property
109 def duration_seconds(self) -> int | None:
110 if self.start_time and self.completion_time:
111 return int((self.completion_time - self.start_time).total_seconds())
112 if self.start_time:
113 return int((datetime.now(UTC) - self.start_time).total_seconds())
114 return None
117class JobManager:
118 """
119 Manages jobs across GCO clusters.
121 Provides:
122 - Job submission with region targeting
123 - Job status queries across regions
124 - Job logs retrieval
125 - Job deletion
126 """
128 def __init__(self, config: GCOConfig | None = None):
129 self.config = config or get_config()
130 self._aws_client = get_aws_client(config)
131 # Configure regional API mode if enabled
132 if hasattr(self.config, "use_regional_api") and self.config.use_regional_api:
133 self._aws_client.set_use_regional_api(True)
135 def load_manifests(self, path: str) -> list[dict[str, Any]]:
136 """
137 Load Kubernetes manifests from a file or directory.
139 Args:
140 path: Path to YAML file or directory containing YAML files
142 Returns:
143 List of manifest dictionaries
144 """
145 manifests = []
146 path_obj = Path(path)
148 if path_obj.is_file():
149 manifests.extend(self._load_yaml_file(path_obj))
150 elif path_obj.is_dir():
151 for yaml_file in sorted(path_obj.glob("*.yaml")):
152 manifests.extend(self._load_yaml_file(yaml_file))
153 for yaml_file in sorted(path_obj.glob("*.yml")):
154 manifests.extend(self._load_yaml_file(yaml_file))
155 else:
156 raise FileNotFoundError(f"Path not found: {path}")
158 return manifests
160 def _load_yaml_file(self, path: Path) -> list[dict[str, Any]]:
161 """Load manifests from a single YAML file."""
162 with open(path, encoding="utf-8") as f:
163 return safe_load_all_yaml(f, allow_aliases=False)
165 def submit_job(
166 self,
167 manifests: str | list[dict[str, Any]],
168 namespace: str | None = None,
169 target_region: str | None = None,
170 dry_run: bool = False,
171 labels: dict[str, str] | None = None,
172 ) -> dict[str, Any]:
173 """
174 Submit a job to GCO.
176 Args:
177 manifests: Path to manifest file/directory or list of manifest dicts
178 namespace: Fallback namespace for manifests that don't declare
179 their own. When set, each manifest's ``metadata.namespace`` is
180 filled in only if missing — existing values are preserved so
181 users who've declared a target namespace in the manifest can
182 rely on it reaching the server untouched. Server-side
183 validation enforces the allowlist.
184 target_region: Force job to specific region
185 dry_run: Validate without applying
186 labels: Additional labels to add to manifests
188 Returns:
189 Submission result dictionary
190 """
191 # Load manifests if path provided
192 manifest_list = self.load_manifests(manifests) if isinstance(manifests, str) else manifests
194 # Apply namespace as a fallback only — preserve any namespace the
195 # manifest declared itself.
196 if namespace:
197 for manifest in manifest_list:
198 if "metadata" not in manifest:
199 manifest["metadata"] = {}
200 manifest["metadata"].setdefault("namespace", namespace)
202 # Apply additional labels
203 if labels:
204 for manifest in manifest_list:
205 if "metadata" not in manifest:
206 manifest["metadata"] = {}
207 if "labels" not in manifest["metadata"]: 207 ↛ 209line 207 didn't jump to line 209 because the condition on line 207 was always true
208 manifest["metadata"]["labels"] = {}
209 manifest["metadata"]["labels"].update(labels)
211 # Submit via API
212 return self._aws_client.submit_manifests(
213 manifests=manifest_list,
214 namespace=namespace,
215 target_region=target_region,
216 dry_run=dry_run,
217 )
219 def submit_job_direct(
220 self,
221 manifests: str | list[dict[str, Any]],
222 region: str,
223 namespace: str | None = None,
224 dry_run: bool = False,
225 labels: dict[str, str] | None = None,
226 ) -> dict[str, Any]:
227 """
228 Submit a job directly to a regional cluster using kubectl.
230 This bypasses the API Gateway and submits directly to the EKS cluster
231 using kubectl. Requires:
232 - kubectl installed and in PATH
233 - EKS access entry configured for your IAM principal
234 - AWS credentials with eks:DescribeCluster permission
236 Args:
237 manifests: Path to manifest file/directory or list of manifest dicts
238 region: Target region for direct submission (required)
239 namespace: Fallback namespace for manifests that don't declare
240 their own. When set, each manifest's ``metadata.namespace`` is
241 filled in only if missing — existing values are preserved so
242 users who've declared a target namespace in the manifest can
243 rely on it reaching ``kubectl apply`` untouched.
244 dry_run: Validate without applying
245 labels: Additional labels to add to manifests
247 Returns:
248 Submission result dictionary
249 """
250 import subprocess
251 import tempfile
252 import uuid
254 # Load manifests if path provided
255 manifest_list = self.load_manifests(manifests) if isinstance(manifests, str) else manifests
257 # Apply namespace as a fallback only — preserve any namespace the
258 # manifest declared itself.
259 if namespace:
260 for manifest in manifest_list:
261 if "metadata" not in manifest:
262 manifest["metadata"] = {}
263 manifest["metadata"].setdefault("namespace", namespace)
265 # Apply additional labels
266 if labels:
267 for manifest in manifest_list:
268 if "metadata" not in manifest:
269 manifest["metadata"] = {}
270 if "labels" not in manifest["metadata"]: 270 ↛ 272line 270 didn't jump to line 272 because the condition on line 270 was always true
271 manifest["metadata"]["labels"] = {}
272 manifest["metadata"]["labels"].update(labels)
274 # Get cluster name from stack
275 stack = self._aws_client.get_regional_stack(region)
276 if not stack:
277 raise ValueError(f"No GCO stack found in region {region}")
279 cluster_name = stack.cluster_name
281 # Update kubeconfig for the cluster
282 from .kubectl_helpers import update_kubeconfig
284 update_kubeconfig(cluster_name, region)
286 # Handle existing Job resources before applying
287 warnings: list[str] = []
288 if not dry_run:
289 for manifest in manifest_list:
290 if manifest.get("kind") != "Job": 290 ↛ 291line 290 didn't jump to line 291 because the condition on line 290 was never true
291 continue
292 job_name = manifest.get("metadata", {}).get("name")
293 job_ns = manifest.get("metadata", {}).get("namespace", namespace or "default")
294 if not job_name:
295 continue
297 existing_status = self._get_kubectl_job_status(job_name, job_ns)
298 if existing_status is None:
299 # No existing job — nothing to do
300 continue
302 if existing_status in ("complete", "failed"):
303 # Finished job — safe to delete and replace
304 subprocess.run(
305 ["kubectl", "delete", "job", job_name, "-n", job_ns],
306 capture_output=True,
307 text=True,
308 )
309 else:
310 # Job is still active — auto-rename to avoid collision
311 suffix = uuid.uuid4().hex[:5]
312 new_name = f"{job_name}-{suffix}"
313 original_name = job_name
314 manifest["metadata"]["name"] = new_name
315 warnings.append(
316 f"Job '{original_name}' is still running in namespace "
317 f"'{job_ns}'. Renamed new submission to '{new_name}'."
318 )
319 logger.warning(
320 "Job %s is active in %s, renamed to %s",
321 original_name,
322 job_ns,
323 new_name,
324 )
326 # Write manifests to temp file
327 with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f:
328 yaml.dump_all(manifest_list, f)
329 f.flush() # Ensure content is written before using f.name
330 temp_path = f.name # nosemgrep: tempfile-without-flush
332 try:
333 # Build kubectl command
334 kubectl_cmd = ["kubectl", "apply", "-f", temp_path]
336 if dry_run:
337 kubectl_cmd.extend(["--dry-run=client"])
339 # Run kubectl apply
340 result = subprocess.run(
341 kubectl_cmd, capture_output=True, text=True
342 ) # nosemgrep: dangerous-subprocess-use-audit - kubectl_cmd is a list ["kubectl","apply","-f",temp_path]; temp_path is a secure tempfile, not user input
344 if result.returncode != 0:
345 raise RuntimeError(f"kubectl apply failed: {result.stderr}")
347 # Parse output to get job name
348 output_lines = result.stdout.strip().split("\n")
349 created_resources = []
350 for line in output_lines:
351 if line: 351 ↛ 350line 351 didn't jump to line 350 because the condition on line 351 was always true
352 created_resources.append(line)
354 # Get job name from first manifest (may have been renamed)
355 job_name = None
356 for manifest in manifest_list: 356 ↛ 361line 356 didn't jump to line 361 because the loop on line 356 didn't complete
357 if manifest.get("kind") == "Job": 357 ↛ 356line 357 didn't jump to line 356 because the condition on line 357 was always true
358 job_name = manifest.get("metadata", {}).get("name")
359 break
361 response: dict[str, Any] = {
362 "status": "success",
363 "method": "kubectl",
364 "cluster": cluster_name,
365 "region": region,
366 "namespace": namespace or "default",
367 "job_name": job_name,
368 "dry_run": dry_run,
369 "resources": created_resources,
370 "output": result.stdout,
371 }
372 if warnings:
373 response["warnings"] = warnings
374 return response
376 finally:
377 # Clean up temp file
378 import os
380 os.unlink(temp_path)
382 def _get_kubectl_job_status(self, job_name: str, namespace: str) -> str | None:
383 """Check the status of an existing Job via kubectl.
385 Returns:
386 "complete", "failed", "active", or None if the job doesn't exist.
387 """
388 import json
389 import subprocess
391 result = subprocess.run(
392 [
393 "kubectl",
394 "get",
395 "job",
396 job_name,
397 "-n",
398 namespace,
399 "-o",
400 "json",
401 ],
402 capture_output=True,
403 text=True,
404 )
405 if result.returncode != 0:
406 return None # Job doesn't exist
408 try:
409 job_data = json.loads(result.stdout)
410 except json.JSONDecodeError, KeyError:
411 return None
413 conditions = job_data.get("status", {}).get("conditions") or []
414 for condition in conditions:
415 cond_type = condition.get("type", "")
416 cond_status = condition.get("status", "")
417 if cond_type == "Complete" and cond_status == "True":
418 return "complete"
419 if cond_type == "Failed" and cond_status == "True":
420 return "failed"
421 return "active"
423 def list_jobs(
424 self,
425 region: str | None = None,
426 namespace: str | None = None,
427 status: str | None = None,
428 all_regions: bool = False,
429 ) -> list[JobInfo]:
430 """
431 List jobs across GCO clusters.
433 Args:
434 region: Specific region to query
435 namespace: Filter by namespace
436 status: Filter by status
437 all_regions: Query all discovered regions
439 Returns:
440 List of JobInfo objects
441 """
442 jobs = []
444 if all_regions:
445 # Query all discovered regional stacks
446 stacks = self._aws_client.discover_regional_stacks()
447 for stack_region in stacks:
448 try:
449 region_jobs = self._query_jobs_in_region(stack_region, namespace, status)
450 jobs.extend(region_jobs)
451 except Exception as e:
452 logger.warning("Failed to query jobs in %s: %s", stack_region, e)
453 continue
454 elif region:
455 jobs = self._query_jobs_in_region(region, namespace, status)
456 else:
457 # Use default region
458 jobs = self._query_jobs_in_region(self.config.default_region, namespace, status)
460 return jobs
462 def _query_jobs_in_region(
463 self, region: str, namespace: str | None, status: str | None
464 ) -> list[JobInfo]:
465 """Query jobs in a specific region."""
466 try:
467 response = self._aws_client.get_jobs(region=region, namespace=namespace, status=status)
469 jobs = []
470 # response is a list, but we expect a dict with "jobs" key from the API
471 job_list = response.get("jobs", []) if isinstance(response, dict) else response
472 for job_data in job_list:
473 jobs.append(self._parse_job_info(job_data, region))
475 return jobs
476 except Exception as exc:
477 logger.warning("Failed to query jobs in %s: %s", region, exc)
478 return []
480 def _parse_job_info(self, job_data: dict[str, Any], region: str) -> JobInfo:
481 """Parse job data into JobInfo object."""
482 metadata = job_data.get("metadata", {})
483 status_data = job_data.get("status", {})
484 spec = job_data.get("spec", {})
486 # Determine job status
487 conditions = status_data.get("conditions", [])
488 job_status = "pending"
489 for condition in conditions:
490 if condition.get("type") == "Complete" and condition.get("status") == "True":
491 job_status = "succeeded"
492 break
493 if condition.get("type") == "Failed" and condition.get("status") == "True": 493 ↛ 489line 493 didn't jump to line 489 because the condition on line 493 was always true
494 job_status = "failed"
495 break
497 if job_status == "pending" and status_data.get("active", 0) > 0:
498 job_status = "running"
500 # Parse timestamps
501 created_time = None
502 if metadata.get("creationTimestamp"):
503 created_time = datetime.fromisoformat(
504 metadata["creationTimestamp"].replace("Z", "+00:00")
505 )
507 start_time = None
508 if status_data.get("startTime"):
509 start_time = datetime.fromisoformat(status_data["startTime"].replace("Z", "+00:00"))
511 completion_time = None
512 if status_data.get("completionTime"):
513 completion_time = datetime.fromisoformat(
514 status_data["completionTime"].replace("Z", "+00:00")
515 )
517 return JobInfo(
518 name=metadata.get("name", ""),
519 namespace=metadata.get("namespace", "default"),
520 region=region,
521 status=job_status,
522 created_time=created_time,
523 start_time=start_time,
524 completion_time=completion_time,
525 active_pods=status_data.get("active", 0),
526 succeeded_pods=status_data.get("succeeded", 0),
527 failed_pods=status_data.get("failed", 0),
528 parallelism=spec.get("parallelism", 1),
529 completions=spec.get("completions", 1),
530 labels=metadata.get("labels", {}),
531 image_refs=_extract_image_refs(spec),
532 )
534 def get_job(self, job_name: str, namespace: str, region: str | None = None) -> JobInfo | None:
535 """
536 Get detailed information about a specific job.
538 Args:
539 job_name: Name of the job
540 namespace: Namespace of the job
541 region: Region where the job is running
543 Returns:
544 JobInfo or None if not found
545 """
546 try:
547 response = self._aws_client.get_job_details(
548 job_name=job_name, namespace=namespace, region=region or self.config.default_region
549 )
550 return self._parse_job_info(response, region or self.config.default_region)
551 except Exception as e:
552 logger.debug("Failed to get job details for %s: %s", job_name, e)
553 return None
555 def get_job_logs(
556 self,
557 job_name: str,
558 namespace: str,
559 region: str | None = None,
560 tail_lines: int = 100,
561 follow: bool = False,
562 since_hours: int = 24,
563 ) -> str:
564 """
565 Get logs from a job.
567 Tries the Kubernetes API first (via the GCO API). If the pod is no
568 longer available (completed/deleted), falls back to CloudWatch Logs
569 where Container Insights stores application logs.
571 Args:
572 job_name: Name of the job
573 namespace: Namespace of the job
574 region: Region where the job is running
575 tail_lines: Number of lines to return
576 follow: Stream logs (not implemented yet)
577 since_hours: Hours to look back in CloudWatch (default 24)
579 Returns:
580 Log content as string
581 """
582 if follow:
583 raise NotImplementedError("Log streaming not yet implemented")
585 target_region = region or self.config.default_region
587 try:
588 return self._aws_client.get_job_logs(
589 job_name=job_name,
590 namespace=namespace,
591 region=target_region,
592 tail_lines=tail_lines,
593 )
594 except RuntimeError as e:
595 error_msg = str(e)
596 # If the pod is gone or pending, try CloudWatch
597 if any(
598 hint in error_msg.lower()
599 for hint in ["not found", "pending", "no pods", "terminated", "completed"]
600 ):
601 logger.info("Pod not available, falling back to CloudWatch Logs")
602 try:
603 return self._get_cloudwatch_logs(
604 job_name=job_name,
605 region=target_region,
606 tail_lines=tail_lines,
607 since_hours=since_hours,
608 )
609 except Exception as cw_err:
610 logger.debug("CloudWatch fallback failed: %s", cw_err)
611 raise RuntimeError(
612 f"{error_msg}\n\n"
613 f"CloudWatch Logs fallback also failed: {cw_err}\n"
614 f"Tip: Container logs appear in CloudWatch within a few minutes. "
615 f"If the job just finished, try again shortly."
616 ) from e
617 raise
619 def _get_cloudwatch_logs(
620 self,
621 job_name: str,
622 region: str,
623 tail_lines: int = 100,
624 since_hours: int = 24,
625 ) -> str:
626 """
627 Fetch job logs from CloudWatch Logs (Container Insights).
629 The CloudWatch Observability addon ships container stdout/stderr to:
630 /aws/containerinsights/{cluster_name}/application
632 Args:
633 job_name: Name of the job (used to filter log streams)
634 region: AWS region
635 tail_lines: Number of log lines to return
636 since_hours: Hours to look back (default 24)
638 Returns:
639 Log content as string
640 """
641 cluster_name = f"{self.config.project_name}-{region}"
642 log_group = f"/aws/containerinsights/{cluster_name}/application"
644 logs_client = self._aws_client._session.client("logs", region_name=region)
646 import time
648 now = int(time.time())
649 start_time = now - (since_hours * 3600)
651 query = (
652 f"fields @timestamp, @message "
653 f'| filter @logStream like "{job_name}" '
654 f"| sort @timestamp asc "
655 f"| limit {tail_lines}"
656 )
658 start_query = logs_client.start_query(
659 logGroupName=log_group,
660 startTime=start_time,
661 endTime=now,
662 queryString=query,
663 )
664 query_id = start_query["queryId"]
666 # Poll for results (CloudWatch Insights is async)
667 result = None
668 for _ in range(30): # up to 30 seconds
669 time.sleep(1)
670 result = logs_client.get_query_results(queryId=query_id)
671 if result["status"] in ("Complete", "Failed", "Cancelled"):
672 break
674 if result is None or result["status"] != "Complete":
675 status = result["status"] if result else "unknown"
676 raise RuntimeError(
677 f"CloudWatch Logs query did not complete (status: {status}). Try again in a moment."
678 )
680 if not result["results"]:
681 raise RuntimeError(
682 f"No logs found in CloudWatch for job '{job_name}' "
683 f"in the last {since_hours} hours (log group: {log_group}). "
684 f"Logs may take 1-2 minutes to appear after a pod runs. "
685 f"Use --since to search further back, or check the job name "
686 f"with: gco jobs list -r {region}"
687 )
689 # Extract log messages from results.
690 # CloudWatch Container Insights wraps logs in a JSON envelope:
691 # {"time":"...","stream":"stdout","log":"actual message","kubernetes":{...}}
692 # We parse out the "log" field for clean output, falling back to the
693 # raw message if it's not JSON.
694 import json as _json
696 lines = []
697 for row in result["results"]:
698 for entry in row: 698 ↛ 697line 698 didn't jump to line 697 because the loop on line 698 didn't complete
699 if entry["field"] == "@message":
700 raw = entry["value"].rstrip()
701 try:
702 parsed = _json.loads(raw)
703 lines.append(parsed.get("log", raw).rstrip())
704 except ValueError, TypeError:
705 lines.append(raw)
706 break
708 header = f"[CloudWatch Logs — {log_group}]\n"
709 return header + "\n".join(lines)
711 def delete_job(
712 self, job_name: str, namespace: str, region: str | None = None
713 ) -> dict[str, Any]:
714 """
715 Delete a job.
717 Args:
718 job_name: Name of the job
719 namespace: Namespace of the job
720 region: Region where the job is running
722 Returns:
723 Deletion result
724 """
725 return self._aws_client.delete_job(
726 job_name=job_name, namespace=namespace, region=region or self.config.default_region
727 )
729 def wait_for_job(
730 self,
731 job_name: str,
732 namespace: str,
733 region: str | None = None,
734 timeout_seconds: int = 3600,
735 poll_interval: int = 10,
736 progress_callback: Callable[[JobInfo, int], None] | None = None,
737 ) -> JobInfo:
738 """
739 Wait for a job to complete with progress reporting.
741 Args:
742 job_name: Name of the job
743 namespace: Namespace of the job
744 region: Region where the job is running
745 timeout_seconds: Maximum time to wait
746 poll_interval: Seconds between status checks
747 progress_callback: Optional callable(JobInfo, elapsed_seconds) for progress updates.
748 If None, a default stderr progress line is printed.
750 Returns:
751 Final JobInfo
753 Raises:
754 TimeoutError: If job doesn't complete within timeout
755 """
756 import sys
757 import time
759 start_time = time.time()
761 while True:
762 job = self.get_job(job_name, namespace, region)
764 if job is None:
765 raise ValueError(f"Job {job_name} not found in namespace {namespace}")
767 elapsed = time.time() - start_time
768 elapsed_str = _format_duration(int(elapsed))
770 if job.is_complete:
771 # Clear the progress line and return
772 sys.stderr.write("\r\033[K")
773 sys.stderr.flush()
774 return job
776 # Build progress message
777 pods_info = (
778 f"{job.active_pods} active, {job.succeeded_pods}/{job.completions} succeeded"
779 )
780 if job.failed_pods: 780 ↛ 781line 780 didn't jump to line 781 because the condition on line 780 was never true
781 pods_info += f", {job.failed_pods} failed"
783 status_line = f" ⏳ {job.status.capitalize()} — {pods_info} — {elapsed_str} elapsed"
785 if progress_callback: 785 ↛ 789line 785 didn't jump to line 789 because the condition on line 785 was always true
786 progress_callback(job, int(elapsed))
787 else:
788 # Overwrite the same line on stderr
789 sys.stderr.write(f"\r\033[K{status_line}")
790 sys.stderr.flush()
792 if elapsed >= timeout_seconds:
793 sys.stderr.write("\r\033[K")
794 sys.stderr.flush()
795 raise TimeoutError(
796 f"Job {job_name} did not complete within {timeout_seconds} seconds "
797 f"(last status: {job.status}, pods: {pods_info})"
798 )
800 time.sleep(poll_interval) # nosemgrep: arbitrary-sleep - intentional polling delay
802 def submit_job_sqs(
803 self,
804 manifests: str | list[dict[str, Any]],
805 region: str,
806 namespace: str | None = None,
807 labels: dict[str, str] | None = None,
808 priority: int = 0,
809 ) -> dict[str, Any]:
810 """
811 Submit a job to a regional SQS queue for processing.
813 This is the recommended way to submit jobs as it:
814 - Decouples submission from processing
815 - Enables KEDA-based autoscaling
816 - Provides better fault tolerance
818 Args:
819 manifests: Path to manifest file/directory or list of manifest dicts
820 region: Target region for job submission (required)
821 namespace: Fallback namespace for manifests that don't declare
822 their own. When set, each manifest's ``metadata.namespace`` is
823 filled in only if missing — existing values are preserved so
824 users who've declared a target namespace in the manifest can
825 rely on it reaching the queue processor untouched. Server-side
826 validation enforces the allowlist.
827 labels: Additional labels to add to manifests
828 priority: Job priority (higher = more important)
830 Returns:
831 Submission result dictionary with message_id and queue info
832 """
833 import json
834 import uuid
836 import boto3
838 # Load manifests if path provided
839 manifest_list = self.load_manifests(manifests) if isinstance(manifests, str) else manifests
841 # Apply namespace as a fallback only — preserve any namespace the
842 # manifest declared itself.
843 if namespace:
844 for manifest in manifest_list:
845 if "metadata" not in manifest:
846 manifest["metadata"] = {}
847 manifest["metadata"].setdefault("namespace", namespace)
849 # Apply additional labels
850 if labels:
851 for manifest in manifest_list:
852 if "metadata" not in manifest: 852 ↛ 853line 852 didn't jump to line 853 because the condition on line 852 was never true
853 manifest["metadata"] = {}
854 if "labels" not in manifest["metadata"]: 854 ↛ 856line 854 didn't jump to line 856 because the condition on line 854 was always true
855 manifest["metadata"]["labels"] = {}
856 manifest["metadata"]["labels"].update(labels)
858 # Get queue URL from stack
859 stack = self._aws_client.get_regional_stack(region)
860 if not stack:
861 raise ValueError(f"No GCO stack found in region {region}")
863 # Get queue URL from CloudFormation outputs
864 cfn = boto3.client("cloudformation", region_name=region)
865 response = cfn.describe_stacks(StackName=stack.stack_name)
866 outputs = {
867 o["OutputKey"]: o["OutputValue"] for o in response["Stacks"][0].get("Outputs", [])
868 }
869 queue_url = outputs.get("JobQueueUrl")
871 if not queue_url:
872 raise ValueError(f"Job queue not found in stack {stack.stack_name}")
874 # Create SQS message. The ``namespace`` field in the envelope is
875 # informational only — the queue processor reads each manifest's
876 # own ``metadata.namespace`` for validation and application. Report
877 # the first manifest's namespace here so the submission response
878 # matches reality when the user doesn't pass ``--namespace``.
879 job_id = str(uuid.uuid4())[:8]
880 envelope_namespace = namespace or _first_manifest_namespace(manifest_list) or "gco-jobs"
881 message_body = {
882 "job_id": job_id,
883 "manifests": manifest_list,
884 "namespace": envelope_namespace,
885 "priority": priority,
886 "submitted_at": datetime.now(UTC).isoformat(),
887 }
889 # Send to SQS
890 sqs = boto3.client("sqs", region_name=region)
891 response = sqs.send_message(
892 QueueUrl=queue_url,
893 MessageBody=json.dumps(message_body),
894 MessageAttributes={
895 "Priority": {"DataType": "Number", "StringValue": str(priority)},
896 "JobId": {"DataType": "String", "StringValue": job_id},
897 },
898 )
900 # Get job name from first manifest
901 job_name = None
902 for manifest in manifest_list: 902 ↛ 907line 902 didn't jump to line 907 because the loop on line 902 didn't complete
903 if manifest.get("kind") == "Job": 903 ↛ 902line 903 didn't jump to line 902 because the condition on line 903 was always true
904 job_name = manifest.get("metadata", {}).get("name")
905 break
907 return {
908 "status": "queued",
909 "method": "sqs",
910 "message_id": response["MessageId"],
911 "job_id": job_id,
912 "job_name": job_name,
913 "queue_url": queue_url,
914 "region": region,
915 "namespace": envelope_namespace,
916 "priority": priority,
917 }
919 def get_queue_status(self, region: str) -> dict[str, Any]:
920 """
921 Get the status of the job queue in a region.
923 Args:
924 region: AWS region
926 Returns:
927 Queue status including message counts
928 """
929 import boto3
931 stack = self._aws_client.get_regional_stack(region)
932 if not stack:
933 raise ValueError(f"No GCO stack found in region {region}")
935 # Get queue URLs from CloudFormation outputs
936 cfn = boto3.client("cloudformation", region_name=region)
937 response = cfn.describe_stacks(StackName=stack.stack_name)
938 outputs = {
939 o["OutputKey"]: o["OutputValue"] for o in response["Stacks"][0].get("Outputs", [])
940 }
942 queue_url = outputs.get("JobQueueUrl")
943 dlq_url = outputs.get("JobDlqUrl")
945 if not queue_url:
946 raise ValueError(f"Job queue not found in stack {stack.stack_name}")
948 sqs = boto3.client("sqs", region_name=region)
950 # Get main queue attributes
951 queue_attrs = sqs.get_queue_attributes(
952 QueueUrl=queue_url,
953 AttributeNames=[
954 "ApproximateNumberOfMessages",
955 "ApproximateNumberOfMessagesNotVisible",
956 "ApproximateNumberOfMessagesDelayed",
957 ],
958 )["Attributes"]
960 result = {
961 "region": region,
962 "queue_url": queue_url,
963 "messages_available": int(queue_attrs.get("ApproximateNumberOfMessages", 0)),
964 "messages_in_flight": int(queue_attrs.get("ApproximateNumberOfMessagesNotVisible", 0)),
965 "messages_delayed": int(queue_attrs.get("ApproximateNumberOfMessagesDelayed", 0)),
966 }
968 # Get DLQ attributes if available
969 if dlq_url: 969 ↛ 977line 969 didn't jump to line 977 because the condition on line 969 was always true
970 dlq_attrs = sqs.get_queue_attributes(
971 QueueUrl=dlq_url,
972 AttributeNames=["ApproximateNumberOfMessages"],
973 )["Attributes"]
974 result["dlq_url"] = dlq_url
975 result["dlq_messages"] = int(dlq_attrs.get("ApproximateNumberOfMessages", 0))
977 return result
979 def list_jobs_global(
980 self,
981 namespace: str | None = None,
982 status: str | None = None,
983 limit: int = 50,
984 ) -> dict[str, Any]:
985 """
986 List jobs across all regions via the global API endpoint.
988 This uses the cross-region aggregator Lambda to query all regional
989 clusters in parallel and return a unified view.
991 Args:
992 namespace: Filter by namespace
993 status: Filter by status
994 limit: Maximum jobs to return
996 Returns:
997 Aggregated job list with region information
998 """
999 return self._aws_client.get_global_jobs(
1000 namespace=namespace,
1001 status=status,
1002 limit=limit,
1003 )
def get_global_health(self) -> dict[str, Any]:
    """
    Return health information aggregated over all regional clusters.

    Thin delegation to ``self._aws_client.get_global_health``.

    Returns:
        Aggregated health payload as returned by the AWS client
    """
    client = self._aws_client
    return client.get_global_health()
def get_global_status(self) -> dict[str, Any]:
    """
    Return cluster status aggregated over all regional clusters.

    Thin delegation to ``self._aws_client.get_global_status``.

    Returns:
        Aggregated status payload as returned by the AWS client
    """
    client = self._aws_client
    return client.get_global_status()
1023 def bulk_delete_global(
1024 self,
1025 namespace: str | None = None,
1026 status: str | None = None,
1027 older_than_days: int | None = None,
1028 dry_run: bool = True,
1029 ) -> dict[str, Any]:
1030 """
1031 Bulk delete jobs across all regions.
1033 Args:
1034 namespace: Filter by namespace
1035 status: Filter by status
1036 older_than_days: Delete jobs older than N days
1037 dry_run: If True, only return what would be deleted
1039 Returns:
1040 Deletion results from all regions
1041 """
1042 return self._aws_client.bulk_delete_global(
1043 namespace=namespace,
1044 status=status,
1045 older_than_days=older_than_days,
1046 dry_run=dry_run,
1047 )
1049 def get_job_events(
1050 self,
1051 job_name: str,
1052 namespace: str,
1053 region: str | None = None,
1054 ) -> dict[str, Any]:
1055 """
1056 Get Kubernetes events for a job.
1058 Args:
1059 job_name: Name of the job
1060 namespace: Namespace of the job
1061 region: Region where the job is running
1063 Returns:
1064 Events related to the job
1065 """
1066 return self._aws_client.get_job_events(
1067 job_name=job_name,
1068 namespace=namespace,
1069 region=region or self.config.default_region,
1070 )
1072 def get_job_pods(
1073 self,
1074 job_name: str,
1075 namespace: str,
1076 region: str | None = None,
1077 ) -> dict[str, Any]:
1078 """
1079 Get pods for a job.
1081 Args:
1082 job_name: Name of the job
1083 namespace: Namespace of the job
1084 region: Region where the job is running
1086 Returns:
1087 Pod details for the job
1088 """
1089 return self._aws_client.get_job_pods(
1090 job_name=job_name,
1091 namespace=namespace,
1092 region=region or self.config.default_region,
1093 )
1095 def get_pod_logs(
1096 self,
1097 job_name: str,
1098 pod_name: str,
1099 namespace: str,
1100 region: str | None = None,
1101 tail_lines: int = 100,
1102 container: str | None = None,
1103 ) -> dict[str, Any]:
1104 """
1105 Get logs from a specific pod of a job.
1107 Args:
1108 job_name: Name of the job
1109 pod_name: Name of the pod
1110 namespace: Namespace of the job
1111 region: Region where the job is running
1112 tail_lines: Number of lines to return
1113 container: Container name (for multi-container pods)
1115 Returns:
1116 Pod logs response
1117 """
1118 return self._aws_client.get_pod_logs(
1119 job_name=job_name,
1120 pod_name=pod_name,
1121 namespace=namespace,
1122 region=region or self.config.default_region,
1123 tail_lines=tail_lines,
1124 container=container,
1125 )
1127 def get_job_metrics(
1128 self,
1129 job_name: str,
1130 namespace: str,
1131 region: str | None = None,
1132 ) -> dict[str, Any]:
1133 """
1134 Get resource metrics for a job.
1136 Args:
1137 job_name: Name of the job
1138 namespace: Namespace of the job
1139 region: Region where the job is running
1141 Returns:
1142 Resource usage metrics for the job's pods
1143 """
1144 return self._aws_client.get_job_metrics(
1145 job_name=job_name,
1146 namespace=namespace,
1147 region=region or self.config.default_region,
1148 )
1150 def retry_job(
1151 self,
1152 job_name: str,
1153 namespace: str,
1154 region: str | None = None,
1155 ) -> dict[str, Any]:
1156 """
1157 Retry a failed job.
1159 Creates a new job from the failed job's spec with a new name.
1161 Args:
1162 job_name: Name of the failed job
1163 namespace: Namespace of the job
1164 region: Region where the job is running
1166 Returns:
1167 Result with new job name
1168 """
1169 return self._aws_client.retry_job(
1170 job_name=job_name,
1171 namespace=namespace,
1172 region=region or self.config.default_region,
1173 )
1175 def bulk_delete_jobs(
1176 self,
1177 namespace: str | None = None,
1178 status: str | None = None,
1179 older_than_days: int | None = None,
1180 label_selector: str | None = None,
1181 region: str | None = None,
1182 dry_run: bool = True,
1183 ) -> dict[str, Any]:
1184 """
1185 Bulk delete jobs in a region.
1187 Args:
1188 namespace: Filter by namespace
1189 status: Filter by status
1190 older_than_days: Delete jobs older than N days
1191 label_selector: Kubernetes label selector
1192 region: Target region
1193 dry_run: If True, only return what would be deleted
1195 Returns:
1196 Deletion results
1197 """
1198 return self._aws_client.bulk_delete_jobs(
1199 namespace=namespace,
1200 status=status,
1201 older_than_days=older_than_days,
1202 label_selector=label_selector,
1203 region=region or self.config.default_region,
1204 dry_run=dry_run,
1205 )
def get_job_manager(config: GCOConfig | None = None) -> JobManager:
    """
    Build a JobManager for CLI use.

    Args:
        config: Optional CLI configuration, passed to the JobManager
            constructor unchanged (including None).

    Returns:
        A newly constructed JobManager instance.
    """
    manager = JobManager(config)
    return manager