Coverage for gco/services/manifest_processor.py: 97%
526 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
"""
Manifest Processor Service for GCO (Global Capacity Orchestrator on AWS).

This service processes Kubernetes manifest submissions, validates them against
security and resource constraints, and applies them to the cluster.

Key Features:
- Validates manifests for required fields and structure
- Rejects YAML aliases (safe_load_yaml / safe_load_all_yaml) and manifests
  nested deeper than the configured maximum depth
- Enforces namespace restrictions (only allowed namespaces)
- Enforces an allowlist of resource kinds
- Enforces resource limits (CPU, memory, GPU per manifest)
- Validates security context (no privileged containers)
- Validates image sources (trusted registries only)
- Injects security defaults (automountServiceAccountToken: false unless set)
- Supports dry-run mode for validation without applying

Security Validations:
- Namespace must be in allowed list (default: default, gco-jobs)
- No privileged containers or privilege escalation
- No hostNetwork / hostPID / hostIPC, hostPath volumes or added capabilities
  (each check can be toggled via the manifest_security_policy config)
- Images must be from trusted registries
- Resource requests/limits within configured maximums

Environment Variables:
    CLUSTER_NAME: Name of the EKS cluster
    REGION: AWS region of the cluster
    MAX_CPU_PER_MANIFEST: Maximum CPU (millicores) per manifest (default: 10000)
    MAX_MEMORY_PER_MANIFEST: Maximum memory per manifest (default: 32Gi)
    MAX_GPU_PER_MANIFEST: Maximum GPUs per manifest (default: 4)
    ALLOWED_NAMESPACES: Comma-separated list of allowed namespaces
    VALIDATION_ENABLED: Enable/disable validation (default: true)
    K8S_API_TIMEOUT: Timeout in seconds for Kubernetes API calls (default: 30)

Usage:
    processor = create_manifest_processor_from_env()
    response = await processor.process_manifest_submission(request)
"""
35from __future__ import annotations
37import logging
38import os
39from typing import Any, cast
41import yaml
42from kubernetes import client, config, dynamic
43from kubernetes.client.models import V1Job
44from kubernetes.client.rest import ApiException
45from kubernetes.dynamic.exceptions import ResourceNotFoundError
47from gco.models import (
48 ManifestSubmissionRequest,
49 ManifestSubmissionResponse,
50 ResourceStatus,
51)
52from gco.services.structured_logging import configure_structured_logging, sanitize_log_value
# NOTE: No logging.basicConfig() here. This module is imported by the CLI
# (cli/jobs.py, cli/commands/*_cmd.py) as a library for YAML loading helpers.
# Calling basicConfig() at import time would configure the root logger with
# INFO-level output, causing noisy botocore/urllib3 INFO messages on every
# CLI command. Container entry points (manifest_api.py) do their own
# basicConfig() call.
# Module-level logger; handler/level configuration is left to the importer.
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# YAML Alias Rejection Loader
# ---------------------------------------------------------------------------


class NoAliasSafeLoader(yaml.SafeLoader):
    """SafeLoader variant that refuses to dereference YAML aliases.

    Anchors (``&name``) may still be declared, but any alias (``*name``)
    raises :class:`yaml.composer.ComposerError` at the composition stage.
    Every reuse of a node must go through an alias, so this blocks
    exponential-expansion ("billion laughs") payloads while parsing.
    """

    def compose_node(self, parent: Any, index: Any) -> Any:
        # Non-alias events are composed exactly as SafeLoader would.
        if not self.check_event(yaml.AliasEvent):  # type: ignore[no-untyped-call]
            return super().compose_node(parent, index)

        alias = self.get_event()  # type: ignore[no-untyped-call]
        problem = (
            "YAML aliases are not allowed "
            "(security policy: yaml_allow_aliases=false), "
            f"found alias *{alias.anchor}"
        )
        raise yaml.composer.ComposerError(None, None, problem, alias.start_mark)
def safe_load_yaml(stream: str | Any, *, allow_aliases: bool = False) -> Any:
    """Parse exactly one YAML document with a safe loader.

    Args:
        stream: YAML text or a file-like object to read from.
        allow_aliases: When False (the default), any YAML alias aborts parsing.

    Returns:
        The parsed document.

    Raises:
        yaml.YAMLError: On malformed YAML, or on an alias while
            ``allow_aliases`` is False.
    """
    if allow_aliases:
        loader_cls: type[yaml.SafeLoader] = yaml.SafeLoader
    else:
        loader_cls = NoAliasSafeLoader
    # Both candidate loaders derive from SafeLoader, so this behaves like
    # yaml.safe_load; Bandit's B506 cannot see that through the variable.
    return yaml.load(stream, Loader=loader_cls)  # nosec B506
def safe_load_all_yaml(stream: str | Any, *, allow_aliases: bool = False) -> list[Any]:
    """Load all YAML documents from a stream with optional alias rejection.

    Args:
        stream: YAML string or file-like object.
        allow_aliases: If False (default), reject YAML anchors/aliases.

    Returns:
        List of parsed YAML documents (``None`` documents are skipped).

    Raises:
        yaml.YAMLError: If any document is invalid or contains aliases
            when ``allow_aliases`` is False.
    """
    loader_cls = yaml.SafeLoader if allow_aliases else NoAliasSafeLoader
    # Loader is always a SafeLoader subclass, so this is equivalent to
    # yaml.safe_load_all. Bandit's B506 check does not recognize the custom
    # loader as safe; the suppression sits on the yaml.load_all call line
    # itself (previously it was on the filter line, not the flagged call).
    documents = yaml.load_all(stream, Loader=loader_cls)  # nosec B506
    return [doc for doc in documents if doc is not None]
