diff --git a/services/platform-service/src/lib/event-dispatcher.ts b/services/platform-service/src/lib/event-dispatcher.ts new file mode 100644 index 00000000..e250de60 --- /dev/null +++ b/services/platform-service/src/lib/event-dispatcher.ts @@ -0,0 +1,133 @@ +import { randomUUID } from 'node:crypto'; +import type { PlatformEventName, PlatformEvent } from '@bytelyst/events'; +import { bus } from './event-bus.js'; +import { storeEvent } from './event-store-bridge.js'; + +interface DispatcherOptions { + maxRetries?: number; + productId?: string; +} + +type HandlerFn = ( + topic: string, + payload: Record, + productId: string +) => Promise; + +const handlers = new Map(); + +export function registerDispatchHandler(handlerType: string, fn: HandlerFn): void { + const existing = handlers.get(handlerType) ?? []; + existing.push(fn); + handlers.set(handlerType, existing); +} + +export function clearDispatchHandlers(): void { + handlers.clear(); +} + +export async function dispatchEvent( + topic: string, + payload: Record, + productId: string, + options: DispatcherOptions = {} +): Promise<{ dispatched: number; errors: number }> { + const maxRetries = options.maxRetries ?? 3; + let dispatched = 0; + let errors = 0; + + // Store event for replay capability + const eventId = `evt_${randomUUID()}`; + try { + await storeEvent(productId, topic, payload, eventId); + } catch { + // best-effort storage — don't block dispatch + } + + // Look up active subscriptions for this topic + const { findActiveByTopic } = await import('../modules/event-subscriptions/repository.js'); + const subscriptions = await findActiveByTopic(productId, topic).catch( + () => [] as Awaited> + ); + + for (const sub of subscriptions) { + // Apply filter expression if present + if (sub.filterExpression) { + try { + const filterFn = new Function('payload', `return ${sub.filterExpression}`); + if (!filterFn(payload)) continue; + } catch { + // skip invalid filter expressions + continue; + } + } + + const handlerFns = handlers.get(sub.handlerType) ?? []; + for (const fn of handlerFns) { + let attempt = 0; + let success = false; + while (attempt < maxRetries && !success) { + attempt++; + try { + await fn(topic, payload, productId); + success = true; + dispatched++; + } catch { + if (attempt >= maxRetries) { + errors++; + // Send to DLQ + try { + const { createDlqEntry } = + await import('../modules/event-subscriptions/repository.js'); + await createDlqEntry({ + id: `dlq_${randomUUID()}`, + productId, + topic, + payload, + error: `Failed after ${maxRetries} attempts`, + attempts: attempt, + subscriptionId: sub.id, + originalEventId: eventId, + createdAt: new Date().toISOString(), + }); + } catch { + // best-effort DLQ + } + } + } + } + } + } + + return { dispatched, errors }; +} + +export function wireDispatcherToBus(): void { + // Subscribe to all known events and dispatch them + const eventNames: PlatformEventName[] = [ + 'user.created', + 'user.deleted', + 'subscription.created', + 'subscription.changed', + 'subscription.canceled', + 'payment.succeeded', + 'payment.failed', + 'job.completed', + 'job.failed', + 'flag.toggled', + 'license.activated', + 'license.expired', + 'invitation.redeemed', + 'referral.completed', + 'waitlist.joined', + ]; + + for (const eventName of eventNames) { + bus.on(eventName, async (event: PlatformEvent) => { + const productId = (event.payload as Record).productId as string | undefined; + if (productId) { + await dispatchEvent(event.type, event.payload as Record, productId); + } + }); + } +} diff --git a/services/platform-service/src/lib/event-store-bridge.ts b/services/platform-service/src/lib/event-store-bridge.ts new file mode 100644 index 00000000..92e68e7e --- /dev/null +++ b/services/platform-service/src/lib/event-store-bridge.ts @@ -0,0 +1,54 @@ +import { getCollection } from './datastore.js'; + +interface StoredEvent { + id: string; + productId: string; + topic: string; + payload: Record; + source?: string; + createdAt: string; + _ts?: number; +} + +function eventLogCollection() { + return getCollection('event_log', '/productId'); +} + +export async function storeEvent( + productId: string, + topic: string, + payload: Record, + eventId: string, + source?: string +): Promise { + const doc: StoredEvent = { + id: eventId, + productId, + topic, + payload, + source, + createdAt: new Date().toISOString(), + }; + return eventLogCollection().create(doc); +} + +export async function listEvents( + productId: string, + options: { topic?: string; since?: string; until?: string; limit?: number } +): Promise { + const events = await eventLogCollection().findMany({ + filter: { + productId, + ...(options.topic ? { topic: options.topic } : {}), + }, + sort: { createdAt: -1 }, + limit: options.limit ?? 100, + }); + + // Filter by time range in-memory (Cosmos SDK filter limitations) + return events.filter(e => { + if (options.since && e.createdAt < options.since) return false; + if (options.until && e.createdAt > options.until) return false; + return true; + }); +} diff --git a/services/platform-service/src/modules/event-subscriptions/repository.ts b/services/platform-service/src/modules/event-subscriptions/repository.ts new file mode 100644 index 00000000..8afd5822 --- /dev/null +++ b/services/platform-service/src/modules/event-subscriptions/repository.ts @@ -0,0 +1,158 @@ +import { randomUUID } from 'node:crypto'; +import { getCollection } from '../../lib/datastore.js'; +import type { + EventSubscriptionDoc, + CreateEventSubscriptionInput, + UpdateEventSubscriptionInput, + ListEventSubscriptionsQuery, + DlqEntryDoc, + ListDlqQuery, +} from './types.js'; + +function subsCollection() { + return getCollection('event_subscriptions', '/productId'); +} + +function dlqCollection() { + return getCollection('event_dlq', '/productId'); +} + +// ── Subscription CRUD ─────────────────────────────────────── + +export async function create( + productId: string, + input: CreateEventSubscriptionInput, + createdBy: string +): Promise { + const now = new Date().toISOString(); + const doc: EventSubscriptionDoc = { + id: `esub_${randomUUID()}`, + productId, + topic: input.topic, + handlerType: input.handlerType, + status: 'active', + config: input.config, + filterExpression: input.filterExpression, + description: input.description, + createdBy, + createdAt: now, + updatedAt: now, + }; + return subsCollection().create(doc); +} + +export async function getById( + id: string, + productId: string +): Promise { + try { + const doc = await subsCollection().findById(id, productId); + return doc ?? undefined; + } catch { + return undefined; + } +} + +export async function list( + productId: string, + query: ListEventSubscriptionsQuery +): Promise { + return subsCollection().findMany({ + filter: { + productId, + ...(query.topic ? { topic: query.topic } : {}), + ...(query.handlerType ? { handlerType: query.handlerType } : {}), + ...(query.status ? { status: query.status } : {}), + }, + sort: { createdAt: -1 }, + limit: query.limit, + }); +} + +export async function update( + id: string, + productId: string, + input: UpdateEventSubscriptionInput +): Promise { + const existing = await getById(id, productId); + if (!existing) return undefined; + + const updates: Partial = { + ...(input.status !== undefined && { status: input.status }), + ...(input.config !== undefined && { config: input.config }), + ...(input.filterExpression !== undefined && { filterExpression: input.filterExpression }), + ...(input.description !== undefined && { description: input.description }), + updatedAt: new Date().toISOString(), + }; + + return subsCollection().update(id, productId, updates); +} + +export async function remove(id: string, productId: string): Promise { + try { + await subsCollection().delete(id, productId); + return true; + } catch { + return false; + } +} + +// ── Active subscription lookup by topic ───────────────────── + +export async function findActiveByTopic( + productId: string, + topic: string +): Promise { + return subsCollection().findMany({ + filter: { productId, topic, status: 'active' }, + }); +} + +// ── Dead-Letter Queue ─────────────────────────────────────── + +export async function createDlqEntry(entry: DlqEntryDoc): Promise { + return dlqCollection().create(entry); +} + +export async function listDlq(productId: string, query: ListDlqQuery): Promise { + return dlqCollection().findMany({ + filter: { + productId, + ...(query.topic ? { topic: query.topic } : {}), + }, + sort: { createdAt: -1 }, + limit: query.limit, + }); +} + +export async function getDlqEntry(id: string, productId: string): Promise { + try { + const doc = await dlqCollection().findById(id, productId); + return doc ?? undefined; + } catch { + return undefined; + } +} + +export async function deleteDlqEntry(id: string, productId: string): Promise { + try { + await dlqCollection().delete(id, productId); + return true; + } catch { + return false; + } +} + +export async function purgeDlq(productId: string): Promise { + const entries = await dlqCollection().findMany({ filter: { productId }, limit: 1000 }); + let count = 0; + for (const entry of entries) { + try { + await dlqCollection().delete(entry.id, productId); + count++; + } catch { + // best-effort + } + } + return count; +} diff --git a/services/platform-service/src/modules/event-subscriptions/routes.test.ts b/services/platform-service/src/modules/event-subscriptions/routes.test.ts new file mode 100644 index 00000000..b362c0ac --- /dev/null +++ b/services/platform-service/src/modules/event-subscriptions/routes.test.ts @@ -0,0 +1,230 @@ +import Fastify from 'fastify'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; + +const repoMock = { + list: vi.fn(), + create: vi.fn(), + getById: vi.fn(), + update: vi.fn(), + remove: vi.fn(), + findActiveByTopic: vi.fn(), + createDlqEntry: vi.fn(), + listDlq: vi.fn(), + getDlqEntry: vi.fn(), + deleteDlqEntry: vi.fn(), + purgeDlq: vi.fn(), +}; + +const busMock = { + emit: vi.fn(), + on: vi.fn(), +}; + +const storeBridgeMock = { + listEvents: vi.fn(), +}; + +vi.mock('./repository.js', () => repoMock); +vi.mock('../../lib/event-bus.js', () => ({ bus: busMock })); +vi.mock('../../lib/event-store-bridge.js', () => storeBridgeMock); + +async function buildApp(payload?: { sub: string; productId: string; role?: string }) { + const { eventSubscriptionRoutes } = await import('./routes.js'); + const app = Fastify({ logger: false }); + if (payload) { + app.addHook('onRequest', async req => { + req.jwtPayload = payload; + }); + } + await app.register(eventSubscriptionRoutes, { prefix: '/api' }); + return app; +} + +describe('eventSubscriptionRoutes', () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + // ── Subscription CRUD ─────────────────────────────────── + + it('GET /event-subscriptions lists subscriptions', async () => { + repoMock.list.mockResolvedValue([{ id: 'esub_1', topic: 'user.created' }]); + const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' }); + + const res = await app.inject({ method: 'GET', url: '/api/event-subscriptions' }); + expect(res.statusCode).toBe(200); + expect(repoMock.list).toHaveBeenCalledWith('lysnrai', expect.objectContaining({ limit: 50 })); + }); + + it('POST /event-subscriptions creates a subscription', async () => { + repoMock.create.mockResolvedValue({ id: 'esub_1', topic: 'user.created', status: 'active' }); + const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' }); + + const res = await app.inject({ + method: 'POST', + url: '/api/event-subscriptions', + payload: { + topic: 'user.created', + handlerType: 'webhook', + config: { url: 'https://example.com' }, + }, + }); + + expect(res.statusCode).toBe(200); + expect(repoMock.create).toHaveBeenCalledWith( + 'lysnrai', + expect.objectContaining({ topic: 'user.created', handlerType: 'webhook' }), + 'admin_1' + ); + }); + + it('POST /event-subscriptions rejects invalid input', async () => { + const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' }); + + const res = await app.inject({ + method: 'POST', + url: '/api/event-subscriptions', + payload: {}, + }); + + expect(res.statusCode).toBe(400); + }); + + it('GET /event-subscriptions/:id returns 404 for unknown', async () => { + repoMock.getById.mockResolvedValue(undefined); + const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' }); + + const res = await app.inject({ method: 'GET', url: '/api/event-subscriptions/esub_xxx' }); + expect(res.statusCode).toBe(404); + }); + + it('PATCH /event-subscriptions/:id updates a subscription', async () => { + repoMock.update.mockResolvedValue({ id: 'esub_1', status: 'paused' }); + const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' }); + + const res = await app.inject({ + method: 'PATCH', + url: '/api/event-subscriptions/esub_1', + payload: { status: 'paused' }, + }); + + expect(res.statusCode).toBe(200); + expect(repoMock.update).toHaveBeenCalledWith( + 'esub_1', + 'lysnrai', + expect.objectContaining({ status: 'paused' }) + ); + }); + + it('DELETE /event-subscriptions/:id deletes a subscription', async () => { + repoMock.remove.mockResolvedValue(true); + const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' }); + + const res = await app.inject({ method: 'DELETE', url: '/api/event-subscriptions/esub_1' }); + expect(res.statusCode).toBe(200); + expect(JSON.parse(res.body)).toEqual({ deleted: true }); + }); + + it('DELETE /event-subscriptions/:id returns 404 for unknown', async () => { + repoMock.remove.mockResolvedValue(false); + const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' }); + + const res = await app.inject({ method: 'DELETE', url: '/api/event-subscriptions/esub_xxx' }); + expect(res.statusCode).toBe(404); + }); + + // ── Auth ──────────────────────────────────────────────── + + it('rejects unauthenticated requests', async () => { + const app = await buildApp(); + const res = await app.inject({ method: 'GET', url: '/api/event-subscriptions' }); + expect(res.statusCode).toBe(403); + }); + + it('rejects non-admin users', async () => { + const app = await buildApp({ sub: 'user_1', productId: 'lysnrai', role: 'viewer' }); + const res = await app.inject({ method: 'GET', url: '/api/event-subscriptions' }); + expect(res.statusCode).toBe(403); + }); + + // ── DLQ ───────────────────────────────────────────────── + + it('GET /event-dlq lists DLQ entries', async () => { + repoMock.listDlq.mockResolvedValue([{ id: 'dlq_1', topic: 'user.created' }]); + const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' }); + + const res = await app.inject({ method: 'GET', url: '/api/event-dlq' }); + expect(res.statusCode).toBe(200); + expect(repoMock.listDlq).toHaveBeenCalled(); + }); + + it('POST /event-dlq/:id/retry re-emits and removes entry', async () => { + repoMock.getDlqEntry.mockResolvedValue({ + id: 'dlq_1', + topic: 'user.created', + payload: { userId: 'u1', email: 'a@b.com', plan: 'free', productId: 'lysnrai' }, + }); + repoMock.deleteDlqEntry.mockResolvedValue(true); + busMock.emit.mockResolvedValue({ results: [] }); + const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' }); + + const res = await app.inject({ method: 'POST', url: '/api/event-dlq/dlq_1/retry' }); + expect(res.statusCode).toBe(200); + expect(busMock.emit).toHaveBeenCalledWith('user.created', expect.any(Object)); + expect(repoMock.deleteDlqEntry).toHaveBeenCalledWith('dlq_1', 'lysnrai'); + }); + + it('POST /event-dlq/:id/retry returns 404 for unknown', async () => { + repoMock.getDlqEntry.mockResolvedValue(undefined); + const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' }); + + const res = await app.inject({ method: 'POST', url: '/api/event-dlq/dlq_xxx/retry' }); + expect(res.statusCode).toBe(404); + }); + + it('POST /event-dlq/purge purges all entries', async () => { + repoMock.purgeDlq.mockResolvedValue(5); + const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' }); + + const res = await app.inject({ method: 'POST', url: '/api/event-dlq/purge' }); + expect(res.statusCode).toBe(200); + expect(JSON.parse(res.body)).toEqual({ purged: 5 }); + }); + + // ── Event Replay ──────────────────────────────────────── + + it('POST /events/replay replays stored events', async () => { + storeBridgeMock.listEvents.mockResolvedValue([ + { id: 'evt_1', topic: 'user.created', payload: { userId: 'u1', productId: 'lysnrai' } }, + { id: 'evt_2', topic: 'user.created', payload: { userId: 'u2', productId: 'lysnrai' } }, + ]); + busMock.emit.mockResolvedValue({ results: [] }); + const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' }); + + const res = await app.inject({ + method: 'POST', + url: '/api/events/replay', + payload: { topic: 'user.created', since: '2026-01-01' }, + }); + + expect(res.statusCode).toBe(200); + expect(JSON.parse(res.body)).toEqual({ replayed: 2, topic: 'user.created' }); + expect(busMock.emit).toHaveBeenCalledTimes(2); + }); + + it('POST /events/replay requires topic', async () => { + const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' }); + + const res = await app.inject({ + method: 'POST', + url: '/api/events/replay', + payload: {}, + }); + + expect(res.statusCode).toBe(400); + }); +}); diff --git a/services/platform-service/src/modules/event-subscriptions/routes.ts b/services/platform-service/src/modules/event-subscriptions/routes.ts new file mode 100644 index 00000000..4203a321 --- /dev/null +++ b/services/platform-service/src/modules/event-subscriptions/routes.ts @@ -0,0 +1,141 @@ +import type { FastifyInstance } from 'fastify'; +import type { PlatformEventName } from '@bytelyst/events'; +import { BadRequestError, ForbiddenError, NotFoundError } from '../../lib/errors.js'; +import { bus } from '../../lib/event-bus.js'; +import { listEvents } from '../../lib/event-store-bridge.js'; +import { + CreateEventSubscriptionSchema, + UpdateEventSubscriptionSchema, + ListEventSubscriptionsQuerySchema, + ListDlqQuerySchema, +} from './types.js'; +import * as repo from './repository.js'; + +function requireAdmin(req: { jwtPayload?: { sub?: string; role?: string; productId?: string } }): { + userId: string; + productId: string; +} { + const payload = req.jwtPayload; + if (!payload?.sub) throw new ForbiddenError('Authentication required'); + if (!payload.role || !['super_admin', 'admin'].includes(payload.role)) { + throw new ForbiddenError('Admin access required'); + } + return { + userId: payload.sub, + productId: payload.productId ?? process.env.DEFAULT_PRODUCT_ID ?? 'lysnrai', + }; +} + +export async function eventSubscriptionRoutes(app: FastifyInstance) { + // ── Subscription CRUD ───────────────────────────────────── + + app.get('/event-subscriptions', async req => { + const access = requireAdmin(req); + const parsed = ListEventSubscriptionsQuerySchema.safeParse(req.query); + if (!parsed.success) { + throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; ')); + } + return repo.list(access.productId, parsed.data); + }); + + app.post('/event-subscriptions', async req => { + const access = requireAdmin(req); + const parsed = CreateEventSubscriptionSchema.safeParse(req.body); + if (!parsed.success) { + throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; ')); + } + return repo.create(access.productId, parsed.data, access.userId); + }); + + app.get('/event-subscriptions/:id', async req => { + const access = requireAdmin(req); + const { id } = req.params as { id: string }; + const sub = await repo.getById(id, access.productId); + if (!sub) throw new NotFoundError(`Event subscription '${id}' not found`); + return sub; + }); + + app.patch('/event-subscriptions/:id', async req => { + const access = requireAdmin(req); + const { id } = req.params as { id: string }; + const parsed = UpdateEventSubscriptionSchema.safeParse(req.body); + if (!parsed.success) { + throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; ')); + } + const updated = await repo.update(id, access.productId, parsed.data); + if (!updated) throw new NotFoundError(`Event subscription '${id}' not found`); + return updated; + }); + + app.delete('/event-subscriptions/:id', async req => { + const access = requireAdmin(req); + const { id } = req.params as { id: string }; + const deleted = await repo.remove(id, access.productId); + if (!deleted) throw new NotFoundError(`Event subscription '${id}' not found`); + return { deleted: true }; + }); + + // ── Dead-Letter Queue ───────────────────────────────────── + + app.get('/event-dlq', async req => { + const access = requireAdmin(req); + const parsed = ListDlqQuerySchema.safeParse(req.query); + if (!parsed.success) { + throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; ')); + } + return repo.listDlq(access.productId, parsed.data); + }); + + app.post('/event-dlq/:id/retry', async req => { + const access = requireAdmin(req); + const { id } = req.params as { id: string }; + const entry = await repo.getDlqEntry(id, access.productId); + if (!entry) throw new NotFoundError(`DLQ entry '${id}' not found`); + + // Re-emit via bus — cast topic since stored events are already validated + await bus.emit( + entry.topic as PlatformEventName, + entry.payload as Parameters[1] + ); + + await repo.deleteDlqEntry(id, access.productId); + return { retried: true, topic: entry.topic }; + }); + + app.delete('/event-dlq/:id', async req => { + const access = requireAdmin(req); + const { id } = req.params as { id: string }; + const deleted = await repo.deleteDlqEntry(id, access.productId); + if (!deleted) throw new NotFoundError(`DLQ entry '${id}' not found`); + return { deleted: true }; + }); + + app.post('/event-dlq/purge', async req => { + const access = requireAdmin(req); + const count = await repo.purgeDlq(access.productId); + return { purged: count }; + }); + + // ── Event Replay ────────────────────────────────────────── + + app.post('/events/replay', async req => { + const access = requireAdmin(req); + const body = req.body as { topic?: string; since?: string; until?: string }; + if (!body.topic) throw new BadRequestError('topic is required'); + + const events = await listEvents(access.productId, { + topic: body.topic, + since: body.since, + until: body.until, + limit: 100, + }); + + let replayed = 0; + for (const evt of events) { + await bus.emit(evt.topic as PlatformEventName, evt.payload as Parameters[1]); + replayed++; + } + + return { replayed, topic: body.topic }; + }); +} diff --git a/services/platform-service/src/modules/event-subscriptions/types.ts b/services/platform-service/src/modules/event-subscriptions/types.ts new file mode 100644 index 00000000..e644f365 --- /dev/null +++ b/services/platform-service/src/modules/event-subscriptions/types.ts @@ -0,0 +1,77 @@ +import { z } from 'zod'; + +export const EventHandlerTypeSchema = z.enum(['webhook', 'job', 'notification', 'sse']); +export const EventSubscriptionStatusSchema = z.enum(['active', 'paused', 'disabled']); + +export const EventSubscriptionSchema = z.object({ + id: z.string().min(1), + productId: z.string().min(1), + topic: z.string().min(1), + handlerType: EventHandlerTypeSchema, + status: EventSubscriptionStatusSchema, + config: z.record(z.unknown()).default({}), + filterExpression: z.string().optional(), + description: z.string().optional(), + createdBy: z.string().min(1), + createdAt: z.string(), + updatedAt: z.string(), +}); + +export type EventSubscriptionDoc = z.infer & { + _ts?: number; + _etag?: string; +}; + +export const CreateEventSubscriptionSchema = z.object({ + topic: z.string().min(1), + handlerType: EventHandlerTypeSchema, + config: z.record(z.unknown()).default({}), + filterExpression: z.string().optional(), + description: z.string().optional(), +}); + +export type CreateEventSubscriptionInput = z.infer; + +export const UpdateEventSubscriptionSchema = z.object({ + status: EventSubscriptionStatusSchema.optional(), + config: z.record(z.unknown()).optional(), + filterExpression: z.string().optional(), + description: z.string().optional(), +}); + +export type UpdateEventSubscriptionInput = z.infer; + +export const ListEventSubscriptionsQuerySchema = z.object({ + topic: z.string().optional(), + handlerType: EventHandlerTypeSchema.optional(), + status: EventSubscriptionStatusSchema.optional(), + limit: z.coerce.number().min(1).max(100).default(50), +}); + +export type ListEventSubscriptionsQuery = z.infer; + +// ── Dead-Letter Queue ──────────────────────────────────────── + +export const DlqEntrySchema = z.object({ + id: z.string().min(1), + productId: z.string().min(1), + topic: z.string().min(1), + payload: z.record(z.unknown()), + error: z.string(), + attempts: z.number().int().min(1), + subscriptionId: z.string().optional(), + originalEventId: z.string().optional(), + createdAt: z.string(), +}); + +export type DlqEntryDoc = z.infer & { + _ts?: number; + _etag?: string; +}; + +export const ListDlqQuerySchema = z.object({ + topic: z.string().optional(), + limit: z.coerce.number().min(1).max(100).default(50), +}); + +export type ListDlqQuery = z.infer; diff --git a/services/platform-service/src/modules/jobs/runner.ts b/services/platform-service/src/modules/jobs/runner.ts index b4693f4c..afc5f26e 100644 --- a/services/platform-service/src/modules/jobs/runner.ts +++ b/services/platform-service/src/modules/jobs/runner.ts @@ -9,9 +9,34 @@ import type { JobDefinitionDoc, JobRunDoc, JobContext, JobResult } from './types // Uses a simple lock flag to prevent overlapping ticks. const DEFAULT_PRODUCT_ID = 'lysnrai'; +const MAX_CONCURRENT_JOBS = 5; +const STUCK_JOB_THRESHOLD_MS = 600_000; // 10 minutes let tickInterval: ReturnType | null = null; let isRunning = false; +let isShuttingDown = false; + +// Track active job executions for concurrency + heartbeat +const activeJobs = new Map(); + +/** + * Get the number of currently running jobs. + */ +export function getActiveJobCount(): number { + return activeJobs.size; +} + +/** + * Get info about all active jobs (for diagnostics). + */ +export function getActiveJobs(): Array<{ jobName: string; runId: string; elapsedMs: number }> { + const now = Date.now(); + return Array.from(activeJobs.values()).map(j => ({ + jobName: j.jobName, + runId: j.runId, + elapsedMs: now - j.startedAt, + })); +} /** * Start the job runner tick loop. Evaluates every 60 seconds. @@ -22,8 +47,9 @@ export function startRunner(log: { error: (...a: unknown[]) => void; }): void { if (tickInterval) return; + isShuttingDown = false; - log.info('[jobs] Starting job runner (60s tick)'); + log.info('[jobs] Starting job runner (60s tick, max concurrency: %d)', MAX_CONCURRENT_JOBS); // Run immediately on start, then every 60s tick(log).catch(() => {}); @@ -33,13 +59,20 @@ export function startRunner(log: { } /** - * Stop the job runner. + * Stop the job runner gracefully. Waits for active jobs to complete (up to 30s). */ -export function stopRunner(): void { +export async function stopRunner(): Promise { + isShuttingDown = true; if (tickInterval) { globalThis.clearInterval(tickInterval); tickInterval = null; } + + // Wait for active jobs to finish (up to 30 seconds) + const deadline = Date.now() + 30_000; + while (activeJobs.size > 0 && Date.now() < deadline) { + await new Promise(resolve => globalThis.setTimeout(resolve, 500)); + } } /** @@ -54,6 +87,8 @@ async function tick(log: { isRunning = true; try { + if (isShuttingDown) return; + const now = new Date(); // Round to current minute for matching now.setUTCSeconds(0, 0); @@ -61,6 +96,17 @@ async function tick(log: { const registered = getRegisteredJobs(); if (registered.length === 0) return; + // Recover stuck jobs (running longer than threshold) + for (const [key, job] of activeJobs) { + if (Date.now() - job.startedAt > STUCK_JOB_THRESHOLD_MS) { + log.warn( + { jobName: job.jobName, runId: job.runId, elapsedMs: Date.now() - job.startedAt }, + '[jobs] Removing stuck job from active tracking' + ); + activeJobs.delete(key); + } + } + // Get definitions from DB (or create defaults) let definitions: JobDefinitionDoc[]; try { @@ -74,8 +120,20 @@ async function tick(log: { if (def.status !== 'enabled') continue; if (!getJobHandler(def.name)) continue; + // Enforce concurrency limit + if (activeJobs.size >= MAX_CONCURRENT_JOBS) { + log.warn( + '[jobs] Concurrency limit reached (%d), skipping remaining jobs', + MAX_CONCURRENT_JOBS + ); + break; + } + + // Skip if this job is already running + if (activeJobs.has(def.name)) continue; + if (cronMatches(def.cronExpression, now)) { - await executeJob(def, 'scheduler', log); + executeJob(def, 'scheduler', log).catch(() => {}); } } } finally { @@ -103,6 +161,9 @@ export async function executeJob( const runId = crypto.randomUUID(); const startedAt = new Date().toISOString(); + // Track this job as active + activeJobs.set(def.name, { jobName: def.name, startedAt: Date.now(), runId }); + // Create run record const run: JobRunDoc = { id: runId, @@ -194,6 +255,9 @@ export async function executeJob( }; } + // Remove from active tracking + activeJobs.delete(def.name); + const durationMs = Date.now() - start; const completedAt = new Date().toISOString(); const status = result.success ? 'success' : 'failed'; diff --git a/services/platform-service/src/server.ts b/services/platform-service/src/server.ts index 004286b5..311001da 100644 --- a/services/platform-service/src/server.ts +++ b/services/platform-service/src/server.ts @@ -107,7 +107,9 @@ import { registerDiagnosticsSubscribers } from './modules/diagnostics/subscriber import { registerDeliverySubscribers } from './modules/delivery/subscribers.js'; import { verifyToken } from './modules/auth/jwt.js'; import { registerOptionalApiKeyContext } from './lib/api-key-auth.js'; +import { eventSubscriptionRoutes } from './modules/event-subscriptions/routes.js'; import { startEventBus, stopEventBus } from './lib/event-bus.js'; +import { wireDispatcherToBus } from './lib/event-dispatcher.js'; await initCosmosIfNeeded(); await loadProductCache(); @@ -261,9 +263,13 @@ await app.register(retentionRoutes, { prefix: '/api' }); await app.register(backupRoutes, { prefix: '/api' }); await app.register(apiVersioningRoutes, { prefix: '/api' }); +// Event subscriptions + DLQ + replay +await app.register(eventSubscriptionRoutes, { prefix: '/api' }); + // Register event bus subscribers registerDiagnosticsSubscribers(app.log); registerDeliverySubscribers(app.log); +wireDispatcherToBus(); startEventBus(); app.addHook('onClose', async () => { await stopEventBus();