Coverage for gco/services/api_routes/queue.py: 97%

111 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-15 15:07 +0000

1"""DynamoDB-backed global job queue endpoints.""" 

2 

3from __future__ import annotations 

4 

5import logging 

6from datetime import UTC, datetime 

7from typing import TYPE_CHECKING 

8 

9from fastapi import APIRouter, HTTPException, Query 

10from fastapi.responses import JSONResponse, Response 

11 

12from gco.models import ManifestSubmissionRequest 

13from gco.services.api_shared import QueuedJobRequest, _check_processor 

14from gco.services.structured_logging import sanitize_log_value 

15from gco.services.template_store import JobStatus as JobStoreStatus 

16 

17if TYPE_CHECKING: 

18 from gco.services.template_store import JobStore 

19 

# Router for the queue endpoints; every route registered on it is served
# under the /api/v1/queue prefix and tagged "Job Queue".
router = APIRouter(prefix="/api/v1/queue", tags=["Job Queue"])
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

22 

23 

def _get_job_store() -> JobStore:
    """Return the job store held by ``gco.services.manifest_api``.

    The import is performed at call time, so the value is read from
    ``manifest_api`` on every call (presumably to avoid a circular import
    at module load — confirm against ``manifest_api``).

    Returns:
        The ``job_store`` object exposed by ``gco.services.manifest_api``.

    Raises:
        HTTPException: 503 when ``job_store`` is ``None``.
    """
    from gco.services.manifest_api import job_store as configured_store

    if configured_store is not None:
        return configured_store
    raise HTTPException(status_code=503, detail="Job store not initialized")

30 

31 

@router.post("/jobs")
async def submit_job_to_queue(request: QueuedJobRequest) -> Response:
    """Submit a job to the global queue for regional pickup.

    A fresh UUID4 string is generated as the job id, and the request's
    manifest, target region, namespace, priority and labels are passed to
    the job store's ``submit_job``.

    Args:
        request: The queued-job payload supplying ``manifest``,
            ``target_region``, ``namespace``, ``priority`` and ``labels``.

    Returns:
        A 201 JSON response containing a UTC ISO-8601 ``timestamp``, a
        ``message`` and the ``job`` value returned by the store.

    Raises:
        HTTPException: 503 if the job store is not initialized; 500 if
            submitting the job or building the response raises.
    """
    import uuid

    store = _get_job_store()
    job_id = str(uuid.uuid4())

    try:
        job = store.submit_job(
            job_id=job_id,
            manifest=request.manifest,
            target_region=request.target_region,
            namespace=request.namespace,
            priority=request.priority,
            labels=request.labels,
        )
        return JSONResponse(
            status_code=201,
            content={
                "timestamp": datetime.now(UTC).isoformat(),
                "message": "Job queued successfully",
                "job": job,
            },
        )
    except Exception as e:
        # Pass the exception as a logging argument so the message is only
        # formatted when the record is actually emitted.
        logger.error("Failed to queue job: %s", e)
        raise HTTPException(status_code=500, detail=f"Failed to queue job: {e!s}") from e

60 

61 

@router.get("/jobs")
async def list_queued_jobs(
    target_region: str | None = Query(None, description="Filter by target region"),
    status: str | None = Query(None, description="Filter by status"),
    namespace: str | None = Query(None, description="Filter by namespace"),
    limit: int = Query(100, description="Maximum results", ge=1, le=1000),
) -> Response:
    """List jobs in the global queue with optional filters.

    All filter arguments are forwarded unchanged to the job store's
    ``list_jobs``; a filter left as ``None`` is passed through as ``None``.

    Args:
        target_region: Optional target-region filter.
        status: Optional status filter.
        namespace: Optional namespace filter.
        limit: Maximum number of results (validated to 1..1000).

    Returns:
        A 200 JSON response containing a UTC ISO-8601 ``timestamp``, the
        ``count`` of returned jobs and the ``jobs`` themselves.

    Raises:
        HTTPException: 503 if the job store is not initialized; 500 if
            listing the jobs or building the response raises.
    """
    store = _get_job_store()
    try:
        jobs = store.list_jobs(
            target_region=target_region, status=status, namespace=namespace, limit=limit
        )
        return JSONResponse(
            status_code=200,
            content={
                "timestamp": datetime.now(UTC).isoformat(),
                "count": len(jobs),
                "jobs": jobs,
            },
        )
    except Exception as e:
        # Pass the exception as a logging argument so the message is only
        # formatted when the record is actually emitted.
        logger.error("Failed to list queued jobs: %s", e)
        raise HTTPException(status_code=500, detail=f"Failed to list jobs: {e!s}") from e

