Coverage for gco/services/inference_monitor.py: 87%

525 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-15 15:07 +0000

1""" 

2Inference Monitor — reconciliation controller for inference endpoints. 

3 

4Runs in each regional EKS cluster and polls the global DynamoDB table 

5(gco-inference-endpoints) to reconcile desired state with actual 

6Kubernetes resources. Follows a GitOps-style reconciliation pattern: 

7 

8 DynamoDB (desired state) → inference_monitor → Kubernetes (actual state) 

9 

10The monitor: 

11- Creates Deployments, Services, and Ingress rules for new endpoints 

12- Updates existing deployments when spec changes 

13- Scales deployments up/down 

14- Tears down resources when endpoints are deleted 

15- Reports per-region status back to DynamoDB 

16 

17Environment Variables: 

18 CLUSTER_NAME: Name of the EKS cluster 

19 REGION: AWS region this monitor runs in 

20 INFERENCE_ENDPOINTS_TABLE_NAME: DynamoDB table name 

21 RECONCILE_INTERVAL_SECONDS: Seconds between reconciliation loops (default: 15) 

22 INFERENCE_NAMESPACE: Namespace for inference workloads (default: gco-inference) 

23""" 

24 

25import asyncio 

26import logging 

27import os 

28from datetime import UTC, datetime 

29from typing import Any 

30 

31from kubernetes import client, config 

32from kubernetes.client.models import V1Deployment 

33from kubernetes.client.rest import ApiException 

34 

35from gco.services.inference_store import InferenceEndpointStore 

36from gco.services.structured_logging import configure_structured_logging 

37 

# Process-wide logging defaults: INFO level, with timestamp, logger name,
# severity and message on every record.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Module-level logger used by everything in this file.
logger = logging.getLogger(__name__)

43 

44 

45class InferenceMonitor: 

46 """ 

47 Reconciliation controller for inference endpoints. 

48 

49 Polls DynamoDB for desired endpoint state and reconciles with 

50 the actual Kubernetes resources in the local cluster. 

51 """ 

52 

def __init__(
    self,
    cluster_id: str,
    region: str,
    store: InferenceEndpointStore,
    namespace: str = "gco-inference",
    reconcile_interval: int = 15,
):
    """Set up identity, Kubernetes API clients, and watchdog/metric state.

    Args:
        cluster_id: Identifier of the cluster this monitor runs in.
        region: Region name used when matching ``target_regions`` and when
            reporting status to the store.
        store: Endpoint store that provides desired state and receives
            per-region status updates.
        namespace: Default namespace for inference workloads.
        reconcile_interval: Seconds to sleep between reconciliation cycles.

    Raises:
        kubernetes.config.ConfigException: If neither the in-cluster nor
            the local kubeconfig can be loaded.
    """
    self.cluster_id = cluster_id
    self.region = region
    self.store = store
    self.namespace = namespace
    self.reconcile_interval = reconcile_interval
    self._running = False

    # Prefer the in-cluster service-account config; fall back to a local
    # kubeconfig (e.g. when running outside the cluster).
    try:
        config.load_incluster_config()
    except config.ConfigException:
        try:
            config.load_kube_config()
        except config.ConfigException as load_error:
            logger.error("Failed to load Kubernetes configuration: %s", load_error)
            raise
        else:
            logger.info("Loaded local Kubernetes configuration")
    else:
        logger.info("Loaded in-cluster Kubernetes configuration")

    self.apps_v1 = client.AppsV1Api()
    self.core_v1 = client.CoreV1Api()
    self.networking_v1 = client.NetworkingV1Api()

    env = os.environ

    # Per-request timeout (seconds) passed to Kubernetes API calls.
    self._k8s_timeout = int(env.get("K8S_API_TIMEOUT", "30"))

    # Health watchdog state: endpoint name -> time it was first seen with
    # zero ready replicas. Once an endpoint has been unready for longer than
    # the threshold below, the watchdog removes its Ingress so that one
    # unhealthy target group does not make the shared ALB look unhealthy.
    self._unready_since: dict[str, datetime] = {}
    # Threshold in seconds (default 300 = 5 minutes).
    self._ingress_removal_threshold = int(
        env.get("INFERENCE_UNHEALTHY_THRESHOLD_SECONDS", "300")
    )

    # Simple counters exposed as metrics.
    self._reconcile_count = 0
    self._errors_count = 0

100 

101 # ------------------------------------------------------------------ 

102 # Reconciliation loop 

103 # ------------------------------------------------------------------ 

104 

async def start(self) -> None:
    """Run the reconciliation loop, gated by leader election.

    Each iteration tries to acquire or renew a Kubernetes Lease; only the
    lease holder calls :meth:`reconcile`. Non-leaders stay on standby and
    retry after ``reconcile_interval`` seconds, so they can take over if
    the leader stops renewing.

    The loop ends when :meth:`stop` clears ``_running`` or when the sleep
    between cycles raises. In every exit path ``_running`` is reset to
    ``False`` so that a later ``start()`` call is not rejected as
    "already running".
    """
    if self._running:
        logger.warning("Inference monitor already running")
        return
    self._running = True
    logger.info(
        "Starting inference monitor for %s in %s (interval=%ds)",
        self.cluster_id,
        self.region,
        self.reconcile_interval,
    )

    # Namespace and ServiceAccount are pre-created by the kubectl-applier
    # at deploy time (00-namespaces.yaml, 01-serviceaccounts.yaml). The
    # inference-monitor SA has namespace-scoped RBAC only — it cannot
    # read_namespace/create_namespace, so we don't try. If the namespace
    # is ever missing, deployments below will fail with a clear 404.

    # Pod identity used as the lease holder name; falls back to a
    # process-local identifier when HOSTNAME is not set.
    pod_name = os.environ.get("HOSTNAME", f"monitor-{id(self)}")
    lease_name = "inference-monitor-leader"

    try:
        while self._running:
            try:
                if self._try_acquire_lease(lease_name, pod_name):
                    await self.reconcile()
                else:
                    logger.debug("Not the leader, waiting...")
            except Exception as e:
                # A failed cycle must not kill the loop; count it and retry.
                logger.error("Reconciliation error: %s", e, exc_info=True)
                self._errors_count += 1
            try:
                await asyncio.sleep(self.reconcile_interval)
            except Exception as e:
                logger.error("Sleep interrupted: %s", e)
                break
    finally:
        # Without this reset the flag stayed True after the loop exited
        # through ``break`` or an escaping exception, permanently blocking
        # any subsequent start().
        self._running = False

147 

