Coverage for gco/services/health_monitor.py: 97%

322 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-15 15:07 +0000

1""" 

2Health Monitor Service for GCO (Global Capacity Orchestrator on AWS). 

3 

4This service monitors Kubernetes cluster resource utilization and reports 

5health status for load balancer health checks and monitoring dashboards. 

6 

7Key Features: 

8- Collects CPU, memory, and GPU utilization metrics from Kubernetes Metrics Server 

9- Compares utilization against configurable thresholds 

10- Reports health status (healthy/unhealthy) based on threshold violations 

11- Caches metrics to reduce API calls to Kubernetes 

12 

13Environment Variables: 

14 CLUSTER_NAME: Name of the EKS cluster being monitored 

15 REGION: AWS region of the cluster 

16 CPU_THRESHOLD: CPU utilization threshold percentage (default: 60, -1 to disable) 

17 MEMORY_THRESHOLD: Memory utilization threshold percentage (default: 60, -1 to disable) 

18 GPU_THRESHOLD: GPU utilization threshold percentage (default: 60, -1 to disable) 

19 

20Usage: 

21 health_monitor = create_health_monitor_from_env() 

22 status = await health_monitor.get_health_status() 

23""" 

24 

25import asyncio 

26import logging 

27import os 

28from datetime import datetime 

29from typing import Any, Literal 

30 

31from kubernetes import client, config 

32from kubernetes.client.rest import ApiException 

33 

34from gco.models import HealthStatus, RequestedResources, ResourceThresholds, ResourceUtilization 

35from gco.services.structured_logging import configure_structured_logging 

36 

# Process-wide logging defaults: INFO level, timestamped records that carry
# the emitting logger's name and the severity ahead of the message text.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Module-level logger shared by everything defined in this file.
logger = logging.getLogger(__name__)

41 

42 

