Coverage for gco / config / config_loader.py: 97%

227 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 21:47 +0000

1""" 

2Configuration loader for GCO (Global Capacity Orchestrator on AWS). 

3 

4This module loads and validates configuration from CDK context (cdk.json). 

5It provides type-safe access to all configuration values with sensible defaults 

6and comprehensive validation. 

7 

8Configuration Sections: 

9- project_name: Unique identifier for the deployment 

10- regions: List of AWS regions to deploy to 

11- kubernetes_version: EKS Kubernetes version 

12- resource_thresholds: CPU/memory/GPU utilization thresholds 

13- global_accelerator: Global Accelerator settings 

14- alb_config: Application Load Balancer health check settings 

15- manifest_processor: Manifest validation and resource limits 

16- api_gateway: Throttling and logging configuration 

17- tags: Common tags applied to all resources 

18 

19Usage: 

20 config = ConfigLoader(app) 

21 regions = config.get_regions() 

22 cluster_config = config.get_cluster_config("us-east-1") 

23""" 

24 

25from __future__ import annotations 

26 

27import logging 

28from typing import Any, cast 

29 

30import boto3 

31from aws_cdk import App 

32 

33from gco.models import ClusterConfig, ResourceThresholds 

34 

35logger = logging.getLogger(__name__) 

36 

37 

class ConfigValidationError(Exception):
    """Raised when configuration validation fails."""

42 

43 

class ConfigLoader:
    """
    Loads and validates configuration from CDK context (cdk.json)
    """

    # Valid AWS regions (subset of commonly used regions).
    # NOTE(review): this is a static allow-list; _validate_regions rejects any
    # region not listed here, even if the account could deploy there.
    # get_available_regions() queries the live list as an alternative.
    VALID_REGIONS = {
        "us-east-1",
        "us-east-2",
        "us-west-1",
        "us-west-2",
        "eu-west-1",
        "eu-west-2",
        "eu-west-3",
        "eu-central-1",
        "ap-southeast-1",
        "ap-southeast-2",
        "ap-northeast-1",
        "ap-northeast-2",
        "ca-central-1",
        "sa-east-1",
    }

    def __init__(self, app: App):
        # Keep a handle to the CDK app so every getter can read context values.
        self.app = app
        # Fail fast: surface configuration mistakes at construction time
        # rather than part-way through stack synthesis.
        self._validate_configuration()

70 

71 def _validate_configuration(self) -> None: 

72 """Validate the entire configuration""" 

73 # Check if we have any context at all (might be running outside CDK) 

74 project_name = self.app.node.try_get_context("project_name") 

75 if project_name is None: 

76 # Running outside CDK context, skip validation 

77 return 

78 

79 # Validate required fields exist 

80 required_fields = [ 

81 "project_name", 

82 "kubernetes_version", 

83 "resource_thresholds", 

84 ] 

85 for field in required_fields: 

86 if not self.app.node.try_get_context(field): 

87 raise ConfigValidationError(f"Required configuration field '{field}' is missing") 

88 

89 # Check for deployment_regions 

90 deployment_regions = self.app.node.try_get_context("deployment_regions") 

91 if not deployment_regions: 

92 raise ConfigValidationError( 

93 "Required configuration field 'deployment_regions' is missing" 

94 ) 

95 

96 # Validate regions 

97 self._validate_regions() 

98 

99 # Validate resource thresholds 

100 self._validate_resource_thresholds() 

101 

102 # Validate Global Accelerator config 

103 self._validate_global_accelerator_config() 

104 

105 # Validate ALB config 

106 self._validate_alb_config() 

107 

108 # Validate manifest processor config 

109 self._validate_manifest_processor_config() 

110 

111 # Validate API Gateway config 

112 self._validate_api_gateway_config() 

113 

114 # Validate EKS cluster config 

115 self._validate_eks_cluster_config() 

116 

117 def _validate_regions(self) -> None: 

118 """Validate region configuration""" 

119 regions = self.get_regions() 

120 

121 if not regions: 

122 raise ConfigValidationError("At least one region must be specified") 

123 

124 if len(regions) > 10: 

125 raise ConfigValidationError("Maximum of 10 regions supported") 

126 

127 for region in regions: 

128 if region not in self.VALID_REGIONS: 

129 raise ConfigValidationError( 

130 f"Invalid region '{region}'. Valid regions: {sorted(self.VALID_REGIONS)}" 

131 ) 

