bytelyst-devops-tools/agent-queue/lib/fleet-dash.mjs
Saravanakumar D 66c91233da feat(agent-queue): re-point TUI dashboard at /fleet API (parity)
Add an opt-in fleet mode to the dashboard so an operator can drive the
coordinator fleet from the same TUI used for the local folder queue.

- lib/fleet-dash.mjs: dependency-injectable read/act adapter over the
  platform-service /fleet REST surface (jobs, metrics, factories, events,
  ship/requeue/reject). Pure-ish + fully unit-testable without a live service.
- dashboard.mjs: render + act in fleet mode when AQ_FLEET_DASH=1 — board with
  counts, factories (per-factory rows or metrics aggregate), alerts, running
  (by lease/factory), actionable JOBS with manifest tags, recent, and a
  per-job events log. Single-flight async refresh keeps the last good board on
  failure; ship re-GETs a fresh leaseEpoch before PATCH; run/stop/promote are
  disabled (no safe server contract). Local mode is byte-for-byte unchanged.
- lib/fleet-dash.test.mjs: 22 node:assert assertions (config, stage mapping,
  toBoard, fetch headers/timeout/errors, board assembly + graceful degradation,
  events, job actions) wired into selftest.sh.
- docs: tick the Phase 3 "TUI re-pointed at /fleet" roadmap boxes.

Verified: selftest.sh green (incl. new fleet-dash checks); live non-TTY render
smoke against a stub /fleet server (both factories and metrics-aggregate paths);
local mode unchanged.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-30 19:47:56 -07:00

219 lines
9.8 KiB
JavaScript

// fleet-dash.mjs — read/act adapter that re-points the agent-queue TUI dashboard
// at the platform-service `/fleet` REST API (roadmap Phase 3: "TUI dashboard
// re-pointed at /fleet API (parity)").
//
// This module is intentionally pure-ish and dependency-injectable (the HTTP
// `fetchImpl` is a parameter) so it is unit-testable WITHOUT a live service.
// dashboard.mjs uses it ONLY when AQ_FLEET_DASH=1; otherwise the dashboard's
// local-queue behavior is byte-for-byte unchanged.
//
// Auth + scoping mirror agent-queue/lib/fleet-client.sh:
// base URL AQ_FLEET_API (already includes /api)
// bearer AQ_FLEET_TOKEN
// product X-Product-Id: AQ_PRODUCT_ID (sent on every request)
// Fallback per-request timeout in milliseconds; fleetFetch uses it whenever the
// caller's opts.timeoutMs is unset (or otherwise falsy).
const DEFAULT_TIMEOUT_MS = 8000;
// fleetConfig(env) — derive the dashboard's fleet-mode settings from an
// environment map (defaults to process.env).
//
// Returns { enabled, api, token, productId, ok, missing }:
//   enabled    true only when AQ_FLEET_DASH is exactly "1" (explicit opt-in)
//   api        AQ_FLEET_API with any trailing slashes stripped
//   token      AQ_FLEET_TOKEN
//   productId  AQ_PRODUCT_ID
//   missing    names of the required variables that are empty; only filled in
//              when enabled, so the dashboard can fail visibly instead of
//              silently doing nothing
//   ok         enabled AND nothing missing
// Unset variables are reported as '' rather than undefined.
export function fleetConfig(env = process.env) {
  const read = (name) => String(env[name] || '');

  const enabled = read('AQ_FLEET_DASH') === '1';
  // Keyed by variable name so the `missing` list can be derived from the same
  // table, in the same order the variables are declared here.
  const settings = {
    AQ_FLEET_API: read('AQ_FLEET_API').replace(/\/+$/, ''),
    AQ_FLEET_TOKEN: read('AQ_FLEET_TOKEN'),
    AQ_PRODUCT_ID: read('AQ_PRODUCT_ID'),
  };
  const missing = enabled
    ? Object.keys(settings).filter((name) => settings[name] === '')
    : [];

  return {
    enabled,
    api: settings.AQ_FLEET_API,
    token: settings.AQ_FLEET_TOKEN,
    productId: settings.AQ_PRODUCT_ID,
    ok: enabled && missing.length === 0,
    missing,
  };
}
// ── stage mapping ───────────────────────────────────────────────────────────
// Fleet stages collapse onto the local board's kanban buckets so the dashboard's
// existing layout/gating/STAGE_TAG logic can be reused unchanged.
const STAGE_BUCKET = {
  queued: 'inbox',
  assigned: 'building',
  building: 'building',
  review: 'review',
  testing: 'testing',
  shipped: 'shipped',
  failed: 'failed',
  dead_letter: 'failed',
};

