feat(fleet): complete budget enforcement — per-engine ceilings + overspend projection

Builds on the existing product-level hard claim gate + idempotent accrual.

- Per-engine sub-ceilings (engineCeilingsUsd) with per-engine accrual
  (spentByEngineUsd). An engine at its sub-ceiling is routed around at claim
  time via the same per-engine availability gate as the circuit breaker — it
  never pauses the whole product, so other engines keep flowing. Gated by
  FLEET_BUDGETS (defaults off).
- /fleet/metrics now surfaces a budget summary (ceiling/spend/status/projection
  + per-engine breakdown) and derives guardrail alerts: budget_overspend_projected
  (burn-rate extrapolation, guarded against early-window false alarms),
  budget_exhausted, and engine_budget_exhausted. Surfaced whenever a budget exists,
  independent of the enforcement flag, so operators see the burn in dry-run.

projectBudgetSpend is pure + unit-tested; per-engine spend follows the same
idempotent accrual path as the total, so spentUsd and spentByEngineUsd agree.

Generated with [Devin](https://cli.devin.ai/docs)

Co-Authored-By: Devin <158243242+devin-ai-integration[bot]@users.noreply.github.com>
This commit is contained in:
saravanakumardb1 2026-06-01 13:23:44 -07:00
parent bdbb0a8ce4
commit bcd806c6ff
5 changed files with 316 additions and 14 deletions

View File

@ -76,13 +76,34 @@ unless explicitly enabled.
unless the lookup is injected, so default scoring is untouched.
- **Per-engine circuit breaker (`FLEET_ENGINE_BREAKER`).** `engine-breaker.ts`
tracks failures per `(factoryId, engine)` (CLOSED → OPEN after N consecutive
failures → HALF_OPEN probe after a reset window). `releaseLease` always **records**
failures → HALF_OPEN probe after a reset window). `releaseLease` always **records**
the outcome (so breaker state is observable in `/fleet/metrics → engineBreakers`
even before enforcement); when the flag is on, an OPEN pair is removed from that
factory's candidate set so a repeatedly-failing engine (e.g. `codex` erroring on a
box) is routed _around_. The breaker only ever **restricts** — it never forces a
box) is routed _around_. The breaker only ever **restricts** — it never forces a
route. Jobs without a concrete `engine` resolve on the runner and are never gated.
## Budgets (§3)
Per-product cost ceilings (`FleetBudgetDoc`, pk `/productId`), enforced at claim
time when `FLEET_BUDGETS` is on:
- **Product ceiling (hard gate).** A claim is blocked when the budget is `paused`
or `spentUsd >= ceilingUsd`. `accrueSpend` adds each shipped run's actual
`insights.costUsd` (idempotent per `<jobId>:<leaseEpoch>`) and auto-pauses the
product at the ceiling.
- **Per-engine sub-ceilings.** `engineCeilingsUsd` (e.g. `{ "codex": 50 }`) cap
spend for a single engine; `spentByEngineUsd` tracks per-engine accrual. An engine
at its sub-ceiling is **routed around** at claim time (composed into the same
per-engine availability gate as the circuit breaker) — it never pauses the whole
product, so other engines keep flowing.
- **Projected-overspend alerts.** `/fleet/metrics` surfaces a `budget` summary
(ceiling, spend, status, per-engine breakdown, and the projected end-of-window
spend) and derives alerts: `budget_overspend_projected` (warning, when the burn
rate extrapolates past the ceiling — guarded so it won't fire in the first 10% of
a window), `budget_exhausted` (critical), and `engine_budget_exhausted` (warning).
These surface whenever a budget is configured, independent of the enforcement flag.
## Submit semantics (idempotency + deps)
- same `idempotencyKey` + identical `bodyMd` → returns the existing job (dedup).

View File

