diff --git a/docs/MCP+A2A/EXECUTION_CHECKLIST.md b/docs/MCP+A2A/EXECUTION_CHECKLIST.md index f1d7fe59..76799646 100644 --- a/docs/MCP+A2A/EXECUTION_CHECKLIST.md +++ b/docs/MCP+A2A/EXECUTION_CHECKLIST.md @@ -47,7 +47,26 @@ This is the “ready to start building” checklist that turns the docs in this - [x] Python sidecar health visible via `extraction.sidecarHealth` - [ ] End-to-end integration test with real platform-service (Phase 2) -## 5) Phase 2+ quick sanity checks +## 5) Phase 2 — A2A orchestration ✅ COMPLETE + +### Milestone D (A2A runner + handoff schemas) ✅ — d1d643f+ + +- [x] Handoff artifact schemas (Zod): `SupportIncidentBrief`, `TelemetryFindings`, `DiagnosticsSessionResult`, `FinalIncidentReport` +- [x] `DispatcherAgent` — validates brief, fills default 24h time window, decides which steps to run +- [x] `TelemetryAnalystAgent` — queries clusters, filters by platform, derives hypotheses + actions +- [x] `DiagnosticsOrchestratorAgent` — conditionally creates expiring session based on findings severity +- [x] `ReportWriterAgent` — assembles structured `FinalIncidentReport` + full markdown report +- [x] `runIncidentPipeline` runner — logs every step with stable `runId` + `stepId` (audit tracing) +- [x] `support.runIncidentPipeline` MCP tool — single callable entry point for the pipeline +- [x] `McpToolRequest.log` — structured logger added; `routes.ts` passes `req.log` to all tool calls +- [x] 6 Vitest tests covering: no-session run, session creation, skip when no target, upstream failure, log audit, time window defaulting + +### Acceptance criteria ✅ + +- [x] "Support Debug Pack" runs end-to-end via Dispatcher → Telemetry Analyst → Diagnostics Orchestrator → Report Writer +- [x] Every handoff is logged with stable `runId` + `stepId` (Phase 1: logs; Phase 2+: Cosmos container) + +## 6) Phase 3+ quick sanity checks - If you make telemetry policy-aware clients: - ensure `GET /api/telemetry/config` consumption is cached (ETag) and privacy-safe. 
/** Shape of the session object expected back from `diagnosticsCreateSession`. */
interface CreatedSession {
  id: string;
  status: string;
  expiresAt: string;
  collectionLevel?: string;
  targetUserId?: string;
  targetAnonymousId?: string;
}

/** Upper bound, in minutes, requested for a pipeline-created diagnostics session. */
const SESSION_MAX_DURATION_MINUTES = 30;

/**
 * Runtime guard for the upstream response. The report prints `id`, `status`
 * and `expiresAt` verbatim, so a payload missing any of them must be treated
 * as a failure rather than cast through and rendered as "undefined".
 */
function isCreatedSession(value: unknown): value is CreatedSession {
  if (typeof value !== 'object' || value === null) return false;
  const candidate = value as Record<string, unknown>;
  return (
    typeof candidate.id === 'string' &&
    typeof candidate.status === 'string' &&
    typeof candidate.expiresAt === 'string'
  );
}

/**
 * DiagnosticsOrchestratorAgent — optionally opens a live remote diagnostics
 * session for the affected user / install ID based on TelemetryFindings.
 *
 * @param decision - Dispatcher output; its `brief` says whether a session was
 *   requested and who to target.
 * @param findings - Telemetry analyst output; any `fatal` cluster raises the
 *   requested collection level from `debug` to `trace`.
 * @param opts - `token` is forwarded to the platform client.
 * @returns A result that is either `skipped` (with `skipReason`), carries the
 *   created `session`, or carries a `sessionError`. This function never
 *   rejects: upstream failures are reported through `sessionError`.
 */
