From ea426024070c5a80fb8869722bb5dab41d0dc9c9 Mon Sep 17 00:00:00 2001 From: Saravanakumar D Date: Sat, 30 May 2026 18:38:50 -0700 Subject: [PATCH] feat: add resumable SSE live event stream for fleet jobs Backend: GET /fleet/jobs/:id/events/stream emits a snapshot (seq > Last-Event-ID) then long-polls the append-only event log, closing after a bounded window so EventSource-style clients reconnect cleanly. Honors Last-Event-ID resume, keepalive comments, and a terminal error frame. Frontend: subscribeJobEvents uses fetch streaming (to send auth + product headers) with parseSseFrames, Last-Event-ID resume, reconnect backoff, and a fatal-on-error-frame fallback to polling. Job detail page subscribes live (deduped by seq), falls back to 4s polling on failure, and shows a Live badge; refresh() now merges events so a slow snapshot can't clobber streamed ones. Tests: +3 route (snapshot, resume cursor, append-after-connect), +6 client (parseSseFrames x2, subscribe deliver/error/resume/error-frame). fleet 150, web 222. 
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../src/__tests__/fleet-client.test.ts | 133 ++++++++++++++++++ .../app/dashboard/fleet/jobs/[id]/page.tsx | 56 +++++++- .../tracker-web/src/lib/fleet-client.ts | 127 +++++++++++++++++ docs/TASKS_TO_COMPLETE.md | 12 +- .../src/modules/fleet/routes.test.ts | 61 ++++++++ .../src/modules/fleet/routes.ts | 77 ++++++++++ 6 files changed, 460 insertions(+), 6 deletions(-) diff --git a/dashboards/tracker-web/src/__tests__/fleet-client.test.ts b/dashboards/tracker-web/src/__tests__/fleet-client.test.ts index 5f02539b..e1b3f815 100644 --- a/dashboards/tracker-web/src/__tests__/fleet-client.test.ts +++ b/dashboards/tracker-web/src/__tests__/fleet-client.test.ts @@ -27,6 +27,8 @@ import { upsertBudget, pauseBudget, resumeBudget, + parseSseFrames, + subscribeJobEvents, } from '@/lib/fleet-client'; describe('fleet-client', () => { @@ -234,4 +236,135 @@ describe('fleet-client', () => { expect(res).toBeNull(); }); }); + + describe('parseSseFrames', () => { + it('parses complete frames and skips keepalive comments', () => { + const buf = + 'id: 0\nevent: fleet-event\ndata: {"seq":0,"type":"submitted"}\n\n' + + ': keepalive\n\n' + + 'id: 1\nevent: fleet-event\ndata: {"seq":1,"type":"progress"}\n\n'; + const { events, rest } = parseSseFrames(buf); + expect(events).toHaveLength(2); + expect(events[0]).toMatchObject({ id: '0', event: 'fleet-event' }); + expect(JSON.parse(events[0].data).type).toBe('submitted'); + expect(events[1].id).toBe('1'); + expect(rest).toBe(''); + }); + + it('returns a trailing partial frame as rest', () => { + const buf = 'id: 0\nevent: fleet-event\ndata: {"seq":0}\n\nid: 1\ndata: {"seq"'; + const { events, rest } = parseSseFrames(buf); + expect(events).toHaveLength(1); + expect(rest).toBe('id: 1\ndata: {"seq"'); + }); + }); + + describe('subscribeJobEvents', () => { + function streamResponse(chunks: string[]): Response { + const encoder = new TextEncoder(); + const body = new 
ReadableStream({ + start(controller) { + for (const c of chunks) controller.enqueue(encoder.encode(c)); + controller.close(); + }, + }); + return new Response(body, { + status: 200, + headers: { 'content-type': 'text/event-stream' }, + }); + } + + afterEach(() => { + vi.unstubAllGlobals(); + }); + + it('delivers parsed fleet-events to onEvent', async () => { + const fetchMock = vi + .fn() + .mockResolvedValueOnce( + streamResponse([ + 'retry: 3000\n\n', + 'id: 0\nevent: fleet-event\ndata: {"seq":0,"type":"submitted"}\n\n', + ': keepalive\n\n', + 'id: 1\nevent: fleet-event\ndata: {"seq":1,"type":"progress"}\n\n', + ]) + ) + // never resolves → prevents a tight reconnect loop after the first close + .mockReturnValue(new Promise(() => {})); + vi.stubGlobal('fetch', fetchMock); + + const received: number[] = []; + await new Promise(resolve => { + const sub = subscribeJobEvents( + 'j1', + { + onEvent: e => { + received.push(e.seq); + if (received.length === 2) { + sub.close(); + resolve(); + } + }, + }, + { reconnectMs: 60_000 } + ); + }); + + expect(received).toEqual([0, 1]); + expect(fetchMock).toHaveBeenCalledWith( + '/api/fleet/jobs/j1/events/stream', + expect.objectContaining({ + headers: expect.objectContaining({ accept: 'text/event-stream' }), + }) + ); + }); + + it('invokes onError and stops when the stream is unavailable', async () => { + const fetchMock = vi.fn().mockResolvedValue(new Response('nope', { status: 500 })); + vi.stubGlobal('fetch', fetchMock); + + const err = await new Promise(resolve => { + subscribeJobEvents('j1', { onEvent: () => {}, onError: resolve }); + }); + + expect(err).toBeInstanceOf(Error); + expect(fetchMock).toHaveBeenCalledTimes(1); + }); + + it('treats a terminal error frame as fatal (onError, no reconnect)', async () => { + const fetchMock = vi + .fn() + .mockResolvedValueOnce( + streamResponse([ + 'id: 0\nevent: fleet-event\ndata: {"seq":0,"type":"submitted"}\n\n', + 'event: error\ndata: {"message":"stream interrupted"}\n\n', + ]) 
+ ) + .mockReturnValue(new Promise(() => {})); + vi.stubGlobal('fetch', fetchMock); + + const seen: number[] = []; + const err = await new Promise(resolve => { + subscribeJobEvents('j1', { onEvent: e => seen.push(e.seq), onError: resolve }); + }); + + expect(seen).toEqual([0]); + expect(err).toBeInstanceOf(Error); + expect(fetchMock).toHaveBeenCalledTimes(1); + }); + + it('sends Last-Event-ID header when resuming from a cursor', async () => { + const fetchMock = vi.fn().mockReturnValue(new Promise(() => {})); + vi.stubGlobal('fetch', fetchMock); + + const sub = subscribeJobEvents('j1', { onEvent: () => {} }, { lastEventId: 5 }); + await Promise.resolve(); + sub.close(); + + expect(fetchMock).toHaveBeenCalledWith( + '/api/fleet/jobs/j1/events/stream', + expect.objectContaining({ headers: expect.objectContaining({ 'last-event-id': '5' }) }) + ); + }); + }); }); diff --git a/dashboards/tracker-web/src/app/dashboard/fleet/jobs/[id]/page.tsx b/dashboards/tracker-web/src/app/dashboard/fleet/jobs/[id]/page.tsx index e387df87..48109191 100644 --- a/dashboards/tracker-web/src/app/dashboard/fleet/jobs/[id]/page.tsx +++ b/dashboards/tracker-web/src/app/dashboard/fleet/jobs/[id]/page.tsx @@ -15,6 +15,7 @@ import { getJobExplain, patchJob, operatorAction, + subscribeJobEvents, type OperatorAction, type FleetJob, type FleetRun, @@ -38,6 +39,7 @@ export default function FleetJobDetailPage() { const [loading, setLoading] = useState(true); const [shipping, setShipping] = useState(false); const [acting, setActing] = useState(null); + const [live, setLive] = useState(false); const refresh = useCallback(async () => { try { @@ -51,7 +53,13 @@ export default function FleetJobDetailPage() { ]); setJob(j); setRuns(r.runs); - setEvents(e.events); + // Merge (not replace) so a slow snapshot can't clobber newer streamed + // events — the event log is append-only, keyed by monotonic `seq`. 
+ setEvents(prev => { + const bySeq = new Map(prev.map(x => [x.seq, x])); + for (const ev of e.events) bySeq.set(ev.seq, ev); + return [...bySeq.values()].sort((a, b) => a.seq - b.seq); + }); setArtifacts(a.artifacts); setDag(d?.dag ?? null); setExplain(x); @@ -67,6 +75,39 @@ export default function FleetJobDetailPage() { refresh(); }, [token, jobId, refresh]); + // Live event stream: subscribe via SSE once authenticated; append new events + // (deduped by seq). Fall back to polling if streaming is unavailable. + useEffect(() => { + if (!token || !jobId) return; + let pollTimer: ReturnType | undefined; + + const appendEvent = (e: FleetEvent) => { + setEvents(prev => (prev.some(x => x.seq === e.seq) ? prev : [...prev, e])); + setLive(true); + }; + + const startPolling = () => { + if (pollTimer) return; + setLive(false); + pollTimer = setInterval(() => { + getJobEvents(jobId) + .then(r => setEvents(r.events)) + .catch(() => {}); + }, 4000); + }; + + const sub = subscribeJobEvents( + jobId, + { onEvent: appendEvent, onError: startPolling }, + { lastEventId: -1 } + ); + + return () => { + sub.close(); + if (pollTimer) clearInterval(pollTimer); + }; + }, [token, jobId]); + const handleShip = async () => { if (!job) return; setShipping(true); @@ -179,7 +220,18 @@ export default function FleetJobDetailPage() { {/* Event timeline */}
-

Event Timeline

+

+ Event Timeline + {live && ( + + + Live + + )} +

{events.length === 0 ? (

No events recorded.

) : ( diff --git a/dashboards/tracker-web/src/lib/fleet-client.ts b/dashboards/tracker-web/src/lib/fleet-client.ts index ab98b6e8..caca2024 100644 --- a/dashboards/tracker-web/src/lib/fleet-client.ts +++ b/dashboards/tracker-web/src/lib/fleet-client.ts @@ -200,6 +200,133 @@ export async function getJobEvents(jobId: string): Promise<{ events: FleetEvent[ return apiFetch(`/jobs/${jobId}/events`); } +// ── Live event stream (SSE) ─────────────────────────────────────────────────── + +export interface ParsedSseEvent { + id?: string; + event?: string; + data: string; +} + +/** + * Parse a raw SSE text buffer into complete frames. Returns the parsed events + * and any trailing partial frame (`rest`) that should be prepended to the next + * chunk. Comment lines (`:` keepalives) are skipped. Pure + side-effect free. + */ +export function parseSseFrames(buffer: string): { events: ParsedSseEvent[]; rest: string } { + const events: ParsedSseEvent[] = []; + let rest = buffer; + let idx = rest.indexOf('\n\n'); + while (idx !== -1) { + const frame = rest.slice(0, idx); + rest = rest.slice(idx + 2); + idx = rest.indexOf('\n\n'); + if (!frame.trim() || frame.startsWith(':')) continue; + const ev: ParsedSseEvent = { data: '' }; + const dataLines: string[] = []; + for (const line of frame.split('\n')) { + if (line.startsWith('id:')) ev.id = line.slice(3).trim(); + else if (line.startsWith('event:')) ev.event = line.slice(6).trim(); + else if (line.startsWith('data:')) dataLines.push(line.slice(5).trimStart()); + } + ev.data = dataLines.join('\n'); + events.push(ev); + } + return { events, rest }; +} + +export interface JobEventSubscription { + close: () => void; +} + +export interface SubscribeJobEventsOptions { + /** Resume cursor — only events with seq greater than this are delivered. */ + lastEventId?: number; + /** Backoff before reconnecting after a clean server close (ms). 
*/ + reconnectMs?: number; +} + +const sseDelay = (ms: number): Promise => new Promise(resolve => setTimeout(resolve, ms)); + +/** + * Subscribe to a job's live event stream over SSE using `fetch` streaming (so + * auth + product headers can be sent — native EventSource cannot). Calls + * `onEvent` for every new fleet-event and auto-reconnects with Last-Event-ID + * after a clean server close. On a hard failure it invokes `onError` and stops, + * letting callers fall back to polling `getJobEvents`. Returns a handle whose + * `close()` aborts the stream. + */ +export function subscribeJobEvents( + jobId: string, + handlers: { onEvent: (e: FleetEvent) => void; onError?: (err: unknown) => void }, + opts?: SubscribeJobEventsOptions +): JobEventSubscription { + let closed = false; + const controller = new AbortController(); + let lastId = opts?.lastEventId ?? -1; + const reconnectMs = Math.max(250, opts?.reconnectMs ?? 1500); + + const token = typeof window !== 'undefined' ? localStorage.getItem('tracker_token') : null; + const pid = + typeof window !== 'undefined' ? 
localStorage.getItem('tracker_selected_product') : null; + + const connect = async (): Promise => { + while (!closed) { + try { + const headers: Record = { accept: 'text/event-stream' }; + if (token) headers['authorization'] = `Bearer ${token}`; + if (pid) headers['x-product-id'] = pid; + if (lastId >= 0) headers['last-event-id'] = String(lastId); + + const res = await fetch(`/api/fleet/jobs/${jobId}/events/stream`, { + headers, + signal: controller.signal, + }); + if (!res.ok || !res.body) throw new Error(`stream HTTP ${res.status}`); + + const reader = res.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ''; + for (;;) { + const { value, done } = await reader.read(); + if (done) break; + buffer += decoder.decode(value, { stream: true }); + const { events, rest } = parseSseFrames(buffer); + buffer = rest; + for (const ev of events) { + // A terminal error frame means the server gave up mid-stream; + // surface it as fatal so the caller falls back to polling rather + // than reconnecting into the same failure forever. + if (ev.event === 'error') throw new Error('stream error frame'); + try { + const parsed = JSON.parse(ev.data) as FleetEvent; + lastId = parsed.seq; + handlers.onEvent(parsed); + } catch { + /* skip malformed frame */ + } + } + } + // Clean close (server hit its max duration) → reconnect after a backoff. 
+ if (!closed) await sseDelay(reconnectMs); + } catch (err) { + if (closed) return; + controller.abort(); + handlers.onError?.(err); + return; + } + } + }; + + void connect(); + return { + close: () => { + closed = true; + controller.abort(); + }, + }; +} + export async function getJobArtifacts(jobId: string): Promise<{ artifacts: FleetArtifact[] }> { return apiFetch(`/jobs/${jobId}/artifacts`); } diff --git a/docs/TASKS_TO_COMPLETE.md b/docs/TASKS_TO_COMPLETE.md index 1259c951..7617a3f4 100644 --- a/docs/TASKS_TO_COMPLETE.md +++ b/docs/TASKS_TO_COMPLETE.md @@ -37,12 +37,16 @@ - Acceptance criteria: per-day spend visible with ceiling line; empty state when no data. - Verification command: `pnpm --filter @bytelyst/tracker-web test` -- [ ] **SSE live log streaming** +- [x] **SSE live log streaming** - Priority: P2 (larger; §17 single-stream contract) - - Current status: polling only - - Files involved: new streaming route in platform-service; EventSource consumer in job detail page + - Current status: ✅ DONE — `GET /fleet/jobs/:id/events/stream` (resumable SSE) + `subscribeJobEvents` + fetch-streaming consumer with Last-Event-ID resume, polling fallback, and a Live badge; fleet 150, + web 222 green + - Files involved: `services/platform-service/src/modules/fleet/routes.ts` (stream route + clampInt/delay), + `dashboards/tracker-web/src/lib/fleet-client.ts` (`parseSseFrames`, `subscribeJobEvents`), + job detail page (live subscribe + fallback + Live indicator), route + client tests - Implementation plan: `GET /fleet/jobs/:id/events/stream` (SSE) emitting appended events; - UI subscribes via EventSource with polling fallback. + UI subscribes via fetch streaming (auth headers) with polling fallback. - Acceptance criteria: new events appear without refresh; reconnect + fallback work. 
- Verification command: `pnpm --filter @lysnrai/platform-service test` diff --git a/services/platform-service/src/modules/fleet/routes.test.ts b/services/platform-service/src/modules/fleet/routes.test.ts index 3d7557fd..53d1ed65 100644 --- a/services/platform-service/src/modules/fleet/routes.test.ts +++ b/services/platform-service/src/modules/fleet/routes.test.ts @@ -112,6 +112,67 @@ describe('fleetRoutes', () => { expect(types).toContain('assigned'); }); + it('GET /fleet/jobs/:id/events streams a snapshot over SSE then closes', async () => { + const app = await buildApp(); + const sub = await submit(app, { idempotencyKey: 'sse1', bodyMd: '# task' }); + const jobId = JSON.parse(sub.body).job.id as string; + + // maxMs=0 => snapshot only, no follow loop, so inject resolves immediately. + const res = await app.inject({ + method: 'GET', + url: `/api/fleet/jobs/${jobId}/events/stream?maxMs=0`, + }); + expect(res.statusCode).toBe(200); + expect(res.headers['content-type']).toBe('text/event-stream'); + expect(res.payload).toContain('retry: 3000'); + expect(res.payload).toContain('event: fleet-event'); + expect(res.payload).toContain('id: 0'); + expect(res.payload).toContain('"type":"submitted"'); + }); + + it('GET /fleet/jobs/:id/events/stream honours Last-Event-ID (no snapshot replay)', async () => { + const app = await buildApp(); + const sub = await submit(app, { idempotencyKey: 'sse2', bodyMd: '# task' }); + const jobId = JSON.parse(sub.body).job.id as string; + + const res = await app.inject({ + method: 'GET', + url: `/api/fleet/jobs/${jobId}/events/stream?maxMs=0`, + headers: { 'last-event-id': '99' }, + }); + expect(res.statusCode).toBe(200); + expect(res.payload).not.toContain('event: fleet-event'); + expect(res.payload).toContain('retry: 3000'); + }); + + it('GET /fleet/jobs/:id/events/stream delivers events appended after connect', async () => { + const repo = await import('./repository.js'); + const app = await buildApp(); + const sub = await submit(app, { 
idempotencyKey: 'sse3', bodyMd: '# task' }); + const jobId = JSON.parse(sub.body).job.id as string; + + // Skip the snapshot (submitted=seq 0) and append a new event mid-stream; the + // poll loop should pick it up and deliver it before the stream closes. + const t = setTimeout(() => { + void repo.appendEvent({ + jobId, + productId: 'lysnrai', + type: 'progress', + data: { pct: 42 }, + }); + }, 40); + + const res = await app.inject({ + method: 'GET', + url: `/api/fleet/jobs/${jobId}/events/stream?lastEventId=0&pollMs=25&maxMs=400`, + }); + clearTimeout(t); + expect(res.statusCode).toBe(200); + expect(res.payload).toContain('"type":"progress"'); + expect(res.payload).toContain('id: 1'); + expect(res.payload).toContain(': keepalive'); + }); + it('POST /fleet/claim returns claimed:false when nothing is eligible', async () => { const app = await buildApp(); const res = await app.inject({ diff --git a/services/platform-service/src/modules/fleet/routes.ts b/services/platform-service/src/modules/fleet/routes.ts index 85c4f305..1515743f 100644 --- a/services/platform-service/src/modules/fleet/routes.ts +++ b/services/platform-service/src/modules/fleet/routes.ts @@ -11,6 +11,7 @@ * POST /fleet/factories/heartbeat factory liveness * GET /fleet/jobs/:id/runs job run history * GET /fleet/jobs/:id/events append-only event stream + * GET /fleet/jobs/:id/events/stream live event stream (SSE, resumable) * POST /fleet/jobs/:id/artifacts upload a run output (base64 body → blob + pointer) * GET /fleet/jobs/:id/artifacts list a job's artifact pointers * GET /fleet/artifacts/:artifactId pointer + fresh short-lived SAS download URL @@ -50,6 +51,15 @@ function badRequest(issues: { message: string }[]): never { throw new BadRequestError(issues.map(i => i.message).join('; ')); } +const delay = (ms: number): Promise => new Promise(resolve => setTimeout(resolve, ms)); + +/** Parse an integer query param, clamping to [min, max]; fall back to `fallback`. 
*/ +function clampInt(raw: string | undefined, fallback: number, min: number, max: number): number { + const n = Number.parseInt(raw ?? '', 10); + if (!Number.isFinite(n)) return fallback; + return Math.min(max, Math.max(min, n)); +} + export async function fleetRoutes(app: FastifyInstance) { // ── Submit (idempotent) ── app.post('/fleet/jobs', async (req, reply) => { @@ -237,6 +247,73 @@ export async function fleetRoutes(app: FastifyInstance) { return { events }; }); + // ── Live event stream (SSE, §17 single-stream contract) ── + // Emits a snapshot of existing events (seq > Last-Event-ID), then long-polls + // the append-only stream for new events. Closes after `maxMs` so EventSource + // clients reconnect cleanly (carrying Last-Event-ID to resume). Designed to + // degrade gracefully: consumers fall back to `GET …/events` polling on error. + app.get('/fleet/jobs/:id/events/stream', async (req, reply) => { + await extractAuth(req); + const { id } = req.params as { id: string }; + const query = (req.query ?? {}) as Record; + + // Resume cursor: `Last-Event-ID` header (set by EventSource on reconnect) + // takes precedence over the `lastEventId` query param. `-1` => send all. + const headerId = req.headers['last-event-id']; + const rawCursor = (Array.isArray(headerId) ? headerId[0] : headerId) ?? query.lastEventId; + let cursor = Number.parseInt(rawCursor ?? '', 10); + if (!Number.isFinite(cursor)) cursor = -1; + + const pollMs = clampInt(query.pollMs, 1000, 50, 10_000); + const maxMs = clampInt(query.maxMs, 25_000, 0, 60_000); + + reply.hijack(); + const raw = reply.raw; + raw.writeHead(200, { + 'content-type': 'text/event-stream', + 'cache-control': 'no-cache, no-transform', + connection: 'keep-alive', + 'x-accel-buffering': 'no', + }); + // Tell the client how long to wait before reconnecting. 
+ raw.write('retry: 3000\n\n'); + + let open = true; + const stop = () => { + open = false; + }; + req.raw.on('close', stop); + req.raw.on('error', stop); + + const flushNew = async (): Promise => { + const all = await repo.listEvents(id); + for (const evt of all) { + if (evt.seq <= cursor) continue; + if (!open) return; + raw.write(`id: ${evt.seq}\nevent: fleet-event\ndata: ${JSON.stringify(evt)}\n\n`); + cursor = evt.seq; + } + }; + + try { + await flushNew(); + const deadline = Date.now() + maxMs; + while (open && Date.now() < deadline) { + await delay(pollMs); + if (!open) break; + await flushNew(); + if (open) raw.write(': keepalive\n\n'); + } + } catch { + // Surface a terminal error event; the client will reconnect/fall back. + if (open) raw.write('event: error\ndata: {"message":"stream interrupted"}\n\n'); + } finally { + req.raw.off('close', stop); + req.raw.off('error', stop); + raw.end(); + } + }); + // ── Scoring explainability — why does this job route where it does? (§7) ── app.get('/fleet/jobs/:id/explain', async req => { await extractAuth(req);