Coverage for gco / services / queue_processor.py: 94%
295 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 21:47 +0000
1"""
2Queue Processor Service for GCO (Global Capacity Orchestrator on AWS).
4Polls the regional SQS job queue, reads Kubernetes manifests from messages,
5validates them, and applies them to the cluster. Designed to run as a
6short-lived pod managed by a KEDA ScaledJob that scales based on queue depth.
8Each invocation processes a single SQS message (which may contain multiple
9manifests). On success the message is deleted; on failure it returns to the
10queue after the visibility timeout (5 min) and eventually lands in the DLQ
11after 3 failed attempts.
13Message format (produced by `gco jobs submit-sqs`):
14 {
15 "job_id": "abc123",
16 "manifests": [<k8s manifest dicts>],
17 "namespace": "gco-jobs",
18 "priority": 0,
19 "submitted_at": "2026-03-26T12:00:00+00:00"
20 }
22Configuration via environment variables:
23 JOB_QUEUE_URL: SQS queue URL to consume from (required)
24 AWS_REGION: AWS region (default: us-east-1)
25 ALLOWED_NAMESPACES: Comma-separated namespace allowlist
26 (default: default,gco-jobs)
27 MAX_GPU_PER_MANIFEST: Max GPUs summed across all containers
28 (regular + init + ephemeral) (default: 4)
29 MAX_CPU_PER_MANIFEST: Max CPU summed across all containers; accepts
30 K8s suffixes ("500m" or "10" for cores)
31 (default: 10000 millicores = 10 cores)
32 MAX_MEMORY_PER_MANIFEST: Max memory summed across all containers;
33 accepts K8s suffixes ("32Gi", "256Mi") or
34 a bare byte count (default: 32Gi)
35 TRUSTED_REGISTRIES: Comma-separated list of registry domains
36 (e.g. "nvcr.io,public.ecr.aws"). Empty/unset
37 disables the image registry check (fail-open).
38 Keep in sync with
39 cdk.json::job_validation_policy.trusted_registries.
40 TRUSTED_DOCKERHUB_ORGS: Comma-separated list of Docker Hub org names
41 (e.g. "nvidia,pytorch"). Empty/unset disables
42 the check. Keep in sync with
43 cdk.json::job_validation_policy.trusted_dockerhub_orgs.
45Security policy toggles (all default to true except ``BLOCK_RUN_AS_ROOT``
46which defaults to false, matching job_validation_policy.manifest_security_policy
47in cdk.json). Each one controls whether the corresponding pod/container
48setting is rejected; the REST manifest_processor enforces an identical set
49so both submission paths apply the same policy:
51 BLOCK_PRIVILEGED: Reject ``securityContext.privileged: true``
52 on pod or container (default: true)
53 BLOCK_PRIVILEGE_ESCALATION: Reject containers with
54 allowPrivilegeEscalation=true
55 (default: true)
56 BLOCK_HOST_NETWORK: Block pods with hostNetwork=true
57 (default: true)
58 BLOCK_HOST_PID: Block pods with hostPID=true
59 (default: true)
60 BLOCK_HOST_IPC: Block pods with hostIPC=true
61 (default: true)
62 BLOCK_HOST_PATH: Block volumes referencing hostPath
63 (default: true)
64 BLOCK_ADDED_CAPABILITIES: Block containers that add Linux
65 capabilities via securityContext.capabilities.add
66 (default: true)
67 BLOCK_RUN_AS_ROOT: Reject runAsUser: 0 at pod or container
68 level (default: false — many public
69 images still run as root)
70"""
72from __future__ import annotations
74import json
75import logging
76import os
77import sys
78import time
79from typing import Any
81import boto3
82from kubernetes import client, config, dynamic
83from kubernetes.client.rest import ApiException
84from kubernetes.dynamic.exceptions import NotFoundError, ResourceNotFoundError
# Root-logger configuration: INFO level with a fixed "[queue-processor]" tag
# so this service's lines are easy to grep in aggregated pod logs.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s [queue-processor] %(message)s",
)
# Module-level logger used by every function in this service.
log = logging.getLogger("queue-processor")
93def _parse_cpu_string(cpu_str: str) -> int:
94 """Parse a Kubernetes-style CPU string to millicores.
96 Accepts:
97 - Millicore suffix: "500m" -> 500
98 - Whole cores: "4" -> 4000
99 - Bare millicore counts: "10000" (when > 999) stays as millicores
100 """
101 if not cpu_str:
102 return 0
103 s = cpu_str.strip()
104 if s.endswith("m"):
105 return int(s[:-1])
106 return int(s) * 1000
109def _parse_memory_string(memory_str: str) -> int:
110 """Parse a Kubernetes-style memory string to bytes.
112 Accepts binary suffixes (Ki, Mi, Gi, Ti), decimal suffixes (k, M, G),
113 or a bare byte count.
114 """
115 if not memory_str:
116 return 0
117 s = memory_str.strip()
118 if s.endswith("Ki"):
119 return int(s[:-2]) * 1024
120 if s.endswith("Mi"):
121 return int(s[:-2]) * 1024**2
122 if s.endswith("Gi"):
123 return int(s[:-2]) * 1024**3
124 if s.endswith("Ti"):
125 return int(s[:-2]) * 1024**4
126 if s.endswith("k"): 126 ↛ 127line 126 didn't jump to line 127 because the condition on line 126 was never true
127 return int(s[:-1]) * 1000
128 if s.endswith("M"): 128 ↛ 129line 128 didn't jump to line 129 because the condition on line 128 was never true
129 return int(s[:-1]) * 1000**2
130 if s.endswith("G"):
131 return int(s[:-1]) * 1000**3
132 return int(s)
# --- Configuration from environment ---
# These are set by the KEDA ScaledJob manifest (post-helm-sqs-consumer.yaml)
# and populated from cdk.json queue_processor settings during CDK deploy.
# An empty QUEUE_URL is treated as a misconfiguration at runtime:
# process_one_message logs an error and returns False.
QUEUE_URL = os.environ.get("JOB_QUEUE_URL", "")
# Falls back to AWS_DEFAULT_REGION, then us-east-1.
REGION = os.environ.get("AWS_REGION", os.environ.get("AWS_DEFAULT_REGION", "us-east-1"))
# Namespace allowlist checked by validate_manifest (comma-separated env var).
ALLOWED_NAMESPACES = set(os.environ.get("ALLOWED_NAMESPACES", "default,gco-jobs").split(","))
# Per-manifest resource caps summed across ALL container kinds.
MAX_CPU = _parse_cpu_string(os.environ.get("MAX_CPU_PER_MANIFEST", "10000"))  # millicores
MAX_MEMORY = _parse_memory_string(os.environ.get("MAX_MEMORY_PER_MANIFEST", "32Gi"))  # bytes
MAX_GPU = int(os.environ.get("MAX_GPU_PER_MANIFEST", "4"))

