From f4ea7b4a5b9cb1af54b383c27d6c378afa8089f4 Mon Sep 17 00:00:00 2001 From: Saravanakumar D Date: Sat, 30 May 2026 02:30:12 -0700 Subject: [PATCH] feat(fleet): per-product budgets with pause/resume (Phase 3 Slice 3) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - FleetBudgetDoc: ceilingUsd, window, spentUsd, status (active/paused) - FLEET_BUDGETS flag (default OFF = no enforcement, unchanged behavior) - Enforcement in claimNextJob: paused or ceiling-exceeded → null - accrueSpend(): monotonic spend accumulation, auto-pause at ceiling - Budget routes: GET/PUT /fleet/budgets/:productId, pause, resume - UpsertBudgetSchema for route validation - 7 new coordinator tests (ceiling, auto-pause, manual pause/resume, flag OFF bypass, monotonic accounting, cross-product isolation) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../src/modules/fleet/coordinator.test.ts | 79 +++++++++++++++++++ .../src/modules/fleet/coordinator.ts | 77 ++++++++++++++++++ .../src/modules/fleet/repository.ts | 26 ++++++ .../src/modules/fleet/routes.ts | 40 ++++++++++ .../src/modules/fleet/types.ts | 28 +++++++ 5 files changed, 250 insertions(+) diff --git a/services/platform-service/src/modules/fleet/coordinator.test.ts b/services/platform-service/src/modules/fleet/coordinator.test.ts index 04041c62..35dc4bfb 100644 --- a/services/platform-service/src/modules/fleet/coordinator.test.ts +++ b/services/platform-service/src/modules/fleet/coordinator.test.ts @@ -665,3 +665,82 @@ describe('fleet coordinator — Phase 3 DAG decomposition', () => { expect(claim!.job.id).toBe(job.id); }); }); + +describe('fleet coordinator — Phase 3 per-product budgets', () => { + beforeEach(() => setProvider(new MemoryDatastoreProvider())); + afterEach(() => { + _resetDatastoreProvider(); + delete process.env.FLEET_BUDGETS; + }); + + it('under ceiling: claims proceed normally', async () => { + process.env.FLEET_BUDGETS = '1'; + await coord.upsertBudget(PID, 100, 'monthly'); + await coord.submitJob(PID, input({ idempotencyKey: 'budget-ok' })); + const claim = await coord.claimNextJob(factory()); + expect(claim).not.toBeNull(); + expect(claim!.job.idempotencyKey).toBe('budget-ok'); + }); + + it('crossing ceiling: auto-pauses and blocks claims', async () => { + process.env.FLEET_BUDGETS = '1'; + await coord.upsertBudget(PID, 10, 'monthly'); + // Accrue spend up to ceiling + await coord.accrueSpend(PID, 10); + const budget = await coord.getBudget(PID); + expect(budget?.status).toBe('paused'); + expect(budget?.spentUsd).toBe(10); + + // Claims should now be blocked + await coord.submitJob(PID, input({ idempotencyKey: 'blocked-by-budget' })); + const claim = await coord.claimNextJob(factory()); + expect(claim).toBeNull(); + }); + + it('manual pause blocks claims', async () => { + process.env.FLEET_BUDGETS = '1'; + await coord.upsertBudget(PID, 100, 'monthly'); + await coord.pauseBudget(PID); + await coord.submitJob(PID, input({ idempotencyKey: 'paused-claim' })); + const claim = await coord.claimNextJob(factory()); + expect(claim).toBeNull(); + }); + + it('resume restores claims', async () => { + process.env.FLEET_BUDGETS = '1'; + await coord.upsertBudget(PID, 100, 'monthly'); + await coord.pauseBudget(PID); + await coord.resumeBudget(PID); + await coord.submitJob(PID, input({ idempotencyKey: 'resumed-claim' })); + const claim = await coord.claimNextJob(factory()); + expect(claim).not.toBeNull(); + }); + + it('flag OFF: no enforcement even with exceeded budget', async () => { + delete process.env.FLEET_BUDGETS; + await coord.upsertBudget(PID, 10, 'monthly'); + await coord.accrueSpend(PID, 20); // exceeded + await coord.submitJob(PID, input({ idempotencyKey: 'flag-off-ok' })); + const claim = await coord.claimNextJob(factory()); + expect(claim).not.toBeNull(); + }); + + it('spend accounting is monotonic', async () => { + await coord.upsertBudget(PID, 100, 'monthly'); + await coord.accrueSpend(PID, 5); + await coord.accrueSpend(PID, 3); + const budget = await coord.getBudget(PID); + expect(budget?.spentUsd).toBe(8); + }); + + it('other products unaffected by one paused budget', async () => { + process.env.FLEET_BUDGETS = '1'; + await coord.upsertBudget(PID, 10, 'monthly'); + await coord.pauseBudget(PID); + // A different product can still claim + const OTHER = 'chronomind'; + await coord.submitJob(OTHER, input({ idempotencyKey: 'other-ok' })); + const claim = await coord.claimNextJob(factory({ productId: OTHER })); + expect(claim).not.toBeNull(); + }); +}); diff --git a/services/platform-service/src/modules/fleet/coordinator.ts b/services/platform-service/src/modules/fleet/coordinator.ts index 1b8c30c6..604ad2c7 100644 --- a/services/platform-service/src/modules/fleet/coordinator.ts +++ b/services/platform-service/src/modules/fleet/coordinator.ts @@ -39,6 +39,8 @@ import { type FleetLeaseDoc, type FleetRunDoc, type FleetStage, + type FleetBudgetDoc, + type BudgetWindow, type SubmitJobInput, } from './types.js'; @@ -61,6 +63,12 @@ export function isPreemptionEnabled(): boolean { return v === '1' || v === 'true' || v === 'on'; } +/** FLEET_BUDGETS env gate — default OFF (no budget enforcement). */ +export function isBudgetsEnabled(): boolean { + const v = (process.env.FLEET_BUDGETS ?? '').trim().toLowerCase(); + return v === '1' || v === 'true' || v === 'on'; +} + /** Weight registry — in Phase 3 loaded from env/config; in-memory for now. */ let weightRegistry: FleetWeightRegistry = {}; @@ -455,6 +463,15 @@ export async function claimNextJob(ctx: ClaimContext): Promise= budget.ceilingUsd) return null; + } + } + const candidates = await repo.listJobs({ productId: ctx.productId }); const satisfied = await depsSatisfiedIds(candidates); @@ -850,6 +867,66 @@ export function isFactoryStale( return nowMs - new Date(factory.lastHeartbeatAt).getTime() > maxAgeMs; } +// ── Phase 3: Per-product budgets (§11/§13) ──────────────────────────────────── + +/** + * Get or create a product's budget. Returns existing budget or null if none set. + */ +export async function getBudget(productId: string): Promise { + return repo.getBudget(productId); +} + +/** Upsert a product's budget (ceiling + window). */ +export async function upsertBudget( + productId: string, + ceilingUsd: number, + window: BudgetWindow = 'monthly' +): Promise { + const now = new Date().toISOString(); + const existing = await repo.getBudget(productId); + const doc: FleetBudgetDoc = { + id: productId, + productId, + ceilingUsd, + window, + spentUsd: existing?.spentUsd ?? 0, + status: existing?.status ?? 'active', + windowStart: existing?.windowStart ?? now, + updatedAt: now, + }; + return repo.upsertBudget(doc); +} + +/** Pause a product's budget — claims for this product are blocked. */ +export async function pauseBudget(productId: string): Promise { + return repo.updateBudget(productId, { status: 'paused' }); +} + +/** Resume a product's budget — claims for this product proceed again. */ +export async function resumeBudget(productId: string): Promise { + return repo.updateBudget(productId, { status: 'active' }); +} + +/** + * Accrue spend for a product. Called after a run completes with cost data. + * Idempotent per run: uses the run's costUsd from insights. + */ +export async function accrueSpend( + productId: string, + costUsd: number +): Promise { + if (costUsd <= 0) return repo.getBudget(productId); + const budget = await repo.getBudget(productId); + if (!budget) return null; + const newSpent = budget.spentUsd + costUsd; + const updates: Partial = { spentUsd: newSpent }; + // Auto-pause when ceiling exceeded + if (newSpent >= budget.ceilingUsd && budget.status === 'active') { + updates.status = 'paused'; + } + return repo.updateBudget(productId, updates); +} + // ── Reaper (§25.3) ──────────────────────────────────────────────────────────── export interface ReapResult { diff --git a/services/platform-service/src/modules/fleet/repository.ts b/services/platform-service/src/modules/fleet/repository.ts index 23b6378e..2a952830 100644 --- a/services/platform-service/src/modules/fleet/repository.ts +++ b/services/platform-service/src/modules/fleet/repository.ts @@ -20,6 +20,7 @@ import type { DocumentCollection } from '@bytelyst/datastore'; import { getCollection } from '../../lib/datastore.js'; import type { FleetArtifactDoc, + FleetBudgetDoc, FleetEventDoc, FleetFactoryDoc, FleetFactoryTokenDoc, @@ -56,6 +57,9 @@ function artifacts(): DocumentCollection { function factoryTokens(): DocumentCollection { return getCollection('fleet_factory_tokens', '/productId'); } +function budgets(): DocumentCollection { + return getCollection('fleet_budgets', '/productId'); +} /** Result of a compare-and-swap update. */ export type RevResult = { ok: true; doc: T } | { ok: false; reason: 'not_found' | 'conflict' }; @@ -359,3 +363,25 @@ export async function updateFactoryToken( if (!cur) return null; return factoryTokens().update(id, productId, updates); } + +// ── Budgets (Phase 3 §11/§13) ────────────────────────────────────────────── + +export async function getBudget(productId: string): Promise { + return budgets().findById(productId, productId); +} + +export async function upsertBudget(doc: FleetBudgetDoc): Promise { + return budgets().upsert(doc); +} + +export async function updateBudget( + productId: string, + updates: Partial +): Promise { + const cur = await budgets().findById(productId, productId); + if (!cur) return null; + return budgets().update(productId, productId, { + ...updates, + updatedAt: new Date().toISOString(), + }); +} diff --git a/services/platform-service/src/modules/fleet/routes.ts b/services/platform-service/src/modules/fleet/routes.ts index 6e04aa1d..ce8affd7 100644 --- a/services/platform-service/src/modules/fleet/routes.ts +++ b/services/platform-service/src/modules/fleet/routes.ts @@ -43,6 +43,7 @@ import { IngestItemSchema, EchoJobSchema, SubmitChildrenSchema, + UpsertBudgetSchema, } from './types.js'; function badRequest(issues: { message: string }[]): never { @@ -323,6 +324,45 @@ export async function fleetRoutes(app: FastifyInstance) { return { revoked }; }); + // ── Phase 3 Budgets: GET/PUT per-product budget ── + app.get('/fleet/budgets/:productId', async req => { + await extractAuth(req); + const { productId } = req.params as { productId: string }; + const budget = await coordinator.getBudget(productId); + if (!budget) throw new NotFoundError('No budget configured for this product'); + return budget; + }); + + app.put('/fleet/budgets/:productId', async (req, reply) => { + await extractAuth(req); + const { productId } = req.params as { productId: string }; + const parsed = UpsertBudgetSchema.safeParse(req.body); + if (!parsed.success) badRequest(parsed.error.issues); + const budget = await coordinator.upsertBudget( + productId, + parsed.data.ceilingUsd, + parsed.data.window + ); + reply.code(200); + return budget; + }); + + app.post('/fleet/budgets/:productId/pause', async req => { + await extractAuth(req); + const { productId } = req.params as { productId: string }; + const budget = await coordinator.pauseBudget(productId); + if (!budget) throw new NotFoundError('No budget configured for this product'); + return budget; + }); + + app.post('/fleet/budgets/:productId/resume', async req => { + await extractAuth(req); + const { productId } = req.params as { productId: string }; + const budget = await coordinator.resumeBudget(productId); + if (!budget) throw new NotFoundError('No budget configured for this product'); + return budget; + }); + // ── Phase 3 DAG: submit children for an existing parent ── app.post('/fleet/jobs/:id/children', async (req, reply) => { await extractAuth(req); diff --git a/services/platform-service/src/modules/fleet/types.ts b/services/platform-service/src/modules/fleet/types.ts index 0114f82d..bd1d3edd 100644 --- a/services/platform-service/src/modules/fleet/types.ts +++ b/services/platform-service/src/modules/fleet/types.ts @@ -431,6 +431,34 @@ export const EchoJobSchema = z.object({ }); export type EchoJobInput = z.infer; +// ── Phase 3: Per-product budgets (§11/§13) ────────────────────────────────── + +export const BUDGET_STATUSES = ['active', 'paused'] as const; +export type BudgetStatus = (typeof BUDGET_STATUSES)[number]; + +export const BUDGET_WINDOWS = ['daily', 'weekly', 'monthly'] as const; +export type BudgetWindow = (typeof BUDGET_WINDOWS)[number]; + +/** FleetBudgetDoc — per-product cost ceiling (pk `/productId`). */ +export const FleetBudgetDocSchema = z.object({ + id: z.string(), + productId: z.string().min(1), + ceilingUsd: z.number().nonnegative(), + window: z.enum(BUDGET_WINDOWS).default('monthly'), + spentUsd: z.number().nonnegative().default(0), + status: z.enum(BUDGET_STATUSES).default('active'), + windowStart: z.string().optional(), + updatedAt: z.string(), +}); +export type FleetBudgetDoc = z.infer; + +/** Upsert a product's budget config. */ +export const UpsertBudgetSchema = z.object({ + ceilingUsd: z.number().nonnegative(), + window: z.enum(BUDGET_WINDOWS).default('monthly'), +}); +export type UpsertBudgetInput = z.infer; + /** Phase 3 DAG: submit children for an existing parent job. */ export const SubmitChildrenSchema = z.object({ children: z.array(