/**
 * Fleet API client — typed wrapper for the fleet coordinator endpoints.
 * Follows the same pattern as tracker-client.ts.
 * Degrades gracefully: 404s return null, network errors return defaults.
 */
import { createApiClient } from '@bytelyst/api-client';

// ── Types ───────────────────────────────────────────────────────────────────

/** Job record as exchanged with the coordinator's `/jobs` endpoints. */
export interface FleetJob {
  // Identity
  id: string;
  productId: string;
  stage: string;
  idempotencyKey: string;

  // Payload and scheduling inputs
  bodyMd: string;
  priority: string;
  priorityOrder: number;
  capabilities: string[];
  kind: string;
  parentId?: string;

  // Bookkeeping
  attempts: number;
  /** Lease epoch; it is sent back in `PatchJobBody` when patching a job. */
  leaseEpoch: number;
  createdAt: string;
  updatedAt: string;

  // Review gate
  reviewPolicy?: ReviewPolicy;
  reviewDecisions?: ReviewDecision[];

  // PR mode
  repo?: string;
  baseBranch?: string;
  /** PR mode: command run in the checkout to verify the change before the PR opens. */
  verify?: string;
  /** PR mode: automatically squash-merge the PR once verify passes. */
  autoMerge?: boolean;

  /** Idempotency keys of the jobs this job is gated on. */
  deps?: string[];
}

/** Factory record as returned by the `/factories` endpoint. */
export interface FleetFactory {
  id: string;
  productId: string;
  factoryId: string;
  capabilities: string[];
  health: 'ok' | 'degraded' | 'down';
  load: number;
  seatLimit: number;
  lastHeartbeatAt: string;
}

/** Per-run cost / token / effort metrics reported by a factory.
*/
export interface FleetRunInsights {
  model?: string;
  tokensIn?: number;
  tokensOut?: number;
  tokensCached?: number;
  costUsd?: number;
  estimated?: boolean;
  turns?: number;
  toolCalls?: number;
  filesChanged?: number;
  linesAdded?: number;
  linesDeleted?: number;
}

// NOTE(review): several declarations below use a bare `Record` or `Promise`
// with no type arguments (`FleetEvent.data`, `JobExplain.weights`, and the
// return types of getJob / patchJob / operatorAction / requestReview). The
// arguments look like they were lost; they are left untouched here because the
// intended shapes cannot be determined from this file — restore them from the
// server contract.

/** One execution attempt of a job, with the metrics reported for it. */
export interface FleetRun {
  id: string;
  jobId: string;
  attempt: number;
  factoryId?: string;
  engine: string;
  startedAt: string;
  endedAt?: string;
  result?: string;
  insights: FleetRunInsights;
  prUrl?: string;
  branch?: string;
  prState?: 'open' | 'merged';
}

/** A single entry in a job's event log; `seq` is the cursor used to resume the live stream. */
export interface FleetEvent {
  id: string;
  jobId: string;
  seq: number;
  type: string;
  at: string;
  actor?: string;
  data: Record;
}

/** Metadata for an artifact attached to a job. */
export interface FleetArtifact {
  id: string;
  jobId: string;
  kind: string;
  contentType: string;
  sizeBytes: number;
  createdAt: string;
}

/** Spend ceiling and current spend for a product. */
export interface FleetBudget {
  id: string;
  productId: string;
  ceilingUsd: number;
  window: string;
  spentUsd: number;
  status: 'active' | 'paused';
  updatedAt: string;
}

/** One day of the cost burndown series. */
export interface BurndownPoint {
  date: string;
  costUsd: number;
  cumulativeUsd: number;
}

/** Burndown response; `ceilingUsd` and `window` are nullable in the payload. */
export interface CostBurndown {
  productId: string;
  ceilingUsd: number | null;
  window: string | null;
  totalUsd: number;
  days: BurndownPoint[];
}

/** Node of a job tree; children are nested recursively. */
export interface DagNode {
  id: string;
  idempotencyKey: string;
  stage: string;
  priority: string;
  kind: string;
  parentId?: string;
  children: DagNode[];
}

/** Per-component scores that make up a factory's overall score. */
export interface ScoreBreakdown {
  capabilityFit: number;
  affinity: number;
  load: number;
  costFit: number;
  health: number;
  starvation: number;
}

/** Scoring result for one factory in a job's explain response. */
export interface FactoryScoreExplain {
  factoryId: string;
  eligible: boolean;
  ineligibleReasons: string[];
  score: number;
  breakdown: ScoreBreakdown;
}

