// learning_ai_common_plat/packages/queue/src/memory-store.ts
//
// File-viewer metadata: 137 lines · 4.2 KiB · TypeScript

import { randomUUID } from 'node:crypto';
import type {
EnqueueJobInput,
ListJobsOptions,
QueueJob,
QueueJobStatus,
QueueStore,
} from './types.js';
/**
 * Deep-copies a value by round-tripping it through JSON.
 *
 * The copy therefore follows JSON semantics: object properties whose value is
 * `undefined` are dropped and anything with a `toJSON` method (e.g. `Date`) is
 * replaced by its serialized form.
 *
 * @param value - Value to copy.
 * @returns A structurally independent copy of `value`. Values that have no JSON
 *   representation at the top level (`undefined`, functions, symbols) are
 *   returned as-is.
 */
function cloneValue<T>(value: T): T {
  // JSON.stringify returns undefined (not a string) for undefined, functions and
  // symbols; passing that on to JSON.parse would throw a SyntaxError.
  const serialized: string | undefined = JSON.stringify(value);
  if (serialized === undefined) return value;
  return JSON.parse(serialized) as T;
}
/**
 * `QueueStore` implementation that keeps every job in process memory.
 *
 * Jobs are held in one `Map` per queue, keyed by job id. Every job is deep-copied
 * (via `cloneValue`) when it is stored and when it is handed back, so callers
 * never share a reference with the stored object.
 */
export class MemoryQueueStore implements QueueStore {
  /** Queue name → (job id → stored job). */
  private queues = new Map<string, Map<string, QueueJob>>();

  /**
   * Adds a job to `queueName` with status `queued` and zero attempts.
   *
   * When `input.idempotencyKey` is set and a job with the same key already
   * exists in this queue, no new job is created and a copy of the existing job
   * is returned instead.
   *
   * @param queueName - Queue to add the job to; created on first use.
   * @param input - Job description. `id` falls back to a random UUID,
   *   `maxAttempts` to 3 and `delayMs` to 0.
   * @returns A copy of the newly stored job, or of the existing job matched by
   *   idempotency key.
   */
  async enqueue<TPayload = unknown>(
    queueName: string,
    input: EnqueueJobInput<TPayload>
  ): Promise<QueueJob<TPayload>> {
    const queue = this.getQueue(queueName);

    if (input.idempotencyKey) {
      for (const candidate of queue.values()) {
        if (candidate.idempotencyKey === input.idempotencyKey) {
          return cloneValue(candidate) as QueueJob<TPayload>;
        }
      }
    }

    const now = new Date();
    const job: QueueJob<TPayload> = {
      id: input.id || randomUUID(),
      queueName,
      type: input.type,
      payload: input.payload,
      status: 'queued',
      attempts: 0,
      maxAttempts: input.maxAttempts ?? 3,
      createdAt: now.toISOString(),
      // The job becomes claimable once `delayMs` has elapsed.
      scheduledAt: new Date(now.getTime() + (input.delayMs ?? 0)).toISOString(),
      progress: input.progress,
      metadata: input.metadata,
      idempotencyKey: input.idempotencyKey,
      productId: input.productId,
      userId: input.userId,
    };

    // NOTE(review): an explicit `input.id` that is already present in this queue
    // replaces the stored job — confirm whether callers rely on that.
    queue.set(job.id, cloneValue(job));
    return cloneValue(job);
  }

  /**
   * Looks up a single job.
   *
   * @param queueName - Queue the job belongs to.
   * @param id - Job id.
   * @returns A copy of the job, or `undefined` when the queue or the job does
   *   not exist.
   */
  async get<TPayload = unknown, TResult = unknown>(
    queueName: string,
    id: string
  ): Promise<QueueJob<TPayload, TResult> | undefined> {
    // Read directly from the map so a lookup never allocates a queue.
    const job = this.queues.get(queueName)?.get(id);
    return job ? (cloneValue(job) as QueueJob<TPayload, TResult>) : undefined;
  }

  /**
   * Lists jobs of a queue, newest first (descending `createdAt`).
   *
   * @param queueName - Queue to list.
   * @param options - Optional `status` filter and `limit` (default 50). A
   *   negative limit is treated as 0.
   * @returns Copies of the matching jobs; empty when the queue does not exist.
   */
  async list<TPayload = unknown, TResult = unknown>(
    queueName: string,
    options?: ListJobsOptions
  ): Promise<Array<QueueJob<TPayload, TResult>>> {
    const queue = this.queues.get(queueName);
    if (!queue) return [];

    // Clamp so a negative limit cannot turn into "all but the last n" via slice().
    const limit = Math.max(0, options?.limit ?? 50);
    const page = [...queue.values()]
      .filter(job => !options?.status || job.status === options.status)
      .sort((a, b) => b.createdAt.localeCompare(a.createdAt))
      .slice(0, limit);
    return cloneValue(page) as Array<QueueJob<TPayload, TResult>>;
  }

