153 lines
3.9 KiB
TypeScript
153 lines
3.9 KiB
TypeScript
import { QueueWorker, type QueueJob, type QueueStore } from '@bytelyst/queue';
|
|
import type { EmitResult } from './memory.js';
|
|
import type {
|
|
EventHandler,
|
|
EventSubscription,
|
|
PlatformEvent,
|
|
PlatformEventName,
|
|
PlatformEventPayload,
|
|
} from './types.js';
|
|
|
|
interface EventEnvelope<T extends PlatformEventName = PlatformEventName> {
|
|
event: PlatformEvent<T>;
|
|
}
|
|
|
|
export interface DurableEventBusOptions {
|
|
store: QueueStore;
|
|
queueName?: string;
|
|
workerId?: string;
|
|
pollIntervalMs?: number;
|
|
leaseMs?: number;
|
|
backoffMs?: number;
|
|
autoStart?: boolean;
|
|
}
|
|
|
|
export class DurableEventBus {
|
|
private readonly handlers = new Map<
|
|
string,
|
|
Set<{ id: string; fn: EventHandler<PlatformEventName> }>
|
|
>();
|
|
private subscriptionCounter = 0;
|
|
private readonly queueName: string;
|
|
private readonly store: QueueStore;
|
|
private readonly worker: QueueWorker<EventEnvelope, void>;
|
|
private running = false;
|
|
|
|
constructor(options: DurableEventBusOptions) {
|
|
this.queueName = options.queueName ?? 'platform-events';
|
|
this.store = options.store;
|
|
this.worker = new QueueWorker<EventEnvelope, void>({
|
|
queueName: this.queueName,
|
|
store: this.store,
|
|
workerId: options.workerId,
|
|
pollIntervalMs: options.pollIntervalMs,
|
|
leaseMs: options.leaseMs,
|
|
backoffMs: options.backoffMs,
|
|
handler: async job => {
|
|
await this.dispatch(job);
|
|
},
|
|
});
|
|
|
|
if (options.autoStart !== false) {
|
|
this.start();
|
|
}
|
|
}
|
|
|
|
on<T extends PlatformEventName>(eventType: T, handler: EventHandler<T>): EventSubscription {
|
|
const id = `sub_${++this.subscriptionCounter}`;
|
|
|
|
if (!this.handlers.has(eventType)) {
|
|
this.handlers.set(eventType, new Set());
|
|
}
|
|
|
|
const entry = { id, fn: handler as EventHandler<PlatformEventName> };
|
|
this.handlers.get(eventType)!.add(entry);
|
|
|
|
return {
|
|
id,
|
|
eventType,
|
|
unsubscribe: () => {
|
|
this.handlers.get(eventType)?.delete(entry);
|
|
},
|
|
};
|
|
}
|
|
|
|
async emit<T extends PlatformEventName>(
|
|
eventType: T,
|
|
payload: PlatformEventPayload<T>,
|
|
options?: { source?: string }
|
|
): Promise<EmitResult> {
|
|
const event: PlatformEvent<T> = {
|
|
id: crypto.randomUUID(),
|
|
type: eventType,
|
|
payload,
|
|
timestamp: new Date().toISOString(),
|
|
source: options?.source,
|
|
};
|
|
|
|
await this.store.enqueue<EventEnvelope>(this.queueName, {
|
|
idempotencyKey: event.id,
|
|
type: eventType,
|
|
payload: { event },
|
|
productId: extractProductId(payload),
|
|
metadata: {
|
|
source: options?.source,
|
|
},
|
|
});
|
|
|
|
return {
|
|
eventId: event.id,
|
|
handlerCount: this.listenerCount(eventType),
|
|
errors: [],
|
|
};
|
|
}
|
|
|
|
start(): void {
|
|
if (this.running) return;
|
|
this.running = true;
|
|
this.worker.start();
|
|
}
|
|
|
|
async stop(): Promise<void> {
|
|
if (!this.running) return;
|
|
this.running = false;
|
|
await this.worker.stop();
|
|
}
|
|
|
|
clear(eventType?: PlatformEventName): void {
|
|
if (eventType) {
|
|
this.handlers.delete(eventType);
|
|
} else {
|
|
this.handlers.clear();
|
|
}
|
|
}
|
|
|
|
listenerCount(eventType: PlatformEventName): number {
|
|
return this.handlers.get(eventType)?.size ?? 0;
|
|
}
|
|
|
|
eventTypes(): PlatformEventName[] {
|
|
return Array.from(this.handlers.entries())
|
|
.filter(([, set]) => set.size > 0)
|
|
.map(([type]) => type as PlatformEventName);
|
|
}
|
|
|
|
private async dispatch(job: QueueJob<EventEnvelope, void>): Promise<void> {
|
|
const event = job.payload.event;
|
|
const handlers = this.handlers.get(event.type);
|
|
if (!handlers || handlers.size === 0) {
|
|
return;
|
|
}
|
|
|
|
await Promise.allSettled(
|
|
Array.from(handlers).map(async ({ fn }) => fn(event as PlatformEvent<PlatformEventName>))
|
|
);
|
|
}
|
|
}
|
|
|
|
function extractProductId(payload: unknown): string | undefined {
|
|
if (!payload || typeof payload !== 'object') return undefined;
|
|
const productId = (payload as { productId?: unknown }).productId;
|
|
return typeof productId === 'string' ? productId : undefined;
|
|
}
|