import { randomUUID } from 'node:crypto'; import type { PlatformEventName, PlatformEvent } from '@bytelyst/events'; import { bus } from './event-bus.js'; import { storeEvent } from './event-store-bridge.js'; interface DispatcherOptions { maxRetries?: number; productId?: string; } type HandlerFn = ( topic: string, payload: Record, productId: string ) => Promise; const handlers = new Map(); export function registerDispatchHandler(handlerType: string, fn: HandlerFn): void { const existing = handlers.get(handlerType) ?? []; existing.push(fn); handlers.set(handlerType, existing); } export function clearDispatchHandlers(): void { handlers.clear(); } export async function dispatchEvent( topic: string, payload: Record, productId: string, options: DispatcherOptions = {} ): Promise<{ dispatched: number; errors: number }> { const maxRetries = options.maxRetries ?? 3; let dispatched = 0; let errors = 0; // Store event for replay capability const eventId = `evt_${randomUUID()}`; try { await storeEvent(productId, topic, payload, eventId); } catch { // best-effort storage — don't block dispatch } // Look up active subscriptions for this topic const { findActiveByTopic } = await import('../modules/event-subscriptions/repository.js'); const subscriptions = await findActiveByTopic(productId, topic).catch( () => [] as Awaited> ); for (const sub of subscriptions) { // Apply filter expression if present if (sub.filterExpression) { try { const filterFn = new Function('payload', `return ${sub.filterExpression}`); if (!filterFn(payload)) continue; } catch { // skip invalid filter expressions continue; } } const handlerFns = handlers.get(sub.handlerType) ?? []; for (const fn of handlerFns) { let attempt = 0; let success = false; while (attempt < maxRetries && !success) { attempt++; try { await fn(topic, payload, productId); success = true; dispatched++; } catch { if (attempt >= maxRetries) { errors++; // Send to DLQ try { const { createDlqEntry } = await import('../modules/event-subscriptions/repository.js'); await createDlqEntry({ id: `dlq_${randomUUID()}`, productId, topic, payload, error: `Failed after ${maxRetries} attempts`, attempts: attempt, subscriptionId: sub.id, originalEventId: eventId, createdAt: new Date().toISOString(), }); } catch { // best-effort DLQ } } } } } } return { dispatched, errors }; } export function wireDispatcherToBus(): void { // Subscribe to all known events and dispatch them const eventNames: PlatformEventName[] = [ 'user.created', 'user.deleted', 'subscription.created', 'subscription.changed', 'subscription.canceled', 'payment.succeeded', 'payment.failed', 'job.completed', 'job.failed', 'flag.toggled', 'flag.created', 'flag.updated', 'flag.deleted', 'flag.kill_switch', 'license.activated', 'license.expired', 'invitation.redeemed', 'referral.completed', 'waitlist.joined', ]; for (const eventName of eventNames) { bus.on(eventName, async (event: PlatformEvent) => { const productId = (event.payload as Record).productId as string | undefined; if (productId) { await dispatchEvent(event.type, event.payload as Record, productId); } }); } }