def _try_acquire_lease(self, lease_name: str, holder: str) -> bool:
    """Try to acquire or renew a Kubernetes Lease for leader election.

    The lease object read from the API is modified and written back with
    ``replace_namespaced_lease``; a 409 Conflict from that write means
    another monitor updated the lease first, and this instance backs off
    until the next cycle.

    A lease is treated as expired when its ``renew_time`` is older than
    three reconcile intervals.

    Args:
        lease_name: Name of the Lease object in ``self.namespace``.
        holder: Identity to record as the lease holder.

    Returns:
        True if this instance holds the lease after the call, else False.
        API errors are logged and reported as False rather than raised.
    """
    coordination_v1 = client.CoordinationV1Api()
    now = datetime.now(UTC)

    try:
        lease = coordination_v1.read_namespaced_lease(lease_name, self.namespace)
        current_holder = lease.spec.holder_identity
        renew_time = lease.spec.renew_time

        # Check if lease is expired (holder hasn't renewed in 3x interval)
        if renew_time:
            elapsed = (now - renew_time.replace(tzinfo=UTC)).total_seconds()
            if elapsed > self.reconcile_interval * 3:
                # Lease expired — take over
                logger.info("Lease expired (held by %s), taking over", current_holder)
                current_holder = None

        if current_holder == holder:
            # We're the leader — renew
            lease.spec.renew_time = now
            try:
                coordination_v1.replace_namespaced_lease(lease_name, self.namespace, lease)
            except ApiException as conflict:
                if conflict.status == 409:
                    logger.debug("Lease renew conflict (another writer), retrying next cycle")
                    return False
                raise
            return True
        if current_holder is None or current_holder == "":
            # No leader — claim it
            lease.spec.holder_identity = holder
            lease.spec.renew_time = now
            try:
                coordination_v1.replace_namespaced_lease(lease_name, self.namespace, lease)
            except ApiException as conflict:
                if conflict.status == 409:
                    logger.info("Lost lease race to another monitor")
                    return False
                raise
            logger.info("Acquired leader lease as %s", holder)
            return True
        # Someone else is the leader
        return False

    except ApiException as e:
        if e.status == 404:
            # Lease doesn't exist — create it
            lease = client.V1Lease(
                metadata=client.V1ObjectMeta(
                    name=lease_name,
                    namespace=self.namespace,
                ),
                spec=client.V1LeaseSpec(
                    holder_identity=holder,
                    lease_duration_seconds=self.reconcile_interval * 3,
                    renew_time=now,
                ),
            )
            try:
                coordination_v1.create_namespaced_lease(self.namespace, lease)
                logger.info("Created leader lease as %s", holder)
                return True
            except ApiException as create_error:
                # Still best-effort (never raises), but no longer silent:
                # a 409 is a normal lost race, anything else (e.g. RBAC)
                # needs to be visible to operators.
                if create_error.status == 409:
                    logger.info("Lost lease creation race to another monitor")
                else:
                    logger.warning(
                        "Failed to create leader lease: %s", create_error.reason
                    )
                return False
        logger.warning("Lease check failed: %s", e.reason)
        return False

223 

def stop(self) -> None:
    """Ask the reconciliation loop to exit.

    Only clears the ``_running`` flag; the loop in ``start`` checks the
    flag at the top of each iteration.
    """
    self._running = False
    logger.info("Inference monitor stopped")

228 

async def reconcile(self) -> list[dict[str, Any]]:
    """Run one reconciliation cycle.

    Lists all endpoints from the store, reconciles each one, then purges
    endpoints whose desired state is ``"deleted"`` and whose every target
    region reports ``"deleted"``.

    A failure on one endpoint is logged, counted, and reported to the store
    as an ``"error"`` region status. That status write is itself guarded:
    if the store is unavailable, the failure is logged and the cycle
    continues with the remaining endpoints instead of aborting.

    Returns:
        The list of actions taken in this cycle (for logging/testing).
        Empty if the endpoint listing itself failed.
    """
    self._reconcile_count += 1
    actions: list[dict[str, Any]] = []

    # Get all endpoints from DynamoDB
    try:
        endpoints = self.store.list_endpoints()
    except Exception as e:
        logger.error("Failed to list endpoints from DynamoDB: %s", e)
        return actions

    for endpoint in endpoints:
        try:
            action = await self._reconcile_endpoint(endpoint)
            if action:
                actions.append(action)
        except Exception as e:
            name = endpoint.get("endpoint_name", "unknown")
            logger.error("Failed to reconcile endpoint %s: %s", name, e)
            self._errors_count += 1
            try:
                self.store.update_region_status(
                    name,
                    self.region,
                    "error",
                    error=str(e),
                )
            except Exception as status_error:
                # The store may be the thing that is failing; do not let the
                # error report take down the rest of the cycle.
                logger.error(
                    "Failed to record error status for endpoint %s: %s",
                    name,
                    status_error,
                )

    # Purge fully-deleted endpoints from DynamoDB to prevent unbounded growth.
    # An endpoint is fully deleted when desired_state is "deleted" and all
    # target regions report "deleted" status. The check uses the snapshot
    # listed at the start of this cycle.
    for endpoint in endpoints:
        if endpoint.get("desired_state") != "deleted":
            continue
        region_status = endpoint.get("region_status", {})
        target_regions = endpoint.get("target_regions", [])
        if not target_regions:
            continue
        all_deleted = all(
            isinstance(region_status.get(r), dict)
            and region_status.get(r, {}).get("state") == "deleted"
            for r in target_regions
        )
        if all_deleted:
            ep_name = endpoint["endpoint_name"]
            try:
                self.store.delete_endpoint(ep_name)
                logger.info("Purged fully-deleted endpoint %s from DynamoDB", ep_name)
                actions.append({"action": "purge", "endpoint": ep_name})
            except Exception as e:
                logger.warning("Failed to purge endpoint %s: %s", ep_name, e)

    return actions

286 

287 async def _reconcile_endpoint(self, endpoint: dict[str, Any]) -> dict[str, Any] | None: 

288 """Reconcile a single endpoint.""" 

289 name = endpoint["endpoint_name"] 

290 desired_state = endpoint.get("desired_state", "deploying") 

291 target_regions = endpoint.get("target_regions", []) 

292 spec = endpoint.get("spec", {}) 

293 ns = endpoint.get("namespace", self.namespace) 

294 

295 # Am I a target region? 

296 if self.region not in target_regions: 

297 # If I have resources for this endpoint, clean them up 

298 if self._deployment_exists(name, ns): 

299 logger.info( 

300 "Endpoint %s no longer targets %s, cleaning up", 

301 name, 

302 self.region, 

303 ) 

304 self._delete_resources(name, ns) 

305 self.store.update_region_status( 

306 name, 

307 self.region, 

308 "deleted", 

309 ) 

310 return {"action": "cleanup", "endpoint": name, "reason": "region_removed"} 

311 return None 

312 

313 # Reconcile based on desired state 

314 if desired_state in ("deploying", "running"): 

315 return await self._reconcile_running(name, ns, spec, endpoint) 

316 if desired_state == "stopped": 

317 return self._reconcile_stopped(name, ns) 

318 if desired_state == "deleted": 

319 return self._reconcile_deleted(name, ns) 

320 

321 return None 

322 

323 async def _reconcile_running( 

324 self, 

325 name: str, 

326 namespace: str, 

327 spec: dict[str, Any], 

328 endpoint: dict[str, Any], 

329 ) -> dict[str, Any] | None: 

