Coverage for gco/services/template_store.py: 92%
312 statements
« prev ^ index » next — coverage.py v7.13.5, created at 2026-04-30 21:47 +0000
1"""
2DynamoDB-backed storage for job templates, webhooks, and job records.
4This module provides persistent storage for:
5- Job templates: Reusable job configurations with parameter substitution
6- Webhooks: Event notification registrations
7- Job records: Centralized job tracking with status updates
9Tables are created in the global stack and accessed from all regional services.
11Region Configuration:
12 DynamoDB tables are deployed in the global region (e.g., us-east-2) but
13 accessed from regional services (e.g., us-east-1). The region is determined
14 by checking environment variables in this order:
15 1. DYNAMODB_REGION - Explicitly set for DynamoDB access
16 2. GLOBAL_REGION - The global stack's region
17 3. AWS_REGION - Fallback to current region
19Job Queue Architecture:
20 1. Jobs are submitted to the jobs table with target_region and status="queued"
21 2. Regional manifest processors poll for jobs targeting their region
22 3. Processor claims job (status="claimed"), applies to K8s, updates status
23 4. Status updates flow back to DynamoDB for global visibility
24"""
26from __future__ import annotations
28import json
29import logging
30import os
31from datetime import UTC, datetime
32from enum import StrEnum
33from typing import Any
35import boto3
36from botocore.exceptions import ClientError
38logger = logging.getLogger(__name__)
41def _utc_now_iso() -> str:
42 """Return current UTC time in ISO format with Z suffix."""
43 return datetime.now(UTC).isoformat().replace("+00:00", "Z")
46class JobStatus(StrEnum):
47 """Job status values for the centralized job store."""
49 QUEUED = "queued" # Submitted, waiting for regional pickup
50 CLAIMED = "claimed" # Claimed by a regional processor
51 APPLYING = "applying" # Being applied to Kubernetes
52 PENDING = "pending" # Applied, waiting for pod scheduling
53 RUNNING = "running" # Pod(s) running
54 SUCCEEDED = "succeeded" # Job completed successfully
55 FAILED = "failed" # Job failed
56 CANCELLED = "cancelled" # Job was cancelled
class TemplateStore:
    """DynamoDB-backed store for job templates.

    Templates are keyed by ``template_name``. The manifest and parameters
    are stored as JSON-encoded strings and decoded back to dicts on read.
    """

    # Attributes returned by list_templates(); manifests are deliberately
    # excluded to keep the scan payload small.
    _LIST_PROJECTION = "template_name, description, created_at, updated_at"

    def __init__(self, table_name: str | None = None, region: str | None = None):
        """Initialize the template store.

        Args:
            table_name: DynamoDB table name. Defaults to env var TEMPLATES_TABLE_NAME.
            region: AWS region for DynamoDB. Defaults to env var DYNAMODB_REGION,
                then GLOBAL_REGION, then AWS_REGION.
        """
        self.table_name = table_name or os.getenv("TEMPLATES_TABLE_NAME", "gco-job-templates")
        # DynamoDB tables are in the global region, not the regional cluster region
        self.region = (
            region
            or os.getenv("DYNAMODB_REGION")
            or os.getenv("GLOBAL_REGION")
            or os.getenv("AWS_REGION", "us-east-1")
        )
        self._dynamodb = boto3.resource("dynamodb", region_name=self.region)
        self._table = self._dynamodb.Table(self.table_name)

    def list_templates(self) -> list[dict[str, Any]]:
        """List all templates (metadata only, no manifests).

        Returns:
            List of dicts with name, description, created_at, updated_at.
        """
        try:
            scan_kwargs: dict[str, Any] = {"ProjectionExpression": self._LIST_PROJECTION}
            items: list[dict[str, Any]] = []
            # Follow LastEvaluatedKey so tables larger than one scan page
            # (1 MB) are fully listed.
            while True:
                response = self._table.scan(**scan_kwargs)
                items.extend(response.get("Items", []))
                last_key = response.get("LastEvaluatedKey")
                if not last_key:
                    break
                scan_kwargs["ExclusiveStartKey"] = last_key

            return [
                {
                    "name": item["template_name"],
                    "description": item.get("description"),
                    "created_at": item.get("created_at"),
                    "updated_at": item.get("updated_at"),
                }
                for item in items
            ]
        except ClientError as e:
            # Lazy %-args defer formatting until the record is actually emitted.
            logger.error("Failed to list templates: %s", e)
            raise

    def get_template(self, name: str) -> dict[str, Any] | None:
        """Get a template by name.

        Returns:
            Full template record (manifest and parameters decoded), or None
            if no template with this name exists.
        """
        try:
            response = self._table.get_item(Key={"template_name": name})
            item = response.get("Item")
            if not item:
                return None

            return {
                "name": item["template_name"],
                "description": item.get("description"),
                "manifest": json.loads(item["manifest"]),
                "parameters": json.loads(item.get("parameters", "{}")),
                "created_at": item.get("created_at"),
                "updated_at": item.get("updated_at"),
            }
        except ClientError as e:
            logger.error("Failed to get template %s: %s", name, e)
            raise

    def create_template(
        self,
        name: str,
        manifest: dict[str, Any],
        description: str | None = None,
        parameters: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Create a new template.

        Raises:
            ValueError: If a template with this name already exists.
        """
        now = _utc_now_iso()

        item: dict[str, Any] = {
            "template_name": name,
            "manifest": json.dumps(manifest),
            "parameters": json.dumps(parameters or {}),
            "created_at": now,
            "updated_at": now,
        }
        if description:
            item["description"] = description

        try:
            # Condition prevents silently overwriting an existing template.
            self._table.put_item(
                Item=item,
                ConditionExpression="attribute_not_exists(template_name)",
            )
            return {
                "name": name,
                "description": description,
                "manifest": manifest,
                "parameters": parameters or {},
                "created_at": now,
            }
        except ClientError as e:
            if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
                raise ValueError(f"Template '{name}' already exists") from e
            logger.error("Failed to create template %s: %s", name, e)
            raise

    def update_template(
        self,
        name: str,
        manifest: dict[str, Any] | None = None,
        description: str | None = None,
        parameters: dict[str, Any] | None = None,
    ) -> dict[str, Any] | None:
        """Update an existing template.

        Only the fields passed as non-None are modified; updated_at is
        always refreshed.

        Returns:
            Updated template record, or None if the template does not exist.
        """
        now = _utc_now_iso()

        update_expr_parts = ["updated_at = :updated_at"]
        expr_values: dict[str, Any] = {":updated_at": now}

        if manifest is not None:
            update_expr_parts.append("manifest = :manifest")
            expr_values[":manifest"] = json.dumps(manifest)

        if description is not None:
            update_expr_parts.append("description = :description")
            expr_values[":description"] = description

        if parameters is not None:
            update_expr_parts.append("parameters = :parameters")
            expr_values[":parameters"] = json.dumps(parameters)

        try:
            response = self._table.update_item(
                Key={"template_name": name},
                UpdateExpression="SET " + ", ".join(update_expr_parts),
                ExpressionAttributeValues=expr_values,
                # Update must not create a template that does not exist yet.
                ConditionExpression="attribute_exists(template_name)",
                ReturnValues="ALL_NEW",
            )
            item = response.get("Attributes", {})
            return {
                "name": item["template_name"],
                "description": item.get("description"),
                "manifest": json.loads(item["manifest"]),
                "parameters": json.loads(item.get("parameters", "{}")),
                "created_at": item.get("created_at"),
                "updated_at": item.get("updated_at"),
            }
        except ClientError as e:
            if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
                return None
            logger.error("Failed to update template %s: %s", name, e)
            raise

    def delete_template(self, name: str) -> bool:
        """Delete a template.

        Returns:
            True if deleted, False if the template did not exist.
        """
        try:
            self._table.delete_item(
                Key={"template_name": name},
                ConditionExpression="attribute_exists(template_name)",
            )
            return True
        except ClientError as e:
            if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
                return False
            logger.error("Failed to delete template %s: %s", name, e)
            raise

    def template_exists(self, name: str) -> bool:
        """Check if a template exists (cheap: projects only the key)."""
        try:
            response = self._table.get_item(
                Key={"template_name": name},
                ProjectionExpression="template_name",
            )
            return "Item" in response
        except ClientError as e:
            logger.error("Failed to check template existence %s: %s", name, e)
            raise
class WebhookStore:
    """DynamoDB-backed store for webhook registrations.

    Webhooks are keyed by ``webhook_id``; the subscribed event list is
    stored as a JSON-encoded string and decoded on read.
    """

    def __init__(self, table_name: str | None = None, region: str | None = None):
        """Initialize the webhook store.

        Args:
            table_name: DynamoDB table name. Defaults to env var WEBHOOKS_TABLE_NAME.
            region: AWS region for DynamoDB. Defaults to env var DYNAMODB_REGION,
                then GLOBAL_REGION, then AWS_REGION.
        """
        self.table_name = table_name or os.getenv("WEBHOOKS_TABLE_NAME", "gco-webhooks")
        # DynamoDB tables are in the global region, not the regional cluster region
        self.region = (
            region
            or os.getenv("DYNAMODB_REGION")
            or os.getenv("GLOBAL_REGION")
            or os.getenv("AWS_REGION", "us-east-1")
        )
        self._dynamodb = boto3.resource("dynamodb", region_name=self.region)
        self._table = self._dynamodb.Table(self.table_name)

    def list_webhooks(self, namespace: str | None = None) -> list[dict[str, Any]]:
        """List webhooks, optionally filtered by namespace.

        The webhook secret is deliberately omitted from the listing; use
        get_webhook() to retrieve it.
        """
        # BUG FIX: the previous implementation continued pagination with an
        # unfiltered table scan even when the first page came from the
        # namespace-index query, which could leak webhooks from other
        # namespaces (and duplicates) into a namespaced listing. Each branch
        # now paginates with its own operation.
        try:
            items: list[dict[str, Any]] = []
            if namespace:
                query_kwargs: dict[str, Any] = {
                    "IndexName": "namespace-index",
                    "KeyConditionExpression": "namespace = :ns",
                    "ExpressionAttributeValues": {":ns": namespace},
                }
                while True:
                    response = self._table.query(**query_kwargs)
                    items.extend(response.get("Items", []))
                    last_key = response.get("LastEvaluatedKey")
                    if not last_key:
                        break
                    query_kwargs["ExclusiveStartKey"] = last_key
            else:
                scan_kwargs: dict[str, Any] = {}
                while True:
                    response = self._table.scan(**scan_kwargs)
                    items.extend(response.get("Items", []))
                    last_key = response.get("LastEvaluatedKey")
                    if not last_key:
                        break
                    scan_kwargs["ExclusiveStartKey"] = last_key

            return [
                {
                    "id": item["webhook_id"],
                    "url": item["url"],
                    "events": json.loads(item.get("events", "[]")),
                    "namespace": item.get("namespace"),
                    "created_at": item.get("created_at"),
                }
                for item in items
            ]
        except ClientError as e:
            logger.error("Failed to list webhooks: %s", e)
            raise

    def get_webhook(self, webhook_id: str) -> dict[str, Any] | None:
        """Get a webhook by ID (including its secret), or None if absent."""
        try:
            response = self._table.get_item(Key={"webhook_id": webhook_id})
            item = response.get("Item")
            if not item:
                return None

            return {
                "id": item["webhook_id"],
                "url": item["url"],
                "events": json.loads(item.get("events", "[]")),
                "namespace": item.get("namespace"),
                "secret": item.get("secret"),
                "created_at": item.get("created_at"),
            }
        except ClientError as e:
            logger.error("Failed to get webhook %s: %s", webhook_id, e)
            raise

    def create_webhook(
        self,
        webhook_id: str,
        url: str,
        events: list[str],
        namespace: str | None = None,
        secret: str | None = None,
    ) -> dict[str, Any]:
        """Create (or overwrite) a webhook registration.

        Returns:
            The created record; the secret is stored but not echoed back.
        """
        now = _utc_now_iso()

        item: dict[str, Any] = {
            "webhook_id": webhook_id,
            "url": url,
            "events": json.dumps(events),
            "created_at": now,
        }
        if namespace:
            item["namespace"] = namespace
        if secret:
            item["secret"] = secret

        try:
            self._table.put_item(Item=item)
            return {
                "id": webhook_id,
                "url": url,
                "events": events,
                "namespace": namespace,
                "created_at": now,
            }
        except ClientError as e:
            logger.error("Failed to create webhook: %s", e)
            raise

    def delete_webhook(self, webhook_id: str) -> bool:
        """Delete a webhook. Returns True if deleted, False if it did not exist."""
        try:
            self._table.delete_item(
                Key={"webhook_id": webhook_id},
                ConditionExpression="attribute_exists(webhook_id)",
            )
            return True
        except ClientError as e:
            if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
                return False
            logger.error("Failed to delete webhook %s: %s", webhook_id, e)
            raise

    def get_webhooks_for_event(
        self, event: str, namespace: str | None = None
    ) -> list[dict[str, Any]]:
        """Get all webhooks subscribed to a specific event."""
        webhooks = self.list_webhooks(namespace=namespace)
        return [w for w in webhooks if event in w.get("events", [])]
class JobStore:
    """DynamoDB-backed store for centralized job tracking.

    This store enables:
    - Global job submission with region targeting
    - Real-time status tracking across all regions
    - Job history and audit trail
    - Cross-region job queries without hitting K8s APIs
    """

    def __init__(self, table_name: str | None = None, region: str | None = None):
        """Initialize the job store.

        Args:
            table_name: DynamoDB table name. Defaults to env var JOBS_TABLE_NAME.
            region: AWS region for DynamoDB. Defaults to env var DYNAMODB_REGION,
                then GLOBAL_REGION, then AWS_REGION.
        """
        self.table_name = table_name or os.getenv("JOBS_TABLE_NAME", "gco-jobs")
        # DynamoDB tables are in the global region, not the regional cluster region
        self.region = (
            region
            or os.getenv("DYNAMODB_REGION")
            or os.getenv("GLOBAL_REGION")
            or os.getenv("AWS_REGION", "us-east-1")
        )
        self._dynamodb = boto3.resource("dynamodb", region_name=self.region)
        self._table = self._dynamodb.Table(self.table_name)

    def submit_job(
        self,
        job_id: str,
        manifest: dict[str, Any],
        target_region: str,
        namespace: str = "gco-jobs",
        priority: int = 0,
        labels: dict[str, str] | None = None,
        submitted_by: str | None = None,
    ) -> dict[str, Any]:
        """Submit a job to the centralized queue.

        Args:
            job_id: Unique job identifier
            manifest: Kubernetes job manifest
            target_region: Region where job should run
            namespace: Kubernetes namespace
            priority: Job priority (higher = more important)
            labels: Optional labels for filtering
            submitted_by: Optional submitter identifier

        Returns:
            Job record with submission details
        """
        now = _utc_now_iso()

        # Prefer the manifest's own name; fall back to the job ID.
        job_name = manifest.get("metadata", {}).get("name", job_id)

        item: dict[str, Any] = {
            "job_id": job_id,
            "job_name": job_name,
            "target_region": target_region,
            "namespace": namespace,
            "status": JobStatus.QUEUED.value,
            "priority": priority,
            "manifest": json.dumps(manifest),
            "submitted_at": now,
            "updated_at": now,
            "status_history": json.dumps(
                [{"status": JobStatus.QUEUED.value, "timestamp": now, "message": "Job submitted"}]
            ),
        }

        if labels:
            item["labels"] = json.dumps(labels)
        if submitted_by:
            item["submitted_by"] = submitted_by

        try:
            self._table.put_item(Item=item)
            return {
                "job_id": job_id,
                "job_name": job_name,
                "target_region": target_region,
                "namespace": namespace,
                "status": JobStatus.QUEUED.value,
                "priority": priority,
                "submitted_at": now,
            }
        except ClientError as e:
            logger.error("Failed to submit job %s: %s", job_id, e)
            raise

    def claim_job(self, job_id: str, claimed_by: str) -> dict[str, Any] | None:
        """Claim a queued job for processing.

        Uses conditional update to prevent race conditions between regions.

        Args:
            job_id: Job to claim
            claimed_by: Identifier of the claiming processor (e.g., region name)

        Returns:
            Job record if claimed successfully, None if already claimed
        """
        now = _utc_now_iso()

        try:
            response = self._table.update_item(
                Key={"job_id": job_id},
                UpdateExpression="SET #status = :new_status, claimed_by = :claimed_by, "
                "claimed_at = :claimed_at, updated_at = :updated_at",
                # Only one claimant can transition queued -> claimed.
                ConditionExpression="#status = :queued_status",
                # "status" is a DynamoDB reserved word; alias it.
                ExpressionAttributeNames={"#status": "status"},
                ExpressionAttributeValues={
                    ":new_status": JobStatus.CLAIMED.value,
                    ":queued_status": JobStatus.QUEUED.value,
                    ":claimed_by": claimed_by,
                    ":claimed_at": now,
                    ":updated_at": now,
                },
                ReturnValues="ALL_NEW",
            )
            item = response.get("Attributes", {})
            return self._parse_job_item(item)
        except ClientError as e:
            if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
                return None  # Job already claimed
            logger.error("Failed to claim job %s: %s", job_id, e)
            raise

    def update_job_status(
        self,
        job_id: str,
        status: JobStatus | str,
        message: str | None = None,
        k8s_job_uid: str | None = None,
        error: str | None = None,
    ) -> dict[str, Any] | None:
        """Update job status with history tracking.

        Args:
            job_id: Job to update
            status: New status
            message: Optional status message
            k8s_job_uid: Kubernetes job UID (set when job is applied)
            error: Error message if failed

        Returns:
            Updated job record
        """
        now = _utc_now_iso()
        status_value = status.value if isinstance(status, JobStatus) else status

        # Build update expression
        update_parts = ["#status = :status", "updated_at = :updated_at"]
        expr_values: dict[str, Any] = {
            ":status": status_value,
            ":updated_at": now,
        }

        if k8s_job_uid:
            update_parts.append("k8s_job_uid = :k8s_uid")
            expr_values[":k8s_uid"] = k8s_job_uid

        if error:
            update_parts.append("error_message = :error")
            expr_values[":error"] = error

        # Terminal states record a completion timestamp.
        if status_value in [JobStatus.SUCCEEDED.value, JobStatus.FAILED.value]:
            update_parts.append("completed_at = :completed_at")
            expr_values[":completed_at"] = now

        try:
            response = self._table.update_item(
                Key={"job_id": job_id},
                UpdateExpression="SET " + ", ".join(update_parts),
                ExpressionAttributeNames={"#status": "status"},
                ExpressionAttributeValues=expr_values,
                ReturnValues="ALL_NEW",
            )
            item = response.get("Attributes", {})

            # Append to status history (separate update to avoid conflicts)
            history_entry = {"status": status_value, "timestamp": now}
            if message:
                history_entry["message"] = message
            if error:
                history_entry["error"] = error

            self._append_status_history(job_id, history_entry)

            return self._parse_job_item(item)
        except ClientError as e:
            logger.error("Failed to update job %s: %s", job_id, e)
            raise

    def _append_status_history(self, job_id: str, entry: dict[str, Any]) -> None:
        """Append an entry to job status history (best-effort: read-modify-write)."""
        try:
            # Get current history
            response = self._table.get_item(
                Key={"job_id": job_id},
                ProjectionExpression="status_history",
            )
            current = json.loads(response.get("Item", {}).get("status_history", "[]"))
            current.append(entry)

            # Update with new history
            self._table.update_item(
                Key={"job_id": job_id},
                UpdateExpression="SET status_history = :history",
                ExpressionAttributeValues={":history": json.dumps(current)},
            )
        except ClientError as e:
            # History is advisory; a failure here must not fail the status update.
            logger.warning("Failed to update status history for %s: %s", job_id, e)

    def get_job(self, job_id: str) -> dict[str, Any] | None:
        """Get a job by ID, or None if it does not exist."""
        try:
            response = self._table.get_item(Key={"job_id": job_id})
            item = response.get("Item")
            if not item:
                return None
            return self._parse_job_item(item)
        except ClientError as e:
            logger.error("Failed to get job %s: %s", job_id, e)
            raise

    def list_jobs(
        self,
        target_region: str | None = None,
        status: str | None = None,
        namespace: str | None = None,
        limit: int = 100,
    ) -> list[dict[str, Any]]:
        """List jobs with optional filters.

        Args:
            target_region: Filter by target region
            status: Filter by status
            namespace: Filter by namespace
            limit: Maximum results

        Returns:
            List of job records (up to ``limit``)
        """
        # BUG FIX: DynamoDB applies ``Limit`` *before* the FilterExpression,
        # so passing Limit straight to a filtered scan could silently return
        # far fewer than ``limit`` matching jobs. Instead, paginate the scan
        # and stop once ``limit`` matching items have been collected.
        try:
            # Build filter expression
            filter_parts = []
            expr_values: dict[str, Any] = {}
            expr_names: dict[str, str] = {}

            if target_region:
                filter_parts.append("target_region = :region")
                expr_values[":region"] = target_region

            if status:
                # "status" is a DynamoDB reserved word; alias it.
                filter_parts.append("#status = :status")
                expr_values[":status"] = status
                expr_names["#status"] = "status"

            if namespace:
                filter_parts.append("#ns = :namespace")
                expr_values[":namespace"] = namespace
                expr_names["#ns"] = "namespace"

            scan_kwargs: dict[str, Any] = {}
            if filter_parts:
                scan_kwargs["FilterExpression"] = " AND ".join(filter_parts)
                scan_kwargs["ExpressionAttributeValues"] = expr_values
            if expr_names:
                scan_kwargs["ExpressionAttributeNames"] = expr_names

            jobs: list[dict[str, Any]] = []
            while len(jobs) < limit:
                response = self._table.scan(**scan_kwargs)
                for item in response.get("Items", []):
                    jobs.append(self._parse_job_item(item))
                    if len(jobs) >= limit:
                        break
                last_key = response.get("LastEvaluatedKey")
                if not last_key:
                    break
                scan_kwargs["ExclusiveStartKey"] = last_key

            return jobs
        except ClientError as e:
            logger.error("Failed to list jobs: %s", e)
            raise

    def get_queued_jobs_for_region(self, region: str, limit: int = 10) -> list[dict[str, Any]]:
        """Get queued jobs targeting a specific region.

        Used by regional processors to poll for work.

        Args:
            region: Target region
            limit: Maximum jobs to return

        Returns:
            List of queued jobs sorted by priority (descending)
        """
        try:
            # Limit is safe here: the query filters via key conditions only,
            # so DynamoDB counts only matching items against it.
            response = self._table.query(
                IndexName="region-status-index",
                KeyConditionExpression="target_region = :region AND #status = :status",
                ExpressionAttributeNames={"#status": "status"},
                ExpressionAttributeValues={
                    ":region": region,
                    ":status": JobStatus.QUEUED.value,
                },
                Limit=limit,
                ScanIndexForward=False,  # Descending order
            )
            items = response.get("Items", [])

            # Sort by priority (higher first)
            jobs = [self._parse_job_item(item) for item in items]
            return sorted(jobs, key=lambda j: j.get("priority", 0), reverse=True)
        except ClientError as e:
            logger.error("Failed to get queued jobs for %s: %s", region, e)
            raise

    def get_job_counts_by_region(self) -> dict[str, dict[str, int]]:
        """Get job counts grouped by region and status.

        Returns:
            Dict mapping region -> status -> count
        """
        try:
            scan_kwargs: dict[str, Any] = {
                "ProjectionExpression": "target_region, #status",
                "ExpressionAttributeNames": {"#status": "status"},
            }
            items: list[dict[str, Any]] = []
            # Paginate so counts cover the whole table, not just one page.
            while True:
                response = self._table.scan(**scan_kwargs)
                items.extend(response.get("Items", []))
                last_key = response.get("LastEvaluatedKey")
                if not last_key:
                    break
                scan_kwargs["ExclusiveStartKey"] = last_key

            # Aggregate counts
            counts: dict[str, dict[str, int]] = {}
            for item in items:
                region = item.get("target_region", "unknown")
                status = item.get("status", "unknown")
                if region not in counts:
                    counts[region] = {}
                counts[region][status] = counts[region].get(status, 0) + 1

            return counts
        except ClientError as e:
            logger.error("Failed to get job counts: %s", e)
            raise

    def cancel_job(self, job_id: str, reason: str | None = None) -> bool:
        """Cancel a job if it's still in a cancellable state.

        Args:
            job_id: Job to cancel
            reason: Optional cancellation reason

        Returns:
            True if cancelled, False if not cancellable
        """
        # Only jobs not yet applied to Kubernetes can be cancelled here.
        cancellable_statuses = [JobStatus.QUEUED.value, JobStatus.CLAIMED.value]

        try:
            self._table.update_item(
                Key={"job_id": job_id},
                UpdateExpression="SET #status = :cancelled, updated_at = :now, "
                "cancelled_at = :now, cancel_reason = :reason",
                ConditionExpression="#status IN (:s1, :s2)",
                ExpressionAttributeNames={"#status": "status"},
                ExpressionAttributeValues={
                    ":cancelled": JobStatus.CANCELLED.value,
                    ":now": _utc_now_iso(),
                    ":reason": reason or "Cancelled by user",
                    ":s1": cancellable_statuses[0],
                    ":s2": cancellable_statuses[1],
                },
            )
            return True
        except ClientError as e:
            if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
                return False
            logger.error("Failed to cancel job %s: %s", job_id, e)
            raise

    def _parse_job_item(self, item: dict[str, Any]) -> dict[str, Any]:
        """Parse a DynamoDB item into a job record (JSON fields decoded)."""
        return {
            "job_id": item.get("job_id"),
            "job_name": item.get("job_name"),
            "target_region": item.get("target_region"),
            "namespace": item.get("namespace"),
            "status": item.get("status"),
            # DynamoDB numbers come back as Decimal; normalize to int.
            "priority": int(item.get("priority", 0)),
            "manifest": json.loads(item.get("manifest", "{}")),
            "labels": json.loads(item.get("labels", "{}")),
            "submitted_at": item.get("submitted_at"),
            "submitted_by": item.get("submitted_by"),
            "claimed_by": item.get("claimed_by"),
            "claimed_at": item.get("claimed_at"),
            "completed_at": item.get("completed_at"),
            "updated_at": item.get("updated_at"),
            "k8s_job_uid": item.get("k8s_job_uid"),
            "error_message": item.get("error_message"),
            "status_history": json.loads(item.get("status_history", "[]")),
        }
# Lazily-constructed, module-level singletons shared by the API layer.
_template_store: TemplateStore | None = None
_webhook_store: WebhookStore | None = None
_job_store: JobStore | None = None


def get_template_store() -> TemplateStore:
    """Return the shared TemplateStore, constructing it on first use."""
    global _template_store
    if _template_store is None:
        _template_store = TemplateStore()
    return _template_store


def get_webhook_store() -> WebhookStore:
    """Return the shared WebhookStore, constructing it on first use."""
    global _webhook_store
    if _webhook_store is None:
        _webhook_store = WebhookStore()
    return _webhook_store


def get_job_store() -> JobStore:
    """Return the shared JobStore, constructing it on first use."""
    global _job_store
    if _job_store is None:
        _job_store = JobStore()
    return _job_store