Coverage for gco / services / manifest_api.py: 87%

123 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 21:47 +0000

1""" 

2Manifest API Service for GCO (Global Capacity Orchestrator on AWS). 

3 

4This FastAPI service provides REST endpoints for Kubernetes manifest 

5submission, validation, and management. Endpoint implementations live 

6in the ``api_routes`` sub-package; this module wires them together and 

7owns the application lifecycle, Pydantic request/response models, and 

8health probes. 

9 

10See ``api_routes/`` for the individual routers: 

11 - manifests.py — manifest submit / validate / resource CRUD 

12 - jobs.py — job list / get / logs / events / metrics / delete / retry 

13 - templates.py — job template CRUD + create-from-template 

14 - webhooks.py — webhook registration 

15 - queue.py — DynamoDB-backed global job queue 

16""" 

17 

18from __future__ import annotations 

19 

20import logging 

21import os 

22from collections.abc import AsyncIterator, Awaitable, Callable 

23from contextlib import asynccontextmanager 

24from datetime import UTC, datetime 

25from typing import Any 

26 

27from fastapi import FastAPI, HTTPException, Request 

28from fastapi.responses import JSONResponse 

29from starlette.middleware.base import BaseHTTPMiddleware 

30from starlette.responses import Response 

31from starlette.types import ASGIApp 

32 

33from gco.services.auth_middleware import AuthenticationMiddleware 

34from gco.services.manifest_processor import ( 

35 ManifestProcessor, 

36 create_manifest_processor_from_env, 

37) 

38from gco.services.metrics_publisher import ManifestProcessorMetrics 

39from gco.services.structured_logging import configure_structured_logging 

40from gco.services.template_store import ( 

41 JobStore, 

42 TemplateStore, 

43 WebhookStore, 

44 get_job_store, 

45 get_template_store, 

46 get_webhook_store, 

47) 

48 

# Process-wide logging defaults: INFO level with timestamp, logger name and
# severity in front of every message.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Module-scoped logger used by everything defined in this file.
logger = logging.getLogger(__name__)

53 

54 

55# --------------------------------------------------------------------------- 

56# Request Size Limit Middleware 

57# --------------------------------------------------------------------------- 

58 

# Default ceiling for a request body: 1 MiB (1024 * 1024 = 1,048,576 bytes).
DEFAULT_MAX_REQUEST_BODY_BYTES = 1024 * 1024

61 

62 

class RequestSizeLimitMiddleware(BaseHTTPMiddleware):
    """
    Middleware to enforce request body size limits.

    Rejects requests whose body exceeds the configured maximum with HTTP 413
    (Payload Too Large). A well-formed Content-Length header is checked first
    so that oversized requests are rejected without reading the body. When the
    header is absent, or is present but unusable (non-numeric or negative),
    the body is read and its actual length is checked instead, so a malformed
    header can no longer be used to skip the limit.

    Note: the fallback path buffers the complete body through
    ``request.body()`` before measuring it; it does not stop reading at the
    limit.

    GET, HEAD, OPTIONS and DELETE requests are passed through unchecked.
    """

    # Methods that typically carry no body and are therefore not inspected.
    _UNCHECKED_METHODS = frozenset({"GET", "HEAD", "OPTIONS", "DELETE"})

    def __init__(self, app: ASGIApp, max_body_bytes: int = DEFAULT_MAX_REQUEST_BODY_BYTES) -> None:
        """
        Args:
            app: The downstream ASGI application.
            max_body_bytes: Largest accepted request body, in bytes.
        """
        super().__init__(app)
        self.max_body_bytes = max_body_bytes

    def _payload_too_large(self) -> JSONResponse:
        """Build the HTTP 413 response returned for every oversized request."""
        return JSONResponse(
            status_code=413,
            content={
                "detail": f"Request body exceeds maximum size of {self.max_body_bytes} bytes"
            },
        )

    @staticmethod
    def _declared_length(request: Request) -> int | None:
        """
        Return the body size announced by Content-Length, or ``None``.

        ``None`` means the header is missing, empty, or not a plain
        non-negative decimal number; the caller must then measure the body
        itself instead of trusting the header.
        """
        raw = request.headers.get("content-length")
        if not raw:
            return None
        text = raw.strip()
        # isdigit() alone accepts non-ASCII digits; a sign or any other
        # character makes the value unusable, which also rules out negatives.
        if not (text.isascii() and text.isdigit()):
            return None
        return int(text)

    async def dispatch(
        self,
        request: Request,
        call_next: Callable[[Request], Awaitable[Response]],
    ) -> Response:
        """
        Enforce the size limit, then hand the request to the next handler.

        Returns:
            A 413 JSON response when the body is too large, otherwise the
            downstream response.
        """
        # Skip size checks for GET, HEAD, OPTIONS, DELETE (typically no body)
        if request.method in self._UNCHECKED_METHODS:
            return await call_next(request)

        declared = self._declared_length(request)
        if declared is not None:
            # Early rejection based on the header alone; body is not read here.
            if declared > self.max_body_bytes:
                return self._payload_too_large()
            return await call_next(request)

        # No usable Content-Length (e.g. chunked transfer or a malformed
        # header): measure the body that was actually sent.
        body = await request.body()
        if len(body) > self.max_body_bytes:
            return self._payload_too_large()

        return await call_next(request)

