Coverage for gco / services / manifest_processor.py: 97%

518 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 21:47 +0000

1""" 

2Manifest Processor Service for GCO (Global Capacity Orchestrator on AWS). 

3 

4This service processes Kubernetes manifest submissions, validates them against 

5security and resource constraints, and applies them to the cluster. 

6 

7Key Features: 

8- Validates manifests for required fields and structure 

9- Enforces namespace restrictions (only allowed namespaces) 

10- Enforces resource limits (CPU, memory, GPU per manifest) 

11- Validates security context (no privileged containers) 

12- Validates image sources (trusted registries only) 

13- Supports dry-run mode for validation without applying 

14 

15Security Validations: 

16- Namespace must be in allowed list (default: default, gco-jobs) 

17- No privileged containers or privilege escalation 

18- Images must be from trusted registries 

19- Resource requests/limits within configured maximums 

20 

21Environment Variables: 

22 CLUSTER_NAME: Name of the EKS cluster 

23 REGION: AWS region of the cluster 

24 MAX_CPU_PER_MANIFEST: Maximum CPU (millicores) per manifest (default: 10000) 

25 MAX_MEMORY_PER_MANIFEST: Maximum memory per manifest (default: 32Gi) 

26 MAX_GPU_PER_MANIFEST: Maximum GPUs per manifest (default: 4) 

27 ALLOWED_NAMESPACES: Comma-separated list of allowed namespaces 

28 VALIDATION_ENABLED: Enable/disable validation (default: true) 

29 

30Usage: 

31 processor = create_manifest_processor_from_env() 

32 response = await processor.process_manifest_submission(request) 

33""" 

34 

35from __future__ import annotations 

36 

37import logging 

38import os 

39from typing import Any, cast 

40 

41import yaml 

42from kubernetes import client, config, dynamic 

43from kubernetes.client.models import V1Job 

44from kubernetes.client.rest import ApiException 

45from kubernetes.dynamic.exceptions import ResourceNotFoundError 

46 

47from gco.models import ( 

48 ManifestSubmissionRequest, 

49 ManifestSubmissionResponse, 

50 ResourceStatus, 

51) 

52from gco.services.structured_logging import configure_structured_logging 

53 

54# NOTE: No logging.basicConfig() here. This module is imported by the CLI 

55# (cli/jobs.py, cli/commands/*_cmd.py) as a library for YAML loading helpers. 

56# Calling basicConfig() at import time would configure the root logger with 

57# INFO-level output, causing noisy botocore/urllib3 INFO messages on every 

58# CLI command. Container entry points (manifest_api.py) do their own 

59# basicConfig() call. 

60logger = logging.getLogger(__name__) 

61 

62 

63# --------------------------------------------------------------------------- 

64# YAML Alias Rejection Loader 

65# --------------------------------------------------------------------------- 

66 

67 

class NoAliasSafeLoader(yaml.SafeLoader):
    """SafeLoader variant that refuses YAML anchors and aliases.

    Alias references (``*anchor``) let a tiny document expand into an
    exponentially large object graph (the billion-laughs attack), so any
    alias encountered during composition aborts parsing with a
    ``ComposerError`` instead of being resolved.
    """

    def compose_node(self, parent: Any, index: Any) -> Any:
        # Peek at the upcoming event; an AliasEvent means the document is
        # trying to dereference an anchor, which policy forbids.
        if not self.check_event(yaml.AliasEvent):  # type: ignore[no-untyped-call]
            return super().compose_node(parent, index)
        event = self.get_event()  # type: ignore[no-untyped-call]
        raise yaml.composer.ComposerError(
            None,
            None,
            "YAML aliases are not allowed "
            "(security policy: yaml_allow_aliases=false), "
            f"found alias *{event.anchor}",
            event.start_mark,
        )

89 

90 

def safe_load_yaml(stream: str | Any, *, allow_aliases: bool = False) -> Any:
    """Parse a single YAML document, rejecting anchors/aliases by default.

    Args:
        stream: YAML string or file-like object.
        allow_aliases: If False (default), reject YAML anchors/aliases.

    Returns:
        Parsed YAML document.

    Raises:
        yaml.YAMLError: If the document is invalid or contains aliases
            when ``allow_aliases`` is False.
    """
    if allow_aliases:
        loader_cls: type[yaml.SafeLoader] = yaml.SafeLoader
    else:
        loader_cls = NoAliasSafeLoader
    # Either loader derives from SafeLoader, so behaviour matches
    # yaml.safe_load; Bandit's B506 simply cannot see that statically.
    return yaml.load(stream, Loader=loader_cls)  # nosec B506

110 

111 

def safe_load_all_yaml(stream: str | Any, *, allow_aliases: bool = False) -> list[Any]:
    """Parse every YAML document in a stream, rejecting aliases by default.

    Args:
        stream: YAML string or file-like object.
        allow_aliases: If False (default), reject YAML anchors/aliases.

    Returns:
        List of parsed YAML documents (``None`` documents are skipped).

    Raises:
        yaml.YAMLError: If any document is invalid or contains aliases
            when ``allow_aliases`` is False.
    """
    if allow_aliases:
        loader_cls: type[yaml.SafeLoader] = yaml.SafeLoader
    else:
        loader_cls = NoAliasSafeLoader
    # Both loaders are SafeLoader subclasses, so this is equivalent to
    # yaml.safe_load_all; Bandit's B506 cannot recognise the custom loader.
    documents: list[Any] = []
    for document in yaml.load_all(stream, Loader=loader_cls):  # nosec B506
        if document is not None:
            documents.append(document)
    return documents

133 

134 

135class ManifestProcessor: 

136 """ 

137 Processes Kubernetes manifest submissions and applies them to the cluster 

138 """ 

139 

140 def __init__(self, cluster_id: str, region: str, config_dict: dict[str, Any]): 

141 self.cluster_id = cluster_id 

142 self.region = region 

143 self.config = config_dict 

144 

145 # Initialize Kubernetes clients 

146 try: 

147 # Try to load in-cluster config first (when running in pod) 

148 config.load_incluster_config() 

149 logger.info("Loaded in-cluster Kubernetes configuration") 

150 except config.ConfigException: 

151 try: 

152 # Fall back to local kubeconfig (for development) 

153 config.load_kube_config() 