// mapStage(stage) — bucket name for a server stage; anything not listed in
// STAGE_BUCKET (including undefined) falls back to 'inbox'.
// The lookup is restricted to own properties: a plain `STAGE_BUCKET[s]` would
// also resolve inherited keys such as 'constructor' or '__proto__' and hand
// back a function/object instead of a bucket name.
export const mapStage = (s) => (Object.hasOwn(STAGE_BUCKET, s) ? STAGE_BUCKET[s] : 'inbox');
// Buckets an operator can act on from the dashboard (parity with the local
// ACTION_STAGES = review · testing · failed · inbox).
const ACTIONABLE = new Set(['review', 'testing', 'failed', 'inbox']);

// Server stages (not buckets) that are listed in the board's `running` section.
const RUNNING_STAGES = new Set(['assigned', 'building']);

// Sort rank of each actionable bucket in the board's `items` list (lower first).
const ITEM_ORDER = { review: 0, testing: 1, failed: 2, inbox: 3 };

// trackerOf(job) — first truthy of job.trackerItemId, job.trackerItem and
// job.data.trackerItem; '' when none is set.
const trackerOf = (job) => job.trackerItemId || job.trackerItem || job.data?.trackerItem || '';

// capsOf(job) — capabilities as display text: arrays are joined with ', ',
// any other truthy value is stringified, and a missing/falsy value becomes ''.
const capsOf = (job) => {
  const { capabilities } = job;
  if (Array.isArray(capabilities)) {
    return capabilities.join(', ');
  }
  return String(capabilities || '');
};
// toBoard({jobs, factories, metrics}) — turn raw API payloads into the model
// the dashboard renders. No I/O happens here, so it can be unit-tested directly.
//
// Returns { counts, items, running, recent, factories, metrics }:
//   counts   number of jobs per bucket
//   items    jobs in an actionable bucket, ordered by bucket rank then id
//   running  jobs whose server stage is a running stage, in input order
//   recent   up to 5 shipped/failed jobs, latest updatedAt first
//   factories / metrics are passed through untouched
export function toBoard({ jobs = [], factories = [], metrics = null } = {}) {
  const counts = { inbox: 0, building: 0, review: 0, testing: 0, shipped: 0, failed: 0 };
  const items = [];
  const running = [];
  const finished = [];

  for (const job of jobs) {
    const bucket = mapStage(job.stage);
    if (counts[bucket] !== undefined) {
      counts[bucket] += 1;
    }

    const entry = {
      // `stage` carries the bucket so the dashboard's gate()/STAGE_TAG keep
      // working; the true server stage stays available as `fleetStage`.
      stage: bucket,
      fleetStage: job.stage,
      id: job.id,
      priority: job.priority || 'medium',
      profile: job.profile || '',
      capabilities: capsOf(job),
      tracker_item: trackerOf(job),
      leaseEpoch: job.leaseEpoch,
      factoryId: job.factoryId || job.leaseFactoryId || '',
      attempts: job.attempts,
      updatedAt: job.updatedAt || job.createdAt || '',
      raw: job,
    };

    // A job may land in more than one list (e.g. a failed job is both
    // actionable and recent), so these checks are independent.
    if (RUNNING_STAGES.has(job.stage)) {
      running.push(entry);
    }
    if (ACTIONABLE.has(bucket)) {
      items.push(entry);
    }
    if (bucket === 'shipped' || bucket === 'failed') {
      finished.push(entry);
    }
  }

  const byRankThenId = (a, b) => {
    const rankDiff = ITEM_ORDER[a.stage] - ITEM_ORDER[b.stage];
    if (rankDiff) {
      return rankDiff;
    }
    return cmp(a.id, b.id);
  };
  // Arguments are swapped on purpose: descending order, newest first.
  const newestFirst = (a, b) => cmp(String(b.updatedAt), String(a.updatedAt));

  items.sort(byRankThenId);
  finished.sort(newestFirst);

  return { counts, items, running, recent: finished.slice(0, 5), factories, metrics };
}

