Coverage for gco / services / queue_processor.py: 94%

295 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 21:47 +0000

1""" 

2Queue Processor Service for GCO (Global Capacity Orchestrator on AWS). 

3 

4Polls the regional SQS job queue, reads Kubernetes manifests from messages, 

5validates them, and applies them to the cluster. Designed to run as a 

6short-lived pod managed by a KEDA ScaledJob that scales based on queue depth. 

7 

8Each invocation processes a single SQS message (which may contain multiple 

9manifests). On success the message is deleted; on failure it returns to the 

10queue after the visibility timeout (5 min) and eventually lands in the DLQ 

11after 3 failed attempts. 

12 

13Message format (produced by `gco jobs submit-sqs`): 

14 { 

15 "job_id": "abc123", 

16 "manifests": [<k8s manifest dicts>], 

17 "namespace": "gco-jobs", 

18 "priority": 0, 

19 "submitted_at": "2026-03-26T12:00:00+00:00" 

20 } 

21 

22Configuration via environment variables: 

23 JOB_QUEUE_URL: SQS queue URL to consume from (required) 

24 AWS_REGION: AWS region (default: us-east-1) 

25 ALLOWED_NAMESPACES: Comma-separated namespace allowlist 

26 (default: default,gco-jobs) 

27 MAX_GPU_PER_MANIFEST: Max GPUs summed across all containers 

28 (regular + init + ephemeral) (default: 4) 

29 MAX_CPU_PER_MANIFEST: Max CPU summed across all containers; accepts 

30 K8s suffixes ("500m" or "10" for cores) 

31 (default: 10000 millicores = 10 cores) 

32 MAX_MEMORY_PER_MANIFEST: Max memory summed across all containers; 

33 accepts K8s suffixes ("32Gi", "256Mi") or 

34 a bare byte count (default: 32Gi) 

35 TRUSTED_REGISTRIES: Comma-separated list of registry domains 

36 (e.g. "nvcr.io,public.ecr.aws"). Empty/unset 

37 disables the image registry check (fail-open). 

38 Keep in sync with 

39 cdk.json::job_validation_policy.trusted_registries. 

40 TRUSTED_DOCKERHUB_ORGS: Comma-separated list of Docker Hub org names 

41 (e.g. "nvidia,pytorch"). Empty/unset disables 

42 the check. Keep in sync with 

43 cdk.json::job_validation_policy.trusted_dockerhub_orgs. 

44 

45Security policy toggles (all default to true except ``BLOCK_RUN_AS_ROOT`` 

46which defaults to false, matching job_validation_policy.manifest_security_policy 

47in cdk.json). Each one controls whether the corresponding pod/container 

48setting is rejected; the REST manifest_processor enforces an identical set 

49so both submission paths apply the same policy: 

50 

51 BLOCK_PRIVILEGED: Reject ``securityContext.privileged: true`` 

52 on pod or container (default: true) 

53 BLOCK_PRIVILEGE_ESCALATION: Reject containers with 

54 allowPrivilegeEscalation=true 

55 (default: true) 

56 BLOCK_HOST_NETWORK: Block pods with hostNetwork=true 

57 (default: true) 

58 BLOCK_HOST_PID: Block pods with hostPID=true 

59 (default: true) 

60 BLOCK_HOST_IPC: Block pods with hostIPC=true 

61 (default: true) 

62 BLOCK_HOST_PATH: Block volumes referencing hostPath 

63 (default: true) 

64 BLOCK_ADDED_CAPABILITIES: Block containers that add Linux 

65 capabilities via securityContext.capabilities.add 

66 (default: true) 

67 BLOCK_RUN_AS_ROOT: Reject runAsUser: 0 at pod or container 

68 level (default: false — many public 

69 images still run as root) 

70""" 

71 

72from __future__ import annotations 

73 

74import json 

75import logging 

76import os 

77import sys 

78import time 

79from typing import Any 

80 

81import boto3 

82from kubernetes import client, config, dynamic 

83from kubernetes.client.rest import ApiException 

84from kubernetes.dynamic.exceptions import NotFoundError, ResourceNotFoundError 

85 