/** Explain response for a job: dependency state plus per-factory scoring. */
export interface JobExplain {
  jobId: string;
  stage: string;
  weights: Record;
  depsSatisfied: boolean;
  unmetDeps: string[];
  factories: FactoryScoreExplain[];
  bestFactoryId: string | null;
}

// ── Client ──────────────────────────────────────────────────────────────────

/** Shared client rooted at `/api/fleet`; the token is read from localStorage in the browser only. */
const fleetApi = createApiClient({
  baseUrl: '/api/fleet',
  getToken: () => (typeof window !== 'undefined' ? localStorage.getItem('tracker_token') : null),
});

/**
 * Issue a request through the shared fleet client.
 *
 * In a browser, the product stored under `tracker_selected_product` is sent as
 * `x-product-id`. Headers supplied by the caller are spread last, so they
 * override that default.
 *
 * @param path    Path relative to the client's base URL.
 * @param options Standard fetch options, forwarded to the client.
 * @returns Whatever the client resolves with, typed as `T` by the caller.
 */
function apiFetch<T>(path: string, options?: RequestInit): Promise<T> {
  const extra: Record<string, string> = {};
  if (typeof window !== 'undefined') {
    const pid = localStorage.getItem('tracker_selected_product');
    if (pid) extra['x-product-id'] = pid;
  }
  return fleetApi.fetch(path, {
    ...options,
    // Only plain-object headers survive the spread; every caller in this file
    // passes a plain object (or nothing).
    headers: { ...extra, ...(options?.headers as Record<string, string>) },
  }) as Promise<T>;
}

/**
 * Graceful fetch — returns null on 404 instead of throwing.
 *
 * A 404 is recognised by the substring `404` in the error message, so this
 * depends on the message format produced by the underlying client. Every other
 * error is rethrown unchanged.
 */
async function apiFetchOptional<T>(path: string, options?: RequestInit): Promise<T | null> {
  try {
    return await apiFetch<T>(path, options);
  } catch (err: unknown) {
    if (err instanceof Error && err.message.includes('404')) return null;
    throw err;
  }
}

// ── Jobs ────────────────────────────────────────────────────────────────────

/** Optional filters and paging for `listJobs`. */
export interface ListJobsParams {
  stage?: string;
  productId?: string;
  limit?: number;
  offset?: number;
}

/**
 * List jobs, optionally filtered and paged.
 *
 * Only parameters that are actually set are sent. Passing the params object
 * directly to `URLSearchParams` would stringify unset fields and send e.g.
 * `stage=undefined` as a real filter value.
 *
 * @param params Optional filters; `undefined`/`null` fields are omitted.
 * @returns The matching jobs.
 */
export async function listJobs(params?: ListJobsParams): Promise<{ jobs: FleetJob[] }> {
  const query = new URLSearchParams();
  for (const [key, value] of Object.entries(params ?? {})) {
    if (value == null) continue;
    query.set(key, String(value));
  }
  const qs = query.toString();
  return apiFetch(qs ? `/jobs?${qs}` : '/jobs');
}

/** Fetch a single job; resolves to null when the coordinator answers 404. */
export async function getJob(id: string): Promise {
  return apiFetchOptional(`/jobs/${id}`);
}

/** Request body for `submitJob`. */
export interface SubmitJobBody {
  idempotencyKey: string;
  bodyMd: string;
  priority?: 'critical' | 'high' | 'medium' | 'low';
  capabilities?: string[];
  /** PR mode: open a PR against this repo (`owner/name` or clone URL) + base branch. */
  repo?: string;
  baseBranch?: string;
  /** PR mode: verify command run in the checkout before the PR opens; auto-merge the PR. */
  verify?: string;
  autoMerge?: boolean;
}

/**
 * Submit a new fleet job. Optionally target a specific product (factory's product),
 * overriding the dashboard's selected product for this submission.
 */
export async function submitJob(
  body: SubmitJobBody,
  productId?: string
): Promise<{ job: FleetJob }> {
  // An explicit product wins over the stored selection because caller headers
  // are applied last in apiFetch.
  const headers = productId ? { 'x-product-id': productId } : undefined;
  return apiFetch(`/jobs`, { method: 'POST', body: JSON.stringify(body), headers });
}

/** WIP checkpoint a factory carries across lease re-assignments (server schema). */
export interface FleetCheckpoint {
  wipBranch: string;
  wipBase?: string;
  wipCommit?: string;
}

/** Request body for `patchJob`; `leaseEpoch` is always required. */
export interface PatchJobBody {
  leaseEpoch: number;
  stage?: string;
  checkpoint?: FleetCheckpoint;
  blockedReason?: string;
}

