feat(backend): add domain event bus + webhook dispatch

Add typed event bus (6 events: timer.created/fired/completed,
routine.started/completed, household.created) with Promise.allSettled
isolation. Wire webhook subscriber bridge using @bytelyst/webhook-dispatch
for HMAC-signed delivery with retry.

All 219 tests pass.
This commit is contained in:
saravanakumardb1 2026-04-13 11:28:38 -07:00
parent 427080a2a3
commit fbac905e9c
5 changed files with 6706 additions and 277 deletions

View File

@ -26,6 +26,7 @@
"@bytelyst/field-encrypt": "*",
"@bytelyst/fastify-auth": "*",
"@bytelyst/fastify-core": "*",
"@bytelyst/webhook-dispatch": "*",
"@azure/cosmos": "^4.2.0",
"fastify": "5.7.4",
"jose": "^6.0.8",

View File

@ -0,0 +1,52 @@
/**
* Domain event bus singleton for ChronoMind backend.
*
* Lightweight typed pub/sub for domain events. Handlers run via
* Promise.allSettled a failing handler never blocks others.
*/
export interface TimerCreatedEvent { timerId: string; userId: string; label: string; }
export interface TimerFiredEvent { timerId: string; userId: string; label: string; }
export interface TimerCompletedEvent { timerId: string; userId: string; label: string; }
export interface RoutineStartedEvent { routineId: string; userId: string; name: string; }
export interface RoutineCompletedEvent { routineId: string; userId: string; name: string; }
export interface HouseholdCreatedEvent { householdId: string; userId: string; name: string; }
export type ChronoMindEventMap = {
'timer.created': TimerCreatedEvent;
'timer.fired': TimerFiredEvent;
'timer.completed': TimerCompletedEvent;
'routine.started': RoutineStartedEvent;
'routine.completed': RoutineCompletedEvent;
'household.created': HouseholdCreatedEvent;
};
type Handler<T> = (payload: T) => void | Promise<void>;
class DomainEventBus {
private handlers = new Map<string, Set<Handler<unknown>>>();
on<K extends keyof ChronoMindEventMap>(event: K, handler: Handler<ChronoMindEventMap[K]>): () => void {
if (!this.handlers.has(event)) this.handlers.set(event, new Set());
this.handlers.get(event)!.add(handler as Handler<unknown>);
return () => { this.handlers.get(event)?.delete(handler as Handler<unknown>); };
}
async emit<K extends keyof ChronoMindEventMap>(event: K, payload: ChronoMindEventMap[K]): Promise<void> {
const fns = this.handlers.get(event);
if (!fns || fns.size === 0) return;
await Promise.allSettled([...fns].map(fn => fn(payload)));
}
removeAll(): void { this.handlers.clear(); }
}
let _bus: DomainEventBus | null = null;
export function getEventBus(): DomainEventBus {
if (!_bus) _bus = new DomainEventBus();
return _bus;
}
/** @internal — for testing only. */
export function _resetEventBus(): void { _bus = null; }

View File

@ -0,0 +1,58 @@
/**
* Webhook subscriber bridges domain events to @bytelyst/webhook-dispatch.
*/
import { dispatchToTargets, type WebhookTarget, type DeliveryResult, type DispatchOptions } from '@bytelyst/webhook-dispatch';
import { getEventBus, type ChronoMindEventMap } from './event-bus.js';
import { PRODUCT_ID } from './product-config.js';
const targets: WebhookTarget[] = [];
export function registerWebhookTarget(target: WebhookTarget): void {
const idx = targets.findIndex(t => t.id === target.id);
if (idx >= 0) targets[idx] = target; else targets.push(target);
}
export function removeWebhookTarget(id: string): boolean {
const idx = targets.findIndex(t => t.id === id);
if (idx < 0) return false;
targets.splice(idx, 1);
return true;
}
export function listWebhookTargets(): WebhookTarget[] { return [...targets]; }
let deliveryLog: DeliveryResult[] = [];
const dispatchOptions: DispatchOptions = {
maxRetries: 3,
timeoutMs: 5_000,
onDelivery: (result: DeliveryResult) => {
deliveryLog.push(result);
if (deliveryLog.length > 1_000) deliveryLog = deliveryLog.slice(-500);
},
};
export function getRecentDeliveries(limit = 50): DeliveryResult[] { return deliveryLog.slice(-limit); }
async function dispatchEvent<K extends keyof ChronoMindEventMap>(
event: K, payload: ChronoMindEventMap[K],
): Promise<void> {
if (targets.length === 0) return;
await dispatchToTargets(targets, event, payload as unknown as Record<string, unknown>, PRODUCT_ID, dispatchOptions).catch(() => {});
}
const unsubscribers: (() => void)[] = [];
export function initWebhookSubscriber(): void {
const bus = getEventBus();
unsubscribers.push(
bus.on('timer.created', (p) => dispatchEvent('timer.created', p)),
bus.on('timer.fired', (p) => dispatchEvent('timer.fired', p)),
bus.on('timer.completed', (p) => dispatchEvent('timer.completed', p)),
bus.on('routine.started', (p) => dispatchEvent('routine.started', p)),
bus.on('routine.completed', (p) => dispatchEvent('routine.completed', p)),
bus.on('household.created', (p) => dispatchEvent('household.created', p)),
);
}
export function stopWebhookSubscriber(): void {
for (const unsub of unsubscribers) unsub();
unsubscribers.length = 0;
}

View File

@ -22,6 +22,7 @@ import { generateContextMessage, type ContextMessageInput } from './lib/ai-conte
import { z } from 'zod';
import { getBufferedEvents, flushEvents } from './lib/telemetry.js';
import { PRODUCT_ID, productConfig } from './lib/product-config.js';
import { initWebhookSubscriber, stopWebhookSubscriber } from './lib/webhook-subscriber.js';
import { jwtVerify } from 'jose';
import type { JwtPayload } from './lib/request-context.js';
@ -103,4 +104,7 @@ app.get('/api/diagnostics/config', async () => ({
await initEncryption(PRODUCT_ID, app.log);
initWebhookSubscriber();
app.addHook('onClose', () => stopWebhookSubscriber());
await startService(app, { port: config.PORT, host: config.HOST });

6868
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff