diff --git a/.env.example b/.env.example index 9074dc55..60364310 100644 --- a/.env.example +++ b/.env.example @@ -47,6 +47,10 @@ SLACK_DEFAULT_CHANNEL= PYTHON_SIDECAR_URL=http://localhost:4006 DEFAULT_MODEL_ID=gemini-2.5-flash GEMINI_API_KEY=your-gemini-api-key +EXTRACTION_QUEUE_BACKEND=file +EXTRACTION_QUEUE_FILE=.data/extraction-jobs.json +EXTRACTION_QUEUE_POLL_MS=100 +EXTRACTION_QUEUE_LEASE_MS=30000 # ── Webhooks (optional — fire-and-forget callbacks) ────────── WEBHOOK_INVITATION_REDEEMED_URL= diff --git a/packages/queue/package.json b/packages/queue/package.json new file mode 100644 index 00000000..a638b7aa --- /dev/null +++ b/packages/queue/package.json @@ -0,0 +1,25 @@ +{ + "name": "@bytelyst/queue", + "version": "0.1.0", + "description": "Durable job queue with pluggable stores and worker runtime", + "type": "module", + "exports": { + ".": { + "import": "./dist/index.js", + "types": "./dist/index.d.ts" + } + }, + "main": "./dist/index.js", + "types": "./dist/index.d.ts", + "files": [ + "dist" + ], + "scripts": { + "build": "tsc", + "test": "vitest run" + }, + "devDependencies": { + "@types/node": "^22.12.0", + "vitest": "^3.0.5" + } +} diff --git a/packages/queue/src/file-store.ts b/packages/queue/src/file-store.ts new file mode 100644 index 00000000..2484ac02 --- /dev/null +++ b/packages/queue/src/file-store.ts @@ -0,0 +1,173 @@ +import { randomUUID } from 'node:crypto'; +import { mkdir, readFile, rename, writeFile } from 'node:fs/promises'; +import { dirname } from 'node:path'; +import type { EnqueueJobInput, ListJobsOptions, QueueJob, QueueStore } from './types.js'; + +export interface FileQueueStoreOptions { + filePath: string; +} + +export class FileQueueStore implements QueueStore { + private readonly filePath: string; + private operation = Promise.resolve(); + + constructor(options: FileQueueStoreOptions) { + this.filePath = options.filePath; + } + + enqueue( + queueName: string, + input: EnqueueJobInput + ): Promise> { + return this.withLock(async () => { + const state = await this.readState(); + const queue = state[queueName] ?? []; + if (input.idempotencyKey) { + const existing = queue.find(job => job.idempotencyKey === input.idempotencyKey); + if (existing) return existing as QueueJob; + } + + const now = new Date(); + const job: QueueJob = { + id: input.id || randomUUID(), + queueName, + type: input.type, + payload: input.payload, + status: 'queued', + attempts: 0, + maxAttempts: input.maxAttempts ?? 3, + createdAt: now.toISOString(), + scheduledAt: new Date(now.getTime() + (input.delayMs ?? 0)).toISOString(), + progress: input.progress, + metadata: input.metadata, + idempotencyKey: input.idempotencyKey, + productId: input.productId, + userId: input.userId, + }; + state[queueName] = [...queue, job]; + await this.writeState(state); + return job; + }); + } + + get( + queueName: string, + id: string + ): Promise | undefined> { + return this.withLock(async () => { + const state = await this.readState(); + const job = (state[queueName] ?? []).find(item => item.id === id); + return job as QueueJob | undefined; + }); + } + + list( + queueName: string, + options?: ListJobsOptions + ): Promise>> { + return this.withLock(async () => { + const state = await this.readState(); + const jobs = (state[queueName] ?? []) + .filter(job => !options?.status || job.status === options.status) + .sort((a, b) => b.createdAt.localeCompare(a.createdAt)) + .slice(0, options?.limit ?? 50); + return jobs as Array>; + }); + } + + claimNext( + queueName: string, + workerId: string, + leaseMs: number, + now = new Date() + ): Promise | undefined> { + return this.withLock(async () => { + const state = await this.readState(); + const queue = state[queueName] ?? []; + const next = queue + .filter(job => this.isClaimable(job, now)) + .sort( + (a, b) => + a.scheduledAt.localeCompare(b.scheduledAt) || a.createdAt.localeCompare(b.createdAt) + )[0]; + if (!next) return undefined; + + next.status = 'running'; + next.attempts += 1; + next.startedAt = next.startedAt || now.toISOString(); + next.leaseOwner = workerId; + next.leaseExpiresAt = new Date(now.getTime() + leaseMs).toISOString(); + await this.writeState(state); + return next as QueueJob; + }); + } + + patch( + queueName: string, + id: string, + patch: Partial> + ): Promise | undefined> { + return this.withLock(async () => { + const state = await this.readState(); + const queue = state[queueName] ?? []; + const index = queue.findIndex(job => job.id === id); + if (index === -1) return undefined; + + const merged = { + ...queue[index], + ...patch, + }; + queue[index] = merged; + state[queueName] = queue; + await this.writeState(state); + return merged as QueueJob; + }); + } + + clear(queueName?: string): Promise { + return this.withLock(async () => { + if (!queueName) { + await this.writeState({}); + return; + } + const state = await this.readState(); + delete state[queueName]; + await this.writeState(state); + }); + } + + private async withLock(fn: () => Promise): Promise { + const run = this.operation.then(fn, fn); + this.operation = run.then( + () => undefined, + () => undefined + ); + return run; + } + + private async readState(): Promise> { + try { + const raw = await readFile(this.filePath, 'utf-8'); + return JSON.parse(raw) as Record; + } catch { + return {}; + } + } + + private async writeState(state: Record): Promise { + await mkdir(dirname(this.filePath), { recursive: true }); + const tempPath = `${this.filePath}.${randomUUID()}.tmp`; + await writeFile(tempPath, JSON.stringify(state, null, 2), 'utf-8'); + await rename(tempPath, this.filePath); + } + + private isClaimable(job: QueueJob, now: Date): boolean { + if (job.status === 'queued') { + return job.scheduledAt <= now.toISOString(); + } + if (job.status === 'running') { + return !job.leaseExpiresAt || job.leaseExpiresAt <= now.toISOString(); + } + return false; + } +} diff --git a/packages/queue/src/index.ts b/packages/queue/src/index.ts new file mode 100644 index 00000000..0ed27dd2 --- /dev/null +++ b/packages/queue/src/index.ts @@ -0,0 +1,13 @@ +export type { + EnqueueJobInput, + ListJobsOptions, + QueueHandler, + QueueJob, + QueueJobStatus, + QueueStore, + QueueWorkerOptions, + WorkerContext, +} from './types.js'; +export { MemoryQueueStore, isTerminalStatus } from './memory-store.js'; +export { FileQueueStore, type FileQueueStoreOptions } from './file-store.js'; +export { QueueWorker } from './worker.js'; diff --git a/packages/queue/src/memory-store.ts b/packages/queue/src/memory-store.ts new file mode 100644 index 00000000..4eb06d69 --- /dev/null +++ b/packages/queue/src/memory-store.ts @@ -0,0 +1,136 @@ +import { randomUUID } from 'node:crypto'; +import type { + EnqueueJobInput, + ListJobsOptions, + QueueJob, + QueueJobStatus, + QueueStore, +} from './types.js'; + +function cloneValue(value: T): T { + return JSON.parse(JSON.stringify(value)) as T; +} + +export class MemoryQueueStore implements QueueStore { + private queues = new Map>(); + + async enqueue( + queueName: string, + input: EnqueueJobInput + ): Promise> { + const queue = this.getQueue(queueName); + if (input.idempotencyKey) { + const existing = [...queue.values()].find(job => job.idempotencyKey === input.idempotencyKey); + if (existing) return cloneValue(existing) as QueueJob; + } + + const now = new Date(); + const job: QueueJob = { + id: input.id || randomUUID(), + queueName, + type: input.type, + payload: input.payload, + status: 'queued', + attempts: 0, + maxAttempts: input.maxAttempts ?? 3, + createdAt: now.toISOString(), + scheduledAt: new Date(now.getTime() + (input.delayMs ?? 0)).toISOString(), + progress: input.progress, + metadata: input.metadata, + idempotencyKey: input.idempotencyKey, + productId: input.productId, + userId: input.userId, + }; + queue.set(job.id, cloneValue(job)); + return cloneValue(job); + } + + async get( + queueName: string, + id: string + ): Promise | undefined> { + const job = this.getQueue(queueName).get(id); + return job ? (cloneValue(job) as QueueJob) : undefined; + } + + async list( + queueName: string, + options?: ListJobsOptions + ): Promise>> { + const jobs = [...this.getQueue(queueName).values()] + .filter(job => !options?.status || job.status === options.status) + .sort((a, b) => b.createdAt.localeCompare(a.createdAt)); + return cloneValue(jobs.slice(0, options?.limit ?? 50)) as Array>; + } + + async claimNext( + queueName: string, + workerId: string, + leaseMs: number, + now = new Date() + ): Promise | undefined> { + const queue = this.getQueue(queueName); + const candidates = [...queue.values()] + .filter(job => this.isClaimable(job, now)) + .sort( + (a, b) => + a.scheduledAt.localeCompare(b.scheduledAt) || a.createdAt.localeCompare(b.createdAt) + ); + const next = candidates[0]; + if (!next) return undefined; + + next.status = 'running'; + next.attempts += 1; + next.startedAt = next.startedAt || now.toISOString(); + next.leaseOwner = workerId; + next.leaseExpiresAt = new Date(now.getTime() + leaseMs).toISOString(); + queue.set(next.id, cloneValue(next)); + return cloneValue(next) as QueueJob; + } + + async patch( + queueName: string, + id: string, + patch: Partial> + ): Promise | undefined> { + const queue = this.getQueue(queueName); + const current = queue.get(id); + if (!current) return undefined; + + const merged = { + ...current, + ...patch, + }; + queue.set(id, cloneValue(merged)); + return cloneValue(merged) as QueueJob; + } + + async clear(queueName?: string): Promise { + if (queueName) { + this.queues.delete(queueName); + return; + } + this.queues.clear(); + } + + private getQueue(queueName: string): Map { + if (!this.queues.has(queueName)) { + this.queues.set(queueName, new Map()); + } + return this.queues.get(queueName)!; + } + + private isClaimable(job: QueueJob, now: Date): boolean { + if (job.status === 'queued') { + return job.scheduledAt <= now.toISOString(); + } + if (job.status === 'running') { + return !job.leaseExpiresAt || job.leaseExpiresAt <= now.toISOString(); + } + return false; + } +} + +export function isTerminalStatus(status: QueueJobStatus): boolean { + return ['succeeded', 'failed', 'dead_letter', 'cancelled'].includes(status); +} diff --git a/packages/queue/src/queue.test.ts b/packages/queue/src/queue.test.ts new file mode 100644 index 00000000..a56c5c37 --- /dev/null +++ b/packages/queue/src/queue.test.ts @@ -0,0 +1,70 @@ +import { mkdtemp, readFile } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { describe, expect, it, vi } from 'vitest'; +import { FileQueueStore } from './file-store.js'; +import { MemoryQueueStore } from './memory-store.js'; +import { QueueWorker } from './worker.js'; + +describe('MemoryQueueStore', () => { + it('enqueues and retrieves jobs', async () => { + const store = new MemoryQueueStore(); + const job = await store.enqueue('test', { + type: 'demo', + payload: { value: 1 }, + }); + + const found = await store.get('test', job.id); + expect(found?.payload).toEqual({ value: 1 }); + expect(found?.status).toBe('queued'); + }); +}); + +describe('FileQueueStore', () => { + it('persists jobs across store instances', async () => { + const dir = await mkdtemp(join(tmpdir(), 'queue-store-')); + const filePath = join(dir, 'jobs.json'); + + const first = new FileQueueStore({ filePath }); + const job = await first.enqueue('persisted', { + type: 'demo', + payload: { ok: true }, + }); + + const second = new FileQueueStore({ filePath }); + const found = await second.get('persisted', job.id); + expect(found?.payload).toEqual({ ok: true }); + + const raw = JSON.parse(await readFile(filePath, 'utf-8')) as Record< + string, + Array<{ id: string }> + >; + expect(raw.persisted[0].id).toBe(job.id); + }); +}); + +describe('QueueWorker', () => { + it('processes queued jobs to completion', async () => { + const store = new MemoryQueueStore(); + const worker = new QueueWorker<{ value: number }, { doubled: number }>({ + queueName: 'work', + store, + pollIntervalMs: 5, + handler: async job => ({ doubled: job.payload.value * 2 }), + }); + + worker.start(); + const job = await store.enqueue('work', { + type: 'double', + payload: { value: 21 }, + }); + + await vi.waitFor(async () => { + const updated = await store.get<{ value: number }, { doubled: number }>('work', job.id); + expect(updated?.status).toBe('succeeded'); + expect(updated?.result).toEqual({ doubled: 42 }); + }); + + await worker.stop(); + }); +}); diff --git a/packages/queue/src/types.ts b/packages/queue/src/types.ts new file mode 100644 index 00000000..cafc7dd7 --- /dev/null +++ b/packages/queue/src/types.ts @@ -0,0 +1,95 @@ +export type QueueJobStatus = + | 'queued' + | 'running' + | 'succeeded' + | 'failed' + | 'dead_letter' + | 'cancelled'; + +export interface QueueJob { + id: string; + queueName: string; + type: string; + payload: TPayload; + status: QueueJobStatus; + attempts: number; + maxAttempts: number; + createdAt: string; + scheduledAt: string; + startedAt?: string; + completedAt?: string; + lastError?: string; + result?: TResult; + progress?: Record; + metadata?: Record; + leaseOwner?: string; + leaseExpiresAt?: string; + idempotencyKey?: string; + productId?: string; + userId?: string; +} + +export interface EnqueueJobInput { + id?: string; + type: string; + payload: TPayload; + maxAttempts?: number; + delayMs?: number; + progress?: Record; + metadata?: Record; + idempotencyKey?: string; + productId?: string; + userId?: string; +} + +export interface ListJobsOptions { + limit?: number; + status?: QueueJobStatus; +} + +export interface QueueStore { + enqueue( + queueName: string, + input: EnqueueJobInput + ): Promise>; + get( + queueName: string, + id: string + ): Promise | undefined>; + list( + queueName: string, + options?: ListJobsOptions + ): Promise>>; + claimNext( + queueName: string, + workerId: string, + leaseMs: number, + now?: Date + ): Promise | undefined>; + patch( + queueName: string, + id: string, + patch: Partial> + ): Promise | undefined>; + clear(queueName?: string): Promise; +} + +export interface WorkerContext { + patch(patch: Partial>): Promise; + heartbeat(): Promise; +} + +export type QueueHandler = ( + job: QueueJob, + context: WorkerContext +) => Promise; + +export interface QueueWorkerOptions { + queueName: string; + store: QueueStore; + handler: QueueHandler; + workerId?: string; + pollIntervalMs?: number; + leaseMs?: number; + backoffMs?: number; +} diff --git a/packages/queue/src/worker.ts b/packages/queue/src/worker.ts new file mode 100644 index 00000000..7ed687c4 --- /dev/null +++ b/packages/queue/src/worker.ts @@ -0,0 +1,100 @@ +import { randomUUID } from 'node:crypto'; +import type { QueueWorkerOptions, WorkerContext } from './types.js'; + +export class QueueWorker { + private readonly queueName: string; + private readonly store: QueueWorkerOptions['store']; + private readonly handler: QueueWorkerOptions['handler']; + private readonly workerId: string; + private readonly pollIntervalMs: number; + private readonly leaseMs: number; + private readonly backoffMs: number; + private timer?: ReturnType; + private running = false; + private inflight?: Promise; + + constructor(options: QueueWorkerOptions) { + this.queueName = options.queueName; + this.store = options.store; + this.handler = options.handler; + this.workerId = options.workerId || `worker_${randomUUID()}`; + this.pollIntervalMs = options.pollIntervalMs ?? 200; + this.leaseMs = options.leaseMs ?? 30_000; + this.backoffMs = options.backoffMs ?? 1_000; + } + + start(): void { + if (this.running) return; + this.running = true; + this.schedule(0); + } + + async stop(): Promise { + this.running = false; + if (this.timer) { + clearTimeout(this.timer); + this.timer = undefined; + } + await this.inflight; + } + + private schedule(delayMs: number): void { + if (!this.running) return; + this.timer = globalThis.setTimeout(() => { + this.inflight = this.tick().finally(() => { + this.inflight = undefined; + }); + }, delayMs); + } + + private async tick(): Promise { + const job = await this.store.claimNext( + this.queueName, + this.workerId, + this.leaseMs + ); + if (!job) { + this.schedule(this.pollIntervalMs); + return; + } + + const context: WorkerContext = { + patch: async patch => { + await this.store.patch(this.queueName, job.id, patch); + }, + heartbeat: async () => { + await this.store.patch(this.queueName, job.id, { + leaseOwner: this.workerId, + leaseExpiresAt: new Date(Date.now() + this.leaseMs).toISOString(), + }); + }, + }; + + try { + const result = await this.handler(job, context); + await this.store.patch(this.queueName, job.id, { + status: 'succeeded', + result, + completedAt: new Date().toISOString(), + leaseOwner: undefined, + leaseExpiresAt: undefined, + }); + } catch (err: unknown) { + const lastError = err instanceof Error ? err.message : String(err); + const finalStatus = job.attempts >= job.maxAttempts ? 'dead_letter' : 'queued'; + await this.store.patch(this.queueName, job.id, { + status: finalStatus, + lastError, + scheduledAt: + finalStatus === 'queued' + ? new Date(Date.now() + this.backoffMs * job.attempts).toISOString() + : job.scheduledAt, + completedAt: finalStatus === 'dead_letter' ? new Date().toISOString() : undefined, + leaseOwner: undefined, + leaseExpiresAt: undefined, + }); + } + + this.schedule(0); + } +} diff --git a/packages/queue/tsconfig.json b/packages/queue/tsconfig.json new file mode 100644 index 00000000..5edad813 --- /dev/null +++ b/packages/queue/tsconfig.json @@ -0,0 +1,9 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "outDir": "dist", + "rootDir": "src" + }, + "include": ["src"], + "exclude": ["src/**/*.test.ts"] +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index d83d1199..13a31c2c 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -188,7 +188,7 @@ importers: version: 9.39.2(jiti@2.6.1) eslint-config-next: specifier: 16.1.6 - version: 16.1.6(@typescript-eslint/parser@8.55.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3) + version: 16.1.6(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3) husky: specifier: ^9.0.0 version: 9.1.7 @@ -282,7 +282,7 @@ importers: version: 9.39.2(jiti@2.6.1) eslint-config-next: specifier: 16.1.6 - version: 16.1.6(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3) + version: 16.1.6(@typescript-eslint/parser@8.55.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3) husky: specifier: ^9.0.0 version: 9.1.7 @@ -517,6 +517,15 @@ importers: specifier: ^3.0.0 version: 3.2.4(@types/debug@4.1.12)(@types/node@22.19.11)(happy-dom@18.0.1)(jiti@2.6.1)(jsdom@28.0.0(@noble/hashes@1.8.0))(lightningcss@1.31.1)(msw@2.12.10(@types/node@22.19.11)(typescript@5.9.3))(terser@5.46.0)(tsx@4.21.0)(yaml@2.8.2) + packages/queue: + devDependencies: + '@types/node': + specifier: ^22.12.0 + version: 22.19.11 + vitest: + specifier: ^3.0.5 + version: 3.2.4(@types/debug@4.1.12)(@types/node@22.19.11)(happy-dom@18.0.1)(jiti@2.6.1)(jsdom@28.0.0(@noble/hashes@1.8.0))(lightningcss@1.31.1)(msw@2.12.10(@types/node@22.19.11)(typescript@5.9.3))(terser@5.46.0)(tsx@4.21.0)(yaml@2.8.2) + packages/react-auth: dependencies: '@bytelyst/api-client': @@ -641,6 +650,9 @@ importers: '@bytelyst/fastify-core': specifier: workspace:* version: link:../../packages/fastify-core + '@bytelyst/queue': + specifier: workspace:* + version: link:../../packages/queue '@fastify/cors': specifier: ^10.0.2 version: 10.1.0 @@ -18903,7 +18915,7 @@ snapshots: '@next/eslint-plugin-next': 16.1.6 eslint: 9.39.2(jiti@2.6.1) eslint-import-resolver-node: 0.3.9 - eslint-import-resolver-typescript: 3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1)) + eslint-import-resolver-typescript: 3.10.1(eslint-plugin-import@2.32.0)(eslint@9.39.2(jiti@2.6.1)) eslint-plugin-import: 2.32.0(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint-import-resolver-typescript@3.10.1)(eslint@9.39.2(jiti@2.6.1)) eslint-plugin-jsx-a11y: 6.10.2(eslint@9.39.2(jiti@2.6.1)) eslint-plugin-react: 7.37.5(eslint@9.39.2(jiti@2.6.1)) @@ -18926,7 +18938,7 @@ snapshots: transitivePeerDependencies: - supports-color - eslint-import-resolver-typescript@3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1)): + eslint-import-resolver-typescript@3.10.1(eslint-plugin-import@2.32.0)(eslint@9.39.2(jiti@2.6.1)): dependencies: '@nolyfill/is-core-module': 1.0.39 debug: 4.4.3 @@ -18941,21 +18953,6 @@ snapshots: transitivePeerDependencies: - supports-color - eslint-import-resolver-typescript@3.10.1(eslint-plugin-import@2.32.0)(eslint@9.39.2(jiti@2.6.1)): - dependencies: - '@nolyfill/is-core-module': 1.0.39 - debug: 4.4.3 - eslint: 9.39.2(jiti@2.6.1) - get-tsconfig: 4.13.6 - is-bun-module: 2.0.0 - stable-hash: 0.0.5 - tinyglobby: 0.2.15 - unrs-resolver: 1.11.1 - optionalDependencies: - eslint-plugin-import: 2.32.0(@typescript-eslint/parser@8.55.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint-import-resolver-typescript@3.10.1)(eslint@9.39.2(jiti@2.6.1)) - transitivePeerDependencies: - - supports-color - eslint-module-utils@2.12.1(@typescript-eslint/parser@8.55.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.10.1)(eslint@9.39.2(jiti@2.6.1)): dependencies: debug: 3.2.7 @@ -18967,14 +18964,14 @@ snapshots: transitivePeerDependencies: - supports-color - eslint-module-utils@2.12.1(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1)): + eslint-module-utils@2.12.1(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.10.1)(eslint@9.39.2(jiti@2.6.1)): dependencies: debug: 3.2.7 optionalDependencies: '@typescript-eslint/parser': 8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3) eslint: 9.39.2(jiti@2.6.1) eslint-import-resolver-node: 0.3.9 - eslint-import-resolver-typescript: 3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1)) + eslint-import-resolver-typescript: 3.10.1(eslint-plugin-import@2.32.0)(eslint@9.39.2(jiti@2.6.1)) transitivePeerDependencies: - supports-color @@ -19018,7 +19015,7 @@ snapshots: doctrine: 2.1.0 eslint: 9.39.2(jiti@2.6.1) eslint-import-resolver-node: 0.3.9 - eslint-module-utils: 2.12.1(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1)))(eslint@9.39.2(jiti@2.6.1)) + eslint-module-utils: 2.12.1(@typescript-eslint/parser@8.56.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.10.1)(eslint@9.39.2(jiti@2.6.1)) hasown: 2.0.2 is-core-module: 2.16.1 is-glob: 4.0.3 diff --git a/services/extraction-service/package.json b/services/extraction-service/package.json index 2f5827b3..33ebccf5 100644 --- a/services/extraction-service/package.json +++ b/services/extraction-service/package.json @@ -24,6 +24,7 @@ "@bytelyst/cosmos": "workspace:*", "@bytelyst/errors": "workspace:*", "@bytelyst/fastify-core": "workspace:*", + "@bytelyst/queue": "workspace:*", "@azure/cosmos": "^4.2.0", "@fastify/cors": "^10.0.2", "@fastify/rate-limit": "^10.3.0", diff --git a/services/extraction-service/src/lib/config.ts b/services/extraction-service/src/lib/config.ts index aaa271cd..9721ce78 100644 --- a/services/extraction-service/src/lib/config.ts +++ b/services/extraction-service/src/lib/config.ts @@ -13,6 +13,10 @@ const envSchema = z.object({ DEFAULT_PRODUCT_ID: z.string().default('lysnrai'), PYTHON_SIDECAR_URL: z.string().default('http://localhost:4006'), DEFAULT_MODEL_ID: z.string().default('gemini-2.5-flash'), + EXTRACTION_QUEUE_BACKEND: z.enum(['memory', 'file']).default('file'), + EXTRACTION_QUEUE_FILE: z.string().optional(), + EXTRACTION_QUEUE_POLL_MS: z.coerce.number().default(100), + EXTRACTION_QUEUE_LEASE_MS: z.coerce.number().default(30000), }); export const config = envSchema.parse(process.env); diff --git a/services/extraction-service/src/modules/extract/jobs.e2e.test.ts b/services/extraction-service/src/modules/extract/jobs.e2e.test.ts new file mode 100644 index 00000000..24f12a5e --- /dev/null +++ b/services/extraction-service/src/modules/extract/jobs.e2e.test.ts @@ -0,0 +1,107 @@ +import Fastify from 'fastify'; +import { mkdtemp } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { FileQueueStore } from '@bytelyst/queue'; + +vi.mock('../../lib/python-bridge.js', () => ({ + sidecarExtract: vi.fn(), + sidecarExtractBatch: vi.fn(), + sidecarHealth: vi.fn(), +})); + +vi.mock('../../lib/errors.js', () => ({ + BadRequestError: class BadRequestError extends Error {}, +})); + +vi.mock('./sidecar-monitor.js', () => ({ + startHealthMonitoring: vi.fn(), + getHealthState: vi.fn(), + getHealthSummary: vi.fn(), + checkHealthNow: vi.fn(), +})); + +import { extractRoutes } from './routes.js'; +import { sidecarExtract } from '../../lib/python-bridge.js'; +import { resetJobStore, setJobStoreForTesting } from './jobs.js'; + +const mockSidecarExtract = vi.mocked(sidecarExtract); + +describe('extract jobs e2e', () => { + let app: ReturnType | undefined; + let queueFilePath = ''; + + beforeEach(async () => { + const dir = await mkdtemp(join(tmpdir(), 'extract-jobs-e2e-')); + queueFilePath = join(dir, 'jobs.json'); + process.env.EXTRACTION_QUEUE_BACKEND = 'file'; + process.env.EXTRACTION_QUEUE_FILE = queueFilePath; + process.env.EXTRACTION_QUEUE_POLL_MS = '5'; + mockSidecarExtract.mockReset(); + }); + + afterEach(async () => { + if (app) { + await app.close(); + app = undefined; + } + await resetJobStore(); + await setJobStoreForTesting(undefined); + }); + + it('persists async jobs across app restart', async () => { + mockSidecarExtract.mockResolvedValue({ + extractions: [{ extraction_class: 'topic', extraction_text: 'queue' }], + metadata: { model_id: 'gemini', duration_ms: 20, char_count: 10 }, + }); + + await setJobStoreForTesting(new FileQueueStore({ filePath: queueFilePath })); + app = Fastify({ logger: false }); + await app.register(extractRoutes, { prefix: '/api' }); + + const createResponse = await app.inject({ + method: 'POST', + url: '/api/extract/jobs', + payload: { + inputs: [{ text: 'queue durability test', taskId: 'triage' }], + }, + }); + + expect(createResponse.statusCode).toBe(202); + const created = createResponse.json() as { jobId: string }; + + await vi.waitFor(async () => { + const response = await app!.inject({ + method: 'GET', + url: `/api/extract/jobs/${created.jobId}`, + }); + expect(response.statusCode).toBe(200); + expect((response.json() as { status: string }).status).toBe('completed'); + }); + + await app.close(); + app = undefined; + + await setJobStoreForTesting(new FileQueueStore({ filePath: queueFilePath })); + app = Fastify({ logger: false }); + await app.register(extractRoutes, { prefix: '/api' }); + + const persistedResponse = await app.inject({ + method: 'GET', + url: `/api/extract/jobs/${created.jobId}`, + }); + + expect(persistedResponse.statusCode).toBe(200); + expect(persistedResponse.json()).toMatchObject({ + id: created.jobId, + status: 'completed', + progress: { completed: 1, total: 1 }, + results: [ + { + extractions: [{ extraction_class: 'topic', extraction_text: 'queue' }], + }, + ], + }); + }); +}); diff --git a/services/extraction-service/src/modules/extract/jobs.test.ts b/services/extraction-service/src/modules/extract/jobs.test.ts index 2f229d0b..b456f333 100644 --- a/services/extraction-service/src/modules/extract/jobs.test.ts +++ b/services/extraction-service/src/modules/extract/jobs.test.ts @@ -1,305 +1,111 @@ /** - * Tests for async extraction job queue — createJob, getJob, listJobs. + * Tests for durable extraction jobs — queue-backed create/get/list behavior. */ import { describe, it, expect, vi, beforeEach } from 'vitest'; -import { createJob, getJob, listJobs, resetJobStore } from './jobs.js'; +import { MemoryQueueStore } from '@bytelyst/queue'; +import { createJob, getJob, listJobs, resetJobStore, setJobStoreForTesting } from './jobs.js'; import { type WebhookConfig } from './webhooks.js'; -// Mock the python-bridge to avoid real sidecar calls vi.mock('../../lib/python-bridge.js', () => ({ sidecarExtract: vi.fn(), })); -// Mock the webhooks module -vi.mock('./webhooks.js', () => ({ - triggerJobWebhook: vi.fn(), -})); +vi.mock('./webhooks.js', async () => { + const actual = await vi.importActual('./webhooks.js'); + return { + ...actual, + triggerJobWebhook: vi.fn(), + }; +}); import { sidecarExtract } from '../../lib/python-bridge.js'; import { triggerJobWebhook } from './webhooks.js'; + const mockSidecarExtract = vi.mocked(sidecarExtract); const mockTriggerJobWebhook = vi.mocked(triggerJobWebhook); describe('extraction jobs', () => { - beforeEach(() => { + beforeEach(async () => { mockSidecarExtract.mockReset(); mockTriggerJobWebhook.mockReset(); - resetJobStore(); + await setJobStoreForTesting(new MemoryQueueStore()); + await resetJobStore(); }); - describe('createJob', () => { - it('creates a job and starts processing', () => { - mockSidecarExtract.mockResolvedValue({ - extractions: [], - metadata: { model_id: 'test', duration_ms: 10, char_count: 5 }, - }); - - const job = createJob([{ text: 'Hello world' }]); - - expect(job.id).toBeDefined(); - expect(['pending', 'processing', 'completed']).toContain(job.status); - expect(job.inputs).toHaveLength(1); - expect(job.results).toEqual([]); - expect(job.errors).toEqual([]); - expect(job.progress).toEqual({ completed: 0, total: 1 }); - expect(job.createdAt).toBeDefined(); + it('creates a durable job and processes it', async () => { + mockSidecarExtract.mockResolvedValue({ + extractions: [{ extraction_class: 'person', extraction_text: 'John' }], + metadata: { model_id: 'gemini', duration_ms: 100, char_count: 20 }, }); - it('sets correct total from inputs length', () => { - mockSidecarExtract.mockResolvedValue({ - extractions: [], - metadata: { model_id: 'test', duration_ms: 10, char_count: 5 }, - }); + const job = await createJob([{ text: 'Meet John at 3pm' }]); + expect(['pending', 'processing', 'completed']).toContain(job.status); - const job = createJob([{ text: 'First' }, { text: 'Second' }, { text: 'Third' }]); - - expect(job.progress.total).toBe(3); - }); - - it('generates unique IDs for each job', () => { - mockSidecarExtract.mockResolvedValue({ - extractions: [], - metadata: { model_id: 'test', duration_ms: 10, char_count: 5 }, - }); - - const job1 = createJob([{ text: 'A' }]); - const job2 = createJob([{ text: 'B' }]); - - expect(job1.id).not.toBe(job2.id); - }); - - it('stores the job for later retrieval', () => { - mockSidecarExtract.mockResolvedValue({ - extractions: [], - metadata: { model_id: 'test', duration_ms: 10, char_count: 5 }, - }); - - const job = createJob([{ text: 'Stored' }]); - const retrieved = getJob(job.id); - - expect(retrieved).toBeDefined(); - expect(retrieved!.id).toBe(job.id); - }); - - it('processes job and marks as completed on success', async () => { - mockSidecarExtract.mockResolvedValue({ - extractions: [{ extraction_class: 'person', extraction_text: 'John' }], - metadata: { model_id: 'gemini', duration_ms: 100, char_count: 20 }, - }); - - const job = createJob([{ text: 'Meet John at 3pm' }]); - - // Wait for background processing - await vi.waitFor( - () => { - const j = getJob(job.id); - expect(j!.status).toBe('completed'); - }, - { timeout: 2000 } - ); - - const completed = getJob(job.id)!; - expect(completed.results).toHaveLength(1); - expect(completed.results[0].extractions[0].extraction_text).toBe('John'); - expect(completed.progress.completed).toBe(1); - expect(completed.completedAt).toBeDefined(); - }); - - it('records errors for failed extractions', async () => { - mockSidecarExtract.mockRejectedValue(new Error('Sidecar unavailable')); - - const job = createJob([{ text: 'Will fail' }]); - - await vi.waitFor( - () => { - const j = getJob(job.id); - expect(j!.status).not.toBe('pending'); - expect(j!.status).not.toBe('processing'); - }, - { timeout: 2000 } - ); - - const finished = getJob(job.id)!; - expect(finished.errors).toHaveLength(1); - expect(finished.errors[0].index).toBe(0); - expect(finished.errors[0].error).toBe('Sidecar unavailable'); - expect(finished.status).toBe('failed'); - }); - - it('handles mixed success/failure in batch', async () => { - mockSidecarExtract - .mockResolvedValueOnce({ - extractions: [{ extraction_class: 'person', extraction_text: 'Alice' }], - metadata: { model_id: 'test', duration_ms: 50, char_count: 10 }, - }) - .mockRejectedValueOnce(new Error('timeout')) - .mockResolvedValueOnce({ - extractions: [], - metadata: { model_id: 'test', duration_ms: 30, char_count: 5 }, - }); - - const job = createJob([ - { text: 'Alice is here' }, - { text: 'Will timeout' }, - { text: 'Empty result' }, - ]); - - await vi.waitFor( - () => { - const j = getJob(job.id); - expect(j!.progress.completed).toBe(3); - }, - { timeout: 2000 } - ); - - const finished = getJob(job.id)!; - expect(finished.status).toBe('completed'); // Not all failed - expect(finished.errors).toHaveLength(1); - expect(finished.errors[0].index).toBe(1); - expect(finished.results).toHaveLength(3); // Placeholder for failed one + await vi.waitFor(async () => { + const completed = await getJob(job.id); + expect(completed?.status).toBe('completed'); + expect(completed?.results).toHaveLength(1); + expect(completed?.results[0].extractions[0].extraction_text).toBe('John'); }); }); - describe('getJob', () => { - it('returns undefined for unknown job ID', () => { - expect(getJob('nonexistent-id')).toBeUndefined(); + it('marks the job failed when every input errors', async () => { + mockSidecarExtract.mockRejectedValue(new Error('Sidecar unavailable')); + + const job = await createJob([{ text: 'fail me' }]); + + await vi.waitFor(async () => { + const finished = await getJob(job.id); + expect(finished?.status).toBe('failed'); + expect(finished?.errors[0].error).toBe('Sidecar unavailable'); }); }); - describe('listJobs', () => { - it('returns jobs sorted by creation date (newest first)', () => { - mockSidecarExtract.mockResolvedValue({ - extractions: [], - metadata: { model_id: 'test', duration_ms: 10, char_count: 5 }, - }); - - createJob([{ text: 'First' }]); - createJob([{ text: 'Second' }]); - createJob([{ text: 'Third' }]); - - const jobs = listJobs(); - expect(jobs.length).toBeGreaterThanOrEqual(3); - - // Verify sorted newest first - for (let i = 0; i < jobs.length - 1; i++) { - expect(jobs[i].createdAt >= jobs[i + 1].createdAt).toBe(true); - } + it('lists jobs newest first', async () => { + mockSidecarExtract.mockResolvedValue({ + extractions: [], + metadata: { model_id: 'test', duration_ms: 10, char_count: 5 }, }); - it('respects limit parameter', () => { - mockSidecarExtract.mockResolvedValue({ - extractions: [], - metadata: { model_id: 'test', duration_ms: 10, char_count: 5 }, - }); + const first = await createJob([{ text: 'First' }]); + await new Promise(resolve => globalThis.setTimeout(resolve, 10)); + const second = await createJob([{ text: 'Second' }]); - for (let i = 0; i < 5; i++) { - createJob([{ text: `Job ${i}` }]); - } - - const limited = listJobs(2); - expect(limited.length).toBe(2); + await vi.waitFor(async () => { + const jobs = await listJobs(); + expect(jobs.length).toBeGreaterThanOrEqual(2); }); - it('defaults to limit of 50', () => { - const jobs = listJobs(); - expect(jobs.length).toBeLessThanOrEqual(50); - }); + const jobs = await listJobs(); + expect(jobs[0].id).toBe(second.id); + expect(jobs.some(job => job.id === first.id)).toBe(true); }); - describe('webhook integration', () => { - it('stores webhook config on job creation', () => { - mockSidecarExtract.mockResolvedValue({ - extractions: [], - metadata: { model_id: 'test', duration_ms: 10, char_count: 5 }, - }); + it('stores webhook config and triggers webhook on completion', async () => { + mockSidecarExtract.mockResolvedValue({ + extractions: [], + metadata: { model_id: 'test', duration_ms: 10, char_count: 5 }, + }); + mockTriggerJobWebhook.mockResolvedValue(undefined); - const webhookConfig: WebhookConfig = { - url: 'https://example.com/webhook', - secret: 'secret', - retryAttempts: 3, - }; + const webhookConfig: WebhookConfig = { + url: 'https://example.com/webhook', + secret: 'secret', + retryAttempts: 3, + }; - const job = createJob([{ text: 'test' }], 'req-123', webhookConfig); + const job = await createJob([{ text: 'test' }], 'req-123', webhookConfig); + expect(job.webhookConfig).toEqual(webhookConfig); - expect(job.webhookConfig).toEqual(webhookConfig); + await vi.waitFor(async () => { + const finished = await getJob(job.id); + expect(finished?.status).toBe('completed'); }); - it('triggers webhook on job completion', async () => { - mockSidecarExtract.mockResolvedValue({ - extractions: [{ extraction_class: 'test', extraction_text: 'result' }], - metadata: { model_id: 'gemini', duration_ms: 100, char_count: 10 }, - }); - mockTriggerJobWebhook.mockResolvedValue(undefined); - - const webhookConfig: WebhookConfig = { - url: 'https://example.com/webhook', - secret: 'secret', - }; - - const job = createJob([{ text: 'test' }], 'req-123', webhookConfig); - - // Wait for background processing - await vi.waitFor( - () => { - const j = getJob(job.id); - expect(j!.status).toBe('completed'); - }, - { timeout: 2000 } - ); - - expect(mockTriggerJobWebhook).toHaveBeenCalledWith( - expect.objectContaining({ id: job.id, status: 'completed' }), - webhookConfig - ); - }); - - it('does not fail job if webhook fails', async () => { - mockSidecarExtract.mockResolvedValue({ - extractions: [], - metadata: { model_id: 'test', duration_ms: 10, char_count: 5 }, - }); - mockTriggerJobWebhook.mockRejectedValue(new Error('Webhook failed')); - - const webhookConfig: WebhookConfig = { - url: 'https://example.com/webhook', - secret: 'secret', - }; - - const job = createJob([{ text: 'test' }], 'req-123', webhookConfig); - - // Wait for background processing - await vi.waitFor( - () => { - const j = getJob(job.id); - expect(j!.status).toBe('completed'); - }, - { timeout: 2000 } - ); - - // Job should still be completed even if webhook failed - const completed = getJob(job.id)!; - expect(completed.status).toBe('completed'); - }); - - it('does not trigger webhook if not configured', async () => { - mockSidecarExtract.mockResolvedValue({ - extractions: [], - metadata: { model_id: 'test', duration_ms: 10, char_count: 5 }, - }); - - const job = createJob([{ text: 'test' }]); - - // Wait for background processing - await vi.waitFor( - () => { - const j = getJob(job.id); - expect(j!.status).toBe('completed'); - }, - { timeout: 2000 } - ); - - expect(mockTriggerJobWebhook).not.toHaveBeenCalled(); - }); + expect(mockTriggerJobWebhook).toHaveBeenCalledWith( + expect.objectContaining({ id: job.id, status: 'completed' }), + webhookConfig + ); }); }); diff --git a/services/extraction-service/src/modules/extract/jobs.ts b/services/extraction-service/src/modules/extract/jobs.ts index 80b04bd3..e8c912cb 100644 --- a/services/extraction-service/src/modules/extract/jobs.ts +++ b/services/extraction-service/src/modules/extract/jobs.ts @@ -1,14 +1,20 @@ /** - * Async extraction job queue. + * Durable async extraction job queue. * - * For large batch requests, callers can submit an async job and poll for results. - * Jobs are stored in-memory (Cosmos persistence deferred to Phase 7). - * - * Flow: POST /extract/jobs → { jobId } → GET /extract/jobs/:id → { status, results } - * Optional: webhook callback on completion + * Jobs are stored in a pluggable queue store and processed by a background + * worker. File-backed storage is the default for single-instance dev/staging + * deployments; tests can swap in the memory store. */ -import { randomUUID } from 'node:crypto'; +import { mkdir } from 'node:fs/promises'; +import { dirname, resolve } from 'node:path'; +import { + FileQueueStore, + MemoryQueueStore, + QueueWorker, + type QueueJob, + type QueueStore, +} from '@bytelyst/queue'; import { sidecarExtract, type SidecarExtractRequest, @@ -18,6 +24,20 @@ import { triggerJobWebhook, type WebhookConfig } from './webhooks.js'; export type JobStatus = 'pending' | 'processing' | 'completed' | 'failed'; +export interface ExtractionJobPayload { + inputs: SidecarExtractRequest[]; + requestId?: string; + webhookConfig?: WebhookConfig; +} + +export interface ExtractionJobResult { + processingStatus: 'completed' | 'failed'; + results: SidecarExtractResponse[]; + errors: Array<{ index: number; error: string }>; + progress: { completed: number; total: number }; + completedAt: string; +} + export interface ExtractionJob { id: string; status: JobStatus; @@ -28,112 +48,200 @@ export interface ExtractionJob { createdAt: string; completedAt?: string; webhookConfig?: WebhookConfig; + attempts: number; + lastError?: string; } -const jobStore = new Map(); -const MAX_JOBS = 1000; // Prevent unbounded memory growth +const QUEUE_NAME = 'extraction-jobs'; +let queueStore: QueueStore | undefined; +let queueWorker: QueueWorker | undefined; +let queueInitialized = false; -/** - * Cleanup old jobs to prevent memory leak. - * Keeps most recent jobs, removes oldest completed/failed first. - */ -function cleanupOldJobs(): void { - if (jobStore.size <= MAX_JOBS) return; +function getQueueBackend(): 'memory' | 'file' { + const backend = process.env.EXTRACTION_QUEUE_BACKEND; + if (backend === 'memory' || backend === 'file') return backend; + return process.env.NODE_ENV === 'test' ? 'memory' : 'file'; +} - const allJobs = [...jobStore.values()].sort((a, b) => a.createdAt.localeCompare(b.createdAt)); +function getQueueFilePath(): string { + return resolve(process.cwd(), process.env.EXTRACTION_QUEUE_FILE || '.data/extraction-jobs.json'); +} - const toRemove = allJobs.slice(0, allJobs.length - MAX_JOBS); - for (const job of toRemove) { - // Only remove completed or failed jobs - if (job.status === 'completed' || job.status === 'failed') { - jobStore.delete(job.id); - } +async function resolveQueueStore(): Promise { + if (queueStore) return queueStore; + + if (getQueueBackend() === 'memory') { + queueStore = new MemoryQueueStore(); + return queueStore; } + + const filePath = getQueueFilePath(); + await mkdir(dirname(filePath), { recursive: true }); + queueStore = new FileQueueStore({ filePath }); + return queueStore; } -/** - * Create a new async extraction job and start processing in background. - */ -export function createJob( - inputs: SidecarExtractRequest[], - requestId?: string, - webhookConfig?: WebhookConfig -): ExtractionJob { - const job: ExtractionJob = { - id: randomUUID(), - status: 'pending', - inputs, - results: [], - errors: [], - progress: { completed: 0, total: inputs.length }, - createdAt: new Date().toISOString(), - webhookConfig, +function mapQueueJob(job: QueueJob): ExtractionJob { + const total = job.payload.inputs.length; + const result = job.result; + const progress = (job.progress as ExtractionJob['progress'] | undefined) ?? + result?.progress ?? { + completed: 0, + total, + }; + + return { + id: job.id, + status: + job.status === 'queued' + ? 'pending' + : job.status === 'running' + ? 'processing' + : job.status === 'succeeded' + ? (result?.processingStatus ?? 'completed') + : 'failed', + inputs: job.payload.inputs, + results: result?.results ?? [], + errors: result?.errors ?? [], + progress, + createdAt: job.createdAt, + completedAt: result?.completedAt ?? job.completedAt, + webhookConfig: job.payload.webhookConfig, + attempts: job.attempts, + lastError: job.lastError, }; +} - jobStore.set(job.id, job); +async function processExtractionJob( + job: QueueJob, + context: { + patch: (patch: Partial>) => Promise; + heartbeat: () => Promise; + } +): Promise { + const results: SidecarExtractResponse[] = []; + const errors: Array<{ index: number; error: string }> = []; + const total = job.payload.inputs.length; - // Cleanup old jobs to prevent memory leak - cleanupOldJobs(); - - // Start processing in background (non-blocking) - processJob(job, requestId).catch(() => { - job.status = 'failed'; + await context.patch({ + progress: { completed: 0, total }, }); - return job; -} - -/** - * Get job by ID. - */ -export function getJob(jobId: string): ExtractionJob | undefined { - return jobStore.get(jobId); -} - -/** - * List recent jobs (last 50). - */ -export function listJobs(limit = 50): ExtractionJob[] { - return [...jobStore.values()] - .sort((a, b) => b.createdAt.localeCompare(a.createdAt)) - .slice(0, limit); -} - -/** - * Reset job store (for testing). - */ -export function resetJobStore(): void { - jobStore.clear(); -} - -// ── Internal ───────────────────────────────────────────────────── - -async function processJob(job: ExtractionJob, requestId?: string): Promise { - job.status = 'processing'; - - for (let i = 0; i < job.inputs.length; i++) { + for (let i = 0; i < total; i++) { try { - const result = await sidecarExtract(job.inputs[i], requestId); - job.results.push(result); + const result = await sidecarExtract(job.payload.inputs[i], job.payload.requestId); + results.push(result); } catch (err) { const message = err instanceof Error ? err.message : 'Unknown error'; - job.errors.push({ index: i, error: message }); - // Push a placeholder so indices align - job.results.push({ + errors.push({ index: i, error: message }); + results.push({ extractions: [], metadata: { model_id: 'error', duration_ms: 0, char_count: 0 }, }); } - job.progress.completed = i + 1; + + await context.patch({ + progress: { completed: i + 1, total }, + }); + await context.heartbeat(); } - job.status = job.errors.length === job.inputs.length ? 'failed' : 'completed'; - job.completedAt = new Date().toISOString(); + const completedAt = new Date().toISOString(); + const result: ExtractionJobResult = { + processingStatus: errors.length === total ? 'failed' : 'completed', + results, + errors, + progress: { completed: total, total }, + completedAt, + }; - // Trigger webhook if configured - if (job.webhookConfig) { - await triggerJobWebhook(job, job.webhookConfig).catch(() => { - // Webhook failures don't affect job status + if (job.payload.webhookConfig) { + const view: ExtractionJob = { + ...mapQueueJob({ + ...job, + status: 'succeeded', + result, + completedAt, + }), + completedAt, + }; + await triggerJobWebhook(view, job.payload.webhookConfig).catch(() => { + // Webhook failures are non-fatal to job processing. }); } + + return result; +} + +export async function initJobQueue( + log: { info: (...args: unknown[]) => void } = { info: () => {} } +): Promise { + if (queueInitialized) return; + + const store = await resolveQueueStore(); + queueWorker = new QueueWorker({ + queueName: QUEUE_NAME, + store, + handler: processExtractionJob, + pollIntervalMs: parseInt(process.env.EXTRACTION_QUEUE_POLL_MS || '100', 10), + leaseMs: parseInt(process.env.EXTRACTION_QUEUE_LEASE_MS || '30000', 10), + }); + queueWorker.start(); + queueInitialized = true; + log.info({ backend: getQueueBackend() }, '[extract/jobs] queue worker started'); +} + +export async function shutdownJobQueue(): Promise { + if (queueWorker) { + await queueWorker.stop(); + } + queueWorker = undefined; + queueInitialized = false; +} + +export async function createJob( + inputs: SidecarExtractRequest[], + requestId?: string, + webhookConfig?: WebhookConfig +): Promise { + await initJobQueue(); + const store = await resolveQueueStore(); + const job = await store.enqueue(QUEUE_NAME, { + type: 'extract.batch', + payload: { + inputs, + requestId, + webhookConfig, + }, + progress: { + completed: 0, + total: inputs.length, + }, + productId: process.env.DEFAULT_PRODUCT_ID, + }); + return mapQueueJob(job); +} + +export async function getJob(jobId: string): Promise { + const store = await resolveQueueStore(); + const job = await store.get(QUEUE_NAME, jobId); + return job ? mapQueueJob(job) : undefined; +} + +export async function listJobs(limit = 50): Promise { + const store = await resolveQueueStore(); + const jobs = await store.list(QUEUE_NAME, { limit }); + return jobs.map(mapQueueJob); +} + +export async function resetJobStore(): Promise { + await shutdownJobQueue(); + if (queueStore) { + await queueStore.clear(QUEUE_NAME); + } +} + +export async function setJobStoreForTesting(store?: QueueStore): Promise { + await shutdownJobQueue(); + queueStore = store; } diff --git a/services/extraction-service/src/modules/extract/routes.ts b/services/extraction-service/src/modules/extract/routes.ts index 3bfbdb41..db368dca 100644 --- a/services/extraction-service/src/modules/extract/routes.ts +++ b/services/extraction-service/src/modules/extract/routes.ts @@ -13,7 +13,7 @@ import { BadRequestError } from '../../lib/errors.js'; import { checkQuota, incrementUsage, getUsageSummary } from './usage.js'; import { recordExtraction, getMetricsSummary } from '../../lib/metrics.js'; import { sidecarBreaker } from '../../lib/circuit-breaker.js'; -import { createJob, getJob, listJobs } from './jobs.js'; +import { createJob, getJob, initJobQueue, listJobs, shutdownJobQueue } from './jobs.js'; import { checkProductRateLimit, getProductRateLimitStatus, @@ -103,6 +103,10 @@ const MODEL_REGISTRY = [ export async function extractRoutes(app: FastifyInstance) { // Start sidecar health monitoring startHealthMonitoring(); + await initJobQueue(app.log); + app.addHook('onClose', async () => { + await shutdownJobQueue(); + }); // Rate limiting for extraction endpoints — 30 req/min per IP (configurable) await app.register(rateLimit, { @@ -401,7 +405,7 @@ export async function extractRoutes(app: FastifyInstance) { }; } - const job = createJob(sidecarRequests, requestId, webhookConfig); + const job = await createJob(sidecarRequests, requestId, webhookConfig); req.log.info({ jobId: job.id, inputCount: inputs.length }, 'async job created'); return reply.status(202).send({ @@ -418,7 +422,7 @@ export async function extractRoutes(app: FastifyInstance) { */ app.get('/extract/jobs/:id', async (req, reply) => { const { id } = req.params as { id: string }; - const job = getJob(id); + const job = await getJob(id); if (!job) { return reply.status(404).send({ error: 'Job not found' }); } @@ -448,7 +452,7 @@ export async function extractRoutes(app: FastifyInstance) { * GET /extract/jobs — List recent async jobs. */ app.get('/extract/jobs', async (_req, reply) => { - const jobs = listJobs(); + const jobs = await listJobs(); return reply.send({ jobs: jobs.map(j => ({ id: j.id,