fix(triage-quality-pipeline): remove unused MemoryItemDoc import

This commit is contained in:
saravanakumardb1 2026-03-05 16:18:54 -08:00
parent f49099883a
commit e4d489d40c
4 changed files with 720 additions and 0 deletions

View File

@ -0,0 +1,240 @@
/**
* EngagementAgent A2A pipeline for NomGap user engagement recovery.
*
* Agent roster (3 steps):
* 1. StreakRiskDetectorAgent query telemetry for streak_risk events; cross-ref fasting stats
* 2. EngagementTriggerAgent fire streak_risk push for at-risk users; escalate to weekly_digest if streak = 0
* 3. EngagementReportAgent assemble per-user engagement action report
*
* MCP tools:
* nomgap.engagement.recover(userId?, from?, to?) run pipeline for one or all at-risk users
*/
import { randomUUID } from 'node:crypto';
import { z } from 'zod';
import { registerTool } from '../tools/registry.js';
import type { McpToolRequest } from '../tools/types.js';
import { telemetryQuery } from '../../lib/platform-client.js';
import { nomgapFastingGetStats, nomgapPushFire } from '../../lib/nomgap-client.js';
// ── Types ──────────────────────────────────────────────────────────────────────
type EngagementAction = 'streak_risk' | 'weekly_digest' | 'none';
interface RiskSignal {
userId: string;
recentRiskEvents: number;
currentStreak: number | null;
longestFast: number | null;
}
interface TriggerResult {
userId: string;
action: EngagementAction;
fired: boolean;
skipReason?: string;
error?: string;
}
export interface EngagementReport {
runId: string;
productId: 'nomgap';
usersScanned: number;
atRiskCount: number;
zeroStreakCount: number;
fired: number;
skipped: number;
failed: number;
perUser: TriggerResult[];
summary: string;
generatedAt: string;
}
// ── Step 1: StreakRiskDetectorAgent ────────────────────────────────────────────
async function detectAtRiskUsers(
userIdFilter: string | null,
fromTime: string,
toTime: string,
opts: { token?: string; requestId?: string }
): Promise<RiskSignal[]> {
let riskUserIds: string[] = [];
try {
const result = await telemetryQuery(
{
productId: 'nomgap',
eventType: 'streak_risk',
from: fromTime,
to: toTime,
limit: 50,
},
{ ...opts, productId: 'nomgap' }
);
const events = result.events as Array<Record<string, unknown>>;
const seen = new Set<string>();
for (const event of events) {
const uid = (event['userId'] as string) || (event['anonymousId'] as string);
if (uid && !seen.has(uid)) {
seen.add(uid);
riskUserIds.push(uid);
}
}
} catch {
// best-effort
}
// If caller specified a single userId, target only that user
if (userIdFilter) {
if (!riskUserIds.includes(userIdFilter)) riskUserIds = [userIdFilter];
else riskUserIds = [userIdFilter];
}
const signals: RiskSignal[] = [];
for (const userId of riskUserIds) {
let currentStreak: number | null = null;
let longestFast: number | null = null;
try {
const stats = await nomgapFastingGetStats({ token: opts.token, requestId: opts.requestId });
const s = stats as Record<string, unknown>;
currentStreak = typeof s['currentStreak'] === 'number' ? s['currentStreak'] : null;
longestFast = typeof s['longestFastHours'] === 'number' ? s['longestFastHours'] : null;
} catch {
// best-effort
}
signals.push({
userId,
recentRiskEvents: riskUserIds.filter(id => id === userId).length,
currentStreak,
longestFast,
});
}
return signals;
}
// ── Step 2: EngagementTriggerAgent ────────────────────────────────────────────
async function fireTrigger(
signal: RiskSignal,
opts: { token?: string; requestId?: string }
): Promise<TriggerResult> {
const { userId, currentStreak } = signal;
// Escalate to weekly_digest if streak has dropped to zero
const action: EngagementAction = currentStreak === 0 ? 'weekly_digest' : 'streak_risk';
try {
await nomgapPushFire(
{ type: action, userId },
{ token: opts.token, requestId: opts.requestId }
);
return { userId, action, fired: true };
} catch (err) {
return {
userId,
action,
fired: false,
error: err instanceof Error ? err.message : String(err),
};
}
}
// ── Step 3: EngagementReportAgent ─────────────────────────────────────────────
function buildEngagementReport(
runId: string,
signals: RiskSignal[],
results: TriggerResult[]
): EngagementReport {
const now = new Date().toISOString();
const fired = results.filter(r => r.fired).length;
const skipped = results.filter(r => !r.fired && !r.error).length;
const failed = results.filter(r => !!r.error).length;
const zeroStreakCount = signals.filter(s => s.currentStreak === 0).length;
const summary =
signals.length === 0
? 'No at-risk users detected in the specified window.'
: `${fired} engagement push(es) fired for ${signals.length} at-risk user(s). ${zeroStreakCount} escalated to weekly_digest (streak = 0). ${failed} failed.`;
return {
runId,
productId: 'nomgap',
usersScanned: signals.length,
atRiskCount: signals.length,
zeroStreakCount,
fired,
skipped,
failed,
perUser: results,
summary,
generatedAt: now,
};
}
// ── Pipeline runner ────────────────────────────────────────────────────────────
async function runEngagementPipeline(
userIdFilter: string | null,
fromTime: string,
toTime: string,
req: McpToolRequest
): Promise<EngagementReport> {
const runId = randomUUID();
const opts = { token: req.headers.authorization?.slice(7), requestId: req.id };
req.log.info({ runId, stepId: 'detect', userIdFilter }, 'StreakRiskDetectorAgent start');
const signals = await detectAtRiskUsers(userIdFilter, fromTime, toTime, opts);
req.log.info(
{ runId, stepId: 'detect', atRiskCount: signals.length },
'StreakRiskDetectorAgent done'
);
req.log.info({ runId, stepId: 'trigger', count: signals.length }, 'EngagementTriggerAgent start');
const results: TriggerResult[] = [];
for (const signal of signals) {
const result = await fireTrigger(signal, opts);
results.push(result);
}
req.log.info(
{ runId, stepId: 'trigger', fired: results.filter(r => r.fired).length },
'EngagementTriggerAgent done'
);
req.log.info({ runId, stepId: 'report' }, 'EngagementReportAgent start');
const report = buildEngagementReport(runId, signals, results);
req.log.info({ runId, stepId: 'report', summary: report.summary }, 'EngagementReportAgent done');
return report;
}
// ── MCP tool registration ─────────────────────────────────────────────────────
registerTool({
name: 'nomgap.engagement.recover',
description:
'A2A pipeline: detects at-risk NomGap users from streak_risk telemetry events, fires streak_risk push notifications (or weekly_digest for zero-streak users), and returns a per-user engagement action report. Optionally target a single userId. Requires admin role.',
requiredRole: 'admin',
inputSchema: z.object({
userId: z
.string()
.optional()
.describe('Target a specific user (omit to process all at-risk users from telemetry)'),
from: z
.string()
.datetime()
.optional()
.describe('Start of risk signal window (default: 24h ago)'),
to: z.string().datetime().optional().describe('End of risk signal window (default: now)'),
}),
async execute(args, req) {
const now = new Date();
const toTime = args.to ?? now.toISOString();
const fromTime = args.from ?? new Date(now.getTime() - 24 * 60 * 60 * 1000).toISOString();
return runEngagementPipeline(args.userId ?? null, fromTime, toTime, req);
},
});

