Coverage for gco / services / manifest_processor.py: 97%
518 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 21:47 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 21:47 +0000
1"""
2Manifest Processor Service for GCO (Global Capacity Orchestrator on AWS).
4This service processes Kubernetes manifest submissions, validates them against
5security and resource constraints, and applies them to the cluster.
7Key Features:
8- Validates manifests for required fields and structure
9- Enforces namespace restrictions (only allowed namespaces)
10- Enforces resource limits (CPU, memory, GPU per manifest)
11- Validates security context (no privileged containers)
12- Validates image sources (trusted registries only)
13- Supports dry-run mode for validation without applying
15Security Validations:
16- Namespace must be in allowed list (default: default, gco-jobs)
17- No privileged containers or privilege escalation
18- Images must be from trusted registries
19- Resource requests/limits within configured maximums
21Environment Variables:
22 CLUSTER_NAME: Name of the EKS cluster
23 REGION: AWS region of the cluster
24 MAX_CPU_PER_MANIFEST: Maximum CPU (millicores) per manifest (default: 10000)
25 MAX_MEMORY_PER_MANIFEST: Maximum memory per manifest (default: 32Gi)
26 MAX_GPU_PER_MANIFEST: Maximum GPUs per manifest (default: 4)
27 ALLOWED_NAMESPACES: Comma-separated list of allowed namespaces
28 VALIDATION_ENABLED: Enable/disable validation (default: true)
30Usage:
31 processor = create_manifest_processor_from_env()
32 response = await processor.process_manifest_submission(request)
33"""
from __future__ import annotations

import asyncio
import logging
import os
import uuid
from typing import Any, cast

import yaml
from kubernetes import client, config, dynamic
from kubernetes.client.models import V1Job
from kubernetes.client.rest import ApiException
from kubernetes.dynamic.exceptions import ResourceNotFoundError

from gco.models import (
    ManifestSubmissionRequest,
    ManifestSubmissionResponse,
    ResourceStatus,
)
from gco.services.structured_logging import configure_structured_logging
# NOTE: No logging.basicConfig() here. This module is imported by the CLI
# (cli/jobs.py, cli/commands/*_cmd.py) as a library for YAML loading helpers.
# Calling basicConfig() at import time would configure the root logger with
# INFO-level output, causing noisy botocore/urllib3 INFO messages on every
# CLI command. Container entry points (manifest_api.py) do their own
# basicConfig() call.
# Module-level logger; handlers and levels are configured by the entry point.
logger = logging.getLogger(__name__)
63# ---------------------------------------------------------------------------
64# YAML Alias Rejection Loader
65# ---------------------------------------------------------------------------
class NoAliasSafeLoader(yaml.SafeLoader):
    """A YAML SafeLoader that refuses to resolve anchors and aliases.

    Aliases (``*name``) referencing anchors (``&name``) let a tiny document
    expand into an enormous object graph (the "billion laughs" attack).
    Failing fast at compose time blocks that entire class of input.
    """

    def compose_node(self, parent: Any, index: Any) -> Any:
        # No alias event pending → defer to normal SafeLoader composition.
        if not self.check_event(yaml.AliasEvent):  # type: ignore[no-untyped-call]
            return super().compose_node(parent, index)
        event = self.get_event()  # type: ignore[no-untyped-call]
        raise yaml.composer.ComposerError(
            None,
            None,
            "YAML aliases are not allowed "
            "(security policy: yaml_allow_aliases=false), "
            f"found alias *{event.anchor}",
            event.start_mark,
        )
def safe_load_yaml(stream: str | Any, *, allow_aliases: bool = False) -> Any:
    """Parse a single YAML document, rejecting anchors/aliases by default.

    Args:
        stream: YAML string or file-like object.
        allow_aliases: If False (default), reject YAML anchors/aliases.

    Returns:
        Parsed YAML document.

    Raises:
        yaml.YAMLError: If the document is invalid or contains aliases
            when ``allow_aliases`` is False.
    """
    chosen_loader = NoAliasSafeLoader if not allow_aliases else yaml.SafeLoader
    # Both candidate loaders derive from SafeLoader, so this is equivalent to
    # yaml.safe_load; Bandit's B506 simply cannot see through the indirection.
    return yaml.load(stream, Loader=chosen_loader)  # nosec B506
def safe_load_all_yaml(stream: str | Any, *, allow_aliases: bool = False) -> list[Any]:
    """Parse every YAML document in a stream, rejecting aliases by default.

    Args:
        stream: YAML string or file-like object.
        allow_aliases: If False (default), reject YAML anchors/aliases.

    Returns:
        List of parsed YAML documents (``None`` documents are skipped).

    Raises:
        yaml.YAMLError: If any document is invalid or contains aliases
            when ``allow_aliases`` is False.
    """
    chosen_loader = NoAliasSafeLoader if not allow_aliases else yaml.SafeLoader
    # Both candidate loaders derive from SafeLoader, so this is equivalent to
    # yaml.safe_load_all; Bandit's B506 cannot recognize the custom loader.
    documents = yaml.load_all(stream, Loader=chosen_loader)  # nosec B506
    return [doc for doc in documents if doc is not None]
