feat(queue): add durable worker runtime and extraction integration

This commit is contained in:
root 2026-03-14 06:25:10 +00:00
parent bc1f9c59c2
commit 2b4fccb744
16 changed files with 1029 additions and 377 deletions

View File

@ -47,6 +47,10 @@ SLACK_DEFAULT_CHANNEL=
PYTHON_SIDECAR_URL=http://localhost:4006
DEFAULT_MODEL_ID=gemini-2.5-flash
GEMINI_API_KEY=your-gemini-api-key
EXTRACTION_QUEUE_BACKEND=file
EXTRACTION_QUEUE_FILE=.data/extraction-jobs.json
EXTRACTION_QUEUE_POLL_MS=100
EXTRACTION_QUEUE_LEASE_MS=30000
# ── Webhooks (optional — fire-and-forget callbacks) ──────────
WEBHOOK_INVITATION_REDEEMED_URL=

View File

@ -0,0 +1,25 @@
{
"name": "@bytelyst/queue",
"version": "0.1.0",
"description": "Durable job queue with pluggable stores and worker runtime",
"type": "module",
"exports": {
".": {
"import": "./dist/index.js",
"types": "./dist/index.d.ts"
}
},
"main": "./dist/index.js",
"types": "./dist/index.d.ts",
"files": [
"dist"
],
"scripts": {
"build": "tsc",
"test": "vitest run"
},
"devDependencies": {
"@types/node": "^22.12.0",
"vitest": "^3.0.5"
}
}

View File

@ -0,0 +1,173 @@
import { randomUUID } from 'node:crypto';
import { mkdir, readFile, rename, writeFile } from 'node:fs/promises';
import { dirname } from 'node:path';
import type { EnqueueJobInput, ListJobsOptions, QueueJob, QueueStore } from './types.js';
/** Construction options for the file-backed queue store. */
export interface FileQueueStoreOptions {
/** Path of the JSON file that holds the jobs of every queue. Missing parent directories are created on write. */
filePath: string;
}
/**
 * Queue store that persists every queue in a single JSON file.
 *
 * The file contains one object keyed by queue name; each value is the array of
 * jobs in that queue. Every operation re-reads the file, and every mutation
 * rewrites the whole file through a temp file followed by a rename.
 *
 * Operations issued through the same instance are serialized by an in-memory
 * promise chain (see `withLock`). Nothing in this class coordinates separate
 * instances or processes that write to the same path.
 */
export class FileQueueStore implements QueueStore {
  private readonly filePath: string;
  /** Tail of the per-instance operation chain used by `withLock`; it never rejects. */
  private operation = Promise.resolve();

  constructor(options: FileQueueStoreOptions) {
    this.filePath = options.filePath;
  }

  /**
   * Appends a new job in status `queued` to `queueName`.
   *
   * When `input.idempotencyKey` is set and a job with the same key already
   * exists in the queue, that existing job is returned and nothing is written.
   *
   * @returns The stored job (or the pre-existing job for a repeated idempotency key).
   * @throws If the state file exists but cannot be read or parsed.
   */
  enqueue<TPayload = unknown>(
    queueName: string,
    input: EnqueueJobInput<TPayload>
  ): Promise<QueueJob<TPayload>> {
    return this.withLock(async () => {
      const state = await this.readState();
      const queue = state[queueName] ?? [];
      if (input.idempotencyKey) {
        const existing = queue.find(job => job.idempotencyKey === input.idempotencyKey);
        if (existing) return existing as QueueJob<TPayload>;
      }
      const now = new Date();
      const job: QueueJob<TPayload> = {
        id: input.id || randomUUID(),
        queueName,
        type: input.type,
        payload: input.payload,
        status: 'queued',
        attempts: 0,
        maxAttempts: input.maxAttempts ?? 3,
        createdAt: now.toISOString(),
        // delayMs postpones the first moment at which the job becomes claimable.
        scheduledAt: new Date(now.getTime() + (input.delayMs ?? 0)).toISOString(),
        progress: input.progress,
        metadata: input.metadata,
        idempotencyKey: input.idempotencyKey,
        productId: input.productId,
        userId: input.userId,
      };
      state[queueName] = [...queue, job];
      await this.writeState(state);
      return job;
    });
  }

  /**
   * Looks up a job by id.
   *
   * @returns The job, or `undefined` when the queue or the id is unknown.
   * @throws If the state file exists but cannot be read or parsed.
   */
  get<TPayload = unknown, TResult = unknown>(
    queueName: string,
    id: string
  ): Promise<QueueJob<TPayload, TResult> | undefined> {
    return this.withLock(async () => {
      const state = await this.readState();
      const job = (state[queueName] ?? []).find(item => item.id === id);
      return job as QueueJob<TPayload, TResult> | undefined;
    });
  }

  /**
   * Lists jobs of a queue, newest `createdAt` first.
   *
   * @param options Optional status filter and result limit (default 50).
   * @throws If the state file exists but cannot be read or parsed.
   */
  list<TPayload = unknown, TResult = unknown>(
    queueName: string,
    options?: ListJobsOptions
  ): Promise<Array<QueueJob<TPayload, TResult>>> {
    return this.withLock(async () => {
      const state = await this.readState();
      const jobs = (state[queueName] ?? [])
        .filter(job => !options?.status || job.status === options.status)
        .sort((a, b) => b.createdAt.localeCompare(a.createdAt))
        .slice(0, options?.limit ?? 50);
      return jobs as Array<QueueJob<TPayload, TResult>>;
    });
  }

  /**
   * Claims the next claimable job for `workerId`.
   *
   * Candidates are ordered by `scheduledAt`, then `createdAt`. The chosen job
   * is switched to `running`, its `attempts` counter is incremented, and it
   * receives a lease that expires `leaseMs` after `now`.
   *
   * @returns The claimed job, or `undefined` when nothing is claimable.
   * @throws If the state file exists but cannot be read or parsed.
   */
  claimNext<TPayload = unknown, TResult = unknown>(
    queueName: string,
    workerId: string,
    leaseMs: number,
    now = new Date()
  ): Promise<QueueJob<TPayload, TResult> | undefined> {
    return this.withLock(async () => {
      const state = await this.readState();
      const queue = state[queueName] ?? [];
      const next = queue
        .filter(job => this.isClaimable(job, now))
        .sort(
          (a, b) =>
            a.scheduledAt.localeCompare(b.scheduledAt) || a.createdAt.localeCompare(b.createdAt)
        )[0];
      if (!next) return undefined;
      // `next` is the same object that lives inside `state`, so mutating it
      // here is what gets persisted by the write below.
      next.status = 'running';
      next.attempts += 1;
      next.startedAt = next.startedAt || now.toISOString();
      next.leaseOwner = workerId;
      next.leaseExpiresAt = new Date(now.getTime() + leaseMs).toISOString();
      await this.writeState(state);
      return next as QueueJob<TPayload, TResult>;
    });
  }

  /**
   * Shallow-merges `patch` into the stored job.
   *
   * Properties whose patched value is `undefined` disappear from the file
   * because JSON serialization omits them.
   *
   * @returns The merged job, or `undefined` when the job does not exist.
   * @throws If the state file exists but cannot be read or parsed.
   */
  patch<TPayload = unknown, TResult = unknown>(
    queueName: string,
    id: string,
    patch: Partial<QueueJob<TPayload, TResult>>
  ): Promise<QueueJob<TPayload, TResult> | undefined> {
    return this.withLock(async () => {
      const state = await this.readState();
      const queue = state[queueName] ?? [];
      const index = queue.findIndex(job => job.id === id);
      if (index === -1) return undefined;
      const merged = {
        ...queue[index],
        ...patch,
      };
      queue[index] = merged;
      state[queueName] = queue;
      await this.writeState(state);
      return merged as QueueJob<TPayload, TResult>;
    });
  }

  /**
   * Removes one queue, or every queue when `queueName` is omitted.
   *
   * Clearing everything does not read the current file, so it also works as a
   * reset when the file content is unreadable.
   */
  clear(queueName?: string): Promise<void> {
    return this.withLock(async () => {
      if (!queueName) {
        await this.writeState({});
        return;
      }
      const state = await this.readState();
      delete state[queueName];
      await this.writeState(state);
    });
  }

  /**
   * Runs `fn` after every previously submitted operation of this instance has
   * settled, so read-modify-write cycles do not interleave.
   */
  private async withLock<T>(fn: () => Promise<T>): Promise<T> {
    const run = this.operation.then(fn, fn);
    // The chain itself must never reject, otherwise one failed operation
    // would poison every later one; the caller still sees the failure via `run`.
    this.operation = run.then(
      () => undefined,
      () => undefined
    );
    return run;
  }