154 logger.info("Loaded local Kubernetes configuration") 

155 except config.ConfigException as e: 

156 logger.error(f"Failed to load Kubernetes configuration: {e}") 

157 raise 

158 

159 # Initialize API clients 

160 self.api_client = client.ApiClient() 

161 self.api_client.configuration.request_timeout = int(os.environ.get("K8S_API_TIMEOUT", "30")) 

162 self.core_v1 = client.CoreV1Api() 

163 self.apps_v1 = client.AppsV1Api() 

164 self.batch_v1 = client.BatchV1Api() 

165 self.networking_v1 = client.NetworkingV1Api() 

166 self.custom_objects = client.CustomObjectsApi() 

167 

168 # Dynamic client for CRDs - lazy initialized to avoid cluster connection during init 

169 self._dynamic_client: dynamic.DynamicClient | None = None 

170 

171 # Timeout for Kubernetes API calls (seconds) 

172 self._k8s_timeout = int(os.environ.get("K8S_API_TIMEOUT", "30")) 

173 

174 # Resource quotas and limits 

175 self.max_cpu_per_manifest = self._parse_cpu_string( 

176 config_dict.get("max_cpu_per_manifest", "10") 

177 ) 

178 self.max_memory_per_manifest = self._parse_memory_string( 

179 config_dict.get("max_memory_per_manifest", "32Gi") 

180 ) 

181 self.max_gpu_per_manifest = int(config_dict.get("max_gpu_per_manifest", 4)) 

182 self.allowed_namespaces = set( 

183 config_dict.get("allowed_namespaces", ["default", "gco-jobs"]) 

184 ) 

185 self.validation_enabled = config_dict.get("validation_enabled", True) 

186 

187 # Trusted registries for image validation (configurable via cdk.json) 

188 self.trusted_registries = config_dict.get( 

189 "trusted_registries", 

190 [ 

191 "docker.io", 

192 "gcr.io", 

193 "quay.io", 

194 "registry.k8s.io", 

195 "k8s.gcr.io", 

196 "public.ecr.aws", 

197 "nvcr.io", 

198 ], 

199 ) 

200 self.trusted_dockerhub_orgs = config_dict.get( 

201 "trusted_dockerhub_orgs", 

202 [ 

203 "nvidia", 

204 "pytorch", 

205 "rayproject", 

206 "tensorflow", 

207 "huggingface", 

208 "amazon", 

209 "bitnami", 

210 "gco", 

211 ], 

212 ) 

213 

214 # Warn about trusted_registries entries that look like Docker Hub orgs (no dot or colon) 

215 for registry in self.trusted_registries: 

216 if not self._is_registry_domain(registry): 216 ↛ 217line 216 didn't jump to line 217 because the condition on line 216 was never true

217 logger.warning( 

218 f"Trusted registry '{registry}' has no domain separator (dot or colon) — " 

219 f"consider moving it to trusted_dockerhub_orgs instead" 

220 ) 

221 

222 # YAML parsing limits (configurable via cdk.json) 

223 self.yaml_max_depth = int(config_dict.get("yaml_max_depth", 50)) 

224 

225 # Allowed resource kinds (configurable via cdk.json) 

226 self.allowed_kinds = set( 

227 config_dict.get( 

228 "allowed_kinds", 

229 [ 

230 "Job", 

231 "CronJob", 

232 "Deployment", 

233 "StatefulSet", 

234 "DaemonSet", 

235 "Service", 

236 "ConfigMap", 

237 "Pod", 

238 ], 

239 ) 

240 ) 

241 

242 # Security policy — toggleable checks (configurable via cdk.json) 

243 security_policy = config_dict.get("manifest_security_policy", {}) 

244 self.block_privileged = security_policy.get("block_privileged", True) 

245 self.block_privilege_escalation = security_policy.get("block_privilege_escalation", True) 

246 self.block_host_network = security_policy.get("block_host_network", True) 

247 self.block_host_pid = security_policy.get("block_host_pid", True) 

248 self.block_host_ipc = security_policy.get("block_host_ipc", True) 

249 self.block_host_path = security_policy.get("block_host_path", True) 

250 self.block_added_capabilities = security_policy.get("block_added_capabilities", True) 

251 self.block_run_as_root = security_policy.get("block_run_as_root", False) 

252 

253 # ------------------------------------------------------------------ 

254 # Security defaults injection 

255 # ------------------------------------------------------------------ 

256 

257 @staticmethod 

258 def _extract_pod_spec(manifest: dict[str, Any]) -> dict[str, Any] | None: 

259 """Extract the pod spec from a manifest, handling all workload types. 

260 

261 Supports: 

262 - Deployment / StatefulSet / DaemonSet / ReplicaSet → spec.template.spec 

263 - Job → spec.template.spec 

264 - CronJob → spec.jobTemplate.spec.template.spec 

265 - Bare Pod → spec (when ``containers`` key is present) 

266 

267 Returns: 

268 The pod spec dict (mutable reference), or ``None`` if the manifest 

269 does not contain a recognisable pod spec. 

270 """ 

271 spec = manifest.get("spec") 

272 if spec is None or not isinstance(spec, dict): 

273 return None 

274 

275 kind = manifest.get("kind", "") 

276 

277 # CronJob: spec.jobTemplate.spec.template.spec 

278 if kind == "CronJob": 

279 job_template = spec.get("jobTemplate") 

280 if isinstance(job_template, dict): 280 ↛ 288line 280 didn't jump to line 288 because the condition on line 280 was always true

281 job_spec = job_template.get("spec") 

282 if isinstance(job_spec, dict): 282 ↛ 288line 282 didn't jump to line 288 because the condition on line 282 was always true

283 template = job_spec.get("template") 

284 if isinstance(template, dict): 284 ↛ 288line 284 didn't jump to line 288 because the condition on line 284 was always true

285 pod_spec = template.get("spec") 

286 if isinstance(pod_spec, dict): 286 ↛ 288line 286 didn't jump to line 288 because the condition on line 286 was always true

287 return pod_spec 

288 return None 

289 

290 # Deployment / StatefulSet / DaemonSet / ReplicaSet / Job: 

291 # spec.template.spec 

292 if "template" in spec: 

293 template = spec.get("template") 

