Coverage for gco/services/api_shared.py: 98%
145 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
1"""
2Shared state, models, and helpers for the Manifest API routers.
4This module holds the global state (manifest processor, DynamoDB stores),
5Pydantic request/response models, and helper functions used across all API
6route modules. Centralizing them here avoids circular imports between
7manifest_api.py and the routers.
8"""
10from __future__ import annotations
12import logging
13from enum import StrEnum
14from typing import Any
16from fastapi import HTTPException
17from kubernetes.client.models import CoreV1Event, V1Job, V1Pod
18from pydantic import BaseModel, Field
20from gco.services.manifest_processor import ManifestProcessor
21from gco.services.metrics_publisher import ManifestProcessorMetrics
22from gco.services.template_store import (
23 JobStore,
24 TemplateStore,
25 WebhookStore,
26)
28logger = logging.getLogger(__name__)
31# ---------------------------------------------------------------------------
32# Shared enums and Pydantic models
33# ---------------------------------------------------------------------------
36class SortOrder(StrEnum):
37 ASC = "asc"
38 DESC = "desc"
41class JobStatus(StrEnum):
42 PENDING = "pending"
43 RUNNING = "running"
44 COMPLETED = "completed"
45 SUCCEEDED = "succeeded"
46 FAILED = "failed"
49class WebhookEvent(StrEnum):
50 JOB_COMPLETED = "job.completed"
51 JOB_FAILED = "job.failed"
52 JOB_STARTED = "job.started"
class ManifestSubmissionAPIRequest(BaseModel):
    """API model for manifest submission requests."""

    # Required: at least the key must be present in the request body.
    manifests: list[dict[str, Any]] = Field(
        default=...,
        description="List of Kubernetes manifests to apply",
    )
    namespace: str | None = Field(
        default=None,
        description="Default namespace for resources without namespace specified",
    )
    dry_run: bool = Field(
        default=False,
        description="If true, validate manifests without applying them",
    )
    # The attribute is called validate_manifests in Python, but the field is
    # declared with the alias "validate".
    validate_manifests: bool = Field(
        default=True,
        alias="validate",
        description="If true, perform validation checks on manifests",
    )

    # Example payload attached to the generated JSON schema.
    model_config = {
        "json_schema_extra": {
            "example": {
                "manifests": [
                    {"apiVersion": "batch/v1", "kind": "Job", "metadata": {"name": "example"}}
                ],
                "namespace": "gco-jobs",
                "dry_run": False,
            }
        }
    }
class ResourceIdentifier(BaseModel):
    # Names one Kubernetes resource. All four fields are required.
    # (Comment rather than docstring so the generated schema is unchanged.)
    api_version: str = Field(
        default=..., description="Kubernetes API version (e.g., 'apps/v1')"
    )
    kind: str = Field(
        default=..., description="Kubernetes resource kind (e.g., 'Deployment')"
    )
    name: str = Field(default=..., description="Resource name")
    namespace: str = Field(default=..., description="Resource namespace")
class BulkDeleteRequest(BaseModel):
    # Filter criteria for a bulk job deletion. Every filter is optional and
    # defaults to None; dry_run defaults to False.
    # (Comment rather than docstring so the generated schema is unchanged.)
    namespace: str | None = Field(default=None, description="Filter by namespace")
    status: JobStatus | None = Field(default=None, description="Filter by status")
    # Bounded to the inclusive range 1..365.
    older_than_days: int | None = Field(
        default=None,
        ge=1,
        le=365,
        description="Delete jobs older than N days",
    )
    label_selector: str | None = Field(
        default=None, description="Kubernetes label selector"
    )
    dry_run: bool = Field(
        default=False, description="If true, only return what would be deleted"
    )

    # Example payload attached to the generated JSON schema.
    model_config = {
        "json_schema_extra": {
            "example": {
                "namespace": "gco-jobs",
                "status": "completed",
                "older_than_days": 7,
                "dry_run": False,
            }
        }
    }
