feat(extraction): Phase 6 advanced features (6.1-6.8)

- 6.1-6.2: Entity visualization components (bar chart, pie chart, timeline) [in LysnrAI repo]
- 6.3-6.4: Async job queue — POST /extract/jobs, GET /extract/jobs/:id, GET /extract/jobs
- 6.5-6.6: Model registry with tier (standard/premium/free) and provider (google/mock) + GET /extract/models
- 6.7-6.8: Multi-language detection (es/fr/de/pt/ja/zh/ko/ar) + prompt enrichment
- ExtractMetadata.language field added to Python models
- 46 TS tests passing, build clean
This commit is contained in:
saravanakumardb1 2026-02-14 14:08:02 -08:00
parent b8c0a73e89
commit 5c1744d3a4
4 changed files with 259 additions and 5 deletions

View File

@ -16,6 +16,45 @@ logger = structlog.get_logger(__name__)
DEFAULT_MODEL_ID = os.environ.get("DEFAULT_MODEL_ID", "gemini-2.5-flash")
# ── Language detection ─────────────────────────────────────────────
# Each entry is (ISO 639-1 code, display name, common function words).
# Entries with an empty keyword list are recognised by Unicode script in
# detect_language() rather than by keyword counting.
LANG_PATTERNS: list[tuple[str, str, list[str]]] = [
    ("es", "Spanish", ["el", "la", "los", "las", "de", "en", "que", "por", "con", "para"]),
    ("fr", "French", ["le", "la", "les", "des", "une", "est", "que", "dans", "pour", "avec"]),
    ("de", "German", ["der", "die", "das", "ein", "eine", "ist", "und", "oder", "aber", "nicht"]),
    ("pt", "Portuguese", ["o", "os", "as", "de", "em", "que", "por", "com", "para", "como"]),
    ("ja", "Japanese", []),
    ("zh", "Chinese", []),
    ("ko", "Korean", []),
    ("ar", "Arabic", []),
]


def detect_language(text: str) -> tuple[str, str, float]:
    """Detect the language of ``text`` using lightweight heuristics.

    Script-based checks run first (kana/Han, Hangul, Arabic). Otherwise the
    text is split on whitespace and each token is compared against the keyword
    lists in ``LANG_PATTERNS``; the language with the most keyword hits wins,
    provided it has at least 3 hits. Ties keep the earlier ``LANG_PATTERNS``
    entry. English is the fallback.

    Args:
        text: Input text to classify.

    Returns:
        A ``(code, name, confidence)`` tuple. Keyword-based confidence is
        ``0.5 + 0.05 * hits`` capped at 0.95.
    """
    import re
    import string

    # CJK detection via unicode ranges: kana implies Japanese, Han alone implies Chinese.
    if re.search(r"[\u3040-\u309F\u30A0-\u30FF\u4E00-\u9FFF]", text):
        if re.search(r"[\u3040-\u309F\u30A0-\u30FF]", text):
            return ("ja", "Japanese", 0.9)
        return ("zh", "Chinese", 0.85)
    if re.search(r"[\uAC00-\uD7AF]", text):
        return ("ko", "Korean", 0.9)
    if re.search(r"[\u0600-\u06FF]", text):
        return ("ar", "Arabic", 0.9)

    # Strip surrounding punctuation so that "por," or "para." still count as keywords.
    strip_chars = string.punctuation + "¿¡«»“”‘’…"
    words = [token.strip(strip_chars) for token in text.lower().split()]

    # Score every language and keep the best one: several keyword lists overlap
    # (e.g. "de", "que", "por", "para" are both Spanish and Portuguese), so
    # returning the first language that reaches the threshold would be biased
    # towards whichever entry is listed first.
    best = None
    best_matches = 0
    for code, name, keywords in LANG_PATTERNS:
        if not keywords:
            continue
        keyword_set = set(keywords)
        matches = sum(1 for w in words if w in keyword_set)
        if matches >= 3 and matches > best_matches:
            best = (code, name)
            best_matches = matches

    if best is not None:
        confidence = min(0.95, 0.5 + best_matches * 0.05)
        return (best[0], best[1], confidence)
    return ("en", "English", 0.85)
