From 81951b173aec4292b1b06abdbba4246cc87ad494 Mon Sep 17 00:00:00 2001 From: root Date: Tue, 31 Mar 2026 08:08:53 +0000 Subject: [PATCH] feat(extraction): back product rate limits with valkey --- docker-compose.ecosystem.yml | 2 + pnpm-lock.yaml | 87 +++++ services/extraction-service/Dockerfile | 4 +- services/extraction-service/package.json | 1 + services/extraction-service/src/lib/config.ts | 2 + .../extract/product-rate-limit.test.ts | 108 +++--- .../src/modules/extract/product-rate-limit.ts | 345 ++++++++++++++---- .../src/modules/extract/routes.ts | 14 +- 8 files changed, 429 insertions(+), 134 deletions(-) diff --git a/docker-compose.ecosystem.yml b/docker-compose.ecosystem.yml index d862afef..594fa28b 100644 --- a/docker-compose.ecosystem.yml +++ b/docker-compose.ecosystem.yml @@ -289,6 +289,8 @@ services: environment: - PORT=4005 - PYTHON_SIDECAR_URL=http://localhost:4006 + - PRODUCT_RATE_LIMIT_STORE=${PRODUCT_RATE_LIMIT_STORE:-valkey} + - VALKEY_URL=${VALKEY_URL:-redis://valkey:6379} depends_on: cosmos-emulator: condition: service_healthy diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index f7c49890..c10436d5 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1009,6 +1009,9 @@ importers: jose: specifier: ^6.0.8 version: 6.1.3 + redis: + specifier: ^4.7.0 + version: 4.7.1 zod: specifier: ^3.24.2 version: 3.25.76 @@ -3779,6 +3782,35 @@ packages: '@types/react': optional: true + '@redis/bloom@1.2.0': + resolution: {integrity: sha512-HG2DFjYKbpNmVXsa0keLHp/3leGJz1mjh09f2RLGGLQZzSHpkmZWuwJbAvo3QcRY8p80m5+ZdXZdYOSBLlp7Cg==} + peerDependencies: + '@redis/client': ^1.0.0 + + '@redis/client@1.6.1': + resolution: {integrity: sha512-/KCsg3xSlR+nCK8/8ZYSknYxvXHwubJrU82F3Lm1Fp6789VQ0/3RJKfsmRXjqfaTA++23CvC3hqmqe/2GEt6Kw==} + engines: {node: '>=14'} + + '@redis/graph@1.1.1': + resolution: {integrity: sha512-FEMTcTHZozZciLRl6GiiIB4zGm5z5F3F6a6FZCyrfxdKOhFlGkiAqlexWMBzCi4DcRoyiOsuLfW+cjlGWyExOw==} + peerDependencies: + '@redis/client': ^1.0.0 + + '@redis/json@1.0.7': + resolution: {integrity: sha512-6UyXfjVaTBTJtKNG4/9Z8PSpKE6XgSyEb8iwaqDcy+uKrd/DGYHTWkUdnQDyzm727V7p21WUMhsqz5oy65kPcQ==} + peerDependencies: + '@redis/client': ^1.0.0 + + '@redis/search@1.2.0': + resolution: {integrity: sha512-tYoDBbtqOVigEDMAcTGsRlMycIIjwMCgD8eR2t0NANeQmgK/lvxNAvYyb6bZDD4frHRhIHkJu2TBRvB0ERkOmw==} + peerDependencies: + '@redis/client': ^1.0.0 + + '@redis/time-series@1.1.0': + resolution: {integrity: sha512-c1Q99M5ljsIuc4YdaCwfUEXsofakb9c8+Zse2qxTadu8TalLXuAESzLvFAvNVbkmSlvlzIQOLpBCmWI9wTOt+g==} + peerDependencies: + '@redis/client': ^1.0.0 + '@reduxjs/toolkit@2.11.2': resolution: {integrity: sha512-Kd6kAHTA6/nUpp8mySPqj3en3dm0tdMIgbttnQ1xFMVpufoj+ADi8pXLBsd4xzTRHQa7t/Jv8W5UnCuW4kuWMQ==} peerDependencies: @@ -5294,6 +5326,10 @@ packages: resolution: {integrity: sha512-eYm0QWBtUrBWZWG0d386OGAw16Z995PiOVo2B7bjWSbHedGl5e0ZWaq65kOGgUSNesEIDkB9ISbTg/JK9dhCZA==} engines: {node: '>=6'} + cluster-key-slot@1.1.2: + resolution: {integrity: sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==} + engines: {node: '>=0.10.0'} + code-block-writer@13.0.3: resolution: {integrity: sha512-Oofo0pq3IKnsFtuHqSF7TqBfr71aeyZDVJ0HpmqB7FBM2qEigL0iPONSCZSO9pE9dZTAxANe5XHG9Uy0YMv8cg==} @@ -6295,6 +6331,10 @@ packages: resolution: {integrity: sha512-SFdFmIJi+ybC0vjlHN0ZGVGHc3lgE0DxPAT0djjVg+kjOnSqclqmj0KQ7ykTOLP6YxoqOvuAODGdcHJn+43q3g==} engines: {node: '>= 0.4'} + generic-pool@3.9.0: + resolution: {integrity: sha512-hymDOu5B53XvN4QT9dBmZxPX4CWhBPPLguTZ9MMFeFa/Kg0xWVfylOVNlJji/E7yTZWFd/q9GO5TxDLq156D7g==} + engines: {node: '>= 4'} + gensync@1.0.0-beta.2: resolution: {integrity: sha512-3hN7NaskYvMDLQY55gnW3NQ+mesEAepTqlg+VEbj7zzqEMBVNhzcGYYeqFo/TlYz6eQiFcp1HcsCZO+nGgS8zg==} engines: {node: '>=6.9.0'} @@ -8385,6 +8425,9 @@ packages: resolution: {integrity: sha512-6tDA8g98We0zd0GvVeMT9arEOnTw9qM03L9cJXaCjrip1OO764RDBLBfrB4cwzNGDj5OA5ioymC9GkizgWJDUg==} engines: {node: '>=8'} + redis@4.7.1: + resolution: {integrity: sha512-S1bJDnqLftzHXHP8JsT5II/CtHWQrASX5K96REjWjlmWKrviSOLWmM7QnRLstAWsu1VBBV1ffV6DzCvxNP0UJQ==} + redux-thunk@3.1.0: resolution: {integrity: sha512-NW2r5T6ksUKXCabzhL9z+h206HQw/NJkcLm1GPImRQ8IzfXwRGqjVhKJGauHirT0DAuyy6hjdnMZaRoAcy0Klw==} peerDependencies: @@ -9621,6 +9664,9 @@ packages: yallist@3.1.1: resolution: {integrity: sha512-a4UGQaWPH59mOXUYnAG2ewncQS4i4F43Tv3JoAM+s2VDAmS9NsK8GpDMLrCHPksFT7h3K6TOoUNn2pb7RoXx4g==} + yallist@4.0.0: + resolution: {integrity: sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A==} + yaml@2.8.2: resolution: {integrity: sha512-mplynKqc1C2hTVYxd0PU2xQAc22TI1vShAYGksCCfxbn/dFwnHTNi1bvYsBTkhdUNtGIf5xNOg938rrSSYvS9A==} engines: {node: '>= 14.6'} @@ -12942,6 +12988,32 @@ snapshots: optionalDependencies: '@types/react': 19.2.14 + '@redis/bloom@1.2.0(@redis/client@1.6.1)': + dependencies: + '@redis/client': 1.6.1 + + '@redis/client@1.6.1': + dependencies: + cluster-key-slot: 1.1.2 + generic-pool: 3.9.0 + yallist: 4.0.0 + + '@redis/graph@1.1.1(@redis/client@1.6.1)': + dependencies: + '@redis/client': 1.6.1 + + '@redis/json@1.0.7(@redis/client@1.6.1)': + dependencies: + '@redis/client': 1.6.1 + + '@redis/search@1.2.0(@redis/client@1.6.1)': + dependencies: + '@redis/client': 1.6.1 + + '@redis/time-series@1.1.0(@redis/client@1.6.1)': + dependencies: + '@redis/client': 1.6.1 + '@reduxjs/toolkit@2.11.2(react-redux@9.2.0(@types/react@19.2.14)(react@19.2.3)(redux@5.0.1))(react@19.2.3)': dependencies: '@standard-schema/spec': 1.1.0 @@ -14615,6 +14687,8 @@ snapshots: clsx@2.1.1: {} + cluster-key-slot@1.1.2: {} + code-block-writer@13.0.3: {} code-point-at@1.1.0: {} @@ -15900,6 +15974,8 @@ snapshots: generator-function@2.0.1: {} + generic-pool@3.9.0: {} + gensync@1.0.0-beta.2: {} get-caller-file@2.0.5: {} @@ -18638,6 +18714,15 @@ snapshots: indent-string: 4.0.0 strip-indent: 3.0.0 + redis@4.7.1: + dependencies: + '@redis/bloom': 1.2.0(@redis/client@1.6.1) + '@redis/client': 1.6.1 + '@redis/graph': 1.1.1(@redis/client@1.6.1) + '@redis/json': 1.0.7(@redis/client@1.6.1) + '@redis/search': 1.2.0(@redis/client@1.6.1) + '@redis/time-series': 1.1.0(@redis/client@1.6.1) + redux-thunk@3.1.0(redux@5.0.1): dependencies: redux: 5.0.1 @@ -20211,6 +20296,8 @@ snapshots: yallist@3.1.1: {} + yallist@4.0.0: {} + yaml@2.8.2: {} yargs-parser@21.1.1: {} diff --git a/services/extraction-service/Dockerfile b/services/extraction-service/Dockerfile index fbfb5acd..5dad49b3 100644 --- a/services/extraction-service/Dockerfile +++ b/services/extraction-service/Dockerfile @@ -76,8 +76,8 @@ COPY dashboards/admin-web/package.json dashboards/admin-web/ COPY dashboards/tracker-web/package.json dashboards/tracker-web/ COPY scripts/package.json scripts/ -# Install all workspace deps -RUN pnpm install --frozen-lockfile +# Install all workspace deps without running prepare hooks before sources exist. +RUN pnpm install --frozen-lockfile --ignore-scripts # Copy source COPY packages/ packages/ diff --git a/services/extraction-service/package.json b/services/extraction-service/package.json index 8ca793f3..8a6ed837 100644 --- a/services/extraction-service/package.json +++ b/services/extraction-service/package.json @@ -33,6 +33,7 @@ "fastify": "^5.2.1", "fastify-metrics": "^10.3.0", "jose": "^6.0.8", + "redis": "^4.7.0", "zod": "^3.24.2" }, "devDependencies": { diff --git a/services/extraction-service/src/lib/config.ts b/services/extraction-service/src/lib/config.ts index 9721ce78..3c87b72a 100644 --- a/services/extraction-service/src/lib/config.ts +++ b/services/extraction-service/src/lib/config.ts @@ -14,6 +14,8 @@ const envSchema = z.object({ PYTHON_SIDECAR_URL: z.string().default('http://localhost:4006'), DEFAULT_MODEL_ID: z.string().default('gemini-2.5-flash'), EXTRACTION_QUEUE_BACKEND: z.enum(['memory', 'file']).default('file'), + PRODUCT_RATE_LIMIT_STORE: z.enum(['memory', 'valkey']).default('memory'), + VALKEY_URL: z.string().optional(), EXTRACTION_QUEUE_FILE: z.string().optional(), EXTRACTION_QUEUE_POLL_MS: z.coerce.number().default(100), EXTRACTION_QUEUE_LEASE_MS: z.coerce.number().default(30000), diff --git a/services/extraction-service/src/modules/extract/product-rate-limit.test.ts b/services/extraction-service/src/modules/extract/product-rate-limit.test.ts index b7fda5c7..294062c6 100644 --- a/services/extraction-service/src/modules/extract/product-rate-limit.test.ts +++ b/services/extraction-service/src/modules/extract/product-rate-limit.test.ts @@ -5,6 +5,7 @@ import { describe, it, expect, beforeEach, vi } from 'vitest'; import { checkProductRateLimit, + clearAllProductRateLimits, getProductRateLimitStatus, getRateLimitSummary, resetProductRateLimit, @@ -15,53 +16,51 @@ import { describe('product-rate-limit', () => { const PRODUCT_ID = 'test-product'; - beforeEach(() => { - // Stop auto-cleanup and reset state before each test + beforeEach(async () => { stopRateLimitCleanup(); - resetProductRateLimit(PRODUCT_ID); + await clearAllProductRateLimits(); + await resetProductRateLimit(PRODUCT_ID); vi.restoreAllMocks(); }); describe('checkProductRateLimit', () => { - it('allows first request', () => { - const result = checkProductRateLimit(PRODUCT_ID); + it('allows first request', async () => { + const result = await checkProductRateLimit(PRODUCT_ID); expect(result.allowed).toBe(true); expect(result.remaining).toBeGreaterThan(0); }); - it('tracks request count', () => { - checkProductRateLimit(PRODUCT_ID); - checkProductRateLimit(PRODUCT_ID); - const result = checkProductRateLimit(PRODUCT_ID); + it('tracks request count', async () => { + await checkProductRateLimit(PRODUCT_ID); + await checkProductRateLimit(PRODUCT_ID); + const result = await checkProductRateLimit(PRODUCT_ID); expect(result.allowed).toBe(true); expect(result.remaining).toBeLessThan(100); }); - it('blocks when limit exceeded', () => { - // Make 100 requests (default limit) + it('blocks when limit exceeded', async () => { for (let i = 0; i < 100; i++) { - checkProductRateLimit(PRODUCT_ID); + await checkProductRateLimit(PRODUCT_ID); } - const result = checkProductRateLimit(PRODUCT_ID); + const result = await checkProductRateLimit(PRODUCT_ID); expect(result.allowed).toBe(false); expect(result.remaining).toBe(0); }); - it('includes reset time in response', () => { - const result = checkProductRateLimit(PRODUCT_ID); + it('includes reset time in response', async () => { + const result = await checkProductRateLimit(PRODUCT_ID); expect(result.resetAt).toBeGreaterThan(Date.now()); expect(result.resetAt).toBeLessThanOrEqual(Date.now() + 60000); }); - it('includes retry after when blocked', () => { - // Exhaust limit + it('includes retry after when blocked', async () => { for (let i = 0; i < 100; i++) { - checkProductRateLimit(PRODUCT_ID); + await checkProductRateLimit(PRODUCT_ID); } - const result = checkProductRateLimit(PRODUCT_ID); + const result = await checkProductRateLimit(PRODUCT_ID); expect(result.allowed).toBe(false); expect(result.retryAfter).toBeDefined(); expect(result.retryAfter).toBeGreaterThan(0); @@ -70,106 +69,99 @@ describe('product-rate-limit', () => { }); describe('getProductRateLimitStatus', () => { - it('returns current status without incrementing', () => { - // Use up some quota + it('returns current status without incrementing', async () => { for (let i = 0; i < 10; i++) { - checkProductRateLimit(PRODUCT_ID); + await checkProductRateLimit(PRODUCT_ID); } - const status1 = getProductRateLimitStatus(PRODUCT_ID); - const status2 = getProductRateLimitStatus(PRODUCT_ID); + const status1 = await getProductRateLimitStatus(PRODUCT_ID); + const status2 = await getProductRateLimitStatus(PRODUCT_ID); expect(status1.remaining).toBe(status2.remaining); }); - it('returns full quota for new product', () => { - const result = getProductRateLimitStatus('brand-new-product'); + it('returns full quota for new product', async () => { + const result = await getProductRateLimitStatus('brand-new-product'); expect(result.allowed).toBe(true); expect(result.remaining).toBe(100); }); }); describe('getRateLimitSummary', () => { - it('returns empty summary initially', () => { - const summary = getRateLimitSummary(); + it('returns empty summary initially', async () => { + const summary = await getRateLimitSummary(); expect(summary.totalProducts).toBe(0); expect(summary.products).toEqual([]); }); - it('includes products that have made requests', () => { - checkProductRateLimit('product-a'); - checkProductRateLimit('product-b'); + it('includes products that have made requests', async () => { + await checkProductRateLimit('product-a'); + await checkProductRateLimit('product-b'); - const summary = getRateLimitSummary(); + const summary = await getRateLimitSummary(); expect(summary.totalProducts).toBe(2); expect(summary.products.map(p => p.productId)).toContain('product-a'); expect(summary.products.map(p => p.productId)).toContain('product-b'); }); - it('aggregates requests per product', () => { - // Reset to ensure clean state - resetProductRateLimit('product-a'); + it('aggregates requests per product', async () => { + await resetProductRateLimit('product-a'); - checkProductRateLimit('product-a'); - checkProductRateLimit('product-a'); - checkProductRateLimit('product-a'); + await checkProductRateLimit('product-a'); + await checkProductRateLimit('product-a'); + await checkProductRateLimit('product-a'); - const summary = getRateLimitSummary(); + const summary = await getRateLimitSummary(); const productA = summary.products.find(p => p.productId === 'product-a'); expect(productA?.requests).toBe(3); }); }); describe('resetProductRateLimit', () => { - it('clears rate limit for product', () => { - // Use up quota + it('clears rate limit for product', async () => { for (let i = 0; i < 100; i++) { - checkProductRateLimit(PRODUCT_ID); + await checkProductRateLimit(PRODUCT_ID); } - const beforeReset = checkProductRateLimit(PRODUCT_ID); + const beforeReset = await checkProductRateLimit(PRODUCT_ID); expect(beforeReset.allowed).toBe(false); - resetProductRateLimit(PRODUCT_ID); + await resetProductRateLimit(PRODUCT_ID); - const afterReset = checkProductRateLimit(PRODUCT_ID); + const afterReset = await checkProductRateLimit(PRODUCT_ID); expect(afterReset.allowed).toBe(true); expect(afterReset.remaining).toBe(99); }); }); describe('cleanupRateLimitStore', () => { - it('removes expired entries', () => { - // Create some entries - checkProductRateLimit('old-product'); + it('removes expired entries', async () => { + await checkProductRateLimit('old-product'); - // Fast forward time to expire entries vi.useFakeTimers(); vi.advanceTimersByTime(60001); - const cleaned = cleanupRateLimitStore(); + const cleaned = await cleanupRateLimitStore(); expect(cleaned).toBeGreaterThanOrEqual(0); vi.useRealTimers(); }); - it('keeps current entries', () => { - checkProductRateLimit(PRODUCT_ID); + it('keeps current entries', async () => { + await checkProductRateLimit(PRODUCT_ID); - const cleaned = cleanupRateLimitStore(); + const cleaned = await cleanupRateLimitStore(); expect(cleaned).toBe(0); }); }); describe('rate limiting per product isolation', () => { - it('isolates limits between products', () => { - // Exhaust limit for product A + it('isolates limits between products', async () => { for (let i = 0; i < 100; i++) { - checkProductRateLimit('product-a'); + await checkProductRateLimit('product-a'); } - // Product B should still have quota - const productBResult = checkProductRateLimit('product-b'); + const productBResult = await checkProductRateLimit('product-b'); expect(productBResult.allowed).toBe(true); expect(productBResult.remaining).toBe(99); }); diff --git a/services/extraction-service/src/modules/extract/product-rate-limit.ts b/services/extraction-service/src/modules/extract/product-rate-limit.ts index 08a331a8..99093169 100644 --- a/services/extraction-service/src/modules/extract/product-rate-limit.ts +++ b/services/extraction-service/src/modules/extract/product-rate-limit.ts @@ -1,3 +1,5 @@ +import { createClient } from 'redis'; + /** * Per-product rate limiting for extraction service. * @@ -21,6 +23,10 @@ export interface ProductRateLimitConfig { // Default limits (can be overridden via env) const DEFAULT_PRODUCT_RPM = parseInt(process.env.PRODUCT_RATE_LIMIT_RPM || '100', 10); const WINDOW_MS = 60_000; // 1 minute +const STORE_MODE = process.env.PRODUCT_RATE_LIMIT_STORE === 'valkey' ? 'valkey' : 'memory'; +const VALKEY_URL = process.env.VALKEY_URL || 'redis://valkey:6379'; +const VALKEY_PREFIX = 'extraction:product-rate-limit'; +type ValkeyClient = ReturnType; // ── In-memory rate limit store ────────────────────────────────── @@ -31,10 +37,82 @@ interface RateLimitEntry { } const rateLimitStore = new Map(); +let valkeyClient: ValkeyClient | null = null; +let valkeyConnectPromise: Promise | null = null; +let loggedValkeyFallback = false; -function getStoreKey(productId: string): string { - const windowIndex = Math.floor(Date.now() / WINDOW_MS); - return `${productId}:${windowIndex}`; +function getWindowIndex(now = Date.now()): number { + return Math.floor(now / WINDOW_MS); +} + +function getStoreKey(productId: string, now = Date.now()): string { + return `${productId}:${getWindowIndex(now)}`; +} + +function getValkeyKey(productId: string, now = Date.now()): string { + return `${VALKEY_PREFIX}:${getWindowIndex(now)}:${encodeURIComponent(productId)}`; +} + +function getWindowResetAt(now = Date.now()): number { + return (getWindowIndex(now) + 1) * WINDOW_MS; +} + +function shouldUseValkey(): boolean { + return STORE_MODE === 'valkey' && process.env.NODE_ENV !== 'test'; +} + +function logValkeyFallback(err: unknown): void { + if (loggedValkeyFallback) { + return; + } + loggedValkeyFallback = true; + const detail = err instanceof Error ? err.message : String(err); + // eslint-disable-next-line no-console + console.warn(`[product-rate-limit] Falling back to in-memory store: ${detail}`); +} + +async function getValkeyClient(): Promise { + if (!shouldUseValkey()) { + return null; + } + + if (valkeyClient?.isOpen) { + return valkeyClient; + } + + if (!valkeyConnectPromise) { + const client = createClient({ url: VALKEY_URL }); + client.on('error', err => { + logValkeyFallback(err); + }); + valkeyConnectPromise = client + .connect() + .then(() => { + loggedValkeyFallback = false; + valkeyClient = client; + return client; + }) + .catch(err => { + logValkeyFallback(err); + return null; + }) + .finally(() => { + valkeyConnectPromise = null; + }); + } + + return valkeyConnectPromise; +} + +function buildResult(now: number, count: number, resetAt: number): RateLimitResult { + return { + allowed: count <= DEFAULT_PRODUCT_RPM, + limit: DEFAULT_PRODUCT_RPM, + remaining: Math.max(0, DEFAULT_PRODUCT_RPM - count), + resetAt, + retryAfter: + count > DEFAULT_PRODUCT_RPM ? Math.ceil(Math.max(0, resetAt - now) / 1000) : undefined, + }; } // ── Rate limiting functions ───────────────────────────────────── @@ -47,84 +125,89 @@ export interface RateLimitResult { retryAfter?: number; } -/** - * Check if request is within product rate limit. - */ -export function checkProductRateLimit(productId: string): RateLimitResult { - const key = getStoreKey(productId); +function checkProductRateLimitMemory(productId: string): RateLimitResult { const now = Date.now(); - const resetAt = (Math.floor(now / WINDOW_MS) + 1) * WINDOW_MS; - + const key = getStoreKey(productId, now); + const resetAt = getWindowResetAt(now); const entry = rateLimitStore.get(key); if (!entry || now >= entry.resetAt) { - // New window rateLimitStore.set(key, { count: 1, windowStart: now, resetAt, }); - return { - allowed: true, - limit: DEFAULT_PRODUCT_RPM, - remaining: DEFAULT_PRODUCT_RPM - 1, - resetAt, - }; - } - - // Existing window - if (entry.count >= DEFAULT_PRODUCT_RPM) { - return { - allowed: false, - limit: DEFAULT_PRODUCT_RPM, - remaining: 0, - resetAt: entry.resetAt, - retryAfter: Math.ceil((entry.resetAt - now) / 1000), - }; + return buildResult(now, 1, resetAt); } entry.count++; - return { - allowed: true, - limit: DEFAULT_PRODUCT_RPM, - remaining: DEFAULT_PRODUCT_RPM - entry.count, - resetAt: entry.resetAt, - }; + return buildResult(now, entry.count, entry.resetAt); +} + +async function checkProductRateLimitValkey(productId: string): Promise { + const now = Date.now(); + const client = await getValkeyClient(); + if (!client) { + return checkProductRateLimitMemory(productId); + } + + const resetAt = getWindowResetAt(now); + const key = getValkeyKey(productId, now); + const ttlSeconds = Math.max(1, Math.ceil((resetAt - now) / 1000) + 5); + const count = await client.incr(key); + + if (count === 1) { + await client.expire(key, ttlSeconds); + } + + return buildResult(now, count, resetAt); +} + +/** + * Check if request is within product rate limit. + */ +export async function checkProductRateLimit(productId: string): Promise { + if (shouldUseValkey()) { + return checkProductRateLimitValkey(productId); + } + return checkProductRateLimitMemory(productId); +} + +function getProductRateLimitStatusMemory(productId: string): RateLimitResult { + const now = Date.now(); + const key = getStoreKey(productId, now); + const resetAt = getWindowResetAt(now); + const entry = rateLimitStore.get(key); + + if (!entry || now >= entry.resetAt) { + return buildResult(now, 0, resetAt); + } + + return buildResult(now, entry.count, entry.resetAt); +} + +async function getProductRateLimitStatusValkey(productId: string): Promise { + const now = Date.now(); + const client = await getValkeyClient(); + if (!client) { + return getProductRateLimitStatusMemory(productId); + } + + const count = parseInt((await client.get(getValkeyKey(productId, now))) || '0', 10); + return buildResult(now, count, getWindowResetAt(now)); } /** * Get current rate limit status for a product (without incrementing). */ -export function getProductRateLimitStatus(productId: string): RateLimitResult { - const key = getStoreKey(productId); - const now = Date.now(); - const resetAt = (Math.floor(now / WINDOW_MS) + 1) * WINDOW_MS; - - const entry = rateLimitStore.get(key); - - if (!entry || now >= entry.resetAt) { - return { - allowed: true, - limit: DEFAULT_PRODUCT_RPM, - remaining: DEFAULT_PRODUCT_RPM, - resetAt, - }; +export async function getProductRateLimitStatus(productId: string): Promise { + if (shouldUseValkey()) { + return getProductRateLimitStatusValkey(productId); } - - const remaining = Math.max(0, DEFAULT_PRODUCT_RPM - entry.count); - return { - allowed: remaining > 0, - limit: DEFAULT_PRODUCT_RPM, - remaining, - resetAt: entry.resetAt, - retryAfter: remaining === 0 ? Math.ceil((entry.resetAt - now) / 1000) : undefined, - }; + return getProductRateLimitStatusMemory(productId); } -/** - * Get rate limit summary across all products. - */ -export function getRateLimitSummary(): { +function getRateLimitSummaryMemory(): { products: Array<{ productId: string; currentWindow: number; @@ -152,7 +235,57 @@ export function getRateLimitSummary(): { return { products: [...products.entries()].map(([productId, data]) => ({ productId, - currentWindow: Math.floor(now / WINDOW_MS), + currentWindow: getWindowIndex(now), + requests: data.count, + limit: DEFAULT_PRODUCT_RPM, + resetAt: data.resetAt, + })), + totalProducts: products.size, + }; +} + +async function getRateLimitSummaryValkey(): Promise<{ + products: Array<{ + productId: string; + currentWindow: number; + requests: number; + limit: number; + resetAt: number; + }>; + totalProducts: number; +}> { + const client = await getValkeyClient(); + if (!client) { + return getRateLimitSummaryMemory(); + } + + const products = new Map(); + for await (const key of client.scanIterator({ MATCH: `${VALKEY_PREFIX}:*`, COUNT: 100 })) { + const value = await client.get(key); + if (!value) { + continue; + } + + const [, , , windowIndexRaw, ...productParts] = key.split(':'); + const windowIndex = parseInt(windowIndexRaw || '0', 10); + const productId = decodeURIComponent(productParts.join(':')); + const count = parseInt(value, 10); + const resetAt = (windowIndex + 1) * WINDOW_MS; + const existing = products.get(productId); + + if (existing) { + existing.count += count; + existing.resetAt = Math.max(existing.resetAt, resetAt); + existing.currentWindow = Math.max(existing.currentWindow, windowIndex); + } else { + products.set(productId, { count, resetAt, currentWindow: windowIndex }); + } + } + + return { + products: [...products.entries()].map(([productId, data]) => ({ + productId, + currentWindow: data.currentWindow, requests: data.count, limit: DEFAULT_PRODUCT_RPM, resetAt: data.resetAt, @@ -161,24 +294,83 @@ export function getRateLimitSummary(): { }; } +/** + * Get rate limit summary across all products. + */ +export async function getRateLimitSummary(): Promise<{ + products: Array<{ + productId: string; + currentWindow: number; + requests: number; + limit: number; + resetAt: number; + }>; + totalProducts: number; +}> { + if (shouldUseValkey()) { + return getRateLimitSummaryValkey(); + } + return getRateLimitSummaryMemory(); +} + +async function resetProductRateLimitValkey(productId: string): Promise { + const now = Date.now(); + const currentWindow = getWindowIndex(now); + const client = await getValkeyClient(); + if (!client) { + return; + } + + await client.del([ + `${VALKEY_PREFIX}:${currentWindow - 1}:${encodeURIComponent(productId)}`, + `${VALKEY_PREFIX}:${currentWindow}:${encodeURIComponent(productId)}`, + `${VALKEY_PREFIX}:${currentWindow + 1}:${encodeURIComponent(productId)}`, + ]); +} + /** * Reset rate limit for a product (admin operation). */ -export function resetProductRateLimit(productId: string): void { +export async function resetProductRateLimit(productId: string): Promise { const now = Date.now(); - const currentWindow = Math.floor(now / WINDOW_MS); + const currentWindow = getWindowIndex(now); - // Clear current and next window entries for (let i = -1; i <= 1; i++) { rateLimitStore.delete(`${productId}:${currentWindow + i}`); } + + if (shouldUseValkey()) { + await resetProductRateLimitValkey(productId); + } +} + +export async function clearAllProductRateLimits(): Promise { + rateLimitStore.clear(); + + const client = await getValkeyClient(); + if (!client) { + return; + } + + const keys: string[] = []; + for await (const key of client.scanIterator({ MATCH: `${VALKEY_PREFIX}:*`, COUNT: 100 })) { + keys.push(key); + } + + if (keys.length > 0) { + await client.del(keys); + } } /** * Cleanup old entries from rate limit store. * Called periodically to prevent memory growth. */ -export function cleanupRateLimitStore(): number { +export async function cleanupRateLimitStore(): Promise { + if (shouldUseValkey()) { + return 0; + } + const now = Date.now(); let cleaned = 0; @@ -200,12 +392,17 @@ let cleanupInterval: ReturnType | null = null; * Called during server startup. */ export function startRateLimitCleanup(): void { + if (shouldUseValkey()) { + return; + } + if (cleanupInterval) { clearInterval(cleanupInterval); } + cleanupInterval = setInterval( - () => { - const cleaned = cleanupRateLimitStore(); + async () => { + const cleaned = await cleanupRateLimitStore(); if (cleaned > 0 && process.env.NODE_ENV === 'development') { // eslint-disable-next-line no-console console.log(`[product-rate-limit] Cleaned up ${cleaned} expired entries`); @@ -225,6 +422,18 @@ export function stopRateLimitCleanup(): void { } } +export function getProductRateLimitStoreMode(): 'memory' | 'valkey' { + return STORE_MODE; +} + +export async function closeProductRateLimitStore(): Promise { + stopRateLimitCleanup(); + if (valkeyClient?.isOpen) { + await valkeyClient.quit(); + } + valkeyClient = null; +} + // Auto-start in production/development (not test) if (process.env.NODE_ENV !== 'test') { startRateLimitCleanup(); diff --git a/services/extraction-service/src/modules/extract/routes.ts b/services/extraction-service/src/modules/extract/routes.ts index db368dca..08a43291 100644 --- a/services/extraction-service/src/modules/extract/routes.ts +++ b/services/extraction-service/src/modules/extract/routes.ts @@ -16,6 +16,7 @@ import { sidecarBreaker } from '../../lib/circuit-breaker.js'; import { createJob, getJob, initJobQueue, listJobs, shutdownJobQueue } from './jobs.js'; import { checkProductRateLimit, + closeProductRateLimitStore, getProductRateLimitStatus, getRateLimitSummary, resetProductRateLimit, @@ -105,6 +106,7 @@ export async function extractRoutes(app: FastifyInstance) { startHealthMonitoring(); await initJobQueue(app.log); app.addHook('onClose', async () => { + await closeProductRateLimitStore(); await shutdownJobQueue(); }); @@ -129,7 +131,7 @@ export async function extractRoutes(app: FastifyInstance) { // Check per-product rate limit if (headerProductId) { - const productLimit = checkProductRateLimit(headerProductId); + const productLimit = await checkProductRateLimit(headerProductId); if (!productLimit.allowed) { reply.header('X-RateLimit-Limit', String(productLimit.limit)); reply.header('X-RateLimit-Remaining', '0'); @@ -358,7 +360,7 @@ export async function extractRoutes(app: FastifyInstance) { // Check per-product rate limit for async jobs const headerProductId = (req.headers['x-product-id'] as string) || productId; if (headerProductId) { - const productLimit = checkProductRateLimit(headerProductId); + const productLimit = await checkProductRateLimit(headerProductId); if (!productLimit.allowed) { reply.header('X-RateLimit-Limit', String(productLimit.limit)); reply.header('X-RateLimit-Remaining', '0'); @@ -550,9 +552,9 @@ export async function extractRoutes(app: FastifyInstance) { app.get('/extract/rate-limits/product', async (req, reply) => { const productId = (req.query as Record).productId; if (productId) { - return reply.send(getProductRateLimitStatus(productId)); + return reply.send(await getProductRateLimitStatus(productId)); } - return reply.send(getRateLimitSummary()); + return reply.send(await getRateLimitSummary()); }); /** @@ -563,12 +565,12 @@ export async function extractRoutes(app: FastifyInstance) { if (!productId) { throw new BadRequestError('productId is required'); } - resetProductRateLimit(productId); + await resetProductRateLimit(productId); req.log.info({ productId }, 'product rate limit reset'); return reply.send({ productId, reset: true, - newStatus: getProductRateLimitStatus(productId), + newStatus: await getProductRateLimitStatus(productId), }); });