Coverage for cli / aws_client.py: 94%
349 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 21:47 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 21:47 +0000
1"""
2AWS Client utilities for GCO CLI.
4Provides authenticated access to AWS services with SigV4 signing,
5stack discovery, and region management.
6"""
8import json
9import logging
10import time
11from dataclasses import dataclass
12from datetime import datetime
13from typing import Any
14from urllib.parse import quote
16import boto3
17import requests
18from botocore.auth import SigV4Auth
19from botocore.awsrequest import AWSRequest
21from .config import GCOConfig, get_config
logger = logging.getLogger(__name__)

# Status codes treated as transient failures and therefore retried
_RETRYABLE_STATUS_CODES = {429, 502, 503, 504}
_MAX_RETRIES = 3  # upper bound on attempts made per request
_RETRY_BACKOFF_BASE = 1.0  # seconds; doubled after each retryable failure
31@dataclass
32class RegionalStack:
33 """Information about a regional GCO stack."""
35 region: str
36 stack_name: str
37 cluster_name: str
38 status: str
39 api_endpoint: str | None = None
40 efs_file_system_id: str | None = None
41 fsx_file_system_id: str | None = None
42 created_time: datetime | None = None
@dataclass
class ApiEndpoint:
    """Location of an API Gateway endpoint."""

    url: str  # base URL that request paths are appended to
    region: str  # region used when signing requests to this endpoint
    api_id: str  # first host label of the URL
    is_regional: bool = False  # True for a regional API (private access)
class GCOAWSClient:
    """
    Client the GCO CLI uses to reach AWS.

    Responsibilities:
    - Discovering regional stacks
    - Sending SigV4-authenticated API requests
    - Querying CloudFormation stacks
    - Looking up EKS cluster information
    """

    def __init__(self, config: GCOConfig | None = None):
        """Create a client whose caches all start out empty.

        Args:
            config: Configuration to use. When omitted (or falsy), the value
                returned by ``get_config()`` is used.
        """
        self.config = config or get_config()
        self._session = boto3.Session()

        # Lookup caches; they share the single timestamp below.
        self._cache_timestamp: float | None = None
        self._regional_stacks_cache: dict[str, RegionalStack] | None = None
        self._regional_api_cache: dict[str, ApiEndpoint] = {}
        self._api_endpoint_cache: ApiEndpoint | None = None

        # Regional API routing is off until set_use_regional_api(True).
        self._use_regional_api: bool = False
def _is_cache_valid(self) -> bool:
    """Tell whether the cached data is younger than ``config.cache_ttl_seconds``."""
    stamp = self._cache_timestamp
    if stamp is not None:
        age = time.time() - stamp
        return age < self.config.cache_ttl_seconds
    # Nothing has been cached yet.
    return False
def _invalidate_cache(self) -> None:
    """Reset every cache and clear the shared cache timestamp."""
    self._cache_timestamp = None
    self._regional_stacks_cache = None
    self._regional_api_cache = {}
    self._api_endpoint_cache = None
def set_use_regional_api(self, use_regional: bool) -> None:
    """Choose between the regional APIs and the global API.

    With regional routing on, API calls go through the regional API
    Gateways, which use VPC Lambdas to reach internal ALBs. That mode is
    needed when public access is disabled.

    Args:
        use_regional: True selects the regional APIs, False the global API
    """
    self._use_regional_api = use_regional
def get_regional_api_endpoint(
    self, region: str, force_refresh: bool = False
) -> ApiEndpoint | None:
    """
    Get the regional API Gateway endpoint for a specific region.

    Regional APIs are used when public access is disabled and the ALB
    is internal-only.

    Args:
        region: AWS region
        force_refresh: Force refresh from CloudFormation

    Returns:
        ApiEndpoint with URL and metadata, or None if the stack or its
        ``RegionalApiEndpoint`` output cannot be read
    """
    if not force_refresh and self._is_cache_valid():
        cached = self._regional_api_cache.get(region)
        if cached is not None:
            return cached

    cfn = self._session.client("cloudformation", region_name=region)
    stack_name = f"{self.config.project_name}-regional-api-{region}"

    try:
        response = cfn.describe_stacks(StackName=stack_name)
        stack = response["Stacks"][0]

        api_url = next(
            (
                output["OutputValue"].rstrip("/")
                for output in stack.get("Outputs", [])
                if output["OutputKey"] == "RegionalApiEndpoint"
            ),
            None,
        )
        if not api_url:
            return None

        # The API ID is the first host label of the URL.
        api_id = api_url.split(".")[0].replace("https://", "")
        endpoint = ApiEndpoint(url=api_url, region=region, api_id=api_id, is_regional=True)

        # All caches share one timestamp. If it is unset or expired, this
        # entry would never be served from cache, so start a fresh cache
        # window. The stale entries are dropped first so that they do not
        # become "valid" again by accident.
        if not self._is_cache_valid():
            self._invalidate_cache()
            self._cache_timestamp = time.time()
        self._regional_api_cache[region] = endpoint
        return endpoint

    except cfn.exceptions.ClientError:
        # Stack doesn't exist
        return None
    except Exception as e:
        logger.debug("Failed to get regional API endpoint for %s: %s", region, e)
        return None
def get_api_endpoint(self, force_refresh: bool = False) -> ApiEndpoint:
    """
    Look up the global API Gateway endpoint.

    Args:
        force_refresh: Skip the cache and query CloudFormation again

    Returns:
        ApiEndpoint holding the URL and its metadata

    Raises:
        RuntimeError: If the lookup fails for any reason, including a stack
            that has no ``ApiEndpoint`` output
    """
    cached = self._api_endpoint_cache
    if cached and not force_refresh and self._is_cache_valid():
        return cached

    gateway_region = self.config.api_gateway_region
    cfn = self._session.client("cloudformation", region_name=gateway_region)

    try:
        stack_name = self.config.api_gateway_stack_name
        described = cfn.describe_stacks(StackName=stack_name)["Stacks"][0]

        # First output named "ApiEndpoint", without any trailing slash.
        api_url = next(
            (
                entry["OutputValue"].rstrip("/")
                for entry in described.get("Outputs", [])
                if entry["OutputKey"] == "ApiEndpoint"
            ),
            None,
        )
        if not api_url:
            raise ValueError(f"ApiEndpoint not found in stack {stack_name}")

        # URL format: https://{api-id}.execute-api.{region}.amazonaws.com/prod
        first_label = api_url.split(".")[0]
        api_id = first_label.replace("https://", "")

        found = ApiEndpoint(url=api_url, region=gateway_region, api_id=api_id)
        self._api_endpoint_cache = found
        self._cache_timestamp = time.time()
        return found

    except Exception as e:
        raise RuntimeError(f"Failed to get API endpoint: {e}") from e
def discover_regional_stacks(self, force_refresh: bool = False) -> dict[str, RegionalStack]:
    """
    Discover all regional GCO stacks.

    Checks configured regions from cdk.json first for fast discovery,
    then falls back to scanning all AWS regions if no stacks are found.

    Args:
        force_refresh: Force refresh from CloudFormation

    Returns:
        Dictionary mapping region to RegionalStack
    """
    # Compare against None rather than relying on truthiness: an empty
    # result is still a valid cached answer and must not trigger a new
    # scan of every region on each call.
    if (
        not force_refresh
        and self._regional_stacks_cache is not None
        and self._is_cache_valid()
    ):
        return self._regional_stacks_cache

    regional_stacks: dict[str, RegionalStack] = {}

    # Try configured regions first (fast path)
    configured_regions = self._get_configured_regions() or []
    for region in configured_regions:
        stack = self._probe_regional_stack(region)
        if stack:
            regional_stacks[region] = stack

    # If we found stacks in configured regions, skip the full scan
    if not regional_stacks:
        logger.debug("No stacks found in configured regions, scanning all AWS regions")
        ec2 = self._session.client("ec2", region_name="us-east-1")
        regions_response = ec2.describe_regions()

        already_probed = set(configured_regions)
        for entry in regions_response["Regions"]:
            region = entry["RegionName"]
            if region in already_probed:
                continue  # Already checked
            stack = self._probe_regional_stack(region)
            if stack:
                regional_stacks[region] = stack

    self._regional_stacks_cache = regional_stacks
    self._cache_timestamp = time.time()

    return regional_stacks
def _get_configured_regions(self) -> list[str]:
    """Return the ``"regional"`` entry loaded from cdk.json, or ``[]`` if absent."""
    from .config import _load_cdk_json

    configured: list[str] = _load_cdk_json().get("regional", [])
    return configured
