feat(mcp-server): Phase 2 A2A orchestration — DispatcherAgent, TelemetryAnalystAgent, DiagnosticsOrchestratorAgent, ReportWriterAgent, runIncidentPipeline runner + support.runIncidentPipeline MCP tool (10 tests)
This commit is contained in:
parent
d1d643f782
commit
0f044830fa
@ -47,7 +47,26 @@ This is the “ready to start building” checklist that turns the docs in this
|
||||
- [x] Python sidecar health visible via `extraction.sidecarHealth`
|
||||
- [ ] End-to-end integration test with real platform-service (Phase 2)
|
||||
|
||||
## 5) Phase 2+ quick sanity checks
|
||||
## 5) Phase 2 — A2A orchestration ✅ COMPLETE
|
||||
|
||||
### Milestone D (A2A runner + handoff schemas) ✅ — d1d643f+
|
||||
|
||||
- [x] Handoff artifact schemas (Zod): `SupportIncidentBrief`, `TelemetryFindings`, `DiagnosticsSessionResult`, `FinalIncidentReport`
|
||||
- [x] `DispatcherAgent` — validates brief, fills default 24h time window, decides which steps to run
|
||||
- [x] `TelemetryAnalystAgent` — queries clusters, filters by platform, derives hypotheses + actions
|
||||
- [x] `DiagnosticsOrchestratorAgent` — conditionally creates expiring session based on findings severity
|
||||
- [x] `ReportWriterAgent` — assembles structured `FinalIncidentReport` + full markdown report
|
||||
- [x] `runIncidentPipeline` runner — logs every step with stable `runId` + `stepId` (audit tracing)
|
||||
- [x] `support.runIncidentPipeline` MCP tool — single callable entry point for the pipeline
|
||||
- [x] `McpToolRequest.log` — structured logger added; `routes.ts` passes `req.log` to all tool calls
|
||||
- [x] 6 Vitest tests covering: no-session run, session creation, skip when no target, upstream failure, log audit, time window defaulting
|
||||
|
||||
### Acceptance criteria ✅
|
||||
|
||||
- [x] "Support Debug Pack" runs end-to-end via Dispatcher → Telemetry Analyst → Diagnostics Orchestrator → Report Writer
|
||||
- [x] Every handoff is logged with stable `runId` + `stepId` (Phase 1: logs; Phase 2+: Cosmos container)
|
||||
|
||||
## 6) Phase 3+ quick sanity checks
|
||||
|
||||
- If you make telemetry policy-aware clients:
|
||||
- ensure `GET /api/telemetry/config` consumption is cached (ETag) and privacy-safe.
|
||||
|
||||
@ -0,0 +1,78 @@
|
||||
/**
|
||||
* DiagnosticsOrchestratorAgent — optionally opens a live remote diagnostics
|
||||
* session for the affected user / install ID based on TelemetryFindings.
|
||||
*/
|
||||
|
||||
import { randomUUID } from 'node:crypto';
|
||||
import { diagnosticsCreateSession } from '../../../lib/platform-client.js';
|
||||
import type { DispatchDecision, TelemetryFindings, DiagnosticsSessionResult } from '../types.js';
|
||||
|
||||
/**
 * Shape the response of `diagnosticsCreateSession` is cast to in this module.
 * Only `id`, `status`, `expiresAt` and `collectionLevel` are read back from it.
 */
interface CreatedSession {
  /** Session identifier reported back to the caller. */
  id: string;
  /** Session status string, passed through unchanged. */
  status: string;
  /** Expiry timestamp string, passed through unchanged. */
  expiresAt: string;

  /** Collection level echoed by the platform; the requested level is used when absent. */
  collectionLevel?: string;
  targetUserId?: string;
  targetAnonymousId?: string;
}
|
||||
|
||||
/**
 * DiagnosticsOrchestratorAgent step: optionally opens a remote diagnostics
 * session for the user / install named in the incident brief.
 *
 * No platform call is made when the brief does not ask for a session, or when
 * it names neither a `userId` nor an `anonymousInstallId`. Otherwise a session
 * is requested with `trace` collection if any cluster in `findings` has
 * severity `fatal`, and `debug` collection otherwise.
 *
 * @param decision - Dispatcher output; only `decision.brief` is read.
 * @param findings - Telemetry step output; supplies the run context and the
 *   cluster severities.
 * @param opts - `token` is forwarded to the platform client.
 * @returns A result carrying a fresh `step_diag_*` step ID plus one of
 *   `skipReason`, `session` or `sessionError`. A failed create call is
 *   reported through `sessionError` rather than re-thrown.
 */
