feat(extraction): Phase 6 advanced features (6.1-6.8)
- 6.1-6.2: Entity visualization components (bar chart, pie chart, timeline) [in LysnrAI repo]
- 6.3-6.4: Async job queue — POST /extract/jobs, GET /extract/jobs/:id, GET /extract/jobs
- 6.5-6.6: Model registry with tier (standard/premium/free/mock) + GET /extract/models
- 6.7-6.8: Multi-language detection (es/fr/de/pt/ja/zh/ko/ar) + prompt enrichment
- ExtractMetadata.language field added to Python models
- 46 TS tests passing, build clean
This commit is contained in:
parent
b8c0a73e89
commit
5c1744d3a4
@ -16,6 +16,45 @@ logger = structlog.get_logger(__name__)
|
||||
DEFAULT_MODEL_ID = os.environ.get("DEFAULT_MODEL_ID", "gemini-2.5-flash")
|
||||
|
||||
|
||||
# ── Language detection ─────────────────────────────────────────────

# One entry per supported language: (language code, display name, common
# function words). Entries with an empty keyword list are recognised by
# Unicode script inside detect_language() instead of by vocabulary.
LANG_PATTERNS: list[tuple[str, str, list[str]]] = [
    ("es", "Spanish", ["el", "la", "los", "las", "de", "en", "que", "por", "con", "para"]),
    ("fr", "French", ["le", "la", "les", "des", "une", "est", "que", "dans", "pour", "avec"]),
    ("de", "German", ["der", "die", "das", "ein", "eine", "ist", "und", "oder", "aber", "nicht"]),
    ("pt", "Portuguese", ["o", "os", "as", "de", "em", "que", "por", "com", "para", "como"]),
    ("ja", "Japanese", []),
    ("zh", "Chinese", []),
    ("ko", "Korean", []),
]

# Characters trimmed from both ends of each whitespace-separated token so that
# "que," or "(para)" still count as keyword hits. Apostrophes and hyphens
# inside a token are left alone.
_EDGE_PUNCTUATION = ".,;:!?¡¿\"'()[]{}«»“”‘’…-—"


def detect_language(text: str) -> tuple[str, str, float]:
    """Guess the language of ``text`` with cheap heuristics.

    Detection runs in two stages:

    1. Script detection: Japanese kana, Korean Hangul, Han ideographs
       (reported as Chinese) and Arabic are recognised by Unicode range.
       Kana and Hangul are tested before Han so that Japanese or Korean text
       that also contains Han characters is not reported as Chinese.
    2. Keyword counting over ``LANG_PATTERNS``: a language qualifies when at
       least three tokens match its keyword list and at least two *different*
       keywords were seen. The qualifying language with the most matches wins;
       ties go to the entry listed first.

    Args:
        text: Arbitrary input text; may be empty.

    Returns:
        A ``(code, name, confidence)`` tuple. Falls back to
        ``("en", "English", 0.85)`` when nothing else matches, which includes
        empty input.
    """
    import re

    # Stage 1: script-based detection.
    if re.search(r"[\u3040-\u309F\u30A0-\u30FF]", text):
        return ("ja", "Japanese", 0.9)
    if re.search(r"[\uAC00-\uD7AF]", text):
        return ("ko", "Korean", 0.9)
    if re.search(r"[\u4E00-\u9FFF]", text):
        return ("zh", "Chinese", 0.85)
    if re.search(r"[\u0600-\u06FF]", text):
        return ("ar", "Arabic", 0.9)

    # Stage 2: keyword counting on lower-cased, punctuation-trimmed tokens.
    words = [token.strip(_EDGE_PUNCTUATION) for token in text.lower().split()]

    best: tuple[str, str] | None = None
    best_matches = 0
    for code, name, keywords in LANG_PATTERNS:
        if not keywords:
            continue
        vocabulary = set(keywords)
        hits = [w for w in words if w in vocabulary]
        # Require two distinct keywords so a single repeated token (e.g. the
        # English word "as") cannot trigger a detection on its own.
        if len(hits) < 3 or len(set(hits)) < 2:
            continue
        # Strict ">" keeps the earlier LANG_PATTERNS entry on a tie.
        if len(hits) > best_matches:
            best = (code, name)
            best_matches = len(hits)

    if best is not None:
        confidence = min(0.95, 0.5 + best_matches * 0.05)
        return (best[0], best[1], confidence)

    return ("en", "English", 0.85)
