feat(mcp-server): A2A batch-2 — STTFallbackMonitorAgent (lysnrai) + ProgressAnalystAgent (jarvis) + BrainOverflowAgent (mindlyst)
stt-fallback-monitor-pipeline.ts: lysnrai.stt.monitorFallback
- STTEventScannerAgent -> DiagnosticsLaunchAgent -> FallbackReportAgent
- Queries stt_completed telemetry, splits azure vs whisper events, computes offline rate
- Opens diagnostics session if rate exceeds threshold (default 20%); 6h default window

progress-analyst-pipeline.ts: jarvis.progress.analyze
- SessionMetricsCollectorAgent -> PlateauDetectorAgent -> ProgressReportAgent
- Collects skillMetrics timeseries per agent from recent sessions
- Detects plateau (delta < threshold), recommends increase_difficulty or supplementary_agent

brain-overflow-pipeline.ts: mindlyst.brains.checkOverflow
- BrainInventoryAgent -> OverflowDetectorAgent -> OverflowReportAgent
- Flags brains above item count threshold with no acted-on items in N days
- Per-brain suggestion: archive / reassign / review

MCP server total: 99 tools
This commit is contained in:
parent
e4d489d40c
commit
b8e230f018
235
services/mcp-server/src/modules/a2a/brain-overflow-pipeline.ts
Normal file
235
services/mcp-server/src/modules/a2a/brain-overflow-pipeline.ts
Normal file
@ -0,0 +1,235 @@
|
||||
/**
|
||||
* BrainOverflowAgent — A2A pipeline for MindLyst brain capacity management.
|
||||
*
|
||||
* Agent roster (3 steps):
|
||||
* 1. BrainInventoryAgent — list all brains; query item counts per brain
|
||||
* 2. OverflowDetectorAgent — flag brains above itemThreshold with no acted-on items in staleDays
|
||||
* 3. OverflowReportAgent — assemble report with archive / reassign suggestions per overflowed brain
|
||||
*
|
||||
* MCP tools:
|
||||
* mindlyst.brains.checkOverflow(itemThreshold?, staleDays?) — run pipeline
|
||||
*/
|
||||
|
||||
import { randomUUID } from 'node:crypto';
|
||||
import { z } from 'zod';
|
||||
import { registerTool } from '../tools/registry.js';
|
||||
import type { McpToolRequest } from '../tools/types.js';
|
||||
import {
|
||||
mindlystBrainsList,
|
||||
mindlystMemoryList,
|
||||
type BrainDoc,
|
||||
} from '../../lib/mindlyst-client.js';
|
||||
import { config } from '../../lib/config.js';
|
||||
|
||||
// ── Types ──────────────────────────────────────────────────────────────────────
|
||||
|
||||
/** Remediation suggested for a brain; `'none'` means no action is suggested. */
type OverflowSuggestion = 'archive' | 'reassign' | 'review' | 'none';

/** Capacity assessment of a single brain, as produced by `checkBrainCapacity`. */
interface BrainCapacityResult {
  brainId: string;
  brainName: string;
  /** Number of items returned by the memory listing for this brain. */
  totalItems: number;
  /** Items whose `actedOn` flag is truthy. */
  actedOnItems: number;
  /** `totalItems - actedOnItems`. */
  pendingItems: number;
  /** Latest `actedOnAt` value among acted-on items, or null when none carries one. */
  mostRecentActedOnAt: string | null;
  /** Whole days elapsed since `mostRecentActedOnAt`; null when that timestamp is null. */
  staleDaysWithoutAction: number | null;
  /** True when the brain is both at/over the item threshold and considered stale. */
  isOverflowed: boolean;
  suggestion: OverflowSuggestion;
  /** Human-readable explanation accompanying `suggestion`. */
  suggestionReason: string;
}

/** Result of one `mindlyst.brains.checkOverflow` pipeline run. */
export interface BrainOverflowReport {
  /** Random UUID identifying this pipeline run (also attached to log entries). */
  runId: string;
  productId: 'mindlyst';
  /** Item-count threshold the run was invoked with. */
  itemThreshold: number;
  /** Staleness threshold (days) the run was invoked with. */
  staleDays: number;
  /** Number of per-brain results included in `perBrain`. */
  brainsScanned: number;
  /** Count of results with `isOverflowed === true`. */
  overflowedCount: number;
  /** Count of results with `isOverflowed === false`. */
  healthyCount: number;
  perBrain: BrainCapacityResult[];
  /** One-line human-readable digest of the run. */
  summary: string;
  /** ISO-8601 timestamp taken when the report was assembled. */
  generatedAt: string;
}
|
||||
|
||||
// ── Step 1: BrainInventoryAgent ───────────────────────────────────────────────
|
||||
|
||||
/**
 * BrainInventoryAgent step: fetches the list of brains from MindLyst.
 *
 * Requests a single page capped at `config.QUERY_MAX_LIMIT`. The lookup is
 * best-effort: any error is swallowed and an empty inventory is returned, so
 * callers cannot tell "no brains" apart from "lookup failed".
 *
 * @param opts Auth token and request id forwarded to the MindLyst client.
 * @returns The brains returned by the listing, or `[]` on failure.
 */
