feat(platform): Phase 1 — Durable Event Bus + Worker Runtime

- New module: event-subscriptions/ (types, repository, routes, 15 tests)
  - Subscription CRUD: create/list/get/update/delete event subscriptions
  - DLQ: list/retry/delete/purge dead-letter queue entries
  - Event replay: POST /events/replay by topic + time range
- New lib: event-dispatcher.ts — subscription-driven dispatch with retry + DLQ
- New lib: event-store-bridge.ts — persistent event log for replay capability
- Worker runtime hardening (jobs/runner.ts):
  - Concurrency limit (MAX_CONCURRENT_JOBS=5)
  - Stuck-job recovery (10min threshold)
  - Graceful shutdown (30s drain)
  - Active job tracking + diagnostics (getActiveJobs/getActiveJobCount)
  - Per-job dedup (skip if already running)
- Wired dispatcher + event-subscriptions into server.ts startup
- Cosmos containers: event_subscriptions, event_dlq, event_log
- 1,293 tests passing (15 new)
This commit is contained in:
saravanakumardb1 2026-03-20 03:12:54 -07:00
parent 17f5671595
commit 15e24e5710
8 changed files with 867 additions and 4 deletions

View File

@ -0,0 +1,133 @@
import { randomUUID } from 'node:crypto';
import type { PlatformEventName, PlatformEvent } from '@bytelyst/events';
import { bus } from './event-bus.js';
import { storeEvent } from './event-store-bridge.js';
/** Optional tuning knobs accepted by `dispatchEvent`. */
interface DispatcherOptions {
/** Maximum delivery attempts per handler; `dispatchEvent` falls back to 3 when omitted. */
maxRetries?: number;
/** NOTE(review): declared but never read by `dispatchEvent` in this module — confirm whether it is still needed. */
productId?: string;
}
/**
 * Signature of a dispatch handler: receives the event topic, its payload and the
 * owning product id. A rejected promise is counted by `dispatchEvent` as a failed attempt.
 */
type HandlerFn = (
topic: string,
payload: Record<string, unknown>,
productId: string
) => Promise<void>;
// Module-level registry of handlers, keyed by handler type (looked up with a subscription's `handlerType`).
const handlers = new Map<string, HandlerFn[]>();
/**
 * Add a handler to the registry under the given handler type.
 *
 * Handlers registered for the same type accumulate in registration order;
 * nothing is replaced or de-duplicated.
 *
 * @param handlerType Registry key the handler is filed under.
 * @param fn Handler to invoke for subscriptions of that type.
 */
export function registerDispatchHandler(handlerType: string, fn: HandlerFn): void {
  const bucket = handlers.get(handlerType);
  if (bucket === undefined) {
    // First handler for this type: start a fresh list.
    handlers.set(handlerType, [fn]);
    return;
  }
  bucket.push(fn);
}
/** Remove every registered dispatch handler, for all handler types. */
export function clearDispatchHandlers(): void {
handlers.clear();
}
/**
 * Persist an event and deliver it to the handlers of every active subscription
 * for its topic.
 *
 * Steps:
 *  1. The event is written to the event log (best-effort; a storage failure does
 *     not stop dispatch).
 *  2. Active subscriptions for `productId` + `topic` are loaded; a lookup failure
 *     is treated as "no subscriptions".
 *  3. For each subscription whose optional filter expression passes, every handler
 *     registered for the subscription's `handlerType` is invoked, with up to
 *     `maxRetries` attempts per handler.
 *  4. A handler that fails all attempts is counted as an error and recorded in the
 *     dead-letter queue (best-effort), including the last error message.
 *
 * A subscription whose `handlerType` has no registered handler contributes
 * neither to `dispatched` nor to `errors`.
 *
 * @param topic Event topic used to look up subscriptions.
 * @param payload Event payload handed to filters and handlers.
 * @param productId Product the event belongs to.
 * @param options `maxRetries` is the number of attempts per handler (default 3;
 *   values below 1 or non-finite values are treated as 1 so a handler is never
 *   silently skipped).
 * @returns Count of successful handler deliveries and of handlers that exhausted
 *   their attempts.
 * @throws If the repository module itself cannot be imported.
 */
export async function dispatchEvent(
  topic: string,
  payload: Record<string, unknown>,
  productId: string,
  options: DispatcherOptions = {}
): Promise<{ dispatched: number; errors: number }> {
  // Always make at least one attempt: with a limit of 0 the handler would never
  // run and the event would vanish without an error or a DLQ entry.
  const requestedRetries = options.maxRetries ?? 3;
  const maxRetries =
    Number.isFinite(requestedRetries) && requestedRetries >= 1 ? Math.floor(requestedRetries) : 1;
  let dispatched = 0;
  let errors = 0;

  // Store event for replay capability
  const eventId = `evt_${randomUUID()}`;
  try {
    await storeEvent(productId, topic, payload, eventId);
  } catch {
    // best-effort storage — don't block dispatch
  }

  // Load the repository once; both the subscription lookup and the DLQ write use it.
  const { findActiveByTopic, createDlqEntry } =
    await import('../modules/event-subscriptions/repository.js');
  const subscriptions = await findActiveByTopic(productId, topic).catch(
    () => [] as Awaited<ReturnType<typeof findActiveByTopic>>
  );

  for (const sub of subscriptions) {
    // Apply filter expression if present
    if (sub.filterExpression) {
      try {
        // SECURITY(review): the stored filter expression is compiled and executed as
        // arbitrary JavaScript in this process. Anyone able to create or update a
        // subscription can run code on the server. Replace with a restricted
        // expression evaluator before exposing this beyond fully trusted admins.
        const filterFn = new Function('payload', `return ${sub.filterExpression}`);
        if (!filterFn(payload)) continue;
      } catch {
        // skip invalid filter expressions
        continue;
      }
    }

    for (const fn of handlers.get(sub.handlerType) ?? []) {
      let delivered = false;
      let lastError: unknown;
      for (let attempt = 1; attempt <= maxRetries && !delivered; attempt++) {
        try {
          await fn(topic, payload, productId);
          delivered = true;
        } catch (err) {
          // Remember the most recent failure so it can be surfaced in the DLQ entry.
          lastError = err;
        }
      }

      if (delivered) {
        dispatched++;
        continue;
      }

      errors++;
      const reason = lastError instanceof Error ? lastError.message : String(lastError);
      // Send to DLQ
      try {
        await createDlqEntry({
          id: `dlq_${randomUUID()}`,
          productId,
          topic,
          payload,
          error: `Failed after ${maxRetries} attempts: ${reason}`,
          attempts: maxRetries,
          subscriptionId: sub.id,
          originalEventId: eventId,
          createdAt: new Date().toISOString(),
        });
      } catch {
        // best-effort DLQ
      }
    }
  }

  return { dispatched, errors };
}
/**
 * Attach a bus listener for each known platform event that forwards the event
 * to `dispatchEvent`.
 *
 * Events whose payload carries no truthy `productId` are ignored, because
 * subscriptions are looked up per product.
 */
export function wireDispatcherToBus(): void {
  // Every event name the dispatcher listens for.
  const forwardedEvents: PlatformEventName[] = [
    'user.created',
    'user.deleted',
    'subscription.created',
    'subscription.changed',
    'subscription.canceled',
    'payment.succeeded',
    'payment.failed',
    'job.completed',
    'job.failed',
    'flag.toggled',
    'license.activated',
    'license.expired',
    'invitation.redeemed',
    'referral.completed',
    'waitlist.joined',
  ];

  forwardedEvents.forEach(name => {
    bus.on(name, async (event: PlatformEvent) => {
      const productId = (event.payload as Record<string, unknown>).productId as string | undefined;
      if (!productId) return;
      await dispatchEvent(event.type, event.payload as Record<string, unknown>, productId);
    });
  });
}

View File

@ -0,0 +1,54 @@
import { getCollection } from './datastore.js';
/** Shape of a document in the `event_log` collection, as written by `storeEvent`. */
interface StoredEvent {
/** Event id supplied by the caller of `storeEvent`. */
id: string;
/** Owning product; `event_log` is opened with `/productId` as its key path. */
productId: string;
/** Event topic, used by `listEvents` as an optional filter. */
topic: string;
/** Event payload, stored as given. */
payload: Record<string, unknown>;
/** Optional origin label passed to `storeEvent`. */
source?: string;
/** ISO-8601 timestamp taken when `storeEvent` builds the document. */
createdAt: string;
/** NOTE(review): presumably the datastore's system timestamp — never set in this module; confirm. */
_ts?: number;
}
/** Returns the `event_log` collection handle, keyed by `/productId`. */
function eventLogCollection() {
return getCollection<StoredEvent>('event_log', '/productId');
}
/**
 * Append an event to the persistent event log so it can be listed and replayed later.
 *
 * @param productId Product the event belongs to.
 * @param topic Event topic.
 * @param payload Event payload, stored as-is.
 * @param eventId Id to store the event under.
 * @param source Optional origin label.
 * @returns Whatever the collection's `create` call resolves to for the new document.
 */
export async function storeEvent(
  productId: string,
  topic: string,
  payload: Record<string, unknown>,
  eventId: string,
  source?: string
): Promise<StoredEvent> {
  // Timestamp is taken here, at write time, not supplied by the caller.
  const createdAt = new Date().toISOString();
  const record: StoredEvent = { id: eventId, productId, topic, payload, source, createdAt };
  const log = eventLogCollection();
  return log.create(record);
}
/** Parse a timestamp string to epoch milliseconds; `undefined` when absent or unparsable. */
function parseTimestampMs(value: string | undefined): number | undefined {
  if (!value) return undefined;
  const ms = Date.parse(value);
  return Number.isNaN(ms) ? undefined : ms;
}

/**
 * List stored events for a product, newest first, optionally restricted by topic
 * and by an inclusive `[since, until]` time window.
 *
 * Time bounds are compared as instants (via `Date.parse`), so bounds written with
 * a UTC offset or without milliseconds are handled correctly. If either side of a
 * comparison cannot be parsed as a date, the comparison falls back to plain string
 * ordering.
 *
 * NOTE(review): `limit` is applied by the datastore query *before* the time window
 * is applied in memory, so a window older than the newest `limit` events can come
 * back incomplete or empty. Pushing the range into the query would fix this but
 * depends on what `findMany` supports — confirm against the datastore helper.
 *
 * @param productId Product whose events are listed.
 * @param options Optional `topic`, `since`, `until` and `limit` (default 100).
 * @returns The matching events in the order returned by the datastore.
 */
export async function listEvents(
  productId: string,
  options: { topic?: string; since?: string; until?: string; limit?: number }
): Promise<StoredEvent[]> {
  const events = await eventLogCollection().findMany({
    filter: {
      productId,
      ...(options.topic ? { topic: options.topic } : {}),
    },
    sort: { createdAt: -1 },
    limit: options.limit ?? 100,
  });

  const sinceMs = parseTimestampMs(options.since);
  const untilMs = parseTimestampMs(options.until);

  // Filter by time range in-memory (Cosmos SDK filter limitations)
  return events.filter(e => {
    const createdMs = parseTimestampMs(e.createdAt);
    if (options.since) {
      const isBefore =
        sinceMs !== undefined && createdMs !== undefined
          ? createdMs < sinceMs
          : e.createdAt < options.since;
      if (isBefore) return false;
    }
    if (options.until) {
      const isAfter =
        untilMs !== undefined && createdMs !== undefined
          ? createdMs > untilMs
          : e.createdAt > options.until;
      if (isAfter) return false;
    }
    return true;
  });
}

View File

