Coverage for gco / services / api_routes / jobs.py: 84%

311 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 21:47 +0000

1"""Job listing, details, logs, events, metrics, delete, and retry endpoints.""" 

2 

3from __future__ import annotations 

4 

5import logging 

6from datetime import UTC, datetime, timedelta 

7from typing import Any 

8 

9from fastapi import APIRouter, HTTPException, Query 

10from fastapi.responses import JSONResponse, Response 

11 

12from gco.models import ManifestSubmissionRequest 

13from gco.services.api_shared import ( 

14 BulkDeleteRequest, 

15 _check_namespace, 

16 _check_processor, 

17 _parse_event_to_dict, 

18 _parse_job_to_dict, 

19 _parse_pod_to_dict, 

20) 

21 

# All job-related endpoints in this module are mounted under /api/v1/jobs.
router = APIRouter(prefix="/api/v1/jobs", tags=["Jobs"])

# Module-scoped logger named after this module for per-module log filtering.
logger = logging.getLogger(__name__)

24 

25 

@router.get("")
async def list_jobs(
    namespace: str | None = Query(None, description="Filter by namespace"),
    status: str | None = Query(None, description="Filter by status"),
    limit: int = Query(50, ge=1, le=1000, description="Maximum number of jobs to return"),
    offset: int = Query(0, ge=0, description="Number of jobs to skip"),
    sort: str = Query("createdAt:desc", description="Sort field and order (field:asc|desc)"),
    label_selector: str | None = Query(None, description="Kubernetes label selector"),
) -> Response:
    """List Kubernetes Jobs with pagination and filtering."""
    processor = _check_processor()

    try:
        jobs = await processor.list_jobs(namespace=namespace, status_filter=status)

        # Client-side filtering on equality label selectors ("k=v,k2=v2").
        # Selector parts without "=" are ignored.
        if label_selector:
            clauses = [
                part.split("=", 1) for part in label_selector.split(",") if "=" in part
            ]

            def matches(job: dict[str, Any]) -> bool:
                labels = job.get("metadata", {}).get("labels", {})
                return all(labels.get(k.strip()) == v.strip() for k, v in clauses)

            jobs = [job for job in jobs if matches(job)]

        # Sort spec is "field:order"; default to newest-first by creation time.
        sort_field, sort_order = "createdAt", "desc"
        if ":" in sort:
            sort_field, sort_order = sort.split(":", 1)

        def sort_key(job: dict[str, Any]) -> Any:
            meta = job.get("metadata", {})
            if sort_field == "createdAt":
                return meta.get("creationTimestamp", "")
            if sort_field == "name":
                return meta.get("name", "")
            if sort_field == "status":
                return job.get("status", {}).get("active", 0)
            return ""

        jobs.sort(key=sort_key, reverse=sort_order == "desc")

        total = len(jobs)
        page = jobs[offset : offset + limit]

        return JSONResponse(
            status_code=200,
            content={
                "cluster_id": processor.cluster_id,
                "region": processor.region,
                "timestamp": datetime.now(UTC).isoformat(),
                "total": total,
                "limit": limit,
                "offset": offset,
                "has_more": (offset + limit) < total,
                "count": len(page),
                "jobs": page,
            },
        )

    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        logger.error(f"Error listing jobs: {e}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {e!s}") from e

93 

94 

@router.get("/{namespace}/{name}")
async def get_job(namespace: str, name: str) -> Response:
    """Get details of a specific Job."""
    processor = _check_processor()
    _check_namespace(namespace, processor)

    try:
        raw_job = processor.batch_v1.read_namespaced_job(name=name, namespace=namespace)

        # Cluster identity and timestamp first, then the parsed job fields.
        payload = {
            "cluster_id": processor.cluster_id,
            "region": processor.region,
            "timestamp": datetime.now(UTC).isoformat(),
            **_parse_job_to_dict(raw_job),
        }

        return JSONResponse(status_code=200, content=payload)

    except HTTPException:
        raise
    except Exception as e:
        message = str(e)
        if "NotFound" in message or "404" in message:
            raise HTTPException(
                status_code=404, detail=f"Job '{name}' not found in namespace '{namespace}'"
            ) from e
        logger.error(f"Error getting job: {e}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {e!s}") from e