# Configure logging once at import time; every record is tagged with the
# [queue-processor] component so pod logs are easy to grep/filter.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s [queue-processor] %(message)s",
)
# Module-level logger used by every function below.
log = logging.getLogger("queue-processor")

91 

92 

93def _parse_cpu_string(cpu_str: str) -> int: 

94 """Parse a Kubernetes-style CPU string to millicores. 

95 

96 Accepts: 

97 - Millicore suffix: "500m" -> 500 

98 - Whole cores: "4" -> 4000 

99 - Bare millicore counts: "10000" (when > 999) stays as millicores 

100 """ 

101 if not cpu_str: 

102 return 0 

103 s = cpu_str.strip() 

104 if s.endswith("m"): 

105 return int(s[:-1]) 

106 return int(s) * 1000 

107 

108 

109def _parse_memory_string(memory_str: str) -> int: 

110 """Parse a Kubernetes-style memory string to bytes. 

111 

112 Accepts binary suffixes (Ki, Mi, Gi, Ti), decimal suffixes (k, M, G), 

113 or a bare byte count. 

114 """ 

115 if not memory_str: 

116 return 0 

117 s = memory_str.strip() 

118 if s.endswith("Ki"): 

119 return int(s[:-2]) * 1024 

120 if s.endswith("Mi"): 

121 return int(s[:-2]) * 1024**2 

122 if s.endswith("Gi"): 

123 return int(s[:-2]) * 1024**3 

124 if s.endswith("Ti"): 

125 return int(s[:-2]) * 1024**4 

126 if s.endswith("k"): 126 ↛ 127line 126 didn't jump to line 127 because the condition on line 126 was never true

127 return int(s[:-1]) * 1000 

128 if s.endswith("M"): 128 ↛ 129line 128 didn't jump to line 129 because the condition on line 128 was never true

129 return int(s[:-1]) * 1000**2 

130 if s.endswith("G"): 

131 return int(s[:-1]) * 1000**3 

132 return int(s) 

133 

134 

# --- Configuration from environment ---
# These are set by the KEDA ScaledJob manifest (post-helm-sqs-consumer.yaml)
# and populated from cdk.json queue_processor settings during CDK deploy.
QUEUE_URL = os.environ.get("JOB_QUEUE_URL", "")
REGION = os.environ.get("AWS_REGION", os.environ.get("AWS_DEFAULT_REGION", "us-east-1"))
# Strip whitespace and drop empty entries so "default, gco-jobs" behaves the
# same as "default,gco-jobs" — matches how the TRUSTED_* lists below are
# parsed. Previously an entry with a leading space never matched, silently
# rejecting every job in that namespace.
ALLOWED_NAMESPACES = {
    ns.strip()
    for ns in os.environ.get("ALLOWED_NAMESPACES", "default,gco-jobs").split(",")
    if ns.strip()
}
MAX_CPU = _parse_cpu_string(os.environ.get("MAX_CPU_PER_MANIFEST", "10000"))  # millicores
MAX_MEMORY = _parse_memory_string(os.environ.get("MAX_MEMORY_PER_MANIFEST", "32Gi"))  # bytes
MAX_GPU = int(os.environ.get("MAX_GPU_PER_MANIFEST", "4"))

# Trusted image sources (populated from cdk.json::manifest_processor at deploy time).
# Comma-separated env vars; empty/unset disables the check (fail-open logged).
# Keep in sync with gco/services/manifest_processor.py::_validate_image_sources.
TRUSTED_REGISTRIES = [
    r.strip() for r in os.environ.get("TRUSTED_REGISTRIES", "").split(",") if r.strip()
]
TRUSTED_DOCKERHUB_ORGS = [
    o.strip() for o in os.environ.get("TRUSTED_DOCKERHUB_ORGS", "").split(",") if o.strip()
]

154 

155 

156def _env_bool(name: str, default: bool) -> bool: 

157 """Parse a boolean environment variable. 

158 

159 Empty/unset returns ``default``. Recognized truthy values: "true", "1", 

160 "yes", "on" (case-insensitive). Everything else is falsy. 

161 """ 

162 raw = os.environ.get(name) 

163 if raw is None or raw == "": 

164 return default 

165 return raw.strip().lower() in ("true", "1", "yes", "on") 

166 

167 

