Coverage for gco / services / health_api.py: 83%
136 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 21:47 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 21:47 +0000
1"""
2Health API Service for GCO (Global Capacity Orchestrator on AWS).
4This FastAPI service exposes health status endpoints for:
5- ALB health checks (/healthz, /readyz)
6- Detailed health status (/api/v1/health)
7- Resource utilization metrics (/api/v1/metrics)
8- Service status information (/api/v1/status)
10The service runs a background task that continuously monitors cluster health
11and caches the results for fast response times on health check endpoints.
13Endpoints:
14 GET /healthz - Kubernetes liveness probe (always 200 if running)
15 GET /readyz - Kubernetes readiness probe (200 if health monitor ready)
16 GET /api/v1/health - Detailed health status (200 if healthy, 503 if not)
17 GET /api/v1/metrics - Resource utilization metrics
18 GET /api/v1/status - Service operational status
20Environment Variables:
21 HOST: Bind address (default: 0.0.0.0)
22 PORT: Listen port (default: 8080)
23 LOG_LEVEL: Logging level (default: info)
24 CLUSTER_NAME, REGION, *_THRESHOLD: See health_monitor.py
25"""
27import asyncio
28import contextlib
29import logging
30import os
31from collections.abc import AsyncIterator
32from contextlib import asynccontextmanager
33from datetime import datetime
34from typing import Any
36from fastapi import FastAPI, HTTPException, Request
37from fastapi.responses import JSONResponse
39from gco.models import HealthStatus
40from gco.services.auth_middleware import AuthenticationMiddleware
41from gco.services.health_monitor import HealthMonitor, create_health_monitor_from_env
42from gco.services.metrics_publisher import HealthMonitorMetrics
43from gco.services.structured_logging import configure_structured_logging
44from gco.services.webhook_dispatcher import (
45 WebhookDispatcher,
46 create_webhook_dispatcher_from_env,
47)
# Basic logging configuration applied at import time; lifespan() later calls
# configure_structured_logging() once the cluster id and region are known.
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

# Module-level service state. lifespan() assigns the monitor, the metrics
# publisher, the webhook dispatcher and the background task on startup.
health_monitor: HealthMonitor | None = None
health_metrics: HealthMonitorMetrics | None = None
webhook_dispatcher: WebhookDispatcher | None = None
# Latest health status, written by background_health_monitor() and by the
# endpoints that fetch a status on demand.
current_health_status: HealthStatus | None = None
# Handle of the task running background_health_monitor(); None until startup.
health_check_task: asyncio.Task[None] | None = None
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    """
    Application lifespan manager - starts and stops background health monitoring
    and the webhook dispatcher.

    Startup creates the health monitor from the environment, enables structured
    logging, tries to create the CloudWatch metrics publisher, launches the
    background monitoring task and tries to start the webhook dispatcher.
    A failure of the metrics publisher or of the webhook dispatcher is logged
    and tolerated; any other startup failure is logged and re-raised.

    Shutdown runs in a ``finally`` block so that the background task is
    cancelled and the dispatcher stopped even when an exception or a
    cancellation is raised at the ``yield`` point.
    """
    global health_monitor, health_metrics, health_check_task, webhook_dispatcher

    # Startup
    logger.info("Starting Health API Service")
    try:
        health_monitor = create_health_monitor_from_env()

        # Enable structured JSON logging now that we know cluster_id and region
        configure_structured_logging(
            service_name="health-api",
            cluster_id=health_monitor.cluster_id,
            region=health_monitor.region,
        )

        # Initialize metrics publisher for CloudWatch custom metrics
        # Non-fatal: if credentials aren't available yet (e.g., Pod Identity agent
        # still starting), we skip metrics but keep serving health checks.
        try:
            health_metrics = HealthMonitorMetrics(
                cluster_name=health_monitor.cluster_id,
                region=health_monitor.region,
            )
        except Exception as e:
            logger.warning(f"Failed to initialize CloudWatch metrics publisher: {e}")
            health_metrics = None

        health_check_task = asyncio.create_task(background_health_monitor())
        logger.info("Health monitoring started")

        # Start webhook dispatcher for job event notifications
        try:
            webhook_dispatcher = create_webhook_dispatcher_from_env()
            await webhook_dispatcher.start()
            logger.info("Webhook dispatcher started")
        except Exception as e:
            logger.warning(f"Failed to start webhook dispatcher: {e}")
            # Don't fail startup if webhook dispatcher fails - it's not critical
            webhook_dispatcher = None

    except Exception as e:
        logger.error(f"Failed to start health monitoring: {e}")
        raise

    try:
        yield
    finally:
        # Shutdown - guaranteed to run, including when an error or a
        # cancellation is thrown into the generator at the yield above.
        logger.info("Shutting down Health API Service")
        if health_check_task:
            health_check_task.cancel()
            # Awaiting a cancelled task may raise CancelledError; that is the
            # expected outcome here, not a failure.
            with contextlib.suppress(asyncio.CancelledError):
                await health_check_task
        if webhook_dispatcher:
            await webhook_dispatcher.stop()
            logger.info("Webhook dispatcher stopped")
        logger.info("Health monitoring stopped")
