fix(mcp-server): Improve error handling in A2A pipelines

- regression-watch-pipeline.ts: Add try/catch around session creation, continue on failures
- post-incident-cleanup-pipeline.ts: Add try/catch around policy deletion and audit export
- extractionResetProductRateLimit: make the `opts` parameter optional and read `requestId` via optional chaining (`opts?.requestId`)
- Update return values and summaries to report actual counts (e.g. `sessionsCreated`) instead of targets (e.g. `sessionsToCreate`)
- All pipelines now continue processing individual items instead of failing entirely
- Add a type assertion (`{ events?: unknown[] }`) on the audit response before reading its events array
This commit is contained in:
saravanakumardb1 2026-03-05 22:09:56 -08:00
parent ccf71b898c
commit 3a7139790c
3 changed files with 88 additions and 47 deletions

View File

@ -192,12 +192,12 @@ export async function extractionGetProductRateLimitStatus(
export async function extractionResetProductRateLimit( export async function extractionResetProductRateLimit(
productId: string, productId: string,
opts: { requestId?: string } opts?: { requestId?: string }
): Promise<unknown> { ): Promise<unknown> {
const url = `${config.EXTRACTION_SERVICE_URL}/api/extract/rate-limits/product/reset`; const url = `${config.EXTRACTION_SERVICE_URL}/api/extract/rate-limits/product/reset`;
const headers: Record<string, string> = { const headers: Record<string, string> = {
'Content-Type': 'application/json', 'Content-Type': 'application/json',
...(opts.requestId ? { 'x-request-id': opts.requestId } : {}), ...(opts?.requestId ? { 'x-request-id': opts.requestId } : {}),
}; };
const res = await fetch(url, { const res = await fetch(url, {
method: 'POST', method: 'POST',

View File

@ -148,17 +148,30 @@ export async function runPostIncidentCleanupPipeline(
continue; continue;
} }
await telemetryDeletePolicy(policy.id, { try {
token: req.headers.authorization?.replace('Bearer ', '') || '', await telemetryDeletePolicy(policy.id, {
requestId: req.id, token: req.headers.authorization?.replace('Bearer ', '') || '',
}); requestId: req.id,
});
policiesDeleted++; policiesDeleted++;
req.log.info( req.log.info(
{ runId, policyId: policy.id, policyName: policy.name }, { runId, policyId: policy.id, policyName: policy.name },
'Deleted telemetry policy' 'Deleted telemetry policy'
); );
} catch (policyError) {
req.log.warn(
{
runId,
policyId: policy.id,
policyName: policy.name,
error: policyError instanceof Error ? policyError.message : String(policyError),
},
'Failed to delete telemetry policy (continuing)'
);
// Continue with next policy instead of failing entire pipeline
}
} }
} }
@ -169,25 +182,37 @@ export async function runPostIncidentCleanupPipeline(
auditLogExported = true; auditLogExported = true;
auditLogSize = 0; auditLogSize = 0;
} else { } else {
const auditResponse = await telemetryQuery( try {
{ const auditResponse = await telemetryQuery(
productId, {
eventType: 'audit', productId,
from: incidentTimeWindow.from, eventType: 'audit',
to: incidentTimeWindow.to, from: incidentTimeWindow.from,
limit: 10000, to: incidentTimeWindow.to,
}, limit: 10000,
{ token: req.headers.authorization?.replace('Bearer ', '') || '', requestId: req.id } },
); { token: req.headers.authorization?.replace('Bearer ', '') || '', requestId: req.id }
);
const auditEvents = auditResponse.events || []; const auditEvents = (auditResponse as { events?: unknown[] }).events || [];
auditLogExported = true; auditLogExported = true;
auditLogSize = JSON.stringify(auditEvents).length; auditLogSize = JSON.stringify(auditEvents).length;
req.log.info( req.log.info(
{ runId, eventCount: auditEvents.length, sizeBytes: auditLogSize }, { runId, eventCount: auditEvents.length, sizeBytes: auditLogSize },
'Exported audit log' 'Exported audit log'
); );
} catch (auditError) {
req.log.warn(
{
runId,
timeWindow: incidentTimeWindow,
error: auditError instanceof Error ? auditError.message : String(auditError),
},
'Failed to export audit log (continuing)'
);
// Continue without audit log instead of failing entire pipeline
}
} }
} }

