feat(telemetry): Phase 3 — rate limiting, batch dedup, ETag config caching (614 tests)
This commit is contained in:
parent
0edafab501
commit
2fb341048e
@ -39,6 +39,35 @@ const PII_SCAN_ENABLED = process.env.TELEMETRY_PII_SCAN_ENABLED !== 'false';
|
||||
const CLIENT_BATCH_SIZE = parseInt(process.env.TELEMETRY_CLIENT_BATCH_SIZE ?? '20', 10);
|
||||
const CLIENT_FLUSH_MS = parseInt(process.env.TELEMETRY_CLIENT_FLUSH_MS ?? '60000', 10);
|
||||
const CLIENT_MAX_QUEUE = parseInt(process.env.TELEMETRY_CLIENT_MAX_QUEUE ?? '200', 10);
|
||||
const RATE_LIMIT_PER_MIN = parseInt(process.env.TELEMETRY_RATE_LIMIT_PER_MIN ?? '100', 10);
const RATE_LIMIT_WINDOW_MS = 60_000;

// ─── Rate limiter (in-memory fixed window per installId) ────────

/** Per-key state: events accepted so far in the current window and when that window opened. */
const rateBuckets = new Map<string, { count: number; windowStart: number }>();

/**
 * Decides whether a batch of events may be accepted for the given key and,
 * if so, charges the batch against that key's budget.
 *
 * This is a fixed-window counter: the first accepted call for a key opens a
 * window of RATE_LIMIT_WINDOW_MS, and at most RATE_LIMIT_PER_MIN events are
 * accepted until that window has elapsed. The next accepted call afterwards
 * opens a fresh window. A rejected batch is not charged.
 *
 * @param installId  Key whose budget is consulted (one bucket per distinct string).
 * @param eventCount Number of events in the batch being submitted.
 * @returns `true` if the whole batch fits in the remaining budget (and has been
 *          counted), `false` if accepting it would exceed RATE_LIMIT_PER_MIN.
 */
export function checkRateLimit(installId: string, eventCount: number): boolean {
  // A batch bigger than the entire per-window budget can never fit. Reject it
  // before touching any bucket, otherwise the "open a new window" path below
  // would accept it unconditionally.
  if (eventCount > RATE_LIMIT_PER_MIN) {
    return false;
  }

  const now = Date.now();
  const bucket = rateBuckets.get(installId);

  // No bucket yet, or the previous window has elapsed: start a new window
  // whose count begins with this batch.
  if (!bucket || now - bucket.windowStart > RATE_LIMIT_WINDOW_MS) {
    rateBuckets.set(installId, { count: eventCount, windowStart: now });
    return true;
  }

  // Same window: accept only if the running total stays within the budget.
  if (bucket.count + eventCount > RATE_LIMIT_PER_MIN) {
    return false;
  }
  bucket.count += eventCount;
  return true;
}
|
||||
|
||||
// Housekeeping: every 5 minutes, drop buckets whose window opened more than
// two window lengths ago so the map does not grow without bound.
const _cleanupTimer = globalThis.setInterval(() => {
  const staleBefore = Date.now() - RATE_LIMIT_WINDOW_MS * 2;
  rateBuckets.forEach((entry, installId) => {
    if (entry.windowStart < staleBefore) {
      rateBuckets.delete(installId);
    }
  });
}, 300_000);

