learning_ai_common_plat/packages/queue/src/file-store.ts

174 lines
5.4 KiB
TypeScript

import { randomUUID } from 'node:crypto';
import { mkdir, readFile, rename, writeFile } from 'node:fs/promises';
import { dirname } from 'node:path';
import type { EnqueueJobInput, ListJobsOptions, QueueJob, QueueStore } from './types.js';
export interface FileQueueStoreOptions {
filePath: string;
}
export class FileQueueStore implements QueueStore {
private readonly filePath: string;
private operation = Promise.resolve();
constructor(options: FileQueueStoreOptions) {
this.filePath = options.filePath;
}
enqueue<TPayload = unknown>(
queueName: string,
input: EnqueueJobInput<TPayload>
): Promise<QueueJob<TPayload>> {
return this.withLock(async () => {
const state = await this.readState();
const queue = state[queueName] ?? [];
if (input.idempotencyKey) {
const existing = queue.find(job => job.idempotencyKey === input.idempotencyKey);
if (existing) return existing as QueueJob<TPayload>;
}
const now = new Date();
const job: QueueJob<TPayload> = {
id: input.id || randomUUID(),
queueName,
type: input.type,
payload: input.payload,
status: 'queued',
attempts: 0,
maxAttempts: input.maxAttempts ?? 3,
createdAt: now.toISOString(),
scheduledAt: new Date(now.getTime() + (input.delayMs ?? 0)).toISOString(),
progress: input.progress,
metadata: input.metadata,
idempotencyKey: input.idempotencyKey,
productId: input.productId,
userId: input.userId,
};
state[queueName] = [...queue, job];
await this.writeState(state);
return job;
});
}
get<TPayload = unknown, TResult = unknown>(
queueName: string,
id: string
): Promise<QueueJob<TPayload, TResult> | undefined> {
return this.withLock(async () => {
const state = await this.readState();
const job = (state[queueName] ?? []).find(item => item.id === id);
return job as QueueJob<TPayload, TResult> | undefined;
});
}
list<TPayload = unknown, TResult = unknown>(
queueName: string,
options?: ListJobsOptions
): Promise<Array<QueueJob<TPayload, TResult>>> {
return this.withLock(async () => {
const state = await this.readState();
const jobs = (state[queueName] ?? [])
.filter(job => !options?.status || job.status === options.status)
.sort((a, b) => b.createdAt.localeCompare(a.createdAt))
.slice(0, options?.limit ?? 50);
return jobs as Array<QueueJob<TPayload, TResult>>;
});
}
claimNext<TPayload = unknown, TResult = unknown>(
queueName: string,
workerId: string,
leaseMs: number,
now = new Date()
): Promise<QueueJob<TPayload, TResult> | undefined> {
return this.withLock(async () => {
const state = await this.readState();
const queue = state[queueName] ?? [];
const next = queue
.filter(job => this.isClaimable(job, now))
.sort(
(a, b) =>
a.scheduledAt.localeCompare(b.scheduledAt) || a.createdAt.localeCompare(b.createdAt)
)[0];
if (!next) return undefined;
next.status = 'running';
next.attempts += 1;
next.startedAt = next.startedAt || now.toISOString();
next.leaseOwner = workerId;
next.leaseExpiresAt = new Date(now.getTime() + leaseMs).toISOString();
await this.writeState(state);
return next as QueueJob<TPayload, TResult>;
});
}
patch<TPayload = unknown, TResult = unknown>(
queueName: string,
id: string,
patch: Partial<QueueJob<TPayload, TResult>>
): Promise<QueueJob<TPayload, TResult> | undefined> {
return this.withLock(async () => {
const state = await this.readState();
const queue = state[queueName] ?? [];
const index = queue.findIndex(job => job.id === id);
if (index === -1) return undefined;
const merged = {
...queue[index],
...patch,
};
queue[index] = merged;
state[queueName] = queue;
await this.writeState(state);
return merged as QueueJob<TPayload, TResult>;
});
}
clear(queueName?: string): Promise<void> {
return this.withLock(async () => {
if (!queueName) {
await this.writeState({});
return;
}
const state = await this.readState();
delete state[queueName];
await this.writeState(state);
});
}
private async withLock<T>(fn: () => Promise<T>): Promise<T> {
const run = this.operation.then(fn, fn);
this.operation = run.then(
() => undefined,
() => undefined
);
return run;
}
private async readState(): Promise<Record<string, QueueJob[]>> {
try {
const raw = await readFile(this.filePath, 'utf-8');
return JSON.parse(raw) as Record<string, QueueJob[]>;
} catch {
return {};
}
}
private async writeState(state: Record<string, QueueJob[]>): Promise<void> {
await mkdir(dirname(this.filePath), { recursive: true });
const tempPath = `${this.filePath}.${randomUUID()}.tmp`;
await writeFile(tempPath, JSON.stringify(state, null, 2), 'utf-8');
await rename(tempPath, this.filePath);
}
private isClaimable(job: QueueJob, now: Date): boolean {
if (job.status === 'queued') {
return job.scheduledAt <= now.toISOString();
}
if (job.status === 'running') {
return !job.leaseExpiresAt || job.leaseExpiresAt <= now.toISOString();
}
return false;
}
}