Coverage for gco/services/queue_processor.py: 94%
295 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
1"""
2Queue Processor Service for GCO (Global Capacity Orchestrator on AWS).
4Polls the regional SQS job queue, reads Kubernetes manifests from messages,
5validates them, and applies them to the cluster. Designed to run as a
6short-lived pod managed by a KEDA ScaledJob that scales based on queue depth.
8Each invocation processes a single SQS message (which may contain multiple
9manifests). On success the message is deleted; on failure it returns to the
10queue after the visibility timeout (5 min) and eventually lands in the DLQ
11after 3 failed attempts.
13Message format (produced by `gco jobs submit-sqs`):
14 {
15 "job_id": "abc123",
16 "manifests": [<k8s manifest dicts>],
17 "namespace": "gco-jobs",
18 "priority": 0,
19 "submitted_at": "2026-03-26T12:00:00+00:00"
20 }
22Configuration via environment variables:
23 JOB_QUEUE_URL: SQS queue URL to consume from (required)
24 AWS_REGION: AWS region (default: us-east-1)
25 ALLOWED_NAMESPACES: Comma-separated namespace allowlist
26 (default: default,gco-jobs)
27 MAX_GPU_PER_MANIFEST: Max GPUs summed across all containers
28 (regular + init + ephemeral) (default: 4)
29 MAX_CPU_PER_MANIFEST: Max CPU summed across all containers; accepts
30 K8s suffixes ("500m" or "10" for cores)
31 (default: 10000 millicores = 10 cores)
32 MAX_MEMORY_PER_MANIFEST: Max memory summed across all containers;
33 accepts K8s suffixes ("32Gi", "256Mi") or
34 a bare byte count (default: 32Gi)
35 TRUSTED_REGISTRIES: Comma-separated list of registry domains
36 (e.g. "nvcr.io,public.ecr.aws"). Empty/unset
37 disables the image registry check (fail-open).
38 Keep in sync with
39 cdk.json::job_validation_policy.trusted_registries.
40 TRUSTED_DOCKERHUB_ORGS: Comma-separated list of Docker Hub org names
41 (e.g. "nvidia,pytorch"). Empty/unset disables
42 the check. Keep in sync with
43 cdk.json::job_validation_policy.trusted_dockerhub_orgs.
45Security policy toggles (all default to true except ``BLOCK_RUN_AS_ROOT``
46which defaults to false, matching job_validation_policy.manifest_security_policy
47in cdk.json). Each one controls whether the corresponding pod/container
48setting is rejected; the REST manifest_processor enforces an identical set
49so both submission paths apply the same policy:
51 BLOCK_PRIVILEGED: Reject ``securityContext.privileged: true``
52 on pod or container (default: true)
53 BLOCK_PRIVILEGE_ESCALATION: Reject containers with
54 allowPrivilegeEscalation=true
55 (default: true)
56 BLOCK_HOST_NETWORK: Block pods with hostNetwork=true
57 (default: true)
58 BLOCK_HOST_PID: Block pods with hostPID=true
59 (default: true)
60 BLOCK_HOST_IPC: Block pods with hostIPC=true
61 (default: true)
62 BLOCK_HOST_PATH: Block volumes referencing hostPath
63 (default: true)
64 BLOCK_ADDED_CAPABILITIES: Block containers that add Linux
65 capabilities via securityContext.capabilities.add
66 (default: true)
67 BLOCK_RUN_AS_ROOT: Reject runAsUser: 0 at pod or container
68 level (default: false — many public
69 images still run as root)
70"""
72from __future__ import annotations
74import json
75import logging
76import os
77import sys
78import time
79from typing import Any
81import boto3
82from kubernetes import client, config, dynamic
83from kubernetes.client.rest import ApiException
84from kubernetes.dynamic.exceptions import NotFoundError, ResourceNotFoundError
# Configure root logging once at import time: every record carries a
# timestamp, its level, and a fixed "[queue-processor]" tag.
logging.basicConfig(
    format="%(asctime)s %(levelname)s [queue-processor] %(message)s",
    level=logging.INFO,
)
log = logging.getLogger("queue-processor")
93def _parse_cpu_string(cpu_str: str) -> int:
94 """Parse a Kubernetes-style CPU string to millicores.
96 Accepts:
97 - Millicore suffix: "500m" -> 500
98 - Whole cores: "4" -> 4000
99 - Bare millicore counts: "10000" (when > 999) stays as millicores
100 """
101 if not cpu_str:
102 return 0
103 s = cpu_str.strip()
104 if s.endswith("m"):
105 return int(s[:-1])
106 return int(s) * 1000
109def _parse_memory_string(memory_str: str) -> int:
110 """Parse a Kubernetes-style memory string to bytes.
112 Accepts binary suffixes (Ki, Mi, Gi, Ti), decimal suffixes (k, M, G),
113 or a bare byte count.
114 """
115 if not memory_str:
116 return 0
117 s = memory_str.strip()
118 if s.endswith("Ki"):
119 return int(s[:-2]) * 1024
120 if s.endswith("Mi"):
121 return int(s[:-2]) * 1024**2
122 if s.endswith("Gi"):
123 return int(s[:-2]) * 1024**3
124 if s.endswith("Ti"):
125 return int(s[:-2]) * 1024**4
126 if s.endswith("k"): 126 ↛ 127line 126 didn't jump to line 127 because the condition on line 126 was never true
127 return int(s[:-1]) * 1000
128 if s.endswith("M"): 128 ↛ 129line 128 didn't jump to line 129 because the condition on line 128 was never true
129 return int(s[:-1]) * 1000**2
130 if s.endswith("G"):
131 return int(s[:-1]) * 1000**3
132 return int(s)
# --- Configuration from environment ---
# These are set by the KEDA ScaledJob manifest (post-helm-sqs-consumer.yaml)
# and populated from cdk.json queue_processor settings during CDK deploy.
QUEUE_URL = os.environ.get("JOB_QUEUE_URL", "")
REGION = os.environ.get("AWS_REGION", os.environ.get("AWS_DEFAULT_REGION", "us-east-1"))