@ -0,0 +1,158 @@
import { randomUUID } from 'node:crypto';
import { getCollection } from '../../lib/datastore.js';
import type {
EventSubscriptionDoc,
CreateEventSubscriptionInput,
UpdateEventSubscriptionInput,
ListEventSubscriptionsQuery,
DlqEntryDoc,
ListDlqQuery,
} from './types.js';
/** Returns the `event_subscriptions` collection handle, keyed by `/productId`. */
function subsCollection() {
return getCollection<EventSubscriptionDoc>('event_subscriptions', '/productId');
}
/** Returns the `event_dlq` (dead-letter queue) collection handle, keyed by `/productId`. */
function dlqCollection() {
return getCollection<DlqEntryDoc>('event_dlq', '/productId');
}
// ── Subscription CRUD ───────────────────────────────────────
/**
 * Create a new event subscription for a product.
 *
 * The subscription gets a generated `esub_` id, starts in the `active` state and
 * has identical `createdAt` / `updatedAt` timestamps.
 *
 * @param productId Product that owns the subscription.
 * @param input Validated creation payload.
 * @param createdBy Id of the user creating the subscription.
 * @returns Whatever the collection's `create` call resolves to for the new document.
 */
export async function create(
  productId: string,
  input: CreateEventSubscriptionInput,
  createdBy: string
): Promise<EventSubscriptionDoc> {
  const timestamp = new Date().toISOString();
  const { topic, handlerType, config, filterExpression, description } = input;
  const record: EventSubscriptionDoc = {
    id: `esub_${randomUUID()}`,
    productId,
    topic,
    handlerType,
    status: 'active',
    config,
    filterExpression,
    description,
    createdBy,
    createdAt: timestamp,
    updatedAt: timestamp,
  };
  return subsCollection().create(record);
}
/**
 * Fetch a single subscription by id within a product.
 *
 * Any error raised by the lookup is swallowed and reported as "not found".
 *
 * @returns The subscription, or `undefined` when it is missing or the lookup failed.
 */
export async function getById(
  id: string,
  productId: string
): Promise<EventSubscriptionDoc | undefined> {
  try {
    // Normalise a null/undefined result to undefined.
    return (await subsCollection().findById(id, productId)) ?? undefined;
  } catch {
    return undefined;
  }
}
/**
 * List a product's subscriptions, newest first.
 *
 * `topic`, `handlerType` and `status` narrow the result only when they are
 * present (truthy) in the query.
 *
 * @param productId Product whose subscriptions are listed.
 * @param query Validated list query, including the page `limit`.
 */
export async function list(
  productId: string,
  query: ListEventSubscriptionsQuery
): Promise<EventSubscriptionDoc[]> {
  const { topic, handlerType, status, limit } = query;
  return subsCollection().findMany({
    filter: {
      productId,
      ...(topic ? { topic } : {}),
      ...(handlerType ? { handlerType } : {}),
      ...(status ? { status } : {}),
    },
    sort: { createdAt: -1 },
    limit,
  });
}
/**
 * Apply a partial update to an existing subscription.
 *
 * Only fields that are explicitly provided (not `undefined`) are written;
 * `updatedAt` is always refreshed.
 *
 * @returns The result of the collection update, or `undefined` when the
 *   subscription does not exist (as reported by `getById`).
 */
export async function update(
  id: string,
  productId: string,
  input: UpdateEventSubscriptionInput
): Promise<EventSubscriptionDoc | undefined> {
  const current = await getById(id, productId);
  if (!current) return undefined;

  const patch: Partial<EventSubscriptionDoc> = {};
  if (input.status !== undefined) patch.status = input.status;
  if (input.config !== undefined) patch.config = input.config;
  if (input.filterExpression !== undefined) patch.filterExpression = input.filterExpression;
  if (input.description !== undefined) patch.description = input.description;
  patch.updatedAt = new Date().toISOString();

  return subsCollection().update(id, productId, patch);
}
/**
 * Delete a subscription.
 *
 * @returns `true` when the delete call completed, `false` when it threw for any reason.
 */
export async function remove(id: string, productId: string): Promise<boolean> {
  try {
    await subsCollection().delete(id, productId);
  } catch {
    // Any failure is reported to the caller as "not deleted".
    return false;
  }
  return true;
}
// ── Active subscription lookup by topic ─────────────────────
/**
 * Find every subscription of a product that listens to `topic` and is currently `active`.
 *
 * No sort order or limit is passed to the query.
 */
export async function findActiveByTopic(
  productId: string,
  topic: string
): Promise<EventSubscriptionDoc[]> {
  const subscriptions = subsCollection();
  return subscriptions.findMany({
    filter: { productId, topic, status: 'active' },
  });
}
// ── Dead-Letter Queue ───────────────────────────────────────
/**
 * Persist a dead-letter queue entry exactly as given; the caller supplies the id
 * and timestamp.
 */
export async function createDlqEntry(entry: DlqEntryDoc): Promise<DlqEntryDoc> {
return dlqCollection().create(entry);
}
/**
 * List a product's dead-letter queue entries, newest first.
 *
 * @param productId Product whose DLQ is listed.
 * @param query Validated query; `topic` narrows the result only when present.
 */
export async function listDlq(productId: string, query: ListDlqQuery): Promise<DlqEntryDoc[]> {
  const { topic, limit } = query;
  return dlqCollection().findMany({
    filter: {
      productId,
      ...(topic ? { topic } : {}),
    },
    sort: { createdAt: -1 },
    limit,
  });
}
/**
 * Fetch a single dead-letter queue entry by id within a product.
 *
 * Any error raised by the lookup is swallowed and reported as "not found".
 *
 * @returns The entry, or `undefined` when it is missing or the lookup failed.
 */
export async function getDlqEntry(id: string, productId: string): Promise<DlqEntryDoc | undefined> {
  try {
    // Normalise a null/undefined result to undefined.
    return (await dlqCollection().findById(id, productId)) ?? undefined;
  } catch {
    return undefined;
  }
}
/**
 * Delete a dead-letter queue entry.
 *
 * @returns `true` when the delete call completed, `false` when it threw for any reason.
 */
