feat(fleet): anti-flap hysteresis + autoscale Prometheus series & dashboard (ops #5)

Make the capacity autoscaling signal safe to act on automatically and observable
in Grafana.

Anti-flap hysteresis:
- New pure applyHysteresis: suppresses a direction reversal (scale_in after
  scale_out, or vice versa) within a cooldown window so a consumer cannot thrash
  capacity. A critical scale-out (queued work, zero usable capacity) always
  bypasses the cooldown. Cooldown anchor only advances on an emitted action, so a
  suppressed signal keeps counting down from the real last action.
- Process-wide per-product cooldown state (mirrors reaper/breaker in-mem state)
  with a test seam; cooldown tunable via FLEET_AUTOSCALE_COOLDOWN_SEC (default 300).
- GET /fleet/autoscale[/all] now serve the debounced (stateful) recommendation.

Observability:
- Prometheus exposition emits the RAW recommendation per product
  (fleet_autoscale_recommended_seats/delta/pressure + one-hot fleet_autoscale_action
  {action}). RAW (not stateful) so a scrape never mutates the cooldown anchors.
- Grafana "Fleet Overview" gains two panels: products recommending scale-out
  (stat) + recommended seat delta vs backlog (timeseries).

Docs: FLEET_AUTOSCALE_COOLDOWN_SEC in .env.example.

Tests: +10 (hysteresis/stateful/cooldown + prom autoscale series); full suite 1856
green; lint + tsc clean. Verified live: a throwaway Prometheus scraped the running
service and the dashboard PromQL returned real scale-out/scale-in recommendations
across products.

Generated with [Devin](https://cli.devin.ai/docs)

Co-Authored-By: Devin <158243242+devin-ai-integration[bot]@users.noreply.github.com>
This commit is contained in:
saravanakumardb1 2026-06-01 23:02:08 -07:00
parent 321cfe7546
commit c63736459b
7 changed files with 411 additions and 9 deletions

View File

@ -118,3 +118,8 @@ FLEET_AUTOSCALE_SCALE_OUT_PCT=85
FLEET_AUTOSCALE_SCALE_IN_PCT=20
FLEET_AUTOSCALE_MAX_STEP=5
FLEET_AUTOSCALE_MIN_SEATS=0
# Anti-flap cooldown (seconds): the /fleet/autoscale endpoints suppress a
# direction reversal (scale_in after scale_out, or vice versa) within this
# window so a consumer cannot thrash capacity. A critical scale-out (queued work
# with zero usable capacity) always bypasses the cooldown. Default 300.
FLEET_AUTOSCALE_COOLDOWN_SEC=300

View File

@ -9,7 +9,7 @@
"tags": ["fleet", "gigafactory"],
"uid": "fleet-overview",
"schemaVersion": 38,
"version": 1,
"version": 2,
"refresh": "30s",
"time": { "from": "now-6h", "to": "now" },
"timepicker": { "refresh_intervals": ["15s", "30s", "1m", "5m", "15m", "1h"] },
@ -179,6 +179,56 @@
"legendFormat": "stale"
}
]
},
{
"title": "Autoscale: products recommending scale-out",
"type": "stat",
"datasource": { "type": "prometheus", "uid": "prometheus" },
"gridPos": { "h": 4, "w": 6, "x": 0, "y": 20 },
"fieldConfig": {
"defaults": {
"thresholds": {
"mode": "absolute",
"steps": [
{ "color": "green", "value": null },
{ "color": "yellow", "value": 1 }
]
}
}
},
"options": {
"colorMode": "value",
"graphMode": "none",
"reduceOptions": { "calcs": ["lastNotNull"] }
},
"targets": [
{
"refId": "A",
"expr": "sum(fleet_autoscale_action{product=~\"$product\",action=\"scale_out\"}) or vector(0)"
}
]
},
{
"title": "Autoscale: recommended seat delta",
"type": "timeseries",
"description": "Recommended change in factory seats per product (positive = scale out, negative = scale in). Raw signal, pre-hysteresis.",
"datasource": { "type": "prometheus", "uid": "prometheus" },
"gridPos": { "h": 8, "w": 18, "x": 6, "y": 20 },
"fieldConfig": {
"defaults": { "custom": { "drawStyle": "line", "fillOpacity": 10 } }
},
"targets": [
{
"refId": "A",
"expr": "fleet_autoscale_delta{product=~\"$product\"}",
"legendFormat": "{{product}} delta"
},
{
"refId": "B",
"expr": "fleet_autoscale_pressure{product=~\"$product\"}",
"legendFormat": "{{product}} backlog"
}
]
}
]
}

