Coverage for gco / config / config_loader.py: 97%
227 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 21:47 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 21:47 +0000
1"""
2Configuration loader for GCO (Global Capacity Orchestrator on AWS).
4This module loads and validates configuration from CDK context (cdk.json).
5It provides type-safe access to all configuration values with sensible defaults
6and comprehensive validation.
8Configuration Sections:
9- project_name: Unique identifier for the deployment
10- regions: List of AWS regions to deploy to
11- kubernetes_version: EKS Kubernetes version
12- resource_thresholds: CPU/memory/GPU utilization thresholds
13- global_accelerator: Global Accelerator settings
14- alb_config: Application Load Balancer health check settings
15- manifest_processor: Manifest validation and resource limits
16- api_gateway: Throttling and logging configuration
17- tags: Common tags applied to all resources
19Usage:
20 config = ConfigLoader(app)
21 regions = config.get_regions()
22 cluster_config = config.get_cluster_config("us-east-1")
23"""
25from __future__ import annotations
27import logging
28from typing import Any, cast
30import boto3
31from aws_cdk import App
33from gco.models import ClusterConfig, ResourceThresholds
35logger = logging.getLogger(__name__)
class ConfigValidationError(Exception):
    """Raised when a configuration value is missing or invalid."""
class ConfigLoader:
    """
    Loads and validates configuration from CDK context (cdk.json)
    """

    # Valid AWS regions (subset of commonly used regions).
    # Used by _validate_regions to reject typos / unsupported regions early.
    VALID_REGIONS = {
        "us-east-1",
        "us-east-2",
        "us-west-1",
        "us-west-2",
        "eu-west-1",
        "eu-west-2",
        "eu-west-3",
        "eu-central-1",
        "ap-southeast-1",
        "ap-southeast-2",
        "ap-northeast-1",
        "ap-northeast-2",
        "ca-central-1",
        "sa-east-1",
    }

    def __init__(self, app: App):
        # Keep the CDK app so every getter can read cdk.json context values.
        self.app = app
        # Fail fast: validate the whole configuration at construction time.
        self._validate_configuration()
