Coverage for gco / services / api_routes / jobs.py: 84%
311 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 21:47 +0000
1"""Job listing, details, logs, events, metrics, delete, and retry endpoints."""
3from __future__ import annotations
5import logging
6from datetime import UTC, datetime, timedelta
7from typing import Any
9from fastapi import APIRouter, HTTPException, Query
10from fastapi.responses import JSONResponse, Response
12from gco.models import ManifestSubmissionRequest
13from gco.services.api_shared import (
14 BulkDeleteRequest,
15 _check_namespace,
16 _check_processor,
17 _parse_event_to_dict,
18 _parse_job_to_dict,
19 _parse_pod_to_dict,
20)
22router = APIRouter(prefix="/api/v1/jobs", tags=["Jobs"])
23logger = logging.getLogger(__name__)
@router.get("")
async def list_jobs(
    namespace: str | None = Query(None, description="Filter by namespace"),
    status: str | None = Query(None, description="Filter by status"),
    limit: int = Query(50, ge=1, le=1000, description="Maximum number of jobs to return"),
    offset: int = Query(0, ge=0, description="Number of jobs to skip"),
    sort: str = Query("createdAt:desc", description="Sort field and order (field:asc|desc)"),
    label_selector: str | None = Query(None, description="Kubernetes label selector"),
) -> Response:
    """List Kubernetes Jobs with pagination and filtering.

    Jobs are fetched from the processor, optionally filtered by a
    comma-separated ``key=value`` label selector, sorted, and then paginated
    with ``offset``/``limit``. Returns 400 for invalid input, 500 otherwise.
    """
    processor = _check_processor()

    try:
        all_jobs = await processor.list_jobs(namespace=namespace, status_filter=status)

        if label_selector:
            # Parse the selector once: clauses without '=' are ignored,
            # matching the Kubernetes equality-selector subset supported here.
            clauses = [
                part.split("=", 1) for part in label_selector.split(",") if "=" in part
            ]

            def _matches(entry: dict[str, Any]) -> bool:
                entry_labels = entry.get("metadata", {}).get("labels", {})
                return all(
                    entry_labels.get(key.strip()) == value.strip() for key, value in clauses
                )

            all_jobs = [entry for entry in all_jobs if _matches(entry)]

        # Default ordering is newest-first; a bare field name (no ':') keeps it.
        sort_field, sort_order = "createdAt", "desc"
        if ":" in sort:
            sort_field, sort_order = sort.split(":", 1)

        # Dispatch table instead of an if-chain; unknown fields sort by "".
        key_funcs: dict[str, Any] = {
            "createdAt": lambda j: j.get("metadata", {}).get("creationTimestamp", ""),
            "name": lambda j: j.get("metadata", {}).get("name", ""),
            "status": lambda j: j.get("status", {}).get("active", 0),
        }
        all_jobs.sort(
            key=key_funcs.get(sort_field, lambda j: ""),
            reverse=sort_order == "desc",
        )

        total = len(all_jobs)
        page = all_jobs[offset : offset + limit]

        payload = {
            "cluster_id": processor.cluster_id,
            "region": processor.region,
            "timestamp": datetime.now(UTC).isoformat(),
            "total": total,
            "limit": limit,
            "offset": offset,
            "has_more": (offset + limit) < total,
            "count": len(page),
            "jobs": page,
        }

        return JSONResponse(status_code=200, content=payload)

    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        logger.error(f"Error listing jobs: {e}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {e!s}") from e
@router.get("/{namespace}/{name}")
async def get_job(namespace: str, name: str) -> Response:
    """Get details of a specific Job.

    Returns 404 when the job does not exist, 500 on unexpected errors.
    """
    processor = _check_processor()
    _check_namespace(namespace, processor)

    try:
        job_obj = processor.batch_v1.read_namespaced_job(name=name, namespace=namespace)

        # Cluster metadata first; parsed job fields overwrite on key collision,
        # matching the original {**base, **job_info} merge order.
        payload = {
            "cluster_id": processor.cluster_id,
            "region": processor.region,
            "timestamp": datetime.now(UTC).isoformat(),
        }
        payload.update(_parse_job_to_dict(job_obj))

        return JSONResponse(status_code=200, content=payload)

    except HTTPException:
        raise
    except Exception as e:
        err_text = str(e)
        if "NotFound" in err_text or "404" in err_text:
            raise HTTPException(
                status_code=404, detail=f"Job '{name}' not found in namespace '{namespace}'"
            ) from e
        logger.error(f"Error getting job: {e}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {e!s}") from e