115 

116 

117# --------------------------------------------------------------------------- 

118# Global state — populated by lifespan, read by routers via this module. 

119# --------------------------------------------------------------------------- 

# Each of the names below starts as None and is assigned by lifespan() during
# application startup; the health/status handlers in this module check for
# None before using them.

# Manifest processor created from environment configuration.
manifest_processor: ManifestProcessor | None = None
# Metrics publisher built with the processor's cluster id and region.
manifest_metrics: ManifestProcessorMetrics | None = None
# Job template store returned by get_template_store().
template_store: TemplateStore | None = None
# Webhook store returned by get_webhook_store().
webhook_store: WebhookStore | None = None
# Job store returned by get_job_store().
job_store: JobStore | None = None

125 

126 

127# ============================================================================= 

128# Pydantic Models for API 

129# ============================================================================= 

130 

131 

132# ============================================================================= 

133# Application Lifecycle 

134# ============================================================================= 

135 

136 

@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    """
    Application lifespan manager — initializes manifest processor and stores.

    On startup this populates the module-level ``manifest_processor``,
    ``manifest_metrics``, ``template_store``, ``webhook_store`` and
    ``job_store`` globals and switches to structured logging. Any startup
    failure is logged with its traceback and re-raised so the application
    does not start half-initialized.

    Args:
        app: The FastAPI application this lifespan belongs to (unused here).

    Raises:
        Exception: Whatever the processor, metrics or store setup raised.
    """
    global manifest_processor, manifest_metrics, template_store, webhook_store, job_store

    logger.info("Starting Manifest API Service")
    try:
        manifest_processor = create_manifest_processor_from_env()

        configure_structured_logging(
            service_name="manifest-api",
            cluster_id=manifest_processor.cluster_id,
            region=manifest_processor.region,
        )

        manifest_metrics = ManifestProcessorMetrics(
            cluster_name=manifest_processor.cluster_id,
            region=manifest_processor.region,
        )
        logger.info("Manifest processor initialized")

        template_store = get_template_store()
        webhook_store = get_webhook_store()
        job_store = get_job_store()
        logger.info("DynamoDB stores initialized")
    except Exception as e:
        # logger.exception keeps the traceback, which the plain error log lost.
        logger.exception("Failed to initialize manifest processor: %s", e)
        raise

    try:
        yield
    finally:
        # Runs even when an exception is thrown into the generator at yield,
        # so the shutdown is always recorded.
        logger.info("Shutting down Manifest API Service")

169 

170 

171# ============================================================================= 

172# Create FastAPI app and include routers 

173# ============================================================================= 

174 