123 

124 

@router.get("/{namespace}/{name}/logs")
async def get_job_logs(
    namespace: str,
    name: str,
    container: str | None = Query(None, description="Container name (for multi-container pods)"),
    tail: int = Query(100, ge=1, le=10000, description="Number of lines from the end"),
    previous: bool = Query(False, description="Get logs from previous terminated container"),
    since_seconds: int | None = Query(
        None, ge=1, description="Only return logs newer than N seconds"
    ),
    timestamps: bool = Query(False, description="Include timestamps in log lines"),
) -> Response:
    """Get logs from a Job's pods.

    Picks the most recently created pod of the job and returns its logs,
    along with the pod/container names so callers can drill down further.

    Raises:
        HTTPException: 404 if the job or its pods are gone, 409 if the pod
            is still Pending, 400 for log requests Kubernetes rejects,
            502 for other Kubernetes API errors, 500 otherwise.
    """
    # Local import keeps the kubernetes exception type scoped to this handler.
    from kubernetes.client.rest import ApiException as K8sApiException

    processor = _check_processor()
    _check_namespace(namespace, processor)

    try:
        # Verify the job exists first so a missing job yields a precise 404
        # rather than the generic "no pods found" message below.
        try:
            processor.batch_v1.read_namespaced_job(name=name, namespace=namespace)
        except K8sApiException as e:
            if e.status == 404:
                raise HTTPException(
                    status_code=404,
                    detail=f"Job '{name}' not found in namespace '{namespace}'",
                ) from e
            raise

        # Job pods carry the "job-name" label pointing back at their Job.
        pods = processor.core_v1.list_namespaced_pod(
            namespace=namespace, label_selector=f"job-name={name}"
        )

        if not pods.items:
            raise HTTPException(
                status_code=404,
                detail=(
                    f"No pods found for job '{name}'. "
                    "The job may have completed and pods were cleaned up "
                    "(ttlSecondsAfterFinished). Use 'gco jobs get' to check job status."
                ),
            )

        # Newest pod first; pods lacking a creation timestamp sort last.
        sorted_pods = sorted(
            pods.items,
            key=lambda p: p.metadata.creation_timestamp or datetime.min.replace(tzinfo=UTC),
            reverse=True,
        )
        pod = sorted_pods[0]

        # Pending pods have no container logs yet — report 409 instead of
        # letting the log call fail with a less helpful error.
        pod_phase = pod.status.phase if pod.status else "Unknown"
        if pod_phase == "Pending":
            raise HTTPException(
                status_code=409,
                detail=(
                    f"Pod '{pod.metadata.name}' is still Pending — logs are not yet available. "
                    "The node may still be provisioning. Use 'gco jobs events' to check."
                ),
            )

        # Only pass optional kwargs when set, so the client library applies
        # its own defaults otherwise.
        log_kwargs: dict[str, Any] = {
            "name": pod.metadata.name,
            "namespace": namespace,
            "tail_lines": tail,
            "previous": previous,
            "timestamps": timestamps,
        }
        if container:
            log_kwargs["container"] = container
        if since_seconds:
            log_kwargs["since_seconds"] = since_seconds

        try:
            logs = processor.core_v1.read_namespaced_pod_log(**log_kwargs)
        except K8sApiException as e:
            # Kubernetes returns 400 both for "container still starting" and
            # for genuinely bad requests; distinguish them by message text.
            if e.status == 400:
                error_body = str(e.body) if e.body else str(e.reason)
                if "waiting" in error_body.lower() or "not found" in error_body.lower():
                    available = [c.name for c in pod.spec.containers]
                    raise HTTPException(
                        status_code=400,
                        detail=(
                            f"Logs not available: {error_body}. "
                            f"Pod phase: {pod_phase}. "
                            f"Available containers: {available}"
                        ),
                    ) from e
                raise HTTPException(status_code=400, detail=f"Bad request: {error_body}") from e
            raise

        available_containers = [c.name for c in pod.spec.containers]
        init_containers = [c.name for c in (pod.spec.init_containers or [])]

        response = {
            "cluster_id": processor.cluster_id,
            "region": processor.region,
            "timestamp": datetime.now(UTC).isoformat(),
            "job_name": name,
            "namespace": namespace,
            "pod_name": pod.metadata.name,
            # When no container was requested, report the one the API chose
            # (the first container, per the default behaviour assumed here).
            "container": container or (available_containers[0] if available_containers else None),
            "available_containers": available_containers,
            "init_containers": init_containers,
            "previous": previous,
            "tail_lines": tail,
            "logs": logs,
        }

        return JSONResponse(status_code=200, content=response)

    except HTTPException:
        raise
    except K8sApiException as e:
        # Upstream Kubernetes failure that we did not classify above.
        logger.error(f"Kubernetes API error getting job logs: {e.status} {e.reason}")
        raise HTTPException(
            status_code=502, detail=f"Kubernetes API error: {e.status} {e.reason}"
        ) from e
    except Exception as e:
        logger.error(f"Error getting job logs: {e}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {e!s}") from e