class JobTemplateRequest(BaseModel):
    # Payload describing a reusable job template: a required name (1-63
    # characters) and manifest, plus an optional description and optional
    # default parameter values.
    # (Comment rather than docstring so the generated schema is unchanged.)
    name: str = Field(
        default=...,
        min_length=1,
        max_length=63,
        description="Template name",
    )
    description: str | None = Field(default=None, description="Template description")
    manifest: dict[str, Any] = Field(default=..., description="Job manifest template")
    parameters: dict[str, Any] | None = Field(
        default=None, description="Default parameter values"
    )

    # Example payload attached to the generated JSON schema; the manifest
    # shows the "{{name}}" placeholder syntax.
    model_config = {
        "json_schema_extra": {
            "example": {
                "name": "gpu-training-template",
                "description": "Template for GPU training jobs",
                "manifest": {
                    "apiVersion": "batch/v1",
                    "kind": "Job",
                    "metadata": {"name": "{{name}}"},
                },
                "parameters": {"image": "pytorch/pytorch:latest"},
            }
        }
    }
class JobFromTemplateRequest(BaseModel):
    # Payload for creating a job from a template: a required job name
    # (1-63 characters), a namespace defaulting to "gco-jobs", and optional
    # parameter overrides.
    # (Comment rather than docstring so the generated schema is unchanged.)
    name: str = Field(
        default=...,
        min_length=1,
        max_length=63,
        description="Job name",
    )
    namespace: str = Field(default="gco-jobs", description="Target namespace")
    parameters: dict[str, Any] | None = Field(
        default=None, description="Parameter overrides"
    )

    # Example payload attached to the generated JSON schema.
    model_config = {
        "json_schema_extra": {
            "example": {
                "name": "my-training-job",
                "namespace": "gco-jobs",
                "parameters": {"image": "my-custom-image:v1"},
            }
        }
    }
class WebhookRequest(BaseModel):
    # Payload for a webhook subscription: required URL and event list, plus
    # an optional namespace filter and optional HMAC secret.
    # (Comment rather than docstring so the generated schema is unchanged.)
    # NOTE(review): `url` is typed as a plain str, so this model performs no
    # URL-format validation itself.
    url: str = Field(default=..., description="Webhook URL to call")
    events: list[WebhookEvent] = Field(
        default=..., description="Events to subscribe to"
    )
    namespace: str | None = Field(
        default=None, description="Filter by namespace (optional)"
    )
    secret: str | None = Field(
        default=None, description="Secret for HMAC signature (optional)"
    )

    # Example payload attached to the generated JSON schema.
    model_config = {
        "json_schema_extra": {
            "example": {
                "url": "https://example.com/webhook",
                "events": ["job.completed", "job.failed"],
                "namespace": "gco-jobs",
            }
        }
    }
class QueuedJobRequest(BaseModel):
    # Payload for queueing a job: required manifest and target region, a
    # namespace defaulting to "gco-jobs", a priority in 0..100 (default 0),
    # and optional string labels.
    # (Comment rather than docstring so the generated schema is unchanged.)
    manifest: dict[str, Any] = Field(default=..., description="Kubernetes job manifest")
    target_region: str = Field(
        default=..., description="Target region for job execution"
    )
    namespace: str = Field(default="gco-jobs", description="Kubernetes namespace")
    priority: int = Field(
        default=0,
        ge=0,
        le=100,
        description="Job priority (higher = more important)",
    )
    labels: dict[str, str] | None = Field(
        default=None, description="Optional labels for filtering"
    )

    # Example payload attached to the generated JSON schema.
    model_config = {
        "json_schema_extra": {
            "example": {
                "manifest": {
                    "apiVersion": "batch/v1",
                    "kind": "Job",
                    "metadata": {"name": "my-training-job"},
                },
                "target_region": "us-east-1",
                "namespace": "gco-jobs",
                "priority": 10,
            }
        }
    }
