101 lines
3.1 KiB
TypeScript
101 lines
3.1 KiB
TypeScript
import { randomUUID } from 'node:crypto';
|
|
import type { QueueWorkerOptions, WorkerContext } from './types.js';
|
|
|
|
export class QueueWorker<TPayload = unknown, TResult = unknown> {
|
|
private readonly queueName: string;
|
|
private readonly store: QueueWorkerOptions<TPayload, TResult>['store'];
|
|
private readonly handler: QueueWorkerOptions<TPayload, TResult>['handler'];
|
|
private readonly workerId: string;
|
|
private readonly pollIntervalMs: number;
|
|
private readonly leaseMs: number;
|
|
private readonly backoffMs: number;
|
|
private timer?: ReturnType<typeof setTimeout>;
|
|
private running = false;
|
|
private inflight?: Promise<void>;
|
|
|
|
constructor(options: QueueWorkerOptions<TPayload, TResult>) {
|
|
this.queueName = options.queueName;
|
|
this.store = options.store;
|
|
this.handler = options.handler;
|
|
this.workerId = options.workerId || `worker_${randomUUID()}`;
|
|
this.pollIntervalMs = options.pollIntervalMs ?? 200;
|
|
this.leaseMs = options.leaseMs ?? 30_000;
|
|
this.backoffMs = options.backoffMs ?? 1_000;
|
|
}
|
|
|
|
start(): void {
|
|
if (this.running) return;
|
|
this.running = true;
|
|
this.schedule(0);
|
|
}
|
|
|
|
async stop(): Promise<void> {
|
|
this.running = false;
|
|
if (this.timer) {
|
|
clearTimeout(this.timer);
|
|
this.timer = undefined;
|
|
}
|
|
await this.inflight;
|
|
}
|
|
|
|
private schedule(delayMs: number): void {
|
|
if (!this.running) return;
|
|
this.timer = globalThis.setTimeout(() => {
|
|
this.inflight = this.tick().finally(() => {
|
|
this.inflight = undefined;
|
|
});
|
|
}, delayMs);
|
|
}
|
|
|
|
private async tick(): Promise<void> {
|
|
const job = await this.store.claimNext<TPayload, TResult>(
|
|
this.queueName,
|
|
this.workerId,
|
|
this.leaseMs
|
|
);
|
|
if (!job) {
|
|
this.schedule(this.pollIntervalMs);
|
|
return;
|
|
}
|
|
|
|
const context: WorkerContext<TPayload, TResult> = {
|
|
patch: async patch => {
|
|
await this.store.patch<TPayload, TResult>(this.queueName, job.id, patch);
|
|
},
|
|
heartbeat: async () => {
|
|
await this.store.patch<TPayload, TResult>(this.queueName, job.id, {
|
|
leaseOwner: this.workerId,
|
|
leaseExpiresAt: new Date(Date.now() + this.leaseMs).toISOString(),
|
|
});
|
|
},
|
|
};
|
|
|
|
try {
|
|
const result = await this.handler(job, context);
|
|
await this.store.patch<TPayload, TResult>(this.queueName, job.id, {
|
|
status: 'succeeded',
|
|
result,
|
|
completedAt: new Date().toISOString(),
|
|
leaseOwner: undefined,
|
|
leaseExpiresAt: undefined,
|
|
});
|
|
} catch (err: unknown) {
|
|
const lastError = err instanceof Error ? err.message : String(err);
|
|
const finalStatus = job.attempts >= job.maxAttempts ? 'dead_letter' : 'queued';
|
|
await this.store.patch<TPayload, TResult>(this.queueName, job.id, {
|
|
status: finalStatus,
|
|
lastError,
|
|
scheduledAt:
|
|
finalStatus === 'queued'
|
|
? new Date(Date.now() + this.backoffMs * job.attempts).toISOString()
|
|
: job.scheduledAt,
|
|
completedAt: finalStatus === 'dead_letter' ? new Date().toISOString() : undefined,
|
|
leaseOwner: undefined,
|
|
leaseExpiresAt: undefined,
|
|
});
|
|
}
|
|
|
|
this.schedule(0);
|
|
}
|
|
}
|