# FastAPI application object; startup and shutdown are delegated to lifespan()
app = FastAPI(
    lifespan=lifespan,
    title="GCO Health Monitor API",
    version="1.0.0",
    description=(
        "Health monitoring service for GCO "
        "(Global Capacity Orchestrator on AWS) EKS clusters"
    ),
)

# Register the authentication middleware on the application
app.add_middleware(AuthenticationMiddleware)
async def background_health_monitor() -> None:
    """
    Endless polling loop that refreshes the cached cluster health status.

    Each successful pass stores the monitor's latest status in
    ``current_health_status``, asks the monitor to sync the ALB registration,
    publishes utilization and health metrics when a metrics publisher is
    configured, and then sleeps for 30 seconds. A failed pass is logged and
    retried after 10 seconds. The loop ends when the task is cancelled.
    """
    global current_health_status

    poll_interval = 30  # seconds between successful passes
    retry_interval = 10  # seconds to wait when the monitor is missing or a pass fails

    while True:
        try:
            if health_monitor is None:
                logger.warning("Health monitor not initialized, waiting...")
                await asyncio.sleep(retry_interval)
                continue

            current_health_status = await health_monitor.get_health_status()
            logger.debug(f"Health status updated: {current_health_status.status}")

            # Periodically sync ALB hostname in SSM (self-healing)
            await health_monitor.sync_alb_registration()

            # Re-read the cached status after the await above and publish it
            # to CloudWatch; a publishing failure must not abort the pass.
            snapshot = current_health_status
            if health_metrics and snapshot:
                try:
                    usage = snapshot.resource_utilization
                    health_metrics.publish_resource_utilization(
                        cpu_percent=usage.cpu,
                        memory_percent=usage.memory,
                        gpu_percent=usage.gpu,
                        active_jobs=snapshot.active_jobs,
                    )

                    violations = []
                    if hasattr(snapshot, "get_threshold_violations"):
                        violations = snapshot.get_threshold_violations()
                    health_metrics.publish_health_status(
                        is_healthy=snapshot.status == "healthy",
                        threshold_violations=violations,
                    )
                    logger.debug("Published health metrics to CloudWatch")
                except Exception as publish_error:
                    logger.warning(
                        f"Failed to publish health metrics to CloudWatch: {publish_error}"
                    )

            await asyncio.sleep(poll_interval)

        except asyncio.CancelledError:
            logger.info("Background health monitoring cancelled")
            return
        except Exception as loop_error:
            logger.error(f"Error in background health monitoring: {loop_error}")
            await asyncio.sleep(retry_interval)
@app.get("/")
async def root() -> dict[str, Any]:
    """Return the service name, version, a static status and the main API paths"""
    api_endpoints = {
        "health": "/api/v1/health",
        "metrics": "/api/v1/metrics",
        "status": "/api/v1/status",
    }
    service_info: dict[str, Any] = {
        "service": "GCO Health Monitor API",
        "version": "1.0.0",
        "status": "running",
        "endpoints": api_endpoints,
    }
    return service_info