# Entries are stripped and empties dropped, so "default, gco-jobs" or a
# trailing comma cannot produce unmatchable names such as " gco-jobs" or "".
ALLOWED_NAMESPACES = {
    ns.strip()
    for ns in os.environ.get("ALLOWED_NAMESPACES", "default,gco-jobs").split(",")
    if ns.strip()
}

# Per-manifest resource caps. The CPU default carries an explicit "m" suffix
# so it is unambiguously 10000 millicores (10 cores).
MAX_CPU = _parse_cpu_string(os.environ.get("MAX_CPU_PER_MANIFEST", "10000m"))  # millicores
MAX_MEMORY = _parse_memory_string(os.environ.get("MAX_MEMORY_PER_MANIFEST", "32Gi"))  # bytes
MAX_GPU = int(os.environ.get("MAX_GPU_PER_MANIFEST", "4"))

# Trusted image sources (populated from
# cdk.json::job_validation_policy.trusted_registries / trusted_dockerhub_orgs
# at deploy time). Comma-separated env vars; empty/unset disables the check
# (fail-open). Keep in sync with
# gco/services/manifest_processor.py::_validate_image_sources.
TRUSTED_REGISTRIES = [
    r.strip() for r in os.environ.get("TRUSTED_REGISTRIES", "").split(",") if r.strip()
]
TRUSTED_DOCKERHUB_ORGS = [
    o.strip() for o in os.environ.get("TRUSTED_DOCKERHUB_ORGS", "").split(",") if o.strip()
]
156def _env_bool(name: str, default: bool) -> bool:
157 """Parse a boolean environment variable.
159 Empty/unset returns ``default``. Recognized truthy values: "true", "1",
160 "yes", "on" (case-insensitive). Everything else is falsy.
161 """
162 raw = os.environ.get(name)
163 if raw is None or raw == "":
164 return default
165 return raw.strip().lower() in ("true", "1", "yes", "on")
# Security-policy toggles, one per attribute of
# cdk.json::job_validation_policy.manifest_security_policy that the REST
# manifest_processor also enforces. The SQS path and the REST path MUST apply
# the same policy: anyone holding sqs:SendMessage on the job queue must not be
# able to skip a check the REST path performs. Structural parity is pinned by
# tests/test_queue_processor.py::TestSecurityPolicyParityWithManifestProcessor.
#
# Everything is blocked by default except running as root, which stays
# allowed unless BLOCK_RUN_AS_ROOT is enabled (many public images run as root).
BLOCK_PRIVILEGED = _env_bool("BLOCK_PRIVILEGED", default=True)
BLOCK_PRIVILEGE_ESCALATION = _env_bool("BLOCK_PRIVILEGE_ESCALATION", default=True)
BLOCK_HOST_NETWORK = _env_bool("BLOCK_HOST_NETWORK", default=True)
BLOCK_HOST_PID = _env_bool("BLOCK_HOST_PID", default=True)
BLOCK_HOST_IPC = _env_bool("BLOCK_HOST_IPC", default=True)
BLOCK_HOST_PATH = _env_bool("BLOCK_HOST_PATH", default=True)
BLOCK_ADDED_CAPABILITIES = _env_bool("BLOCK_ADDED_CAPABILITIES", default=True)
BLOCK_RUN_AS_ROOT = _env_bool("BLOCK_RUN_AS_ROOT", default=False)
184def _is_registry_domain(entry: str) -> bool:
185 """True if the entry looks like a registry domain (has '.' or ':')."""
186 return "." in entry or ":" in entry
189def _iter_containers(pod_spec: dict[str, Any]) -> list[tuple[str, dict[str, Any]]]:
190 """Yield (kind, container_dict) for every container, initContainer, and
191 ephemeralContainer in a pod spec."""
192 out: list[tuple[str, dict[str, Any]]] = []
193 for c in pod_spec.get("containers", []) or []:
194 out.append(("container", c))
195 for c in pod_spec.get("initContainers", []) or []:
196 out.append(("initContainer", c))
197 for c in pod_spec.get("ephemeralContainers", []) or []: 197 ↛ 198line 197 didn't jump to line 198 because the loop on line 197 never started
198 out.append(("ephemeralContainer", c))
199 return out
def _is_image_trusted(image: str) -> bool:
    """Decide whether a container image reference comes from a trusted source.

    Rules, applied in order (intended to match
    manifest_processor._validate_image_sources):
      * TRUSTED_REGISTRIES and TRUSTED_DOCKERHUB_ORGS both empty -> trusted
        (the check is disabled; fail-open).
      * Empty image, or an image with no '/' (an official Docker Hub image
        such as ``ubuntu``) -> trusted.
      * First path segment contains '.' or ':' (a registry domain) ->
        trusted only if it equals a TRUSTED_REGISTRIES entry, or the image
        starts with ``<entry>/`` (this supports multi-segment entries such
        as ``public.ecr.aws/lambda``).
      * Otherwise the first segment is a Docker Hub org -> trusted only if
        it is listed in TRUSTED_DOCKERHUB_ORGS.
    """
    if not (TRUSTED_REGISTRIES or TRUSTED_DOCKERHUB_ORGS):
        return True
    if not image or "/" not in image:
        return True
    head, _, _ = image.partition("/")
    if not _is_registry_domain(head):
        return head in TRUSTED_DOCKERHUB_ORGS
    return any(
        head == registry or image.startswith(f"{registry}/")
        for registry in TRUSTED_REGISTRIES
    )
