Coverage for gco/services/queue_processor.py: 94%

295 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-15 15:07 +0000

1""" 

2Queue Processor Service for GCO (Global Capacity Orchestrator on AWS). 

3 

4Polls the regional SQS job queue, reads Kubernetes manifests from messages, 

5validates them, and applies them to the cluster. Designed to run as a 

6short-lived pod managed by a KEDA ScaledJob that scales based on queue depth. 

7 

8Each invocation processes a single SQS message (which may contain multiple 

9manifests). On success the message is deleted; on failure it returns to the 

10queue after the visibility timeout (5 min) and eventually lands in the DLQ 

11after 3 failed attempts. 

12 

13Message format (produced by `gco jobs submit-sqs`): 

14 { 

15 "job_id": "abc123", 

16 "manifests": [<k8s manifest dicts>], 

17 "namespace": "gco-jobs", 

18 "priority": 0, 

19 "submitted_at": "2026-03-26T12:00:00+00:00" 

20 } 

21 

22Configuration via environment variables: 

23 JOB_QUEUE_URL: SQS queue URL to consume from (required) 

24 AWS_REGION: AWS region (default: us-east-1) 

25 ALLOWED_NAMESPACES: Comma-separated namespace allowlist 

26 (default: default,gco-jobs) 

27 MAX_GPU_PER_MANIFEST: Max GPUs summed across all containers 

28 (regular + init + ephemeral) (default: 4) 

29 MAX_CPU_PER_MANIFEST: Max CPU summed across all containers; accepts 

30 K8s suffixes ("500m" or "10" for cores) 

31 (default: 10000 millicores = 10 cores) 

32 MAX_MEMORY_PER_MANIFEST: Max memory summed across all containers; 

33 accepts K8s suffixes ("32Gi", "256Mi") or 

34 a bare byte count (default: 32Gi) 

35 TRUSTED_REGISTRIES: Comma-separated list of registry domains 

36 (e.g. "nvcr.io,public.ecr.aws"). Empty/unset 

37 disables the image registry check (fail-open). 

38 Keep in sync with 

39 cdk.json::job_validation_policy.trusted_registries. 

40 TRUSTED_DOCKERHUB_ORGS: Comma-separated list of Docker Hub org names 

41 (e.g. "nvidia,pytorch"). Empty/unset disables 

42 the check. Keep in sync with 

43 cdk.json::job_validation_policy.trusted_dockerhub_orgs. 

44 

45Security policy toggles (all default to true except ``BLOCK_RUN_AS_ROOT`` 

46which defaults to false, matching job_validation_policy.manifest_security_policy 

47in cdk.json). Each one controls whether the corresponding pod/container 

48setting is rejected; the REST manifest_processor enforces an identical set 

49so both submission paths apply the same policy: 

50 

51 BLOCK_PRIVILEGED: Reject ``securityContext.privileged: true`` 

52 on pod or container (default: true) 

53 BLOCK_PRIVILEGE_ESCALATION: Reject containers with 

54 allowPrivilegeEscalation=true 

55 (default: true) 

56 BLOCK_HOST_NETWORK: Block pods with hostNetwork=true 

57 (default: true) 

58 BLOCK_HOST_PID: Block pods with hostPID=true 

59 (default: true) 

60 BLOCK_HOST_IPC: Block pods with hostIPC=true 

61 (default: true) 

62 BLOCK_HOST_PATH: Block volumes referencing hostPath 

63 (default: true) 

64 BLOCK_ADDED_CAPABILITIES: Block containers that add Linux 

65 capabilities via securityContext.capabilities.add 

66 (default: true) 

67 BLOCK_RUN_AS_ROOT: Reject runAsUser: 0 at pod or container 

68 level (default: false — many public 

69 images still run as root) 

70""" 

71 

72from __future__ import annotations 

73 

74import json 

75import logging 

76import os 

77import sys 

78import time 

79from typing import Any 

80 

81import boto3 

82from kubernetes import client, config, dynamic 

83from kubernetes.client.rest import ApiException 

84from kubernetes.dynamic.exceptions import NotFoundError, ResourceNotFoundError 

85 

# Process-wide logging: INFO level, every record tagged "[queue-processor]".
_LOG_FORMAT = "%(asctime)s %(levelname)s [queue-processor] %(message)s"
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)
log = logging.getLogger("queue-processor")

