learning_ai_common_plat/services/extraction-service/src/modules/extract/routes.ts

570 lines
18 KiB
TypeScript

import type { FastifyInstance } from 'fastify';
import rateLimit from '@fastify/rate-limit';
import { createHash } from 'node:crypto';
import { ExtractRequestSchema, BatchExtractRequestSchema } from './types.js';
import {
sidecarExtract,
sidecarExtractBatch,
sidecarHealth,
type SidecarExtractResponse,
} from '../../lib/python-bridge.js';
import { BadRequestError } from '../../lib/errors.js';
import { checkQuota, incrementUsage, getUsageSummary } from './usage.js';
import { recordExtraction, getMetricsSummary } from '../../lib/metrics.js';
import { sidecarBreaker } from '../../lib/circuit-breaker.js';
import { createJob, getJob, listJobs } from './jobs.js';
import {
checkProductRateLimit,
getProductRateLimitStatus,
getRateLimitSummary,
resetProductRateLimit,
} from './product-rate-limit.js';
import {
startHealthMonitoring,
getHealthState,
getHealthSummary,
checkHealthNow,
} from './sidecar-monitor.js';
import { getDeliveryStats, type WebhookConfig } from './webhooks.js';
// ── In-memory LRU cache ────────────────────────────────────────

/**
 * Reads an integer setting from the environment.
 *
 * An unset or empty variable yields `fallback`, as does a value that
 * `parseInt` cannot parse. The last case matters: a `NaN` limit makes every
 * `size >= limit` / `age > ttl` comparison false, which would silently turn
 * the cache into an unbounded, never-expiring map.
 *
 * @param name     Environment variable name.
 * @param fallback Value used when the variable is unset, empty, or not numeric.
 * @returns The parsed integer, or `fallback`.
 */
function readIntEnv(name: string, fallback: number): number {
  const raw = process.env[name];
  if (!raw) return fallback;
  const parsed = parseInt(raw, 10);
  return Number.isNaN(parsed) ? fallback : parsed;
}

/** Maximum age of a cache entry in milliseconds (default 24h). */
const CACHE_TTL_MS = readIntEnv('EXTRACTION_CACHE_TTL_MS', 86400000);
/** Maximum number of cached responses (default 500). */
const CACHE_MAX = readIntEnv('EXTRACTION_CACHE_MAX', 500);
/** One cached sidecar response together with its insertion timestamp. */
interface CacheEntry {
// The sidecar response returned to callers on a cache hit.
response: SidecarExtractResponse;
// `Date.now()` value captured when the entry was stored; compared against CACHE_TTL_MS on reads.
createdAt: number;
}
// Module-level response cache keyed by the SHA-256 hex digest produced by cacheKey().
// A Map iterates in insertion order, which cachePut() relies on when it evicts the first key.
const cache = new Map<string, CacheEntry>();
// Hit/miss counters updated by cacheGet() and reported by GET /extract/cache-stats.
let cacheHits = 0;
let cacheMisses = 0;
/**
 * Derives the cache key for an extraction request.
 *
 * The three fields are JSON-encoded as an array before hashing so that field
 * boundaries stay unambiguous. Plain `:`-joining let different inputs collide
 * whenever a field itself contained a colon (e.g. taskId `a`, modelId `b:c`
 * versus taskId `a:b`, modelId `c`).
 *
 * @param text    Document text being extracted.
 * @param taskId  Optional task identifier; `undefined` and `''` are equivalent.
 * @param modelId Optional model identifier; `undefined` and `''` are equivalent.
 * @returns 64-character lowercase hex SHA-256 digest.
 */
function cacheKey(text: string, taskId?: string, modelId?: string): string {
  const material = JSON.stringify([taskId || '', modelId || '', text]);
  return createHash('sha256').update(material).digest('hex');
}
/**
 * Looks up a cached extraction response.
 *
 * Updates the module-level hit/miss counters. An entry older than
 * CACHE_TTL_MS is deleted and counted as a miss. On a hit the entry is
 * re-inserted so it moves to the end of the Map's iteration order; because
 * eviction removes the first key, this makes eviction least-recently-used
 * rather than first-inserted. `createdAt` is left untouched, so a hit never
 * extends an entry's TTL.
 *
 * @param text    Document text being extracted.
 * @param taskId  Optional task identifier used in the key.
 * @param modelId Optional model identifier used in the key.
 * @returns The cached response, or `null` on a miss or expiry.
 */