// Where the timer handle is an object exposing `unref` (as Node's Timeout does),
// unref it so this interval alone does not keep the process running.
if (typeof _cleanupTimer === 'object' && 'unref' in _cleanupTimer) {
  _cleanupTimer.unref();
}
|
||||
|
||||
/** PII patterns — reject events containing these. */
|
||||
const PII_PATTERNS = [
|
||||
@ -334,16 +363,36 @@ export async function telemetryRoutes(app: FastifyInstance) {
|
||||
}
|
||||
|
||||
const { productId, events } = parsed.data;
|
||||
|
||||
// Rate limiting per installId
|
||||
const rateLimitKey = installToken || req.jwtPayload?.sub || 'unknown';
|
||||
if (!checkRateLimit(rateLimitKey, events.length)) {
|
||||
reply.code(429);
|
||||
return {
|
||||
accepted: 0,
|
||||
rejected: events.length,
|
||||
errors: [{ index: 0, reason: `Rate limit exceeded (${RATE_LIMIT_PER_MIN} events/min)` }],
|
||||
serverTime: new Date().toISOString(),
|
||||
};
|
||||
}
|
||||
|
||||
// Batch deduplication by event.id
|
||||
const seenIds = new Set<string>();
|
||||
const dedupedEvents = events.filter(e => {
|
||||
if (seenIds.has(e.id)) return false;
|
||||
seenIds.add(e.id);
|
||||
return true;
|
||||
});
|
||||
const now = new Date().toISOString();
|
||||
const ttl = DEFAULT_EVENT_TTL_DAYS * 86400;
|
||||
|
||||
let accepted = 0;
|
||||
let rejected = 0;
|
||||
let rejected = events.length - dedupedEvents.length; // duplicates
|
||||
const errors: Array<{ index: number; reason: string }> = [];
|
||||
const docsToInsert: TelemetryEventDoc[] = [];
|
||||
|
||||
for (let i = 0; i < events.length; i++) {
|
||||
const event = events[i];
|
||||
for (let i = 0; i < dedupedEvents.length; i++) {
|
||||
const event = dedupedEvents[i];
|
||||
|
||||
// Validate productId matches request-level
|
||||
if (event.productId !== productId) {
|
||||
@ -395,8 +444,8 @@ export async function telemetryRoutes(app: FastifyInstance) {
|
||||
};
|
||||
});
|
||||
|
||||
// ── Collection config (client poll) ───────────────────────────
|
||||
app.get('/telemetry/config', async req => {
|
||||
// ── Collection config (client poll) with ETag caching ─────────
|
||||
app.get('/telemetry/config', async (req, reply) => {
|
||||
if (!TELEMETRY_ENABLED) {
|
||||
return {
|
||||
enabled: false,
|
||||
@ -415,7 +464,19 @@ export async function telemetryRoutes(app: FastifyInstance) {
|
||||
const allPolicies = await repo.listPolicies(productId);
|
||||
const activePolicies = allPolicies.filter(p => p.enabled && policyMatchesContext(p, ctx));
|
||||
|
||||
return mergePolicies(activePolicies);
|
||||
const config = mergePolicies(activePolicies);
|
||||
|
||||
// ETag caching: hash the config to avoid redundant client processing
|
||||
const etag = createHash('sha256').update(JSON.stringify(config)).digest('hex').substring(0, 16);
|
||||
const clientEtag = req.headers['if-none-match'];
|
||||
if (clientEtag === `"${etag}"`) {
|
||||
reply.code(304);
|
||||
return;
|
||||
}
|
||||
|
||||
reply.header('ETag', `"${etag}"`);
|
||||
reply.header('Cache-Control', 'private, max-age=60');
|
||||
return config;
|
||||
});
|
||||
|
||||
// ── Admin: query events ───────────────────────────────────────
|
||||
|
||||
@ -19,6 +19,7 @@ import {
|
||||
generateFingerprint,
|
||||
policyMatchesContext,
|
||||
mergePolicies,
|
||||
checkRateLimit,
|
||||
} from './routes.js';
|
||||
|
||||
// ─── Minimal valid event for reuse ──────────────────────────────────
|
||||
@ -648,3 +649,27 @@ describe('mergePolicies', () => {
|
||||
expect(config.modules.length).toBe(2);
|
||||
});
|
||||
});
|
||||
|
||||
// ─── checkRateLimit ─────────────────────────────────────────────
|
||||
|
||||
// Each case builds its own time-suffixed key, so the cases use separate buckets.
// The expected outcomes assume the limit is 100 events per window — TODO confirm
// the test environment does not override TELEMETRY_RATE_LIMIT_PER_MIN.
describe('checkRateLimit', () => {
  it('allows events within the limit', () => {
    const installId = `test_rate_${Date.now()}`;
    // [batch size, expected verdict], applied in order: 50 + 49 stays under 100.
    const steps: Array<[number, boolean]> = [
      [50, true],
      [49, true],
    ];
    for (const [batchSize, allowed] of steps) {
      expect(checkRateLimit(installId, batchSize)).toBe(allowed);
    }
  });

  it('rejects events that exceed the limit', () => {
    const installId = `test_rate_exceed_${Date.now()}`;
    // The second batch would bring the total to 110, over the limit of 100.
    const steps: Array<[number, boolean]> = [
      [80, true],
      [30, false],
    ];
    for (const [batchSize, allowed] of steps) {
      expect(checkRateLimit(installId, batchSize)).toBe(allowed);
    }
  });

  it('different installIds have separate buckets', () => {
    const firstId = `test_rate_a_${Date.now()}`;
    const secondId = `test_rate_b_${Date.now()}`;
    // [key, batch size, expected verdict], applied in order.
    const steps: Array<[string, number, boolean]> = [
      [firstId, 90, true],
      [secondId, 90, true], // charged to its own bucket
      [firstId, 20, false], // firstId would reach 110
    ];
    for (const [installId, batchSize, allowed] of steps) {
      expect(checkRateLimit(installId, batchSize)).toBe(allowed);
    }
  });
});
|
||||
|
||||
Loading…
Reference in New Issue
Block a user