export async function orchestrate(
  decision: DispatchDecision,
  findings: TelemetryFindings,
  opts: { token?: string }
): Promise<DiagnosticsSessionResult> {
  const stepId = `step_diag_${randomUUID().slice(0, 8)}`;
  const ctx = { ...findings.runContext, stepId };

  const { brief } = decision;
  const { userId, anonymousInstallId } = brief.userReport;

  // Skip when the caller did not ask for a session.
  if (!brief.openDiagnosticsSession) {
    return {
      runContext: ctx,
      skipped: true,
      skipReason: 'openDiagnosticsSession is false — skipping session creation',
    };
  }

  // Skip when there is nobody to target (empty strings count as absent).
  if (!userId && !anonymousInstallId) {
    return {
      runContext: ctx,
      skipped: true,
      skipReason: 'No userId or anonymousInstallId provided — cannot target a diagnostics session',
    };
  }

  // Choose collection level based on findings severity.
  const hasFatal = findings.clusters.some(c => c.severity === 'fatal');
  const collectionLevel = hasFatal ? 'trace' : 'debug';

  try {
    const response: unknown = await diagnosticsCreateSession(
      {
        productId: brief.productId,
        targetUserId: userId,
        targetAnonymousId: anonymousInstallId,
        collectionLevel,
        captureLogs: true,
        captureNetwork: true,
        maxDurationMinutes: SESSION_MAX_DURATION_MINUTES,
      },
      { token: opts.token, requestId: findings.runContext.requestId, productId: brief.productId }
    );

    if (!isCreatedSession(response)) {
      return {
        runContext: ctx,
        skipped: false,
        sessionError: 'Diagnostics service returned an unexpected session payload',
      };
    }

    return {
      runContext: ctx,
      skipped: false,
      session: {
        id: response.id,
        status: response.status,
        expiresAt: response.expiresAt,
        // Fall back to the level we requested when upstream does not echo one.
        collectionLevel:
          typeof response.collectionLevel === 'string' ? response.collectionLevel : collectionLevel,
        targetUserId: userId,
        targetAnonymousId: anonymousInstallId,
      },
    };
  } catch (err) {
    return {
      runContext: ctx,
      skipped: false,
      sessionError: err instanceof Error ? err.message : String(err),
    };
  }
}
/** Default look-back when no timeWindow provided — 24 hours */
const DEFAULT_LOOKBACK_MS = 24 * 60 * 60 * 1000;

/**
 * DispatcherAgent — validates the SupportIncidentBrief, fills in defaults,
 * and decides which downstream agents to activate.
 *
 * Time-window resolution:
 * - `to` defaults to the current instant.
 * - `from` defaults to 24 hours before the *resolved* `to`, so a caller that
 *   supplies only a past `to` still gets a well-ordered window.
 *
 * @param brief - The incident brief to dispatch.
 * @param opts - Run identity: `runId` is stamped on the step's RunContext,
 *   `requestId` / `initiatedBy` are carried through unchanged.
 * @returns The decision: run context, the brief, the ordered steps, and the
 *   resolved time window as ISO-8601 strings.
 */
export function dispatch(
  brief: SupportIncidentBrief,
  opts: { runId: string; requestId?: string; initiatedBy?: string }
): DispatchDecision {
  const stepId = `step_dispatch_${randomUUID().slice(0, 8)}`;

  const runContext: RunContext = {
    runId: opts.runId,
    stepId,
    productId: brief.productId,
    requestId: opts.requestId,
    initiatedBy: opts.initiatedBy,
  };

  // Resolve the window end first; the default start is anchored to it.
  const now = new Date();
  const to = brief.timeWindow.to ?? now.toISOString();
  const toMs = Date.parse(to);
  // An unparseable `to` must not make toISOString() throw; fall back to "now".
  const anchorMs = Number.isNaN(toMs) ? now.getTime() : toMs;
  const from = brief.timeWindow.from ?? new Date(anchorMs - DEFAULT_LOOKBACK_MS).toISOString();

  // Telemetry analysis and the report always run; diagnostics is opt-in and
  // needs someone to target.
  const steps: DispatchDecision['steps'] = ['telemetry_analyst'];

  const hasTarget = Boolean(brief.userReport.userId || brief.userReport.anonymousInstallId);
  if (brief.openDiagnosticsSession && hasTarget) {
    steps.push('diagnostics_orchestrator');
  }

  steps.push('report_writer');

  return { runContext, brief, steps, resolvedTimeWindow: { from, to } };
}
/** Maximum number of clusters itemised in the markdown; the remainder is summarised as a count. */
const MAX_LISTED_CLUSTERS = 10;

/**
 * ReportWriterAgent — assembles all step outputs into a FinalIncidentReport
 * with a stable runId and a markdown summary.
 *
 * Optional header fields (platform, app version) are only emitted when
 * present. The blank lines that separate markdown sections are kept: without
 * them the closing `---` sits directly under a text line, which CommonMark
 * parses as a setext heading instead of a thematic break.
 *
 * @param decision - Dispatcher output (run context, brief, resolved window).
 * @param findings - Telemetry analyst output.
 * @param diagResult - Diagnostics orchestrator output, or `null` when that
 *   step did not run.
 * @returns The structured report plus its markdown rendering.
 */
