// fleet-dash.mjs — read/act adapter that re-points the agent-queue TUI dashboard
// at the platform-service `/fleet` REST API (roadmap Phase 3: "TUI dashboard
// re-pointed at /fleet API (parity)").
//
// This module is intentionally pure-ish and dependency-injectable (the HTTP
// `fetchImpl` is a parameter) so it is unit-testable WITHOUT a live service.
// dashboard.mjs uses it ONLY when AQ_FLEET_DASH=1; otherwise the dashboard's
// local-queue behavior is byte-for-byte unchanged.
//
// Auth + scoping mirror agent-queue/lib/fleet-client.sh:
//   base URL   AQ_FLEET_API   (already includes /api)
//   bearer     AQ_FLEET_TOKEN
//   product    X-Product-Id: AQ_PRODUCT_ID  (sent on every request)

// Per-request timeout (milliseconds) used when a caller does not supply one.
const DEFAULT_TIMEOUT_MS = 8000;

// fleetConfig(env) — resolve the dashboard's fleet mode from the environment.
//
//   env      object of environment variables (defaults to process.env; a
//            null env is treated as empty).
//   returns  { enabled, api, token, productId, ok, missing }
//
// `enabled` is true iff AQ_FLEET_DASH is exactly '1' (explicit opt-in). `ok`
// additionally requires the api/token/product config to be complete; `missing`
// lists what's absent so the dashboard can fail visibly instead of silently
// doing nothing. `missing` is only populated when `enabled` is true.
//
// Values are trimmed before use: a whitespace-only variable counts as missing,
// and stray spaces/newlines around AQ_FLEET_API do not end up inside every
// request URL. Trailing slashes are stripped from the API base so that
// `${api}${pathname}` never produces a double slash.
export function fleetConfig(env = process.env) {
  const source = env ?? {};
  const read = (key) => String(source[key] || '').trim();

  const enabled = String(source.AQ_FLEET_DASH || '') === '1';
  const api = read('AQ_FLEET_API').replace(/\/+$/, '');
  const token = read('AQ_FLEET_TOKEN');
  const productId = read('AQ_PRODUCT_ID');

  const missing = [];
  if (enabled) {
    if (!api) missing.push('AQ_FLEET_API');
    if (!token) missing.push('AQ_FLEET_TOKEN');
    if (!productId) missing.push('AQ_PRODUCT_ID');
  }

  return { enabled, api, token, productId, ok: enabled && missing.length === 0, missing };
}

// ── stage mapping ───────────────────────────────────────────────────────────
// Fleet stages collapse onto the local board's kanban buckets so the dashboard's
// existing layout/gating/STAGE_TAG logic can be reused unchanged.
const STAGE_BUCKET = {
  queued: 'inbox',
  assigned: 'building',
  building: 'building',
  review: 'review',
  testing: 'testing',
  shipped: 'shipped',
  failed: 'failed',
  dead_letter: 'failed',
};

// mapStage(s) — server stage → board bucket. Unknown or missing stages land in
// 'inbox'. The lookup is own-property only, so a stage string that happens to
// name an Object.prototype member (e.g. 'constructor') cannot leak a
// function/object out as a "bucket".
export const mapStage = (s) => (Object.hasOwn(STAGE_BUCKET, s) ? STAGE_BUCKET[s] : 'inbox');

// Stages an operator can act on from the dashboard (parity with the local
// ACTION_STAGES = review · testing · failed · inbox).
const ACTIONABLE = new Set(['review', 'testing', 'failed', 'inbox']);
// Raw server stages (not buckets) that count as "currently running".
const RUNNING_STAGES = new Set(['assigned', 'building']);
// Sort rank of actionable buckets in the items list.
const ITEM_ORDER = { review: 0, testing: 1, failed: 2, inbox: 3 };

// trackerOf(j) — first non-empty tracker reference among the field spellings
// this adapter accepts, or '' when none is set.
const trackerOf = (j) => j.trackerItemId || j.trackerItem || (j.data && j.data.trackerItem) || '';
// capsOf(j) — capabilities as a display string (arrays are joined with ', ').
const capsOf = (j) => (Array.isArray(j.capabilities) ? j.capabilities.join(', ') : String(j.capabilities || ''));

// cmp — three-way comparison using the `<` / `>` operators.
const cmp = (a, b) => (a < b ? -1 : a > b ? 1 : 0);

