feat(events): add durable event bus worker path

This commit is contained in:
root 2026-03-14 16:02:40 +00:00
parent 8de22f9f22
commit 885ee2d504
11 changed files with 374 additions and 10 deletions

View File

@ -42,6 +42,10 @@ TELEGRAM_BOT_TOKEN=
TELEGRAM_DEFAULT_CHAT_ID=
SLACK_WEBHOOK_URL=
SLACK_DEFAULT_CHANNEL=
EVENT_BUS_BACKEND=file
EVENT_BUS_FILE=.data/platform-events.json
EVENT_BUS_POLL_MS=100
EVENT_BUS_LEASE_MS=30000
# ── Extraction Service (port 4005 + Python sidecar 4006) ─────
PYTHON_SIDECAR_URL=http://localhost:4006

View File

@ -17,6 +17,13 @@
"build": "tsc",
"test": "vitest run"
},
"dependencies": {
"@bytelyst/queue": "workspace:*"
},
"devDependencies": {
"@types/node": "^22.12.0",
"vitest": "^3.0.5"
},
"peerDependencies": {
"zod": "^3.0.0"
}

View File

@ -0,0 +1,91 @@
import { mkdtemp, rm } from 'node:fs/promises';
import { join } from 'node:path';
import { tmpdir } from 'node:os';
import { describe, it, expect, vi } from 'vitest';
import { FileQueueStore } from '@bytelyst/queue';
import { MemoryQueueStore } from '@bytelyst/queue';
import { DurableEventBus } from './durable.js';
describe('DurableEventBus', () => {
it('delivers queued events through the worker', async () => {
const store = new MemoryQueueStore();
const bus = new DurableEventBus({
store,
autoStart: false,
pollIntervalMs: 10,
});
const handler = vi.fn();
bus.on('user.created', handler);
bus.start();
const result = await bus.emit('user.created', {
userId: 'u1',
email: 'test@example.com',
plan: 'free',
productId: 'lysnrai',
});
expect(result.handlerCount).toBe(1);
await waitFor(() => {
expect(handler).toHaveBeenCalledOnce();
});
await bus.stop();
});
it('persists emitted events across bus instances before the worker starts', async () => {
const dir = await mkdtemp(join(tmpdir(), 'events-store-'));
const filePath = join(dir, 'events.json');
try {
const first = new DurableEventBus({
store: new FileQueueStore({ filePath }),
autoStart: false,
pollIntervalMs: 10,
});
await first.emit('payment.failed', {
invoiceId: 'inv_1',
userId: 'u1',
amount: 499,
retryCount: 1,
productId: 'lysnrai',
});
await first.stop();
const second = new DurableEventBus({
store: new FileQueueStore({ filePath }),
autoStart: false,
pollIntervalMs: 10,
});
const handler = vi.fn();
second.on('payment.failed', handler);
second.start();
await waitFor(() => {
expect(handler).toHaveBeenCalledOnce();
});
await second.stop();
} finally {
await rm(dir, { recursive: true, force: true });
}
});
});
async function waitFor(assertion: () => void, timeoutMs = 1_000): Promise<void> {
const startedAt = Date.now();
while (Date.now() - startedAt < timeoutMs) {
try {
assertion();
return;
} catch {
await new Promise(resolve => setTimeout(resolve, 20));
}
}
assertion();
}

View File

