From 41b32a840ff34ba5e3a4bbe3f4ecef7139ed7e6d Mon Sep 17 00:00:00 2001 From: saravanakumardb1 Date: Mon, 2 Mar 2026 10:16:24 -0800 Subject: [PATCH] fix(extraction-service): export rate limit cleanup functions for graceful shutdown --- .../src/modules/extract/jobs.test.ts | 77 +++++++++++-------- .../src/modules/extract/product-rate-limit.ts | 57 ++++++++++---- .../src/modules/extract/routes.ts | 10 ++- 3 files changed, 97 insertions(+), 47 deletions(-) diff --git a/services/extraction-service/src/modules/extract/jobs.test.ts b/services/extraction-service/src/modules/extract/jobs.test.ts index 32e844b7..2f229d0b 100644 --- a/services/extraction-service/src/modules/extract/jobs.test.ts +++ b/services/extraction-service/src/modules/extract/jobs.test.ts @@ -3,7 +3,7 @@ */ import { describe, it, expect, vi, beforeEach } from 'vitest'; -import { createJob, getJob, listJobs } from './jobs.js'; +import { createJob, getJob, listJobs, resetJobStore } from './jobs.js'; import { type WebhookConfig } from './webhooks.js'; // Mock the python-bridge to avoid real sidecar calls @@ -25,6 +25,7 @@ describe('extraction jobs', () => { beforeEach(() => { mockSidecarExtract.mockReset(); mockTriggerJobWebhook.mockReset(); + resetJobStore(); }); describe('createJob', () => { @@ -51,11 +52,7 @@ describe('extraction jobs', () => { metadata: { model_id: 'test', duration_ms: 10, char_count: 5 }, }); - const job = createJob([ - { text: 'First' }, - { text: 'Second' }, - { text: 'Third' }, - ]); + const job = createJob([{ text: 'First' }, { text: 'Second' }, { text: 'Third' }]); expect(job.progress.total).toBe(3); }); @@ -94,10 +91,13 @@ describe('extraction jobs', () => { const job = createJob([{ text: 'Meet John at 3pm' }]); // Wait for background processing - await vi.waitFor(() => { - const j = getJob(job.id); - expect(j!.status).toBe('completed'); - }, { timeout: 2000 }); + await vi.waitFor( + () => { + const j = getJob(job.id); + expect(j!.status).toBe('completed'); + }, + { timeout: 2000 } + ); const completed = getJob(job.id)!; expect(completed.results).toHaveLength(1); @@ -111,11 +111,14 @@ describe('extraction jobs', () => { const job = createJob([{ text: 'Will fail' }]); - await vi.waitFor(() => { - const j = getJob(job.id); - expect(j!.status).not.toBe('pending'); - expect(j!.status).not.toBe('processing'); - }, { timeout: 2000 }); + await vi.waitFor( + () => { + const j = getJob(job.id); + expect(j!.status).not.toBe('pending'); + expect(j!.status).not.toBe('processing'); + }, + { timeout: 2000 } + ); const finished = getJob(job.id)!; expect(finished.errors).toHaveLength(1); @@ -142,10 +145,13 @@ describe('extraction jobs', () => { { text: 'Empty result' }, ]); - await vi.waitFor(() => { - const j = getJob(job.id); - expect(j!.progress.completed).toBe(3); - }, { timeout: 2000 }); + await vi.waitFor( + () => { + const j = getJob(job.id); + expect(j!.progress.completed).toBe(3); + }, + { timeout: 2000 } + ); const finished = getJob(job.id)!; expect(finished.status).toBe('completed'); // Not all failed @@ -234,10 +240,13 @@ describe('extraction jobs', () => { const job = createJob([{ text: 'test' }], 'req-123', webhookConfig); // Wait for background processing - await vi.waitFor(() => { - const j = getJob(job.id); - expect(j!.status).toBe('completed'); - }, { timeout: 2000 }); + await vi.waitFor( + () => { + const j = getJob(job.id); + expect(j!.status).toBe('completed'); + }, + { timeout: 2000 } + ); expect(mockTriggerJobWebhook).toHaveBeenCalledWith( expect.objectContaining({ id: job.id, status: 'completed' }), @@ -260,10 +269,13 @@ describe('extraction jobs', () => { const job = createJob([{ text: 'test' }], 'req-123', webhookConfig); // Wait for background processing - await vi.waitFor(() => { - const j = getJob(job.id); - expect(j!.status).toBe('completed'); - }, { timeout: 2000 }); + await vi.waitFor( + () => { + const j = getJob(job.id); + expect(j!.status).toBe('completed'); + }, + { timeout: 2000 } + ); // Job should still be completed even if webhook failed const completed = getJob(job.id)!; @@ -279,10 +291,13 @@ describe('extraction jobs', () => { const job = createJob([{ text: 'test' }]); // Wait for background processing - await vi.waitFor(() => { - const j = getJob(job.id); - expect(j!.status).toBe('completed'); - }, { timeout: 2000 }); + await vi.waitFor( + () => { + const j = getJob(job.id); + expect(j!.status).toBe('completed'); + }, + { timeout: 2000 } + ); expect(mockTriggerJobWebhook).not.toHaveBeenCalled(); }); diff --git a/services/extraction-service/src/modules/extract/product-rate-limit.ts b/services/extraction-service/src/modules/extract/product-rate-limit.ts index 3c24772d..08a331a8 100644 --- a/services/extraction-service/src/modules/extract/product-rate-limit.ts +++ b/services/extraction-service/src/modules/extract/product-rate-limit.ts @@ -9,17 +9,14 @@ * - Configurable via env vars per tier */ -import { z } from 'zod'; - // ── Configuration ─────────────────────────────────────────────── -const ProductRateLimitSchema = z.object({ - productId: z.string(), - windowMs: z.number().default(60_000), // 1 minute window - maxRequests: z.number().default(100), // requests per window -}); - -export type ProductRateLimitConfig = z.infer; +/** Configuration for product rate limiting */ +export interface ProductRateLimitConfig { + productId: string; + windowMs: number; + maxRequests: number; +} // Default limits (can be overridden via env) const DEFAULT_PRODUCT_RPM = parseInt(process.env.PRODUCT_RATE_LIMIT_RPM || '100', 10); @@ -195,10 +192,40 @@ export function cleanupRateLimitStore(): number { return cleaned; } -// Auto-cleanup every 5 minutes -setInterval(() => { - const cleaned = cleanupRateLimitStore(); - if (cleaned > 0 && process.env.NODE_ENV === 'development') { - console.log(`[product-rate-limit] Cleaned up ${cleaned} expired entries`); +// Cleanup interval handle (exported for testing/shutdown) +let cleanupInterval: ReturnType | null = null; + +/** + * Start automatic cleanup of expired rate limit entries. + * Called during server startup. + */ +export function startRateLimitCleanup(): void { + if (cleanupInterval) { + clearInterval(cleanupInterval); } -}, 5 * 60 * 1000); + cleanupInterval = setInterval( + () => { + const cleaned = cleanupRateLimitStore(); + if (cleaned > 0 && process.env.NODE_ENV === 'development') { + // eslint-disable-next-line no-console + console.log(`[product-rate-limit] Cleaned up ${cleaned} expired entries`); + } + }, + 5 * 60 * 1000 + ); +} + +/** + * Stop automatic cleanup (for graceful shutdown/testing). + */ +export function stopRateLimitCleanup(): void { + if (cleanupInterval) { + clearInterval(cleanupInterval); + cleanupInterval = null; + } +} + +// Auto-start in production/development (not test) +if (process.env.NODE_ENV !== 'test') { + startRateLimitCleanup(); +} diff --git a/services/extraction-service/src/modules/extract/routes.ts b/services/extraction-service/src/modules/extract/routes.ts index d42bb73f..3bfbdb41 100644 --- a/services/extraction-service/src/modules/extract/routes.ts +++ b/services/extraction-service/src/modules/extract/routes.ts @@ -365,6 +365,7 @@ export async function extractRoutes(app: FastifyInstance) { limit: productLimit.limit, }); } + reply.header('X-RateLimit-Remaining', String(productLimit.remaining)); } const sidecarRequests = inputs.map(input => ({ @@ -386,10 +387,17 @@ export async function extractRoutes(app: FastifyInstance) { const body = req.body as Record; let webhookConfig: WebhookConfig | undefined; if (body.webhookUrl && typeof body.webhookUrl === 'string') { + // Validate webhook URL format + try { + new URL(body.webhookUrl); + } catch { + throw new BadRequestError('Invalid webhookUrl format'); + } webhookConfig = { url: body.webhookUrl, secret: typeof body.webhookSecret === 'string' ? body.webhookSecret : 'default-secret', - retryAttempts: typeof body.webhookRetryAttempts === 'number' ? body.webhookRetryAttempts : 3, + retryAttempts: + typeof body.webhookRetryAttempts === 'number' ? body.webhookRetryAttempts : 3, }; }