# Trusted image sources (populated from cdk.json::manifest_processor at deploy time).
# Comma-separated env vars; empty/unset disables the check (fail-open logged).
# Keep in sync with gco/services/manifest_processor.py::_validate_image_sources.
TRUSTED_REGISTRIES = [
    r.strip() for r in os.environ.get("TRUSTED_REGISTRIES", "").split(",") if r.strip()
]
TRUSTED_DOCKERHUB_ORGS = [
    o.strip() for o in os.environ.get("TRUSTED_DOCKERHUB_ORGS", "").split(",") if o.strip()
]
156def _env_bool(name: str, default: bool) -> bool:
157 """Parse a boolean environment variable.
159 Empty/unset returns ``default``. Recognized truthy values: "true", "1",
160 "yes", "on" (case-insensitive). Everything else is falsy.
161 """
162 raw = os.environ.get(name)
163 if raw is None or raw == "":
164 return default
165 return raw.strip().lower() in ("true", "1", "yes", "on")
# Security-policy toggles. Every one of these mirrors an attribute the REST
# manifest_processor exposes via cdk.json::job_validation_policy.manifest_security_policy.
# Both submission paths MUST enforce the same policy — an attacker holding
# sqs:SendMessage on the job queue must not be able to bypass checks the REST
# path applies. Structural parity is pinned by
# tests/test_queue_processor.py::TestSecurityPolicyParityWithManifestProcessor.
# All default to blocking (True) except BLOCK_RUN_AS_ROOT, which defaults to
# False because many public images still run as root (see module docstring).
BLOCK_PRIVILEGED = _env_bool("BLOCK_PRIVILEGED", True)  # securityContext.privileged
BLOCK_PRIVILEGE_ESCALATION = _env_bool("BLOCK_PRIVILEGE_ESCALATION", True)  # allowPrivilegeEscalation
BLOCK_HOST_NETWORK = _env_bool("BLOCK_HOST_NETWORK", True)  # pod hostNetwork
BLOCK_HOST_PID = _env_bool("BLOCK_HOST_PID", True)  # pod hostPID
BLOCK_HOST_IPC = _env_bool("BLOCK_HOST_IPC", True)  # pod hostIPC
BLOCK_HOST_PATH = _env_bool("BLOCK_HOST_PATH", True)  # hostPath volumes
BLOCK_ADDED_CAPABILITIES = _env_bool("BLOCK_ADDED_CAPABILITIES", True)  # capabilities.add
BLOCK_RUN_AS_ROOT = _env_bool("BLOCK_RUN_AS_ROOT", False)  # runAsUser: 0
184def _is_registry_domain(entry: str) -> bool:
185 """True if the entry looks like a registry domain (has '.' or ':')."""
186 return "." in entry or ":" in entry
189def _iter_containers(pod_spec: dict[str, Any]) -> list[tuple[str, dict[str, Any]]]:
190 """Yield (kind, container_dict) for every container, initContainer, and
191 ephemeralContainer in a pod spec."""
192 out: list[tuple[str, dict[str, Any]]] = []
193 for c in pod_spec.get("containers", []) or []:
194 out.append(("container", c))
195 for c in pod_spec.get("initContainers", []) or []:
196 out.append(("initContainer", c))
197 for c in pod_spec.get("ephemeralContainers", []) or []: 197 ↛ 198line 197 didn't jump to line 198 because the loop on line 197 never started
198 out.append(("ephemeralContainer", c))
199 return out
def _is_image_trusted(image: str) -> bool:
    """Decide whether an image reference comes from a trusted source.

    Same semantics as manifest_processor._validate_image_sources:

    1. Official Docker Hub images (no '/') are always allowed.
    2. References whose first segment is a registry domain ('.' or ':')
       must match a TRUSTED_REGISTRIES entry exactly, or have it as a
       multi-segment prefix (e.g. "public.ecr.aws/lambda").
    3. Docker Hub org images (first segment has no '.' or ':') must match
       a TRUSTED_DOCKERHUB_ORGS entry.

    When both allowlists are empty the check is disabled (fail-open).
    """
    if not (TRUSTED_REGISTRIES or TRUSTED_DOCKERHUB_ORGS):
        return True
    if not image:
        return True
    if "/" not in image:
        # Case 1: official Docker Hub image — always trusted.
        return True
    first_segment, _, _ = image.partition("/")
    if not _is_registry_domain(first_segment):
        # Case 3: Docker Hub org image.
        return first_segment in TRUSTED_DOCKERHUB_ORGS
    # Case 2: explicit registry domain — exact match or path prefix.
    return any(
        first_segment == registry or image.startswith(registry + "/")
        for registry in TRUSTED_REGISTRIES
    )
