From 14346fbd5d705433b021bfc08f50b366c8865705 Mon Sep 17 00:00:00 2001 From: root Date: Sun, 15 Mar 2026 09:37:03 +0000 Subject: [PATCH] feat(ratelimit): back api throttles with datastore --- .../src/lib/api-key-auth.test.ts | 22 +- .../platform-service/src/lib/api-key-auth.ts | 13 +- .../src/modules/exports/routes.ts | 6 +- .../src/modules/ip-rules/routes.ts | 8 +- .../src/modules/jobs/routes.ts | 18 +- .../src/modules/maintenance/routes.ts | 8 +- .../src/modules/ratelimit/ratelimit.test.ts | 124 +++++----- .../src/modules/ratelimit/routes.ts | 4 +- .../src/modules/ratelimit/store.ts | 213 ++++++++++++------ .../src/modules/runs/routes.ts | 24 +- .../src/modules/webhooks/routes.ts | 26 +-- 11 files changed, 266 insertions(+), 200 deletions(-) diff --git a/services/platform-service/src/lib/api-key-auth.test.ts b/services/platform-service/src/lib/api-key-auth.test.ts index cb71c628..8920e662 100644 --- a/services/platform-service/src/lib/api-key-auth.test.ts +++ b/services/platform-service/src/lib/api-key-auth.test.ts @@ -40,13 +40,13 @@ async function seedApiKeyToken( } describe('api key auth', () => { - beforeEach(() => { + beforeEach(async () => { provider = new MemoryDatastoreProvider(); setProvider(provider); delete process.env.API_KEY_RATE_LIMIT_CONFIG_JSON; delete process.env.API_KEY_PRODUCT_RATE_LIMIT_CONFIG_JSON; delete process.env.PLATFORM_RUNTIME_ENV; - clearRateLimits(); + await clearRateLimits(); }); it('attaches apiKeyAuth from x-api-key', async () => { @@ -55,7 +55,7 @@ describe('api key auth', () => { await registerOptionalApiKeyContext(app); app.get('/probe', async req => { - const actor = requireJwtOrApiKey(req, { + const actor = await requireJwtOrApiKey(req, { apiKeyScopes: ['jobs:read'], rateLimitKey: 'jobs:read', }); @@ -90,7 +90,7 @@ describe('api key auth', () => { await registerOptionalApiKeyContext(app); app.get('/probe', async req => { - return requireJwtOrApiKey(req, { + return await requireJwtOrApiKey(req, { apiKeyScopes: ['jobs:read'], }); }); @@ -113,7 +113,7 @@ describe('api key auth', () => { await registerOptionalApiKeyContext(app); app.get('/probe', async req => { - return requireJwtOrApiKey(req, { + return await requireJwtOrApiKey(req, { apiKeyScopes: ['jobs:read'], }); }); @@ -135,7 +135,7 @@ describe('api key auth', () => { await registerOptionalApiKeyContext(app); app.get('/probe', async req => { - return requireJwtOrApiKey(req, { + return await requireJwtOrApiKey(req, { apiKeyScopes: ['jobs:write'], }); }); @@ -164,7 +164,7 @@ describe('api key auth', () => { await registerOptionalApiKeyContext(app); app.get('/probe', async req => { - return requireJwtOrApiKey(req, { + return await requireJwtOrApiKey(req, { apiKeyScopes: ['maintenance:read'], apiKeyTokenTypes: ['service_api'], }); @@ -188,7 +188,7 @@ describe('api key auth', () => { await registerOptionalApiKeyContext(app); app.get('/probe', async req => { - return requireJwtOrApiKey(req, { + return await requireJwtOrApiKey(req, { apiKeyScopes: ['jobs:read'], }); }); @@ -219,7 +219,7 @@ describe('api key auth', () => { await registerOptionalApiKeyContext(app); app.post('/probe', async req => { - return requireJwtOrApiKey(req, { + return await requireJwtOrApiKey(req, { apiKeyScopes: ['jobs:write'], rateLimitKey: 'jobs:write', }); @@ -271,7 +271,7 @@ describe('api key auth', () => { await registerOptionalApiKeyContext(app); app.post('/probe', async req => { - return requireJwtOrApiKey(req, { + return await requireJwtOrApiKey(req, { apiKeyScopes: ['jobs:write'], rateLimitKey: 'jobs:write', }); @@ -304,7 +304,7 @@ describe('api key auth', () => { }); app.get('/probe', async req => { - return requireJwtOrApiKey(req, { + return await requireJwtOrApiKey(req, { jwtRoles: ['admin'], }); }); diff --git a/services/platform-service/src/lib/api-key-auth.ts b/services/platform-service/src/lib/api-key-auth.ts index 829245d9..2fc04813 100644 --- a/services/platform-service/src/lib/api-key-auth.ts +++ b/services/platform-service/src/lib/api-key-auth.ts @@ -1,3 +1,4 @@ +import crypto from 'node:crypto'; import bcrypt from 'bcryptjs'; import type { FastifyInstance, FastifyRequest } from 'fastify'; import { ForbiddenError, TooManyRequestsError, UnauthorizedError } from './errors.js'; @@ -266,7 +267,7 @@ function ensureApiKeyScopes( return apiKey; } -function enforceApiKeyRateLimit(req: FastifyRequest, rateLimitKey?: string): void { +async function enforceApiKeyRateLimit(req: FastifyRequest, rateLimitKey?: string): Promise { if (!rateLimitKey || !req.apiKeyAuth) return; const apiKeyRateLimitConfig = loadApiKeyRateLimitConfig(); @@ -274,7 +275,7 @@ function enforceApiKeyRateLimit(req: FastifyRequest, rateLimitKey?: string): voi if (!keyRule) return; const compositeKey = `api-key:${req.apiKeyAuth.productId}:${req.apiKeyAuth.tokenId}:${rateLimitKey}`; - const keyResult = rateLimitStore.checkAndRecord(compositeKey, keyRule); + const keyResult = await rateLimitStore.checkAndRecord(compositeKey, keyRule); if (!keyResult.allowed) { logApiKeyWarning(req, 'token_rate_limited', { rateLimitKey, @@ -289,7 +290,7 @@ function enforceApiKeyRateLimit(req: FastifyRequest, rateLimitKey?: string): voi if (!productRule) return; const productCompositeKey = `api-key-product:${req.apiKeyAuth.productId}:${rateLimitKey}`; - const productResult = rateLimitStore.checkAndRecord(productCompositeKey, productRule); + const productResult = await rateLimitStore.checkAndRecord(productCompositeKey, productRule); if (!productResult.allowed) { logApiKeyWarning(req, 'product_rate_limited', { rateLimitKey, @@ -301,10 +302,10 @@ function enforceApiKeyRateLimit(req: FastifyRequest, rateLimitKey?: string): voi } } -export function requireJwtOrApiKey( +export async function requireJwtOrApiKey( req: FastifyRequest, { allowJwt = false, jwtRoles, apiKeyScopes, apiKeyTokenTypes, rateLimitKey }: AccessOptions = {} -): AccessActor { +): Promise { const jwt = req.jwtPayload; if (jwt?.sub) { if (jwtRoles && jwtRoles.length > 0) { @@ -323,7 +324,7 @@ export function requireJwtOrApiKey( } const apiKey = ensureApiKeyScopes(req, apiKeyScopes, apiKeyTokenTypes); - enforceApiKeyRateLimit(req, rateLimitKey); + await enforceApiKeyRateLimit(req, rateLimitKey); return { actorId: apiKey.userId, diff --git a/services/platform-service/src/modules/exports/routes.ts b/services/platform-service/src/modules/exports/routes.ts index 9e7dd11b..ca63509a 100644 --- a/services/platform-service/src/modules/exports/routes.ts +++ b/services/platform-service/src/modules/exports/routes.ts @@ -26,7 +26,7 @@ export async function exportRoutes(app: FastifyInstance) { // Start a new export job app.post('/exports', async (req, reply) => { - const access = requireExportWrite(req); + const access = await requireExportWrite(req); const parsed = CreateExportSchema.safeParse(req.body); if (!parsed.success) { throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; ')); @@ -61,7 +61,7 @@ export async function exportRoutes(app: FastifyInstance) { // List export jobs app.get('/exports', async req => { - const access = requireExportRead(req); + const access = await requireExportRead(req); const query = req.query as Record; const limit = query.limit ? parseInt(query.limit, 10) : 20; const jobs = await repo.listExportJobs(access.productId, Math.min(limit, 100)); @@ -70,7 +70,7 @@ export async function exportRoutes(app: FastifyInstance) { // Get a specific export job app.get('/exports/:id', async req => { - const access = requireExportRead(req); + const access = await requireExportRead(req); const { id } = req.params as { id: string }; const job = await repo.getExportJob(id, access.productId); if (!job) throw new BadRequestError('Export job not found'); diff --git a/services/platform-service/src/modules/ip-rules/routes.ts b/services/platform-service/src/modules/ip-rules/routes.ts index e0e368f8..695ab0b6 100644 --- a/services/platform-service/src/modules/ip-rules/routes.ts +++ b/services/platform-service/src/modules/ip-rules/routes.ts @@ -26,13 +26,13 @@ export async function ipRuleRoutes(app: FastifyInstance) { // List all IP rules app.get('/ratelimit/ip-rules', async req => { - const access = requireIpRulesRead(req); + const access = await requireIpRulesRead(req); return repo.listRules(access.productId); }); // Create an IP rule app.post('/ratelimit/ip-rules', async (req, reply) => { - const access = requireIpRulesWrite(req); + const access = await requireIpRulesWrite(req); const parsed = CreateIPRuleSchema.safeParse(req.body); if (!parsed.success) { throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; ')); @@ -60,7 +60,7 @@ export async function ipRuleRoutes(app: FastifyInstance) { // Delete an IP rule app.delete('/ratelimit/ip-rules/:id', async req => { - const access = requireIpRulesWrite(req); + const access = await requireIpRulesWrite(req); const { id } = req.params as { id: string }; const deleted = await repo.deleteRule(id, access.productId); if (!deleted) throw new BadRequestError('IP rule not found'); @@ -69,7 +69,7 @@ export async function ipRuleRoutes(app: FastifyInstance) { // Check if an IP is allowed/denied (utility endpoint) app.get('/ratelimit/check-ip/:ip', async req => { - const access = requireIpRulesRead(req); + const access = await requireIpRulesRead(req); const { ip } = req.params as { ip: string }; const result = await repo.checkIP(ip, access.productId); return { ip, action: result, hasRule: result !== null }; diff --git a/services/platform-service/src/modules/jobs/routes.ts b/services/platform-service/src/modules/jobs/routes.ts index 033617f1..754e1188 100644 --- a/services/platform-service/src/modules/jobs/routes.ts +++ b/services/platform-service/src/modules/jobs/routes.ts @@ -7,8 +7,8 @@ import { getJobHandler } from './registry.js'; import { executeJob } from './runner.js'; export async function jobRoutes(app: FastifyInstance) { - function requireJobsRead(req: import('fastify').FastifyRequest): string { - const access = requireJwtOrApiKey(req, { + async function requireJobsRead(req: import('fastify').FastifyRequest): Promise { + const access = await requireJwtOrApiKey(req, { allowJwt: true, apiKeyScopes: ['jobs:read'], apiKeyTokenTypes: ['service_api'], @@ -17,8 +17,8 @@ export async function jobRoutes(app: FastifyInstance) { return access.productId; } - function requireJobsWrite(req: import('fastify').FastifyRequest): string { - const access = requireJwtOrApiKey(req, { + async function requireJobsWrite(req: import('fastify').FastifyRequest): Promise { + const access = await requireJwtOrApiKey(req, { jwtRoles: ['super_admin', 'admin'], apiKeyScopes: ['jobs:write'], apiKeyTokenTypes: ['service_api'], @@ -29,20 +29,20 @@ export async function jobRoutes(app: FastifyInstance) { // List all job definitions app.get('/jobs', async req => { - const productId = requireJobsRead(req); + const productId = await requireJobsRead(req); return repo.listJobDefinitions(productId); }); // Get a specific job definition app.get('/jobs/:id', async req => { - const productId = requireJobsRead(req); + const productId = await requireJobsRead(req); const { id } = req.params as { id: string }; return repo.getJobDefinition(id, productId); }); // Update job (enable/disable, change cron, etc.) app.put('/jobs/:id', async req => { - const productId = requireJobsWrite(req); + const productId = await requireJobsWrite(req); const { id } = req.params as { id: string }; const parsed = UpdateJobSchema.safeParse(req.body); if (!parsed.success) { @@ -53,7 +53,7 @@ export async function jobRoutes(app: FastifyInstance) { // Manually trigger a job app.post('/jobs/trigger', async req => { - const productId = requireJobsWrite(req); + const productId = await requireJobsWrite(req); const parsed = TriggerJobSchema.safeParse(req.body); if (!parsed.success) { throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; ')); @@ -72,7 +72,7 @@ export async function jobRoutes(app: FastifyInstance) { // List recent runs for a job app.get('/jobs/:name/runs', async req => { - const productId = requireJobsRead(req); + const productId = await requireJobsRead(req); const { name } = req.params as { name: string }; const limit = parseInt((req.query as Record).limit || '20', 10); return repo.listJobRuns(productId, name, Math.min(limit, 100)); diff --git a/services/platform-service/src/modules/maintenance/routes.ts b/services/platform-service/src/modules/maintenance/routes.ts index b47ac747..a3e393ea 100644 --- a/services/platform-service/src/modules/maintenance/routes.ts +++ b/services/platform-service/src/modules/maintenance/routes.ts @@ -48,13 +48,13 @@ export async function maintenanceRoutes(app: FastifyInstance) { // Get full maintenance config (admin sees bypass rules too) app.get('/settings/maintenance/full', async req => { - const access = requireMaintenanceRead(req); + const access = await requireMaintenanceRead(req); return repo.getMaintenanceConfig(access.productId); }); // Update maintenance mode app.put('/settings/maintenance', async req => { - const access = requireMaintenanceWrite(req); + const access = await requireMaintenanceWrite(req); const parsed = UpdateMaintenanceSchema.safeParse(req.body); if (!parsed.success) { throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; ')); @@ -76,7 +76,7 @@ export async function maintenanceRoutes(app: FastifyInstance) { // Create a scheduled maintenance window app.post('/settings/maintenance/schedule', async (req, reply) => { - const access = requireMaintenanceWrite(req); + const access = await requireMaintenanceWrite(req); const parsed = CreateMaintenanceWindowSchema.safeParse(req.body); if (!parsed.success) { throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; ')); @@ -104,7 +104,7 @@ export async function maintenanceRoutes(app: FastifyInstance) { // Delete a scheduled maintenance window app.delete('/settings/maintenance/schedule/:id', async req => { - const access = requireMaintenanceWrite(req); + const access = await requireMaintenanceWrite(req); const { id } = req.params as { id: string }; const deleted = await repo.deleteWindow(id, access.productId); if (!deleted) throw new BadRequestError('Maintenance window not found'); diff --git a/services/platform-service/src/modules/ratelimit/ratelimit.test.ts b/services/platform-service/src/modules/ratelimit/ratelimit.test.ts index 743c2681..09610f8e 100644 --- a/services/platform-service/src/modules/ratelimit/ratelimit.test.ts +++ b/services/platform-service/src/modules/ratelimit/ratelimit.test.ts @@ -10,165 +10,165 @@ import type { RateLimitRule } from './types.js'; // ── Store: sliding window counter ────────────────────────────── describe('ratelimit store', () => { - beforeEach(() => { - clearAll(); + beforeEach(async () => { + await clearAll(); }); describe('checkAndRecord', () => { const rule: RateLimitRule = { maxRequests: 3, windowSeconds: 60 }; - it('allows first request and returns full remaining', () => { - const result = checkAndRecord('user:123', rule); + it('allows first request and returns full remaining', async () => { + const result = await checkAndRecord('user:123', rule); expect(result.allowed).toBe(true); expect(result.remaining).toBe(2); expect(result.limit).toBe(3); expect(result.resetAt).toBeDefined(); }); - it('decrements remaining on each call', () => { - const r1 = checkAndRecord('user:a', rule); + it('decrements remaining on each call', async () => { + const r1 = await checkAndRecord('user:a', rule); expect(r1.remaining).toBe(2); - const r2 = checkAndRecord('user:a', rule); + const r2 = await checkAndRecord('user:a', rule); expect(r2.remaining).toBe(1); - const r3 = checkAndRecord('user:a', rule); + const r3 = await checkAndRecord('user:a', rule); expect(r3.remaining).toBe(0); }); - it('blocks after exceeding maxRequests', () => { - checkAndRecord('user:b', rule); - checkAndRecord('user:b', rule); - checkAndRecord('user:b', rule); + it('blocks after exceeding maxRequests', async () => { + await checkAndRecord('user:b', rule); + await checkAndRecord('user:b', rule); + await checkAndRecord('user:b', rule); - const result = checkAndRecord('user:b', rule); + const result = await checkAndRecord('user:b', rule); expect(result.allowed).toBe(false); expect(result.remaining).toBe(0); expect(result.retryAfterMs).toBeGreaterThan(0); }); - it('uses separate counters for different keys', () => { - checkAndRecord('user:x', rule); - checkAndRecord('user:x', rule); - checkAndRecord('user:x', rule); + it('uses separate counters for different keys', async () => { + await checkAndRecord('user:x', rule); + await checkAndRecord('user:x', rule); + await checkAndRecord('user:x', rule); - const resultX = checkAndRecord('user:x', rule); + const resultX = await checkAndRecord('user:x', rule); expect(resultX.allowed).toBe(false); - const resultY = checkAndRecord('user:y', rule); + const resultY = await checkAndRecord('user:y', rule); expect(resultY.allowed).toBe(true); expect(resultY.remaining).toBe(2); }); - it('returns resetAt as ISO string', () => { - const result = checkAndRecord('user:time', rule); + it('returns resetAt as ISO string', async () => { + const result = await checkAndRecord('user:time', rule); expect(() => new Date(result.resetAt)).not.toThrow(); expect(new Date(result.resetAt).getTime()).toBeGreaterThan(Date.now() - 1000); }); - it('handles single-request limit', () => { + it('handles single-request limit', async () => { const strictRule: RateLimitRule = { maxRequests: 1, windowSeconds: 10 }; - const r1 = checkAndRecord('user:strict', strictRule); + const r1 = await checkAndRecord('user:strict', strictRule); expect(r1.allowed).toBe(true); expect(r1.remaining).toBe(0); - const r2 = checkAndRecord('user:strict', strictRule); + const r2 = await checkAndRecord('user:strict', strictRule); expect(r2.allowed).toBe(false); }); - it('handles large maxRequests without issues', () => { + it('handles large maxRequests without issues', async () => { const bigRule: RateLimitRule = { maxRequests: 10000, windowSeconds: 60 }; - const result = checkAndRecord('user:big', bigRule); + const result = await checkAndRecord('user:big', bigRule); expect(result.allowed).toBe(true); expect(result.remaining).toBe(9999); }); }); describe('reset', () => { - it('clears rate limit for a specific key', () => { + it('clears rate limit for a specific key', async () => { const rule: RateLimitRule = { maxRequests: 2, windowSeconds: 60 }; - checkAndRecord('user:reset', rule); - checkAndRecord('user:reset', rule); + await checkAndRecord('user:reset', rule); + await checkAndRecord('user:reset', rule); - const blocked = checkAndRecord('user:reset', rule); + const blocked = await checkAndRecord('user:reset', rule); expect(blocked.allowed).toBe(false); - reset('user:reset'); + await reset('user:reset'); - const afterReset = checkAndRecord('user:reset', rule); + const afterReset = await checkAndRecord('user:reset', rule); expect(afterReset.allowed).toBe(true); expect(afterReset.remaining).toBe(1); }); - it('does not affect other keys', () => { + it('does not affect other keys', async () => { const rule: RateLimitRule = { maxRequests: 1, windowSeconds: 60 }; - checkAndRecord('user:keep', rule); - checkAndRecord('user:clear', rule); + await checkAndRecord('user:keep', rule); + await checkAndRecord('user:clear', rule); - reset('user:clear'); + await reset('user:clear'); - const keep = checkAndRecord('user:keep', rule); + const keep = await checkAndRecord('user:keep', rule); expect(keep.allowed).toBe(false); - const cleared = checkAndRecord('user:clear', rule); + const cleared = await checkAndRecord('user:clear', rule); expect(cleared.allowed).toBe(true); }); - it('is safe to reset non-existent key', () => { - expect(() => reset('nonexistent')).not.toThrow(); + it('is safe to reset non-existent key', async () => { + await expect(reset('nonexistent')).resolves.toBeUndefined(); }); }); describe('peek', () => { const rule: RateLimitRule = { maxRequests: 3, windowSeconds: 60 }; - it('returns full remaining for unknown key', () => { - const result = peek('new-key', rule); + it('returns full remaining for unknown key', async () => { + const result = await peek('new-key', rule); expect(result.allowed).toBe(true); expect(result.remaining).toBe(3); expect(result.limit).toBe(3); }); - it('does not consume a request', () => { - peek('user:peek', rule); - peek('user:peek', rule); - peek('user:peek', rule); + it('does not consume a request', async () => { + await peek('user:peek', rule); + await peek('user:peek', rule); + await peek('user:peek', rule); - const result = checkAndRecord('user:peek', rule); + const result = await checkAndRecord('user:peek', rule); expect(result.allowed).toBe(true); expect(result.remaining).toBe(2); }); - it('reflects current usage', () => { - checkAndRecord('user:usage', rule); - checkAndRecord('user:usage', rule); + it('reflects current usage', async () => { + await checkAndRecord('user:usage', rule); + await checkAndRecord('user:usage', rule); - const result = peek('user:usage', rule); + const result = await peek('user:usage', rule); expect(result.remaining).toBe(1); expect(result.allowed).toBe(true); }); - it('shows blocked status when limit exceeded', () => { - checkAndRecord('user:full', rule); - checkAndRecord('user:full', rule); - checkAndRecord('user:full', rule); + it('shows blocked status when limit exceeded', async () => { + await checkAndRecord('user:full', rule); + await checkAndRecord('user:full', rule); + await checkAndRecord('user:full', rule); - const result = peek('user:full', rule); + const result = await peek('user:full', rule); expect(result.allowed).toBe(false); expect(result.remaining).toBe(0); }); }); describe('clearAll', () => { - it('resets all keys', () => { + it('resets all keys', async () => { const rule: RateLimitRule = { maxRequests: 1, windowSeconds: 60 }; - checkAndRecord('a', rule); - checkAndRecord('b', rule); + await checkAndRecord('a', rule); + await checkAndRecord('b', rule); - clearAll(); + await clearAll(); - expect(checkAndRecord('a', rule).allowed).toBe(true); - expect(checkAndRecord('b', rule).allowed).toBe(true); + expect((await checkAndRecord('a', rule)).allowed).toBe(true); + expect((await checkAndRecord('b', rule)).allowed).toBe(true); }); }); }); diff --git a/services/platform-service/src/modules/ratelimit/routes.ts b/services/platform-service/src/modules/ratelimit/routes.ts index 98b43c9a..8e8c55bd 100644 --- a/services/platform-service/src/modules/ratelimit/routes.ts +++ b/services/platform-service/src/modules/ratelimit/routes.ts @@ -80,7 +80,7 @@ export async function rateLimitRoutes(app: FastifyInstance) { const { key, productId, routePrefix } = parsed.data; const rule = findRule(productId, routePrefix); const compositeKey = `${productId}:${key}${routePrefix ? `:${routePrefix}` : ''}`; - const result = store.checkAndRecord(compositeKey, rule); + const result = await store.checkAndRecord(compositeKey, rule); if (!result.allowed) { reply.code(429).header('Retry-After', String(Math.ceil((result.retryAfterMs ?? 0) / 1000))); @@ -93,7 +93,7 @@ export async function rateLimitRoutes(app: FastifyInstance) { const { key, productId } = req.body as { key?: string; productId?: string }; const pid = productId || getRequestProductId(req); if (!key) throw new BadRequestError('key is required'); - store.reset(`${pid}:${key}`); + await store.reset(`${pid}:${key}`); return { reset: true }; }); diff --git a/services/platform-service/src/modules/ratelimit/store.ts b/services/platform-service/src/modules/ratelimit/store.ts index 76c78ae4..d01a3ece 100644 --- a/services/platform-service/src/modules/ratelimit/store.ts +++ b/services/platform-service/src/modules/ratelimit/store.ts @@ -1,101 +1,166 @@ /** - * Rate limit store — sliding window counter. + * Rate limit store — sliding window counter with datastore backing. * - * Default: in-memory Map (single-instance). - * When REDIS_URL is set: uses Redis sorted sets for horizontal scaling. - * - * Each key tracks request timestamps within the window. Old entries are - * pruned on every check. + * Default: datastore-backed for shared enforcement across service instances. + * Fallback: in-memory when RATE_LIMIT_STORE_MODE=memory. */ +import crypto from 'node:crypto'; +import { getCollection } from '../../lib/datastore.js'; import type { RateLimitEntry, RateLimitResult, RateLimitRule } from './types.js'; -// ── In-Memory Store ────────────────────────────────────────── +interface RateLimitDoc { + id: string; + productId: string; + key: string; + timestamps: number[]; + updatedAt: string; + ttl?: number; +} -const store = new Map(); +const memoryStore = new Map(); +let storeNamespace = 'default'; -/** - * Check (and record) a request against the sliding window for the given key. - * Returns whether the request is allowed and remaining quota. - */ -export function checkAndRecord(key: string, rule: RateLimitRule): RateLimitResult { - const now = Date.now(); +function shouldUseMemoryStore(): boolean { + return process.env.RATE_LIMIT_STORE_MODE === 'memory'; +} + +function rateLimitCollection() { + return getCollection('rate_limit_entries', '/id'); +} + +function normalizeKey(key: string): string { + return `${storeNamespace}:${key}`; +} + +function docIdForKey(key: string): string { + return `rl_${crypto.createHash('sha256').update(normalizeKey(key)).digest('hex')}`; +} + +function toResult( + timestamps: number[], + rule: RateLimitRule, + now: number, + blocked: boolean +): RateLimitResult { const windowMs = rule.windowSeconds * 1000; - const windowStart = now - windowMs; - // Get or create entry - let entry = store.get(key); - if (!entry) { - entry = { timestamps: [] }; - store.set(key, entry); - } - - // Prune expired timestamps - entry.timestamps = entry.timestamps.filter(t => t > windowStart); - - // Check limit - if (entry.timestamps.length >= rule.maxRequests) { - const oldestInWindow = entry.timestamps[0]; - const resetAt = new Date(oldestInWindow + windowMs).toISOString(); + if (blocked) { + const oldestInWindow = timestamps[0]; const retryAfterMs = oldestInWindow + windowMs - now; return { allowed: false, limit: rule.maxRequests, remaining: 0, - resetAt, + resetAt: new Date(oldestInWindow + windowMs).toISOString(), retryAfterMs: Math.max(0, retryAfterMs), }; } - // Record this request - entry.timestamps.push(now); - - const resetAt = new Date(now + windowMs).toISOString(); return { allowed: true, limit: rule.maxRequests, - remaining: rule.maxRequests - entry.timestamps.length, - resetAt, - }; -} - -/** - * Reset the rate limit for a specific key (e.g., after a successful auth). - */ -export function reset(key: string): void { - store.delete(key); -} - -/** - * Get current state without recording a request (read-only check). - */ -export function peek(key: string, rule: RateLimitRule): RateLimitResult { - const now = Date.now(); - const windowMs = rule.windowSeconds * 1000; - const windowStart = now - windowMs; - - const entry = store.get(key); - if (!entry) { - return { - allowed: true, - limit: rule.maxRequests, - remaining: rule.maxRequests, - resetAt: new Date(now + windowMs).toISOString(), - }; - } - - const active = entry.timestamps.filter(t => t > windowStart); - const remaining = Math.max(0, rule.maxRequests - active.length); - - return { - allowed: remaining > 0, - limit: rule.maxRequests, - remaining, + remaining: Math.max(0, rule.maxRequests - timestamps.length), resetAt: new Date(now + windowMs).toISOString(), }; } -/** Clear all entries (for testing). */ -export function clearAll(): void { - store.clear(); +async function checkAndRecordDatastore(key: string, rule: RateLimitRule): Promise { + const now = Date.now(); + const windowMs = rule.windowSeconds * 1000; + const windowStart = now - windowMs; + const docId = docIdForKey(key); + const existing = await rateLimitCollection().findById(docId, docId); + const timestamps = (existing?.timestamps ?? []).filter(timestamp => timestamp > windowStart); + + if (timestamps.length >= rule.maxRequests) { + return toResult(timestamps, rule, now, true); + } + + const nextTimestamps = [...timestamps, now]; + const doc: RateLimitDoc = { + id: docId, + productId: '__platform__', + key: normalizeKey(key), + timestamps: nextTimestamps, + updatedAt: new Date(now).toISOString(), + ttl: Math.max(rule.windowSeconds * 2, 60), + }; + + await rateLimitCollection().upsert(doc); + return toResult(nextTimestamps, rule, now, false); +} + +function checkAndRecordMemory(key: string, rule: RateLimitRule): RateLimitResult { + const now = Date.now(); + const windowMs = rule.windowSeconds * 1000; + const windowStart = now - windowMs; + const normalizedKey = normalizeKey(key); + + let entry = memoryStore.get(normalizedKey); + if (!entry) { + entry = { timestamps: [] }; + memoryStore.set(normalizedKey, entry); + } + + entry.timestamps = entry.timestamps.filter(timestamp => timestamp > windowStart); + if (entry.timestamps.length >= rule.maxRequests) { + return toResult(entry.timestamps, rule, now, true); + } + + entry.timestamps.push(now); + return toResult(entry.timestamps, rule, now, false); +} + +async function peekDatastore(key: string, rule: RateLimitRule): Promise { + const now = Date.now(); + const windowMs = rule.windowSeconds * 1000; + const windowStart = now - windowMs; + const docId = docIdForKey(key); + const existing = await rateLimitCollection().findById(docId, docId); + const timestamps = (existing?.timestamps ?? []).filter(timestamp => timestamp > windowStart); + return toResult(timestamps, rule, now, timestamps.length >= rule.maxRequests); +} + +function peekMemory(key: string, rule: RateLimitRule): RateLimitResult { + const now = Date.now(); + const windowMs = rule.windowSeconds * 1000; + const windowStart = now - windowMs; + const entry = memoryStore.get(normalizeKey(key)); + const timestamps = (entry?.timestamps ?? []).filter(timestamp => timestamp > windowStart); + return toResult(timestamps, rule, now, timestamps.length >= rule.maxRequests); +} + +export async function checkAndRecord(key: string, rule: RateLimitRule): Promise { + if (shouldUseMemoryStore()) { + return checkAndRecordMemory(key, rule); + } + + return checkAndRecordDatastore(key, rule); +} + +export async function reset(key: string): Promise { + const normalizedKey = normalizeKey(key); + if (shouldUseMemoryStore()) { + memoryStore.delete(normalizedKey); + return; + } + + const docId = docIdForKey(key); + await rateLimitCollection() + .delete(docId, docId) + .catch(() => {}); +} + +export async function peek(key: string, rule: RateLimitRule): Promise { + if (shouldUseMemoryStore()) { + return peekMemory(key, rule); + } + + return peekDatastore(key, rule); +} + +export async function clearAll(): Promise { + memoryStore.clear(); + storeNamespace = crypto.randomUUID(); } diff --git a/services/platform-service/src/modules/runs/routes.ts b/services/platform-service/src/modules/runs/routes.ts index 81fe3182..0ad871be 100644 --- a/services/platform-service/src/modules/runs/routes.ts +++ b/services/platform-service/src/modules/runs/routes.ts @@ -12,8 +12,8 @@ import * as repo from './repository.js'; import * as tracker from './tracker.js'; export async function runRoutes(app: FastifyInstance) { - function requireRunsRead(req: import('fastify').FastifyRequest): string { - const access = requireJwtOrApiKey(req, { + async function requireRunsRead(req: import('fastify').FastifyRequest): Promise { + const access = await requireJwtOrApiKey(req, { jwtRoles: ['super_admin', 'admin'], apiKeyScopes: ['jobs:read'], apiKeyTokenTypes: ['service_api'], @@ -22,11 +22,11 @@ export async function runRoutes(app: FastifyInstance) { return access.productId; } - function requireRunsWrite(req: import('fastify').FastifyRequest): { + async function requireRunsWrite(req: import('fastify').FastifyRequest): Promise<{ productId: string; actorId: string; - } { - const access = requireJwtOrApiKey(req, { + }> { + const access = await requireJwtOrApiKey(req, { jwtRoles: ['super_admin', 'admin'], apiKeyTokenTypes: ['service_api'], rateLimitKey: 'jobs:write', @@ -35,7 +35,7 @@ export async function runRoutes(app: FastifyInstance) { } app.get('/runs', async req => { - const productId = requireRunsRead(req); + const productId = await requireRunsRead(req); const parsed = ListRunsQuerySchema.safeParse(req.query); if (!parsed.success) { throw new BadRequestError(parsed.error.issues.map(issue => issue.message).join('; ')); @@ -45,19 +45,19 @@ export async function runRoutes(app: FastifyInstance) { }); app.get('/runs/:id', async req => { - const productId = requireRunsRead(req); + const productId = await requireRunsRead(req); const { id } = req.params as { id: string }; return repo.getRun(id, productId); }); app.get('/runs/:id/steps', async req => { - const productId = requireRunsRead(req); + const productId = await requireRunsRead(req); const { id } = req.params as { id: string }; return repo.listRunSteps(productId, id); }); app.post('/runs', async req => { - const { productId, actorId } = requireRunsWrite(req); + const { productId, actorId } = await requireRunsWrite(req); const parsed = CreateRunSchema.safeParse(req.body); if (!parsed.success) { throw new BadRequestError(parsed.error.issues.map(issue => issue.message).join('; ')); @@ -71,7 +71,7 @@ export async function runRoutes(app: FastifyInstance) { }); app.patch('/runs/:id', async req => { - const { productId } = requireRunsWrite(req); + const { productId } = await requireRunsWrite(req); const { id } = req.params as { id: string }; const parsed = UpdateRunSchema.safeParse(req.body); if (!parsed.success) { @@ -95,7 +95,7 @@ export async function runRoutes(app: FastifyInstance) { }); app.post('/runs/:id/steps', async req => { - const { productId } = requireRunsWrite(req); + const { productId } = await requireRunsWrite(req); const { id } = req.params as { id: string }; const parsed = CreateRunStepSchema.safeParse(req.body); if (!parsed.success) { @@ -110,7 +110,7 @@ export async function runRoutes(app: FastifyInstance) { }); app.patch('/runs/:id/steps/:stepName', async req => { - const { productId } = requireRunsWrite(req); + const { productId } = await requireRunsWrite(req); const { id, stepName } = req.params as { id: string; stepName: string }; const parsed = UpdateRunStepSchema.safeParse(req.body); if (!parsed.success) { diff --git a/services/platform-service/src/modules/webhooks/routes.ts b/services/platform-service/src/modules/webhooks/routes.ts index 053c7327..10f219c2 100644 --- a/services/platform-service/src/modules/webhooks/routes.ts +++ b/services/platform-service/src/modules/webhooks/routes.ts @@ -7,8 +7,8 @@ import * as repo from './repository.js'; import { dispatchEvent } from './dispatcher.js'; export async function webhookRoutes(app: FastifyInstance) { - function requireWebhooksRead(req: import('fastify').FastifyRequest): string { - const access = requireJwtOrApiKey(req, { + async function requireWebhooksRead(req: import('fastify').FastifyRequest): Promise { + const access = await requireJwtOrApiKey(req, { jwtRoles: ['super_admin', 'admin'], apiKeyScopes: ['webhooks:read'], rateLimitKey: 'webhooks:read', @@ -16,11 +16,11 @@ export async function webhookRoutes(app: FastifyInstance) { return access.productId; } - function requireWebhooksWrite(req: import('fastify').FastifyRequest): { + async function requireWebhooksWrite(req: import('fastify').FastifyRequest): Promise<{ actorId: string; productId: string; - } { - const access = requireJwtOrApiKey(req, { + }> { + const access = await requireJwtOrApiKey(req, { jwtRoles: ['super_admin', 'admin'], apiKeyScopes: ['webhooks:write'], rateLimitKey: 'webhooks:write', @@ -33,13 +33,13 @@ export async function webhookRoutes(app: FastifyInstance) { // List webhook subscriptions app.get('/webhooks/subscriptions', async req => { - const productId = requireWebhooksRead(req); + const productId = await requireWebhooksRead(req); return repo.listSubscriptions(productId); }); // Create a webhook subscription app.post('/webhooks/subscriptions', async req => { - const access = requireWebhooksWrite(req); + const access = await requireWebhooksWrite(req); const parsed = CreateWebhookSubscriptionSchema.safeParse(req.body); if (!parsed.success) { throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; ')); @@ -58,7 +58,7 @@ export async function webhookRoutes(app: FastifyInstance) { // Get a specific subscription app.get('/webhooks/subscriptions/:id', async req => { - const productId = requireWebhooksRead(req); + const productId = await requireWebhooksRead(req); const { id } = req.params as { id: string }; const sub = await repo.getSubscription(id, productId); @@ -70,7 +70,7 @@ export async function webhookRoutes(app: FastifyInstance) { // Update a subscription app.patch('/webhooks/subscriptions/:id', async req => { - const access = requireWebhooksWrite(req); + const access = await requireWebhooksWrite(req); const { id } = req.params as { id: string }; const parsed = UpdateWebhookSubscriptionSchema.safeParse(req.body); @@ -85,7 +85,7 @@ export async function webhookRoutes(app: FastifyInstance) { // Delete a subscription app.delete('/webhooks/subscriptions/:id', async req => { - const productId = requireWebhooksWrite(req).productId; + const productId = (await requireWebhooksWrite(req)).productId; const { id } = req.params as { id: string }; const deleted = await repo.deleteSubscription(id, productId); @@ -95,7 +95,7 @@ export async function webhookRoutes(app: FastifyInstance) { // List deliveries for a subscription app.get('/webhooks/subscriptions/:id/deliveries', async req => { - requireWebhooksRead(req); + await requireWebhooksRead(req); const { id } = req.params as { id: string }; const query = req.query as Record; return repo.listDeliveries(id, { @@ -105,7 +105,7 @@ export async function webhookRoutes(app: FastifyInstance) { // Test delivery — send a test event to a specific subscription app.post('/webhooks/subscriptions/:id/test', async req => { - const productId = requireWebhooksWrite(req).productId; + const productId = (await requireWebhooksWrite(req)).productId; const { id } = req.params as { id: string }; const sub = await repo.getSubscription(id, productId); @@ -123,7 +123,7 @@ export async function webhookRoutes(app: FastifyInstance) { // Rotate subscription secret app.post('/webhooks/subscriptions/:id/rotate-secret', async req => { - const productId = requireWebhooksWrite(req).productId; + const productId = (await requireWebhooksWrite(req)).productId; const { id } = req.params as { id: string }; const sub = await repo.getSubscription(id, productId);