feat(extraction): back product rate limits with valkey

This commit is contained in:
root 2026-03-31 08:08:53 +00:00
parent 89f2f1288b
commit 81951b173a
8 changed files with 429 additions and 134 deletions

View File

@ -289,6 +289,8 @@ services:
environment:
- PORT=4005
- PYTHON_SIDECAR_URL=http://localhost:4006
- PRODUCT_RATE_LIMIT_STORE=${PRODUCT_RATE_LIMIT_STORE:-valkey}
- VALKEY_URL=${VALKEY_URL:-redis://valkey:6379}
depends_on:
cosmos-emulator:
condition: service_healthy

87
pnpm-lock.yaml generated
View File

@ -1009,6 +1009,9 @@ importers:
jose:
specifier: ^6.0.8
version: 6.1.3
redis:
specifier: ^4.7.0
version: 4.7.1
zod:
specifier: ^3.24.2
version: 3.25.76
@ -3779,6 +3782,35 @@ packages:
'@types/react':
optional: true
'@redis/bloom@1.2.0':
resolution: {integrity: sha512-HG2DFjYKbpNmVXsa0keLHp/3leGJz1mjh09f2RLGGLQZzSHpkmZWuwJbAvo3QcRY8p80m5+ZdXZdYOSBLlp7Cg==}
peerDependencies:
'@redis/client': ^1.0.0
'@redis/client@1.6.1':
resolution: {integrity: sha512-/KCsg3xSlR+nCK8/8ZYSknYxvXHwubJrU82F3Lm1Fp6789VQ0/3RJKfsmRXjqfaTA++23CvC3hqmqe/2GEt6Kw==}
engines: {node: '>=14'}
'@redis/graph@1.1.1':
resolution: {integrity: sha512-FEMTcTHZozZciLRl6GiiIB4zGm5z5F3F6a6FZCyrfxdKOhFlGkiAqlexWMBzCi4DcRoyiOsuLfW+cjlGWyExOw==}
peerDependencies:
'@redis/client': ^1.0.0
'@redis/json@1.0.7':
resolution: {integrity: sha512-6UyXfjVaTBTJtKNG4/9Z8PSpKE6XgSyEb8iwaqDcy+uKrd/DGYHTWkUdnQDyzm727V7p21WUMhsqz5oy65kPcQ==}
peerDependencies:
'@redis/client': ^1.0.0
'@redis/search@1.2.0':
resolution: {integrity: sha512-tYoDBbtqOVigEDMAcTGsRlMycIIjwMCgD8eR2t0NANeQmgK/lvxNAvYyb6bZDD4frHRhIHkJu2TBRvB0ERkOmw==}
peerDependencies:
'@redis/client': ^1.0.0
'@redis/time-series@1.1.0':
resolution: {integrity: sha512-c1Q99M5ljsIuc4YdaCwfUEXsofakb9c8+Zse2qxTadu8TalLXuAESzLvFAvNVbkmSlvlzIQOLpBCmWI9wTOt+g==}
peerDependencies:
'@redis/client': ^1.0.0
'@reduxjs/toolkit@2.11.2':
resolution: {integrity: sha512-Kd6kAHTA6/nUpp8mySPqj3en3dm0tdMIgbttnQ1xFMVpufoj+ADi8pXLBsd4xzTRHQa7t/Jv8W5UnCuW4kuWMQ==}
peerDependencies:
@ -5294,6 +5326,10 @@ packages:
resolution: {integrity: sha512-eYm0QWBtUrBWZWG0d386OGAw16Z995PiOVo2B7bjWSbHedGl5e0ZWaq65kOGgUSNesEIDkB9ISbTg/JK9dhCZA==}
engines: {node: '>=6'}
cluster-key-slot@1.1.2:
resolution: {integrity: sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==}
engines: {node: '>=0.10.0'}
code-block-writer@13.0.3:
resolution: {integrity: sha512-Oofo0pq3IKnsFtuHqSF7TqBfr71aeyZDVJ0HpmqB7FBM2qEigL0iPONSCZSO9pE9dZTAxANe5XHG9Uy0YMv8cg==}
@ -6295,6 +6331,10 @@ packages:
resolution: {integrity: sha512-SFdFmIJi+ybC0vjlHN0ZGVGHc3lgE0DxPAT0djjVg+kjOnSqclqmj0KQ7ykTOLP6YxoqOvuAODGdcHJn+43q3g==}
engines: {node: '>= 0.4'}
generic-pool@3.9.0:
resolution: {integrity: sha512-hymDOu5B53XvN4QT9dBmZxPX4CWhBPPLguTZ9MMFeFa/Kg0xWVfylOVNlJji/E7yTZWFd/q9GO5TxDLq156D7g==}
engines: {node: '>= 4'}
gensync@1.0.0-beta.2:
resolution: {integrity: sha512-3hN7NaskYvMDLQY55gnW3NQ+mesEAepTqlg+VEbj7zzqEMBVNhzcGYYeqFo/TlYz6eQiFcp1HcsCZO+nGgS8zg==}
engines: {node: '>=6.9.0'}
@ -8385,6 +8425,9 @@ packages:
resolution: {integrity: sha512-6tDA8g98We0zd0GvVeMT9arEOnTw9qM03L9cJXaCjrip1OO764RDBLBfrB4cwzNGDj5OA5ioymC9GkizgWJDUg==}
engines: {node: '>=8'}
redis@4.7.1:
resolution: {integrity: sha512-S1bJDnqLftzHXHP8JsT5II/CtHWQrASX5K96REjWjlmWKrviSOLWmM7QnRLstAWsu1VBBV1ffV6DzCvxNP0UJQ==}
redux-thunk@3.1.0:
resolution: {integrity: sha512-NW2r5T6ksUKXCabzhL9z+h206HQw/NJkcLm1GPImRQ8IzfXwRGqjVhKJGauHirT0DAuyy6hjdnMZaRoAcy0Klw==}
peerDependencies:
@ -9621,6 +9664,9 @@ packages:
yallist@3.1.1:
resolution: {integrity: sha512-a4UGQaWPH59mOXUYnAG2ewncQS4i4F43Tv3JoAM+s2VDAmS9NsK8GpDMLrCHPksFT7h3K6TOoUNn2pb7RoXx4g==}
yallist@4.0.0:
resolution: {integrity: sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A==}
yaml@2.8.2:
resolution: {integrity: sha512-mplynKqc1C2hTVYxd0PU2xQAc22TI1vShAYGksCCfxbn/dFwnHTNi1bvYsBTkhdUNtGIf5xNOg938rrSSYvS9A==}
engines: {node: '>= 14.6'}
@ -12942,6 +12988,32 @@ snapshots:
optionalDependencies:
'@types/react': 19.2.14
'@redis/bloom@1.2.0(@redis/client@1.6.1)':
dependencies:
'@redis/client': 1.6.1
'@redis/client@1.6.1':
dependencies:
cluster-key-slot: 1.1.2
generic-pool: 3.9.0
yallist: 4.0.0
'@redis/graph@1.1.1(@redis/client@1.6.1)':
dependencies:
'@redis/client': 1.6.1
'@redis/json@1.0.7(@redis/client@1.6.1)':
dependencies:
'@redis/client': 1.6.1
'@redis/search@1.2.0(@redis/client@1.6.1)':
dependencies:
'@redis/client': 1.6.1
'@redis/time-series@1.1.0(@redis/client@1.6.1)':
dependencies:
'@redis/client': 1.6.1
'@reduxjs/toolkit@2.11.2(react-redux@9.2.0(@types/react@19.2.14)(react@19.2.3)(redux@5.0.1))(react@19.2.3)':
dependencies:
'@standard-schema/spec': 1.1.0
@ -14615,6 +14687,8 @@ snapshots:
clsx@2.1.1: {}
cluster-key-slot@1.1.2: {}
code-block-writer@13.0.3: {}
code-point-at@1.1.0: {}
@ -15900,6 +15974,8 @@ snapshots:
generator-function@2.0.1: {}
generic-pool@3.9.0: {}
gensync@1.0.0-beta.2: {}
get-caller-file@2.0.5: {}
@ -18638,6 +18714,15 @@ snapshots:
indent-string: 4.0.0
strip-indent: 3.0.0
redis@4.7.1:
dependencies:
'@redis/bloom': 1.2.0(@redis/client@1.6.1)
'@redis/client': 1.6.1
'@redis/graph': 1.1.1(@redis/client@1.6.1)
'@redis/json': 1.0.7(@redis/client@1.6.1)
'@redis/search': 1.2.0(@redis/client@1.6.1)
'@redis/time-series': 1.1.0(@redis/client@1.6.1)
redux-thunk@3.1.0(redux@5.0.1):
dependencies:
redux: 5.0.1
@ -20211,6 +20296,8 @@ snapshots:
yallist@3.1.1: {}
yallist@4.0.0: {}
yaml@2.8.2: {}
yargs-parser@21.1.1: {}

View File

@ -76,8 +76,8 @@ COPY dashboards/admin-web/package.json dashboards/admin-web/
COPY dashboards/tracker-web/package.json dashboards/tracker-web/
COPY scripts/package.json scripts/
# Install all workspace deps
RUN pnpm install --frozen-lockfile
# Install all workspace deps without running prepare hooks before sources exist.
RUN pnpm install --frozen-lockfile --ignore-scripts
# Copy source
COPY packages/ packages/

View File

@ -33,6 +33,7 @@
"fastify": "^5.2.1",
"fastify-metrics": "^10.3.0",
"jose": "^6.0.8",
"redis": "^4.7.0",
"zod": "^3.24.2"
},
"devDependencies": {

View File

@ -14,6 +14,8 @@ const envSchema = z.object({
PYTHON_SIDECAR_URL: z.string().default('http://localhost:4006'),
DEFAULT_MODEL_ID: z.string().default('gemini-2.5-flash'),
EXTRACTION_QUEUE_BACKEND: z.enum(['memory', 'file']).default('file'),
PRODUCT_RATE_LIMIT_STORE: z.enum(['memory', 'valkey']).default('memory'),
VALKEY_URL: z.string().optional(),
EXTRACTION_QUEUE_FILE: z.string().optional(),
EXTRACTION_QUEUE_POLL_MS: z.coerce.number().default(100),
EXTRACTION_QUEUE_LEASE_MS: z.coerce.number().default(30000),

View File

@ -5,6 +5,7 @@
import { describe, it, expect, beforeEach, vi } from 'vitest';
import {
checkProductRateLimit,
clearAllProductRateLimits,
getProductRateLimitStatus,
getRateLimitSummary,
resetProductRateLimit,
@ -15,53 +16,51 @@ import {
describe('product-rate-limit', () => {
const PRODUCT_ID = 'test-product';
beforeEach(() => {
// Stop auto-cleanup and reset state before each test
beforeEach(async () => {
stopRateLimitCleanup();
resetProductRateLimit(PRODUCT_ID);
await clearAllProductRateLimits();
await resetProductRateLimit(PRODUCT_ID);
vi.restoreAllMocks();
});
describe('checkProductRateLimit', () => {
it('allows first request', () => {
const result = checkProductRateLimit(PRODUCT_ID);
it('allows first request', async () => {
const result = await checkProductRateLimit(PRODUCT_ID);
expect(result.allowed).toBe(true);
expect(result.remaining).toBeGreaterThan(0);
});
it('tracks request count', () => {
checkProductRateLimit(PRODUCT_ID);
checkProductRateLimit(PRODUCT_ID);
const result = checkProductRateLimit(PRODUCT_ID);
it('tracks request count', async () => {
await checkProductRateLimit(PRODUCT_ID);
await checkProductRateLimit(PRODUCT_ID);
const result = await checkProductRateLimit(PRODUCT_ID);
expect(result.allowed).toBe(true);
expect(result.remaining).toBeLessThan(100);
});
it('blocks when limit exceeded', () => {
// Make 100 requests (default limit)
it('blocks when limit exceeded', async () => {
for (let i = 0; i < 100; i++) {
checkProductRateLimit(PRODUCT_ID);
await checkProductRateLimit(PRODUCT_ID);
}
const result = checkProductRateLimit(PRODUCT_ID);
const result = await checkProductRateLimit(PRODUCT_ID);
expect(result.allowed).toBe(false);
expect(result.remaining).toBe(0);
});
it('includes reset time in response', () => {
const result = checkProductRateLimit(PRODUCT_ID);
it('includes reset time in response', async () => {
const result = await checkProductRateLimit(PRODUCT_ID);
expect(result.resetAt).toBeGreaterThan(Date.now());
expect(result.resetAt).toBeLessThanOrEqual(Date.now() + 60000);
});
it('includes retry after when blocked', () => {
// Exhaust limit
it('includes retry after when blocked', async () => {
for (let i = 0; i < 100; i++) {
checkProductRateLimit(PRODUCT_ID);
await checkProductRateLimit(PRODUCT_ID);
}
const result = checkProductRateLimit(PRODUCT_ID);
const result = await checkProductRateLimit(PRODUCT_ID);
expect(result.allowed).toBe(false);
expect(result.retryAfter).toBeDefined();
expect(result.retryAfter).toBeGreaterThan(0);
@ -70,106 +69,99 @@ describe('product-rate-limit', () => {
});
describe('getProductRateLimitStatus', () => {
it('returns current status without incrementing', () => {
// Use up some quota
it('returns current status without incrementing', async () => {
for (let i = 0; i < 10; i++) {
checkProductRateLimit(PRODUCT_ID);
await checkProductRateLimit(PRODUCT_ID);
}
const status1 = getProductRateLimitStatus(PRODUCT_ID);
const status2 = getProductRateLimitStatus(PRODUCT_ID);
const status1 = await getProductRateLimitStatus(PRODUCT_ID);
const status2 = await getProductRateLimitStatus(PRODUCT_ID);
expect(status1.remaining).toBe(status2.remaining);
});
it('returns full quota for new product', () => {
const result = getProductRateLimitStatus('brand-new-product');
it('returns full quota for new product', async () => {
const result = await getProductRateLimitStatus('brand-new-product');
expect(result.allowed).toBe(true);
expect(result.remaining).toBe(100);
});
});
describe('getRateLimitSummary', () => {
it('returns empty summary initially', () => {
const summary = getRateLimitSummary();
it('returns empty summary initially', async () => {
const summary = await getRateLimitSummary();
expect(summary.totalProducts).toBe(0);
expect(summary.products).toEqual([]);
});
it('includes products that have made requests', () => {
checkProductRateLimit('product-a');
checkProductRateLimit('product-b');
it('includes products that have made requests', async () => {
await checkProductRateLimit('product-a');
await checkProductRateLimit('product-b');
const summary = getRateLimitSummary();
const summary = await getRateLimitSummary();
expect(summary.totalProducts).toBe(2);
expect(summary.products.map(p => p.productId)).toContain('product-a');
expect(summary.products.map(p => p.productId)).toContain('product-b');
});
it('aggregates requests per product', () => {
// Reset to ensure clean state
resetProductRateLimit('product-a');
it('aggregates requests per product', async () => {
await resetProductRateLimit('product-a');
checkProductRateLimit('product-a');
checkProductRateLimit('product-a');
checkProductRateLimit('product-a');
await checkProductRateLimit('product-a');
await checkProductRateLimit('product-a');
await checkProductRateLimit('product-a');
const summary = getRateLimitSummary();
const summary = await getRateLimitSummary();
const productA = summary.products.find(p => p.productId === 'product-a');
expect(productA?.requests).toBe(3);
});
});
describe('resetProductRateLimit', () => {
it('clears rate limit for product', () => {
// Use up quota
it('clears rate limit for product', async () => {
for (let i = 0; i < 100; i++) {
checkProductRateLimit(PRODUCT_ID);
await checkProductRateLimit(PRODUCT_ID);
}
const beforeReset = checkProductRateLimit(PRODUCT_ID);
const beforeReset = await checkProductRateLimit(PRODUCT_ID);
expect(beforeReset.allowed).toBe(false);
resetProductRateLimit(PRODUCT_ID);
await resetProductRateLimit(PRODUCT_ID);
const afterReset = checkProductRateLimit(PRODUCT_ID);
const afterReset = await checkProductRateLimit(PRODUCT_ID);
expect(afterReset.allowed).toBe(true);
expect(afterReset.remaining).toBe(99);
});
});
describe('cleanupRateLimitStore', () => {
it('removes expired entries', () => {
// Create some entries
checkProductRateLimit('old-product');
it('removes expired entries', async () => {
await checkProductRateLimit('old-product');
// Fast forward time to expire entries
vi.useFakeTimers();
vi.advanceTimersByTime(60001);
const cleaned = cleanupRateLimitStore();
const cleaned = await cleanupRateLimitStore();
expect(cleaned).toBeGreaterThanOrEqual(0);
vi.useRealTimers();
});
it('keeps current entries', () => {
checkProductRateLimit(PRODUCT_ID);
it('keeps current entries', async () => {
await checkProductRateLimit(PRODUCT_ID);
const cleaned = cleanupRateLimitStore();
const cleaned = await cleanupRateLimitStore();
expect(cleaned).toBe(0);
});
});
describe('rate limiting per product isolation', () => {
it('isolates limits between products', () => {
// Exhaust limit for product A
it('isolates limits between products', async () => {
for (let i = 0; i < 100; i++) {
checkProductRateLimit('product-a');
await checkProductRateLimit('product-a');
}
// Product B should still have quota
const productBResult = checkProductRateLimit('product-b');
const productBResult = await checkProductRateLimit('product-b');
expect(productBResult.allowed).toBe(true);
expect(productBResult.remaining).toBe(99);
});

View File

@ -1,3 +1,5 @@
import { createClient } from 'redis';
/**
* Per-product rate limiting for extraction service.
*
@ -21,6 +23,10 @@ export interface ProductRateLimitConfig {
// Default limits (can be overridden via env)
const DEFAULT_PRODUCT_RPM = parseInt(process.env.PRODUCT_RATE_LIMIT_RPM || '100', 10);
const WINDOW_MS = 60_000; // 1 minute
const STORE_MODE = process.env.PRODUCT_RATE_LIMIT_STORE === 'valkey' ? 'valkey' : 'memory';
const VALKEY_URL = process.env.VALKEY_URL || 'redis://valkey:6379';
const VALKEY_PREFIX = 'extraction:product-rate-limit';
type ValkeyClient = ReturnType<typeof createClient>;
// ── In-memory rate limit store ──────────────────────────────────
@ -31,10 +37,82 @@ interface RateLimitEntry {
}
const rateLimitStore = new Map<string, RateLimitEntry>();
let valkeyClient: ValkeyClient | null = null;
let valkeyConnectPromise: Promise<ValkeyClient | null> | null = null;
let loggedValkeyFallback = false;
function getStoreKey(productId: string): string {
const windowIndex = Math.floor(Date.now() / WINDOW_MS);
return `${productId}:${windowIndex}`;
function getWindowIndex(now = Date.now()): number {
return Math.floor(now / WINDOW_MS);
}
function getStoreKey(productId: string, now = Date.now()): string {
return `${productId}:${getWindowIndex(now)}`;
}
function getValkeyKey(productId: string, now = Date.now()): string {
return `${VALKEY_PREFIX}:${getWindowIndex(now)}:${encodeURIComponent(productId)}`;
}
function getWindowResetAt(now = Date.now()): number {
return (getWindowIndex(now) + 1) * WINDOW_MS;
}
function shouldUseValkey(): boolean {
return STORE_MODE === 'valkey' && process.env.NODE_ENV !== 'test';
}
function logValkeyFallback(err: unknown): void {
if (loggedValkeyFallback) {
return;
}
loggedValkeyFallback = true;
const detail = err instanceof Error ? err.message : String(err);
// eslint-disable-next-line no-console
console.warn(`[product-rate-limit] Falling back to in-memory store: ${detail}`);
}
async function getValkeyClient(): Promise<ValkeyClient | null> {
if (!shouldUseValkey()) {
return null;
}
if (valkeyClient?.isOpen) {
return valkeyClient;
}
if (!valkeyConnectPromise) {
const client = createClient({ url: VALKEY_URL });
client.on('error', err => {
logValkeyFallback(err);
});
valkeyConnectPromise = client
.connect()
.then(() => {
loggedValkeyFallback = false;
valkeyClient = client;
return client;
})
.catch(err => {
logValkeyFallback(err);
return null;
})
.finally(() => {
valkeyConnectPromise = null;
});
}
return valkeyConnectPromise;
}
function buildResult(now: number, count: number, resetAt: number): RateLimitResult {
return {
allowed: count <= DEFAULT_PRODUCT_RPM,
limit: DEFAULT_PRODUCT_RPM,
remaining: Math.max(0, DEFAULT_PRODUCT_RPM - count),
resetAt,
retryAfter:
count > DEFAULT_PRODUCT_RPM ? Math.ceil(Math.max(0, resetAt - now) / 1000) : undefined,
};
}
// ── Rate limiting functions ─────────────────────────────────────
@ -47,84 +125,89 @@ export interface RateLimitResult {
retryAfter?: number;
}
/**
* Check if request is within product rate limit.
*/
export function checkProductRateLimit(productId: string): RateLimitResult {
const key = getStoreKey(productId);
function checkProductRateLimitMemory(productId: string): RateLimitResult {
const now = Date.now();
const resetAt = (Math.floor(now / WINDOW_MS) + 1) * WINDOW_MS;
const key = getStoreKey(productId, now);
const resetAt = getWindowResetAt(now);
const entry = rateLimitStore.get(key);
if (!entry || now >= entry.resetAt) {
// New window
rateLimitStore.set(key, {
count: 1,
windowStart: now,
resetAt,
});
return {
allowed: true,
limit: DEFAULT_PRODUCT_RPM,
remaining: DEFAULT_PRODUCT_RPM - 1,
resetAt,
};
}
// Existing window
if (entry.count >= DEFAULT_PRODUCT_RPM) {
return {
allowed: false,
limit: DEFAULT_PRODUCT_RPM,
remaining: 0,
resetAt: entry.resetAt,
retryAfter: Math.ceil((entry.resetAt - now) / 1000),
};
return buildResult(now, 1, resetAt);
}
entry.count++;
return {
allowed: true,
limit: DEFAULT_PRODUCT_RPM,
remaining: DEFAULT_PRODUCT_RPM - entry.count,
resetAt: entry.resetAt,
};
return buildResult(now, entry.count, entry.resetAt);
}
async function checkProductRateLimitValkey(productId: string): Promise<RateLimitResult> {
const now = Date.now();
const client = await getValkeyClient();
if (!client) {
return checkProductRateLimitMemory(productId);
}
const resetAt = getWindowResetAt(now);
const key = getValkeyKey(productId, now);
const ttlSeconds = Math.max(1, Math.ceil((resetAt - now) / 1000) + 5);
const count = await client.incr(key);
if (count === 1) {
await client.expire(key, ttlSeconds);
}
return buildResult(now, count, resetAt);
}
/**
* Check if request is within product rate limit.
*/
export async function checkProductRateLimit(productId: string): Promise<RateLimitResult> {
if (shouldUseValkey()) {
return checkProductRateLimitValkey(productId);
}
return checkProductRateLimitMemory(productId);
}
function getProductRateLimitStatusMemory(productId: string): RateLimitResult {
const now = Date.now();
const key = getStoreKey(productId, now);
const resetAt = getWindowResetAt(now);
const entry = rateLimitStore.get(key);
if (!entry || now >= entry.resetAt) {
return buildResult(now, 0, resetAt);
}
return buildResult(now, entry.count, entry.resetAt);
}
async function getProductRateLimitStatusValkey(productId: string): Promise<RateLimitResult> {
const now = Date.now();
const client = await getValkeyClient();
if (!client) {
return getProductRateLimitStatusMemory(productId);
}
const count = parseInt((await client.get(getValkeyKey(productId, now))) || '0', 10);
return buildResult(now, count, getWindowResetAt(now));
}
/**
* Get current rate limit status for a product (without incrementing).
*/
export function getProductRateLimitStatus(productId: string): RateLimitResult {
const key = getStoreKey(productId);
const now = Date.now();
const resetAt = (Math.floor(now / WINDOW_MS) + 1) * WINDOW_MS;
const entry = rateLimitStore.get(key);
if (!entry || now >= entry.resetAt) {
return {
allowed: true,
limit: DEFAULT_PRODUCT_RPM,
remaining: DEFAULT_PRODUCT_RPM,
resetAt,
};
export async function getProductRateLimitStatus(productId: string): Promise<RateLimitResult> {
if (shouldUseValkey()) {
return getProductRateLimitStatusValkey(productId);
}
const remaining = Math.max(0, DEFAULT_PRODUCT_RPM - entry.count);
return {
allowed: remaining > 0,
limit: DEFAULT_PRODUCT_RPM,
remaining,
resetAt: entry.resetAt,
retryAfter: remaining === 0 ? Math.ceil((entry.resetAt - now) / 1000) : undefined,
};
return getProductRateLimitStatusMemory(productId);
}
/**
* Get rate limit summary across all products.
*/
export function getRateLimitSummary(): {
function getRateLimitSummaryMemory(): {
products: Array<{
productId: string;
currentWindow: number;
@ -152,7 +235,57 @@ export function getRateLimitSummary(): {
return {
products: [...products.entries()].map(([productId, data]) => ({
productId,
currentWindow: Math.floor(now / WINDOW_MS),
currentWindow: getWindowIndex(now),
requests: data.count,
limit: DEFAULT_PRODUCT_RPM,
resetAt: data.resetAt,
})),
totalProducts: products.size,
};
}
async function getRateLimitSummaryValkey(): Promise<{
products: Array<{
productId: string;
currentWindow: number;
requests: number;
limit: number;
resetAt: number;
}>;
totalProducts: number;
}> {
const client = await getValkeyClient();
if (!client) {
return getRateLimitSummaryMemory();
}
const products = new Map<string, { count: number; resetAt: number; currentWindow: number }>();
for await (const key of client.scanIterator({ MATCH: `${VALKEY_PREFIX}:*`, COUNT: 100 })) {
const value = await client.get(key);
if (!value) {
continue;
}
const [, , , windowIndexRaw, ...productParts] = key.split(':');
const windowIndex = parseInt(windowIndexRaw || '0', 10);
const productId = decodeURIComponent(productParts.join(':'));
const count = parseInt(value, 10);
const resetAt = (windowIndex + 1) * WINDOW_MS;
const existing = products.get(productId);
if (existing) {
existing.count += count;
existing.resetAt = Math.max(existing.resetAt, resetAt);
existing.currentWindow = Math.max(existing.currentWindow, windowIndex);
} else {
products.set(productId, { count, resetAt, currentWindow: windowIndex });
}
}
return {
products: [...products.entries()].map(([productId, data]) => ({
productId,
currentWindow: data.currentWindow,
requests: data.count,
limit: DEFAULT_PRODUCT_RPM,
resetAt: data.resetAt,
@ -161,24 +294,83 @@ export function getRateLimitSummary(): {
};
}
/**
* Get rate limit summary across all products.
*/
export async function getRateLimitSummary(): Promise<{
products: Array<{
productId: string;
currentWindow: number;
requests: number;
limit: number;
resetAt: number;
}>;
totalProducts: number;
}> {
if (shouldUseValkey()) {
return getRateLimitSummaryValkey();
}
return getRateLimitSummaryMemory();
}
async function resetProductRateLimitValkey(productId: string): Promise<void> {
const now = Date.now();
const currentWindow = getWindowIndex(now);
const client = await getValkeyClient();
if (!client) {
return;
}
await client.del([
`${VALKEY_PREFIX}:${currentWindow - 1}:${encodeURIComponent(productId)}`,
`${VALKEY_PREFIX}:${currentWindow}:${encodeURIComponent(productId)}`,
`${VALKEY_PREFIX}:${currentWindow + 1}:${encodeURIComponent(productId)}`,
]);
}
/**
* Reset rate limit for a product (admin operation).
*/
export function resetProductRateLimit(productId: string): void {
export async function resetProductRateLimit(productId: string): Promise<void> {
const now = Date.now();
const currentWindow = Math.floor(now / WINDOW_MS);
const currentWindow = getWindowIndex(now);
// Clear current and next window entries
for (let i = -1; i <= 1; i++) {
rateLimitStore.delete(`${productId}:${currentWindow + i}`);
}
if (shouldUseValkey()) {
await resetProductRateLimitValkey(productId);
}
}
export async function clearAllProductRateLimits(): Promise<void> {
rateLimitStore.clear();
const client = await getValkeyClient();
if (!client) {
return;
}
const keys: string[] = [];
for await (const key of client.scanIterator({ MATCH: `${VALKEY_PREFIX}:*`, COUNT: 100 })) {
keys.push(key);
}
if (keys.length > 0) {
await client.del(keys);
}
}
/**
* Cleanup old entries from rate limit store.
* Called periodically to prevent memory growth.
*/
export function cleanupRateLimitStore(): number {
export async function cleanupRateLimitStore(): Promise<number> {
if (shouldUseValkey()) {
return 0;
}
const now = Date.now();
let cleaned = 0;
@ -200,12 +392,17 @@ let cleanupInterval: ReturnType<typeof setInterval> | null = null;
* Called during server startup.
*/
export function startRateLimitCleanup(): void {
if (shouldUseValkey()) {
return;
}
if (cleanupInterval) {
clearInterval(cleanupInterval);
}
cleanupInterval = setInterval(
() => {
const cleaned = cleanupRateLimitStore();
async () => {
const cleaned = await cleanupRateLimitStore();
if (cleaned > 0 && process.env.NODE_ENV === 'development') {
// eslint-disable-next-line no-console
console.log(`[product-rate-limit] Cleaned up ${cleaned} expired entries`);
@ -225,6 +422,18 @@ export function stopRateLimitCleanup(): void {
}
}
export function getProductRateLimitStoreMode(): 'memory' | 'valkey' {
return STORE_MODE;
}
export async function closeProductRateLimitStore(): Promise<void> {
stopRateLimitCleanup();
if (valkeyClient?.isOpen) {
await valkeyClient.quit();
}
valkeyClient = null;
}
// Auto-start in production/development (not test)
if (process.env.NODE_ENV !== 'test') {
startRateLimitCleanup();

View File

@ -16,6 +16,7 @@ import { sidecarBreaker } from '../../lib/circuit-breaker.js';
import { createJob, getJob, initJobQueue, listJobs, shutdownJobQueue } from './jobs.js';
import {
checkProductRateLimit,
closeProductRateLimitStore,
getProductRateLimitStatus,
getRateLimitSummary,
resetProductRateLimit,
@ -105,6 +106,7 @@ export async function extractRoutes(app: FastifyInstance) {
startHealthMonitoring();
await initJobQueue(app.log);
app.addHook('onClose', async () => {
await closeProductRateLimitStore();
await shutdownJobQueue();
});
@ -129,7 +131,7 @@ export async function extractRoutes(app: FastifyInstance) {
// Check per-product rate limit
if (headerProductId) {
const productLimit = checkProductRateLimit(headerProductId);
const productLimit = await checkProductRateLimit(headerProductId);
if (!productLimit.allowed) {
reply.header('X-RateLimit-Limit', String(productLimit.limit));
reply.header('X-RateLimit-Remaining', '0');
@ -358,7 +360,7 @@ export async function extractRoutes(app: FastifyInstance) {
// Check per-product rate limit for async jobs
const headerProductId = (req.headers['x-product-id'] as string) || productId;
if (headerProductId) {
const productLimit = checkProductRateLimit(headerProductId);
const productLimit = await checkProductRateLimit(headerProductId);
if (!productLimit.allowed) {
reply.header('X-RateLimit-Limit', String(productLimit.limit));
reply.header('X-RateLimit-Remaining', '0');
@ -550,9 +552,9 @@ export async function extractRoutes(app: FastifyInstance) {
app.get('/extract/rate-limits/product', async (req, reply) => {
const productId = (req.query as Record<string, string>).productId;
if (productId) {
return reply.send(getProductRateLimitStatus(productId));
return reply.send(await getProductRateLimitStatus(productId));
}
return reply.send(getRateLimitSummary());
return reply.send(await getRateLimitSummary());
});
/**
@ -563,12 +565,12 @@ export async function extractRoutes(app: FastifyInstance) {
if (!productId) {
throw new BadRequestError('productId is required');
}
resetProductRateLimit(productId);
await resetProductRateLimit(productId);
req.log.info({ productId }, 'product rate limit reset');
return reply.send({
productId,
reset: true,
newStatus: getProductRateLimitStatus(productId),
newStatus: await getProductRateLimitStatus(productId),
});
});