91 

92 

def _parse_cpu_string(cpu_str: str) -> int:
    """Parse a Kubernetes-style CPU limit string into millicores.

    Accepted forms:
      - Millicore suffix: ``"500m"`` -> 500
      - Whole cores: ``"4"`` -> 4000
      - Fractional cores: ``"0.5"`` -> 500
      - Bare millicore count: an integer greater than 999 (for example
        ``"10000"``) is taken to already be millicores and returned as is.

    Args:
        cpu_str: The configured value; empty or ``None`` yields 0.

    Returns:
        The CPU amount in millicores.

    Raises:
        ValueError: If the value is not numeric.
    """
    if not cpu_str:
        return 0
    text = cpu_str.strip()
    if text.endswith("m"):
        return int(text[:-1])
    try:
        value = int(text)
    except ValueError:
        # Not an integer: treat it as fractional cores ("0.5", "1.5").
        return round(float(text) * 1000)
    # Integers above 999 are already millicores; multiplying them again
    # would turn a "10000" (10 cores) budget into ten thousand cores.
    if value > 999:
        return value
    return value * 1000

107 

108 

def _parse_memory_string(memory_str: str) -> int:
    """Parse a Kubernetes-style memory string into bytes.

    Accepts binary suffixes (Ki, Mi, Gi, Ti, Pi, Ei), decimal suffixes
    (k, M, G, T, P, E), or a bare byte count. The numeric part may be
    fractional (``"1.5Gi"``).

    Args:
        memory_str: The configured value; empty or ``None`` yields 0.

    Returns:
        The memory amount in bytes, truncated to an integer.

    Raises:
        ValueError: If the numeric part cannot be parsed.
    """
    if not memory_str:
        return 0
    text = memory_str.strip()
    suffixes = (
        ("Ki", 1024),
        ("Mi", 1024**2),
        ("Gi", 1024**3),
        ("Ti", 1024**4),
        ("Pi", 1024**5),
        ("Ei", 1024**6),
        ("k", 1000),
        ("M", 1000**2),
        ("G", 1000**3),
        ("T", 1000**4),
        ("P", 1000**5),
        ("E", 1000**6),
    )
    multiplier = 1
    for suffix, factor in suffixes:
        if text.endswith(suffix):
            text = text[: -len(suffix)]
            multiplier = factor
            break
    try:
        # Integer path first so large whole values stay exact.
        return int(text) * multiplier
    except ValueError:
        return int(float(text) * multiplier)

133 

134 

# --- Configuration from environment ---
# These are set by the KEDA ScaledJob manifest (post-helm-sqs-consumer.yaml)
# and populated from cdk.json queue_processor settings during CDK deploy.
QUEUE_URL = os.environ.get("JOB_QUEUE_URL", "")
REGION = os.environ.get("AWS_REGION", os.environ.get("AWS_DEFAULT_REGION", "us-east-1"))
# Entries are stripped and blanks dropped (same parsing as the trusted-image
# lists below) so "default, gco-jobs" matches as intended and a trailing
# comma or empty value can never put "" on the allowlist.
ALLOWED_NAMESPACES = {
    ns.strip()
    for ns in os.environ.get("ALLOWED_NAMESPACES", "default,gco-jobs").split(",")
    if ns.strip()
}
MAX_CPU = _parse_cpu_string(os.environ.get("MAX_CPU_PER_MANIFEST", "10000"))  # millicores
MAX_MEMORY = _parse_memory_string(os.environ.get("MAX_MEMORY_PER_MANIFEST", "32Gi"))  # bytes
MAX_GPU = int(os.environ.get("MAX_GPU_PER_MANIFEST", "4"))

# Trusted image sources (populated from cdk.json::manifest_processor at deploy time).
# Comma-separated env vars; when both lists are empty/unset the image check is
# disabled (fail-open).
# Keep in sync with gco/services/manifest_processor.py::_validate_image_sources.
TRUSTED_REGISTRIES = [
    r.strip() for r in os.environ.get("TRUSTED_REGISTRIES", "").split(",") if r.strip()
]
TRUSTED_DOCKERHUB_ORGS = [
    o.strip() for o in os.environ.get("TRUSTED_DOCKERHUB_ORGS", "").split(",") if o.strip()
]

154 

155 

