Coverage for gco / services / manifest_api.py: 87%
123 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 21:47 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 21:47 +0000
1"""
2Manifest API Service for GCO (Global Capacity Orchestrator on AWS).
4This FastAPI service provides REST endpoints for Kubernetes manifest
5submission, validation, and management. Endpoint implementations live
6in the ``api_routes`` sub-package; this module wires them together and
7owns the application lifecycle, Pydantic request/response models, and
8health probes.
10See ``api_routes/`` for the individual routers:
11 - manifests.py — manifest submit / validate / resource CRUD
12 - jobs.py — job list / get / logs / events / metrics / delete / retry
13 - templates.py — job template CRUD + create-from-template
14 - webhooks.py — webhook registration
15 - queue.py — DynamoDB-backed global job queue
16"""
18from __future__ import annotations
20import logging
21import os
22from collections.abc import AsyncIterator, Awaitable, Callable
23from contextlib import asynccontextmanager
24from datetime import UTC, datetime
25from typing import Any
27from fastapi import FastAPI, HTTPException, Request
28from fastapi.responses import JSONResponse
29from starlette.middleware.base import BaseHTTPMiddleware
30from starlette.responses import Response
31from starlette.types import ASGIApp
33from gco.services.auth_middleware import AuthenticationMiddleware
34from gco.services.manifest_processor import (
35 ManifestProcessor,
36 create_manifest_processor_from_env,
37)
38from gco.services.metrics_publisher import ManifestProcessorMetrics
39from gco.services.structured_logging import configure_structured_logging
40from gco.services.template_store import (
41 JobStore,
42 TemplateStore,
43 WebhookStore,
44 get_job_store,
45 get_template_store,
46 get_webhook_store,
47)
# Process-wide logging baseline: INFO level, with each record rendered as
# "<timestamp> - <logger name> - <level> - <message>".
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Logger named after this module; used by everything defined in this file.
logger = logging.getLogger(__name__)
55# ---------------------------------------------------------------------------
56# Request Size Limit Middleware
57# ---------------------------------------------------------------------------
# Body size cap applied when no explicit limit is supplied: 1 MiB (1024 * 1024 bytes).
DEFAULT_MAX_REQUEST_BODY_BYTES = 1024 * 1024
class RequestSizeLimitMiddleware(BaseHTTPMiddleware):
    """
    Middleware to enforce request body size limits.

    Rejects requests whose body exceeds ``max_body_bytes`` with HTTP 413
    (Payload Too Large):

    * When a well-formed ``Content-Length`` header is present, the declared
      size is compared against the limit, so oversized requests are rejected
      without reading the body.
    * When the header is missing (e.g. chunked transfer) or is not a
      non-negative integer, the body is read with ``request.body()`` and its
      actual length is checked. The whole body is read before the comparison
      is made.

    GET, HEAD, OPTIONS and DELETE requests are forwarded without any check.
    """

    # Methods forwarded without a size check (typically no body).
    _UNCHECKED_METHODS = frozenset({"GET", "HEAD", "OPTIONS", "DELETE"})

    def __init__(self, app: ASGIApp, max_body_bytes: int = DEFAULT_MAX_REQUEST_BODY_BYTES) -> None:
        """Wrap ``app`` and remember the maximum accepted body size in bytes."""
        super().__init__(app)
        self.max_body_bytes = max_body_bytes

    def _payload_too_large(self) -> JSONResponse:
        """Build the HTTP 413 response returned for oversized request bodies."""
        return JSONResponse(
            status_code=413,
            content={"detail": f"Request body exceeds maximum size of {self.max_body_bytes} bytes"},
        )

    @staticmethod
    def _declared_length(request: Request) -> int | None:
        """Return the Content-Length value, or ``None`` if absent or unusable.

        A header that does not parse as an integer, or that is negative, is
        treated as unusable so the caller falls back to measuring the body
        instead of trusting (or silently skipping) the header.
        """
        raw_value = request.headers.get("content-length")
        if not raw_value:
            return None
        try:
            declared = int(raw_value)
        except (ValueError, TypeError):
            return None
        return declared if declared >= 0 else None

    async def dispatch(
        self,
        request: Request,
        call_next: Callable[[Request], Awaitable[Response]],
    ) -> Response:
        """Reject oversized requests with 413, otherwise forward to ``call_next``."""
        # Skip size checks for GET, HEAD, OPTIONS, DELETE (typically no body)
        if request.method in self._UNCHECKED_METHODS:
            return await call_next(request)

        declared = self._declared_length(request)
        if declared is not None:
            # Early rejection based on the header alone; the body is not read.
            if declared > self.max_body_bytes:
                return self._payload_too_large()
            return await call_next(request)

        # No usable Content-Length (absent, malformed or negative): measure the
        # body itself so a bad header cannot be used to bypass the limit.
        body = await request.body()
        if len(body) > self.max_body_bytes:
            return self._payload_too_large()

        return await call_next(request)