View File

@ -0,0 +1,225 @@
/**
* MemoryCurationAgent A2A pipeline for JarvisJr agent memory hygiene.
*
* Agent roster (3 steps):
* 1. AgentInventoryAgent list all agents owned by a user
* 2. MemoryScanAgent per agent: list memories below importance threshold
* 3. CurationReportAgent prune stale entries + return per-agent curation report
*
* MCP tools:
* jarvis.memory.curate(importanceThreshold?, dryRun?) run pipeline across all agents
*/
import { randomUUID } from 'node:crypto';
import { z } from 'zod';
import { registerTool } from '../tools/registry.js';
import type { McpToolRequest } from '../tools/types.js';
import {
jarvisAgentsList,
jarvisMemoryList,
jarvisMemoryPrune,
type JarvisMemoryDoc,
} from '../../lib/jarvis-client.js';
import { config } from '../../lib/config.js';
// ── Types ──────────────────────────────────────────────────────────────────────
interface AgentMemoryScan {
agentId: string;
agentName: string;
totalMemories: number;
staleMemories: JarvisMemoryDoc[];
expiredMemories: JarvisMemoryDoc[];
}
interface CurationAction {
agentId: string;
agentName: string;
staleCount: number;
expiredCount: number;
pruned: number;
skipped: boolean;
}
export interface MemoryCurationReport {
runId: string;
productId: 'jarvisjr';
importanceThreshold: number;
dryRun: boolean;
agentsScanned: number;
totalStaleFound: number;
totalExpiredFound: number;
totalPruned: number;
perAgent: CurationAction[];
summary: string;
generatedAt: string;
}
// ── Step 1: AgentInventoryAgent ────────────────────────────────────────────────
async function listAgents(opts: {
token?: string;
requestId?: string;
}): Promise<Array<{ id: string; name: string }>> {
try {
const result = await jarvisAgentsList({ limit: config.QUERY_MAX_LIMIT }, opts);
return result.agents.map(a => ({ id: a.id, name: a.name }));
} catch {
return [];
}
}
// ── Step 2: MemoryScanAgent ────────────────────────────────────────────────────
async function scanAgentMemory(
agentId: string,
agentName: string,
importanceThreshold: number,
opts: { token?: string; requestId?: string }
): Promise<AgentMemoryScan> {
try {
const result = await jarvisMemoryList(agentId, { limit: config.QUERY_MAX_LIMIT }, opts);
const now = new Date();
const staleMemories = result.memories.filter(m => m.importance < importanceThreshold);
const expiredMemories = result.memories.filter(m => m.expiresAt && new Date(m.expiresAt) < now);
return {
agentId,
agentName,
totalMemories: result.total,
staleMemories,
expiredMemories,
};
} catch {
return { agentId, agentName, totalMemories: 0, staleMemories: [], expiredMemories: [] };
}
}
// ── Step 3: CurationReportAgent ────────────────────────────────────────────────
async function curateAgentMemory(
scan: AgentMemoryScan,
dryRun: boolean,
opts: { token?: string; requestId?: string }
): Promise<CurationAction> {
const staleCount = scan.staleMemories.length;
const expiredCount = scan.expiredMemories.length;
if (dryRun || (staleCount === 0 && expiredCount === 0)) {
return {
agentId: scan.agentId,
agentName: scan.agentName,
staleCount,
expiredCount,
pruned: 0,
skipped: dryRun || (staleCount === 0 && expiredCount === 0),
};
}
let pruned = 0;
try {
const result = await jarvisMemoryPrune(scan.agentId, opts);
pruned = result.pruned;
} catch {
// best-effort
}
return {
agentId: scan.agentId,
agentName: scan.agentName,
staleCount,
expiredCount,
pruned,
skipped: false,
};
}
// ── Pipeline runner ────────────────────────────────────────────────────────────
async function runMemoryCurationPipeline(
importanceThreshold: number,
dryRun: boolean,
req: McpToolRequest
): Promise<MemoryCurationReport> {
const runId = randomUUID();
const opts = { token: req.headers.authorization?.slice(7), requestId: req.id };
req.log.info(
{ runId, stepId: 'inventory', dryRun, importanceThreshold },
'AgentInventoryAgent start'
);
const agents = await listAgents(opts);
req.log.info(
{ runId, stepId: 'inventory', agentCount: agents.length },
'AgentInventoryAgent done'
);
req.log.info({ runId, stepId: 'scan', agentCount: agents.length }, 'MemoryScanAgent start');
const scans: AgentMemoryScan[] = [];
for (const agent of agents) {
const scan = await scanAgentMemory(agent.id, agent.name, importanceThreshold, opts);
scans.push(scan);
}
req.log.info(
{ runId, stepId: 'scan', totalStale: scans.reduce((s, x) => s + x.staleMemories.length, 0) },
'MemoryScanAgent done'
);
req.log.info({ runId, stepId: 'curate', dryRun }, 'CurationReportAgent start');
const actions: CurationAction[] = [];
for (const scan of scans) {
const action = await curateAgentMemory(scan, dryRun, opts);
actions.push(action);
}
const totalStaleFound = actions.reduce((s, a) => s + a.staleCount, 0);
const totalExpiredFound = actions.reduce((s, a) => s + a.expiredCount, 0);
const totalPruned = actions.reduce((s, a) => s + a.pruned, 0);
const summary = dryRun
? `DRY RUN: ${totalStaleFound} stale + ${totalExpiredFound} expired memories found across ${agents.length} agents. No pruning performed.`
: totalPruned > 0
? `Pruned ${totalPruned} entries across ${actions.filter(a => !a.skipped).length} agents (${totalStaleFound} stale, ${totalExpiredFound} expired found).`
: `No memories required pruning across ${agents.length} agents.`;
req.log.info({ runId, stepId: 'curate', totalPruned, summary }, 'CurationReportAgent done');
return {
runId,
productId: 'jarvisjr',
importanceThreshold,
dryRun,
agentsScanned: agents.length,
totalStaleFound,
totalExpiredFound,
totalPruned,
perAgent: actions,
summary,
generatedAt: new Date().toISOString(),
};
}
// ── MCP tool registration ─────────────────────────────────────────────────────
registerTool({
name: 'jarvis.memory.curate',
description:
'A2A pipeline: scans all JarvisJr coaching agents, identifies memories below an importance threshold or past their expiry date, prunes stale entries, and returns a per-agent hygiene report. Use dryRun=true to preview without pruning. Requires admin role.',
requiredRole: 'admin',
inputSchema: z.object({
importanceThreshold: z.coerce
.number()
.min(0)
.max(1)
.default(0.3)
.describe('Memories with importance below this value are considered stale (default 0.3)'),
dryRun: z
.boolean()
.default(false)
.describe('If true, scan and report without pruning any memories'),
}),
async execute(args, req) {
return runMemoryCurationPipeline(args.importanceThreshold, args.dryRun, req);
},
});

