Coverage for gco/services/webhook_dispatcher.py: 98%

322 statements  

coverage.py v7.13.5, created at 2026-04-30 21:47 +0000

1""" 

2Webhook Dispatcher Service for GCO (Global Capacity Orchestrator on AWS). 

3 

4This service monitors Kubernetes job status changes and dispatches webhook 

5notifications to registered endpoints. It runs as a background task alongside 

6the health monitor or as a standalone service. 

7 

8Key Features: 

9- Watches Kubernetes jobs for status changes (started, completed, failed) 

10- Queries matching webhooks from DynamoDB based on event type and namespace 

11- Dispatches HTTP POST requests with JSON payloads 

12- Signs payloads with HMAC-SHA256 when a secret is configured 

13- Implements retry logic with exponential backoff for failed deliveries 

14- Publishes delivery metrics to CloudWatch 

15 

16Webhook Payload Format: 

17 { 

18 "event": "job.completed", 

19 "timestamp": "2026-02-04T12:00:00Z", 

20 "cluster_id": "gco-cluster-us-east-1", 

21 "region": "us-east-1", 

22 "job": { 

23 "name": "my-job", 

24 "namespace": "gco-jobs", 

25 "uid": "abc-123", 

26 "status": "succeeded", 

27 "start_time": "2026-02-04T11:55:00Z", 

28 "completion_time": "2026-02-04T12:00:00Z", 

29 "succeeded": 1, 

30 "failed": 0 

31 } 

32 } 

33 

34HMAC Signature: 

35 When a webhook has a secret configured, the payload is signed using 

36 HMAC-SHA256. The signature is included in the X-GCO-Signature header 

37 as "sha256=<hex_digest>". 
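    Example receiver-side verification (illustrative sketch, not part of this
    module): recompute the HMAC over the raw request body and compare it to
    the header value with a constant-time comparison.

        import hashlib
        import hmac

        def verify_signature(body: bytes, signature_header: str, secret: str) -> bool:
            expected = "sha256=" + hmac.new(
                secret.encode("utf-8"), body, hashlib.sha256
            ).hexdigest()
            return hmac.compare_digest(expected, signature_header)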

38 

39Environment Variables: 

40 CLUSTER_NAME: Name of the EKS cluster 

41 REGION: AWS region of the cluster 

42 WEBHOOK_TIMEOUT: HTTP timeout for webhook calls (default: 30) 

43 WEBHOOK_MAX_RETRIES: Maximum retry attempts (default: 3) 

44 WEBHOOK_RETRY_DELAY: Initial retry delay in seconds (default: 5) 

45 WEBHOOKS_TABLE_NAME: DynamoDB table for webhooks 
 ALLOWED_NAMESPACES: Comma-separated namespaces to watch (default: "gco-jobs,default") 
 WEBHOOK_ALLOWED_DOMAINS: Comma-separated allowlist of webhook domains (default: empty, meaning no domain restriction) 

46""" 

47 

48from __future__ import annotations 

49 

50import asyncio 

51import contextlib 

52import hashlib 

53import hmac 

54import ipaddress 

55import json 

56import logging 

57import os 

58import socket 

59from dataclasses import dataclass, field 

60from datetime import UTC, datetime 

61from enum import StrEnum 

62from typing import Any 

63from urllib.parse import urlparse 

64 

65import httpx 

66from kubernetes import client, config 

67from kubernetes.client.models import V1Job 

68from kubernetes.client.rest import ApiException 

69from kubernetes.watch import Watch 

70 

71from gco.services.template_store import WebhookStore, get_webhook_store 

72 

73logging.basicConfig( 

74 level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" 

75) 

76logger = logging.getLogger(__name__) 

77 

78# Networks blocked for SSRF prevention 

79BLOCKED_NETWORKS = [ 

80 ipaddress.ip_network("10.0.0.0/8"), 

81 ipaddress.ip_network("172.16.0.0/12"), 

82 ipaddress.ip_network("192.168.0.0/16"), 

83 ipaddress.ip_network("169.254.0.0/16"), 

84 ipaddress.ip_network("127.0.0.0/8"), 

85 ipaddress.ip_network("::1/128"), 

86 ipaddress.ip_network("fc00::/7"), 

87 ipaddress.ip_network("fe80::/10"), 

88] 

89 

90 

91def validate_webhook_url( 

92 url: str, allowed_domains: list[str] | None = None 

93) -> tuple[bool, str | None]: 

94 """Validate a webhook URL for SSRF prevention. 

95 

96 Checks: 

97 - HTTPS-only scheme 

98 - Domain allowlist (if configured) 

99 - DNS resolution with IP validation against blocked private networks 

100 

101 Args: 

102 url: The webhook URL to validate. 

103 allowed_domains: Optional list of allowed domains. If non-empty, 

104 only URLs targeting these domains are permitted. 

105 

106 Returns: 

107 A tuple of (is_valid, error_message). error_message is None when valid. 
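    Examples (illustrative; hostnames are hypothetical, and both cases are
    rejected before any DNS lookup is made):
        >>> validate_webhook_url("http://internal.svc/hook")
        (False, 'Only HTTPS webhook URLs are allowed')
        >>> validate_webhook_url(
        ...     "https://evil.example.com/x", allowed_domains=["hooks.example.com"]
        ... )
        (False, "Domain 'evil.example.com' not in allowed domains list")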

108 """ 

109 parsed = urlparse(url) 

110 

111 # Scheme check 

112 if parsed.scheme != "https": 

113 return False, "Only HTTPS webhook URLs are allowed" 

114 

115 hostname = parsed.hostname 

116 if not hostname: 

117 return False, "Webhook URL must include a valid hostname" 

118 

119 # Domain allowlist (if configured and non-empty) 

120 if allowed_domains and hostname not in allowed_domains: 

121 return False, f"Domain '{hostname}' not in allowed domains list" 

122 

123 # DNS resolution + IP validation 

124 port = parsed.port or 443 

125 try: 

126 resolved = socket.getaddrinfo(hostname, port, proto=socket.IPPROTO_TCP) 

127 except socket.gaierror: 

128 return False, f"DNS resolution failed for {hostname}" 

129 

130 if not resolved: 

131 return False, f"DNS resolution returned no results for {hostname}" 

132 

133 for _family, _type, _proto, _canonname, sockaddr in resolved: 

134 ip = ipaddress.ip_address(sockaddr[0]) 

135 for network in BLOCKED_NETWORKS: 

136 if ip in network: 

137 return False, f"Resolved IP {ip} is in blocked network {network}" 

138 

139 return True, None 

140 

141 

142class WebhookEvent(StrEnum): 

143 """Webhook event types.""" 

144 

145 JOB_STARTED = "job.started" 

146 JOB_COMPLETED = "job.completed" 

147 JOB_FAILED = "job.failed" 

148 

149 

150@dataclass 

151class WebhookDeliveryResult: 

152 """Result of a webhook delivery attempt.""" 

153 

154 webhook_id: str 

155 url: str 

156 event: str 

157 success: bool 

158 status_code: int | None = None 

159 error: str | None = None 

160 attempts: int = 1 

161 duration_ms: float = 0.0 

162 

163 

164@dataclass 

165class JobStateCache: 

166 """Cache of job states to detect transitions.""" 

167 

168 # Map of job_uid -> last known status 

169 job_states: dict[str, str] = field(default_factory=dict) 

170 

171 def get_state(self, job_uid: str) -> str | None: 

172 """Get cached state for a job.""" 

173 return self.job_states.get(job_uid) 

174 

175 def set_state(self, job_uid: str, state: str) -> str | None: 

176 """Set state for a job, returns previous state.""" 

177 previous = self.job_states.get(job_uid) 

178 self.job_states[job_uid] = state 

179 return previous 

180 

181 def remove(self, job_uid: str) -> None: 

182 """Remove a job from the cache.""" 

183 self.job_states.pop(job_uid, None) 

184 

185 

186class WebhookDispatcher: 

187 """ 

188 Dispatches webhook notifications for Kubernetes job events. 

189 

190 This class monitors job status changes and sends HTTP notifications 

191 to registered webhook endpoints. 

192 """ 

193 

194 def __init__( 

195 self, 

196 cluster_id: str, 

197 region: str, 

198 webhook_store: WebhookStore | None = None, 

199 timeout: int = 30, 

200 max_retries: int = 3, 

201 retry_delay: int = 5, 

202 namespaces: list[str] | None = None, 

203 allowed_domains: list[str] | None = None, 

204 ): 

205 """Initialize the webhook dispatcher. 

206 

207 Args: 

208 cluster_id: EKS cluster identifier 

209 region: AWS region 

210 webhook_store: DynamoDB webhook store (uses singleton if None) 

211 timeout: HTTP timeout for webhook calls in seconds 

212 max_retries: Maximum retry attempts for failed deliveries 

213 retry_delay: Initial retry delay in seconds (doubles after each attempt, e.g. 5s then 10s with the defaults) 

214 namespaces: Namespaces to watch (None = all non-system namespaces) 

215 allowed_domains: Optional list of allowed webhook domains for SSRF prevention 

216 """ 

217 self.cluster_id = cluster_id 

218 self.region = region 

219 self.webhook_store = webhook_store or get_webhook_store() 

220 self.timeout = timeout 

221 self.max_retries = max_retries 

222 self.retry_delay = retry_delay 

223 self.namespaces = namespaces or ["gco-jobs", "default"] 

224 self.allowed_domains = allowed_domains or [] 

225 

226 # Initialize Kubernetes client 

227 try: 

228 config.load_incluster_config() 

229 logger.info("Loaded in-cluster Kubernetes configuration") 

230 except config.ConfigException: 

231 try: 

232 config.load_kube_config() 

233 logger.info("Loaded local Kubernetes configuration") 

234 except config.ConfigException as e: 

235 logger.error(f"Failed to load Kubernetes configuration: {e}") 

236 raise 

237 

238 self.batch_v1 = client.BatchV1Api() 

239 

240 # Timeout for Kubernetes API calls (seconds) 

241 self._k8s_timeout = int(os.environ.get("K8S_API_TIMEOUT", "30")) 

242 

243 # State tracking 

244 self._job_state_cache = JobStateCache() 

245 self._running = False 

246 self._watch_task: asyncio.Task[None] | None = None 

247 

248 # Metrics 

249 self._deliveries_total = 0 

250 self._deliveries_success = 0 

251 self._deliveries_failed = 0 

252 

253 def _compute_job_status(self, job: V1Job) -> str: 

254 """Compute the effective status of a Kubernetes job.""" 

255 status = job.status 

256 conditions = status.conditions or [] 

257 

258 for condition in conditions: 

259 if condition.type == "Complete" and condition.status == "True": 

260 return "succeeded" 

261 if condition.type == "Failed" and condition.status == "True":  [261 ↛ 258: line 261 didn't jump to line 258 because the condition on line 261 was always true]

262 return "failed" 

263 

264 if (status.active or 0) > 0: 

265 return "running" 

266 

267 if (status.succeeded or 0) > 0: 

268 return "succeeded" 

269 

270 if (status.failed or 0) > 0: 

271 return "failed" 

272 

273 return "pending" 

274 

275 def _determine_event( 

276 self, previous_status: str | None, current_status: str 

277 ) -> WebhookEvent | None: 

278 """Determine which webhook event to fire based on status transition.""" 

279 if previous_status is None: 

280 # New job - check if it's already running 

281 if current_status == "running": 

282 return WebhookEvent.JOB_STARTED 

283 return None 

284 

285 # Status transitions 

286 if previous_status in ("pending",) and current_status == "running": 

287 return WebhookEvent.JOB_STARTED 

288 

289 if previous_status in ("pending", "running") and current_status == "succeeded": 

290 return WebhookEvent.JOB_COMPLETED 

291 

292 if previous_status in ("pending", "running") and current_status == "failed": 

293 return WebhookEvent.JOB_FAILED 

294 

295 return None 

296 

297 def _build_payload(self, event: WebhookEvent, job: V1Job) -> dict[str, Any]: 

298 """Build the webhook payload for a job event.""" 

299 metadata = job.metadata 

300 status = job.status 

301 

302 return { 

303 "event": event.value, 

304 "timestamp": datetime.now(UTC).isoformat().replace("+00:00", "Z"), 

305 "cluster_id": self.cluster_id, 

306 "region": self.region, 

307 "job": { 

308 "name": metadata.name, 

309 "namespace": metadata.namespace, 

310 "uid": metadata.uid, 

311 "labels": metadata.labels or {}, 

312 "status": self._compute_job_status(job), 

313 "start_time": (status.start_time.isoformat() if status.start_time else None), 

314 "completion_time": ( 

315 status.completion_time.isoformat() if status.completion_time else None 

316 ), 

317 "active": status.active or 0, 

318 "succeeded": status.succeeded or 0, 

319 "failed": status.failed or 0, 

320 }, 

321 } 

322 

323 def _sign_payload(self, payload: str, secret: str) -> str: 

324 """Sign a payload using HMAC-SHA256.""" 

325 signature = hmac.new( 

326 secret.encode("utf-8"), 

327 payload.encode("utf-8"), 

328 hashlib.sha256, 

329 ).hexdigest() 

330 return f"sha256={signature}" 

331 

332 async def _deliver_webhook( 

333 self, 

334 webhook: dict[str, Any], 

335 payload: dict[str, Any], 

336 ) -> WebhookDeliveryResult: 

337 """Deliver a webhook with retry logic.""" 

338 webhook_id = webhook["id"] 

339 url = webhook["url"] 

340 secret = webhook.get("secret") 

341 event = payload["event"] 

342 

343 # SSRF prevention: validate URL before making any HTTP request 

344 is_valid, error = validate_webhook_url(url, self.allowed_domains or None) 

345 if not is_valid:  [345 ↛ 346: line 345 didn't jump to line 346 because the condition on line 345 was never true]

346 logger.warning(f"Webhook URL validation failed: {webhook_id} -> {url}: {error}") 

347 return WebhookDeliveryResult( 

348 webhook_id=webhook_id, 

349 url=url, 

350 event=event, 

351 success=False, 

352 error=f"URL validation failed: {error}", 

353 attempts=0, 

354 duration_ms=0.0, 

355 ) 

356 

357 payload_json = json.dumps(payload) 

358 headers = { 

359 "Content-Type": "application/json", 

360 "User-Agent": f"GCO-Webhook/{self.cluster_id}", 

361 "X-GCO-Event": event, 

362 "X-GCO-Cluster": self.cluster_id, 

363 "X-GCO-Region": self.region, 

364 } 

365 

366 if secret: 

367 headers["X-GCO-Signature"] = self._sign_payload(payload_json, secret) 

368 

369 attempts = 0 

370 last_error: str | None = None 

371 last_status_code: int | None = None 

372 start_time = datetime.now(UTC) 

373 

374 async with httpx.AsyncClient(timeout=self.timeout) as http_client: 

375 while attempts < self.max_retries: 

376 attempts += 1 

377 try: 

378 response = await http_client.post( 

379 url, 

380 content=payload_json, 

381 headers=headers, 

382 ) 

383 last_status_code = response.status_code 

384 

385 if 200 <= response.status_code < 300: 

386 duration = (datetime.now(UTC) - start_time).total_seconds() * 1000 

387 logger.info( 

388 f"Webhook delivered successfully: {webhook_id} -> {url} " 

389 f"(status={response.status_code}, attempts={attempts})" 

390 ) 

391 self._deliveries_success += 1 

392 self._deliveries_total += 1 

393 return WebhookDeliveryResult( 

394 webhook_id=webhook_id, 

395 url=url, 

396 event=event, 

397 success=True, 

398 status_code=response.status_code, 

399 attempts=attempts, 

400 duration_ms=duration, 

401 ) 

402 

403 # Non-2xx response - retry for 5xx errors 

404 last_error = f"HTTP {response.status_code}: {response.text[:200]}" 

405 if response.status_code >= 500: 

406 logger.warning( 

407 f"Webhook delivery failed (attempt {attempts}): " 

408 f"{webhook_id} -> {url}: {last_error}" 

409 ) 

410 if attempts < self.max_retries: 

411 delay = self.retry_delay * (2 ** (attempts - 1)) 

412 await asyncio.sleep(delay) 

413 continue 

414 # 4xx errors - don't retry 

415 break 

416 

417 except httpx.TimeoutException: 

418 last_error = "Request timed out" 

419 logger.warning( 

420 f"Webhook delivery timed out (attempt {attempts}): {webhook_id} -> {url}" 

421 ) 

422 if attempts < self.max_retries: 

423 delay = self.retry_delay * (2 ** (attempts - 1)) 

424 await asyncio.sleep(delay) 

425 

426 except httpx.RequestError as e: 

427 last_error = str(e) 

428 logger.warning( 

429 f"Webhook delivery error (attempt {attempts}): {webhook_id} -> {url}: {e}" 

430 ) 

431 if attempts < self.max_retries: 

432 delay = self.retry_delay * (2 ** (attempts - 1)) 

433 await asyncio.sleep(delay) 

434 

435 # All retries exhausted 

436 duration = (datetime.now(UTC) - start_time).total_seconds() * 1000 

437 logger.error( 

438 f"Webhook delivery failed after {attempts} attempts: " 

439 f"{webhook_id} -> {url}: {last_error}" 

440 ) 

441 self._deliveries_failed += 1 

442 self._deliveries_total += 1 

443 return WebhookDeliveryResult( 

444 webhook_id=webhook_id, 

445 url=url, 

446 event=event, 

447 success=False, 

448 status_code=last_status_code, 

449 error=last_error, 

450 attempts=attempts, 

451 duration_ms=duration, 

452 ) 

453 

454 async def _dispatch_event(self, event: WebhookEvent, job: V1Job) -> list[WebhookDeliveryResult]: 

455 """Dispatch webhooks for a job event.""" 

456 namespace = job.metadata.namespace 

457 payload = self._build_payload(event, job) 

458 

459 # Get webhooks subscribed to this event 

460 try: 

461 # Get webhooks for this specific namespace 

462 namespace_webhooks = self.webhook_store.get_webhooks_for_event( 

463 event.value, namespace=namespace 

464 ) 

465 # Get global webhooks (no namespace filter) 

466 global_webhooks = self.webhook_store.get_webhooks_for_event(event.value, namespace=None) 

467 

468 # Combine and deduplicate 

469 all_webhooks = {w["id"]: w for w in namespace_webhooks} 

470 for w in global_webhooks: 

471 if w.get("namespace") is None: # Only add truly global webhooks 

472 all_webhooks[w["id"]] = w 

473 

474 webhooks = list(all_webhooks.values()) 

475 

476 except Exception as e: 

477 logger.error(f"Failed to get webhooks for event {event.value}: {e}") 

478 return [] 

479 

480 if not webhooks: 

481 logger.debug(f"No webhooks registered for event {event.value} in namespace {namespace}") 

482 return [] 

483 

484 logger.info( 

485 f"Dispatching {len(webhooks)} webhooks for {event.value} " 

486 f"(job={job.metadata.name}, namespace={namespace})" 

487 ) 

488 

489 # Dispatch all webhooks concurrently 

490 tasks = [self._deliver_webhook(webhook, payload) for webhook in webhooks] 

491 results = await asyncio.gather(*tasks, return_exceptions=True) 

492 

493 # Filter out exceptions and return results 

494 delivery_results = [] 

495 for result in results: 

496 if isinstance(result, WebhookDeliveryResult): 

497 delivery_results.append(result) 

498 elif isinstance(result, Exception):  [498 ↛ 495: line 498 didn't jump to line 495 because the condition on line 498 was always true]

499 logger.error(f"Webhook delivery raised exception: {result}") 

500 

501 return delivery_results 

502 

503 async def _process_job_event(self, event_type: str, job: V1Job) -> None: 

504 """Process a Kubernetes job event.""" 

505 job_uid = job.metadata.uid 

506 job_name = job.metadata.name 

507 namespace = job.metadata.namespace 

508 

509 # Skip system namespaces 

510 if namespace in ("kube-system", "kube-public", "kube-node-lease"): 

511 return 

512 

513 # Skip if not in watched namespaces (if specified) 

514 if self.namespaces and namespace not in self.namespaces: 

515 return 

516 

517 current_status = self._compute_job_status(job) 

518 

519 if event_type == "DELETED": 

520 self._job_state_cache.remove(job_uid) 

521 return 

522 

523 # Get previous state and update cache 

524 previous_status = self._job_state_cache.set_state(job_uid, current_status) 

525 

526 # Determine if we should fire an event 

527 webhook_event = self._determine_event(previous_status, current_status) 

528 

529 if webhook_event: 

530 logger.info( 

531 f"Job status transition: {job_name} ({namespace}) " 

532 f"{previous_status or 'new'} -> {current_status} " 

533 f"-> firing {webhook_event.value}" 

534 ) 

535 await self._dispatch_event(webhook_event, job) 

536 

537 def _sync_watch_jobs(self) -> list[tuple[str, Any]]: 

538 """ 

539 Synchronous job watcher that collects a batch of events and returns it. 

540 This runs in a thread executor to avoid blocking the async event loop. 

541 """ 

542 w = Watch() 

543 events = [] 

544 

545 try: 

546 # Watch jobs with a short timeout to allow periodic returns 

547 for event in w.stream( 

548 self.batch_v1.list_job_for_all_namespaces, 

549 timeout_seconds=30, # Short timeout to return control periodically 

550 ): 

551 if not self._running: 

552 break 

553 

554 events.append((event["type"], event["object"])) 

555 

556 # Return the batch once enough events accumulate; the 30s watch timeout returns whatever has been collected so far 

557 if len(events) >= 10: 

558 break 

559 

560 except ApiException as e: 

561 if e.status == 410: # Gone - resource version too old 

562 logger.warning("Watch expired, will restart...") 

563 else: 

564 logger.error(f"Kubernetes API error in job watcher: {e}") 

565 raise 

566 except Exception as e: 

567 logger.error(f"Error in sync job watcher: {e}") 

568 raise 

569 

570 return events 

571 

572 async def _watch_jobs(self) -> None: 

573 """Watch Kubernetes jobs for status changes using thread executor.""" 

574 logger.info(f"Starting job watcher for namespaces: {self.namespaces}") 

575 

576 while self._running: 

577 try: 

578 # Run the synchronous watch in a thread executor 

579 events = await asyncio.to_thread(self._sync_watch_jobs) 

580 

581 # Process collected events 

582 for event_type, job in events: 

583 if not self._running:  [583 ↛ 584: line 583 didn't jump to line 584 because the condition on line 583 was never true]

584 break 

585 

586 try: 

587 await self._process_job_event(event_type, job) 

588 except Exception as e: 

589 logger.error(f"Error processing job event: {e}") 

590 

591 # Small delay between watch cycles if no events 

592 if not events: 

593 await asyncio.sleep(1) 

594 

595 except Exception as e: 

596 logger.error(f"Error in job watcher: {e}") 

597 await asyncio.sleep(5) 

598 

599 async def start(self) -> None: 

600 """Start the webhook dispatcher.""" 

601 if self._running: 

602 logger.warning("Webhook dispatcher already running") 

603 return 

604 

605 self._running = True 

606 logger.info(f"Starting webhook dispatcher for cluster {self.cluster_id}") 

607 

608 # Initialize job state cache with current jobs 

609 await self._initialize_job_cache() 

610 

611 # Start the watch task 

612 self._watch_task = asyncio.create_task(self._watch_jobs()) 

613 

614 async def stop(self) -> None: 

615 """Stop the webhook dispatcher.""" 

616 logger.info("Stopping webhook dispatcher") 

617 self._running = False 

618 

619 if self._watch_task: 

620 self._watch_task.cancel() 

621 with contextlib.suppress(asyncio.CancelledError): 

622 await self._watch_task 

623 self._watch_task = None 

624 

625 async def _initialize_job_cache(self) -> None: 

626 """Initialize the job state cache with current job states.""" 

627 try: 

628 jobs = self.batch_v1.list_job_for_all_namespaces( 

629 _request_timeout=self._k8s_timeout, 

630 ) 

631 for job in jobs.items: 

632 namespace = job.metadata.namespace 

633 if namespace in ("kube-system", "kube-public", "kube-node-lease"): 

634 continue 

635 if self.namespaces and namespace not in self.namespaces: 

636 continue 

637 

638 job_uid = job.metadata.uid 

639 status = self._compute_job_status(job) 

640 self._job_state_cache.set_state(job_uid, status) 

641 

642 logger.info(f"Initialized job cache with {len(self._job_state_cache.job_states)} jobs") 

643 except Exception as e: 

644 logger.error(f"Failed to initialize job cache: {e}") 

645 

646 def get_metrics(self) -> dict[str, Any]: 

647 """Get dispatcher metrics.""" 

648 return { 

649 "deliveries_total": self._deliveries_total, 

650 "deliveries_success": self._deliveries_success, 

651 "deliveries_failed": self._deliveries_failed, 

652 "cached_jobs": len(self._job_state_cache.job_states), 

653 "running": self._running, 

654 } 

655 

656 
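# Illustrative sketch, not part of the original module: the module docstring
# mentions publishing delivery metrics to CloudWatch, and one plausible way for
# a caller to forward the get_metrics() counters is boto3's put_metric_data.
# The namespace, metric name, and dimension below are assumptions.

import boto3

def publish_delivery_metrics(dispatcher: WebhookDispatcher) -> None:
    # Hypothetical helper: push the failure counter as a CloudWatch metric,
    # tagged with the cluster id so dashboards can filter per cluster.
    metrics = dispatcher.get_metrics()
    boto3.client("cloudwatch").put_metric_data(
        Namespace="GCO/Webhooks",
        MetricData=[
            {
                "MetricName": "WebhookDeliveriesFailed",
                "Value": metrics["deliveries_failed"],
                "Unit": "Count",
                "Dimensions": [{"Name": "ClusterId", "Value": dispatcher.cluster_id}],
            }
        ],
    )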

657def create_webhook_dispatcher_from_env() -> WebhookDispatcher: 

658 """Create WebhookDispatcher instance from environment variables.""" 

659 cluster_id = os.getenv("CLUSTER_NAME", "unknown-cluster") 

660 region = os.getenv("REGION", "unknown-region") 

661 timeout = int(os.getenv("WEBHOOK_TIMEOUT", "30")) 

662 max_retries = int(os.getenv("WEBHOOK_MAX_RETRIES", "3")) 

663 retry_delay = int(os.getenv("WEBHOOK_RETRY_DELAY", "5")) 

664 

665 # Parse namespaces from env 

666 namespaces_str = os.getenv("ALLOWED_NAMESPACES", "gco-jobs,default") 

667 namespaces = [ns.strip() for ns in namespaces_str.split(",") if ns.strip()] 

668 

669 # Parse allowed domains from env (comma-separated) 

670 allowed_domains_str = os.getenv("WEBHOOK_ALLOWED_DOMAINS", "") 

671 allowed_domains = [d.strip() for d in allowed_domains_str.split(",") if d.strip()] 

672 

673 return WebhookDispatcher( 

674 cluster_id=cluster_id, 

675 region=region, 

676 timeout=timeout, 

677 max_retries=max_retries, 

678 retry_delay=retry_delay, 

679 namespaces=namespaces, 

680 allowed_domains=allowed_domains, 

681 ) 

682 

683 
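# Illustrative embedding sketch, not part of the original module: the module
# docstring notes the dispatcher can also run as a background task alongside
# another service. A host application could manage its lifecycle as below,
# where stop_event is a hypothetical shutdown signal owned by the host.

async def run_embedded(stop_event: asyncio.Event) -> None:
    dispatcher = create_webhook_dispatcher_from_env()
    await dispatcher.start()  # spawns the job-watch background task
    try:
        await stop_event.wait()  # host app sets this on shutdown
    finally:
        await dispatcher.stop()  # cancels the watch task cleanly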

684async def main() -> None: 

685 """Main function for running the webhook dispatcher standalone.""" 

686 dispatcher = create_webhook_dispatcher_from_env() 

687 

688 try: 

689 await dispatcher.start() 

690 

691 # Keep running until interrupted 

692 while True: 

693 await asyncio.sleep(60) 

694 metrics = dispatcher.get_metrics() 

695 logger.info(f"Webhook dispatcher metrics: {metrics}") 

696 

697 except KeyboardInterrupt: 

698 logger.info("Webhook dispatcher stopped by user") 

699 finally: 

700 await dispatcher.stop() 

701 

702 

703if __name__ == "__main__": 

704 asyncio.run(main())