learning_ai_common_plat/services/mcp-server/src/modules/a2a/transcript-extraction-pipeline.ts
saravanakumardb1 d7aa90b021 feat(mcp-server): 4 MCP tool gaps + 3 new A2A pipelines (Priority 3/6/7)
MCP tool gaps filled (DOMAIN_PRODUCTS.md alignment):
- jarvis.memory.create — POST /jarvis/agents/:agentId/memory (sessionId, type, content, importance, tags, expiresAt)
- jarvis.teams.listMembers — GET /jarvis/teams/:teamId/members (role, status, joinedAt)
- nomgap.fasting.getSession — GET /fasting/sessions/:id (client func already existed, MCP tool was missing)
- peakpulse.weather.getSnapshot — extracts weather field from peakpulseSessionGet response

New A2A pipelines (all registered in server.ts):
- transcript-extraction-pipeline.ts: lysnrai.transcripts.runExtractionPipeline
  - TranscriptCollectorAgent -> ExtractionBatchAgent -> ExtractionReportAgent
  - Queries transcripts missing extractedAt, runs extraction, returns batch report + dryRun support
- sync-conflict-pipeline.ts: chronomind.sync.diagnoseConflicts
  - ConflictDetectorAgent -> SyncStateInspectorAgent -> DiagnosticsSessionAgent -> ConflictReportAgent
  - Queries telemetry for sync_conflict events, classifies pattern, creates diagnostics session on conflict
- route-safety-pipeline.ts: peakpulse.sessions.assessSafety
  - SessionDataAgent -> RouteProfileAgent -> SafetyAnalysisAgent -> SafetyReportAgent
  - Fetches GPS + weather, evaluates UV/wind/altitude/speed risk factors, enriches with extraction entities

Client additions (jarvis-client.ts):
  jarvisMemoryCreate, jarvisTeamsListMembers + JarvisTeamMemberDoc interface

MCP server total: 93 tools across 17 namespaces
2026-03-05 15:18:21 -08:00

205 lines
6.8 KiB
TypeScript

/**
* TranscriptExtractionPipelineAgent — A2A pipeline for LysnrAI transcript enrichment.
*
* Agent roster (3 steps):
* 1. TranscriptCollectorAgent — list transcripts, filter where extractedAt is null
* 2. ExtractionBatchAgent — run extraction on each unprocessed transcript (serial, best-effort)
* 3. ExtractionReportAgent — assemble report with counts, failed IDs, and a sample of extracted IDs
*
* MCP tools:
* lysnrai.transcripts.runExtractionPipeline(limit?, dryRun?) — run pipeline
*/
import { randomUUID } from 'node:crypto';
import { z } from 'zod';
import { registerTool } from '../tools/registry.js';
import type { McpToolRequest } from '../tools/types.js';
import {
lysnraiTranscriptsList,
lysnraiTranscriptRunExtraction,
type TranscriptDoc,
} from '../../lib/lysnrai-client.js';
import { config } from '../../lib/config.js';
// ── Types ──────────────────────────────────────────────────────────────────────
/** Output of the collection step (`collectUnextracted`). */
interface CollectionResult {
/** Number of transcripts returned by the list call, before any filtering. */
totalFetched: number;
/** IDs of every listed transcript whose `extractedAt` is falsy. */
unextractedIds: string[];
/** The first five unextracted transcripts (fewer when fewer were found). */
sampleTranscripts: TranscriptDoc[];
}
/** Output of the batch step (`runExtractionBatch`). */
interface BatchResult {
/** Number of transcript IDs the batch iterated over; 0 when the batch did not run (dry run or no IDs). */
processed: number;
/** IDs whose extraction call completed without throwing. */
succeeded: string[];
/** IDs whose extraction call threw, paired with the error message (or the stringified thrown value). */
failed: Array<{ id: string; error: string }>;
/** Mirrors the dry-run flag: true only when the batch was bypassed because of a dry run. */
skipped: boolean;
}
/** Final report returned by the `lysnrai.transcripts.runExtractionPipeline` tool. */
export interface TranscriptExtractionReport {
/** Identifier of this pipeline run (generated by the pipeline runner). */
runId: string;
/** Always the literal `'lysnrai'`. */
productId: 'lysnrai';
/** Whether the run was a dry run (collection only, no extraction). */
dryRun: boolean;
/** Number of transcripts returned by the list call. */
totalFetched: number;
/** Number of listed transcripts that had no `extractedAt`. */
unextractedCount: number;
/** Number of transcripts the batch step iterated over (0 on dry run or when nothing was pending). */
processed: number;
/** Count of transcripts whose extraction call succeeded. */
succeeded: number;
/** Count of transcripts whose extraction call threw. */
failed: number;
/** IDs of the transcripts whose extraction call threw. */
failedIds: string[];
/** Up to five IDs of successfully extracted transcripts. */
sampleExtractedIds: string[];
/** Human-readable one-line description of the outcome. */
summary: string;
/** ISO-8601 timestamp taken when the report was assembled. */
generatedAt: string;
}
// ── Step 1: TranscriptCollectorAgent ──────────────────────────────────────────
/**
 * TranscriptCollectorAgent step: lists transcripts and keeps the ones that
 * have not been extracted yet (falsy `extractedAt`).
 *
 * @param limit - Forwarded as the `limit` of the list call.
 * @param opts - Token / request id, forwarded unchanged to the list call.
 * @returns The number of transcripts listed, the IDs of all unextracted ones,
 *          and the first five unextracted transcripts as a sample.
 */