async function inventoryBrains(opts: { token?: string; requestId?: string }): Promise<BrainDoc[]> {
  let brains: BrainDoc[] = [];
  try {
    const { items } = await mindlystBrainsList({ limit: config.QUERY_MAX_LIMIT }, opts);
    brains = items;
  } catch {
    // Best-effort: keep the empty inventory.
  }
  return brains;
}
|
||||
|
||||
// ── Step 2: OverflowDetectorAgent ─────────────────────────────────────────────
|
||||
|
||||
/**
 * OverflowDetectorAgent step: assesses one brain's capacity and staleness.
 *
 * Lists the brain's items (single page, capped at `config.QUERY_MAX_LIMIT`),
 * counts acted-on vs pending items, and finds the most recent `actedOnAt`
 * timestamp. A brain is overflowed when it holds at least `itemThreshold`
 * items AND is stale: its latest action is at least `staleDays` days old, or
 * it has pending items and no action timestamp at all.
 *
 * The item lookup is best-effort and never throws; when it fails the result
 * carries zero counts, is not flagged, and says so in `suggestionReason`
 * instead of claiming the brain is healthy.
 *
 * @param brain Brain to assess.
 * @param itemThreshold Item count at or above which the brain counts as full.
 * @param staleDays Days since the last action at or above which it counts as stale.
 * @param opts Auth token and request id forwarded to the MindLyst client.
 * @returns The capacity assessment for `brain`.
 */
async function checkBrainCapacity(
  brain: BrainDoc,
  itemThreshold: number,
  staleDays: number,
  opts: { token?: string; requestId?: string }
): Promise<BrainCapacityResult> {
  const MS_PER_DAY = 24 * 60 * 60 * 1000;

  let totalItems = 0;
  let actedOnItems = 0;
  let mostRecentActedOnAt: string | null = null;
  let latestActedOnMs = Number.NEGATIVE_INFINITY;
  let itemsLoaded = false;

  try {
    const result = await mindlystMemoryList(
      { brainId: brain.id, limit: config.QUERY_MAX_LIMIT },
      opts
    );

    totalItems = result.items.length;
    for (const item of result.items) {
      if (!item.actedOn) continue;
      actedOnItems++;
      if (!item.actedOnAt) continue;

      // Compare parsed instants rather than raw strings: lexicographic order is
      // only chronological when every timestamp shares one exact format.
      const stamp = item.actedOnAt as string;
      const stampMs = new Date(stamp).getTime();
      if (Number.isFinite(stampMs) && stampMs > latestActedOnMs) {
        latestActedOnMs = stampMs;
        mostRecentActedOnAt = stamp;
      }
    }
    itemsLoaded = true;
  } catch {
    // Best-effort: discard any partial tallies and report the brain as unevaluated.
    totalItems = 0;
    actedOnItems = 0;
    mostRecentActedOnAt = null;
  }

  const pendingItems = totalItems - actedOnItems;

  // Clamp at 0 so a timestamp slightly in the future does not yield negative days.
  const staleDaysWithoutAction =
    mostRecentActedOnAt === null
      ? null
      : Math.max(0, Math.floor((Date.now() - latestActedOnMs) / MS_PER_DAY));

  const isTooFull = totalItems >= itemThreshold;
  // Without any action timestamp, pending work alone marks the brain as stale.
  const isStale =
    staleDaysWithoutAction === null ? pendingItems > 0 : staleDaysWithoutAction >= staleDays;
  const isOverflowed = isTooFull && isStale;

  let suggestion: OverflowSuggestion = 'none';
  let suggestionReason = itemsLoaded
    ? 'Brain is healthy.'
    : 'Item list could not be loaded; capacity was not evaluated.';

  if (isOverflowed) {
    if (actedOnItems === 0) {
      suggestion = 'archive';
      suggestionReason = `${totalItems} items, none acted on — brain may be abandoned. Consider archiving.`;
    } else if (pendingItems > actedOnItems * 2) {
      suggestion = 'reassign';
      suggestionReason = `${pendingItems} pending vs ${actedOnItems} acted-on, stale for ${staleDaysWithoutAction ?? '?'} days. Reassign pending items to active brains.`;
    } else {
      suggestion = 'review';
      suggestionReason = `${totalItems} items with ${staleDaysWithoutAction ?? '?'} days since last action. Manual review recommended.`;
    }
  } else if (isTooFull) {
    suggestion = 'review';
    suggestionReason = `${totalItems} items (above threshold ${itemThreshold}) but recently active. Consider splitting into sub-brains.`;
  }

  return {
    brainId: brain.id,
    brainName: brain.name,
    totalItems,
    actedOnItems,
    pendingItems,
    mostRecentActedOnAt,
    staleDaysWithoutAction,
    isOverflowed,
    suggestion,
    suggestionReason,
  };
}
|
||||
|
||||
// ── Step 3: OverflowReportAgent ───────────────────────────────────────────────
|
||||
|
||||
/**
 * OverflowReportAgent step: folds per-brain results into the final report.
 *
 * The suggestion breakdown in the summary counts overflowed brains only.
 * Brains that are full but still active also carry a `review` suggestion, and
 * counting them would make the breakdown exceed the number of flagged brains.
 *
 * @param runId Identifier of the pipeline run.
 * @param itemThreshold Item threshold the run used (echoed into the report).
 * @param staleDays Staleness threshold the run used (echoed into the report).
 * @param results Per-brain assessments, in the order they should be reported.
 * @returns The assembled report, timestamped at call time.
 */
