Coverage for gco/services/api_routes/queue.py: 97%
111 statements
coverage.py v7.14.1, created at 2026-06-15 15:07 +0000
1"""DynamoDB-backed global job queue endpoints."""
3from __future__ import annotations
5import logging
6from datetime import UTC, datetime
7from typing import TYPE_CHECKING
9from fastapi import APIRouter, HTTPException, Query
10from fastapi.responses import JSONResponse, Response
12from gco.models import ManifestSubmissionRequest
13from gco.services.api_shared import QueuedJobRequest, _check_processor
14from gco.services.structured_logging import sanitize_log_value
15from gco.services.template_store import JobStatus as JobStoreStatus
17if TYPE_CHECKING:
18 from gco.services.template_store import JobStore
# Module logger, named after this module per the standard logging convention.
logger = logging.getLogger(__name__)

# Every endpoint below is mounted under /api/v1/queue and grouped under the
# "Job Queue" tag in the generated OpenAPI schema.
router = APIRouter(tags=["Job Queue"], prefix="/api/v1/queue")
def _get_job_store() -> JobStore:
    """Return the shared job store, or fail the request with HTTP 503.

    ``job_store`` is read from ``gco.services.manifest_api`` at call time, not
    at module import time, so whatever value is currently assigned there is
    used. (Presumably the deferred import also avoids an import cycle between
    the two modules — confirm.)

    Raises:
        HTTPException: 503 when ``job_store`` is still ``None``.
    """
    from gco.services.manifest_api import job_store

    if job_store is not None:
        return job_store
    raise HTTPException(status_code=503, detail="Job store not initialized")
@router.post("/jobs")
async def submit_job_to_queue(request: QueuedJobRequest) -> Response:
    """Submit a job to the global queue for regional pickup.

    A fresh UUID4 string is generated as the job id. The request's manifest,
    target region, namespace, priority and labels are passed to
    ``JobStore.submit_job``.

    Args:
        request: The validated queue submission body.

    Returns:
        A 201 JSON response with a timestamp, a confirmation message and the
        job record returned by the store.

    Raises:
        HTTPException: 503 if the job store is not initialized; 500 if the
            store fails to accept the job.
    """
    import uuid

    store = _get_job_store()
    job_id = str(uuid.uuid4())

    try:
        job = store.submit_job(
            job_id=job_id,
            manifest=request.manifest,
            target_region=request.target_region,
            namespace=request.namespace,
            priority=request.priority,
            labels=request.labels,
        )
        return JSONResponse(
            status_code=201,
            content={
                "timestamp": datetime.now(UTC).isoformat(),
                "message": "Job queued successfully",
                "job": job,
            },
        )
    except HTTPException:
        # Deliberate HTTP errors pass through unchanged, as in the other endpoints.
        raise
    except Exception as e:
        # logger.exception keeps the traceback; %-args defer formatting to logging.
        # job_id is server-generated (uuid4), so it needs no sanitizing.
        logger.exception("Failed to queue job %s: %s", job_id, e)
        # NOTE(review): the detail echoes internal error text to the client;
        # kept for backward compatibility, consider a generic message.
        raise HTTPException(status_code=500, detail=f"Failed to queue job: {e!s}") from e
@router.get("/jobs")
async def list_queued_jobs(
    target_region: str | None = Query(None, description="Filter by target region"),
    status: str | None = Query(None, description="Filter by status"),
    namespace: str | None = Query(None, description="Filter by namespace"),
    limit: int = Query(100, description="Maximum results", ge=1, le=1000),
) -> Response:
    """List jobs in the global queue with optional filters.

    Args:
        target_region: Only return jobs targeting this region, if given.
        status: Only return jobs with this status, if given. The value is
            passed to the store as-is and is not validated here.
        namespace: Only return jobs in this namespace, if given.
        limit: Maximum number of jobs to request from the store (1-1000).

    Returns:
        A 200 JSON response with a timestamp, the number of jobs returned and
        the job records themselves.

    Raises:
        HTTPException: 503 if the job store is not initialized; 500 if the
            store query fails.
    """
    store = _get_job_store()
    try:
        jobs = store.list_jobs(
            target_region=target_region, status=status, namespace=namespace, limit=limit
        )
        return JSONResponse(
            status_code=200,
            content={
                "timestamp": datetime.now(UTC).isoformat(),
                "count": len(jobs),
                "jobs": jobs,
            },
        )
    except HTTPException:
        # Deliberate HTTP errors pass through unchanged, as in the other endpoints.
        raise
    except Exception as e:
        # logger.exception keeps the traceback; %-args defer formatting to logging.
        logger.exception("Failed to list queued jobs: %s", e)
        raise HTTPException(status_code=500, detail=f"Failed to list jobs: {e!s}") from e