132 

133 # Check for duplicates 

134 if len(regions) != len(set(regions)): 

135 raise ConfigValidationError("Duplicate regions found in configuration") 

136 

137 def _validate_resource_thresholds(self) -> None: 

138 """Validate resource threshold configuration""" 

139 thresholds_config = self.app.node.try_get_context("resource_thresholds") 

140 

141 required_thresholds = ["cpu_threshold", "memory_threshold", "gpu_threshold"] 

142 for threshold in required_thresholds: 

143 if threshold not in thresholds_config: 

144 raise ConfigValidationError(f"Missing threshold configuration: {threshold}") 

145 

146 value = thresholds_config[threshold] 

147 if not isinstance(value, int) or (value != -1 and not 0 <= value <= 100): 

148 raise ConfigValidationError( 

149 f"{threshold} must be an integer between 0 and 100 (or -1 to disable), got {value}" 

150 ) 

151 

152 # Validate optional thresholds if present 

153 for opt_threshold in [ 

154 "pending_pods_threshold", 

155 "pending_requested_cpu_vcpus", 

156 "pending_requested_memory_gb", 

157 "pending_requested_gpus", 

158 ]: 

159 if opt_threshold in thresholds_config: 

160 value = thresholds_config[opt_threshold] 

161 if not isinstance(value, int) or (value != -1 and value < 0): 

162 raise ConfigValidationError( 

163 f"{opt_threshold} must be a non-negative integer (or -1 to disable), got {value}" 

164 ) 

165 

166 def _validate_global_accelerator_config(self) -> None: 

167 """Validate Global Accelerator configuration""" 

168 ga_config = self.app.node.try_get_context("global_accelerator") 

169 if not ga_config: 

170 raise ConfigValidationError("global_accelerator configuration is required") 

171 

172 required_fields = [ 

173 "name", 

174 "health_check_grace_period", 

175 "health_check_interval", 

176 "health_check_timeout", 

177 "health_check_path", 

178 ] 

179 for field in required_fields: 

180 if field not in ga_config: 

181 raise ConfigValidationError(f"Missing global_accelerator configuration: {field}") 

182 

183 # Validate timing values 

184 for field in ["health_check_grace_period", "health_check_interval", "health_check_timeout"]: 

185 value = ga_config[field] 

186 if not isinstance(value, int) or value <= 0: 

187 raise ConfigValidationError(f"{field} must be a positive integer, got {value}") 

188 

189 # Validate health check path 

190 if not ga_config["health_check_path"].startswith("/"): 

191 raise ConfigValidationError("health_check_path must start with '/'") 

192 

193 def _validate_alb_config(self) -> None: 

194 """Validate ALB configuration""" 

195 alb_config = self.app.node.try_get_context("alb_config") 

196 if not alb_config: 

197 raise ConfigValidationError("alb_config configuration is required") 

198 

199 required_fields = [ 

200 "health_check_interval", 

201 "health_check_timeout", 

202 "healthy_threshold", 

203 "unhealthy_threshold", 

204 ] 

205 for field in required_fields: 

206 if field not in alb_config: 

207 raise ConfigValidationError(f"Missing alb_config configuration: {field}") 

208 

209 value = alb_config[field] 

210 if not isinstance(value, int) or value <= 0: 

211 raise ConfigValidationError(f"{field} must be a positive integer, got {value}") 

