Coverage for gco/services/health_monitor.py: 96%

328 statements  

coverage.py v7.13.5, created at 2026-04-30 21:47 +0000

1""" 

2Health Monitor Service for GCO (Global Capacity Orchestrator on AWS). 

3 

4This service monitors Kubernetes cluster resource utilization and reports 

5health status for load balancer health checks and monitoring dashboards. 

6 

7Key Features: 

8- Collects CPU, memory, and GPU utilization metrics from Kubernetes Metrics Server 

9- Compares utilization against configurable thresholds 

10- Reports health status (healthy/unhealthy) based on threshold violations 

11- Caches metrics to reduce API calls to Kubernetes 

12 

13Environment Variables: 

14 CLUSTER_NAME: Name of the EKS cluster being monitored 

15 REGION: AWS region of the cluster 

16 CPU_THRESHOLD: CPU utilization threshold percentage (default: 60, -1 to disable) 

17 MEMORY_THRESHOLD: Memory utilization threshold percentage (default: 60, -1 to disable) 

18 GPU_THRESHOLD: GPU utilization threshold percentage (default: 60, -1 to disable) 

19 

20Usage: 

21 health_monitor = create_health_monitor_from_env() 

22 status = await health_monitor.get_health_status() 

23""" 

24 

25import asyncio 

26import logging 

27import os 

28from datetime import datetime 

29from typing import Any, Literal 

30 

31from kubernetes import client, config 

32from kubernetes.client.rest import ApiException 

33 

34from gco.models import HealthStatus, RequestedResources, ResourceThresholds, ResourceUtilization 

35from gco.services.structured_logging import configure_structured_logging 

36 

37logging.basicConfig( 

38 level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" 

39) 

40logger = logging.getLogger(__name__) 

41 

42 

43class HealthMonitor: 

44 """ 

45 Monitors Kubernetes cluster resource utilization and determines health status 

46 """ 

47 

48 def __init__(self, cluster_id: str, region: str, thresholds: ResourceThresholds): 

49 self.cluster_id = cluster_id 

50 self.region = region 

51 self.thresholds = thresholds 

52 

53 # Initialize Kubernetes clients 

54 try: 

55 # Try to load in-cluster config first (when running in pod) 

56 config.load_incluster_config() 

57 logger.info("Loaded in-cluster Kubernetes configuration") 

58 except config.ConfigException: 

59 try: 

60 # Fall back to local kubeconfig (for development) 

61 config.load_kube_config() 

62 logger.info("Loaded local Kubernetes configuration") 

63 except config.ConfigException as e: 

64 logger.error(f"Failed to load Kubernetes configuration: {e}") 

65 raise 

66 

67 self.core_v1 = client.CoreV1Api() 

68 self.networking_v1 = client.NetworkingV1Api() 

69 self.metrics_v1beta1 = client.CustomObjectsApi() 

70 

71 # Timeout for Kubernetes API calls (seconds) 

72 self._k8s_timeout = int(os.environ.get("K8S_API_TIMEOUT", "30")) 

73 

74 # Cache for metrics 

75 self._last_metrics_time: datetime | None = None 

76 self._cached_metrics: dict[str, Any] | None = None 

77 self._cache_duration = 30 # seconds 

78 

79 # ALB hostname sync 

80 self._last_alb_sync: datetime | None = None 

81 self._alb_sync_interval = 300 # 5 minutes 

82 

83 async def get_cluster_metrics(self) -> tuple[ResourceUtilization, int, int, RequestedResources]: 

84 """ 

85 Get current cluster resource utilization metrics 

86 Returns: (ResourceUtilization, active_jobs_count, pending_pods_count, pending_requested_resources) 

87 """ 

88 try: 

89 # Get node metrics from metrics server 

90 node_metrics = await self._get_node_metrics() 

91 

92 # Get pod metrics for active jobs count and pending pods 

93 active_jobs, pending_pods = await self._get_pod_counts() 

94 

95 # Calculate cluster-wide utilization 

96 cpu_utilization = self._calculate_cpu_utilization(node_metrics) 

97 memory_utilization = self._calculate_memory_utilization(node_metrics) 

98 gpu_utilization = await self._calculate_gpu_utilization() 

99 

100 # Calculate resources requested by pending pods 

101 pending_requested = await self._calculate_pending_requested_resources() 

102 

103 resource_utilization = ResourceUtilization( 

104 cpu=cpu_utilization, memory=memory_utilization, gpu=gpu_utilization 

105 ) 

106 

107 logger.info( 

108 f"Cluster metrics - CPU: {cpu_utilization:.1f}%, " 

109 f"Memory: {memory_utilization:.1f}%, GPU: {gpu_utilization:.1f}%, " 

110 f"Active Jobs: {active_jobs}, Pending Pods: {pending_pods}, " 

111 f"Pending Requested CPU: {pending_requested.cpu_vcpus:.1f} vCPUs, " 

112 f"Pending Requested Memory: {pending_requested.memory_gb:.1f} GB" 

113 ) 

114 

115 return resource_utilization, active_jobs, pending_pods, pending_requested 

116 

117 except Exception as e: 

118 logger.error(f"Failed to get cluster metrics: {e}") 

119 # Re-raise so get_health_status returns "unhealthy" instead of 

120 # silently reporting 0% utilization (which looks healthy to GA). 

121 raise 

122 

123 async def _get_node_metrics(self) -> dict[str, Any]: 

124 """Get node metrics from Kubernetes metrics server""" 

125 try: 

126 # Check cache first 

127 now = datetime.now() 

128 if ( 

129 self._cached_metrics 

130 and self._last_metrics_time 

131 and (now - self._last_metrics_time).total_seconds() < self._cache_duration 

132 ): 

133 return self._cached_metrics 

134 

135 # Fetch fresh metrics 

136 node_metrics: dict[str, Any] = self.metrics_v1beta1.list_cluster_custom_object( 

137 group="metrics.k8s.io", 

138 version="v1beta1", 

139 plural="nodes", 

140 _request_timeout=self._k8s_timeout, 

141 ) 

142 

143 # Update cache 

144 self._cached_metrics = node_metrics 

145 self._last_metrics_time = now 

146 

147 return node_metrics 

148 

149 except ApiException as e: 

150 logger.error(f"Failed to get node metrics: {e}") 

151 # Invalidate cache so stale data isn't used on next call 

152 self._cached_metrics = None 

153 self._last_metrics_time = None 

154 # Re-raise so get_cluster_metrics propagates the failure 

155 # to get_health_status, which returns "unhealthy" 

156 raise 

157 
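# Illustrative note (not part of the original source): the metrics.k8s.io
# response cached above is assumed to have roughly this shape, based on how the
# calculation methods below index into it; the node name and usage values here
# are made up.
#
#     {
#         "items": [
#             {
#                 "metadata": {"name": "ip-10-0-1-23.ec2.internal"},
#                 "usage": {"cpu": "1250000000n", "memory": "3956172Ki"},
#             }
#         ]
#     }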

158 def _calculate_cpu_utilization(self, node_metrics: dict[str, Any]) -> float: 

159 """Calculate cluster-wide CPU utilization percentage""" 

160 total_cpu_usage = 0.0 

161 total_cpu_capacity = 0.0 

162 

163 try: 

164 # Get node list for capacity information 

165 nodes = self.core_v1.list_node(_request_timeout=self._k8s_timeout) 

166 node_capacities = {} 

167 

168 for node in nodes.items: 

169 node_name = node.metadata.name 

170 cpu_capacity = node.status.allocatable.get("cpu", "0") 

171 # Convert CPU capacity to millicores 

172 if cpu_capacity.endswith("m"): 

173 cpu_capacity_millicores = int(cpu_capacity[:-1]) 

174 else: 

175 cpu_capacity_millicores = int(cpu_capacity) * 1000 

176 node_capacities[node_name] = cpu_capacity_millicores 

177 total_cpu_capacity += cpu_capacity_millicores 

178 

179 # Calculate usage from metrics 

180 for item in node_metrics.get("items", []): 

181 node_name = item["metadata"]["name"] 

182 cpu_usage = item["usage"]["cpu"] 

183 

184 # Convert CPU usage to millicores 

185 if cpu_usage.endswith("n"): 

186 cpu_usage_millicores = int(cpu_usage[:-1]) / 1_000_000 

187 elif cpu_usage.endswith("u"): 

188 cpu_usage_millicores = int(cpu_usage[:-1]) / 1_000 

189 elif cpu_usage.endswith("m"): 

190 cpu_usage_millicores = int(cpu_usage[:-1]) 

191 else: 

192 cpu_usage_millicores = int(cpu_usage) * 1000 

193 

194 total_cpu_usage += cpu_usage_millicores 

195 

196 if total_cpu_capacity > 0: 

197 return (total_cpu_usage / total_cpu_capacity) * 100 

198 

199 except Exception as e: 

200 logger.error(f"Error calculating CPU utilization: {e}") 

201 

202 return 0.0 

203 
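# Worked example (illustrative values only): a node reporting usage
# "1250000000n" converts to 1_250_000_000 / 1_000_000 = 1250 millicores,
# i.e. 1.25 cores. Against an allocatable capacity of "4" cores
# (4000 millicores), that node alone would yield (1250 / 4000) * 100 = 31.25%.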

204 def _calculate_memory_utilization(self, node_metrics: dict[str, Any]) -> float: 

205 """Calculate cluster-wide memory utilization percentage""" 

206 total_memory_usage = 0 

207 total_memory_capacity = 0 

208 

209 try: 

210 # Get node list for capacity information 

211 nodes = self.core_v1.list_node(_request_timeout=self._k8s_timeout) 

212 

213 for node in nodes.items: 

214 memory_capacity = node.status.allocatable.get("memory", "0") 

215 # Convert memory capacity to bytes 

216 memory_capacity_bytes = self._parse_memory_string(memory_capacity) 

217 total_memory_capacity += memory_capacity_bytes 

218 

219 # Calculate usage from metrics 

220 for item in node_metrics.get("items", []): 

221 memory_usage = item["usage"]["memory"] 

222 memory_usage_bytes = self._parse_memory_string(memory_usage) 

223 total_memory_usage += memory_usage_bytes 

224 

225 if total_memory_capacity > 0: 

226 return (total_memory_usage / total_memory_capacity) * 100 

227 

228 except Exception as e: 

229 logger.error(f"Error calculating memory utilization: {e}") 

230 

231 return 0.0 

232 

233 def _parse_memory_string(self, memory_str: str) -> int: 

234 """Parse Kubernetes memory string to bytes""" 

235 if not memory_str: 

236 return 0 

237 

238 memory_str = memory_str.strip() 

239 

240 # Handle different units 

241 if memory_str.endswith("Ki"): 

242 return int(memory_str[:-2]) * 1024 

243 if memory_str.endswith("Mi"): 

244 return int(memory_str[:-2]) * 1024 * 1024 

245 if memory_str.endswith("Gi"): 

246 return int(memory_str[:-2]) * 1024 * 1024 * 1024 

247 if memory_str.endswith("Ti"): 

248 return int(memory_str[:-2]) * 1024 * 1024 * 1024 * 1024 

249 if memory_str.endswith("k"): 

250 return int(memory_str[:-1]) * 1000 

251 if memory_str.endswith("M"): 

252 return int(memory_str[:-1]) * 1000 * 1000 

253 if memory_str.endswith("G"): 

254 return int(memory_str[:-1]) * 1000 * 1000 * 1000 

255 return int(memory_str) 

256 
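# Illustrative conversions for the parser above (arbitrary sample inputs):
#
#     "512Mi"     -> 512 * 1024 * 1024   = 536_870_912 bytes
#     "2Gi"       -> 2 * 1024 ** 3       = 2_147_483_648 bytes
#     "1000k"     -> 1000 * 1000         = 1_000_000 bytes
#     "128974848" -> 128_974_848 bytes (no suffix, taken as plain bytes)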

257 async def _calculate_gpu_utilization(self) -> float: 

258 """Calculate cluster-wide GPU utilization percentage""" 

259 try: 

260 # Get pods with GPU requests 

261 pods = self.core_v1.list_pod_for_all_namespaces( 

262 _request_timeout=self._k8s_timeout, 

263 ) 

264 

265 total_gpu_requested = 0 

266 total_gpu_capacity = 0 

267 

268 # Get node GPU capacity 

269 nodes = self.core_v1.list_node(_request_timeout=self._k8s_timeout) 

270 for node in nodes.items: 

271 gpu_capacity = node.status.allocatable.get("nvidia.com/gpu", "0") 

272 total_gpu_capacity += int(gpu_capacity) 

273 

274 # Calculate GPU requests from running pods 

275 for pod in pods.items: 

276 if pod.status.phase == "Running": 

277 for container in pod.spec.containers: 

278 if container.resources and container.resources.requests: 

279 gpu_request = container.resources.requests.get("nvidia.com/gpu", "0") 

280 total_gpu_requested += int(gpu_request) 

281 

282 if total_gpu_capacity > 0: 

283 return (total_gpu_requested / total_gpu_capacity) * 100 

284 

285 except Exception as e: 

286 logger.error(f"Error calculating GPU utilization: {e}") 

287 

288 return 0.0 

289 
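# Illustrative example: GPU "utilization" here is allocation based rather than
# usage based. With 4 allocatable nvidia.com/gpu across the cluster and running
# pods requesting 3 of them, this method reports (3 / 4) * 100 = 75%.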

290 async def _get_active_jobs_count(self) -> int: 

291 """Get count of active jobs in the cluster""" 

292 try: 

293 # Count running pods (excluding system pods) 

294 pods = self.core_v1.list_pod_for_all_namespaces( 

295 _request_timeout=self._k8s_timeout, 

296 ) 

297 active_jobs = 0 

298 

299 for pod in pods.items: 

300 # Skip system namespaces 

301 if pod.metadata.namespace in ["kube-system", "kube-public", "kube-node-lease"]: 

302 continue 

303 

304 # Count running pods as active jobs 

305 if pod.status.phase == "Running": 

306 active_jobs += 1 

307 

308 return active_jobs 

309 

310 except Exception as e: 

311 logger.error(f"Error getting active jobs count: {e}") 

312 return 0 

313 

314 async def _get_pod_counts(self) -> tuple[int, int]: 

315 """Get count of active jobs and pending pods in the cluster""" 

316 try: 

317 pods = self.core_v1.list_pod_for_all_namespaces( 

318 _request_timeout=self._k8s_timeout, 

319 ) 

320 active_jobs = 0 

321 pending_pods = 0 

322 

323 for pod in pods.items: 

324 # Skip system namespaces 

325 if pod.metadata.namespace in ["kube-system", "kube-public", "kube-node-lease"]: 

326 continue 

327 

328 if pod.status.phase == "Running": 

329 active_jobs += 1 

330 elif pod.status.phase == "Pending":    330 ↛ 323  (line 330 didn't jump to line 323 because the condition on line 330 was always true)

331 pending_pods += 1 

332 

333 return active_jobs, pending_pods 

334 

335 except Exception as e: 

336 logger.error(f"Error getting pod counts: {e}") 

337 return 0, 0 

338 

339 async def _calculate_pending_requested_resources(self) -> RequestedResources: 

340 """Calculate total resources requested by pending pods""" 

341 try: 

342 pods = self.core_v1.list_pod_for_all_namespaces( 

343 _request_timeout=self._k8s_timeout, 

344 ) 

345 total_cpu_millicores = 0.0 

346 total_memory_bytes = 0 

347 total_gpus = 0 

348 

349 for pod in pods.items: 

350 # Skip system namespaces 

351 if pod.metadata.namespace in ["kube-system", "kube-public", "kube-node-lease"]: 

352 continue 

353 

354 # Only count pending pods 

355 if pod.status.phase != "Pending": 

356 continue 

357 

358 for container in pod.spec.containers: 

359 if container.resources and container.resources.requests:    359 ↛ 358  (line 359 didn't jump to line 358 because the condition on line 359 was always true)

360 # CPU 

361 cpu_request = container.resources.requests.get("cpu", "0") 

362 if cpu_request.endswith("m"): 

363 total_cpu_millicores += int(cpu_request[:-1]) 

364 elif cpu_request.endswith("n"): 

365 total_cpu_millicores += int(cpu_request[:-1]) / 1_000_000 

366 else: 

367 total_cpu_millicores += float(cpu_request) * 1000 

368 

369 # Memory 

370 memory_request = container.resources.requests.get("memory", "0") 

371 total_memory_bytes += self._parse_memory_string(memory_request) 

372 

373 # GPUs 

374 gpu_request = container.resources.requests.get("nvidia.com/gpu", "0") 

375 total_gpus += int(gpu_request) 

376 

377 # Convert to vCPUs and GB 

378 cpu_vcpus = total_cpu_millicores / 1000 

379 memory_gb = total_memory_bytes / (1024 * 1024 * 1024) 

380 

381 return RequestedResources(cpu_vcpus=cpu_vcpus, memory_gb=memory_gb, gpus=total_gpus) 

382 

383 except Exception as e: 

384 logger.error(f"Error calculating pending requested resources: {e}") 

385 return RequestedResources(cpu_vcpus=0.0, memory_gb=0.0, gpus=0) 

386 

387 async def get_health_status(self) -> HealthStatus: 

388 """ 

389 Get current health status of the cluster 

390 """ 

391 try: 

392 # Get current metrics 

393 ( 

394 resource_utilization, 

395 active_jobs, 

396 pending_pods, 

397 pending_requested, 

398 ) = await self.get_cluster_metrics() 

399 

400 # Determine health status based on thresholds 

401 # A threshold of -1 means that check is disabled 

402 is_healthy = True 

403 if not self.thresholds.is_disabled("cpu_threshold"):    403 ↛ 407  (line 403 didn't jump to line 407 because the condition on line 403 was always true)

404 is_healthy = ( 

405 is_healthy and resource_utilization.cpu <= self.thresholds.cpu_threshold 

406 ) 

407 if not self.thresholds.is_disabled("memory_threshold"):    407 ↛ 411  (line 407 didn't jump to line 411 because the condition on line 407 was always true)

408 is_healthy = ( 

409 is_healthy and resource_utilization.memory <= self.thresholds.memory_threshold 

410 ) 

411 if not self.thresholds.is_disabled("gpu_threshold"):    411 ↛ 415  (line 411 didn't jump to line 415 because the condition on line 411 was always true)

412 is_healthy = ( 

413 is_healthy and resource_utilization.gpu <= self.thresholds.gpu_threshold 

414 ) 

415 if not self.thresholds.is_disabled("pending_pods_threshold"):    415 ↛ 417  (line 415 didn't jump to line 417 because the condition on line 415 was always true)

416 is_healthy = is_healthy and pending_pods <= self.thresholds.pending_pods_threshold 

417 if not self.thresholds.is_disabled("pending_requested_cpu_vcpus"):    417 ↛ 422  (line 417 didn't jump to line 422 because the condition on line 417 was always true)

418 is_healthy = ( 

419 is_healthy 

420 and pending_requested.cpu_vcpus <= self.thresholds.pending_requested_cpu_vcpus 

421 ) 

422 if not self.thresholds.is_disabled("pending_requested_memory_gb"):    422 ↛ 427  (line 422 didn't jump to line 427 because the condition on line 422 was always true)

423 is_healthy = ( 

424 is_healthy 

425 and pending_requested.memory_gb <= self.thresholds.pending_requested_memory_gb 

426 ) 

427 if not self.thresholds.is_disabled("pending_requested_gpus"):    427 ↛ 432  (line 427 didn't jump to line 432 because the condition on line 427 was always true)

428 is_healthy = ( 

429 is_healthy and pending_requested.gpus <= self.thresholds.pending_requested_gpus 

430 ) 

431 

432 status: Literal["healthy", "unhealthy"] = "healthy" if is_healthy else "unhealthy" 

433 

434 # Generate status message 

435 message = None 

436 if not is_healthy: 

437 violations = [] 

438 if ( 

439 not self.thresholds.is_disabled("cpu_threshold") 

440 and resource_utilization.cpu > self.thresholds.cpu_threshold 

441 ): 

442 violations.append( 

443 f"CPU: {resource_utilization.cpu:.1f}% > {self.thresholds.cpu_threshold}%" 

444 ) 

445 if ( 

446 not self.thresholds.is_disabled("memory_threshold") 

447 and resource_utilization.memory > self.thresholds.memory_threshold 

448 ): 

449 violations.append( 

450 f"Memory: {resource_utilization.memory:.1f}% > {self.thresholds.memory_threshold}%" 

451 ) 

452 if ( 

453 not self.thresholds.is_disabled("gpu_threshold") 

454 and resource_utilization.gpu > self.thresholds.gpu_threshold 

455 ): 

456 violations.append( 

457 f"GPU: {resource_utilization.gpu:.1f}% > {self.thresholds.gpu_threshold}%" 

458 ) 

459 if ( 

460 not self.thresholds.is_disabled("pending_pods_threshold") 

461 and pending_pods > self.thresholds.pending_pods_threshold 

462 ): 

463 violations.append( 

464 f"Pending Pods: {pending_pods} > {self.thresholds.pending_pods_threshold}" 

465 ) 

466 if ( 

467 not self.thresholds.is_disabled("pending_requested_cpu_vcpus") 

468 and pending_requested.cpu_vcpus > self.thresholds.pending_requested_cpu_vcpus 

469 ): 

470 violations.append( 

471 f"Pending CPU: {pending_requested.cpu_vcpus:.1f} vCPUs > {self.thresholds.pending_requested_cpu_vcpus} vCPUs" 

472 ) 

473 if ( 

474 not self.thresholds.is_disabled("pending_requested_memory_gb") 

475 and pending_requested.memory_gb > self.thresholds.pending_requested_memory_gb 

476 ): 

477 violations.append( 

478 f"Pending Memory: {pending_requested.memory_gb:.1f} GB > {self.thresholds.pending_requested_memory_gb} GB" 

479 ) 

480 if ( 

481 not self.thresholds.is_disabled("pending_requested_gpus") 

482 and pending_requested.gpus > self.thresholds.pending_requested_gpus 

483 ): 

484 violations.append( 

485 f"Pending GPUs: {pending_requested.gpus} > {self.thresholds.pending_requested_gpus}" 

486 ) 

487 message = f"Threshold violations: {', '.join(violations)}" 

488 

489 health_status = HealthStatus( 

490 cluster_id=self.cluster_id, 

491 region=self.region, 

492 timestamp=datetime.now(), 

493 status=status, 

494 resource_utilization=resource_utilization, 

495 thresholds=self.thresholds, 

496 active_jobs=active_jobs, 

497 pending_pods=pending_pods, 

498 pending_requested=pending_requested, 

499 message=message, 

500 ) 

501 

502 logger.info(f"Health status: {status} - {message or 'All thresholds within limits'}") 

503 return health_status 

504 

505 except Exception as e: 

506 logger.error(f"Error getting health status: {e}") 

507 # Return unhealthy status on error 

508 return HealthStatus( 

509 cluster_id=self.cluster_id, 

510 region=self.region, 

511 timestamp=datetime.now(), 

512 status="unhealthy", 

513 resource_utilization=ResourceUtilization(cpu=0.0, memory=0.0, gpu=0.0), 

514 thresholds=self.thresholds, 

515 active_jobs=0, 

516 pending_pods=0, 

517 pending_requested=RequestedResources(cpu_vcpus=0.0, memory_gb=0.0, gpus=0), 

518 message=f"Health check error: {e!s}", 

519 ) 

520 
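# Illustrative example of the message format produced above (threshold and
# utilization values are made up): with cpu_threshold=60 and
# pending_pods_threshold=10, CPU at 85.2% and 12 pending pods produce
#
#     "Threshold violations: CPU: 85.2% > 60%, Pending Pods: 12 > 10"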

521 async def sync_alb_registration(self) -> None: 

522 """Ensure the SSM ALB hostname parameter matches the actual ALB. 

523 

524 Reads the main ingress status to get the current ALB hostname, 

525 compares it to the SSM parameter, and updates SSM if stale. 

526 This makes the system self-healing when the ALB changes 

527 (e.g., due to IngressClassParams group merges). 

528 

529 Runs at most once every 5 minutes to avoid excessive API calls. 

530 """ 

531 now = datetime.now() 

532 if ( 

533 self._last_alb_sync 

534 and (now - self._last_alb_sync).total_seconds() < self._alb_sync_interval 

535 ): 

536 return 

537 

538 self._last_alb_sync = now 

539 

540 try: 

541 # Read the main ingress to get the current ALB hostname 

542 ingress = self.networking_v1.read_namespaced_ingress( 

543 "gco-ingress", 

544 "gco-system", 

545 _request_timeout=self._k8s_timeout, 

546 ) 

547 lb_ingress = ingress.status.load_balancer.ingress 

548 if not lb_ingress: 

549 return 

550 

551 current_hostname = lb_ingress[0].hostname 

552 if not current_hostname:    552 ↛ 553  (line 552 didn't jump to line 553 because the condition on line 552 was never true)

553 return 

554 

555 # Compare with SSM parameter 

556 import os 

557 

558 import boto3 

559 

560 global_region = os.environ.get("GLOBAL_REGION", "us-east-2") 

561 project_name = os.environ.get("PROJECT_NAME", "gco") 

562 ssm = boto3.client("ssm", region_name=global_region) 

563 param_name = f"/{project_name}/alb-hostname-{self.region}" 

564 

565 try: 

566 resp = ssm.get_parameter(Name=param_name) 

567 stored_hostname = resp["Parameter"]["Value"] 

568 except ssm.exceptions.ParameterNotFound: 

569 stored_hostname = None 

570 

571 if stored_hostname != current_hostname: 

572 logger.warning( 

573 "ALB hostname mismatch: SSM=%s, actual=%s. Updating SSM.", 

574 stored_hostname, 

575 current_hostname, 

576 ) 

577 ssm.put_parameter( 

578 Name=param_name, 

579 Value=current_hostname, 

580 Type="String", 

581 Overwrite=True, 

582 ) 

583 logger.info("Updated SSM parameter %s to %s", param_name, current_hostname) 

584 

585 except Exception as e: 

586 logger.warning("ALB sync check failed (non-fatal): %s", e) 

587 
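# Illustrative example: with the default PROJECT_NAME of "gco" and a cluster
# whose region is, say, us-west-2 (a made-up value for self.region), the SSM
# parameter reconciled above is named "/gco/alb-hostname-us-west-2".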

588 

589def create_health_monitor_from_env() -> HealthMonitor: 

590 """ 

591 Create HealthMonitor instance from environment variables 

592 """ 

593 cluster_id = os.getenv("CLUSTER_NAME", "unknown-cluster") 

594 region = os.getenv("REGION", "unknown-region") 

595 

596 # Load thresholds from environment (defaults match cdk.json) 

597 cpu_threshold = int(os.getenv("CPU_THRESHOLD", "60")) 

598 memory_threshold = int(os.getenv("MEMORY_THRESHOLD", "60")) 

599 gpu_threshold = int(os.getenv("GPU_THRESHOLD", "60")) 

600 pending_pods_threshold = int(os.getenv("PENDING_PODS_THRESHOLD", "10")) 

601 pending_requested_cpu_vcpus = int(os.getenv("PENDING_REQUESTED_CPU_VCPUS", "100")) 

602 pending_requested_memory_gb = int(os.getenv("PENDING_REQUESTED_MEMORY_GB", "200")) 

603 pending_requested_gpus = int(os.getenv("PENDING_REQUESTED_GPUS", "8")) 

604 

605 thresholds = ResourceThresholds( 

606 cpu_threshold=cpu_threshold, 

607 memory_threshold=memory_threshold, 

608 gpu_threshold=gpu_threshold, 

609 pending_pods_threshold=pending_pods_threshold, 

610 pending_requested_cpu_vcpus=pending_requested_cpu_vcpus, 

611 pending_requested_memory_gb=pending_requested_memory_gb, 

612 pending_requested_gpus=pending_requested_gpus, 

613 ) 

614 

615 return HealthMonitor(cluster_id, region, thresholds) 

616 
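# Example (illustrative sketch, values are arbitrary): overriding the defaults
# above through the environment before building the monitor, then running a
# single health check.
#
#     os.environ["CLUSTER_NAME"] = "gco-eks-prod"
#     os.environ["REGION"] = "us-east-2"
#     os.environ["CPU_THRESHOLD"] = "75"
#     os.environ["PENDING_PODS_THRESHOLD"] = "25"
#
#     monitor = create_health_monitor_from_env()
#     status = asyncio.run(monitor.get_health_status())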

617 

618async def main() -> None: 

619 """ 

620 Main function for running the health monitor with webhook dispatcher. 

621 

622 This runs both the health monitoring loop and the webhook dispatcher 

623 as concurrent tasks. 

624 """ 

625 from gco.services.webhook_dispatcher import create_webhook_dispatcher_from_env 

626 

627 health_monitor = create_health_monitor_from_env() 

628 

629 # Enable structured JSON logging for CloudWatch Insights 

630 configure_structured_logging( 

631 service_name="health-monitor", 

632 cluster_id=health_monitor.cluster_id, 

633 region=health_monitor.region, 

634 ) 

635 

636 webhook_dispatcher = create_webhook_dispatcher_from_env() 

637 

638 # Start webhook dispatcher 

639 await webhook_dispatcher.start() 

640 logger.info("Webhook dispatcher started") 

641 

642 try: 

643 while True: 

644 try: 

645 health_status = await health_monitor.get_health_status() 

646 print(f"Health Status: {health_status.status}") 

647 print(f"CPU: {health_status.resource_utilization.cpu:.1f}%") 

648 print(f"Memory: {health_status.resource_utilization.memory:.1f}%") 

649 print(f"GPU: {health_status.resource_utilization.gpu:.1f}%") 

650 print(f"Active Jobs: {health_status.active_jobs}") 

651 print(f"Pending Pods: {health_status.pending_pods}") 

652 if health_status.pending_requested: 

653 print( 

654 f"Pending Requested CPU: {health_status.pending_requested.cpu_vcpus:.1f} vCPUs" 

655 ) 

656 print( 

657 f"Pending Requested Memory: {health_status.pending_requested.memory_gb:.1f} GB" 

658 ) 

659 if health_status.message: 

660 print(f"Message: {health_status.message}") 

661 

662 # Print webhook dispatcher metrics 

663 webhook_metrics = webhook_dispatcher.get_metrics() 

664 print( 

665 f"Webhook Deliveries: {webhook_metrics['deliveries_total']} " 

666 f"(success={webhook_metrics['deliveries_success']}, " 

667 f"failed={webhook_metrics['deliveries_failed']})" 

668 ) 

669 print("-" * 50) 

670 

671 await asyncio.sleep(30) # Check every 30 seconds 

672 

673 except KeyboardInterrupt: 

674 raise 

675 except Exception as e: 

676 logger.error(f"Error in main loop: {e}") 

677 await asyncio.sleep(10) 

678 

679 except KeyboardInterrupt: 

680 logger.info("Health monitor stopped by user") 

681 finally: 

682 await webhook_dispatcher.stop() 

683 logger.info("Webhook dispatcher stopped") 

684 

685 

686if __name__ == "__main__": 

687 asyncio.run(main())