# Security-policy toggles. Every one of these mirrors an attribute the REST
# manifest_processor exposes via cdk.json::job_validation_policy.manifest_security_policy.
# Both submission paths MUST enforce the same policy — an attacker holding
# sqs:SendMessage on the job queue must not be able to bypass checks the REST
# path applies. Structural parity is pinned by
# tests/test_queue_processor.py::TestSecurityPolicyParityWithManifestProcessor.
# Defaults match the module docstring: everything is blocked by default
# except runAsUser=0.
BLOCK_PRIVILEGED = _env_bool("BLOCK_PRIVILEGED", True)
BLOCK_PRIVILEGE_ESCALATION = _env_bool("BLOCK_PRIVILEGE_ESCALATION", True)
BLOCK_HOST_NETWORK = _env_bool("BLOCK_HOST_NETWORK", True)
BLOCK_HOST_PID = _env_bool("BLOCK_HOST_PID", True)
BLOCK_HOST_IPC = _env_bool("BLOCK_HOST_IPC", True)
BLOCK_HOST_PATH = _env_bool("BLOCK_HOST_PATH", True)
BLOCK_ADDED_CAPABILITIES = _env_bool("BLOCK_ADDED_CAPABILITIES", True)
# Off by default — many public images still run as root (see module docstring).
BLOCK_RUN_AS_ROOT = _env_bool("BLOCK_RUN_AS_ROOT", False)

182 

183 

184def _is_registry_domain(entry: str) -> bool: 

185 """True if the entry looks like a registry domain (has '.' or ':').""" 

186 return "." in entry or ":" in entry 

187 

188 

189def _iter_containers(pod_spec: dict[str, Any]) -> list[tuple[str, dict[str, Any]]]: 

190 """Yield (kind, container_dict) for every container, initContainer, and 

191 ephemeralContainer in a pod spec.""" 

192 out: list[tuple[str, dict[str, Any]]] = [] 

193 for c in pod_spec.get("containers", []) or []: 

194 out.append(("container", c)) 

195 for c in pod_spec.get("initContainers", []) or []: 

196 out.append(("initContainer", c)) 

197 for c in pod_spec.get("ephemeralContainers", []) or []: 197 ↛ 198line 197 didn't jump to line 198 because the loop on line 197 never started

198 out.append(("ephemeralContainer", c)) 

199 return out 

200 

201 

def _is_image_trusted(image: str) -> bool:
    """Return True when *image* comes from an allowed source.

    Matches the semantics of manifest_processor._validate_image_sources:
      1. Official Docker Hub images (no '/') are always allowed.
      2. References whose first path segment looks like a registry domain
         must match an entry in TRUSTED_REGISTRIES exactly, or by
         multi-segment prefix (e.g. "public.ecr.aws/lambda").
      3. Docker Hub references with an org prefix must match an entry in
         TRUSTED_DOCKERHUB_ORGS.

    When both allowlists are empty the check is disabled entirely
    (fail-open, logged elsewhere).
    """
    if not (TRUSTED_REGISTRIES or TRUSTED_DOCKERHUB_ORGS):
        # No allowlists configured — check disabled.
        return True
    if not image or "/" not in image:
        # Empty reference, or an official Docker Hub image ("python") —
        # always trusted.
        return True
    first_segment, _, _ = image.partition("/")
    if not _is_registry_domain(first_segment):
        # Docker Hub image with an org prefix, e.g. "nvidia/cuda".
        return first_segment in TRUSTED_DOCKERHUB_ORGS
    return any(
        first_segment == registry or image.startswith(registry + "/")
        for registry in TRUSTED_REGISTRIES
    )

229 

230 

def load_k8s() -> None:
    """Load Kubernetes configuration: in-cluster first, kubeconfig fallback."""
    try:
        config.load_incluster_config()
        log.info("Loaded in-cluster Kubernetes configuration")
    except config.ConfigException:
        # Not running inside a pod — use the developer's local kubeconfig.
        config.load_kube_config()
        log.info("Loaded local kubeconfig")

239 

240 

