Coverage for gco/services/manifest_processor.py: 97%

526 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-15 15:07 +0000

1""" 

2Manifest Processor Service for GCO (Global Capacity Orchestrator on AWS). 

3 

4This service processes Kubernetes manifest submissions, validates them against 

5security and resource constraints, and applies them to the cluster. 

6 

7Key Features: 

8- Validates manifests for required fields and structure 

9- Enforces namespace restrictions (only allowed namespaces) 

10- Enforces resource limits (CPU, memory, GPU per manifest) 

11- Validates security context (no privileged containers) 

12- Validates image sources (trusted registries only) 

13- Supports dry-run mode for validation without applying 

14 

15Security Validations: 

16- Namespace must be in allowed list (default: default, gco-jobs) 

17- No privileged containers or privilege escalation 

18- Images must be from trusted registries 

19- Resource requests/limits within configured maximums 

20 

21Environment Variables: 

22 CLUSTER_NAME: Name of the EKS cluster 

23 REGION: AWS region of the cluster 

24 MAX_CPU_PER_MANIFEST: Maximum CPU (millicores) per manifest (default: 10000) 

25 MAX_MEMORY_PER_MANIFEST: Maximum memory per manifest (default: 32Gi) 

26 MAX_GPU_PER_MANIFEST: Maximum GPUs per manifest (default: 4) 

27 ALLOWED_NAMESPACES: Comma-separated list of allowed namespaces 

28 VALIDATION_ENABLED: Enable/disable validation (default: true) 

29 

30Usage: 

31 processor = create_manifest_processor_from_env() 

32 response = await processor.process_manifest_submission(request) 

33""" 

34 

35from __future__ import annotations 

36 

37import logging 

38import os 

39from typing import Any, cast 

40 

41import yaml 

42from kubernetes import client, config, dynamic 

43from kubernetes.client.models import V1Job 

44from kubernetes.client.rest import ApiException 

45from kubernetes.dynamic.exceptions import ResourceNotFoundError 

46 

47from gco.models import ( 

48 ManifestSubmissionRequest, 

49 ManifestSubmissionResponse, 

50 ResourceStatus, 

51) 

52from gco.services.structured_logging import configure_structured_logging, sanitize_log_value 

53 

54# NOTE: No logging.basicConfig() here. This module is imported by the CLI 

55# (cli/jobs.py, cli/commands/*_cmd.py) as a library for YAML loading helpers. 

56# Calling basicConfig() at import time would configure the root logger with 

57# INFO-level output, causing noisy botocore/urllib3 INFO messages on every 

58# CLI command. Container entry points (manifest_api.py) do their own 

59# basicConfig() call. 

60logger = logging.getLogger(__name__) 

61 

62 

63# --------------------------------------------------------------------------- 

64# YAML Alias Rejection Loader 

65# --------------------------------------------------------------------------- 

66 

67 

class NoAliasSafeLoader(yaml.SafeLoader):
    """SafeLoader variant that refuses to resolve YAML aliases.

    An alias (``*name``) re-uses a node that was declared earlier with an
    anchor (``&name``).  Nested aliases can expand into exponentially large
    structures (the "billion laughs" attack), so composition is aborted with
    a ``ComposerError`` as soon as an alias event is encountered.
    """

    def compose_node(self, parent: Any, index: Any) -> Any:
        """Compose the next node, failing fast when it is an alias reference."""
        if not self.check_event(yaml.AliasEvent):  # type: ignore[no-untyped-call]
            return super().compose_node(parent, index)

        # Consume the alias event so its anchor name and position can be reported.
        alias_event = self.get_event()  # type: ignore[no-untyped-call]
        problem = (
            "YAML aliases are not allowed "
            "(security policy: yaml_allow_aliases=false), "
            f"found alias *{alias_event.anchor}"
        )
        raise yaml.composer.ComposerError(None, None, problem, alias_event.start_mark)

89 

90 

def safe_load_yaml(stream: str | Any, *, allow_aliases: bool = False) -> Any:
    """Parse one YAML document, rejecting aliases unless explicitly allowed.

    Args:
        stream: YAML text or a file-like object.
        allow_aliases: When False (the default) the document is parsed with
            ``NoAliasSafeLoader``; when True the stock ``yaml.SafeLoader``
            is used.

    Returns:
        The parsed document.

    Raises:
        yaml.YAMLError: If the document is malformed, or contains an alias
            while ``allow_aliases`` is False.
    """
    if allow_aliases:
        loader_cls = yaml.SafeLoader
    else:
        loader_cls = NoAliasSafeLoader
    # Either choice is SafeLoader or a subclass of it, so this behaves like
    # yaml.safe_load. Bandit's B506 check cannot tell that the custom loader
    # is safe, hence the suppression.
    return yaml.load(stream, Loader=loader_cls)  # nosec B506

110 

111 

def safe_load_all_yaml(stream: str | Any, *, allow_aliases: bool = False) -> list[Any]:
    """Parse every YAML document in a stream, rejecting aliases by default.

    Args:
        stream: YAML text or a file-like object.
        allow_aliases: When False (the default) the stream is parsed with
            ``NoAliasSafeLoader``; when True the stock ``yaml.SafeLoader``
            is used.

    Returns:
        The parsed documents in stream order, with ``None`` documents
        left out.

    Raises:
        yaml.YAMLError: If any document is malformed, or contains an alias
            while ``allow_aliases`` is False.
    """
    loader_cls = NoAliasSafeLoader if not allow_aliases else yaml.SafeLoader
    documents: list[Any] = []
    # Either loader is SafeLoader or a subclass of it, so this behaves like
    # yaml.safe_load_all. Bandit's B506 check cannot tell that the custom
    # loader is safe, hence the suppression.
    for document in yaml.load_all(stream, Loader=loader_cls):  # nosec B506
        if document is None:
            continue
        documents.append(document)
    return documents

135 

136 

137class ManifestProcessor: 

138 """ 

139 Processes Kubernetes manifest submissions and applies them to the cluster 

140 """ 

141 

