diff --git a/.env.example b/.env.example index 60364310..1aca5cba 100644 --- a/.env.example +++ b/.env.example @@ -42,6 +42,10 @@ TELEGRAM_BOT_TOKEN= TELEGRAM_DEFAULT_CHAT_ID= SLACK_WEBHOOK_URL= SLACK_DEFAULT_CHANNEL= +EVENT_BUS_BACKEND=file +EVENT_BUS_FILE=.data/platform-events.json +EVENT_BUS_POLL_MS=100 +EVENT_BUS_LEASE_MS=30000 # ── Extraction Service (port 4005 + Python sidecar 4006) ───── PYTHON_SIDECAR_URL=http://localhost:4006 diff --git a/packages/events/package.json b/packages/events/package.json index b29e6a50..d055de5a 100644 --- a/packages/events/package.json +++ b/packages/events/package.json @@ -17,6 +17,13 @@ "build": "tsc", "test": "vitest run" }, + "dependencies": { + "@bytelyst/queue": "workspace:*" + }, + "devDependencies": { + "@types/node": "^22.12.0", + "vitest": "^3.0.5" + }, "peerDependencies": { "zod": "^3.0.0" } diff --git a/packages/events/src/durable.test.ts b/packages/events/src/durable.test.ts new file mode 100644 index 00000000..b2733a7d --- /dev/null +++ b/packages/events/src/durable.test.ts @@ -0,0 +1,91 @@ +import { mkdtemp, rm } from 'node:fs/promises'; +import { join } from 'node:path'; +import { tmpdir } from 'node:os'; +import { describe, it, expect, vi } from 'vitest'; +import { FileQueueStore } from '@bytelyst/queue'; +import { MemoryQueueStore } from '@bytelyst/queue'; +import { DurableEventBus } from './durable.js'; + +describe('DurableEventBus', () => { + it('delivers queued events through the worker', async () => { + const store = new MemoryQueueStore(); + const bus = new DurableEventBus({ + store, + autoStart: false, + pollIntervalMs: 10, + }); + + const handler = vi.fn(); + bus.on('user.created', handler); + bus.start(); + + const result = await bus.emit('user.created', { + userId: 'u1', + email: 'test@example.com', + plan: 'free', + productId: 'lysnrai', + }); + + expect(result.handlerCount).toBe(1); + + await waitFor(() => { + expect(handler).toHaveBeenCalledOnce(); + }); + + await bus.stop(); + }); + + it('persists emitted events across bus instances before the worker starts', async () => { + const dir = await mkdtemp(join(tmpdir(), 'events-store-')); + const filePath = join(dir, 'events.json'); + + try { + const first = new DurableEventBus({ + store: new FileQueueStore({ filePath }), + autoStart: false, + pollIntervalMs: 10, + }); + + await first.emit('payment.failed', { + invoiceId: 'inv_1', + userId: 'u1', + amount: 499, + retryCount: 1, + productId: 'lysnrai', + }); + await first.stop(); + + const second = new DurableEventBus({ + store: new FileQueueStore({ filePath }), + autoStart: false, + pollIntervalMs: 10, + }); + + const handler = vi.fn(); + second.on('payment.failed', handler); + second.start(); + + await waitFor(() => { + expect(handler).toHaveBeenCalledOnce(); + }); + + await second.stop(); + } finally { + await rm(dir, { recursive: true, force: true }); + } + }); +}); + +async function waitFor(assertion: () => void, timeoutMs = 1_000): Promise { + const startedAt = Date.now(); + while (Date.now() - startedAt < timeoutMs) { + try { + assertion(); + return; + } catch { + await new Promise(resolve => setTimeout(resolve, 20)); + } + } + + assertion(); +} diff --git a/packages/events/src/durable.ts b/packages/events/src/durable.ts new file mode 100644 index 00000000..abb57e92 --- /dev/null +++ b/packages/events/src/durable.ts @@ -0,0 +1,152 @@ +import { QueueWorker, type QueueJob, type QueueStore } from '@bytelyst/queue'; +import type { EmitResult } from './memory.js'; +import type { + EventHandler, + EventSubscription, + PlatformEvent, + PlatformEventName, + PlatformEventPayload, +} from './types.js'; + +interface EventEnvelope { + event: PlatformEvent; +} + +export interface DurableEventBusOptions { + store: QueueStore; + queueName?: string; + workerId?: string; + pollIntervalMs?: number; + leaseMs?: number; + backoffMs?: number; + autoStart?: boolean; +} + +export class DurableEventBus { + private readonly handlers = new Map< + string, + Set<{ id: string; fn: EventHandler }> + >(); + private subscriptionCounter = 0; + private readonly queueName: string; + private readonly store: QueueStore; + private readonly worker: QueueWorker; + private running = false; + + constructor(options: DurableEventBusOptions) { + this.queueName = options.queueName ?? 'platform-events'; + this.store = options.store; + this.worker = new QueueWorker({ + queueName: this.queueName, + store: this.store, + workerId: options.workerId, + pollIntervalMs: options.pollIntervalMs, + leaseMs: options.leaseMs, + backoffMs: options.backoffMs, + handler: async job => { + await this.dispatch(job); + }, + }); + + if (options.autoStart !== false) { + this.start(); + } + } + + on(eventType: T, handler: EventHandler): EventSubscription { + const id = `sub_${++this.subscriptionCounter}`; + + if (!this.handlers.has(eventType)) { + this.handlers.set(eventType, new Set()); + } + + const entry = { id, fn: handler as EventHandler }; + this.handlers.get(eventType)!.add(entry); + + return { + id, + eventType, + unsubscribe: () => { + this.handlers.get(eventType)?.delete(entry); + }, + }; + } + + async emit( + eventType: T, + payload: PlatformEventPayload, + options?: { source?: string } + ): Promise { + const event: PlatformEvent = { + id: crypto.randomUUID(), + type: eventType, + payload, + timestamp: new Date().toISOString(), + source: options?.source, + }; + + await this.store.enqueue(this.queueName, { + idempotencyKey: event.id, + type: eventType, + payload: { event }, + productId: extractProductId(payload), + metadata: { + source: options?.source, + }, + }); + + return { + eventId: event.id, + handlerCount: this.listenerCount(eventType), + errors: [], + }; + } + + start(): void { + if (this.running) return; + this.running = true; + this.worker.start(); + } + + async stop(): Promise { + if (!this.running) return; + this.running = false; + await this.worker.stop(); + } + + clear(eventType?: PlatformEventName): void { + if (eventType) { + this.handlers.delete(eventType); + } else { + this.handlers.clear(); + } + } + + listenerCount(eventType: PlatformEventName): number { + return this.handlers.get(eventType)?.size ?? 0; + } + + eventTypes(): PlatformEventName[] { + return Array.from(this.handlers.entries()) + .filter(([, set]) => set.size > 0) + .map(([type]) => type as PlatformEventName); + } + + private async dispatch(job: QueueJob): Promise { + const event = job.payload.event; + const handlers = this.handlers.get(event.type); + if (!handlers || handlers.size === 0) { + return; + } + + await Promise.allSettled( + Array.from(handlers).map(async ({ fn }) => fn(event as PlatformEvent)) + ); + } +} + +function extractProductId(payload: unknown): string | undefined { + if (!payload || typeof payload !== 'object') return undefined; + const productId = (payload as { productId?: unknown }).productId; + return typeof productId === 'string' ? productId : undefined; +} diff --git a/packages/events/src/index.ts b/packages/events/src/index.ts index eb48fa50..2e0d02a9 100644 --- a/packages/events/src/index.ts +++ b/packages/events/src/index.ts @@ -1,5 +1,7 @@ export { EventBus } from './memory.js'; export type { EmitResult, EmitError } from './memory.js'; +export { DurableEventBus } from './durable.js'; +export type { DurableEventBusOptions } from './durable.js'; export { PlatformEventSchemas } from './types.js'; export type { PlatformEventName, diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 13a31c2c..adad5838 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -430,9 +430,19 @@ importers: packages/events: dependencies: + '@bytelyst/queue': + specifier: workspace:* + version: link:../queue zod: specifier: ^3.0.0 version: 3.25.76 + devDependencies: + '@types/node': + specifier: ^22.12.0 + version: 22.19.11 + vitest: + specifier: ^3.0.5 + version: 3.2.4(@types/debug@4.1.12)(@types/node@22.19.11)(happy-dom@18.0.1)(jiti@2.6.1)(jsdom@28.0.0(@noble/hashes@1.8.0))(lightningcss@1.31.1)(msw@2.12.10(@types/node@22.19.11)(typescript@5.9.3))(terser@5.46.0)(tsx@4.21.0)(yaml@2.8.2) packages/extraction: dependencies: @@ -772,6 +782,9 @@ importers: '@bytelyst/fastify-core': specifier: workspace:* version: link:../../packages/fastify-core + '@bytelyst/queue': + specifier: workspace:* + version: link:../../packages/queue '@bytelyst/storage': specifier: workspace:* version: link:../../packages/storage @@ -18915,8 +18928,8 @@ snapshots: '@next/eslint-plugin-next': 16.1.6 eslint: 9.39.2(jiti@2.6.1) eslint-import-resolver-node: 0.3.9 - eslint-import-resolver-typescript: 3.10.1(eslint-plugin-import@2.32.0)(eslint@9.39.2(jiti@2.6.1)) - eslint-plugin-import: 2.32.0(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint-import-resolver-typescript@3.10.1)(eslint@9.39.2(jiti@2.6.1)) + eslint-import-resolver-typescript: 3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1)) + eslint-plugin-import: 2.32.0(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint-import-resolver-typescript@3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1)) eslint-plugin-jsx-a11y: 6.10.2(eslint@9.39.2(jiti@2.6.1)) eslint-plugin-react: 7.37.5(eslint@9.39.2(jiti@2.6.1)) eslint-plugin-react-hooks: 7.0.1(eslint@9.39.2(jiti@2.6.1)) @@ -18938,6 +18951,21 @@ snapshots: transitivePeerDependencies: - supports-color + eslint-import-resolver-typescript@3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1)): + dependencies: + '@nolyfill/is-core-module': 1.0.39 + debug: 4.4.3 + eslint: 9.39.2(jiti@2.6.1) + get-tsconfig: 4.13.6 + is-bun-module: 2.0.0 + stable-hash: 0.0.5 + tinyglobby: 0.2.15 + unrs-resolver: 1.11.1 + optionalDependencies: + eslint-plugin-import: 2.32.0(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint-import-resolver-typescript@3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1)) + transitivePeerDependencies: + - supports-color + eslint-import-resolver-typescript@3.10.1(eslint-plugin-import@2.32.0)(eslint@9.39.2(jiti@2.6.1)): dependencies: '@nolyfill/is-core-module': 1.0.39 @@ -18949,7 +18977,7 @@ snapshots: tinyglobby: 0.2.15 unrs-resolver: 1.11.1 optionalDependencies: - eslint-plugin-import: 2.32.0(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint-import-resolver-typescript@3.10.1)(eslint@9.39.2(jiti@2.6.1)) + eslint-plugin-import: 2.32.0(@typescript-eslint/parser@8.55.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint-import-resolver-typescript@3.10.1)(eslint@9.39.2(jiti@2.6.1)) transitivePeerDependencies: - supports-color @@ -18964,14 +18992,14 @@ snapshots: transitivePeerDependencies: - supports-color - eslint-module-utils@2.12.1(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.10.1)(eslint@9.39.2(jiti@2.6.1)): + eslint-module-utils@2.12.1(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1)): dependencies: debug: 3.2.7 optionalDependencies: '@typescript-eslint/parser': 8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3) eslint: 9.39.2(jiti@2.6.1) eslint-import-resolver-node: 0.3.9 - eslint-import-resolver-typescript: 3.10.1(eslint-plugin-import@2.32.0)(eslint@9.39.2(jiti@2.6.1)) + eslint-import-resolver-typescript: 3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1)) transitivePeerDependencies: - supports-color @@ -19004,7 +19032,7 @@ snapshots: - eslint-import-resolver-webpack - supports-color - eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint-import-resolver-typescript@3.10.1)(eslint@9.39.2(jiti@2.6.1)): + eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint-import-resolver-typescript@3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1)): dependencies: '@rtsao/scc': 1.1.0 array-includes: 3.1.9 @@ -19015,7 +19043,7 @@ snapshots: doctrine: 2.1.0 eslint: 9.39.2(jiti@2.6.1) eslint-import-resolver-node: 0.3.9 - eslint-module-utils: 2.12.1(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.10.1)(eslint@9.39.2(jiti@2.6.1)) + eslint-module-utils: 2.12.1(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1)) hasown: 2.0.2 is-core-module: 2.16.1 is-glob: 4.0.3 diff --git a/services/platform-service/package.json b/services/platform-service/package.json index a7307c01..7603cbfd 100644 --- a/services/platform-service/package.json +++ b/services/platform-service/package.json @@ -8,7 +8,7 @@ "dev": "tsx watch src/server.ts", "build": "tsc", "start": "node dist/server.js", - "pretest": "corepack pnpm --dir ../.. --filter @bytelyst/auth --filter @bytelyst/blob --filter @bytelyst/storage --filter @bytelyst/config --filter @bytelyst/cosmos --filter @bytelyst/datastore --filter @bytelyst/errors --filter @bytelyst/events --filter @bytelyst/fastify-core build", + "pretest": "corepack pnpm --dir ../.. --filter @bytelyst/auth --filter @bytelyst/blob --filter @bytelyst/storage --filter @bytelyst/config --filter @bytelyst/cosmos --filter @bytelyst/datastore --filter @bytelyst/errors --filter @bytelyst/queue --filter @bytelyst/events --filter @bytelyst/fastify-core build", "test": "vitest run", "test:watch": "vitest", "lint": "eslint src/", @@ -25,6 +25,7 @@ "@bytelyst/errors": "workspace:*", "@bytelyst/events": "workspace:*", "@bytelyst/fastify-core": "workspace:*", + "@bytelyst/queue": "workspace:*", "@fastify/cors": "^10.0.2", "@fastify/rate-limit": "^10.3.0", "@fastify/swagger": "^9.4.2", diff --git a/services/platform-service/src/lib/config.ts b/services/platform-service/src/lib/config.ts index 7043a410..fa07fbeb 100644 --- a/services/platform-service/src/lib/config.ts +++ b/services/platform-service/src/lib/config.ts @@ -61,6 +61,10 @@ const envSchema = z.object({ TELEGRAM_DEFAULT_CHAT_ID: z.string().optional(), SLACK_WEBHOOK_URL: z.string().optional(), SLACK_DEFAULT_CHANNEL: z.string().optional(), + EVENT_BUS_BACKEND: z.enum(['memory', 'file']).default('file'), + EVENT_BUS_FILE: z.string().default('.data/platform-events.json'), + EVENT_BUS_POLL_MS: z.coerce.number().default(100), + EVENT_BUS_LEASE_MS: z.coerce.number().default(30_000), }); export const config = envSchema.parse(process.env); diff --git a/services/platform-service/src/lib/event-bus.test.ts b/services/platform-service/src/lib/event-bus.test.ts new file mode 100644 index 00000000..3be76808 --- /dev/null +++ b/services/platform-service/src/lib/event-bus.test.ts @@ -0,0 +1,47 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { join } from 'node:path'; +import { tmpdir } from 'node:os'; + +const ORIGINAL_ENV = { ...process.env }; + +describe('event bus backend wiring', () => { + beforeEach(() => { + vi.resetModules(); + process.env = { + ...ORIGINAL_ENV, + COSMOS_ENDPOINT: 'http://localhost:8081', + COSMOS_KEY: 'test-key', + JWT_SECRET: 'test-jwt-secret', + }; + }); + + afterEach(async () => { + vi.resetModules(); + process.env = { ...ORIGINAL_ENV }; + }); + + it('uses the durable event bus when configured for file backend', async () => { + process.env.EVENT_BUS_BACKEND = 'file'; + process.env.EVENT_BUS_FILE = join(tmpdir(), `platform-events-${Date.now()}.json`); + + const [{ DurableEventBus }, eventBusModule] = await Promise.all([ + import('@bytelyst/events'), + import('./event-bus.js'), + ]); + + expect(eventBusModule.bus).toBeInstanceOf(DurableEventBus); + eventBusModule.startEventBus(); + await eventBusModule.stopEventBus(); + }); + + it('uses the in-memory event bus when configured for memory backend', async () => { + process.env.EVENT_BUS_BACKEND = 'memory'; + + const [{ EventBus }, eventBusModule] = await Promise.all([ + import('@bytelyst/events'), + import('./event-bus.js'), + ]); + + expect(eventBusModule.bus).toBeInstanceOf(EventBus); + }); +}); diff --git a/services/platform-service/src/lib/event-bus.ts b/services/platform-service/src/lib/event-bus.ts index 66b1c1bd..e5aeaa55 100644 --- a/services/platform-service/src/lib/event-bus.ts +++ b/services/platform-service/src/lib/event-bus.ts @@ -1,7 +1,30 @@ -import { EventBus } from '@bytelyst/events'; +import { DurableEventBus, EventBus } from '@bytelyst/events'; +import { FileQueueStore } from '@bytelyst/queue'; +import { config } from './config.js'; // ── Singleton Event Bus ────────────────────────────────────── // Single instance shared across all modules in platform-service. // Import this wherever you need to emit or subscribe to events. -export const bus = new EventBus(); +export const bus = + config.EVENT_BUS_BACKEND === 'file' + ? new DurableEventBus({ + store: new FileQueueStore({ filePath: config.EVENT_BUS_FILE }), + queueName: 'platform-events', + pollIntervalMs: config.EVENT_BUS_POLL_MS, + leaseMs: config.EVENT_BUS_LEASE_MS, + autoStart: false, + }) + : new EventBus(); + +export function startEventBus(): void { + if (bus instanceof DurableEventBus) { + bus.start(); + } +} + +export async function stopEventBus(): Promise { + if (bus instanceof DurableEventBus) { + await bus.stop(); + } +} diff --git a/services/platform-service/src/server.ts b/services/platform-service/src/server.ts index 7bb36aec..9f519286 100644 --- a/services/platform-service/src/server.ts +++ b/services/platform-service/src/server.ts @@ -89,6 +89,7 @@ import { registerDiagnosticsSubscribers } from './modules/diagnostics/subscriber import { registerDeliverySubscribers } from './modules/delivery/subscribers.js'; import { verifyToken } from './modules/auth/jwt.js'; import { registerOptionalApiKeyContext } from './lib/api-key-auth.js'; +import { startEventBus, stopEventBus } from './lib/event-bus.js'; await initCosmosIfNeeded(); await loadProductCache(); @@ -210,6 +211,10 @@ await app.register(surveyRoutes, { prefix: '/api' }); // Register event bus subscribers registerDiagnosticsSubscribers(app.log); registerDeliverySubscribers(app.log); +startEventBus(); +app.addHook('onClose', async () => { + await stopEventBus(); +}); // Start diagnostic trigger evaluation job (Phase 4) startTriggerEvaluationJob(app.log);