@router.get("/jobs/{job_id}")
async def get_queued_job(job_id: str) -> Response:
    """Return a single job record from the global queue.

    Args:
        job_id: Identifier of the job to look up.

    Returns:
        A 200 JSON response with a timestamp and the stored job record.

    Raises:
        HTTPException: 503 if the job store is not initialized; 404 if
            ``JobStore.get_job`` returns ``None``; 500 on any other error.
    """
    store = _get_job_store()
    try:
        record = store.get_job(job_id)
        if record is not None:
            return JSONResponse(
                status_code=200,
                content={"timestamp": datetime.now(UTC).isoformat(), "job": record},
            )
        raise HTTPException(status_code=404, detail=f"Job '{job_id}' not found")
    except HTTPException:
        # The 404 above must not be rewrapped as a 500 by the handler below.
        raise
    except Exception as exc:
        logger.error("Failed to get job %s: %s", sanitize_log_value(job_id), exc)
        raise HTTPException(status_code=500, detail=f"Failed to get job: {exc!s}") from exc
@router.delete("/jobs/{job_id}")
async def cancel_queued_job(
    job_id: str, reason: str | None = Query(None, description="Cancellation reason")
) -> Response:
    """Cancel a job in the global queue (only jobs not yet running).

    Whether cancellation is allowed is decided by ``JobStore.cancel_job``; a
    falsy result is reported to the caller as a conflict.

    Args:
        job_id: Identifier of the job to cancel.
        reason: Optional free-text reason forwarded to the store.

    Returns:
        A 200 JSON response with a timestamp and a confirmation message.

    Raises:
        HTTPException: 503 if the job store is not initialized; 409 if the
            store refuses the cancellation; 500 on any other error.
    """
    store = _get_job_store()
    try:
        if store.cancel_job(job_id, reason=reason):
            return JSONResponse(
                status_code=200,
                content={
                    "timestamp": datetime.now(UTC).isoformat(),
                    "message": f"Job '{job_id}' cancelled successfully",
                },
            )
        raise HTTPException(
            status_code=409,
            detail=f"Job '{job_id}' cannot be cancelled (already running or completed)",
        )
    except HTTPException:
        # The 409 above must not be rewrapped as a 500 by the handler below.
        raise
    except Exception as exc:
        logger.error("Failed to cancel job %s: %s", sanitize_log_value(job_id), exc)
        raise HTTPException(status_code=500, detail=f"Failed to cancel job: {exc!s}") from exc
@router.get("/stats")
async def get_queue_stats() -> Response:
    """Get job queue statistics by region and status.

    ``JobStore.get_job_counts_by_region`` is expected to return a mapping of
    region -> {status -> count}. The totals below are derived from it, and the
    raw mapping is returned as ``by_region``.

    Returns:
        A 200 JSON response with a timestamp, a ``summary`` (``total_jobs``,
        ``total_queued``, ``total_running``) and the per-region breakdown.

    Raises:
        HTTPException: 503 if the job store is not initialized; 500 if the
            store query fails.
    """
    store = _get_job_store()
    try:
        counts = store.get_job_counts_by_region()
        total_jobs = sum(sum(statuses.values()) for statuses in counts.values())
        # Statuses missing from a region's mapping count as zero.
        total_queued = sum(statuses.get("queued", 0) for statuses in counts.values())
        total_running = sum(statuses.get("running", 0) for statuses in counts.values())

        return JSONResponse(
            status_code=200,
            content={
                "timestamp": datetime.now(UTC).isoformat(),
                "summary": {
                    "total_jobs": total_jobs,
                    "total_queued": total_queued,
                    "total_running": total_running,
                },
                "by_region": counts,
            },
        )
    except HTTPException:
        # Deliberate HTTP errors pass through unchanged, as in the other endpoints.
        raise
    except Exception as e:
        # logger.exception keeps the traceback; %-args defer formatting to logging.
        logger.exception("Failed to get queue stats: %s", e)
        raise HTTPException(status_code=500, detail=f"Failed to get stats: {e!s}") from e
