learning_ai_common_plat/services/extraction-service/python/src/extractor.py
saravanakumardb1 5c1744d3a4 feat(extraction): Phase 6 advanced features (6.1-6.8)
- 6.1-6.2: Entity visualization components (bar chart, pie chart, timeline) [in LysnrAI repo]
- 6.3-6.4: Async job queue — POST /extract/jobs, GET /extract/jobs/:id, GET /extract/jobs
- 6.5-6.6: Model registry with tier (standard/premium/free/mock) + GET /extract/models
- 6.7-6.8: Multi-language detection (es/fr/de/pt/ja/zh/ko/ar) + prompt enrichment
- ExtractMetadata.language field added to Python models
- 46 TS tests passing, build clean
2026-02-14 14:08:02 -08:00

211 lines
7.2 KiB
Python

"""
LangExtract wrapper — calls lx.extract() with configurable parameters.
"""
from __future__ import annotations
import os
import time
import structlog
from .models import Extraction, ExtractMetadata, ExtractResponse
logger = structlog.get_logger(__name__)
# Model used when a request does not specify one; override via the
# DEFAULT_MODEL_ID environment variable.
DEFAULT_MODEL_ID = os.getenv("DEFAULT_MODEL_ID", "gemini-2.5-flash")
# ── Language detection ─────────────────────────────────────────────
# (ISO code, display name, stop words). Entries with an empty word list are
# recognised by Unicode script in detect_language() instead of by keywords.
# Arabic ("ar") is also script-detected there and has no entry here.
LANG_PATTERNS: list[tuple[str, str, list[str]]] = [
    ("es", "Spanish", ["el", "la", "los", "las", "de", "en", "que", "por", "con", "para"]),
    ("fr", "French", ["le", "la", "les", "des", "une", "est", "que", "dans", "pour", "avec"]),
    ("de", "German", ["der", "die", "das", "ein", "eine", "ist", "und", "oder", "aber", "nicht"]),
    ("pt", "Portuguese", ["o", "os", "as", "de", "em", "que", "por", "com", "para", "como"]),
    ("ja", "Japanese", []),
    ("zh", "Chinese", []),
    ("ko", "Korean", []),
]


def detect_language(text: str) -> tuple[str, str, float]:
    """Heuristically detect the language of *text*.

    Script checks run first: kana -> Japanese, other CJK ideographs ->
    Chinese, Hangul syllables -> Korean, Arabic script -> Arabic.
    Otherwise every keyword list in ``LANG_PATTERNS`` is scored by counting
    stop-word hits. The language with the MOST hits wins, provided it has
    at least 3; ties go to the entry listed first. Several lists share words
    (e.g. "la", "de", "que"), so taking the first list to reach the threshold
    would misclassify French/Portuguese text as Spanish.

    Returns:
        ``(code, name, confidence)``. Falls back to
        ``("en", "English", 0.85)`` when nothing matches.
    """
    import re

    # CJK detection via unicode ranges
    if re.search(r"[\u3040-\u309F\u30A0-\u30FF\u4E00-\u9FFF]", text):
        if re.search(r"[\u3040-\u309F\u30A0-\u30FF]", text):
            return ("ja", "Japanese", 0.9)
        return ("zh", "Chinese", 0.85)
    if re.search(r"[\uAC00-\uD7AF]", text):
        return ("ko", "Korean", 0.9)
    if re.search(r"[\u0600-\u06FF]", text):
        return ("ar", "Arabic", 0.9)

    # Tokenize on word characters so punctuation ("que," / "de.") does not
    # hide a stop word the way a plain whitespace split does.
    words = re.findall(r"\w+", text.lower())

    best_code, best_name, best_matches = "en", "English", 0
    for code, name, keywords in LANG_PATTERNS:
        if not keywords:
            continue
        vocab = set(keywords)
        matches = sum(1 for w in words if w in vocab)
        # Strict ">" keeps the earlier entry on ties.
        if matches >= 3 and matches > best_matches:
            best_code, best_name, best_matches = code, name, matches

    if best_matches == 0:
        return ("en", "English", 0.85)
    confidence = min(0.95, 0.5 + best_matches * 0.05)
    return (best_code, best_name, confidence)