function assembleOverflowReport(
  runId: string,
  itemThreshold: number,
  staleDays: number,
  results: BrainCapacityResult[]
): BrainOverflowReport {
  const overflowed = results.filter(r => r.isOverflowed);
  const overflowedCount = overflowed.length;
  const healthyCount = results.length - overflowedCount;

  const countSuggested = (kind: BrainCapacityResult['suggestion']): number =>
    overflowed.filter(r => r.suggestion === kind).length;

  let summary: string;
  if (results.length === 0) {
    summary = 'No brains found.';
  } else if (overflowedCount === 0) {
    summary = `All ${results.length} brains are within capacity thresholds.`;
  } else {
    // Thresholds are inclusive in the detector, so describe them with ">=".
    summary = `${overflowedCount}/${results.length} brains flagged for overflow (>=${itemThreshold} items, stale >=${staleDays} days). ${countSuggested('archive')} archive, ${countSuggested('reassign')} reassign, ${countSuggested('review')} review.`;
  }

  return {
    runId,
    productId: 'mindlyst',
    itemThreshold,
    staleDays,
    brainsScanned: results.length,
    overflowedCount,
    healthyCount,
    perBrain: results,
    summary,
    generatedAt: new Date().toISOString(),
  };
}
|
||||
|
||||
// ── Pipeline runner ────────────────────────────────────────────────────────────
|
||||
|
||||
/**
 * Runs the three-step brain overflow pipeline (inventory, detect, report).
 *
 * Per-brain checks run in small concurrent batches rather than strictly one
 * after another, so the run time does not grow linearly with per-request
 * latency while the number of in-flight requests stays bounded. Result order
 * matches the inventory order.
 *
 * @param itemThreshold Item count at or above which a brain counts as full.
 * @param staleDays Days without action at or above which a full brain is flagged.
 * @param req Incoming tool request; supplies the logger, request id and auth header.
 * @returns The assembled overflow report.
 */
async function runBrainOverflowPipeline(
  itemThreshold: number,
  staleDays: number,
  req: McpToolRequest
): Promise<BrainOverflowReport> {
  // Upper bound on concurrent per-brain item listings.
  const DETECT_BATCH_SIZE = 10;

  const runId = randomUUID();
  // slice(7) drops a leading "Bearer " prefix from the Authorization header.
  const opts = { token: req.headers.authorization?.slice(7), requestId: req.id };

  req.log.info({ runId, stepId: 'inventory' }, 'BrainInventoryAgent start');
  const brains = await inventoryBrains(opts);
  req.log.info(
    { runId, stepId: 'inventory', brainCount: brains.length },
    'BrainInventoryAgent done'
  );

  req.log.info(
    { runId, stepId: 'detect', itemThreshold, staleDays },
    'OverflowDetectorAgent start'
  );
  const results: BrainCapacityResult[] = [];
  for (let start = 0; start < brains.length; start += DETECT_BATCH_SIZE) {
    const batch = brains.slice(start, start + DETECT_BATCH_SIZE);
    const batchResults = await Promise.all(
      batch.map(brain => checkBrainCapacity(brain, itemThreshold, staleDays, opts))
    );
    results.push(...batchResults);
  }
  req.log.info(
    { runId, stepId: 'detect', overflowedCount: results.filter(r => r.isOverflowed).length },
    'OverflowDetectorAgent done'
  );

  req.log.info({ runId, stepId: 'report' }, 'OverflowReportAgent start');
  const report = assembleOverflowReport(runId, itemThreshold, staleDays, results);
  req.log.info({ runId, stepId: 'report', summary: report.summary }, 'OverflowReportAgent done');

  return report;
}
|
||||
|
||||
// ── MCP tool registration ─────────────────────────────────────────────────────
|
||||
|
||||
// Registers the admin-only `mindlyst.brains.checkOverflow` tool. Both inputs are
// coerced to integers >= 1 and default to 50 items / 14 days.
registerTool({
  name: 'mindlyst.brains.checkOverflow',
  description:
    'A2A pipeline: scans all MindLyst brains for capacity overflow — brains above a configured item count with no acted-on items in N days. Classifies each overflowed brain as archive, reassign, or review, and returns a capacity health report. Requires admin role.',
  requiredRole: 'admin',
  inputSchema: z.object({
    itemThreshold: z.coerce.number().int().min(1).default(50).describe(
      'Item count above which a brain is considered full (default 50)'
    ),
    staleDays: z.coerce.number().int().min(1).default(14).describe(
      'Days without action before a full brain is flagged as overflowed (default 14)'
    ),
  }),
  async execute(args, req) {
    const { itemThreshold, staleDays } = args;
    return runBrainOverflowPipeline(itemThreshold, staleDays, req);
  },
});
|
||||
272
services/mcp-server/src/modules/a2a/progress-analyst-pipeline.ts
Normal file
272
services/mcp-server/src/modules/a2a/progress-analyst-pipeline.ts
Normal file
@ -0,0 +1,272 @@
|
||||
/**
|
||||
* ProgressAnalystAgent — A2A pipeline for JarvisJr coaching skill progress analysis.
|
||||
*
|
||||
* Agent roster (3 steps):
|
||||
* 1. SessionMetricsCollectorAgent — per agent: list recent sessions, extract skillMetrics timeseries
|
||||
* 2. PlateauDetectorAgent — identify metrics that have not improved across last N sessions
|
||||
* 3. ProgressReportAgent — assemble per-agent recommendations (difficulty change / supplementary agent)
|
||||
*
|
||||
* MCP tools:
|
||||
* jarvis.progress.analyze(lookbackSessions?, plateauThreshold?) — run pipeline across all agents
|
||||
*/
|
||||
|
||||
import { randomUUID } from 'node:crypto';
|
||||
import { z } from 'zod';
|
||||
import { registerTool } from '../tools/registry.js';
|
||||
import type { McpToolRequest } from '../tools/types.js';
|
||||
import {
|
||||
jarvisAgentsList,
|
||||
jarvisSessionsList,
|
||||
type JarvisAgentDoc,
|
||||
type JarvisSessionDoc,
|
||||
} from '../../lib/jarvis-client.js';
|
||||
|
||||
// ── Types ──────────────────────────────────────────────────────────────────────
|
||||
|
||||
/** Trend of one skill across an agent's completed sessions. */
interface SkillTrend {
  skill: string;
  /** Scores in ascending session `createdAt` order. */
  scores: number[];
  /** Last entry of `scores`. */
  latest: number;
  /** First entry of `scores`. */
  earliest: number;
  /** `latest - earliest`. */
  delta: number;
  /** True when the absolute delta is below the plateau threshold. */
  isPlateaued: boolean;
}

