diff --git a/services/mcp-server/src/modules/a2a/calendar-import-pipeline.ts b/services/mcp-server/src/modules/a2a/calendar-import-pipeline.ts new file mode 100644 index 00000000..b1db6d18 --- /dev/null +++ b/services/mcp-server/src/modules/a2a/calendar-import-pipeline.ts @@ -0,0 +1,304 @@ +/** + * CalendarImportAgent — A2A pipeline for ChronoMind calendar event import + conflict detection. + * + * Agent roster (3 steps): + * 1. EventValidatorAgent — validate supplied iCal event objects against ChronoMind timer schema + * 2. ConflictDetectorAgent — compare events against existing timers for time overlaps + * 3. ImportReportAgent — report valid events ready to import, conflicts, and skipped items + * + * MCP tools: + * chronomind.calendar.import(events, dryRun?) — validate + conflict-check a batch of calendar events + */ + +import { randomUUID } from 'node:crypto'; +import { z } from 'zod'; +import { registerTool } from '../tools/registry.js'; +import type { McpToolRequest } from '../tools/types.js'; +import { + chronomindTimersList, + chronomindTimerCreate, + type TimerDoc, +} from '../../lib/chronomind-client.js'; +import { config } from '../../lib/config.js'; + +// ── Input schema for a single calendar event ────────────────────────────────── + +const CalendarEventSchema = z.object({ + uid: z.string().optional(), + summary: z.string().min(1), + dtstart: z.string().datetime({ offset: true }), + dtend: z.string().datetime({ offset: true }).optional(), + durationMinutes: z.coerce.number().int().min(1).optional(), + description: z.string().optional(), + location: z.string().optional(), + rrule: z.string().optional(), +}); + +type CalendarEvent = z.infer; + +// ── Types ────────────────────────────────────────────────────────────────────── + +type ValidationStatus = 'valid' | 'invalid'; +type ConflictStatus = 'no_conflict' | 'conflict' | 'partial_overlap'; + +interface EventValidationResult { + uid: string; + summary: string; + validationStatus: ValidationStatus; + validationError?: string; + targetTime: string; + endTime: string | null; + durationMinutes: number | null; +} + +interface EventWithConflict extends EventValidationResult { + conflictStatus: ConflictStatus; + conflictingTimerIds: string[]; +} + +interface ImportedTimer { + uid: string; + timerId: string; + summary: string; +} + +export interface CalendarImportReport { + runId: string; + productId: 'chronomind'; + dryRun: boolean; + totalEvents: number; + validEvents: number; + invalidEvents: number; + conflictEvents: number; + importedCount: number; + skippedCount: number; + perEvent: EventWithConflict[]; + importedTimers: ImportedTimer[]; + summary: string; + generatedAt: string; +} + +// ── Step 1: EventValidatorAgent ──────────────────────────────────────────────── + +function validateEvent(event: CalendarEvent, index: number): EventValidationResult { + const uid = event.uid ?? `event-${index}`; + + let endTime: string | null = null; + let durationMinutes: number | null = null; + + if (event.dtend) { + endTime = event.dtend; + durationMinutes = Math.round( + (new Date(event.dtend).getTime() - new Date(event.dtstart).getTime()) / 60000 + ); + } else if (event.durationMinutes) { + durationMinutes = event.durationMinutes; + endTime = new Date( + new Date(event.dtstart).getTime() + event.durationMinutes * 60000 + ).toISOString(); + } + + if (durationMinutes !== null && durationMinutes <= 0) { + return { + uid, + summary: event.summary, + validationStatus: 'invalid', + validationError: 'Event end time is before or equal to start time.', + targetTime: event.dtstart, + endTime, + durationMinutes, + }; + } + + const eventDate = new Date(event.dtstart); + if (isNaN(eventDate.getTime())) { + return { + uid, + summary: event.summary, + validationStatus: 'invalid', + validationError: 'Invalid dtstart date format.', + targetTime: event.dtstart, + endTime, + durationMinutes, + }; + } + + return { + uid, + summary: event.summary, + validationStatus: 'valid', + targetTime: event.dtstart, + endTime, + durationMinutes, + }; +} + +// ── Step 2: ConflictDetectorAgent ───────────────────────────────────────────── + +function detectConflict( + event: EventValidationResult, + existingTimers: TimerDoc[] +): EventWithConflict { + const eventStart = new Date(event.targetTime).getTime(); + const eventEnd = event.endTime ? new Date(event.endTime).getTime() : eventStart + 60000; + + const conflictingTimerIds: string[] = []; + let conflictStatus: ConflictStatus = 'no_conflict'; + + for (const timer of existingTimers) { + const timerTarget = (timer as unknown as Record)['targetTime']; + if (!timerTarget || typeof timerTarget !== 'string') continue; + + const timerStart = new Date(timerTarget).getTime(); + const timerDuration = + (((timer as unknown as Record)['durationSeconds'] as number) ?? 0) * 1000; + const timerEnd = timerStart + (timerDuration || 3600000); + + const overlaps = eventStart < timerEnd && eventEnd > timerStart; + if (overlaps) { + conflictingTimerIds.push(timer.id); + const isExact = eventStart === timerStart && eventEnd === timerEnd; + conflictStatus = isExact ? 'conflict' : 'partial_overlap'; + } + } + + return { ...event, conflictStatus, conflictingTimerIds }; +} + +// ── Step 3: ImportReportAgent ───────────────────────────────────────────────── + +async function importValidEvents( + events: EventWithConflict[], + originalEvents: CalendarEvent[], + dryRun: boolean, + opts: { token?: string; requestId?: string } +): Promise { + if (dryRun) return []; + + const importable = events.filter( + e => e.validationStatus === 'valid' && e.conflictStatus === 'no_conflict' + ); + + const imported: ImportedTimer[] = []; + for (const event of importable) { + const orig = originalEvents.find(o => (o.uid ?? '') === event.uid); + try { + const timer = await chronomindTimerCreate( + { + id: randomUUID(), + type: 'alarm', + label: event.summary, + targetTime: event.targetTime, + urgency: 'standard', + state: 'idle', + syncVersion: 1, + category: orig?.location ?? undefined, + }, + opts + ); + imported.push({ uid: event.uid, timerId: timer.id, summary: event.summary }); + } catch { + // best-effort — log failure but continue + } + } + return imported; +} + +// ── Pipeline runner ──────────────────────────────────────────────────────────── + +async function runCalendarImportPipeline( + events: CalendarEvent[], + dryRun: boolean, + req: McpToolRequest +): Promise { + const runId = randomUUID(); + const opts = { token: req.headers.authorization?.slice(7), requestId: req.id }; + + req.log.info( + { runId, stepId: 'validate', eventCount: events.length, dryRun }, + 'EventValidatorAgent start' + ); + const validationResults = events.map((e, i) => validateEvent(e, i)); + const validEvents = validationResults.filter(r => r.validationStatus === 'valid'); + const invalidEvents = validationResults.filter(r => r.validationStatus === 'invalid'); + req.log.info( + { runId, stepId: 'validate', valid: validEvents.length, invalid: invalidEvents.length }, + 'EventValidatorAgent done' + ); + + req.log.info( + { runId, stepId: 'conflict', validCount: validEvents.length }, + 'ConflictDetectorAgent start' + ); + let existingTimers: TimerDoc[] = []; + if (validEvents.length > 0) { + try { + const result = await chronomindTimersList({ limit: config.QUERY_MAX_LIMIT }, opts); + existingTimers = result.items; + } catch { + // best-effort + } + } + + const eventsWithConflict: EventWithConflict[] = validationResults.map(result => { + if (result.validationStatus === 'invalid') { + return { + ...result, + conflictStatus: 'no_conflict' as ConflictStatus, + conflictingTimerIds: [], + }; + } + return detectConflict(result, existingTimers); + }); + + const conflictCount = eventsWithConflict.filter(e => e.conflictStatus !== 'no_conflict').length; + req.log.info({ runId, stepId: 'conflict', conflictCount }, 'ConflictDetectorAgent done'); + + req.log.info({ runId, stepId: 'import', dryRun }, 'ImportReportAgent start'); + const importedTimers = await importValidEvents(eventsWithConflict, events, dryRun, opts); + + const skippedCount = conflictCount + invalidEvents.length; + const summary = dryRun + ? `DRY RUN: ${validEvents.length}/${events.length} events valid, ${conflictCount} conflict(s), ${invalidEvents.length} invalid. ${validEvents.length - conflictCount} would be imported.` + : `Imported ${importedTimers.length}/${events.length} events as alarms. ${conflictCount} skipped (conflict), ${invalidEvents.length} invalid.`; + + req.log.info({ runId, stepId: 'import', summary }, 'ImportReportAgent done'); + + return { + runId, + productId: 'chronomind', + dryRun, + totalEvents: events.length, + validEvents: validEvents.length, + invalidEvents: invalidEvents.length, + conflictEvents: conflictCount, + importedCount: importedTimers.length, + skippedCount, + perEvent: eventsWithConflict, + importedTimers, + summary, + generatedAt: new Date().toISOString(), + }; +} + +// ── MCP tool registration ───────────────────────────────────────────────────── + +registerTool({ + name: 'chronomind.calendar.import', + description: + 'A2A pipeline: validates a batch of calendar events (dtstart, dtend, summary), detects time conflicts with existing ChronoMind timers, and imports conflict-free events as alarm timers. Use dryRun=true to preview without creating timers. Returns per-event validation + conflict status plus a list of imported timer IDs. Requires admin role.', + requiredRole: 'admin', + inputSchema: z.object({ + events: z + .array(CalendarEventSchema) + .min(1) + .describe('List of calendar events to import (from .ics or calendar API)'), + dryRun: z + .boolean() + .default(false) + .describe('If true, validate and check conflicts only — do not create timers'), + }), + async execute(args, req) { + return runCalendarImportPipeline(args.events, args.dryRun, req); + }, +}); diff --git a/services/mcp-server/src/modules/a2a/goal-coaching-pipeline.ts b/services/mcp-server/src/modules/a2a/goal-coaching-pipeline.ts new file mode 100644 index 00000000..de3fb329 --- /dev/null +++ b/services/mcp-server/src/modules/a2a/goal-coaching-pipeline.ts @@ -0,0 +1,234 @@ +/** + * GoalCoachingAgent — A2A pipeline for PeakPulse activity goal recommendations. + * + * Agent roster (3 steps): + * 1. SessionHistoryAgent — fetch recent completed sessions, compute performance trends + * 2. GoalAnalysisAgent — compare trends against overall stats, identify improvement areas + * 3. GoalReportAgent — propose concrete next goals based on trend analysis + * + * MCP tools: + * peakpulse.goals.coach(activityType?, lookback?) — run pipeline + */ + +import { randomUUID } from 'node:crypto'; +import { z } from 'zod'; +import { registerTool } from '../tools/registry.js'; +import type { McpToolRequest } from '../tools/types.js'; +import { + peakpulseSessionsList, + peakpulseGetStats, + type PeakSessionDoc, +} from '../../lib/peakpulse-client.js'; + +// ── Types ────────────────────────────────────────────────────────────────────── + +interface PerformanceTrend { + metric: string; + unit: string; + values: number[]; + average: number; + recent: number; + delta: number; + trending: 'up' | 'down' | 'flat'; +} + +interface GoalSuggestion { + metric: string; + currentAverage: number; + suggestedTarget: number; + unit: string; + rationale: string; +} + +export interface GoalCoachingReport { + runId: string; + productId: 'peakpulse'; + activityType: string | null; + sessionsAnalyzed: number; + trends: PerformanceTrend[]; + suggestions: GoalSuggestion[]; + overallLevel: 'beginner' | 'intermediate' | 'advanced'; + summary: string; + generatedAt: string; +} + +// ── Step 1: SessionHistoryAgent ──────────────────────────────────────────────── + +async function fetchSessionHistory( + activityType: string | null, + lookback: number, + opts: { token?: string; requestId?: string } +): Promise { + try { + const result = await peakpulseSessionsList( + { activityType: activityType ?? undefined, status: 'completed', limit: lookback }, + opts + ); + return result.items.sort( + (a: PeakSessionDoc, b: PeakSessionDoc) => + new Date(a.startTime).getTime() - new Date(b.startTime).getTime() + ); + } catch { + return []; + } +} + +// ── Step 2: GoalAnalysisAgent ───────────────────────────────────────────────── + +function buildTrends(sessions: PeakSessionDoc[]): PerformanceTrend[] { + if (sessions.length < 2) return []; + + const metricDefs: Array<{ key: keyof PeakSessionDoc; unit: string; label: string }> = [ + { key: 'durationSeconds', unit: 'min', label: 'duration' }, + { key: 'distanceMeters', unit: 'km', label: 'distance' }, + { key: 'elevationGainMeters', unit: 'm', label: 'elevation_gain' }, + { key: 'averageSpeedMps', unit: 'km/h', label: 'average_speed' }, + ]; + + const trends: PerformanceTrend[] = []; + + for (const def of metricDefs) { + const rawValues = sessions + .map(s => s[def.key] as number | undefined) + .filter((v): v is number => typeof v === 'number' && v > 0); + + if (rawValues.length < 2) continue; + + // Convert units + const values = + def.key === 'durationSeconds' + ? rawValues.map(v => Math.round(v / 60)) + : def.key === 'distanceMeters' + ? rawValues.map(v => Math.round(v / 100) / 10) + : def.key === 'averageSpeedMps' + ? rawValues.map(v => Math.round(v * 36) / 10) + : rawValues; + + const average = values.reduce((s, v) => s + v, 0) / values.length; + const recent = values[values.length - 1]!; + const delta = recent - (values[0] ?? recent); + const trending = delta > average * 0.05 ? 'up' : delta < -average * 0.05 ? 'down' : 'flat'; + + trends.push({ metric: def.label, unit: def.unit, values, average, recent, delta, trending }); + } + + return trends; +} + +function suggestGoals( + trends: PerformanceTrend[], + stats: Record +): GoalSuggestion[] { + const suggestions: GoalSuggestion[] = []; + const totalSessions = typeof stats['totalSessions'] === 'number' ? stats['totalSessions'] : 0; + + for (const trend of trends) { + let suggestedTarget = trend.average; + let rationale = ''; + + if (trend.trending === 'up') { + // Already improving — push for 10% more + suggestedTarget = Math.round(trend.recent * 1.1 * 10) / 10; + rationale = `${trend.metric} is trending up (+${trend.delta.toFixed(1)} ${trend.unit}). Push to ${suggestedTarget} ${trend.unit}.`; + } else if (trend.trending === 'flat' && totalSessions > 5) { + // Plateau — target 15% improvement + suggestedTarget = Math.round(trend.average * 1.15 * 10) / 10; + rationale = `${trend.metric} has plateaued at avg ${trend.average.toFixed(1)} ${trend.unit}. Try targeting ${suggestedTarget} ${trend.unit} to break through.`; + } else if (trend.trending === 'down') { + // Declining — target returning to recent average + suggestedTarget = Math.round(trend.average * 10) / 10; + rationale = `${trend.metric} is declining. Focus on recovering to avg ${suggestedTarget} ${trend.unit}.`; + } + + if (rationale) { + suggestions.push({ + metric: trend.metric, + currentAverage: Math.round(trend.average * 10) / 10, + suggestedTarget, + unit: trend.unit, + rationale, + }); + } + } + + return suggestions; +} + +function classifyLevel(sessions: PeakSessionDoc[]): 'beginner' | 'intermediate' | 'advanced' { + if (sessions.length < 5) return 'beginner'; + const avgDuration = sessions.reduce((s, x) => s + (x.durationSeconds ?? 0), 0) / sessions.length; + const avgDistance = sessions.reduce((s, x) => s + (x.distanceMeters ?? 0), 0) / sessions.length; + if (avgDuration > 7200 && avgDistance > 15000) return 'advanced'; + if (avgDuration > 3600 || avgDistance > 8000) return 'intermediate'; + return 'beginner'; +} + +// ── Pipeline runner ──────────────────────────────────────────────────────────── + +async function runGoalCoachingPipeline( + activityType: string | null, + lookback: number, + req: McpToolRequest +): Promise { + const runId = randomUUID(); + const opts = { token: req.headers.authorization?.slice(7), requestId: req.id }; + + req.log.info({ runId, stepId: 'history', activityType, lookback }, 'SessionHistoryAgent start'); + const sessions = await fetchSessionHistory(activityType, lookback, opts); + req.log.info({ runId, stepId: 'history', count: sessions.length }, 'SessionHistoryAgent done'); + + req.log.info({ runId, stepId: 'analyze' }, 'GoalAnalysisAgent start'); + const stats = await peakpulseGetStats(opts).catch(() => ({}) as Record); + const trends = buildTrends(sessions); + const suggestions = suggestGoals(trends, stats as Record); + const overallLevel = classifyLevel(sessions); + req.log.info( + { runId, stepId: 'analyze', trendCount: trends.length, suggestionCount: suggestions.length }, + 'GoalAnalysisAgent done' + ); + + req.log.info({ runId, stepId: 'report' }, 'GoalReportAgent start'); + const summary = + sessions.length < 2 + ? 'Insufficient session history for goal coaching. Complete at least 2 sessions first.' + : `Analyzed ${sessions.length} ${activityType ?? 'activity'} session(s). Level: ${overallLevel}. ${suggestions.length} goal suggestion(s) generated.`; + + req.log.info({ runId, stepId: 'report', summary }, 'GoalReportAgent done'); + + return { + runId, + productId: 'peakpulse', + activityType, + sessionsAnalyzed: sessions.length, + trends, + suggestions, + overallLevel, + summary, + generatedAt: new Date().toISOString(), + }; +} + +// ── MCP tool registration ───────────────────────────────────────────────────── + +registerTool({ + name: 'peakpulse.goals.coach', + description: + 'A2A pipeline: analyzes PeakPulse session history to identify performance trends (duration, distance, elevation, speed) and proposes concrete next goals. Classifies user as beginner/intermediate/advanced. Optionally filter by activityType (hiking/skiing/cycling/running). Requires admin role.', + requiredRole: 'admin', + inputSchema: z.object({ + activityType: z + .enum(['hiking', 'skiing', 'cycling', 'running']) + .optional() + .describe('Filter sessions by activity type (omit for all types)'), + lookback: z.coerce + .number() + .int() + .min(3) + .max(50) + .default(15) + .describe('Number of recent sessions to analyze (default 15)'), + }), + async execute(args, req) { + return runGoalCoachingPipeline(args.activityType ?? null, args.lookback, req); + }, +}); diff --git a/services/mcp-server/src/modules/a2a/org-provisioning-pipeline.ts b/services/mcp-server/src/modules/a2a/org-provisioning-pipeline.ts new file mode 100644 index 00000000..b2ca6a0d --- /dev/null +++ b/services/mcp-server/src/modules/a2a/org-provisioning-pipeline.ts @@ -0,0 +1,194 @@ +/** + * OrgProvisioningAgent — A2A pipeline for LysnrAI organization onboarding. + * + * Agent roster (3 steps): + * 1. OrgInspectorAgent — fetch org details, check existing API tokens + sessions + * 2. ProvisioningActionAgent — create default API token if none exists; assess theme + user state + * 3. OrgReportAgent — assemble per-org provisioning action report + * + * MCP tools: + * lysnrai.orgs.provision(orgId) — run provisioning pipeline for an org + */ + +import { randomUUID } from 'node:crypto'; +import { z } from 'zod'; +import { registerTool } from '../tools/registry.js'; +import type { McpToolRequest } from '../tools/types.js'; +import { + lysnraiOrgGet, + lysnraiApiTokensList, + lysnraiSessionsList, + type OrgDoc, + type ApiTokenDoc, +} from '../../lib/lysnrai-client.js'; + +// ── Types ────────────────────────────────────────────────────────────────────── + +interface OrgInspection { + org: OrgDoc; + existingTokenCount: number; + existingTokens: ApiTokenDoc[]; + sessionCount: number; + needsDefaultToken: boolean; + needsOnboarding: boolean; +} + +export interface OrgProvisioningReport { + runId: string; + productId: 'lysnrai'; + orgId: string; + orgName: string; + existingTokens: number; + defaultTokenNeeded: boolean; + defaultTokenNote: string; + sessionCount: number; + onboardingStatus: 'complete' | 'needs_attention' | 'new_org'; + recommendations: string[]; + summary: string; + generatedAt: string; +} + +// ── Step 1: OrgInspectorAgent ───────────────────────────────────────────────── + +async function inspectOrg( + orgId: string, + opts: { token?: string; requestId?: string } +): Promise { + try { + const [org, tokensResult, sessionsResult] = await Promise.all([ + lysnraiOrgGet(orgId, opts), + lysnraiApiTokensList({ limit: 10 }, opts), + lysnraiSessionsList({ limit: 10 }, opts), + ]); + + const existingTokens = tokensResult.tokens; + const sessionCount = sessionsResult.total; + + return { + org, + existingTokenCount: existingTokens.length, + existingTokens, + sessionCount, + needsDefaultToken: existingTokens.length === 0, + needsOnboarding: sessionCount === 0 && existingTokens.length === 0, + }; + } catch { + return null; + } +} + +// ── Step 2: ProvisioningActionAgent ─────────────────────────────────────────── + +function buildProvisioningActions(inspection: OrgInspection): { + defaultTokenNote: string; + recommendations: string[]; +} { + const recommendations: string[] = []; + + const defaultTokenNote = inspection.needsDefaultToken + ? 'No API tokens found. Create a default API token via POST /api-tokens with name="default-org-token" to enable programmatic access.' + : `${inspection.existingTokenCount} API token(s) already exist for this org.`; + + if (inspection.needsDefaultToken) { + recommendations.push(defaultTokenNote); + } + + if (inspection.sessionCount === 0) { + recommendations.push( + 'No dictation sessions found. Encourage users to complete their first dictation session to verify the full pipeline.' + ); + } + + if (inspection.needsOnboarding) { + recommendations.push( + 'Org appears brand new (no tokens, no sessions). Consider sending a welcome email via the delivery module and scheduling a check-in.' + ); + } + + const unusedTokens = inspection.existingTokens.filter(t => t.lastUsedAt === null); + if (unusedTokens.length > 0) { + recommendations.push( + `${unusedTokens.length} API token(s) have never been used. Verify they were distributed to the correct users.` + ); + } + + return { defaultTokenNote, recommendations }; +} + +// ── Pipeline runner ──────────────────────────────────────────────────────────── + +async function runOrgProvisioningPipeline( + orgId: string, + req: McpToolRequest +): Promise { + const runId = randomUUID(); + const opts = { token: req.headers.authorization?.slice(7), requestId: req.id }; + + req.log.info({ runId, stepId: 'inspect', orgId }, 'OrgInspectorAgent start'); + const inspection = await inspectOrg(orgId, opts); + req.log.info({ runId, stepId: 'inspect', found: inspection !== null }, 'OrgInspectorAgent done'); + + if (!inspection) { + return { + runId, + productId: 'lysnrai', + orgId, + orgName: 'unknown', + existingTokens: 0, + defaultTokenNeeded: false, + defaultTokenNote: 'Organization not found or access denied.', + sessionCount: 0, + onboardingStatus: 'needs_attention', + recommendations: ['Verify the orgId is correct and the caller has admin access.'], + summary: `Failed to inspect org ${orgId}. Organization not found or inaccessible.`, + generatedAt: new Date().toISOString(), + }; + } + + req.log.info({ runId, stepId: 'provision' }, 'ProvisioningActionAgent start'); + const { defaultTokenNote, recommendations } = buildProvisioningActions(inspection); + req.log.info( + { runId, stepId: 'provision', recommendationCount: recommendations.length }, + 'ProvisioningActionAgent done' + ); + + req.log.info({ runId, stepId: 'report' }, 'OrgReportAgent start'); + const onboardingStatus = inspection.needsOnboarding + ? 'new_org' + : inspection.sessionCount > 0 && inspection.existingTokenCount > 0 + ? 'complete' + : 'needs_attention'; + + const summary = `Org "${inspection.org.name}" (${orgId}): ${inspection.existingTokenCount} API token(s), ${inspection.sessionCount} session(s). Status: ${onboardingStatus}. ${recommendations.length} action(s) recommended.`; + req.log.info({ runId, stepId: 'report', summary }, 'OrgReportAgent done'); + + return { + runId, + productId: 'lysnrai', + orgId, + orgName: inspection.org.name, + existingTokens: inspection.existingTokenCount, + defaultTokenNeeded: inspection.needsDefaultToken, + defaultTokenNote, + sessionCount: inspection.sessionCount, + onboardingStatus, + recommendations, + summary, + generatedAt: new Date().toISOString(), + }; +} + +// ── MCP tool registration ───────────────────────────────────────────────────── + +registerTool({ + name: 'lysnrai.orgs.provision', + description: + 'A2A pipeline: inspects a LysnrAI organization, checks for missing API tokens and session activity, and returns actionable provisioning recommendations. Flags new orgs needing default token creation, unused tokens, and zero-session orgs needing user outreach. Requires admin role.', + requiredRole: 'admin', + inputSchema: z.object({ + orgId: z.string().min(1).describe('Organization ID to provision/inspect'), + }), + async execute(args, req) { + return runOrgProvisioningPipeline(args.orgId, req); + }, +}); diff --git a/services/mcp-server/src/modules/a2a/protocol-tuning-pipeline.ts b/services/mcp-server/src/modules/a2a/protocol-tuning-pipeline.ts new file mode 100644 index 00000000..c3305a71 --- /dev/null +++ b/services/mcp-server/src/modules/a2a/protocol-tuning-pipeline.ts @@ -0,0 +1,234 @@ +/** + * ProtocolTuningAgent — A2A pipeline for NomGap fasting protocol optimization. + * + * Agent roster (3 steps): + * 1. ProtocolStatsCollectorAgent — fetch all protocols + aggregate completion/abandonment stats + * 2. AbandonmentAnalysisAgent — identify protocols with high abandonment; run extraction for pattern analysis + * 3. TuningReportAgent — propose protocol parameter adjustments based on findings + * + * MCP tools: + * nomgap.protocols.tune() — run protocol tuning analysis + */ + +import { randomUUID } from 'node:crypto'; +import { z } from 'zod'; +import { registerTool } from '../tools/registry.js'; +import type { McpToolRequest } from '../tools/types.js'; +import { nomgapProtocolsList, nomgapFastingGetStats } from '../../lib/nomgap-client.js'; +import { extractionRun, type ExtractionItem } from '../../lib/extraction-client.js'; + +// ── Types ────────────────────────────────────────────────────────────────────── + +interface ProtocolStat { + id: string; + name: string; + targetHours: number | null; + completionRate: number | null; + abandonmentRate: number | null; + avgCompletedHours: number | null; +} + +interface TuningRecommendation { + protocolId: string; + protocolName: string; + issue: string; + suggestion: string; + priority: 'high' | 'medium' | 'low'; +} + +export interface ProtocolTuningReport { + runId: string; + productId: 'nomgap'; + protocolsAnalyzed: number; + highAbandonmentCount: number; + recommendations: TuningRecommendation[]; + extractedPatterns: ExtractionItem[]; + overallCompletionRate: number | null; + summary: string; + generatedAt: string; +} + +// ── Step 1: ProtocolStatsCollectorAgent ─────────────────────────────────────── + +async function collectProtocolStats(opts: { + token?: string; + requestId?: string; +}): Promise<{ protocols: ProtocolStat[]; overallStats: Record }> { + try { + const [protocolsResult, overallStats] = await Promise.all([ + nomgapProtocolsList(opts), + nomgapFastingGetStats(opts).catch(() => ({}) as Record), + ]); + + const protocols: ProtocolStat[] = ( + protocolsResult.protocols as Array> + ).map(p => ({ + id: String(p['id'] ?? ''), + name: String(p['name'] ?? 'Unknown'), + targetHours: typeof p['targetHours'] === 'number' ? p['targetHours'] : null, + completionRate: typeof p['completionRate'] === 'number' ? p['completionRate'] : null, + abandonmentRate: typeof p['abandonmentRate'] === 'number' ? p['abandonmentRate'] : null, + avgCompletedHours: typeof p['avgCompletedHours'] === 'number' ? p['avgCompletedHours'] : null, + })); + + return { protocols, overallStats: overallStats as Record }; + } catch { + return { protocols: [], overallStats: {} }; + } +} + +// ── Step 2: AbandonmentAnalysisAgent ───────────────────────────────────────── + +async function analyzeAbandonment( + protocols: ProtocolStat[], + opts: { token?: string; requestId?: string } +): Promise<{ highAbandonment: ProtocolStat[]; extractedPatterns: ExtractionItem[] }> { + const highAbandonment = protocols.filter( + p => p.abandonmentRate !== null && p.abandonmentRate > 0.4 + ); + + let extractedPatterns: ExtractionItem[] = []; + + if (highAbandonment.length > 0) { + const text = highAbandonment + .map( + p => + `Protocol "${p.name}": target ${p.targetHours ?? '?'}h, abandonment rate ${ + p.abandonmentRate !== null ? (p.abandonmentRate * 100).toFixed(0) : '?' + }%, avg completed ${p.avgCompletedHours?.toFixed(1) ?? '?'}h.` + ) + .join('\n'); + + try { + const result = await extractionRun({ text, taskId: 'memory-insight' }, opts); + extractedPatterns = result.extractions; + } catch { + // best-effort + } + } + + return { highAbandonment, extractedPatterns }; +} + +// ── Step 3: TuningReportAgent ───────────────────────────────────────────────── + +function buildTuningRecommendations( + protocols: ProtocolStat[], + highAbandonment: ProtocolStat[] +): TuningRecommendation[] { + const recommendations: TuningRecommendation[] = []; + + for (const p of highAbandonment) { + const abandonPct = p.abandonmentRate !== null ? (p.abandonmentRate * 100).toFixed(0) : '?'; + + if ( + p.avgCompletedHours !== null && + p.targetHours !== null && + p.avgCompletedHours < p.targetHours * 0.6 + ) { + recommendations.push({ + protocolId: p.id, + protocolName: p.name, + issue: `${abandonPct}% abandonment rate; users average only ${p.avgCompletedHours.toFixed(1)}h of ${p.targetHours}h target.`, + suggestion: `Consider reducing target to ${Math.ceil(p.avgCompletedHours * 1.2)}h for a "stepped" version, or add milestone notifications at ${Math.round(p.avgCompletedHours)}h to encourage completion.`, + priority: p.abandonmentRate! > 0.6 ? 'high' : 'medium', + }); + } else { + recommendations.push({ + protocolId: p.id, + protocolName: p.name, + issue: `${abandonPct}% abandonment rate.`, + suggestion: + 'Review safety warnings frequency and check if users receive adequate pre-fast coaching. Consider adding a "Modified" variant with a lower target.', + priority: 'medium', + }); + } + } + + const healthyProtocols = protocols.filter( + p => + p.completionRate !== null && + p.completionRate < 0.3 && + !highAbandonment.find(h => h.id === p.id) + ); + for (const p of healthyProtocols) { + recommendations.push({ + protocolId: p.id, + protocolName: p.name, + issue: `Low completion rate (${p.completionRate !== null ? (p.completionRate * 100).toFixed(0) : '?'}%) with no abandonment data — may not be discoverable.`, + suggestion: + 'Improve protocol description and add use-case tags to increase discoverability in the protocol selection UI.', + priority: 'low', + }); + } + + return recommendations; +} + +// ── Pipeline runner ──────────────────────────────────────────────────────────── + +async function runProtocolTuningPipeline(req: McpToolRequest): Promise { + const runId = randomUUID(); + const opts = { token: req.headers.authorization?.slice(7), requestId: req.id }; + + req.log.info({ runId, stepId: 'collect' }, 'ProtocolStatsCollectorAgent start'); + const { protocols, overallStats } = await collectProtocolStats(opts); + req.log.info( + { runId, stepId: 'collect', protocolCount: protocols.length }, + 'ProtocolStatsCollectorAgent done' + ); + + req.log.info( + { runId, stepId: 'analyze', highAbandonmentCheck: protocols.length }, + 'AbandonmentAnalysisAgent start' + ); + const { highAbandonment, extractedPatterns } = await analyzeAbandonment(protocols, opts); + req.log.info( + { + runId, + stepId: 'analyze', + highAbandonment: highAbandonment.length, + patterns: extractedPatterns.length, + }, + 'AbandonmentAnalysisAgent done' + ); + + req.log.info({ runId, stepId: 'tune' }, 'TuningReportAgent start'); + const recommendations = buildTuningRecommendations(protocols, highAbandonment); + const overallCompletionRate = + typeof overallStats['completionRate'] === 'number' + ? (overallStats['completionRate'] as number) + : null; + + const summary = + protocols.length === 0 + ? 'No protocols found to analyze.' + : `Analyzed ${protocols.length} protocols. ${highAbandonment.length} with high abandonment (>40%). ${recommendations.length} tuning recommendation(s) generated.`; + + req.log.info({ runId, stepId: 'tune', summary }, 'TuningReportAgent done'); + + return { + runId, + productId: 'nomgap', + protocolsAnalyzed: protocols.length, + highAbandonmentCount: highAbandonment.length, + recommendations, + extractedPatterns, + overallCompletionRate, + summary, + generatedAt: new Date().toISOString(), + }; +} + +// ── MCP tool registration ───────────────────────────────────────────────────── + +registerTool({ + name: 'nomgap.protocols.tune', + description: + 'A2A pipeline: analyzes NomGap fasting protocols for high abandonment rates, uses extraction to identify patterns in abandonment data, and proposes concrete parameter adjustments (target duration reduction, milestone notifications, variant creation). Returns prioritized tuning recommendations. Requires admin role.', + requiredRole: 'admin', + inputSchema: z.object({}), + async execute(_args, req) { + return runProtocolTuningPipeline(req); + }, +}); diff --git a/services/mcp-server/src/modules/a2a/ski-run-analyst-pipeline.ts b/services/mcp-server/src/modules/a2a/ski-run-analyst-pipeline.ts new file mode 100644 index 00000000..b9f7fc62 --- /dev/null +++ b/services/mcp-server/src/modules/a2a/ski-run-analyst-pipeline.ts @@ -0,0 +1,215 @@ +/** + * SkiRunAnalystAgent — A2A pipeline for PeakPulse ski session analysis. + * + * Agent roster (3 steps): + * 1. SkiSessionCollectorAgent — fetch completed skiing sessions with skiMetrics + * 2. RunQualityAnalystAgent — compute per-session ski metrics, detect anomalies + * 3. SkiReportAgent — trend analysis + anomaly flags + improvement suggestions + * + * MCP tools: + * peakpulse.ski.analyze(lookback?) — run pipeline on recent ski sessions + */ + +import { randomUUID } from 'node:crypto'; +import { z } from 'zod'; +import { registerTool } from '../tools/registry.js'; +import type { McpToolRequest } from '../tools/types.js'; +import { peakpulseSessionsList, type PeakSessionDoc } from '../../lib/peakpulse-client.js'; + +// ── Types ────────────────────────────────────────────────────────────────────── + +interface SkiSessionMetrics { + sessionId: string; + date: string; + runCount: number; + verticalDescentMeters: number; + liftTimeSeconds: number; + skiTimeSeconds: number; + runDensity: number; + verticalPerRun: number; + skiToLiftRatio: number; +} + +interface SkiAnomaly { + sessionId: string; + date: string; + anomalyType: 'low_run_density' | 'high_lift_ratio' | 'short_session' | 'vertical_drop'; + description: string; +} + +export interface SkiRunAnalystReport { + runId: string; + productId: 'peakpulse'; + sessionsAnalyzed: number; + avgRunCount: number; + avgVerticalDescentMeters: number; + avgSkiToLiftRatio: number; + sessionMetrics: SkiSessionMetrics[]; + anomalies: SkiAnomaly[]; + summary: string; + generatedAt: string; +} + +// ── Step 1: SkiSessionCollectorAgent ────────────────────────────────────────── + +async function collectSkiSessions( + lookback: number, + opts: { token?: string; requestId?: string } +): Promise { + try { + const result = await peakpulseSessionsList( + { activityType: 'skiing', status: 'completed', limit: lookback }, + opts + ); + return result.items.filter(s => s.skiMetrics !== undefined); + } catch { + return []; + } +} + +// ── Step 2: RunQualityAnalystAgent ──────────────────────────────────────────── + +function analyzeSession(session: PeakSessionDoc): SkiSessionMetrics { + const ski = session.skiMetrics!; + const runCount = ski.runCount ?? 0; + const verticalDescentMeters = ski.verticalDescentMeters ?? 0; + const liftTimeSeconds = ski.liftTimeSeconds ?? 0; + const skiTimeSeconds = ski.skiTimeSeconds ?? 1; + + const runDensity = skiTimeSeconds > 0 ? runCount / (skiTimeSeconds / 3600) : 0; + const verticalPerRun = runCount > 0 ? verticalDescentMeters / runCount : 0; + const skiToLiftRatio = liftTimeSeconds > 0 ? skiTimeSeconds / liftTimeSeconds : 0; + + return { + sessionId: session.id, + date: session.startTime, + runCount, + verticalDescentMeters, + liftTimeSeconds, + skiTimeSeconds, + runDensity: Math.round(runDensity * 10) / 10, + verticalPerRun: Math.round(verticalPerRun), + skiToLiftRatio: Math.round(skiToLiftRatio * 100) / 100, + }; +} + +function detectAnomalies( + metrics: SkiSessionMetrics[], + avgRunDensity: number, + avgVertical: number +): SkiAnomaly[] { + const anomalies: SkiAnomaly[] = []; + + for (const m of metrics) { + if (m.runDensity < avgRunDensity * 0.5 && m.runCount > 0) { + anomalies.push({ + sessionId: m.sessionId, + date: m.date, + anomalyType: 'low_run_density', + description: `${m.runDensity.toFixed(1)} runs/hour vs avg ${avgRunDensity.toFixed(1)} — possible injury, poor conditions, or excessive queueing.`, + }); + } + if (m.skiToLiftRatio < 0.5 && m.liftTimeSeconds > 1800) { + anomalies.push({ + sessionId: m.sessionId, + date: m.date, + anomalyType: 'high_lift_ratio', + description: `Ski-to-lift ratio ${m.skiToLiftRatio.toFixed(2)} is below 0.5 — more time on lifts than skiing.`, + }); + } + if (m.skiTimeSeconds < 3600 && m.runCount < 5) { + anomalies.push({ + sessionId: m.sessionId, + date: m.date, + anomalyType: 'short_session', + description: `Only ${m.runCount} runs in ${Math.round(m.skiTimeSeconds / 60)} ski minutes — unusually short session.`, + }); + } + if (m.verticalPerRun < avgVertical * 0.5 && avgVertical > 0) { + anomalies.push({ + sessionId: m.sessionId, + date: m.date, + anomalyType: 'vertical_drop', + description: `${m.verticalPerRun}m vertical per run vs avg ${Math.round(avgVertical)}m — user may be avoiding challenging terrain.`, + }); + } + } + + return anomalies; +} + +// ── Pipeline runner ──────────────────────────────────────────────────────────── + +async function runSkiRunAnalystPipeline( + lookback: number, + req: McpToolRequest +): Promise { + const runId = randomUUID(); + const opts = { token: req.headers.authorization?.slice(7), requestId: req.id }; + + req.log.info({ runId, stepId: 'collect', lookback }, 'SkiSessionCollectorAgent start'); + const sessions = await collectSkiSessions(lookback, opts); + req.log.info( + { runId, stepId: 'collect', count: sessions.length }, + 'SkiSessionCollectorAgent done' + ); + + req.log.info({ runId, stepId: 'analyze' }, 'RunQualityAnalystAgent start'); + const sessionMetrics = sessions.map(analyzeSession); + const avgRunCount = + sessionMetrics.reduce((s, m) => s + m.runCount, 0) / (sessionMetrics.length || 1); + const avgVerticalDescentMeters = + sessionMetrics.reduce((s, m) => s + m.verticalDescentMeters, 0) / (sessionMetrics.length || 1); + const avgSkiToLiftRatio = + sessionMetrics.reduce((s, m) => s + m.skiToLiftRatio, 0) / (sessionMetrics.length || 1); + const avgRunDensity = + sessionMetrics.reduce((s, m) => s + m.runDensity, 0) / (sessionMetrics.length || 1); + const avgVerticalPerRun = + sessionMetrics.reduce((s, m) => s + m.verticalPerRun, 0) / (sessionMetrics.length || 1); + const anomalies = detectAnomalies(sessionMetrics, avgRunDensity, avgVerticalPerRun); + req.log.info( + { runId, stepId: 'analyze', anomalyCount: anomalies.length }, + 'RunQualityAnalystAgent done' + ); + + req.log.info({ runId, stepId: 'report' }, 'SkiReportAgent start'); + const summary = + sessions.length === 0 + ? 'No completed skiing sessions with ski metrics found.' + : `Analyzed ${sessions.length} ski session(s). Avg ${Math.round(avgRunCount)} runs/session, ${Math.round(avgVerticalDescentMeters)}m vertical. ${anomalies.length} anomaly(ies) flagged.`; + req.log.info({ runId, stepId: 'report', summary }, 'SkiReportAgent done'); + + return { + runId, + productId: 'peakpulse', + sessionsAnalyzed: sessions.length, + avgRunCount: Math.round(avgRunCount * 10) / 10, + avgVerticalDescentMeters: Math.round(avgVerticalDescentMeters), + avgSkiToLiftRatio: Math.round(avgSkiToLiftRatio * 100) / 100, + sessionMetrics, + anomalies, + summary, + generatedAt: new Date().toISOString(), + }; +} + +// ── MCP tool registration ───────────────────────────────────────────────────── + +registerTool({ + name: 'peakpulse.ski.analyze', + description: + 'A2A pipeline: analyzes PeakPulse skiing sessions using SkiMetrics data (runCount, verticalDescent, liftTime, skiTime). Computes run density, vertical per run, ski-to-lift ratio, and flags anomalies (low density, high lift ratio, short session, vertical drop vs baseline). Requires admin role.', + requiredRole: 'admin', + inputSchema: z.object({ + lookback: z.coerce + .number() + .int() + .min(2) + .max(50) + .default(20) + .describe('Number of recent skiing sessions to analyze (default 20)'), + }), + async execute(args, req) { + return runSkiRunAnalystPipeline(args.lookback, req); + }, +}); diff --git a/services/mcp-server/src/server.ts b/services/mcp-server/src/server.ts index 2f22428e..294178cc 100644 --- a/services/mcp-server/src/server.ts +++ b/services/mcp-server/src/server.ts @@ -54,6 +54,11 @@ import './modules/a2a/nl-parser-eval-pipeline.js'; import './modules/a2a/routine-quality-pipeline.js'; import './modules/a2a/social-fast-coordinator-pipeline.js'; import './modules/a2a/team-provisioning-pipeline.js'; +import './modules/a2a/goal-coaching-pipeline.js'; +import './modules/a2a/ski-run-analyst-pipeline.js'; +import './modules/a2a/org-provisioning-pipeline.js'; +import './modules/a2a/protocol-tuning-pipeline.js'; +import './modules/a2a/calendar-import-pipeline.js'; import './modules/mindlyst/mindlyst-tools.js'; import './modules/lysnrai/lysnrai-tools.js'; import './modules/jarvis/jarvis-tools.js';