245 

246 

@router.get("/{namespace}/{name}/events")
async def get_job_events(namespace: str, name: str) -> Response:
    """Get events related to a Job."""
    processor = _check_processor()
    _check_namespace(namespace, processor)

    try:
        # Events attached to the Job object itself.
        job_events = processor.core_v1.list_namespaced_event(
            namespace=namespace,
            field_selector=f"involvedObject.name={name},involvedObject.kind=Job",
        )

        # Plus events for every pod the Job spawned.
        pods = processor.core_v1.list_namespaced_pod(
            namespace=namespace, label_selector=f"job-name={name}"
        )
        pod_events: list[Any] = []
        for pod in pods.items:
            selector = f"involvedObject.name={pod.metadata.name},involvedObject.kind=Pod"
            pod_events.extend(
                processor.core_v1.list_namespaced_event(
                    namespace=namespace, field_selector=selector
                ).items
            )

        all_events = [_parse_event_to_dict(ev) for ev in job_events.items]
        all_events += [_parse_event_to_dict(ev) for ev in pod_events]
        # Newest first; fall back to firstTimestamp when lastTimestamp is unset.
        all_events.sort(
            key=lambda ev: ev.get("lastTimestamp") or ev.get("firstTimestamp") or "",
            reverse=True,
        )

        return JSONResponse(
            status_code=200,
            content={
                "cluster_id": processor.cluster_id,
                "region": processor.region,
                "timestamp": datetime.now(UTC).isoformat(),
                "job_name": name,
                "namespace": namespace,
                "count": len(all_events),
                "events": all_events,
            },
        )

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting job events: {e}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {e!s}") from e

294 

295 

@router.get("/{namespace}/{name}/pods")
async def get_job_pods(namespace: str, name: str) -> Response:
    """Get pods belonging to a Job."""
    processor = _check_processor()
    _check_namespace(namespace, processor)

    try:
        # Job pods are discovered via the "job-name" label.
        listing = processor.core_v1.list_namespaced_pod(
            namespace=namespace, label_selector=f"job-name={name}"
        )
        pod_list = [_parse_pod_to_dict(item) for item in listing.items]

        return JSONResponse(
            status_code=200,
            content={
                "cluster_id": processor.cluster_id,
                "region": processor.region,
                "timestamp": datetime.now(UTC).isoformat(),
                "job_name": name,
                "namespace": namespace,
                "count": len(pod_list),
                "pods": pod_list,
            },
        )

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting job pods: {e}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {e!s}") from e

325 

326 

