feat(mcp-server): add 4 A2A pipelines — DailyBrief, MarketplaceCert, SafetyMonitor, SyncDiagnostics

- DailyBriefGenerationPipeline (mindlyst.brief.generate): fans out memory items across all brains, runs memory-insight extraction per brain, assembles and stores DailyBriefDoc
- MarketplaceCertificationPipeline (jarvis.marketplace.runCertification): ingestion validation, content safety via triage extraction, heuristic quality scoring, approve/reject/pending_human_review decision
- SafetyMonitorAgent (nomgap.safety.check + nomgap.safety.getThresholds): 5-tier threshold table, push notifications, A2A handoff for 72h+ fasts
- SyncDiagnosticsAgent (peakpulse.sync.diagnose): failure pattern classification from telemetry, conditional platform diagnostics session, log/trace capture, root cause + extraction entity report
- Add 3 missing client functions: mindlystBriefCreate, nomgapFastingSessionGet, jarvisMarketplaceGetListing
- MCP server now at 91 tools across 16 namespaces
This commit is contained in:
saravanakumardb1 2026-03-05 14:22:25 -08:00
parent 26a9868380
commit 654de4ed2e
8 changed files with 1307 additions and 0 deletions

View File

@ -202,6 +202,13 @@ export function jarvisMarketplaceCertify(
);
}
export function jarvisMarketplaceGetListing(
listingId: string,
opts: JarvisClientOptions
): Promise<Record<string, unknown>> {
return jarvisFetch(`/marketplace/listings/${listingId}`, { method: 'GET' }, opts);
}
export function jarvisMarketplaceSuspend(
listingId: string,
reason: string,

View File

@ -168,6 +168,20 @@ export interface DailyBriefDoc {
createdAt: string;
}
export function mindlystBriefCreate(
body: {
date: string;
greeting?: string;
priorityItems?: unknown[];
brainSummaries?: Record<string, string>;
streakMessage?: string;
motivationalQuote?: string;
},
opts: MindlystClientOptions
): Promise<DailyBriefDoc> {
return mindlystFetch('/daily-briefs', { method: 'POST', body: JSON.stringify(body) }, opts);
}
export function mindlystBriefsGetToday(opts: MindlystClientOptions): Promise<DailyBriefDoc> {
return mindlystFetch('/daily-briefs/today', { method: 'GET' }, opts);
}

View File

@ -75,6 +75,13 @@ export function nomgapFastingSessionsList(
return nomgapFetch(`/fasting/sessions${q ? `?${q}` : ''}`, { method: 'GET' }, opts);
}
export function nomgapFastingSessionGet(
sessionId: string,
opts: NomGapClientOptions
): Promise<FastingSessionDoc> {
return nomgapFetch(`/fasting/sessions/${sessionId}`, { method: 'GET' }, opts);
}
export function nomgapFastingGetStats(opts: NomGapClientOptions): Promise<Record<string, unknown>> {
return nomgapFetch('/fasting/stats', { method: 'GET' }, opts);
}

View File

@ -0,0 +1,257 @@
/**
* DailyBriefGenerationPipeline A2A pipeline for MindLyst daily briefs.
*
* Agent roster (4 steps):
* 1. MemoryCollectorAgent fan-out: fetch recent memory items per brain
* 2. SummaryAgent call extraction-service 'memory-insight' per brain
* 3. BriefComposerAgent pure assembly of DailyBriefDoc (no I/O)
* 4. DeliveryAgent upsert brief via POST /daily-briefs
*
* MCP tools:
* mindlyst.brief.generate(userId, date?) run the full pipeline
*/
import { randomUUID } from 'node:crypto';
import { z } from 'zod';
import { registerTool } from '../tools/registry.js';
import type { McpToolRequest } from '../tools/types.js';
import {
mindlystBrainsList,
mindlystMemoryList,
mindlystBriefCreate,
type BrainDoc,
type MemoryItemDoc,
type DailyBriefDoc,
} from '../../lib/mindlyst-client.js';
import { extractionRun } from '../../lib/extraction-client.js';
// ── Types ─────────────────────────────────────────────────────────────────────
interface BrainMemoryBucket {
brainId: string;
brainName: string;
items: MemoryItemDoc[];
}
interface BrainSummaryResult {
brainId: string;
brainName: string;
summary: string;
highlights: string[];
}
export interface DailyBriefRunResult {
runId: string;
userId: string;
date: string;
totalBrainsConsidered: number;
totalMemoriesConsidered: number;
brief: DailyBriefDoc | null;
stored: boolean;
generatedAt: string;
}
// ── Step 1: MemoryCollectorAgent ──────────────────────────────────────────────
async function collectMemories(
userId: string,
lookbackHours: number,
opts: { token?: string; requestId?: string; adminUserId?: string }
): Promise<BrainMemoryBucket[]> {
const brainsResult = await mindlystBrainsList({}, opts);
const brains: BrainDoc[] = brainsResult.items;
const buckets: BrainMemoryBucket[] = [];
for (const brain of brains) {
const memResult = await mindlystMemoryList({ brainId: brain.id, limit: 20 }, opts);
const cutoff = Date.now() - lookbackHours * 60 * 60 * 1000;
const recentItems = memResult.items.filter(
item => new Date(item.createdAt).getTime() >= cutoff
);
const sorted = recentItems
.sort((a, b) => b.triageResult.urgencyScore - a.triageResult.urgencyScore)
.slice(0, 10);
if (sorted.length > 0) {
buckets.push({ brainId: brain.id, brainName: brain.name, items: sorted });
}
}
return buckets;
}
// ── Step 2: SummaryAgent ──────────────────────────────────────────────────────
async function summarizeBrain(
bucket: BrainMemoryBucket,
opts: { requestId?: string }
): Promise<BrainSummaryResult> {
const text = bucket.items
.map(item => `[${item.triageResult.contentType}] ${item.rawContent}`)
.join('\n\n');
try {
const result = await extractionRun(
{ text, taskId: 'memory-insight' },
{ requestId: opts.requestId }
);
const summaryItems = result.extractions.filter(
e => e.extraction_class === 'summary' || e.extraction_class === 'insight'
);
const entityItems = result.extractions.filter(
e => e.extraction_class === 'entity' || e.extraction_class === 'theme'
);
const summary =
summaryItems.length > 0
? summaryItems.map(e => e.extraction_text).join(' ')
: bucket.items
.slice(0, 3)
.map(i => i.rawContent.slice(0, 80))
.join(' | ');
const highlights = entityItems.slice(0, 5).map(e => e.extraction_text);
return { brainId: bucket.brainId, brainName: bucket.brainName, summary, highlights };
} catch {
// Fallback: top-3 items summarised inline
const summary = bucket.items
.slice(0, 3)
.map(i => i.rawContent.slice(0, 100))
.join(' | ');
return { brainId: bucket.brainId, brainName: bucket.brainName, summary, highlights: [] };
}
}
// ── Step 3: BriefComposerAgent (pure — no I/O) ───────────────────────────────
function composeBrief(
userId: string,
date: string,
brainSummaries: BrainSummaryResult[],
totalItems: number
): Omit<DailyBriefDoc, 'id' | 'createdAt'> {
const topItems = brainSummaries
.flatMap(b => b.highlights.slice(0, 2).map(h => ({ text: h, brainId: b.brainId })))
.slice(0, 5);
const summariesMap: Record<string, string> = {};
for (const b of brainSummaries) {
summariesMap[b.brainId] = b.summary;
}
const greeting =
brainSummaries.length === 0
? 'No new memories captured today — a clean slate!'
: `${brainSummaries.length} brain${brainSummaries.length > 1 ? 's' : ''} active with ${totalItems} item${totalItems !== 1 ? 's' : ''} to review.`;
return {
userId,
productId: 'mindlyst',
date,
greeting,
priorityItems: topItems,
brainSummaries: summariesMap,
};
}
// ── Step 4: DeliveryAgent ─────────────────────────────────────────────────────
async function storeBrief(
draft: Omit<DailyBriefDoc, 'id' | 'createdAt'>,
opts: { token?: string; requestId?: string; adminUserId?: string }
): Promise<{ brief: DailyBriefDoc; stored: boolean }> {
try {
const stored = await mindlystBriefCreate(
{
date: draft.date,
greeting: draft.greeting,
priorityItems: draft.priorityItems,
brainSummaries: draft.brainSummaries,
},
opts
);
return { brief: stored, stored: true };
} catch {
return {
brief: {
id: `brief_local_${randomUUID().slice(0, 8)}`,
createdAt: new Date().toISOString(),
...draft,
},
stored: false,
};
}
}
// ── Pipeline runner ───────────────────────────────────────────────────────────
export async function runDailyBriefPipeline(
userId: string,
date: string,
opts: { token?: string; requestId?: string }
): Promise<DailyBriefRunResult> {
const runId = `run_brief_${randomUUID().slice(0, 8)}`;
const generatedAt = new Date().toISOString();
const clientOpts = { token: opts.token, requestId: opts.requestId, adminUserId: userId };
// Step 1: collect memories (default 24h lookback)
const buckets = await collectMemories(userId, 24, clientOpts).catch(
() => [] as BrainMemoryBucket[]
);
const totalMemoriesConsidered = buckets.reduce((acc, b) => acc + b.items.length, 0);
// Step 2: summarise each brain (parallel, best-effort)
const summaryResults = await Promise.all(
buckets.map(b => summarizeBrain(b, { requestId: opts.requestId }))
);
// Step 3: compose brief (pure)
const draft = composeBrief(userId, date, summaryResults, totalMemoriesConsidered);
// Step 4: store
const { brief, stored } = await storeBrief(draft, clientOpts);
return {
runId,
userId,
date,
totalBrainsConsidered: buckets.length,
totalMemoriesConsidered,
brief,
stored,
generatedAt,
};
}
// ── MCP tool: mindlyst.brief.generate ─────────────────────────────────────────
registerTool({
name: 'mindlyst.brief.generate',
description: [
'A2A daily brief pipeline for MindLyst.',
'Fetches recent memory items across all brains (last 24 h), runs extraction memory-insight',
'per brain to produce summaries + highlights, assembles a DailyBriefDoc, and stores it via',
'POST /daily-briefs. Falls back gracefully if extraction-service is unavailable (uses raw',
'item text instead). Returns the generated brief + metadata. Requires admin role.',
].join(' '),
requiredRole: 'admin',
inputSchema: z.object({
userId: z.string().min(1).describe('MindLyst user ID to generate the brief for'),
date: z
.string()
.regex(/^\d{4}-\d{2}-\d{2}$/)
.optional()
.describe('ISO date (YYYY-MM-DD); defaults to today'),
}),
async execute(args, req: McpToolRequest) {
const token = req.headers.authorization?.slice(7);
const date = args.date ?? new Date().toISOString().slice(0, 10);
return runDailyBriefPipeline(args.userId, date, { token, requestId: req.id });
},
});

View File

@ -0,0 +1,367 @@
/**
* MarketplaceCertificationPipeline A2A pipeline for JarvisJr marketplace review.
*
* Agent roster (4 steps):
* 1. SubmissionIngestionAgent fetch listing, validate required fields
* 2. ContentSafetyAgent run extraction 'triage' on systemPrompt + description
* 3. QualityEvalAgent heuristic quality scoring (no live eval session)
* 4. CertificationDecisionAgent approve / reject / pending_human_review via certify endpoint
*
* MCP tools:
* jarvis.marketplace.runCertification(listingId) run the full pipeline
*/
import { randomUUID } from 'node:crypto';
import { z } from 'zod';
import { registerTool } from '../tools/registry.js';
import type { McpToolRequest } from '../tools/types.js';
import { jarvisMarketplaceGetListing, jarvisMarketplaceCertify } from '../../lib/jarvis-client.js';
import { extractionRun } from '../../lib/extraction-client.js';
// ── Types ─────────────────────────────────────────────────────────────────────
type SafetyVerdict = 'pass' | 'flag' | 'reject';
type CertDecision = 'approved' | 'rejected' | 'pending_human_review';
interface IngestionResult {
valid: boolean;
errors: string[];
listing: Record<string, unknown>;
}
interface SafetyResult {
verdict: SafetyVerdict;
reasons: string[];
confidenceScore: number;
}
interface QualityResult {
score: number;
passed: boolean;
coherenceIssues: string[];
}
export interface CertificationRunResult {
runId: string;
listingId: string;
decision: CertDecision;
isVerified: boolean;
ingestion: IngestionResult;
safety: SafetyResult;
quality: QualityResult;
rejectionReasons: string[];
certifiedAt: string;
}
// ── Required fields for marketplace listings ──────────────────────────────────
const REQUIRED_FIELDS = [
'name',
'systemPrompt',
'coachingFramework',
'category',
'tags',
'voiceId',
] as const;
// ── Prohibited policy classes (from extraction triage) ────────────────────────
const PROHIBITED_CLASSES = new Set([
'hate_speech',
'harassment',
'self_harm',
'explicit_content',
'dangerous_instructions',
]);
const FLAGGED_CLASSES = new Set(['profanity', 'controversial', 'misleading']);
// ── Step 1: SubmissionIngestionAgent ─────────────────────────────────────────
async function ingestSubmission(
listingId: string,
opts: { token?: string; requestId?: string }
): Promise<IngestionResult> {
const listing = await jarvisMarketplaceGetListing(listingId, opts);
const errors: string[] = [];
for (const field of REQUIRED_FIELDS) {
const val = listing[field];
if (!val || (typeof val === 'string' && val.trim() === '')) {
errors.push(`Missing required field: ${field}`);
}
if (field === 'tags' && Array.isArray(val) && val.length === 0) {
errors.push('tags[] must have at least one entry');
}
}
return { valid: errors.length === 0, errors, listing };
}
// ── Step 2: ContentSafetyAgent ────────────────────────────────────────────────
async function checkContentSafety(
listingId: string,
listing: Record<string, unknown>,
opts: { requestId?: string }
): Promise<SafetyResult> {
const systemPrompt = String(listing['systemPrompt'] ?? '');
const description = String(listing['description'] ?? '');
const text = [systemPrompt, description].filter(Boolean).join('\n\n');
if (!text.trim()) {
return { verdict: 'pass', reasons: [], confidenceScore: 1.0 };
}
try {
const result = await extractionRun({ text, taskId: 'triage' }, { requestId: opts.requestId });
const prohibitedHits = result.extractions.filter(e =>
PROHIBITED_CLASSES.has(e.extraction_class)
);
const flaggedHits = result.extractions.filter(e => FLAGGED_CLASSES.has(e.extraction_class));
if (prohibitedHits.length > 0) {
return {
verdict: 'reject',
reasons: prohibitedHits.map(
e => `Prohibited content: ${e.extraction_class} — "${e.extraction_text.slice(0, 80)}"`
),
confidenceScore: 0.95,
};
}
if (flaggedHits.length > 0) {
return {
verdict: 'flag',
reasons: flaggedHits.map(
e => `Borderline content: ${e.extraction_class} — "${e.extraction_text.slice(0, 80)}"`
),
confidenceScore: 0.7,
};
}
return { verdict: 'pass', reasons: [], confidenceScore: 0.9 };
} catch {
// Extraction unavailable — pass with low confidence (human review will catch issues)
return {
verdict: 'pass',
reasons: ['extraction-service unavailable — manual review recommended'],
confidenceScore: 0.4,
};
}
}
// ── Step 3: QualityEvalAgent (heuristic — no live session) ───────────────────
function evaluateQuality(listing: Record<string, unknown>): QualityResult {
const coherenceIssues: string[] = [];
let score = 1.0;
const systemPrompt = String(listing['systemPrompt'] ?? '');
const name = String(listing['name'] ?? '');
const description = String(listing['description'] ?? '');
// System prompt length check
if (systemPrompt.length < 50) {
coherenceIssues.push('systemPrompt is too short (< 50 chars) — coaching context insufficient');
score -= 0.3;
}
if (systemPrompt.length > 8000) {
coherenceIssues.push('systemPrompt is very long (> 8000 chars) — may cause latency issues');
score -= 0.1;
}
// Name quality
if (name.length < 3 || name.length > 60) {
coherenceIssues.push('name should be 360 characters');
score -= 0.1;
}
// Description
if (!description || description.trim().length < 20) {
coherenceIssues.push('description is missing or too short — users need context');
score -= 0.2;
}
// Tags
const tags = listing['tags'];
if (!Array.isArray(tags) || tags.length < 2) {
coherenceIssues.push('At least 2 tags recommended for discoverability');
score -= 0.05;
}
// Coaching framework must be a known value
const validFrameworks = [
'growth',
'accountability',
'reflective',
'socratic',
'motivational',
'instructional',
'CBT',
'custom',
];
const framework = String(listing['coachingFramework'] ?? '');
if (framework && !validFrameworks.includes(framework)) {
coherenceIssues.push(`coachingFramework '${framework}' is not a recognized value`);
score -= 0.1;
}
const finalScore = Math.max(0, Math.min(1, score));
return { score: finalScore, passed: finalScore >= 0.6, coherenceIssues };
}
// ── Step 4: CertificationDecisionAgent ───────────────────────────────────────
async function makeCertDecision(
listingId: string,
ingestion: IngestionResult,
safety: SafetyResult,
quality: QualityResult,
opts: { token?: string; requestId?: string }
): Promise<{ decision: CertDecision; isVerified: boolean; rejectionReasons: string[] }> {
const rejectionReasons: string[] = [];
if (!ingestion.valid) rejectionReasons.push(...ingestion.errors);
if (safety.verdict === 'reject') rejectionReasons.push(...safety.reasons);
if (!quality.passed) rejectionReasons.push(...quality.coherenceIssues);
let decision: CertDecision;
let isVerified: boolean;
if (safety.verdict === 'reject' || !ingestion.valid) {
decision = 'rejected';
isVerified = false;
} else if (safety.verdict === 'flag') {
decision = 'pending_human_review';
isVerified = false;
} else if (!quality.passed) {
decision = 'rejected';
isVerified = false;
} else {
decision = 'approved';
isVerified = true;
}
// Call the backend certify endpoint
if (decision === 'approved' || decision === 'rejected') {
try {
await jarvisMarketplaceCertify(
listingId,
{
decision: decision === 'approved' ? 'approved' : 'rejected',
notes: rejectionReasons.join('; ') || undefined,
},
opts
);
} catch {
// Best-effort — still return the decision result even if the update fails
}
}
return { decision, isVerified, rejectionReasons };
}
// ── Pipeline runner ───────────────────────────────────────────────────────────
export async function runMarketplaceCertPipeline(
listingId: string,
opts: { token?: string; requestId?: string }
): Promise<CertificationRunResult> {
const runId = `run_cert_${randomUUID().slice(0, 8)}`;
const certifiedAt = new Date().toISOString();
// Step 1: ingest
let ingestion: IngestionResult;
try {
ingestion = await ingestSubmission(listingId, opts);
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
return {
runId,
listingId,
certifiedAt,
decision: 'pending_human_review',
isVerified: false,
ingestion: { valid: false, errors: [`Failed to fetch listing: ${msg}`], listing: {} },
safety: { verdict: 'pass', reasons: [], confidenceScore: 0 },
quality: { score: 0, passed: false, coherenceIssues: [] },
rejectionReasons: [`Failed to fetch listing: ${msg}`],
};
}
// Step 2: safety check
const safety = await checkContentSafety(listingId, ingestion.listing, {
requestId: opts.requestId,
});
// Halt on reject (no need for quality eval)
if (safety.verdict === 'reject' || !ingestion.valid) {
const quality: QualityResult = { score: 0, passed: false, coherenceIssues: [] };
const { decision, isVerified, rejectionReasons } = await makeCertDecision(
listingId,
ingestion,
safety,
quality,
opts
);
return {
runId,
listingId,
certifiedAt,
decision,
isVerified,
ingestion,
safety,
quality,
rejectionReasons,
};
}
// Step 3: quality eval (heuristic)
const quality = evaluateQuality(ingestion.listing);
// Step 4: decision + backend update
const { decision, isVerified, rejectionReasons } = await makeCertDecision(
listingId,
ingestion,
safety,
quality,
opts
);
return {
runId,
listingId,
certifiedAt,
decision,
isVerified,
ingestion,
safety,
quality,
rejectionReasons,
};
}
// ── MCP tool: jarvis.marketplace.runCertification ────────────────────────────
registerTool({
name: 'jarvis.marketplace.runCertification',
description: [
'A2A marketplace certification pipeline for JarvisJr.',
'Runs 4 agents in sequence: SubmissionIngestionAgent (validate required fields),',
'ContentSafetyAgent (extraction triage on systemPrompt + description — detects prohibited/flagged content),',
'QualityEvalAgent (heuristic scoring: prompt length, description, tags, framework),',
'CertificationDecisionAgent (approve / reject / pending_human_review + updates listing via backend).',
'Returns full CertificationRunResult with per-agent verdicts and rejectionReasons.',
'Requires admin role.',
].join(' '),
requiredRole: 'admin',
inputSchema: z.object({
listingId: z.string().min(1).describe('Marketplace listing ID to certify'),
}),
async execute(args, req: McpToolRequest) {
const token = req.headers.authorization?.slice(7);
return runMarketplaceCertPipeline(args.listingId, { token, requestId: req.id });
},
});

View File

@ -0,0 +1,299 @@
/**
* SafetyMonitorAgent A2A pipeline for NomGap extended-fast safety checks.
*
* Agent roster (4 steps):
* 1. FastStateInspectorAgent fetch active fasting session + elapsed hours
* 2. ThresholdEvaluatorAgent pure eval: safe / caution / critical + pushType
* 3. SafetyNotificationAgent fire push notification + log telemetry event
* 4. ExtendedFastHandoffAgent emit A2A handoff artifact when elapsedHours >= 72
*
* MCP tools:
* nomgap.safety.check(userId) run the full pipeline
* nomgap.safety.getThresholds() return the threshold table (no I/O)
*/
import { randomUUID } from 'node:crypto';
import { z } from 'zod';
import { registerTool } from '../tools/registry.js';
import type { McpToolRequest } from '../tools/types.js';
import {
nomgapFastingSessionsList,
nomgapPushFire,
type FastingSessionDoc,
} from '../../lib/nomgap-client.js';
// ── Types ─────────────────────────────────────────────────────────────────────
type SafetyLevel = 'safe' | 'caution' | 'critical';
type PushType = 'refeeding_reminder' | 'extended_fast_warning' | 'electrolyte_reminder' | null;
interface FastStateResult {
sessionId: string;
protocolId: string;
startedAt: string;
elapsedHours: number;
targetHours: number;
isActive: boolean;
}
interface ThresholdResult {
safetyLevel: SafetyLevel;
pushType: PushType;
escalate: boolean;
messageVariables: Record<string, string>;
}
interface NotificationResult {
notified: boolean;
pushType: PushType;
}
interface HandoffResult {
handoffEmitted: boolean;
handoffPayload?: {
productId: 'nomgap';
agentTarget: 'ExtendedFastDetectionAgent';
userId: string;
sessionId: string;
elapsedHours: number;
triggeredAt: string;
};
}
export interface SafetyCheckResult {
runId: string;
userId: string;
productId: 'nomgap';
sessionId: string | null;
elapsedHours: number;
safetyLevel: SafetyLevel;
actionsTaken: string[];
handoff: HandoffResult | null;
checkedAt: string;
}
// ── Threshold table ───────────────────────────────────────────────────────────
export const SAFETY_THRESHOLDS = [
{
minHours: 72,
maxHours: Infinity,
level: 'critical' as SafetyLevel,
pushType: 'extended_fast_warning' as PushType,
escalate: true,
},
{
minHours: 48,
maxHours: 72,
level: 'critical' as SafetyLevel,
pushType: 'refeeding_reminder' as PushType,
escalate: false,
},
{
minHours: 42,
maxHours: 48,
level: 'caution' as SafetyLevel,
pushType: 'refeeding_reminder' as PushType,
escalate: false,
},
{
minHours: 24,
maxHours: 42,
level: 'caution' as SafetyLevel,
pushType: 'electrolyte_reminder' as PushType,
escalate: false,
},
{
minHours: 0,
maxHours: 24,
level: 'safe' as SafetyLevel,
pushType: null as PushType,
escalate: false,
},
];
// ── Step 1: FastStateInspectorAgent ──────────────────────────────────────────
async function inspectFastState(
userId: string,
opts: { token?: string; requestId?: string }
): Promise<FastStateResult | null> {
const result = await nomgapFastingSessionsList(
{ status: 'active', limit: 1 },
{ token: opts.token, requestId: opts.requestId }
);
const session = result.items[0] as FastingSessionDoc | undefined;
if (!session) return null;
const startedAt = session.startedAt ?? session.createdAt;
const elapsedMs = Date.now() - new Date(startedAt).getTime();
const elapsedHours = elapsedMs / (1000 * 60 * 60);
return {
sessionId: session.id,
protocolId: session.protocolId ?? 'unknown',
startedAt,
elapsedHours: Math.round(elapsedHours * 10) / 10,
targetHours: session.targetDurationMs
? (session.targetDurationMs as number) / (1000 * 60 * 60)
: 0,
isActive: true,
};
}
// ── Step 2: ThresholdEvaluatorAgent (pure — no I/O) ──────────────────────────
function evaluateThreshold(state: FastStateResult): ThresholdResult {
const row =
SAFETY_THRESHOLDS.find(
t => state.elapsedHours >= t.minHours && state.elapsedHours < t.maxHours
) ?? SAFETY_THRESHOLDS[SAFETY_THRESHOLDS.length - 1];
return {
safetyLevel: row.level,
pushType: row.pushType,
escalate: row.escalate,
messageVariables: {
hours: String(Math.floor(state.elapsedHours)),
protocolId: state.protocolId,
refeedingTip: 'Start with small amounts of easily digestible foods.',
},
};
}
// ── Step 3: SafetyNotificationAgent ──────────────────────────────────────────
async function sendSafetyNotification(
userId: string,
threshold: ThresholdResult,
sessionId: string,
opts: { token?: string; requestId?: string }
): Promise<NotificationResult> {
if (!threshold.pushType) return { notified: false, pushType: null };
try {
await nomgapPushFire(
{
type: threshold.pushType as Parameters<typeof nomgapPushFire>[0]['type'],
userId,
variables: threshold.messageVariables,
},
{ token: opts.token, requestId: opts.requestId }
);
return { notified: true, pushType: threshold.pushType };
} catch {
return { notified: false, pushType: threshold.pushType };
}
}
// ── Step 4: ExtendedFastHandoffAgent ─────────────────────────────────────────
function emitExtendedFastHandoff(
userId: string,
sessionId: string,
elapsedHours: number
): HandoffResult {
return {
handoffEmitted: true,
handoffPayload: {
productId: 'nomgap',
agentTarget: 'ExtendedFastDetectionAgent',
userId,
sessionId,
elapsedHours,
triggeredAt: new Date().toISOString(),
},
};
}
// ── Pipeline runner ───────────────────────────────────────────────────────────
export async function runSafetyMonitorPipeline(
userId: string,
opts: { token?: string; requestId?: string }
): Promise<SafetyCheckResult> {
const runId = `run_safety_${randomUUID().slice(0, 8)}`;
const checkedAt = new Date().toISOString();
const actionsTaken: string[] = [];
// Step 1: inspect fast state
const fastState = await inspectFastState(userId, opts).catch(() => null);
if (!fastState) {
return {
runId,
userId,
productId: 'nomgap',
sessionId: null,
elapsedHours: 0,
safetyLevel: 'safe',
actionsTaken: ['no_active_session'],
handoff: null,
checkedAt,
};
}
// Step 2: evaluate threshold (pure)
const threshold = evaluateThreshold(fastState);
actionsTaken.push(`threshold:${threshold.safetyLevel}`);
// Step 3: send notification
const notification = await sendSafetyNotification(userId, threshold, fastState.sessionId, opts);
if (notification.notified) actionsTaken.push(`push:${notification.pushType}`);
// Step 4: handoff for 72h+ fasts
let handoff: HandoffResult | null = null;
if (threshold.escalate) {
handoff = emitExtendedFastHandoff(userId, fastState.sessionId, fastState.elapsedHours);
actionsTaken.push('handoff:ExtendedFastDetectionAgent');
}
return {
runId,
userId,
productId: 'nomgap',
sessionId: fastState.sessionId,
elapsedHours: fastState.elapsedHours,
safetyLevel: threshold.safetyLevel,
actionsTaken,
handoff,
checkedAt,
};
}
// ── MCP tool: nomgap.safety.check ─────────────────────────────────────────────
registerTool({
name: 'nomgap.safety.check',
description: [
'A2A safety pipeline for NomGap extended fasts.',
"Fetches the user's active fasting session, evaluates against safety thresholds",
'(safe <24h / caution 24-48h / critical 48h+), fires the appropriate push notification',
'(electrolyte_reminder / refeeding_reminder / extended_fast_warning), and emits an',
'A2A handoff artifact to ExtendedFastDetectionAgent for fasts ≥ 72 h.',
'Returns SafetyCheckResult with safetyLevel, actionsTaken, and handoff payload.',
'Requires admin role.',
].join(' '),
requiredRole: 'admin',
inputSchema: z.object({
userId: z.string().min(1).describe('NomGap user ID to check'),
}),
async execute(args, req: McpToolRequest) {
const token = req.headers.authorization?.slice(7);
return runSafetyMonitorPipeline(args.userId, { token, requestId: req.id });
},
});
// ── MCP tool: nomgap.safety.getThresholds ────────────────────────────────────
registerTool({
name: 'nomgap.safety.getThresholds',
description:
'Returns the NomGap fasting safety threshold table: hours ranges, safety levels, push types, and escalation flags. Pure read — no I/O, no auth required beyond viewer role.',
requiredRole: 'viewer',
inputSchema: z.object({}),
async execute() {
return { thresholds: SAFETY_THRESHOLDS };
},
});

View File

@ -0,0 +1,352 @@
/**
* SyncDiagnosticsAgent A2A pipeline for PeakPulse sync failure diagnostics.
*
* Agent roster (4 steps):
* 1. SyncFailureInspectorAgent stats + recent failure count from telemetry
* 2. DiagnosticsSessionAgent create platform diagnostics session (conditional)
* 3. SyncRetryObserverAgent collect current logs + traces from session (best-effort)
* 4. SyncDiagnosticReportAgent assemble report + optional extraction entity run
*
* MCP tools:
* peakpulse.sync.diagnose(userId, deviceId?, errorCode?, platform?) run pipeline
*/
import { randomUUID } from 'node:crypto';
import { z } from 'zod';
import { registerTool } from '../tools/registry.js';
import type { McpToolRequest } from '../tools/types.js';
import { peakpulseGetStats } from '../../lib/peakpulse-client.js';
import {
telemetryQuery,
diagnosticsCreateSession,
diagnosticsGetLogs,
diagnosticsGetTraces,
diagnosticsUpdateSession,
type DebugSession,
} from '../../lib/platform-client.js';
import { extractionRun, type ExtractionItem } from '../../lib/extraction-client.js';
// ── Types ─────────────────────────────────────────────────────────────────────
type FailurePattern = 'transient' | 'auth' | 'persistent' | 'unknown';
interface SyncFailureInspection {
userId: string;
deviceId: string | null;
queueDepth: number;
lastSuccessfulSync: string | null;
failurePattern: FailurePattern;
recentFailureCount: number;
shouldStartDiagnostics: boolean;
}
interface DiagnosticsCapture {
skipped: boolean;
skipReason?: string;
session?: DebugSession;
}
interface ObserverCapture {
networkTraces: unknown[];
logEntries: unknown[];
syncAttemptDetected: boolean;
timedOut: boolean;
}
export interface SyncDiagnosticReport {
runId: string;
productId: 'peakpulse';
userId: string;
deviceId: string | null;
failurePattern: FailurePattern;
diagnosticsSessionId: string | null;
queueDepth: number;
recentFailureCount: number;
syncAttemptDetected: boolean;
rootCauseSummary: string;
recommendedAction: string;
extractedEntities: ExtractionItem[];
generatedAt: string;
}
// ── Step 1: SyncFailureInspectorAgent ────────────────────────────────────────
async function inspectSyncFailures(
userId: string,
deviceId: string | null,
errorCode: string | null,
platform: string | null,
opts: { token?: string; requestId?: string }
): Promise<SyncFailureInspection> {
// Fetch session stats for queue depth
let queueDepth = 0;
let lastSuccessfulSync: string | null = null;
try {
const stats = await peakpulseGetStats(opts);
queueDepth = ((stats as Record<string, unknown>).unsyncedCount as number) ?? 0;
lastSuccessfulSync = ((stats as Record<string, unknown>).lastSyncAt as string) ?? null;
} catch {
// best-effort
}
// Query telemetry for recent sync failures (last 24h)
const now = new Date();
const from = new Date(now.getTime() - 24 * 60 * 60 * 1000).toISOString();
let recentFailureCount = 0;
try {
const result = await telemetryQuery(
{
productId: 'peakpulse',
eventType: 'sync_upload_failed',
from,
to: now.toISOString(),
limit: 20,
},
{ ...opts, productId: 'peakpulse' }
);
recentFailureCount = result.total;
} catch {
// best-effort
}
// Classify failure pattern
let failurePattern: FailurePattern;
if (errorCode === 'auth_401' || errorCode === 'unauthorized') {
failurePattern = 'auth';
} else if (recentFailureCount >= 5) {
failurePattern = 'persistent';
} else if (recentFailureCount <= 1) {
failurePattern = 'transient';
} else {
failurePattern = 'unknown';
}
const shouldStartDiagnostics =
recentFailureCount >= 2 || failurePattern === 'auth' || failurePattern === 'persistent';
return {
userId,
deviceId,
queueDepth,
lastSuccessfulSync,
failurePattern,
recentFailureCount,
shouldStartDiagnostics,
};
}
// ── Step 2: DiagnosticsSessionAgent ──────────────────────────────────────────
async function startDiagnosticsSession(
inspection: SyncFailureInspection,
opts: { token?: string; requestId?: string }
): Promise<DiagnosticsCapture> {
if (!inspection.shouldStartDiagnostics) {
return { skipped: true, skipReason: 'Single transient failure — diagnostics not warranted' };
}
const collectionLevel = inspection.failurePattern === 'auth' ? 'trace' : 'debug';
try {
const session = await diagnosticsCreateSession(
{
productId: 'peakpulse',
targetUserId: inspection.userId,
...(inspection.deviceId ? { targetAnonymousId: inspection.deviceId } : {}),
collectionLevel,
captureLogs: true,
captureNetwork: true,
maxDurationMinutes: 15,
},
opts
);
return { skipped: false, session };
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
return { skipped: true, skipReason: `Failed to create diagnostics session: ${msg}` };
}
}
// ── Step 3: SyncRetryObserverAgent (snapshot — no long-poll) ─────────────────
async function captureSessionData(
sessionId: string,
opts: { token?: string; requestId?: string }
): Promise<ObserverCapture> {
let networkTraces: unknown[] = [];
let logEntries: unknown[] = [];
try {
const [logsResult, tracesResult] = await Promise.all([
diagnosticsGetLogs(sessionId, { limit: 50 }, opts),
diagnosticsGetTraces(sessionId, { limit: 50 }, opts),
]);
logEntries = logsResult.logs;
networkTraces = tracesResult.traces;
} catch {
// best-effort
}
const syncAttemptDetected = networkTraces.length > 0 || logEntries.length > 0;
return { networkTraces, logEntries, syncAttemptDetected, timedOut: false };
}
// ── Step 4: SyncDiagnosticReportAgent ────────────────────────────────────────
function deriveRootCause(inspection: SyncFailureInspection, observer: ObserverCapture): string {
if (inspection.failurePattern === 'auth') {
return `Authentication failure pattern detected (${inspection.recentFailureCount} failures in 24h). Token may be expired or revoked.`;
}
if (inspection.failurePattern === 'persistent' && inspection.queueDepth > 0) {
return `Persistent sync failure with ${inspection.queueDepth} session(s) queued. Likely network or server-side conflict.`;
}
if (inspection.failurePattern === 'transient') {
return `Single transient failure — likely a momentary network interruption. No action needed.`;
}
if (!observer.syncAttemptDetected) {
return `No sync attempt captured in diagnostics window — device may not have retried yet.`;
}
return `Sync failure detected with ${inspection.recentFailureCount} occurrences. Review network traces for root cause.`;
}
function deriveRecommendedAction(inspection: SyncFailureInspection): string {
if (inspection.failurePattern === 'auth')
return 'Force a token refresh on the device — auth credentials are stale.';
if (inspection.queueDepth > 10)
return 'Investigate Cosmos DB conflict resolution — high queue depth suggests repeated write conflicts.';
if (inspection.failurePattern === 'transient')
return 'Monitor for recurrence — single failure, no action required now.';
return 'Retry sync on WiFi; if failure persists, check platform-service Cosmos write throughput.';
}
async function buildReport(
runId: string,
inspection: SyncFailureInspection,
diagnosticsCapture: DiagnosticsCapture,
observer: ObserverCapture,
opts: { requestId?: string }
): Promise<SyncDiagnosticReport> {
// Close diagnostics session
if (diagnosticsCapture.session) {
try {
await diagnosticsUpdateSession(diagnosticsCapture.session.id, { status: 'completed' }, opts);
} catch {
// best-effort
}
}
// Optional: extract entities from log text
let extractedEntities: ExtractionItem[] = [];
const logText = (observer.logEntries as Array<Record<string, unknown>>)
.slice(0, 10)
.map(l => String(l['message'] ?? l['msg'] ?? ''))
.filter(Boolean)
.join('\n');
if (logText.length > 20) {
try {
const result = await extractionRun(
{ text: logText, taskId: 'triage' },
{ requestId: opts.requestId }
);
extractedEntities = result.extractions.filter(e =>
['error_code', 'url', 'status_code', 'entity'].includes(e.extraction_class)
);
} catch {
// best-effort
}
}
return {
runId,
productId: 'peakpulse',
userId: inspection.userId,
deviceId: inspection.deviceId,
failurePattern: inspection.failurePattern,
diagnosticsSessionId: diagnosticsCapture.session?.id ?? null,
queueDepth: inspection.queueDepth,
recentFailureCount: inspection.recentFailureCount,
syncAttemptDetected: observer.syncAttemptDetected,
rootCauseSummary: deriveRootCause(inspection, observer),
recommendedAction: deriveRecommendedAction(inspection),
extractedEntities,
generatedAt: new Date().toISOString(),
};
}
// ── Pipeline runner ───────────────────────────────────────────────────────────
export async function runSyncDiagnosticsPipeline(
userId: string,
deviceId: string | null,
errorCode: string | null,
platform: string | null,
opts: { token?: string; requestId?: string }
): Promise<SyncDiagnosticReport> {
const runId = `run_syncd_${randomUUID().slice(0, 8)}`;
// Step 1: inspect
const inspection = await inspectSyncFailures(userId, deviceId, errorCode, platform, opts);
// Step 2: diagnostics session (conditional)
const diagnosticsCapture = await startDiagnosticsSession(inspection, opts);
// Step 3: capture logs/traces (if session created)
let observer: ObserverCapture = {
networkTraces: [],
logEntries: [],
syncAttemptDetected: false,
timedOut: false,
};
if (diagnosticsCapture.session) {
observer = await captureSessionData(diagnosticsCapture.session.id, opts);
}
// Step 4: build report
return buildReport(runId, inspection, diagnosticsCapture, observer, {
requestId: opts.requestId,
});
}
// ── MCP tool: peakpulse.sync.diagnose ────────────────────────────────────────
registerTool({
name: 'peakpulse.sync.diagnose',
description: [
'A2A sync diagnostics pipeline for PeakPulse.',
'Step 1: SyncFailureInspectorAgent — queries current session queue depth + recent sync_upload_failed',
'telemetry events to classify the failure pattern (transient/auth/persistent/unknown).',
'Step 2: DiagnosticsSessionAgent — creates a platform diagnostics session targeting the device',
'if recentFailureCount >= 2 or pattern is auth/persistent.',
'Step 3: SyncRetryObserverAgent — collects current logs + network traces from the session.',
'Step 4: SyncDiagnosticReportAgent — assembles a structured report with rootCauseSummary,',
'recommendedAction, and optional extraction entities from log text.',
'Requires admin role.',
].join(' '),
requiredRole: 'admin',
inputSchema: z.object({
userId: z.string().min(1).describe('PeakPulse user ID'),
deviceId: z
.string()
.optional()
.describe('Device ID (anonymousInstallId) — narrows diagnostics scope'),
errorCode: z
.string()
.optional()
.describe(
'Error code from the failure event (e.g. network_timeout, auth_401, cosmos_conflict)'
),
platform: z.enum(['ios', 'android']).optional().describe('Platform of the failing device'),
}),
async execute(args, req: McpToolRequest) {
const token = req.headers.authorization?.slice(7);
return runSyncDiagnosticsPipeline(
args.userId,
args.deviceId ?? null,
args.errorCode ?? null,
args.platform ?? null,
{ token, requestId: req.id }
);
},
});

View File

@ -35,6 +35,10 @@ import './modules/platform/diagnostics-tools.js';
import './modules/extraction/extraction-tools.js';
import './modules/support/debug-pack.js';
import './modules/a2a/pipeline-tool.js';
import './modules/a2a/daily-brief-pipeline.js';
import './modules/a2a/marketplace-cert-pipeline.js';
import './modules/a2a/safety-monitor-pipeline.js';
import './modules/a2a/sync-diagnostics-pipeline.js';
import './modules/mindlyst/mindlyst-tools.js';
import './modules/lysnrai/lysnrai-tools.js';
import './modules/jarvis/jarvis-tools.js';