def __init__(self, cluster_id: str, region: str, config_dict: dict[str, Any]):
    """Load Kubernetes configuration, build API clients and read policy settings.

    Args:
        cluster_id: Identifier of the target cluster (stored on the instance).
        region: Region of the cluster (stored on the instance).
        config_dict: Processor settings; any missing key falls back to the
            default written next to its lookup below.

    Raises:
        config.ConfigException: If neither the in-cluster configuration nor
            the local kubeconfig can be loaded.
    """
    self.cluster_id = cluster_id
    self.region = region
    self.config = config_dict

    # Prefer the in-cluster configuration (running inside a pod) and fall
    # back to the local kubeconfig (development).
    try:
        config.load_incluster_config()
    except config.ConfigException:
        try:
            config.load_kube_config()
        except config.ConfigException as e:
            logger.error(f"Failed to load Kubernetes configuration: {e}")
            raise
        logger.info("Loaded local Kubernetes configuration")
    else:
        logger.info("Loaded in-cluster Kubernetes configuration")

    # API clients
    self.api_client = client.ApiClient()
    api_timeout = int(os.environ.get("K8S_API_TIMEOUT", "30"))
    # NOTE(review): confirm that the client Configuration actually honours a
    # ``request_timeout`` attribute; the same value is also kept in
    # ``self._k8s_timeout`` below.
    self.api_client.configuration.request_timeout = api_timeout
    self.core_v1 = client.CoreV1Api()
    self.apps_v1 = client.AppsV1Api()
    self.batch_v1 = client.BatchV1Api()
    self.networking_v1 = client.NetworkingV1Api()
    self.custom_objects = client.CustomObjectsApi()

    # Dynamic client for CRDs - created lazily to avoid a cluster connection during init
    self._dynamic_client: dynamic.DynamicClient | None = None

    # Timeout for Kubernetes API calls (seconds)
    self._k8s_timeout = api_timeout

    # Resource quotas and limits
    cpu_ceiling = config_dict.get("max_cpu_per_manifest", "10")
    memory_ceiling = config_dict.get("max_memory_per_manifest", "32Gi")
    self.max_cpu_per_manifest = self._parse_cpu_string(cpu_ceiling)
    self.max_memory_per_manifest = self._parse_memory_string(memory_ceiling)
    self.max_gpu_per_manifest = int(config_dict.get("max_gpu_per_manifest", 4))
    self.allowed_namespaces = set(
        config_dict.get("allowed_namespaces", ["default", "gco-jobs"])
    )
    self.validation_enabled = config_dict.get("validation_enabled", True)

    # Trusted registries for image validation (configurable via cdk.json)
    default_registries = [
        "docker.io",
        "gcr.io",
        "quay.io",
        "registry.k8s.io",
        "k8s.gcr.io",
        "public.ecr.aws",
        "nvcr.io",
    ]
    default_dockerhub_orgs = [
        "nvidia",
        "pytorch",
        "rayproject",
        "tensorflow",
        "huggingface",
        "amazon",
        "bitnami",
        "gco",
    ]
    self.trusted_registries = config_dict.get("trusted_registries", default_registries)
    self.trusted_dockerhub_orgs = config_dict.get(
        "trusted_dockerhub_orgs", default_dockerhub_orgs
    )

    # Warn about trusted_registries entries that look like Docker Hub orgs (no dot or colon)
    for registry in self.trusted_registries:
        if self._is_registry_domain(registry):
            continue
        logger.warning(
            f"Trusted registry '{registry}' has no domain separator (dot or colon) — "
            "consider moving it to trusted_dockerhub_orgs instead"
        )

    # YAML parsing limits (configurable via cdk.json)
    self.yaml_max_depth = int(config_dict.get("yaml_max_depth", 50))

    # Allowed resource kinds (configurable via cdk.json)
    default_kinds = [
        "Job",
        "CronJob",
        "Deployment",
        "StatefulSet",
        "DaemonSet",
        "Service",
        "ConfigMap",
        "Pod",
    ]
    self.allowed_kinds = set(config_dict.get("allowed_kinds", default_kinds))

    # Security policy — toggleable checks (configurable via cdk.json).
    # Everything is blocked by default except running as root.
    policy = config_dict.get("manifest_security_policy", {})
    self.block_privileged = policy.get("block_privileged", True)
    self.block_privilege_escalation = policy.get("block_privilege_escalation", True)
    self.block_host_network = policy.get("block_host_network", True)
    self.block_host_pid = policy.get("block_host_pid", True)
    self.block_host_ipc = policy.get("block_host_ipc", True)
    self.block_host_path = policy.get("block_host_path", True)
    self.block_added_capabilities = policy.get("block_added_capabilities", True)
    self.block_run_as_root = policy.get("block_run_as_root", False)

254 

255 # ------------------------------------------------------------------ 

256 # Security defaults injection 

257 # ------------------------------------------------------------------ 

258 

259 @staticmethod 

260 def _extract_pod_spec(manifest: dict[str, Any]) -> dict[str, Any] | None: 

261 """Extract the pod spec from a manifest, handling all workload types. 

262 

263 Supports: 

264 - Deployment / StatefulSet / DaemonSet / ReplicaSet → spec.template.spec 

265 - Job → spec.template.spec 

266 - CronJob → spec.jobTemplate.spec.template.spec 

267 - Bare Pod → spec (when ``containers`` key is present) 

268 

269 Returns: 

270 The pod spec dict (mutable reference), or ``None`` if the manifest 

271 does not contain a recognisable pod spec. 

272 """ 

273 spec = manifest.get("spec") 

274 if spec is None or not isinstance(spec, dict): 

275 return None 

276 

277 kind = manifest.get("kind", "") 

278 

279 # CronJob: spec.jobTemplate.spec.template.spec 

280 if kind == "CronJob": 

281 job_template = spec.get("jobTemplate") 

282 if isinstance(job_template, dict): 282 ↛ 290line 282 didn't jump to line 290 because the condition on line 282 was always true

283 job_spec = job_template.get("spec") 

284 if isinstance(job_spec, dict): 284 ↛ 290line 284 didn't jump to line 290 because the condition on line 284 was always true

285 template = job_spec.get("template") 

286 if isinstance(template, dict): 286 ↛ 290line 286 didn't jump to line 290 because the condition on line 286 was always true

287 pod_spec = template.get("spec") 

288 if isinstance(pod_spec, dict): 288 ↛ 290line 288 didn't jump to line 290 because the condition on line 288 was always true

289 return pod_spec 

290 return None 

291 

292 # Deployment / StatefulSet / DaemonSet / ReplicaSet / Job: 

293 # spec.template.spec 

294 if "template" in spec: 

295 template = spec.get("template") 

296 if isinstance(template, dict): 296 ↛ 300line 296 didn't jump to line 300 because the condition on line 296 was always true

297 pod_spec = template.get("spec") 