export async function deleteDlqEntry(id: string, productId: string): Promise<boolean> {
  try {
    await dlqCollection().delete(id, productId);
  } catch {
    // Any failure is reported to the caller as "not deleted".
    return false;
  }
  return true;
}
/**
 * Delete all dead-letter queue entries of a product.
 *
 * Entries are read and deleted in batches until a batch comes back short, so
 * queues larger than one batch are fully drained instead of being silently
 * truncated. Individual delete failures are skipped (best-effort).
 *
 * The loop stops early when a batch yields no successful delete (nothing more
 * can be removed) and after a fixed number of batches, so it cannot spin forever
 * if the datastore keeps returning the same entries.
 *
 * @param productId Product whose DLQ is purged.
 * @returns Number of entries actually deleted.
 */
export async function purgeDlq(productId: string): Promise<number> {
  const batchSize = 1000;
  const maxBatches = 100; // hard stop: at most 100k entries per call
  const collection = dlqCollection();
  let purged = 0;

  for (let batch = 0; batch < maxBatches; batch++) {
    const entries = await collection.findMany({ filter: { productId }, limit: batchSize });
    let deletedInBatch = 0;
    for (const entry of entries) {
      try {
        await collection.delete(entry.id, productId);
        deletedInBatch++;
      } catch {
        // best-effort
      }
    }
    purged += deletedInBatch;

    // A short batch means the queue is drained; zero deletes means no progress is possible.
    if (entries.length < batchSize || deletedInBatch === 0) break;
  }

  return purged;
}

View File

@ -0,0 +1,230 @@
import Fastify from 'fastify';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
// Stand-in for ./repository.js: one vi.fn() for each function the repository module exports.
const repoMock = {
list: vi.fn(),
create: vi.fn(),
getById: vi.fn(),
update: vi.fn(),
remove: vi.fn(),
findActiveByTopic: vi.fn(),
createDlqEntry: vi.fn(),
listDlq: vi.fn(),
getDlqEntry: vi.fn(),
deleteDlqEntry: vi.fn(),
purgeDlq: vi.fn(),
};
// Stand-in for the shared event bus; the routes under test call `emit` for DLQ retry and replay.
const busMock = {
emit: vi.fn(),
on: vi.fn(),
};
// Stand-in for the event store bridge; only `listEvents` is stubbed.
const storeBridgeMock = {
listEvents: vi.fn(),
};
// NOTE(review): Vitest hoists vi.mock() calls above the const declarations above. This presumably
// works because the factories only run when routes.js is imported dynamically inside buildApp —
// confirm, or move the mock objects into vi.hoisted().
vi.mock('./repository.js', () => repoMock);
vi.mock('../../lib/event-bus.js', () => ({ bus: busMock }));
vi.mock('../../lib/event-store-bridge.js', () => storeBridgeMock);
/**
 * Build a Fastify instance with the event-subscription routes mounted under `/api`.
 *
 * @param payload When given, an `onRequest` hook assigns it to `req.jwtPayload`
 *   on every request; when omitted, no hook is installed and `req.jwtPayload`
 *   is never set.
 */
