Coverage for gco / services / api_routes / queue.py: 97%
110 statements
« prev ^ index » next — coverage.py v7.13.5, created at 2026-04-30 21:47 +0000
1"""DynamoDB-backed global job queue endpoints."""
3from __future__ import annotations
5import logging
6from datetime import UTC, datetime
7from typing import TYPE_CHECKING
9from fastapi import APIRouter, HTTPException, Query
10from fastapi.responses import JSONResponse, Response
12from gco.models import ManifestSubmissionRequest
13from gco.services.api_shared import QueuedJobRequest, _check_processor
14from gco.services.template_store import JobStatus as JobStoreStatus
16if TYPE_CHECKING:
17 from gco.services.template_store import JobStore
# All routes in this module are mounted under /api/v1/queue and grouped
# under the "Job Queue" tag in the generated OpenAPI docs.
router = APIRouter(prefix="/api/v1/queue", tags=["Job Queue"])
# Module-level logger, named after this module per logging convention.
logger = logging.getLogger(__name__)
def _get_job_store() -> JobStore:
    """Return the process-wide job store, failing with HTTP 503 if unset.

    Imported lazily from ``gco.services.manifest_api`` to pick up the
    store instance initialized at application startup.
    """
    from gco.services.manifest_api import job_store

    if job_store is not None:
        return job_store
    raise HTTPException(status_code=503, detail="Job store not initialized")
@router.post("/jobs")
async def submit_job_to_queue(request: QueuedJobRequest) -> Response:
    """Submit a job to the global queue for regional pickup.

    Generates a fresh UUID job id, persists the job via the shared job
    store, and returns 201 with the stored job record. Any store failure
    is logged with traceback and surfaced as HTTP 500.
    """
    import uuid

    store = _get_job_store()
    job_id = str(uuid.uuid4())

    try:
        job = store.submit_job(
            job_id=job_id,
            manifest=request.manifest,
            target_region=request.target_region,
            namespace=request.namespace,
            priority=request.priority,
            labels=request.labels,
        )
    except Exception as e:
        # logger.exception keeps the traceback; lazy %-args defer formatting.
        logger.exception("Failed to queue job: %s", e)
        raise HTTPException(status_code=500, detail=f"Failed to queue job: {e!s}") from e

    return JSONResponse(
        status_code=201,
        content={
            "timestamp": datetime.now(UTC).isoformat(),
            "message": "Job queued successfully",
            "job": job,
        },
    )
@router.get("/jobs")
async def list_queued_jobs(
    target_region: str | None = Query(None, description="Filter by target region"),
    status: str | None = Query(None, description="Filter by status"),
    namespace: str | None = Query(None, description="Filter by namespace"),
    limit: int = Query(100, description="Maximum results", ge=1, le=1000),
) -> Response:
    """List jobs in the global queue with optional filters.

    All filters are optional and combined conjunctively by the store.
    Store failures are logged with traceback and surfaced as HTTP 500.
    """
    store = _get_job_store()
    try:
        jobs = store.list_jobs(
            target_region=target_region, status=status, namespace=namespace, limit=limit
        )
    except Exception as e:
        # logger.exception keeps the traceback; lazy %-args defer formatting.
        logger.exception("Failed to list queued jobs: %s", e)
        raise HTTPException(status_code=500, detail=f"Failed to list jobs: {e!s}") from e

    return JSONResponse(
        status_code=200,
        content={
            "timestamp": datetime.now(UTC).isoformat(),
            "count": len(jobs),
            "jobs": jobs,
        },
    )
@router.get("/jobs/{job_id}")
async def get_queued_job(job_id: str) -> Response:
    """Get details of a specific queued job.

    Returns 404 when the job id is unknown; store failures are logged
    with traceback and surfaced as HTTP 500.
    """
    store = _get_job_store()
    try:
        job = store.get_job(job_id)
    except Exception as e:
        # logger.exception keeps the traceback; lazy %-args defer formatting.
        logger.exception("Failed to get job %s: %s", job_id, e)
        raise HTTPException(status_code=500, detail=f"Failed to get job: {e!s}") from e

    # Raised outside the try so it is never mistaken for a store failure.
    if job is None:
        raise HTTPException(status_code=404, detail=f"Job '{job_id}' not found")

    return JSONResponse(
        status_code=200,
        content={"timestamp": datetime.now(UTC).isoformat(), "job": job},
    )