117# ---------------------------------------------------------------------------
118# Global state — populated by lifespan, read by routers via this module.
119# ---------------------------------------------------------------------------
# Shared service handles. Each slot is None at import time; the stores are
# listed first here, the processor and its metrics publisher last.
template_store: TemplateStore | None = None
webhook_store: WebhookStore | None = None
job_store: JobStore | None = None
manifest_processor: ManifestProcessor | None = None
manifest_metrics: ManifestProcessorMetrics | None = None
127# =============================================================================
128# Pydantic Models for API
129# =============================================================================
132# =============================================================================
133# Application Lifecycle
134# =============================================================================
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    """Application lifespan manager — initializes manifest processor and stores.

    Startup builds the manifest processor from the environment, configures
    structured logging and the metrics publisher with the processor's cluster
    id and region, and obtains the template, webhook and job stores. All of
    them are published through this module's globals. After the ``yield`` a
    shutdown message is logged.

    Raises:
        Exception: any error raised during startup is logged together with its
            traceback and then re-raised unchanged.
    """
    global manifest_processor, manifest_metrics, template_store, webhook_store, job_store

    logger.info("Starting Manifest API Service")
    try:
        manifest_processor = create_manifest_processor_from_env()

        configure_structured_logging(
            service_name="manifest-api",
            cluster_id=manifest_processor.cluster_id,
            region=manifest_processor.region,
        )

        manifest_metrics = ManifestProcessorMetrics(
            cluster_name=manifest_processor.cluster_id,
            region=manifest_processor.region,
        )
        logger.info("Manifest processor initialized")

        template_store = get_template_store()
        webhook_store = get_webhook_store()
        job_store = get_job_store()
        logger.info("DynamoDB stores initialized")
    except Exception as e:
        # logger.exception logs at ERROR level and attaches the active
        # traceback, which a plain logger.error call would drop.
        logger.exception(f"Failed to initialize manifest processor: {e}")
        raise

    yield

    logger.info("Shutting down Manifest API Service")
171# =============================================================================
172# Create FastAPI app and include routers
173# =============================================================================
app = FastAPI(
    lifespan=lifespan,
    title="GCO Manifest Processor API",
    version="2.0.0",
    description=(
        "Kubernetes manifest submission and management service for GCO "
        "(Global Capacity Orchestrator on AWS)"
    ),
)

# Authentication is registered first.
app.add_middleware(AuthenticationMiddleware)

# The size limit is registered after authentication so that it executes first
# in the request pipeline (Starlette processes middleware in LIFO order).
# The limit comes from MAX_REQUEST_BODY_BYTES, falling back to the default.
_max_body_bytes = int(
    os.environ.get("MAX_REQUEST_BODY_BYTES", str(DEFAULT_MAX_REQUEST_BODY_BYTES))
)
app.add_middleware(RequestSizeLimitMiddleware, max_body_bytes=_max_body_bytes)
# Domain routers are imported here rather than at the top of the file (hence
# the E402 suppressions) — presumably because they import this module for its
# shared state; confirm before moving them.
from gco.services.api_routes.jobs import router as jobs_router  # noqa: E402
from gco.services.api_routes.manifests import router as manifests_router  # noqa: E402
from gco.services.api_routes.queue import router as queue_router  # noqa: E402
from gco.services.api_routes.templates import router as templates_router  # noqa: E402
from gco.services.api_routes.webhooks import router as webhooks_router  # noqa: E402

# Registration order: manifests, jobs, templates, webhooks, queue.
for _domain_router in (
    manifests_router,
    jobs_router,
    templates_router,
    webhooks_router,
    queue_router,
):
    app.include_router(_domain_router)