  /**
   * Loads the persisted state.
   *
   * A missing file is the only condition treated as "no jobs yet". Any other
   * read failure, invalid JSON, or a non-object document is reported as an
   * error: silently returning `{}` here would make the next mutation overwrite
   * the file and discard every persisted job.
   */
  private async readState(): Promise<Record<string, QueueJob[]>> {
    let raw: string;
    try {
      raw = await readFile(this.filePath, 'utf-8');
    } catch (err: unknown) {
      if (this.isMissingFileError(err)) return {};
      throw err;
    }

    let parsed: unknown;
    try {
      parsed = JSON.parse(raw);
    } catch (err: unknown) {
      const reason = err instanceof Error ? err.message : String(err);
      throw new Error(`Queue state file ${this.filePath} is not valid JSON: ${reason}`);
    }

    if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) {
      throw new Error(
        `Queue state file ${this.filePath} must contain a JSON object keyed by queue name`
      );
    }
    return parsed as Record<string, QueueJob[]>;
  }

  /** True when `err` is a file-system error reporting that the path does not exist. */
  private isMissingFileError(err: unknown): boolean {
    return (
      typeof err === 'object' &&
      err !== null &&
      (err as { code?: unknown }).code === 'ENOENT'
    );
  }

  /** Writes the full state to a uniquely named temp file and renames it over the target. */
  private async writeState(state: Record<string, QueueJob[]>): Promise<void> {
    await mkdir(dirname(this.filePath), { recursive: true });
    const tempPath = `${this.filePath}.${randomUUID()}.tmp`;
    await writeFile(tempPath, JSON.stringify(state, null, 2), 'utf-8');
    await rename(tempPath, this.filePath);
  }

  /**
   * A job can be claimed when it is queued and due, or when it is running but
   * has no lease or its lease has expired. Timestamps are compared as ISO strings.
   */
  private isClaimable(job: QueueJob, now: Date): boolean {
    if (job.status === 'queued') {
      return job.scheduledAt <= now.toISOString();
    }
    if (job.status === 'running') {
      return !job.leaseExpiresAt || job.leaseExpiresAt <= now.toISOString();
    }
    return false;
  }
}

View File

@ -0,0 +1,13 @@
export type {
EnqueueJobInput,
ListJobsOptions,
QueueHandler,
QueueJob,
QueueJobStatus,
QueueStore,
QueueWorkerOptions,
WorkerContext,
} from './types.js';
export { MemoryQueueStore, isTerminalStatus } from './memory-store.js';
export { FileQueueStore, type FileQueueStoreOptions } from './file-store.js';
export { QueueWorker } from './worker.js';

View File

@ -0,0 +1,136 @@
import { randomUUID } from 'node:crypto';
import type {
EnqueueJobInput,
ListJobsOptions,
QueueJob,
QueueJobStatus,
QueueStore,
} from './types.js';
/**
 * Deep-copies `value` by serializing it to JSON and parsing it back, so the
 * result shares no references with the input. Anything JSON cannot represent
 * (for example `undefined` properties) does not survive the round trip.
 */
function cloneValue<T>(value: T): T {
  const serialized = JSON.stringify(value);
  const copy: unknown = JSON.parse(serialized);
  return copy as T;
}
/**
 * In-process queue store backed by nested maps (queue name → job id → job).
 *
 * Jobs are copied with `cloneValue` on the way in and on the way out, so
 * callers never hold a reference to the stored objects.
 */
export class MemoryQueueStore implements QueueStore {
  private queues = new Map<string, Map<string, QueueJob>>();

  /**
   * Stores a new `queued` job. A repeated `idempotencyKey` returns a copy of
   * the job that already carries that key instead of creating another one.
   */
  async enqueue<TPayload = unknown>(
    queueName: string,
    input: EnqueueJobInput<TPayload>
  ): Promise<QueueJob<TPayload>> {
    const jobsById = this.getQueue(queueName);
    const key = input.idempotencyKey;
    if (key) {
      for (const candidate of jobsById.values()) {
        if (candidate.idempotencyKey === key) {
          return cloneValue(candidate) as QueueJob<TPayload>;
        }
      }
    }

    const createdAt = new Date();
    const dueAt = new Date(createdAt.getTime() + (input.delayMs ?? 0));
    const job: QueueJob<TPayload> = {
      id: input.id || randomUUID(),
      queueName,
      type: input.type,
      payload: input.payload,
      status: 'queued',
      attempts: 0,
      maxAttempts: input.maxAttempts ?? 3,
      createdAt: createdAt.toISOString(),
      scheduledAt: dueAt.toISOString(),
      progress: input.progress,
      metadata: input.metadata,
      idempotencyKey: input.idempotencyKey,
      productId: input.productId,
      userId: input.userId,
    };
    jobsById.set(job.id, cloneValue(job));
    return cloneValue(job);
  }

  /** Returns a copy of the job with the given id, or `undefined` when it is unknown. */
  async get<TPayload = unknown, TResult = unknown>(
    queueName: string,
    id: string
  ): Promise<QueueJob<TPayload, TResult> | undefined> {
    const stored = this.getQueue(queueName).get(id);
    if (!stored) return undefined;
    return cloneValue(stored) as QueueJob<TPayload, TResult>;
  }

  /** Lists copies of the queue's jobs, newest first, filtered by status and capped at `limit` (default 50). */
  async list<TPayload = unknown, TResult = unknown>(
    queueName: string,
    options?: ListJobsOptions
  ): Promise<Array<QueueJob<TPayload, TResult>>> {
    const wantedStatus = options?.status;
    const maxResults = options?.limit ?? 50;

    const matching: QueueJob[] = [];
    for (const job of this.getQueue(queueName).values()) {
      if (!wantedStatus || job.status === wantedStatus) matching.push(job);
    }
    matching.sort((left, right) => right.createdAt.localeCompare(left.createdAt));

    return cloneValue(matching.slice(0, maxResults)) as Array<QueueJob<TPayload, TResult>>;
  }

  /**
   * Hands the earliest claimable job (by `scheduledAt`, then `createdAt`) to
   * `workerId`: marks it `running`, bumps `attempts` and sets a lease that
   * expires `leaseMs` after `now`.
   */
  async claimNext<TPayload = unknown, TResult = unknown>(
    queueName: string,
    workerId: string,
    leaseMs: number,
    now = new Date()
  ): Promise<QueueJob<TPayload, TResult> | undefined> {
    const jobsById = this.getQueue(queueName);

    // Single pass for the minimum; the strict "< 0" keeps the first of equal
    // candidates, matching what a stable sort would put in front.
    let next: QueueJob | undefined;
    for (const candidate of jobsById.values()) {
      if (!this.isClaimable(candidate, now)) continue;
      if (next === undefined) {
        next = candidate;
        continue;
      }
      const order =
        candidate.scheduledAt.localeCompare(next.scheduledAt) ||
        candidate.createdAt.localeCompare(next.createdAt);
      if (order < 0) next = candidate;
    }
    if (!next) return undefined;

    const leaseEnd = new Date(now.getTime() + leaseMs);
    next.status = 'running';
    next.attempts += 1;
    next.startedAt = next.startedAt || now.toISOString();
    next.leaseOwner = workerId;
    next.leaseExpiresAt = leaseEnd.toISOString();
    jobsById.set(next.id, cloneValue(next));
    return cloneValue(next) as QueueJob<TPayload, TResult>;
  }

  /** Shallow-merges `patch` into the stored job and returns a copy, or `undefined` for an unknown id. */
  async patch<TPayload = unknown, TResult = unknown>(
    queueName: string,
    id: string,
    patch: Partial<QueueJob<TPayload, TResult>>
  ): Promise<QueueJob<TPayload, TResult> | undefined> {
    const jobsById = this.getQueue(queueName);
    const existing = jobsById.get(id);
    if (!existing) return undefined;

    const updated = { ...existing, ...patch };
    jobsById.set(id, cloneValue(updated));
    return cloneValue(updated) as QueueJob<TPayload, TResult>;
  }

  /** Drops one queue, or all queues when no name is given. */
  async clear(queueName?: string): Promise<void> {
    if (!queueName) {
      this.queues.clear();
      return;
    }
    this.queues.delete(queueName);
  }

  /** Returns the job map of a queue, creating it on first use. */
  private getQueue(queueName: string): Map<string, QueueJob> {
    let jobsById = this.queues.get(queueName);
    if (!jobsById) {
      jobsById = new Map();
      this.queues.set(queueName, jobsById);
    }
    return jobsById;
  }

  /**
   * Queued jobs are claimable once due; running jobs are claimable when they
   * have no lease or the lease has expired. Timestamps are compared as ISO strings.
   */
  private isClaimable(job: QueueJob, now: Date): boolean {
    switch (job.status) {
      case 'queued':
        return job.scheduledAt <= now.toISOString();
      case 'running':
        return !job.leaseExpiresAt || job.leaseExpiresAt <= now.toISOString();
      default:
        return false;
    }
  }
}
/**
 * Reports whether `status` is one of the end states
 * (`succeeded`, `failed`, `dead_letter`, `cancelled`).
 */
export function isTerminalStatus(status: QueueJobStatus): boolean {
  switch (status) {
    case 'succeeded':
    case 'failed':
    case 'dead_letter':
    case 'cancelled':
      return true;
    default:
      return false;
  }
}

View File

