diff --git a/services/extraction-service/python/src/extractor.py b/services/extraction-service/python/src/extractor.py index 92e3fe68..bbfbca66 100644 --- a/services/extraction-service/python/src/extractor.py +++ b/services/extraction-service/python/src/extractor.py @@ -16,6 +16,45 @@ logger = structlog.get_logger(__name__) DEFAULT_MODEL_ID = os.environ.get("DEFAULT_MODEL_ID", "gemini-2.5-flash") +# ── Language detection ───────────────────────────────────────────── + +LANG_PATTERNS: list[tuple[str, str, list[str]]] = [ + ("es", "Spanish", ["el", "la", "los", "las", "de", "en", "que", "por", "con", "para"]), + ("fr", "French", ["le", "la", "les", "des", "une", "est", "que", "dans", "pour", "avec"]), + ("de", "German", ["der", "die", "das", "ein", "eine", "ist", "und", "oder", "aber", "nicht"]), + ("pt", "Portuguese", ["o", "os", "as", "de", "em", "que", "por", "com", "para", "como"]), + ("ja", "Japanese", []), + ("zh", "Chinese", []), + ("ko", "Korean", []), +] + + +def detect_language(text: str) -> tuple[str, str, float]: + """Detect language from text. Returns (code, name, confidence).""" + import re + + # CJK detection via unicode ranges + if re.search(r"[\u3040-\u309F\u30A0-\u30FF\u4E00-\u9FFF]", text): + if re.search(r"[\u3040-\u309F\u30A0-\u30FF]", text): + return ("ja", "Japanese", 0.9) + return ("zh", "Chinese", 0.85) + if re.search(r"[\uAC00-\uD7AF]", text): + return ("ko", "Korean", 0.9) + if re.search(r"[\u0600-\u06FF]", text): + return ("ar", "Arabic", 0.9) + + words = text.lower().split() + for code, name, keywords in LANG_PATTERNS: + if not keywords: + continue + matches = sum(1 for w in words if w in keywords) + if matches >= 3: + confidence = min(0.95, 0.5 + matches * 0.05) + return (code, name, confidence) + + return ("en", "English", 0.85) + + async def extract( text: str, task_prompt: str | None = None, @@ -24,6 +63,7 @@ async def extract( extraction_passes: int | None = None, max_workers: int | None = None, max_char_buffer: int | None = None, + language: str | None = None, ) -> ExtractResponse: """ Run LangExtract on the given text. @@ -55,8 +95,17 @@ async def extract( "model_id": resolved_model, } + # Multi-language support: detect language and enrich prompt + lang_code, lang_name, lang_conf = detect_language(text) if not language else (language, language, 1.0) + lang_hint = "" + if lang_code != "en" and lang_conf >= 0.7: + lang_hint = f"\nIMPORTANT: The input text is in {lang_name}. Extract entities in their original language but use English for class labels." + logger.info("multilang_detected", language=lang_code, confidence=round(lang_conf, 2)) + if task_prompt: - lx_kwargs["prompt_description"] = task_prompt + lx_kwargs["prompt_description"] = task_prompt + lang_hint + elif lang_hint: + lx_kwargs["prompt_description"] = f"Extract structured entities from the text.{lang_hint}" if examples: lx_kwargs["examples"] = examples @@ -104,6 +153,7 @@ async def extract( model_id=resolved_model, duration_ms=round(duration_ms, 2), char_count=len(text), + language=lang_code if lang_code != "en" else None, ), ) diff --git a/services/extraction-service/python/src/models.py b/services/extraction-service/python/src/models.py index 2ba681b3..948e753a 100644 --- a/services/extraction-service/python/src/models.py +++ b/services/extraction-service/python/src/models.py @@ -39,6 +39,7 @@ class ExtractMetadata(BaseModel): duration_ms: float token_count: int | None = None char_count: int + language: str | None = None class ExtractResponse(BaseModel): diff --git a/services/extraction-service/src/modules/extract/jobs.ts b/services/extraction-service/src/modules/extract/jobs.ts new file mode 100644 index 00000000..da7a6190 --- /dev/null +++ b/services/extraction-service/src/modules/extract/jobs.ts @@ -0,0 +1,95 @@ +/** + * Async extraction job queue. + * + * For large batch requests, callers can submit an async job and poll for results. + * Jobs are stored in-memory (Cosmos persistence deferred to Phase 7). + * + * Flow: POST /extract/jobs → { jobId } → GET /extract/jobs/:id → { status, results } + */ + +import { randomUUID } from 'node:crypto'; +import { + sidecarExtract, + type SidecarExtractRequest, + type SidecarExtractResponse, +} from '../../lib/python-bridge.js'; + +export type JobStatus = 'pending' | 'processing' | 'completed' | 'failed'; + +export interface ExtractionJob { + id: string; + status: JobStatus; + inputs: SidecarExtractRequest[]; + results: SidecarExtractResponse[]; + errors: Array<{ index: number; error: string }>; + progress: { completed: number; total: number }; + createdAt: string; + completedAt?: string; +} + +const jobStore = new Map(); + +/** + * Create a new async extraction job and start processing in background. + */ +export function createJob(inputs: SidecarExtractRequest[], requestId?: string): ExtractionJob { + const job: ExtractionJob = { + id: randomUUID(), + status: 'pending', + inputs, + results: [], + errors: [], + progress: { completed: 0, total: inputs.length }, + createdAt: new Date().toISOString(), + }; + + jobStore.set(job.id, job); + + // Start processing in background (non-blocking) + processJob(job, requestId).catch(() => { + job.status = 'failed'; + }); + + return job; +} + +/** + * Get job by ID. + */ +export function getJob(jobId: string): ExtractionJob | undefined { + return jobStore.get(jobId); +} + +/** + * List recent jobs (last 50). + */ +export function listJobs(limit = 50): ExtractionJob[] { + return [...jobStore.values()] + .sort((a, b) => b.createdAt.localeCompare(a.createdAt)) + .slice(0, limit); +} + +// ── Internal ───────────────────────────────────────────────────── + +async function processJob(job: ExtractionJob, requestId?: string): Promise { + job.status = 'processing'; + + for (let i = 0; i < job.inputs.length; i++) { + try { + const result = await sidecarExtract(job.inputs[i], requestId); + job.results.push(result); + } catch (err) { + const message = err instanceof Error ? err.message : 'Unknown error'; + job.errors.push({ index: i, error: message }); + // Push a placeholder so indices align + job.results.push({ + extractions: [], + metadata: { model_id: 'error', duration_ms: 0, char_count: 0 }, + }); + } + job.progress.completed = i + 1; + } + + job.status = job.errors.length === job.inputs.length ? 'failed' : 'completed'; + job.completedAt = new Date().toISOString(); +} diff --git a/services/extraction-service/src/modules/extract/routes.ts b/services/extraction-service/src/modules/extract/routes.ts index 4030cc41..4bbc050a 100644 --- a/services/extraction-service/src/modules/extract/routes.ts +++ b/services/extraction-service/src/modules/extract/routes.ts @@ -13,6 +13,7 @@ import { BadRequestError } from '../../lib/errors.js'; import { checkQuota, incrementUsage, getUsageSummary } from './usage.js'; import { recordExtraction, getMetricsSummary } from '../../lib/metrics.js'; import { sidecarBreaker } from '../../lib/circuit-breaker.js'; +import { createJob, getJob, listJobs } from './jobs.js'; // ── In-memory LRU cache ──────────────────────────────────────── const CACHE_TTL_MS = parseInt(process.env.EXTRACTION_CACHE_TTL_MS || '86400000', 10); // 24h @@ -63,6 +64,29 @@ function cachePut( cache.set(cacheKey(text, taskId, modelId), { response, createdAt: Date.now() }); } +// ── Model registry ─────────────────────────────────────────────── +const MODEL_REGISTRY = [ + { + id: 'gemini-2.5-flash', + provider: 'google', + description: 'Gemini 2.5 Flash (default)', + tier: 'standard', + }, + { id: 'gemini-2.5-pro', provider: 'google', description: 'Gemini 2.5 Pro', tier: 'premium' }, + { + id: 'gemini-2.0-flash', + provider: 'google', + description: 'Gemini 2.0 Flash (legacy)', + tier: 'standard', + }, + { + id: 'gemini-2.5-flash-mock', + provider: 'mock', + description: 'Mock extractor (no API key)', + tier: 'free', + }, +]; + export async function extractRoutes(app: FastifyInstance) { // Rate limiting for extraction endpoints — 30 req/min per IP (configurable) await app.register(rateLimit, { @@ -280,15 +304,99 @@ export async function extractRoutes(app: FastifyInstance) { }); }); + /** + * POST /extract/jobs — Submit async batch extraction job. + */ + app.post('/extract/jobs', async (req, reply) => { + const parsed = BatchExtractRequestSchema.safeParse(req.body); + if (!parsed.success) { + throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; ')); + } + + const { inputs, examples, modelId } = parsed.data; + const requestId = req.headers['x-request-id'] as string | undefined; + + const sidecarRequests = inputs.map(input => ({ + text: input.text, + task_id: input.taskId, + task_prompt: input.taskPrompt, + examples: examples?.map(e => ({ + text: e.text, + extractions: e.extractions.map(ex => ({ + extraction_class: ex.extraction_class, + extraction_text: ex.extraction_text, + attributes: ex.attributes, + })), + })), + model_id: modelId, + })); + + const job = createJob(sidecarRequests, requestId); + req.log.info({ jobId: job.id, inputCount: inputs.length }, 'async job created'); + + return reply.status(202).send({ + jobId: job.id, + status: job.status, + progress: job.progress, + createdAt: job.createdAt, + }); + }); + + /** + * GET /extract/jobs/:id — Get async job status and results. + */ + app.get('/extract/jobs/:id', async (req, reply) => { + const { id } = req.params as { id: string }; + const job = getJob(id); + if (!job) { + return reply.status(404).send({ error: 'Job not found' }); + } + + return reply.send({ + id: job.id, + status: job.status, + progress: job.progress, + results: + job.status === 'completed' || job.status === 'failed' + ? job.results.map(r => ({ + extractions: r.extractions, + metadata: { + modelId: r.metadata.model_id, + durationMs: r.metadata.duration_ms, + charCount: r.metadata.char_count, + }, + })) + : undefined, + errors: job.errors.length > 0 ? job.errors : undefined, + createdAt: job.createdAt, + completedAt: job.completedAt, + }); + }); + + /** + * GET /extract/jobs — List recent async jobs. + */ + app.get('/extract/jobs', async (_req, reply) => { + const jobs = listJobs(); + return reply.send({ + jobs: jobs.map(j => ({ + id: j.id, + status: j.status, + progress: j.progress, + inputCount: j.inputs.length, + createdAt: j.createdAt, + completedAt: j.completedAt, + })), + }); + }); + /** * GET /extract/models — List available model providers. */ app.get('/extract/models', async (_req, reply) => { return reply.send({ - models: [ - { id: 'gemini-2.5-flash', provider: 'google', description: 'Gemini 2.5 Flash (default)' }, - { id: 'gemini-2.5-pro', provider: 'google', description: 'Gemini 2.5 Pro' }, - ], + models: MODEL_REGISTRY, + default: process.env.DEFAULT_MODEL_ID || 'gemini-2.5-flash', }); });