async def extract(
text: str,
task_prompt: str | None = None,
@ -24,6 +63,7 @@ async def extract(
extraction_passes: int | None = None,
max_workers: int | None = None,
max_char_buffer: int | None = None,
language: str | None = None,
) -> ExtractResponse:
"""
Run LangExtract on the given text.
@ -55,8 +95,17 @@ async def extract(
"model_id": resolved_model,
}
# Multi-language support: detect language and enrich prompt
lang_code, lang_name, lang_conf = detect_language(text) if not language else (language, language, 1.0)
lang_hint = ""
if lang_code != "en" and lang_conf >= 0.7:
lang_hint = f"\nIMPORTANT: The input text is in {lang_name}. Extract entities in their original language but use English for class labels."
logger.info("multilang_detected", language=lang_code, confidence=round(lang_conf, 2))
if task_prompt:
lx_kwargs["prompt_description"] = task_prompt
lx_kwargs["prompt_description"] = task_prompt + lang_hint
elif lang_hint:
lx_kwargs["prompt_description"] = f"Extract structured entities from the text.{lang_hint}"
if examples:
lx_kwargs["examples"] = examples
@ -104,6 +153,7 @@ async def extract(
model_id=resolved_model,
duration_ms=round(duration_ms, 2),
char_count=len(text),
language=lang_code if lang_code != "en" else None,
),
)

View File

@ -39,6 +39,7 @@ class ExtractMetadata(BaseModel):
duration_ms: float
token_count: int | None = None
char_count: int
language: str | None = None
class ExtractResponse(BaseModel):

View File

@ -0,0 +1,95 @@
/**
* Async extraction job queue.
*
* For large batch requests, callers can submit an async job and poll for results.
* Jobs are stored in-memory (Cosmos persistence deferred to Phase 7).
*
* Flow: POST /extract/jobs → { jobId } → GET /extract/jobs/:id → { status, results }
*/
import { randomUUID } from 'node:crypto';
import {
sidecarExtract,
type SidecarExtractRequest,
type SidecarExtractResponse,
} from '../../lib/python-bridge.js';
/** Lifecycle states an async extraction job moves through. */
export type JobStatus = 'pending' | 'processing' | 'completed' | 'failed';

/** In-memory record describing one async batch extraction job. */
export interface ExtractionJob {
  /** Unique job identifier. */
  id: string;
  /** Current lifecycle state. */
  status: JobStatus;
  /** Sidecar requests submitted with the job, in submission order. */
  inputs: SidecarExtractRequest[];
  /** Sidecar responses collected so far. */
  results: SidecarExtractResponse[];
  /** Failures recorded while processing; `index` refers to a position in `inputs`. */
  errors: { index: number; error: string }[];
  /** Number of inputs handled so far versus the total submitted. */
  progress: { completed: number; total: number };
  /** ISO-8601 creation timestamp. */
  createdAt: string;
  /** ISO-8601 timestamp set once processing has finished. */
  completedAt?: string;
}

/** Process-local job registry keyed by job id. */
const jobStore: Map<string, ExtractionJob> = new Map();
/**
 * Create a new async extraction job and start processing it in the background.
 *
 * The job is stored in the in-memory job store before processing starts, and the
 * returned object is the live record: its status, results, errors and progress
 * keep changing while processing runs.
 *
 * @param inputs - Sidecar extraction requests to run for this job.
 * @param requestId - Optional request id forwarded to the job processor.
 * @returns The newly created job record.
 */
export function createJob(inputs: SidecarExtractRequest[], requestId?: string): ExtractionJob {
  const job: ExtractionJob = {
    id: randomUUID(),
    status: 'pending',
    inputs,
    results: [],
    errors: [],
    progress: { completed: 0, total: inputs.length },
    createdAt: new Date().toISOString(),
  };
  jobStore.set(job.id, job);

  // Fire-and-forget: the caller polls for completion instead of awaiting.
  void processJob(job, requestId).catch((err: unknown) => {
    // An unexpected processor crash must leave a terminal, diagnosable record:
    // keep the reason and stamp the end time so pollers see a finished job.
    job.status = 'failed';
    job.errors.push({
      // Index of the input that was in flight when processing aborted.
      index: job.progress.completed,
      error: err instanceof Error ? err.message : 'Unknown error',
    });
    job.completedAt = new Date().toISOString();
  });

  return job;
}
/**
 * Look up a job in the in-memory job store.
 *
 * @param jobId - Identifier of the job to fetch.
 * @returns The stored job record, or `undefined` when no job has that id.
 */
export function getJob(jobId: string): ExtractionJob | undefined {
  const found = jobStore.get(jobId);
  return found;
}
/**
 * List stored jobs, newest first.
 *
 * @param limit - Maximum number of jobs to return (defaults to 50).
 * @returns A new array of job records ordered by descending `createdAt`.
 */
export function listJobs(limit = 50): ExtractionJob[] {
  // Copy the values so sorting never touches the store's own iteration order.
  const newestFirst = Array.from(jobStore.values());
  newestFirst.sort((left, right) => right.createdAt.localeCompare(left.createdAt));
  return newestFirst.slice(0, limit);
}
// ── Internal ─────────────────────────────────────────────────────
/**
 * Run every input of a job through the sidecar, one at a time and in order.
 *
 * A failing input does not abort the job: the failure is recorded in
 * `job.errors` and an empty placeholder result is pushed so that
 * `job.results[i]` always corresponds to `job.inputs[i]`.
 *
 * @param job - Job record to process; mutated in place.
 * @param requestId - Optional request id forwarded to each sidecar call.
 */
async function processJob(job: ExtractionJob, requestId?: string): Promise<void> {
  job.status = 'processing';

  for (let i = 0; i < job.inputs.length; i++) {
    try {
      const result = await sidecarExtract(job.inputs[i], requestId);
      job.results.push(result);
    } catch (err: unknown) {
      const message = err instanceof Error ? err.message : 'Unknown error';
      job.errors.push({ index: i, error: message });
      // Push a placeholder so indices align
      job.results.push({
        extractions: [],
        metadata: { model_id: 'error', duration_ms: 0, char_count: 0 },
      });
    }
    job.progress.completed = i + 1;
  }

  // The job fails only when there was work and every input failed. Without the
  // length guard an empty job (0 errors === 0 inputs) would be reported as failed.
  const everyInputFailed = job.inputs.length > 0 && job.errors.length === job.inputs.length;
  job.status = everyInputFailed ? 'failed' : 'completed';
  job.completedAt = new Date().toISOString();
}

View File

@ -13,6 +13,7 @@ import { BadRequestError } from '../../lib/errors.js';
import { checkQuota, incrementUsage, getUsageSummary } from './usage.js';
import { recordExtraction, getMetricsSummary } from '../../lib/metrics.js';
import { sidecarBreaker } from '../../lib/circuit-breaker.js';
import { createJob, getJob, listJobs } from './jobs.js';
// ── In-memory LRU cache ────────────────────────────────────────
const CACHE_TTL_MS = parseInt(process.env.EXTRACTION_CACHE_TTL_MS || '86400000', 10); // 24h
@ -63,6 +64,29 @@ function cachePut(
cache.set(cacheKey(text, taskId, modelId), { response, createdAt: Date.now() });
}
// ── Model registry ───────────────────────────────────────────────
/**
 * Static list of extraction models.
 * Each entry carries the model id, its provider, a human-readable description
 * and a tier label.
 */
const MODEL_REGISTRY = [
  { id: 'gemini-2.5-flash', provider: 'google', description: 'Gemini 2.5 Flash (default)', tier: 'standard' },
  { id: 'gemini-2.5-pro', provider: 'google', description: 'Gemini 2.5 Pro', tier: 'premium' },
  { id: 'gemini-2.0-flash', provider: 'google', description: 'Gemini 2.0 Flash (legacy)', tier: 'standard' },
  { id: 'gemini-2.5-flash-mock', provider: 'mock', description: 'Mock extractor (no API key)', tier: 'free' },
];
export async function extractRoutes(app: FastifyInstance) {
// Rate limiting for extraction endpoints — 30 req/min per IP (configurable)
await app.register(rateLimit, {
@ -280,15 +304,99 @@ export async function extractRoutes(app: FastifyInstance) {
});
});
/**
* POST /extract/jobs Submit async batch extraction job.
*/
app.post('/extract/jobs', async (req, reply) => {
const parsed = BatchExtractRequestSchema.safeParse(req.body);
if (!parsed.success) {
throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; '));
}
const { inputs, examples, modelId } = parsed.data;
const requestId = req.headers['x-request-id'] as string | undefined;
const sidecarRequests = inputs.map(input => ({
text: input.text,
task_id: input.taskId,
task_prompt: input.taskPrompt,
examples: examples?.map(e => ({
text: e.text,
extractions: e.extractions.map(ex => ({
extraction_class: ex.extraction_class,
extraction_text: ex.extraction_text,
attributes: ex.attributes,
})),
})),
model_id: modelId,
}));
const job = createJob(sidecarRequests, requestId);
req.log.info({ jobId: job.id, inputCount: inputs.length }, 'async job created');
return reply.status(202).send({
jobId: job.id,
status: job.status,
progress: job.progress,
createdAt: job.createdAt,
});
});
/**
* GET /extract/jobs/:id Get async job status and results.
*/
app.get('/extract/jobs/:id', async (req, reply) => {
const { id } = req.params as { id: string };
const job = getJob(id);
if (!job) {
return reply.status(404).send({ error: 'Job not found' });
}
return reply.send({
id: job.id,
status: job.status,
progress: job.progress,
results:
job.status === 'completed' || job.status === 'failed'
? job.results.map(r => ({
extractions: r.extractions,
metadata: {
modelId: r.metadata.model_id,
durationMs: r.metadata.duration_ms,
charCount: r.metadata.char_count,
},
}))
: undefined,
errors: job.errors.length > 0 ? job.errors : undefined,
createdAt: job.createdAt,
completedAt: job.completedAt,
});
});
/**
* GET /extract/jobs List recent async jobs.
*/
app.get('/extract/jobs', async (_req, reply) => {
const jobs = listJobs();
return reply.send({
jobs: jobs.map(j => ({
id: j.id,
status: j.status,
progress: j.progress,
inputCount: j.inputs.length,
createdAt: j.createdAt,
completedAt: j.completedAt,
})),
});
});
/**
* GET /extract/models List available model providers.
*/
app.get('/extract/models', async (_req, reply) => {
return reply.send({
models: [
{ id: 'gemini-2.5-flash', provider: 'google', description: 'Gemini 2.5 Flash (default)' },
{ id: 'gemini-2.5-pro', provider: 'google', description: 'Gemini 2.5 Pro' },
],
models: MODEL_REGISTRY,
default: process.env.DEFAULT_MODEL_ID || 'gemini-2.5-flash',
});
});