def _probe_regional_stack(self, region: str) -> RegionalStack | None:
    """Look for a GCO regional stack in one region.

    Args:
        region: AWS region to inspect

    Returns:
        A RegionalStack when the stack could be described, otherwise None
    """
    try:
        cfn = self._session.client("cloudformation", region_name=region)
        stack_name = f"{self.config.regional_stack_prefix}-{region}"

        try:
            described = cfn.describe_stacks(StackName=stack_name)["Stacks"][0]

            outputs = {}
            for item in described.get("Outputs", []):
                outputs[item["OutputKey"]] = item["OutputValue"]

            # Used only when the stack has no "ClusterName" output.
            default_cluster = f"{self.config.project_name}-{region}"

            return RegionalStack(
                region=region,
                stack_name=stack_name,
                cluster_name=outputs.get("ClusterName", default_cluster),
                status=described["StackStatus"],
                efs_file_system_id=outputs.get("EfsFileSystemId"),
                fsx_file_system_id=outputs.get("FsxFileSystemId"),
                created_time=described.get("CreationTime"),
            )
        except cfn.exceptions.ClientError:
            # CloudFormation rejected the lookup: report "no stack here".
            return None

    except Exception as e:
        logger.debug("Failed to get regional stack info for %s: %s", region, e)
        return None
def get_regional_stack(self, region: str) -> RegionalStack | None:
    """Return the discovered stack for ``region``, or None when there is none."""
    return self.discover_regional_stacks().get(region)
