""" LangExtract wrapper — calls lx.extract() with configurable parameters. """ from __future__ import annotations import os import time import structlog from .models import Extraction, ExtractMetadata, ExtractResponse logger = structlog.get_logger(__name__) DEFAULT_MODEL_ID = os.environ.get("DEFAULT_MODEL_ID", "gemini-2.5-flash") # ── Language detection ───────────────────────────────────────────── LANG_PATTERNS: list[tuple[str, str, list[str]]] = [ ("es", "Spanish", ["el", "la", "los", "las", "de", "en", "que", "por", "con", "para"]), ("fr", "French", ["le", "la", "les", "des", "une", "est", "que", "dans", "pour", "avec"]), ("de", "German", ["der", "die", "das", "ein", "eine", "ist", "und", "oder", "aber", "nicht"]), ("pt", "Portuguese", ["o", "os", "as", "de", "em", "que", "por", "com", "para", "como"]), ("ja", "Japanese", []), ("zh", "Chinese", []), ("ko", "Korean", []), ] def detect_language(text: str) -> tuple[str, str, float]: """Detect language from text. Returns (code, name, confidence).""" import re # CJK detection via unicode ranges if re.search(r"[\u3040-\u309F\u30A0-\u30FF\u4E00-\u9FFF]", text): if re.search(r"[\u3040-\u309F\u30A0-\u30FF]", text): return ("ja", "Japanese", 0.9) return ("zh", "Chinese", 0.85) if re.search(r"[\uAC00-\uD7AF]", text): return ("ko", "Korean", 0.9) if re.search(r"[\u0600-\u06FF]", text): return ("ar", "Arabic", 0.9) words = text.lower().split() for code, name, keywords in LANG_PATTERNS: if not keywords: continue matches = sum(1 for w in words if w in keywords) if matches >= 3: confidence = min(0.95, 0.5 + matches * 0.05) return (code, name, confidence) return ("en", "English", 0.85) async def extract( text: str, task_prompt: str | None = None, examples: list[dict] | None = None, model_id: str | None = None, extraction_passes: int | None = None, max_workers: int | None = None, max_char_buffer: int | None = None, language: str | None = None, ) -> ExtractResponse: """ Run LangExtract on the given text. Falls back to a mock implementation if: - langextract is not installed - No API key is configured (GEMINI_API_KEY env var) - USE_MOCK_EXTRACTOR env var is set to "true" """ resolved_model = model_id or DEFAULT_MODEL_ID start_time = time.monotonic() # Use mock if explicitly requested or no API key configured use_mock = ( os.environ.get("USE_MOCK_EXTRACTOR", "").lower() == "true" or not os.environ.get("GEMINI_API_KEY") ) if use_mock: logger.info("using_mock_extractor", reason="no API key or mock requested") return _mock_extract(text, resolved_model, start_time) try: import langextract as lx # Build LangExtract keyword arguments # API: lx.extract(text_or_documents, prompt_description, examples, model_id, ...) lx_kwargs: dict = { "model_id": resolved_model, } # Multi-language support: detect language and enrich prompt lang_code, lang_name, lang_conf = detect_language(text) if not language else (language, language, 1.0) lang_hint = "" if lang_code != "en" and lang_conf >= 0.7: lang_hint = f"\nIMPORTANT: The input text is in {lang_name}. Extract entities in their original language but use English for class labels." logger.info("multilang_detected", language=lang_code, confidence=round(lang_conf, 2)) if task_prompt: lx_kwargs["prompt_description"] = task_prompt + lang_hint elif lang_hint: lx_kwargs["prompt_description"] = f"Extract structured entities from the text.{lang_hint}" if examples: lx_kwargs["examples"] = examples if extraction_passes is not None: lx_kwargs["extraction_passes"] = extraction_passes if max_workers is not None: lx_kwargs["max_workers"] = max_workers if max_char_buffer is not None: lx_kwargs["max_char_buffer"] = max_char_buffer # text_or_documents is the first positional argument result = lx.extract(text, **lx_kwargs) # lx.extract returns AnnotatedDocument or list[AnnotatedDocument] # Each AnnotatedDocument has .annotations — list of Annotation objects extractions: list[Extraction] = [] docs = result if isinstance(result, list) else [result] for doc in docs: if hasattr(doc, "annotations"): for ann in doc.annotations: extractions.append( Extraction( extraction_class=getattr(ann, "label", getattr(ann, "type", "unknown")), extraction_text=getattr(ann, "text", str(ann)), attributes=getattr(ann, "attributes", None), ) ) duration_ms = (time.monotonic() - start_time) * 1000 logger.info( "extraction_complete", model_id=resolved_model, entity_count=len(extractions), duration_ms=round(duration_ms, 2), char_count=len(text), ) return ExtractResponse( extractions=extractions, metadata=ExtractMetadata( model_id=resolved_model, duration_ms=round(duration_ms, 2), char_count=len(text), language=lang_code if lang_code != "en" else None, ), ) except ImportError: logger.warning("langextract_not_installed", fallback="mock") return _mock_extract(text, resolved_model, start_time) except Exception as exc: logger.error("extraction_failed", error=str(exc), model_id=resolved_model) raise def _mock_extract( text: str, model_id: str, start_time: float, ) -> ExtractResponse: """ Mock extraction for development when langextract or API keys are unavailable. Returns simple keyword-based extractions. """ extractions: list[Extraction] = [] words = text.lower().split() if any(w in words for w in ["meeting", "call", "sync", "standup"]): extractions.append(Extraction( extraction_class="topic", extraction_text="meeting", attributes={"type": "event"}, )) if any(w in words for w in ["todo", "action", "task", "need to", "should"]): extractions.append(Extraction( extraction_class="action_item", extraction_text=text[:100], attributes={"priority": "medium"}, )) if any(w in words for w in ["decided", "agreed", "decision", "will"]): extractions.append(Extraction( extraction_class="decision", extraction_text=text[:100], )) duration_ms = (time.monotonic() - start_time) * 1000 return ExtractResponse( extractions=extractions, metadata=ExtractMetadata( model_id=f"{model_id}-mock", duration_ms=round(duration_ms, 2), char_count=len(text), ), )