async function collectUnextracted(
  limit: number,
  opts: { token?: string; requestId?: string }
): Promise<CollectionResult> {
  const { transcripts } = await lysnraiTranscriptsList({ limit }, opts);

  // Keep only documents that carry no extraction timestamp.
  const pending = transcripts.filter(doc => {
    const alreadyExtracted = Boolean(doc.extractedAt);
    return !alreadyExtracted;
  });
  const pendingIds = pending.map(doc => doc.id);
  const sample = pending.slice(0, 5);

  return {
    totalFetched: transcripts.length,
    unextractedIds: pendingIds,
    sampleTranscripts: sample,
  };
}
// ── Step 2: ExtractionBatchAgent ──────────────────────────────────────────────
/**
 * ExtractionBatchAgent step: runs extraction for each transcript ID, one at a
 * time, recording successes and failures without aborting the batch.
 *
 * @param transcriptIds - IDs to run extraction on.
 * @param dryRun - When true, nothing is extracted and the result is marked as skipped.
 * @param opts - Token / request id, forwarded unchanged to each extraction call.
 * @returns Per-ID outcome of the batch. When `dryRun` is set or there are no
 *          IDs, an empty result with `processed: 0` is returned and no
 *          extraction call is made.
 */
async function runExtractionBatch(
  transcriptIds: string[],
  dryRun: boolean,
  opts: { token?: string; requestId?: string }
): Promise<BatchResult> {
  const shouldRun = !dryRun && transcriptIds.length > 0;

  if (shouldRun) {
    const okIds: string[] = [];
    const failures: Array<{ id: string; error: string }> = [];

    // Deliberately sequential: each call is awaited before the next starts.
    for (const transcriptId of transcriptIds) {
      try {
        await lysnraiTranscriptRunExtraction(transcriptId, opts);
      } catch (cause) {
        // Best-effort: record the failure and move on to the next transcript.
        const message = cause instanceof Error ? cause.message : String(cause);
        failures.push({ id: transcriptId, error: message });
        continue;
      }
      okIds.push(transcriptId);
    }

    return {
      processed: transcriptIds.length,
      succeeded: okIds,
      failed: failures,
      skipped: false,
    };
  }

  // Nothing was run; `skipped` is true only when the cause was a dry run.
  return {
    processed: 0,
    succeeded: [],
    failed: [],
    skipped: dryRun,
  };
}
// ── Step 3: ExtractionReportAgent ─────────────────────────────────────────────
/**
 * ExtractionReportAgent step: combines the collection and batch results into
 * the final report.
 *
 * @param runId - Identifier of the current pipeline run; copied into the report.
 * @param dryRun - Whether the run was a dry run; selects the dry-run summary.
 * @param collection - Result of the collection step.
 * @param batch - Result of the batch step.
 * @returns The report, stamped with the current time in ISO-8601 form.
 */