@router.get("/{namespace}/{name}/pods/{pod_name}/logs")
async def get_pod_logs(
    namespace: str,
    name: str,
    pod_name: str,
    container: str | None = Query(None, description="Container name"),
    tail: int = Query(100, ge=1, le=10000, description="Number of lines from the end"),
    previous: bool = Query(False, description="Get logs from previous terminated container"),
) -> Response:
    """Get logs from a specific pod belonging to a Job.

    Raises:
        HTTPException: 400 if the pod is not owned by the named job,
            404 if the pod does not exist, 500 for other failures.
    """
    processor = _check_processor()
    _check_namespace(namespace, processor)

    try:
        pod = processor.core_v1.read_namespaced_pod(name=pod_name, namespace=namespace)
        # metadata.labels is None for unlabelled pods in the kubernetes
        # client; guard so such a pod gets the 400 below instead of an
        # AttributeError that would surface as a 500.
        job_name_label = (pod.metadata.labels or {}).get("job-name")
        if job_name_label != name:
            raise HTTPException(
                status_code=400, detail=f"Pod '{pod_name}' does not belong to job '{name}'"
            )

        # Only pass the container kwarg when the caller specified one.
        log_kwargs: dict[str, Any] = {
            "name": pod_name,
            "namespace": namespace,
            "tail_lines": tail,
            "previous": previous,
        }
        if container:
            log_kwargs["container"] = container

        logs = processor.core_v1.read_namespaced_pod_log(**log_kwargs)

        response = {
            "cluster_id": processor.cluster_id,
            "region": processor.region,
            "timestamp": datetime.now(UTC).isoformat(),
            "job_name": name,
            "namespace": namespace,
            "pod_name": pod_name,
            "container": container,
            "logs": logs,
        }

        return JSONResponse(status_code=200, content=response)

    except HTTPException:
        raise
    except Exception as e:
        if "NotFound" in str(e) or "404" in str(e):
            raise HTTPException(status_code=404, detail=f"Pod '{pod_name}' not found") from e
        logger.error(f"Error getting pod logs: {e}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {e!s}") from e

379 

380 

def _cpu_to_millicores(cpu: str) -> int:
    """Convert a Kubernetes CPU quantity string to millicores.

    Handles nanocores ("1500000n"), microcores ("1500u"), millicores
    ("250m"), and whole or fractional cores ("2", "0.5").
    """
    if cpu.endswith("n"):
        return int(cpu[:-1]) // 1_000_000
    if cpu.endswith("u"):
        return int(cpu[:-1]) // 1_000
    if cpu.endswith("m"):
        return int(cpu[:-1])
    # Plain core counts may be fractional, e.g. "0.5" == 500 millicores.
    return int(float(cpu) * 1000)


def _memory_to_bytes(memory: str) -> int:
    """Convert a Kubernetes memory quantity string to bytes.

    Handles the binary suffixes Ki/Mi/Gi/Ti and plain byte counts.
    """
    for suffix, multiplier in (
        ("Ki", 1024),
        ("Mi", 1024**2),
        ("Gi", 1024**3),
        ("Ti", 1024**4),
    ):
        if memory.endswith(suffix):
            return int(memory[: -len(suffix)]) * multiplier
    return int(memory)


