From a76b932502df4155d703e000a928c3e54b6bab95 Mon Sep 17 00:00:00 2001 From: root Date: Sat, 14 Mar 2026 16:08:07 +0000 Subject: [PATCH] feat(platform-service): add durable run tracking --- .../platform-service/src/lib/cosmos-init.ts | 3 + .../src/modules/jobs/runner.test.ts | 59 +++++++++ .../src/modules/jobs/runner.ts | 61 +++++++++ .../src/modules/runs/repository.test.ts | 61 +++++++++ .../src/modules/runs/repository.ts | 75 +++++++++++ .../src/modules/runs/routes.ts | 38 ++++++ .../src/modules/runs/runs.api-key.test.ts | 60 +++++++++ .../src/modules/runs/tracker.ts | 118 ++++++++++++++++++ .../src/modules/runs/types.ts | 69 ++++++++++ services/platform-service/src/server.ts | 2 + 10 files changed, 546 insertions(+) create mode 100644 services/platform-service/src/modules/jobs/runner.test.ts create mode 100644 services/platform-service/src/modules/runs/repository.test.ts create mode 100644 services/platform-service/src/modules/runs/repository.ts create mode 100644 services/platform-service/src/modules/runs/routes.ts create mode 100644 services/platform-service/src/modules/runs/runs.api-key.test.ts create mode 100644 services/platform-service/src/modules/runs/tracker.ts create mode 100644 services/platform-service/src/modules/runs/types.ts diff --git a/services/platform-service/src/lib/cosmos-init.ts b/services/platform-service/src/lib/cosmos-init.ts index 53b1cbea..e2276ac5 100644 --- a/services/platform-service/src/lib/cosmos-init.ts +++ b/services/platform-service/src/lib/cosmos-init.ts @@ -60,6 +60,9 @@ const CONTAINER_DEFS: Record = { // Scheduled jobs job_definitions: { partitionKeyPath: '/productId' }, job_runs: { partitionKeyPath: '/pk' }, + // Generic orchestration runs + agent_runs: { partitionKeyPath: '/productId', defaultTtl: 30 * 86400 }, + agent_run_steps: { partitionKeyPath: '/pk', defaultTtl: 30 * 86400 }, // Telemetry (client diagnostics — see docs/WINDSURF/CLIENT_TELEMETRY_DESIGN.md) telemetry_events: { partitionKeyPath: '/pk', defaultTtl: 30 * 86400 }, telemetry_error_clusters: { partitionKeyPath: '/pk', defaultTtl: 90 * 86400 }, diff --git a/services/platform-service/src/modules/jobs/runner.test.ts b/services/platform-service/src/modules/jobs/runner.test.ts new file mode 100644 index 00000000..a9e7880f --- /dev/null +++ b/services/platform-service/src/modules/jobs/runner.test.ts @@ -0,0 +1,59 @@ +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { MemoryDatastoreProvider } from '@bytelyst/datastore'; +import { _resetDatastoreProvider, setProvider } from '../../lib/datastore.js'; +import { clearRegistry, registerJob } from './registry.js'; +import { executeJob } from './runner.js'; +import * as runsRepo from '../runs/repository.js'; +import type { JobDefinitionDoc } from './types.js'; + +describe('jobs runner', () => { + beforeEach(() => { + setProvider(new MemoryDatastoreProvider()); + clearRegistry(); + }); + + afterEach(() => { + clearRegistry(); + _resetDatastoreProvider(); + }); + + it('writes a generic run record and step for executed jobs', async () => { + registerJob('nightly-sync', async () => ({ + success: true, + message: 'ok', + metrics: { synced: 12 }, + })); + + const definition: JobDefinitionDoc = { + id: 'job_nightly-sync', + productId: 'lysnrai', + name: 'nightly-sync', + cronExpression: '0 * * * *', + status: 'enabled', + createdAt: new Date().toISOString(), + updatedAt: new Date().toISOString(), + timeoutMs: 10_000, + retryOnFailure: false, + maxRetries: 0, + }; + + const log = { + info: () => {}, + warn: () => {}, + error: () => {}, + }; + + const run = await executeJob(definition, 'manual', log); + const trackedRun = await runsRepo.getRun(run.id, 'lysnrai'); + const steps = await runsRepo.listRunSteps('lysnrai', run.id); + + expect(trackedRun.kind).toBe('job'); + expect(trackedRun.status).toBe('succeeded'); + expect(trackedRun.source).toBe('platform.jobs'); + expect(steps).toHaveLength(1); + expect(steps[0]).toMatchObject({ + stepName: 'execute', + status: 'succeeded', + }); + }); +}); diff --git a/services/platform-service/src/modules/jobs/runner.ts b/services/platform-service/src/modules/jobs/runner.ts index 2aee2784..b4693f4c 100644 --- a/services/platform-service/src/modules/jobs/runner.ts +++ b/services/platform-service/src/modules/jobs/runner.ts @@ -1,6 +1,7 @@ import { cronMatches, nextCronOccurrence } from './cron.js'; import { getJobHandler, getRegisteredJobs } from './registry.js'; import * as repo from './repository.js'; +import * as runTracker from '../runs/tracker.js'; import type { JobDefinitionDoc, JobRunDoc, JobContext, JobResult } from './types.js'; // ── In-Process Job Runner ──────────────────────────────────── @@ -119,6 +120,37 @@ export async function executeJob( // Non-fatal — continue execution } + try { + await runTracker.startRun({ + id: runId, + productId: def.productId, + kind: 'job', + name: def.name, + source: 'platform.jobs', + triggeredBy, + input: { + jobDefinitionId: def.id, + cronExpression: def.cronExpression, + }, + metadata: { + timeoutMs: def.timeoutMs, + retryOnFailure: def.retryOnFailure, + maxRetries: def.maxRetries, + }, + }); + await runTracker.startRunStep({ + runId, + productId: def.productId, + stepName: 'execute', + order: 1, + input: { + triggeredBy, + }, + }); + } catch { + // Non-fatal + } + // Mark definition as running try { await repo.updateJobDefinition(def.id, def.productId, { @@ -182,6 +214,35 @@ export async function executeJob( // Non-fatal } + try { + if (result.success) { + await runTracker.completeRunStep(runId, def.productId, 'execute', { + message: result.message ?? 'completed', + ...(result.metrics ? { metrics: result.metrics } : {}), + }); + await runTracker.completeRun(runId, def.productId, { + message: result.message ?? 'completed', + ...(result.metrics ? { metrics: result.metrics } : {}), + }); + } else { + await runTracker.failRunStep( + runId, + def.productId, + 'execute', + result.message ?? 'Job failed', + result.metrics ? { metrics: result.metrics } : undefined + ); + await runTracker.failRun( + runId, + def.productId, + result.message ?? 'Job failed', + result.metrics ? { metrics: result.metrics } : undefined + ); + } + } catch { + // Non-fatal + } + // Update definition const nextRun = nextCronOccurrence(def.cronExpression, new Date()); try { diff --git a/services/platform-service/src/modules/runs/repository.test.ts b/services/platform-service/src/modules/runs/repository.test.ts new file mode 100644 index 00000000..ce362ca0 --- /dev/null +++ b/services/platform-service/src/modules/runs/repository.test.ts @@ -0,0 +1,61 @@ +import { beforeEach, afterEach, describe, expect, it } from 'vitest'; +import { MemoryDatastoreProvider } from '@bytelyst/datastore'; +import { _resetDatastoreProvider, setProvider } from '../../lib/datastore.js'; +import * as repo from './repository.js'; + +describe('runs repository', () => { + beforeEach(() => { + setProvider(new MemoryDatastoreProvider()); + }); + + afterEach(() => { + _resetDatastoreProvider(); + }); + + it('creates and retrieves runs with ordered steps', async () => { + await repo.createRun({ + id: 'run_1', + productId: 'lysnrai', + kind: 'job', + name: 'nightly-sync', + source: 'platform.jobs', + status: 'running', + createdAt: '2026-03-14T00:00:00.000Z', + startedAt: '2026-03-14T00:00:00.000Z', + updatedAt: '2026-03-14T00:00:00.000Z', + }); + + await repo.createRunStep({ + id: 'run_1:collect', + pk: 'lysnrai:run_1', + runId: 'run_1', + productId: 'lysnrai', + stepName: 'collect', + order: 2, + status: 'running', + createdAt: '2026-03-14T00:00:00.000Z', + startedAt: '2026-03-14T00:00:00.000Z', + updatedAt: '2026-03-14T00:00:00.000Z', + }); + + await repo.createRunStep({ + id: 'run_1:validate', + pk: 'lysnrai:run_1', + runId: 'run_1', + productId: 'lysnrai', + stepName: 'validate', + order: 1, + status: 'succeeded', + createdAt: '2026-03-14T00:00:00.000Z', + startedAt: '2026-03-14T00:00:00.000Z', + completedAt: '2026-03-14T00:01:00.000Z', + updatedAt: '2026-03-14T00:01:00.000Z', + }); + + const run = await repo.getRun('run_1', 'lysnrai'); + const steps = await repo.listRunSteps('lysnrai', 'run_1'); + + expect(run.name).toBe('nightly-sync'); + expect(steps.map(step => step.stepName)).toEqual(['validate', 'collect']); + }); +}); diff --git a/services/platform-service/src/modules/runs/repository.ts b/services/platform-service/src/modules/runs/repository.ts new file mode 100644 index 00000000..2ef0d0c7 --- /dev/null +++ b/services/platform-service/src/modules/runs/repository.ts @@ -0,0 +1,75 @@ +import { NotFoundError } from '../../lib/errors.js'; +import { getCollection } from '../../lib/datastore.js'; +import type { ListRunsQuery, RunDoc, RunStepDoc } from './types.js'; + +function runsCollection() { + return getCollection('agent_runs', '/productId'); +} + +function stepsCollection() { + return getCollection('agent_run_steps', '/pk'); +} + +export async function createRun(doc: RunDoc): Promise { + return runsCollection().create(doc); +} + +export async function updateRun( + id: string, + productId: string, + updates: Partial +): Promise { + const updated = await runsCollection().update(id, productId, { + ...updates, + updatedAt: new Date().toISOString(), + }); + if (!updated) throw new NotFoundError(`Run '${id}' not found`); + return updated; +} + +export async function getRun(id: string, productId: string): Promise { + const run = await runsCollection().findById(id, productId); + if (!run) throw new NotFoundError(`Run '${id}' not found`); + return run; +} + +export async function listRuns(productId: string, query: ListRunsQuery): Promise { + return runsCollection().findMany({ + filter: { + productId, + ...(query.kind ? { kind: query.kind } : {}), + ...(query.status ? { status: query.status } : {}), + }, + sort: { createdAt: -1 }, + limit: query.limit, + }); +} + +export async function createRunStep(doc: RunStepDoc): Promise { + return stepsCollection().create(doc); +} + +export async function updateRunStep( + id: string, + runId: string, + productId: string, + updates: Partial +): Promise { + const updated = await stepsCollection().update(id, `${productId}:${runId}`, { + ...updates, + updatedAt: new Date().toISOString(), + }); + if (!updated) throw new NotFoundError(`Run step '${id}' not found`); + return updated; +} + +export async function listRunSteps(productId: string, runId: string): Promise { + return stepsCollection().findMany({ + filter: { + pk: `${productId}:${runId}`, + runId, + }, + sort: { order: 1 }, + limit: 100, + }); +} diff --git a/services/platform-service/src/modules/runs/routes.ts b/services/platform-service/src/modules/runs/routes.ts new file mode 100644 index 00000000..49b04fc4 --- /dev/null +++ b/services/platform-service/src/modules/runs/routes.ts @@ -0,0 +1,38 @@ +import type { FastifyInstance } from 'fastify'; +import { requireJwtOrApiKey } from '../../lib/api-key-auth.js'; +import { BadRequestError } from '../../lib/errors.js'; +import { ListRunsQuerySchema } from './types.js'; +import * as repo from './repository.js'; + +export async function runRoutes(app: FastifyInstance) { + function requireRunsRead(req: import('fastify').FastifyRequest): string { + const access = requireJwtOrApiKey(req, { + jwtRoles: ['super_admin', 'admin'], + apiKeyScopes: ['jobs:read'], + rateLimitKey: 'jobs:read', + }); + return access.productId; + } + + app.get('/runs', async req => { + const productId = requireRunsRead(req); + const parsed = ListRunsQuerySchema.safeParse(req.query); + if (!parsed.success) { + throw new BadRequestError(parsed.error.issues.map(issue => issue.message).join('; ')); + } + + return repo.listRuns(productId, parsed.data); + }); + + app.get('/runs/:id', async req => { + const productId = requireRunsRead(req); + const { id } = req.params as { id: string }; + return repo.getRun(id, productId); + }); + + app.get('/runs/:id/steps', async req => { + const productId = requireRunsRead(req); + const { id } = req.params as { id: string }; + return repo.listRunSteps(productId, id); + }); +} diff --git a/services/platform-service/src/modules/runs/runs.api-key.test.ts b/services/platform-service/src/modules/runs/runs.api-key.test.ts new file mode 100644 index 00000000..d328583e --- /dev/null +++ b/services/platform-service/src/modules/runs/runs.api-key.test.ts @@ -0,0 +1,60 @@ +import Fastify from 'fastify'; +import bcrypt from 'bcryptjs'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; +import { MemoryDatastoreProvider } from '@bytelyst/datastore'; +import { setProvider } from '../../lib/datastore.js'; +import { registerOptionalApiKeyContext } from '../../lib/api-key-auth.js'; + +const rawApiKey = `wai_${'c'.repeat(64)}`; + +const repoMock = { + listRuns: vi.fn(), + getRun: vi.fn(), + listRunSteps: vi.fn(), +}; + +vi.mock('./repository.js', () => repoMock); + +async function seedApiKey(scopes: string[]) { + const provider = new MemoryDatastoreProvider(); + setProvider(provider); + const collection = provider.getCollection('api_tokens', '/id'); + await collection.create({ + id: 'tok_runs_1', + productId: 'lysnrai', + userId: 'svc_runs', + userName: 'Runs Service', + prefix: rawApiKey.slice(0, 12), + tokenHash: await bcrypt.hash(rawApiKey, 10), + status: 'active', + scopes, + expiresAt: '2099-01-01T00:00:00.000Z', + lastUsed: null, + }); +} + +describe('runRoutes api key integration', () => { + beforeEach(() => { + vi.clearAllMocks(); + delete process.env.API_KEY_RATE_LIMIT_CONFIG_JSON; + }); + + it('allows run reads via scoped api key', async () => { + await seedApiKey(['jobs:read']); + repoMock.listRuns.mockResolvedValue([{ id: 'run_1', productId: 'lysnrai' }]); + + const { runRoutes } = await import('./routes.js'); + const app = Fastify(); + await registerOptionalApiKeyContext(app); + await app.register(runRoutes, { prefix: '/api' }); + + const res = await app.inject({ + method: 'GET', + url: '/api/runs?limit=10', + headers: { 'x-api-key': rawApiKey }, + }); + + expect(res.statusCode).toBe(200); + expect(repoMock.listRuns).toHaveBeenCalledWith('lysnrai', { limit: 10 }); + }); +}); diff --git a/services/platform-service/src/modules/runs/tracker.ts b/services/platform-service/src/modules/runs/tracker.ts new file mode 100644 index 00000000..8b60a083 --- /dev/null +++ b/services/platform-service/src/modules/runs/tracker.ts @@ -0,0 +1,118 @@ +import type { RunDoc, RunStepDoc } from './types.js'; +import * as repo from './repository.js'; + +interface StartRunInput { + id: string; + productId: string; + kind: RunDoc['kind']; + name: string; + source: string; + triggeredBy?: string; + parentRunId?: string; + queueName?: string; + queueJobId?: string; + input?: Record; + metadata?: Record; +} + +interface StartRunStepInput { + runId: string; + productId: string; + stepName: string; + order: number; + input?: Record; + metadata?: Record; +} + +export async function startRun(input: StartRunInput): Promise { + const now = new Date().toISOString(); + return repo.createRun({ + id: input.id, + productId: input.productId, + kind: input.kind, + name: input.name, + source: input.source, + status: 'running', + triggeredBy: input.triggeredBy, + parentRunId: input.parentRunId, + queueName: input.queueName, + queueJobId: input.queueJobId, + input: input.input, + metadata: input.metadata, + createdAt: now, + startedAt: now, + updatedAt: now, + }); +} + +export async function completeRun( + id: string, + productId: string, + output?: Record +): Promise { + return repo.updateRun(id, productId, { + status: 'succeeded', + output, + completedAt: new Date().toISOString(), + }); +} + +export async function failRun( + id: string, + productId: string, + error: string, + output?: Record +): Promise { + return repo.updateRun(id, productId, { + status: 'failed', + error, + output, + completedAt: new Date().toISOString(), + }); +} + +export async function startRunStep(input: StartRunStepInput): Promise { + const now = new Date().toISOString(); + return repo.createRunStep({ + id: `${input.runId}:${input.stepName}`, + pk: `${input.productId}:${input.runId}`, + runId: input.runId, + productId: input.productId, + stepName: input.stepName, + order: input.order, + status: 'running', + input: input.input, + metadata: input.metadata, + createdAt: now, + startedAt: now, + updatedAt: now, + }); +} + +export async function completeRunStep( + runId: string, + productId: string, + stepName: string, + output?: Record +): Promise { + return repo.updateRunStep(`${runId}:${stepName}`, runId, productId, { + status: 'succeeded', + output, + completedAt: new Date().toISOString(), + }); +} + +export async function failRunStep( + runId: string, + productId: string, + stepName: string, + error: string, + output?: Record +): Promise { + return repo.updateRunStep(`${runId}:${stepName}`, runId, productId, { + status: 'failed', + error, + output, + completedAt: new Date().toISOString(), + }); +} diff --git a/services/platform-service/src/modules/runs/types.ts b/services/platform-service/src/modules/runs/types.ts new file mode 100644 index 00000000..e9f2c3b9 --- /dev/null +++ b/services/platform-service/src/modules/runs/types.ts @@ -0,0 +1,69 @@ +import { z } from 'zod'; + +export const RunKindSchema = z.enum(['job', 'agent']); +export const RunStatusSchema = z.enum(['queued', 'running', 'succeeded', 'failed', 'cancelled']); +export const RunStepStatusSchema = z.enum([ + 'pending', + 'running', + 'succeeded', + 'failed', + 'skipped', + 'cancelled', +]); + +export const RunSchema = z.object({ + id: z.string().min(1), + productId: z.string().min(1), + kind: RunKindSchema, + name: z.string().min(1), + source: z.string().min(1), + status: RunStatusSchema, + triggeredBy: z.string().optional(), + parentRunId: z.string().optional(), + queueName: z.string().optional(), + queueJobId: z.string().optional(), + input: z.record(z.unknown()).optional(), + output: z.record(z.unknown()).optional(), + error: z.string().optional(), + metadata: z.record(z.unknown()).optional(), + createdAt: z.string(), + startedAt: z.string().optional(), + completedAt: z.string().optional(), + updatedAt: z.string(), +}); + +export type RunDoc = z.infer & { + _ts?: number; + _etag?: string; +}; + +export const RunStepSchema = z.object({ + id: z.string().min(1), + runId: z.string().min(1), + productId: z.string().min(1), + stepName: z.string().min(1), + order: z.number().int().min(0), + status: RunStepStatusSchema, + input: z.record(z.unknown()).optional(), + output: z.record(z.unknown()).optional(), + error: z.string().optional(), + metadata: z.record(z.unknown()).optional(), + createdAt: z.string(), + startedAt: z.string().optional(), + completedAt: z.string().optional(), + updatedAt: z.string(), +}); + +export type RunStepDoc = z.infer & { + pk: string; + _ts?: number; + _etag?: string; +}; + +export const ListRunsQuerySchema = z.object({ + kind: RunKindSchema.optional(), + status: RunStatusSchema.optional(), + limit: z.coerce.number().min(1).max(100).default(20), +}); + +export type ListRunsQuery = z.infer; diff --git a/services/platform-service/src/server.ts b/services/platform-service/src/server.ts index 9f519286..b2b2fd3e 100644 --- a/services/platform-service/src/server.ts +++ b/services/platform-service/src/server.ts @@ -65,6 +65,7 @@ import { startTriggerEvaluationJob } from './modules/diagnostics/trigger-job.js' import { broadcastRoutes } from './modules/broadcasts/routes.js'; import { surveyRoutes } from './modules/surveys/routes.js'; import { jobRoutes } from './modules/jobs/routes.js'; +import { runRoutes } from './modules/runs/routes.js'; import { statusRoutes } from './modules/status/routes.js'; import { deliveryRoutes } from './modules/delivery/routes.js'; import { sessionRoutes } from './modules/sessions/routes.js'; @@ -179,6 +180,7 @@ await app.register(performanceProfileRoutes, { prefix: '/api' }); await app.register(publicRoutes, { prefix: '/api' }); // Scheduled jobs module (admin: list, trigger, view runs) await app.register(jobRoutes, { prefix: '/api' }); +await app.register(runRoutes, { prefix: '/api' }); // Public status page + incident management await app.register(statusRoutes, { prefix: '/api' }); // Transactional email delivery