def _reject_pod_security(pod_spec: dict[str, Any]) -> str | None:
    """Return a rejection reason for pod-level policy violations, else None.

    Mirrors manifest_processor._validate_security_context so the SQS path
    enforces the same policy as the REST path.
    """
    if BLOCK_HOST_NETWORK and pod_spec.get("hostNetwork", False):
        return "hostNetwork is not permitted"
    if BLOCK_HOST_PID and pod_spec.get("hostPID", False):
        return "hostPID is not permitted"
    if BLOCK_HOST_IPC and pod_spec.get("hostIPC", False):
        return "hostIPC is not permitted"
    if BLOCK_HOST_PATH:
        for volume in pod_spec.get("volumes", []) or []:
            if volume.get("hostPath") is not None:
                return "hostPath volumes are not permitted"
    pod_security_context = pod_spec.get("securityContext", {}) or {}
    if BLOCK_PRIVILEGED and pod_security_context.get("privileged", False):
        return "privileged pod security context is not permitted"
    # None == 0 is False, so a missing runAsUser never trips this check.
    if BLOCK_RUN_AS_ROOT and pod_security_context.get("runAsUser") == 0:
        return "running as root (runAsUser: 0) is not permitted"
    return None


def _reject_container_security(
    containers: list[tuple[str, dict[str, Any]]],
) -> str | None:
    """Return a rejection reason for container-level violations, else None.

    Every toggle is applied to every container kind (regular, init,
    ephemeral). An init container running as root or with CAP_SYS_ADMIN has
    the same blast radius as a regular container — no kind gets a free pass,
    which closes the classic "smuggle it via an init container" bypass.
    """
    for ckind, c in containers:
        cname = c.get("name", "unknown")
        sc = c.get("securityContext", {}) or {}
        if BLOCK_PRIVILEGED and sc.get("privileged", False):
            return f"{ckind} '{cname}': privileged containers are not permitted"
        if BLOCK_PRIVILEGE_ESCALATION and sc.get("allowPrivilegeEscalation", False):
            return f"{ckind} '{cname}': allowPrivilegeEscalation is not permitted"
        if BLOCK_ADDED_CAPABILITIES:
            added_caps = (sc.get("capabilities", {}) or {}).get("add", []) or []
            if added_caps:
                return f"{ckind} '{cname}': added capabilities are not permitted"
        if BLOCK_RUN_AS_ROOT and sc.get("runAsUser") == 0:
            return f"{ckind} '{cname}': running as root (runAsUser: 0) is not permitted"
    return None


def _reject_untrusted_images(
    containers: list[tuple[str, dict[str, Any]]],
) -> str | None:
    """Return a rejection reason if any container image is untrusted, else None."""
    for ckind, c in containers:
        image = c.get("image", "")
        if not _is_image_trusted(image):
            cname = c.get("name", "unknown")
            return f"{ckind} '{cname}': untrusted image source '{image}'"
    return None


def _cpu_quantity_to_millicores(value: Any) -> int:
    """Convert a CPU quantity ("500m", "2", "0.5", 1) to millicores."""
    if isinstance(value, str) and value.endswith("m"):
        return int(value[:-1])
    return int(float(value) * 1000)


# Ordered suffix table for memory quantities; two-character binary suffixes
# are checked before the one-character decimal ones.
_MEM_QUANTITY_FACTORS: tuple[tuple[str, int], ...] = (
    ("Ki", 1024),
    ("Mi", 1024**2),
    ("Gi", 1024**3),
    ("Ti", 1024**4),
    ("k", 1000),
    ("M", 1000**2),
    ("G", 1000**3),
    ("T", 1000**4),
)


def _mem_quantity_to_bytes(value: Any) -> int:
    """Convert a memory quantity ("32Gi", "1.5Mi", "1024", 1024) to bytes."""
    if isinstance(value, str):
        for suffix, factor in _MEM_QUANTITY_FACTORS:
            if value.endswith(suffix):
                return int(float(value[: -len(suffix)]) * factor)
    return int(value)