export function writeReport(
  decision: DispatchDecision,
  findings: TelemetryFindings,
  diagResult: DiagnosticsSessionResult | null
): FinalIncidentReport {
  const { runContext, brief, resolvedTimeWindow } = decision;
  const { userReport } = brief;
  const generatedAt = new Date().toISOString();
  const clusterCount = findings.clusters.length;

  // ── Header ──────────────────────────────────────────────────────────────
  const lines: string[] = [
    `# Incident Report — ${brief.productId}`,
    `**Run ID:** \`${runContext.runId}\``,
    `**Generated:** ${generatedAt}`,
    `**Reported issue:** ${userReport.summary}`,
  ];
  if (userReport.platform) lines.push(`**Platform:** ${userReport.platform}`);
  if (userReport.appVersion) lines.push(`**App version:** ${userReport.appVersion}`);

  // ── Time window ─────────────────────────────────────────────────────────
  lines.push(
    '',
    '## Time Window',
    `- From: ${resolvedTimeWindow.from}`,
    `- To: ${resolvedTimeWindow.to}`
  );

  // ── Telemetry findings ──────────────────────────────────────────────────
  lines.push('', `## Telemetry Findings (${clusterCount} open cluster${clusterCount !== 1 ? 's' : ''})`);
  if (findings.queryError) {
    lines.push(`> ⚠️ Telemetry query failed: ${findings.queryError}`);
  } else if (clusterCount === 0) {
    lines.push('No open error clusters found in this time window.');
  } else {
    for (const c of findings.clusters.slice(0, MAX_LISTED_CLUSTERS)) {
      const sample = c.sampleMessage ? ` | "${c.sampleMessage}"` : '';
      lines.push(
        `- **[${c.severity.toUpperCase()}]** \`${c.clusterId}\` — ${c.totalCount} events` +
          ` | platform: ${c.platform} | module: ${c.module}` +
          sample
      );
    }
    if (clusterCount > MAX_LISTED_CLUSTERS) {
      lines.push(`  … and ${clusterCount - MAX_LISTED_CLUSTERS} more clusters`);
    }
  }

  // ── Hypotheses ──────────────────────────────────────────────────────────
  lines.push('', '## Top Hypotheses');
  if (findings.topHypotheses.length === 0) {
    lines.push('No hypotheses generated.');
  } else {
    for (const h of findings.topHypotheses) lines.push(`- ${h}`);
  }

  // ── Recommended actions ─────────────────────────────────────────────────
  lines.push('', '## Recommended Actions');
  if (findings.recommendedActions.length === 0) {
    lines.push('No actions recommended.');
  } else {
    for (const a of findings.recommendedActions) lines.push(`- ${a}`);
  }

  // ── Diagnostics session ─────────────────────────────────────────────────
  lines.push('', '## Diagnostics Session');
  if (!diagResult || diagResult.skipped) {
    lines.push(diagResult?.skipReason ? `Skipped: ${diagResult.skipReason}` : 'Not requested.');
  } else if (diagResult.sessionError) {
    lines.push(`> ⚠️ Session creation failed: ${diagResult.sessionError}`);
  } else if (diagResult.session) {
    const s = diagResult.session;
    lines.push(`- **Session ID:** \`${s.id}\``);
    lines.push(`- **Status:** ${s.status}`);
    lines.push(`- **Collection level:** ${s.collectionLevel}`);
    lines.push(`- **Expires:** ${s.expiresAt}`);
    if (s.targetUserId) lines.push(`- **Target user:** ${s.targetUserId}`);
    // Sessions opened for an anonymous install would otherwise show no target.
    if (s.targetAnonymousId) lines.push(`- **Target install:** ${s.targetAnonymousId}`);
  }

  // ── Footer ──────────────────────────────────────────────────────────────
  lines.push('', '---', `*Generated by ByteLyst A2A incident pipeline · run \`${runContext.runId}\`*`);

  return {
    runId: runContext.runId,
    productId: brief.productId,
    generatedAt,
    summary: userReport.summary,
    clusterCount,
    clusters: findings.clusters,
    topHypotheses: findings.topHypotheses,
    recommendedActions: findings.recommendedActions,
    diagnosticsSession: diagResult?.session ?? null,
    markdownReport: lines.join('\n'),
  };
}
/** Loosely-typed cluster record as returned by the telemetry clusters endpoint. */
interface RawCluster {
  id?: string;
  pk?: string;
  fingerprint?: string;
  severity?: string;
  totalCount?: number;
  lastSeenAt?: string;
  platform?: string;
  module?: string;
  sampleMessage?: string;
  status?: string;
}