def _env_bool(name: str, default: bool) -> bool:
    """Parse a boolean environment variable.

    Unset, empty, or whitespace-only values return ``default``. Recognized
    truthy values are "true", "1", "yes" and "on" (case-insensitive,
    surrounding whitespace ignored). Any other non-blank value is falsy.

    Args:
        name: Environment variable to read.
        default: Value returned when the variable carries no content.

    Returns:
        The parsed flag.
    """
    raw = os.environ.get(name)
    value = raw.strip().lower() if raw is not None else ""
    if not value:
        # A blank value carries no intent; falling back to the default keeps
        # a default-on toggle from being switched off by stray whitespace.
        return default
    return value in ("true", "1", "yes", "on")

166 

167 

# Security-policy toggles, each read once at import time via _env_bool.
# Per the project notes, every toggle mirrors an attribute the REST
# manifest_processor exposes via
# cdk.json::job_validation_policy.manifest_security_policy, and both
# submission paths MUST enforce the same policy: someone holding
# sqs:SendMessage on the job queue must not be able to bypass checks the REST
# path applies. Structural parity is pinned by
# tests/test_queue_processor.py::TestSecurityPolicyParityWithManifestProcessor.
# All default to enabled except BLOCK_RUN_AS_ROOT.
BLOCK_PRIVILEGED = _env_bool("BLOCK_PRIVILEGED", True)
BLOCK_PRIVILEGE_ESCALATION = _env_bool("BLOCK_PRIVILEGE_ESCALATION", True)
BLOCK_HOST_NETWORK = _env_bool("BLOCK_HOST_NETWORK", True)
BLOCK_HOST_PID = _env_bool("BLOCK_HOST_PID", True)
BLOCK_HOST_IPC = _env_bool("BLOCK_HOST_IPC", True)
BLOCK_HOST_PATH = _env_bool("BLOCK_HOST_PATH", True)
BLOCK_ADDED_CAPABILITIES = _env_bool("BLOCK_ADDED_CAPABILITIES", True)
BLOCK_RUN_AS_ROOT = _env_bool("BLOCK_RUN_AS_ROOT", False)

182 

183 

def _is_registry_domain(entry: str) -> bool:
    """Report whether ``entry`` contains a '.' or a ':' character.

    Callers use this to tell a registry host (``nvcr.io``,
    ``localhost:5000``) apart from a Docker Hub organization name.
    """
    return any(marker in entry for marker in (".", ":"))

187 

188 

def _iter_containers(pod_spec: dict[str, Any]) -> list[tuple[str, dict[str, Any]]]:
    """Collect ``(kind, container)`` pairs for every container in a pod spec.

    The returned list holds regular containers first, then init containers,
    then ephemeral containers, labelled "container", "initContainer" and
    "ephemeralContainer" respectively. A missing or ``None`` list
    contributes nothing.
    """
    sources = (
        ("container", "containers"),
        ("initContainer", "initContainers"),
        ("ephemeralContainer", "ephemeralContainers"),
    )
    return [
        (label, entry)
        for label, key in sources
        for entry in (pod_spec.get(key, []) or [])
    ]

200 

201 

def _is_image_trusted(image: str) -> bool:
    """Decide whether an image reference comes from an allowed source.

    Rules, evaluated in order:
      1. If both TRUSTED_REGISTRIES and TRUSTED_DOCKERHUB_ORGS are empty the
         check is disabled and every image is accepted (fail-open).
      2. An empty reference, or one without a '/', is accepted (the latter
         is an official Docker Hub image such as ``python:3.12``).
      3. If the first path segment looks like a registry host (contains '.'
         or ':'), it must equal an entry in TRUSTED_REGISTRIES, or the image
         must start with ``<entry>/`` (which covers multi-segment entries
         like ``public.ecr.aws/lambda``).
      4. Otherwise the first segment is a Docker Hub organization and must be
         listed in TRUSTED_DOCKERHUB_ORGS.
    """
    if not (TRUSTED_REGISTRIES or TRUSTED_DOCKERHUB_ORGS):
        return True
    if not image or "/" not in image:
        return True

    head = image.partition("/")[0]
    if not _is_registry_domain(head):
        return head in TRUSTED_DOCKERHUB_ORGS
    return any(
        head == registry or image.startswith(registry + "/")
        for registry in TRUSTED_REGISTRIES
    )

229 

230 

