Coverage for gco / services / health_api.py: 83%

136 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 21:47 +0000

1""" 

2Health API Service for GCO (Global Capacity Orchestrator on AWS). 

3 

4This FastAPI service exposes health status endpoints for: 

5- ALB health checks (/healthz, /readyz) 

6- Detailed health status (/api/v1/health) 

7- Resource utilization metrics (/api/v1/metrics) 

8- Service status information (/api/v1/status) 

9 

10The service runs a background task that continuously monitors cluster health 

11and caches the results for fast response times on health check endpoints. 

12 

13Endpoints: 

14 GET /healthz - Kubernetes liveness probe (always 200 if running) 

15 GET /readyz - Kubernetes readiness probe (200 if health monitor ready) 

16 GET /api/v1/health - Detailed health status (200 if healthy, 503 if not) 

17 GET /api/v1/metrics - Resource utilization metrics 

18 GET /api/v1/status - Service operational status 

19 

20Environment Variables: 

21 HOST: Bind address (default: 0.0.0.0) 

22 PORT: Listen port (default: 8080) 

23 LOG_LEVEL: Logging level (default: info) 

24 CLUSTER_NAME, REGION, *_THRESHOLD: See health_monitor.py 

25""" 

26 

27import asyncio 

28import contextlib 

29import logging 

30import os 

31from collections.abc import AsyncIterator 

32from contextlib import asynccontextmanager 

33from datetime import datetime 

34from typing import Any 

35 

36from fastapi import FastAPI, HTTPException, Request 

37from fastapi.responses import JSONResponse 

38 

39from gco.models import HealthStatus 

40from gco.services.auth_middleware import AuthenticationMiddleware 

41from gco.services.health_monitor import HealthMonitor, create_health_monitor_from_env 

42from gco.services.metrics_publisher import HealthMonitorMetrics 

43from gco.services.structured_logging import configure_structured_logging 

44from gco.services.webhook_dispatcher import ( 

45 WebhookDispatcher, 

46 create_webhook_dispatcher_from_env, 

47) 

48 

# Baseline root logging configuration applied at import time; lifespan() later
# calls configure_structured_logging() once cluster_id and region are known.
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

# Module-level service state, populated by lifespan() at startup and read by
# the background task and the HTTP endpoints below.
# Global health monitor instance
health_monitor: HealthMonitor | None = None
# CloudWatch metrics publisher; None before startup or if it failed to initialize.
health_metrics: HealthMonitorMetrics | None = None
# Job-event webhook dispatcher; None before startup or if it failed to start.
webhook_dispatcher: WebhookDispatcher | None = None
# Most recent cached health snapshot, refreshed by background_health_monitor()
# and (on demand) by the health/metrics endpoints.
current_health_status: HealthStatus | None = None
# Task running background_health_monitor(); None until lifespan() creates it.
health_check_task: asyncio.Task[None] | None = None

60 

61 

@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    """
    Application lifespan manager - starts and stops background health monitoring
    and webhook dispatcher.

    Startup:
      * creates the health monitor from the environment (fatal on failure: the
        error is logged and re-raised so the application does not start);
      * enables structured JSON logging tagged with cluster_id/region;
      * creates the CloudWatch metrics publisher (optional, failure only warns);
      * starts the background health-check task;
      * creates and starts the webhook dispatcher (optional, failure only warns).

    Shutdown runs in a ``finally`` block, so the background task is cancelled
    and the webhook dispatcher is stopped even when the server leaves the
    lifespan context with an exception. A failure while stopping the dispatcher
    is logged instead of aborting the rest of shutdown.

    Args:
        app: The FastAPI application (required by the lifespan protocol; unused).
    """
    global health_monitor, health_metrics, health_check_task, webhook_dispatcher

    # Startup
    logger.info("Starting Health API Service")
    try:
        health_monitor = create_health_monitor_from_env()

        # Enable structured JSON logging now that we know cluster_id and region
        configure_structured_logging(
            service_name="health-api",
            cluster_id=health_monitor.cluster_id,
            region=health_monitor.region,
        )
        # Initialize metrics publisher for CloudWatch custom metrics.
        # Non-fatal: if credentials aren't available yet (e.g., Pod Identity agent
        # still starting), we skip metrics but keep serving health checks.
        try:
            health_metrics = HealthMonitorMetrics(
                cluster_name=health_monitor.cluster_id,
                region=health_monitor.region,
            )
        except Exception as e:
            logger.warning(f"Failed to initialize CloudWatch metrics publisher: {e}")
            health_metrics = None
        health_check_task = asyncio.create_task(background_health_monitor())
        logger.info("Health monitoring started")

        # Start webhook dispatcher for job event notifications
        try:
            webhook_dispatcher = create_webhook_dispatcher_from_env()
            await webhook_dispatcher.start()
            logger.info("Webhook dispatcher started")
        except Exception as e:
            logger.warning(f"Failed to start webhook dispatcher: {e}")
            # Don't fail startup if webhook dispatcher fails - it's not critical
            webhook_dispatcher = None

    except Exception as e:
        logger.error(f"Failed to start health monitoring: {e}")
        raise

    try:
        yield
    finally:
        # Shutdown: runs on normal exit and when an exception is thrown into the
        # generator at the yield point.
        logger.info("Shutting down Health API Service")
        if health_check_task:
            health_check_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await health_check_task
        if webhook_dispatcher:
            # Best effort: a failing stop() must not prevent the rest of shutdown.
            try:
                await webhook_dispatcher.stop()
                logger.info("Webhook dispatcher stopped")
            except Exception as e:
                logger.warning(f"Failed to stop webhook dispatcher: {e}")
        logger.info("Health monitoring stopped")

