diff --git a/services/platform-service/src/modules/telemetry/routes.ts b/services/platform-service/src/modules/telemetry/routes.ts index b68546b6..7657c758 100644 --- a/services/platform-service/src/modules/telemetry/routes.ts +++ b/services/platform-service/src/modules/telemetry/routes.ts @@ -39,6 +39,35 @@ const PII_SCAN_ENABLED = process.env.TELEMETRY_PII_SCAN_ENABLED !== 'false'; const CLIENT_BATCH_SIZE = parseInt(process.env.TELEMETRY_CLIENT_BATCH_SIZE ?? '20', 10); const CLIENT_FLUSH_MS = parseInt(process.env.TELEMETRY_CLIENT_FLUSH_MS ?? '60000', 10); const CLIENT_MAX_QUEUE = parseInt(process.env.TELEMETRY_CLIENT_MAX_QUEUE ?? '200', 10); +const RATE_LIMIT_PER_MIN = parseInt(process.env.TELEMETRY_RATE_LIMIT_PER_MIN ?? '100', 10); +const RATE_LIMIT_WINDOW_MS = 60_000; + +// ─── Rate limiter (in-memory sliding window per installId) ────── + +const rateBuckets = new Map(); + +export function checkRateLimit(installId: string, eventCount: number): boolean { + const now = Date.now(); + const bucket = rateBuckets.get(installId); + if (!bucket || now - bucket.windowStart > RATE_LIMIT_WINDOW_MS) { + rateBuckets.set(installId, { count: eventCount, windowStart: now }); + return true; + } + if (bucket.count + eventCount > RATE_LIMIT_PER_MIN) { + return false; + } + bucket.count += eventCount; + return true; +} + +// Periodic cleanup of stale rate-limit buckets (every 5 min) +const _cleanupTimer = globalThis.setInterval(() => { + const cutoff = Date.now() - RATE_LIMIT_WINDOW_MS * 2; + for (const [key, bucket] of rateBuckets) { + if (bucket.windowStart < cutoff) rateBuckets.delete(key); + } +}, 300_000); +if (typeof _cleanupTimer === 'object' && 'unref' in _cleanupTimer) _cleanupTimer.unref(); /** PII patterns — reject events containing these. */ const PII_PATTERNS = [ @@ -334,16 +363,36 @@ export async function telemetryRoutes(app: FastifyInstance) { } const { productId, events } = parsed.data; + + // Rate limiting per installId + const rateLimitKey = installToken || req.jwtPayload?.sub || 'unknown'; + if (!checkRateLimit(rateLimitKey, events.length)) { + reply.code(429); + return { + accepted: 0, + rejected: events.length, + errors: [{ index: 0, reason: `Rate limit exceeded (${RATE_LIMIT_PER_MIN} events/min)` }], + serverTime: new Date().toISOString(), + }; + } + + // Batch deduplication by event.id + const seenIds = new Set(); + const dedupedEvents = events.filter(e => { + if (seenIds.has(e.id)) return false; + seenIds.add(e.id); + return true; + }); const now = new Date().toISOString(); const ttl = DEFAULT_EVENT_TTL_DAYS * 86400; let accepted = 0; - let rejected = 0; + let rejected = events.length - dedupedEvents.length; // duplicates const errors: Array<{ index: number; reason: string }> = []; const docsToInsert: TelemetryEventDoc[] = []; - for (let i = 0; i < events.length; i++) { - const event = events[i]; + for (let i = 0; i < dedupedEvents.length; i++) { + const event = dedupedEvents[i]; // Validate productId matches request-level if (event.productId !== productId) { @@ -395,8 +444,8 @@ export async function telemetryRoutes(app: FastifyInstance) { }; }); - // ── Collection config (client poll) ─────────────────────────── - app.get('/telemetry/config', async req => { + // ── Collection config (client poll) with ETag caching ───────── + app.get('/telemetry/config', async (req, reply) => { if (!TELEMETRY_ENABLED) { return { enabled: false, @@ -415,7 +464,19 @@ export async function telemetryRoutes(app: FastifyInstance) { const allPolicies = await repo.listPolicies(productId); const activePolicies = allPolicies.filter(p => p.enabled && policyMatchesContext(p, ctx)); - return mergePolicies(activePolicies); + const config = mergePolicies(activePolicies); + + // ETag caching: hash the config to avoid redundant client processing + const etag = createHash('sha256').update(JSON.stringify(config)).digest('hex').substring(0, 16); + const clientEtag = req.headers['if-none-match']; + if (clientEtag === `"${etag}"`) { + reply.code(304); + return; + } + + reply.header('ETag', `"${etag}"`); + reply.header('Cache-Control', 'private, max-age=60'); + return config; }); // ── Admin: query events ─────────────────────────────────────── diff --git a/services/platform-service/src/modules/telemetry/telemetry.test.ts b/services/platform-service/src/modules/telemetry/telemetry.test.ts index 18c625fd..3a6787c1 100644 --- a/services/platform-service/src/modules/telemetry/telemetry.test.ts +++ b/services/platform-service/src/modules/telemetry/telemetry.test.ts @@ -19,6 +19,7 @@ import { generateFingerprint, policyMatchesContext, mergePolicies, + checkRateLimit, } from './routes.js'; // ─── Minimal valid event for reuse ────────────────────────────────── @@ -648,3 +649,27 @@ describe('mergePolicies', () => { expect(config.modules.length).toBe(2); }); }); + +// ─── checkRateLimit ───────────────────────────────────────────── + +describe('checkRateLimit', () => { + it('allows events within the limit', () => { + const id = `test_rate_${Date.now()}`; + expect(checkRateLimit(id, 50)).toBe(true); + expect(checkRateLimit(id, 49)).toBe(true); + }); + + it('rejects events that exceed the limit', () => { + const id = `test_rate_exceed_${Date.now()}`; + expect(checkRateLimit(id, 80)).toBe(true); + expect(checkRateLimit(id, 30)).toBe(false); // 80 + 30 > 100 + }); + + it('different installIds have separate buckets', () => { + const id1 = `test_rate_a_${Date.now()}`; + const id2 = `test_rate_b_${Date.now()}`; + expect(checkRateLimit(id1, 90)).toBe(true); + expect(checkRateLimit(id2, 90)).toBe(true); // separate bucket + expect(checkRateLimit(id1, 20)).toBe(false); // id1 exceeded + }); +});