# The single FastAPI application object for this service; startup and
# shutdown work is delegated to lifespan().
app = FastAPI(
    lifespan=lifespan,
    title="GCO Manifest Processor API",
    description=(
        "Kubernetes manifest submission and management service "
        "for GCO (Global Capacity Orchestrator on AWS)"
    ),
    version="2.0.0",
)

# Authentication is registered first ...
app.add_middleware(AuthenticationMiddleware)

# ... and the request size limit second. Starlette processes middleware in
# LIFO order, so the size limit runs before authentication on each request.
# The limit can be overridden with the MAX_REQUEST_BODY_BYTES environment
# variable.
_max_body_bytes = int(
    os.environ.get("MAX_REQUEST_BODY_BYTES", str(DEFAULT_MAX_REQUEST_BODY_BYTES))
)
app.add_middleware(RequestSizeLimitMiddleware, max_body_bytes=_max_body_bytes)

188 

189# Include domain routers 

190from gco.services.api_routes.jobs import router as jobs_router # noqa: E402 

191from gco.services.api_routes.manifests import router as manifests_router # noqa: E402 

192from gco.services.api_routes.queue import router as queue_router # noqa: E402 

193from gco.services.api_routes.templates import router as templates_router # noqa: E402 

194from gco.services.api_routes.webhooks import router as webhooks_router # noqa: E402 

195 

# Mount every domain router on the application, in the same order as before:
# manifests, jobs, templates, webhooks, queue.
for _domain_router in (
    manifests_router,
    jobs_router,
    templates_router,
    webhooks_router,
    queue_router,
):
    app.include_router(_domain_router)

201 

202 

203# ============================================================================= 

204# Root & Health Endpoints (kept here — they're thin and tightly coupled to state) 

205# ============================================================================= 

206 

207 

@app.get("/", tags=["Info"])
async def root() -> dict[str, Any]:
    """
    Describe the service and list its main API routes.

    Returns:
        A dict with the service name, version, running status, the cluster id
        and region of the manifest processor ("unknown" for both when the
        processor is not set), and a static map of endpoint descriptions.
    """
    if manifest_processor:
        cluster_id = manifest_processor.cluster_id
        region = manifest_processor.region
    else:
        cluster_id = region = "unknown"

    manifest_routes = {
        "submit": "POST /api/v1/manifests",
        "validate": "POST /api/v1/manifests/validate",
        "get": "GET /api/v1/manifests/{namespace}/{name}",
        "delete": "DELETE /api/v1/manifests/{namespace}/{name}",
    }
    job_routes = {
        "list": "GET /api/v1/jobs",
        "get": "GET /api/v1/jobs/{namespace}/{name}",
        "logs": "GET /api/v1/jobs/{namespace}/{name}/logs",
        "events": "GET /api/v1/jobs/{namespace}/{name}/events",
        "pods": "GET /api/v1/jobs/{namespace}/{name}/pods",
        "metrics": "GET /api/v1/jobs/{namespace}/{name}/metrics",
        "delete": "DELETE /api/v1/jobs/{namespace}/{name}",
        "bulk_delete": "DELETE /api/v1/jobs",
        "retry": "POST /api/v1/jobs/{namespace}/{name}/retry",
    }
    template_routes = {
        "list": "GET /api/v1/templates",
        "create": "POST /api/v1/templates",
        "get": "GET /api/v1/templates/{name}",
        "delete": "DELETE /api/v1/templates/{name}",
        "create_job": "POST /api/v1/jobs/from-template/{name}",
    }
    webhook_routes = {
        "list": "GET /api/v1/webhooks",
        "create": "POST /api/v1/webhooks",
        "delete": "DELETE /api/v1/webhooks/{id}",
    }

    # Key order is kept exactly as clients have always received it.
    return {
        "service": "GCO Manifest Processor API",
        "version": "2.0.0",
        "status": "running",
        "cluster_id": cluster_id,
        "region": region,
        "endpoints": {
            "manifests": manifest_routes,
            "jobs": job_routes,
            "templates": template_routes,
            "webhooks": webhook_routes,
            "health": "GET /api/v1/health",
            "status": "GET /api/v1/status",
        },
    }

