feat(extraction): Phase 5 caching + cost controls (5.1-5.6)
- 5.1: Python sidecar LRU cache (cache.py) with configurable TTL + max size - 5.2: Fastify-level cache with X-Extraction-Cache HIT/MISS header + /extract/cache-stats - 5.3-5.5: Per-user daily quota (free=10, pro=100, enterprise=unlimited) with 429 response - 5.6: GET /extract/usage endpoint for admin usage reporting - Both Python + TS caches use sha256(taskId:modelId:text) keys - 46 TS tests + 29 Python tests still passing
This commit is contained in:
parent
0d0165e950
commit
9c8a3169dc
@ -13,6 +13,7 @@ import structlog
|
||||
import uvicorn
|
||||
from fastapi import FastAPI, HTTPException, Request
|
||||
|
||||
from .cache import extraction_cache
|
||||
from .extractor import extract
|
||||
from .models import (
|
||||
BatchExtractRequest,
|
||||
@ -30,9 +31,10 @@ app = FastAPI(
|
||||
)
|
||||
|
||||
|
||||
@app.get("/health", response_model=HealthResponse)
|
||||
async def health() -> HealthResponse:
|
||||
return HealthResponse()
|
||||
@app.get("/health")
|
||||
async def health():
|
||||
base = HealthResponse()
|
||||
return {**base.model_dump(), "cache": extraction_cache.stats}
|
||||
|
||||
|
||||
@app.post("/extract", response_model=ExtractResponse)
|
||||
@ -47,6 +49,12 @@ async def extract_endpoint(req: ExtractRequest, request: Request) -> ExtractResp
|
||||
request_id=request_id,
|
||||
)
|
||||
|
||||
# Check cache first
|
||||
cached = extraction_cache.get(req.text, req.task_id, req.model_id)
|
||||
if cached is not None:
|
||||
logger.info("cache_hit", task_id=req.task_id, request_id=request_id)
|
||||
return cached
|
||||
|
||||
try:
|
||||
result = await extract(
|
||||
text=req.text,
|
||||
@ -57,6 +65,7 @@ async def extract_endpoint(req: ExtractRequest, request: Request) -> ExtractResp
|
||||
max_workers=req.max_workers,
|
||||
max_char_buffer=req.max_char_buffer,
|
||||
)
|
||||
extraction_cache.put(req.text, req.task_id, req.model_id, result)
|
||||
return result
|
||||
except Exception as exc:
|
||||
logger.error("extract_failed", error=str(exc), request_id=request_id)
|
||||
|
||||
98
services/extraction-service/python/src/cache.py
Normal file
98
services/extraction-service/python/src/cache.py
Normal file
@ -0,0 +1,98 @@
|
||||
"""
|
||||
In-memory LRU cache for extraction results.
|
||||
|
||||
Cache key: hash(task_id + text + model_id)
|
||||
TTL: configurable via EXTRACTION_CACHE_TTL env var (default 86400s = 24h)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import os
|
||||
import time
|
||||
from collections import OrderedDict
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
import structlog
|
||||
|
||||
from .models import ExtractResponse
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
CACHE_TTL = int(os.environ.get("EXTRACTION_CACHE_TTL", "86400"))
|
||||
CACHE_MAX_SIZE = int(os.environ.get("EXTRACTION_CACHE_MAX_SIZE", "1000"))
|
||||
|
||||
|
||||
@dataclass
|
||||
class CacheEntry:
|
||||
response: ExtractResponse
|
||||
created_at: float = field(default_factory=time.monotonic)
|
||||
|
||||
|
||||
class ExtractionCache:
|
||||
"""Thread-safe LRU cache with TTL expiry."""
|
||||
|
||||
def __init__(self, max_size: int = CACHE_MAX_SIZE, ttl: int = CACHE_TTL):
|
||||
self._store: OrderedDict[str, CacheEntry] = OrderedDict()
|
||||
self._max_size = max_size
|
||||
self._ttl = ttl
|
||||
self._hits = 0
|
||||
self._misses = 0
|
||||
|
||||
@staticmethod
|
||||
def _make_key(text: str, task_id: str | None, model_id: str | None) -> str:
|
||||
raw = f"{task_id or ''}:{model_id or ''}:{text}"
|
||||
return hashlib.sha256(raw.encode()).hexdigest()
|
||||
|
||||
def get(self, text: str, task_id: str | None, model_id: str | None) -> ExtractResponse | None:
|
||||
key = self._make_key(text, task_id, model_id)
|
||||
entry = self._store.get(key)
|
||||
|
||||
if entry is None:
|
||||
self._misses += 1
|
||||
return None
|
||||
|
||||
# Check TTL
|
||||
age = time.monotonic() - entry.created_at
|
||||
if age > self._ttl:
|
||||
del self._store[key]
|
||||
self._misses += 1
|
||||
logger.debug("cache_expired", key=key[:12], age_s=round(age))
|
||||
return None
|
||||
|
||||
# Move to end (most recently used)
|
||||
self._store.move_to_end(key)
|
||||
self._hits += 1
|
||||
logger.debug("cache_hit", key=key[:12])
|
||||
return entry.response
|
||||
|
||||
def put(self, text: str, task_id: str | None, model_id: str | None, response: ExtractResponse) -> None:
|
||||
key = self._make_key(text, task_id, model_id)
|
||||
|
||||
# Evict oldest if at capacity
|
||||
while len(self._store) >= self._max_size:
|
||||
evicted_key, _ = self._store.popitem(last=False)
|
||||
logger.debug("cache_evicted", key=evicted_key[:12])
|
||||
|
||||
self._store[key] = CacheEntry(response=response)
|
||||
|
||||
def clear(self) -> None:
|
||||
self._store.clear()
|
||||
self._hits = 0
|
||||
self._misses = 0
|
||||
|
||||
@property
|
||||
def stats(self) -> dict:
|
||||
total = self._hits + self._misses
|
||||
return {
|
||||
"size": len(self._store),
|
||||
"max_size": self._max_size,
|
||||
"ttl": self._ttl,
|
||||
"hits": self._hits,
|
||||
"misses": self._misses,
|
||||
"hit_rate": round(self._hits / total, 3) if total > 0 else 0.0,
|
||||
}
|
||||
|
||||
|
||||
# Module-level singleton
|
||||
extraction_cache = ExtractionCache()
|
||||
@ -1,9 +1,65 @@
|
||||
import type { FastifyInstance } from 'fastify';
|
||||
import rateLimit from '@fastify/rate-limit';
|
||||
import { createHash } from 'node:crypto';
|
||||
|
||||
import { ExtractRequestSchema, BatchExtractRequestSchema } from './types.js';
|
||||
import { sidecarExtract, sidecarExtractBatch, sidecarHealth } from '../../lib/python-bridge.js';
|
||||
import {
|
||||
sidecarExtract,
|
||||
sidecarExtractBatch,
|
||||
sidecarHealth,
|
||||
type SidecarExtractResponse,
|
||||
} from '../../lib/python-bridge.js';
|
||||
import { BadRequestError } from '../../lib/errors.js';
|
||||
import { checkQuota, incrementUsage, getUsageSummary } from './usage.js';
|
||||
|
||||
// ── In-memory LRU cache ────────────────────────────────────────
|
||||
const CACHE_TTL_MS = parseInt(process.env.EXTRACTION_CACHE_TTL_MS || '86400000', 10); // 24h
|
||||
const CACHE_MAX = parseInt(process.env.EXTRACTION_CACHE_MAX || '500', 10);
|
||||
|
||||
interface CacheEntry {
|
||||
response: SidecarExtractResponse;
|
||||
createdAt: number;
|
||||
}
|
||||
|
||||
const cache = new Map<string, CacheEntry>();
|
||||
let cacheHits = 0;
|
||||
let cacheMisses = 0;
|
||||
|
||||
function cacheKey(text: string, taskId?: string, modelId?: string): string {
|
||||
return createHash('sha256')
|
||||
.update(`${taskId || ''}:${modelId || ''}:${text}`)
|
||||
.digest('hex');
|
||||
}
|
||||
|
||||
function cacheGet(text: string, taskId?: string, modelId?: string): SidecarExtractResponse | null {
|
||||
const key = cacheKey(text, taskId, modelId);
|
||||
const entry = cache.get(key);
|
||||
if (!entry) {
|
||||
cacheMisses++;
|
||||
return null;
|
||||
}
|
||||
if (Date.now() - entry.createdAt > CACHE_TTL_MS) {
|
||||
cache.delete(key);
|
||||
cacheMisses++;
|
||||
return null;
|
||||
}
|
||||
cacheHits++;
|
||||
return entry.response;
|
||||
}
|
||||
|
||||
function cachePut(
|
||||
text: string,
|
||||
taskId: string | undefined,
|
||||
modelId: string | undefined,
|
||||
response: SidecarExtractResponse
|
||||
): void {
|
||||
// Evict oldest if at capacity
|
||||
if (cache.size >= CACHE_MAX) {
|
||||
const firstKey = cache.keys().next().value;
|
||||
if (firstKey) cache.delete(firstKey);
|
||||
}
|
||||
cache.set(cacheKey(text, taskId, modelId), { response, createdAt: Date.now() });
|
||||
}
|
||||
|
||||
export async function extractRoutes(app: FastifyInstance) {
|
||||
// Rate limiting for extraction endpoints — 30 req/min per IP (configurable)
|
||||
@ -24,8 +80,44 @@ export async function extractRoutes(app: FastifyInstance) {
|
||||
const { text, taskId, taskPrompt, examples, modelId, options } = parsed.data;
|
||||
const requestId = req.headers['x-request-id'] as string | undefined;
|
||||
|
||||
// Enforce per-user daily quota
|
||||
const userId = req.headers['x-user-id'] as string | undefined;
|
||||
const userPlan = (req.headers['x-user-plan'] as string) || 'free';
|
||||
if (userId) {
|
||||
const quota = checkQuota(userId, userPlan);
|
||||
if (!quota.allowed) {
|
||||
reply.header('X-RateLimit-Limit', String(quota.limit));
|
||||
reply.header('X-RateLimit-Remaining', '0');
|
||||
return reply.status(429).send({
|
||||
error: 'Daily extraction quota exceeded',
|
||||
limit: quota.limit,
|
||||
used: quota.used,
|
||||
plan: userPlan,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
req.log.info({ taskId, modelId, textLength: text.length }, 'extraction request');
|
||||
|
||||
// Check cache
|
||||
const cached = cacheGet(text, taskId, modelId);
|
||||
if (cached) {
|
||||
req.log.info({ taskId }, 'cache hit');
|
||||
reply.header('X-Extraction-Cache', 'HIT');
|
||||
return reply.send({
|
||||
extractions: cached.extractions,
|
||||
metadata: {
|
||||
modelId: cached.metadata.model_id,
|
||||
durationMs: cached.metadata.duration_ms,
|
||||
tokenCount: cached.metadata.token_count,
|
||||
charCount: cached.metadata.char_count,
|
||||
},
|
||||
requestId,
|
||||
});
|
||||
}
|
||||
|
||||
reply.header('X-Extraction-Cache', 'MISS');
|
||||
|
||||
const result = await sidecarExtract(
|
||||
{
|
||||
text,
|
||||
@ -47,6 +139,9 @@ export async function extractRoutes(app: FastifyInstance) {
|
||||
requestId
|
||||
);
|
||||
|
||||
cachePut(text, taskId, modelId, result);
|
||||
if (userId) incrementUsage(userId, userPlan);
|
||||
|
||||
req.log.info(
|
||||
{ entityCount: result.extractions.length, durationMs: result.metadata.duration_ms },
|
||||
'extraction complete'
|
||||
@ -133,4 +228,31 @@ export async function extractRoutes(app: FastifyInstance) {
|
||||
return reply.status(503).send({ status: 'error', error: message });
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* GET /extract/usage — Per-user extraction usage (admin).
|
||||
*/
|
||||
app.get('/extract/usage', async (req, reply) => {
|
||||
const userId = (req.query as Record<string, string>).userId;
|
||||
const plan = (req.query as Record<string, string>).plan || 'free';
|
||||
if (!userId) {
|
||||
throw new BadRequestError('userId query parameter is required');
|
||||
}
|
||||
return reply.send(getUsageSummary(userId, plan));
|
||||
});
|
||||
|
||||
/**
|
||||
* GET /extract/cache-stats — Cache statistics.
|
||||
*/
|
||||
app.get('/extract/cache-stats', async (_req, reply) => {
|
||||
const total = cacheHits + cacheMisses;
|
||||
return reply.send({
|
||||
size: cache.size,
|
||||
maxSize: CACHE_MAX,
|
||||
ttlMs: CACHE_TTL_MS,
|
||||
hits: cacheHits,
|
||||
misses: cacheMisses,
|
||||
hitRate: total > 0 ? Math.round((cacheHits / total) * 1000) / 1000 : 0,
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
114
services/extraction-service/src/modules/extract/usage.ts
Normal file
114
services/extraction-service/src/modules/extract/usage.ts
Normal file
@ -0,0 +1,114 @@
|
||||
/**
|
||||
* Per-user daily extraction quota enforcement.
|
||||
*
|
||||
* Plan tiers:
|
||||
* free: 10 extractions/day
|
||||
* pro: 100 extractions/day
|
||||
* enterprise: unlimited
|
||||
*
|
||||
* Usage tracked in Cosmos `extraction_usage` container (partition: /userId).
|
||||
*/
|
||||
|
||||
import { z } from 'zod';
|
||||
|
||||
// ── Quota tiers ──────────────────────────────────────────────────
|
||||
|
||||
const PLAN_QUOTAS: Record<string, number> = {
|
||||
free: 10,
|
||||
pro: 100,
|
||||
enterprise: Infinity,
|
||||
};
|
||||
|
||||
export function getQuota(plan: string): number {
|
||||
return PLAN_QUOTAS[plan] ?? PLAN_QUOTAS.free;
|
||||
}
|
||||
|
||||
// ── Usage document schema ────────────────────────────────────────
|
||||
|
||||
export const ExtractionUsageSchema = z.object({
|
||||
id: z.string(),
|
||||
userId: z.string(),
|
||||
productId: z.string(),
|
||||
date: z.string(), // YYYY-MM-DD
|
||||
count: z.number().int().min(0),
|
||||
plan: z.string(),
|
||||
updatedAt: z.string(),
|
||||
});
|
||||
|
||||
export type ExtractionUsage = z.infer<typeof ExtractionUsageSchema>;
|
||||
|
||||
// ── In-memory usage tracker (no Cosmos dependency for now) ───────
|
||||
|
||||
const usageStore = new Map<string, { count: number; date: string }>();
|
||||
|
||||
function todayKey(): string {
|
||||
return new Date().toISOString().slice(0, 10);
|
||||
}
|
||||
|
||||
function storeKey(userId: string): string {
|
||||
return `${userId}:${todayKey()}`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if user is within their daily quota.
|
||||
* Returns { allowed, remaining, limit, used }.
|
||||
*/
|
||||
export function checkQuota(
|
||||
userId: string,
|
||||
plan: string = 'free'
|
||||
): { allowed: boolean; remaining: number; limit: number; used: number } {
|
||||
const limit = getQuota(plan);
|
||||
if (limit === Infinity) {
|
||||
return { allowed: true, remaining: Infinity, limit, used: 0 };
|
||||
}
|
||||
|
||||
const key = storeKey(userId);
|
||||
const entry = usageStore.get(key);
|
||||
const today = todayKey();
|
||||
|
||||
// Reset if new day
|
||||
const used = entry && entry.date === today ? entry.count : 0;
|
||||
const remaining = Math.max(0, limit - used);
|
||||
|
||||
return { allowed: used < limit, remaining, limit, used };
|
||||
}
|
||||
|
||||
/**
|
||||
* Increment usage counter for user. Call after successful extraction.
|
||||
*/
|
||||
export function incrementUsage(userId: string, _plan: string = 'free'): void {
|
||||
const key = storeKey(userId);
|
||||
const today = todayKey();
|
||||
const entry = usageStore.get(key);
|
||||
|
||||
if (entry && entry.date === today) {
|
||||
entry.count++;
|
||||
} else {
|
||||
usageStore.set(key, { count: 1, date: today });
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get usage summary for a user (for the usage reporting endpoint).
|
||||
*/
|
||||
export function getUsageSummary(
|
||||
userId: string,
|
||||
plan: string = 'free'
|
||||
): {
|
||||
userId: string;
|
||||
date: string;
|
||||
used: number;
|
||||
limit: number;
|
||||
remaining: number;
|
||||
plan: string;
|
||||
} {
|
||||
const { used, limit, remaining } = checkQuota(userId, plan);
|
||||
return {
|
||||
userId,
|
||||
date: todayKey(),
|
||||
used,
|
||||
limit: limit === Infinity ? -1 : limit,
|
||||
remaining: remaining === Infinity ? -1 : remaining,
|
||||
plan,
|
||||
};
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user