learning_ai_common_plat/services/platform-service/src/lib/event-dispatcher.ts
saravanakumardb1 ca6a4d41d8 feat(flags): production-grade feature flag system — multi-variate, segments, audit, SSE, scheduling, prerequisites
- types.ts: multi-variate flags (boolean/string/number/JSON), targeting rules with 18 operators, scheduling (enableAt/disableAt/gradual rollout), prerequisites, segments, audit log, evaluation context
- evaluator.ts: pure evaluation engine — schedule checking, prerequisite dependencies (circular detection), individual targeting, targeting rules (AND clauses), segment matching, percentage rollout (FNV-1a), OS version/platform/region filtering
- repository.ts: 3 collections — feature_flags, flag_segments, flag_audit_log
- routes.ts: 18 endpoints — flag CRUD, toggle, archive, kill switch (with tag filter), segment CRUD, audit log, POST /flags/evaluate (multi-variate), SSE /flags/stream, legacy /flags/poll backward-compat
- seed.ts: updated to produce full FeatureFlagDoc with variations, version
- flags.test.ts: 63 tests — schema validation, evaluator engine, targeting rules, segments, prerequisites, scheduling, gradual rollouts, multi-variate, version comparison, deterministic hashing
- @bytelyst/events: added flag.created, flag.updated, flag.deleted, flag.kill_switch event types
- @bytelyst/feature-flag-client: multi-variate support (getValue, getEvaluation, getAllEvaluations), SSE streaming mode, onChange listeners, auth token injection
- event-dispatcher.ts + webhooks/types.ts: wired new flag events
2026-03-21 11:44:49 -07:00

138 lines
3.8 KiB
TypeScript

import { randomUUID } from 'node:crypto';
import type { PlatformEventName, PlatformEvent } from '@bytelyst/events';
import { bus } from './event-bus.js';
import { storeEvent } from './event-store-bridge.js';
interface DispatcherOptions {
maxRetries?: number;
productId?: string;
}
type HandlerFn = (
topic: string,
payload: Record<string, unknown>,
productId: string
) => Promise<void>;
const handlers = new Map<string, HandlerFn[]>();
export function registerDispatchHandler(handlerType: string, fn: HandlerFn): void {
const existing = handlers.get(handlerType) ?? [];
existing.push(fn);
handlers.set(handlerType, existing);
}
export function clearDispatchHandlers(): void {
handlers.clear();
}
export async function dispatchEvent(
topic: string,
payload: Record<string, unknown>,
productId: string,
options: DispatcherOptions = {}
): Promise<{ dispatched: number; errors: number }> {
const maxRetries = options.maxRetries ?? 3;
let dispatched = 0;
let errors = 0;
// Store event for replay capability
const eventId = `evt_${randomUUID()}`;
try {
await storeEvent(productId, topic, payload, eventId);
} catch {
// best-effort storage — don't block dispatch
}
// Look up active subscriptions for this topic
const { findActiveByTopic } = await import('../modules/event-subscriptions/repository.js');
const subscriptions = await findActiveByTopic(productId, topic).catch(
() => [] as Awaited<ReturnType<typeof findActiveByTopic>>
);
for (const sub of subscriptions) {
// Apply filter expression if present
if (sub.filterExpression) {
try {
const filterFn = new Function('payload', `return ${sub.filterExpression}`);
if (!filterFn(payload)) continue;
} catch {
// skip invalid filter expressions
continue;
}
}
const handlerFns = handlers.get(sub.handlerType) ?? [];
for (const fn of handlerFns) {
let attempt = 0;
let success = false;
while (attempt < maxRetries && !success) {
attempt++;
try {
await fn(topic, payload, productId);
success = true;
dispatched++;
} catch {
if (attempt >= maxRetries) {
errors++;
// Send to DLQ
try {
const { createDlqEntry } =
await import('../modules/event-subscriptions/repository.js');
await createDlqEntry({
id: `dlq_${randomUUID()}`,
productId,
topic,
payload,
error: `Failed after ${maxRetries} attempts`,
attempts: attempt,
subscriptionId: sub.id,
originalEventId: eventId,
createdAt: new Date().toISOString(),
});
} catch {
// best-effort DLQ
}
}
}
}
}
}
return { dispatched, errors };
}
export function wireDispatcherToBus(): void {
// Subscribe to all known events and dispatch them
const eventNames: PlatformEventName[] = [
'user.created',
'user.deleted',
'subscription.created',
'subscription.changed',
'subscription.canceled',
'payment.succeeded',
'payment.failed',
'job.completed',
'job.failed',
'flag.toggled',
'flag.created',
'flag.updated',
'flag.deleted',
'flag.kill_switch',
'license.activated',
'license.expired',
'invitation.redeemed',
'referral.completed',
'waitlist.joined',
];
for (const eventName of eventNames) {
bus.on(eventName, async (event: PlatformEvent) => {
const productId = (event.payload as Record<string, unknown>).productId as string | undefined;
if (productId) {
await dispatchEvent(event.type, event.payload as Record<string, unknown>, productId);
}
});
}
}