116 lines
2.8 KiB
TypeScript
116 lines
2.8 KiB
TypeScript
/**
 * Browser-side stream consumers for Next.js API route clients.
 *
 * These functions consume a `Response` from a fetch call that returns
 * streaming data (SSE or NDJSON) and invoke callbacks for each chunk.
 */
|
|
|
|
/**
 * Consume a Server-Sent Events (SSE) stream from a Response.
 *
 * The body is read incrementally and split on `\n`. Every line that starts
 * with `data:` has its payload parsed as JSON and passed to `onData`; blank
 * lines and lines with any other prefix are ignored. A payload of `[DONE]`
 * ends consumption early: the rest of the body is cancelled and no further
 * events are delivered.
 *
 * A final `data:` line that is not terminated by a newline is still
 * delivered when the stream ends.
 *
 * Payloads that fail to parse are skipped. Because the callback runs inside
 * the same `try`, an exception thrown by `onData` is swallowed as well.
 *
 * @param response - Fetch Response with SSE body
 * @param onData - Callback for each parsed data event
 * @param onDone - Callback invoked exactly once when consumption finishes:
 *   the body is missing, the stream ends, or `[DONE]` is received. It is not
 *   invoked if reading the stream rejects; that error propagates to the caller.
 * @returns A promise that resolves after `onDone` has been called.
 */
export async function consumeSSEStream(
  response: Response,
  onData: (data: Record<string, unknown>) => void,
  onDone: () => void
): Promise<void> {
  if (!response.body) {
    onDone();
    return;
  }

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';
  let sawDone = false;

  // Handles one complete line. Returns true when it is the `[DONE]` sentinel.
  const handleLine = (line: string): boolean => {
    const trimmed = line.trim();
    if (!trimmed.startsWith('data:')) return false;

    const payload = trimmed.slice(5).trim();
    if (payload === '[DONE]') return true;

    try {
      onData(JSON.parse(payload) as Record<string, unknown>);
    } catch {
      // skip malformed SSE data
    }
    return false;
  };

  try {
    while (!sawDone) {
      const { done, value } = await reader.read();
      if (done) break;

      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split('\n');
      // The last element is an incomplete line (or '' if the chunk ended on
      // a newline); keep it until more data arrives.
      buffer = lines.pop() ?? '';

      for (const line of lines) {
        if (handleLine(line)) {
          sawDone = true;
          break;
        }
      }
    }

    if (!sawDone) {
      // Flush any bytes the decoder is still holding, then deliver a final
      // line that was not newline-terminated instead of dropping it.
      buffer += decoder.decode();
      if (buffer.trim()) {
        handleLine(buffer);
      }
    }
  } finally {
    if (sawDone) {
      // We stopped before the body was exhausted: cancel it so the
      // underlying stream/connection is not left open.
      try {
        await reader.cancel();
      } catch {
        // best-effort; the stream may already be closed or errored
      }
    }
    reader.releaseLock();
  }

  onDone();
}
|
|
|
|
/**
 * Consume an NDJSON (newline-delimited JSON) stream from a Response.
 *
 * The body is decoded incrementally and cut on `\n`. Each non-blank line is
 * trimmed, parsed as JSON, and handed to `onChunk`. Whatever text is left
 * once the stream ends is treated as one last line. Lines that fail to
 * parse are skipped; since the callback runs inside the same `try`, an
 * exception thrown by `onChunk` is skipped too.
 *
 * Resolves immediately, without invoking `onChunk`, when the response has
 * no body.
 *
 * @param response - Fetch Response with NDJSON body
 * @param onChunk - Callback for each parsed NDJSON line
 */
export async function consumeNdjsonStream<T>(
  response: Response,
  onChunk: (chunk: T) => void
): Promise<void> {
  const body = response.body;
  if (!body) return;

  const reader = body.getReader();
  const decoder = new TextDecoder();

  // Parse one candidate line and forward it; blank or invalid input is ignored.
  const emit = (raw: string): void => {
    const text = raw.trim();
    if (text === '') return;
    try {
      onChunk(JSON.parse(text) as T);
    } catch {
      // skip malformed lines
    }
  };

  let pending = '';
  try {
    for (
      let result = await reader.read();
      !result.done;
      result = await reader.read()
    ) {
      pending += decoder.decode(result.value, { stream: true });

      // Everything up to the last newline is complete; the tail stays pending.
      const cut = pending.lastIndexOf('\n');
      if (cut === -1) continue;

      const complete = pending.slice(0, cut).split('\n');
      pending = pending.slice(cut + 1);
      for (const line of complete) {
        emit(line);
      }
    }

    // Process remaining buffer
    emit(pending);
  } finally {
    reader.releaseLock();
  }
}
|