@ -0,0 +1,152 @@
import { QueueWorker, type QueueJob, type QueueStore } from '@bytelyst/queue';
import type { EmitResult } from './memory.js';
import type {
EventHandler,
EventSubscription,
PlatformEvent,
PlatformEventName,
PlatformEventPayload,
} from './types.js';
interface EventEnvelope<T extends PlatformEventName = PlatformEventName> {
event: PlatformEvent<T>;
}
export interface DurableEventBusOptions {
store: QueueStore;
queueName?: string;
workerId?: string;
pollIntervalMs?: number;
leaseMs?: number;
backoffMs?: number;
autoStart?: boolean;
}
export class DurableEventBus {
private readonly handlers = new Map<
string,
Set<{ id: string; fn: EventHandler<PlatformEventName> }>
>();
private subscriptionCounter = 0;
private readonly queueName: string;
private readonly store: QueueStore;
private readonly worker: QueueWorker<EventEnvelope, void>;
private running = false;
constructor(options: DurableEventBusOptions) {
this.queueName = options.queueName ?? 'platform-events';
this.store = options.store;
this.worker = new QueueWorker<EventEnvelope, void>({
queueName: this.queueName,
store: this.store,
workerId: options.workerId,
pollIntervalMs: options.pollIntervalMs,
leaseMs: options.leaseMs,
backoffMs: options.backoffMs,
handler: async job => {
await this.dispatch(job);
},
});
if (options.autoStart !== false) {
this.start();
}
}
on<T extends PlatformEventName>(eventType: T, handler: EventHandler<T>): EventSubscription {
const id = `sub_${++this.subscriptionCounter}`;
if (!this.handlers.has(eventType)) {
this.handlers.set(eventType, new Set());
}
const entry = { id, fn: handler as EventHandler<PlatformEventName> };
this.handlers.get(eventType)!.add(entry);
return {
id,
eventType,
unsubscribe: () => {
this.handlers.get(eventType)?.delete(entry);
},
};
}
async emit<T extends PlatformEventName>(
eventType: T,
payload: PlatformEventPayload<T>,
options?: { source?: string }
): Promise<EmitResult> {
const event: PlatformEvent<T> = {
id: crypto.randomUUID(),
type: eventType,
payload,
timestamp: new Date().toISOString(),
source: options?.source,
};
await this.store.enqueue<EventEnvelope>(this.queueName, {
idempotencyKey: event.id,
type: eventType,
payload: { event },
productId: extractProductId(payload),
metadata: {
source: options?.source,
},
});
return {
eventId: event.id,
handlerCount: this.listenerCount(eventType),
errors: [],
};
}
start(): void {
if (this.running) return;
this.running = true;
this.worker.start();
}
async stop(): Promise<void> {
if (!this.running) return;
this.running = false;
await this.worker.stop();
}
clear(eventType?: PlatformEventName): void {
if (eventType) {
this.handlers.delete(eventType);
} else {
this.handlers.clear();
}
}
listenerCount(eventType: PlatformEventName): number {
return this.handlers.get(eventType)?.size ?? 0;
}
eventTypes(): PlatformEventName[] {
return Array.from(this.handlers.entries())
.filter(([, set]) => set.size > 0)
.map(([type]) => type as PlatformEventName);
}
private async dispatch(job: QueueJob<EventEnvelope, void>): Promise<void> {
const event = job.payload.event;
const handlers = this.handlers.get(event.type);
if (!handlers || handlers.size === 0) {
return;
}
await Promise.allSettled(
Array.from(handlers).map(async ({ fn }) => fn(event as PlatformEvent<PlatformEventName>))
);
}
}
function extractProductId(payload: unknown): string | undefined {
if (!payload || typeof payload !== 'object') return undefined;
const productId = (payload as { productId?: unknown }).productId;
return typeof productId === 'string' ? productId : undefined;
}

View File