212 

    def _validate_manifest_processor_config(self) -> None:
        """Validate manifest processor configuration.

        The manifest processor section in cdk.json holds service-specific
        settings only. The shared validation policy (allowed_namespaces,
        resource_quotas, trusted_registries, trusted_dockerhub_orgs,
        manifest_security_policy, allowed_kinds) lives under
        ``job_validation_policy`` because the queue_processor reads the
        same values.

        Raises:
            ConfigValidationError: If either section is absent or any
                required field within them is missing or invalid.
        """
        mp_config = self.app.node.try_get_context("manifest_processor")
        if not mp_config:
            raise ConfigValidationError("manifest_processor configuration is required")

        required_fields = [
            "image",
            "replicas",
            "resource_limits",
        ]
        for field in required_fields:
            if field not in mp_config:
                raise ConfigValidationError(f"Missing manifest_processor configuration: {field}")

        # Validate replicas
        if not isinstance(mp_config["replicas"], int) or mp_config["replicas"] <= 0:
            raise ConfigValidationError("manifest_processor replicas must be a positive integer")

        # Validate the shared policy section separately so a misconfigured
        # policy block surfaces a clear error pointing at the right key.
        # Note: `is None` (not truthiness) so an explicitly empty dict is
        # allowed past this check and caught by the field checks below.
        policy = self.app.node.try_get_context("job_validation_policy")
        if policy is None:
            raise ConfigValidationError(
                "job_validation_policy configuration is required (shared between "
                "manifest_processor and queue_processor)"
            )
        for policy_field in ("allowed_namespaces", "resource_quotas"):
            if policy_field not in policy:
                raise ConfigValidationError(
                    f"Missing job_validation_policy configuration: {policy_field}"
                )

        # Validate resource limits
        resource_limits = mp_config["resource_limits"]
        if "cpu" not in resource_limits or "memory" not in resource_limits:
            raise ConfigValidationError(
                "manifest_processor resource_limits must contain 'cpu' and 'memory'"
            )

        # Validate allowed namespaces (lives under job_validation_policy).
        if not isinstance(policy["allowed_namespaces"], list):
            raise ConfigValidationError("job_validation_policy.allowed_namespaces must be a list")

264 

265 def _validate_api_gateway_config(self) -> None: 

266 """Validate API Gateway configuration""" 

267 api_gw_config = self.app.node.try_get_context("api_gateway") 

268 if not api_gw_config: 

269 raise ConfigValidationError("api_gateway configuration is required") 

270 

271 required_fields = [ 

272 "throttle_rate_limit", 

273 "throttle_burst_limit", 

274 "log_level", 

275 "metrics_enabled", 

276 "tracing_enabled", 

277 ] 

278 for field in required_fields: 

279 if field not in api_gw_config: 

280 raise ConfigValidationError(f"Missing api_gateway configuration: {field}") 

281 

282 # Validate throttle limits 

283 throttle_rate = api_gw_config["throttle_rate_limit"] 

284 throttle_burst = api_gw_config["throttle_burst_limit"] 

285 

286 if not isinstance(throttle_rate, int) or throttle_rate <= 0: 

287 raise ConfigValidationError( 

288 f"throttle_rate_limit must be a positive integer, got {throttle_rate}" 

289 ) 

290 

291 if not isinstance(throttle_burst, int) or throttle_burst <= 0: 

292 raise ConfigValidationError( 

293 f"throttle_burst_limit must be a positive integer, got {throttle_burst}" 

294 ) 

295 

296 if throttle_burst < throttle_rate: 

297 raise ConfigValidationError( 

298 "throttle_burst_limit should be greater than or equal to throttle_rate_limit" 

299 ) 

300 

301 # Validate log level 

302 valid_log_levels = ["OFF", "ERROR", "INFO"] 

303 log_level = api_gw_config["log_level"] 

304 if log_level not in valid_log_levels: 

305 raise ConfigValidationError( 

306 f"log_level must be one of {valid_log_levels}, got {log_level}" 

307 ) 

308 

309 # Validate boolean flags 

310 if not isinstance(api_gw_config["metrics_enabled"], bool): 

311 raise ConfigValidationError("metrics_enabled must be a boolean") 

312 

313 if not isinstance(api_gw_config["tracing_enabled"], bool): 

314 raise ConfigValidationError("tracing_enabled must be a boolean") 

315 

316 def _validate_eks_cluster_config(self) -> None: 

317 """Validate EKS cluster configuration""" 

318 eks_config = self.app.node.try_get_context("eks_cluster") or {} 

319 

320 # Validate endpoint_access if present 

321 if "endpoint_access" in eks_config: 

322 valid_access_modes = ["PRIVATE", "PUBLIC_AND_PRIVATE"] 

323 if eks_config["endpoint_access"] not in valid_access_modes: 

324 raise ConfigValidationError( 

325 f"endpoint_access must be one of {valid_access_modes}, " 

326 f"got {eks_config['endpoint_access']}" 

327 ) 

328 

329 def get_project_name(self) -> str: 

330 """Get project name from configuration""" 

331 return self.app.node.try_get_context("project_name") or "gco" 

332 

333 def get_deployment_regions(self) -> dict[str, Any]: 