298 if isinstance(pod_spec, dict): 298 ↛ 300line 298 didn't jump to line 300 because the condition on line 298 was always true

299 return pod_spec 

300 return None 

301 

302 # Bare Pod: spec contains "containers" directly 

303 if "containers" in spec: 

304 return cast(dict[str, Any], spec) 

305 

306 return None 

307 

308 def _inject_security_defaults(self, manifest: dict[str, Any]) -> dict[str, Any]: 

309 """Inject security defaults into user-submitted manifests. 

310 

311 Currently injects: 

312 - ``automountServiceAccountToken: false`` in the pod spec (unless the 

313 user has explicitly set it). 

314 

315 The method mutates *manifest* in-place and returns it for convenience. 

316 """ 

317 pod_spec = self._extract_pod_spec(manifest) 

318 if pod_spec is not None: 

319 # Use setdefault so we don't override an explicit user choice 

320 pod_spec.setdefault("automountServiceAccountToken", False) 

321 return manifest 

322 

323 @property 

def dynamic_client(self) -> dynamic.DynamicClient:
    """Return the dynamic client used for CRDs, creating it on first access."""
    cached = self._dynamic_client
    if cached is None:
        cached = dynamic.DynamicClient(self.api_client)
        self._dynamic_client = cached
    return cached

329 

330 def _parse_cpu_string(self, cpu_str: str) -> int: 

331 """Parse CPU string to millicores""" 

332 if not cpu_str: 

333 return 0 

334 

335 cpu_str = cpu_str.strip() 

336 if cpu_str.endswith("m"): 

337 return int(cpu_str[:-1]) 

338 return int(cpu_str) * 1000 

339 

340 def _parse_memory_string(self, memory_str: str) -> int: 

341 """Parse memory string to bytes""" 

342 if not memory_str: 

343 return 0 

344 

345 memory_str = memory_str.strip() 

346 

347 if memory_str.endswith("Ki"): 

348 return int(memory_str[:-2]) * 1024 

349 if memory_str.endswith("Mi"): 

350 return int(memory_str[:-2]) * 1024 * 1024 

351 if memory_str.endswith("Gi"): 

352 return int(memory_str[:-2]) * 1024 * 1024 * 1024 

353 if memory_str.endswith("Ti"): 

354 return int(memory_str[:-2]) * 1024 * 1024 * 1024 * 1024 

355 if memory_str.endswith("k"): 

356 return int(memory_str[:-1]) * 1000 

357 if memory_str.endswith("M"): 

358 return int(memory_str[:-1]) * 1000 * 1000 

359 if memory_str.endswith("G"): 

360 return int(memory_str[:-1]) * 1000 * 1000 * 1000 

361 return int(memory_str) 

362 

363 def _check_yaml_depth(self, obj: Any, current_depth: int = 0) -> bool: 

364 """Check if a parsed YAML/JSON object exceeds max nesting depth. 

365 

366 Recursively walks dicts and lists. Returns False if depth exceeds 

367 ``self.yaml_max_depth``. 

368 

369 Args: 

370 obj: The parsed object to check (dict, list, or scalar). 

371 current_depth: Current recursion depth (callers should leave at 0). 

372 

373 Returns: 

374 True if the object is within the depth limit, False otherwise. 

375 """ 

376 if current_depth > self.yaml_max_depth: 

377 return False 

378 if isinstance(obj, dict): 

379 return all(self._check_yaml_depth(v, current_depth + 1) for v in obj.values()) 

380 if isinstance(obj, list): 

381 return all(self._check_yaml_depth(item, current_depth + 1) for item in obj) 

382 return True 

383 

384 def validate_manifest(self, manifest: dict[str, Any]) -> tuple[bool, str | None]: 

385 """ 

386 Validate a Kubernetes manifest for security and resource constraints 

387 Returns: (is_valid, error_message) 

388 """ 

389 if not self.validation_enabled: 

390 return True, None 

391 

392 try: 

393 # YAML depth check — reject excessively nested documents 

394 if not self._check_yaml_depth(manifest): 

395 return ( 

396 False, 

397 f"Manifest exceeds maximum nesting depth of {self.yaml_max_depth} levels", 

398 ) 

399 

400 # Basic structure validation 

401 required_fields = ["apiVersion", "kind", "metadata"] 

402 for field in required_fields: 

403 if field not in manifest: 

404 return False, f"Missing required field: {field}" 

405 

406 # Validate metadata 

407 metadata = manifest.get("metadata", {}) 

408 if "name" not in metadata: 

409 return False, "Missing metadata.name field" 

410 

411 # Validate namespace 

412 namespace = metadata.get("namespace", "default") 

413 if namespace not in self.allowed_namespaces: 

414 return ( 

415 False, 

416 f"Namespace '{namespace}' not allowed. Allowed namespaces: {list(self.allowed_namespaces)}", 

417 ) 

418 

419 # Validate resource kind 

420 kind = manifest.get("kind", "") 

421 if kind not in self.allowed_kinds: 

422 return ( 

423 False, 

424 f"Resource kind '{kind}' is not allowed. Allowed kinds: {sorted(self.allowed_kinds)}", 

425 ) 

426 

427 # Validate resource limits for workload resources 

428 if kind in [ 

429 "Deployment", 

430 "Job", 

431 "CronJob", 

432 "StatefulSet", 

433 "DaemonSet", 

434 ]: 

435 resource_valid, resource_error = self._validate_resource_limits(manifest) 

436 if not resource_valid: 

437 return False, resource_error 

438 

439 # Security validations 

440 sec_valid, sec_error = self._validate_security_context(manifest) 

441 if not sec_valid: 

442 return False, f"Security context validation failed: {sec_error}" 

443 

444 # Validate image sources (prevent pulling from untrusted registries) 

445 img_valid, img_error = self._validate_image_sources(manifest) 

446 if not img_valid: 

447 return False, img_error or "Untrusted image sources detected" 

448 

449 return True, None 

450 

451 except Exception as e: 

452 logger.error(f"Error validating manifest: {e}") 

453 return False, f"Validation error: {e!s}" 

454 

455 def _validate_resource_limits(self, manifest: dict[str, Any]) -> tuple[bool, str]: 

456 """Validate resource limits in manifest. 

457 

458 Returns: 

459 Tuple of (is_valid, error_message). error_message is empty if valid. 

460 """ 

461 try: 

462 errors: list[str] = [] 

463 spec = manifest.get("spec", {}) 

