feat(agent-queue): re-point TUI dashboard at /fleet API (parity)

Add an opt-in fleet mode to the dashboard so an operator can drive the
coordinator fleet from the same TUI used for the local folder queue.

- lib/fleet-dash.mjs: dependency-injectable read/act adapter over the
  platform-service /fleet REST surface (jobs, metrics, factories, events,
  ship/requeue/reject). Pure-ish + fully unit-testable without a live service.
- dashboard.mjs: render + act in fleet mode when AQ_FLEET_DASH=1 — board with
  counts, factories (per-factory rows or metrics aggregate), alerts, running
  (by lease/factory), actionable JOBS with manifest tags, recent, and a
  per-job events log. Single-flight async refresh keeps the last good board on
  failure; ship re-GETs a fresh leaseEpoch before PATCH; run/stop/promote are
  disabled (no safe server contract). Local mode is byte-for-byte unchanged.
- lib/fleet-dash.test.mjs: 22 node:assert assertions (config, stage mapping,
  toBoard, fetch headers/timeout/errors, board assembly + graceful degradation,
  events, job actions) wired into selftest.sh.
- docs: tick the Phase 3 "TUI re-pointed at /fleet" roadmap boxes.

Verified: selftest.sh green (incl. new fleet-dash checks); live non-TTY render
smoke against a stub /fleet server (both factories and metrics-aggregate paths);
local mode unchanged.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
Saravanakumar D 2026-05-30 19:47:05 -07:00
parent 1a5b791e2e
commit 66c91233da
5 changed files with 765 additions and 16 deletions

View File