class HealthMonitor:
    """
    Monitors Kubernetes cluster resource utilization and determines health status.

    The monitor reads node usage from the ``metrics.k8s.io`` API, node
    allocatable capacity and pod state from the core API, and compares the
    derived figures against the configured ``ResourceThresholds``.  Node
    metrics are cached for ``_cache_duration`` seconds, and the ALB hostname
    sync runs at most once per ``_alb_sync_interval`` seconds.
    """

    # Namespaces whose pods are ignored when counting workloads.
    _SYSTEM_NAMESPACES = ("kube-system", "kube-public", "kube-node-lease")

    # Kubernetes quantity suffixes mapped to byte multipliers.  Two-letter
    # binary suffixes come first so the lookup order is unambiguous.
    _MEMORY_UNITS = (
        ("Ki", 1024),
        ("Mi", 1024**2),
        ("Gi", 1024**3),
        ("Ti", 1024**4),
        ("Pi", 1024**5),
        ("Ei", 1024**6),
        ("k", 1000),
        ("M", 1000**2),
        ("G", 1000**3),
        ("T", 1000**4),
        ("P", 1000**5),
        ("E", 1000**6),
    )

    # CPU quantity suffixes mapped to "how many of this unit make one millicore".
    _CPU_UNITS = (("n", 1_000_000), ("u", 1_000), ("m", 1))

    def __init__(self, cluster_id: str, region: str, thresholds: ResourceThresholds):
        """
        Load Kubernetes configuration and create the API clients.

        Args:
            cluster_id: Identifier reported in every ``HealthStatus``.
            region: Region reported in every ``HealthStatus``; also used to
                build the SSM parameter name in ``sync_alb_registration``.
            thresholds: Limits the collected metrics are compared against.

        Raises:
            config.ConfigException: If neither the in-cluster configuration
                nor a local kubeconfig can be loaded.
        """
        self.cluster_id = cluster_id
        self.region = region
        self.thresholds = thresholds

        # Initialize Kubernetes clients
        try:
            # Try to load in-cluster config first (when running in pod)
            config.load_incluster_config()
            logger.info("Loaded in-cluster Kubernetes configuration")
        except config.ConfigException:
            try:
                # Fall back to local kubeconfig (for development)
                config.load_kube_config()
                logger.info("Loaded local Kubernetes configuration")
            except config.ConfigException as e:
                logger.error(f"Failed to load Kubernetes configuration: {e}")
                raise

        self.core_v1 = client.CoreV1Api()
        self.networking_v1 = client.NetworkingV1Api()
        self.metrics_v1beta1 = client.CustomObjectsApi()

        # Timeout for Kubernetes API calls (seconds)
        self._k8s_timeout = int(os.environ.get("K8S_API_TIMEOUT", "30"))

        # Cache for metrics
        self._last_metrics_time: datetime | None = None
        self._cached_metrics: dict[str, Any] | None = None
        self._cache_duration = 30  # seconds

        # ALB hostname sync
        self._last_alb_sync: datetime | None = None
        self._alb_sync_interval = 300  # 5 minutes

    async def get_cluster_metrics(self) -> tuple[ResourceUtilization, int, int, RequestedResources]:
        """
        Get current cluster resource utilization metrics.

        Returns:
            A tuple ``(ResourceUtilization, active_jobs_count,
            pending_pods_count, pending_requested_resources)``.

        Raises:
            Exception: Any failure raised while collecting the metrics is
                logged and re-raised unchanged.
        """
        try:
            # Get node metrics from metrics server
            node_metrics = await self._get_node_metrics()

            # Get pod metrics for active jobs count and pending pods
            active_jobs, pending_pods = await self._get_pod_counts()

            # Calculate cluster-wide utilization
            cpu_utilization = self._calculate_cpu_utilization(node_metrics)
            memory_utilization = self._calculate_memory_utilization(node_metrics)
            gpu_utilization = await self._calculate_gpu_utilization()

            # Calculate resources requested by pending pods
            pending_requested = await self._calculate_pending_requested_resources()

            resource_utilization = ResourceUtilization(
                cpu=cpu_utilization, memory=memory_utilization, gpu=gpu_utilization
            )

            logger.info(
                f"Cluster metrics - CPU: {cpu_utilization:.1f}%, "
                f"Memory: {memory_utilization:.1f}%, GPU: {gpu_utilization:.1f}%, "
                f"Active Jobs: {active_jobs}, Pending Pods: {pending_pods}, "
                f"Pending Requested CPU: {pending_requested.cpu_vcpus:.1f} vCPUs, "
                f"Pending Requested Memory: {pending_requested.memory_gb:.1f} GB"
            )

            return resource_utilization, active_jobs, pending_pods, pending_requested

        except Exception as e:
            logger.error(f"Failed to get cluster metrics: {e}")
            # Re-raise so get_health_status returns "unhealthy" instead of
            # silently reporting 0% utilization (which looks healthy to GA).
            raise

    async def _get_node_metrics(self) -> dict[str, Any]:
        """
        Get node metrics from the Kubernetes metrics server, using the cache.

        Returns:
            The cached response when it is younger than ``_cache_duration``
            seconds, otherwise a freshly fetched ``metrics.k8s.io/v1beta1``
            ``nodes`` listing (which then replaces the cache).

        Raises:
            ApiException: If the metrics API call fails.  The cache is
                cleared before the exception is re-raised.
        """
        try:
            # Check cache first
            now = datetime.now()
            if (
                self._cached_metrics
                and self._last_metrics_time
                # total_seconds() rather than .seconds: the latter only holds
                # the sub-day remainder, so an entry older than 24 hours could
                # otherwise look fresh again.
                and (now - self._last_metrics_time).total_seconds() < self._cache_duration
            ):
                return self._cached_metrics

            # Fetch fresh metrics
            node_metrics: dict[str, Any] = self.metrics_v1beta1.list_cluster_custom_object(
                group="metrics.k8s.io",
                version="v1beta1",
                plural="nodes",
                _request_timeout=self._k8s_timeout,
            )

            # Update cache
            self._cached_metrics = node_metrics
            self._last_metrics_time = now

            return node_metrics

        except ApiException as e:
            logger.error(f"Failed to get node metrics: {e}")
            # Invalidate cache so stale data isn't used on next call
            self._cached_metrics = None
            self._last_metrics_time = None
            # Re-raise so get_cluster_metrics propagates the failure
            # to get_health_status, which returns "unhealthy"
            raise

    @staticmethod
    def _to_number(text: str) -> int | float:
        """
        Convert a numeric string to ``int`` when possible, else ``float``.

        Trying ``int`` first keeps large integer quantities exact; the
        ``float`` fallback accepts fractional and exponent notation.

        Raises:
            ValueError: If ``text`` is not a valid number.
        """
        try:
            return int(text)
        except ValueError:
            return float(text)

    def _parse_cpu_millicores(self, quantity: str) -> float:
        """
        Parse a Kubernetes CPU quantity string into millicores.

        Handles the ``n`` (nano), ``u`` (micro) and ``m`` (milli) suffixes as
        well as plain core counts, including fractional ones such as ``"0.5"``.

        Raises:
            ValueError: If the numeric part cannot be parsed.
        """
        text = quantity.strip()
        for suffix, units_per_millicore in self._CPU_UNITS:
            if text.endswith(suffix):
                return self._to_number(text[:-1]) / units_per_millicore
        # No suffix: the value is expressed in whole (or fractional) cores.
        return float(self._to_number(text) * 1000)

    def _calculate_cpu_utilization(self, node_metrics: dict[str, Any]) -> float:
        """
        Calculate cluster-wide CPU utilization percentage.

        Args:
            node_metrics: Node metrics listing; each entry under ``"items"``
                must provide ``["usage"]["cpu"]``.

        Returns:
            Total node CPU usage divided by total allocatable CPU, times 100.
            Returns ``0.0`` when total capacity is zero or when any error
            occurs (the error is logged, not raised).
        """
        total_cpu_usage = 0.0
        total_cpu_capacity = 0.0

        try:
            # Get node list for capacity information
            nodes = self.core_v1.list_node(_request_timeout=self._k8s_timeout)

            for node in nodes.items:
                total_cpu_capacity += self._parse_cpu_millicores(
                    node.status.allocatable.get("cpu", "0")
                )

            # Calculate usage from metrics
            for item in node_metrics.get("items", []):
                total_cpu_usage += self._parse_cpu_millicores(item["usage"]["cpu"])

            if total_cpu_capacity > 0:
                return (total_cpu_usage / total_cpu_capacity) * 100

        except Exception as e:
            logger.error(f"Error calculating CPU utilization: {e}")

        return 0.0

    def _calculate_memory_utilization(self, node_metrics: dict[str, Any]) -> float:
        """
        Calculate cluster-wide memory utilization percentage.

        Args:
            node_metrics: Node metrics listing; each entry under ``"items"``
                must provide ``["usage"]["memory"]``.

        Returns:
            Total node memory usage divided by total allocatable memory,
            times 100.  Returns ``0.0`` when total capacity is zero or when
            any error occurs (the error is logged, not raised).
        """
        total_memory_usage = 0
        total_memory_capacity = 0

        try:
            # Get node list for capacity information
            nodes = self.core_v1.list_node(_request_timeout=self._k8s_timeout)

            for node in nodes.items:
                total_memory_capacity += self._parse_memory_string(
                    node.status.allocatable.get("memory", "0")
                )

            # Calculate usage from metrics
            for item in node_metrics.get("items", []):
                total_memory_usage += self._parse_memory_string(item["usage"]["memory"])

            if total_memory_capacity > 0:
                return (total_memory_usage / total_memory_capacity) * 100

        except Exception as e:
            logger.error(f"Error calculating memory utilization: {e}")

        return 0.0

    def _parse_memory_string(self, memory_str: str) -> int:
        """
        Parse a Kubernetes memory quantity string to bytes.

        Supports the binary suffixes ``Ki``..``Ei``, the decimal suffixes
        ``k``..``E``, the milli suffix ``m``, plain byte counts, and
        fractional or exponent values such as ``"1.5Gi"`` or ``"1e3"``.

        Args:
            memory_str: Quantity string; an empty value yields ``0``.

        Returns:
            The quantity in bytes, truncated to an integer.

        Raises:
            ValueError: If the numeric part cannot be parsed.
        """
        if not memory_str:
            return 0

        text = memory_str.strip()

        for suffix, multiplier in self._MEMORY_UNITS:
            if text.endswith(suffix):
                return int(self._to_number(text[: -len(suffix)]) * multiplier)

        # Milli-bytes: thousandths of a byte.
        if text.endswith("m"):
            return int(self._to_number(text[:-1]) / 1000)

        return int(self._to_number(text))

    async def _calculate_gpu_utilization(self) -> float:
        """
        Calculate cluster-wide GPU utilization percentage.

        The figure is the sum of ``nvidia.com/gpu`` requests of containers in
        ``Running`` pods divided by the sum of allocatable ``nvidia.com/gpu``
        across nodes, times 100.

        Returns:
            The percentage, or ``0.0`` when there is no GPU capacity or when
            any error occurs (the error is logged, not raised).
        """
        try:
            # Get pods with GPU requests
            pods = self.core_v1.list_pod_for_all_namespaces(
                _request_timeout=self._k8s_timeout,
            )

            total_gpu_requested = 0
            total_gpu_capacity = 0

            # Get node GPU capacity
            nodes = self.core_v1.list_node(_request_timeout=self._k8s_timeout)
            for node in nodes.items:
                total_gpu_capacity += int(node.status.allocatable.get("nvidia.com/gpu", "0"))

            # Calculate GPU requests from running pods
            for pod in pods.items:
                if pod.status.phase != "Running":
                    continue
                for container in pod.spec.containers:
                    if container.resources and container.resources.requests:
                        total_gpu_requested += int(
                            container.resources.requests.get("nvidia.com/gpu", "0")
                        )

            if total_gpu_capacity > 0:
                return (total_gpu_requested / total_gpu_capacity) * 100

        except Exception as e:
            logger.error(f"Error calculating GPU utilization: {e}")

        return 0.0

    async def _get_active_jobs_count(self) -> int:
        """
        Get count of active jobs in the cluster.

        Returns:
            The number of ``Running`` pods outside the system namespaces, or
            ``0`` when the pod listing fails (the error is logged).
        """
        try:
            # Count running pods (excluding system pods)
            pods = self.core_v1.list_pod_for_all_namespaces(
                _request_timeout=self._k8s_timeout,
            )
            active_jobs = 0

            for pod in pods.items:
                # Skip system namespaces
                if pod.metadata.namespace in self._SYSTEM_NAMESPACES:
                    continue

                # Count running pods as active jobs
                if pod.status.phase == "Running":
                    active_jobs += 1

            return active_jobs

        except Exception as e:
            logger.error(f"Error getting active jobs count: {e}")
            return 0

    async def _get_pod_counts(self) -> tuple[int, int]:
        """
        Get count of active jobs and pending pods in the cluster.

        Returns:
            ``(running_pods, pending_pods)`` counted over pods outside the
            system namespaces, or ``(0, 0)`` when the pod listing fails
            (the error is logged).
        """
        try:
            pods = self.core_v1.list_pod_for_all_namespaces(
                _request_timeout=self._k8s_timeout,
            )
            active_jobs = 0
            pending_pods = 0

            for pod in pods.items:
                # Skip system namespaces
                if pod.metadata.namespace in self._SYSTEM_NAMESPACES:
                    continue

                if pod.status.phase == "Running":
                    active_jobs += 1
                elif pod.status.phase == "Pending":
                    pending_pods += 1

            return active_jobs, pending_pods

        except Exception as e:
            logger.error(f"Error getting pod counts: {e}")
            return 0, 0

    async def _calculate_pending_requested_resources(self) -> RequestedResources:
        """
        Calculate total resources requested by pending pods.

        Sums the ``cpu``, ``memory`` and ``nvidia.com/gpu`` requests of every
        container in ``Pending`` pods outside the system namespaces.

        Returns:
            ``RequestedResources`` with CPU in vCPUs, memory in GiB-sized
            units (bytes / 1024**3) and a GPU count; all zeros when any error
            occurs (the error is logged, not raised).
        """
        try:
            pods = self.core_v1.list_pod_for_all_namespaces(
                _request_timeout=self._k8s_timeout,
            )
            total_cpu_millicores = 0.0
            total_memory_bytes = 0
            total_gpus = 0

            for pod in pods.items:
                # Skip system namespaces
                if pod.metadata.namespace in self._SYSTEM_NAMESPACES:
                    continue

                # Only count pending pods
                if pod.status.phase != "Pending":
                    continue

                for container in pod.spec.containers:
                    if not (container.resources and container.resources.requests):
                        continue
                    requests = container.resources.requests

                    # CPU
                    total_cpu_millicores += self._parse_cpu_millicores(requests.get("cpu", "0"))

                    # Memory
                    total_memory_bytes += self._parse_memory_string(requests.get("memory", "0"))

                    # GPUs
                    total_gpus += int(requests.get("nvidia.com/gpu", "0"))

            # Convert to vCPUs and GB
            cpu_vcpus = total_cpu_millicores / 1000
            memory_gb = total_memory_bytes / (1024 * 1024 * 1024)

            return RequestedResources(cpu_vcpus=cpu_vcpus, memory_gb=memory_gb, gpus=total_gpus)

        except Exception as e:
            logger.error(f"Error calculating pending requested resources: {e}")
            return RequestedResources(cpu_vcpus=0.0, memory_gb=0.0, gpus=0)

    def _threshold_checks(
        self,
        resource_utilization: ResourceUtilization,
        pending_pods: int,
        pending_requested: RequestedResources,
    ) -> list[tuple[str, Any, Any, str]]:
        """
        List every threshold check as ``(threshold_name, value, limit, template)``.

        ``template`` is a ``str.format`` pattern taking ``value`` and
        ``limit`` that renders the violation text for that check.
        """
        limits = self.thresholds
        return [
            (
                "cpu_threshold",
                resource_utilization.cpu,
                limits.cpu_threshold,
                "CPU: {value:.1f}% > {limit}%",
            ),
            (
                "memory_threshold",
                resource_utilization.memory,
                limits.memory_threshold,
                "Memory: {value:.1f}% > {limit}%",
            ),
            (
                "gpu_threshold",
                resource_utilization.gpu,
                limits.gpu_threshold,
                "GPU: {value:.1f}% > {limit}%",
            ),
            (
                "pending_pods_threshold",
                pending_pods,
                limits.pending_pods_threshold,
                "Pending Pods: {value} > {limit}",
            ),
            (
                "pending_requested_cpu_vcpus",
                pending_requested.cpu_vcpus,
                limits.pending_requested_cpu_vcpus,
                "Pending CPU: {value:.1f} vCPUs > {limit} vCPUs",
            ),
            (
                "pending_requested_memory_gb",
                pending_requested.memory_gb,
                limits.pending_requested_memory_gb,
                "Pending Memory: {value:.1f} GB > {limit} GB",
            ),
            (
                "pending_requested_gpus",
                pending_requested.gpus,
                limits.pending_requested_gpus,
                "Pending GPUs: {value} > {limit}",
            ),
        ]

    async def get_health_status(self) -> HealthStatus:
        """
        Get current health status of the cluster.

        Returns:
            A ``HealthStatus`` that is ``"healthy"`` when every enabled
            threshold is respected, otherwise ``"unhealthy"`` with a message
            listing the violated thresholds.  If collecting metrics or
            building the status raises, an ``"unhealthy"`` status with zeroed
            figures and the error text is returned instead of raising.
        """
        try:
            # Get current metrics
            (
                resource_utilization,
                active_jobs,
                pending_pods,
                pending_requested,
            ) = await self.get_cluster_metrics()

            # Determine health status based on thresholds
            # A threshold of -1 means that check is disabled
            enabled_checks = [
                (value, limit, template)
                for name, value, limit, template in self._threshold_checks(
                    resource_utilization, pending_pods, pending_requested
                )
                if not self.thresholds.is_disabled(name)
            ]
            is_healthy = all(value <= limit for value, limit, _ in enabled_checks)

            status: Literal["healthy", "unhealthy"] = "healthy" if is_healthy else "unhealthy"

            # Generate status message
            message = None
            if not is_healthy:
                violations = [
                    template.format(value=value, limit=limit)
                    for value, limit, template in enabled_checks
                    if value > limit
                ]
                message = f"Threshold violations: {', '.join(violations)}"

            health_status = HealthStatus(
                cluster_id=self.cluster_id,
                region=self.region,
                timestamp=datetime.now(),
                status=status,
                resource_utilization=resource_utilization,
                thresholds=self.thresholds,
                active_jobs=active_jobs,
                pending_pods=pending_pods,
                pending_requested=pending_requested,
                message=message,
            )

            logger.info(f"Health status: {status} - {message or 'All thresholds within limits'}")
            return health_status

        except Exception as e:
            logger.error(f"Error getting health status: {e}")
            # Return unhealthy status on error
            return HealthStatus(
                cluster_id=self.cluster_id,
                region=self.region,
                timestamp=datetime.now(),
                status="unhealthy",
                resource_utilization=ResourceUtilization(cpu=0.0, memory=0.0, gpu=0.0),
                thresholds=self.thresholds,
                active_jobs=0,
                pending_pods=0,
                pending_requested=RequestedResources(cpu_vcpus=0.0, memory_gb=0.0, gpus=0),
                message=f"Health check error: {e!s}",
            )

    async def sync_alb_registration(self) -> None:
        """Ensure the SSM ALB hostname parameter matches the actual ALB.

        Reads the main ingress status to get the current ALB hostname,
        compares it to the SSM parameter, and updates SSM if stale.
        This makes the system self-healing when the ALB changes
        (e.g., due to IngressClassParams group merges).

        Runs at most once every 5 minutes to avoid excessive API calls.
        The timestamp is recorded before the attempt, so a failed attempt is
        also not retried until the interval has elapsed.  Failures are logged
        as warnings and never raised.
        """
        now = datetime.now()
        if (
            self._last_alb_sync
            and (now - self._last_alb_sync).total_seconds() < self._alb_sync_interval
        ):
            return

        self._last_alb_sync = now

        try:
            # Read the main ingress to get the current ALB hostname
            ingress = self.networking_v1.read_namespaced_ingress(
                "gco-ingress",
                "gco-system",
                _request_timeout=self._k8s_timeout,
            )
            lb_ingress = ingress.status.load_balancer.ingress
            if not lb_ingress:
                return

            current_hostname = lb_ingress[0].hostname
            if not current_hostname:
                return

            # Compare with SSM parameter.  The SSM helpers are imported here
            # rather than at module import time.
            from gco.services.aws_ssm import (
                get_ssm_parameter_optional,
                put_ssm_parameter,
            )

            global_region = os.environ.get("GLOBAL_REGION", "us-east-2")
            project_name = os.environ.get("PROJECT_NAME", "gco")
            param_name = f"/{project_name}/alb-hostname-{self.region}"

            stored_hostname = get_ssm_parameter_optional(param_name, region=global_region)

            if stored_hostname != current_hostname:
                logger.warning(
                    "ALB hostname mismatch: SSM=%s, actual=%s. Updating SSM.",
                    stored_hostname,
                    current_hostname,
                )
                put_ssm_parameter(
                    param_name,
                    current_hostname,
                    region=global_region,
                    parameter_type="String",
                    overwrite=True,
                )
                logger.info("Updated SSM parameter %s to %s", param_name, current_hostname)

        except Exception as e:
            logger.warning("ALB sync check failed (non-fatal): %s", e)