def _mark_job_failed_best_effort(
    store: JobStore, job_id: str, *, message: str, error: str
) -> None:
    """Set ``job_id`` to FAILED in the store, logging instead of raising on error.

    Used by the per-job error handler in ``poll_and_process_jobs``. A store
    failure while recording one job's failure must not abort the whole poll
    and discard the results already collected for other claimed jobs.
    """
    try:
        store.update_job_status(job_id, JobStoreStatus.FAILED, message=message, error=error)
    except Exception:
        logger.exception("Failed to mark job %s as failed", sanitize_log_value(job_id))


@router.post("/poll")
async def poll_and_process_jobs(
    limit: int = Query(5, description="Maximum jobs to process", ge=1, le=20),
) -> Response:
    """Poll for queued jobs and process them (called by regional processors).

    Up to ``limit`` queued jobs for this processor's region are fetched.
    Each job is claimed, marked APPLYING, and its manifest is submitted
    through the processor. The job is then set to PENDING on success or
    FAILED otherwise. A failure while processing one job is recorded in
    the results and does not stop the remaining jobs.

    Args:
        limit: Maximum number of queued jobs to fetch in this poll (1-20).

    Returns:
        A 200 JSON response with a timestamp, the region, the number of jobs
        fetched, the number actually processed (claimed), and per-job results.

    Raises:
        HTTPException: Whatever ``_check_processor`` raises when no processor
            is available; 503 if the job store is not initialized; 500 if
            fetching or claiming jobs fails.
    """
    processor = _check_processor()
    store = _get_job_store()

    region = processor.region
    processed_jobs: list[dict[str, object]] = []

    try:
        queued_jobs = store.get_queued_jobs_for_region(region, limit=limit)

        for queued_job in queued_jobs:
            job_id = queued_job["job_id"]

            # A falsy claim means this processor did not get the job
            # (presumably claimed elsewhere first); skip it.
            if not store.claim_job(job_id, claimed_by=region):
                continue

            try:
                store.update_job_status(
                    job_id, JobStoreStatus.APPLYING, message="Applying to Kubernetes"
                )

                submission_request = ManifestSubmissionRequest(
                    manifests=[queued_job["manifest"]],
                    namespace=queued_job["namespace"],
                    dry_run=False,
                    validate=True,
                )
                result = await processor.process_manifest_submission(submission_request)

                if result.success:
                    k8s_uid = result.resources[0].uid if result.resources else None
                    store.update_job_status(
                        job_id,
                        JobStoreStatus.PENDING,
                        message="Applied to Kubernetes, waiting for scheduling",
                        k8s_job_uid=k8s_uid,
                    )
                    processed_jobs.append(
                        {"job_id": job_id, "status": "applied", "k8s_uid": k8s_uid}
                    )
                else:
                    error_msg = "; ".join(result.errors) if result.errors else "Unknown error"
                    store.update_job_status(
                        job_id,
                        JobStoreStatus.FAILED,
                        message="Failed to apply to Kubernetes",
                        error=error_msg,
                    )
                    processed_jobs.append(
                        {"job_id": job_id, "status": "failed", "error": error_msg}
                    )

            except Exception as e:
                # NOTE(review): this also runs when only the PENDING status write
                # fails after a successful apply, so an applied job gets marked
                # FAILED — confirm that is the intended reconciliation.
                logger.exception("Failed to process job %s: %s", sanitize_log_value(job_id), e)
                _mark_job_failed_best_effort(
                    store, job_id, message="Processing error", error=str(e)
                )
                processed_jobs.append({"job_id": job_id, "status": "failed", "error": str(e)})

        return JSONResponse(
            status_code=200,
            content={
                "timestamp": datetime.now(UTC).isoformat(),
                "region": region,
                "jobs_polled": len(queued_jobs),
                "jobs_processed": len(processed_jobs),
                "results": processed_jobs,
            },
        )

    except Exception as e:
        logger.exception("Failed to poll jobs: %s", e)
        raise HTTPException(status_code=500, detail=f"Failed to poll jobs: {e!s}") from e