async function buildApp(payload?: { sub: string; productId: string; role?: string }) {
  // Import lazily so the module is loaded after the vi.mock() registrations.
  const routesModule = await import('./routes.js');
  const server = Fastify({ logger: false });
  if (payload) {
    server.addHook('onRequest', async request => {
      request.jwtPayload = payload;
    });
  }
  await server.register(routesModule.eventSubscriptionRoutes, { prefix: '/api' });
  return server;
}
// Route-level tests for eventSubscriptionRoutes. Requests are issued with Fastify's inject();
// the repository, event bus and event store bridge are the mocks declared above.
describe('eventSubscriptionRoutes', () => {
// Reset call history before each test so call assertions only see the current test's calls.
beforeEach(() => {
vi.clearAllMocks();
});
afterEach(() => {
vi.restoreAllMocks();
});
// ── Subscription CRUD ───────────────────────────────────
it('GET /event-subscriptions lists subscriptions', async () => {
repoMock.list.mockResolvedValue([{ id: 'esub_1', topic: 'user.created' }]);
const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
const res = await app.inject({ method: 'GET', url: '/api/event-subscriptions' });
expect(res.statusCode).toBe(200);
// No limit is sent in the request; the repository is expected to receive the default of 50.
expect(repoMock.list).toHaveBeenCalledWith('lysnrai', expect.objectContaining({ limit: 50 }));
});
it('POST /event-subscriptions creates a subscription', async () => {
repoMock.create.mockResolvedValue({ id: 'esub_1', topic: 'user.created', status: 'active' });
const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
const res = await app.inject({
method: 'POST',
url: '/api/event-subscriptions',
payload: {
topic: 'user.created',
handlerType: 'webhook',
config: { url: 'https://example.com' },
},
});
expect(res.statusCode).toBe(200);
// The JWT's productId and sub are expected as the first and third repository arguments.
expect(repoMock.create).toHaveBeenCalledWith(
'lysnrai',
expect.objectContaining({ topic: 'user.created', handlerType: 'webhook' }),
'admin_1'
);
});
it('POST /event-subscriptions rejects invalid input', async () => {
const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
const res = await app.inject({
method: 'POST',
url: '/api/event-subscriptions',
payload: {},
});
expect(res.statusCode).toBe(400);
});
it('GET /event-subscriptions/:id returns 404 for unknown', async () => {
repoMock.getById.mockResolvedValue(undefined);
const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
const res = await app.inject({ method: 'GET', url: '/api/event-subscriptions/esub_xxx' });
expect(res.statusCode).toBe(404);
});
it('PATCH /event-subscriptions/:id updates a subscription', async () => {
repoMock.update.mockResolvedValue({ id: 'esub_1', status: 'paused' });
const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
const res = await app.inject({
method: 'PATCH',
url: '/api/event-subscriptions/esub_1',
payload: { status: 'paused' },
});
expect(res.statusCode).toBe(200);
expect(repoMock.update).toHaveBeenCalledWith(
'esub_1',
'lysnrai',
expect.objectContaining({ status: 'paused' })
);
});
it('DELETE /event-subscriptions/:id deletes a subscription', async () => {
repoMock.remove.mockResolvedValue(true);
const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
const res = await app.inject({ method: 'DELETE', url: '/api/event-subscriptions/esub_1' });
expect(res.statusCode).toBe(200);
expect(JSON.parse(res.body)).toEqual({ deleted: true });
});
it('DELETE /event-subscriptions/:id returns 404 for unknown', async () => {
// The repository signals "nothing deleted" with false; the route is expected to map that to 404.
repoMock.remove.mockResolvedValue(false);
const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
const res = await app.inject({ method: 'DELETE', url: '/api/event-subscriptions/esub_xxx' });
expect(res.statusCode).toBe(404);
});
// ── Auth ────────────────────────────────────────────────
it('rejects unauthenticated requests', async () => {
// buildApp() without a payload installs no onRequest hook, so req.jwtPayload is never set.
const app = await buildApp();
const res = await app.inject({ method: 'GET', url: '/api/event-subscriptions' });
expect(res.statusCode).toBe(403);
});
it('rejects non-admin users', async () => {
const app = await buildApp({ sub: 'user_1', productId: 'lysnrai', role: 'viewer' });
const res = await app.inject({ method: 'GET', url: '/api/event-subscriptions' });
expect(res.statusCode).toBe(403);
});
// ── DLQ ─────────────────────────────────────────────────
it('GET /event-dlq lists DLQ entries', async () => {
repoMock.listDlq.mockResolvedValue([{ id: 'dlq_1', topic: 'user.created' }]);
const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
const res = await app.inject({ method: 'GET', url: '/api/event-dlq' });
expect(res.statusCode).toBe(200);
expect(repoMock.listDlq).toHaveBeenCalled();
});
it('POST /event-dlq/:id/retry re-emits and removes entry', async () => {
repoMock.getDlqEntry.mockResolvedValue({
id: 'dlq_1',
topic: 'user.created',
payload: { userId: 'u1', email: 'a@b.com', plan: 'free', productId: 'lysnrai' },
});
repoMock.deleteDlqEntry.mockResolvedValue(true);
busMock.emit.mockResolvedValue({ results: [] });
const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
const res = await app.inject({ method: 'POST', url: '/api/event-dlq/dlq_1/retry' });
expect(res.statusCode).toBe(200);
// A retry is expected to put the stored event back on the bus and then drop the DLQ entry.
expect(busMock.emit).toHaveBeenCalledWith('user.created', expect.any(Object));
expect(repoMock.deleteDlqEntry).toHaveBeenCalledWith('dlq_1', 'lysnrai');
});
it('POST /event-dlq/:id/retry returns 404 for unknown', async () => {
repoMock.getDlqEntry.mockResolvedValue(undefined);
const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
const res = await app.inject({ method: 'POST', url: '/api/event-dlq/dlq_xxx/retry' });
expect(res.statusCode).toBe(404);
});
it('POST /event-dlq/purge purges all entries', async () => {
repoMock.purgeDlq.mockResolvedValue(5);
const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
const res = await app.inject({ method: 'POST', url: '/api/event-dlq/purge' });
expect(res.statusCode).toBe(200);
expect(JSON.parse(res.body)).toEqual({ purged: 5 });
});
// ── Event Replay ────────────────────────────────────────
it('POST /events/replay replays stored events', async () => {
storeBridgeMock.listEvents.mockResolvedValue([
{ id: 'evt_1', topic: 'user.created', payload: { userId: 'u1', productId: 'lysnrai' } },
{ id: 'evt_2', topic: 'user.created', payload: { userId: 'u2', productId: 'lysnrai' } },
]);
busMock.emit.mockResolvedValue({ results: [] });
const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
const res = await app.inject({
method: 'POST',
url: '/api/events/replay',
payload: { topic: 'user.created', since: '2026-01-01' },
});
expect(res.statusCode).toBe(200);
expect(JSON.parse(res.body)).toEqual({ replayed: 2, topic: 'user.created' });
// One bus emit is expected per stored event returned by listEvents.
expect(busMock.emit).toHaveBeenCalledTimes(2);
});
it('POST /events/replay requires topic', async () => {
const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
const res = await app.inject({
method: 'POST',
url: '/api/events/replay',
payload: {},
});
expect(res.statusCode).toBe(400);
});
});

View File

@ -0,0 +1,141 @@
import type { FastifyInstance } from 'fastify';
import type { PlatformEventName } from '@bytelyst/events';
import { BadRequestError, ForbiddenError, NotFoundError } from '../../lib/errors.js';
import { bus } from '../../lib/event-bus.js';
import { listEvents } from '../../lib/event-store-bridge.js';
import {
CreateEventSubscriptionSchema,
UpdateEventSubscriptionSchema,
ListEventSubscriptionsQuerySchema,
ListDlqQuerySchema,
} from './types.js';
import * as repo from './repository.js';
/**
 * Guard used by every route in this module: require an authenticated admin and
 * resolve the product the request operates on.
 *
 * @param req Request carrying the decoded JWT payload, if any.
 * @returns The caller's user id and product id. The product id falls back to the
 *   `DEFAULT_PRODUCT_ID` environment variable and then to `'lysnrai'`.
 * @throws ForbiddenError when there is no `sub` claim, or when the role is not
 *   `super_admin` or `admin`.
 */
