feat(platform-service): add durable run tracking

This commit is contained in:
root 2026-03-14 16:08:07 +00:00
parent 885ee2d504
commit a76b932502
10 changed files with 546 additions and 0 deletions

View File

@ -60,6 +60,9 @@ const CONTAINER_DEFS: Record<string, ContainerConfig> = {
// Scheduled jobs
job_definitions: { partitionKeyPath: '/productId' },
job_runs: { partitionKeyPath: '/pk' },
// Generic orchestration runs
agent_runs: { partitionKeyPath: '/productId', defaultTtl: 30 * 86400 },
agent_run_steps: { partitionKeyPath: '/pk', defaultTtl: 30 * 86400 },
// Telemetry (client diagnostics — see docs/WINDSURF/CLIENT_TELEMETRY_DESIGN.md)
telemetry_events: { partitionKeyPath: '/pk', defaultTtl: 30 * 86400 },
telemetry_error_clusters: { partitionKeyPath: '/pk', defaultTtl: 90 * 86400 },

View File

@ -0,0 +1,59 @@
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { MemoryDatastoreProvider } from '@bytelyst/datastore';
import { _resetDatastoreProvider, setProvider } from '../../lib/datastore.js';
import { clearRegistry, registerJob } from './registry.js';
import { executeJob } from './runner.js';
import * as runsRepo from '../runs/repository.js';
import type { JobDefinitionDoc } from './types.js';
describe('jobs runner', () => {
beforeEach(() => {
setProvider(new MemoryDatastoreProvider());
clearRegistry();
});
afterEach(() => {
clearRegistry();
_resetDatastoreProvider();
});
it('writes a generic run record and step for executed jobs', async () => {
registerJob('nightly-sync', async () => ({
success: true,
message: 'ok',
metrics: { synced: 12 },
}));
const definition: JobDefinitionDoc = {
id: 'job_nightly-sync',
productId: 'lysnrai',
name: 'nightly-sync',
cronExpression: '0 * * * *',
status: 'enabled',
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
timeoutMs: 10_000,
retryOnFailure: false,
maxRetries: 0,
};
const log = {
info: () => {},
warn: () => {},
error: () => {},
};
const run = await executeJob(definition, 'manual', log);
const trackedRun = await runsRepo.getRun(run.id, 'lysnrai');
const steps = await runsRepo.listRunSteps('lysnrai', run.id);
expect(trackedRun.kind).toBe('job');
expect(trackedRun.status).toBe('succeeded');
expect(trackedRun.source).toBe('platform.jobs');
expect(steps).toHaveLength(1);
expect(steps[0]).toMatchObject({
stepName: 'execute',
status: 'succeeded',
});
});
});

View File

@ -1,6 +1,7 @@
import { cronMatches, nextCronOccurrence } from './cron.js';
import { getJobHandler, getRegisteredJobs } from './registry.js';
import * as repo from './repository.js';
import * as runTracker from '../runs/tracker.js';
import type { JobDefinitionDoc, JobRunDoc, JobContext, JobResult } from './types.js';
// ── In-Process Job Runner ────────────────────────────────────
@ -119,6 +120,37 @@ export async function executeJob(
// Non-fatal — continue execution
}
try {
await runTracker.startRun({
id: runId,
productId: def.productId,
kind: 'job',
name: def.name,
source: 'platform.jobs',
triggeredBy,
input: {
jobDefinitionId: def.id,
cronExpression: def.cronExpression,
},
metadata: {
timeoutMs: def.timeoutMs,
retryOnFailure: def.retryOnFailure,
maxRetries: def.maxRetries,
},
});
await runTracker.startRunStep({
runId,
productId: def.productId,
stepName: 'execute',
order: 1,
input: {
triggeredBy,
},
});
} catch {
// Non-fatal
}
// Mark definition as running
try {
await repo.updateJobDefinition(def.id, def.productId, {
@ -182,6 +214,35 @@ export async function executeJob(
// Non-fatal
}
try {
if (result.success) {
await runTracker.completeRunStep(runId, def.productId, 'execute', {
message: result.message ?? 'completed',
...(result.metrics ? { metrics: result.metrics } : {}),
});
await runTracker.completeRun(runId, def.productId, {
message: result.message ?? 'completed',
...(result.metrics ? { metrics: result.metrics } : {}),
});
} else {
await runTracker.failRunStep(
runId,
def.productId,
'execute',
result.message ?? 'Job failed',
result.metrics ? { metrics: result.metrics } : undefined
);
await runTracker.failRun(
runId,
def.productId,
result.message ?? 'Job failed',
result.metrics ? { metrics: result.metrics } : undefined
);
}
} catch {
// Non-fatal
}
// Update definition
const nextRun = nextCronOccurrence(def.cronExpression, new Date());
try {

View File

@ -0,0 +1,61 @@
import { beforeEach, afterEach, describe, expect, it } from 'vitest';
import { MemoryDatastoreProvider } from '@bytelyst/datastore';
import { _resetDatastoreProvider, setProvider } from '../../lib/datastore.js';
import * as repo from './repository.js';
describe('runs repository', () => {
beforeEach(() => {
setProvider(new MemoryDatastoreProvider());
});
afterEach(() => {
_resetDatastoreProvider();
});
it('creates and retrieves runs with ordered steps', async () => {
await repo.createRun({
id: 'run_1',
productId: 'lysnrai',
kind: 'job',
name: 'nightly-sync',
source: 'platform.jobs',
status: 'running',
createdAt: '2026-03-14T00:00:00.000Z',
startedAt: '2026-03-14T00:00:00.000Z',
updatedAt: '2026-03-14T00:00:00.000Z',
});
await repo.createRunStep({
id: 'run_1:collect',
pk: 'lysnrai:run_1',
runId: 'run_1',
productId: 'lysnrai',
stepName: 'collect',
order: 2,
status: 'running',
createdAt: '2026-03-14T00:00:00.000Z',
startedAt: '2026-03-14T00:00:00.000Z',
updatedAt: '2026-03-14T00:00:00.000Z',
});
await repo.createRunStep({
id: 'run_1:validate',
pk: 'lysnrai:run_1',
runId: 'run_1',
productId: 'lysnrai',
stepName: 'validate',
order: 1,
status: 'succeeded',
createdAt: '2026-03-14T00:00:00.000Z',
startedAt: '2026-03-14T00:00:00.000Z',
completedAt: '2026-03-14T00:01:00.000Z',
updatedAt: '2026-03-14T00:01:00.000Z',
});
const run = await repo.getRun('run_1', 'lysnrai');
const steps = await repo.listRunSteps('lysnrai', 'run_1');
expect(run.name).toBe('nightly-sync');
expect(steps.map(step => step.stepName)).toEqual(['validate', 'collect']);
});
});

View File

@ -0,0 +1,75 @@
import { NotFoundError } from '../../lib/errors.js';
import { getCollection } from '../../lib/datastore.js';
import type { ListRunsQuery, RunDoc, RunStepDoc } from './types.js';
function runsCollection() {
return getCollection<RunDoc>('agent_runs', '/productId');
}
function stepsCollection() {
return getCollection<RunStepDoc>('agent_run_steps', '/pk');
}
export async function createRun(doc: RunDoc): Promise<RunDoc> {
return runsCollection().create(doc);
}
export async function updateRun(
id: string,
productId: string,
updates: Partial<RunDoc>
): Promise<RunDoc> {
const updated = await runsCollection().update(id, productId, {
...updates,
updatedAt: new Date().toISOString(),
});
if (!updated) throw new NotFoundError(`Run '${id}' not found`);
return updated;
}
export async function getRun(id: string, productId: string): Promise<RunDoc> {
const run = await runsCollection().findById(id, productId);
if (!run) throw new NotFoundError(`Run '${id}' not found`);
return run;
}
export async function listRuns(productId: string, query: ListRunsQuery): Promise<RunDoc[]> {
return runsCollection().findMany({
filter: {
productId,
...(query.kind ? { kind: query.kind } : {}),
...(query.status ? { status: query.status } : {}),
},
sort: { createdAt: -1 },
limit: query.limit,
});
}
export async function createRunStep(doc: RunStepDoc): Promise<RunStepDoc> {
return stepsCollection().create(doc);
}
export async function updateRunStep(
id: string,
runId: string,
productId: string,
updates: Partial<RunStepDoc>
): Promise<RunStepDoc> {
const updated = await stepsCollection().update(id, `${productId}:${runId}`, {
...updates,
updatedAt: new Date().toISOString(),
});
if (!updated) throw new NotFoundError(`Run step '${id}' not found`);
return updated;
}
export async function listRunSteps(productId: string, runId: string): Promise<RunStepDoc[]> {
return stepsCollection().findMany({
filter: {
pk: `${productId}:${runId}`,
runId,
},
sort: { order: 1 },
limit: 100,
});
}

View File

@ -0,0 +1,38 @@
import type { FastifyInstance } from 'fastify';
import { requireJwtOrApiKey } from '../../lib/api-key-auth.js';
import { BadRequestError } from '../../lib/errors.js';
import { ListRunsQuerySchema } from './types.js';
import * as repo from './repository.js';
export async function runRoutes(app: FastifyInstance) {
function requireRunsRead(req: import('fastify').FastifyRequest): string {
const access = requireJwtOrApiKey(req, {
jwtRoles: ['super_admin', 'admin'],
apiKeyScopes: ['jobs:read'],
rateLimitKey: 'jobs:read',
});
return access.productId;
}
app.get('/runs', async req => {
const productId = requireRunsRead(req);
const parsed = ListRunsQuerySchema.safeParse(req.query);
if (!parsed.success) {
throw new BadRequestError(parsed.error.issues.map(issue => issue.message).join('; '));
}
return repo.listRuns(productId, parsed.data);
});
app.get('/runs/:id', async req => {
const productId = requireRunsRead(req);
const { id } = req.params as { id: string };
return repo.getRun(id, productId);
});
app.get('/runs/:id/steps', async req => {
const productId = requireRunsRead(req);
const { id } = req.params as { id: string };
return repo.listRunSteps(productId, id);
});
}

View File

@ -0,0 +1,60 @@
import Fastify from 'fastify';
import bcrypt from 'bcryptjs';
import { beforeEach, describe, expect, it, vi } from 'vitest';
import { MemoryDatastoreProvider } from '@bytelyst/datastore';
import { setProvider } from '../../lib/datastore.js';
import { registerOptionalApiKeyContext } from '../../lib/api-key-auth.js';
const rawApiKey = `wai_${'c'.repeat(64)}`;
const repoMock = {
listRuns: vi.fn(),
getRun: vi.fn(),
listRunSteps: vi.fn(),
};
vi.mock('./repository.js', () => repoMock);
async function seedApiKey(scopes: string[]) {
const provider = new MemoryDatastoreProvider();
setProvider(provider);
const collection = provider.getCollection('api_tokens', '/id');
await collection.create({
id: 'tok_runs_1',
productId: 'lysnrai',
userId: 'svc_runs',
userName: 'Runs Service',
prefix: rawApiKey.slice(0, 12),
tokenHash: await bcrypt.hash(rawApiKey, 10),
status: 'active',
scopes,
expiresAt: '2099-01-01T00:00:00.000Z',
lastUsed: null,
});
}
describe('runRoutes api key integration', () => {
beforeEach(() => {
vi.clearAllMocks();
delete process.env.API_KEY_RATE_LIMIT_CONFIG_JSON;
});
it('allows run reads via scoped api key', async () => {
await seedApiKey(['jobs:read']);
repoMock.listRuns.mockResolvedValue([{ id: 'run_1', productId: 'lysnrai' }]);
const { runRoutes } = await import('./routes.js');
const app = Fastify();
await registerOptionalApiKeyContext(app);
await app.register(runRoutes, { prefix: '/api' });
const res = await app.inject({
method: 'GET',
url: '/api/runs?limit=10',
headers: { 'x-api-key': rawApiKey },
});
expect(res.statusCode).toBe(200);
expect(repoMock.listRuns).toHaveBeenCalledWith('lysnrai', { limit: 10 });
});
});

View File

@ -0,0 +1,118 @@
import type { RunDoc, RunStepDoc } from './types.js';
import * as repo from './repository.js';
interface StartRunInput {
id: string;
productId: string;
kind: RunDoc['kind'];
name: string;
source: string;
triggeredBy?: string;
parentRunId?: string;
queueName?: string;
queueJobId?: string;
input?: Record<string, unknown>;
metadata?: Record<string, unknown>;
}
interface StartRunStepInput {
runId: string;
productId: string;
stepName: string;
order: number;
input?: Record<string, unknown>;
metadata?: Record<string, unknown>;
}
export async function startRun(input: StartRunInput): Promise<RunDoc> {
const now = new Date().toISOString();
return repo.createRun({
id: input.id,
productId: input.productId,
kind: input.kind,
name: input.name,
source: input.source,
status: 'running',
triggeredBy: input.triggeredBy,
parentRunId: input.parentRunId,
queueName: input.queueName,
queueJobId: input.queueJobId,
input: input.input,
metadata: input.metadata,
createdAt: now,
startedAt: now,
updatedAt: now,
});
}
export async function completeRun(
id: string,
productId: string,
output?: Record<string, unknown>
): Promise<RunDoc> {
return repo.updateRun(id, productId, {
status: 'succeeded',
output,
completedAt: new Date().toISOString(),
});
}
export async function failRun(
id: string,
productId: string,
error: string,
output?: Record<string, unknown>
): Promise<RunDoc> {
return repo.updateRun(id, productId, {
status: 'failed',
error,
output,
completedAt: new Date().toISOString(),
});
}
export async function startRunStep(input: StartRunStepInput): Promise<RunStepDoc> {
const now = new Date().toISOString();
return repo.createRunStep({
id: `${input.runId}:${input.stepName}`,
pk: `${input.productId}:${input.runId}`,
runId: input.runId,
productId: input.productId,
stepName: input.stepName,
order: input.order,
status: 'running',
input: input.input,
metadata: input.metadata,
createdAt: now,
startedAt: now,
updatedAt: now,
});
}
export async function completeRunStep(
runId: string,
productId: string,
stepName: string,
output?: Record<string, unknown>
): Promise<RunStepDoc> {
return repo.updateRunStep(`${runId}:${stepName}`, runId, productId, {
status: 'succeeded',
output,
completedAt: new Date().toISOString(),
});
}
export async function failRunStep(
runId: string,
productId: string,
stepName: string,
error: string,
output?: Record<string, unknown>
): Promise<RunStepDoc> {
return repo.updateRunStep(`${runId}:${stepName}`, runId, productId, {
status: 'failed',
error,
output,
completedAt: new Date().toISOString(),
});
}

View File

@ -0,0 +1,69 @@
import { z } from 'zod';
export const RunKindSchema = z.enum(['job', 'agent']);
export const RunStatusSchema = z.enum(['queued', 'running', 'succeeded', 'failed', 'cancelled']);
export const RunStepStatusSchema = z.enum([
'pending',
'running',
'succeeded',
'failed',
'skipped',
'cancelled',
]);
export const RunSchema = z.object({
id: z.string().min(1),
productId: z.string().min(1),
kind: RunKindSchema,
name: z.string().min(1),
source: z.string().min(1),
status: RunStatusSchema,
triggeredBy: z.string().optional(),
parentRunId: z.string().optional(),
queueName: z.string().optional(),
queueJobId: z.string().optional(),
input: z.record(z.unknown()).optional(),
output: z.record(z.unknown()).optional(),
error: z.string().optional(),
metadata: z.record(z.unknown()).optional(),
createdAt: z.string(),
startedAt: z.string().optional(),
completedAt: z.string().optional(),
updatedAt: z.string(),
});
export type RunDoc = z.infer<typeof RunSchema> & {
_ts?: number;
_etag?: string;
};
export const RunStepSchema = z.object({
id: z.string().min(1),
runId: z.string().min(1),
productId: z.string().min(1),
stepName: z.string().min(1),
order: z.number().int().min(0),
status: RunStepStatusSchema,
input: z.record(z.unknown()).optional(),
output: z.record(z.unknown()).optional(),
error: z.string().optional(),
metadata: z.record(z.unknown()).optional(),
createdAt: z.string(),
startedAt: z.string().optional(),
completedAt: z.string().optional(),
updatedAt: z.string(),
});
export type RunStepDoc = z.infer<typeof RunStepSchema> & {
pk: string;
_ts?: number;
_etag?: string;
};
export const ListRunsQuerySchema = z.object({
kind: RunKindSchema.optional(),
status: RunStatusSchema.optional(),
limit: z.coerce.number().min(1).max(100).default(20),
});
export type ListRunsQuery = z.infer<typeof ListRunsQuerySchema>;

View File

@ -65,6 +65,7 @@ import { startTriggerEvaluationJob } from './modules/diagnostics/trigger-job.js'
import { broadcastRoutes } from './modules/broadcasts/routes.js';
import { surveyRoutes } from './modules/surveys/routes.js';
import { jobRoutes } from './modules/jobs/routes.js';
import { runRoutes } from './modules/runs/routes.js';
import { statusRoutes } from './modules/status/routes.js';
import { deliveryRoutes } from './modules/delivery/routes.js';
import { sessionRoutes } from './modules/sessions/routes.js';
@ -179,6 +180,7 @@ await app.register(performanceProfileRoutes, { prefix: '/api' });
await app.register(publicRoutes, { prefix: '/api' });
// Scheduled jobs module (admin: list, trigger, view runs)
await app.register(jobRoutes, { prefix: '/api' });
await app.register(runRoutes, { prefix: '/api' });
// Public status page + incident management
await app.register(statusRoutes, { prefix: '/api' });
// Transactional email delivery