learning_ai_common_plat/services/extraction-service/python/src/app.py
saravanakumardb1 9c8a3169dc feat(extraction): Phase 5 caching + cost controls (5.1-5.6)
- 5.1: Python sidecar LRU cache (cache.py) with configurable TTL + max size
- 5.2: Fastify-level cache with X-Extraction-Cache HIT/MISS header + /extract/cache-stats
- 5.3-5.5: Per-user daily quota (free=10, pro=100, enterprise=unlimited) with 429 response
- 5.6: GET /extract/usage endpoint for admin usage reporting
- Both Python + TS caches use sha256(taskId:modelId:text) keys
- 46 TS tests + 29 Python tests still passing
2026-02-14 14:02:21 -08:00

125 lines
3.5 KiB
Python

"""
Extraction sidecar — FastAPI app running LangExtract.
Internal service, not meant to be exposed publicly; it is called by the Fastify extraction-service.
Default port: 4006 (override with SIDECAR_PORT; the bind address comes from SIDECAR_HOST).
"""
from __future__ import annotations
import os
import structlog
import uvicorn
from fastapi import FastAPI, HTTPException, Request
from .cache import extraction_cache
from .extractor import extract
from .models import (
BatchExtractRequest,
ExtractRequest,
ExtractResponse,
HealthResponse,
)
# ASGI application served by uvicorn (main() loads it as "src.app:app").
app = FastAPI(
    version="0.1.0",
    title="Extraction Sidecar",
    description="Internal LangExtract sidecar for extraction-service",
)

# Module-level structured logger shared by every endpoint below.
logger = structlog.get_logger(__name__)
@app.get("/health")
async def health():
    """Return the default health payload extended with extraction-cache stats.

    The body is ``HealthResponse().model_dump()`` with an extra ``"cache"`` key
    holding ``extraction_cache.stats``.
    """
    payload = HealthResponse().model_dump()
    payload["cache"] = extraction_cache.stats
    return payload
@app.post("/extract", response_model=ExtractResponse)
async def extract_endpoint(req: ExtractRequest, request: Request) -> ExtractResponse:
    """Run one LangExtract extraction, serving repeat requests from the cache.

    Args:
        req: The extraction request (text, task definition, model and tuning knobs).
        request: The raw HTTP request; its ``x-request-id`` header (empty string
            when absent) is attached to every log line for correlation.

    Returns:
        The cached result when one exists for (text, task_id, model_id), otherwise
        the freshly computed extraction result.

    Raises:
        HTTPException: status 500 carrying the error message when extraction fails.
    """
    request_id = request.headers.get("x-request-id", "")
    logger.info(
        "extract_request",
        task_id=req.task_id,
        model_id=req.model_id,
        text_length=len(req.text),
        request_id=request_id,
    )

    # NOTE(review): the cache key is (text, task_id, model_id) only; task_prompt,
    # examples and the extraction_passes/max_workers/max_char_buffer knobs are not
    # part of it, so they are assumed fixed per task_id — confirm with callers.
    cached = extraction_cache.get(req.text, req.task_id, req.model_id)
    if cached is not None:
        logger.info("cache_hit", task_id=req.task_id, request_id=request_id)
        return cached

    # Keep the try narrow: only the extraction itself maps to "extract_failed".
    try:
        result = await extract(
            text=req.text,
            task_prompt=req.task_prompt,
            examples=[e.model_dump() for e in req.examples] if req.examples else None,
            model_id=req.model_id,
            extraction_passes=req.extraction_passes,
            max_workers=req.max_workers,
            max_char_buffer=req.max_char_buffer,
        )
    except Exception as exc:
        # logger.exception keeps the traceback, which str(exc) alone loses.
        logger.exception("extract_failed", error=str(exc), request_id=request_id)
        raise HTTPException(status_code=500, detail=str(exc)) from exc

    # Caching is best-effort: a cache-write failure must not discard a
    # successful (and already paid-for) extraction.
    try:
        extraction_cache.put(req.text, req.task_id, req.model_id, result)
    except Exception:
        logger.warning(
            "cache_put_failed",
            task_id=req.task_id,
            request_id=request_id,
            exc_info=True,
        )
    return result
@app.post("/extract/batch", response_model=list[ExtractResponse])
async def extract_batch_endpoint(
    batch: BatchExtractRequest,
    request: Request,
) -> list[ExtractResponse]:
    """Run several extractions sequentially, using the same cache as ``/extract``.

    Items are processed in order. Each item is looked up in the extraction cache
    by (text, task_id, model_id) first. Only cache misses call ``extract``, and
    successful results are written back to the cache.

    Args:
        batch: The batch whose ``requests`` are extracted in order.
        request: The raw HTTP request; its ``x-request-id`` header (empty string
            when absent) is attached to every log line for correlation.

    Returns:
        One result per input request, in input order.

    Raises:
        HTTPException: status 500 carrying the error message of the first item
            that fails; the rest of the batch is not processed.
    """
    request_id = request.headers.get("x-request-id", "")
    logger.info(
        "batch_extract_request",
        input_count=len(batch.requests),
        request_id=request_id,
    )

    results: list[ExtractResponse] = []
    for index, req in enumerate(batch.requests):
        # Same cache lookup as the single-item endpoint, so repeated batch inputs
        # are not re-extracted (and re-billed) every time.
        cached = extraction_cache.get(req.text, req.task_id, req.model_id)
        if cached is not None:
            logger.info(
                "cache_hit",
                task_id=req.task_id,
                request_id=request_id,
                batch_index=index,
            )
            results.append(cached)
            continue

        try:
            result = await extract(
                text=req.text,
                task_prompt=req.task_prompt,
                examples=[e.model_dump() for e in req.examples] if req.examples else None,
                model_id=req.model_id,
                extraction_passes=req.extraction_passes,
                max_workers=req.max_workers,
                max_char_buffer=req.max_char_buffer,
            )
        except Exception as exc:
            # Record which item failed, plus the traceback.
            logger.exception(
                "batch_item_failed",
                error=str(exc),
                request_id=request_id,
                batch_index=index,
                task_id=req.task_id,
            )
            raise HTTPException(status_code=500, detail=str(exc)) from exc

        # Best-effort cache write; never fail the batch because caching failed.
        try:
            extraction_cache.put(req.text, req.task_id, req.model_id, result)
        except Exception:
            logger.warning(
                "cache_put_failed",
                task_id=req.task_id,
                request_id=request_id,
                batch_index=index,
                exc_info=True,
            )
        results.append(result)
    return results
def main() -> None:
    """Start the sidecar under uvicorn.

    Reads SIDECAR_PORT (default "4006", parsed as int) and SIDECAR_HOST
    (default "0.0.0.0"). Auto-reload is turned on only when NODE_ENV equals
    "development".
    """
    env = os.environ
    port = int(env.get("SIDECAR_PORT", "4006"))
    host = env.get("SIDECAR_HOST", "0.0.0.0")
    logger.info("sidecar_starting", port=port, host=host)

    dev_mode = env.get("NODE_ENV") == "development"
    # uvicorn is given an import string (not the app object), which its reload mode requires.
    uvicorn.run("src.app:app", host=host, port=port, reload=dev_mode, log_level="info")


if __name__ == "__main__":
    main()