@ -0,0 +1,70 @@
import { mkdtemp, readFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { describe, expect, it, vi } from 'vitest';
import { FileQueueStore } from './file-store.js';
import { MemoryQueueStore } from './memory-store.js';
import { QueueWorker } from './worker.js';
// MemoryQueueStore: a job written with enqueue() can be read back with get().
describe('MemoryQueueStore', () => {
  it('enqueues and retrieves jobs', async () => {
    const memoryStore = new MemoryQueueStore();
    const enqueued = await memoryStore.enqueue('test', { type: 'demo', payload: { value: 1 } });

    const stored = await memoryStore.get('test', enqueued.id);

    expect(stored?.payload).toEqual({ value: 1 });
    expect(stored?.status).toBe('queued');
  });
});
// FileQueueStore: a job written by one instance is visible to a second
// instance on the same path, and the file on disk is keyed by queue name.
describe('FileQueueStore', () => {
  it('persists jobs across store instances', async () => {
    const workDir = await mkdtemp(join(tmpdir(), 'queue-store-'));
    const filePath = join(workDir, 'jobs.json');

    const writer = new FileQueueStore({ filePath });
    const job = await writer.enqueue('persisted', { type: 'demo', payload: { ok: true } });

    const reader = new FileQueueStore({ filePath });
    const found = await reader.get('persisted', job.id);
    expect(found?.payload).toEqual({ ok: true });

    const fileText = await readFile(filePath, 'utf-8');
    const raw = JSON.parse(fileText) as Record<string, Array<{ id: string }>>;
    expect(raw.persisted[0].id).toBe(job.id);
  });
});
// QueueWorker: a started worker claims an enqueued job, runs the handler and
// records the handler's return value as the job result.
describe('QueueWorker', () => {
  it('processes queued jobs to completion', async () => {
    const store = new MemoryQueueStore();
    const worker = new QueueWorker<{ value: number }, { doubled: number }>({
      queueName: 'work',
      store,
      pollIntervalMs: 5,
      handler: async job => ({ doubled: job.payload.value * 2 }),
    });
    worker.start();
    try {
      const job = await store.enqueue('work', {
        type: 'double',
        payload: { value: 21 },
      });
      await vi.waitFor(async () => {
        const updated = await store.get<{ value: number }, { doubled: number }>('work', job.id);
        expect(updated?.status).toBe('succeeded');
        expect(updated?.result).toEqual({ doubled: 42 });
      });
    } finally {
      // Stop the worker even when the assertions above time out; otherwise its
      // polling timer keeps running after the test has failed.
      await worker.stop();
    }
  });
});

View File

@ -0,0 +1,95 @@
/**
 * Lifecycle state of a queue job.
 *
 * Within this package: stores create jobs as `queued` and switch them to
 * `running` when claimed; the worker sets `succeeded` after the handler
 * resolves, and on a handler error either puts the job back to `queued` or
 * sets `dead_letter` once `attempts` has reached `maxAttempts`.
 * `failed` and `cancelled` are not assigned by the store or worker code in
 * this package; `isTerminalStatus` treats them as terminal alongside
 * `succeeded` and `dead_letter`.
 */
export type QueueJobStatus =
| 'queued'
| 'running'
| 'succeeded'
| 'failed'
| 'dead_letter'
| 'cancelled';
/**
 * A job as stored in a queue.
 *
 * All timestamps are ISO-8601 strings (produced with `Date#toISOString()` by
 * the stores and worker in this package).
 */
export interface QueueJob<TPayload = unknown, TResult = unknown> {
/** Job identifier; the stores use the caller-supplied id or generate a UUID. */
id: string;
/** Name of the queue the job belongs to. */
queueName: string;
/** Caller-defined job type label. */
type: string;
/** Caller-defined input handed to the handler. */
payload: TPayload;
/** Current lifecycle state. */
status: QueueJobStatus;
/** Number of times the job has been claimed; starts at 0 and is incremented by `claimNext`. */
attempts: number;
/** Attempt budget; the worker dead-letters a failing job once `attempts >= maxAttempts`. */
maxAttempts: number;
/** When the job was enqueued. */
createdAt: string;
/** Earliest time at which a queued job may be claimed. */
scheduledAt: string;
/** Time of the first claim; later claims keep the original value. */
startedAt?: string;
/** Set by the worker when the job succeeds or is dead-lettered. */
completedAt?: string;
/** Message of the most recent handler error, as recorded by the worker. */
lastError?: string;
/** Value the handler resolved with, recorded by the worker on success. */
result?: TResult;
/** Free-form progress information. */
progress?: Record<string, unknown>;
/** Free-form metadata. */
metadata?: Record<string, unknown>;
/** Id of the worker currently holding the lease. */
leaseOwner?: string;
/** When the current lease ends; a running job whose lease is missing or has passed can be claimed again. */
leaseExpiresAt?: string;
/** De-duplication key: enqueueing with a key already present in the queue returns the existing job. */
idempotencyKey?: string;
/** Optional product identifier copied from the enqueue input. */
productId?: string;
/** Optional user identifier copied from the enqueue input. */
userId?: string;
}
/** Input accepted by `QueueStore.enqueue`. */
export interface EnqueueJobInput<TPayload = unknown> {
/** Explicit job id; the stores in this package generate a UUID when it is missing or empty. */
id?: string;
/** Caller-defined job type label. */
type: string;
/** Caller-defined input for the handler. */
payload: TPayload;
/** Attempt budget; the stores in this package default it to 3. */
maxAttempts?: number;
/** Delay in milliseconds added to the enqueue time to compute `scheduledAt`; defaults to 0. */
delayMs?: number;
/** Initial free-form progress information. */
progress?: Record<string, unknown>;
/** Free-form metadata stored with the job. */
metadata?: Record<string, unknown>;
/** De-duplication key; a second enqueue with the same key in the same queue returns the existing job. */
idempotencyKey?: string;
/** Optional product identifier stored with the job. */
productId?: string;
/** Optional user identifier stored with the job. */
userId?: string;
}
/** Filtering options for `QueueStore.list`. */
export interface ListJobsOptions {
/** Maximum number of jobs to return; the stores in this package default to 50. */
limit?: number;
/** When set, only jobs in this status are returned. */
status?: QueueJobStatus;
}
/**
 * Persistence contract used by the worker and by job producers.
 *
 * The per-method notes describe what the memory and file stores in this
 * package do; other implementations are presumably expected to match — confirm
 * before relying on it.
 */
export interface QueueStore {
/** Adds a job in status `queued`, or returns the existing job for a repeated `idempotencyKey`. */
enqueue<TPayload = unknown>(
queueName: string,
input: EnqueueJobInput<TPayload>
): Promise<QueueJob<TPayload>>;
/** Returns the job with the given id, or `undefined` when it does not exist. */
get<TPayload = unknown, TResult = unknown>(
queueName: string,
id: string
): Promise<QueueJob<TPayload, TResult> | undefined>;
/** Lists jobs of a queue, newest `createdAt` first, optionally filtered and limited. */
list<TPayload = unknown, TResult = unknown>(
queueName: string,
options?: ListJobsOptions
): Promise<Array<QueueJob<TPayload, TResult>>>;
/**
 * Claims the earliest claimable job for `workerId`, marking it `running`,
 * incrementing `attempts` and leasing it for `leaseMs` milliseconds from `now`
 * (default: current time). Resolves to `undefined` when nothing is claimable.
 */
claimNext<TPayload = unknown, TResult = unknown>(
queueName: string,
workerId: string,
leaseMs: number,
now?: Date
): Promise<QueueJob<TPayload, TResult> | undefined>;
/** Shallow-merges `patch` into the job; resolves to the merged job or `undefined` for an unknown id. */
patch<TPayload = unknown, TResult = unknown>(
queueName: string,
id: string,
patch: Partial<QueueJob<TPayload, TResult>>
): Promise<QueueJob<TPayload, TResult> | undefined>;
/** Removes one queue, or every queue when `queueName` is omitted. */
clear(queueName?: string): Promise<void>;
}
/** Helpers the worker passes to a handler for the job it is currently processing. */
export interface WorkerContext<TPayload = unknown, TResult = unknown> {
/** Merges `patch` into the current job through the store (e.g. to report progress). */
patch(patch: Partial<QueueJob<TPayload, TResult>>): Promise<void>;
/** Renews the current job's lease: the worker re-asserts itself as owner and moves the expiry to now + its lease duration. */
heartbeat(): Promise<void>;
}
/**
 * Function that processes one claimed job.
 *
 * The worker stores the resolved value as the job's `result` and marks the job
 * `succeeded`. If the handler rejects, the worker records the error message
 * and either re-queues the job or dead-letters it.
 */
export type QueueHandler<TPayload = unknown, TResult = unknown> = (
job: QueueJob<TPayload, TResult>,
context: WorkerContext<TPayload, TResult>
) => Promise<TResult>;
/** Configuration for `QueueWorker`. Defaults listed here are the ones applied by the worker's constructor. */
export interface QueueWorkerOptions<TPayload = unknown, TResult = unknown> {
/** Queue the worker claims jobs from. */
queueName: string;
/** Store used to claim and update jobs. */
store: QueueStore;
/** Function invoked for every claimed job. */
handler: QueueHandler<TPayload, TResult>;
/** Identifier written as lease owner; defaults to `worker_` followed by a random UUID. */
workerId?: string;
/** Wait in milliseconds before polling again after finding no job; defaults to 200. */
pollIntervalMs?: number;
/** Lease duration in milliseconds requested on claim and on heartbeat; defaults to 30000. */
leaseMs?: number;
/** Base retry delay in milliseconds, multiplied by the job's attempt count on failure; defaults to 1000. */
backoffMs?: number;
}

View File

@ -0,0 +1,100 @@
import { randomUUID } from 'node:crypto';
import type { QueueWorkerOptions, WorkerContext } from './types.js';
/**
 * Polling worker that claims jobs from one queue and runs them one at a time.
 *
 * After `start()` the worker repeatedly asks the store for the next claimable
 * job. When a job is found the handler is invoked and the outcome is written
 * back: success stores the result; failure re-queues the job with a delay of
 * `backoffMs * attempts`, or dead-letters it once `attempts >= maxAttempts`.
 *
 * The lease is not renewed automatically; a handler that may run longer than
 * `leaseMs` has to call `context.heartbeat()` itself.
 */
export class QueueWorker<TPayload = unknown, TResult = unknown> {
  private readonly queueName: string;
  private readonly store: QueueWorkerOptions<TPayload, TResult>['store'];
  private readonly handler: QueueWorkerOptions<TPayload, TResult>['handler'];
  private readonly workerId: string;
  private readonly pollIntervalMs: number;
  private readonly leaseMs: number;
  private readonly backoffMs: number;
  /** Pending poll timer, if any. */
  private timer?: ReturnType<typeof setTimeout>;
  private running = false;
  /** Promise of the tick currently executing; awaited by `stop()`. */
  private inflight?: Promise<void>;

  constructor(options: QueueWorkerOptions<TPayload, TResult>) {
    this.queueName = options.queueName;
    this.store = options.store;
    this.handler = options.handler;
    this.workerId = options.workerId || `worker_${randomUUID()}`;
    this.pollIntervalMs = options.pollIntervalMs ?? 200;
    this.leaseMs = options.leaseMs ?? 30_000;
    this.backoffMs = options.backoffMs ?? 1_000;
  }

  /** Starts polling. Calling it on an already running worker is a no-op. */
  start(): void {
    if (this.running) return;
    this.running = true;
    this.schedule(0);
  }

  /** Stops polling and resolves once the job currently being processed (if any) has finished. */
  async stop(): Promise<void> {
    this.running = false;
    if (this.timer) {
      clearTimeout(this.timer);
      this.timer = undefined;
    }
    await this.inflight;
  }

  /** Arms the timer for the next tick unless the worker has been stopped. */
  private schedule(delayMs: number): void {
    if (!this.running) return;
    this.timer = globalThis.setTimeout(() => {
      this.timer = undefined;
      this.inflight = this.tick().finally(() => {
        this.inflight = undefined;
      });
    }, delayMs);
  }

  /**
   * Runs one poll cycle and schedules the next one.
   *
   * Never rejects: a store failure (claim or status write) is logged and the
   * worker polls again after a back-off. Without this, a single transient
   * store error would surface as an unhandled rejection and end polling for good.
   */
  private async tick(): Promise<void> {
    let nextDelayMs: number;
    try {
      const processed = await this.processNext();
      // Poll again immediately after a job, otherwise wait for the poll interval.
      nextDelayMs = processed ? 0 : this.pollIntervalMs;
    } catch (err: unknown) {
      console.error(
        `[queue:${this.queueName}] worker ${this.workerId} could not reach the store; retrying`,
        err
      );
      nextDelayMs = Math.max(this.pollIntervalMs, this.backoffMs);
    }
    this.schedule(nextDelayMs);
  }

  /**
   * Claims and processes at most one job.
   *
   * @returns `true` when a job was claimed, `false` when the queue had nothing claimable.
   * @throws When the store rejects while claiming or while recording the outcome.
   */
  private async processNext(): Promise<boolean> {
    const job = await this.store.claimNext<TPayload, TResult>(
      this.queueName,
      this.workerId,
      this.leaseMs
    );
    if (!job) return false;

    const context: WorkerContext<TPayload, TResult> = {
      patch: async patch => {
        await this.store.patch<TPayload, TResult>(this.queueName, job.id, patch);
      },
      heartbeat: async () => {
        await this.store.patch<TPayload, TResult>(this.queueName, job.id, {
          leaseOwner: this.workerId,
          leaseExpiresAt: new Date(Date.now() + this.leaseMs).toISOString(),
        });
      },
    };

    let result: TResult;
    try {
      result = await this.handler(job, context);
    } catch (err: unknown) {
      const lastError = err instanceof Error ? err.message : String(err);
      // `job.attempts` already includes the attempt that just failed.
      const finalStatus = job.attempts >= job.maxAttempts ? 'dead_letter' : 'queued';
      await this.store.patch<TPayload, TResult>(this.queueName, job.id, {
        status: finalStatus,
        lastError,
        scheduledAt:
          finalStatus === 'queued'
            ? new Date(Date.now() + this.backoffMs * job.attempts).toISOString()
            : job.scheduledAt,
        completedAt: finalStatus === 'dead_letter' ? new Date().toISOString() : undefined,
        leaseOwner: undefined,
        leaseExpiresAt: undefined,
      });
      return true;
    }

    // Recorded outside the handler's try block so a failing status write is
    // reported as a store error instead of being mistaken for a handler error.
    await this.store.patch<TPayload, TResult>(this.queueName, job.id, {
      status: 'succeeded',
      result,
      completedAt: new Date().toISOString(),
      leaseOwner: undefined,
      leaseExpiresAt: undefined,
    });
    return true;
  }
}

View File

@ -0,0 +1,9 @@
{
"extends": "../../tsconfig.base.json",
"compilerOptions": {
"outDir": "dist",
"rootDir": "src"
},
"include": ["src"],
"exclude": ["src/**/*.test.ts"]
}

41
pnpm-lock.yaml generated
View File

@ -188,7 +188,7 @@ importers:
version: 9.39.2(jiti@2.6.1)
eslint-config-next:
specifier: 16.1.6
version: 16.1.6(@typescript-eslint/parser@8.55.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3)
version: 16.1.6(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3)
husky:
specifier: ^9.0.0
version: 9.1.7
@ -282,7 +282,7 @@ importers:
version: 9.39.2(jiti@2.6.1)
eslint-config-next:
specifier: 16.1.6
version: 16.1.6(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3)
version: 16.1.6(@typescript-eslint/parser@8.55.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3)
husky:
specifier: ^9.0.0
version: 9.1.7
@ -517,6 +517,15 @@ importers:
specifier: ^3.0.0
version: 3.2.4(@types/debug@4.1.12)(@types/node@22.19.11)(happy-dom@18.0.1)(jiti@2.6.1)(jsdom@28.0.0(@noble/hashes@1.8.0))(lightningcss@1.31.1)(msw@2.12.10(@types/node@22.19.11)(typescript@5.9.3))(terser@5.46.0)(tsx@4.21.0)(yaml@2.8.2)
packages/queue:
devDependencies:
'@types/node':
specifier: ^22.12.0
version: 22.19.11
vitest:
specifier: ^3.0.5
version: 3.2.4(@types/debug@4.1.12)(@types/node@22.19.11)(happy-dom@18.0.1)(jiti@2.6.1)(jsdom@28.0.0(@noble/hashes@1.8.0))(lightningcss@1.31.1)(msw@2.12.10(@types/node@22.19.11)(typescript@5.9.3))(terser@5.46.0)(tsx@4.21.0)(yaml@2.8.2)
packages/react-auth:
dependencies:
'@bytelyst/api-client':
@ -641,6 +650,9 @@ importers:
'@bytelyst/fastify-core':
specifier: workspace:*
version: link:../../packages/fastify-core
'@bytelyst/queue':
specifier: workspace:*
version: link:../../packages/queue
'@fastify/cors':
specifier: ^10.0.2
version: 10.1.0
@ -18903,7 +18915,7 @@ snapshots:
'@next/eslint-plugin-next': 16.1.6
eslint: 9.39.2(jiti@2.6.1)
eslint-import-resolver-node: 0.3.9
eslint-import-resolver-typescript: 3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1))
eslint-import-resolver-typescript: 3.10.1(eslint-plugin-import@2.32.0)(eslint@9.39.2(jiti@2.6.1))
eslint-plugin-import: 2.32.0(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint-import-resolver-typescript@3.10.1)(eslint@9.39.2(jiti@2.6.1))
eslint-plugin-jsx-a11y: 6.10.2(eslint@9.39.2(jiti@2.6.1))
eslint-plugin-react: 7.37.5(eslint@9.39.2(jiti@2.6.1))
@ -18926,7 +18938,7 @@ snapshots:
transitivePeerDependencies:
- supports-color
eslint-import-resolver-typescript@3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1)):
eslint-import-resolver-typescript@3.10.1(eslint-plugin-import@2.32.0)(eslint@9.39.2(jiti@2.6.1)):
dependencies:
'@nolyfill/is-core-module': 1.0.39
debug: 4.4.3
@ -18941,21 +18953,6 @@ snapshots:
transitivePeerDependencies:
- supports-color
eslint-import-resolver-typescript@3.10.1(eslint-plugin-import@2.32.0)(eslint@9.39.2(jiti@2.6.1)):
dependencies:
'@nolyfill/is-core-module': 1.0.39
debug: 4.4.3
eslint: 9.39.2(jiti@2.6.1)
get-tsconfig: 4.13.6
is-bun-module: 2.0.0
stable-hash: 0.0.5
tinyglobby: 0.2.15
unrs-resolver: 1.11.1
optionalDependencies:
eslint-plugin-import: 2.32.0(@typescript-eslint/parser@8.55.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint-import-resolver-typescript@3.10.1)(eslint@9.39.2(jiti@2.6.1))
transitivePeerDependencies:
- supports-color
eslint-module-utils@2.12.1(@typescript-eslint/parser@8.55.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.10.1)(eslint@9.39.2(jiti@2.6.1)):
dependencies:
debug: 3.2.7
@ -18967,14 +18964,14 @@ snapshots:
transitivePeerDependencies:
- supports-color
eslint-module-utils@2.12.1(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1)):
eslint-module-utils@2.12.1(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.10.1)(eslint@9.39.2(jiti@2.6.1)):
dependencies:
debug: 3.2.7
optionalDependencies:
'@typescript-eslint/parser': 8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3)
eslint: 9.39.2(jiti@2.6.1)
eslint-import-resolver-node: 0.3.9
eslint-import-resolver-typescript: 3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1))
eslint-import-resolver-typescript: 3.10.1(eslint-plugin-import@2.32.0)(eslint@9.39.2(jiti@2.6.1))
transitivePeerDependencies:
- supports-color
@ -19018,7 +19015,7 @@ snapshots:
doctrine: 2.1.0
eslint: 9.39.2(jiti@2.6.1)
eslint-import-resolver-node: 0.3.9
eslint-module-utils: 2.12.1(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1))
eslint-module-utils: 2.12.1(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.10.1)(eslint@9.39.2(jiti@2.6.1))
hasown: 2.0.2
is-core-module: 2.16.1
is-glob: 4.0.3

