Coverage for gco/services/inference_monitor.py: 87%
525 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
1"""
2Inference Monitor — reconciliation controller for inference endpoints.
4Runs in each regional EKS cluster and polls the global DynamoDB table
5(gco-inference-endpoints) to reconcile desired state with actual
6Kubernetes resources. Follows a GitOps-style reconciliation pattern:
8 DynamoDB (desired state) → inference_monitor → Kubernetes (actual state)
10The monitor:
11- Creates Deployments, Services, and Ingress rules for new endpoints
12- Updates existing deployments when spec changes
13- Scales deployments up/down
14- Tears down resources when endpoints are deleted
15- Reports per-region status back to DynamoDB
17Environment Variables:
18 CLUSTER_NAME: Name of the EKS cluster
19 REGION: AWS region this monitor runs in
20 INFERENCE_ENDPOINTS_TABLE_NAME: DynamoDB table name
21 RECONCILE_INTERVAL_SECONDS: Seconds between reconciliation loops (default: 15)
22 INFERENCE_NAMESPACE: Namespace for inference workloads (default: gco-inference)
23"""
25import asyncio
26import logging
27import os
28from datetime import UTC, datetime
29from typing import Any
31from kubernetes import client, config
32from kubernetes.client.models import V1Deployment
33from kubernetes.client.rest import ApiException
35from gco.services.inference_store import InferenceEndpointStore
36from gco.services.structured_logging import configure_structured_logging
# Root logging configuration for the monitor process (INFO and above).
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
# Module logger; records propagate to the root handler configured above.
logger = logging.getLogger(__name__)
45class InferenceMonitor:
46 """
47 Reconciliation controller for inference endpoints.
49 Polls DynamoDB for desired endpoint state and reconciles with
50 the actual Kubernetes resources in the local cluster.
51 """
53 def __init__(
54 self,
55 cluster_id: str,
56 region: str,
57 store: InferenceEndpointStore,
58 namespace: str = "gco-inference",
59 reconcile_interval: int = 15,
60 ):
61 self.cluster_id = cluster_id
62 self.region = region
63 self.store = store
64 self.namespace = namespace
65 self.reconcile_interval = reconcile_interval
66 self._running = False
68 # Initialize Kubernetes clients
69 try:
70 config.load_incluster_config()
71 logger.info("Loaded in-cluster Kubernetes configuration")
72 except config.ConfigException:
73 try:
74 config.load_kube_config()
75 logger.info("Loaded local Kubernetes configuration")
76 except config.ConfigException as e:
77 logger.error("Failed to load Kubernetes configuration: %s", e)
78 raise
80 self.apps_v1 = client.AppsV1Api()
81 self.core_v1 = client.CoreV1Api()
82 self.networking_v1 = client.NetworkingV1Api()
84 # Timeout for Kubernetes API calls (seconds)
85 self._k8s_timeout = int(os.environ.get("K8S_API_TIMEOUT", "30"))
87 # Health watchdog: tracks when each endpoint first became unready.
88 # If an endpoint stays unready for longer than _ingress_removal_threshold,
89 # the watchdog removes its Ingress to protect the shared ALB from
90 # having an unhealthy target group (which would make GA mark the
91 # entire ALB as unhealthy, blocking all inference in the region).
92 self._unready_since: dict[str, datetime] = {}
93 self._ingress_removal_threshold = int(
94 os.environ.get("INFERENCE_UNHEALTHY_THRESHOLD_SECONDS", "300")
95 ) # 5 minutes default
97 # Metrics
98 self._reconcile_count = 0
99 self._errors_count = 0
101 # ------------------------------------------------------------------
102 # Reconciliation loop
103 # ------------------------------------------------------------------
105 async def start(self) -> None:
106 """Start the reconciliation loop with leader election.
108 Uses a Kubernetes Lease object for leader election so that only
109 one replica reconciles at a time. Other replicas stay on standby
110 and take over if the leader dies.
111 """
112 if self._running:
113 logger.warning("Inference monitor already running")
114 return
115 self._running = True
116 logger.info(
117 "Starting inference monitor for %s in %s (interval=%ds)",
118 self.cluster_id,
119 self.region,
120 self.reconcile_interval,
121 )
123 # Namespace and ServiceAccount are pre-created by the kubectl-applier
124 # at deploy time (00-namespaces.yaml, 01-serviceaccounts.yaml). The
125 # inference-monitor SA has namespace-scoped RBAC only — it cannot
126 # read_namespace/create_namespace, so we don't try. If the namespace
127 # is ever missing, deployments below will fail with a clear 404.
129 # Get pod identity for leader election
130 pod_name = os.environ.get("HOSTNAME", f"monitor-{id(self)}")
131 lease_name = "inference-monitor-leader"
133 while self._running:
134 try:
135 if self._try_acquire_lease(lease_name, pod_name): 135 ↛ 138line 135 didn't jump to line 138 because the condition on line 135 was always true
136 await self.reconcile()
137 else:
138 logger.debug("Not the leader, waiting...")
139 except Exception as e:
140 logger.error("Reconciliation error: %s", e, exc_info=True)
141 self._errors_count += 1
142 try:
143 await asyncio.sleep(self.reconcile_interval)
144 except Exception as e:
145 logger.error("Sleep interrupted: %s", e)
146 break
148 def _try_acquire_lease(self, lease_name: str, holder: str) -> bool:
149 """Try to acquire or renew a Kubernetes Lease for leader election.
151 Uses optimistic concurrency via resourceVersion — if two monitors
152 race to update the same lease, K8s returns 409 Conflict for the
153 loser, preventing split-brain.
155 Returns True if this instance is the leader.
156 """
158 coordination_v1 = client.CoordinationV1Api()
159 now = datetime.now(UTC)
161 try:
162 lease = coordination_v1.read_namespaced_lease(lease_name, self.namespace)
163 current_holder = lease.spec.holder_identity
164 renew_time = lease.spec.renew_time
166 # Check if lease is expired (holder hasn't renewed in 3x interval)
167 if renew_time:
168 elapsed = (now - renew_time.replace(tzinfo=UTC)).total_seconds()
169 if elapsed > self.reconcile_interval * 3:
170 # Lease expired — take over
171 logger.info("Lease expired (held by %s), taking over", current_holder)
172 current_holder = None
174 if current_holder == holder:
175 # We're the leader — renew
176 lease.spec.renew_time = now
177 try:
178 coordination_v1.replace_namespaced_lease(lease_name, self.namespace, lease)
179 except ApiException as conflict:
180 if conflict.status == 409:
181 logger.debug("Lease renew conflict (another writer), retrying next cycle")
182 return False
183 raise
184 return True
185 if current_holder is None or current_holder == "":
186 # No leader — claim it
187 lease.spec.holder_identity = holder
188 lease.spec.renew_time = now
189 try:
190 coordination_v1.replace_namespaced_lease(lease_name, self.namespace, lease)
191 except ApiException as conflict:
192 if conflict.status == 409:
193 logger.info("Lost lease race to another monitor")
194 return False
195 raise
196 logger.info("Acquired leader lease as %s", holder)
197 return True
198 # Someone else is the leader
199 return False
201 except ApiException as e:
202 if e.status == 404:
203 # Lease doesn't exist — create it
204 lease = client.V1Lease(
205 metadata=client.V1ObjectMeta(
206 name=lease_name,
207 namespace=self.namespace,
208 ),
209 spec=client.V1LeaseSpec(
210 holder_identity=holder,
211 lease_duration_seconds=self.reconcile_interval * 3,
212 renew_time=now,
213 ),
214 )
215 try:
216 coordination_v1.create_namespaced_lease(self.namespace, lease)
217 logger.info("Created leader lease as %s", holder)
218 return True
219 except ApiException:
220 return False
221 logger.warning("Lease check failed: %s", e.reason)
222 return False
224 def stop(self) -> None:
225 """Stop the reconciliation loop."""
226 self._running = False
227 logger.info("Inference monitor stopped")
229 async def reconcile(self) -> list[dict[str, Any]]:
230 """
231 Run one reconciliation cycle.
233 Returns a list of actions taken (for logging/testing).
234 """
235 self._reconcile_count += 1
236 actions: list[dict[str, Any]] = []
238 # Get all endpoints from DynamoDB
239 try:
240 endpoints = self.store.list_endpoints()
241 except Exception as e:
242 logger.error("Failed to list endpoints from DynamoDB: %s", e)
243 return actions
245 for endpoint in endpoints:
246 try:
247 action = await self._reconcile_endpoint(endpoint)
248 if action: 248 ↛ 245line 248 didn't jump to line 245 because the condition on line 248 was always true
249 actions.append(action)
250 except Exception as e:
251 name = endpoint.get("endpoint_name", "unknown")
252 logger.error("Failed to reconcile endpoint %s: %s", name, e)
253 self._errors_count += 1
254 self.store.update_region_status(
255 name,
256 self.region,
257 "error",
258 error=str(e),
259 )
261 # Purge fully-deleted endpoints from DynamoDB to prevent unbounded growth.
262 # An endpoint is fully deleted when desired_state is "deleted" and all
263 # target regions report "deleted" status.
264 for endpoint in endpoints:
265 if endpoint.get("desired_state") != "deleted": 265 ↛ 267line 265 didn't jump to line 267 because the condition on line 265 was always true
266 continue
267 region_status = endpoint.get("region_status", {})
268 target_regions = endpoint.get("target_regions", [])
269 if not target_regions:
270 continue
271 all_deleted = all(
272 isinstance(region_status.get(r), dict)
273 and region_status.get(r, {}).get("state") == "deleted"
274 for r in target_regions
275 )
276 if all_deleted:
277 ep_name = endpoint["endpoint_name"]
278 try:
279 self.store.delete_endpoint(ep_name)
280 logger.info("Purged fully-deleted endpoint %s from DynamoDB", ep_name)
281 actions.append({"action": "purge", "endpoint": ep_name})
282 except Exception as e:
283 logger.warning("Failed to purge endpoint %s: %s", ep_name, e)
285 return actions
287 async def _reconcile_endpoint(self, endpoint: dict[str, Any]) -> dict[str, Any] | None:
288 """Reconcile a single endpoint."""
289 name = endpoint["endpoint_name"]
290 desired_state = endpoint.get("desired_state", "deploying")
291 target_regions = endpoint.get("target_regions", [])
292 spec = endpoint.get("spec", {})
293 ns = endpoint.get("namespace", self.namespace)
295 # Am I a target region?
296 if self.region not in target_regions:
297 # If I have resources for this endpoint, clean them up
298 if self._deployment_exists(name, ns):
299 logger.info(
300 "Endpoint %s no longer targets %s, cleaning up",
301 name,
302 self.region,
303 )
304 self._delete_resources(name, ns)
305 self.store.update_region_status(
306 name,
307 self.region,
308 "deleted",
309 )
310 return {"action": "cleanup", "endpoint": name, "reason": "region_removed"}
311 return None
313 # Reconcile based on desired state
314 if desired_state in ("deploying", "running"):
315 return await self._reconcile_running(name, ns, spec, endpoint)
316 if desired_state == "stopped":
317 return self._reconcile_stopped(name, ns)
318 if desired_state == "deleted":
319 return self._reconcile_deleted(name, ns)
321 return None
323 async def _reconcile_running(
324 self,
325 name: str,
326 namespace: str,
327 spec: dict[str, Any],
328 endpoint: dict[str, Any],
329 ) -> dict[str, Any] | None:
330 """Ensure the endpoint is running with the correct spec."""
331 deployment = self._get_deployment(name, namespace)
333 if deployment is None:
334 # Create everything
335 logger.info("Creating endpoint %s in %s", name, self.region)
336 self._create_deployment(name, namespace, spec)
337 self._create_service(name, namespace, spec)
338 self._update_ingress_rule(name, namespace, spec, endpoint)
339 if spec.get("autoscaling", {}).get("enabled"):
340 self._create_or_update_hpa(name, namespace, spec)
341 self.store.update_region_status(
342 name,
343 self.region,
344 "creating",
345 replicas_desired=spec.get("replicas", 1),
346 )
347 return {"action": "create", "endpoint": name}
349 # Deployment exists — ensure Service and Ingress also exist
350 # (they may have been manually deleted or lost during a rollout)
351 self._ensure_service(name, namespace, spec)
353 # Check readiness before ensuring Ingress — the health watchdog may
354 # remove the Ingress if the endpoint has been unready too long
355 desired_replicas = spec.get("replicas", 1)
356 current_replicas = deployment.spec.replicas or 1
357 ready_replicas = deployment.status.ready_replicas or 0
359 ingress_removed = self._check_health_watchdog(
360 name, namespace, ready_replicas, desired_replicas, spec, endpoint
361 )
362 if not ingress_removed: 362 ↛ 365line 362 didn't jump to line 365 because the condition on line 362 was always true
363 self._ensure_ingress(name, namespace, spec, endpoint)
365 if current_replicas != desired_replicas:
366 logger.info(
367 "Scaling endpoint %s: %d → %d replicas",
368 name,
369 current_replicas,
370 desired_replicas,
371 )
372 self._scale_deployment(name, namespace, desired_replicas)
373 self.store.update_region_status(
374 name,
375 self.region,
376 "updating",
377 replicas_ready=ready_replicas,
378 replicas_desired=desired_replicas,
379 )
380 return {"action": "scale", "endpoint": name, "replicas": desired_replicas}
382 # Check if image changed
383 current_image = self._get_deployment_image(deployment)
384 desired_image = self._resolve_image_for_region(spec) if spec.get("image") else ""
385 if current_image and desired_image and current_image != desired_image:
386 logger.info("Updating endpoint %s image: %s → %s", name, current_image, desired_image)
387 self._update_deployment_image(name, namespace, desired_image)
388 self.store.update_region_status(
389 name,
390 self.region,
391 "updating",
392 replicas_ready=ready_replicas,
393 replicas_desired=desired_replicas,
394 )
395 return {"action": "update_image", "endpoint": name, "image": desired_image}
397 # Everything is in sync — report status
398 state = "running" if ready_replicas >= desired_replicas else "creating"
399 self.store.update_region_status(
400 name,
401 self.region,
402 state,
403 replicas_ready=ready_replicas,
404 replicas_desired=desired_replicas,
405 )
407 # Reconcile canary deployment if present
408 canary = spec.get("canary")
409 if canary: 409 ↛ 410line 409 didn't jump to line 410 because the condition on line 409 was never true
410 self._reconcile_canary(name, namespace, spec, canary, endpoint)
411 else:
412 # No canary — clean up canary resources if they exist
413 self._cleanup_canary(name, namespace)
415 # If all replicas are ready and desired_state is "deploying", promote to "running"
416 if state == "running" and endpoint.get("desired_state") == "deploying":
417 # Check if all target regions are running
418 all_running = True
419 for r_status in endpoint.get("region_status", {}).values():
420 if isinstance(r_status, dict) and r_status.get("state") != "running": 420 ↛ 421line 420 didn't jump to line 421 because the condition on line 420 was never true
421 all_running = False
422 break
423 if all_running: 423 ↛ 426line 423 didn't jump to line 426 because the condition on line 423 was always true
424 self.store.update_desired_state(name, "running")
426 return None
428 def _reconcile_stopped(self, name: str, namespace: str) -> dict[str, Any] | None:
429 """Scale deployment to zero."""
430 deployment = self._get_deployment(name, namespace)
431 if deployment is None:
432 return None
434 current_replicas = deployment.spec.replicas or 0
435 if current_replicas > 0:
436 logger.info("Stopping endpoint %s (scaling to 0)", name)
437 self._scale_deployment(name, namespace, 0)
438 self.store.update_region_status(
439 name,
440 self.region,
441 "stopped",
442 replicas_ready=0,
443 replicas_desired=0,
444 )
445 return {"action": "stop", "endpoint": name}
447 self.store.update_region_status(
448 name,
449 self.region,
450 "stopped",
451 replicas_ready=0,
452 replicas_desired=0,
453 )
454 return None
456 def _reconcile_deleted(self, name: str, namespace: str) -> dict[str, Any] | None:
457 """Delete all resources for the endpoint."""
458 # Clean up health watchdog tracker
459 self._unready_since.pop(name, None)
461 if self._deployment_exists(name, namespace):
462 logger.info("Deleting endpoint %s from %s", name, self.region)
463 self._delete_resources(name, namespace)
464 self.store.update_region_status(name, self.region, "deleted")
465 return {"action": "delete", "endpoint": name}
467 self.store.update_region_status(name, self.region, "deleted")
468 return None
470 # ------------------------------------------------------------------
471 # Kubernetes resource management
472 # ------------------------------------------------------------------
474 def _deployment_exists(self, name: str, namespace: str) -> bool:
475 try:
476 self.apps_v1.read_namespaced_deployment(
477 name, namespace, _request_timeout=self._k8s_timeout
478 )
479 return True
480 except ApiException as e:
481 if e.status == 404:
482 return False
483 raise
485 def _get_deployment(self, name: str, namespace: str) -> V1Deployment | None:
486 try:
487 return self.apps_v1.read_namespaced_deployment(
488 name, namespace, _request_timeout=self._k8s_timeout
489 )
490 except ApiException as e:
491 if e.status == 404:
492 return None
493 raise
495 def _get_deployment_image(self, deployment: V1Deployment) -> str | None:
496 """Get the image of the first container in a deployment."""
497 containers = deployment.spec.template.spec.containers
498 if containers:
499 image: str = containers[0].image
500 return image
501 return None
503 def _resolve_image_for_region(self, spec: dict[str, Any]) -> str:
504 """Pick the image URI this region should pull from.
506 ``cli.inference.InferenceManager.deploy`` populates
507 ``spec["region_image_uris"]`` with a per-region map when the
508 primary image is an ECR URI, so each cluster can pull from its
509 local replica instead of crossing the WAN. The map is omitted
510 for non-ECR refs and for deploys with ``rewrite_image=False``,
511 in which case we fall back to the flat ``spec["image"]`` URI.
513 When the map is present but lacks an entry for ``self.region``
514 (a target region was added after the spec was last written),
515 the flat URI is also used so the deployment doesn't break — the
516 next reconcile after a fresh deploy picks up the right URI.
517 """
518 region_map = spec.get("region_image_uris")
519 if isinstance(region_map, dict):
520 uri = region_map.get(self.region)
521 if isinstance(uri, str) and uri:
522 return uri
523 return str(spec["image"])
525 def _create_deployment(self, name: str, namespace: str, spec: dict[str, Any]) -> None:
526 """Create a Kubernetes Deployment for an inference endpoint."""
527 replicas = spec.get("replicas", 1)
528 image = self._resolve_image_for_region(spec)
529 port = spec.get("port", 8000)
530 gpu_count = spec.get("gpu_count", 1)
531 health_path = spec.get("health_check_path", "/health")
532 env_vars = spec.get("env", {})
533 resources = spec.get("resources", {})
534 model_path = spec.get("model_path")
535 command = spec.get("command")
536 args = spec.get("args")
538 # Build container
539 container_env = [client.V1EnvVar(name=k, value=str(v)) for k, v in env_vars.items()]
541 # Inject --root-path for servers that support it (vLLM, TGI).
542 # This tells the server to mount its API at /inference/{name}.
543 # We append to existing args (from --extra-args) rather than replacing them.
544 ingress_prefix = f"/inference/{name}"
545 root_path_images = ("vllm", "text-generation-inference", "tgi")
546 image_lower = image.lower()
547 if not command and any(tag in image_lower for tag in root_path_images):
548 if args: 548 ↛ 550line 548 didn't jump to line 550 because the condition on line 548 was never true
549 # Append --root-path to user-provided args if not already present
550 if "--root-path" not in args:
551 args = list(args) + ["--root-path", ingress_prefix]
552 else:
553 args = ["--root-path", ingress_prefix]
555 resource_reqs = client.V1ResourceRequirements(
556 requests=resources.get("requests", {"cpu": "1", "memory": "4Gi"}),
557 limits=resources.get("limits", {"cpu": "4", "memory": "16Gi"}),
558 )
559 # Add accelerator resources (GPU or Neuron)
560 accelerator = spec.get("accelerator", "nvidia")
561 if gpu_count > 0:
562 if accelerator == "neuron":
563 # AWS Trainium/Inferentia — request Neuron devices
564 if resource_reqs.limits is None: 564 ↛ 565line 564 didn't jump to line 565 because the condition on line 564 was never true
565 resource_reqs.limits = {}
566 resource_reqs.limits["aws.amazon.com/neuron"] = str(gpu_count)
567 if resource_reqs.requests is None: 567 ↛ 568line 567 didn't jump to line 568 because the condition on line 567 was never true
568 resource_reqs.requests = {}
569 resource_reqs.requests["aws.amazon.com/neuron"] = str(gpu_count)
570 else:
571 # NVIDIA GPU (default)
572 if resource_reqs.limits is None: 572 ↛ 573line 572 didn't jump to line 573 because the condition on line 572 was never true
573 resource_reqs.limits = {}
574 resource_reqs.limits["nvidia.com/gpu"] = str(gpu_count)
575 if resource_reqs.requests is None: 575 ↛ 576line 575 didn't jump to line 576 because the condition on line 575 was never true
576 resource_reqs.requests = {}
577 resource_reqs.requests["nvidia.com/gpu"] = str(gpu_count)
579 volume_mounts = []
580 volumes = []
581 init_containers = []
582 model_source = spec.get("model_source")
584 if model_path or model_source:
585 volume_mounts.append(
586 client.V1VolumeMount(
587 name="model-storage",
588 mount_path="/models",
589 )
590 )
591 volumes.append(
592 client.V1Volume(
593 name="model-storage",
594 persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(
595 claim_name="efs-claim",
596 ),
597 )
598 )
600 # Add init container to sync model from S3 if model_source is set
601 if model_source and model_source.startswith("s3://"):
602 model_dest = f"/models/{name}"
603 init_containers.append(
604 client.V1Container(
605 name="model-sync",
606 image="amazon/aws-cli:latest",
607 command=["sh", "-c"],
608 args=[
609 f"if [ -d '{model_dest}' ] && [ \"$(ls -A '{model_dest}')\" ]; then "
610 f"echo 'Model already cached at {model_dest}, skipping sync'; "
611 f"else echo 'Syncing model from {model_source}...'; "
612 f"aws s3 sync {model_source} {model_dest} --quiet; "
613 f"echo 'Model sync complete'; fi"
614 ],
615 volume_mounts=[
616 client.V1VolumeMount(
617 name="model-storage",
618 mount_path="/models",
619 )
620 ],
621 resources=client.V1ResourceRequirements(
622 requests={"cpu": "1", "memory": "2Gi"},
623 limits={"cpu": "4", "memory": "8Gi"},
624 ),
625 )
626 )
628 # Probe path depends on whether the server handles the prefix
629 uses_root_path = args is not None and "--root-path" in args
630 probe_health = f"{ingress_prefix}{health_path}" if uses_root_path else health_path
632 container = client.V1Container(
633 name="inference",
634 image=image,
635 ports=[client.V1ContainerPort(container_port=port)],
636 env=container_env if container_env else None,
637 resources=resource_reqs,
638 volume_mounts=volume_mounts if volume_mounts else None,
639 command=command,
640 args=args,
641 liveness_probe=client.V1Probe(
642 http_get=client.V1HTTPGetAction(path=probe_health, port=port),
643 initial_delay_seconds=120,
644 period_seconds=15,
645 failure_threshold=5,
646 ),
647 readiness_probe=client.V1Probe(
648 http_get=client.V1HTTPGetAction(path=probe_health, port=port),
649 initial_delay_seconds=30,
650 period_seconds=10,
651 ),
652 )
654 # Build tolerations based on accelerator type
655 if accelerator == "neuron":
656 tolerations = [
657 client.V1Toleration(
658 key="aws.amazon.com/neuron",
659 operator="Equal",
660 value="true",
661 effect="NoSchedule",
662 )
663 ]
664 else:
665 tolerations = [
666 client.V1Toleration(
667 key="nvidia.com/gpu",
668 operator="Equal",
669 value="true",
670 effect="NoSchedule",
671 )
672 ]
674 # Node selector based on accelerator type
675 node_selector = spec.get("node_selector", {})
676 if gpu_count > 0 and not node_selector:
677 if accelerator == "neuron":
678 node_selector = {"accelerator": "neuron"}
679 else:
680 node_selector = {"eks.amazonaws.com/instance-gpu-manufacturer": "nvidia"}
682 # Apply capacity type preference (spot/on-demand)
683 capacity_type = spec.get("capacity_type")
684 if capacity_type in ("spot", "on-demand"):
685 node_selector["karpenter.sh/capacity-type"] = capacity_type
687 deployment = client.V1Deployment(
688 metadata=client.V1ObjectMeta(
689 name=name,
690 namespace=namespace,
691 labels={
692 "app": name,
693 "project": "gco",
694 "gco.io/type": "inference",
695 },
696 ),
697 spec=client.V1DeploymentSpec(
698 replicas=replicas,
699 selector=client.V1LabelSelector(
700 match_labels={"app": name},
701 ),
702 template=client.V1PodTemplateSpec(
703 metadata=client.V1ObjectMeta(
704 labels={
705 "app": name,
706 "project": "gco",
707 "gco.io/type": "inference",
708 },
709 ),
710 spec=client.V1PodSpec(
711 service_account_name="gco-service-account",
712 containers=[container],
713 init_containers=init_containers if init_containers else None,
714 tolerations=tolerations,
715 node_selector=node_selector if node_selector else None,
716 volumes=volumes if volumes else None,
717 ),
718 ),
719 ),
720 )
722 self.apps_v1.create_namespaced_deployment(
723 namespace, deployment, _request_timeout=self._k8s_timeout
724 )
725 logger.info("Created deployment %s/%s", namespace, name)
727 def _create_service(self, name: str, namespace: str, spec: dict[str, Any]) -> None:
728 """Create a Kubernetes Service for an inference endpoint."""
729 port = spec.get("port", 8000)
731 service = client.V1Service(
732 metadata=client.V1ObjectMeta(
733 name=name,
734 namespace=namespace,
735 labels={
736 "app": name,
737 "project": "gco",
738 "gco.io/type": "inference",
739 },
740 ),
741 spec=client.V1ServiceSpec(
742 selector={"app": name},
743 ports=[
744 client.V1ServicePort(
745 port=80,
746 target_port=port,
747 protocol="TCP",
748 )
749 ],
750 type="ClusterIP",
751 ),
752 )
754 try:
755 self.core_v1.create_namespaced_service(
756 namespace, service, _request_timeout=self._k8s_timeout
757 )
758 logger.info("Created service %s/%s", namespace, name)
759 except ApiException as e:
760 if e.status == 409: 760 ↛ 763line 760 didn't jump to line 763 because the condition on line 760 was always true
761 logger.info("Service %s/%s already exists", namespace, name)
762 else:
763 raise
765 def _ensure_service(self, name: str, namespace: str, spec: dict[str, Any]) -> None:
766 """Ensure the Service exists, recreating it if missing."""
767 try:
768 self.core_v1.read_namespaced_service(
769 name, namespace, _request_timeout=self._k8s_timeout
770 )
771 except ApiException as e:
772 if e.status == 404:
773 logger.warning("Service %s/%s missing, recreating", namespace, name)
774 self._create_service(name, namespace, spec)
775 else:
776 raise
778 def _ensure_ingress(
779 self,
780 name: str,
781 namespace: str,
782 spec: dict[str, Any],
783 endpoint: dict[str, Any],
784 ) -> None:
785 """Ensure the Ingress exists, recreating it if missing."""
786 try:
787 self.networking_v1.read_namespaced_ingress(
788 f"inference-{name}", namespace, _request_timeout=self._k8s_timeout
789 )
790 except ApiException as e:
791 if e.status == 404:
792 logger.warning("Ingress for %s missing, recreating", name)
793 self._update_ingress_rule(name, namespace, spec, endpoint)
794 else:
795 raise
797 def _update_ingress_rule(
798 self,
799 name: str,
800 namespace: str,
801 spec: dict[str, Any],
802 endpoint: dict[str, Any],
803 ) -> None:
804 """Create or update an Ingress for the inference endpoint.
806 The Ingress is created in the same namespace as the Service and pods.
807 IngressClassParams with group.name merges all Ingresses onto a single
808 shared ALB regardless of namespace.
809 """
810 ingress_path = endpoint.get("ingress_path", f"/inference/{name}")
811 image = spec.get("image", "")
812 image_lower = image.lower()
813 root_path_images = ("vllm", "text-generation-inference", "tgi")
814 uses_root_path = any(tag in image_lower for tag in root_path_images)
815 base_health = spec.get("health_check_path", "/health")
816 health_path = f"/inference/{name}{base_health}" if uses_root_path else base_health
818 ingress = client.V1Ingress(
819 metadata=client.V1ObjectMeta(
820 name=f"inference-{name}",
821 namespace=namespace,
822 labels={
823 "app": name,
824 "project": "gco",
825 "gco.io/type": "inference",
826 },
827 annotations={
828 "alb.ingress.kubernetes.io/healthcheck-path": health_path,
829 "alb.ingress.kubernetes.io/healthcheck-interval-seconds": "15",
830 },
831 ),
832 spec=client.V1IngressSpec(
833 ingress_class_name="alb",
834 rules=[
835 client.V1IngressRule(
836 http=client.V1HTTPIngressRuleValue(
837 paths=[
838 client.V1HTTPIngressPath(
839 path=ingress_path,
840 path_type="Prefix",
841 backend=client.V1IngressBackend(
842 service=client.V1IngressServiceBackend(
843 name=name,
844 port=client.V1ServiceBackendPort(
845 number=80,
846 ),
847 ),
848 ),
849 )
850 ]
851 )
852 )
853 ],
854 ),
855 )
857 try:
858 self.networking_v1.create_namespaced_ingress(
859 namespace, ingress, _request_timeout=self._k8s_timeout
860 )
861 logger.info("Created ingress for %s at %s", name, ingress_path)
862 except ApiException as e:
863 if e.status == 409: 863 ↛ 869line 863 didn't jump to line 869 because the condition on line 863 was always true
864 self.networking_v1.patch_namespaced_ingress(
865 f"inference-{name}", namespace, ingress, _request_timeout=self._k8s_timeout
866 )
867 logger.info("Updated ingress for %s", name)
868 else:
869 raise
871 def _check_health_watchdog(
872 self,
873 name: str,
874 namespace: str,
875 ready_replicas: int,
876 desired_replicas: int,
877 spec: dict[str, Any],
878 endpoint: dict[str, Any],
879 ) -> bool:
880 """Health watchdog: remove Ingress for persistently unhealthy endpoints.
882 If an endpoint has zero ready replicas for longer than the configured
883 threshold, the watchdog removes its Ingress to protect the shared ALB.
884 Global Accelerator considers an ALB unhealthy if ANY target group has
885 zero healthy targets, so one bad endpoint can block all inference
886 traffic to the region.
888 When the endpoint recovers (ready_replicas > 0), the Ingress is
889 automatically re-created by _ensure_ingress on the next cycle.
891 Returns:
892 True if the Ingress was removed (caller should skip _ensure_ingress).
893 False if the endpoint is healthy or still within the grace period.
894 """
895 if ready_replicas > 0:
896 # Endpoint is healthy — clear the tracker
897 if name in self._unready_since:
898 logger.info(
899 "Endpoint %s recovered, re-enabling Ingress",
900 name,
901 )
902 del self._unready_since[name]
903 return False
905 # Endpoint has zero ready replicas
906 now = datetime.now(UTC)
908 if name not in self._unready_since:
909 # First time seeing this endpoint as unready — start the clock
910 self._unready_since[name] = now
911 logger.warning(
912 "Endpoint %s has 0/%d ready replicas, starting health watchdog timer",
913 name,
914 desired_replicas,
915 )
916 return False
918 # Check how long it's been unready
919 unready_duration = (now - self._unready_since[name]).total_seconds()
921 if unready_duration < self._ingress_removal_threshold:
922 remaining = self._ingress_removal_threshold - unready_duration
923 logger.warning(
924 "Endpoint %s unready for %ds (removing Ingress in %ds)",
925 name,
926 int(unready_duration),
927 int(remaining),
928 )
929 return False
931 # Threshold exceeded — remove the Ingress to protect the ALB
932 ingress_name = f"inference-{name}"
933 try:
934 self.networking_v1.delete_namespaced_ingress(
935 ingress_name, namespace, _request_timeout=self._k8s_timeout
936 )
937 logger.warning(
938 "WATCHDOG: Removed Ingress for unhealthy endpoint %s "
939 "(unready for %ds > %ds threshold). "
940 "Ingress will be re-created when the endpoint recovers.",
941 name,
942 int(unready_duration),
943 self._ingress_removal_threshold,
944 )
945 except ApiException as e:
946 if e.status == 404:
947 logger.debug("Ingress for %s already removed", name)
948 else:
949 logger.error("Failed to remove Ingress for %s: %s", name, e)
951 return True
def _scale_deployment(self, name: str, namespace: str, replicas: int) -> None:
    """Set ``spec.replicas`` on a Deployment via a PATCH request.

    Args:
        name: Name of the Deployment to patch.
        namespace: Namespace the Deployment lives in.
        replicas: Replica count written into the Deployment spec.

    Errors raised by the Kubernetes client propagate to the caller.
    """
    patch_body = {"spec": {"replicas": replicas}}
    self.apps_v1.patch_namespaced_deployment(
        name, namespace, body=patch_body, _request_timeout=self._k8s_timeout
    )