del _domain_router  # do not leave the loop variable in the module namespace
203# =============================================================================
204# Root & Health Endpoints (kept here — they're thin and tightly coupled to state)
205# =============================================================================
@app.get("/", tags=["Info"])
async def root() -> dict[str, Any]:
    """Describe the service: name, version, cluster identity and an endpoint index.

    ``cluster_id`` and ``region`` are taken from the manifest processor when it
    is set and reported as ``"unknown"`` otherwise.
    """
    if manifest_processor:
        cluster_id = manifest_processor.cluster_id
        region = manifest_processor.region
    else:
        cluster_id = "unknown"
        region = "unknown"

    manifest_endpoints = {
        "submit": "POST /api/v1/manifests",
        "validate": "POST /api/v1/manifests/validate",
        "get": "GET /api/v1/manifests/{namespace}/{name}",
        "delete": "DELETE /api/v1/manifests/{namespace}/{name}",
    }
    job_endpoints = {
        "list": "GET /api/v1/jobs",
        "get": "GET /api/v1/jobs/{namespace}/{name}",
        "logs": "GET /api/v1/jobs/{namespace}/{name}/logs",
        "events": "GET /api/v1/jobs/{namespace}/{name}/events",
        "pods": "GET /api/v1/jobs/{namespace}/{name}/pods",
        "metrics": "GET /api/v1/jobs/{namespace}/{name}/metrics",
        "delete": "DELETE /api/v1/jobs/{namespace}/{name}",
        "bulk_delete": "DELETE /api/v1/jobs",
        "retry": "POST /api/v1/jobs/{namespace}/{name}/retry",
    }
    template_endpoints = {
        "list": "GET /api/v1/templates",
        "create": "POST /api/v1/templates",
        "get": "GET /api/v1/templates/{name}",
        "delete": "DELETE /api/v1/templates/{name}",
        "create_job": "POST /api/v1/jobs/from-template/{name}",
    }
    webhook_endpoints = {
        "list": "GET /api/v1/webhooks",
        "create": "POST /api/v1/webhooks",
        "delete": "DELETE /api/v1/webhooks/{id}",
    }

    # Key order below is the order in which the fields appear in the response.
    return {
        "service": "GCO Manifest Processor API",
        "version": "2.0.0",
        "status": "running",
        "cluster_id": cluster_id,
        "region": region,
        "endpoints": {
            "manifests": manifest_endpoints,
            "jobs": job_endpoints,
            "templates": template_endpoints,
            "webhooks": webhook_endpoints,
            "health": "GET /api/v1/health",
            "status": "GET /api/v1/status",
        },
    }
@app.get("/healthz", tags=["Health"])
async def kubernetes_health_check() -> dict[str, str]:
    """Liveness probe: unconditionally reports an ``ok`` status."""
    liveness = {"status": "ok"}
    return liveness
@app.get("/readyz", tags=["Health"])
async def kubernetes_readiness_check() -> dict[str, str]:
    """Readiness probe: ready once the manifest processor has been set.

    Raises:
        HTTPException: 503 while ``manifest_processor`` is still ``None``.
    """
    if manifest_processor is not None:
        return {"status": "ready"}
    raise HTTPException(status_code=503, detail="Manifest processor not ready")
@app.get("/api/v1/health", tags=["Health"])
async def health_check() -> JSONResponse:
    """Health check endpoint for load balancer health checks.

    Returns:
        * 503 with ``"Manifest processor not initialized"`` when the manifest
          processor has not been set.
        * 200 / ``"healthy"`` when a one-item namespace listing against the
          processor's ``core_v1`` client succeeds, 503 / ``"unhealthy"`` when
          that call raises.
        * 503 with a generic error if anything else fails while building the
          response.
    """
    # Function-scope import so this handler does not depend on a file-level one.
    import asyncio

    try:
        # Bind once so the same object is used before and after the await.
        processor = manifest_processor
        if processor is None:
            return JSONResponse(
                status_code=503,
                content={
                    "status": "unhealthy",
                    "timestamp": datetime.now(UTC).isoformat(),
                    "message": "Manifest processor not initialized",
                },
            )

        try:
            # list_namespace is invoked as a plain (non-awaited) call, so it is
            # run in a worker thread to keep a slow Kubernetes API from
            # stalling the event loop and every other in-flight request.
            await asyncio.to_thread(processor.core_v1.list_namespace, limit=1)
            api_healthy = True
        except Exception as e:
            logger.error(f"Kubernetes API health check failed: {e}")
            api_healthy = False

        status_code = 200 if api_healthy else 503
        return JSONResponse(
            status_code=status_code,
            content={
                "status": "healthy" if api_healthy else "unhealthy",
                "timestamp": datetime.now(UTC).isoformat(),
                "cluster_id": processor.cluster_id,
                "region": processor.region,
                "kubernetes_api": "connected" if api_healthy else "disconnected",
            },
        )

    except Exception as e:
        logger.error(f"Health check failed: {e}")
        return JSONResponse(
            status_code=503,
            content={
                "status": "unhealthy",
                "timestamp": datetime.now(UTC).isoformat(),
                "error": "manifest processor unavailable",
            },
        )
