From c9c2c174dbc6a9f1d2d871b3d6e52e37081196ca Mon Sep 17 00:00:00 2001 From: Saravanakumar D Date: Sat, 30 May 2026 18:51:59 -0700 Subject: [PATCH] feat: add fleet metrics + alerting (GET /fleet/metrics) Adds coordinator.fleetMetrics() computing queue depth, stage histogram, oldest-queued age (starvation signal), factory health and seat utilization, plus derived alerts (no_live_capacity, all_factories_down, queue_starvation, saturated, stale_factories). Exposed via GET /fleet/metrics and surfaced as a metrics+alerts panel on the fleet overview. Thresholds injectable for tests. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- dashboards/tracker-web/e2e/fleet.spec.ts | 32 ++++ .../src/__tests__/fleet-client.test.ts | 37 +++++ .../src/app/dashboard/fleet/page.tsx | 60 +++++++- .../tracker-web/src/lib/fleet-client.ts | 35 +++++ docs/TASKS_TO_COMPLETE.md | 6 +- .../src/modules/fleet/coordinator.test.ts | 83 ++++++++++ .../src/modules/fleet/coordinator.ts | 142 ++++++++++++++++++ .../src/modules/fleet/routes.test.ts | 14 ++ .../src/modules/fleet/routes.ts | 8 + 9 files changed, 414 insertions(+), 3 deletions(-) diff --git a/dashboards/tracker-web/e2e/fleet.spec.ts b/dashboards/tracker-web/e2e/fleet.spec.ts index 31fb09e8..0a3d67e1 100644 --- a/dashboards/tracker-web/e2e/fleet.spec.ts +++ b/dashboards/tracker-web/e2e/fleet.spec.ts @@ -170,6 +170,36 @@ async function mockFleet( } if (path.endsWith('/factories')) return route.fulfill({ json: { factories: [FACTORY] } }); + // ── Fleet metrics ── + if (path.endsWith('/metrics')) { + return route.fulfill({ + json: { + productId: 'lysnrai', + generatedAt: ISO, + jobs: { + total: 1, + byStage: { queued: 1 }, + queueDepth: 1, + blocked: 0, + active: 0, + oldestQueuedAgeMs: 1000, + }, + factories: { + total: 1, + live: 1, + stale: 0, + byHealth: { ok: 1, degraded: 0, down: 0 }, + seatsUsed: 1, + seatsTotal: 4, + utilizationPct: 25, + }, + alerts: [ + { level: 'warning', code: 'queue_starvation', message: 'A job has waited too long.' }, + ], + }, + }); + } + return route.fulfill({ json: {} }); }); @@ -183,6 +213,8 @@ test.describe('Fleet — Overview', () => { await page.goto('/dashboard/fleet'); await expect(page.getByRole('heading', { name: 'Fleet Control Plane' })).toBeVisible(); + await expect(page.getByTestId('fleet-metrics')).toBeVisible(); + await expect(page.getByTestId('fleet-alerts')).toBeVisible(); await expect(page.getByLabel('Factory factory-alpha')).toBeVisible(); await expect(page.getByRole('table', { name: 'Recent fleet jobs' })).toBeVisible(); await expect(page.getByRole('link', { name: 'feat-x' })).toBeVisible(); diff --git a/dashboards/tracker-web/src/__tests__/fleet-client.test.ts b/dashboards/tracker-web/src/__tests__/fleet-client.test.ts index e1b3f815..0987f000 100644 --- a/dashboards/tracker-web/src/__tests__/fleet-client.test.ts +++ b/dashboards/tracker-web/src/__tests__/fleet-client.test.ts @@ -22,6 +22,7 @@ import { getJobDag, getJobExplain, listFactories, + getFleetMetrics, getBudget, getBudgetBurndown, upsertBudget, @@ -163,6 +164,42 @@ describe('fleet-client', () => { }); }); + describe('getFleetMetrics', () => { + it('returns metrics on success', async () => { + fetchSpy.mockResolvedValue({ + productId: 'lysnrai', + generatedAt: new Date().toISOString(), + jobs: { + total: 1, + byStage: {}, + queueDepth: 1, + blocked: 0, + active: 0, + oldestQueuedAgeMs: null, + }, + factories: { + total: 0, + live: 0, + stale: 0, + byHealth: { ok: 0, degraded: 0, down: 0 }, + seatsUsed: 0, + seatsTotal: 0, + utilizationPct: 0, + }, + alerts: [], + }); + const res = await getFleetMetrics(); + expect(res?.jobs.queueDepth).toBe(1); + expect(fetchSpy).toHaveBeenCalledWith('/metrics', expect.anything()); + }); + + it('returns null on 404', async () => { + fetchSpy.mockRejectedValue(new Error('404 Not Found')); + const res = await getFleetMetrics(); + expect(res).toBeNull(); + }); + }); + describe('listFactories', () => { it('returns factories on success', async () => { fetchSpy.mockResolvedValue({ factories: [{ id: 'f1' }] }); diff --git a/dashboards/tracker-web/src/app/dashboard/fleet/page.tsx b/dashboards/tracker-web/src/app/dashboard/fleet/page.tsx index 5763df0e..c39ae207 100644 --- a/dashboards/tracker-web/src/app/dashboard/fleet/page.tsx +++ b/dashboards/tracker-web/src/app/dashboard/fleet/page.tsx @@ -4,7 +4,14 @@ import { useEffect, useState, useCallback } from 'react'; import Link from 'next/link'; import { PageHeader } from '@bytelyst/dashboard-components'; import { useAuth } from '@/lib/auth-context'; -import { listFactories, listJobs, type FleetFactory, type FleetJob } from '@/lib/fleet-client'; +import { + listFactories, + listJobs, + getFleetMetrics, + type FleetFactory, + type FleetJob, + type FleetMetrics, +} from '@/lib/fleet-client'; const POLL_INTERVAL = 30_000; @@ -23,6 +30,15 @@ function HealthBadge({ health }: { health: string }) { ); } +function MetricCard({ label, value }: { label: string; value: number | string }) { + return ( +
+