137class ManifestProcessor:
138 """
139 Processes Kubernetes manifest submissions and applies them to the cluster
140 """
142 def __init__(self, cluster_id: str, region: str, config_dict: dict[str, Any]):
143 self.cluster_id = cluster_id
144 self.region = region
145 self.config = config_dict
147 # Initialize Kubernetes clients
148 try:
149 # Try to load in-cluster config first (when running in pod)
150 config.load_incluster_config()
151 logger.info("Loaded in-cluster Kubernetes configuration")
152 except config.ConfigException:
153 try:
154 # Fall back to local kubeconfig (for development)
155 config.load_kube_config()
156 logger.info("Loaded local Kubernetes configuration")
157 except config.ConfigException as e:
158 logger.error(f"Failed to load Kubernetes configuration: {e}")
159 raise
161 # Initialize API clients
162 self.api_client = client.ApiClient()
163 self.api_client.configuration.request_timeout = int(os.environ.get("K8S_API_TIMEOUT", "30"))
164 self.core_v1 = client.CoreV1Api()
165 self.apps_v1 = client.AppsV1Api()
166 self.batch_v1 = client.BatchV1Api()
167 self.networking_v1 = client.NetworkingV1Api()
168 self.custom_objects = client.CustomObjectsApi()
170 # Dynamic client for CRDs - lazy initialized to avoid cluster connection during init
171 self._dynamic_client: dynamic.DynamicClient | None = None
173 # Timeout for Kubernetes API calls (seconds)
174 self._k8s_timeout = int(os.environ.get("K8S_API_TIMEOUT", "30"))
176 # Resource quotas and limits
177 self.max_cpu_per_manifest = self._parse_cpu_string(
178 config_dict.get("max_cpu_per_manifest", "10")
179 )
180 self.max_memory_per_manifest = self._parse_memory_string(
181 config_dict.get("max_memory_per_manifest", "32Gi")
182 )
183 self.max_gpu_per_manifest = int(config_dict.get("max_gpu_per_manifest", 4))
184 self.allowed_namespaces = set(
185 config_dict.get("allowed_namespaces", ["default", "gco-jobs"])
186 )
187 self.validation_enabled = config_dict.get("validation_enabled", True)
189 # Trusted registries for image validation (configurable via cdk.json)
190 self.trusted_registries = config_dict.get(
191 "trusted_registries",
192 [
193 "docker.io",
194 "gcr.io",
195 "quay.io",
196 "registry.k8s.io",
197 "k8s.gcr.io",
198 "public.ecr.aws",
199 "nvcr.io",
200 ],
201 )
202 self.trusted_dockerhub_orgs = config_dict.get(
203 "trusted_dockerhub_orgs",
204 [
205 "nvidia",
206 "pytorch",
207 "rayproject",
208 "tensorflow",
209 "huggingface",
210 "amazon",
211 "bitnami",
212 "gco",
213 ],
214 )
216 # Warn about trusted_registries entries that look like Docker Hub orgs (no dot or colon)
217 for registry in self.trusted_registries:
218 if not self._is_registry_domain(registry): 218 ↛ 219line 218 didn't jump to line 219 because the condition on line 218 was never true
219 logger.warning(
220 f"Trusted registry '{registry}' has no domain separator (dot or colon) — "
221 f"consider moving it to trusted_dockerhub_orgs instead"
222 )
224 # YAML parsing limits (configurable via cdk.json)
225 self.yaml_max_depth = int(config_dict.get("yaml_max_depth", 50))
227 # Allowed resource kinds (configurable via cdk.json)
228 self.allowed_kinds = set(
229 config_dict.get(
230 "allowed_kinds",
231 [
232 "Job",
233 "CronJob",
234 "Deployment",
235 "StatefulSet",
236 "DaemonSet",
237 "Service",
238 "ConfigMap",
239 "Pod",
240 ],
241 )
242 )
244 # Security policy — toggleable checks (configurable via cdk.json)
245 security_policy = config_dict.get("manifest_security_policy", {})
246 self.block_privileged = security_policy.get("block_privileged", True)
247 self.block_privilege_escalation = security_policy.get("block_privilege_escalation", True)
248 self.block_host_network = security_policy.get("block_host_network", True)
249 self.block_host_pid = security_policy.get("block_host_pid", True)
250 self.block_host_ipc = security_policy.get("block_host_ipc", True)
251 self.block_host_path = security_policy.get("block_host_path", True)
252 self.block_added_capabilities = security_policy.get("block_added_capabilities", True)
253 self.block_run_as_root = security_policy.get("block_run_as_root", False)
255 # ------------------------------------------------------------------
256 # Security defaults injection
257 # ------------------------------------------------------------------
259 @staticmethod
260 def _extract_pod_spec(manifest: dict[str, Any]) -> dict[str, Any] | None:
261 """Extract the pod spec from a manifest, handling all workload types.
263 Supports:
264 - Deployment / StatefulSet / DaemonSet / ReplicaSet → spec.template.spec
265 - Job → spec.template.spec
266 - CronJob → spec.jobTemplate.spec.template.spec
267 - Bare Pod → spec (when ``containers`` key is present)
269 Returns:
270 The pod spec dict (mutable reference), or ``None`` if the manifest
271 does not contain a recognisable pod spec.
272 """
273 spec = manifest.get("spec")
274 if spec is None or not isinstance(spec, dict):
275 return None
277 kind = manifest.get("kind", "")
279 # CronJob: spec.jobTemplate.spec.template.spec
280 if kind == "CronJob":
281 job_template = spec.get("jobTemplate")
282 if isinstance(job_template, dict): 282 ↛ 290line 282 didn't jump to line 290 because the condition on line 282 was always true
283 job_spec = job_template.get("spec")
284 if isinstance(job_spec, dict): 284 ↛ 290line 284 didn't jump to line 290 because the condition on line 284 was always true
285 template = job_spec.get("template")
286 if isinstance(template, dict): 286 ↛ 290line 286 didn't jump to line 290 because the condition on line 286 was always true
287 pod_spec = template.get("spec")
288 if isinstance(pod_spec, dict): 288 ↛ 290line 288 didn't jump to line 290 because the condition on line 288 was always true
289 return pod_spec
290 return None
292 # Deployment / StatefulSet / DaemonSet / ReplicaSet / Job:
293 # spec.template.spec
294 if "template" in spec:
295 template = spec.get("template")
296 if isinstance(template, dict): 296 ↛ 300line 296 didn't jump to line 300 because the condition on line 296 was always true
297 pod_spec = template.get("spec")
298 if isinstance(pod_spec, dict): 298 ↛ 300line 298 didn't jump to line 300 because the condition on line 298 was always true
299 return pod_spec
300 return None
302 # Bare Pod: spec contains "containers" directly
303 if "containers" in spec:
304 return cast(dict[str, Any], spec)
306 return None
308 def _inject_security_defaults(self, manifest: dict[str, Any]) -> dict[str, Any]:
309 """Inject security defaults into user-submitted manifests.
311 Currently injects:
312 - ``automountServiceAccountToken: false`` in the pod spec (unless the
313 user has explicitly set it).
315 The method mutates *manifest* in-place and returns it for convenience.
316 """
317 pod_spec = self._extract_pod_spec(manifest)
318 if pod_spec is not None:
319 # Use setdefault so we don't override an explicit user choice
320 pod_spec.setdefault("automountServiceAccountToken", False)
321 return manifest
323 @property
324 def dynamic_client(self) -> dynamic.DynamicClient:
325 """Lazy-initialized dynamic client for CRD support."""
326 if self._dynamic_client is None:
327 self._dynamic_client = dynamic.DynamicClient(self.api_client)
328 return self._dynamic_client
330 def _parse_cpu_string(self, cpu_str: str) -> int:
331 """Parse CPU string to millicores"""
332 if not cpu_str:
333 return 0
335 cpu_str = cpu_str.strip()
336 if cpu_str.endswith("m"):
337 return int(cpu_str[:-1])
338 return int(cpu_str) * 1000
340 def _parse_memory_string(self, memory_str: str) -> int:
341 """Parse memory string to bytes"""
342 if not memory_str:
343 return 0
345 memory_str = memory_str.strip()
347 if memory_str.endswith("Ki"):
348 return int(memory_str[:-2]) * 1024
349 if memory_str.endswith("Mi"):
350 return int(memory_str[:-2]) * 1024 * 1024
351 if memory_str.endswith("Gi"):
352 return int(memory_str[:-2]) * 1024 * 1024 * 1024
353 if memory_str.endswith("Ti"):
354 return int(memory_str[:-2]) * 1024 * 1024 * 1024 * 1024
355 if memory_str.endswith("k"):
356 return int(memory_str[:-1]) * 1000
357 if memory_str.endswith("M"):
358 return int(memory_str[:-1]) * 1000 * 1000
359 if memory_str.endswith("G"):
360 return int(memory_str[:-1]) * 1000 * 1000 * 1000
361 return int(memory_str)
363 def _check_yaml_depth(self, obj: Any, current_depth: int = 0) -> bool:
364 """Check if a parsed YAML/JSON object exceeds max nesting depth.
366 Recursively walks dicts and lists. Returns False if depth exceeds
367 ``self.yaml_max_depth``.
369 Args:
370 obj: The parsed object to check (dict, list, or scalar).
371 current_depth: Current recursion depth (callers should leave at 0).
373 Returns:
374 True if the object is within the depth limit, False otherwise.
375 """
376 if current_depth > self.yaml_max_depth:
377 return False
378 if isinstance(obj, dict):
379 return all(self._check_yaml_depth(v, current_depth + 1) for v in obj.values())
380 if isinstance(obj, list):
381 return all(self._check_yaml_depth(item, current_depth + 1) for item in obj)
382 return True
384 def validate_manifest(self, manifest: dict[str, Any]) -> tuple[bool, str | None]:
385 """
386 Validate a Kubernetes manifest for security and resource constraints
387 Returns: (is_valid, error_message)
388 """
389 if not self.validation_enabled:
390 return True, None
392 try:
393 # YAML depth check — reject excessively nested documents
394 if not self._check_yaml_depth(manifest):
395 return (
396 False,
397 f"Manifest exceeds maximum nesting depth of {self.yaml_max_depth} levels",
398 )
400 # Basic structure validation
401 required_fields = ["apiVersion", "kind", "metadata"]
402 for field in required_fields:
403 if field not in manifest:
404 return False, f"Missing required field: {field}"
406 # Validate metadata
407 metadata = manifest.get("metadata", {})
408 if "name" not in metadata:
409 return False, "Missing metadata.name field"
411 # Validate namespace
412 namespace = metadata.get("namespace", "default")
413 if namespace not in self.allowed_namespaces:
414 return (
415 False,
416 f"Namespace '{namespace}' not allowed. Allowed namespaces: {list(self.allowed_namespaces)}",
417 )
419 # Validate resource kind
420 kind = manifest.get("kind", "")
421 if kind not in self.allowed_kinds:
422 return (
423 False,
424 f"Resource kind '{kind}' is not allowed. Allowed kinds: {sorted(self.allowed_kinds)}",
425 )
427 # Validate resource limits for workload resources
428 if kind in [
429 "Deployment",
430 "Job",
431 "CronJob",
432 "StatefulSet",
433 "DaemonSet",
434 ]:
435 resource_valid, resource_error = self._validate_resource_limits(manifest)
436 if not resource_valid:
437 return False, resource_error
439 # Security validations
440 sec_valid, sec_error = self._validate_security_context(manifest)
441 if not sec_valid:
442 return False, f"Security context validation failed: {sec_error}"
444 # Validate image sources (prevent pulling from untrusted registries)
445 img_valid, img_error = self._validate_image_sources(manifest)
446 if not img_valid:
447 return False, img_error or "Untrusted image sources detected"
449 return True, None
451 except Exception as e:
452 logger.error(f"Error validating manifest: {e}")
453 return False, f"Validation error: {e!s}"
455 def _validate_resource_limits(self, manifest: dict[str, Any]) -> tuple[bool, str]:
456 """Validate resource limits in manifest.
458 Returns:
459 Tuple of (is_valid, error_message). error_message is empty if valid.
460 """
461 try:
462 errors: list[str] = []
463 spec = manifest.get("spec", {})
465 # Get pod spec (handle different resource types)
466 pod_spec = {}
467 if "template" in spec: # Deployment, StatefulSet, etc.
468 pod_spec = spec.get("template", {}).get("spec", {})
469 elif "jobTemplate" in spec: # CronJob 469 ↛ 473line 469 didn't jump to line 473 because the condition on line 469 was always true
470 pod_spec = (
471 spec.get("jobTemplate", {}).get("spec", {}).get("template", {}).get("spec", {})
472 )
473 elif "containers" in spec: # Pod
474 pod_spec = spec
476 total_cpu = 0
477 total_memory = 0
478 total_gpu = 0
480 for _container_type, container in self._get_all_containers(pod_spec):
481 resources = container.get("resources", {})
482 requests = resources.get("requests", {})
483 limits = resources.get("limits", {})
485 # Check CPU (use limits if available, otherwise requests)
486 cpu = limits.get("cpu") or requests.get( # nosec B113 - dict.get(), not HTTP requests
487 "cpu", "0"
488 )
489 total_cpu += self._parse_cpu_string(cpu)
491 # Check Memory
492 memory = limits.get("memory") or requests.get( # nosec B113 - dict.get(), not HTTP requests
493 "memory", "0"
494 )
495 total_memory += self._parse_memory_string(memory)
497 # Check GPU
498 gpu = limits.get("nvidia.com/gpu") or requests.get( # nosec B113 - dict.get(), not HTTP requests
499 "nvidia.com/gpu", "0"
500 )
501 total_gpu += int(gpu)
503 # Validate against limits
504 if total_cpu > self.max_cpu_per_manifest:
505 logger.warning(f"CPU limit exceeded: {total_cpu}m > {self.max_cpu_per_manifest}m")
506 errors.append(f"CPU {total_cpu}m exceeds max {self.max_cpu_per_manifest}m")
508 if total_memory > self.max_memory_per_manifest:
509 logger.warning(
510 f"Memory limit exceeded: {total_memory} > {self.max_memory_per_manifest}"
511 )
512 mem_gb = self.max_memory_per_manifest / (1024**3)
513 req_gb = total_memory / (1024**3)
514 errors.append(f"Memory {req_gb:.0f}Gi exceeds max {mem_gb:.0f}Gi")
516 if total_gpu > self.max_gpu_per_manifest:
517 logger.warning(f"GPU limit exceeded: {total_gpu} > {self.max_gpu_per_manifest}")
518 errors.append(f"GPU {total_gpu} exceeds max {self.max_gpu_per_manifest}")
520 if errors:
521 hint = (
522 "To raise limits, update resource_quotas in cdk.json "
523 "and redeploy (see examples/README.md#troubleshooting)"
524 )
525 return False, "; ".join(errors) + f". {hint}"
527 return True, ""
529 except Exception as e:
530 logger.error(f"Error validating resource limits: {e}")
531 return False, f"Resource limit validation error: {e}"
533 def _get_all_containers(self, pod_spec: dict[str, Any]) -> list[tuple[str, dict[str, Any]]]:
534 """Get all containers from pod spec including init and ephemeral containers.
536 Returns:
537 List of (container_type, container_dict) tuples where container_type
538 is one of 'container', 'initContainer', or 'ephemeralContainer'.
539 """
540 result = []
541 for c in pod_spec.get("containers", []):
542 result.append(("container", c))
543 for c in pod_spec.get("initContainers", []):
544 result.append(("initContainer", c))
545 for c in pod_spec.get("ephemeralContainers", []):
546 result.append(("ephemeralContainer", c))
547 return result
549 def _validate_security_context(self, manifest: dict[str, Any]) -> tuple[bool, str | None]:
550 """Validate security context settings.
552 Returns:
553 Tuple of (is_valid, error_message). error_message is None if valid.
554 """
555 try:
556 # Basic security checks - prevent privileged containers
557 spec = manifest.get("spec", {})
559 # Get pod spec (handle different resource types)
560 pod_spec = None
561 if "template" in spec:
562 pod_spec = spec.get("template", {}).get("spec", {})
563 elif "jobTemplate" in spec:
564 pod_spec = (
565 spec.get("jobTemplate", {}).get("spec", {}).get("template", {}).get("spec", {})
566 )
567 elif "containers" in spec:
568 pod_spec = spec
570 if pod_spec:
571 # --- Pod-level checks ---
572 if self.block_host_network and pod_spec.get("hostNetwork", False):
573 return False, "hostNetwork is not permitted"
575 if self.block_host_pid and pod_spec.get("hostPID", False):
576 return False, "hostPID is not permitted"
578 if self.block_host_ipc and pod_spec.get("hostIPC", False):
579 return False, "hostIPC is not permitted"
581 # Check volumes for hostPath
582 if self.block_host_path:
583 for volume in pod_spec.get("volumes", []):
584 if volume.get("hostPath") is not None:
585 return False, "hostPath volumes are not permitted"
587 # Check pod security context
588 security_context = pod_spec.get("securityContext", {})
589 if self.block_privileged and security_context.get("privileged", False):
590 return False, "privileged pod security context is not permitted"
592 if self.block_run_as_root:
593 run_as_user = security_context.get("runAsUser")
594 if run_as_user is not None and run_as_user == 0:
595 return False, "running as root (runAsUser: 0) is not permitted"
597 # --- Container-level checks ---
598 for container_type, container in self._get_all_containers(pod_spec):
599 container_name = container.get("name", "unknown")
600 container_security = container.get("securityContext", {})
601 if self.block_privileged and container_security.get("privileged", False):
602 return (
603 False,
604 f"{container_type} '{container_name}': privileged containers are not permitted",
605 )
606 if self.block_privilege_escalation and container_security.get(
607 "allowPrivilegeEscalation", False
608 ):
609 return (
610 False,
611 f"{container_type} '{container_name}': allowPrivilegeEscalation is not permitted",
612 )
614 # Check for added capabilities
615 if self.block_added_capabilities:
616 added_caps = container_security.get("capabilities", {}).get("add", [])
617 if added_caps:
618 return (
619 False,
620 f"{container_type} '{container_name}': added capabilities are not permitted",
621 )
623 # Check for runAsUser: 0 (root) — off by default
624 if self.block_run_as_root:
625 run_as_user = container_security.get("runAsUser")
626 if run_as_user is not None and run_as_user == 0:
627 return (
628 False,
629 f"{container_type} '{container_name}': running as root (runAsUser: 0) is not permitted",
630 )
632 return True, None
634 except Exception as e:
635 logger.error(f"Error validating security context: {e}")
636 return False, f"Security context error: {e}"
638 @staticmethod
639 def _is_registry_domain(entry: str) -> bool:
640 """Check if a registry entry is a proper domain (contains dot or colon).
642 A proper registry domain contains either a dot (e.g., 'docker.io', 'gcr.io')
643 or a colon (e.g., 'localhost:5000'). Entries without these are Docker Hub
644 organization names (e.g., 'nvidia', 'gco').
645 """
646 return "." in entry or ":" in entry
648 def _validate_image_sources(self, manifest: dict[str, Any]) -> tuple[bool, str | None]:
649 """Validate container image sources.
651 Uses proper domain matching instead of prefix matching to prevent
652 dependency confusion attacks (e.g., 'gco-malicious/evil' should NOT
653 match a trusted registry entry 'gco').
655 Matching logic:
656 1. If image has no '/' → official Docker Hub image (always allowed)
657 2. If image has '/' and part before first '/' contains a dot or colon
658 → it's a registry domain → match against trusted_registries
659 3. If image has '/' but first segment has no dot/colon
660 → it's a Docker Hub org → match against trusted_dockerhub_orgs
661 4. Digest references (@sha256:) are accepted from any trusted source
663 Returns:
664 Tuple of (is_valid, error_message). error_message is None if valid.
665 """
666 try:
667 trusted_registries = self.trusted_registries
668 trusted_dockerhub_orgs = self.trusted_dockerhub_orgs
670 spec = manifest.get("spec", {})
672 # Get pod spec (handle different resource types)
673 pod_spec = {}
674 if "template" in spec:
675 pod_spec = spec.get("template", {}).get("spec", {})
676 elif "jobTemplate" in spec:
677 pod_spec = (
678 spec.get("jobTemplate", {}).get("spec", {}).get("template", {}).get("spec", {})
679 )
680 elif "containers" in spec:
681 pod_spec = spec
683 for container_type, container in self._get_all_containers(pod_spec):
684 image = container.get("image", "")
685 if not image:
686 continue
688 is_trusted = False
690 # Case 1: Official Docker Hub image (no slash) — e.g., "python:3.14", "busybox"
691 if "/" not in image:
692 is_trusted = True
693 else:
694 # Image has a slash — determine if first segment is a domain or org
695 first_segment = image.split("/")[0]
697 if self._is_registry_domain(first_segment):
698 # Case 2: First segment looks like a domain (has dot or colon)
699 # Match against trusted_registries as exact domain match
700 for registry in trusted_registries:
701 if first_segment == registry:
702 is_trusted = True
703 break
704 # Also support multi-level registry paths like "public.ecr.aws"
705 # where the image might be "public.ecr.aws/lambda/python:3.14"
706 if image.startswith(registry + "/"): 706 ↛ 707line 706 didn't jump to line 707 because the condition on line 706 was never true
707 is_trusted = True
708 break
709 else:
710 # Case 3: First segment has no dot/colon — it's a Docker Hub org
711 # Match against trusted_dockerhub_orgs
712 if first_segment in trusted_dockerhub_orgs:
713 is_trusted = True
715 if not is_trusted:
716 container_name = container.get("name", "unknown")
717 logger.warning(f"Untrusted image source: {image}")
718 return (
719 False,
720 f"{container_type} '{container_name}': Untrusted image source '{image}'",
721 )
723 return True, None
725 except Exception as e:
726 logger.error(f"Error validating image sources: {e}")
727 return False, f"Image source validation error: {e}"
729 async def process_manifest_submission(
730 self, request: ManifestSubmissionRequest
731 ) -> ManifestSubmissionResponse:
732 """
733 Process a manifest submission request
734 """
735 logger.info(f"Processing manifest submission with {len(request.manifests)} manifests")
737 resources = []
738 errors = []
739 overall_success = True
741 try:
742 # Process each manifest
743 for i, manifest_data in enumerate(request.manifests):
744 try:
745 # Validate manifest
746 is_valid, error_msg = self.validate_manifest(manifest_data)
747 if not is_valid:
748 error_msg = f"Manifest {i + 1} validation failed: {error_msg}"
749 errors.append(error_msg)
750 logger.error(error_msg)
752 # Create failed resource status
753 resource_status = ResourceStatus(
754 api_version=manifest_data.get("apiVersion", "unknown"),
755 kind=manifest_data.get("kind", "unknown"),
756 name=manifest_data.get("metadata", {}).get("name", f"manifest-{i + 1}"),
757 namespace=manifest_data.get("metadata", {}).get("namespace", "default"),
758 status="failed",
759 message=error_msg,
760 )
761 resources.append(resource_status)
762 overall_success = False
763 continue
765 # Apply manifest if validation passed
766 if not request.dry_run:
767 resource_status = await self._apply_manifest(
768 manifest_data, request.namespace
769 )
770 resources.append(resource_status)
772 if not resource_status.is_successful():
773 overall_success = False
774 else:
775 # Dry run - just validate
776 resource_status = ResourceStatus(
777 api_version=manifest_data.get("apiVersion", "unknown"),
778 kind=manifest_data.get("kind", "unknown"),
779 name=manifest_data.get("metadata", {}).get("name", "unknown"),
780 namespace=manifest_data.get("metadata", {}).get(
781 "namespace", request.namespace or "default"
782 ),
783 status="unchanged",
784 message="Dry run - validation passed",
785 )
786 resources.append(resource_status)
788 except Exception as e:
789 error_msg = f"Error processing manifest {i + 1}: {e!s}"
790 errors.append(error_msg)
791 logger.error(error_msg)
792 overall_success = False
794 # Create failed resource status
795 resource_status = ResourceStatus(
796 api_version=manifest_data.get("apiVersion", "unknown"),
797 kind=manifest_data.get("kind", "unknown"),
798 name=manifest_data.get("metadata", {}).get("name", f"manifest-{i + 1}"),
799 namespace=manifest_data.get("metadata", {}).get("namespace", "default"),
800 status="failed",
801 message=str(e),
802 )
803 resources.append(resource_status)
805 except Exception as e:
806 error_msg = f"Fatal error processing manifest submission: {e!s}"
807 errors.append(error_msg)
808 logger.error(error_msg)
809 overall_success = False
811 response = ManifestSubmissionResponse(
812 success=overall_success,
813 cluster_id=self.cluster_id,
814 region=self.region,
815 resources=resources,
816 errors=errors if errors else None,
817 )
819 logger.info(
820 f"Manifest submission completed - Success: {overall_success}, "
821 f"Resources: {len(resources)}, Errors: {len(errors)}"
822 )
824 return response
826 async def _apply_manifest(
827 self, manifest_data: dict[str, Any], default_namespace: str | None = None
828 ) -> ResourceStatus:
829 """
830 Apply a single manifest to the cluster.
832 For Jobs and CronJobs, if the resource already exists and is completed/failed,
833 it will be automatically deleted and recreated (since these resources are immutable).
834 """
835 try:
836 api_version: str = manifest_data.get("apiVersion", "unknown")
837 kind: str = manifest_data.get("kind", "unknown")
838 metadata = manifest_data.get("metadata", {})
839 name: str = metadata.get("name", "unknown")
840 namespace: str = metadata.get("namespace", default_namespace or "default")
842 # Ensure namespace is set in manifest
843 if "namespace" not in metadata and namespace:
844 manifest_data["metadata"]["namespace"] = namespace
846 # Inject security defaults (e.g., automountServiceAccountToken: false)
847 self._inject_security_defaults(manifest_data)
849 # Check if resource already exists
850 existing_resource = await self._get_existing_resource(
851 api_version, kind, name, namespace
852 )
854 if existing_resource:
855 # Jobs are immutable — if one already exists and is finished,
856 # delete it first so we can recreate cleanly.
857 # If the job is still active, auto-rename to avoid collision.
858 if kind == "Job":
859 if self._is_job_finished(existing_resource):
860 logger.info(
861 f"Job {name} already exists and is finished, deleting before recreating"
862 )
863 await self.delete_resource(api_version, kind, name, namespace)
864 import asyncio
866 await asyncio.sleep(1)
867 await self._create_resource(manifest_data)
868 status = "created"
869 message = "Previous completed job replaced with new submission"
870 else:
871 # Active job — rename to avoid destroying it
872 import uuid
874 suffix = uuid.uuid4().hex[:5]
875 new_name = f"{name}-{suffix}"
876 manifest_data["metadata"]["name"] = new_name
877 logger.warning(
878 f"Job {name} is still active, renamed new submission to {new_name}"
879 )
880 await self._create_resource(manifest_data)
881 status = "created"
882 message = (
883 f"Job '{name}' is still running. "
884 f"New submission renamed to '{new_name}'."
885 )
886 name = new_name
887 else:
888 # Update existing resource (works for mutable resources)
889 updated_resource = await self._update_resource(manifest_data)
890 status = "updated" if updated_resource else "unchanged"
891 message = (
892 "Resource updated successfully" if updated_resource else "No changes needed"
893 )
894 else:
895 # Create new resource
896 await self._create_resource(manifest_data)
897 status = "created"
898 message = "Resource created successfully"
900 return ResourceStatus(
901 api_version=api_version,
902 kind=kind,
903 name=name,
904 namespace=namespace,
905 status=status,
906 message=message,
907 )
909 except ApiException as e:
910 logger.error(f"Kubernetes API error applying manifest: {e}")
911 return ResourceStatus(
912 api_version=manifest_data.get("apiVersion", "unknown"),
913 kind=manifest_data.get("kind", "unknown"),
914 name=manifest_data.get("metadata", {}).get("name", "unknown"),
915 namespace=manifest_data.get("metadata", {}).get("namespace", "default"),
916 status="failed",
917 message=f"API error: {e.reason}",
918 )
919 except Exception as e:
920 logger.error(f"Error applying manifest: {e}")
921 return ResourceStatus(
922 api_version=manifest_data.get("apiVersion", "unknown"),
923 kind=manifest_data.get("kind", "unknown"),
924 name=manifest_data.get("metadata", {}).get("name", "unknown"),
925 namespace=manifest_data.get("metadata", {}).get("namespace", "default"),
926 status="failed",
927 message=str(e),
928 )
def _is_job_finished(self, job_resource: dict[str, Any]) -> bool:
    """Return True if a Job resource has reached a terminal state.

    A Job counts as finished when it carries a ``Complete`` or ``Failed``
    condition whose ``status`` is the string ``"True"``.

    Args:
        job_resource: The Job as a plain dict, as produced by
            ``_get_existing_resource``.

    Returns:
        True for a terminal (Complete/Failed) Job. Otherwise False, including
        when the ``status`` block or its ``conditions`` list is missing or null.
    """
    # ``or {}`` / ``or []`` also cover keys that are present but null, which
    # ``dict.get(key, default)`` alone would not.
    status = job_resource.get("status") or {}
    conditions = status.get("conditions") or []
    return any(
        condition.get("type") in ("Complete", "Failed")
        and condition.get("status") == "True"
        for condition in conditions
    )