def load_k8s() -> None:
    """Load Kubernetes client configuration.

    Tries the in-cluster configuration first and falls back to the local
    kubeconfig when that raises ``config.ConfigException``. Logs which
    source was used.
    """
    try:
        config.load_incluster_config()
    except config.ConfigException:
        config.load_kube_config()
        log.info("Loaded local kubeconfig")
    else:
        log.info("Loaded in-cluster Kubernetes configuration")

239 

240 

# Kubernetes memory-quantity suffixes and their byte multipliers.
_MANIFEST_MEMORY_SUFFIXES = (
    ("Ki", 1024),
    ("Mi", 1024**2),
    ("Gi", 1024**3),
    ("Ti", 1024**4),
    ("Pi", 1024**5),
    ("Ei", 1024**6),
    ("k", 1000),
    ("M", 1000**2),
    ("G", 1000**3),
    ("T", 1000**4),
    ("P", 1000**5),
    ("E", 1000**6),
)


def _manifest_cpu_millicores(quantity: Any) -> int:
    """Convert a manifest CPU quantity ("250m", "2", 0.5) to millicores.

    Bare numbers are cores. Raises ValueError/TypeError on non-numeric input.
    """
    if isinstance(quantity, str):
        text = quantity.strip()
        if text.endswith("m"):
            return int(text[:-1])
        quantity = text
    # round() instead of truncation: 0.3 * 1000 is 299.999... in binary floats.
    return round(float(quantity) * 1000)


def _manifest_memory_bytes(quantity: Any) -> int:
    """Convert a manifest memory quantity ("256Mi", "1G", 1024) to bytes.

    Raises ValueError/TypeError when the numeric part cannot be parsed.
    """
    if not isinstance(quantity, str):
        return int(quantity)
    text = quantity.strip()
    for suffix, factor in _MANIFEST_MEMORY_SUFFIXES:
        if text.endswith(suffix):
            return int(float(text[: -len(suffix)]) * factor)
    try:
        return int(text)
    except ValueError:
        return int(float(text))


def _candidate_pod_specs(spec: dict[str, Any]) -> list[dict[str, Any]]:
    """Return every non-empty pod spec reachable from a resource ``spec``.

    All known locations are inspected rather than only the first match, so an
    extra ``template`` key cannot hide a real ``jobTemplate`` or bare-Pod spec
    from validation.
    """
    candidates: list[Any] = []

    # Deployment / StatefulSet / ReplicaSet / DaemonSet / Job: spec.template.spec
    template = spec.get("template")
    if isinstance(template, dict):
        candidates.append(template.get("spec"))

    # CronJob: spec.jobTemplate.spec.template.spec
    job_template = spec.get("jobTemplate")
    if isinstance(job_template, dict):
        job_spec = job_template.get("spec")
        if isinstance(job_spec, dict):
            inner_template = job_spec.get("template")
            if isinstance(inner_template, dict):
                candidates.append(inner_template.get("spec"))

    # Bare Pod: spec carries the container lists directly.
    if any(key in spec for key in ("containers", "initContainers", "ephemeralContainers")):
        candidates.append(spec)

    return [c for c in candidates if isinstance(c, dict) and c]