334 """Get deployment regions configuration. 

335 

336 Returns a dict with: 

337 - global: Region for Global Accelerator and SSM parameters (default: us-east-2) 

338 - api_gateway: Region for API Gateway stack (default: us-east-2) 

339 - monitoring: Region for Monitoring stack (default: us-east-2) 

340 - regional: List of regions for EKS clusters (default: ["us-east-1"]) 

341 

342 Note: Global Accelerator is a global service but requires a "home" region 

343 for CloudFormation deployment. us-east-2 is used by default to keep 

344 global infrastructure separate from workload regions. 

345 """ 

346 deployment_regions = self.app.node.try_get_context("deployment_regions") or {} 

347 

348 return { 

349 "global": deployment_regions.get("global", "us-east-2"), 

350 "api_gateway": deployment_regions.get("api_gateway", "us-east-2"), 

351 "monitoring": deployment_regions.get("monitoring", "us-east-2"), 

352 "regional": deployment_regions.get("regional", ["us-east-1"]), 

353 } 

354 

355 def get_global_region(self) -> str: 

356 """Get the region for global resources (Global Accelerator, SSM params).""" 

357 region = self.get_deployment_regions()["global"] 

358 return str(region) 

359 

360 def get_api_gateway_region(self) -> str: 

361 """Get the region for API Gateway stack.""" 

362 region = self.get_deployment_regions()["api_gateway"] 

363 return str(region) 

364 

365 def get_monitoring_region(self) -> str: 

366 """Get the region for Monitoring stack.""" 

367 region = self.get_deployment_regions()["monitoring"] 

368 return str(region) 

369 

370 def get_regions(self) -> list[str]: 

371 """Get list of regions for EKS cluster deployment.""" 

372 deployment_regions = self.get_deployment_regions() 

373 regional = deployment_regions["regional"] 

374 return list(regional) if isinstance(regional, list) else [str(regional)] 

375 

376 def get_kubernetes_version(self) -> str: 

377 """Get Kubernetes version from configuration""" 

378 return self.app.node.try_get_context("kubernetes_version") or "1.35" 

379 

380 def get_resource_thresholds(self) -> ResourceThresholds: 

381 """Get resource thresholds configuration""" 

382 thresholds_config = self.app.node.try_get_context("resource_thresholds") or { 

383 "cpu_threshold": 60, 

384 "memory_threshold": 60, 

385 "gpu_threshold": -1, 

386 "pending_pods_threshold": 10, 

387 "pending_requested_cpu_vcpus": 100, 

388 "pending_requested_memory_gb": 200, 

389 "pending_requested_gpus": -1, 

390 } 

391 return ResourceThresholds( 

392 cpu_threshold=thresholds_config["cpu_threshold"], 

393 memory_threshold=thresholds_config["memory_threshold"], 

394 gpu_threshold=thresholds_config["gpu_threshold"], 

395 pending_pods_threshold=thresholds_config.get("pending_pods_threshold", 10), 

396 pending_requested_cpu_vcpus=thresholds_config.get("pending_requested_cpu_vcpus", 100), 

397 pending_requested_memory_gb=thresholds_config.get("pending_requested_memory_gb", 200), 

398 pending_requested_gpus=thresholds_config.get("pending_requested_gpus", 8), 

399 ) 

400 

401 def get_cluster_config(self, region: str) -> ClusterConfig: 

402 """Get complete cluster configuration for a region""" 

403 return ClusterConfig( 

404 region=region, 

405 cluster_name=f"{self.get_project_name()}-{region}", 

406 kubernetes_version=self.get_kubernetes_version(), 

407 addons=["metrics-server"], 

408 resource_thresholds=self.get_resource_thresholds(), 

409 ) 

410 

411 def get_global_accelerator_config(self) -> dict[str, Any]: 

412 """Get Global Accelerator configuration""" 

413 return self.app.node.try_get_context("global_accelerator") or { 

414 "name": "gco-accelerator", 

415 "health_check_grace_period": 30, 

416 "health_check_interval": 30, 

417 "health_check_timeout": 5, 

418 "health_check_path": "/api/v1/health", 

419 } 

420 

421 def get_alb_config(self) -> dict[str, Any]: 

422 """Get ALB configuration""" 

423 return self.app.node.try_get_context("alb_config") or { 

424 "health_check_interval": 30, 

425 "health_check_timeout": 5, 

426 "healthy_threshold": 2, 

427 "unhealthy_threshold": 2, 

428 } 