async def _get_existing_resource(
    self, api_version: str, kind: str, name: str, namespace: str
) -> dict[str, Any] | None:
    """Fetch a resource through the dynamic client as a plain dict.

    Returns ``None`` in three cases: the API answers 404, the apiVersion/kind
    pair is unknown (``_get_api_resource`` raises ``ValueError``), or the
    lookup itself yields ``None``. Any other ``ApiException`` is re-raised.
    """
    try:
        resource_api = self._get_api_resource(api_version, kind)
        lookup: dict[str, Any] = {"name": name}
        # Only scope the lookup to a namespace for namespaced kinds.
        if namespace and resource_api.namespaced:
            lookup["namespace"] = namespace
        found = resource_api.get(**lookup)
        return None if found is None else dict(found.to_dict())
    except ApiException as exc:
        if exc.status == 404:
            return None  # Resource doesn't exist
        raise
    except ValueError:
        # Unknown resource type
        return None
def _get_api_resource(self, api_version: str, kind: str) -> Any:
    """Resolve the dynamic-client resource handle for an apiVersion/kind pair.

    Raises:
        ValueError: if discovery does not know the requested resource type
            (translated from ``ResourceNotFoundError``, after logging it).
    """
    try:
        registry = self.dynamic_client.resources
        return registry.get(api_version=api_version, kind=kind)
    except ResourceNotFoundError as err:
        # Sanitize caller-supplied values before they reach the log stream.
        logger.error(
            "Resource type not found: %s/%s",
            sanitize_log_value(api_version),
            sanitize_log_value(kind),
        )
        raise ValueError(f"Unknown resource type: {api_version}/{kind}") from err
