learning_ai_common_plat/packages/events/src/durable.ts

153 lines
3.9 KiB
TypeScript

import { QueueWorker, type QueueJob, type QueueStore } from '@bytelyst/queue';
import type { EmitResult } from './memory.js';
import type {
EventHandler,
EventSubscription,
PlatformEvent,
PlatformEventName,
PlatformEventPayload,
} from './types.js';
/**
 * Payload shape stored on each queue job by the durable bus: the complete
 * platform event wrapped under a single `event` key.
 */
interface EventEnvelope<T extends PlatformEventName = PlatformEventName> {
/** The event as built by `emit`; `dispatch` reads it back from `job.payload.event`. */
event: PlatformEvent<T>;
}
/**
 * Construction options for `DurableEventBus`.
 *
 * `store` is required. `workerId`, `pollIntervalMs`, `leaseMs` and `backoffMs`
 * are passed to the underlying `QueueWorker` unchanged; their exact meaning
 * and defaults are defined by `@bytelyst/queue`, not by this module.
 */
export interface DurableEventBusOptions {
/** Queue store that `emit` enqueues into and that the worker is constructed with. */
store: QueueStore;
/** Name of the queue to use; the constructor falls back to `'platform-events'`. */
queueName?: string;
/** Forwarded to `QueueWorker` as-is (presumably identifies this consumer — confirm in the queue package). */
workerId?: string;
/** Forwarded to `QueueWorker` as-is (presumably the delay between polls, in ms — confirm). */
pollIntervalMs?: number;
/** Forwarded to `QueueWorker` as-is (presumably how long a claimed job stays leased, in ms — confirm). */
leaseMs?: number;
/** Forwarded to `QueueWorker` as-is (presumably the retry delay after a failed job, in ms — confirm). */
backoffMs?: number;
/** The constructor calls `start()` unless this is explicitly `false`. */
autoStart?: boolean;
}
/**
 * Event bus that persists every emitted event to a queue and delivers it to
 * locally registered handlers from a `QueueWorker`.
 *
 * `emit` only enqueues; handlers run later, when the worker picks the job up
 * and calls `dispatch`. Handlers are held in memory on this instance, so a job
 * picked up by an instance with no handler for its type completes without
 * invoking anything.
 */
export class DurableEventBus {
  /** Registered handlers grouped by event type; each entry carries its subscription id. */
  private readonly handlers = new Map<
    string,
    Set<{ id: string; fn: EventHandler<PlatformEventName> }>
  >();
  /** Monotonic counter used to build `sub_<n>` subscription ids. */
  private subscriptionCounter = 0;
  private readonly queueName: string;
  private readonly store: QueueStore;
  private readonly worker: QueueWorker<EventEnvelope, void>;
  /** Tracks whether `start()` has been called without a matching `stop()`. */
  private running = false;

  /**
   * Builds the worker for the configured queue and, unless
   * `options.autoStart` is `false`, starts it immediately.
   *
   * @param options Store, queue name (default `'platform-events'`) and worker
   *   settings that are forwarded to `QueueWorker` unchanged.
   */
  constructor(options: DurableEventBusOptions) {
    this.queueName = options.queueName ?? 'platform-events';
    this.store = options.store;
    this.worker = new QueueWorker<EventEnvelope, void>({
      queueName: this.queueName,
      store: this.store,
      workerId: options.workerId,
      pollIntervalMs: options.pollIntervalMs,
      leaseMs: options.leaseMs,
      backoffMs: options.backoffMs,
      handler: async job => {
        await this.dispatch(job);
      },
    });
    if (options.autoStart !== false) {
      this.start();
    }
  }

  /**
   * Registers `handler` for `eventType` on this instance.
   *
   * @returns A subscription whose `unsubscribe()` removes exactly this
   *   registration; calling it more than once is harmless.
   */
  on<T extends PlatformEventName>(eventType: T, handler: EventHandler<T>): EventSubscription {
    const id = `sub_${++this.subscriptionCounter}`;

    // Get-or-create the bucket so no non-null assertion is needed below.
    let bucket = this.handlers.get(eventType);
    if (!bucket) {
      bucket = new Set<{ id: string; fn: EventHandler<PlatformEventName> }>();
      this.handlers.set(eventType, bucket);
    }

    const entry = { id, fn: handler as EventHandler<PlatformEventName> };
    bucket.add(entry);

    return {
      id,
      eventType,
      unsubscribe: () => {
        // Look the bucket up again: `clear()` may have replaced or removed it.
        this.handlers.get(eventType)?.delete(entry);
      },
    };
  }