def _reject_resource_caps(
    containers: list[tuple[str, dict[str, Any]]],
) -> str | None:
    """Return a rejection reason when summed resources exceed the caps.

    Sums the resource limits (falling back to requests) of every container
    kind (regular + init + ephemeral). This is stricter than the K8s
    scheduler's accounting but matches our security intent: the operator's
    configured max CPU/memory/GPU per manifest is a hard cap on the total a
    submitter can request, regardless of which container kind carries it.
    """
    total_gpu = 0
    total_cpu = 0  # millicores
    total_memory = 0  # bytes
    for ckind, c in containers:
        res = c.get("resources", {}) or {}
        limits = res.get("limits", {}) or {}
        requests = res.get("requests", {}) or {}
        gpu = limits.get("nvidia.com/gpu") or requests.get("nvidia.com/gpu", "0")
        cpu = limits.get("cpu") or requests.get("cpu", "0")
        mem = limits.get("memory") or requests.get("memory", "0")
        try:
            total_gpu += int(gpu)
            total_cpu += _cpu_quantity_to_millicores(cpu)
            total_memory += _mem_quantity_to_bytes(mem)
        except (TypeError, ValueError):
            # Reject cleanly instead of letting a ValueError crash the
            # processor (the old inline parser raised on valid quantities
            # such as "1Ti", "1G", or "1.5Gi").
            cname = c.get("name", "unknown")
            return f"{ckind} '{cname}': unparseable resource quantity"

    errors = []
    if total_gpu > MAX_GPU:
        errors.append(f"GPU {total_gpu} exceeds max {MAX_GPU}")
    if total_cpu > MAX_CPU:
        errors.append(f"CPU {total_cpu}m exceeds max {MAX_CPU}m")
    if total_memory > MAX_MEMORY:
        errors.append(
            f"Memory {total_memory / (1024**3):.0f}Gi "
            f"exceeds max {MAX_MEMORY / (1024**3):.0f}Gi"
        )
    if errors:
        hint = (
            "To raise limits, update queue_processor in cdk.json "
            "and redeploy (see examples/README.md#troubleshooting)"
        )
        return "; ".join(errors) + f". {hint}"
    return None


def validate_manifest(m: dict[str, Any]) -> tuple[bool, str]:
    """Validate a manifest before applying it to the cluster.

    The queue processor mirrors the security checks performed by the REST
    `manifest_processor` service (``gco/services/manifest_processor.py``)
    so that the SQS path cannot bypass them. Checks, in order:

    1. Structural fields: ``kind``, ``apiVersion``, ``metadata.name``.
    2. Namespace allowlist (``ALLOWED_NAMESPACES``, shared config with the
       REST manifest_processor via cdk.json::job_validation_policy).
    3. Pod-level security policy: hostNetwork / hostPID / hostIPC /
       hostPath volumes / privileged pod context / pod-level runAsUser: 0.
    4. Container-level security policy for every container kind (regular,
       init, ephemeral): privileged, allowPrivilegeEscalation,
       capabilities.add, runAsUser: 0.
    5. Image registry allowlist (``TRUSTED_REGISTRIES`` /
       ``TRUSTED_DOCKERHUB_ORGS``; both empty disables the check).
    6. Resource caps: TOTAL CPU/memory/GPU summed across ALL container
       kinds must not exceed ``MAX_CPU`` / ``MAX_MEMORY`` / ``MAX_GPU``.

    Returns:
        ``(True, "")`` if the manifest is accepted, otherwise
        ``(False, reason)`` where ``reason`` is a human-readable string.
    """
    if not m.get("kind"):
        return False, "missing 'kind'"
    if not m.get("apiVersion"):
        return False, "missing 'apiVersion'"
    meta = m.get("metadata", {})
    if not meta.get("name"):
        return False, "missing 'metadata.name'"
    ns = meta.get("namespace", "default")
    if ns not in ALLOWED_NAMESPACES:
        return False, f"namespace '{ns}' not in allowed list {ALLOWED_NAMESPACES}"

    # Locate the pod spec for the security and resource checks. Handles the
    # same resource shapes as manifest_processor._get_all_containers:
    #   - Deployment/StatefulSet/ReplicaSet/DaemonSet/Job: spec.template.spec
    #   - CronJob: spec.jobTemplate.spec.template.spec
    #   - Pod (bare): spec (has 'containers' directly)
    spec = m.get("spec", {})
    pod_spec = None
    if "template" in spec:
        pod_spec = spec["template"].get("spec", {})
    elif "jobTemplate" in spec:
        pod_spec = spec["jobTemplate"].get("spec", {}).get("template", {}).get("spec", {})
    elif "containers" in spec:
        pod_spec = spec

    if pod_spec:
        containers = _iter_containers(pod_spec)
        for reason in (
            _reject_pod_security(pod_spec),
            _reject_container_security(containers),
            _reject_untrusted_images(containers),
            _reject_resource_caps(containers),
        ):
            if reason is not None:
                return False, reason

    return True, ""

