SyncDiagnosticsAgent — A2A Spec

Product: PeakPulse
Trigger: Telemetry event sync_upload_failed (emitted by TelemetryService.swift on iOS or SyncRepository on Android), or on demand via peakpulse.sync.diagnose(userId)
Output: An active diagnostics session targeting the failing device, plus a structured sync diagnostic report logged as a telemetry event


Agent roster

| Step | Agent | Input | Output |
|------|-------|-------|--------|
| 1 | SyncFailureInspectorAgent | userId, deviceId, failure event payload | Sync queue depth + last successful sync + failure pattern classification |
| 2 | DiagnosticsSessionAgent | userId, deviceId, productId, failure context | Active platform.diagnostics session ID targeting the device |
| 3 | SyncRetryObserverAgent | Session ID + retry window | Collected network traces from next sync attempt (or timeout report) |
| 4 | SyncDiagnosticReportAgent | Traces + failure classification + session stats | Structured diagnostic report + telemetry event + optional alert to admin |

Agent contracts

SyncFailureInspectorAgent

// Tool: peakpulse.syncStatus (GET /peak/sync-status?userId=<id>)
// Tool: platform.telemetry.query (GET /api/telemetry/query?productId=peakpulse&eventType=sync_upload_failed)
input: {
  userId: string;
  deviceId?: string;
  failureEvent: {
    eventType: 'sync_upload_failed';
    platform: 'ios' | 'android';
    appVersion: string;
    errorCode: string;       // e.g. 'network_timeout', 'auth_401', 'cosmos_conflict'
    sessionId?: string;      // the PeakPulse session that failed to upload
    timestamp: string;
  };
}
output: {
  userId: string;
  deviceId: string | null;
  queueDepth: number;              // number of unsynced sessions pending upload
  lastSuccessfulSync: string | null; // ISO 8601
  failurePattern: 'transient' | 'auth' | 'persistent' | 'unknown';
  recentFailureCount: number;      // failures in last 24 h
  shouldStartDiagnostics: boolean; // true if recentFailureCount >= 2 or failurePattern === 'auth' (see decision gate)
}

DiagnosticsSessionAgent

// Tool: platform.diagnostics.sessions.create
// POST /api/diagnostics/sessions
// Only called when SyncFailureInspectorAgent.shouldStartDiagnostics === true
input: {
  userId: string;
  deviceId: string | null;
  productId: 'peakpulse';
  failurePattern: string;
  collectionLevel: 'debug' | 'trace'; // 'trace' for auth failures, 'debug' otherwise
}
output: {
  sessionId: string;
  sessionStatus: 'active';
  expiresAt: string; // ISO 8601
  collectionLevel: string;
  captureNetwork: true; // always true for sync diagnostics — network traces are the signal
}
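The collectionLevel rule in the contract above ('trace' for auth failures, 'debug' otherwise) can be sketched as a small builder for the agent input. This is a minimal illustration, not the actual implementation: the function name buildSessionInput is hypothetical, and the wire format of POST /api/diagnostics/sessions is deliberately left out because the spec does not define it.

```typescript
// Input shape copied from the DiagnosticsSessionAgent contract.
interface DiagnosticsSessionInput {
  userId: string;
  deviceId: string | null;
  productId: 'peakpulse';
  failurePattern: string;
  collectionLevel: 'debug' | 'trace';
}

// Hypothetical helper: derives collectionLevel from the failure pattern.
function buildSessionInput(
  userId: string,
  deviceId: string | null,
  failurePattern: string,
): DiagnosticsSessionInput {
  return {
    userId,
    deviceId,
    productId: 'peakpulse',
    failurePattern,
    // 'trace' for auth failures, 'debug' for everything else.
    collectionLevel: failurePattern === 'auth' ? 'trace' : 'debug',
  };
}
```

Keeping the derivation in one place means callers never pass a collectionLevel that disagrees with the failure pattern.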

SyncRetryObserverAgent

// Tool: platform.diagnostics.sessions.get  (polls status)
// Tool: platform.diagnostics.sessions.getLogs (fetches captured log entries)
// Tool: platform.diagnostics.sessions.getTraces (fetches OTel network spans)
// Polls until a new sync attempt is captured OR the session expires
input: {
  sessionId: string;
  pollIntervalSeconds: number;  // default 30
  maxWaitMinutes: number;       // default 15
}
output: {
  syncAttemptDetected: boolean;
  networkTraces: unknown[];     // raw OTel spans from the sync attempt
  logEntries: unknown[];        // filtered for 'sync' module
  capturedAt: string | null;
  timedOut: boolean;
}
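The polling behaviour described by this contract might look roughly like the sketch below. Several things here are assumptions rather than part of the spec: the DiagnosticsClient interface and its method names are hypothetical stand-ins for the three platform.diagnostics.sessions tools, a non-empty trace list is treated as "sync attempt detected", and a session that leaves the 'active' state is reported the same way as a timeout. The sleep and now parameters are injected so the loop can be exercised without real waiting.

```typescript
interface ObserverInput {
  sessionId: string;
  pollIntervalSeconds?: number; // default 30
  maxWaitMinutes?: number;      // default 15
}

interface ObserverOutput {
  syncAttemptDetected: boolean;
  networkTraces: unknown[];
  logEntries: unknown[];
  capturedAt: string | null;
  timedOut: boolean;
}

// Hypothetical client wrapping sessions.get / getTraces / getLogs.
interface DiagnosticsClient {
  getStatus(sessionId: string): Promise<string>;
  getTraces(sessionId: string): Promise<unknown[]>;
  getLogs(sessionId: string): Promise<unknown[]>;
}

async function observeSyncRetry(
  input: ObserverInput,
  client: DiagnosticsClient,
  sleep: (ms: number) => Promise<void> =
    (ms) => new Promise<void>((resolve) => setTimeout(resolve, ms)),
  now: () => number = Date.now,
): Promise<ObserverOutput> {
  const intervalMs = (input.pollIntervalSeconds ?? 30) * 1000;
  const deadline = now() + (input.maxWaitMinutes ?? 15) * 60000;

  while (now() < deadline) {
    const traces = await client.getTraces(input.sessionId);
    if (traces.length > 0) {
      // Assumption: any captured span means a sync attempt happened.
      const logs = await client.getLogs(input.sessionId);
      return {
        syncAttemptDetected: true,
        networkTraces: traces,
        logEntries: logs,
        capturedAt: new Date(now()).toISOString(),
        timedOut: false,
      };
    }
    // Stop early if the session is no longer active (e.g. it expired).
    if ((await client.getStatus(input.sessionId)) !== 'active') break;
    await sleep(intervalMs);
  }

  return {
    syncAttemptDetected: false,
    networkTraces: [],
    logEntries: [],
    capturedAt: null,
    timedOut: true,
  };
}
```

Filtering logEntries down to the 'sync' module is omitted because the log entry shape is not defined in this spec.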

SyncDiagnosticReportAgent

// Tool: platform.telemetry.events (POST /api/telemetry/events — logs the report as a structured event)
// Tool: extraction.run (POST /api/extract — optional: extract error patterns from log text)
input: {
  userId: string;
  deviceId: string | null;
  sessionId: string;
  failurePattern: string;
  networkTraces: unknown[];
  logEntries: unknown[];
  syncAttemptDetected: boolean;
  timedOut: boolean;
}
output: {
  reportId: string;           // telemetry event ID
  rootCauseSummary: string;   // short natural-language summary
  recommendedAction: string;  // e.g. 'force token refresh', 'check Cosmos conflict resolution', 'retry on wifi'
  extractedEntities: unknown[]; // from extraction-service (error codes, URLs, status codes)
  diagnosticsSessionId: string; // ID of the platform.diagnostics session
}

Data model

interface SyncDiagnosticReport {
  productId: 'peakpulse';
  reportId: string;
  userId: string;
  deviceId: string | null;
  failurePattern: 'transient' | 'auth' | 'persistent' | 'unknown';
  diagnosticsSessionId: string;
  syncAttemptDetected: boolean;
  rootCauseSummary: string;
  recommendedAction: string;
  extractedEntities: Array<{
    extraction_class: string;
    extraction_text: string;
  }>;
  generatedAt: string; // ISO 8601
}
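Because the report is logged as a telemetry event, it may be worth validating its shape at runtime before posting it. The type guard below is one possible sketch; the function name isSyncDiagnosticReport is hypothetical, and the generatedAt check only confirms that the string parses as a date, which is looser than strict ISO 8601 validation.

```typescript
type FailurePattern = 'transient' | 'auth' | 'persistent' | 'unknown';

interface SyncDiagnosticReport {
  productId: 'peakpulse';
  reportId: string;
  userId: string;
  deviceId: string | null;
  failurePattern: FailurePattern;
  diagnosticsSessionId: string;
  syncAttemptDetected: boolean;
  rootCauseSummary: string;
  recommendedAction: string;
  extractedEntities: Array<{ extraction_class: string; extraction_text: string }>;
  generatedAt: string; // ISO 8601
}

const FAILURE_PATTERNS: string[] = ['transient', 'auth', 'persistent', 'unknown'];

function isEntity(value: unknown): boolean {
  if (typeof value !== 'object' || value === null) return false;
  const e = value as Record<string, unknown>;
  return typeof e.extraction_class === 'string' && typeof e.extraction_text === 'string';
}

// Hypothetical runtime check mirroring the interface field by field.
function isSyncDiagnosticReport(value: unknown): value is SyncDiagnosticReport {
  if (typeof value !== 'object' || value === null) return false;
  const r = value as Record<string, unknown>;
  return (
    r.productId === 'peakpulse' &&
    typeof r.reportId === 'string' &&
    typeof r.userId === 'string' &&
    (r.deviceId === null || typeof r.deviceId === 'string') &&
    typeof r.failurePattern === 'string' &&
    FAILURE_PATTERNS.indexOf(r.failurePattern) >= 0 &&
    typeof r.diagnosticsSessionId === 'string' &&
    typeof r.syncAttemptDetected === 'boolean' &&
    typeof r.rootCauseSummary === 'string' &&
    typeof r.recommendedAction === 'string' &&
    Array.isArray(r.extractedEntities) &&
    r.extractedEntities.every(isEntity) &&
    typeof r.generatedAt === 'string' &&
    !Number.isNaN(Date.parse(r.generatedAt))
  );
}
```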

Decision gate: when to start diagnostics

sync_upload_failed event received
         │
         ▼
recentFailureCount >= 2?  ──── yes ──► start DiagnosticsSessionAgent
         │
         no
         │
failurePattern == 'auth'? ──── yes ──► start DiagnosticsSessionAgent (trace level)
         │
         no
         │
         └──► log telemetry only, skip diagnostics (transient, single failure)
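
The flowchart above can be expressed as a small pure function. This is a sketch only: the function name evaluateDiagnosticsGate and the reason codes are hypothetical, added to make the chosen branch visible in logs and tests.

```typescript
type FailurePattern = 'transient' | 'auth' | 'persistent' | 'unknown';

// Hypothetical reason codes; they are not part of the spec.
type GateReason = 'repeated_failures' | 'auth_pattern' | 'single_non_auth_failure';

interface GateDecision {
  startDiagnostics: boolean;
  reason: GateReason;
}

// Checks the branches in the same order as the flowchart.
function evaluateDiagnosticsGate(
  recentFailureCount: number,
  failurePattern: FailurePattern,
): GateDecision {
  if (recentFailureCount >= 2) {
    return { startDiagnostics: true, reason: 'repeated_failures' };
  }
  if (failurePattern === 'auth') {
    return { startDiagnostics: true, reason: 'auth_pattern' };
  }
  // Single non-auth failure: log telemetry only.
  return { startDiagnostics: false, reason: 'single_non_auth_failure' };
}
```

For example, evaluateDiagnosticsGate(1, 'auth') starts diagnostics with reason 'auth_pattern', while evaluateDiagnosticsGate(1, 'transient') skips them.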

Error handling

  • If DiagnosticsSessionAgent fails to create a session (e.g., platform-service unavailable), the report is still generated from the failure event alone — diagnostics collection is best-effort.
  • If SyncRetryObserverAgent times out (no retry within maxWaitMinutes), the report is marked syncAttemptDetected: false with timedOut: true.
  • If extraction-service is unavailable, SyncDiagnosticReportAgent skips entity extraction and sets extractedEntities: [].
  • The diagnostics session is closed (status: completed) after the report is generated regardless of outcome.
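
The best-effort rules above could be wired together along the following lines. Everything named here (PipelineDeps, runBestEffort, closeSession and the result shape) is hypothetical and stands in for the real agents. The sketch only shows the control flow: each optional stage is wrapped so that a failure degrades the report instead of aborting it, and the session is closed whenever one was created.

```typescript
interface Observation {
  syncAttemptDetected: boolean;
  timedOut: boolean;
  networkTraces: unknown[];
  logEntries: unknown[];
}

// Hypothetical dependencies, one per agent or tool call.
interface PipelineDeps {
  createSession(): Promise<string>;                           // DiagnosticsSessionAgent
  observeRetry(sessionId: string): Promise<Observation>;      // SyncRetryObserverAgent
  extractEntities(logEntries: unknown[]): Promise<unknown[]>; // extraction.run
  closeSession(sessionId: string): Promise<void>;             // session cleanup
}

interface PipelineResult {
  sessionId: string | null;
  observation: Observation;
  extractedEntities: unknown[];
  sessionClosed: boolean;
}

async function runBestEffort(deps: PipelineDeps): Promise<PipelineResult> {
  let sessionId: string | null = null;
  let observation: Observation = {
    syncAttemptDetected: false,
    timedOut: false,
    networkTraces: [],
    logEntries: [],
  };
  let extractedEntities: unknown[] = [];
  let sessionClosed = false;

  try {
    sessionId = await deps.createSession();
    observation = await deps.observeRetry(sessionId);
  } catch {
    // Diagnostics collection is best-effort: continue with the failure event alone.
  }

  try {
    extractedEntities = await deps.extractEntities(observation.logEntries);
  } catch {
    extractedEntities = []; // extraction-service unavailable
  }

  // The report would be assembled and logged here, before cleanup.

  if (sessionId !== null) {
    try {
      await deps.closeSession(sessionId);
      sessionClosed = true;
    } catch {
      // A failed close must not fail the report.
    }
  }

  return { sessionId, observation, extractedEntities, sessionClosed };
}
```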

MCP tool surface

peakpulse.sync.diagnose(userId: string, deviceId?: string): SyncDiagnosticReport
peakpulse.sync.status(userId: string): { queueDepth: number; lastSuccessfulSync: string | null; recentFailureCount: number }
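
The return shape of peakpulse.sync.status is a subset of the SyncFailureInspectorAgent output, so one plausible way to serve it is as a projection of that output. The spec does not say the tool is implemented this way; the function name toSyncStatus is hypothetical and the sketch only illustrates the field mapping.

```typescript
// Output shape copied from the SyncFailureInspectorAgent contract.
interface InspectorOutput {
  userId: string;
  deviceId: string | null;
  queueDepth: number;
  lastSuccessfulSync: string | null;
  failurePattern: 'transient' | 'auth' | 'persistent' | 'unknown';
  recentFailureCount: number;
  shouldStartDiagnostics: boolean;
}

// Return shape of peakpulse.sync.status as listed above.
interface SyncStatus {
  queueDepth: number;
  lastSuccessfulSync: string | null;
  recentFailureCount: number;
}

// Hypothetical projection: keeps only the three status fields.
function toSyncStatus(output: InspectorOutput): SyncStatus {
  return {
    queueDepth: output.queueDepth,
    lastSuccessfulSync: output.lastSuccessfulSync,
    recentFailureCount: output.recentFailureCount,
  };
}
```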

Implementation checklist

  • SyncFailureInspectorAgent — wraps peakpulse.syncStatus + telemetry query for recent failures
  • DiagnosticsSessionAgent — calls platform.diagnostics.sessions.create with captureNetwork: true
  • SyncRetryObserverAgent — polls getLogs + getTraces on active session until retry detected or timeout
  • SyncDiagnosticReportAgent — assembles report, calls extraction.run (best-effort), logs telemetry event
  • Event subscription: subscribe to sync_upload_failed telemetry events for productId=peakpulse
  • Decision gate: implement shouldStartDiagnostics threshold (≥ 2 failures or auth pattern)
  • Session cleanup: close diagnostics session after report is generated
  • MCP tool registration: peakpulse.sync.diagnose + peakpulse.sync.status
  • Integration test: simulate sync_upload_failed event → verify session created + report generated