diff --git a/.env.example b/.env.example index 55ed2807..f0f83cdb 100644 --- a/.env.example +++ b/.env.example @@ -118,3 +118,8 @@ FLEET_AUTOSCALE_SCALE_OUT_PCT=85 FLEET_AUTOSCALE_SCALE_IN_PCT=20 FLEET_AUTOSCALE_MAX_STEP=5 FLEET_AUTOSCALE_MIN_SEATS=0 +# Anti-flap cooldown (seconds): the /fleet/autoscale endpoints suppress a +# direction reversal (scale_in after scale_out, or vice versa) within this +# window so a consumer cannot thrash capacity. A critical scale-out (queued work +# with zero usable capacity) always bypasses the cooldown. Default 300. +FLEET_AUTOSCALE_COOLDOWN_SEC=300 diff --git a/services/monitoring/grafana/dashboards/fleet-overview.json b/services/monitoring/grafana/dashboards/fleet-overview.json index b0f577ee..8f8e5eab 100644 --- a/services/monitoring/grafana/dashboards/fleet-overview.json +++ b/services/monitoring/grafana/dashboards/fleet-overview.json @@ -9,7 +9,7 @@ "tags": ["fleet", "gigafactory"], "uid": "fleet-overview", "schemaVersion": 38, - "version": 1, + "version": 2, "refresh": "30s", "time": { "from": "now-6h", "to": "now" }, "timepicker": { "refresh_intervals": ["15s", "30s", "1m", "5m", "15m", "1h"] }, @@ -179,6 +179,56 @@ "legendFormat": "stale" } ] + }, + { + "title": "Autoscale: products recommending scale-out", + "type": "stat", + "datasource": { "type": "prometheus", "uid": "prometheus" }, + "gridPos": { "h": 4, "w": 6, "x": 0, "y": 20 }, + "fieldConfig": { + "defaults": { + "thresholds": { + "mode": "absolute", + "steps": [ + { "color": "green", "value": null }, + { "color": "yellow", "value": 1 } + ] + } + } + }, + "options": { + "colorMode": "value", + "graphMode": "none", + "reduceOptions": { "calcs": ["lastNotNull"] } + }, + "targets": [ + { + "refId": "A", + "expr": "sum(fleet_autoscale_action{product=~\"$product\",action=\"scale_out\"}) or vector(0)" + } + ] + }, + { + "title": "Autoscale: recommended seat delta", + "type": "timeseries", + "description": "Recommended change in factory seats per product (positive = scale out, negative = scale in). Raw signal, pre-hysteresis.", + "datasource": { "type": "prometheus", "uid": "prometheus" }, + "gridPos": { "h": 8, "w": 18, "x": 6, "y": 20 }, + "fieldConfig": { + "defaults": { "custom": { "drawStyle": "line", "fillOpacity": 10 } } + }, + "targets": [ + { + "refId": "A", + "expr": "fleet_autoscale_delta{product=~\"$product\"}", + "legendFormat": "{{product}} delta" + }, + { + "refId": "B", + "expr": "fleet_autoscale_pressure{product=~\"$product\"}", + "legendFormat": "{{product}} backlog" + } + ] } ] } diff --git a/services/platform-service/src/modules/fleet/autoscaler.test.ts b/services/platform-service/src/modules/fleet/autoscaler.test.ts index 71bed8f6..19a8f3c4 100644 --- a/services/platform-service/src/modules/fleet/autoscaler.test.ts +++ b/services/platform-service/src/modules/fleet/autoscaler.test.ts @@ -8,8 +8,15 @@ import { MemoryDatastoreProvider } from '@bytelyst/datastore'; import { _resetDatastoreProvider, setProvider } from '../../lib/datastore.js'; import { recommendScaling, + recommendScalingStateful, + applyHysteresis, scalingThresholdsFromEnv, + cooldownMsFromEnv, + _resetScaleStates, DEFAULT_SCALING_THRESHOLDS, + DEFAULT_COOLDOWN_MS, + type ScalingRecommendation, + type ScaleState, } from './autoscaler.js'; import type { FleetMetrics, FleetAlert } from './coordinator.js'; @@ -190,6 +197,151 @@ describe('scalingThresholdsFromEnv', () => { }); }); +function rec(over: Partial = {}): ScalingRecommendation { + return { + productId: 'lysnrai', + action: 'scale_out', + recommendedSeats: 5, + delta: 1, + reason: 'x', + urgency: 'warning', + signals: { + queueDepth: 1, + blocked: 0, + active: 0, + seatsUsed: 4, + seatsTotal: 4, + utilizationPct: 100, + pressure: 1, + alertCodes: [], + }, + generatedAt: '2026-06-01T00:00:00.000Z', + ...over, + }; +} + +describe('applyHysteresis', () => { + const cfg = { cooldownMs: 1000 }; + + it('emits the first action and anchors the cooldown', () => { + const { recommendation, nextState } = applyHysteresis( + rec({ action: 'scale_out' }), + undefined, + 100, + cfg + ); + expect(recommendation.action).toBe('scale_out'); + expect(nextState).toEqual({ action: 'scale_out', at: 100 }); + }); + + it('suppresses a direction reversal within the cooldown (scale_in after scale_out)', () => { + const prior: ScaleState = { action: 'scale_out', at: 0 }; + const { recommendation, nextState } = applyHysteresis( + rec({ action: 'scale_in', urgency: 'none', delta: -3 }), + prior, + 500, + cfg + ); + expect(recommendation.action).toBe('hold'); + expect(recommendation.suppressed).toBe(true); + expect(recommendation.delta).toBe(0); + expect(recommendation.cooldownRemainingMs).toBe(500); + // The anchor is preserved so the cooldown keeps elapsing. + expect(nextState).toBe(prior); + }); + + it('allows the reversal once the cooldown has elapsed', () => { + const prior: ScaleState = { action: 'scale_out', at: 0 }; + const { recommendation, nextState } = applyHysteresis( + rec({ action: 'scale_in', urgency: 'none' }), + prior, + 1500, + cfg + ); + expect(recommendation.action).toBe('scale_in'); + expect(nextState).toEqual({ action: 'scale_in', at: 1500 }); + }); + + it('NEVER debounces a critical scale-out (capacity emergency bypasses cooldown)', () => { + const prior: ScaleState = { action: 'scale_in', at: 0 }; + const { recommendation } = applyHysteresis( + rec({ action: 'scale_out', urgency: 'critical' }), + prior, + 100, + cfg + ); + expect(recommendation.action).toBe('scale_out'); + expect(recommendation.suppressed).toBeUndefined(); + }); + + it('a hold never anchors the cooldown nor gets suppressed', () => { + const prior: ScaleState = { action: 'scale_out', at: 0 }; + const { recommendation, nextState } = applyHysteresis(rec({ action: 'hold' }), prior, 100, cfg); + expect(recommendation.action).toBe('hold'); + expect(nextState).toBe(prior); + }); + + it('does not suppress a same-direction repeat (keeps scaling out while saturated)', () => { + const prior: ScaleState = { action: 'scale_out', at: 0 }; + const { recommendation, nextState } = applyHysteresis( + rec({ action: 'scale_out' }), + prior, + 200, + cfg + ); + expect(recommendation.action).toBe('scale_out'); + expect(nextState).toEqual({ action: 'scale_out', at: 200 }); + }); +}); + +describe('recommendScalingStateful (in-memory cooldown)', () => { + afterEach(() => _resetScaleStates()); + + const saturated = () => + metrics({ + jobs: { queueDepth: 3, total: 3 }, + factories: { seatsUsed: 4, seatsTotal: 4, utilizationPct: 100 }, + alerts: [alert('saturated')], + }); + const idle = () => + metrics({ + jobs: { queueDepth: 0, active: 0, total: 0 }, + factories: { seatsUsed: 0, seatsTotal: 6, utilizationPct: 0 }, + }); + + it('debounces a scale_out → scale_in flip, then allows it after the cooldown', () => { + const opts = { cooldownMs: 10_000 }; + const out = recommendScalingStateful(saturated(), {}, { nowMs: 0, ...opts }); + expect(out.action).toBe('scale_out'); + + const flip = recommendScalingStateful(idle(), {}, { nowMs: 1_000, ...opts }); + expect(flip.action).toBe('hold'); + expect(flip.suppressed).toBe(true); + + const later = recommendScalingStateful(idle(), {}, { nowMs: 20_000, ...opts }); + expect(later.action).toBe('scale_in'); + }); + + it('isolates cooldown state per product', () => { + recommendScalingStateful(saturated(), {}, { nowMs: 0, cooldownMs: 10_000 }); + const other = recommendScalingStateful( + { ...idle(), productId: 'flowmonk' }, + {}, + { nowMs: 1_000, cooldownMs: 10_000 } + ); + expect(other.action).toBe('scale_in'); // different product → no cooldown carryover + }); +}); + +describe('cooldownMsFromEnv', () => { + afterEach(() => delete process.env.FLEET_AUTOSCALE_COOLDOWN_SEC); + it('defaults when unset and reads seconds when configured', () => { + expect(cooldownMsFromEnv()).toBe(DEFAULT_COOLDOWN_MS); + process.env.FLEET_AUTOSCALE_COOLDOWN_SEC = '60'; + expect(cooldownMsFromEnv()).toBe(60_000); + }); +}); + async function buildApp(): Promise { const { fleetRoutes } = await import('./routes.js'); const app = Fastify({ logger: false }); diff --git a/services/platform-service/src/modules/fleet/autoscaler.ts b/services/platform-service/src/modules/fleet/autoscaler.ts index f8c3190c..3906b911 100644 --- a/services/platform-service/src/modules/fleet/autoscaler.ts +++ b/services/platform-service/src/modules/fleet/autoscaler.ts @@ -60,6 +60,10 @@ export interface ScalingRecommendation { reason: string; /** Urgency: `critical` when work is queued with zero usable capacity. */ urgency: 'critical' | 'warning' | 'none'; + /** Set when anti-flap hysteresis held back an otherwise-actionable signal. */ + suppressed?: boolean; + /** Milliseconds remaining on the cooldown when `suppressed` is true. */ + cooldownRemainingMs?: number; signals: ScalingSignals; generatedAt: string; } @@ -218,3 +222,102 @@ export function recommendScaling( return hold('Capacity is balanced against current demand.'); } + +// ── Anti-flap hysteresis (§5) ───────────────────────────────────────────────── +// +// The raw recommender reacts to the instantaneous snapshot, which can oscillate +// (scale_out on a queue blip, then scale_in the moment it drains). Hysteresis +// suppresses a *direction reversal* within a cooldown window so a consumer that +// acts on the signal cannot thrash capacity. A CRITICAL scale-out (queued work, +// zero usable capacity) is never debounced — that is a genuine emergency. + +/** Last emitted scale decision for a product (the cooldown anchor). */ +export interface ScaleState { + action: ScalingAction; + at: number; +} + +/** Default cooldown between opposite-direction scale actions (5 minutes). */ +export const DEFAULT_COOLDOWN_MS = 300_000; + +/** + * Apply anti-flap hysteresis to a raw recommendation. PURE: given the raw + * recommendation, the product's prior emitted state, the current time, and the + * cooldown, returns the (possibly suppressed) recommendation plus the next state + * to persist. The cooldown anchor only advances when an action is actually + * emitted, so a suppressed signal keeps counting down from the real last action. + */ +export function applyHysteresis( + raw: ScalingRecommendation, + prior: ScaleState | undefined, + nowMs: number, + opts: { cooldownMs: number } +): { recommendation: ScalingRecommendation; nextState: ScaleState | undefined } { + // Holds never anchor the cooldown and never need suppressing. + if (raw.action === 'hold') return { recommendation: raw, nextState: prior }; + + // A capacity emergency always wins — never debounce a critical scale-out. + const isCriticalScaleOut = raw.action === 'scale_out' && raw.urgency === 'critical'; + + const elapsed = prior ? nowMs - prior.at : Number.POSITIVE_INFINITY; + const isReversal = !!prior && prior.action !== 'hold' && prior.action !== raw.action; + + if (isReversal && elapsed < opts.cooldownMs && !isCriticalScaleOut) { + const remaining = opts.cooldownMs - elapsed; + const suppressed: ScalingRecommendation = { + ...raw, + action: 'hold', + recommendedSeats: raw.signals.seatsTotal, + delta: 0, + urgency: 'none', + suppressed: true, + cooldownRemainingMs: remaining, + reason: `Hysteresis: holding '${raw.action}' for ${Math.ceil(remaining / 1000)}s after a recent '${prior!.action}' (anti-flap cooldown).`, + }; + // Keep the original anchor so the cooldown keeps elapsing. + return { recommendation: suppressed, nextState: prior }; + } + + // Emitted (allowed) — advance the cooldown anchor to now. + return { recommendation: raw, nextState: { action: raw.action, at: nowMs } }; +} + +/** Process-wide per-product cooldown anchors (mirrors reaper/breaker in-mem state). */ +const scaleStates = new Map(); + +/** Test seam: clear all hysteresis state. */ +export function _resetScaleStates(): void { + scaleStates.clear(); +} + +/** Snapshot of the current per-product hysteresis anchors (observability/tests). */ +export function getScaleStates(): Record { + return Object.fromEntries(scaleStates); +} + +/** Resolve the cooldown from `FLEET_AUTOSCALE_COOLDOWN_SEC` (default 5m). */ +export function cooldownMsFromEnv(): number { + const sec = envNum('FLEET_AUTOSCALE_COOLDOWN_SEC'); + return sec !== undefined ? sec * 1000 : DEFAULT_COOLDOWN_MS; +} + +/** + * Stateful recommendation: computes the raw signal then applies anti-flap + * hysteresis against this process's per-product cooldown state. This is what the + * actionable `/fleet/autoscale[/all]` endpoints serve. The Prometheus exposition + * deliberately uses the RAW recommender instead, so a scrape never mutates the + * cooldown anchors. + */ +export function recommendScalingStateful( + metrics: FleetMetrics, + thresholds: Partial = {}, + opts?: { nowMs?: number; cooldownMs?: number } +): ScalingRecommendation { + const raw = recommendScaling(metrics, thresholds); + const nowMs = opts?.nowMs ?? Date.now(); + const cooldownMs = opts?.cooldownMs ?? cooldownMsFromEnv(); + const prior = scaleStates.get(metrics.productId); + const { recommendation, nextState } = applyHysteresis(raw, prior, nowMs, { cooldownMs }); + if (nextState) scaleStates.set(metrics.productId, nextState); + return recommendation; +} diff --git a/services/platform-service/src/modules/fleet/prometheus.test.ts b/services/platform-service/src/modules/fleet/prometheus.test.ts index e390874f..055cbc2b 100644 --- a/services/platform-service/src/modules/fleet/prometheus.test.ts +++ b/services/platform-service/src/modules/fleet/prometheus.test.ts @@ -144,6 +144,44 @@ describe('renderFleetMetricsProm', () => { expect(out).toContain('fleet_engine_breaker_open_count 1'); }); + it('emits autoscale series only when a recommendation is attached', () => { + const out = renderFleetMetricsProm({ + ...base, + products: [ + { + productId: 'lysnrai', + metrics: metrics(), + recommendation: { + productId: 'lysnrai', + action: 'scale_out', + recommendedSeats: 6, + delta: 2, + reason: 'x', + urgency: 'warning', + signals: { + queueDepth: 2, + blocked: 0, + active: 0, + seatsUsed: 4, + seatsTotal: 4, + utilizationPct: 100, + pressure: 2, + alertCodes: ['saturated'], + }, + generatedAt: '2026-06-01T00:00:00.000Z', + }, + }, + ], + }); + expect(out).toContain('fleet_autoscale_recommended_seats{product="lysnrai"} 6'); + expect(out).toContain('fleet_autoscale_delta{product="lysnrai"} 2'); + expect(out).toContain('fleet_autoscale_pressure{product="lysnrai"} 2'); + expect(out).toContain('fleet_autoscale_action{product="lysnrai",action="scale_out"} 1'); + expect(out).toContain('fleet_autoscale_action{product="lysnrai",action="scale_in"} 0'); + // no recommendation attached ⇒ no autoscale series + expect(renderFleetMetricsProm(base)).not.toContain('fleet_autoscale_'); + }); + it('escapes special characters in label values', () => { const out = renderFleetMetricsProm({ ...base, diff --git a/services/platform-service/src/modules/fleet/prometheus.ts b/services/platform-service/src/modules/fleet/prometheus.ts index e9fe7f5f..1423971a 100644 --- a/services/platform-service/src/modules/fleet/prometheus.ts +++ b/services/platform-service/src/modules/fleet/prometheus.ts @@ -11,11 +11,17 @@ import type { FleetMetrics } from './coordinator.js'; import type { ReaperStats } from './reaper.js'; import type { EngineBreakerEntry } from './engine-breaker.js'; +import type { ScalingAction, ScalingRecommendation } from './autoscaler.js'; + +/** Scale actions emitted as the `action` label on `fleet_autoscale_action`. */ +const SCALING_ACTIONS: readonly ScalingAction[] = ['scale_out', 'scale_in', 'hold']; /** One product's computed metrics, paired with its id for labelling. */ export interface ProductMetrics { productId: string; metrics: FleetMetrics; + /** Optional raw autoscale recommendation — emitted as `fleet_autoscale_*`. */ + recommendation?: ScalingRecommendation; } export interface FleetPromInput { @@ -52,7 +58,7 @@ export function renderFleetMetricsProm(input: FleetPromInput): string { const def = (name: string, type: Metric['type'], help: string): Metric => (m[name] ??= { name, help, type, rows: [] }); - for (const { productId, metrics } of input.products) { + for (const { productId, metrics, recommendation } of input.products) { const p: [string, string][] = [['product', productId]]; def('fleet_jobs_total', 'gauge', 'Total jobs for the product').rows.push( line('fleet_jobs_total', p, metrics.jobs.total) @@ -131,6 +137,41 @@ export function renderFleetMetricsProm(input: FleetPromInput): string { ); } } + + // ── Capacity autoscaling signal (§5) — the RAW recommendation ── + if (recommendation) { + def( + 'fleet_autoscale_recommended_seats', + 'gauge', + 'Recommended total factory seats for the product' + ).rows.push(line('fleet_autoscale_recommended_seats', p, recommendation.recommendedSeats)); + def( + 'fleet_autoscale_delta', + 'gauge', + 'Recommended seat change (positive = scale out, negative = scale in)' + ).rows.push(line('fleet_autoscale_delta', p, recommendation.delta)); + def('fleet_autoscale_pressure', 'gauge', 'Runnable backlog driving scale-out').rows.push( + line('fleet_autoscale_pressure', p, recommendation.signals.pressure) + ); + // One-hot the current action so a dashboard can count e.g. scale_out products. + const action = def( + 'fleet_autoscale_action', + 'gauge', + 'Current scale recommendation (1 = active) labelled by action' + ); + for (const a of SCALING_ACTIONS) { + action.rows.push( + line( + 'fleet_autoscale_action', + [ + ['product', productId], + ['action', a], + ], + recommendation.action === a ? 1 : 0 + ) + ); + } + } } // ── Process-wide reaper / GC counters ── diff --git a/services/platform-service/src/modules/fleet/routes.ts b/services/platform-service/src/modules/fleet/routes.ts index 063141c5..efaae547 100644 --- a/services/platform-service/src/modules/fleet/routes.ts +++ b/services/platform-service/src/modules/fleet/routes.ts @@ -39,7 +39,11 @@ import * as trackerBridge from './tracker-bridge.js'; import { getReaperStats } from './reaper.js'; import { getEngineBreakerSnapshot } from './engine-breaker.js'; import { renderFleetMetricsProm, scrapeTokenMatches } from './prometheus.js'; -import { recommendScaling, scalingThresholdsFromEnv } from './autoscaler.js'; +import { + recommendScaling, + recommendScalingStateful, + scalingThresholdsFromEnv, +} from './autoscaler.js'; import { CORRELATION_HEADER, resolveCorrelationId } from './trace.js'; import { getAllProducts } from '../products/cache.js'; import { @@ -500,11 +504,18 @@ export async function fleetRoutes(app: FastifyInstance) { } } const products = getAllProducts(); + const thresholds = scalingThresholdsFromEnv(); const perProduct = await Promise.all( - products.map(async p => ({ - productId: p.productId, - metrics: await coordinator.fleetMetrics(p.productId), - })) + products.map(async p => { + const metrics = await coordinator.fleetMetrics(p.productId); + // RAW recommendation here (not the stateful one): a scrape must never + // mutate the hysteresis cooldown anchors that the /autoscale endpoints own. + return { + productId: p.productId, + metrics, + recommendation: recommendScaling(metrics, thresholds), + }; + }) ); const body = renderFleetMetricsProm({ products: perProduct, @@ -525,7 +536,9 @@ export async function fleetRoutes(app: FastifyInstance) { await extractAuth(req); const pid = await requireProductAccess(req); const metrics = await coordinator.fleetMetrics(pid); - return recommendScaling(metrics, scalingThresholdsFromEnv()); + // Stateful: anti-flap hysteresis debounces direction reversals so a consumer + // acting on this signal cannot thrash capacity (critical scale-out bypasses). + return recommendScalingStateful(metrics, scalingThresholdsFromEnv()); }); // ── Capacity autoscaling signal — GLOBAL (all products) ── @@ -541,7 +554,7 @@ export async function fleetRoutes(app: FastifyInstance) { const thresholds = scalingThresholdsFromEnv(); const recommendations = await Promise.all( getAllProducts().map(async p => - recommendScaling(await coordinator.fleetMetrics(p.productId), thresholds) + recommendScalingStateful(await coordinator.fleetMetrics(p.productId), thresholds) ) ); return {