From 32e426d423a6204aa1a508e00e252e63cc347b1f Mon Sep 17 00:00:00 2001 From: saravanakumardb1 Date: Sun, 31 May 2026 01:49:59 -0700 Subject: [PATCH] feat(fleet): run-insights reporting + ship action; complete the lifecycle MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Make the Agent Gigafactory fully drivable end-to-end via the API: - lease/release now accepts `insights` (model/tokens/cost) + `result`, recorded on the current run with endedAt — factories report cost/token metrics on completion (previously no API existed; runs stayed insights:{}). - add `ship` operator action so a job in `testing` (where the review gate left no lease holder) can reach the terminal `shipped` stage. Idempotent. - operatorAction now retries on optimistic-concurrency conflict with backoff (mirrors submitReview) so a ship right after approve survives real-Cosmos read-after-write lag instead of a spurious 409. Tests: +2 coordinator (ship idempotent, release records insights) and +2 route integration (gated submit->...->ship->metrics; release-with-insights). 170 pass. Generated with [Devin](https://cli.devin.ai/docs) Co-Authored-By: Devin <158243242+devin-ai-integration[bot]@users.noreply.github.com> --- .../src/modules/fleet/coordinator.test.ts | 38 ++++++ .../src/modules/fleet/coordinator.ts | 121 +++++++++++------- .../src/modules/fleet/routes.test.ts | 79 ++++++++++++ .../src/modules/fleet/routes.ts | 5 +- .../src/modules/fleet/types.ts | 4 + 5 files changed, 199 insertions(+), 48 deletions(-) diff --git a/services/platform-service/src/modules/fleet/coordinator.test.ts b/services/platform-service/src/modules/fleet/coordinator.test.ts index cd298071..4976e739 100644 --- a/services/platform-service/src/modules/fleet/coordinator.test.ts +++ b/services/platform-service/src/modules/fleet/coordinator.test.ts @@ -821,6 +821,44 @@ describe('fleet coordinator — Phase 3 per-product budgets', () => { if (!missing.ok) expect(missing.reason).toBe('not_found'); }); + it('operator action: ship transitions a testing job to shipped (idempotent)', async () => { + const { job } = await coord.submitJob(PID, input()); + const claim = await coord.claimNextJob(factory()); + // simulate the review gate having routed the job to `testing` (no lease holder) + await coord.patchJobFenced(job.id, PID, { + leaseEpoch: claim!.job.leaseEpoch, + stage: 'testing', + }); + + const shipped = await coord.operatorAction(job.id, PID, 'ship'); + expect(shipped.ok).toBe(true); + if (shipped.ok) expect(shipped.doc.stage).toBe('shipped'); + + // re-shipping is a no-op (idempotent), not invalid_state + const again = await coord.operatorAction(job.id, PID, 'ship'); + expect(again.ok).toBe(true); + if (again.ok) expect(again.doc.stage).toBe('shipped'); + }); + + it('releaseLease records run insights + result on the current run', async () => { + const { job } = await coord.submitJob(PID, input()); + const claim = await coord.claimNextJob(factory()); + const epoch = claim!.job.leaseEpoch; + + const insights = { model: 'claude-sonnet-4', tokensIn: 1200, tokensOut: 340, costUsd: 0.0123 }; + const res = await coord.releaseLease(job.id, PID, epoch, 'shipped', { + insights, + result: 'shipped', + }); + expect(res.ok).toBe(true); + + const runs = await repo.listRunsByJob(job.id); + expect(runs.length).toBe(1); + expect(runs[0]!.insights).toMatchObject(insights); + expect(runs[0]!.result).toBe('shipped'); + expect(runs[0]!.endedAt).toBeTruthy(); + }); + it('operator reject/cancel are idempotent on already-terminal jobs (no epoch bump)', async () => { const { job } = await coord.submitJob(PID, input()); await coord.claimNextJob(factory()); diff --git a/services/platform-service/src/modules/fleet/coordinator.ts b/services/platform-service/src/modules/fleet/coordinator.ts index f855e55a..b72d610c 100644 --- a/services/platform-service/src/modules/fleet/coordinator.ts +++ b/services/platform-service/src/modules/fleet/coordinator.ts @@ -821,7 +821,8 @@ export async function releaseLease( jobId: string, productId: string, leaseEpoch: number, - stage?: FleetStage + stage?: FleetStage, + report?: { insights?: FleetRunDoc['insights']; result?: FleetRunDoc['result'] } ): Promise> { const job = await repo.getJob(jobId, productId); if (!job) return { ok: false, reason: 'not_found' }; @@ -831,15 +832,27 @@ export async function releaseLease( const res = await repo.revUpdateLease(jobId, lease.rev, { status: 'released' }); if (!res.ok) return { ok: false, reason: 'conflict' }; if (stage) await repo.revUpdateJob(jobId, productId, job.rev, { stage }); + // Record the factory's reported cost/token metrics + outcome on the current run. + if (report?.insights || report?.result) { + const runId = `${jobId}:run:${job.attempts}`; + await repo.updateRun(runId, jobId, { + ...(report.insights ? { insights: report.insights } : {}), + ...(report.result ? { result: report.result } : {}), + endedAt: new Date().toISOString(), + }); + } await repo.appendEvent({ jobId, productId, type: 'lease_released', data: { leaseEpoch, stage } }); return { ok: true, doc: res.doc }; } // ── Operator actions (§14 Phase 3 — approve/ship/reject/requeue) ────────────── -export const OPERATOR_ACTIONS = ['requeue', 'reject', 'cancel'] as const; +export const OPERATOR_ACTIONS = ['requeue', 'reject', 'cancel', 'ship'] as const; export type OperatorActionKind = (typeof OPERATOR_ACTIONS)[number]; +/** Bounded retries for the operator-action optimistic-concurrency loop. */ +const OPERATOR_ACTION_MAX_RETRIES = 5; + export type OperatorActionResult = | { ok: true; doc: FleetJobDoc } | { ok: false; reason: 'not_found' | 'conflict' | 'invalid_state' }; @@ -862,58 +875,72 @@ export async function operatorAction( productId: string, action: OperatorActionKind ): Promise { - const job = await repo.getJob(jobId, productId); - if (!job) return { ok: false, reason: 'not_found' }; - if (job.stage === 'shipped') return { ok: false, reason: 'invalid_state' }; + // Retry on optimistic-concurrency conflict (real Cosmos read-after-write lag), + // mirroring submitReview — a single attempt can spuriously 409 right after a + // prior transition (e.g. review->testing) has not yet settled. + for (let attempt = 0; attempt < OPERATOR_ACTION_MAX_RETRIES; attempt++) { + const job = await repo.getJob(jobId, productId); + if (!job) return { ok: false, reason: 'not_found' }; - // Idempotent terminal actions: re-rejecting a dead_letter / re-cancelling a - // failed job is a no-op (don't bump the epoch or append a duplicate event). - if (action === 'reject' && job.stage === 'dead_letter') return { ok: true, doc: job }; - if (action === 'cancel' && job.stage === 'failed') return { ok: true, doc: job }; + // Idempotent terminal actions: re-shipping/rejecting/cancelling a job already + // in that terminal state is a no-op (don't bump the epoch or append a dup event). + if (action === 'ship' && job.stage === 'shipped') return { ok: true, doc: job }; + if (job.stage === 'shipped') return { ok: false, reason: 'invalid_state' }; + if (action === 'reject' && job.stage === 'dead_letter') return { ok: true, doc: job }; + if (action === 'cancel' && job.stage === 'failed') return { ok: true, doc: job }; - const newEpoch = job.leaseEpoch + 1; // fence any current holder + const newEpoch = job.leaseEpoch + 1; // fence any current holder - let stage: FleetStage; - let blockedReason: string | undefined; - if (action === 'requeue') { - const unmet = await unmetDeps(job); - stage = unmet.length > 0 ? 'blocked' : 'queued'; - blockedReason = unmet.length > 0 ? `waiting on: ${unmet.join(', ')}` : undefined; - } else if (action === 'reject') { - stage = 'dead_letter'; - } else { - stage = 'failed'; - } + let stage: FleetStage; + let blockedReason: string | undefined; + if (action === 'requeue') { + const unmet = await unmetDeps(job); + stage = unmet.length > 0 ? 'blocked' : 'queued'; + blockedReason = unmet.length > 0 ? `waiting on: ${unmet.join(', ')}` : undefined; + } else if (action === 'reject') { + stage = 'dead_letter'; + } else if (action === 'ship') { + // Operator/controller marks the job done (e.g. after the review gate routes + // it to `testing`, where no factory holds a lease to release it). + stage = 'shipped'; + } else { + stage = 'failed'; + } - const res = await repo.revUpdateJob(jobId, productId, job.rev, { - stage, - leaseEpoch: newEpoch, - blockedReason, - }); - if (!res.ok) { - return { ok: false, reason: res.reason === 'not_found' ? 'not_found' : 'conflict' }; - } - - // Free the seat: release any still-held lease, fencing it with the new epoch so - // a stale renewal cannot resurrect it (mirrors the reaper, §25.3). - const lease = await repo.getLease(jobId); - if (lease && lease.status === 'held') { - await repo.revUpdateLease(jobId, lease.rev, { - status: 'released', + const res = await repo.revUpdateJob(jobId, productId, job.rev, { + stage, leaseEpoch: newEpoch, - holderFactoryId: undefined, + blockedReason, }); + if (!res.ok) { + if (res.reason === 'not_found') return { ok: false, reason: 'not_found' }; + // conflict -> brief backoff so a read-after-write (real Cosmos) settles, then re-read + await new Promise(r => globalThis.setTimeout(r, 40 * (attempt + 1))); + continue; + } + + // Free the seat: release any still-held lease, fencing it with the new epoch so + // a stale renewal cannot resurrect it (mirrors the reaper, §25.3). + const lease = await repo.getLease(jobId); + if (lease && lease.status === 'held') { + await repo.revUpdateLease(jobId, lease.rev, { + status: 'released', + leaseEpoch: newEpoch, + holderFactoryId: undefined, + }); + } + + await repo.appendEvent({ + jobId, + productId, + type: 'operator_action', + actor: 'operator', + data: { action, leaseEpoch: newEpoch, returnedTo: stage }, + }); + + return { ok: true, doc: res.doc }; } - - await repo.appendEvent({ - jobId, - productId, - type: 'operator_action', - actor: 'operator', - data: { action, leaseEpoch: newEpoch, returnedTo: stage }, - }); - - return { ok: true, doc: res.doc }; + return { ok: false, reason: 'conflict' }; } // ── Multi-reviewer human gate (§14 Phase 3 — review-policy routing) ──────────── diff --git a/services/platform-service/src/modules/fleet/routes.test.ts b/services/platform-service/src/modules/fleet/routes.test.ts index 78924988..58dacf4f 100644 --- a/services/platform-service/src/modules/fleet/routes.test.ts +++ b/services/platform-service/src/modules/fleet/routes.test.ts @@ -52,6 +52,85 @@ describe('fleetRoutes', () => { expect(res.statusCode).toBe(400); }); + it('gated lifecycle via routes: submit -> claim -> building -> review -> approve -> ship -> metrics', async () => { + const app = await buildApp(); + const sub = await submit(app, { + idempotencyKey: 'gated-1', + bodyMd: '# build feature', + priority: 'high', + }); + const jobId = JSON.parse(sub.body).job.id as string; + + const claim = await app.inject({ + method: 'POST', + url: '/api/fleet/claim', + payload: { factoryId: 'fac_1', capabilities: [] }, + }); + const epoch = JSON.parse(claim.body).job.leaseEpoch as number; + + const toBuilding = await app.inject({ + method: 'PATCH', + url: `/api/fleet/jobs/${jobId}`, + payload: { leaseEpoch: epoch, stage: 'building' }, + }); + expect(toBuilding.statusCode).toBe(200); + + const reviewReq = await app.inject({ + method: 'POST', + url: `/api/fleet/jobs/${jobId}/review/request`, + payload: { requiredApprovals: 1, reviewers: ['rev@x'] }, + }); + expect(reviewReq.statusCode).toBe(200); + expect(JSON.parse(reviewReq.body).stage).toBe('review'); + + const approve = await app.inject({ + method: 'POST', + url: `/api/fleet/jobs/${jobId}/review`, + payload: { reviewer: 'rev@x', decision: 'approve' }, + }); + expect(approve.statusCode).toBe(200); + expect(JSON.parse(approve.body).stage).toBe('testing'); + + // ship from testing — the gap this closes (no lease holder after the gate) + const ship = await app.inject({ method: 'POST', url: `/api/fleet/jobs/${jobId}/actions/ship` }); + expect(ship.statusCode).toBe(200); + expect(JSON.parse(ship.body).stage).toBe('shipped'); + + const metrics = await app.inject({ method: 'GET', url: '/api/fleet/metrics' }); + expect(JSON.parse(metrics.body).jobs.byStage.shipped).toBe(1); + }); + + it('release with insights records run cost/tokens (factory reporting)', async () => { + const app = await buildApp(); + const sub = await submit(app, { idempotencyKey: 'ins-1', bodyMd: '# task' }); + const jobId = JSON.parse(sub.body).job.id as string; + const claim = await app.inject({ + method: 'POST', + url: '/api/fleet/claim', + payload: { factoryId: 'fac_1', capabilities: [] }, + }); + const epoch = JSON.parse(claim.body).job.leaseEpoch as number; + + const rel = await app.inject({ + method: 'POST', + url: `/api/fleet/jobs/${jobId}/lease/release`, + payload: { + leaseEpoch: epoch, + stage: 'shipped', + insights: { model: 'claude-sonnet-4', tokensIn: 100, tokensOut: 20, costUsd: 0.01 }, + result: 'shipped', + }, + }); + expect(rel.statusCode).toBe(200); + + const runs = await app.inject({ method: 'GET', url: `/api/fleet/jobs/${jobId}/runs` }); + const run = JSON.parse(runs.body).runs[0]; + expect(run.insights.costUsd).toBe(0.01); + expect(run.insights.tokensIn).toBe(100); + expect(run.result).toBe('shipped'); + expect(run.endedAt).toBeTruthy(); + }); + it('full lifecycle: submit -> list -> claim -> patch(fenced) -> renew -> release', async () => { const app = await buildApp(); const sub = await submit(app, { idempotencyKey: 'k1', bodyMd: '# task', priority: 'high' }); diff --git a/services/platform-service/src/modules/fleet/routes.ts b/services/platform-service/src/modules/fleet/routes.ts index f5a36b6f..359f553d 100644 --- a/services/platform-service/src/modules/fleet/routes.ts +++ b/services/platform-service/src/modules/fleet/routes.ts @@ -244,7 +244,10 @@ export async function fleetRoutes(app: FastifyInstance) { const pid = getRequestProductId(req); const parsed = ReleaseLeaseSchema.safeParse(req.body); if (!parsed.success) badRequest(parsed.error.issues); - const res = await coordinator.releaseLease(id, pid, parsed.data.leaseEpoch, parsed.data.stage); + const res = await coordinator.releaseLease(id, pid, parsed.data.leaseEpoch, parsed.data.stage, { + insights: parsed.data.insights, + result: parsed.data.result, + }); if (!res.ok) { if (res.reason === 'not_found') throw new NotFoundError('Job or lease not found'); if (res.reason === 'fenced') throw new ConflictError('stale leaseEpoch — release fenced'); diff --git a/services/platform-service/src/modules/fleet/types.ts b/services/platform-service/src/modules/fleet/types.ts index d1655f96..80e8e674 100644 --- a/services/platform-service/src/modules/fleet/types.ts +++ b/services/platform-service/src/modules/fleet/types.ts @@ -408,6 +408,10 @@ export type RenewLeaseInput = z.infer; export const ReleaseLeaseSchema = z.object({ leaseEpoch: z.number().int().nonnegative(), stage: z.enum(FLEET_STAGES).optional(), + // A factory reports cost/token/effort metrics + the run outcome when it + // releases the lease at the end of a work unit. Recorded on the current run. + insights: InsightsSchema.optional(), + result: z.enum(RUN_RESULTS).optional(), }); export type ReleaseLeaseInput = z.infer;