learning_ai_common_plat/packages/queue/src/queue.test.ts

71 lines
2.1 KiB
TypeScript

import { mkdtemp, readFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { describe, expect, it, vi } from 'vitest';
import { FileQueueStore } from './file-store.js';
import { MemoryQueueStore } from './memory-store.js';
import { QueueWorker } from './worker.js';
describe('MemoryQueueStore', () => {
it('enqueues and retrieves jobs', async () => {
const store = new MemoryQueueStore();
const job = await store.enqueue('test', {
type: 'demo',
payload: { value: 1 },
});
const found = await store.get('test', job.id);
expect(found?.payload).toEqual({ value: 1 });
expect(found?.status).toBe('queued');
});
});
describe('FileQueueStore', () => {
it('persists jobs across store instances', async () => {
const dir = await mkdtemp(join(tmpdir(), 'queue-store-'));
const filePath = join(dir, 'jobs.json');
const first = new FileQueueStore({ filePath });
const job = await first.enqueue('persisted', {
type: 'demo',
payload: { ok: true },
});
const second = new FileQueueStore({ filePath });
const found = await second.get('persisted', job.id);
expect(found?.payload).toEqual({ ok: true });
const raw = JSON.parse(await readFile(filePath, 'utf-8')) as Record<
string,
Array<{ id: string }>
>;
expect(raw.persisted[0].id).toBe(job.id);
});
});
describe('QueueWorker', () => {
it('processes queued jobs to completion', async () => {
const store = new MemoryQueueStore();
const worker = new QueueWorker<{ value: number }, { doubled: number }>({
queueName: 'work',
store,
pollIntervalMs: 5,
handler: async job => ({ doubled: job.payload.value * 2 }),
});
worker.start();
const job = await store.enqueue('work', {
type: 'double',
payload: { value: 21 },
});
await vi.waitFor(async () => {
const updated = await store.get<{ value: number }, { doubled: number }>('work', job.id);
expect(updated?.status).toBe('succeeded');
expect(updated?.result).toEqual({ doubled: 42 });
});
await worker.stop();
});
});