Coverage for gco/services/health_monitor.py: 97%
322 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
1"""
2Health Monitor Service for GCO (Global Capacity Orchestrator on AWS).
4This service monitors Kubernetes cluster resource utilization and reports
5health status for load balancer health checks and monitoring dashboards.
7Key Features:
- Collects CPU and memory utilization from the Kubernetes Metrics Server, and derives GPU utilization from the GPU requests of running pods
9- Compares utilization against configurable thresholds
10- Reports health status (healthy/unhealthy) based on threshold violations
11- Caches metrics to reduce API calls to Kubernetes
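Environment Variables:
    CLUSTER_NAME: Name of the EKS cluster being monitored (default: "unknown-cluster")
    REGION: AWS region of the cluster (default: "unknown-region")
    CPU_THRESHOLD: CPU utilization threshold percentage (default: 60, -1 to disable)
    MEMORY_THRESHOLD: Memory utilization threshold percentage (default: 60, -1 to disable)
    GPU_THRESHOLD: GPU utilization threshold percentage (default: 60, -1 to disable)
    PENDING_PODS_THRESHOLD: Maximum pending pods outside system namespaces (default: 10, -1 to disable)
    PENDING_REQUESTED_CPU_VCPUS: Maximum vCPUs requested by pending pods (default: 100, -1 to disable)
    PENDING_REQUESTED_MEMORY_GB: Maximum memory requested by pending pods, in GB computed as bytes / 1024**3 (default: 200, -1 to disable)
    PENDING_REQUESTED_GPUS: Maximum GPUs requested by pending pods (default: 8, -1 to disable)
    K8S_API_TIMEOUT: Timeout in seconds for Kubernetes API calls (default: 30)
    GLOBAL_REGION / PROJECT_NAME: Region and name prefix of the SSM ALB-hostname parameter (defaults: "us-east-2", "gco")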
20Usage:
21 health_monitor = create_health_monitor_from_env()
22 status = await health_monitor.get_health_status()
23"""
25import asyncio
26import logging
27import os
28from datetime import datetime
29from typing import Any, Literal
31from kubernetes import client, config
32from kubernetes.client.rest import ApiException
34from gco.models import HealthStatus, RequestedResources, ResourceThresholds, ResourceUtilization
35from gco.services.structured_logging import configure_structured_logging
# Process-wide fallback logging configuration: INFO level, plain-text lines.
_LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Module logger shared by HealthMonitor and main().
logger = logging.getLogger(__name__)
class HealthMonitor:
    """
    Monitors Kubernetes cluster resource utilization and determines health status.

    CPU and memory utilization compare Metrics Server (``metrics.k8s.io``) node
    usage with node allocatable capacity. GPU utilization is the share of
    node-allocatable ``nvidia.com/gpu`` requested by running pods. Each value is
    compared with ``ResourceThresholds``; checks that ``is_disabled`` reports as
    disabled (a threshold of -1) are skipped.

    NOTE(review): the Kubernetes client calls in the async methods below are
    synchronous (they are not awaited), so they run on the event-loop thread.
    """

    # Namespaces ignored when counting active jobs and pending pods/resources.
    _SYSTEM_NAMESPACES = frozenset({"kube-system", "kube-public", "kube-node-lease"})

    # Kubernetes memory quantity suffixes -> byte multiplier (binary, then decimal SI).
    _MEMORY_SUFFIXES = (
        ("Ki", 1024),
        ("Mi", 1024**2),
        ("Gi", 1024**3),
        ("Ti", 1024**4),
        ("Pi", 1024**5),
        ("Ei", 1024**6),
        ("k", 1000),
        ("M", 1000**2),
        ("G", 1000**3),
        ("T", 1000**4),
        ("P", 1000**5),
        ("E", 1000**6),
    )

    def __init__(self, cluster_id: str, region: str, thresholds: ResourceThresholds):
        """
        Load Kubernetes configuration and create the API clients.

        Args:
            cluster_id: Identifier reported in every HealthStatus.
            region: AWS region of the cluster; also part of the SSM ALB parameter name.
            thresholds: Limits evaluated by get_health_status().

        Raises:
            config.ConfigException: if neither in-cluster nor local kubeconfig loads.
        """
        self.cluster_id = cluster_id
        self.region = region
        self.thresholds = thresholds

        # Initialize Kubernetes clients
        try:
            # Try to load in-cluster config first (when running in pod)
            config.load_incluster_config()
            logger.info("Loaded in-cluster Kubernetes configuration")
        except config.ConfigException:
            try:
                # Fall back to local kubeconfig (for development)
                config.load_kube_config()
                logger.info("Loaded local Kubernetes configuration")
            except config.ConfigException as e:
                logger.error(f"Failed to load Kubernetes configuration: {e}")
                raise

        self.core_v1 = client.CoreV1Api()
        self.networking_v1 = client.NetworkingV1Api()
        self.metrics_v1beta1 = client.CustomObjectsApi()

        # Timeout for Kubernetes API calls (seconds)
        self._k8s_timeout = int(os.environ.get("K8S_API_TIMEOUT", "30"))

        # Cache for metrics
        self._last_metrics_time: datetime | None = None
        self._cached_metrics: dict[str, Any] | None = None
        self._cache_duration = 30  # seconds

        # ALB hostname sync
        self._last_alb_sync: datetime | None = None
        self._alb_sync_interval = 300  # 5 minutes

    # ------------------------------------------------------------------ parsing

    @staticmethod
    def _to_number(text: str) -> float:
        """Parse the numeric part of a quantity, keeping integers exact (int), else float."""
        try:
            return int(text)
        except ValueError:
            return float(text)

    def _parse_cpu_millicores(self, cpu_str: str) -> float:
        """
        Convert a Kubernetes CPU quantity to millicores.

        Supports nanocores ("n"), microcores ("u"), millicores ("m") and plain or
        fractional core counts ("2", "1.5"). Empty/None yields 0.0.

        Raises:
            ValueError: if the numeric part cannot be parsed.
        """
        text = (cpu_str or "").strip()
        if not text:
            return 0.0
        if text.endswith("n"):
            return self._to_number(text[:-1]) / 1_000_000
        if text.endswith("u"):
            return self._to_number(text[:-1]) / 1_000
        if text.endswith("m"):
            return self._to_number(text[:-1])
        return self._to_number(text) * 1000

    def _parse_memory_string(self, memory_str: str) -> int:
        """
        Convert a Kubernetes memory quantity to bytes.

        Handles binary suffixes (Ki..Ei), decimal suffixes (k, M, G, T, P, E),
        plain integers, decimals ("1.5Gi") and exponent form ("129e6").
        Empty, whitespace-only or None input yields 0.

        Raises:
            ValueError: if the numeric part cannot be parsed.
        """
        text = (memory_str or "").strip()
        if not text:
            return 0
        for suffix, multiplier in self._MEMORY_SUFFIXES:
            if text.endswith(suffix):
                return int(self._to_number(text[: -len(suffix)]) * multiplier)
        return int(self._to_number(text))

    # ------------------------------------------------------------------ metrics

    async def get_cluster_metrics(self) -> tuple[ResourceUtilization, int, int, RequestedResources]:
        """
        Get current cluster resource utilization metrics.

        Returns:
            (ResourceUtilization, active_jobs_count, pending_pods_count,
            pending_requested_resources)

        Raises:
            Exception: any failure is logged and re-raised so that
                get_health_status() reports "unhealthy".
        """
        try:
            # Get node metrics from metrics server
            node_metrics = await self._get_node_metrics()

            # Get pod metrics for active jobs count and pending pods
            active_jobs, pending_pods = await self._get_pod_counts()

            # Calculate cluster-wide utilization
            cpu_utilization = self._calculate_cpu_utilization(node_metrics)
            memory_utilization = self._calculate_memory_utilization(node_metrics)
            gpu_utilization = await self._calculate_gpu_utilization()

            # Calculate resources requested by pending pods
            pending_requested = await self._calculate_pending_requested_resources()

            resource_utilization = ResourceUtilization(
                cpu=cpu_utilization, memory=memory_utilization, gpu=gpu_utilization
            )

            logger.info(
                f"Cluster metrics - CPU: {cpu_utilization:.1f}%, "
                f"Memory: {memory_utilization:.1f}%, GPU: {gpu_utilization:.1f}%, "
                f"Active Jobs: {active_jobs}, Pending Pods: {pending_pods}, "
                f"Pending Requested CPU: {pending_requested.cpu_vcpus:.1f} vCPUs, "
                f"Pending Requested Memory: {pending_requested.memory_gb:.1f} GB"
            )

            return resource_utilization, active_jobs, pending_pods, pending_requested

        except Exception as e:
            logger.error(f"Failed to get cluster metrics: {e}")
            # Re-raise so get_health_status returns "unhealthy" instead of
            # silently reporting 0% utilization (which looks healthy to GA).
            raise

    async def _get_node_metrics(self) -> dict[str, Any]:
        """
        Get node metrics from the Kubernetes metrics server, cached for _cache_duration seconds.

        Raises:
            ApiException: after invalidating the cache, so the caller sees the failure.
        """
        try:
            now = datetime.now()
            if self._cached_metrics and self._last_metrics_time:
                # total_seconds(), not .seconds: .seconds drops whole days, so an
                # entry exactly N days old would look fresh. The >= 0 guard stops
                # a backwards wall-clock jump from pinning stale data.
                age = (now - self._last_metrics_time).total_seconds()
                if 0 <= age < self._cache_duration:
                    return self._cached_metrics

            # Fetch fresh metrics
            node_metrics: dict[str, Any] = self.metrics_v1beta1.list_cluster_custom_object(
                group="metrics.k8s.io",
                version="v1beta1",
                plural="nodes",
                _request_timeout=self._k8s_timeout,
            )

            # Update cache
            self._cached_metrics = node_metrics
            self._last_metrics_time = now

            return node_metrics

        except ApiException as e:
            logger.error(f"Failed to get node metrics: {e}")
            # Invalidate cache so stale data isn't used on next call
            self._cached_metrics = None
            self._last_metrics_time = None
            # Re-raise so get_cluster_metrics propagates the failure
            # to get_health_status, which returns "unhealthy"
            raise

    def _calculate_cpu_utilization(self, node_metrics: dict[str, Any]) -> float:
        """
        Calculate cluster-wide CPU utilization percentage.

        Sums Metrics Server node CPU usage and divides it by the summed node
        allocatable CPU. Returns 0.0 when there is no capacity or on any error
        (the error is logged).

        NOTE(review): returning 0.0 on error reports the cluster as idle, while
        get_cluster_metrics deliberately propagates node-metric errors so the
        health check reports "unhealthy". Confirm this fallback is intended.
        """
        total_cpu_usage = 0.0
        total_cpu_capacity = 0.0

        try:
            # Get node list for capacity information
            nodes = self.core_v1.list_node(_request_timeout=self._k8s_timeout)
            for node in nodes.items:
                total_cpu_capacity += self._parse_cpu_millicores(
                    node.status.allocatable.get("cpu", "0")
                )

            # Calculate usage from metrics
            for item in node_metrics.get("items", []):
                total_cpu_usage += self._parse_cpu_millicores(item["usage"]["cpu"])

            if total_cpu_capacity > 0:
                return (total_cpu_usage / total_cpu_capacity) * 100

        except Exception as e:
            logger.error(f"Error calculating CPU utilization: {e}")

        return 0.0

    def _calculate_memory_utilization(self, node_metrics: dict[str, Any]) -> float:
        """
        Calculate cluster-wide memory utilization percentage.

        Sums Metrics Server node memory usage over the summed node allocatable
        memory. Returns 0.0 when there is no capacity or on any error (logged).
        """
        total_memory_usage = 0
        total_memory_capacity = 0

        try:
            # Get node list for capacity information
            nodes = self.core_v1.list_node(_request_timeout=self._k8s_timeout)

            for node in nodes.items:
                memory_capacity = node.status.allocatable.get("memory", "0")
                total_memory_capacity += self._parse_memory_string(memory_capacity)

            # Calculate usage from metrics
            for item in node_metrics.get("items", []):
                total_memory_usage += self._parse_memory_string(item["usage"]["memory"])

            if total_memory_capacity > 0:
                return (total_memory_usage / total_memory_capacity) * 100

        except Exception as e:
            logger.error(f"Error calculating memory utilization: {e}")

        return 0.0

    async def _calculate_gpu_utilization(self) -> float:
        """
        Calculate cluster-wide GPU utilization percentage.

        The value is the sum of ``nvidia.com/gpu`` requests of Running pods'
        containers, divided by the sum of node allocatable ``nvidia.com/gpu``.
        Returns 0.0 when there is no GPU capacity or on any error (logged).
        """
        try:
            # Get pods with GPU requests
            pods = self.core_v1.list_pod_for_all_namespaces(
                _request_timeout=self._k8s_timeout,
            )

            total_gpu_requested = 0
            total_gpu_capacity = 0

            # Get node GPU capacity
            nodes = self.core_v1.list_node(_request_timeout=self._k8s_timeout)
            for node in nodes.items:
                gpu_capacity = node.status.allocatable.get("nvidia.com/gpu", "0")
                total_gpu_capacity += int(gpu_capacity)

            # Calculate GPU requests from running pods
            for pod in pods.items:
                if pod.status.phase == "Running":
                    for container in pod.spec.containers:
                        if container.resources and container.resources.requests:
                            gpu_request = container.resources.requests.get("nvidia.com/gpu", "0")
                            total_gpu_requested += int(gpu_request)

            if total_gpu_capacity > 0:
                return (total_gpu_requested / total_gpu_capacity) * 100

        except Exception as e:
            logger.error(f"Error calculating GPU utilization: {e}")

        return 0.0

    async def _get_active_jobs_count(self) -> int:
        """
        Count Running pods outside system namespaces.

        get_cluster_metrics uses _get_pod_counts instead. Returns 0 on any error (logged).
        """
        try:
            pods = self.core_v1.list_pod_for_all_namespaces(
                _request_timeout=self._k8s_timeout,
            )
            return sum(
                1
                for pod in pods.items
                if pod.metadata.namespace not in self._SYSTEM_NAMESPACES
                and pod.status.phase == "Running"
            )

        except Exception as e:
            logger.error(f"Error getting active jobs count: {e}")
            return 0

    async def _get_pod_counts(self) -> tuple[int, int]:
        """
        Count Running ("active jobs") and Pending pods outside system namespaces.

        Returns:
            (active_jobs, pending_pods). Returns (0, 0) on any error (logged).
        """
        try:
            pods = self.core_v1.list_pod_for_all_namespaces(
                _request_timeout=self._k8s_timeout,
            )
            active_jobs = 0
            pending_pods = 0

            for pod in pods.items:
                if pod.metadata.namespace in self._SYSTEM_NAMESPACES:
                    continue

                if pod.status.phase == "Running":
                    active_jobs += 1
                elif pod.status.phase == "Pending":
                    pending_pods += 1

            return active_jobs, pending_pods

        except Exception as e:
            logger.error(f"Error getting pod counts: {e}")
            return 0, 0

    async def _calculate_pending_requested_resources(self) -> RequestedResources:
        """
        Sum the CPU, memory and GPU requests of Pending pods outside system namespaces.

        CPU is reported in vCPUs and memory in GB (bytes / 1024**3). Returns
        all-zero RequestedResources on any error (logged).
        """
        try:
            pods = self.core_v1.list_pod_for_all_namespaces(
                _request_timeout=self._k8s_timeout,
            )
            total_cpu_millicores = 0.0
            total_memory_bytes = 0
            total_gpus = 0

            for pod in pods.items:
                if pod.metadata.namespace in self._SYSTEM_NAMESPACES:
                    continue
                # Only count pending pods
                if pod.status.phase != "Pending":
                    continue

                for container in pod.spec.containers:
                    if not (container.resources and container.resources.requests):
                        continue
                    requests = container.resources.requests
                    total_cpu_millicores += self._parse_cpu_millicores(requests.get("cpu", "0"))
                    total_memory_bytes += self._parse_memory_string(requests.get("memory", "0"))
                    total_gpus += int(requests.get("nvidia.com/gpu", "0"))

            # Convert to vCPUs and GB
            cpu_vcpus = total_cpu_millicores / 1000
            memory_gb = total_memory_bytes / (1024 * 1024 * 1024)

            return RequestedResources(cpu_vcpus=cpu_vcpus, memory_gb=memory_gb, gpus=total_gpus)

        except Exception as e:
            logger.error(f"Error calculating pending requested resources: {e}")
            return RequestedResources(cpu_vcpus=0.0, memory_gb=0.0, gpus=0)

    # ------------------------------------------------------------------- health

    async def get_health_status(self) -> HealthStatus:
        """
        Evaluate current cluster metrics against the configured thresholds.

        Returns:
            A HealthStatus whose status is "healthy" only when every enabled
            threshold holds (value <= limit). Otherwise, message lists each
            violated threshold. If anything raises, an "unhealthy" status is
            returned with zeroed metrics and a "Health check error: ..." message;
            the exception is not propagated.
        """
        try:
            (
                resource_utilization,
                active_jobs,
                pending_pods,
                pending_requested,
            ) = await self.get_cluster_metrics()

            # (threshold field, observed value, violation message template).
            # Field names are both ResourceThresholds attributes and is_disabled()
            # keys; a threshold of -1 disables that check.
            checks = (
                ("cpu_threshold", resource_utilization.cpu, "CPU: {value:.1f}% > {limit}%"),
                (
                    "memory_threshold",
                    resource_utilization.memory,
                    "Memory: {value:.1f}% > {limit}%",
                ),
                ("gpu_threshold", resource_utilization.gpu, "GPU: {value:.1f}% > {limit}%"),
                ("pending_pods_threshold", pending_pods, "Pending Pods: {value} > {limit}"),
                (
                    "pending_requested_cpu_vcpus",
                    pending_requested.cpu_vcpus,
                    "Pending CPU: {value:.1f} vCPUs > {limit} vCPUs",
                ),
                (
                    "pending_requested_memory_gb",
                    pending_requested.memory_gb,
                    "Pending Memory: {value:.1f} GB > {limit} GB",
                ),
                (
                    "pending_requested_gpus",
                    pending_requested.gpus,
                    "Pending GPUs: {value} > {limit}",
                ),
            )
            enabled_checks = [
                (value, getattr(self.thresholds, field), template)
                for field, value, template in checks
                if not self.thresholds.is_disabled(field)
            ]

            is_healthy = all(value <= limit for value, limit, _ in enabled_checks)
            status: Literal["healthy", "unhealthy"] = "healthy" if is_healthy else "unhealthy"

            message = None
            if not is_healthy:
                violations = [
                    template.format(value=value, limit=limit)
                    for value, limit, template in enabled_checks
                    if value > limit
                ]
                message = f"Threshold violations: {', '.join(violations)}"

            health_status = HealthStatus(
                cluster_id=self.cluster_id,
                region=self.region,
                timestamp=datetime.now(),
                status=status,
                resource_utilization=resource_utilization,
                thresholds=self.thresholds,
                active_jobs=active_jobs,
                pending_pods=pending_pods,
                pending_requested=pending_requested,
                message=message,
            )

            logger.info(f"Health status: {status} - {message or 'All thresholds within limits'}")
            return health_status

        except Exception as e:
            logger.error(f"Error getting health status: {e}")
            # Return unhealthy status on error
            return HealthStatus(
                cluster_id=self.cluster_id,
                region=self.region,
                timestamp=datetime.now(),
                status="unhealthy",
                resource_utilization=ResourceUtilization(cpu=0.0, memory=0.0, gpu=0.0),
                thresholds=self.thresholds,
                active_jobs=0,
                pending_pods=0,
                pending_requested=RequestedResources(cpu_vcpus=0.0, memory_gb=0.0, gpus=0),
                message=f"Health check error: {e!s}",
            )

    async def sync_alb_registration(self) -> None:
        """Ensure the SSM ALB hostname parameter matches the actual ALB.

        Reads the main ingress status to get the current ALB hostname,
        compares it to the SSM parameter, and updates SSM if stale.
        This makes the system self-healing when the ALB changes
        (e.g., due to IngressClassParams group merges).

        Runs at most once every 5 minutes to avoid excessive API calls.
        Failures are logged as warnings and never raised.
        """
        now = datetime.now()
        if (
            self._last_alb_sync
            and (now - self._last_alb_sync).total_seconds() < self._alb_sync_interval
        ):
            return

        # Stamped before the attempt, so failed attempts are rate-limited too.
        self._last_alb_sync = now

        try:
            # Read the main ingress to get the current ALB hostname
            ingress = self.networking_v1.read_namespaced_ingress(
                "gco-ingress",
                "gco-system",
                _request_timeout=self._k8s_timeout,
            )
            lb_ingress = ingress.status.load_balancer.ingress
            if not lb_ingress:
                return

            current_hostname = lb_ingress[0].hostname
            if not current_hostname:
                return

            # Compare with SSM parameter
            from gco.services.aws_ssm import (
                get_ssm_parameter_optional,
                put_ssm_parameter,
            )

            global_region = os.environ.get("GLOBAL_REGION", "us-east-2")
            project_name = os.environ.get("PROJECT_NAME", "gco")
            param_name = f"/{project_name}/alb-hostname-{self.region}"

            stored_hostname = get_ssm_parameter_optional(param_name, region=global_region)

            if stored_hostname != current_hostname:
                logger.warning(
                    "ALB hostname mismatch: SSM=%s, actual=%s. Updating SSM.",
                    stored_hostname,
                    current_hostname,
                )
                put_ssm_parameter(
                    param_name,
                    current_hostname,
                    region=global_region,
                    parameter_type="String",
                    overwrite=True,
                )
                logger.info("Updated SSM parameter %s to %s", param_name, current_hostname)

        except Exception as e:
            logger.warning("ALB sync check failed (non-fatal): %s", e)
def create_health_monitor_from_env() -> HealthMonitor:
    """
    Build a HealthMonitor configured from environment variables.

    CLUSTER_NAME and REGION identify the cluster (fallbacks "unknown-cluster"
    and "unknown-region"). Every threshold is read with int(); unset variables
    use the defaults below. A non-integer value raises ValueError.
    """
    cluster_id = os.getenv("CLUSTER_NAME", "unknown-cluster")
    region = os.getenv("REGION", "unknown-region")

    # ResourceThresholds field -> (environment variable, default). Defaults match cdk.json.
    threshold_sources = {
        "cpu_threshold": ("CPU_THRESHOLD", "60"),
        "memory_threshold": ("MEMORY_THRESHOLD", "60"),
        "gpu_threshold": ("GPU_THRESHOLD", "60"),
        "pending_pods_threshold": ("PENDING_PODS_THRESHOLD", "10"),
        "pending_requested_cpu_vcpus": ("PENDING_REQUESTED_CPU_VCPUS", "100"),
        "pending_requested_memory_gb": ("PENDING_REQUESTED_MEMORY_GB", "200"),
        "pending_requested_gpus": ("PENDING_REQUESTED_GPUS", "8"),
    }
    threshold_values = {
        field: int(os.getenv(env_name, fallback))
        for field, (env_name, fallback) in threshold_sources.items()
    }

    return HealthMonitor(cluster_id, region, ResourceThresholds(**threshold_values))
def _print_health_report(health_status: "HealthStatus") -> None:
    """Print a human-readable snapshot of one HealthStatus to stdout."""
    utilization = health_status.resource_utilization
    print(f"Health Status: {health_status.status}")
    print(f"CPU: {utilization.cpu:.1f}%")
    print(f"Memory: {utilization.memory:.1f}%")
    print(f"GPU: {utilization.gpu:.1f}%")
    print(f"Active Jobs: {health_status.active_jobs}")
    print(f"Pending Pods: {health_status.pending_pods}")

    pending = health_status.pending_requested
    if pending:
        print(f"Pending Requested CPU: {pending.cpu_vcpus:.1f} vCPUs")
        print(f"Pending Requested Memory: {pending.memory_gb:.1f} GB")

    if health_status.message:
        print(f"Message: {health_status.message}")


async def main() -> None:
    """
    Run the health-monitor loop alongside the webhook dispatcher.

    The dispatcher is started first. The loop then prints a status snapshot
    every 30 seconds, or retries after 10 seconds following an unexpected error.
    It runs until interrupted, and the dispatcher is always stopped on exit.
    """
    from gco.services.webhook_dispatcher import create_webhook_dispatcher_from_env

    monitor = create_health_monitor_from_env()

    # Enable structured JSON logging for CloudWatch Insights
    configure_structured_logging(
        service_name="health-monitor",
        cluster_id=monitor.cluster_id,
        region=monitor.region,
    )

    dispatcher = create_webhook_dispatcher_from_env()
    await dispatcher.start()
    logger.info("Webhook dispatcher started")

    try:
        while True:
            try:
                _print_health_report(await monitor.get_health_status())

                deliveries = dispatcher.get_metrics()
                print(
                    f"Webhook Deliveries: {deliveries['deliveries_total']} "
                    f"(success={deliveries['deliveries_success']}, "
                    f"failed={deliveries['deliveries_failed']})"
                )
                print("-" * 50)

                await asyncio.sleep(30)  # Check every 30 seconds
            except KeyboardInterrupt:
                raise
            except Exception as e:
                logger.error(f"Error in main loop: {e}")
                await asyncio.sleep(10)
    except KeyboardInterrupt:
        logger.info("Health monitor stopped by user")
    finally:
        await dispatcher.stop()
        logger.info("Webhook dispatcher stopped")


if __name__ == "__main__":
    asyncio.run(main())