View File

@ -24,6 +24,7 @@
"@bytelyst/cosmos": "workspace:*",
"@bytelyst/errors": "workspace:*",
"@bytelyst/fastify-core": "workspace:*",
"@bytelyst/queue": "workspace:*",
"@azure/cosmos": "^4.2.0",
"@fastify/cors": "^10.0.2",
"@fastify/rate-limit": "^10.3.0",

View File

@ -13,6 +13,10 @@ const envSchema = z.object({
DEFAULT_PRODUCT_ID: z.string().default('lysnrai'),
PYTHON_SIDECAR_URL: z.string().default('http://localhost:4006'),
DEFAULT_MODEL_ID: z.string().default('gemini-2.5-flash'),
EXTRACTION_QUEUE_BACKEND: z.enum(['memory', 'file']).default('file'),
EXTRACTION_QUEUE_FILE: z.string().optional(),
EXTRACTION_QUEUE_POLL_MS: z.coerce.number().default(100),
EXTRACTION_QUEUE_LEASE_MS: z.coerce.number().default(30000),
});
export const config = envSchema.parse(process.env);

View File

@ -0,0 +1,107 @@
import Fastify from 'fastify';
import { mkdtemp } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { FileQueueStore } from '@bytelyst/queue';
// Replace the Python sidecar bridge with mocks so no real sidecar is called.
vi.mock('../../lib/python-bridge.js', () => ({
sidecarExtract: vi.fn(),
sidecarExtractBatch: vi.fn(),
sidecarHealth: vi.fn(),
}));
// Minimal stand-in for the errors module: only BadRequestError is provided.
vi.mock('../../lib/errors.js', () => ({
BadRequestError: class BadRequestError extends Error {},
}));
// Stub the sidecar health monitor's exported functions.
vi.mock('./sidecar-monitor.js', () => ({
startHealthMonitoring: vi.fn(),
getHealthState: vi.fn(),
getHealthSummary: vi.fn(),
checkHealthNow: vi.fn(),
}));
// Imported after the vi.mock declarations; these modules receive the mocked dependencies.
import { extractRoutes } from './routes.js';
import { sidecarExtract } from '../../lib/python-bridge.js';
import { resetJobStore, setJobStoreForTesting } from './jobs.js';
const mockSidecarExtract = vi.mocked(sidecarExtract);
// End-to-end check that an async extraction job created over HTTP is still
// readable after the Fastify app is closed and rebuilt on the same queue file.
describe('extract jobs e2e', () => {
let app: ReturnType<typeof Fastify> | undefined;
let queueFilePath = '';
beforeEach(async () => {
// Each test gets its own temp directory and queue file.
const dir = await mkdtemp(join(tmpdir(), 'extract-jobs-e2e-'));
queueFilePath = join(dir, 'jobs.json');
// NOTE(review): whether these env vars are re-read after module load is not
// visible here; the test also injects the store explicitly below — confirm.
process.env.EXTRACTION_QUEUE_BACKEND = 'file';
process.env.EXTRACTION_QUEUE_FILE = queueFilePath;
process.env.EXTRACTION_QUEUE_POLL_MS = '5';
mockSidecarExtract.mockReset();
});
afterEach(async () => {
// Close the app if the test left one open, then reset the job store helpers.
if (app) {
await app.close();
app = undefined;
}
await resetJobStore();
await setJobStoreForTesting(undefined);
});
it('persists async jobs across app restart', async () => {
mockSidecarExtract.mockResolvedValue({
extractions: [{ extraction_class: 'topic', extraction_text: 'queue' }],
metadata: { model_id: 'gemini', duration_ms: 20, char_count: 10 },
});
// First app instance, backed by a file store on the temp queue file.
await setJobStoreForTesting(new FileQueueStore({ filePath: queueFilePath }));
app = Fastify({ logger: false });
await app.register(extractRoutes, { prefix: '/api' });
const createResponse = await app.inject({
method: 'POST',
url: '/api/extract/jobs',
payload: {
inputs: [{ text: 'queue durability test', taskId: 'triage' }],
},
});
expect(createResponse.statusCode).toBe(202);
const created = createResponse.json() as { jobId: string };
// Poll the job endpoint until the job reports completion.
await vi.waitFor(async () => {
const response = await app!.inject({
method: 'GET',
url: `/api/extract/jobs/${created.jobId}`,
});
expect(response.statusCode).toBe(200);
expect((response.json() as { status: string }).status).toBe('completed');
});
// "Restart": close the app and build a new one with a fresh store instance
// pointing at the same file.
await app.close();
app = undefined;
await setJobStoreForTesting(new FileQueueStore({ filePath: queueFilePath }));
app = Fastify({ logger: false });
await app.register(extractRoutes, { prefix: '/api' });
const persistedResponse = await app.inject({
method: 'GET',
url: `/api/extract/jobs/${created.jobId}`,
});
// The completed job, its progress and its results must come back from the file.
expect(persistedResponse.statusCode).toBe(200);
expect(persistedResponse.json()).toMatchObject({
id: created.jobId,
status: 'completed',
progress: { completed: 1, total: 1 },
results: [
{
extractions: [{ extraction_class: 'topic', extraction_text: 'queue' }],
},
],
});
});
});