@ -23,6 +23,7 @@ import fs from 'node:fs';
import path from 'node:path';
import { fileURLToPath } from 'node:url';
import { execFileSync, spawn } from 'node:child_process';
import { fleetConfig, fetchBoard, fetchEvents, jobAction } from './lib/fleet-dash.mjs';
const __dirname = path.dirname(fileURLToPath(import.meta.url));
@ -200,6 +201,44 @@ const aq = (args) => {
}
};
// ── fleet mode (Phase 3: TUI re-pointed at /fleet) ──────────────────
// Opt-in via AQ_FLEET_DASH=1. When ON, the board is sourced from the
// platform-service /fleet REST API instead of the local queue; when OFF, every
// fleet code path below is skipped and the dashboard is byte-for-byte the local
// tool. All fleet I/O lives in lib/fleet-dash.mjs (injectable + unit-tested).
const FLEET = fleetConfig();
// Latest fleet board snapshot. On a refresh failure we KEEP the last good board
// (no destructive flicker) and surface a staleness banner instead.
let fleetState = { board: null, error: null, loading: FLEET.enabled, lastOk: 0 };
let fleetRefreshing = false; // single-flight guard (no overlapping fetches)
let fleetEvents = { jobId: null, lines: [], error: null, loading: false };
// refreshFleet() — single-flight board refresh. Applies a successful board;
// on failure preserves the previous board and records the error.
const refreshFleet = async () => {
if (fleetRefreshing) return;
fleetRefreshing = true;
try {
const r = await fetchBoard(FLEET);
if (r.ok) fleetState = { board: r.board, error: null, loading: false, lastOk: Date.now() };
else fleetState = { ...fleetState, error: r.error, loading: false };
} catch (e) {
fleetState = { ...fleetState, error: (e && e.message) || 'refresh failed', loading: false };
} finally {
fleetRefreshing = false;
}
};
// refreshFleetEvents(job) — load a job's event stream into the log view state.
const refreshFleetEvents = (job) => {
fleetEvents = { jobId: job, lines: [], error: null, loading: true };
fetchEvents(FLEET, job).then((r) => {
fleetEvents = { jobId: job, lines: r.lines || [], error: r.ok ? null : r.error, loading: false };
if (mode === 'log' && logJob === job) draw();
});
};
// daemonPid() — pid of a live `run` loop, or null.
const daemonPid = () => {
try {
@ -238,6 +277,11 @@ const setFlash = (msg, ms = 4000) => { flash = msg; flashUntil = Date.now() + ms
const flashLine = () => (flash && Date.now() < flashUntil ? flash : '');
const buildItems = () => {
if (FLEET.enabled) {
const b = fleetState.board;
if (!b) return [];
return b.items.map((it) => ({ stage: it.stage, job: it.id, fleet: it }));
}
const list = [];
for (const st of ACTION_STAGES) {
for (const f of listMd(DIRS[st]).sort()) list.push({ stage: st, job: f.replace(/\.md$/, '') });
@ -270,16 +314,31 @@ const gate = (verb, stage) => ({
logs: true,
}[verb]);
// doAction(verb) — run the gated action on the selected job via agent-queue.sh.
// doAction(verb) — run the gated action on the selected job. In fleet mode it
// calls the /fleet API (lib/fleet-dash.mjs); otherwise it shells out to
// agent-queue.sh. promote is unavailable in fleet mode (no safe server contract).
const doAction = (verb) => {
const it = items[selIdx];
if (!it) { setFlash(c('gray', 'no job selected')); return; }
if (FLEET.enabled && verb === 'promote') {
setFlash(c('gray', 'promote n/a in fleet mode (use ship/requeue/reject)'));
return;
}
if (!gate(verb, it.stage)) { setFlash(c('gray', `${verb} not valid for a ${it.stage} job`)); return; }
if ((verb === 'reject' || verb === 'requeue') && mode !== 'confirm') {
confirmAction = { verb, job: it.job, run: () => doAction(verb) };
mode = 'confirm';
return;
}
if (FLEET.enabled) {
setFlash(c('gray', `${verb}`));
mode = 'board'; confirmAction = null;
jobAction(FLEET, it.fleet, verb).then((r) => {
setFlash((r.ok ? c('green', '✓ ') : c('red', '✗ ')) + (r.message || `${verb} ${it.job}`));
refreshFleet().then(draw);
});
return;
}
const r = aq([verb, it.job]);
setFlash((r.ok ? c('green', '✓ ') : c('red', '✗ ')) + (lastLine(r.out) || `${verb} ${it.job}`));
mode = 'board'; confirmAction = null;
@ -288,7 +347,148 @@ const doAction = (verb) => {
// ── render ──────────────────────────────────────────────────────────
const ENGINE_COLOR = { devin: 'cyan', claude: 'yellow', codex: 'green' };
const FLEET_STAGE_COLOR = {
queued: 'blue', assigned: 'yellow', building: 'yellow',
review: 'cyan', testing: 'cyan', shipped: 'green',
failed: 'red', dead_letter: 'red',
};
// drawFleetBoard() — the board sourced from the /fleet API (AQ_FLEET_DASH=1).
// Mirrors the local board layout; running rows reflect lease/factory status
// (there is no local PID/liveness in fleet mode).
function drawFleetBoard() {
items = buildItems();
syncSelection();
const board = fleetState.board;
const counts = board ? board.counts : { inbox: 0, building: 0, review: 0, testing: 0, shipped: 0, failed: 0 };
const running = board ? board.running : [];
const recent = board ? board.recent : [];
const out = [];
out.push('');
out.push(` ${C.bold}AGENT QUEUE${C.reset} ${c('cyan', 'fleet')} ${c('gray', FLEET.api)}`);
const staleSec = fleetState.lastOk ? Math.floor((Date.now() - fleetState.lastOk) / 1000) : null;
let statusBit;
if (fleetState.loading && !board) statusBit = c('gray', '◌ loading…');
else if (fleetState.error) statusBit = c('red', `${trunc(fleetState.error, 40)}${board ? ` (stale ${staleSec}s)` : ''}`);
else statusBit = c('green', `● live${staleSec !== null ? ` (${staleSec}s ago)` : ''}`);
out.push(
` ${c('gray', new Date().toLocaleTimeString())} refresh ${INTERVAL / 1000}s ${statusBit}` +
` ${c('gray', `product ${FLEET.productId}`)} ${c('gray', INTERACTIVE ? 'press ? for help' : 'read-only')}`
);
out.push('');
out.push(
` ${c('blue', '▢ inbox')} ${String(counts.inbox).padEnd(3)}` +
` ${c('yellow', '◧ building')} ${String(counts.building).padEnd(3)}` +
` ${c('cyan', '◔ review')} ${String(counts.review).padEnd(3)}` +
` ${c('cyan', '◕ testing')} ${String(counts.testing).padEnd(3)}` +
` ${c('green', '▣ shipped')} ${String(counts.shipped).padEnd(3)}` +
` ${c('red', '✕ failed')} ${String(counts.failed).padEnd(3)}` +
` ${C.bold}running ${running.length}${C.reset}`
);
out.push('');
// factories (per-factory rows when /fleet/factories exists; else metrics aggregate)
out.push(` ${C.bold}FACTORIES${C.reset}`);
const factories = board ? board.factories : [];
const metrics = board ? board.metrics : null;
if (factories.length > 0) {
for (const f of factories) {
const health = String(f.health || 'ok');
const hc = health === 'ok' ? 'green' : health === 'degraded' ? 'yellow' : 'red';
out.push(
` ${c('bold', trunc(f.factoryId || f.id || '?', 24).padEnd(24))} ` +
`${c(hc, health.padEnd(9))} ` +
`${c('gray', `load ${f.load ?? '?'}/${f.seatLimit ?? '?'}`)} ` +
`${c('gray', trunc((Array.isArray(f.capabilities) ? f.capabilities.join(', ') : f.capabilities) || '', 36))}`
);
}
} else if (metrics && metrics.factory) {
const fm = metrics.factory;
const bh = fm.byHealth || {};
out.push(
` ${c('green', `ok ${bh.ok ?? 0}`)} ${c('yellow', `degraded ${bh.degraded ?? 0}`)} ${c('red', `down ${bh.down ?? 0}`)}` +
` ${c('gray', `live ${fm.live ?? '?'} · stale ${fm.stale ?? '?'}`)}` +
` ${c('gray', `seats ${fm.seatsUsed ?? '?'}/${fm.seatsTotal ?? '?'}`)}` +
` ${c('gray', `util ${metrics.utilizationPct ?? '?'}%`)}`
);
} else {
out.push(` ${c('dim', 'no factory data')}`);
}
// alerts (from metrics)
if (metrics && Array.isArray(metrics.alerts) && metrics.alerts.length > 0) {
for (const a of metrics.alerts) {
const sev = a.severity === 'critical' ? 'red' : 'yellow';
out.push(` ${c(sev, '⚠ ')}${c(sev, a.kind || 'alert')}${a.message ? c('gray', `${trunc(a.message, 50)}`) : ''}`);
}
}
out.push('');
// running (lease/factory status — no local pid)
out.push(` ${C.bold}RUNNING${C.reset}`);
if (running.length === 0) {
out.push(` ${c('dim', 'no jobs in flight')}`);
} else {
for (const r of running) {
const sc = FLEET_STAGE_COLOR[r.fleetStage] || 'gray';
out.push(
` ${c('bold', trunc(r.id, 30).padEnd(30))} ` +
`${c(sc, String(r.fleetStage).padEnd(9))} ` +
`${c('gray', r.factoryId ? `@${trunc(r.factoryId, 18)}` : 'unassigned')}`
);
const mtags = manifestTags(r);
if (mtags) out.push(` ${mtags}`);
}
}
out.push('');
// actionable jobs (numbered + selectable) — reuses STAGE_TAG buckets
out.push(` ${C.bold}JOBS${C.reset} ${c('gray', '(review · testing · failed · inbox)')}`);
if (items.length === 0) {
out.push(` ${c('dim', board ? 'no actionable jobs' : 'waiting for fleet…')}`);
} else {
items.forEach((it, i) => {
const sel = i === selIdx;
const ptr = sel ? c('cyan', '▶') : ' ';
const num = c('gray', String(i + 1).padStart(2) + '.');
const tag = (STAGE_TAG[it.stage] || (() => `[${it.stage}]`))();
const name = sel ? `${C.bold}${trunc(it.job, 46)}${C.reset}` : trunc(it.job, 46);
out.push(` ${ptr} ${num} ${tag} ${name}`);
const jtags = manifestTags(it.fleet);
if (jtags) out.push(` ${jtags}`);
});
}
out.push('');
// recent (shipped + failed)
out.push(` ${C.bold}RECENT${C.reset}`);
if (recent.length === 0) {
out.push(` ${c('dim', 'nothing finished yet')}`);
} else {
for (const r of recent) {
const failedRes = r.stage === 'failed';
const mark = failedRes ? c('red', '✕') : c('green', '▣');
const label = failedRes ? c('red', r.fleetStage) : c('green', r.fleetStage);
const when = r.updatedAt ? new Date(r.updatedAt).toLocaleTimeString() : '';
out.push(` ${mark} ${trunc(r.id, 34).padEnd(34)} ${label} ${c('gray', when)}`);
}
}
out.push('');
// flash + footer
const fl = flashLine();
if (fl) out.push(` ${fl}`);
if (mode === 'confirm' && confirmAction) {
out.push(` ${c('yellow', `${confirmAction.verb} "${confirmAction.job}" ? `)}${C.bold}y${C.reset}${c('gray', '/')}${C.bold}n${C.reset}`);
} else if (INTERACTIVE) {
out.push(c('gray', ' ↑/↓ select · enter events · s ship · x reject · u requeue'));
out.push(c('gray', ' g refresh · ? help · q quit'));
}
process.stdout.write('\x1b[2J\x1b[H' + out.join('\n') + '\n');
}
function drawBoard() {
if (FLEET.enabled) return drawFleetBoard();
const metas = readMetas();
const metaByJob = Object.fromEntries(metas.filter((m) => m.job).map((m) => [m.job, m]));
const running = metas.filter((m) => !m.ended && pidAlive(m.pid));
@ -419,6 +619,16 @@ function drawBoard() {
function drawLog() {
const rows = (process.stdout.rows || 30) - 6;
if (FLEET.enabled) {
let body;
if (fleetEvents.loading) body = c('gray', ' loading events…');
else if (fleetEvents.error) body = c('red', ` ${fleetEvents.error}`);
else if (!fleetEvents.lines.length) body = c('gray', ' no events for this job');
else body = fleetEvents.lines.slice(-rows).join('\n');
const head = ` ${C.bold}EVENTS${C.reset} ${c('cyan', logJob)} ${c('gray', 'q/esc back · g refresh')}`;
process.stdout.write('\x1b[2J\x1b[H' + head + '\n' + c('gray', ' ' + '─'.repeat(60)) + '\n' + body + '\n');
return;
}
let body = `no log for ${logJob}`;
try {
const txt = fs.readFileSync(path.join(DIRS.logs, `${logJob}.log`), 'utf8');
@ -430,9 +640,10 @@ function drawLog() {
function drawHelp() {
const L = [
'', ` ${C.bold}AGENT QUEUE — keys${C.reset}`, '',
'', ` ${C.bold}AGENT QUEUE — keys${C.reset}`,
FLEET.enabled ? ` ${c('cyan', 'fleet mode')} ${c('gray', '— board sourced from /fleet API; run/stop/promote disabled')}` : '', '',
` ${c('cyan', '↑/↓, j/k, 1-9')} select a job in the JOBS list`,
` ${c('cyan', 'enter / l')} view the selected job's log (live)`,
` ${c('cyan', 'enter / l')} ${FLEET.enabled ? "view the selected job's events" : "view the selected job's log (live)"}`,
` ${c('cyan', 'p')} promote (review → testing → shipped)`,
` ${c('cyan', 's')} ship (testing/QA → shipped, the manual gate)`,
` ${c('cyan', 'x')} reject (review/testing → failed) ${c('gray', '[confirm]')}`,
@ -459,16 +670,38 @@ const draw = () => {
};
// ── main loop + key handling ────────────────────────────────────────
if (!fs.existsSync(ROOT)) {
process.stdout.write(`agent-queue: queue root not found: ${ROOT}\nRun \`agent-queue.sh init\` first.\n`);
process.exit(1);
// Fleet mode (AQ_FLEET_DASH=1) sources the board from the /fleet API on an async,
// single-flight tick loop. Local mode keeps the original synchronous setInterval
// path byte-for-byte. `timer` holds whichever timer is live so quit() can clear it.
let timer = null;
if (FLEET.enabled) {
if (!FLEET.ok) {
process.stdout.write(
`agent-queue: fleet dashboard enabled (AQ_FLEET_DASH=1) but missing config:\n` +
` ${FLEET.missing.join(', ')}\n` +
`Set AQ_FLEET_API, AQ_FLEET_TOKEN and AQ_PRODUCT_ID, or unset AQ_FLEET_DASH.\n`
);
process.exit(1);
}
draw(); // initial frame (loading…)
const tick = async () => {
await refreshFleet();
if (mode !== 'log' && mode !== 'help') draw();
timer = setTimeout(tick, INTERVAL); // single-flight: schedule only after the await
};
tick();
} else {
if (!fs.existsSync(ROOT)) {
process.stdout.write(`agent-queue: queue root not found: ${ROOT}\nRun \`agent-queue.sh init\` first.\n`);
process.exit(1);
}
draw();
timer = setInterval(draw, INTERVAL);
}
draw();
const timer = setInterval(draw, INTERVAL);
const quit = () => {
clearInterval(timer);
if (timer) { clearTimeout(timer); clearInterval(timer); }
try { if (process.stdin.isTTY) process.stdin.setRawMode(false); } catch { /* noop */ }
process.stdout.write(C.reset + '\n');
process.exit(0);
@ -487,7 +720,7 @@ function onKey(key) {
if (mode === 'help') { mode = 'board'; return draw(); }
if (mode === 'log') {
if (key === 'q' || key === '\u001b' || key === '\r' || key === '\n') { mode = 'board'; logJob = null; }
else if (key === 'g') { /* fallthrough to redraw */ }
else if (key === 'g') { if (FLEET.enabled && logJob) refreshFleetEvents(logJob); }
return draw();
}
if (mode === 'confirm') {
@ -504,14 +737,19 @@ function onKey(key) {
case 'j': case '\u001b[B': moveSel(1); break;
case 'k': case '\u001b[A': moveSel(-1); break;
case '\r': case '\n': case 'l':
if (items[selIdx]) { logJob = items[selIdx].job; mode = 'log'; }
if (items[selIdx]) {
logJob = items[selIdx].job; mode = 'log';
if (FLEET.enabled) refreshFleetEvents(logJob);
}
break;
case 'p': doAction('promote'); break;
case 's': doAction('ship'); break;
case 'x': doAction('reject'); break;
case 'u': doAction('requeue'); break;
case 'r': startRun(); break;
case 'S': { const res = aq(['stop']); setFlash(c('red', '■ ') + (lastLine(res.out) || 'stopped')); break; }
case 'r': if (FLEET.enabled) setFlash(c('gray', 'run loop n/a in fleet mode')); else startRun(); break;
case 'S':
if (FLEET.enabled) { setFlash(c('gray', 'stop n/a in fleet mode')); break; }
{ const res = aq(['stop']); setFlash(c('red', '■ ') + (lastLine(res.out) || 'stopped')); break; }
default:
if (/^[1-9]$/.test(key)) {
const i = parseInt(key, 10) - 1;

View File

@ -281,7 +281,7 @@ Three transports were evaluated. **Decision: platform-service-native coordinator
### Phase 3 — Unified control plane
- [ ] Add a **Fleet** surface to `tracker-web` reusing auth/Primitives/DataTable/product switcher: fleet map (factories + load/health), job table, job DAG, **live log streaming**, lease/heartbeat status, cost burndown, approve/ship buttons.
- [ ] **Streaming caveat (correctness):** live logs **must not** use the existing buffering catch-all proxy `/api/tracker/[...path]` — it does `res.text()` and would never stream. Use a **dedicated Next.js Route Handler returning a `ReadableStream` (SSE)** or a direct SSE/WebSocket to platform-service. Full logs are shipped to blob storage (§17); the endpoint serves stored tail + live append.
- [ ] The Node TUI dashboard becomes a thin client of the same `/fleet` API (parity with web).
- [x] The Node TUI dashboard becomes a thin client of the same `/fleet` API (parity with web). *(devops-tools `agent-queue/dashboard.mjs` + `lib/fleet-dash.mjs`, `AQ_FLEET_DASH=1`.)*
- **Acceptance:** an operator can watch all factories + tail any job log + ship from the browser.
- **Verify gate:** web e2e (Playwright) covering fleet map render, live log, and a ship action.
@ -398,7 +398,7 @@ Each phase: **Goal → checklist → Exit criteria**. Don't start a phase until
- [x] Cost burndown + budget kill-switch UI; multi-reviewer routing. *(common-plat `app/dashboard/fleet/budget/page.tsx` burndown + pause/resume; `ReviewGateCard` multi-reviewer quorum gate via `requestReview`/`submitReview`.)*
- [x] Scoring router with configurable weights + explainability surfaced in UI. *(common-plat `fleet/scheduler.ts` tunable weights + `GET /fleet/jobs/:id/explain`; `ExplainPanel` breakdown in job detail.)*
- [x] Preemption of low-priority by critical jobs (checkpoint + requeue). *(common-plat `fleet/scheduler.ts` `selectPreemptionVictim` + coordinator eviction under `FLEET_PREEMPTION`; victim requeued with checkpoint + bumped epoch, `preempted` event.)*
- [ ] TUI dashboard re-pointed at `/fleet` API (parity).
- [x] TUI dashboard re-pointed at `/fleet` API (parity). *(devops-tools `agent-queue/lib/fleet-dash.mjs` adapter + `dashboard.mjs` fleet mode under `AQ_FLEET_DASH=1`: board/factories/metrics/alerts, job actions ship/requeue/reject via `/fleet`, per-job events log; opt-in so local mode is byte-for-byte unchanged. Verified by `lib/fleet-dash.test.mjs` (22 assertions) wired into `selftest.sh` + live non-TTY render smoke.)*
- [x] Web e2e (Playwright): fleet map, live log, ship, budget-pause. *(common-plat `dashboards/tracker-web/e2e/fleet.spec.ts`: fleet overview, metrics, job detail, ship, budget-pause, review-gate specs green.)*
- **Exit criteria:** all boxes ✅; web `verify` (typecheck+lint+test+e2e) green; an operator runs the whole 3-repo parallel workload from the browser, including a budget pause + resume.

View File

@ -0,0 +1,218 @@
// fleet-dash.mjs — read/act adapter that re-points the agent-queue TUI dashboard
// at the platform-service `/fleet` REST API (roadmap Phase 3: "TUI dashboard
// re-pointed at /fleet API (parity)").
//
// This module is intentionally pure-ish and dependency-injectable (the HTTP
// `fetchImpl` is a parameter) so it is unit-testable WITHOUT a live service.
// dashboard.mjs uses it ONLY when AQ_FLEET_DASH=1; otherwise the dashboard's
// local-queue behavior is byte-for-byte unchanged.
//
// Auth + scoping mirror agent-queue/lib/fleet-client.sh:
// base URL AQ_FLEET_API (already includes /api)
// bearer AQ_FLEET_TOKEN
// product X-Product-Id: AQ_PRODUCT_ID (sent on every request)
const DEFAULT_TIMEOUT_MS = 8000;
// fleetConfig(env) — resolve the dashboard's fleet mode from the environment.
// enabled iff AQ_FLEET_DASH=1 (explicit opt-in). `ok` additionally requires the
// api/token/product config to be complete; `missing` lists what's absent so the
// dashboard can fail visibly instead of silently doing nothing.
export function fleetConfig(env = process.env) {
const enabled = String(env.AQ_FLEET_DASH || '') === '1';
const api = String(env.AQ_FLEET_API || '').replace(/\/+$/, '');
const token = String(env.AQ_FLEET_TOKEN || '');
const productId = String(env.AQ_PRODUCT_ID || '');
const missing = [];
if (enabled) {
if (!api) missing.push('AQ_FLEET_API');
if (!token) missing.push('AQ_FLEET_TOKEN');
if (!productId) missing.push('AQ_PRODUCT_ID');
}
return { enabled, api, token, productId, ok: enabled && missing.length === 0, missing };
}
// ── stage mapping ───────────────────────────────────────────────────────────
// Fleet stages collapse onto the local board's kanban buckets so the dashboard's
// existing layout/gating/STAGE_TAG logic can be reused unchanged.
const STAGE_BUCKET = {
queued: 'inbox',
assigned: 'building',
building: 'building',
review: 'review',
testing: 'testing',
shipped: 'shipped',
failed: 'failed',
dead_letter: 'failed',
};
export const mapStage = (s) => STAGE_BUCKET[s] || 'inbox';
// Stages an operator can act on from the dashboard (parity with the local
// ACTION_STAGES = review · testing · failed · inbox).
const ACTIONABLE = new Set(['review', 'testing', 'failed', 'inbox']);
const RUNNING_STAGES = new Set(['assigned', 'building']);
const ITEM_ORDER = { review: 0, testing: 1, failed: 2, inbox: 3 };
const trackerOf = (j) => j.trackerItemId || j.trackerItem || (j.data && j.data.trackerItem) || '';
const capsOf = (j) => (Array.isArray(j.capabilities) ? j.capabilities.join(', ') : String(j.capabilities || ''));
// toBoard({jobs, factories, metrics}) — normalize the raw API payloads into the
// shape the dashboard renders. Pure (no I/O), so it is fully unit-testable.
export function toBoard({ jobs = [], factories = [], metrics = null } = {}) {
const counts = { inbox: 0, building: 0, review: 0, testing: 0, shipped: 0, failed: 0 };
const items = [];
const running = [];
const recent = [];
for (const j of jobs) {
const bucket = mapStage(j.stage);
if (counts[bucket] !== undefined) counts[bucket] += 1;
const norm = {
// `stage` is the bucket so the dashboard's gate()/STAGE_TAG work unchanged;
// `fleetStage` keeps the true server stage for display.
stage: bucket,
fleetStage: j.stage,
id: j.id,
priority: j.priority || 'medium',
profile: j.profile || '',
capabilities: capsOf(j),
tracker_item: trackerOf(j),
leaseEpoch: j.leaseEpoch,
factoryId: j.factoryId || j.leaseFactoryId || '',
attempts: j.attempts,
updatedAt: j.updatedAt || j.createdAt || '',
raw: j,
};
if (RUNNING_STAGES.has(j.stage)) running.push(norm);
if (ACTIONABLE.has(bucket)) items.push(norm);
if (bucket === 'shipped' || bucket === 'failed') recent.push(norm);
}
items.sort((a, b) => (ITEM_ORDER[a.stage] - ITEM_ORDER[b.stage]) || cmp(a.id, b.id));
recent.sort((a, b) => cmp(String(b.updatedAt), String(a.updatedAt)));
return { counts, items, running, recent: recent.slice(0, 5), factories, metrics };
}
const cmp = (a, b) => (a < b ? -1 : a > b ? 1 : 0);
// ── HTTP ──────────────────────────────────────────────────────────────────--
// fleetFetch — a single request against the coordinator. NEVER throws: network
// errors / timeouts / non-JSON bodies are returned as a structured result so the
// TUI stays responsive. A timeout is enforced via AbortController.
export async function fleetFetch(cfg, pathname, opts = {}, fetchImpl = globalThis.fetch) {
const url = `${cfg.api}${pathname}`;
const headers = {
Authorization: `Bearer ${cfg.token}`,
'X-Product-Id': cfg.productId,
Accept: 'application/json',
};
const hasBody = opts.body !== undefined;
if (hasBody) headers['Content-Type'] = 'application/json';
const ac = new AbortController();
const timer = setTimeout(() => ac.abort(), opts.timeoutMs || DEFAULT_TIMEOUT_MS);
try {
const res = await fetchImpl(url, {
method: opts.method || 'GET',
headers,
body: hasBody ? JSON.stringify(opts.body) : undefined,
signal: ac.signal,
});
let json = null;
let text = '';
try { text = await res.text(); } catch { /* ignore body read errors */ }
if (text) { try { json = JSON.parse(text); } catch { json = null; } }
return { ok: !!res.ok, status: res.status, json };
} catch (e) {
const timedOut = e && (e.name === 'AbortError' || e.code === 'ABORT_ERR');
return { ok: false, status: 0, json: null, error: timedOut ? 'timeout' : (e && e.message) || 'network error' };
} finally {
clearTimeout(timer);
}
}
// fetchBoard — assemble the board model. Jobs are REQUIRED (failure ⇒ board
// fails). Metrics + factories are best-effort: a missing/404/501 factories
// endpoint degrades to [] (it is optional server-side), and absent metrics
// simply omits the aggregate panel.
export async function fetchBoard(cfg, fetchImpl = globalThis.fetch) {
const jobsRes = await fleetFetch(cfg, '/fleet/jobs', {}, fetchImpl);
if (!jobsRes.ok || !jobsRes.json) {
return { ok: false, error: jobsRes.error || `jobs HTTP ${jobsRes.status}` };
}
const jobs = Array.isArray(jobsRes.json.jobs) ? jobsRes.json.jobs : [];
const metricsRes = await fleetFetch(cfg, '/fleet/metrics', {}, fetchImpl);
const metrics = metricsRes.ok && metricsRes.json ? metricsRes.json : null;
const facRes = await fleetFetch(cfg, '/fleet/factories', {}, fetchImpl);
const factories = facRes.ok && facRes.json && Array.isArray(facRes.json.factories)
? facRes.json.factories
: [];
return { ok: true, board: toBoard({ jobs, factories, metrics }) };
}
// formatEvent — one fleet event → a single log line for the TUI log view.
export function formatEvent(e) {
const at = e.at ? safeTime(e.at) : '';
const actor = e.actor ? ` ${e.actor}` : '';
const data = e.data && typeof e.data === 'object' && Object.keys(e.data).length
? ` ${JSON.stringify(e.data)}`
: '';
return `${at} ${e.type || '?'}${actor}${data}`.trim();
}
const safeTime = (iso) => {
const d = new Date(iso);
return Number.isNaN(d.getTime()) ? String(iso) : d.toLocaleTimeString();
};
// fetchEvents — the job's event stream rendered as log lines.
export async function fetchEvents(cfg, jobId, fetchImpl = globalThis.fetch) {
const res = await fleetFetch(cfg, `/fleet/jobs/${encodeURIComponent(jobId)}/events`, {}, fetchImpl);
if (!res.ok || !res.json) {
return { ok: false, error: res.error || `events HTTP ${res.status}`, lines: [] };
}
const events = Array.isArray(res.json.events) ? res.json.events : [];
return { ok: true, lines: events.map(formatEvent) };
}
// Operator verbs the dashboard supports in fleet mode. `promote` has no safe
// server contract (client-inferred stage transitions could violate workflow
// invariants), so it is explicitly unavailable here.
const FLEET_VERBS = new Set(['ship', 'requeue', 'reject']);
// jobAction — execute an operator verb against the coordinator.
// ship → re-GET the job for a FRESH leaseEpoch, then PATCH stage=shipped
// (a stale snapshot epoch would be fenced with 409).
// requeue → POST /actions/requeue (lease-free operator action)
// reject → POST /actions/reject
// Returns {ok, message}; never throws.
export async function jobAction(cfg, item, verb, fetchImpl = globalThis.fetch) {
if (verb === 'promote') return { ok: false, message: 'promote is not available in fleet mode' };
if (!FLEET_VERBS.has(verb)) return { ok: false, message: `${verb} not supported in fleet mode` };
const id = item && item.id;
if (!id) return { ok: false, message: 'no job selected' };
if (verb === 'ship') {
const cur = await fleetFetch(cfg, `/fleet/jobs/${encodeURIComponent(id)}`, {}, fetchImpl);
if (!cur.ok || !cur.json) return { ok: false, message: cur.error || `job HTTP ${cur.status}` };
const res = await fleetFetch(
cfg,
`/fleet/jobs/${encodeURIComponent(id)}`,
{ method: 'PATCH', body: { stage: 'shipped', leaseEpoch: cur.json.leaseEpoch } },
fetchImpl,
);
if (res.status === 409) return { ok: false, message: 'job changed (fenced) — refresh and retry' };
if (!res.ok) return { ok: false, message: res.error || `ship HTTP ${res.status}` };
return { ok: true, message: `shipped ${id}` };
}
const res = await fleetFetch(
cfg,
`/fleet/jobs/${encodeURIComponent(id)}/actions/${verb}`,
{ method: 'POST', body: {} },
fetchImpl,
);
if (res.status === 409) return { ok: false, message: 'job conflict/terminal — refresh and retry' };
if (!res.ok) return { ok: false, message: res.error || `${verb} HTTP ${res.status}` };
return { ok: true, message: `${verb} ${id}` };
}

View File

@ -0,0 +1,287 @@
// fleet-dash.test.mjs — dependency-light unit tests for the fleet-mode dashboard
// adapter. Uses node:assert only (no test framework), matching the repo style.
// Run: `node fleet-dash.test.mjs` (also wired into selftest.sh).
//
// These tests prove the dashboard's CONTRACT ASSUMPTIONS against the /fleet API
// (request shaping, response mapping, graceful degradation, action semantics)
// via an injected fetch stub. They do NOT prove live server compatibility.
import assert from 'node:assert/strict';
import {
fleetConfig,
mapStage,
toBoard,
fleetFetch,
fetchBoard,
fetchEvents,
formatEvent,
jobAction,
} from './fleet-dash.mjs';
let passed = 0;
const t = (name, fn) => {
try {
const r = fn();
if (r && typeof r.then === 'function') {
return r.then(
() => { passed += 1; },
(e) => { console.error(`${name}\n ${e && e.message}`); process.exitCode = 1; },
);
}
passed += 1;
} catch (e) {
console.error(`${name}\n ${e && e.message}`);
process.exitCode = 1;
}
return undefined;
};
// A recording fetch stub. `routes` maps a matcher → {status, body} (or a fn).
function makeFetch(routes) {
const calls = [];
const fetchImpl = async (url, opts = {}) => {
calls.push({ url, opts, headers: opts.headers || {}, method: opts.method || 'GET' });
let entry = routes[url];
if (!entry) {
// try suffix match (path only)
const key = Object.keys(routes).find((k) => url.endsWith(k));
entry = key ? routes[key] : undefined;
}
if (typeof entry === 'function') entry = entry({ url, opts });
if (entry === undefined) return mkRes(404, '{}');
if (entry.throw) throw Object.assign(new Error(entry.throw), { name: entry.name || 'Error' });
return mkRes(entry.status ?? 200, entry.body ?? '');
};
fetchImpl.calls = calls;
return fetchImpl;
}
const mkRes = (status, body) => ({
ok: status >= 200 && status < 300,
status,
text: async () => (typeof body === 'string' ? body : JSON.stringify(body)),
});
const CFG = { enabled: true, ok: true, api: 'http://svc/api', token: 'tok', productId: 'prodX', missing: [] };
await (async () => {
// ── fleetConfig ──
t('fleetConfig: AQ_FLEET_DASH unset ⇒ disabled', () => {
const c = fleetConfig({});
assert.equal(c.enabled, false);
assert.equal(c.ok, false);
});
t('fleetConfig: enabled but missing config ⇒ not ok, lists missing', () => {
const c = fleetConfig({ AQ_FLEET_DASH: '1' });
assert.equal(c.enabled, true);
assert.equal(c.ok, false);
assert.deepEqual(c.missing.sort(), ['AQ_FLEET_API', 'AQ_FLEET_TOKEN', 'AQ_PRODUCT_ID'].sort());
});
t('fleetConfig: enabled + complete ⇒ ok, trims trailing slash', () => {
const c = fleetConfig({ AQ_FLEET_DASH: '1', AQ_FLEET_API: 'http://svc/api/', AQ_FLEET_TOKEN: 'k', AQ_PRODUCT_ID: 'p' });
assert.equal(c.ok, true);
assert.equal(c.api, 'http://svc/api');
});
// ── mapStage ──
t('mapStage: fleet stages collapse to board buckets', () => {
assert.equal(mapStage('queued'), 'inbox');
assert.equal(mapStage('assigned'), 'building');
assert.equal(mapStage('building'), 'building');
assert.equal(mapStage('review'), 'review');
assert.equal(mapStage('testing'), 'testing');
assert.equal(mapStage('shipped'), 'shipped');
assert.equal(mapStage('failed'), 'failed');
assert.equal(mapStage('dead_letter'), 'failed');
assert.equal(mapStage('weird'), 'inbox');
});
// ── toBoard ──
t('toBoard: counts, actionable items, running, recent', () => {
const jobs = [
{ id: 'a', stage: 'queued', priority: 'high', capabilities: ['os:any'] },
{ id: 'b', stage: 'building', priority: 'critical', factoryId: 'mac-1', leaseEpoch: 3 },
{ id: 'c', stage: 'review', updatedAt: '2026-01-01T00:00:02Z' },
{ id: 'd', stage: 'testing' },
{ id: 'e', stage: 'shipped', updatedAt: '2026-01-01T00:00:09Z' },
{ id: 'f', stage: 'failed', updatedAt: '2026-01-01T00:00:05Z' },
{ id: 'g', stage: 'dead_letter', updatedAt: '2026-01-01T00:00:01Z' },
];
const b = toBoard({ jobs });
assert.equal(b.counts.inbox, 1);
assert.equal(b.counts.building, 1);
assert.equal(b.counts.review, 1);
assert.equal(b.counts.testing, 1);
assert.equal(b.counts.shipped, 1);
assert.equal(b.counts.failed, 2); // failed + dead_letter
// running = assigned/building only
assert.deepEqual(b.running.map((x) => x.id), ['b']);
assert.equal(b.running[0].fleetStage, 'building');
assert.equal(b.running[0].factoryId, 'mac-1');
// actionable items exclude building/shipped, ordered review<testing<failed<inbox
assert.deepEqual(b.items.map((x) => x.id), ['c', 'd', 'f', 'g', 'a']);
// item.stage is the bucket (so dashboard gate()/STAGE_TAG reuse works)
assert.equal(b.items[0].stage, 'review');
assert.equal(b.items[4].stage, 'inbox');
// recent = shipped+failed, newest first, capped at 5
assert.deepEqual(b.recent.map((x) => x.id), ['e', 'f', 'g']);
});
// ── fleetFetch: headers + product scoping on every request ──
await t('fleetFetch: sends bearer + X-Product-Id; parses JSON', async () => {
const f = makeFetch({ '/fleet/jobs': { status: 200, body: { jobs: [] } } });
const r = await fleetFetch(CFG, '/fleet/jobs', {}, f);
assert.equal(r.ok, true);
assert.deepEqual(r.json, { jobs: [] });
const h = f.calls[0].headers;
assert.equal(h.Authorization, 'Bearer tok');
assert.equal(h['X-Product-Id'], 'prodX');
assert.equal(f.calls[0].url, 'http://svc/api/fleet/jobs');
});
await t('fleetFetch: network error ⇒ ok:false with message (no throw)', async () => {
const f = makeFetch({ '/fleet/jobs': { throw: 'boom' } });
const r = await fleetFetch(CFG, '/fleet/jobs', {}, f);
assert.equal(r.ok, false);
assert.equal(r.status, 0);
assert.match(r.error, /boom/);
});
await t('fleetFetch: abort ⇒ timeout error', async () => {
const f = makeFetch({ '/fleet/jobs': { throw: 'aborted', name: 'AbortError' } });
const r = await fleetFetch(CFG, '/fleet/jobs', {}, f);
assert.equal(r.ok, false);
assert.equal(r.error, 'timeout');
});
await t('fleetFetch: non-JSON 500 body ⇒ ok:false, json null', async () => {
const f = makeFetch({ '/fleet/jobs': { status: 500, body: '<html>err</html>' } });
const r = await fleetFetch(CFG, '/fleet/jobs', {}, f);
assert.equal(r.ok, false);
assert.equal(r.status, 500);
assert.equal(r.json, null);
});
// ── fetchBoard: assembly + degradation ──
await t('fetchBoard: jobs + metrics + factories assembled', async () => {
const f = makeFetch({
'/fleet/jobs': { body: { jobs: [{ id: 'a', stage: 'review' }] } },
'/fleet/metrics': { body: { utilizationPct: 50, alerts: [] } },
'/fleet/factories': { body: { factories: [{ factoryId: 'mac-1', health: 'ok' }] } },
});
const r = await fetchBoard(CFG, f);
assert.equal(r.ok, true);
assert.equal(r.board.items.length, 1);
assert.equal(r.board.metrics.utilizationPct, 50);
assert.equal(r.board.factories.length, 1);
});
await t('fetchBoard: factories 404 ⇒ degrades to []', async () => {
const f = makeFetch({
'/fleet/jobs': { body: { jobs: [] } },
'/fleet/metrics': { body: {} },
'/fleet/factories': { status: 404, body: {} },
});
const r = await fetchBoard(CFG, f);
assert.equal(r.ok, true);
assert.deepEqual(r.board.factories, []);
});
await t('fetchBoard: factories 501 ⇒ degrades to []', async () => {
const f = makeFetch({
'/fleet/jobs': { body: { jobs: [] } },
'/fleet/metrics': { body: {} },
'/fleet/factories': { status: 501, body: {} },
});
const r = await fetchBoard(CFG, f);
assert.equal(r.ok, true);
assert.deepEqual(r.board.factories, []);
});
await t('fetchBoard: metrics failure ⇒ board still ok, metrics null', async () => {
const f = makeFetch({
'/fleet/jobs': { body: { jobs: [] } },
'/fleet/metrics': { status: 500, body: 'oops' },
'/fleet/factories': { body: { factories: [] } },
});
const r = await fetchBoard(CFG, f);
assert.equal(r.ok, true);
assert.equal(r.board.metrics, null);
});
await t('fetchBoard: jobs failure ⇒ board fails with error', async () => {
const f = makeFetch({ '/fleet/jobs': { status: 503, body: '{}' } });
const r = await fetchBoard(CFG, f);
assert.equal(r.ok, false);
assert.match(r.error, /503/);
});
// ── events ──
t('formatEvent: renders type + actor + data', () => {
const line = formatEvent({ type: 'claimed', actor: 'mac-1', at: '2026-01-01T00:00:00Z', data: { leaseEpoch: 2 } });
assert.match(line, /claimed/);
assert.match(line, /mac-1/);
assert.match(line, /leaseEpoch/);
});
await t('fetchEvents: maps events to lines', async () => {
const f = makeFetch({
'/events': { body: { events: [{ type: 'queued', at: '2026-01-01T00:00:00Z', data: {} }, { type: 'claimed', data: {} }] } },
});
const r = await fetchEvents(CFG, 'job-1', f);
assert.equal(r.ok, true);
assert.equal(r.lines.length, 2);
assert.match(r.lines[1], /claimed/);
assert.match(f.calls[0].url, /\/fleet\/jobs\/job-1\/events$/);
});
await t('fetchEvents: failure ⇒ ok:false, empty lines', async () => {
const f = makeFetch({ '/events': { status: 500, body: 'x' } });
const r = await fetchEvents(CFG, 'job-1', f);
assert.equal(r.ok, false);
assert.deepEqual(r.lines, []);
});
// ── jobAction ──
await t('jobAction: ship re-GETs fresh leaseEpoch then PATCHes shipped', async () => {
let patchBody = null;
const f = makeFetch({
'http://svc/api/fleet/jobs/j1': ({ opts }) => {
if ((opts.method || 'GET') === 'PATCH') { patchBody = JSON.parse(opts.body); return { status: 200, body: { id: 'j1', stage: 'shipped' } }; }
return { status: 200, body: { id: 'j1', stage: 'testing', leaseEpoch: 7 } }; // fresh epoch
},
});
const r = await jobAction(CFG, { id: 'j1', leaseEpoch: 2 /* stale */ }, 'ship', f);
assert.equal(r.ok, true);
assert.equal(patchBody.stage, 'shipped');
assert.equal(patchBody.leaseEpoch, 7); // used the freshly-fetched epoch, not the stale 2
});
await t('jobAction: ship 409 ⇒ actionable fenced message', async () => {
const f = makeFetch({
'http://svc/api/fleet/jobs/j1': ({ opts }) => (opts.method === 'PATCH'
? { status: 409, body: '{}' }
: { status: 200, body: { id: 'j1', leaseEpoch: 7 } }),
});
const r = await jobAction(CFG, { id: 'j1' }, 'ship', f);
assert.equal(r.ok, false);
assert.match(r.message, /fenced|refresh/i);
});
await t('jobAction: requeue ⇒ POST /actions/requeue', async () => {
const f = makeFetch({ '/fleet/jobs/j1/actions/requeue': { status: 200, body: { id: 'j1', stage: 'queued' } } });
const r = await jobAction(CFG, { id: 'j1' }, 'requeue', f);
assert.equal(r.ok, true);
assert.equal(f.calls[0].method, 'POST');
assert.match(f.calls[0].url, /\/actions\/requeue$/);
});
await t('jobAction: reject 409 ⇒ conflict message', async () => {
const f = makeFetch({ '/fleet/jobs/j1/actions/reject': { status: 409, body: '{}' } });
const r = await jobAction(CFG, { id: 'j1' }, 'reject', f);
assert.equal(r.ok, false);
assert.match(r.message, /conflict|terminal|refresh/i);
});
t('jobAction: promote ⇒ explicitly unavailable in fleet mode', async () => {
return jobAction(CFG, { id: 'j1' }, 'promote', makeFetch({})).then((r) => {
assert.equal(r.ok, false);
assert.match(r.message, /promote/i);
});
});
})();
// Summary line (selftest greps for PASS).
process.on('exit', () => {
if (process.exitCode && process.exitCode !== 0) {
console.error('fleet-dash.test FAIL');
} else {
console.log(`fleet-dash.test PASS (${passed} assertions)`);
}
});

View File

@ -41,6 +41,12 @@ bash -n "${BASH_SOURCE[0]}" && pass "bash -n selftest.sh"
# 3. dashboard syntax (optional)
if command -v node >/dev/null 2>&1; then
node --check "$HERE/dashboard.mjs" && pass "node --check dashboard.mjs"
node --check "$HERE/lib/fleet-dash.mjs" && pass "node --check lib/fleet-dash.mjs"
if out="$(node "$HERE/lib/fleet-dash.test.mjs" 2>&1)" && printf '%s' "$out" | grep -q 'fleet-dash.test PASS'; then
pass "fleet-dash.test (${out##*PASS })"
else
printf '%s\n' "$out"; fail "fleet-dash.test"
fi
else
info "node not installed — skipping dashboard check"
fi