def load_k8s() -> None:
    """Load Kubernetes configuration, preferring in-cluster credentials.

    Falls back to the local kubeconfig when not running inside a pod
    (i.e. when in-cluster config loading raises ConfigException).
    """
    try:
        config.load_incluster_config()
    except config.ConfigException:
        config.load_kube_config()
        log.info("Loaded local kubeconfig")
    else:
        log.info("Loaded in-cluster Kubernetes configuration")
def _quantity_to_millicores(value: Any) -> int:
    """Convert a K8s CPU quantity ("500m", "2", "0.5", or a number) to millicores."""
    if isinstance(value, str) and value.endswith("m"):
        return int(value[:-1])
    return int(float(value) * 1000)


def _quantity_to_bytes(value: Any) -> int:
    """Convert a K8s memory quantity ("32Gi", "1G", "1024", or a number) to bytes.

    Supports all binary (Ki/Mi/Gi/Ti) and decimal (k/M/G/T) suffixes —
    previously the inline parsing in validate_manifest only understood
    Ki/Mi/Gi and crashed on e.g. "1G".
    """
    if not isinstance(value, str):
        return int(value)
    # Two-character binary suffixes first so "Mi" never matches "M".
    for suffix, multiplier in (
        ("Ki", 1024),
        ("Mi", 1024**2),
        ("Gi", 1024**3),
        ("Ti", 1024**4),
        ("k", 1000),
        ("M", 1000**2),
        ("G", 1000**3),
        ("T", 1000**4),
    ):
        if value.endswith(suffix):
            return int(float(value[: -len(suffix)]) * multiplier)
    return int(float(value))


