feat: add fleet metrics + alerting (GET /fleet/metrics)

Adds coordinator.fleetMetrics() computing queue depth, stage histogram,
oldest-queued age (starvation signal), factory health and seat utilization,
plus derived alerts (no_live_capacity, all_factories_down, queue_starvation,
saturated, stale_factories). Exposed via GET /fleet/metrics and surfaced as a
metrics+alerts panel on the fleet overview. Thresholds injectable for tests.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
Saravanakumar D 2026-05-30 18:51:59 -07:00
parent d780739cbe
commit c9c2c174db
9 changed files with 414 additions and 3 deletions

View File

@ -170,6 +170,36 @@ async function mockFleet(
}
if (path.endsWith('/factories')) return route.fulfill({ json: { factories: [FACTORY] } });
// ── Fleet metrics ──
if (path.endsWith('/metrics')) {
return route.fulfill({
json: {
productId: 'lysnrai',
generatedAt: ISO,
jobs: {
total: 1,
byStage: { queued: 1 },
queueDepth: 1,
blocked: 0,
active: 0,
oldestQueuedAgeMs: 1000,
},
factories: {
total: 1,
live: 1,
stale: 0,
byHealth: { ok: 1, degraded: 0, down: 0 },
seatsUsed: 1,
seatsTotal: 4,
utilizationPct: 25,
},
alerts: [
{ level: 'warning', code: 'queue_starvation', message: 'A job has waited too long.' },
],
},
});
}
return route.fulfill({ json: {} });
});
@ -183,6 +213,8 @@ test.describe('Fleet — Overview', () => {
await page.goto('/dashboard/fleet');
await expect(page.getByRole('heading', { name: 'Fleet Control Plane' })).toBeVisible();
await expect(page.getByTestId('fleet-metrics')).toBeVisible();
await expect(page.getByTestId('fleet-alerts')).toBeVisible();
await expect(page.getByLabel('Factory factory-alpha')).toBeVisible();
await expect(page.getByRole('table', { name: 'Recent fleet jobs' })).toBeVisible();
await expect(page.getByRole('link', { name: 'feat-x' })).toBeVisible();

View File

@ -22,6 +22,7 @@ import {
getJobDag,
getJobExplain,
listFactories,
getFleetMetrics,
getBudget,
getBudgetBurndown,
upsertBudget,
@ -163,6 +164,42 @@ describe('fleet-client', () => {
});
});
describe('getFleetMetrics', () => {
it('returns metrics on success', async () => {
fetchSpy.mockResolvedValue({
productId: 'lysnrai',
generatedAt: new Date().toISOString(),
jobs: {
total: 1,
byStage: {},
queueDepth: 1,
blocked: 0,
active: 0,
oldestQueuedAgeMs: null,
},
factories: {
total: 0,
live: 0,
stale: 0,
byHealth: { ok: 0, degraded: 0, down: 0 },
seatsUsed: 0,
seatsTotal: 0,
utilizationPct: 0,
},
alerts: [],
});
const res = await getFleetMetrics();
expect(res?.jobs.queueDepth).toBe(1);
expect(fetchSpy).toHaveBeenCalledWith('/metrics', expect.anything());
});
it('returns null on 404', async () => {
fetchSpy.mockRejectedValue(new Error('404 Not Found'));
const res = await getFleetMetrics();
expect(res).toBeNull();
});
});
describe('listFactories', () => {
it('returns factories on success', async () => {
fetchSpy.mockResolvedValue({ factories: [{ id: 'f1' }] });

View File

@ -4,7 +4,14 @@ import { useEffect, useState, useCallback } from 'react';
import Link from 'next/link';
import { PageHeader } from '@bytelyst/dashboard-components';
import { useAuth } from '@/lib/auth-context';
import { listFactories, listJobs, type FleetFactory, type FleetJob } from '@/lib/fleet-client';
import {
listFactories,
listJobs,
getFleetMetrics,
type FleetFactory,
type FleetJob,
type FleetMetrics,
} from '@/lib/fleet-client';
const POLL_INTERVAL = 30_000;
@ -23,6 +30,15 @@ function HealthBadge({ health }: { health: string }) {
);
}
function MetricCard({ label, value }: { label: string; value: number | string }) {
return (
<div className="rounded-lg border p-3">
<p className="text-xs text-muted-foreground">{label}</p>
<p className="text-2xl font-semibold tabular-nums">{value}</p>
</div>
);
}
function StageBadge({ stage }: { stage: string }) {
const colors: Record<string, string> = {
queued: 'bg-blue-500/20 text-blue-700 dark:text-blue-400',
@ -44,13 +60,19 @@ export default function FleetOverviewPage() {
const { token } = useAuth();
const [factories, setFactories] = useState<FleetFactory[]>([]);
const [jobs, setJobs] = useState<FleetJob[]>([]);
const [metrics, setMetrics] = useState<FleetMetrics | null>(null);
const [loading, setLoading] = useState(true);
const refresh = useCallback(async () => {
try {
const [facRes, jobRes] = await Promise.all([listFactories(), listJobs({ limit: 10 })]);
const [facRes, jobRes, metricsRes] = await Promise.all([
listFactories(),
listJobs({ limit: 10 }),
getFleetMetrics().catch(() => null),
]);
setFactories(facRes.factories);
setJobs(jobRes.jobs);
setMetrics(metricsRes);
} catch {
/* degrade gracefully */
} finally {
@ -78,6 +100,40 @@ export default function FleetOverviewPage() {
<div className="p-6 space-y-8">
<PageHeader title="Fleet Control Plane" />
{/* Metrics + alerts */}
{metrics && (
<section aria-label="Fleet metrics" data-testid="fleet-metrics">
{metrics.alerts.length > 0 && (
<div className="space-y-2 mb-4" data-testid="fleet-alerts">
{metrics.alerts.map(a => (
<div
key={a.code}
role="alert"
className={`rounded-md border px-3 py-2 text-sm ${
a.level === 'critical'
? 'border-red-500/40 bg-red-500/10 text-red-700 dark:text-red-400'
: 'border-yellow-500/40 bg-yellow-500/10 text-yellow-700 dark:text-yellow-400'
}`}
>
<span className="font-medium uppercase mr-2">{a.level}</span>
{a.message}
</div>
))}
</div>
)}
<div className="grid gap-4 grid-cols-2 sm:grid-cols-3 lg:grid-cols-5">
<MetricCard label="Queue depth" value={metrics.jobs.queueDepth} />
<MetricCard label="Active" value={metrics.jobs.active} />
<MetricCard label="Blocked" value={metrics.jobs.blocked} />
<MetricCard
label="Live factories"
value={`${metrics.factories.live}/${metrics.factories.total}`}
/>
<MetricCard label="Utilization" value={`${metrics.factories.utilizationPct}%`} />
</div>
</section>
)}
{/* Factory cards */}
<section>
<h2 className="text-lg font-semibold mb-3">Factories</h2>

View File

@ -200,6 +200,41 @@ export async function getJobEvents(jobId: string): Promise<{ events: FleetEvent[
return apiFetch(`/jobs/${jobId}/events`);
}
// ── Fleet metrics + alerting ──────────────────────────────────────────────────
export interface FleetAlert {
level: 'warning' | 'critical';
code: string;
message: string;
}
export interface FleetMetrics {
productId: string;
generatedAt: string;
jobs: {
total: number;
byStage: Record<string, number>;
queueDepth: number;
blocked: number;
active: number;
oldestQueuedAgeMs: number | null;
};
factories: {
total: number;
live: number;
stale: number;
byHealth: { ok: number; degraded: number; down: number };
seatsUsed: number;
seatsTotal: number;
utilizationPct: number;
};
alerts: FleetAlert[];
}
export async function getFleetMetrics(): Promise<FleetMetrics | null> {
return apiFetchOptional('/metrics');
}
// ── Live event stream (SSE) ───────────────────────────────────────────────────
export interface ParsedSseEvent {

View File

@ -63,7 +63,11 @@
- [ ] **Phase-1 `budget.wall` enforcement** — P3 — `agent-queue.sh` — wall-clock ceiling extending timeout.
- [ ] **Node `dash` tag surfacing** — P3 — `dashboard.mjs` — profile/priority/caps/tracker-item link.
- [ ] **Roadmap §14 reconciliation** — P3 — tick Phase-2/3 boxes in `learning_ai_devops_tools`.
- [ ] **Fleet metrics + alerting** — P3 — queue depth, assign latency, utilization, reclaim counts (§17).
- [x] **Fleet metrics + alerting** — P3 — ✅ DONE — `GET /fleet/metrics` (`coordinator.fleetMetrics`):
queue depth, stage histogram, oldest-queued age (starvation), factory health/seat utilization, and
derived alerts (`no_live_capacity`, `all_factories_down`, `queue_starvation`, `saturated`,
`stale_factories`). Surfaced as a metrics+alerts panel on the fleet overview (`getFleetMetrics`).
Files: `coordinator.ts`, `routes.ts`, `fleet-client.ts`, `dashboard/fleet/page.tsx` + tests + e2e.
- [ ] **Multi-reviewer routing** — P3 — Phase-3 §14.
- [ ] **TUI re-point at `/fleet`** — P3 — Phase-3 §14.

View File

@ -951,4 +951,87 @@ describe('fleet coordinator — Phase 3 per-product budgets', () => {
expect(burndown.totalUsd).toBe(0);
expect(burndown.days.every(d => d.costUsd === 0)).toBe(true);
});
// ── FLEET METRICS + ALERTING (§17) ──
it('fleetMetrics: empty fleet reports zeros and no alerts', async () => {
const m = await coord.fleetMetrics('emptyproduct');
expect(m.jobs.total).toBe(0);
expect(m.jobs.queueDepth).toBe(0);
expect(m.jobs.oldestQueuedAgeMs).toBeNull();
expect(m.factories.total).toBe(0);
expect(m.factories.utilizationPct).toBe(0);
expect(m.alerts).toEqual([]);
});
it('fleetMetrics: counts stages, seats, and utilization', async () => {
await coord.submitJob(PID, input({ idempotencyKey: 'q1' }));
await coord.submitJob(PID, input({ idempotencyKey: 'q2' }));
await coord.heartbeat({
productId: PID,
factoryId: 'fac_1',
health: 'ok',
load: 1,
seatLimit: 2,
});
await coord.heartbeat({
productId: PID,
factoryId: 'fac_2',
health: 'degraded',
load: 2,
seatLimit: 2,
});
const m = await coord.fleetMetrics(PID);
expect(m.jobs.total).toBe(2);
expect(m.jobs.queueDepth).toBe(2);
expect(m.jobs.byStage.queued).toBe(2);
expect(m.factories.total).toBe(2);
expect(m.factories.byHealth).toEqual({ ok: 1, degraded: 1, down: 0 });
expect(m.factories.seatsUsed).toBe(3);
expect(m.factories.seatsTotal).toBe(4);
expect(m.factories.utilizationPct).toBe(75);
});
it('fleetMetrics: raises no_live_capacity when jobs are queued with no healthy factory', async () => {
await coord.submitJob(PID, input({ idempotencyKey: 'q1' }));
const m = await coord.fleetMetrics(PID);
expect(m.alerts.some(a => a.code === 'no_live_capacity' && a.level === 'critical')).toBe(true);
});
it('fleetMetrics: raises all_factories_down and stale_factories', async () => {
await coord.heartbeat({
productId: PID,
factoryId: 'fac_1',
health: 'down',
load: 0,
seatLimit: 1,
});
// Force staleness by evaluating far in the future.
const future = Date.now() + 10 * 60_000;
const m = await coord.fleetMetrics(PID, { nowMs: future, staleMaxAgeMs: 60_000 });
expect(m.factories.stale).toBe(1);
expect(m.factories.live).toBe(0);
expect(m.alerts.some(a => a.code === 'all_factories_down')).toBe(true);
expect(m.alerts.some(a => a.code === 'stale_factories')).toBe(true);
});
it('fleetMetrics: raises queue_starvation and saturated under load', async () => {
await coord.submitJob(PID, input({ idempotencyKey: 'q1' }));
await coord.heartbeat({
productId: PID,
factoryId: 'fac_1',
health: 'ok',
load: 1,
seatLimit: 1,
});
// Evaluate 20 minutes later so the queued job exceeds the 1s starvation bound.
const later = Date.now() + 20 * 60_000;
const m = await coord.fleetMetrics(PID, {
nowMs: later,
starvationMs: 1000,
staleMaxAgeMs: 60 * 60_000,
});
expect(m.alerts.some(a => a.code === 'queue_starvation')).toBe(true);
expect(m.alerts.some(a => a.code === 'saturated')).toBe(true);
});
});

View File

@ -35,6 +35,7 @@ import {
} from './scheduler.js';
import {
ACTIVE_STAGES,
FLEET_STAGES,
DEP_DONE_HARD,
DEP_DONE_SOFT,
PRIORITY_ORDER,
@ -1151,6 +1152,147 @@ export async function costBurndown(productId: string, days = 30): Promise<CostBu
};
}
// ── Phase 3: Fleet metrics + alerting (§17) ───────────────────────────────────
export interface FleetAlert {
level: 'warning' | 'critical';
code: string;
message: string;
}
export interface FleetMetrics {
productId: string;
generatedAt: string;
jobs: {
total: number;
byStage: Record<FleetStage, number>;
queueDepth: number;
blocked: number;
active: number;
oldestQueuedAgeMs: number | null;
};
factories: {
total: number;
live: number;
stale: number;
byHealth: { ok: number; degraded: number; down: number };
seatsUsed: number;
seatsTotal: number;
utilizationPct: number;
};
alerts: FleetAlert[];
}
/** A factory is considered stale after this long without a heartbeat. */
const DEFAULT_STALE_FACTORY_MS = 90_000;
/** A queued job waiting longer than this raises a starvation alert. */
const DEFAULT_STARVATION_MS = 900_000;
/**
* Compute point-in-time operational metrics for a product's fleet (§17):
* queue depth, stage histogram, oldest-queued age (starvation), factory health
* and seat utilization plus derived alerts. Read-only: a single per-partition
* job + factory scan, no writes. Thresholds are injectable for deterministic tests.
*/
export async function fleetMetrics(
productId: string,
opts?: { nowMs?: number; staleMaxAgeMs?: number; starvationMs?: number }
): Promise<FleetMetrics> {
const nowMs = opts?.nowMs ?? Date.now();
const staleMaxAgeMs = opts?.staleMaxAgeMs ?? DEFAULT_STALE_FACTORY_MS;
const starvationMs = opts?.starvationMs ?? DEFAULT_STARVATION_MS;
const [allJobs, factories] = await Promise.all([
repo.listJobs({ productId }),
repo.listFactories(productId),
]);
const byStage = Object.fromEntries(FLEET_STAGES.map(s => [s, 0])) as Record<FleetStage, number>;
let oldestQueuedAgeMs: number | null = null;
for (const j of allJobs) {
byStage[j.stage] = (byStage[j.stage] ?? 0) + 1;
if (j.stage === 'queued') {
const age = nowMs - Date.parse(j.createdAt);
if (Number.isFinite(age) && (oldestQueuedAgeMs === null || age > oldestQueuedAgeMs)) {
oldestQueuedAgeMs = age;
}
}
}
const queueDepth = byStage.queued ?? 0;
const blocked = byStage.blocked ?? 0;
const active = ACTIVE_STAGES.reduce((n, s) => n + (byStage[s] ?? 0), 0);
const byHealth = { ok: 0, degraded: 0, down: 0 };
let live = 0;
let stale = 0;
let seatsUsed = 0;
let seatsTotal = 0;
for (const f of factories) {
if (isFactoryStale(f, nowMs, staleMaxAgeMs)) stale++;
else live++;
if (f.health === 'ok' || f.health === 'degraded' || f.health === 'down') byHealth[f.health]++;
seatsUsed += f.load ?? 0;
seatsTotal += f.seatLimit ?? 0;
}
const utilizationPct = seatsTotal > 0 ? Math.round((seatsUsed / seatsTotal) * 1000) / 10 : 0;
const alerts: FleetAlert[] = [];
const liveHealthy = factories.filter(
f => !isFactoryStale(f, nowMs, staleMaxAgeMs) && f.health !== 'down'
).length;
if (queueDepth > 0 && liveHealthy === 0) {
alerts.push({
level: 'critical',
code: 'no_live_capacity',
message: `${queueDepth} job(s) queued but no live, healthy factory is available.`,
});
}
if (factories.length > 0 && byHealth.down === factories.length) {
alerts.push({
level: 'critical',
code: 'all_factories_down',
message: 'All registered factories report health "down".',
});
}
if (oldestQueuedAgeMs !== null && oldestQueuedAgeMs > starvationMs) {
alerts.push({
level: 'warning',
code: 'queue_starvation',
message: `Oldest queued job has waited ${Math.round(oldestQueuedAgeMs / 1000)}s (threshold ${Math.round(starvationMs / 1000)}s).`,
});
}
if (seatsTotal > 0 && seatsUsed >= seatsTotal && queueDepth > 0) {
alerts.push({
level: 'warning',
code: 'saturated',
message: `All ${seatsTotal} factory seat(s) are in use with ${queueDepth} job(s) still queued.`,
});
}
if (stale > 0) {
alerts.push({
level: 'warning',
code: 'stale_factories',
message: `${stale} factory(ies) have not sent a heartbeat within ${Math.round(staleMaxAgeMs / 1000)}s.`,
});
}
return {
productId,
generatedAt: new Date(nowMs).toISOString(),
jobs: { total: allJobs.length, byStage, queueDepth, blocked, active, oldestQueuedAgeMs },
factories: {
total: factories.length,
live,
stale,
byHealth,
seatsUsed,
seatsTotal,
utilizationPct,
},
alerts,
};
}
export interface ReapResult {
reaped: number;
jobIds: string[];

View File

@ -173,6 +173,20 @@ describe('fleetRoutes', () => {
expect(res.payload).toContain(': keepalive');
});
it('GET /fleet/metrics returns jobs, factories, and alerts', async () => {
const app = await buildApp();
await submit(app, { idempotencyKey: 'm1', bodyMd: '# task' });
const res = await app.inject({ method: 'GET', url: '/api/fleet/metrics' });
expect(res.statusCode).toBe(200);
const body = JSON.parse(res.body);
expect(body.productId).toBe('lysnrai');
expect(body.jobs.queueDepth).toBe(1);
expect(body.factories.total).toBe(0);
// A queued job with no factory should raise the no_live_capacity alert.
expect(body.alerts.some((a: { code: string }) => a.code === 'no_live_capacity')).toBe(true);
});
it('POST /fleet/claim returns claimed:false when nothing is eligible', async () => {
const app = await buildApp();
const res = await app.inject({

View File

@ -12,6 +12,7 @@
* GET /fleet/jobs/:id/runs job run history
* GET /fleet/jobs/:id/events append-only event stream
* GET /fleet/jobs/:id/events/stream live event stream (SSE, resumable)
* GET /fleet/metrics fleet metrics + alerts (queue depth, utilization)
* POST /fleet/jobs/:id/artifacts upload a run output (base64 body blob + pointer)
* GET /fleet/jobs/:id/artifacts list a job's artifact pointers
* GET /fleet/artifacts/:artifactId pointer + fresh short-lived SAS download URL
@ -324,6 +325,13 @@ export async function fleetRoutes(app: FastifyInstance) {
return explain;
});
// ── Fleet metrics + alerting (§17) ──
app.get('/fleet/metrics', async req => {
await extractAuth(req);
const pid = getRequestProductId(req);
return coordinator.fleetMetrics(pid);
});
// ── Artifacts: upload (base64 body → blob + pointer) ──
app.post('/fleet/jobs/:id/artifacts', async (req, reply) => {
await extractAuth(req);