294 if isinstance(template, dict): 294 ↛ 298line 294 didn't jump to line 298 because the condition on line 294 was always true

295 pod_spec = template.get("spec") 

296 if isinstance(pod_spec, dict): 296 ↛ 298line 296 didn't jump to line 298 because the condition on line 296 was always true

297 return pod_spec 

298 return None 

299 

300 # Bare Pod: spec contains "containers" directly 

301 if "containers" in spec: 

302 return cast(dict[str, Any], spec) 

303 

304 return None 

305 

306 def _inject_security_defaults(self, manifest: dict[str, Any]) -> dict[str, Any]: 

307 """Inject security defaults into user-submitted manifests. 

308 

309 Currently injects: 

310 - ``automountServiceAccountToken: false`` in the pod spec (unless the 

311 user has explicitly set it). 

312 

313 The method mutates *manifest* in-place and returns it for convenience. 

314 """ 

315 pod_spec = self._extract_pod_spec(manifest) 

316 if pod_spec is not None: 

317 # Use setdefault so we don't override an explicit user choice 

318 pod_spec.setdefault("automountServiceAccountToken", False) 

319 return manifest 

320 

321 @property 

322 def dynamic_client(self) -> dynamic.DynamicClient: 

323 """Lazy-initialized dynamic client for CRD support.""" 

324 if self._dynamic_client is None: 

325 self._dynamic_client = dynamic.DynamicClient(self.api_client) 

326 return self._dynamic_client 

327 

328 def _parse_cpu_string(self, cpu_str: str) -> int: 

329 """Parse CPU string to millicores""" 

330 if not cpu_str: 

331 return 0 

332 

333 cpu_str = cpu_str.strip() 

334 if cpu_str.endswith("m"): 

335 return int(cpu_str[:-1]) 

336 return int(cpu_str) * 1000 

337 

338 def _parse_memory_string(self, memory_str: str) -> int: 

339 """Parse memory string to bytes""" 

340 if not memory_str: 

341 return 0 

342 

343 memory_str = memory_str.strip() 

344 

345 if memory_str.endswith("Ki"): 

346 return int(memory_str[:-2]) * 1024 

347 if memory_str.endswith("Mi"): 

348 return int(memory_str[:-2]) * 1024 * 1024 

349 if memory_str.endswith("Gi"): 

350 return int(memory_str[:-2]) * 1024 * 1024 * 1024 

351 if memory_str.endswith("Ti"): 

352 return int(memory_str[:-2]) * 1024 * 1024 * 1024 * 1024 

353 if memory_str.endswith("k"): 

354 return int(memory_str[:-1]) * 1000 

355 if memory_str.endswith("M"): 

356 return int(memory_str[:-1]) * 1000 * 1000 

357 if memory_str.endswith("G"): 

358 return int(memory_str[:-1]) * 1000 * 1000 * 1000 

359 return int(memory_str) 

360 

361 def _check_yaml_depth(self, obj: Any, current_depth: int = 0) -> bool: 

362 """Check if a parsed YAML/JSON object exceeds max nesting depth. 

363 

364 Recursively walks dicts and lists. Returns False if depth exceeds 

365 ``self.yaml_max_depth``. 

366 

367 Args: 

368 obj: The parsed object to check (dict, list, or scalar). 

369 current_depth: Current recursion depth (callers should leave at 0). 

370 

371 Returns: 

372 True if the object is within the depth limit, False otherwise. 

373 """ 

374 if current_depth > self.yaml_max_depth: 

375 return False 

376 if isinstance(obj, dict): 

377 return all(self._check_yaml_depth(v, current_depth + 1) for v in obj.values()) 

378 if isinstance(obj, list): 

379 return all(self._check_yaml_depth(item, current_depth + 1) for item in obj) 

380 return True 

381 

382 def validate_manifest(self, manifest: dict[str, Any]) -> tuple[bool, str | None]: 

383 """ 

384 Validate a Kubernetes manifest for security and resource constraints 

385 Returns: (is_valid, error_message) 

386 """ 

387 if not self.validation_enabled: 

388 return True, None 

389 

390 try: 

391 # YAML depth check — reject excessively nested documents 

392 if not self._check_yaml_depth(manifest): 

393 return ( 

394 False, 

395 f"Manifest exceeds maximum nesting depth of {self.yaml_max_depth} levels", 

396 ) 

397 

398 # Basic structure validation 

399 required_fields = ["apiVersion", "kind", "metadata"] 

400 for field in required_fields: 

401 if field not in manifest: 

402 return False, f"Missing required field: {field}" 

403 

404 # Validate metadata 

405 metadata = manifest.get("metadata", {}) 

406 if "name" not in metadata: 

407 return False, "Missing metadata.name field" 

408 

409 # Validate namespace 

410 namespace = metadata.get("namespace", "default") 

411 if namespace not in self.allowed_namespaces: 

412 return ( 

413 False, 

414 f"Namespace '{namespace}' not allowed. Allowed namespaces: {list(self.allowed_namespaces)}", 

415 ) 

416 

417 # Validate resource kind 

418 kind = manifest.get("kind", "") 

419 if kind not in self.allowed_kinds: 

420 return ( 

421 False, 

422 f"Resource kind '{kind}' is not allowed. Allowed kinds: {sorted(self.allowed_kinds)}", 

423 ) 

424 

425 # Validate resource limits for workload resources 

426 if kind in [ 

427 "Deployment", 

428 "Job", 

429 "CronJob", 

430 "StatefulSet", 

431 "DaemonSet", 

432 ]: 

433 resource_valid, resource_error = self._validate_resource_limits(manifest) 

434 if not resource_valid: 

435 return False, resource_error 

436 

437 # Security validations 

438 sec_valid, sec_error = self._validate_security_context(manifest) 

439 if not sec_valid: 

440 return False, f"Security context validation failed: {sec_error}" 

441 

442 # Validate image sources (prevent pulling from untrusted registries) 

443 img_valid, img_error = self._validate_image_sources(manifest) 

444 if not img_valid: 

445 return False, img_error or "Untrusted image sources detected" 

446 

447 return True, None 

448 

449 except Exception as e: 

450 logger.error(f"Error validating manifest: {e}") 

451 return False, f"Validation error: {e!s}" 

452 