@app.get("/api/v1/status", tags=["Health"])
async def get_service_status() -> dict[str, Any]:
    """Service status endpoint with detailed information.

    Reports the service name and version, a timestamp, whether the manifest
    processor is initialized, selected environment variables (with their
    defaults), and the number of stored templates and webhooks. When the
    manifest processor is set, its cluster id, region, resource limits,
    allowed namespaces and validation flag are added as well.

    A store that is unset or whose listing call fails is reported with a
    count of 0; the failure is logged as a warning.
    """
    # Each store is counted in its own try block so that a failure listing
    # templates does not also suppress the webhook count (and vice versa).
    templates_count = 0
    try:
        if template_store:
            templates_count = len(template_store.list_templates())
    except Exception as e:
        logger.warning(f"Failed to get store counts: {e}")

    webhooks_count = 0
    try:
        if webhook_store:
            webhooks_count = len(webhook_store.list_webhooks())
    except Exception as e:
        logger.warning(f"Failed to get store counts: {e}")

    status_info: dict[str, Any] = {
        "service": "GCO Manifest Processor API",
        "version": "2.0.0",
        "timestamp": datetime.now(UTC).isoformat(),
        "manifest_processor_initialized": manifest_processor is not None,
        # Raw environment values (strings), independent of the processor state.
        "environment": {
            "cluster_name": os.getenv("CLUSTER_NAME", "unknown"),
            "region": os.getenv("REGION", "unknown"),
            "max_cpu_per_manifest": os.getenv("MAX_CPU_PER_MANIFEST", "10"),
            "max_memory_per_manifest": os.getenv("MAX_MEMORY_PER_MANIFEST", "32Gi"),
            "max_gpu_per_manifest": os.getenv("MAX_GPU_PER_MANIFEST", "4"),
            "allowed_namespaces": os.getenv("ALLOWED_NAMESPACES", "default,gco-jobs"),
            "validation_enabled": os.getenv("VALIDATION_ENABLED", "true"),
        },
        "templates_count": templates_count,
        "webhooks_count": webhooks_count,
    }

    if manifest_processor:
        # Values as held by the live processor.
        status_info.update(
            {
                "cluster_id": manifest_processor.cluster_id,
                "region": manifest_processor.region,
                "resource_limits": {
                    "max_cpu_millicores": manifest_processor.max_cpu_per_manifest,
                    "max_memory_bytes": manifest_processor.max_memory_per_manifest,
                    "max_gpu_count": manifest_processor.max_gpu_per_manifest,
                },
                "allowed_namespaces": list(manifest_processor.allowed_namespaces),
                "validation_enabled": manifest_processor.validation_enabled,
            }
        )

    return status_info
361# =============================================================================
362# Error Handlers
363# =============================================================================
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception) -> JSONResponse:
    """Global exception handler for unhandled errors.

    Logs the failure (including the exception's traceback) and returns a 500
    JSON response. The exception text is only included in the response when
    the ``DEBUG`` environment variable is set to something other than an
    explicit "off" value (empty, ``0``, ``false``, ``no``, ``off``); otherwise
    a generic message is returned.
    """
    # exc_info=exc attaches the traceback even though this handler is not
    # running inside an ``except`` block.
    logger.error(
        f"Unhandled exception in {request.method} {request.url}: {exc}",
        exc_info=exc,
    )

    # A bare truthiness test would treat DEBUG=0 / DEBUG=false as enabled and
    # leak internal error text to clients.
    debug_flag = os.getenv("DEBUG", "").strip().lower()
    debug_enabled = debug_flag not in {"", "0", "false", "no", "off"}

    return JSONResponse(
        status_code=500,
        content={
            "error": "Internal server error",
            "detail": str(exc) if debug_enabled else "An unexpected error occurred",
            "timestamp": datetime.now(UTC).isoformat(),
        },
    )
380# =============================================================================
381# App Factory & Entrypoint
382# =============================================================================
def create_app() -> FastAPI:
    """Return the module-level ``app`` object; no new application is built."""
    application = app
    return application
if __name__ == "__main__":
    # Script entrypoint: serve the app with uvicorn, configured from the
    # HOST, PORT and LOG_LEVEL environment variables.
    import uvicorn

    host = os.getenv("HOST", "0.0.0.0")  # nosec B104 — must bind all interfaces inside K8s pod
    port = int(os.getenv("PORT", "8080"))
    log_level = os.getenv("LOG_LEVEL", "info").lower()

    logger.info(f"Starting Manifest API on {host}:{port}")

    server_options = {
        "host": host,
        "port": port,
        "log_level": log_level,
        "reload": False,
    }
    uvicorn.run("gco.services.manifest_api:app", **server_options)