/** Cap applied to both the hypotheses and the recommended-actions lists. */
const MAX_FINDINGS_ITEMS = 5;

/**
 * TelemetryAnalystAgent — queries telemetry clusters for the incident time
 * window and produces structured TelemetryFindings with hypotheses and
 * recommended next actions.
 *
 * Clusters whose status is `resolved` or `ignored` are dropped. When the brief
 * names a platform and at least one cluster matches it (case-insensitive),
 * only matching clusters are kept; otherwise all clusters are kept. The result
 * is sorted by `totalCount`, highest first.
 *
 * A failed query does not reject: the error message is returned in
 * `queryError`, and the hypotheses/actions state that telemetry was
 * unavailable rather than claiming that no clusters exist.
 *
 * @param decision - Dispatcher output (brief + resolved time window).
 * @param opts - `token` is forwarded to the platform client.
 */
export async function analyze(
  decision: DispatchDecision,
  opts: { token?: string }
): Promise<TelemetryFindings> {
  const { runContext, brief, resolvedTimeWindow } = decision;
  const stepId = `step_telemetry_${randomUUID().slice(0, 8)}`;
  const ctx = { ...runContext, stepId };

  let clusters: ClusterRef[] = [];
  let queryError: string | undefined;

  try {
    const result = await telemetryClusters(
      { productId: brief.productId, from: resolvedTimeWindow.from, to: resolvedTimeWindow.to },
      { token: opts.token, requestId: runContext.requestId, productId: brief.productId }
    );

    const raw: RawCluster[] = (result as { clusters?: RawCluster[] }).clusters ?? [];

    clusters = raw
      .filter(c => c.status !== 'resolved' && c.status !== 'ignored')
      .map(c => ({
        clusterId: c.id ?? '',
        pk: c.pk ?? '',
        fingerprint: c.fingerprint ?? '',
        severity: c.severity ?? 'error',
        totalCount: c.totalCount ?? 0,
        lastSeenAt: c.lastSeenAt ?? '',
        platform: c.platform ?? '',
        module: c.module ?? '',
        sampleMessage: c.sampleMessage,
      }));

    // Narrow to the reported platform, but only if that leaves something to show.
    if (brief.userReport.platform) {
      const p = brief.userReport.platform.toLowerCase();
      const platformClusters = clusters.filter(c => c.platform.toLowerCase() === p);
      if (platformClusters.length > 0) clusters = platformClusters;
    }

    // Sort by totalCount descending
    clusters.sort((a, b) => b.totalCount - a.totalCount);
  } catch (err) {
    queryError = err instanceof Error ? err.message : String(err);
  }

  if (queryError !== undefined) {
    // "No clusters" and "could not ask" are different findings; do not let an
    // outage read as a clean bill of health.
    return {
      runContext: ctx,
      clusters,
      topHypotheses: [
        'Telemetry query failed — cluster data is unavailable, so no telemetry-based hypotheses could be derived',
      ],
      recommendedActions: [
        'Retry the pipeline once the telemetry service is reachable',
        'Check platform-service health and the credentials used for the telemetry query',
      ],
      queryError,
    };
  }

  const topHypotheses = deriveHypotheses(clusters, brief.userReport.summary);
  const recommendedActions = deriveActions(clusters, decision);

  return { runContext: ctx, clusters, topHypotheses, recommendedActions, queryError };
}

// ── Heuristic hypothesis generator ────────────────────────────────────────

/**
 * Builds up to five hypotheses from the open clusters and from keywords in
 * the user's summary. `clusters` is expected to be sorted most-frequent first,
 * because the first entry is treated as the top cluster.
 */
