Saltar al contenido principal

Verificar Signatures HMAC

Cada webhook de DVS se firma con HMAC-SHA256 usando un secret entregado en el onboarding. Siempre verificar la signature antes de procesar — si se omite este paso, un atacante que conozca la URL del endpoint puede enviar eventos falsos.

El algoritmo

DVS calcula:

payload_to_sign = timestamp + "." + raw_body
signature = hmac_sha256_hex(secret, payload_to_sign)

Y envía dos headers:

X-DVS-Signature: t=1748884800,v1=a8d3e8b1c2...
X-DVS-Signature-Timestamp: 1748884800

Lo que corresponde hacer:

  1. Extraer timestamp de X-DVS-Signature-Timestamp.
  2. Construir el payload esperado: timestamp + "." + raw_request_body.
  3. Calcular hmac_sha256_hex(your_secret, payload).
  4. Compararlo contra la parte v1=... de X-DVS-Signature usando comparación de tiempo constante.
  5. Rechazar si |now - timestamp| > 300 segundos (anti-replay).

Reglas críticas

aviso

Usar el raw request body, no el JSON parseado. La signature es sobre los bytes exactos que DVS envió. Si se parsea el JSON y se re-serializa, los espacios en blanco y el orden de las keys cambian y la signature no coincidirá. Configurar el framework para entregar los bytes raw (express.raw(), FastAPI await request.body(), etc.).

aviso

Usar una comparación de tiempo constante. Usar hmac.compare_digest (Python), crypto.timingSafeEqual (Node), hash_equals (PHP), MessageDigest.isEqual (Java). Un == ingenuo es vulnerable a timing attacks.

aviso

Sincronizar el reloj del servidor con NTP. La ventana anti-replay de ±5 minutos asume tiempo preciso. Relojes desviados rechazarán eventos legítimos.

Implementaciones

import hmac
import hashlib
import time
from fastapi import FastAPI, Request, HTTPException, Header

DVS_WEBHOOK_SECRET = os.environ["DVS_WEBHOOK_SECRET"]
SIGNATURE_TOLERANCE_SECONDS = 300

app = FastAPI()


def verify_dvs_signature(
raw_body: bytes,
signature_header: str | None,
timestamp_header: str | None,
secret: str,
) -> bool:
if not signature_header or not timestamp_header:
return False

# 1) Ventana anti-replay
try:
ts = int(timestamp_header)
except (TypeError, ValueError):
return False
if abs(time.time() - ts) > SIGNATURE_TOLERANCE_SECONDS:
return False

# 2) Parsear el header de signature (formato: "t=<ts>,v1=<hex>")
parts = {k: v for k, v in (kv.split("=", 1) for kv in signature_header.split(","))}
received = parts.get("v1")
if not received:
return False

# 3) Calcular la signature esperada
payload = f"{timestamp_header}.".encode() + raw_body
expected = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()

# 4) Comparación de tiempo constante
return hmac.compare_digest(received, expected)


@app.post("/integrations/dvs/webhooks")
async def receive_dvs_webhook(
request: Request,
x_dvs_signature: str | None = Header(None),
x_dvs_signature_timestamp: str | None = Header(None),
x_dvs_event_id: str | None = Header(None),
x_dvs_event_type: str | None = Header(None),
):
raw_body = await request.body()

if not verify_dvs_signature(
raw_body, x_dvs_signature, x_dvs_signature_timestamp, DVS_WEBHOOK_SECRET
):
raise HTTPException(status_code=401, detail="Invalid signature")

if already_processed(x_dvs_event_id):
return {"status": "duplicate_ignored"}

enqueue_for_processing(x_dvs_event_id, x_dvs_event_type, raw_body)
return {"status": "received"}

Solución de problemas

SíntomaCausa probable
El receptor retorna 401 en todos los webhooksSecret incorrecto, o se está calculando el HMAC sobre el JSON parseado en lugar del raw body.
Algunos webhooks pasan y otros fallan (tras rotación)Se olvidó soportar ambos secrets durante la ventana de gracia de 24h. Ver Rotación de Webhook Secrets.
Todos los webhooks rechazados con "timestamp too old"El reloj del servidor está desviado. Instalar NTP / chrony / systemd-timesyncd.
Funciona localmente con ngrok, falla en producciónSecret distinto en el entorno de producción, o un proxy mutando el body (algunos load balancers/CDNs alteran el JSON). Evitar cualquier transformación entre el borde de la red y el handler.

Pruebas sin tráfico real

Pedir a OSIGU que envíe un evento test.ping al endpoint vía la acción administrativa :test. El payload es sintético pero firmado con el secret real — confirma que la verificación end-to-end funciona.