class PaginatedResponse(BaseModel):
    # Pagination envelope fields; all four are required.
    # (Comment rather than docstring so the generated schema is unchanged.)
    total: int = Field(default=..., description="Total number of items")
    limit: int = Field(default=..., description="Items per page")
    offset: int = Field(default=..., description="Current offset")
    has_more: bool = Field(default=..., description="Whether more items exist")
class ErrorResponse(BaseModel):
    # Error payload; all three fields are required strings (the timestamp is
    # carried as text, not as a datetime).
    # (Comment rather than docstring so the generated schema is unchanged.)
    error: str = Field(default=..., description="Error type")
    detail: str = Field(default=..., description="Error details")
    timestamp: str = Field(default=..., description="Error timestamp")
201# ---------------------------------------------------------------------------
202# Global state — populated by the lifespan handler in manifest_api.py
203# ---------------------------------------------------------------------------
# Every handle starts out as None: nothing is constructed at import time.
# NOTE(review): _check_processor() reads `manifest_processor` from the
# gco.services.manifest_api module rather than from this one — confirm which
# module's attribute the lifespan handler actually assigns.
manifest_processor: ManifestProcessor | None = None
manifest_metrics: ManifestProcessorMetrics | None = None

# Store handles, likewise unset until something assigns them.
template_store: TemplateStore | None = None
webhook_store: WebhookStore | None = None
job_store: JobStore | None = None
211# ---------------------------------------------------------------------------
212# Helper functions
213# ---------------------------------------------------------------------------
def _check_processor() -> ManifestProcessor:
    """Return the active manifest processor or fail with HTTP 503.

    Returns:
        The ``manifest_processor`` attribute of ``gco.services.manifest_api``.

    Raises:
        HTTPException: status 503 when that attribute is still ``None``.
    """
    # Deliberately imported inside the function: the processor lives on the
    # manifest_api module (lifespan sets it there and tests patch it there),
    # so it has to be looked up at call time.
    from gco.services import manifest_api as _api

    processor = _api.manifest_processor
    if processor is not None:
        return processor
    raise HTTPException(status_code=503, detail="Manifest processor not initialized")
def _check_namespace(namespace: str, processor: ManifestProcessor) -> None:
    """Reject namespaces the processor does not allow.

    Args:
        namespace: Namespace named by the request.
        processor: Object whose ``allowed_namespaces`` collection is consulted.

    Raises:
        HTTPException: status 403 when ``namespace`` is not in
            ``processor.allowed_namespaces``; the detail message lists the
            allowed namespaces.
    """
    allowed = processor.allowed_namespaces
    if namespace in allowed:
        return
    raise HTTPException(
        status_code=403,
        detail=f"Namespace '{namespace}' not allowed. Allowed: {list(allowed)}",
    )