async def _create_resource(self, manifest_data: dict[str, Any]) -> bool:
    """Create a resource from a manifest dict through the dynamic client.

    For namespaced kinds the object is created in ``metadata.namespace`` when
    one is set. Otherwise no namespace argument is passed.

    Returns:
        Always True on success.

    Raises:
        Whatever the lookup or the create call raises. The error is logged and
        re-raised unchanged.
    """
    try:
        version = manifest_data.get("apiVersion", "")
        resource_kind = manifest_data.get("kind", "")
        target_ns = manifest_data.get("metadata", {}).get("namespace")

        resource_api = self._get_api_resource(version, resource_kind)

        create_kwargs: dict[str, Any] = {"body": manifest_data}
        if target_ns and resource_api.namespaced:
            create_kwargs["namespace"] = target_ns
        resource_api.create(**create_kwargs)
    except Exception as e:
        logger.error(f"Error creating resource: {e}")
        raise
    return True
async def _update_resource(self, manifest_data: dict[str, Any]) -> bool:
    """Patch an existing resource in place through the dynamic client.

    The manifest is sent as a JSON merge patch
    (``application/merge-patch+json``), NOT as a server-side apply. Fields in
    the manifest overwrite the live object, ``null`` values remove fields, and
    lists are replaced wholesale rather than merged element by element.

    Args:
        manifest_data: Full manifest dict. ``apiVersion``, ``kind``,
            ``metadata.name`` and (optionally) ``metadata.namespace`` identify
            the target object. The whole dict is the patch body.

    Returns:
        Always True on success. The value does not indicate whether the patch
        actually changed the live object.

    Raises:
        Whatever the lookup or the patch call raises. The error is logged and
        re-raised unchanged.
    """
    try:
        api_version = manifest_data.get("apiVersion", "")
        kind = manifest_data.get("kind", "")
        name = manifest_data.get("metadata", {}).get("name", "")
        namespace = manifest_data.get("metadata", {}).get("namespace")

        api_resource = self._get_api_resource(api_version, kind)

        patch_kwargs: dict[str, Any] = {
            "body": manifest_data,
            "name": name,
            "content_type": "application/merge-patch+json",
        }
        # Only pass a namespace for namespaced kinds.
        if namespace and api_resource.namespaced:
            patch_kwargs["namespace"] = namespace
        api_resource.patch(**patch_kwargs)
    except Exception as e:
        logger.error(f"Error updating resource: {e}")
        raise
    return True
