/** * Browser-side stream consumers for Next.js API route clients. * * These functions consume a `Response` from a fetch call that returns * streaming data (SSE or NDJSON) and invoke callbacks for each chunk. */ /** * Consume a Server-Sent Events (SSE) stream from a Response. * * Parses `data:` lines and invokes `onData` for each parsed JSON object. * Stops when the stream ends or `[DONE]` is received. * * @param response - Fetch Response with SSE body * @param onData - Callback for each parsed data event * @param onDone - Callback when stream completes */ export async function consumeSSEStream( response: Response, onData: (data: Record) => void, onDone: () => void ): Promise { if (!response.body) { onDone(); return; } const reader = response.body.getReader(); const decoder = new TextDecoder(); let buffer = ''; try { while (true) { const { done, value } = await reader.read(); if (done) break; buffer += decoder.decode(value, { stream: true }); const lines = buffer.split('\n'); buffer = lines.pop() ?? ''; for (const line of lines) { const trimmed = line.trim(); if (!trimmed) continue; if (trimmed.startsWith('data:')) { const payload = trimmed.slice(5).trim(); if (payload === '[DONE]') { onDone(); return; } try { onData(JSON.parse(payload) as Record); } catch { // skip malformed SSE data } } } } } finally { reader.releaseLock(); } onDone(); } /** * Consume an NDJSON (newline-delimited JSON) stream from a Response. * * Parses each line as JSON and invokes `onChunk` for each parsed object. * * @param response - Fetch Response with NDJSON body * @param onChunk - Callback for each parsed NDJSON line */ export async function consumeNdjsonStream( response: Response, onChunk: (chunk: T) => void ): Promise { if (!response.body) return; const reader = response.body.getReader(); const decoder = new TextDecoder(); let buffer = ''; try { while (true) { const { done, value } = await reader.read(); if (done) break; buffer += decoder.decode(value, { stream: true }); const lines = buffer.split('\n'); buffer = lines.pop() ?? ''; for (const line of lines) { const trimmed = line.trim(); if (!trimmed) continue; try { onChunk(JSON.parse(trimmed) as T); } catch { // skip malformed lines } } } // Process remaining buffer if (buffer.trim()) { try { onChunk(JSON.parse(buffer.trim()) as T); } catch { // skip } } } finally { reader.releaseLock(); } }