/** PATCH a job with the given body. */
export async function patchJob(id: string, body: PatchJobBody): Promise {
  return apiFetch(`/jobs/${id}`, { method: 'PATCH', body: JSON.stringify(body) });
}

export type OperatorAction = 'requeue' | 'reject' | 'cancel' | 'ship';

/**
 * Operator-initiated lifecycle action (no lease required). The coordinator
 * fences any current factory holder by bumping the lease epoch.
 */
export async function operatorAction(id: string, action: OperatorAction): Promise {
  return apiFetch(`/jobs/${id}/actions/${action}`, { method: 'POST' });
}

// ── Multi-reviewer human gate ─────────────────────────────────────────────────

/** Review requirements attached to a job. */
export interface ReviewPolicy {
  requiredApprovals: number;
  reviewers: string[];
}

/** One reviewer's recorded decision. */
export interface ReviewDecision {
  reviewer: string;
  decision: 'approve' | 'reject';
  at: string;
  note?: string;
}

export type ReviewGate = 'pending' | 'approved' | 'rejected';

/** Route a building job into the review gate with an optional policy. */
export async function requestReview(
  id: string,
  policy?: { requiredApprovals?: number; reviewers?: string[] }
): Promise {
  return apiFetch(`/jobs/${id}/review/request`, {
    method: 'POST',
    // An omitted policy is sent as an empty object.
    body: JSON.stringify(policy ?? {}),
  });
}

/** Submit a single reviewer's approve/reject decision.
*/
export async function submitReview(
  id: string,
  input: { reviewer: string; decision: 'approve' | 'reject'; note?: string }
): Promise {
  const init: RequestInit = { method: 'POST', body: JSON.stringify(input) };
  return apiFetch(`/jobs/${id}/review`, init);
}

/** Fetch the runs recorded for a job. */
export async function getJobRuns(jobId: string): Promise<{ runs: FleetRun[] }> {
  const path = `/jobs/${jobId}/runs`;
  return apiFetch(path);
}

/** Fetch the event log for a job. */
export async function getJobEvents(jobId: string): Promise<{ events: FleetEvent[] }> {
  const path = `/jobs/${jobId}/events`;
  return apiFetch(path);
}

// ── Fleet metrics + alerting ──────────────────────────────────────────────────

/** One alert entry carried in the metrics payload. */
export interface FleetAlert {
  level: 'warning' | 'critical';
  code: string;
  message: string;
}

/** Snapshot of job and factory counts for a product, plus alerts. */
export interface FleetMetrics {
  productId: string;
  generatedAt: string;
  jobs: {
    total: number;
    byStage: Record;
    queueDepth: number;
    blocked: number;
    active: number;
    oldestQueuedAgeMs: number | null;
  };
  factories: {
    total: number;
    live: number;
    stale: number;
    byHealth: { ok: number; degraded: number; down: number };
    seatsUsed: number;
    seatsTotal: number;
    utilizationPct: number;
  };
  alerts: FleetAlert[];
}

/** Fetch fleet metrics via the 404-tolerant helper. */
export async function getFleetMetrics(): Promise {
  return apiFetchOptional('/metrics');
}

// ── Live event stream (SSE) ───────────────────────────────────────────────────

/** A single parsed SSE frame; `data` holds the frame's data lines joined with newlines. */
export interface ParsedSseEvent {
  id?: string;
  event?: string;
  data: string;
}

/**
 * Parse a raw SSE text buffer into complete frames. Returns the parsed events
 * and any trailing partial frame (`rest`) that should be prepended to the next
 * chunk. Comment lines (`:` keepalives) are skipped. Pure + side-effect free.
*/
export function parseSseFrames(buffer: string): { events: ParsedSseEvent[]; rest: string } {
  const events: ParsedSseEvent[] = [];
  // Accept CRLF-delimited streams as well as LF. A lone trailing '\r' is left
  // in place so that a CRLF pair split across two chunks is rejoined (and then
  // normalised) once the next chunk is appended to `rest`.
  let rest = buffer.replace(/\r\n/g, '\n');
  let idx = rest.indexOf('\n\n');
  while (idx !== -1) {
    const frame = rest.slice(0, idx);
    rest = rest.slice(idx + 2);
    idx = rest.indexOf('\n\n');

    const lines = frame.split('\n');
    // Skip a frame only when it carries nothing but blank/comment lines. A
    // frame that merely *starts* with a keepalive comment may still contain
    // real fields, so comments are dropped line by line instead.
    if (lines.every(line => line.trim() === '' || line.startsWith(':'))) continue;

    const ev: ParsedSseEvent = { data: '' };
    const dataLines: string[] = [];
    for (const line of lines) {
      if (line.startsWith('id:')) ev.id = line.slice(3).trim();
      else if (line.startsWith('event:')) ev.event = line.slice(6).trim();
      else if (line.startsWith('data:')) dataLines.push(line.slice(5).trimStart());
      // Anything else (comments, unknown fields) is ignored.
    }
    ev.data = dataLines.join('\n');
    events.push(ev);
  }
  return { events, rest };
}