464 

465 # Get pod spec (handle different resource types) 

466 pod_spec = {} 

467 if "template" in spec: # Deployment, StatefulSet, etc. 

468 pod_spec = spec.get("template", {}).get("spec", {}) 

469 elif "jobTemplate" in spec: # CronJob 469 ↛ 473line 469 didn't jump to line 473 because the condition on line 469 was always true

470 pod_spec = ( 

471 spec.get("jobTemplate", {}).get("spec", {}).get("template", {}).get("spec", {}) 

472 ) 

473 elif "containers" in spec: # Pod 

474 pod_spec = spec 

475 

476 total_cpu = 0 

477 total_memory = 0 

478 total_gpu = 0 

479 

480 for _container_type, container in self._get_all_containers(pod_spec): 

481 resources = container.get("resources", {}) 

482 requests = resources.get("requests", {}) 

483 limits = resources.get("limits", {}) 

484 

485 # Check CPU (use limits if available, otherwise requests) 

486 cpu = limits.get("cpu") or requests.get( # nosec B113 - dict.get(), not HTTP requests 

487 "cpu", "0" 

488 ) 

489 total_cpu += self._parse_cpu_string(cpu) 

490 

491 # Check Memory 

492 memory = limits.get("memory") or requests.get( # nosec B113 - dict.get(), not HTTP requests 

493 "memory", "0" 

494 ) 

495 total_memory += self._parse_memory_string(memory) 

496 

497 # Check GPU 

498 gpu = limits.get("nvidia.com/gpu") or requests.get( # nosec B113 - dict.get(), not HTTP requests 

499 "nvidia.com/gpu", "0" 

500 ) 

501 total_gpu += int(gpu) 

502 

503 # Validate against limits 

504 if total_cpu > self.max_cpu_per_manifest: 

505 logger.warning(f"CPU limit exceeded: {total_cpu}m > {self.max_cpu_per_manifest}m") 

506 errors.append(f"CPU {total_cpu}m exceeds max {self.max_cpu_per_manifest}m") 

507 

508 if total_memory > self.max_memory_per_manifest: 

509 logger.warning( 

510 f"Memory limit exceeded: {total_memory} > {self.max_memory_per_manifest}" 

511 ) 

512 mem_gb = self.max_memory_per_manifest / (1024**3) 

513 req_gb = total_memory / (1024**3) 

514 errors.append(f"Memory {req_gb:.0f}Gi exceeds max {mem_gb:.0f}Gi") 

515 

516 if total_gpu > self.max_gpu_per_manifest: 

517 logger.warning(f"GPU limit exceeded: {total_gpu} > {self.max_gpu_per_manifest}") 

518 errors.append(f"GPU {total_gpu} exceeds max {self.max_gpu_per_manifest}") 

519 

520 if errors: 

521 hint = ( 

522 "To raise limits, update resource_quotas in cdk.json " 

523 "and redeploy (see examples/README.md#troubleshooting)" 

524 ) 

525 return False, "; ".join(errors) + f". {hint}" 

526 

527 return True, "" 

528 

529 except Exception as e: 

530 logger.error(f"Error validating resource limits: {e}") 

531 return False, f"Resource limit validation error: {e}" 

532 

533 def _get_all_containers(self, pod_spec: dict[str, Any]) -> list[tuple[str, dict[str, Any]]]: 

534 """Get all containers from pod spec including init and ephemeral containers. 

535 

536 Returns: 

537 List of (container_type, container_dict) tuples where container_type 

538 is one of 'container', 'initContainer', or 'ephemeralContainer'. 

539 """ 

540 result = [] 

541 for c in pod_spec.get("containers", []): 

542 result.append(("container", c)) 

543 for c in pod_spec.get("initContainers", []): 

544 result.append(("initContainer", c)) 

545 for c in pod_spec.get("ephemeralContainers", []): 

546 result.append(("ephemeralContainer", c)) 

547 return result 

548 

549 def _validate_security_context(self, manifest: dict[str, Any]) -> tuple[bool, str | None]: 

550 """Validate security context settings. 

551 

552 Returns: 

553 Tuple of (is_valid, error_message). error_message is None if valid. 

554 """ 

555 try: 

556 # Basic security checks - prevent privileged containers 

557 spec = manifest.get("spec", {}) 

558 

559 # Get pod spec (handle different resource types) 

560 pod_spec = None 

561 if "template" in spec: 

562 pod_spec = spec.get("template", {}).get("spec", {}) 

563 elif "jobTemplate" in spec: 

564 pod_spec = ( 

565 spec.get("jobTemplate", {}).get("spec", {}).get("template", {}).get("spec", {}) 

566 ) 

567 elif "containers" in spec: 

568 pod_spec = spec 

569 

570 if pod_spec: 

571 # --- Pod-level checks --- 

572 if self.block_host_network and pod_spec.get("hostNetwork", False): 

573 return False, "hostNetwork is not permitted" 

574 

575 if self.block_host_pid and pod_spec.get("hostPID", False): 

576 return False, "hostPID is not permitted" 

577 

578 if self.block_host_ipc and pod_spec.get("hostIPC", False): 

579 return False, "hostIPC is not permitted" 

580 

581 # Check volumes for hostPath 

582 if self.block_host_path: 

583 for volume in pod_spec.get("volumes", []): 

584 if volume.get("hostPath") is not None: 

585 return False, "hostPath volumes are not permitted" 

586 

587 # Check pod security context 

588 security_context = pod_spec.get("securityContext", {}) 

589 if self.block_privileged and security_context.get("privileged", False): 

590 return False, "privileged pod security context is not permitted" 

591 

592 if self.block_run_as_root: 

593 run_as_user = security_context.get("runAsUser") 

594 if run_as_user is not None and run_as_user == 0: 

595 return False, "running as root (runAsUser: 0) is not permitted" 

596 

597 # --- Container-level checks --- 

598 for container_type, container in self._get_all_containers(pod_spec): 

599 container_name = container.get("name", "unknown") 

600 container_security = container.get("securityContext", {}) 

601 if self.block_privileged and container_security.get("privileged", False): 

602 return ( 

603 False, 

604 f"{container_type} '{container_name}': privileged containers are not permitted", 

605 ) 

606 if self.block_privilege_escalation and container_security.get( 

607 "allowPrivilegeEscalation", False 

608 ): 

609 return ( 

610 False, 

611 f"{container_type} '{container_name}': allowPrivilegeEscalation is not permitted", 

612 ) 