def _parse_job_to_dict(job: V1Job) -> dict[str, Any]:
    """Flatten a Kubernetes Job into a plain, JSON-friendly dictionary.

    The result has ``metadata``, ``spec`` and ``status`` sections plus a
    ``computed_status`` string derived as follows:

    * the first condition (in list order) whose status is ``"True"`` and whose
      type is ``Complete`` or ``Failed`` yields ``"succeeded"`` / ``"failed"``;
    * otherwise ``"running"`` when the job reports active pods;
    * otherwise ``"pending"``.

    Args:
        job: Job object exposing ``metadata``, ``spec`` and ``status``.

    Returns:
        The dictionary described above. Timestamps are ISO-8601 strings or
        ``None``; missing counters are reported as ``0``.
    """

    def _iso(moment: Any) -> str | None:
        # Absent timestamps stay None instead of raising.
        return moment.isoformat() if moment else None

    def _name_and_image(items: Any) -> list[dict[str, Any]]:
        # Missing attributes fall back to "" rather than failing.
        return [
            {"name": getattr(item, "name", ""), "image": getattr(item, "image", "")}
            for item in items
        ]

    meta = job.metadata
    job_status = job.status
    job_spec = job.spec

    conditions = job_status.conditions or []

    # The earliest terminal condition that is "True" decides the outcome.
    outcome_by_type = {"Complete": "succeeded", "Failed": "failed"}
    computed_status = next(
        (
            outcome_by_type[cond.type]
            for cond in conditions
            if cond.type in outcome_by_type and cond.status == "True"
        ),
        "pending",
    )
    if computed_status == "pending" and (job_status.active or 0) > 0:
        computed_status = "running"

    # Container image refs are read from the pod template so callers (e.g.
    # the orphan-image cross-reference) can tell which images are in use
    # without a further lookup per job. Every hop tolerates a missing value.
    pod_template = getattr(job_spec, "template", None)
    pod_spec = None if pod_template is None else getattr(pod_template, "spec", None)
    container_specs = _name_and_image(getattr(pod_spec, "containers", None) or [])
    init_container_specs = _name_and_image(
        getattr(pod_spec, "init_containers", None) or []
    )

    metadata_section = {
        "name": meta.name,
        "namespace": meta.namespace,
        "creationTimestamp": _iso(meta.creation_timestamp),
        "labels": meta.labels or {},
        "annotations": meta.annotations or {},
        "uid": meta.uid,
    }
    spec_section = {
        "parallelism": job_spec.parallelism,
        "completions": job_spec.completions,
        "backoffLimit": job_spec.backoff_limit,
        "template": {
            "spec": {
                "containers": container_specs,
                "initContainers": init_container_specs,
            },
        },
    }
    status_section = {
        "active": job_status.active or 0,
        "succeeded": job_status.succeeded or 0,
        "failed": job_status.failed or 0,
        "startTime": _iso(job_status.start_time),
        "completionTime": _iso(job_status.completion_time),
        "conditions": [
            {
                "type": cond.type,
                "status": cond.status,
                "reason": cond.reason,
                "message": cond.message,
                "lastTransitionTime": _iso(cond.last_transition_time),
            }
            for cond in conditions
        ],
    }

    return {
        "metadata": metadata_section,
        "spec": spec_section,
        "status": status_section,
        "computed_status": computed_status,
    }
def _parse_pod_to_dict(pod: V1Pod) -> dict[str, Any]:
    """Flatten a Kubernetes Pod into a plain, JSON-friendly dictionary.

    Args:
        pod: Pod object exposing ``metadata``, ``spec`` and ``status``.

    Returns:
        A dictionary with ``metadata``, ``spec`` and ``status`` sections.
        Each regular container status carries name, readiness, restart count
        and image, plus state details when a state is reported. Init
        container statuses carry only name, readiness and restart count.
    """

    def _iso(moment: Any) -> str | None:
        # Absent timestamps stay None instead of raising.
        return moment.isoformat() if moment else None

    def _describe_container(cs: Any) -> dict[str, Any]:
        entry: dict[str, Any] = {
            "name": cs.name,
            "ready": cs.ready,
            "restartCount": cs.restart_count,
            "image": cs.image,
        }
        state = cs.state
        if not state:
            return entry
        # Checked in this order: running, then waiting, then terminated.
        if state.running:
            entry.update(
                {"state": "running", "startedAt": _iso(state.running.started_at)}
            )
        elif state.waiting:
            entry.update({"state": "waiting", "reason": state.waiting.reason})
        elif state.terminated:
            entry.update(
                {
                    "state": "terminated",
                    "exitCode": state.terminated.exit_code,
                    "reason": state.terminated.reason,
                }
            )
        return entry

    meta = pod.metadata
    pod_status = pod.status
    pod_spec = pod.spec

    container_statuses = [
        _describe_container(cs) for cs in pod_status.container_statuses or []
    ]
    init_container_statuses = [
        {"name": cs.name, "ready": cs.ready, "restartCount": cs.restart_count}
        for cs in pod_status.init_container_statuses or []
    ]

    metadata_section = {
        "name": meta.name,
        "namespace": meta.namespace,
        "creationTimestamp": _iso(meta.creation_timestamp),
        "labels": meta.labels or {},
        "uid": meta.uid,
    }
    spec_section = {
        "nodeName": pod_spec.node_name,
        "containers": [
            {"name": item.name, "image": item.image} for item in pod_spec.containers
        ],
        "initContainers": [
            {"name": item.name, "image": item.image}
            for item in (pod_spec.init_containers or [])
        ],
    }
    status_section = {
        "phase": pod_status.phase,
        "hostIP": pod_status.host_ip,
        "podIP": pod_status.pod_ip,
        "startTime": _iso(pod_status.start_time),
        "containerStatuses": container_statuses,
        "initContainerStatuses": init_container_statuses,
    }

    return {
        "metadata": metadata_section,
        "spec": spec_section,
        "status": status_section,
    }