export async function orchestrate(
  decision: DispatchDecision,
  findings: TelemetryFindings,
  opts: { token?: string }
): Promise<DiagnosticsSessionResult> {
  const stepId = `step_diag_${randomUUID().slice(0, 8)}`;
  const runContext = { ...findings.runContext, stepId };

  const { brief } = decision;
  const { userId, anonymousInstallId } = brief.userReport;

  // Guard 1: the caller did not ask for a session.
  if (!brief.openDiagnosticsSession) {
    return {
      runContext,
      skipped: true,
      skipReason: 'openDiagnosticsSession is false — skipping session creation',
    };
  }

  // Guard 2: a session was requested but there is nobody to target.
  if (!userId && !anonymousInstallId) {
    return {
      runContext,
      skipped: true,
      skipReason: 'No userId or anonymousInstallId provided — cannot target a diagnostics session',
    };
  }

  // Any fatal cluster escalates the collection level.
  const sawFatal = findings.clusters.some(cluster => cluster.severity === 'fatal');
  const collectionLevel = sawFatal ? 'trace' : 'debug';

  try {
    const created = (await diagnosticsCreateSession(
      {
        productId: brief.productId,
        targetUserId: userId,
        targetAnonymousId: anonymousInstallId,
        collectionLevel,
        captureLogs: true,
        captureNetwork: true,
        maxDurationMinutes: 30,
      },
      { token: opts.token, requestId: findings.runContext.requestId, productId: brief.productId }
    )) as CreatedSession;

    return {
      runContext,
      skipped: false,
      session: {
        id: created.id,
        status: created.status,
        expiresAt: created.expiresAt,
        // Prefer the level the platform reports; fall back to what was requested.
        collectionLevel: created.collectionLevel ?? collectionLevel,
        // Targets are echoed from the brief, not from the response.
        targetUserId: userId,
        targetAnonymousId: anonymousInstallId,
      },
    };
  } catch (err) {
    const sessionError = err instanceof Error ? err.message : String(err);
    return { runContext, skipped: false, sessionError };
  }
}
|
||||
44
services/mcp-server/src/modules/a2a/agents/dispatcher.ts
Normal file
44
services/mcp-server/src/modules/a2a/agents/dispatcher.ts
Normal file
@ -0,0 +1,44 @@
|
||||
/**
|
||||
* DispatcherAgent — validates the SupportIncidentBrief, fills in defaults,
|
||||
* and decides which downstream agents to activate.
|
||||
*/
|
||||
|
||||
import { randomUUID } from 'node:crypto';
|
||||
import type { SupportIncidentBrief, DispatchDecision, RunContext } from '../types.js';
|
||||
|
||||
/** Default look-back when no `timeWindow.from` is provided — 24 hours. */
const DEFAULT_LOOKBACK_MS = 24 * 60 * 60 * 1000;

/**
 * DispatcherAgent step: stamps the run context, resolves the incident time
 * window and decides which downstream agents run.
 *
 * Time window resolution:
 * - `to` defaults to the current time.
 * - `from` defaults to 24 hours before the *resolved* `to`, so supplying only
 *   a historical `to` still yields a well-ordered window. When `to` is also
 *   omitted this is the last 24 hours.
 *
 * Step selection: `telemetry_analyst` and `report_writer` always run;
 * `diagnostics_orchestrator` is inserted between them only when the brief asks
 * for a session and names a `userId` or `anonymousInstallId`.
 *
 * @param brief - Incident brief to dispatch.
 * @param opts - Run identity: `runId` is required; `requestId` and
 *   `initiatedBy` are copied into the run context as given.
 * @returns The decision handed to the downstream agents.
 */
export function dispatch(
  brief: SupportIncidentBrief,
  opts: { runId: string; requestId?: string; initiatedBy?: string }
): DispatchDecision {
  const stepId = `step_dispatch_${randomUUID().slice(0, 8)}`;

  const runContext: RunContext = {
    runId: opts.runId,
    stepId,
    productId: brief.productId,
    requestId: opts.requestId,
    initiatedBy: opts.initiatedBy,
  };

  const now = new Date();
  const to = brief.timeWindow.to ?? now.toISOString();

  // Anchor the default look-back to the resolved upper bound, not to "now";
  // otherwise a caller-supplied `to` older than 24h produces from > to.
  // An unparseable `to` falls back to "now" as the anchor.
  const parsedTo = Date.parse(to);
  const anchorMs = Number.isNaN(parsedTo) ? now.getTime() : parsedTo;
  const from = brief.timeWindow.from ?? new Date(anchorMs - DEFAULT_LOOKBACK_MS).toISOString();

  const resolvedTimeWindow = { to, from };

  // Decide which agents to run
  const steps: DispatchDecision['steps'] = ['telemetry_analyst'];

  const hasTarget = !!(brief.userReport.userId || brief.userReport.anonymousInstallId);
  if (brief.openDiagnosticsSession && hasTarget) {
    steps.push('diagnostics_orchestrator');
  }

  steps.push('report_writer');

  return { runContext, brief, steps, resolvedTimeWindow };
}
|
||||
100
services/mcp-server/src/modules/a2a/agents/report-writer.ts
Normal file
100
services/mcp-server/src/modules/a2a/agents/report-writer.ts
Normal file
@ -0,0 +1,100 @@
|
||||
/**
|
||||
* ReportWriterAgent — assembles all step outputs into a FinalIncidentReport
|
||||
* with a stable runId and a markdown summary.
|
||||
*/
|
||||
|
||||
import type {
|
||||
DispatchDecision,
|
||||
TelemetryFindings,
|
||||
DiagnosticsSessionResult,
|
||||
FinalIncidentReport,
|
||||
} from '../types.js';
|
||||
|
||||
/**
 * ReportWriterAgent step: folds the dispatcher decision, telemetry findings and
 * the (optional) diagnostics result into a `FinalIncidentReport`.
 *
 * The markdown report keeps a blank line between sections. Without it the
 * closing `---` directly follows the last paragraph of the diagnostics section
 * (e.g. `Not requested.`), which CommonMark renders as a setext heading rather
 * than a horizontal rule. Optional header fields (platform, app version) are
 * simply omitted when absent instead of being emitted as empty placeholders.
 *
 * @param decision - Dispatcher output: run context, brief, resolved time window.
 * @param findings - Telemetry step output. At most the first 10 clusters are
 *   listed in the markdown; all of them are returned in `clusters`.
 * @param diagResult - Diagnostics step output, or `null` when that step did
 *   not run.
 * @returns The structured report plus its markdown rendering.
 */