121 

122 

# The ASGI application. Startup/shutdown of background work is delegated to
# the lifespan() context manager defined above.
app = FastAPI(
    lifespan=lifespan,
    version="1.0.0",
    title="GCO Health Monitor API",
    description="Health monitoring service for GCO (Global Capacity Orchestrator on AWS) EKS clusters",
)

# Register request authentication on the application.
app.add_middleware(AuthenticationMiddleware)

133 

134 

def _publish_health_metrics(metrics: "HealthMonitorMetrics", status: "HealthStatus") -> None:
    """
    Publish one health snapshot to CloudWatch through ``metrics``.

    Best effort: any exception is logged as a warning and swallowed so that a
    CloudWatch problem never stops the monitoring loop.
    """
    # NOTE(review): these publisher calls are synchronous and run on the event
    # loop; if they perform blocking network I/O they delay request handling
    # while they run - consider asyncio.to_thread. TODO confirm.
    try:
        metrics.publish_resource_utilization(
            cpu_percent=status.resource_utilization.cpu,
            memory_percent=status.resource_utilization.memory,
            gpu_percent=status.resource_utilization.gpu,
            active_jobs=status.active_jobs,
        )
        # Also publish health status; tolerate status objects that do not
        # expose get_threshold_violations().
        threshold_violations = (
            status.get_threshold_violations()
            if hasattr(status, "get_threshold_violations")
            else []
        )
        metrics.publish_health_status(
            is_healthy=(status.status == "healthy"),
            threshold_violations=threshold_violations,
        )
        logger.debug("Published health metrics to CloudWatch")
    except Exception as e:
        logger.warning(f"Failed to publish health metrics to CloudWatch: {e}")


async def _run_monitor_cycle() -> int:
    """
    Run one monitoring pass and return how many seconds to sleep afterwards.

    Refreshes the global ``current_health_status``, re-syncs the ALB hostname
    registration (best effort), and publishes CloudWatch metrics when a
    publisher is configured. Returns 30 after a successful pass, and 10 while
    the monitor is not initialized or after a failed pass. Cancellation is not
    caught here.
    """
    global current_health_status

    if health_monitor is None:
        logger.warning("Health monitor not initialized, waiting...")
        return 10

    try:
        current_health_status = await health_monitor.get_health_status()
        logger.debug(f"Health status updated: {current_health_status.status}")

        # Periodically sync ALB hostname in SSM (self-healing). Best effort: a
        # failure here must not skip the metrics publishing below.
        try:
            await health_monitor.sync_alb_registration()
        except Exception as e:
            logger.warning(f"Failed to sync ALB registration: {e}")

        # Publish metrics to CloudWatch for dashboard visibility
        if health_metrics and current_health_status:
            _publish_health_metrics(health_metrics, current_health_status)
    except Exception as e:
        logger.error(f"Error in background health monitoring: {e}")
        return 10  # Shorter sleep on error

    return 30


async def background_health_monitor() -> None:
    """
    Background task that continuously monitors cluster health
    and publishes metrics to CloudWatch.

    Loops forever: runs one monitoring cycle, then sleeps for the interval the
    cycle returns (30 s normally, 10 s after an error or while the monitor is
    not initialized). Cancellation at any point - during a cycle or during any
    sleep, including the error back-off - is logged and ends the loop by
    returning normally.
    """
    while True:
        try:
            delay_seconds = await _run_monitor_cycle()
            await asyncio.sleep(delay_seconds)
        except asyncio.CancelledError:
            logger.info("Background health monitoring cancelled")
            break