function requireAdmin(req: { jwtPayload?: { sub?: string; role?: string; productId?: string } }): {
  userId: string;
  productId: string;
} {
  const claims = req.jwtPayload;
  if (!claims?.sub) throw new ForbiddenError('Authentication required');

  const isAdmin = claims.role === 'super_admin' || claims.role === 'admin';
  if (!isAdmin) {
    throw new ForbiddenError('Admin access required');
  }

  const productId = claims.productId ?? process.env.DEFAULT_PRODUCT_ID ?? 'lysnrai';
  return { userId: claims.sub, productId };
}
/**
 * Validate an optional replay time bound taken from the request body.
 *
 * Absent values (`undefined`, `null`, empty string) mean "no bound". Anything
 * else must be a string that `Date.parse` understands.
 *
 * @throws BadRequestError when the value is present but not a parseable timestamp.
 */
function readReplayBound(value: unknown, field: string): string | undefined {
  if (value === undefined || value === null || value === '') return undefined;
  if (typeof value !== 'string' || Number.isNaN(Date.parse(value))) {
    throw new BadRequestError(`${field} must be an ISO-8601 timestamp`);
  }
  return value;
}

/**
 * Fastify plugin exposing admin-only endpoints for event subscriptions, the
 * dead-letter queue and event replay.
 *
 * Every route calls `requireAdmin` first and scopes all repository access to the
 * product id it returns. Validation failures raise `BadRequestError`; missing
 * resources raise `NotFoundError`.
 *
 * Routes (relative to the registration prefix):
 *  - GET/POST   /event-subscriptions
 *  - GET/PATCH/DELETE /event-subscriptions/:id
 *  - GET        /event-dlq
 *  - POST       /event-dlq/:id/retry
 *  - DELETE     /event-dlq/:id
 *  - POST       /event-dlq/purge
 *  - POST       /events/replay
 */
export async function eventSubscriptionRoutes(app: FastifyInstance) {
  // ── Subscription CRUD ─────────────────────────────────────
  app.get('/event-subscriptions', async req => {
    const access = requireAdmin(req);
    const parsed = ListEventSubscriptionsQuerySchema.safeParse(req.query);
    if (!parsed.success) {
      throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; '));
    }
    return repo.list(access.productId, parsed.data);
  });

  app.post('/event-subscriptions', async req => {
    const access = requireAdmin(req);
    const parsed = CreateEventSubscriptionSchema.safeParse(req.body);
    if (!parsed.success) {
      throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; '));
    }
    return repo.create(access.productId, parsed.data, access.userId);
  });

  app.get('/event-subscriptions/:id', async req => {
    const access = requireAdmin(req);
    const { id } = req.params as { id: string };
    const sub = await repo.getById(id, access.productId);
    if (!sub) throw new NotFoundError(`Event subscription '${id}' not found`);
    return sub;
  });

  app.patch('/event-subscriptions/:id', async req => {
    const access = requireAdmin(req);
    const { id } = req.params as { id: string };
    const parsed = UpdateEventSubscriptionSchema.safeParse(req.body);
    if (!parsed.success) {
      throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; '));
    }
    const updated = await repo.update(id, access.productId, parsed.data);
    if (!updated) throw new NotFoundError(`Event subscription '${id}' not found`);
    return updated;
  });

  app.delete('/event-subscriptions/:id', async req => {
    const access = requireAdmin(req);
    const { id } = req.params as { id: string };
    const deleted = await repo.remove(id, access.productId);
    if (!deleted) throw new NotFoundError(`Event subscription '${id}' not found`);
    return { deleted: true };
  });

  // ── Dead-Letter Queue ─────────────────────────────────────
  app.get('/event-dlq', async req => {
    const access = requireAdmin(req);
    const parsed = ListDlqQuerySchema.safeParse(req.query);
    if (!parsed.success) {
      throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; '));
    }
    return repo.listDlq(access.productId, parsed.data);
  });

  app.post('/event-dlq/:id/retry', async req => {
    const access = requireAdmin(req);
    const { id } = req.params as { id: string };
    const entry = await repo.getDlqEntry(id, access.productId);
    if (!entry) throw new NotFoundError(`DLQ entry '${id}' not found`);
    // Re-emit via bus — cast topic since stored events are already validated
    await bus.emit(
      entry.topic as PlatformEventName,
      entry.payload as Parameters<typeof bus.emit>[1]
    );
    // Only drop the entry once the re-emit has completed without throwing.
    await repo.deleteDlqEntry(id, access.productId);
    return { retried: true, topic: entry.topic };
  });

  app.delete('/event-dlq/:id', async req => {
    const access = requireAdmin(req);
    const { id } = req.params as { id: string };
    const deleted = await repo.deleteDlqEntry(id, access.productId);
    if (!deleted) throw new NotFoundError(`DLQ entry '${id}' not found`);
    return { deleted: true };
  });

  app.post('/event-dlq/purge', async req => {
    const access = requireAdmin(req);
    const count = await repo.purgeDlq(access.productId);
    return { purged: count };
  });

  // ── Event Replay ──────────────────────────────────────────
  app.post('/events/replay', async req => {
    const access = requireAdmin(req);
    // A POST without a body leaves req.body undefined/null; treat that like an
    // empty object so the caller gets a 400 instead of an unhandled TypeError.
    const body = (req.body ?? {}) as { topic?: unknown; since?: unknown; until?: unknown };
    if (typeof body.topic !== 'string' || body.topic.length === 0) {
      throw new BadRequestError('topic is required');
    }
    const topic = body.topic;
    const since = readReplayBound(body.since, 'since');
    const until = readReplayBound(body.until, 'until');

    const events = await listEvents(access.productId, {
      topic,
      since,
      until,
      limit: 100,
    });

    let replayed = 0;
    // Emit sequentially so events are replayed in the order listEvents returned them.
    for (const evt of events) {
      await bus.emit(evt.topic as PlatformEventName, evt.payload as Parameters<typeof bus.emit>[1]);
      replayed++;
    }
    return { replayed, topic };
  });
}

View File

