Coverage for gco / services / api_shared.py: 98%
139 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 21:47 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 21:47 +0000
1"""
2Shared state, models, and helpers for the Manifest API routers.
4This module holds the global state (manifest processor, DynamoDB stores),
5Pydantic request/response models, and helper functions used across all API
6route modules. Centralizing them here avoids circular imports between
7manifest_api.py and the routers.
8"""
10from __future__ import annotations
12import logging
13from enum import StrEnum
14from typing import Any
16from fastapi import HTTPException
17from kubernetes.client.models import CoreV1Event, V1Job, V1Pod
18from pydantic import BaseModel, Field
20from gco.services.manifest_processor import ManifestProcessor
21from gco.services.metrics_publisher import ManifestProcessorMetrics
22from gco.services.template_store import (
23 JobStore,
24 TemplateStore,
25 WebhookStore,
26)
# Module-scoped logger, named after this module so log records can be
# filtered per component.
logger: logging.Logger = logging.getLogger(__name__)
31# ---------------------------------------------------------------------------
32# Shared enums and Pydantic models
33# ---------------------------------------------------------------------------
36class SortOrder(StrEnum):
37 ASC = "asc"
38 DESC = "desc"
41class JobStatus(StrEnum):
42 PENDING = "pending"
43 RUNNING = "running"
44 COMPLETED = "completed"
45 SUCCEEDED = "succeeded"
46 FAILED = "failed"
49class WebhookEvent(StrEnum):
50 JOB_COMPLETED = "job.completed"
51 JOB_FAILED = "job.failed"
52 JOB_STARTED = "job.started"
class ManifestSubmissionAPIRequest(BaseModel):
    """API model for manifest submission requests."""

    # Required: manifests to apply.
    manifests: list[dict[str, Any]] = Field(
        description="List of Kubernetes manifests to apply",
    )
    # Fallback namespace for manifests whose metadata omits one.
    namespace: str | None = Field(
        default=None,
        description="Default namespace for resources without namespace specified",
    )
    dry_run: bool = Field(
        default=False,
        description="If true, validate manifests without applying them",
    )
    # Sent on the wire as "validate"; the attribute name differs, presumably
    # to avoid shadowing BaseModel's own `validate` — confirm.
    validate_manifests: bool = Field(
        default=True,
        alias="validate",
        description="If true, perform validation checks on manifests",
    )

    model_config = {
        "json_schema_extra": {
            "example": {
                "manifests": [
                    {"apiVersion": "batch/v1", "kind": "Job", "metadata": {"name": "example"}},
                ],
                "namespace": "gco-jobs",
                "dry_run": False,
            },
        },
    }
class ResourceIdentifier(BaseModel):
    # Fully-qualified reference to one Kubernetes object; all fields required.
    api_version: str = Field(description="Kubernetes API version (e.g., 'apps/v1')")
    kind: str = Field(description="Kubernetes resource kind (e.g., 'Deployment')")
    name: str = Field(description="Resource name")
    namespace: str = Field(description="Resource namespace")
class BulkDeleteRequest(BaseModel):
    # Filters for a bulk job deletion; every filter is optional.
    namespace: str | None = Field(default=None, description="Filter by namespace")
    status: JobStatus | None = Field(default=None, description="Filter by status")
    # Age cut-off, bounded to 1..365 days.
    older_than_days: int | None = Field(
        default=None,
        ge=1,
        le=365,
        description="Delete jobs older than N days",
    )
    label_selector: str | None = Field(default=None, description="Kubernetes label selector")
    dry_run: bool = Field(default=False, description="If true, only return what would be deleted")

    model_config = {
        "json_schema_extra": {
            "example": {
                "namespace": "gco-jobs",
                "status": "completed",
                "older_than_days": 7,
                "dry_run": False,
            },
        },
    }
class JobTemplateRequest(BaseModel):
    # Payload for creating/updating a reusable job template.
    # Name length is capped at 63 characters.
    name: str = Field(min_length=1, max_length=63, description="Template name")
    description: str | None = Field(default=None, description="Template description")
    # Manifest body; may contain {{placeholder}} tokens.
    manifest: dict[str, Any] = Field(description="Job manifest template")
    parameters: dict[str, Any] | None = Field(default=None, description="Default parameter values")

    model_config = {
        "json_schema_extra": {
            "example": {
                "name": "gpu-training-template",
                "description": "Template for GPU training jobs",
                "manifest": {
                    "apiVersion": "batch/v1",
                    "kind": "Job",
                    "metadata": {"name": "{{name}}"},
                },
                "parameters": {"image": "pytorch/pytorch:latest"},
            },
        },
    }