186 

187 

@app.get("/")
async def root() -> dict[str, Any]:
    """Describe the service: name, version, run state and main API routes."""
    api_routes = {
        "health": "/api/v1/health",
        "metrics": "/api/v1/metrics",
        "status": "/api/v1/status",
    }
    info: dict[str, Any] = {
        "service": "GCO Health Monitor API",
        "version": "1.0.0",
        "status": "running",
        "endpoints": api_routes,
    }
    return info

201 

202 

@app.get("/api/v1/health")
async def health_check() -> JSONResponse:
    """
    Primary health check endpoint for ALB health checks.

    Serves the cached ``current_health_status``. If nothing is cached yet, one
    is fetched (and cached) on demand. If the cached snapshot is older than 120
    seconds and a monitor exists, it is refreshed first.

    Returns:
        JSONResponse:
          * 200 with status/timestamp/cluster_id/region when the status is "healthy";
          * 503 with the same fields plus ``message`` otherwise;
          * 503 with a generic ``error`` field if anything fails, including a
            monitor that has not been initialized.
    """
    global current_health_status

    try:
        if current_health_status is None:
            if health_monitor is None:
                # Handled by the except clause below -> generic 503 response.
                raise HTTPException(status_code=503, detail="Health monitor not initialized")
            current_health_status = await health_monitor.get_health_status()

        if current_health_status:
            # NOTE(review): uses naive datetime.now(); assumes HealthStatus.timestamp
            # is also a naive local timestamp (mixing would raise TypeError and
            # yield a 503) - TODO confirm.
            stale_for = (datetime.now() - current_health_status.timestamp).total_seconds()
            if stale_for > 120 and health_monitor is not None:  # 2 minutes
                logger.warning(f"Health status is {stale_for:.0f} seconds old, refreshing")
                current_health_status = await health_monitor.get_health_status()

        healthy = current_health_status.status == "healthy"
        payload: dict[str, Any] = {
            "status": "healthy" if healthy else "unhealthy",
            "timestamp": current_health_status.timestamp.isoformat(),
            "cluster_id": current_health_status.cluster_id,
            "region": current_health_status.region,
        }
        if not healthy:
            payload["message"] = current_health_status.message
        return JSONResponse(status_code=200 if healthy else 503, content=payload)

    except Exception as e:
        logger.error(f"Health check failed: {e}")
        failure_body = {
            "status": "unhealthy",
            "timestamp": datetime.now().isoformat(),
            "error": "health monitor unavailable",
        }
        return JSONResponse(status_code=503, content=failure_body)

257 

258 

@app.get("/api/v1/metrics")
async def get_metrics() -> dict[str, Any]:
    """
    Detailed metrics endpoint with resource utilization information.

    Uses the cached ``current_health_status``. When nothing is cached yet, a
    fresh status is fetched from the health monitor and cached.

    Returns:
        dict: cluster_id, region, timestamp, status, resource_utilization
        (percentages rounded to 2 decimals), thresholds, active_jobs, message
        and threshold_violations.

    Raises:
        HTTPException: 503 when no status is cached and the health monitor is
            not initialized; 500 when fetching or formatting the status fails.
    """
    global current_health_status

    try:
        # Get fresh metrics if needed
        if current_health_status is None:
            if health_monitor is None:
                raise HTTPException(status_code=503, detail="Health monitor not initialized")
            current_health_status = await health_monitor.get_health_status()

        snapshot = current_health_status
        utilization = snapshot.resource_utilization
        thresholds = snapshot.thresholds
        return {
            "cluster_id": snapshot.cluster_id,
            "region": snapshot.region,
            "timestamp": snapshot.timestamp.isoformat(),
            "status": snapshot.status,
            "resource_utilization": {
                "cpu_percent": round(utilization.cpu, 2),
                "memory_percent": round(utilization.memory, 2),
                "gpu_percent": round(utilization.gpu, 2),
            },
            "thresholds": {
                "cpu_threshold": thresholds.cpu_threshold,
                "memory_threshold": thresholds.memory_threshold,
                "gpu_threshold": thresholds.gpu_threshold,
            },
            "active_jobs": snapshot.active_jobs,
            "message": snapshot.message,
            "threshold_violations": (
                snapshot.get_threshold_violations()
                if hasattr(snapshot, "get_threshold_violations")
                else []
            ),
        }

    except HTTPException:
        # Deliberate HTTP errors (e.g. the 503 above) must reach the client
        # unchanged instead of being re-wrapped as a generic 500 below.
        raise
    except Exception as e:
        logger.error(f"Failed to get metrics: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to get metrics: {e!s}") from e