function cacheGet(text: string, taskId?: string, modelId?: string): SidecarExtractResponse | null {
  const key = cacheKey(text, taskId, modelId);
  const entry = cache.get(key);
  if (!entry) {
    cacheMisses++;
    return null;
  }
  if (Date.now() - entry.createdAt > CACHE_TTL_MS) {
    cache.delete(key);
    cacheMisses++;
    return null;
  }
  // Refresh recency: delete + set moves the key to the newest position.
  cache.delete(key);
  cache.set(key, entry);
  cacheHits++;
  return entry.response;
}
/**
 * Stores an extraction response in the cache, stamped with the current time.
 *
 * If the key is already present the old entry is removed first, so the fresh
 * entry takes the newest position in the Map's iteration order and no
 * unrelated entry is evicted just to overwrite an existing one. Otherwise,
 * when the cache is at capacity, the first key in iteration order is evicted.
 *
 * @param text     Document text that was extracted.
 * @param taskId   Optional task identifier used in the key.
 * @param modelId  Optional model identifier used in the key.
 * @param response Sidecar response to cache.
 */
function cachePut(
  text: string,
  taskId: string | undefined,
  modelId: string | undefined,
  response: SidecarExtractResponse
): void {
  const key = cacheKey(text, taskId, modelId);
  const replaced = cache.delete(key);
  if (!replaced && cache.size >= CACHE_MAX) {
    const oldestKey = cache.keys().next().value;
    if (oldestKey !== undefined) cache.delete(oldestKey);
  }
  cache.set(key, { response, createdAt: Date.now() });
}
// ── Model registry ───────────────────────────────────────────────

/**
 * Models advertised by GET /extract/models.
 *
 * Written as a compact table, one row per model in the column order
 * [id, provider, description, tier], then expanded into objects with those
 * four keys in that order.
 */
const MODEL_REGISTRY = (
  [
    ['gemini-2.5-flash', 'google', 'Gemini 2.5 Flash (default)', 'standard'],
    ['gemini-2.5-pro', 'google', 'Gemini 2.5 Pro', 'premium'],
    ['gemini-2.0-flash', 'google', 'Gemini 2.0 Flash (legacy)', 'standard'],
    ['gemini-2.5-flash-mock', 'mock', 'Mock extractor (no API key)', 'free'],
  ] as [string, string, string, string][]
).map(([id, provider, description, tier]) => ({ id, provider, description, tier }));
/** Validated shape of a batch extraction request body. */
type BatchExtractRequest = ReturnType<typeof BatchExtractRequestSchema.parse>;

/**
 * Builds the cache scope for a single extraction request.
 *
 * Everything besides the text and model that is forwarded to the sidecar
 * (task id, task prompt, few-shot examples, options) is hashed into one
 * string, so two requests only share a cache entry when their whole
 * extraction configuration matches.
 *
 * @returns 64-character lowercase hex SHA-256 digest.
 */
function extractionCacheScope(
  taskId: unknown,
  taskPrompt: unknown,
  examples: unknown,
  options: unknown
): string {
  const material = JSON.stringify([
    taskId ?? null,
    taskPrompt ?? null,
    examples ?? null,
    options ?? null,
  ]);
  return createHash('sha256').update(material).digest('hex');
}

/**
 * Maps a sidecar failure message to the HTTP status and body sent to the client.
 *
 * Matching is by substring and the checks run in order: timeout, upstream
 * rate limit, model unavailable, invalid request. Anything else becomes 502.
 *
 * @param message Error message reported for the failed sidecar call.
 */
function mapSidecarError(message: string): { status: number; body: Record<string, unknown> } {
  if (message.includes('timeout') || message.includes('Timeout')) {
    return { status: 408, body: { error: 'Extraction timed out', detail: message } };
  }
  if (message.includes('429') || message.includes('rate limit')) {
    return {
      status: 429,
      body: { error: 'Upstream LLM rate limit', detail: message, retryAfter: 60 },
    };
  }
  if (message.includes('Model') && message.includes('unavailable')) {
    return { status: 503, body: { error: 'Model unavailable', detail: message } };
  }
  if (message.includes('400') || message.includes('Invalid')) {
    return { status: 400, body: { error: 'Invalid extraction request', detail: message } };
  }
  return { status: 502, body: { error: 'Extraction sidecar error', detail: message } };
}