/** Handle returned by `subscribeJobEvents`; `close()` aborts the stream. */
export interface JobEventSubscription {
  close: () => void;
}

export interface SubscribeJobEventsOptions {
  /** Resume cursor — only events with seq greater than this are delivered. */
  lastEventId?: number;
  /** Backoff before reconnecting after a clean server close (ms). */
  reconnectMs?: number;
}

/** Resolve after `ms` milliseconds. */
const sseDelay = (ms: number): Promise<void> =>
  new Promise<void>(resolve => setTimeout(resolve, ms));

/**
 * Decode one frame's data as a fleet event.
 * Returns null when the payload is not valid JSON or is not a JSON object.
 */
function parseFleetEvent(raw: string): FleetEvent | null {
  try {
    const value: unknown = JSON.parse(raw);
    return typeof value === 'object' && value !== null ? (value as FleetEvent) : null;
  } catch {
    return null;
  }
}

/**
 * Subscribe to a job's live event stream over SSE using `fetch` streaming (so
 * auth + product headers can be sent — native EventSource cannot). Calls
 * `onEvent` for every new fleet-event and auto-reconnects with Last-Event-ID
 * after a clean server close. On a hard failure it invokes `onError` and stops,
 * letting callers fall back to polling `getJobEvents`. Returns a handle whose
 * `close()` aborts the stream.
 *
 * Hard failures are: a non-OK or body-less response, a network/read error, an
 * `error` frame from the server, and an exception thrown by `onEvent` itself.
 * Frames whose data cannot be decoded as a JSON object are skipped.
 *
 * @param jobId    Job whose stream to open.
 * @param handlers `onEvent` receives each decoded event; `onError` receives the
 *                 failure that ended the subscription.
 * @param opts     Optional resume cursor and reconnect backoff (minimum 250 ms,
 *                 default 1500 ms).
 */
export function subscribeJobEvents(
  jobId: string,
  handlers: { onEvent: (e: FleetEvent) => void; onError?: (err: unknown) => void },
  opts?: SubscribeJobEventsOptions
): JobEventSubscription {
  let closed = false;
  const controller = new AbortController();
  let lastId = opts?.lastEventId ?? -1;
  // Math.max(250, NaN) is NaN, which setTimeout treats as 0 and would turn the
  // reconnect loop into a busy loop — fall back to the default instead.
  const requestedMs = opts?.reconnectMs ?? 1500;
  const reconnectMs = Math.max(250, Number.isFinite(requestedMs) ? requestedMs : 1500);
  // Credentials are captured once, when the subscription is created.
  const token = typeof window !== 'undefined' ? localStorage.getItem('tracker_token') : null;
  const pid =
    typeof window !== 'undefined' ? localStorage.getItem('tracker_selected_product') : null;

  const connect = async (): Promise<void> => {
    while (!closed) {
      try {
        const headers: Record<string, string> = { accept: 'text/event-stream' };
        if (token) headers['authorization'] = `Bearer ${token}`;
        if (pid) headers['x-product-id'] = pid;
        if (lastId >= 0) headers['last-event-id'] = String(lastId);

        const res = await fetch(`/api/fleet/jobs/${jobId}/events/stream`, {
          headers,
          signal: controller.signal,
        });
        if (!res.ok || !res.body) throw new Error(`stream HTTP ${res.status}`);

        const reader = res.body.getReader();
        const decoder = new TextDecoder();
        let buffer = '';
        for (;;) {
          const { value, done } = await reader.read();
          if (done) break;
          buffer += decoder.decode(value, { stream: true });
          const { events, rest } = parseSseFrames(buffer);
          buffer = rest;
          for (const ev of events) {
            // A terminal error frame means the server gave up mid-stream;
            // surface it as fatal so the caller falls back to polling rather
            // than reconnecting into the same failure forever.
            if (ev.event === 'error') throw new Error('stream error frame');

            const parsed = parseFleetEvent(ev.data);
            if (!parsed) continue; // skip malformed frame

            // Only advance the resume cursor on a usable seq; a missing or
            // non-numeric seq would otherwise drop the Last-Event-ID header on
            // the next reconnect.
            if (Number.isFinite(parsed.seq)) lastId = parsed.seq;

            // Deliberately outside any "malformed frame" guard: an exception
            // from the handler is a real failure and is reported via onError
            // by the outer catch instead of being silently discarded.
            handlers.onEvent(parsed);
          }
        }
        // Clean close (server hit its max duration) → reconnect after a backoff.
        if (!closed) await sseDelay(reconnectMs);
      } catch (err) {
        // An abort triggered by close() lands here too; that is not an error.
        if (closed) return;
        controller.abort();
        handlers.onError?.(err);
        return;
      }
    }
  };

  void connect();

  return {
    close: () => {
      closed = true;
      controller.abort();
    },
  };
}

