Verify HMAC Signatures

Every DVS webhook is signed with HMAC-SHA256 using the secret you received at onboarding. Always verify the signature before processing an event; if you skip this, anyone who knows your endpoint URL can send forged events.

The algorithm

DVS computes:

payload_to_sign = timestamp + "." + raw_body
signature = hmac_sha256_hex(secret, payload_to_sign)

And sends two headers:

X-DVS-Signature: t=1748884800,v1=a8d3e8b1c2...
X-DVS-Signature-Timestamp: 1748884800
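
To make the scheme concrete, here is a minimal sketch of how the two headers could be produced for a given body, which can be handy for building test fixtures. The helper name `sign_dvs_payload` and the sample secret are hypothetical, and the sketch assumes the secret is used as UTF-8 bytes; only the formula and the header names come from the description above.

```python
import hashlib
import hmac


def sign_dvs_payload(secret: str, timestamp: int, raw_body: bytes) -> dict[str, str]:
    """Build the two signature headers for a body (hypothetical helper)."""
    # payload_to_sign = timestamp + "." + raw_body
    payload = f"{timestamp}.".encode() + raw_body
    signature = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
    return {
        "X-DVS-Signature": f"t={timestamp},v1={signature}",
        "X-DVS-Signature-Timestamp": str(timestamp),
    }


# "example-secret" is a placeholder, not a real DVS secret.
headers = sign_dvs_payload("example-secret", 1748884800, b'{"event":"test.ping"}')
print(headers["X-DVS-Signature-Timestamp"])  # 1748884800
```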

Your job:

  1. Extract timestamp from X-DVS-Signature-Timestamp.
  2. Build the expected payload: timestamp + "." + raw_request_body.
  3. Compute hmac_sha256_hex(your_secret, payload).
  4. Compare it against the v1=... part of X-DVS-Signature using constant-time comparison.
  5. Reject if |now - timestamp| > 300 seconds (anti-replay).

Critical rules

warning

Use the raw request body, not the parsed JSON. The signature is over the exact bytes DVS sent. If you parse JSON and re-serialize, whitespace and key order change and the signature will not match. Configure your framework to give you raw bytes (express.raw(), FastAPI await request.body(), etc.).
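
The effect is easy to reproduce. The following sketch signs a body once as received and once after a parse and re-serialize round trip; the secret, timestamp, and body are made-up values for illustration.

```python
import hashlib
import hmac
import json

secret = b"example-secret"  # placeholder secret, for illustration only
timestamp = b"1748884800"
raw_body = b'{"b": 1,  "a": 2}'  # bytes exactly as received, odd spacing included


def sign(body: bytes) -> str:
    """HMAC-SHA256 over timestamp + "." + body, hex encoded."""
    return hmac.new(secret, timestamp + b"." + body, hashlib.sha256).hexdigest()


# Parsing and re-serializing normalizes whitespace (and here, key order).
reserialized = json.dumps(json.loads(raw_body), sort_keys=True).encode()

print(raw_body == reserialized)  # False
print(hmac.compare_digest(sign(raw_body), sign(reserialized)))  # False
```

The JSON documents are equivalent, but the bytes are not, so the signatures differ.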

warning

Use a constant-time comparison. Use hmac.compare_digest (Python), crypto.timingSafeEqual (Node), hash_equals (PHP), MessageDigest.isEqual (Java). A naive == is vulnerable to timing attacks.
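
As a small Python sketch of this rule (the wrapper name `signatures_match` is hypothetical): `hmac.compare_digest` raises `TypeError` when given a `str` containing non-ASCII characters, so encoding both sides to bytes first turns a malformed header value into a plain mismatch instead of an unhandled exception.

```python
import hmac


def signatures_match(received: str, expected: str) -> bool:
    """Constant-time comparison of two hex signatures (hypothetical helper)."""
    # Compare as bytes: compare_digest accepts str only if it is pure ASCII.
    return hmac.compare_digest(received.encode(), expected.encode())


print(signatures_match("a8d3e8b1", "a8d3e8b1"))  # True
print(signatures_match("a8d3e8b1", "a8d3e8b2"))  # False
```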

warning

Sync your server clock with NTP. The ±5 minute anti-replay window assumes accurate time. Drifted clocks will reject legitimate events.

Implementations

import hashlib
import hmac
import os
import time

from fastapi import FastAPI, Header, HTTPException, Request

DVS_WEBHOOK_SECRET = os.environ["DVS_WEBHOOK_SECRET"]
SIGNATURE_TOLERANCE_SECONDS = 300

app = FastAPI()


def verify_dvs_signature(
    raw_body: bytes,
    signature_header: str | None,
    timestamp_header: str | None,
    secret: str,
) -> bool:
    if not signature_header or not timestamp_header:
        return False

    # 1) Anti-replay window
    try:
        ts = int(timestamp_header)
    except (TypeError, ValueError):
        return False
    if abs(time.time() - ts) > SIGNATURE_TOLERANCE_SECONDS:
        return False

    # 2) Parse signature header (format: "t=<ts>,v1=<hex>").
    #    Segments without "=" are skipped so a malformed header cannot raise.
    parts = dict(
        kv.strip().split("=", 1) for kv in signature_header.split(",") if "=" in kv
    )
    received = parts.get("v1")
    if not received:
        return False

    # 3) Compute expected signature over timestamp + "." + raw body
    payload = f"{timestamp_header}.".encode() + raw_body
    expected = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()

    # 4) Constant-time compare (as bytes, so non-ASCII input cannot raise)
    return hmac.compare_digest(received.encode(), expected.encode())


@app.post("/integrations/dvs/webhooks")
async def receive_dvs_webhook(
    request: Request,
    x_dvs_signature: str | None = Header(None),
    x_dvs_signature_timestamp: str | None = Header(None),
    x_dvs_event_id: str | None = Header(None),
    x_dvs_event_type: str | None = Header(None),
):
    # Read the exact bytes DVS sent; do not parse JSON before verifying.
    raw_body = await request.body()

    if not verify_dvs_signature(
        raw_body, x_dvs_signature, x_dvs_signature_timestamp, DVS_WEBHOOK_SECRET
    ):
        raise HTTPException(status_code=401, detail="Invalid signature")

    # already_processed() and enqueue_for_processing() are your own
    # application functions (deduplication store and work queue).
    if already_processed(x_dvs_event_id):
        return {"status": "duplicate_ignored"}

    enqueue_for_processing(x_dvs_event_id, x_dvs_event_type, raw_body)
    return {"status": "received"}

Troubleshooting

| Symptom | Likely cause |
| --- | --- |
| Receiver returns 401 for every webhook | Wrong secret, or computing HMAC on parsed JSON instead of raw body. |
| Some webhooks pass, others fail (after rotation) | Forgot to support both secrets during the 24h grace window. See Rotating Webhook Secrets. |
| All webhooks rejected with "timestamp too old" | Your server clock drifted. Install NTP / chrony / systemd-timesyncd. |
| Works locally with ngrok, fails in prod | Different secret in prod environment, or proxy mutating the body (some load balancers/CDNs alter JSON). Bypass any transformation between the network edge and your handler. |
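
For the rotation case, one possible approach is to accept a signature that matches either the current or the previous secret while the grace window is open. This is only a sketch under that assumption: the helper name `matches_any_secret` and the sample secrets are hypothetical, and the authoritative procedure is the one described in Rotating Webhook Secrets.

```python
import hashlib
import hmac
from typing import Iterable


def matches_any_secret(
    raw_body: bytes, timestamp: str, received_hex: str, secrets: Iterable[str]
) -> bool:
    """Return True if the signature verifies under any candidate secret."""
    payload = timestamp.encode() + b"." + raw_body
    matched = False
    for secret in secrets:
        expected = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
        # Check every candidate instead of returning on the first match.
        if hmac.compare_digest(received_hex.encode(), expected.encode()):
            matched = True
    return matched


# Placeholder secrets: an event signed with the old secret still verifies.
body, ts = b'{"event":"test.ping"}', "1748884800"
old_sig = hmac.new(b"old-secret", ts.encode() + b"." + body, hashlib.sha256).hexdigest()
print(matches_any_secret(body, ts, old_sig, ["new-secret", "old-secret"]))  # True
```

Once the grace window has passed, drop the previous secret from the list so that only the current one is accepted.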

Test it without real traffic

Ask OSIGU to send a test.ping event to your endpoint via the :test admin action. The payload is synthetic but signed with your real secret, so a successful delivery confirms that verification works end to end.