@router.get("/{namespace}/{name}/metrics")
async def get_job_metrics(namespace: str, name: str) -> Response:
    """Get resource usage metrics for a Job's pods.

    Queries the metrics.k8s.io API per pod; pods whose metrics are missing
    or unparsable are reported with an error entry rather than failing the
    whole request. Previously microcore ("u"), fractional-core, and "Ti"
    quantities raised ValueError and dropped the pod as "not available".
    """
    processor = _check_processor()
    _check_namespace(namespace, processor)

    try:
        pods = processor.core_v1.list_namespaced_pod(
            namespace=namespace, label_selector=f"job-name={name}"
        )

        if not pods.items:
            raise HTTPException(status_code=404, detail=f"No pods found for job '{name}'")

        pod_metrics = []
        total_cpu_millicores = 0
        total_memory_bytes = 0

        try:
            for pod in pods.items:
                try:
                    metrics = processor.custom_objects.get_namespaced_custom_object(
                        group="metrics.k8s.io",
                        version="v1beta1",
                        namespace=namespace,
                        plural="pods",
                        name=pod.metadata.name,
                    )

                    containers_metrics = []
                    for container in metrics.get("containers", []):
                        cpu_str = container.get("usage", {}).get("cpu", "0")
                        memory_str = container.get("usage", {}).get("memory", "0")

                        cpu_millicores = _cpu_to_millicores(cpu_str)
                        memory_bytes = _memory_to_bytes(memory_str)

                        total_cpu_millicores += cpu_millicores
                        total_memory_bytes += memory_bytes

                        containers_metrics.append(
                            {
                                "name": container.get("name"),
                                "cpu_millicores": cpu_millicores,
                                "memory_bytes": memory_bytes,
                                "memory_mib": round(memory_bytes / (1024 * 1024), 2),
                            }
                        )

                    pod_metrics.append(
                        {"pod_name": pod.metadata.name, "containers": containers_metrics}
                    )

                except Exception as e:
                    # Best-effort: a pod without metrics (e.g. just started,
                    # or metrics-server lag) should not fail the request.
                    logger.warning(f"Could not get metrics for pod {pod.metadata.name}: {e}")
                    pod_metrics.append(
                        {"pod_name": pod.metadata.name, "error": "Metrics not available"}
                    )

        except Exception as e:
            # metrics.k8s.io may not be installed at all; return zero totals.
            logger.warning(f"Metrics API not available: {e}")

        response = {
            "cluster_id": processor.cluster_id,
            "region": processor.region,
            "timestamp": datetime.now(UTC).isoformat(),
            "job_name": name,
            "namespace": namespace,
            "summary": {
                "total_cpu_millicores": total_cpu_millicores,
                "total_memory_bytes": total_memory_bytes,
                "total_memory_mib": round(total_memory_bytes / (1024 * 1024), 2),
                "pod_count": len(pods.items),
            },
            "pods": pod_metrics,
        }

        return JSONResponse(status_code=200, content=response)

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting job metrics: {e}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {e!s}") from e

480 

481 

@router.delete("/{namespace}/{name}")
async def delete_job(namespace: str, name: str) -> Response:
    """Delete a Job and its pods."""
    processor = _check_processor()
    _check_namespace(namespace, processor)

    try:
        # Background propagation: the Job object is removed right away and
        # its pods are garbage-collected asynchronously.
        processor.batch_v1.delete_namespaced_job(
            name=name, namespace=namespace, propagation_policy="Background"
        )

        return JSONResponse(
            status_code=200,
            content={
                "cluster_id": processor.cluster_id,
                "region": processor.region,
                "timestamp": datetime.now(UTC).isoformat(),
                "job_name": name,
                "namespace": namespace,
                "status": "deleted",
                "message": "Job deleted successfully",
            },
        )

    except HTTPException:
        raise
    except Exception as e:
        message = str(e)
        if "NotFound" in message or "404" in message:
            raise HTTPException(
                status_code=404, detail=f"Job '{name}' not found in namespace '{namespace}'"
            ) from e
        logger.error(f"Error deleting job: {e}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {e!s}") from e

514 

515 

