Coverage for gco/services/api_routes/queue.py: 97%

110 statements

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 21:47 +0000

1"""DynamoDB-backed global job queue endpoints.""" 

2 

from __future__ import annotations

import logging
import uuid
from datetime import UTC, datetime
from typing import TYPE_CHECKING

from fastapi import APIRouter, HTTPException, Query
from fastapi.responses import JSONResponse, Response

from gco.models import ManifestSubmissionRequest
from gco.services.api_shared import QueuedJobRequest, _check_processor
from gco.services.template_store import JobStatus as JobStoreStatus

if TYPE_CHECKING:
    from gco.services.template_store import JobStore

18 

19router = APIRouter(prefix="/api/v1/queue", tags=["Job Queue"]) 

20logger = logging.getLogger(__name__) 

21 

22 

23def _get_job_store() -> JobStore: 

24 from gco.services.manifest_api import job_store 

25 

26 if job_store is None: 

27 raise HTTPException(status_code=503, detail="Job store not initialized") 

28 return job_store 

29 

30 

31@router.post("/jobs") 

32async def submit_job_to_queue(request: QueuedJobRequest) -> Response: 

33 """Submit a job to the global queue for regional pickup.""" 

34 import uuid 

35 

36 store = _get_job_store() 

37 job_id = str(uuid.uuid4()) 

38 

39 try: 

40 job = store.submit_job( 

41 job_id=job_id, 

42 manifest=request.manifest, 

43 target_region=request.target_region, 

44 namespace=request.namespace, 

45 priority=request.priority, 

46 labels=request.labels, 

47 ) 

48 return JSONResponse( 

49 status_code=201, 

50 content={ 

51 "timestamp": datetime.now(UTC).isoformat(), 

52 "message": "Job queued successfully", 

53 "job": job, 

54 }, 

55 ) 

56 except Exception as e: 

57 logger.error(f"Failed to queue job: {e}") 

58 raise HTTPException(status_code=500, detail=f"Failed to queue job: {e!s}") from e 

59 

60 

61@router.get("/jobs") 

62async def list_queued_jobs( 

63 target_region: str | None = Query(None, description="Filter by target region"), 

64 status: str | None = Query(None, description="Filter by status"), 

65 namespace: str | None = Query(None, description="Filter by namespace"), 

66 limit: int = Query(100, description="Maximum results", ge=1, le=1000), 

67) -> Response: 

68 """List jobs in the global queue with optional filters.""" 

69 store = _get_job_store() 

70 try: 

71 jobs = store.list_jobs( 

72 target_region=target_region, status=status, namespace=namespace, limit=limit 

73 ) 

74 return JSONResponse( 

75 status_code=200, 

76 content={ 

77 "timestamp": datetime.now(UTC).isoformat(), 

78 "count": len(jobs), 

79 "jobs": jobs, 

80 }, 

81 ) 

82 except Exception as e: 

83 logger.error(f"Failed to list queued jobs: {e}") 

84 raise HTTPException(status_code=500, detail=f"Failed to list jobs: {e!s}") from e 

85 

86 

87@router.get("/jobs/{job_id}") 

88async def get_queued_job(job_id: str) -> Response: 

89 """Get details of a specific queued job.""" 

90 store = _get_job_store() 

91 try: 

92 job = store.get_job(job_id) 

93 if job is None: 

94 raise HTTPException(status_code=404, detail=f"Job '{job_id}' not found") 

95 return JSONResponse( 

96 status_code=200, 

97 content={"timestamp": datetime.now(UTC).isoformat(), "job": job}, 

98 ) 

99 except HTTPException: 

100 raise 

101 except Exception as e: 

102 logger.error(f"Failed to get job {job_id}: {e}") 

103 raise HTTPException(status_code=500, detail=f"Failed to get job: {e!s}") from e 

104 

105 

106@router.delete("/jobs/{job_id}") 

107async def cancel_queued_job( 

108 job_id: str, reason: str | None = Query(None, description="Cancellation reason") 

109) -> Response: 

110 """Cancel a queued job (only works for jobs not yet running).""" 

111 store = _get_job_store() 

112 try: 

113 cancelled = store.cancel_job(job_id, reason=reason) 

114 if not cancelled: 

115 raise HTTPException( 

116 status_code=409, 

117 detail=f"Job '{job_id}' cannot be cancelled (already running or completed)", 

118 ) 

