import { mkdtemp, rm } from 'node:fs/promises'; import { join } from 'node:path'; import { tmpdir } from 'node:os'; import { describe, it, expect, vi } from 'vitest'; import { FileQueueStore } from '@bytelyst/queue'; import { MemoryQueueStore } from '@bytelyst/queue'; import { DurableEventBus } from './durable.js'; describe('DurableEventBus', () => { it('delivers queued events through the worker', async () => { const store = new MemoryQueueStore(); const bus = new DurableEventBus({ store, autoStart: false, pollIntervalMs: 10, }); const handler = vi.fn(); bus.on('user.created', handler); bus.start(); const result = await bus.emit('user.created', { userId: 'u1', email: 'test@example.com', plan: 'free', productId: 'lysnrai', }); expect(result.handlerCount).toBe(1); await waitFor(() => { expect(handler).toHaveBeenCalledOnce(); }); await bus.stop(); }); it('persists emitted events across bus instances before the worker starts', async () => { const dir = await mkdtemp(join(tmpdir(), 'events-store-')); const filePath = join(dir, 'events.json'); try { const first = new DurableEventBus({ store: new FileQueueStore({ filePath }), autoStart: false, pollIntervalMs: 10, }); await first.emit('payment.failed', { invoiceId: 'inv_1', userId: 'u1', amount: 499, retryCount: 1, productId: 'lysnrai', }); await first.stop(); const second = new DurableEventBus({ store: new FileQueueStore({ filePath }), autoStart: false, pollIntervalMs: 10, }); const handler = vi.fn(); second.on('payment.failed', handler); second.start(); await waitFor(() => { expect(handler).toHaveBeenCalledOnce(); }); await second.stop(); } finally { await rm(dir, { recursive: true, force: true }); } }); }); async function waitFor(assertion: () => void, timeoutMs = 1_000): Promise { const startedAt = Date.now(); while (Date.now() - startedAt < timeoutMs) { try { assertion(); return; } catch { await new Promise(resolve => setTimeout(resolve, 20)); } } assertion(); }