def load_k8s() -> None:
    """Configure the Kubernetes client, preferring in-cluster credentials.

    Falls back to the local kubeconfig when in-cluster configuration raises
    ``config.ConfigException`` (e.g. when running outside a pod).
    """
    try:
        config.load_incluster_config()
    except config.ConfigException:
        config.load_kube_config()
        log.info("Loaded local kubeconfig")
    else:
        log.info("Loaded in-cluster Kubernetes configuration")
def _pod_specs_to_validate(manifest: dict[str, Any]) -> list[dict[str, Any]]:
    """Return every pod spec embedded in *manifest*, wherever it is placed.

    Candidate locations (all of them are returned when present):
      - Pod (bare): ``spec`` itself, when ``kind`` is ``Pod`` or ``spec`` has
        a ``containers`` key.
      - Deployment / StatefulSet / ReplicaSet / DaemonSet / Job:
        ``spec.template.spec``.
      - CronJob: ``spec.jobTemplate.spec.template.spec``.

    Returning only the first match (as an if/elif chain would) lets a
    submitter pass validation with a harmless decoy, e.g. a ``spec.template``
    on a bare Pod, while the pod spec that applies to the kind is never
    inspected. Checking every candidate closes that gap.
    """
    spec = manifest.get("spec")
    if not isinstance(spec, dict):
        return []

    candidates: list[dict[str, Any]] = []
    if manifest.get("kind") == "Pod" or "containers" in spec:
        candidates.append(spec)

    template = spec.get("template")
    if isinstance(template, dict) and isinstance(template.get("spec"), dict):
        candidates.append(template["spec"])

    job_template = spec.get("jobTemplate")
    job_spec = job_template.get("spec") if isinstance(job_template, dict) else None
    job_pod_template = job_spec.get("template") if isinstance(job_spec, dict) else None
    if isinstance(job_pod_template, dict) and isinstance(job_pod_template.get("spec"), dict):
        candidates.append(job_pod_template["spec"])

    return candidates