@ -1083,6 +1083,55 @@ describe('fleet coordinator — Phase 3 per-product budgets', () => {
expect(m.alerts.some(a => a.code === 'saturated')).toBe(true);
});
// With no budget upserted for the product, the metrics payload must carry
// `budget: null` and no alert whose code begins with "budget".
it('fleetMetrics: no budget configured ⇒ budget summary is null', async () => {
await coord.submitJob(PID, input({ idempotencyKey: 'nb' }));
const m = await coord.fleetMetrics(PID);
expect(m.budget).toBeNull();
expect(m.alerts.some(a => a.code.startsWith('budget'))).toBe(false);
});
// Pins windowStart and passes an explicit `nowMs` so the elapsed fraction is
// deterministic rather than depending on the wall clock.
it('fleetMetrics: surfaces a budget summary and projected-overspend warning', async () => {
const start = Date.parse('2026-06-01T00:00:00.000Z');
await coord.upsertBudget(PID, 100, 'monthly');
await repo.updateBudget(PID, { windowStart: new Date(start).toISOString() });
await coord.accrueSpend(PID, 30, 'r1'); // 30 spent
// 3 days into a 30-day window (10% elapsed) ⇒ project 30 / 0.1 = 300 > 100.
const now = start + 3 * 86_400_000;
const m = await coord.fleetMetrics(PID, { nowMs: now });
expect(m.budget).toMatchObject({ ceilingUsd: 100, spentUsd: 30, status: 'active' });
expect(m.budget?.projectedUsd).toBeGreaterThan(100);
expect(
m.alerts.some(a => a.code === 'budget_overspend_projected' && a.level === 'warning')
).toBe(true);
});
// Counterpart to the case above: same window, but the burn rate extrapolates
// to less than the ceiling, so the warning must be absent.
it('fleetMetrics: an on-track budget raises no projected-overspend alert', async () => {
const start = Date.parse('2026-06-01T00:00:00.000Z');
await coord.upsertBudget(PID, 100, 'monthly');
await repo.updateBudget(PID, { windowStart: new Date(start).toISOString() });
await coord.accrueSpend(PID, 5, 'r1'); // 5 spent, 50% elapsed ⇒ project 10 < 100
const now = start + 15 * 86_400_000;
const m = await coord.fleetMetrics(PID, { nowMs: now });
expect(m.alerts.some(a => a.code === 'budget_overspend_projected')).toBe(false);
});
// Spend equal to the ceiling counts as exhausted and is reported at `critical`.
it('fleetMetrics: an exhausted budget raises a critical alert', async () => {
await coord.upsertBudget(PID, 10, 'monthly');
await coord.accrueSpend(PID, 10, 'r1'); // hits ceiling
const m = await coord.fleetMetrics(PID);
expect(m.alerts.some(a => a.code === 'budget_exhausted' && a.level === 'critical')).toBe(true);
});
// The product ceiling (1000) is far from reached; only the codex sub-ceiling (5)
// is hit, so the per-engine breakdown and the engine-level alert are what is checked.
it('fleetMetrics: a per-engine sub-ceiling reached raises engine_budget_exhausted', async () => {
await coord.upsertBudget(PID, 1000, 'monthly', { codex: 5 });
await coord.accrueSpend(PID, 5, 'r1', 'codex');
const m = await coord.fleetMetrics(PID);
expect(m.budget?.engines).toContainEqual(
expect.objectContaining({ engine: 'codex', exhausted: true })
);
expect(m.alerts.some(a => a.code === 'engine_budget_exhausted')).toBe(true);
});
// ── MULTI-REVIEWER HUMAN GATE (§14 Phase 3) ──
async function toBuilding(idempotencyKey = 'rev-1') {
const { job } = await coord.submitJob(PID, input({ idempotencyKey }));
@ -1253,6 +1302,96 @@ describe('fleet coordinator — budget accrual idempotency + ship wiring', () =>
});
});
// Suite for per-engine budget ceilings: the pure projectBudgetSpend helper,
// upsert/accrual bookkeeping, accrual on ship, and claim-time routing with the
// FLEET_BUDGETS flag on and off.
describe('fleet coordinator §3 — per-engine budget ceilings', () => {
// Every test gets a fresh in-memory datastore; FLEET_BUDGETS is cleared after
// each test so a flag set by one case cannot leak into the next.
beforeEach(() => setProvider(new MemoryDatastoreProvider()));
afterEach(() => {
_resetDatastoreProvider();
delete process.env.FLEET_BUDGETS;
});
// Calls the helper directly with a plain object and an explicit timestamp —
// no datastore involved.
it('projectBudgetSpend: extrapolates burn rate, caps fraction, and guards early window', () => {
const start = '2026-06-01T00:00:00.000Z';
const t0 = Date.parse(start);
const b = { spentUsd: 30, window: 'monthly' as const, windowStart: start };
// 3 days into 30 (10% elapsed) ⇒ 30 / 0.1 = 300
expect(coord.projectBudgetSpend(b, t0 + 3 * 86_400_000)?.projectedUsd).toBeCloseTo(300);
// beyond the window the fraction caps at 1 ⇒ projection = actual spend
expect(coord.projectBudgetSpend(b, t0 + 60 * 86_400_000)?.projectedUsd).toBeCloseTo(30);
// too early (under minFraction) ⇒ no projection (avoid crying wolf)
expect(coord.projectBudgetSpend(b, t0 + 3_600_000)).toBeNull();
// no windowStart ⇒ cannot project
expect(coord.projectBudgetSpend({ spentUsd: 5, window: 'daily' }, t0)).toBeNull();
});
it('upsert stores per-engine ceilings and preserves them on a re-upsert', async () => {
await coord.upsertBudget(PID, 100, 'monthly', { codex: 20 });
expect((await coord.getBudget(PID))?.engineCeilingsUsd).toEqual({ codex: 20 });
// re-upsert WITHOUT engine ceilings preserves the prior ones (and spend)
await coord.accrueSpend(PID, 3, 'r1', 'codex');
await coord.upsertBudget(PID, 200, 'monthly');
const b = await coord.getBudget(PID);
expect(b?.ceilingUsd).toBe(200);
expect(b?.engineCeilingsUsd).toEqual({ codex: 20 });
expect(b?.spentByEngineUsd).toEqual({ codex: 3 });
});
// The repeated 'r1' accrual must be ignored for BOTH the total and the
// per-engine map, so the two stay in agreement (4 + 6 = 10).
it('accrueSpend tracks per-engine spend alongside the total (idempotent per run)', async () => {
await coord.upsertBudget(PID, 100, 'monthly');
await coord.accrueSpend(PID, 4, 'r1', 'codex');
await coord.accrueSpend(PID, 4, 'r1', 'codex'); // duplicate run — no double count
await coord.accrueSpend(PID, 6, 'r2', 'devin');
const b = await coord.getBudget(PID);
expect(b?.spentUsd).toBe(10);
expect(b?.spentByEngineUsd).toEqual({ codex: 4, devin: 6 });
});
// End-to-end: submit → claim → stamp the run's insights → move the job to
// 'shipped', then check the cost landed under the run's engine.
it('shipping accrues per-engine spend keyed by the run engine (FLEET_BUDGETS on)', async () => {
process.env.FLEET_BUDGETS = '1';
await coord.upsertBudget(PID, 100, 'monthly');
await coord.submitJob(PID, input({ idempotencyKey: 'ship-eng', engine: 'codex' }));
const claim = await coord.claimNextJob(factory({ capabilities: ['engine:codex'] }));
const claimed = claim!.job;
const runs = await repo.listRunsByJob(claimed.id);
// Give the run an engine and a cost before shipping so there is something to accrue.
await repo.updateRun(runs[0]!.id, claimed.id, { insights: { engine: 'codex', costUsd: 9 } });
await coord.patchJobFenced(claimed.id, PID, {
leaseEpoch: claimed.leaseEpoch,
stage: 'shipped',
});
expect((await coord.getBudget(PID))?.spentByEngineUsd).toEqual({ codex: 9 });
});
it('an engine at its sub-ceiling is routed around at claim time (FLEET_BUDGETS on)', async () => {
process.env.FLEET_BUDGETS = '1';
// product ceiling high (no pause); codex sub-ceiling reached.
await coord.upsertBudget(PID, 1000, 'monthly', { codex: 5 });
await coord.accrueSpend(PID, 5, 'seed', 'codex'); // codex spend hits its ceiling
// The codex job is submitted first; the factory advertises both engines, so the
// claim landing on the devin job shows the codex job was skipped.
await coord.submitJob(PID, input({ idempotencyKey: 'c', engine: 'codex' }));
await coord.submitJob(PID, input({ idempotencyKey: 'd', engine: 'devin' }));
const claim = await coord.claimNextJob(
factory({ capabilities: ['engine:codex', 'engine:devin'] })
);
expect(claim?.job.engine).toBe('devin'); // codex blocked, devin uncapped
});
// Only a codex job is queued and codex is at its sub-ceiling ⇒ nothing is claimable.
it('a sole engine at its sub-ceiling yields no claim (FLEET_BUDGETS on)', async () => {
process.env.FLEET_BUDGETS = '1';
await coord.upsertBudget(PID, 1000, 'monthly', { codex: 5 });
await coord.accrueSpend(PID, 5, 'seed', 'codex');
await coord.submitJob(PID, input({ engine: 'codex' }));
const claim = await coord.claimNextJob(factory({ capabilities: ['engine:codex'] }));
expect(claim).toBeNull();
});
// Same setup as the previous case but with the flag unset: the claim must succeed.
it('with FLEET_BUDGETS off, per-engine ceilings are ignored (default behavior)', async () => {
delete process.env.FLEET_BUDGETS;
await coord.upsertBudget(PID, 1000, 'monthly', { codex: 5 });
await coord.accrueSpend(PID, 5, 'seed', 'codex');
await coord.submitJob(PID, input({ engine: 'codex' }));
const claim = await coord.claimNextJob(factory({ capabilities: ['engine:codex'] }));
expect(claim?.job.engine).toBe('codex');
});
});
describe('fleet coordinator — DAG submitChildren cycle detection', () => {
beforeEach(() => setProvider(new MemoryDatastoreProvider()));
afterEach(() => _resetDatastoreProvider());

View File

@ -569,11 +569,26 @@ export async function claimNextJob(ctx: ClaimContext): Promise<ClaimResult | nul
const runs = await repo.listRunsByProduct(ctx.productId);
engineQuality = computeEngineQuality(runs).lookup;
}
// §2 per-engine circuit breaker: stop routing an engine that keeps failing on
// THIS factory (flag-gated; only RESTRICTS the candidate set, never forces one).
const isEngineAvailable = isEngineBreakerEnabled()
? (engine: string): boolean => engineBreaker.allow(ctx.factoryId, engine)
: undefined;
// §2/§3 per-engine availability gate — composes two RESTRICTING guards (each
// flag-gated; neither can force a route):
// • circuit breaker: stop routing an engine that keeps failing on THIS factory.
// • per-engine budget: stop routing an engine that has hit its sub-ceiling.
// The product budget itself (read fresh each retry below) is unchanged during a
// claim, so its per-engine ceilings are read once here.
const breakerOn = isEngineBreakerEnabled();
const budgetForEngines = isBudgetsEnabled() ? await repo.getBudget(ctx.productId) : null;
const engineCeilings = budgetForEngines?.engineCeilingsUsd;
const engineBudgetExhausted = (engine: string): boolean => {
const ceiling = engineCeilings?.[engine];
if (ceiling === undefined) return false;
return (budgetForEngines?.spentByEngineUsd?.[engine] ?? 0) >= ceiling;
};
const isEngineAvailable =
breakerOn || engineCeilings
? (engine: string): boolean =>
(!breakerOn || engineBreaker.allow(ctx.factoryId, engine)) &&
!engineBudgetExhausted(engine)
: undefined;
for (let i = 0; i < CLAIM_MAX_RETRIES; i++) {
// Phase 3: budget enforcement (FLEET_BUDGETS flag)
@ -852,7 +867,12 @@ export async function patchJobFenced(
await mergeRunPrOnShip(job, latest);
// Budgets (flag-gated): accrue the run's actual cost, idempotent per run.
if (isBudgetsEnabled()) {
await accrueSpend(productId, latest?.insights?.costUsd ?? 0, `${jobId}:${job.leaseEpoch}`);
await accrueSpend(
productId,
latest?.insights?.costUsd ?? 0,
`${jobId}:${job.leaseEpoch}`,
latest?.insights?.engine ?? latest?.engine
);
}
} catch {
// swallow — run/budget bookkeeping is downstream of the job lifecycle
@ -1654,11 +1674,12 @@ export async function getBudget(productId: string): Promise<FleetBudgetDoc | nul
return repo.getBudget(productId);
}
/** Upsert a product's budget (ceiling + window). */
/** Upsert a product's budget (ceiling + window, plus optional per-engine ceilings). */
export async function upsertBudget(
productId: string,
ceilingUsd: number,
window: BudgetWindow = 'monthly'
window: BudgetWindow = 'monthly',
engineCeilingsUsd?: Record<string, number>
): Promise<FleetBudgetDoc> {
const now = new Date().toISOString();
const existing = await repo.getBudget(productId);
@ -1671,6 +1692,13 @@ export async function upsertBudget(
status: existing?.status ?? 'active',
windowStart: existing?.windowStart ?? now,
accruedRunIds: existing?.accruedRunIds ?? [],
// Per-engine config is replaced when provided, else preserved; spend is preserved.
...(engineCeilingsUsd !== undefined
? { engineCeilingsUsd }
: existing?.engineCeilingsUsd
? { engineCeilingsUsd: existing.engineCeilingsUsd }
: {}),
...(existing?.spentByEngineUsd ? { spentByEngineUsd: existing.spentByEngineUsd } : {}),
updatedAt: now,
};
return repo.upsertBudget(doc);
@ -1697,7 +1725,8 @@ export async function resumeBudget(productId: string): Promise<FleetBudgetDoc |
export async function accrueSpend(
productId: string,
costUsd: number,
runId?: string
runId?: string,
engine?: string
): Promise<FleetBudgetDoc | null> {
const budget = await repo.getBudget(productId);
if (!budget) return null;
@ -1709,7 +1738,14 @@ export async function accrueSpend(
if (costUsd > 0) {
const newSpent = budget.spentUsd + costUsd;
updates.spentUsd = newSpent;
// Auto-pause when ceiling exceeded
// Track per-engine spend alongside the total (drives per-engine ceilings + the
// dashboard breakdown). Same idempotency guard as the total above.
if (engine && engine !== 'unknown') {
const prev = budget.spentByEngineUsd ?? {};
updates.spentByEngineUsd = { ...prev, [engine]: (prev[engine] ?? 0) + costUsd };
}
// Auto-pause when the PRODUCT ceiling is exceeded (per-engine ceilings only
// route around that engine at claim time — they never pause the whole product).
if (newSpent >= budget.ceilingUsd && budget.status === 'active') {
updates.status = 'paused';
}
@ -1718,6 +1754,42 @@ export async function accrueSpend(
return repo.updateBudget(productId, updates);
}
/** Nominal length of each budget window, in ms (monthly ≈ 30d). */
const BUDGET_WINDOW_MS: Record<BudgetWindow, number> = {
  daily: 86_400_000,
  weekly: 7 * 86_400_000,
  monthly: 30 * 86_400_000,
};

/** A budget's projected end-of-window spend at the current burn rate. */
export interface BudgetProjection {
  /** Fraction of the window elapsed, in (0,1]. */
  fraction: number;
  /** spentUsd extrapolated to the full window (spentUsd / fraction). */
  projectedUsd: number;
}

/**
 * Project a budget's full-window spend from its burn rate so far (PURE).
 *
 * Returns null when we shouldn't (or can't) project:
 *  - `windowStart` is missing or not a parseable date;
 *  - too little of the window has elapsed (`minFraction`, default 10%) — early
 *    in a window a couple of runs extrapolate to wildly inflated totals and
 *    would cry wolf;
 *  - the elapsed fraction is not a positive number (`nowMs` at/before the window
 *    start, a non-finite `nowMs`, or an unknown `window` key), which would
 *    otherwise divide by zero / propagate NaN even when `minFraction` is 0.
 *
 * The fraction is capped at 1 so a window that has fully elapsed reports actual
 * spend, not more.
 *
 * @param budget - The budget fields needed for the projection.
 * @param nowMs - Current time in epoch milliseconds.
 * @param minFraction - Minimum elapsed fraction of the window before projecting.
 * @returns The elapsed fraction and extrapolated spend, or null (see above).
 */
export function projectBudgetSpend(
  budget: Pick<FleetBudgetDoc, 'spentUsd' | 'window' | 'windowStart'>,
  nowMs: number,
  minFraction = 0.1
): BudgetProjection | null {
  if (!budget.windowStart) return null;
  const start = Date.parse(budget.windowStart);
  if (Number.isNaN(start)) return null;
  const windowMs = BUDGET_WINDOW_MS[budget.window];
  const fraction = Math.min(1, Math.max(0, nowMs - start) / windowMs);
  // `!(fraction > 0)` rejects both 0 and NaN, keeping the result inside the
  // documented (0,1] range regardless of the caller's `minFraction`.
  if (!(fraction > 0) || fraction < minFraction) return null;
  return { fraction, projectedUsd: budget.spentUsd / fraction };
}
// ── Cost burndown (§14 Phase 3 — spend-over-time vs ceiling) ──────────────────
/** One day of the burndown series. */
@ -1810,9 +1882,23 @@ export interface FleetMetrics {
seatsTotal: number;
utilizationPct: number;
};
/** Budget summary for the dashboard (§3) — null when no budget is configured. */
budget: FleetBudgetSummary | null;
alerts: FleetAlert[];
}
/**
 * Budget snapshot attached to the fleet metrics payload, feeding the dashboard's
 * burndown and guardrail views.
 */
export interface FleetBudgetSummary {
  /** Budget window the ceiling applies to. */
  window: BudgetWindow;
  /** Current budget status, copied from the budget document. */
  status: string;
  /** Product-level ceiling in USD. */
  ceilingUsd: number;
  /** Product-level spend accrued so far in USD. */
  spentUsd: number;
  /** End-of-window spend extrapolated from the current burn rate; null when no projection is available (e.g. too early in the window). */
  projectedUsd: number | null;
  /** Spend against sub-ceiling, one entry per engine that HAS a ceiling configured. */
  engines: Array<{
    engine: string;
    spentUsd: number;
    ceilingUsd: number;
    exhausted: boolean;
  }>;
}
/** A factory is considered stale after this long without a heartbeat. */
const DEFAULT_STALE_FACTORY_MS = 90_000;
/** A queued job waiting longer than this raises a starvation alert. */
@ -1832,9 +1918,10 @@ export async function fleetMetrics(
const staleMaxAgeMs = opts?.staleMaxAgeMs ?? DEFAULT_STALE_FACTORY_MS;
const starvationMs = opts?.starvationMs ?? DEFAULT_STARVATION_MS;
const [allJobs, factories] = await Promise.all([
const [allJobs, factories, budgetDoc] = await Promise.all([
repo.listJobs({ productId }),
repo.listFactories(productId),
repo.getBudget(productId),
]);
const byStage = Object.fromEntries(FLEET_STAGES.map(s => [s, 0])) as Record<FleetStage, number>;
@ -1914,6 +2001,51 @@ export async function fleetMetrics(
});
}
// ── Budget guardrails (§3): exhaustion + projected-overspend warnings ──
// Surfaced whenever a budget is configured (read-only; independent of the
// FLEET_BUDGETS enforcement flag, so operators see the burn even in dry-run).
let budget: FleetBudgetSummary | null = null;
if (budgetDoc && budgetDoc.ceilingUsd > 0) {
const projection = projectBudgetSpend(budgetDoc, nowMs);
const engines = Object.entries(budgetDoc.engineCeilingsUsd ?? {}).map(
([engine, ceilingUsd]) => {
const spentUsd = budgetDoc.spentByEngineUsd?.[engine] ?? 0;
return { engine, spentUsd, ceilingUsd, exhausted: spentUsd >= ceilingUsd };
}
);
budget = {
ceilingUsd: budgetDoc.ceilingUsd,
spentUsd: budgetDoc.spentUsd,
status: budgetDoc.status,
window: budgetDoc.window,
projectedUsd: projection?.projectedUsd ?? null,
engines,
};
if (budgetDoc.spentUsd >= budgetDoc.ceilingUsd) {
alerts.push({
level: 'critical',
code: 'budget_exhausted',
message: `Budget exhausted: $${budgetDoc.spentUsd.toFixed(2)} of the $${budgetDoc.ceilingUsd.toFixed(2)} ${budgetDoc.window} ceiling spent — new claims are blocked.`,
});
} else if (projection && projection.projectedUsd > budgetDoc.ceilingUsd) {
alerts.push({
level: 'warning',
code: 'budget_overspend_projected',
message: `Projected ${budgetDoc.window} spend $${projection.projectedUsd.toFixed(2)} exceeds the $${budgetDoc.ceilingUsd.toFixed(2)} ceiling at the current burn rate (${Math.round(projection.fraction * 100)}% of the window elapsed, $${budgetDoc.spentUsd.toFixed(2)} spent).`,
});
}
for (const e of engines) {
if (e.exhausted) {
alerts.push({
level: 'warning',
code: 'engine_budget_exhausted',
message: `Engine '${e.engine}' reached its $${e.ceilingUsd.toFixed(2)} sub-ceiling ($${e.spentUsd.toFixed(2)} spent) — it is being routed around.`,
});
}
}
}
return {
productId,
generatedAt: new Date(nowMs).toISOString(),
@ -1927,6 +2059,7 @@ export async function fleetMetrics(
seatsTotal,
utilizationPct,
},
budget,
alerts,
};
}

View File

@ -603,7 +603,8 @@ export async function fleetRoutes(app: FastifyInstance) {
const budget = await coordinator.upsertBudget(
productId,
parsed.data.ceilingUsd,
parsed.data.window
parsed.data.window,
parsed.data.engineCeilingsUsd
);
reply.code(200);
return budget;

View File

@ -570,6 +570,12 @@ export const FleetBudgetDocSchema = z.object({
windowStart: z.string().optional(),
/** Run identifiers already accrued — makes accrueSpend idempotent per run. */
accruedRunIds: z.array(z.string()).default([]),
/** Optional per-engine sub-ceilings (USD). An engine that reaches its ceiling is
* routed around at claim time (only that engine is blocked, not the product). */
engineCeilingsUsd: z.record(z.string(), z.number().nonnegative()).optional(),
/** Per-engine accrued spend (USD), tracked alongside `spentUsd` for the ceilings
* above + the dashboard breakdown. Idempotent via the same `accruedRunIds`. */
spentByEngineUsd: z.record(z.string(), z.number().nonnegative()).optional(),
updatedAt: z.string(),
});
export type FleetBudgetDoc = z.infer<typeof FleetBudgetDocSchema>;
@ -595,6 +601,8 @@ export type FleetQueueStateDoc = z.infer<typeof FleetQueueStateDocSchema>;
/**
 * Input schema for creating or updating a product budget.
 * NOTE(review): presumably this validates the budget-upsert route body before it
 * is handed to the coordinator's `upsertBudget` — confirm against the route.
 */
export const UpsertBudgetSchema = z.object({
/** Product-level ceiling (USD); must be a non-negative number. */
ceilingUsd: z.number().nonnegative(),
/** Budget window; one of `BUDGET_WINDOWS`, defaulting to 'monthly' when omitted. */
window: z.enum(BUDGET_WINDOWS).default('monthly'),
/** Optional per-engine sub-ceilings (USD), keyed by engine name, e.g. `{ "codex": 50 }`.
 * Each value must be non-negative. */
engineCeilingsUsd: z.record(z.string(), z.number().nonnegative()).optional(),
});
/** Static type inferred from `UpsertBudgetSchema`. */
export type UpsertBudgetInput = z.infer<typeof UpsertBudgetSchema>;