585 

586 

def create_health_monitor_from_env() -> HealthMonitor:
    """
    Build a HealthMonitor configured entirely from environment variables.

    ``CLUSTER_NAME`` and ``REGION`` identify the cluster (falling back to
    ``"unknown-cluster"`` / ``"unknown-region"``).  Each threshold variable is
    parsed with ``int``; an unset variable uses the fallback shown below.

    Raises:
        ValueError: If a threshold variable is set to a non-integer string.
    """

    def _int_from_env(variable: str, fallback: str) -> int:
        # Read one integer setting, substituting the fallback when unset.
        return int(os.getenv(variable, fallback))

    name_of_cluster = os.getenv("CLUSTER_NAME", "unknown-cluster")
    aws_region = os.getenv("REGION", "unknown-region")

    # Load thresholds from environment (defaults match cdk.json)
    limits = ResourceThresholds(
        cpu_threshold=_int_from_env("CPU_THRESHOLD", "60"),
        memory_threshold=_int_from_env("MEMORY_THRESHOLD", "60"),
        gpu_threshold=_int_from_env("GPU_THRESHOLD", "60"),
        pending_pods_threshold=_int_from_env("PENDING_PODS_THRESHOLD", "10"),
        pending_requested_cpu_vcpus=_int_from_env("PENDING_REQUESTED_CPU_VCPUS", "100"),
        pending_requested_memory_gb=_int_from_env("PENDING_REQUESTED_MEMORY_GB", "200"),
        pending_requested_gpus=_int_from_env("PENDING_REQUESTED_GPUS", "8"),
    )

    return HealthMonitor(name_of_cluster, aws_region, limits)