def _pod_security_violation(pod_spec: dict[str, Any]) -> str | None:
    """Return the first pod-level security-policy violation, or None."""
    # Mirrors manifest_processor._validate_security_context so the SQS path
    # enforces the same policy as the REST path.
    if BLOCK_HOST_NETWORK and pod_spec.get("hostNetwork", False):
        return "hostNetwork is not permitted"
    if BLOCK_HOST_PID and pod_spec.get("hostPID", False):
        return "hostPID is not permitted"
    if BLOCK_HOST_IPC and pod_spec.get("hostIPC", False):
        return "hostIPC is not permitted"
    if BLOCK_HOST_PATH:
        for volume in pod_spec.get("volumes", []) or []:
            if isinstance(volume, dict) and volume.get("hostPath") is not None:
                return "hostPath volumes are not permitted"
    pod_security_context = pod_spec.get("securityContext", {}) or {}
    if BLOCK_PRIVILEGED and pod_security_context.get("privileged", False):
        return "privileged pod security context is not permitted"
    # None != 0, so an unset runAsUser never matches.
    if BLOCK_RUN_AS_ROOT and pod_security_context.get("runAsUser") == 0:
        return "running as root (runAsUser: 0) is not permitted"
    return None


def _container_security_violation(container_kind: str, container: dict[str, Any]) -> str | None:
    """Return the first container-level security-policy violation, or None."""
    # Applied to every container kind: an init container running as root or
    # with CAP_SYS_ADMIN has the same blast radius as a regular container.
    cname = container.get("name", "unknown")
    sc = container.get("securityContext", {}) or {}
    if BLOCK_PRIVILEGED and sc.get("privileged", False):
        return f"{container_kind} '{cname}': privileged containers are not permitted"
    if BLOCK_PRIVILEGE_ESCALATION and sc.get("allowPrivilegeEscalation", False):
        return f"{container_kind} '{cname}': allowPrivilegeEscalation is not permitted"
    if BLOCK_ADDED_CAPABILITIES:
        added_caps = (sc.get("capabilities", {}) or {}).get("add", []) or []
        if added_caps:
            return f"{container_kind} '{cname}': added capabilities are not permitted"
    if BLOCK_RUN_AS_ROOT and sc.get("runAsUser") == 0:
        return f"{container_kind} '{cname}': running as root (runAsUser: 0) is not permitted"
    return None


def _untrusted_image_violation(containers: list[tuple[str, dict[str, Any]]]) -> str | None:
    """Return a reason for the first container whose image is untrusted, or None."""
    for container_kind, container in containers:
        image = container.get("image", "")
        if not _is_image_trusted(image):
            cname = container.get("name", "unknown")
            return f"{container_kind} '{cname}': untrusted image source '{image}'"
    return None


def _quantity_to_millicores(value: Any) -> int:
    """Convert a container CPU quantity ("500m", "2", 0.5) to millicores."""
    if isinstance(value, str) and value.endswith("m"):
        return int(value[:-1])
    return int(float(value) * 1000)


def _quantity_to_bytes(value: Any) -> int:
    """Convert a container memory quantity ("512Mi", "1G", 1024) to bytes.

    Handles binary suffixes (Ki..Ei), decimal suffixes (k..E), fractional
    amounts ("1.5Gi"), bare byte counts and non-string numbers. Every
    standard suffix must be recognised: one that is not (e.g. "64G") would
    otherwise fall through to int() and raise.
    """
    if not isinstance(value, str):
        return int(value)
    suffixes = (
        ("Ki", 1024),
        ("Mi", 1024**2),
        ("Gi", 1024**3),
        ("Ti", 1024**4),
        ("Pi", 1024**5),
        ("Ei", 1024**6),
        ("k", 1000),
        ("M", 1000**2),
        ("G", 1000**3),
        ("T", 1000**4),
        ("P", 1000**5),
        ("E", 1000**6),
    )
    for suffix, multiplier in suffixes:
        if value.endswith(suffix):
            return int(float(value[: -len(suffix)]) * multiplier)
    try:
        return int(value)
    except ValueError:
        return int(float(value))


