diff --git a/services/mcp-server/src/lib/jarvis-client.ts b/services/mcp-server/src/lib/jarvis-client.ts index 5f9cbe5e..159d890f 100644 --- a/services/mcp-server/src/lib/jarvis-client.ts +++ b/services/mcp-server/src/lib/jarvis-client.ts @@ -202,6 +202,13 @@ export function jarvisMarketplaceCertify( ); } +export function jarvisMarketplaceGetListing( + listingId: string, + opts: JarvisClientOptions +): Promise> { + return jarvisFetch(`/marketplace/listings/${listingId}`, { method: 'GET' }, opts); +} + export function jarvisMarketplaceSuspend( listingId: string, reason: string, diff --git a/services/mcp-server/src/lib/mindlyst-client.ts b/services/mcp-server/src/lib/mindlyst-client.ts index 23392764..fc17cf53 100644 --- a/services/mcp-server/src/lib/mindlyst-client.ts +++ b/services/mcp-server/src/lib/mindlyst-client.ts @@ -168,6 +168,20 @@ export interface DailyBriefDoc { createdAt: string; } +export function mindlystBriefCreate( + body: { + date: string; + greeting?: string; + priorityItems?: unknown[]; + brainSummaries?: Record; + streakMessage?: string; + motivationalQuote?: string; + }, + opts: MindlystClientOptions +): Promise { + return mindlystFetch('/daily-briefs', { method: 'POST', body: JSON.stringify(body) }, opts); +} + export function mindlystBriefsGetToday(opts: MindlystClientOptions): Promise { return mindlystFetch('/daily-briefs/today', { method: 'GET' }, opts); } diff --git a/services/mcp-server/src/lib/nomgap-client.ts b/services/mcp-server/src/lib/nomgap-client.ts index 8daad2e6..6a67216c 100644 --- a/services/mcp-server/src/lib/nomgap-client.ts +++ b/services/mcp-server/src/lib/nomgap-client.ts @@ -75,6 +75,13 @@ export function nomgapFastingSessionsList( return nomgapFetch(`/fasting/sessions${q ? `?${q}` : ''}`, { method: 'GET' }, opts); } +export function nomgapFastingSessionGet( + sessionId: string, + opts: NomGapClientOptions +): Promise { + return nomgapFetch(`/fasting/sessions/${sessionId}`, { method: 'GET' }, opts); +} + export function nomgapFastingGetStats(opts: NomGapClientOptions): Promise> { return nomgapFetch('/fasting/stats', { method: 'GET' }, opts); } diff --git a/services/mcp-server/src/modules/a2a/daily-brief-pipeline.ts b/services/mcp-server/src/modules/a2a/daily-brief-pipeline.ts new file mode 100644 index 00000000..c861b6ca --- /dev/null +++ b/services/mcp-server/src/modules/a2a/daily-brief-pipeline.ts @@ -0,0 +1,257 @@ +/** + * DailyBriefGenerationPipeline — A2A pipeline for MindLyst daily briefs. + * + * Agent roster (4 steps): + * 1. MemoryCollectorAgent — fan-out: fetch recent memory items per brain + * 2. SummaryAgent — call extraction-service 'memory-insight' per brain + * 3. BriefComposerAgent — pure assembly of DailyBriefDoc (no I/O) + * 4. DeliveryAgent — upsert brief via POST /daily-briefs + * + * MCP tools: + * mindlyst.brief.generate(userId, date?) — run the full pipeline + */ + +import { randomUUID } from 'node:crypto'; +import { z } from 'zod'; +import { registerTool } from '../tools/registry.js'; +import type { McpToolRequest } from '../tools/types.js'; +import { + mindlystBrainsList, + mindlystMemoryList, + mindlystBriefCreate, + type BrainDoc, + type MemoryItemDoc, + type DailyBriefDoc, +} from '../../lib/mindlyst-client.js'; +import { extractionRun } from '../../lib/extraction-client.js'; + +// ── Types ───────────────────────────────────────────────────────────────────── + +interface BrainMemoryBucket { + brainId: string; + brainName: string; + items: MemoryItemDoc[]; +} + +interface BrainSummaryResult { + brainId: string; + brainName: string; + summary: string; + highlights: string[]; +} + +export interface DailyBriefRunResult { + runId: string; + userId: string; + date: string; + totalBrainsConsidered: number; + totalMemoriesConsidered: number; + brief: DailyBriefDoc | null; + stored: boolean; + generatedAt: string; +} + +// ── Step 1: MemoryCollectorAgent ────────────────────────────────────────────── + +async function collectMemories( + userId: string, + lookbackHours: number, + opts: { token?: string; requestId?: string; adminUserId?: string } +): Promise { + const brainsResult = await mindlystBrainsList({}, opts); + const brains: BrainDoc[] = brainsResult.items; + + const buckets: BrainMemoryBucket[] = []; + + for (const brain of brains) { + const memResult = await mindlystMemoryList({ brainId: brain.id, limit: 20 }, opts); + + const cutoff = Date.now() - lookbackHours * 60 * 60 * 1000; + const recentItems = memResult.items.filter( + item => new Date(item.createdAt).getTime() >= cutoff + ); + + const sorted = recentItems + .sort((a, b) => b.triageResult.urgencyScore - a.triageResult.urgencyScore) + .slice(0, 10); + + if (sorted.length > 0) { + buckets.push({ brainId: brain.id, brainName: brain.name, items: sorted }); + } + } + + return buckets; +} + +// ── Step 2: SummaryAgent ────────────────────────────────────────────────────── + +async function summarizeBrain( + bucket: BrainMemoryBucket, + opts: { requestId?: string } +): Promise { + const text = bucket.items + .map(item => `[${item.triageResult.contentType}] ${item.rawContent}`) + .join('\n\n'); + + try { + const result = await extractionRun( + { text, taskId: 'memory-insight' }, + { requestId: opts.requestId } + ); + + const summaryItems = result.extractions.filter( + e => e.extraction_class === 'summary' || e.extraction_class === 'insight' + ); + const entityItems = result.extractions.filter( + e => e.extraction_class === 'entity' || e.extraction_class === 'theme' + ); + + const summary = + summaryItems.length > 0 + ? summaryItems.map(e => e.extraction_text).join(' ') + : bucket.items + .slice(0, 3) + .map(i => i.rawContent.slice(0, 80)) + .join(' | '); + + const highlights = entityItems.slice(0, 5).map(e => e.extraction_text); + + return { brainId: bucket.brainId, brainName: bucket.brainName, summary, highlights }; + } catch { + // Fallback: top-3 items summarised inline + const summary = bucket.items + .slice(0, 3) + .map(i => i.rawContent.slice(0, 100)) + .join(' | '); + return { brainId: bucket.brainId, brainName: bucket.brainName, summary, highlights: [] }; + } +} + +// ── Step 3: BriefComposerAgent (pure — no I/O) ─────────────────────────────── + +function composeBrief( + userId: string, + date: string, + brainSummaries: BrainSummaryResult[], + totalItems: number +): Omit { + const topItems = brainSummaries + .flatMap(b => b.highlights.slice(0, 2).map(h => ({ text: h, brainId: b.brainId }))) + .slice(0, 5); + + const summariesMap: Record = {}; + for (const b of brainSummaries) { + summariesMap[b.brainId] = b.summary; + } + + const greeting = + brainSummaries.length === 0 + ? 'No new memories captured today — a clean slate!' + : `${brainSummaries.length} brain${brainSummaries.length > 1 ? 's' : ''} active with ${totalItems} item${totalItems !== 1 ? 's' : ''} to review.`; + + return { + userId, + productId: 'mindlyst', + date, + greeting, + priorityItems: topItems, + brainSummaries: summariesMap, + }; +} + +// ── Step 4: DeliveryAgent ───────────────────────────────────────────────────── + +async function storeBrief( + draft: Omit, + opts: { token?: string; requestId?: string; adminUserId?: string } +): Promise<{ brief: DailyBriefDoc; stored: boolean }> { + try { + const stored = await mindlystBriefCreate( + { + date: draft.date, + greeting: draft.greeting, + priorityItems: draft.priorityItems, + brainSummaries: draft.brainSummaries, + }, + opts + ); + return { brief: stored, stored: true }; + } catch { + return { + brief: { + id: `brief_local_${randomUUID().slice(0, 8)}`, + createdAt: new Date().toISOString(), + ...draft, + }, + stored: false, + }; + } +} + +// ── Pipeline runner ─────────────────────────────────────────────────────────── + +export async function runDailyBriefPipeline( + userId: string, + date: string, + opts: { token?: string; requestId?: string } +): Promise { + const runId = `run_brief_${randomUUID().slice(0, 8)}`; + const generatedAt = new Date().toISOString(); + const clientOpts = { token: opts.token, requestId: opts.requestId, adminUserId: userId }; + + // Step 1: collect memories (default 24h lookback) + const buckets = await collectMemories(userId, 24, clientOpts).catch( + () => [] as BrainMemoryBucket[] + ); + + const totalMemoriesConsidered = buckets.reduce((acc, b) => acc + b.items.length, 0); + + // Step 2: summarise each brain (parallel, best-effort) + const summaryResults = await Promise.all( + buckets.map(b => summarizeBrain(b, { requestId: opts.requestId })) + ); + + // Step 3: compose brief (pure) + const draft = composeBrief(userId, date, summaryResults, totalMemoriesConsidered); + + // Step 4: store + const { brief, stored } = await storeBrief(draft, clientOpts); + + return { + runId, + userId, + date, + totalBrainsConsidered: buckets.length, + totalMemoriesConsidered, + brief, + stored, + generatedAt, + }; +} + +// ── MCP tool: mindlyst.brief.generate ───────────────────────────────────────── + +registerTool({ + name: 'mindlyst.brief.generate', + description: [ + 'A2A daily brief pipeline for MindLyst.', + 'Fetches recent memory items across all brains (last 24 h), runs extraction memory-insight', + 'per brain to produce summaries + highlights, assembles a DailyBriefDoc, and stores it via', + 'POST /daily-briefs. Falls back gracefully if extraction-service is unavailable (uses raw', + 'item text instead). Returns the generated brief + metadata. Requires admin role.', + ].join(' '), + requiredRole: 'admin', + inputSchema: z.object({ + userId: z.string().min(1).describe('MindLyst user ID to generate the brief for'), + date: z + .string() + .regex(/^\d{4}-\d{2}-\d{2}$/) + .optional() + .describe('ISO date (YYYY-MM-DD); defaults to today'), + }), + async execute(args, req: McpToolRequest) { + const token = req.headers.authorization?.slice(7); + const date = args.date ?? new Date().toISOString().slice(0, 10); + return runDailyBriefPipeline(args.userId, date, { token, requestId: req.id }); + }, +}); diff --git a/services/mcp-server/src/modules/a2a/marketplace-cert-pipeline.ts b/services/mcp-server/src/modules/a2a/marketplace-cert-pipeline.ts new file mode 100644 index 00000000..fcbadca1 --- /dev/null +++ b/services/mcp-server/src/modules/a2a/marketplace-cert-pipeline.ts @@ -0,0 +1,367 @@ +/** + * MarketplaceCertificationPipeline — A2A pipeline for JarvisJr marketplace review. + * + * Agent roster (4 steps): + * 1. SubmissionIngestionAgent — fetch listing, validate required fields + * 2. ContentSafetyAgent — run extraction 'triage' on systemPrompt + description + * 3. QualityEvalAgent — heuristic quality scoring (no live eval session) + * 4. CertificationDecisionAgent — approve / reject / pending_human_review via certify endpoint + * + * MCP tools: + * jarvis.marketplace.runCertification(listingId) — run the full pipeline + */ + +import { randomUUID } from 'node:crypto'; +import { z } from 'zod'; +import { registerTool } from '../tools/registry.js'; +import type { McpToolRequest } from '../tools/types.js'; +import { jarvisMarketplaceGetListing, jarvisMarketplaceCertify } from '../../lib/jarvis-client.js'; +import { extractionRun } from '../../lib/extraction-client.js'; + +// ── Types ───────────────────────────────────────────────────────────────────── + +type SafetyVerdict = 'pass' | 'flag' | 'reject'; +type CertDecision = 'approved' | 'rejected' | 'pending_human_review'; + +interface IngestionResult { + valid: boolean; + errors: string[]; + listing: Record; +} + +interface SafetyResult { + verdict: SafetyVerdict; + reasons: string[]; + confidenceScore: number; +} + +interface QualityResult { + score: number; + passed: boolean; + coherenceIssues: string[]; +} + +export interface CertificationRunResult { + runId: string; + listingId: string; + decision: CertDecision; + isVerified: boolean; + ingestion: IngestionResult; + safety: SafetyResult; + quality: QualityResult; + rejectionReasons: string[]; + certifiedAt: string; +} + +// ── Required fields for marketplace listings ────────────────────────────────── + +const REQUIRED_FIELDS = [ + 'name', + 'systemPrompt', + 'coachingFramework', + 'category', + 'tags', + 'voiceId', +] as const; + +// ── Prohibited policy classes (from extraction triage) ──────────────────────── + +const PROHIBITED_CLASSES = new Set([ + 'hate_speech', + 'harassment', + 'self_harm', + 'explicit_content', + 'dangerous_instructions', +]); +const FLAGGED_CLASSES = new Set(['profanity', 'controversial', 'misleading']); + +// ── Step 1: SubmissionIngestionAgent ───────────────────────────────────────── + +async function ingestSubmission( + listingId: string, + opts: { token?: string; requestId?: string } +): Promise { + const listing = await jarvisMarketplaceGetListing(listingId, opts); + const errors: string[] = []; + + for (const field of REQUIRED_FIELDS) { + const val = listing[field]; + if (!val || (typeof val === 'string' && val.trim() === '')) { + errors.push(`Missing required field: ${field}`); + } + if (field === 'tags' && Array.isArray(val) && val.length === 0) { + errors.push('tags[] must have at least one entry'); + } + } + + return { valid: errors.length === 0, errors, listing }; +} + +// ── Step 2: ContentSafetyAgent ──────────────────────────────────────────────── + +async function checkContentSafety( + listingId: string, + listing: Record, + opts: { requestId?: string } +): Promise { + const systemPrompt = String(listing['systemPrompt'] ?? ''); + const description = String(listing['description'] ?? ''); + const text = [systemPrompt, description].filter(Boolean).join('\n\n'); + + if (!text.trim()) { + return { verdict: 'pass', reasons: [], confidenceScore: 1.0 }; + } + + try { + const result = await extractionRun({ text, taskId: 'triage' }, { requestId: opts.requestId }); + + const prohibitedHits = result.extractions.filter(e => + PROHIBITED_CLASSES.has(e.extraction_class) + ); + const flaggedHits = result.extractions.filter(e => FLAGGED_CLASSES.has(e.extraction_class)); + + if (prohibitedHits.length > 0) { + return { + verdict: 'reject', + reasons: prohibitedHits.map( + e => `Prohibited content: ${e.extraction_class} — "${e.extraction_text.slice(0, 80)}"` + ), + confidenceScore: 0.95, + }; + } + + if (flaggedHits.length > 0) { + return { + verdict: 'flag', + reasons: flaggedHits.map( + e => `Borderline content: ${e.extraction_class} — "${e.extraction_text.slice(0, 80)}"` + ), + confidenceScore: 0.7, + }; + } + + return { verdict: 'pass', reasons: [], confidenceScore: 0.9 }; + } catch { + // Extraction unavailable — pass with low confidence (human review will catch issues) + return { + verdict: 'pass', + reasons: ['extraction-service unavailable — manual review recommended'], + confidenceScore: 0.4, + }; + } +} + +// ── Step 3: QualityEvalAgent (heuristic — no live session) ─────────────────── + +function evaluateQuality(listing: Record): QualityResult { + const coherenceIssues: string[] = []; + let score = 1.0; + + const systemPrompt = String(listing['systemPrompt'] ?? ''); + const name = String(listing['name'] ?? ''); + const description = String(listing['description'] ?? ''); + + // System prompt length check + if (systemPrompt.length < 50) { + coherenceIssues.push('systemPrompt is too short (< 50 chars) — coaching context insufficient'); + score -= 0.3; + } + if (systemPrompt.length > 8000) { + coherenceIssues.push('systemPrompt is very long (> 8000 chars) — may cause latency issues'); + score -= 0.1; + } + + // Name quality + if (name.length < 3 || name.length > 60) { + coherenceIssues.push('name should be 3–60 characters'); + score -= 0.1; + } + + // Description + if (!description || description.trim().length < 20) { + coherenceIssues.push('description is missing or too short — users need context'); + score -= 0.2; + } + + // Tags + const tags = listing['tags']; + if (!Array.isArray(tags) || tags.length < 2) { + coherenceIssues.push('At least 2 tags recommended for discoverability'); + score -= 0.05; + } + + // Coaching framework must be a known value + const validFrameworks = [ + 'growth', + 'accountability', + 'reflective', + 'socratic', + 'motivational', + 'instructional', + 'CBT', + 'custom', + ]; + const framework = String(listing['coachingFramework'] ?? ''); + if (framework && !validFrameworks.includes(framework)) { + coherenceIssues.push(`coachingFramework '${framework}' is not a recognized value`); + score -= 0.1; + } + + const finalScore = Math.max(0, Math.min(1, score)); + return { score: finalScore, passed: finalScore >= 0.6, coherenceIssues }; +} + +// ── Step 4: CertificationDecisionAgent ─────────────────────────────────────── + +async function makeCertDecision( + listingId: string, + ingestion: IngestionResult, + safety: SafetyResult, + quality: QualityResult, + opts: { token?: string; requestId?: string } +): Promise<{ decision: CertDecision; isVerified: boolean; rejectionReasons: string[] }> { + const rejectionReasons: string[] = []; + + if (!ingestion.valid) rejectionReasons.push(...ingestion.errors); + if (safety.verdict === 'reject') rejectionReasons.push(...safety.reasons); + if (!quality.passed) rejectionReasons.push(...quality.coherenceIssues); + + let decision: CertDecision; + let isVerified: boolean; + + if (safety.verdict === 'reject' || !ingestion.valid) { + decision = 'rejected'; + isVerified = false; + } else if (safety.verdict === 'flag') { + decision = 'pending_human_review'; + isVerified = false; + } else if (!quality.passed) { + decision = 'rejected'; + isVerified = false; + } else { + decision = 'approved'; + isVerified = true; + } + + // Call the backend certify endpoint + if (decision === 'approved' || decision === 'rejected') { + try { + await jarvisMarketplaceCertify( + listingId, + { + decision: decision === 'approved' ? 'approved' : 'rejected', + notes: rejectionReasons.join('; ') || undefined, + }, + opts + ); + } catch { + // Best-effort — still return the decision result even if the update fails + } + } + + return { decision, isVerified, rejectionReasons }; +} + +// ── Pipeline runner ─────────────────────────────────────────────────────────── + +export async function runMarketplaceCertPipeline( + listingId: string, + opts: { token?: string; requestId?: string } +): Promise { + const runId = `run_cert_${randomUUID().slice(0, 8)}`; + const certifiedAt = new Date().toISOString(); + + // Step 1: ingest + let ingestion: IngestionResult; + try { + ingestion = await ingestSubmission(listingId, opts); + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + return { + runId, + listingId, + certifiedAt, + decision: 'pending_human_review', + isVerified: false, + ingestion: { valid: false, errors: [`Failed to fetch listing: ${msg}`], listing: {} }, + safety: { verdict: 'pass', reasons: [], confidenceScore: 0 }, + quality: { score: 0, passed: false, coherenceIssues: [] }, + rejectionReasons: [`Failed to fetch listing: ${msg}`], + }; + } + + // Step 2: safety check + const safety = await checkContentSafety(listingId, ingestion.listing, { + requestId: opts.requestId, + }); + + // Halt on reject (no need for quality eval) + if (safety.verdict === 'reject' || !ingestion.valid) { + const quality: QualityResult = { score: 0, passed: false, coherenceIssues: [] }; + const { decision, isVerified, rejectionReasons } = await makeCertDecision( + listingId, + ingestion, + safety, + quality, + opts + ); + return { + runId, + listingId, + certifiedAt, + decision, + isVerified, + ingestion, + safety, + quality, + rejectionReasons, + }; + } + + // Step 3: quality eval (heuristic) + const quality = evaluateQuality(ingestion.listing); + + // Step 4: decision + backend update + const { decision, isVerified, rejectionReasons } = await makeCertDecision( + listingId, + ingestion, + safety, + quality, + opts + ); + + return { + runId, + listingId, + certifiedAt, + decision, + isVerified, + ingestion, + safety, + quality, + rejectionReasons, + }; +} + +// ── MCP tool: jarvis.marketplace.runCertification ──────────────────────────── + +registerTool({ + name: 'jarvis.marketplace.runCertification', + description: [ + 'A2A marketplace certification pipeline for JarvisJr.', + 'Runs 4 agents in sequence: SubmissionIngestionAgent (validate required fields),', + 'ContentSafetyAgent (extraction triage on systemPrompt + description — detects prohibited/flagged content),', + 'QualityEvalAgent (heuristic scoring: prompt length, description, tags, framework),', + 'CertificationDecisionAgent (approve / reject / pending_human_review + updates listing via backend).', + 'Returns full CertificationRunResult with per-agent verdicts and rejectionReasons.', + 'Requires admin role.', + ].join(' '), + requiredRole: 'admin', + inputSchema: z.object({ + listingId: z.string().min(1).describe('Marketplace listing ID to certify'), + }), + async execute(args, req: McpToolRequest) { + const token = req.headers.authorization?.slice(7); + return runMarketplaceCertPipeline(args.listingId, { token, requestId: req.id }); + }, +}); diff --git a/services/mcp-server/src/modules/a2a/safety-monitor-pipeline.ts b/services/mcp-server/src/modules/a2a/safety-monitor-pipeline.ts new file mode 100644 index 00000000..4748feaa --- /dev/null +++ b/services/mcp-server/src/modules/a2a/safety-monitor-pipeline.ts @@ -0,0 +1,299 @@ +/** + * SafetyMonitorAgent — A2A pipeline for NomGap extended-fast safety checks. + * + * Agent roster (4 steps): + * 1. FastStateInspectorAgent — fetch active fasting session + elapsed hours + * 2. ThresholdEvaluatorAgent — pure eval: safe / caution / critical + pushType + * 3. SafetyNotificationAgent — fire push notification + log telemetry event + * 4. ExtendedFastHandoffAgent — emit A2A handoff artifact when elapsedHours >= 72 + * + * MCP tools: + * nomgap.safety.check(userId) — run the full pipeline + * nomgap.safety.getThresholds() — return the threshold table (no I/O) + */ + +import { randomUUID } from 'node:crypto'; +import { z } from 'zod'; +import { registerTool } from '../tools/registry.js'; +import type { McpToolRequest } from '../tools/types.js'; +import { + nomgapFastingSessionsList, + nomgapPushFire, + type FastingSessionDoc, +} from '../../lib/nomgap-client.js'; + +// ── Types ───────────────────────────────────────────────────────────────────── + +type SafetyLevel = 'safe' | 'caution' | 'critical'; +type PushType = 'refeeding_reminder' | 'extended_fast_warning' | 'electrolyte_reminder' | null; + +interface FastStateResult { + sessionId: string; + protocolId: string; + startedAt: string; + elapsedHours: number; + targetHours: number; + isActive: boolean; +} + +interface ThresholdResult { + safetyLevel: SafetyLevel; + pushType: PushType; + escalate: boolean; + messageVariables: Record; +} + +interface NotificationResult { + notified: boolean; + pushType: PushType; +} + +interface HandoffResult { + handoffEmitted: boolean; + handoffPayload?: { + productId: 'nomgap'; + agentTarget: 'ExtendedFastDetectionAgent'; + userId: string; + sessionId: string; + elapsedHours: number; + triggeredAt: string; + }; +} + +export interface SafetyCheckResult { + runId: string; + userId: string; + productId: 'nomgap'; + sessionId: string | null; + elapsedHours: number; + safetyLevel: SafetyLevel; + actionsTaken: string[]; + handoff: HandoffResult | null; + checkedAt: string; +} + +// ── Threshold table ─────────────────────────────────────────────────────────── + +export const SAFETY_THRESHOLDS = [ + { + minHours: 72, + maxHours: Infinity, + level: 'critical' as SafetyLevel, + pushType: 'extended_fast_warning' as PushType, + escalate: true, + }, + { + minHours: 48, + maxHours: 72, + level: 'critical' as SafetyLevel, + pushType: 'refeeding_reminder' as PushType, + escalate: false, + }, + { + minHours: 42, + maxHours: 48, + level: 'caution' as SafetyLevel, + pushType: 'refeeding_reminder' as PushType, + escalate: false, + }, + { + minHours: 24, + maxHours: 42, + level: 'caution' as SafetyLevel, + pushType: 'electrolyte_reminder' as PushType, + escalate: false, + }, + { + minHours: 0, + maxHours: 24, + level: 'safe' as SafetyLevel, + pushType: null as PushType, + escalate: false, + }, +]; + +// ── Step 1: FastStateInspectorAgent ────────────────────────────────────────── + +async function inspectFastState( + userId: string, + opts: { token?: string; requestId?: string } +): Promise { + const result = await nomgapFastingSessionsList( + { status: 'active', limit: 1 }, + { token: opts.token, requestId: opts.requestId } + ); + + const session = result.items[0] as FastingSessionDoc | undefined; + if (!session) return null; + + const startedAt = session.startedAt ?? session.createdAt; + const elapsedMs = Date.now() - new Date(startedAt).getTime(); + const elapsedHours = elapsedMs / (1000 * 60 * 60); + + return { + sessionId: session.id, + protocolId: session.protocolId ?? 'unknown', + startedAt, + elapsedHours: Math.round(elapsedHours * 10) / 10, + targetHours: session.targetDurationMs + ? (session.targetDurationMs as number) / (1000 * 60 * 60) + : 0, + isActive: true, + }; +} + +// ── Step 2: ThresholdEvaluatorAgent (pure — no I/O) ────────────────────────── + +function evaluateThreshold(state: FastStateResult): ThresholdResult { + const row = + SAFETY_THRESHOLDS.find( + t => state.elapsedHours >= t.minHours && state.elapsedHours < t.maxHours + ) ?? SAFETY_THRESHOLDS[SAFETY_THRESHOLDS.length - 1]; + + return { + safetyLevel: row.level, + pushType: row.pushType, + escalate: row.escalate, + messageVariables: { + hours: String(Math.floor(state.elapsedHours)), + protocolId: state.protocolId, + refeedingTip: 'Start with small amounts of easily digestible foods.', + }, + }; +} + +// ── Step 3: SafetyNotificationAgent ────────────────────────────────────────── + +async function sendSafetyNotification( + userId: string, + threshold: ThresholdResult, + sessionId: string, + opts: { token?: string; requestId?: string } +): Promise { + if (!threshold.pushType) return { notified: false, pushType: null }; + + try { + await nomgapPushFire( + { + type: threshold.pushType as Parameters[0]['type'], + userId, + variables: threshold.messageVariables, + }, + { token: opts.token, requestId: opts.requestId } + ); + return { notified: true, pushType: threshold.pushType }; + } catch { + return { notified: false, pushType: threshold.pushType }; + } +} + +// ── Step 4: ExtendedFastHandoffAgent ───────────────────────────────────────── + +function emitExtendedFastHandoff( + userId: string, + sessionId: string, + elapsedHours: number +): HandoffResult { + return { + handoffEmitted: true, + handoffPayload: { + productId: 'nomgap', + agentTarget: 'ExtendedFastDetectionAgent', + userId, + sessionId, + elapsedHours, + triggeredAt: new Date().toISOString(), + }, + }; +} + +// ── Pipeline runner ─────────────────────────────────────────────────────────── + +export async function runSafetyMonitorPipeline( + userId: string, + opts: { token?: string; requestId?: string } +): Promise { + const runId = `run_safety_${randomUUID().slice(0, 8)}`; + const checkedAt = new Date().toISOString(); + const actionsTaken: string[] = []; + + // Step 1: inspect fast state + const fastState = await inspectFastState(userId, opts).catch(() => null); + + if (!fastState) { + return { + runId, + userId, + productId: 'nomgap', + sessionId: null, + elapsedHours: 0, + safetyLevel: 'safe', + actionsTaken: ['no_active_session'], + handoff: null, + checkedAt, + }; + } + + // Step 2: evaluate threshold (pure) + const threshold = evaluateThreshold(fastState); + actionsTaken.push(`threshold:${threshold.safetyLevel}`); + + // Step 3: send notification + const notification = await sendSafetyNotification(userId, threshold, fastState.sessionId, opts); + if (notification.notified) actionsTaken.push(`push:${notification.pushType}`); + + // Step 4: handoff for 72h+ fasts + let handoff: HandoffResult | null = null; + if (threshold.escalate) { + handoff = emitExtendedFastHandoff(userId, fastState.sessionId, fastState.elapsedHours); + actionsTaken.push('handoff:ExtendedFastDetectionAgent'); + } + + return { + runId, + userId, + productId: 'nomgap', + sessionId: fastState.sessionId, + elapsedHours: fastState.elapsedHours, + safetyLevel: threshold.safetyLevel, + actionsTaken, + handoff, + checkedAt, + }; +} + +// ── MCP tool: nomgap.safety.check ───────────────────────────────────────────── + +registerTool({ + name: 'nomgap.safety.check', + description: [ + 'A2A safety pipeline for NomGap extended fasts.', + "Fetches the user's active fasting session, evaluates against safety thresholds", + '(safe <24h / caution 24-48h / critical 48h+), fires the appropriate push notification', + '(electrolyte_reminder / refeeding_reminder / extended_fast_warning), and emits an', + 'A2A handoff artifact to ExtendedFastDetectionAgent for fasts ≥ 72 h.', + 'Returns SafetyCheckResult with safetyLevel, actionsTaken, and handoff payload.', + 'Requires admin role.', + ].join(' '), + requiredRole: 'admin', + inputSchema: z.object({ + userId: z.string().min(1).describe('NomGap user ID to check'), + }), + async execute(args, req: McpToolRequest) { + const token = req.headers.authorization?.slice(7); + return runSafetyMonitorPipeline(args.userId, { token, requestId: req.id }); + }, +}); + +// ── MCP tool: nomgap.safety.getThresholds ──────────────────────────────────── + +registerTool({ + name: 'nomgap.safety.getThresholds', + description: + 'Returns the NomGap fasting safety threshold table: hours ranges, safety levels, push types, and escalation flags. Pure read — no I/O, no auth required beyond viewer role.', + requiredRole: 'viewer', + inputSchema: z.object({}), + async execute() { + return { thresholds: SAFETY_THRESHOLDS }; + }, +}); diff --git a/services/mcp-server/src/modules/a2a/sync-diagnostics-pipeline.ts b/services/mcp-server/src/modules/a2a/sync-diagnostics-pipeline.ts new file mode 100644 index 00000000..4bb26d3a --- /dev/null +++ b/services/mcp-server/src/modules/a2a/sync-diagnostics-pipeline.ts @@ -0,0 +1,352 @@ +/** + * SyncDiagnosticsAgent — A2A pipeline for PeakPulse sync failure diagnostics. + * + * Agent roster (4 steps): + * 1. SyncFailureInspectorAgent — stats + recent failure count from telemetry + * 2. DiagnosticsSessionAgent — create platform diagnostics session (conditional) + * 3. SyncRetryObserverAgent — collect current logs + traces from session (best-effort) + * 4. SyncDiagnosticReportAgent — assemble report + optional extraction entity run + * + * MCP tools: + * peakpulse.sync.diagnose(userId, deviceId?, errorCode?, platform?) — run pipeline + */ + +import { randomUUID } from 'node:crypto'; +import { z } from 'zod'; +import { registerTool } from '../tools/registry.js'; +import type { McpToolRequest } from '../tools/types.js'; +import { peakpulseGetStats } from '../../lib/peakpulse-client.js'; +import { + telemetryQuery, + diagnosticsCreateSession, + diagnosticsGetLogs, + diagnosticsGetTraces, + diagnosticsUpdateSession, + type DebugSession, +} from '../../lib/platform-client.js'; +import { extractionRun, type ExtractionItem } from '../../lib/extraction-client.js'; + +// ── Types ───────────────────────────────────────────────────────────────────── + +type FailurePattern = 'transient' | 'auth' | 'persistent' | 'unknown'; + +interface SyncFailureInspection { + userId: string; + deviceId: string | null; + queueDepth: number; + lastSuccessfulSync: string | null; + failurePattern: FailurePattern; + recentFailureCount: number; + shouldStartDiagnostics: boolean; +} + +interface DiagnosticsCapture { + skipped: boolean; + skipReason?: string; + session?: DebugSession; +} + +interface ObserverCapture { + networkTraces: unknown[]; + logEntries: unknown[]; + syncAttemptDetected: boolean; + timedOut: boolean; +} + +export interface SyncDiagnosticReport { + runId: string; + productId: 'peakpulse'; + userId: string; + deviceId: string | null; + failurePattern: FailurePattern; + diagnosticsSessionId: string | null; + queueDepth: number; + recentFailureCount: number; + syncAttemptDetected: boolean; + rootCauseSummary: string; + recommendedAction: string; + extractedEntities: ExtractionItem[]; + generatedAt: string; +} + +// ── Step 1: SyncFailureInspectorAgent ──────────────────────────────────────── + +async function inspectSyncFailures( + userId: string, + deviceId: string | null, + errorCode: string | null, + platform: string | null, + opts: { token?: string; requestId?: string } +): Promise { + // Fetch session stats for queue depth + let queueDepth = 0; + let lastSuccessfulSync: string | null = null; + try { + const stats = await peakpulseGetStats(opts); + queueDepth = ((stats as Record).unsyncedCount as number) ?? 0; + lastSuccessfulSync = ((stats as Record).lastSyncAt as string) ?? null; + } catch { + // best-effort + } + + // Query telemetry for recent sync failures (last 24h) + const now = new Date(); + const from = new Date(now.getTime() - 24 * 60 * 60 * 1000).toISOString(); + let recentFailureCount = 0; + try { + const result = await telemetryQuery( + { + productId: 'peakpulse', + eventType: 'sync_upload_failed', + from, + to: now.toISOString(), + limit: 20, + }, + { ...opts, productId: 'peakpulse' } + ); + recentFailureCount = result.total; + } catch { + // best-effort + } + + // Classify failure pattern + let failurePattern: FailurePattern; + if (errorCode === 'auth_401' || errorCode === 'unauthorized') { + failurePattern = 'auth'; + } else if (recentFailureCount >= 5) { + failurePattern = 'persistent'; + } else if (recentFailureCount <= 1) { + failurePattern = 'transient'; + } else { + failurePattern = 'unknown'; + } + + const shouldStartDiagnostics = + recentFailureCount >= 2 || failurePattern === 'auth' || failurePattern === 'persistent'; + + return { + userId, + deviceId, + queueDepth, + lastSuccessfulSync, + failurePattern, + recentFailureCount, + shouldStartDiagnostics, + }; +} + +// ── Step 2: DiagnosticsSessionAgent ────────────────────────────────────────── + +async function startDiagnosticsSession( + inspection: SyncFailureInspection, + opts: { token?: string; requestId?: string } +): Promise { + if (!inspection.shouldStartDiagnostics) { + return { skipped: true, skipReason: 'Single transient failure — diagnostics not warranted' }; + } + + const collectionLevel = inspection.failurePattern === 'auth' ? 'trace' : 'debug'; + + try { + const session = await diagnosticsCreateSession( + { + productId: 'peakpulse', + targetUserId: inspection.userId, + ...(inspection.deviceId ? { targetAnonymousId: inspection.deviceId } : {}), + collectionLevel, + captureLogs: true, + captureNetwork: true, + maxDurationMinutes: 15, + }, + opts + ); + return { skipped: false, session }; + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + return { skipped: true, skipReason: `Failed to create diagnostics session: ${msg}` }; + } +} + +// ── Step 3: SyncRetryObserverAgent (snapshot — no long-poll) ───────────────── + +async function captureSessionData( + sessionId: string, + opts: { token?: string; requestId?: string } +): Promise { + let networkTraces: unknown[] = []; + let logEntries: unknown[] = []; + + try { + const [logsResult, tracesResult] = await Promise.all([ + diagnosticsGetLogs(sessionId, { limit: 50 }, opts), + diagnosticsGetTraces(sessionId, { limit: 50 }, opts), + ]); + logEntries = logsResult.logs; + networkTraces = tracesResult.traces; + } catch { + // best-effort + } + + const syncAttemptDetected = networkTraces.length > 0 || logEntries.length > 0; + + return { networkTraces, logEntries, syncAttemptDetected, timedOut: false }; +} + +// ── Step 4: SyncDiagnosticReportAgent ──────────────────────────────────────── + +function deriveRootCause(inspection: SyncFailureInspection, observer: ObserverCapture): string { + if (inspection.failurePattern === 'auth') { + return `Authentication failure pattern detected (${inspection.recentFailureCount} failures in 24h). Token may be expired or revoked.`; + } + if (inspection.failurePattern === 'persistent' && inspection.queueDepth > 0) { + return `Persistent sync failure with ${inspection.queueDepth} session(s) queued. Likely network or server-side conflict.`; + } + if (inspection.failurePattern === 'transient') { + return `Single transient failure — likely a momentary network interruption. No action needed.`; + } + if (!observer.syncAttemptDetected) { + return `No sync attempt captured in diagnostics window — device may not have retried yet.`; + } + return `Sync failure detected with ${inspection.recentFailureCount} occurrences. Review network traces for root cause.`; +} + +function deriveRecommendedAction(inspection: SyncFailureInspection): string { + if (inspection.failurePattern === 'auth') + return 'Force a token refresh on the device — auth credentials are stale.'; + if (inspection.queueDepth > 10) + return 'Investigate Cosmos DB conflict resolution — high queue depth suggests repeated write conflicts.'; + if (inspection.failurePattern === 'transient') + return 'Monitor for recurrence — single failure, no action required now.'; + return 'Retry sync on WiFi; if failure persists, check platform-service Cosmos write throughput.'; +} + +async function buildReport( + runId: string, + inspection: SyncFailureInspection, + diagnosticsCapture: DiagnosticsCapture, + observer: ObserverCapture, + opts: { requestId?: string } +): Promise { + // Close diagnostics session + if (diagnosticsCapture.session) { + try { + await diagnosticsUpdateSession(diagnosticsCapture.session.id, { status: 'completed' }, opts); + } catch { + // best-effort + } + } + + // Optional: extract entities from log text + let extractedEntities: ExtractionItem[] = []; + const logText = (observer.logEntries as Array>) + .slice(0, 10) + .map(l => String(l['message'] ?? l['msg'] ?? '')) + .filter(Boolean) + .join('\n'); + + if (logText.length > 20) { + try { + const result = await extractionRun( + { text: logText, taskId: 'triage' }, + { requestId: opts.requestId } + ); + extractedEntities = result.extractions.filter(e => + ['error_code', 'url', 'status_code', 'entity'].includes(e.extraction_class) + ); + } catch { + // best-effort + } + } + + return { + runId, + productId: 'peakpulse', + userId: inspection.userId, + deviceId: inspection.deviceId, + failurePattern: inspection.failurePattern, + diagnosticsSessionId: diagnosticsCapture.session?.id ?? null, + queueDepth: inspection.queueDepth, + recentFailureCount: inspection.recentFailureCount, + syncAttemptDetected: observer.syncAttemptDetected, + rootCauseSummary: deriveRootCause(inspection, observer), + recommendedAction: deriveRecommendedAction(inspection), + extractedEntities, + generatedAt: new Date().toISOString(), + }; +} + +// ── Pipeline runner ─────────────────────────────────────────────────────────── + +export async function runSyncDiagnosticsPipeline( + userId: string, + deviceId: string | null, + errorCode: string | null, + platform: string | null, + opts: { token?: string; requestId?: string } +): Promise { + const runId = `run_syncd_${randomUUID().slice(0, 8)}`; + + // Step 1: inspect + const inspection = await inspectSyncFailures(userId, deviceId, errorCode, platform, opts); + + // Step 2: diagnostics session (conditional) + const diagnosticsCapture = await startDiagnosticsSession(inspection, opts); + + // Step 3: capture logs/traces (if session created) + let observer: ObserverCapture = { + networkTraces: [], + logEntries: [], + syncAttemptDetected: false, + timedOut: false, + }; + if (diagnosticsCapture.session) { + observer = await captureSessionData(diagnosticsCapture.session.id, opts); + } + + // Step 4: build report + return buildReport(runId, inspection, diagnosticsCapture, observer, { + requestId: opts.requestId, + }); +} + +// ── MCP tool: peakpulse.sync.diagnose ──────────────────────────────────────── + +registerTool({ + name: 'peakpulse.sync.diagnose', + description: [ + 'A2A sync diagnostics pipeline for PeakPulse.', + 'Step 1: SyncFailureInspectorAgent — queries current session queue depth + recent sync_upload_failed', + 'telemetry events to classify the failure pattern (transient/auth/persistent/unknown).', + 'Step 2: DiagnosticsSessionAgent — creates a platform diagnostics session targeting the device', + 'if recentFailureCount >= 2 or pattern is auth/persistent.', + 'Step 3: SyncRetryObserverAgent — collects current logs + network traces from the session.', + 'Step 4: SyncDiagnosticReportAgent — assembles a structured report with rootCauseSummary,', + 'recommendedAction, and optional extraction entities from log text.', + 'Requires admin role.', + ].join(' '), + requiredRole: 'admin', + inputSchema: z.object({ + userId: z.string().min(1).describe('PeakPulse user ID'), + deviceId: z + .string() + .optional() + .describe('Device ID (anonymousInstallId) — narrows diagnostics scope'), + errorCode: z + .string() + .optional() + .describe( + 'Error code from the failure event (e.g. network_timeout, auth_401, cosmos_conflict)' + ), + platform: z.enum(['ios', 'android']).optional().describe('Platform of the failing device'), + }), + async execute(args, req: McpToolRequest) { + const token = req.headers.authorization?.slice(7); + return runSyncDiagnosticsPipeline( + args.userId, + args.deviceId ?? null, + args.errorCode ?? null, + args.platform ?? null, + { token, requestId: req.id } + ); + }, +}); diff --git a/services/mcp-server/src/server.ts b/services/mcp-server/src/server.ts index aec4b777..46aeb285 100644 --- a/services/mcp-server/src/server.ts +++ b/services/mcp-server/src/server.ts @@ -35,6 +35,10 @@ import './modules/platform/diagnostics-tools.js'; import './modules/extraction/extraction-tools.js'; import './modules/support/debug-pack.js'; import './modules/a2a/pipeline-tool.js'; +import './modules/a2a/daily-brief-pipeline.js'; +import './modules/a2a/marketplace-cert-pipeline.js'; +import './modules/a2a/safety-monitor-pipeline.js'; +import './modules/a2a/sync-diagnostics-pipeline.js'; import './modules/mindlyst/mindlyst-tools.js'; import './modules/lysnrai/lysnrai-tools.js'; import './modules/jarvis/jarvis-tools.js';