// cmp(a, b) — three-way comparison using the < and > operators.
const cmp = (a, b) => {
  if (a < b) {
    return -1;
  }
  if (a > b) {
    return 1;
  }
  return 0;
};
// ── HTTP ────────────────────────────────────────────────────────────────────
// readJsonBody(res) — the response body parsed as JSON, or null when the body
// cannot be read, is empty, or is not valid JSON.
async function readJsonBody(res) {
  let text = '';
  try {
    text = await res.text();
  } catch {
    // An unreadable body is treated the same as an empty one.
    return null;
  }
  if (!text) {
    return null;
  }
  try {
    return JSON.parse(text);
  } catch {
    return null;
  }
}

// fleetFetch(cfg, pathname, opts, fetchImpl) — one request against the
// coordinator at `${cfg.api}${pathname}`.
//
//   cfg        { api, token, productId } — sent as the URL base, the Bearer
//              token and the X-Product-Id header respectively
//   opts       { method = 'GET', body, timeoutMs }; a defined `body` is
//              JSON-encoded and sent with Content-Type: application/json
//   fetchImpl  injectable fetch (defaults to globalThis.fetch)
//
// Resolves to { ok, status, json } once a response arrives (json is null for
// an empty/unparseable body). Failures of the request itself — network errors
// and timeouts — resolve to { ok: false, status: 0, json: null, error } instead
// of rejecting, so the TUI stays responsive. The timeout is enforced with an
// AbortController and stays armed until the body has been read.
export async function fleetFetch(cfg, pathname, opts = {}, fetchImpl = globalThis.fetch) {
  const target = `${cfg.api}${pathname}`;
  const headers = {
    Authorization: `Bearer ${cfg.token}`,
    'X-Product-Id': cfg.productId,
    Accept: 'application/json',
  };
  const sendsBody = opts.body !== undefined;
  if (sendsBody) {
    headers['Content-Type'] = 'application/json';
  }

  const controller = new AbortController();
  const deadline = setTimeout(() => controller.abort(), opts.timeoutMs || DEFAULT_TIMEOUT_MS);

  try {
    const init = {
      method: opts.method || 'GET',
      headers,
      body: sendsBody ? JSON.stringify(opts.body) : undefined,
      signal: controller.signal,
    };
    const res = await fetchImpl(target, init);
    const json = await readJsonBody(res);
    return { ok: Boolean(res.ok), status: res.status, json };
  } catch (err) {
    const aborted = err?.name === 'AbortError' || err?.code === 'ABORT_ERR';
    const error = aborted ? 'timeout' : (err?.message || 'network error');
    return { ok: false, status: 0, json: null, error };
  } finally {
    clearTimeout(deadline);
  }
}
// fetchBoard(cfg, fetchImpl) — fetch everything the board needs and assemble
// the model with toBoard.
//
// Jobs are REQUIRED: a failed or body-less /fleet/jobs response yields
// { ok: false, error } and no further requests are made. A jobs payload
// without a `jobs` array is treated as an empty list.
// Metrics and factories are best-effort: a failed metrics request leaves
// `metrics` null (the aggregate panel is simply omitted) and a failed or
// malformed factories response degrades to [] (the endpoint is optional
// server-side).
//
// On success returns { ok: true, board }.
export async function fetchBoard(cfg, fetchImpl = globalThis.fetch) {
  const jobsRes = await fleetFetch(cfg, '/fleet/jobs', {}, fetchImpl);
  if (!jobsRes.ok || !jobsRes.json) {
    return { ok: false, error: jobsRes.error || `jobs HTTP ${jobsRes.status}` };
  }
  const jobs = Array.isArray(jobsRes.json.jobs) ? jobsRes.json.jobs : [];

  // The two optional requests do not depend on each other, so they are issued
  // together; the worst case is one timeout for the pair instead of two in a
  // row. Requests are still started in the order metrics, then factories.
  const [metricsRes, facRes] = await Promise.all([
    fleetFetch(cfg, '/fleet/metrics', {}, fetchImpl),
    fleetFetch(cfg, '/fleet/factories', {}, fetchImpl),
  ]);

  const metrics = metricsRes.ok && metricsRes.json ? metricsRes.json : null;
  const factories = facRes.ok && facRes.json && Array.isArray(facRes.json.factories)
    ? facRes.json.factories
    : [];

  return { ok: true, board: toBoard({ jobs, factories, metrics }) };
}
// safeTime(iso) — local time-of-day text for a parseable timestamp; a value
// that Date cannot parse is returned stringified instead.
const safeTime = (iso) => {
  const parsed = new Date(iso);
  if (Number.isNaN(parsed.getTime())) {
    return String(iso);
  }
  return parsed.toLocaleTimeString();
};

