Coverage for cli / aws_client.py: 94%

349 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 21:47 +0000

1""" 

2AWS Client utilities for GCO CLI. 

3 

4Provides authenticated access to AWS services with SigV4 signing, 

5stack discovery, and region management. 

6""" 

7 

8import json 

9import logging 

10import time 

11from dataclasses import dataclass 

12from datetime import datetime 

13from typing import Any 

14from urllib.parse import quote 

15 

16import boto3 

17import requests 

18from botocore.auth import SigV4Auth 

19from botocore.awsrequest import AWSRequest 

20 

21from .config import GCOConfig, get_config 

22 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Retry policy for API requests.
# Only these HTTP status codes are treated as transient and retried.
_RETRYABLE_STATUS_CODES = {429, 502, 503, 504}
# Upper bound on the number of request attempts made by the retry loop.
_MAX_RETRIES = 3
# Base delay, in seconds, for the exponential backoff between attempts.
_RETRY_BACKOFF_BASE = 1.0

29 

30 

31@dataclass 

32class RegionalStack: 

33 """Information about a regional GCO stack.""" 

34 

35 region: str 

36 stack_name: str 

37 cluster_name: str 

38 status: str 

39 api_endpoint: str | None = None 

40 efs_file_system_id: str | None = None 

41 fsx_file_system_id: str | None = None 

42 created_time: datetime | None = None 

43 

44 

@dataclass
class ApiEndpoint:
    """API Gateway endpoint information."""

    # Base URL of the endpoint.
    url: str
    # AWS region of the endpoint.
    region: str
    # API Gateway API identifier.
    api_id: str
    # True if this is a regional API (for private access).
    is_regional: bool = False

53 

54 