def _parse_event_to_dict(event: CoreV1Event) -> dict[str, Any]:
    """Flatten a Kubernetes Event into a plain, JSON-friendly dictionary.

    Args:
        event: Event object to convert.

    Returns:
        A dictionary with the event type, reason, message and count (a
        missing or zero count is reported as ``1``), ISO-8601 first/last
        timestamps (``None`` when absent), and ``source`` /
        ``involvedObject`` sub-dictionaries whose values are all ``None``
        when the corresponding object is missing.
    """
    source = event.source
    involved = event.involved_object
    first_seen = event.first_timestamp
    last_seen = event.last_timestamp

    source_section = {
        "component": source.component if source else None,
        "host": source.host if source else None,
    }
    involved_section = {
        "kind": involved.kind if involved else None,
        "name": involved.name if involved else None,
        "namespace": involved.namespace if involved else None,
    }

    return {
        "type": event.type,
        "reason": event.reason,
        "message": event.message,
        "count": event.count or 1,
        "firstTimestamp": first_seen.isoformat() if first_seen else None,
        "lastTimestamp": last_seen.isoformat() if last_seen else None,
        "source": source_section,
        "involvedObject": involved_section,
    }
def _apply_template_parameters(
    manifest: dict[str, Any], parameters: dict[str, Any]
) -> dict[str, Any]:
    """Apply parameter substitutions to a manifest template.

    Every ``{{key}}`` placeholder (optional whitespace inside the braces) is
    replaced by ``str(value)`` for the matching entry in ``parameters``. The
    substitution runs on the JSON-serialized manifest, so placeholders are
    found anywhere in the structure. The input manifest is not modified.

    Parameter values are treated as literal text: they are JSON-escaped
    before insertion and never interpreted as a regex replacement template.
    Quotes, backslashes and control characters in a value therefore survive
    unchanged and cannot alter the manifest's structure.

    Parameters are applied one after another in dictionary order, so a value
    that itself contains a later parameter's placeholder is expanded by that
    later pass.

    Args:
        manifest: Template manifest, possibly containing placeholders.
        parameters: Mapping of placeholder name to replacement value.

    Returns:
        A new manifest dictionary with all known placeholders substituted.
    """
    import json
    import re

    manifest_str = json.dumps(manifest)
    for key, value in parameters.items():
        # Match the key as it appears inside the serialized manifest, i.e.
        # in its JSON-escaped form (matters for quotes and non-ASCII names).
        encoded_key = json.dumps(str(key))[1:-1]
        pattern = r"\{\{\s*" + re.escape(encoded_key) + r"\s*\}\}"

        # JSON-escape the value so it stays inside the string literal it is
        # spliced into; the surrounding quotes from json.dumps are dropped.
        encoded_value = json.dumps(str(value))[1:-1]

        # A function replacement is inserted verbatim, whereas a string
        # replacement would have its backslash escapes and group references
        # (e.g. "\1") processed by re.sub.
        manifest_str = re.sub(
            pattern, lambda _match, _text=encoded_value: _text, manifest_str
        )
    result: dict[str, Any] = json.loads(manifest_str)
    return result