import { QueueWorker, type QueueJob, type QueueStore } from '@bytelyst/queue'; import type { EmitResult } from './memory.js'; import type { EventHandler, EventSubscription, PlatformEvent, PlatformEventName, PlatformEventPayload, } from './types.js'; interface EventEnvelope { event: PlatformEvent; } export interface DurableEventBusOptions { store: QueueStore; queueName?: string; workerId?: string; pollIntervalMs?: number; leaseMs?: number; backoffMs?: number; autoStart?: boolean; } export class DurableEventBus { private readonly handlers = new Map< string, Set<{ id: string; fn: EventHandler }> >(); private subscriptionCounter = 0; private readonly queueName: string; private readonly store: QueueStore; private readonly worker: QueueWorker; private running = false; constructor(options: DurableEventBusOptions) { this.queueName = options.queueName ?? 'platform-events'; this.store = options.store; this.worker = new QueueWorker({ queueName: this.queueName, store: this.store, workerId: options.workerId, pollIntervalMs: options.pollIntervalMs, leaseMs: options.leaseMs, backoffMs: options.backoffMs, handler: async job => { await this.dispatch(job); }, }); if (options.autoStart !== false) { this.start(); } } on(eventType: T, handler: EventHandler): EventSubscription { const id = `sub_${++this.subscriptionCounter}`; if (!this.handlers.has(eventType)) { this.handlers.set(eventType, new Set()); } const entry = { id, fn: handler as EventHandler }; this.handlers.get(eventType)!.add(entry); return { id, eventType, unsubscribe: () => { this.handlers.get(eventType)?.delete(entry); }, }; } async emit( eventType: T, payload: PlatformEventPayload, options?: { source?: string } ): Promise { const event: PlatformEvent = { id: crypto.randomUUID(), type: eventType, payload, timestamp: new Date().toISOString(), source: options?.source, }; await this.store.enqueue(this.queueName, { idempotencyKey: event.id, type: eventType, payload: { event }, productId: extractProductId(payload), metadata: { source: options?.source, }, }); return { eventId: event.id, handlerCount: this.listenerCount(eventType), errors: [], }; } start(): void { if (this.running) return; this.running = true; this.worker.start(); } async stop(): Promise { if (!this.running) return; this.running = false; await this.worker.stop(); } clear(eventType?: PlatformEventName): void { if (eventType) { this.handlers.delete(eventType); } else { this.handlers.clear(); } } listenerCount(eventType: PlatformEventName): number { return this.handlers.get(eventType)?.size ?? 0; } eventTypes(): PlatformEventName[] { return Array.from(this.handlers.entries()) .filter(([, set]) => set.size > 0) .map(([type]) => type as PlatformEventName); } private async dispatch(job: QueueJob): Promise { const event = job.payload.event; const handlers = this.handlers.get(event.type); if (!handlers || handlers.size === 0) { return; } await Promise.allSettled( Array.from(handlers).map(async ({ fn }) => fn(event as PlatformEvent)) ); } } function extractProductId(payload: unknown): string | undefined { if (!payload || typeof payload !== 'object') return undefined; const productId = (payload as { productId?: unknown }).productId; return typeof productId === 'string' ? productId : undefined; }