feat: add resumable SSE live event stream for fleet jobs

Backend: GET /fleet/jobs/:id/events/stream emits a snapshot (seq > Last-Event-ID)
then long-polls the append-only event log, closing after a bounded window so
EventSource-style clients reconnect cleanly. Honors Last-Event-ID resume,
keepalive comments, and a terminal error frame.

Frontend: subscribeJobEvents uses fetch streaming (to send auth + product
headers) with parseSseFrames, Last-Event-ID resume, reconnect backoff, and a
fatal-on-error-frame fallback to polling. Job detail page subscribes live
(deduped by seq), falls back to 4s polling on failure, and shows a Live badge;
refresh() now merges events so a slow snapshot can't clobber streamed ones.

Tests: +3 route (snapshot, resume cursor, append-after-connect), +5 client
(parseSseFrames x2, subscribe deliver/error/resume/error-frame). fleet 150, web 222.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
Saravanakumar D 2026-05-30 18:38:50 -07:00
parent 1ae15a7755
commit ea42602407
6 changed files with 460 additions and 6 deletions

View File

@ -27,6 +27,8 @@ import {
upsertBudget,
pauseBudget,
resumeBudget,
parseSseFrames,
subscribeJobEvents,
} from '@/lib/fleet-client';
describe('fleet-client', () => {
@ -234,4 +236,135 @@ describe('fleet-client', () => {
expect(res).toBeNull();
});
});
describe('parseSseFrames', () => {
it('parses complete frames and skips keepalive comments', () => {
const buf =
'id: 0\nevent: fleet-event\ndata: {"seq":0,"type":"submitted"}\n\n' +
': keepalive\n\n' +
'id: 1\nevent: fleet-event\ndata: {"seq":1,"type":"progress"}\n\n';
const { events, rest } = parseSseFrames(buf);
expect(events).toHaveLength(2);
expect(events[0]).toMatchObject({ id: '0', event: 'fleet-event' });
expect(JSON.parse(events[0].data).type).toBe('submitted');
expect(events[1].id).toBe('1');
expect(rest).toBe('');
});
it('returns a trailing partial frame as rest', () => {
const buf = 'id: 0\nevent: fleet-event\ndata: {"seq":0}\n\nid: 1\ndata: {"seq"';
const { events, rest } = parseSseFrames(buf);
expect(events).toHaveLength(1);
expect(rest).toBe('id: 1\ndata: {"seq"');
});
});
describe('subscribeJobEvents', () => {
function streamResponse(chunks: string[]): Response {
const encoder = new TextEncoder();
const body = new ReadableStream<Uint8Array>({
start(controller) {
for (const c of chunks) controller.enqueue(encoder.encode(c));
controller.close();
},
});
return new Response(body, {
status: 200,
headers: { 'content-type': 'text/event-stream' },
});
}
afterEach(() => {
vi.unstubAllGlobals();
});
it('delivers parsed fleet-events to onEvent', async () => {
const fetchMock = vi
.fn()
.mockResolvedValueOnce(
streamResponse([
'retry: 3000\n\n',
'id: 0\nevent: fleet-event\ndata: {"seq":0,"type":"submitted"}\n\n',
': keepalive\n\n',
'id: 1\nevent: fleet-event\ndata: {"seq":1,"type":"progress"}\n\n',
])
)
// never resolves → prevents a tight reconnect loop after the first close
.mockReturnValue(new Promise(() => {}));
vi.stubGlobal('fetch', fetchMock);
const received: number[] = [];
await new Promise<void>(resolve => {
const sub = subscribeJobEvents(
'j1',
{
onEvent: e => {
received.push(e.seq);
if (received.length === 2) {
sub.close();
resolve();
}
},
},
{ reconnectMs: 60_000 }
);
});
expect(received).toEqual([0, 1]);
expect(fetchMock).toHaveBeenCalledWith(
'/api/fleet/jobs/j1/events/stream',
expect.objectContaining({
headers: expect.objectContaining({ accept: 'text/event-stream' }),
})
);
});
it('invokes onError and stops when the stream is unavailable', async () => {
const fetchMock = vi.fn().mockResolvedValue(new Response('nope', { status: 500 }));
vi.stubGlobal('fetch', fetchMock);
const err = await new Promise<unknown>(resolve => {
subscribeJobEvents('j1', { onEvent: () => {}, onError: resolve });
});
expect(err).toBeInstanceOf(Error);
expect(fetchMock).toHaveBeenCalledTimes(1);
});
it('treats a terminal error frame as fatal (onError, no reconnect)', async () => {
const fetchMock = vi
.fn()
.mockResolvedValueOnce(
streamResponse([
'id: 0\nevent: fleet-event\ndata: {"seq":0,"type":"submitted"}\n\n',
'event: error\ndata: {"message":"stream interrupted"}\n\n',
])
)
.mockReturnValue(new Promise(() => {}));
vi.stubGlobal('fetch', fetchMock);
const seen: number[] = [];
const err = await new Promise<unknown>(resolve => {
subscribeJobEvents('j1', { onEvent: e => seen.push(e.seq), onError: resolve });
});
expect(seen).toEqual([0]);
expect(err).toBeInstanceOf(Error);
expect(fetchMock).toHaveBeenCalledTimes(1);
});
it('sends Last-Event-ID header when resuming from a cursor', async () => {
const fetchMock = vi.fn().mockReturnValue(new Promise(() => {}));
vi.stubGlobal('fetch', fetchMock);
const sub = subscribeJobEvents('j1', { onEvent: () => {} }, { lastEventId: 5 });
await Promise.resolve();
sub.close();
expect(fetchMock).toHaveBeenCalledWith(
'/api/fleet/jobs/j1/events/stream',
expect.objectContaining({ headers: expect.objectContaining({ 'last-event-id': '5' }) })
);
});
});
});