433 

434 

435def _extract_pod_spec(manifest: dict[str, Any]) -> dict[str, Any] | None: 

436 """Return the pod spec for any supported workload kind, or None. 

437 

438 Mirrors manifest_processor._extract_pod_spec so the SQS path and the 

439 REST path apply the same injection semantics. 

440 """ 

441 spec = manifest.get("spec") 

442 if not isinstance(spec, dict): 

443 return None 

444 

445 kind = manifest.get("kind", "") 

446 

447 # CronJob: spec.jobTemplate.spec.template.spec 

448 if kind == "CronJob": 

449 job_template = spec.get("jobTemplate") 

450 if isinstance(job_template, dict): 450 ↛ 458line 450 didn't jump to line 458 because the condition on line 450 was always true

451 job_spec = job_template.get("spec") 

452 if isinstance(job_spec, dict): 452 ↛ 458line 452 didn't jump to line 458 because the condition on line 452 was always true

453 template = job_spec.get("template") 

454 if isinstance(template, dict): 454 ↛ 458line 454 didn't jump to line 458 because the condition on line 454 was always true

455 pod_spec = template.get("spec") 

456 if isinstance(pod_spec, dict): 456 ↛ 458line 456 didn't jump to line 458 because the condition on line 456 was always true

457 return pod_spec 

458 return None 

459 

460 # Deployment / StatefulSet / DaemonSet / ReplicaSet / Job: spec.template.spec 

461 if "template" in spec: 

462 template = spec.get("template") 

463 if isinstance(template, dict): 463 ↛ 467line 463 didn't jump to line 467 because the condition on line 463 was always true

464 pod_spec = template.get("spec") 

465 if isinstance(pod_spec, dict): 465 ↛ 467line 465 didn't jump to line 467 because the condition on line 465 was always true

466 return pod_spec 

467 return None 

468 

469 # Bare Pod: spec contains "containers" directly 

470 if "containers" in spec: 470 ↛ 473line 470 didn't jump to line 473 because the condition on line 470 was always true

471 return spec 

472 

473 return None 

474 

475 

def _inject_security_defaults(manifest: dict[str, Any]) -> dict[str, Any]:
    """Apply security defaults to a user-submitted manifest in-place.

    Sets ``automountServiceAccountToken: false`` on the pod spec when the
    submitter has not set it explicitly either way (setdefault semantics),
    so user pods never silently receive the default SA token.

    Mirrors manifest_processor._inject_security_defaults so jobs submitted
    via SQS get the same SA-token-theft protection as those submitted via
    the REST API.

    Returns the same manifest object, for chaining.
    """
    pod_spec = _extract_pod_spec(manifest)
    if pod_spec is None:
        # No recognizable pod template — nothing to inject.
        return manifest
    pod_spec.setdefault("automountServiceAccountToken", False)
    return manifest

490 

491 

