Merge fix/fleet-phase3-hardening: budget authz, idempotent accrual, cycle detection, artifact cleanup

This commit is contained in:
saravanakumardb1 2026-05-31 02:46:18 -07:00
commit a528ffea1d
7 changed files with 240 additions and 85 deletions

View File

@ -1,67 +0,0 @@
{
"platform-events": [
{
"id": "5be64f54-5c39-4f0f-af7c-ac7069121a11",
"queueName": "platform-events",
"type": "user.created",
"payload": {
"event": {
"id": "37c00297-57b4-4024-a946-82a2067f350b",
"type": "user.created",
"payload": {
"userId": "usr_8f2f412f-69f1-4b5f-949c-d40e175b7a93",
"email": "test@chronomind.app",
"plan": "free",
"productId": "chronomind"
},
"timestamp": "2026-05-30T06:45:48.670Z",
"source": "auth/register"
}
},
"status": "succeeded",
"attempts": 1,
"maxAttempts": 3,
"createdAt": "2026-05-30T06:45:48.671Z",
"scheduledAt": "2026-05-30T06:45:48.671Z",
"metadata": {
"source": "auth/register"
},
"idempotencyKey": "37c00297-57b4-4024-a946-82a2067f350b",
"productId": "chronomind",
"startedAt": "2026-05-30T06:45:48.759Z",
"completedAt": "2026-05-30T06:45:51.756Z"
},
{
"id": "7438ea3f-fc58-4d04-b58b-628dec76f1ce",
"queueName": "platform-events",
"type": "user.email_verification_requested",
"payload": {
"event": {
"id": "84156892-12f7-448d-9579-7f759af83bbf",
"type": "user.email_verification_requested",
"payload": {
"userId": "usr_8f2f412f-69f1-4b5f-949c-d40e175b7a93",
"email": "test@chronomind.app",
"verificationToken": "4d96e404-8c86-443c-bb95-cf68b7cc194c",
"displayName": "Test User",
"productId": "chronomind"
},
"timestamp": "2026-05-30T06:45:48.822Z",
"source": "auth/register"
}
},
"status": "succeeded",
"attempts": 1,
"maxAttempts": 3,
"createdAt": "2026-05-30T06:45:48.828Z",
"scheduledAt": "2026-05-30T06:45:48.828Z",
"metadata": {
"source": "auth/register"
},
"idempotencyKey": "84156892-12f7-448d-9579-7f759af83bbf",
"productId": "chronomind",
"startedAt": "2026-05-30T06:45:51.771Z",
"completedAt": "2026-05-30T06:45:54.163Z"
}
]
}

View File

@ -1,2 +1,5 @@
node_modules/
dist/
# Local datastore (memory provider event log) — never commit
.data/

View File