613 

614 # Check for added capabilities 

615 if self.block_added_capabilities: 

616 added_caps = container_security.get("capabilities", {}).get("add", []) 

617 if added_caps: 

618 return ( 

619 False, 

620 f"{container_type} '{container_name}': added capabilities are not permitted", 

621 ) 

622 

623 # Check for runAsUser: 0 (root) — off by default 

624 if self.block_run_as_root: 

625 run_as_user = container_security.get("runAsUser") 

626 if run_as_user is not None and run_as_user == 0: 

627 return ( 

628 False, 

629 f"{container_type} '{container_name}': running as root (runAsUser: 0) is not permitted", 

630 ) 

631 

632 return True, None 

633 

634 except Exception as e: 

635 logger.error(f"Error validating security context: {e}") 

636 return False, f"Security context error: {e}" 

637 

638 @staticmethod 

639 def _is_registry_domain(entry: str) -> bool: 

640 """Check if a registry entry is a proper domain (contains dot or colon). 

641 

642 A proper registry domain contains either a dot (e.g., 'docker.io', 'gcr.io') 

643 or a colon (e.g., 'localhost:5000'). Entries without these are Docker Hub 

644 organization names (e.g., 'nvidia', 'gco'). 

645 """ 

646 return "." in entry or ":" in entry 

647 

648 def _validate_image_sources(self, manifest: dict[str, Any]) -> tuple[bool, str | None]: 

649 """Validate container image sources. 

650 

651 Uses proper domain matching instead of prefix matching to prevent 

652 dependency confusion attacks (e.g., 'gco-malicious/evil' should NOT 

653 match a trusted registry entry 'gco'). 

654 

655 Matching logic: 

656 1. If image has no '/' → official Docker Hub image (always allowed) 

657 2. If image has '/' and part before first '/' contains a dot or colon 

658 → it's a registry domain → match against trusted_registries 

659 3. If image has '/' but first segment has no dot/colon 

660 → it's a Docker Hub org → match against trusted_dockerhub_orgs 

661 4. Digest references (@sha256:) are accepted from any trusted source 

662 

663 Returns: 

664 Tuple of (is_valid, error_message). error_message is None if valid. 

665 """ 

666 try: 

667 trusted_registries = self.trusted_registries 

668 trusted_dockerhub_orgs = self.trusted_dockerhub_orgs 

669 

670 spec = manifest.get("spec", {}) 

671 

672 # Get pod spec (handle different resource types) 

673 pod_spec = {} 

674 if "template" in spec: 

675 pod_spec = spec.get("template", {}).get("spec", {}) 

676 elif "jobTemplate" in spec: 

677 pod_spec = ( 

678 spec.get("jobTemplate", {}).get("spec", {}).get("template", {}).get("spec", {}) 

679 ) 

680 elif "containers" in spec: 

681 pod_spec = spec 

682 

683 for container_type, container in self._get_all_containers(pod_spec): 

684 image = container.get("image", "") 

685 if not image: 

686 continue 

687 

688 is_trusted = False 

689 

690 # Case 1: Official Docker Hub image (no slash) — e.g., "python:3.14", "busybox" 

691 if "/" not in image: 

692 is_trusted = True 

693 else: 

694 # Image has a slash — determine if first segment is a domain or org 

695 first_segment = image.split("/")[0] 

696 

697 if self._is_registry_domain(first_segment): 

698 # Case 2: First segment looks like a domain (has dot or colon) 

699 # Match against trusted_registries as exact domain match 

700 for registry in trusted_registries: 

701 if first_segment == registry: 

702 is_trusted = True 

703 break 

704 # Also support multi-level registry paths like "public.ecr.aws" 

705 # where the image might be "public.ecr.aws/lambda/python:3.14" 

706 if image.startswith(registry + "/"): 706 ↛ 707line 706 didn't jump to line 707 because the condition on line 706 was never true

707 is_trusted = True 

708 break 

709 else: 

710 # Case 3: First segment has no dot/colon — it's a Docker Hub org 

711 # Match against trusted_dockerhub_orgs 

712 if first_segment in trusted_dockerhub_orgs: 

713 is_trusted = True 

714 

715 if not is_trusted: 

716 container_name = container.get("name", "unknown") 

717 logger.warning(f"Untrusted image source: {image}") 

718 return ( 

719 False, 

720 f"{container_type} '{container_name}': Untrusted image source '{image}'", 

721 ) 

722 

723 return True, None 

724 

725 except Exception as e: 

726 logger.error(f"Error validating image sources: {e}") 

727 return False, f"Image source validation error: {e}" 

728 

async def process_manifest_submission(
    self, request: ManifestSubmissionRequest
) -> ManifestSubmissionResponse:
    """
    Validate and, unless this is a dry run, apply every manifest of a submission.

    Each manifest is handled independently: a manifest that fails validation
    or raises while being processed is recorded as ``failed`` and the loop
    moves on to the next one. The response is successful only if no manifest
    failed.

    Args:
        request: The submission; its ``manifests``, ``dry_run`` and
            ``namespace`` attributes are read.

    Returns:
        A response carrying one resource status per processed manifest and
        the collected error messages (``None`` when there were none).
    """

    def _failed_status(manifest: dict[str, Any], position: int, detail: str) -> ResourceStatus:
        """Build the ``failed`` status entry for the manifest at 1-based *position*."""
        return ResourceStatus(
            api_version=manifest.get("apiVersion", "unknown"),
            kind=manifest.get("kind", "unknown"),
            name=manifest.get("metadata", {}).get("name", f"manifest-{position}"),
            namespace=manifest.get("metadata", {}).get("namespace", "default"),
            status="failed",
            message=detail,
        )

    logger.info(f"Processing manifest submission with {len(request.manifests)} manifests")

    resources = []
    errors = []
    overall_success = True

    try:
        for position, manifest_data in enumerate(request.manifests, start=1):
            try:
                is_valid, reason = self.validate_manifest(manifest_data)
                if not is_valid:
                    failure = f"Manifest {position} validation failed: {reason}"
                    errors.append(failure)
                    logger.error(failure)
                    resources.append(_failed_status(manifest_data, position, failure))
                    overall_success = False
                    continue

                if request.dry_run:
                    # Dry run - validation only, nothing is applied
                    resources.append(
                        ResourceStatus(
                            api_version=manifest_data.get("apiVersion", "unknown"),
                            kind=manifest_data.get("kind", "unknown"),
                            name=manifest_data.get("metadata", {}).get("name", "unknown"),
                            namespace=manifest_data.get("metadata", {}).get(
                                "namespace", request.namespace or "default"
                            ),
                            status="unchanged",
                            message="Dry run - validation passed",
                        )
                    )
                    continue

                # Validation passed and this is a real run: apply it
                applied = await self._apply_manifest(manifest_data, request.namespace)
                resources.append(applied)
                if not applied.is_successful():
                    overall_success = False

            except Exception as e:
                failure = f"Error processing manifest {position}: {e!s}"
                errors.append(failure)
                logger.error(failure)
                overall_success = False
                resources.append(_failed_status(manifest_data, position, str(e)))

    except Exception as e:
        # Reached when the per-manifest error handling itself fails;
        # the remaining manifests are not processed.
        fatal = f"Fatal error processing manifest submission: {e!s}"
        errors.append(fatal)
        logger.error(fatal)
        overall_success = False

    response = ManifestSubmissionResponse(
        success=overall_success,
        cluster_id=self.cluster_id,
        region=self.region,
        resources=resources,
        errors=errors if errors else None,
    )

    logger.info(
        f"Manifest submission completed - Success: {overall_success}, "
        f"Resources: {len(resources)}, Errors: {len(errors)}"
    )

    return response

