// Unit tests for the SSE and NDJSON stream consumers exported by ./client-parsers.js.
import { describe, it, expect, vi } from 'vitest';
import { consumeSSEStream, consumeNdjsonStream } from './client-parsers.js';
/**
 * Builds a successful `Response` whose body streams `body` as a single
 * UTF-8 encoded chunk and then closes.
 *
 * A real `Response` is constructed (rather than casting an object literal),
 * so `ok` is `true` via the default 200 status and `body` is a genuine
 * `ReadableStream<Uint8Array>`.
 *
 * @param body - Text to deliver through the response body stream.
 * @returns A `Response` with `ok === true` and a readable `body`.
 */
function createResponse(body: string): Response {
  const encoder = new TextEncoder();
  const stream = new ReadableStream<Uint8Array>({
    start(controller) {
      // Deliver the whole payload in one chunk, then signal end-of-stream.
      controller.enqueue(encoder.encode(body));
      controller.close();
    },
  });
  return new Response(stream);
}
/**
 * Tests for `consumeSSEStream`, driven by in-memory responses whose bodies
 * contain `data:`-prefixed lines.
 */
describe('consumeSSEStream', () => {
  it('parses data: lines and calls onData', async () => {
    const response = createResponse('data: {"msg":"hello"}\ndata: {"msg":"world"}\n\n');
    const onData = vi.fn();
    const onDone = vi.fn();

    await consumeSSEStream(response, onData, onDone);

    expect(onData).toHaveBeenCalledTimes(2);
    expect(onData).toHaveBeenCalledWith({ msg: 'hello' });
    expect(onData).toHaveBeenCalledWith({ msg: 'world' });
    expect(onDone).toHaveBeenCalled();
  });

  it('stops on [DONE] marker', async () => {
    const response = createResponse('data: {"msg":"hello"}\ndata: [DONE]\ndata: {"msg":"after"}\n');
    const onData = vi.fn();
    const onDone = vi.fn();

    await consumeSSEStream(response, onData, onDone);

    // Exactly one payload, and it must be the one that precedes the marker.
    expect(onData).toHaveBeenCalledTimes(1);
    expect(onData).toHaveBeenCalledWith({ msg: 'hello' });
    expect(onDone).toHaveBeenCalled();
  });

  it('calls onDone when response has no body', async () => {
    // `new Response(null)` yields `ok === true` with a null body.
    const response = new Response(null);
    const onDone = vi.fn();

    await consumeSSEStream(response, vi.fn(), onDone);

    expect(onDone).toHaveBeenCalled();
  });

  it('skips malformed SSE data', async () => {
    const response = createResponse('data: not-json\ndata: {"valid":true}\n');
    const onData = vi.fn();
    const onDone = vi.fn();

    await consumeSSEStream(response, onData, onDone);

    expect(onData).toHaveBeenCalledTimes(1);
    expect(onData).toHaveBeenCalledWith({ valid: true });
    // A malformed line must not prevent the stream from completing.
    expect(onDone).toHaveBeenCalled();
  });
});
/**
 * Tests for `consumeNdjsonStream`, driven by in-memory responses whose
 * bodies contain newline-delimited JSON.
 */
describe('consumeNdjsonStream', () => {
  it('parses NDJSON lines and calls onChunk', async () => {
    const response = createResponse('{"a":1}\n{"a":2}\n');
    const onChunk = vi.fn();

    await consumeNdjsonStream(response, onChunk);

    expect(onChunk).toHaveBeenCalledTimes(2);
    expect(onChunk).toHaveBeenCalledWith({ a: 1 });
    expect(onChunk).toHaveBeenCalledWith({ a: 2 });
  });

  it('handles no body', async () => {
    // `new Response(null)` yields `ok === true` with a null body.
    const response = new Response(null);
    const onChunk = vi.fn();

    await consumeNdjsonStream(response, onChunk);

    expect(onChunk).not.toHaveBeenCalled();
  });

  it('processes remaining buffer', async () => {
    // The final record has no trailing newline.
    const response = createResponse('{"a":1}\n{"a":2}');
    const onChunk = vi.fn();

    await consumeNdjsonStream(response, onChunk);

    expect(onChunk).toHaveBeenCalledTimes(2);
    expect(onChunk).toHaveBeenCalledWith({ a: 1 });
    expect(onChunk).toHaveBeenCalledWith({ a: 2 });
  });

  it('skips malformed lines', async () => {
    const response = createResponse('{"a":1}\nbad\n{"a":2}\n');
    const onChunk = vi.fn();

    await consumeNdjsonStream(response, onChunk);

    // Only the two valid records are delivered; the `bad` line is dropped.
    expect(onChunk).toHaveBeenCalledTimes(2);
    expect(onChunk).toHaveBeenCalledWith({ a: 1 });
    expect(onChunk).toHaveBeenCalledWith({ a: 2 });
  });
});