learning_ai_clock/backend/src/modules/webhooks/dispatcher.ts
saravanakumardb1 f10b83c122 feat(backend): scaffold product-specific Fastify backend (port 4011)
Add backend/ directory with Fastify 5 + TypeScript ESM service:
- Modules: timers, routines, households, shared-timers, webhooks (migrated from platform-service)
- Cosmos containers: timers, routines, households, shared_timers, webhook_subscriptions, webhook_events
- JWT verification via jose (matches platform-service issuer)
- Shared @bytelyst/* packages via file: refs
- 171 Vitest tests passing

Update AGENTS.md: update backend integration section with product backend details
2026-03-01 20:39:08 -08:00

201 lines
5.9 KiB
TypeScript

import { createHmac } from 'node:crypto';
import type { WebhookEventType, WebhookSubscriptionDoc, WebhookEventDoc } from './types.js';
import * as repo from './repository.js';
// ── HMAC Signing ──────────────────────────────────────────────
export function signPayload(payload: string, secret: string): string {
return createHmac('sha256', secret).update(payload).digest('hex');
}
export function buildSignatureHeader(payload: string, secret: string): string {
const timestamp = Math.floor(Date.now() / 1000);
const signature = createHmac('sha256', secret).update(`${timestamp}.${payload}`).digest('hex');
return `t=${timestamp},v1=${signature}`;
}
// ── Delivery ──────────────────────────────────────────────────
export interface DeliveryResult {
subscriptionId: string;
eventId: string;
success: boolean;
statusCode?: number;
error?: string;
}
/**
* Dispatch a webhook event to all matching subscriptions for a user.
* Returns delivery results for each subscription.
*/
export async function dispatchEvent(
userId: string,
productId: string,
eventType: WebhookEventType,
payload: Record<string, unknown>,
log?: { info: (...args: unknown[]) => void; error: (...args: unknown[]) => void }
): Promise<DeliveryResult[]> {
const subscriptions = await repo.findSubscriptionsForEvent(userId, productId, eventType);
if (subscriptions.length === 0) {
return [];
}
const results: DeliveryResult[] = [];
for (const sub of subscriptions) {
const result = await deliverToSubscription(sub, eventType, payload, log);
results.push(result);
}
return results;
}
/**
* Deliver a single event to a single subscription.
* Creates an event log entry and handles retries.
*/
async function deliverToSubscription(
sub: WebhookSubscriptionDoc,
eventType: WebhookEventType,
payload: Record<string, unknown>,
log?: { info: (...args: unknown[]) => void; error: (...args: unknown[]) => void }
): Promise<DeliveryResult> {
const eventId = crypto.randomUUID();
const now = new Date().toISOString();
// Create event log entry
const eventDoc: WebhookEventDoc = {
id: eventId,
subscriptionId: sub.id,
userId: sub.userId,
productId: sub.productId,
eventType,
payload,
createdAt: now,
attempts: 0,
maxRetries: sub.maxRetries,
};
await repo.createEvent(eventDoc);
// Attempt delivery with retries
const maxAttempts = (sub.maxRetries || 3) + 1;
let lastError: string | undefined;
let statusCode: number | undefined;
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
try {
const bodyJson = JSON.stringify({
id: eventId,
type: eventType,
timestamp: now,
data: payload,
});
const signatureHeader = buildSignatureHeader(bodyJson, sub.secret);
const controller = new globalThis.AbortController();
const timeout = globalThis.setTimeout(() => controller.abort(), 10_000);
const response = await fetch(sub.url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Webhook-Signature': signatureHeader,
'X-Webhook-Id': eventId,
'X-Webhook-Event': eventType,
'User-Agent': 'ChronoMind-Webhooks/1.0',
},
body: bodyJson,
signal: controller.signal,
});
globalThis.clearTimeout(timeout);
statusCode = response.status;
if (response.ok) {
// Success — update event log
await repo.updateEvent({
...eventDoc,
deliveredAt: new Date().toISOString(),
statusCode,
attempts: attempt,
});
await repo.resetFailureCount(sub.id, sub.userId);
log?.info({ subscriptionId: sub.id, eventType, attempt, statusCode }, 'webhook delivered');
return {
subscriptionId: sub.id,
eventId,
success: true,
statusCode,
};
}
lastError = `HTTP ${statusCode}`;
} catch (err: unknown) {
lastError = err instanceof Error ? err.message : String(err);
}
// Exponential backoff between retries (100ms, 200ms, 400ms, ...)
if (attempt < maxAttempts) {
const delay = Math.min(100 * Math.pow(2, attempt - 1), 5000);
await new Promise<void>(resolve => globalThis.setTimeout(resolve, delay));
}
}
// All attempts failed
await repo.updateEvent({
...eventDoc,
attempts: maxAttempts,
error: lastError,
statusCode,
});
await repo.incrementFailureCount(sub.id, sub.userId);
log?.error({ subscriptionId: sub.id, eventType, error: lastError }, 'webhook delivery failed');
return {
subscriptionId: sub.id,
eventId,
success: false,
statusCode,
error: lastError,
};
}
// ── Verify Signature (for consumers) ──────────────────────────
export function verifySignature(
signatureHeader: string,
body: string,
secret: string,
toleranceSeconds = 300
): boolean {
const parts = signatureHeader.split(',');
const timestampPart = parts.find(p => p.startsWith('t='));
const signaturePart = parts.find(p => p.startsWith('v1='));
if (!timestampPart || !signaturePart) return false;
const timestamp = parseInt(timestampPart.slice(2), 10);
const signature = signaturePart.slice(3);
// Check timestamp tolerance
const now = Math.floor(Date.now() / 1000);
if (Math.abs(now - timestamp) > toleranceSeconds) return false;
// Verify HMAC
const expected = createHmac('sha256', secret).update(`${timestamp}.${body}`).digest('hex');
// Constant-time comparison
if (expected.length !== signature.length) return false;
let diff = 0;
for (let i = 0; i < expected.length; i++) {
diff |= expected.charCodeAt(i) ^ signature.charCodeAt(i);
}
return diff === 0;
}