def apply_manifest(m: dict[str, Any]) -> str:
    """Apply a single manifest using the dynamic Kubernetes client.

    Returns a short status string ("CREATED kind/name", "UPDATED ...",
    "SKIP ...", "CREATE_FAILED ...", "PATCH_FAILED ...") — the caller
    scans it for the substring "FAILED" to decide whether the SQS message
    should be retried.
    """
    # Inject security defaults BEFORE applying so user pods never
    # auto-mount the default SA token (T-022 / M-113 parity with the
    # REST manifest_processor path).
    _inject_security_defaults(m)

    dyn = dynamic.DynamicClient(client.ApiClient())
    api_version = m["apiVersion"]
    kind = m["kind"]
    name = m["metadata"]["name"]
    namespace = m["metadata"].get("namespace", "default")

    try:
        resource = dyn.resources.get(api_version=api_version, kind=kind)
    except ResourceNotFoundError:
        # Kind/CRD not installed on this cluster — report but don't crash.
        return f"SKIP unknown resource {api_version}/{kind}"

    # For Jobs, delete completed/failed ones first so re-submission works.
    # Without this, re-submitting the same job name would fail with a 409 conflict
    # because Kubernetes doesn't allow creating a Job with the same name as an
    # existing one (even if it's finished).
    if kind == "Job":
        try:
            existing = resource.get(name=name, namespace=namespace)
            conditions = existing.get("status", {}).get("conditions", [])
            finished = any(c.get("type") in ("Complete", "Failed") for c in conditions)
            if finished:
                log.info(f"Deleting finished Job {namespace}/{name} before re-creation")
                resource.delete(
                    name=name,
                    namespace=namespace,
                    body=client.V1DeleteOptions(propagation_policy="Background"),
                )
                # Brief pause so the cascading delete can complete before the
                # create below; otherwise the create may still hit a 409.
                time.sleep(2)
        except (NotFoundError, ApiException):
            # No existing Job (or the lookup/delete failed) — proceed
            # straight to create; a lingering conflict surfaces as a 409.
            pass

    # Create-or-update pattern: try create first, fall back to patch on 409 (conflict).
    # This is idempotent — safe to retry without side effects.
    try:
        if resource.namespaced:
            resource.create(body=m, namespace=namespace)
        else:
            resource.create(body=m)
        return f"CREATED {kind}/{name}"
    except ApiException as e:
        if e.status == 409:
            try:
                if resource.namespaced:
                    resource.patch(body=m, name=name, namespace=namespace)
                else:
                    resource.patch(body=m, name=name)
                return f"UPDATED {kind}/{name}"
            except ApiException as patch_err:
                return f"PATCH_FAILED {kind}/{name}: {patch_err.reason}"
        return f"CREATE_FAILED {kind}/{name}: {e.reason}"

549 

550 

def process_one_message() -> bool:
    """Receive and process a single SQS message.

    Returns:
        True when the queue was empty or the message was fully applied (and
        deleted). False on any failure — the message is then left on the
        queue, becomes visible again after the visibility timeout, and after
        3 failed attempts SQS moves it to the DLQ.
    """
    if not QUEUE_URL:
        log.error("JOB_QUEUE_URL not set")
        return False

    sqs = boto3.client("sqs", region_name=REGION)

    resp = sqs.receive_message(
        QueueUrl=QUEUE_URL,
        MaxNumberOfMessages=1,
        WaitTimeSeconds=5,
        MessageAttributeNames=["All"],
    )

    messages = resp.get("Messages", [])
    if not messages:
        log.info("No messages in queue")
        return True

    msg = messages[0]
    receipt = msg["ReceiptHandle"]

    # A malformed body must not crash the pod with a traceback — log it and
    # return False so the message follows the normal retry -> DLQ path.
    try:
        body = json.loads(msg["Body"])
    except (json.JSONDecodeError, KeyError) as e:
        log.error("Malformed SQS message body (%s) — message will return to queue", e)
        return False
    if not isinstance(body, dict):
        log.error("SQS message body is not a JSON object — message will return to queue")
        return False

    job_id = body.get("job_id", "unknown")
    manifests = body.get("manifests", [])
    log.info("Processing job_id=%s, manifests=%d", job_id, len(manifests))

    failed = False
    for i, m in enumerate(manifests):
        ok, reason = validate_manifest(m)
        if not ok:
            log.error("  manifest[%d] validation failed: %s", i, reason)
            failed = True
            continue
        result = apply_manifest(m)
        log.info("  manifest[%d]: %s", i, result)
        if "FAILED" in result:
            failed = True

    if failed:
        # Don't delete the SQS message — it will become visible again after the
        # visibility timeout (5 min) and retry. After 3 total failures, SQS
        # moves it to the dead-letter queue for manual inspection.
        log.error("Job %s had failures — message will return to queue", job_id)
        return False

    sqs.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=receipt)
    log.info("Job %s processed successfully", job_id)
    return True

604 

605 

def main() -> None:
    """Entry point: configure cluster access, process one message, and exit
    non-zero on failure so the ScaledJob pod reports an error."""
    load_k8s()
    if not process_one_message():
        sys.exit(1)

612 

613 

# Script entry: this module is executed directly as the pod command
# (see module docstring — one invocation processes one SQS message).
if __name__ == "__main__":
    main()