def _update_deployment_image(self, name: str, namespace: str, image: str) -> None:
    """PATCH a Deployment so its ``inference`` container runs ``image``.

    The patch body references a single container by ``name: "inference"`` and
    sets its ``image``; no other pod-template fields are included.

    Args:
        name: Name of the Deployment to patch.
        namespace: Namespace the Deployment lives in.
        image: Container image reference to apply.

    Errors raised by the Kubernetes client propagate to the caller.
    """
    container_patch = {"name": "inference", "image": image}
    pod_template = {"spec": {"containers": [container_patch]}}
    patch_body = {"spec": {"template": pod_template}}
    self.apps_v1.patch_namespaced_deployment(
        name, namespace, body=patch_body, _request_timeout=self._k8s_timeout
    )
def _reconcile_canary(
    self,
    name: str,
    namespace: str,
    spec: dict[str, Any],
    canary: dict[str, Any],
    endpoint: dict[str, Any],
) -> None:
    """Reconcile the canary Deployment and the weighted Ingress routing.

    Ensures a ``<name>-canary`` Deployment (and, on first creation, Service)
    exists alongside the primary, brings its image and replica count in line
    with ``canary``, then rewrites the endpoint's Ingress so traffic is split
    between the primary and canary Services by weight.

    Args:
        name: Primary endpoint / Deployment name.
        namespace: Namespace holding the endpoint's resources.
        spec: Primary endpoint spec; shallow-copied as the base canary spec.
        canary: Canary settings. Reads ``image`` (default ``""``),
            ``replicas`` (default 1) and ``weight`` (percentage of traffic
            sent to the canary, default 10, clamped to 0-100).
        endpoint: Endpoint record, passed through to the Ingress update.
    """
    canary_name = f"{name}-canary"
    canary_image = canary.get("image", "")
    # int() normalizes numeric types (e.g. Decimal, if the store hands back
    # raw DynamoDB numbers — unverified) before they reach json.dumps / %d.
    canary_replicas = int(canary.get("replicas", 1))
    requested_weight = int(canary.get("weight", 10))
    # Clamp so the primary share (100 - canary) can never go negative.
    canary_weight = min(max(requested_weight, 0), 100)
    if canary_weight != requested_weight:
        logger.warning(
            "Canary weight %d for %s is outside 0-100; clamped to %d",
            requested_weight,
            name,
            canary_weight,
        )
    primary_weight = 100 - canary_weight

    # Canary spec = primary spec with the canary image/replicas substituted.
    canary_spec = dict(spec)
    canary_spec["image"] = canary_image
    canary_spec["replicas"] = canary_replicas
    # Drop the canary section so the canary spec does not describe a canary.
    canary_spec.pop("canary", None)

    canary_deployment = self._get_deployment(canary_name, namespace)
    if canary_deployment is None:
        logger.info("Creating canary deployment %s with image %s", canary_name, canary_image)
        self._create_deployment(canary_name, namespace, canary_spec)
        self._create_service(canary_name, namespace, canary_spec)
    else:
        current_image = self._get_deployment_image(canary_deployment)
        if current_image != canary_image:
            self._update_deployment_image(canary_name, namespace, canary_image)
        # An unset spec.replicas (None) is treated as 1, the Kubernetes
        # default; an explicit 0 must stay 0 (``or 1`` would turn it into 1
        # and the canary would never be scaled back up to 1).
        current_replicas = canary_deployment.spec.replicas
        if current_replicas is None:
            current_replicas = 1
        if current_replicas != canary_replicas:
            self._scale_deployment(canary_name, namespace, canary_replicas)

    # Route traffic between primary and canary via the ALB actions annotation.
    self._update_canary_ingress(name, namespace, spec, endpoint, primary_weight, canary_weight)