/** Per-agent outcome of the plateau analysis. */
interface AgentProgressAnalysis {
  agentId: string;
  agentName: string;
  /** Number of sessions fetched for the agent (all statuses). */
  sessionsAnalyzed: number;
  skills: SkillTrend[];
  /** Names of the skills in `skills` flagged as plateaued. */
  plateauedSkills: string[];
  recommendation: AgentRecommendation;
}

/** Recommended follow-up for an agent, discriminated by `action`. */
type AgentRecommendation =
  | { action: 'none'; reason: string }
  | { action: 'increase_difficulty'; currentLevel: string; reason: string }
  | { action: 'supplementary_agent'; suggestedRole: string; reason: string }
  | { action: 'insufficient_data'; reason: string };

/** Result of one `jarvis.progress.analyze` pipeline run. */
export interface ProgressAnalystReport {
  /** Random UUID identifying this pipeline run (also attached to log entries). */
  runId: string;
  productId: 'jarvisjr';
  /** Per-agent session limit the run was invoked with. */
  lookbackSessions: number;
  /** Plateau threshold the run was invoked with. */
  plateauThreshold: number;
  /** Number of entries in `perAgent`. */
  agentsAnalyzed: number;
  /** Agents with at least one plateaued skill. */
  agentsWithPlateau: number;
  /** Agents with no plateaued skill whose recommendation is not `insufficient_data`. */
  agentsProgressing: number;
  perAgent: AgentProgressAnalysis[];
  /** One-line human-readable digest of the run. */
  summary: string;
  /** ISO-8601 timestamp taken when the report was assembled. */
  generatedAt: string;
}
|
||||
|
||||
// ── Step 1: SessionMetricsCollectorAgent ──────────────────────────────────────
|
||||
|
||||
/**
 * SessionMetricsCollectorAgent step: fetches up to `lookbackSessions` sessions
 * for one agent.
 *
 * Best-effort: if the session listing fails, the agent is still returned,
 * paired with an empty session list.
 *
 * @param agent Agent whose sessions are fetched.
 * @param lookbackSessions Maximum number of sessions to request.
 * @param opts Auth token and request id forwarded to the Jarvis client.
 * @returns The agent together with the sessions that were retrieved.
 */
async function collectAgentMetrics(
  agent: JarvisAgentDoc,
  lookbackSessions: number,
  opts: { token?: string; requestId?: string }
): Promise<{ agent: JarvisAgentDoc; sessions: JarvisSessionDoc[] }> {
  let sessions: JarvisSessionDoc[] = [];
  try {
    const page = await jarvisSessionsList({ agentId: agent.id, limit: lookbackSessions }, opts);
    sessions = page.sessions;
  } catch {
    // Best-effort: leave the session list empty.
  }
  return { agent, sessions };
}
|
||||
|
||||
// ── Step 2: PlateauDetectorAgent ──────────────────────────────────────────────
|
||||
|
||||
/**
 * PlateauDetectorAgent step: derives a per-skill trend from an agent's sessions.
 *
 * Only sessions with status `completed` that carry `skillMetrics` are used,
 * ordered by ascending `createdAt`. For each skill with at least two usable
 * scores, the trend compares the earliest and latest score; the skill is
 * plateaued when the absolute difference is below `plateauThreshold`.
 *
 * Metric values that are not finite numbers are skipped. Otherwise a run of
 * `null`/NaN values would produce a zero or NaN delta and be misreported.
 *
 * @param sessions Sessions fetched for one agent, in any order.
 * @param plateauThreshold Absolute delta below which a skill counts as plateaued.
 * @returns One trend per skill with two or more usable scores; `[]` when fewer
 *          than two qualifying sessions exist.
 */
function detectPlateaus(sessions: JarvisSessionDoc[], plateauThreshold: number): SkillTrend[] {
  const completed = sessions
    .filter(s => s.status === 'completed' && s.skillMetrics)
    .sort((a, b) => new Date(a.createdAt).getTime() - new Date(b.createdAt).getTime());

  if (completed.length < 2) return [];

  const skillMap = new Map<string, number[]>();
  for (const session of completed) {
    for (const [skill, score] of Object.entries(session.skillMetrics ?? {})) {
      if (typeof score !== 'number' || !Number.isFinite(score)) continue;
      const series = skillMap.get(skill);
      if (series) {
        series.push(score);
      } else {
        skillMap.set(skill, [score]);
      }
    }
  }

  const trends: SkillTrend[] = [];
  for (const [skill, scores] of skillMap) {
    const earliest = scores[0];
    const latest = scores[scores.length - 1];
    // A single data point cannot show a trend.
    if (scores.length < 2 || earliest === undefined || latest === undefined) continue;
    const delta = latest - earliest;
    trends.push({
      skill,
      scores,
      latest,
      earliest,
      delta,
      isPlateaued: Math.abs(delta) < plateauThreshold,
    });
  }

  return trends;
}
|
||||
|
||||
/**
 * Turns an agent's skill trends into a single recommendation.
 *
 * Decision order:
 *  1. Fewer than 3 completed sessions, or no trends at all: `insufficient_data`.
 *     The second case covers sessions that lack comparable `skillMetrics`,
 *     which previously surfaced as "0 skill(s) improving".
 *  2. No plateaued skill: `none`.
 *  3. Plateaued skills averaging a latest score of at least 0.75 while the
 *     agent is not already at `advanced`: `increase_difficulty`.
 *  4. Otherwise: `supplementary_agent`.
 *
 * The agent's difficulty level is read from an optional `difficultyLevel`
 * property and falls back to `'intermediate'` unless it is a string.
 *
 * @param agent Agent the trends belong to.
 * @param trends Per-skill trends produced by `detectPlateaus`.
 * @param sessionCount Number of completed sessions available for the agent.
 * @returns The recommendation for this agent.
 */