  /**
   * Builds a platform event and enqueues it; no handler is invoked here.
   *
   * The generated event id is also used as the job's idempotency key, and a
   * string `productId` found on the payload is copied onto the job.
   *
   * @returns The event id, the number of handlers registered locally for
   *   `eventType` at the time of the call, and an always-empty `errors` list
   *   (handlers have not run yet, so there is nothing to report).
   * @throws Whatever `store.enqueue` rejects with.
   */
  async emit<T extends PlatformEventName>(
    eventType: T,
    payload: PlatformEventPayload<T>,
    options?: { source?: string }
  ): Promise<EmitResult> {
    const event: PlatformEvent<T> = {
      id: crypto.randomUUID(),
      type: eventType,
      payload,
      timestamp: new Date().toISOString(),
      source: options?.source,
    };
    await this.store.enqueue<EventEnvelope>(this.queueName, {
      idempotencyKey: event.id,
      type: eventType,
      payload: { event },
      productId: extractProductId(payload),
      metadata: {
        source: options?.source,
      },
    });
    return {
      eventId: event.id,
      handlerCount: this.listenerCount(eventType),
      errors: [],
    };
  }

  /** Starts the worker; does nothing if the bus is already running. */
  start(): void {
    if (this.running) return;
    this.running = true;
    this.worker.start();
  }

  /** Stops the worker and waits for `worker.stop()`; does nothing if not running. */
  async stop(): Promise<void> {
    if (!this.running) return;
    this.running = false;
    await this.worker.stop();
  }

  /** Removes all handlers for `eventType`, or every handler when it is omitted. */
  clear(eventType?: PlatformEventName): void {
    if (eventType) {
      this.handlers.delete(eventType);
    } else {
      this.handlers.clear();
    }
  }

  /** Number of handlers currently registered on this instance for `eventType`. */
  listenerCount(eventType: PlatformEventName): number {
    return this.handlers.get(eventType)?.size ?? 0;
  }

  /** Event types that currently have at least one registered handler. */
  eventTypes(): PlatformEventName[] {
    return Array.from(this.handlers.entries())
      .filter(([, set]) => set.size > 0)
      .map(([type]) => type as PlatformEventName);
  }

  /**
   * Delivers a queued event to every handler registered for its type.
   *
   * All handlers are run even if some fail. If any handler throws or rejects,
   * this method then throws a single `Error` summarising the failures instead
   * of silently discarding them.
   *
   * NOTE(review): presumably `QueueWorker` treats a rejected handler as a
   * failed job and retries it — confirm in `@bytelyst/queue`. If it does, a
   * retry re-invokes every handler for the event, including ones that already
   * succeeded, so handlers should tolerate repeat delivery.
   *
   * @throws Error when at least one handler failed.
   */
  private async dispatch(job: QueueJob<EventEnvelope, void>): Promise<void> {
    const event = job.payload.event;
    const handlers = this.handlers.get(event.type);
    if (!handlers || handlers.size === 0) {
      return;
    }

    // Snapshot the set so (un)subscribing inside a handler cannot affect this
    // delivery; the async wrapper turns synchronous throws into rejections.
    const outcomes = await Promise.allSettled(
      Array.from(handlers).map(async ({ fn }) => fn(event as PlatformEvent<PlatformEventName>))
    );

    const failures = outcomes.filter(
      (outcome): outcome is PromiseRejectedResult => outcome.status === 'rejected'
    );
    if (failures.length === 0) {
      return;
    }

    const details = failures
      .map(({ reason }) => (reason instanceof Error ? reason.message : String(reason)))
      .join('; ');
    throw new Error(
      `${failures.length} of ${outcomes.length} handler(s) failed for event "${event.type}" (${event.id}): ${details}`
    );
  }
}
/**
 * Reads a `productId` from an arbitrary event payload.
 *
 * @param payload Any value; only non-null objects are inspected.
 * @returns The payload's `productId` when it is a string (including the empty
 *   string); `undefined` for every other payload or property type.
 */
function extractProductId(payload: unknown): string | undefined {
  // `typeof null` is 'object', so null has to be excluded explicitly.
  if (typeof payload === 'object' && payload !== null) {
    const { productId: candidate } = payload as { productId?: unknown };
    if (typeof candidate === 'string') {
      return candidate;
    }
  }
  return undefined;
}