// formatEvent(e) — one fleet event rendered as a single log line for the TUI
// log view, laid out as "<time> <type> <actor> <json data>".
// Time, actor and data are left out when absent (data also when it is not an
// object or has no keys); a missing type prints as '?'. The result is trimmed.
export function formatEvent(e) {
  let stamp = '';
  if (e.at) {
    stamp = safeTime(e.at);
  }

  let who = '';
  if (e.actor) {
    who = ` ${e.actor}`;
  }

  let payload = '';
  if (e.data && typeof e.data === 'object' && Object.keys(e.data).length) {
    payload = ` ${JSON.stringify(e.data)}`;
  }

  const kind = e.type || '?';
  return `${stamp} ${kind}${who}${payload}`.trim();
}
// fetchEvents(cfg, jobId, fetchImpl) — load a job's event stream from
// /fleet/jobs/<id>/events (id URL-encoded) and render each event as a log line.
// Returns { ok: true, lines } on success — lines is [] when the payload has no
// `events` array — and { ok: false, error, lines: [] } when the request fails
// or the response has no JSON body.
export async function fetchEvents(cfg, jobId, fetchImpl = globalThis.fetch) {
  const path = `/fleet/jobs/${encodeURIComponent(jobId)}/events`;
  const { ok, status, json, error } = await fleetFetch(cfg, path, {}, fetchImpl);

  if (!(ok && json)) {
    return { ok: false, error: error || `events HTTP ${status}`, lines: [] };
  }

  if (!Array.isArray(json.events)) {
    return { ok: true, lines: [] };
  }
  return { ok: true, lines: json.events.map(formatEvent) };
}
// Operator verbs the dashboard supports in fleet mode. `promote` has no safe
// server contract (client-inferred stage transitions could violate workflow
// invariants), so it is deliberately not in this set.
const FLEET_VERBS = new Set(['ship', 'requeue', 'reject']);

// jobAction(cfg, item, verb, fetchImpl) — run an operator verb against the
// coordinator for the job `item.id`.
//   ship    → GET the job first to obtain a FRESH leaseEpoch, then PATCH
//             stage=shipped with it (a stale snapshot epoch would be fenced
//             with 409)
//   requeue → POST /actions/requeue (lease-free operator action)
//   reject  → POST /actions/reject
// Always resolves to { ok, message }; unsupported verbs, a missing job id and
// every failed request are reported through `message`.
export async function jobAction(cfg, item, verb, fetchImpl = globalThis.fetch) {
  const refuse = (message) => ({ ok: false, message });

  if (verb === 'promote') {
    return refuse('promote is not available in fleet mode');
  }
  if (!FLEET_VERBS.has(verb)) {
    return refuse(`${verb} not supported in fleet mode`);
  }

  const id = item?.id;
  if (!id) {
    return refuse('no job selected');
  }
  const jobPath = `/fleet/jobs/${encodeURIComponent(id)}`;

  // requeue / reject: a single POST to the matching action endpoint.
  if (verb !== 'ship') {
    const outcome = await fleetFetch(
      cfg,
      `${jobPath}/actions/${verb}`,
      { method: 'POST', body: {} },
      fetchImpl,
    );
    if (outcome.status === 409) {
      return refuse('job conflict/terminal — refresh and retry');
    }
    if (!outcome.ok) {
      return refuse(outcome.error || `${verb} HTTP ${outcome.status}`);
    }
    return { ok: true, message: `${verb} ${id}` };
  }

  // ship: read the current job, then patch using the epoch just read.
  const current = await fleetFetch(cfg, jobPath, {}, fetchImpl);
  if (!(current.ok && current.json)) {
    return refuse(current.error || `job HTTP ${current.status}`);
  }
  const patch = {
    method: 'PATCH',
    body: { stage: 'shipped', leaseEpoch: current.json.leaseEpoch },
  };
  const shipped = await fleetFetch(cfg, jobPath, patch, fetchImpl);
  if (shipped.status === 409) {
    return refuse('job changed (fenced) — refresh and retry');
  }
  if (!shipped.ok) {
    return refuse(shipped.error || `ship HTTP ${shipped.status}`);
  }
  return { ok: true, message: `shipped ${id}` };
}