/** Converts a sidecar response into the camelCase payload returned by the API. */
function toExtractionPayload(response: SidecarExtractResponse) {
  return {
    extractions: response.extractions,
    metadata: {
      modelId: response.metadata.model_id,
      durationMs: response.metadata.duration_ms,
      tokenCount: response.metadata.token_count,
      charCount: response.metadata.char_count,
    },
  };
}

/**
 * Expands a validated batch request into one sidecar payload per input.
 * The examples and model id are shared by every input of the batch.
 */
function toSidecarBatchRequests(request: BatchExtractRequest) {
  const { inputs, examples, modelId } = request;
  return inputs.map(input => ({
    text: input.text,
    task_id: input.taskId,
    task_prompt: input.taskPrompt,
    examples: examples?.map(e => ({
      text: e.text,
      extractions: e.extractions.map(ex => ({
        extraction_class: ex.extraction_class,
        extraction_text: ex.extraction_text,
        attributes: ex.attributes,
      })),
    })),
    model_id: modelId,
  }));
}

/**
 * Registers the extraction HTTP routes on the given Fastify instance.
 *
 * Side effects at registration time: starts sidecar health monitoring and
 * registers the per-IP rate-limit plugin.
 *
 * Routes: POST /extract, POST /extract/batch, POST|GET /extract/jobs,
 * GET /extract/jobs/:id, GET /extract/models, GET /extract/sidecar-health,
 * GET /extract/metrics, GET /extract/usage, GET /extract/cache-stats,
 * GET /extract/monitoring/sidecar, POST /extract/monitoring/sidecar/check,
 * GET /extract/rate-limits/product, POST /extract/rate-limits/product/reset,
 * GET /extract/webhooks/delivery-stats.
 *
 * @param app Fastify instance to register the routes on.
 */
