137 lines
4.2 KiB
TypeScript
137 lines
4.2 KiB
TypeScript
import { randomUUID } from 'node:crypto';
|
|
import type {
|
|
EnqueueJobInput,
|
|
ListJobsOptions,
|
|
QueueJob,
|
|
QueueJobStatus,
|
|
QueueStore,
|
|
} from './types.js';
|
|
|
|
function cloneValue<T>(value: T): T {
|
|
return JSON.parse(JSON.stringify(value)) as T;
|
|
}
|
|
|
|
export class MemoryQueueStore implements QueueStore {
|
|
private queues = new Map<string, Map<string, QueueJob>>();
|
|
|
|
async enqueue<TPayload = unknown>(
|
|
queueName: string,
|
|
input: EnqueueJobInput<TPayload>
|
|
): Promise<QueueJob<TPayload>> {
|
|
const queue = this.getQueue(queueName);
|
|
if (input.idempotencyKey) {
|
|
const existing = [...queue.values()].find(job => job.idempotencyKey === input.idempotencyKey);
|
|
if (existing) return cloneValue(existing) as QueueJob<TPayload>;
|
|
}
|
|
|
|
const now = new Date();
|
|
const job: QueueJob<TPayload> = {
|
|
id: input.id || randomUUID(),
|
|
queueName,
|
|
type: input.type,
|
|
payload: input.payload,
|
|
status: 'queued',
|
|
attempts: 0,
|
|
maxAttempts: input.maxAttempts ?? 3,
|
|
createdAt: now.toISOString(),
|
|
scheduledAt: new Date(now.getTime() + (input.delayMs ?? 0)).toISOString(),
|
|
progress: input.progress,
|
|
metadata: input.metadata,
|
|
idempotencyKey: input.idempotencyKey,
|
|
productId: input.productId,
|
|
userId: input.userId,
|
|
};
|
|
queue.set(job.id, cloneValue(job));
|
|
return cloneValue(job);
|
|
}
|
|
|
|
async get<TPayload = unknown, TResult = unknown>(
|
|
queueName: string,
|
|
id: string
|
|
): Promise<QueueJob<TPayload, TResult> | undefined> {
|
|
const job = this.getQueue(queueName).get(id);
|
|
return job ? (cloneValue(job) as QueueJob<TPayload, TResult>) : undefined;
|
|
}
|
|
|
|
async list<TPayload = unknown, TResult = unknown>(
|
|
queueName: string,
|
|
options?: ListJobsOptions
|
|
): Promise<Array<QueueJob<TPayload, TResult>>> {
|
|
const jobs = [...this.getQueue(queueName).values()]
|
|
.filter(job => !options?.status || job.status === options.status)
|
|
.sort((a, b) => b.createdAt.localeCompare(a.createdAt));
|
|
return cloneValue(jobs.slice(0, options?.limit ?? 50)) as Array<QueueJob<TPayload, TResult>>;
|
|
}
|
|
|
|
async claimNext<TPayload = unknown, TResult = unknown>(
|
|
queueName: string,
|
|
workerId: string,
|
|
leaseMs: number,
|
|
now = new Date()
|
|
): Promise<QueueJob<TPayload, TResult> | undefined> {
|
|
const queue = this.getQueue(queueName);
|
|
const candidates = [...queue.values()]
|
|
.filter(job => this.isClaimable(job, now))
|
|
.sort(
|
|
(a, b) =>
|
|
a.scheduledAt.localeCompare(b.scheduledAt) || a.createdAt.localeCompare(b.createdAt)
|
|
);
|
|
const next = candidates[0];
|
|
if (!next) return undefined;
|
|
|
|
next.status = 'running';
|
|
next.attempts += 1;
|
|
next.startedAt = next.startedAt || now.toISOString();
|
|
next.leaseOwner = workerId;
|
|
next.leaseExpiresAt = new Date(now.getTime() + leaseMs).toISOString();
|
|
queue.set(next.id, cloneValue(next));
|
|
return cloneValue(next) as QueueJob<TPayload, TResult>;
|
|
}
|
|
|
|
async patch<TPayload = unknown, TResult = unknown>(
|
|
queueName: string,
|
|
id: string,
|
|
patch: Partial<QueueJob<TPayload, TResult>>
|
|
): Promise<QueueJob<TPayload, TResult> | undefined> {
|
|
const queue = this.getQueue(queueName);
|
|
const current = queue.get(id);
|
|
if (!current) return undefined;
|
|
|
|
const merged = {
|
|
...current,
|
|
...patch,
|
|
};
|
|
queue.set(id, cloneValue(merged));
|
|
return cloneValue(merged) as QueueJob<TPayload, TResult>;
|
|
}
|
|
|
|
async clear(queueName?: string): Promise<void> {
|
|
if (queueName) {
|
|
this.queues.delete(queueName);
|
|
return;
|
|
}
|
|
this.queues.clear();
|
|
}
|
|
|
|
private getQueue(queueName: string): Map<string, QueueJob> {
|
|
if (!this.queues.has(queueName)) {
|
|
this.queues.set(queueName, new Map());
|
|
}
|
|
return this.queues.get(queueName)!;
|
|
}
|
|
|
|
private isClaimable(job: QueueJob, now: Date): boolean {
|
|
if (job.status === 'queued') {
|
|
return job.scheduledAt <= now.toISOString();
|
|
}
|
|
if (job.status === 'running') {
|
|
return !job.leaseExpiresAt || job.leaseExpiresAt <= now.toISOString();
|
|
}
|
|
return false;
|
|
}
|
|
}
|
|
|
|
export function isTerminalStatus(status: QueueJobStatus): boolean {
|
|
return ['succeeded', 'failed', 'dead_letter', 'cancelled'].includes(status);
|
|
}
|