453 def _validate_resource_limits(self, manifest: dict[str, Any]) -> tuple[bool, str]: 

454 """Validate resource limits in manifest. 

455 

456 Returns: 

457 Tuple of (is_valid, error_message). error_message is empty if valid. 

458 """ 

459 try: 

460 errors: list[str] = [] 

461 spec = manifest.get("spec", {}) 

462 

463 # Get pod spec (handle different resource types) 

464 pod_spec = {} 

465 if "template" in spec: # Deployment, StatefulSet, etc. 

466 pod_spec = spec.get("template", {}).get("spec", {}) 

467 elif "jobTemplate" in spec: # CronJob 467 ↛ 471line 467 didn't jump to line 471 because the condition on line 467 was always true

468 pod_spec = ( 

469 spec.get("jobTemplate", {}).get("spec", {}).get("template", {}).get("spec", {}) 

470 ) 

471 elif "containers" in spec: # Pod 

472 pod_spec = spec 

473 

474 total_cpu = 0 

475 total_memory = 0 

476 total_gpu = 0 

477 

478 for _container_type, container in self._get_all_containers(pod_spec): 

479 resources = container.get("resources", {}) 

480 requests = resources.get("requests", {}) 

481 limits = resources.get("limits", {}) 

482 

483 # Check CPU (use limits if available, otherwise requests) 

484 cpu = limits.get( 

485 "cpu" 

486 ) or requests.get( # nosec B113 - dict.get(), not HTTP requests 

487 "cpu", "0" 

488 ) 

489 total_cpu += self._parse_cpu_string(cpu) 

490 

491 # Check Memory 

492 memory = limits.get( 

493 "memory" 

494 ) or requests.get( # nosec B113 - dict.get(), not HTTP requests 

495 "memory", "0" 

496 ) 

497 total_memory += self._parse_memory_string(memory) 

498 

499 # Check GPU 

500 gpu = limits.get( 

501 "nvidia.com/gpu" 

502 ) or requests.get( # nosec B113 - dict.get(), not HTTP requests 

503 "nvidia.com/gpu", "0" 

504 ) 

505 total_gpu += int(gpu) 

506 

507 # Validate against limits 

508 if total_cpu > self.max_cpu_per_manifest: 

509 logger.warning(f"CPU limit exceeded: {total_cpu}m > {self.max_cpu_per_manifest}m") 

510 errors.append(f"CPU {total_cpu}m exceeds max {self.max_cpu_per_manifest}m") 

511 

512 if total_memory > self.max_memory_per_manifest: 

513 logger.warning( 

514 f"Memory limit exceeded: {total_memory} > {self.max_memory_per_manifest}" 

515 ) 

516 mem_gb = self.max_memory_per_manifest / (1024**3) 

517 req_gb = total_memory / (1024**3) 

518 errors.append(f"Memory {req_gb:.0f}Gi exceeds max {mem_gb:.0f}Gi") 

519 

520 if total_gpu > self.max_gpu_per_manifest: 

521 logger.warning(f"GPU limit exceeded: {total_gpu} > {self.max_gpu_per_manifest}") 

522 errors.append(f"GPU {total_gpu} exceeds max {self.max_gpu_per_manifest}") 

523 

524 if errors: 

525 hint = ( 

526 "To raise limits, update resource_quotas in cdk.json " 

527 "and redeploy (see examples/README.md#troubleshooting)" 

528 ) 

529 return False, "; ".join(errors) + f". {hint}" 

530 

531 return True, "" 

532 

533 except Exception as e: 

534 logger.error(f"Error validating resource limits: {e}") 

535 return False, f"Resource limit validation error: {e}" 

536 

537 def _get_all_containers(self, pod_spec: dict[str, Any]) -> list[tuple[str, dict[str, Any]]]: 

538 """Get all containers from pod spec including init and ephemeral containers. 

539 

540 Returns: 

541 List of (container_type, container_dict) tuples where container_type 

542 is one of 'container', 'initContainer', or 'ephemeralContainer'. 

543 """ 

544 result = [] 

545 for c in pod_spec.get("containers", []): 

546 result.append(("container", c)) 

547 for c in pod_spec.get("initContainers", []): 

548 result.append(("initContainer", c)) 

549 for c in pod_spec.get("ephemeralContainers", []): 

550 result.append(("ephemeralContainer", c)) 

551 return result 

552 

553 def _validate_security_context(self, manifest: dict[str, Any]) -> tuple[bool, str | None]: 

554 """Validate security context settings. 

555 

556 Returns: 

557 Tuple of (is_valid, error_message). error_message is None if valid. 

558 """ 

559 try: 

560 # Basic security checks - prevent privileged containers 

561 spec = manifest.get("spec", {}) 

562 

563 # Get pod spec (handle different resource types) 

564 pod_spec = None 

565 if "template" in spec: 

566 pod_spec = spec.get("template", {}).get("spec", {}) 

567 elif "jobTemplate" in spec: 

568 pod_spec = ( 

569 spec.get("jobTemplate", {}).get("spec", {}).get("template", {}).get("spec", {}) 

570 ) 

571 elif "containers" in spec: 

572 pod_spec = spec 

573 

574 if pod_spec: 

575 # --- Pod-level checks --- 

576 if self.block_host_network and pod_spec.get("hostNetwork", False): 

577 return False, "hostNetwork is not permitted" 

578 

579 if self.block_host_pid and pod_spec.get("hostPID", False): 

580 return False, "hostPID is not permitted" 

581 

582 if self.block_host_ipc and pod_spec.get("hostIPC", False): 

583 return False, "hostIPC is not permitted" 

584 

585 # Check volumes for hostPath 

586 if self.block_host_path: 

587 for volume in pod_spec.get("volumes", []): 

588 if volume.get("hostPath") is not None: 588 ↛ 587line 588 didn't jump to line 587 because the condition on line 588 was always true

589 return False, "hostPath volumes are not permitted" 

590 

591 # Check pod security context 

592 security_context = pod_spec.get("securityContext", {}) 

593 if self.block_privileged and security_context.get("privileged", False): 

594 return False, "privileged pod security context is not permitted" 

595 

596 if self.block_run_as_root: 

597 run_as_user = security_context.get("runAsUser") 

598 if run_as_user is not None and run_as_user == 0: 

599 return False, "running as root (runAsUser: 0) is not permitted" 