class GCOAWSClient:
    """
    AWS client for GCO operations.

    Handles:
    - Stack discovery across regions
    - Authenticated API requests with SigV4
    - CloudFormation stack queries
    - EKS cluster information

    Caching:
        The global endpoint, the regional endpoints and the discovered stacks
        share a single timestamp. Every cache write goes through
        ``_prepare_cache_write`` so that an expired window is cleared as a
        whole before a new one starts; a refresh of one cache can therefore
        never make stale entries of another cache look fresh again.
    """

    def __init__(self, config: GCOConfig | None = None):
        """Create a client.

        Args:
            config: Configuration to use; falls back to ``get_config()``.
        """
        self.config = config or get_config()
        self._session = boto3.Session()
        self._api_endpoint_cache: ApiEndpoint | None = None
        self._regional_api_cache: dict[str, ApiEndpoint] = {}
        self._regional_stacks_cache: dict[str, RegionalStack] | None = None
        self._cache_timestamp: float | None = None
        self._use_regional_api: bool = False  # Set to True to use regional APIs

    # =========================================================================
    # Internal helpers
    # =========================================================================

    def _is_cache_valid(self) -> bool:
        """Check if cache is still valid."""
        if self._cache_timestamp is None:
            return False
        return (time.time() - self._cache_timestamp) < self.config.cache_ttl_seconds

    def _invalidate_cache(self) -> None:
        """Invalidate all caches."""
        self._api_endpoint_cache = None
        self._regional_api_cache = {}
        self._regional_stacks_cache = None
        self._cache_timestamp = None

    def _prepare_cache_write(self) -> None:
        """Make the shared cache window valid before storing a new entry.

        If the window is unset or expired, every cached value is dropped and a
        new window starts now. If the window is still valid it is left
        untouched, so entries added later simply expire together with it.
        Call this immediately before assigning to any of the caches.
        """
        if not self._is_cache_valid():
            self._invalidate_cache()
            self._cache_timestamp = time.time()

    @staticmethod
    def _build_query(params: dict[str, Any] | None) -> str:
        """Render query parameters as a URL-encoded query string.

        Args:
            params: Mapping of parameter name to value. Entries whose value is
                None are skipped; other values are converted with ``str``.

        Returns:
            ``"?k=v&..."`` with keys and values percent-encoded, or an empty
            string when there is nothing to render.
        """
        if not params:
            return ""
        pairs = [
            f"{quote(str(key), safe='')}={quote(str(value), safe='')}"
            for key, value in params.items()
            if value is not None
        ]
        return f"?{'&'.join(pairs)}" if pairs else ""

    @staticmethod
    def _api_id_from_url(api_url: str) -> str:
        """Extract the API ID from an API Gateway URL.

        Format: https://{api-id}.execute-api.{region}.amazonaws.com/prod
        """
        return api_url.split(".")[0].replace("https://", "")

    def _signed_headers(
        self,
        method: str,
        url: str,
        headers: dict[str, str],
        body_str: str,
        region: str,
    ) -> dict[str, str] | None:
        """SigV4-sign a request for the ``execute-api`` service.

        Args:
            method: HTTP method
            url: Full request URL
            headers: Unsigned request headers (not modified)
            body_str: Serialized request body
            region: Region to sign for

        Returns:
            The headers including the signature, or None when the current
            session has no credentials.
        """
        credentials = self._session.get_credentials()
        if credentials is None:
            return None
        aws_request = AWSRequest(method=method, url=url, headers=headers, data=body_str)
        SigV4Auth(credentials, "execute-api", region).add_auth(aws_request)
        return dict(aws_request.headers)

    # =========================================================================
    # Endpoint and stack discovery
    # =========================================================================

    def set_use_regional_api(self, use_regional: bool) -> None:
        """Set whether to use regional APIs instead of global API.

        When enabled, API calls will be routed through regional API Gateways
        that use VPC Lambdas to access internal ALBs. This is required when
        public access is disabled.

        Args:
            use_regional: True to use regional APIs, False for global API
        """
        self._use_regional_api = use_regional

    def get_regional_api_endpoint(
        self, region: str, force_refresh: bool = False
    ) -> ApiEndpoint | None:
        """
        Get the regional API Gateway endpoint for a specific region.

        Regional APIs are used when public access is disabled and the ALB
        is internal-only.

        Args:
            region: AWS region
            force_refresh: Force refresh from CloudFormation

        Returns:
            ApiEndpoint with URL and metadata, or None if not found
        """
        if not force_refresh and region in self._regional_api_cache and self._is_cache_valid():
            return self._regional_api_cache[region]

        cfn = self._session.client("cloudformation", region_name=region)
        stack_name = f"{self.config.project_name}-regional-api-{region}"

        try:
            response = cfn.describe_stacks(StackName=stack_name)
            stack = response["Stacks"][0]

            api_url = None
            for output in stack.get("Outputs", []):
                if output["OutputKey"] == "RegionalApiEndpoint":
                    api_url = output["OutputValue"].rstrip("/")
                    break

            if not api_url:
                return None

            endpoint = ApiEndpoint(
                url=api_url,
                region=region,
                api_id=self._api_id_from_url(api_url),
                is_regional=True,
            )
            # Without a valid timestamp the entry stored here would never be
            # served from cache, so make sure the window is valid first.
            self._prepare_cache_write()
            self._regional_api_cache[region] = endpoint
            return endpoint

        except cfn.exceptions.ClientError:
            # Stack doesn't exist
            return None
        except Exception as e:
            logger.debug("Failed to get regional API endpoint for %s: %s", region, e)
            return None

    def get_api_endpoint(self, force_refresh: bool = False) -> ApiEndpoint:
        """
        Get the global API Gateway endpoint.

        Args:
            force_refresh: Force refresh from CloudFormation

        Returns:
            ApiEndpoint with URL and metadata

        Raises:
            RuntimeError: If the stack cannot be described or has no
                ``ApiEndpoint`` output
        """
        if not force_refresh and self._api_endpoint_cache and self._is_cache_valid():
            return self._api_endpoint_cache

        cfn = self._session.client("cloudformation", region_name=self.config.api_gateway_region)

        try:
            response = cfn.describe_stacks(StackName=self.config.api_gateway_stack_name)
            stack = response["Stacks"][0]

            api_url = None
            for output in stack.get("Outputs", []):
                if output["OutputKey"] == "ApiEndpoint":
                    api_url = output["OutputValue"].rstrip("/")
                    break

            if not api_url:
                raise ValueError(
                    f"ApiEndpoint not found in stack {self.config.api_gateway_stack_name}"
                )

            endpoint = ApiEndpoint(
                url=api_url,
                region=self.config.api_gateway_region,
                api_id=self._api_id_from_url(api_url),
            )
            self._prepare_cache_write()
            self._api_endpoint_cache = endpoint
            return endpoint

        except Exception as e:
            raise RuntimeError(f"Failed to get API endpoint: {e}") from e

    def discover_regional_stacks(self, force_refresh: bool = False) -> dict[str, RegionalStack]:
        """
        Discover all regional GCO stacks.

        Checks configured regions from cdk.json first for fast discovery,
        then falls back to scanning all AWS regions if no stacks are found.
        An empty discovery result is never served from cache, so the next
        call discovers again.

        Args:
            force_refresh: Force refresh from CloudFormation

        Returns:
            Dictionary mapping region to RegionalStack
        """
        if not force_refresh and self._regional_stacks_cache and self._is_cache_valid():
            return self._regional_stacks_cache

        regional_stacks: dict[str, RegionalStack] = {}

        # Try configured regions first (fast path)
        configured_regions = self._get_configured_regions()
        for region in configured_regions:
            stack = self._probe_regional_stack(region)
            if stack:
                regional_stacks[region] = stack

        # If we found stacks in configured regions, skip the full scan
        if not regional_stacks:
            logger.debug("No stacks found in configured regions, scanning all AWS regions")
            ec2 = self._session.client("ec2", region_name="us-east-1")
            regions_response = ec2.describe_regions()
            all_regions = [r["RegionName"] for r in regions_response["Regions"]]

            for region in all_regions:
                if region in configured_regions:
                    continue  # Already checked
                stack = self._probe_regional_stack(region)
                if stack:
                    regional_stacks[region] = stack

        self._prepare_cache_write()
        self._regional_stacks_cache = regional_stacks

        return regional_stacks

    def _get_configured_regions(self) -> list[str]:
        """Get the list of configured deployment regions from cdk.json."""
        from .config import _load_cdk_json

        cdk_regions = _load_cdk_json()
        regions: list[str] = cdk_regions.get("regional", [])
        return regions

    def _probe_regional_stack(self, region: str) -> RegionalStack | None:
        """Probe a single region for a GCO regional stack.

        Args:
            region: AWS region to check

        Returns:
            RegionalStack if found, None otherwise
        """
        try:
            cfn = self._session.client("cloudformation", region_name=region)
            stack_name = f"{self.config.regional_stack_prefix}-{region}"

            try:
                response = cfn.describe_stacks(StackName=stack_name)
                stack = response["Stacks"][0]

                outputs = {o["OutputKey"]: o["OutputValue"] for o in stack.get("Outputs", [])}

                return RegionalStack(
                    region=region,
                    stack_name=stack_name,
                    cluster_name=outputs.get("ClusterName", f"{self.config.project_name}-{region}"),
                    status=stack["StackStatus"],
                    efs_file_system_id=outputs.get("EfsFileSystemId"),
                    fsx_file_system_id=outputs.get("FsxFileSystemId"),
                    created_time=stack.get("CreationTime"),
                )
            except cfn.exceptions.ClientError:
                return None

        except Exception as e:
            logger.debug("Failed to get regional stack info for %s: %s", region, e)
            return None

    def get_regional_stack(self, region: str) -> RegionalStack | None:
        """Get information about a specific regional stack."""
        stacks = self.discover_regional_stacks()
        return stacks.get(region)

    # =========================================================================
    # Request plumbing
    # =========================================================================

    def call_api(
        self,
        method: str,
        path: str,
        region: str | None = None,
        body: dict[str, Any] | None = None,
        params: dict[str, str] | None = None,
    ) -> dict[str, Any]:
        """
        Make an API call and return the JSON response.

        This is a convenience wrapper around make_authenticated_request.

        Args:
            method: HTTP method (GET, POST, DELETE, etc.)
            path: API path (e.g., /api/v1/templates)
            region: Target region for the request
            body: Request body (will be JSON encoded)
            params: Query parameters

        Returns:
            JSON response as dictionary

        Raises:
            RuntimeError: If the request fails with a descriptive error message
        """
        # Add URL-encoded query parameters to path
        path = f"{path}{self._build_query(params)}"

        response = self.make_authenticated_request(
            method=method,
            path=path,
            body=body,
            target_region=region,
        )

        if not response.ok:
            error_msg = f"{response.status_code} {response.reason}"
            try:
                error_data = response.json()
                if "error" in error_data:
                    error_msg = error_data["error"]
                elif "message" in error_data:
                    error_msg = error_data["message"]
                elif "detail" in error_data:
                    error_msg = error_data["detail"]
            except (ValueError, KeyError, TypeError):
                # Body is not JSON (decode errors are ValueErrors) or is JSON
                # of an unexpected shape; fall back to the raw text.
                error_msg = response.text or error_msg
            raise RuntimeError(f"API request failed: {error_msg}")

        result: dict[str, Any] = response.json()
        return result

    def make_authenticated_request(
        self,
        method: str,
        path: str,
        body: dict[str, Any] | None = None,
        headers: dict[str, str] | None = None,
        target_region: str | None = None,
    ) -> requests.Response:
        """
        Make an authenticated request to the GCO API.

        If use_regional_api is enabled and a target_region is specified,
        the request will be routed through the regional API Gateway.
        Otherwise, it uses the global API Gateway.

        Responses with a status in ``_RETRYABLE_STATUS_CODES`` are retried with
        exponential backoff, up to ``_MAX_RETRIES`` attempts. The first 403 is
        retried once after rebuilding the boto3 session.

        Args:
            method: HTTP method (GET, POST, etc.)
            path: API path (e.g., /api/v1/manifests)
            body: Request body (will be JSON encoded)
            headers: Additional headers (the passed dict is not modified)
            target_region: Target region for the request (added as header)

        Returns:
            requests.Response object

        Raises:
            RuntimeError: If no AWS credentials are available for the first
                attempt
        """
        # Determine which endpoint to use
        if self._use_regional_api and target_region:
            # Fall back to global API if regional not available
            endpoint = self.get_regional_api_endpoint(target_region) or self.get_api_endpoint()
        else:
            endpoint = self.get_api_endpoint()

        url = f"{endpoint.url}{path}"

        # Work on a copy so the caller's dict is never mutated.
        request_headers = dict(headers) if headers else {}
        request_headers["Content-Type"] = "application/json"

        # Add target region header if specified (for global API routing)
        if target_region and not endpoint.is_regional:
            request_headers["X-GCO-Target-Region"] = target_region

        body_str = json.dumps(body) if body else ""

        # Sign the request with the endpoint's region
        signed = self._signed_headers(method, url, request_headers, body_str, endpoint.region)
        if signed is None:
            raise RuntimeError(
                "No AWS credentials found. Configure credentials via environment variables, "
                "~/.aws/credentials, IAM role, or SSO (aws sso login)."
            )

        # Make the request with retry for transient failures.
        # Also retry 403 once with refreshed credentials (handles expired tokens from
        # SSO, assumed roles, or instance metadata that rotated mid-request)
        last_response = None
        retried_auth = False
        for attempt in range(_MAX_RETRIES):
            response = requests.request(
                method=method,
                url=url,
                headers=signed,
                data=body_str,
                timeout=30,
            )
            last_response = response

            # 403 may mean expired SigV4 signature — retry once with fresh credentials
            if response.status_code == 403 and not retried_auth:
                retried_auth = True
                logger.warning(
                    "Request to %s returned 403, refreshing credentials and retrying",
                    path,
                )
                # Force a new session to pick up refreshed credentials
                self._session = boto3.Session()
                signed = self._signed_headers(
                    method, url, request_headers, body_str, endpoint.region
                )
                if signed is None:
                    return response  # No credentials available, return the 403
                continue

            if response.status_code not in _RETRYABLE_STATUS_CODES:
                return response

            # Retryable error — back off and retry
            if attempt < _MAX_RETRIES - 1:
                wait_time = _RETRY_BACKOFF_BASE * (2**attempt)
                logger.warning(
                    "Request to %s returned %d, retrying in %.1fs (attempt %d/%d)",
                    path,
                    response.status_code,
                    wait_time,
                    attempt + 1,
                    _MAX_RETRIES,
                )
                time.sleep(wait_time)

                # Re-sign the request for the retry (credentials/time may have changed)
                refreshed = self._signed_headers(
                    method, url, request_headers, body_str, endpoint.region
                )
                if refreshed is None:
                    return last_response
                signed = refreshed

        # All retries exhausted — return the last response
        return last_response  # type: ignore[return-value]

    # =========================================================================
    # Manifest and job operations
    # =========================================================================

    def submit_manifests(
        self,
        manifests: list[dict[str, Any]],
        namespace: str | None = None,
        target_region: str | None = None,
        dry_run: bool = False,
    ) -> dict[str, Any]:
        """
        Submit manifests to the GCO API.

        Args:
            manifests: List of Kubernetes manifest dictionaries
            namespace: Default namespace for manifests
            target_region: Target region for job execution
            dry_run: If True, validate without applying

        Returns:
            API response dictionary

        Raises:
            RuntimeError: If submission fails with descriptive error message
        """
        body: dict[str, Any] = {"manifests": manifests, "dry_run": dry_run}

        if namespace:
            body["namespace"] = namespace

        response = self.make_authenticated_request(
            method="POST", path="/api/v1/manifests", body=body, target_region=target_region
        )

        # Parse response and provide descriptive error messages
        if not response.ok:
            error_msg = f"{response.status_code} {response.reason}"
            try:
                error_data = response.json()
                # Extract meaningful error details from the response
                if "resources" in error_data:
                    failed = [r for r in error_data["resources"] if r.get("status") == "failed"]
                    if failed:
                        messages = [
                            f"{r.get('name')}: {r.get('message', 'Unknown error')}" for r in failed
                        ]
                        error_msg = "; ".join(messages)
                elif "error" in error_data:
                    error_msg = error_data["error"]
                elif "message" in error_data:
                    error_msg = error_data["message"]
            except (ValueError, KeyError, TypeError):
                # Body is not JSON (decode errors are ValueErrors) or is JSON
                # of an unexpected shape; fall back to the raw text.
                error_msg = response.text or error_msg
            raise RuntimeError(error_msg)

        result: dict[str, Any] = response.json()
        return result

    def get_jobs(
        self,
        region: str | None = None,
        namespace: str | None = None,
        status: str | None = None,
    ) -> list[dict[str, Any]]:
        """
        Get jobs from GCO clusters.

        Args:
            region: Specific region to query (None for all regions)
            namespace: Filter by namespace
            status: Filter by status (running, completed, failed)

        Returns:
            List of job information dictionaries
        """
        params: dict[str, Any] = {}
        if namespace:
            params["namespace"] = namespace
        if status:
            params["status"] = status

        response = self.make_authenticated_request(
            method="GET",
            path=f"/api/v1/jobs{self._build_query(params)}",
            target_region=region,
        )

        response.raise_for_status()
        result: list[dict[str, Any]] = response.json()
        return result

    def get_job_details(
        self, job_name: str, namespace: str, region: str | None = None
    ) -> dict[str, Any]:
        """
        Get detailed information about a specific job.

        Args:
            job_name: Name of the job
            namespace: Namespace of the job
            region: Region where the job is running

        Returns:
            Job details dictionary
        """
        response = self.make_authenticated_request(
            method="GET", path=f"/api/v1/jobs/{namespace}/{job_name}", target_region=region
        )

        response.raise_for_status()
        result: dict[str, Any] = response.json()
        return result

    def get_job_logs(
        self, job_name: str, namespace: str, region: str | None = None, tail_lines: int = 100
    ) -> str:
        """
        Get logs from a job.

        Args:
            job_name: Name of the job
            namespace: Namespace of the job
            region: Region where the job is running
            tail_lines: Number of lines to return from the end

        Returns:
            Log content as string

        Raises:
            RuntimeError: If the request fails; carries the response's
                ``detail`` field when available
        """
        query_string = self._build_query({"tail": tail_lines})
        response = self.make_authenticated_request(
            method="GET",
            path=f"/api/v1/jobs/{namespace}/{job_name}/logs{query_string}",
            target_region=region,
        )

        if not response.ok:
            # Try to extract a useful error message from the response body
            try:
                error_data = response.json()
                detail = error_data.get("detail", response.reason)
            except Exception:
                detail = response.text or response.reason
            raise RuntimeError(detail)

        return str(response.json().get("logs", ""))

    def delete_job(
        self, job_name: str, namespace: str, region: str | None = None
    ) -> dict[str, Any]:
        """
        Delete a job.

        Args:
            job_name: Name of the job
            namespace: Namespace of the job
            region: Region where the job is running

        Returns:
            Deletion result dictionary
        """
        response = self.make_authenticated_request(
            method="DELETE", path=f"/api/v1/jobs/{namespace}/{job_name}", target_region=region
        )

        response.raise_for_status()
        result: dict[str, Any] = response.json()
        return result

    def get_regional_alb_endpoint(self, region: str) -> str | None:
        """
        Get the ALB endpoint for a specific region.

        Args:
            region: AWS region

        Returns:
            ALB DNS name or None if not found
        """
        stack = self.get_regional_stack(region)
        if not stack:
            return None

        cfn = self._session.client("cloudformation", region_name=region)
        try:
            response = cfn.describe_stacks(StackName=stack.stack_name)
            stack_data = response["Stacks"][0]
            outputs = {o["OutputKey"]: o["OutputValue"] for o in stack_data.get("Outputs", [])}
            return outputs.get("AlbDnsName") or outputs.get("LoadBalancerDnsName")
        except Exception as e:
            logger.debug("Failed to get ALB DNS for %s: %s", region, e)
            return None

    # =========================================================================
    # Global Aggregation Methods (Cross-Region)
    # =========================================================================

    def get_global_jobs(
        self,
        namespace: str | None = None,
        status: str | None = None,
        limit: int = 50,
    ) -> dict[str, Any]:
        """
        Get jobs across all regions via the global aggregation API.

        Args:
            namespace: Filter by namespace
            status: Filter by status
            limit: Maximum jobs to return

        Returns:
            Aggregated job list with region information
        """
        params: dict[str, Any] = {"limit": limit}
        if namespace:
            params["namespace"] = namespace
        if status:
            params["status"] = status

        response = self.make_authenticated_request(
            method="GET", path=f"/api/v1/global/jobs{self._build_query(params)}"
        )

        response.raise_for_status()
        result: dict[str, Any] = response.json()
        return result

    def get_global_health(self) -> dict[str, Any]:
        """
        Get health status across all regions.

        Returns:
            Aggregated health status from all regional clusters
        """
        response = self.make_authenticated_request(method="GET", path="/api/v1/global/health")

        response.raise_for_status()
        result: dict[str, Any] = response.json()
        return result

    def get_global_status(self) -> dict[str, Any]:
        """
        Get cluster status across all regions.

        Returns:
            Aggregated status from all regional clusters
        """
        response = self.make_authenticated_request(method="GET", path="/api/v1/global/status")

        response.raise_for_status()
        result: dict[str, Any] = response.json()
        return result

    def bulk_delete_global(
        self,
        namespace: str | None = None,
        status: str | None = None,
        older_than_days: int | None = None,
        dry_run: bool = True,
    ) -> dict[str, Any]:
        """
        Bulk delete jobs across all regions.

        Args:
            namespace: Filter by namespace
            status: Filter by status
            older_than_days: Delete jobs older than N days
            dry_run: If True, only return what would be deleted

        Returns:
            Deletion results from all regions
        """
        body: dict[str, Any] = {"dry_run": dry_run}
        if namespace:
            body["namespace"] = namespace
        if status:
            body["status"] = status
        if older_than_days:
            body["older_than_days"] = older_than_days

        response = self.make_authenticated_request(
            method="DELETE", path="/api/v1/global/jobs", body=body
        )

        response.raise_for_status()
        result: dict[str, Any] = response.json()
        return result

    # =========================================================================
    # Regional Job Operations (New API Endpoints)
    # =========================================================================

    def get_job_events(self, job_name: str, namespace: str, region: str) -> dict[str, Any]:
        """
        Get Kubernetes events for a job.

        Args:
            job_name: Name of the job
            namespace: Namespace of the job
            region: Region where the job is running

        Returns:
            Events related to the job
        """
        response = self.make_authenticated_request(
            method="GET",
            path=f"/api/v1/jobs/{namespace}/{job_name}/events",
            target_region=region,
        )

        response.raise_for_status()
        result: dict[str, Any] = response.json()
        return result

    def get_job_pods(self, job_name: str, namespace: str, region: str) -> dict[str, Any]:
        """
        Get pods for a job.

        Args:
            job_name: Name of the job
            namespace: Namespace of the job
            region: Region where the job is running

        Returns:
            Pod details for the job
        """
        response = self.make_authenticated_request(
            method="GET",
            path=f"/api/v1/jobs/{namespace}/{job_name}/pods",
            target_region=region,
        )

        response.raise_for_status()
        result: dict[str, Any] = response.json()
        return result

    def get_pod_logs(
        self,
        job_name: str,
        pod_name: str,
        namespace: str,
        region: str,
        tail_lines: int = 100,
        container: str | None = None,
    ) -> dict[str, Any]:
        """
        Get logs from a specific pod of a job.

        Args:
            job_name: Name of the job
            pod_name: Name of the pod
            namespace: Namespace of the job
            region: Region where the job is running
            tail_lines: Number of lines to return from the end
            container: Container name (for multi-container pods)

        Returns:
            Pod logs response
        """
        params: dict[str, Any] = {"tail": tail_lines}
        if container:
            params["container"] = container

        query_string = self._build_query(params)

        response = self.make_authenticated_request(
            method="GET",
            path=f"/api/v1/jobs/{namespace}/{job_name}/pods/{pod_name}/logs{query_string}",
            target_region=region,
        )

        response.raise_for_status()
        result: dict[str, Any] = response.json()
        return result

    def get_job_metrics(self, job_name: str, namespace: str, region: str) -> dict[str, Any]:
        """
        Get resource metrics for a job.

        Args:
            job_name: Name of the job
            namespace: Namespace of the job
            region: Region where the job is running

        Returns:
            Resource usage metrics for the job's pods
        """
        response = self.make_authenticated_request(
            method="GET",
            path=f"/api/v1/jobs/{namespace}/{job_name}/metrics",
            target_region=region,
        )

        response.raise_for_status()
        result: dict[str, Any] = response.json()
        return result

    def retry_job(self, job_name: str, namespace: str, region: str) -> dict[str, Any]:
        """
        Retry a failed job.

        Creates a new job from the failed job's spec with a new name.

        Args:
            job_name: Name of the failed job
            namespace: Namespace of the job
            region: Region where the job is running

        Returns:
            Result with new job name
        """
        response = self.make_authenticated_request(
            method="POST",
            path=f"/api/v1/jobs/{namespace}/{job_name}/retry",
            target_region=region,
        )

        response.raise_for_status()
        result: dict[str, Any] = response.json()
        return result

    def bulk_delete_jobs(
        self,
        namespace: str | None = None,
        status: str | None = None,
        older_than_days: int | None = None,
        label_selector: str | None = None,
        region: str | None = None,
        dry_run: bool = True,
    ) -> dict[str, Any]:
        """
        Bulk delete jobs in a region.

        Args:
            namespace: Filter by namespace
            status: Filter by status
            older_than_days: Delete jobs older than N days
            label_selector: Kubernetes label selector
            region: Target region
            dry_run: If True, only return what would be deleted

        Returns:
            Deletion results
        """
        body: dict[str, Any] = {"dry_run": dry_run}
        if namespace:
            body["namespace"] = namespace
        if status:
            body["status"] = status
        if older_than_days:
            body["older_than_days"] = older_than_days
        if label_selector:
            body["label_selector"] = label_selector

        response = self.make_authenticated_request(
            method="DELETE", path="/api/v1/jobs", body=body, target_region=region
        )

        response.raise_for_status()
        result: dict[str, Any] = response.json()
        return result

    def get_health(self, region: str) -> dict[str, Any]:
        """
        Get health status for a specific region.

        Args:
            region: Target region

        Returns:
            Health status for the regional cluster
        """
        response = self.make_authenticated_request(
            method="GET", path="/api/v1/health", target_region=region
        )

        response.raise_for_status()
        result: dict[str, Any] = response.json()
        return result

943 

944 

def get_aws_client(config: GCOConfig | None = None) -> GCOAWSClient:
    """Build and return a ``GCOAWSClient`` for the given configuration.

    Args:
        config: Configuration passed straight to the client constructor;
            may be None.

    Returns:
        A new GCOAWSClient instance
    """
    client = GCOAWSClient(config)
    return client