async def delete_resource(
    self,
    api_version: str,
    kind: str,
    name: str,
    namespace: str,
    propagation_policy: str | None = "Background",
) -> ResourceStatus:
    """
    Delete a resource from the cluster using the dynamic client.

    Args:
        api_version: apiVersion of the resource (e.g. ``batch/v1``).
        kind: Kind of the resource (e.g. ``Job``).
        name: Name of the object to delete.
        namespace: Namespace of the object. It is only passed for namespaced
            kinds.
        propagation_policy: Garbage-collection policy sent as the
            ``propagationPolicy`` query parameter. Defaults to ``"Background"``
            because batch/v1 Jobs otherwise orphan their Pods when deleted
            through the API. Pass ``None`` to use the server's per-kind default.

    Returns:
        A ResourceStatus with one of these statuses:
        - ``"deleted"`` on success.
        - ``"unchanged"`` if the object was already gone (HTTP 404).
        - ``"failed"`` for an unknown resource type, any other API error, or
          an unexpected error.
        None of these outcomes is raised to the caller.
    """

    def _result(status: str, message: str) -> ResourceStatus:
        # Every outcome reports the same identity fields; only status/message vary.
        return ResourceStatus(
            api_version=api_version,
            kind=kind,
            name=name,
            namespace=namespace,
            status=status,
            message=message,
        )

    delete_kwargs: dict[str, Any] = {"name": name}
    if propagation_policy is not None:
        # Without this, deleting a batch/v1 Job leaves its Pods behind. The
        # replace-finished-Job flow would then leak Pods on every resubmit.
        delete_kwargs["propagation_policy"] = propagation_policy

    try:
        api_resource = self._get_api_resource(api_version, kind)
        # Only pass a namespace for namespaced kinds.
        if namespace and api_resource.namespaced:
            delete_kwargs["namespace"] = namespace
        api_resource.delete(**delete_kwargs)
    except ValueError as e:
        # Unknown resource type (raised by _get_api_resource).
        return _result("failed", str(e))
    except ApiException as e:
        if e.status == 404:
            return _result("unchanged", "Resource not found (already deleted)")
        logger.error(
            "Kubernetes API error deleting %s %s: %s",
            sanitize_log_value(kind),
            sanitize_log_value(name),
            e.reason,
        )
        return _result("failed", f"Delete failed: {e.reason}")
    except Exception as e:
        logger.error(
            "Error deleting %s %s: %s",
            sanitize_log_value(kind),
            sanitize_log_value(name),
            e,
        )
        return _result("failed", str(e))

    return _result("deleted", "Resource deleted successfully")