@ -1,5 +1,7 @@
export { EventBus } from './memory.js';
export type { EmitResult, EmitError } from './memory.js';
export { DurableEventBus } from './durable.js';
export type { DurableEventBusOptions } from './durable.js';
export { PlatformEventSchemas } from './types.js';
export type {
PlatformEventName,

42
pnpm-lock.yaml generated
View File

@ -430,9 +430,19 @@ importers:
packages/events:
dependencies:
'@bytelyst/queue':
specifier: workspace:*
version: link:../queue
zod:
specifier: ^3.0.0
version: 3.25.76
devDependencies:
'@types/node':
specifier: ^22.12.0
version: 22.19.11
vitest:
specifier: ^3.0.5
version: 3.2.4(@types/debug@4.1.12)(@types/node@22.19.11)(happy-dom@18.0.1)(jiti@2.6.1)(jsdom@28.0.0(@noble/hashes@1.8.0))(lightningcss@1.31.1)(msw@2.12.10(@types/node@22.19.11)(typescript@5.9.3))(terser@5.46.0)(tsx@4.21.0)(yaml@2.8.2)
packages/extraction:
dependencies:
@ -772,6 +782,9 @@ importers:
'@bytelyst/fastify-core':
specifier: workspace:*
version: link:../../packages/fastify-core
'@bytelyst/queue':
specifier: workspace:*
version: link:../../packages/queue
'@bytelyst/storage':
specifier: workspace:*
version: link:../../packages/storage
@ -18915,8 +18928,8 @@ snapshots:
'@next/eslint-plugin-next': 16.1.6
eslint: 9.39.2(jiti@2.6.1)
eslint-import-resolver-node: 0.3.9
eslint-import-resolver-typescript: 3.10.1(eslint-plugin-import@2.32.0)(eslint@9.39.2(jiti@2.6.1))
eslint-plugin-import: 2.32.0(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint-import-resolver-typescript@3.10.1)(eslint@9.39.2(jiti@2.6.1))
eslint-import-resolver-typescript: 3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1))
eslint-plugin-import: 2.32.0(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint-import-resolver-typescript@3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1))
eslint-plugin-jsx-a11y: 6.10.2(eslint@9.39.2(jiti@2.6.1))
eslint-plugin-react: 7.37.5(eslint@9.39.2(jiti@2.6.1))
eslint-plugin-react-hooks: 7.0.1(eslint@9.39.2(jiti@2.6.1))
@ -18938,6 +18951,21 @@ snapshots:
transitivePeerDependencies:
- supports-color
eslint-import-resolver-typescript@3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1)):
dependencies:
'@nolyfill/is-core-module': 1.0.39
debug: 4.4.3
eslint: 9.39.2(jiti@2.6.1)
get-tsconfig: 4.13.6
is-bun-module: 2.0.0
stable-hash: 0.0.5
tinyglobby: 0.2.15
unrs-resolver: 1.11.1
optionalDependencies:
eslint-plugin-import: 2.32.0(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint-import-resolver-typescript@3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1))
transitivePeerDependencies:
- supports-color
eslint-import-resolver-typescript@3.10.1(eslint-plugin-import@2.32.0)(eslint@9.39.2(jiti@2.6.1)):
dependencies:
'@nolyfill/is-core-module': 1.0.39
@ -18949,7 +18977,7 @@ snapshots:
tinyglobby: 0.2.15
unrs-resolver: 1.11.1
optionalDependencies:
eslint-plugin-import: 2.32.0(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint-import-resolver-typescript@3.10.1)(eslint@9.39.2(jiti@2.6.1))
eslint-plugin-import: 2.32.0(@typescript-eslint/parser@8.55.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint-import-resolver-typescript@3.10.1)(eslint@9.39.2(jiti@2.6.1))
transitivePeerDependencies:
- supports-color
@ -18964,14 +18992,14 @@ snapshots:
transitivePeerDependencies:
- supports-color
eslint-module-utils@2.12.1(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.10.1)(eslint@9.39.2(jiti@2.6.1)):
eslint-module-utils@2.12.1(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1)):
dependencies:
debug: 3.2.7
optionalDependencies:
'@typescript-eslint/parser': 8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3)
eslint: 9.39.2(jiti@2.6.1)
eslint-import-resolver-node: 0.3.9
eslint-import-resolver-typescript: 3.10.1(eslint-plugin-import@2.32.0)(eslint@9.39.2(jiti@2.6.1))
eslint-import-resolver-typescript: 3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1))
transitivePeerDependencies:
- supports-color
@ -19004,7 +19032,7 @@ snapshots:
- eslint-import-resolver-webpack
- supports-color
eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint-import-resolver-typescript@3.10.1)(eslint@9.39.2(jiti@2.6.1)):
eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint-import-resolver-typescript@3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1)):
dependencies:
'@rtsao/scc': 1.1.0
array-includes: 3.1.9
@ -19015,7 +19043,7 @@ snapshots:
doctrine: 2.1.0
eslint: 9.39.2(jiti@2.6.1)
eslint-import-resolver-node: 0.3.9
eslint-module-utils: 2.12.1(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.10.1)(eslint@9.39.2(jiti@2.6.1))
eslint-module-utils: 2.12.1(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1))
hasown: 2.0.2
is-core-module: 2.16.1
is-glob: 4.0.3

View File

@ -8,7 +8,7 @@
"dev": "tsx watch src/server.ts",
"build": "tsc",
"start": "node dist/server.js",
"pretest": "corepack pnpm --dir ../.. --filter @bytelyst/auth --filter @bytelyst/blob --filter @bytelyst/storage --filter @bytelyst/config --filter @bytelyst/cosmos --filter @bytelyst/datastore --filter @bytelyst/errors --filter @bytelyst/events --filter @bytelyst/fastify-core build",
"pretest": "corepack pnpm --dir ../.. --filter @bytelyst/auth --filter @bytelyst/blob --filter @bytelyst/storage --filter @bytelyst/config --filter @bytelyst/cosmos --filter @bytelyst/datastore --filter @bytelyst/errors --filter @bytelyst/queue --filter @bytelyst/events --filter @bytelyst/fastify-core build",
"test": "vitest run",
"test:watch": "vitest",
"lint": "eslint src/",
@ -25,6 +25,7 @@
"@bytelyst/errors": "workspace:*",
"@bytelyst/events": "workspace:*",
"@bytelyst/fastify-core": "workspace:*",
"@bytelyst/queue": "workspace:*",
"@fastify/cors": "^10.0.2",
"@fastify/rate-limit": "^10.3.0",
"@fastify/swagger": "^9.4.2",