@app.get("/api/v1/health")
async def health_check() -> JSONResponse:
    """
    Detailed health endpoint backed by the cached health status.

    Responds with 200 when the cached status is "healthy" and with 503
    otherwise. A missing status is fetched on demand, and a status older than
    two minutes is refreshed before answering. Any failure while producing
    the answer is logged and reported as a generic 503 payload.
    """
    global current_health_status

    try:
        # No cached status yet: fetch one now, or fail if there is no monitor
        if current_health_status is None:
            if health_monitor is None:
                raise HTTPException(status_code=503, detail="Health monitor not initialized")
            current_health_status = await health_monitor.get_health_status()

        # Refresh a cached status that is more than 2 minutes old
        if current_health_status:
            age_seconds = (datetime.now() - current_health_status.timestamp).total_seconds()
            if health_monitor is not None and age_seconds > 120:
                logger.warning(f"Health status is {age_seconds:.0f} seconds old, refreshing")
                current_health_status = await health_monitor.get_health_status()

        # Build the response body; only the unhealthy answer carries a message
        is_healthy = current_health_status.status == "healthy"
        payload = {
            "status": "healthy" if is_healthy else "unhealthy",
            "timestamp": current_health_status.timestamp.isoformat(),
            "cluster_id": current_health_status.cluster_id,
            "region": current_health_status.region,
        }
        if not is_healthy:
            payload["message"] = current_health_status.message

        return JSONResponse(status_code=200 if is_healthy else 503, content=payload)

    except Exception as check_error:
        logger.error(f"Health check failed: {check_error}")
        fallback = {
            "status": "unhealthy",
            "timestamp": datetime.now().isoformat(),
            "error": "health monitor unavailable",
        }
        return JSONResponse(status_code=503, content=fallback)
@app.get("/api/v1/metrics")
async def get_metrics() -> dict[str, Any]:
    """
    Detailed metrics endpoint with resource utilization information

    Returns the cached health status as a dictionary: cluster id, region,
    timestamp, status, rounded CPU/memory/GPU utilization, the configured
    thresholds, active job count, message and threshold violations. When no
    status is cached yet, one is fetched from the health monitor first.

    Raises:
        HTTPException: 503 when no status is cached and the health monitor is
            not initialized; 500 when collecting the metrics fails.
    """
    global current_health_status

    try:
        # Get fresh metrics if needed
        if current_health_status is None:
            if health_monitor is None:
                raise HTTPException(status_code=503, detail="Health monitor not initialized")
            current_health_status = await health_monitor.get_health_status()

        return {
            "cluster_id": current_health_status.cluster_id,
            "region": current_health_status.region,
            "timestamp": current_health_status.timestamp.isoformat(),
            "status": current_health_status.status,
            "resource_utilization": {
                "cpu_percent": round(current_health_status.resource_utilization.cpu, 2),
                "memory_percent": round(current_health_status.resource_utilization.memory, 2),
                "gpu_percent": round(current_health_status.resource_utilization.gpu, 2),
            },
            "thresholds": {
                "cpu_threshold": current_health_status.thresholds.cpu_threshold,
                "memory_threshold": current_health_status.thresholds.memory_threshold,
                "gpu_threshold": current_health_status.thresholds.gpu_threshold,
            },
            "active_jobs": current_health_status.active_jobs,
            "message": current_health_status.message,
            "threshold_violations": (
                current_health_status.get_threshold_violations()
                if hasattr(current_health_status, "get_threshold_violations")
                else []
            ),
        }

    except HTTPException:
        # HTTPException is an Exception subclass: without this clause the 503
        # raised above would be caught below and turned into a 500.
        raise
    except Exception as e:
        logger.error(f"Failed to get metrics: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to get metrics: {e!s}") from e