async def list_jobs(
    self, namespace: str | None = None, status_filter: str | None = None
) -> list[dict[str, Any]]:
    """
    List Kubernetes Jobs from allowed namespaces.

    Args:
        namespace: Only list Jobs in this namespace, which must be in
            ``allowed_namespaces``. When omitted, every allowed namespace is
            queried.
        status_filter: Keep only Jobs whose ``_get_job_status`` value equals
            this string: "running", "completed", "failed" or "pending".

    Returns:
        Job dictionaries (as built by ``_job_to_dict``), in namespace order.
        A namespace whose listing fails with an ApiException is logged and
        skipped.

    Raises:
        ValueError: if ``namespace`` is given but not allowed.
    """
    # Determine which namespaces to query
    if namespace:
        if namespace not in self.allowed_namespaces:
            raise ValueError(
                f"Namespace '{namespace}' not allowed. "
                f"Allowed namespaces: {list(self.allowed_namespaces)}"
            )
        namespaces_to_query = [namespace]
    else:
        namespaces_to_query = list(self.allowed_namespaces)

    jobs: list[dict[str, Any]] = []
    for ns in namespaces_to_query:
        try:
            job_list = self.batch_v1.list_namespaced_job(
                namespace=ns, _request_timeout=self._k8s_timeout
            )
        except ApiException as e:
            logger.warning(
                "Failed to list jobs in namespace %s: %s", sanitize_log_value(ns), e.reason
            )
            continue

        for job in job_list.items:
            # Filter before converting so dropped Jobs are never serialized.
            if status_filter and self._get_job_status(job) != status_filter:
                continue
            jobs.append(self._job_to_dict(job))

    return jobs