def _sum_container_resources(
    containers: list[tuple[str, dict[str, Any]]],
) -> tuple[int, int, int]:
    """Sum (gpu, cpu_millicores, memory_bytes) across all given containers.

    Limits take precedence over requests for each resource, matching the
    original inline accounting. Raises ValueError/TypeError on malformed
    quantities; the caller converts that into a validation rejection.
    """
    total_gpu = total_cpu = total_memory = 0
    for _ckind, c in containers:
        res = c.get("resources", {}) or {}
        limits = res.get("limits", {}) or {}
        requests = res.get("requests", {}) or {}
        total_gpu += int(limits.get("nvidia.com/gpu") or requests.get("nvidia.com/gpu", "0"))
        total_cpu += _quantity_to_millicores(limits.get("cpu") or requests.get("cpu", "0"))
        total_memory += _quantity_to_bytes(limits.get("memory") or requests.get("memory", "0"))
    return total_gpu, total_cpu, total_memory


def validate_manifest(m: dict[str, Any]) -> tuple[bool, str]:
    """Validate a manifest before applying it to the cluster.

    Mirrors the security checks performed by the REST ``manifest_processor``
    service (``gco/services/manifest_processor.py``) so the SQS path cannot
    bypass them. Checks, in order:

    1. Structural fields: ``kind``, ``apiVersion``, ``metadata.name``.
    2. Namespace allowlist (``ALLOWED_NAMESPACES``, shared with the REST
       manifest_processor via cdk.json::job_validation_policy).
    3. Pod-level security policy: ``hostNetwork``, ``hostPID``, ``hostIPC``,
       ``hostPath`` volumes, privileged pod security context, and (when
       ``BLOCK_RUN_AS_ROOT``) pod-level ``runAsUser: 0``.
    4. Container-level security policy for EVERY container kind (regular,
       init, ephemeral): ``privileged``, ``allowPrivilegeEscalation``,
       ``capabilities.add``, and (when ``BLOCK_RUN_AS_ROOT``) container
       ``runAsUser: 0``. Iterating every kind closes the classic "smuggle
       it via an init container" bypass.
    5. Image registry allowlist: ``TRUSTED_REGISTRIES`` /
       ``TRUSTED_DOCKERHUB_ORGS``; disabled when both lists are empty.
    6. Resource caps: TOTAL CPU, memory, and GPU summed across ALL
       containers must not exceed ``MAX_CPU`` / ``MAX_MEMORY`` /
       ``MAX_GPU``. Summing init/ephemeral with regular containers is
       stricter than K8s scheduling accounting, but makes the operator's
       ``max_*_per_manifest`` budget a hard cap regardless of where the
       request is placed.

    Args:
        m: A Kubernetes manifest as a plain dict.

    Returns:
        ``(True, "")`` if the manifest is accepted, otherwise
        ``(False, reason)`` with a human-readable rejection reason.
    """
    kind = m.get("kind")
    if not kind:
        return False, "missing 'kind'"
    api = m.get("apiVersion")
    if not api:
        return False, "missing 'apiVersion'"
    meta = m.get("metadata", {})
    if not meta.get("name"):
        return False, "missing 'metadata.name'"
    ns = meta.get("namespace", "default")
    if ns not in ALLOWED_NAMESPACES:
        return False, f"namespace '{ns}' not in allowed list {ALLOWED_NAMESPACES}"

    # Get pod spec for security and resource checks.
    # Handle multiple resource shapes, matching manifest_processor._get_all_containers:
    # - Deployments / StatefulSets / ReplicaSets / DaemonSets / Jobs: spec.template.spec
    # - CronJob: spec.jobTemplate.spec.template.spec
    # - Pod (bare): spec (has 'containers' directly)
    spec = m.get("spec", {})
    pod_spec = None
    if "template" in spec:
        pod_spec = spec["template"].get("spec", {})
    elif "jobTemplate" in spec:
        pod_spec = spec["jobTemplate"].get("spec", {}).get("template", {}).get("spec", {})
    elif "containers" in spec:
        # Plain Pod manifest
        pod_spec = spec

    if pod_spec:
        all_containers = _iter_containers(pod_spec)

        # --- Pod-level security policy checks ---
        # Mirror manifest_processor._validate_security_context so the SQS
        # path enforces the same policy as the REST path.
        if BLOCK_HOST_NETWORK and pod_spec.get("hostNetwork", False):
            return False, "hostNetwork is not permitted"
        if BLOCK_HOST_PID and pod_spec.get("hostPID", False):
            return False, "hostPID is not permitted"
        if BLOCK_HOST_IPC and pod_spec.get("hostIPC", False):
            return False, "hostIPC is not permitted"
        if BLOCK_HOST_PATH:
            for volume in pod_spec.get("volumes", []) or []:
                if volume.get("hostPath") is not None:
                    return False, "hostPath volumes are not permitted"

        pod_security_context = pod_spec.get("securityContext", {}) or {}
        if BLOCK_PRIVILEGED and pod_security_context.get("privileged", False):
            return False, "privileged pod security context is not permitted"
        if BLOCK_RUN_AS_ROOT:
            pod_run_as_user = pod_security_context.get("runAsUser")
            if pod_run_as_user is not None and pod_run_as_user == 0:
                return False, "running as root (runAsUser: 0) is not permitted"

        # --- Container-level security policy checks ---
        # Every toggle is applied to every container kind (regular, init,
        # ephemeral). An init container running as root or with CAP_SYS_ADMIN
        # has the same blast radius as a regular container running the same
        # way; there is no reason to give any kind a free pass.
        # NOTE: the loop variable is `ckind` (container kind), not `kind` —
        # the old code shadowed the manifest-level `kind` here.
        for ckind, c in all_containers:
            cname = c.get("name", "unknown")
            sc = c.get("securityContext", {}) or {}
            if BLOCK_PRIVILEGED and sc.get("privileged", False):
                return False, f"{ckind} '{cname}': privileged containers are not permitted"
            if BLOCK_PRIVILEGE_ESCALATION and sc.get("allowPrivilegeEscalation", False):
                return False, f"{ckind} '{cname}': allowPrivilegeEscalation is not permitted"
            if BLOCK_ADDED_CAPABILITIES:
                added_caps = (sc.get("capabilities", {}) or {}).get("add", []) or []
                if added_caps:
                    return False, f"{ckind} '{cname}': added capabilities are not permitted"
            if BLOCK_RUN_AS_ROOT:
                ras = sc.get("runAsUser")
                if ras is not None and ras == 0:
                    return (
                        False,
                        f"{ckind} '{cname}': running as root (runAsUser: 0) is not permitted",
                    )

        # Enforce image registry allowlist (matches manifest_processor semantics)
        for ckind, c in all_containers:
            image = c.get("image", "")
            if not _is_image_trusted(image):
                cname = c.get("name", "unknown")
                return (
                    False,
                    f"{ckind} '{cname}': untrusted image source '{image}'",
                )

        # --- Resource caps across ALL container kinds ---
        # BUGFIX: a malformed or merely-unusual quantity (e.g. "1G", "5k",
        # "2Ti", or garbage) used to raise an uncaught ValueError out of the
        # processor, burning all SQS retries with no logged reason. It is a
        # validation failure and is reported as one.
        try:
            total_gpu, total_cpu, total_memory = _sum_container_resources(all_containers)
        except (ValueError, TypeError) as exc:
            return False, f"invalid resource quantity: {exc}"

        errors = []
        if total_gpu > MAX_GPU:
            errors.append(f"GPU {total_gpu} exceeds max {MAX_GPU}")
        if total_cpu > MAX_CPU:
            errors.append(f"CPU {total_cpu}m exceeds max {MAX_CPU}m")
        if total_memory > MAX_MEMORY:
            errors.append(
                f"Memory {total_memory / (1024**3):.0f}Gi "
                f"exceeds max {MAX_MEMORY / (1024**3):.0f}Gi"
            )
        if errors:
            hint = (
                "To raise limits, update queue_processor in cdk.json "
                "and redeploy (see examples/README.md#troubleshooting)"
            )
            return False, "; ".join(errors) + f". {hint}"

    return True, ""