429 

    def get_manifest_processor_config(self) -> dict[str, Any]:
        """Get manifest processor configuration.

        Merges built-in defaults with two cdk.json sections into a single
        runtime config:

        - ``manifest_processor``: service-specific settings (replicas, image,
          resource_limits, allowed_namespaces, validation_enabled,
          max_request_body_bytes, yaml_max_depth, yaml_allow_aliases)
        - ``job_validation_policy``: shared validation policy (resource_quotas,
          trusted_registries, trusted_dockerhub_orgs, manifest_security_policy,
          allowed_kinds). Pulled in verbatim so the REST path reads the same
          policy the SQS queue processor enforces.

        Precedence (lowest to highest): built-in defaults, then
        ``manifest_processor``, then ``job_validation_policy``.

        Note: The 'image' field is a placeholder default. In practice, the actual
        image is built from dockerfiles/manifest-processor-dockerfile and pushed
        to ECR during CDK deployment. The {{MANIFEST_PROCESSOR_IMAGE}} placeholder
        in manifests is replaced with the ECR image URI.
        """
        default_config = {
            "image": "gco/manifest-processor:latest",  # Placeholder, replaced by ECR image
            "replicas": 3,
            "resource_limits": {"cpu": "1000m", "memory": "2Gi"},
            "validation_enabled": True,
            # allowed_namespaces, resource_quotas, trusted_registries,
            # trusted_dockerhub_orgs, manifest_security_policy, and
            # allowed_kinds are merged in below from job_validation_policy;
            # the values here are only fallbacks when that section is absent.
            "allowed_namespaces": ["default", "gco-jobs"],
            "resource_quotas": {
                "max_cpu_per_manifest": "10",
                "max_memory_per_manifest": "32Gi",
                "max_gpu_per_manifest": 4,
            },
            "trusted_registries": [
                "docker.io",
                "gcr.io",
                "quay.io",
                "registry.k8s.io",
                "k8s.gcr.io",
                "public.ecr.aws",
                "nvcr.io",
                "gco",
            ],
            "trusted_dockerhub_orgs": [
                "nvidia",
                "pytorch",
                "rayproject",
                "tensorflow",
                "huggingface",
                "amazon",
                "bitnami",
            ],
        }
        context_config = self.app.node.try_get_context("manifest_processor") or {}

        # Merge in the shared job_validation_policy section. These keys apply
        # to BOTH the manifest processor and the queue processor; they live
        # in their own top-level cdk.json section so neither service "owns"
        # them. We flatten them into the manifest processor's runtime config
        # so service code keeps its existing attribute layout.
        shared_policy = self.app.node.try_get_context("job_validation_policy") or {}
        return {**default_config, **context_config, **shared_policy}

491 

492 def get_api_gateway_config(self) -> dict[str, Any]: 

493 """Get API Gateway configuration. 

494 

495 Returns: 

496 API Gateway configuration dictionary with the following keys: 

497 - throttle_rate_limit: Requests per second limit 

498 - throttle_burst_limit: Burst capacity 

499 - log_level: CloudWatch logging level (OFF, ERROR, INFO) 

500 - metrics_enabled: Enable CloudWatch metrics 

501 - tracing_enabled: Enable X-Ray tracing 

502 - regional_api_enabled: Enable regional API Gateways for private access 

503 When true, deploys a regional API Gateway with VPC Lambda in each 

504 region, allowing API access when the ALB is internal-only. 

505 """ 

506 default_config = { 

507 "throttle_rate_limit": 1000, 

508 "throttle_burst_limit": 2000, 

509 "log_level": "INFO", 

510 "metrics_enabled": True, 

511 "tracing_enabled": True, 

512 "regional_api_enabled": False, 

513 } 

514 return {**default_config, **(self.app.node.try_get_context("api_gateway") or {})} 

515 

516 def get_eks_cluster_config(self) -> dict[str, Any]: 

517 """Get EKS cluster configuration. 

518 

519 Returns: 

520 EKS cluster configuration dictionary with the following keys: 

521 - endpoint_access: EKS API endpoint access mode 

522 - "PRIVATE": API server only accessible from within VPC (default, most secure) 

523 - "PUBLIC_AND_PRIVATE": API server accessible from internet and VPC 

524 

525 Note: 

526 PRIVATE endpoint is recommended for production. Job submission still works 

527 via API Gateway → Lambda (in VPC) or SQS queues. For kubectl access with 

528 PRIVATE endpoint, use a bastion host, VPN, or AWS SSM Session Manager. 

529 """ 