View File

@ -8,8 +8,15 @@ import { MemoryDatastoreProvider } from '@bytelyst/datastore';
import { _resetDatastoreProvider, setProvider } from '../../lib/datastore.js';
import {
recommendScaling,
recommendScalingStateful,
applyHysteresis,
scalingThresholdsFromEnv,
cooldownMsFromEnv,
_resetScaleStates,
DEFAULT_SCALING_THRESHOLDS,
DEFAULT_COOLDOWN_MS,
type ScalingRecommendation,
type ScaleState,
} from './autoscaler.js';
import type { FleetMetrics, FleetAlert } from './coordinator.js';
@ -190,6 +197,151 @@ describe('scalingThresholdsFromEnv', () => {
});
});
function rec(over: Partial<ScalingRecommendation> = {}): ScalingRecommendation {
return {
productId: 'lysnrai',
action: 'scale_out',
recommendedSeats: 5,
delta: 1,
reason: 'x',
urgency: 'warning',
signals: {
queueDepth: 1,
blocked: 0,
active: 0,
seatsUsed: 4,
seatsTotal: 4,
utilizationPct: 100,
pressure: 1,
alertCodes: [],
},
generatedAt: '2026-06-01T00:00:00.000Z',
...over,
};
}
describe('applyHysteresis', () => {
const cfg = { cooldownMs: 1000 };
it('emits the first action and anchors the cooldown', () => {
const { recommendation, nextState } = applyHysteresis(
rec({ action: 'scale_out' }),
undefined,
100,
cfg
);
expect(recommendation.action).toBe('scale_out');
expect(nextState).toEqual({ action: 'scale_out', at: 100 });
});
it('suppresses a direction reversal within the cooldown (scale_in after scale_out)', () => {
const prior: ScaleState = { action: 'scale_out', at: 0 };
const { recommendation, nextState } = applyHysteresis(
rec({ action: 'scale_in', urgency: 'none', delta: -3 }),
prior,
500,
cfg
);
expect(recommendation.action).toBe('hold');
expect(recommendation.suppressed).toBe(true);
expect(recommendation.delta).toBe(0);
expect(recommendation.cooldownRemainingMs).toBe(500);
// The anchor is preserved so the cooldown keeps elapsing.
expect(nextState).toBe(prior);
});
it('allows the reversal once the cooldown has elapsed', () => {
const prior: ScaleState = { action: 'scale_out', at: 0 };
const { recommendation, nextState } = applyHysteresis(
rec({ action: 'scale_in', urgency: 'none' }),
prior,
1500,
cfg
);
expect(recommendation.action).toBe('scale_in');
expect(nextState).toEqual({ action: 'scale_in', at: 1500 });
});
it('NEVER debounces a critical scale-out (capacity emergency bypasses cooldown)', () => {
const prior: ScaleState = { action: 'scale_in', at: 0 };
const { recommendation } = applyHysteresis(
rec({ action: 'scale_out', urgency: 'critical' }),
prior,
100,
cfg
);
expect(recommendation.action).toBe('scale_out');
expect(recommendation.suppressed).toBeUndefined();
});
it('a hold never anchors the cooldown nor gets suppressed', () => {
const prior: ScaleState = { action: 'scale_out', at: 0 };
const { recommendation, nextState } = applyHysteresis(rec({ action: 'hold' }), prior, 100, cfg);
expect(recommendation.action).toBe('hold');
expect(nextState).toBe(prior);
});
it('does not suppress a same-direction repeat (keeps scaling out while saturated)', () => {
const prior: ScaleState = { action: 'scale_out', at: 0 };
const { recommendation, nextState } = applyHysteresis(
rec({ action: 'scale_out' }),
prior,
200,
cfg
);
expect(recommendation.action).toBe('scale_out');
expect(nextState).toEqual({ action: 'scale_out', at: 200 });
});
});
describe('recommendScalingStateful (in-memory cooldown)', () => {
afterEach(() => _resetScaleStates());
const saturated = () =>
metrics({
jobs: { queueDepth: 3, total: 3 },
factories: { seatsUsed: 4, seatsTotal: 4, utilizationPct: 100 },
alerts: [alert('saturated')],
});
const idle = () =>
metrics({
jobs: { queueDepth: 0, active: 0, total: 0 },
factories: { seatsUsed: 0, seatsTotal: 6, utilizationPct: 0 },
});
it('debounces a scale_out → scale_in flip, then allows it after the cooldown', () => {
const opts = { cooldownMs: 10_000 };
const out = recommendScalingStateful(saturated(), {}, { nowMs: 0, ...opts });
expect(out.action).toBe('scale_out');
const flip = recommendScalingStateful(idle(), {}, { nowMs: 1_000, ...opts });
expect(flip.action).toBe('hold');
expect(flip.suppressed).toBe(true);
const later = recommendScalingStateful(idle(), {}, { nowMs: 20_000, ...opts });
expect(later.action).toBe('scale_in');
});
it('isolates cooldown state per product', () => {
recommendScalingStateful(saturated(), {}, { nowMs: 0, cooldownMs: 10_000 });
const other = recommendScalingStateful(
{ ...idle(), productId: 'flowmonk' },
{},
{ nowMs: 1_000, cooldownMs: 10_000 }
);
expect(other.action).toBe('scale_in'); // different product → no cooldown carryover
});
});
describe('cooldownMsFromEnv', () => {
afterEach(() => delete process.env.FLEET_AUTOSCALE_COOLDOWN_SEC);
it('defaults when unset and reads seconds when configured', () => {
expect(cooldownMsFromEnv()).toBe(DEFAULT_COOLDOWN_MS);
process.env.FLEET_AUTOSCALE_COOLDOWN_SEC = '60';
expect(cooldownMsFromEnv()).toBe(60_000);
});
});
async function buildApp(): Promise<FastifyInstance> {
const { fleetRoutes } = await import('./routes.js');
const app = Fastify({ logger: false });

View File

@ -60,6 +60,10 @@ export interface ScalingRecommendation {
reason: string;
/** Urgency: `critical` when work is queued with zero usable capacity. */
urgency: 'critical' | 'warning' | 'none';
/** Set when anti-flap hysteresis held back an otherwise-actionable signal. */
suppressed?: boolean;
/** Milliseconds remaining on the cooldown when `suppressed` is true. */
cooldownRemainingMs?: number;
signals: ScalingSignals;
generatedAt: string;
}
@ -218,3 +222,102 @@ export function recommendScaling(
return hold('Capacity is balanced against current demand.');
}
// ── Anti-flap hysteresis (§5) ─────────────────────────────────────────────────
//
// The raw recommender reacts to the instantaneous snapshot, which can oscillate
// (scale_out on a queue blip, then scale_in the moment it drains). Hysteresis
// suppresses a *direction reversal* within a cooldown window so a consumer that
// acts on the signal cannot thrash capacity. A CRITICAL scale-out (queued work,
// zero usable capacity) is never debounced — that is a genuine emergency.
/** Last emitted scale decision for a product (the cooldown anchor). */
export interface ScaleState {
action: ScalingAction;
at: number;
}
/** Default cooldown between opposite-direction scale actions (5 minutes). */
export const DEFAULT_COOLDOWN_MS = 300_000;
/**
* Apply anti-flap hysteresis to a raw recommendation. PURE: given the raw
* recommendation, the product's prior emitted state, the current time, and the
* cooldown, returns the (possibly suppressed) recommendation plus the next state
* to persist. The cooldown anchor only advances when an action is actually
* emitted, so a suppressed signal keeps counting down from the real last action.
*/
export function applyHysteresis(
raw: ScalingRecommendation,
prior: ScaleState | undefined,
nowMs: number,
opts: { cooldownMs: number }
): { recommendation: ScalingRecommendation; nextState: ScaleState | undefined } {
// Holds never anchor the cooldown and never need suppressing.
if (raw.action === 'hold') return { recommendation: raw, nextState: prior };
// A capacity emergency always wins — never debounce a critical scale-out.
const isCriticalScaleOut = raw.action === 'scale_out' && raw.urgency === 'critical';
const elapsed = prior ? nowMs - prior.at : Number.POSITIVE_INFINITY;
const isReversal = !!prior && prior.action !== 'hold' && prior.action !== raw.action;
if (isReversal && elapsed < opts.cooldownMs && !isCriticalScaleOut) {
const remaining = opts.cooldownMs - elapsed;
const suppressed: ScalingRecommendation = {
...raw,
action: 'hold',
recommendedSeats: raw.signals.seatsTotal,
delta: 0,
urgency: 'none',
suppressed: true,
cooldownRemainingMs: remaining,
reason: `Hysteresis: holding '${raw.action}' for ${Math.ceil(remaining / 1000)}s after a recent '${prior!.action}' (anti-flap cooldown).`,
};
// Keep the original anchor so the cooldown keeps elapsing.
return { recommendation: suppressed, nextState: prior };
}
// Emitted (allowed) — advance the cooldown anchor to now.
return { recommendation: raw, nextState: { action: raw.action, at: nowMs } };
}
/** Process-wide per-product cooldown anchors (mirrors reaper/breaker in-mem state). */
const scaleStates = new Map<string, ScaleState>();
/** Test seam: clear all hysteresis state. */
export function _resetScaleStates(): void {
scaleStates.clear();
}
/** Snapshot of the current per-product hysteresis anchors (observability/tests). */
export function getScaleStates(): Record<string, ScaleState> {
return Object.fromEntries(scaleStates);
}
/** Resolve the cooldown from `FLEET_AUTOSCALE_COOLDOWN_SEC` (default 5m). */
export function cooldownMsFromEnv(): number {
const sec = envNum('FLEET_AUTOSCALE_COOLDOWN_SEC');
return sec !== undefined ? sec * 1000 : DEFAULT_COOLDOWN_MS;
}
/**
* Stateful recommendation: computes the raw signal then applies anti-flap
* hysteresis against this process's per-product cooldown state. This is what the
* actionable `/fleet/autoscale[/all]` endpoints serve. The Prometheus exposition
* deliberately uses the RAW recommender instead, so a scrape never mutates the
* cooldown anchors.
*/
export function recommendScalingStateful(
metrics: FleetMetrics,
thresholds: Partial<ScalingThresholds> = {},
opts?: { nowMs?: number; cooldownMs?: number }
): ScalingRecommendation {
const raw = recommendScaling(metrics, thresholds);
const nowMs = opts?.nowMs ?? Date.now();
const cooldownMs = opts?.cooldownMs ?? cooldownMsFromEnv();
const prior = scaleStates.get(metrics.productId);
const { recommendation, nextState } = applyHysteresis(raw, prior, nowMs, { cooldownMs });
if (nextState) scaleStates.set(metrics.productId, nextState);
return recommendation;
}

View File

@ -144,6 +144,44 @@ describe('renderFleetMetricsProm', () => {
expect(out).toContain('fleet_engine_breaker_open_count 1');
});
it('emits autoscale series only when a recommendation is attached', () => {
const out = renderFleetMetricsProm({
...base,
products: [
{
productId: 'lysnrai',
metrics: metrics(),
recommendation: {
productId: 'lysnrai',
action: 'scale_out',
recommendedSeats: 6,
delta: 2,
reason: 'x',
urgency: 'warning',
signals: {
queueDepth: 2,
blocked: 0,
active: 0,
seatsUsed: 4,
seatsTotal: 4,
utilizationPct: 100,
pressure: 2,
alertCodes: ['saturated'],
},
generatedAt: '2026-06-01T00:00:00.000Z',
},
},
],
});
expect(out).toContain('fleet_autoscale_recommended_seats{product="lysnrai"} 6');
expect(out).toContain('fleet_autoscale_delta{product="lysnrai"} 2');
expect(out).toContain('fleet_autoscale_pressure{product="lysnrai"} 2');
expect(out).toContain('fleet_autoscale_action{product="lysnrai",action="scale_out"} 1');
expect(out).toContain('fleet_autoscale_action{product="lysnrai",action="scale_in"} 0');
// no recommendation attached ⇒ no autoscale series
expect(renderFleetMetricsProm(base)).not.toContain('fleet_autoscale_');
});
it('escapes special characters in label values', () => {
const out = renderFleetMetricsProm({
...base,

View File

@ -11,11 +11,17 @@
import type { FleetMetrics } from './coordinator.js';
import type { ReaperStats } from './reaper.js';
import type { EngineBreakerEntry } from './engine-breaker.js';
import type { ScalingAction, ScalingRecommendation } from './autoscaler.js';
/** Scale actions emitted as the `action` label on `fleet_autoscale_action`. */
const SCALING_ACTIONS: readonly ScalingAction[] = ['scale_out', 'scale_in', 'hold'];
/** One product's computed metrics, paired with its id for labelling. */
export interface ProductMetrics {
productId: string;
metrics: FleetMetrics;
/** Optional raw autoscale recommendation — emitted as `fleet_autoscale_*`. */
recommendation?: ScalingRecommendation;
}
export interface FleetPromInput {
@ -52,7 +58,7 @@ export function renderFleetMetricsProm(input: FleetPromInput): string {
const def = (name: string, type: Metric['type'], help: string): Metric =>
(m[name] ??= { name, help, type, rows: [] });
for (const { productId, metrics } of input.products) {
for (const { productId, metrics, recommendation } of input.products) {
const p: [string, string][] = [['product', productId]];
def('fleet_jobs_total', 'gauge', 'Total jobs for the product').rows.push(
line('fleet_jobs_total', p, metrics.jobs.total)
@ -131,6 +137,41 @@ export function renderFleetMetricsProm(input: FleetPromInput): string {
);
}
}
// ── Capacity autoscaling signal (§5) — the RAW recommendation ──
if (recommendation) {
def(
'fleet_autoscale_recommended_seats',
'gauge',
'Recommended total factory seats for the product'
).rows.push(line('fleet_autoscale_recommended_seats', p, recommendation.recommendedSeats));
def(
'fleet_autoscale_delta',
'gauge',
'Recommended seat change (positive = scale out, negative = scale in)'
).rows.push(line('fleet_autoscale_delta', p, recommendation.delta));
def('fleet_autoscale_pressure', 'gauge', 'Runnable backlog driving scale-out').rows.push(
line('fleet_autoscale_pressure', p, recommendation.signals.pressure)
);
// One-hot the current action so a dashboard can count e.g. scale_out products.
const action = def(
'fleet_autoscale_action',
'gauge',
'Current scale recommendation (1 = active) labelled by action'
);
for (const a of SCALING_ACTIONS) {
action.rows.push(
line(
'fleet_autoscale_action',
[
['product', productId],
['action', a],
],
recommendation.action === a ? 1 : 0
)
);
}
}
}
// ── Process-wide reaper / GC counters ──

View File

@ -39,7 +39,11 @@ import * as trackerBridge from './tracker-bridge.js';
import { getReaperStats } from './reaper.js';
import { getEngineBreakerSnapshot } from './engine-breaker.js';
import { renderFleetMetricsProm, scrapeTokenMatches } from './prometheus.js';
import { recommendScaling, scalingThresholdsFromEnv } from './autoscaler.js';
import {
recommendScaling,
recommendScalingStateful,
scalingThresholdsFromEnv,
} from './autoscaler.js';
import { CORRELATION_HEADER, resolveCorrelationId } from './trace.js';
import { getAllProducts } from '../products/cache.js';
import {
@ -500,11 +504,18 @@ export async function fleetRoutes(app: FastifyInstance) {
}
}
const products = getAllProducts();
const thresholds = scalingThresholdsFromEnv();
const perProduct = await Promise.all(
products.map(async p => ({
productId: p.productId,
metrics: await coordinator.fleetMetrics(p.productId),
}))
products.map(async p => {
const metrics = await coordinator.fleetMetrics(p.productId);
// RAW recommendation here (not the stateful one): a scrape must never
// mutate the hysteresis cooldown anchors that the /autoscale endpoints own.
return {
productId: p.productId,
metrics,
recommendation: recommendScaling(metrics, thresholds),
};
})
);
const body = renderFleetMetricsProm({
products: perProduct,
@ -525,7 +536,9 @@ export async function fleetRoutes(app: FastifyInstance) {
await extractAuth(req);
const pid = await requireProductAccess(req);
const metrics = await coordinator.fleetMetrics(pid);
return recommendScaling(metrics, scalingThresholdsFromEnv());
// Stateful: anti-flap hysteresis debounces direction reversals so a consumer
// acting on this signal cannot thrash capacity (critical scale-out bypasses).
return recommendScalingStateful(metrics, scalingThresholdsFromEnv());
});
// ── Capacity autoscaling signal — GLOBAL (all products) ──
@ -541,7 +554,7 @@ export async function fleetRoutes(app: FastifyInstance) {
const thresholds = scalingThresholdsFromEnv();
const recommendations = await Promise.all(
getAllProducts().map(async p =>
recommendScaling(await coordinator.fleetMetrics(p.productId), thresholds)
recommendScalingStateful(await coordinator.fleetMetrics(p.productId), thresholds)
)
);
return {