435def _extract_pod_spec(manifest: dict[str, Any]) -> dict[str, Any] | None:
436 """Return the pod spec for any supported workload kind, or None.
438 Mirrors manifest_processor._extract_pod_spec so the SQS path and the
439 REST path apply the same injection semantics.
440 """
441 spec = manifest.get("spec")
442 if not isinstance(spec, dict):
443 return None
445 kind = manifest.get("kind", "")
447 # CronJob: spec.jobTemplate.spec.template.spec
448 if kind == "CronJob":
449 job_template = spec.get("jobTemplate")
450 if isinstance(job_template, dict): 450 ↛ 458line 450 didn't jump to line 458 because the condition on line 450 was always true
451 job_spec = job_template.get("spec")
452 if isinstance(job_spec, dict): 452 ↛ 458line 452 didn't jump to line 458 because the condition on line 452 was always true
453 template = job_spec.get("template")
454 if isinstance(template, dict): 454 ↛ 458line 454 didn't jump to line 458 because the condition on line 454 was always true
455 pod_spec = template.get("spec")
456 if isinstance(pod_spec, dict): 456 ↛ 458line 456 didn't jump to line 458 because the condition on line 456 was always true
457 return pod_spec
458 return None
460 # Deployment / StatefulSet / DaemonSet / ReplicaSet / Job: spec.template.spec
461 if "template" in spec:
462 template = spec.get("template")
463 if isinstance(template, dict): 463 ↛ 467line 463 didn't jump to line 467 because the condition on line 463 was always true
464 pod_spec = template.get("spec")
465 if isinstance(pod_spec, dict): 465 ↛ 467line 465 didn't jump to line 467 because the condition on line 465 was always true
466 return pod_spec
467 return None
469 # Bare Pod: spec contains "containers" directly
470 if "containers" in spec: 470 ↛ 473line 470 didn't jump to line 473 because the condition on line 470 was always true
471 return spec
473 return None
def _inject_security_defaults(manifest: dict[str, Any]) -> dict[str, Any]:
    """Apply security defaults to a user-submitted manifest in place.

    Sets ``automountServiceAccountToken: false`` on the pod spec via
    ``setdefault`` — an explicit user choice (either way) is left untouched.
    Mirrors manifest_processor._inject_security_defaults so SQS-submitted
    jobs get the same SA-token-theft protection as REST-submitted ones.
    Returns the (possibly mutated) manifest for call-chaining.
    """
    target = _extract_pod_spec(manifest)
    if target is None:
        # Unrecognized shape — nothing to inject into.
        return manifest
    target.setdefault("automountServiceAccountToken", False)
    return manifest