@ -0,0 +1,77 @@
import { z } from 'zod';
/** Allowed values for a subscription's `handlerType`. */
export const EventHandlerTypeSchema = z.enum(['webhook', 'job', 'notification', 'sse']);
/** Allowed values for a subscription's `status`. */
export const EventSubscriptionStatusSchema = z.enum(['active', 'paused', 'disabled']);
/** Full shape of an event subscription document. */
export const EventSubscriptionSchema = z.object({
id: z.string().min(1),
productId: z.string().min(1),
topic: z.string().min(1),
handlerType: EventHandlerTypeSchema,
status: EventSubscriptionStatusSchema,
// Free-form handler configuration; defaults to an empty object when omitted.
config: z.record(z.unknown()).default({}),
filterExpression: z.string().optional(),
description: z.string().optional(),
createdBy: z.string().min(1),
createdAt: z.string(),
updatedAt: z.string(),
});
/**
 * Subscription document type inferred from the schema, plus optional `_ts` / `_etag`.
 * NOTE(review): `_ts` and `_etag` are presumably datastore system properties — confirm.
 */
export type EventSubscriptionDoc = z.infer<typeof EventSubscriptionSchema> & {
_ts?: number;
_etag?: string;
};
/** Request body accepted when creating a subscription; `topic` and `handlerType` are required. */
export const CreateEventSubscriptionSchema = z.object({
topic: z.string().min(1),
handlerType: EventHandlerTypeSchema,
// Free-form handler configuration; defaults to an empty object when omitted.
config: z.record(z.unknown()).default({}),
// Accepted as an arbitrary string — no syntax or length validation happens here.
filterExpression: z.string().optional(),
description: z.string().optional(),
});
/** Parsed (output) type of `CreateEventSubscriptionSchema`. */
export type CreateEventSubscriptionInput = z.infer<typeof CreateEventSubscriptionSchema>;
/**
 * Request body accepted when updating a subscription. Every field is optional;
 * `topic` and `handlerType` are not part of this schema.
 */
export const UpdateEventSubscriptionSchema = z.object({
status: EventSubscriptionStatusSchema.optional(),
config: z.record(z.unknown()).optional(),
filterExpression: z.string().optional(),
description: z.string().optional(),
});
/** Parsed type of `UpdateEventSubscriptionSchema`. */
export type UpdateEventSubscriptionInput = z.infer<typeof UpdateEventSubscriptionSchema>;
/**
 * Query string accepted when listing subscriptions.
 *
 * `limit` is coerced from its string form and must be a whole number between
 * 1 and 100 (default 50); fractional values such as `2.5` are rejected rather
 * than being passed on to the datastore as a page size.
 */
export const ListEventSubscriptionsQuerySchema = z.object({
  topic: z.string().optional(),
  handlerType: EventHandlerTypeSchema.optional(),
  status: EventSubscriptionStatusSchema.optional(),
  limit: z.coerce.number().int().min(1).max(100).default(50),
});
/** Parsed (output) type of `ListEventSubscriptionsQuerySchema`. */
export type ListEventSubscriptionsQuery = z.infer<typeof ListEventSubscriptionsQuerySchema>;
// ── Dead-Letter Queue ────────────────────────────────────────
/** Shape of a dead-letter queue entry. */
export const DlqEntrySchema = z.object({
id: z.string().min(1),
productId: z.string().min(1),
topic: z.string().min(1),
payload: z.record(z.unknown()),
// Human-readable failure description.
error: z.string(),
// Number of attempts made; at least 1.
attempts: z.number().int().min(1),
subscriptionId: z.string().optional(),
originalEventId: z.string().optional(),
createdAt: z.string(),
});
/**
 * DLQ entry type inferred from the schema, plus optional `_ts` / `_etag`.
 * NOTE(review): `_ts` and `_etag` are presumably datastore system properties — confirm.
 */
export type DlqEntryDoc = z.infer<typeof DlqEntrySchema> & {
_ts?: number;
_etag?: string;
};
/**
 * Query string accepted when listing dead-letter queue entries.
 *
 * `limit` is coerced from its string form and must be a whole number between
 * 1 and 100 (default 50); fractional values are rejected.
 */
export const ListDlqQuerySchema = z.object({
  topic: z.string().optional(),
  limit: z.coerce.number().int().min(1).max(100).default(50),
});
/** Parsed (output) type of `ListDlqQuerySchema`. */
export type ListDlqQuery = z.infer<typeof ListDlqQuerySchema>;

View File

@ -9,9 +9,34 @@ import type { JobDefinitionDoc, JobRunDoc, JobContext, JobResult } from './types
// Uses a simple lock flag to prevent overlapping ticks. // Uses a simple lock flag to prevent overlapping ticks.
const DEFAULT_PRODUCT_ID = 'lysnrai'; const DEFAULT_PRODUCT_ID = 'lysnrai';
const MAX_CONCURRENT_JOBS = 5;
const STUCK_JOB_THRESHOLD_MS = 600_000; // 10 minutes
let tickInterval: ReturnType<typeof globalThis.setInterval> | null = null; let tickInterval: ReturnType<typeof globalThis.setInterval> | null = null;
let isRunning = false; let isRunning = false;
let isShuttingDown = false;
// Track active job executions for concurrency + heartbeat
const activeJobs = new Map<string, { jobName: string; startedAt: number; runId: string }>();
/**
 * Get the number of jobs currently tracked as active.
 *
 * @returns The size of the `activeJobs` map at the time of the call.
 */
export function getActiveJobCount(): number {
return activeJobs.size;
}
/**
 * Snapshot of every job currently tracked as active, for diagnostics.
 *
 * @returns One entry per tracked job with its name, run id and the milliseconds
 *   elapsed since it was registered, all measured against a single clock reading.
 */
