learning_ai_common_plat/packages/ai-ui/src/useStreamingText.ts
2026-05-27 12:07:23 -07:00

131 lines
4.0 KiB
TypeScript

import type { StreamProtocol } from './types.js';
/**
* Consume a streaming HTTP response body and yield text deltas.
*
* - `'text'` — body is plain text; each chunk yielded as-is.
* - `'sse'` — body is Server-Sent Events; we parse `data: ...` lines.
* `[DONE]` and empty data frames are skipped. JSON frames
* that include a `content` or `text` field are unwrapped.
* - `'data'` — Vercel AI SDK "data stream" prefix protocol. Lines look
* like `0:"hello"`, `2:[{...}]`, etc. We extract the `0:`
* (text) frames and ignore the rest until the matching
* components ship.
*
* The generator terminates when the body ends OR when the `AbortSignal`
* fires — callers can early-exit cleanly.
*
* @example
* ```ts
* const ac = new AbortController();
* for await (const chunk of streamText(response, 'text', ac.signal)) {
* setDraft(prev => prev + chunk);
* }
* ```
*/
export async function* streamText(
response: Response,
protocol: StreamProtocol = 'text',
signal?: AbortSignal,
): AsyncGenerator<string, void, unknown> {
if (!response.body) return;
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
try {
while (true) {
if (signal?.aborted) return;
const { done, value } = await reader.read();
if (done) {
// Flush any tail bytes through the decoder.
const tail = decoder.decode();
if (tail) yield* parseChunk(tail, protocol, buffer)[0];
return;
}
buffer += decoder.decode(value, { stream: true });
const [chunks, remaining] = parseChunk('', protocol, buffer);
buffer = remaining;
for (const chunk of chunks) {
if (signal?.aborted) return;
yield chunk;
}
}
} finally {
// Best-effort cancellation — ignore if the stream is already closed.
try {
reader.releaseLock();
} catch {
/* noop */
}
}
}
/**
* Parse buffered text into emit-ready chunks per protocol.
*
* Returns `[chunksToEmit, remainingBuffer]`. The `remainingBuffer`
* accumulates partial frames (e.g. an SSE `data:` line without its
* terminating `\n\n`) until the next read fills them in.
*/
function parseChunk(
flush: string,
protocol: StreamProtocol,
buffer: string,
): [string[], string] {
const combined = buffer + flush;
if (protocol === 'text') {
// The caller-supplied buffer concept doesn't apply — emit everything.
return combined ? [[combined], ''] : [[], ''];
}
if (protocol === 'sse') {
const out: string[] = [];
// SSE frames are separated by a blank line (\n\n).
const parts = combined.split('\n\n');
const remaining = parts.pop() ?? '';
for (const frame of parts) {
for (const line of frame.split('\n')) {
if (!line.startsWith('data:')) continue;
const raw = line.slice(5).trimStart();
if (raw === '[DONE]' || raw === '') continue;
// Best-effort JSON unwrap — falls back to the raw string.
try {
const obj = JSON.parse(raw);
if (typeof obj === 'string') out.push(obj);
else if (typeof obj?.content === 'string') out.push(obj.content);
else if (typeof obj?.text === 'string') out.push(obj.text);
else if (typeof obj?.delta === 'string') out.push(obj.delta);
} catch {
out.push(raw);
}
}
}
return [out, remaining];
}
// protocol === 'data' — Vercel AI SDK data-stream prefix protocol.
const out: string[] = [];
const lines = combined.split('\n');
const remaining = lines.pop() ?? '';
for (const line of lines) {
if (!line) continue;
const colon = line.indexOf(':');
if (colon < 1) continue;
const prefix = line.slice(0, colon);
if (prefix !== '0') continue; // Only text frames for the MVP.
const payload = line.slice(colon + 1);
try {
const value = JSON.parse(payload);
if (typeof value === 'string') out.push(value);
} catch {
/* ignore malformed frame */
}
}
return [out, remaining];
}