@ -1189,3 +1189,109 @@ describe('fleet coordinator — Phase 3 per-product budgets', () => {
if (!res.ok) expect(res.reason).toBe('invalid_state');
});
});
// ── Phase 3 fix-ups: idempotent accrual + ship-wiring + batch-aware cycle detection ──
// Suite: budget spend accrual. Covers (1) accrueSpend being a no-op when the same
// runId is delivered twice, (2) a fenced patch to stage 'shipped' accruing the run's
// insights.costUsd once when FLEET_BUDGETS is set, and (3) no accrual when it is unset.
describe('fleet coordinator — budget accrual idempotency + ship wiring', () => {
  beforeEach(() => setProvider(new MemoryDatastoreProvider()));
  // Start every test with the flag unset so the "flag off" case does not depend on
  // the ambient environment or on a previous test's cleanup having run.
  beforeEach(() => {
    delete process.env.FLEET_BUDGETS;
  });
  afterEach(() => {
    _resetDatastoreProvider();
    delete process.env.FLEET_BUDGETS;
  });

  it('accrueSpend is idempotent per runId (same run never double-counts)', async () => {
    await coord.upsertBudget(PID, 100, 'monthly');
    await coord.accrueSpend(PID, 5, 'run-x');
    await coord.accrueSpend(PID, 5, 'run-x'); // duplicate run — no-op
    expect((await coord.getBudget(PID))?.spentUsd).toBe(5);
    await coord.accrueSpend(PID, 5, 'run-y'); // distinct run — accrues
    expect((await coord.getBudget(PID))?.spentUsd).toBe(10);
  });

  it("shipping a job accrues the run's insights.costUsd exactly once (FLEET_BUDGETS on)", async () => {
    process.env.FLEET_BUDGETS = '1';
    await coord.upsertBudget(PID, 100, 'monthly');
    await coord.submitJob(PID, input({ idempotencyKey: 'ship-1' }));
    const claim = await coord.claimNextJob(factory());
    const claimed = claim!.job;
    // factory reports the run's actual cost (what costBurndown + accrual read)
    const runs = await repo.listRunsByJob(claimed.id);
    await repo.updateRun(runs[0]!.id, claimed.id, { insights: { costUsd: 7 } });
    // ship twice with the same lease epoch — must accrue only once
    await coord.patchJobFenced(claimed.id, PID, {
      leaseEpoch: claimed.leaseEpoch,
      stage: 'shipped',
    });
    await coord.patchJobFenced(claimed.id, PID, {
      leaseEpoch: claimed.leaseEpoch,
      stage: 'shipped',
    });
    expect((await coord.getBudget(PID))?.spentUsd).toBe(7);
  });

  it('shipping does NOT accrue when FLEET_BUDGETS is off', async () => {
    // The flag is intentionally left unset here (cleared by the beforeEach above).
    await coord.upsertBudget(PID, 100, 'monthly');
    await coord.submitJob(PID, input({ idempotencyKey: 'ship-off' }));
    const claim = await coord.claimNextJob(factory());
    const claimed = claim!.job;
    const runs = await repo.listRunsByJob(claimed.id);
    await repo.updateRun(runs[0]!.id, claimed.id, { insights: { costUsd: 7 } });
    await coord.patchJobFenced(claimed.id, PID, {
      leaseEpoch: claimed.leaseEpoch,
      stage: 'shipped',
    });
    expect((await coord.getBudget(PID))?.spentUsd).toBe(0);
  });
});
// Suite: submitChildren input validation. Each rejection case submits a fresh parent
// job and expects the child batch to be refused with BadRequestError; the final case
// confirms an acyclic batch with a sibling dependency is accepted.
describe('fleet coordinator — DAG submitChildren cycle detection', () => {
  beforeEach(() => setProvider(new MemoryDatastoreProvider()));
  afterEach(() => _resetDatastoreProvider());

  type ChildBatch = Parameters<typeof coord.submitChildren>[2];

  /** Submit a parent job under the given idempotency key and return the job. */
  const submitParent = async (idempotencyKey: string) => {
    const submitted = await coord.submitJob(PID, input({ idempotencyKey }));
    return submitted.job;
  };

  /** Submit a parent, then assert that attaching `batch` rejects with BadRequestError. */
  const expectBatchRejected = async (parentKey: string, batch: ChildBatch) => {
    const parentJob = await submitParent(parentKey);
    await expect(coord.submitChildren(parentJob.id, PID, batch)).rejects.toThrow(BadRequestError);
  };

  it('rejects a child that depends on itself', async () => {
    await expectBatchRejected('p-self', [
      { idempotencyKey: 'c-self', bodyMd: '# c', deps: ['c-self'] },
    ]);
  });

  it('rejects a child that depends on the parent (child-parent cycle)', async () => {
    await expectBatchRejected('p-back', [
      { idempotencyKey: 'c-back', bodyMd: '# c', deps: ['p-back'] },
    ]);
  });

  it('rejects mutually-dependent siblings (child-sibling cycle)', async () => {
    await expectBatchRejected('p-sib', [
      { idempotencyKey: 'c-a', bodyMd: '# a', deps: ['c-b'] },
      { idempotencyKey: 'c-b', bodyMd: '# b', deps: ['c-a'] },
    ]);
  });

  it('rejects duplicate child idempotency-keys in one batch', async () => {
    await expectBatchRejected('p-dup', [
      { idempotencyKey: 'dup', bodyMd: '# a' },
      { idempotencyKey: 'dup', bodyMd: '# b' },
    ]);
  });

  it('accepts a valid acyclic fan-out (sibling dependency, no cycle)', async () => {
    const parentJob = await submitParent('p-ok');
    const outcome = await coord.submitChildren(parentJob.id, PID, [
      { idempotencyKey: 'step-1', bodyMd: '# 1' },
      { idempotencyKey: 'step-2', bodyMd: '# 2', deps: ['step-1'] },
    ]);
    expect(outcome.childJobs).toHaveLength(2);
    expect(outcome.parent.stage).toBe('blocked');
  });
});

