Coverage for gco / services / webhook_dispatcher.py: 98%
322 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 21:47 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 21:47 +0000
1"""
2Webhook Dispatcher Service for GCO (Global Capacity Orchestrator on AWS).
4This service monitors Kubernetes job status changes and dispatches webhook
5notifications to registered endpoints. It runs as a background task alongside
6the health monitor or as a standalone service.
8Key Features:
9- Watches Kubernetes jobs for status changes (started, completed, failed)
10- Queries matching webhooks from DynamoDB based on event type and namespace
11- Dispatches HTTP POST requests with JSON payloads
12- Signs payloads with HMAC-SHA256 when a secret is configured
13- Implements retry logic with exponential backoff for failed deliveries
14- Publishes delivery metrics to CloudWatch
16Webhook Payload Format:
17 {
18 "event": "job.completed",
19 "timestamp": "2026-02-04T12:00:00Z",
20 "cluster_id": "gco-cluster-us-east-1",
21 "region": "us-east-1",
22 "job": {
23 "name": "my-job",
24 "namespace": "gco-jobs",
25 "uid": "abc-123",
26 "status": "succeeded",
27 "start_time": "2026-02-04T11:55:00Z",
28 "completion_time": "2026-02-04T12:00:00Z",
29 "succeeded": 1,
30 "failed": 0
31 }
32 }
34HMAC Signature:
35 When a webhook has a secret configured, the payload is signed using
36 HMAC-SHA256. The signature is included in the X-GCO-Signature header
37 as "sha256=<hex_digest>".
39Environment Variables:
40 CLUSTER_NAME: Name of the EKS cluster
41 REGION: AWS region of the cluster
42 WEBHOOK_TIMEOUT: HTTP timeout for webhook calls (default: 30)
43 WEBHOOK_MAX_RETRIES: Maximum retry attempts (default: 3)
44 WEBHOOK_RETRY_DELAY: Initial retry delay in seconds (default: 5)
45 WEBHOOKS_TABLE_NAME: DynamoDB table for webhooks
46"""
48from __future__ import annotations
50import asyncio
51import contextlib
52import hashlib
53import hmac
54import ipaddress
55import json
56import logging
57import os
58import socket
59from dataclasses import dataclass, field
60from datetime import UTC, datetime
61from enum import StrEnum
62from typing import Any
63from urllib.parse import urlparse
65import httpx
66from kubernetes import client, config
67from kubernetes.client.models import V1Job
68from kubernetes.client.rest import ApiException
69from kubernetes.watch import Watch
71from gco.services.template_store import WebhookStore, get_webhook_store
73logging.basicConfig(
74 level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
75)
76logger = logging.getLogger(__name__)
78# Networks blocked for SSRF prevention
79BLOCKED_NETWORKS = [
80 ipaddress.ip_network("10.0.0.0/8"),
81 ipaddress.ip_network("172.16.0.0/12"),
82 ipaddress.ip_network("192.168.0.0/16"),
83 ipaddress.ip_network("169.254.0.0/16"),
84 ipaddress.ip_network("127.0.0.0/8"),
85 ipaddress.ip_network("::1/128"),
86 ipaddress.ip_network("fc00::/7"),
87 ipaddress.ip_network("fe80::/10"),
88]
91def validate_webhook_url(
92 url: str, allowed_domains: list[str] | None = None
93) -> tuple[bool, str | None]:
94 """Validate a webhook URL for SSRF prevention.
96 Checks:
97 - HTTPS-only scheme
98 - Domain allowlist (if configured)
99 - DNS resolution with IP validation against blocked private networks
101 Args:
102 url: The webhook URL to validate.
103 allowed_domains: Optional list of allowed domains. If non-empty,
104 only URLs targeting these domains are permitted.
106 Returns:
107 A tuple of (is_valid, error_message). error_message is None when valid.
108 """
109 parsed = urlparse(url)
111 # Scheme check
112 if parsed.scheme != "https":
113 return False, "Only HTTPS webhook URLs are allowed"
115 hostname = parsed.hostname
116 if not hostname:
117 return False, "Webhook URL must include a valid hostname"
119 # Domain allowlist (if configured and non-empty)
120 if allowed_domains and hostname not in allowed_domains:
121 return False, f"Domain '{hostname}' not in allowed domains list"
123 # DNS resolution + IP validation
124 port = parsed.port or 443
125 try:
126 resolved = socket.getaddrinfo(hostname, port, proto=socket.IPPROTO_TCP)
127 except socket.gaierror:
128 return False, f"DNS resolution failed for {hostname}"
130 if not resolved:
131 return False, f"DNS resolution returned no results for {hostname}"
133 for _family, _type, _proto, _canonname, sockaddr in resolved:
134 ip = ipaddress.ip_address(sockaddr[0])
135 for network in BLOCKED_NETWORKS:
136 if ip in network:
137 return False, (f"Resolved IP {ip} is in blocked network {network}")
139 return True, None
142class WebhookEvent(StrEnum):
143 """Webhook event types."""
145 JOB_STARTED = "job.started"
146 JOB_COMPLETED = "job.completed"
147 JOB_FAILED = "job.failed"
150@dataclass
151class WebhookDeliveryResult:
152 """Result of a webhook delivery attempt."""
154 webhook_id: str
155 url: str
156 event: str
157 success: bool
158 status_code: int | None = None
159 error: str | None = None
160 attempts: int = 1
161 duration_ms: float = 0.0
164@dataclass
165class JobStateCache:
166 """Cache of job states to detect transitions."""
168 # Map of job_uid -> last known status
169 job_states: dict[str, str] = field(default_factory=dict)
171 def get_state(self, job_uid: str) -> str | None:
172 """Get cached state for a job."""
173 return self.job_states.get(job_uid)
175 def set_state(self, job_uid: str, state: str) -> str | None:
176 """Set state for a job, returns previous state."""
177 previous = self.job_states.get(job_uid)
178 self.job_states[job_uid] = state
179 return previous
181 def remove(self, job_uid: str) -> None:
182 """Remove a job from the cache."""
183 self.job_states.pop(job_uid, None)
186class WebhookDispatcher:
187 """
188 Dispatches webhook notifications for Kubernetes job events.
190 This class monitors job status changes and sends HTTP notifications
191 to registered webhook endpoints.
192 """
194 def __init__(
195 self,
196 cluster_id: str,
197 region: str,
198 webhook_store: WebhookStore | None = None,
199 timeout: int = 30,
200 max_retries: int = 3,
201 retry_delay: int = 5,
202 namespaces: list[str] | None = None,
203 allowed_domains: list[str] | None = None,
204 ):
205 """Initialize the webhook dispatcher.
207 Args:
208 cluster_id: EKS cluster identifier
209 region: AWS region
210 webhook_store: DynamoDB webhook store (uses singleton if None)
211 timeout: HTTP timeout for webhook calls in seconds
212 max_retries: Maximum retry attempts for failed deliveries
213 retry_delay: Initial retry delay in seconds (doubles each retry)
214 namespaces: Namespaces to watch (None = all non-system namespaces)
215 allowed_domains: Optional list of allowed webhook domains for SSRF prevention
216 """
217 self.cluster_id = cluster_id
218 self.region = region
219 self.webhook_store = webhook_store or get_webhook_store()
220 self.timeout = timeout
221 self.max_retries = max_retries
222 self.retry_delay = retry_delay
223 self.namespaces = namespaces or ["gco-jobs", "default"]
224 self.allowed_domains = allowed_domains or []
226 # Initialize Kubernetes client
227 try:
228 config.load_incluster_config()
229 logger.info("Loaded in-cluster Kubernetes configuration")
230 except config.ConfigException:
231 try:
232 config.load_kube_config()
233 logger.info("Loaded local Kubernetes configuration")
234 except config.ConfigException as e:
235 logger.error(f"Failed to load Kubernetes configuration: {e}")
236 raise
238 self.batch_v1 = client.BatchV1Api()
240 # Timeout for Kubernetes API calls (seconds)
241 self._k8s_timeout = int(os.environ.get("K8S_API_TIMEOUT", "30"))
243 # State tracking
244 self._job_state_cache = JobStateCache()
245 self._running = False
246 self._watch_task: asyncio.Task[None] | None = None
248 # Metrics
249 self._deliveries_total = 0
250 self._deliveries_success = 0
251 self._deliveries_failed = 0
253 def _compute_job_status(self, job: V1Job) -> str:
254 """Compute the effective status of a Kubernetes job."""
255 status = job.status
256 conditions = status.conditions or []
258 for condition in conditions:
259 if condition.type == "Complete" and condition.status == "True":
260 return "succeeded"
261 if condition.type == "Failed" and condition.status == "True": 261 ↛ 258line 261 didn't jump to line 258 because the condition on line 261 was always true
262 return "failed"
264 if (status.active or 0) > 0:
265 return "running"
267 if (status.succeeded or 0) > 0:
268 return "succeeded"
270 if (status.failed or 0) > 0:
271 return "failed"
273 return "pending"
275 def _determine_event(
276 self, previous_status: str | None, current_status: str
277 ) -> WebhookEvent | None:
278 """Determine which webhook event to fire based on status transition."""
279 if previous_status is None:
280 # New job - check if it's already running
281 if current_status == "running":
282 return WebhookEvent.JOB_STARTED
283 return None
285 # Status transitions
286 if previous_status in ("pending",) and current_status == "running":
287 return WebhookEvent.JOB_STARTED
289 if previous_status in ("pending", "running") and current_status == "succeeded":
290 return WebhookEvent.JOB_COMPLETED
292 if previous_status in ("pending", "running") and current_status == "failed":
293 return WebhookEvent.JOB_FAILED
295 return None
297 def _build_payload(self, event: WebhookEvent, job: V1Job) -> dict[str, Any]:
298 """Build the webhook payload for a job event."""
299 metadata = job.metadata
300 status = job.status
302 return {
303 "event": event.value,
304 "timestamp": datetime.now(UTC).isoformat().replace("+00:00", "Z"),
305 "cluster_id": self.cluster_id,
306 "region": self.region,
307 "job": {
308 "name": metadata.name,
309 "namespace": metadata.namespace,
310 "uid": metadata.uid,
311 "labels": metadata.labels or {},
312 "status": self._compute_job_status(job),
313 "start_time": (status.start_time.isoformat() if status.start_time else None),
314 "completion_time": (
315 status.completion_time.isoformat() if status.completion_time else None
316 ),
317 "active": status.active or 0,
318 "succeeded": status.succeeded or 0,
319 "failed": status.failed or 0,
320 },
321 }
323 def _sign_payload(self, payload: str, secret: str) -> str:
324 """Sign a payload using HMAC-SHA256."""
325 signature = hmac.new(
326 secret.encode("utf-8"),
327 payload.encode("utf-8"),
328 hashlib.sha256,
329 ).hexdigest()
330 return f"sha256={signature}"
332 async def _deliver_webhook(
333 self,
334 webhook: dict[str, Any],
335 payload: dict[str, Any],
336 ) -> WebhookDeliveryResult:
337 """Deliver a webhook with retry logic."""
338 webhook_id = webhook["id"]
339 url = webhook["url"]
340 secret = webhook.get("secret")
341 event = payload["event"]
343 # SSRF prevention: validate URL before making any HTTP request
344 is_valid, error = validate_webhook_url(url, self.allowed_domains or None)
345 if not is_valid: 345 ↛ 346line 345 didn't jump to line 346 because the condition on line 345 was never true
346 logger.warning(f"Webhook URL validation failed: {webhook_id} -> {url}: {error}")
347 return WebhookDeliveryResult(
348 webhook_id=webhook_id,
349 url=url,
350 event=event,
351 success=False,
352 error=f"URL validation failed: {error}",
353 attempts=0,
354 duration_ms=0.0,
355 )
357 payload_json = json.dumps(payload)
358 headers = {
359 "Content-Type": "application/json",
360 "User-Agent": f"GCO-Webhook/{self.cluster_id}",
361 "X-GCO-Event": event,
362 "X-GCO-Cluster": self.cluster_id,
363 "X-GCO-Region": self.region,
364 }
366 if secret:
367 headers["X-GCO-Signature"] = self._sign_payload(payload_json, secret)
369 attempts = 0
370 last_error: str | None = None
371 last_status_code: int | None = None
372 start_time = datetime.now(UTC)
374 async with httpx.AsyncClient(timeout=self.timeout) as client:
375 while attempts < self.max_retries:
376 attempts += 1
377 try:
378 response = await client.post(
379 url,
380 content=payload_json,
381 headers=headers,
382 )
383 last_status_code = response.status_code
385 if 200 <= response.status_code < 300:
386 duration = (datetime.now(UTC) - start_time).total_seconds() * 1000
387 logger.info(
388 f"Webhook delivered successfully: {webhook_id} -> {url} "
389 f"(status={response.status_code}, attempts={attempts})"
390 )
391 self._deliveries_success += 1
392 self._deliveries_total += 1
393 return WebhookDeliveryResult(
394 webhook_id=webhook_id,
395 url=url,
396 event=event,
397 success=True,
398 status_code=response.status_code,
399 attempts=attempts,
400 duration_ms=duration,
401 )
403 # Non-2xx response - retry for 5xx errors
404 last_error = f"HTTP {response.status_code}: {response.text[:200]}"
405 if response.status_code >= 500:
406 logger.warning(
407 f"Webhook delivery failed (attempt {attempts}): "
408 f"{webhook_id} -> {url}: {last_error}"
409 )
410 if attempts < self.max_retries:
411 delay = self.retry_delay * (2 ** (attempts - 1))
412 await asyncio.sleep(delay)
413 continue
414 # 4xx errors - don't retry
415 break
417 except httpx.TimeoutException:
418 last_error = "Request timed out"
419 logger.warning(
420 f"Webhook delivery timed out (attempt {attempts}): {webhook_id} -> {url}"
421 )
422 if attempts < self.max_retries:
423 delay = self.retry_delay * (2 ** (attempts - 1))
424 await asyncio.sleep(delay)
426 except httpx.RequestError as e:
427 last_error = str(e)
428 logger.warning(
429 f"Webhook delivery error (attempt {attempts}): {webhook_id} -> {url}: {e}"
430 )
431 if attempts < self.max_retries:
432 delay = self.retry_delay * (2 ** (attempts - 1))
433 await asyncio.sleep(delay)
435 # All retries exhausted
436 duration = (datetime.now(UTC) - start_time).total_seconds() * 1000
437 logger.error(
438 f"Webhook delivery failed after {attempts} attempts: "
439 f"{webhook_id} -> {url}: {last_error}"
440 )
441 self._deliveries_failed += 1
442 self._deliveries_total += 1
443 return WebhookDeliveryResult(
444 webhook_id=webhook_id,
445 url=url,
446 event=event,
447 success=False,
448 status_code=last_status_code,
449 error=last_error,
450 attempts=attempts,
451 duration_ms=duration,
452 )
454 async def _dispatch_event(self, event: WebhookEvent, job: V1Job) -> list[WebhookDeliveryResult]:
455 """Dispatch webhooks for a job event."""
456 namespace = job.metadata.namespace
457 payload = self._build_payload(event, job)
459 # Get webhooks subscribed to this event
460 try:
461 # Get webhooks for this specific namespace
462 namespace_webhooks = self.webhook_store.get_webhooks_for_event(
463 event.value, namespace=namespace
464 )
465 # Get global webhooks (no namespace filter)
466 global_webhooks = self.webhook_store.get_webhooks_for_event(event.value, namespace=None)
468 # Combine and deduplicate
469 all_webhooks = {w["id"]: w for w in namespace_webhooks}
470 for w in global_webhooks:
471 if w.get("namespace") is None: # Only add truly global webhooks
472 all_webhooks[w["id"]] = w
474 webhooks = list(all_webhooks.values())
476 except Exception as e:
477 logger.error(f"Failed to get webhooks for event {event.value}: {e}")
478 return []
480 if not webhooks:
481 logger.debug(f"No webhooks registered for event {event.value} in namespace {namespace}")
482 return []
484 logger.info(
485 f"Dispatching {len(webhooks)} webhooks for {event.value} "
486 f"(job={job.metadata.name}, namespace={namespace})"
487 )
489 # Dispatch all webhooks concurrently
490 tasks = [self._deliver_webhook(webhook, payload) for webhook in webhooks]
491 results = await asyncio.gather(*tasks, return_exceptions=True)
493 # Filter out exceptions and return results
494 delivery_results = []
495 for result in results:
496 if isinstance(result, WebhookDeliveryResult):
497 delivery_results.append(result)
498 elif isinstance(result, Exception): 498 ↛ 495line 498 didn't jump to line 495 because the condition on line 498 was always true
499 logger.error(f"Webhook delivery raised exception: {result}")
501 return delivery_results
503 async def _process_job_event(self, event_type: str, job: V1Job) -> None:
504 """Process a Kubernetes job event."""
505 job_uid = job.metadata.uid
506 job_name = job.metadata.name
507 namespace = job.metadata.namespace
509 # Skip system namespaces
510 if namespace in ("kube-system", "kube-public", "kube-node-lease"):
511 return
513 # Skip if not in watched namespaces (if specified)
514 if self.namespaces and namespace not in self.namespaces:
515 return
517 current_status = self._compute_job_status(job)
519 if event_type == "DELETED":
520 self._job_state_cache.remove(job_uid)
521 return
523 # Get previous state and update cache
524 previous_status = self._job_state_cache.set_state(job_uid, current_status)
526 # Determine if we should fire an event
527 webhook_event = self._determine_event(previous_status, current_status)
529 if webhook_event:
530 logger.info(
531 f"Job status transition: {job_name} ({namespace}) "
532 f"{previous_status or 'new'} -> {current_status} "
533 f"-> firing {webhook_event.value}"
534 )
535 await self._dispatch_event(webhook_event, job)
537 def _sync_watch_jobs(self) -> list[tuple[str, Any]]:
538 """
539 Synchronous job watcher that yields batches of events.
540 This runs in a thread executor to avoid blocking the async event loop.
541 """
542 w = Watch()
543 events = []
545 try:
546 # Watch jobs with a short timeout to allow periodic returns
547 for event in w.stream(
548 self.batch_v1.list_job_for_all_namespaces,
549 timeout_seconds=30, # Short timeout to return control periodically
550 ):
551 if not self._running:
552 break
554 events.append((event["type"], event["object"]))
556 # Return batch after collecting some events or if we have any
557 if len(events) >= 10:
558 break
560 except ApiException as e:
561 if e.status == 410: # Gone - resource version too old
562 logger.warning("Watch expired, will restart...")
563 else:
564 logger.error(f"Kubernetes API error in job watcher: {e}")
565 raise
566 except Exception as e:
567 logger.error(f"Error in sync job watcher: {e}")
568 raise
570 return events
572 async def _watch_jobs(self) -> None:
573 """Watch Kubernetes jobs for status changes using thread executor."""
574 logger.info(f"Starting job watcher for namespaces: {self.namespaces}")
576 while self._running:
577 try:
578 # Run the synchronous watch in a thread executor
579 events = await asyncio.to_thread(self._sync_watch_jobs)
581 # Process collected events
582 for event_type, job in events:
583 if not self._running: 583 ↛ 584line 583 didn't jump to line 584 because the condition on line 583 was never true
584 break
586 try:
587 await self._process_job_event(event_type, job)
588 except Exception as e:
589 logger.error(f"Error processing job event: {e}")
591 # Small delay between watch cycles if no events
592 if not events:
593 await asyncio.sleep(1)
595 except Exception as e:
596 logger.error(f"Error in job watcher: {e}")
597 await asyncio.sleep(5)
599 async def start(self) -> None:
600 """Start the webhook dispatcher."""
601 if self._running:
602 logger.warning("Webhook dispatcher already running")
603 return
605 self._running = True
606 logger.info(f"Starting webhook dispatcher for cluster {self.cluster_id}")
608 # Initialize job state cache with current jobs
609 await self._initialize_job_cache()
611 # Start the watch task
612 self._watch_task = asyncio.create_task(self._watch_jobs())
614 async def stop(self) -> None:
615 """Stop the webhook dispatcher."""
616 logger.info("Stopping webhook dispatcher")
617 self._running = False
619 if self._watch_task:
620 self._watch_task.cancel()
621 with contextlib.suppress(asyncio.CancelledError):
622 await self._watch_task
623 self._watch_task = None
625 async def _initialize_job_cache(self) -> None:
626 """Initialize the job state cache with current job states."""
627 try:
628 jobs = self.batch_v1.list_job_for_all_namespaces(
629 _request_timeout=self._k8s_timeout,
630 )
631 for job in jobs.items:
632 namespace = job.metadata.namespace
633 if namespace in ("kube-system", "kube-public", "kube-node-lease"):
634 continue
635 if self.namespaces and namespace not in self.namespaces:
636 continue
638 job_uid = job.metadata.uid
639 status = self._compute_job_status(job)
640 self._job_state_cache.set_state(job_uid, status)
642 logger.info(f"Initialized job cache with {len(self._job_state_cache.job_states)} jobs")
643 except Exception as e:
644 logger.error(f"Failed to initialize job cache: {e}")
646 def get_metrics(self) -> dict[str, Any]:
647 """Get dispatcher metrics."""
648 return {
649 "deliveries_total": self._deliveries_total,
650 "deliveries_success": self._deliveries_success,
651 "deliveries_failed": self._deliveries_failed,
652 "cached_jobs": len(self._job_state_cache.job_states),
653 "running": self._running,
654 }
657def create_webhook_dispatcher_from_env() -> WebhookDispatcher:
658 """Create WebhookDispatcher instance from environment variables."""
659 cluster_id = os.getenv("CLUSTER_NAME", "unknown-cluster")
660 region = os.getenv("REGION", "unknown-region")
661 timeout = int(os.getenv("WEBHOOK_TIMEOUT", "30"))
662 max_retries = int(os.getenv("WEBHOOK_MAX_RETRIES", "3"))
663 retry_delay = int(os.getenv("WEBHOOK_RETRY_DELAY", "5"))
665 # Parse namespaces from env
666 namespaces_str = os.getenv("ALLOWED_NAMESPACES", "gco-jobs,default")
667 namespaces = [ns.strip() for ns in namespaces_str.split(",") if ns.strip()]
669 # Parse allowed domains from env (comma-separated)
670 allowed_domains_str = os.getenv("WEBHOOK_ALLOWED_DOMAINS", "")
671 allowed_domains = [d.strip() for d in allowed_domains_str.split(",") if d.strip()]
673 return WebhookDispatcher(
674 cluster_id=cluster_id,
675 region=region,
676 timeout=timeout,
677 max_retries=max_retries,
678 retry_delay=retry_delay,
679 namespaces=namespaces,
680 allowed_domains=allowed_domains,
681 )
684async def main() -> None:
685 """Main function for running the webhook dispatcher standalone."""
686 dispatcher = create_webhook_dispatcher_from_env()
688 try:
689 await dispatcher.start()
691 # Keep running until interrupted
692 while True:
693 await asyncio.sleep(60)
694 metrics = dispatcher.get_metrics()
695 logger.info(f"Webhook dispatcher metrics: {metrics}")
697 except KeyboardInterrupt:
698 logger.info("Webhook dispatcher stopped by user")
699 finally:
700 await dispatcher.stop()
703if __name__ == "__main__":
704 asyncio.run(main())