From 66c91233da29d29d0b02ee62eab483a991a83595 Mon Sep 17 00:00:00 2001 From: Saravanakumar D Date: Sat, 30 May 2026 19:47:05 -0700 Subject: [PATCH] feat(agent-queue): re-point TUI dashboard at /fleet API (parity) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add an opt-in fleet mode to the dashboard so an operator can drive the coordinator fleet from the same TUI used for the local folder queue. - lib/fleet-dash.mjs: dependency-injectable read/act adapter over the platform-service /fleet REST surface (jobs, metrics, factories, events, ship/requeue/reject). Pure-ish + fully unit-testable without a live service. - dashboard.mjs: render + act in fleet mode when AQ_FLEET_DASH=1 — board with counts, factories (per-factory rows or metrics aggregate), alerts, running (by lease/factory), actionable JOBS with manifest tags, recent, and a per-job events log. Single-flight async refresh keeps the last good board on failure; ship re-GETs a fresh leaseEpoch before PATCH; run/stop/promote are disabled (no safe server contract). Local mode is byte-for-byte unchanged. - lib/fleet-dash.test.mjs: 22 node:assert assertions (config, stage mapping, toBoard, fetch headers/timeout/errors, board assembly + graceful degradation, events, job actions) wired into selftest.sh. - docs: tick the Phase 3 "TUI re-pointed at /fleet" roadmap boxes. Verified: selftest.sh green (incl. new fleet-dash checks); live non-TTY render smoke against a stub /fleet server (both factories and metrics-aggregate paths); local mode unchanged. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- agent-queue/dashboard.mjs | 266 ++++++++++++++++++++-- agent-queue/docs/GIGAFACTORY_ROADMAP.md | 4 +- agent-queue/lib/fleet-dash.mjs | 218 ++++++++++++++++++ agent-queue/lib/fleet-dash.test.mjs | 287 ++++++++++++++++++++++++ agent-queue/selftest.sh | 6 + 5 files changed, 765 insertions(+), 16 deletions(-) create mode 100644 agent-queue/lib/fleet-dash.mjs create mode 100644 agent-queue/lib/fleet-dash.test.mjs diff --git a/agent-queue/dashboard.mjs b/agent-queue/dashboard.mjs index c5ea0fc..74a7302 100644 --- a/agent-queue/dashboard.mjs +++ b/agent-queue/dashboard.mjs @@ -23,6 +23,7 @@ import fs from 'node:fs'; import path from 'node:path'; import { fileURLToPath } from 'node:url'; import { execFileSync, spawn } from 'node:child_process'; +import { fleetConfig, fetchBoard, fetchEvents, jobAction } from './lib/fleet-dash.mjs'; const __dirname = path.dirname(fileURLToPath(import.meta.url)); @@ -200,6 +201,44 @@ const aq = (args) => { } }; +// ── fleet mode (Phase 3: TUI re-pointed at /fleet) ────────────────── +// Opt-in via AQ_FLEET_DASH=1. When ON, the board is sourced from the +// platform-service /fleet REST API instead of the local queue; when OFF, every +// fleet code path below is skipped and the dashboard is byte-for-byte the local +// tool. All fleet I/O lives in lib/fleet-dash.mjs (injectable + unit-tested). +const FLEET = fleetConfig(); +// Latest fleet board snapshot. On a refresh failure we KEEP the last good board +// (no destructive flicker) and surface a staleness banner instead. +let fleetState = { board: null, error: null, loading: FLEET.enabled, lastOk: 0 }; +let fleetRefreshing = false; // single-flight guard (no overlapping fetches) +let fleetEvents = { jobId: null, lines: [], error: null, loading: false }; + +// refreshFleet() — single-flight board refresh. Applies a successful board; +// on failure preserves the previous board and records the error. +const refreshFleet = async () => { + if (fleetRefreshing) return; + fleetRefreshing = true; + try { + const r = await fetchBoard(FLEET); + if (r.ok) fleetState = { board: r.board, error: null, loading: false, lastOk: Date.now() }; + else fleetState = { ...fleetState, error: r.error, loading: false }; + } catch (e) { + fleetState = { ...fleetState, error: (e && e.message) || 'refresh failed', loading: false }; + } finally { + fleetRefreshing = false; + } +}; + +// refreshFleetEvents(job) — load a job's event stream into the log view state. +const refreshFleetEvents = (job) => { + fleetEvents = { jobId: job, lines: [], error: null, loading: true }; + fetchEvents(FLEET, job).then((r) => { + fleetEvents = { jobId: job, lines: r.lines || [], error: r.ok ? null : r.error, loading: false }; + if (mode === 'log' && logJob === job) draw(); + }); +}; + + // daemonPid() — pid of a live `run` loop, or null. const daemonPid = () => { try { @@ -238,6 +277,11 @@ const setFlash = (msg, ms = 4000) => { flash = msg; flashUntil = Date.now() + ms const flashLine = () => (flash && Date.now() < flashUntil ? flash : ''); const buildItems = () => { + if (FLEET.enabled) { + const b = fleetState.board; + if (!b) return []; + return b.items.map((it) => ({ stage: it.stage, job: it.id, fleet: it })); + } const list = []; for (const st of ACTION_STAGES) { for (const f of listMd(DIRS[st]).sort()) list.push({ stage: st, job: f.replace(/\.md$/, '') }); @@ -270,16 +314,31 @@ const gate = (verb, stage) => ({ logs: true, }[verb]); -// doAction(verb) — run the gated action on the selected job via agent-queue.sh. +// doAction(verb) — run the gated action on the selected job. In fleet mode it +// calls the /fleet API (lib/fleet-dash.mjs); otherwise it shells out to +// agent-queue.sh. promote is unavailable in fleet mode (no safe server contract). const doAction = (verb) => { const it = items[selIdx]; if (!it) { setFlash(c('gray', 'no job selected')); return; } + if (FLEET.enabled && verb === 'promote') { + setFlash(c('gray', 'promote n/a in fleet mode (use ship/requeue/reject)')); + return; + } if (!gate(verb, it.stage)) { setFlash(c('gray', `${verb} not valid for a ${it.stage} job`)); return; } if ((verb === 'reject' || verb === 'requeue') && mode !== 'confirm') { confirmAction = { verb, job: it.job, run: () => doAction(verb) }; mode = 'confirm'; return; } + if (FLEET.enabled) { + setFlash(c('gray', `${verb}…`)); + mode = 'board'; confirmAction = null; + jobAction(FLEET, it.fleet, verb).then((r) => { + setFlash((r.ok ? c('green', '✓ ') : c('red', '✗ ')) + (r.message || `${verb} ${it.job}`)); + refreshFleet().then(draw); + }); + return; + } const r = aq([verb, it.job]); setFlash((r.ok ? c('green', '✓ ') : c('red', '✗ ')) + (lastLine(r.out) || `${verb} ${it.job}`)); mode = 'board'; confirmAction = null; @@ -288,7 +347,148 @@ const doAction = (verb) => { // ── render ────────────────────────────────────────────────────────── const ENGINE_COLOR = { devin: 'cyan', claude: 'yellow', codex: 'green' }; +const FLEET_STAGE_COLOR = { + queued: 'blue', assigned: 'yellow', building: 'yellow', + review: 'cyan', testing: 'cyan', shipped: 'green', + failed: 'red', dead_letter: 'red', +}; + +// drawFleetBoard() — the board sourced from the /fleet API (AQ_FLEET_DASH=1). +// Mirrors the local board layout; running rows reflect lease/factory status +// (there is no local PID/liveness in fleet mode). +function drawFleetBoard() { + items = buildItems(); + syncSelection(); + const board = fleetState.board; + const counts = board ? board.counts : { inbox: 0, building: 0, review: 0, testing: 0, shipped: 0, failed: 0 }; + const running = board ? board.running : []; + const recent = board ? board.recent : []; + + const out = []; + out.push(''); + out.push(` ${C.bold}AGENT QUEUE${C.reset} ${c('cyan', 'fleet')} ${c('gray', FLEET.api)}`); + const staleSec = fleetState.lastOk ? Math.floor((Date.now() - fleetState.lastOk) / 1000) : null; + let statusBit; + if (fleetState.loading && !board) statusBit = c('gray', '◌ loading…'); + else if (fleetState.error) statusBit = c('red', `⚠ ${trunc(fleetState.error, 40)}${board ? ` (stale ${staleSec}s)` : ''}`); + else statusBit = c('green', `● live${staleSec !== null ? ` (${staleSec}s ago)` : ''}`); + out.push( + ` ${c('gray', new Date().toLocaleTimeString())} refresh ${INTERVAL / 1000}s ${statusBit}` + + ` ${c('gray', `product ${FLEET.productId}`)} ${c('gray', INTERACTIVE ? 'press ? for help' : 'read-only')}` + ); + out.push(''); + out.push( + ` ${c('blue', '▢ inbox')} ${String(counts.inbox).padEnd(3)}` + + ` ${c('yellow', '◧ building')} ${String(counts.building).padEnd(3)}` + + ` ${c('cyan', '◔ review')} ${String(counts.review).padEnd(3)}` + + ` ${c('cyan', '◕ testing')} ${String(counts.testing).padEnd(3)}` + + ` ${c('green', '▣ shipped')} ${String(counts.shipped).padEnd(3)}` + + ` ${c('red', '✕ failed')} ${String(counts.failed).padEnd(3)}` + + ` ${C.bold}running ${running.length}${C.reset}` + ); + out.push(''); + + // factories (per-factory rows when /fleet/factories exists; else metrics aggregate) + out.push(` ${C.bold}FACTORIES${C.reset}`); + const factories = board ? board.factories : []; + const metrics = board ? board.metrics : null; + if (factories.length > 0) { + for (const f of factories) { + const health = String(f.health || 'ok'); + const hc = health === 'ok' ? 'green' : health === 'degraded' ? 'yellow' : 'red'; + out.push( + ` ${c('bold', trunc(f.factoryId || f.id || '?', 24).padEnd(24))} ` + + `${c(hc, health.padEnd(9))} ` + + `${c('gray', `load ${f.load ?? '?'}/${f.seatLimit ?? '?'}`)} ` + + `${c('gray', trunc((Array.isArray(f.capabilities) ? f.capabilities.join(', ') : f.capabilities) || '', 36))}` + ); + } + } else if (metrics && metrics.factory) { + const fm = metrics.factory; + const bh = fm.byHealth || {}; + out.push( + ` ${c('green', `ok ${bh.ok ?? 0}`)} ${c('yellow', `degraded ${bh.degraded ?? 0}`)} ${c('red', `down ${bh.down ?? 0}`)}` + + ` ${c('gray', `live ${fm.live ?? '?'} · stale ${fm.stale ?? '?'}`)}` + + ` ${c('gray', `seats ${fm.seatsUsed ?? '?'}/${fm.seatsTotal ?? '?'}`)}` + + ` ${c('gray', `util ${metrics.utilizationPct ?? '?'}%`)}` + ); + } else { + out.push(` ${c('dim', 'no factory data')}`); + } + // alerts (from metrics) + if (metrics && Array.isArray(metrics.alerts) && metrics.alerts.length > 0) { + for (const a of metrics.alerts) { + const sev = a.severity === 'critical' ? 'red' : 'yellow'; + out.push(` ${c(sev, '⚠ ')}${c(sev, a.kind || 'alert')}${a.message ? c('gray', ` — ${trunc(a.message, 50)}`) : ''}`); + } + } + out.push(''); + + // running (lease/factory status — no local pid) + out.push(` ${C.bold}RUNNING${C.reset}`); + if (running.length === 0) { + out.push(` ${c('dim', 'no jobs in flight')}`); + } else { + for (const r of running) { + const sc = FLEET_STAGE_COLOR[r.fleetStage] || 'gray'; + out.push( + ` ${c('bold', trunc(r.id, 30).padEnd(30))} ` + + `${c(sc, String(r.fleetStage).padEnd(9))} ` + + `${c('gray', r.factoryId ? `@${trunc(r.factoryId, 18)}` : 'unassigned')}` + ); + const mtags = manifestTags(r); + if (mtags) out.push(` ${mtags}`); + } + } + out.push(''); + + // actionable jobs (numbered + selectable) — reuses STAGE_TAG buckets + out.push(` ${C.bold}JOBS${C.reset} ${c('gray', '(review · testing · failed · inbox)')}`); + if (items.length === 0) { + out.push(` ${c('dim', board ? 'no actionable jobs' : 'waiting for fleet…')}`); + } else { + items.forEach((it, i) => { + const sel = i === selIdx; + const ptr = sel ? c('cyan', '▶') : ' '; + const num = c('gray', String(i + 1).padStart(2) + '.'); + const tag = (STAGE_TAG[it.stage] || (() => `[${it.stage}]`))(); + const name = sel ? `${C.bold}${trunc(it.job, 46)}${C.reset}` : trunc(it.job, 46); + out.push(` ${ptr} ${num} ${tag} ${name}`); + const jtags = manifestTags(it.fleet); + if (jtags) out.push(` ${jtags}`); + }); + } + out.push(''); + + // recent (shipped + failed) + out.push(` ${C.bold}RECENT${C.reset}`); + if (recent.length === 0) { + out.push(` ${c('dim', 'nothing finished yet')}`); + } else { + for (const r of recent) { + const failedRes = r.stage === 'failed'; + const mark = failedRes ? c('red', '✕') : c('green', '▣'); + const label = failedRes ? c('red', r.fleetStage) : c('green', r.fleetStage); + const when = r.updatedAt ? new Date(r.updatedAt).toLocaleTimeString() : ''; + out.push(` ${mark} ${trunc(r.id, 34).padEnd(34)} ${label} ${c('gray', when)}`); + } + } + out.push(''); + + // flash + footer + const fl = flashLine(); + if (fl) out.push(` ${fl}`); + if (mode === 'confirm' && confirmAction) { + out.push(` ${c('yellow', `${confirmAction.verb} "${confirmAction.job}" ? `)}${C.bold}y${C.reset}${c('gray', '/')}${C.bold}n${C.reset}`); + } else if (INTERACTIVE) { + out.push(c('gray', ' ↑/↓ select · enter events · s ship · x reject · u requeue')); + out.push(c('gray', ' g refresh · ? help · q quit')); + } + process.stdout.write('\x1b[2J\x1b[H' + out.join('\n') + '\n'); +} + function drawBoard() { + if (FLEET.enabled) return drawFleetBoard(); const metas = readMetas(); const metaByJob = Object.fromEntries(metas.filter((m) => m.job).map((m) => [m.job, m])); const running = metas.filter((m) => !m.ended && pidAlive(m.pid)); @@ -419,6 +619,16 @@ function drawBoard() { function drawLog() { const rows = (process.stdout.rows || 30) - 6; + if (FLEET.enabled) { + let body; + if (fleetEvents.loading) body = c('gray', ' loading events…'); + else if (fleetEvents.error) body = c('red', ` ${fleetEvents.error}`); + else if (!fleetEvents.lines.length) body = c('gray', ' no events for this job'); + else body = fleetEvents.lines.slice(-rows).join('\n'); + const head = ` ${C.bold}EVENTS${C.reset} ${c('cyan', logJob)} ${c('gray', 'q/esc back · g refresh')}`; + process.stdout.write('\x1b[2J\x1b[H' + head + '\n' + c('gray', ' ' + '─'.repeat(60)) + '\n' + body + '\n'); + return; + } let body = `no log for ${logJob}`; try { const txt = fs.readFileSync(path.join(DIRS.logs, `${logJob}.log`), 'utf8'); @@ -430,9 +640,10 @@ function drawLog() { function drawHelp() { const L = [ - '', ` ${C.bold}AGENT QUEUE — keys${C.reset}`, '', + '', ` ${C.bold}AGENT QUEUE — keys${C.reset}`, + FLEET.enabled ? ` ${c('cyan', 'fleet mode')} ${c('gray', '— board sourced from /fleet API; run/stop/promote disabled')}` : '', '', ` ${c('cyan', '↑/↓, j/k, 1-9')} select a job in the JOBS list`, - ` ${c('cyan', 'enter / l')} view the selected job's log (live)`, + ` ${c('cyan', 'enter / l')} ${FLEET.enabled ? "view the selected job's events" : "view the selected job's log (live)"}`, ` ${c('cyan', 'p')} promote (review → testing → shipped)`, ` ${c('cyan', 's')} ship (testing/QA → shipped, the manual gate)`, ` ${c('cyan', 'x')} reject (review/testing → failed) ${c('gray', '[confirm]')}`, @@ -459,16 +670,38 @@ const draw = () => { }; // ── main loop + key handling ──────────────────────────────────────── -if (!fs.existsSync(ROOT)) { - process.stdout.write(`agent-queue: queue root not found: ${ROOT}\nRun \`agent-queue.sh init\` first.\n`); - process.exit(1); +// Fleet mode (AQ_FLEET_DASH=1) sources the board from the /fleet API on an async, +// single-flight tick loop. Local mode keeps the original synchronous setInterval +// path byte-for-byte. `timer` holds whichever timer is live so quit() can clear it. +let timer = null; + +if (FLEET.enabled) { + if (!FLEET.ok) { + process.stdout.write( + `agent-queue: fleet dashboard enabled (AQ_FLEET_DASH=1) but missing config:\n` + + ` ${FLEET.missing.join(', ')}\n` + + `Set AQ_FLEET_API, AQ_FLEET_TOKEN and AQ_PRODUCT_ID, or unset AQ_FLEET_DASH.\n` + ); + process.exit(1); + } + draw(); // initial frame (loading…) + const tick = async () => { + await refreshFleet(); + if (mode !== 'log' && mode !== 'help') draw(); + timer = setTimeout(tick, INTERVAL); // single-flight: schedule only after the await + }; + tick(); +} else { + if (!fs.existsSync(ROOT)) { + process.stdout.write(`agent-queue: queue root not found: ${ROOT}\nRun \`agent-queue.sh init\` first.\n`); + process.exit(1); + } + draw(); + timer = setInterval(draw, INTERVAL); } -draw(); -const timer = setInterval(draw, INTERVAL); - const quit = () => { - clearInterval(timer); + if (timer) { clearTimeout(timer); clearInterval(timer); } try { if (process.stdin.isTTY) process.stdin.setRawMode(false); } catch { /* noop */ } process.stdout.write(C.reset + '\n'); process.exit(0); @@ -487,7 +720,7 @@ function onKey(key) { if (mode === 'help') { mode = 'board'; return draw(); } if (mode === 'log') { if (key === 'q' || key === '\u001b' || key === '\r' || key === '\n') { mode = 'board'; logJob = null; } - else if (key === 'g') { /* fallthrough to redraw */ } + else if (key === 'g') { if (FLEET.enabled && logJob) refreshFleetEvents(logJob); } return draw(); } if (mode === 'confirm') { @@ -504,14 +737,19 @@ function onKey(key) { case 'j': case '\u001b[B': moveSel(1); break; case 'k': case '\u001b[A': moveSel(-1); break; case '\r': case '\n': case 'l': - if (items[selIdx]) { logJob = items[selIdx].job; mode = 'log'; } + if (items[selIdx]) { + logJob = items[selIdx].job; mode = 'log'; + if (FLEET.enabled) refreshFleetEvents(logJob); + } break; case 'p': doAction('promote'); break; case 's': doAction('ship'); break; case 'x': doAction('reject'); break; case 'u': doAction('requeue'); break; - case 'r': startRun(); break; - case 'S': { const res = aq(['stop']); setFlash(c('red', '■ ') + (lastLine(res.out) || 'stopped')); break; } + case 'r': if (FLEET.enabled) setFlash(c('gray', 'run loop n/a in fleet mode')); else startRun(); break; + case 'S': + if (FLEET.enabled) { setFlash(c('gray', 'stop n/a in fleet mode')); break; } + { const res = aq(['stop']); setFlash(c('red', '■ ') + (lastLine(res.out) || 'stopped')); break; } default: if (/^[1-9]$/.test(key)) { const i = parseInt(key, 10) - 1; diff --git a/agent-queue/docs/GIGAFACTORY_ROADMAP.md b/agent-queue/docs/GIGAFACTORY_ROADMAP.md index 50c9635..5c343ca 100644 --- a/agent-queue/docs/GIGAFACTORY_ROADMAP.md +++ b/agent-queue/docs/GIGAFACTORY_ROADMAP.md @@ -281,7 +281,7 @@ Three transports were evaluated. **Decision: platform-service-native coordinator ### Phase 3 — Unified control plane - [ ] Add a **Fleet** surface to `tracker-web` reusing auth/Primitives/DataTable/product switcher: fleet map (factories + load/health), job table, job DAG, **live log streaming**, lease/heartbeat status, cost burndown, approve/ship buttons. - [ ] **Streaming caveat (correctness):** live logs **must not** use the existing buffering catch-all proxy `/api/tracker/[...path]` — it does `res.text()` and would never stream. Use a **dedicated Next.js Route Handler returning a `ReadableStream` (SSE)** or a direct SSE/WebSocket to platform-service. Full logs are shipped to blob storage (§17); the endpoint serves stored tail + live append. -- [ ] The Node TUI dashboard becomes a thin client of the same `/fleet` API (parity with web). +- [x] The Node TUI dashboard becomes a thin client of the same `/fleet` API (parity with web). *(devops-tools `agent-queue/dashboard.mjs` + `lib/fleet-dash.mjs`, `AQ_FLEET_DASH=1`.)* - **Acceptance:** an operator can watch all factories + tail any job log + ship from the browser. - **Verify gate:** web e2e (Playwright) covering fleet map render, live log, and a ship action. @@ -398,7 +398,7 @@ Each phase: **Goal → checklist → Exit criteria**. Don't start a phase until - [x] Cost burndown + budget kill-switch UI; multi-reviewer routing. *(common-plat `app/dashboard/fleet/budget/page.tsx` burndown + pause/resume; `ReviewGateCard` multi-reviewer quorum gate via `requestReview`/`submitReview`.)* - [x] Scoring router with configurable weights + explainability surfaced in UI. *(common-plat `fleet/scheduler.ts` tunable weights + `GET /fleet/jobs/:id/explain`; `ExplainPanel` breakdown in job detail.)* - [x] Preemption of low-priority by critical jobs (checkpoint + requeue). *(common-plat `fleet/scheduler.ts` `selectPreemptionVictim` + coordinator eviction under `FLEET_PREEMPTION`; victim requeued with checkpoint + bumped epoch, `preempted` event.)* -- [ ] TUI dashboard re-pointed at `/fleet` API (parity). +- [x] TUI dashboard re-pointed at `/fleet` API (parity). *(devops-tools `agent-queue/lib/fleet-dash.mjs` adapter + `dashboard.mjs` fleet mode under `AQ_FLEET_DASH=1`: board/factories/metrics/alerts, job actions ship/requeue/reject via `/fleet`, per-job events log; opt-in so local mode is byte-for-byte unchanged. Verified by `lib/fleet-dash.test.mjs` (22 assertions) wired into `selftest.sh` + live non-TTY render smoke.)* - [x] Web e2e (Playwright): fleet map, live log, ship, budget-pause. *(common-plat `dashboards/tracker-web/e2e/fleet.spec.ts`: fleet overview, metrics, job detail, ship, budget-pause, review-gate specs green.)* - **Exit criteria:** all boxes ✅; web `verify` (typecheck+lint+test+e2e) green; an operator runs the whole 3-repo parallel workload from the browser, including a budget pause + resume. diff --git a/agent-queue/lib/fleet-dash.mjs b/agent-queue/lib/fleet-dash.mjs new file mode 100644 index 0000000..b469bcc --- /dev/null +++ b/agent-queue/lib/fleet-dash.mjs @@ -0,0 +1,218 @@ +// fleet-dash.mjs — read/act adapter that re-points the agent-queue TUI dashboard +// at the platform-service `/fleet` REST API (roadmap Phase 3: "TUI dashboard +// re-pointed at /fleet API (parity)"). +// +// This module is intentionally pure-ish and dependency-injectable (the HTTP +// `fetchImpl` is a parameter) so it is unit-testable WITHOUT a live service. +// dashboard.mjs uses it ONLY when AQ_FLEET_DASH=1; otherwise the dashboard's +// local-queue behavior is byte-for-byte unchanged. +// +// Auth + scoping mirror agent-queue/lib/fleet-client.sh: +// base URL AQ_FLEET_API (already includes /api) +// bearer AQ_FLEET_TOKEN +// product X-Product-Id: AQ_PRODUCT_ID (sent on every request) + +const DEFAULT_TIMEOUT_MS = 8000; + +// fleetConfig(env) — resolve the dashboard's fleet mode from the environment. +// enabled iff AQ_FLEET_DASH=1 (explicit opt-in). `ok` additionally requires the +// api/token/product config to be complete; `missing` lists what's absent so the +// dashboard can fail visibly instead of silently doing nothing. +export function fleetConfig(env = process.env) { + const enabled = String(env.AQ_FLEET_DASH || '') === '1'; + const api = String(env.AQ_FLEET_API || '').replace(/\/+$/, ''); + const token = String(env.AQ_FLEET_TOKEN || ''); + const productId = String(env.AQ_PRODUCT_ID || ''); + const missing = []; + if (enabled) { + if (!api) missing.push('AQ_FLEET_API'); + if (!token) missing.push('AQ_FLEET_TOKEN'); + if (!productId) missing.push('AQ_PRODUCT_ID'); + } + return { enabled, api, token, productId, ok: enabled && missing.length === 0, missing }; +} + +// ── stage mapping ─────────────────────────────────────────────────────────── +// Fleet stages collapse onto the local board's kanban buckets so the dashboard's +// existing layout/gating/STAGE_TAG logic can be reused unchanged. +const STAGE_BUCKET = { + queued: 'inbox', + assigned: 'building', + building: 'building', + review: 'review', + testing: 'testing', + shipped: 'shipped', + failed: 'failed', + dead_letter: 'failed', +}; +export const mapStage = (s) => STAGE_BUCKET[s] || 'inbox'; + +// Stages an operator can act on from the dashboard (parity with the local +// ACTION_STAGES = review · testing · failed · inbox). +const ACTIONABLE = new Set(['review', 'testing', 'failed', 'inbox']); +const RUNNING_STAGES = new Set(['assigned', 'building']); +const ITEM_ORDER = { review: 0, testing: 1, failed: 2, inbox: 3 }; + +const trackerOf = (j) => j.trackerItemId || j.trackerItem || (j.data && j.data.trackerItem) || ''; +const capsOf = (j) => (Array.isArray(j.capabilities) ? j.capabilities.join(', ') : String(j.capabilities || '')); + +// toBoard({jobs, factories, metrics}) — normalize the raw API payloads into the +// shape the dashboard renders. Pure (no I/O), so it is fully unit-testable. +export function toBoard({ jobs = [], factories = [], metrics = null } = {}) { + const counts = { inbox: 0, building: 0, review: 0, testing: 0, shipped: 0, failed: 0 }; + const items = []; + const running = []; + const recent = []; + for (const j of jobs) { + const bucket = mapStage(j.stage); + if (counts[bucket] !== undefined) counts[bucket] += 1; + const norm = { + // `stage` is the bucket so the dashboard's gate()/STAGE_TAG work unchanged; + // `fleetStage` keeps the true server stage for display. + stage: bucket, + fleetStage: j.stage, + id: j.id, + priority: j.priority || 'medium', + profile: j.profile || '', + capabilities: capsOf(j), + tracker_item: trackerOf(j), + leaseEpoch: j.leaseEpoch, + factoryId: j.factoryId || j.leaseFactoryId || '', + attempts: j.attempts, + updatedAt: j.updatedAt || j.createdAt || '', + raw: j, + }; + if (RUNNING_STAGES.has(j.stage)) running.push(norm); + if (ACTIONABLE.has(bucket)) items.push(norm); + if (bucket === 'shipped' || bucket === 'failed') recent.push(norm); + } + items.sort((a, b) => (ITEM_ORDER[a.stage] - ITEM_ORDER[b.stage]) || cmp(a.id, b.id)); + recent.sort((a, b) => cmp(String(b.updatedAt), String(a.updatedAt))); + return { counts, items, running, recent: recent.slice(0, 5), factories, metrics }; +} + +const cmp = (a, b) => (a < b ? -1 : a > b ? 1 : 0); + +// ── HTTP ──────────────────────────────────────────────────────────────────-- +// fleetFetch — a single request against the coordinator. NEVER throws: network +// errors / timeouts / non-JSON bodies are returned as a structured result so the +// TUI stays responsive. A timeout is enforced via AbortController. +export async function fleetFetch(cfg, pathname, opts = {}, fetchImpl = globalThis.fetch) { + const url = `${cfg.api}${pathname}`; + const headers = { + Authorization: `Bearer ${cfg.token}`, + 'X-Product-Id': cfg.productId, + Accept: 'application/json', + }; + const hasBody = opts.body !== undefined; + if (hasBody) headers['Content-Type'] = 'application/json'; + const ac = new AbortController(); + const timer = setTimeout(() => ac.abort(), opts.timeoutMs || DEFAULT_TIMEOUT_MS); + try { + const res = await fetchImpl(url, { + method: opts.method || 'GET', + headers, + body: hasBody ? JSON.stringify(opts.body) : undefined, + signal: ac.signal, + }); + let json = null; + let text = ''; + try { text = await res.text(); } catch { /* ignore body read errors */ } + if (text) { try { json = JSON.parse(text); } catch { json = null; } } + return { ok: !!res.ok, status: res.status, json }; + } catch (e) { + const timedOut = e && (e.name === 'AbortError' || e.code === 'ABORT_ERR'); + return { ok: false, status: 0, json: null, error: timedOut ? 'timeout' : (e && e.message) || 'network error' }; + } finally { + clearTimeout(timer); + } +} + +// fetchBoard — assemble the board model. Jobs are REQUIRED (failure ⇒ board +// fails). Metrics + factories are best-effort: a missing/404/501 factories +// endpoint degrades to [] (it is optional server-side), and absent metrics +// simply omits the aggregate panel. +export async function fetchBoard(cfg, fetchImpl = globalThis.fetch) { + const jobsRes = await fleetFetch(cfg, '/fleet/jobs', {}, fetchImpl); + if (!jobsRes.ok || !jobsRes.json) { + return { ok: false, error: jobsRes.error || `jobs HTTP ${jobsRes.status}` }; + } + const jobs = Array.isArray(jobsRes.json.jobs) ? jobsRes.json.jobs : []; + + const metricsRes = await fleetFetch(cfg, '/fleet/metrics', {}, fetchImpl); + const metrics = metricsRes.ok && metricsRes.json ? metricsRes.json : null; + + const facRes = await fleetFetch(cfg, '/fleet/factories', {}, fetchImpl); + const factories = facRes.ok && facRes.json && Array.isArray(facRes.json.factories) + ? facRes.json.factories + : []; + + return { ok: true, board: toBoard({ jobs, factories, metrics }) }; +} + +// formatEvent — one fleet event → a single log line for the TUI log view. +export function formatEvent(e) { + const at = e.at ? safeTime(e.at) : ''; + const actor = e.actor ? ` ${e.actor}` : ''; + const data = e.data && typeof e.data === 'object' && Object.keys(e.data).length + ? ` ${JSON.stringify(e.data)}` + : ''; + return `${at} ${e.type || '?'}${actor}${data}`.trim(); +} + +const safeTime = (iso) => { + const d = new Date(iso); + return Number.isNaN(d.getTime()) ? String(iso) : d.toLocaleTimeString(); +}; + +// fetchEvents — the job's event stream rendered as log lines. +export async function fetchEvents(cfg, jobId, fetchImpl = globalThis.fetch) { + const res = await fleetFetch(cfg, `/fleet/jobs/${encodeURIComponent(jobId)}/events`, {}, fetchImpl); + if (!res.ok || !res.json) { + return { ok: false, error: res.error || `events HTTP ${res.status}`, lines: [] }; + } + const events = Array.isArray(res.json.events) ? res.json.events : []; + return { ok: true, lines: events.map(formatEvent) }; +} + +// Operator verbs the dashboard supports in fleet mode. `promote` has no safe +// server contract (client-inferred stage transitions could violate workflow +// invariants), so it is explicitly unavailable here. +const FLEET_VERBS = new Set(['ship', 'requeue', 'reject']); + +// jobAction — execute an operator verb against the coordinator. +// ship → re-GET the job for a FRESH leaseEpoch, then PATCH stage=shipped +// (a stale snapshot epoch would be fenced with 409). +// requeue → POST /actions/requeue (lease-free operator action) +// reject → POST /actions/reject +// Returns {ok, message}; never throws. +export async function jobAction(cfg, item, verb, fetchImpl = globalThis.fetch) { + if (verb === 'promote') return { ok: false, message: 'promote is not available in fleet mode' }; + if (!FLEET_VERBS.has(verb)) return { ok: false, message: `${verb} not supported in fleet mode` }; + const id = item && item.id; + if (!id) return { ok: false, message: 'no job selected' }; + + if (verb === 'ship') { + const cur = await fleetFetch(cfg, `/fleet/jobs/${encodeURIComponent(id)}`, {}, fetchImpl); + if (!cur.ok || !cur.json) return { ok: false, message: cur.error || `job HTTP ${cur.status}` }; + const res = await fleetFetch( + cfg, + `/fleet/jobs/${encodeURIComponent(id)}`, + { method: 'PATCH', body: { stage: 'shipped', leaseEpoch: cur.json.leaseEpoch } }, + fetchImpl, + ); + if (res.status === 409) return { ok: false, message: 'job changed (fenced) — refresh and retry' }; + if (!res.ok) return { ok: false, message: res.error || `ship HTTP ${res.status}` }; + return { ok: true, message: `shipped ${id}` }; + } + + const res = await fleetFetch( + cfg, + `/fleet/jobs/${encodeURIComponent(id)}/actions/${verb}`, + { method: 'POST', body: {} }, + fetchImpl, + ); + if (res.status === 409) return { ok: false, message: 'job conflict/terminal — refresh and retry' }; + if (!res.ok) return { ok: false, message: res.error || `${verb} HTTP ${res.status}` }; + return { ok: true, message: `${verb} ${id}` }; +} diff --git a/agent-queue/lib/fleet-dash.test.mjs b/agent-queue/lib/fleet-dash.test.mjs new file mode 100644 index 0000000..40ed8a3 --- /dev/null +++ b/agent-queue/lib/fleet-dash.test.mjs @@ -0,0 +1,287 @@ +// fleet-dash.test.mjs — dependency-light unit tests for the fleet-mode dashboard +// adapter. Uses node:assert only (no test framework), matching the repo style. +// Run: `node fleet-dash.test.mjs` (also wired into selftest.sh). +// +// These tests prove the dashboard's CONTRACT ASSUMPTIONS against the /fleet API +// (request shaping, response mapping, graceful degradation, action semantics) +// via an injected fetch stub. They do NOT prove live server compatibility. + +import assert from 'node:assert/strict'; +import { + fleetConfig, + mapStage, + toBoard, + fleetFetch, + fetchBoard, + fetchEvents, + formatEvent, + jobAction, +} from './fleet-dash.mjs'; + +let passed = 0; +const t = (name, fn) => { + try { + const r = fn(); + if (r && typeof r.then === 'function') { + return r.then( + () => { passed += 1; }, + (e) => { console.error(` ✗ ${name}\n ${e && e.message}`); process.exitCode = 1; }, + ); + } + passed += 1; + } catch (e) { + console.error(` ✗ ${name}\n ${e && e.message}`); + process.exitCode = 1; + } + return undefined; +}; + +// A recording fetch stub. `routes` maps a matcher → {status, body} (or a fn). +function makeFetch(routes) { + const calls = []; + const fetchImpl = async (url, opts = {}) => { + calls.push({ url, opts, headers: opts.headers || {}, method: opts.method || 'GET' }); + let entry = routes[url]; + if (!entry) { + // try suffix match (path only) + const key = Object.keys(routes).find((k) => url.endsWith(k)); + entry = key ? routes[key] : undefined; + } + if (typeof entry === 'function') entry = entry({ url, opts }); + if (entry === undefined) return mkRes(404, '{}'); + if (entry.throw) throw Object.assign(new Error(entry.throw), { name: entry.name || 'Error' }); + return mkRes(entry.status ?? 200, entry.body ?? ''); + }; + fetchImpl.calls = calls; + return fetchImpl; +} +const mkRes = (status, body) => ({ + ok: status >= 200 && status < 300, + status, + text: async () => (typeof body === 'string' ? body : JSON.stringify(body)), +}); + +const CFG = { enabled: true, ok: true, api: 'http://svc/api', token: 'tok', productId: 'prodX', missing: [] }; + +await (async () => { + // ── fleetConfig ── + t('fleetConfig: AQ_FLEET_DASH unset ⇒ disabled', () => { + const c = fleetConfig({}); + assert.equal(c.enabled, false); + assert.equal(c.ok, false); + }); + t('fleetConfig: enabled but missing config ⇒ not ok, lists missing', () => { + const c = fleetConfig({ AQ_FLEET_DASH: '1' }); + assert.equal(c.enabled, true); + assert.equal(c.ok, false); + assert.deepEqual(c.missing.sort(), ['AQ_FLEET_API', 'AQ_FLEET_TOKEN', 'AQ_PRODUCT_ID'].sort()); + }); + t('fleetConfig: enabled + complete ⇒ ok, trims trailing slash', () => { + const c = fleetConfig({ AQ_FLEET_DASH: '1', AQ_FLEET_API: 'http://svc/api/', AQ_FLEET_TOKEN: 'k', AQ_PRODUCT_ID: 'p' }); + assert.equal(c.ok, true); + assert.equal(c.api, 'http://svc/api'); + }); + + // ── mapStage ── + t('mapStage: fleet stages collapse to board buckets', () => { + assert.equal(mapStage('queued'), 'inbox'); + assert.equal(mapStage('assigned'), 'building'); + assert.equal(mapStage('building'), 'building'); + assert.equal(mapStage('review'), 'review'); + assert.equal(mapStage('testing'), 'testing'); + assert.equal(mapStage('shipped'), 'shipped'); + assert.equal(mapStage('failed'), 'failed'); + assert.equal(mapStage('dead_letter'), 'failed'); + assert.equal(mapStage('weird'), 'inbox'); + }); + + // ── toBoard ── + t('toBoard: counts, actionable items, running, recent', () => { + const jobs = [ + { id: 'a', stage: 'queued', priority: 'high', capabilities: ['os:any'] }, + { id: 'b', stage: 'building', priority: 'critical', factoryId: 'mac-1', leaseEpoch: 3 }, + { id: 'c', stage: 'review', updatedAt: '2026-01-01T00:00:02Z' }, + { id: 'd', stage: 'testing' }, + { id: 'e', stage: 'shipped', updatedAt: '2026-01-01T00:00:09Z' }, + { id: 'f', stage: 'failed', updatedAt: '2026-01-01T00:00:05Z' }, + { id: 'g', stage: 'dead_letter', updatedAt: '2026-01-01T00:00:01Z' }, + ]; + const b = toBoard({ jobs }); + assert.equal(b.counts.inbox, 1); + assert.equal(b.counts.building, 1); + assert.equal(b.counts.review, 1); + assert.equal(b.counts.testing, 1); + assert.equal(b.counts.shipped, 1); + assert.equal(b.counts.failed, 2); // failed + dead_letter + // running = assigned/building only + assert.deepEqual(b.running.map((x) => x.id), ['b']); + assert.equal(b.running[0].fleetStage, 'building'); + assert.equal(b.running[0].factoryId, 'mac-1'); + // actionable items exclude building/shipped, ordered review x.id), ['c', 'd', 'f', 'g', 'a']); + // item.stage is the bucket (so dashboard gate()/STAGE_TAG reuse works) + assert.equal(b.items[0].stage, 'review'); + assert.equal(b.items[4].stage, 'inbox'); + // recent = shipped+failed, newest first, capped at 5 + assert.deepEqual(b.recent.map((x) => x.id), ['e', 'f', 'g']); + }); + + // ── fleetFetch: headers + product scoping on every request ── + await t('fleetFetch: sends bearer + X-Product-Id; parses JSON', async () => { + const f = makeFetch({ '/fleet/jobs': { status: 200, body: { jobs: [] } } }); + const r = await fleetFetch(CFG, '/fleet/jobs', {}, f); + assert.equal(r.ok, true); + assert.deepEqual(r.json, { jobs: [] }); + const h = f.calls[0].headers; + assert.equal(h.Authorization, 'Bearer tok'); + assert.equal(h['X-Product-Id'], 'prodX'); + assert.equal(f.calls[0].url, 'http://svc/api/fleet/jobs'); + }); + await t('fleetFetch: network error ⇒ ok:false with message (no throw)', async () => { + const f = makeFetch({ '/fleet/jobs': { throw: 'boom' } }); + const r = await fleetFetch(CFG, '/fleet/jobs', {}, f); + assert.equal(r.ok, false); + assert.equal(r.status, 0); + assert.match(r.error, /boom/); + }); + await t('fleetFetch: abort ⇒ timeout error', async () => { + const f = makeFetch({ '/fleet/jobs': { throw: 'aborted', name: 'AbortError' } }); + const r = await fleetFetch(CFG, '/fleet/jobs', {}, f); + assert.equal(r.ok, false); + assert.equal(r.error, 'timeout'); + }); + await t('fleetFetch: non-JSON 500 body ⇒ ok:false, json null', async () => { + const f = makeFetch({ '/fleet/jobs': { status: 500, body: 'err' } }); + const r = await fleetFetch(CFG, '/fleet/jobs', {}, f); + assert.equal(r.ok, false); + assert.equal(r.status, 500); + assert.equal(r.json, null); + }); + + // ── fetchBoard: assembly + degradation ── + await t('fetchBoard: jobs + metrics + factories assembled', async () => { + const f = makeFetch({ + '/fleet/jobs': { body: { jobs: [{ id: 'a', stage: 'review' }] } }, + '/fleet/metrics': { body: { utilizationPct: 50, alerts: [] } }, + '/fleet/factories': { body: { factories: [{ factoryId: 'mac-1', health: 'ok' }] } }, + }); + const r = await fetchBoard(CFG, f); + assert.equal(r.ok, true); + assert.equal(r.board.items.length, 1); + assert.equal(r.board.metrics.utilizationPct, 50); + assert.equal(r.board.factories.length, 1); + }); + await t('fetchBoard: factories 404 ⇒ degrades to []', async () => { + const f = makeFetch({ + '/fleet/jobs': { body: { jobs: [] } }, + '/fleet/metrics': { body: {} }, + '/fleet/factories': { status: 404, body: {} }, + }); + const r = await fetchBoard(CFG, f); + assert.equal(r.ok, true); + assert.deepEqual(r.board.factories, []); + }); + await t('fetchBoard: factories 501 ⇒ degrades to []', async () => { + const f = makeFetch({ + '/fleet/jobs': { body: { jobs: [] } }, + '/fleet/metrics': { body: {} }, + '/fleet/factories': { status: 501, body: {} }, + }); + const r = await fetchBoard(CFG, f); + assert.equal(r.ok, true); + assert.deepEqual(r.board.factories, []); + }); + await t('fetchBoard: metrics failure ⇒ board still ok, metrics null', async () => { + const f = makeFetch({ + '/fleet/jobs': { body: { jobs: [] } }, + '/fleet/metrics': { status: 500, body: 'oops' }, + '/fleet/factories': { body: { factories: [] } }, + }); + const r = await fetchBoard(CFG, f); + assert.equal(r.ok, true); + assert.equal(r.board.metrics, null); + }); + await t('fetchBoard: jobs failure ⇒ board fails with error', async () => { + const f = makeFetch({ '/fleet/jobs': { status: 503, body: '{}' } }); + const r = await fetchBoard(CFG, f); + assert.equal(r.ok, false); + assert.match(r.error, /503/); + }); + + // ── events ── + t('formatEvent: renders type + actor + data', () => { + const line = formatEvent({ type: 'claimed', actor: 'mac-1', at: '2026-01-01T00:00:00Z', data: { leaseEpoch: 2 } }); + assert.match(line, /claimed/); + assert.match(line, /mac-1/); + assert.match(line, /leaseEpoch/); + }); + await t('fetchEvents: maps events to lines', async () => { + const f = makeFetch({ + '/events': { body: { events: [{ type: 'queued', at: '2026-01-01T00:00:00Z', data: {} }, { type: 'claimed', data: {} }] } }, + }); + const r = await fetchEvents(CFG, 'job-1', f); + assert.equal(r.ok, true); + assert.equal(r.lines.length, 2); + assert.match(r.lines[1], /claimed/); + assert.match(f.calls[0].url, /\/fleet\/jobs\/job-1\/events$/); + }); + await t('fetchEvents: failure ⇒ ok:false, empty lines', async () => { + const f = makeFetch({ '/events': { status: 500, body: 'x' } }); + const r = await fetchEvents(CFG, 'job-1', f); + assert.equal(r.ok, false); + assert.deepEqual(r.lines, []); + }); + + // ── jobAction ── + await t('jobAction: ship re-GETs fresh leaseEpoch then PATCHes shipped', async () => { + let patchBody = null; + const f = makeFetch({ + 'http://svc/api/fleet/jobs/j1': ({ opts }) => { + if ((opts.method || 'GET') === 'PATCH') { patchBody = JSON.parse(opts.body); return { status: 200, body: { id: 'j1', stage: 'shipped' } }; } + return { status: 200, body: { id: 'j1', stage: 'testing', leaseEpoch: 7 } }; // fresh epoch + }, + }); + const r = await jobAction(CFG, { id: 'j1', leaseEpoch: 2 /* stale */ }, 'ship', f); + assert.equal(r.ok, true); + assert.equal(patchBody.stage, 'shipped'); + assert.equal(patchBody.leaseEpoch, 7); // used the freshly-fetched epoch, not the stale 2 + }); + await t('jobAction: ship 409 ⇒ actionable fenced message', async () => { + const f = makeFetch({ + 'http://svc/api/fleet/jobs/j1': ({ opts }) => (opts.method === 'PATCH' + ? { status: 409, body: '{}' } + : { status: 200, body: { id: 'j1', leaseEpoch: 7 } }), + }); + const r = await jobAction(CFG, { id: 'j1' }, 'ship', f); + assert.equal(r.ok, false); + assert.match(r.message, /fenced|refresh/i); + }); + await t('jobAction: requeue ⇒ POST /actions/requeue', async () => { + const f = makeFetch({ '/fleet/jobs/j1/actions/requeue': { status: 200, body: { id: 'j1', stage: 'queued' } } }); + const r = await jobAction(CFG, { id: 'j1' }, 'requeue', f); + assert.equal(r.ok, true); + assert.equal(f.calls[0].method, 'POST'); + assert.match(f.calls[0].url, /\/actions\/requeue$/); + }); + await t('jobAction: reject 409 ⇒ conflict message', async () => { + const f = makeFetch({ '/fleet/jobs/j1/actions/reject': { status: 409, body: '{}' } }); + const r = await jobAction(CFG, { id: 'j1' }, 'reject', f); + assert.equal(r.ok, false); + assert.match(r.message, /conflict|terminal|refresh/i); + }); + t('jobAction: promote ⇒ explicitly unavailable in fleet mode', async () => { + return jobAction(CFG, { id: 'j1' }, 'promote', makeFetch({})).then((r) => { + assert.equal(r.ok, false); + assert.match(r.message, /promote/i); + }); + }); +})(); + +// Summary line (selftest greps for PASS). +process.on('exit', () => { + if (process.exitCode && process.exitCode !== 0) { + console.error('fleet-dash.test FAIL'); + } else { + console.log(`fleet-dash.test PASS (${passed} assertions)`); + } +}); diff --git a/agent-queue/selftest.sh b/agent-queue/selftest.sh index 4870625..beedf8f 100755 --- a/agent-queue/selftest.sh +++ b/agent-queue/selftest.sh @@ -41,6 +41,12 @@ bash -n "${BASH_SOURCE[0]}" && pass "bash -n selftest.sh" # 3. dashboard syntax (optional) if command -v node >/dev/null 2>&1; then node --check "$HERE/dashboard.mjs" && pass "node --check dashboard.mjs" + node --check "$HERE/lib/fleet-dash.mjs" && pass "node --check lib/fleet-dash.mjs" + if out="$(node "$HERE/lib/fleet-dash.test.mjs" 2>&1)" && printf '%s' "$out" | grep -q 'fleet-dash.test PASS'; then + pass "fleet-dash.test (${out##*PASS })" + else + printf '%s\n' "$out"; fail "fleet-dash.test" + fi else info "node not installed — skipping dashboard check" fi