Coverage for gco / services / api_routes / webhooks.py: 100%
56 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 21:47 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 21:47 +0000
1"""Webhook registration and management endpoints."""
3from __future__ import annotations
5import logging
6import os
7import uuid
8from datetime import UTC, datetime
9from typing import TYPE_CHECKING
11from fastapi import APIRouter, HTTPException
12from fastapi.responses import JSONResponse, Response
14from gco.services.api_shared import WebhookRequest
15from gco.services.webhook_dispatcher import validate_webhook_url
17if TYPE_CHECKING:
18 from gco.services.template_store import WebhookStore
20router = APIRouter(prefix="/api/v1/webhooks", tags=["Webhooks"])
21logger = logging.getLogger(__name__)
24def _get_webhook_store() -> WebhookStore:
25 from gco.services.manifest_api import webhook_store
27 if webhook_store is None:
28 raise HTTPException(status_code=503, detail="Webhook store not initialized")
29 return webhook_store
32@router.get("")
33async def list_webhooks(namespace: str | None = None) -> Response:
34 """List all registered webhooks."""
35 store = _get_webhook_store()
36 try:
37 webhooks_list = store.list_webhooks(namespace=namespace)
38 return JSONResponse(
39 status_code=200,
40 content={
41 "timestamp": datetime.now(UTC).isoformat(),
42 "count": len(webhooks_list),
43 "webhooks": webhooks_list,
44 },
45 )
46 except Exception as e:
47 logger.error(f"Failed to list webhooks: {e}")
48 raise HTTPException(status_code=500, detail=f"Failed to list webhooks: {e!s}") from e
51@router.post("")
52async def create_webhook(request: WebhookRequest) -> Response:
53 """Register a new webhook for job events."""
54 store = _get_webhook_store()
56 # Validate the URL at registration time so misconfigured webhooks are
57 # rejected immediately instead of failing silently at every job event.
58 # Uses the same validator as the delivery path (validate_webhook_url)
59 # so the two never drift: HTTPS-only, no RFC1918/link-local/loopback,
60 # optional allowed_domains from WEBHOOK_ALLOWED_DOMAINS env var.
61 allowed_domains_str = os.getenv("WEBHOOK_ALLOWED_DOMAINS", "")
62 allowed_domains = [d.strip() for d in allowed_domains_str.split(",") if d.strip()]
63 is_valid, error = validate_webhook_url(request.url, allowed_domains=allowed_domains or None)
64 if not is_valid:
65 raise HTTPException(
66 status_code=400,
67 detail=f"Invalid webhook URL: {error}",
68 )
70 webhook_id = str(uuid.uuid4())[:8]
72 try:
73 webhook = store.create_webhook(
74 webhook_id=webhook_id,
75 url=request.url,
76 events=[e.value for e in request.events],
77 namespace=request.namespace,
78 secret=request.secret,
79 )
80 return JSONResponse(
81 status_code=201,
82 content={
83 "timestamp": datetime.now(UTC).isoformat(),
84 "message": "Webhook registered successfully",
85 "webhook": webhook,
86 },
87 )
88 except ValueError as e:
89 raise HTTPException(status_code=409, detail=str(e)) from e
90 except Exception as e:
91 logger.error(f"Failed to create webhook: {e}")
92 raise HTTPException(status_code=500, detail=f"Failed to create webhook: {e!s}") from e
95@router.delete("/{webhook_id}")
96async def delete_webhook(webhook_id: str) -> Response:
97 """Delete a webhook."""
98 store = _get_webhook_store()
99 try:
100 deleted = store.delete_webhook(webhook_id)
101 if not deleted:
102 raise HTTPException(status_code=404, detail=f"Webhook '{webhook_id}' not found")
103 return JSONResponse(
104 status_code=200,
105 content={
106 "timestamp": datetime.now(UTC).isoformat(),
107 "message": f"Webhook '{webhook_id}' deleted successfully",
108 },
109 )
110 except HTTPException:
111 raise
112 except Exception as e:
113 logger.error(f"Failed to delete webhook: {e}")
114 raise HTTPException(status_code=500, detail=f"Failed to delete webhook: {e!s}") from e