def _sum_requested_resources(
    containers: list[tuple[str, dict[str, Any]]],
) -> tuple[int, int, int]:
    """Return total (GPUs, CPU millicores, memory bytes) across *containers*.

    For each resource, a container's limit is used when it is set to a
    truthy value; otherwise its request is used (a missing request counts
    as 0).

    Raises:
        ValueError, TypeError, OverflowError: on a quantity that cannot be
            parsed or is negative.
    """
    total_gpu = total_cpu = total_memory = 0
    for _container_kind, container in containers:
        res = container.get("resources", {}) or {}
        limits = res.get("limits", {}) or {}
        requests = res.get("requests", {}) or {}
        gpu = int(limits.get("nvidia.com/gpu") or requests.get("nvidia.com/gpu", "0"))  # nosec B113 - dict.get(), not HTTP requests
        cpu = _quantity_to_millicores(limits.get("cpu") or requests.get("cpu", "0"))  # nosec B113 - dict.get(), not HTTP requests
        memory = _quantity_to_bytes(limits.get("memory") or requests.get("memory", "0"))  # nosec B113 - dict.get(), not HTTP requests
        # A negative quantity would offset other containers and sneak the
        # total under the cap.
        if gpu < 0 or cpu < 0 or memory < 0:
            raise ValueError("negative resource quantity")
        total_gpu += gpu
        total_cpu += cpu
        total_memory += memory
    return total_gpu, total_cpu, total_memory


