learning_ai_common_plat/services/mcp-server/src/modules/a2a/transcript-extraction-pipeline.ts

373 lines
11 KiB
TypeScript

/**
* TranscriptExtractionPipelineAgent — A2A pipeline for LysnrAI transcript enrichment.
*
* Agent roster (3 steps):
* 1. TranscriptCollectorAgent — list transcripts, filter where extractedAt is null
* 2. ExtractionBatchAgent — run extraction on each unprocessed transcript (serial, best-effort)
* 3. ExtractionReportAgent — assemble report with counts, errors, sample entities
*
* MCP tools:
* lysnrai.transcripts.runExtractionPipeline(limit?, dryRun?) — run pipeline
*/
import { randomUUID } from 'node:crypto';
import { z } from 'zod';
import { registerTool } from '../tools/registry.js';
import type { McpToolRequest } from '../tools/types.js';
import {
lysnraiTranscriptsList,
lysnraiTranscriptRunExtraction,
type TranscriptDoc,
} from '../../lib/lysnrai-client.js';
import { config } from '../../lib/config.js';
import {
trackRunCompleted,
trackRunFailed,
trackRunStarted,
trackStepCompleted,
trackStepFailed,
trackStepStarted,
} from './run-tracker.js';
import { createSupportCaseForRun, recordBudgetSpend } from './governance.js';
// ── Types ──────────────────────────────────────────────────────────────────────
interface CollectionResult {
totalFetched: number;
unextractedIds: string[];
sampleTranscripts: TranscriptDoc[];
}
interface BatchResult {
processed: number;
succeeded: string[];
failed: Array<{ id: string; error: string }>;
skipped: boolean;
}
export interface TranscriptExtractionReport {
runId: string;
productId: 'lysnrai';
dryRun: boolean;
totalFetched: number;
unextractedCount: number;
processed: number;
succeeded: number;
failed: number;
failedIds: string[];
sampleExtractedIds: string[];
summary: string;
generatedAt: string;
}
// ── Step 1: TranscriptCollectorAgent ──────────────────────────────────────────
async function collectUnextracted(
limit: number,
opts: { token?: string; requestId?: string }
): Promise<CollectionResult> {
const result = await lysnraiTranscriptsList({ limit }, opts);
const transcripts = result.transcripts;
const unextracted = transcripts.filter(t => !t.extractedAt);
return {
totalFetched: transcripts.length,
unextractedIds: unextracted.map(t => t.id),
sampleTranscripts: unextracted.slice(0, 5),
};
}
// ── Step 2: ExtractionBatchAgent ──────────────────────────────────────────────
async function runExtractionBatch(
transcriptIds: string[],
dryRun: boolean,
opts: { token?: string; requestId?: string }
): Promise<BatchResult> {
if (dryRun || transcriptIds.length === 0) {
return {
processed: 0,
succeeded: [],
failed: [],
skipped: dryRun,
};
}
const succeeded: string[] = [];
const failed: Array<{ id: string; error: string }> = [];
for (const id of transcriptIds) {
try {
await lysnraiTranscriptRunExtraction(id, opts);
succeeded.push(id);
} catch (err) {
failed.push({ id, error: err instanceof Error ? err.message : String(err) });
}
}
return { processed: transcriptIds.length, succeeded, failed, skipped: false };
}
// ── Step 3: ExtractionReportAgent ─────────────────────────────────────────────
function buildReport(
runId: string,
dryRun: boolean,
collection: CollectionResult,
batch: BatchResult
): TranscriptExtractionReport {
const now = new Date().toISOString();
const unextractedCount = collection.unextractedIds.length;
let summary: string;
if (dryRun) {
summary = `DRY RUN: Found ${unextractedCount} unextracted transcripts out of ${collection.totalFetched} fetched. No extraction was run.`;
} else if (unextractedCount === 0) {
summary = `All ${collection.totalFetched} transcripts are already extracted. Nothing to do.`;
} else {
const failNote = batch.failed.length > 0 ? ` ${batch.failed.length} failed.` : '';
summary = `Extracted ${batch.succeeded.length}/${unextractedCount} transcripts.${failNote}`;
}
return {
runId,
productId: 'lysnrai',
dryRun,
totalFetched: collection.totalFetched,
unextractedCount,
processed: batch.processed,
succeeded: batch.succeeded.length,
failed: batch.failed.length,
failedIds: batch.failed.map(f => f.id),
sampleExtractedIds: batch.succeeded.slice(0, 5),
summary,
generatedAt: now,
};
}
// ── Pipeline runner ────────────────────────────────────────────────────────────
export async function runTranscriptExtractionPipeline(
limit: number,
dryRun: boolean,
req: McpToolRequest
): Promise<TranscriptExtractionReport> {
const runId = randomUUID();
const opts = {
token: req.headers.authorization?.slice(7),
requestId: req.id,
};
let currentStep: { stepName: 'collect' | 'batch' | 'report'; order: number } | undefined;
await safeTrack(() =>
trackRunStarted({
runId,
productId: 'lysnrai',
name: 'transcript-extraction-pipeline',
requestId: req.id,
token: opts.token,
input: { limit, dryRun },
})
);
try {
currentStep = { stepName: 'collect', order: 1 };
const collectStep = currentStep;
await safeTrack(() =>
trackStepStarted({
runId,
productId: 'lysnrai',
stepName: collectStep.stepName,
order: collectStep.order,
token: opts.token,
requestId: req.id,
input: { limit, dryRun },
})
);
req.log.info({ runId, stepId: 'collect', limit, dryRun }, 'TranscriptCollectorAgent start');
const collection = await collectUnextracted(limit, opts);
await safeTrack(() =>
trackStepCompleted({
runId,
productId: 'lysnrai',
stepName: collectStep.stepName,
order: collectStep.order,
token: opts.token,
requestId: req.id,
output: {
totalFetched: collection.totalFetched,
unextractedCount: collection.unextractedIds.length,
},
})
);
req.log.info(
{
runId,
stepId: 'collect',
totalFetched: collection.totalFetched,
unextractedCount: collection.unextractedIds.length,
},
'TranscriptCollectorAgent done'
);
currentStep = { stepName: 'batch', order: 2 };
const batchStep = currentStep;
await safeTrack(() =>
trackStepStarted({
runId,
productId: 'lysnrai',
stepName: batchStep.stepName,
order: batchStep.order,
token: opts.token,
requestId: req.id,
input: { count: collection.unextractedIds.length, dryRun },
})
);
req.log.info(
{ runId, stepId: 'batch', count: collection.unextractedIds.length, dryRun },
'ExtractionBatchAgent start'
);
const batch = await runExtractionBatch(collection.unextractedIds, dryRun, opts);
await safeTrack(() =>
trackStepCompleted({
runId,
productId: 'lysnrai',
stepName: batchStep.stepName,
order: batchStep.order,
token: opts.token,
requestId: req.id,
output: { succeeded: batch.succeeded.length, failed: batch.failed.length },
})
);
await recordBudgetSpend(
{
productId: 'lysnrai',
runId,
source: 'a2a.transcript_extraction',
costUsd: Number((batch.succeeded.length * 0.002).toFixed(6)),
tokensUsed: batch.succeeded.length * 1000,
},
opts
);
req.log.info(
{ runId, stepId: 'batch', succeeded: batch.succeeded.length, failed: batch.failed.length },
'ExtractionBatchAgent done'
);
currentStep = { stepName: 'report', order: 3 };
const reportStep = currentStep;
await safeTrack(() =>
trackStepStarted({
runId,
productId: 'lysnrai',
stepName: reportStep.stepName,
order: reportStep.order,
token: opts.token,
requestId: req.id,
})
);
req.log.info({ runId, stepId: 'report' }, 'ExtractionReportAgent start');
const report = buildReport(runId, dryRun, collection, batch);
await safeTrack(() =>
trackStepCompleted({
runId,
productId: 'lysnrai',
stepName: reportStep.stepName,
order: reportStep.order,
token: opts.token,
requestId: req.id,
output: { succeeded: report.succeeded, failed: report.failed },
})
);
await safeTrack(() =>
trackRunCompleted({
runId,
productId: 'lysnrai',
name: 'transcript-extraction-pipeline',
requestId: req.id,
token: opts.token,
output: { succeeded: report.succeeded, failed: report.failed },
})
);
if (report.failed > 0) {
await createSupportCaseForRun(
{
productId: 'lysnrai',
runId,
title: 'Transcript extraction pipeline reported failures',
description: report.summary,
priority: report.failed >= 5 ? 'high' : 'medium',
tags: ['a2a', 'transcripts', 'extraction'],
},
opts
);
}
req.log.info(
{ runId, stepId: 'report', summary: report.summary },
'ExtractionReportAgent done'
);
return report;
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
if (currentStep) {
const failedStep = currentStep;
await safeTrack(() =>
trackStepFailed({
runId,
productId: 'lysnrai',
stepName: failedStep.stepName,
order: failedStep.order,
token: opts.token,
requestId: req.id,
error: message,
})
);
}
await safeTrack(() =>
trackRunFailed({
runId,
productId: 'lysnrai',
name: 'transcript-extraction-pipeline',
requestId: req.id,
token: opts.token,
error: message,
})
);
throw error;
}
}
async function safeTrack(fn: () => Promise<void>): Promise<void> {
try {
await fn();
} catch {
// Tracking must never fail the pipeline itself.
}
}
// ── MCP tool registration ─────────────────────────────────────────────────────
registerTool({
name: 'lysnrai.transcripts.runExtractionPipeline',
description:
'A2A pipeline: fetches LysnrAI transcripts missing extraction data, runs the extraction service on each, and returns a report with counts and failures. Use dryRun=true to preview without running extraction. Requires admin role.',
requiredRole: 'admin',
inputSchema: z.object({
limit: z.coerce
.number()
.min(1)
.max(config.QUERY_MAX_LIMIT)
.default(config.QUERY_DEFAULT_LIMIT)
.describe('Max transcripts to fetch and process per run'),
dryRun: z
.boolean()
.default(false)
.describe('If true, only collect and count — do not run extraction'),
}),
async execute(args, req) {
return runTranscriptExtractionPipeline(args.limit, args.dryRun, req);
},
});