View File

@ -0,0 +1,252 @@
/**
* TriageQualityAgent A2A pipeline for MindLyst memory triage quality improvement.
*
* Agent roster (3 steps):
* 1. LowConfidenceCollectorAgent list memory items, filter for confidenceScore below threshold
* 2. RetriageAgent re-run extraction on each low-confidence item
* 3. TriageQualityReportAgent compare old vs new routing, auto-reassign if brainId changed
*
* MCP tools:
* mindlyst.memory.triageQuality(confidenceThreshold?, dryRun?) run pipeline
*/
import { randomUUID } from 'node:crypto';
import { z } from 'zod';
import { registerTool } from '../tools/registry.js';
import type { McpToolRequest } from '../tools/types.js';
import {
mindlystMemoryList,
mindlystMemoryRetriage,
mindlystMemoryReassign,
} from '../../lib/mindlyst-client.js';
import { config } from '../../lib/config.js';
// ── Types ──────────────────────────────────────────────────────────────────────
interface LowConfidenceItem {
id: string;
confidenceScore: number;
currentBrainId: string | null;
contentType: string;
}
interface RetriageResult {
itemId: string;
oldConfidenceScore: number;
newConfidenceScore: number;
oldBrainId: string | null;
newBrainId: string | null;
brainChanged: boolean;
reassigned: boolean;
error?: string;
}
export interface TriageQualityReport {
runId: string;
productId: 'mindlyst';
confidenceThreshold: number;
dryRun: boolean;
totalFetched: number;
lowConfidenceCount: number;
retriaged: number;
improved: number;
brainChanges: number;
reassigned: number;
failed: number;
perItem: RetriageResult[];
summary: string;
generatedAt: string;
}
// ── Step 1: LowConfidenceCollectorAgent ───────────────────────────────────────
async function collectLowConfidenceItems(
confidenceThreshold: number,
opts: { token?: string; requestId?: string }
): Promise<{ items: LowConfidenceItem[]; totalFetched: number }> {
try {
const result = await mindlystMemoryList({ limit: config.QUERY_MAX_LIMIT }, opts);
const allItems = result.items;
const lowConfidence = allItems
.filter(item => item.triageResult.confidenceScore < confidenceThreshold)
.map(item => ({
id: item.id,
confidenceScore: item.triageResult.confidenceScore,
currentBrainId: item.brainIds[0] ?? null,
contentType: item.triageResult.contentType,
}));
return { items: lowConfidence, totalFetched: allItems.length };
} catch {
return { items: [], totalFetched: 0 };
}
}
// ── Step 2: RetriageAgent ─────────────────────────────────────────────────────
async function retriageItem(
item: LowConfidenceItem,
dryRun: boolean,
opts: { token?: string; requestId?: string }
): Promise<RetriageResult> {
if (dryRun) {
return {
itemId: item.id,
oldConfidenceScore: item.confidenceScore,
newConfidenceScore: item.confidenceScore,
oldBrainId: item.currentBrainId,
newBrainId: item.currentBrainId,
brainChanged: false,
reassigned: false,
};
}
try {
const updated = await mindlystMemoryRetriage(item.id, opts);
const newConfidence = updated.triageResult.confidenceScore;
const newBrainId = updated.triageResult.suggestedBrainId ?? updated.brainIds[0] ?? null;
const brainChanged = newBrainId !== null && newBrainId !== item.currentBrainId;
let reassigned = false;
if (brainChanged && newBrainId) {
try {
await mindlystMemoryReassign(item.id, newBrainId, opts);
reassigned = true;
} catch {
// best-effort reassignment
}
}
return {
itemId: item.id,
oldConfidenceScore: item.confidenceScore,
newConfidenceScore: newConfidence,
oldBrainId: item.currentBrainId,
newBrainId,
brainChanged,
reassigned,
};
} catch (err) {
return {
itemId: item.id,
oldConfidenceScore: item.confidenceScore,
newConfidenceScore: item.confidenceScore,
oldBrainId: item.currentBrainId,
newBrainId: item.currentBrainId,
brainChanged: false,
reassigned: false,
error: err instanceof Error ? err.message : String(err),
};
}
}
// ── Step 3: TriageQualityReportAgent ──────────────────────────────────────────
function buildTriageReport(
runId: string,
confidenceThreshold: number,
dryRun: boolean,
totalFetched: number,
results: RetriageResult[]
): TriageQualityReport {
const now = new Date().toISOString();
const improved = results.filter(
r => r.newConfidenceScore > r.oldConfidenceScore && !r.error
).length;
const brainChanges = results.filter(r => r.brainChanged).length;
const reassigned = results.filter(r => r.reassigned).length;
const failed = results.filter(r => !!r.error).length;
const summary = dryRun
? `DRY RUN: ${results.length} items below confidence threshold ${confidenceThreshold} found. No retriaging performed.`
: results.length === 0
? `All ${totalFetched} memory items meet confidence threshold ${confidenceThreshold}. No action needed.`
: `Retriaged ${results.length - failed}/${results.length} items. ${improved} improved confidence, ${brainChanges} brain routing changes, ${reassigned} items reassigned. ${failed} failed.`;
return {
runId,
productId: 'mindlyst',
confidenceThreshold,
dryRun,
totalFetched,
lowConfidenceCount: results.length,
retriaged: results.length - failed,
improved,
brainChanges,
reassigned,
failed,
perItem: results,
summary,
generatedAt: now,
};
}
// ── Pipeline runner ────────────────────────────────────────────────────────────
async function runTriageQualityPipeline(
confidenceThreshold: number,
dryRun: boolean,
req: McpToolRequest
): Promise<TriageQualityReport> {
const runId = randomUUID();
const opts = { token: req.headers.authorization?.slice(7), requestId: req.id };
req.log.info(
{ runId, stepId: 'collect', confidenceThreshold, dryRun },
'LowConfidenceCollectorAgent start'
);
const { items, totalFetched } = await collectLowConfidenceItems(confidenceThreshold, opts);
req.log.info(
{ runId, stepId: 'collect', totalFetched, lowConfidenceCount: items.length },
'LowConfidenceCollectorAgent done'
);
req.log.info({ runId, stepId: 'retriage', count: items.length, dryRun }, 'RetriageAgent start');
const results: RetriageResult[] = [];
for (const item of items) {
const result = await retriageItem(item, dryRun, opts);
results.push(result);
}
req.log.info(
{
runId,
stepId: 'retriage',
improved: results.filter(r => r.newConfidenceScore > r.oldConfidenceScore).length,
},
'RetriageAgent done'
);
req.log.info({ runId, stepId: 'report' }, 'TriageQualityReportAgent start');
const report = buildTriageReport(runId, confidenceThreshold, dryRun, totalFetched, results);
req.log.info(
{ runId, stepId: 'report', summary: report.summary },
'TriageQualityReportAgent done'
);
return report;
}
// ── MCP tool registration ─────────────────────────────────────────────────────
registerTool({
name: 'mindlyst.memory.triageQuality',
description:
'A2A pipeline: finds MindLyst memory items with confidenceScore below a threshold, re-runs extraction on each, and auto-reassigns items where brain routing has changed. Returns a quality improvement report with confidence deltas and brain reassignment counts. Use dryRun=true to preview. Requires admin role.',
requiredRole: 'admin',
inputSchema: z.object({
confidenceThreshold: z.coerce
.number()
.min(0)
.max(1)
.default(0.5)
.describe('Items with confidenceScore below this are re-triaged (default 0.5)'),
dryRun: z
.boolean()
.default(false)
.describe('If true, collect and count only — do not retriage or reassign'),
}),
async execute(args, req) {
return runTriageQualityPipeline(args.confidenceThreshold, args.dryRun, req);
},
});

View File

@ -42,6 +42,9 @@ import './modules/a2a/sync-diagnostics-pipeline.js';
import './modules/a2a/transcript-extraction-pipeline.js';
import './modules/a2a/sync-conflict-pipeline.js';
import './modules/a2a/route-safety-pipeline.js';
import './modules/a2a/memory-curation-pipeline.js';
import './modules/a2a/engagement-pipeline.js';
import './modules/a2a/triage-quality-pipeline.js';
import './modules/mindlyst/mindlyst-tools.js';
import './modules/lysnrai/lysnrai-tools.js';
import './modules/jarvis/jarvis-tools.js';