600 

601 # --- Container-level checks --- 

602 for container_type, container in self._get_all_containers(pod_spec): 

603 container_name = container.get("name", "unknown") 

604 container_security = container.get("securityContext", {}) 

605 if self.block_privileged and container_security.get("privileged", False): 

606 return ( 

607 False, 

608 f"{container_type} '{container_name}': privileged containers are not permitted", 

609 ) 

610 if self.block_privilege_escalation and container_security.get( 

611 "allowPrivilegeEscalation", False 

612 ): 

613 return ( 

614 False, 

615 f"{container_type} '{container_name}': allowPrivilegeEscalation is not permitted", 

616 ) 

617 

618 # Check for added capabilities 

619 if self.block_added_capabilities: 

620 added_caps = container_security.get("capabilities", {}).get("add", []) 

621 if added_caps: 

622 return ( 

623 False, 

624 f"{container_type} '{container_name}': added capabilities are not permitted", 

625 ) 

626 

627 # Check for runAsUser: 0 (root) — off by default 

628 if self.block_run_as_root: 

629 run_as_user = container_security.get("runAsUser") 

630 if run_as_user is not None and run_as_user == 0: 

631 return ( 

632 False, 

633 f"{container_type} '{container_name}': running as root (runAsUser: 0) is not permitted", 

634 ) 

635 

636 return True, None 

637 

638 except Exception as e: 

639 logger.error(f"Error validating security context: {e}") 

640 return False, f"Security context error: {e}" 

641 

642 @staticmethod 

643 def _is_registry_domain(entry: str) -> bool: 

644 """Check if a registry entry is a proper domain (contains dot or colon). 

645 

646 A proper registry domain contains either a dot (e.g., 'docker.io', 'gcr.io') 

647 or a colon (e.g., 'localhost:5000'). Entries without these are Docker Hub 

648 organization names (e.g., 'nvidia', 'gco'). 

649 """ 

650 return "." in entry or ":" in entry 

651 

652 def _validate_image_sources(self, manifest: dict[str, Any]) -> tuple[bool, str | None]: 

653 """Validate container image sources. 

654 

655 Uses proper domain matching instead of prefix matching to prevent 

656 dependency confusion attacks (e.g., 'gco-malicious/evil' should NOT 

657 match a trusted registry entry 'gco'). 

658 

659 Matching logic: 

660 1. If image has no '/' → official Docker Hub image (always allowed) 

661 2. If image has '/' and part before first '/' contains a dot or colon 

662 → it's a registry domain → match against trusted_registries 

663 3. If image has '/' but first segment has no dot/colon 

664 → it's a Docker Hub org → match against trusted_dockerhub_orgs 

665 4. Digest references (@sha256:) are accepted from any trusted source 

666 

667 Returns: 

668 Tuple of (is_valid, error_message). error_message is None if valid. 

669 """ 

670 try: 

671 trusted_registries = self.trusted_registries 

672 trusted_dockerhub_orgs = self.trusted_dockerhub_orgs 

673 

674 spec = manifest.get("spec", {}) 

675 

676 # Get pod spec (handle different resource types) 

677 pod_spec = {} 

678 if "template" in spec: 

679 pod_spec = spec.get("template", {}).get("spec", {}) 

680 elif "jobTemplate" in spec: 

681 pod_spec = ( 

682 spec.get("jobTemplate", {}).get("spec", {}).get("template", {}).get("spec", {}) 

683 ) 

684 elif "containers" in spec: 

685 pod_spec = spec 

686 

687 for container_type, container in self._get_all_containers(pod_spec): 

688 image = container.get("image", "") 

689 if not image: 

690 continue 

691 

692 is_trusted = False 

693 

694 # Case 1: Official Docker Hub image (no slash) — e.g., "python:3.14", "busybox" 

695 if "/" not in image: 

696 is_trusted = True 

697 else: 

698 # Image has a slash — determine if first segment is a domain or org 

699 first_segment = image.split("/")[0] 

700 

701 if self._is_registry_domain(first_segment): 

702 # Case 2: First segment looks like a domain (has dot or colon) 

703 # Match against trusted_registries as exact domain match 

704 for registry in trusted_registries: 

705 if first_segment == registry: 

706 is_trusted = True 

707 break 

708 # Also support multi-level registry paths like "public.ecr.aws" 

709 # where the image might be "public.ecr.aws/lambda/python:3.14" 

710 if image.startswith(registry + "/"): 710 ↛ 711line 710 didn't jump to line 711 because the condition on line 710 was never true

711 is_trusted = True 

712 break 

713 else: 

714 # Case 3: First segment has no dot/colon — it's a Docker Hub org 

715 # Match against trusted_dockerhub_orgs 

716 if first_segment in trusted_dockerhub_orgs: 

717 is_trusted = True 

718 

719 if not is_trusted: 

720 container_name = container.get("name", "unknown") 

721 logger.warning(f"Untrusted image source: {image}") 

722 return ( 

723 False, 

724 f"{container_type} '{container_name}': Untrusted image source '{image}'", 

725 ) 

726 

727 return True, None 

728 

729 except Exception as e: 

730 logger.error(f"Error validating image sources: {e}") 

731 return False, f"Image source validation error: {e}" 