def _update_canary_ingress(
    self,
    name: str,
    namespace: str,
    spec: dict[str, Any],
    endpoint: dict[str, Any],
    primary_weight: int,
    canary_weight: int,
) -> None:
    """Point the endpoint's Ingress at a weighted primary/canary split.

    Serializes an ALB ``forward`` action into the
    ``alb.ingress.kubernetes.io/actions.weighted-routing`` annotation. The
    action divides traffic between the ``<name>`` and ``<name>-canary``
    Services on port 80. A single Prefix rule's backend references the
    action through the ``use-annotation`` port name.

    The Ingress ``inference-<name>`` is patched. If the API answers 404 it is
    created instead, and any other ``ApiException`` is re-raised.

    Args:
        name: Endpoint name (primary Service name).
        namespace: Namespace of the Ingress.
        spec: Endpoint spec; reads ``image`` and ``health_check_path``.
        endpoint: Endpoint record; reads ``ingress_path``.
        primary_weight: Weight assigned to the primary Service.
        canary_weight: Weight assigned to the canary Service.
    """
    import json as _json

    ingress_name = f"inference-{name}"
    ingress_path = endpoint.get("ingress_path", f"/inference/{name}")

    # Images matching these markers get their health check under the
    # /inference/<name> prefix; all others use the configured path as-is.
    lowered_image = spec.get("image", "").lower()
    root_path_markers = ("vllm", "text-generation-inference", "tgi")
    uses_root_path = any(marker in lowered_image for marker in root_path_markers)
    base_health = spec.get("health_check_path", "/health")
    if uses_root_path:
        health_path = f"/inference/{name}{base_health}"
    else:
        health_path = base_health

    def _target_group(service_name: str, weight: int) -> dict[str, Any]:
        return {"serviceName": service_name, "servicePort": 80, "weight": weight}

    forward_action = {
        "type": "forward",
        "forwardConfig": {
            "targetGroups": [
                _target_group(name, primary_weight),
                _target_group(f"{name}-canary", canary_weight),
            ]
        },
    }
    forward_config = _json.dumps(forward_action)

    # Backend points at the annotation-defined action rather than a Service.
    annotation_backend = client.V1IngressBackend(
        service=client.V1IngressServiceBackend(
            name="weighted-routing",
            port=client.V1ServiceBackendPort(name="use-annotation"),
        ),
    )
    routing_rule = client.V1IngressRule(
        http=client.V1HTTPIngressRuleValue(
            paths=[
                client.V1HTTPIngressPath(
                    path=ingress_path,
                    path_type="Prefix",
                    backend=annotation_backend,
                )
            ]
        )
    )
    ingress_metadata = client.V1ObjectMeta(
        name=ingress_name,
        namespace=namespace,
        labels={
            "app": name,
            "project": "gco",
            "gco.io/type": "inference",
            "gco.io/canary": "true",
        },
        annotations={
            "alb.ingress.kubernetes.io/healthcheck-path": health_path,
            "alb.ingress.kubernetes.io/healthcheck-interval-seconds": "15",
            "alb.ingress.kubernetes.io/actions.weighted-routing": forward_config,
        },
    )
    ingress = client.V1Ingress(
        metadata=ingress_metadata,
        spec=client.V1IngressSpec(ingress_class_name="alb", rules=[routing_rule]),
    )

    try:
        self.networking_v1.patch_namespaced_ingress(
            ingress_name, namespace, ingress, _request_timeout=self._k8s_timeout
        )
    except ApiException as e:
        if e.status != 404:
            raise
        self.networking_v1.create_namespaced_ingress(
            namespace, ingress, _request_timeout=self._k8s_timeout
        )
        logger.info("Created canary ingress for %s", name)
    else:
        logger.info(
            "Updated ingress for %s: primary=%d%% canary=%d%%",
            name,
            primary_weight,
            canary_weight,
        )
