learning_ai_common_plat/packages/events/src/memory.ts

123 lines
3.5 KiB
TypeScript

import type {
PlatformEventName,
PlatformEventPayload,
PlatformEvent,
EventHandler,
EventSubscription,
} from './types.js';
// ── In-Memory Event Bus ──────────────────────────────────────
// Phase 1 implementation: typed in-process pub/sub with error isolation.
// Handlers run concurrently via Promise.allSettled — a failing handler
// never prevents other handlers from running and never makes emit() reject.
/**
 * In-process, typed publish/subscribe bus.
 *
 * Handlers are stored per event type. `emit` starts every handler registered
 * for the type and waits for all of them with `Promise.allSettled`, so a
 * handler that throws or rejects neither stops the other handlers nor makes
 * `emit` reject; its failure is reported in the returned result instead.
 */
export class EventBus {
  /** Registered handlers keyed by event type. Emptied buckets are pruned on unsubscribe. */
  private readonly handlers = new Map<
    string,
    Set<{ id: string; fn: EventHandler<PlatformEventName> }>
  >();

  /** Monotonic counter used to mint subscription ids (`sub_1`, `sub_2`, ...). */
  private subscriptionCounter = 0;

  /**
   * Subscribe to a specific event type.
   *
   * Registering the same function twice creates two independent subscriptions.
   *
   * @param eventType - Event type to listen for.
   * @param handler - Callback invoked with each emitted event of that type.
   * @returns A subscription carrying its id, the event type, and an
   *   `unsubscribe()` method. `unsubscribe()` is idempotent and is a no-op if
   *   the handler was already removed by `clear()`.
   */
  on<T extends PlatformEventName>(eventType: T, handler: EventHandler<T>): EventSubscription {
    const id = `sub_${++this.subscriptionCounter}`;
    // Stored under the widened handler type; emit() only ever invokes it for `eventType`.
    const entry = { id, fn: handler as EventHandler<PlatformEventName> };

    let bucket = this.handlers.get(eventType);
    if (bucket === undefined) {
      bucket = new Set();
      this.handlers.set(eventType, bucket);
    }
    bucket.add(entry);

    return {
      id,
      eventType,
      unsubscribe: () => {
        const current = this.handlers.get(eventType);
        // Set.delete() returns false when this entry is not in the current bucket
        // (double unsubscribe, or clear() followed by a fresh subscription), in
        // which case there is nothing to clean up.
        if (current === undefined || !current.delete(entry)) {
          return;
        }
        // Drop the bucket with its last handler so the map does not accumulate
        // empty sets for every event type ever subscribed to.
        if (current.size === 0) {
          this.handlers.delete(eventType);
        }
      },
    };
  }

  /**
   * Emit an event to all handlers registered for that event type.
   *
   * Handlers are started concurrently and awaited together. Synchronous throws
   * and promise rejections from handlers are collected into `errors` rather
   * than thrown. The returned promise resolves once every handler has settled.
   *
   * @param eventType - Event type to emit.
   * @param payload - Payload attached to the event.
   * @param options - Optional metadata; `source` is copied onto the event.
   * @returns The generated event id, the number of handlers the event was
   *   dispatched to, and one error entry per failed handler.
   */
  async emit<T extends PlatformEventName>(
    eventType: T,
    payload: PlatformEventPayload<T>,
    options?: { source?: string }
  ): Promise<EmitResult> {
    const event: PlatformEvent<T> = {
      id: crypto.randomUUID(),
      type: eventType,
      payload,
      timestamp: new Date().toISOString(),
      source: options?.source,
    };

    const registered = this.handlers.get(eventType);
    if (registered === undefined || registered.size === 0) {
      return { eventId: event.id, handlerCount: 0, errors: [] };
    }

    // Snapshot before dispatch: handlers may subscribe/unsubscribe while running,
    // and the reported count must match the handlers that were actually invoked.
    const targets = Array.from(registered);

    // The async wrapper turns a synchronous throw into a rejection so it is
    // captured by allSettled like any other handler failure.
    const results = await Promise.allSettled(
      targets.map(async ({ fn }) => fn(event as PlatformEvent<PlatformEventName>))
    );

    const errors: EmitError[] = [];
    for (const result of results) {
      if (result.status === 'rejected') {
        errors.push({
          eventType,
          eventId: event.id,
          error: result.reason instanceof Error ? result.reason.message : String(result.reason),
        });
      }
    }

    return { eventId: event.id, handlerCount: targets.length, errors };
  }

  /**
   * Remove all handlers for a specific event type, or every handler when no
   * type is given.
   *
   * @param eventType - Event type to clear; omit to clear the whole bus.
   */
  clear(eventType?: PlatformEventName): void {
    // Compare against null/undefined rather than truthiness so a falsy but
    // defined event name cannot accidentally wipe every subscription.
    if (eventType == null) {
      this.handlers.clear();
    } else {
      this.handlers.delete(eventType);
    }
  }

  /**
   * Get the number of handlers registered for a specific event type.
   *
   * @param eventType - Event type to inspect.
   * @returns The handler count, or 0 when nothing is registered.
   */
  listenerCount(eventType: PlatformEventName): number {
    return this.handlers.get(eventType)?.size ?? 0;
  }

  /**
   * Get all event types that have at least one handler registered.
   *
   * @returns Event types in first-subscription order.
   */
  eventTypes(): PlatformEventName[] {
    return Array.from(this.handlers.entries())
      .filter(([, bucket]) => bucket.size > 0)
      .map(([type]) => type as PlatformEventName);
  }
}
// ── Result Types ─────────────────────────────────────────────
/** Outcome returned by `EventBus.emit` for a single emitted event. */
export interface EmitResult {
/** Id generated for the emitted event. */
eventId: string;
/** Handler count reported for the event type; 0 when no handler is registered. */
handlerCount: number;
/** One entry per handler that threw or rejected; empty when none failed. */
errors: EmitError[];
}
/** Describes one handler failure captured while emitting an event. */
export interface EmitError {
/** Event type that was being emitted when the handler failed. */
eventType: string;
/** Id of the event whose handler failed. */
eventId: string;
/** The `Error.message` of the failure, or `String(reason)` for non-Error values. */
error: string;
}