Coverage for gco / services / template_store.py: 92%

312 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 21:47 +0000

1""" 

2DynamoDB-backed storage for job templates, webhooks, and job records. 

3 

4This module provides persistent storage for: 

5- Job templates: Reusable job configurations with parameter substitution 

6- Webhooks: Event notification registrations 

7- Job records: Centralized job tracking with status updates 

8 

9Tables are created in the global stack and accessed from all regional services. 

10 

11Region Configuration: 

12 DynamoDB tables are deployed in the global region (e.g., us-east-2) but 

13 accessed from regional services (e.g., us-east-1). The region is determined 

14 by checking environment variables in this order: 

15 1. DYNAMODB_REGION - Explicitly set for DynamoDB access 

16 2. GLOBAL_REGION - The global stack's region 

17 3. AWS_REGION - Fallback to current region 

18 

19Job Queue Architecture: 

20 1. Jobs are submitted to the jobs table with target_region and status="queued" 

21 2. Regional manifest processors poll for jobs targeting their region 

22 3. Processor claims job (status="claimed"), applies to K8s, updates status 

23 4. Status updates flow back to DynamoDB for global visibility 

24""" 

25 

26from __future__ import annotations 

27 

28import json 

29import logging 

30import os 

31from datetime import UTC, datetime 

32from enum import StrEnum 

33from typing import Any 

34 

35import boto3 

36from botocore.exceptions import ClientError 

37 

38logger = logging.getLogger(__name__) 

39 

40 

41def _utc_now_iso() -> str: 

42 """Return current UTC time in ISO format with Z suffix.""" 

43 return datetime.now(UTC).isoformat().replace("+00:00", "Z") 

44 

45 

46class JobStatus(StrEnum): 

47 """Job status values for the centralized job store.""" 

48 

49 QUEUED = "queued" # Submitted, waiting for regional pickup 

50 CLAIMED = "claimed" # Claimed by a regional processor 

51 APPLYING = "applying" # Being applied to Kubernetes 

52 PENDING = "pending" # Applied, waiting for pod scheduling 

53 RUNNING = "running" # Pod(s) running 

54 SUCCEEDED = "succeeded" # Job completed successfully 

55 FAILED = "failed" # Job failed 

56 CANCELLED = "cancelled" # Job was cancelled 

57 

58 