330 """Ensure the endpoint is running with the correct spec.""" 

331 deployment = self._get_deployment(name, namespace) 

332 

333 if deployment is None: 

334 # Create everything 

335 logger.info("Creating endpoint %s in %s", name, self.region) 

336 self._create_deployment(name, namespace, spec) 

337 self._create_service(name, namespace, spec) 

338 self._update_ingress_rule(name, namespace, spec, endpoint) 

339 if spec.get("autoscaling", {}).get("enabled"): 

340 self._create_or_update_hpa(name, namespace, spec) 

341 self.store.update_region_status( 

342 name, 

343 self.region, 

344 "creating", 

345 replicas_desired=spec.get("replicas", 1), 

346 ) 

347 return {"action": "create", "endpoint": name} 

348 

349 # Deployment exists — ensure Service and Ingress also exist 

350 # (they may have been manually deleted or lost during a rollout) 

351 self._ensure_service(name, namespace, spec) 

352 

353 # Check readiness before ensuring Ingress — the health watchdog may 

354 # remove the Ingress if the endpoint has been unready too long 

355 desired_replicas = spec.get("replicas", 1) 

356 current_replicas = deployment.spec.replicas or 1 

357 ready_replicas = deployment.status.ready_replicas or 0 

358 

359 ingress_removed = self._check_health_watchdog( 

360 name, namespace, ready_replicas, desired_replicas, spec, endpoint 

361 ) 

362 if not ingress_removed: 362 ↛ 365line 362 didn't jump to line 365 because the condition on line 362 was always true

363 self._ensure_ingress(name, namespace, spec, endpoint) 

364 

365 if current_replicas != desired_replicas: 

366 logger.info( 

367 "Scaling endpoint %s: %d → %d replicas", 

368 name, 

369 current_replicas, 

370 desired_replicas, 

371 ) 

372 self._scale_deployment(name, namespace, desired_replicas) 

373 self.store.update_region_status( 

374 name, 

375 self.region, 

376 "updating", 

377 replicas_ready=ready_replicas, 

378 replicas_desired=desired_replicas, 

379 ) 

380 return {"action": "scale", "endpoint": name, "replicas": desired_replicas} 

381 

382 # Check if image changed 

383 current_image = self._get_deployment_image(deployment) 

384 desired_image = self._resolve_image_for_region(spec) if spec.get("image") else "" 

385 if current_image and desired_image and current_image != desired_image: 

386 logger.info("Updating endpoint %s image: %s → %s", name, current_image, desired_image) 

387 self._update_deployment_image(name, namespace, desired_image) 

388 self.store.update_region_status( 

389 name, 

390 self.region, 

391 "updating", 

392 replicas_ready=ready_replicas, 

393 replicas_desired=desired_replicas, 

394 ) 

395 return {"action": "update_image", "endpoint": name, "image": desired_image} 

396 

397 # Everything is in sync — report status 

398 state = "running" if ready_replicas >= desired_replicas else "creating" 

399 self.store.update_region_status( 

400 name, 

401 self.region, 

402 state, 

403 replicas_ready=ready_replicas, 

404 replicas_desired=desired_replicas, 

405 ) 

406 

407 # Reconcile canary deployment if present 

408 canary = spec.get("canary") 

409 if canary: 409 ↛ 410line 409 didn't jump to line 410 because the condition on line 409 was never true

410 self._reconcile_canary(name, namespace, spec, canary, endpoint) 

411 else: 

412 # No canary — clean up canary resources if they exist 

413 self._cleanup_canary(name, namespace) 

414 

415 # If all replicas are ready and desired_state is "deploying", promote to "running" 

416 if state == "running" and endpoint.get("desired_state") == "deploying": 

417 # Check if all target regions are running 

418 all_running = True 

419 for r_status in endpoint.get("region_status", {}).values(): 

420 if isinstance(r_status, dict) and r_status.get("state") != "running": 420 ↛ 421line 420 didn't jump to line 421 because the condition on line 420 was never true

421 all_running = False 

422 break 

423 if all_running: 423 ↛ 426line 423 didn't jump to line 426 because the condition on line 423 was always true

424 self.store.update_desired_state(name, "running") 

425 

426 return None 

427 

428 def _reconcile_stopped(self, name: str, namespace: str) -> dict[str, Any] | None: 

429 """Scale deployment to zero.""" 

430 deployment = self._get_deployment(name, namespace) 

431 if deployment is None: 

432 return None 

433 

434 current_replicas = deployment.spec.replicas or 0 

435 if current_replicas > 0: 

436 logger.info("Stopping endpoint %s (scaling to 0)", name) 

437 self._scale_deployment(name, namespace, 0) 

438 self.store.update_region_status( 

439 name, 

440 self.region, 

441 "stopped", 

442 replicas_ready=0, 

443 replicas_desired=0, 

444 ) 

445 return {"action": "stop", "endpoint": name} 

446 

447 self.store.update_region_status( 

448 name, 

449 self.region, 

450 "stopped", 

451 replicas_ready=0, 

452 replicas_desired=0, 

453 ) 

454 return None 

455 

456 def _reconcile_deleted(self, name: str, namespace: str) -> dict[str, Any] | None: 

457 """Delete all resources for the endpoint.""" 

458 # Clean up health watchdog tracker 

459 self._unready_since.pop(name, None) 

460 

461 if self._deployment_exists(name, namespace): 

462 logger.info("Deleting endpoint %s from %s", name, self.region) 

463 self._delete_resources(name, namespace) 

464 self.store.update_region_status(name, self.region, "deleted") 

465 return {"action": "delete", "endpoint": name} 

466 

467 self.store.update_region_status(name, self.region, "deleted") 

468 return None 

469 

470 # ------------------------------------------------------------------ 

471 # Kubernetes resource management 

472 # ------------------------------------------------------------------ 

473 

def _deployment_exists(self, name: str, namespace: str) -> bool:
    """Return whether a Deployment called ``name`` exists in ``namespace``.

    A 404 from the API means "does not exist"; any other API error is
    re-raised.
    """
    try:
        self.apps_v1.read_namespaced_deployment(
            name, namespace, _request_timeout=self._k8s_timeout
        )
    except ApiException as api_error:
        if api_error.status != 404:
            raise
        return False
    return True

484 

def _get_deployment(self, name: str, namespace: str) -> V1Deployment | None:
    """Fetch the Deployment ``name`` from ``namespace``.

    Returns:
        The Deployment object, or None if the API answers 404. Any other
        API error is re-raised.
    """
    try:
        found = self.apps_v1.read_namespaced_deployment(
            name, namespace, _request_timeout=self._k8s_timeout
        )
    except ApiException as api_error:
        if api_error.status == 404:
            return None
        raise
    return found

494 