View File

@ -97,6 +97,7 @@ export async function runRegressionWatchPipeline(
// Step 2: Create diagnostics sessions for representative clusters // Step 2: Create diagnostics sessions for representative clusters
const sessionsToCreate = Math.min(clustersAboveThreshold.length, maxSessionsToCreate); const sessionsToCreate = Math.min(clustersAboveThreshold.length, maxSessionsToCreate);
const sessionIds: string[] = []; const sessionIds: string[] = [];
let sessionsCreated = 0;
for (let i = 0; i < sessionsToCreate; i++) { for (let i = 0; i < sessionsToCreate; i++) {
const cluster = clustersAboveThreshold[i]; const cluster = clustersAboveThreshold[i];
@ -107,40 +108,55 @@ export async function runRegressionWatchPipeline(
'[DRY RUN] Would create diagnostics session' '[DRY RUN] Would create diagnostics session'
); );
sessionIds.push(`dry-run-session-${i + 1}`); sessionIds.push(`dry-run-session-${i + 1}`);
sessionsCreated++;
continue; continue;
} }
// Extract target from cluster - use fingerprint as identifier try {
const sessionResponse = await diagnosticsCreateSession( // Extract target from cluster - use fingerprint as identifier
{ const sessionResponse = await diagnosticsCreateSession(
productId, {
targetUserId: 'system', // Use system user for cluster-based diagnostics productId,
collectionLevel: 'trace', targetUserId: 'system', // Use system user for cluster-based diagnostics
captureLogs: true, collectionLevel: 'trace',
captureNetwork: true, captureLogs: true,
maxDurationMinutes: 30, captureNetwork: true,
}, maxDurationMinutes: 30,
{ token: req.headers.authorization?.replace('Bearer ', '') || '', requestId: req.id } },
); { token: req.headers.authorization?.replace('Bearer ', '') || '', requestId: req.id }
);
sessionIds.push(sessionResponse.id); sessionIds.push(sessionResponse.id);
sessionsCreated++;
req.log.info( req.log.info(
{ runId, clusterId: cluster.id, sessionId: sessionResponse.id }, { runId, clusterId: cluster.id, sessionId: sessionResponse.id },
'Created diagnostics session for cluster' 'Created diagnostics session for cluster'
); );
} catch (sessionError) {
req.log.warn(
{
runId,
clusterId: cluster.id,
pk: cluster.pk,
error: sessionError instanceof Error ? sessionError.message : String(sessionError),
},
'Failed to create diagnostics session for cluster (continuing)'
);
// Continue with next cluster instead of failing entire pipeline
}
} }
const summary = dryRun const summary = dryRun
? `[DRY RUN] Would create ${sessionsToCreate} diagnostics sessions for ${clustersAboveThreshold.length} clusters at ${severityThreshold} severity.` ? `[DRY RUN] Would create ${sessionsToCreate} diagnostics sessions for ${clustersAboveThreshold.length} clusters at ${severityThreshold} severity.`
: `Created ${sessionsToCreate} diagnostics sessions for ${clustersAboveThreshold.length} clusters at ${severityThreshold} severity.`; : `Created ${sessionsCreated} diagnostics sessions for ${clustersAboveThreshold.length} clusters at ${severityThreshold} severity.`;
return { return {
runId, runId,
productId, productId,
clustersFound: clusters.length, clustersFound: clusters.length,
clustersAboveThreshold: clustersAboveThreshold.length, clustersAboveThreshold: clustersAboveThreshold.length,
sessionsCreated: dryRun ? 0 : sessionsToCreate, sessionsCreated: dryRun ? 0 : sessionsCreated,
sessionIds, sessionIds,
dryRun, dryRun,
summary, summary,