export function writeReport(
  decision: DispatchDecision,
  findings: TelemetryFindings,
  diagResult: DiagnosticsSessionResult | null
): FinalIncidentReport {
  const MAX_LISTED_CLUSTERS = 10;

  const { runContext, brief } = decision;
  const { userReport } = brief;
  const generatedAt = new Date().toISOString();
  const clusterTotal = findings.clusters.length;

  // ── Header ──────────────────────────────────────────────────────────────
  const lines: string[] = [
    `# Incident Report — ${brief.productId}`,
    `**Run ID:** \`${runContext.runId}\``,
    `**Generated:** ${generatedAt}`,
    `**Reported issue:** ${userReport.summary}`,
  ];
  if (userReport.platform) lines.push(`**Platform:** ${userReport.platform}`);
  if (userReport.appVersion) lines.push(`**App version:** ${userReport.appVersion}`);

  // ── Time window + telemetry findings ────────────────────────────────────
  lines.push(
    '',
    '## Time Window',
    `- From: ${decision.resolvedTimeWindow.from}`,
    `- To: ${decision.resolvedTimeWindow.to}`,
    '',
    `## Telemetry Findings (${clusterTotal} open cluster${clusterTotal !== 1 ? 's' : ''})`
  );

  if (findings.queryError) {
    lines.push(`> ⚠️ Telemetry query failed: ${findings.queryError}`);
  } else if (clusterTotal === 0) {
    lines.push('No open error clusters found in this time window.');
  } else {
    for (const c of findings.clusters.slice(0, MAX_LISTED_CLUSTERS)) {
      lines.push(
        `- **[${c.severity.toUpperCase()}]** \`${c.clusterId}\` — ${c.totalCount} events` +
          ` | platform: ${c.platform} | module: ${c.module}` +
          (c.sampleMessage ? ` | "${c.sampleMessage}"` : '')
      );
    }
    if (clusterTotal > MAX_LISTED_CLUSTERS) {
      lines.push(`  … and ${clusterTotal - MAX_LISTED_CLUSTERS} more clusters`);
    }
  }

  // ── Hypotheses ──────────────────────────────────────────────────────────
  lines.push('', '## Top Hypotheses');
  if (findings.topHypotheses.length === 0) {
    lines.push('No hypotheses generated.');
  } else {
    for (const h of findings.topHypotheses) lines.push(`- ${h}`);
  }

  // ── Actions ─────────────────────────────────────────────────────────────
  lines.push('', '## Recommended Actions');
  if (findings.recommendedActions.length === 0) {
    lines.push('No actions recommended.');
  } else {
    for (const a of findings.recommendedActions) lines.push(`- ${a}`);
  }

  // ── Diagnostics session ─────────────────────────────────────────────────
  lines.push('', '## Diagnostics Session');
  if (!diagResult || diagResult.skipped) {
    lines.push(diagResult?.skipReason ? `Skipped: ${diagResult.skipReason}` : 'Not requested.');
  } else if (diagResult.sessionError) {
    lines.push(`> ⚠️ Session creation failed: ${diagResult.sessionError}`);
  } else if (diagResult.session) {
    const s = diagResult.session;
    lines.push(`- **Session ID:** \`${s.id}\``);
    lines.push(`- **Status:** ${s.status}`);
    lines.push(`- **Collection level:** ${s.collectionLevel}`);
    lines.push(`- **Expires:** ${s.expiresAt}`);
    if (s.targetUserId) lines.push(`- **Target user:** ${s.targetUserId}`);
    // Sessions may be targeted by install ID alone; show that target too.
    if (s.targetAnonymousId) lines.push(`- **Target install:** ${s.targetAnonymousId}`);
  }

  // ── Footer ──────────────────────────────────────────────────────────────
  lines.push(
    '',
    '---',
    `*Generated by ByteLyst A2A incident pipeline · run \`${runContext.runId}\`*`
  );

  return {
    runId: runContext.runId,
    productId: brief.productId,
    generatedAt,
    summary: userReport.summary,
    clusterCount: clusterTotal,
    clusters: findings.clusters,
    topHypotheses: findings.topHypotheses,
    recommendedActions: findings.recommendedActions,
    diagnosticsSession: diagResult?.session ?? null,
    // Blank separator lines are intentional and must survive the join.
    markdownReport: lines.join('\n'),
  };
}
|
||||
166
services/mcp-server/src/modules/a2a/agents/telemetry-analyst.ts
Normal file
166
services/mcp-server/src/modules/a2a/agents/telemetry-analyst.ts
Normal file
@ -0,0 +1,166 @@
|
||||
/**
|
||||
* TelemetryAnalystAgent — queries telemetry clusters for the incident time
|
||||
* window and produces structured TelemetryFindings with hypotheses and
|
||||
* recommended next actions.
|
||||
*/
|
||||
|
||||
import { randomUUID } from 'node:crypto';
|
||||
import { telemetryClusters } from '../../../lib/platform-client.js';
|
||||
import type { DispatchDecision, TelemetryFindings, ClusterRef } from '../types.js';
|
||||
|
||||
/**
 * Cluster record as read from the `telemetryClusters` response. Every field is
 * optional here; `analyze` substitutes defaults when mapping to `ClusterRef`.
 */
interface RawCluster {
  // Identity
  id?: string;
  pk?: string;
  fingerprint?: string;

  // Classification
  severity?: string;
  /** Clusters whose status is `resolved` or `ignored` are dropped by `analyze`. */
  status?: string;
  platform?: string;
  module?: string;

  // Volume / recency
  totalCount?: number;
  lastSeenAt?: string;

  sampleMessage?: string;
}
|
||||
|
||||
/**
 * TelemetryAnalystAgent step: queries telemetry clusters for the resolved time
 * window and turns them into `TelemetryFindings`.
 *
 * On success the raw clusters are:
 * 1. stripped of `resolved` / `ignored` entries,
 * 2. mapped to `ClusterRef` with defaults for missing fields,
 * 3. narrowed to the reported platform (case-insensitive) — but only if that
 *    leaves at least one cluster, otherwise all clusters are kept,
 * 4. sorted by `totalCount`, highest first.
 *
 * On failure the error message is returned in `queryError` and the hypotheses
 * and actions describe the failure itself. They must not claim that no
 * clusters exist, because the query never produced an answer.
 *
 * @param decision - Dispatcher output (brief, run context, resolved window).
 * @param opts - `token` is forwarded to the platform client.
 * @returns Findings carrying a fresh `step_telemetry_*` step ID. This function
 *   does not re-throw query errors.
 */