View File

@ -638,6 +638,25 @@ export async function patchJobFenced(
await maybeUnblockParent(job.parentId, productId);
}
// Phase 3 budgets: accrue spend when a job ships (terminal success).
// Flag-gated + best-effort — an accrual failure NEVER fails the transition.
// Idempotent per run via `<jobId>:<leaseEpoch>` so duplicate ship transitions do
// not double-count. Accrue the run's ACTUAL cost (insights.costUsd) so spentUsd
// and the costBurndown (which also reads insights.costUsd) agree.
if (patch.stage === 'shipped' && isBudgetsEnabled()) {
try {
const runId = `${jobId}:${job.leaseEpoch}`;
const runs = await repo.listRunsByJob(jobId);
const latest = runs.reduce<FleetRunDoc | undefined>(
(acc, r) => (!acc || r.attempt > acc.attempt ? r : acc),
undefined
);
await accrueSpend(productId, latest?.insights?.costUsd ?? 0, runId);
} catch {
// swallow — budget accounting is downstream of the job lifecycle
}
}
return { ok: true, doc: res.doc };
}
@ -688,11 +707,38 @@ export async function submitChildren(
const parent = await repo.getJob(parentId, productId);
if (!parent) throw new BadRequestError('Parent job not found');
// Cycle check: children depend on parent implicitly; ensure no back-edge
// Cycle check (batch-aware): after submit the parent depends on EVERY child key,
// so a cycle exists if any child — through its own declared deps, resolved across
// both this unpersisted batch and existing jobs — can reach the parent or itself.
// `wouldCreateCycle` alone only walks persisted jobs, so it misses sibling/declared
// edges within the batch; this resolver covers both.
const childKeys = children.map(c => c.idempotencyKey);
const seenKeys = new Set<string>();
for (const ck of childKeys) {
if (await wouldCreateCycle(productId, ck, [parent.idempotencyKey])) {
throw new BadRequestError(`dependency cycle detected for child '${ck}'`);
if (seenKeys.has(ck)) throw new BadRequestError(`duplicate child idempotency-key '${ck}'`);
seenKeys.add(ck);
}
const batchDeps = new Map<string, string[]>();
for (const c of children) batchDeps.set(c.idempotencyKey, c.deps ?? []);
const depsOf = async (key: string): Promise<string[]> => {
if (batchDeps.has(key)) return batchDeps.get(key) ?? [];
const matches = await repo.findJobsByIdempotencyKey(productId, key);
return matches.flatMap(m => m.deps);
};
for (const ck of childKeys) {
const visited = new Set<string>();
let frontier = await depsOf(ck);
while (frontier.length > 0) {
const nextFrontier: string[] = [];
for (const k of frontier) {
if (k === parent.idempotencyKey || k === ck) {
throw new BadRequestError(`dependency cycle detected for child '${ck}'`);
}
if (visited.has(k)) continue;
visited.add(k);
nextFrontier.push(...(await depsOf(k)));
}
frontier = nextFrontier;
}
}
@ -1255,6 +1301,7 @@ export async function upsertBudget(
spentUsd: existing?.spentUsd ?? 0,
status: existing?.status ?? 'active',
windowStart: existing?.windowStart ?? now,
accruedRunIds: existing?.accruedRunIds ?? [],
updatedAt: now,
};
return repo.upsertBudget(doc);
@ -1271,22 +1318,34 @@ export async function resumeBudget(productId: string): Promise<FleetBudgetDoc |
}
/**
* Accrue spend for a product. Called after a run completes with cost data.
* Idempotent per run: uses the run's costUsd from insights.
* Accrue spend for a product after a run completes.
*
* Idempotent per run: `runId` identifies the completed run (e.g. `<jobId>:<leaseEpoch>`).
* If that runId was already accrued, this is a no-op — re-delivering the same completion
* (retries, duplicate transitions) never double-counts. Spend is monotonic. Auto-pauses
* the product when the ceiling is reached.
*/
export async function accrueSpend(
productId: string,
costUsd: number
costUsd: number,
runId?: string
): Promise<FleetBudgetDoc | null> {
if (costUsd <= 0) return repo.getBudget(productId);
const budget = await repo.getBudget(productId);
if (!budget) return null;
const newSpent = budget.spentUsd + costUsd;
const updates: Partial<FleetBudgetDoc> = { spentUsd: newSpent };
// Auto-pause when ceiling exceeded
if (newSpent >= budget.ceilingUsd && budget.status === 'active') {
updates.status = 'paused';
// Idempotency guard: skip if this run was already accrued (retries / duplicate
// ship transitions never double-count; spend stays monotonic).
if (runId && (budget.accruedRunIds ?? []).includes(runId)) return budget;
const updates: Partial<FleetBudgetDoc> = {};
if (runId) updates.accruedRunIds = [...(budget.accruedRunIds ?? []), runId];
if (costUsd > 0) {
const newSpent = budget.spentUsd + costUsd;
updates.spentUsd = newSpent;
// Auto-pause when the ceiling is reached or exceeded
if (newSpent >= budget.ceilingUsd && budget.status === 'active') {
updates.status = 'paused';
}
}
if (Object.keys(updates).length === 0) return budget;
return repo.updateBudget(productId, updates);
}

View File

@ -417,4 +417,42 @@ describe('fleetRoutes', () => {
expect(body.days).toHaveLength(7);
expect(body.totalUsd).toBe(0);
});
// ── Phase 3 budgets: caller-product scoping (getRequestProductId mocked to 'lysnrai') ──
it("budget routes work for the caller's own product", async () => {
  const app = await buildApp();
  // The caller's product is 'lysnrai' in this suite, so both requests target its own budget.
  const ownBudgetUrl = '/api/fleet/budgets/lysnrai';

  // Create (or update) the budget first so the read below has something to return.
  const putRes = await app.inject({
    method: 'PUT',
    url: ownBudgetUrl,
    payload: { ceilingUsd: 100, window: 'monthly' },
  });
  expect(putRes.statusCode).toBe(200);

  const getRes = await app.inject({ method: 'GET', url: ownBudgetUrl });
  expect(getRes.statusCode).toBe(200);
  const { productId } = JSON.parse(getRes.body);
  expect(productId).toBe('lysnrai');
});
it('budget routes reject a foreign productId with 403 (all 5 verbs, no cross-product access)', async () => {
  const app = await buildApp();
  // 'chronomind' is not the caller's product, so every budget route below must answer 403.
  const foreignBudgetUrl = '/api/fleet/budgets/chronomind';

  // Read the budget.
  expect((await app.inject({ method: 'GET', url: foreignBudgetUrl })).statusCode).toBe(403);

  // Read the cost burndown.
  expect(
    (await app.inject({ method: 'GET', url: `${foreignBudgetUrl}/burndown` })).statusCode
  ).toBe(403);

  // Create or update the budget.
  expect(
    (
      await app.inject({
        method: 'PUT',
        url: foreignBudgetUrl,
        payload: { ceilingUsd: 999, window: 'monthly' },
      })
    ).statusCode
  ).toBe(403);

  // Pause the budget.
  expect(
    (await app.inject({ method: 'POST', url: `${foreignBudgetUrl}/pause` })).statusCode
  ).toBe(403);

  // Resume the budget.
  expect(
    (await app.inject({ method: 'POST', url: `${foreignBudgetUrl}/resume` })).statusCode
  ).toBe(403);
});
});