def _get_deployment_image(self, deployment: V1Deployment) -> str | None:
    """Return the image of the deployment's first container, if any.

    Returns None when the pod template lists no containers.
    """
    pod_containers = deployment.spec.template.spec.containers
    if not pod_containers:
        return None
    first_image: str = pod_containers[0].image
    return first_image

502 

def _resolve_image_for_region(self, spec: dict[str, Any]) -> str:
    """Choose the image URI this region should pull.

    ``spec["region_image_uris"]`` may carry a per-region map of image URIs
    (populated by ``cli.inference.InferenceManager.deploy`` for ECR images,
    so each cluster can pull from its local replica). When that map is a
    dict holding a non-empty string for ``self.region``, that URI wins.

    In every other case — no map, a map that is not a dict, no entry for
    this region, or an empty/non-string entry — the flat ``spec["image"]``
    value is returned, converted to ``str``.

    Raises:
        KeyError: If the fallback is needed and ``spec`` has no ``"image"``.
    """
    per_region = spec.get("region_image_uris")
    regional_uri = per_region.get(self.region) if isinstance(per_region, dict) else None
    if isinstance(regional_uri, str) and regional_uri:
        return regional_uri
    return str(spec["image"])

524 

def _create_deployment(self, name: str, namespace: str, spec: dict[str, Any]) -> None:
    """Create a Kubernetes Deployment for an inference endpoint.

    Builds a single-container pod template from ``spec`` (image, port,
    env, resources, optional command/args), adds accelerator resources,
    tolerations and a node selector, optionally mounts the ``efs-claim``
    volume at ``/models`` with an S3 model-sync init container, and submits
    the Deployment to the API.

    The caller's ``spec`` is never modified: resource and node-selector
    dicts are copied before accelerator and capacity-type keys are added.

    Args:
        name: Endpoint name; used as Deployment name and ``app`` label.
        namespace: Namespace to create the Deployment in.
        spec: Desired endpoint spec from the store.

    Raises:
        ApiException: If the Kubernetes API rejects the create call.
        KeyError: If no image can be resolved from ``spec``.
    """
    replicas = spec.get("replicas", 1)
    image = self._resolve_image_for_region(spec)
    port = spec.get("port", 8000)
    gpu_count = spec.get("gpu_count", 1)
    health_path = spec.get("health_check_path", "/health")
    env_vars = spec.get("env", {})
    resources = spec.get("resources", {})
    model_path = spec.get("model_path")
    command = spec.get("command")
    args = spec.get("args")

    # Build container
    container_env = [client.V1EnvVar(name=k, value=str(v)) for k, v in env_vars.items()]

    # Inject --root-path for servers that support it (vLLM, TGI).
    # This tells the server to mount its API at /inference/{name}.
    # We append to existing args (from --extra-args) rather than replacing them.
    ingress_prefix = f"/inference/{name}"
    root_path_images = ("vllm", "text-generation-inference", "tgi")
    image_lower = image.lower()
    if not command and any(tag in image_lower for tag in root_path_images):
        if args:
            # Append --root-path to user-provided args if not already present
            if "--root-path" not in args:
                args = list(args) + ["--root-path", ingress_prefix]
        else:
            args = ["--root-path", ingress_prefix]

    # Copy the request/limit maps: accelerator keys are added below and must
    # not leak back into the caller's spec dict.
    requests = resources.get("requests", {"cpu": "1", "memory": "4Gi"})
    limits = resources.get("limits", {"cpu": "4", "memory": "16Gi"})
    resource_reqs = client.V1ResourceRequirements(
        requests=dict(requests) if requests is not None else None,
        limits=dict(limits) if limits is not None else None,
    )
    # Add accelerator resources (GPU or Neuron)
    accelerator = spec.get("accelerator", "nvidia")
    # Extended-resource name (also used as the toleration key below):
    # AWS Trainium/Inferentia devices vs. NVIDIA GPUs (default).
    accelerator_key = "aws.amazon.com/neuron" if accelerator == "neuron" else "nvidia.com/gpu"
    if gpu_count > 0:
        if resource_reqs.limits is None:
            resource_reqs.limits = {}
        resource_reqs.limits[accelerator_key] = str(gpu_count)
        if resource_reqs.requests is None:
            resource_reqs.requests = {}
        resource_reqs.requests[accelerator_key] = str(gpu_count)

    volume_mounts = []
    volumes = []
    init_containers = []
    model_source = spec.get("model_source")

    if model_path or model_source:
        volume_mounts.append(
            client.V1VolumeMount(
                name="model-storage",
                mount_path="/models",
            )
        )
        volumes.append(
            client.V1Volume(
                name="model-storage",
                persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(
                    claim_name="efs-claim",
                ),
            )
        )

    # Add init container to sync model from S3 if model_source is set
    if model_source and model_source.startswith("s3://"):
        model_dest = f"/models/{name}"
        # The script text is constant. The source URI and destination path
        # are handed to the shell as positional parameters ($1, $2) instead
        # of being interpolated into the script, so shell metacharacters in
        # the spec cannot be executed.
        sync_script = (
            'if [ -d "$2" ] && [ "$(ls -A "$2")" ]; then '
            'echo "Model already cached at $2, skipping sync"; '
            'else echo "Syncing model from $1..."; '
            'aws s3 sync "$1" "$2" --quiet; '
            'echo "Model sync complete"; fi'
        )
        init_containers.append(
            client.V1Container(
                name="model-sync",
                image="amazon/aws-cli:latest",
                command=["sh", "-c"],
                # sh -c <script> <$0> <$1> <$2>
                args=[sync_script, "model-sync", model_source, model_dest],
                volume_mounts=[
                    client.V1VolumeMount(
                        name="model-storage",
                        mount_path="/models",
                    )
                ],
                resources=client.V1ResourceRequirements(
                    requests={"cpu": "1", "memory": "2Gi"},
                    limits={"cpu": "4", "memory": "8Gi"},
                ),
            )
        )

    # Probe path depends on whether the server handles the prefix
    uses_root_path = args is not None and "--root-path" in args
    probe_health = f"{ingress_prefix}{health_path}" if uses_root_path else health_path

    container = client.V1Container(
        name="inference",
        image=image,
        ports=[client.V1ContainerPort(container_port=port)],
        env=container_env if container_env else None,
        resources=resource_reqs,
        volume_mounts=volume_mounts if volume_mounts else None,
        command=command,
        args=args,
        liveness_probe=client.V1Probe(
            http_get=client.V1HTTPGetAction(path=probe_health, port=port),
            initial_delay_seconds=120,
            period_seconds=15,
            failure_threshold=5,
        ),
        readiness_probe=client.V1Probe(
            http_get=client.V1HTTPGetAction(path=probe_health, port=port),
            initial_delay_seconds=30,
            period_seconds=10,
        ),
    )

    # Build tolerations based on accelerator type
    tolerations = [
        client.V1Toleration(
            key=accelerator_key,
            operator="Equal",
            value="true",
            effect="NoSchedule",
        )
    ]

    # Node selector based on accelerator type. Copied so the capacity-type
    # key added below does not mutate the caller's spec; a null value in
    # the spec is treated as "no selector".
    node_selector = dict(spec.get("node_selector") or {})
    if gpu_count > 0 and not node_selector:
        if accelerator == "neuron":
            node_selector = {"accelerator": "neuron"}
        else:
            node_selector = {"eks.amazonaws.com/instance-gpu-manufacturer": "nvidia"}

    # Apply capacity type preference (spot/on-demand)
    capacity_type = spec.get("capacity_type")
    if capacity_type in ("spot", "on-demand"):
        node_selector["karpenter.sh/capacity-type"] = capacity_type

    deployment = client.V1Deployment(
        metadata=client.V1ObjectMeta(
            name=name,
            namespace=namespace,
            labels={
                "app": name,
                "project": "gco",
                "gco.io/type": "inference",
            },
        ),
        spec=client.V1DeploymentSpec(
            replicas=replicas,
            selector=client.V1LabelSelector(
                match_labels={"app": name},
            ),
            template=client.V1PodTemplateSpec(
                metadata=client.V1ObjectMeta(
                    labels={
                        "app": name,
                        "project": "gco",
                        "gco.io/type": "inference",
                    },
                ),
                spec=client.V1PodSpec(
                    service_account_name="gco-service-account",
                    containers=[container],
                    init_containers=init_containers if init_containers else None,
                    tolerations=tolerations,
                    node_selector=node_selector if node_selector else None,
                    volumes=volumes if volumes else None,
                ),
            ),
        ),
    )

    self.apps_v1.create_namespaced_deployment(
        namespace, deployment, _request_timeout=self._k8s_timeout
    )
    logger.info("Created deployment %s/%s", namespace, name)