251 

252 

@app.get("/healthz", tags=["Health"])
async def kubernetes_health_check() -> dict[str, str]:
    """Liveness probe: always answers with a fixed "ok" status payload."""
    liveness_payload = dict(status="ok")
    return liveness_payload

257 

258 

@app.get("/readyz", tags=["Health"])
async def kubernetes_readiness_check() -> dict[str, str]:
    """
    Readiness probe: ready once the manifest processor has been set.

    Raises:
        HTTPException: 503 while ``manifest_processor`` is still ``None``.
    """
    if manifest_processor is not None:
        return {"status": "ready"}
    raise HTTPException(status_code=503, detail="Manifest processor not ready")

265 

266 

@app.get("/api/v1/health", tags=["Health"])
async def health_check() -> JSONResponse:
    """
    Health check endpoint for load balancer health checks.

    Reports healthy (HTTP 200) only when the manifest processor exists and a
    one-item namespace listing against the Kubernetes API succeeds. Every
    other outcome is reported as unhealthy (HTTP 503).

    Returns:
        A JSON response carrying ``status`` and ``timestamp``; when the
        processor exists it also carries ``cluster_id``, ``region`` and
        ``kubernetes_api``.
    """
    # Local import keeps this handler self-contained; Starlette is already a
    # dependency of this module.
    from starlette.concurrency import run_in_threadpool

    # Snapshot the global once so the whole check sees the same object.
    processor = manifest_processor
    try:
        if processor is None:
            return JSONResponse(
                status_code=503,
                content={
                    "status": "unhealthy",
                    "timestamp": datetime.now(UTC).isoformat(),
                    "message": "Manifest processor not initialized",
                },
            )

        try:
            # The Kubernetes client call is synchronous; running it in the
            # thread pool keeps a slow API server from stalling the event loop.
            await run_in_threadpool(processor.core_v1.list_namespace, limit=1)
            api_healthy = True
        except Exception as e:
            logger.error(f"Kubernetes API health check failed: {e}")
            api_healthy = False

        status_code = 200 if api_healthy else 503
        return JSONResponse(
            status_code=status_code,
            content={
                "status": "healthy" if api_healthy else "unhealthy",
                "timestamp": datetime.now(UTC).isoformat(),
                "cluster_id": processor.cluster_id,
                "region": processor.region,
                "kubernetes_api": "connected" if api_healthy else "disconnected",
            },
        )

    except Exception as e:
        # Last-resort guard: a health probe must answer, never raise.
        logger.error(f"Health check failed: {e}")
        return JSONResponse(
            status_code=503,
            content={
                "status": "unhealthy",
                "timestamp": datetime.now(UTC).isoformat(),
                "error": "manifest processor unavailable",
            },
        )

310 

311 