async def extract(
    text: str,
    task_prompt: str | None = None,
    examples: list[dict] | None = None,
    model_id: str | None = None,
    extraction_passes: int | None = None,
    max_workers: int | None = None,
    max_char_buffer: int | None = None,
    language: str | None = None,
) -> ExtractResponse:
    """
    Run LangExtract on the given text.

    Falls back to a mock implementation if:
    - langextract is not installed
    - No API key is configured (GEMINI_API_KEY env var)
    - USE_MOCK_EXTRACTOR env var is set to "true"

    Args:
        text: Input text to extract entities from.
        task_prompt: Forwarded to LangExtract as ``prompt_description``. A
            language hint is appended for confidently-detected non-English
            input.
        examples: Few-shot examples, forwarded as ``examples`` when non-empty.
        model_id: Model to use; defaults to ``DEFAULT_MODEL_ID``.
        extraction_passes, max_workers, max_char_buffer: Forwarded to
            LangExtract only when not ``None``.
        language: Explicit language code for *text*. When given, automatic
            detection is skipped and the value is trusted (confidence 1.0).

    Returns:
        An ``ExtractResponse``. ``metadata.language`` is set only for
        non-English input.

    Raises:
        Exception: anything raised while running LangExtract or building the
            response is logged as ``extraction_failed`` and re-raised.
    """
    import asyncio
    import functools

    resolved_model = model_id or DEFAULT_MODEL_ID
    start_time = time.monotonic()

    # Use mock if explicitly requested or no API key configured
    use_mock = (
        os.environ.get("USE_MOCK_EXTRACTOR", "").lower() == "true"
        or not os.environ.get("GEMINI_API_KEY")
    )
    if use_mock:
        logger.info("using_mock_extractor", reason="no API key or mock requested")
        return _mock_extract(text, resolved_model, start_time)

    # Only the import lives in this try. An ImportError raised from *inside*
    # lx.extract (e.g. a missing provider dependency) is a real failure and
    # must not be silently turned into mock output.
    try:
        import langextract as lx
    except ImportError:
        logger.warning("langextract_not_installed", fallback="mock")
        return _mock_extract(text, resolved_model, start_time)

    try:
        # Multi-language support: detect language and enrich prompt
        lang_code, lang_name, lang_conf = _resolve_language(text, language)
        is_english = lang_code.lower() == "en"
        lang_hint = ""
        if not is_english and lang_conf >= 0.7:
            lang_hint = f"\nIMPORTANT: The input text is in {lang_name}. Extract entities in their original language but use English for class labels."
            logger.info("multilang_detected", language=lang_code, confidence=round(lang_conf, 2))

        lx_kwargs = _build_lx_kwargs(
            resolved_model=resolved_model,
            task_prompt=task_prompt,
            lang_hint=lang_hint,
            examples=examples,
            extraction_passes=extraction_passes,
            max_workers=max_workers,
            max_char_buffer=max_char_buffer,
        )

        # lx.extract is a plain (non-awaitable) call whose result is used
        # directly. Run it in the default executor so a slow model call does
        # not block every other coroutine on this event loop.
        # text_or_documents is the first positional argument.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            None, functools.partial(lx.extract, text, **lx_kwargs)
        )

        extractions = _collect_extractions(result)

        duration_ms = round((time.monotonic() - start_time) * 1000, 2)
        logger.info(
            "extraction_complete",
            model_id=resolved_model,
            entity_count=len(extractions),
            duration_ms=duration_ms,
            char_count=len(text),
        )
        return ExtractResponse(
            extractions=extractions,
            metadata=ExtractMetadata(
                model_id=resolved_model,
                duration_ms=duration_ms,
                char_count=len(text),
                language=None if is_english else lang_code,
            ),
        )
    except Exception as exc:
        logger.error("extraction_failed", error=str(exc), model_id=resolved_model)
        raise


def _resolve_language(text: str, language: str | None) -> tuple[str, str, float]:
    """Return ``(code, display_name, confidence)`` for *text*.

    A caller-supplied *language* is trusted with confidence 1.0. Its display
    name is looked up from ``LANG_PATTERNS`` (plus Arabic and English) so the
    prompt hint reads "Spanish" rather than "es". Unknown values are used
    verbatim as the name. Without *language*, ``detect_language`` decides.
    """
    if not language:
        return detect_language(text)
    names = {code: name for code, name, _ in LANG_PATTERNS}
    names.update({"ar": "Arabic", "en": "English"})
    return (language, names.get(language.lower(), language), 1.0)