export async function analyze(
  decision: DispatchDecision,
  opts: { token?: string }
): Promise<TelemetryFindings> {
  const { runContext, brief, resolvedTimeWindow } = decision;
  const stepId = `step_telemetry_${randomUUID().slice(0, 8)}`;
  const ctx = { ...runContext, stepId };

  let clusters: ClusterRef[] = [];
  let queryError: string | undefined;

  try {
    const result = await telemetryClusters(
      { productId: brief.productId, from: resolvedTimeWindow.from, to: resolvedTimeWindow.to },
      { token: opts.token, requestId: runContext.requestId, productId: brief.productId }
    );

    const raw: RawCluster[] = (result as { clusters?: RawCluster[] }).clusters ?? [];

    clusters = raw
      .filter(c => c.status !== 'resolved' && c.status !== 'ignored')
      .map(c => ({
        clusterId: c.id ?? '',
        pk: c.pk ?? '',
        fingerprint: c.fingerprint ?? '',
        severity: c.severity ?? 'error',
        totalCount: c.totalCount ?? 0,
        lastSeenAt: c.lastSeenAt ?? '',
        platform: c.platform ?? '',
        module: c.module ?? '',
        sampleMessage: c.sampleMessage,
      }));

    // Filter by platform if the incident report names one; fall back to the
    // unfiltered list when nothing matches.
    if (brief.userReport.platform) {
      const p = brief.userReport.platform.toLowerCase();
      const platformClusters = clusters.filter(c => c.platform.toLowerCase() === p);
      if (platformClusters.length > 0) clusters = platformClusters;
    }

    // Sort by totalCount descending
    clusters.sort((a, b) => b.totalCount - a.totalCount);
  } catch (err) {
    queryError = err instanceof Error ? err.message : String(err);
  }

  if (queryError !== undefined) {
    // The query gave no answer, so "no clusters found" would be a false claim.
    return {
      runContext: ctx,
      clusters: [],
      topHypotheses: [
        `Telemetry query failed (${queryError}) — cluster data is unavailable, so no telemetry-based hypotheses could be derived`,
      ],
      recommendedActions: [
        'Retry the pipeline once the telemetry query succeeds',
        'Check platform-service telemetry availability and the credentials used for this request',
      ],
      queryError,
    };
  }

  const topHypotheses = deriveHypotheses(clusters, brief.userReport.summary);
  const recommendedActions = deriveActions(clusters, decision);

  return { runContext: ctx, clusters, topHypotheses, recommendedActions, queryError };
}
|
||||
|
||||
// ── Heuristic hypothesis generator ────────────────────────────────────────
|
||||
|
||||
/**
 * Heuristic hypothesis generator.
 *
 * Combines cluster-derived signals (fatal count, top cluster message, affected
 * modules of the first five clusters) with keyword matches on the lower-cased
 * incident summary. At most five hypotheses are returned, in the order they
 * are produced below.
 *
 * @param clusters - Clusters as ordered by the caller; index 0 is treated as
 *   the top cluster.
 * @param summary - Free-text issue description from the incident brief.
 * @returns Up to five hypothesis strings; a single "no clusters" entry when
 *   `clusters` is empty.
 */
function deriveHypotheses(clusters: ClusterRef[], summary: string): string[] {
  const text = summary.toLowerCase();

  if (clusters.length === 0) {
    return ['No open error clusters found — issue may be intermittent or not yet surfaced'];
  }

  const mentionsAny = (...keywords: string[]): boolean =>
    keywords.some(keyword => text.includes(keyword));

  const result: string[] = [];

  // 1) Fatal clusters
  const fatalCount = clusters.reduce((n, c) => (c.severity === 'fatal' ? n + 1 : n), 0);
  if (fatalCount > 0) {
    result.push(`${fatalCount} fatal cluster(s) detected — highest priority for investigation`);
  }

  // 2) Sample message of the top cluster
  const topMessage = clusters[0].sampleMessage;
  if (topMessage) {
    result.push(`Top cluster message: "${topMessage}"`);
  }

  // 3) Keyword heuristics on the summary
  if (mentionsAny('dictation', 'speech', 'transcri')) {
    result.push(
      'Audio permission or microphone access may be revoked',
      'Speech-to-text pipeline may have a timeout or encoding error'
    );
  }
  if (mentionsAny('insert', 'paste', 'nothing')) {
    result.push('Text insertion API returned a no-op — possible permission or focus issue');
  }
  if (mentionsAny('crash', 'fatal')) {
    result.push('Memory pressure or unexpected nil dereference in affected module');
  }

  // 4) Distinct, non-empty module names among the first five clusters
  const moduleNames = new Set<string>();
  for (const c of clusters.slice(0, 5)) {
    if (c.module) moduleNames.add(c.module);
  }
  if (moduleNames.size > 0) {
    result.push(`Affected modules: ${Array.from(moduleNames).join(', ')}`);
  }

  return result.slice(0, 5);
}
|
||||
|
||||
/**
 * Heuristic next-action generator.
 *
 * @param clusters - Clusters as ordered by the caller; index 0 is treated as
 *   the top cluster.
 * @param decision - Dispatcher output; only `decision.brief` is read, and only
 *   when `clusters` is non-empty.
 * @returns Up to five action strings. With no clusters, two generic
 *   "look elsewhere" actions are returned.
 */