def _cleanup_canary(self, name: str, namespace: str) -> None:
    """Delete the canary Deployment and Service for an endpoint.

    Deletes the ``<name>-canary`` Deployment and then the ``<name>-canary``
    Service. A 404 (already gone) is ignored. Any other ``ApiException`` is
    logged, and the second deletion is still attempted.

    This method does NOT modify the ``inference-<name>`` Ingress. If that
    Ingress still carries the weighted-routing backend, it keeps referencing
    the ``<name>-canary`` Service until something else rewrites it.
    NOTE(review): confirm the primary ingress reconciliation restores a
    primary-only Ingress after the canary is removed.

    Args:
        name: Primary endpoint name; the canary resources are ``<name>-canary``.
        namespace: Namespace holding the canary resources.
    """
    canary_name = f"{name}-canary"

    # Delete canary deployment
    try:
        self.apps_v1.delete_namespaced_deployment(
            canary_name, namespace, _request_timeout=self._k8s_timeout
        )
        logger.info("Deleted canary deployment %s", canary_name)
    except ApiException as e:
        if e.status != 404:
            logger.error("Failed to delete canary deployment %s: %s", canary_name, e)

    # Delete canary service
    try:
        self.core_v1.delete_namespaced_service(
            canary_name, namespace, _request_timeout=self._k8s_timeout
        )
        logger.info("Deleted canary service %s", canary_name)
    except ApiException as e:
        if e.status != 404:
            logger.error("Failed to delete canary service %s: %s", canary_name, e)