@router.get("/{namespace}/{name}/logs")
async def get_job_logs(
    namespace: str,
    name: str,
    container: str | None = Query(None, description="Container name (for multi-container pods)"),
    tail: int = Query(100, ge=1, le=10000, description="Number of lines from the end"),
    previous: bool = Query(False, description="Get logs from previous terminated container"),
    since_seconds: int | None = Query(
        None, ge=1, description="Only return logs newer than N seconds"
    ),
    timestamps: bool = Query(False, description="Include timestamps in log lines"),
) -> Response:
    """Get logs from a Job's pods.

    Verifies the job exists, picks the most recently created pod matching the
    ``job-name=<name>`` label, and returns its logs plus container metadata.
    Responses: 404 (job missing or pods garbage-collected), 409 (pod still
    Pending), 400 (container not ready / bad log request), 502 (other
    Kubernetes API errors), 500 (anything else).
    """
    # Imported locally so the module does not hard-require the kubernetes
    # package at import time.
    from kubernetes.client.rest import ApiException as K8sApiException

    processor = _check_processor()
    _check_namespace(namespace, processor)

    try:
        # Existence check first, so a missing job yields a clean 404 rather
        # than the generic "no pods found" message below.
        try:
            processor.batch_v1.read_namespaced_job(name=name, namespace=namespace)
        except K8sApiException as e:
            if e.status == 404:
                raise HTTPException(
                    status_code=404,
                    detail=f"Job '{name}' not found in namespace '{namespace}'",
                ) from e
            raise

        pods = processor.core_v1.list_namespaced_pod(
            namespace=namespace, label_selector=f"job-name={name}"
        )

        if not pods.items:
            raise HTTPException(
                status_code=404,
                detail=(
                    f"No pods found for job '{name}'. "
                    "The job may have completed and pods were cleaned up "
                    "(ttlSecondsAfterFinished). Use 'gco jobs get' to check job status."
                ),
            )

        # Newest pod first; pods lacking a creation timestamp sort last via the
        # aware datetime.min sentinel (keeps the key timezone-consistent).
        sorted_pods = sorted(
            pods.items,
            key=lambda p: p.metadata.creation_timestamp or datetime.min.replace(tzinfo=UTC),
            reverse=True,
        )
        pod = sorted_pods[0]

        # A Pending pod has no container logs yet — surface that as a 409
        # instead of letting the log call fail with an opaque error.
        pod_phase = pod.status.phase if pod.status else "Unknown"
        if pod_phase == "Pending":
            raise HTTPException(
                status_code=409,
                detail=(
                    f"Pod '{pod.metadata.name}' is still Pending — logs are not yet available. "
                    "The node may still be provisioning. Use 'gco jobs events' to check."
                ),
            )

        # Only pass optional kwargs when set, so the client library applies
        # its own defaults otherwise.
        log_kwargs: dict[str, Any] = {
            "name": pod.metadata.name,
            "namespace": namespace,
            "tail_lines": tail,
            "previous": previous,
            "timestamps": timestamps,
        }
        if container:
            log_kwargs["container"] = container
        if since_seconds:
            log_kwargs["since_seconds"] = since_seconds

        try:
            logs = processor.core_v1.read_namespaced_pod_log(**log_kwargs)
        except K8sApiException as e:
            # 400 from the log endpoint usually means the container is still
            # waiting or the requested container name doesn't exist — enrich
            # the error with the pod phase and available container names.
            if e.status == 400:
                error_body = str(e.body) if e.body else str(e.reason)
                if "waiting" in error_body.lower() or "not found" in error_body.lower():
                    available = [c.name for c in pod.spec.containers]
                    raise HTTPException(
                        status_code=400,
                        detail=(
                            f"Logs not available: {error_body}. "
                            f"Pod phase: {pod_phase}. "
                            f"Available containers: {available}"
                        ),
                    ) from e
                raise HTTPException(status_code=400, detail=f"Bad request: {error_body}") from e
            raise

        available_containers = [c.name for c in pod.spec.containers]
        init_containers = [c.name for c in (pod.spec.init_containers or [])]

        response = {
            "cluster_id": processor.cluster_id,
            "region": processor.region,
            "timestamp": datetime.now(UTC).isoformat(),
            "job_name": name,
            "namespace": namespace,
            "pod_name": pod.metadata.name,
            # When no container was requested, report the first container —
            # the one the log API reads from by default on single-container pods.
            "container": container or (available_containers[0] if available_containers else None),
            "available_containers": available_containers,
            "init_containers": init_containers,
            "previous": previous,
            "tail_lines": tail,
            "logs": logs,
        }

        return JSONResponse(status_code=200, content=response)

    except HTTPException:
        raise
    except K8sApiException as e:
        # Unhandled Kubernetes API failures map to 502 (upstream error).
        logger.error(f"Kubernetes API error getting job logs: {e.status} {e.reason}")
        raise HTTPException(
            status_code=502, detail=f"Kubernetes API error: {e.status} {e.reason}"
        ) from e
    except Exception as e:
        logger.error(f"Error getting job logs: {e}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {e!s}") from e