def _job_to_dict(self, job: V1Job) -> dict[str, Any]:
    """Flatten a V1Job into a JSON-friendly dictionary.

    The conversion normalizes these fields:
    - Timestamps become ``isoformat()`` strings, or None when unset.
    - Missing labels become ``{}``.
    - Missing active/succeeded/failed counters become 0.
    - Each condition is reduced to its type, status, reason and message.
    """

    def iso(ts: Any) -> str | None:
        return ts.isoformat() if ts else None

    meta, spec, state = job.metadata, job.spec, job.status

    condition_dicts = [
        {
            "type": cond.type,
            "status": cond.status,
            "reason": cond.reason,
            "message": cond.message,
        }
        for cond in (state.conditions or [])
    ]

    metadata_part = {
        "name": meta.name,
        "namespace": meta.namespace,
        "creationTimestamp": iso(meta.creation_timestamp),
        "labels": meta.labels or {},
        "uid": meta.uid,
    }
    spec_part = {
        "parallelism": spec.parallelism,
        "completions": spec.completions,
        "backoffLimit": spec.backoff_limit,
    }
    status_part = {
        "active": state.active or 0,
        "succeeded": state.succeeded or 0,
        "failed": state.failed or 0,
        "startTime": iso(state.start_time),
        "completionTime": iso(state.completion_time),
        "conditions": condition_dicts,
    }
    return {"metadata": metadata_part, "spec": spec_part, "status": status_part}
