feat(fleet): per-product budgets with pause/resume (Phase 3 Slice 3)

- FleetBudgetDoc: ceilingUsd, window, spentUsd, status (active/paused)
- FLEET_BUDGETS flag (default OFF = no enforcement, unchanged behavior)
- Enforcement in claimNextJob: paused or ceiling-exceeded → null
- accrueSpend(): monotonic spend accumulation, auto-pause at ceiling
- Budget routes: GET/PUT /fleet/budgets/:productId, pause, resume
- UpsertBudgetSchema for route validation
- 7 new coordinator tests (ceiling, auto-pause, manual pause/resume,
  flag OFF bypass, monotonic accounting, cross-product isolation)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
Saravanakumar D 2026-05-30 02:30:12 -07:00
parent 484ed05c1f
commit f4ea7b4a5b
5 changed files with 250 additions and 0 deletions

View File

@ -665,3 +665,82 @@ describe('fleet coordinator — Phase 3 DAG decomposition', () => {
expect(claim!.job.id).toBe(job.id);
});
});
describe('fleet coordinator — Phase 3 per-product budgets', () => {
beforeEach(() => setProvider(new MemoryDatastoreProvider()));
afterEach(() => {
_resetDatastoreProvider();
delete process.env.FLEET_BUDGETS;
});
it('under ceiling: claims proceed normally', async () => {
process.env.FLEET_BUDGETS = '1';
await coord.upsertBudget(PID, 100, 'monthly');
await coord.submitJob(PID, input({ idempotencyKey: 'budget-ok' }));
const claim = await coord.claimNextJob(factory());
expect(claim).not.toBeNull();
expect(claim!.job.idempotencyKey).toBe('budget-ok');
});
it('crossing ceiling: auto-pauses and blocks claims', async () => {
process.env.FLEET_BUDGETS = '1';
await coord.upsertBudget(PID, 10, 'monthly');
// Accrue spend up to ceiling
await coord.accrueSpend(PID, 10);
const budget = await coord.getBudget(PID);
expect(budget?.status).toBe('paused');
expect(budget?.spentUsd).toBe(10);
// Claims should now be blocked
await coord.submitJob(PID, input({ idempotencyKey: 'blocked-by-budget' }));
const claim = await coord.claimNextJob(factory());
expect(claim).toBeNull();
});
it('manual pause blocks claims', async () => {
process.env.FLEET_BUDGETS = '1';
await coord.upsertBudget(PID, 100, 'monthly');
await coord.pauseBudget(PID);
await coord.submitJob(PID, input({ idempotencyKey: 'paused-claim' }));
const claim = await coord.claimNextJob(factory());
expect(claim).toBeNull();
});
it('resume restores claims', async () => {
process.env.FLEET_BUDGETS = '1';
await coord.upsertBudget(PID, 100, 'monthly');
await coord.pauseBudget(PID);
await coord.resumeBudget(PID);
await coord.submitJob(PID, input({ idempotencyKey: 'resumed-claim' }));
const claim = await coord.claimNextJob(factory());
expect(claim).not.toBeNull();
});
it('flag OFF: no enforcement even with exceeded budget', async () => {
delete process.env.FLEET_BUDGETS;
await coord.upsertBudget(PID, 10, 'monthly');
await coord.accrueSpend(PID, 20); // exceeded
await coord.submitJob(PID, input({ idempotencyKey: 'flag-off-ok' }));
const claim = await coord.claimNextJob(factory());
expect(claim).not.toBeNull();
});
it('spend accounting is monotonic', async () => {
await coord.upsertBudget(PID, 100, 'monthly');
await coord.accrueSpend(PID, 5);
await coord.accrueSpend(PID, 3);
const budget = await coord.getBudget(PID);
expect(budget?.spentUsd).toBe(8);
});
it('other products unaffected by one paused budget', async () => {
process.env.FLEET_BUDGETS = '1';
await coord.upsertBudget(PID, 10, 'monthly');
await coord.pauseBudget(PID);
// A different product can still claim
const OTHER = 'chronomind';
await coord.submitJob(OTHER, input({ idempotencyKey: 'other-ok' }));
const claim = await coord.claimNextJob(factory({ productId: OTHER }));
expect(claim).not.toBeNull();
});
});

View File

