# Coverage for cli/jobs.py: 94% (386 statements)

1"""
2Job management for GCO CLI.
4Provides functionality to submit, query, and manage jobs across GCO clusters.
5"""
7from collections.abc import Callable
8from dataclasses import dataclass, field
9from datetime import UTC, datetime
10from pathlib import Path
11from typing import Any
13import yaml
15from gco.services.manifest_processor import safe_load_all_yaml
17from .aws_client import get_aws_client
18from .config import GCOConfig, get_config
20logger = __import__("logging").getLogger(__name__)


def _format_duration(seconds: int) -> str:
    """Format seconds into a human-readable duration string."""
    if seconds < 60:
        return f"{seconds}s"
    minutes, secs = divmod(seconds, 60)
    if minutes < 60:
        return f"{minutes}m{secs:02d}s"
    hours, mins = divmod(minutes, 60)
    return f"{hours}h{mins:02d}m{secs:02d}s"


def _first_manifest_namespace(manifests: list[dict[str, Any]]) -> str | None:
    """Return the first explicit ``metadata.namespace`` found in a manifest list.

    Used by the SQS submission path to populate the envelope ``namespace``
    field (informational — the queue processor reads each manifest's own
    namespace for validation). Returns None if no manifest declares one.
    """
    for manifest in manifests:
        ns = manifest.get("metadata", {}).get("namespace") if isinstance(manifest, dict) else None
        if ns:
            return str(ns)
    return None
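
# Illustrative behavior (hypothetical manifests):
#     _first_manifest_namespace([{"kind": "ConfigMap"},
#                                {"metadata": {"namespace": "ml"}}])  -> "ml"
#     _first_manifest_namespace([{"kind": "Job"}])                    -> None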


@dataclass
class JobInfo:
    """Information about a Kubernetes job."""

    name: str
    namespace: str
    region: str
    status: str  # "pending", "running", "succeeded", "failed"
    created_time: datetime | None = None
    start_time: datetime | None = None
    completion_time: datetime | None = None
    active_pods: int = 0
    succeeded_pods: int = 0
    failed_pods: int = 0
    parallelism: int = 1
    completions: int = 1
    labels: dict[str, str] = field(default_factory=dict)

    @property
    def is_complete(self) -> bool:
        return self.status in ("succeeded", "failed")

    @property
    def duration_seconds(self) -> int | None:
        if self.start_time and self.completion_time:
            return int((self.completion_time - self.start_time).total_seconds())
        if self.start_time:
            return int((datetime.now(UTC) - self.start_time).total_seconds())
        return None


class JobManager:
    """
    Manages jobs across GCO clusters.

    Provides:
    - Job submission with region targeting
    - Job status queries across regions
    - Job logs retrieval
    - Job deletion
    """

    def __init__(self, config: GCOConfig | None = None):
        self.config = config or get_config()
        self._aws_client = get_aws_client(config)
        # Configure regional API mode if enabled
        if hasattr(self.config, "use_regional_api") and self.config.use_regional_api:
            self._aws_client.set_use_regional_api(True)

    def load_manifests(self, path: str) -> list[dict[str, Any]]:
        """
        Load Kubernetes manifests from a file or directory.

        Args:
            path: Path to YAML file or directory containing YAML files

        Returns:
            List of manifest dictionaries
        """
        manifests = []
        path_obj = Path(path)

        if path_obj.is_file():
            manifests.extend(self._load_yaml_file(path_obj))
        elif path_obj.is_dir():
            for yaml_file in sorted(path_obj.glob("*.yaml")):
                manifests.extend(self._load_yaml_file(yaml_file))
            for yaml_file in sorted(path_obj.glob("*.yml")):
                manifests.extend(self._load_yaml_file(yaml_file))
        else:
            raise FileNotFoundError(f"Path not found: {path}")

        return manifests

    def _load_yaml_file(self, path: Path) -> list[dict[str, Any]]:
        """Load manifests from a single YAML file."""
        with open(path, encoding="utf-8") as f:
            return safe_load_all_yaml(f, allow_aliases=False)
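
    # Example (hypothetical path): loads a single file, or every *.yaml and
    # *.yml file in a directory in sorted order:
    #     mgr = JobManager()
    #     manifests = mgr.load_manifests("deploy/jobs/")  # -> list[dict]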

    def submit_job(
        self,
        manifests: str | list[dict[str, Any]],
        namespace: str | None = None,
        target_region: str | None = None,
        dry_run: bool = False,
        labels: dict[str, str] | None = None,
    ) -> dict[str, Any]:
        """
        Submit a job to GCO.

        Args:
            manifests: Path to manifest file/directory or list of manifest dicts
            namespace: Fallback namespace for manifests that don't declare
                their own. When set, each manifest's ``metadata.namespace`` is
                filled in only if missing — existing values are preserved so
                users who've declared a target namespace in the manifest can
                rely on it reaching the server untouched. Server-side
                validation enforces the allowlist.
            target_region: Force job to specific region
            dry_run: Validate without applying
            labels: Additional labels to add to manifests

        Returns:
            Submission result dictionary
        """
        # Load manifests if path provided
        manifest_list = self.load_manifests(manifests) if isinstance(manifests, str) else manifests

        # Apply namespace as a fallback only — preserve any namespace the
        # manifest declared itself.
        if namespace:
            for manifest in manifest_list:
                if "metadata" not in manifest:
                    manifest["metadata"] = {}
                manifest["metadata"].setdefault("namespace", namespace)

        # Apply additional labels
        if labels:
            for manifest in manifest_list:
                if "metadata" not in manifest:
                    manifest["metadata"] = {}
                if "labels" not in manifest["metadata"]:
                    manifest["metadata"]["labels"] = {}
                manifest["metadata"]["labels"].update(labels)

        # Submit via API
        return self._aws_client.submit_manifests(
            manifests=manifest_list,
            namespace=namespace,
            target_region=target_region,
            dry_run=dry_run,
        )
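
    # Example (hypothetical manifests and labels): dry-run a submission with a
    # fallback namespace and extra tracking labels:
    #     result = mgr.submit_job(
    #         "deploy/jobs/train.yaml",
    #         namespace="ml-team",
    #         labels={"team": "ml", "run-id": "abc123"},
    #         dry_run=True,
    #     )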

    def submit_job_direct(
        self,
        manifests: str | list[dict[str, Any]],
        region: str,
        namespace: str | None = None,
        dry_run: bool = False,
        labels: dict[str, str] | None = None,
    ) -> dict[str, Any]:
        """
        Submit a job directly to a regional cluster using kubectl.

        This bypasses the API Gateway and submits directly to the EKS cluster
        using kubectl. Requires:
        - kubectl installed and in PATH
        - EKS access entry configured for your IAM principal
        - AWS credentials with eks:DescribeCluster permission

        Args:
            manifests: Path to manifest file/directory or list of manifest dicts
            region: Target region for direct submission (required)
            namespace: Fallback namespace for manifests that don't declare
                their own. When set, each manifest's ``metadata.namespace`` is
                filled in only if missing — existing values are preserved so
                users who've declared a target namespace in the manifest can
                rely on it reaching ``kubectl apply`` untouched.
            dry_run: Validate without applying
            labels: Additional labels to add to manifests

        Returns:
            Submission result dictionary
        """
        import subprocess
        import tempfile
        import uuid

        # Load manifests if path provided
        manifest_list = self.load_manifests(manifests) if isinstance(manifests, str) else manifests

        # Apply namespace as a fallback only — preserve any namespace the
        # manifest declared itself.
        if namespace:
            for manifest in manifest_list:
                if "metadata" not in manifest:
                    manifest["metadata"] = {}
                manifest["metadata"].setdefault("namespace", namespace)

        # Apply additional labels
        if labels:
            for manifest in manifest_list:
                if "metadata" not in manifest:
                    manifest["metadata"] = {}
                if "labels" not in manifest["metadata"]:
                    manifest["metadata"]["labels"] = {}
                manifest["metadata"]["labels"].update(labels)

        # Get cluster name from stack
        stack = self._aws_client.get_regional_stack(region)
        if not stack:
            raise ValueError(f"No GCO stack found in region {region}")

        cluster_name = stack.cluster_name

        # Update kubeconfig for the cluster
        from .kubectl_helpers import update_kubeconfig

        update_kubeconfig(cluster_name, region)

        # Handle existing Job resources before applying
        warnings: list[str] = []
        if not dry_run:
            for manifest in manifest_list:
                if manifest.get("kind") != "Job":
                    continue
                job_name = manifest.get("metadata", {}).get("name")
                job_ns = manifest.get("metadata", {}).get("namespace", namespace or "default")
                if not job_name:
                    continue

                existing_status = self._get_kubectl_job_status(job_name, job_ns)
                if existing_status is None:
                    # No existing job — nothing to do
                    continue

                if existing_status in ("complete", "failed"):
                    # Finished job — safe to delete and replace
                    subprocess.run(
                        ["kubectl", "delete", "job", job_name, "-n", job_ns],
                        capture_output=True,
                        text=True,
                    )
                else:
                    # Job is still active — auto-rename to avoid collision
                    suffix = uuid.uuid4().hex[:5]
                    new_name = f"{job_name}-{suffix}"
                    original_name = job_name
                    manifest["metadata"]["name"] = new_name
                    warnings.append(
                        f"Job '{original_name}' is still running in namespace "
                        f"'{job_ns}'. Renamed new submission to '{new_name}'."
                    )
                    logger.warning(
                        "Job %s is active in %s, renamed to %s",
                        original_name,
                        job_ns,
                        new_name,
                    )

        # Write manifests to temp file
        with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f:
            yaml.dump_all(manifest_list, f)
            f.flush()  # Ensure content is written before using f.name
            temp_path = f.name  # nosemgrep: tempfile-without-flush

        try:
            # Build kubectl command
            kubectl_cmd = ["kubectl", "apply", "-f", temp_path]

            if dry_run:
                kubectl_cmd.extend(["--dry-run=client"])

            # Run kubectl apply
            result = subprocess.run(
                kubectl_cmd, capture_output=True, text=True
            )  # nosemgrep: dangerous-subprocess-use-audit - kubectl_cmd is a list ["kubectl","apply","-f",temp_path]; temp_path is a secure tempfile, not user input

            if result.returncode != 0:
                raise RuntimeError(f"kubectl apply failed: {result.stderr}")

            # Parse output to get job name
            output_lines = result.stdout.strip().split("\n")
            created_resources = []
            for line in output_lines:
                if line:
                    created_resources.append(line)

            # Get job name from first manifest (may have been renamed)
            job_name = None
            for manifest in manifest_list:
                if manifest.get("kind") == "Job":
                    job_name = manifest.get("metadata", {}).get("name")
                    break

            response: dict[str, Any] = {
                "status": "success",
                "method": "kubectl",
                "cluster": cluster_name,
                "region": region,
                "namespace": namespace or "default",
                "job_name": job_name,
                "dry_run": dry_run,
                "resources": created_resources,
                "output": result.stdout,
            }
            if warnings:
                response["warnings"] = warnings
            return response

        finally:
            # Clean up temp file
            import os

            os.unlink(temp_path)
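
    # Example (hypothetical): apply directly to the us-west-2 cluster via
    # kubectl; a still-active Job with the same name is auto-renamed, and the
    # rename is surfaced in result["warnings"]:
    #     result = mgr.submit_job_direct("deploy/jobs/train.yaml", region="us-west-2")
    #     for warning in result.get("warnings", []):
    #         print(warning)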

    def _get_kubectl_job_status(self, job_name: str, namespace: str) -> str | None:
        """Check the status of an existing Job via kubectl.

        Returns:
            "complete", "failed", "active", or None if the job doesn't exist.
        """
        import json
        import subprocess

        result = subprocess.run(
            [
                "kubectl",
                "get",
                "job",
                job_name,
                "-n",
                namespace,
                "-o",
                "json",
            ],
            capture_output=True,
            text=True,
        )
        if result.returncode != 0:
            return None  # Job doesn't exist

        try:
            job_data = json.loads(result.stdout)
        except (json.JSONDecodeError, KeyError):
            return None

        conditions = job_data.get("status", {}).get("conditions") or []
        for condition in conditions:
            cond_type = condition.get("type", "")
            cond_status = condition.get("status", "")
            if cond_type == "Complete" and cond_status == "True":
                return "complete"
            if cond_type == "Failed" and cond_status == "True":
                return "failed"
        return "active"

    def list_jobs(
        self,
        region: str | None = None,
        namespace: str | None = None,
        status: str | None = None,
        all_regions: bool = False,
    ) -> list[JobInfo]:
        """
        List jobs across GCO clusters.

        Args:
            region: Specific region to query
            namespace: Filter by namespace
            status: Filter by status
            all_regions: Query all discovered regions

        Returns:
            List of JobInfo objects
        """
        jobs = []

        if all_regions:
            # Query all discovered regional stacks
            stacks = self._aws_client.discover_regional_stacks()
            for stack_region in stacks:
                try:
                    region_jobs = self._query_jobs_in_region(stack_region, namespace, status)
                    jobs.extend(region_jobs)
                except Exception as e:
                    logger.warning("Failed to query jobs in %s: %s", stack_region, e)
                    continue
        elif region:
            jobs = self._query_jobs_in_region(region, namespace, status)
        else:
            # Use default region
            jobs = self._query_jobs_in_region(self.config.default_region, namespace, status)

        return jobs

    def _query_jobs_in_region(
        self, region: str, namespace: str | None, status: str | None
    ) -> list[JobInfo]:
        """Query jobs in a specific region."""
        try:
            response = self._aws_client.get_jobs(region=region, namespace=namespace, status=status)

            jobs = []
            # The API normally returns a dict with a "jobs" key, but tolerate
            # a bare list as well.
            job_list = response.get("jobs", []) if isinstance(response, dict) else response
            for job_data in job_list:
                jobs.append(self._parse_job_info(job_data, region))

            return jobs
        except Exception as exc:
            logger.warning("Failed to query jobs in %s: %s", region, exc)
            return []

    def _parse_job_info(self, job_data: dict[str, Any], region: str) -> JobInfo:
        """Parse job data into JobInfo object."""
        metadata = job_data.get("metadata", {})
        status_data = job_data.get("status", {})
        spec = job_data.get("spec", {})

        # Determine job status
        conditions = status_data.get("conditions", [])
        job_status = "pending"
        for condition in conditions:
            if condition.get("type") == "Complete" and condition.get("status") == "True":
                job_status = "succeeded"
                break
            if condition.get("type") == "Failed" and condition.get("status") == "True":
                job_status = "failed"
                break

        if job_status == "pending" and status_data.get("active", 0) > 0:
            job_status = "running"

        # Parse timestamps
        created_time = None
        if metadata.get("creationTimestamp"):
            created_time = datetime.fromisoformat(
                metadata["creationTimestamp"].replace("Z", "+00:00")
            )

        start_time = None
        if status_data.get("startTime"):
            start_time = datetime.fromisoformat(status_data["startTime"].replace("Z", "+00:00"))

        completion_time = None
        if status_data.get("completionTime"):
            completion_time = datetime.fromisoformat(
                status_data["completionTime"].replace("Z", "+00:00")
            )

        return JobInfo(
            name=metadata.get("name", ""),
            namespace=metadata.get("namespace", "default"),
            region=region,
            status=job_status,
            created_time=created_time,
            start_time=start_time,
            completion_time=completion_time,
            active_pods=status_data.get("active", 0),
            succeeded_pods=status_data.get("succeeded", 0),
            failed_pods=status_data.get("failed", 0),
            parallelism=spec.get("parallelism", 1),
            completions=spec.get("completions", 1),
            labels=metadata.get("labels", {}),
        )
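
    # Illustrative mapping (trimmed Job JSON, hypothetical values):
    #     {"metadata": {"name": "train", "namespace": "ml"},
    #      "status": {"active": 2, "startTime": "2026-04-30T21:00:00Z"},
    #      "spec": {"parallelism": 2}}
    # parses to JobInfo(name="train", status="running", active_pods=2, ...).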

    def get_job(self, job_name: str, namespace: str, region: str | None = None) -> JobInfo | None:
        """
        Get detailed information about a specific job.

        Args:
            job_name: Name of the job
            namespace: Namespace of the job
            region: Region where the job is running

        Returns:
            JobInfo or None if not found
        """
        try:
            response = self._aws_client.get_job_details(
                job_name=job_name, namespace=namespace, region=region or self.config.default_region
            )
            return self._parse_job_info(response, region or self.config.default_region)
        except Exception as e:
            logger.debug("Failed to get job details for %s: %s", job_name, e)
            return None

    def get_job_logs(
        self,
        job_name: str,
        namespace: str,
        region: str | None = None,
        tail_lines: int = 100,
        follow: bool = False,
        since_hours: int = 24,
    ) -> str:
        """
        Get logs from a job.

        Tries the Kubernetes API first (via the GCO API). If the pod is no
        longer available (completed/deleted), falls back to CloudWatch Logs
        where Container Insights stores application logs.

        Args:
            job_name: Name of the job
            namespace: Namespace of the job
            region: Region where the job is running
            tail_lines: Number of lines to return
            follow: Stream logs (not implemented yet)
            since_hours: Hours to look back in CloudWatch (default 24)

        Returns:
            Log content as string
        """
        if follow:
            raise NotImplementedError("Log streaming not yet implemented")

        target_region = region or self.config.default_region

        try:
            return self._aws_client.get_job_logs(
                job_name=job_name,
                namespace=namespace,
                region=target_region,
                tail_lines=tail_lines,
            )
        except RuntimeError as e:
            error_msg = str(e)
            # If the pod is gone or pending, try CloudWatch
            if any(
                hint in error_msg.lower()
                for hint in ["not found", "pending", "no pods", "terminated", "completed"]
            ):
                logger.info("Pod not available, falling back to CloudWatch Logs")
                try:
                    return self._get_cloudwatch_logs(
                        job_name=job_name,
                        region=target_region,
                        tail_lines=tail_lines,
                        since_hours=since_hours,
                    )
                except Exception as cw_err:
                    logger.debug("CloudWatch fallback failed: %s", cw_err)
                    raise RuntimeError(
                        f"{error_msg}\n\n"
                        f"CloudWatch Logs fallback also failed: {cw_err}\n"
                        f"Tip: Container logs appear in CloudWatch within a few minutes. "
                        f"If the job just finished, try again shortly."
                    ) from e
            raise
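
    # Example (hypothetical job): fetch the last 50 lines; if the pod is
    # already gone, the CloudWatch fallback is tried transparently:
    #     logs = mgr.get_job_logs("train-abc12", "ml-team", tail_lines=50, since_hours=6)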

    def _get_cloudwatch_logs(
        self,
        job_name: str,
        region: str,
        tail_lines: int = 100,
        since_hours: int = 24,
    ) -> str:
        """
        Fetch job logs from CloudWatch Logs (Container Insights).

        The CloudWatch Observability addon ships container stdout/stderr to:
            /aws/containerinsights/{cluster_name}/application

        Args:
            job_name: Name of the job (used to filter log streams)
            region: AWS region
            tail_lines: Number of log lines to return
            since_hours: Hours to look back (default 24)

        Returns:
            Log content as string
        """
        cluster_name = f"{self.config.project_name}-{region}"
        log_group = f"/aws/containerinsights/{cluster_name}/application"

        logs_client = self._aws_client._session.client("logs", region_name=region)

        import time

        now = int(time.time())
        start_time = now - (since_hours * 3600)

        query = (
            f"fields @timestamp, @message "
            f'| filter @logStream like "{job_name}" '
            f"| sort @timestamp asc "
            f"| limit {tail_lines}"
        )

        start_query = logs_client.start_query(
            logGroupName=log_group,
            startTime=start_time,
            endTime=now,
            queryString=query,
        )
        query_id = start_query["queryId"]

        # Poll for results (CloudWatch Insights is async)
        result = None
        for _ in range(30):  # up to 30 seconds
            time.sleep(1)
            result = logs_client.get_query_results(queryId=query_id)
            if result["status"] in ("Complete", "Failed", "Cancelled"):
                break

        if result is None or result["status"] != "Complete":
            status = result["status"] if result else "unknown"
            raise RuntimeError(
                f"CloudWatch Logs query did not complete (status: {status}). Try again in a moment."
            )

        if not result["results"]:
            raise RuntimeError(
                f"No logs found in CloudWatch for job '{job_name}' "
                f"in the last {since_hours} hours (log group: {log_group}). "
                f"Logs may take 1-2 minutes to appear after a pod runs. "
                f"Use --since to search further back, or check the job name "
                f"with: gco jobs list -r {region}"
            )

        # Extract log messages from results.
        # CloudWatch Container Insights wraps logs in a JSON envelope:
        #     {"time":"...","stream":"stdout","log":"actual message","kubernetes":{...}}
        # We parse out the "log" field for clean output, falling back to the
        # raw message if it's not JSON.
        import json as _json

        lines = []
        for row in result["results"]:
            for entry in row:
                if entry["field"] == "@message":
                    raw = entry["value"].rstrip()
                    try:
                        parsed = _json.loads(raw)
                        lines.append(parsed.get("log", raw).rstrip())
                    except (ValueError, TypeError):
                        lines.append(raw)
                    break

        header = f"[CloudWatch Logs — {log_group}]\n"
        return header + "\n".join(lines)
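
    # Illustrative Insights result row (one list of field/value pairs per log
    # line, values hypothetical):
    #     [{"field": "@timestamp", "value": "2026-04-30 21:00:00.000"},
    #      {"field": "@message", "value": '{"stream":"stdout","log":"step 1 done"}'}]
    # The "@message" envelope is unwrapped to its "log" field: "step 1 done".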

    def delete_job(
        self, job_name: str, namespace: str, region: str | None = None
    ) -> dict[str, Any]:
        """
        Delete a job.

        Args:
            job_name: Name of the job
            namespace: Namespace of the job
            region: Region where the job is running

        Returns:
            Deletion result
        """
        return self._aws_client.delete_job(
            job_name=job_name, namespace=namespace, region=region or self.config.default_region
        )

    def wait_for_job(
        self,
        job_name: str,
        namespace: str,
        region: str | None = None,
        timeout_seconds: int = 3600,
        poll_interval: int = 10,
        progress_callback: Callable[[JobInfo, int], None] | None = None,
    ) -> JobInfo:
        """
        Wait for a job to complete with progress reporting.

        Args:
            job_name: Name of the job
            namespace: Namespace of the job
            region: Region where the job is running
            timeout_seconds: Maximum time to wait
            poll_interval: Seconds between status checks
            progress_callback: Optional callable(JobInfo, elapsed_seconds) for
                progress updates. If None, a default stderr progress line is
                printed.

        Returns:
            Final JobInfo

        Raises:
            TimeoutError: If job doesn't complete within timeout
        """
        import sys
        import time

        start_time = time.time()

        while True:
            job = self.get_job(job_name, namespace, region)

            if job is None:
                raise ValueError(f"Job {job_name} not found in namespace {namespace}")

            elapsed = time.time() - start_time
            elapsed_str = _format_duration(int(elapsed))

            if job.is_complete:
                # Clear the progress line and return
                sys.stderr.write("\r\033[K")
                sys.stderr.flush()
                return job

            # Build progress message
            pods_info = (
                f"{job.active_pods} active, {job.succeeded_pods}/{job.completions} succeeded"
            )
            if job.failed_pods:
                pods_info += f", {job.failed_pods} failed"

            status_line = f" ⏳ {job.status.capitalize()} — {pods_info} — {elapsed_str} elapsed"

            if progress_callback:
                progress_callback(job, int(elapsed))
            else:
                # Overwrite the same line on stderr
                sys.stderr.write(f"\r\033[K{status_line}")
                sys.stderr.flush()

            if elapsed >= timeout_seconds:
                sys.stderr.write("\r\033[K")
                sys.stderr.flush()
                raise TimeoutError(
                    f"Job {job_name} did not complete within {timeout_seconds} seconds "
                    f"(last status: {job.status}, pods: {pods_info})"
                )

            time.sleep(poll_interval)  # nosemgrep: arbitrary-sleep - intentional polling delay
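
    # Example (hypothetical): wait with a custom progress callback instead of
    # the default stderr progress line:
    #     def on_progress(job: JobInfo, elapsed: int) -> None:
    #         print(f"{job.name}: {job.status} after {_format_duration(elapsed)}")
    #
    #     final = mgr.wait_for_job(
    #         "train-abc12", "ml-team", timeout_seconds=600, progress_callback=on_progress
    #     )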

    def submit_job_sqs(
        self,
        manifests: str | list[dict[str, Any]],
        region: str,
        namespace: str | None = None,
        labels: dict[str, str] | None = None,
        priority: int = 0,
    ) -> dict[str, Any]:
        """
        Submit a job to a regional SQS queue for processing.

        This is the recommended way to submit jobs as it:
        - Decouples submission from processing
        - Enables KEDA-based autoscaling
        - Provides better fault tolerance

        Args:
            manifests: Path to manifest file/directory or list of manifest dicts
            region: Target region for job submission (required)
            namespace: Fallback namespace for manifests that don't declare
                their own. When set, each manifest's ``metadata.namespace`` is
                filled in only if missing — existing values are preserved so
                users who've declared a target namespace in the manifest can
                rely on it reaching the queue processor untouched. Server-side
                validation enforces the allowlist.
            labels: Additional labels to add to manifests
            priority: Job priority (higher = more important)

        Returns:
            Submission result dictionary with message_id and queue info
        """
        import json
        import uuid

        import boto3

        # Load manifests if path provided
        manifest_list = self.load_manifests(manifests) if isinstance(manifests, str) else manifests

        # Apply namespace as a fallback only — preserve any namespace the
        # manifest declared itself.
        if namespace:
            for manifest in manifest_list:
                if "metadata" not in manifest:
                    manifest["metadata"] = {}
                manifest["metadata"].setdefault("namespace", namespace)

        # Apply additional labels
        if labels:
            for manifest in manifest_list:
                if "metadata" not in manifest:
                    manifest["metadata"] = {}
                if "labels" not in manifest["metadata"]:
                    manifest["metadata"]["labels"] = {}
                manifest["metadata"]["labels"].update(labels)

        # Get queue URL from stack
        stack = self._aws_client.get_regional_stack(region)
        if not stack:
            raise ValueError(f"No GCO stack found in region {region}")

        # Get queue URL from CloudFormation outputs
        cfn = boto3.client("cloudformation", region_name=region)
        response = cfn.describe_stacks(StackName=stack.stack_name)
        outputs = {
            o["OutputKey"]: o["OutputValue"] for o in response["Stacks"][0].get("Outputs", [])
        }
        queue_url = outputs.get("JobQueueUrl")

        if not queue_url:
            raise ValueError(f"Job queue not found in stack {stack.stack_name}")

        # Create SQS message. The ``namespace`` field in the envelope is
        # informational only — the queue processor reads each manifest's
        # own ``metadata.namespace`` for validation and application. Report
        # the first manifest's namespace here so the submission response
        # matches reality when the user doesn't pass ``--namespace``.
        job_id = str(uuid.uuid4())[:8]
        envelope_namespace = namespace or _first_manifest_namespace(manifest_list) or "gco-jobs"
        message_body = {
            "job_id": job_id,
            "manifests": manifest_list,
            "namespace": envelope_namespace,
            "priority": priority,
            "submitted_at": datetime.now(UTC).isoformat(),
        }

        # Send to SQS
        sqs = boto3.client("sqs", region_name=region)
        response = sqs.send_message(
            QueueUrl=queue_url,
            MessageBody=json.dumps(message_body),
            MessageAttributes={
                "Priority": {"DataType": "Number", "StringValue": str(priority)},
                "JobId": {"DataType": "String", "StringValue": job_id},
            },
        )

        # Get job name from first manifest
        job_name = None
        for manifest in manifest_list:
            if manifest.get("kind") == "Job":
                job_name = manifest.get("metadata", {}).get("name")
                break

        return {
            "status": "queued",
            "method": "sqs",
            "message_id": response["MessageId"],
            "job_id": job_id,
            "job_name": job_name,
            "queue_url": queue_url,
            "region": region,
            "namespace": envelope_namespace,
            "priority": priority,
        }
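
    # Example (hypothetical): queue a high-priority job; the result carries the
    # SQS MessageId plus the short job_id embedded in the envelope:
    #     result = mgr.submit_job_sqs("deploy/jobs/train.yaml", region="us-east-1", priority=5)
    #     print(result["message_id"], result["job_id"])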

    def get_queue_status(self, region: str) -> dict[str, Any]:
        """
        Get the status of the job queue in a region.

        Args:
            region: AWS region

        Returns:
            Queue status including message counts
        """
        import boto3

        stack = self._aws_client.get_regional_stack(region)
        if not stack:
            raise ValueError(f"No GCO stack found in region {region}")

        # Get queue URLs from CloudFormation outputs
        cfn = boto3.client("cloudformation", region_name=region)
        response = cfn.describe_stacks(StackName=stack.stack_name)
        outputs = {
            o["OutputKey"]: o["OutputValue"] for o in response["Stacks"][0].get("Outputs", [])
        }

        queue_url = outputs.get("JobQueueUrl")
        dlq_url = outputs.get("JobDlqUrl")

        if not queue_url:
            raise ValueError(f"Job queue not found in stack {stack.stack_name}")

        sqs = boto3.client("sqs", region_name=region)

        # Get main queue attributes
        queue_attrs = sqs.get_queue_attributes(
            QueueUrl=queue_url,
            AttributeNames=[
                "ApproximateNumberOfMessages",
                "ApproximateNumberOfMessagesNotVisible",
                "ApproximateNumberOfMessagesDelayed",
            ],
        )["Attributes"]

        result = {
            "region": region,
            "queue_url": queue_url,
            "messages_available": int(queue_attrs.get("ApproximateNumberOfMessages", 0)),
            "messages_in_flight": int(queue_attrs.get("ApproximateNumberOfMessagesNotVisible", 0)),
            "messages_delayed": int(queue_attrs.get("ApproximateNumberOfMessagesDelayed", 0)),
        }

        # Get DLQ attributes if available
        if dlq_url:
            dlq_attrs = sqs.get_queue_attributes(
                QueueUrl=dlq_url,
                AttributeNames=["ApproximateNumberOfMessages"],
            )["Attributes"]
            result["dlq_url"] = dlq_url
            result["dlq_messages"] = int(dlq_attrs.get("ApproximateNumberOfMessages", 0))

        return result
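
    # Illustrative return value (hypothetical counts):
    #     {"region": "us-east-1", "queue_url": "https://sqs...",
    #      "messages_available": 3, "messages_in_flight": 1, "messages_delayed": 0,
    #      "dlq_url": "https://sqs...", "dlq_messages": 0}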

    def list_jobs_global(
        self,
        namespace: str | None = None,
        status: str | None = None,
        limit: int = 50,
    ) -> dict[str, Any]:
        """
        List jobs across all regions via the global API endpoint.

        This uses the cross-region aggregator Lambda to query all regional
        clusters in parallel and return a unified view.

        Args:
            namespace: Filter by namespace
            status: Filter by status
            limit: Maximum jobs to return

        Returns:
            Aggregated job list with region information
        """
        return self._aws_client.get_global_jobs(
            namespace=namespace,
            status=status,
            limit=limit,
        )

    def get_global_health(self) -> dict[str, Any]:
        """
        Get health status across all regions.

        Returns:
            Aggregated health status from all regional clusters
        """
        return self._aws_client.get_global_health()

    def get_global_status(self) -> dict[str, Any]:
        """
        Get cluster status across all regions.

        Returns:
            Aggregated status from all regional clusters
        """
        return self._aws_client.get_global_status()

    def bulk_delete_global(
        self,
        namespace: str | None = None,
        status: str | None = None,
        older_than_days: int | None = None,
        dry_run: bool = True,
    ) -> dict[str, Any]:
        """
        Bulk delete jobs across all regions.

        Args:
            namespace: Filter by namespace
            status: Filter by status
            older_than_days: Delete jobs older than N days
            dry_run: If True, only return what would be deleted

        Returns:
            Deletion results from all regions
        """
        return self._aws_client.bulk_delete_global(
            namespace=namespace,
            status=status,
            older_than_days=older_than_days,
            dry_run=dry_run,
        )

    def get_job_events(
        self,
        job_name: str,
        namespace: str,
        region: str | None = None,
    ) -> dict[str, Any]:
        """
        Get Kubernetes events for a job.

        Args:
            job_name: Name of the job
            namespace: Namespace of the job
            region: Region where the job is running

        Returns:
            Events related to the job
        """
        return self._aws_client.get_job_events(
            job_name=job_name,
            namespace=namespace,
            region=region or self.config.default_region,
        )

    def get_job_pods(
        self,
        job_name: str,
        namespace: str,
        region: str | None = None,
    ) -> dict[str, Any]:
        """
        Get pods for a job.

        Args:
            job_name: Name of the job
            namespace: Namespace of the job
            region: Region where the job is running

        Returns:
            Pod details for the job
        """
        return self._aws_client.get_job_pods(
            job_name=job_name,
            namespace=namespace,
            region=region or self.config.default_region,
        )

    def get_pod_logs(
        self,
        job_name: str,
        pod_name: str,
        namespace: str,
        region: str | None = None,
        tail_lines: int = 100,
        container: str | None = None,
    ) -> dict[str, Any]:
        """
        Get logs from a specific pod of a job.

        Args:
            job_name: Name of the job
            pod_name: Name of the pod
            namespace: Namespace of the job
            region: Region where the job is running
            tail_lines: Number of lines to return
            container: Container name (for multi-container pods)

        Returns:
            Pod logs response
        """
        return self._aws_client.get_pod_logs(
            job_name=job_name,
            pod_name=pod_name,
            namespace=namespace,
            region=region or self.config.default_region,
            tail_lines=tail_lines,
            container=container,
        )

    def get_job_metrics(
        self,
        job_name: str,
        namespace: str,
        region: str | None = None,
    ) -> dict[str, Any]:
        """
        Get resource metrics for a job.

        Args:
            job_name: Name of the job
            namespace: Namespace of the job
            region: Region where the job is running

        Returns:
            Resource usage metrics for the job's pods
        """
        return self._aws_client.get_job_metrics(
            job_name=job_name,
            namespace=namespace,
            region=region or self.config.default_region,
        )

    def retry_job(
        self,
        job_name: str,
        namespace: str,
        region: str | None = None,
    ) -> dict[str, Any]:
        """
        Retry a failed job.

        Creates a new job from the failed job's spec with a new name.

        Args:
            job_name: Name of the failed job
            namespace: Namespace of the job
            region: Region where the job is running

        Returns:
            Result with new job name
        """
        return self._aws_client.retry_job(
            job_name=job_name,
            namespace=namespace,
            region=region or self.config.default_region,
        )

    def bulk_delete_jobs(
        self,
        namespace: str | None = None,
        status: str | None = None,
        older_than_days: int | None = None,
        label_selector: str | None = None,
        region: str | None = None,
        dry_run: bool = True,
    ) -> dict[str, Any]:
        """
        Bulk delete jobs in a region.

        Args:
            namespace: Filter by namespace
            status: Filter by status
            older_than_days: Delete jobs older than N days
            label_selector: Kubernetes label selector
            region: Target region
            dry_run: If True, only return what would be deleted

        Returns:
            Deletion results
        """
        return self._aws_client.bulk_delete_jobs(
            namespace=namespace,
            status=status,
            older_than_days=older_than_days,
            label_selector=label_selector,
            region=region or self.config.default_region,
            dry_run=dry_run,
        )


def get_job_manager(config: GCOConfig | None = None) -> JobManager:
    """Get a configured job manager instance."""
    return JobManager(config)
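

# Minimal smoke-test sketch (hypothetical usage; assumes valid AWS credentials
# and a deployed GCO stack in the configured default region). Prints one line
# per job found in the default region:
if __name__ == "__main__":
    manager = get_job_manager()
    for job in manager.list_jobs():
        duration = job.duration_seconds
        took = _format_duration(duration) if duration is not None else "n/a"
        print(f"{job.region}/{job.namespace}/{job.name}: {job.status} ({took})")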