fix(fleet): Phase 3 hardening — budget authz, idempotent accrual, cycle detection, artifact
Re-applies 4 defects merged with Phase 3 (still present on main), ported from the stale reference branch and adapted to current main. FIX 1 (BLOCKER): all 5 budget routes (GET/burndown/PUT/pause/resume) read productId from the URL with no caller check, so a caller for product A could read or modify product B budget. Add requireOwnProduct() -> 403 on mismatch. FIX 2: stop tracking services/platform-service/.data/platform-events.json (the EVENT_BUS_FILE runtime log); gitignore .data/. FIX 3: accrueSpend was definition-only (budgets never accrued) and not idempotent. Add accruedRunIds to FleetBudgetDoc; accrueSpend(productId, costUsd, runId) no-ops on a seen runId; wire it into patchJobFenced on stage shipped, flag-gated + best-effort + idempotent via jobId:leaseEpoch. Accrue the run ACTUAL insights.costUsd so spentUsd and costBurndown agree. FIX 4: submitChildren cycle detection is now batch-aware — rejects duplicate child keys and walks each child declared deps across BOTH the unpersisted batch and existing jobs, rejecting child-self, child-parent and sibling cycles. Tests: +2 budget-authz (403 on all 5 verbs), +3 accrual (idempotent runId, ship accrues insights.costUsd once, flag-off no accrual), +5 cycle detection. Gates green: tsc/build, fleet+items (204), full suite (only the unrelated single-fork migration-isolation file flakes, passes isolated). Flags stay default-OFF. Generated with [Devin](https://cli.devin.ai/docs) Co-Authored-By: Devin <158243242+devin-ai-integration[bot]@users.noreply.github.com>
This commit is contained in:
parent
c1aad8a819
commit
1503ef2e19
@ -1,67 +0,0 @@
|
|||||||
{
|
|
||||||
"platform-events": [
|
|
||||||
{
|
|
||||||
"id": "5be64f54-5c39-4f0f-af7c-ac7069121a11",
|
|
||||||
"queueName": "platform-events",
|
|
||||||
"type": "user.created",
|
|
||||||
"payload": {
|
|
||||||
"event": {
|
|
||||||
"id": "37c00297-57b4-4024-a946-82a2067f350b",
|
|
||||||
"type": "user.created",
|
|
||||||
"payload": {
|
|
||||||
"userId": "usr_8f2f412f-69f1-4b5f-949c-d40e175b7a93",
|
|
||||||
"email": "test@chronomind.app",
|
|
||||||
"plan": "free",
|
|
||||||
"productId": "chronomind"
|
|
||||||
},
|
|
||||||
"timestamp": "2026-05-30T06:45:48.670Z",
|
|
||||||
"source": "auth/register"
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"status": "succeeded",
|
|
||||||
"attempts": 1,
|
|
||||||
"maxAttempts": 3,
|
|
||||||
"createdAt": "2026-05-30T06:45:48.671Z",
|
|
||||||
"scheduledAt": "2026-05-30T06:45:48.671Z",
|
|
||||||
"metadata": {
|
|
||||||
"source": "auth/register"
|
|
||||||
},
|
|
||||||
"idempotencyKey": "37c00297-57b4-4024-a946-82a2067f350b",
|
|
||||||
"productId": "chronomind",
|
|
||||||
"startedAt": "2026-05-30T06:45:48.759Z",
|
|
||||||
"completedAt": "2026-05-30T06:45:51.756Z"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"id": "7438ea3f-fc58-4d04-b58b-628dec76f1ce",
|
|
||||||
"queueName": "platform-events",
|
|
||||||
"type": "user.email_verification_requested",
|
|
||||||
"payload": {
|
|
||||||
"event": {
|
|
||||||
"id": "84156892-12f7-448d-9579-7f759af83bbf",
|
|
||||||
"type": "user.email_verification_requested",
|
|
||||||
"payload": {
|
|
||||||
"userId": "usr_8f2f412f-69f1-4b5f-949c-d40e175b7a93",
|
|
||||||
"email": "test@chronomind.app",
|
|
||||||
"verificationToken": "4d96e404-8c86-443c-bb95-cf68b7cc194c",
|
|
||||||
"displayName": "Test User",
|
|
||||||
"productId": "chronomind"
|
|
||||||
},
|
|
||||||
"timestamp": "2026-05-30T06:45:48.822Z",
|
|
||||||
"source": "auth/register"
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"status": "succeeded",
|
|
||||||
"attempts": 1,
|
|
||||||
"maxAttempts": 3,
|
|
||||||
"createdAt": "2026-05-30T06:45:48.828Z",
|
|
||||||
"scheduledAt": "2026-05-30T06:45:48.828Z",
|
|
||||||
"metadata": {
|
|
||||||
"source": "auth/register"
|
|
||||||
},
|
|
||||||
"idempotencyKey": "84156892-12f7-448d-9579-7f759af83bbf",
|
|
||||||
"productId": "chronomind",
|
|
||||||
"startedAt": "2026-05-30T06:45:51.771Z",
|
|
||||||
"completedAt": "2026-05-30T06:45:54.163Z"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
}
|
|
||||||
3
services/platform-service/.gitignore
vendored
3
services/platform-service/.gitignore
vendored
@ -1,2 +1,5 @@
|
|||||||
node_modules/
|
node_modules/
|
||||||
dist/
|
dist/
|
||||||
|
|
||||||
|
# Local datastore (memory provider event log) — never commit
|
||||||
|
.data/
|
||||||
|
|||||||
@ -1189,3 +1189,109 @@ describe('fleet coordinator — Phase 3 per-product budgets', () => {
|
|||||||
if (!res.ok) expect(res.reason).toBe('invalid_state');
|
if (!res.ok) expect(res.reason).toBe('invalid_state');
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// ── Phase 3 fix-ups: idempotent accrual + ship-wiring + batch-aware cycle detection ──
|
||||||
|
describe('fleet coordinator — budget accrual idempotency + ship wiring', () => {
|
||||||
|
beforeEach(() => setProvider(new MemoryDatastoreProvider()));
|
||||||
|
afterEach(() => {
|
||||||
|
_resetDatastoreProvider();
|
||||||
|
delete process.env.FLEET_BUDGETS;
|
||||||
|
});
|
||||||
|
|
||||||
|
it('accrueSpend is idempotent per runId (same run never double-counts)', async () => {
|
||||||
|
await coord.upsertBudget(PID, 100, 'monthly');
|
||||||
|
await coord.accrueSpend(PID, 5, 'run-x');
|
||||||
|
await coord.accrueSpend(PID, 5, 'run-x'); // duplicate run — no-op
|
||||||
|
expect((await coord.getBudget(PID))?.spentUsd).toBe(5);
|
||||||
|
await coord.accrueSpend(PID, 5, 'run-y'); // distinct run — accrues
|
||||||
|
expect((await coord.getBudget(PID))?.spentUsd).toBe(10);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("shipping a job accrues the run's insights.costUsd exactly once (FLEET_BUDGETS on)", async () => {
|
||||||
|
process.env.FLEET_BUDGETS = '1';
|
||||||
|
await coord.upsertBudget(PID, 100, 'monthly');
|
||||||
|
await coord.submitJob(PID, input({ idempotencyKey: 'ship-1' }));
|
||||||
|
const claim = await coord.claimNextJob(factory());
|
||||||
|
const claimed = claim!.job;
|
||||||
|
// factory reports the run's actual cost (what costBurndown + accrual read)
|
||||||
|
const runs = await repo.listRunsByJob(claimed.id);
|
||||||
|
await repo.updateRun(runs[0]!.id, claimed.id, { insights: { costUsd: 7 } });
|
||||||
|
// ship twice with the same lease epoch — must accrue only once
|
||||||
|
await coord.patchJobFenced(claimed.id, PID, {
|
||||||
|
leaseEpoch: claimed.leaseEpoch,
|
||||||
|
stage: 'shipped',
|
||||||
|
});
|
||||||
|
await coord.patchJobFenced(claimed.id, PID, {
|
||||||
|
leaseEpoch: claimed.leaseEpoch,
|
||||||
|
stage: 'shipped',
|
||||||
|
});
|
||||||
|
expect((await coord.getBudget(PID))?.spentUsd).toBe(7);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('shipping does NOT accrue when FLEET_BUDGETS is off', async () => {
|
||||||
|
await coord.upsertBudget(PID, 100, 'monthly');
|
||||||
|
await coord.submitJob(PID, input({ idempotencyKey: 'ship-off' }));
|
||||||
|
const claim = await coord.claimNextJob(factory());
|
||||||
|
const claimed = claim!.job;
|
||||||
|
const runs = await repo.listRunsByJob(claimed.id);
|
||||||
|
await repo.updateRun(runs[0]!.id, claimed.id, { insights: { costUsd: 7 } });
|
||||||
|
await coord.patchJobFenced(claimed.id, PID, {
|
||||||
|
leaseEpoch: claimed.leaseEpoch,
|
||||||
|
stage: 'shipped',
|
||||||
|
});
|
||||||
|
expect((await coord.getBudget(PID))?.spentUsd).toBe(0);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('fleet coordinator — DAG submitChildren cycle detection', () => {
|
||||||
|
beforeEach(() => setProvider(new MemoryDatastoreProvider()));
|
||||||
|
afterEach(() => _resetDatastoreProvider());
|
||||||
|
|
||||||
|
it('rejects a child that depends on itself', async () => {
|
||||||
|
const { job: parent } = await coord.submitJob(PID, input({ idempotencyKey: 'p-self' }));
|
||||||
|
await expect(
|
||||||
|
coord.submitChildren(parent.id, PID, [
|
||||||
|
{ idempotencyKey: 'c-self', bodyMd: '# c', deps: ['c-self'] },
|
||||||
|
])
|
||||||
|
).rejects.toThrow(BadRequestError);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('rejects a child that depends on the parent (child-parent cycle)', async () => {
|
||||||
|
const { job: parent } = await coord.submitJob(PID, input({ idempotencyKey: 'p-back' }));
|
||||||
|
await expect(
|
||||||
|
coord.submitChildren(parent.id, PID, [
|
||||||
|
{ idempotencyKey: 'c-back', bodyMd: '# c', deps: ['p-back'] },
|
||||||
|
])
|
||||||
|
).rejects.toThrow(BadRequestError);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('rejects mutually-dependent siblings (child-sibling cycle)', async () => {
|
||||||
|
const { job: parent } = await coord.submitJob(PID, input({ idempotencyKey: 'p-sib' }));
|
||||||
|
await expect(
|
||||||
|
coord.submitChildren(parent.id, PID, [
|
||||||
|
{ idempotencyKey: 'c-a', bodyMd: '# a', deps: ['c-b'] },
|
||||||
|
{ idempotencyKey: 'c-b', bodyMd: '# b', deps: ['c-a'] },
|
||||||
|
])
|
||||||
|
).rejects.toThrow(BadRequestError);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('rejects duplicate child idempotency-keys in one batch', async () => {
|
||||||
|
const { job: parent } = await coord.submitJob(PID, input({ idempotencyKey: 'p-dup' }));
|
||||||
|
await expect(
|
||||||
|
coord.submitChildren(parent.id, PID, [
|
||||||
|
{ idempotencyKey: 'dup', bodyMd: '# a' },
|
||||||
|
{ idempotencyKey: 'dup', bodyMd: '# b' },
|
||||||
|
])
|
||||||
|
).rejects.toThrow(BadRequestError);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('accepts a valid acyclic fan-out (sibling dependency, no cycle)', async () => {
|
||||||
|
const { job: parent } = await coord.submitJob(PID, input({ idempotencyKey: 'p-ok' }));
|
||||||
|
const result = await coord.submitChildren(parent.id, PID, [
|
||||||
|
{ idempotencyKey: 'step-1', bodyMd: '# 1' },
|
||||||
|
{ idempotencyKey: 'step-2', bodyMd: '# 2', deps: ['step-1'] },
|
||||||
|
]);
|
||||||
|
expect(result.childJobs).toHaveLength(2);
|
||||||
|
expect(result.parent.stage).toBe('blocked');
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|||||||
@ -638,6 +638,25 @@ export async function patchJobFenced(
|
|||||||
await maybeUnblockParent(job.parentId, productId);
|
await maybeUnblockParent(job.parentId, productId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Phase 3 budgets: accrue spend when a job ships (terminal success).
|
||||||
|
// Flag-gated + best-effort — an accrual failure NEVER fails the transition.
|
||||||
|
// Idempotent per run via `<jobId>:<leaseEpoch>` so duplicate ship transitions do
|
||||||
|
// not double-count. Accrue the run's ACTUAL cost (insights.costUsd) so spentUsd
|
||||||
|
// and the costBurndown (which also reads insights.costUsd) agree.
|
||||||
|
if (patch.stage === 'shipped' && isBudgetsEnabled()) {
|
||||||
|
try {
|
||||||
|
const runId = `${jobId}:${job.leaseEpoch}`;
|
||||||
|
const runs = await repo.listRunsByJob(jobId);
|
||||||
|
const latest = runs.reduce<FleetRunDoc | undefined>(
|
||||||
|
(acc, r) => (!acc || r.attempt > acc.attempt ? r : acc),
|
||||||
|
undefined
|
||||||
|
);
|
||||||
|
await accrueSpend(productId, latest?.insights?.costUsd ?? 0, runId);
|
||||||
|
} catch {
|
||||||
|
// swallow — budget accounting is downstream of the job lifecycle
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return { ok: true, doc: res.doc };
|
return { ok: true, doc: res.doc };
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -688,11 +707,38 @@ export async function submitChildren(
|
|||||||
const parent = await repo.getJob(parentId, productId);
|
const parent = await repo.getJob(parentId, productId);
|
||||||
if (!parent) throw new BadRequestError('Parent job not found');
|
if (!parent) throw new BadRequestError('Parent job not found');
|
||||||
|
|
||||||
// Cycle check: children depend on parent implicitly; ensure no back-edge
|
// Cycle check (batch-aware): after submit the parent depends on EVERY child key,
|
||||||
|
// so a cycle exists if any child — through its own declared deps, resolved across
|
||||||
|
// both this unpersisted batch and existing jobs — can reach the parent or itself.
|
||||||
|
// `wouldCreateCycle` alone only walks persisted jobs, so it misses sibling/declared
|
||||||
|
// edges within the batch; this resolver covers both.
|
||||||
const childKeys = children.map(c => c.idempotencyKey);
|
const childKeys = children.map(c => c.idempotencyKey);
|
||||||
|
const seenKeys = new Set<string>();
|
||||||
for (const ck of childKeys) {
|
for (const ck of childKeys) {
|
||||||
if (await wouldCreateCycle(productId, ck, [parent.idempotencyKey])) {
|
if (seenKeys.has(ck)) throw new BadRequestError(`duplicate child idempotency-key '${ck}'`);
|
||||||
throw new BadRequestError(`dependency cycle detected for child '${ck}'`);
|
seenKeys.add(ck);
|
||||||
|
}
|
||||||
|
const batchDeps = new Map<string, string[]>();
|
||||||
|
for (const c of children) batchDeps.set(c.idempotencyKey, c.deps ?? []);
|
||||||
|
const depsOf = async (key: string): Promise<string[]> => {
|
||||||
|
if (batchDeps.has(key)) return batchDeps.get(key) ?? [];
|
||||||
|
const matches = await repo.findJobsByIdempotencyKey(productId, key);
|
||||||
|
return matches.flatMap(m => m.deps);
|
||||||
|
};
|
||||||
|
for (const ck of childKeys) {
|
||||||
|
const visited = new Set<string>();
|
||||||
|
let frontier = await depsOf(ck);
|
||||||
|
while (frontier.length > 0) {
|
||||||
|
const nextFrontier: string[] = [];
|
||||||
|
for (const k of frontier) {
|
||||||
|
if (k === parent.idempotencyKey || k === ck) {
|
||||||
|
throw new BadRequestError(`dependency cycle detected for child '${ck}'`);
|
||||||
|
}
|
||||||
|
if (visited.has(k)) continue;
|
||||||
|
visited.add(k);
|
||||||
|
nextFrontier.push(...(await depsOf(k)));
|
||||||
|
}
|
||||||
|
frontier = nextFrontier;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1255,6 +1301,7 @@ export async function upsertBudget(
|
|||||||
spentUsd: existing?.spentUsd ?? 0,
|
spentUsd: existing?.spentUsd ?? 0,
|
||||||
status: existing?.status ?? 'active',
|
status: existing?.status ?? 'active',
|
||||||
windowStart: existing?.windowStart ?? now,
|
windowStart: existing?.windowStart ?? now,
|
||||||
|
accruedRunIds: existing?.accruedRunIds ?? [],
|
||||||
updatedAt: now,
|
updatedAt: now,
|
||||||
};
|
};
|
||||||
return repo.upsertBudget(doc);
|
return repo.upsertBudget(doc);
|
||||||
@ -1271,22 +1318,34 @@ export async function resumeBudget(productId: string): Promise<FleetBudgetDoc |
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Accrue spend for a product. Called after a run completes with cost data.
|
* Accrue spend for a product after a run completes.
|
||||||
* Idempotent per run: uses the run's costUsd from insights.
|
*
|
||||||
|
* Idempotent per run: `runId` identifies the completed run (e.g. `<jobId>:<leaseEpoch>`).
|
||||||
|
* If that runId was already accrued, this is a no-op — re-delivering the same completion
|
||||||
|
* (retries, duplicate transitions) never double-counts. Spend is monotonic. Auto-pauses
|
||||||
|
* the product when the ceiling is reached.
|
||||||
*/
|
*/
|
||||||
export async function accrueSpend(
|
export async function accrueSpend(
|
||||||
productId: string,
|
productId: string,
|
||||||
costUsd: number
|
costUsd: number,
|
||||||
|
runId?: string
|
||||||
): Promise<FleetBudgetDoc | null> {
|
): Promise<FleetBudgetDoc | null> {
|
||||||
if (costUsd <= 0) return repo.getBudget(productId);
|
|
||||||
const budget = await repo.getBudget(productId);
|
const budget = await repo.getBudget(productId);
|
||||||
if (!budget) return null;
|
if (!budget) return null;
|
||||||
const newSpent = budget.spentUsd + costUsd;
|
// Idempotency guard: skip if this run was already accrued (retries / duplicate
|
||||||
const updates: Partial<FleetBudgetDoc> = { spentUsd: newSpent };
|
// ship transitions never double-count; spend stays monotonic).
|
||||||
// Auto-pause when ceiling exceeded
|
if (runId && (budget.accruedRunIds ?? []).includes(runId)) return budget;
|
||||||
if (newSpent >= budget.ceilingUsd && budget.status === 'active') {
|
const updates: Partial<FleetBudgetDoc> = {};
|
||||||
updates.status = 'paused';
|
if (runId) updates.accruedRunIds = [...(budget.accruedRunIds ?? []), runId];
|
||||||
|
if (costUsd > 0) {
|
||||||
|
const newSpent = budget.spentUsd + costUsd;
|
||||||
|
updates.spentUsd = newSpent;
|
||||||
|
// Auto-pause when ceiling exceeded
|
||||||
|
if (newSpent >= budget.ceilingUsd && budget.status === 'active') {
|
||||||
|
updates.status = 'paused';
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
if (Object.keys(updates).length === 0) return budget;
|
||||||
return repo.updateBudget(productId, updates);
|
return repo.updateBudget(productId, updates);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -417,4 +417,42 @@ describe('fleetRoutes', () => {
|
|||||||
expect(body.days).toHaveLength(7);
|
expect(body.days).toHaveLength(7);
|
||||||
expect(body.totalUsd).toBe(0);
|
expect(body.totalUsd).toBe(0);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// ── Phase 3 budgets: caller-product scoping (getRequestProductId mocked to 'lysnrai') ──
|
||||||
|
it("budget routes work for the caller's own product", async () => {
|
||||||
|
const app = await buildApp();
|
||||||
|
const put = await app.inject({
|
||||||
|
method: 'PUT',
|
||||||
|
url: '/api/fleet/budgets/lysnrai',
|
||||||
|
payload: { ceilingUsd: 100, window: 'monthly' },
|
||||||
|
});
|
||||||
|
expect(put.statusCode).toBe(200);
|
||||||
|
const get = await app.inject({ method: 'GET', url: '/api/fleet/budgets/lysnrai' });
|
||||||
|
expect(get.statusCode).toBe(200);
|
||||||
|
expect(JSON.parse(get.body).productId).toBe('lysnrai');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('budget routes reject a foreign productId with 403 (all 5 verbs, no cross-product access)', async () => {
|
||||||
|
const app = await buildApp();
|
||||||
|
const get = await app.inject({ method: 'GET', url: '/api/fleet/budgets/chronomind' });
|
||||||
|
expect(get.statusCode).toBe(403);
|
||||||
|
const burndown = await app.inject({
|
||||||
|
method: 'GET',
|
||||||
|
url: '/api/fleet/budgets/chronomind/burndown',
|
||||||
|
});
|
||||||
|
expect(burndown.statusCode).toBe(403);
|
||||||
|
const put = await app.inject({
|
||||||
|
method: 'PUT',
|
||||||
|
url: '/api/fleet/budgets/chronomind',
|
||||||
|
payload: { ceilingUsd: 999, window: 'monthly' },
|
||||||
|
});
|
||||||
|
expect(put.statusCode).toBe(403);
|
||||||
|
const pause = await app.inject({ method: 'POST', url: '/api/fleet/budgets/chronomind/pause' });
|
||||||
|
expect(pause.statusCode).toBe(403);
|
||||||
|
const resume = await app.inject({
|
||||||
|
method: 'POST',
|
||||||
|
url: '/api/fleet/budgets/chronomind/resume',
|
||||||
|
});
|
||||||
|
expect(resume.statusCode).toBe(403);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@ -25,7 +25,7 @@
|
|||||||
|
|
||||||
import type { FastifyInstance } from 'fastify';
|
import type { FastifyInstance } from 'fastify';
|
||||||
import { getRequestProductId } from '../../lib/request-context.js';
|
import { getRequestProductId } from '../../lib/request-context.js';
|
||||||
import { BadRequestError, ConflictError, NotFoundError } from '../../lib/errors.js';
|
import { BadRequestError, ConflictError, ForbiddenError, NotFoundError } from '../../lib/errors.js';
|
||||||
import { extractAuth } from '../../lib/auth.js';
|
import { extractAuth } from '../../lib/auth.js';
|
||||||
import * as repo from './repository.js';
|
import * as repo from './repository.js';
|
||||||
import * as coordinator from './coordinator.js';
|
import * as coordinator from './coordinator.js';
|
||||||
@ -55,6 +55,20 @@ function badRequest(issues: { message: string }[]): never {
|
|||||||
throw new BadRequestError(issues.map(i => i.message).join('; '));
|
throw new BadRequestError(issues.map(i => i.message).join('; '));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Resolve the caller's productId and enforce that any `:productId` path param
|
||||||
|
* matches it. Prevents a caller authenticated for product A from reading or
|
||||||
|
* mutating another product's fleet budget. Returns the caller's productId.
|
||||||
|
*/
|
||||||
|
function requireOwnProduct(req: import('fastify').FastifyRequest): string {
|
||||||
|
const pid = getRequestProductId(req);
|
||||||
|
const { productId } = req.params as { productId?: string };
|
||||||
|
if (productId && productId !== pid) {
|
||||||
|
throw new ForbiddenError("Cannot access another product's fleet budget");
|
||||||
|
}
|
||||||
|
return pid;
|
||||||
|
}
|
||||||
|
|
||||||
const delay = (ms: number): Promise<void> => new Promise(resolve => setTimeout(resolve, ms));
|
const delay = (ms: number): Promise<void> => new Promise(resolve => setTimeout(resolve, ms));
|
||||||
|
|
||||||
/** Parse an integer query param, clamping to [min, max]; fall back to `fallback`. */
|
/** Parse an integer query param, clamping to [min, max]; fall back to `fallback`. */
|
||||||
@ -496,7 +510,7 @@ export async function fleetRoutes(app: FastifyInstance) {
|
|||||||
// ── Phase 3 Budgets: GET/PUT per-product budget ──
|
// ── Phase 3 Budgets: GET/PUT per-product budget ──
|
||||||
app.get('/fleet/budgets/:productId', async req => {
|
app.get('/fleet/budgets/:productId', async req => {
|
||||||
await extractAuth(req);
|
await extractAuth(req);
|
||||||
const { productId } = req.params as { productId: string };
|
const productId = requireOwnProduct(req);
|
||||||
const budget = await coordinator.getBudget(productId);
|
const budget = await coordinator.getBudget(productId);
|
||||||
if (!budget) throw new NotFoundError('No budget configured for this product');
|
if (!budget) throw new NotFoundError('No budget configured for this product');
|
||||||
return budget;
|
return budget;
|
||||||
@ -505,7 +519,7 @@ export async function fleetRoutes(app: FastifyInstance) {
|
|||||||
// ── Cost burndown — spend-over-time vs ceiling (§14) ──
|
// ── Cost burndown — spend-over-time vs ceiling (§14) ──
|
||||||
app.get('/fleet/budgets/:productId/burndown', async req => {
|
app.get('/fleet/budgets/:productId/burndown', async req => {
|
||||||
await extractAuth(req);
|
await extractAuth(req);
|
||||||
const { productId } = req.params as { productId: string };
|
const productId = requireOwnProduct(req);
|
||||||
const { days } = req.query as { days?: string };
|
const { days } = req.query as { days?: string };
|
||||||
const parsedDays = days ? Number.parseInt(days, 10) : undefined;
|
const parsedDays = days ? Number.parseInt(days, 10) : undefined;
|
||||||
return coordinator.costBurndown(
|
return coordinator.costBurndown(
|
||||||
@ -516,7 +530,7 @@ export async function fleetRoutes(app: FastifyInstance) {
|
|||||||
|
|
||||||
app.put('/fleet/budgets/:productId', async (req, reply) => {
|
app.put('/fleet/budgets/:productId', async (req, reply) => {
|
||||||
await extractAuth(req);
|
await extractAuth(req);
|
||||||
const { productId } = req.params as { productId: string };
|
const productId = requireOwnProduct(req);
|
||||||
const parsed = UpsertBudgetSchema.safeParse(req.body);
|
const parsed = UpsertBudgetSchema.safeParse(req.body);
|
||||||
if (!parsed.success) badRequest(parsed.error.issues);
|
if (!parsed.success) badRequest(parsed.error.issues);
|
||||||
const budget = await coordinator.upsertBudget(
|
const budget = await coordinator.upsertBudget(
|
||||||
@ -530,7 +544,7 @@ export async function fleetRoutes(app: FastifyInstance) {
|
|||||||
|
|
||||||
app.post('/fleet/budgets/:productId/pause', async req => {
|
app.post('/fleet/budgets/:productId/pause', async req => {
|
||||||
await extractAuth(req);
|
await extractAuth(req);
|
||||||
const { productId } = req.params as { productId: string };
|
const productId = requireOwnProduct(req);
|
||||||
const budget = await coordinator.pauseBudget(productId);
|
const budget = await coordinator.pauseBudget(productId);
|
||||||
if (!budget) throw new NotFoundError('No budget configured for this product');
|
if (!budget) throw new NotFoundError('No budget configured for this product');
|
||||||
return budget;
|
return budget;
|
||||||
@ -538,7 +552,7 @@ export async function fleetRoutes(app: FastifyInstance) {
|
|||||||
|
|
||||||
app.post('/fleet/budgets/:productId/resume', async req => {
|
app.post('/fleet/budgets/:productId/resume', async req => {
|
||||||
await extractAuth(req);
|
await extractAuth(req);
|
||||||
const { productId } = req.params as { productId: string };
|
const productId = requireOwnProduct(req);
|
||||||
const budget = await coordinator.resumeBudget(productId);
|
const budget = await coordinator.resumeBudget(productId);
|
||||||
if (!budget) throw new NotFoundError('No budget configured for this product');
|
if (!budget) throw new NotFoundError('No budget configured for this product');
|
||||||
return budget;
|
return budget;
|
||||||
|
|||||||
@ -493,6 +493,8 @@ export const FleetBudgetDocSchema = z.object({
|
|||||||
spentUsd: z.number().nonnegative().default(0),
|
spentUsd: z.number().nonnegative().default(0),
|
||||||
status: z.enum(BUDGET_STATUSES).default('active'),
|
status: z.enum(BUDGET_STATUSES).default('active'),
|
||||||
windowStart: z.string().optional(),
|
windowStart: z.string().optional(),
|
||||||
|
/** Run identifiers already accrued — makes accrueSpend idempotent per run. */
|
||||||
|
accruedRunIds: z.array(z.string()).default([]),
|
||||||
updatedAt: z.string(),
|
updatedAt: z.string(),
|
||||||
});
|
});
|
||||||
export type FleetBudgetDoc = z.infer<typeof FleetBudgetDocSchema>;
|
export type FleetBudgetDoc = z.infer<typeof FleetBudgetDocSchema>;
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user