diff --git a/services/mcp-server/src/lib/platform-client.ts b/services/mcp-server/src/lib/platform-client.ts index 122b7ac6..9246a993 100644 --- a/services/mcp-server/src/lib/platform-client.ts +++ b/services/mcp-server/src/lib/platform-client.ts @@ -247,6 +247,53 @@ export async function runStepsUpdate( ); } +// ── AI Budgets ─────────────────────────────────────────────────────────────── + +export async function aiBudgetsRecordSpend( + body: { + scopeType: 'product' | 'agent'; + scopeId: string; + policyId?: string; + agentId?: string; + agentVersionId?: string; + runId?: string; + evaluationRunId?: string; + model?: string; + tokensUsed?: number; + costUsd: number; + source?: string; + }, + opts: PlatformClientOptions +): Promise { + return platformFetch( + '/api/ai-budgets/spend', + { method: 'POST', body: JSON.stringify(body) }, + opts + ); +} + +// ── Support Cases ──────────────────────────────────────────────────────────── + +export async function supportCasesCreate( + body: { + orgId?: string; + workspaceId?: string; + requesterUserId?: string; + assignedTo?: string; + title: string; + description?: string; + priority?: 'critical' | 'high' | 'medium' | 'low'; + source?: 'manual' | 'agent' | 'telemetry' | 'customer'; + runId?: string; + reviewId?: string; + knowledgeBaseId?: string; + tags?: string[]; + }, + opts: PlatformClientOptions +): Promise { + return platformFetch('/api/support/cases', { method: 'POST', body: JSON.stringify(body) }, opts); +} + // ── Diagnostics ─────────────────────────────────────────────────────────────── export interface DebugSession { diff --git a/services/mcp-server/src/modules/a2a/governance-integration.test.ts b/services/mcp-server/src/modules/a2a/governance-integration.test.ts new file mode 100644 index 00000000..d89bcea8 --- /dev/null +++ b/services/mcp-server/src/modules/a2a/governance-integration.test.ts @@ -0,0 +1,153 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest'; +import type { McpLogger, McpToolRequest } from '../tools/types.js'; + +process.env.JWT_SECRET ??= 'test-secret'; +process.env.PLATFORM_SERVICE_URL ??= 'http://localhost:4003'; +process.env.EXTRACTION_SERVICE_URL ??= 'http://localhost:4005'; + +vi.mock('../../lib/platform-client.js', () => ({ + aiBudgetsRecordSpend: vi.fn(), + supportCasesCreate: vi.fn(), + runsCreate: vi.fn(), + runsUpdate: vi.fn(), + runStepsCreate: vi.fn(), + runStepsUpdate: vi.fn(), +})); + +vi.mock('../../lib/extraction-client.js', () => ({ + extractionRun: vi.fn(), +})); + +vi.mock('../../lib/jarvis-client.js', () => ({ + jarvisMarketplaceGetListing: vi.fn(), + jarvisMarketplaceCertify: vi.fn(), +})); + +vi.mock('../../lib/lysnrai-client.js', () => ({ + lysnraiTranscriptsList: vi.fn(), + lysnraiTranscriptRunExtraction: vi.fn(), +})); + +import { + aiBudgetsRecordSpend, + supportCasesCreate, + runsCreate, + runsUpdate, + runStepsCreate, + runStepsUpdate, +} from '../../lib/platform-client.js'; +import { extractionRun } from '../../lib/extraction-client.js'; +import { jarvisMarketplaceCertify, jarvisMarketplaceGetListing } from '../../lib/jarvis-client.js'; +import { + lysnraiTranscriptRunExtraction, + lysnraiTranscriptsList, +} from '../../lib/lysnrai-client.js'; + +const log: McpLogger = { + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), +}; + +function buildReq(id = 'req_1'): McpToolRequest { + return { + id, + headers: { authorization: 'Bearer jwt_1' }, + log, + jwtPayload: { sub: 'admin_1', role: 'admin', productId: 'lysnrai' }, + } as unknown as McpToolRequest; +} + +beforeEach(() => { + vi.clearAllMocks(); + vi.mocked(aiBudgetsRecordSpend).mockResolvedValue({} as never); + vi.mocked(supportCasesCreate).mockResolvedValue({} as never); + vi.mocked(runsCreate).mockResolvedValue({} as never); + vi.mocked(runsUpdate).mockResolvedValue({} as never); + vi.mocked(runStepsCreate).mockResolvedValue({} as never); + vi.mocked(runStepsUpdate).mockResolvedValue({} as never); +}); + +describe('A2A governance integration', () => { + it('records budget spend and creates support case for NL parser regressions', async () => { + const { runNLParserEvalPipeline } = await import('./nl-parser-eval-pipeline.js'); + vi.mocked(extractionRun).mockResolvedValue({ + extractions: [], + metadata: { modelId: 'gemini-2.5-flash', taskId: 'timer-parse' }, + }); + + const report = await runNLParserEvalPipeline([], buildReq('req_eval')); + + expect(report.regressions.length).toBeGreaterThan(0); + expect(aiBudgetsRecordSpend).toHaveBeenCalledOnce(); + expect(supportCasesCreate).toHaveBeenCalledWith( + expect.objectContaining({ + title: 'NL parser regression detected', + }), + expect.objectContaining({ productId: 'chronomind' }) + ); + }); + + it('creates support case for marketplace certification needing human review', async () => { + const { runMarketplaceCertPipeline } = await import('./marketplace-cert-pipeline.js'); + vi.mocked(jarvisMarketplaceGetListing).mockResolvedValue({ + id: 'listing_1', + name: 'Coach', + systemPrompt: 'safe prompt', + coachingFramework: 'growth', + category: 'productivity', + tags: ['coach', 'habit'], + voiceId: 'voice_1', + description: 'Helpful description for the listing.', + } as never); + vi.mocked(extractionRun).mockResolvedValue({ + extractions: [ + { + extraction_class: 'controversial', + extraction_text: 'borderline copy', + }, + ], + metadata: { modelId: 'gemini-2.5-flash', taskId: 'triage' }, + }); + vi.mocked(jarvisMarketplaceCertify).mockResolvedValue({} as never); + + const result = await runMarketplaceCertPipeline('listing_1', { + token: 'jwt_1', + requestId: 'req_market', + }); + + expect(result.decision).toBe('pending_human_review'); + expect(aiBudgetsRecordSpend).toHaveBeenCalledOnce(); + expect(supportCasesCreate).toHaveBeenCalledWith( + expect.objectContaining({ + title: expect.stringContaining('Marketplace certification pending_human_review'), + }), + expect.objectContaining({ productId: 'jarvisjr' }) + ); + }); + + it('records spend and opens support case when transcript extraction reports failures', async () => { + const { runTranscriptExtractionPipeline } = await import('./transcript-extraction-pipeline.js'); + vi.mocked(lysnraiTranscriptsList).mockResolvedValue({ + transcripts: [ + { id: 't1', extractedAt: null }, + { id: 't2', extractedAt: null }, + ], + } as never); + vi.mocked(lysnraiTranscriptRunExtraction) + .mockResolvedValueOnce({} as never) + .mockRejectedValueOnce(new Error('boom')); + + const report = await runTranscriptExtractionPipeline(10, false, buildReq('req_tx')); + + expect(report.failed).toBe(1); + expect(aiBudgetsRecordSpend).toHaveBeenCalledOnce(); + expect(supportCasesCreate).toHaveBeenCalledWith( + expect.objectContaining({ + title: 'Transcript extraction pipeline reported failures', + }), + expect.objectContaining({ productId: 'lysnrai' }) + ); + }); +}); diff --git a/services/mcp-server/src/modules/a2a/governance.ts b/services/mcp-server/src/modules/a2a/governance.ts new file mode 100644 index 00000000..62b9a254 --- /dev/null +++ b/services/mcp-server/src/modules/a2a/governance.ts @@ -0,0 +1,87 @@ +import { aiBudgetsRecordSpend, supportCasesCreate } from '../../lib/platform-client.js'; +import type { PlatformClientOptions } from '../../lib/platform-client.js'; + +interface BudgetSpendInput { + productId: string; + runId: string; + source: string; + scopeType?: 'product' | 'agent'; + scopeId?: string; + agentId?: string; + agentVersionId?: string; + evaluationRunId?: string; + model?: string; + tokensUsed?: number; + costUsd: number; +} + +interface SupportCaseInput { + productId: string; + runId: string; + title: string; + description: string; + priority?: 'critical' | 'high' | 'medium' | 'low'; + source?: 'manual' | 'agent' | 'telemetry' | 'customer'; + requesterUserId?: string; + tags?: string[]; +} + +export async function recordBudgetSpend( + input: BudgetSpendInput, + opts: PlatformClientOptions +): Promise { + if (input.costUsd <= 0) return; + await safeGovernanceCall(() => + aiBudgetsRecordSpend( + { + scopeType: input.scopeType ?? 'product', + scopeId: input.scopeId ?? input.productId, + agentId: input.agentId, + agentVersionId: input.agentVersionId, + runId: input.runId, + evaluationRunId: input.evaluationRunId, + model: input.model, + tokensUsed: input.tokensUsed ?? 0, + costUsd: input.costUsd, + source: input.source, + }, + withProduct(opts, input.productId) + ) + ); +} + +export async function createSupportCaseForRun( + input: SupportCaseInput, + opts: PlatformClientOptions +): Promise { + await safeGovernanceCall(() => + supportCasesCreate( + { + requesterUserId: input.requesterUserId, + title: input.title, + description: input.description, + priority: input.priority ?? 'high', + source: input.source ?? 'agent', + runId: input.runId, + tags: input.tags ?? [], + }, + withProduct(opts, input.productId) + ) + ); +} + +function withProduct(opts: PlatformClientOptions, productId: string): PlatformClientOptions { + return { + token: opts.token, + requestId: opts.requestId, + productId, + }; +} + +async function safeGovernanceCall(fn: () => Promise): Promise { + try { + await fn(); + } catch { + // Governance hooks must not fail the pipeline itself. + } +} diff --git a/services/mcp-server/src/modules/a2a/marketplace-cert-pipeline.ts b/services/mcp-server/src/modules/a2a/marketplace-cert-pipeline.ts index fcbadca1..e440c72b 100644 --- a/services/mcp-server/src/modules/a2a/marketplace-cert-pipeline.ts +++ b/services/mcp-server/src/modules/a2a/marketplace-cert-pipeline.ts @@ -17,6 +17,15 @@ import { registerTool } from '../tools/registry.js'; import type { McpToolRequest } from '../tools/types.js'; import { jarvisMarketplaceGetListing, jarvisMarketplaceCertify } from '../../lib/jarvis-client.js'; import { extractionRun } from '../../lib/extraction-client.js'; +import { + trackRunCompleted, + trackRunFailed, + trackRunStarted, + trackStepCompleted, + trackStepFailed, + trackStepStarted, +} from './run-tracker.js'; +import { createSupportCaseForRun, recordBudgetSpend } from './governance.js'; // ── Types ───────────────────────────────────────────────────────────────────── @@ -270,34 +279,210 @@ export async function runMarketplaceCertPipeline( ): Promise { const runId = `run_cert_${randomUUID().slice(0, 8)}`; const certifiedAt = new Date().toISOString(); + let currentStep: + | { stepName: 'ingest' | 'safety' | 'quality' | 'decision'; order: number } + | undefined; - // Step 1: ingest - let ingestion: IngestionResult; - try { - ingestion = await ingestSubmission(listingId, opts); - } catch (err) { - const msg = err instanceof Error ? err.message : String(err); - return { + await safeTrack(() => + trackRunStarted({ runId, - listingId, - certifiedAt, - decision: 'pending_human_review', - isVerified: false, - ingestion: { valid: false, errors: [`Failed to fetch listing: ${msg}`], listing: {} }, - safety: { verdict: 'pass', reasons: [], confidenceScore: 0 }, - quality: { score: 0, passed: false, coherenceIssues: [] }, - rejectionReasons: [`Failed to fetch listing: ${msg}`], - }; - } + productId: 'jarvisjr', + name: 'marketplace-certification', + requestId: opts.requestId, + token: opts.token, + input: { listingId }, + }) + ); - // Step 2: safety check - const safety = await checkContentSafety(listingId, ingestion.listing, { - requestId: opts.requestId, - }); + try { + currentStep = { stepName: 'ingest', order: 1 }; + const ingestStep = currentStep; + await safeTrack(() => + trackStepStarted({ + runId, + productId: 'jarvisjr', + stepName: ingestStep.stepName, + order: ingestStep.order, + token: opts.token, + requestId: opts.requestId, + input: { listingId }, + }) + ); + let ingestion: IngestionResult; + try { + ingestion = await ingestSubmission(listingId, opts); + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + const result = { + runId, + listingId, + certifiedAt, + decision: 'pending_human_review' as const, + isVerified: false, + ingestion: { valid: false, errors: [`Failed to fetch listing: ${msg}`], listing: {} }, + safety: { verdict: 'pass' as const, reasons: [], confidenceScore: 0 }, + quality: { score: 0, passed: false, coherenceIssues: [] }, + rejectionReasons: [`Failed to fetch listing: ${msg}`], + }; + await createSupportCaseForRun( + { + productId: 'jarvisjr', + runId, + title: `Marketplace certification fetch failed for ${listingId}`, + description: msg, + priority: 'high', + tags: ['a2a', 'marketplace', 'certification'], + }, + opts + ); + await safeTrack(() => + trackRunCompleted({ + runId, + productId: 'jarvisjr', + name: 'marketplace-certification', + requestId: opts.requestId, + token: opts.token, + output: { decision: result.decision, isVerified: result.isVerified }, + }) + ); + return result; + } + await safeTrack(() => + trackStepCompleted({ + runId, + productId: 'jarvisjr', + stepName: ingestStep.stepName, + order: ingestStep.order, + token: opts.token, + requestId: opts.requestId, + output: { valid: ingestion.valid, errorCount: ingestion.errors.length }, + }) + ); - // Halt on reject (no need for quality eval) - if (safety.verdict === 'reject' || !ingestion.valid) { - const quality: QualityResult = { score: 0, passed: false, coherenceIssues: [] }; + currentStep = { stepName: 'safety', order: 2 }; + const safetyStep = currentStep; + await safeTrack(() => + trackStepStarted({ + runId, + productId: 'jarvisjr', + stepName: safetyStep.stepName, + order: safetyStep.order, + token: opts.token, + requestId: opts.requestId, + }) + ); + const safety = await checkContentSafety(listingId, ingestion.listing, { + requestId: opts.requestId, + }); + await recordBudgetSpend( + { + productId: 'jarvisjr', + runId, + source: 'a2a.marketplace_certification', + costUsd: 0.002, + tokensUsed: Math.ceil( + `${String(ingestion.listing['systemPrompt'] ?? '')}${String(ingestion.listing['description'] ?? '')}` + .length / 4 + ), + }, + opts + ); + await safeTrack(() => + trackStepCompleted({ + runId, + productId: 'jarvisjr', + stepName: safetyStep.stepName, + order: safetyStep.order, + token: opts.token, + requestId: opts.requestId, + output: { verdict: safety.verdict, confidenceScore: safety.confidenceScore }, + }) + ); + + if (safety.verdict === 'reject' || !ingestion.valid) { + currentStep = { stepName: 'decision', order: 4 }; + const quality: QualityResult = { score: 0, passed: false, coherenceIssues: [] }; + const { decision, isVerified, rejectionReasons } = await makeCertDecision( + listingId, + ingestion, + safety, + quality, + opts + ); + const result = { + runId, + listingId, + certifiedAt, + decision, + isVerified, + ingestion, + safety, + quality, + rejectionReasons, + }; + if (decision !== 'approved') { + await createSupportCaseForRun( + { + productId: 'jarvisjr', + runId, + title: `Marketplace certification ${decision} for ${listingId}`, + description: rejectionReasons.join('; ') || 'Marketplace certification needs attention', + priority: decision === 'rejected' ? 'high' : 'medium', + tags: ['a2a', 'marketplace', decision], + }, + opts + ); + } + await safeTrack(() => + trackRunCompleted({ + runId, + productId: 'jarvisjr', + name: 'marketplace-certification', + requestId: opts.requestId, + token: opts.token, + output: { decision, isVerified }, + }) + ); + return result; + } + + currentStep = { stepName: 'quality', order: 3 }; + const qualityStep = currentStep; + await safeTrack(() => + trackStepStarted({ + runId, + productId: 'jarvisjr', + stepName: qualityStep.stepName, + order: qualityStep.order, + token: opts.token, + requestId: opts.requestId, + }) + ); + const quality = evaluateQuality(ingestion.listing); + await safeTrack(() => + trackStepCompleted({ + runId, + productId: 'jarvisjr', + stepName: qualityStep.stepName, + order: qualityStep.order, + token: opts.token, + requestId: opts.requestId, + output: { score: quality.score, passed: quality.passed }, + }) + ); + + currentStep = { stepName: 'decision', order: 4 }; + const decisionStep = currentStep; + await safeTrack(() => + trackStepStarted({ + runId, + productId: 'jarvisjr', + stepName: decisionStep.stepName, + order: decisionStep.order, + token: opts.token, + requestId: opts.requestId, + }) + ); const { decision, isVerified, rejectionReasons } = await makeCertDecision( listingId, ingestion, @@ -305,7 +490,19 @@ export async function runMarketplaceCertPipeline( quality, opts ); - return { + await safeTrack(() => + trackStepCompleted({ + runId, + productId: 'jarvisjr', + stepName: decisionStep.stepName, + order: decisionStep.order, + token: opts.token, + requestId: opts.requestId, + output: { decision, isVerified }, + }) + ); + + const result = { runId, listingId, certifiedAt, @@ -316,31 +513,66 @@ export async function runMarketplaceCertPipeline( quality, rejectionReasons, }; + if (decision !== 'approved') { + await createSupportCaseForRun( + { + productId: 'jarvisjr', + runId, + title: `Marketplace certification ${decision} for ${listingId}`, + description: rejectionReasons.join('; ') || 'Marketplace certification needs attention', + priority: decision === 'rejected' ? 'high' : 'medium', + tags: ['a2a', 'marketplace', decision], + }, + opts + ); + } + await safeTrack(() => + trackRunCompleted({ + runId, + productId: 'jarvisjr', + name: 'marketplace-certification', + requestId: opts.requestId, + token: opts.token, + output: { decision, isVerified }, + }) + ); + return result; + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + if (currentStep) { + const failedStep = currentStep; + await safeTrack(() => + trackStepFailed({ + runId, + productId: 'jarvisjr', + stepName: failedStep.stepName, + order: failedStep.order, + token: opts.token, + requestId: opts.requestId, + error: message, + }) + ); + } + await safeTrack(() => + trackRunFailed({ + runId, + productId: 'jarvisjr', + name: 'marketplace-certification', + requestId: opts.requestId, + token: opts.token, + error: message, + }) + ); + throw error; } +} - // Step 3: quality eval (heuristic) - const quality = evaluateQuality(ingestion.listing); - - // Step 4: decision + backend update - const { decision, isVerified, rejectionReasons } = await makeCertDecision( - listingId, - ingestion, - safety, - quality, - opts - ); - - return { - runId, - listingId, - certifiedAt, - decision, - isVerified, - ingestion, - safety, - quality, - rejectionReasons, - }; +async function safeTrack(fn: () => Promise): Promise { + try { + await fn(); + } catch { + // Tracking must never fail the pipeline itself. + } } // ── MCP tool: jarvis.marketplace.runCertification ──────────────────────────── diff --git a/services/mcp-server/src/modules/a2a/nl-parser-eval-pipeline.ts b/services/mcp-server/src/modules/a2a/nl-parser-eval-pipeline.ts index e8d8ecbf..a71d9a39 100644 --- a/services/mcp-server/src/modules/a2a/nl-parser-eval-pipeline.ts +++ b/services/mcp-server/src/modules/a2a/nl-parser-eval-pipeline.ts @@ -15,6 +15,15 @@ import { z } from 'zod'; import { registerTool } from '../tools/registry.js'; import type { McpToolRequest } from '../tools/types.js'; import { extractionRun } from '../../lib/extraction-client.js'; +import { + trackRunCompleted, + trackRunFailed, + trackRunStarted, + trackStepCompleted, + trackStepFailed, + trackStepStarted, +} from './run-tracker.js'; +import { createSupportCaseForRun, recordBudgetSpend } from './governance.js'; // ── Canonical test suite ────────────────────────────────────────────────────── @@ -211,40 +220,203 @@ function buildEvalReport(runId: string, results: ParseEvalResult[]): NLParserEva // ── Pipeline runner ──────────────────────────────────────────────────────────── -async function runNLParserEvalPipeline( +export async function runNLParserEvalPipeline( customPhrases: string[], req: McpToolRequest ): Promise { const runId = randomUUID(); const opts = { token: req.headers.authorization?.slice(7), requestId: req.id }; + let currentStep: { stepName: 'sample' | 'eval' | 'report'; order: number } | undefined; - req.log.info( - { runId, stepId: 'sample', customCount: customPhrases.length }, - 'PhraseSamplerAgent start' - ); - const testCases = assemblePhrases(customPhrases); - req.log.info( - { runId, stepId: 'sample', totalCases: testCases.length }, - 'PhraseSamplerAgent done' + await safeTrack(() => + trackRunStarted({ + runId, + productId: 'chronomind', + name: 'nl-parser-eval', + requestId: req.id, + token: opts.token, + input: { customPhraseCount: customPhrases.length }, + }) ); - req.log.info({ runId, stepId: 'eval', totalCases: testCases.length }, 'ParseEvalAgent start'); - const results: ParseEvalResult[] = []; - for (const testCase of testCases) { - const result = await evalPhrase(testCase, opts); - results.push(result); + try { + currentStep = { stepName: 'sample', order: 1 }; + const sampleStep = currentStep; + await safeTrack(() => + trackStepStarted({ + runId, + productId: 'chronomind', + stepName: sampleStep.stepName, + order: sampleStep.order, + token: opts.token, + requestId: req.id, + input: { customPhraseCount: customPhrases.length }, + }) + ); + + req.log.info( + { runId, stepId: 'sample', customCount: customPhrases.length }, + 'PhraseSamplerAgent start' + ); + const testCases = assemblePhrases(customPhrases); + await safeTrack(() => + trackStepCompleted({ + runId, + productId: 'chronomind', + stepName: sampleStep.stepName, + order: sampleStep.order, + token: opts.token, + requestId: req.id, + output: { totalCases: testCases.length }, + }) + ); + req.log.info( + { runId, stepId: 'sample', totalCases: testCases.length }, + 'PhraseSamplerAgent done' + ); + + currentStep = { stepName: 'eval', order: 2 }; + const evalStep = currentStep; + await safeTrack(() => + trackStepStarted({ + runId, + productId: 'chronomind', + stepName: evalStep.stepName, + order: evalStep.order, + token: opts.token, + requestId: req.id, + input: { totalCases: testCases.length }, + }) + ); + req.log.info({ runId, stepId: 'eval', totalCases: testCases.length }, 'ParseEvalAgent start'); + const results: ParseEvalResult[] = []; + for (const testCase of testCases) { + const result = await evalPhrase(testCase, opts); + results.push(result); + } + const passed = results.filter(r => r.outcome === 'pass').length; + const estimatedTokens = testCases.reduce( + (sum, item) => sum + Math.ceil(item.phrase.length / 4), + 0 + ); + await safeTrack(() => + trackStepCompleted({ + runId, + productId: 'chronomind', + stepName: evalStep.stepName, + order: evalStep.order, + token: opts.token, + requestId: req.id, + output: { passed, total: results.length }, + }) + ); + await recordBudgetSpend( + { + productId: 'chronomind', + runId, + source: 'a2a.nl_parser_eval', + costUsd: Number((estimatedTokens * 0.000002).toFixed(6)), + tokensUsed: estimatedTokens, + }, + opts + ); + req.log.info({ runId, stepId: 'eval', passed, total: results.length }, 'ParseEvalAgent done'); + + currentStep = { stepName: 'report', order: 3 }; + const reportStep = currentStep; + await safeTrack(() => + trackStepStarted({ + runId, + productId: 'chronomind', + stepName: reportStep.stepName, + order: reportStep.order, + token: opts.token, + requestId: req.id, + }) + ); + req.log.info({ runId, stepId: 'report' }, 'RegressionReportAgent start'); + const report = buildEvalReport(runId, results); + await safeTrack(() => + trackStepCompleted({ + runId, + productId: 'chronomind', + stepName: reportStep.stepName, + order: reportStep.order, + token: opts.token, + requestId: req.id, + output: { passRate: report.passRate, regressions: report.regressions.length }, + }) + ); + await safeTrack(() => + trackRunCompleted({ + runId, + productId: 'chronomind', + name: 'nl-parser-eval', + requestId: req.id, + token: opts.token, + output: { passRate: report.passRate, regressions: report.regressions.length }, + }) + ); + if (report.regressions.length > 0 || report.passRate < 0.8) { + await createSupportCaseForRun( + { + productId: 'chronomind', + runId, + title: 'NL parser regression detected', + description: report.summary, + priority: report.passRate < 0.5 ? 'critical' : 'high', + tags: ['a2a', 'nl-parser', 'evaluation'], + }, + opts + ); + } + req.log.info( + { + runId, + stepId: 'report', + passRate: report.passRate, + regressions: report.regressions.length, + }, + 'RegressionReportAgent done' + ); + + return report; + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + if (currentStep) { + const failedStep = currentStep; + await safeTrack(() => + trackStepFailed({ + runId, + productId: 'chronomind', + stepName: failedStep.stepName, + order: failedStep.order, + token: opts.token, + requestId: req.id, + error: message, + }) + ); + } + await safeTrack(() => + trackRunFailed({ + runId, + productId: 'chronomind', + name: 'nl-parser-eval', + requestId: req.id, + token: opts.token, + error: message, + }) + ); + throw error; } - const passed = results.filter(r => r.outcome === 'pass').length; - req.log.info({ runId, stepId: 'eval', passed, total: results.length }, 'ParseEvalAgent done'); +} - req.log.info({ runId, stepId: 'report' }, 'RegressionReportAgent start'); - const report = buildEvalReport(runId, results); - req.log.info( - { runId, stepId: 'report', passRate: report.passRate, regressions: report.regressions.length }, - 'RegressionReportAgent done' - ); - - return report; +async function safeTrack(fn: () => Promise): Promise { + try { + await fn(); + } catch { + // Tracking must never fail the pipeline itself. + } } // ── MCP tool registration ───────────────────────────────────────────────────── diff --git a/services/mcp-server/src/modules/a2a/runner.ts b/services/mcp-server/src/modules/a2a/runner.ts index 655c7da8..0e4ea15d 100644 --- a/services/mcp-server/src/modules/a2a/runner.ts +++ b/services/mcp-server/src/modules/a2a/runner.ts @@ -9,7 +9,11 @@ import { randomUUID } from 'node:crypto'; import type { McpLogger } from '../tools/types.js'; -import type { SupportIncidentBrief, FinalIncidentReport } from './types.js'; +import type { + DiagnosticsSessionResult, + FinalIncidentReport, + SupportIncidentBrief, +} from './types.js'; import { dispatch } from './agents/dispatcher.js'; import { analyze } from './agents/telemetry-analyst.js'; import { orchestrate } from './agents/diagnostics-orchestrator.js'; @@ -66,12 +70,13 @@ export async function runIncidentPipeline( try { // ── Step 1: Dispatcher ──────────────────────────────────────────────── currentStep = { stepName: 'dispatcher', order: 1 }; + const dispatcherStep = currentStep; await safeTrack(() => trackStepStarted({ runId, productId: brief.productId, - stepName: currentStep.stepName, - order: currentStep.order, + stepName: dispatcherStep.stepName, + order: dispatcherStep.order, token: opts.token, requestId: opts.requestId, input: { @@ -90,8 +95,8 @@ export async function runIncidentPipeline( trackStepCompleted({ runId, productId: brief.productId, - stepName: currentStep.stepName, - order: currentStep.order, + stepName: dispatcherStep.stepName, + order: dispatcherStep.order, token: opts.token, requestId: opts.requestId, output: { @@ -114,12 +119,13 @@ export async function runIncidentPipeline( // ── Step 2: Telemetry Analyst ───────────────────────────────────────── currentStep = { stepName: 'telemetry_analyst', order: 2 }; + const analystStep = currentStep; await safeTrack(() => trackStepStarted({ runId, productId: brief.productId, - stepName: currentStep.stepName, - order: currentStep.order, + stepName: analystStep.stepName, + order: analystStep.order, token: opts.token, requestId: opts.requestId, }) @@ -131,8 +137,8 @@ export async function runIncidentPipeline( trackStepCompleted({ runId, productId: brief.productId, - stepName: currentStep.stepName, - order: currentStep.order, + stepName: analystStep.stepName, + order: analystStep.order, token: opts.token, requestId: opts.requestId, output: { @@ -156,34 +162,36 @@ export async function runIncidentPipeline( ); // ── Step 3: Diagnostics Orchestrator (conditional) ─────────────────── - let diagResult = null; + let diagResult: DiagnosticsSessionResult | null = null; if (decision.steps.includes('diagnostics_orchestrator')) { currentStep = { stepName: 'diagnostics_orchestrator', order: 3 }; + const diagnosticsStep = currentStep; await safeTrack(() => trackStepStarted({ runId, productId: brief.productId, - stepName: currentStep.stepName, - order: currentStep.order, + stepName: diagnosticsStep.stepName, + order: diagnosticsStep.order, token: opts.token, requestId: opts.requestId, }) ); - diagResult = await orchestrate(decision, findings, { token: opts.token }); + const diagnosticsResult = await orchestrate(decision, findings, { token: opts.token }); + diagResult = diagnosticsResult; await safeTrack(() => trackStepCompleted({ runId, productId: brief.productId, - stepName: currentStep.stepName, - order: currentStep.order, + stepName: diagnosticsStep.stepName, + order: diagnosticsStep.order, token: opts.token, requestId: opts.requestId, output: { - skipped: diagResult.skipped, - sessionId: diagResult.session?.id, - sessionError: diagResult.sessionError, + skipped: diagnosticsResult.skipped, + sessionId: diagnosticsResult.session?.id, + sessionError: diagnosticsResult.sessionError, }, }) ); @@ -191,11 +199,11 @@ export async function runIncidentPipeline( log.info( { runId, - stepId: diagResult.runContext.stepId, + stepId: diagnosticsResult.runContext.stepId, a2aStep: 'diagnostics_orchestrator.complete', - skipped: diagResult.skipped, - sessionId: diagResult.session?.id, - sessionError: diagResult.sessionError, + skipped: diagnosticsResult.skipped, + sessionId: diagnosticsResult.session?.id, + sessionError: diagnosticsResult.sessionError, }, 'Diagnostics orchestrator completed' ); @@ -203,12 +211,13 @@ export async function runIncidentPipeline( // ── Step 4: Report Writer ───────────────────────────────────────────── currentStep = { stepName: 'report_writer', order: 4 }; + const reportStep = currentStep; await safeTrack(() => trackStepStarted({ runId, productId: brief.productId, - stepName: currentStep.stepName, - order: currentStep.order, + stepName: reportStep.stepName, + order: reportStep.order, token: opts.token, requestId: opts.requestId, }) @@ -220,8 +229,8 @@ export async function runIncidentPipeline( trackStepCompleted({ runId, productId: brief.productId, - stepName: currentStep.stepName, - order: currentStep.order, + stepName: reportStep.stepName, + order: reportStep.order, token: opts.token, requestId: opts.requestId, output: { @@ -261,12 +270,13 @@ export async function runIncidentPipeline( } catch (error) { const message = error instanceof Error ? error.message : String(error); if (currentStep) { + const failedStep = currentStep; await safeTrack(() => trackStepFailed({ runId, productId: brief.productId, - stepName: currentStep.stepName, - order: currentStep.order, + stepName: failedStep.stepName, + order: failedStep.order, token: opts.token, requestId: opts.requestId, error: message, diff --git a/services/mcp-server/src/modules/a2a/transcript-extraction-pipeline.ts b/services/mcp-server/src/modules/a2a/transcript-extraction-pipeline.ts index ffb74432..007e7aa7 100644 --- a/services/mcp-server/src/modules/a2a/transcript-extraction-pipeline.ts +++ b/services/mcp-server/src/modules/a2a/transcript-extraction-pipeline.ts @@ -20,6 +20,15 @@ import { type TranscriptDoc, } from '../../lib/lysnrai-client.js'; import { config } from '../../lib/config.js'; +import { + trackRunCompleted, + trackRunFailed, + trackRunStarted, + trackStepCompleted, + trackStepFailed, + trackStepStarted, +} from './run-tracker.js'; +import { createSupportCaseForRun, recordBudgetSpend } from './governance.js'; // ── Types ────────────────────────────────────────────────────────────────────── @@ -139,7 +148,7 @@ function buildReport( // ── Pipeline runner ──────────────────────────────────────────────────────────── -async function runTranscriptExtractionPipeline( +export async function runTranscriptExtractionPipeline( limit: number, dryRun: boolean, req: McpToolRequest @@ -149,34 +158,193 @@ async function runTranscriptExtractionPipeline( token: req.headers.authorization?.slice(7), requestId: req.id, }; + let currentStep: { stepName: 'collect' | 'batch' | 'report'; order: number } | undefined; - req.log.info({ runId, stepId: 'collect', limit, dryRun }, 'TranscriptCollectorAgent start'); - const collection = await collectUnextracted(limit, opts); - req.log.info( - { + await safeTrack(() => + trackRunStarted({ runId, - stepId: 'collect', - totalFetched: collection.totalFetched, - unextractedCount: collection.unextractedIds.length, - }, - 'TranscriptCollectorAgent done' + productId: 'lysnrai', + name: 'transcript-extraction-pipeline', + requestId: req.id, + token: opts.token, + input: { limit, dryRun }, + }) ); - req.log.info( - { runId, stepId: 'batch', count: collection.unextractedIds.length, dryRun }, - 'ExtractionBatchAgent start' - ); - const batch = await runExtractionBatch(collection.unextractedIds, dryRun, opts); - req.log.info( - { runId, stepId: 'batch', succeeded: batch.succeeded.length, failed: batch.failed.length }, - 'ExtractionBatchAgent done' - ); + try { + currentStep = { stepName: 'collect', order: 1 }; + const collectStep = currentStep; + await safeTrack(() => + trackStepStarted({ + runId, + productId: 'lysnrai', + stepName: collectStep.stepName, + order: collectStep.order, + token: opts.token, + requestId: req.id, + input: { limit, dryRun }, + }) + ); + req.log.info({ runId, stepId: 'collect', limit, dryRun }, 'TranscriptCollectorAgent start'); + const collection = await collectUnextracted(limit, opts); + await safeTrack(() => + trackStepCompleted({ + runId, + productId: 'lysnrai', + stepName: collectStep.stepName, + order: collectStep.order, + token: opts.token, + requestId: req.id, + output: { + totalFetched: collection.totalFetched, + unextractedCount: collection.unextractedIds.length, + }, + }) + ); + req.log.info( + { + runId, + stepId: 'collect', + totalFetched: collection.totalFetched, + unextractedCount: collection.unextractedIds.length, + }, + 'TranscriptCollectorAgent done' + ); - req.log.info({ runId, stepId: 'report' }, 'ExtractionReportAgent start'); - const report = buildReport(runId, dryRun, collection, batch); - req.log.info({ runId, stepId: 'report', summary: report.summary }, 'ExtractionReportAgent done'); + currentStep = { stepName: 'batch', order: 2 }; + const batchStep = currentStep; + await safeTrack(() => + trackStepStarted({ + runId, + productId: 'lysnrai', + stepName: batchStep.stepName, + order: batchStep.order, + token: opts.token, + requestId: req.id, + input: { count: collection.unextractedIds.length, dryRun }, + }) + ); + req.log.info( + { runId, stepId: 'batch', count: collection.unextractedIds.length, dryRun }, + 'ExtractionBatchAgent start' + ); + const batch = await runExtractionBatch(collection.unextractedIds, dryRun, opts); + await safeTrack(() => + trackStepCompleted({ + runId, + productId: 'lysnrai', + stepName: batchStep.stepName, + order: batchStep.order, + token: opts.token, + requestId: req.id, + output: { succeeded: batch.succeeded.length, failed: batch.failed.length }, + }) + ); + await recordBudgetSpend( + { + productId: 'lysnrai', + runId, + source: 'a2a.transcript_extraction', + costUsd: Number((batch.succeeded.length * 0.002).toFixed(6)), + tokensUsed: batch.succeeded.length * 1000, + }, + opts + ); + req.log.info( + { runId, stepId: 'batch', succeeded: batch.succeeded.length, failed: batch.failed.length }, + 'ExtractionBatchAgent done' + ); - return report; + currentStep = { stepName: 'report', order: 3 }; + const reportStep = currentStep; + await safeTrack(() => + trackStepStarted({ + runId, + productId: 'lysnrai', + stepName: reportStep.stepName, + order: reportStep.order, + token: opts.token, + requestId: req.id, + }) + ); + req.log.info({ runId, stepId: 'report' }, 'ExtractionReportAgent start'); + const report = buildReport(runId, dryRun, collection, batch); + await safeTrack(() => + trackStepCompleted({ + runId, + productId: 'lysnrai', + stepName: reportStep.stepName, + order: reportStep.order, + token: opts.token, + requestId: req.id, + output: { succeeded: report.succeeded, failed: report.failed }, + }) + ); + await safeTrack(() => + trackRunCompleted({ + runId, + productId: 'lysnrai', + name: 'transcript-extraction-pipeline', + requestId: req.id, + token: opts.token, + output: { succeeded: report.succeeded, failed: report.failed }, + }) + ); + if (report.failed > 0) { + await createSupportCaseForRun( + { + productId: 'lysnrai', + runId, + title: 'Transcript extraction pipeline reported failures', + description: report.summary, + priority: report.failed >= 5 ? 'high' : 'medium', + tags: ['a2a', 'transcripts', 'extraction'], + }, + opts + ); + } + req.log.info( + { runId, stepId: 'report', summary: report.summary }, + 'ExtractionReportAgent done' + ); + + return report; + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + if (currentStep) { + const failedStep = currentStep; + await safeTrack(() => + trackStepFailed({ + runId, + productId: 'lysnrai', + stepName: failedStep.stepName, + order: failedStep.order, + token: opts.token, + requestId: req.id, + error: message, + }) + ); + } + await safeTrack(() => + trackRunFailed({ + runId, + productId: 'lysnrai', + name: 'transcript-extraction-pipeline', + requestId: req.id, + token: opts.token, + error: message, + }) + ); + throw error; + } +} + +async function safeTrack(fn: () => Promise): Promise { + try { + await fn(); + } catch { + // Tracking must never fail the pipeline itself. + } } // ── MCP tool registration ─────────────────────────────────────────────────────