174 lines
5.4 KiB
TypeScript
174 lines
5.4 KiB
TypeScript
import { randomUUID } from 'node:crypto';
import { mkdir, readFile, rename, rm, writeFile } from 'node:fs/promises';
import { dirname } from 'node:path';

import type { EnqueueJobInput, ListJobsOptions, QueueJob, QueueStore } from './types.js';
|
|
|
|
/** Construction options for {@link FileQueueStore}. */
export interface FileQueueStoreOptions {
  /**
   * Path of the JSON file that holds the state of every queue.
   * The file does not need to exist up front; its parent directory is created on write.
   */
  filePath: string;
}
|
|
|
|
/**
 * {@link QueueStore} implementation that persists every queue in one JSON file.
 *
 * The file holds an object keyed by queue name, each value being that queue's
 * array of jobs. Every public operation re-reads the file, applies its change
 * and rewrites the whole document.
 *
 * Operations issued through the same instance are serialised by an in-memory
 * promise chain. Nothing here coordinates with other instances or other
 * processes that point at the same file.
 */
export class FileQueueStore implements QueueStore {
  private readonly filePath: string;

  /** Tail of the operation chain. It is built so that it never rejects. */
  private operation: Promise<void> = Promise.resolve();

  constructor(options: FileQueueStoreOptions) {
    this.filePath = options.filePath;
  }

  /**
   * Appends a new job in the `queued` state to `queueName`.
   *
   * If `input.idempotencyKey` is set and a job with the same key already exists
   * in that queue, the existing job is returned and nothing is written.
   * Defaults: a random UUID for the id, `maxAttempts` of 3, no delay.
   *
   * @returns The stored job (new or pre-existing).
   * @throws If the state file cannot be read, parsed or written.
   */
  enqueue<TPayload = unknown>(
    queueName: string,
    input: EnqueueJobInput<TPayload>
  ): Promise<QueueJob<TPayload>> {
    return this.withLock(async () => {
      const state = await this.readState();
      const queue = state[queueName] ?? [];

      if (input.idempotencyKey) {
        const existing = queue.find(job => job.idempotencyKey === input.idempotencyKey);
        if (existing) return existing as QueueJob<TPayload>;
      }

      const now = new Date();
      const job: QueueJob<TPayload> = {
        // `||` on purpose: an empty-string id also falls back to a generated one.
        id: input.id || randomUUID(),
        queueName,
        type: input.type,
        payload: input.payload,
        status: 'queued',
        attempts: 0,
        maxAttempts: input.maxAttempts ?? 3,
        createdAt: now.toISOString(),
        scheduledAt: new Date(now.getTime() + (input.delayMs ?? 0)).toISOString(),
        progress: input.progress,
        metadata: input.metadata,
        idempotencyKey: input.idempotencyKey,
        productId: input.productId,
        userId: input.userId,
      };

      state[queueName] = [...queue, job];
      await this.writeState(state);
      return job;
    });
  }

  /**
   * Looks up a single job by id.
   *
   * @returns The job, or `undefined` when the queue or the id is unknown.
   * @throws If the state file cannot be read or parsed.
   */
  get<TPayload = unknown, TResult = unknown>(
    queueName: string,
    id: string
  ): Promise<QueueJob<TPayload, TResult> | undefined> {
    return this.withLock(async () => {
      const state = await this.readState();
      const job = (state[queueName] ?? []).find(item => item.id === id);
      return job as QueueJob<TPayload, TResult> | undefined;
    });
  }

  /**
   * Lists jobs of a queue, newest `createdAt` first.
   *
   * `options.status` restricts the result to that status. `options.limit`
   * caps the number of jobs returned (default 50; negative values are treated
   * as 0).
   *
   * @throws If the state file cannot be read or parsed.
   */
  list<TPayload = unknown, TResult = unknown>(
    queueName: string,
    options?: ListJobsOptions
  ): Promise<Array<QueueJob<TPayload, TResult>>> {
    return this.withLock(async () => {
      const state = await this.readState();
      // A negative end index would make `slice` drop jobs from the tail instead of capping.
      const limit = Math.max(0, options?.limit ?? 50);
      const jobs = (state[queueName] ?? [])
        // `filter` returns a fresh array, so the in-place `sort` below cannot reorder `state`.
        .filter(job => !options?.status || job.status === options.status)
        .sort((a, b) => b.createdAt.localeCompare(a.createdAt))
        .slice(0, limit);
      return jobs as Array<QueueJob<TPayload, TResult>>;
    });
  }

  /**
   * Claims the next runnable job for `workerId`.
   *
   * Candidates are queued jobs whose `scheduledAt` is not after `now`, plus
   * running jobs whose lease is missing or has expired. The candidate with the
   * earliest `scheduledAt` wins, ties broken by `createdAt`. The claimed job is
   * marked `running`, its `attempts` is incremented, `startedAt` is set if it
   * was not already, and a lease of `leaseMs` milliseconds is recorded.
   *
   * @param now Reference time for scheduling and lease checks; defaults to the current time.
   * @returns The claimed job, or `undefined` when nothing is claimable.
   * @throws If the state file cannot be read, parsed or written.
   */
  claimNext<TPayload = unknown, TResult = unknown>(
    queueName: string,
    workerId: string,
    leaseMs: number,
    now = new Date()
  ): Promise<QueueJob<TPayload, TResult> | undefined> {
    return this.withLock(async () => {
      const state = await this.readState();
      const queue = state[queueName] ?? [];
      // Computed once; every timestamp in the file is an ISO string compared lexicographically.
      const nowIso = now.toISOString();

      const next = queue
        .filter(job => this.isClaimable(job, nowIso))
        .sort(
          (a, b) =>
            a.scheduledAt.localeCompare(b.scheduledAt) || a.createdAt.localeCompare(b.createdAt)
        )[0];
      if (!next) return undefined;

      // `next` is the same object that sits inside `state`, so mutating it updates what gets written.
      next.status = 'running';
      next.attempts += 1;
      next.startedAt = next.startedAt || nowIso;
      next.leaseOwner = workerId;
      next.leaseExpiresAt = new Date(now.getTime() + leaseMs).toISOString();

      await this.writeState(state);
      return next as QueueJob<TPayload, TResult>;
    });
  }

