fix(extraction-service): export rate limit cleanup functions for graceful shutdown

This commit is contained in:
saravanakumardb1 2026-03-02 10:16:24 -08:00
parent aeae62027f
commit 41b32a840f
3 changed files with 97 additions and 47 deletions

View File

@ -3,7 +3,7 @@
*/
import { describe, it, expect, vi, beforeEach } from 'vitest';
import { createJob, getJob, listJobs } from './jobs.js';
import { createJob, getJob, listJobs, resetJobStore } from './jobs.js';
import { type WebhookConfig } from './webhooks.js';
// Mock the python-bridge to avoid real sidecar calls
@ -25,6 +25,7 @@ describe('extraction jobs', () => {
beforeEach(() => {
mockSidecarExtract.mockReset();
mockTriggerJobWebhook.mockReset();
resetJobStore();
});
describe('createJob', () => {
@ -51,11 +52,7 @@ describe('extraction jobs', () => {
metadata: { model_id: 'test', duration_ms: 10, char_count: 5 },
});
const job = createJob([
{ text: 'First' },
{ text: 'Second' },
{ text: 'Third' },
]);
const job = createJob([{ text: 'First' }, { text: 'Second' }, { text: 'Third' }]);
expect(job.progress.total).toBe(3);
});
@ -94,10 +91,13 @@ describe('extraction jobs', () => {
const job = createJob([{ text: 'Meet John at 3pm' }]);
// Wait for background processing
await vi.waitFor(() => {
const j = getJob(job.id);
expect(j!.status).toBe('completed');
}, { timeout: 2000 });
await vi.waitFor(
() => {
const j = getJob(job.id);
expect(j!.status).toBe('completed');
},
{ timeout: 2000 }
);
const completed = getJob(job.id)!;
expect(completed.results).toHaveLength(1);
@ -111,11 +111,14 @@ describe('extraction jobs', () => {
const job = createJob([{ text: 'Will fail' }]);
await vi.waitFor(() => {
const j = getJob(job.id);
expect(j!.status).not.toBe('pending');
expect(j!.status).not.toBe('processing');
}, { timeout: 2000 });
await vi.waitFor(
() => {
const j = getJob(job.id);
expect(j!.status).not.toBe('pending');
expect(j!.status).not.toBe('processing');
},
{ timeout: 2000 }
);
const finished = getJob(job.id)!;
expect(finished.errors).toHaveLength(1);
@ -142,10 +145,13 @@ describe('extraction jobs', () => {
{ text: 'Empty result' },
]);
await vi.waitFor(() => {
const j = getJob(job.id);
expect(j!.progress.completed).toBe(3);
}, { timeout: 2000 });
await vi.waitFor(
() => {
const j = getJob(job.id);
expect(j!.progress.completed).toBe(3);
},
{ timeout: 2000 }
);
const finished = getJob(job.id)!;
expect(finished.status).toBe('completed'); // Not all failed
@ -234,10 +240,13 @@ describe('extraction jobs', () => {
const job = createJob([{ text: 'test' }], 'req-123', webhookConfig);
// Wait for background processing
await vi.waitFor(() => {
const j = getJob(job.id);
expect(j!.status).toBe('completed');
}, { timeout: 2000 });
await vi.waitFor(
() => {
const j = getJob(job.id);
expect(j!.status).toBe('completed');
},
{ timeout: 2000 }
);
expect(mockTriggerJobWebhook).toHaveBeenCalledWith(
expect.objectContaining({ id: job.id, status: 'completed' }),
@ -260,10 +269,13 @@ describe('extraction jobs', () => {
const job = createJob([{ text: 'test' }], 'req-123', webhookConfig);
// Wait for background processing
await vi.waitFor(() => {
const j = getJob(job.id);
expect(j!.status).toBe('completed');
}, { timeout: 2000 });
await vi.waitFor(
() => {
const j = getJob(job.id);
expect(j!.status).toBe('completed');
},
{ timeout: 2000 }
);
// Job should still be completed even if webhook failed
const completed = getJob(job.id)!;
@ -279,10 +291,13 @@ describe('extraction jobs', () => {
const job = createJob([{ text: 'test' }]);
// Wait for background processing
await vi.waitFor(() => {
const j = getJob(job.id);
expect(j!.status).toBe('completed');
}, { timeout: 2000 });
await vi.waitFor(
() => {
const j = getJob(job.id);
expect(j!.status).toBe('completed');
},
{ timeout: 2000 }
);
expect(mockTriggerJobWebhook).not.toHaveBeenCalled();
});

View File

@ -9,17 +9,14 @@
* - Configurable via env vars per tier
*/
import { z } from 'zod';
// ── Configuration ───────────────────────────────────────────────
const ProductRateLimitSchema = z.object({
productId: z.string(),
windowMs: z.number().default(60_000), // 1 minute window
maxRequests: z.number().default(100), // requests per window
});
export type ProductRateLimitConfig = z.infer<typeof ProductRateLimitSchema>;
/**
* Configuration for product rate limiting.
*
* All three fields are required; the interface itself carries no defaults.
*/
export interface ProductRateLimitConfig {
/** Product this limit applies to (presumably the product's unique ID — confirm against callers). */
productId: string;
/** Length of the rate-limit window, in milliseconds. */
windowMs: number;
/** Maximum number of requests allowed per window. */
maxRequests: number;
}
// Default limits (can be overridden via env)
const DEFAULT_PRODUCT_RPM = parseInt(process.env.PRODUCT_RATE_LIMIT_RPM || '100', 10);
@ -195,10 +192,40 @@ export function cleanupRateLimitStore(): number {
return cleaned;
}
// Auto-cleanup every 5 minutes
setInterval(() => {
const cleaned = cleanupRateLimitStore();
if (cleaned > 0 && process.env.NODE_ENV === 'development') {
console.log(`[product-rate-limit] Cleaned up ${cleaned} expired entries`);
// Cleanup interval handle (module-private; managed via startRateLimitCleanup/stopRateLimitCleanup)
let cleanupInterval: ReturnType<typeof setInterval> | null = null;
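/**
* Start automatic cleanup of expired rate limit entries (runs every 5 minutes).
* Safe to call repeatedly: any existing timer is cleared before a new one is scheduled.
* Invoked automatically when this module loads (except when NODE_ENV is 'test');
* may also be called explicitly, e.g. during server startup.
*/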
export function startRateLimitCleanup(): void {
if (cleanupInterval) {
clearInterval(cleanupInterval);
}
}, 5 * 60 * 1000);
cleanupInterval = setInterval(
() => {
const cleaned = cleanupRateLimitStore();
if (cleaned > 0 && process.env.NODE_ENV === 'development') {
// eslint-disable-next-line no-console
console.log(`[product-rate-limit] Cleaned up ${cleaned} expired entries`);
}
},
5 * 60 * 1000
);
}
/**
 * Halt the periodic rate-limit cleanup timer, if one is running.
 *
 * Intended for graceful shutdown and for tests. Does nothing when no timer
 * is active, so it can be called more than once.
 */
export function stopRateLimitCleanup(): void {
  // Nothing scheduled — nothing to tear down.
  if (!cleanupInterval) {
    return;
  }

  clearInterval(cleanupInterval);
  // Reset the handle so later calls see that no timer is active.
  cleanupInterval = null;
}
// Auto-start the cleanup timer when this module is loaded, in every
// environment except tests: the check is NODE_ENV !== 'test', so it also
// starts when NODE_ENV is unset or has any other value. Under test the timer
// is not scheduled here; startRateLimitCleanup() must be called explicitly.
if (process.env.NODE_ENV !== 'test') {
startRateLimitCleanup();
}

View File

@ -365,6 +365,7 @@ export async function extractRoutes(app: FastifyInstance) {
limit: productLimit.limit,
});
}
reply.header('X-RateLimit-Remaining', String(productLimit.remaining));
}
const sidecarRequests = inputs.map(input => ({
@ -386,10 +387,17 @@ export async function extractRoutes(app: FastifyInstance) {
const body = req.body as Record<string, unknown>;
let webhookConfig: WebhookConfig | undefined;
if (body.webhookUrl && typeof body.webhookUrl === 'string') {
// Validate webhook URL format
try {
new URL(body.webhookUrl);
} catch {
throw new BadRequestError('Invalid webhookUrl format');
}
webhookConfig = {
url: body.webhookUrl,
secret: typeof body.webhookSecret === 'string' ? body.webhookSecret : 'default-secret',
retryAttempts: typeof body.webhookRetryAttempts === 'number' ? body.webhookRetryAttempts : 3,
retryAttempts:
typeof body.webhookRetryAttempts === 'number' ? body.webhookRetryAttempts : 3,
};
}