def _delete_resources(self, name: str, namespace: str) -> None:
    """Delete every Kubernetes resource belonging to an inference endpoint.

    Deletes, in order:
      1. the canary resources (via ``_cleanup_canary``),
      2. the Deployment ``<name>``,
      3. the Service ``<name>``,
      4. the Ingress ``inference-<name>``,
      5. the HorizontalPodAutoscaler ``<name>``.

    Each deletion is attempted independently. A 404 (already gone) is
    ignored, and any other ``ApiException`` is logged so the remaining
    deletions still run.

    Args:
        name: Endpoint name.
        namespace: Namespace holding the endpoint's resources.
    """
    # Delete canary resources first
    self._cleanup_canary(name, namespace)

    # Delete deployment
    try:
        self.apps_v1.delete_namespaced_deployment(
            name, namespace, _request_timeout=self._k8s_timeout
        )
        logger.info("Deleted deployment %s/%s", namespace, name)
    except ApiException as e:
        if e.status != 404:
            logger.error("Failed to delete deployment %s: %s", name, e)

    # Delete service
    try:
        self.core_v1.delete_namespaced_service(
            name, namespace, _request_timeout=self._k8s_timeout
        )
        logger.info("Deleted service %s/%s", namespace, name)
    except ApiException as e:
        if e.status != 404:
            logger.error("Failed to delete service %s: %s", name, e)

    # Delete ingress
    try:
        self.networking_v1.delete_namespaced_ingress(
            f"inference-{name}", namespace, _request_timeout=self._k8s_timeout
        )
        logger.info("Deleted ingress for %s", name)
    except ApiException as e:
        if e.status != 404:
            logger.error("Failed to delete ingress for %s: %s", name, e)

    # Delete HPA. Use the same request timeout as every other call so an
    # unresponsive API server cannot stall teardown without bound.
    try:
        autoscaling_v2 = client.AutoscalingV2Api()
        autoscaling_v2.delete_namespaced_horizontal_pod_autoscaler(
            name, namespace, _request_timeout=self._k8s_timeout
        )
        logger.info("Deleted HPA for %s", name)
    except ApiException as e:
        if e.status != 404:
            logger.error("Failed to delete HPA for %s: %s", name, e)
