Implement a Webhook Receiver
This guide walks through implementing a webhook receiver that:
- Verifies the HMAC signature.
- Deduplicates by event_id.
- Returns 200 fast (under 10 seconds).
- Enqueues heavy work for async processing.
Architecture
1. Receive the POST
   DVS sends a JSON body and signature headers to your endpoint.
2. Verify the HMAC signature
   Reject with 401 if invalid. See Verify HMAC Signatures.
3. Check idempotency
   Look up X-DVS-Event-Id in your processed_webhook_events table. If present, return 200 immediately without reprocessing.
4. Insert idempotency record + enqueue
   Atomic insert into processed_webhook_events (unique constraint on event_id). Then push the event to your internal queue/worker.
5. Return 200 OK
   Within 10 seconds. Do not do business logic synchronously.
Why decouple from business logic
DVS times out the webhook delivery at 10 seconds. If your receiver does heavy work synchronously (database queries, notifying downstream systems, sending emails), you risk a timeout. DVS then retries the delivery, which leads to duplicate processing.
The pattern is: receive, verify, enqueue, respond fast. A separate worker dequeues and processes at its own pace.
Idempotency table
CREATE TABLE processed_webhook_events (
event_id VARCHAR(255) PRIMARY KEY,
event_type VARCHAR(50) NOT NULL,
processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
Insert with ON CONFLICT DO NOTHING semantics. If the insert affects 0 rows, the event was already processed, so respond 200 without re-enqueuing.
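The insert-or-skip check can be tried out without a PostgreSQL instance. The following is a minimal sketch that uses Python's built-in sqlite3 module as a stand-in (an assumption; this guide targets PostgreSQL). The `make_demo_db` helper and the sample event values are hypothetical, and the ON CONFLICT clause needs SQLite 3.24 or newer.

```python
import sqlite3


def make_demo_db() -> sqlite3.Connection:
    """Hypothetical helper: in-memory table shaped like processed_webhook_events."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE processed_webhook_events ("
        " event_id TEXT PRIMARY KEY,"
        " event_type TEXT NOT NULL,"
        " processed_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP)"
    )
    return conn


def mark_processed(conn: sqlite3.Connection, event_id: str, event_type: str) -> bool:
    """Return True if the row was inserted now, False if event_id already existed."""
    cur = conn.execute(
        "INSERT INTO processed_webhook_events (event_id, event_type) "
        "VALUES (?, ?) ON CONFLICT (event_id) DO NOTHING",
        (event_id, event_type),
    )
    conn.commit()
    # rowcount is 1 for a fresh insert and 0 when the conflict clause skipped it.
    return cur.rowcount == 1


if __name__ == "__main__":
    demo = make_demo_db()
    print(mark_processed(demo, "evt_demo_1", "demo.event"))  # first delivery
    print(mark_processed(demo, "evt_demo_1", "demo.event"))  # retried delivery
```

Because the insert and the duplicate check are a single statement, two concurrent deliveries of the same event cannot both see "not processed yet".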
Clean up records older than 30 days with a daily cron job. DVS won't retry events more than 30 minutes old anyway.
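One way to implement the daily cleanup is a small script run by cron. The sketch below again uses sqlite3 as a stand-in and stores processed_at as epoch seconds to keep the comparison simple; both choices are assumptions for illustration, as is the `purge_old_events` name. On PostgreSQL the equivalent statement would be `DELETE FROM processed_webhook_events WHERE processed_at < NOW() - INTERVAL '30 days'`.

```python
import sqlite3
import time
from typing import Optional

RETENTION_DAYS = 30
SECONDS_PER_DAY = 86_400


def purge_old_events(conn: sqlite3.Connection,
                     now_epoch: Optional[float] = None,
                     retention_days: int = RETENTION_DAYS) -> int:
    """Delete idempotency records older than the retention window.

    Returns the number of rows removed. Assumes processed_at holds
    epoch seconds (a simplification for this sketch).
    """
    if now_epoch is None:
        now_epoch = time.time()
    cutoff = now_epoch - retention_days * SECONDS_PER_DAY
    cur = conn.execute(
        "DELETE FROM processed_webhook_events WHERE processed_at < ?",
        (cutoff,),
    )
    conn.commit()
    return cur.rowcount


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE processed_webhook_events ("
        " event_id TEXT PRIMARY KEY,"
        " event_type TEXT NOT NULL,"
        " processed_at REAL NOT NULL)"
    )
    now = time.time()
    conn.execute("INSERT INTO processed_webhook_events VALUES ('old', 'demo.event', ?)",
                 (now - 31 * SECONDS_PER_DAY,))
    conn.execute("INSERT INTO processed_webhook_events VALUES ('new', 'demo.event', ?)",
                 (now - 1 * SECONDS_PER_DAY,))
    print("rows purged:", purge_old_events(conn, now))
```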
Full receiver — Python (FastAPI)
import hashlib
import hmac
import os
import time

from fastapi import FastAPI, Request, HTTPException, Header
from sqlalchemy.dialects.postgresql import insert as pg_insert

# Assume `db`, `events_table`, `internal_queue` are already configured.
DVS_WEBHOOK_SECRET = os.environ["DVS_WEBHOOK_SECRET"]
SIGNATURE_TOLERANCE_SECONDS = 300


def verify_signature(raw_body: bytes, sig: str, ts: str, secret: str) -> bool:
    if not sig or not ts:
        return False
    try:
        ts_int = int(ts)
    except ValueError:
        return False
    # Reject timestamps outside the tolerance window to limit replays.
    if abs(time.time() - ts_int) > SIGNATURE_TOLERANCE_SECONDS:
        return False
    # The header is a comma-separated list of key=value pairs.
    # Items without "=" are skipped instead of raising ValueError.
    parts = dict(kv.split("=", 1) for kv in sig.split(",") if "=" in kv)
    if "v1" not in parts:
        return False
    expected = hmac.new(secret.encode(), f"{ts}.".encode() + raw_body, hashlib.sha256).hexdigest()
    # Constant-time comparison; bytes avoid a TypeError on non-ASCII input.
    return hmac.compare_digest(parts["v1"].encode(), expected.encode())


def mark_processed(event_id: str, event_type: str) -> bool:
    """Return True if newly inserted (first time), False if already existed."""
    stmt = pg_insert(events_table).values(event_id=event_id, event_type=event_type).on_conflict_do_nothing()
    result = db.execute(stmt)
    # rowcount is 0 when ON CONFLICT skipped the insert.
    return result.rowcount == 1


app = FastAPI()


@app.post("/integrations/dvs/webhooks")
async def receive(request: Request,
                  x_dvs_signature: str = Header(None),
                  x_dvs_signature_timestamp: str = Header(None),
                  x_dvs_event_id: str = Header(None),
                  x_dvs_event_type: str = Header(None)):
    # Verify against the raw bytes, before any JSON parsing.
    raw_body = await request.body()
    if not verify_signature(raw_body, x_dvs_signature, x_dvs_signature_timestamp, DVS_WEBHOOK_SECRET):
        raise HTTPException(401, "Invalid signature")
    # Duplicate delivery: acknowledge with 200 and skip the enqueue.
    if not mark_processed(x_dvs_event_id, x_dvs_event_type):
        return {"status": "duplicate_ignored"}
    # Hand the event to the worker; no business logic runs here.
    internal_queue.send_message(MessageBody=raw_body.decode(), MessageAttributes={
        "event_type": {"StringValue": x_dvs_event_type, "DataType": "String"},
        "event_id": {"StringValue": x_dvs_event_id, "DataType": "String"},
    })
    return {"status": "received"}
The worker (separate process) reads from internal_queue, parses the JSON, and runs business logic.
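The worker side can be sketched as a loop that takes message bodies off the queue, parses the JSON, and hands the result to a handler. The example below uses the standard-library queue.Queue as an in-process stand-in for internal_queue (an assumption; a real deployment would read from a durable broker). The `process_pending` function and the `handler` callback are hypothetical names.

```python
import json
import queue
from typing import Callable, Tuple


def process_pending(q: "queue.Queue[str]",
                    handler: Callable[[dict], None]) -> Tuple[int, int]:
    """Drain the queue once and return (succeeded, failed) counts.

    A failure in one message (bad JSON or a handler error) is counted
    and does not stop the remaining messages from being processed.
    """
    succeeded = failed = 0
    while True:
        try:
            body = q.get_nowait()
        except queue.Empty:
            return succeeded, failed
        try:
            handler(json.loads(body))
            succeeded += 1
        except Exception:
            # With a real broker, leave the message unacknowledged here
            # so it is redelivered or routed to a dead-letter queue.
            failed += 1
        finally:
            q.task_done()


if __name__ == "__main__":
    demo_queue: "queue.Queue[str]" = queue.Queue()
    demo_queue.put('{"id": "req_demo", "status": "done"}')
    demo_queue.put("not json")
    print(process_pending(demo_queue, lambda event: print("handling", event)))
```

Since the receiver has already answered DVS with 200 by the time this runs, slow or failing business logic here cannot trigger a DVS retry.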
Response code semantics (what DVS does with your reply)
| Your status | DVS behavior |
|---|---|
| 2xx | Delivered. No retry. |
| 3xx | Treated as error. Retried. DVS does NOT follow redirects. |
| 4xx | Treated as error. Retried up to max attempts. |
| 5xx | Treated as error. Retried. |
| Timeout (>10s) | Treated as failure. Retried. |
Returning 200 for duplicate events keeps DVS happy and prevents pointless retries.
What to do if your endpoint is down
DVS will retry up to 3 times (immediate, +60s, +300s). After 3 failures the event goes to DVS's dead-letter queue and is not auto-recovered.
Fallback: poll GET /v1/classification-requests/{id} (or validation-requests) for resources whose webhook you missed. The id is also in the response of the original POST.
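A polling fallback could look like the sketch below. Only the path /v1/classification-requests/{id} (and its validation-requests variant) comes from this guide. The base URL, the Bearer authorization header, and the function names are assumptions for illustration, so check the DVS API reference for the real values. The HTTP call is injected as `fetch` so the logic can be exercised without network access.

```python
import json
import urllib.parse
import urllib.request
from typing import Callable, Dict, Iterable

DVS_BASE_URL = "https://dvs.example.com"  # hypothetical placeholder


def resource_url(resource_id: str,
                 kind: str = "classification-requests",
                 base_url: str = DVS_BASE_URL) -> str:
    """Build the GET URL for one resource; kind may be 'validation-requests'."""
    return f"{base_url}/v1/{kind}/{urllib.parse.quote(resource_id, safe='')}"


def http_get_json(url: str, api_key: str) -> dict:
    """Fetch a URL and decode JSON. The auth scheme is an assumption."""
    req = urllib.request.Request(url, headers={"Authorization": f"Bearer {api_key}"})
    with urllib.request.urlopen(req, timeout=10) as resp:
        return json.load(resp)


def poll_missed(resource_ids: Iterable[str],
                fetch: Callable[[str], dict],
                kind: str = "classification-requests") -> Dict[str, dict]:
    """Fetch the current state of each resource whose webhook was missed."""
    return {rid: fetch(resource_url(rid, kind)) for rid in resource_ids}


if __name__ == "__main__":
    # Offline demonstration with a fake fetch function.
    print(poll_missed(["req_demo"], lambda url: {"fetched_from": url}))
```

In production, `fetch` would be something like `lambda url: http_get_json(url, api_key)`, run for the ids saved from the original POST responses.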
After 10 consecutive failures, DVS auto-disables your endpoint. Contact OSIGU to reactivate.
Testing locally
Use ngrok to expose your local server. See Testing with ngrok.
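To exercise the receiver locally you also need a correctly signed request. The helper below is a sketch that mirrors the verification logic shown in the receiver (HMAC-SHA256 over the timestamp, a dot, and the raw body, sent as a v1= value). The header spellings are inferred from the FastAPI parameter names, and `build_signed_request`, the payload, and the event type are hypothetical.

```python
import hashlib
import hmac
import json
import time
import uuid
from typing import Dict, Optional, Tuple


def build_signed_request(payload: dict,
                         secret: str,
                         event_type: str,
                         ts: Optional[int] = None) -> Tuple[bytes, Dict[str, str]]:
    """Return (raw_body, headers) for a test delivery signed with `secret`."""
    raw_body = json.dumps(payload).encode()
    ts_str = str(int(time.time()) if ts is None else ts)
    # Same signed content as the receiver expects: "<timestamp>." + raw body.
    digest = hmac.new(secret.encode(), f"{ts_str}.".encode() + raw_body,
                      hashlib.sha256).hexdigest()
    headers = {
        "Content-Type": "application/json",
        "X-DVS-Signature": f"v1={digest}",
        "X-DVS-Signature-Timestamp": ts_str,
        "X-DVS-Event-Id": str(uuid.uuid4()),  # fresh id per test delivery
        "X-DVS-Event-Type": event_type,
    }
    return raw_body, headers


if __name__ == "__main__":
    body, headers = build_signed_request({"id": "req_demo"}, "test-secret", "demo.event")
    print(body.decode())
    for name, value in headers.items():
        print(f"{name}: {value}")
```

Send the returned body and headers to your ngrok URL with any HTTP client. Reusing the same X-DVS-Event-Id on a second request is a quick way to confirm that the duplicate path returns 200 without enqueuing again.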