learning_ai_common_plat/services/mcp-server/src/modules/a2a/sync-conflict-pipeline.ts
saravanakumardb1 d7aa90b021 feat(mcp-server): 4 MCP tool gaps + 3 new A2A pipelines (Priority 3/6/7)
MCP tool gaps filled (DOMAIN_PRODUCTS.md alignment):
- jarvis.memory.create — POST /jarvis/agents/:agentId/memory (sessionId, type, content, importance, tags, expiresAt)
- jarvis.teams.listMembers — GET /jarvis/teams/:teamId/members (role, status, joinedAt)
- nomgap.fasting.getSession — GET /fasting/sessions/:id (client func already existed, MCP tool was missing)
- peakpulse.weather.getSnapshot — extracts weather field from peakpulseSessionGet response

New A2A pipelines (all registered in server.ts):
- transcript-extraction-pipeline.ts: lysnrai.transcripts.runExtractionPipeline
  - TranscriptCollectorAgent -> ExtractionBatchAgent -> ExtractionReportAgent
  - Queries transcripts missing extractedAt, runs extraction, returns batch report + dryRun support
- sync-conflict-pipeline.ts: chronomind.sync.diagnoseConflicts
  - ConflictDetectorAgent -> SyncStateInspectorAgent -> DiagnosticsSessionAgent -> ConflictReportAgent
  - Queries telemetry for sync_conflict events, classifies pattern, creates diagnostics session on conflict
- route-safety-pipeline.ts: peakpulse.sessions.assessSafety
  - SessionDataAgent -> RouteProfileAgent -> SafetyAnalysisAgent -> SafetyReportAgent
  - Fetches GPS + weather, evaluates UV/wind/altitude/speed risk factors, enriches with extraction entities

Client additions (jarvis-client.ts):
  jarvisMemoryCreate, jarvisTeamsListMembers + JarvisTeamMemberDoc interface

MCP server total: 93 tools across 17 namespaces
2026-03-05 15:18:21 -08:00

308 lines
11 KiB
TypeScript