726 

def _create_service(self, name: str, namespace: str, spec: dict[str, Any]) -> None:
    """Create the ClusterIP Service that fronts an inference endpoint.

    The Service listens on port 80 and forwards to the container port from
    ``spec["port"]`` (default 8000), selecting pods by the ``app`` label.
    If the Service already exists (HTTP 409) this is logged and treated as
    success; any other API error is re-raised.
    """
    target_port = spec.get("port", 8000)
    service_port = client.V1ServicePort(
        port=80,
        target_port=target_port,
        protocol="TCP",
    )
    metadata = client.V1ObjectMeta(
        name=name,
        namespace=namespace,
        labels={
            "app": name,
            "project": "gco",
            "gco.io/type": "inference",
        },
    )
    manifest = client.V1Service(
        metadata=metadata,
        spec=client.V1ServiceSpec(
            selector={"app": name},
            ports=[service_port],
            type="ClusterIP",
        ),
    )

    try:
        self.core_v1.create_namespaced_service(
            namespace, manifest, _request_timeout=self._k8s_timeout
        )
    except ApiException as api_error:
        if api_error.status != 409:
            raise
        logger.info("Service %s/%s already exists", namespace, name)
    else:
        logger.info("Created service %s/%s", namespace, name)

764 

def _ensure_service(self, name: str, namespace: str, spec: dict[str, Any]) -> None:
    """Recreate the endpoint's Service if it has gone missing.

    Reads the Service; on 404 it is recreated via ``_create_service``.
    Any other API error is re-raised.
    """
    try:
        self.core_v1.read_namespaced_service(
            name, namespace, _request_timeout=self._k8s_timeout
        )
    except ApiException as api_error:
        if api_error.status != 404:
            raise
        logger.warning("Service %s/%s missing, recreating", namespace, name)
        self._create_service(name, namespace, spec)

777 

def _ensure_ingress(
    self,
    name: str,
    namespace: str,
    spec: dict[str, Any],
    endpoint: dict[str, Any],
) -> None:
    """Recreate the endpoint's Ingress if it has gone missing.

    Reads the Ingress named ``inference-{name}``; on 404 it is recreated
    via ``_update_ingress_rule``. Any other API error is re-raised.
    """
    try:
        self.networking_v1.read_namespaced_ingress(
            f"inference-{name}", namespace, _request_timeout=self._k8s_timeout
        )
    except ApiException as api_error:
        if api_error.status != 404:
            raise
        logger.warning("Ingress for %s missing, recreating", name)
        self._update_ingress_rule(name, namespace, spec, endpoint)

796 

def _update_ingress_rule(
    self,
    name: str,
    namespace: str,
    spec: dict[str, Any],
    endpoint: dict[str, Any],
) -> None:
    """Create or update an Ingress for the inference endpoint.

    The Ingress is created in the same namespace as the Service and pods.
    IngressClassParams with group.name merges all Ingresses onto a single
    shared ALB regardless of namespace.

    The ALB health-check path is prefixed with ``/inference/{name}`` only
    when the server is actually started with ``--root-path``. That decision
    follows the same rule ``_create_deployment`` uses for its probes: the
    flag is injected when no custom ``command`` is set and the image looks
    like vLLM/TGI, or it is already present in the user-supplied ``args``.

    Args:
        name: Endpoint name; the Ingress is named ``inference-{name}``.
        namespace: Namespace to create the Ingress in.
        spec: Desired endpoint spec from the store.
        endpoint: Full endpoint record (may carry ``ingress_path``).

    Raises:
        ApiException: On API errors other than 409 on create.
    """
    ingress_path = endpoint.get("ingress_path", f"/inference/{name}")
    image = spec.get("image", "")
    image_lower = image.lower()
    root_path_images = ("vllm", "text-generation-inference", "tgi")
    image_supports_root_path = any(tag in image_lower for tag in root_path_images)
    user_args = spec.get("args")
    # Keep in step with the probe-path logic in _create_deployment. Judging
    # by image name alone would prefix the health check even when a custom
    # command suppresses --root-path, and would miss a --root-path passed
    # explicitly to another image.
    uses_root_path = (not spec.get("command") and image_supports_root_path) or (
        user_args is not None and "--root-path" in user_args
    )
    base_health = spec.get("health_check_path", "/health")
    health_path = f"/inference/{name}{base_health}" if uses_root_path else base_health

    ingress = client.V1Ingress(
        metadata=client.V1ObjectMeta(
            name=f"inference-{name}",
            namespace=namespace,
            labels={
                "app": name,
                "project": "gco",
                "gco.io/type": "inference",
            },
            annotations={
                "alb.ingress.kubernetes.io/healthcheck-path": health_path,
                "alb.ingress.kubernetes.io/healthcheck-interval-seconds": "15",
            },
        ),
        spec=client.V1IngressSpec(
            ingress_class_name="alb",
            rules=[
                client.V1IngressRule(
                    http=client.V1HTTPIngressRuleValue(
                        paths=[
                            client.V1HTTPIngressPath(
                                path=ingress_path,
                                path_type="Prefix",
                                backend=client.V1IngressBackend(
                                    service=client.V1IngressServiceBackend(
                                        name=name,
                                        port=client.V1ServiceBackendPort(
                                            number=80,
                                        ),
                                    ),
                                ),
                            )
                        ]
                    )
                )
            ],
        ),
    )

    try:
        self.networking_v1.create_namespaced_ingress(
            namespace, ingress, _request_timeout=self._k8s_timeout
        )
        logger.info("Created ingress for %s at %s", name, ingress_path)
    except ApiException as e:
        if e.status == 409:
            # Already exists — patch it with the freshly built definition.
            self.networking_v1.patch_namespaced_ingress(
                f"inference-{name}", namespace, ingress, _request_timeout=self._k8s_timeout
            )
            logger.info("Updated ingress for %s", name)
        else:
            raise