530 default_config = { 

531 "endpoint_access": "PRIVATE", 

532 } 

533 return {**default_config, **(self.app.node.try_get_context("eks_cluster") or {})} 

534 

535 def get_fsx_lustre_config(self, region: str | None = None) -> dict[str, Any]: 

536 """Get FSx for Lustre configuration. 

537 

538 Args: 

539 region: Optional region to get config for. If provided, checks for 

540 region-specific overrides first. 

541 

542 Returns: 

543 FSx configuration dictionary with the following keys: 

544 - enabled: Whether FSx is enabled 

545 - storage_capacity_gib: Storage capacity in GiB (min 1200) 

546 - deployment_type: SCRATCH_1, SCRATCH_2, PERSISTENT_1, PERSISTENT_2 

547 - file_system_type_version: Lustre version (2.12 or 2.15, default: 2.15) 

548 IMPORTANT: Use 2.15 for kernel 6.x compatibility (AL2023, Bottlerocket) 

549 - per_unit_storage_throughput: Throughput for PERSISTENT types 

550 - data_compression_type: LZ4 or NONE 

551 - import_path: S3 path for data import 

552 - export_path: S3 path for data export 

553 - auto_import_policy: NEW, NEW_CHANGED, NEW_CHANGED_DELETED 

554 - node_group: Node group configuration for FSx workloads 

555 - instance_types: List of instance types 

556 - min_size: Minimum nodes (default: 0) 

557 - max_size: Maximum nodes (default: 10) 

558 - desired_size: Desired nodes (default: 0, scales from zero) 

559 - ami_type: AMI type - one of: 

560 AL2023_X86_64_STANDARD (default), AL2023_ARM_64_STANDARD, 

561 AL2023_X86_64_NVIDIA, AL2023_ARM_64_NVIDIA, AL2023_X86_64_NEURON 

562 - capacity_type: ON_DEMAND (default) or SPOT 

563 - disk_size: Root disk size in GB (default: 100) 

564 - labels: Additional node labels (dict) 

565 """ 

566 default_config = { 

567 "enabled": False, 

568 "storage_capacity_gib": 1200, 

569 "deployment_type": "SCRATCH_2", 

570 "file_system_type_version": "2.15", # Use 2.15 for kernel 6.x compatibility 

571 "per_unit_storage_throughput": 200, 

572 "data_compression_type": "LZ4", 

573 "import_path": None, 

574 "export_path": None, 

575 "auto_import_policy": "NEW_CHANGED_DELETED", 

576 "node_group": { 

577 "instance_types": ["m5.large", "m5.xlarge", "m6i.large", "m6i.xlarge"], 

578 "min_size": 0, 

579 "max_size": 10, 

580 "desired_size": 1, 

581 "ami_type": "AL2023_X86_64_STANDARD", 

582 "capacity_type": "ON_DEMAND", 

583 "disk_size": 100, 

584 "labels": {}, 

585 }, 

586 } 

587 

588 # Get global FSx config 

589 global_ctx = self.app.node.try_get_context("fsx_lustre") 

590 global_config: dict[str, Any] = global_ctx if isinstance(global_ctx, dict) else {} 

591 merged_config: dict[str, Any] = {**default_config, **global_config} 

592 

593 # Ensure node_group has all required fields with defaults 

594 if "node_group" in global_config: 

595 global_node_group = global_config["node_group"] 

596 if isinstance(global_node_group, dict): 596 ↛ 604line 596 didn't jump to line 604 because the condition on line 596 was always true

597 default_node_group = cast(dict[str, Any], default_config["node_group"]) 

598 merged_config["node_group"] = { 

599 **default_node_group, 

600 **global_node_group, 

601 } 

602 

603 # Check for region-specific override 

604 if region: 

605 region_overrides_ctx = self.app.node.try_get_context("fsx_lustre_regions") 

606 region_overrides: dict[str, Any] = ( 

607 region_overrides_ctx if isinstance(region_overrides_ctx, dict) else {} 

608 ) 

609 if region in region_overrides: 

610 region_config = region_overrides[region] 

611 if isinstance(region_config, dict): 611 ↛ 627line 611 didn't jump to line 627 because the condition on line 611 was always true