def _create_or_update_hpa(self, name: str, namespace: str, spec: dict[str, Any]) -> None:
    """Create or update the HorizontalPodAutoscaler for an inference endpoint.

    Does nothing unless ``spec["autoscaling"]["enabled"]`` is truthy.
    Otherwise it builds an ``autoscaling/v2`` HPA named ``<name>`` that
    targets the Deployment ``<name>``:

    - ``min_replicas`` defaults to 1 and ``max_replicas`` to 10.
    - Metrics are Resource-utilization metrics taken from
      ``autoscaling["metrics"]`` (default ``[{"type": "cpu", "target": 70}]``).
    - Only the ``cpu`` and ``memory`` metric types are supported. Other types
      are logged and skipped, and if none remain a CPU 70% target is used.

    The HPA is created first. If it already exists (HTTP 409) it is patched
    instead.

    Args:
        name: Endpoint / Deployment name, also used as the HPA name.
        namespace: Namespace for the HPA.
        spec: Endpoint spec; only its ``autoscaling`` section is read.

    Raises:
        ApiException: For a create failure other than 409, or any patch failure.
    """
    # ``or {}`` also tolerates an explicit ``"autoscaling": None``.
    autoscaling_config = spec.get("autoscaling") or {}
    if not autoscaling_config.get("enabled"):
        # NOTE(review): an HPA created while autoscaling was enabled is not
        # removed here once it is disabled — confirm that is handled elsewhere.
        return

    min_replicas = autoscaling_config.get("min_replicas", 1)
    max_replicas = autoscaling_config.get("max_replicas", 10)
    # ``or`` also covers ``"metrics": None``; an empty list falls back to the
    # same CPU default as before.
    metrics_config = autoscaling_config.get("metrics") or [{"type": "cpu", "target": 70}]

    def _utilization_metric(resource_name: str, target: Any) -> Any:
        """Build a Resource metric with an average-utilization target."""
        return client.V2MetricSpec(
            type="Resource",
            resource=client.V2ResourceMetricSource(
                name=resource_name,
                target=client.V2MetricTarget(
                    type="Utilization",
                    average_utilization=target,
                ),
            ),
        )

    hpa_metrics = []
    for metric in metrics_config:
        metric_type = metric.get("type", "cpu")
        if metric_type in ("cpu", "memory"):
            hpa_metrics.append(_utilization_metric(metric_type, metric.get("target", 70)))
        else:
            logger.warning(
                "Ignoring unsupported autoscaling metric type %r for %s",
                metric_type,
                name,
            )

    if not hpa_metrics:
        # Default to CPU if no recognized metrics
        hpa_metrics.append(_utilization_metric("cpu", 70))

    hpa = client.V2HorizontalPodAutoscaler(
        metadata=client.V1ObjectMeta(
            name=name,
            namespace=namespace,
            labels={
                "app": name,
                "project": "gco",
                "gco.io/type": "inference",
            },
        ),
        spec=client.V2HorizontalPodAutoscalerSpec(
            scale_target_ref=client.V2CrossVersionObjectReference(
                api_version="apps/v1",
                kind="Deployment",
                name=name,
            ),
            min_replicas=min_replicas,
            max_replicas=max_replicas,
            metrics=hpa_metrics,
        ),
    )

    autoscaling_v2 = client.AutoscalingV2Api()
    try:
        autoscaling_v2.create_namespaced_horizontal_pod_autoscaler(
            namespace, hpa, _request_timeout=self._k8s_timeout
        )
        logger.info("Created HPA for %s (min=%d, max=%d)", name, min_replicas, max_replicas)
    except ApiException as e:
        if e.status != 409:
            raise
        # Already exists: patch it in place with the freshly built object.
        autoscaling_v2.patch_namespaced_horizontal_pod_autoscaler(
            name, namespace, hpa, _request_timeout=self._k8s_timeout
        )
        logger.info("Updated HPA for %s", name)