def validate_manifest(m: dict[str, Any]) -> tuple[bool, str]:
    """Validate a manifest before applying it to the cluster.

    Checks, in order:

    1. **Shape** - the manifest is a mapping with ``kind``, ``apiVersion``
       and ``metadata.name``.
    2. **Namespace allowlist** - ``metadata.namespace`` (default
       ``"default"``) must be in ``ALLOWED_NAMESPACES``.
    3. **Pod-level security policy** - for every pod spec found under
       ``spec.template.spec``, ``spec.jobTemplate.spec.template.spec`` or
       ``spec`` itself: ``hostNetwork``, ``hostPID``, ``hostIPC``,
       ``hostPath`` volumes, privileged pod security context, and pod-level
       ``runAsUser: 0`` are rejected when the matching ``BLOCK_*`` toggle
       is on.
    4. **Container-level security policy** - for every regular, init and
       ephemeral container: ``privileged``, ``allowPrivilegeEscalation``,
       ``capabilities.add`` and ``runAsUser: 0``, each gated by its toggle.
    5. **Image allowlist** - every container image must satisfy
       ``_is_image_trusted``.
    6. **Resource caps** - GPU, CPU and memory are summed over all
       containers (limits take precedence over requests) and compared with
       ``MAX_GPU``, ``MAX_CPU`` (millicores) and ``MAX_MEMORY`` (bytes).
       Unparseable quantities are rejected with a reason instead of raising.

    Manifests with no pod spec (or an empty one) pass after step 2.

    Args:
        m: The manifest as parsed from the queue message.

    Returns:
        ``(True, "")`` if the manifest is accepted, otherwise
        ``(False, reason)`` where ``reason`` is a human-readable string.
    """
    if not isinstance(m, dict):
        return False, "manifest must be a mapping"
    if not m.get("kind"):
        return False, "missing 'kind'"
    if not m.get("apiVersion"):
        return False, "missing 'apiVersion'"
    meta = m.get("metadata") or {}
    if not isinstance(meta, dict) or not meta.get("name"):
        return False, "missing 'metadata.name'"
    ns = meta.get("namespace", "default")
    if not isinstance(ns, str) or ns not in ALLOWED_NAMESPACES:
        return False, f"namespace '{ns}' not in allowed list {ALLOWED_NAMESPACES}"

    spec = m.get("spec") or {}
    if not isinstance(spec, dict):
        return False, "'spec' must be a mapping"

    pod_specs = _candidate_pod_specs(spec)
    if not pod_specs:
        return True, ""

    # --- Pod-level security policy checks ---
    for pod_spec in pod_specs:
        if BLOCK_HOST_NETWORK and pod_spec.get("hostNetwork", False):
            return False, "hostNetwork is not permitted"
        if BLOCK_HOST_PID and pod_spec.get("hostPID", False):
            return False, "hostPID is not permitted"
        if BLOCK_HOST_IPC and pod_spec.get("hostIPC", False):
            return False, "hostIPC is not permitted"
        if BLOCK_HOST_PATH:
            for volume in pod_spec.get("volumes", []) or []:
                if isinstance(volume, dict) and volume.get("hostPath") is not None:
                    return False, "hostPath volumes are not permitted"

        pod_security_context = pod_spec.get("securityContext", {}) or {}
        if BLOCK_PRIVILEGED and pod_security_context.get("privileged", False):
            return False, "privileged pod security context is not permitted"
        if BLOCK_RUN_AS_ROOT and pod_security_context.get("runAsUser") == 0:
            return False, "running as root (runAsUser: 0) is not permitted"

    all_containers = [
        pair for pod_spec in pod_specs for pair in _iter_containers(pod_spec)
    ]
    for label, c in all_containers:
        if not isinstance(c, dict):
            return False, f"{label} entry must be a mapping"

    # --- Container-level security policy checks ---
    # Every toggle applies to every container kind; an init or ephemeral
    # container gets no free pass.
    for label, c in all_containers:
        cname = c.get("name", "unknown")
        sc = c.get("securityContext", {}) or {}
        if BLOCK_PRIVILEGED and sc.get("privileged", False):
            return False, f"{label} '{cname}': privileged containers are not permitted"
        if BLOCK_PRIVILEGE_ESCALATION and sc.get("allowPrivilegeEscalation", False):
            return False, f"{label} '{cname}': allowPrivilegeEscalation is not permitted"
        if BLOCK_ADDED_CAPABILITIES:
            added_caps = (sc.get("capabilities", {}) or {}).get("add", []) or []
            if added_caps:
                return False, f"{label} '{cname}': added capabilities are not permitted"
        if BLOCK_RUN_AS_ROOT and sc.get("runAsUser") == 0:
            return (
                False,
                f"{label} '{cname}': running as root (runAsUser: 0) is not permitted",
            )

    # --- Image registry allowlist ---
    for label, c in all_containers:
        image = c.get("image", "")
        if not _is_image_trusted(image):
            cname = c.get("name", "unknown")
            return (
                False,
                f"{label} '{cname}': untrusted image source '{image}'",
            )

    # --- Resource caps, summed across ALL container kinds ---
    total_gpu = 0
    total_cpu = 0
    total_memory = 0
    for label, c in all_containers:
        res = c.get("resources", {}) or {}
        limits = res.get("limits", {}) or {}
        requested = res.get("requests", {}) or {}
        try:
            total_gpu += int(
                limits.get("nvidia.com/gpu") or requested.get("nvidia.com/gpu") or "0"
            )
            total_cpu += _manifest_cpu_millicores(
                limits.get("cpu") or requested.get("cpu") or "0"
            )
            total_memory += _manifest_memory_bytes(
                limits.get("memory") or requested.get("memory") or "0"
            )
        except (TypeError, ValueError, OverflowError):
            cname = c.get("name", "unknown")
            return False, f"{label} '{cname}': invalid resource quantity"

    errors = []
    if total_gpu > MAX_GPU:
        errors.append(f"GPU {total_gpu} exceeds max {MAX_GPU}")
    if total_cpu > MAX_CPU:
        errors.append(f"CPU {total_cpu}m exceeds max {MAX_CPU}m")
    if total_memory > MAX_MEMORY:
        errors.append(
            f"Memory {total_memory / (1024**3):.0f}Gi "
            f"exceeds max {MAX_MEMORY / (1024**3):.0f}Gi"
        )
    if errors:
        hint = (
            "To raise limits, update queue_processor in cdk.json "
            "and redeploy (see examples/README.md#troubleshooting)"
        )
        return False, "; ".join(errors) + f". {hint}"

    return True, ""