@ -39,6 +39,8 @@ import {
type FleetLeaseDoc,
type FleetRunDoc,
type FleetStage,
type FleetBudgetDoc,
type BudgetWindow,
type SubmitJobInput,
} from './types.js';
@ -61,6 +63,12 @@ export function isPreemptionEnabled(): boolean {
return v === '1' || v === 'true' || v === 'on';
}
/** FLEET_BUDGETS env gate — default OFF (no budget enforcement). */
export function isBudgetsEnabled(): boolean {
const v = (process.env.FLEET_BUDGETS ?? '').trim().toLowerCase();
return v === '1' || v === 'true' || v === 'on';
}
/** Weight registry — in Phase 3 loaded from env/config; in-memory for now. */
let weightRegistry: FleetWeightRegistry = {};
@ -455,6 +463,15 @@ export async function claimNextJob(ctx: ClaimContext): Promise<ClaimResult | nul
const seatLimit = ctx.seatLimit ?? 1;
for (let i = 0; i < CLAIM_MAX_RETRIES; i++) {
// Phase 3: budget enforcement (FLEET_BUDGETS flag)
if (isBudgetsEnabled()) {
const budget = await repo.getBudget(ctx.productId);
if (budget) {
if (budget.status === 'paused') return null;
if (budget.spentUsd >= budget.ceilingUsd) return null;
}
}
const candidates = await repo.listJobs({ productId: ctx.productId });
const satisfied = await depsSatisfiedIds(candidates);
@ -850,6 +867,66 @@ export function isFactoryStale(
return nowMs - new Date(factory.lastHeartbeatAt).getTime() > maxAgeMs;
}
// ── Phase 3: Per-product budgets (§11/§13) ────────────────────────────────────
/**
* Get or create a product's budget. Returns existing budget or null if none set.
*/
export async function getBudget(productId: string): Promise<FleetBudgetDoc | null> {
return repo.getBudget(productId);
}
/** Upsert a product's budget (ceiling + window). */
export async function upsertBudget(
productId: string,
ceilingUsd: number,
window: BudgetWindow = 'monthly'
): Promise<FleetBudgetDoc> {
const now = new Date().toISOString();
const existing = await repo.getBudget(productId);
const doc: FleetBudgetDoc = {
id: productId,
productId,
ceilingUsd,
window,
spentUsd: existing?.spentUsd ?? 0,
status: existing?.status ?? 'active',
windowStart: existing?.windowStart ?? now,
updatedAt: now,
};
return repo.upsertBudget(doc);
}
/** Pause a product's budget — claims for this product are blocked. */
export async function pauseBudget(productId: string): Promise<FleetBudgetDoc | null> {
return repo.updateBudget(productId, { status: 'paused' });
}
/** Resume a product's budget — claims for this product proceed again. */
export async function resumeBudget(productId: string): Promise<FleetBudgetDoc | null> {
return repo.updateBudget(productId, { status: 'active' });
}
/**
* Accrue spend for a product. Called after a run completes with cost data.
* Idempotent per run: uses the run's costUsd from insights.
*/
export async function accrueSpend(
productId: string,
costUsd: number
): Promise<FleetBudgetDoc | null> {
if (costUsd <= 0) return repo.getBudget(productId);
const budget = await repo.getBudget(productId);
if (!budget) return null;
const newSpent = budget.spentUsd + costUsd;
const updates: Partial<FleetBudgetDoc> = { spentUsd: newSpent };
// Auto-pause when ceiling exceeded
if (newSpent >= budget.ceilingUsd && budget.status === 'active') {
updates.status = 'paused';
}
return repo.updateBudget(productId, updates);
}
// ── Reaper (§25.3) ────────────────────────────────────────────────────────────
export interface ReapResult {

View File

@ -20,6 +20,7 @@ import type { DocumentCollection } from '@bytelyst/datastore';
import { getCollection } from '../../lib/datastore.js';
import type {
FleetArtifactDoc,
FleetBudgetDoc,
FleetEventDoc,
FleetFactoryDoc,
FleetFactoryTokenDoc,
@ -56,6 +57,9 @@ function artifacts(): DocumentCollection<FleetArtifactDoc> {
function factoryTokens(): DocumentCollection<FleetFactoryTokenDoc> {
return getCollection<FleetFactoryTokenDoc>('fleet_factory_tokens', '/productId');
}
function budgets(): DocumentCollection<FleetBudgetDoc> {
return getCollection<FleetBudgetDoc>('fleet_budgets', '/productId');
}
/** Result of a compare-and-swap update. */
export type RevResult<T> = { ok: true; doc: T } | { ok: false; reason: 'not_found' | 'conflict' };
@ -359,3 +363,25 @@ export async function updateFactoryToken(
if (!cur) return null;
return factoryTokens().update(id, productId, updates);
}
// ── Budgets (Phase 3 §11/§13) ──────────────────────────────────────────────
export async function getBudget(productId: string): Promise<FleetBudgetDoc | null> {
return budgets().findById(productId, productId);
}
export async function upsertBudget(doc: FleetBudgetDoc): Promise<FleetBudgetDoc> {
return budgets().upsert(doc);
}
export async function updateBudget(
productId: string,
updates: Partial<FleetBudgetDoc>
): Promise<FleetBudgetDoc | null> {
const cur = await budgets().findById(productId, productId);
if (!cur) return null;
return budgets().update(productId, productId, {
...updates,
updatedAt: new Date().toISOString(),
});
}

View File

@ -43,6 +43,7 @@ import {
IngestItemSchema,
EchoJobSchema,
SubmitChildrenSchema,
UpsertBudgetSchema,
} from './types.js';
function badRequest(issues: { message: string }[]): never {
@ -323,6 +324,45 @@ export async function fleetRoutes(app: FastifyInstance) {
return { revoked };
});
// ── Phase 3 Budgets: GET/PUT per-product budget ──
app.get('/fleet/budgets/:productId', async req => {
await extractAuth(req);
const { productId } = req.params as { productId: string };
const budget = await coordinator.getBudget(productId);
if (!budget) throw new NotFoundError('No budget configured for this product');
return budget;
});
app.put('/fleet/budgets/:productId', async (req, reply) => {
await extractAuth(req);
const { productId } = req.params as { productId: string };
const parsed = UpsertBudgetSchema.safeParse(req.body);
if (!parsed.success) badRequest(parsed.error.issues);
const budget = await coordinator.upsertBudget(
productId,
parsed.data.ceilingUsd,
parsed.data.window
);
reply.code(200);
return budget;
});
app.post('/fleet/budgets/:productId/pause', async req => {
await extractAuth(req);
const { productId } = req.params as { productId: string };
const budget = await coordinator.pauseBudget(productId);
if (!budget) throw new NotFoundError('No budget configured for this product');
return budget;
});
app.post('/fleet/budgets/:productId/resume', async req => {
await extractAuth(req);
const { productId } = req.params as { productId: string };
const budget = await coordinator.resumeBudget(productId);
if (!budget) throw new NotFoundError('No budget configured for this product');
return budget;
});
// ── Phase 3 DAG: submit children for an existing parent ──
app.post('/fleet/jobs/:id/children', async (req, reply) => {
await extractAuth(req);

View File

@ -431,6 +431,34 @@ export const EchoJobSchema = z.object({
});
export type EchoJobInput = z.infer<typeof EchoJobSchema>;
// ── Phase 3: Per-product budgets (§11/§13) ──────────────────────────────────
export const BUDGET_STATUSES = ['active', 'paused'] as const;
export type BudgetStatus = (typeof BUDGET_STATUSES)[number];
export const BUDGET_WINDOWS = ['daily', 'weekly', 'monthly'] as const;
export type BudgetWindow = (typeof BUDGET_WINDOWS)[number];
/** FleetBudgetDoc — per-product cost ceiling (pk `/productId`). */
export const FleetBudgetDocSchema = z.object({
id: z.string(),
productId: z.string().min(1),
ceilingUsd: z.number().nonnegative(),
window: z.enum(BUDGET_WINDOWS).default('monthly'),
spentUsd: z.number().nonnegative().default(0),
status: z.enum(BUDGET_STATUSES).default('active'),
windowStart: z.string().optional(),
updatedAt: z.string(),
});
export type FleetBudgetDoc = z.infer<typeof FleetBudgetDocSchema>;
/** Upsert a product's budget config. */
export const UpsertBudgetSchema = z.object({
ceilingUsd: z.number().nonnegative(),
window: z.enum(BUDGET_WINDOWS).default('monthly'),
});
export type UpsertBudgetInput = z.infer<typeof UpsertBudgetSchema>;
/** Phase 3 DAG: submit children for an existing parent job. */
export const SubmitChildrenSchema = z.object({
children: z.array(