function buildRecommendation(
  agent: JarvisAgentDoc,
  trends: SkillTrend[],
  sessionCount: number
): AgentRecommendation {
  if (sessionCount < 3) {
    return {
      action: 'insufficient_data',
      reason: `Only ${sessionCount} completed sessions — need at least 3 to detect plateaus.`,
    };
  }

  if (trends.length === 0) {
    return {
      action: 'insufficient_data',
      reason: `No skill trends available across ${sessionCount} completed sessions — sessions lack comparable skillMetrics.`,
    };
  }

  const plateaued = trends.filter(t => t.isPlateaued);

  if (plateaued.length === 0) {
    const improving = trends.filter(t => t.delta > 0).length;
    const regressing = trends.filter(t => t.delta < 0).length;
    return {
      action: 'none',
      reason:
        regressing > 0
          ? `${improving} skill(s) improving, ${regressing} regressing. No plateau detected.`
          : `${improving} skill(s) improving. No plateau detected.`,
    };
  }

  const plateauedNames = plateaued.map(t => t.skill).join(', ');
  const avgLatest = plateaued.reduce((sum, t) => sum + t.latest, 0) / plateaued.length;

  // `difficultyLevel` is not part of the declared agent type; validate it at runtime.
  const rawLevel: unknown = Reflect.get(agent, 'difficultyLevel');
  const currentLevel = typeof rawLevel === 'string' ? rawLevel : 'intermediate';

  // NOTE(review): 0.75 presumes skill scores are on a 0..1 scale — confirm with the metrics producer.
  if (avgLatest >= 0.75 && currentLevel !== 'advanced') {
    return {
      action: 'increase_difficulty',
      currentLevel,
      reason: `${plateauedNames} plateaued at high scores (avg ${(avgLatest * 100).toFixed(0)}%). User may have outgrown current difficulty.`,
    };
  }

  return {
    action: 'supplementary_agent',
    suggestedRole: 'specialist',
    reason: `${plateauedNames} stagnant despite ${sessionCount} sessions. A specialist agent targeting these skills may break the plateau.`,
  };
}
|
||||
|
||||
// ── Step 3: ProgressReportAgent ───────────────────────────────────────────────
|
||||
|
||||
/**
 * ProgressReportAgent step: aggregates per-agent analyses into the final report.
 *
 * An agent counts as "with plateau" when it has at least one plateaued skill,
 * and as "progressing" when it has none and its recommendation is anything
 * other than `insufficient_data`.
 *
 * @param runId Identifier of the pipeline run.
 * @param lookbackSessions Session limit the run used (echoed into the report).
 * @param plateauThreshold Plateau threshold the run used (echoed into the report).
 * @param analyses Per-agent analyses, in the order they should be reported.
 * @returns The assembled report, timestamped at call time.
 */
function assembleProgressReport(
  runId: string,
  lookbackSessions: number,
  plateauThreshold: number,
  analyses: AgentProgressAnalysis[]
): ProgressAnalystReport {
  let agentsWithPlateau = 0;
  let agentsProgressing = 0;
  for (const analysis of analyses) {
    if (analysis.plateauedSkills.length > 0) {
      agentsWithPlateau += 1;
    } else if (analysis.recommendation.action !== 'insufficient_data') {
      agentsProgressing += 1;
    }
  }

  const agentsAnalyzed = analyses.length;
  let summary = 'No agents found to analyze.';
  if (agentsAnalyzed !== 0) {
    summary = `${agentsWithPlateau}/${agentsAnalyzed} agents show skill plateaus. ${agentsProgressing} progressing normally.`;
  }

  return {
    runId,
    productId: 'jarvisjr',
    lookbackSessions,
    plateauThreshold,
    agentsAnalyzed,
    agentsWithPlateau,
    agentsProgressing,
    perAgent: analyses,
    summary,
    generatedAt: new Date().toISOString(),
  };
}
|
||||
|
||||
// ── Pipeline runner ────────────────────────────────────────────────────────────
|
||||
|
||||
/**
 * Runs the three-step progress analysis pipeline (collect, plateau, report).
 *
 * Lists up to 50 agents, fetches each agent's recent sessions concurrently,
 * derives skill trends and a recommendation per agent, and assembles the
 * report. A failed agent listing is treated as "no agents".
 *
 * @param lookbackSessions Maximum number of sessions fetched per agent.
 * @param plateauThreshold Absolute delta below which a skill counts as plateaued.
 * @param req Incoming tool request; supplies the logger, request id and auth header.
 * @returns The assembled progress report.
 */