View File

@ -61,6 +61,10 @@ const envSchema = z.object({
TELEGRAM_DEFAULT_CHAT_ID: z.string().optional(),
SLACK_WEBHOOK_URL: z.string().optional(),
SLACK_DEFAULT_CHANNEL: z.string().optional(),
EVENT_BUS_BACKEND: z.enum(['memory', 'file']).default('file'),
EVENT_BUS_FILE: z.string().default('.data/platform-events.json'),
EVENT_BUS_POLL_MS: z.coerce.number().default(100),
EVENT_BUS_LEASE_MS: z.coerce.number().default(30_000),
});
export const config = envSchema.parse(process.env);

View File

@ -0,0 +1,47 @@
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { join } from 'node:path';
import { tmpdir } from 'node:os';
const ORIGINAL_ENV = { ...process.env };
describe('event bus backend wiring', () => {
beforeEach(() => {
vi.resetModules();
process.env = {
...ORIGINAL_ENV,
COSMOS_ENDPOINT: 'http://localhost:8081',
COSMOS_KEY: 'test-key',
JWT_SECRET: 'test-jwt-secret',
};
});
afterEach(async () => {
vi.resetModules();
process.env = { ...ORIGINAL_ENV };
});
it('uses the durable event bus when configured for file backend', async () => {
process.env.EVENT_BUS_BACKEND = 'file';
process.env.EVENT_BUS_FILE = join(tmpdir(), `platform-events-${Date.now()}.json`);
const [{ DurableEventBus }, eventBusModule] = await Promise.all([
import('@bytelyst/events'),
import('./event-bus.js'),
]);
expect(eventBusModule.bus).toBeInstanceOf(DurableEventBus);
eventBusModule.startEventBus();
await eventBusModule.stopEventBus();
});
it('uses the in-memory event bus when configured for memory backend', async () => {
process.env.EVENT_BUS_BACKEND = 'memory';
const [{ EventBus }, eventBusModule] = await Promise.all([
import('@bytelyst/events'),
import('./event-bus.js'),
]);
expect(eventBusModule.bus).toBeInstanceOf(EventBus);
});
});

View File

@ -1,7 +1,30 @@
import { EventBus } from '@bytelyst/events';
import { DurableEventBus, EventBus } from '@bytelyst/events';
import { FileQueueStore } from '@bytelyst/queue';
import { config } from './config.js';
// ── Singleton Event Bus ──────────────────────────────────────
// Single instance shared across all modules in platform-service.
// Import this wherever you need to emit or subscribe to events.
export const bus = new EventBus();
export const bus =
config.EVENT_BUS_BACKEND === 'file'
? new DurableEventBus({
store: new FileQueueStore({ filePath: config.EVENT_BUS_FILE }),
queueName: 'platform-events',
pollIntervalMs: config.EVENT_BUS_POLL_MS,
leaseMs: config.EVENT_BUS_LEASE_MS,
autoStart: false,
})
: new EventBus();
export function startEventBus(): void {
if (bus instanceof DurableEventBus) {
bus.start();
}
}
export async function stopEventBus(): Promise<void> {
if (bus instanceof DurableEventBus) {
await bus.stop();
}
}

View File

@ -89,6 +89,7 @@ import { registerDiagnosticsSubscribers } from './modules/diagnostics/subscriber
import { registerDeliverySubscribers } from './modules/delivery/subscribers.js';
import { verifyToken } from './modules/auth/jwt.js';
import { registerOptionalApiKeyContext } from './lib/api-key-auth.js';
import { startEventBus, stopEventBus } from './lib/event-bus.js';
await initCosmosIfNeeded();
await loadProductCache();
@ -210,6 +211,10 @@ await app.register(surveyRoutes, { prefix: '/api' });
// Register event bus subscribers
registerDiagnosticsSubscribers(app.log);
registerDeliverySubscribers(app.log);
startEventBus();
app.addHook('onClose', async () => {
await stopEventBus();
});
// Start diagnostic trigger evaluation job (Phase 4)
startTriggerEvaluationJob(app.log);