export async function extractRoutes(app: FastifyInstance) {
  // Start sidecar health monitoring
  startHealthMonitoring();

  // Rate limiting for extraction endpoints — 30 req/min per IP (values are fixed here)
  await app.register(rateLimit, {
    max: 30,
    timeWindow: '1 minute',
    keyGenerator: req => req.ip,
  });

  /**
   * POST /extract — Single document extraction.
   *
   * Order of checks: body validation, per-product rate limit, per-user daily
   * quota, cache lookup, circuit breaker, then the sidecar call.
   */
  app.post('/extract', async (req, reply) => {
    const parsed = ExtractRequestSchema.safeParse(req.body);
    if (!parsed.success) {
      throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; '));
    }
    const { text, taskId, taskPrompt, examples, modelId, options, productId } = parsed.data;
    const requestId = req.headers['x-request-id'] as string | undefined;
    // Header wins over the body field. The same resolved id is used for rate
    // limiting, logs and metrics so a product is attributed consistently.
    const effectiveProductId = (req.headers['x-product-id'] as string) || productId;

    // Check per-product rate limit
    if (effectiveProductId) {
      const productLimit = checkProductRateLimit(effectiveProductId);
      if (!productLimit.allowed) {
        reply.header('X-RateLimit-Limit', String(productLimit.limit));
        reply.header('X-RateLimit-Remaining', '0');
        reply.header('X-RateLimit-Reset', String(Math.ceil(productLimit.resetAt / 1000)));
        reply.header('Retry-After', String(productLimit.retryAfter || 60));
        return reply.status(429).send({
          error: 'Product rate limit exceeded',
          productId: effectiveProductId,
          limit: productLimit.limit,
          resetAt: new Date(productLimit.resetAt).toISOString(),
        });
      }
      reply.header('X-RateLimit-Remaining', String(productLimit.remaining));
    }

    // Enforce per-user daily quota
    const userId = req.headers['x-user-id'] as string | undefined;
    const userPlan = (req.headers['x-user-plan'] as string) || 'free';
    if (userId) {
      const quota = checkQuota(userId, userPlan);
      if (!quota.allowed) {
        reply.header('X-RateLimit-Limit', String(quota.limit));
        reply.header('X-RateLimit-Remaining', '0');
        return reply.status(429).send({
          error: 'Daily extraction quota exceeded',
          limit: quota.limit,
          used: quota.used,
          plan: userPlan,
        });
      }
    }

    req.log.info(
      {
        taskId,
        modelId,
        textLength: text.length,
        userId,
        userPlan,
        productId: effectiveProductId,
      },
      'extraction request'
    );

    // Check cache. The scope folds taskId, taskPrompt, examples and options
    // into the key (passed through the cache's task-id slot), so a request
    // with the same text but a different prompt or examples cannot be served
    // another configuration's result.
    const cacheScope = extractionCacheScope(taskId, taskPrompt, examples, options);
    const cached = cacheGet(text, cacheScope, modelId);
    if (cached) {
      req.log.info({ taskId, userId, cacheHit: true }, 'cache hit');
      recordExtraction({
        taskId,
        modelId,
        productId: effectiveProductId,
        status: 'cache_hit',
      });
      reply.header('X-Extraction-Cache', 'HIT');
      return reply.send({ ...toExtractionPayload(cached), requestId });
    }
    reply.header('X-Extraction-Cache', 'MISS');

    // Circuit breaker — fail fast if sidecar is down
    if (!sidecarBreaker.allowRequest()) {
      recordExtraction({ taskId, modelId, productId: effectiveProductId, status: 'error' });
      return reply.status(503).send({
        error: 'Extraction service temporarily unavailable (circuit open)',
        circuitState: sidecarBreaker.currentState,
      });
    }

    let result: SidecarExtractResponse;
    try {
      result = await sidecarExtract(
        {
          text,
          task_id: taskId,
          task_prompt: taskPrompt,
          examples: examples?.map(e => ({
            text: e.text,
            extractions: e.extractions.map(ex => ({
              extraction_class: ex.extraction_class,
              extraction_text: ex.extraction_text,
              attributes: ex.attributes,
            })),
          })),
          model_id: modelId,
          extraction_passes: options?.extractionPasses,
          max_workers: options?.maxWorkers,
          max_char_buffer: options?.maxCharBuffer,
        },
        requestId
      );
      sidecarBreaker.recordSuccess();
    } catch (err) {
      sidecarBreaker.recordFailure();
      const message = err instanceof Error ? err.message : 'Sidecar error';
      req.log.error({ error: message, taskId, userId }, 'extraction failed');
      recordExtraction({
        taskId,
        modelId,
        productId: effectiveProductId,
        status: 'error',
      });
      // Map sidecar errors to proper HTTP status codes (5.10)
      const failure = mapSidecarError(message);
      return reply.status(failure.status).send(failure.body);
    }

    // Only successful sidecar calls are cached and counted against the quota.
    cachePut(text, cacheScope, modelId, result);
    if (userId) incrementUsage(userId, userPlan);
    recordExtraction({
      taskId,
      modelId,
      productId: effectiveProductId,
      status: 'success',
      durationMs: result.metadata.duration_ms,
      entityCount: result.extractions.length,
    });
    req.log.info(
      {
        taskId,
        modelId,
        entityCount: result.extractions.length,
        durationMs: result.metadata.duration_ms,
        tokenCount: result.metadata.token_count,
        charCount: result.metadata.char_count,
        userId,
        productId: effectiveProductId,
        cacheHit: false,
      },
      'extraction complete'
    );
    return reply.send({ ...toExtractionPayload(result), requestId });
  });

  /**
   * POST /extract/batch — Batch extraction (multiple inputs, shared config).
   * The sidecar is called synchronously; this route does not consult the
   * cache, circuit breaker, product rate limit or user quota.
   */
  app.post('/extract/batch', async (req, reply) => {
    const parsed = BatchExtractRequestSchema.safeParse(req.body);
    if (!parsed.success) {
      throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; '));
    }
    const requestId = req.headers['x-request-id'] as string | undefined;
    req.log.info(
      { inputCount: parsed.data.inputs.length, modelId: parsed.data.modelId },
      'batch extraction request'
    );
    const results = await sidecarExtractBatch(toSidecarBatchRequests(parsed.data), requestId);
    return reply.send({
      results: results.map(r => ({
        extractions: r.extractions,
        metadata: {
          modelId: r.metadata.model_id,
          durationMs: r.metadata.duration_ms,
          tokenCount: r.metadata.token_count,
          charCount: r.metadata.char_count,
        },
      })),
      requestId,
    });
  });

  /**
   * POST /extract/jobs — Submit async batch extraction job.
   * Responds 202 with the job id; results are fetched via GET /extract/jobs/:id.
   */
  app.post('/extract/jobs', async (req, reply) => {
    const parsed = BatchExtractRequestSchema.safeParse(req.body);
    if (!parsed.success) {
      throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; '));
    }
    const { inputs, productId } = parsed.data;
    const requestId = req.headers['x-request-id'] as string | undefined;

    // Check per-product rate limit for async jobs
    const effectiveProductId = (req.headers['x-product-id'] as string) || productId;
    if (effectiveProductId) {
      const productLimit = checkProductRateLimit(effectiveProductId);
      if (!productLimit.allowed) {
        // Same 429 contract as POST /extract, including the reset information.
        reply.header('X-RateLimit-Limit', String(productLimit.limit));
        reply.header('X-RateLimit-Remaining', '0');
        reply.header('X-RateLimit-Reset', String(Math.ceil(productLimit.resetAt / 1000)));
        reply.header('Retry-After', String(productLimit.retryAfter || 60));
        return reply.status(429).send({
          error: 'Product rate limit exceeded',
          productId: effectiveProductId,
          limit: productLimit.limit,
          resetAt: new Date(productLimit.resetAt).toISOString(),
        });
      }
      reply.header('X-RateLimit-Remaining', String(productLimit.remaining));
    }

    const sidecarRequests = toSidecarBatchRequests(parsed.data);

    // Parse optional webhook configuration from request body.
    // These fields are read from the raw body, not from the validated data.
    // NOTE(review): the URL is caller-supplied and the 'default-secret'
    // fallback is a fixed, source-visible value — confirm that the webhook
    // sender restricts target hosts and that unsigned/guessable signatures
    // are acceptable when no secret is provided.
    const body = req.body as Record<string, unknown>;
    let webhookConfig: WebhookConfig | undefined;
    if (body.webhookUrl && typeof body.webhookUrl === 'string') {
      webhookConfig = {
        url: body.webhookUrl,
        secret: typeof body.webhookSecret === 'string' ? body.webhookSecret : 'default-secret',
        retryAttempts:
          typeof body.webhookRetryAttempts === 'number' ? body.webhookRetryAttempts : 3,
      };
    }

    const job = createJob(sidecarRequests, requestId, webhookConfig);
    req.log.info({ jobId: job.id, inputCount: inputs.length }, 'async job created');
    return reply.status(202).send({
      jobId: job.id,
      status: job.status,
      progress: job.progress,
      createdAt: job.createdAt,
      webhookConfigured: !!webhookConfig,
    });
  });

  /**
   * GET /extract/jobs/:id — Get async job status and results.
   * `results` is only included once the job is completed or failed;
   * `errors` is only included when non-empty.
   */
  app.get('/extract/jobs/:id', async (req, reply) => {
    const { id } = req.params as { id: string };
    const job = getJob(id);
    if (!job) {
      return reply.status(404).send({ error: 'Job not found' });
    }
    const finished = job.status === 'completed' || job.status === 'failed';
    return reply.send({
      id: job.id,
      status: job.status,
      progress: job.progress,
      results: finished
        ? job.results.map(r => ({
            extractions: r.extractions,
            metadata: {
              modelId: r.metadata.model_id,
              durationMs: r.metadata.duration_ms,
              charCount: r.metadata.char_count,
            },
          }))
        : undefined,
      errors: job.errors.length > 0 ? job.errors : undefined,
      createdAt: job.createdAt,
      completedAt: job.completedAt,
    });
  });

  /**
   * GET /extract/jobs — List recent async jobs (summary fields only, no results).
   */
  app.get('/extract/jobs', async (_req, reply) => {
    const jobs = listJobs();
    return reply.send({
      jobs: jobs.map(j => ({
        id: j.id,
        status: j.status,
        progress: j.progress,
        inputCount: j.inputs.length,
        createdAt: j.createdAt,
        completedAt: j.completedAt,
      })),
    });
  });

  /**
   * GET /extract/models — List available model providers.
   * The default comes from DEFAULT_MODEL_ID, falling back to 'gemini-2.5-flash'.
   */
  app.get('/extract/models', async (_req, reply) => {
    return reply.send({
      models: MODEL_REGISTRY,
      default: process.env.DEFAULT_MODEL_ID || 'gemini-2.5-flash',
    });
  });

  /**
   * GET /extract/sidecar-health — Check Python sidecar status.
   * Responds 503 with the error message when the health call throws.
   */
  app.get('/extract/sidecar-health', async (_req, reply) => {
    try {
      const health = await sidecarHealth();
      return reply.send({ status: 'ok', sidecar: health });
    } catch (err) {
      const message = err instanceof Error ? err.message : 'Sidecar unavailable';
      return reply.status(503).send({ status: 'error', error: message });
    }
  });

  /**
   * GET /extract/metrics — Extraction metrics summary.
   */
  app.get('/extract/metrics', async (_req, reply) => {
    return reply.send(getMetricsSummary());
  });

  /**
   * GET /extract/usage — Per-user extraction usage (admin).
   * Requires the `userId` query parameter; `plan` defaults to 'free'.
   */
  app.get('/extract/usage', async (req, reply) => {
    const query = req.query as Record<string, string>;
    const userId = query.userId;
    const plan = query.plan || 'free';
    if (!userId) {
      throw new BadRequestError('userId query parameter is required');
    }
    return reply.send(getUsageSummary(userId, plan));
  });

  /**
   * GET /extract/cache-stats — Cache statistics.
   * `hitRate` is rounded to three decimals and is 0 before any lookup.
   */
  app.get('/extract/cache-stats', async (_req, reply) => {
    const total = cacheHits + cacheMisses;
    return reply.send({
      size: cache.size,
      maxSize: CACHE_MAX,
      ttlMs: CACHE_TTL_MS,
      hits: cacheHits,
      misses: cacheMisses,
      hitRate: total > 0 ? Math.round((cacheHits / total) * 1000) / 1000 : 0,
    });
  });

  /**
   * GET /extract/monitoring/sidecar — Sidecar health monitoring status.
   */
  app.get('/extract/monitoring/sidecar', async (_req, reply) => {
    return reply.send({
      state: getHealthState(),
      summary: getHealthSummary(),
    });
  });

  /**
   * POST /extract/monitoring/sidecar/check — Trigger immediate health check.
   */
  app.post('/extract/monitoring/sidecar/check', async (req, reply) => {
    const check = await checkHealthNow();
    req.log.info({ sidecarStatus: check.status }, 'manual health check');
    return reply.send({
      check,
      state: getHealthState(),
    });
  });

  /**
   * GET /extract/rate-limits/product — Get product rate limit status.
   * With `productId` in the query: that product's status; otherwise the summary.
   */
  app.get('/extract/rate-limits/product', async (req, reply) => {
    const productId = (req.query as Record<string, string>).productId;
    if (productId) {
      return reply.send(getProductRateLimitStatus(productId));
    }
    return reply.send(getRateLimitSummary());
  });

  /**
   * POST /extract/rate-limits/product/reset — Reset product rate limit (admin).
   * Requires `productId` in the body; responds with the status after the reset.
   */
  app.post('/extract/rate-limits/product/reset', async (req, reply) => {
    const productId = (req.body as Record<string, string>)?.productId;
    if (!productId) {
      throw new BadRequestError('productId is required');
    }
    resetProductRateLimit(productId);
    req.log.info({ productId }, 'product rate limit reset');
    return reply.send({
      productId,
      reset: true,
      newStatus: getProductRateLimitStatus(productId),
    });
  });

  /**
   * GET /extract/webhooks/delivery-stats — Webhook delivery statistics.
   */
  app.get('/extract/webhooks/delivery-stats', async (_req, reply) => {
    return reply.send(getDeliveryStats());
  });
}