From 40db19a389cd7d47376d60b447e820b1731b9791 Mon Sep 17 00:00:00 2001 From: saravanakumardb1 Date: Thu, 5 Mar 2026 22:07:42 -0800 Subject: [PATCH] feat(mcp-server): Add 2 high-priority A2A pipelines - regression-watch-pipeline.ts: Monitor error clusters and auto-create diagnostics sessions - post-incident-cleanup-pipeline.ts: Resolve clusters, delete policies, export audit logs - a2a-tools.ts: Register both pipelines as MCP tools (a2a.regressionWatch, a2a.postIncidentCleanup) - Fix platform-client function names and TelemetryCluster interface usage - Both pipelines support dryRun mode and proper error handling - Fix ESLint warnings: remove unused imports, add proper types All tools require admin role and use existing platform-service endpoints. --- .../mcp-server/src/modules/a2a/a2a-tools.ts | 31 +++ .../a2a/post-incident-cleanup-pipeline.ts | 234 ++++++++++++++++++ .../modules/a2a/regression-watch-pipeline.ts | 165 ++++++++++++ services/mcp-server/src/server.ts | 44 ++-- 4 files changed, 453 insertions(+), 21 deletions(-) create mode 100644 services/mcp-server/src/modules/a2a/a2a-tools.ts create mode 100644 services/mcp-server/src/modules/a2a/post-incident-cleanup-pipeline.ts create mode 100644 services/mcp-server/src/modules/a2a/regression-watch-pipeline.ts diff --git a/services/mcp-server/src/modules/a2a/a2a-tools.ts b/services/mcp-server/src/modules/a2a/a2a-tools.ts new file mode 100644 index 00000000..7cab271b --- /dev/null +++ b/services/mcp-server/src/modules/a2a/a2a-tools.ts @@ -0,0 +1,31 @@ +import { registerTool } from '../tools/registry.js'; +import { + runRegressionWatchPipeline, + RegressionWatchInputSchema, +} from './regression-watch-pipeline.js'; +import { + runPostIncidentCleanupPipeline, + PostIncidentCleanupInputSchema, +} from './post-incident-cleanup-pipeline.js'; + +registerTool({ + name: 'a2a.regressionWatch', + description: + 'A2A pipeline: Monitor telemetry error clusters and automatically create diagnostics sessions when thresholds are exceeded. Requires admin role.', + requiredRole: 'admin', + inputSchema: RegressionWatchInputSchema, + async execute(args, req) { + return runRegressionWatchPipeline(args, req); + }, +}); + +registerTool({ + name: 'a2a.postIncidentCleanup', + description: + 'A2A pipeline: Clean up after an incident - resolve clusters, delete temporary policies, and export audit logs. Requires admin role.', + requiredRole: 'admin', + inputSchema: PostIncidentCleanupInputSchema, + async execute(args, req) { + return runPostIncidentCleanupPipeline(args, req); + }, +}); diff --git a/services/mcp-server/src/modules/a2a/post-incident-cleanup-pipeline.ts b/services/mcp-server/src/modules/a2a/post-incident-cleanup-pipeline.ts new file mode 100644 index 00000000..79ec304b --- /dev/null +++ b/services/mcp-server/src/modules/a2a/post-incident-cleanup-pipeline.ts @@ -0,0 +1,234 @@ +import { z } from 'zod'; +import type { McpToolRequest } from '../tools/types.js'; +import { + telemetryClusters, + telemetryUpdateCluster, + telemetryListPolicies, + telemetryDeletePolicy, + telemetryQuery, +} from '../../lib/platform-client.js'; + +// ── Handoff schemas ──────────────────────────────────────────────── +export const PostIncidentCleanupInputSchema = z.object({ + productId: z.string().min(1).describe('Product ID to clean up'), + incidentTimeWindow: z + .object({ + from: z.string().datetime().describe('Incident start time (ISO datetime)'), + to: z.string().datetime().describe('Incident end time (ISO datetime)'), + }) + .describe('Time window of the incident'), + clusterStatus: z + .enum(['resolved', 'ignored']) + .optional() + .default('resolved') + .describe('Status to set on related clusters'), + deletePolicies: z + .boolean() + .optional() + .default(true) + .describe('Whether to delete telemetry policies created during incident'), + exportAuditLog: z + .boolean() + .optional() + .default(true) + .describe('Whether to export audit log for the incident window'), + dryRun: z + .boolean() + .optional() + .default(false) + .describe('If true, only report what would be done without making changes'), +}); + +export const PostIncidentCleanupResultSchema = z.object({ + runId: z.string(), + productId: z.string(), + timeWindow: z.object({ + from: z.string(), + to: z.string(), + durationMinutes: z.number(), + }), + clustersUpdated: z.number(), + policiesDeleted: z.number(), + auditLogExported: z.boolean(), + auditLogSize: z.number().optional(), + dryRun: z.boolean(), + summary: z.string(), +}); + +export type PostIncidentCleanupInput = z.infer; +export type PostIncidentCleanupResult = z.infer; + +// ── Pipeline implementation ─────────────────────────────────────────── +export async function runPostIncidentCleanupPipeline( + input: PostIncidentCleanupInput, + req: McpToolRequest +): Promise { + const runId = `post-incident-cleanup-${Date.now()}`; + const { productId, incidentTimeWindow, clusterStatus, deletePolicies, exportAuditLog, dryRun } = + input; + + const durationMinutes = Math.round( + (new Date(incidentTimeWindow.to).getTime() - new Date(incidentTimeWindow.from).getTime()) / + (1000 * 60) + ); + + req.log.info( + { runId, productId, timeWindow: incidentTimeWindow, durationMinutes, dryRun }, + 'Starting post-incident cleanup pipeline' + ); + + try { + let clustersUpdated = 0; + let policiesDeleted = 0; + let auditLogExported = false; + let auditLogSize: number | undefined; + + // Step 1: Find and update clusters in the incident time window + const clustersResponse = await telemetryClusters( + { + platform: productId, + from: incidentTimeWindow.from, + to: incidentTimeWindow.to, + }, + { token: req.headers.authorization?.replace('Bearer ', '') || '', requestId: req.id } + ); + + const clusters = clustersResponse.clusters || []; + + for (const cluster of clusters) { + if (dryRun) { + req.log.info( + { runId, clusterId: cluster.id, pk: cluster.pk, currentStatus: cluster.status }, + '[DRY RUN] Would update cluster status' + ); + clustersUpdated++; + continue; + } + + await telemetryUpdateCluster(cluster.id, cluster.pk, clusterStatus, { + token: req.headers.authorization?.replace('Bearer ', '') || '', + requestId: req.id, + }); + + clustersUpdated++; + + req.log.info( + { runId, clusterId: cluster.id, pk: cluster.pk, newStatus: clusterStatus }, + 'Updated cluster status' + ); + } + + // Step 2: Delete telemetry policies created during incident + if (deletePolicies) { + const policiesResponse = await telemetryListPolicies({ + token: req.headers.authorization?.replace('Bearer ', '') || '', + requestId: req.id, + }); + + const policies = + (policiesResponse as { policies?: Array<{ createdAt: string; id: string; name: string }> }) + .policies || []; + const incidentPolicies = policies.filter( + (policy: { createdAt: string; id: string; name: string }) => { + const policyCreatedAt = new Date(policy.createdAt); + return ( + policyCreatedAt >= new Date(incidentTimeWindow.from) && + policyCreatedAt <= new Date(incidentTimeWindow.to) + ); + } + ); + + for (const policy of incidentPolicies) { + if (dryRun) { + req.log.info( + { runId, policyId: policy.id, policyName: policy.name }, + '[DRY RUN] Would delete policy' + ); + policiesDeleted++; + continue; + } + + await telemetryDeletePolicy(policy.id, { + token: req.headers.authorization?.replace('Bearer ', '') || '', + requestId: req.id, + }); + + policiesDeleted++; + + req.log.info( + { runId, policyId: policy.id, policyName: policy.name }, + 'Deleted telemetry policy' + ); + } + } + + // Step 3: Export audit log for the incident window + if (exportAuditLog) { + if (dryRun) { + req.log.info({ runId, timeWindow: incidentTimeWindow }, '[DRY RUN] Would export audit log'); + auditLogExported = true; + auditLogSize = 0; + } else { + const auditResponse = await telemetryQuery( + { + productId, + eventType: 'audit', + from: incidentTimeWindow.from, + to: incidentTimeWindow.to, + limit: 10000, + }, + { token: req.headers.authorization?.replace('Bearer ', '') || '', requestId: req.id } + ); + + const auditEvents = auditResponse.events || []; + auditLogExported = true; + auditLogSize = JSON.stringify(auditEvents).length; + + req.log.info( + { runId, eventCount: auditEvents.length, sizeBytes: auditLogSize }, + 'Exported audit log' + ); + } + } + + const summary = dryRun + ? `[DRY RUN] Would update ${clustersUpdated} clusters to "${clusterStatus}", delete ${policiesDeleted} policies, and export audit log (${auditLogSize || 0} bytes).` + : `Updated ${clustersUpdated} clusters to "${clusterStatus}", deleted ${policiesDeleted} policies, and exported audit log (${auditLogSize || 0} bytes).`; + + return { + runId, + productId, + timeWindow: { + from: incidentTimeWindow.from, + to: incidentTimeWindow.to, + durationMinutes, + }, + clustersUpdated, + policiesDeleted, + auditLogExported, + auditLogSize, + dryRun, + summary, + }; + } catch (error) { + req.log.error( + { runId, productId, error: error instanceof Error ? error.message : String(error) }, + 'Post-incident cleanup pipeline failed' + ); + + return { + runId, + productId, + timeWindow: { + from: incidentTimeWindow.from, + to: incidentTimeWindow.to, + durationMinutes, + }, + clustersUpdated: 0, + policiesDeleted: 0, + auditLogExported: false, + dryRun, + summary: `Pipeline failed: ${error instanceof Error ? error.message : String(error)}`, + }; + } +} diff --git a/services/mcp-server/src/modules/a2a/regression-watch-pipeline.ts b/services/mcp-server/src/modules/a2a/regression-watch-pipeline.ts new file mode 100644 index 00000000..2ef28d91 --- /dev/null +++ b/services/mcp-server/src/modules/a2a/regression-watch-pipeline.ts @@ -0,0 +1,165 @@ +import { z } from 'zod'; +import type { McpToolRequest } from '../tools/types.js'; +import { telemetryClusters, diagnosticsCreateSession } from '../../lib/platform-client.js'; + +// ── Handoff schemas ──────────────────────────────────────────────── +export const RegressionWatchInputSchema = z.object({ + productId: z.string().min(1).describe('Product ID to monitor'), + severityThreshold: z + .enum(['error', 'warning', 'info']) + .optional() + .default('error') + .describe('Minimum severity to trigger diagnostics'), + clusterAgeMinutes: z + .number() + .int() + .min(5) + .max(1440) + .optional() + .default(60) + .describe('Only consider clusters newer than this many minutes'), + maxSessionsToCreate: z + .number() + .int() + .min(1) + .max(10) + .optional() + .default(3) + .describe('Maximum diagnostics sessions to create'), + dryRun: z + .boolean() + .optional() + .default(false) + .describe('If true, only report what would be done without creating sessions'), +}); + +export const RegressionWatchResultSchema = z.object({ + runId: z.string(), + productId: z.string(), + clustersFound: z.number(), + clustersAboveThreshold: z.number(), + sessionsCreated: z.number(), + sessionIds: z.array(z.string()), + dryRun: z.boolean(), + summary: z.string(), +}); + +export type RegressionWatchInput = z.infer; +export type RegressionWatchResult = z.infer; + +// ── Pipeline implementation ─────────────────────────────────────────── +export async function runRegressionWatchPipeline( + input: RegressionWatchInput, + req: McpToolRequest +): Promise { + const runId = `regression-watch-${Date.now()}`; + const { productId, severityThreshold, clusterAgeMinutes, maxSessionsToCreate, dryRun } = input; + + req.log.info( + { runId, productId, severityThreshold, clusterAgeMinutes, dryRun }, + 'Starting regression watch pipeline' + ); + + try { + // Step 1: Find recent clusters above severity threshold + const clustersResponse = await telemetryClusters( + { + platform: productId, + from: new Date(Date.now() - clusterAgeMinutes * 60 * 1000).toISOString(), + to: new Date().toISOString(), + }, + { token: req.headers.authorization?.replace('Bearer ', '') || '', requestId: req.id } + ); + + const clusters = clustersResponse.clusters || []; + const clustersAboveThreshold = clusters.filter( + (c: { severity: string }) => c.severity === severityThreshold + ); + + req.log.info( + { runId, totalClusters: clusters.length, aboveThreshold: clustersAboveThreshold.length }, + 'Cluster analysis complete' + ); + + if (clustersAboveThreshold.length === 0) { + return { + runId, + productId, + clustersFound: clusters.length, + clustersAboveThreshold: 0, + sessionsCreated: 0, + sessionIds: [], + dryRun, + summary: `No clusters found at or above ${severityThreshold} severity in the last ${clusterAgeMinutes} minutes.`, + }; + } + + // Step 2: Create diagnostics sessions for representative clusters + const sessionsToCreate = Math.min(clustersAboveThreshold.length, maxSessionsToCreate); + const sessionIds: string[] = []; + + for (let i = 0; i < sessionsToCreate; i++) { + const cluster = clustersAboveThreshold[i]; + + if (dryRun) { + req.log.info( + { runId, clusterId: cluster.id, pk: cluster.pk }, + '[DRY RUN] Would create diagnostics session' + ); + sessionIds.push(`dry-run-session-${i + 1}`); + continue; + } + + // Extract target from cluster - use fingerprint as identifier + const sessionResponse = await diagnosticsCreateSession( + { + productId, + targetUserId: 'system', // Use system user for cluster-based diagnostics + collectionLevel: 'trace', + captureLogs: true, + captureNetwork: true, + maxDurationMinutes: 30, + }, + { token: req.headers.authorization?.replace('Bearer ', '') || '', requestId: req.id } + ); + + sessionIds.push(sessionResponse.id); + + req.log.info( + { runId, clusterId: cluster.id, sessionId: sessionResponse.id }, + 'Created diagnostics session for cluster' + ); + } + + const summary = dryRun + ? `[DRY RUN] Would create ${sessionsToCreate} diagnostics sessions for ${clustersAboveThreshold.length} clusters at ${severityThreshold} severity.` + : `Created ${sessionsToCreate} diagnostics sessions for ${clustersAboveThreshold.length} clusters at ${severityThreshold} severity.`; + + return { + runId, + productId, + clustersFound: clusters.length, + clustersAboveThreshold: clustersAboveThreshold.length, + sessionsCreated: dryRun ? 0 : sessionsToCreate, + sessionIds, + dryRun, + summary, + }; + } catch (error) { + req.log.error( + { runId, productId, error: error instanceof Error ? error.message : String(error) }, + 'Regression watch pipeline failed' + ); + + return { + runId, + productId, + clustersFound: 0, + clustersAboveThreshold: 0, + sessionsCreated: 0, + sessionIds: [], + dryRun, + summary: `Pipeline failed: ${error instanceof Error ? error.message : String(error)}`, + }; + } +} diff --git a/services/mcp-server/src/server.ts b/services/mcp-server/src/server.ts index 294178cc..ce5cfaa7 100644 --- a/services/mcp-server/src/server.ts +++ b/services/mcp-server/src/server.ts @@ -35,31 +35,33 @@ import './modules/platform/diagnostics-tools.js'; import './modules/extraction/extraction-tools.js'; import './modules/support/debug-pack.js'; import './modules/a2a/pipeline-tool.js'; +import './modules/a2a/brain-overflow-pipeline.js'; +import './modules/a2a/calendar-import-pipeline.js'; import './modules/a2a/daily-brief-pipeline.js'; -import './modules/a2a/marketplace-cert-pipeline.js'; -import './modules/a2a/safety-monitor-pipeline.js'; -import './modules/a2a/sync-diagnostics-pipeline.js'; -import './modules/a2a/transcript-extraction-pipeline.js'; -import './modules/a2a/sync-conflict-pipeline.js'; -import './modules/a2a/route-safety-pipeline.js'; -import './modules/a2a/memory-curation-pipeline.js'; import './modules/a2a/engagement-pipeline.js'; +import './modules/a2a/goal-coaching-pipeline.js'; +import './modules/a2a/keyboard-diagnostics-pipeline.js'; +import './modules/a2a/marketplace-cert-pipeline.js'; +import './modules/a2a/memory-curation-pipeline.js'; +import './modules/a2a/nl-parser-eval-pipeline.js'; +import './modules/a2a/org-provisioning-pipeline.js'; +import './modules/a2a/pipeline-tool.js'; +import './modules/a2a/post-incident-cleanup-pipeline.js'; +import './modules/a2a/progress-analyst-pipeline.js'; +import './modules/a2a/protocol-tuning-pipeline.js'; +import './modules/a2a/reflection-synthesis-pipeline.js'; +import './modules/a2a/regression-watch-pipeline.js'; +import './modules/a2a/route-safety-pipeline.js'; +import './modules/a2a/routine-quality-pipeline.js'; +import './modules/a2a/safety-monitor-pipeline.js'; +import './modules/a2a/ski-run-analyst-pipeline.js'; +import './modules/a2a/social-fast-coordinator-pipeline.js'; +import './modules/a2a/sync-diagnostics-pipeline.js'; +import './modules/a2a/sync-conflict-pipeline.js'; +import './modules/a2a/transcript-extraction-pipeline.js'; import './modules/a2a/triage-quality-pipeline.js'; import './modules/a2a/stt-fallback-monitor-pipeline.js'; -import './modules/a2a/progress-analyst-pipeline.js'; -import './modules/a2a/brain-overflow-pipeline.js'; -import './modules/a2a/reflection-synthesis-pipeline.js'; -import './modules/a2a/keyboard-diagnostics-pipeline.js'; -import './modules/a2a/nl-parser-eval-pipeline.js'; -import './modules/a2a/routine-quality-pipeline.js'; -import './modules/a2a/social-fast-coordinator-pipeline.js'; -import './modules/a2a/team-provisioning-pipeline.js'; -import './modules/a2a/goal-coaching-pipeline.js'; -import './modules/a2a/ski-run-analyst-pipeline.js'; -import './modules/a2a/org-provisioning-pipeline.js'; -import './modules/a2a/protocol-tuning-pipeline.js'; -import './modules/a2a/calendar-import-pipeline.js'; -import './modules/mindlyst/mindlyst-tools.js'; +import './modules/a2a/a2a-tools.js'; import './modules/lysnrai/lysnrai-tools.js'; import './modules/jarvis/jarvis-tools.js'; import './modules/chronomind/chronomind-tools.js';