/**
* SyncConflictDiagnosticsAgent — A2A pipeline for ChronoMind sync conflict diagnostics.
*
* Agent roster (4 steps):
* 1. ConflictDetectorAgent — query telemetry for sync_conflict events for a user
* 2. SyncStateInspectorAgent — pull current sync status (queue depth, unsynced count)
* 3. DiagnosticsSessionAgent — create platform diagnostics session when conflicts found
* 4. ConflictReportAgent — assemble report with root cause analysis + remediation
*
* MCP tools:
* chronomind.sync.diagnoseConflicts(userId, deviceId?, from?, to?) — run pipeline
*/
import { randomUUID } from 'node:crypto';
import { z } from 'zod';
import { registerTool } from '../tools/registry.js';
import type { McpToolRequest } from '../tools/types.js';
import {
telemetryQuery,
diagnosticsCreateSession,
diagnosticsGetLogs,
diagnosticsUpdateSession,
type DebugSession,
} from '../../lib/platform-client.js';
import { chronomindSyncStatus } from '../../lib/chronomind-client.js';
// ── Types ──────────────────────────────────────────────────────────────────────
type ConflictPattern = 'version_clash' | 'concurrent_edit' | 'stale_device' | 'unknown';
interface ConflictDetection {
userId: string;
deviceId: string | null;
conflictCount: number;
recentConflicts: unknown[];
conflictPattern: ConflictPattern;
fromTime: string;
toTime: string;
}
interface SyncStateResult {
unsyncedCount: number;
pendingCount: number;
lastSyncAt: string | null;
activeTimers: number;
}
interface DiagnosticsCapture {
skipped: boolean;
skipReason?: string;
session?: DebugSession;
logEntries: unknown[];
}
export interface SyncConflictReport {
runId: string;
productId: 'chronomind';
userId: string;
deviceId: string | null;
conflictCount: number;
conflictPattern: ConflictPattern;
unsyncedCount: number;
pendingCount: number;
diagnosticsSessionId: string | null;
logEntries: unknown[];
rootCauseSummary: string;
recommendedAction: string;
generatedAt: string;
}
// ── Step 1: ConflictDetectorAgent ─────────────────────────────────────────────
async function detectConflicts(
userId: string,
deviceId: string | null,
fromTime: string,
toTime: string,
opts: { token?: string; requestId?: string }
): Promise<ConflictDetection> {
let conflictCount = 0;
let recentConflicts: unknown[] = [];
try {
const result = await telemetryQuery(
{
productId: 'chronomind',
eventType: 'sync_conflict',
from: fromTime,
to: toTime,
limit: 20,
},
{ ...opts, productId: 'chronomind' }
);
conflictCount = result.total;
recentConflicts = result.events;
} catch {
// best-effort
}
let conflictPattern: ConflictPattern = 'unknown';
if (conflictCount > 0) {
const events = recentConflicts as Array<Record<string, unknown>>;
const hasVersionClash = events.some(e => String(e['errorCode'] ?? '').includes('version'));
const hasConcurrentEdit = events.some(e => String(e['errorCode'] ?? '').includes('concurrent'));
const hasStaleDevice = events.some(e => String(e['errorCode'] ?? '').includes('stale'));
if (hasVersionClash) conflictPattern = 'version_clash';
else if (hasConcurrentEdit) conflictPattern = 'concurrent_edit';
else if (hasStaleDevice) conflictPattern = 'stale_device';
}
return {
userId,
deviceId,
conflictCount,
recentConflicts,
conflictPattern,
fromTime,
toTime,
};
}
// ── Step 2: SyncStateInspectorAgent ───────────────────────────────────────────
async function inspectSyncState(opts: {
token?: string;
requestId?: string;
}): Promise<SyncStateResult> {
try {
const status = await chronomindSyncStatus(opts);
return {
unsyncedCount: status.unsyncedCount ?? 0,
pendingCount: status.pending ?? 0,
lastSyncAt: status.lastSyncedAt ?? null,
activeTimers: status.active ?? 0,
};
} catch {
return { unsyncedCount: 0, pendingCount: 0, lastSyncAt: null, activeTimers: 0 };
}
}
// ── Step 3: DiagnosticsSessionAgent ───────────────────────────────────────────
async function captureDiagnostics(
runId: string,
userId: string,
conflictDetection: ConflictDetection,
opts: { token?: string; requestId?: string }
): Promise<DiagnosticsCapture> {
if (conflictDetection.conflictCount === 0) {
return { skipped: true, skipReason: 'no_conflicts_detected', logEntries: [] };
}
try {
const session = await diagnosticsCreateSession(
{
productId: 'chronomind',
targetUserId: userId,
maxDurationMinutes: 30,
},
opts
);
const logs = await diagnosticsGetLogs(session.id, { limit: 50 }, opts).catch(() => ({
logs: [],
}));
await diagnosticsUpdateSession(session.id, { status: 'completed' }, opts).catch(() => null);
return { skipped: false, session, logEntries: (logs as { logs: unknown[] }).logs };
} catch {
return { skipped: true, skipReason: 'diagnostics_session_failed', logEntries: [] };
}
}
// ── Step 4: ConflictReportAgent ───────────────────────────────────────────────
function buildConflictReport(
runId: string,
detection: ConflictDetection,
syncState: SyncStateResult,
diagnostics: DiagnosticsCapture
): SyncConflictReport {
const now = new Date().toISOString();
const rootCauseSummary = (() => {
if (detection.conflictCount === 0)
return 'No sync conflicts detected in the specified time window.';
switch (detection.conflictPattern) {
case 'version_clash':
return `${detection.conflictCount} version clash conflict(s) detected. Timer sync versions diverged — likely caused by the same timer being edited on two devices while offline.`;
case 'concurrent_edit':
return `${detection.conflictCount} concurrent edit conflict(s) detected. Multiple devices modified the same timer simultaneously.`;
case 'stale_device':
return `${detection.conflictCount} stale device conflict(s) detected. A device is syncing with an outdated timer version — device likely offline for an extended period.`;
default:
return `${detection.conflictCount} sync conflict(s) detected with unclassified pattern. Check diagnostics logs for detailed error codes.`;
}
})();
const recommendedAction = (() => {
if (detection.conflictCount === 0) return 'No action required.';
if (syncState.unsyncedCount > 10)
return 'Force full re-sync from the primary device. Clear the offline queue on secondary devices before next sync.';
if (detection.conflictPattern === 'version_clash')
return 'Trigger a full timer re-sync (POST /timers/sync with force=true) from the device with the latest edits.';
if (detection.conflictPattern === 'stale_device')
return "Clear the stale device's local cache and trigger a fresh pull from the server.";
return 'Review diagnostics logs for detailed conflict context, then force re-sync.';
})();
return {
runId,
productId: 'chronomind',
userId: detection.userId,
deviceId: detection.deviceId,
conflictCount: detection.conflictCount,
conflictPattern: detection.conflictPattern,
unsyncedCount: syncState.unsyncedCount,
pendingCount: syncState.pendingCount,
diagnosticsSessionId: diagnostics.session?.id ?? null,
logEntries: diagnostics.logEntries,
rootCauseSummary,
recommendedAction,
generatedAt: now,
};
}
// ── Pipeline runner ────────────────────────────────────────────────────────────
async function runSyncConflictPipeline(
userId: string,
deviceId: string | null,
fromTime: string,
toTime: string,
req: McpToolRequest
): Promise<SyncConflictReport> {
const runId = randomUUID();
const opts = {
token: req.headers.authorization?.slice(7),
requestId: req.id,
};
req.log.info({ runId, stepId: 'detect', userId, deviceId }, 'ConflictDetectorAgent start');
const detection = await detectConflicts(userId, deviceId, fromTime, toTime, opts);
req.log.info(
{
runId,
stepId: 'detect',
conflictCount: detection.conflictCount,
pattern: detection.conflictPattern,
},
'ConflictDetectorAgent done'
);
req.log.info({ runId, stepId: 'syncState' }, 'SyncStateInspectorAgent start');
const syncState = await inspectSyncState(opts);
req.log.info(
{ runId, stepId: 'syncState', unsyncedCount: syncState.unsyncedCount },
'SyncStateInspectorAgent done'
);
req.log.info(
{ runId, stepId: 'diagnostics', conflictCount: detection.conflictCount },
'DiagnosticsSessionAgent start'
);
const diagnostics = await captureDiagnostics(runId, userId, detection, opts);
req.log.info(
{
runId,
stepId: 'diagnostics',
skipped: diagnostics.skipped,
sessionId: diagnostics.session?.id,
},
'DiagnosticsSessionAgent done'
);
req.log.info({ runId, stepId: 'report' }, 'ConflictReportAgent start');
const report = buildConflictReport(runId, detection, syncState, diagnostics);
req.log.info(
{ runId, stepId: 'report', conflictCount: report.conflictCount },
'ConflictReportAgent done'
);
return report;
}
// ── MCP tool registration ─────────────────────────────────────────────────────
registerTool({
name: 'chronomind.sync.diagnoseConflicts',
description:
'A2A pipeline: queries ChronoMind telemetry for sync_conflict events, inspects current sync queue state, creates a diagnostics session for affected users, and returns a root-cause conflict report with remediation steps. Requires admin role.',
requiredRole: 'admin',
inputSchema: z.object({
userId: z.string().min(1).describe('User to run conflict diagnostics for'),
deviceId: z.string().optional().describe('Optional: narrow to a specific device ID'),
from: z.string().datetime().optional().describe('Start of conflict window (default: 24h ago)'),
to: z.string().datetime().optional().describe('End of conflict window (default: now)'),
}),
async execute(args, req) {
const now = new Date();
const toTime = args.to ?? now.toISOString();
const fromTime = args.from ?? new Date(now.getTime() - 24 * 60 * 60 * 1000).toISOString();
return runSyncConflictPipeline(args.userId, args.deviceId ?? null, fromTime, toTime, req);
},
});