1279 # ------------------------------------------------------------------
1280 # Metrics
1281 # ------------------------------------------------------------------
def get_metrics(self) -> dict[str, Any]:
    """Return a snapshot of the monitor's identity and loop counters.

    The dict has keys ``cluster_id``, ``region``, ``running``,
    ``reconcile_count`` and ``errors_count``. Each value is copied from the
    corresponding instance attribute.
    """
    return dict(
        cluster_id=self.cluster_id,
        region=self.region,
        running=self._running,
        reconcile_count=self._reconcile_count,
        errors_count=self._errors_count,
    )
def create_inference_monitor_from_env() -> "InferenceMonitor":
    """Create an InferenceMonitor configured from environment variables.

    Reads:
      - ``CLUSTER_NAME`` (default ``unknown-cluster``)
      - ``REGION`` (default ``unknown-region``)
      - ``INFERENCE_NAMESPACE`` (default ``gco-inference``)
      - ``RECONCILE_INTERVAL_SECONDS`` (default 15)

    It then enables structured logging and builds the endpoint store.

    Raises:
        ValueError: If ``RECONCILE_INTERVAL_SECONDS`` is not an integer or is
            not positive. The value is the pause between reconciliation
            loops, so zero or a negative number is rejected up front instead
            of risking a tight polling loop.
    """
    cluster_id = os.getenv("CLUSTER_NAME", "unknown-cluster")
    region = os.getenv("REGION", "unknown-region")
    namespace = os.getenv("INFERENCE_NAMESPACE", "gco-inference")

    raw_interval = os.getenv("RECONCILE_INTERVAL_SECONDS", "15")
    try:
        interval = int(raw_interval)
    except ValueError as err:
        raise ValueError(
            f"RECONCILE_INTERVAL_SECONDS must be an integer, got {raw_interval!r}"
        ) from err
    if interval <= 0:
        raise ValueError(f"RECONCILE_INTERVAL_SECONDS must be positive, got {interval}")

    # Enable structured JSON logging for CloudWatch Insights
    configure_structured_logging(
        service_name="inference-monitor",
        cluster_id=cluster_id,
        region=region,
    )

    store = InferenceEndpointStore()  # Uses DYNAMODB_REGION env var, falls back to REGION

    return InferenceMonitor(
        cluster_id=cluster_id,
        region=region,
        store=store,
        namespace=namespace,
        reconcile_interval=interval,
    )