287 def call_api(
288 self,
289 method: str,
290 path: str,
291 region: str | None = None,
292 body: dict[str, Any] | None = None,
293 params: dict[str, str] | None = None,
294 ) -> dict[str, Any]:
295 """
296 Make an API call and return the JSON response.
298 This is a convenience wrapper around make_authenticated_request.
300 Args:
301 method: HTTP method (GET, POST, DELETE, etc.)
302 path: API path (e.g., /api/v1/templates)
303 region: Target region for the request
304 body: Request body (will be JSON encoded)
305 params: Query parameters
307 Returns:
308 JSON response as dictionary
310 Raises:
311 RuntimeError: If the request fails with a descriptive error message
312 """
313 # Add URL-encoded query parameters to path
314 if params:
315 encoded_pairs = [
316 f"{quote(str(k), safe='')}={quote(str(v), safe='')}"
317 for k, v in params.items()
318 if v is not None
319 ]
320 if encoded_pairs: 320 ↛ 323line 320 didn't jump to line 323 because the condition on line 320 was always true
321 path = f"{path}?{'&'.join(encoded_pairs)}"
323 response = self.make_authenticated_request(
324 method=method,
325 path=path,
326 body=body,
327 target_region=region,
328 )
330 if not response.ok:
331 error_msg = f"{response.status_code} {response.reason}"
332 try:
333 error_data = response.json()
334 if "error" in error_data:
335 error_msg = error_data["error"]
336 elif "message" in error_data:
337 error_msg = error_data["message"]
338 elif "detail" in error_data: 338 ↛ 342line 338 didn't jump to line 342 because the condition on line 338 was always true
339 error_msg = error_data["detail"]
340 except json.JSONDecodeError, KeyError:
341 error_msg = response.text or error_msg
342 raise RuntimeError(f"API request failed: {error_msg}")
344 result: dict[str, Any] = response.json()
345 return result
def make_authenticated_request(
    self,
    method: str,
    path: str,
    body: dict[str, Any] | None = None,
    headers: dict[str, str] | None = None,
    target_region: str | None = None,
) -> requests.Response:
    """
    Make an authenticated request to the GCO API.

    If use_regional_api is enabled and a target_region is specified,
    the request will be routed through the regional API Gateway.
    Otherwise, it uses the global API Gateway.

    Responses with a status in ``_RETRYABLE_STATUS_CODES`` are retried with
    exponential backoff, up to ``_MAX_RETRIES`` attempts in total. The first
    403 is retried once with a fresh boto3 session; that retry uses up one
    of the attempts.

    Args:
        method: HTTP method (GET, POST, etc.)
        path: API path (e.g., /api/v1/manifests)
        body: Request body (will be JSON encoded)
        headers: Additional headers (the passed dict is not modified)
        target_region: Target region for the request (added as header)

    Returns:
        requests.Response object (the last one received if retries run out)

    Raises:
        RuntimeError: If no AWS credentials are available for the first attempt
    """
    # Determine which endpoint to use
    if self._use_regional_api and target_region:
        # Fall back to global API if regional not available
        endpoint = self.get_regional_api_endpoint(target_region) or self.get_api_endpoint()
    else:
        endpoint = self.get_api_endpoint()

    url = f"{endpoint.url}{path}"

    # Work on a copy so the caller's headers dict is never mutated.
    request_headers = dict(headers or {})
    request_headers["Content-Type"] = "application/json"

    # Add target region header if specified (for global API routing)
    if target_region and not endpoint.is_regional:
        request_headers["X-GCO-Target-Region"] = target_region

    # A falsy body (None or an empty dict) is sent as an empty string.
    body_str = json.dumps(body) if body else ""

    def sign() -> dict[str, str] | None:
        """Return freshly SigV4-signed headers, or None without credentials."""
        credentials = self._session.get_credentials()
        if credentials is None:
            return None
        aws_request = AWSRequest(method=method, url=url, headers=request_headers, data=body_str)
        # Sign the request with the endpoint's region
        SigV4Auth(credentials, "execute-api", endpoint.region).add_auth(aws_request)
        return dict(aws_request.headers)

    signed_headers = sign()
    if signed_headers is None:
        raise RuntimeError(
            "No AWS credentials found. Configure credentials via environment variables, "
            "~/.aws/credentials, IAM role, or SSO (aws sso login)."
        )

    # Make the request with retry for transient failures.
    # Also retry 403 once with refreshed credentials (handles expired tokens from
    # SSO, assumed roles, or instance metadata that rotated mid-request)
    last_response = None
    retried_auth = False
    for attempt in range(_MAX_RETRIES):
        response = requests.request(
            method=method,
            url=url,
            headers=signed_headers,
            data=body_str,
            timeout=30,
        )
        last_response = response

        # 403 may mean expired SigV4 signature — retry once with fresh credentials
        if response.status_code == 403 and not retried_auth:
            retried_auth = True
            logger.warning(
                "Request to %s returned 403, refreshing credentials and retrying",
                path,
            )
            # Force a new session to pick up refreshed credentials
            self._session = boto3.Session()
            refreshed = sign()
            if refreshed is None:
                return response  # No credentials available, return the 403
            signed_headers = refreshed
            continue

        if response.status_code not in _RETRYABLE_STATUS_CODES:
            return response

        # Retryable error — back off and retry, unless this was the last attempt
        if attempt < _MAX_RETRIES - 1:
            wait_time = _RETRY_BACKOFF_BASE * (2**attempt)
            logger.warning(
                "Request to %s returned %d, retrying in %.1fs (attempt %d/%d)",
                path,
                response.status_code,
                wait_time,
                attempt + 1,
                _MAX_RETRIES,
            )
            time.sleep(wait_time)

            # Re-sign the request for the retry (credentials/time may have changed)
            refreshed = sign()
            if refreshed is None:
                return last_response
            signed_headers = refreshed

    # All retries exhausted — return the last response
    return last_response  # type: ignore[return-value]