  /**
   * Shallow-merges `patch` over the stored job and persists the result.
   *
   * Keys present in `patch` replace the stored values, including keys whose
   * value is `undefined`.
   *
   * @returns The merged job, or `undefined` when the queue or the id is unknown.
   * @throws If the state file cannot be read, parsed or written.
   */
  patch<TPayload = unknown, TResult = unknown>(
    queueName: string,
    id: string,
    patch: Partial<QueueJob<TPayload, TResult>>
  ): Promise<QueueJob<TPayload, TResult> | undefined> {
    return this.withLock(async () => {
      const state = await this.readState();
      const queue = state[queueName] ?? [];
      const index = queue.findIndex(job => job.id === id);
      if (index === -1) return undefined;

      const merged = {
        ...queue[index],
        ...patch,
      };
      queue[index] = merged;
      state[queueName] = queue;

      await this.writeState(state);
      return merged as QueueJob<TPayload, TResult>;
    });
  }

  /**
   * Removes one queue, or every queue when `queueName` is omitted or empty.
   *
   * @throws If the state file cannot be read, parsed or written.
   */
  clear(queueName?: string): Promise<void> {
    return this.withLock(async () => {
      if (!queueName) {
        // Clearing everything does not need the current contents.
        await this.writeState({});
        return;
      }

      const state = await this.readState();
      delete state[queueName];
      await this.writeState(state);
    });
  }

  /**
   * Runs `fn` after every previously submitted operation has settled and
   * returns its outcome to the caller.
   */
  private withLock<T>(fn: () => Promise<T>): Promise<T> {
    // `this.operation` never rejects (see below), so a single fulfilment handler is enough.
    const run = this.operation.then(fn);
    // Swallow the outcome on the chain itself so one failed operation does not block later ones.
    this.operation = run.then(
      () => undefined,
      () => undefined
    );
    return run;
  }

  /**
   * Loads the persisted state.
   *
   * A missing or empty file is treated as "no queues yet". Any other read
   * failure, invalid JSON, or a top-level value that is not a plain object is
   * reported as an error; returning an empty state in those cases would let the
   * next write replace every persisted job.
   */
  private async readState(): Promise<Record<string, QueueJob[]>> {
    let raw: string;
    try {
      raw = await readFile(this.filePath, 'utf-8');
    } catch (error: unknown) {
      if (FileQueueStore.isMissingFile(error)) return {};
      throw error;
    }

    if (raw.trim() === '') return {};

    let parsed: unknown;
    try {
      parsed = JSON.parse(raw);
    } catch (error: unknown) {
      const reason = error instanceof Error ? error.message : String(error);
      throw new Error(`Queue state file ${this.filePath} is not valid JSON: ${reason}`);
    }

    if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) {
      throw new Error(`Queue state file ${this.filePath} does not contain a JSON object`);
    }
    // Individual jobs are not validated here; the file is trusted to have been written by this class.
    return parsed as Record<string, QueueJob[]>;
  }

  /**
   * Persists `state`, creating the parent directory if needed.
   *
   * The JSON is written to a uniquely named sibling temp file which is then
   * renamed over the target, so the target is replaced in a single step rather
   * than rewritten in place. If either step fails the temp file is removed
   * (best effort) and the original error is rethrown.
   */
  private async writeState(state: Record<string, QueueJob[]>): Promise<void> {
    await mkdir(dirname(this.filePath), { recursive: true });
    const tempPath = `${this.filePath}.${randomUUID()}.tmp`;
    try {
      await writeFile(tempPath, JSON.stringify(state, null, 2), 'utf-8');
      await rename(tempPath, this.filePath);
    } catch (error: unknown) {
      try {
        await rm(tempPath, { force: true });
      } catch {
        // Cleanup is best effort; the write/rename failure is the error worth reporting.
      }
      throw error;
    }
  }

  /**
   * Whether `job` may be handed to a worker at `nowIso` (an ISO-8601 timestamp):
   * queued and due, or running with a missing or expired lease.
   */
  private isClaimable(job: QueueJob, nowIso: string): boolean {
    if (job.status === 'queued') {
      return job.scheduledAt <= nowIso;
    }
    if (job.status === 'running') {
      return !job.leaseExpiresAt || job.leaseExpiresAt <= nowIso;
    }
    return false;
  }

  /** True when `error` is a Node.js filesystem error with code `ENOENT`. */
  private static isMissingFile(error: unknown): boolean {
    return (
      typeof error === 'object' &&
      error !== null &&
      (error as { code?: unknown }).code === 'ENOENT'
    );
  }
}
|