feat(platform): Phase 1 — Durable Event Bus + Worker Runtime
- New module: event-subscriptions/ (types, repository, routes, 15 tests) - Subscription CRUD: create/list/get/update/delete event subscriptions - DLQ: list/retry/delete/purge dead-letter queue entries - Event replay: POST /events/replay by topic + time range - New lib: event-dispatcher.ts — subscription-driven dispatch with retry + DLQ - New lib: event-store-bridge.ts — persistent event log for replay capability - Worker runtime hardening (jobs/runner.ts): - Concurrency limit (MAX_CONCURRENT_JOBS=5) - Stuck-job recovery (10min threshold) - Graceful shutdown (30s drain) - Active job tracking + diagnostics (getActiveJobs/getActiveJobCount) - Per-job dedup (skip if already running) - Wired dispatcher + event-subscriptions into server.ts startup - Cosmos containers: event_subscriptions, event_dlq, event_log - 1,293 tests passing (15 new)
This commit is contained in:
parent
17f5671595
commit
15e24e5710
133
services/platform-service/src/lib/event-dispatcher.ts
Normal file
133
services/platform-service/src/lib/event-dispatcher.ts
Normal file
@ -0,0 +1,133 @@
|
|||||||
|
import { randomUUID } from 'node:crypto';
|
||||||
|
import type { PlatformEventName, PlatformEvent } from '@bytelyst/events';
|
||||||
|
import { bus } from './event-bus.js';
|
||||||
|
import { storeEvent } from './event-store-bridge.js';
|
||||||
|
|
||||||
|
interface DispatcherOptions {
|
||||||
|
maxRetries?: number;
|
||||||
|
productId?: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
type HandlerFn = (
|
||||||
|
topic: string,
|
||||||
|
payload: Record<string, unknown>,
|
||||||
|
productId: string
|
||||||
|
) => Promise<void>;
|
||||||
|
|
||||||
|
const handlers = new Map<string, HandlerFn[]>();
|
||||||
|
|
||||||
|
export function registerDispatchHandler(handlerType: string, fn: HandlerFn): void {
|
||||||
|
const existing = handlers.get(handlerType) ?? [];
|
||||||
|
existing.push(fn);
|
||||||
|
handlers.set(handlerType, existing);
|
||||||
|
}
|
||||||
|
|
||||||
|
export function clearDispatchHandlers(): void {
|
||||||
|
handlers.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function dispatchEvent(
|
||||||
|
topic: string,
|
||||||
|
payload: Record<string, unknown>,
|
||||||
|
productId: string,
|
||||||
|
options: DispatcherOptions = {}
|
||||||
|
): Promise<{ dispatched: number; errors: number }> {
|
||||||
|
const maxRetries = options.maxRetries ?? 3;
|
||||||
|
let dispatched = 0;
|
||||||
|
let errors = 0;
|
||||||
|
|
||||||
|
// Store event for replay capability
|
||||||
|
const eventId = `evt_${randomUUID()}`;
|
||||||
|
try {
|
||||||
|
await storeEvent(productId, topic, payload, eventId);
|
||||||
|
} catch {
|
||||||
|
// best-effort storage — don't block dispatch
|
||||||
|
}
|
||||||
|
|
||||||
|
// Look up active subscriptions for this topic
|
||||||
|
const { findActiveByTopic } = await import('../modules/event-subscriptions/repository.js');
|
||||||
|
const subscriptions = await findActiveByTopic(productId, topic).catch(
|
||||||
|
() => [] as Awaited<ReturnType<typeof findActiveByTopic>>
|
||||||
|
);
|
||||||
|
|
||||||
|
for (const sub of subscriptions) {
|
||||||
|
// Apply filter expression if present
|
||||||
|
if (sub.filterExpression) {
|
||||||
|
try {
|
||||||
|
const filterFn = new Function('payload', `return ${sub.filterExpression}`);
|
||||||
|
if (!filterFn(payload)) continue;
|
||||||
|
} catch {
|
||||||
|
// skip invalid filter expressions
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const handlerFns = handlers.get(sub.handlerType) ?? [];
|
||||||
|
for (const fn of handlerFns) {
|
||||||
|
let attempt = 0;
|
||||||
|
let success = false;
|
||||||
|
while (attempt < maxRetries && !success) {
|
||||||
|
attempt++;
|
||||||
|
try {
|
||||||
|
await fn(topic, payload, productId);
|
||||||
|
success = true;
|
||||||
|
dispatched++;
|
||||||
|
} catch {
|
||||||
|
if (attempt >= maxRetries) {
|
||||||
|
errors++;
|
||||||
|
// Send to DLQ
|
||||||
|
try {
|
||||||
|
const { createDlqEntry } =
|
||||||
|
await import('../modules/event-subscriptions/repository.js');
|
||||||
|
await createDlqEntry({
|
||||||
|
id: `dlq_${randomUUID()}`,
|
||||||
|
productId,
|
||||||
|
topic,
|
||||||
|
payload,
|
||||||
|
error: `Failed after ${maxRetries} attempts`,
|
||||||
|
attempts: attempt,
|
||||||
|
subscriptionId: sub.id,
|
||||||
|
originalEventId: eventId,
|
||||||
|
createdAt: new Date().toISOString(),
|
||||||
|
});
|
||||||
|
} catch {
|
||||||
|
// best-effort DLQ
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return { dispatched, errors };
|
||||||
|
}
|
||||||
|
|
||||||
|
export function wireDispatcherToBus(): void {
|
||||||
|
// Subscribe to all known events and dispatch them
|
||||||
|
const eventNames: PlatformEventName[] = [
|
||||||
|
'user.created',
|
||||||
|
'user.deleted',
|
||||||
|
'subscription.created',
|
||||||
|
'subscription.changed',
|
||||||
|
'subscription.canceled',
|
||||||
|
'payment.succeeded',
|
||||||
|
'payment.failed',
|
||||||
|
'job.completed',
|
||||||
|
'job.failed',
|
||||||
|
'flag.toggled',
|
||||||
|
'license.activated',
|
||||||
|
'license.expired',
|
||||||
|
'invitation.redeemed',
|
||||||
|
'referral.completed',
|
||||||
|
'waitlist.joined',
|
||||||
|
];
|
||||||
|
|
||||||
|
for (const eventName of eventNames) {
|
||||||
|
bus.on(eventName, async (event: PlatformEvent) => {
|
||||||
|
const productId = (event.payload as Record<string, unknown>).productId as string | undefined;
|
||||||
|
if (productId) {
|
||||||
|
await dispatchEvent(event.type, event.payload as Record<string, unknown>, productId);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
54
services/platform-service/src/lib/event-store-bridge.ts
Normal file
54
services/platform-service/src/lib/event-store-bridge.ts
Normal file
@ -0,0 +1,54 @@
|
|||||||
|
import { getCollection } from './datastore.js';
|
||||||
|
|
||||||
|
interface StoredEvent {
|
||||||
|
id: string;
|
||||||
|
productId: string;
|
||||||
|
topic: string;
|
||||||
|
payload: Record<string, unknown>;
|
||||||
|
source?: string;
|
||||||
|
createdAt: string;
|
||||||
|
_ts?: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
function eventLogCollection() {
|
||||||
|
return getCollection<StoredEvent>('event_log', '/productId');
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function storeEvent(
|
||||||
|
productId: string,
|
||||||
|
topic: string,
|
||||||
|
payload: Record<string, unknown>,
|
||||||
|
eventId: string,
|
||||||
|
source?: string
|
||||||
|
): Promise<StoredEvent> {
|
||||||
|
const doc: StoredEvent = {
|
||||||
|
id: eventId,
|
||||||
|
productId,
|
||||||
|
topic,
|
||||||
|
payload,
|
||||||
|
source,
|
||||||
|
createdAt: new Date().toISOString(),
|
||||||
|
};
|
||||||
|
return eventLogCollection().create(doc);
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function listEvents(
|
||||||
|
productId: string,
|
||||||
|
options: { topic?: string; since?: string; until?: string; limit?: number }
|
||||||
|
): Promise<StoredEvent[]> {
|
||||||
|
const events = await eventLogCollection().findMany({
|
||||||
|
filter: {
|
||||||
|
productId,
|
||||||
|
...(options.topic ? { topic: options.topic } : {}),
|
||||||
|
},
|
||||||
|
sort: { createdAt: -1 },
|
||||||
|
limit: options.limit ?? 100,
|
||||||
|
});
|
||||||
|
|
||||||
|
// Filter by time range in-memory (Cosmos SDK filter limitations)
|
||||||
|
return events.filter(e => {
|
||||||
|
if (options.since && e.createdAt < options.since) return false;
|
||||||
|
if (options.until && e.createdAt > options.until) return false;
|
||||||
|
return true;
|
||||||
|
});
|
||||||
|
}
|
||||||
@ -0,0 +1,158 @@
|
|||||||
|
import { randomUUID } from 'node:crypto';
|
||||||
|
import { getCollection } from '../../lib/datastore.js';
|
||||||
|
import type {
|
||||||
|
EventSubscriptionDoc,
|
||||||
|
CreateEventSubscriptionInput,
|
||||||
|
UpdateEventSubscriptionInput,
|
||||||
|
ListEventSubscriptionsQuery,
|
||||||
|
DlqEntryDoc,
|
||||||
|
ListDlqQuery,
|
||||||
|
} from './types.js';
|
||||||
|
|
||||||
|
function subsCollection() {
|
||||||
|
return getCollection<EventSubscriptionDoc>('event_subscriptions', '/productId');
|
||||||
|
}
|
||||||
|
|
||||||
|
function dlqCollection() {
|
||||||
|
return getCollection<DlqEntryDoc>('event_dlq', '/productId');
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Subscription CRUD ───────────────────────────────────────
|
||||||
|
|
||||||
|
export async function create(
|
||||||
|
productId: string,
|
||||||
|
input: CreateEventSubscriptionInput,
|
||||||
|
createdBy: string
|
||||||
|
): Promise<EventSubscriptionDoc> {
|
||||||
|
const now = new Date().toISOString();
|
||||||
|
const doc: EventSubscriptionDoc = {
|
||||||
|
id: `esub_${randomUUID()}`,
|
||||||
|
productId,
|
||||||
|
topic: input.topic,
|
||||||
|
handlerType: input.handlerType,
|
||||||
|
status: 'active',
|
||||||
|
config: input.config,
|
||||||
|
filterExpression: input.filterExpression,
|
||||||
|
description: input.description,
|
||||||
|
createdBy,
|
||||||
|
createdAt: now,
|
||||||
|
updatedAt: now,
|
||||||
|
};
|
||||||
|
return subsCollection().create(doc);
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function getById(
|
||||||
|
id: string,
|
||||||
|
productId: string
|
||||||
|
): Promise<EventSubscriptionDoc | undefined> {
|
||||||
|
try {
|
||||||
|
const doc = await subsCollection().findById(id, productId);
|
||||||
|
return doc ?? undefined;
|
||||||
|
} catch {
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function list(
|
||||||
|
productId: string,
|
||||||
|
query: ListEventSubscriptionsQuery
|
||||||
|
): Promise<EventSubscriptionDoc[]> {
|
||||||
|
return subsCollection().findMany({
|
||||||
|
filter: {
|
||||||
|
productId,
|
||||||
|
...(query.topic ? { topic: query.topic } : {}),
|
||||||
|
...(query.handlerType ? { handlerType: query.handlerType } : {}),
|
||||||
|
...(query.status ? { status: query.status } : {}),
|
||||||
|
},
|
||||||
|
sort: { createdAt: -1 },
|
||||||
|
limit: query.limit,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function update(
|
||||||
|
id: string,
|
||||||
|
productId: string,
|
||||||
|
input: UpdateEventSubscriptionInput
|
||||||
|
): Promise<EventSubscriptionDoc | undefined> {
|
||||||
|
const existing = await getById(id, productId);
|
||||||
|
if (!existing) return undefined;
|
||||||
|
|
||||||
|
const updates: Partial<EventSubscriptionDoc> = {
|
||||||
|
...(input.status !== undefined && { status: input.status }),
|
||||||
|
...(input.config !== undefined && { config: input.config }),
|
||||||
|
...(input.filterExpression !== undefined && { filterExpression: input.filterExpression }),
|
||||||
|
...(input.description !== undefined && { description: input.description }),
|
||||||
|
updatedAt: new Date().toISOString(),
|
||||||
|
};
|
||||||
|
|
||||||
|
return subsCollection().update(id, productId, updates);
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function remove(id: string, productId: string): Promise<boolean> {
|
||||||
|
try {
|
||||||
|
await subsCollection().delete(id, productId);
|
||||||
|
return true;
|
||||||
|
} catch {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Active subscription lookup by topic ─────────────────────
|
||||||
|
|
||||||
|
export async function findActiveByTopic(
|
||||||
|
productId: string,
|
||||||
|
topic: string
|
||||||
|
): Promise<EventSubscriptionDoc[]> {
|
||||||
|
return subsCollection().findMany({
|
||||||
|
filter: { productId, topic, status: 'active' },
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Dead-Letter Queue ───────────────────────────────────────
|
||||||
|
|
||||||
|
export async function createDlqEntry(entry: DlqEntryDoc): Promise<DlqEntryDoc> {
|
||||||
|
return dlqCollection().create(entry);
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function listDlq(productId: string, query: ListDlqQuery): Promise<DlqEntryDoc[]> {
|
||||||
|
return dlqCollection().findMany({
|
||||||
|
filter: {
|
||||||
|
productId,
|
||||||
|
...(query.topic ? { topic: query.topic } : {}),
|
||||||
|
},
|
||||||
|
sort: { createdAt: -1 },
|
||||||
|
limit: query.limit,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function getDlqEntry(id: string, productId: string): Promise<DlqEntryDoc | undefined> {
|
||||||
|
try {
|
||||||
|
const doc = await dlqCollection().findById(id, productId);
|
||||||
|
return doc ?? undefined;
|
||||||
|
} catch {
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function deleteDlqEntry(id: string, productId: string): Promise<boolean> {
|
||||||
|
try {
|
||||||
|
await dlqCollection().delete(id, productId);
|
||||||
|
return true;
|
||||||
|
} catch {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function purgeDlq(productId: string): Promise<number> {
|
||||||
|
const entries = await dlqCollection().findMany({ filter: { productId }, limit: 1000 });
|
||||||
|
let count = 0;
|
||||||
|
for (const entry of entries) {
|
||||||
|
try {
|
||||||
|
await dlqCollection().delete(entry.id, productId);
|
||||||
|
count++;
|
||||||
|
} catch {
|
||||||
|
// best-effort
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return count;
|
||||||
|
}
|
||||||
@ -0,0 +1,230 @@
|
|||||||
|
import Fastify from 'fastify';
|
||||||
|
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
|
||||||
|
|
||||||
|
const repoMock = {
|
||||||
|
list: vi.fn(),
|
||||||
|
create: vi.fn(),
|
||||||
|
getById: vi.fn(),
|
||||||
|
update: vi.fn(),
|
||||||
|
remove: vi.fn(),
|
||||||
|
findActiveByTopic: vi.fn(),
|
||||||
|
createDlqEntry: vi.fn(),
|
||||||
|
listDlq: vi.fn(),
|
||||||
|
getDlqEntry: vi.fn(),
|
||||||
|
deleteDlqEntry: vi.fn(),
|
||||||
|
purgeDlq: vi.fn(),
|
||||||
|
};
|
||||||
|
|
||||||
|
const busMock = {
|
||||||
|
emit: vi.fn(),
|
||||||
|
on: vi.fn(),
|
||||||
|
};
|
||||||
|
|
||||||
|
const storeBridgeMock = {
|
||||||
|
listEvents: vi.fn(),
|
||||||
|
};
|
||||||
|
|
||||||
|
vi.mock('./repository.js', () => repoMock);
|
||||||
|
vi.mock('../../lib/event-bus.js', () => ({ bus: busMock }));
|
||||||
|
vi.mock('../../lib/event-store-bridge.js', () => storeBridgeMock);
|
||||||
|
|
||||||
|
async function buildApp(payload?: { sub: string; productId: string; role?: string }) {
|
||||||
|
const { eventSubscriptionRoutes } = await import('./routes.js');
|
||||||
|
const app = Fastify({ logger: false });
|
||||||
|
if (payload) {
|
||||||
|
app.addHook('onRequest', async req => {
|
||||||
|
req.jwtPayload = payload;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
await app.register(eventSubscriptionRoutes, { prefix: '/api' });
|
||||||
|
return app;
|
||||||
|
}
|
||||||
|
|
||||||
|
describe('eventSubscriptionRoutes', () => {
|
||||||
|
beforeEach(() => {
|
||||||
|
vi.clearAllMocks();
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
vi.restoreAllMocks();
|
||||||
|
});
|
||||||
|
|
||||||
|
// ── Subscription CRUD ───────────────────────────────────
|
||||||
|
|
||||||
|
it('GET /event-subscriptions lists subscriptions', async () => {
|
||||||
|
repoMock.list.mockResolvedValue([{ id: 'esub_1', topic: 'user.created' }]);
|
||||||
|
const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
|
||||||
|
|
||||||
|
const res = await app.inject({ method: 'GET', url: '/api/event-subscriptions' });
|
||||||
|
expect(res.statusCode).toBe(200);
|
||||||
|
expect(repoMock.list).toHaveBeenCalledWith('lysnrai', expect.objectContaining({ limit: 50 }));
|
||||||
|
});
|
||||||
|
|
||||||
|
it('POST /event-subscriptions creates a subscription', async () => {
|
||||||
|
repoMock.create.mockResolvedValue({ id: 'esub_1', topic: 'user.created', status: 'active' });
|
||||||
|
const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
|
||||||
|
|
||||||
|
const res = await app.inject({
|
||||||
|
method: 'POST',
|
||||||
|
url: '/api/event-subscriptions',
|
||||||
|
payload: {
|
||||||
|
topic: 'user.created',
|
||||||
|
handlerType: 'webhook',
|
||||||
|
config: { url: 'https://example.com' },
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(res.statusCode).toBe(200);
|
||||||
|
expect(repoMock.create).toHaveBeenCalledWith(
|
||||||
|
'lysnrai',
|
||||||
|
expect.objectContaining({ topic: 'user.created', handlerType: 'webhook' }),
|
||||||
|
'admin_1'
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('POST /event-subscriptions rejects invalid input', async () => {
|
||||||
|
const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
|
||||||
|
|
||||||
|
const res = await app.inject({
|
||||||
|
method: 'POST',
|
||||||
|
url: '/api/event-subscriptions',
|
||||||
|
payload: {},
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(res.statusCode).toBe(400);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('GET /event-subscriptions/:id returns 404 for unknown', async () => {
|
||||||
|
repoMock.getById.mockResolvedValue(undefined);
|
||||||
|
const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
|
||||||
|
|
||||||
|
const res = await app.inject({ method: 'GET', url: '/api/event-subscriptions/esub_xxx' });
|
||||||
|
expect(res.statusCode).toBe(404);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('PATCH /event-subscriptions/:id updates a subscription', async () => {
|
||||||
|
repoMock.update.mockResolvedValue({ id: 'esub_1', status: 'paused' });
|
||||||
|
const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
|
||||||
|
|
||||||
|
const res = await app.inject({
|
||||||
|
method: 'PATCH',
|
||||||
|
url: '/api/event-subscriptions/esub_1',
|
||||||
|
payload: { status: 'paused' },
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(res.statusCode).toBe(200);
|
||||||
|
expect(repoMock.update).toHaveBeenCalledWith(
|
||||||
|
'esub_1',
|
||||||
|
'lysnrai',
|
||||||
|
expect.objectContaining({ status: 'paused' })
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('DELETE /event-subscriptions/:id deletes a subscription', async () => {
|
||||||
|
repoMock.remove.mockResolvedValue(true);
|
||||||
|
const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
|
||||||
|
|
||||||
|
const res = await app.inject({ method: 'DELETE', url: '/api/event-subscriptions/esub_1' });
|
||||||
|
expect(res.statusCode).toBe(200);
|
||||||
|
expect(JSON.parse(res.body)).toEqual({ deleted: true });
|
||||||
|
});
|
||||||
|
|
||||||
|
it('DELETE /event-subscriptions/:id returns 404 for unknown', async () => {
|
||||||
|
repoMock.remove.mockResolvedValue(false);
|
||||||
|
const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
|
||||||
|
|
||||||
|
const res = await app.inject({ method: 'DELETE', url: '/api/event-subscriptions/esub_xxx' });
|
||||||
|
expect(res.statusCode).toBe(404);
|
||||||
|
});
|
||||||
|
|
||||||
|
// ── Auth ────────────────────────────────────────────────
|
||||||
|
|
||||||
|
it('rejects unauthenticated requests', async () => {
|
||||||
|
const app = await buildApp();
|
||||||
|
const res = await app.inject({ method: 'GET', url: '/api/event-subscriptions' });
|
||||||
|
expect(res.statusCode).toBe(403);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('rejects non-admin users', async () => {
|
||||||
|
const app = await buildApp({ sub: 'user_1', productId: 'lysnrai', role: 'viewer' });
|
||||||
|
const res = await app.inject({ method: 'GET', url: '/api/event-subscriptions' });
|
||||||
|
expect(res.statusCode).toBe(403);
|
||||||
|
});
|
||||||
|
|
||||||
|
// ── DLQ ─────────────────────────────────────────────────
|
||||||
|
|
||||||
|
it('GET /event-dlq lists DLQ entries', async () => {
|
||||||
|
repoMock.listDlq.mockResolvedValue([{ id: 'dlq_1', topic: 'user.created' }]);
|
||||||
|
const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
|
||||||
|
|
||||||
|
const res = await app.inject({ method: 'GET', url: '/api/event-dlq' });
|
||||||
|
expect(res.statusCode).toBe(200);
|
||||||
|
expect(repoMock.listDlq).toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('POST /event-dlq/:id/retry re-emits and removes entry', async () => {
|
||||||
|
repoMock.getDlqEntry.mockResolvedValue({
|
||||||
|
id: 'dlq_1',
|
||||||
|
topic: 'user.created',
|
||||||
|
payload: { userId: 'u1', email: 'a@b.com', plan: 'free', productId: 'lysnrai' },
|
||||||
|
});
|
||||||
|
repoMock.deleteDlqEntry.mockResolvedValue(true);
|
||||||
|
busMock.emit.mockResolvedValue({ results: [] });
|
||||||
|
const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
|
||||||
|
|
||||||
|
const res = await app.inject({ method: 'POST', url: '/api/event-dlq/dlq_1/retry' });
|
||||||
|
expect(res.statusCode).toBe(200);
|
||||||
|
expect(busMock.emit).toHaveBeenCalledWith('user.created', expect.any(Object));
|
||||||
|
expect(repoMock.deleteDlqEntry).toHaveBeenCalledWith('dlq_1', 'lysnrai');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('POST /event-dlq/:id/retry returns 404 for unknown', async () => {
|
||||||
|
repoMock.getDlqEntry.mockResolvedValue(undefined);
|
||||||
|
const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
|
||||||
|
|
||||||
|
const res = await app.inject({ method: 'POST', url: '/api/event-dlq/dlq_xxx/retry' });
|
||||||
|
expect(res.statusCode).toBe(404);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('POST /event-dlq/purge purges all entries', async () => {
|
||||||
|
repoMock.purgeDlq.mockResolvedValue(5);
|
||||||
|
const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
|
||||||
|
|
||||||
|
const res = await app.inject({ method: 'POST', url: '/api/event-dlq/purge' });
|
||||||
|
expect(res.statusCode).toBe(200);
|
||||||
|
expect(JSON.parse(res.body)).toEqual({ purged: 5 });
|
||||||
|
});
|
||||||
|
|
||||||
|
// ── Event Replay ────────────────────────────────────────
|
||||||
|
|
||||||
|
it('POST /events/replay replays stored events', async () => {
|
||||||
|
storeBridgeMock.listEvents.mockResolvedValue([
|
||||||
|
{ id: 'evt_1', topic: 'user.created', payload: { userId: 'u1', productId: 'lysnrai' } },
|
||||||
|
{ id: 'evt_2', topic: 'user.created', payload: { userId: 'u2', productId: 'lysnrai' } },
|
||||||
|
]);
|
||||||
|
busMock.emit.mockResolvedValue({ results: [] });
|
||||||
|
const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
|
||||||
|
|
||||||
|
const res = await app.inject({
|
||||||
|
method: 'POST',
|
||||||
|
url: '/api/events/replay',
|
||||||
|
payload: { topic: 'user.created', since: '2026-01-01' },
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(res.statusCode).toBe(200);
|
||||||
|
expect(JSON.parse(res.body)).toEqual({ replayed: 2, topic: 'user.created' });
|
||||||
|
expect(busMock.emit).toHaveBeenCalledTimes(2);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('POST /events/replay requires topic', async () => {
|
||||||
|
const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
|
||||||
|
|
||||||
|
const res = await app.inject({
|
||||||
|
method: 'POST',
|
||||||
|
url: '/api/events/replay',
|
||||||
|
payload: {},
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(res.statusCode).toBe(400);
|
||||||
|
});
|
||||||
|
});
|
||||||
@ -0,0 +1,141 @@
|
|||||||
|
import type { FastifyInstance } from 'fastify';
|
||||||
|
import type { PlatformEventName } from '@bytelyst/events';
|
||||||
|
import { BadRequestError, ForbiddenError, NotFoundError } from '../../lib/errors.js';
|
||||||
|
import { bus } from '../../lib/event-bus.js';
|
||||||
|
import { listEvents } from '../../lib/event-store-bridge.js';
|
||||||
|
import {
|
||||||
|
CreateEventSubscriptionSchema,
|
||||||
|
UpdateEventSubscriptionSchema,
|
||||||
|
ListEventSubscriptionsQuerySchema,
|
||||||
|
ListDlqQuerySchema,
|
||||||
|
} from './types.js';
|
||||||
|
import * as repo from './repository.js';
|
||||||
|
|
||||||
|
function requireAdmin(req: { jwtPayload?: { sub?: string; role?: string; productId?: string } }): {
|
||||||
|
userId: string;
|
||||||
|
productId: string;
|
||||||
|
} {
|
||||||
|
const payload = req.jwtPayload;
|
||||||
|
if (!payload?.sub) throw new ForbiddenError('Authentication required');
|
||||||
|
if (!payload.role || !['super_admin', 'admin'].includes(payload.role)) {
|
||||||
|
throw new ForbiddenError('Admin access required');
|
||||||
|
}
|
||||||
|
return {
|
||||||
|
userId: payload.sub,
|
||||||
|
productId: payload.productId ?? process.env.DEFAULT_PRODUCT_ID ?? 'lysnrai',
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function eventSubscriptionRoutes(app: FastifyInstance) {
|
||||||
|
// ── Subscription CRUD ─────────────────────────────────────
|
||||||
|
|
||||||
|
app.get('/event-subscriptions', async req => {
|
||||||
|
const access = requireAdmin(req);
|
||||||
|
const parsed = ListEventSubscriptionsQuerySchema.safeParse(req.query);
|
||||||
|
if (!parsed.success) {
|
||||||
|
throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; '));
|
||||||
|
}
|
||||||
|
return repo.list(access.productId, parsed.data);
|
||||||
|
});
|
||||||
|
|
||||||
|
app.post('/event-subscriptions', async req => {
|
||||||
|
const access = requireAdmin(req);
|
||||||
|
const parsed = CreateEventSubscriptionSchema.safeParse(req.body);
|
||||||
|
if (!parsed.success) {
|
||||||
|
throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; '));
|
||||||
|
}
|
||||||
|
return repo.create(access.productId, parsed.data, access.userId);
|
||||||
|
});
|
||||||
|
|
||||||
|
app.get('/event-subscriptions/:id', async req => {
|
||||||
|
const access = requireAdmin(req);
|
||||||
|
const { id } = req.params as { id: string };
|
||||||
|
const sub = await repo.getById(id, access.productId);
|
||||||
|
if (!sub) throw new NotFoundError(`Event subscription '${id}' not found`);
|
||||||
|
return sub;
|
||||||
|
});
|
||||||
|
|
||||||
|
app.patch('/event-subscriptions/:id', async req => {
|
||||||
|
const access = requireAdmin(req);
|
||||||
|
const { id } = req.params as { id: string };
|
||||||
|
const parsed = UpdateEventSubscriptionSchema.safeParse(req.body);
|
||||||
|
if (!parsed.success) {
|
||||||
|
throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; '));
|
||||||
|
}
|
||||||
|
const updated = await repo.update(id, access.productId, parsed.data);
|
||||||
|
if (!updated) throw new NotFoundError(`Event subscription '${id}' not found`);
|
||||||
|
return updated;
|
||||||
|
});
|
||||||
|
|
||||||
|
app.delete('/event-subscriptions/:id', async req => {
|
||||||
|
const access = requireAdmin(req);
|
||||||
|
const { id } = req.params as { id: string };
|
||||||
|
const deleted = await repo.remove(id, access.productId);
|
||||||
|
if (!deleted) throw new NotFoundError(`Event subscription '${id}' not found`);
|
||||||
|
return { deleted: true };
|
||||||
|
});
|
||||||
|
|
||||||
|
// ── Dead-Letter Queue ─────────────────────────────────────
|
||||||
|
|
||||||
|
app.get('/event-dlq', async req => {
|
||||||
|
const access = requireAdmin(req);
|
||||||
|
const parsed = ListDlqQuerySchema.safeParse(req.query);
|
||||||
|
if (!parsed.success) {
|
||||||
|
throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; '));
|
||||||
|
}
|
||||||
|
return repo.listDlq(access.productId, parsed.data);
|
||||||
|
});
|
||||||
|
|
||||||
|
app.post('/event-dlq/:id/retry', async req => {
|
||||||
|
const access = requireAdmin(req);
|
||||||
|
const { id } = req.params as { id: string };
|
||||||
|
const entry = await repo.getDlqEntry(id, access.productId);
|
||||||
|
if (!entry) throw new NotFoundError(`DLQ entry '${id}' not found`);
|
||||||
|
|
||||||
|
// Re-emit via bus — cast topic since stored events are already validated
|
||||||
|
await bus.emit(
|
||||||
|
entry.topic as PlatformEventName,
|
||||||
|
entry.payload as Parameters<typeof bus.emit>[1]
|
||||||
|
);
|
||||||
|
|
||||||
|
await repo.deleteDlqEntry(id, access.productId);
|
||||||
|
return { retried: true, topic: entry.topic };
|
||||||
|
});
|
||||||
|
|
||||||
|
app.delete('/event-dlq/:id', async req => {
|
||||||
|
const access = requireAdmin(req);
|
||||||
|
const { id } = req.params as { id: string };
|
||||||
|
const deleted = await repo.deleteDlqEntry(id, access.productId);
|
||||||
|
if (!deleted) throw new NotFoundError(`DLQ entry '${id}' not found`);
|
||||||
|
return { deleted: true };
|
||||||
|
});
|
||||||
|
|
||||||
|
app.post('/event-dlq/purge', async req => {
|
||||||
|
const access = requireAdmin(req);
|
||||||
|
const count = await repo.purgeDlq(access.productId);
|
||||||
|
return { purged: count };
|
||||||
|
});
|
||||||
|
|
||||||
|
// ── Event Replay ──────────────────────────────────────────
|
||||||
|
|
||||||
|
app.post('/events/replay', async req => {
|
||||||
|
const access = requireAdmin(req);
|
||||||
|
const body = req.body as { topic?: string; since?: string; until?: string };
|
||||||
|
if (!body.topic) throw new BadRequestError('topic is required');
|
||||||
|
|
||||||
|
const events = await listEvents(access.productId, {
|
||||||
|
topic: body.topic,
|
||||||
|
since: body.since,
|
||||||
|
until: body.until,
|
||||||
|
limit: 100,
|
||||||
|
});
|
||||||
|
|
||||||
|
let replayed = 0;
|
||||||
|
for (const evt of events) {
|
||||||
|
await bus.emit(evt.topic as PlatformEventName, evt.payload as Parameters<typeof bus.emit>[1]);
|
||||||
|
replayed++;
|
||||||
|
}
|
||||||
|
|
||||||
|
return { replayed, topic: body.topic };
|
||||||
|
});
|
||||||
|
}
|
||||||
@ -0,0 +1,77 @@
|
|||||||
|
import { z } from 'zod';
|
||||||
|
|
||||||
|
export const EventHandlerTypeSchema = z.enum(['webhook', 'job', 'notification', 'sse']);
|
||||||
|
export const EventSubscriptionStatusSchema = z.enum(['active', 'paused', 'disabled']);
|
||||||
|
|
||||||
|
export const EventSubscriptionSchema = z.object({
|
||||||
|
id: z.string().min(1),
|
||||||
|
productId: z.string().min(1),
|
||||||
|
topic: z.string().min(1),
|
||||||
|
handlerType: EventHandlerTypeSchema,
|
||||||
|
status: EventSubscriptionStatusSchema,
|
||||||
|
config: z.record(z.unknown()).default({}),
|
||||||
|
filterExpression: z.string().optional(),
|
||||||
|
description: z.string().optional(),
|
||||||
|
createdBy: z.string().min(1),
|
||||||
|
createdAt: z.string(),
|
||||||
|
updatedAt: z.string(),
|
||||||
|
});
|
||||||
|
|
||||||
|
export type EventSubscriptionDoc = z.infer<typeof EventSubscriptionSchema> & {
|
||||||
|
_ts?: number;
|
||||||
|
_etag?: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
export const CreateEventSubscriptionSchema = z.object({
|
||||||
|
topic: z.string().min(1),
|
||||||
|
handlerType: EventHandlerTypeSchema,
|
||||||
|
config: z.record(z.unknown()).default({}),
|
||||||
|
filterExpression: z.string().optional(),
|
||||||
|
description: z.string().optional(),
|
||||||
|
});
|
||||||
|
|
||||||
|
export type CreateEventSubscriptionInput = z.infer<typeof CreateEventSubscriptionSchema>;
|
||||||
|
|
||||||
|
export const UpdateEventSubscriptionSchema = z.object({
|
||||||
|
status: EventSubscriptionStatusSchema.optional(),
|
||||||
|
config: z.record(z.unknown()).optional(),
|
||||||
|
filterExpression: z.string().optional(),
|
||||||
|
description: z.string().optional(),
|
||||||
|
});
|
||||||
|
|
||||||
|
export type UpdateEventSubscriptionInput = z.infer<typeof UpdateEventSubscriptionSchema>;
|
||||||
|
|
||||||
|
export const ListEventSubscriptionsQuerySchema = z.object({
|
||||||
|
topic: z.string().optional(),
|
||||||
|
handlerType: EventHandlerTypeSchema.optional(),
|
||||||
|
status: EventSubscriptionStatusSchema.optional(),
|
||||||
|
limit: z.coerce.number().min(1).max(100).default(50),
|
||||||
|
});
|
||||||
|
|
||||||
|
export type ListEventSubscriptionsQuery = z.infer<typeof ListEventSubscriptionsQuerySchema>;
|
||||||
|
|
||||||
|
// ── Dead-Letter Queue ────────────────────────────────────────
|
||||||
|
|
||||||
|
export const DlqEntrySchema = z.object({
|
||||||
|
id: z.string().min(1),
|
||||||
|
productId: z.string().min(1),
|
||||||
|
topic: z.string().min(1),
|
||||||
|
payload: z.record(z.unknown()),
|
||||||
|
error: z.string(),
|
||||||
|
attempts: z.number().int().min(1),
|
||||||
|
subscriptionId: z.string().optional(),
|
||||||
|
originalEventId: z.string().optional(),
|
||||||
|
createdAt: z.string(),
|
||||||
|
});
|
||||||
|
|
||||||
|
export type DlqEntryDoc = z.infer<typeof DlqEntrySchema> & {
|
||||||
|
_ts?: number;
|
||||||
|
_etag?: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
export const ListDlqQuerySchema = z.object({
|
||||||
|
topic: z.string().optional(),
|
||||||
|
limit: z.coerce.number().min(1).max(100).default(50),
|
||||||
|
});
|
||||||
|
|
||||||
|
export type ListDlqQuery = z.infer<typeof ListDlqQuerySchema>;
|
||||||
@ -9,9 +9,34 @@ import type { JobDefinitionDoc, JobRunDoc, JobContext, JobResult } from './types
|
|||||||
// Uses a simple lock flag to prevent overlapping ticks.
|
// Uses a simple lock flag to prevent overlapping ticks.
|
||||||
|
|
||||||
const DEFAULT_PRODUCT_ID = 'lysnrai';
|
const DEFAULT_PRODUCT_ID = 'lysnrai';
|
||||||
|
const MAX_CONCURRENT_JOBS = 5;
|
||||||
|
const STUCK_JOB_THRESHOLD_MS = 600_000; // 10 minutes
|
||||||
|
|
||||||
let tickInterval: ReturnType<typeof globalThis.setInterval> | null = null;
|
let tickInterval: ReturnType<typeof globalThis.setInterval> | null = null;
|
||||||
let isRunning = false;
|
let isRunning = false;
|
||||||
|
let isShuttingDown = false;
|
||||||
|
|
||||||
|
// Track active job executions for concurrency + heartbeat
|
||||||
|
const activeJobs = new Map<string, { jobName: string; startedAt: number; runId: string }>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the number of currently running jobs.
|
||||||
|
*/
|
||||||
|
export function getActiveJobCount(): number {
|
||||||
|
return activeJobs.size;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get info about all active jobs (for diagnostics).
|
||||||
|
*/
|
||||||
|
export function getActiveJobs(): Array<{ jobName: string; runId: string; elapsedMs: number }> {
|
||||||
|
const now = Date.now();
|
||||||
|
return Array.from(activeJobs.values()).map(j => ({
|
||||||
|
jobName: j.jobName,
|
||||||
|
runId: j.runId,
|
||||||
|
elapsedMs: now - j.startedAt,
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Start the job runner tick loop. Evaluates every 60 seconds.
|
* Start the job runner tick loop. Evaluates every 60 seconds.
|
||||||
@ -22,8 +47,9 @@ export function startRunner(log: {
|
|||||||
error: (...a: unknown[]) => void;
|
error: (...a: unknown[]) => void;
|
||||||
}): void {
|
}): void {
|
||||||
if (tickInterval) return;
|
if (tickInterval) return;
|
||||||
|
isShuttingDown = false;
|
||||||
|
|
||||||
log.info('[jobs] Starting job runner (60s tick)');
|
log.info('[jobs] Starting job runner (60s tick, max concurrency: %d)', MAX_CONCURRENT_JOBS);
|
||||||
|
|
||||||
// Run immediately on start, then every 60s
|
// Run immediately on start, then every 60s
|
||||||
tick(log).catch(() => {});
|
tick(log).catch(() => {});
|
||||||
@ -33,13 +59,20 @@ export function startRunner(log: {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Stop the job runner.
|
* Stop the job runner gracefully. Waits for active jobs to complete (up to 30s).
|
||||||
*/
|
*/
|
||||||
export function stopRunner(): void {
|
export async function stopRunner(): Promise<void> {
|
||||||
|
isShuttingDown = true;
|
||||||
if (tickInterval) {
|
if (tickInterval) {
|
||||||
globalThis.clearInterval(tickInterval);
|
globalThis.clearInterval(tickInterval);
|
||||||
tickInterval = null;
|
tickInterval = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Wait for active jobs to finish (up to 30 seconds)
|
||||||
|
const deadline = Date.now() + 30_000;
|
||||||
|
while (activeJobs.size > 0 && Date.now() < deadline) {
|
||||||
|
await new Promise(resolve => globalThis.setTimeout(resolve, 500));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -54,6 +87,8 @@ async function tick(log: {
|
|||||||
isRunning = true;
|
isRunning = true;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
if (isShuttingDown) return;
|
||||||
|
|
||||||
const now = new Date();
|
const now = new Date();
|
||||||
// Round to current minute for matching
|
// Round to current minute for matching
|
||||||
now.setUTCSeconds(0, 0);
|
now.setUTCSeconds(0, 0);
|
||||||
@ -61,6 +96,17 @@ async function tick(log: {
|
|||||||
const registered = getRegisteredJobs();
|
const registered = getRegisteredJobs();
|
||||||
if (registered.length === 0) return;
|
if (registered.length === 0) return;
|
||||||
|
|
||||||
|
// Recover stuck jobs (running longer than threshold)
|
||||||
|
for (const [key, job] of activeJobs) {
|
||||||
|
if (Date.now() - job.startedAt > STUCK_JOB_THRESHOLD_MS) {
|
||||||
|
log.warn(
|
||||||
|
{ jobName: job.jobName, runId: job.runId, elapsedMs: Date.now() - job.startedAt },
|
||||||
|
'[jobs] Removing stuck job from active tracking'
|
||||||
|
);
|
||||||
|
activeJobs.delete(key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Get definitions from DB (or create defaults)
|
// Get definitions from DB (or create defaults)
|
||||||
let definitions: JobDefinitionDoc[];
|
let definitions: JobDefinitionDoc[];
|
||||||
try {
|
try {
|
||||||
@ -74,8 +120,20 @@ async function tick(log: {
|
|||||||
if (def.status !== 'enabled') continue;
|
if (def.status !== 'enabled') continue;
|
||||||
if (!getJobHandler(def.name)) continue;
|
if (!getJobHandler(def.name)) continue;
|
||||||
|
|
||||||
|
// Enforce concurrency limit
|
||||||
|
if (activeJobs.size >= MAX_CONCURRENT_JOBS) {
|
||||||
|
log.warn(
|
||||||
|
'[jobs] Concurrency limit reached (%d), skipping remaining jobs',
|
||||||
|
MAX_CONCURRENT_JOBS
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip if this job is already running
|
||||||
|
if (activeJobs.has(def.name)) continue;
|
||||||
|
|
||||||
if (cronMatches(def.cronExpression, now)) {
|
if (cronMatches(def.cronExpression, now)) {
|
||||||
await executeJob(def, 'scheduler', log);
|
executeJob(def, 'scheduler', log).catch(() => {});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
@ -103,6 +161,9 @@ export async function executeJob(
|
|||||||
const runId = crypto.randomUUID();
|
const runId = crypto.randomUUID();
|
||||||
const startedAt = new Date().toISOString();
|
const startedAt = new Date().toISOString();
|
||||||
|
|
||||||
|
// Track this job as active
|
||||||
|
activeJobs.set(def.name, { jobName: def.name, startedAt: Date.now(), runId });
|
||||||
|
|
||||||
// Create run record
|
// Create run record
|
||||||
const run: JobRunDoc = {
|
const run: JobRunDoc = {
|
||||||
id: runId,
|
id: runId,
|
||||||
@ -194,6 +255,9 @@ export async function executeJob(
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Remove from active tracking
|
||||||
|
activeJobs.delete(def.name);
|
||||||
|
|
||||||
const durationMs = Date.now() - start;
|
const durationMs = Date.now() - start;
|
||||||
const completedAt = new Date().toISOString();
|
const completedAt = new Date().toISOString();
|
||||||
const status = result.success ? 'success' : 'failed';
|
const status = result.success ? 'success' : 'failed';
|
||||||
|
|||||||
@ -107,7 +107,9 @@ import { registerDiagnosticsSubscribers } from './modules/diagnostics/subscriber
|
|||||||
import { registerDeliverySubscribers } from './modules/delivery/subscribers.js';
|
import { registerDeliverySubscribers } from './modules/delivery/subscribers.js';
|
||||||
import { verifyToken } from './modules/auth/jwt.js';
|
import { verifyToken } from './modules/auth/jwt.js';
|
||||||
import { registerOptionalApiKeyContext } from './lib/api-key-auth.js';
|
import { registerOptionalApiKeyContext } from './lib/api-key-auth.js';
|
||||||
|
import { eventSubscriptionRoutes } from './modules/event-subscriptions/routes.js';
|
||||||
import { startEventBus, stopEventBus } from './lib/event-bus.js';
|
import { startEventBus, stopEventBus } from './lib/event-bus.js';
|
||||||
|
import { wireDispatcherToBus } from './lib/event-dispatcher.js';
|
||||||
|
|
||||||
await initCosmosIfNeeded();
|
await initCosmosIfNeeded();
|
||||||
await loadProductCache();
|
await loadProductCache();
|
||||||
@ -261,9 +263,13 @@ await app.register(retentionRoutes, { prefix: '/api' });
|
|||||||
await app.register(backupRoutes, { prefix: '/api' });
|
await app.register(backupRoutes, { prefix: '/api' });
|
||||||
await app.register(apiVersioningRoutes, { prefix: '/api' });
|
await app.register(apiVersioningRoutes, { prefix: '/api' });
|
||||||
|
|
||||||
|
// Event subscriptions + DLQ + replay
|
||||||
|
await app.register(eventSubscriptionRoutes, { prefix: '/api' });
|
||||||
|
|
||||||
// Register event bus subscribers
|
// Register event bus subscribers
|
||||||
registerDiagnosticsSubscribers(app.log);
|
registerDiagnosticsSubscribers(app.log);
|
||||||
registerDeliverySubscribers(app.log);
|
registerDeliverySubscribers(app.log);
|
||||||
|
wireDispatcherToBus();
|
||||||
startEventBus();
|
startEventBus();
|
||||||
app.addHook('onClose', async () => {
|
app.addHook('onClose', async () => {
|
||||||
await stopEventBus();
|
await stopEventBus();
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user