466 def submit_manifests(
467 self,
468 manifests: list[dict[str, Any]],
469 namespace: str | None = None,
470 target_region: str | None = None,
471 dry_run: bool = False,
472 ) -> dict[str, Any]:
473 """
474 Submit manifests to the GCO API.
476 Args:
477 manifests: List of Kubernetes manifest dictionaries
478 namespace: Default namespace for manifests
479 target_region: Target region for job execution
480 dry_run: If True, validate without applying
482 Returns:
483 API response dictionary
485 Raises:
486 RuntimeError: If submission fails with descriptive error message
487 """
488 body = {"manifests": manifests, "dry_run": dry_run}
490 if namespace: 490 ↛ 493line 490 didn't jump to line 493 because the condition on line 490 was always true
491 body["namespace"] = namespace
493 response = self.make_authenticated_request(
494 method="POST", path="/api/v1/manifests", body=body, target_region=target_region
495 )
497 # Parse response and provide descriptive error messages
498 if not response.ok:
499 error_msg = f"{response.status_code} {response.reason}"
500 try:
501 error_data = response.json()
502 # Extract meaningful error details from the response
503 if "resources" in error_data:
504 failed = [r for r in error_data["resources"] if r.get("status") == "failed"]
505 if failed: 505 ↛ 516line 505 didn't jump to line 516 because the condition on line 505 was always true
506 messages = [
507 f"{r.get('name')}: {r.get('message', 'Unknown error')}" for r in failed
508 ]
509 error_msg = "; ".join(messages)
510 elif "error" in error_data: 510 ↛ 511line 510 didn't jump to line 511 because the condition on line 510 was never true
511 error_msg = error_data["error"]
512 elif "message" in error_data: 512 ↛ 516line 512 didn't jump to line 516 because the condition on line 512 was always true
513 error_msg = error_data["message"]
514 except json.JSONDecodeError, KeyError:
515 error_msg = response.text or error_msg
516 raise RuntimeError(error_msg)
518 result: dict[str, Any] = response.json()
519 return result
521 def get_jobs(
522 self,
523 region: str | None = None,
524 namespace: str | None = None,
525 status: str | None = None,
526 ) -> list[dict[str, Any]]:
527 """
528 Get jobs from GCO clusters.
530 Args:
531 region: Specific region to query (None for all regions)
532 namespace: Filter by namespace
533 status: Filter by status (running, completed, failed)
535 Returns:
536 List of job information dictionaries
537 """
538 params = []
539 if namespace: 539 ↛ 541line 539 didn't jump to line 541 because the condition on line 539 was always true
540 params.append(f"namespace={namespace}")
541 if status:
542 params.append(f"status={status}")
544 query_string = f"?{'&'.join(params)}" if params else ""
546 response = self.make_authenticated_request(
547 method="GET", path=f"/api/v1/jobs{query_string}", target_region=region
548 )
550 response.raise_for_status()
551 result: list[dict[str, Any]] = response.json()
552 return result
554 def get_job_details(
555 self, job_name: str, namespace: str, region: str | None = None
556 ) -> dict[str, Any]:
557 """
558 Get detailed information about a specific job.
560 Args:
561 job_name: Name of the job
562 namespace: Namespace of the job
563 region: Region where the job is running
565 Returns:
566 Job details dictionary
567 """
568 response = self.make_authenticated_request(
569 method="GET", path=f"/api/v1/jobs/{namespace}/{job_name}", target_region=region
570 )
572 response.raise_for_status()
573 result: dict[str, Any] = response.json()
574 return result
576 def get_job_logs(
577 self, job_name: str, namespace: str, region: str | None = None, tail_lines: int = 100
578 ) -> str:
579 """
580 Get logs from a job.
582 Args:
583 job_name: Name of the job
584 namespace: Namespace of the job
585 region: Region where the job is running
586 tail_lines: Number of lines to return from the end
588 Returns:
589 Log content as string
590 """
591 response = self.make_authenticated_request(
592 method="GET",
593 path=f"/api/v1/jobs/{namespace}/{job_name}/logs?tail={tail_lines}",
594 target_region=region,
595 )
597 if not response.ok:
598 # Try to extract a useful error message from the response body
599 try:
600 error_data = response.json()
601 detail = error_data.get("detail", response.reason)
602 except Exception:
603 detail = response.text or response.reason
604 raise RuntimeError(detail)
606 return str(response.json().get("logs", ""))
608 def delete_job(
609 self, job_name: str, namespace: str, region: str | None = None
610 ) -> dict[str, Any]:
611 """
612 Delete a job.
614 Args:
615 job_name: Name of the job
616 namespace: Namespace of the job
617 region: Region where the job is running
619 Returns:
620 Deletion result dictionary
621 """
622 response = self.make_authenticated_request(
623 method="DELETE", path=f"/api/v1/jobs/{namespace}/{job_name}", target_region=region
624 )
626 response.raise_for_status()
627 result: dict[str, Any] = response.json()
628 return result
630 def get_regional_alb_endpoint(self, region: str) -> str | None:
631 """
632 Get the ALB endpoint for a specific region.
634 Args:
635 region: AWS region
637 Returns:
638 ALB DNS name or None if not found
639 """
640 stack = self.get_regional_stack(region)
641 if not stack: 641 ↛ 642line 641 didn't jump to line 642 because the condition on line 641 was never true
642 return None
644 cfn = self._session.client("cloudformation", region_name=region)
645 try:
646 response = cfn.describe_stacks(StackName=stack.stack_name)
647 stack_data = response["Stacks"][0]
648 outputs = {o["OutputKey"]: o["OutputValue"] for o in stack_data.get("Outputs", [])}
649 return outputs.get("AlbDnsName") or outputs.get("LoadBalancerDnsName")
650 except Exception as e:
651 logger.debug("Failed to get ALB DNS for %s: %s", region, e)
652 return None
654 # =========================================================================
655 # Global Aggregation Methods (Cross-Region)
656 # =========================================================================
658 def get_global_jobs(
659 self,
660 namespace: str | None = None,
661 status: str | None = None,
662 limit: int = 50,
663 ) -> dict[str, Any]:
664 """
665 Get jobs across all regions via the global aggregation API.
667 Args:
668 namespace: Filter by namespace
669 status: Filter by status
670 limit: Maximum jobs to return
672 Returns:
673 Aggregated job list with region information
674 """
675 params = [f"limit={limit}"]
676 if namespace: 676 ↛ 678line 676 didn't jump to line 678 because the condition on line 676 was always true
677 params.append(f"namespace={namespace}")
678 if status: 678 ↛ 681line 678 didn't jump to line 681 because the condition on line 678 was always true
679 params.append(f"status={status}")
681 query_string = f"?{'&'.join(params)}"
683 response = self.make_authenticated_request(
684 method="GET", path=f"/api/v1/global/jobs{query_string}"
685 )
687 response.raise_for_status()
688 result: dict[str, Any] = response.json()
689 return result
def get_global_health(self) -> dict[str, Any]:
    """
    Fetch the health status of every region.

    Returns:
        Health status aggregated over all regional clusters
    """
    reply = self.make_authenticated_request(method="GET", path="/api/v1/global/health")
    reply.raise_for_status()

    health: dict[str, Any] = reply.json()
    return health