732 

    async def process_manifest_submission(
        self, request: ManifestSubmissionRequest
    ) -> ManifestSubmissionResponse:
        """Validate and (unless dry-run) apply every manifest in *request*.

        Each manifest is validated with ``validate_manifest``; valid
        manifests are applied via ``_apply_manifest`` unless
        ``request.dry_run`` is set. Per-manifest failures are recorded as
        'failed' ResourceStatus entries and the loop continues, so one bad
        manifest does not abort the batch.

        Args:
            request: Submission carrying the manifests, an optional default
                namespace, and the dry-run flag.

        Returns:
            ManifestSubmissionResponse with one ResourceStatus per manifest,
            the accumulated error messages (None when there were none), and
            an overall success flag that is False if any manifest failed.
        """
        logger.info(f"Processing manifest submission with {len(request.manifests)} manifests")

        resources = []
        errors = []
        overall_success = True

        try:
            # Process each manifest independently; failures are recorded and
            # the loop moves on to the next one.
            for i, manifest_data in enumerate(request.manifests):
                try:
                    # Validate manifest
                    is_valid, error_msg = self.validate_manifest(manifest_data)
                    if not is_valid:
                        error_msg = f"Manifest {i + 1} validation failed: {error_msg}"
                        errors.append(error_msg)
                        logger.error(error_msg)

                        # Create failed resource status
                        resource_status = ResourceStatus(
                            api_version=manifest_data.get("apiVersion", "unknown"),
                            kind=manifest_data.get("kind", "unknown"),
                            name=manifest_data.get("metadata", {}).get("name", f"manifest-{i + 1}"),
                            namespace=manifest_data.get("metadata", {}).get("namespace", "default"),
                            status="failed",
                            message=error_msg,
                        )
                        resources.append(resource_status)
                        overall_success = False
                        continue

                    # Apply manifest if validation passed
                    if not request.dry_run:
                        resource_status = await self._apply_manifest(
                            manifest_data, request.namespace
                        )
                        resources.append(resource_status)

                        if not resource_status.is_successful():
                            overall_success = False
                    else:
                        # Dry run - just validate
                        resource_status = ResourceStatus(
                            api_version=manifest_data.get("apiVersion", "unknown"),
                            kind=manifest_data.get("kind", "unknown"),
                            name=manifest_data.get("metadata", {}).get("name", "unknown"),
                            namespace=manifest_data.get("metadata", {}).get(
                                "namespace", request.namespace or "default"
                            ),
                            status="unchanged",
                            message="Dry run - validation passed",
                        )
                        resources.append(resource_status)

                except Exception as e:
                    # Unexpected per-manifest failure: record it and move on.
                    error_msg = f"Error processing manifest {i + 1}: {e!s}"
                    errors.append(error_msg)
                    logger.error(error_msg)
                    overall_success = False

                    # Create failed resource status
                    resource_status = ResourceStatus(
                        api_version=manifest_data.get("apiVersion", "unknown"),
                        kind=manifest_data.get("kind", "unknown"),
                        name=manifest_data.get("metadata", {}).get("name", f"manifest-{i + 1}"),
                        namespace=manifest_data.get("metadata", {}).get("namespace", "default"),
                        status="failed",
                        message=str(e),
                    )
                    resources.append(resource_status)

        except Exception as e:
            # Catastrophic failure outside the per-manifest loop (e.g. the
            # request object itself is malformed).
            error_msg = f"Fatal error processing manifest submission: {e!s}"
            errors.append(error_msg)
            logger.error(error_msg)
            overall_success = False

        response = ManifestSubmissionResponse(
            success=overall_success,
            cluster_id=self.cluster_id,
            region=self.region,
            resources=resources,
            errors=errors if errors else None,
        )

        logger.info(
            f"Manifest submission completed - Success: {overall_success}, "
            f"Resources: {len(resources)}, Errors: {len(errors)}"
        )

        return response

829 

830 async def _apply_manifest( 

831 self, manifest_data: dict[str, Any], default_namespace: str | None = None 

832 ) -> ResourceStatus: 

833 """ 

834 Apply a single manifest to the cluster. 

835 

836 For Jobs and CronJobs, if the resource already exists and is completed/failed, 

837 it will be automatically deleted and recreated (since these resources are immutable). 

838 """ 

839 try: 

840 api_version: str = manifest_data.get("apiVersion", "unknown") 

841 kind: str = manifest_data.get("kind", "unknown") 

842 metadata = manifest_data.get("metadata", {}) 

843 name: str = metadata.get("name", "unknown") 

844 namespace: str = metadata.get("namespace", default_namespace or "default") 

845 

846 # Ensure namespace is set in manifest 

847 if "namespace" not in metadata and namespace: 

848 manifest_data["metadata"]["namespace"] = namespace 

849 

850 # Inject security defaults (e.g., automountServiceAccountToken: false) 

851 self._inject_security_defaults(manifest_data) 

852 

853 # Check if resource already exists 

854 existing_resource = await self._get_existing_resource( 

855 api_version, kind, name, namespace 

856 ) 

857 

858 if existing_resource: 

859 # Jobs are immutable — if one already exists and is finished, 

860 # delete it first so we can recreate cleanly. 

861 # If the job is still active, auto-rename to avoid collision. 

862 if kind == "Job": 

863 if self._is_job_finished(existing_resource): 

864 logger.info( 

865 f"Job {name} already exists and is finished, deleting before recreating" 

866 ) 

867 await self.delete_resource(api_version, kind, name, namespace) 

868 import asyncio 

869 

870 await asyncio.sleep(1) 

871 await self._create_resource(manifest_data) 

872 status = "created" 

873 message = "Previous completed job replaced with new submission" 

874 else: 

875 # Active job — rename to avoid destroying it 

876 import uuid 

877 

878 suffix = uuid.uuid4().hex[:5] 

879 new_name = f"{name}-{suffix}" 

880 manifest_data["metadata"]["name"] = new_name 

881 logger.warning( 

882 f"Job {name} is still active, renamed new submission to {new_name}" 

883 ) 

884 await self._create_resource(manifest_data) 

885 status = "created" 

886 message = ( 

887 f"Job '{name}' is still running. " 

888 f"New submission renamed to '{new_name}'." 

889 ) 

890 name = new_name 

891 else: 

892 # Update existing resource (works for mutable resources) 

893 updated_resource = await self._update_resource(manifest_data) 

894 status = "updated" if updated_resource else "unchanged" 

895 message = ( 

896 "Resource updated successfully" if updated_resource else "No changes needed" 

897 ) 

898 else: 

899 # Create new resource 

900 await self._create_resource(manifest_data) 

901 status = "created" 

902 message = "Resource created successfully" 

903 

904 return ResourceStatus( 

905 api_version=api_version, 

906 kind=kind, 

907 name=name, 

908 namespace=namespace, 

909 status=status, 

910 message=message, 

911 ) 

912 

913 except ApiException as e: 

914 logger.error(f"Kubernetes API error applying manifest: {e}") 

915 return ResourceStatus( 

916 api_version=manifest_data.get("apiVersion", "unknown"), 

917 kind=manifest_data.get("kind", "unknown"), 

918 name=manifest_data.get("metadata", {}).get("name", "unknown"), 

919 namespace=manifest_data.get("metadata", {}).get("namespace", "default"), 

920 status="failed", 

921 message=f"API error: {e.reason}", 

922 ) 

