feat(a2a): persist incident pipeline runs

This commit is contained in:
root 2026-03-15 05:58:15 +00:00
parent 07e9475b70
commit daec38faf7
7 changed files with 724 additions and 58 deletions

View File

@ -9,6 +9,16 @@ export interface PlatformClientOptions {
productId?: string;
}
export type RunKind = 'job' | 'agent';
export type RunStatus = 'queued' | 'running' | 'succeeded' | 'failed' | 'cancelled';
export type RunStepStatus =
| 'pending'
| 'running'
| 'succeeded'
| 'failed'
| 'skipped'
| 'cancelled';
export async function platformFetch<T>(
path: string,
init: RequestInit,
@ -167,6 +177,76 @@ export async function telemetryUpdateCluster(
);
}
// ── Runs ──────────────────────────────────────────────────────────────────────
export async function runsCreate(
body: {
id: string;
kind: RunKind;
name: string;
source: string;
triggeredBy?: string;
parentRunId?: string;
queueName?: string;
queueJobId?: string;
input?: Record<string, unknown>;
metadata?: Record<string, unknown>;
},
opts: PlatformClientOptions
): Promise<unknown> {
return platformFetch('/api/runs', { method: 'POST', body: JSON.stringify(body) }, opts);
}
export async function runsUpdate(
runId: string,
body: {
status: RunStatus;
output?: Record<string, unknown>;
error?: string;
},
opts: PlatformClientOptions
): Promise<unknown> {
return platformFetch(
`/api/runs/${encodeURIComponent(runId)}`,
{ method: 'PATCH', body: JSON.stringify(body) },
opts
);
}
export async function runStepsCreate(
runId: string,
body: {
stepName: string;
order: number;
input?: Record<string, unknown>;
metadata?: Record<string, unknown>;
},
opts: PlatformClientOptions
): Promise<unknown> {
return platformFetch(
`/api/runs/${encodeURIComponent(runId)}/steps`,
{ method: 'POST', body: JSON.stringify(body) },
opts
);
}
export async function runStepsUpdate(
runId: string,
stepName: string,
body: {
status: RunStepStatus;
output?: Record<string, unknown>;
error?: string;
},
opts: PlatformClientOptions
): Promise<unknown> {
return platformFetch(
`/api/runs/${encodeURIComponent(runId)}/steps/${encodeURIComponent(stepName)}`,
{ method: 'PATCH', body: JSON.stringify(body) },
opts
);
}
// ── Diagnostics ───────────────────────────────────────────────────────────────
export interface DebugSession {

View File

@ -0,0 +1,122 @@
import {
runStepsCreate,
runStepsUpdate,
runsCreate,
runsUpdate,
} from '../../lib/platform-client.js';
import type { PlatformClientOptions } from '../../lib/platform-client.js';
interface TrackRunInput {
runId: string;
productId: string;
name: string;
initiatedBy?: string;
requestId?: string;
token?: string;
input?: Record<string, unknown>;
}
interface TrackStepInput {
runId: string;
productId: string;
stepName: string;
order: number;
token?: string;
requestId?: string;
input?: Record<string, unknown>;
output?: Record<string, unknown>;
error?: string;
}
export async function trackRunStarted(input: TrackRunInput): Promise<void> {
await runsCreate(
{
id: input.runId,
kind: 'agent',
name: input.name,
source: 'mcp.a2a',
triggeredBy: input.initiatedBy,
input: input.input,
metadata: {
productId: input.productId,
},
},
buildOptions(input)
);
}
export async function trackRunCompleted(
input: TrackRunInput & { output?: Record<string, unknown> }
): Promise<void> {
await runsUpdate(
input.runId,
{
status: 'succeeded',
output: input.output,
},
buildOptions(input)
);
}
export async function trackRunFailed(
input: TrackRunInput & { error: string; output?: Record<string, unknown> }
): Promise<void> {
await runsUpdate(
input.runId,
{
status: 'failed',
error: input.error,
output: input.output,
},
buildOptions(input)
);
}
export async function trackStepStarted(input: TrackStepInput): Promise<void> {
await runStepsCreate(
input.runId,
{
stepName: input.stepName,
order: input.order,
input: input.input,
},
buildOptions(input)
);
}
export async function trackStepCompleted(input: TrackStepInput): Promise<void> {
await runStepsUpdate(
input.runId,
input.stepName,
{
status: 'succeeded',
output: input.output,
},
buildOptions(input)
);
}
export async function trackStepFailed(input: TrackStepInput): Promise<void> {
await runStepsUpdate(
input.runId,
input.stepName,
{
status: 'failed',
error: input.error ?? 'Step failed',
output: input.output,
},
buildOptions(input)
);
}
function buildOptions(input: {
token?: string;
requestId?: string;
productId: string;
}): PlatformClientOptions {
return {
token: input.token,
requestId: input.requestId,
productId: input.productId,
};
}

View File

@ -5,9 +5,20 @@ import type { McpLogger } from '../tools/types.js';
vi.mock('../../lib/platform-client.js', () => ({
telemetryClusters: vi.fn(),
diagnosticsCreateSession: vi.fn(),
runsCreate: vi.fn(),
runsUpdate: vi.fn(),
runStepsCreate: vi.fn(),
runStepsUpdate: vi.fn(),
}));
import { telemetryClusters, diagnosticsCreateSession } from '../../lib/platform-client.js';
import {
telemetryClusters,
diagnosticsCreateSession,
runsCreate,
runsUpdate,
runStepsCreate,
runStepsUpdate,
} from '../../lib/platform-client.js';
import { runIncidentPipeline } from './runner.js';
import type { SupportIncidentBrief } from './types.js';
@ -54,6 +65,10 @@ beforeEach(() => {
expiresAt: '2026-03-05T11:00:00Z',
collectionLevel: 'debug',
} as never);
vi.mocked(runsCreate).mockResolvedValue({} as never);
vi.mocked(runsUpdate).mockResolvedValue({} as never);
vi.mocked(runStepsCreate).mockResolvedValue({} as never);
vi.mocked(runStepsUpdate).mockResolvedValue({} as never);
});
describe('A2A incident pipeline', () => {
@ -115,6 +130,28 @@ describe('A2A incident pipeline', () => {
expect(stepNames).toContain('telemetry_analyst.complete');
expect(stepNames).toContain('pipeline.complete');
});
it('persists the incident pipeline run and steps through platform-service', async () => {
await runIncidentPipeline(baseBrief, {
log: mockLog,
token: 'jwt_1',
requestId: 'req_runs',
initiatedBy: 'admin_1',
});
expect(runsCreate).toHaveBeenCalledOnce();
expect(runStepsCreate).toHaveBeenCalledTimes(3);
expect(runStepsUpdate).toHaveBeenCalledTimes(3);
expect(runsUpdate).toHaveBeenCalledWith(
expect.stringMatching(/^run_/),
expect.objectContaining({ status: 'succeeded' }),
expect.objectContaining({
token: 'jwt_1',
requestId: 'req_runs',
productId: 'lysnrai',
})
);
});
});
describe('dispatcher', () => {

View File

@ -14,6 +14,14 @@ import { dispatch } from './agents/dispatcher.js';
import { analyze } from './agents/telemetry-analyst.js';
import { orchestrate } from './agents/diagnostics-orchestrator.js';
import { writeReport } from './agents/report-writer.js';
import {
trackRunCompleted,
trackRunFailed,
trackRunStarted,
trackStepCompleted,
trackStepFailed,
trackStepStarted,
} from './run-tracker.js';
export interface PipelineOptions {
token?: string;
@ -28,76 +36,262 @@ export async function runIncidentPipeline(
): Promise<FinalIncidentReport> {
const runId = `run_${randomUUID()}`;
const { log } = opts;
let currentStep:
| {
stepName: 'dispatcher' | 'telemetry_analyst' | 'diagnostics_orchestrator' | 'report_writer';
order: number;
}
| undefined;
await safeTrack(() =>
trackRunStarted({
runId,
productId: brief.productId,
name: 'incident-pipeline',
initiatedBy: opts.initiatedBy,
requestId: opts.requestId,
token: opts.token,
input: {
openDiagnosticsSession: brief.openDiagnosticsSession,
userReport: brief.userReport,
},
})
);
log.info(
{ runId, productId: brief.productId, a2aStep: 'pipeline.start' },
'A2A pipeline started'
);
// ── Step 1: Dispatcher ──────────────────────────────────────────────────
const decision = dispatch(brief, {
runId,
requestId: opts.requestId,
initiatedBy: opts.initiatedBy,
});
try {
// ── Step 1: Dispatcher ────────────────────────────────────────────────
currentStep = { stepName: 'dispatcher', order: 1 };
await safeTrack(() =>
trackStepStarted({
runId,
productId: brief.productId,
stepName: currentStep.stepName,
order: currentStep.order,
token: opts.token,
requestId: opts.requestId,
input: {
userReport: brief.userReport,
},
})
);
log.info(
{
const decision = dispatch(brief, {
runId,
stepId: decision.runContext.stepId,
a2aStep: 'dispatcher.complete',
steps: decision.steps,
resolvedTimeWindow: decision.resolvedTimeWindow,
},
'Dispatcher decided pipeline steps'
);
requestId: opts.requestId,
initiatedBy: opts.initiatedBy,
});
// ── Step 2: Telemetry Analyst ───────────────────────────────────────────
const findings = await analyze(decision, { token: opts.token });
log.info(
{
runId,
stepId: findings.runContext.stepId,
a2aStep: 'telemetry_analyst.complete',
clusterCount: findings.clusters.length,
hypothesisCount: findings.topHypotheses.length,
queryError: findings.queryError,
},
'Telemetry analyst completed'
);
// ── Step 3: Diagnostics Orchestrator (conditional) ─────────────────────
let diagResult = null;
if (decision.steps.includes('diagnostics_orchestrator')) {
diagResult = await orchestrate(decision, findings, { token: opts.token });
await safeTrack(() =>
trackStepCompleted({
runId,
productId: brief.productId,
stepName: currentStep.stepName,
order: currentStep.order,
token: opts.token,
requestId: opts.requestId,
output: {
steps: decision.steps,
resolvedTimeWindow: decision.resolvedTimeWindow,
},
})
);
log.info(
{
runId,
stepId: diagResult.runContext.stepId,
a2aStep: 'diagnostics_orchestrator.complete',
skipped: diagResult.skipped,
sessionId: diagResult.session?.id,
sessionError: diagResult.sessionError,
stepId: decision.runContext.stepId,
a2aStep: 'dispatcher.complete',
steps: decision.steps,
resolvedTimeWindow: decision.resolvedTimeWindow,
},
'Diagnostics orchestrator completed'
'Dispatcher decided pipeline steps'
);
// ── Step 2: Telemetry Analyst ─────────────────────────────────────────
currentStep = { stepName: 'telemetry_analyst', order: 2 };
await safeTrack(() =>
trackStepStarted({
runId,
productId: brief.productId,
stepName: currentStep.stepName,
order: currentStep.order,
token: opts.token,
requestId: opts.requestId,
})
);
const findings = await analyze(decision, { token: opts.token });
await safeTrack(() =>
trackStepCompleted({
runId,
productId: brief.productId,
stepName: currentStep.stepName,
order: currentStep.order,
token: opts.token,
requestId: opts.requestId,
output: {
clusterCount: findings.clusters.length,
hypothesisCount: findings.topHypotheses.length,
queryError: findings.queryError,
},
})
);
log.info(
{
runId,
stepId: findings.runContext.stepId,
a2aStep: 'telemetry_analyst.complete',
clusterCount: findings.clusters.length,
hypothesisCount: findings.topHypotheses.length,
queryError: findings.queryError,
},
'Telemetry analyst completed'
);
// ── Step 3: Diagnostics Orchestrator (conditional) ───────────────────
let diagResult = null;
if (decision.steps.includes('diagnostics_orchestrator')) {
currentStep = { stepName: 'diagnostics_orchestrator', order: 3 };
await safeTrack(() =>
trackStepStarted({
runId,
productId: brief.productId,
stepName: currentStep.stepName,
order: currentStep.order,
token: opts.token,
requestId: opts.requestId,
})
);
diagResult = await orchestrate(decision, findings, { token: opts.token });
await safeTrack(() =>
trackStepCompleted({
runId,
productId: brief.productId,
stepName: currentStep.stepName,
order: currentStep.order,
token: opts.token,
requestId: opts.requestId,
output: {
skipped: diagResult.skipped,
sessionId: diagResult.session?.id,
sessionError: diagResult.sessionError,
},
})
);
log.info(
{
runId,
stepId: diagResult.runContext.stepId,
a2aStep: 'diagnostics_orchestrator.complete',
skipped: diagResult.skipped,
sessionId: diagResult.session?.id,
sessionError: diagResult.sessionError,
},
'Diagnostics orchestrator completed'
);
}
// ── Step 4: Report Writer ─────────────────────────────────────────────
currentStep = { stepName: 'report_writer', order: 4 };
await safeTrack(() =>
trackStepStarted({
runId,
productId: brief.productId,
stepName: currentStep.stepName,
order: currentStep.order,
token: opts.token,
requestId: opts.requestId,
})
);
const report = writeReport(decision, findings, diagResult);
await safeTrack(() =>
trackStepCompleted({
runId,
productId: brief.productId,
stepName: currentStep.stepName,
order: currentStep.order,
token: opts.token,
requestId: opts.requestId,
output: {
clusterCount: report.clusterCount,
hasSession: !!report.diagnosticsSession,
},
})
);
await safeTrack(() =>
trackRunCompleted({
runId,
productId: brief.productId,
name: 'incident-pipeline',
initiatedBy: opts.initiatedBy,
requestId: opts.requestId,
token: opts.token,
output: {
clusterCount: report.clusterCount,
hasSession: !!report.diagnosticsSession,
},
})
);
log.info(
{
runId,
a2aStep: 'pipeline.complete',
productId: brief.productId,
clusterCount: report.clusterCount,
hasSession: !!report.diagnosticsSession,
},
'A2A pipeline completed'
);
return report;
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
if (currentStep) {
await safeTrack(() =>
trackStepFailed({
runId,
productId: brief.productId,
stepName: currentStep.stepName,
order: currentStep.order,
token: opts.token,
requestId: opts.requestId,
error: message,
})
);
}
await safeTrack(() =>
trackRunFailed({
runId,
productId: brief.productId,
name: 'incident-pipeline',
initiatedBy: opts.initiatedBy,
requestId: opts.requestId,
token: opts.token,
error: message,
})
);
throw error;
}
}
async function safeTrack(fn: () => Promise<void>): Promise<void> {
try {
await fn();
} catch {
// Tracking must never fail the pipeline itself.
}
// ── Step 4: Report Writer ───────────────────────────────────────────────
const report = writeReport(decision, findings, diagResult);
log.info(
{
runId,
a2aStep: 'pipeline.complete',
productId: brief.productId,
clusterCount: report.clusterCount,
hasSession: !!report.diagnosticsSession,
},
'A2A pipeline completed'
);
return report;
}

View File

@ -0,0 +1,89 @@
import Fastify from 'fastify';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
const repoMock = {
listRuns: vi.fn(),
getRun: vi.fn(),
listRunSteps: vi.fn(),
updateRun: vi.fn(),
updateRunStep: vi.fn(),
};
const trackerMock = {
startRun: vi.fn(),
completeRun: vi.fn(),
failRun: vi.fn(),
startRunStep: vi.fn(),
completeRunStep: vi.fn(),
failRunStep: vi.fn(),
};
vi.mock('./repository.js', () => repoMock);
vi.mock('./tracker.js', () => trackerMock);
async function buildApp(payload?: { sub: string; productId: string; role?: string }) {
const { runRoutes } = await import('./routes.js');
const app = Fastify({ logger: false });
if (payload) {
app.addHook('onRequest', async req => {
req.jwtPayload = payload;
});
}
await app.register(runRoutes, { prefix: '/api' });
return app;
}
describe('runRoutes', () => {
beforeEach(() => {
vi.clearAllMocks();
});
afterEach(() => {
vi.restoreAllMocks();
});
it('POST /runs creates a durable run for admin jwt callers', async () => {
trackerMock.startRun.mockResolvedValue({ id: 'run_1', status: 'running' });
const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
const res = await app.inject({
method: 'POST',
url: '/api/runs',
payload: {
id: 'run_1',
kind: 'agent',
name: 'incident-pipeline',
source: 'mcp.a2a',
},
});
expect(res.statusCode).toBe(200);
expect(trackerMock.startRun).toHaveBeenCalledWith({
id: 'run_1',
kind: 'agent',
name: 'incident-pipeline',
source: 'mcp.a2a',
productId: 'lysnrai',
triggeredBy: 'admin_1',
});
});
it('PATCH /runs/:id completes a run', async () => {
trackerMock.completeRun.mockResolvedValue({ id: 'run_1', status: 'succeeded' });
const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
const res = await app.inject({
method: 'PATCH',
url: '/api/runs/run_1',
payload: {
status: 'succeeded',
output: { reportId: 'rep_1' },
},
});
expect(res.statusCode).toBe(200);
expect(trackerMock.completeRun).toHaveBeenCalledWith('run_1', 'lysnrai', {
reportId: 'rep_1',
});
});
});

View File

@ -1,8 +1,15 @@
import type { FastifyInstance } from 'fastify';
import { requireJwtOrApiKey } from '../../lib/api-key-auth.js';
import { BadRequestError } from '../../lib/errors.js';
import { ListRunsQuerySchema } from './types.js';
import {
CreateRunSchema,
CreateRunStepSchema,
ListRunsQuerySchema,
UpdateRunSchema,
UpdateRunStepSchema,
} from './types.js';
import * as repo from './repository.js';
import * as tracker from './tracker.js';
export async function runRoutes(app: FastifyInstance) {
function requireRunsRead(req: import('fastify').FastifyRequest): string {
@ -14,6 +21,17 @@ export async function runRoutes(app: FastifyInstance) {
return access.productId;
}
function requireRunsWrite(req: import('fastify').FastifyRequest): {
productId: string;
actorId: string;
} {
const access = requireJwtOrApiKey(req, {
jwtRoles: ['super_admin', 'admin'],
rateLimitKey: 'jobs:write',
});
return { productId: access.productId, actorId: access.actorId };
}
app.get('/runs', async req => {
const productId = requireRunsRead(req);
const parsed = ListRunsQuerySchema.safeParse(req.query);
@ -35,4 +53,90 @@ export async function runRoutes(app: FastifyInstance) {
const { id } = req.params as { id: string };
return repo.listRunSteps(productId, id);
});
app.post('/runs', async req => {
const { productId, actorId } = requireRunsWrite(req);
const parsed = CreateRunSchema.safeParse(req.body);
if (!parsed.success) {
throw new BadRequestError(parsed.error.issues.map(issue => issue.message).join('; '));
}
return tracker.startRun({
...parsed.data,
productId,
triggeredBy: parsed.data.triggeredBy ?? actorId,
});
});
app.patch('/runs/:id', async req => {
const { productId } = requireRunsWrite(req);
const { id } = req.params as { id: string };
const parsed = UpdateRunSchema.safeParse(req.body);
if (!parsed.success) {
throw new BadRequestError(parsed.error.issues.map(issue => issue.message).join('; '));
}
if (parsed.data.status === 'succeeded') {
return tracker.completeRun(id, productId, parsed.data.output);
}
if (parsed.data.status === 'failed') {
return tracker.failRun(id, productId, parsed.data.error ?? 'Run failed', parsed.data.output);
}
return repo.updateRun(id, productId, {
status: parsed.data.status,
output: parsed.data.output,
error: parsed.data.error,
completedAt: parsed.data.status === 'cancelled' ? new Date().toISOString() : undefined,
});
});
app.post('/runs/:id/steps', async req => {
const { productId } = requireRunsWrite(req);
const { id } = req.params as { id: string };
const parsed = CreateRunStepSchema.safeParse(req.body);
if (!parsed.success) {
throw new BadRequestError(parsed.error.issues.map(issue => issue.message).join('; '));
}
return tracker.startRunStep({
runId: id,
productId,
...parsed.data,
});
});
app.patch('/runs/:id/steps/:stepName', async req => {
const { productId } = requireRunsWrite(req);
const { id, stepName } = req.params as { id: string; stepName: string };
const parsed = UpdateRunStepSchema.safeParse(req.body);
if (!parsed.success) {
throw new BadRequestError(parsed.error.issues.map(issue => issue.message).join('; '));
}
if (parsed.data.status === 'succeeded') {
return tracker.completeRunStep(id, productId, stepName, parsed.data.output);
}
if (parsed.data.status === 'failed') {
return tracker.failRunStep(
id,
productId,
stepName,
parsed.data.error ?? 'Run step failed',
parsed.data.output
);
}
return repo.updateRunStep(`${id}:${stepName}`, id, productId, {
status: parsed.data.status,
output: parsed.data.output,
error: parsed.data.error,
completedAt:
parsed.data.status === 'cancelled' || parsed.data.status === 'skipped'
? new Date().toISOString()
: undefined,
});
});
}

View File

@ -67,3 +67,43 @@ export const ListRunsQuerySchema = z.object({
});
export type ListRunsQuery = z.infer<typeof ListRunsQuerySchema>;
export const CreateRunSchema = z.object({
id: z.string().min(1),
kind: RunKindSchema,
name: z.string().min(1),
source: z.string().min(1),
triggeredBy: z.string().optional(),
parentRunId: z.string().optional(),
queueName: z.string().optional(),
queueJobId: z.string().optional(),
input: z.record(z.unknown()).optional(),
metadata: z.record(z.unknown()).optional(),
});
export type CreateRunInput = z.infer<typeof CreateRunSchema>;
export const UpdateRunSchema = z.object({
status: RunStatusSchema,
output: z.record(z.unknown()).optional(),
error: z.string().optional(),
});
export type UpdateRunInput = z.infer<typeof UpdateRunSchema>;
export const CreateRunStepSchema = z.object({
stepName: z.string().min(1),
order: z.number().int().min(0),
input: z.record(z.unknown()).optional(),
metadata: z.record(z.unknown()).optional(),
});
export type CreateRunStepInput = z.infer<typeof CreateRunStepSchema>;
export const UpdateRunStepSchema = z.object({
status: RunStepStatusSchema,
output: z.record(z.unknown()).optional(),
error: z.string().optional(),
});
export type UpdateRunStepInput = z.infer<typeof UpdateRunStepSchema>;