427 

428 

def _extract_pod_spec(manifest: dict[str, Any]) -> dict[str, Any] | None:
    """Return the pod spec for any supported workload kind, or None.

    Lookup by kind:
      - ``Pod``: ``spec`` itself, when it has a ``containers`` key. Any
        other key in ``spec`` (such as a stray ``template``) is ignored, so
        it cannot redirect or suppress the defaults injected by the caller.
      - ``CronJob``: ``spec.jobTemplate.spec.template.spec``.
      - Anything else: ``spec.template.spec`` when ``spec`` has a
        ``template`` key, otherwise ``spec`` itself if it has ``containers``.

    Returns None when ``spec`` is not a mapping or the expected path does
    not lead to a mapping. The returned dict is the live object inside
    ``manifest``, not a copy.
    """
    spec = manifest.get("spec")
    if not isinstance(spec, dict):
        return None

    kind = manifest.get("kind", "")

    # Bare Pod: the pod spec is spec itself, whatever else it contains.
    if kind == "Pod":
        return spec if "containers" in spec else None

    if kind == "CronJob":
        path = ("jobTemplate", "spec", "template", "spec")
    elif "template" in spec:
        path = ("template", "spec")
    elif "containers" in spec:
        return spec
    else:
        return None

    node: Any = spec
    for key in path:
        node = node.get(key)
        if not isinstance(node, dict):
            return None
    return node

468 

469 

def _inject_security_defaults(manifest: dict[str, Any]) -> dict[str, Any]:
    """Apply security defaults to a submitted manifest, mutating it in place.

    The only default at present is ``automountServiceAccountToken: false``
    on the pod spec located by ``_extract_pod_spec``. A value the submitter
    set explicitly (true or false) is left untouched, and manifests without
    a pod spec are returned unchanged.

    Returns:
        The same ``manifest`` object that was passed in.
    """
    target = _extract_pod_spec(manifest)
    if target is None:
        return manifest
    if "automountServiceAccountToken" not in target:
        target["automountServiceAccountToken"] = False
    return manifest

484 

485 