923 except Exception as e: 

924 logger.error(f"Error applying manifest: {e}") 

925 return ResourceStatus( 

926 api_version=manifest_data.get("apiVersion", "unknown"), 

927 kind=manifest_data.get("kind", "unknown"), 

928 name=manifest_data.get("metadata", {}).get("name", "unknown"), 

929 namespace=manifest_data.get("metadata", {}).get("namespace", "default"), 

930 status="failed", 

931 message=str(e), 

932 ) 

933 

934 def _is_job_finished(self, job_resource: dict[str, Any]) -> bool: 

935 """Check if a Kubernetes Job resource is in a terminal state (Complete or Failed).""" 

936 status = job_resource.get("status", {}) 

937 conditions = status.get("conditions") or [] 

938 for condition in conditions: 

939 cond_type = condition.get("type", "") 

940 cond_status = condition.get("status", "") 

941 if cond_type in ("Complete", "Failed") and cond_status == "True": 

942 return True 

943 return False 

944 

945 async def _get_existing_resource( 

946 self, api_version: str, kind: str, name: str, namespace: str 

947 ) -> dict[str, Any] | None: 

948 """Check if a resource already exists using dynamic client""" 

949 try: 

950 # Get the API resource 

951 api_resource = self._get_api_resource(api_version, kind) 

952 

953 # Try to get the resource 

954 if namespace and api_resource.namespaced: 

955 resource = api_resource.get(name=name, namespace=namespace) 

956 else: 

957 resource = api_resource.get(name=name) 

958 

959 if resource is not None: 959 ↛ 970line 959 didn't jump to line 970 because the condition on line 959 was always true

960 return dict(resource.to_dict()) 

961 

962 except ApiException as e: 

963 if e.status == 404: 

964 return None # Resource doesn't exist 

965 raise 

966 except ValueError: 

967 # Unknown resource type 

968 return None 

969 

970 return None 

971 

972 def _get_api_resource(self, api_version: str, kind: str) -> Any: 

973 """Get the API resource for a given apiVersion and kind using dynamic client.""" 

974 try: 

975 return self.dynamic_client.resources.get(api_version=api_version, kind=kind) 

976 except ResourceNotFoundError as e: 

977 logger.error(f"Resource type not found: {api_version}/{kind}") 

978 raise ValueError(f"Unknown resource type: {api_version}/{kind}") from e 

979 

980 async def _create_resource(self, manifest_data: dict[str, Any]) -> bool: 

981 """Create a new resource from manifest using dynamic client""" 

982 try: 

983 api_version = manifest_data.get("apiVersion", "") 

984 kind = manifest_data.get("kind", "") 

985 namespace = manifest_data.get("metadata", {}).get("namespace") 

986 

987 # Get the API resource 

988 api_resource = self._get_api_resource(api_version, kind) 

989 

990 # Create the resource 

991 if namespace and api_resource.namespaced: 

992 api_resource.create(body=manifest_data, namespace=namespace) 

993 else: 

994 api_resource.create(body=manifest_data) 

995 

996 return True 

997 except Exception as e: 

998 logger.error(f"Error creating resource: {e}") 

999 raise 

1000 

1001 async def _update_resource(self, manifest_data: dict[str, Any]) -> bool: 

1002 """Update an existing resource using dynamic client""" 

1003 try: 

1004 api_version = manifest_data.get("apiVersion", "") 

1005 kind = manifest_data.get("kind", "") 

1006 name = manifest_data.get("metadata", {}).get("name", "") 

1007 namespace = manifest_data.get("metadata", {}).get("namespace") 

1008 

1009 # Get the API resource 

1010 api_resource = self._get_api_resource(api_version, kind) 

1011 

1012 # Update the resource using patch (server-side apply) 

1013 if namespace and api_resource.namespaced: 

1014 api_resource.patch( 

1015 body=manifest_data, 

1016 name=name, 

1017 namespace=namespace, 

1018 content_type="application/merge-patch+json", 

1019 ) 

1020 else: 

1021 api_resource.patch( 

1022 body=manifest_data, 

1023 name=name, 

1024 content_type="application/merge-patch+json", 

1025 ) 

1026 

1027 return True 

1028 except Exception as e: 

1029 logger.error(f"Error updating resource: {e}") 

1030 raise 

1031 

1032 async def delete_resource( 

1033 self, api_version: str, kind: str, name: str, namespace: str 

1034 ) -> ResourceStatus: 

1035 """ 

1036 Delete a resource from the cluster using dynamic client 

1037 """ 

1038 try: 

1039 # Get the API resource 

1040 api_resource = self._get_api_resource(api_version, kind) 

1041 

1042 # Delete the resource 

1043 if namespace and api_resource.namespaced: 

1044 api_resource.delete(name=name, namespace=namespace) 

1045 else: 

1046 api_resource.delete(name=name) 

1047 

1048 return ResourceStatus( 

1049 api_version=api_version, 

1050 kind=kind, 

1051 name=name, 

1052 namespace=namespace, 

1053 status="deleted", 

1054 message="Resource deleted successfully", 

1055 ) 

1056 

1057 except ValueError as e: 

1058 # Unknown resource type 

1059 return ResourceStatus( 

1060 api_version=api_version, 

1061 kind=kind, 

1062 name=name, 

1063 namespace=namespace, 

1064 status="failed", 

1065 message=str(e), 

1066 ) 

1067 except ApiException as e: 

1068 if e.status == 404: 

1069 return ResourceStatus( 

1070 api_version=api_version, 

1071 kind=kind, 

1072 name=name, 

1073 namespace=namespace, 

1074 status="unchanged", 

1075 message="Resource not found (already deleted)", 

1076 ) 

1077 return ResourceStatus( 

1078 api_version=api_version, 

1079 kind=kind, 

1080 name=name, 

1081 namespace=namespace, 

1082 status="failed", 

1083 message=f"Delete failed: {e.reason}", 

1084 ) 

1085 except Exception as e: 

1086 return ResourceStatus( 

1087 api_version=api_version, 

1088 kind=kind, 

1089 name=name, 

1090 namespace=namespace, 

1091 status="failed", 

1092 message=str(e), 

1093 ) 

