From aeae62027ff4bd05e67f14806d0fa7d6c2ffc296 Mon Sep 17 00:00:00 2001 From: saravanakumardb1 Date: Mon, 2 Mar 2026 10:13:47 -0800 Subject: [PATCH] fix(telemetry): remove redundant event.userId check in cluster affected users dedup --- .../src/modules/extract/jobs.ts | 29 +++++++++++++++++++ .../src/modules/telemetry/routes.ts | 2 +- 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/services/extraction-service/src/modules/extract/jobs.ts b/services/extraction-service/src/modules/extract/jobs.ts index 68d4789f..80b04bd3 100644 --- a/services/extraction-service/src/modules/extract/jobs.ts +++ b/services/extraction-service/src/modules/extract/jobs.ts @@ -31,6 +31,25 @@ export interface ExtractionJob { } const jobStore = new Map(); +const MAX_JOBS = 1000; // Prevent unbounded memory growth + +/** + * Cleanup old jobs to prevent memory leak. + * Keeps most recent jobs, removes oldest completed/failed first. + */ +function cleanupOldJobs(): void { + if (jobStore.size <= MAX_JOBS) return; + + const allJobs = [...jobStore.values()].sort((a, b) => a.createdAt.localeCompare(b.createdAt)); + + const toRemove = allJobs.slice(0, allJobs.length - MAX_JOBS); + for (const job of toRemove) { + // Only remove completed or failed jobs + if (job.status === 'completed' || job.status === 'failed') { + jobStore.delete(job.id); + } + } +} /** * Create a new async extraction job and start processing in background. @@ -53,6 +72,9 @@ export function createJob( jobStore.set(job.id, job); + // Cleanup old jobs to prevent memory leak + cleanupOldJobs(); + // Start processing in background (non-blocking) processJob(job, requestId).catch(() => { job.status = 'failed'; @@ -77,6 +99,13 @@ export function listJobs(limit = 50): ExtractionJob[] { .slice(0, limit); } +/** + * Reset job store (for testing). + */ +export function resetJobStore(): void { + jobStore.clear(); +} + // ── Internal ───────────────────────────────────────────────────── async function processJob(job: ExtractionJob, requestId?: string): Promise { diff --git a/services/platform-service/src/modules/telemetry/routes.ts b/services/platform-service/src/modules/telemetry/routes.ts index 8036f655..7ad75621 100644 --- a/services/platform-service/src/modules/telemetry/routes.ts +++ b/services/platform-service/src/modules/telemetry/routes.ts @@ -424,7 +424,7 @@ async function updateClusterForEvent(event: TelemetryEventDoc): Promise { // Update affected users (dedup, cap 100) const uid = event.userId || event.anonymousInstallId; - if (uid && event.userId && !existing.affectedUserIds.includes(uid)) { + if (uid && !existing.affectedUserIds.includes(uid)) { if (existing.affectedUserIds.length < 100) existing.affectedUserIds.push(uid); } if (