def get_global_status(self) -> dict[str, Any]:
    """
    Fetch the cluster status of every region.

    Returns:
        Status aggregated over all regional clusters
    """
    reply = self.make_authenticated_request(method="GET", path="/api/v1/global/status")
    reply.raise_for_status()

    status: dict[str, Any] = reply.json()
    return status
717 def bulk_delete_global(
718 self,
719 namespace: str | None = None,
720 status: str | None = None,
721 older_than_days: int | None = None,
722 dry_run: bool = True,
723 ) -> dict[str, Any]:
724 """
725 Bulk delete jobs across all regions.
727 Args:
728 namespace: Filter by namespace
729 status: Filter by status
730 older_than_days: Delete jobs older than N days
731 dry_run: If True, only return what would be deleted
733 Returns:
734 Deletion results from all regions
735 """
736 body: dict[str, Any] = {"dry_run": dry_run}
737 if namespace:
738 body["namespace"] = namespace
739 if status: 739 ↛ 741line 739 didn't jump to line 741 because the condition on line 739 was always true
740 body["status"] = status
741 if older_than_days:
742 body["older_than_days"] = older_than_days
744 response = self.make_authenticated_request(
745 method="DELETE", path="/api/v1/global/jobs", body=body
746 )
748 response.raise_for_status()
749 result: dict[str, Any] = response.json()
750 return result
752 # =========================================================================
753 # Regional Job Operations (New API Endpoints)
754 # =========================================================================
def get_job_events(self, job_name: str, namespace: str, region: str) -> dict[str, Any]:
    """
    Fetch the Kubernetes events of a job.

    Args:
        job_name: Job to look up
        namespace: Namespace containing the job
        region: Region the job runs in

    Returns:
        Events that relate to the job
    """
    events_path = f"/api/v1/jobs/{namespace}/{job_name}/events"
    reply = self.make_authenticated_request(
        method="GET", path=events_path, target_region=region
    )
    reply.raise_for_status()

    events: dict[str, Any] = reply.json()
    return events