function deriveActions(clusters: ClusterRef[], decision: DispatchDecision): string[] {
  if (clusters.length === 0) {
    return [
      'Widen time window or check alternate platforms',
      'Verify telemetry is enabled and events are being ingested',
    ];
  }

  const { userReport, openDiagnosticsSession } = decision.brief;
  const targetKnown = Boolean(userReport.userId || userReport.anonymousInstallId);
  const fatalIds = clusters.filter(c => c.severity === 'fatal').map(c => c.clusterId);

  const suggestions: string[] = [];

  // A target exists but no live session was requested: suggest a re-run.
  if (targetKnown && !openDiagnosticsSession) {
    suggestions.push(
      'Re-run with openDiagnosticsSession: true to capture a live trace for the affected user'
    );
  }

  // Name up to three fatal clusters, but report the full count.
  if (fatalIds.length > 0) {
    suggestions.push(
      `Resolve or investigate ${fatalIds.length} fatal cluster(s): ` + fatalIds.slice(0, 3).join(', ')
    );
  }

  if (clusters.length >= 3) {
    suggestions.push(
      'Consider creating a targeted telemetry policy to capture debug-level events for affected module'
    );
  }

  suggestions.push(`Review top cluster (${clusters[0].clusterId}) in admin telemetry explorer`);

  return suggestions.slice(0, 5);
}
|
||||
42
services/mcp-server/src/modules/a2a/pipeline-tool.ts
Normal file
42
services/mcp-server/src/modules/a2a/pipeline-tool.ts
Normal file
@ -0,0 +1,42 @@
|
||||
/**
|
||||
* MCP tool: support.runIncidentPipeline
|
||||
*
|
||||
* Wraps the full A2A incident pipeline as a single callable MCP tool.
|
||||
* Agents run in sequence: Dispatcher → TelemetryAnalyst → DiagnosticsOrchestrator → ReportWriter.
|
||||
*/
|
||||
|
||||
import { z } from 'zod';
|
||||
import { registerTool } from '../tools/registry.js';
|
||||
import { SupportIncidentBriefSchema } from './types.js';
|
||||
import { runIncidentPipeline } from './runner.js';
|
||||
import type { McpToolRequest } from '../tools/types.js';
|
||||
|
||||
// Human-readable tool description; the sentences are joined with single spaces.
const runIncidentPipelineDescription = [
  'A2A incident pipeline: Dispatcher → TelemetryAnalyst → DiagnosticsOrchestrator → ReportWriter.',
  'Provide a productId + userReport describing the issue. The pipeline queries telemetry clusters,',
  'generates hypotheses and recommended actions, optionally opens a live diagnostics session',
  '(set openDiagnosticsSession: true + userId/anonymousInstallId), and returns a FinalIncidentReport',
  'with a markdown summary. Every step is logged with a stable runId for audit tracing.',
  'Requires admin role.',
].join(' ');

// The brief schema with `openDiagnosticsSession` re-declared so the tool
// carries its own description for that flag.
const runIncidentPipelineInput = SupportIncidentBriefSchema.extend({
  openDiagnosticsSession: z
    .boolean()
    .default(false)
    .describe(
      'Open a live diagnostics session for the affected user (requires userId or anonymousInstallId)'
    ),
});

// Registers the pipeline as a single admin-only MCP tool at import time.
registerTool({
  name: 'support.runIncidentPipeline',
  description: runIncidentPipelineDescription,
  requiredRole: 'admin',
  inputSchema: runIncidentPipelineInput,
  async execute(args, req: McpToolRequest) {
    const { headers, id, jwtPayload, log } = req;
    // Drops the first 7 characters of the Authorization header
    // (presumably the "Bearer " prefix — confirm against the auth hook).
    const token = headers.authorization?.slice(7);

    return runIncidentPipeline(args, {
      token,
      requestId: id,
      initiatedBy: jwtPayload?.sub,
      log,
    });
  },
});
|
||||
130
services/mcp-server/src/modules/a2a/runner.test.ts
Normal file
130
services/mcp-server/src/modules/a2a/runner.test.ts
Normal file
@ -0,0 +1,130 @@
|
||||
import { describe, it, expect, vi, beforeEach } from 'vitest';
|
||||
import type { McpLogger } from '../tools/types.js';
|
||||
|
||||
// ── Mock platform-client before importing runner ───────────────────────────
|
||||
vi.mock('../../lib/platform-client.js', () => ({
|
||||
telemetryClusters: vi.fn(),
|
||||
diagnosticsCreateSession: vi.fn(),
|
||||
}));
|
||||
|
||||
import { telemetryClusters, diagnosticsCreateSession } from '../../lib/platform-client.js';
|
||||
import { runIncidentPipeline } from './runner.js';
|
||||
import type { SupportIncidentBrief } from './types.js';
|
||||
|
||||
// Logger double: every level is a spy so tests can inspect structured log calls.
const mockLog: McpLogger = {
  info: vi.fn(),
  warn: vi.fn(),
  error: vi.fn(),
  debug: vi.fn(),
};

// Minimal brief: no target user, no explicit time window, no diagnostics session.
const baseBrief: SupportIncidentBrief = {
  productId: 'lysnrai',
  userReport: { summary: 'Dictation inserts nothing in Messages', platform: 'ios' },
  timeWindow: {},
  openDiagnosticsSession: false,
};

// One open iOS cluster as the telemetry mock returns it.
const mockClusters = [
  {
    id: 'abc123:202503',
    pk: 'lysnrai:ios:keyboard',
    fingerprint: 'abc123',
    severity: 'error',
    totalCount: 42,
    lastSeenAt: '2026-03-05T10:00:00Z',
    platform: 'ios',
    module: 'keyboard',
    sampleMessage: 'insertText returned noop',
    status: 'open',
  },
];

