diff --git a/.windsurf/workflows/repos.txt b/.windsurf/workflows/repos.txt index 828a9bca..29293819 100644 --- a/.windsurf/workflows/repos.txt +++ b/.windsurf/workflows/repos.txt @@ -10,3 +10,4 @@ learning_ai_fastgap learning_ai_jarvis_jr learning_ai_peakpulse learning_ai_notes +learning_ai_flowmonk diff --git a/packages/event-store/package.json b/packages/event-store/package.json new file mode 100644 index 00000000..77f5b8e3 --- /dev/null +++ b/packages/event-store/package.json @@ -0,0 +1,21 @@ +{ + "name": "@bytelyst/event-store", + "version": "0.1.0", + "description": "Persistent event store with pluggable backends (in-memory, file, Cosmos) for ByteLyst product backends", + "type": "module", + "exports": { + ".": { + "import": "./dist/index.js", + "types": "./dist/index.d.ts" + } + }, + "main": "./dist/index.js", + "types": "./dist/index.d.ts", + "files": [ + "dist" + ], + "scripts": { + "build": "tsc", + "test": "vitest run" + } +} diff --git a/packages/event-store/src/file-store.ts b/packages/event-store/src/file-store.ts new file mode 100644 index 00000000..88b8df1d --- /dev/null +++ b/packages/event-store/src/file-store.ts @@ -0,0 +1,68 @@ +/** + * File-based event store implementation. + * Appends events as JSON lines to a file on disk. + * Suitable for single-instance dev/staging deployments. + */ + +import { readFile, appendFile, writeFile, mkdir } from 'node:fs/promises'; +import { dirname } from 'node:path'; +import type { EventStore, StoredEvent, EventStoreQuery } from './types.js'; + +export interface FileStoreOptions { + filePath: string; +} + +export class FileEventStore implements EventStore { + private readonly filePath: string; + + constructor(options: FileStoreOptions) { + this.filePath = options.filePath; + } + + async append(event: StoredEvent): Promise { + await mkdir(dirname(this.filePath), { recursive: true }); + await appendFile(this.filePath, JSON.stringify(event) + '\n', 'utf-8'); + } + + async query(q: EventStoreQuery): Promise { + const all = await this.readAll(); + let results = all; + + if (q.userId) results = results.filter(e => e.userId === q.userId); + if (q.type) results = results.filter(e => e.type === q.type); + if (q.after) results = results.filter(e => e.timestamp > q.after!); + if (q.before) results = results.filter(e => e.timestamp < q.before!); + + if (q.limit && q.limit > 0) { + results = results.slice(-q.limit); + } + + return results; + } + + async recent(limit = 50): Promise { + const all = await this.readAll(); + return all.slice(-limit); + } + + async count(): Promise { + const all = await this.readAll(); + return all.length; + } + + async clear(): Promise { + await writeFile(this.filePath, '', 'utf-8'); + } + + private async readAll(): Promise { + try { + const content = await readFile(this.filePath, 'utf-8'); + return content + .split('\n') + .filter(line => line.trim()) + .map(line => JSON.parse(line) as StoredEvent); + } catch { + return []; + } + } +} diff --git a/packages/event-store/src/index.ts b/packages/event-store/src/index.ts new file mode 100644 index 00000000..594f451c --- /dev/null +++ b/packages/event-store/src/index.ts @@ -0,0 +1,5 @@ +export type { EventStore, StoredEvent, EventStoreQuery } from './types.js'; +export { MemoryEventStore } from './memory-store.js'; +export type { MemoryStoreOptions } from './memory-store.js'; +export { FileEventStore } from './file-store.js'; +export type { FileStoreOptions } from './file-store.js'; diff --git a/packages/event-store/src/memory-store.test.ts b/packages/event-store/src/memory-store.test.ts new file mode 100644 index 00000000..953abfc3 --- /dev/null +++ b/packages/event-store/src/memory-store.test.ts @@ -0,0 +1,88 @@ +import { describe, it, expect, beforeEach } from 'vitest'; +import { MemoryEventStore } from './memory-store.js'; +import type { StoredEvent } from './types.js'; + +function makeEvent(overrides?: Partial): StoredEvent { + return { + id: crypto.randomUUID(), + type: 'test.event', + userId: 'u1', + productId: 'testprod', + timestamp: new Date().toISOString(), + payload: {}, + ...overrides, + }; +} + +describe('MemoryEventStore', () => { + let store: MemoryEventStore; + + beforeEach(() => { + store = new MemoryEventStore({ maxEvents: 100 }); + }); + + it('appends and retrieves events', async () => { + await store.append(makeEvent()); + expect(await store.count()).toBe(1); + const recent = await store.recent(); + expect(recent).toHaveLength(1); + }); + + it('caps at maxEvents', async () => { + for (let i = 0; i < 120; i++) { + await store.append(makeEvent({ id: `e${i}` })); + } + expect(await store.count()).toBe(100); + }); + + it('queries by userId', async () => { + await store.append(makeEvent({ userId: 'u1' })); + await store.append(makeEvent({ userId: 'u2' })); + await store.append(makeEvent({ userId: 'u1' })); + const results = await store.query({ userId: 'u1' }); + expect(results).toHaveLength(2); + }); + + it('queries by type', async () => { + await store.append(makeEvent({ type: 'task.created' })); + await store.append(makeEvent({ type: 'schedule.generated' })); + await store.append(makeEvent({ type: 'task.created' })); + const results = await store.query({ type: 'task.created' }); + expect(results).toHaveLength(2); + }); + + it('queries with time range', async () => { + await store.append(makeEvent({ timestamp: '2026-01-01T00:00:00Z' })); + await store.append(makeEvent({ timestamp: '2026-03-01T00:00:00Z' })); + await store.append(makeEvent({ timestamp: '2026-06-01T00:00:00Z' })); + const results = await store.query({ + after: '2026-02-01T00:00:00Z', + before: '2026-04-01T00:00:00Z', + }); + expect(results).toHaveLength(1); + }); + + it('queries with limit', async () => { + for (let i = 0; i < 10; i++) { + await store.append(makeEvent()); + } + const results = await store.query({ limit: 3 }); + expect(results).toHaveLength(3); + }); + + it('clears all events', async () => { + await store.append(makeEvent()); + await store.append(makeEvent()); + await store.clear(); + expect(await store.count()).toBe(0); + }); + + it('recent returns last N events', async () => { + for (let i = 0; i < 10; i++) { + await store.append(makeEvent({ id: `e${i}` })); + } + const recent = await store.recent(3); + expect(recent).toHaveLength(3); + expect(recent[0].id).toBe('e7'); + }); +}); diff --git a/packages/event-store/src/memory-store.ts b/packages/event-store/src/memory-store.ts new file mode 100644 index 00000000..69ab390e --- /dev/null +++ b/packages/event-store/src/memory-store.ts @@ -0,0 +1,54 @@ +/** + * In-memory event store implementation. + * Useful for development, testing, and as a fallback when no persistent backend is configured. + * Caps at maxEvents to prevent unbounded memory growth. + */ + +import type { EventStore, StoredEvent, EventStoreQuery } from './types.js'; + +export interface MemoryStoreOptions { + maxEvents?: number; +} + +export class MemoryEventStore implements EventStore { + private events: StoredEvent[] = []; + private readonly maxEvents: number; + + constructor(options?: MemoryStoreOptions) { + this.maxEvents = options?.maxEvents ?? 10_000; + } + + async append(event: StoredEvent): Promise { + this.events.push(event); + if (this.events.length > this.maxEvents) { + this.events = this.events.slice(-this.maxEvents); + } + } + + async query(q: EventStoreQuery): Promise { + let results = this.events; + + if (q.userId) results = results.filter(e => e.userId === q.userId); + if (q.type) results = results.filter(e => e.type === q.type); + if (q.after) results = results.filter(e => e.timestamp > q.after!); + if (q.before) results = results.filter(e => e.timestamp < q.before!); + + if (q.limit && q.limit > 0) { + results = results.slice(-q.limit); + } + + return results; + } + + async recent(limit = 50): Promise { + return this.events.slice(-limit); + } + + async count(): Promise { + return this.events.length; + } + + async clear(): Promise { + this.events = []; + } +} diff --git a/packages/event-store/src/types.ts b/packages/event-store/src/types.ts new file mode 100644 index 00000000..5b114188 --- /dev/null +++ b/packages/event-store/src/types.ts @@ -0,0 +1,29 @@ +/** + * Pluggable event store interface for ByteLyst product backends. + * Products define their own event shapes; the store handles persistence. + */ + +export interface StoredEvent { + id: string; + type: string; + userId: string; + productId: string; + timestamp: string; + payload: Record; +} + +export interface EventStoreQuery { + userId?: string; + type?: string; + after?: string; + before?: string; + limit?: number; +} + +export interface EventStore { + append(event: StoredEvent): Promise; + query(q: EventStoreQuery): Promise; + recent(limit?: number): Promise; + count(): Promise; + clear(): Promise; +} diff --git a/packages/event-store/tsconfig.json b/packages/event-store/tsconfig.json new file mode 100644 index 00000000..5edad813 --- /dev/null +++ b/packages/event-store/tsconfig.json @@ -0,0 +1,9 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "outDir": "dist", + "rootDir": "src" + }, + "include": ["src"], + "exclude": ["src/**/*.test.ts"] +} diff --git a/packages/fastify-sse/package.json b/packages/fastify-sse/package.json new file mode 100644 index 00000000..fee48d16 --- /dev/null +++ b/packages/fastify-sse/package.json @@ -0,0 +1,24 @@ +{ + "name": "@bytelyst/fastify-sse", + "version": "0.1.0", + "description": "Fastify plugin for Server-Sent Events (SSE) — real-time push for ByteLyst product backends", + "type": "module", + "exports": { + ".": { + "import": "./dist/index.js", + "types": "./dist/index.d.ts" + } + }, + "main": "./dist/index.js", + "types": "./dist/index.d.ts", + "files": [ + "dist" + ], + "scripts": { + "build": "tsc", + "test": "vitest run" + }, + "peerDependencies": { + "fastify": "^5.0.0" + } +} diff --git a/packages/fastify-sse/src/hub.ts b/packages/fastify-sse/src/hub.ts new file mode 100644 index 00000000..74369462 --- /dev/null +++ b/packages/fastify-sse/src/hub.ts @@ -0,0 +1,143 @@ +/** + * SSE Hub — manages connected clients and broadcasts events. + * Product backends create an SSEHub instance and push events to it; + * the hub fans out to all connected SSE clients. + */ + +import type { ServerResponse } from 'node:http'; + +export interface SSEMessage { + event?: string; + data: string; + id?: string; + retry?: number; +} + +interface ConnectedClient { + id: string; + userId?: string; + res: ServerResponse; + connectedAt: string; +} + +export class SSEHub { + private clients = new Map(); + private clientCounter = 0; + + /** + * Add a new SSE client connection. + * Sets up the SSE headers and returns a client ID. + */ + addClient(res: ServerResponse, userId?: string): string { + const id = `sse_${++this.clientCounter}_${Date.now()}`; + + res.writeHead(200, { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + 'X-Accel-Buffering': 'no', + }); + + // Send initial connection event + res.write(`event: connected\ndata: ${JSON.stringify({ clientId: id })}\n\n`); + + const client: ConnectedClient = { + id, + userId, + res, + connectedAt: new Date().toISOString(), + }; + + this.clients.set(id, client); + + // Clean up on close + res.on('close', () => { + this.clients.delete(id); + }); + + return id; + } + + /** + * Broadcast an SSE message to all connected clients. + */ + broadcast(message: SSEMessage): number { + let sent = 0; + const formatted = formatSSE(message); + + for (const [id, client] of this.clients) { + try { + client.res.write(formatted); + sent++; + } catch { + this.clients.delete(id); + } + } + + return sent; + } + + /** + * Send an SSE message to a specific user's connections. + */ + sendToUser(userId: string, message: SSEMessage): number { + let sent = 0; + const formatted = formatSSE(message); + + for (const [id, client] of this.clients) { + if (client.userId === userId) { + try { + client.res.write(formatted); + sent++; + } catch { + this.clients.delete(id); + } + } + } + + return sent; + } + + /** + * Send a heartbeat (comment) to all clients to keep connections alive. + */ + heartbeat(): void { + for (const [id, client] of this.clients) { + try { + client.res.write(': heartbeat\n\n'); + } catch { + this.clients.delete(id); + } + } + } + + /** + * Get count of connected clients. + */ + get clientCount(): number { + return this.clients.size; + } + + /** + * Disconnect all clients. + */ + disconnectAll(): void { + for (const [, client] of this.clients) { + try { + client.res.end(); + } catch { + /* already closed */ + } + } + this.clients.clear(); + } +} + +function formatSSE(message: SSEMessage): string { + let output = ''; + if (message.id) output += `id: ${message.id}\n`; + if (message.event) output += `event: ${message.event}\n`; + if (message.retry) output += `retry: ${message.retry}\n`; + output += `data: ${message.data}\n\n`; + return output; +} diff --git a/packages/fastify-sse/src/index.ts b/packages/fastify-sse/src/index.ts new file mode 100644 index 00000000..bb76a9c2 --- /dev/null +++ b/packages/fastify-sse/src/index.ts @@ -0,0 +1,3 @@ +export { SSEHub } from './hub.js'; +export { ssePlugin } from './plugin.js'; +export type { SSEPluginOptions, SSEClient } from './plugin.js'; diff --git a/packages/fastify-sse/src/plugin.ts b/packages/fastify-sse/src/plugin.ts new file mode 100644 index 00000000..1c12d644 --- /dev/null +++ b/packages/fastify-sse/src/plugin.ts @@ -0,0 +1,61 @@ +/** + * Fastify plugin that registers an SSE endpoint. + * Product backends configure the path and optional auth check. + */ + +import type { FastifyInstance, FastifyRequest, FastifyReply } from 'fastify'; +import { SSEHub } from './hub.js'; + +export interface SSEClient { + id: string; + userId?: string; +} + +export interface SSEPluginOptions { + /** Route path for the SSE endpoint (default: /events/stream) */ + path?: string; + /** Extract userId from request (optional, enables per-user targeting) */ + getUserId?: (req: FastifyRequest) => string | undefined; + /** Heartbeat interval in ms (default: 30000, set 0 to disable) */ + heartbeatIntervalMs?: number; + /** The SSE hub instance to use (creates one if not provided) */ + hub?: SSEHub; +} + +export async function ssePlugin( + app: FastifyInstance, + options: SSEPluginOptions = {} +): Promise { + const path = options.path ?? '/events/stream'; + const heartbeatMs = options.heartbeatIntervalMs ?? 30_000; + const hub = options.hub ?? new SSEHub(); + + // Decorate app with the hub so routes can push events + if (!app.hasDecorator('sseHub')) { + app.decorate('sseHub', hub); + } + + // Register SSE endpoint + app.get(path, async (req: FastifyRequest, reply: FastifyReply) => { + const userId = options.getUserId?.(req); + + // Hijack the raw response for SSE streaming + const raw = reply.raw; + hub.addClient(raw, userId); + + // Prevent Fastify from sending its own response + reply.hijack(); + }); + + // Heartbeat timer + let heartbeatTimer: ReturnType | undefined; + if (heartbeatMs > 0) { + heartbeatTimer = setInterval(() => hub.heartbeat(), heartbeatMs); + } + + // Cleanup on close + app.addHook('onClose', async () => { + if (heartbeatTimer) clearInterval(heartbeatTimer); + hub.disconnectAll(); + }); +} diff --git a/packages/fastify-sse/tsconfig.json b/packages/fastify-sse/tsconfig.json new file mode 100644 index 00000000..5edad813 --- /dev/null +++ b/packages/fastify-sse/tsconfig.json @@ -0,0 +1,9 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "outDir": "dist", + "rootDir": "src" + }, + "include": ["src"], + "exclude": ["src/**/*.test.ts"] +} diff --git a/packages/webhook-dispatch/package.json b/packages/webhook-dispatch/package.json new file mode 100644 index 00000000..c9ff5c20 --- /dev/null +++ b/packages/webhook-dispatch/package.json @@ -0,0 +1,21 @@ +{ + "name": "@bytelyst/webhook-dispatch", + "version": "0.1.0", + "description": "Reusable webhook dispatch with HMAC-SHA256 signing, exponential backoff retry, and delivery tracking for ByteLyst product backends", + "type": "module", + "exports": { + ".": { + "import": "./dist/index.js", + "types": "./dist/index.d.ts" + } + }, + "main": "./dist/index.js", + "types": "./dist/index.d.ts", + "files": [ + "dist" + ], + "scripts": { + "build": "tsc", + "test": "vitest run" + } +} diff --git a/packages/webhook-dispatch/src/dispatcher.test.ts b/packages/webhook-dispatch/src/dispatcher.test.ts new file mode 100644 index 00000000..42f2e2c8 --- /dev/null +++ b/packages/webhook-dispatch/src/dispatcher.test.ts @@ -0,0 +1,163 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import { signPayload, deliverToTarget, dispatchToTargets } from './dispatcher.js'; +import type { WebhookTarget } from './types.js'; + +function makeTarget(overrides?: Partial): WebhookTarget { + return { + id: 'wh_1', + url: 'https://example.com/webhook', + secret: 'test-secret-key', + events: [], + enabled: true, + ...overrides, + }; +} + +describe('signPayload', () => { + it('produces consistent HMAC-SHA256 signatures', () => { + const sig1 = signPayload('{"test":true}', 'secret'); + const sig2 = signPayload('{"test":true}', 'secret'); + expect(sig1).toBe(sig2); + expect(sig1).toMatch(/^[a-f0-9]{64}$/); + }); + + it('produces different signatures for different secrets', () => { + const sig1 = signPayload('data', 'secret1'); + const sig2 = signPayload('data', 'secret2'); + expect(sig1).not.toBe(sig2); + }); +}); + +describe('deliverToTarget', () => { + beforeEach(() => { + vi.stubGlobal('fetch', vi.fn()); + }); + afterEach(() => { + vi.restoreAllMocks(); + }); + + it('returns success on 200', async () => { + (fetch as ReturnType).mockResolvedValue({ ok: true, status: 200 }); + + const result = await deliverToTarget( + makeTarget(), + 'task.created', + { taskId: 't1' }, + 'flowmonk', + { maxRetries: 1, backoffIntervals: [0] } + ); + + expect(result.status).toBe('success'); + expect(result.attempts).toHaveLength(1); + expect(result.attempts[0].responseCode).toBe(200); + }); + + it('returns failed after exhausting retries on 500', async () => { + (fetch as ReturnType).mockResolvedValue({ ok: false, status: 500 }); + + const result = await deliverToTarget( + makeTarget(), + 'task.created', + { taskId: 't1' }, + 'flowmonk', + { maxRetries: 2, backoffIntervals: [0] } + ); + + expect(result.status).toBe('failed'); + expect(result.attempts).toHaveLength(2); + }); + + it('returns failed on network error', async () => { + (fetch as ReturnType).mockRejectedValue(new Error('ECONNREFUSED')); + + const result = await deliverToTarget( + makeTarget(), + 'task.created', + { taskId: 't1' }, + 'flowmonk', + { maxRetries: 1, backoffIntervals: [0] } + ); + + expect(result.status).toBe('failed'); + expect(result.attempts[0].error).toBe('ECONNREFUSED'); + }); + + it('calls onDelivery callback', async () => { + (fetch as ReturnType).mockResolvedValue({ ok: true, status: 200 }); + const onDelivery = vi.fn(); + + await deliverToTarget(makeTarget(), 'task.created', { taskId: 't1' }, 'flowmonk', { + maxRetries: 1, + backoffIntervals: [0], + onDelivery, + }); + + expect(onDelivery).toHaveBeenCalledOnce(); + expect(onDelivery.mock.calls[0][0].status).toBe('success'); + }); + + it('sends correct headers', async () => { + (fetch as ReturnType).mockResolvedValue({ ok: true, status: 200 }); + + await deliverToTarget( + makeTarget({ url: 'https://test.com/hook' }), + 'schedule.generated', + {}, + 'flowmonk', + { maxRetries: 1, backoffIntervals: [0] } + ); + + const call = (fetch as ReturnType).mock.calls[0]; + expect(call[0]).toBe('https://test.com/hook'); + expect(call[1].headers['Content-Type']).toBe('application/json'); + expect(call[1].headers['X-Webhook-Event']).toBe('schedule.generated'); + expect(call[1].headers['X-Webhook-Signature']).toMatch(/^sha256=[a-f0-9]{64}$/); + }); +}); + +describe('dispatchToTargets', () => { + beforeEach(() => { + vi.stubGlobal('fetch', vi.fn().mockResolvedValue({ ok: true, status: 200 })); + }); + afterEach(() => { + vi.restoreAllMocks(); + }); + + it('dispatches to all enabled matching targets', async () => { + const targets = [ + makeTarget({ id: 'wh_1', events: ['task.created'] }), + makeTarget({ id: 'wh_2', events: ['task.created', 'schedule.generated'] }), + makeTarget({ id: 'wh_3', events: ['schedule.generated'], enabled: false }), + ]; + + const results = await dispatchToTargets(targets, 'task.created', {}, 'flowmonk', { + maxRetries: 1, + backoffIntervals: [0], + }); + + expect(results).toHaveLength(2); + expect(results.every(r => r.status === 'success')).toBe(true); + }); + + it('dispatches to targets with empty events (wildcard)', async () => { + const targets = [makeTarget({ id: 'wh_1', events: [] })]; + + const results = await dispatchToTargets(targets, 'any.event', {}, 'flowmonk', { + maxRetries: 1, + backoffIntervals: [0], + }); + + expect(results).toHaveLength(1); + }); + + it('returns empty array when no targets match', async () => { + const targets = [makeTarget({ events: ['other.event'] })]; + + const results = await dispatchToTargets(targets, 'task.created', {}, 'flowmonk', { + maxRetries: 1, + backoffIntervals: [0], + }); + + expect(results).toHaveLength(0); + }); +}); diff --git a/packages/webhook-dispatch/src/dispatcher.ts b/packages/webhook-dispatch/src/dispatcher.ts new file mode 100644 index 00000000..11e8be24 --- /dev/null +++ b/packages/webhook-dispatch/src/dispatcher.ts @@ -0,0 +1,157 @@ +/** + * Reusable webhook dispatcher with HMAC-SHA256 signing and exponential backoff retry. + * Extracted from platform-service's dispatcher for cross-product reuse. + */ + +import { randomUUID, createHmac } from 'node:crypto'; +import type { + WebhookTarget, + WebhookPayload, + DeliveryAttempt, + DeliveryResult, + DispatchOptions, +} from './types.js'; + +const DEFAULT_MAX_RETRIES = 3; +const DEFAULT_TIMEOUT_MS = 5_000; +const DEFAULT_BACKOFF = [10_000, 60_000, 300_000]; + +/** + * Sign a webhook payload body with HMAC-SHA256. + */ +export function signPayload(body: string, secret: string): string { + return createHmac('sha256', secret).update(body).digest('hex'); +} + +/** + * Dispatch an event to a single webhook target with retry and signing. + */ +export async function deliverToTarget( + target: WebhookTarget, + event: string, + data: Record, + productId: string, + options?: DispatchOptions +): Promise { + const maxRetries = options?.maxRetries ?? DEFAULT_MAX_RETRIES; + const timeoutMs = options?.timeoutMs ?? DEFAULT_TIMEOUT_MS; + const backoff = options?.backoffIntervals ?? DEFAULT_BACKOFF; + + const deliveryId = randomUUID(); + const timestamp = new Date().toISOString(); + + const payload: WebhookPayload = { + id: deliveryId, + event, + productId, + timestamp, + data, + }; + + const body = JSON.stringify(payload); + const signature = signPayload(body, target.secret); + const attempts: DeliveryAttempt[] = []; + + for (let attempt = 0; attempt < maxRetries; attempt++) { + if (attempt > 0) { + const delay = backoff[attempt - 1] ?? backoff[backoff.length - 1]; + await sleep(delay); + } + + const start = Date.now(); + const attemptRecord: DeliveryAttempt = { + attemptNumber: attempt + 1, + durationMs: 0, + attemptedAt: new Date().toISOString(), + }; + + try { + const res = await fetch(target.url, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'X-Webhook-Signature': `sha256=${signature}`, + 'X-Webhook-Timestamp': timestamp, + 'X-Webhook-Event': event, + 'X-Webhook-Delivery-Id': deliveryId, + }, + body, + signal: AbortSignal.timeout(timeoutMs), + }); + + attemptRecord.durationMs = Date.now() - start; + attemptRecord.responseCode = res.status; + + if (res.ok) { + attempts.push(attemptRecord); + const result: DeliveryResult = { + deliveryId, + targetId: target.id, + event, + status: 'success', + attempts, + completedAt: new Date().toISOString(), + }; + await options?.onDelivery?.(result); + return result; + } + + attemptRecord.error = `HTTP ${res.status}`; + } catch (err) { + attemptRecord.durationMs = Date.now() - start; + attemptRecord.error = err instanceof Error ? err.message : String(err); + } + + attempts.push(attemptRecord); + } + + const result: DeliveryResult = { + deliveryId, + targetId: target.id, + event, + status: 'failed', + attempts, + completedAt: new Date().toISOString(), + }; + await options?.onDelivery?.(result); + return result; +} + +/** + * Dispatch an event to all matching targets. + * Fire-and-forget: errors are collected in results, never thrown. + */ +export async function dispatchToTargets( + targets: WebhookTarget[], + event: string, + data: Record, + productId: string, + options?: DispatchOptions +): Promise { + const matching = targets.filter( + t => t.enabled && (t.events.length === 0 || t.events.includes(event)) + ); + + if (matching.length === 0) return []; + + const results = await Promise.allSettled( + matching.map(target => deliverToTarget(target, event, data, productId, options)) + ); + + return results.map(r => + r.status === 'fulfilled' + ? r.value + : { + deliveryId: randomUUID(), + targetId: 'unknown', + event, + status: 'failed' as const, + attempts: [], + completedAt: new Date().toISOString(), + } + ); +} + +function sleep(ms: number): Promise { + return new Promise(resolve => setTimeout(resolve, ms)); +} diff --git a/packages/webhook-dispatch/src/index.ts b/packages/webhook-dispatch/src/index.ts new file mode 100644 index 00000000..b302f6ea --- /dev/null +++ b/packages/webhook-dispatch/src/index.ts @@ -0,0 +1,8 @@ +export { signPayload, deliverToTarget, dispatchToTargets } from './dispatcher.js'; +export type { + WebhookTarget, + WebhookPayload, + DeliveryAttempt, + DeliveryResult, + DispatchOptions, +} from './types.js'; diff --git a/packages/webhook-dispatch/src/types.ts b/packages/webhook-dispatch/src/types.ts new file mode 100644 index 00000000..34161541 --- /dev/null +++ b/packages/webhook-dispatch/src/types.ts @@ -0,0 +1,53 @@ +/** + * Reusable webhook dispatch types. + * Decoupled from any specific database — product backends provide their own persistence. + */ + +export interface WebhookTarget { + /** Unique ID for this webhook registration */ + id: string; + /** URL to POST to */ + url: string; + /** HMAC-SHA256 signing secret */ + secret: string; + /** Event types this target subscribes to (empty = all) */ + events: string[]; + /** Whether this target is enabled */ + enabled: boolean; +} + +export interface WebhookPayload { + id: string; + event: string; + productId: string; + timestamp: string; + data: Record; +} + +export interface DeliveryAttempt { + attemptNumber: number; + responseCode?: number; + durationMs: number; + error?: string; + attemptedAt: string; +} + +export interface DeliveryResult { + deliveryId: string; + targetId: string; + event: string; + status: 'success' | 'failed'; + attempts: DeliveryAttempt[]; + completedAt: string; +} + +export interface DispatchOptions { + /** Max retries per target (default: 3) */ + maxRetries?: number; + /** Timeout per attempt in ms (default: 5000) */ + timeoutMs?: number; + /** Backoff intervals in ms (default: [10000, 60000, 300000]) */ + backoffIntervals?: number[]; + /** Called after each delivery completes (success or failure) */ + onDelivery?: (result: DeliveryResult) => void | Promise; +} diff --git a/packages/webhook-dispatch/tsconfig.json b/packages/webhook-dispatch/tsconfig.json new file mode 100644 index 00000000..5edad813 --- /dev/null +++ b/packages/webhook-dispatch/tsconfig.json @@ -0,0 +1,9 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "outDir": "dist", + "rootDir": "src" + }, + "include": ["src"], + "exclude": ["src/**/*.test.ts"] +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index e2b2994f..fe5c25b9 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -405,6 +405,8 @@ importers: packages/errors: {} + packages/event-store: {} + packages/events: dependencies: zod: @@ -439,6 +441,12 @@ importers: specifier: ^10.6.0 version: 10.6.0(fastify@5.7.4) + packages/fastify-sse: + dependencies: + fastify: + specifier: ^5.0.0 + version: 5.7.4 + packages/feature-flag-client: {} packages/feedback-client: @@ -593,6 +601,8 @@ importers: specifier: ^5.2.1 version: 5.7.4 + packages/webhook-dispatch: {} + services/extraction-service: dependencies: '@azure/cosmos':