/** Fetch the artifact list for a job. */
export async function getJobArtifacts(jobId: string): Promise<{ artifacts: FleetArtifact[] }> {
  return apiFetch(`/jobs/${jobId}/artifacts`);
}

/** Resolve a short-lived signed download URL for an artifact (e.g. a `log`).
*/
export async function getArtifactDownloadUrl(artifactId: string): Promise {
  const artifact = await apiFetchOptional<{ downloadUrl?: string }>(`/artifacts/${artifactId}`);
  // Both a missing artifact and a record without a URL resolve to null.
  const url = artifact?.downloadUrl;
  return url ?? null;
}

/** Fetch the job tree for a job via the 404-tolerant helper. */
export async function getJobDag(jobId: string): Promise<{ dag: DagNode } | null> {
  const path = `/jobs/${jobId}/dag`;
  return apiFetchOptional(path);
}

/** Fetch the scoring explanation for a job via the 404-tolerant helper. */
export async function getJobExplain(jobId: string): Promise {
  const path = `/jobs/${jobId}/explain`;
  return apiFetchOptional(path);
}

// ── Factories ───────────────────────────────────────────────────────────────

/** List factories; any failure resolves to an empty list instead of throwing. */
export async function listFactories(): Promise<{ factories: FleetFactory[] }> {
  try {
    const result = await apiFetch('/factories');
    return result;
  } catch {
    const none: FleetFactory[] = [];
    return { factories: none };
  }
}

// ── Budgets ─────────────────────────────────────────────────────────────────

/**
 * Spend as a clamped 0–100 percentage of the ceiling. Guards against a missing,
 * non-finite, or zero ceiling (which would otherwise yield NaN/Infinity and
 * render a broken spend bar) by returning 0 — callers should show a "no ceiling"
 * state in that case.
*/
export function budgetUsagePct(spentUsd: number, ceilingUsd: number): number {
  const hasUsableCeiling = Number.isFinite(ceilingUsd) && ceilingUsd > 0;
  if (!hasUsableCeiling) return 0;

  const ratio = (spentUsd / ceilingUsd) * 100;
  // A non-finite or negative ratio (e.g. NaN spend) collapses to 0; otherwise cap at 100.
  return Number.isFinite(ratio) && ratio >= 0 ? Math.min(100, ratio) : 0;
}

/** Fetch a product's budget via the 404-tolerant helper. */
export async function getBudget(productId: string): Promise {
  const path = `/budgets/${productId}`;
  return apiFetchOptional(path);
}

/** PUT a product's budget with the given ceiling and window. */
export async function upsertBudget(
  productId: string,
  ceilingUsd: number,
  window: string
): Promise {
  const payload = JSON.stringify({ ceilingUsd, window });
  return apiFetch(`/budgets/${productId}`, { method: 'PUT', body: payload });
}

/** POST to the budget's `pause` endpoint. */
export async function pauseBudget(productId: string): Promise {
  const init: RequestInit = { method: 'POST' };
  return apiFetch(`/budgets/${productId}/pause`, init);
}

/** POST to the budget's `resume` endpoint. */
export async function resumeBudget(productId: string): Promise {
  const init: RequestInit = { method: 'POST' };
  return apiFetch(`/budgets/${productId}/resume`, init);
}

/**
 * Fetch the cost burndown for a product via the 404-tolerant helper.
 * The `days` query parameter is only sent when `days` is truthy.
 */
export async function getBudgetBurndown(
  productId: string,
  days?: number
): Promise {
  const base = `/budgets/${productId}/burndown`;
  return apiFetchOptional(days ? `${base}?days=${days}` : base);
}