learning_ai_common_plat/services/extraction-service/src/modules/extract/jobs.ts
saravanakumardb1 5c1744d3a4 feat(extraction): Phase 6 advanced features (6.1-6.8)
- 6.1-6.2: Entity visualization components (bar chart, pie chart, timeline) [in LysnrAI repo]
- 6.3-6.4: Async job queue — POST /extract/jobs, GET /extract/jobs/:id, GET /extract/jobs
- 6.5-6.6: Model registry with tier (standard/premium/free/mock) + GET /extract/models
- 6.7-6.8: Multi-language detection (es/fr/de/pt/ja/zh/ko/ar) + prompt enrichment
- ExtractMetadata.language field added to Python models
- 46 TS tests passing, build clean
2026-02-14 14:08:02 -08:00

96 lines
2.7 KiB
TypeScript

/**
* Async extraction job queue.
*
* For large batch requests, callers can submit an async job and poll for results.
* Jobs are stored in-memory (Cosmos persistence deferred to Phase 7).
*
* Flow: POST /extract/jobs → { jobId } → GET /extract/jobs/:id → { status, results }
*/
import { randomUUID } from 'node:crypto';
import {
sidecarExtract,
type SidecarExtractRequest,
type SidecarExtractResponse,
} from '../../lib/python-bridge.js';
/** Lifecycle state of an extraction job (`pending` → `processing` → `completed` | `failed`). */
export type JobStatus = 'pending' | 'processing' | 'completed' | 'failed';

/**
 * A batch extraction job tracked by the in-memory queue and mutated in place
 * by the background processor while it runs.
 */
export interface ExtractionJob {
  /** Identifier assigned when the job is created (a random UUID). */
  id: string;
  /** Current lifecycle state. */
  status: JobStatus;
  /** Extraction requests for this job, processed in array order. */
  inputs: SidecarExtractRequest[];
  /** One response per processed input, in input order; failed inputs get a placeholder entry. */
  results: SidecarExtractResponse[];
  /** Failures recorded during processing; `index` is the position of the failing input. */
  errors: { index: number; error: string }[];
  /** `completed` counts inputs attempted so far; `total` is the number of inputs. */
  progress: { completed: number; total: number };
  /** Creation time as an ISO-8601 string. */
  createdAt: string;
  /** Completion time as an ISO-8601 string, set once processing finishes. */
  completedAt?: string;
}
/**
 * Registry of all known jobs, keyed by job id. Lives in this module's memory,
 * so it is local to the running process and is lost on restart.
 */
const jobStore: Map<string, ExtractionJob> = new Map();
/** Maximum number of jobs kept in memory; the oldest finished jobs are evicted beyond this. */
const MAX_RETAINED_JOBS = 1000;

/**
 * Create a new async extraction job and start processing it in the background.
 *
 * The job is stored before processing starts, so it can be polled immediately.
 * Processing is fire-and-forget: this returns synchronously with the job in
 * `pending` state, and the returned object is mutated as work progresses.
 *
 * @param inputs - Extraction requests, processed in order. The array is copied so
 *   later mutation by the caller cannot desynchronise `inputs` and `progress.total`.
 * @param requestId - Optional correlation id forwarded to each sidecar call.
 * @returns The newly registered job record.
 */
export function createJob(inputs: SidecarExtractRequest[], requestId?: string): ExtractionJob {
  const job: ExtractionJob = {
    id: randomUUID(),
    status: 'pending',
    inputs: [...inputs],
    results: [],
    errors: [],
    progress: { completed: 0, total: inputs.length },
    createdAt: new Date().toISOString(),
  };
  jobStore.set(job.id, job);
  // The store is process memory only; without a bound it grows for the lifetime
  // of the service. Only finished jobs are evicted, so in-flight jobs stay pollable.
  evictFinishedJobs(MAX_RETAINED_JOBS);

  // processJob records per-input failures itself, so this is a last-resort guard:
  // leave the job in a terminal state with a completion time (rather than an
  // unhandled rejection), and log the cause since no caller is awaiting it.
  void processJob(job, requestId).catch((err: unknown) => {
    job.status = 'failed';
    job.completedAt = new Date().toISOString();
    console.error(`[extract/jobs] job ${job.id} failed unexpectedly`, err);
  });
  return job;
}

/** Evict the oldest finished (completed or failed) jobs until at most `max` remain. */
function evictFinishedJobs(max: number): void {
  // Map iteration follows insertion order, so the oldest jobs are visited first;
  // deleting the current entry during for...of iteration is well-defined.
  for (const [id, stored] of jobStore) {
    if (jobStore.size <= max) break;
    if (stored.status === 'completed' || stored.status === 'failed') {
      jobStore.delete(id);
    }
  }
}
/**
 * Look up a job by its id.
 *
 * @param jobId - Id returned by `createJob`.
 * @returns The live job record, or `undefined` if the store holds no job with that id.
 */
export function getJob(jobId: string): ExtractionJob | undefined {
  const found = jobStore.get(jobId);
  return found;
}
/**
 * List the most recently created jobs, newest first.
 *
 * @param limit - Maximum number of jobs to return (default 50). Fractional values
 *   are floored; negative or NaN values yield an empty list.
 * @returns Up to `limit` jobs ordered by `createdAt` descending. Jobs created in
 *   the same millisecond are ordered most-recently-inserted first.
 */
export function listJobs(limit = 50): ExtractionJob[] {
  // slice() with a negative end would drop jobs from the tail instead of capping.
  const count = Number.isNaN(limit) ? 0 : Math.max(0, Math.floor(limit));
  // toISOString() output is fixed-width, so code-unit order is chronological order;
  // no locale-aware collation is needed.
  const newestFirst = (a: ExtractionJob, b: ExtractionJob): number =>
    a.createdAt < b.createdAt ? 1 : a.createdAt > b.createdAt ? -1 : 0;
  // Map iteration is insertion-ordered; reversing before the stable sort makes
  // same-timestamp ties come out newest-first, matching the sort direction.
  return [...jobStore.values()].reverse().sort(newestFirst).slice(0, count);
}
// ── Internal ─────────────────────────────────────────────────────

/**
 * Run every input of `job` through the sidecar sequentially, mutating the job
 * record in place so pollers observe progress.
 *
 * A failing input does not abort the job: its error is recorded in `job.errors`
 * and a placeholder response is pushed so `job.results[i]` always corresponds to
 * `job.inputs[i]`.
 *
 * Final status is `failed` only when there was at least one input and every
 * input failed; otherwise `completed` (including a job with no inputs).
 *
 * @param job - Job record to process; mutated in place.
 * @param requestId - Optional correlation id forwarded to each sidecar call.
 */
async function processJob(job: ExtractionJob, requestId?: string): Promise<void> {
  job.status = 'processing';
  for (const [i, input] of job.inputs.entries()) {
    try {
      job.results.push(await sidecarExtract(input, requestId));
    } catch (err: unknown) {
      const message =
        err instanceof Error ? err.message : typeof err === 'string' ? err : 'Unknown error';
      job.errors.push({ index: i, error: message });
      // Push a placeholder so result indices stay aligned with input indices.
      job.results.push({
        extractions: [],
        metadata: { model_id: 'error', duration_ms: 0, char_count: 0 },
      });
    }
    job.progress.completed = i + 1;
  }
  // Guard on length: with zero inputs, errors.length === inputs.length (0 === 0)
  // would otherwise mark a job that had nothing to fail as failed.
  const allFailed = job.inputs.length > 0 && job.errors.length === job.inputs.length;
  job.status = allFailed ? 'failed' : 'completed';
  job.completedAt = new Date().toISOString();
}