def _build_lx_kwargs(
    *,
    resolved_model: str,
    task_prompt: str | None,
    lang_hint: str,
    examples: list[dict] | None,
    extraction_passes: int | None,
    max_workers: int | None,
    max_char_buffer: int | None,
) -> dict:
    """Assemble keyword arguments for ``lx.extract``; unset options are omitted."""
    # API: lx.extract(text_or_documents, prompt_description, examples, model_id, ...)
    lx_kwargs: dict = {"model_id": resolved_model}
    if task_prompt:
        lx_kwargs["prompt_description"] = task_prompt + lang_hint
    elif lang_hint:
        lx_kwargs["prompt_description"] = f"Extract structured entities from the text.{lang_hint}"
    if examples:
        # NOTE(review): langextract documents `examples` as a list of
        # lx.data.ExampleData; confirm plain dicts are accepted or convert them.
        lx_kwargs["examples"] = examples
    optional = {
        "extraction_passes": extraction_passes,
        "max_workers": max_workers,
        "max_char_buffer": max_char_buffer,
    }
    lx_kwargs.update({key: value for key, value in optional.items() if value is not None})
    return lx_kwargs


def _first_attr(obj: object, names: tuple[str, ...], default: object) -> object:
    """Return the first attribute in *names* that exists on *obj* and is not None."""
    for name in names:
        value = getattr(obj, name, None)
        if value is not None:
            return value
    return default


def _collect_extractions(result: object) -> list[Extraction]:
    """Convert ``lx.extract`` output into ``Extraction`` models.

    ``lx.extract`` returns one AnnotatedDocument or a list of them. Current
    langextract documents expose results as ``.extractions``, whose items carry
    ``extraction_class`` / ``extraction_text`` / ``attributes``. The older
    ``.annotations`` with ``label``/``type``/``text`` are still accepted as a
    fallback.
    """
    docs = result if isinstance(result, list) else [result]
    collected: list[Extraction] = []
    for doc in docs:
        items = _first_attr(doc, ("extractions", "annotations"), ())
        for item in items:
            collected.append(
                Extraction(
                    extraction_class=_first_attr(item, ("extraction_class", "label", "type"), "unknown"),
                    extraction_text=_first_attr(item, ("extraction_text", "text"), str(item)),
                    attributes=getattr(item, "attributes", None),
                )
            )
    return collected
def _mock_extract(
    text: str,
    model_id: str,
    start_time: float,
) -> ExtractResponse:
    """
    Mock extraction for development when langextract or API keys are unavailable.

    Returns simple keyword-based extractions:

    - ``topic`` ("meeting") when a meeting-related word appears;
    - ``action_item`` (first 100 chars of *text*) for action keywords;
    - ``decision`` (first 100 chars of *text*) for decision keywords.

    Args:
        text: Input text.
        model_id: Requested model id; reported back with a ``-mock`` suffix.
        start_time: ``time.monotonic()`` value captured when the request
            started, used to compute ``duration_ms``.

    Returns:
        ExtractResponse holding the keyword-based extractions.
    """
    import re

    # Build lowercase word tokens joined by single spaces and padded at both
    # ends. A " keyword " substring test then matches whole words *and*
    # multi-word phrases like "need to" (unreachable with a plain split), and
    # trailing punctuation ("meeting.") no longer hides a keyword.
    normalized = " " + " ".join(re.findall(r"\w+", text.lower())) + " "

    def mentions(*keywords: str) -> bool:
        return any(f" {keyword} " in normalized for keyword in keywords)

    extractions: list[Extraction] = []
    if mentions("meeting", "call", "sync", "standup"):
        extractions.append(Extraction(
            extraction_class="topic",
            extraction_text="meeting",
            attributes={"type": "event"},
        ))
    if mentions("todo", "action", "task", "need to", "should"):
        extractions.append(Extraction(
            extraction_class="action_item",
            extraction_text=text[:100],
            attributes={"priority": "medium"},
        ))
    if mentions("decided", "agreed", "decision", "will"):
        extractions.append(Extraction(
            extraction_class="decision",
            extraction_text=text[:100],
        ))
    duration_ms = (time.monotonic() - start_time) * 1000
    return ExtractResponse(
        extractions=extractions,
        metadata=ExtractMetadata(
            model_id=f"{model_id}-mock",
            duration_ms=round(duration_ms, 2),
            char_count=len(text),
        ),
    )