function buildReport(
  runId: string,
  dryRun: boolean,
  collection: CollectionResult,
  batch: BatchResult
): TranscriptExtractionReport {
  const generatedAt = new Date().toISOString();
  const { totalFetched } = collection;
  const pendingCount = collection.unextractedIds.length;
  const okCount = batch.succeeded.length;
  const failCount = batch.failed.length;

  // Dry run takes precedence, then "nothing pending", then the normal outcome.
  const describeOutcome = (): string => {
    if (dryRun) {
      return `DRY RUN: Found ${pendingCount} unextracted transcripts out of ${totalFetched} fetched. No extraction was run.`;
    }
    if (pendingCount === 0) {
      return `All ${totalFetched} transcripts are already extracted. Nothing to do.`;
    }
    const failureSuffix = failCount > 0 ? ` ${failCount} failed.` : '';
    return `Extracted ${okCount}/${pendingCount} transcripts.${failureSuffix}`;
  };

  return {
    runId,
    productId: 'lysnrai',
    dryRun,
    totalFetched,
    unextractedCount: pendingCount,
    processed: batch.processed,
    succeeded: okCount,
    failed: failCount,
    failedIds: batch.failed.map(entry => entry.id),
    sampleExtractedIds: batch.succeeded.slice(0, 5),
    summary: describeOutcome(),
    generatedAt,
  };
}
// ── Pipeline runner ────────────────────────────────────────────────────────────
/**
 * Runs the three pipeline steps in order (collect, batch, report) and logs a
 * start/done entry for each step, tagged with a freshly generated run id.
 *
 * @param limit - Maximum number of transcripts to list in the collection step.
 * @param dryRun - When true, the batch step performs no extraction.
 * @param req - Incoming tool request; supplies the authorization header,
 *              the request id and the logger.
 * @returns The assembled extraction report.
 */
async function runTranscriptExtractionPipeline(
  limit: number,
  dryRun: boolean,
  req: McpToolRequest
): Promise<TranscriptExtractionReport> {
  const runId = randomUUID();
  const log = req.log;

  // Drops the first 7 characters of the header value.
  // NOTE(review): assumes a 7-character scheme prefix such as "Bearer " — confirm upstream validation.
  const authHeader = req.headers.authorization;
  const opts = {
    token: authHeader?.slice(7),
    requestId: req.id,
  };

  // Step 1: collect
  log.info({ runId, stepId: 'collect', limit, dryRun }, 'TranscriptCollectorAgent start');
  const collection = await collectUnextracted(limit, opts);
  const pendingIds = collection.unextractedIds;
  log.info(
    {
      runId,
      stepId: 'collect',
      totalFetched: collection.totalFetched,
      unextractedCount: pendingIds.length,
    },
    'TranscriptCollectorAgent done'
  );

  // Step 2: batch
  log.info(
    { runId, stepId: 'batch', count: pendingIds.length, dryRun },
    'ExtractionBatchAgent start'
  );
  const batch = await runExtractionBatch(pendingIds, dryRun, opts);
  log.info(
    { runId, stepId: 'batch', succeeded: batch.succeeded.length, failed: batch.failed.length },
    'ExtractionBatchAgent done'
  );

  // Step 3: report
  log.info({ runId, stepId: 'report' }, 'ExtractionReportAgent start');
  const report = buildReport(runId, dryRun, collection, batch);
  log.info({ runId, stepId: 'report', summary: report.summary }, 'ExtractionReportAgent done');

  return report;
}
// ── MCP tool registration ─────────────────────────────────────────────────────
/**
 * Registers the `lysnrai.transcripts.runExtractionPipeline` MCP tool.
 *
 * The tool is restricted to the `admin` role. Inputs:
 * - `limit`: coerced to a number; must be an integer between 1 and
 *   `config.QUERY_MAX_LIMIT`, defaulting to `config.QUERY_DEFAULT_LIMIT`.
 * - `dryRun`: boolean, defaults to `false`.
 *
 * Execution delegates to `runTranscriptExtractionPipeline` and returns its report.
 */
registerTool({
  name: 'lysnrai.transcripts.runExtractionPipeline',
  description:
    'A2A pipeline: fetches LysnrAI transcripts missing extraction data, runs the extraction service on each, and returns a report with counts and failures. Use dryRun=true to preview without running extraction. Requires admin role.',
  requiredRole: 'admin',
  inputSchema: z.object({
    limit: z.coerce
      .number()
      // A transcript count must be whole; reject fractional values such as 2.5.
      .int()
      .min(1)
      .max(config.QUERY_MAX_LIMIT)
      .default(config.QUERY_DEFAULT_LIMIT)
      .describe('Max transcripts to fetch and process per run'),
    dryRun: z
      .boolean()
      .default(false)
      .describe('If true, only collect and count — do not run extraction'),
  }),
  async execute(args, req) {
    return runTranscriptExtractionPipeline(args.limit, args.dryRun, req);
  },
});