// toBoard({jobs, factories, metrics}) — normalize the raw API payloads into the
// shape the dashboard renders. Pure (no I/O), so it is fully unit-testable.
//
//   returns { counts, items, running, recent, factories, metrics }
//     counts   jobs per bucket
//     items    actionable jobs, ordered by ITEM_ORDER then id
//     running  jobs whose server stage is in RUNNING_STAGES
//     recent   up to 5 shipped/failed jobs, newest `updatedAt` first
//
// A non-array `jobs` is treated as empty and non-object entries are skipped, so
// one malformed element in the payload cannot take the whole board down.
export function toBoard({ jobs = [], factories = [], metrics = null } = {}) {
  const counts = { inbox: 0, building: 0, review: 0, testing: 0, shipped: 0, failed: 0 };
  const items = [];
  const running = [];
  const recent = [];

  const jobList = Array.isArray(jobs) ? jobs : [];
  for (const j of jobList) {
    if (!j || typeof j !== 'object') continue;

    const bucket = mapStage(j.stage);
    if (counts[bucket] !== undefined) counts[bucket] += 1;
    const norm = {
      // `stage` is the bucket so the dashboard's gate()/STAGE_TAG work unchanged;
      // `fleetStage` keeps the true server stage for display.
      stage: bucket,
      fleetStage: j.stage,
      id: j.id,
      priority: j.priority || 'medium',
      profile: j.profile || '',
      capabilities: capsOf(j),
      tracker_item: trackerOf(j),
      leaseEpoch: j.leaseEpoch,
      factoryId: j.factoryId || j.leaseFactoryId || '',
      attempts: j.attempts,
      updatedAt: j.updatedAt || j.createdAt || '',
      raw: j,
    };
    if (RUNNING_STAGES.has(j.stage)) running.push(norm);
    if (ACTIONABLE.has(bucket)) items.push(norm);
    if (bucket === 'shipped' || bucket === 'failed') recent.push(norm);
  }

  items.sort((a, b) => (ITEM_ORDER[a.stage] - ITEM_ORDER[b.stage]) || cmp(a.id, b.id));
  recent.sort((a, b) => cmp(String(b.updatedAt), String(a.updatedAt)));
  return { counts, items, running, recent: recent.slice(0, 5), factories, metrics };
}

// ── HTTP ──────────────────────────────────────────────────────────────────--

// failureReason(e) — short human-readable reason for a failed request:
// 'timeout' for abort errors, otherwise the error message (or a generic
// 'network error' when there is none).
const failureReason = (e) => {
  const timedOut = e && (e.name === 'AbortError' || e.code === 'ABORT_ERR');
  return timedOut ? 'timeout' : (e && e.message) || 'network error';
};

// fleetFetch — a single request against the coordinator. NEVER throws: network
// errors / timeouts / non-JSON bodies / bad arguments are returned as a
// structured result so the TUI stays responsive. A timeout is enforced via
// AbortController.
//
//   cfg        { api, token, productId } as produced by fleetConfig()
//   pathname   path appended verbatim to cfg.api
//   opts       { method = 'GET', body, timeoutMs = DEFAULT_TIMEOUT_MS };
//              `body` (when not undefined) is sent as JSON
//   fetchImpl  fetch-compatible function (injectable for tests)
//   returns    { ok, status, json } and, on failure, `error`.
//              status 0 means no HTTP response was obtained.
//
// If the response headers arrive but reading the body fails (including the
// timeout firing mid-body), `ok`/`status` still reflect the server's answer and
// `error` carries the reason, so callers can report 'timeout' rather than a
// misleading "HTTP 200".
export async function fleetFetch(cfg, pathname, opts = {}, fetchImpl = globalThis.fetch) {
  const ac = new AbortController();
  let timer;
  try {
    // Everything that can throw lives inside the try so that a bad cfg/opts is
    // reported as a result instead of a rejected promise.
    const url = `${cfg.api}${pathname}`;
    const headers = {
      Authorization: `Bearer ${cfg.token}`,
      'X-Product-Id': cfg.productId,
      Accept: 'application/json',
    };
    const hasBody = opts.body !== undefined;
    if (hasBody) headers['Content-Type'] = 'application/json';

    timer = setTimeout(() => ac.abort(), opts.timeoutMs || DEFAULT_TIMEOUT_MS);

    const res = await fetchImpl(url, {
      method: opts.method || 'GET',
      headers,
      body: hasBody ? JSON.stringify(opts.body) : undefined,
      signal: ac.signal,
    });

    let text = '';
    let bodyError = '';
    try {
      text = await res.text();
    } catch (e) {
      bodyError = failureReason(e);
    }

    let json = null;
    if (text) {
      try {
        json = JSON.parse(text);
      } catch {
        // Non-JSON body: keep json null; status/ok still describe the response.
        json = null;
      }
    }

    const result = { ok: !!res.ok, status: res.status, json };
    if (bodyError) result.error = bodyError;
    return result;
  } catch (e) {
    return { ok: false, status: 0, json: null, error: failureReason(e) };
  } finally {
    clearTimeout(timer);
  }
}