function deriveHypotheses(clusters: ClusterRef[], summary: string): string[] {
  const [topCluster] = clusters;
  if (topCluster === undefined) {
    return ['No open error clusters found — issue may be intermittent or not yet surfaced'];
  }

  const hypotheses: string[] = [];
  const lower = summary.toLowerCase();

  const fatalCount = clusters.filter(c => c.severity === 'fatal').length;
  if (fatalCount > 0) {
    hypotheses.push(`${fatalCount} fatal cluster(s) detected — highest priority for investigation`);
  }

  if (topCluster.sampleMessage) {
    hypotheses.push(`Top cluster message: "${topCluster.sampleMessage}"`);
  }

  // Keyword heuristics on the reporter's own words.
  if (lower.includes('dictation') || lower.includes('speech') || lower.includes('transcri')) {
    hypotheses.push('Audio permission or microphone access may be revoked');
    hypotheses.push('Speech-to-text pipeline may have a timeout or encoding error');
  }

  if (lower.includes('insert') || lower.includes('paste') || lower.includes('nothing')) {
    hypotheses.push('Text insertion API returned a no-op — possible permission or focus issue');
  }

  if (lower.includes('crash') || lower.includes('fatal')) {
    hypotheses.push('Memory pressure or unexpected nil dereference in affected module');
  }

  // Distinct, non-empty module names among the five most frequent clusters.
  const modules = [
    ...new Set(
      clusters
        .slice(0, 5)
        .map(c => c.module)
        .filter(Boolean)
    ),
  ];
  if (modules.length > 0) {
    hypotheses.push(`Affected modules: ${modules.join(', ')}`);
  }

  return hypotheses.slice(0, MAX_FINDINGS_ITEMS);
}

/**
 * Builds up to five recommended next actions from the open clusters and the
 * brief. `clusters` is expected to be sorted most-frequent first.
 */
function deriveActions(clusters: ClusterRef[], decision: DispatchDecision): string[] {
  const [topCluster] = clusters;
  if (topCluster === undefined) {
    return [
      'Widen time window or check alternate platforms',
      'Verify telemetry is enabled and events are being ingested',
    ];
  }

  const actions: string[] = [];
  const { userReport, openDiagnosticsSession } = decision.brief;

  const hasTarget = Boolean(userReport.userId || userReport.anonymousInstallId);
  if (hasTarget && !openDiagnosticsSession) {
    actions.push(
      'Re-run with openDiagnosticsSession: true to capture a live trace for the affected user'
    );
  }

  const fatalClusters = clusters.filter(c => c.severity === 'fatal');
  if (fatalClusters.length > 0) {
    actions.push(
      `Resolve or investigate ${fatalClusters.length} fatal cluster(s): ` +
        fatalClusters
          .slice(0, 3)
          .map(c => c.clusterId)
          .join(', ')
    );
  }

  if (clusters.length >= 3) {
    actions.push(
      'Consider creating a targeted telemetry policy to capture debug-level events for affected module'
    );
  }

  actions.push(`Review top cluster (${topCluster.clusterId}) in admin telemetry explorer`);

  return actions.slice(0, MAX_FINDINGS_ITEMS);
}
/**
 * Extracts the credential from an `Authorization: Bearer <token>` header.
 *
 * @returns The token, or `undefined` when the header is absent, uses a
 *   different scheme, or carries no credential. Blindly dropping the first
 *   seven characters would turn e.g. a `Basic …` header into a garbage token.
 */
function extractBearerToken(header: string | undefined): string | undefined {
  const match = /^Bearer\s+(\S+)\s*$/i.exec(header ?? '');
  return match?.[1];
}

/**
 * MCP tool: support.runIncidentPipeline
 *
 * Wraps the full A2A incident pipeline as a single callable MCP tool.
 * Agents run in sequence: Dispatcher → TelemetryAnalyst → DiagnosticsOrchestrator → ReportWriter.
 * Registration happens as a side effect of importing this module.
 */
registerTool({
  name: 'support.runIncidentPipeline',
  description: [
    'A2A incident pipeline: Dispatcher → TelemetryAnalyst → DiagnosticsOrchestrator → ReportWriter.',
    'Provide a productId + userReport describing the issue. The pipeline queries telemetry clusters,',
    'generates hypotheses and recommended actions, optionally opens a live diagnostics session',
    '(set openDiagnosticsSession: true + userId/anonymousInstallId), and returns a FinalIncidentReport',
    'with a markdown summary. Every step is logged with a stable runId for audit tracing.',
    'Requires admin role.',
  ].join(' '),
  requiredRole: 'admin',
  // Same shape as the brief; the override only supplies a tool-facing description.
  inputSchema: SupportIncidentBriefSchema.extend({
    openDiagnosticsSession: z
      .boolean()
      .default(false)
      .describe(
        'Open a live diagnostics session for the affected user (requires userId or anonymousInstallId)'
      ),
  }),
  async execute(args, req: McpToolRequest) {
    return runIncidentPipeline(args, {
      token: extractBearerToken(req.headers.authorization),
      requestId: req.id,
      initiatedBy: req.jwtPayload?.sub,
      log: req.log,
    });
  },
});
/** Logger double; every level is a spy so tests can inspect structured entries. */
const mockLog: McpLogger = {
  info: vi.fn(),
  warn: vi.fn(),
  error: vi.fn(),
  debug: vi.fn(),
};