870 

871 def _check_health_watchdog( 

872 self, 

873 name: str, 

874 namespace: str, 

875 ready_replicas: int, 

876 desired_replicas: int, 

877 spec: dict[str, Any], 

878 endpoint: dict[str, Any], 

879 ) -> bool: 

880 """Health watchdog: remove Ingress for persistently unhealthy endpoints. 

881 

882 If an endpoint has zero ready replicas for longer than the configured 

883 threshold, the watchdog removes its Ingress to protect the shared ALB. 

884 Global Accelerator considers an ALB unhealthy if ANY target group has 

885 zero healthy targets, so one bad endpoint can block all inference 

886 traffic to the region. 

887 

888 When the endpoint recovers (ready_replicas > 0), the Ingress is 

889 automatically re-created by _ensure_ingress on the next cycle. 

890 

891 Returns: 

892 True if the Ingress was removed (caller should skip _ensure_ingress). 

893 False if the endpoint is healthy or still within the grace period. 

894 """ 

895 if ready_replicas > 0: 

896 # Endpoint is healthy — clear the tracker 

897 if name in self._unready_since: 

898 logger.info( 

899 "Endpoint %s recovered, re-enabling Ingress", 

900 name, 

901 ) 

902 del self._unready_since[name] 

903 return False 

904 

905 # Endpoint has zero ready replicas 

906 now = datetime.now(UTC) 

907 

908 if name not in self._unready_since: 

909 # First time seeing this endpoint as unready — start the clock 

910 self._unready_since[name] = now 

911 logger.warning( 

912 "Endpoint %s has 0/%d ready replicas, starting health watchdog timer", 

913 name, 

914 desired_replicas, 

915 ) 

916 return False 

917 

918 # Check how long it's been unready 

919 unready_duration = (now - self._unready_since[name]).total_seconds() 

920 

921 if unready_duration < self._ingress_removal_threshold: 

922 remaining = self._ingress_removal_threshold - unready_duration 

923 logger.warning( 

924 "Endpoint %s unready for %ds (removing Ingress in %ds)", 

925 name, 

926 int(unready_duration), 

927 int(remaining), 

928 ) 

929 return False 

930 

931 # Threshold exceeded — remove the Ingress to protect the ALB 

932 ingress_name = f"inference-{name}" 

933 try: 

934 self.networking_v1.delete_namespaced_ingress( 

935 ingress_name, namespace, _request_timeout=self._k8s_timeout 

936 ) 

937 logger.warning( 

938 "WATCHDOG: Removed Ingress for unhealthy endpoint %s " 

939 "(unready for %ds > %ds threshold). " 

940 "Ingress will be re-created when the endpoint recovers.", 

941 name, 

942 int(unready_duration), 

943 self._ingress_removal_threshold, 

944 ) 

945 except ApiException as e: 

946 if e.status == 404: 

947 logger.debug("Ingress for %s already removed", name) 

948 else: 

949 logger.error("Failed to remove Ingress for %s: %s", name, e) 

950 

951 return True 

952 

def _scale_deployment(self, name: str, namespace: str, replicas: int) -> None:
    """Patch Deployment ``name`` so that ``spec.replicas`` equals ``replicas``."""
    # Only the replica count is sent; the patch names no other field.
    replica_patch = {"spec": {"replicas": replicas}}
    self.apps_v1.patch_namespaced_deployment(
        name,
        namespace,
        body=replica_patch,
        _request_timeout=self._k8s_timeout,
    )

961 

def _update_deployment_image(self, name: str, namespace: str, image: str) -> None:
    """Patch Deployment ``name`` so its ``inference`` container runs ``image``."""
    # The container entry is addressed by its fixed name, "inference".
    container_patch = {"name": "inference", "image": image}
    image_patch = {"spec": {"template": {"spec": {"containers": [container_patch]}}}}
    self.apps_v1.patch_namespaced_deployment(
        name,
        namespace,
        body=image_patch,
        _request_timeout=self._k8s_timeout,
    )

974 

def _reconcile_canary(
    self,
    name: str,
    namespace: str,
    spec: dict[str, Any],
    canary: dict[str, Any],
    endpoint: dict[str, Any],
) -> None:
    """Reconcile the canary deployment and the weighted ingress routing.

    Ensures a ``<name>-canary`` Deployment exists alongside the primary
    (creating it and its Service when missing, otherwise updating its image
    and replica count), then rewrites the Ingress so traffic is split
    between primary and canary via ``_update_canary_ingress``.

    Args:
        name: Primary endpoint name; the canary is named ``<name>-canary``.
        namespace: Namespace the resources live in.
        spec: Primary endpoint spec. It is copied, never mutated.
        canary: Canary settings. Reads ``image`` (default ``""``),
            ``replicas`` (default 1) and ``weight`` (default 10, the
            percentage of traffic sent to the canary).
        endpoint: Endpoint record, forwarded to ``_update_canary_ingress``.
    """
    canary_name = f"{name}-canary"
    canary_image = canary.get("image", "")
    canary_replicas = canary.get("replicas", 1)
    canary_weight = canary.get("weight", 10)
    primary_weight = 100 - canary_weight

    # The canary spec is the primary spec with image/replicas overridden.
    # A shallow copy is enough because only top-level keys are replaced.
    canary_spec = dict(spec)
    canary_spec["image"] = canary_image
    canary_spec["replicas"] = canary_replicas
    # The canary must not carry its own canary section.
    canary_spec.pop("canary", None)

    canary_deployment = self._get_deployment(canary_name, namespace)
    if canary_deployment is None:
        logger.info("Creating canary deployment %s with image %s", canary_name, canary_image)
        self._create_deployment(canary_name, namespace, canary_spec)
        self._create_service(canary_name, namespace, canary_spec)
    else:
        # Roll the image forward if the desired canary image changed.
        current_image = self._get_deployment_image(canary_deployment)
        if current_image != canary_image:
            self._update_deployment_image(canary_name, namespace, canary_image)

        # An unset replica count means the Kubernetes default of 1, but an
        # explicit 0 is a real value. Using ``replicas or 1`` here would read
        # 0 as 1, so a canary scaled to zero would never be scaled back up
        # and a desired count of 0 would be re-patched on every cycle.
        current_replicas = canary_deployment.spec.replicas
        if current_replicas is None:
            current_replicas = 1
        if current_replicas != canary_replicas:
            self._scale_deployment(canary_name, namespace, canary_replicas)

    # Point the ingress at both services with the requested weights.
    self._update_canary_ingress(name, namespace, spec, endpoint, primary_weight, canary_weight)