86 

87 

@router.get("/jobs/{job_id}")
async def get_queued_job(job_id: str) -> Response:
    """Fetch a single job from the queue by its id.

    Args:
        job_id: Identifier of the job, taken from the URL path.

    Returns:
        A 200 JSON response with a UTC ISO-8601 ``timestamp`` and the
        ``job`` value returned by the store.

    Raises:
        HTTPException: 503 if the job store is not initialized; 404 if the
            store returns ``None`` for this id; 500 on any other failure.
    """
    queue_store = _get_job_store()
    try:
        record = queue_store.get_job(job_id)
        if record is None:
            raise HTTPException(status_code=404, detail=f"Job '{job_id}' not found")
        body = {"timestamp": datetime.now(UTC).isoformat(), "job": record}
        return JSONResponse(status_code=200, content=body)
    except HTTPException:
        # Deliberate HTTP errors (such as the 404 above) pass through
        # unchanged instead of being rewrapped as a 500.
        raise
    except Exception as exc:
        logger.error("Failed to get job %s: %s", sanitize_log_value(job_id), exc)
        raise HTTPException(status_code=500, detail=f"Failed to get job: {exc!s}") from exc

105 

106 

@router.delete("/jobs/{job_id}")
async def cancel_queued_job(
    job_id: str, reason: str | None = Query(None, description="Cancellation reason")
) -> Response:
    """Cancel a job that is still waiting in the queue.

    Args:
        job_id: Identifier of the job, taken from the URL path.
        reason: Optional cancellation reason forwarded to the store.

    Returns:
        A 200 JSON response with a UTC ISO-8601 ``timestamp`` and a
        confirmation ``message``.

    Raises:
        HTTPException: 503 if the job store is not initialized; 409 if the
            store's ``cancel_job`` returns a falsy value; 500 on any other
            failure.
    """
    queue_store = _get_job_store()
    try:
        was_cancelled = queue_store.cancel_job(job_id, reason=reason)
        if not was_cancelled:
            conflict_detail = f"Job '{job_id}' cannot be cancelled (already running or completed)"
            raise HTTPException(status_code=409, detail=conflict_detail)
        body = {
            "timestamp": datetime.now(UTC).isoformat(),
            "message": f"Job '{job_id}' cancelled successfully",
        }
        return JSONResponse(status_code=200, content=body)
    except HTTPException:
        # Deliberate HTTP errors (such as the 409 above) pass through
        # unchanged instead of being rewrapped as a 500.
        raise
    except Exception as exc:
        logger.error("Failed to cancel job %s: %s", sanitize_log_value(job_id), exc)
        raise HTTPException(status_code=500, detail=f"Failed to cancel job: {exc!s}") from exc

132 

133 

@router.get("/stats")
async def get_queue_stats() -> Response:
    """Get job queue statistics by region and status.

    The store's ``get_job_counts_by_region`` result is used as a mapping
    whose values are themselves mappings of status name to count. Totals
    are summed across all outer entries; a missing ``"queued"`` or
    ``"running"`` key counts as zero.

    Returns:
        A 200 JSON response containing a UTC ISO-8601 ``timestamp``, a
        ``summary`` (``total_jobs``, ``total_queued``, ``total_running``)
        and the raw counts under ``by_region``.

    Raises:
        HTTPException: 503 if the job store is not initialized; 500 if
            fetching the counts or building the response raises.
    """
    store = _get_job_store()
    try:
        counts = store.get_job_counts_by_region()
        total_jobs = sum(sum(statuses.values()) for statuses in counts.values())
        total_queued = sum(statuses.get("queued", 0) for statuses in counts.values())
        total_running = sum(statuses.get("running", 0) for statuses in counts.values())

        return JSONResponse(
            status_code=200,
            content={
                "timestamp": datetime.now(UTC).isoformat(),
                "summary": {
                    "total_jobs": total_jobs,
                    "total_queued": total_queued,
                    "total_running": total_running,
                },
                "by_region": counts,
            },
        )
    except Exception as e:
        # Pass the exception as a logging argument so the message is only
        # formatted when the record is actually emitted.
        logger.error("Failed to get queue stats: %s", e)
        raise HTTPException(status_code=500, detail=f"Failed to get stats: {e!s}") from e