async def main() -> None:
    """Entry point: run the inference monitor, restarting it after crashes.

    ``monitor.start()`` is awaited in a loop:

    - Any ``Exception`` is logged with its traceback, the monitor is
      stopped, and it is restarted after 10 seconds.
    - ``KeyboardInterrupt`` stops the monitor and ends the loop.
    - Cancellation of this task also stops the monitor, then the
      cancellation is re-raised.
    """
    monitor = create_inference_monitor_from_env()
    logger.info("Inference monitor initialized: %s", monitor.get_metrics())

    while True:
        # NOTE(review): if start() returns normally the loop calls it again
        # immediately — presumably start() only returns after stop(); verify.
        try:
            await monitor.start()
        except KeyboardInterrupt:
            logger.info("Shutting down inference monitor")
            monitor.stop()
            break
        except asyncio.CancelledError:
            # Since Python 3.11, asyncio.run() handles Ctrl-C (SIGINT) by
            # cancelling the main task, so the coroutine sees CancelledError
            # rather than KeyboardInterrupt. Shut down cleanly, then let the
            # cancellation propagate.
            logger.info("Shutting down inference monitor")
            monitor.stop()
            raise
        except Exception as e:
            logger.error("Monitor crashed, restarting in 10s: %s", e, exc_info=True)
            monitor.stop()
            monitor._running = False
            await asyncio.sleep(10)
if __name__ == "__main__":
    # Script entry point: drive the async monitor loop on a fresh event loop.
    asyncio.run(main())