class JobFromTemplateRequest(BaseModel):
    # Instantiate a job from a stored template, optionally overriding parameters.
    name: str = Field(min_length=1, max_length=63, description="Job name")
    namespace: str = Field(default="gco-jobs", description="Target namespace")
    parameters: dict[str, Any] | None = Field(default=None, description="Parameter overrides")

    model_config = {
        "json_schema_extra": {
            "example": {
                "name": "my-training-job",
                "namespace": "gco-jobs",
                "parameters": {"image": "my-custom-image:v1"},
            },
        },
    }
class WebhookRequest(BaseModel):
    # Webhook registration payload.
    # NOTE(review): `url` is an unvalidated str here — confirm the router
    # validates scheme/host before calling it.
    url: str = Field(description="Webhook URL to call")
    events: list[WebhookEvent] = Field(description="Events to subscribe to")
    namespace: str | None = Field(default=None, description="Filter by namespace (optional)")
    secret: str | None = Field(default=None, description="Secret for HMAC signature (optional)")

    model_config = {
        "json_schema_extra": {
            "example": {
                "url": "https://example.com/webhook",
                "events": ["job.completed", "job.failed"],
                "namespace": "gco-jobs",
            },
        },
    }
class QueuedJobRequest(BaseModel):
    # A job to enqueue for execution in a specific region.
    manifest: dict[str, Any] = Field(description="Kubernetes job manifest")
    target_region: str = Field(description="Target region for job execution")
    namespace: str = Field(default="gco-jobs", description="Kubernetes namespace")
    # Bounded to 0..100; larger values mean more important.
    priority: int = Field(
        default=0,
        ge=0,
        le=100,
        description="Job priority (higher = more important)",
    )
    labels: dict[str, str] | None = Field(default=None, description="Optional labels for filtering")

    model_config = {
        "json_schema_extra": {
            "example": {
                "manifest": {
                    "apiVersion": "batch/v1",
                    "kind": "Job",
                    "metadata": {"name": "my-training-job"},
                },
                "target_region": "us-east-1",
                "namespace": "gco-jobs",
                "priority": 10,
            },
        },
    }
class PaginatedResponse(BaseModel):
    # Pagination envelope fields; all required.
    total: int = Field(description="Total number of items")
    limit: int = Field(description="Items per page")
    offset: int = Field(description="Current offset")
    has_more: bool = Field(description="Whether more items exist")
class ErrorResponse(BaseModel):
    # Uniform error body; all fields required.
    error: str = Field(description="Error type")
    detail: str = Field(description="Error details")
    timestamp: str = Field(description="Error timestamp")
201# ---------------------------------------------------------------------------
202# Global state — populated by the lifespan handler in manifest_api.py
203# ---------------------------------------------------------------------------
# Declared types of the shared singletons; every one starts unset (None).
# NOTE(review): _check_processor reads manifest_api.manifest_processor, not
# the copy declared here — confirm which module the lifespan handler fills.
manifest_processor: ManifestProcessor | None
manifest_metrics: ManifestProcessorMetrics | None
template_store: TemplateStore | None
webhook_store: WebhookStore | None
job_store: JobStore | None
manifest_processor = manifest_metrics = template_store = webhook_store = job_store = None
211# ---------------------------------------------------------------------------
212# Helper functions
213# ---------------------------------------------------------------------------
def _check_processor() -> ManifestProcessor:
    """Return the initialized manifest processor, or fail the request with 503.

    The processor is looked up on the ``manifest_api`` module at call time:
    that is where the lifespan handler stores it and where tests patch it.
    Importing inside the function keeps the import out of module load time.

    Raises:
        HTTPException: status 503 while the processor is still ``None``.
    """
    from gco.services import manifest_api

    processor = manifest_api.manifest_processor
    if processor is not None:
        return processor
    raise HTTPException(status_code=503, detail="Manifest processor not initialized")