825 

async def _apply_manifest(
    self, manifest_data: dict[str, Any], default_namespace: str | None = None
) -> ResourceStatus:
    """
    Apply a single manifest to the cluster.

    The manifest is mutated in place: a missing namespace is filled in,
    security defaults are injected, and a Job that collides with a
    still-active Job receives a suffixed name.

    Outcome by case:
    - Resource does not exist: it is created.
    - Existing Job in a terminal state: it is deleted and the new Job is
      created in its place (Jobs are immutable, so they cannot be patched).
    - Existing Job that is not finished: the new submission is created as
      ``<name>-<5 hex chars>`` so the running Job is left untouched.
    - Any other existing resource: patched via ``_update_resource``.

    Args:
        manifest_data: Parsed manifest dictionary.
        default_namespace: Namespace used when the manifest does not set
            one; ``"default"`` is used when this is also unset.

    Returns:
        ResourceStatus describing the outcome. Errors are reported as a
        ``status="failed"`` result instead of being raised.
    """
    try:
        api_version: str = manifest_data.get("apiVersion", "unknown")
        kind: str = manifest_data.get("kind", "unknown")

        # YAML such as ``metadata:`` parses to None, and the key may be
        # absent altogether. Attach a real dict so the namespace
        # assignment below cannot fail with KeyError/AttributeError.
        metadata = manifest_data.get("metadata")
        if metadata is None:
            metadata = {}
            manifest_data["metadata"] = metadata

        name: str = metadata.get("name", "unknown")
        namespace: str = metadata.get("namespace", default_namespace or "default")

        # Ensure namespace is set in manifest
        if "namespace" not in metadata and namespace:
            metadata["namespace"] = namespace

        # Inject security defaults (e.g., automountServiceAccountToken: false)
        self._inject_security_defaults(manifest_data)

        # Check if resource already exists
        existing_resource = await self._get_existing_resource(
            api_version, kind, name, namespace
        )

        if not existing_resource:
            await self._create_resource(manifest_data)
            status = "created"
            message = "Resource created successfully"
        elif kind != "Job":
            # Update existing resource (works for mutable resources)
            updated_resource = await self._update_resource(manifest_data)
            status = "updated" if updated_resource else "unchanged"
            message = (
                "Resource updated successfully" if updated_resource else "No changes needed"
            )
        elif self._is_job_finished(existing_resource):
            logger.info(
                f"Job {name} already exists and is finished, deleting before recreating"
            )
            # delete_resource reports failures through its return value
            # rather than raising, so the result has to be inspected;
            # otherwise a failed delete only shows up as a create conflict.
            deletion = await self.delete_resource(api_version, kind, name, namespace)
            if deletion.status == "failed":
                logger.error(f"Failed to delete finished job {name}: {deletion.message}")
                return ResourceStatus(
                    api_version=api_version,
                    kind=kind,
                    name=name,
                    namespace=namespace,
                    status="failed",
                    message=(
                        f"Could not delete finished job '{name}' before "
                        f"recreating it: {deletion.message}"
                    ),
                )
            import asyncio

            # Fixed grace period for the deletion to take effect before
            # the same name is created again.
            await asyncio.sleep(1)
            await self._create_resource(manifest_data)
            status = "created"
            message = "Previous completed job replaced with new submission"
        else:
            # Active job — rename to avoid destroying it
            import uuid

            suffix = uuid.uuid4().hex[:5]
            new_name = f"{name}-{suffix}"
            metadata["name"] = new_name
            logger.warning(f"Job {name} is still active, renamed new submission to {new_name}")
            await self._create_resource(manifest_data)
            status = "created"
            message = f"Job '{name}' is still running. New submission renamed to '{new_name}'."
            name = new_name

        return ResourceStatus(
            api_version=api_version,
            kind=kind,
            name=name,
            namespace=namespace,
            status=status,
            message=message,
        )

    except ApiException as e:
        logger.error(f"Kubernetes API error applying manifest: {e}")
        return self._failed_apply_status(manifest_data, f"API error: {e.reason}")
    except Exception as e:
        logger.error(f"Error applying manifest: {e}")
        return self._failed_apply_status(manifest_data, str(e))

def _failed_apply_status(self, manifest_data: dict[str, Any], message: str) -> ResourceStatus:
    """Build a ``failed`` ResourceStatus from whatever identifying fields the manifest has."""
    # This runs inside exception handlers, so it must not raise itself:
    # tolerate a missing, null, or non-mapping ``metadata`` value.
    metadata = manifest_data.get("metadata")
    if not isinstance(metadata, dict):
        metadata = {}
    return ResourceStatus(
        api_version=manifest_data.get("apiVersion", "unknown"),
        kind=manifest_data.get("kind", "unknown"),
        name=metadata.get("name", "unknown"),
        namespace=metadata.get("namespace", "default"),
        status="failed",
        message=message,
    )

929 

930 def _is_job_finished(self, job_resource: dict[str, Any]) -> bool: 

931 """Check if a Kubernetes Job resource is in a terminal state (Complete or Failed).""" 

932 status = job_resource.get("status", {}) 

933 conditions = status.get("conditions") or [] 