71 def _validate_configuration(self) -> None:
72 """Validate the entire configuration"""
73 # Check if we have any context at all (might be running outside CDK)
74 project_name = self.app.node.try_get_context("project_name")
75 if project_name is None:
76 # Running outside CDK context, skip validation
77 return
79 # Validate required fields exist
80 required_fields = [
81 "project_name",
82 "kubernetes_version",
83 "resource_thresholds",
84 ]
85 for field in required_fields:
86 if not self.app.node.try_get_context(field):
87 raise ConfigValidationError(f"Required configuration field '{field}' is missing")
89 # Check for deployment_regions
90 deployment_regions = self.app.node.try_get_context("deployment_regions")
91 if not deployment_regions:
92 raise ConfigValidationError(
93 "Required configuration field 'deployment_regions' is missing"
94 )
96 # Validate regions
97 self._validate_regions()
99 # Validate resource thresholds
100 self._validate_resource_thresholds()
102 # Validate Global Accelerator config
103 self._validate_global_accelerator_config()
105 # Validate ALB config
106 self._validate_alb_config()
108 # Validate manifest processor config
109 self._validate_manifest_processor_config()
111 # Validate API Gateway config
112 self._validate_api_gateway_config()
114 # Validate EKS cluster config
115 self._validate_eks_cluster_config()
117 def _validate_regions(self) -> None:
118 """Validate region configuration"""
119 regions = self.get_regions()
121 if not regions:
122 raise ConfigValidationError("At least one region must be specified")
124 if len(regions) > 10:
125 raise ConfigValidationError("Maximum of 10 regions supported")
127 for region in regions:
128 if region not in self.VALID_REGIONS:
129 raise ConfigValidationError(
130 f"Invalid region '{region}'. Valid regions: {sorted(self.VALID_REGIONS)}"
131 )
133 # Check for duplicates
134 if len(regions) != len(set(regions)):
135 raise ConfigValidationError("Duplicate regions found in configuration")
137 def _validate_resource_thresholds(self) -> None:
138 """Validate resource threshold configuration"""
139 thresholds_config = self.app.node.try_get_context("resource_thresholds")
141 required_thresholds = ["cpu_threshold", "memory_threshold", "gpu_threshold"]
142 for threshold in required_thresholds:
143 if threshold not in thresholds_config:
144 raise ConfigValidationError(f"Missing threshold configuration: {threshold}")
146 value = thresholds_config[threshold]
147 if not isinstance(value, int) or (value != -1 and not 0 <= value <= 100):
148 raise ConfigValidationError(
149 f"{threshold} must be an integer between 0 and 100 (or -1 to disable), got {value}"
150 )
152 # Validate optional thresholds if present
153 for opt_threshold in [
154 "pending_pods_threshold",
155 "pending_requested_cpu_vcpus",
156 "pending_requested_memory_gb",
157 "pending_requested_gpus",
158 ]:
159 if opt_threshold in thresholds_config:
160 value = thresholds_config[opt_threshold]
161 if not isinstance(value, int) or (value != -1 and value < 0):
162 raise ConfigValidationError(
163 f"{opt_threshold} must be a non-negative integer (or -1 to disable), got {value}"
164 )
166 def _validate_global_accelerator_config(self) -> None:
167 """Validate Global Accelerator configuration"""
168 ga_config = self.app.node.try_get_context("global_accelerator")
169 if not ga_config:
170 raise ConfigValidationError("global_accelerator configuration is required")
172 required_fields = [
173 "name",
174 "health_check_grace_period",
175 "health_check_interval",
176 "health_check_timeout",
177 "health_check_path",
178 ]
179 for field in required_fields:
180 if field not in ga_config:
181 raise ConfigValidationError(f"Missing global_accelerator configuration: {field}")
183 # Validate timing values
184 for field in ["health_check_grace_period", "health_check_interval", "health_check_timeout"]:
185 value = ga_config[field]
186 if not isinstance(value, int) or value <= 0:
187 raise ConfigValidationError(f"{field} must be a positive integer, got {value}")
189 # Validate health check path
190 if not ga_config["health_check_path"].startswith("/"):
191 raise ConfigValidationError("health_check_path must start with '/'")
193 def _validate_alb_config(self) -> None:
194 """Validate ALB configuration"""
195 alb_config = self.app.node.try_get_context("alb_config")
196 if not alb_config:
197 raise ConfigValidationError("alb_config configuration is required")
199 required_fields = [
200 "health_check_interval",
201 "health_check_timeout",
202 "healthy_threshold",
203 "unhealthy_threshold",
204 ]
205 for field in required_fields:
206 if field not in alb_config:
207 raise ConfigValidationError(f"Missing alb_config configuration: {field}")
209 value = alb_config[field]
210 if not isinstance(value, int) or value <= 0:
211 raise ConfigValidationError(f"{field} must be a positive integer, got {value}")
213 def _validate_manifest_processor_config(self) -> None:
214 """Validate manifest processor configuration.
216 The manifest processor section in cdk.json holds service-specific
217 settings only. The shared validation policy (allowed_namespaces,
218 resource_quotas, trusted_registries, trusted_dockerhub_orgs,
219 manifest_security_policy, allowed_kinds) lives under
220 ``job_validation_policy`` because the queue_processor reads the
221 same values.
222 """
223 mp_config = self.app.node.try_get_context("manifest_processor")
224 if not mp_config:
225 raise ConfigValidationError("manifest_processor configuration is required")
227 required_fields = [
228 "image",
229 "replicas",
230 "resource_limits",
231 ]
232 for field in required_fields:
233 if field not in mp_config:
234 raise ConfigValidationError(f"Missing manifest_processor configuration: {field}")
236 # Validate replicas
237 if not isinstance(mp_config["replicas"], int) or mp_config["replicas"] <= 0:
238 raise ConfigValidationError("manifest_processor replicas must be a positive integer")
240 # Validate the shared policy section separately so a misconfigured
241 # policy block surfaces a clear error pointing at the right key.
242 policy = self.app.node.try_get_context("job_validation_policy")
243 if policy is None:
244 raise ConfigValidationError(
245 "job_validation_policy configuration is required (shared between "
246 "manifest_processor and queue_processor)"
247 )
248 for policy_field in ("allowed_namespaces", "resource_quotas"):
249 if policy_field not in policy:
250 raise ConfigValidationError(
251 f"Missing job_validation_policy configuration: {policy_field}"
252 )
254 # Validate resource limits
255 resource_limits = mp_config["resource_limits"]
256 if "cpu" not in resource_limits or "memory" not in resource_limits:
257 raise ConfigValidationError(
258 "manifest_processor resource_limits must contain 'cpu' and 'memory'"
259 )
261 # Validate allowed namespaces (lives under job_validation_policy).
262 if not isinstance(policy["allowed_namespaces"], list):
263 raise ConfigValidationError("job_validation_policy.allowed_namespaces must be a list")
265 def _validate_api_gateway_config(self) -> None:
266 """Validate API Gateway configuration"""
267 api_gw_config = self.app.node.try_get_context("api_gateway")
268 if not api_gw_config:
269 raise ConfigValidationError("api_gateway configuration is required")
271 required_fields = [
272 "throttle_rate_limit",
273 "throttle_burst_limit",
274 "log_level",
275 "metrics_enabled",
276 "tracing_enabled",
277 ]
278 for field in required_fields:
279 if field not in api_gw_config:
280 raise ConfigValidationError(f"Missing api_gateway configuration: {field}")
282 # Validate throttle limits
283 throttle_rate = api_gw_config["throttle_rate_limit"]
284 throttle_burst = api_gw_config["throttle_burst_limit"]
286 if not isinstance(throttle_rate, int) or throttle_rate <= 0:
287 raise ConfigValidationError(
288 f"throttle_rate_limit must be a positive integer, got {throttle_rate}"
289 )
291 if not isinstance(throttle_burst, int) or throttle_burst <= 0:
292 raise ConfigValidationError(
293 f"throttle_burst_limit must be a positive integer, got {throttle_burst}"
294 )
296 if throttle_burst < throttle_rate:
297 raise ConfigValidationError(
298 "throttle_burst_limit should be greater than or equal to throttle_rate_limit"
299 )
301 # Validate log level
302 valid_log_levels = ["OFF", "ERROR", "INFO"]
303 log_level = api_gw_config["log_level"]
304 if log_level not in valid_log_levels:
305 raise ConfigValidationError(
306 f"log_level must be one of {valid_log_levels}, got {log_level}"
307 )
309 # Validate boolean flags
310 if not isinstance(api_gw_config["metrics_enabled"], bool):
311 raise ConfigValidationError("metrics_enabled must be a boolean")
313 if not isinstance(api_gw_config["tracing_enabled"], bool):
314 raise ConfigValidationError("tracing_enabled must be a boolean")
316 def _validate_eks_cluster_config(self) -> None:
317 """Validate EKS cluster configuration"""
318 eks_config = self.app.node.try_get_context("eks_cluster") or {}
320 # Validate endpoint_access if present
321 if "endpoint_access" in eks_config:
322 valid_access_modes = ["PRIVATE", "PUBLIC_AND_PRIVATE"]
323 if eks_config["endpoint_access"] not in valid_access_modes:
324 raise ConfigValidationError(
325 f"endpoint_access must be one of {valid_access_modes}, "
326 f"got {eks_config['endpoint_access']}"
327 )
329 def get_project_name(self) -> str:
330 """Get project name from configuration"""
331 return self.app.node.try_get_context("project_name") or "gco"
333 def get_deployment_regions(self) -> dict[str, Any]:
334 """Get deployment regions configuration.
336 Returns a dict with:
337 - global: Region for Global Accelerator and SSM parameters (default: us-east-2)
338 - api_gateway: Region for API Gateway stack (default: us-east-2)
339 - monitoring: Region for Monitoring stack (default: us-east-2)
340 - regional: List of regions for EKS clusters (default: ["us-east-1"])
342 Note: Global Accelerator is a global service but requires a "home" region
343 for CloudFormation deployment. us-east-2 is used by default to keep
344 global infrastructure separate from workload regions.
345 """
346 deployment_regions = self.app.node.try_get_context("deployment_regions") or {}
348 return {
349 "global": deployment_regions.get("global", "us-east-2"),
350 "api_gateway": deployment_regions.get("api_gateway", "us-east-2"),
351 "monitoring": deployment_regions.get("monitoring", "us-east-2"),
352 "regional": deployment_regions.get("regional", ["us-east-1"]),
353 }
355 def get_global_region(self) -> str:
356 """Get the region for global resources (Global Accelerator, SSM params)."""
357 region = self.get_deployment_regions()["global"]
358 return str(region)
360 def get_api_gateway_region(self) -> str:
361 """Get the region for API Gateway stack."""
362 region = self.get_deployment_regions()["api_gateway"]
363 return str(region)
365 def get_monitoring_region(self) -> str:
366 """Get the region for Monitoring stack."""
367 region = self.get_deployment_regions()["monitoring"]
368 return str(region)
370 def get_regions(self) -> list[str]:
371 """Get list of regions for EKS cluster deployment."""
372 deployment_regions = self.get_deployment_regions()
373 regional = deployment_regions["regional"]
374 return list(regional) if isinstance(regional, list) else [str(regional)]
376 def get_kubernetes_version(self) -> str:
377 """Get Kubernetes version from configuration"""
378 return self.app.node.try_get_context("kubernetes_version") or "1.35"
380 def get_resource_thresholds(self) -> ResourceThresholds:
381 """Get resource thresholds configuration"""
382 thresholds_config = self.app.node.try_get_context("resource_thresholds") or {
383 "cpu_threshold": 60,
384 "memory_threshold": 60,
385 "gpu_threshold": -1,
386 "pending_pods_threshold": 10,
387 "pending_requested_cpu_vcpus": 100,
388 "pending_requested_memory_gb": 200,
389 "pending_requested_gpus": -1,
390 }
391 return ResourceThresholds(
392 cpu_threshold=thresholds_config["cpu_threshold"],
393 memory_threshold=thresholds_config["memory_threshold"],
394 gpu_threshold=thresholds_config["gpu_threshold"],
395 pending_pods_threshold=thresholds_config.get("pending_pods_threshold", 10),
396 pending_requested_cpu_vcpus=thresholds_config.get("pending_requested_cpu_vcpus", 100),
397 pending_requested_memory_gb=thresholds_config.get("pending_requested_memory_gb", 200),
398 pending_requested_gpus=thresholds_config.get("pending_requested_gpus", 8),
399 )
401 def get_cluster_config(self, region: str) -> ClusterConfig:
402 """Get complete cluster configuration for a region"""
403 return ClusterConfig(
404 region=region,
405 cluster_name=f"{self.get_project_name()}-{region}",
406 kubernetes_version=self.get_kubernetes_version(),
407 addons=["metrics-server"],
408 resource_thresholds=self.get_resource_thresholds(),
409 )
411 def get_global_accelerator_config(self) -> dict[str, Any]:
412 """Get Global Accelerator configuration"""
413 return self.app.node.try_get_context("global_accelerator") or {
414 "name": "gco-accelerator",
415 "health_check_grace_period": 30,
416 "health_check_interval": 30,
417 "health_check_timeout": 5,
418 "health_check_path": "/api/v1/health",
419 }
421 def get_alb_config(self) -> dict[str, Any]:
422 """Get ALB configuration"""
423 return self.app.node.try_get_context("alb_config") or {
424 "health_check_interval": 30,
425 "health_check_timeout": 5,
426 "healthy_threshold": 2,
427 "unhealthy_threshold": 2,
428 }
430 def get_manifest_processor_config(self) -> dict[str, Any]:
431 """Get manifest processor configuration.
433 Merges three cdk.json sections into a single runtime config:
435 - ``manifest_processor``: service-specific settings (replicas, image,
436 resource_limits, allowed_namespaces, validation_enabled,
437 max_request_body_bytes, yaml_max_depth, yaml_allow_aliases)
438 - ``job_validation_policy``: shared validation policy (resource_quotas,
439 trusted_registries, trusted_dockerhub_orgs, manifest_security_policy,
440 allowed_kinds). Pulled in verbatim so the REST path reads the same
441 policy the SQS queue processor enforces.
443 Note: The 'image' field is a placeholder default. In practice, the actual
444 image is built from dockerfiles/manifest-processor-dockerfile and pushed
445 to ECR during CDK deployment. The {{MANIFEST_PROCESSOR_IMAGE}} placeholder
446 in manifests is replaced with the ECR image URI.
447 """
448 default_config = {
449 "image": "gco/manifest-processor:latest", # Placeholder, replaced by ECR image
450 "replicas": 3,
451 "resource_limits": {"cpu": "1000m", "memory": "2Gi"},
452 "validation_enabled": True,
453 # allowed_namespaces, resource_quotas, trusted_registries,
454 # trusted_dockerhub_orgs, manifest_security_policy, and
455 # allowed_kinds are merged in below from job_validation_policy.
456 "allowed_namespaces": ["default", "gco-jobs"],
457 "resource_quotas": {
458 "max_cpu_per_manifest": "10",
459 "max_memory_per_manifest": "32Gi",
460 "max_gpu_per_manifest": 4,
461 },
462 "trusted_registries": [
463 "docker.io",
464 "gcr.io",
465 "quay.io",
466 "registry.k8s.io",
467 "k8s.gcr.io",
468 "public.ecr.aws",
469 "nvcr.io",
470 "gco",
471 ],
472 "trusted_dockerhub_orgs": [
473 "nvidia",
474 "pytorch",
475 "rayproject",
476 "tensorflow",
477 "huggingface",
478 "amazon",
479 "bitnami",
480 ],
481 }
482 context_config = self.app.node.try_get_context("manifest_processor") or {}
484 # Merge in the shared job_validation_policy section. These keys apply
485 # to BOTH the manifest processor and the queue processor; they live
486 # in their own top-level cdk.json section so neither service "owns"
487 # them. We flatten them into the manifest processor's runtime config
488 # so service code keeps its existing attribute layout.
489 shared_policy = self.app.node.try_get_context("job_validation_policy") or {}
490 return {**default_config, **context_config, **shared_policy}
492 def get_api_gateway_config(self) -> dict[str, Any]:
493 """Get API Gateway configuration.
495 Returns:
496 API Gateway configuration dictionary with the following keys:
497 - throttle_rate_limit: Requests per second limit
498 - throttle_burst_limit: Burst capacity
499 - log_level: CloudWatch logging level (OFF, ERROR, INFO)
500 - metrics_enabled: Enable CloudWatch metrics
501 - tracing_enabled: Enable X-Ray tracing
502 - regional_api_enabled: Enable regional API Gateways for private access
503 When true, deploys a regional API Gateway with VPC Lambda in each
504 region, allowing API access when the ALB is internal-only.
505 """
506 default_config = {
507 "throttle_rate_limit": 1000,
508 "throttle_burst_limit": 2000,
509 "log_level": "INFO",
510 "metrics_enabled": True,
511 "tracing_enabled": True,
512 "regional_api_enabled": False,
513 }
514 return {**default_config, **(self.app.node.try_get_context("api_gateway") or {})}
516 def get_eks_cluster_config(self) -> dict[str, Any]:
517 """Get EKS cluster configuration.
519 Returns:
520 EKS cluster configuration dictionary with the following keys:
521 - endpoint_access: EKS API endpoint access mode
522 - "PRIVATE": API server only accessible from within VPC (default, most secure)
523 - "PUBLIC_AND_PRIVATE": API server accessible from internet and VPC
525 Note:
526 PRIVATE endpoint is recommended for production. Job submission still works
527 via API Gateway → Lambda (in VPC) or SQS queues. For kubectl access with
528 PRIVATE endpoint, use a bastion host, VPN, or AWS SSM Session Manager.
529 """
530 default_config = {
531 "endpoint_access": "PRIVATE",
532 }
533 return {**default_config, **(self.app.node.try_get_context("eks_cluster") or {})}
535 def get_fsx_lustre_config(self, region: str | None = None) -> dict[str, Any]:
536 """Get FSx for Lustre configuration.
538 Args:
539 region: Optional region to get config for. If provided, checks for
540 region-specific overrides first.
542 Returns:
543 FSx configuration dictionary with the following keys:
544 - enabled: Whether FSx is enabled
545 - storage_capacity_gib: Storage capacity in GiB (min 1200)
546 - deployment_type: SCRATCH_1, SCRATCH_2, PERSISTENT_1, PERSISTENT_2
547 - file_system_type_version: Lustre version (2.12 or 2.15, default: 2.15)
548 IMPORTANT: Use 2.15 for kernel 6.x compatibility (AL2023, Bottlerocket)
549 - per_unit_storage_throughput: Throughput for PERSISTENT types
550 - data_compression_type: LZ4 or NONE
551 - import_path: S3 path for data import
552 - export_path: S3 path for data export
553 - auto_import_policy: NEW, NEW_CHANGED, NEW_CHANGED_DELETED
554 - node_group: Node group configuration for FSx workloads
555 - instance_types: List of instance types
556 - min_size: Minimum nodes (default: 0)
557 - max_size: Maximum nodes (default: 10)
558 - desired_size: Desired nodes (default: 0, scales from zero)
559 - ami_type: AMI type - one of:
560 AL2023_X86_64_STANDARD (default), AL2023_ARM_64_STANDARD,
561 AL2023_X86_64_NVIDIA, AL2023_ARM_64_NVIDIA, AL2023_X86_64_NEURON
562 - capacity_type: ON_DEMAND (default) or SPOT
563 - disk_size: Root disk size in GB (default: 100)
564 - labels: Additional node labels (dict)
565 """
566 default_config = {
567 "enabled": False,
568 "storage_capacity_gib": 1200,
569 "deployment_type": "SCRATCH_2",
570 "file_system_type_version": "2.15", # Use 2.15 for kernel 6.x compatibility
571 "per_unit_storage_throughput": 200,
572 "data_compression_type": "LZ4",
573 "import_path": None,
574 "export_path": None,
575 "auto_import_policy": "NEW_CHANGED_DELETED",
576 "node_group": {
577 "instance_types": ["m5.large", "m5.xlarge", "m6i.large", "m6i.xlarge"],
578 "min_size": 0,
579 "max_size": 10,
580 "desired_size": 1,
581 "ami_type": "AL2023_X86_64_STANDARD",
582 "capacity_type": "ON_DEMAND",
583 "disk_size": 100,
584 "labels": {},
585 },
586 }
588 # Get global FSx config
589 global_ctx = self.app.node.try_get_context("fsx_lustre")
590 global_config: dict[str, Any] = global_ctx if isinstance(global_ctx, dict) else {}
591 merged_config: dict[str, Any] = {**default_config, **global_config}
593 # Ensure node_group has all required fields with defaults
594 if "node_group" in global_config:
595 global_node_group = global_config["node_group"]
596 if isinstance(global_node_group, dict): 596 ↛ 604line 596 didn't jump to line 604 because the condition on line 596 was always true
597 default_node_group = cast(dict[str, Any], default_config["node_group"])
598 merged_config["node_group"] = {
599 **default_node_group,
600 **global_node_group,
601 }
603 # Check for region-specific override
604 if region:
605 region_overrides_ctx = self.app.node.try_get_context("fsx_lustre_regions")
606 region_overrides: dict[str, Any] = (
607 region_overrides_ctx if isinstance(region_overrides_ctx, dict) else {}
608 )
609 if region in region_overrides:
610 region_config = region_overrides[region]
611 if isinstance(region_config, dict): 611 ↛ 627line 611 didn't jump to line 627 because the condition on line 611 was always true
612 merged_config = {**merged_config, **region_config}
613 # Handle nested node_group override
614 if "node_group" in region_config:
615 region_node_group = region_config["node_group"]
616 if isinstance(region_node_group, dict): 616 ↛ 627line 616 didn't jump to line 627 because the condition on line 616 was always true
617 existing_node_group = merged_config.get("node_group")
618 if isinstance(existing_node_group, dict): 618 ↛ 621line 618 didn't jump to line 621 because the condition on line 618 was always true
619 base_node_group = existing_node_group
620 else:
621 base_node_group = cast(dict[str, Any], default_config["node_group"])
622 merged_config["node_group"] = {
623 **base_node_group,
624 **region_node_group,
625 }
627 return merged_config
629 def get_valkey_config(self) -> dict[str, Any]:
630 """Get Valkey Serverless cache configuration.
632 Returns:
633 Valkey configuration dictionary with the following keys:
634 - enabled: Whether Valkey cache is enabled (default: True)
635 - max_data_storage_gb: Maximum data storage in GB (default: 5)
636 - max_ecpu_per_second: Maximum ECPUs per second (default: 5000)
637 - snapshot_retention_limit: Daily snapshots to retain (default: 1)
638 """
639 default_config: dict[str, Any] = {
640 "enabled": True,
641 "max_data_storage_gb": 5,
642 "max_ecpu_per_second": 5000,
643 "snapshot_retention_limit": 1,
644 }
645 valkey_ctx = self.app.node.try_get_context("valkey")
646 valkey_config: dict[str, Any] = valkey_ctx if isinstance(valkey_ctx, dict) else {}
647 return {**default_config, **valkey_config}
649 def get_aurora_pgvector_config(self) -> dict[str, Any]:
650 """Get Aurora Serverless v2 + pgvector vector database configuration.
652 Returns:
653 Aurora pgvector configuration dictionary with the following keys:
654 - enabled: Whether Aurora pgvector is enabled (default: False)
655 - min_acu: Minimum Aurora Capacity Units (default: 0, scales to zero)
656 - max_acu: Maximum Aurora Capacity Units (default: 16)
657 - backup_retention_days: Number of days to retain automated backups (default: 7)
658 - deletion_protection: Whether deletion protection is enabled (default: False)
659 """
660 default_config: dict[str, Any] = {
661 "enabled": False,
662 "min_acu": 0,
663 "max_acu": 16,
664 "backup_retention_days": 7,
665 "deletion_protection": False,
666 }
667 aurora_ctx = self.app.node.try_get_context("aurora_pgvector")
668 aurora_config: dict[str, Any] = aurora_ctx if isinstance(aurora_ctx, dict) else {}
669 return {**default_config, **aurora_config}
671 def get_tags(self) -> dict[str, str]:
672 """Get common tags from configuration"""
673 return self.app.node.try_get_context("tags") or {}
    def validate_region_availability(self, region: str) -> bool:
        """Validate that a region is available in the current AWS account.

        Best-effort check: any failure (missing credentials, disabled region,
        network error) is logged at debug level and reported as unavailable
        rather than raised, so callers can fall back gracefully.
        """
        try:
            ec2 = boto3.client("ec2", region_name=region)
            # DescribeRegions with an explicit name errors if the region is
            # unknown or not enabled for this account.
            ec2.describe_regions(RegionNames=[region])
            return True
        except Exception as e:
            logger.debug("Region %s not available: %s", region, e)
            return False
    def get_available_regions(self) -> list[str]:
        """Get list of available AWS regions for the current account.

        Falls back to the static VALID_REGIONS set when the DescribeRegions
        call fails (e.g. no AWS credentials during a local synth), logging
        the failure at debug level.
        """
        try:
            ec2 = boto3.client("ec2")
            response = ec2.describe_regions()
            return [region["RegionName"] for region in response["Regions"]]
        except Exception as e:
            logger.debug("Failed to list regions, using defaults: %s", e)
            return list(self.VALID_REGIONS)