/** Minimal valid brief: no target, no diagnostics, default time window. */
const baseBrief: SupportIncidentBrief = {
  productId: 'lysnrai',
  userReport: { summary: 'Dictation inserts nothing in Messages', platform: 'ios' },
  timeWindow: {},
  openDiagnosticsSession: false,
};

/** One open iOS cluster as the telemetry endpoint would return it. */
const mockClusters = [
  {
    id: 'abc123:202503',
    pk: 'lysnrai:ios:keyboard',
    fingerprint: 'abc123',
    severity: 'error',
    totalCount: 42,
    lastSeenAt: '2026-03-05T10:00:00Z',
    platform: 'ios',
    module: 'keyboard',
    sampleMessage: 'insertText returned noop',
    status: 'open',
  },
];

const DAY_MS = 24 * 60 * 60 * 1000;

beforeEach(() => {
  vi.clearAllMocks();
  vi.mocked(telemetryClusters).mockResolvedValue({ clusters: mockClusters } as never);
  vi.mocked(diagnosticsCreateSession).mockResolvedValue({
    id: 'sess_001',
    status: 'active',
    expiresAt: '2026-03-05T11:00:00Z',
    collectionLevel: 'debug',
  } as never);
});

describe('A2A incident pipeline', () => {
  it('runs dispatcher → telemetry analyst → report writer (no diagnostics)', async () => {
    const report = await runIncidentPipeline(baseBrief, {
      log: mockLog,
      requestId: 'req_test',
    });

    expect(report.runId).toMatch(/^run_/);
    expect(report.productId).toBe('lysnrai');
    expect(report.clusterCount).toBe(1);
    expect(report.clusters[0]?.clusterId).toBe('abc123:202503');
    expect(report.diagnosticsSession).toBeNull();
    expect(report.markdownReport).toContain('abc123:202503');
    expect(report.topHypotheses.length).toBeGreaterThan(0);
    expect(report.recommendedActions.length).toBeGreaterThan(0);
    expect(diagnosticsCreateSession).not.toHaveBeenCalled();
  });

  it('creates a diagnostics session when openDiagnosticsSession is true + userId provided', async () => {
    const brief: SupportIncidentBrief = {
      ...baseBrief,
      userReport: { ...baseBrief.userReport, userId: 'usr_abc' },
      openDiagnosticsSession: true,
    };

    const report = await runIncidentPipeline(brief, { log: mockLog });

    expect(diagnosticsCreateSession).toHaveBeenCalledOnce();
    expect(report.diagnosticsSession?.id).toBe('sess_001');
    expect(report.markdownReport).toContain('sess_001');
  });

  it('skips diagnostics session when no userId/anonymousInstallId', async () => {
    const brief: SupportIncidentBrief = { ...baseBrief, openDiagnosticsSession: true };
    const report = await runIncidentPipeline(brief, { log: mockLog });

    expect(diagnosticsCreateSession).not.toHaveBeenCalled();
    expect(report.diagnosticsSession).toBeNull();
  });

  it('handles telemetry query failure gracefully', async () => {
    vi.mocked(telemetryClusters).mockRejectedValue(new Error('upstream timeout'));

    const report = await runIncidentPipeline(baseBrief, { log: mockLog });

    expect(report.clusterCount).toBe(0);
    expect(report.markdownReport).toContain('upstream timeout');
  });

  it('logs runId + stepId on every step', async () => {
    const report = await runIncidentPipeline(baseBrief, { log: mockLog });

    const entries = vi
      .mocked(mockLog.info)
      .mock.calls.map(([obj]) => obj as Record<string, unknown>);
    const stepNames = entries.map(entry => entry.a2aStep);

    expect(stepNames).toContain('pipeline.start');
    expect(stepNames).toContain('dispatcher.complete');
    expect(stepNames).toContain('telemetry_analyst.complete');
    expect(stepNames).toContain('pipeline.complete');

    // Every entry is correlated to the same run as the returned report…
    for (const entry of entries) {
      expect(entry.runId).toBe(report.runId);
    }

    // …and every agent-level entry carries its own step id.
    const agentEntries = entries.filter(
      entry => typeof entry.a2aStep === 'string' && !entry.a2aStep.startsWith('pipeline.')
    );
    expect(agentEntries.length).toBeGreaterThan(0);
    for (const entry of agentEntries) {
      expect(entry.stepId).toMatch(/^step_/);
    }
    expect(new Set(agentEntries.map(entry => entry.stepId)).size).toBe(agentEntries.length);
  });
});