def _check_namespace(namespace: str, processor: ManifestProcessor) -> None:
    """Ensure ``namespace`` is in the processor's allow-list.

    Args:
        namespace: Namespace the request targets.
        processor: Processor whose ``allowed_namespaces`` is consulted.

    Raises:
        HTTPException: status 403, listing the allowed namespaces, when the
            namespace is not permitted.
    """
    permitted = processor.allowed_namespaces
    if namespace in permitted:
        return
    raise HTTPException(
        status_code=403,
        detail=f"Namespace '{namespace}' not allowed. Allowed: {list(permitted)}",
    )
def _parse_job_to_dict(job: V1Job) -> dict[str, Any]:
    """Flatten a Kubernetes ``V1Job`` into a JSON-friendly dictionary.

    ``computed_status`` is derived as follows: the first condition whose
    ``status`` is ``"True"`` and whose type is ``Complete`` or ``Failed``
    yields ``"succeeded"`` / ``"failed"``. Otherwise the job is ``"running"``
    when it has active pods, else ``"pending"``.

    Args:
        job: Job object as returned by the Kubernetes Python client.

    Returns:
        A dict with ``metadata``, ``spec``, ``status`` and ``computed_status``.
    """

    def _iso(moment: Any) -> str | None:
        # Unset timestamps are rendered as None.
        return moment.isoformat() if moment else None

    meta, spec, job_status = job.metadata, job.spec, job.status
    conditions = job_status.conditions or []
    active = job_status.active or 0

    terminal = {"Complete": "succeeded", "Failed": "failed"}
    computed_status = next(
        (terminal[c.type] for c in conditions if c.status == "True" and c.type in terminal),
        None,
    )
    if computed_status is None:
        computed_status = "running" if active > 0 else "pending"

    condition_rows = [
        {
            "type": c.type,
            "status": c.status,
            "reason": c.reason,
            "message": c.message,
            "lastTransitionTime": _iso(c.last_transition_time),
        }
        for c in conditions
    ]

    return {
        "metadata": {
            "name": meta.name,
            "namespace": meta.namespace,
            "creationTimestamp": _iso(meta.creation_timestamp),
            "labels": meta.labels or {},
            "annotations": meta.annotations or {},
            "uid": meta.uid,
        },
        "spec": {
            "parallelism": spec.parallelism,
            "completions": spec.completions,
            "backoffLimit": spec.backoff_limit,
        },
        "status": {
            "active": active,
            "succeeded": job_status.succeeded or 0,
            "failed": job_status.failed or 0,
            "startTime": _iso(job_status.start_time),
            "completionTime": _iso(job_status.completion_time),
            "conditions": condition_rows,
        },
        "computed_status": computed_status,
    }
def _parse_pod_to_dict(pod: V1Pod) -> dict[str, Any]:
    """Flatten a Kubernetes ``V1Pod`` into a JSON-friendly dictionary.

    Each entry of ``status.containerStatuses`` reports name, readiness,
    restart count and image. When a state is present, it also reports
    ``state`` plus the state-specific fields:
    - running: ``startedAt``
    - waiting: ``reason``
    - terminated: ``exitCode``, ``reason``

    Init-container entries report only name, readiness and restart count.

    Args:
        pod: Pod object as returned by the Kubernetes Python client.

    Returns:
        A dict with ``metadata``, ``spec`` and ``status`` sections.
    """

    def _iso(moment: Any) -> str | None:
        # Unset timestamps are rendered as None.
        return moment.isoformat() if moment else None

    def _state_fields(container_state: Any) -> dict[str, Any]:
        # Report only the first populated sub-state, checked in the order
        # running -> waiting -> terminated.
        if not container_state:
            return {}
        if container_state.running:
            return {"state": "running", "startedAt": _iso(container_state.running.started_at)}
        if container_state.waiting:
            return {"state": "waiting", "reason": container_state.waiting.reason}
        if container_state.terminated:
            ended = container_state.terminated
            return {"state": "terminated", "exitCode": ended.exit_code, "reason": ended.reason}
        return {}

    meta, spec, pod_status = pod.metadata, pod.spec, pod.status

    container_statuses = [
        {
            "name": entry.name,
            "ready": entry.ready,
            "restartCount": entry.restart_count,
            "image": entry.image,
            **_state_fields(entry.state),
        }
        for entry in pod_status.container_statuses or []
    ]
    init_container_statuses = [
        {"name": entry.name, "ready": entry.ready, "restartCount": entry.restart_count}
        for entry in pod_status.init_container_statuses or []
    ]

    return {
        "metadata": {
            "name": meta.name,
            "namespace": meta.namespace,
            "creationTimestamp": _iso(meta.creation_timestamp),
            "labels": meta.labels or {},
            "uid": meta.uid,
        },
        "spec": {
            "nodeName": spec.node_name,
            "containers": [{"name": c.name, "image": c.image} for c in spec.containers],
            "initContainers": [
                {"name": c.name, "image": c.image} for c in (spec.init_containers or [])
            ],
        },
        "status": {
            "phase": pod_status.phase,
            "hostIP": pod_status.host_ip,
            "podIP": pod_status.pod_ip,
            "startTime": _iso(pod_status.start_time),
            "containerStatuses": container_statuses,
            "initContainerStatuses": init_container_statuses,
        },
    }