View File

@ -25,7 +25,7 @@
import type { FastifyInstance } from 'fastify';
import { getRequestProductId } from '../../lib/request-context.js';
import { BadRequestError, ConflictError, NotFoundError } from '../../lib/errors.js';
import { BadRequestError, ConflictError, ForbiddenError, NotFoundError } from '../../lib/errors.js';
import { extractAuth } from '../../lib/auth.js';
import * as repo from './repository.js';
import * as coordinator from './coordinator.js';
@ -55,6 +55,20 @@ function badRequest(issues: { message: string }[]): never {
throw new BadRequestError(issues.map(i => i.message).join('; '));
}
/**
 * Authorization guard for routes that carry a `:productId` path param.
 *
 * Looks up the product the request is scoped to and compares it with the
 * `productId` route param. A non-empty param naming a different product is
 * rejected; an absent or empty param is not treated as a mismatch.
 *
 * @param req - The incoming Fastify request.
 * @returns The caller's own productId (never the raw path param).
 * @throws ForbiddenError when the path param names a different product.
 */
function requireOwnProduct(req: import('fastify').FastifyRequest): string {
  const callerProductId = getRequestProductId(req);
  const requestedProductId = (req.params as { productId?: string }).productId;
  const targetsForeignProduct =
    Boolean(requestedProductId) && requestedProductId !== callerProductId;
  if (targetsForeignProduct) {
    throw new ForbiddenError("Cannot access another product's fleet budget");
  }
  return callerProductId;
}
/** Resolve (with no value) after `ms` milliseconds, using `setTimeout`. */
const delay = (ms: number): Promise<void> =>
  new Promise<void>(done => {
    setTimeout(done, ms);
  });