def apply_manifest(m: dict[str, Any]) -> str:
    """Apply one manifest through the dynamic Kubernetes client.

    Steps:
      1. Inject security defaults into ``m`` (in place).
      2. Resolve the API resource for ``apiVersion``/``kind``.
      3. For a ``Job``, delete an existing Job of the same name if it has a
         ``Complete`` or ``Failed`` condition, then pause two seconds.
      4. Create the object; on HTTP 409 fall back to a patch.

    Returns:
        A status string starting with one of ``SKIP``, ``CREATED``,
        ``UPDATED``, ``PATCH_FAILED`` or ``CREATE_FAILED``.
    """
    # Defaults go in before anything is sent to the API server.
    _inject_security_defaults(m)

    dyn = dynamic.DynamicClient(client.ApiClient())
    api_version, kind = m["apiVersion"], m["kind"]
    name = m["metadata"]["name"]
    namespace = m["metadata"].get("namespace", "default")

    try:
        resource = dyn.resources.get(api_version=api_version, kind=kind)
    except ResourceNotFoundError:
        return f"SKIP unknown resource {api_version}/{kind}"

    # A finished Job left behind under the same name would make the create
    # below hit a 409, so it is removed first. Lookup or delete failures are
    # only logged at debug level; the create/patch path still runs.
    if kind == "Job":
        try:
            existing = resource.get(name=name, namespace=namespace)
            conditions = existing.get("status", {}).get("conditions", [])
            if any(cond.get("type") in ("Complete", "Failed") for cond in conditions):
                log.info(f"Deleting finished Job {namespace}/{name} before re-creation")
                resource.delete(
                    name=name,
                    namespace=namespace,
                    body=client.V1DeleteOptions(propagation_policy="Background"),
                )
                time.sleep(2)
        except (NotFoundError, ApiException) as e:
            log.debug("Pre-create lookup for Job %s/%s failed: %s", namespace, name, e)

    # Create first; only a 409 (already exists) falls through to the patch.
    try:
        if resource.namespaced:
            resource.create(body=m, namespace=namespace)
        else:
            resource.create(body=m)
    except ApiException as create_err:
        if create_err.status != 409:
            return f"CREATE_FAILED {kind}/{name}: {create_err.reason}"
    else:
        return f"CREATED {kind}/{name}"

    try:
        if resource.namespaced:
            resource.patch(body=m, name=name, namespace=namespace)
        else:
            resource.patch(body=m, name=name)
    except ApiException as patch_err:
        return f"PATCH_FAILED {kind}/{name}: {patch_err.reason}"
    return f"UPDATED {kind}/{name}"

543 

544 

def process_one_message() -> bool:
    """Receive and process a single SQS message.

    Reads at most one message from ``QUEUE_URL``, validates and applies each
    manifest in its ``manifests`` list, and deletes the message only when
    every manifest succeeded.

    Returns:
        True when the queue was empty or the message was fully processed
        and deleted. False when ``QUEUE_URL`` is unset, the message body is
        malformed (invalid JSON, not an object, or ``manifests`` is not a
        list), or any manifest failed validation or application. In the
        False cases the message is not deleted.
    """
    if not QUEUE_URL:
        log.error("JOB_QUEUE_URL not set")
        return False

    sqs = boto3.client("sqs", region_name=REGION)

    resp = sqs.receive_message(
        QueueUrl=QUEUE_URL,
        MaxNumberOfMessages=1,
        WaitTimeSeconds=5,
        MessageAttributeNames=["All"],
    )

    messages = resp.get("Messages", [])
    if not messages:
        log.info("No messages in queue")
        return True

    msg = messages[0]
    receipt = msg["ReceiptHandle"]

    # The body comes from whoever can send to the queue; report a malformed
    # payload as a failed attempt instead of dying with a traceback.
    try:
        body = json.loads(msg["Body"])
    except (TypeError, ValueError) as exc:  # JSONDecodeError subclasses ValueError
        log.error(f"Message body is not valid JSON: {exc}")
        return False
    if not isinstance(body, dict):
        log.error("Message body is not a JSON object")
        return False

    job_id = body.get("job_id", "unknown")
    manifests = body.get("manifests", [])
    if not isinstance(manifests, list):
        log.error(f"Job {job_id}: 'manifests' is not a list")
        return False
    log.info(f"Processing job_id={job_id}, manifests={len(manifests)}")

    results: list[str] = []
    failed = False
    for i, m in enumerate(manifests):
        ok, reason = validate_manifest(m)
        if not ok:
            log.error(f"  manifest[{i}] validation failed: {reason}")
            results.append(f"INVALID: {reason}")
            failed = True
            continue
        result = apply_manifest(m)
        log.info(f"  manifest[{i}]: {result}")
        results.append(result)
        if "FAILED" in result:
            failed = True

    if failed:
        # Don't delete the SQS message — it will become visible again after the
        # visibility timeout (5 min) and retry. After 3 total failures, SQS
        # moves it to the dead-letter queue for manual inspection.
        log.error(f"Job {job_id} had failures — message will return to queue")
        return False

    sqs.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=receipt)
    log.info(f"Job {job_id} processed successfully")
    return True

598 

599 

def main() -> None:
    """Run the queue processor once.

    Loads the Kubernetes configuration, processes a single message, and
    exits with status 1 if processing reported failure.
    """
    load_k8s()
    if not process_one_message():
        sys.exit(1)


if __name__ == "__main__":
    main()