bytelyst-devops-tools/agent-queue/dashboard.mjs
Saravanakumar D 66c91233da feat(agent-queue): re-point TUI dashboard at /fleet API (parity)
Add an opt-in fleet mode to the dashboard so an operator can drive the
coordinator fleet from the same TUI used for the local folder queue.

- lib/fleet-dash.mjs: dependency-injectable read/act adapter over the
  platform-service /fleet REST surface (jobs, metrics, factories, events,
  ship/requeue/reject). Pure-ish + fully unit-testable without a live service.
- dashboard.mjs: render + act in fleet mode when AQ_FLEET_DASH=1 — board with
  counts, factories (per-factory rows or metrics aggregate), alerts, running
  (by lease/factory), actionable JOBS with manifest tags, recent, and a
  per-job events log. Single-flight async refresh keeps the last good board on
  failure; ship re-GETs a fresh leaseEpoch before PATCH; run/stop/promote are
  disabled (no safe server contract). Local mode is byte-for-byte unchanged.
- lib/fleet-dash.test.mjs: 22 node:assert assertions (config, stage mapping,
  toBoard, fetch headers/timeout/errors, board assembly + graceful degradation,
  events, job actions) wired into selftest.sh.
- docs: tick the Phase 3 "TUI re-pointed at /fleet" roadmap boxes.

Verified: selftest.sh green (incl. new fleet-dash checks); live non-TTY render
smoke against a stub /fleet server (both factories and metrics-aggregate paths);
local mode unchanged.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-30 19:47:56 -07:00