def apply_manifest(m: dict[str, Any]) -> str:
    """Apply a single manifest using the dynamic Kubernetes client.

    Args:
        m: Validated Kubernetes manifest dict. Mutated in place by the
           security-default injection below.

    Returns:
        A short status string: "CREATED kind/name", "UPDATED kind/name",
        "SKIP unknown resource ...", "CREATE_FAILED ...", or
        "PATCH_FAILED ...". Callers treat any result containing "FAILED"
        as a failure (see process_one_message).
    """
    # Inject security defaults BEFORE applying so user pods never
    # auto-mount the default SA token (T-022 / M-113 parity with the
    # REST manifest_processor path).
    _inject_security_defaults(m)

    dyn = dynamic.DynamicClient(client.ApiClient())
    api_version = m["apiVersion"]
    kind = m["kind"]
    name = m["metadata"]["name"]
    namespace = m["metadata"].get("namespace", "default")

    # Resolve the dynamic resource for this apiVersion/kind; unknown kinds
    # (e.g. a CRD not installed in this cluster) are skipped, not failed.
    try:
        resource = dyn.resources.get(api_version=api_version, kind=kind)
    except ResourceNotFoundError:
        return f"SKIP unknown resource {api_version}/{kind}"

    # For Jobs, delete completed/failed ones first so re-submission works.
    # Without this, re-submitting the same job name would fail with a 409 conflict
    # because Kubernetes doesn't allow creating a Job with the same name as an
    # existing one (even if it's finished).
    if kind == "Job":
        try:
            existing = resource.get(name=name, namespace=namespace)
            conditions = existing.get("status", {}).get("conditions", [])
            # A Job is finished once it carries a Complete or Failed condition.
            finished = any(c.get("type") in ("Complete", "Failed") for c in conditions)
            if finished:
                log.info(f"Deleting finished Job {namespace}/{name} before re-creation")
                resource.delete(
                    name=name,
                    namespace=namespace,
                    body=client.V1DeleteOptions(propagation_policy="Background"),
                )
                # Brief pause so the delete can propagate before the create
                # below — 2s is a heuristic, not a guarantee; a lingering
                # object still falls through to the 409/patch path.
                time.sleep(2)
        except (NotFoundError, ApiException):
            # Best-effort cleanup only: no existing Job (or a read/delete
            # error) is fine — the create below surfaces any real problem.
            pass

    # Create-or-update pattern: try create first, fall back to patch on 409 (conflict).
    # This is idempotent — safe to retry without side effects.
    try:
        if resource.namespaced:
            resource.create(body=m, namespace=namespace)
        else:
            resource.create(body=m)
        return f"CREATED {kind}/{name}"
    except ApiException as e:
        if e.status == 409:
            # Name already exists — switch to an update (patch).
            try:
                if resource.namespaced:
                    resource.patch(body=m, name=name, namespace=namespace)
                else:
                    resource.patch(body=m, name=name)
                return f"UPDATED {kind}/{name}"
            except ApiException as patch_err:
                return f"PATCH_FAILED {kind}/{name}: {patch_err.reason}"
        return f"CREATE_FAILED {kind}/{name}: {e.reason}"