@router.delete("/jobs/{job_id}")
async def cancel_queued_job(
    job_id: str, reason: str | None = Query(None, description="Cancellation reason")
) -> Response:
    """Cancel a queued job (only works for jobs not yet running).

    Returns 409 when the store refuses the cancellation (job already
    running or completed); store failures are logged with traceback and
    surfaced as HTTP 500.
    """
    store = _get_job_store()
    try:
        cancelled = store.cancel_job(job_id, reason=reason)
    except Exception as e:
        # logger.exception keeps the traceback; lazy %-args defer formatting.
        logger.exception("Failed to cancel job %s: %s", job_id, e)
        raise HTTPException(status_code=500, detail=f"Failed to cancel job: {e!s}") from e

    # Raised outside the try so it is never mistaken for a store failure.
    if not cancelled:
        raise HTTPException(
            status_code=409,
            detail=f"Job '{job_id}' cannot be cancelled (already running or completed)",
        )

    return JSONResponse(
        status_code=200,
        content={
            "timestamp": datetime.now(UTC).isoformat(),
            "message": f"Job '{job_id}' cancelled successfully",
        },
    )
@router.get("/stats")
async def get_queue_stats() -> Response:
    """Get job queue statistics by region and status.

    Aggregates per-region status counts from the store into overall
    totals plus "queued" and "running" tallies. Store failures are
    logged with traceback and surfaced as HTTP 500.
    """
    store = _get_job_store()
    try:
        counts = store.get_job_counts_by_region()
    except Exception as e:
        # logger.exception keeps the traceback; lazy %-args defer formatting.
        logger.exception("Failed to get queue stats: %s", e)
        raise HTTPException(status_code=500, detail=f"Failed to get stats: {e!s}") from e

    # counts: {region: {status: count}} — roll up across regions.
    total_jobs = sum(sum(statuses.values()) for statuses in counts.values())
    total_queued = sum(statuses.get("queued", 0) for statuses in counts.values())
    total_running = sum(statuses.get("running", 0) for statuses in counts.values())

    return JSONResponse(
        status_code=200,
        content={
            "timestamp": datetime.now(UTC).isoformat(),
            "summary": {
                "total_jobs": total_jobs,
                "total_queued": total_queued,
                "total_running": total_running,
            },
            "by_region": counts,
        },
    )
@router.post("/poll")
async def poll_and_process_jobs(
    limit: int = Query(5, description="Maximum jobs to process", ge=1, le=20),
) -> Response:
    """Poll for queued jobs and process them (called by regional processors).

    For each queued job targeting this processor's region: atomically claim
    it via the store, apply its manifest to Kubernetes, and record the
    outcome. Jobs another poller claimed first are skipped. A per-job
    failure marks that job FAILED without aborting the rest of the batch;
    only store/poll-level errors surface as HTTP 500.
    """
    processor = _check_processor()
    store = _get_job_store()

    region = processor.region
    processed_jobs = []

    try:
        queued_jobs = store.get_queued_jobs_for_region(region, limit=limit)

        for queued_job in queued_jobs:
            job_id = queued_job["job_id"]

            # claim_job is the concurrency gate: only one regional poller
            # wins the claim; losers move on to the next job.
            claimed = store.claim_job(job_id, claimed_by=region)
            if not claimed:
                continue

            try:
                store.update_job_status(
                    job_id, JobStoreStatus.APPLYING, message="Applying to Kubernetes"
                )

                submission_request = ManifestSubmissionRequest(
                    manifests=[queued_job["manifest"]],
                    namespace=queued_job["namespace"],
                    dry_run=False,
                    validate=True,
                )

                result = await processor.process_manifest_submission(submission_request)

                if result.success:
                    # UID of the first created resource, when any were reported.
                    k8s_uid = result.resources[0].uid if result.resources else None
                    store.update_job_status(
                        job_id,
                        JobStoreStatus.PENDING,
                        message="Applied to Kubernetes, waiting for scheduling",
                        k8s_job_uid=k8s_uid,
                    )
                    processed_jobs.append(
                        {"job_id": job_id, "status": "applied", "k8s_uid": k8s_uid}
                    )
                else:
                    error_msg = "; ".join(result.errors) if result.errors else "Unknown error"
                    store.update_job_status(
                        job_id,
                        JobStoreStatus.FAILED,
                        message="Failed to apply to Kubernetes",
                        error=error_msg,
                    )
                    processed_jobs.append(
                        {"job_id": job_id, "status": "failed", "error": error_msg}
                    )

            except Exception as e:
                # One bad job must not abort the polling batch; record and continue.
                logger.exception("Failed to process job %s: %s", job_id, e)
                store.update_job_status(
                    job_id, JobStoreStatus.FAILED, message="Processing error", error=str(e)
                )
                processed_jobs.append({"job_id": job_id, "status": "failed", "error": str(e)})

        return JSONResponse(
            status_code=200,
            content={
                "timestamp": datetime.now(UTC).isoformat(),
                "region": region,
                "jobs_polled": len(queued_jobs),
                "jobs_processed": len(processed_jobs),
                "results": processed_jobs,
            },
        )

    except Exception as e:
        # logger.exception keeps the traceback; lazy %-args defer formatting.
        logger.exception("Failed to poll jobs: %s", e)
        raise HTTPException(status_code=500, detail=f"Failed to poll jobs: {e!s}") from e