@router.get("/{namespace}/{name}/events")
async def get_job_events(namespace: str, name: str) -> Response:
    """Get events related to a Job.

    Collects events targeting the Job object itself plus events for each pod
    labeled ``job-name=<name>``, merged and sorted newest-first.
    """
    processor = _check_processor()
    _check_namespace(namespace, processor)

    try:
        job_events = processor.core_v1.list_namespaced_event(
            namespace=namespace,
            field_selector=f"involvedObject.name={name},involvedObject.kind=Job",
        )

        pods = processor.core_v1.list_namespaced_pod(
            namespace=namespace, label_selector=f"job-name={name}"
        )

        pod_events = []
        for pod in pods.items:
            pod_selector = (
                f"involvedObject.name={pod.metadata.name},involvedObject.kind=Pod"
            )
            pod_events.extend(
                processor.core_v1.list_namespaced_event(
                    namespace=namespace, field_selector=pod_selector
                ).items
            )

        # Job events first, then pod events — same order the caller saw before
        # sorting; the sort key falls back through lastTimestamp/firstTimestamp.
        all_events = [_parse_event_to_dict(ev) for ev in (*job_events.items, *pod_events)]
        all_events.sort(
            key=lambda ev: ev.get("lastTimestamp") or ev.get("firstTimestamp") or "",
            reverse=True,
        )

        payload = {
            "cluster_id": processor.cluster_id,
            "region": processor.region,
            "timestamp": datetime.now(UTC).isoformat(),
            "job_name": name,
            "namespace": namespace,
            "count": len(all_events),
            "events": all_events,
        }

        return JSONResponse(status_code=200, content=payload)

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting job events: {e}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {e!s}") from e
@router.get("/{namespace}/{name}/pods")
async def get_job_pods(namespace: str, name: str) -> Response:
    """Get pods belonging to a Job.

    Pods are matched via the ``job-name=<name>`` label the Job controller
    stamps onto its pods.
    """
    processor = _check_processor()
    _check_namespace(namespace, processor)

    try:
        matching = processor.core_v1.list_namespaced_pod(
            namespace=namespace, label_selector=f"job-name={name}"
        )
        serialized = [_parse_pod_to_dict(p) for p in matching.items]

        payload = {
            "cluster_id": processor.cluster_id,
            "region": processor.region,
            "timestamp": datetime.now(UTC).isoformat(),
            "job_name": name,
            "namespace": namespace,
            "count": len(serialized),
            "pods": serialized,
        }
        return JSONResponse(status_code=200, content=payload)

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting job pods: {e}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {e!s}") from e
@router.get("/{namespace}/{name}/pods/{pod_name}/logs")
async def get_pod_logs(
    namespace: str,
    name: str,
    pod_name: str,
    container: str | None = Query(None, description="Container name"),
    tail: int = Query(100, ge=1, le=10000, description="Number of lines from the end"),
    previous: bool = Query(False, description="Get logs from previous terminated container"),
) -> Response:
    """Get logs from a specific pod belonging to a Job.

    Verifies the pod carries the ``job-name=<name>`` label before reading its
    logs. Responses: 400 (pod does not belong to the job), 404 (pod missing),
    500 (anything else).
    """
    processor = _check_processor()
    _check_namespace(namespace, processor)

    try:
        pod = processor.core_v1.read_namespaced_pod(name=pod_name, namespace=namespace)
        # BUG FIX: metadata.labels can be None on unlabeled pods; calling
        # .get() on it directly raised AttributeError, turning the intended
        # 400 ("does not belong to job") into a generic 500.
        job_name_label = (pod.metadata.labels or {}).get("job-name")
        if job_name_label != name:
            raise HTTPException(
                status_code=400, detail=f"Pod '{pod_name}' does not belong to job '{name}'"
            )

        # Only pass `container` when explicitly requested so the client
        # library's single-container default applies otherwise.
        log_kwargs: dict[str, Any] = {
            "name": pod_name,
            "namespace": namespace,
            "tail_lines": tail,
            "previous": previous,
        }
        if container:
            log_kwargs["container"] = container

        logs = processor.core_v1.read_namespaced_pod_log(**log_kwargs)

        response = {
            "cluster_id": processor.cluster_id,
            "region": processor.region,
            "timestamp": datetime.now(UTC).isoformat(),
            "job_name": name,
            "namespace": namespace,
            "pod_name": pod_name,
            "container": container,
            "logs": logs,
        }

        return JSONResponse(status_code=200, content=response)

    except HTTPException:
        raise
    except Exception as e:
        if "NotFound" in str(e) or "404" in str(e):
            raise HTTPException(status_code=404, detail=f"Pod '{pod_name}' not found") from e
        logger.error(f"Error getting pod logs: {e}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {e!s}") from e