@app.get("/api/v1/status")
async def get_status() -> dict[str, Any]:
    """
    Report the operational state of this service.

    The answer covers whether the health monitor is initialized, whether the
    background task is still running, the timestamp of the last cached health
    status, webhook dispatcher counters and the threshold-related environment
    settings.
    """
    # Dispatcher counters fall back to their defaults when there is no
    # dispatcher or it reports no metrics
    webhook_metrics = webhook_dispatcher.get_metrics() if webhook_dispatcher else None
    stats = webhook_metrics or {}

    task_running = health_check_task is not None and not health_check_task.done()
    last_check = current_health_status.timestamp.isoformat() if current_health_status else None

    dispatcher_info = {
        "enabled": webhook_dispatcher is not None,
        "running": stats.get("running", False),
        "deliveries_total": stats.get("deliveries_total", 0),
        "deliveries_success": stats.get("deliveries_success", 0),
        "deliveries_failed": stats.get("deliveries_failed", 0),
        "cached_jobs": stats.get("cached_jobs", 0),
    }
    environment_info = {
        "cluster_name": os.getenv("CLUSTER_NAME", "unknown"),
        "region": os.getenv("REGION", "unknown"),
        "cpu_threshold": os.getenv("CPU_THRESHOLD", "80"),
        "memory_threshold": os.getenv("MEMORY_THRESHOLD", "85"),
        "gpu_threshold": os.getenv("GPU_THRESHOLD", "90"),
    }

    return {
        "service": "GCO Health Monitor API",
        "version": "1.0.0",
        "uptime_seconds": None,  # Not tracked: no start time is recorded
        "health_monitor_initialized": health_monitor is not None,
        "background_task_running": task_running,
        "last_health_check": last_check,
        "webhook_dispatcher": dispatcher_info,
        "environment": environment_info,
    }
@app.get("/healthz")
async def kubernetes_health_check() -> dict[str, str]:
    """
    Liveness-style check for Kubernetes

    Performs no checks of its own: it answers with a fixed "ok" payload
    whenever the service is able to handle the request.
    """
    liveness: dict[str, str] = {"status": "ok"}
    return liveness
@app.get("/readyz")
async def kubernetes_readiness_check() -> dict[str, str]:
    """
    Readiness-style check for Kubernetes

    Answers with a "ready" payload once the health monitor has been created
    and raises a 503 HTTPException until then.
    """
    if health_monitor is not None:
        return {"status": "ready"}

    raise HTTPException(status_code=503, detail="Health monitor not ready")
# Error handlers
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception) -> JSONResponse:
    """
    Global exception handler for unhandled errors

    Logs the exception together with its traceback and answers with a 500
    JSON body. The exception text is included in the response only when the
    DEBUG environment variable is set to a non-empty value; otherwise a
    generic message is returned.
    """
    # This handler is not running inside an ``except`` block, so the exception
    # is passed explicitly to get its traceback into the log record.
    logger.error(f"Unhandled exception: {exc}", exc_info=exc)
    return JSONResponse(
        status_code=500,
        content={
            "error": "Internal server error",
            # NOTE: any non-empty DEBUG value (even "0" or "false") enables details
            "detail": str(exc) if os.getenv("DEBUG") else "An unexpected error occurred",
        },
    )
def create_app() -> FastAPI:
    """
    Return the FastAPI application defined at module level.

    No new application is built here: every call hands back the same ``app``
    object.
    """
    return app
if __name__ == "__main__":
    import uvicorn

    # Bind address, port and log level are taken from the environment,
    # falling back to 0.0.0.0, 8080 and "info" respectively
    host = os.environ.get("HOST", "0.0.0.0")  # nosec B104 — must bind all interfaces inside K8s pod
    port = int(os.environ.get("PORT", "8080"))
    log_level = os.environ.get("LOG_LEVEL", "info").lower()

    logger.info(f"Starting Health API on {host}:{port}")

    # Serve the app by import string, with auto-reload switched off
    uvicorn.run(
        "gco.services.health_api:app",
        host=host,
        port=port,
        log_level=log_level,
        reload=False,
    )