119 return JSONResponse( 

120 status_code=200, 

121 content={ 

122 "timestamp": datetime.now(UTC).isoformat(), 

123 "message": f"Job '{job_id}' cancelled successfully", 

124 }, 

125 ) 

126 except HTTPException: 

127 raise 

128 except Exception as e: 

129 logger.error(f"Failed to cancel job {job_id}: {e}") 

130 raise HTTPException(status_code=500, detail=f"Failed to cancel job: {e!s}") from e 

131 

132 

133@router.get("/stats") 

134async def get_queue_stats() -> Response: 

135 """Get job queue statistics by region and status.""" 

136 store = _get_job_store() 

137 try: 

138 counts = store.get_job_counts_by_region() 

139 total_jobs = sum(sum(statuses.values()) for statuses in counts.values()) 

140 total_queued = sum(statuses.get("queued", 0) for statuses in counts.values()) 

141 total_running = sum(statuses.get("running", 0) for statuses in counts.values()) 

142 

143 return JSONResponse( 

144 status_code=200, 

145 content={ 

146 "timestamp": datetime.now(UTC).isoformat(), 

147 "summary": { 

148 "total_jobs": total_jobs, 

149 "total_queued": total_queued, 

150 "total_running": total_running, 

151 }, 

152 "by_region": counts, 

153 }, 

154 ) 

155 except Exception as e: 

156 logger.error(f"Failed to get queue stats: {e}") 

157 raise HTTPException(status_code=500, detail=f"Failed to get stats: {e!s}") from e 

158 

159 

160@router.post("/poll") 

161async def poll_and_process_jobs( 

162 limit: int = Query(5, description="Maximum jobs to process", ge=1, le=20), 

163) -> Response: 

164 """Poll for queued jobs and process them (called by regional processors).""" 

165 processor = _check_processor() 

166 store = _get_job_store() 

167 

168 region = processor.region 

169 processed_jobs = [] 

170 

171 try: 

172 queued_jobs = store.get_queued_jobs_for_region(region, limit=limit) 

173 

174 for queued_job in queued_jobs: 

175 job_id = queued_job["job_id"] 

176 

177 claimed = store.claim_job(job_id, claimed_by=region) 

178 if not claimed: 

179 continue 

180 

181 try: 

182 store.update_job_status( 

183 job_id, JobStoreStatus.APPLYING, message="Applying to Kubernetes" 

184 ) 

185 

186 manifest = queued_job["manifest"] 

187 namespace = queued_job["namespace"] 

188 

189 submission_request = ManifestSubmissionRequest( 

190 manifests=[manifest], namespace=namespace, dry_run=False, validate=True 

191 ) 

192 

193 result = await processor.process_manifest_submission(submission_request) 

194 

195 if result.success: 

196 k8s_uid = None 

197 if result.resources: 197 ↛ 200line 197 didn't jump to line 200 because the condition on line 197 was always true

198 k8s_uid = result.resources[0].uid 

199 

200 store.update_job_status( 

201 job_id, 

202 JobStoreStatus.PENDING, 

203 message="Applied to Kubernetes, waiting for scheduling", 

204 k8s_job_uid=k8s_uid, 

205 ) 

206 processed_jobs.append( 

207 {"job_id": job_id, "status": "applied", "k8s_uid": k8s_uid} 

208 ) 

209 else: 

210 error_msg = "; ".join(result.errors) if result.errors else "Unknown error" 

211 store.update_job_status( 

212 job_id, 

213 JobStoreStatus.FAILED, 

214 message="Failed to apply to Kubernetes", 

215 error=error_msg, 

216 ) 

217 processed_jobs.append( 

218 {"job_id": job_id, "status": "failed", "error": error_msg} 

219 ) 

220 

221 except Exception as e: 

222 logger.error(f"Failed to process job {job_id}: {e}") 

223 store.update_job_status( 

224 job_id, JobStoreStatus.FAILED, message="Processing error", error=str(e) 

225 ) 

226 processed_jobs.append({"job_id": job_id, "status": "failed", "error": str(e)}) 

227 

228 return JSONResponse( 

229 status_code=200, 

230 content={ 

231 "timestamp": datetime.now(UTC).isoformat(), 

232 "region": region, 

233 "jobs_polled": len(queued_jobs), 

234 "jobs_processed": len(processed_jobs), 

235 "results": processed_jobs, 

236 }, 

237 ) 

238 

239 except Exception as e: 

240 logger.error(f"Failed to poll jobs: {e}") 

241 raise HTTPException(status_code=500, detail=f"Failed to poll jobs: {e!s}") from e