learning_ai_common_plat/packages/queue/src/worker.ts

101 lines
3.1 KiB
TypeScript

import { randomUUID } from 'node:crypto';
import type { QueueWorkerOptions, WorkerContext } from './types.js';
export class QueueWorker<TPayload = unknown, TResult = unknown> {
private readonly queueName: string;
private readonly store: QueueWorkerOptions<TPayload, TResult>['store'];
private readonly handler: QueueWorkerOptions<TPayload, TResult>['handler'];
private readonly workerId: string;
private readonly pollIntervalMs: number;
private readonly leaseMs: number;
private readonly backoffMs: number;
private timer?: ReturnType<typeof setTimeout>;
private running = false;
private inflight?: Promise<void>;
constructor(options: QueueWorkerOptions<TPayload, TResult>) {
this.queueName = options.queueName;
this.store = options.store;
this.handler = options.handler;
this.workerId = options.workerId || `worker_${randomUUID()}`;
this.pollIntervalMs = options.pollIntervalMs ?? 200;
this.leaseMs = options.leaseMs ?? 30_000;
this.backoffMs = options.backoffMs ?? 1_000;
}
start(): void {
if (this.running) return;
this.running = true;
this.schedule(0);
}
async stop(): Promise<void> {
this.running = false;
if (this.timer) {
clearTimeout(this.timer);
this.timer = undefined;
}
await this.inflight;
}
private schedule(delayMs: number): void {
if (!this.running) return;
this.timer = globalThis.setTimeout(() => {
this.inflight = this.tick().finally(() => {
this.inflight = undefined;
});
}, delayMs);
}
private async tick(): Promise<void> {
const job = await this.store.claimNext<TPayload, TResult>(
this.queueName,
this.workerId,
this.leaseMs
);
if (!job) {
this.schedule(this.pollIntervalMs);
return;
}
const context: WorkerContext<TPayload, TResult> = {
patch: async patch => {
await this.store.patch<TPayload, TResult>(this.queueName, job.id, patch);
},
heartbeat: async () => {
await this.store.patch<TPayload, TResult>(this.queueName, job.id, {
leaseOwner: this.workerId,
leaseExpiresAt: new Date(Date.now() + this.leaseMs).toISOString(),
});
},
};
try {
const result = await this.handler(job, context);
await this.store.patch<TPayload, TResult>(this.queueName, job.id, {
status: 'succeeded',
result,
completedAt: new Date().toISOString(),
leaseOwner: undefined,
leaseExpiresAt: undefined,
});
} catch (err: unknown) {
const lastError = err instanceof Error ? err.message : String(err);
const finalStatus = job.attempts >= job.maxAttempts ? 'dead_letter' : 'queued';
await this.store.patch<TPayload, TResult>(this.queueName, job.id, {
status: finalStatus,
lastError,
scheduledAt:
finalStatus === 'queued'
? new Date(Date.now() + this.backoffMs * job.attempts).toISOString()
: job.scheduledAt,
completedAt: finalStatus === 'dead_letter' ? new Date().toISOString() : undefined,
leaseOwner: undefined,
leaseExpiresAt: undefined,
});
}
this.schedule(0);
}
}