@router.delete("")
async def bulk_delete_jobs(request: BulkDeleteRequest) -> Response:
    """Bulk delete jobs based on filters.

    Jobs are matched by namespace/status, an optional age cutoff
    (``older_than_days``), and an optional equality label selector.
    With ``dry_run`` set, the matched jobs are reported but not deleted.
    """
    processor = _check_processor()

    try:
        status_filter = request.status.value if request.status else None
        all_jobs = await processor.list_jobs(
            namespace=request.namespace, status_filter=status_filter
        )

        jobs_to_delete = []
        cutoff_time = None
        if request.older_than_days:
            cutoff_time = datetime.now(UTC) - timedelta(days=request.older_than_days)

        for job in all_jobs:
            # Keep only jobs created at or before the cutoff.
            if cutoff_time:
                created_str = job.get("metadata", {}).get("creationTimestamp")
                if created_str:
                    created = datetime.fromisoformat(created_str.replace("Z", "+00:00"))
                    # Compare two timezone-aware datetimes directly; the old
                    # `created.replace(tzinfo=None) > cutoff_time` compared
                    # naive vs aware and raised TypeError.
                    if created > cutoff_time:
                        continue

            # Equality label selector ("k=v,k2=v2"); parts without "=" are
            # ignored.
            if request.label_selector:
                labels = job.get("metadata", {}).get("labels", {})
                match = True
                for selector in request.label_selector.split(","):
                    if "=" in selector:
                        key, value = selector.split("=", 1)
                        if labels.get(key.strip()) != value.strip():
                            match = False
                            break
                if not match:
                    continue

            jobs_to_delete.append(job)

        deleted_jobs = []
        failed_jobs = []

        if not request.dry_run:
            for job in jobs_to_delete:
                job_name = job.get("metadata", {}).get("name")
                job_namespace = job.get("metadata", {}).get("namespace")
                try:
                    processor.batch_v1.delete_namespaced_job(
                        name=job_name, namespace=job_namespace, propagation_policy="Background"
                    )
                    deleted_jobs.append({"name": job_name, "namespace": job_namespace})
                except Exception as e:
                    # Record per-job failures instead of aborting the batch.
                    failed_jobs.append(
                        {"name": job_name, "namespace": job_namespace, "error": str(e)}
                    )

        response: dict[str, Any] = {
            "cluster_id": processor.cluster_id,
            "region": processor.region,
            "timestamp": datetime.now(UTC).isoformat(),
            "dry_run": request.dry_run,
            "total_matched": len(jobs_to_delete),
            "deleted_count": len(deleted_jobs),
            "failed_count": len(failed_jobs),
            # On dry runs report what would be deleted; otherwise what was.
            "jobs": (
                [
                    {
                        "name": j.get("metadata", {}).get("name"),
                        "namespace": j.get("metadata", {}).get("namespace"),
                    }
                    for j in jobs_to_delete
                ]
                if request.dry_run
                else deleted_jobs
            ),
            "failed": failed_jobs if failed_jobs else None,
        }

        return JSONResponse(status_code=200, content=response)

    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        logger.error(f"Error bulk deleting jobs: {e}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {e!s}") from e

601 

602 

@router.post("/{namespace}/{name}/retry")
async def retry_job(namespace: str, name: str) -> Response:
    """Retry a failed job by creating a new job from its spec."""
    processor = _check_processor()
    _check_namespace(namespace, processor)

    try:
        try:
            source = processor.batch_v1.read_namespaced_job(name=name, namespace=namespace)
        except Exception as e:
            text = str(e)
            if "NotFound" in text or "404" in text:
                raise HTTPException(
                    status_code=404, detail=f"Job '{name}' not found in namespace '{namespace}'"
                ) from e
            raise

        # Timestamped name so repeated retries never collide.
        new_name = f"{name}-retry-{datetime.now(UTC).strftime('%Y%m%d%H%M%S')}"

        # Carry the original labels/annotations over and tag the copy so it
        # can be traced back to the job it retries.
        metadata = {
            "name": new_name,
            "namespace": namespace,
            "labels": {
                **(source.metadata.labels or {}),
                "gco.io/retry-of": name,
            },
            "annotations": {
                **(source.metadata.annotations or {}),
                "gco.io/original-job": name,
            },
        }
        spec = {
            "parallelism": source.spec.parallelism,
            "completions": source.spec.completions,
            "backoffLimit": source.spec.backoff_limit,
            "template": source.spec.template.to_dict(),
        }
        # A serialized pod template may carry a status stanza, which has no
        # place in a manifest for a brand-new job.
        template = spec.get("template", {})
        if isinstance(template, dict):
            template.pop("status", None)

        new_job_manifest = {
            "apiVersion": "batch/v1",
            "kind": "Job",
            "metadata": metadata,
            "spec": spec,
        }

        submission_request = ManifestSubmissionRequest(
            manifests=[new_job_manifest], namespace=namespace, dry_run=False, validate=True
        )
        result = await processor.process_manifest_submission(submission_request)

        response = {
            "cluster_id": processor.cluster_id,
            "region": processor.region,
            "timestamp": datetime.now(UTC).isoformat(),
            "original_job": name,
            "new_job": new_name,
            "namespace": namespace,
            "success": result.success,
            "message": (
                "Job retry created successfully" if result.success else "Failed to create retry job"
            ),
            "errors": result.errors,
        }

        return JSONResponse(status_code=201 if result.success else 400, content=response)

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error retrying job: {e}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {e!s}") from e