@app.get("/api/v1/status", tags=["Health"])
async def get_service_status() -> dict[str, Any]:
    """
    Service status endpoint with detailed information.

    Returns:
        A dict with service name/version, a timestamp, whether the manifest
        processor is initialized, selected environment settings, and the
        number of stored templates and webhooks. When the processor exists,
        its cluster id, region, resource limits, allowed namespaces and
        validation flag are added as well.

    Store counts are best-effort: a failing store is logged and reported as 0
    without affecting the other store's count.
    """
    # Local import keeps this handler self-contained; Starlette is already a
    # dependency of this module.
    from starlette.concurrency import run_in_threadpool

    templates_count = 0
    webhooks_count = 0

    # Each store is queried in its own try block so one failure cannot hide
    # the other store's count. The store calls are synchronous, so they run
    # in the thread pool rather than on the event loop.
    try:
        if template_store:
            templates_count = len(await run_in_threadpool(template_store.list_templates))
    except Exception as e:
        logger.warning(f"Failed to get store counts: {e}")

    try:
        if webhook_store:
            webhooks_count = len(await run_in_threadpool(webhook_store.list_webhooks))
    except Exception as e:
        logger.warning(f"Failed to get store counts: {e}")

    status_info: dict[str, Any] = {
        "service": "GCO Manifest Processor API",
        "version": "2.0.0",
        "timestamp": datetime.now(UTC).isoformat(),
        "manifest_processor_initialized": manifest_processor is not None,
        # Raw environment values (strings), shown next to the parsed values
        # taken from the processor below.
        "environment": {
            "cluster_name": os.getenv("CLUSTER_NAME", "unknown"),
            "region": os.getenv("REGION", "unknown"),
            "max_cpu_per_manifest": os.getenv("MAX_CPU_PER_MANIFEST", "10"),
            "max_memory_per_manifest": os.getenv("MAX_MEMORY_PER_MANIFEST", "32Gi"),
            "max_gpu_per_manifest": os.getenv("MAX_GPU_PER_MANIFEST", "4"),
            "allowed_namespaces": os.getenv("ALLOWED_NAMESPACES", "default,gco-jobs"),
            "validation_enabled": os.getenv("VALIDATION_ENABLED", "true"),
        },
        "templates_count": templates_count,
        "webhooks_count": webhooks_count,
    }

    if manifest_processor:
        status_info.update(
            {
                "cluster_id": manifest_processor.cluster_id,
                "region": manifest_processor.region,
                "resource_limits": {
                    "max_cpu_millicores": manifest_processor.max_cpu_per_manifest,
                    "max_memory_bytes": manifest_processor.max_memory_per_manifest,
                    "max_gpu_count": manifest_processor.max_gpu_per_manifest,
                },
                "allowed_namespaces": list(manifest_processor.allowed_namespaces),
                "validation_enabled": manifest_processor.validation_enabled,
            }
        )

    return status_info

359 

360 

361# ============================================================================= 

362# Error Handlers 

363# ============================================================================= 

364 

365 

@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception) -> JSONResponse:
    """
    Global exception handler for unhandled errors.

    Logs the failure with its traceback and answers with a generic HTTP 500
    body. The exception text is exposed to the client only when the DEBUG
    environment variable is set to something other than an explicit "off"
    value ("", "0", "false", "no", "off"; case-insensitive).

    Args:
        request: The request whose handling raised.
        exc: The unhandled exception.

    Returns:
        A JSON 500 response with ``error``, ``detail`` and ``timestamp``.
    """
    # exc_info attaches the traceback; the message alone rarely locates a bug.
    logger.error(
        f"Unhandled exception in {request.method} {request.url}: {exc}",
        exc_info=exc,
    )

    # A bare truthiness test treated DEBUG=0 / DEBUG=false as enabled and so
    # exposed internal error text; explicit "off" spellings now disable it.
    debug_setting = os.getenv("DEBUG", "").strip().lower()
    debug_enabled = debug_setting not in ("", "0", "false", "no", "off")

    return JSONResponse(
        status_code=500,
        content={
            "error": "Internal server error",
            "detail": str(exc) if debug_enabled else "An unexpected error occurred",
            "timestamp": datetime.now(UTC).isoformat(),
        },
    )

378 

379 

380# ============================================================================= 

381# App Factory & Entrypoint 

382# ============================================================================= 

383 

384 

def create_app() -> FastAPI:
    """Return the module-level FastAPI application; no new instance is built."""
    application = app
    return application

388 

389 

if __name__ == "__main__":
    import uvicorn

    # Listen address, port and log level come from the environment, falling
    # back to all interfaces, port 8080 and "info".
    bind_host = os.environ.get("HOST", "0.0.0.0")  # nosec B104 — must bind all interfaces inside K8s pod
    listen_port = int(os.environ.get("PORT", "8080"))
    server_log_level = os.environ.get("LOG_LEVEL", "info").lower()

    logger.info(f"Starting Manifest API on {bind_host}:{listen_port}")

    # The app is passed as an import string; auto-reload stays disabled.
    uvicorn.run(
        "gco.services.manifest_api:app",
        reload=False,
        log_level=server_log_level,
        port=listen_port,
        host=bind_host,
    )