def _cpu_quantity_to_millicores(cpu_str: str) -> int:
    """Convert a Kubernetes CPU quantity string to whole millicores.

    Handles nanocores ('n'), microcores ('u' — previously unsupported and
    misparsed as whole cores), millicores ('m'), and bare core counts.
    Raises ValueError on non-numeric input.
    """
    if cpu_str.endswith("n"):
        return int(cpu_str[:-1]) // 1_000_000
    if cpu_str.endswith("u"):
        return int(cpu_str[:-1]) // 1_000
    if cpu_str.endswith("m"):
        return int(cpu_str[:-1])
    return int(cpu_str) * 1000


def _memory_quantity_to_bytes(memory_str: str) -> int:
    """Convert a Kubernetes memory quantity string (binary suffixes) to bytes.

    Supports Ki/Mi/Gi plus Ti (previously unsupported, which made large-memory
    pods report 'Metrics not available'). A bare number is taken as bytes.
    Raises ValueError on non-numeric input.
    """
    for suffix, factor in (("Ki", 1024), ("Mi", 1024**2), ("Gi", 1024**3), ("Ti", 1024**4)):
        if memory_str.endswith(suffix):
            return int(memory_str[: -len(suffix)]) * factor
    return int(memory_str)


@router.get("/{namespace}/{name}/metrics")
async def get_job_metrics(namespace: str, name: str) -> Response:
    """Get resource usage metrics for a Job's pods.

    Queries the metrics.k8s.io v1beta1 API per pod; pods whose metrics are
    unavailable are reported individually as errors rather than failing the
    whole request. Responses: 404 (no pods for the job), 500 (unexpected).
    """
    processor = _check_processor()
    _check_namespace(namespace, processor)

    try:
        pods = processor.core_v1.list_namespaced_pod(
            namespace=namespace, label_selector=f"job-name={name}"
        )

        if not pods.items:
            raise HTTPException(status_code=404, detail=f"No pods found for job '{name}'")

        pod_metrics = []
        total_cpu_millicores = 0
        total_memory_bytes = 0

        try:
            for pod in pods.items:
                try:
                    metrics = processor.custom_objects.get_namespaced_custom_object(
                        group="metrics.k8s.io",
                        version="v1beta1",
                        namespace=namespace,
                        plural="pods",
                        name=pod.metadata.name,
                    )

                    containers_metrics = []
                    for container in metrics.get("containers", []):
                        cpu_str = container.get("usage", {}).get("cpu", "0")
                        memory_str = container.get("usage", {}).get("memory", "0")

                        cpu_millicores = _cpu_quantity_to_millicores(cpu_str)
                        memory_bytes = _memory_quantity_to_bytes(memory_str)

                        total_cpu_millicores += cpu_millicores
                        total_memory_bytes += memory_bytes

                        containers_metrics.append(
                            {
                                "name": container.get("name"),
                                "cpu_millicores": cpu_millicores,
                                "memory_bytes": memory_bytes,
                                "memory_mib": round(memory_bytes / (1024 * 1024), 2),
                            }
                        )

                    pod_metrics.append(
                        {"pod_name": pod.metadata.name, "containers": containers_metrics}
                    )

                except Exception as e:
                    # Best-effort per pod: a failed metrics lookup (or an
                    # unparseable quantity) degrades to a per-pod error entry.
                    logger.warning(f"Could not get metrics for pod {pod.metadata.name}: {e}")
                    pod_metrics.append(
                        {"pod_name": pod.metadata.name, "error": "Metrics not available"}
                    )

        except Exception as e:
            # Metrics API (metrics-server) may not be installed at all.
            logger.warning(f"Metrics API not available: {e}")

        response = {
            "cluster_id": processor.cluster_id,
            "region": processor.region,
            "timestamp": datetime.now(UTC).isoformat(),
            "job_name": name,
            "namespace": namespace,
            "summary": {
                "total_cpu_millicores": total_cpu_millicores,
                "total_memory_bytes": total_memory_bytes,
                "total_memory_mib": round(total_memory_bytes / (1024 * 1024), 2),
                "pod_count": len(pods.items),
            },
            "pods": pod_metrics,
        }

        return JSONResponse(status_code=200, content=response)

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting job metrics: {e}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {e!s}") from e
@router.delete("/{namespace}/{name}")
async def delete_job(namespace: str, name: str) -> Response:
    """Delete a Job and its pods.

    Uses Background propagation so dependent pods are garbage-collected
    asynchronously. Responses: 404 (job missing), 500 (unexpected).
    """
    processor = _check_processor()
    _check_namespace(namespace, processor)

    try:
        processor.batch_v1.delete_namespaced_job(
            name=name, namespace=namespace, propagation_policy="Background"
        )

        payload = dict(
            cluster_id=processor.cluster_id,
            region=processor.region,
            timestamp=datetime.now(UTC).isoformat(),
            job_name=name,
            namespace=namespace,
            status="deleted",
            message="Job deleted successfully",
        )
        return JSONResponse(status_code=200, content=payload)

    except HTTPException:
        raise
    except Exception as e:
        err_text = str(e)
        if "NotFound" in err_text or "404" in err_text:
            raise HTTPException(
                status_code=404, detail=f"Job '{name}' not found in namespace '{namespace}'"
            ) from e
        logger.error(f"Error deleting job: {e}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {e!s}") from e