// Reset spies, then re-install the happy-path responses before each test.
beforeEach(() => {
  vi.clearAllMocks();

  const telemetryMock = vi.mocked(telemetryClusters);
  telemetryMock.mockResolvedValue({ clusters: mockClusters } as never);

  const createSessionMock = vi.mocked(diagnosticsCreateSession);
  createSessionMock.mockResolvedValue({
    id: 'sess_001',
    status: 'active',
    expiresAt: '2026-03-05T11:00:00Z',
    collectionLevel: 'debug',
  } as never);
});
|
||||
|
||||
// End-to-end checks of the runner against the mocked platform client.
describe('A2A incident pipeline', () => {
  it('runs dispatcher → telemetry analyst → report writer (no diagnostics)', async () => {
    const options = { log: mockLog, requestId: 'req_test' };
    const report = await runIncidentPipeline(baseBrief, options);

    // Identity
    expect(report.runId).toMatch(/^run_/);
    expect(report.productId).toBe('lysnrai');

    // Telemetry made it into both the structured and the markdown output
    expect(report.clusterCount).toBe(1);
    expect(report.clusters[0].clusterId).toBe('abc123:202503');
    expect(report.diagnosticsSession).toBeNull();
    expect(report.markdownReport).toContain('abc123:202503');
    expect(report.topHypotheses.length).toBeGreaterThan(0);
    expect(report.recommendedActions.length).toBeGreaterThan(0);

    // Diagnostics was never attempted
    expect(diagnosticsCreateSession).not.toHaveBeenCalled();
  });

  it('creates a diagnostics session when openDiagnosticsSession is true + userId provided', async () => {
    const targetedReport = { ...baseBrief.userReport, userId: 'usr_abc' };
    const brief: SupportIncidentBrief = {
      ...baseBrief,
      userReport: targetedReport,
      openDiagnosticsSession: true,
    };

    const report = await runIncidentPipeline(brief, { log: mockLog });

    expect(diagnosticsCreateSession).toHaveBeenCalledOnce();
    expect(report.diagnosticsSession?.id).toBe('sess_001');
    expect(report.markdownReport).toContain('sess_001');
  });

  it('skips diagnostics session when no userId/anonymousInstallId', async () => {
    // Session requested, but the base brief names no target.
    const brief: SupportIncidentBrief = { ...baseBrief, openDiagnosticsSession: true };

    const report = await runIncidentPipeline(brief, { log: mockLog });

    expect(diagnosticsCreateSession).not.toHaveBeenCalled();
    expect(report.diagnosticsSession).toBeNull();
  });

  it('handles telemetry query failure gracefully', async () => {
    const upstreamFailure = new Error('upstream timeout');
    vi.mocked(telemetryClusters).mockRejectedValue(upstreamFailure);

    const report = await runIncidentPipeline(baseBrief, { log: mockLog });

    expect(report.clusterCount).toBe(0);
    expect(report.markdownReport).toContain('upstream timeout');
  });

  it('logs runId + stepId on every step', async () => {
    await runIncidentPipeline(baseBrief, { log: mockLog });

    // First argument of each info() call is the structured fields object.
    const stepNames = vi
      .mocked(mockLog.info)
      .mock.calls.map(([obj]) => (obj as Record<string, unknown>).a2aStep);

    const expectedSteps = [
      'pipeline.start',
      'dispatcher.complete',
      'telemetry_analyst.complete',
      'pipeline.complete',
    ];
    for (const step of expectedSteps) {
      expect(stepNames).toContain(step);
    }
  });
});
|
||||
|
||||
// Exercises the dispatcher's time-window defaulting through the full pipeline
// by inspecting the query the telemetry mock received.
describe('dispatcher', () => {
  it('defaults time window to last 24h when not provided', async () => {
    const DAY_MS = 24 * 60 * 60 * 1000;

    const before = Date.now();
    const report = await runIncidentPipeline(baseBrief, { log: mockLog });
    const after = Date.now();

    const [firstCall] = vi.mocked(telemetryClusters).mock.calls;
    const query = firstCall[0] as Record<string, string>;
    const fromTs = new Date(query.from).getTime();
    const toTs = new Date(query.to).getTime();

    // `to` falls inside the call window (100 ms of slack on the upper bound)…
    expect(toTs).toBeGreaterThanOrEqual(before);
    expect(toTs).toBeLessThanOrEqual(after + 100);
    // …and the window spans 24h, allowing 100 ms of slack.
    expect(toTs - fromTs).toBeGreaterThanOrEqual(DAY_MS - 100);
    expect(report).toBeDefined();
  });
});
|
||||
103
services/mcp-server/src/modules/a2a/runner.ts
Normal file
103
services/mcp-server/src/modules/a2a/runner.ts
Normal file
@ -0,0 +1,103 @@
|
||||
/**
|
||||
* A2A Pipeline Runner — executes the incident pipeline in sequence:
|
||||
* DispatcherAgent → TelemetryAnalystAgent → DiagnosticsOrchestratorAgent → ReportWriterAgent
|
||||
*
|
||||
* Every step transition is logged with runId + stepId for full audit tracing.
|
||||
* Phase 1: handoffs are persisted via structured req.log entries.
|
||||
* Phase 2+: handoffs will additionally be written to a Cosmos container.
|
||||
*/
|
||||
|
||||
import { randomUUID } from 'node:crypto';
|
||||
import type { McpLogger } from '../tools/types.js';
|
||||
import type { SupportIncidentBrief, FinalIncidentReport } from './types.js';
|
||||
import { dispatch } from './agents/dispatcher.js';
|
||||
import { analyze } from './agents/telemetry-analyst.js';
|
||||
import { orchestrate } from './agents/diagnostics-orchestrator.js';
|
||||
import { writeReport } from './agents/report-writer.js';
|
||||
|
||||
/** Per-run options for `runIncidentPipeline`. */
export interface PipelineOptions {
  /** Structured logger that receives one audit entry per pipeline step. */
  log: McpLogger;

  /** Forwarded to the telemetry analyst and diagnostics orchestrator steps. */
  token?: string;
  /** Forwarded to the dispatcher, which copies it into the run context. */
  requestId?: string;
  /** Forwarded to the dispatcher, which copies it into the run context. */
  initiatedBy?: string;
}
|
||||
|
||||
/**
 * Runs the incident pipeline in order — dispatcher, telemetry analyst,
 * diagnostics orchestrator (only when the dispatcher scheduled it), report
 * writer — and logs an `info` entry tagged with `runId` and `a2aStep` at the
 * start, after each agent that produces a step ID, and at completion.
 *
 * @param brief - Incident brief to investigate.
 * @param opts - Logger plus the token / request identity forwarded to agents.
 * @returns The report produced by the report writer.
 */