612 merged_config = {**merged_config, **region_config} 

613 # Handle nested node_group override 

614 if "node_group" in region_config: 

615 region_node_group = region_config["node_group"] 

616 if isinstance(region_node_group, dict): 616 ↛ 627line 616 didn't jump to line 627 because the condition on line 616 was always true

617 existing_node_group = merged_config.get("node_group") 

618 if isinstance(existing_node_group, dict): 618 ↛ 621line 618 didn't jump to line 621 because the condition on line 618 was always true

619 base_node_group = existing_node_group 

620 else: 

621 base_node_group = cast(dict[str, Any], default_config["node_group"]) 

622 merged_config["node_group"] = { 

623 **base_node_group, 

624 **region_node_group, 

625 } 

626 

627 return merged_config 

628 

629 def get_valkey_config(self) -> dict[str, Any]: 

630 """Get Valkey Serverless cache configuration. 

631 

632 Returns: 

633 Valkey configuration dictionary with the following keys: 

634 - enabled: Whether Valkey cache is enabled (default: True) 

635 - max_data_storage_gb: Maximum data storage in GB (default: 5) 

636 - max_ecpu_per_second: Maximum ECPUs per second (default: 5000) 

637 - snapshot_retention_limit: Daily snapshots to retain (default: 1) 

638 """ 

639 default_config: dict[str, Any] = { 

640 "enabled": True, 

641 "max_data_storage_gb": 5, 

642 "max_ecpu_per_second": 5000, 

643 "snapshot_retention_limit": 1, 

644 } 

645 valkey_ctx = self.app.node.try_get_context("valkey") 

646 valkey_config: dict[str, Any] = valkey_ctx if isinstance(valkey_ctx, dict) else {} 

647 return {**default_config, **valkey_config} 

648 

649 def get_aurora_pgvector_config(self) -> dict[str, Any]: 

650 """Get Aurora Serverless v2 + pgvector vector database configuration. 

651 

652 Returns: 

653 Aurora pgvector configuration dictionary with the following keys: 

654 - enabled: Whether Aurora pgvector is enabled (default: False) 

655 - min_acu: Minimum Aurora Capacity Units (default: 0, scales to zero) 

656 - max_acu: Maximum Aurora Capacity Units (default: 16) 

657 - backup_retention_days: Number of days to retain automated backups (default: 7) 

658 - deletion_protection: Whether deletion protection is enabled (default: False) 

659 """ 

660 default_config: dict[str, Any] = { 

661 "enabled": False, 

662 "min_acu": 0, 

663 "max_acu": 16, 

664 "backup_retention_days": 7, 

665 "deletion_protection": False, 

666 } 

667 aurora_ctx = self.app.node.try_get_context("aurora_pgvector") 

668 aurora_config: dict[str, Any] = aurora_ctx if isinstance(aurora_ctx, dict) else {} 

669 return {**default_config, **aurora_config} 

670 

671 def get_tags(self) -> dict[str, str]: 

672 """Get common tags from configuration""" 

673 return self.app.node.try_get_context("tags") or {} 

674 

    def validate_region_availability(self, region: str) -> bool:
        """Return True if *region* is available in the current AWS account.

        Best-effort live check via EC2 DescribeRegions. Any failure
        (missing credentials, unreachable endpoint, unknown region, etc.)
        is logged at debug level and reported as unavailable rather than
        raised, so this is safe to call during synthesis without AWS access.
        """
        try:
            ec2 = boto3.client("ec2", region_name=region)
            ec2.describe_regions(RegionNames=[region])
            return True
        except Exception as e:
            # Broad catch is deliberate: botocore raises many exception types
            # (auth, endpoint, throttling) and this is only a best-effort probe.
            logger.debug("Region %s not available: %s", region, e)
            return False

684 

    def get_available_regions(self) -> list[str]:
        """Return all AWS regions enabled for the current account.

        Falls back to the static VALID_REGIONS allow-list when the EC2
        DescribeRegions call fails (no credentials, no network, etc.), so
        callers always receive a usable region list.
        """
        try:
            ec2 = boto3.client("ec2")
            response = ec2.describe_regions()
            return [region["RegionName"] for region in response["Regions"]]
        except Exception as e:
            # Best-effort: degrade to the hard-coded list rather than raising.
            logger.debug("Failed to list regions, using defaults: %s", e)
            return list(self.VALID_REGIONS)