def process_one_message() -> bool:
    """Receive and process a single SQS message. Returns True on success.

    The message is deleted only when every manifest validated and applied
    cleanly; otherwise it is left on the queue to retry after the
    visibility timeout and eventually reach the DLQ.
    """
    if not QUEUE_URL:
        log.error("JOB_QUEUE_URL not set")
        return False

    sqs = boto3.client("sqs", region_name=REGION)

    response = sqs.receive_message(
        QueueUrl=QUEUE_URL,
        MaxNumberOfMessages=1,
        WaitTimeSeconds=5,
        MessageAttributeNames=["All"],
    )

    batch = response.get("Messages", [])
    if not batch:
        log.info("No messages in queue")
        return True

    message = batch[0]
    receipt_handle = message["ReceiptHandle"]
    payload = json.loads(message["Body"])

    job_id = payload.get("job_id", "unknown")
    manifests = payload.get("manifests", [])
    log.info(f"Processing job_id={job_id}, manifests={len(manifests)}")

    outcomes: list[str] = []
    had_failure = False
    for idx, manifest in enumerate(manifests):
        ok, reason = validate_manifest(manifest)
        if ok:
            outcome = apply_manifest(manifest)
            log.info(f"  manifest[{idx}]: {outcome}")
            outcomes.append(outcome)
            had_failure = had_failure or "FAILED" in outcome
        else:
            log.error(f"  manifest[{idx}] validation failed: {reason}")
            outcomes.append(f"INVALID: {reason}")
            had_failure = True

    if had_failure:
        # Don't delete the SQS message — it will become visible again after the
        # visibility timeout (5 min) and retry. After 3 total failures, SQS
        # moves it to the dead-letter queue for manual inspection.
        log.error(f"Job {job_id} had failures — message will return to queue")
        return False

    sqs.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=receipt_handle)
    log.info(f"Job {job_id} processed successfully")
    return True
def main() -> None:
    """Entry point: load cluster config, drain one message, exit 1 on failure."""
    load_k8s()
    # A nonzero exit makes the KEDA-managed pod report failure so the SQS
    # message is retried; a clean return exits 0.
    if not process_one_message():
        sys.exit(1)
# Script entry point — invoked once per KEDA ScaledJob pod.
if __name__ == "__main__":
    main()