def get_job_pods(self, job_name: str, namespace: str, region: str) -> dict[str, Any]:
    """
    Fetch the pods that belong to a job.

    Args:
        job_name: Job to look up
        namespace: Namespace containing the job
        region: Region the job runs in

    Returns:
        Details of the job's pods
    """
    pods_path = f"/api/v1/jobs/{namespace}/{job_name}/pods"
    reply = self.make_authenticated_request(
        method="GET", path=pods_path, target_region=region
    )
    reply.raise_for_status()

    pods: dict[str, Any] = reply.json()
    return pods
800 def get_pod_logs(
801 self,
802 job_name: str,
803 pod_name: str,
804 namespace: str,
805 region: str,
806 tail_lines: int = 100,
807 container: str | None = None,
808 ) -> dict[str, Any]:
809 """
810 Get logs from a specific pod of a job.
812 Args:
813 job_name: Name of the job
814 pod_name: Name of the pod
815 namespace: Namespace of the job
816 region: Region where the job is running
817 tail_lines: Number of lines to return from the end
818 container: Container name (for multi-container pods)
820 Returns:
821 Pod logs response
822 """
823 params = [f"tail={tail_lines}"]
824 if container:
825 params.append(f"container={container}")
827 query_string = f"?{'&'.join(params)}"
829 response = self.make_authenticated_request(
830 method="GET",
831 path=f"/api/v1/jobs/{namespace}/{job_name}/pods/{pod_name}/logs{query_string}",
832 target_region=region,
833 )
835 response.raise_for_status()
836 result: dict[str, Any] = response.json()
837 return result
def get_job_metrics(self, job_name: str, namespace: str, region: str) -> dict[str, Any]:
    """
    Fetch the resource metrics of a job.

    Args:
        job_name: Job to look up
        namespace: Namespace containing the job
        region: Region the job runs in

    Returns:
        Resource usage metrics for the pods of the job
    """
    metrics_path = f"/api/v1/jobs/{namespace}/{job_name}/metrics"
    reply = self.make_authenticated_request(
        method="GET", path=metrics_path, target_region=region
    )
    reply.raise_for_status()

    metrics: dict[str, Any] = reply.json()
    return metrics
def retry_job(self, job_name: str, namespace: str, region: str) -> dict[str, Any]:
    """
    Re-run a failed job.

    A new job, under a new name, is created from the spec of the failed job.

    Args:
        job_name: The failed job's name
        namespace: Namespace containing the job
        region: Region the job runs in

    Returns:
        Result that includes the new job's name
    """
    retry_path = f"/api/v1/jobs/{namespace}/{job_name}/retry"
    reply = self.make_authenticated_request(
        method="POST", path=retry_path, target_region=region
    )
    reply.raise_for_status()

    outcome: dict[str, Any] = reply.json()
    return outcome
885 def bulk_delete_jobs(
886 self,
887 namespace: str | None = None,
888 status: str | None = None,
889 older_than_days: int | None = None,
890 label_selector: str | None = None,
891 region: str | None = None,
892 dry_run: bool = True,
893 ) -> dict[str, Any]:
894 """
895 Bulk delete jobs in a region.
897 Args:
898 namespace: Filter by namespace
899 status: Filter by status
900 older_than_days: Delete jobs older than N days
901 label_selector: Kubernetes label selector
902 region: Target region
903 dry_run: If True, only return what would be deleted
905 Returns:
906 Deletion results
907 """
908 body: dict[str, Any] = {"dry_run": dry_run}
909 if namespace:
910 body["namespace"] = namespace
911 if status:
912 body["status"] = status
913 if older_than_days:
914 body["older_than_days"] = older_than_days
915 if label_selector:
916 body["label_selector"] = label_selector
918 response = self.make_authenticated_request(
919 method="DELETE", path="/api/v1/jobs", body=body, target_region=region
920 )
922 response.raise_for_status()
923 result: dict[str, Any] = response.json()
924 return result
def get_health(self, region: str) -> dict[str, Any]:
    """
    Fetch the health status of one region.

    Args:
        region: Region to target

    Returns:
        Health status of that region's cluster
    """
    reply = self.make_authenticated_request(
        method="GET", path="/api/v1/health", target_region=region
    )
    reply.raise_for_status()

    health: dict[str, Any] = reply.json()
    return health
def get_aws_client(config: GCOConfig | None = None) -> GCOAWSClient:
    """Build a GCOAWSClient from ``config`` (the client picks its own default when None)."""
    client = GCOAWSClient(config)
    return client