@router.delete("")
async def bulk_delete_jobs(request: BulkDeleteRequest) -> Response:
    """Bulk delete jobs based on filters.

    Filters (namespace, status, older_than_days, label_selector) are ANDed.
    With ``dry_run`` the matched jobs are reported without deleting anything.
    Per-job delete failures are collected in ``failed`` rather than aborting
    the batch.
    """
    processor = _check_processor()

    try:
        status_filter = request.status.value if request.status else None
        all_jobs = await processor.list_jobs(
            namespace=request.namespace, status_filter=status_filter
        )

        jobs_to_delete = []
        cutoff_time = None
        if request.older_than_days:
            # Timezone-aware cutoff in UTC.
            cutoff_time = datetime.now(UTC) - timedelta(days=request.older_than_days)

        for job in all_jobs:
            if cutoff_time:
                created_str = job.get("metadata", {}).get("creationTimestamp")
                if created_str:
                    created = datetime.fromisoformat(created_str.replace("Z", "+00:00"))
                    # BUG FIX: the original stripped tzinfo from `created`
                    # before comparing it to the aware `cutoff_time`, which
                    # raises TypeError (naive vs aware) whenever
                    # older_than_days is used. Compare aware-to-aware.
                    if created > cutoff_time:
                        continue  # newer than the cutoff — keep the job

            if request.label_selector:
                labels = job.get("metadata", {}).get("labels", {})
                match = True
                # Equality selectors only; clauses without '=' are ignored.
                for selector in request.label_selector.split(","):
                    if "=" in selector:
                        key, value = selector.split("=", 1)
                        if labels.get(key.strip()) != value.strip():
                            match = False
                            break
                if not match:
                    continue

            jobs_to_delete.append(job)

        deleted_jobs = []
        failed_jobs = []

        if not request.dry_run:
            for job in jobs_to_delete:
                job_name = job.get("metadata", {}).get("name")
                job_namespace = job.get("metadata", {}).get("namespace")
                try:
                    processor.batch_v1.delete_namespaced_job(
                        name=job_name, namespace=job_namespace, propagation_policy="Background"
                    )
                    deleted_jobs.append({"name": job_name, "namespace": job_namespace})
                except Exception as e:
                    # Record and continue — one bad job must not abort the batch.
                    failed_jobs.append(
                        {"name": job_name, "namespace": job_namespace, "error": str(e)}
                    )

        response: dict[str, Any] = {
            "cluster_id": processor.cluster_id,
            "region": processor.region,
            "timestamp": datetime.now(UTC).isoformat(),
            "dry_run": request.dry_run,
            "total_matched": len(jobs_to_delete),
            "deleted_count": len(deleted_jobs),
            "failed_count": len(failed_jobs),
            # Dry runs report what WOULD be deleted; real runs report what was.
            "jobs": (
                [
                    {
                        "name": j.get("metadata", {}).get("name"),
                        "namespace": j.get("metadata", {}).get("namespace"),
                    }
                    for j in jobs_to_delete
                ]
                if request.dry_run
                else deleted_jobs
            ),
            "failed": failed_jobs if failed_jobs else None,
        }

        return JSONResponse(status_code=200, content=response)

    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        logger.error(f"Error bulk deleting jobs: {e}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {e!s}") from e