class TemplateStore:
    """DynamoDB-backed store for job templates.

    Each item is keyed by ``template_name``. The ``manifest`` and ``parameters``
    attributes are persisted as JSON strings and decoded again whenever a full
    template record is returned to the caller.
    """

    # Attributes fetched by list_templates(); only summary fields are returned
    # there, so the manifest and parameters are not requested.
    _LIST_PROJECTION = "template_name, description, created_at, updated_at"

    def __init__(self, table_name: str | None = None, region: str | None = None):
        """Initialize the template store.

        Args:
            table_name: DynamoDB table name. Defaults to env var
                TEMPLATES_TABLE_NAME, then "gco-job-templates".
            region: AWS region for DynamoDB. Defaults to env var DYNAMODB_REGION,
                then GLOBAL_REGION, then AWS_REGION, then "us-east-1".
        """
        self.table_name = table_name or os.getenv("TEMPLATES_TABLE_NAME", "gco-job-templates")
        # DynamoDB tables are in the global region, not the regional cluster region
        self.region = (
            region
            or os.getenv("DYNAMODB_REGION")
            or os.getenv("GLOBAL_REGION")
            or os.getenv("AWS_REGION", "us-east-1")
        )
        self._dynamodb = boto3.resource("dynamodb", region_name=self.region)
        self._table = self._dynamodb.Table(self.table_name)

    @staticmethod
    def _to_record(item: dict[str, Any]) -> dict[str, Any]:
        """Convert a stored item into the full template record returned to callers.

        Raises:
            KeyError: If the item lacks ``template_name`` or ``manifest``.
        """
        return {
            "name": item["template_name"],
            "description": item.get("description"),
            "manifest": json.loads(item["manifest"]),
            "parameters": json.loads(item.get("parameters", "{}")),
            "created_at": item.get("created_at"),
            "updated_at": item.get("updated_at"),
        }

    @staticmethod
    def _build_update_kwargs(
        now: str,
        manifest: dict[str, Any] | None,
        description: str | None,
        parameters: dict[str, Any] | None,
    ) -> dict[str, Any]:
        """Build the expression-related keyword arguments for ``update_item``.

        Only fields that are not None are included in the SET clause;
        ``updated_at`` is always set to ``now``.
        """
        assignments = ["updated_at = :updated_at"]
        values: dict[str, Any] = {":updated_at": now}
        names: dict[str, str] = {}

        if manifest is not None:
            assignments.append("manifest = :manifest")
            values[":manifest"] = json.dumps(manifest)

        if description is not None:
            assignments.append("description = :description")
            values[":description"] = description

        if parameters is not None:
            # "parameters" is a DynamoDB reserved word, so it cannot appear
            # literally in an expression; refer to it through a name placeholder.
            assignments.append("#parameters = :parameters")
            names["#parameters"] = "parameters"
            values[":parameters"] = json.dumps(parameters)

        kwargs: dict[str, Any] = {
            "UpdateExpression": "SET " + ", ".join(assignments),
            "ExpressionAttributeValues": values,
        }
        # Only pass the name map when a placeholder is actually used.
        if names:
            kwargs["ExpressionAttributeNames"] = names
        return kwargs

    def list_templates(self) -> list[dict[str, Any]]:
        """List all templates as summary records (no manifest or parameters).

        Returns:
            One dict per template with ``name``, ``description``,
            ``created_at`` and ``updated_at``.

        Raises:
            ClientError: If a scan request fails (logged, then re-raised).
        """
        scan_kwargs: dict[str, Any] = {"ProjectionExpression": self._LIST_PROJECTION}
        items: list[dict[str, Any]] = []
        try:
            # Follow LastEvaluatedKey until the whole table has been read.
            while True:
                response = self._table.scan(**scan_kwargs)
                items.extend(response.get("Items", []))
                if "LastEvaluatedKey" not in response:
                    break
                scan_kwargs["ExclusiveStartKey"] = response["LastEvaluatedKey"]
        except ClientError as e:
            logger.error(f"Failed to list templates: {e}")
            raise

        return [
            {
                "name": item["template_name"],
                "description": item.get("description"),
                "created_at": item.get("created_at"),
                "updated_at": item.get("updated_at"),
            }
            for item in items
        ]

    def get_template(self, name: str) -> dict[str, Any] | None:
        """Get a template by name.

        Returns:
            The full template record, or None if no item with that name exists.

        Raises:
            ClientError: If the read fails (logged, then re-raised).
        """
        try:
            response = self._table.get_item(Key={"template_name": name})
        except ClientError as e:
            logger.error(f"Failed to get template {name}: {e}")
            raise

        item = response.get("Item")
        if not item:
            return None
        return self._to_record(item)

    def create_template(
        self,
        name: str,
        manifest: dict[str, Any],
        description: str | None = None,
        parameters: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Create a new template.

        Args:
            name: Template name (the table key).
            manifest: Manifest to store; serialized to JSON.
            description: Optional description; stored only when truthy.
            parameters: Optional parameters; an empty dict is stored when omitted.

        Returns:
            The created template (``name``, ``description``, ``manifest``,
            ``parameters``, ``created_at``).

        Raises:
            ValueError: If a template with this name already exists.
            ClientError: For any other DynamoDB failure (logged, then re-raised).
        """
        now = _utc_now_iso()

        item = {
            "template_name": name,
            "manifest": json.dumps(manifest),
            "parameters": json.dumps(parameters or {}),
            "created_at": now,
            "updated_at": now,
        }
        if description:
            item["description"] = description

        try:
            # The condition makes the write fail instead of overwriting an
            # existing template of the same name.
            self._table.put_item(
                Item=item,
                ConditionExpression="attribute_not_exists(template_name)",
            )
        except ClientError as e:
            if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
                raise ValueError(f"Template '{name}' already exists") from e
            logger.error(f"Failed to create template {name}: {e}")
            raise

        return {
            "name": name,
            "description": description,
            "manifest": manifest,
            "parameters": parameters or {},
            "created_at": now,
        }

    def update_template(
        self,
        name: str,
        manifest: dict[str, Any] | None = None,
        description: str | None = None,
        parameters: dict[str, Any] | None = None,
    ) -> dict[str, Any] | None:
        """Update an existing template.

        Only the arguments that are not None are written; ``updated_at`` is
        always refreshed.

        Returns:
            The full template record after the update, or None if no template
            with this name exists.

        Raises:
            ClientError: For DynamoDB failures other than the existence check
                (logged, then re-raised).
        """
        request = self._build_update_kwargs(_utc_now_iso(), manifest, description, parameters)

        try:
            response = self._table.update_item(
                Key={"template_name": name},
                ConditionExpression="attribute_exists(template_name)",
                ReturnValues="ALL_NEW",
                **request,
            )
        except ClientError as e:
            if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
                return None
            logger.error(f"Failed to update template {name}: {e}")
            raise

        return self._to_record(response.get("Attributes", {}))

    def delete_template(self, name: str) -> bool:
        """Delete a template.

        Returns:
            True if the template was deleted, False if it did not exist.

        Raises:
            ClientError: For DynamoDB failures other than the existence check
                (logged, then re-raised).
        """
        try:
            self._table.delete_item(
                Key={"template_name": name},
                ConditionExpression="attribute_exists(template_name)",
            )
            return True
        except ClientError as e:
            if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
                return False
            logger.error(f"Failed to delete template {name}: {e}")
            raise

    def template_exists(self, name: str) -> bool:
        """Check if a template exists.

        Raises:
            ClientError: If the read fails (logged, then re-raised).
        """
        try:
            response = self._table.get_item(
                Key={"template_name": name},
                ProjectionExpression="template_name",
            )
            return "Item" in response
        except ClientError as e:
            logger.error(f"Failed to check template existence {name}: {e}")
            raise

241 

242 

class WebhookStore:
    """DynamoDB-backed store for webhooks.

    Each item is keyed by ``webhook_id``. The ``events`` attribute is persisted
    as a JSON-encoded list and decoded when records are returned.
    """

    def __init__(self, table_name: str | None = None, region: str | None = None):
        """Initialize the webhook store.

        Args:
            table_name: DynamoDB table name. Defaults to env var
                WEBHOOKS_TABLE_NAME, then "gco-webhooks".
            region: AWS region for DynamoDB. Defaults to env var DYNAMODB_REGION,
                then GLOBAL_REGION, then AWS_REGION, then "us-east-1".
        """
        self.table_name = table_name or os.getenv("WEBHOOKS_TABLE_NAME", "gco-webhooks")
        # DynamoDB tables are in the global region, not the regional cluster region
        self.region = (
            region
            or os.getenv("DYNAMODB_REGION")
            or os.getenv("GLOBAL_REGION")
            or os.getenv("AWS_REGION", "us-east-1")
        )
        self._dynamodb = boto3.resource("dynamodb", region_name=self.region)
        self._table = self._dynamodb.Table(self.table_name)

    @staticmethod
    def _to_record(item: dict[str, Any], include_secret: bool = False) -> dict[str, Any]:
        """Convert a stored item into a webhook record.

        The ``secret`` field is only included when ``include_secret`` is True.

        Raises:
            KeyError: If the item lacks ``webhook_id`` or ``url``.
        """
        record: dict[str, Any] = {
            "id": item["webhook_id"],
            "url": item["url"],
            "events": json.loads(item.get("events", "[]")),
            "namespace": item.get("namespace"),
        }
        if include_secret:
            record["secret"] = item.get("secret")
        record["created_at"] = item.get("created_at")
        return record

    def list_webhooks(self, namespace: str | None = None) -> list[dict[str, Any]]:
        """List webhooks, optionally filtered by namespace.

        With a namespace, the ``namespace-index`` index is queried; otherwise the
        whole table is scanned. Either way every result page is followed, so
        the listing is complete even when it spans multiple pages.

        Returns:
            Webhook records without the ``secret`` field.

        Raises:
            ClientError: If a request fails (logged, then re-raised).
        """
        if namespace:
            fetch_page = self._table.query
            request: dict[str, Any] = {
                "IndexName": "namespace-index",
                "KeyConditionExpression": "namespace = :ns",
                "ExpressionAttributeValues": {":ns": namespace},
            }
        else:
            fetch_page = self._table.scan
            request = {}

        items: list[dict[str, Any]] = []
        try:
            # Continue with the SAME operation and arguments on every page;
            # only the start key changes.
            while True:
                response = fetch_page(**request)
                items.extend(response.get("Items", []))
                if "LastEvaluatedKey" not in response:
                    break
                request["ExclusiveStartKey"] = response["LastEvaluatedKey"]
        except ClientError as e:
            logger.error(f"Failed to list webhooks: {e}")
            raise

        return [self._to_record(item) for item in items]

    def get_webhook(self, webhook_id: str) -> dict[str, Any] | None:
        """Get a webhook by ID.

        Returns:
            The webhook record including its ``secret``, or None if no item
            with that ID exists.

        Raises:
            ClientError: If the read fails (logged, then re-raised).
        """
        try:
            response = self._table.get_item(Key={"webhook_id": webhook_id})
        except ClientError as e:
            logger.error(f"Failed to get webhook {webhook_id}: {e}")
            raise

        item = response.get("Item")
        if not item:
            return None
        return self._to_record(item, include_secret=True)

    def create_webhook(
        self,
        webhook_id: str,
        url: str,
        events: list[str],
        namespace: str | None = None,
        secret: str | None = None,
    ) -> dict[str, Any]:
        """Create a new webhook.

        The write is unconditional: an existing item with the same
        ``webhook_id`` is replaced.

        Args:
            webhook_id: Identifier used as the table key.
            url: Target URL stored with the webhook.
            events: Event names; stored as a JSON-encoded list.
            namespace: Optional namespace; stored only when truthy.
            secret: Optional secret; stored only when truthy.

        Returns:
            The created webhook (``id``, ``url``, ``events``, ``namespace``,
            ``created_at``); the secret is not echoed back.

        Raises:
            ClientError: If the write fails (logged, then re-raised).
        """
        now = _utc_now_iso()

        item: dict[str, Any] = {
            "webhook_id": webhook_id,
            "url": url,
            "events": json.dumps(events),
            "created_at": now,
        }
        if namespace:
            item["namespace"] = namespace
        if secret:
            item["secret"] = secret

        try:
            self._table.put_item(Item=item)
        except ClientError as e:
            logger.error(f"Failed to create webhook: {e}")
            raise

        return {
            "id": webhook_id,
            "url": url,
            "events": events,
            "namespace": namespace,
            "created_at": now,
        }

    def delete_webhook(self, webhook_id: str) -> bool:
        """Delete a webhook.

        Returns:
            True if the webhook was deleted, False if it did not exist.

        Raises:
            ClientError: For DynamoDB failures other than the existence check
                (logged, then re-raised).
        """
        try:
            self._table.delete_item(
                Key={"webhook_id": webhook_id},
                ConditionExpression="attribute_exists(webhook_id)",
            )
            return True
        except ClientError as e:
            if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
                return False
            logger.error(f"Failed to delete webhook {webhook_id}: {e}")
            raise

    def get_webhooks_for_event(
        self, event: str, namespace: str | None = None
    ) -> list[dict[str, Any]]:
        """Get all webhooks subscribed to a specific event.

        Lists webhooks (optionally restricted to ``namespace``) and keeps the
        ones whose ``events`` list contains ``event``.
        """
        webhooks = self.list_webhooks(namespace=namespace)
        return [w for w in webhooks if event in w.get("events", [])]

374 

375 

class JobStore:
    """DynamoDB-backed store for centralized job tracking.

    This store enables:
    - Global job submission with region targeting
    - Real-time status tracking across all regions
    - Job history and audit trail
    - Cross-region job queries without hitting K8s APIs

    Items are keyed by ``job_id``. The ``manifest``, ``labels`` and
    ``status_history`` attributes are persisted as JSON strings.
    """

    def __init__(self, table_name: str | None = None, region: str | None = None):
        """Initialize the job store.

        Args:
            table_name: DynamoDB table name. Defaults to env var JOBS_TABLE_NAME,
                then "gco-jobs".
            region: AWS region for DynamoDB. Defaults to env var DYNAMODB_REGION,
                then GLOBAL_REGION, then AWS_REGION, then "us-east-1".
        """
        self.table_name = table_name or os.getenv("JOBS_TABLE_NAME", "gco-jobs")
        # DynamoDB tables are in the global region, not the regional cluster region
        self.region = (
            region
            or os.getenv("DYNAMODB_REGION")
            or os.getenv("GLOBAL_REGION")
            or os.getenv("AWS_REGION", "us-east-1")
        )
        self._dynamodb = boto3.resource("dynamodb", region_name=self.region)
        self._table = self._dynamodb.Table(self.table_name)

    def submit_job(
        self,
        job_id: str,
        manifest: dict[str, Any],
        target_region: str,
        namespace: str = "gco-jobs",
        priority: int = 0,
        labels: dict[str, str] | None = None,
        submitted_by: str | None = None,
    ) -> dict[str, Any]:
        """Submit a job to the centralized queue.

        The record is written with status "queued" and a one-entry status
        history. The write is unconditional, so an existing item with the same
        ``job_id`` is replaced.

        Args:
            job_id: Unique job identifier
            manifest: Kubernetes job manifest
            target_region: Region where job should run
            namespace: Kubernetes namespace
            priority: Job priority (higher = more important)
            labels: Optional labels for filtering
            submitted_by: Optional submitter identifier

        Returns:
            Job record with submission details

        Raises:
            ClientError: If the write fails (logged, then re-raised).
        """
        now = _utc_now_iso()

        # Use the manifest's metadata.name as the job name; fall back to the job ID.
        job_name = manifest.get("metadata", {}).get("name", job_id)

        item: dict[str, Any] = {
            "job_id": job_id,
            "job_name": job_name,
            "target_region": target_region,
            "namespace": namespace,
            "status": JobStatus.QUEUED.value,
            "priority": priority,
            "manifest": json.dumps(manifest),
            "submitted_at": now,
            "updated_at": now,
            "status_history": json.dumps(
                [{"status": JobStatus.QUEUED.value, "timestamp": now, "message": "Job submitted"}]
            ),
        }

        if labels:
            item["labels"] = json.dumps(labels)
        if submitted_by:
            item["submitted_by"] = submitted_by

        try:
            self._table.put_item(Item=item)
        except ClientError as e:
            logger.error(f"Failed to submit job {job_id}: {e}")
            raise

        return {
            "job_id": job_id,
            "job_name": job_name,
            "target_region": target_region,
            "namespace": namespace,
            "status": JobStatus.QUEUED.value,
            "priority": priority,
            "submitted_at": now,
        }

    def claim_job(self, job_id: str, claimed_by: str) -> dict[str, Any] | None:
        """Claim a queued job for processing.

        Uses a conditional update (status must still be "queued") so that only
        one caller can move a given job to "claimed".

        Args:
            job_id: Job to claim
            claimed_by: Identifier of the claiming processor (e.g., region name)

        Returns:
            Job record if claimed successfully, None if the job is not in the
            "queued" state (already claimed, or absent)

        Raises:
            ClientError: For DynamoDB failures other than the status check
                (logged, then re-raised).
        """
        now = _utc_now_iso()

        try:
            response = self._table.update_item(
                Key={"job_id": job_id},
                UpdateExpression="SET #status = :new_status, claimed_by = :claimed_by, "
                "claimed_at = :claimed_at, updated_at = :updated_at",
                ConditionExpression="#status = :queued_status",
                ExpressionAttributeNames={"#status": "status"},
                ExpressionAttributeValues={
                    ":new_status": JobStatus.CLAIMED.value,
                    ":queued_status": JobStatus.QUEUED.value,
                    ":claimed_by": claimed_by,
                    ":claimed_at": now,
                    ":updated_at": now,
                },
                ReturnValues="ALL_NEW",
            )
        except ClientError as e:
            if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
                return None  # Job already claimed
            logger.error(f"Failed to claim job {job_id}: {e}")
            raise

        return self._parse_job_item(response.get("Attributes", {}))

    def update_job_status(
        self,
        job_id: str,
        status: JobStatus | str,
        message: str | None = None,
        k8s_job_uid: str | None = None,
        error: str | None = None,
    ) -> dict[str, Any] | None:
        """Update job status with history tracking.

        Args:
            job_id: Job to update
            status: New status
            message: Optional status message (recorded in the history entry only)
            k8s_job_uid: Kubernetes job UID (set when job is applied)
            error: Error message if failed

        Returns:
            Updated job record, or None if no job with this ID exists. The
            returned ``status_history`` reflects the item as of the status
            update, before the new history entry is appended.

        Raises:
            ClientError: For DynamoDB failures other than the existence check
                (logged, then re-raised).
        """
        now = _utc_now_iso()
        status_value = status.value if isinstance(status, JobStatus) else status

        # Build update expression
        update_parts = ["#status = :status", "updated_at = :updated_at"]
        expr_values: dict[str, Any] = {
            ":status": status_value,
            ":updated_at": now,
        }

        if k8s_job_uid:
            update_parts.append("k8s_job_uid = :k8s_uid")
            expr_values[":k8s_uid"] = k8s_job_uid

        if error:
            update_parts.append("error_message = :error")
            expr_values[":error"] = error

        if status_value in [JobStatus.SUCCEEDED.value, JobStatus.FAILED.value]:
            update_parts.append("completed_at = :completed_at")
            expr_values[":completed_at"] = now

        try:
            response = self._table.update_item(
                Key={"job_id": job_id},
                UpdateExpression="SET " + ", ".join(update_parts),
                # Without this guard, update_item would create a new, partial
                # item for an unknown job_id instead of reporting "not found".
                ConditionExpression="attribute_exists(job_id)",
                ExpressionAttributeNames={"#status": "status"},
                ExpressionAttributeValues=expr_values,
                ReturnValues="ALL_NEW",
            )
        except ClientError as e:
            if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
                return None
            logger.error(f"Failed to update job {job_id}: {e}")
            raise

        item = response.get("Attributes", {})

        # Append to status history (separate update to avoid conflicts)
        history_entry = {"status": status_value, "timestamp": now}
        if message:
            history_entry["message"] = message
        if error:
            history_entry["error"] = error

        self._append_status_history(job_id, history_entry)

        return self._parse_job_item(item)

    def _append_status_history(self, job_id: str, entry: dict[str, Any]) -> None:
        """Append an entry to job status history.

        Best effort: DynamoDB errors are logged as warnings and not raised.

        NOTE: this is a read-modify-write of a JSON string, not an atomic
        append, so two concurrent appends for the same job can lose an entry.
        """
        try:
            # Get current history
            response = self._table.get_item(
                Key={"job_id": job_id},
                ProjectionExpression="status_history",
            )
            current = json.loads(response.get("Item", {}).get("status_history", "[]"))
            current.append(entry)

            # Update with new history; the condition prevents creating a new
            # item if the job no longer exists.
            self._table.update_item(
                Key={"job_id": job_id},
                UpdateExpression="SET status_history = :history",
                ConditionExpression="attribute_exists(job_id)",
                ExpressionAttributeValues={":history": json.dumps(current)},
            )
        except ClientError as e:
            logger.warning(f"Failed to update status history for {job_id}: {e}")

    def get_job(self, job_id: str) -> dict[str, Any] | None:
        """Get a job by ID.

        Returns:
            The parsed job record, or None if no item with that ID exists.

        Raises:
            ClientError: If the read fails (logged, then re-raised).
        """
        try:
            response = self._table.get_item(Key={"job_id": job_id})
        except ClientError as e:
            logger.error(f"Failed to get job {job_id}: {e}")
            raise

        item = response.get("Item")
        if not item:
            return None
        return self._parse_job_item(item)

    def list_jobs(
        self,
        target_region: str | None = None,
        status: str | None = None,
        namespace: str | None = None,
        limit: int = 100,
    ) -> list[dict[str, Any]]:
        """List jobs with optional filters.

        Args:
            target_region: Filter by target region
            status: Filter by status
            namespace: Filter by namespace
            limit: Maximum results

        Returns:
            List of job records (at most ``limit``)

        Raises:
            ClientError: If a scan request fails (logged, then re-raised).
        """
        # Build filter expression
        filter_parts = []
        expr_values: dict[str, Any] = {}
        expr_names: dict[str, str] = {}

        if target_region:
            filter_parts.append("target_region = :region")
            expr_values[":region"] = target_region

        if status:
            filter_parts.append("#status = :status")
            expr_values[":status"] = status
            expr_names["#status"] = "status"

        if namespace:
            filter_parts.append("#ns = :namespace")
            expr_values[":namespace"] = namespace
            expr_names["#ns"] = "namespace"

        scan_kwargs: dict[str, Any] = {"Limit": limit}
        if filter_parts:
            scan_kwargs["FilterExpression"] = " AND ".join(filter_parts)
            scan_kwargs["ExpressionAttributeValues"] = expr_values
        if expr_names:
            scan_kwargs["ExpressionAttributeNames"] = expr_names

        items: list[dict[str, Any]] = []
        try:
            # Scan's Limit caps the number of items EVALUATED per request, before
            # the filter is applied, so a single page may hold fewer matches than
            # requested (even none). Keep paging until enough matches are
            # collected or the table is exhausted.
            while True:
                response = self._table.scan(**scan_kwargs)
                items.extend(response.get("Items", []))
                if len(items) >= limit or "LastEvaluatedKey" not in response:
                    break
                scan_kwargs["ExclusiveStartKey"] = response["LastEvaluatedKey"]
        except ClientError as e:
            logger.error(f"Failed to list jobs: {e}")
            raise

        return [self._parse_job_item(item) for item in items[:limit]]

    def get_queued_jobs_for_region(self, region: str, limit: int = 10) -> list[dict[str, Any]]:
        """Get queued jobs targeting a specific region.

        Used by regional processors to poll for work. Issues a single query
        against the ``region-status-index`` index and sorts the returned page
        by priority.

        Args:
            region: Target region
            limit: Maximum jobs to return

        Returns:
            List of queued jobs sorted by priority (descending)

        Raises:
            ClientError: If the query fails (logged, then re-raised).
        """
        try:
            response = self._table.query(
                IndexName="region-status-index",
                KeyConditionExpression="target_region = :region AND #status = :status",
                ExpressionAttributeNames={"#status": "status"},
                ExpressionAttributeValues={
                    ":region": region,
                    ":status": JobStatus.QUEUED.value,
                },
                Limit=limit,
                ScanIndexForward=False,  # Descending order
            )
        except ClientError as e:
            logger.error(f"Failed to get queued jobs for {region}: {e}")
            raise

        # Sort by priority (higher first)
        jobs = [self._parse_job_item(item) for item in response.get("Items", [])]
        return sorted(jobs, key=lambda j: j.get("priority", 0), reverse=True)

    def get_job_counts_by_region(self) -> dict[str, dict[str, int]]:
        """Get job counts grouped by region and status.

        Scans the whole table (following pagination), reading only the
        ``target_region`` and ``status`` attributes. Items missing either
        attribute are counted under "unknown".

        Returns:
            Dict mapping region -> status -> count

        Raises:
            ClientError: If a scan request fails (logged, then re-raised).
        """
        scan_kwargs: dict[str, Any] = {
            "ProjectionExpression": "target_region, #status",
            "ExpressionAttributeNames": {"#status": "status"},
        }
        items: list[dict[str, Any]] = []
        try:
            # Handle pagination
            while True:
                response = self._table.scan(**scan_kwargs)
                items.extend(response.get("Items", []))
                if "LastEvaluatedKey" not in response:
                    break
                scan_kwargs["ExclusiveStartKey"] = response["LastEvaluatedKey"]
        except ClientError as e:
            logger.error(f"Failed to get job counts: {e}")
            raise

        # Aggregate counts
        counts: dict[str, dict[str, int]] = {}
        for item in items:
            region = item.get("target_region", "unknown")
            status = item.get("status", "unknown")
            per_region = counts.setdefault(region, {})
            per_region[status] = per_region.get(status, 0) + 1

        return counts

    def cancel_job(self, job_id: str, reason: str | None = None) -> bool:
        """Cancel a job if it's still in a cancellable state.

        A job is cancellable while its status is "queued" or "claimed".

        Args:
            job_id: Job to cancel
            reason: Optional cancellation reason (defaults to "Cancelled by user")

        Returns:
            True if cancelled, False if not cancellable

        Raises:
            ClientError: For DynamoDB failures other than the status check
                (logged, then re-raised).
        """
        cancellable_statuses = [JobStatus.QUEUED.value, JobStatus.CLAIMED.value]

        try:
            self._table.update_item(
                Key={"job_id": job_id},
                UpdateExpression="SET #status = :cancelled, updated_at = :now, "
                "cancelled_at = :now, cancel_reason = :reason",
                ConditionExpression="#status IN (:s1, :s2)",
                ExpressionAttributeNames={"#status": "status"},
                ExpressionAttributeValues={
                    ":cancelled": JobStatus.CANCELLED.value,
                    ":now": _utc_now_iso(),
                    ":reason": reason or "Cancelled by user",
                    ":s1": cancellable_statuses[0],
                    ":s2": cancellable_statuses[1],
                },
            )
            return True
        except ClientError as e:
            if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
                return False
            logger.error(f"Failed to cancel job {job_id}: {e}")
            raise

    def _parse_job_item(self, item: dict[str, Any]) -> dict[str, Any]:
        """Parse a DynamoDB item into a job record.

        JSON-encoded attributes are decoded and ``priority`` is converted to
        ``int``; attributes missing from the item come back as None (or an
        empty container / 0 for the decoded and numeric fields).
        """
        return {
            "job_id": item.get("job_id"),
            "job_name": item.get("job_name"),
            "target_region": item.get("target_region"),
            "namespace": item.get("namespace"),
            "status": item.get("status"),
            "priority": int(item.get("priority", 0)),
            "manifest": json.loads(item.get("manifest", "{}")),
            "labels": json.loads(item.get("labels", "{}")),
            "submitted_at": item.get("submitted_at"),
            "submitted_by": item.get("submitted_by"),
            "claimed_by": item.get("claimed_by"),
            "claimed_at": item.get("claimed_at"),
            "completed_at": item.get("completed_at"),
            "updated_at": item.get("updated_at"),
            "k8s_job_uid": item.get("k8s_job_uid"),
            "error_message": item.get("error_message"),
            "status_history": json.loads(item.get("status_history", "[]")),
        }

782 

783 

# Module-level singleton slots for use in the API. Each one starts out as None
# and is filled in by the matching get_*_store() accessor on first use.
_template_store: TemplateStore | None = None
_webhook_store: WebhookStore | None = None
_job_store: JobStore | None = None

788 

789 

def get_template_store() -> TemplateStore:
    """Return the shared TemplateStore, constructing it on the first call."""
    global _template_store
    if _template_store is not None:
        return _template_store
    _template_store = TemplateStore()
    return _template_store

796 

797 

def get_webhook_store() -> WebhookStore:
    """Return the shared WebhookStore, constructing it on the first call."""
    global _webhook_store
    if _webhook_store is not None:
        return _webhook_store
    _webhook_store = WebhookStore()
    return _webhook_store

804 

805 

def get_job_store() -> JobStore:
    """Return the shared JobStore, constructing it on the first call."""
    global _job_store
    if _job_store is not None:
        return _job_store
    _job_store = JobStore()
    return _job_store

811 return _job_store