export async function runIncidentPipeline(
  brief: SupportIncidentBrief,
  opts: PipelineOptions
): Promise<FinalIncidentReport> {
  const runId = `run_${randomUUID()}`;
  const { log, token, requestId, initiatedBy } = opts;

  log.info({ runId, productId: brief.productId, a2aStep: 'pipeline.start' }, 'A2A pipeline started');

  // ── Step 1: Dispatcher ──────────────────────────────────────────────────
  const decision = dispatch(brief, { runId, requestId, initiatedBy });
  const { steps, resolvedTimeWindow } = decision;

  log.info(
    {
      runId,
      stepId: decision.runContext.stepId,
      a2aStep: 'dispatcher.complete',
      steps,
      resolvedTimeWindow,
    },
    'Dispatcher decided pipeline steps'
  );

  // ── Step 2: Telemetry Analyst ───────────────────────────────────────────
  const findings = await analyze(decision, { token });

  log.info(
    {
      runId,
      stepId: findings.runContext.stepId,
      a2aStep: 'telemetry_analyst.complete',
      clusterCount: findings.clusters.length,
      hypothesisCount: findings.topHypotheses.length,
      queryError: findings.queryError,
    },
    'Telemetry analyst completed'
  );

  // ── Step 3: Diagnostics Orchestrator (conditional) ─────────────────────
  // Stays null when the dispatcher did not schedule the step.
  const diagnosticsScheduled = steps.includes('diagnostics_orchestrator');
  const diagResult = diagnosticsScheduled
    ? await orchestrate(decision, findings, { token })
    : null;

  if (diagResult) {
    log.info(
      {
        runId,
        stepId: diagResult.runContext.stepId,
        a2aStep: 'diagnostics_orchestrator.complete',
        skipped: diagResult.skipped,
        sessionId: diagResult.session?.id,
        sessionError: diagResult.sessionError,
      },
      'Diagnostics orchestrator completed'
    );
  }

  // ── Step 4: Report Writer ───────────────────────────────────────────────
  const report = writeReport(decision, findings, diagResult);

  log.info(
    {
      runId,
      a2aStep: 'pipeline.complete',
      productId: brief.productId,
      clusterCount: report.clusterCount,
      hasSession: !!report.diagnosticsSession,
    },
    'A2A pipeline completed'
  );

  return report;
}
|
||||
112
services/mcp-server/src/modules/a2a/types.ts
Normal file
112
services/mcp-server/src/modules/a2a/types.ts
Normal file
@ -0,0 +1,112 @@
|
||||
/**
|
||||
* A2A Orchestration — Handoff artifact schemas and types.
|
||||
*
|
||||
* Every artifact carries a stable `runId` + `stepId` for audit tracing.
|
||||
* All artifacts are explicitly scoped to a `productId`.
|
||||
*/
|
||||
|
||||
import { z } from 'zod';
|
||||
|
||||
// ── Pipeline run identity ──────────────────────────────────────────────────
|
||||
|
||||
/**
 * Identity attached to every handoff artifact: which run it belongs to, which
 * step produced it, and which product it is scoped to.
 */
export const RunContextSchema = z.object({
  // Required identity
  runId: z.string().describe('Stable ID for the entire pipeline run'),
  stepId: z.string().describe('ID for this individual step'),
  productId: z.string().min(1),

  // Optional provenance
  requestId: z.string().optional(),
  initiatedBy: z.string().optional().describe('userId who triggered the pipeline'),
});

/** Static type inferred from {@link RunContextSchema}. */
export type RunContext = z.infer<typeof RunContextSchema>;
|
||||
|
||||
// ── A) SupportIncidentBrief — input to DispatcherAgent ───────────────────
|
||||
|
||||
/**
 * What the user reported. Only `summary` is required; every other field is an
 * optional string.
 */
export const UserReportSchema = z.object({
  summary: z.string().min(1).describe('Human-readable description of the issue'),

  // Where it happened
  platform: z.string().optional().describe('e.g. ios, android, web'),
  channel: z.string().optional().describe('e.g. keyboard_extension, dictation'),

  // Which build
  appVersion: z.string().optional(),
  buildNumber: z.string().optional(),

  // Who is affected — either one is enough to target a diagnostics session
  userId: z.string().optional(),
  anonymousInstallId: z.string().optional(),
});

/** Static type inferred from {@link UserReportSchema}. */
export type UserReport = z.infer<typeof UserReportSchema>;
|
||||
|
||||
/**
 * Input to the DispatcherAgent.
 *
 * - `timeWindow` defaults to `{}`; either bound may be omitted and is then
 *   filled in by the dispatcher. When both bounds are given, `from` must not
 *   be later than `to` — an inverted window is rejected here instead of being
 *   sent on as a nonsensical telemetry query.
 * - `openDiagnosticsSession` defaults to `false`.
 *
 * The ordering check lives on the inner `timeWindow` object so the outer
 * schema remains a plain `z.object` and can still be `.extend()`ed.
 */
export const SupportIncidentBriefSchema = z.object({
  productId: z.string().min(1),
  userReport: UserReportSchema,
  timeWindow: z
    .object({
      from: z.string().datetime().optional(),
      to: z.string().datetime().optional(),
    })
    .refine(
      // Only enforceable when both bounds are present.
      w => w.from === undefined || w.to === undefined || Date.parse(w.from) <= Date.parse(w.to),
      { message: 'timeWindow.from must not be later than timeWindow.to', path: ['from'] }
    )
    .default({}),
  openDiagnosticsSession: z
    .boolean()
    .default(false)
    .describe('If true and userId/anonymousInstallId present, open a live diagnostics session'),
});