135class ManifestProcessor:
136 """
137 Processes Kubernetes manifest submissions and applies them to the cluster
138 """
140 def __init__(self, cluster_id: str, region: str, config_dict: dict[str, Any]):
141 self.cluster_id = cluster_id
142 self.region = region
143 self.config = config_dict
145 # Initialize Kubernetes clients
146 try:
147 # Try to load in-cluster config first (when running in pod)
148 config.load_incluster_config()
149 logger.info("Loaded in-cluster Kubernetes configuration")
150 except config.ConfigException:
151 try:
152 # Fall back to local kubeconfig (for development)
153 config.load_kube_config()
154 logger.info("Loaded local Kubernetes configuration")
155 except config.ConfigException as e:
156 logger.error(f"Failed to load Kubernetes configuration: {e}")
157 raise
159 # Initialize API clients
160 self.api_client = client.ApiClient()
161 self.api_client.configuration.request_timeout = int(os.environ.get("K8S_API_TIMEOUT", "30"))
162 self.core_v1 = client.CoreV1Api()
163 self.apps_v1 = client.AppsV1Api()
164 self.batch_v1 = client.BatchV1Api()
165 self.networking_v1 = client.NetworkingV1Api()
166 self.custom_objects = client.CustomObjectsApi()
168 # Dynamic client for CRDs - lazy initialized to avoid cluster connection during init
169 self._dynamic_client: dynamic.DynamicClient | None = None
171 # Timeout for Kubernetes API calls (seconds)
172 self._k8s_timeout = int(os.environ.get("K8S_API_TIMEOUT", "30"))
174 # Resource quotas and limits
175 self.max_cpu_per_manifest = self._parse_cpu_string(
176 config_dict.get("max_cpu_per_manifest", "10")
177 )
178 self.max_memory_per_manifest = self._parse_memory_string(
179 config_dict.get("max_memory_per_manifest", "32Gi")
180 )
181 self.max_gpu_per_manifest = int(config_dict.get("max_gpu_per_manifest", 4))
182 self.allowed_namespaces = set(
183 config_dict.get("allowed_namespaces", ["default", "gco-jobs"])
184 )
185 self.validation_enabled = config_dict.get("validation_enabled", True)
187 # Trusted registries for image validation (configurable via cdk.json)
188 self.trusted_registries = config_dict.get(
189 "trusted_registries",
190 [
191 "docker.io",
192 "gcr.io",
193 "quay.io",
194 "registry.k8s.io",
195 "k8s.gcr.io",
196 "public.ecr.aws",
197 "nvcr.io",
198 ],
199 )
200 self.trusted_dockerhub_orgs = config_dict.get(
201 "trusted_dockerhub_orgs",
202 [
203 "nvidia",
204 "pytorch",
205 "rayproject",
206 "tensorflow",
207 "huggingface",
208 "amazon",
209 "bitnami",
210 "gco",
211 ],
212 )
214 # Warn about trusted_registries entries that look like Docker Hub orgs (no dot or colon)
215 for registry in self.trusted_registries:
216 if not self._is_registry_domain(registry): 216 ↛ 217line 216 didn't jump to line 217 because the condition on line 216 was never true
217 logger.warning(
218 f"Trusted registry '{registry}' has no domain separator (dot or colon) — "
219 f"consider moving it to trusted_dockerhub_orgs instead"
220 )
222 # YAML parsing limits (configurable via cdk.json)
223 self.yaml_max_depth = int(config_dict.get("yaml_max_depth", 50))
225 # Allowed resource kinds (configurable via cdk.json)
226 self.allowed_kinds = set(
227 config_dict.get(
228 "allowed_kinds",
229 [
230 "Job",
231 "CronJob",
232 "Deployment",
233 "StatefulSet",
234 "DaemonSet",
235 "Service",
236 "ConfigMap",
237 "Pod",
238 ],
239 )
240 )
242 # Security policy — toggleable checks (configurable via cdk.json)
243 security_policy = config_dict.get("manifest_security_policy", {})
244 self.block_privileged = security_policy.get("block_privileged", True)
245 self.block_privilege_escalation = security_policy.get("block_privilege_escalation", True)
246 self.block_host_network = security_policy.get("block_host_network", True)
247 self.block_host_pid = security_policy.get("block_host_pid", True)
248 self.block_host_ipc = security_policy.get("block_host_ipc", True)
249 self.block_host_path = security_policy.get("block_host_path", True)
250 self.block_added_capabilities = security_policy.get("block_added_capabilities", True)
251 self.block_run_as_root = security_policy.get("block_run_as_root", False)
253 # ------------------------------------------------------------------
254 # Security defaults injection
255 # ------------------------------------------------------------------
257 @staticmethod
258 def _extract_pod_spec(manifest: dict[str, Any]) -> dict[str, Any] | None:
259 """Extract the pod spec from a manifest, handling all workload types.
261 Supports:
262 - Deployment / StatefulSet / DaemonSet / ReplicaSet → spec.template.spec
263 - Job → spec.template.spec
264 - CronJob → spec.jobTemplate.spec.template.spec
265 - Bare Pod → spec (when ``containers`` key is present)
267 Returns:
268 The pod spec dict (mutable reference), or ``None`` if the manifest
269 does not contain a recognisable pod spec.
270 """
271 spec = manifest.get("spec")
272 if spec is None or not isinstance(spec, dict):
273 return None
275 kind = manifest.get("kind", "")
277 # CronJob: spec.jobTemplate.spec.template.spec
278 if kind == "CronJob":
279 job_template = spec.get("jobTemplate")
280 if isinstance(job_template, dict): 280 ↛ 288line 280 didn't jump to line 288 because the condition on line 280 was always true
281 job_spec = job_template.get("spec")
282 if isinstance(job_spec, dict): 282 ↛ 288line 282 didn't jump to line 288 because the condition on line 282 was always true
283 template = job_spec.get("template")
284 if isinstance(template, dict): 284 ↛ 288line 284 didn't jump to line 288 because the condition on line 284 was always true
285 pod_spec = template.get("spec")
286 if isinstance(pod_spec, dict): 286 ↛ 288line 286 didn't jump to line 288 because the condition on line 286 was always true
287 return pod_spec
288 return None
290 # Deployment / StatefulSet / DaemonSet / ReplicaSet / Job:
291 # spec.template.spec
292 if "template" in spec:
293 template = spec.get("template")
294 if isinstance(template, dict): 294 ↛ 298line 294 didn't jump to line 298 because the condition on line 294 was always true
295 pod_spec = template.get("spec")
296 if isinstance(pod_spec, dict): 296 ↛ 298line 296 didn't jump to line 298 because the condition on line 296 was always true
297 return pod_spec
298 return None
300 # Bare Pod: spec contains "containers" directly
301 if "containers" in spec:
302 return cast(dict[str, Any], spec)
304 return None
306 def _inject_security_defaults(self, manifest: dict[str, Any]) -> dict[str, Any]:
307 """Inject security defaults into user-submitted manifests.
309 Currently injects:
310 - ``automountServiceAccountToken: false`` in the pod spec (unless the
311 user has explicitly set it).
313 The method mutates *manifest* in-place and returns it for convenience.
314 """
315 pod_spec = self._extract_pod_spec(manifest)
316 if pod_spec is not None:
317 # Use setdefault so we don't override an explicit user choice
318 pod_spec.setdefault("automountServiceAccountToken", False)
319 return manifest
321 @property
322 def dynamic_client(self) -> dynamic.DynamicClient:
323 """Lazy-initialized dynamic client for CRD support."""
324 if self._dynamic_client is None:
325 self._dynamic_client = dynamic.DynamicClient(self.api_client)
326 return self._dynamic_client
328 def _parse_cpu_string(self, cpu_str: str) -> int:
329 """Parse CPU string to millicores"""
330 if not cpu_str:
331 return 0
333 cpu_str = cpu_str.strip()
334 if cpu_str.endswith("m"):
335 return int(cpu_str[:-1])
336 return int(cpu_str) * 1000
338 def _parse_memory_string(self, memory_str: str) -> int:
339 """Parse memory string to bytes"""
340 if not memory_str:
341 return 0
343 memory_str = memory_str.strip()
345 if memory_str.endswith("Ki"):
346 return int(memory_str[:-2]) * 1024
347 if memory_str.endswith("Mi"):
348 return int(memory_str[:-2]) * 1024 * 1024
349 if memory_str.endswith("Gi"):
350 return int(memory_str[:-2]) * 1024 * 1024 * 1024
351 if memory_str.endswith("Ti"):
352 return int(memory_str[:-2]) * 1024 * 1024 * 1024 * 1024
353 if memory_str.endswith("k"):
354 return int(memory_str[:-1]) * 1000
355 if memory_str.endswith("M"):
356 return int(memory_str[:-1]) * 1000 * 1000
357 if memory_str.endswith("G"):
358 return int(memory_str[:-1]) * 1000 * 1000 * 1000
359 return int(memory_str)
361 def _check_yaml_depth(self, obj: Any, current_depth: int = 0) -> bool:
362 """Check if a parsed YAML/JSON object exceeds max nesting depth.
364 Recursively walks dicts and lists. Returns False if depth exceeds
365 ``self.yaml_max_depth``.
367 Args:
368 obj: The parsed object to check (dict, list, or scalar).
369 current_depth: Current recursion depth (callers should leave at 0).
371 Returns:
372 True if the object is within the depth limit, False otherwise.
373 """
374 if current_depth > self.yaml_max_depth:
375 return False
376 if isinstance(obj, dict):
377 return all(self._check_yaml_depth(v, current_depth + 1) for v in obj.values())
378 if isinstance(obj, list):
379 return all(self._check_yaml_depth(item, current_depth + 1) for item in obj)
380 return True
382 def validate_manifest(self, manifest: dict[str, Any]) -> tuple[bool, str | None]:
383 """
384 Validate a Kubernetes manifest for security and resource constraints
385 Returns: (is_valid, error_message)
386 """
387 if not self.validation_enabled:
388 return True, None
390 try:
391 # YAML depth check — reject excessively nested documents
392 if not self._check_yaml_depth(manifest):
393 return (
394 False,
395 f"Manifest exceeds maximum nesting depth of {self.yaml_max_depth} levels",
396 )
398 # Basic structure validation
399 required_fields = ["apiVersion", "kind", "metadata"]
400 for field in required_fields:
401 if field not in manifest:
402 return False, f"Missing required field: {field}"
404 # Validate metadata
405 metadata = manifest.get("metadata", {})
406 if "name" not in metadata:
407 return False, "Missing metadata.name field"
409 # Validate namespace
410 namespace = metadata.get("namespace", "default")
411 if namespace not in self.allowed_namespaces:
412 return (
413 False,
414 f"Namespace '{namespace}' not allowed. Allowed namespaces: {list(self.allowed_namespaces)}",
415 )
417 # Validate resource kind
418 kind = manifest.get("kind", "")
419 if kind not in self.allowed_kinds:
420 return (
421 False,
422 f"Resource kind '{kind}' is not allowed. Allowed kinds: {sorted(self.allowed_kinds)}",
423 )
425 # Validate resource limits for workload resources
426 if kind in [
427 "Deployment",
428 "Job",
429 "CronJob",
430 "StatefulSet",
431 "DaemonSet",
432 ]:
433 resource_valid, resource_error = self._validate_resource_limits(manifest)
434 if not resource_valid:
435 return False, resource_error
437 # Security validations
438 sec_valid, sec_error = self._validate_security_context(manifest)
439 if not sec_valid:
440 return False, f"Security context validation failed: {sec_error}"
442 # Validate image sources (prevent pulling from untrusted registries)
443 img_valid, img_error = self._validate_image_sources(manifest)
444 if not img_valid:
445 return False, img_error or "Untrusted image sources detected"
447 return True, None
449 except Exception as e:
450 logger.error(f"Error validating manifest: {e}")
451 return False, f"Validation error: {e!s}"
453 def _validate_resource_limits(self, manifest: dict[str, Any]) -> tuple[bool, str]:
454 """Validate resource limits in manifest.
456 Returns:
457 Tuple of (is_valid, error_message). error_message is empty if valid.
458 """
459 try:
460 errors: list[str] = []
461 spec = manifest.get("spec", {})
463 # Get pod spec (handle different resource types)
464 pod_spec = {}
465 if "template" in spec: # Deployment, StatefulSet, etc.
466 pod_spec = spec.get("template", {}).get("spec", {})
467 elif "jobTemplate" in spec: # CronJob 467 ↛ 471line 467 didn't jump to line 471 because the condition on line 467 was always true
468 pod_spec = (
469 spec.get("jobTemplate", {}).get("spec", {}).get("template", {}).get("spec", {})
470 )
471 elif "containers" in spec: # Pod
472 pod_spec = spec
474 total_cpu = 0
475 total_memory = 0
476 total_gpu = 0
478 for _container_type, container in self._get_all_containers(pod_spec):
479 resources = container.get("resources", {})
480 requests = resources.get("requests", {})
481 limits = resources.get("limits", {})
483 # Check CPU (use limits if available, otherwise requests)
484 cpu = limits.get(
485 "cpu"
486 ) or requests.get( # nosec B113 - dict.get(), not HTTP requests
487 "cpu", "0"
488 )
489 total_cpu += self._parse_cpu_string(cpu)
491 # Check Memory
492 memory = limits.get(
493 "memory"
494 ) or requests.get( # nosec B113 - dict.get(), not HTTP requests
495 "memory", "0"
496 )
497 total_memory += self._parse_memory_string(memory)
499 # Check GPU
500 gpu = limits.get(
501 "nvidia.com/gpu"
502 ) or requests.get( # nosec B113 - dict.get(), not HTTP requests
503 "nvidia.com/gpu", "0"
504 )
505 total_gpu += int(gpu)
507 # Validate against limits
508 if total_cpu > self.max_cpu_per_manifest:
509 logger.warning(f"CPU limit exceeded: {total_cpu}m > {self.max_cpu_per_manifest}m")
510 errors.append(f"CPU {total_cpu}m exceeds max {self.max_cpu_per_manifest}m")
512 if total_memory > self.max_memory_per_manifest:
513 logger.warning(
514 f"Memory limit exceeded: {total_memory} > {self.max_memory_per_manifest}"
515 )
516 mem_gb = self.max_memory_per_manifest / (1024**3)
517 req_gb = total_memory / (1024**3)
518 errors.append(f"Memory {req_gb:.0f}Gi exceeds max {mem_gb:.0f}Gi")
520 if total_gpu > self.max_gpu_per_manifest:
521 logger.warning(f"GPU limit exceeded: {total_gpu} > {self.max_gpu_per_manifest}")
522 errors.append(f"GPU {total_gpu} exceeds max {self.max_gpu_per_manifest}")
524 if errors:
525 hint = (
526 "To raise limits, update resource_quotas in cdk.json "
527 "and redeploy (see examples/README.md#troubleshooting)"
528 )
529 return False, "; ".join(errors) + f". {hint}"
531 return True, ""
533 except Exception as e:
534 logger.error(f"Error validating resource limits: {e}")
535 return False, f"Resource limit validation error: {e}"
537 def _get_all_containers(self, pod_spec: dict[str, Any]) -> list[tuple[str, dict[str, Any]]]:
538 """Get all containers from pod spec including init and ephemeral containers.
540 Returns:
541 List of (container_type, container_dict) tuples where container_type
542 is one of 'container', 'initContainer', or 'ephemeralContainer'.
543 """
544 result = []
545 for c in pod_spec.get("containers", []):
546 result.append(("container", c))
547 for c in pod_spec.get("initContainers", []):
548 result.append(("initContainer", c))
549 for c in pod_spec.get("ephemeralContainers", []):
550 result.append(("ephemeralContainer", c))
551 return result
553 def _validate_security_context(self, manifest: dict[str, Any]) -> tuple[bool, str | None]:
554 """Validate security context settings.
556 Returns:
557 Tuple of (is_valid, error_message). error_message is None if valid.
558 """
559 try:
560 # Basic security checks - prevent privileged containers
561 spec = manifest.get("spec", {})
563 # Get pod spec (handle different resource types)
564 pod_spec = None
565 if "template" in spec:
566 pod_spec = spec.get("template", {}).get("spec", {})
567 elif "jobTemplate" in spec:
568 pod_spec = (
569 spec.get("jobTemplate", {}).get("spec", {}).get("template", {}).get("spec", {})
570 )
571 elif "containers" in spec:
572 pod_spec = spec
574 if pod_spec:
575 # --- Pod-level checks ---
576 if self.block_host_network and pod_spec.get("hostNetwork", False):
577 return False, "hostNetwork is not permitted"
579 if self.block_host_pid and pod_spec.get("hostPID", False):
580 return False, "hostPID is not permitted"
582 if self.block_host_ipc and pod_spec.get("hostIPC", False):
583 return False, "hostIPC is not permitted"
585 # Check volumes for hostPath
586 if self.block_host_path:
587 for volume in pod_spec.get("volumes", []):
588 if volume.get("hostPath") is not None: 588 ↛ 587line 588 didn't jump to line 587 because the condition on line 588 was always true
589 return False, "hostPath volumes are not permitted"
591 # Check pod security context
592 security_context = pod_spec.get("securityContext", {})
593 if self.block_privileged and security_context.get("privileged", False):
594 return False, "privileged pod security context is not permitted"
596 if self.block_run_as_root:
597 run_as_user = security_context.get("runAsUser")
598 if run_as_user is not None and run_as_user == 0:
599 return False, "running as root (runAsUser: 0) is not permitted"
601 # --- Container-level checks ---
602 for container_type, container in self._get_all_containers(pod_spec):
603 container_name = container.get("name", "unknown")
604 container_security = container.get("securityContext", {})
605 if self.block_privileged and container_security.get("privileged", False):
606 return (
607 False,
608 f"{container_type} '{container_name}': privileged containers are not permitted",
609 )
610 if self.block_privilege_escalation and container_security.get(
611 "allowPrivilegeEscalation", False
612 ):
613 return (
614 False,
615 f"{container_type} '{container_name}': allowPrivilegeEscalation is not permitted",
616 )
618 # Check for added capabilities
619 if self.block_added_capabilities:
620 added_caps = container_security.get("capabilities", {}).get("add", [])
621 if added_caps:
622 return (
623 False,
624 f"{container_type} '{container_name}': added capabilities are not permitted",
625 )
627 # Check for runAsUser: 0 (root) — off by default
628 if self.block_run_as_root:
629 run_as_user = container_security.get("runAsUser")
630 if run_as_user is not None and run_as_user == 0:
631 return (
632 False,
633 f"{container_type} '{container_name}': running as root (runAsUser: 0) is not permitted",
634 )
636 return True, None
638 except Exception as e:
639 logger.error(f"Error validating security context: {e}")
640 return False, f"Security context error: {e}"
642 @staticmethod
643 def _is_registry_domain(entry: str) -> bool:
644 """Check if a registry entry is a proper domain (contains dot or colon).
646 A proper registry domain contains either a dot (e.g., 'docker.io', 'gcr.io')
647 or a colon (e.g., 'localhost:5000'). Entries without these are Docker Hub
648 organization names (e.g., 'nvidia', 'gco').
649 """
650 return "." in entry or ":" in entry
652 def _validate_image_sources(self, manifest: dict[str, Any]) -> tuple[bool, str | None]:
653 """Validate container image sources.
655 Uses proper domain matching instead of prefix matching to prevent
656 dependency confusion attacks (e.g., 'gco-malicious/evil' should NOT
657 match a trusted registry entry 'gco').
659 Matching logic:
660 1. If image has no '/' → official Docker Hub image (always allowed)
661 2. If image has '/' and part before first '/' contains a dot or colon
662 → it's a registry domain → match against trusted_registries
663 3. If image has '/' but first segment has no dot/colon
664 → it's a Docker Hub org → match against trusted_dockerhub_orgs
665 4. Digest references (@sha256:) are accepted from any trusted source
667 Returns:
668 Tuple of (is_valid, error_message). error_message is None if valid.
669 """
670 try:
671 trusted_registries = self.trusted_registries
672 trusted_dockerhub_orgs = self.trusted_dockerhub_orgs
674 spec = manifest.get("spec", {})
676 # Get pod spec (handle different resource types)
677 pod_spec = {}
678 if "template" in spec:
679 pod_spec = spec.get("template", {}).get("spec", {})
680 elif "jobTemplate" in spec:
681 pod_spec = (
682 spec.get("jobTemplate", {}).get("spec", {}).get("template", {}).get("spec", {})
683 )
684 elif "containers" in spec:
685 pod_spec = spec
687 for container_type, container in self._get_all_containers(pod_spec):
688 image = container.get("image", "")
689 if not image:
690 continue
692 is_trusted = False
694 # Case 1: Official Docker Hub image (no slash) — e.g., "python:3.14", "busybox"
695 if "/" not in image:
696 is_trusted = True
697 else:
698 # Image has a slash — determine if first segment is a domain or org
699 first_segment = image.split("/")[0]
701 if self._is_registry_domain(first_segment):
702 # Case 2: First segment looks like a domain (has dot or colon)
703 # Match against trusted_registries as exact domain match
704 for registry in trusted_registries:
705 if first_segment == registry:
706 is_trusted = True
707 break
708 # Also support multi-level registry paths like "public.ecr.aws"
709 # where the image might be "public.ecr.aws/lambda/python:3.14"
710 if image.startswith(registry + "/"): 710 ↛ 711line 710 didn't jump to line 711 because the condition on line 710 was never true
711 is_trusted = True
712 break
713 else:
714 # Case 3: First segment has no dot/colon — it's a Docker Hub org
715 # Match against trusted_dockerhub_orgs
716 if first_segment in trusted_dockerhub_orgs:
717 is_trusted = True
719 if not is_trusted:
720 container_name = container.get("name", "unknown")
721 logger.warning(f"Untrusted image source: {image}")
722 return (
723 False,
724 f"{container_type} '{container_name}': Untrusted image source '{image}'",
725 )
727 return True, None
729 except Exception as e:
730 logger.error(f"Error validating image sources: {e}")
731 return False, f"Image source validation error: {e}"
    async def process_manifest_submission(
        self, request: ManifestSubmissionRequest
    ) -> ManifestSubmissionResponse:
        """
        Process a manifest submission request.

        Validates each manifest in the request and, unless ``request.dry_run``
        is set, applies it to the cluster. Per-manifest failures are recorded
        in the response and do not stop processing of the remaining manifests.

        Args:
            request: Submission holding the manifest dicts, an optional
                default namespace, and the dry-run flag.

        Returns:
            ManifestSubmissionResponse with one ResourceStatus per manifest,
            an overall success flag, and any accumulated error messages.
        """
        logger.info(f"Processing manifest submission with {len(request.manifests)} manifests")

        resources = []
        errors = []
        overall_success = True

        try:
            # Process each manifest
            for i, manifest_data in enumerate(request.manifests):
                try:
                    # Validate manifest
                    is_valid, error_msg = self.validate_manifest(manifest_data)
                    if not is_valid:
                        error_msg = f"Manifest {i + 1} validation failed: {error_msg}"
                        errors.append(error_msg)
                        logger.error(error_msg)

                        # Create failed resource status
                        resource_status = ResourceStatus(
                            api_version=manifest_data.get("apiVersion", "unknown"),
                            kind=manifest_data.get("kind", "unknown"),
                            name=manifest_data.get("metadata", {}).get("name", f"manifest-{i + 1}"),
                            namespace=manifest_data.get("metadata", {}).get("namespace", "default"),
                            status="failed",
                            message=error_msg,
                        )
                        resources.append(resource_status)
                        overall_success = False
                        continue

                    # Apply manifest if validation passed
                    if not request.dry_run:
                        resource_status = await self._apply_manifest(
                            manifest_data, request.namespace
                        )
                        resources.append(resource_status)

                        if not resource_status.is_successful():
                            overall_success = False
                    else:
                        # Dry run - just validate; report "unchanged" without
                        # touching the cluster.
                        resource_status = ResourceStatus(
                            api_version=manifest_data.get("apiVersion", "unknown"),
                            kind=manifest_data.get("kind", "unknown"),
                            name=manifest_data.get("metadata", {}).get("name", "unknown"),
                            namespace=manifest_data.get("metadata", {}).get(
                                "namespace", request.namespace or "default"
                            ),
                            status="unchanged",
                            message="Dry run - validation passed",
                        )
                        resources.append(resource_status)

                except Exception as e:
                    # Unexpected per-manifest failure: record it and keep
                    # going with the remaining manifests.
                    error_msg = f"Error processing manifest {i + 1}: {e!s}"
                    errors.append(error_msg)
                    logger.error(error_msg)
                    overall_success = False

                    # Create failed resource status
                    resource_status = ResourceStatus(
                        api_version=manifest_data.get("apiVersion", "unknown"),
                        kind=manifest_data.get("kind", "unknown"),
                        name=manifest_data.get("metadata", {}).get("name", f"manifest-{i + 1}"),
                        namespace=manifest_data.get("metadata", {}).get("namespace", "default"),
                        status="failed",
                        message=str(e),
                    )
                    resources.append(resource_status)

        except Exception as e:
            # Failure outside the per-manifest loop (e.g. iterating the
            # request itself); reported in the response rather than raised.
            error_msg = f"Fatal error processing manifest submission: {e!s}"
            errors.append(error_msg)
            logger.error(error_msg)
            overall_success = False

        response = ManifestSubmissionResponse(
            success=overall_success,
            cluster_id=self.cluster_id,
            region=self.region,
            resources=resources,
            errors=errors if errors else None,
        )

        logger.info(
            f"Manifest submission completed - Success: {overall_success}, "
            f"Resources: {len(resources)}, Errors: {len(errors)}"
        )

        return response
