feat(extraction): scaffold extraction-service + @bytelyst/extraction package

- extraction-service: Fastify scaffold (port 4005) with extract/tasks modules
- src/lib/: config, errors, cosmos, product-config, python-bridge
- src/modules/extract/: types (Zod schemas), routes (POST /extract, batch, models)
- src/modules/tasks/: types, repository (Cosmos CRUD), routes (CRUD endpoints)
- Python sidecar: FastAPI app, LangExtract wrapper, models, task registry
- @bytelyst/extraction package: types, client factory, index exports
- Both packages build cleanly with `pnpm build`
This commit is contained in:
saravanakumardb1 2026-02-14 13:31:40 -08:00
parent 6c71255d19
commit c292bb5cc1
26 changed files with 1506 additions and 0 deletions

View File

@ -0,0 +1,24 @@
{
"name": "@bytelyst/extraction",
"version": "0.1.0",
"type": "module",
"description": "Shared types and client for the extraction service",
"exports": {
".": {
"import": "./dist/index.js",
"types": "./dist/index.d.ts"
}
},
"main": "./dist/index.js",
"types": "./dist/index.d.ts",
"files": [
"dist"
],
"scripts": {
"build": "tsc",
"test": "vitest run"
},
"peerDependencies": {
"@bytelyst/api-client": "workspace:*"
}
}

View File

@ -0,0 +1,78 @@
/**
* Extraction service client factory.
* Uses @bytelyst/api-client under the hood for consistent auth token injection.
*/
import { createApiClient } from '@bytelyst/api-client';
import type {
ExtractionClientConfig,
ExtractRequest,
ExtractResponse,
BatchExtractRequest,
BatchExtractResponse,
ExtractionTask,
} from './types.js';
/**
 * Operations offered by the extraction service client.
 * Each method resolves with the typed payload for its endpoint.
 */
export interface ExtractionClient {
/** Single document extraction. */
extract(req: ExtractRequest): Promise<ExtractResponse>;
/** Batch extraction (multiple inputs, shared config). */
extractBatch(req: BatchExtractRequest): Promise<BatchExtractResponse>;
/** List available extraction tasks; `productId` is optional. */
listTasks(productId?: string): Promise<ExtractionTask[]>;
/** Get a single task by ID; `productId` is optional. */
getTask(id: string, productId?: string): Promise<ExtractionTask>;
}
/**
 * Build an {@link ExtractionClient} bound to one extraction-service base URL.
 *
 * All requests go through `createApiClient` from `@bytelyst/api-client`, which
 * receives the same `baseUrl` and `getToken` supplied here.
 *
 * @param config - Service base URL and an optional token getter.
 * @returns An object implementing every {@link ExtractionClient} method.
 *
 * @example
 * ```ts
 * const client = createExtractionClient({
 *   baseUrl: "http://localhost:4005",
 *   getToken: () => localStorage.getItem("access_token"),
 * });
 *
 * const result = await client.extract({
 *   text: "John said we should ship by Friday.",
 *   taskId: "transcript-extraction",
 * });
 * ```
 */
export function createExtractionClient(config: ExtractionClientConfig): ExtractionClient {
  const { baseUrl, getToken } = config;
  const api = createApiClient({ baseUrl, getToken });

  // `?productId=…` suffix, or an empty string when no (or an empty) productId is given.
  const productQuery = (productId?: string): string => {
    if (!productId) {
      return '';
    }
    return `?productId=${encodeURIComponent(productId)}`;
  };

  // POST `payload` as a JSON string to `path`.
  const postJson = <T>(path: string, payload: unknown): Promise<T> =>
    api.fetch<T>(path, { method: 'POST', body: JSON.stringify(payload) });

  return {
    async extract(req: ExtractRequest): Promise<ExtractResponse> {
      return postJson<ExtractResponse>('/api/extract', req);
    },

    async extractBatch(req: BatchExtractRequest): Promise<BatchExtractResponse> {
      return postJson<BatchExtractResponse>('/api/extract/batch', req);
    },

    async listTasks(productId?: string): Promise<ExtractionTask[]> {
      const path = `/api/tasks${productQuery(productId)}`;
      return api.fetch<ExtractionTask[]>(path);
    },

    async getTask(id: string, productId?: string): Promise<ExtractionTask> {
      const path = `/api/tasks/${encodeURIComponent(id)}${productQuery(productId)}`;
      return api.fetch<ExtractionTask>(path);
    },
  };
}

View File

@ -0,0 +1,12 @@
export { createExtractionClient } from './client.js';
export type { ExtractionClient } from './client.js';
export type {
ExtractionEntity,
ExtractionExample,
ExtractionTask,
ExtractRequest,
ExtractResponse,
BatchExtractRequest,
BatchExtractResponse,
ExtractionClientConfig,
} from './types.js';

View File

@ -0,0 +1,78 @@
// ── Extraction types (shared across consumers) ─────────────────
/** One extracted item: a class label, the extracted text, and optional extras. */
export interface ExtractionEntity {
extraction_class: string;
extraction_text: string;
/** Optional string-to-string attributes attached to the extraction. */
attributes?: Record<string, string>;
// NOTE(review): presumably offsets of the match within the input text — confirm units with the producer.
start_offset?: number;
end_offset?: number;
}
/** A worked example: a sample text paired with the extractions expected from it. */
export interface ExtractionExample {
text: string;
extractions: ExtractionEntity[];
}
/** A stored extraction task definition (prompt, classes, optional examples). */
export interface ExtractionTask {
id: string;
name: string;
description?: string;
/** Prompt text associated with the task. */
prompt: string;
/** Names of the extraction classes the task uses. */
classes: string[];
examples?: ExtractionExample[];
defaultModelId?: string;
/** True for built-in tasks; the service's `createTask` sets it to false for custom tasks. */
builtIn: boolean;
productId: string;
// NOTE(review): timestamps look like ISO-8601 strings (the service writes `new Date().toISOString()`) — confirm.
createdAt?: string;
updatedAt?: string;
}
// ── Request / Response types ────────────────────────────────────
/** Request body for a single-document extraction. Only `text` is required. */
export interface ExtractRequest {
text: string;
taskId?: string;
taskPrompt?: string;
examples?: ExtractionExample[];
modelId?: string;
/** Optional tuning knobs; the service forwards them to the sidecar. */
options?: {
extractionPasses?: number;
maxWorkers?: number;
maxCharBuffer?: number;
};
productId?: string;
}
/** Result of one extraction: the extracted entities plus run metadata. */
export interface ExtractResponse {
extractions: ExtractionEntity[];
metadata: {
modelId: string;
/** Duration of the run in milliseconds. */
durationMs: number;
tokenCount?: number;
charCount: number;
};
requestId?: string;
}
/**
 * Request body for batch extraction: one entry per input text, with
 * `examples`, `modelId` and `productId` specified once for the whole batch.
 */
export interface BatchExtractRequest {
inputs: Array<{
text: string;
taskId?: string;
taskPrompt?: string;
}>;
examples?: ExtractionExample[];
modelId?: string;
productId?: string;
}
/** Batch extraction result: a list of per-input {@link ExtractResponse} objects. */
export interface BatchExtractResponse {
results: ExtractResponse[];
requestId?: string;
}
// ── Client config ───────────────────────────────────────────────
/** Options accepted by `createExtractionClient`. */
export interface ExtractionClientConfig {
/** Base URL of the extraction service. */
baseUrl: string;
/** Optional token getter; may return `null`. It is handed to the underlying API client. */
getToken?: () => string | null;
}

View File

@ -0,0 +1,10 @@
{
"extends": "../../tsconfig.base.json",
"compilerOptions": {
"outDir": "dist",
"rootDir": "src",
"lib": ["ES2022", "DOM"]
},
"include": ["src"],
"exclude": ["src/**/*.test.ts"]
}

61
pnpm-lock.yaml generated
View File

@ -97,6 +97,12 @@ importers:
packages/errors: {} packages/errors: {}
packages/extraction:
dependencies:
'@bytelyst/api-client':
specifier: workspace:*
version: link:../api-client
packages/fastify-core: packages/fastify-core:
dependencies: dependencies:
'@bytelyst/errors': '@bytelyst/errors':
@ -205,6 +211,61 @@ importers:
specifier: ^3.0.5 specifier: ^3.0.5
version: 3.2.4(@types/node@22.19.11)(happy-dom@18.0.1)(jsdom@28.0.0)(tsx@4.21.0)(yaml@2.8.2) version: 3.2.4(@types/node@22.19.11)(happy-dom@18.0.1)(jsdom@28.0.0)(tsx@4.21.0)(yaml@2.8.2)
services/extraction-service:
dependencies:
'@azure/cosmos':
specifier: ^4.2.0
version: 4.9.1(@azure/core-client@1.10.1)
'@bytelyst/auth':
specifier: workspace:*
version: link:../../packages/auth
'@bytelyst/config':
specifier: workspace:*
version: link:../../packages/config
'@bytelyst/cosmos':
specifier: workspace:*
version: link:../../packages/cosmos
'@bytelyst/errors':
specifier: workspace:*
version: link:../../packages/errors
'@bytelyst/fastify-core':
specifier: workspace:*
version: link:../../packages/fastify-core
'@fastify/cors':
specifier: ^10.0.2
version: 10.1.0
'@fastify/rate-limit':
specifier: ^10.3.0
version: 10.3.0
'@fastify/swagger':
specifier: ^9.4.2
version: 9.7.0
fastify:
specifier: ^5.2.1
version: 5.7.4
fastify-metrics:
specifier: ^10.3.0
version: 10.6.0(fastify@5.7.4)
jose:
specifier: ^6.0.8
version: 6.1.3
zod:
specifier: ^3.24.2
version: 3.25.76
devDependencies:
'@types/node':
specifier: ^22.12.0
version: 22.19.11
tsx:
specifier: ^4.19.2
version: 4.21.0
typescript:
specifier: ^5.7.3
version: 5.9.3
vitest:
specifier: ^3.0.5
version: 3.2.4(@types/node@22.19.11)(happy-dom@18.0.1)(jsdom@28.0.0)(tsx@4.21.0)(yaml@2.8.2)
services/growth-service: services/growth-service:
dependencies: dependencies:
'@azure/cosmos': '@azure/cosmos':

View File

@ -0,0 +1,29 @@
# ── Extraction Service Environment Variables ─────────────────────
# Copy to .env and fill in real values.
# ── Service ──────────────────────────────────────────────────────
PORT=4005
HOST=0.0.0.0
NODE_ENV=development
SERVICE_NAME=extraction-service
CORS_ORIGIN=*
# ── Python Sidecar ───────────────────────────────────────────────
PYTHON_SIDECAR_URL=http://localhost:4006
DEFAULT_MODEL_ID=gemini-2.5-flash
# ── Azure Cosmos DB ──────────────────────────────────────────────
COSMOS_ENDPOINT=https://cosmos-mywisprai.documents.azure.com:443/
COSMOS_KEY=your-cosmos-key
COSMOS_DATABASE=lysnrai
# ── Auth ─────────────────────────────────────────────────────────
JWT_SECRET=your-jwt-secret
# ── Product Identity ─────────────────────────────────────────────
DEFAULT_PRODUCT_ID=lysnrai
# ── Python Sidecar Env (passed to python/ process) ──────────────
GEMINI_API_KEY=your-gemini-api-key
# AZURE_OPENAI_API_KEY=your-azure-openai-key
# AZURE_OPENAI_ENDPOINT=https://your-resource.openai.azure.com/

View File

@ -0,0 +1,36 @@
{
"name": "@lysnrai/extraction-service",
"version": "0.1.0",
"private": true,
"description": "Extraction Service — LLM-powered structured extraction via LangExtract (product-agnostic)",
"type": "module",
"scripts": {
"dev": "tsx watch src/server.ts",
"build": "tsc",
"start": "node dist/server.js",
"test": "vitest run",
"test:watch": "vitest",
"lint": "eslint src/"
},
"dependencies": {
"@bytelyst/auth": "workspace:*",
"@bytelyst/config": "workspace:*",
"@bytelyst/cosmos": "workspace:*",
"@bytelyst/errors": "workspace:*",
"@bytelyst/fastify-core": "workspace:*",
"@azure/cosmos": "^4.2.0",
"@fastify/cors": "^10.0.2",
"@fastify/rate-limit": "^10.3.0",
"@fastify/swagger": "^9.4.2",
"fastify": "^5.2.1",
"fastify-metrics": "^10.3.0",
"jose": "^6.0.8",
"zod": "^3.24.2"
},
"devDependencies": {
"@types/node": "^22.12.0",
"tsx": "^4.19.2",
"typescript": "^5.7.3",
"vitest": "^3.0.5"
}
}

View File

@ -0,0 +1,6 @@
langextract>=0.3.0
fastapi>=0.115.0
uvicorn>=0.34.0
pydantic>=2.10.0
pydantic-settings>=2.7.0
structlog>=24.4.0

View File

@ -0,0 +1,115 @@
"""
Extraction sidecar: a FastAPI app that runs LangExtract.
Internal service, not exposed externally. Called by the Fastify extraction-service.
Default port: 4006.
"""
from __future__ import annotations
import os
import structlog
import uvicorn
from fastapi import FastAPI, HTTPException, Request
from .extractor import extract
from .models import (
BatchExtractRequest,
ExtractRequest,
ExtractResponse,
HealthResponse,
)
# Module-level structlog logger, named after this module.
logger = structlog.get_logger(__name__)
# FastAPI application object; the route decorators in this module register handlers on it.
app = FastAPI(
title="Extraction Sidecar",
description="Internal LangExtract sidecar for extraction-service",
version="0.1.0",
)
@app.get("/health", response_model=HealthResponse)
async def health() -> HealthResponse:
    # Liveness probe: answer with a HealthResponse built entirely from its field defaults.
    status_payload = HealthResponse()
    return status_payload
@app.post("/extract", response_model=ExtractResponse)
async def extract_endpoint(req: ExtractRequest, request: Request) -> ExtractResponse:
    # Single-document extraction: log the request, delegate to extract(), and
    # turn any failure into an HTTP 500 whose detail is the error text.
    correlation_id = request.headers.get("x-request-id", "")
    logger.info(
        "extract_request",
        task_id=req.task_id,
        model_id=req.model_id,
        text_length=len(req.text),
        request_id=correlation_id,
    )

    try:
        # Examples are handed over as plain dicts; a missing or empty list becomes None.
        if req.examples:
            example_dicts = [example.model_dump() for example in req.examples]
        else:
            example_dicts = None

        return await extract(
            text=req.text,
            task_prompt=req.task_prompt,
            examples=example_dicts,
            model_id=req.model_id,
            extraction_passes=req.extraction_passes,
            max_workers=req.max_workers,
            max_char_buffer=req.max_char_buffer,
        )
    except Exception as exc:
        failure_text = str(exc)
        logger.error("extract_failed", error=failure_text, request_id=correlation_id)
        raise HTTPException(status_code=500, detail=failure_text) from exc
@app.post("/extract/batch", response_model=list[ExtractResponse])
async def extract_batch_endpoint(
    batch: BatchExtractRequest,
    request: Request,
) -> list[ExtractResponse]:
    # Batch extraction: items are processed one after another, in order.
    # The first failing item aborts the whole batch with an HTTP 500.
    correlation_id = request.headers.get("x-request-id", "")
    logger.info(
        "batch_extract_request",
        input_count=len(batch.requests),
        request_id=correlation_id,
    )

    collected: list[ExtractResponse] = []
    for item in batch.requests:
        try:
            # Examples are handed over as plain dicts; a missing or empty list becomes None.
            if item.examples:
                example_dicts = [example.model_dump() for example in item.examples]
            else:
                example_dicts = None

            outcome = await extract(
                text=item.text,
                task_prompt=item.task_prompt,
                examples=example_dicts,
                model_id=item.model_id,
                extraction_passes=item.extraction_passes,
                max_workers=item.max_workers,
                max_char_buffer=item.max_char_buffer,
            )
            collected.append(outcome)
        except Exception as exc:
            failure_text = str(exc)
            logger.error("batch_item_failed", error=failure_text, request_id=correlation_id)
            raise HTTPException(status_code=500, detail=failure_text) from exc

    return collected
def main() -> None:
    """
    Start uvicorn for the sidecar app.

    Reads SIDECAR_PORT (default "4006") and SIDECAR_HOST (default "0.0.0.0")
    from the environment; auto-reload is on only when NODE_ENV is "development".
    """
    env = os.environ
    listen_port = int(env.get("SIDECAR_PORT", "4006"))
    bind_host = env.get("SIDECAR_HOST", "0.0.0.0")

    logger.info("sidecar_starting", port=listen_port, host=bind_host)

    hot_reload = env.get("NODE_ENV") == "development"
    uvicorn.run(
        "src.app:app",
        host=bind_host,
        port=listen_port,
        reload=hot_reload,
        log_level="info",
    )


if __name__ == "__main__":
    main()

View File

@ -0,0 +1,141 @@
"""
LangExtract wrapper: calls lx.extract() with configurable parameters.
"""
from __future__ import annotations
import os
import time
import structlog
from .models import Extraction, ExtractMetadata, ExtractResponse
# Module-level structlog logger, named after this module.
logger = structlog.get_logger(__name__)
# Model used when a caller supplies no model_id; read once from the environment at import time.
DEFAULT_MODEL_ID = os.environ.get("DEFAULT_MODEL_ID", "gemini-2.5-flash")
async def extract(
    text: str,
    task_prompt: str | None = None,
    examples: list[dict] | None = None,
    model_id: str | None = None,
    extraction_passes: int | None = None,
    max_workers: int | None = None,
    max_char_buffer: int | None = None,
) -> ExtractResponse:
    """
    Run LangExtract on ``text`` and wrap the result in an ``ExtractResponse``.

    When the ``langextract`` package cannot be imported, the call falls back to
    ``_mock_extract``. Any other failure is logged and re-raised.

    Args:
        text: Source text to extract from.
        task_prompt: Extraction instructions, forwarded as ``prompt_description``.
        examples: Few-shot examples as dicts with ``text`` and ``extractions``
            keys (each extraction has ``extraction_class``, ``extraction_text``
            and optionally ``attributes``).
        model_id: Model to use; ``DEFAULT_MODEL_ID`` is used when empty.
        extraction_passes: Forwarded to LangExtract only when not ``None``.
        max_workers: Forwarded to LangExtract only when not ``None``.
        max_char_buffer: Forwarded to LangExtract only when not ``None``.

    Returns:
        The extracted entities plus metadata (model id, duration in
        milliseconds, input length in characters).

    Raises:
        Exception: Any error raised while preparing the request or running
            LangExtract (other than ``ImportError``, which triggers the mock).
    """
    import asyncio  # local import: only this coroutine needs it

    resolved_model = model_id or DEFAULT_MODEL_ID
    start_time = time.monotonic()

    def _field(item: object, name: str) -> object:
        # LangExtract returns Extraction objects; plain dicts are accepted as well.
        if isinstance(item, dict):
            return item.get(name)
        return getattr(item, name, None)

    try:
        import langextract as lx

        # lx.extract() names its input `text_or_documents` and its
        # instructions `prompt_description`.
        lx_kwargs: dict = {
            "text_or_documents": text,
            "model_id": resolved_model,
        }
        if task_prompt:
            lx_kwargs["prompt_description"] = task_prompt
        if examples:
            # LangExtract expects ExampleData / Extraction objects, not raw dicts.
            lx_kwargs["examples"] = [
                lx.data.ExampleData(
                    text=example["text"],
                    extractions=[
                        lx.data.Extraction(
                            extraction_class=item["extraction_class"],
                            extraction_text=item["extraction_text"],
                            attributes=item.get("attributes"),
                        )
                        for item in example.get("extractions") or []
                    ],
                )
                for example in examples
            ]
        if extraction_passes is not None:
            lx_kwargs["extraction_passes"] = extraction_passes
        if max_workers is not None:
            lx_kwargs["max_workers"] = max_workers
        if max_char_buffer is not None:
            lx_kwargs["max_char_buffer"] = max_char_buffer

        # lx.extract() is synchronous; run it in a worker thread so the event
        # loop can keep serving other requests (e.g. /health) meanwhile.
        result = await asyncio.to_thread(lx.extract, **lx_kwargs)

        raw_items = result.extractions if hasattr(result, "extractions") else result
        extractions = [
            Extraction(
                extraction_class=_field(e, "extraction_class") or "unknown",
                extraction_text=_field(e, "extraction_text") or "",
                attributes=_field(e, "attributes"),
            )
            for e in (raw_items or [])
        ]

        duration_ms = (time.monotonic() - start_time) * 1000
        logger.info(
            "extraction_complete",
            model_id=resolved_model,
            entity_count=len(extractions),
            duration_ms=round(duration_ms, 2),
            char_count=len(text),
        )
        return ExtractResponse(
            extractions=extractions,
            metadata=ExtractMetadata(
                model_id=resolved_model,
                duration_ms=round(duration_ms, 2),
                char_count=len(text),
            ),
        )
    except ImportError:
        logger.warning("langextract_not_installed", fallback="mock")
        return _mock_extract(text, resolved_model, start_time)
    except Exception as exc:
        logger.error("extraction_failed", error=str(exc), model_id=resolved_model)
        raise
def _mock_extract(
    text: str,
    model_id: str,
    start_time: float,
) -> ExtractResponse:
    """
    Keyword-based stand-in for LangExtract, used when the package is unavailable.

    Args:
        text: Source text to scan.
        model_id: Model id requested by the caller; reported with a ``-mock`` suffix.
        start_time: ``time.monotonic()`` value taken when the request started,
            used to compute the reported duration.

    Returns:
        An ``ExtractResponse`` with at most one ``topic``, one ``action_item``
        and one ``decision`` extraction, depending on which keywords appear.
    """
    import re  # local import: only the mock needs it

    lowered = text.lower()
    # Tokenise on word characters so adjacent punctuation ("meeting.", "todo:")
    # does not hide a keyword.
    words = set(re.findall(r"[\w']+", lowered))

    def _mentions(keywords: list[str], phrases: tuple[str, ...] = ()) -> bool:
        # Single words are matched against tokens; multi-word phrases can never
        # equal one token, so they are searched for in the lowered text instead.
        if any(keyword in words for keyword in keywords):
            return True
        return any(
            re.search(r"\b" + re.escape(phrase) + r"\b", lowered) is not None
            for phrase in phrases
        )

    extractions: list[Extraction] = []

    if _mentions(["meeting", "call", "sync", "standup"]):
        extractions.append(Extraction(
            extraction_class="topic",
            extraction_text="meeting",
            attributes={"type": "event"},
        ))
    if _mentions(["todo", "action", "task", "should"], phrases=("need to",)):
        extractions.append(Extraction(
            extraction_class="action_item",
            extraction_text=text[:100],
            attributes={"priority": "medium"},
        ))
    if _mentions(["decided", "agreed", "decision", "will"]):
        extractions.append(Extraction(
            extraction_class="decision",
            extraction_text=text[:100],
        ))

    duration_ms = (time.monotonic() - start_time) * 1000
    return ExtractResponse(
        extractions=extractions,
        metadata=ExtractMetadata(
            model_id=f"{model_id}-mock",
            duration_ms=round(duration_ms, 2),
            char_count=len(text),
        ),
    )

View File

@ -0,0 +1,52 @@
"""
Pydantic models for the extraction sidecar API.
These mirror the Zod schemas in the TypeScript service.
"""
from __future__ import annotations
from pydantic import BaseModel, Field
# One extracted item: a class label, the extracted text, and optional
# string-to-string attributes.
class Extraction(BaseModel):
extraction_class: str
extraction_text: str
attributes: dict[str, str] | None = None
# A worked example: a sample text paired with the extractions expected from it.
class ExtractionExample(BaseModel):
text: str
extractions: list[Extraction]
# Body of POST /extract (and of each item in a batch). Only `text` is required.
class ExtractRequest(BaseModel):
# Between 1 and 50,000 characters.
text: str = Field(..., min_length=1, max_length=50_000)
task_id: str | None = None
task_prompt: str | None = None
examples: list[ExtractionExample] | None = None
model_id: str | None = None
# Optional tuning values with validated ranges: 1-5, 1-50 and 100-10,000.
extraction_passes: int | None = Field(None, ge=1, le=5)
max_workers: int | None = Field(None, ge=1, le=50)
max_char_buffer: int | None = Field(None, ge=100, le=10_000)
# Body of POST /extract/batch: between 1 and 50 individual extract requests.
class BatchExtractRequest(BaseModel):
requests: list[ExtractRequest] = Field(..., min_length=1, max_length=50)
# Run metadata returned with every extraction result.
class ExtractMetadata(BaseModel):
model_id: str
# Duration of the run in milliseconds.
duration_ms: float
token_count: int | None = None
# Length of the input text in characters.
char_count: int
# Result of one extraction: the extracted items plus run metadata.
class ExtractResponse(BaseModel):
extractions: list[Extraction]
metadata: ExtractMetadata
# Payload of GET /health; every field has a fixed default.
class HealthResponse(BaseModel):
status: str = "ok"
version: str = "0.1.0"
sidecar: str = "langextract"

View File

@ -0,0 +1,201 @@
"""
Built-in extraction task definitions.
These are seeded into Cosmos DB on service startup.
"""
from __future__ import annotations
# Built-in task definitions. Each entry has: id, name, description, prompt,
# classes, examples (sample text plus expected extractions), defaultModelId
# and builtIn.
# NOTE(review): entries carry no productId; presumably it is added when the
# tasks are seeded — confirm against the seeding code.
BUILTIN_TASKS: list[dict] = [
{
"id": "transcript-extraction",
"name": "Transcript Extraction",
"description": "Extract structured entities from meeting transcripts and voice notes.",
"prompt": (
"Extract action items, decisions, questions, deadlines, people, and topics "
"from the following transcript. Each extraction should be verbatim text from "
"the source with the appropriate classification."
),
"classes": ["action_item", "decision", "question", "deadline", "person", "topic"],
"examples": [
{
"text": "John said we need to ship the feature by Friday. Sarah agreed to handle the testing.",
"extractions": [
{
"extraction_class": "deadline",
"extraction_text": "ship the feature by Friday",
},
{
"extraction_class": "person",
"extraction_text": "John",
},
{
"extraction_class": "person",
"extraction_text": "Sarah",
},
{
"extraction_class": "action_item",
"extraction_text": "Sarah agreed to handle the testing",
},
{
"extraction_class": "decision",
"extraction_text": "we need to ship the feature by Friday",
},
],
},
],
"defaultModelId": "gemini-2.5-flash",
"builtIn": True,
},
{
"id": "triage",
"name": "MindLyst Triage",
"description": "Extract topics, entities, actions, emotions, and brain routing signals from user captures.",
"prompt": (
"Analyze the following user capture and extract: topics, named entities, "
"action items, emotional signals, date references, and brain routing signals. "
"Brain signals should include which brain (work, home, money, health, global) "
"the content belongs to with a confidence score."
),
"classes": ["topic", "entity", "action", "emotion", "date_reference", "brain_signal"],
"examples": [
{
"text": "Remind me to call the dentist tomorrow about my appointment. I'm stressed about the cost.",
"extractions": [
{
"extraction_class": "action",
"extraction_text": "call the dentist tomorrow",
},
{
"extraction_class": "date_reference",
"extraction_text": "tomorrow",
},
{
"extraction_class": "emotion",
"extraction_text": "stressed about the cost",
"attributes": {"valence": "negative"},
},
{
"extraction_class": "brain_signal",
"extraction_text": "dentist",
"attributes": {"brain": "health", "confidence": "0.9"},
},
{
"extraction_class": "brain_signal",
"extraction_text": "cost",
"attributes": {"brain": "money", "confidence": "0.6"},
},
],
},
],
"defaultModelId": "gemini-2.5-flash",
"builtIn": True,
},
{
"id": "memory-insight",
"name": "Memory Insight Extraction",
"description": "Extract patterns, recurring themes, relationships, and milestones from accumulated brain memories.",
"prompt": (
"Analyze the following collection of memory items and extract: recurring patterns, "
"themes, relationships between items, and milestones. Focus on cross-cutting insights "
"that span multiple items."
),
"classes": ["pattern", "recurring_theme", "relationship", "milestone"],
"examples": [
{
"text": "Item 1: Skipped gym again. Item 2: Feeling tired at work. Item 3: Had coffee at 4pm.",
"extractions": [
{
"extraction_class": "pattern",
"extraction_text": "Skipped gym again",
"attributes": {"frequency": "recurring"},
},
{
"extraction_class": "relationship",
"extraction_text": "Feeling tired at work",
"attributes": {"related_to": "Skipped gym again"},
},
],
},
],
"defaultModelId": "gemini-2.5-flash",
"builtIn": True,
},
{
"id": "reflection-enrichment",
"name": "Reflection Enrichment",
"description": "Extract emotional states, accomplishments, concerns, and goal progress from journal-style text.",
"prompt": (
"Analyze the following reflection or journal entry and extract: emotional states, "
"accomplishments, concerns, and goal progress indicators."
),
"classes": ["emotional_state", "accomplishment", "concern", "goal_progress"],
"examples": [
{
"text": "Good day overall. Finally finished the proposal I've been putting off. Still worried about the budget review next week.",
"extractions": [
{
"extraction_class": "emotional_state",
"extraction_text": "Good day overall",
"attributes": {"valence": "positive"},
},
{
"extraction_class": "accomplishment",
"extraction_text": "finished the proposal",
},
{
"extraction_class": "concern",
"extraction_text": "worried about the budget review next week",
},
],
},
],
"defaultModelId": "gemini-2.5-flash",
"builtIn": True,
},
{
"id": "bug-report-extraction",
"name": "Bug Report Extraction",
"description": "Extract structured fields from bug report submissions.",
"prompt": (
"Extract steps to reproduce, expected behavior, actual behavior, affected component, "
"and severity from the following bug report."
),
"classes": [
"steps_to_reproduce",
"expected_behavior",
"actual_behavior",
"affected_component",
"severity",
],
"examples": [
{
"text": "When I click the save button on the settings page, nothing happens. It should save my preferences. This is a critical issue affecting all users.",
"extractions": [
{
"extraction_class": "steps_to_reproduce",
"extraction_text": "click the save button on the settings page",
},
{
"extraction_class": "actual_behavior",
"extraction_text": "nothing happens",
},
{
"extraction_class": "expected_behavior",
"extraction_text": "should save my preferences",
},
{
"extraction_class": "affected_component",
"extraction_text": "settings page",
},
{
"extraction_class": "severity",
"extraction_text": "critical issue affecting all users",
"attributes": {"level": "critical"},
},
],
},
],
"defaultModelId": "gemini-2.5-flash",
"builtIn": True,
},
]

View File

@ -0,0 +1,18 @@
import { z } from 'zod';
/**
 * Zod schema for the environment variables this service reads.
 * COSMOS_ENDPOINT, COSMOS_KEY and JWT_SECRET must be non-empty; CORS_ORIGIN
 * is optional; every other variable has a default.
 */
const envSchema = z.object({
// Coerced from the environment string to a number.
PORT: z.coerce.number().default(4005),
HOST: z.string().default('0.0.0.0'),
NODE_ENV: z.enum(['development', 'production', 'test']).default('development'),
CORS_ORIGIN: z.string().optional(),
SERVICE_NAME: z.string().default('extraction-service'),
COSMOS_ENDPOINT: z.string().min(1, 'COSMOS_ENDPOINT is required'),
COSMOS_KEY: z.string().min(1, 'COSMOS_KEY is required'),
COSMOS_DATABASE: z.string().default('lysnrai'),
JWT_SECRET: z.string().min(1, 'JWT_SECRET is required'),
DEFAULT_PRODUCT_ID: z.string().default('lysnrai'),
PYTHON_SIDECAR_URL: z.string().default('http://localhost:4006'),
DEFAULT_MODEL_ID: z.string().default('gemini-2.5-flash'),
});
/**
 * Validated configuration, parsed from `process.env` when this module is
 * first imported. `parse` throws if a variable fails validation.
 */
export const config = envSchema.parse(process.env);

View File

@ -0,0 +1,4 @@
/**
* Re-exports the Cosmos helpers from @bytelyst/cosmos, which is shared across all services.
*/
export { getContainer, getCosmosClient, getDatabase } from '@bytelyst/cosmos';

View File

@ -0,0 +1,12 @@
/**
* Re-exports the error classes from @bytelyst/errors, which is shared across all services.
*/
export {
ServiceError,
BadRequestError,
UnauthorizedError,
ForbiddenError,
NotFoundError,
ConflictError,
TooManyRequestsError,
} from '@bytelyst/errors';

View File

@ -0,0 +1,7 @@
/**
* Default product identity for this service.
* The extraction service is product-agnostic; every document carries its own productId.
*
* Resolved once at import time: the DEFAULT_PRODUCT_ID environment variable
* when it is set and non-empty, otherwise `getProductId()` from @bytelyst/config.
*/
import { getProductId } from '@bytelyst/config';
export const DEFAULT_PRODUCT_ID = process.env.DEFAULT_PRODUCT_ID || getProductId();

View File

@ -0,0 +1,145 @@
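/**
* HTTP bridge to the Python sidecar running LangExtract.
*
* The sidecar exposes FastAPI endpoints on an internal port (default 4006).
* This module provides typed request helpers with per-request timeouts, a
* readiness poll (`waitForSidecar`), and errors built from the sidecar's
* response. Individual extract requests are not retried.
*/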
import { config } from './config.js';
// Base URL of the Python sidecar, taken from validated config.
const SIDECAR_URL = config.PYTHON_SIDECAR_URL;
// Default per-request timeout for extract calls: 120 seconds.
const DEFAULT_TIMEOUT_MS = 120_000;
/** JSON body sent to the sidecar's extract endpoints (snake_case field names). */
export interface SidecarExtractRequest {
text: string;
task_id?: string;
task_prompt?: string;
examples?: SidecarExample[];
model_id?: string;
extraction_passes?: number;
max_workers?: number;
max_char_buffer?: number;
}
/** A worked example in sidecar format: sample text plus its expected extractions. */
export interface SidecarExample {
text: string;
extractions: SidecarExtraction[];
}
/** One extraction in sidecar format: class label, text, optional string attributes. */
export interface SidecarExtraction {
extraction_class: string;
extraction_text: string;
attributes?: Record<string, string>;
}
/** JSON body returned by the sidecar for one extraction (snake_case field names). */
export interface SidecarExtractResponse {
extractions: SidecarExtraction[];
metadata: {
model_id: string;
duration_ms: number;
token_count?: number;
char_count: number;
};
}
/** JSON body returned by the sidecar's GET /health endpoint. */
export interface SidecarHealthResponse {
status: string;
version?: string;
}
/**
 * Build the `Error` for a non-2xx sidecar response.
 *
 * The reason is taken from the body's `error` or `detail` field. A
 * non-string reason (for example the list FastAPI returns for validation
 * failures) is JSON-encoded instead of being interpolated as
 * "[object Object]". Falls back to the HTTP status text when the body is
 * not JSON or carries no reason.
 */
async function describeSidecarFailure(label: string, res: Response): Promise<Error> {
  const payload: unknown = await res.json().catch(() => undefined);

  let reason: unknown;
  if (typeof payload === 'object' && payload !== null) {
    const fields = payload as { error?: unknown; detail?: unknown };
    reason = fields.error || fields.detail;
  }

  let reasonText: string;
  if (typeof reason === 'string') {
    reasonText = reason;
  } else if (reason) {
    reasonText = JSON.stringify(reason);
  } else {
    reasonText = res.statusText;
  }
  return new Error(`${label} ${res.status}: ${reasonText}`);
}

/**
 * POST a JSON payload to a sidecar path and return the parsed JSON response.
 * Adds the `x-request-id` header when a request id is given and aborts the
 * request after `timeoutMs` milliseconds.
 */
async function postToSidecar<T>(
  path: string,
  payload: unknown,
  failureLabel: string,
  requestId: string | undefined,
  timeoutMs: number
): Promise<T> {
  const headers: Record<string, string> = {
    'Content-Type': 'application/json',
  };
  if (requestId) {
    headers['x-request-id'] = requestId;
  }

  const res = await fetch(`${SIDECAR_URL}${path}`, {
    method: 'POST',
    headers,
    body: JSON.stringify(payload),
    signal: AbortSignal.timeout(timeoutMs),
  });
  if (!res.ok) {
    throw await describeSidecarFailure(failureLabel, res);
  }
  return (await res.json()) as T;
}

/**
 * POST /extract on the Python sidecar.
 *
 * @param req - Extraction request in sidecar (snake_case) format.
 * @param requestId - Optional id forwarded as the `x-request-id` header.
 * @param timeoutMs - Abort the request after this many milliseconds.
 * @returns The sidecar's parsed response.
 * @throws Error with message `Sidecar error <status>: <reason>` on a non-2xx response.
 */
export async function sidecarExtract(
  req: SidecarExtractRequest,
  requestId?: string,
  timeoutMs: number = DEFAULT_TIMEOUT_MS
): Promise<SidecarExtractResponse> {
  return postToSidecar<SidecarExtractResponse>(
    '/extract',
    req,
    'Sidecar error',
    requestId,
    timeoutMs
  );
}

/**
 * POST /extract/batch on the Python sidecar.
 *
 * @param requests - Extraction requests, sent wrapped as `{ requests }`.
 * @param requestId - Optional id forwarded as the `x-request-id` header.
 * @param timeoutMs - Abort the request after this many milliseconds.
 * @returns One parsed response per request.
 * @throws Error with message `Sidecar batch error <status>: <reason>` on a non-2xx response.
 */
export async function sidecarExtractBatch(
  requests: SidecarExtractRequest[],
  requestId?: string,
  timeoutMs: number = DEFAULT_TIMEOUT_MS
): Promise<SidecarExtractResponse[]> {
  return postToSidecar<SidecarExtractResponse[]>(
    '/extract/batch',
    { requests },
    'Sidecar batch error',
    requestId,
    timeoutMs
  );
}
/**
 * GET /health on the Python sidecar, giving up after five seconds.
 *
 * @returns The parsed health payload.
 * @throws Error when the sidecar answers with a non-2xx status.
 */
export async function sidecarHealth(): Promise<SidecarHealthResponse> {
  const healthUrl = `${SIDECAR_URL}/health`;
  const res = await fetch(healthUrl, { signal: AbortSignal.timeout(5_000) });

  if (res.ok) {
    return res.json() as Promise<SidecarHealthResponse>;
  }
  throw new Error(`Sidecar health check failed: ${res.status}`);
}
/**
 * Poll the sidecar's /health endpoint until it answers successfully.
 *
 * @param maxRetries - Maximum number of health checks to attempt.
 * @param intervalMs - Pause between two consecutive attempts, in milliseconds.
 * @returns `true` as soon as a health check succeeds, `false` once every attempt has failed.
 */
export async function waitForSidecar(
  maxRetries: number = 30,
  intervalMs: number = 2_000
): Promise<boolean> {
  let attempt = 0;
  while (attempt < maxRetries) {
    try {
      await sidecarHealth();
      return true;
    } catch {
      // Not ready yet; fall through and try again.
    }

    attempt += 1;
    // Pause only if another attempt will follow.
    if (attempt < maxRetries) {
      await new Promise(resolve => globalThis.setTimeout(resolve, intervalMs));
    }
  }
  return false;
}

View File

@ -0,0 +1,129 @@
import type { FastifyInstance } from 'fastify';
import { ExtractRequestSchema, BatchExtractRequestSchema } from './types.js';
import { sidecarExtract, sidecarExtractBatch, sidecarHealth } from '../../lib/python-bridge.js';
import { BadRequestError } from '../../lib/errors.js';
/**
 * Register the extraction HTTP routes on the given Fastify instance:
 * POST /extract, POST /extract/batch, GET /extract/models and
 * GET /extract/sidecar-health.
 */
export async function extractRoutes(app: FastifyInstance) {
  type ExampleInput = {
    text: string;
    extractions: Array<{
      extraction_class: string;
      extraction_text: string;
      attributes?: Record<string, string>;
    }>;
  };
  type SidecarMetadata = {
    model_id: string;
    duration_ms: number;
    token_count?: number;
    char_count: number;
  };

  // Copy validated examples into the shape the sidecar expects.
  const toSidecarExamples = (examples: ExampleInput[] | undefined) =>
    examples?.map(example => ({
      text: example.text,
      extractions: example.extractions.map(entity => ({
        extraction_class: entity.extraction_class,
        extraction_text: entity.extraction_text,
        attributes: entity.attributes,
      })),
    }));

  // Translate the sidecar's snake_case metadata into the camelCase response shape.
  const toResponseMetadata = (meta: SidecarMetadata) => ({
    modelId: meta.model_id,
    durationMs: meta.duration_ms,
    tokenCount: meta.token_count,
    charCount: meta.char_count,
  });

  // POST /extract: single document extraction.
  app.post('/extract', async (req, reply) => {
    const parsed = ExtractRequestSchema.safeParse(req.body);
    if (!parsed.success) {
      const reasons = parsed.error.issues.map(issue => issue.message);
      throw new BadRequestError(reasons.join('; '));
    }

    const input = parsed.data;
    const requestId = req.headers['x-request-id'] as string | undefined;

    req.log.info(
      { taskId: input.taskId, modelId: input.modelId, textLength: input.text.length },
      'extraction request'
    );

    const sidecarResult = await sidecarExtract(
      {
        text: input.text,
        task_id: input.taskId,
        task_prompt: input.taskPrompt,
        examples: toSidecarExamples(input.examples),
        model_id: input.modelId,
        extraction_passes: input.options?.extractionPasses,
        max_workers: input.options?.maxWorkers,
        max_char_buffer: input.options?.maxCharBuffer,
      },
      requestId
    );

    const { extractions, metadata } = sidecarResult;
    req.log.info(
      { entityCount: extractions.length, durationMs: metadata.duration_ms },
      'extraction complete'
    );

    return reply.send({
      extractions,
      metadata: toResponseMetadata(metadata),
      requestId,
    });
  });

  // POST /extract/batch: several inputs that share examples and model.
  app.post('/extract/batch', async (req, reply) => {
    const parsed = BatchExtractRequestSchema.safeParse(req.body);
    if (!parsed.success) {
      const reasons = parsed.error.issues.map(issue => issue.message);
      throw new BadRequestError(reasons.join('; '));
    }

    const batch = parsed.data;
    const requestId = req.headers['x-request-id'] as string | undefined;

    req.log.info(
      { inputCount: batch.inputs.length, modelId: batch.modelId },
      'batch extraction request'
    );

    // Every input gets its own copy of the shared examples and model id.
    const sidecarRequests = batch.inputs.map(entry => ({
      text: entry.text,
      task_id: entry.taskId,
      task_prompt: entry.taskPrompt,
      examples: toSidecarExamples(batch.examples),
      model_id: batch.modelId,
    }));

    const sidecarResults = await sidecarExtractBatch(sidecarRequests, requestId);
    const results = sidecarResults.map(item => ({
      extractions: item.extractions,
      metadata: toResponseMetadata(item.metadata),
    }));

    return reply.send({ results, requestId });
  });

  // GET /extract/models: static list of available model providers.
  app.get('/extract/models', async (_req, reply) => {
    const models = [
      { id: 'gemini-2.5-flash', provider: 'google', description: 'Gemini 2.5 Flash (default)' },
      { id: 'gemini-2.5-pro', provider: 'google', description: 'Gemini 2.5 Pro' },
    ];
    return reply.send({ models });
  });

  // GET /extract/sidecar-health: report the Python sidecar's status (503 when unreachable).
  app.get('/extract/sidecar-health', async (_req, reply) => {
    try {
      const sidecar = await sidecarHealth();
      return reply.send({ status: 'ok', sidecar });
    } catch (err) {
      let error = 'Sidecar unavailable';
      if (err instanceof Error) {
        error = err.message;
      }
      return reply.status(503).send({ status: 'error', error });
    }
  });
}

View File

@ -0,0 +1,77 @@
import { z } from 'zod';
// ── Shared sub-schemas ──────────────────────────────────────────
/**
 * A worked example supplied with a request: non-empty sample text plus the
 * extractions expected from it (class and text must be non-empty).
 */
export const ExtractionExampleSchema = z.object({
text: z.string().min(1),
extractions: z.array(
z.object({
extraction_class: z.string().min(1),
extraction_text: z.string().min(1),
attributes: z.record(z.string()).optional(),
})
),
});
/** One extraction as returned to clients; attributes and offsets are optional. */
export const ExtractionResultSchema = z.object({
extraction_class: z.string(),
extraction_text: z.string(),
attributes: z.record(z.string()).optional(),
start_offset: z.number().optional(),
end_offset: z.number().optional(),
});
// ── Request schemas ─────────────────────────────────────────────
/**
 * Body of POST /extract. `text` is required (1 to 50,000 characters);
 * everything else is optional.
 */
export const ExtractRequestSchema = z.object({
text: z.string().min(1, 'text is required').max(50_000, 'text exceeds 50,000 character limit'),
taskId: z.string().optional(),
taskPrompt: z.string().optional(),
examples: z.array(ExtractionExampleSchema).optional(),
modelId: z.string().optional(),
// Integer tuning values with validated ranges: 1-5, 1-50 and 100-10,000.
options: z
.object({
extractionPasses: z.number().int().min(1).max(5).optional(),
maxWorkers: z.number().int().min(1).max(50).optional(),
maxCharBuffer: z.number().int().min(100).max(10_000).optional(),
})
.optional(),
productId: z.string().optional(),
});
/**
 * Body of POST /extract/batch: 1 to 50 inputs (each with 1 to 50,000
 * characters of text), plus examples, model and product given once for the batch.
 */
export const BatchExtractRequestSchema = z.object({
inputs: z
.array(
z.object({
text: z.string().min(1).max(50_000),
taskId: z.string().optional(),
taskPrompt: z.string().optional(),
})
)
.min(1)
.max(50),
examples: z.array(ExtractionExampleSchema).optional(),
modelId: z.string().optional(),
productId: z.string().optional(),
});
// ── Response schemas ────────────────────────────────────────────
/**
 * Bookkeeping attached to every extraction response: the model id, the
 * duration in milliseconds, the input character count and, when available,
 * a token count.
 */
const extractMetadataSchema = z.object({
  modelId: z.string(),
  durationMs: z.number(),
  tokenCount: z.number().optional(),
  charCount: z.number(),
});

/**
 * Response of a single extraction: the list of results, the run metadata and
 * an optional request id.
 */
export const ExtractResponseSchema = z.object({
  extractions: z.array(ExtractionResultSchema),
  metadata: extractMetadataSchema,
  requestId: z.string().optional(),
});
// ── TypeScript types ────────────────────────────────────────────
/** Example payload shape, inferred from `ExtractionExampleSchema`. */
export type ExtractionExample = z.infer<typeof ExtractionExampleSchema>;
/** Single extraction result shape, inferred from `ExtractionResultSchema`. */
export type ExtractionResult = z.infer<typeof ExtractionResultSchema>;
/** Single-document request body, inferred from `ExtractRequestSchema`. */
export type ExtractRequest = z.infer<typeof ExtractRequestSchema>;
/** Batch request body, inferred from `BatchExtractRequestSchema`. */
export type BatchExtractRequest = z.infer<typeof BatchExtractRequestSchema>;
/** Single extraction response, inferred from `ExtractResponseSchema`. */
export type ExtractResponse = z.infer<typeof ExtractResponseSchema>;

View File

@ -0,0 +1,100 @@
import { getContainer } from '../../lib/cosmos.js';
import { NotFoundError, ConflictError } from '../../lib/errors.js';
import type { ExtractionTaskDoc, CreateTask, UpdateTask } from './types.js';
/** Id of the Cosmos container that holds extraction task documents. */
const CONTAINER_ID = 'extraction_tasks';

/** Look up the task container; delegates to `getContainer` on every call. */
const container = () => getContainer(CONTAINER_ID);
/**
 * Fetch every extraction task stored for `productId`.
 *
 * The query is parameterised, scoped to the `productId` partition, and sorted
 * with built-in tasks first, then by name ascending.
 *
 * NOTE(review): a two-property ORDER BY presumably needs a matching composite
 * index on the container — confirm the indexing policy.
 *
 * @param productId Product scope, used both as filter value and partition key.
 * @returns All matching task documents (empty array when there are none).
 */
export async function listTasks(productId: string): Promise<ExtractionTaskDoc[]> {
  const querySpec = {
    query: 'SELECT * FROM c WHERE c.productId = @productId ORDER BY c.builtIn DESC, c.name ASC',
    parameters: [{ name: '@productId', value: productId }],
  };
  const iterator = container().items.query<ExtractionTaskDoc>(querySpec, {
    partitionKey: productId,
  });
  const page = await iterator.fetchAll();
  return page.resources;
}
/**
 * Read one task document by id within the `productId` partition.
 *
 * @param id Task id.
 * @param productId Partition key value the task is stored under.
 * @returns The stored task document.
 * @throws NotFoundError when the read yields no resource.
 */
export async function getTask(id: string, productId: string): Promise<ExtractionTaskDoc> {
  const response = await container().item(id, productId).read<ExtractionTaskDoc>();
  const task = response.resource;
  if (task) {
    return task;
  }
  throw new NotFoundError(`Task '${id}' not found`);
}
/**
 * Persist a new custom task.
 *
 * The stored document is the input with `productId` forced to the argument,
 * `builtIn` forced to `false`, and `createdAt` / `updatedAt` both set to the
 * same current ISO timestamp.
 *
 * @param input Validated task definition.
 * @param productId Product scope to store the task under.
 * @returns The document as returned by the create call.
 * @throws ConflictError when the create call fails with `code === 409`.
 */
export async function createTask(input: CreateTask, productId: string): Promise<ExtractionTaskDoc> {
  const timestamp = new Date().toISOString();
  const doc: ExtractionTaskDoc = {
    ...input,
    productId,
    builtIn: false,
    createdAt: timestamp,
    updatedAt: timestamp,
  };

  try {
    const created = await container().items.create(doc);
    return created.resource as ExtractionTaskDoc;
  } catch (err: unknown) {
    const isConflict =
      typeof err === 'object' && err !== null && 'code' in err && err.code === 409;
    if (!isConflict) {
      // Anything other than a 409 is not ours to interpret; let it propagate.
      throw err;
    }
    throw new ConflictError(`Task '${input.id}' already exists`);
  }
}
/**
 * Apply a partial update to an existing task.
 *
 * Loads the current document, overlays `updates`, refreshes `updatedAt`, and
 * replaces the stored item. No `builtIn` check is performed here, so built-in
 * tasks can be modified through this function.
 *
 * @param id Task id.
 * @param productId Partition key value the task is stored under.
 * @param updates Fields to overwrite on the existing document.
 * @returns The document as returned by the replace call.
 * @throws NotFoundError when the task does not exist (raised by `getTask`).
 */
export async function updateTask(
  id: string,
  productId: string,
  updates: UpdateTask
): Promise<ExtractionTaskDoc> {
  const current = await getTask(id, productId);
  const merged: ExtractionTaskDoc = {
    ...current,
    ...updates,
    updatedAt: new Date().toISOString(),
  };
  const replaced = await container().item(id, productId).replace(merged);
  return replaced.resource as ExtractionTaskDoc;
}
/**
 * Remove a custom task. Built-in tasks are protected and never deleted.
 *
 * @param id Task id.
 * @param productId Partition key value the task is stored under.
 * @throws NotFoundError when the task does not exist (raised by `getTask`).
 * @throws ConflictError when the task is flagged `builtIn`.
 */
export async function deleteTask(id: string, productId: string): Promise<void> {
  const { builtIn } = await getTask(id, productId);
  if (builtIn) {
    throw new ConflictError(`Cannot delete built-in task '${id}'`);
  }
  await container().item(id, productId).delete();
}
/**
 * Insert or overwrite a task document as-is (used for seeding built-in tasks).
 * Unlike `createTask`, no fields are defaulted or overridden.
 *
 * @param doc Complete task document to store.
 * @returns The document as returned by the upsert call.
 */
export async function upsertTask(doc: ExtractionTaskDoc): Promise<ExtractionTaskDoc> {
  const response = await container().items.upsert(doc);
  return response.resource as unknown as ExtractionTaskDoc;
}

View File

@ -0,0 +1,67 @@
import type { FastifyInstance } from 'fastify';
import { CreateTaskSchema, UpdateTaskSchema } from './types.js';
import * as repo from './repository.js';
import { BadRequestError } from '../../lib/errors.js';
import { DEFAULT_PRODUCT_ID } from '../../lib/product-config.js';
/**
 * Resolve the product scope of a request from its parsed querystring.
 *
 * The querystring is untrusted: a repeated key (`?productId=a&productId=b`)
 * is parsed into an array, so the value cannot simply be cast to `string`
 * and handed to the repository as a partition key.
 *
 * @param query The request's parsed querystring (`req.query`).
 * @returns The `productId` query value, or `DEFAULT_PRODUCT_ID` when it is
 *          absent or empty.
 * @throws BadRequestError when `productId` is present but not a single string.
 */
function resolveProductId(query: unknown): string {
  const raw = (query as { productId?: unknown } | null | undefined)?.productId;
  if (raw === undefined || raw === null || raw === '') {
    return DEFAULT_PRODUCT_ID;
  }
  if (typeof raw !== 'string') {
    throw new BadRequestError('productId must be a single string value');
  }
  return raw;
}

/**
 * Register the CRUD routes for extraction tasks on the given Fastify instance.
 *
 * Read, update and delete routes take the product scope from the `productId`
 * query parameter; the create route takes it from the request body. All fall
 * back to `DEFAULT_PRODUCT_ID`.
 *
 * NOTE(review): the write routes are described as "admin only", but no
 * authorization check is visible in this module — confirm it is enforced
 * upstream.
 *
 * @param app Fastify instance to attach the routes to.
 */
export async function taskRoutes(app: FastifyInstance) {
  /**
   * GET /tasks — List all extraction tasks.
   */
  app.get('/tasks', async (req, reply) => {
    const productId = resolveProductId(req.query);
    const tasks = await repo.listTasks(productId);
    return reply.send(tasks);
  });

  /**
   * GET /tasks/:id — Get a single task by ID.
   */
  app.get('/tasks/:id', async (req, reply) => {
    const { id } = req.params as { id: string };
    const productId = resolveProductId(req.query);
    const task = await repo.getTask(id, productId);
    return reply.send(task);
  });

  /**
   * POST /tasks — Create a custom task (admin only).
   */
  app.post('/tasks', async (req, reply) => {
    const parsed = CreateTaskSchema.safeParse(req.body);
    if (!parsed.success) {
      throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; '));
    }
    // The body value is already schema-validated as a string; empty falls back to the default.
    const productId = parsed.data.productId || DEFAULT_PRODUCT_ID;
    const task = await repo.createTask(parsed.data, productId);
    return reply.status(201).send(task);
  });

  /**
   * PUT /tasks/:id — Update an existing task (admin only).
   */
  app.put('/tasks/:id', async (req, reply) => {
    const { id } = req.params as { id: string };
    const productId = resolveProductId(req.query);
    const parsed = UpdateTaskSchema.safeParse(req.body);
    if (!parsed.success) {
      throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; '));
    }
    const task = await repo.updateTask(id, productId, parsed.data);
    return reply.send(task);
  });

  /**
   * DELETE /tasks/:id — Delete a custom task (admin only).
   */
  app.delete('/tasks/:id', async (req, reply) => {
    const { id } = req.params as { id: string };
    const productId = resolveProductId(req.query);
    await repo.deleteTask(id, productId);
    return reply.status(204).send();
  });
}

View File

@ -0,0 +1,52 @@
import { z } from 'zod';
import { ExtractionExampleSchema } from '../extract/types.js';
// ── Task definition schemas ─────────────────────────────────────
/**
 * Full task definition as stored: identity (`id`, `name`), the extraction
 * `prompt`, the list of `classes`, optional examples and default model,
 * plus ownership and bookkeeping fields.
 */
export const ExtractionTaskSchema = z.object({
id: z.string().min(1),
name: z.string().min(1),
description: z.string().optional(),
prompt: z.string().min(1),
// Each class name must be non-empty; the array itself may be empty.
classes: z.array(z.string().min(1)),
examples: z.array(ExtractionExampleSchema).optional(),
defaultModelId: z.string().optional(),
// Defaults to false when omitted from the parsed input.
builtIn: z.boolean().default(false),
productId: z.string(),
// Plain strings at the schema level; no date format is enforced here.
createdAt: z.string().optional(),
updatedAt: z.string().optional(),
});
/**
 * Request body for creating a task. Compared with `ExtractionTaskSchema` it
 * has no `builtIn`, `createdAt` or `updatedAt` fields, and `productId` is
 * optional.
 */
export const CreateTaskSchema = z.object({
id: z.string().min(1),
name: z.string().min(1),
description: z.string().optional(),
prompt: z.string().min(1),
classes: z.array(z.string().min(1)),
examples: z.array(ExtractionExampleSchema).optional(),
defaultModelId: z.string().optional(),
productId: z.string().optional(),
});
/**
 * Request body for updating a task. Every field is optional, so an empty
 * object is valid. `id` and `productId` are not part of this schema.
 */
export const UpdateTaskSchema = z.object({
name: z.string().min(1).optional(),
description: z.string().optional(),
prompt: z.string().min(1).optional(),
classes: z.array(z.string().min(1)).optional(),
examples: z.array(ExtractionExampleSchema).optional(),
defaultModelId: z.string().optional(),
});
// ── TypeScript types ────────────────────────────────────────────
/** Full task definition, inferred from `ExtractionTaskSchema`. */
export type ExtractionTask = z.infer<typeof ExtractionTaskSchema>;
/** Create-task request body, inferred from `CreateTaskSchema`. */
export type CreateTask = z.infer<typeof CreateTaskSchema>;
/** Update-task request body, inferred from `UpdateTaskSchema`. */
export type UpdateTask = z.infer<typeof UpdateTaskSchema>;
// ── Cosmos DB document shape ────────────────────────────────────
/**
 * A task as stored in Cosmos DB: the task fields plus two optional
 * underscore-prefixed properties.
 */
export interface ExtractionTaskDoc extends ExtractionTask {
// NOTE(review): presumably the Cosmos system last-modified timestamp — confirm units.
_ts?: number;
// NOTE(review): presumably the Cosmos system entity tag for the stored item.
_etag?: string;
}

View File

@ -0,0 +1,33 @@
/**
* Extraction Service Fastify server entry point.
*
* Modules: extract, tasks.
* Port: 4005 (configurable via PORT env var).
* Product-agnostic: all data scoped by productId.
*
* Depends on a Python sidecar running LangExtract (default port 4006).
*/
import { createServiceApp, startService } from '@bytelyst/fastify-core';
import { extractRoutes } from './modules/extract/routes.js';
import { taskRoutes } from './modules/tasks/routes.js';
import { config } from './lib/config.js';
// Build the service app from the shared factory. CORS origin and the port
// shown in the Swagger config both come from the service config module.
const app = await createServiceApp({
name: 'extraction-service',
version: '0.1.0',
description: 'LLM-powered structured extraction via LangExtract — product-agnostic',
corsOrigin: config.CORS_ORIGIN,
swagger: {
title: 'Extraction Service',
description: 'LLM-powered structured extraction via LangExtract',
port: config.PORT,
},
metrics: true,
});
// Register route modules; both are mounted under the shared '/api' prefix.
await app.register(extractRoutes, { prefix: '/api' });
await app.register(taskRoutes, { prefix: '/api' });
// Start listening only after every route module has been registered.
await startService(app, { port: config.PORT, host: config.HOST });

View File

@ -0,0 +1,19 @@
{
"compilerOptions": {
"target": "ES2022",
"module": "ESNext",
"moduleResolution": "bundler",
"outDir": "dist",
"rootDir": "src",
"strict": true,
"esModuleInterop": true,
"skipLibCheck": true,
"forceConsistentCasingInFileNames": true,
"resolveJsonModule": true,
"declaration": true,
"declarationMap": true,
"sourceMap": true
},
"include": ["src/**/*"],
"exclude": ["node_modules", "dist", "src/**/*.test.ts"]
}