{label}

+

{value}

+
+ ); +} + function StageBadge({ stage }: { stage: string }) { const colors: Record = { queued: 'bg-blue-500/20 text-blue-700 dark:text-blue-400', @@ -44,13 +60,19 @@ export default function FleetOverviewPage() { const { token } = useAuth(); const [factories, setFactories] = useState([]); const [jobs, setJobs] = useState([]); + const [metrics, setMetrics] = useState(null); const [loading, setLoading] = useState(true); const refresh = useCallback(async () => { try { - const [facRes, jobRes] = await Promise.all([listFactories(), listJobs({ limit: 10 })]); + const [facRes, jobRes, metricsRes] = await Promise.all([ + listFactories(), + listJobs({ limit: 10 }), + getFleetMetrics().catch(() => null), + ]); setFactories(facRes.factories); setJobs(jobRes.jobs); + setMetrics(metricsRes); } catch { /* degrade gracefully */ } finally { @@ -78,6 +100,40 @@ export default function FleetOverviewPage() {
+ {/* Metrics + alerts */} + {metrics && ( +
+ {metrics.alerts.length > 0 && ( +
+ {metrics.alerts.map(a => ( +
+ {a.level} + {a.message} +
+ ))} +
+ )} +
+ + + + + +
+
+ )} + {/* Factory cards */}

Factories

diff --git a/dashboards/tracker-web/src/lib/fleet-client.ts b/dashboards/tracker-web/src/lib/fleet-client.ts index caca2024..1a9fab36 100644 --- a/dashboards/tracker-web/src/lib/fleet-client.ts +++ b/dashboards/tracker-web/src/lib/fleet-client.ts @@ -200,6 +200,41 @@ export async function getJobEvents(jobId: string): Promise<{ events: FleetEvent[ return apiFetch(`/jobs/${jobId}/events`); } +// ── Fleet metrics + alerting ────────────────────────────────────────────────── + +export interface FleetAlert { + level: 'warning' | 'critical'; + code: string; + message: string; +} + +export interface FleetMetrics { + productId: string; + generatedAt: string; + jobs: { + total: number; + byStage: Record; + queueDepth: number; + blocked: number; + active: number; + oldestQueuedAgeMs: number | null; + }; + factories: { + total: number; + live: number; + stale: number; + byHealth: { ok: number; degraded: number; down: number }; + seatsUsed: number; + seatsTotal: number; + utilizationPct: number; + }; + alerts: FleetAlert[]; +} + +export async function getFleetMetrics(): Promise { + return apiFetchOptional('/metrics'); +} + // ── Live event stream (SSE) ─────────────────────────────────────────────────── export interface ParsedSseEvent { diff --git a/docs/TASKS_TO_COMPLETE.md b/docs/TASKS_TO_COMPLETE.md index fb754974..d6d4eaf0 100644 --- a/docs/TASKS_TO_COMPLETE.md +++ b/docs/TASKS_TO_COMPLETE.md @@ -63,7 +63,11 @@ - [ ] **Phase-1 `budget.wall` enforcement** — P3 — `agent-queue.sh` — wall-clock ceiling extending timeout. - [ ] **Node `dash` tag surfacing** — P3 — `dashboard.mjs` — profile/priority/caps/tracker-item link. - [ ] **Roadmap §14 reconciliation** — P3 — tick Phase-2/3 boxes in `learning_ai_devops_tools`. -- [ ] **Fleet metrics + alerting** — P3 — queue depth, assign latency, utilization, reclaim counts (§17). +- [x] **Fleet metrics + alerting** — P3 — ✅ DONE — `GET /fleet/metrics` (`coordinator.fleetMetrics`): + queue depth, stage histogram, oldest-queued age (starvation), factory health/seat utilization, and + derived alerts (`no_live_capacity`, `all_factories_down`, `queue_starvation`, `saturated`, + `stale_factories`). Surfaced as a metrics+alerts panel on the fleet overview (`getFleetMetrics`). + Files: `coordinator.ts`, `routes.ts`, `fleet-client.ts`, `dashboard/fleet/page.tsx` + tests + e2e. - [ ] **Multi-reviewer routing** — P3 — Phase-3 §14. - [ ] **TUI re-point at `/fleet`** — P3 — Phase-3 §14. diff --git a/services/platform-service/src/modules/fleet/coordinator.test.ts b/services/platform-service/src/modules/fleet/coordinator.test.ts index c6da7971..4eb49303 100644 --- a/services/platform-service/src/modules/fleet/coordinator.test.ts +++ b/services/platform-service/src/modules/fleet/coordinator.test.ts @@ -951,4 +951,87 @@ describe('fleet coordinator — Phase 3 per-product budgets', () => { expect(burndown.totalUsd).toBe(0); expect(burndown.days.every(d => d.costUsd === 0)).toBe(true); }); + + // ── FLEET METRICS + ALERTING (§17) ── + it('fleetMetrics: empty fleet reports zeros and no alerts', async () => { + const m = await coord.fleetMetrics('emptyproduct'); + expect(m.jobs.total).toBe(0); + expect(m.jobs.queueDepth).toBe(0); + expect(m.jobs.oldestQueuedAgeMs).toBeNull(); + expect(m.factories.total).toBe(0); + expect(m.factories.utilizationPct).toBe(0); + expect(m.alerts).toEqual([]); + }); + + it('fleetMetrics: counts stages, seats, and utilization', async () => { + await coord.submitJob(PID, input({ idempotencyKey: 'q1' })); + await coord.submitJob(PID, input({ idempotencyKey: 'q2' })); + await coord.heartbeat({ + productId: PID, + factoryId: 'fac_1', + health: 'ok', + load: 1, + seatLimit: 2, + }); + await coord.heartbeat({ + productId: PID, + factoryId: 'fac_2', + health: 'degraded', + load: 2, + seatLimit: 2, + }); + + const m = await coord.fleetMetrics(PID); + expect(m.jobs.total).toBe(2); + expect(m.jobs.queueDepth).toBe(2); + expect(m.jobs.byStage.queued).toBe(2); + expect(m.factories.total).toBe(2); + expect(m.factories.byHealth).toEqual({ ok: 1, degraded: 1, down: 0 }); + expect(m.factories.seatsUsed).toBe(3); + expect(m.factories.seatsTotal).toBe(4); + expect(m.factories.utilizationPct).toBe(75); + }); + + it('fleetMetrics: raises no_live_capacity when jobs are queued with no healthy factory', async () => { + await coord.submitJob(PID, input({ idempotencyKey: 'q1' })); + const m = await coord.fleetMetrics(PID); + expect(m.alerts.some(a => a.code === 'no_live_capacity' && a.level === 'critical')).toBe(true); + }); + + it('fleetMetrics: raises all_factories_down and stale_factories', async () => { + await coord.heartbeat({ + productId: PID, + factoryId: 'fac_1', + health: 'down', + load: 0, + seatLimit: 1, + }); + // Force staleness by evaluating far in the future. + const future = Date.now() + 10 * 60_000; + const m = await coord.fleetMetrics(PID, { nowMs: future, staleMaxAgeMs: 60_000 }); + expect(m.factories.stale).toBe(1); + expect(m.factories.live).toBe(0); + expect(m.alerts.some(a => a.code === 'all_factories_down')).toBe(true); + expect(m.alerts.some(a => a.code === 'stale_factories')).toBe(true); + }); + + it('fleetMetrics: raises queue_starvation and saturated under load', async () => { + await coord.submitJob(PID, input({ idempotencyKey: 'q1' })); + await coord.heartbeat({ + productId: PID, + factoryId: 'fac_1', + health: 'ok', + load: 1, + seatLimit: 1, + }); + // Evaluate 20 minutes later so the queued job exceeds the 1s starvation bound. + const later = Date.now() + 20 * 60_000; + const m = await coord.fleetMetrics(PID, { + nowMs: later, + starvationMs: 1000, + staleMaxAgeMs: 60 * 60_000, + }); + expect(m.alerts.some(a => a.code === 'queue_starvation')).toBe(true); + expect(m.alerts.some(a => a.code === 'saturated')).toBe(true); + }); }); diff --git a/services/platform-service/src/modules/fleet/coordinator.ts b/services/platform-service/src/modules/fleet/coordinator.ts index 2dd2f346..a0f7fbc0 100644 --- a/services/platform-service/src/modules/fleet/coordinator.ts +++ b/services/platform-service/src/modules/fleet/coordinator.ts @@ -35,6 +35,7 @@ import { } from './scheduler.js'; import { ACTIVE_STAGES, + FLEET_STAGES, DEP_DONE_HARD, DEP_DONE_SOFT, PRIORITY_ORDER, @@ -1151,6 +1152,147 @@ export async function costBurndown(productId: string, days = 30): Promise; + queueDepth: number; + blocked: number; + active: number; + oldestQueuedAgeMs: number | null; + }; + factories: { + total: number; + live: number; + stale: number; + byHealth: { ok: number; degraded: number; down: number }; + seatsUsed: number; + seatsTotal: number; + utilizationPct: number; + }; + alerts: FleetAlert[]; +} + +/** A factory is considered stale after this long without a heartbeat. */ +const DEFAULT_STALE_FACTORY_MS = 90_000; +/** A queued job waiting longer than this raises a starvation alert. */ +const DEFAULT_STARVATION_MS = 900_000; + +/** + * Compute point-in-time operational metrics for a product's fleet (§17): + * queue depth, stage histogram, oldest-queued age (starvation), factory health + * and seat utilization — plus derived alerts. Read-only: a single per-partition + * job + factory scan, no writes. Thresholds are injectable for deterministic tests. + */ +export async function fleetMetrics( + productId: string, + opts?: { nowMs?: number; staleMaxAgeMs?: number; starvationMs?: number } +): Promise { + const nowMs = opts?.nowMs ?? Date.now(); + const staleMaxAgeMs = opts?.staleMaxAgeMs ?? DEFAULT_STALE_FACTORY_MS; + const starvationMs = opts?.starvationMs ?? DEFAULT_STARVATION_MS; + + const [allJobs, factories] = await Promise.all([ + repo.listJobs({ productId }), + repo.listFactories(productId), + ]); + + const byStage = Object.fromEntries(FLEET_STAGES.map(s => [s, 0])) as Record; + let oldestQueuedAgeMs: number | null = null; + for (const j of allJobs) { + byStage[j.stage] = (byStage[j.stage] ?? 0) + 1; + if (j.stage === 'queued') { + const age = nowMs - Date.parse(j.createdAt); + if (Number.isFinite(age) && (oldestQueuedAgeMs === null || age > oldestQueuedAgeMs)) { + oldestQueuedAgeMs = age; + } + } + } + const queueDepth = byStage.queued ?? 0; + const blocked = byStage.blocked ?? 0; + const active = ACTIVE_STAGES.reduce((n, s) => n + (byStage[s] ?? 0), 0); + + const byHealth = { ok: 0, degraded: 0, down: 0 }; + let live = 0; + let stale = 0; + let seatsUsed = 0; + let seatsTotal = 0; + for (const f of factories) { + if (isFactoryStale(f, nowMs, staleMaxAgeMs)) stale++; + else live++; + if (f.health === 'ok' || f.health === 'degraded' || f.health === 'down') byHealth[f.health]++; + seatsUsed += f.load ?? 0; + seatsTotal += f.seatLimit ?? 0; + } + const utilizationPct = seatsTotal > 0 ? Math.round((seatsUsed / seatsTotal) * 1000) / 10 : 0; + + const alerts: FleetAlert[] = []; + const liveHealthy = factories.filter( + f => !isFactoryStale(f, nowMs, staleMaxAgeMs) && f.health !== 'down' + ).length; + if (queueDepth > 0 && liveHealthy === 0) { + alerts.push({ + level: 'critical', + code: 'no_live_capacity', + message: `${queueDepth} job(s) queued but no live, healthy factory is available.`, + }); + } + if (factories.length > 0 && byHealth.down === factories.length) { + alerts.push({ + level: 'critical', + code: 'all_factories_down', + message: 'All registered factories report health "down".', + }); + } + if (oldestQueuedAgeMs !== null && oldestQueuedAgeMs > starvationMs) { + alerts.push({ + level: 'warning', + code: 'queue_starvation', + message: `Oldest queued job has waited ${Math.round(oldestQueuedAgeMs / 1000)}s (threshold ${Math.round(starvationMs / 1000)}s).`, + }); + } + if (seatsTotal > 0 && seatsUsed >= seatsTotal && queueDepth > 0) { + alerts.push({ + level: 'warning', + code: 'saturated', + message: `All ${seatsTotal} factory seat(s) are in use with ${queueDepth} job(s) still queued.`, + }); + } + if (stale > 0) { + alerts.push({ + level: 'warning', + code: 'stale_factories', + message: `${stale} factory(ies) have not sent a heartbeat within ${Math.round(staleMaxAgeMs / 1000)}s.`, + }); + } + + return { + productId, + generatedAt: new Date(nowMs).toISOString(), + jobs: { total: allJobs.length, byStage, queueDepth, blocked, active, oldestQueuedAgeMs }, + factories: { + total: factories.length, + live, + stale, + byHealth, + seatsUsed, + seatsTotal, + utilizationPct, + }, + alerts, + }; +} + export interface ReapResult { reaped: number; jobIds: string[]; diff --git a/services/platform-service/src/modules/fleet/routes.test.ts b/services/platform-service/src/modules/fleet/routes.test.ts index 53d1ed65..d08ffb88 100644 --- a/services/platform-service/src/modules/fleet/routes.test.ts +++ b/services/platform-service/src/modules/fleet/routes.test.ts @@ -173,6 +173,20 @@ describe('fleetRoutes', () => { expect(res.payload).toContain(': keepalive'); }); + it('GET /fleet/metrics returns jobs, factories, and alerts', async () => { + const app = await buildApp(); + await submit(app, { idempotencyKey: 'm1', bodyMd: '# task' }); + + const res = await app.inject({ method: 'GET', url: '/api/fleet/metrics' }); + expect(res.statusCode).toBe(200); + const body = JSON.parse(res.body); + expect(body.productId).toBe('lysnrai'); + expect(body.jobs.queueDepth).toBe(1); + expect(body.factories.total).toBe(0); + // A queued job with no factory should raise the no_live_capacity alert. + expect(body.alerts.some((a: { code: string }) => a.code === 'no_live_capacity')).toBe(true); + }); + it('POST /fleet/claim returns claimed:false when nothing is eligible', async () => { const app = await buildApp(); const res = await app.inject({ diff --git a/services/platform-service/src/modules/fleet/routes.ts b/services/platform-service/src/modules/fleet/routes.ts index 1515743f..9bb71d9a 100644 --- a/services/platform-service/src/modules/fleet/routes.ts +++ b/services/platform-service/src/modules/fleet/routes.ts @@ -12,6 +12,7 @@ * GET /fleet/jobs/:id/runs job run history * GET /fleet/jobs/:id/events append-only event stream * GET /fleet/jobs/:id/events/stream live event stream (SSE, resumable) + * GET /fleet/metrics fleet metrics + alerts (queue depth, utilization) * POST /fleet/jobs/:id/artifacts upload a run output (base64 body → blob + pointer) * GET /fleet/jobs/:id/artifacts list a job's artifact pointers * GET /fleet/artifacts/:artifactId pointer + fresh short-lived SAS download URL @@ -324,6 +325,13 @@ export async function fleetRoutes(app: FastifyInstance) { return explain; }); + // ── Fleet metrics + alerting (§17) ── + app.get('/fleet/metrics', async req => { + await extractAuth(req); + const pid = getRequestProductId(req); + return coordinator.fleetMetrics(pid); + }); + // ── Artifacts: upload (base64 body → blob + pointer) ── app.post('/fleet/jobs/:id/artifacts', async (req, reply) => { await extractAuth(req);