1094 

1095 async def list_jobs( 

1096 self, namespace: str | None = None, status_filter: str | None = None 

1097 ) -> list[dict[str, Any]]: 

1098 """ 

1099 List Kubernetes Jobs from allowed namespaces. 

1100 

1101 Args: 

1102 namespace: Filter by specific namespace (must be in allowed_namespaces) 

1103 status_filter: Filter by status: "running", "completed", "failed" 

1104 

1105 Returns: 

1106 List of job dictionaries with metadata and status 

1107 """ 

1108 jobs = [] 

1109 

1110 # Determine which namespaces to query 

1111 if namespace: 

1112 if namespace not in self.allowed_namespaces: 

1113 raise ValueError( 

1114 f"Namespace '{namespace}' not allowed. " 

1115 f"Allowed namespaces: {list(self.allowed_namespaces)}" 

1116 ) 

1117 namespaces_to_query = [namespace] 

1118 else: 

1119 namespaces_to_query = list(self.allowed_namespaces) 

1120 

1121 for ns in namespaces_to_query: 

1122 try: 

1123 job_list = self.batch_v1.list_namespaced_job( 

1124 namespace=ns, _request_timeout=self._k8s_timeout 

1125 ) 

1126 for job in job_list.items: 

1127 job_dict = self._job_to_dict(job) 

1128 

1129 # Apply status filter 

1130 if status_filter: 

1131 job_status = self._get_job_status(job) 

1132 if job_status != status_filter: 

1133 continue 

1134 

1135 jobs.append(job_dict) 

1136 except ApiException as e: 

1137 logger.warning(f"Failed to list jobs in namespace {ns}: {e.reason}") 

1138 continue 

1139 

1140 return jobs 

1141 

1142 def _job_to_dict(self, job: V1Job) -> dict[str, Any]: 

1143 """Convert a Kubernetes Job object to a dictionary.""" 

1144 metadata = job.metadata 

1145 status = job.status 

1146 spec = job.spec 

1147 

1148 return { 

1149 "metadata": { 

1150 "name": metadata.name, 

1151 "namespace": metadata.namespace, 

1152 "creationTimestamp": ( 

1153 metadata.creation_timestamp.isoformat() if metadata.creation_timestamp else None 

1154 ), 

1155 "labels": metadata.labels or {}, 

1156 "uid": metadata.uid, 

1157 }, 

1158 "spec": { 

1159 "parallelism": spec.parallelism, 

1160 "completions": spec.completions, 

1161 "backoffLimit": spec.backoff_limit, 

1162 }, 

1163 "status": { 

1164 "active": status.active or 0, 

1165 "succeeded": status.succeeded or 0, 

1166 "failed": status.failed or 0, 

1167 "startTime": status.start_time.isoformat() if status.start_time else None, 

1168 "completionTime": ( 

1169 status.completion_time.isoformat() if status.completion_time else None 

1170 ), 

1171 "conditions": [ 

1172 { 

1173 "type": c.type, 

1174 "status": c.status, 

1175 "reason": c.reason, 

1176 "message": c.message, 

1177 } 

1178 for c in (status.conditions or []) 

1179 ], 

1180 }, 

1181 } 

1182 

1183 def _get_job_status(self, job: V1Job) -> str: 

1184 """Determine the status of a job: running, completed, or failed.""" 

1185 status = job.status 

1186 conditions = status.conditions or [] 

1187 

1188 for condition in conditions: 

1189 if condition.type == "Complete" and condition.status == "True": 

1190 return "completed" 

1191 if condition.type == "Failed" and condition.status == "True": 1191 ↛ 1188line 1191 didn't jump to line 1188 because the condition on line 1191 was always true

1192 return "failed" 

1193 

1194 if (status.active or 0) > 0: 

1195 return "running" 

1196 

1197 return "pending" 

1198 

1199 async def get_resource_status( 

1200 self, api_version: str, kind: str, name: str, namespace: str 

1201 ) -> dict[str, Any] | None: 

1202 """ 

1203 Get the status of a specific resource 

1204 """ 

1205 try: 

1206 resource = await self._get_existing_resource(api_version, kind, name, namespace) 

1207 if resource: 

1208 return { 

1209 "api_version": api_version, 

1210 "kind": kind, 

1211 "name": name, 

1212 "namespace": namespace, 

1213 "exists": True, 

1214 "status": resource.get("status", {}), 

1215 "metadata": resource.get("metadata", {}), 

1216 "spec": resource.get("spec", {}), 

1217 } 

1218 return { 

1219 "api_version": api_version, 

1220 "kind": kind, 

1221 "name": name, 

1222 "namespace": namespace, 

1223 "exists": False, 

1224 } 

1225 except Exception as e: 

1226 logger.error(f"Error getting resource status: {e}") 

1227 return None 

1228 

1229 

def create_manifest_processor_from_env() -> ManifestProcessor:
    """
    Build a ManifestProcessor configured from environment variables.

    Reads CLUSTER_NAME/REGION for cluster identity, enables structured JSON
    logging for CloudWatch Insights, then assembles validation limits from
    MAX_CPU_PER_MANIFEST, MAX_MEMORY_PER_MANIFEST, MAX_GPU_PER_MANIFEST,
    ALLOWED_NAMESPACES, and VALIDATION_ENABLED (see module docstring).
    """
    cluster_id = os.getenv("CLUSTER_NAME", "unknown-cluster")
    region = os.getenv("REGION", "unknown-region")

    # Structured logging must be configured before the processor starts
    # emitting log lines.
    configure_structured_logging(
        service_name="manifest-processor",
        cluster_id=cluster_id,
        region=region,
    )

    settings = {
        "max_cpu_per_manifest": os.getenv("MAX_CPU_PER_MANIFEST", "10"),
        "max_memory_per_manifest": os.getenv("MAX_MEMORY_PER_MANIFEST", "32Gi"),
        "max_gpu_per_manifest": int(os.getenv("MAX_GPU_PER_MANIFEST", "4")),
        "allowed_namespaces": os.getenv("ALLOWED_NAMESPACES", "default,gco-jobs").split(","),
        "validation_enabled": os.getenv("VALIDATION_ENABLED", "true").lower() == "true",
    }

    return ManifestProcessor(cluster_id, region, settings)