View File

@ -15,6 +15,7 @@ import {
getJobExplain,
patchJob,
operatorAction,
subscribeJobEvents,
type OperatorAction,
type FleetJob,
type FleetRun,
@ -38,6 +39,7 @@ export default function FleetJobDetailPage() {
const [loading, setLoading] = useState(true);
const [shipping, setShipping] = useState(false);
const [acting, setActing] = useState<OperatorAction | null>(null);
const [live, setLive] = useState(false);
const refresh = useCallback(async () => {
try {
@ -51,7 +53,13 @@ export default function FleetJobDetailPage() {
]);
setJob(j);
setRuns(r.runs);
setEvents(e.events);
// Merge (not replace) so a slow snapshot can't clobber newer streamed
// events — the event log is append-only, keyed by monotonic `seq`.
setEvents(prev => {
const bySeq = new Map(prev.map(x => [x.seq, x]));
for (const ev of e.events) bySeq.set(ev.seq, ev);
return [...bySeq.values()].sort((a, b) => a.seq - b.seq);
});
setArtifacts(a.artifacts);
setDag(d?.dag ?? null);
setExplain(x);
@ -67,6 +75,39 @@ export default function FleetJobDetailPage() {
refresh();
}, [token, jobId, refresh]);
// Live event stream: subscribe via SSE once authenticated; append new events
// (deduped by seq). Fall back to polling if streaming is unavailable.
useEffect(() => {
if (!token || !jobId) return;
let pollTimer: ReturnType<typeof setInterval> | undefined;
const appendEvent = (e: FleetEvent) => {
setEvents(prev => (prev.some(x => x.seq === e.seq) ? prev : [...prev, e]));
setLive(true);
};
const startPolling = () => {
if (pollTimer) return;
setLive(false);
pollTimer = setInterval(() => {
getJobEvents(jobId)
.then(r => setEvents(r.events))
.catch(() => {});
}, 4000);
};
const sub = subscribeJobEvents(
jobId,
{ onEvent: appendEvent, onError: startPolling },
{ lastEventId: -1 }
);
return () => {
sub.close();
if (pollTimer) clearInterval(pollTimer);
};
}, [token, jobId]);
const handleShip = async () => {
if (!job) return;
setShipping(true);
@ -179,7 +220,18 @@ export default function FleetJobDetailPage() {
{/* Event timeline */}
<section>
<h2 className="text-lg font-semibold mb-2">Event Timeline</h2>
<h2 className="text-lg font-semibold mb-2 flex items-center gap-2">
Event Timeline
{live && (
<span
className="inline-flex items-center gap-1 text-xs font-medium text-green-600"
data-testid="live-indicator"
>
<span className="h-2 w-2 rounded-full bg-green-500 animate-pulse" />
Live
</span>
)}
</h2>
{events.length === 0 ? (
<p className="text-muted-foreground text-sm">No events recorded.</p>
) : (

View File

@ -200,6 +200,133 @@ export async function getJobEvents(jobId: string): Promise<{ events: FleetEvent[
return apiFetch(`/jobs/${jobId}/events`);
}
// ── Live event stream (SSE) ───────────────────────────────────────────────────
export interface ParsedSseEvent {
id?: string;
event?: string;
data: string;
}
/**
* Parse a raw SSE text buffer into complete frames. Returns the parsed events
* and any trailing partial frame (`rest`) that should be prepended to the next
* chunk. Comment lines (`:` keepalives) are skipped. Pure + side-effect free.
*/
export function parseSseFrames(buffer: string): { events: ParsedSseEvent[]; rest: string } {
const events: ParsedSseEvent[] = [];
let rest = buffer;
let idx = rest.indexOf('\n\n');
while (idx !== -1) {
const frame = rest.slice(0, idx);
rest = rest.slice(idx + 2);
idx = rest.indexOf('\n\n');
if (!frame.trim() || frame.startsWith(':')) continue;
const ev: ParsedSseEvent = { data: '' };
const dataLines: string[] = [];
for (const line of frame.split('\n')) {
if (line.startsWith('id:')) ev.id = line.slice(3).trim();
else if (line.startsWith('event:')) ev.event = line.slice(6).trim();
else if (line.startsWith('data:')) dataLines.push(line.slice(5).trimStart());
}
ev.data = dataLines.join('\n');
events.push(ev);
}
return { events, rest };
}
export interface JobEventSubscription {
close: () => void;
}
export interface SubscribeJobEventsOptions {
/** Resume cursor — only events with seq greater than this are delivered. */
lastEventId?: number;
/** Backoff before reconnecting after a clean server close (ms). */
reconnectMs?: number;
}
const sseDelay = (ms: number): Promise<void> => new Promise(resolve => setTimeout(resolve, ms));
/**
* Subscribe to a job's live event stream over SSE using `fetch` streaming (so
* auth + product headers can be sent native EventSource cannot). Calls
* `onEvent` for every new fleet-event and auto-reconnects with Last-Event-ID
* after a clean server close. On a hard failure it invokes `onError` and stops,
* letting callers fall back to polling `getJobEvents`. Returns a handle whose
* `close()` aborts the stream.
*/
export function subscribeJobEvents(
jobId: string,
handlers: { onEvent: (e: FleetEvent) => void; onError?: (err: unknown) => void },
opts?: SubscribeJobEventsOptions
): JobEventSubscription {
let closed = false;
const controller = new AbortController();
let lastId = opts?.lastEventId ?? -1;
const reconnectMs = Math.max(250, opts?.reconnectMs ?? 1500);
const token = typeof window !== 'undefined' ? localStorage.getItem('tracker_token') : null;
const pid =
typeof window !== 'undefined' ? localStorage.getItem('tracker_selected_product') : null;
const connect = async (): Promise<void> => {
while (!closed) {
try {
const headers: Record<string, string> = { accept: 'text/event-stream' };
if (token) headers['authorization'] = `Bearer ${token}`;
if (pid) headers['x-product-id'] = pid;
if (lastId >= 0) headers['last-event-id'] = String(lastId);
const res = await fetch(`/api/fleet/jobs/${jobId}/events/stream`, {
headers,
signal: controller.signal,
});
if (!res.ok || !res.body) throw new Error(`stream HTTP ${res.status}`);
const reader = res.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
for (;;) {
const { value, done } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const { events, rest } = parseSseFrames(buffer);
buffer = rest;
for (const ev of events) {
// A terminal error frame means the server gave up mid-stream;
// surface it as fatal so the caller falls back to polling rather
// than reconnecting into the same failure forever.
if (ev.event === 'error') throw new Error('stream error frame');
try {
const parsed = JSON.parse(ev.data) as FleetEvent;
lastId = parsed.seq;
handlers.onEvent(parsed);
} catch {
/* skip malformed frame */
}
}
}
// Clean close (server hit its max duration) → reconnect after a backoff.
if (!closed) await sseDelay(reconnectMs);
} catch (err) {
if (closed) return;
controller.abort();
handlers.onError?.(err);
return;
}
}
};
void connect();
return {
close: () => {
closed = true;
controller.abort();
},
};
}
export async function getJobArtifacts(jobId: string): Promise<{ artifacts: FleetArtifact[] }> {
return apiFetch(`/jobs/${jobId}/artifacts`);
}

View File

@ -37,12 +37,16 @@
- Acceptance criteria: per-day spend visible with ceiling line; empty state when no data.
- Verification command: `pnpm --filter @bytelyst/tracker-web test`
- [ ] **SSE live log streaming**
- [x] **SSE live log streaming**
- Priority: P2 (larger; §17 single-stream contract)
- Current status: polling only
- Files involved: new streaming route in platform-service; EventSource consumer in job detail page
- Current status: ✅ DONE — `GET /fleet/jobs/:id/events/stream` (resumable SSE) + `subscribeJobEvents`
fetch-streaming consumer with Last-Event-ID resume, polling fallback, and a Live badge; fleet 150,
web 222 green
- Files involved: `services/platform-service/src/modules/fleet/routes.ts` (stream route + clampInt/delay),
`dashboards/tracker-web/src/lib/fleet-client.ts` (`parseSseFrames`, `subscribeJobEvents`),
job detail page (live subscribe + fallback + Live indicator), route + client tests
- Implementation plan: `GET /fleet/jobs/:id/events/stream` (SSE) emitting appended events;
UI subscribes via EventSource with polling fallback.
UI subscribes via fetch streaming (auth headers) with polling fallback.
- Acceptance criteria: new events appear without refresh; reconnect + fallback work.
- Verification command: `pnpm --filter @lysnrai/platform-service test`

View File

@ -112,6 +112,67 @@ describe('fleetRoutes', () => {
expect(types).toContain('assigned');
});
it('GET /fleet/jobs/:id/events streams a snapshot over SSE then closes', async () => {
const app = await buildApp();
const sub = await submit(app, { idempotencyKey: 'sse1', bodyMd: '# task' });
const jobId = JSON.parse(sub.body).job.id as string;
// maxMs=0 => snapshot only, no follow loop, so inject resolves immediately.
const res = await app.inject({
method: 'GET',
url: `/api/fleet/jobs/${jobId}/events/stream?maxMs=0`,
});
expect(res.statusCode).toBe(200);
expect(res.headers['content-type']).toBe('text/event-stream');
expect(res.payload).toContain('retry: 3000');
expect(res.payload).toContain('event: fleet-event');
expect(res.payload).toContain('id: 0');
expect(res.payload).toContain('"type":"submitted"');
});
it('GET /fleet/jobs/:id/events/stream honours Last-Event-ID (no snapshot replay)', async () => {
const app = await buildApp();
const sub = await submit(app, { idempotencyKey: 'sse2', bodyMd: '# task' });
const jobId = JSON.parse(sub.body).job.id as string;
const res = await app.inject({
method: 'GET',
url: `/api/fleet/jobs/${jobId}/events/stream?maxMs=0`,
headers: { 'last-event-id': '99' },
});
expect(res.statusCode).toBe(200);
expect(res.payload).not.toContain('event: fleet-event');
expect(res.payload).toContain('retry: 3000');
});
it('GET /fleet/jobs/:id/events/stream delivers events appended after connect', async () => {
const repo = await import('./repository.js');
const app = await buildApp();
const sub = await submit(app, { idempotencyKey: 'sse3', bodyMd: '# task' });
const jobId = JSON.parse(sub.body).job.id as string;
// Skip the snapshot (submitted=seq 0) and append a new event mid-stream; the
// poll loop should pick it up and deliver it before the stream closes.
const t = setTimeout(() => {
void repo.appendEvent({
jobId,
productId: 'lysnrai',
type: 'progress',
data: { pct: 42 },
});
}, 40);
const res = await app.inject({
method: 'GET',
url: `/api/fleet/jobs/${jobId}/events/stream?lastEventId=0&pollMs=25&maxMs=400`,
});
clearTimeout(t);
expect(res.statusCode).toBe(200);
expect(res.payload).toContain('"type":"progress"');
expect(res.payload).toContain('id: 1');
expect(res.payload).toContain(': keepalive');
});
it('POST /fleet/claim returns claimed:false when nothing is eligible', async () => {
const app = await buildApp();
const res = await app.inject({

View File

@ -11,6 +11,7 @@
* POST /fleet/factories/heartbeat factory liveness
* GET /fleet/jobs/:id/runs job run history
* GET /fleet/jobs/:id/events append-only event stream
* GET /fleet/jobs/:id/events/stream live event stream (SSE, resumable)
* POST /fleet/jobs/:id/artifacts upload a run output (base64 body blob + pointer)
* GET /fleet/jobs/:id/artifacts list a job's artifact pointers
* GET /fleet/artifacts/:artifactId pointer + fresh short-lived SAS download URL
@ -50,6 +51,15 @@ function badRequest(issues: { message: string }[]): never {
throw new BadRequestError(issues.map(i => i.message).join('; '));
}
const delay = (ms: number): Promise<void> => new Promise(resolve => setTimeout(resolve, ms));
/** Parse an integer query param, clamping to [min, max]; fall back to `fallback`. */
function clampInt(raw: string | undefined, fallback: number, min: number, max: number): number {
const n = Number.parseInt(raw ?? '', 10);
if (!Number.isFinite(n)) return fallback;
return Math.min(max, Math.max(min, n));
}
export async function fleetRoutes(app: FastifyInstance) {
// ── Submit (idempotent) ──
app.post('/fleet/jobs', async (req, reply) => {
@ -237,6 +247,73 @@ export async function fleetRoutes(app: FastifyInstance) {
return { events };
});
// ── Live event stream (SSE, §17 single-stream contract) ──
// Emits a snapshot of existing events (seq > Last-Event-ID), then long-polls
// the append-only stream for new events. Closes after `maxMs` so EventSource
// clients reconnect cleanly (carrying Last-Event-ID to resume). Designed to
// degrade gracefully: consumers fall back to `GET …/events` polling on error.
app.get('/fleet/jobs/:id/events/stream', async (req, reply) => {
await extractAuth(req);
const { id } = req.params as { id: string };
const query = (req.query ?? {}) as Record<string, string | undefined>;
// Resume cursor: `Last-Event-ID` header (set by EventSource on reconnect)
// takes precedence over the `lastEventId` query param. `-1` => send all.
const headerId = req.headers['last-event-id'];
const rawCursor = (Array.isArray(headerId) ? headerId[0] : headerId) ?? query.lastEventId;
let cursor = Number.parseInt(rawCursor ?? '', 10);
if (!Number.isFinite(cursor)) cursor = -1;
const pollMs = clampInt(query.pollMs, 1000, 50, 10_000);
const maxMs = clampInt(query.maxMs, 25_000, 0, 60_000);
reply.hijack();
const raw = reply.raw;
raw.writeHead(200, {
'content-type': 'text/event-stream',
'cache-control': 'no-cache, no-transform',
connection: 'keep-alive',
'x-accel-buffering': 'no',
});
// Tell the client how long to wait before reconnecting.
raw.write('retry: 3000\n\n');
let open = true;
const stop = () => {
open = false;
};
req.raw.on('close', stop);
req.raw.on('error', stop);
const flushNew = async (): Promise<void> => {
const all = await repo.listEvents(id);
for (const evt of all) {
if (evt.seq <= cursor) continue;
if (!open) return;
raw.write(`id: ${evt.seq}\nevent: fleet-event\ndata: ${JSON.stringify(evt)}\n\n`);
cursor = evt.seq;
}
};
try {
await flushNew();
const deadline = Date.now() + maxMs;
while (open && Date.now() < deadline) {
await delay(pollMs);
if (!open) break;
await flushNew();
if (open) raw.write(': keepalive\n\n');
}
} catch {
// Surface a terminal error event; the client will reconnect/fall back.
if (open) raw.write('event: error\ndata: {"message":"stream interrupted"}\n\n');
} finally {
req.raw.off('close', stop);
req.raw.off('error', stop);
raw.end();
}
});
// ── Scoring explainability — why does this job route where it does? (§7) ──
app.get('/fleet/jobs/:id/explain', async req => {
await extractAuth(req);