// fetchBoard — assemble the board model. Jobs are REQUIRED (failure ⇒ board
// fails, and no further requests are made). Metrics + factories are
// best-effort: a missing/404/501 factories endpoint degrades to [] (it is
// optional server-side), and absent metrics simply omits the aggregate panel.
// The two best-effort requests are independent, so they run concurrently.
//
//   returns { ok: true, board } or { ok: false, error }
export async function fetchBoard(cfg, fetchImpl = globalThis.fetch) {
  const jobsRes = await fleetFetch(cfg, '/fleet/jobs', {}, fetchImpl);
  if (!jobsRes.ok || !jobsRes.json) {
    return { ok: false, error: jobsRes.error || `jobs HTTP ${jobsRes.status}` };
  }
  const jobs = Array.isArray(jobsRes.json.jobs) ? jobsRes.json.jobs : [];

  // fleetFetch never rejects, so Promise.all cannot fail fast here.
  const [metricsRes, facRes] = await Promise.all([
    fleetFetch(cfg, '/fleet/metrics', {}, fetchImpl),
    fleetFetch(cfg, '/fleet/factories', {}, fetchImpl),
  ]);
  const metrics = metricsRes.ok && metricsRes.json ? metricsRes.json : null;
  const factories = facRes.ok && facRes.json && Array.isArray(facRes.json.factories)
    ? facRes.json.factories
    : [];

  return { ok: true, board: toBoard({ jobs, factories, metrics }) };
}

// formatEvent — one fleet event → a single log line for the TUI log view:
// "<time> <type> <actor> <json data>", with absent parts omitted and a missing
// type shown as '?'. A null/undefined event is rendered like an empty one.
export function formatEvent(e) {
  const ev = e ?? {};
  const at = ev.at ? safeTime(ev.at) : '';
  const actor = ev.actor ? ` ${ev.actor}` : '';
  const data = ev.data && typeof ev.data === 'object' && Object.keys(ev.data).length
    ? ` ${JSON.stringify(ev.data)}`
    : '';
  return `${at} ${ev.type || '?'}${actor}${data}`.trim();
}

// safeTime — local time-of-day for a parseable timestamp; otherwise the input
// echoed back as a string.
const safeTime = (iso) => {
  const d = new Date(iso);
  return Number.isNaN(d.getTime()) ? String(iso) : d.toLocaleTimeString();
};

// fetchEvents — the job's event stream rendered as log lines.
//
//   returns { ok: true, lines } or { ok: false, error, lines: [] }
export async function fetchEvents(cfg, jobId, fetchImpl = globalThis.fetch) {
  const res = await fleetFetch(cfg, `/fleet/jobs/${encodeURIComponent(jobId)}/events`, {}, fetchImpl);
  if (!res.ok || !res.json) {
    return { ok: false, error: res.error || `events HTTP ${res.status}`, lines: [] };
  }
  const events = Array.isArray(res.json.events) ? res.json.events : [];
  // Arrow wrapper so map's extra (index, array) arguments never reach formatEvent.
  return { ok: true, lines: events.map((ev) => formatEvent(ev)) };
}

// Operator verbs the dashboard supports in fleet mode. `promote` has no safe
// server contract (client-inferred stage transitions could violate workflow
// invariants), so it is explicitly unavailable here.
const FLEET_VERBS = new Set(['ship', 'requeue', 'reject']);

// jobAction — execute an operator verb against the coordinator.
//   ship    → re-GET the job for a FRESH leaseEpoch, then PATCH stage=shipped
//             (a stale snapshot epoch would be fenced with 409).
//   requeue → POST /actions/requeue   (lease-free operator action)
//   reject  → POST /actions/reject
// Returns {ok, message}; never throws.
export async function jobAction(cfg, item, verb, fetchImpl = globalThis.fetch) {
  // Runs one operator verb for `item` and resolves to {ok, message}.
  // Both builders below keep every exit path on that same result shape.
  const fail = (message) => ({ ok: false, message });
  const done = (message) => ({ ok: true, message });

  // `promote` gets its own wording; any other unknown verb gets the generic one.
  if (verb === 'promote') return fail('promote is not available in fleet mode');
  if (!FLEET_VERBS.has(verb)) return fail(`${verb} not supported in fleet mode`);

  const id = item && item.id;
  if (!id) return fail('no job selected');

  const jobPath = `/fleet/jobs/${encodeURIComponent(id)}`;

  // requeue / reject: a single POST to the verb's action endpoint.
  if (verb !== 'ship') {
    const posted = await fleetFetch(
      cfg,
      `${jobPath}/actions/${verb}`,
      { method: 'POST', body: {} },
      fetchImpl,
    );
    if (posted.status === 409) return fail('job conflict/terminal — refresh and retry');
    if (!posted.ok) return fail(posted.error || `${verb} HTTP ${posted.status}`);
    return done(`${verb} ${id}`);
  }

  // ship: read the job first so the PATCH carries the leaseEpoch the server
  // currently reports, then move it to `shipped`. A 409 on the PATCH means the
  // job changed in between.
  const current = await fleetFetch(cfg, jobPath, {}, fetchImpl);
  if (!current.ok || !current.json) return fail(current.error || `job HTTP ${current.status}`);

  const patched = await fleetFetch(
    cfg,
    jobPath,
    { method: 'PATCH', body: { stage: 'shipped', leaseEpoch: current.json.leaseEpoch } },
    fetchImpl,
  );
  if (patched.status === 409) return fail('job changed (fenced) — refresh and retry');
  if (!patched.ok) return fail(patched.error || `ship HTTP ${patched.status}`);
  return done(`shipped ${id}`);
}