async function runProgressAnalystPipeline(
  lookbackSessions: number,
  plateauThreshold: number,
  req: McpToolRequest
): Promise<ProgressAnalystReport> {
  const runId = randomUUID();
  // slice(7) drops a leading "Bearer " prefix from the Authorization header.
  const bearer = req.headers.authorization?.slice(7);
  const opts = { token: bearer, requestId: req.id };

  // Step 1: collect sessions per agent.
  req.log.info(
    { runId, stepId: 'collect', lookbackSessions },
    'SessionMetricsCollectorAgent start'
  );
  const { agents } = await jarvisAgentsList({ limit: 50 }, opts).catch(() => ({
    agents: [] as JarvisAgentDoc[],
    total: 0,
  }));
  const pending = agents.map(agent => collectAgentMetrics(agent, lookbackSessions, opts));
  const collected = await Promise.all(pending);
  req.log.info(
    { runId, stepId: 'collect', agentCount: collected.length },
    'SessionMetricsCollectorAgent done'
  );

  // Step 2: detect plateaus and build a recommendation per agent.
  req.log.info(
    { runId, stepId: 'plateau', agentCount: collected.length },
    'PlateauDetectorAgent start'
  );
  const analyses: AgentProgressAnalysis[] = [];
  for (const { agent, sessions } of collected) {
    const trends = detectPlateaus(sessions, plateauThreshold);

    const plateauedSkills: string[] = [];
    for (const trend of trends) {
      if (trend.isPlateaued) plateauedSkills.push(trend.skill);
    }

    const completedCount = sessions.filter(s => s.status === 'completed').length;
    const recommendation = buildRecommendation(agent, trends, completedCount);

    analyses.push({
      agentId: agent.id,
      agentName: agent.name,
      sessionsAnalyzed: sessions.length,
      skills: trends,
      plateauedSkills,
      recommendation,
    });
  }
  const plateauCount = analyses.filter(a => a.plateauedSkills.length > 0).length;
  req.log.info({ runId, stepId: 'plateau', plateauCount }, 'PlateauDetectorAgent done');

  // Step 3: assemble the report.
  req.log.info({ runId, stepId: 'report' }, 'ProgressReportAgent start');
  const report = assembleProgressReport(runId, lookbackSessions, plateauThreshold, analyses);
  req.log.info({ runId, stepId: 'report', summary: report.summary }, 'ProgressReportAgent done');

  return report;
}
|
||||
|
||||
// ── MCP tool registration ─────────────────────────────────────────────────────
|
||||
|
||||
// Registers the admin-only `jarvis.progress.analyze` tool. `lookbackSessions` is
// coerced to an integer in [3, 50] (default 10); `plateauThreshold` is coerced
// to a number in [0, 0.5] (default 0.05).
registerTool({
  name: 'jarvis.progress.analyze',
  description:
    'A2A pipeline: analyzes skill progression across all JarvisJr coaching agents. Collects skillMetrics from recent sessions, detects stagnation/plateaus, and recommends difficulty increases or supplementary agents. Returns a per-agent report with skill trend deltas and actionable recommendations. Requires admin role.',
  requiredRole: 'admin',
  inputSchema: z.object({
    lookbackSessions: z.coerce.number().int().min(3).max(50).default(10).describe(
      'Number of most recent sessions to analyze per agent (default 10)'
    ),
    plateauThreshold: z.coerce.number().min(0).max(0.5).default(0.05).describe(
      'Max skill score delta to classify as plateaued (default 0.05 = 5% change)'
    ),
  }),
  async execute(args, req) {
    const { lookbackSessions, plateauThreshold } = args;
    return runProgressAnalystPipeline(lookbackSessions, plateauThreshold, req);
  },
});
|
||||
@ -0,0 +1,251 @@
|
||||
/**
|
||||
* STTFallbackMonitorAgent — A2A pipeline for LysnrAI offline STT fallback monitoring.
|
||||
*
|
||||
* Agent roster (3 steps):
|
||||
* 1. STTEventScannerAgent — query telemetry for stt_completed events, compute online/offline split
|
||||
* 2. DiagnosticsLaunchAgent — if offline rate exceeds threshold, open a diagnostics session
|
||||
* 3. FallbackReportAgent — assemble rate analysis + diagnostics session ID + recommendations
|
||||
*
|
||||
* MCP tools:
|
||||
* lysnrai.stt.monitorFallback(from?, to?, offlineThreshold?) — run pipeline
|
||||
*/
|
||||
|
||||
import { randomUUID } from 'node:crypto';
|
||||
import { z } from 'zod';
|
||||
import { registerTool } from '../tools/registry.js';
|
||||
import type { McpToolRequest } from '../tools/types.js';
|
||||
import {
|
||||
telemetryQuery,
|
||||
diagnosticsCreateSession,
|
||||
diagnosticsUpdateSession,
|
||||
type DebugSession,
|
||||
} from '../../lib/platform-client.js';
|
||||
import { lysnraiSttGetBackendStatus } from '../../lib/lysnrai-client.js';
|
||||
|
||||
// ── Types ──────────────────────────────────────────────────────────────────────
|
||||
|
||||
/** Outcome of scanning `stt_completed` telemetry for one time window. */
interface SttEventScan {
  /** Number of events returned by the telemetry query. */
  totalEvents: number;
  /** Events whose provider is `azure`. */
  azureCount: number;
  /** Events whose provider is `whisper` (the offline fallback). */
  whisperCount: number;
  /** Events whose provider is neither `azure` nor `whisper`. */
  unknownCount: number;
  /** `whisperCount / totalEvents`, or 0 when there are no events. */
  offlineRate: number;
  /** Whether `offlineRate` reached `threshold`. */
  exceedsThreshold: boolean;
  /** Offline-rate threshold the scan was run with. */
  threshold: number;
  /** Provider reported by the backend status lookup, or null when unavailable. */
  currentBackend: string | null;
}

