Coverage for gco/services/health_monitor.py: 96% (328 statements)
coverage.py v7.13.5, created at 2026-04-30 21:47 +0000
1"""
2Health Monitor Service for GCO (Global Capacity Orchestrator on AWS).
4This service monitors Kubernetes cluster resource utilization and reports
5health status for load balancer health checks and monitoring dashboards.
7Key Features:
8- Collects CPU, memory, and GPU utilization metrics from Kubernetes Metrics Server
9- Compares utilization against configurable thresholds
10- Reports health status (healthy/unhealthy) based on threshold violations
11- Caches metrics to reduce API calls to Kubernetes
13Environment Variables:
14 CLUSTER_NAME: Name of the EKS cluster being monitored
15 REGION: AWS region of the cluster
16 CPU_THRESHOLD: CPU utilization threshold percentage (default: 80, -1 to disable)
17 MEMORY_THRESHOLD: Memory utilization threshold percentage (default: 85, -1 to disable)
18 GPU_THRESHOLD: GPU utilization threshold percentage (default: 90, -1 to disable)
20Usage:
21 health_monitor = create_health_monitor_from_env()
22 status = await health_monitor.get_health_status()
23"""
import asyncio
import logging
import os
from datetime import datetime
from typing import Any, Literal

from kubernetes import client, config
from kubernetes.client.rest import ApiException

from gco.models import HealthStatus, RequestedResources, ResourceThresholds, ResourceUtilization
from gco.services.structured_logging import configure_structured_logging

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


class HealthMonitor:
    """
    Monitors Kubernetes cluster resource utilization and determines health status
    """

    def __init__(self, cluster_id: str, region: str, thresholds: ResourceThresholds):
        self.cluster_id = cluster_id
        self.region = region
        self.thresholds = thresholds

        # Initialize Kubernetes clients
        try:
            # Try to load in-cluster config first (when running in a pod)
            config.load_incluster_config()
            logger.info("Loaded in-cluster Kubernetes configuration")
        except config.ConfigException:
            try:
                # Fall back to local kubeconfig (for development)
                config.load_kube_config()
                logger.info("Loaded local Kubernetes configuration")
            except config.ConfigException as e:
                logger.error(f"Failed to load Kubernetes configuration: {e}")
                raise

        self.core_v1 = client.CoreV1Api()
        self.networking_v1 = client.NetworkingV1Api()
        self.metrics_v1beta1 = client.CustomObjectsApi()

        # Timeout for Kubernetes API calls (seconds)
        self._k8s_timeout = int(os.environ.get("K8S_API_TIMEOUT", "30"))

        # Cache for metrics
        self._last_metrics_time: datetime | None = None
        self._cached_metrics: dict[str, Any] | None = None
        self._cache_duration = 30  # seconds

        # ALB hostname sync
        self._last_alb_sync: datetime | None = None
        self._alb_sync_interval = 300  # 5 minutes

    async def get_cluster_metrics(self) -> tuple[ResourceUtilization, int, int, RequestedResources]:
        """
        Get current cluster resource utilization metrics
        Returns: (ResourceUtilization, active_jobs_count, pending_pods_count, pending_requested_resources)
        """
        try:
            # Get node metrics from metrics server
            node_metrics = await self._get_node_metrics()

            # Get pod metrics for active jobs count and pending pods
            active_jobs, pending_pods = await self._get_pod_counts()

            # Calculate cluster-wide utilization
            cpu_utilization = self._calculate_cpu_utilization(node_metrics)
            memory_utilization = self._calculate_memory_utilization(node_metrics)
            gpu_utilization = await self._calculate_gpu_utilization()

            # Calculate resources requested by pending pods
            pending_requested = await self._calculate_pending_requested_resources()

            resource_utilization = ResourceUtilization(
                cpu=cpu_utilization, memory=memory_utilization, gpu=gpu_utilization
            )

            logger.info(
                f"Cluster metrics - CPU: {cpu_utilization:.1f}%, "
                f"Memory: {memory_utilization:.1f}%, GPU: {gpu_utilization:.1f}%, "
                f"Active Jobs: {active_jobs}, Pending Pods: {pending_pods}, "
                f"Pending Requested CPU: {pending_requested.cpu_vcpus:.1f} vCPUs, "
                f"Pending Requested Memory: {pending_requested.memory_gb:.1f} GB"
            )

            return resource_utilization, active_jobs, pending_pods, pending_requested

        except Exception as e:
            logger.error(f"Failed to get cluster metrics: {e}")
            # Re-raise so get_health_status returns "unhealthy" instead of
            # silently reporting 0% utilization (which looks healthy to GA).
            raise

    async def _get_node_metrics(self) -> dict[str, Any]:
        """Get node metrics from Kubernetes metrics server"""
        try:
            # Check cache first
            now = datetime.now()
            if (
                self._cached_metrics
                and self._last_metrics_time
                # total_seconds() rather than .seconds: .seconds ignores the
                # days component, so a very old cache could look fresh
                and (now - self._last_metrics_time).total_seconds() < self._cache_duration
            ):
                return self._cached_metrics

            # Fetch fresh metrics
            node_metrics: dict[str, Any] = self.metrics_v1beta1.list_cluster_custom_object(
                group="metrics.k8s.io",
                version="v1beta1",
                plural="nodes",
                _request_timeout=self._k8s_timeout,
            )

            # Update cache
            self._cached_metrics = node_metrics
            self._last_metrics_time = now

            return node_metrics

        except ApiException as e:
            logger.error(f"Failed to get node metrics: {e}")
            # Invalidate cache so stale data isn't used on next call
            self._cached_metrics = None
            self._last_metrics_time = None
            # Re-raise so get_cluster_metrics propagates the failure
            # to get_health_status, which returns "unhealthy"
            raise
    def _calculate_cpu_utilization(self, node_metrics: dict[str, Any]) -> float:
        """Calculate cluster-wide CPU utilization percentage"""
        total_cpu_usage = 0.0
        total_cpu_capacity = 0.0

        try:
            # Get node list for capacity information
            nodes = self.core_v1.list_node(_request_timeout=self._k8s_timeout)
            node_capacities = {}

            for node in nodes.items:
                node_name = node.metadata.name
                cpu_capacity = node.status.allocatable.get("cpu", "0")
                # Convert CPU capacity to millicores
                if cpu_capacity.endswith("m"):
                    cpu_capacity_millicores = int(cpu_capacity[:-1])
                else:
                    cpu_capacity_millicores = int(cpu_capacity) * 1000
                node_capacities[node_name] = cpu_capacity_millicores
                total_cpu_capacity += cpu_capacity_millicores

            # Calculate usage from metrics
            for item in node_metrics.get("items", []):
                node_name = item["metadata"]["name"]
                cpu_usage = item["usage"]["cpu"]

                # Convert CPU usage to millicores
                if cpu_usage.endswith("n"):
                    cpu_usage_millicores = int(cpu_usage[:-1]) / 1_000_000
                elif cpu_usage.endswith("u"):
                    cpu_usage_millicores = int(cpu_usage[:-1]) / 1_000
                elif cpu_usage.endswith("m"):
                    cpu_usage_millicores = int(cpu_usage[:-1])
                else:
                    cpu_usage_millicores = int(cpu_usage) * 1000

                total_cpu_usage += cpu_usage_millicores

            if total_cpu_capacity > 0:
                return (total_cpu_usage / total_cpu_capacity) * 100

        except Exception as e:
            logger.error(f"Error calculating CPU utilization: {e}")

        return 0.0
    def _calculate_memory_utilization(self, node_metrics: dict[str, Any]) -> float:
        """Calculate cluster-wide memory utilization percentage"""
        total_memory_usage = 0
        total_memory_capacity = 0

        try:
            # Get node list for capacity information
            nodes = self.core_v1.list_node(_request_timeout=self._k8s_timeout)

            for node in nodes.items:
                memory_capacity = node.status.allocatable.get("memory", "0")
                # Convert memory capacity to bytes
                memory_capacity_bytes = self._parse_memory_string(memory_capacity)
                total_memory_capacity += memory_capacity_bytes

            # Calculate usage from metrics
            for item in node_metrics.get("items", []):
                memory_usage = item["usage"]["memory"]
                memory_usage_bytes = self._parse_memory_string(memory_usage)
                total_memory_usage += memory_usage_bytes

            if total_memory_capacity > 0:
                return (total_memory_usage / total_memory_capacity) * 100

        except Exception as e:
            logger.error(f"Error calculating memory utilization: {e}")

        return 0.0
    def _parse_memory_string(self, memory_str: str) -> int:
        """Parse Kubernetes memory string to bytes"""
        if not memory_str:
            return 0

        memory_str = memory_str.strip()

        # Handle different units
        if memory_str.endswith("Ki"):
            return int(memory_str[:-2]) * 1024
        if memory_str.endswith("Mi"):
            return int(memory_str[:-2]) * 1024 * 1024
        if memory_str.endswith("Gi"):
            return int(memory_str[:-2]) * 1024 * 1024 * 1024
        if memory_str.endswith("Ti"):
            return int(memory_str[:-2]) * 1024 * 1024 * 1024 * 1024
        if memory_str.endswith("k"):
            return int(memory_str[:-1]) * 1000
        if memory_str.endswith("M"):
            return int(memory_str[:-1]) * 1000 * 1000
        if memory_str.endswith("G"):
            return int(memory_str[:-1]) * 1000 * 1000 * 1000
        if memory_str.endswith("T"):
            # Decimal terabytes, for symmetry with the binary "Ti" case above
            return int(memory_str[:-1]) * 1000 * 1000 * 1000 * 1000
        return int(memory_str)
    async def _calculate_gpu_utilization(self) -> float:
        """Calculate cluster-wide GPU utilization percentage"""
        try:
            # Get pods with GPU requests
            pods = self.core_v1.list_pod_for_all_namespaces(
                _request_timeout=self._k8s_timeout,
            )

            total_gpu_requested = 0
            total_gpu_capacity = 0

            # Get node GPU capacity
            nodes = self.core_v1.list_node(_request_timeout=self._k8s_timeout)
            for node in nodes.items:
                gpu_capacity = node.status.allocatable.get("nvidia.com/gpu", "0")
                total_gpu_capacity += int(gpu_capacity)

            # Calculate GPU requests from running pods
            for pod in pods.items:
                if pod.status.phase == "Running":
                    for container in pod.spec.containers:
                        if container.resources and container.resources.requests:
                            gpu_request = container.resources.requests.get("nvidia.com/gpu", "0")
                            total_gpu_requested += int(gpu_request)

            if total_gpu_capacity > 0:
                return (total_gpu_requested / total_gpu_capacity) * 100

        except Exception as e:
            logger.error(f"Error calculating GPU utilization: {e}")

        return 0.0
    async def _get_active_jobs_count(self) -> int:
        """Get count of active jobs in the cluster"""
        try:
            # Count running pods (excluding system pods)
            pods = self.core_v1.list_pod_for_all_namespaces(
                _request_timeout=self._k8s_timeout,
            )
            active_jobs = 0

            for pod in pods.items:
                # Skip system namespaces
                if pod.metadata.namespace in ["kube-system", "kube-public", "kube-node-lease"]:
                    continue

                # Count running pods as active jobs
                if pod.status.phase == "Running":
                    active_jobs += 1

            return active_jobs

        except Exception as e:
            logger.error(f"Error getting active jobs count: {e}")
            return 0

    async def _get_pod_counts(self) -> tuple[int, int]:
        """Get count of active jobs and pending pods in the cluster"""
        try:
            pods = self.core_v1.list_pod_for_all_namespaces(
                _request_timeout=self._k8s_timeout,
            )
            active_jobs = 0
            pending_pods = 0

            for pod in pods.items:
                # Skip system namespaces
                if pod.metadata.namespace in ["kube-system", "kube-public", "kube-node-lease"]:
                    continue

                if pod.status.phase == "Running":
                    active_jobs += 1
                elif pod.status.phase == "Pending":
                    pending_pods += 1

            return active_jobs, pending_pods

        except Exception as e:
            logger.error(f"Error getting pod counts: {e}")
            return 0, 0

    async def _calculate_pending_requested_resources(self) -> RequestedResources:
        """Calculate total resources requested by pending pods"""
        try:
            pods = self.core_v1.list_pod_for_all_namespaces(
                _request_timeout=self._k8s_timeout,
            )
            total_cpu_millicores = 0.0
            total_memory_bytes = 0
            total_gpus = 0

            for pod in pods.items:
                # Skip system namespaces
                if pod.metadata.namespace in ["kube-system", "kube-public", "kube-node-lease"]:
                    continue

                # Only count pending pods
                if pod.status.phase != "Pending":
                    continue

                for container in pod.spec.containers:
                    if container.resources and container.resources.requests:
                        # CPU
                        cpu_request = container.resources.requests.get("cpu", "0")
                        if cpu_request.endswith("m"):
                            total_cpu_millicores += int(cpu_request[:-1])
                        elif cpu_request.endswith("n"):
                            total_cpu_millicores += int(cpu_request[:-1]) / 1_000_000
                        else:
                            total_cpu_millicores += float(cpu_request) * 1000

                        # Memory
                        memory_request = container.resources.requests.get("memory", "0")
                        total_memory_bytes += self._parse_memory_string(memory_request)

                        # GPUs
                        gpu_request = container.resources.requests.get("nvidia.com/gpu", "0")
                        total_gpus += int(gpu_request)

            # Convert to vCPUs and GB
            cpu_vcpus = total_cpu_millicores / 1000
            memory_gb = total_memory_bytes / (1024 * 1024 * 1024)

            return RequestedResources(cpu_vcpus=cpu_vcpus, memory_gb=memory_gb, gpus=total_gpus)

        except Exception as e:
            logger.error(f"Error calculating pending requested resources: {e}")
            return RequestedResources(cpu_vcpus=0.0, memory_gb=0.0, gpus=0)

    async def get_health_status(self) -> HealthStatus:
        """
        Get current health status of the cluster
        """
        try:
            # Get current metrics
            (
                resource_utilization,
                active_jobs,
                pending_pods,
                pending_requested,
            ) = await self.get_cluster_metrics()

            # Determine health status based on thresholds
            # A threshold of -1 means that check is disabled
            is_healthy = True
            if not self.thresholds.is_disabled("cpu_threshold"):
                is_healthy = (
                    is_healthy and resource_utilization.cpu <= self.thresholds.cpu_threshold
                )
            if not self.thresholds.is_disabled("memory_threshold"):
                is_healthy = (
                    is_healthy and resource_utilization.memory <= self.thresholds.memory_threshold
                )
            if not self.thresholds.is_disabled("gpu_threshold"):
                is_healthy = (
                    is_healthy and resource_utilization.gpu <= self.thresholds.gpu_threshold
                )
            if not self.thresholds.is_disabled("pending_pods_threshold"):
                is_healthy = is_healthy and pending_pods <= self.thresholds.pending_pods_threshold
            if not self.thresholds.is_disabled("pending_requested_cpu_vcpus"):
                is_healthy = (
                    is_healthy
                    and pending_requested.cpu_vcpus <= self.thresholds.pending_requested_cpu_vcpus
                )
            if not self.thresholds.is_disabled("pending_requested_memory_gb"):
                is_healthy = (
                    is_healthy
                    and pending_requested.memory_gb <= self.thresholds.pending_requested_memory_gb
                )
            if not self.thresholds.is_disabled("pending_requested_gpus"):
                is_healthy = (
                    is_healthy and pending_requested.gpus <= self.thresholds.pending_requested_gpus
                )

            status: Literal["healthy", "unhealthy"] = "healthy" if is_healthy else "unhealthy"

            # Generate status message
            message = None
            if not is_healthy:
                violations = []
                if (
                    not self.thresholds.is_disabled("cpu_threshold")
                    and resource_utilization.cpu > self.thresholds.cpu_threshold
                ):
                    violations.append(
                        f"CPU: {resource_utilization.cpu:.1f}% > {self.thresholds.cpu_threshold}%"
                    )
                if (
                    not self.thresholds.is_disabled("memory_threshold")
                    and resource_utilization.memory > self.thresholds.memory_threshold
                ):
                    violations.append(
                        f"Memory: {resource_utilization.memory:.1f}% > {self.thresholds.memory_threshold}%"
                    )
                if (
                    not self.thresholds.is_disabled("gpu_threshold")
                    and resource_utilization.gpu > self.thresholds.gpu_threshold
                ):
                    violations.append(
                        f"GPU: {resource_utilization.gpu:.1f}% > {self.thresholds.gpu_threshold}%"
                    )
                if (
                    not self.thresholds.is_disabled("pending_pods_threshold")
                    and pending_pods > self.thresholds.pending_pods_threshold
                ):
                    violations.append(
                        f"Pending Pods: {pending_pods} > {self.thresholds.pending_pods_threshold}"
                    )
                if (
                    not self.thresholds.is_disabled("pending_requested_cpu_vcpus")
                    and pending_requested.cpu_vcpus > self.thresholds.pending_requested_cpu_vcpus
                ):
                    violations.append(
                        f"Pending CPU: {pending_requested.cpu_vcpus:.1f} vCPUs > {self.thresholds.pending_requested_cpu_vcpus} vCPUs"
                    )
                if (
                    not self.thresholds.is_disabled("pending_requested_memory_gb")
                    and pending_requested.memory_gb > self.thresholds.pending_requested_memory_gb
                ):
                    violations.append(
                        f"Pending Memory: {pending_requested.memory_gb:.1f} GB > {self.thresholds.pending_requested_memory_gb} GB"
                    )
                if (
                    not self.thresholds.is_disabled("pending_requested_gpus")
                    and pending_requested.gpus > self.thresholds.pending_requested_gpus
                ):
                    violations.append(
                        f"Pending GPUs: {pending_requested.gpus} > {self.thresholds.pending_requested_gpus}"
                    )
                message = f"Threshold violations: {', '.join(violations)}"

            health_status = HealthStatus(
                cluster_id=self.cluster_id,
                region=self.region,
                timestamp=datetime.now(),
                status=status,
                resource_utilization=resource_utilization,
                thresholds=self.thresholds,
                active_jobs=active_jobs,
                pending_pods=pending_pods,
                pending_requested=pending_requested,
                message=message,
            )

            logger.info(f"Health status: {status} - {message or 'All thresholds within limits'}")
            return health_status

        except Exception as e:
            logger.error(f"Error getting health status: {e}")
            # Return unhealthy status on error
            return HealthStatus(
                cluster_id=self.cluster_id,
                region=self.region,
                timestamp=datetime.now(),
                status="unhealthy",
                resource_utilization=ResourceUtilization(cpu=0.0, memory=0.0, gpu=0.0),
                thresholds=self.thresholds,
                active_jobs=0,
                pending_pods=0,
                pending_requested=RequestedResources(cpu_vcpus=0.0, memory_gb=0.0, gpus=0),
                message=f"Health check error: {e!s}",
            )

    async def sync_alb_registration(self) -> None:
        """Ensure the SSM ALB hostname parameter matches the actual ALB.

        Reads the main ingress status to get the current ALB hostname,
        compares it to the SSM parameter, and updates SSM if stale.
        This makes the system self-healing when the ALB changes
        (e.g., due to IngressClassParams group merges).

        Runs at most once every 5 minutes to avoid excessive API calls.
        """
        now = datetime.now()
        if (
            self._last_alb_sync
            and (now - self._last_alb_sync).total_seconds() < self._alb_sync_interval
        ):
            return

        self._last_alb_sync = now
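        # (Recorded before the attempt so that failed syncs also back off for
        # the full interval instead of retrying on every health check.)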
        try:
            # Read the main ingress to get the current ALB hostname
            ingress = self.networking_v1.read_namespaced_ingress(
                "gco-ingress",
                "gco-system",
                _request_timeout=self._k8s_timeout,
            )
            lb_ingress = ingress.status.load_balancer.ingress
            if not lb_ingress:
                return

            current_hostname = lb_ingress[0].hostname
            if not current_hostname:
                return

            # Compare with SSM parameter; boto3 is imported lazily so the
            # Kubernetes-only code paths don't require it
            import boto3

            global_region = os.environ.get("GLOBAL_REGION", "us-east-2")
            project_name = os.environ.get("PROJECT_NAME", "gco")
            ssm = boto3.client("ssm", region_name=global_region)
            param_name = f"/{project_name}/alb-hostname-{self.region}"

            try:
                resp = ssm.get_parameter(Name=param_name)
                stored_hostname = resp["Parameter"]["Value"]
            except ssm.exceptions.ParameterNotFound:
                stored_hostname = None

            if stored_hostname != current_hostname:
                logger.warning(
                    "ALB hostname mismatch: SSM=%s, actual=%s. Updating SSM.",
                    stored_hostname,
                    current_hostname,
                )
                ssm.put_parameter(
                    Name=param_name,
                    Value=current_hostname,
                    Type="String",
                    Overwrite=True,
                )
                logger.info("Updated SSM parameter %s to %s", param_name, current_hostname)

        except Exception as e:
            logger.warning("ALB sync check failed (non-fatal): %s", e)


def create_health_monitor_from_env() -> HealthMonitor:
    """
    Create HealthMonitor instance from environment variables
    """
    cluster_id = os.getenv("CLUSTER_NAME", "unknown-cluster")
    region = os.getenv("REGION", "unknown-region")

    # Load thresholds from environment (defaults match cdk.json)
    cpu_threshold = int(os.getenv("CPU_THRESHOLD", "60"))
    memory_threshold = int(os.getenv("MEMORY_THRESHOLD", "60"))
    gpu_threshold = int(os.getenv("GPU_THRESHOLD", "60"))
    pending_pods_threshold = int(os.getenv("PENDING_PODS_THRESHOLD", "10"))
    pending_requested_cpu_vcpus = int(os.getenv("PENDING_REQUESTED_CPU_VCPUS", "100"))
    pending_requested_memory_gb = int(os.getenv("PENDING_REQUESTED_MEMORY_GB", "200"))
    pending_requested_gpus = int(os.getenv("PENDING_REQUESTED_GPUS", "8"))

    thresholds = ResourceThresholds(
        cpu_threshold=cpu_threshold,
        memory_threshold=memory_threshold,
        gpu_threshold=gpu_threshold,
        pending_pods_threshold=pending_pods_threshold,
        pending_requested_cpu_vcpus=pending_requested_cpu_vcpus,
        pending_requested_memory_gb=pending_requested_memory_gb,
        pending_requested_gpus=pending_requested_gpus,
    )

    return HealthMonitor(cluster_id, region, thresholds)


async def main() -> None:
    """
    Main function for running the health monitor with webhook dispatcher.

    This runs both the health monitoring loop and the webhook dispatcher
    as concurrent tasks.
    """
    from gco.services.webhook_dispatcher import create_webhook_dispatcher_from_env

    health_monitor = create_health_monitor_from_env()

    # Enable structured JSON logging for CloudWatch Insights
    configure_structured_logging(
        service_name="health-monitor",
        cluster_id=health_monitor.cluster_id,
        region=health_monitor.region,
    )

    webhook_dispatcher = create_webhook_dispatcher_from_env()

    # Start webhook dispatcher
    await webhook_dispatcher.start()
    logger.info("Webhook dispatcher started")

    try:
        while True:
            try:
                health_status = await health_monitor.get_health_status()
                print(f"Health Status: {health_status.status}")
                print(f"CPU: {health_status.resource_utilization.cpu:.1f}%")
                print(f"Memory: {health_status.resource_utilization.memory:.1f}%")
                print(f"GPU: {health_status.resource_utilization.gpu:.1f}%")
                print(f"Active Jobs: {health_status.active_jobs}")
                print(f"Pending Pods: {health_status.pending_pods}")
                if health_status.pending_requested:
                    print(
                        f"Pending Requested CPU: {health_status.pending_requested.cpu_vcpus:.1f} vCPUs"
                    )
                    print(
                        f"Pending Requested Memory: {health_status.pending_requested.memory_gb:.1f} GB"
                    )
                if health_status.message:
                    print(f"Message: {health_status.message}")

                # Print webhook dispatcher metrics
                webhook_metrics = webhook_dispatcher.get_metrics()
                print(
                    f"Webhook Deliveries: {webhook_metrics['deliveries_total']} "
                    f"(success={webhook_metrics['deliveries_success']}, "
                    f"failed={webhook_metrics['deliveries_failed']})"
                )
                print("-" * 50)

                await asyncio.sleep(30)  # Check every 30 seconds

            except KeyboardInterrupt:
                raise
            except Exception as e:
                logger.error(f"Error in main loop: {e}")
                await asyncio.sleep(10)

    except KeyboardInterrupt:
        logger.info("Health monitor stopped by user")
    finally:
        await webhook_dispatcher.stop()
        logger.info("Webhook dispatcher stopped")


if __name__ == "__main__":
    asyncio.run(main())