feat(packages): add @bytelyst/event-store, @bytelyst/fastify-sse, @bytelyst/webhook-dispatch — reusable event infrastructure for product backends
This commit is contained in:
parent
38653bd9ec
commit
07d698e700
@ -10,3 +10,4 @@ learning_ai_fastgap
|
||||
learning_ai_jarvis_jr
|
||||
learning_ai_peakpulse
|
||||
learning_ai_notes
|
||||
learning_ai_flowmonk
|
||||
|
||||
21
packages/event-store/package.json
Normal file
21
packages/event-store/package.json
Normal file
@ -0,0 +1,21 @@
|
||||
{
|
||||
"name": "@bytelyst/event-store",
|
||||
"version": "0.1.0",
|
||||
"description": "Persistent event store with pluggable backends (in-memory, file, Cosmos) for ByteLyst product backends",
|
||||
"type": "module",
|
||||
"exports": {
|
||||
".": {
|
||||
"import": "./dist/index.js",
|
||||
"types": "./dist/index.d.ts"
|
||||
}
|
||||
},
|
||||
"main": "./dist/index.js",
|
||||
"types": "./dist/index.d.ts",
|
||||
"files": [
|
||||
"dist"
|
||||
],
|
||||
"scripts": {
|
||||
"build": "tsc",
|
||||
"test": "vitest run"
|
||||
}
|
||||
}
|
||||
68
packages/event-store/src/file-store.ts
Normal file
68
packages/event-store/src/file-store.ts
Normal file
@ -0,0 +1,68 @@
|
||||
/**
|
||||
* File-based event store implementation.
|
||||
* Appends events as JSON lines to a file on disk.
|
||||
* Suitable for single-instance dev/staging deployments.
|
||||
*/
|
||||
|
||||
import { readFile, appendFile, writeFile, mkdir } from 'node:fs/promises';
|
||||
import { dirname } from 'node:path';
|
||||
import type { EventStore, StoredEvent, EventStoreQuery } from './types.js';
|
||||
|
||||
export interface FileStoreOptions {
|
||||
filePath: string;
|
||||
}
|
||||
|
||||
export class FileEventStore implements EventStore {
|
||||
private readonly filePath: string;
|
||||
|
||||
constructor(options: FileStoreOptions) {
|
||||
this.filePath = options.filePath;
|
||||
}
|
||||
|
||||
async append(event: StoredEvent): Promise<void> {
|
||||
await mkdir(dirname(this.filePath), { recursive: true });
|
||||
await appendFile(this.filePath, JSON.stringify(event) + '\n', 'utf-8');
|
||||
}
|
||||
|
||||
async query(q: EventStoreQuery): Promise<StoredEvent[]> {
|
||||
const all = await this.readAll();
|
||||
let results = all;
|
||||
|
||||
if (q.userId) results = results.filter(e => e.userId === q.userId);
|
||||
if (q.type) results = results.filter(e => e.type === q.type);
|
||||
if (q.after) results = results.filter(e => e.timestamp > q.after!);
|
||||
if (q.before) results = results.filter(e => e.timestamp < q.before!);
|
||||
|
||||
if (q.limit && q.limit > 0) {
|
||||
results = results.slice(-q.limit);
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
async recent(limit = 50): Promise<StoredEvent[]> {
|
||||
const all = await this.readAll();
|
||||
return all.slice(-limit);
|
||||
}
|
||||
|
||||
async count(): Promise<number> {
|
||||
const all = await this.readAll();
|
||||
return all.length;
|
||||
}
|
||||
|
||||
async clear(): Promise<void> {
|
||||
await writeFile(this.filePath, '', 'utf-8');
|
||||
}
|
||||
|
||||
private async readAll(): Promise<StoredEvent[]> {
|
||||
try {
|
||||
const content = await readFile(this.filePath, 'utf-8');
|
||||
return content
|
||||
.split('\n')
|
||||
.filter(line => line.trim())
|
||||
.map(line => JSON.parse(line) as StoredEvent);
|
||||
} catch {
|
||||
return [];
|
||||
}
|
||||
}
|
||||
}
|
||||
5
packages/event-store/src/index.ts
Normal file
5
packages/event-store/src/index.ts
Normal file
@ -0,0 +1,5 @@
|
||||
export type { EventStore, StoredEvent, EventStoreQuery } from './types.js';
|
||||
export { MemoryEventStore } from './memory-store.js';
|
||||
export type { MemoryStoreOptions } from './memory-store.js';
|
||||
export { FileEventStore } from './file-store.js';
|
||||
export type { FileStoreOptions } from './file-store.js';
|
||||
88
packages/event-store/src/memory-store.test.ts
Normal file
88
packages/event-store/src/memory-store.test.ts
Normal file
@ -0,0 +1,88 @@
|
||||
import { describe, it, expect, beforeEach } from 'vitest';
|
||||
import { MemoryEventStore } from './memory-store.js';
|
||||
import type { StoredEvent } from './types.js';
|
||||
|
||||
function makeEvent(overrides?: Partial<StoredEvent>): StoredEvent {
|
||||
return {
|
||||
id: crypto.randomUUID(),
|
||||
type: 'test.event',
|
||||
userId: 'u1',
|
||||
productId: 'testprod',
|
||||
timestamp: new Date().toISOString(),
|
||||
payload: {},
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
describe('MemoryEventStore', () => {
|
||||
let store: MemoryEventStore;
|
||||
|
||||
beforeEach(() => {
|
||||
store = new MemoryEventStore({ maxEvents: 100 });
|
||||
});
|
||||
|
||||
it('appends and retrieves events', async () => {
|
||||
await store.append(makeEvent());
|
||||
expect(await store.count()).toBe(1);
|
||||
const recent = await store.recent();
|
||||
expect(recent).toHaveLength(1);
|
||||
});
|
||||
|
||||
it('caps at maxEvents', async () => {
|
||||
for (let i = 0; i < 120; i++) {
|
||||
await store.append(makeEvent({ id: `e${i}` }));
|
||||
}
|
||||
expect(await store.count()).toBe(100);
|
||||
});
|
||||
|
||||
it('queries by userId', async () => {
|
||||
await store.append(makeEvent({ userId: 'u1' }));
|
||||
await store.append(makeEvent({ userId: 'u2' }));
|
||||
await store.append(makeEvent({ userId: 'u1' }));
|
||||
const results = await store.query({ userId: 'u1' });
|
||||
expect(results).toHaveLength(2);
|
||||
});
|
||||
|
||||
it('queries by type', async () => {
|
||||
await store.append(makeEvent({ type: 'task.created' }));
|
||||
await store.append(makeEvent({ type: 'schedule.generated' }));
|
||||
await store.append(makeEvent({ type: 'task.created' }));
|
||||
const results = await store.query({ type: 'task.created' });
|
||||
expect(results).toHaveLength(2);
|
||||
});
|
||||
|
||||
it('queries with time range', async () => {
|
||||
await store.append(makeEvent({ timestamp: '2026-01-01T00:00:00Z' }));
|
||||
await store.append(makeEvent({ timestamp: '2026-03-01T00:00:00Z' }));
|
||||
await store.append(makeEvent({ timestamp: '2026-06-01T00:00:00Z' }));
|
||||
const results = await store.query({
|
||||
after: '2026-02-01T00:00:00Z',
|
||||
before: '2026-04-01T00:00:00Z',
|
||||
});
|
||||
expect(results).toHaveLength(1);
|
||||
});
|
||||
|
||||
it('queries with limit', async () => {
|
||||
for (let i = 0; i < 10; i++) {
|
||||
await store.append(makeEvent());
|
||||
}
|
||||
const results = await store.query({ limit: 3 });
|
||||
expect(results).toHaveLength(3);
|
||||
});
|
||||
|
||||
it('clears all events', async () => {
|
||||
await store.append(makeEvent());
|
||||
await store.append(makeEvent());
|
||||
await store.clear();
|
||||
expect(await store.count()).toBe(0);
|
||||
});
|
||||
|
||||
it('recent returns last N events', async () => {
|
||||
for (let i = 0; i < 10; i++) {
|
||||
await store.append(makeEvent({ id: `e${i}` }));
|
||||
}
|
||||
const recent = await store.recent(3);
|
||||
expect(recent).toHaveLength(3);
|
||||
expect(recent[0].id).toBe('e7');
|
||||
});
|
||||
});
|
||||
54
packages/event-store/src/memory-store.ts
Normal file
54
packages/event-store/src/memory-store.ts
Normal file
@ -0,0 +1,54 @@
|
||||
/**
|
||||
* In-memory event store implementation.
|
||||
* Useful for development, testing, and as a fallback when no persistent backend is configured.
|
||||
* Caps at maxEvents to prevent unbounded memory growth.
|
||||
*/
|
||||
|
||||
import type { EventStore, StoredEvent, EventStoreQuery } from './types.js';
|
||||
|
||||
export interface MemoryStoreOptions {
|
||||
maxEvents?: number;
|
||||
}
|
||||
|
||||
export class MemoryEventStore implements EventStore {
|
||||
private events: StoredEvent[] = [];
|
||||
private readonly maxEvents: number;
|
||||
|
||||
constructor(options?: MemoryStoreOptions) {
|
||||
this.maxEvents = options?.maxEvents ?? 10_000;
|
||||
}
|
||||
|
||||
async append(event: StoredEvent): Promise<void> {
|
||||
this.events.push(event);
|
||||
if (this.events.length > this.maxEvents) {
|
||||
this.events = this.events.slice(-this.maxEvents);
|
||||
}
|
||||
}
|
||||
|
||||
async query(q: EventStoreQuery): Promise<StoredEvent[]> {
|
||||
let results = this.events;
|
||||
|
||||
if (q.userId) results = results.filter(e => e.userId === q.userId);
|
||||
if (q.type) results = results.filter(e => e.type === q.type);
|
||||
if (q.after) results = results.filter(e => e.timestamp > q.after!);
|
||||
if (q.before) results = results.filter(e => e.timestamp < q.before!);
|
||||
|
||||
if (q.limit && q.limit > 0) {
|
||||
results = results.slice(-q.limit);
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
async recent(limit = 50): Promise<StoredEvent[]> {
|
||||
return this.events.slice(-limit);
|
||||
}
|
||||
|
||||
async count(): Promise<number> {
|
||||
return this.events.length;
|
||||
}
|
||||
|
||||
async clear(): Promise<void> {
|
||||
this.events = [];
|
||||
}
|
||||
}
|
||||
29
packages/event-store/src/types.ts
Normal file
29
packages/event-store/src/types.ts
Normal file
@ -0,0 +1,29 @@
|
||||
/**
|
||||
* Pluggable event store interface for ByteLyst product backends.
|
||||
* Products define their own event shapes; the store handles persistence.
|
||||
*/
|
||||
|
||||
export interface StoredEvent {
|
||||
id: string;
|
||||
type: string;
|
||||
userId: string;
|
||||
productId: string;
|
||||
timestamp: string;
|
||||
payload: Record<string, unknown>;
|
||||
}
|
||||
|
||||
export interface EventStoreQuery {
|
||||
userId?: string;
|
||||
type?: string;
|
||||
after?: string;
|
||||
before?: string;
|
||||
limit?: number;
|
||||
}
|
||||
|
||||
export interface EventStore {
|
||||
append(event: StoredEvent): Promise<void>;
|
||||
query(q: EventStoreQuery): Promise<StoredEvent[]>;
|
||||
recent(limit?: number): Promise<StoredEvent[]>;
|
||||
count(): Promise<number>;
|
||||
clear(): Promise<void>;
|
||||
}
|
||||
9
packages/event-store/tsconfig.json
Normal file
9
packages/event-store/tsconfig.json
Normal file
@ -0,0 +1,9 @@
|
||||
{
|
||||
"extends": "../../tsconfig.base.json",
|
||||
"compilerOptions": {
|
||||
"outDir": "dist",
|
||||
"rootDir": "src"
|
||||
},
|
||||
"include": ["src"],
|
||||
"exclude": ["src/**/*.test.ts"]
|
||||
}
|
||||
24
packages/fastify-sse/package.json
Normal file
24
packages/fastify-sse/package.json
Normal file
@ -0,0 +1,24 @@
|
||||
{
|
||||
"name": "@bytelyst/fastify-sse",
|
||||
"version": "0.1.0",
|
||||
"description": "Fastify plugin for Server-Sent Events (SSE) — real-time push for ByteLyst product backends",
|
||||
"type": "module",
|
||||
"exports": {
|
||||
".": {
|
||||
"import": "./dist/index.js",
|
||||
"types": "./dist/index.d.ts"
|
||||
}
|
||||
},
|
||||
"main": "./dist/index.js",
|
||||
"types": "./dist/index.d.ts",
|
||||
"files": [
|
||||
"dist"
|
||||
],
|
||||
"scripts": {
|
||||
"build": "tsc",
|
||||
"test": "vitest run"
|
||||
},
|
||||
"peerDependencies": {
|
||||
"fastify": "^5.0.0"
|
||||
}
|
||||
}
|
||||
143
packages/fastify-sse/src/hub.ts
Normal file
143
packages/fastify-sse/src/hub.ts
Normal file
@ -0,0 +1,143 @@
|
||||
/**
|
||||
* SSE Hub — manages connected clients and broadcasts events.
|
||||
* Product backends create an SSEHub instance and push events to it;
|
||||
* the hub fans out to all connected SSE clients.
|
||||
*/
|
||||
|
||||
import type { ServerResponse } from 'node:http';
|
||||
|
||||
export interface SSEMessage {
|
||||
event?: string;
|
||||
data: string;
|
||||
id?: string;
|
||||
retry?: number;
|
||||
}
|
||||
|
||||
interface ConnectedClient {
|
||||
id: string;
|
||||
userId?: string;
|
||||
res: ServerResponse;
|
||||
connectedAt: string;
|
||||
}
|
||||
|
||||
export class SSEHub {
|
||||
private clients = new Map<string, ConnectedClient>();
|
||||
private clientCounter = 0;
|
||||
|
||||
/**
|
||||
* Add a new SSE client connection.
|
||||
* Sets up the SSE headers and returns a client ID.
|
||||
*/
|
||||
addClient(res: ServerResponse, userId?: string): string {
|
||||
const id = `sse_${++this.clientCounter}_${Date.now()}`;
|
||||
|
||||
res.writeHead(200, {
|
||||
'Content-Type': 'text/event-stream',
|
||||
'Cache-Control': 'no-cache',
|
||||
Connection: 'keep-alive',
|
||||
'X-Accel-Buffering': 'no',
|
||||
});
|
||||
|
||||
// Send initial connection event
|
||||
res.write(`event: connected\ndata: ${JSON.stringify({ clientId: id })}\n\n`);
|
||||
|
||||
const client: ConnectedClient = {
|
||||
id,
|
||||
userId,
|
||||
res,
|
||||
connectedAt: new Date().toISOString(),
|
||||
};
|
||||
|
||||
this.clients.set(id, client);
|
||||
|
||||
// Clean up on close
|
||||
res.on('close', () => {
|
||||
this.clients.delete(id);
|
||||
});
|
||||
|
||||
return id;
|
||||
}
|
||||
|
||||
/**
|
||||
* Broadcast an SSE message to all connected clients.
|
||||
*/
|
||||
broadcast(message: SSEMessage): number {
|
||||
let sent = 0;
|
||||
const formatted = formatSSE(message);
|
||||
|
||||
for (const [id, client] of this.clients) {
|
||||
try {
|
||||
client.res.write(formatted);
|
||||
sent++;
|
||||
} catch {
|
||||
this.clients.delete(id);
|
||||
}
|
||||
}
|
||||
|
||||
return sent;
|
||||
}
|
||||
|
||||
/**
|
||||
* Send an SSE message to a specific user's connections.
|
||||
*/
|
||||
sendToUser(userId: string, message: SSEMessage): number {
|
||||
let sent = 0;
|
||||
const formatted = formatSSE(message);
|
||||
|
||||
for (const [id, client] of this.clients) {
|
||||
if (client.userId === userId) {
|
||||
try {
|
||||
client.res.write(formatted);
|
||||
sent++;
|
||||
} catch {
|
||||
this.clients.delete(id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return sent;
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a heartbeat (comment) to all clients to keep connections alive.
|
||||
*/
|
||||
heartbeat(): void {
|
||||
for (const [id, client] of this.clients) {
|
||||
try {
|
||||
client.res.write(': heartbeat\n\n');
|
||||
} catch {
|
||||
this.clients.delete(id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get count of connected clients.
|
||||
*/
|
||||
get clientCount(): number {
|
||||
return this.clients.size;
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnect all clients.
|
||||
*/
|
||||
disconnectAll(): void {
|
||||
for (const [, client] of this.clients) {
|
||||
try {
|
||||
client.res.end();
|
||||
} catch {
|
||||
/* already closed */
|
||||
}
|
||||
}
|
||||
this.clients.clear();
|
||||
}
|
||||
}
|
||||
|
||||
function formatSSE(message: SSEMessage): string {
|
||||
let output = '';
|
||||
if (message.id) output += `id: ${message.id}\n`;
|
||||
if (message.event) output += `event: ${message.event}\n`;
|
||||
if (message.retry) output += `retry: ${message.retry}\n`;
|
||||
output += `data: ${message.data}\n\n`;
|
||||
return output;
|
||||
}
|
||||
3
packages/fastify-sse/src/index.ts
Normal file
3
packages/fastify-sse/src/index.ts
Normal file
@ -0,0 +1,3 @@
|
||||
export { SSEHub } from './hub.js';
|
||||
export { ssePlugin } from './plugin.js';
|
||||
export type { SSEPluginOptions, SSEClient } from './plugin.js';
|
||||
61
packages/fastify-sse/src/plugin.ts
Normal file
61
packages/fastify-sse/src/plugin.ts
Normal file
@ -0,0 +1,61 @@
|
||||
/**
|
||||
* Fastify plugin that registers an SSE endpoint.
|
||||
* Product backends configure the path and optional auth check.
|
||||
*/
|
||||
|
||||
import type { FastifyInstance, FastifyRequest, FastifyReply } from 'fastify';
|
||||
import { SSEHub } from './hub.js';
|
||||
|
||||
export interface SSEClient {
|
||||
id: string;
|
||||
userId?: string;
|
||||
}
|
||||
|
||||
export interface SSEPluginOptions {
|
||||
/** Route path for the SSE endpoint (default: /events/stream) */
|
||||
path?: string;
|
||||
/** Extract userId from request (optional, enables per-user targeting) */
|
||||
getUserId?: (req: FastifyRequest) => string | undefined;
|
||||
/** Heartbeat interval in ms (default: 30000, set 0 to disable) */
|
||||
heartbeatIntervalMs?: number;
|
||||
/** The SSE hub instance to use (creates one if not provided) */
|
||||
hub?: SSEHub;
|
||||
}
|
||||
|
||||
export async function ssePlugin(
|
||||
app: FastifyInstance,
|
||||
options: SSEPluginOptions = {}
|
||||
): Promise<void> {
|
||||
const path = options.path ?? '/events/stream';
|
||||
const heartbeatMs = options.heartbeatIntervalMs ?? 30_000;
|
||||
const hub = options.hub ?? new SSEHub();
|
||||
|
||||
// Decorate app with the hub so routes can push events
|
||||
if (!app.hasDecorator('sseHub')) {
|
||||
app.decorate('sseHub', hub);
|
||||
}
|
||||
|
||||
// Register SSE endpoint
|
||||
app.get(path, async (req: FastifyRequest, reply: FastifyReply) => {
|
||||
const userId = options.getUserId?.(req);
|
||||
|
||||
// Hijack the raw response for SSE streaming
|
||||
const raw = reply.raw;
|
||||
hub.addClient(raw, userId);
|
||||
|
||||
// Prevent Fastify from sending its own response
|
||||
reply.hijack();
|
||||
});
|
||||
|
||||
// Heartbeat timer
|
||||
let heartbeatTimer: ReturnType<typeof setInterval> | undefined;
|
||||
if (heartbeatMs > 0) {
|
||||
heartbeatTimer = setInterval(() => hub.heartbeat(), heartbeatMs);
|
||||
}
|
||||
|
||||
// Cleanup on close
|
||||
app.addHook('onClose', async () => {
|
||||
if (heartbeatTimer) clearInterval(heartbeatTimer);
|
||||
hub.disconnectAll();
|
||||
});
|
||||
}
|
||||
9
packages/fastify-sse/tsconfig.json
Normal file
9
packages/fastify-sse/tsconfig.json
Normal file
@ -0,0 +1,9 @@
|
||||
{
|
||||
"extends": "../../tsconfig.base.json",
|
||||
"compilerOptions": {
|
||||
"outDir": "dist",
|
||||
"rootDir": "src"
|
||||
},
|
||||
"include": ["src"],
|
||||
"exclude": ["src/**/*.test.ts"]
|
||||
}
|
||||
21
packages/webhook-dispatch/package.json
Normal file
21
packages/webhook-dispatch/package.json
Normal file
@ -0,0 +1,21 @@
|
||||
{
|
||||
"name": "@bytelyst/webhook-dispatch",
|
||||
"version": "0.1.0",
|
||||
"description": "Reusable webhook dispatch with HMAC-SHA256 signing, exponential backoff retry, and delivery tracking for ByteLyst product backends",
|
||||
"type": "module",
|
||||
"exports": {
|
||||
".": {
|
||||
"import": "./dist/index.js",
|
||||
"types": "./dist/index.d.ts"
|
||||
}
|
||||
},
|
||||
"main": "./dist/index.js",
|
||||
"types": "./dist/index.d.ts",
|
||||
"files": [
|
||||
"dist"
|
||||
],
|
||||
"scripts": {
|
||||
"build": "tsc",
|
||||
"test": "vitest run"
|
||||
}
|
||||
}
|
||||
163
packages/webhook-dispatch/src/dispatcher.test.ts
Normal file
163
packages/webhook-dispatch/src/dispatcher.test.ts
Normal file
@ -0,0 +1,163 @@
|
||||
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
|
||||
import { signPayload, deliverToTarget, dispatchToTargets } from './dispatcher.js';
|
||||
import type { WebhookTarget } from './types.js';
|
||||
|
||||
function makeTarget(overrides?: Partial<WebhookTarget>): WebhookTarget {
|
||||
return {
|
||||
id: 'wh_1',
|
||||
url: 'https://example.com/webhook',
|
||||
secret: 'test-secret-key',
|
||||
events: [],
|
||||
enabled: true,
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
describe('signPayload', () => {
|
||||
it('produces consistent HMAC-SHA256 signatures', () => {
|
||||
const sig1 = signPayload('{"test":true}', 'secret');
|
||||
const sig2 = signPayload('{"test":true}', 'secret');
|
||||
expect(sig1).toBe(sig2);
|
||||
expect(sig1).toMatch(/^[a-f0-9]{64}$/);
|
||||
});
|
||||
|
||||
it('produces different signatures for different secrets', () => {
|
||||
const sig1 = signPayload('data', 'secret1');
|
||||
const sig2 = signPayload('data', 'secret2');
|
||||
expect(sig1).not.toBe(sig2);
|
||||
});
|
||||
});
|
||||
|
||||
describe('deliverToTarget', () => {
|
||||
beforeEach(() => {
|
||||
vi.stubGlobal('fetch', vi.fn());
|
||||
});
|
||||
afterEach(() => {
|
||||
vi.restoreAllMocks();
|
||||
});
|
||||
|
||||
it('returns success on 200', async () => {
|
||||
(fetch as ReturnType<typeof vi.fn>).mockResolvedValue({ ok: true, status: 200 });
|
||||
|
||||
const result = await deliverToTarget(
|
||||
makeTarget(),
|
||||
'task.created',
|
||||
{ taskId: 't1' },
|
||||
'flowmonk',
|
||||
{ maxRetries: 1, backoffIntervals: [0] }
|
||||
);
|
||||
|
||||
expect(result.status).toBe('success');
|
||||
expect(result.attempts).toHaveLength(1);
|
||||
expect(result.attempts[0].responseCode).toBe(200);
|
||||
});
|
||||
|
||||
it('returns failed after exhausting retries on 500', async () => {
|
||||
(fetch as ReturnType<typeof vi.fn>).mockResolvedValue({ ok: false, status: 500 });
|
||||
|
||||
const result = await deliverToTarget(
|
||||
makeTarget(),
|
||||
'task.created',
|
||||
{ taskId: 't1' },
|
||||
'flowmonk',
|
||||
{ maxRetries: 2, backoffIntervals: [0] }
|
||||
);
|
||||
|
||||
expect(result.status).toBe('failed');
|
||||
expect(result.attempts).toHaveLength(2);
|
||||
});
|
||||
|
||||
it('returns failed on network error', async () => {
|
||||
(fetch as ReturnType<typeof vi.fn>).mockRejectedValue(new Error('ECONNREFUSED'));
|
||||
|
||||
const result = await deliverToTarget(
|
||||
makeTarget(),
|
||||
'task.created',
|
||||
{ taskId: 't1' },
|
||||
'flowmonk',
|
||||
{ maxRetries: 1, backoffIntervals: [0] }
|
||||
);
|
||||
|
||||
expect(result.status).toBe('failed');
|
||||
expect(result.attempts[0].error).toBe('ECONNREFUSED');
|
||||
});
|
||||
|
||||
it('calls onDelivery callback', async () => {
|
||||
(fetch as ReturnType<typeof vi.fn>).mockResolvedValue({ ok: true, status: 200 });
|
||||
const onDelivery = vi.fn();
|
||||
|
||||
await deliverToTarget(makeTarget(), 'task.created', { taskId: 't1' }, 'flowmonk', {
|
||||
maxRetries: 1,
|
||||
backoffIntervals: [0],
|
||||
onDelivery,
|
||||
});
|
||||
|
||||
expect(onDelivery).toHaveBeenCalledOnce();
|
||||
expect(onDelivery.mock.calls[0][0].status).toBe('success');
|
||||
});
|
||||
|
||||
it('sends correct headers', async () => {
|
||||
(fetch as ReturnType<typeof vi.fn>).mockResolvedValue({ ok: true, status: 200 });
|
||||
|
||||
await deliverToTarget(
|
||||
makeTarget({ url: 'https://test.com/hook' }),
|
||||
'schedule.generated',
|
||||
{},
|
||||
'flowmonk',
|
||||
{ maxRetries: 1, backoffIntervals: [0] }
|
||||
);
|
||||
|
||||
const call = (fetch as ReturnType<typeof vi.fn>).mock.calls[0];
|
||||
expect(call[0]).toBe('https://test.com/hook');
|
||||
expect(call[1].headers['Content-Type']).toBe('application/json');
|
||||
expect(call[1].headers['X-Webhook-Event']).toBe('schedule.generated');
|
||||
expect(call[1].headers['X-Webhook-Signature']).toMatch(/^sha256=[a-f0-9]{64}$/);
|
||||
});
|
||||
});
|
||||
|
||||
describe('dispatchToTargets', () => {
|
||||
beforeEach(() => {
|
||||
vi.stubGlobal('fetch', vi.fn().mockResolvedValue({ ok: true, status: 200 }));
|
||||
});
|
||||
afterEach(() => {
|
||||
vi.restoreAllMocks();
|
||||
});
|
||||
|
||||
it('dispatches to all enabled matching targets', async () => {
|
||||
const targets = [
|
||||
makeTarget({ id: 'wh_1', events: ['task.created'] }),
|
||||
makeTarget({ id: 'wh_2', events: ['task.created', 'schedule.generated'] }),
|
||||
makeTarget({ id: 'wh_3', events: ['schedule.generated'], enabled: false }),
|
||||
];
|
||||
|
||||
const results = await dispatchToTargets(targets, 'task.created', {}, 'flowmonk', {
|
||||
maxRetries: 1,
|
||||
backoffIntervals: [0],
|
||||
});
|
||||
|
||||
expect(results).toHaveLength(2);
|
||||
expect(results.every(r => r.status === 'success')).toBe(true);
|
||||
});
|
||||
|
||||
it('dispatches to targets with empty events (wildcard)', async () => {
|
||||
const targets = [makeTarget({ id: 'wh_1', events: [] })];
|
||||
|
||||
const results = await dispatchToTargets(targets, 'any.event', {}, 'flowmonk', {
|
||||
maxRetries: 1,
|
||||
backoffIntervals: [0],
|
||||
});
|
||||
|
||||
expect(results).toHaveLength(1);
|
||||
});
|
||||
|
||||
it('returns empty array when no targets match', async () => {
|
||||
const targets = [makeTarget({ events: ['other.event'] })];
|
||||
|
||||
const results = await dispatchToTargets(targets, 'task.created', {}, 'flowmonk', {
|
||||
maxRetries: 1,
|
||||
backoffIntervals: [0],
|
||||
});
|
||||
|
||||
expect(results).toHaveLength(0);
|
||||
});
|
||||
});
|
||||
157
packages/webhook-dispatch/src/dispatcher.ts
Normal file
157
packages/webhook-dispatch/src/dispatcher.ts
Normal file
@ -0,0 +1,157 @@
|
||||
/**
|
||||
* Reusable webhook dispatcher with HMAC-SHA256 signing and exponential backoff retry.
|
||||
* Extracted from platform-service's dispatcher for cross-product reuse.
|
||||
*/
|
||||
|
||||
import { randomUUID, createHmac } from 'node:crypto';
|
||||
import type {
|
||||
WebhookTarget,
|
||||
WebhookPayload,
|
||||
DeliveryAttempt,
|
||||
DeliveryResult,
|
||||
DispatchOptions,
|
||||
} from './types.js';
|
||||
|
||||
const DEFAULT_MAX_RETRIES = 3;
|
||||
const DEFAULT_TIMEOUT_MS = 5_000;
|
||||
const DEFAULT_BACKOFF = [10_000, 60_000, 300_000];
|
||||
|
||||
/**
|
||||
* Sign a webhook payload body with HMAC-SHA256.
|
||||
*/
|
||||
export function signPayload(body: string, secret: string): string {
|
||||
return createHmac('sha256', secret).update(body).digest('hex');
|
||||
}
|
||||
|
||||
/**
|
||||
* Dispatch an event to a single webhook target with retry and signing.
|
||||
*/
|
||||
export async function deliverToTarget(
|
||||
target: WebhookTarget,
|
||||
event: string,
|
||||
data: Record<string, unknown>,
|
||||
productId: string,
|
||||
options?: DispatchOptions
|
||||
): Promise<DeliveryResult> {
|
||||
const maxRetries = options?.maxRetries ?? DEFAULT_MAX_RETRIES;
|
||||
const timeoutMs = options?.timeoutMs ?? DEFAULT_TIMEOUT_MS;
|
||||
const backoff = options?.backoffIntervals ?? DEFAULT_BACKOFF;
|
||||
|
||||
const deliveryId = randomUUID();
|
||||
const timestamp = new Date().toISOString();
|
||||
|
||||
const payload: WebhookPayload = {
|
||||
id: deliveryId,
|
||||
event,
|
||||
productId,
|
||||
timestamp,
|
||||
data,
|
||||
};
|
||||
|
||||
const body = JSON.stringify(payload);
|
||||
const signature = signPayload(body, target.secret);
|
||||
const attempts: DeliveryAttempt[] = [];
|
||||
|
||||
for (let attempt = 0; attempt < maxRetries; attempt++) {
|
||||
if (attempt > 0) {
|
||||
const delay = backoff[attempt - 1] ?? backoff[backoff.length - 1];
|
||||
await sleep(delay);
|
||||
}
|
||||
|
||||
const start = Date.now();
|
||||
const attemptRecord: DeliveryAttempt = {
|
||||
attemptNumber: attempt + 1,
|
||||
durationMs: 0,
|
||||
attemptedAt: new Date().toISOString(),
|
||||
};
|
||||
|
||||
try {
|
||||
const res = await fetch(target.url, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
'X-Webhook-Signature': `sha256=${signature}`,
|
||||
'X-Webhook-Timestamp': timestamp,
|
||||
'X-Webhook-Event': event,
|
||||
'X-Webhook-Delivery-Id': deliveryId,
|
||||
},
|
||||
body,
|
||||
signal: AbortSignal.timeout(timeoutMs),
|
||||
});
|
||||
|
||||
attemptRecord.durationMs = Date.now() - start;
|
||||
attemptRecord.responseCode = res.status;
|
||||
|
||||
if (res.ok) {
|
||||
attempts.push(attemptRecord);
|
||||
const result: DeliveryResult = {
|
||||
deliveryId,
|
||||
targetId: target.id,
|
||||
event,
|
||||
status: 'success',
|
||||
attempts,
|
||||
completedAt: new Date().toISOString(),
|
||||
};
|
||||
await options?.onDelivery?.(result);
|
||||
return result;
|
||||
}
|
||||
|
||||
attemptRecord.error = `HTTP ${res.status}`;
|
||||
} catch (err) {
|
||||
attemptRecord.durationMs = Date.now() - start;
|
||||
attemptRecord.error = err instanceof Error ? err.message : String(err);
|
||||
}
|
||||
|
||||
attempts.push(attemptRecord);
|
||||
}
|
||||
|
||||
const result: DeliveryResult = {
|
||||
deliveryId,
|
||||
targetId: target.id,
|
||||
event,
|
||||
status: 'failed',
|
||||
attempts,
|
||||
completedAt: new Date().toISOString(),
|
||||
};
|
||||
await options?.onDelivery?.(result);
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Dispatch an event to all matching targets.
|
||||
* Fire-and-forget: errors are collected in results, never thrown.
|
||||
*/
|
||||
export async function dispatchToTargets(
|
||||
targets: WebhookTarget[],
|
||||
event: string,
|
||||
data: Record<string, unknown>,
|
||||
productId: string,
|
||||
options?: DispatchOptions
|
||||
): Promise<DeliveryResult[]> {
|
||||
const matching = targets.filter(
|
||||
t => t.enabled && (t.events.length === 0 || t.events.includes(event))
|
||||
);
|
||||
|
||||
if (matching.length === 0) return [];
|
||||
|
||||
const results = await Promise.allSettled(
|
||||
matching.map(target => deliverToTarget(target, event, data, productId, options))
|
||||
);
|
||||
|
||||
return results.map(r =>
|
||||
r.status === 'fulfilled'
|
||||
? r.value
|
||||
: {
|
||||
deliveryId: randomUUID(),
|
||||
targetId: 'unknown',
|
||||
event,
|
||||
status: 'failed' as const,
|
||||
attempts: [],
|
||||
completedAt: new Date().toISOString(),
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
function sleep(ms: number): Promise<void> {
|
||||
return new Promise(resolve => setTimeout(resolve, ms));
|
||||
}
|
||||
8
packages/webhook-dispatch/src/index.ts
Normal file
8
packages/webhook-dispatch/src/index.ts
Normal file
@ -0,0 +1,8 @@
|
||||
export { signPayload, deliverToTarget, dispatchToTargets } from './dispatcher.js';
|
||||
export type {
|
||||
WebhookTarget,
|
||||
WebhookPayload,
|
||||
DeliveryAttempt,
|
||||
DeliveryResult,
|
||||
DispatchOptions,
|
||||
} from './types.js';
|
||||
53
packages/webhook-dispatch/src/types.ts
Normal file
53
packages/webhook-dispatch/src/types.ts
Normal file
@ -0,0 +1,53 @@
|
||||
/**
|
||||
* Reusable webhook dispatch types.
|
||||
* Decoupled from any specific database — product backends provide their own persistence.
|
||||
*/
|
||||
|
||||
export interface WebhookTarget {
|
||||
/** Unique ID for this webhook registration */
|
||||
id: string;
|
||||
/** URL to POST to */
|
||||
url: string;
|
||||
/** HMAC-SHA256 signing secret */
|
||||
secret: string;
|
||||
/** Event types this target subscribes to (empty = all) */
|
||||
events: string[];
|
||||
/** Whether this target is enabled */
|
||||
enabled: boolean;
|
||||
}
|
||||
|
||||
export interface WebhookPayload {
|
||||
id: string;
|
||||
event: string;
|
||||
productId: string;
|
||||
timestamp: string;
|
||||
data: Record<string, unknown>;
|
||||
}
|
||||
|
||||
export interface DeliveryAttempt {
|
||||
attemptNumber: number;
|
||||
responseCode?: number;
|
||||
durationMs: number;
|
||||
error?: string;
|
||||
attemptedAt: string;
|
||||
}
|
||||
|
||||
export interface DeliveryResult {
|
||||
deliveryId: string;
|
||||
targetId: string;
|
||||
event: string;
|
||||
status: 'success' | 'failed';
|
||||
attempts: DeliveryAttempt[];
|
||||
completedAt: string;
|
||||
}
|
||||
|
||||
export interface DispatchOptions {
|
||||
/** Max retries per target (default: 3) */
|
||||
maxRetries?: number;
|
||||
/** Timeout per attempt in ms (default: 5000) */
|
||||
timeoutMs?: number;
|
||||
/** Backoff intervals in ms (default: [10000, 60000, 300000]) */
|
||||
backoffIntervals?: number[];
|
||||
/** Called after each delivery completes (success or failure) */
|
||||
onDelivery?: (result: DeliveryResult) => void | Promise<void>;
|
||||
}
|
||||
9
packages/webhook-dispatch/tsconfig.json
Normal file
9
packages/webhook-dispatch/tsconfig.json
Normal file
@ -0,0 +1,9 @@
|
||||
{
|
||||
"extends": "../../tsconfig.base.json",
|
||||
"compilerOptions": {
|
||||
"outDir": "dist",
|
||||
"rootDir": "src"
|
||||
},
|
||||
"include": ["src"],
|
||||
"exclude": ["src/**/*.test.ts"]
|
||||
}
|
||||
10
pnpm-lock.yaml
generated
10
pnpm-lock.yaml
generated
@ -405,6 +405,8 @@ importers:
|
||||
|
||||
packages/errors: {}
|
||||
|
||||
packages/event-store: {}
|
||||
|
||||
packages/events:
|
||||
dependencies:
|
||||
zod:
|
||||
@ -439,6 +441,12 @@ importers:
|
||||
specifier: ^10.6.0
|
||||
version: 10.6.0(fastify@5.7.4)
|
||||
|
||||
packages/fastify-sse:
|
||||
dependencies:
|
||||
fastify:
|
||||
specifier: ^5.0.0
|
||||
version: 5.7.4
|
||||
|
||||
packages/feature-flag-client: {}
|
||||
|
||||
packages/feedback-client:
|
||||
@ -593,6 +601,8 @@ importers:
|
||||
specifier: ^5.2.1
|
||||
version: 5.7.4
|
||||
|
||||
packages/webhook-dispatch: {}
|
||||
|
||||
services/extraction-service:
|
||||
dependencies:
|
||||
'@azure/cosmos':
|
||||
|
||||
Loading…
Reference in New Issue
Block a user