159 

160 

async def _apply_claimed_job(processor, store, job_id, queued_job) -> dict[str, object]:
    """Apply one already-claimed queued job and record the outcome in the store.

    Args:
        processor: Object whose ``process_manifest_submission`` coroutine
            applies the manifest.
        store: Job store used to record status transitions.
        job_id: Identifier of the claimed job.
        queued_job: Queue record; its ``"manifest"`` and ``"namespace"``
            entries are read.

    Returns:
        A result entry with ``job_id`` and ``status`` (``"applied"`` or
        ``"failed"``), plus ``k8s_uid`` on success or ``error`` on failure.
    """
    try:
        store.update_job_status(
            job_id, JobStoreStatus.APPLYING, message="Applying to Kubernetes"
        )

        submission_request = ManifestSubmissionRequest(
            manifests=[queued_job["manifest"]],
            namespace=queued_job["namespace"],
            dry_run=False,
            validate=True,
        )
        result = await processor.process_manifest_submission(submission_request)

        if not result.success:
            error_msg = "; ".join(result.errors) if result.errors else "Unknown error"
            store.update_job_status(
                job_id,
                JobStoreStatus.FAILED,
                message="Failed to apply to Kubernetes",
                error=error_msg,
            )
            return {"job_id": job_id, "status": "failed", "error": error_msg}

        # The first reported resource's uid is recorded; None when the
        # result lists no resources.
        k8s_uid = result.resources[0].uid if result.resources else None
        store.update_job_status(
            job_id,
            JobStoreStatus.PENDING,
            message="Applied to Kubernetes, waiting for scheduling",
            k8s_job_uid=k8s_uid,
        )
        return {"job_id": job_id, "status": "applied", "k8s_uid": k8s_uid}
    except Exception as e:
        # Any failure for this job is recorded against it and reported in
        # the results. If this status update itself raises, the error
        # propagates to the caller.
        logger.error("Failed to process job %s: %s", sanitize_log_value(job_id), e)
        store.update_job_status(
            job_id, JobStoreStatus.FAILED, message="Processing error", error=str(e)
        )
        return {"job_id": job_id, "status": "failed", "error": str(e)}


@router.post("/poll")
async def poll_and_process_jobs(
    limit: int = Query(5, description="Maximum jobs to process", ge=1, le=20),
) -> Response:
    """Poll for queued jobs and process them (called by regional processors).

    Queued jobs for the processor's region are fetched from the store,
    claimed one at a time, and each claimed job is applied through the
    processor. Jobs whose claim is refused are skipped and do not appear
    in the results.

    Args:
        limit: Maximum number of queued jobs to fetch (validated to 1..20).

    Returns:
        A 200 JSON response containing a UTC ISO-8601 ``timestamp``, the
        ``region``, ``jobs_polled``, ``jobs_processed`` and the per-job
        ``results``.

    Raises:
        HTTPException: 503 if the job store is not initialized; 500 if
            polling fails outside the per-job error handling. Errors raised
            by ``_check_processor`` propagate unchanged.
    """
    processor = _check_processor()
    store = _get_job_store()

    region = processor.region
    processed_jobs: list[dict[str, object]] = []

    try:
        queued_jobs = store.get_queued_jobs_for_region(region, limit=limit)

        for queued_job in queued_jobs:
            job_id = queued_job["job_id"]

            # A falsy claim result means this poller does not own the job
            # (presumably another processor claimed it first), so skip it.
            if not store.claim_job(job_id, claimed_by=region):
                continue

            processed_jobs.append(
                await _apply_claimed_job(processor, store, job_id, queued_job)
            )

        return JSONResponse(
            status_code=200,
            content={
                "timestamp": datetime.now(UTC).isoformat(),
                "region": region,
                "jobs_polled": len(queued_jobs),
                "jobs_processed": len(processed_jobs),
                "results": processed_jobs,
            },
        )

    except Exception as e:
        # Pass the exception as a logging argument so the message is only
        # formatted when the record is actually emitted.
        logger.error("Failed to poll jobs: %s", e)
        raise HTTPException(status_code=500, detail=f"Failed to poll jobs: {e!s}") from e