import type {
  PlatformEventName,
  PlatformEventPayload,
  PlatformEvent,
  EventHandler,
  EventSubscription,
} from './types.js';

// ── In-Memory Event Bus ──────────────────────────────────────
// Phase 1 implementation: typed in-process pub/sub with error isolation.
// Handlers run concurrently via Promise.allSettled — a failing handler
// never blocks other handlers or the emitter.

/** One registered handler, stored type-erased so a single Set can hold handlers of any event type. */
interface HandlerEntry {
  id: string;
  fn: EventHandler<PlatformEventName>;
}

export class EventBus {
  /** Registered handlers, keyed by event type. A key is removed once its last handler unsubscribes. */
  private readonly handlers = new Map<PlatformEventName, Set<HandlerEntry>>();

  /** Monotonic counter used to mint subscription ids (`sub_1`, `sub_2`, …). */
  private subscriptionCounter = 0;

  /**
   * Subscribe to a specific event type.
   *
   * @param eventType - Event name to listen for.
   * @param handler - Callback invoked with the full event object on every emit of `eventType`.
   * @returns An EventSubscription carrying the subscription `id`, the `eventType`,
   *          and an `unsubscribe()` method. Calling `unsubscribe()` more than once is harmless.
   */
  on<T extends PlatformEventName>(eventType: T, handler: EventHandler<T>): EventSubscription {
    const id = `sub_${++this.subscriptionCounter}`;

    let bucket = this.handlers.get(eventType);
    if (!bucket) {
      bucket = new Set<HandlerEntry>();
      this.handlers.set(eventType, bucket);
    }

    // Each subscription gets its own entry object, so registering the same
    // function twice yields two independent subscriptions.
    const entry: HandlerEntry = { id, fn: handler as EventHandler<PlatformEventName> };
    bucket.add(entry);

    return {
      id,
      eventType,
      unsubscribe: () => {
        // Look the set up again: `clear()` may have replaced or removed it since subscription.
        const current = this.handlers.get(eventType);
        if (!current) {
          return;
        }
        current.delete(entry);
        // Drop empty sets so the map does not accumulate dead keys.
        if (current.size === 0) {
          this.handlers.delete(eventType);
        }
      },
    };
  }

  /**
   * Emit an event to all handlers registered for that event type.
   * Handlers run concurrently. Errors are collected and returned,
   * never thrown — the emitter is never blocked by a failing handler.
   *
   * @param eventType - Event name to emit.
   * @param payload - Payload placed on the event object passed to handlers.
   * @param options - Optional metadata; `source` is copied onto the event.
   * @returns The generated event id, the number of handlers that were invoked,
   *          and one error record per handler that threw or rejected.
   */
  async emit<T extends PlatformEventName>(
    eventType: T,
    payload: PlatformEventPayload<T>,
    options?: { source?: string }
  ): Promise<EmitResult> {
    const event: PlatformEvent<T> = {
      id: crypto.randomUUID(),
      type: eventType,
      payload,
      timestamp: new Date().toISOString(),
      source: options?.source,
    };

    const registered = this.handlers.get(eventType);
    if (!registered || registered.size === 0) {
      return { eventId: event.id, handlerCount: 0, errors: [] };
    }

    // Snapshot before invoking anything: handlers may subscribe/unsubscribe
    // while running, and the reported count must match what was actually called.
    const snapshot = Array.from(registered);

    // The async wrapper turns a synchronous throw inside a handler into a
    // rejection, so allSettled captures it instead of it escaping from map().
    const results = await Promise.allSettled(
      snapshot.map(async ({ fn }) => fn(event as PlatformEvent<PlatformEventName>))
    );

    const errors: EmitError[] = [];
    for (const result of results) {
      if (result.status === 'rejected') {
        errors.push({
          eventType,
          eventId: event.id,
          error: result.reason instanceof Error ? result.reason.message : String(result.reason),
        });
      }
    }

    return { eventId: event.id, handlerCount: snapshot.length, errors };
  }

  /**
   * Remove all handlers for a specific event type, or all handlers if no type given.
   *
   * @param eventType - Event name whose handlers should be removed; omit to remove everything.
   */
  clear(eventType?: PlatformEventName): void {
    if (eventType) {
      this.handlers.delete(eventType);
    } else {
      this.handlers.clear();
    }
  }

  /**
   * Get count of registered handlers for a specific event type.
   *
   * @returns Number of active subscriptions for `eventType`, or 0 if there are none.
   */
  listenerCount(eventType: PlatformEventName): number {
    return this.handlers.get(eventType)?.size ?? 0;
  }

  /**
   * Get all event types that have at least one handler registered.
   */
  eventTypes(): PlatformEventName[] {
    return Array.from(this.handlers.entries())
      .filter(([, set]) => set.size > 0)
      .map(([type]) => type);
  }
}

// ── Result Types ─────────────────────────────────────────────

/** Summary returned by `EventBus.emit`. */
export interface EmitResult {
  /** Id generated for the emitted event. */
  eventId: string;
  /** Number of handlers invoked for this emit. */
  handlerCount: number;
  /** One record per handler that threw or rejected; empty when all succeeded. */
  errors: EmitError[];
}

/** A single handler failure captured during `EventBus.emit`. */
export interface EmitError {
  /** Event type that was being emitted. */
  eventType: string;
  /** Id of the event whose handler failed. */
  eventId: string;
  /** `Error.message` when the handler threw an Error, otherwise the stringified thrown value. */
  error: string;
}