def validate_manifest(m: dict[str, Any]) -> tuple[bool, str]:
    """Validate a manifest before applying it to the cluster.

    The queue processor mirrors the security checks performed by the REST
    `manifest_processor` service (``gco/services/manifest_processor.py``)
    so that the SQS path cannot bypass them. Checks performed:

    0. **Shape** — the manifest must be a JSON object. Malformed input
       (non-dict manifest, metadata, or container entries; unparseable
       resource quantities) is rejected with a reason instead of raising.

    1. **Namespace allowlist** — manifest namespace must be in
       ``ALLOWED_NAMESPACES`` (from ``ALLOWED_NAMESPACES`` env var,
       populated from ``cdk.json::job_validation_policy.allowed_namespaces``,
       shared with the REST manifest_processor).

    2. **Pod-level security policy** (configurable via cdk.json::
       job_validation_policy.manifest_security_policy, shared between both
       services). Rejects ``hostNetwork``, ``hostPID``, ``hostIPC``,
       ``hostPath`` volumes, privileged pod security context, and
       (if ``BLOCK_RUN_AS_ROOT``) pod-level ``runAsUser: 0``. Applied to
       EVERY pod spec in the manifest (bare Pod ``spec``,
       ``spec.template.spec``, CronJob ``spec.jobTemplate.spec.template.spec``),
       so a decoy template cannot shadow the pod spec that is actually used.

    3. **Container-level security policy** — for every container kind
       (regular, init, ephemeral) rejects ``privileged``,
       ``allowPrivilegeEscalation``, ``capabilities.add``, and (if
       ``BLOCK_RUN_AS_ROOT``) container-level ``runAsUser: 0``. Iterating
       every container kind catches the classic "smuggle it via an init
       container" bypass.

    4. **Image registry allowlist** — every container's image must come
       from ``TRUSTED_REGISTRIES`` (registry domains like ``nvcr.io``)
       or ``TRUSTED_DOCKERHUB_ORGS`` (Docker Hub orgs like ``nvidia``).
       Official Docker Hub images with no slash are always allowed. When
       both allowlists are empty the check is disabled. Keep the lists in
       sync with ``cdk.json::job_validation_policy.trusted_registries`` and
       ``trusted_dockerhub_orgs`` — CDK wires the same config into both
       services.

    5. **Resource caps** — the TOTAL CPU, memory, and GPU across ALL
       containers (regular + init + ephemeral) must not exceed
       ``MAX_CPU``, ``MAX_MEMORY``, and ``MAX_GPU``. This matches
       ``manifest_processor._validate_resource_limits``. K8s accounts for
       init/ephemeral resources differently at scheduling time, but for
       enforcement we sum them so an operator's ``max_*_per_manifest``
       budget is a hard cap regardless of where the request is placed.
       Memory accepts all Kubernetes suffixes (Ki..Ei, k..E); negative or
       unparseable quantities are rejected.

    Returns:
        ``(True, "")`` if the manifest is accepted, otherwise
        ``(False, reason)`` where ``reason`` is a human-readable string.
    """
    if not isinstance(m, dict):
        return False, "manifest must be a JSON object"
    if not m.get("kind"):
        return False, "missing 'kind'"
    if not m.get("apiVersion"):
        return False, "missing 'apiVersion'"
    meta = m.get("metadata")
    if not isinstance(meta, dict):
        meta = {}
    if not meta.get("name"):
        return False, "missing 'metadata.name'"
    ns = meta.get("namespace", "default")
    if ns not in ALLOWED_NAMESPACES:
        return False, f"namespace '{ns}' not in allowed list {ALLOWED_NAMESPACES}"

    all_containers: list[tuple[str, dict[str, Any]]] = []
    for pod_spec in _pod_specs_to_validate(m):
        reason = _pod_security_violation(pod_spec)
        if reason:
            return False, reason

        containers = _iter_containers(pod_spec)
        for container_kind, container in containers:
            if not isinstance(container, dict):
                return False, f"{container_kind} entries must be JSON objects"
            reason = _container_security_violation(container_kind, container)
            if reason:
                return False, reason

        reason = _untrusted_image_violation(containers)
        if reason:
            return False, reason

        all_containers.extend(containers)

    # Sum requests/limits of every container kind. This is stricter than the
    # K8s scheduler's accounting but matches the security intent: the
    # configured per-manifest maximum is a hard cap on the total request.
    try:
        total_gpu, total_cpu, total_memory = _sum_requested_resources(all_containers)
    except (TypeError, ValueError, OverflowError) as exc:
        return False, f"invalid resource quantity: {exc}"

    errors = []
    if total_gpu > MAX_GPU:
        errors.append(f"GPU {total_gpu} exceeds max {MAX_GPU}")
    if total_cpu > MAX_CPU:
        errors.append(f"CPU {total_cpu}m exceeds max {MAX_CPU}m")
    if total_memory > MAX_MEMORY:
        errors.append(
            f"Memory {total_memory / (1024**3):.0f}Gi "
            f"exceeds max {MAX_MEMORY / (1024**3):.0f}Gi"
        )
    if errors:
        hint = (
            "To raise limits, update queue_processor in cdk.json "
            "and redeploy (see examples/README.md#troubleshooting)"
        )
        return False, "; ".join(errors) + f". {hint}"

    return True, ""
429def _extract_pod_spec(manifest: dict[str, Any]) -> dict[str, Any] | None:
430 """Return the pod spec for any supported workload kind, or None.
432 Mirrors manifest_processor._extract_pod_spec so the SQS path and the
433 REST path apply the same injection semantics.
434 """
435 spec = manifest.get("spec")
436 if not isinstance(spec, dict):
437 return None
439 kind = manifest.get("kind", "")
441 # CronJob: spec.jobTemplate.spec.template.spec
442 if kind == "CronJob":
443 job_template = spec.get("jobTemplate")
444 if isinstance(job_template, dict): 444 ↛ 452line 444 didn't jump to line 452 because the condition on line 444 was always true
445 job_spec = job_template.get("spec")
446 if isinstance(job_spec, dict): 446 ↛ 452line 446 didn't jump to line 452 because the condition on line 446 was always true
447 template = job_spec.get("template")
448 if isinstance(template, dict): 448 ↛ 452line 448 didn't jump to line 452 because the condition on line 448 was always true
449 pod_spec = template.get("spec")
450 if isinstance(pod_spec, dict): 450 ↛ 452line 450 didn't jump to line 452 because the condition on line 450 was always true
451 return pod_spec
452 return None
454 # Deployment / StatefulSet / DaemonSet / ReplicaSet / Job: spec.template.spec
455 if "template" in spec:
456 template = spec.get("template")
457 if isinstance(template, dict): 457 ↛ 461line 457 didn't jump to line 461 because the condition on line 457 was always true
458 pod_spec = template.get("spec")
459 if isinstance(pod_spec, dict): 459 ↛ 461line 459 didn't jump to line 461 because the condition on line 459 was always true
460 return pod_spec
461 return None
463 # Bare Pod: spec contains "containers" directly
464 if "containers" in spec: 464 ↛ 467line 464 didn't jump to line 467 because the condition on line 464 was always true
465 return spec
467 return None
def _inject_security_defaults(manifest: dict[str, Any]) -> dict[str, Any]:
    """Apply pod-spec security defaults to *manifest* in place and return it.

    The only default today is ``automountServiceAccountToken: false``. It is
    written with ``setdefault``, so an explicit user value (true or false)
    is left untouched. Manifests without a recognisable pod spec are
    returned unchanged.

    Intended to match manifest_processor._inject_security_defaults, so jobs
    submitted via SQS get the same SA-token-theft protection as jobs
    submitted via the REST API.
    """
    target = _extract_pod_spec(manifest)
    if target is None:
        return manifest
    target.setdefault("automountServiceAccountToken", False)
    return manifest