934 for condition in conditions: 

935 cond_type = condition.get("type", "") 

936 cond_status = condition.get("status", "") 

937 if cond_type in ("Complete", "Failed") and cond_status == "True": 

938 return True 

939 return False 

940 

941 async def _get_existing_resource( 

942 self, api_version: str, kind: str, name: str, namespace: str 

943 ) -> dict[str, Any] | None: 

944 """Check if a resource already exists using dynamic client""" 

945 try: 

946 # Get the API resource 

947 api_resource = self._get_api_resource(api_version, kind) 

948 

949 # Try to get the resource 

950 if namespace and api_resource.namespaced: 

951 resource = api_resource.get(name=name, namespace=namespace) 

952 else: 

953 resource = api_resource.get(name=name) 

954 

955 if resource is not None: 955 ↛ 966line 955 didn't jump to line 966 because the condition on line 955 was always true

956 return dict(resource.to_dict()) 

957 

958 except ApiException as e: 

959 if e.status == 404: 

960 return None # Resource doesn't exist 

961 raise 

962 except ValueError: 

963 # Unknown resource type 

964 return None 

965 

966 return None 

967 

968 def _get_api_resource(self, api_version: str, kind: str) -> Any: 

969 """Get the API resource for a given apiVersion and kind using dynamic client.""" 

970 try: 

971 return self.dynamic_client.resources.get(api_version=api_version, kind=kind) 

972 except ResourceNotFoundError as e: 

973 logger.error( 

974 "Resource type not found: %s/%s", 

975 sanitize_log_value(api_version), 

976 sanitize_log_value(kind), 

977 ) 

978 raise ValueError(f"Unknown resource type: {api_version}/{kind}") from e 

979 

980 async def _create_resource(self, manifest_data: dict[str, Any]) -> bool: 

981 """Create a new resource from manifest using dynamic client""" 

982 try: 

983 api_version = manifest_data.get("apiVersion", "") 

984 kind = manifest_data.get("kind", "") 

985 namespace = manifest_data.get("metadata", {}).get("namespace") 

986 

987 # Get the API resource 

988 api_resource = self._get_api_resource(api_version, kind) 

989 

990 # Create the resource 

991 if namespace and api_resource.namespaced: 

992 api_resource.create(body=manifest_data, namespace=namespace) 

993 else: 

994 api_resource.create(body=manifest_data) 

995 

996 return True 

997 except Exception as e: 

998 logger.error(f"Error creating resource: {e}") 

999 raise 

1000 

1001 async def _update_resource(self, manifest_data: dict[str, Any]) -> bool: 

1002 """Update an existing resource using dynamic client""" 

1003 try: 

1004 api_version = manifest_data.get("apiVersion", "") 

1005 kind = manifest_data.get("kind", "") 

1006 name = manifest_data.get("metadata", {}).get("name", "") 

1007 namespace = manifest_data.get("metadata", {}).get("namespace") 

1008 

1009 # Get the API resource 

1010 api_resource = self._get_api_resource(api_version, kind) 

1011 

1012 # Update the resource using patch (server-side apply) 

1013 if namespace and api_resource.namespaced: 

1014 api_resource.patch( 

1015 body=manifest_data, 

1016 name=name, 

1017 namespace=namespace, 

1018 content_type="application/merge-patch+json", 

1019 ) 

1020 else: 

1021 api_resource.patch( 

1022 body=manifest_data, 

1023 name=name, 

1024 content_type="application/merge-patch+json", 

1025 ) 

1026 

1027 return True 

1028 except Exception as e: 

1029 logger.error(f"Error updating resource: {e}") 

1030 raise 

1031 

async def delete_resource(
    self, api_version: str, kind: str, name: str, namespace: str
) -> ResourceStatus:
    """
    Delete a resource from the cluster using the dynamic client.

    This method does not raise; the outcome is encoded in the returned
    ResourceStatus:
    - ``deleted``: the delete call succeeded.
    - ``unchanged``: the API answered 404, i.e. the resource was already gone.
    - ``failed``: unknown resource type, another API error, or any other
      exception; ``message`` carries the detail.
    """

    def outcome(state: str, detail: str) -> ResourceStatus:
        # Every result shares the identifying fields passed by the caller.
        return ResourceStatus(
            api_version=api_version,
            kind=kind,
            name=name,
            namespace=namespace,
            status=state,
            message=detail,
        )

    try:
        resource_api = self._get_api_resource(api_version, kind)

        # The namespace is only forwarded for namespaced resource types.
        if namespace and resource_api.namespaced:
            resource_api.delete(name=name, namespace=namespace)
        else:
            resource_api.delete(name=name)

        return outcome("deleted", "Resource deleted successfully")

    except ValueError as e:
        # Unknown resource type
        return outcome("failed", str(e))
    except ApiException as e:
        if e.status == 404:
            return outcome("unchanged", "Resource not found (already deleted)")
        return outcome("failed", f"Delete failed: {e.reason}")
    except Exception as e:
        return outcome("failed", str(e))

1094 

async def list_jobs(
    self, namespace: str | None = None, status_filter: str | None = None
) -> list[dict[str, Any]]:
    """
    List Kubernetes Jobs from allowed namespaces.

    Args:
        namespace: Restrict the listing to this namespace; it must be one
            of ``self.allowed_namespaces``. When omitted, every allowed
            namespace is queried.
        status_filter: Keep only jobs whose ``_get_job_status`` value
            equals this string ("running", "completed", "failed" or
            "pending").

    Returns:
        List of job dictionaries as produced by ``_job_to_dict``.

    Raises:
        ValueError: if ``namespace`` is given but not allowed.
    """
    if namespace and namespace not in self.allowed_namespaces:
        raise ValueError(
            f"Namespace '{namespace}' not allowed. "
            f"Allowed namespaces: {list(self.allowed_namespaces)}"
        )

    namespaces_to_query = [namespace] if namespace else list(self.allowed_namespaces)

    collected = []
    for ns in namespaces_to_query:
        try:
            listing = self.batch_v1.list_namespaced_job(
                namespace=ns, _request_timeout=self._k8s_timeout
            )
            for job in listing.items:
                summary = self._job_to_dict(job)
                if status_filter and self._get_job_status(job) != status_filter:
                    continue
                collected.append(summary)
        except ApiException as e:
            # Best effort: a namespace that cannot be listed is skipped.
            logger.warning(
                "Failed to list jobs in namespace %s: %s", sanitize_log_value(ns), e.reason
            )
            continue

    return collected