describe('dispatcher', () => {
  it('defaults time window to last 24h when not provided', async () => {
    const before = Date.now();
    const report = await runIncidentPipeline(baseBrief, { log: mockLog });
    const after = Date.now();

    const clusterCall = vi.mocked(telemetryClusters).mock.calls[0]?.[0] as {
      from: string;
      to: string;
    };
    const fromTs = new Date(clusterCall.from).getTime();
    const toTs = new Date(clusterCall.to).getTime();

    expect(toTs).toBeGreaterThanOrEqual(before);
    expect(toTs).toBeLessThanOrEqual(after + 100);
    // Both bounds derive from the same instant, so the span is exact.
    expect(toTs - fromTs).toBe(DAY_MS);
    expect(report).toBeDefined();
  });
});
/** Per-invocation inputs that are not part of the incident brief itself. */
export interface PipelineOptions {
  /** Credential forwarded to platform-service calls. */
  token?: string;
  /** Inbound request id, propagated into every step's run context. */
  requestId?: string;
  /** Identifier of whoever triggered the run. */
  initiatedBy?: string;
  /** Structured logger that receives the audit entries. */
  log: McpLogger;
}

/**
 * A2A Pipeline Runner — executes the incident pipeline in sequence:
 *   DispatcherAgent → TelemetryAnalystAgent → DiagnosticsOrchestratorAgent → ReportWriterAgent
 *
 * Every step transition is logged at info level with `runId` + `stepId` for
 * audit tracing. When a step reports an upstream failure (`queryError`,
 * `sessionError`) an additional warn-level entry is emitted, so failures are
 * visible to level-based alerting and not only inside an info payload.
 *
 * @param brief - The incident to investigate.
 * @param opts - Credentials, correlation ids and the logger.
 * @returns The final report; `runId` matches the one in every log entry.
 */
export async function runIncidentPipeline(
  brief: SupportIncidentBrief,
  opts: PipelineOptions
): Promise<FinalIncidentReport> {
  const runId = `run_${randomUUID()}`;
  const { log } = opts;

  log.info(
    { runId, productId: brief.productId, a2aStep: 'pipeline.start' },
    'A2A pipeline started'
  );

  // ── Step 1: Dispatcher ──────────────────────────────────────────────────
  const decision = dispatch(brief, {
    runId,
    requestId: opts.requestId,
    initiatedBy: opts.initiatedBy,
  });

  log.info(
    {
      runId,
      stepId: decision.runContext.stepId,
      a2aStep: 'dispatcher.complete',
      steps: decision.steps,
      resolvedTimeWindow: decision.resolvedTimeWindow,
    },
    'Dispatcher decided pipeline steps'
  );

  // ── Step 2: Telemetry Analyst ───────────────────────────────────────────
  const findings = await analyze(decision, { token: opts.token });

  log.info(
    {
      runId,
      stepId: findings.runContext.stepId,
      a2aStep: 'telemetry_analyst.complete',
      clusterCount: findings.clusters.length,
      hypothesisCount: findings.topHypotheses.length,
      queryError: findings.queryError,
    },
    'Telemetry analyst completed'
  );

  if (findings.queryError) {
    log.warn(
      {
        runId,
        stepId: findings.runContext.stepId,
        a2aStep: 'telemetry_analyst.query_failed',
        queryError: findings.queryError,
      },
      'Telemetry query failed — report will contain no cluster data'
    );
  }

  // ── Step 3: Diagnostics Orchestrator (conditional) ─────────────────────
  let diagResult: Awaited<ReturnType<typeof orchestrate>> | null = null;
  if (decision.steps.includes('diagnostics_orchestrator')) {
    diagResult = await orchestrate(decision, findings, { token: opts.token });

    log.info(
      {
        runId,
        stepId: diagResult.runContext.stepId,
        a2aStep: 'diagnostics_orchestrator.complete',
        skipped: diagResult.skipped,
        sessionId: diagResult.session?.id,
        sessionError: diagResult.sessionError,
      },
      'Diagnostics orchestrator completed'
    );

    if (diagResult.sessionError) {
      log.warn(
        {
          runId,
          stepId: diagResult.runContext.stepId,
          a2aStep: 'diagnostics_orchestrator.session_failed',
          sessionError: diagResult.sessionError,
        },
        'Diagnostics session could not be created'
      );
    }
  }

  // ── Step 4: Report Writer ───────────────────────────────────────────────
  const report = writeReport(decision, findings, diagResult);

  log.info(
    {
      runId,
      a2aStep: 'pipeline.complete',
      productId: brief.productId,
      clusterCount: report.clusterCount,
      hasSession: !!report.diagnosticsSession,
    },
    'A2A pipeline completed'
  );

  return report;
}
// ── Pipeline run identity ──────────────────────────────────────────────────