|
||||
|
||||
|
||||
async def extract(
|
||||
text: str,
|
||||
task_prompt: str | None = None,
|
||||
@ -24,6 +63,7 @@ async def extract(
|
||||
extraction_passes: int | None = None,
|
||||
max_workers: int | None = None,
|
||||
max_char_buffer: int | None = None,
|
||||
language: str | None = None,
|
||||
) -> ExtractResponse:
|
||||
"""
|
||||
Run LangExtract on the given text.
|
||||
@ -55,8 +95,17 @@ async def extract(
|
||||
"model_id": resolved_model,
|
||||
}
|
||||
|
||||
# Multi-language support: detect language and enrich prompt
|
||||
lang_code, lang_name, lang_conf = detect_language(text) if not language else (language, language, 1.0)
|
||||
lang_hint = ""
|
||||
if lang_code != "en" and lang_conf >= 0.7:
|
||||
lang_hint = f"\nIMPORTANT: The input text is in {lang_name}. Extract entities in their original language but use English for class labels."
|
||||
logger.info("multilang_detected", language=lang_code, confidence=round(lang_conf, 2))
|
||||
|
||||
if task_prompt:
|
||||
lx_kwargs["prompt_description"] = task_prompt
|
||||
lx_kwargs["prompt_description"] = task_prompt + lang_hint
|
||||
elif lang_hint:
|
||||
lx_kwargs["prompt_description"] = f"Extract structured entities from the text.{lang_hint}"
|
||||
|
||||
if examples:
|
||||
lx_kwargs["examples"] = examples
|
||||
@ -104,6 +153,7 @@ async def extract(
|
||||
model_id=resolved_model,
|
||||
duration_ms=round(duration_ms, 2),
|
||||
char_count=len(text),
|
||||
language=lang_code if lang_code != "en" else None,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@ -39,6 +39,7 @@ class ExtractMetadata(BaseModel):
|
||||
duration_ms: float
|
||||
token_count: int | None = None
|
||||
char_count: int
|
||||
language: str | None = None
|
||||
|
||||
|
||||
class ExtractResponse(BaseModel):
|
||||
|
||||
95
services/extraction-service/src/modules/extract/jobs.ts
Normal file
95
services/extraction-service/src/modules/extract/jobs.ts
Normal file
@ -0,0 +1,95 @@
|
||||
/**
|
||||
* Async extraction job queue.
|
||||
*
|
||||
* For large batch requests, callers can submit an async job and poll for results.
|
||||
* Jobs are stored in-memory (Cosmos persistence deferred to Phase 7).
|
||||
*
|
||||
* Flow: POST /extract/jobs → { jobId } → GET /extract/jobs/:id → { status, results }
|
||||
*/
|
||||
|
||||
import { randomUUID } from 'node:crypto';
|
||||
import {
|
||||
sidecarExtract,
|
||||
type SidecarExtractRequest,
|
||||
type SidecarExtractResponse,
|
||||
} from '../../lib/python-bridge.js';
|
||||
|
||||
/** Lifecycle state of an async extraction job, from creation to a terminal `completed` or `failed`. */
export type JobStatus = 'pending' | 'processing' | 'completed' | 'failed';
|
||||
|
||||
/**
 * State tracked for a single async extraction job.
 */
export interface ExtractionJob {
  /** Identifier assigned when the job is created. */
  id: string;
  /** Current lifecycle state. */
  status: JobStatus;
  /** Sidecar requests to run, in submission order. */
  inputs: SidecarExtractRequest[];
  /** One entry per processed input; failed inputs get an empty placeholder so indices align. */
  results: SidecarExtractResponse[];
  /** Failures recorded while processing, tagged with the index of the input that failed. */
  errors: { index: number; error: string }[];
  /** Number of inputs processed so far out of the total submitted. */
  progress: { completed: number; total: number };
  /** ISO-8601 timestamp taken when the job was created. */
  createdAt: string;
  /** ISO-8601 timestamp taken when processing finished; absent until then. */
  completedAt?: string;
}
|
||||
|
||||
// In-memory job registry keyed by job id.
const jobStore = new Map<string, ExtractionJob>();
|
||||
|
||||
/** Upper bound on jobs kept in memory; the oldest finished jobs are evicted beyond it. */
const MAX_STORED_JOBS = 1000;

/**
 * Create a new async extraction job and start processing it in the background.
 *
 * The job is registered in the in-memory store before processing starts, so it
 * can be looked up by id as soon as this function returns. When the store is
 * full, the oldest jobs that have already finished are evicted first; jobs
 * that are still pending or processing are never evicted.
 *
 * @param inputs - Sidecar requests to run, in order. The array is copied.
 * @param requestId - Optional correlation id forwarded to every sidecar call.
 * @returns The live job record; its fields keep changing while processing runs.
 */
export function createJob(inputs: SidecarExtractRequest[], requestId?: string): ExtractionJob {
  const job: ExtractionJob = {
    id: randomUUID(),
    status: 'pending',
    // Copy so that later mutation of the caller's array cannot alter a running job.
    inputs: [...inputs],
    results: [],
    errors: [],
    progress: { completed: 0, total: inputs.length },
    createdAt: new Date().toISOString(),
  };

  // Map iteration follows insertion order, so this walks oldest-first.
  // Deleting the entry currently being visited is safe for Map iterators.
  if (jobStore.size >= MAX_STORED_JOBS) {
    for (const [storedId, stored] of jobStore) {
      if (jobStore.size < MAX_STORED_JOBS) break;
      if (stored.status === 'completed' || stored.status === 'failed') {
        jobStore.delete(storedId);
      }
    }
  }

  jobStore.set(job.id, job);

  // Fire-and-forget: per-input failures are handled inside processJob, so this
  // handler only sees unexpected errors. Record them instead of dropping them,
  // and stamp completedAt so pollers can tell the job has stopped.
  processJob(job, requestId).catch((err: unknown) => {
    const message = err instanceof Error ? err.message : 'Unknown error';
    job.status = 'failed';
    // progress.completed is the index of the input that was about to be processed.
    job.errors.push({ index: job.progress.completed, error: message });
    job.completedAt = new Date().toISOString();
  });

  return job;
}
|
||||
|
||||
/**
 * Look up a single job in the in-memory store.
 *
 * @param jobId - Identifier of the job to fetch.
 * @returns The stored job, or `undefined` when no job has that id.
 */
export function getJob(jobId: string): ExtractionJob | undefined {
  const found = jobStore.get(jobId);
  return found;
}
|
||||
|
||||
/**
 * List the most recently created jobs, newest first.
 *
 * @param limit - Maximum number of jobs to return (default 50). Zero, negative
 *   and NaN limits yield an empty list.
 * @returns A new array; the job store itself is not modified.
 */
export function listJobs(limit = 50): ExtractionJob[] {
  // A negative end index would make slice() count from the end of the array,
  // silently returning "all but the last N" instead of nothing.
  if (Number.isNaN(limit) || limit <= 0) {
    return [];
  }

  // createdAt values are fixed-width ISO-8601 strings, so plain code-unit
  // comparison orders them chronologically without locale-aware collation.
  const newestFirst = [...jobStore.values()].sort((a, b) => {
    if (a.createdAt === b.createdAt) return 0;
    return a.createdAt < b.createdAt ? 1 : -1;
  });

  return newestFirst.slice(0, limit);
}
|
||||
|
||||
// ── Internal ─────────────────────────────────────────────────────
|
||||
|
||||
/**
 * Run every input of a job through the sidecar, one at a time, updating the
 * job record in place as it goes.
 *
 * A failing input does not abort the job: the error is recorded with the
 * input's index and an empty placeholder result is stored so that
 * `results[i]` always corresponds to `inputs[i]`.
 *
 * The job ends as `failed` only when there was at least one input and every
 * input failed. A job with no inputs has nothing that could fail and ends as
 * `completed`.
 *
 * @param job - Job record to process; mutated in place.
 * @param requestId - Optional correlation id forwarded to each sidecar call.
 */
async function processJob(job: ExtractionJob, requestId?: string): Promise<void> {
  job.status = 'processing';

  for (const [index, input] of job.inputs.entries()) {
    try {
      const result = await sidecarExtract(input, requestId);
      job.results.push(result);
    } catch (err) {
      const message = err instanceof Error ? err.message : 'Unknown error';
      job.errors.push({ index, error: message });
      // Push a placeholder so result indices stay aligned with input indices.
      job.results.push({
        extractions: [],
        metadata: { model_id: 'error', duration_ms: 0, char_count: 0 },
      });
    }
    job.progress.completed = index + 1;
  }

  const total = job.inputs.length;
  const everyInputFailed = total > 0 && job.errors.length === total;
  job.status = everyInputFailed ? 'failed' : 'completed';
  job.completedAt = new Date().toISOString();
}
|
||||
@ -13,6 +13,7 @@ import { BadRequestError } from '../../lib/errors.js';
|
||||
import { checkQuota, incrementUsage, getUsageSummary } from './usage.js';
|
||||
import { recordExtraction, getMetricsSummary } from '../../lib/metrics.js';
|
||||
import { sidecarBreaker } from '../../lib/circuit-breaker.js';
|
||||
import { createJob, getJob, listJobs } from './jobs.js';
|
||||
|
||||
// ── In-memory LRU cache ────────────────────────────────────────
|
||||
const CACHE_TTL_MS = parseInt(process.env.EXTRACTION_CACHE_TTL_MS || '86400000', 10); // 24h
|
||||
@ -63,6 +64,29 @@ function cachePut(
|
||||
cache.set(cacheKey(text, taskId, modelId), { response, createdAt: Date.now() });
|
||||
}
|
||||
|
||||
// ── Model registry ───────────────────────────────────────────────

/**
 * Models advertised by GET /extract/models.
 * Each entry carries the model id, its provider, a human-readable
 * description and a tier label.
 */
const MODEL_REGISTRY = [
  { id: 'gemini-2.5-flash', provider: 'google', description: 'Gemini 2.5 Flash (default)', tier: 'standard' },
  { id: 'gemini-2.5-pro', provider: 'google', description: 'Gemini 2.5 Pro', tier: 'premium' },
  { id: 'gemini-2.0-flash', provider: 'google', description: 'Gemini 2.0 Flash (legacy)', tier: 'standard' },
  { id: 'gemini-2.5-flash-mock', provider: 'mock', description: 'Mock extractor (no API key)', tier: 'free' },
];
|
||||
|
||||
export async function extractRoutes(app: FastifyInstance) {
|
||||
// Rate limiting for extraction endpoints — 30 req/min per IP (configurable)
|
||||
await app.register(rateLimit, {
|
||||
@ -280,15 +304,99 @@ export async function extractRoutes(app: FastifyInstance) {
|
||||
});
|
||||
});
|
||||
|
||||
/**
 * POST /extract/jobs — queue a batch extraction and respond immediately.
 *
 * The body is validated against BatchExtractRequestSchema; validation issues
 * are joined into a single BadRequestError. On success the handler replies
 * 202 with the job id, its current status, progress and creation time.
 */
app.post('/extract/jobs', async (req, reply) => {
  const validation = BatchExtractRequestSchema.safeParse(req.body);
  if (!validation.success) {
    const details = validation.error.issues.map(issue => issue.message).join('; ');
    throw new BadRequestError(details);
  }

  const { inputs, examples, modelId } = validation.data;
  const requestId = req.headers['x-request-id'] as string | undefined;

  // Called once per input so every sidecar request gets its own examples array.
  const toSidecarExamples = () =>
    examples?.map(example => ({
      text: example.text,
      extractions: example.extractions.map(item => ({
        extraction_class: item.extraction_class,
        extraction_text: item.extraction_text,
        attributes: item.attributes,
      })),
    }));

  // Translate the camelCase API shape into the snake_case sidecar shape.
  const sidecarRequests = inputs.map(input => ({
    text: input.text,
    task_id: input.taskId,
    task_prompt: input.taskPrompt,
    examples: toSidecarExamples(),
    model_id: modelId,
  }));

  const job = createJob(sidecarRequests, requestId);
  req.log.info({ jobId: job.id, inputCount: inputs.length }, 'async job created');

  const { id: jobId, status, progress, createdAt } = job;
  return reply.status(202).send({ jobId, status, progress, createdAt });
});
|
||||
|
||||
/**
 * GET /extract/jobs/:id — report the status of one async job.
 *
 * Replies 404 when the id is unknown. `results` is only included once the job
 * has reached `completed` or `failed`; `errors` is only included when at least
 * one error was recorded.
 */
app.get('/extract/jobs/:id', async (req, reply) => {
  const { id } = req.params as { id: string };

  const job = getJob(id);
  if (!job) {
    return reply.status(404).send({ error: 'Job not found' });
  }

  const finished = job.status === 'completed' || job.status === 'failed';

  // Convert sidecar snake_case metadata to the camelCase API shape.
  // NOTE(review): only model id, duration and char count are forwarded here —
  // confirm whether other sidecar metadata fields should be exposed as well.
  const results = finished
    ? job.results.map(({ extractions, metadata }) => ({
        extractions,
        metadata: {
          modelId: metadata.model_id,
          durationMs: metadata.duration_ms,
          charCount: metadata.char_count,
        },
      }))
    : undefined;

  const errors = job.errors.length > 0 ? job.errors : undefined;

  return reply.send({
    id: job.id,
    status: job.status,
    progress: job.progress,
    results,
    errors,
    createdAt: job.createdAt,
    completedAt: job.completedAt,
  });
});
|
||||
|
||||
/**
 * GET /extract/jobs — summarise recent async jobs.
 *
 * Returns one summary per job from listJobs() using its default limit;
 * inputs and results are left out, only the input count is reported.
 */
app.get('/extract/jobs', async (_req, reply) => {
  const summaries = listJobs().map(job => {
    const { id, status, progress, createdAt, completedAt } = job;
    return {
      id,
      status,
      progress,
      inputCount: job.inputs.length,
      createdAt,
      completedAt,
    };
  });

  return reply.send({ jobs: summaries });
});
|
||||
|
||||
/**
 * GET /extract/models — list the available models and the default model id.
 *
 * The model list comes from MODEL_REGISTRY. The default is taken from the
 * DEFAULT_MODEL_ID environment variable, falling back to 'gemini-2.5-flash'
 * when it is unset or empty.
 */
app.get('/extract/models', async (_req, reply) => {
  // The response previously declared `models` twice (a stale inline list
  // followed by MODEL_REGISTRY). Only the later value was ever sent, and
  // duplicate keys in an object literal are a TypeScript compile error.
  return reply.send({
    models: MODEL_REGISTRY,
    default: process.env.DEFAULT_MODEL_ID || 'gemini-2.5-flash',
  });
});
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user