/** Result of one `lysnrai.stt.monitorFallback` pipeline run. */
export interface STTFallbackReport {
  /** Random UUID identifying this pipeline run (also attached to log entries). */
  runId: string;
  productId: 'lysnrai';
  /** Start of the telemetry window that was scanned. */
  from: string;
  /** End of the telemetry window that was scanned. */
  to: string;
  offlineThreshold: number;
  totalEvents: number;
  azureCount: number;
  whisperCount: number;
  offlineRate: number;
  thresholdExceeded: boolean;
  /** Id of the diagnostics session opened for this run, or null when none was opened. */
  diagnosticsSessionId: string | null;
  currentBackend: string | null;
  /** Human-readable guidance derived from the scan. */
  recommendation: string;
  /** ISO-8601 timestamp taken when the report was assembled. */
  generatedAt: string;
}
|
||||
|
||||
// ── Step 1: STTEventScannerAgent ──────────────────────────────────────────────
|
||||
|
||||
/**
 * STTEventScannerAgent step: measures how often STT fell back to Whisper.
 *
 * Queries up to 500 `stt_completed` telemetry events for the window and
 * tallies them by provider. The provider is read from
 * `properties.stt_provider`, falling back to a top-level `stt_provider`, and
 * compared case-insensitively. The current backend provider is also looked up.
 * Both lookups are best-effort: a failure leaves the counts at zero or the
 * backend at `null` rather than throwing.
 *
 * The threshold is only considered exceeded when at least one event was
 * observed. Otherwise an empty window or a failed query combined with
 * `offlineThreshold = 0` would satisfy `0 >= 0` and trigger a diagnostics
 * session with nothing to diagnose.
 *
 * @param fromTime Start of the telemetry window (ISO-8601).
 * @param toTime End of the telemetry window (ISO-8601).
 * @param offlineThreshold Offline fraction at or above which the scan alerts.
 * @param opts Auth token and request id forwarded to the clients.
 * @returns Provider tallies, the offline rate and the threshold verdict.
 */
async function scanSttEvents(
  fromTime: string,
  toTime: string,
  offlineThreshold: number,
  opts: { token?: string; requestId?: string }
): Promise<SttEventScan> {
  let totalEvents = 0;
  let azureCount = 0;
  let whisperCount = 0;
  let unknownCount = 0;
  let currentBackend: string | null = null;

  try {
    const result = await telemetryQuery(
      {
        productId: 'lysnrai',
        eventType: 'stt_completed',
        from: fromTime,
        to: toTime,
        limit: 500,
      },
      { ...opts, productId: 'lysnrai' }
    );

    const events = result.events as Array<Record<string, unknown>>;
    totalEvents = events.length;

    for (const event of events) {
      const rawProps = event['properties'];
      const props: Record<string, unknown> =
        rawProps !== null && typeof rawProps === 'object'
          ? (rawProps as Record<string, unknown>)
          : {};
      const provider = String(props['stt_provider'] ?? event['stt_provider'] ?? '').toLowerCase();
      if (provider === 'azure') azureCount++;
      else if (provider === 'whisper') whisperCount++;
      else unknownCount++;
    }
  } catch {
    // best-effort: keep whatever was tallied so far (zeros if the query failed)
  }

  try {
    const status: unknown = await lysnraiSttGetBackendStatus(opts);
    const provider =
      status !== null && typeof status === 'object'
        ? (status as Record<string, unknown>)['provider']
        : undefined;
    // Only accept a genuine string; anything else is reported as unknown (null).
    currentBackend = typeof provider === 'string' ? provider : null;
  } catch {
    // best-effort: backend status is informational only
  }

  const offlineRate = totalEvents > 0 ? whisperCount / totalEvents : 0;
  const exceedsThreshold = totalEvents > 0 && offlineRate >= offlineThreshold;

  return {
    totalEvents,
    azureCount,
    whisperCount,
    unknownCount,
    offlineRate,
    exceedsThreshold,
    threshold: offlineThreshold,
    currentBackend,
  };
}
|
||||
|
||||
// ── Step 2: DiagnosticsLaunchAgent ────────────────────────────────────────────
|
||||
|
||||
/**
 * DiagnosticsLaunchAgent step: opens a debug-level diagnostics session for
 * LysnrAI when the scan reports that the offline threshold was exceeded.
 *
 * Best-effort: a failure to create the session is swallowed.
 *
 * @param scan Result of the STT event scan.
 * @param opts Auth token and request id forwarded to the platform client.
 * @returns The created session, or `null` when none was needed or creation failed.
 */
async function launchDiagnosticsIfNeeded(
  scan: SttEventScan,
  opts: { token?: string; requestId?: string }
): Promise<DebugSession | null> {
  if (scan.exceedsThreshold) {
    try {
      return await diagnosticsCreateSession(
        { productId: 'lysnrai', collectionLevel: 'debug' },
        opts
      );
    } catch {
      // Best-effort: fall through and report that no session was opened.
    }
  }
  return null;
}
|
||||
|
||||
// ── Step 3: FallbackReportAgent ───────────────────────────────────────────────
|
||||
|
||||
/**
 * FallbackReportAgent step: closes the diagnostics session (if any) and builds
 * the final report.
 *
 * When a diagnostics session is supplied it is marked `completed`; a failure
 * of that update is ignored. The recommendation is chosen in this order: no
 * events at all, rate within the threshold, rate over the threshold.
 *
 * @param runId Identifier of the pipeline run.
 * @param scan Result of the STT event scan.
 * @param diagnosticsSession Session opened for this run, or `null`.
 * @param fromTime Start of the scanned window (echoed into the report).
 * @param toTime End of the scanned window (echoed into the report).
 * @param opts Options forwarded to the diagnostics update call.
 * @returns The assembled report, timestamped at call time.
 */