614 

615 

async def main() -> None:
    """
    Run the health monitor together with the webhook dispatcher.

    Starts the webhook dispatcher, then loops forever: fetch the health
    status, print it along with the dispatcher's delivery counters, and sleep
    30 seconds.  An error inside one iteration is logged and followed by a
    10 second pause before the next attempt.  The dispatcher is always
    stopped on the way out.
    """
    from gco.services.webhook_dispatcher import create_webhook_dispatcher_from_env

    monitor = create_health_monitor_from_env()

    # Enable structured JSON logging for CloudWatch Insights
    configure_structured_logging(
        service_name="health-monitor",
        cluster_id=monitor.cluster_id,
        region=monitor.region,
    )

    dispatcher = create_webhook_dispatcher_from_env()

    # Bring the dispatcher up before entering the polling loop.
    await dispatcher.start()
    logger.info("Webhook dispatcher started")

    try:
        while True:
            try:
                snapshot = await monitor.get_health_status()

                # Utilization and workload summary.
                print(f"Health Status: {snapshot.status}")
                print(f"CPU: {snapshot.resource_utilization.cpu:.1f}%")
                print(f"Memory: {snapshot.resource_utilization.memory:.1f}%")
                print(f"GPU: {snapshot.resource_utilization.gpu:.1f}%")
                print(f"Active Jobs: {snapshot.active_jobs}")
                print(f"Pending Pods: {snapshot.pending_pods}")

                # Demand from pending pods, when the status carries it.
                if snapshot.pending_requested:
                    print(
                        f"Pending Requested CPU: {snapshot.pending_requested.cpu_vcpus:.1f} vCPUs"
                    )
                    print(
                        f"Pending Requested Memory: {snapshot.pending_requested.memory_gb:.1f} GB"
                    )

                if snapshot.message:
                    print(f"Message: {snapshot.message}")

                # Print webhook dispatcher metrics
                delivery_stats = dispatcher.get_metrics()
                print(
                    f"Webhook Deliveries: {delivery_stats['deliveries_total']} "
                    f"(success={delivery_stats['deliveries_success']}, "
                    f"failed={delivery_stats['deliveries_failed']})"
                )
                print("-" * 50)

                # Check every 30 seconds
                await asyncio.sleep(30)

            except KeyboardInterrupt:
                raise
            except Exception as e:
                logger.error(f"Error in main loop: {e}")
                # Shorter pause after a failed iteration before retrying.
                await asyncio.sleep(10)

    except KeyboardInterrupt:
        logger.info("Health monitor stopped by user")
    finally:
        await dispatcher.stop()
        logger.info("Webhook dispatcher stopped")

682 

683 

# Script entry point: run the async monitoring loop until it exits.
if __name__ == "__main__":
    asyncio.run(main())