770 lines
32 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env node
// agent-queue dashboard — a zero-dependency, INTERACTIVE TUI for the folder queue.
//
// Reads the same queue/ state written by agent-queue.sh and re-renders a board
// every interval: kanban counts, running workers (engine, elapsed, last log line),
// and a navigable, numbered job list you can act on without leaving the screen.
//
// Lifecycle: inbox → building → review → testing → shipped (+ failed)
//
// Interactive keys (when run in a TTY):
// ↑/↓ or j/k or 1-9 select a job enter / l view its log
// p promote s ship (testing→shipped) x reject
// u requeue r run loop S stop g refresh now
// ? help q / Ctrl-C quit
// All actions shell out to agent-queue.sh — it stays the single source of truth.
//
// Usage: node dashboard.mjs [--interval 2] [--root /path/to/queue]
// AGENT_QUEUE_ROOT=/path node dashboard.mjs
// AQ_TRACKER_WEB=https://tracker.example.com node dashboard.mjs
// (makes job tracker-item tags clickable terminal hyperlinks)
import fs from 'node:fs';
import path from 'node:path';
import { fileURLToPath } from 'node:url';
import { execFileSync, spawn } from 'node:child_process';
import { fleetConfig, fetchBoard, fetchEvents, jobAction } from './lib/fleet-dash.mjs';
const __dirname = path.dirname(fileURLToPath(import.meta.url));
// ── args / config ───────────────────────────────────────────────────
const argv = process.argv.slice(2);
// getArg(flag, def) — the argument that follows `flag` on the command line,
// or `def` when the flag is absent or has no (non-empty) value after it.
const getArg = (flag, def) => {
  const i = argv.indexOf(flag);
  return i !== -1 && argv[i + 1] ? argv[i + 1] : def;
};
// positiveInt(raw, def) — parse `raw` as a base-10 integer clamped to >= 1.
// Falls back to `def` when `raw` is not a number: Math.max(1, NaN) is NaN, and
// a NaN timer delay is coerced to 1 ms, which would turn the refresh loop into
// a hot loop (and hammer the /fleet API in fleet mode).
const positiveInt = (raw, def) => {
  const n = Number.parseInt(raw, 10);
  return Number.isNaN(n) ? def : Math.max(1, n);
};
// Queue root: --root wins, then AGENT_QUEUE_ROOT, then ./queue next to this file.
const ROOT = path.resolve(getArg('--root', process.env.AGENT_QUEUE_ROOT || path.join(__dirname, 'queue')));
// Refresh period in milliseconds (--interval is given in seconds, minimum 1).
const INTERVAL = positiveInt(getArg('--interval', '2'), 2) * 1000;
// A running worker is flagged stalled if its log has not changed in this many minutes.
const STALL_MIN = positiveInt(process.env.AGENT_QUEUE_STALL_MIN || '10', 10);
// Absolute path of every queue folder under ROOT, keyed by a short name.
// Each pair is [key, folder name]; only `state` differs (it lives in `.state`).
const DIRS = Object.fromEntries(
  [
    ['inbox', 'inbox'],
    ['building', 'building'],
    ['review', 'review'],
    ['testing', 'testing'],
    ['shipped', 'shipped'],
    ['failed', 'failed'],
    ['logs', 'logs'],
    ['state', '.state'],
  ].map(([key, folder]) => [key, path.join(ROOT, folder)]),
);
// ── ansi ────────────────────────────────────────────────────────────
// C maps a style name to its SGR escape sequence (`ESC [ <code> m`).
const C = Object.fromEntries(
  Object.entries({
    reset: 0, dim: 2, bold: 1,
    red: 31, green: 32, yellow: 33,
    blue: 34, cyan: 36, gray: 90,
  }).map(([name, code]) => [name, `\x1b[${code}m`]),
);
// c(col, s) — wrap `s` in the escape for style `col`, followed by a reset.
const c = (col, s) => ''.concat(C[col], s, C.reset);
// ── helpers ─────────────────────────────────────────────────────────
// listMd(dir) — names of the entries in `dir` ending in `.md`; an unreadable
// or missing directory yields an empty list instead of throwing.
const listMd = (dir) => {
  let entries;
  try {
    entries = fs.readdirSync(dir);
  } catch {
    return [];
  }
  return entries.filter((name) => name.endsWith('.md'));
};
// count(dir) — how many `.md` entries `dir` holds.
const count = (dir) => {
  const jobs = listMd(dir);
  return jobs.length;
};
// parseMeta(file) — read a `key=value` per line file into a plain object.
// A line is split at its first `=`; lines with no `=`, or with `=` as the very
// first character, are skipped. An unreadable file yields {}.
const parseMeta = (file) => {
  const out = {};
  let text;
  try {
    text = fs.readFileSync(file, 'utf8');
  } catch {
    return out;
  }
  text.split('\n').forEach((line) => {
    const eq = line.indexOf('=');
    if (eq > 0) {
      out[line.slice(0, eq)] = line.slice(eq + 1);
    }
  });
  return out;
};
// insightsTag(m) — one-line summary of a job's meta, built from up to four
// space-separated parts in this order:
//   attempts (only when set and not '1'), cost (or token counts when there is
//   no cost), line deltas, and duration. Missing fields simply drop their part.
// Read-only: the values come straight from the meta written by agent-queue.sh.
const insightsTag = (m) => {
  const attempts = m.attempts && m.attempts !== '1' ? `x${m.attempts}` : null;

  let spend = null;
  if (m.cost_usd) {
    spend = `$${m.cost_usd}${m.usage_estimated ? '~' : ''}`;
  } else if (m.tokens_in || m.tokens_out) {
    spend = `tok ${m.tokens_in || 0}/${m.tokens_out || 0}`;
  }

  const delta = m.lines_added || m.lines_deleted
    ? `+${m.lines_added || 0}/-${m.lines_deleted || 0}`
    : null;
  const took = m.duration_s ? `${m.duration_s}s` : null;

  return [attempts, spend, delta, took].filter((part) => part !== null).join(' ');
};
// Manifest tags (read-only): priority, profile, capabilities and a tracker-item
// reference, rendered from a job's meta or from the .md frontmatter of a job
// that was never launched (see readManifest).
// TRACKER_WEB is AQ_TRACKER_WEB with trailing slashes removed ('' when unset);
// when it is set, the tracker-item is emitted as an OSC 8 terminal hyperlink.
const TRACKER_WEB = (process.env.AQ_TRACKER_WEB || '').replace(/\/+$/, '');
// osc8(url, label) — `label` wrapped in the OSC 8 open/close sequences for `url`.
const osc8 = (url, label) => ''.concat('\x1b]8;;', url, '\x07', label, '\x1b]8;;\x07');
// trackerTag(id) — '' for a missing id; otherwise the id as text, linked to
// `${TRACKER_WEB}/<url-encoded id>` when a tracker base URL is configured.
const trackerTag = (id) => {
  if (!id) {
    return '';
  }
  const label = String(id);
  if (!TRACKER_WEB) {
    return label;
  }
  return osc8(`${TRACKER_WEB}/${encodeURIComponent(id)}`, label);
};
// Color used for each priority tag; unknown priorities fall back to gray.
const PRIORITY_COLOR = { critical: 'red', high: 'yellow', medium: 'gray', low: 'gray' };
// manifestTags(m) — space-joined, colorized tags for a job record `m`
// ('' when `m` is missing). Order: priority (hidden when 'medium'), profile,
// capabilities (surrounding [ ] stripped, truncated to 36 chars), tracker item.
const manifestTags = (m) => {
  if (!m) {
    return '';
  }
  const { priority, profile, capabilities, tracker_item: trackerItem } = m;
  const tags = [];
  if (priority && priority !== 'medium') {
    const color = PRIORITY_COLOR[priority] || 'gray';
    tags.push(c(color, `${priority}`));
  }
  if (profile) {
    tags.push(c('blue', `${profile}`));
  }
  if (capabilities) {
    const caps = String(capabilities).replace(/^\[|\]$/g, '').trim();
    if (caps) {
      tags.push(c('gray', `caps ${trunc(caps, 36)}`));
    }
  }
  if (trackerItem) {
    tags.push(c('cyan', trackerTag(trackerItem)));
  }
  return tags.join(' ');
};
// pidAlive(pid) — true when `pid` names a process that currently exists.
// `pid` may be a number or a numeric string (meta files store it as text).
// Only positive integers are probed: with signal 0, pid 0 or a negative pid
// addresses a process group (or every process), which always "succeeds" and
// would report a bogus pid as alive.
const pidAlive = (pid) => {
  const n = Number(pid);
  if (!Number.isInteger(n) || n <= 0) return false;
  try {
    process.kill(n, 0); // signal 0: existence/permission check only, nothing is delivered
    return true;
  } catch (err) {
    // EPERM means the process exists but belongs to someone else — still alive.
    // Anything else (ESRCH, invalid pid) means there is no such process.
    return Boolean(err) && err.code === 'EPERM';
  }
};
// lastLogLine(job) — the last non-blank line (trimmed) of logs/<job>.log,
// or '' when the log is missing, unreadable or has no non-blank line.
const lastLogLine = (job) => {
  let text;
  try {
    text = fs.readFileSync(path.join(DIRS.logs, `${job}.log`), 'utf8');
  } catch {
    return '';
  }
  const rows = text.split('\n');
  // Walk backwards so we stop at the first line that has content.
  for (let i = rows.length - 1; i >= 0; i -= 1) {
    const trimmed = rows[i].trim();
    if (trimmed) {
      return trimmed;
    }
  }
  return '';
};
// logAgeSec(job) — whole seconds since logs/<job>.log was last modified
// (never negative); null when the log cannot be stat'ed.
const logAgeSec = (job) => {
  let modifiedMs;
  try {
    const info = fs.statSync(path.join(DIRS.logs, `${job}.log`));
    modifiedMs = info.mtimeMs;
  } catch {
    return null;
  }
  const elapsedSec = Math.floor((Date.now() - modifiedMs) / 1000);
  return Math.max(0, elapsedSec);
};
// fmtElapsed(startSec) — time elapsed since `startSec` (epoch seconds, number
// or numeric string) as `XhYYm` once an hour has passed, else `XmYYs`.
// A missing start renders the placeholder; a start in the future counts as 0.
const fmtElapsed = (startSec) => {
  if (!startSec) {
    return ' -- ';
  }
  const nowSec = Math.floor(Date.now() / 1000);
  const total = Math.max(0, nowSec - Number(startSec));
  const mins = Math.floor(total / 60);
  const hours = Math.floor(mins / 60);
  const twoDigits = (value) => String(value).padStart(2, '0');
  return hours > 0
    ? `${hours}h${twoDigits(mins % 60)}m`
    : `${mins}m${twoDigits(total % 60)}s`;
};
// trunc(s, n) — `s` unchanged when it fits in `n` characters; otherwise its
// first n-1 characters followed by an ellipsis, so the result is n long.
const trunc = (s, n) => {
  if (s.length > n) {
    return `${s.slice(0, n - 1)}…`;
  }
  return s;
};
// shortPath(p) — `p` with a leading $HOME abbreviated to `~` ('' for a missing p).
// Only a whole leading path component is abbreviated: $HOME occurring in the
// middle of a path, or as a mere string prefix of a longer directory name
// (HOME=/home/u vs /home/user2), is left alone. With HOME unset, or set to
// just '/', the path is returned unchanged.
const shortPath = (p) => {
  const text = p || '';
  const home = (process.env.HOME || '').replace(/\/+$/, '');
  if (!home) return text;
  if (text === home) return '~';
  return text.startsWith(`${home}/`) ? `~${text.slice(home.length)}` : text;
};
// readMetas() — every `.meta` file in the state directory, parsed with
// parseMeta. An unreadable state directory yields an empty list.
const readMetas = () => {
  let names;
  try {
    names = fs.readdirSync(DIRS.state);
  } catch {
    names = [];
  }
  return names
    .filter((name) => name.endsWith('.meta'))
    .map((name) => parseMeta(path.join(DIRS.state, name)));
};
// readManifest(stage, job) — manifest tags for a job that has no launched meta
// yet (e.g. queued in inbox/). Parses the leading --- frontmatter block of
// <stage dir>/<job>.md and returns only the fields manifestTags renders, under
// the names it expects. Never throws: a missing file, or a file that does not
// start with a `---` line, yields {}.
// FM_TAG_KEYS maps a frontmatter key to the property name used in the result.
const FM_TAG_KEYS = {
  priority: 'priority', profile: 'profile',
  capabilities: 'capabilities', 'tracker-item': 'tracker_item',
};
const readManifest = (stage, job) => {
  const out = {};
  try {
    const lines = fs.readFileSync(path.join(DIRS[stage], `${job}.md`), 'utf8').split('\n');
    if ((lines[0] || '').trim() !== '---') return out;
    for (let i = 1; i < lines.length; i++) {
      if (lines[i].trim() === '---') break; // closing fence: frontmatter ends here
      const line = lines[i].replace(/^\s+/, '');
      const ci = line.indexOf(':');
      if (ci <= 0) continue; // not a `key: value` line
      const key = line.slice(0, ci).trim();
      // Own-property check: a plain `FM_TAG_KEYS[key]` lookup also matches
      // inherited names such as `constructor` or `toString`, which would add a
      // junk property to the result.
      if (!Object.prototype.hasOwnProperty.call(FM_TAG_KEYS, key)) continue;
      // Value: text after the first colon, trimmed, minus one surrounding quote.
      out[FM_TAG_KEYS[key]] = line.slice(ci + 1).trim().replace(/^["']|["']$/g, '');
    }
  } catch { /* best-effort: an unreadable job file simply has no tags */ }
  return out;
};
// ── agent-queue.sh control (single source of truth) ─────────────────
// AQ — path of the agent-queue.sh script that sits next to this file; it is
// the script handed to `bash` by aq() and startRun().
const AQ = path.join(__dirname, 'agent-queue.sh');
// stripAnsi(s) — `s` ('' when falsy) with SGR color sequences (ESC [ … m) removed.
const stripAnsi = (s) => {
  const text = s || '';
  return text.replace(/\x1b\[[0-9;]*m/g, '');
};
// lastLine(s) — the last non-blank line of `s`, trimmed and color-stripped;
// '' when there is none.
const lastLine = (s) => {
  const rows = stripAnsi(s).split('\n');
  for (let i = rows.length - 1; i >= 0; i -= 1) {
    const trimmed = rows[i].trim();
    if (trimmed) {
      return trimmed;
    }
  }
  return '';
};
// aq(args) — run `bash agent-queue.sh <args…>` synchronously against ROOT and
// capture its output. Never throws; returns { ok, out } where `out` is stdout
// on success and, on failure, stdout+stderr (or the error message if both are
// empty).
const aq = (args) => {
  const options = {
    encoding: 'utf8',
    stdio: ['ignore', 'pipe', 'pipe'],
    env: { ...process.env, AGENT_QUEUE_ROOT: ROOT },
  };
  try {
    return { ok: true, out: execFileSync('bash', [AQ, ...args], options) };
  } catch (err) {
    const captured = (err.stdout || '') + (err.stderr || '');
    return { ok: false, out: (captured || err.message || '').toString() };
  }
};
// ── fleet mode (Phase 3: TUI re-pointed at /fleet) ──────────────────
// Opt-in via AQ_FLEET_DASH=1. When ON, the board is sourced from the
// platform-service /fleet REST API instead of the local queue; when OFF, every
// fleet code path below is skipped and the dashboard is byte-for-byte the local
// tool. All fleet I/O lives in lib/fleet-dash.mjs (injectable + unit-tested).
// FLEET is the config object from fleetConfig(); this file reads its
// `enabled`, `ok`, `missing`, `api` and `productId` fields.
const FLEET = fleetConfig();
// Latest fleet board snapshot. On a refresh failure we KEEP the last good board
// (no destructive flicker) and surface a staleness banner instead.
//   board   — last board applied by refreshFleet() (null until the first success)
//   error   — message of the last failed refresh (reset to null by a success)
//   loading — starts as FLEET.enabled; set to false by every refresh outcome
//   lastOk  — Date.now() of the last successful refresh (0 = never)
let fleetState = { board: null, error: null, loading: FLEET.enabled, lastOk: 0 };
let fleetRefreshing = false; // single-flight guard (no overlapping fetches)
// Event lines shown by the log view in fleet mode, for the job in `jobId`.
let fleetEvents = { jobId: null, lines: [], error: null, loading: false };
// refreshFleet() — fetch the board once; a no-op while another refresh is
// still in flight. A successful result replaces the snapshot and stamps
// lastOk; a failed result (or a thrown error) keeps the previous board and
// only records the error. The returned promise never rejects.
const refreshFleet = async () => {
  if (fleetRefreshing) return;
  fleetRefreshing = true;
  // Record a failure without discarding the last good board.
  const keepBoard = (reason) => {
    fleetState = { ...fleetState, error: reason, loading: false };
  };
  try {
    const res = await fetchBoard(FLEET);
    if (!res.ok) {
      keepBoard(res.error);
    } else {
      fleetState = { board: res.board, error: null, loading: false, lastOk: Date.now() };
    }
  } catch (err) {
    keepBoard((err && err.message) || 'refresh failed');
  } finally {
    fleetRefreshing = false;
  }
};
// refreshFleetEvents(job) — load a job's event stream into the log view state.
// The state is switched to "loading" immediately; the result is applied when
// the fetch settles, and the log view is redrawn if it is still showing `job`.
//
// Two guards the bare promise chain lacked:
//  - Stale responses are dropped. If the operator opens job A, backs out and
//    opens job B, a slow response for A must not overwrite B's events. The
//    pending state object doubles as the request token: once a newer request
//    has replaced it, this one no longer applies.
//  - A rejected fetch is turned into an error state instead of an unhandled
//    rejection that leaves the view stuck on "loading".
const refreshFleetEvents = (job) => {
  const pending = { jobId: job, lines: [], error: null, loading: true };
  fleetEvents = pending;
  const settle = (next) => {
    if (fleetEvents !== pending) return; // superseded by a newer request
    fleetEvents = next;
    if (mode === 'log' && logJob === job) draw();
  };
  fetchEvents(FLEET, job).then(
    (r) => settle({ jobId: job, lines: r.lines || [], error: r.ok ? null : r.error, loading: false }),
    (e) => settle({ jobId: job, lines: [], error: (e && e.message) || 'events failed', loading: false }),
  );
};
// daemonPid() — the pid recorded in .state/daemon.pid when that process is
// alive; null when the file is missing/unreadable, empty, or the pid is dead.
const daemonPid = () => {
  let recorded;
  try {
    recorded = fs.readFileSync(path.join(DIRS.state, 'daemon.pid'), 'utf8').trim();
  } catch {
    return null;
  }
  if (!recorded) {
    return null;
  }
  return pidAlive(recorded) ? recorded : null;
};
// startRun() — spawn a detached `agent-queue.sh run` loop whose stdout/stderr
// are appended to logs/run-loop.log. Does nothing (beyond a flash message) when
// a run loop is already alive. Every outcome is reported through setFlash;
// this function never throws.
const startRun = () => {
  if (daemonPid()) { setFlash(c('yellow', 'run loop already active')); return; }
  let fd = null;
  try {
    fd = fs.openSync(path.join(DIRS.logs, 'run-loop.log'), 'a');
    const child = spawn('bash', [AQ, 'run'], {
      detached: true, stdio: ['ignore', fd, fd],
      env: { ...process.env, AGENT_QUEUE_ROOT: ROOT },
    });
    // A failed spawn (e.g. bash not found) is reported asynchronously through
    // an 'error' event; without a listener that event would crash the dashboard.
    child.on('error', (e) => setFlash(c('red', `run failed: ${e.message}`)));
    child.unref(); // do not keep the dashboard alive for the detached loop
    setFlash(c('green', `▶ run loop started (max ${process.env.AGENT_QUEUE_MAX || 3})`));
  } catch (e) {
    setFlash(c('red', `run failed: ${e.message}`));
  } finally {
    // The child holds its own copy of the descriptor; ours must be closed or
    // every start leaks one fd for the lifetime of the dashboard.
    if (fd !== null) {
      try { fs.closeSync(fd); } catch { /* already closed — nothing to release */ }
    }
  }
};
// ── interactive state ───────────────────────────────────────────────
// Keys are only read when stdin is a terminal.
const INTERACTIVE = Boolean(process.stdin.isTTY);
// Stages whose jobs appear in the JOBS list, in display order.
const ACTION_STAGES = ['review', 'testing', 'failed', 'inbox'];

let mode = 'board';       // current screen: 'board' | 'log' | 'help' | 'confirm'
let items = [];           // actionable jobs, rebuilt on every board draw
let selIdx = 0;           // index of the selected entry in `items`
let selJob = null;        // name of the selected job (survives list rebuilds)
let flash = '';           // transient status message
let flashUntil = 0;       // epoch ms after which `flash` is no longer shown
let logJob = null;        // job whose log/events are on screen
let confirmAction = null; // pending confirmation: { verb, job, run }

// setFlash(msg, ms) — show `msg` for the next `ms` milliseconds (default 4000).
const setFlash = (msg, ms = 4000) => {
  flash = msg;
  flashUntil = Date.now() + ms;
};
// flashLine() — the flash message while it is still fresh, otherwise ''.
const flashLine = () => {
  if (!flash) {
    return '';
  }
  return Date.now() < flashUntil ? flash : '';
};
// buildItems() — the actionable job list shown under JOBS.
// Local mode: every `.md` job in the ACTION_STAGES folders, stage by stage,
// names sorted within a stage, as { stage, job }.
// Fleet mode: the items of the current board snapshot as { stage, job, fleet }
// (`fleet` is the original board item); [] while no board has been loaded.
const buildItems = () => {
  if (!FLEET.enabled) {
    return ACTION_STAGES.flatMap((stage) =>
      listMd(DIRS[stage])
        .sort()
        .map((file) => ({ stage, job: file.replace(/\.md$/, '') })));
  }
  const snapshot = fleetState.board;
  if (!snapshot) {
    return [];
  }
  return snapshot.items.map((entry) => ({ stage: entry.stage, job: entry.id, fleet: entry }));
};
// syncSelection() — keep the cursor on the same job after `items` is rebuilt.
// If the remembered job is still listed, the index follows it. Otherwise the
// index is clamped into range and the job at that position (or null for an
// empty list) becomes the remembered selection.
const syncSelection = () => {
  const found = selJob ? items.findIndex((entry) => entry.job === selJob) : -1;
  if (found >= 0) {
    selIdx = found;
    return;
  }
  const lastIdx = items.length - 1;
  selIdx = Math.max(0, Math.min(selIdx, lastIdx));
  const current = items[selIdx];
  selJob = current == null ? null : (current.job ?? null);
};
// STAGE_TAG[stage]() — the colored, fixed-width stage label for a JOBS row.
// Each entry is a thunk so the label is only built when a row is rendered.
const STAGE_TAG = Object.fromEntries(
  [
    ['review', 'cyan', '[review ]'],
    ['testing', 'cyan', '[testing]'],
    ['failed', 'red', '[failed ]'],
    ['inbox', 'blue', '[inbox ]'],
  ].map(([stage, color, label]) => [stage, () => c(color, label)]),
);
// gate(verb, stage) — may `verb` be applied to a job currently in `stage`?
// Returns a boolean for the known verbs (promote, ship, reject, requeue, logs)
// and undefined for any other verb.
const gate = (verb, stage) => {
  const inQa = stage === 'review' || stage === 'testing';
  const rules = {
    promote: inQa,
    ship: stage === 'testing',
    reject: inQa,
    requeue: stage === 'failed' || inQa,
    logs: true,
  };
  return rules[verb];
};
// doAction(verb) — run the gated action on the selected job. In fleet mode it
// calls the /fleet API (lib/fleet-dash.mjs); otherwise it shells out to
// agent-queue.sh. promote is unavailable in fleet mode (no safe server contract).
//
// reject and requeue are two-step: the first call opens a y/n prompt
// (mode = 'confirm'); the prompt's `run` re-enters doAction to execute. The
// board keeps refreshing while the prompt is open, so on re-entry the selected
// job is compared with the job named in the prompt — if the selection moved
// (e.g. the job left the list) the action is cancelled rather than applied to
// a different job. Every exit taken while confirming also closes the prompt.
const doAction = (verb) => {
  const confirming = mode === 'confirm';
  const prompted = confirmAction;
  // bail(msg) — flash why nothing happened and close an open confirm prompt.
  const bail = (msg) => {
    setFlash(c('gray', msg));
    if (confirming) { mode = 'board'; confirmAction = null; }
  };
  const it = items[selIdx];
  if (!it) { bail('no job selected'); return; }
  if (FLEET.enabled && verb === 'promote') {
    bail('promote n/a in fleet mode (use ship/requeue/reject)');
    return;
  }
  if (!gate(verb, it.stage)) { bail(`${verb} not valid for a ${it.stage} job`); return; }
  if ((verb === 'reject' || verb === 'requeue') && !confirming) {
    confirmAction = { verb, job: it.job, run: () => doAction(verb) };
    mode = 'confirm';
    return;
  }
  if (confirming && prompted && prompted.job !== it.job) {
    bail(`${verb} cancelled: "${prompted.job}" is no longer selected`);
    return;
  }
  if (FLEET.enabled) {
    setFlash(c('gray', `${verb}`));
    mode = 'board'; confirmAction = null;
    const report = (ok, text) => {
      setFlash((ok ? c('green', '✓ ') : c('red', '✗ ')) + (text || `${verb} ${it.job}`));
    };
    jobAction(FLEET, it.fleet, verb)
      // A rejected action is reported like a failed one instead of going unhandled.
      .then((r) => report(r.ok, r.message), (e) => report(false, e && e.message))
      .then(() => refreshFleet())
      .then(() => draw());
    return;
  }
  const r = aq([verb, it.job]);
  setFlash((r.ok ? c('green', '✓ ') : c('red', '✗ ')) + (lastLine(r.out) || `${verb} ${it.job}`));
  mode = 'board'; confirmAction = null;
};
// ── render ──────────────────────────────────────────────────────────
// Color of the engine column in the local RUNNING table, keyed by engine name
// (drawBoard falls back to gray for an engine not listed here).
const ENGINE_COLOR = { devin: 'cyan', claude: 'yellow', codex: 'green' };
// Color of the stage column in the fleet RUNNING table, keyed by a job's
// `fleetStage` (drawFleetBoard falls back to gray for a stage not listed here).
const FLEET_STAGE_COLOR = {
queued: 'blue', assigned: 'yellow', building: 'yellow',
review: 'cyan', testing: 'cyan', shipped: 'green',
failed: 'red', dead_letter: 'red',
};
// drawFleetBoard() — the board sourced from the /fleet API (AQ_FLEET_DASH=1).
// Mirrors the local board layout; running rows reflect lease/factory status
// (there is no local PID/liveness in fleet mode).
// Renders purely from the in-memory `fleetState` snapshot — it performs no
// fetch itself. Rebuilds `items` and re-syncs the selection, assembles the
// frame line by line, then clears the screen and writes it to stdout in one go.
// While no board has been loaded yet, every section renders its empty state.
function drawFleetBoard() {
items = buildItems();
syncSelection();
const board = fleetState.board;
const counts = board ? board.counts : { inbox: 0, building: 0, review: 0, testing: 0, shipped: 0, failed: 0 };
const running = board ? board.running : [];
const recent = board ? board.recent : [];
const out = [];
out.push('');
out.push(` ${C.bold}AGENT QUEUE${C.reset} ${c('cyan', 'fleet')} ${c('gray', FLEET.api)}`);
// status: loading (no board yet) · last error (+ age of the stale board) · live
const staleSec = fleetState.lastOk ? Math.floor((Date.now() - fleetState.lastOk) / 1000) : null;
let statusBit;
if (fleetState.loading && !board) statusBit = c('gray', '◌ loading…');
else if (fleetState.error) statusBit = c('red', `${trunc(fleetState.error, 40)}${board ? ` (stale ${staleSec}s)` : ''}`);
else statusBit = c('green', `● live${staleSec !== null ? ` (${staleSec}s ago)` : ''}`);
out.push(
` ${c('gray', new Date().toLocaleTimeString())} refresh ${INTERVAL / 1000}s ${statusBit}` +
` ${c('gray', `product ${FLEET.productId}`)} ${c('gray', INTERACTIVE ? 'press ? for help' : 'read-only')}`
);
out.push('');
// kanban counts per stage + number of running jobs
out.push(
` ${c('blue', '▢ inbox')} ${String(counts.inbox).padEnd(3)}` +
` ${c('yellow', '◧ building')} ${String(counts.building).padEnd(3)}` +
` ${c('cyan', '◔ review')} ${String(counts.review).padEnd(3)}` +
` ${c('cyan', '◕ testing')} ${String(counts.testing).padEnd(3)}` +
` ${c('green', '▣ shipped')} ${String(counts.shipped).padEnd(3)}` +
` ${c('red', '✕ failed')} ${String(counts.failed).padEnd(3)}` +
` ${C.bold}running ${running.length}${C.reset}`
);
out.push('');
// factories (per-factory rows when /fleet/factories exists; else metrics aggregate)
out.push(` ${C.bold}FACTORIES${C.reset}`);
const factories = board ? board.factories : [];
const metrics = board ? board.metrics : null;
if (factories.length > 0) {
for (const f of factories) {
// health defaults to 'ok'; anything other than ok/degraded is shown in red
const health = String(f.health || 'ok');
const hc = health === 'ok' ? 'green' : health === 'degraded' ? 'yellow' : 'red';
out.push(
` ${c('bold', trunc(f.factoryId || f.id || '?', 24).padEnd(24))} ` +
`${c(hc, health.padEnd(9))} ` +
`${c('gray', `load ${f.load ?? '?'}/${f.seatLimit ?? '?'}`)} ` +
`${c('gray', trunc((Array.isArray(f.capabilities) ? f.capabilities.join(', ') : f.capabilities) || '', 36))}`
);
}
} else if (metrics && metrics.factory) {
const fm = metrics.factory;
const bh = fm.byHealth || {};
out.push(
` ${c('green', `ok ${bh.ok ?? 0}`)} ${c('yellow', `degraded ${bh.degraded ?? 0}`)} ${c('red', `down ${bh.down ?? 0}`)}` +
` ${c('gray', `live ${fm.live ?? '?'} · stale ${fm.stale ?? '?'}`)}` +
` ${c('gray', `seats ${fm.seatsUsed ?? '?'}/${fm.seatsTotal ?? '?'}`)}` +
` ${c('gray', `util ${metrics.utilizationPct ?? '?'}%`)}`
);
} else {
out.push(` ${c('dim', 'no factory data')}`);
}
// alerts (from metrics)
if (metrics && Array.isArray(metrics.alerts) && metrics.alerts.length > 0) {
for (const a of metrics.alerts) {
// 'critical' alerts are red; every other severity is yellow
const sev = a.severity === 'critical' ? 'red' : 'yellow';
out.push(` ${c(sev, '⚠ ')}${c(sev, a.kind || 'alert')}${a.message ? c('gray', `${trunc(a.message, 50)}`) : ''}`);
}
}
out.push('');
// running (lease/factory status — no local pid)
out.push(` ${C.bold}RUNNING${C.reset}`);
if (running.length === 0) {
out.push(` ${c('dim', 'no jobs in flight')}`);
} else {
for (const r of running) {
const sc = FLEET_STAGE_COLOR[r.fleetStage] || 'gray';
out.push(
` ${c('bold', trunc(r.id, 30).padEnd(30))} ` +
`${c(sc, String(r.fleetStage).padEnd(9))} ` +
`${c('gray', r.factoryId ? `@${trunc(r.factoryId, 18)}` : 'unassigned')}`
);
const mtags = manifestTags(r);
if (mtags) out.push(` ${mtags}`);
}
}
out.push('');
// actionable jobs (numbered + selectable) — reuses STAGE_TAG buckets
out.push(` ${C.bold}JOBS${C.reset} ${c('gray', '(review · testing · failed · inbox)')}`);
if (items.length === 0) {
out.push(` ${c('dim', board ? 'no actionable jobs' : 'waiting for fleet…')}`);
} else {
items.forEach((it, i) => {
const sel = i === selIdx;
const ptr = sel ? c('cyan', '▶') : ' ';
const num = c('gray', String(i + 1).padStart(2) + '.');
// stages without a STAGE_TAG entry fall back to a plain `[stage]` label
const tag = (STAGE_TAG[it.stage] || (() => `[${it.stage}]`))();
const name = sel ? `${C.bold}${trunc(it.job, 46)}${C.reset}` : trunc(it.job, 46);
out.push(` ${ptr} ${num} ${tag} ${name}`);
const jtags = manifestTags(it.fleet);
if (jtags) out.push(` ${jtags}`);
});
}
out.push('');
// recent (shipped + failed)
out.push(` ${C.bold}RECENT${C.reset}`);
if (recent.length === 0) {
out.push(` ${c('dim', 'nothing finished yet')}`);
} else {
for (const r of recent) {
const failedRes = r.stage === 'failed';
const mark = failedRes ? c('red', '✕') : c('green', '▣');
const label = failedRes ? c('red', r.fleetStage) : c('green', r.fleetStage);
const when = r.updatedAt ? new Date(r.updatedAt).toLocaleTimeString() : '';
out.push(` ${mark} ${trunc(r.id, 34).padEnd(34)} ${label} ${c('gray', when)}`);
}
}
out.push('');
// flash + footer
const fl = flashLine();
if (fl) out.push(` ${fl}`);
// a pending confirmation replaces the key hints with a y/n prompt
if (mode === 'confirm' && confirmAction) {
out.push(` ${c('yellow', `${confirmAction.verb} "${confirmAction.job}" ? `)}${C.bold}y${C.reset}${c('gray', '/')}${C.bold}n${C.reset}`);
} else if (INTERACTIVE) {
out.push(c('gray', ' ↑/↓ select · enter events · s ship · x reject · u requeue'));
out.push(c('gray', ' g refresh · ? help · q quit'));
}
// clear screen + cursor home, then the whole frame in a single write
process.stdout.write('\x1b[2J\x1b[H' + out.join('\n') + '\n');
}
// drawBoard() — render the main board. In fleet mode this delegates to
// drawFleetBoard(); otherwise everything is read from the local queue on each
// call: job metas from the state directory, per-stage `.md` counts, the run-loop
// pid, and each running worker's log. Rebuilds `items`, re-syncs the selection,
// then clears the screen and writes the whole frame to stdout in one go.
function drawBoard() {
if (FLEET.enabled) return drawFleetBoard();
const metas = readMetas();
const metaByJob = Object.fromEntries(metas.filter((m) => m.job).map((m) => [m.job, m]));
// running = not yet ended AND the recorded pid is still alive
const running = metas.filter((m) => !m.ended && pidAlive(m.pid));
// finished = has an `ended` stamp, most recently ended first
const finished = metas
.filter((m) => m.ended)
.sort((a, b) => Number(b.ended) - Number(a.ended));
const counts = {
inbox: count(DIRS.inbox), building: count(DIRS.building),
review: count(DIRS.review), testing: count(DIRS.testing),
shipped: count(DIRS.shipped), failed: count(DIRS.failed),
};
// rebuild actionable list + keep selection stable
items = buildItems();
syncSelection();
const loop = daemonPid();
const out = [];
out.push('');
out.push(` ${C.bold}AGENT QUEUE${C.reset} ${c('gray', ROOT)}`);
out.push(
` ${c('gray', new Date().toLocaleTimeString())} refresh ${INTERVAL / 1000}s ` +
(loop ? c('green', `● run loop pid ${loop}`) : c('gray', '○ run loop stopped')) +
` ${c('gray', INTERACTIVE ? 'press ? for help' : 'read-only')}`
);
out.push('');
// kanban counts per stage + number of running workers
out.push(
` ${c('blue', '▢ inbox')} ${String(counts.inbox).padEnd(3)}` +
` ${c('yellow', '◧ building')} ${String(counts.building).padEnd(3)}` +
` ${c('cyan', '◔ review')} ${String(counts.review).padEnd(3)}` +
` ${c('cyan', '◕ testing')} ${String(counts.testing).padEnd(3)}` +
` ${c('green', '▣ shipped')} ${String(counts.shipped).padEnd(3)}` +
` ${c('red', '✕ failed')} ${String(counts.failed).padEnd(3)}` +
` ${C.bold}running ${running.length}${C.reset}`
);
out.push('');
// running table
out.push(` ${C.bold}RUNNING${C.reset}`);
if (running.length === 0) {
out.push(` ${c('dim', 'no workers running')}`);
} else {
for (const m of running) {
const eng = m.engine || '?';
const engC = ENGINE_COLOR[eng] || 'gray';
// stalled = the log exists and has not been modified for more than STALL_MIN minutes
const age = logAgeSec(m.job);
const stalled = age !== null && age > STALL_MIN * 60;
out.push(
` ${c('bold', trunc(m.job || '?', 30).padEnd(30))} ` +
`${c(engC, eng.padEnd(7))} ` +
`${fmtElapsed(m.started).padStart(7)} ` +
`${c('gray', 'pid ' + (m.pid || '?'))}` +
`${stalled ? ' ' + c('red', '⚠ stalled') : ''}`
);
out.push(` ${c('dim', trunc(shortPath(m.cwd || ''), 70))}`);
const mtags = manifestTags(m);
if (mtags) out.push(` ${mtags}`);
const last = lastLogLine(m.job);
if (last) out.push(` ${c('cyan', ' ')}${c('dim', trunc(last, 70))}`);
}
}
out.push('');
// actionable job list (numbered + selectable)
out.push(` ${C.bold}JOBS${C.reset} ${c('gray', '(review · testing · failed · inbox)')}`);
if (items.length === 0) {
out.push(` ${c('dim', 'no actionable jobs')}`);
} else {
items.forEach((it, i) => {
const sel = i === selIdx;
const ptr = sel ? c('cyan', '▶') : ' ';
const num = c('gray', String(i + 1).padStart(2) + '.');
const tag = (STAGE_TAG[it.stage] || (() => `[${it.stage}]`))();
const name = sel ? `${C.bold}${trunc(it.job, 46)}${C.reset}` : trunc(it.job, 46);
out.push(` ${ptr} ${num} ${tag} ${name}`);
// tags come from the job's meta when it has one, else from the .md frontmatter
const jtags = manifestTags(metaByJob[it.job] || readManifest(it.stage, it.job));
if (jtags) out.push(` ${jtags}`);
});
}
out.push('');
// recent finished
out.push(` ${C.bold}RECENT${C.reset}`);
// only the five most recently ended jobs are listed
const recent = finished.slice(0, 5);
if (recent.length === 0) {
out.push(` ${c('dim', 'nothing finished yet')}`);
} else {
for (const m of recent) {
const res = m.result || '';
// results that get the red ✕ mark; every other result gets the green ▣
const failedRes = res === 'failed' || res === 'timeout' || res === 'verify_failed' ||
res === 'rejected' || res === 'retries_exhausted' || res === 'capability_mismatch' ||
res === 'budget_exceeded' || res === 'no_engine';
const mark = failedRes ? c('red', '✕') : c('green', '▣');
// `ended` is in epoch seconds (it is multiplied by 1000 for Date)
const when = m.ended ? new Date(Number(m.ended) * 1000).toLocaleTimeString() : '';
// human label per result; results without a dedicated label are shown raw in gray
let label;
if (res === 'shipped') label = c('green', 'shipped');
else if (res === 'testing') label = c('cyan', 'testing (QA)');
else if (res === 'review') label = c('cyan', 'review');
else if (res === 'verify_failed') label = c('red', 'verify failed');
else if (res === 'timeout') label = c('red', 'timeout');
else if (res === 'budget_exceeded') label = c('red', 'budget exceeded');
else if (res === 'rejected') label = c('red', 'rejected');
else if (res === 'retries_exhausted') label = c('red', 'retries exhausted');
else if (res === 'failed') label = c('red', 'failed rc=' + (m.exit || '?'));
else label = c('gray', res || '?');
out.push(
` ${mark} ${trunc(m.job || '?', 34).padEnd(34)} ` +
`${c('gray', (m.engine || '').padEnd(7))} ` +
`${label} ${c('gray', when)} ${c('cyan', insightsTag(m))}`
);
}
}
out.push('');
// flash + footer
const fl = flashLine();
if (fl) out.push(` ${fl}`);
// a pending confirmation replaces the key hints with a y/n prompt
if (mode === 'confirm' && confirmAction) {
out.push(` ${c('yellow', `${confirmAction.verb} "${confirmAction.job}" ? `)}${C.bold}y${C.reset}${c('gray', '/')}${C.bold}n${C.reset}`);
} else if (INTERACTIVE) {
out.push(c('gray', ' ↑/↓ select · enter logs · p promote · s ship · x reject · u requeue'));
out.push(c('gray', ' r run · S stop · g refresh · ? help · q quit'));
}
// clear screen + cursor home, then the whole frame in a single write
process.stdout.write('\x1b[2J\x1b[H' + out.join('\n') + '\n');
}
// drawLog() — render the log screen for `logJob`: a header, a rule, and the
// tail of the content sized to the terminal height.
// Fleet mode shows the job's event lines from `fleetEvents` (or its
// loading / error / empty state); local mode shows the tail of
// logs/<logJob>.log, or a "no log" line when the file cannot be read.
function drawLog() {
  // Clamp to at least one row: on a terminal of 6 rows or fewer the raw
  // difference is <= 0, and slice(-0) / slice(<positive>) would dump (nearly)
  // the whole log instead of its tail.
  const rows = Math.max(1, (process.stdout.rows || 30) - 6);
  // paint(head, body) — clear the screen and write header, rule and body.
  const paint = (head, body) => {
    process.stdout.write('\x1b[2J\x1b[H' + head + '\n' + c('gray', ' ' + '─'.repeat(60)) + '\n' + body + '\n');
  };
  if (FLEET.enabled) {
    let body;
    if (fleetEvents.loading) body = c('gray', ' loading events…');
    else if (fleetEvents.error) body = c('red', ` ${fleetEvents.error}`);
    else if (!fleetEvents.lines.length) body = c('gray', ' no events for this job');
    else body = fleetEvents.lines.slice(-rows).join('\n');
    paint(` ${C.bold}EVENTS${C.reset} ${c('cyan', logJob)} ${c('gray', 'q/esc back · g refresh')}`, body);
    return;
  }
  let body = `no log for ${logJob}`;
  try {
    const txt = fs.readFileSync(path.join(DIRS.logs, `${logJob}.log`), 'utf8');
    body = txt.split('\n').slice(-rows).join('\n');
  } catch { /* keep the "no log" default */ }
  paint(` ${C.bold}LOG${C.reset} ${c('cyan', logJob)} ${c('gray', 'q/esc back · g refresh')}`, body);
}
// drawHelp() — clear the screen and print the key reference.
// The key list is the same in both modes; fleet mode adds a banner line noting
// that run/stop/promote are disabled and describes enter/l as "events" rather
// than "log". Any key returns to the board (handled in onKey).
function drawHelp() {
const L = [
'', ` ${C.bold}AGENT QUEUE — keys${C.reset}`,
FLEET.enabled ? ` ${c('cyan', 'fleet mode')} ${c('gray', '— board sourced from /fleet API; run/stop/promote disabled')}` : '', '',
` ${c('cyan', '↑/↓, j/k, 1-9')} select a job in the JOBS list`,
` ${c('cyan', 'enter / l')} ${FLEET.enabled ? "view the selected job's events" : "view the selected job's log (live)"}`,
` ${c('cyan', 'p')} promote (review → testing → shipped)`,
` ${c('cyan', 's')} ship (testing/QA → shipped, the manual gate)`,
` ${c('cyan', 'x')} reject (review/testing → failed) ${c('gray', '[confirm]')}`,
` ${c('cyan', 'u')} requeue (failed/review/testing → inbox) ${c('gray', '[confirm]')}`,
'',
` ${c('cyan', 'r')} start the run loop (detached, max ${process.env.AGENT_QUEUE_MAX || 3})`,
` ${c('cyan', 'S')} stop the run loop + running workers`,
` ${c('cyan', 'g')} refresh now`,
` ${c('cyan', '? / h')} toggle this help`,
` ${c('cyan', 'q / Ctrl-C')} quit`,
'',
` ${c('gray', 'Lifecycle: inbox → building → review → testing → shipped (+ failed)')}`,
` ${c('gray', 'auto: rc=0 → review; verify pass → testing; verify fail → failed')}`,
` ${c('gray', 'manual: ship (testing → shipped)')}`,
'', ` ${c('gray', 'press any key to return')}`, '',
];
// clear screen + cursor home, then the whole help page in a single write
process.stdout.write('\x1b[2J\x1b[H' + L.join('\n') + '\n');
}
// draw() — repaint whichever screen `mode` selects. 'log' and 'help' have
// their own renderers; every other mode (board, confirm) paints the board.
const draw = () => {
  switch (mode) {
    case 'log':
      drawLog();
      break;
    case 'help':
      drawHelp();
      break;
    default:
      drawBoard();
  }
};
// ── main loop + key handling ────────────────────────────────────────
// Fleet mode (AQ_FLEET_DASH=1) sources the board from the /fleet API on an async,
// single-flight tick loop. Local mode keeps the original synchronous setInterval
// path byte-for-byte. `timer` holds whichever timer is live so quit() can clear it.
let timer = null;
if (FLEET.enabled) {
// Fleet mode needs a complete config; print what is missing and exit non-zero.
if (!FLEET.ok) {
process.stdout.write(
`agent-queue: fleet dashboard enabled (AQ_FLEET_DASH=1) but missing config:\n` +
` ${FLEET.missing.join(', ')}\n` +
`Set AQ_FLEET_API, AQ_FLEET_TOKEN and AQ_PRODUCT_ID, or unset AQ_FLEET_DASH.\n`
);
process.exit(1);
}
draw(); // initial frame (loading…)
// tick: refresh, repaint the board (the log and help screens are left alone),
// then re-arm — so the next refresh starts INTERVAL ms after this one ended.
const tick = async () => {
await refreshFleet();
if (mode !== 'log' && mode !== 'help') draw();
timer = setTimeout(tick, INTERVAL); // single-flight: schedule only after the await
};
tick();
} else {
// Local mode needs an existing queue root; otherwise point at `init` and exit non-zero.
if (!fs.existsSync(ROOT)) {
process.stdout.write(`agent-queue: queue root not found: ${ROOT}\nRun \`agent-queue.sh init\` first.\n`);
process.exit(1);
}
draw();
// Local draws read the queue from disk themselves, so a plain interval suffices.
timer = setInterval(draw, INTERVAL);
}
// quit() — stop the refresh timer, hand the terminal back (raw mode off,
// colors reset) and exit with status 0.
const quit = () => {
  if (timer) {
    // `timer` may come from setTimeout (fleet) or setInterval (local); clear both ways.
    clearTimeout(timer);
    clearInterval(timer);
  }
  try {
    if (process.stdin.isTTY) {
      process.stdin.setRawMode(false);
    }
  } catch {
    /* noop */
  }
  process.stdout.write(`${C.reset}\n`);
  process.exit(0);
};
// moveSel(delta) — move the cursor by `delta` rows, wrapping around at either
// end of the list, and remember the newly selected job. No-op on an empty list.
const moveSel = (delta) => {
  const total = items.length;
  if (total === 0) {
    return;
  }
  selIdx = (selIdx + delta + total) % total;
  const picked = items[selIdx];
  selJob = picked?.job ?? null;
};
// onKey(key) — handle one chunk of raw stdin input (a key or escape sequence).
// Ctrl-C always quits. Otherwise the key is interpreted by the current mode:
//   help    — any key returns to the board
//   log     — q / esc / enter go back; g reloads the events (fleet mode only)
//   confirm — y runs the pending action; any other key cancels it
//   board   — navigation, actions and screen switches (see the switch below)
// Every handled key ends with a redraw; unknown board keys are ignored.
function onKey(key) {
  // global quit
  if (key === '\u0003') return quit(); // Ctrl-C always quits
  if (mode === 'help') { mode = 'board'; return draw(); }
  if (mode === 'log') {
    if (key === 'q' || key === '\u001b' || key === '\r' || key === '\n') { mode = 'board'; logJob = null; }
    else if (key === 'g') { if (FLEET.enabled && logJob) refreshFleetEvents(logJob); }
    return draw();
  }
  if (mode === 'confirm') {
    if (key === 'y' || key === 'Y') confirmAction?.run();
    else { mode = 'board'; confirmAction = null; setFlash(c('gray', 'cancelled')); }
    return draw();
  }
  // board mode
  switch (key) {
    case 'q': return quit();
    case '?': case 'h': mode = 'help'; break;
    case 'g':
      // Local draws re-read the queue from disk, so a redraw IS a refresh. The
      // fleet board is a cached snapshot: "refresh now" has to refetch it
      // (single-flight — a no-op if a refresh is already running), then repaint.
      if (FLEET.enabled) refreshFleet().then(() => draw());
      break;
    case 'j': case '\u001b[B': moveSel(1); break;
    case 'k': case '\u001b[A': moveSel(-1); break;
    case '\r': case '\n': case 'l':
      if (items[selIdx]) {
        logJob = items[selIdx].job; mode = 'log';
        if (FLEET.enabled) refreshFleetEvents(logJob);
      }
      break;
    case 'p': doAction('promote'); break;
    case 's': doAction('ship'); break;
    case 'x': doAction('reject'); break;
    case 'u': doAction('requeue'); break;
    case 'r': if (FLEET.enabled) setFlash(c('gray', 'run loop n/a in fleet mode')); else startRun(); break;
    case 'S':
      if (FLEET.enabled) { setFlash(c('gray', 'stop n/a in fleet mode')); break; }
      { const res = aq(['stop']); setFlash(c('red', '■ ') + (lastLine(res.out) || 'stopped')); break; }
    default:
      // 1-9 jump straight to that row (when it exists)
      if (/^[1-9]$/.test(key)) {
        const i = parseInt(key, 10) - 1;
        if (i < items.length) { selIdx = i; selJob = items[i].job; }
      } else { return; } // ignore unknown keys (no redraw)
  }
  draw();
}
// Key input: only when stdin is a terminal. Raw mode delivers each key press
// immediately (no line buffering), decoded as UTF-8 text, to onKey.
if (INTERACTIVE) {
  const { stdin } = process;
  stdin.setRawMode(true);
  stdin.resume();
  stdin.setEncoding('utf8');
  stdin.on('data', onKey);
}
// Leave cleanly on an interrupt or termination signal.
for (const signal of ['SIGINT', 'SIGTERM']) {
  process.on(signal, quit);
}