View File

@ -1,305 +1,111 @@
/**
* Tests for async extraction job queue createJob, getJob, listJobs.
* Tests for durable extraction jobs queue-backed create/get/list behavior.
*/
import { describe, it, expect, vi, beforeEach } from 'vitest';
import { createJob, getJob, listJobs, resetJobStore } from './jobs.js';
import { MemoryQueueStore } from '@bytelyst/queue';
import { createJob, getJob, listJobs, resetJobStore, setJobStoreForTesting } from './jobs.js';
import { type WebhookConfig } from './webhooks.js';
// Mock the python-bridge to avoid real sidecar calls
vi.mock('../../lib/python-bridge.js', () => ({
sidecarExtract: vi.fn(),
}));
// Mock the webhooks module
vi.mock('./webhooks.js', () => ({
triggerJobWebhook: vi.fn(),
}));
vi.mock('./webhooks.js', async () => {
const actual = await vi.importActual<typeof import('./webhooks.js')>('./webhooks.js');
return {
...actual,
triggerJobWebhook: vi.fn(),
};
});
import { sidecarExtract } from '../../lib/python-bridge.js';
import { triggerJobWebhook } from './webhooks.js';
const mockSidecarExtract = vi.mocked(sidecarExtract);
const mockTriggerJobWebhook = vi.mocked(triggerJobWebhook);
describe('extraction jobs', () => {
beforeEach(() => {
beforeEach(async () => {
mockSidecarExtract.mockReset();
mockTriggerJobWebhook.mockReset();
resetJobStore();
await setJobStoreForTesting(new MemoryQueueStore());
await resetJobStore();
});
describe('createJob', () => {
it('creates a job and starts processing', () => {
mockSidecarExtract.mockResolvedValue({
extractions: [],
metadata: { model_id: 'test', duration_ms: 10, char_count: 5 },
});
const job = createJob([{ text: 'Hello world' }]);
expect(job.id).toBeDefined();
expect(['pending', 'processing', 'completed']).toContain(job.status);
expect(job.inputs).toHaveLength(1);
expect(job.results).toEqual([]);
expect(job.errors).toEqual([]);
expect(job.progress).toEqual({ completed: 0, total: 1 });
expect(job.createdAt).toBeDefined();
it('creates a durable job and processes it', async () => {
mockSidecarExtract.mockResolvedValue({
extractions: [{ extraction_class: 'person', extraction_text: 'John' }],
metadata: { model_id: 'gemini', duration_ms: 100, char_count: 20 },
});
it('sets correct total from inputs length', () => {
mockSidecarExtract.mockResolvedValue({
extractions: [],
metadata: { model_id: 'test', duration_ms: 10, char_count: 5 },
});
const job = await createJob([{ text: 'Meet John at 3pm' }]);
expect(['pending', 'processing', 'completed']).toContain(job.status);
const job = createJob([{ text: 'First' }, { text: 'Second' }, { text: 'Third' }]);
expect(job.progress.total).toBe(3);
});
it('generates unique IDs for each job', () => {
mockSidecarExtract.mockResolvedValue({
extractions: [],
metadata: { model_id: 'test', duration_ms: 10, char_count: 5 },
});
const job1 = createJob([{ text: 'A' }]);
const job2 = createJob([{ text: 'B' }]);
expect(job1.id).not.toBe(job2.id);
});
it('stores the job for later retrieval', () => {
mockSidecarExtract.mockResolvedValue({
extractions: [],
metadata: { model_id: 'test', duration_ms: 10, char_count: 5 },
});
const job = createJob([{ text: 'Stored' }]);
const retrieved = getJob(job.id);
expect(retrieved).toBeDefined();
expect(retrieved!.id).toBe(job.id);
});
it('processes job and marks as completed on success', async () => {
mockSidecarExtract.mockResolvedValue({
extractions: [{ extraction_class: 'person', extraction_text: 'John' }],
metadata: { model_id: 'gemini', duration_ms: 100, char_count: 20 },
});
const job = createJob([{ text: 'Meet John at 3pm' }]);
// Wait for background processing
await vi.waitFor(
() => {
const j = getJob(job.id);
expect(j!.status).toBe('completed');
},
{ timeout: 2000 }
);
const completed = getJob(job.id)!;
expect(completed.results).toHaveLength(1);
expect(completed.results[0].extractions[0].extraction_text).toBe('John');
expect(completed.progress.completed).toBe(1);
expect(completed.completedAt).toBeDefined();
});
it('records errors for failed extractions', async () => {
mockSidecarExtract.mockRejectedValue(new Error('Sidecar unavailable'));
const job = createJob([{ text: 'Will fail' }]);
await vi.waitFor(
() => {
const j = getJob(job.id);
expect(j!.status).not.toBe('pending');
expect(j!.status).not.toBe('processing');
},
{ timeout: 2000 }
);
const finished = getJob(job.id)!;
expect(finished.errors).toHaveLength(1);
expect(finished.errors[0].index).toBe(0);
expect(finished.errors[0].error).toBe('Sidecar unavailable');
expect(finished.status).toBe('failed');
});
it('handles mixed success/failure in batch', async () => {
mockSidecarExtract
.mockResolvedValueOnce({
extractions: [{ extraction_class: 'person', extraction_text: 'Alice' }],
metadata: { model_id: 'test', duration_ms: 50, char_count: 10 },
})
.mockRejectedValueOnce(new Error('timeout'))
.mockResolvedValueOnce({
extractions: [],
metadata: { model_id: 'test', duration_ms: 30, char_count: 5 },
});
const job = createJob([
{ text: 'Alice is here' },
{ text: 'Will timeout' },
{ text: 'Empty result' },
]);
await vi.waitFor(
() => {
const j = getJob(job.id);
expect(j!.progress.completed).toBe(3);
},
{ timeout: 2000 }
);
const finished = getJob(job.id)!;
expect(finished.status).toBe('completed'); // Not all failed
expect(finished.errors).toHaveLength(1);
expect(finished.errors[0].index).toBe(1);
expect(finished.results).toHaveLength(3); // Placeholder for failed one
await vi.waitFor(async () => {
const completed = await getJob(job.id);
expect(completed?.status).toBe('completed');
expect(completed?.results).toHaveLength(1);
expect(completed?.results[0].extractions[0].extraction_text).toBe('John');
});
});
describe('getJob', () => {
it('returns undefined for unknown job ID', () => {
expect(getJob('nonexistent-id')).toBeUndefined();
it('marks the job failed when every input errors', async () => {
mockSidecarExtract.mockRejectedValue(new Error('Sidecar unavailable'));
const job = await createJob([{ text: 'fail me' }]);
await vi.waitFor(async () => {
const finished = await getJob(job.id);
expect(finished?.status).toBe('failed');
expect(finished?.errors[0].error).toBe('Sidecar unavailable');
});
});
describe('listJobs', () => {
it('returns jobs sorted by creation date (newest first)', () => {
mockSidecarExtract.mockResolvedValue({
extractions: [],
metadata: { model_id: 'test', duration_ms: 10, char_count: 5 },
});
createJob([{ text: 'First' }]);
createJob([{ text: 'Second' }]);
createJob([{ text: 'Third' }]);
const jobs = listJobs();
expect(jobs.length).toBeGreaterThanOrEqual(3);
// Verify sorted newest first
for (let i = 0; i < jobs.length - 1; i++) {
expect(jobs[i].createdAt >= jobs[i + 1].createdAt).toBe(true);
}
it('lists jobs newest first', async () => {
mockSidecarExtract.mockResolvedValue({
extractions: [],
metadata: { model_id: 'test', duration_ms: 10, char_count: 5 },
});
it('respects limit parameter', () => {
mockSidecarExtract.mockResolvedValue({
extractions: [],
metadata: { model_id: 'test', duration_ms: 10, char_count: 5 },
});
const first = await createJob([{ text: 'First' }]);
await new Promise(resolve => globalThis.setTimeout(resolve, 10));
const second = await createJob([{ text: 'Second' }]);
for (let i = 0; i < 5; i++) {
createJob([{ text: `Job ${i}` }]);
}
const limited = listJobs(2);
expect(limited.length).toBe(2);
await vi.waitFor(async () => {
const jobs = await listJobs();
expect(jobs.length).toBeGreaterThanOrEqual(2);
});
it('defaults to limit of 50', () => {
const jobs = listJobs();
expect(jobs.length).toBeLessThanOrEqual(50);
});
const jobs = await listJobs();
expect(jobs[0].id).toBe(second.id);
expect(jobs.some(job => job.id === first.id)).toBe(true);
});
describe('webhook integration', () => {
it('stores webhook config on job creation', () => {
mockSidecarExtract.mockResolvedValue({
extractions: [],
metadata: { model_id: 'test', duration_ms: 10, char_count: 5 },
});
it('stores webhook config and triggers webhook on completion', async () => {
mockSidecarExtract.mockResolvedValue({
extractions: [],
metadata: { model_id: 'test', duration_ms: 10, char_count: 5 },
});
mockTriggerJobWebhook.mockResolvedValue(undefined);
const webhookConfig: WebhookConfig = {
url: 'https://example.com/webhook',
secret: 'secret',
retryAttempts: 3,
};
const webhookConfig: WebhookConfig = {
url: 'https://example.com/webhook',
secret: 'secret',
retryAttempts: 3,
};
const job = createJob([{ text: 'test' }], 'req-123', webhookConfig);
const job = await createJob([{ text: 'test' }], 'req-123', webhookConfig);
expect(job.webhookConfig).toEqual(webhookConfig);
expect(job.webhookConfig).toEqual(webhookConfig);
await vi.waitFor(async () => {
const finished = await getJob(job.id);
expect(finished?.status).toBe('completed');
});
it('triggers webhook on job completion', async () => {
mockSidecarExtract.mockResolvedValue({
extractions: [{ extraction_class: 'test', extraction_text: 'result' }],
metadata: { model_id: 'gemini', duration_ms: 100, char_count: 10 },
});
mockTriggerJobWebhook.mockResolvedValue(undefined);
const webhookConfig: WebhookConfig = {
url: 'https://example.com/webhook',
secret: 'secret',
};
const job = createJob([{ text: 'test' }], 'req-123', webhookConfig);
// Wait for background processing
await vi.waitFor(
() => {
const j = getJob(job.id);
expect(j!.status).toBe('completed');
},
{ timeout: 2000 }
);
expect(mockTriggerJobWebhook).toHaveBeenCalledWith(
expect.objectContaining({ id: job.id, status: 'completed' }),
webhookConfig
);
});
it('does not fail job if webhook fails', async () => {
mockSidecarExtract.mockResolvedValue({
extractions: [],
metadata: { model_id: 'test', duration_ms: 10, char_count: 5 },
});
mockTriggerJobWebhook.mockRejectedValue(new Error('Webhook failed'));
const webhookConfig: WebhookConfig = {
url: 'https://example.com/webhook',
secret: 'secret',
};
const job = createJob([{ text: 'test' }], 'req-123', webhookConfig);
// Wait for background processing
await vi.waitFor(
() => {
const j = getJob(job.id);
expect(j!.status).toBe('completed');
},
{ timeout: 2000 }
);
// Job should still be completed even if webhook failed
const completed = getJob(job.id)!;
expect(completed.status).toBe('completed');
});
it('does not trigger webhook if not configured', async () => {
mockSidecarExtract.mockResolvedValue({
extractions: [],
metadata: { model_id: 'test', duration_ms: 10, char_count: 5 },
});
const job = createJob([{ text: 'test' }]);
// Wait for background processing
await vi.waitFor(
() => {
const j = getJob(job.id);
expect(j!.status).toBe('completed');
},
{ timeout: 2000 }
);
expect(mockTriggerJobWebhook).not.toHaveBeenCalled();
});
expect(mockTriggerJobWebhook).toHaveBeenCalledWith(
expect.objectContaining({ id: job.id, status: 'completed' }),
webhookConfig
);
});
});

View File

@ -1,14 +1,20 @@
/**
* Async extraction job queue.
* Durable async extraction job queue.
*
* For large batch requests, callers can submit an async job and poll for results.
* Jobs are stored in-memory (Cosmos persistence deferred to Phase 7).
*
* Flow: POST /extract/jobs { jobId } GET /extract/jobs/:id { status, results }
* Optional: webhook callback on completion
* Jobs are stored in a pluggable queue store and processed by a background
* worker. File-backed storage is the default for single-instance dev/staging
* deployments; tests can swap in the memory store.
*/
import { randomUUID } from 'node:crypto';
import { mkdir } from 'node:fs/promises';
import { dirname, resolve } from 'node:path';
import {
FileQueueStore,
MemoryQueueStore,
QueueWorker,
type QueueJob,
type QueueStore,
} from '@bytelyst/queue';
import {
sidecarExtract,
type SidecarExtractRequest,
@ -18,6 +24,20 @@ import { triggerJobWebhook, type WebhookConfig } from './webhooks.js';
export type JobStatus = 'pending' | 'processing' | 'completed' | 'failed';
/**
 * Payload stored with each queued extraction job.
 */
export interface ExtractionJobPayload {
/** Extraction requests; the job handler sends them to the sidecar one at a time, in order. */
inputs: SidecarExtractRequest[];
/** Optional request id forwarded to every sidecar call made for this job. */
requestId?: string;
/** When set, a webhook is triggered once the whole batch has been processed. */
webhookConfig?: WebhookConfig;
}
/**
 * Final outcome produced by the extraction job handler.
 */
export interface ExtractionJobResult {
/** 'failed' only when every input errored; otherwise 'completed'. */
processingStatus: 'completed' | 'failed';
/** One entry per input; an input that errored gets an empty placeholder so indices stay aligned. */
results: SidecarExtractResponse[];
/** One entry per failed input; `index` is the position of that input in the payload. */
errors: Array<{ index: number; error: string }>;
/** Processed-input counter versus the batch size. */
progress: { completed: number; total: number };
/** ISO-8601 timestamp taken when the batch finished. */
completedAt: string;
}
export interface ExtractionJob {
id: string;
status: JobStatus;
@ -28,112 +48,200 @@ export interface ExtractionJob {
createdAt: string;
completedAt?: string;
webhookConfig?: WebhookConfig;
attempts: number;
lastError?: string;
}
const jobStore = new Map<string, ExtractionJob>();
const MAX_JOBS = 1000; // Prevent unbounded memory growth
const QUEUE_NAME = 'extraction-jobs';
let queueStore: QueueStore | undefined;
let queueWorker: QueueWorker<ExtractionJobPayload, ExtractionJobResult> | undefined;
let queueInitialized = false;
/**
* Cleanup old jobs to prevent memory leak.
* Keeps most recent jobs, removes oldest completed/failed first.
*/
function cleanupOldJobs(): void {
if (jobStore.size <= MAX_JOBS) return;
/**
 * Pick the queue storage backend.
 *
 * An explicit `EXTRACTION_QUEUE_BACKEND` of `memory` or `file` wins. Any other
 * value (including unset) falls back to `memory` when `NODE_ENV` is `test`
 * and to `file` otherwise.
 */
function getQueueBackend(): 'memory' | 'file' {
  const { EXTRACTION_QUEUE_BACKEND: requested, NODE_ENV: nodeEnv } = process.env;
  switch (requested) {
    case 'memory':
    case 'file':
      return requested;
    default:
      return nodeEnv === 'test' ? 'memory' : 'file';
  }
}
const allJobs = [...jobStore.values()].sort((a, b) => a.createdAt.localeCompare(b.createdAt));
/**
 * Absolute path of the file-backed queue.
 *
 * Uses `EXTRACTION_QUEUE_FILE` when it is set to a non-empty value, otherwise
 * `.data/extraction-jobs.json`; the path is resolved against the current
 * working directory.
 */
function getQueueFilePath(): string {
  const configured = process.env.EXTRACTION_QUEUE_FILE;
  const target = configured ? configured : '.data/extraction-jobs.json';
  return resolve(process.cwd(), target);
}
const toRemove = allJobs.slice(0, allJobs.length - MAX_JOBS);
for (const job of toRemove) {
// Only remove completed or failed jobs
if (job.status === 'completed' || job.status === 'failed') {
jobStore.delete(job.id);
}
/**
 * Return the process-wide queue store, creating it on first use.
 *
 * The memory backend is created synchronously. For the file backend the parent
 * directory of the queue file is created first (recursively), then the store
 * is constructed.
 *
 * @returns The shared store instance; every caller receives the same object.
 */
async function resolveQueueStore(): Promise<QueueStore> {
  if (queueStore) return queueStore;

  if (getQueueBackend() === 'memory') {
    queueStore = new MemoryQueueStore();
    return queueStore;
  }

  const filePath = getQueueFilePath();
  await mkdir(dirname(filePath), { recursive: true });

  // Another caller may have installed a store while this one was suspended on
  // mkdir. FileQueueStore serialises operations per instance, so two instances
  // on the same file would not be mutually exclusive. Keep the first one.
  if (!queueStore) {
    queueStore = new FileQueueStore({ filePath });
  }
  return queueStore;
}
/**
* Create a new async extraction job and start processing in background.
*/
export function createJob(
inputs: SidecarExtractRequest[],
requestId?: string,
webhookConfig?: WebhookConfig
): ExtractionJob {
const job: ExtractionJob = {
id: randomUUID(),
status: 'pending',
inputs,
results: [],
errors: [],
progress: { completed: 0, total: inputs.length },
createdAt: new Date().toISOString(),
webhookConfig,
/**
 * Project a raw queue record onto the public `ExtractionJob` view.
 *
 * Status mapping: `queued` -> `pending`, `running` -> `processing`,
 * `succeeded` -> the handler's own `processingStatus` (or `completed` when no
 * result is attached), anything else -> `failed`.
 *
 * Progress is taken from the queue record first, then from the stored result,
 * and finally defaults to zero of `inputs.length`.
 */
function mapQueueJob(job: QueueJob<ExtractionJobPayload, ExtractionJobResult>): ExtractionJob {
  const { payload, result: outcome } = job;
  const untouched = { completed: 0, total: payload.inputs.length };

  let status: ExtractionJob['status'];
  switch (job.status) {
    case 'queued':
      status = 'pending';
      break;
    case 'running':
      status = 'processing';
      break;
    case 'succeeded':
      status = outcome?.processingStatus ?? 'completed';
      break;
    default:
      status = 'failed';
  }

  const recorded = job.progress as ExtractionJob['progress'] | undefined;

  return {
    id: job.id,
    status,
    inputs: payload.inputs,
    results: outcome?.results ?? [],
    errors: outcome?.errors ?? [],
    progress: recorded ?? outcome?.progress ?? untouched,
    createdAt: job.createdAt,
    // The handler's own timestamp wins over the queue's bookkeeping timestamp.
    completedAt: outcome?.completedAt ?? job.completedAt,
    webhookConfig: payload.webhookConfig,
    attempts: job.attempts,
    lastError: job.lastError,
  };
}
jobStore.set(job.id, job);
async function processExtractionJob(
job: QueueJob<ExtractionJobPayload, ExtractionJobResult>,
context: {
patch: (patch: Partial<QueueJob<ExtractionJobPayload, ExtractionJobResult>>) => Promise<void>;
heartbeat: () => Promise<void>;
}
): Promise<ExtractionJobResult> {
const results: SidecarExtractResponse[] = [];
const errors: Array<{ index: number; error: string }> = [];
const total = job.payload.inputs.length;
// Cleanup old jobs to prevent memory leak
cleanupOldJobs();
// Start processing in background (non-blocking)
processJob(job, requestId).catch(() => {
job.status = 'failed';
await context.patch({
progress: { completed: 0, total },
});
return job;
}
/**
* Get job by ID.
*/
export function getJob(jobId: string): ExtractionJob | undefined {
return jobStore.get(jobId);
}
/**
* List recent jobs (last 50).
*/
export function listJobs(limit = 50): ExtractionJob[] {
return [...jobStore.values()]
.sort((a, b) => b.createdAt.localeCompare(a.createdAt))
.slice(0, limit);
}
/**
* Reset job store (for testing).
*/
export function resetJobStore(): void {
jobStore.clear();
}
// ── Internal ─────────────────────────────────────────────────────
async function processJob(job: ExtractionJob, requestId?: string): Promise<void> {
job.status = 'processing';
for (let i = 0; i < job.inputs.length; i++) {
for (let i = 0; i < total; i++) {
try {
const result = await sidecarExtract(job.inputs[i], requestId);
job.results.push(result);
const result = await sidecarExtract(job.payload.inputs[i], job.payload.requestId);
results.push(result);
} catch (err) {
const message = err instanceof Error ? err.message : 'Unknown error';
job.errors.push({ index: i, error: message });
// Push a placeholder so indices align
job.results.push({
errors.push({ index: i, error: message });
results.push({
extractions: [],
metadata: { model_id: 'error', duration_ms: 0, char_count: 0 },
});
}
job.progress.completed = i + 1;
await context.patch({
progress: { completed: i + 1, total },
});
await context.heartbeat();
}
job.status = job.errors.length === job.inputs.length ? 'failed' : 'completed';
job.completedAt = new Date().toISOString();
const completedAt = new Date().toISOString();
const result: ExtractionJobResult = {
processingStatus: errors.length === total ? 'failed' : 'completed',
results,
errors,
progress: { completed: total, total },
completedAt,
};
// Trigger webhook if configured
if (job.webhookConfig) {
await triggerJobWebhook(job, job.webhookConfig).catch(() => {
// Webhook failures don't affect job status
if (job.payload.webhookConfig) {
const view: ExtractionJob = {
...mapQueueJob({
...job,
status: 'succeeded',
result,
completedAt,
}),
completedAt,
};
await triggerJobWebhook(view, job.payload.webhookConfig).catch(() => {
// Webhook failures are non-fatal to job processing.
});
}
return result;
}
/**
 * Start the background worker for the extraction queue (idempotent).
 *
 * Poll interval and lease duration are read from `EXTRACTION_QUEUE_POLL_MS`
 * and `EXTRACTION_QUEUE_LEASE_MS`. Values that are missing, non-numeric, or
 * not strictly positive fall back to 100 ms and 30000 ms respectively, so the
 * worker never receives NaN.
 *
 * @param log Logger used for the single start-up message; defaults to a no-op.
 */
export async function initJobQueue(
  log: { info: (...args: unknown[]) => void } = { info: () => {} }
): Promise<void> {
  if (queueInitialized) return;

  const store = await resolveQueueStore();

  // A concurrent caller may have finished initialising while this one was
  // suspended above. Everything below is synchronous, so re-checking here
  // guarantees a single worker is ever started.
  if (queueInitialized) return;

  const positiveIntFromEnv = (raw: string | undefined, fallback: number): number => {
    const parsed = Number.parseInt(raw ?? '', 10);
    return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback;
  };

  queueWorker = new QueueWorker<ExtractionJobPayload, ExtractionJobResult>({
    queueName: QUEUE_NAME,
    store,
    handler: processExtractionJob,
    pollIntervalMs: positiveIntFromEnv(process.env.EXTRACTION_QUEUE_POLL_MS, 100),
    leaseMs: positiveIntFromEnv(process.env.EXTRACTION_QUEUE_LEASE_MS, 30000),
  });
  queueWorker.start();
  queueInitialized = true;
  log.info({ backend: getQueueBackend() }, '[extract/jobs] queue worker started');
}
/**
 * Stop the background worker, if one is running, and mark the queue as
 * uninitialised so a later `initJobQueue` starts a fresh worker.
 *
 * The module state is reset even when `stop()` rejects; the rejection still
 * propagates to the caller.
 */
export async function shutdownJobQueue(): Promise<void> {
  const worker = queueWorker;
  try {
    if (worker) {
      await worker.stop();
    }
  } finally {
    // Without this, a failed stop would leave `queueInitialized` true and
    // every subsequent initJobQueue call would silently do nothing.
    queueWorker = undefined;
    queueInitialized = false;
  }
}
/**
 * Enqueue a batch extraction job and return its public view.
 *
 * Ensures the worker is started before enqueueing, so the job is picked up
 * without any further call from the caller.
 *
 * @param inputs Extraction requests making up the batch.
 * @param requestId Optional request id stored in the job payload.
 * @param webhookConfig Optional webhook settings stored in the job payload.
 * @returns The freshly enqueued job mapped to an `ExtractionJob`.
 */
export async function createJob(
  inputs: SidecarExtractRequest[],
  requestId?: string,
  webhookConfig?: WebhookConfig
): Promise<ExtractionJob> {
  await initJobQueue();
  const store = await resolveQueueStore();

  const payload: ExtractionJobPayload = { inputs, requestId, webhookConfig };
  const initialProgress = { completed: 0, total: inputs.length };

  const enqueued = await store.enqueue<ExtractionJobPayload>(QUEUE_NAME, {
    type: 'extract.batch',
    payload,
    progress: initialProgress,
    productId: process.env.DEFAULT_PRODUCT_ID,
  });

  return mapQueueJob(enqueued);
}
/**
 * Look up one extraction job by id.
 *
 * @param jobId Identifier returned by `createJob`.
 * @returns The job's public view, or `undefined` when the store has no such job.
 */
export async function getJob(jobId: string): Promise<ExtractionJob | undefined> {
  const store = await resolveQueueStore();
  const record = await store.get<ExtractionJobPayload, ExtractionJobResult>(QUEUE_NAME, jobId);
  if (!record) {
    return undefined;
  }
  return mapQueueJob(record);
}
/**
 * List extraction jobs from the store.
 *
 * @param limit Maximum number of jobs requested from the store (default 50).
 * @returns Public views of the jobs, in the order the store returned them.
 */
export async function listJobs(limit = 50): Promise<ExtractionJob[]> {
  const store = await resolveQueueStore();
  const records = await store.list<ExtractionJobPayload, ExtractionJobResult>(QUEUE_NAME, {
    limit,
  });

  const views: ExtractionJob[] = [];
  for (const record of records) {
    views.push(mapQueueJob(record));
  }
  return views;
}
/**
 * Stop the worker and wipe the extraction queue.
 *
 * Clearing is skipped when no store has been created yet.
 */
export async function resetJobStore(): Promise<void> {
  await shutdownJobQueue();

  const store = queueStore;
  if (!store) return;

  await store.clear(QUEUE_NAME);
}
/**
 * Replace the module-level queue store (test hook).
 *
 * @param store Store to install; omit it to clear the current one so the next
 *   `resolveQueueStore` call builds a fresh store from the environment.
 */
export async function setJobStoreForTesting(store?: QueueStore): Promise<void> {
// Stop the running worker first: it was constructed with the previous store.
await shutdownJobQueue();
queueStore = store;
}

View File

@ -13,7 +13,7 @@ import { BadRequestError } from '../../lib/errors.js';
import { checkQuota, incrementUsage, getUsageSummary } from './usage.js';
import { recordExtraction, getMetricsSummary } from '../../lib/metrics.js';
import { sidecarBreaker } from '../../lib/circuit-breaker.js';
import { createJob, getJob, listJobs } from './jobs.js';
import { createJob, getJob, initJobQueue, listJobs, shutdownJobQueue } from './jobs.js';
import {
checkProductRateLimit,
getProductRateLimitStatus,
@ -103,6 +103,10 @@ const MODEL_REGISTRY = [
export async function extractRoutes(app: FastifyInstance) {
// Start sidecar health monitoring
startHealthMonitoring();
await initJobQueue(app.log);
app.addHook('onClose', async () => {
await shutdownJobQueue();
});
// Rate limiting for extraction endpoints — 30 req/min per IP (configurable)
await app.register(rateLimit, {
@ -401,7 +405,7 @@ export async function extractRoutes(app: FastifyInstance) {
};
}
const job = createJob(sidecarRequests, requestId, webhookConfig);
const job = await createJob(sidecarRequests, requestId, webhookConfig);
req.log.info({ jobId: job.id, inputCount: inputs.length }, 'async job created');
return reply.status(202).send({
@ -418,7 +422,7 @@ export async function extractRoutes(app: FastifyInstance) {
*/
app.get('/extract/jobs/:id', async (req, reply) => {
const { id } = req.params as { id: string };
const job = getJob(id);
const job = await getJob(id);
if (!job) {
return reply.status(404).send({ error: 'Job not found' });
}
@ -448,7 +452,7 @@ export async function extractRoutes(app: FastifyInstance) {
* GET /extract/jobs List recent async jobs.
*/
app.get('/extract/jobs', async (_req, reply) => {
const jobs = listJobs();
const jobs = await listJobs();
return reply.send({
jobs: jobs.map(j => ({
id: j.id,