def _get_job_status(self, job: V1Job) -> str:
    """Classify a Job as "completed", "failed", "running" or "pending".

    The first condition, in list order, that is ``Complete`` or ``Failed``
    with status ``"True"`` decides the result. Without such a condition the
    Job is "running" when ``status.active`` is positive, otherwise "pending".
    """
    terminal_labels = {"Complete": "completed", "Failed": "failed"}
    job_state = job.status

    for cond in job_state.conditions or []:
        if cond.type in terminal_labels and cond.status == "True":
            return terminal_labels[cond.type]

    return "running" if (job_state.active or 0) > 0 else "pending"
async def get_resource_status(
    self, api_version: str, kind: str, name: str, namespace: str
) -> dict[str, Any] | None:
    """Report whether a resource exists, and if it does, its status/metadata/spec.

    The returned dict always carries ``api_version``, ``kind``, ``name``,
    ``namespace`` and ``exists``. When the resource exists it also carries
    ``status``, ``metadata`` and ``spec``, each ``{}`` if that key is missing
    from the resource. Returns None, after logging, if the lookup raises.
    """
    try:
        found = await self._get_existing_resource(api_version, kind, name, namespace)
        report: dict[str, Any] = {
            "api_version": api_version,
            "kind": kind,
            "name": name,
            "namespace": namespace,
            "exists": bool(found),
        }
        if found:
            report["status"] = found.get("status", {})
            report["metadata"] = found.get("metadata", {})
            report["spec"] = found.get("spec", {})
        return report
    except Exception as e:
        logger.error(f"Error getting resource status: {e}")
        return None
def _env_csv(var_name: str, default: str = "") -> list[str]:
    """Return the comma-separated items of an environment variable.

    Items are whitespace-stripped and empty items are dropped, so ``" a, ,b "``
    yields ``["a", "b"]``. ``default`` is used when the variable is unset.
    """
    raw = os.getenv(var_name, default)
    return [item.strip() for item in raw.split(",") if item.strip()]


def create_manifest_processor_from_env() -> ManifestProcessor:
    """
    Create a ManifestProcessor instance from environment variables.

    Reads these variables:
    - CLUSTER_NAME and REGION
    - MAX_CPU_PER_MANIFEST, MAX_MEMORY_PER_MANIFEST and MAX_GPU_PER_MANIFEST
    - ALLOWED_NAMESPACES and VALIDATION_ENABLED
    - TRUSTED_REGISTRIES and TRUSTED_DOCKERHUB_ORGS

    As a side effect, it configures structured JSON logging.

    Raises:
        ValueError: if MAX_GPU_PER_MANIFEST is not an integer.
    """
    cluster_id = os.getenv("CLUSTER_NAME", "unknown-cluster")
    region = os.getenv("REGION", "unknown-region")

    # Enable structured JSON logging for CloudWatch Insights
    configure_structured_logging(
        service_name="manifest-processor",
        cluster_id=cluster_id,
        region=region,
    )

    # Validation is a security control, so only an explicit false-like value
    # switches it off. Anything else, including typos and stray whitespace,
    # keeps it on.
    validation_flag = os.getenv("VALIDATION_ENABLED", "true").strip().lower()

    # Load configuration from environment
    config_dict: dict[str, Any] = {
        # NOTE(review): the module docstring describes MAX_CPU_PER_MANIFEST in
        # millicores with a default of 10000, but the fallback here is "10".
        # Confirm which unit ManifestProcessor expects.
        "max_cpu_per_manifest": os.getenv("MAX_CPU_PER_MANIFEST", "10"),
        "max_memory_per_manifest": os.getenv("MAX_MEMORY_PER_MANIFEST", "32Gi"),
        "max_gpu_per_manifest": int(os.getenv("MAX_GPU_PER_MANIFEST", "4")),
        # Strip entries so "default, gco-jobs" does not yield " gco-jobs".
        "allowed_namespaces": _env_csv("ALLOWED_NAMESPACES", "default,gco-jobs"),
        "validation_enabled": validation_flag not in {"false", "0", "no", "off"},
    }

    # Image registry allowlist — sourced from the same CDK env vars the
    # queue_processor reads, so an attacker who holds sqs:SendMessage on
    # the regional queue can't reach an image source the REST path
    # rejects. When unset (or empty) the ManifestProcessor falls back
    # to its hardcoded default. Empty/missing values are dropped to
    # match the queue_processor's parsing rules.
    trusted_registries = _env_csv("TRUSTED_REGISTRIES")
    if trusted_registries:
        config_dict["trusted_registries"] = trusted_registries

    trusted_dockerhub_orgs = _env_csv("TRUSTED_DOCKERHUB_ORGS")
    if trusted_dockerhub_orgs:
        config_dict["trusted_dockerhub_orgs"] = trusted_dockerhub_orgs

    return ManifestProcessor(cluster_id, region, config_dict)