1018 

def _update_canary_ingress(
    self,
    name: str,
    namespace: str,
    spec: dict[str, Any],
    endpoint: dict[str, Any],
    primary_weight: int,
    canary_weight: int,
) -> None:
    """Write the ``inference-<name>`` Ingress with weighted primary/canary routing.

    The Ingress backend refers to an ALB action named ``weighted-routing``
    (port name ``use-annotation``). The action itself is a JSON forward
    config stored in an annotation, listing the primary service and the
    ``<name>-canary`` service on port 80 with their weights.

    The Ingress is patched first; if the API answers 404 it is created
    instead. Any other ``ApiException`` propagates.

    Args:
        name: Primary endpoint / service name.
        namespace: Namespace of the Ingress.
        spec: Endpoint spec; ``image`` and ``health_check_path`` are read.
        endpoint: Endpoint record; ``ingress_path`` is read
            (default ``/inference/<name>``).
        primary_weight: Weight assigned to the primary service.
        canary_weight: Weight assigned to the canary service.
    """
    import json

    ingress_name = f"inference-{name}"
    route_path = endpoint.get("ingress_path", f"/inference/{name}")

    # Images matching these markers get the health check under the full
    # /inference/<name> prefix; all others use the bare health path.
    image_ref = spec.get("image", "").lower()
    prefixed_markers = ("vllm", "text-generation-inference", "tgi")
    base_health = spec.get("health_check_path", "/health")
    if any(marker in image_ref for marker in prefixed_markers):
        health_path = f"/inference/{name}{base_health}"
    else:
        health_path = base_health

    # Forward action consumed through the actions.weighted-routing annotation.
    target_groups = [
        {
            "serviceName": name,
            "servicePort": 80,
            "weight": primary_weight,
        },
        {
            "serviceName": f"{name}-canary",
            "servicePort": 80,
            "weight": canary_weight,
        },
    ]
    forward_action = json.dumps(
        {
            "type": "forward",
            "forwardConfig": {"targetGroups": target_groups},
        }
    )

    metadata = client.V1ObjectMeta(
        name=ingress_name,
        namespace=namespace,
        labels={
            "app": name,
            "project": "gco",
            "gco.io/type": "inference",
            "gco.io/canary": "true",
        },
        annotations={
            "alb.ingress.kubernetes.io/healthcheck-path": health_path,
            "alb.ingress.kubernetes.io/healthcheck-interval-seconds": "15",
            "alb.ingress.kubernetes.io/actions.weighted-routing": forward_action,
        },
    )

    # The backend names the annotation-defined action, not a real Service.
    action_backend = client.V1IngressBackend(
        service=client.V1IngressServiceBackend(
            name="weighted-routing",
            port=client.V1ServiceBackendPort(
                name="use-annotation",
            ),
        ),
    )
    http_path = client.V1HTTPIngressPath(
        path=route_path,
        path_type="Prefix",
        backend=action_backend,
    )
    rule = client.V1IngressRule(http=client.V1HTTPIngressRuleValue(paths=[http_path]))

    ingress = client.V1Ingress(
        metadata=metadata,
        spec=client.V1IngressSpec(
            ingress_class_name="alb",
            rules=[rule],
        ),
    )

    try:
        self.networking_v1.patch_namespaced_ingress(
            ingress_name, namespace, ingress, _request_timeout=self._k8s_timeout
        )
    except ApiException as e:
        if e.status != 404:
            raise
        # No Ingress yet for this endpoint: create it with the canary routing.
        self.networking_v1.create_namespaced_ingress(
            namespace, ingress, _request_timeout=self._k8s_timeout
        )
        logger.info("Created canary ingress for %s", name)
    else:
        logger.info(
            "Updated ingress for %s: primary=%d%% canary=%d%%",
            name,
            primary_weight,
            canary_weight,
        )

1119 

def _cleanup_canary(self, name: str, namespace: str) -> None:
    """Delete the ``<name>-canary`` Deployment and Service.

    A 404 from either delete is ignored silently. Any other ``ApiException``
    is logged and swallowed, so the Service delete is still attempted after a
    failed Deployment delete. This method does not modify the Ingress.
    """
    canary_name = f"{name}-canary"

    def _remove(delete_call: Any, done_msg: str, failed_msg: str) -> None:
        # One best-effort delete: log success, tolerate 404, log other errors.
        try:
            delete_call(canary_name, namespace, _request_timeout=self._k8s_timeout)
            logger.info(done_msg, canary_name)
        except ApiException as e:
            if e.status != 404:
                logger.error(failed_msg, canary_name, e)

    _remove(
        self.apps_v1.delete_namespaced_deployment,
        "Deleted canary deployment %s",
        "Failed to delete canary deployment %s: %s",
    )
    _remove(
        self.core_v1.delete_namespaced_service,
        "Deleted canary service %s",
        "Failed to delete canary service %s: %s",
    )

1143 

def _delete_resources(self, name: str, namespace: str) -> None:
    """Delete every Kubernetes resource belonging to an endpoint.

    Removes, in order: the canary Deployment/Service, the primary
    Deployment, the Service, the ``inference-<name>`` Ingress and the HPA.
    Each delete is best-effort: a 404 is ignored, any other
    ``ApiException`` is logged and the remaining deletes still run.

    Args:
        name: Endpoint name (also the Deployment, Service and HPA name).
        namespace: Namespace the resources live in.
    """
    # Canary resources go first.
    self._cleanup_canary(name, namespace)

    # Deployment
    try:
        self.apps_v1.delete_namespaced_deployment(
            name, namespace, _request_timeout=self._k8s_timeout
        )
        logger.info("Deleted deployment %s/%s", namespace, name)
    except ApiException as e:
        if e.status != 404:
            logger.error("Failed to delete deployment %s: %s", name, e)

    # Service
    try:
        self.core_v1.delete_namespaced_service(
            name, namespace, _request_timeout=self._k8s_timeout
        )
        logger.info("Deleted service %s/%s", namespace, name)
    except ApiException as e:
        if e.status != 404:
            logger.error("Failed to delete service %s: %s", name, e)

    # Ingress
    try:
        self.networking_v1.delete_namespaced_ingress(
            f"inference-{name}", namespace, _request_timeout=self._k8s_timeout
        )
        logger.info("Deleted ingress for %s", name)
    except ApiException as e:
        if e.status != 404:
            logger.error("Failed to delete ingress for %s: %s", name, e)

    # HPA. The request timeout is passed here as for every other call, so a
    # stuck API server cannot block the reconcile loop on this delete.
    try:
        autoscaling_v2 = client.AutoscalingV2Api()
        autoscaling_v2.delete_namespaced_horizontal_pod_autoscaler(
            name, namespace, _request_timeout=self._k8s_timeout
        )
        logger.info("Deleted HPA for %s", name)
    except ApiException as e:
        if e.status != 404:
            logger.error("Failed to delete HPA for %s: %s", name, e)