@router.post("/{namespace}/{name}/retry")
async def retry_job(namespace: str, name: str) -> Response:
    """Retry a failed job by creating a new job from its spec.

    Reads the original Job, builds a fresh manifest reusing its template,
    parallelism, completions, and backoffLimit, names it
    ``<name>-retry-<UTC timestamp>``, and submits it through
    ``processor.process_manifest_submission``. Responses: 201 (retry created),
    400 (submission failed), 404 (original job missing), 500 (unexpected).
    """
    processor = _check_processor()
    _check_namespace(namespace, processor)

    try:
        try:
            original_job = processor.batch_v1.read_namespaced_job(name=name, namespace=namespace)
        except Exception as e:
            if "NotFound" in str(e) or "404" in str(e):
                raise HTTPException(
                    status_code=404, detail=f"Job '{name}' not found in namespace '{namespace}'"
                ) from e
            raise

        # Timestamped name keeps repeated retries of the same job unique.
        new_name = f"{name}-retry-{datetime.now(UTC).strftime('%Y%m%d%H%M%S')}"

        new_job_manifest = {
            "apiVersion": "batch/v1",
            "kind": "Job",
            "metadata": {
                "name": new_name,
                "namespace": namespace,
                # Carry over labels/annotations (may be None on the source job)
                # and stamp lineage markers pointing back at the original.
                "labels": {
                    **(original_job.metadata.labels or {}),
                    "gco.io/retry-of": name,
                },
                "annotations": {
                    **(original_job.metadata.annotations or {}),
                    "gco.io/original-job": name,
                },
            },
            "spec": {
                # NOTE(review): to_dict() on the kubernetes client model emits
                # snake_case keys (e.g. 'active_deadline_seconds') rather than
                # Kubernetes camelCase — presumably process_manifest_submission
                # normalizes this; confirm against that implementation.
                "parallelism": original_job.spec.parallelism,
                "completions": original_job.spec.completions,
                "backoffLimit": original_job.spec.backoff_limit,
                "template": original_job.spec.template.to_dict(),
            },
        }

        # Defensive scrub of a 'status' key in the template dict; a
        # PodTemplateSpec should not carry one, and coverage shows this branch
        # never fires in practice.
        spec_dict = new_job_manifest.get("spec", {})
        if isinstance(spec_dict, dict):
            template_dict = spec_dict.get("template", {})
            if isinstance(template_dict, dict) and "status" in template_dict:
                del template_dict["status"]

        submission_request = ManifestSubmissionRequest(
            manifests=[new_job_manifest], namespace=namespace, dry_run=False, validate=True
        )

        result = await processor.process_manifest_submission(submission_request)

        response = {
            "cluster_id": processor.cluster_id,
            "region": processor.region,
            "timestamp": datetime.now(UTC).isoformat(),
            "original_job": name,
            "new_job": new_name,
            "namespace": namespace,
            "success": result.success,
            "message": (
                "Job retry created successfully" if result.success else "Failed to create retry job"
            ),
            "errors": result.errors,
        }

        # 201 only when the processor reports a successful submission.
        status_code = 201 if result.success else 400
        return JSONResponse(status_code=status_code, content=response)

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error retrying job: {e}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {e!s}") from e