1143 

1144 def _job_to_dict(self, job: V1Job) -> dict[str, Any]: 

1145 """Convert a Kubernetes Job object to a dictionary.""" 

1146 metadata = job.metadata 

1147 status = job.status 

1148 spec = job.spec 

1149 

1150 return { 

1151 "metadata": { 

1152 "name": metadata.name, 

1153 "namespace": metadata.namespace, 

1154 "creationTimestamp": ( 

1155 metadata.creation_timestamp.isoformat() if metadata.creation_timestamp else None 

1156 ), 

1157 "labels": metadata.labels or {}, 

1158 "uid": metadata.uid, 

1159 }, 

1160 "spec": { 

1161 "parallelism": spec.parallelism, 

1162 "completions": spec.completions, 

1163 "backoffLimit": spec.backoff_limit, 

1164 }, 

1165 "status": { 

1166 "active": status.active or 0, 

1167 "succeeded": status.succeeded or 0, 

1168 "failed": status.failed or 0, 

1169 "startTime": status.start_time.isoformat() if status.start_time else None, 

1170 "completionTime": ( 

1171 status.completion_time.isoformat() if status.completion_time else None 

1172 ), 

1173 "conditions": [ 

1174 { 

1175 "type": c.type, 

1176 "status": c.status, 

1177 "reason": c.reason, 

1178 "message": c.message, 

1179 } 

1180 for c in (status.conditions or []) 

1181 ], 

1182 }, 

1183 } 

1184 

1185 def _get_job_status(self, job: V1Job) -> str: 

1186 """Determine the status of a job: running, completed, or failed.""" 

1187 status = job.status 

1188 conditions = status.conditions or [] 

1189 

1190 for condition in conditions: 

1191 if condition.type == "Complete" and condition.status == "True": 

1192 return "completed" 

1193 if condition.type == "Failed" and condition.status == "True": 1193 ↛ 1190line 1193 didn't jump to line 1190 because the condition on line 1193 was always true

1194 return "failed" 

1195 

1196 if (status.active or 0) > 0: 

1197 return "running" 

1198 

1199 return "pending" 

1200 

async def get_resource_status(
    self, api_version: str, kind: str, name: str, namespace: str
) -> dict[str, Any] | None:
    """
    Describe a specific resource.

    Returns a dictionary echoing the requested identity plus an ``exists``
    flag. When the resource is found, its ``status``, ``metadata`` and
    ``spec`` sections are included (``{}`` for any that are missing).
    Returns None if the lookup raises; the error is logged.
    """
    try:
        found = await self._get_existing_resource(api_version, kind, name, namespace)
        identity = {
            "api_version": api_version,
            "kind": kind,
            "name": name,
            "namespace": namespace,
        }
        if not found:
            return {**identity, "exists": False}
        return {
            **identity,
            "exists": True,
            "status": found.get("status", {}),
            "metadata": found.get("metadata", {}),
            "spec": found.get("spec", {}),
        }
    except Exception as e:
        logger.error(f"Error getting resource status: {e}")
        return None

1230 

1231 

def _split_csv_env(raw_value: str) -> list[str]:
    """Split a comma-separated value, trimming whitespace and dropping empty items."""
    return [item.strip() for item in raw_value.split(",") if item.strip()]


def create_manifest_processor_from_env() -> ManifestProcessor:
    """
    Create a ManifestProcessor instance from environment variables.

    Environment variables read here:
        CLUSTER_NAME, REGION: identity of the cluster (also used to
            configure structured logging).
        MAX_CPU_PER_MANIFEST, MAX_MEMORY_PER_MANIFEST, MAX_GPU_PER_MANIFEST:
            per-manifest resource ceilings.
        ALLOWED_NAMESPACES: comma-separated namespaces; surrounding
            whitespace is ignored and empty items are dropped.
        VALIDATION_ENABLED: validation is on only when the value, ignoring
            case and surrounding whitespace, is "true".
        TRUSTED_REGISTRIES, TRUSTED_DOCKERHUB_ORGS: optional comma-separated
            allowlists; when unset or empty they are left out of the config.

    Raises:
        ValueError: if MAX_GPU_PER_MANIFEST is not an integer.
    """
    cluster_id = os.getenv("CLUSTER_NAME", "unknown-cluster")
    region = os.getenv("REGION", "unknown-region")

    # Enable structured JSON logging for CloudWatch Insights
    configure_structured_logging(
        service_name="manifest-processor",
        cluster_id=cluster_id,
        region=region,
    )

    # Load configuration from environment
    config_dict: dict[str, Any] = {
        # NOTE(review): the module docstring documents this default as
        # 10000 (millicores) while the code default is "10" — confirm
        # which unit ManifestProcessor expects.
        "max_cpu_per_manifest": os.getenv("MAX_CPU_PER_MANIFEST", "10"),
        "max_memory_per_manifest": os.getenv("MAX_MEMORY_PER_MANIFEST", "32Gi"),
        "max_gpu_per_manifest": int(os.getenv("MAX_GPU_PER_MANIFEST", "4")),
        # Parsed with the same rules as the registry lists below, so a
        # value such as "default, gco-jobs" does not yield " gco-jobs",
        # which could never match a real namespace.
        "allowed_namespaces": _split_csv_env(os.getenv("ALLOWED_NAMESPACES", "default,gco-jobs")),
        # Trim before comparing so stray whitespace ("true ") does not
        # silently switch validation off.
        "validation_enabled": os.getenv("VALIDATION_ENABLED", "true").strip().lower() == "true",
    }

    # Image registry allowlist — sourced from the same CDK env vars the
    # queue_processor reads, so an attacker who holds sqs:SendMessage on
    # the regional queue can't reach an image source the REST path
    # rejects. When unset (or empty) the ManifestProcessor falls back
    # to its hardcoded default. Empty/missing values are dropped to
    # match the queue_processor's parsing rules.
    trusted_registries = _split_csv_env(os.getenv("TRUSTED_REGISTRIES", ""))
    if trusted_registries:
        config_dict["trusted_registries"] = trusted_registries

    trusted_dockerhub_orgs = _split_csv_env(os.getenv("TRUSTED_DOCKERHUB_ORGS", ""))
    if trusted_dockerhub_orgs:
        config_dict["trusted_dockerhub_orgs"] = trusted_dockerhub_orgs

    return ManifestProcessor(cluster_id, region, config_dict)