830 async def _apply_manifest(
831 self, manifest_data: dict[str, Any], default_namespace: str | None = None
832 ) -> ResourceStatus:
833 """
834 Apply a single manifest to the cluster.
836 For Jobs and CronJobs, if the resource already exists and is completed/failed,
837 it will be automatically deleted and recreated (since these resources are immutable).
838 """
839 try:
840 api_version: str = manifest_data.get("apiVersion", "unknown")
841 kind: str = manifest_data.get("kind", "unknown")
842 metadata = manifest_data.get("metadata", {})
843 name: str = metadata.get("name", "unknown")
844 namespace: str = metadata.get("namespace", default_namespace or "default")
846 # Ensure namespace is set in manifest
847 if "namespace" not in metadata and namespace:
848 manifest_data["metadata"]["namespace"] = namespace
850 # Inject security defaults (e.g., automountServiceAccountToken: false)
851 self._inject_security_defaults(manifest_data)
853 # Check if resource already exists
854 existing_resource = await self._get_existing_resource(
855 api_version, kind, name, namespace
856 )
858 if existing_resource:
859 # Jobs are immutable — if one already exists and is finished,
860 # delete it first so we can recreate cleanly.
861 # If the job is still active, auto-rename to avoid collision.
862 if kind == "Job":
863 if self._is_job_finished(existing_resource):
864 logger.info(
865 f"Job {name} already exists and is finished, deleting before recreating"
866 )
867 await self.delete_resource(api_version, kind, name, namespace)
868 import asyncio
870 await asyncio.sleep(1)
871 await self._create_resource(manifest_data)
872 status = "created"
873 message = "Previous completed job replaced with new submission"
874 else:
875 # Active job — rename to avoid destroying it
876 import uuid
878 suffix = uuid.uuid4().hex[:5]
879 new_name = f"{name}-{suffix}"
880 manifest_data["metadata"]["name"] = new_name
881 logger.warning(
882 f"Job {name} is still active, renamed new submission to {new_name}"
883 )
884 await self._create_resource(manifest_data)
885 status = "created"
886 message = (
887 f"Job '{name}' is still running. "
888 f"New submission renamed to '{new_name}'."
889 )
890 name = new_name
891 else:
892 # Update existing resource (works for mutable resources)
893 updated_resource = await self._update_resource(manifest_data)
894 status = "updated" if updated_resource else "unchanged"
895 message = (
896 "Resource updated successfully" if updated_resource else "No changes needed"
897 )
898 else:
899 # Create new resource
900 await self._create_resource(manifest_data)
901 status = "created"
902 message = "Resource created successfully"
904 return ResourceStatus(
905 api_version=api_version,
906 kind=kind,
907 name=name,
908 namespace=namespace,
909 status=status,
910 message=message,
911 )
913 except ApiException as e:
914 logger.error(f"Kubernetes API error applying manifest: {e}")
915 return ResourceStatus(
916 api_version=manifest_data.get("apiVersion", "unknown"),
917 kind=manifest_data.get("kind", "unknown"),
918 name=manifest_data.get("metadata", {}).get("name", "unknown"),
919 namespace=manifest_data.get("metadata", {}).get("namespace", "default"),
920 status="failed",
921 message=f"API error: {e.reason}",
922 )
923 except Exception as e:
924 logger.error(f"Error applying manifest: {e}")
925 return ResourceStatus(
926 api_version=manifest_data.get("apiVersion", "unknown"),
927 kind=manifest_data.get("kind", "unknown"),
928 name=manifest_data.get("metadata", {}).get("name", "unknown"),
929 namespace=manifest_data.get("metadata", {}).get("namespace", "default"),
930 status="failed",
931 message=str(e),
932 )
934 def _is_job_finished(self, job_resource: dict[str, Any]) -> bool:
935 """Check if a Kubernetes Job resource is in a terminal state (Complete or Failed)."""
936 status = job_resource.get("status", {})
937 conditions = status.get("conditions") or []
938 for condition in conditions:
939 cond_type = condition.get("type", "")
940 cond_status = condition.get("status", "")
941 if cond_type in ("Complete", "Failed") and cond_status == "True":
942 return True
943 return False
945 async def _get_existing_resource(
946 self, api_version: str, kind: str, name: str, namespace: str
947 ) -> dict[str, Any] | None:
948 """Check if a resource already exists using dynamic client"""
949 try:
950 # Get the API resource
951 api_resource = self._get_api_resource(api_version, kind)
953 # Try to get the resource
954 if namespace and api_resource.namespaced:
955 resource = api_resource.get(name=name, namespace=namespace)
956 else:
957 resource = api_resource.get(name=name)
959 if resource is not None: 959 ↛ 970line 959 didn't jump to line 970 because the condition on line 959 was always true
960 return dict(resource.to_dict())
962 except ApiException as e:
963 if e.status == 404:
964 return None # Resource doesn't exist
965 raise
966 except ValueError:
967 # Unknown resource type
968 return None
970 return None
972 def _get_api_resource(self, api_version: str, kind: str) -> Any:
973 """Get the API resource for a given apiVersion and kind using dynamic client."""
974 try:
975 return self.dynamic_client.resources.get(api_version=api_version, kind=kind)
976 except ResourceNotFoundError as e:
977 logger.error(f"Resource type not found: {api_version}/{kind}")
978 raise ValueError(f"Unknown resource type: {api_version}/{kind}") from e
980 async def _create_resource(self, manifest_data: dict[str, Any]) -> bool:
981 """Create a new resource from manifest using dynamic client"""
982 try:
983 api_version = manifest_data.get("apiVersion", "")
984 kind = manifest_data.get("kind", "")
985 namespace = manifest_data.get("metadata", {}).get("namespace")
987 # Get the API resource
988 api_resource = self._get_api_resource(api_version, kind)
990 # Create the resource
991 if namespace and api_resource.namespaced:
992 api_resource.create(body=manifest_data, namespace=namespace)
993 else:
994 api_resource.create(body=manifest_data)
996 return True
997 except Exception as e:
998 logger.error(f"Error creating resource: {e}")
999 raise
1001 async def _update_resource(self, manifest_data: dict[str, Any]) -> bool:
1002 """Update an existing resource using dynamic client"""
1003 try:
1004 api_version = manifest_data.get("apiVersion", "")
1005 kind = manifest_data.get("kind", "")
1006 name = manifest_data.get("metadata", {}).get("name", "")
1007 namespace = manifest_data.get("metadata", {}).get("namespace")
1009 # Get the API resource
1010 api_resource = self._get_api_resource(api_version, kind)
1012 # Update the resource using patch (server-side apply)
1013 if namespace and api_resource.namespaced:
1014 api_resource.patch(
1015 body=manifest_data,
1016 name=name,
1017 namespace=namespace,
1018 content_type="application/merge-patch+json",
1019 )
1020 else:
1021 api_resource.patch(
1022 body=manifest_data,
1023 name=name,
1024 content_type="application/merge-patch+json",
1025 )
1027 return True
1028 except Exception as e:
1029 logger.error(f"Error updating resource: {e}")
1030 raise
1032 async def delete_resource(
1033 self, api_version: str, kind: str, name: str, namespace: str
1034 ) -> ResourceStatus:
1035 """
1036 Delete a resource from the cluster using dynamic client
1037 """
1038 try:
1039 # Get the API resource
1040 api_resource = self._get_api_resource(api_version, kind)
1042 # Delete the resource
1043 if namespace and api_resource.namespaced:
1044 api_resource.delete(name=name, namespace=namespace)
1045 else:
1046 api_resource.delete(name=name)
1048 return ResourceStatus(
1049 api_version=api_version,
1050 kind=kind,
1051 name=name,
1052 namespace=namespace,
1053 status="deleted",
1054 message="Resource deleted successfully",
1055 )
1057 except ValueError as e:
1058 # Unknown resource type
1059 return ResourceStatus(
1060 api_version=api_version,
1061 kind=kind,
1062 name=name,
1063 namespace=namespace,
1064 status="failed",
1065 message=str(e),
1066 )
1067 except ApiException as e:
1068 if e.status == 404:
1069 return ResourceStatus(
1070 api_version=api_version,
1071 kind=kind,
1072 name=name,
1073 namespace=namespace,
1074 status="unchanged",
1075 message="Resource not found (already deleted)",
1076 )
1077 return ResourceStatus(
1078 api_version=api_version,
1079 kind=kind,
1080 name=name,
1081 namespace=namespace,
1082 status="failed",
1083 message=f"Delete failed: {e.reason}",
1084 )
1085 except Exception as e:
1086 return ResourceStatus(
1087 api_version=api_version,
1088 kind=kind,
1089 name=name,
1090 namespace=namespace,
1091 status="failed",
1092 message=str(e),
1093 )
1095 async def list_jobs(
1096 self, namespace: str | None = None, status_filter: str | None = None
1097 ) -> list[dict[str, Any]]:
1098 """
1099 List Kubernetes Jobs from allowed namespaces.
1101 Args:
1102 namespace: Filter by specific namespace (must be in allowed_namespaces)
1103 status_filter: Filter by status: "running", "completed", "failed"
1105 Returns:
1106 List of job dictionaries with metadata and status
1107 """
1108 jobs = []
1110 # Determine which namespaces to query
1111 if namespace:
1112 if namespace not in self.allowed_namespaces:
1113 raise ValueError(
1114 f"Namespace '{namespace}' not allowed. "
1115 f"Allowed namespaces: {list(self.allowed_namespaces)}"
1116 )
1117 namespaces_to_query = [namespace]
1118 else:
1119 namespaces_to_query = list(self.allowed_namespaces)
1121 for ns in namespaces_to_query:
1122 try:
1123 job_list = self.batch_v1.list_namespaced_job(
1124 namespace=ns, _request_timeout=self._k8s_timeout
1125 )
1126 for job in job_list.items:
1127 job_dict = self._job_to_dict(job)
1129 # Apply status filter
1130 if status_filter:
1131 job_status = self._get_job_status(job)
1132 if job_status != status_filter:
1133 continue
1135 jobs.append(job_dict)
1136 except ApiException as e:
1137 logger.warning(f"Failed to list jobs in namespace {ns}: {e.reason}")
1138 continue
1140 return jobs
1142 def _job_to_dict(self, job: V1Job) -> dict[str, Any]:
1143 """Convert a Kubernetes Job object to a dictionary."""
1144 metadata = job.metadata
1145 status = job.status
1146 spec = job.spec
1148 return {
1149 "metadata": {
1150 "name": metadata.name,
1151 "namespace": metadata.namespace,
1152 "creationTimestamp": (
1153 metadata.creation_timestamp.isoformat() if metadata.creation_timestamp else None
1154 ),
1155 "labels": metadata.labels or {},
1156 "uid": metadata.uid,
1157 },
1158 "spec": {
1159 "parallelism": spec.parallelism,
1160 "completions": spec.completions,
1161 "backoffLimit": spec.backoff_limit,
1162 },
1163 "status": {
1164 "active": status.active or 0,
1165 "succeeded": status.succeeded or 0,
1166 "failed": status.failed or 0,
1167 "startTime": status.start_time.isoformat() if status.start_time else None,
1168 "completionTime": (
1169 status.completion_time.isoformat() if status.completion_time else None
1170 ),
1171 "conditions": [
1172 {
1173 "type": c.type,
1174 "status": c.status,
1175 "reason": c.reason,
1176 "message": c.message,
1177 }
1178 for c in (status.conditions or [])
1179 ],
1180 },
1181 }
1183 def _get_job_status(self, job: V1Job) -> str:
1184 """Determine the status of a job: running, completed, or failed."""
1185 status = job.status
1186 conditions = status.conditions or []
1188 for condition in conditions:
1189 if condition.type == "Complete" and condition.status == "True":
1190 return "completed"
1191 if condition.type == "Failed" and condition.status == "True": 1191 ↛ 1188line 1191 didn't jump to line 1188 because the condition on line 1191 was always true
1192 return "failed"
1194 if (status.active or 0) > 0:
1195 return "running"
1197 return "pending"
1199 async def get_resource_status(
1200 self, api_version: str, kind: str, name: str, namespace: str
1201 ) -> dict[str, Any] | None:
1202 """
1203 Get the status of a specific resource
1204 """
1205 try:
1206 resource = await self._get_existing_resource(api_version, kind, name, namespace)
1207 if resource:
1208 return {
1209 "api_version": api_version,
1210 "kind": kind,
1211 "name": name,
1212 "namespace": namespace,
1213 "exists": True,
1214 "status": resource.get("status", {}),
1215 "metadata": resource.get("metadata", {}),
1216 "spec": resource.get("spec", {}),
1217 }
1218 return {
1219 "api_version": api_version,
1220 "kind": kind,
1221 "name": name,
1222 "namespace": namespace,
1223 "exists": False,
1224 }
1225 except Exception as e:
1226 logger.error(f"Error getting resource status: {e}")
1227 return None
def create_manifest_processor_from_env() -> ManifestProcessor:
    """
    Build a ManifestProcessor configured entirely from environment variables.

    Reads CLUSTER_NAME/REGION for cluster identity, enables structured JSON
    logging, then assembles validation limits from MAX_CPU_PER_MANIFEST,
    MAX_MEMORY_PER_MANIFEST, MAX_GPU_PER_MANIFEST, ALLOWED_NAMESPACES and
    VALIDATION_ENABLED (see the module docstring for defaults).
    """
    env = os.getenv
    cluster_id = env("CLUSTER_NAME", "unknown-cluster")
    region = env("REGION", "unknown-region")

    # Structured JSON logs make the service queryable in CloudWatch Insights.
    configure_structured_logging(
        service_name="manifest-processor",
        cluster_id=cluster_id,
        region=region,
    )

    settings = {
        "max_cpu_per_manifest": env("MAX_CPU_PER_MANIFEST", "10"),
        "max_memory_per_manifest": env("MAX_MEMORY_PER_MANIFEST", "32Gi"),
        "max_gpu_per_manifest": int(env("MAX_GPU_PER_MANIFEST", "4")),
        "allowed_namespaces": env("ALLOWED_NAMESPACES", "default,gco-jobs").split(","),
        "validation_enabled": env("VALIDATION_ENABLED", "true").lower() == "true",
    }

    return ManifestProcessor(cluster_id, region, settings)