def _parse_event_to_dict(event: CoreV1Event) -> dict[str, Any]:
    """Flatten a Kubernetes ``CoreV1Event`` into a JSON-friendly dictionary.

    A missing or zero ``count`` is reported as 1. A missing ``source`` or
    ``involved_object`` yields sub-dicts whose fields are all None.

    Args:
        event: Event object as returned by the Kubernetes Python client.

    Returns:
        A dict with type/reason/message, count, first/last timestamps, and
        ``source`` / ``involvedObject`` summaries.
    """
    origin = event.source
    subject = event.involved_object
    first_seen = event.first_timestamp
    last_seen = event.last_timestamp

    if origin:
        source_info = {"component": origin.component, "host": origin.host}
    else:
        source_info = {"component": None, "host": None}

    if subject:
        subject_info = {"kind": subject.kind, "name": subject.name, "namespace": subject.namespace}
    else:
        subject_info = {"kind": None, "name": None, "namespace": None}

    return {
        "type": event.type,
        "reason": event.reason,
        "message": event.message,
        "count": event.count or 1,
        "firstTimestamp": first_seen.isoformat() if first_seen else None,
        "lastTimestamp": last_seen.isoformat() if last_seen else None,
        "source": source_info,
        "involvedObject": subject_info,
    }
def _apply_template_parameters(
    manifest: dict[str, Any], parameters: dict[str, Any]
) -> dict[str, Any]:
    """Apply parameter substitutions to a manifest template.

    Every ``{{ key }}`` placeholder (whitespace inside the braces is allowed)
    found in a string anywhere in ``manifest`` is replaced by ``str(value)``.
    Replacement covers both dict keys and values, at any nesting depth.
    Parameters are applied in iteration order, so a value that itself
    contains ``{{other}}`` is expanded by a later parameter, as before.
    Unknown placeholders are left untouched. Non-string leaves (ints, bools,
    None, ...) are copied unchanged.

    Substitution happens on the decoded strings, not on serialized JSON text.
    Parameter values come from API callers, so this means:
    - a ``"`` in a value cannot break out of its string and inject new
      manifest keys;
    - backslashes and ``\\1``-style group references are inserted literally
      instead of being interpreted by ``re.sub``.

    Args:
        manifest: Template manifest; it is not modified.
        parameters: Mapping of placeholder name to substitution value.

    Returns:
        A new manifest with placeholders substituted. Tuples become lists,
        matching the previous JSON round-trip.
    """
    import re

    # Compile each placeholder pattern once; replacement text is str(value).
    substitutions = [
        (re.compile(r"\{\{\s*" + re.escape(str(key)) + r"\s*\}\}"), str(value))
        for key, value in parameters.items()
    ]

    def _substitute(text: str) -> str:
        for pattern, replacement in substitutions:
            # A callable replacement is inserted verbatim; a plain string
            # would have its backslash escapes processed by re.sub.
            text = pattern.sub(lambda _match, rep=replacement: rep, text)
        return text

    def _walk(node: Any) -> Any:
        # Rebuild containers so the caller's manifest is never mutated.
        if isinstance(node, str):
            return _substitute(node)
        if isinstance(node, dict):
            return {
                (_substitute(k) if isinstance(k, str) else k): _walk(v)
                for k, v in node.items()
            }
        if isinstance(node, (list, tuple)):
            return [_walk(item) for item in node]
        return node

    result: dict[str, Any] = _walk(manifest)
    return result