diff --git a/services/mcp-server/src/modules/a2a/engagement-pipeline.ts b/services/mcp-server/src/modules/a2a/engagement-pipeline.ts new file mode 100644 index 00000000..ddcd1bb5 --- /dev/null +++ b/services/mcp-server/src/modules/a2a/engagement-pipeline.ts @@ -0,0 +1,240 @@ +/** + * EngagementAgent — A2A pipeline for NomGap user engagement recovery. + * + * Agent roster (3 steps): + * 1. StreakRiskDetectorAgent — query telemetry for streak_risk events; cross-ref fasting stats + * 2. EngagementTriggerAgent — fire streak_risk push for at-risk users; escalate to weekly_digest if streak = 0 + * 3. EngagementReportAgent — assemble per-user engagement action report + * + * MCP tools: + * nomgap.engagement.recover(userId?, from?, to?) — run pipeline for one or all at-risk users + */ + +import { randomUUID } from 'node:crypto'; +import { z } from 'zod'; +import { registerTool } from '../tools/registry.js'; +import type { McpToolRequest } from '../tools/types.js'; +import { telemetryQuery } from '../../lib/platform-client.js'; +import { nomgapFastingGetStats, nomgapPushFire } from '../../lib/nomgap-client.js'; + +// ── Types ────────────────────────────────────────────────────────────────────── + +type EngagementAction = 'streak_risk' | 'weekly_digest' | 'none'; + +interface RiskSignal { + userId: string; + recentRiskEvents: number; + currentStreak: number | null; + longestFast: number | null; +} + +interface TriggerResult { + userId: string; + action: EngagementAction; + fired: boolean; + skipReason?: string; + error?: string; +} + +export interface EngagementReport { + runId: string; + productId: 'nomgap'; + usersScanned: number; + atRiskCount: number; + zeroStreakCount: number; + fired: number; + skipped: number; + failed: number; + perUser: TriggerResult[]; + summary: string; + generatedAt: string; +} + +// ── Step 1: StreakRiskDetectorAgent ──────────────────────────────────────────── + +async function detectAtRiskUsers( + userIdFilter: string | null, + fromTime: string, + toTime: string, + opts: { token?: string; requestId?: string } +): Promise { + let riskUserIds: string[] = []; + + try { + const result = await telemetryQuery( + { + productId: 'nomgap', + eventType: 'streak_risk', + from: fromTime, + to: toTime, + limit: 50, + }, + { ...opts, productId: 'nomgap' } + ); + + const events = result.events as Array>; + const seen = new Set(); + for (const event of events) { + const uid = (event['userId'] as string) || (event['anonymousId'] as string); + if (uid && !seen.has(uid)) { + seen.add(uid); + riskUserIds.push(uid); + } + } + } catch { + // best-effort + } + + // If caller specified a single userId, target only that user + if (userIdFilter) { + if (!riskUserIds.includes(userIdFilter)) riskUserIds = [userIdFilter]; + else riskUserIds = [userIdFilter]; + } + + const signals: RiskSignal[] = []; + for (const userId of riskUserIds) { + let currentStreak: number | null = null; + let longestFast: number | null = null; + + try { + const stats = await nomgapFastingGetStats({ token: opts.token, requestId: opts.requestId }); + const s = stats as Record; + currentStreak = typeof s['currentStreak'] === 'number' ? s['currentStreak'] : null; + longestFast = typeof s['longestFastHours'] === 'number' ? s['longestFastHours'] : null; + } catch { + // best-effort + } + + signals.push({ + userId, + recentRiskEvents: riskUserIds.filter(id => id === userId).length, + currentStreak, + longestFast, + }); + } + + return signals; +} + +// ── Step 2: EngagementTriggerAgent ──────────────────────────────────────────── + +async function fireTrigger( + signal: RiskSignal, + opts: { token?: string; requestId?: string } +): Promise { + const { userId, currentStreak } = signal; + + // Escalate to weekly_digest if streak has dropped to zero + const action: EngagementAction = currentStreak === 0 ? 'weekly_digest' : 'streak_risk'; + + try { + await nomgapPushFire( + { type: action, userId }, + { token: opts.token, requestId: opts.requestId } + ); + return { userId, action, fired: true }; + } catch (err) { + return { + userId, + action, + fired: false, + error: err instanceof Error ? err.message : String(err), + }; + } +} + +// ── Step 3: EngagementReportAgent ───────────────────────────────────────────── + +function buildEngagementReport( + runId: string, + signals: RiskSignal[], + results: TriggerResult[] +): EngagementReport { + const now = new Date().toISOString(); + const fired = results.filter(r => r.fired).length; + const skipped = results.filter(r => !r.fired && !r.error).length; + const failed = results.filter(r => !!r.error).length; + const zeroStreakCount = signals.filter(s => s.currentStreak === 0).length; + + const summary = + signals.length === 0 + ? 'No at-risk users detected in the specified window.' + : `${fired} engagement push(es) fired for ${signals.length} at-risk user(s). ${zeroStreakCount} escalated to weekly_digest (streak = 0). ${failed} failed.`; + + return { + runId, + productId: 'nomgap', + usersScanned: signals.length, + atRiskCount: signals.length, + zeroStreakCount, + fired, + skipped, + failed, + perUser: results, + summary, + generatedAt: now, + }; +} + +// ── Pipeline runner ──────────────────────────────────────────────────────────── + +async function runEngagementPipeline( + userIdFilter: string | null, + fromTime: string, + toTime: string, + req: McpToolRequest +): Promise { + const runId = randomUUID(); + const opts = { token: req.headers.authorization?.slice(7), requestId: req.id }; + + req.log.info({ runId, stepId: 'detect', userIdFilter }, 'StreakRiskDetectorAgent start'); + const signals = await detectAtRiskUsers(userIdFilter, fromTime, toTime, opts); + req.log.info( + { runId, stepId: 'detect', atRiskCount: signals.length }, + 'StreakRiskDetectorAgent done' + ); + + req.log.info({ runId, stepId: 'trigger', count: signals.length }, 'EngagementTriggerAgent start'); + const results: TriggerResult[] = []; + for (const signal of signals) { + const result = await fireTrigger(signal, opts); + results.push(result); + } + req.log.info( + { runId, stepId: 'trigger', fired: results.filter(r => r.fired).length }, + 'EngagementTriggerAgent done' + ); + + req.log.info({ runId, stepId: 'report' }, 'EngagementReportAgent start'); + const report = buildEngagementReport(runId, signals, results); + req.log.info({ runId, stepId: 'report', summary: report.summary }, 'EngagementReportAgent done'); + + return report; +} + +// ── MCP tool registration ───────────────────────────────────────────────────── + +registerTool({ + name: 'nomgap.engagement.recover', + description: + 'A2A pipeline: detects at-risk NomGap users from streak_risk telemetry events, fires streak_risk push notifications (or weekly_digest for zero-streak users), and returns a per-user engagement action report. Optionally target a single userId. Requires admin role.', + requiredRole: 'admin', + inputSchema: z.object({ + userId: z + .string() + .optional() + .describe('Target a specific user (omit to process all at-risk users from telemetry)'), + from: z + .string() + .datetime() + .optional() + .describe('Start of risk signal window (default: 24h ago)'), + to: z.string().datetime().optional().describe('End of risk signal window (default: now)'), + }), + async execute(args, req) { + const now = new Date(); + const toTime = args.to ?? now.toISOString(); + const fromTime = args.from ?? new Date(now.getTime() - 24 * 60 * 60 * 1000).toISOString(); + return runEngagementPipeline(args.userId ?? null, fromTime, toTime, req); + }, +}); diff --git a/services/mcp-server/src/modules/a2a/memory-curation-pipeline.ts b/services/mcp-server/src/modules/a2a/memory-curation-pipeline.ts new file mode 100644 index 00000000..71cae270 --- /dev/null +++ b/services/mcp-server/src/modules/a2a/memory-curation-pipeline.ts @@ -0,0 +1,225 @@ +/** + * MemoryCurationAgent — A2A pipeline for JarvisJr agent memory hygiene. + * + * Agent roster (3 steps): + * 1. AgentInventoryAgent — list all agents owned by a user + * 2. MemoryScanAgent — per agent: list memories below importance threshold + * 3. CurationReportAgent — prune stale entries + return per-agent curation report + * + * MCP tools: + * jarvis.memory.curate(importanceThreshold?, dryRun?) — run pipeline across all agents + */ + +import { randomUUID } from 'node:crypto'; +import { z } from 'zod'; +import { registerTool } from '../tools/registry.js'; +import type { McpToolRequest } from '../tools/types.js'; +import { + jarvisAgentsList, + jarvisMemoryList, + jarvisMemoryPrune, + type JarvisMemoryDoc, +} from '../../lib/jarvis-client.js'; +import { config } from '../../lib/config.js'; + +// ── Types ────────────────────────────────────────────────────────────────────── + +interface AgentMemoryScan { + agentId: string; + agentName: string; + totalMemories: number; + staleMemories: JarvisMemoryDoc[]; + expiredMemories: JarvisMemoryDoc[]; +} + +interface CurationAction { + agentId: string; + agentName: string; + staleCount: number; + expiredCount: number; + pruned: number; + skipped: boolean; +} + +export interface MemoryCurationReport { + runId: string; + productId: 'jarvisjr'; + importanceThreshold: number; + dryRun: boolean; + agentsScanned: number; + totalStaleFound: number; + totalExpiredFound: number; + totalPruned: number; + perAgent: CurationAction[]; + summary: string; + generatedAt: string; +} + +// ── Step 1: AgentInventoryAgent ──────────────────────────────────────────────── + +async function listAgents(opts: { + token?: string; + requestId?: string; +}): Promise> { + try { + const result = await jarvisAgentsList({ limit: config.QUERY_MAX_LIMIT }, opts); + return result.agents.map(a => ({ id: a.id, name: a.name })); + } catch { + return []; + } +} + +// ── Step 2: MemoryScanAgent ──────────────────────────────────────────────────── + +async function scanAgentMemory( + agentId: string, + agentName: string, + importanceThreshold: number, + opts: { token?: string; requestId?: string } +): Promise { + try { + const result = await jarvisMemoryList(agentId, { limit: config.QUERY_MAX_LIMIT }, opts); + const now = new Date(); + + const staleMemories = result.memories.filter(m => m.importance < importanceThreshold); + const expiredMemories = result.memories.filter(m => m.expiresAt && new Date(m.expiresAt) < now); + + return { + agentId, + agentName, + totalMemories: result.total, + staleMemories, + expiredMemories, + }; + } catch { + return { agentId, agentName, totalMemories: 0, staleMemories: [], expiredMemories: [] }; + } +} + +// ── Step 3: CurationReportAgent ──────────────────────────────────────────────── + +async function curateAgentMemory( + scan: AgentMemoryScan, + dryRun: boolean, + opts: { token?: string; requestId?: string } +): Promise { + const staleCount = scan.staleMemories.length; + const expiredCount = scan.expiredMemories.length; + + if (dryRun || (staleCount === 0 && expiredCount === 0)) { + return { + agentId: scan.agentId, + agentName: scan.agentName, + staleCount, + expiredCount, + pruned: 0, + skipped: dryRun || (staleCount === 0 && expiredCount === 0), + }; + } + + let pruned = 0; + try { + const result = await jarvisMemoryPrune(scan.agentId, opts); + pruned = result.pruned; + } catch { + // best-effort + } + + return { + agentId: scan.agentId, + agentName: scan.agentName, + staleCount, + expiredCount, + pruned, + skipped: false, + }; +} + +// ── Pipeline runner ──────────────────────────────────────────────────────────── + +async function runMemoryCurationPipeline( + importanceThreshold: number, + dryRun: boolean, + req: McpToolRequest +): Promise { + const runId = randomUUID(); + const opts = { token: req.headers.authorization?.slice(7), requestId: req.id }; + + req.log.info( + { runId, stepId: 'inventory', dryRun, importanceThreshold }, + 'AgentInventoryAgent start' + ); + const agents = await listAgents(opts); + req.log.info( + { runId, stepId: 'inventory', agentCount: agents.length }, + 'AgentInventoryAgent done' + ); + + req.log.info({ runId, stepId: 'scan', agentCount: agents.length }, 'MemoryScanAgent start'); + const scans: AgentMemoryScan[] = []; + for (const agent of agents) { + const scan = await scanAgentMemory(agent.id, agent.name, importanceThreshold, opts); + scans.push(scan); + } + req.log.info( + { runId, stepId: 'scan', totalStale: scans.reduce((s, x) => s + x.staleMemories.length, 0) }, + 'MemoryScanAgent done' + ); + + req.log.info({ runId, stepId: 'curate', dryRun }, 'CurationReportAgent start'); + const actions: CurationAction[] = []; + for (const scan of scans) { + const action = await curateAgentMemory(scan, dryRun, opts); + actions.push(action); + } + + const totalStaleFound = actions.reduce((s, a) => s + a.staleCount, 0); + const totalExpiredFound = actions.reduce((s, a) => s + a.expiredCount, 0); + const totalPruned = actions.reduce((s, a) => s + a.pruned, 0); + + const summary = dryRun + ? `DRY RUN: ${totalStaleFound} stale + ${totalExpiredFound} expired memories found across ${agents.length} agents. No pruning performed.` + : totalPruned > 0 + ? `Pruned ${totalPruned} entries across ${actions.filter(a => !a.skipped).length} agents (${totalStaleFound} stale, ${totalExpiredFound} expired found).` + : `No memories required pruning across ${agents.length} agents.`; + + req.log.info({ runId, stepId: 'curate', totalPruned, summary }, 'CurationReportAgent done'); + + return { + runId, + productId: 'jarvisjr', + importanceThreshold, + dryRun, + agentsScanned: agents.length, + totalStaleFound, + totalExpiredFound, + totalPruned, + perAgent: actions, + summary, + generatedAt: new Date().toISOString(), + }; +} + +// ── MCP tool registration ───────────────────────────────────────────────────── + +registerTool({ + name: 'jarvis.memory.curate', + description: + 'A2A pipeline: scans all JarvisJr coaching agents, identifies memories below an importance threshold or past their expiry date, prunes stale entries, and returns a per-agent hygiene report. Use dryRun=true to preview without pruning. Requires admin role.', + requiredRole: 'admin', + inputSchema: z.object({ + importanceThreshold: z.coerce + .number() + .min(0) + .max(1) + .default(0.3) + .describe('Memories with importance below this value are considered stale (default 0.3)'), + dryRun: z + .boolean() + .default(false) + .describe('If true, scan and report without pruning any memories'), + }), + async execute(args, req) { + return runMemoryCurationPipeline(args.importanceThreshold, args.dryRun, req); + }, +}); diff --git a/services/mcp-server/src/modules/a2a/triage-quality-pipeline.ts b/services/mcp-server/src/modules/a2a/triage-quality-pipeline.ts new file mode 100644 index 00000000..cc37ca57 --- /dev/null +++ b/services/mcp-server/src/modules/a2a/triage-quality-pipeline.ts @@ -0,0 +1,252 @@ +/** + * TriageQualityAgent — A2A pipeline for MindLyst memory triage quality improvement. + * + * Agent roster (3 steps): + * 1. LowConfidenceCollectorAgent — list memory items, filter for confidenceScore below threshold + * 2. RetriageAgent — re-run extraction on each low-confidence item + * 3. TriageQualityReportAgent — compare old vs new routing, auto-reassign if brainId changed + * + * MCP tools: + * mindlyst.memory.triageQuality(confidenceThreshold?, dryRun?) — run pipeline + */ + +import { randomUUID } from 'node:crypto'; +import { z } from 'zod'; +import { registerTool } from '../tools/registry.js'; +import type { McpToolRequest } from '../tools/types.js'; +import { + mindlystMemoryList, + mindlystMemoryRetriage, + mindlystMemoryReassign, +} from '../../lib/mindlyst-client.js'; +import { config } from '../../lib/config.js'; + +// ── Types ────────────────────────────────────────────────────────────────────── + +interface LowConfidenceItem { + id: string; + confidenceScore: number; + currentBrainId: string | null; + contentType: string; +} + +interface RetriageResult { + itemId: string; + oldConfidenceScore: number; + newConfidenceScore: number; + oldBrainId: string | null; + newBrainId: string | null; + brainChanged: boolean; + reassigned: boolean; + error?: string; +} + +export interface TriageQualityReport { + runId: string; + productId: 'mindlyst'; + confidenceThreshold: number; + dryRun: boolean; + totalFetched: number; + lowConfidenceCount: number; + retriaged: number; + improved: number; + brainChanges: number; + reassigned: number; + failed: number; + perItem: RetriageResult[]; + summary: string; + generatedAt: string; +} + +// ── Step 1: LowConfidenceCollectorAgent ─────────────────────────────────────── + +async function collectLowConfidenceItems( + confidenceThreshold: number, + opts: { token?: string; requestId?: string } +): Promise<{ items: LowConfidenceItem[]; totalFetched: number }> { + try { + const result = await mindlystMemoryList({ limit: config.QUERY_MAX_LIMIT }, opts); + + const allItems = result.items; + const lowConfidence = allItems + .filter(item => item.triageResult.confidenceScore < confidenceThreshold) + .map(item => ({ + id: item.id, + confidenceScore: item.triageResult.confidenceScore, + currentBrainId: item.brainIds[0] ?? null, + contentType: item.triageResult.contentType, + })); + + return { items: lowConfidence, totalFetched: allItems.length }; + } catch { + return { items: [], totalFetched: 0 }; + } +} + +// ── Step 2: RetriageAgent ───────────────────────────────────────────────────── + +async function retriageItem( + item: LowConfidenceItem, + dryRun: boolean, + opts: { token?: string; requestId?: string } +): Promise { + if (dryRun) { + return { + itemId: item.id, + oldConfidenceScore: item.confidenceScore, + newConfidenceScore: item.confidenceScore, + oldBrainId: item.currentBrainId, + newBrainId: item.currentBrainId, + brainChanged: false, + reassigned: false, + }; + } + + try { + const updated = await mindlystMemoryRetriage(item.id, opts); + const newConfidence = updated.triageResult.confidenceScore; + const newBrainId = updated.triageResult.suggestedBrainId ?? updated.brainIds[0] ?? null; + const brainChanged = newBrainId !== null && newBrainId !== item.currentBrainId; + + let reassigned = false; + if (brainChanged && newBrainId) { + try { + await mindlystMemoryReassign(item.id, newBrainId, opts); + reassigned = true; + } catch { + // best-effort reassignment + } + } + + return { + itemId: item.id, + oldConfidenceScore: item.confidenceScore, + newConfidenceScore: newConfidence, + oldBrainId: item.currentBrainId, + newBrainId, + brainChanged, + reassigned, + }; + } catch (err) { + return { + itemId: item.id, + oldConfidenceScore: item.confidenceScore, + newConfidenceScore: item.confidenceScore, + oldBrainId: item.currentBrainId, + newBrainId: item.currentBrainId, + brainChanged: false, + reassigned: false, + error: err instanceof Error ? err.message : String(err), + }; + } +} + +// ── Step 3: TriageQualityReportAgent ────────────────────────────────────────── + +function buildTriageReport( + runId: string, + confidenceThreshold: number, + dryRun: boolean, + totalFetched: number, + results: RetriageResult[] +): TriageQualityReport { + const now = new Date().toISOString(); + const improved = results.filter( + r => r.newConfidenceScore > r.oldConfidenceScore && !r.error + ).length; + const brainChanges = results.filter(r => r.brainChanged).length; + const reassigned = results.filter(r => r.reassigned).length; + const failed = results.filter(r => !!r.error).length; + + const summary = dryRun + ? `DRY RUN: ${results.length} items below confidence threshold ${confidenceThreshold} found. No retriaging performed.` + : results.length === 0 + ? `All ${totalFetched} memory items meet confidence threshold ${confidenceThreshold}. No action needed.` + : `Retriaged ${results.length - failed}/${results.length} items. ${improved} improved confidence, ${brainChanges} brain routing changes, ${reassigned} items reassigned. ${failed} failed.`; + + return { + runId, + productId: 'mindlyst', + confidenceThreshold, + dryRun, + totalFetched, + lowConfidenceCount: results.length, + retriaged: results.length - failed, + improved, + brainChanges, + reassigned, + failed, + perItem: results, + summary, + generatedAt: now, + }; +} + +// ── Pipeline runner ──────────────────────────────────────────────────────────── + +async function runTriageQualityPipeline( + confidenceThreshold: number, + dryRun: boolean, + req: McpToolRequest +): Promise { + const runId = randomUUID(); + const opts = { token: req.headers.authorization?.slice(7), requestId: req.id }; + + req.log.info( + { runId, stepId: 'collect', confidenceThreshold, dryRun }, + 'LowConfidenceCollectorAgent start' + ); + const { items, totalFetched } = await collectLowConfidenceItems(confidenceThreshold, opts); + req.log.info( + { runId, stepId: 'collect', totalFetched, lowConfidenceCount: items.length }, + 'LowConfidenceCollectorAgent done' + ); + + req.log.info({ runId, stepId: 'retriage', count: items.length, dryRun }, 'RetriageAgent start'); + const results: RetriageResult[] = []; + for (const item of items) { + const result = await retriageItem(item, dryRun, opts); + results.push(result); + } + req.log.info( + { + runId, + stepId: 'retriage', + improved: results.filter(r => r.newConfidenceScore > r.oldConfidenceScore).length, + }, + 'RetriageAgent done' + ); + + req.log.info({ runId, stepId: 'report' }, 'TriageQualityReportAgent start'); + const report = buildTriageReport(runId, confidenceThreshold, dryRun, totalFetched, results); + req.log.info( + { runId, stepId: 'report', summary: report.summary }, + 'TriageQualityReportAgent done' + ); + + return report; +} + +// ── MCP tool registration ───────────────────────────────────────────────────── + +registerTool({ + name: 'mindlyst.memory.triageQuality', + description: + 'A2A pipeline: finds MindLyst memory items with confidenceScore below a threshold, re-runs extraction on each, and auto-reassigns items where brain routing has changed. Returns a quality improvement report with confidence deltas and brain reassignment counts. Use dryRun=true to preview. Requires admin role.', + requiredRole: 'admin', + inputSchema: z.object({ + confidenceThreshold: z.coerce + .number() + .min(0) + .max(1) + .default(0.5) + .describe('Items with confidenceScore below this are re-triaged (default 0.5)'), + dryRun: z + .boolean() + .default(false) + .describe('If true, collect and count only — do not retriage or reassign'), + }), + async execute(args, req) { + return runTriageQualityPipeline(args.confidenceThreshold, args.dryRun, req); + }, +}); diff --git a/services/mcp-server/src/server.ts b/services/mcp-server/src/server.ts index 92f83266..cc43ed70 100644 --- a/services/mcp-server/src/server.ts +++ b/services/mcp-server/src/server.ts @@ -42,6 +42,9 @@ import './modules/a2a/sync-diagnostics-pipeline.js'; import './modules/a2a/transcript-extraction-pipeline.js'; import './modules/a2a/sync-conflict-pipeline.js'; import './modules/a2a/route-safety-pipeline.js'; +import './modules/a2a/memory-curation-pipeline.js'; +import './modules/a2a/engagement-pipeline.js'; +import './modules/a2a/triage-quality-pipeline.js'; import './modules/mindlyst/mindlyst-tools.js'; import './modules/lysnrai/lysnrai-tools.js'; import './modules/jarvis/jarvis-tools.js';