feat(ratelimit): back api throttles with datastore
This commit is contained in:
parent
8b99b7a9a7
commit
14346fbd5d
@ -40,13 +40,13 @@ async function seedApiKeyToken(
|
||||
}
|
||||
|
||||
describe('api key auth', () => {
|
||||
beforeEach(() => {
|
||||
beforeEach(async () => {
|
||||
provider = new MemoryDatastoreProvider();
|
||||
setProvider(provider);
|
||||
delete process.env.API_KEY_RATE_LIMIT_CONFIG_JSON;
|
||||
delete process.env.API_KEY_PRODUCT_RATE_LIMIT_CONFIG_JSON;
|
||||
delete process.env.PLATFORM_RUNTIME_ENV;
|
||||
clearRateLimits();
|
||||
await clearRateLimits();
|
||||
});
|
||||
|
||||
it('attaches apiKeyAuth from x-api-key', async () => {
|
||||
@ -55,7 +55,7 @@ describe('api key auth', () => {
|
||||
await registerOptionalApiKeyContext(app);
|
||||
|
||||
app.get('/probe', async req => {
|
||||
const actor = requireJwtOrApiKey(req, {
|
||||
const actor = await requireJwtOrApiKey(req, {
|
||||
apiKeyScopes: ['jobs:read'],
|
||||
rateLimitKey: 'jobs:read',
|
||||
});
|
||||
@ -90,7 +90,7 @@ describe('api key auth', () => {
|
||||
await registerOptionalApiKeyContext(app);
|
||||
|
||||
app.get('/probe', async req => {
|
||||
return requireJwtOrApiKey(req, {
|
||||
return await requireJwtOrApiKey(req, {
|
||||
apiKeyScopes: ['jobs:read'],
|
||||
});
|
||||
});
|
||||
@ -113,7 +113,7 @@ describe('api key auth', () => {
|
||||
await registerOptionalApiKeyContext(app);
|
||||
|
||||
app.get('/probe', async req => {
|
||||
return requireJwtOrApiKey(req, {
|
||||
return await requireJwtOrApiKey(req, {
|
||||
apiKeyScopes: ['jobs:read'],
|
||||
});
|
||||
});
|
||||
@ -135,7 +135,7 @@ describe('api key auth', () => {
|
||||
await registerOptionalApiKeyContext(app);
|
||||
|
||||
app.get('/probe', async req => {
|
||||
return requireJwtOrApiKey(req, {
|
||||
return await requireJwtOrApiKey(req, {
|
||||
apiKeyScopes: ['jobs:write'],
|
||||
});
|
||||
});
|
||||
@ -164,7 +164,7 @@ describe('api key auth', () => {
|
||||
await registerOptionalApiKeyContext(app);
|
||||
|
||||
app.get('/probe', async req => {
|
||||
return requireJwtOrApiKey(req, {
|
||||
return await requireJwtOrApiKey(req, {
|
||||
apiKeyScopes: ['maintenance:read'],
|
||||
apiKeyTokenTypes: ['service_api'],
|
||||
});
|
||||
@ -188,7 +188,7 @@ describe('api key auth', () => {
|
||||
await registerOptionalApiKeyContext(app);
|
||||
|
||||
app.get('/probe', async req => {
|
||||
return requireJwtOrApiKey(req, {
|
||||
return await requireJwtOrApiKey(req, {
|
||||
apiKeyScopes: ['jobs:read'],
|
||||
});
|
||||
});
|
||||
@ -219,7 +219,7 @@ describe('api key auth', () => {
|
||||
await registerOptionalApiKeyContext(app);
|
||||
|
||||
app.post('/probe', async req => {
|
||||
return requireJwtOrApiKey(req, {
|
||||
return await requireJwtOrApiKey(req, {
|
||||
apiKeyScopes: ['jobs:write'],
|
||||
rateLimitKey: 'jobs:write',
|
||||
});
|
||||
@ -271,7 +271,7 @@ describe('api key auth', () => {
|
||||
await registerOptionalApiKeyContext(app);
|
||||
|
||||
app.post('/probe', async req => {
|
||||
return requireJwtOrApiKey(req, {
|
||||
return await requireJwtOrApiKey(req, {
|
||||
apiKeyScopes: ['jobs:write'],
|
||||
rateLimitKey: 'jobs:write',
|
||||
});
|
||||
@ -304,7 +304,7 @@ describe('api key auth', () => {
|
||||
});
|
||||
|
||||
app.get('/probe', async req => {
|
||||
return requireJwtOrApiKey(req, {
|
||||
return await requireJwtOrApiKey(req, {
|
||||
jwtRoles: ['admin'],
|
||||
});
|
||||
});
|
||||
|
||||
@ -1,3 +1,4 @@
|
||||
import crypto from 'node:crypto';
|
||||
import bcrypt from 'bcryptjs';
|
||||
import type { FastifyInstance, FastifyRequest } from 'fastify';
|
||||
import { ForbiddenError, TooManyRequestsError, UnauthorizedError } from './errors.js';
|
||||
@ -266,7 +267,7 @@ function ensureApiKeyScopes(
|
||||
return apiKey;
|
||||
}
|
||||
|
||||
function enforceApiKeyRateLimit(req: FastifyRequest, rateLimitKey?: string): void {
|
||||
async function enforceApiKeyRateLimit(req: FastifyRequest, rateLimitKey?: string): Promise<void> {
|
||||
if (!rateLimitKey || !req.apiKeyAuth) return;
|
||||
|
||||
const apiKeyRateLimitConfig = loadApiKeyRateLimitConfig();
|
||||
@ -274,7 +275,7 @@ function enforceApiKeyRateLimit(req: FastifyRequest, rateLimitKey?: string): voi
|
||||
if (!keyRule) return;
|
||||
|
||||
const compositeKey = `api-key:${req.apiKeyAuth.productId}:${req.apiKeyAuth.tokenId}:${rateLimitKey}`;
|
||||
const keyResult = rateLimitStore.checkAndRecord(compositeKey, keyRule);
|
||||
const keyResult = await rateLimitStore.checkAndRecord(compositeKey, keyRule);
|
||||
if (!keyResult.allowed) {
|
||||
logApiKeyWarning(req, 'token_rate_limited', {
|
||||
rateLimitKey,
|
||||
@ -289,7 +290,7 @@ function enforceApiKeyRateLimit(req: FastifyRequest, rateLimitKey?: string): voi
|
||||
if (!productRule) return;
|
||||
|
||||
const productCompositeKey = `api-key-product:${req.apiKeyAuth.productId}:${rateLimitKey}`;
|
||||
const productResult = rateLimitStore.checkAndRecord(productCompositeKey, productRule);
|
||||
const productResult = await rateLimitStore.checkAndRecord(productCompositeKey, productRule);
|
||||
if (!productResult.allowed) {
|
||||
logApiKeyWarning(req, 'product_rate_limited', {
|
||||
rateLimitKey,
|
||||
@ -301,10 +302,10 @@ function enforceApiKeyRateLimit(req: FastifyRequest, rateLimitKey?: string): voi
|
||||
}
|
||||
}
|
||||
|
||||
export function requireJwtOrApiKey(
|
||||
export async function requireJwtOrApiKey(
|
||||
req: FastifyRequest,
|
||||
{ allowJwt = false, jwtRoles, apiKeyScopes, apiKeyTokenTypes, rateLimitKey }: AccessOptions = {}
|
||||
): AccessActor {
|
||||
): Promise<AccessActor> {
|
||||
const jwt = req.jwtPayload;
|
||||
if (jwt?.sub) {
|
||||
if (jwtRoles && jwtRoles.length > 0) {
|
||||
@ -323,7 +324,7 @@ export function requireJwtOrApiKey(
|
||||
}
|
||||
|
||||
const apiKey = ensureApiKeyScopes(req, apiKeyScopes, apiKeyTokenTypes);
|
||||
enforceApiKeyRateLimit(req, rateLimitKey);
|
||||
await enforceApiKeyRateLimit(req, rateLimitKey);
|
||||
|
||||
return {
|
||||
actorId: apiKey.userId,
|
||||
|
||||
@ -26,7 +26,7 @@ export async function exportRoutes(app: FastifyInstance) {
|
||||
|
||||
// Start a new export job
|
||||
app.post('/exports', async (req, reply) => {
|
||||
const access = requireExportWrite(req);
|
||||
const access = await requireExportWrite(req);
|
||||
const parsed = CreateExportSchema.safeParse(req.body);
|
||||
if (!parsed.success) {
|
||||
throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; '));
|
||||
@ -61,7 +61,7 @@ export async function exportRoutes(app: FastifyInstance) {
|
||||
|
||||
// List export jobs
|
||||
app.get('/exports', async req => {
|
||||
const access = requireExportRead(req);
|
||||
const access = await requireExportRead(req);
|
||||
const query = req.query as Record<string, string>;
|
||||
const limit = query.limit ? parseInt(query.limit, 10) : 20;
|
||||
const jobs = await repo.listExportJobs(access.productId, Math.min(limit, 100));
|
||||
@ -70,7 +70,7 @@ export async function exportRoutes(app: FastifyInstance) {
|
||||
|
||||
// Get a specific export job
|
||||
app.get('/exports/:id', async req => {
|
||||
const access = requireExportRead(req);
|
||||
const access = await requireExportRead(req);
|
||||
const { id } = req.params as { id: string };
|
||||
const job = await repo.getExportJob(id, access.productId);
|
||||
if (!job) throw new BadRequestError('Export job not found');
|
||||
|
||||
@ -26,13 +26,13 @@ export async function ipRuleRoutes(app: FastifyInstance) {
|
||||
|
||||
// List all IP rules
|
||||
app.get('/ratelimit/ip-rules', async req => {
|
||||
const access = requireIpRulesRead(req);
|
||||
const access = await requireIpRulesRead(req);
|
||||
return repo.listRules(access.productId);
|
||||
});
|
||||
|
||||
// Create an IP rule
|
||||
app.post('/ratelimit/ip-rules', async (req, reply) => {
|
||||
const access = requireIpRulesWrite(req);
|
||||
const access = await requireIpRulesWrite(req);
|
||||
const parsed = CreateIPRuleSchema.safeParse(req.body);
|
||||
if (!parsed.success) {
|
||||
throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; '));
|
||||
@ -60,7 +60,7 @@ export async function ipRuleRoutes(app: FastifyInstance) {
|
||||
|
||||
// Delete an IP rule
|
||||
app.delete('/ratelimit/ip-rules/:id', async req => {
|
||||
const access = requireIpRulesWrite(req);
|
||||
const access = await requireIpRulesWrite(req);
|
||||
const { id } = req.params as { id: string };
|
||||
const deleted = await repo.deleteRule(id, access.productId);
|
||||
if (!deleted) throw new BadRequestError('IP rule not found');
|
||||
@ -69,7 +69,7 @@ export async function ipRuleRoutes(app: FastifyInstance) {
|
||||
|
||||
// Check if an IP is allowed/denied (utility endpoint)
|
||||
app.get('/ratelimit/check-ip/:ip', async req => {
|
||||
const access = requireIpRulesRead(req);
|
||||
const access = await requireIpRulesRead(req);
|
||||
const { ip } = req.params as { ip: string };
|
||||
const result = await repo.checkIP(ip, access.productId);
|
||||
return { ip, action: result, hasRule: result !== null };
|
||||
|
||||
@ -7,8 +7,8 @@ import { getJobHandler } from './registry.js';
|
||||
import { executeJob } from './runner.js';
|
||||
|
||||
export async function jobRoutes(app: FastifyInstance) {
|
||||
function requireJobsRead(req: import('fastify').FastifyRequest): string {
|
||||
const access = requireJwtOrApiKey(req, {
|
||||
async function requireJobsRead(req: import('fastify').FastifyRequest): Promise<string> {
|
||||
const access = await requireJwtOrApiKey(req, {
|
||||
allowJwt: true,
|
||||
apiKeyScopes: ['jobs:read'],
|
||||
apiKeyTokenTypes: ['service_api'],
|
||||
@ -17,8 +17,8 @@ export async function jobRoutes(app: FastifyInstance) {
|
||||
return access.productId;
|
||||
}
|
||||
|
||||
function requireJobsWrite(req: import('fastify').FastifyRequest): string {
|
||||
const access = requireJwtOrApiKey(req, {
|
||||
async function requireJobsWrite(req: import('fastify').FastifyRequest): Promise<string> {
|
||||
const access = await requireJwtOrApiKey(req, {
|
||||
jwtRoles: ['super_admin', 'admin'],
|
||||
apiKeyScopes: ['jobs:write'],
|
||||
apiKeyTokenTypes: ['service_api'],
|
||||
@ -29,20 +29,20 @@ export async function jobRoutes(app: FastifyInstance) {
|
||||
|
||||
// List all job definitions
|
||||
app.get('/jobs', async req => {
|
||||
const productId = requireJobsRead(req);
|
||||
const productId = await requireJobsRead(req);
|
||||
return repo.listJobDefinitions(productId);
|
||||
});
|
||||
|
||||
// Get a specific job definition
|
||||
app.get('/jobs/:id', async req => {
|
||||
const productId = requireJobsRead(req);
|
||||
const productId = await requireJobsRead(req);
|
||||
const { id } = req.params as { id: string };
|
||||
return repo.getJobDefinition(id, productId);
|
||||
});
|
||||
|
||||
// Update job (enable/disable, change cron, etc.)
|
||||
app.put('/jobs/:id', async req => {
|
||||
const productId = requireJobsWrite(req);
|
||||
const productId = await requireJobsWrite(req);
|
||||
const { id } = req.params as { id: string };
|
||||
const parsed = UpdateJobSchema.safeParse(req.body);
|
||||
if (!parsed.success) {
|
||||
@ -53,7 +53,7 @@ export async function jobRoutes(app: FastifyInstance) {
|
||||
|
||||
// Manually trigger a job
|
||||
app.post('/jobs/trigger', async req => {
|
||||
const productId = requireJobsWrite(req);
|
||||
const productId = await requireJobsWrite(req);
|
||||
const parsed = TriggerJobSchema.safeParse(req.body);
|
||||
if (!parsed.success) {
|
||||
throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; '));
|
||||
@ -72,7 +72,7 @@ export async function jobRoutes(app: FastifyInstance) {
|
||||
|
||||
// List recent runs for a job
|
||||
app.get('/jobs/:name/runs', async req => {
|
||||
const productId = requireJobsRead(req);
|
||||
const productId = await requireJobsRead(req);
|
||||
const { name } = req.params as { name: string };
|
||||
const limit = parseInt((req.query as Record<string, string>).limit || '20', 10);
|
||||
return repo.listJobRuns(productId, name, Math.min(limit, 100));
|
||||
|
||||
@ -48,13 +48,13 @@ export async function maintenanceRoutes(app: FastifyInstance) {
|
||||
|
||||
// Get full maintenance config (admin sees bypass rules too)
|
||||
app.get('/settings/maintenance/full', async req => {
|
||||
const access = requireMaintenanceRead(req);
|
||||
const access = await requireMaintenanceRead(req);
|
||||
return repo.getMaintenanceConfig(access.productId);
|
||||
});
|
||||
|
||||
// Update maintenance mode
|
||||
app.put('/settings/maintenance', async req => {
|
||||
const access = requireMaintenanceWrite(req);
|
||||
const access = await requireMaintenanceWrite(req);
|
||||
const parsed = UpdateMaintenanceSchema.safeParse(req.body);
|
||||
if (!parsed.success) {
|
||||
throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; '));
|
||||
@ -76,7 +76,7 @@ export async function maintenanceRoutes(app: FastifyInstance) {
|
||||
|
||||
// Create a scheduled maintenance window
|
||||
app.post('/settings/maintenance/schedule', async (req, reply) => {
|
||||
const access = requireMaintenanceWrite(req);
|
||||
const access = await requireMaintenanceWrite(req);
|
||||
const parsed = CreateMaintenanceWindowSchema.safeParse(req.body);
|
||||
if (!parsed.success) {
|
||||
throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; '));
|
||||
@ -104,7 +104,7 @@ export async function maintenanceRoutes(app: FastifyInstance) {
|
||||
|
||||
// Delete a scheduled maintenance window
|
||||
app.delete('/settings/maintenance/schedule/:id', async req => {
|
||||
const access = requireMaintenanceWrite(req);
|
||||
const access = await requireMaintenanceWrite(req);
|
||||
const { id } = req.params as { id: string };
|
||||
const deleted = await repo.deleteWindow(id, access.productId);
|
||||
if (!deleted) throw new BadRequestError('Maintenance window not found');
|
||||
|
||||
@ -10,165 +10,165 @@ import type { RateLimitRule } from './types.js';
|
||||
// ── Store: sliding window counter ──────────────────────────────
|
||||
|
||||
describe('ratelimit store', () => {
|
||||
beforeEach(() => {
|
||||
clearAll();
|
||||
beforeEach(async () => {
|
||||
await clearAll();
|
||||
});
|
||||
|
||||
describe('checkAndRecord', () => {
|
||||
const rule: RateLimitRule = { maxRequests: 3, windowSeconds: 60 };
|
||||
|
||||
it('allows first request and returns full remaining', () => {
|
||||
const result = checkAndRecord('user:123', rule);
|
||||
it('allows first request and returns full remaining', async () => {
|
||||
const result = await checkAndRecord('user:123', rule);
|
||||
expect(result.allowed).toBe(true);
|
||||
expect(result.remaining).toBe(2);
|
||||
expect(result.limit).toBe(3);
|
||||
expect(result.resetAt).toBeDefined();
|
||||
});
|
||||
|
||||
it('decrements remaining on each call', () => {
|
||||
const r1 = checkAndRecord('user:a', rule);
|
||||
it('decrements remaining on each call', async () => {
|
||||
const r1 = await checkAndRecord('user:a', rule);
|
||||
expect(r1.remaining).toBe(2);
|
||||
|
||||
const r2 = checkAndRecord('user:a', rule);
|
||||
const r2 = await checkAndRecord('user:a', rule);
|
||||
expect(r2.remaining).toBe(1);
|
||||
|
||||
const r3 = checkAndRecord('user:a', rule);
|
||||
const r3 = await checkAndRecord('user:a', rule);
|
||||
expect(r3.remaining).toBe(0);
|
||||
});
|
||||
|
||||
it('blocks after exceeding maxRequests', () => {
|
||||
checkAndRecord('user:b', rule);
|
||||
checkAndRecord('user:b', rule);
|
||||
checkAndRecord('user:b', rule);
|
||||
it('blocks after exceeding maxRequests', async () => {
|
||||
await checkAndRecord('user:b', rule);
|
||||
await checkAndRecord('user:b', rule);
|
||||
await checkAndRecord('user:b', rule);
|
||||
|
||||
const result = checkAndRecord('user:b', rule);
|
||||
const result = await checkAndRecord('user:b', rule);
|
||||
expect(result.allowed).toBe(false);
|
||||
expect(result.remaining).toBe(0);
|
||||
expect(result.retryAfterMs).toBeGreaterThan(0);
|
||||
});
|
||||
|
||||
it('uses separate counters for different keys', () => {
|
||||
checkAndRecord('user:x', rule);
|
||||
checkAndRecord('user:x', rule);
|
||||
checkAndRecord('user:x', rule);
|
||||
it('uses separate counters for different keys', async () => {
|
||||
await checkAndRecord('user:x', rule);
|
||||
await checkAndRecord('user:x', rule);
|
||||
await checkAndRecord('user:x', rule);
|
||||
|
||||
const resultX = checkAndRecord('user:x', rule);
|
||||
const resultX = await checkAndRecord('user:x', rule);
|
||||
expect(resultX.allowed).toBe(false);
|
||||
|
||||
const resultY = checkAndRecord('user:y', rule);
|
||||
const resultY = await checkAndRecord('user:y', rule);
|
||||
expect(resultY.allowed).toBe(true);
|
||||
expect(resultY.remaining).toBe(2);
|
||||
});
|
||||
|
||||
it('returns resetAt as ISO string', () => {
|
||||
const result = checkAndRecord('user:time', rule);
|
||||
it('returns resetAt as ISO string', async () => {
|
||||
const result = await checkAndRecord('user:time', rule);
|
||||
expect(() => new Date(result.resetAt)).not.toThrow();
|
||||
expect(new Date(result.resetAt).getTime()).toBeGreaterThan(Date.now() - 1000);
|
||||
});
|
||||
|
||||
it('handles single-request limit', () => {
|
||||
it('handles single-request limit', async () => {
|
||||
const strictRule: RateLimitRule = { maxRequests: 1, windowSeconds: 10 };
|
||||
const r1 = checkAndRecord('user:strict', strictRule);
|
||||
const r1 = await checkAndRecord('user:strict', strictRule);
|
||||
expect(r1.allowed).toBe(true);
|
||||
expect(r1.remaining).toBe(0);
|
||||
|
||||
const r2 = checkAndRecord('user:strict', strictRule);
|
||||
const r2 = await checkAndRecord('user:strict', strictRule);
|
||||
expect(r2.allowed).toBe(false);
|
||||
});
|
||||
|
||||
it('handles large maxRequests without issues', () => {
|
||||
it('handles large maxRequests without issues', async () => {
|
||||
const bigRule: RateLimitRule = { maxRequests: 10000, windowSeconds: 60 };
|
||||
const result = checkAndRecord('user:big', bigRule);
|
||||
const result = await checkAndRecord('user:big', bigRule);
|
||||
expect(result.allowed).toBe(true);
|
||||
expect(result.remaining).toBe(9999);
|
||||
});
|
||||
});
|
||||
|
||||
describe('reset', () => {
|
||||
it('clears rate limit for a specific key', () => {
|
||||
it('clears rate limit for a specific key', async () => {
|
||||
const rule: RateLimitRule = { maxRequests: 2, windowSeconds: 60 };
|
||||
checkAndRecord('user:reset', rule);
|
||||
checkAndRecord('user:reset', rule);
|
||||
await checkAndRecord('user:reset', rule);
|
||||
await checkAndRecord('user:reset', rule);
|
||||
|
||||
const blocked = checkAndRecord('user:reset', rule);
|
||||
const blocked = await checkAndRecord('user:reset', rule);
|
||||
expect(blocked.allowed).toBe(false);
|
||||
|
||||
reset('user:reset');
|
||||
await reset('user:reset');
|
||||
|
||||
const afterReset = checkAndRecord('user:reset', rule);
|
||||
const afterReset = await checkAndRecord('user:reset', rule);
|
||||
expect(afterReset.allowed).toBe(true);
|
||||
expect(afterReset.remaining).toBe(1);
|
||||
});
|
||||
|
||||
it('does not affect other keys', () => {
|
||||
it('does not affect other keys', async () => {
|
||||
const rule: RateLimitRule = { maxRequests: 1, windowSeconds: 60 };
|
||||
checkAndRecord('user:keep', rule);
|
||||
checkAndRecord('user:clear', rule);
|
||||
await checkAndRecord('user:keep', rule);
|
||||
await checkAndRecord('user:clear', rule);
|
||||
|
||||
reset('user:clear');
|
||||
await reset('user:clear');
|
||||
|
||||
const keep = checkAndRecord('user:keep', rule);
|
||||
const keep = await checkAndRecord('user:keep', rule);
|
||||
expect(keep.allowed).toBe(false);
|
||||
|
||||
const cleared = checkAndRecord('user:clear', rule);
|
||||
const cleared = await checkAndRecord('user:clear', rule);
|
||||
expect(cleared.allowed).toBe(true);
|
||||
});
|
||||
|
||||
it('is safe to reset non-existent key', () => {
|
||||
expect(() => reset('nonexistent')).not.toThrow();
|
||||
it('is safe to reset non-existent key', async () => {
|
||||
await expect(reset('nonexistent')).resolves.toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
describe('peek', () => {
|
||||
const rule: RateLimitRule = { maxRequests: 3, windowSeconds: 60 };
|
||||
|
||||
it('returns full remaining for unknown key', () => {
|
||||
const result = peek('new-key', rule);
|
||||
it('returns full remaining for unknown key', async () => {
|
||||
const result = await peek('new-key', rule);
|
||||
expect(result.allowed).toBe(true);
|
||||
expect(result.remaining).toBe(3);
|
||||
expect(result.limit).toBe(3);
|
||||
});
|
||||
|
||||
it('does not consume a request', () => {
|
||||
peek('user:peek', rule);
|
||||
peek('user:peek', rule);
|
||||
peek('user:peek', rule);
|
||||
it('does not consume a request', async () => {
|
||||
await peek('user:peek', rule);
|
||||
await peek('user:peek', rule);
|
||||
await peek('user:peek', rule);
|
||||
|
||||
const result = checkAndRecord('user:peek', rule);
|
||||
const result = await checkAndRecord('user:peek', rule);
|
||||
expect(result.allowed).toBe(true);
|
||||
expect(result.remaining).toBe(2);
|
||||
});
|
||||
|
||||
it('reflects current usage', () => {
|
||||
checkAndRecord('user:usage', rule);
|
||||
checkAndRecord('user:usage', rule);
|
||||
it('reflects current usage', async () => {
|
||||
await checkAndRecord('user:usage', rule);
|
||||
await checkAndRecord('user:usage', rule);
|
||||
|
||||
const result = peek('user:usage', rule);
|
||||
const result = await peek('user:usage', rule);
|
||||
expect(result.remaining).toBe(1);
|
||||
expect(result.allowed).toBe(true);
|
||||
});
|
||||
|
||||
it('shows blocked status when limit exceeded', () => {
|
||||
checkAndRecord('user:full', rule);
|
||||
checkAndRecord('user:full', rule);
|
||||
checkAndRecord('user:full', rule);
|
||||
it('shows blocked status when limit exceeded', async () => {
|
||||
await checkAndRecord('user:full', rule);
|
||||
await checkAndRecord('user:full', rule);
|
||||
await checkAndRecord('user:full', rule);
|
||||
|
||||
const result = peek('user:full', rule);
|
||||
const result = await peek('user:full', rule);
|
||||
expect(result.allowed).toBe(false);
|
||||
expect(result.remaining).toBe(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe('clearAll', () => {
|
||||
it('resets all keys', () => {
|
||||
it('resets all keys', async () => {
|
||||
const rule: RateLimitRule = { maxRequests: 1, windowSeconds: 60 };
|
||||
checkAndRecord('a', rule);
|
||||
checkAndRecord('b', rule);
|
||||
await checkAndRecord('a', rule);
|
||||
await checkAndRecord('b', rule);
|
||||
|
||||
clearAll();
|
||||
await clearAll();
|
||||
|
||||
expect(checkAndRecord('a', rule).allowed).toBe(true);
|
||||
expect(checkAndRecord('b', rule).allowed).toBe(true);
|
||||
expect((await checkAndRecord('a', rule)).allowed).toBe(true);
|
||||
expect((await checkAndRecord('b', rule)).allowed).toBe(true);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@ -80,7 +80,7 @@ export async function rateLimitRoutes(app: FastifyInstance) {
|
||||
const { key, productId, routePrefix } = parsed.data;
|
||||
const rule = findRule(productId, routePrefix);
|
||||
const compositeKey = `${productId}:${key}${routePrefix ? `:${routePrefix}` : ''}`;
|
||||
const result = store.checkAndRecord(compositeKey, rule);
|
||||
const result = await store.checkAndRecord(compositeKey, rule);
|
||||
|
||||
if (!result.allowed) {
|
||||
reply.code(429).header('Retry-After', String(Math.ceil((result.retryAfterMs ?? 0) / 1000)));
|
||||
@ -93,7 +93,7 @@ export async function rateLimitRoutes(app: FastifyInstance) {
|
||||
const { key, productId } = req.body as { key?: string; productId?: string };
|
||||
const pid = productId || getRequestProductId(req);
|
||||
if (!key) throw new BadRequestError('key is required');
|
||||
store.reset(`${pid}:${key}`);
|
||||
await store.reset(`${pid}:${key}`);
|
||||
return { reset: true };
|
||||
});
|
||||
|
||||
|
||||
@ -1,101 +1,166 @@
|
||||
/**
|
||||
* Rate limit store — sliding window counter.
|
||||
* Rate limit store — sliding window counter with datastore backing.
|
||||
*
|
||||
* Default: in-memory Map (single-instance).
|
||||
* When REDIS_URL is set: uses Redis sorted sets for horizontal scaling.
|
||||
*
|
||||
* Each key tracks request timestamps within the window. Old entries are
|
||||
* pruned on every check.
|
||||
* Default: datastore-backed for shared enforcement across service instances.
|
||||
* Fallback: in-memory when RATE_LIMIT_STORE_MODE=memory.
|
||||
*/
|
||||
|
||||
import crypto from 'node:crypto';
|
||||
import { getCollection } from '../../lib/datastore.js';
|
||||
import type { RateLimitEntry, RateLimitResult, RateLimitRule } from './types.js';
|
||||
|
||||
// ── In-Memory Store ──────────────────────────────────────────
|
||||
interface RateLimitDoc {
|
||||
id: string;
|
||||
productId: string;
|
||||
key: string;
|
||||
timestamps: number[];
|
||||
updatedAt: string;
|
||||
ttl?: number;
|
||||
}
|
||||
|
||||
const store = new Map<string, RateLimitEntry>();
|
||||
const memoryStore = new Map<string, RateLimitEntry>();
|
||||
let storeNamespace = 'default';
|
||||
|
||||
/**
|
||||
* Check (and record) a request against the sliding window for the given key.
|
||||
* Returns whether the request is allowed and remaining quota.
|
||||
*/
|
||||
export function checkAndRecord(key: string, rule: RateLimitRule): RateLimitResult {
|
||||
const now = Date.now();
|
||||
function shouldUseMemoryStore(): boolean {
|
||||
return process.env.RATE_LIMIT_STORE_MODE === 'memory';
|
||||
}
|
||||
|
||||
function rateLimitCollection() {
|
||||
return getCollection<RateLimitDoc>('rate_limit_entries', '/id');
|
||||
}
|
||||
|
||||
function normalizeKey(key: string): string {
|
||||
return `${storeNamespace}:${key}`;
|
||||
}
|
||||
|
||||
function docIdForKey(key: string): string {
|
||||
return `rl_${crypto.createHash('sha256').update(normalizeKey(key)).digest('hex')}`;
|
||||
}
|
||||
|
||||
function toResult(
|
||||
timestamps: number[],
|
||||
rule: RateLimitRule,
|
||||
now: number,
|
||||
blocked: boolean
|
||||
): RateLimitResult {
|
||||
const windowMs = rule.windowSeconds * 1000;
|
||||
const windowStart = now - windowMs;
|
||||
|
||||
// Get or create entry
|
||||
let entry = store.get(key);
|
||||
if (!entry) {
|
||||
entry = { timestamps: [] };
|
||||
store.set(key, entry);
|
||||
}
|
||||
|
||||
// Prune expired timestamps
|
||||
entry.timestamps = entry.timestamps.filter(t => t > windowStart);
|
||||
|
||||
// Check limit
|
||||
if (entry.timestamps.length >= rule.maxRequests) {
|
||||
const oldestInWindow = entry.timestamps[0];
|
||||
const resetAt = new Date(oldestInWindow + windowMs).toISOString();
|
||||
if (blocked) {
|
||||
const oldestInWindow = timestamps[0];
|
||||
const retryAfterMs = oldestInWindow + windowMs - now;
|
||||
return {
|
||||
allowed: false,
|
||||
limit: rule.maxRequests,
|
||||
remaining: 0,
|
||||
resetAt,
|
||||
resetAt: new Date(oldestInWindow + windowMs).toISOString(),
|
||||
retryAfterMs: Math.max(0, retryAfterMs),
|
||||
};
|
||||
}
|
||||
|
||||
// Record this request
|
||||
entry.timestamps.push(now);
|
||||
|
||||
const resetAt = new Date(now + windowMs).toISOString();
|
||||
return {
|
||||
allowed: true,
|
||||
limit: rule.maxRequests,
|
||||
remaining: rule.maxRequests - entry.timestamps.length,
|
||||
resetAt,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset the rate limit for a specific key (e.g., after a successful auth).
|
||||
*/
|
||||
export function reset(key: string): void {
|
||||
store.delete(key);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get current state without recording a request (read-only check).
|
||||
*/
|
||||
export function peek(key: string, rule: RateLimitRule): RateLimitResult {
|
||||
const now = Date.now();
|
||||
const windowMs = rule.windowSeconds * 1000;
|
||||
const windowStart = now - windowMs;
|
||||
|
||||
const entry = store.get(key);
|
||||
if (!entry) {
|
||||
return {
|
||||
allowed: true,
|
||||
limit: rule.maxRequests,
|
||||
remaining: rule.maxRequests,
|
||||
resetAt: new Date(now + windowMs).toISOString(),
|
||||
};
|
||||
}
|
||||
|
||||
const active = entry.timestamps.filter(t => t > windowStart);
|
||||
const remaining = Math.max(0, rule.maxRequests - active.length);
|
||||
|
||||
return {
|
||||
allowed: remaining > 0,
|
||||
limit: rule.maxRequests,
|
||||
remaining,
|
||||
remaining: Math.max(0, rule.maxRequests - timestamps.length),
|
||||
resetAt: new Date(now + windowMs).toISOString(),
|
||||
};
|
||||
}
|
||||
|
||||
/** Clear all entries (for testing). */
|
||||
export function clearAll(): void {
|
||||
store.clear();
|
||||
async function checkAndRecordDatastore(key: string, rule: RateLimitRule): Promise<RateLimitResult> {
|
||||
const now = Date.now();
|
||||
const windowMs = rule.windowSeconds * 1000;
|
||||
const windowStart = now - windowMs;
|
||||
const docId = docIdForKey(key);
|
||||
const existing = await rateLimitCollection().findById(docId, docId);
|
||||
const timestamps = (existing?.timestamps ?? []).filter(timestamp => timestamp > windowStart);
|
||||
|
||||
if (timestamps.length >= rule.maxRequests) {
|
||||
return toResult(timestamps, rule, now, true);
|
||||
}
|
||||
|
||||
const nextTimestamps = [...timestamps, now];
|
||||
const doc: RateLimitDoc = {
|
||||
id: docId,
|
||||
productId: '__platform__',
|
||||
key: normalizeKey(key),
|
||||
timestamps: nextTimestamps,
|
||||
updatedAt: new Date(now).toISOString(),
|
||||
ttl: Math.max(rule.windowSeconds * 2, 60),
|
||||
};
|
||||
|
||||
await rateLimitCollection().upsert(doc);
|
||||
return toResult(nextTimestamps, rule, now, false);
|
||||
}
|
||||
|
||||
function checkAndRecordMemory(key: string, rule: RateLimitRule): RateLimitResult {
|
||||
const now = Date.now();
|
||||
const windowMs = rule.windowSeconds * 1000;
|
||||
const windowStart = now - windowMs;
|
||||
const normalizedKey = normalizeKey(key);
|
||||
|
||||
let entry = memoryStore.get(normalizedKey);
|
||||
if (!entry) {
|
||||
entry = { timestamps: [] };
|
||||
memoryStore.set(normalizedKey, entry);
|
||||
}
|
||||
|
||||
entry.timestamps = entry.timestamps.filter(timestamp => timestamp > windowStart);
|
||||
if (entry.timestamps.length >= rule.maxRequests) {
|
||||
return toResult(entry.timestamps, rule, now, true);
|
||||
}
|
||||
|
||||
entry.timestamps.push(now);
|
||||
return toResult(entry.timestamps, rule, now, false);
|
||||
}
|
||||
|
||||
async function peekDatastore(key: string, rule: RateLimitRule): Promise<RateLimitResult> {
|
||||
const now = Date.now();
|
||||
const windowMs = rule.windowSeconds * 1000;
|
||||
const windowStart = now - windowMs;
|
||||
const docId = docIdForKey(key);
|
||||
const existing = await rateLimitCollection().findById(docId, docId);
|
||||
const timestamps = (existing?.timestamps ?? []).filter(timestamp => timestamp > windowStart);
|
||||
return toResult(timestamps, rule, now, timestamps.length >= rule.maxRequests);
|
||||
}
|
||||
|
||||
function peekMemory(key: string, rule: RateLimitRule): RateLimitResult {
|
||||
const now = Date.now();
|
||||
const windowMs = rule.windowSeconds * 1000;
|
||||
const windowStart = now - windowMs;
|
||||
const entry = memoryStore.get(normalizeKey(key));
|
||||
const timestamps = (entry?.timestamps ?? []).filter(timestamp => timestamp > windowStart);
|
||||
return toResult(timestamps, rule, now, timestamps.length >= rule.maxRequests);
|
||||
}
|
||||
|
||||
export async function checkAndRecord(key: string, rule: RateLimitRule): Promise<RateLimitResult> {
|
||||
if (shouldUseMemoryStore()) {
|
||||
return checkAndRecordMemory(key, rule);
|
||||
}
|
||||
|
||||
return checkAndRecordDatastore(key, rule);
|
||||
}
|
||||
|
||||
export async function reset(key: string): Promise<void> {
|
||||
const normalizedKey = normalizeKey(key);
|
||||
if (shouldUseMemoryStore()) {
|
||||
memoryStore.delete(normalizedKey);
|
||||
return;
|
||||
}
|
||||
|
||||
const docId = docIdForKey(key);
|
||||
await rateLimitCollection()
|
||||
.delete(docId, docId)
|
||||
.catch(() => {});
|
||||
}
|
||||
|
||||
export async function peek(key: string, rule: RateLimitRule): Promise<RateLimitResult> {
|
||||
if (shouldUseMemoryStore()) {
|
||||
return peekMemory(key, rule);
|
||||
}
|
||||
|
||||
return peekDatastore(key, rule);
|
||||
}
|
||||
|
||||
export async function clearAll(): Promise<void> {
|
||||
memoryStore.clear();
|
||||
storeNamespace = crypto.randomUUID();
|
||||
}
|
||||
|
||||
@ -12,8 +12,8 @@ import * as repo from './repository.js';
|
||||
import * as tracker from './tracker.js';
|
||||
|
||||
export async function runRoutes(app: FastifyInstance) {
|
||||
function requireRunsRead(req: import('fastify').FastifyRequest): string {
|
||||
const access = requireJwtOrApiKey(req, {
|
||||
async function requireRunsRead(req: import('fastify').FastifyRequest): Promise<string> {
|
||||
const access = await requireJwtOrApiKey(req, {
|
||||
jwtRoles: ['super_admin', 'admin'],
|
||||
apiKeyScopes: ['jobs:read'],
|
||||
apiKeyTokenTypes: ['service_api'],
|
||||
@ -22,11 +22,11 @@ export async function runRoutes(app: FastifyInstance) {
|
||||
return access.productId;
|
||||
}
|
||||
|
||||
function requireRunsWrite(req: import('fastify').FastifyRequest): {
|
||||
async function requireRunsWrite(req: import('fastify').FastifyRequest): Promise<{
|
||||
productId: string;
|
||||
actorId: string;
|
||||
} {
|
||||
const access = requireJwtOrApiKey(req, {
|
||||
}> {
|
||||
const access = await requireJwtOrApiKey(req, {
|
||||
jwtRoles: ['super_admin', 'admin'],
|
||||
apiKeyTokenTypes: ['service_api'],
|
||||
rateLimitKey: 'jobs:write',
|
||||
@ -35,7 +35,7 @@ export async function runRoutes(app: FastifyInstance) {
|
||||
}
|
||||
|
||||
app.get('/runs', async req => {
|
||||
const productId = requireRunsRead(req);
|
||||
const productId = await requireRunsRead(req);
|
||||
const parsed = ListRunsQuerySchema.safeParse(req.query);
|
||||
if (!parsed.success) {
|
||||
throw new BadRequestError(parsed.error.issues.map(issue => issue.message).join('; '));
|
||||
@ -45,19 +45,19 @@ export async function runRoutes(app: FastifyInstance) {
|
||||
});
|
||||
|
||||
app.get('/runs/:id', async req => {
|
||||
const productId = requireRunsRead(req);
|
||||
const productId = await requireRunsRead(req);
|
||||
const { id } = req.params as { id: string };
|
||||
return repo.getRun(id, productId);
|
||||
});
|
||||
|
||||
app.get('/runs/:id/steps', async req => {
|
||||
const productId = requireRunsRead(req);
|
||||
const productId = await requireRunsRead(req);
|
||||
const { id } = req.params as { id: string };
|
||||
return repo.listRunSteps(productId, id);
|
||||
});
|
||||
|
||||
app.post('/runs', async req => {
|
||||
const { productId, actorId } = requireRunsWrite(req);
|
||||
const { productId, actorId } = await requireRunsWrite(req);
|
||||
const parsed = CreateRunSchema.safeParse(req.body);
|
||||
if (!parsed.success) {
|
||||
throw new BadRequestError(parsed.error.issues.map(issue => issue.message).join('; '));
|
||||
@ -71,7 +71,7 @@ export async function runRoutes(app: FastifyInstance) {
|
||||
});
|
||||
|
||||
app.patch('/runs/:id', async req => {
|
||||
const { productId } = requireRunsWrite(req);
|
||||
const { productId } = await requireRunsWrite(req);
|
||||
const { id } = req.params as { id: string };
|
||||
const parsed = UpdateRunSchema.safeParse(req.body);
|
||||
if (!parsed.success) {
|
||||
@ -95,7 +95,7 @@ export async function runRoutes(app: FastifyInstance) {
|
||||
});
|
||||
|
||||
app.post('/runs/:id/steps', async req => {
|
||||
const { productId } = requireRunsWrite(req);
|
||||
const { productId } = await requireRunsWrite(req);
|
||||
const { id } = req.params as { id: string };
|
||||
const parsed = CreateRunStepSchema.safeParse(req.body);
|
||||
if (!parsed.success) {
|
||||
@ -110,7 +110,7 @@ export async function runRoutes(app: FastifyInstance) {
|
||||
});
|
||||
|
||||
app.patch('/runs/:id/steps/:stepName', async req => {
|
||||
const { productId } = requireRunsWrite(req);
|
||||
const { productId } = await requireRunsWrite(req);
|
||||
const { id, stepName } = req.params as { id: string; stepName: string };
|
||||
const parsed = UpdateRunStepSchema.safeParse(req.body);
|
||||
if (!parsed.success) {
|
||||
|
||||
@ -7,8 +7,8 @@ import * as repo from './repository.js';
|
||||
import { dispatchEvent } from './dispatcher.js';
|
||||
|
||||
export async function webhookRoutes(app: FastifyInstance) {
|
||||
function requireWebhooksRead(req: import('fastify').FastifyRequest): string {
|
||||
const access = requireJwtOrApiKey(req, {
|
||||
async function requireWebhooksRead(req: import('fastify').FastifyRequest): Promise<string> {
|
||||
const access = await requireJwtOrApiKey(req, {
|
||||
jwtRoles: ['super_admin', 'admin'],
|
||||
apiKeyScopes: ['webhooks:read'],
|
||||
rateLimitKey: 'webhooks:read',
|
||||
@ -16,11 +16,11 @@ export async function webhookRoutes(app: FastifyInstance) {
|
||||
return access.productId;
|
||||
}
|
||||
|
||||
function requireWebhooksWrite(req: import('fastify').FastifyRequest): {
|
||||
async function requireWebhooksWrite(req: import('fastify').FastifyRequest): Promise<{
|
||||
actorId: string;
|
||||
productId: string;
|
||||
} {
|
||||
const access = requireJwtOrApiKey(req, {
|
||||
}> {
|
||||
const access = await requireJwtOrApiKey(req, {
|
||||
jwtRoles: ['super_admin', 'admin'],
|
||||
apiKeyScopes: ['webhooks:write'],
|
||||
rateLimitKey: 'webhooks:write',
|
||||
@ -33,13 +33,13 @@ export async function webhookRoutes(app: FastifyInstance) {
|
||||
|
||||
// List webhook subscriptions
|
||||
app.get('/webhooks/subscriptions', async req => {
|
||||
const productId = requireWebhooksRead(req);
|
||||
const productId = await requireWebhooksRead(req);
|
||||
return repo.listSubscriptions(productId);
|
||||
});
|
||||
|
||||
// Create a webhook subscription
|
||||
app.post('/webhooks/subscriptions', async req => {
|
||||
const access = requireWebhooksWrite(req);
|
||||
const access = await requireWebhooksWrite(req);
|
||||
const parsed = CreateWebhookSubscriptionSchema.safeParse(req.body);
|
||||
if (!parsed.success) {
|
||||
throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; '));
|
||||
@ -58,7 +58,7 @@ export async function webhookRoutes(app: FastifyInstance) {
|
||||
|
||||
// Get a specific subscription
|
||||
app.get('/webhooks/subscriptions/:id', async req => {
|
||||
const productId = requireWebhooksRead(req);
|
||||
const productId = await requireWebhooksRead(req);
|
||||
const { id } = req.params as { id: string };
|
||||
|
||||
const sub = await repo.getSubscription(id, productId);
|
||||
@ -70,7 +70,7 @@ export async function webhookRoutes(app: FastifyInstance) {
|
||||
|
||||
// Update a subscription
|
||||
app.patch('/webhooks/subscriptions/:id', async req => {
|
||||
const access = requireWebhooksWrite(req);
|
||||
const access = await requireWebhooksWrite(req);
|
||||
const { id } = req.params as { id: string };
|
||||
|
||||
const parsed = UpdateWebhookSubscriptionSchema.safeParse(req.body);
|
||||
@ -85,7 +85,7 @@ export async function webhookRoutes(app: FastifyInstance) {
|
||||
|
||||
// Delete a subscription
|
||||
app.delete('/webhooks/subscriptions/:id', async req => {
|
||||
const productId = requireWebhooksWrite(req).productId;
|
||||
const productId = (await requireWebhooksWrite(req)).productId;
|
||||
const { id } = req.params as { id: string };
|
||||
|
||||
const deleted = await repo.deleteSubscription(id, productId);
|
||||
@ -95,7 +95,7 @@ export async function webhookRoutes(app: FastifyInstance) {
|
||||
|
||||
// List deliveries for a subscription
|
||||
app.get('/webhooks/subscriptions/:id/deliveries', async req => {
|
||||
requireWebhooksRead(req);
|
||||
await requireWebhooksRead(req);
|
||||
const { id } = req.params as { id: string };
|
||||
const query = req.query as Record<string, string>;
|
||||
return repo.listDeliveries(id, {
|
||||
@ -105,7 +105,7 @@ export async function webhookRoutes(app: FastifyInstance) {
|
||||
|
||||
// Test delivery — send a test event to a specific subscription
|
||||
app.post('/webhooks/subscriptions/:id/test', async req => {
|
||||
const productId = requireWebhooksWrite(req).productId;
|
||||
const productId = (await requireWebhooksWrite(req)).productId;
|
||||
const { id } = req.params as { id: string };
|
||||
|
||||
const sub = await repo.getSubscription(id, productId);
|
||||
@ -123,7 +123,7 @@ export async function webhookRoutes(app: FastifyInstance) {
|
||||
|
||||
// Rotate subscription secret
|
||||
app.post('/webhooks/subscriptions/:id/rotate-secret', async req => {
|
||||
const productId = requireWebhooksWrite(req).productId;
|
||||
const productId = (await requireWebhooksWrite(req)).productId;
|
||||
const { id } = req.params as { id: string };
|
||||
|
||||
const sub = await repo.getSubscription(id, productId);
|
||||
|
||||
Loading…
Reference in New Issue
Block a user