  /**
   * Claims the next runnable job for a worker.
   *
   * A job is runnable when it is `queued` and its `scheduledAt` has passed, or
   * when it is `running` but has no lease or its lease has expired. Among
   * runnable jobs the one with the earliest `scheduledAt` wins, ties broken by
   * earliest `createdAt`. The claimed job is set to `running`, its `attempts`
   * is incremented, `startedAt` is filled in if empty, and the lease is
   * assigned to `workerId`.
   *
   * NOTE(review): `maxAttempts` is not consulted here, so a job whose lease
   * keeps expiring can be claimed more than `maxAttempts` times — presumably the
   * caller enforces the limit; confirm.
   *
   * @param queueName - Queue to claim from.
   * @param workerId - Identifier stored as the job's `leaseOwner`.
   * @param leaseMs - Lease duration in milliseconds, counted from `now`.
   * @param now - Reference time; defaults to the current time.
   * @returns A copy of the claimed job, or `undefined` when nothing is runnable.
   * @throws RangeError if `now` plus `leaseMs` is not a valid date; the job is
   *   left untouched in that case.
   */
  async claimNext<TPayload = unknown, TResult = unknown>(
    queueName: string,
    workerId: string,
    leaseMs: number,
    now = new Date()
  ): Promise<QueueJob<TPayload, TResult> | undefined> {
    const queue = this.queues.get(queueName);
    if (!queue) return undefined;

    const nowIso = now.toISOString();

    // Single pass for the minimum. Replacing only on a strictly smaller key keeps
    // the earliest-inserted job among ties.
    let next: QueueJob | undefined;
    for (const job of queue.values()) {
      if (!this.isClaimable(job, nowIso)) continue;
      if (!next) {
        next = job;
        continue;
      }
      const order =
        job.scheduledAt.localeCompare(next.scheduledAt) ||
        job.createdAt.localeCompare(next.createdAt);
      if (order < 0) next = job;
    }
    if (!next) return undefined;

    // Compute the expiry before mutating: an invalid `leaseMs` throws here and
    // must not leave a half-updated job behind.
    const leaseExpiresAt = new Date(now.getTime() + leaseMs).toISOString();

    // `next` is the stored object itself (it is never exposed to callers), so
    // updating it in place updates the store.
    next.status = 'running';
    next.attempts += 1;
    next.startedAt = next.startedAt || nowIso;
    next.leaseOwner = workerId;
    next.leaseExpiresAt = leaseExpiresAt;

    return cloneValue(next) as QueueJob<TPayload, TResult>;
  }

  /**
   * Shallow-merges `patch` into a stored job.
   *
   * `id` and `queueName` are always kept from the stored job so the job cannot
   * drift away from the key and queue it is filed under.
   *
   * @param queueName - Queue the job belongs to.
   * @param id - Job id.
   * @param patch - Fields to overwrite.
   * @returns A copy of the updated job, or `undefined` when the queue or the job
   *   does not exist.
   */
  async patch<TPayload = unknown, TResult = unknown>(
    queueName: string,
    id: string,
    patch: Partial<QueueJob<TPayload, TResult>>
  ): Promise<QueueJob<TPayload, TResult> | undefined> {
    const queue = this.queues.get(queueName);
    const current = queue?.get(id);
    if (!queue || !current) return undefined;

    const merged = {
      ...current,
      ...patch,
      id: current.id,
      queueName: current.queueName,
    };
    queue.set(id, cloneValue(merged));
    return cloneValue(merged) as QueueJob<TPayload, TResult>;
  }

  /**
   * Removes stored jobs.
   *
   * @param queueName - Queue to drop; when omitted (or empty) every queue is
   *   dropped.
   */
  async clear(queueName?: string): Promise<void> {
    if (queueName) {
      this.queues.delete(queueName);
      return;
    }
    this.queues.clear();
  }

  /** Returns the job map for `queueName`, creating it if it does not exist yet. */
  private getQueue(queueName: string): Map<string, QueueJob> {
    let queue = this.queues.get(queueName);
    if (!queue) {
      queue = new Map();
      this.queues.set(queueName, queue);
    }
    return queue;
  }

  /**
   * Reports whether `job` may be claimed at `nowIso`.
   *
   * Timestamps are compared as strings, which matches chronological order for
   * the `toISOString()` values this class writes.
   *
   * @param job - Stored job to inspect.
   * @param nowIso - Reference time as produced by `Date.prototype.toISOString`.
   */
  private isClaimable(job: QueueJob, nowIso: string): boolean {
    if (job.status === 'queued') {
      return job.scheduledAt <= nowIso;
    }
    if (job.status === 'running') {
      // A running job is up for grabs again once its lease is missing or expired.
      return !job.leaseExpiresAt || job.leaseExpiresAt <= nowIso;
    }
    return false;
  }
}
/**
 * Reports whether a job status is terminal.
 *
 * @param status - Status to test.
 * @returns `true` for `succeeded`, `failed`, `dead_letter` and `cancelled`;
 *   `false` for every other status.
 */
export function isTerminalStatus(status: QueueJobStatus): boolean {
  const terminalStatuses: readonly string[] = [
    'succeeded',
    'failed',
    'dead_letter',
    'cancelled',
  ];
  return terminalStatuses.some(terminal => terminal === status);
}