300 

301 

@app.get("/api/v1/status")
async def get_status() -> dict[str, Any]:
    """
    Report operational information about the service.

    Covers whether the health monitor exists and its background task is still
    running, and the timestamp of the last cached health check. It also reports
    webhook dispatcher counters (defaults when the dispatcher is absent or
    reports no metrics) and the raw cluster/threshold environment settings.
    """
    # Webhook counters: fall back to an empty mapping so every .get() below
    # returns its default when the dispatcher is absent or reports nothing.
    dispatcher_stats = webhook_dispatcher.get_metrics() if webhook_dispatcher else None
    stats = dispatcher_stats or {}

    task_alive = health_check_task is not None and not health_check_task.done()
    last_check = current_health_status.timestamp.isoformat() if current_health_status else None

    webhook_section = {
        "enabled": webhook_dispatcher is not None,
        "running": stats.get("running", False),
        "deliveries_total": stats.get("deliveries_total", 0),
        "deliveries_success": stats.get("deliveries_success", 0),
        "deliveries_failed": stats.get("deliveries_failed", 0),
        "cached_jobs": stats.get("cached_jobs", 0),
    }
    env_section = {
        "cluster_name": os.getenv("CLUSTER_NAME", "unknown"),
        "region": os.getenv("REGION", "unknown"),
        "cpu_threshold": os.getenv("CPU_THRESHOLD", "80"),
        "memory_threshold": os.getenv("MEMORY_THRESHOLD", "85"),
        "gpu_threshold": os.getenv("GPU_THRESHOLD", "90"),
    }

    return {
        "service": "GCO Health Monitor API",
        "version": "1.0.0",
        "uptime_seconds": None,  # Could be implemented with start time tracking
        "health_monitor_initialized": health_monitor is not None,
        "background_task_running": task_alive,
        "last_health_check": last_check,
        "webhook_dispatcher": webhook_section,
        "environment": env_section,
    }

346 

347 

@app.get("/healthz")
async def kubernetes_health_check() -> dict[str, str]:
    """
    Liveness probe: always answers 200 with ``{"status": "ok"}``.

    Does not consult the health monitor or the cached health status.
    """
    liveness = {"status": "ok"}
    return liveness

355 

356 

@app.get("/readyz")
async def kubernetes_readiness_check() -> dict[str, str]:
    """
    Readiness probe: 200 once the health monitor exists, 503 until then.

    Raises:
        HTTPException: 503 while the health monitor has not been created.
    """
    monitor_ready = health_monitor is not None
    if not monitor_ready:
        raise HTTPException(status_code=503, detail="Health monitor not ready")
    return {"status": "ready"}

368 

369 

370# Error handlers 

@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception) -> JSONResponse:
    """
    Global exception handler for unhandled errors.

    Logs the exception together with its traceback and returns a generic 500
    response. The exception text is exposed in ``detail`` only when the DEBUG
    environment variable is set to a truthy value ("1", "true", "yes", "on",
    case-insensitive). Values such as "0" or "false" keep internal details
    hidden; previously any non-empty value, including "false", exposed them.

    Args:
        request: The request that triggered the error (unused).
        exc: The unhandled exception.
    """
    # exc_info attaches the traceback, which the bare message alone loses.
    logger.error(f"Unhandled exception: {exc}", exc_info=exc)
    debug_enabled = os.getenv("DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    return JSONResponse(
        status_code=500,
        content={
            "error": "Internal server error",
            "detail": str(exc) if debug_enabled else "An unexpected error occurred",
        },
    )

382 

383 

def create_app() -> FastAPI:
    """Return the module-level FastAPI application instance (no new app is built)."""
    application = app
    return application

387 

388 

if __name__ == "__main__":
    import uvicorn

    # Server settings are read from the environment (HOST, PORT, LOG_LEVEL).
    bind_host = os.getenv("HOST", "0.0.0.0")  # nosec B104 — must bind all interfaces inside K8s pod
    bind_port = int(os.getenv("PORT", "8080"))
    uvicorn_log_level = os.getenv("LOG_LEVEL", "info").lower()

    logger.info(f"Starting Health API on {bind_host}:{bind_port}")

    # The app is passed as an import string rather than the object itself.
    uvicorn.run(
        "gco.services.health_api:app",
        host=bind_host,
        port=bind_port,
        log_level=uvicorn_log_level,
        reload=False,
    )