From 1503ef2e196ba85d7795f35d6d8c5656ae6a3449 Mon Sep 17 00:00:00 2001 From: saravanakumardb1 Date: Sun, 31 May 2026 02:45:52 -0700 Subject: [PATCH] =?UTF-8?q?fix(fleet):=20Phase=203=20hardening=20=E2=80=94?= =?UTF-8?q?=20budget=20authz,=20idempotent=20accrual,=20cycle=20detection,?= =?UTF-8?q?=20artifact?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Re-applies 4 defects merged with Phase 3 (still present on main), ported from the stale reference branch and adapted to current main. FIX 1 (BLOCKER): all 5 budget routes (GET/burndown/PUT/pause/resume) read productId from the URL with no caller check, so a caller for product A could read or modify product B budget. Add requireOwnProduct() -> 403 on mismatch. FIX 2: stop tracking services/platform-service/.data/platform-events.json (the EVENT_BUS_FILE runtime log); gitignore .data/. FIX 3: accrueSpend was definition-only (budgets never accrued) and not idempotent. Add accruedRunIds to FleetBudgetDoc; accrueSpend(productId, costUsd, runId) no-ops on a seen runId; wire it into patchJobFenced on stage shipped, flag-gated + best-effort + idempotent via jobId:leaseEpoch. Accrue the run ACTUAL insights.costUsd so spentUsd and costBurndown agree. FIX 4: submitChildren cycle detection is now batch-aware — rejects duplicate child keys and walks each child declared deps across BOTH the unpersisted batch and existing jobs, rejecting child-self, child-parent and sibling cycles. Tests: +2 budget-authz (403 on all 5 verbs), +3 accrual (idempotent runId, ship accrues insights.costUsd once, flag-off no accrual), +5 cycle detection. Gates green: tsc/build, fleet+items (204), full suite (only the unrelated single-fork migration-isolation file flakes, passes isolated). Flags stay default-OFF. Generated with [Devin](https://cli.devin.ai/docs) Co-Authored-By: Devin <158243242+devin-ai-integration[bot]@users.noreply.github.com> --- .../.data/platform-events.json | 67 ----------- services/platform-service/.gitignore | 3 + .../src/modules/fleet/coordinator.test.ts | 106 ++++++++++++++++++ .../src/modules/fleet/coordinator.ts | 83 ++++++++++++-- .../src/modules/fleet/routes.test.ts | 38 +++++++ .../src/modules/fleet/routes.ts | 26 ++++- .../src/modules/fleet/types.ts | 2 + 7 files changed, 240 insertions(+), 85 deletions(-) delete mode 100644 services/platform-service/.data/platform-events.json diff --git a/services/platform-service/.data/platform-events.json b/services/platform-service/.data/platform-events.json deleted file mode 100644 index 637c238d..00000000 --- a/services/platform-service/.data/platform-events.json +++ /dev/null @@ -1,67 +0,0 @@ -{ - "platform-events": [ - { - "id": "5be64f54-5c39-4f0f-af7c-ac7069121a11", - "queueName": "platform-events", - "type": "user.created", - "payload": { - "event": { - "id": "37c00297-57b4-4024-a946-82a2067f350b", - "type": "user.created", - "payload": { - "userId": "usr_8f2f412f-69f1-4b5f-949c-d40e175b7a93", - "email": "test@chronomind.app", - "plan": "free", - "productId": "chronomind" - }, - "timestamp": "2026-05-30T06:45:48.670Z", - "source": "auth/register" - } - }, - "status": "succeeded", - "attempts": 1, - "maxAttempts": 3, - "createdAt": "2026-05-30T06:45:48.671Z", - "scheduledAt": "2026-05-30T06:45:48.671Z", - "metadata": { - "source": "auth/register" - }, - "idempotencyKey": "37c00297-57b4-4024-a946-82a2067f350b", - "productId": "chronomind", - "startedAt": "2026-05-30T06:45:48.759Z", - "completedAt": "2026-05-30T06:45:51.756Z" - }, - { - "id": "7438ea3f-fc58-4d04-b58b-628dec76f1ce", - "queueName": "platform-events", - "type": "user.email_verification_requested", - "payload": { - "event": { - "id": "84156892-12f7-448d-9579-7f759af83bbf", - "type": "user.email_verification_requested", - "payload": { - "userId": "usr_8f2f412f-69f1-4b5f-949c-d40e175b7a93", - "email": "test@chronomind.app", - "verificationToken": "4d96e404-8c86-443c-bb95-cf68b7cc194c", - "displayName": "Test User", - "productId": "chronomind" - }, - "timestamp": "2026-05-30T06:45:48.822Z", - "source": "auth/register" - } - }, - "status": "succeeded", - "attempts": 1, - "maxAttempts": 3, - "createdAt": "2026-05-30T06:45:48.828Z", - "scheduledAt": "2026-05-30T06:45:48.828Z", - "metadata": { - "source": "auth/register" - }, - "idempotencyKey": "84156892-12f7-448d-9579-7f759af83bbf", - "productId": "chronomind", - "startedAt": "2026-05-30T06:45:51.771Z", - "completedAt": "2026-05-30T06:45:54.163Z" - } - ] -} diff --git a/services/platform-service/.gitignore b/services/platform-service/.gitignore index b9470778..2f881063 100644 --- a/services/platform-service/.gitignore +++ b/services/platform-service/.gitignore @@ -1,2 +1,5 @@ node_modules/ dist/ + +# Local datastore (memory provider event log) — never commit +.data/ diff --git a/services/platform-service/src/modules/fleet/coordinator.test.ts b/services/platform-service/src/modules/fleet/coordinator.test.ts index 4976e739..41a7e7d6 100644 --- a/services/platform-service/src/modules/fleet/coordinator.test.ts +++ b/services/platform-service/src/modules/fleet/coordinator.test.ts @@ -1189,3 +1189,109 @@ describe('fleet coordinator — Phase 3 per-product budgets', () => { if (!res.ok) expect(res.reason).toBe('invalid_state'); }); }); + +// ── Phase 3 fix-ups: idempotent accrual + ship-wiring + batch-aware cycle detection ── +describe('fleet coordinator — budget accrual idempotency + ship wiring', () => { + beforeEach(() => setProvider(new MemoryDatastoreProvider())); + afterEach(() => { + _resetDatastoreProvider(); + delete process.env.FLEET_BUDGETS; + }); + + it('accrueSpend is idempotent per runId (same run never double-counts)', async () => { + await coord.upsertBudget(PID, 100, 'monthly'); + await coord.accrueSpend(PID, 5, 'run-x'); + await coord.accrueSpend(PID, 5, 'run-x'); // duplicate run — no-op + expect((await coord.getBudget(PID))?.spentUsd).toBe(5); + await coord.accrueSpend(PID, 5, 'run-y'); // distinct run — accrues + expect((await coord.getBudget(PID))?.spentUsd).toBe(10); + }); + + it("shipping a job accrues the run's insights.costUsd exactly once (FLEET_BUDGETS on)", async () => { + process.env.FLEET_BUDGETS = '1'; + await coord.upsertBudget(PID, 100, 'monthly'); + await coord.submitJob(PID, input({ idempotencyKey: 'ship-1' })); + const claim = await coord.claimNextJob(factory()); + const claimed = claim!.job; + // factory reports the run's actual cost (what costBurndown + accrual read) + const runs = await repo.listRunsByJob(claimed.id); + await repo.updateRun(runs[0]!.id, claimed.id, { insights: { costUsd: 7 } }); + // ship twice with the same lease epoch — must accrue only once + await coord.patchJobFenced(claimed.id, PID, { + leaseEpoch: claimed.leaseEpoch, + stage: 'shipped', + }); + await coord.patchJobFenced(claimed.id, PID, { + leaseEpoch: claimed.leaseEpoch, + stage: 'shipped', + }); + expect((await coord.getBudget(PID))?.spentUsd).toBe(7); + }); + + it('shipping does NOT accrue when FLEET_BUDGETS is off', async () => { + await coord.upsertBudget(PID, 100, 'monthly'); + await coord.submitJob(PID, input({ idempotencyKey: 'ship-off' })); + const claim = await coord.claimNextJob(factory()); + const claimed = claim!.job; + const runs = await repo.listRunsByJob(claimed.id); + await repo.updateRun(runs[0]!.id, claimed.id, { insights: { costUsd: 7 } }); + await coord.patchJobFenced(claimed.id, PID, { + leaseEpoch: claimed.leaseEpoch, + stage: 'shipped', + }); + expect((await coord.getBudget(PID))?.spentUsd).toBe(0); + }); +}); + +describe('fleet coordinator — DAG submitChildren cycle detection', () => { + beforeEach(() => setProvider(new MemoryDatastoreProvider())); + afterEach(() => _resetDatastoreProvider()); + + it('rejects a child that depends on itself', async () => { + const { job: parent } = await coord.submitJob(PID, input({ idempotencyKey: 'p-self' })); + await expect( + coord.submitChildren(parent.id, PID, [ + { idempotencyKey: 'c-self', bodyMd: '# c', deps: ['c-self'] }, + ]) + ).rejects.toThrow(BadRequestError); + }); + + it('rejects a child that depends on the parent (child-parent cycle)', async () => { + const { job: parent } = await coord.submitJob(PID, input({ idempotencyKey: 'p-back' })); + await expect( + coord.submitChildren(parent.id, PID, [ + { idempotencyKey: 'c-back', bodyMd: '# c', deps: ['p-back'] }, + ]) + ).rejects.toThrow(BadRequestError); + }); + + it('rejects mutually-dependent siblings (child-sibling cycle)', async () => { + const { job: parent } = await coord.submitJob(PID, input({ idempotencyKey: 'p-sib' })); + await expect( + coord.submitChildren(parent.id, PID, [ + { idempotencyKey: 'c-a', bodyMd: '# a', deps: ['c-b'] }, + { idempotencyKey: 'c-b', bodyMd: '# b', deps: ['c-a'] }, + ]) + ).rejects.toThrow(BadRequestError); + }); + + it('rejects duplicate child idempotency-keys in one batch', async () => { + const { job: parent } = await coord.submitJob(PID, input({ idempotencyKey: 'p-dup' })); + await expect( + coord.submitChildren(parent.id, PID, [ + { idempotencyKey: 'dup', bodyMd: '# a' }, + { idempotencyKey: 'dup', bodyMd: '# b' }, + ]) + ).rejects.toThrow(BadRequestError); + }); + + it('accepts a valid acyclic fan-out (sibling dependency, no cycle)', async () => { + const { job: parent } = await coord.submitJob(PID, input({ idempotencyKey: 'p-ok' })); + const result = await coord.submitChildren(parent.id, PID, [ + { idempotencyKey: 'step-1', bodyMd: '# 1' }, + { idempotencyKey: 'step-2', bodyMd: '# 2', deps: ['step-1'] }, + ]); + expect(result.childJobs).toHaveLength(2); + expect(result.parent.stage).toBe('blocked'); + }); +}); diff --git a/services/platform-service/src/modules/fleet/coordinator.ts b/services/platform-service/src/modules/fleet/coordinator.ts index b72d610c..8adfe0cc 100644 --- a/services/platform-service/src/modules/fleet/coordinator.ts +++ b/services/platform-service/src/modules/fleet/coordinator.ts @@ -638,6 +638,25 @@ export async function patchJobFenced( await maybeUnblockParent(job.parentId, productId); } + // Phase 3 budgets: accrue spend when a job ships (terminal success). + // Flag-gated + best-effort — an accrual failure NEVER fails the transition. + // Idempotent per run via `:` so duplicate ship transitions do + // not double-count. Accrue the run's ACTUAL cost (insights.costUsd) so spentUsd + // and the costBurndown (which also reads insights.costUsd) agree. + if (patch.stage === 'shipped' && isBudgetsEnabled()) { + try { + const runId = `${jobId}:${job.leaseEpoch}`; + const runs = await repo.listRunsByJob(jobId); + const latest = runs.reduce( + (acc, r) => (!acc || r.attempt > acc.attempt ? r : acc), + undefined + ); + await accrueSpend(productId, latest?.insights?.costUsd ?? 0, runId); + } catch { + // swallow — budget accounting is downstream of the job lifecycle + } + } + return { ok: true, doc: res.doc }; } @@ -688,11 +707,38 @@ export async function submitChildren( const parent = await repo.getJob(parentId, productId); if (!parent) throw new BadRequestError('Parent job not found'); - // Cycle check: children depend on parent implicitly; ensure no back-edge + // Cycle check (batch-aware): after submit the parent depends on EVERY child key, + // so a cycle exists if any child — through its own declared deps, resolved across + // both this unpersisted batch and existing jobs — can reach the parent or itself. + // `wouldCreateCycle` alone only walks persisted jobs, so it misses sibling/declared + // edges within the batch; this resolver covers both. const childKeys = children.map(c => c.idempotencyKey); + const seenKeys = new Set(); for (const ck of childKeys) { - if (await wouldCreateCycle(productId, ck, [parent.idempotencyKey])) { - throw new BadRequestError(`dependency cycle detected for child '${ck}'`); + if (seenKeys.has(ck)) throw new BadRequestError(`duplicate child idempotency-key '${ck}'`); + seenKeys.add(ck); + } + const batchDeps = new Map(); + for (const c of children) batchDeps.set(c.idempotencyKey, c.deps ?? []); + const depsOf = async (key: string): Promise => { + if (batchDeps.has(key)) return batchDeps.get(key) ?? []; + const matches = await repo.findJobsByIdempotencyKey(productId, key); + return matches.flatMap(m => m.deps); + }; + for (const ck of childKeys) { + const visited = new Set(); + let frontier = await depsOf(ck); + while (frontier.length > 0) { + const nextFrontier: string[] = []; + for (const k of frontier) { + if (k === parent.idempotencyKey || k === ck) { + throw new BadRequestError(`dependency cycle detected for child '${ck}'`); + } + if (visited.has(k)) continue; + visited.add(k); + nextFrontier.push(...(await depsOf(k))); + } + frontier = nextFrontier; } } @@ -1255,6 +1301,7 @@ export async function upsertBudget( spentUsd: existing?.spentUsd ?? 0, status: existing?.status ?? 'active', windowStart: existing?.windowStart ?? now, + accruedRunIds: existing?.accruedRunIds ?? [], updatedAt: now, }; return repo.upsertBudget(doc); @@ -1271,22 +1318,34 @@ export async function resumeBudget(productId: string): Promise:`). + * If that runId was already accrued, this is a no-op — re-delivering the same completion + * (retries, duplicate transitions) never double-counts. Spend is monotonic. Auto-pauses + * the product when the ceiling is reached. */ export async function accrueSpend( productId: string, - costUsd: number + costUsd: number, + runId?: string ): Promise { - if (costUsd <= 0) return repo.getBudget(productId); const budget = await repo.getBudget(productId); if (!budget) return null; - const newSpent = budget.spentUsd + costUsd; - const updates: Partial = { spentUsd: newSpent }; - // Auto-pause when ceiling exceeded - if (newSpent >= budget.ceilingUsd && budget.status === 'active') { - updates.status = 'paused'; + // Idempotency guard: skip if this run was already accrued (retries / duplicate + // ship transitions never double-count; spend stays monotonic). + if (runId && (budget.accruedRunIds ?? []).includes(runId)) return budget; + const updates: Partial = {}; + if (runId) updates.accruedRunIds = [...(budget.accruedRunIds ?? []), runId]; + if (costUsd > 0) { + const newSpent = budget.spentUsd + costUsd; + updates.spentUsd = newSpent; + // Auto-pause when ceiling exceeded + if (newSpent >= budget.ceilingUsd && budget.status === 'active') { + updates.status = 'paused'; + } } + if (Object.keys(updates).length === 0) return budget; return repo.updateBudget(productId, updates); } diff --git a/services/platform-service/src/modules/fleet/routes.test.ts b/services/platform-service/src/modules/fleet/routes.test.ts index 58dacf4f..a748580f 100644 --- a/services/platform-service/src/modules/fleet/routes.test.ts +++ b/services/platform-service/src/modules/fleet/routes.test.ts @@ -417,4 +417,42 @@ describe('fleetRoutes', () => { expect(body.days).toHaveLength(7); expect(body.totalUsd).toBe(0); }); + + // ── Phase 3 budgets: caller-product scoping (getRequestProductId mocked to 'lysnrai') ── + it("budget routes work for the caller's own product", async () => { + const app = await buildApp(); + const put = await app.inject({ + method: 'PUT', + url: '/api/fleet/budgets/lysnrai', + payload: { ceilingUsd: 100, window: 'monthly' }, + }); + expect(put.statusCode).toBe(200); + const get = await app.inject({ method: 'GET', url: '/api/fleet/budgets/lysnrai' }); + expect(get.statusCode).toBe(200); + expect(JSON.parse(get.body).productId).toBe('lysnrai'); + }); + + it('budget routes reject a foreign productId with 403 (all 5 verbs, no cross-product access)', async () => { + const app = await buildApp(); + const get = await app.inject({ method: 'GET', url: '/api/fleet/budgets/chronomind' }); + expect(get.statusCode).toBe(403); + const burndown = await app.inject({ + method: 'GET', + url: '/api/fleet/budgets/chronomind/burndown', + }); + expect(burndown.statusCode).toBe(403); + const put = await app.inject({ + method: 'PUT', + url: '/api/fleet/budgets/chronomind', + payload: { ceilingUsd: 999, window: 'monthly' }, + }); + expect(put.statusCode).toBe(403); + const pause = await app.inject({ method: 'POST', url: '/api/fleet/budgets/chronomind/pause' }); + expect(pause.statusCode).toBe(403); + const resume = await app.inject({ + method: 'POST', + url: '/api/fleet/budgets/chronomind/resume', + }); + expect(resume.statusCode).toBe(403); + }); }); diff --git a/services/platform-service/src/modules/fleet/routes.ts b/services/platform-service/src/modules/fleet/routes.ts index 359f553d..84ce2380 100644 --- a/services/platform-service/src/modules/fleet/routes.ts +++ b/services/platform-service/src/modules/fleet/routes.ts @@ -25,7 +25,7 @@ import type { FastifyInstance } from 'fastify'; import { getRequestProductId } from '../../lib/request-context.js'; -import { BadRequestError, ConflictError, NotFoundError } from '../../lib/errors.js'; +import { BadRequestError, ConflictError, ForbiddenError, NotFoundError } from '../../lib/errors.js'; import { extractAuth } from '../../lib/auth.js'; import * as repo from './repository.js'; import * as coordinator from './coordinator.js'; @@ -55,6 +55,20 @@ function badRequest(issues: { message: string }[]): never { throw new BadRequestError(issues.map(i => i.message).join('; ')); } +/** + * Resolve the caller's productId and enforce that any `:productId` path param + * matches it. Prevents a caller authenticated for product A from reading or + * mutating another product's fleet budget. Returns the caller's productId. + */ +function requireOwnProduct(req: import('fastify').FastifyRequest): string { + const pid = getRequestProductId(req); + const { productId } = req.params as { productId?: string }; + if (productId && productId !== pid) { + throw new ForbiddenError("Cannot access another product's fleet budget"); + } + return pid; +} + const delay = (ms: number): Promise => new Promise(resolve => setTimeout(resolve, ms)); /** Parse an integer query param, clamping to [min, max]; fall back to `fallback`. */ @@ -496,7 +510,7 @@ export async function fleetRoutes(app: FastifyInstance) { // ── Phase 3 Budgets: GET/PUT per-product budget ── app.get('/fleet/budgets/:productId', async req => { await extractAuth(req); - const { productId } = req.params as { productId: string }; + const productId = requireOwnProduct(req); const budget = await coordinator.getBudget(productId); if (!budget) throw new NotFoundError('No budget configured for this product'); return budget; @@ -505,7 +519,7 @@ export async function fleetRoutes(app: FastifyInstance) { // ── Cost burndown — spend-over-time vs ceiling (§14) ── app.get('/fleet/budgets/:productId/burndown', async req => { await extractAuth(req); - const { productId } = req.params as { productId: string }; + const productId = requireOwnProduct(req); const { days } = req.query as { days?: string }; const parsedDays = days ? Number.parseInt(days, 10) : undefined; return coordinator.costBurndown( @@ -516,7 +530,7 @@ export async function fleetRoutes(app: FastifyInstance) { app.put('/fleet/budgets/:productId', async (req, reply) => { await extractAuth(req); - const { productId } = req.params as { productId: string }; + const productId = requireOwnProduct(req); const parsed = UpsertBudgetSchema.safeParse(req.body); if (!parsed.success) badRequest(parsed.error.issues); const budget = await coordinator.upsertBudget( @@ -530,7 +544,7 @@ export async function fleetRoutes(app: FastifyInstance) { app.post('/fleet/budgets/:productId/pause', async req => { await extractAuth(req); - const { productId } = req.params as { productId: string }; + const productId = requireOwnProduct(req); const budget = await coordinator.pauseBudget(productId); if (!budget) throw new NotFoundError('No budget configured for this product'); return budget; @@ -538,7 +552,7 @@ export async function fleetRoutes(app: FastifyInstance) { app.post('/fleet/budgets/:productId/resume', async req => { await extractAuth(req); - const { productId } = req.params as { productId: string }; + const productId = requireOwnProduct(req); const budget = await coordinator.resumeBudget(productId); if (!budget) throw new NotFoundError('No budget configured for this product'); return budget; diff --git a/services/platform-service/src/modules/fleet/types.ts b/services/platform-service/src/modules/fleet/types.ts index 80e8e674..fa908415 100644 --- a/services/platform-service/src/modules/fleet/types.ts +++ b/services/platform-service/src/modules/fleet/types.ts @@ -493,6 +493,8 @@ export const FleetBudgetDocSchema = z.object({ spentUsd: z.number().nonnegative().default(0), status: z.enum(BUDGET_STATUSES).default('active'), windowStart: z.string().optional(), + /** Run identifiers already accrued — makes accrueSpend idempotent per run. */ + accruedRunIds: z.array(z.string()).default([]), updatedAt: z.string(), }); export type FleetBudgetDoc = z.infer;