async function buildFallbackReport(
  runId: string,
  scan: SttEventScan,
  diagnosticsSession: DebugSession | null,
  fromTime: string,
  toTime: string,
  opts: { requestId?: string }
): Promise<STTFallbackReport> {
  if (diagnosticsSession !== null && diagnosticsSession !== undefined) {
    await diagnosticsUpdateSession(diagnosticsSession.id, { status: 'completed' }, opts).then(
      () => undefined,
      () => undefined // best-effort: ignore update failures
    );
  }

  const ratePct = (scan.offlineRate * 100).toFixed(1);
  const thresholdPct = (scan.threshold * 100).toFixed(0);

  const recommendation =
    scan.totalEvents === 0
      ? 'No STT events found in the specified window. Expand the time range or verify telemetry is flowing.'
      : scan.exceedsThreshold
        ? `ALERT: ${ratePct}% of STT events used Whisper (offline fallback), exceeding ${thresholdPct}% threshold. Check Azure Speech service connectivity, subscription key validity, and network routing from LysnrAI desktop app.`
        : `Offline fallback rate ${ratePct}% is within acceptable threshold (${thresholdPct}%). Azure Speech SDK is functioning normally.`;

  const {
    threshold: offlineThreshold,
    totalEvents,
    azureCount,
    whisperCount,
    offlineRate,
    exceedsThreshold: thresholdExceeded,
    currentBackend,
  } = scan;

  return {
    runId,
    productId: 'lysnrai',
    from: fromTime,
    to: toTime,
    offlineThreshold,
    totalEvents,
    azureCount,
    whisperCount,
    offlineRate,
    thresholdExceeded,
    diagnosticsSessionId: diagnosticsSession?.id ?? null,
    currentBackend,
    recommendation,
    generatedAt: new Date().toISOString(),
  };
}
|
||||
|
||||
// ── Pipeline runner ────────────────────────────────────────────────────────────
|
||||
|
||||
/**
 * Runs the three-step STT fallback monitoring pipeline (scan, diagnostics, report).
 *
 * The same client options, including the bearer token, are passed to every
 * step. Previously the report step received only the request id, making its
 * diagnostics session update the one client call issued without the token.
 * NOTE(review): presumably that update requires auth like the create call — confirm.
 *
 * @param fromTime Start of the telemetry window (ISO-8601).
 * @param toTime End of the telemetry window (ISO-8601).
 * @param offlineThreshold Offline fraction at or above which the run alerts.
 * @param req Incoming tool request; supplies the logger, request id and auth header.
 * @returns The assembled fallback report.
 */
async function runSTTFallbackMonitorPipeline(
  fromTime: string,
  toTime: string,
  offlineThreshold: number,
  req: McpToolRequest
): Promise<STTFallbackReport> {
  const runId = randomUUID();
  // slice(7) drops a leading "Bearer " prefix from the Authorization header.
  const opts = { token: req.headers.authorization?.slice(7), requestId: req.id };

  req.log.info(
    { runId, stepId: 'scan', fromTime, toTime, offlineThreshold },
    'STTEventScannerAgent start'
  );
  const scan = await scanSttEvents(fromTime, toTime, offlineThreshold, opts);
  req.log.info(
    { runId, stepId: 'scan', totalEvents: scan.totalEvents, offlineRate: scan.offlineRate },
    'STTEventScannerAgent done'
  );

  req.log.info(
    { runId, stepId: 'diagnostics', exceedsThreshold: scan.exceedsThreshold },
    'DiagnosticsLaunchAgent start'
  );
  const diagnosticsSession = await launchDiagnosticsIfNeeded(scan, opts);
  req.log.info(
    { runId, stepId: 'diagnostics', sessionId: diagnosticsSession?.id ?? null },
    'DiagnosticsLaunchAgent done'
  );

  req.log.info({ runId, stepId: 'report' }, 'FallbackReportAgent start');
  const report = await buildFallbackReport(
    runId,
    scan,
    diagnosticsSession,
    fromTime,
    toTime,
    opts
  );
  req.log.info(
    { runId, stepId: 'report', thresholdExceeded: report.thresholdExceeded },
    'FallbackReportAgent done'
  );

  return report;
}
|
||||
|
||||
// ── MCP tool registration ─────────────────────────────────────────────────────
|
||||
|
||||
// Registers the admin-only `lysnrai.stt.monitorFallback` tool.
//
// Window defaults: `to` defaults to the current time and `from` defaults to six
// hours before `to`. Anchoring `from` to `to` (rather than to "now") keeps the
// window valid when the caller supplies only a `to` that lies in the past.
registerTool({
  name: 'lysnrai.stt.monitorFallback',
  description:
    'A2A pipeline: scans LysnrAI STT telemetry events to measure the offline Whisper fallback rate vs Azure Speech SDK. If the offline rate exceeds the threshold, opens a diagnostics session. Returns an analysis report with provider split, current backend status, and actionable recommendations. Requires admin role.',
  requiredRole: 'admin',
  inputSchema: z.object({
    from: z.string().datetime().optional().describe('Start of telemetry window (default: 6h ago)'),
    to: z.string().datetime().optional().describe('End of telemetry window (default: now)'),
    offlineThreshold: z.coerce
      .number()
      .min(0)
      .max(1)
      .default(0.2)
      .describe('Fraction of events using offline Whisper that triggers an alert (default 0.20)'),
  }),
  async execute(args, req) {
    const DEFAULT_WINDOW_MS = 6 * 60 * 60 * 1000;
    const toTime = args.to ?? new Date().toISOString();
    // `toTime` is either schema-validated or freshly generated, so it parses cleanly.
    const fromTime =
      args.from ?? new Date(new Date(toTime).getTime() - DEFAULT_WINDOW_MS).toISOString();
    return runSTTFallbackMonitorPipeline(fromTime, toTime, args.offlineThreshold, req);
  },
});
|
||||
@ -45,6 +45,9 @@ import './modules/a2a/route-safety-pipeline.js';
|
||||
import './modules/a2a/memory-curation-pipeline.js';
|
||||
import './modules/a2a/engagement-pipeline.js';
|
||||
import './modules/a2a/triage-quality-pipeline.js';
|
||||
import './modules/a2a/stt-fallback-monitor-pipeline.js';
|
||||
import './modules/a2a/progress-analyst-pipeline.js';
|
||||
import './modules/a2a/brain-overflow-pipeline.js';
|
||||
import './modules/mindlyst/mindlyst-tools.js';
|
||||
import './modules/lysnrai/lysnrai-tools.js';
|
||||
import './modules/jarvis/jarvis-tools.js';
|
||||
|
||||
Loading…
Reference in New Issue
Block a user