/** Parse an integer query param, clamping to [min, max]; fall back to `fallback`. */
@ -496,7 +510,7 @@ export async function fleetRoutes(app: FastifyInstance) {
// ── Phase 3 Budgets: GET/PUT per-product budget ──
app.get('/fleet/budgets/:productId', async req => {
await extractAuth(req);
const { productId } = req.params as { productId: string };
const productId = requireOwnProduct(req);
const budget = await coordinator.getBudget(productId);
if (!budget) throw new NotFoundError('No budget configured for this product');
return budget;
@ -505,7 +519,7 @@ export async function fleetRoutes(app: FastifyInstance) {
// ── Cost burndown — spend-over-time vs ceiling (§14) ──
app.get('/fleet/budgets/:productId/burndown', async req => {
await extractAuth(req);
const { productId } = req.params as { productId: string };
const productId = requireOwnProduct(req);
const { days } = req.query as { days?: string };
const parsedDays = days ? Number.parseInt(days, 10) : undefined;
return coordinator.costBurndown(
@ -516,7 +530,7 @@ export async function fleetRoutes(app: FastifyInstance) {
app.put('/fleet/budgets/:productId', async (req, reply) => {
await extractAuth(req);
const { productId } = req.params as { productId: string };
const productId = requireOwnProduct(req);
const parsed = UpsertBudgetSchema.safeParse(req.body);
if (!parsed.success) badRequest(parsed.error.issues);
const budget = await coordinator.upsertBudget(
@ -530,7 +544,7 @@ export async function fleetRoutes(app: FastifyInstance) {
app.post('/fleet/budgets/:productId/pause', async req => {
await extractAuth(req);
const { productId } = req.params as { productId: string };
const productId = requireOwnProduct(req);
const budget = await coordinator.pauseBudget(productId);
if (!budget) throw new NotFoundError('No budget configured for this product');
return budget;
@ -538,7 +552,7 @@ export async function fleetRoutes(app: FastifyInstance) {
app.post('/fleet/budgets/:productId/resume', async req => {
await extractAuth(req);
const { productId } = req.params as { productId: string };
const productId = requireOwnProduct(req);
const budget = await coordinator.resumeBudget(productId);
if (!budget) throw new NotFoundError('No budget configured for this product');
return budget;

View File

@ -493,6 +493,8 @@ export const FleetBudgetDocSchema = z.object({
spentUsd: z.number().nonnegative().default(0),
status: z.enum(BUDGET_STATUSES).default('active'),
windowStart: z.string().optional(),
/** Run identifiers already accrued — makes accrueSpend idempotent per run. */
accruedRunIds: z.array(z.string()).default([]),
updatedAt: z.string(),
});
export type FleetBudgetDoc = z.infer<typeof FleetBudgetDocSchema>;