1187 

def _create_or_update_hpa(self, name: str, namespace: str, spec: dict[str, Any]) -> None:
    """Create or update the Horizontal Pod Autoscaler of an inference endpoint.

    Does nothing unless ``spec["autoscaling"]["enabled"]`` is truthy. A
    missing or ``None`` ``autoscaling`` section counts as disabled.

    Args:
        name: Endpoint name; used as the HPA name and the target Deployment.
        namespace: Namespace of the HPA.
        spec: Endpoint spec. Under ``autoscaling`` it reads ``enabled``,
            ``min_replicas`` (default 1), ``max_replicas`` (default 10) and
            ``metrics`` (a list of ``{"type": "cpu" | "memory", "target":
            <utilization %>}`` entries, default CPU at 70). Entries with
            any other ``type`` are skipped. If no usable entry remains, CPU
            at 70 is used.

    Raises:
        ApiException: If creation fails with anything other than 409, or
            if the follow-up patch fails.
    """
    # ``or {}`` also covers an explicit ``"autoscaling": None``.
    autoscaling_config = spec.get("autoscaling") or {}
    if not autoscaling_config.get("enabled"):
        return

    min_replicas = autoscaling_config.get("min_replicas", 1)
    max_replicas = autoscaling_config.get("max_replicas", 10)
    # ``or`` (rather than a .get default) also covers ``"metrics": None``.
    metrics_config = autoscaling_config.get("metrics") or [{"type": "cpu", "target": 70}]

    def _utilization_metric(resource_name: str, target: Any) -> Any:
        # Resource metric that targets an average utilization percentage.
        return client.V2MetricSpec(
            type="Resource",
            resource=client.V2ResourceMetricSource(
                name=resource_name,
                target=client.V2MetricTarget(
                    type="Utilization",
                    average_utilization=target,
                ),
            ),
        )

    # Only cpu and memory are supported; other metric types are ignored.
    hpa_metrics = [
        _utilization_metric(m.get("type", "cpu"), m.get("target", 70))
        for m in metrics_config
        if m.get("type", "cpu") in ("cpu", "memory")
    ]
    if not hpa_metrics:
        # Nothing recognized: fall back to CPU at 70%.
        hpa_metrics.append(_utilization_metric("cpu", 70))

    hpa = client.V2HorizontalPodAutoscaler(
        metadata=client.V1ObjectMeta(
            name=name,
            namespace=namespace,
            labels={
                "app": name,
                "project": "gco",
                "gco.io/type": "inference",
            },
        ),
        spec=client.V2HorizontalPodAutoscalerSpec(
            scale_target_ref=client.V2CrossVersionObjectReference(
                api_version="apps/v1",
                kind="Deployment",
                name=name,
            ),
            min_replicas=min_replicas,
            max_replicas=max_replicas,
            metrics=hpa_metrics,
        ),
    )

    autoscaling_v2 = client.AutoscalingV2Api()
    try:
        # The request timeout is passed as for every other API call in this
        # class, so a hung request cannot stall the reconcile loop.
        autoscaling_v2.create_namespaced_horizontal_pod_autoscaler(
            namespace, hpa, _request_timeout=self._k8s_timeout
        )
        logger.info("Created HPA for %s (min=%d, max=%d)", name, min_replicas, max_replicas)
    except ApiException as e:
        if e.status != 409:
            raise
        # Already exists: bring it in line with the desired spec.
        autoscaling_v2.patch_namespaced_horizontal_pod_autoscaler(
            name, namespace, hpa, _request_timeout=self._k8s_timeout
        )
        logger.info("Updated HPA for %s", name)

1278 

1279 # ------------------------------------------------------------------ 

1280 # Metrics 

1281 # ------------------------------------------------------------------ 

1282 

def get_metrics(self) -> dict[str, Any]:
    """Return a snapshot of the monitor's identity, run flag and counters.

    Keys: ``cluster_id``, ``region``, ``running``, ``reconcile_count`` and
    ``errors_count``.
    """
    # (reported key, attribute on self), in the order the keys are emitted.
    reported = (
        ("cluster_id", "cluster_id"),
        ("region", "region"),
        ("running", "_running"),
        ("reconcile_count", "_reconcile_count"),
        ("errors_count", "_errors_count"),
    )
    return {key: getattr(self, attr) for key, attr in reported}

1291 

1292 

def create_inference_monitor_from_env() -> InferenceMonitor:
    """Build an InferenceMonitor configured from environment variables.

    Reads ``CLUSTER_NAME`` (default ``unknown-cluster``), ``REGION``
    (default ``unknown-region``), ``INFERENCE_NAMESPACE`` (default
    ``gco-inference``) and ``RECONCILE_INTERVAL_SECONDS`` (default ``15``,
    parsed with ``int``). Structured logging is configured before the store
    and the monitor are created.
    """
    env = os.environ
    cluster_id = env.get("CLUSTER_NAME", "unknown-cluster")
    region = env.get("REGION", "unknown-region")
    namespace = env.get("INFERENCE_NAMESPACE", "gco-inference")
    reconcile_interval = int(env.get("RECONCILE_INTERVAL_SECONDS", "15"))

    # Structured JSON logs, intended for CloudWatch Insights.
    configure_structured_logging(
        service_name="inference-monitor",
        cluster_id=cluster_id,
        region=region,
    )

    # The store takes no arguments here; per the original note it picks its
    # region from DYNAMODB_REGION, falling back to REGION.
    endpoint_store = InferenceEndpointStore()

    return InferenceMonitor(
        cluster_id=cluster_id,
        region=region,
        store=endpoint_store,
        namespace=namespace,
        reconcile_interval=reconcile_interval,
    )

1316 

1317 

async def main() -> None:
    """Entry point: run the inference monitor and restart it after crashes.

    The monitor is built from the environment and started in a loop. An
    unexpected exception is logged, the monitor is stopped, and it is
    started again after a 10 second pause. Shutdown requests stop the
    monitor and end the loop.
    """
    monitor = create_inference_monitor_from_env()
    logger.info("Inference monitor initialized: %s", monitor.get_metrics())

    while True:
        try:
            await monitor.start()
        except KeyboardInterrupt:
            logger.info("Shutting down inference monitor")
            monitor.stop()
            break
        except asyncio.CancelledError:
            # Under asyncio.run() on Python 3.11+, Ctrl-C cancels the main
            # task instead of raising KeyboardInterrupt inside it. Stop the
            # monitor, then re-raise so the cancellation completes normally.
            logger.info("Shutting down inference monitor")
            monitor.stop()
            raise
        except Exception as e:
            logger.error("Monitor crashed, restarting in 10s: %s", e, exc_info=True)
            monitor.stop()
            # Clear the run flag explicitly in case stop() did not.
            monitor._running = False
            await asyncio.sleep(10)


# Run the monitor when this module is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())