From bcd806c6fff79b7be489c824ec2e7f3ea06dee12 Mon Sep 17 00:00:00 2001 From: saravanakumardb1 Date: Mon, 1 Jun 2026 13:23:44 -0700 Subject: [PATCH] =?UTF-8?q?feat(fleet):=20complete=20budget=20enforcement?= =?UTF-8?q?=20=E2=80=94=20per-engine=20ceilings=20+=20overspend=20projecti?= =?UTF-8?q?on?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Builds on the existing product-level hard claim gate + idempotent accrual. - Per-engine sub-ceilings (engineCeilingsUsd) with per-engine accrual (spentByEngineUsd). An engine at its sub-ceiling is routed around at claim time via the same per-engine availability gate as the circuit breaker — it never pauses the whole product, so other engines keep flowing. Gated by FLEET_BUDGETS (defaults off). - /fleet/metrics now surfaces a budget summary (ceiling/spend/status/projection + per-engine breakdown) and derives guardrail alerts: budget_overspend_projected (burn-rate extrapolation, guarded against early-window false alarms), budget_exhausted, and engine_budget_exhausted. Surfaced whenever a budget exists, independent of the enforcement flag, so operators see the burn in dry-run. projectBudgetSpend is pure + unit-tested; per-engine spend follows the same idempotent accrual path as the total, so spentUsd and spentByEngineUsd agree. Generated with [Devin](https://cli.devin.ai/docs) Co-Authored-By: Devin <158243242+devin-ai-integration[bot]@users.noreply.github.com> --- .../src/modules/fleet/README.md | 25 ++- .../src/modules/fleet/coordinator.test.ts | 139 ++++++++++++++++ .../src/modules/fleet/coordinator.ts | 155 ++++++++++++++++-- .../src/modules/fleet/routes.ts | 3 +- .../src/modules/fleet/types.ts | 8 + 5 files changed, 316 insertions(+), 14 deletions(-) diff --git a/services/platform-service/src/modules/fleet/README.md b/services/platform-service/src/modules/fleet/README.md index 99f3e388..5c119e7e 100644 --- a/services/platform-service/src/modules/fleet/README.md +++ b/services/platform-service/src/modules/fleet/README.md @@ -76,13 +76,34 @@ unless explicitly enabled. unless the lookup is injected, so default scoring is untouched. - **Per-engine circuit breaker (`FLEET_ENGINE_BREAKER`).** `engine-breaker.ts` tracks failures per `(factoryId, engine)` (CLOSED → OPEN after N consecutive - failures → HALF_OPEN probe after a reset window). `releaseLease` always **records** + failures → HALF*OPEN probe after a reset window). `releaseLease` always **records** the outcome (so breaker state is observable in `/fleet/metrics → engineBreakers` even before enforcement); when the flag is on, an OPEN pair is removed from that factory's candidate set so a repeatedly-failing engine (e.g. `codex` erroring on a - box) is routed _around_. The breaker only ever **restricts** — it never forces a + box) is routed \_around*. The breaker only ever **restricts** — it never forces a route. Jobs without a concrete `engine` resolve on the runner and are never gated. +## Budgets (§3) + +Per-product cost ceilings (`FleetBudgetDoc`, pk `/productId`), enforced at claim +time when `FLEET_BUDGETS` is on: + +- **Product ceiling (hard gate).** A claim is blocked when the budget is `paused` + or `spentUsd >= ceilingUsd`. `accrueSpend` adds each shipped run's actual + `insights.costUsd` (idempotent per `:`) and auto-pauses the + product at the ceiling. +- **Per-engine sub-ceilings.** `engineCeilingsUsd` (e.g. `{ "codex": 50 }`) cap + spend for a single engine; `spentByEngineUsd` tracks per-engine accrual. An engine + at its sub-ceiling is **routed around** at claim time (composed into the same + per-engine availability gate as the circuit breaker) — it never pauses the whole + product, so other engines keep flowing. +- **Projected-overspend alerts.** `/fleet/metrics` surfaces a `budget` summary + (ceiling, spend, status, per-engine breakdown, and the projected end-of-window + spend) and derives alerts: `budget_overspend_projected` (warning, when the burn + rate extrapolates past the ceiling — guarded so it won't fire in the first 10% of + a window), `budget_exhausted` (critical), and `engine_budget_exhausted` (warning). + These surface whenever a budget is configured, independent of the enforcement flag. + ## Submit semantics (idempotency + deps) - same `idempotencyKey` + identical `bodyMd` → returns the existing job (dedup). diff --git a/services/platform-service/src/modules/fleet/coordinator.test.ts b/services/platform-service/src/modules/fleet/coordinator.test.ts index ac2ad1a8..2e0ab280 100644 --- a/services/platform-service/src/modules/fleet/coordinator.test.ts +++ b/services/platform-service/src/modules/fleet/coordinator.test.ts @@ -1083,6 +1083,55 @@ describe('fleet coordinator — Phase 3 per-product budgets', () => { expect(m.alerts.some(a => a.code === 'saturated')).toBe(true); }); + it('fleetMetrics: no budget configured ⇒ budget summary is null', async () => { + await coord.submitJob(PID, input({ idempotencyKey: 'nb' })); + const m = await coord.fleetMetrics(PID); + expect(m.budget).toBeNull(); + expect(m.alerts.some(a => a.code.startsWith('budget'))).toBe(false); + }); + + it('fleetMetrics: surfaces a budget summary and projected-overspend warning', async () => { + const start = Date.parse('2026-06-01T00:00:00.000Z'); + await coord.upsertBudget(PID, 100, 'monthly'); + await repo.updateBudget(PID, { windowStart: new Date(start).toISOString() }); + await coord.accrueSpend(PID, 30, 'r1'); // 30 spent + // 3 days into a 30-day window (10% elapsed) ⇒ project 30 / 0.1 = 300 > 100. + const now = start + 3 * 86_400_000; + const m = await coord.fleetMetrics(PID, { nowMs: now }); + expect(m.budget).toMatchObject({ ceilingUsd: 100, spentUsd: 30, status: 'active' }); + expect(m.budget?.projectedUsd).toBeGreaterThan(100); + expect( + m.alerts.some(a => a.code === 'budget_overspend_projected' && a.level === 'warning') + ).toBe(true); + }); + + it('fleetMetrics: an on-track budget raises no projected-overspend alert', async () => { + const start = Date.parse('2026-06-01T00:00:00.000Z'); + await coord.upsertBudget(PID, 100, 'monthly'); + await repo.updateBudget(PID, { windowStart: new Date(start).toISOString() }); + await coord.accrueSpend(PID, 5, 'r1'); // 5 spent, 50% elapsed ⇒ project 10 < 100 + const now = start + 15 * 86_400_000; + const m = await coord.fleetMetrics(PID, { nowMs: now }); + expect(m.alerts.some(a => a.code === 'budget_overspend_projected')).toBe(false); + }); + + it('fleetMetrics: an exhausted budget raises a critical alert', async () => { + await coord.upsertBudget(PID, 10, 'monthly'); + await coord.accrueSpend(PID, 10, 'r1'); // hits ceiling + const m = await coord.fleetMetrics(PID); + expect(m.alerts.some(a => a.code === 'budget_exhausted' && a.level === 'critical')).toBe(true); + }); + + it('fleetMetrics: a per-engine sub-ceiling reached raises engine_budget_exhausted', async () => { + await coord.upsertBudget(PID, 1000, 'monthly', { codex: 5 }); + await coord.accrueSpend(PID, 5, 'r1', 'codex'); + const m = await coord.fleetMetrics(PID); + expect(m.budget?.engines).toContainEqual( + expect.objectContaining({ engine: 'codex', exhausted: true }) + ); + expect(m.alerts.some(a => a.code === 'engine_budget_exhausted')).toBe(true); + }); + // ── MULTI-REVIEWER HUMAN GATE (§14 Phase 3) ── async function toBuilding(idempotencyKey = 'rev-1') { const { job } = await coord.submitJob(PID, input({ idempotencyKey })); @@ -1253,6 +1302,96 @@ describe('fleet coordinator — budget accrual idempotency + ship wiring', () => }); }); +describe('fleet coordinator §3 — per-engine budget ceilings', () => { + beforeEach(() => setProvider(new MemoryDatastoreProvider())); + afterEach(() => { + _resetDatastoreProvider(); + delete process.env.FLEET_BUDGETS; + }); + + it('projectBudgetSpend: extrapolates burn rate, caps fraction, and guards early window', () => { + const start = '2026-06-01T00:00:00.000Z'; + const t0 = Date.parse(start); + const b = { spentUsd: 30, window: 'monthly' as const, windowStart: start }; + // 3 days into 30 (10% elapsed) ⇒ 30 / 0.1 = 300 + expect(coord.projectBudgetSpend(b, t0 + 3 * 86_400_000)?.projectedUsd).toBeCloseTo(300); + // beyond the window the fraction caps at 1 ⇒ projection = actual spend + expect(coord.projectBudgetSpend(b, t0 + 60 * 86_400_000)?.projectedUsd).toBeCloseTo(30); + // too early (under minFraction) ⇒ no projection (avoid crying wolf) + expect(coord.projectBudgetSpend(b, t0 + 3_600_000)).toBeNull(); + // no windowStart ⇒ cannot project + expect(coord.projectBudgetSpend({ spentUsd: 5, window: 'daily' }, t0)).toBeNull(); + }); + + it('upsert stores per-engine ceilings and preserves them on a re-upsert', async () => { + await coord.upsertBudget(PID, 100, 'monthly', { codex: 20 }); + expect((await coord.getBudget(PID))?.engineCeilingsUsd).toEqual({ codex: 20 }); + // re-upsert WITHOUT engine ceilings preserves the prior ones (and spend) + await coord.accrueSpend(PID, 3, 'r1', 'codex'); + await coord.upsertBudget(PID, 200, 'monthly'); + const b = await coord.getBudget(PID); + expect(b?.ceilingUsd).toBe(200); + expect(b?.engineCeilingsUsd).toEqual({ codex: 20 }); + expect(b?.spentByEngineUsd).toEqual({ codex: 3 }); + }); + + it('accrueSpend tracks per-engine spend alongside the total (idempotent per run)', async () => { + await coord.upsertBudget(PID, 100, 'monthly'); + await coord.accrueSpend(PID, 4, 'r1', 'codex'); + await coord.accrueSpend(PID, 4, 'r1', 'codex'); // duplicate run — no double count + await coord.accrueSpend(PID, 6, 'r2', 'devin'); + const b = await coord.getBudget(PID); + expect(b?.spentUsd).toBe(10); + expect(b?.spentByEngineUsd).toEqual({ codex: 4, devin: 6 }); + }); + + it('shipping accrues per-engine spend keyed by the run engine (FLEET_BUDGETS on)', async () => { + process.env.FLEET_BUDGETS = '1'; + await coord.upsertBudget(PID, 100, 'monthly'); + await coord.submitJob(PID, input({ idempotencyKey: 'ship-eng', engine: 'codex' })); + const claim = await coord.claimNextJob(factory({ capabilities: ['engine:codex'] })); + const claimed = claim!.job; + const runs = await repo.listRunsByJob(claimed.id); + await repo.updateRun(runs[0]!.id, claimed.id, { insights: { engine: 'codex', costUsd: 9 } }); + await coord.patchJobFenced(claimed.id, PID, { + leaseEpoch: claimed.leaseEpoch, + stage: 'shipped', + }); + expect((await coord.getBudget(PID))?.spentByEngineUsd).toEqual({ codex: 9 }); + }); + + it('an engine at its sub-ceiling is routed around at claim time (FLEET_BUDGETS on)', async () => { + process.env.FLEET_BUDGETS = '1'; + // product ceiling high (no pause); codex sub-ceiling reached. + await coord.upsertBudget(PID, 1000, 'monthly', { codex: 5 }); + await coord.accrueSpend(PID, 5, 'seed', 'codex'); // codex spend hits its ceiling + await coord.submitJob(PID, input({ idempotencyKey: 'c', engine: 'codex' })); + await coord.submitJob(PID, input({ idempotencyKey: 'd', engine: 'devin' })); + const claim = await coord.claimNextJob( + factory({ capabilities: ['engine:codex', 'engine:devin'] }) + ); + expect(claim?.job.engine).toBe('devin'); // codex blocked, devin uncapped + }); + + it('a sole engine at its sub-ceiling yields no claim (FLEET_BUDGETS on)', async () => { + process.env.FLEET_BUDGETS = '1'; + await coord.upsertBudget(PID, 1000, 'monthly', { codex: 5 }); + await coord.accrueSpend(PID, 5, 'seed', 'codex'); + await coord.submitJob(PID, input({ engine: 'codex' })); + const claim = await coord.claimNextJob(factory({ capabilities: ['engine:codex'] })); + expect(claim).toBeNull(); + }); + + it('with FLEET_BUDGETS off, per-engine ceilings are ignored (default behavior)', async () => { + delete process.env.FLEET_BUDGETS; + await coord.upsertBudget(PID, 1000, 'monthly', { codex: 5 }); + await coord.accrueSpend(PID, 5, 'seed', 'codex'); + await coord.submitJob(PID, input({ engine: 'codex' })); + const claim = await coord.claimNextJob(factory({ capabilities: ['engine:codex'] })); + expect(claim?.job.engine).toBe('codex'); + }); +}); + describe('fleet coordinator — DAG submitChildren cycle detection', () => { beforeEach(() => setProvider(new MemoryDatastoreProvider())); afterEach(() => _resetDatastoreProvider()); diff --git a/services/platform-service/src/modules/fleet/coordinator.ts b/services/platform-service/src/modules/fleet/coordinator.ts index 274945a9..c3018315 100644 --- a/services/platform-service/src/modules/fleet/coordinator.ts +++ b/services/platform-service/src/modules/fleet/coordinator.ts @@ -569,11 +569,26 @@ export async function claimNextJob(ctx: ClaimContext): Promise engineBreaker.allow(ctx.factoryId, engine) - : undefined; + // §2/§3 per-engine availability gate — composes two RESTRICTING guards (each + // flag-gated; neither can force a route): + // • circuit breaker: stop routing an engine that keeps failing on THIS factory. + // • per-engine budget: stop routing an engine that has hit its sub-ceiling. + // The product budget itself (read fresh each retry below) is unchanged during a + // claim, so its per-engine ceilings are read once here. + const breakerOn = isEngineBreakerEnabled(); + const budgetForEngines = isBudgetsEnabled() ? await repo.getBudget(ctx.productId) : null; + const engineCeilings = budgetForEngines?.engineCeilingsUsd; + const engineBudgetExhausted = (engine: string): boolean => { + const ceiling = engineCeilings?.[engine]; + if (ceiling === undefined) return false; + return (budgetForEngines?.spentByEngineUsd?.[engine] ?? 0) >= ceiling; + }; + const isEngineAvailable = + breakerOn || engineCeilings + ? (engine: string): boolean => + (!breakerOn || engineBreaker.allow(ctx.factoryId, engine)) && + !engineBudgetExhausted(engine) + : undefined; for (let i = 0; i < CLAIM_MAX_RETRIES; i++) { // Phase 3: budget enforcement (FLEET_BUDGETS flag) @@ -852,7 +867,12 @@ export async function patchJobFenced( await mergeRunPrOnShip(job, latest); // Budgets (flag-gated): accrue the run's actual cost, idempotent per run. if (isBudgetsEnabled()) { - await accrueSpend(productId, latest?.insights?.costUsd ?? 0, `${jobId}:${job.leaseEpoch}`); + await accrueSpend( + productId, + latest?.insights?.costUsd ?? 0, + `${jobId}:${job.leaseEpoch}`, + latest?.insights?.engine ?? latest?.engine + ); } } catch { // swallow — run/budget bookkeeping is downstream of the job lifecycle @@ -1654,11 +1674,12 @@ export async function getBudget(productId: string): Promise ): Promise { const now = new Date().toISOString(); const existing = await repo.getBudget(productId); @@ -1671,6 +1692,13 @@ export async function upsertBudget( status: existing?.status ?? 'active', windowStart: existing?.windowStart ?? now, accruedRunIds: existing?.accruedRunIds ?? [], + // Per-engine config is replaced when provided, else preserved; spend is preserved. + ...(engineCeilingsUsd !== undefined + ? { engineCeilingsUsd } + : existing?.engineCeilingsUsd + ? { engineCeilingsUsd: existing.engineCeilingsUsd } + : {}), + ...(existing?.spentByEngineUsd ? { spentByEngineUsd: existing.spentByEngineUsd } : {}), updatedAt: now, }; return repo.upsertBudget(doc); @@ -1697,7 +1725,8 @@ export async function resumeBudget(productId: string): Promise { const budget = await repo.getBudget(productId); if (!budget) return null; @@ -1709,7 +1738,14 @@ export async function accrueSpend( if (costUsd > 0) { const newSpent = budget.spentUsd + costUsd; updates.spentUsd = newSpent; - // Auto-pause when ceiling exceeded + // Track per-engine spend alongside the total (drives per-engine ceilings + the + // dashboard breakdown). Same idempotency guard as the total above. + if (engine && engine !== 'unknown') { + const prev = budget.spentByEngineUsd ?? {}; + updates.spentByEngineUsd = { ...prev, [engine]: (prev[engine] ?? 0) + costUsd }; + } + // Auto-pause when the PRODUCT ceiling is exceeded (per-engine ceilings only + // route around that engine at claim time — they never pause the whole product). if (newSpent >= budget.ceilingUsd && budget.status === 'active') { updates.status = 'paused'; } @@ -1718,6 +1754,42 @@ export async function accrueSpend( return repo.updateBudget(productId, updates); } +/** Nominal length of each budget window, in ms (monthly ≈ 30d). */ +const BUDGET_WINDOW_MS: Record = { + daily: 86_400_000, + weekly: 7 * 86_400_000, + monthly: 30 * 86_400_000, +}; + +/** A budget's projected end-of-window spend at the current burn rate. */ +export interface BudgetProjection { + /** Fraction of the window elapsed, in (0,1]. */ + fraction: number; + /** spentUsd extrapolated to the full window (spentUsd / fraction). */ + projectedUsd: number; +} + +/** + * Project a budget's full-window spend from its burn rate so far (PURE). Returns + * null when we shouldn't project yet: no `windowStart`, or too little of the + * window has elapsed (`minFraction`, default 10%) — early in a window a couple of + * runs extrapolate to wildly inflated totals and would cry wolf. The fraction is + * capped at 1 so a window that has fully elapsed reports actual spend, not more. + */ +export function projectBudgetSpend( + budget: Pick, + nowMs: number, + minFraction = 0.1 +): BudgetProjection | null { + if (!budget.windowStart) return null; + const start = Date.parse(budget.windowStart); + if (Number.isNaN(start)) return null; + const windowMs = BUDGET_WINDOW_MS[budget.window]; + const fraction = Math.min(1, Math.max(0, nowMs - start) / windowMs); + if (fraction < minFraction) return null; + return { fraction, projectedUsd: budget.spentUsd / fraction }; +} + // ── Cost burndown (§14 Phase 3 — spend-over-time vs ceiling) ────────────────── /** One day of the burndown series. */ @@ -1810,9 +1882,23 @@ export interface FleetMetrics { seatsTotal: number; utilizationPct: number; }; + /** Budget summary for the dashboard (§3) — null when no budget is configured. */ + budget: FleetBudgetSummary | null; alerts: FleetAlert[]; } +/** Compact budget snapshot surfaced on metrics for the dashboard burndown/guardrail. */ +export interface FleetBudgetSummary { + ceilingUsd: number; + spentUsd: number; + status: string; + window: BudgetWindow; + /** Projected end-of-window spend at the current burn rate (null if too early). */ + projectedUsd: number | null; + /** Per-engine spend vs sub-ceiling, only for engines that HAVE a ceiling. */ + engines: { engine: string; spentUsd: number; ceilingUsd: number; exhausted: boolean }[]; +} + /** A factory is considered stale after this long without a heartbeat. */ const DEFAULT_STALE_FACTORY_MS = 90_000; /** A queued job waiting longer than this raises a starvation alert. */ @@ -1832,9 +1918,10 @@ export async function fleetMetrics( const staleMaxAgeMs = opts?.staleMaxAgeMs ?? DEFAULT_STALE_FACTORY_MS; const starvationMs = opts?.starvationMs ?? DEFAULT_STARVATION_MS; - const [allJobs, factories] = await Promise.all([ + const [allJobs, factories, budgetDoc] = await Promise.all([ repo.listJobs({ productId }), repo.listFactories(productId), + repo.getBudget(productId), ]); const byStage = Object.fromEntries(FLEET_STAGES.map(s => [s, 0])) as Record; @@ -1914,6 +2001,51 @@ export async function fleetMetrics( }); } + // ── Budget guardrails (§3): exhaustion + projected-overspend warnings ── + // Surfaced whenever a budget is configured (read-only; independent of the + // FLEET_BUDGETS enforcement flag, so operators see the burn even in dry-run). + let budget: FleetBudgetSummary | null = null; + if (budgetDoc && budgetDoc.ceilingUsd > 0) { + const projection = projectBudgetSpend(budgetDoc, nowMs); + const engines = Object.entries(budgetDoc.engineCeilingsUsd ?? {}).map( + ([engine, ceilingUsd]) => { + const spentUsd = budgetDoc.spentByEngineUsd?.[engine] ?? 0; + return { engine, spentUsd, ceilingUsd, exhausted: spentUsd >= ceilingUsd }; + } + ); + budget = { + ceilingUsd: budgetDoc.ceilingUsd, + spentUsd: budgetDoc.spentUsd, + status: budgetDoc.status, + window: budgetDoc.window, + projectedUsd: projection?.projectedUsd ?? null, + engines, + }; + + if (budgetDoc.spentUsd >= budgetDoc.ceilingUsd) { + alerts.push({ + level: 'critical', + code: 'budget_exhausted', + message: `Budget exhausted: $${budgetDoc.spentUsd.toFixed(2)} of the $${budgetDoc.ceilingUsd.toFixed(2)} ${budgetDoc.window} ceiling spent — new claims are blocked.`, + }); + } else if (projection && projection.projectedUsd > budgetDoc.ceilingUsd) { + alerts.push({ + level: 'warning', + code: 'budget_overspend_projected', + message: `Projected ${budgetDoc.window} spend $${projection.projectedUsd.toFixed(2)} exceeds the $${budgetDoc.ceilingUsd.toFixed(2)} ceiling at the current burn rate (${Math.round(projection.fraction * 100)}% of the window elapsed, $${budgetDoc.spentUsd.toFixed(2)} spent).`, + }); + } + for (const e of engines) { + if (e.exhausted) { + alerts.push({ + level: 'warning', + code: 'engine_budget_exhausted', + message: `Engine '${e.engine}' reached its $${e.ceilingUsd.toFixed(2)} sub-ceiling ($${e.spentUsd.toFixed(2)} spent) — it is being routed around.`, + }); + } + } + } + return { productId, generatedAt: new Date(nowMs).toISOString(), @@ -1927,6 +2059,7 @@ export async function fleetMetrics( seatsTotal, utilizationPct, }, + budget, alerts, }; } diff --git a/services/platform-service/src/modules/fleet/routes.ts b/services/platform-service/src/modules/fleet/routes.ts index a62f704f..02879ca5 100644 --- a/services/platform-service/src/modules/fleet/routes.ts +++ b/services/platform-service/src/modules/fleet/routes.ts @@ -603,7 +603,8 @@ export async function fleetRoutes(app: FastifyInstance) { const budget = await coordinator.upsertBudget( productId, parsed.data.ceilingUsd, - parsed.data.window + parsed.data.window, + parsed.data.engineCeilingsUsd ); reply.code(200); return budget; diff --git a/services/platform-service/src/modules/fleet/types.ts b/services/platform-service/src/modules/fleet/types.ts index a804c871..1cf52676 100644 --- a/services/platform-service/src/modules/fleet/types.ts +++ b/services/platform-service/src/modules/fleet/types.ts @@ -570,6 +570,12 @@ export const FleetBudgetDocSchema = z.object({ windowStart: z.string().optional(), /** Run identifiers already accrued — makes accrueSpend idempotent per run. */ accruedRunIds: z.array(z.string()).default([]), + /** Optional per-engine sub-ceilings (USD). An engine that reaches its ceiling is + * routed around at claim time (only that engine is blocked, not the product). */ + engineCeilingsUsd: z.record(z.string(), z.number().nonnegative()).optional(), + /** Per-engine accrued spend (USD), tracked alongside `spentUsd` for the ceilings + * above + the dashboard breakdown. Idempotent via the same `accruedRunIds`. */ + spentByEngineUsd: z.record(z.string(), z.number().nonnegative()).optional(), updatedAt: z.string(), }); export type FleetBudgetDoc = z.infer; @@ -595,6 +601,8 @@ export type FleetQueueStateDoc = z.infer; export const UpsertBudgetSchema = z.object({ ceilingUsd: z.number().nonnegative(), window: z.enum(BUDGET_WINDOWS).default('monthly'), + /** Optional per-engine sub-ceilings (USD), e.g. `{ "codex": 50 }`. */ + engineCeilingsUsd: z.record(z.string(), z.number().nonnegative()).optional(), }); export type UpsertBudgetInput = z.infer;