def apply_manifest(m: dict[str, Any]) -> str:
    """Apply a single manifest using the dynamic Kubernetes client.

    The manifest is first mutated in place (security defaults injected via
    _inject_security_defaults) and then created. If creation conflicts
    (HTTP 409), the existing object is patched instead.

    Args:
        m: Manifest dict. ``apiVersion``, ``kind`` and ``metadata.name`` are
            read with ``[]`` (KeyError if missing). process_one_message runs
            validate_manifest on each manifest before applying it.

    Returns:
        A one-line status string:
          ``"SKIP unknown resource <apiVersion>/<kind>"`` when the API
          server does not serve the kind;
          ``"CREATED <kind>/<name>"`` or ``"UPDATED <kind>/<name>"`` on
          success;
          ``"PATCH_FAILED <kind>/<name>: <reason>"`` or
          ``"CREATE_FAILED <kind>/<name>: <reason>"`` on an API error.
        process_one_message treats any result containing "FAILED" as a
        failure.

    Raises:
        Exceptions other than the ones caught below (ResourceNotFoundError
        on lookup, ApiException on create/patch) propagate to the caller.
    """
    # Inject security defaults BEFORE applying so user pods never
    # auto-mount the default SA token (T-022 / M-113 parity with the
    # REST manifest_processor path).
    _inject_security_defaults(m)

    dyn = dynamic.DynamicClient(client.ApiClient())
    api_version = m["apiVersion"]
    kind = m["kind"]
    name = m["metadata"]["name"]
    namespace = m["metadata"].get("namespace", "default")

    try:
        resource = dyn.resources.get(api_version=api_version, kind=kind)
    except ResourceNotFoundError:
        # NOTE(review): "SKIP ..." contains no "FAILED", so the caller treats
        # an unknown kind as success and deletes the SQS message — confirm
        # this is intended rather than a retryable failure.
        return f"SKIP unknown resource {api_version}/{kind}"

    # For Jobs, delete completed/failed ones first so re-submission works.
    # Without this, re-submitting the same job name would fail with a 409 conflict
    # because Kubernetes doesn't allow creating a Job with the same name as an
    # existing one (even if it's finished).
    if kind == "Job":
        try:
            existing = resource.get(name=name, namespace=namespace)
            # NOTE(review): relies on the dynamic client's ResourceInstance
            # exposing dict-style .get() on the object and its nested fields —
            # confirm against the kubernetes client version in use.
            conditions = existing.get("status", {}).get("conditions", [])
            finished = any(c.get("type") in ("Complete", "Failed") for c in conditions)
            if finished:
                log.info(f"Deleting finished Job {namespace}/{name} before re-creation")
                resource.delete(
                    name=name,
                    namespace=namespace,
                    body=client.V1DeleteOptions(propagation_policy="Background"),
                )
                # Fixed pause so the deletion can land before the create below.
                # If the Job still exists, create returns 409 and the code
                # falls through to the patch path.
                time.sleep(2)
        except (NotFoundError, ApiException) as e:
            # No existing Job (or lookup failed): best-effort, just log and
            # continue to the create.
            log.debug("Pre-create lookup for Job %s/%s failed: %s", namespace, name, e)

    # Create-or-update pattern: try create first, fall back to patch on 409 (conflict).
    # This is idempotent — safe to retry without side effects.
    # NOTE(review): cluster-scoped kinds are created without a namespace, while
    # validate_manifest only checks metadata.namespace against
    # ALLOWED_NAMESPACES — confirm cluster-scoped kinds are blocked elsewhere
    # (e.g. by RBAC on the processor's service account).
    try:
        if resource.namespaced:
            resource.create(body=m, namespace=namespace)
        else:
            resource.create(body=m)
        return f"CREATED {kind}/{name}"
    except ApiException as e:
        if e.status == 409:
            try:
                if resource.namespaced:
                    resource.patch(body=m, name=name, namespace=namespace)
                else:
                    resource.patch(body=m, name=name)
                return f"UPDATED {kind}/{name}"
            except ApiException as patch_err:
                return f"PATCH_FAILED {kind}/{name}: {patch_err.reason}"
        return f"CREATE_FAILED {kind}/{name}: {e.reason}"