/** Identity attached to every handoff artifact; `stepId` is replaced per step, the rest is carried through. */
export const RunContextSchema = z.object({
  runId: z.string().describe('Stable ID for the entire pipeline run'),
  stepId: z.string().describe('ID for this individual step'),
  productId: z.string().min(1),
  requestId: z.string().optional(),
  initiatedBy: z.string().optional().describe('userId who triggered the pipeline'),
});
export type RunContext = z.infer<typeof RunContextSchema>;

// ── A) SupportIncidentBrief — input to DispatcherAgent ───────────────────

/** What the reporter told us. Only `summary` is mandatory. */
export const UserReportSchema = z.object({
  summary: z.string().min(1).describe('Human-readable description of the issue'),
  platform: z.string().optional().describe('e.g. ios, android, web'),
  channel: z.string().optional().describe('e.g. keyboard_extension, dictation'),
  appVersion: z.string().optional(),
  buildNumber: z.string().optional(),
  userId: z.string().optional(),
  anonymousInstallId: z.string().optional(),
});
export type UserReport = z.infer<typeof UserReportSchema>;

/**
 * Pipeline input. `timeWindow` defaults to `{}` and `openDiagnosticsSession`
 * to `false`, so both are always present on the parsed (inferred) type.
 */
export const SupportIncidentBriefSchema = z.object({
  productId: z.string().min(1),
  userReport: UserReportSchema,
  timeWindow: z
    .object({
      from: z.string().datetime().optional(),
      to: z.string().datetime().optional(),
    })
    .default({}),
  openDiagnosticsSession: z
    .boolean()
    .default(false)
    .describe('If true and userId/anonymousInstallId present, open a live diagnostics session'),
});
export type SupportIncidentBrief = z.infer<typeof SupportIncidentBriefSchema>;

// ── B) DispatchDecision — output of DispatcherAgent ───────────────────────

export interface DispatchDecision {
  runContext: RunContext;
  brief: SupportIncidentBrief;
  /** Agents to run, in execution order. */
  steps: ('telemetry_analyst' | 'diagnostics_orchestrator' | 'report_writer')[];
  /** Resolved time window (fills defaults if omitted in brief) */
  resolvedTimeWindow: { from: string; to: string };
}

// ── C) TelemetryFindings — output of TelemetryAnalystAgent ───────────────

/** Normalised reference to one telemetry error cluster. */
export interface ClusterRef {
  clusterId: string;
  pk: string;
  fingerprint: string;
  severity: string;
  totalCount: number;
  lastSeenAt: string;
  platform: string;
  module: string;
  sampleMessage?: string;
}

export interface TelemetryFindings {
  runContext: RunContext;
  clusters: ClusterRef[];
  topHypotheses: string[];
  recommendedActions: string[];
  /** Set when the telemetry query itself failed. */
  queryError?: string;
}

// ── D) DiagnosticsSessionResult — output of DiagnosticsOrchestratorAgent ─

export interface DiagnosticsSessionResult {
  runContext: RunContext;
  /** True when no session creation was attempted; see `skipReason`. */
  skipped: boolean;
  skipReason?: string;
  /** Present when a session was created. */
  session?: {
    id: string;
    status: string;
    expiresAt: string;
    collectionLevel: string;
    targetUserId?: string;
    targetAnonymousId?: string;
  };
  /** Present when session creation was attempted and failed. */
  sessionError?: string;
}

// ── E) FinalIncidentReport — output of ReportWriterAgent ─────────────────

export interface FinalIncidentReport {
  runId: string;
  productId: string;
  generatedAt: string;
  summary: string;
  clusterCount: number;
  clusters: ClusterRef[];
  topHypotheses: string[];
  recommendedActions: string[];
  diagnosticsSession: DiagnosticsSessionResult['session'] | null;
  markdownReport: string;
}
/**
 * Minimal structured logger — matches Fastify's pino-based logger shape.
 *
 * Each method takes a structured payload first and an optional human-readable
 * message second, so tool modules can log without depending on Fastify types.
 */
export interface McpLogger {
  info(obj: Record<string, unknown>, msg?: string): void;
  warn(obj: Record<string, unknown>, msg?: string): void;
  error(obj: Record<string, unknown>, msg?: string): void;
  debug(obj: Record<string, unknown>, msg?: string): void;
}