/** Static (parsed-output) type inferred from {@link SupportIncidentBriefSchema}. */
export type SupportIncidentBrief = z.infer<typeof SupportIncidentBriefSchema>;
|
||||
|
||||
// ── B) DispatchDecision — output of DispatcherAgent ───────────────────────
|
||||
|
||||
/** Output of the DispatcherAgent and input to every later step. */
export interface DispatchDecision {
  /** Run identity stamped by the dispatcher (its own `stepId` included). */
  runContext: RunContext;
  /** The brief as received, passed along unchanged. */
  brief: SupportIncidentBrief;
  /** Agents scheduled for this run, in execution order. */
  steps: Array<'telemetry_analyst' | 'diagnostics_orchestrator' | 'report_writer'>;
  /** Resolved time window (fills defaults if omitted in brief) */
  resolvedTimeWindow: { from: string; to: string };
}
|
||||
|
||||
// ── C) TelemetryFindings — output of TelemetryAnalystAgent ───────────────
|
||||
|
||||
/**
 * Normalised reference to one telemetry error cluster. All fields except
 * `sampleMessage` are always present.
 */
export interface ClusterRef {
  // Identity
  clusterId: string;
  pk: string;
  fingerprint: string;

  // Classification
  /** Severity label; the agents single out the value `fatal`. */
  severity: string;
  platform: string;
  module: string;

  // Volume / recency
  /** Event count; the telemetry analyst sorts clusters by this, descending. */
  totalCount: number;
  lastSeenAt: string;

  /** Example message for the cluster, when one is available. */
  sampleMessage?: string;
}
|
||||
|
||||
/** Output of the TelemetryAnalystAgent. */
export interface TelemetryFindings {
  /** Run identity carrying the telemetry step's own `stepId`. */
  runContext: RunContext;
  /** Clusters kept after filtering, ordered by `totalCount` descending. */
  clusters: ClusterRef[];
  /** Heuristic hypotheses derived from the clusters and the report summary. */
  topHypotheses: string[];
  /** Suggested next actions. */
  recommendedActions: string[];
  /** Error message of a failed telemetry query; absent when the query succeeded. */
  queryError?: string;
}
|
||||
|
||||
// ── D) DiagnosticsSessionResult — output of DiagnosticsOrchestratorAgent ─
|
||||
|
||||
/**
 * Output of the DiagnosticsOrchestratorAgent. The orchestrator fills in one of
 * `skipReason` (step skipped), `session` (created) or `sessionError` (create
 * call failed).
 */
export interface DiagnosticsSessionResult {
  /** Run identity carrying the diagnostics step's own `stepId`. */
  runContext: RunContext;
  /** True when no session creation was attempted. */
  skipped: boolean;
  /** Why the step was skipped. */
  skipReason?: string;
  /** The created session. */
  session?: {
    id: string;
    status: string;
    expiresAt: string;
    collectionLevel: string;
    targetUserId?: string;
    targetAnonymousId?: string;
  };
  /** Error message of a failed create call. */
  sessionError?: string;
}
|
||||
|
||||
// ── E) FinalIncidentReport — output of ReportWriterAgent ─────────────────
|
||||
|
||||
/** Output of the ReportWriterAgent — the pipeline's final result. */
export interface FinalIncidentReport {
  // Identity
  runId: string;
  productId: string;
  /** ISO timestamp taken when the report was written. */
  generatedAt: string;

  /** The user's issue summary, copied from the brief. */
  summary: string;

  // Telemetry findings
  /** Length of `clusters`. */
  clusterCount: number;
  clusters: ClusterRef[];
  topHypotheses: string[];
  recommendedActions: string[];

  /** The created diagnostics session, or `null` when there is none. */
  diagnosticsSession: DiagnosticsSessionResult['session'] | null;

  /** Markdown rendering of the report. */
  markdownReport: string;
}
|
||||
@ -42,7 +42,13 @@ export async function toolRoutes(app: FastifyInstance) {
|
||||
);
|
||||
|
||||
try {
|
||||
const result = await tool.execute(parsed.data, req as unknown as McpToolRequest);
|
||||
const mcpReq: McpToolRequest = {
|
||||
id: req.id,
|
||||
headers: { authorization: req.headers.authorization },
|
||||
jwtPayload: req.jwtPayload,
|
||||
log: req.log,
|
||||
};
|
||||
const result = await tool.execute(parsed.data, mcpReq);
|
||||
return {
|
||||
content: [{ type: 'text', text: JSON.stringify(result, null, 2) }],
|
||||
};
|
||||
|
||||
@ -1,5 +1,13 @@
|
||||
import type { ZodTypeAny, z } from 'zod';
|
||||
|
||||
/**
 * Minimal structured logger — matches Fastify's pino-based logger shape.
 * Every level takes the structured fields first and an optional message second.
 */
export interface McpLogger {
  debug(obj: Record<string, unknown>, msg?: string): void;
  info(obj: Record<string, unknown>, msg?: string): void;
  warn(obj: Record<string, unknown>, msg?: string): void;
  error(obj: Record<string, unknown>, msg?: string): void;
}
|
||||
|
||||
/**
|
||||
* Minimal request context passed to tool execute functions.
|
||||
* Avoids a direct FastifyRequest dependency inside tool modules.
|
||||
@ -11,6 +19,8 @@ export interface McpToolRequest {
|
||||
headers: { authorization?: string | undefined };
|
||||
/** JWT payload decoded by the auth hook */
|
||||
jwtPayload?: { sub?: string; role?: string; productId?: string };
|
||||
/** Structured logger (child of Fastify request logger, carries requestId) */
|
||||
log: McpLogger;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -22,6 +22,7 @@ import './modules/platform/telemetry-policy-tools.js';
|
||||
import './modules/platform/diagnostics-tools.js';
|
||||
import './modules/extraction/extraction-tools.js';
|
||||
import './modules/support/debug-pack.js';
|
||||
import './modules/a2a/pipeline-tool.js';
|
||||
|
||||
const app = await createServiceApp({
|
||||
name: 'mcp-server',
|
||||
|
||||
Loading…
Reference in New Issue
Block a user