def process_one_message() -> bool:
    """Receive and process a single SQS message. Returns True on success.

    Flow:
      1. Long-poll the queue (5 s) for at most one message. An empty queue
         counts as success (nothing to do).
      2. Validate EVERY manifest in the message before applying ANY of them.
         If one is rejected, nothing from the message is applied. Otherwise
         the valid manifests of a rejected job would be created, and
         re-created on every retry, even though the job as a whole failed.
      3. Apply the manifests in order. Any result containing "FAILED" marks
         the job as failed (later manifests are still attempted).
      4. Delete the message only when everything succeeded. On failure it is
         left in place, becomes visible again after the visibility timeout,
         and after repeated failures SQS moves it to the dead-letter queue.

    Returns:
        True if the queue was empty or the job was applied and the message
        deleted; False if configuration is missing, the message is
        malformed, or any manifest failed validation or application.
    """
    if not QUEUE_URL:
        log.error("JOB_QUEUE_URL not set")
        return False

    sqs = boto3.client("sqs", region_name=REGION)

    resp = sqs.receive_message(
        QueueUrl=QUEUE_URL,
        MaxNumberOfMessages=1,
        WaitTimeSeconds=5,
        MessageAttributeNames=["All"],
    )

    messages = resp.get("Messages", [])
    if not messages:
        log.info("No messages in queue")
        return True

    msg = messages[0]
    receipt = msg["ReceiptHandle"]

    # Malformed bodies are reported and left on the queue so they end up in
    # the DLQ for inspection, rather than crashing the pod with a traceback.
    try:
        body = json.loads(msg["Body"])
    except json.JSONDecodeError as exc:
        log.error("Message body is not valid JSON (%s) — message will return to queue", exc)
        return False
    if not isinstance(body, dict):
        log.error("Message body is not a JSON object — message will return to queue")
        return False

    job_id = body.get("job_id", "unknown")
    manifests = body.get("manifests", [])
    if not isinstance(manifests, list):
        log.error("Job %s: 'manifests' is not a list — message will return to queue", job_id)
        return False
    log.info("Processing job_id=%s, manifests=%d", job_id, len(manifests))

    # Phase 1: validate everything first.
    invalid = False
    for i, m in enumerate(manifests):
        if isinstance(m, dict):
            ok, reason = validate_manifest(m)
        else:
            ok, reason = False, "manifest must be a JSON object"
        if not ok:
            log.error(" manifest[%d] validation failed: %s", i, reason)
            invalid = True
    if invalid:
        log.error("Job %s had failures — message will return to queue", job_id)
        return False

    # Phase 2: apply.
    failed = False
    for i, m in enumerate(manifests):
        result = apply_manifest(m)
        log.info(" manifest[%d]: %s", i, result)
        if "FAILED" in result:
            failed = True

    if failed:
        # Don't delete the SQS message — it will become visible again after the
        # visibility timeout (5 min) and retry. After 3 total failures, SQS
        # moves it to the dead-letter queue for manual inspection.
        log.error("Job %s had failures — message will return to queue", job_id)
        return False

    sqs.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=receipt)
    log.info("Job %s processed successfully", job_id)
    return True
def main() -> None:
    """Entry point: configure the Kubernetes client and handle one SQS message.

    Exits the process with status 1 when processing reports failure;
    otherwise returns normally.
    """
    load_k8s()
    if process_one_message():
        return
    sys.exit(1)


if __name__ == "__main__":
    main()