export function getActiveJobs(): Array<{ jobName: string; runId: string; elapsedMs: number }> {
  const snapshotTime = Date.now();
  const report: Array<{ jobName: string; runId: string; elapsedMs: number }> = [];
  for (const { jobName, runId, startedAt } of activeJobs.values()) {
    report.push({ jobName, runId, elapsedMs: snapshotTime - startedAt });
  }
  return report;
}
/** /**
* Start the job runner tick loop. Evaluates every 60 seconds. * Start the job runner tick loop. Evaluates every 60 seconds.
@ -22,8 +47,9 @@ export function startRunner(log: {
error: (...a: unknown[]) => void; error: (...a: unknown[]) => void;
}): void { }): void {
if (tickInterval) return; if (tickInterval) return;
isShuttingDown = false;
log.info('[jobs] Starting job runner (60s tick)'); log.info('[jobs] Starting job runner (60s tick, max concurrency: %d)', MAX_CONCURRENT_JOBS);
// Run immediately on start, then every 60s // Run immediately on start, then every 60s
tick(log).catch(() => {}); tick(log).catch(() => {});
@ -33,13 +59,20 @@ export function startRunner(log: {
} }
/** /**
* Stop the job runner. * Stop the job runner gracefully. Waits for active jobs to complete (up to 30s).
*/ */
export function stopRunner(): void { export async function stopRunner(): Promise<void> {
isShuttingDown = true;
if (tickInterval) { if (tickInterval) {
globalThis.clearInterval(tickInterval); globalThis.clearInterval(tickInterval);
tickInterval = null; tickInterval = null;
} }
// Wait for active jobs to finish (up to 30 seconds)
const deadline = Date.now() + 30_000;
while (activeJobs.size > 0 && Date.now() < deadline) {
await new Promise(resolve => globalThis.setTimeout(resolve, 500));
}
} }
/** /**
@ -54,6 +87,8 @@ async function tick(log: {
isRunning = true; isRunning = true;
try { try {
if (isShuttingDown) return;
const now = new Date(); const now = new Date();
// Round to current minute for matching // Round to current minute for matching
now.setUTCSeconds(0, 0); now.setUTCSeconds(0, 0);
@ -61,6 +96,17 @@ async function tick(log: {
const registered = getRegisteredJobs(); const registered = getRegisteredJobs();
if (registered.length === 0) return; if (registered.length === 0) return;
// Recover stuck jobs (running longer than threshold)
for (const [key, job] of activeJobs) {
if (Date.now() - job.startedAt > STUCK_JOB_THRESHOLD_MS) {
log.warn(
{ jobName: job.jobName, runId: job.runId, elapsedMs: Date.now() - job.startedAt },
'[jobs] Removing stuck job from active tracking'
);
activeJobs.delete(key);
}
}
// Get definitions from DB (or create defaults) // Get definitions from DB (or create defaults)
let definitions: JobDefinitionDoc[]; let definitions: JobDefinitionDoc[];
try { try {
@ -74,8 +120,20 @@ async function tick(log: {
if (def.status !== 'enabled') continue; if (def.status !== 'enabled') continue;
if (!getJobHandler(def.name)) continue; if (!getJobHandler(def.name)) continue;
// Enforce concurrency limit
if (activeJobs.size >= MAX_CONCURRENT_JOBS) {
log.warn(
'[jobs] Concurrency limit reached (%d), skipping remaining jobs',
MAX_CONCURRENT_JOBS
);
break;
}
// Skip if this job is already running
if (activeJobs.has(def.name)) continue;
if (cronMatches(def.cronExpression, now)) { if (cronMatches(def.cronExpression, now)) {
await executeJob(def, 'scheduler', log); executeJob(def, 'scheduler', log).catch(() => {});
} }
} }
} finally { } finally {
@ -103,6 +161,9 @@ export async function executeJob(
const runId = crypto.randomUUID(); const runId = crypto.randomUUID();
const startedAt = new Date().toISOString(); const startedAt = new Date().toISOString();
// Track this job as active
activeJobs.set(def.name, { jobName: def.name, startedAt: Date.now(), runId });
// Create run record // Create run record
const run: JobRunDoc = { const run: JobRunDoc = {
id: runId, id: runId,
@ -194,6 +255,9 @@ export async function executeJob(
}; };
} }
// Remove from active tracking
activeJobs.delete(def.name);
const durationMs = Date.now() - start; const durationMs = Date.now() - start;
const completedAt = new Date().toISOString(); const completedAt = new Date().toISOString();
const status = result.success ? 'success' : 'failed'; const status = result.success ? 'success' : 'failed';

View File

@ -107,7 +107,9 @@ import { registerDiagnosticsSubscribers } from './modules/diagnostics/subscriber
import { registerDeliverySubscribers } from './modules/delivery/subscribers.js'; import { registerDeliverySubscribers } from './modules/delivery/subscribers.js';
import { verifyToken } from './modules/auth/jwt.js'; import { verifyToken } from './modules/auth/jwt.js';
import { registerOptionalApiKeyContext } from './lib/api-key-auth.js'; import { registerOptionalApiKeyContext } from './lib/api-key-auth.js';
import { eventSubscriptionRoutes } from './modules/event-subscriptions/routes.js';
import { startEventBus, stopEventBus } from './lib/event-bus.js'; import { startEventBus, stopEventBus } from './lib/event-bus.js';
import { wireDispatcherToBus } from './lib/event-dispatcher.js';
await initCosmosIfNeeded(); await initCosmosIfNeeded();
await loadProductCache(); await loadProductCache();
@ -261,9 +263,13 @@ await app.register(retentionRoutes, { prefix: '/api' });
await app.register(backupRoutes, { prefix: '/api' }); await app.register(backupRoutes, { prefix: '/api' });
await app.register(apiVersioningRoutes, { prefix: '/api' }); await app.register(apiVersioningRoutes, { prefix: '/api' });
// Event subscriptions + DLQ + replay
await app.register(eventSubscriptionRoutes, { prefix: '/api' });
// Register event bus subscribers // Register event bus subscribers
registerDiagnosticsSubscribers(app.log); registerDiagnosticsSubscribers(app.log);
registerDeliverySubscribers(app.log); registerDeliverySubscribers(app.log);
wireDispatcherToBus();
startEventBus(); startEventBus();
app.addHook('onClose', async () => { app.addHook('onClose', async () => {
await stopEventBus(); await stopEventBus();