feat(fleet): run-insights reporting + ship action; complete the lifecycle

Make the Agent Gigafactory fully drivable end-to-end via the API:

- lease/release now accepts `insights` (model/tokens/cost) + `result`, which are
  recorded on the current run together with `endedAt` — factories report cost/token
  metrics on completion (previously no API existed for this; runs stayed at `insights: {}`).
- add `ship` operator action so a job in `testing` (where the review gate left
  no lease holder) can reach the terminal `shipped` stage. Idempotent.
- operatorAction now retries on optimistic-concurrency conflict with backoff
  (mirrors submitReview), so a ship issued right after approve survives real-Cosmos
  read-after-write lag instead of failing with a spurious 409.

Tests: +2 coordinator (ship idempotent, release records insights) and +2 route
integration (gated submit->...->ship->metrics; release-with-insights). 170 pass.

Generated with [Devin](https://cli.devin.ai/docs)

Co-Authored-By: Devin <158243242+devin-ai-integration[bot]@users.noreply.github.com>
This commit is contained in:
saravanakumardb1 2026-05-31 01:49:59 -07:00
parent 939c7b4621
commit 32e426d423
5 changed files with 199 additions and 48 deletions

View File

@ -821,6 +821,44 @@ describe('fleet coordinator — Phase 3 per-product budgets', () => {
if (!missing.ok) expect(missing.reason).toBe('not_found');
});
it('operator action: ship transitions a testing job to shipped (idempotent)', async () => {
  // Arrange: submit + claim a job, then push it into `testing` with a fenced
  // patch, standing in for the review gate (which leaves no lease holder).
  const submitted = await coord.submitJob(PID, input());
  const jobId = submitted.job.id;
  const claimed = await coord.claimNextJob(factory());
  await coord.patchJobFenced(jobId, PID, {
    leaseEpoch: claimed!.job.leaseEpoch,
    stage: 'testing',
  });

  // Act + assert: the first ship moves the job to `shipped`.
  const firstShip = await coord.operatorAction(jobId, PID, 'ship');
  expect(firstShip.ok).toBe(true);
  if (firstShip.ok) expect(firstShip.doc.stage).toBe('shipped');

  // Shipping again must succeed as a no-op rather than report invalid_state.
  const secondShip = await coord.operatorAction(jobId, PID, 'ship');
  expect(secondShip.ok).toBe(true);
  if (secondShip.ok) expect(secondShip.doc.stage).toBe('shipped');
});
it('releaseLease records run insights + result on the current run', async () => {
  // Arrange: a freshly claimed job and the metrics the factory will report.
  const submitted = await coord.submitJob(PID, input());
  const jobId = submitted.job.id;
  const claimed = await coord.claimNextJob(factory());
  const leaseEpoch = claimed!.job.leaseEpoch;
  const reported = { model: 'claude-sonnet-4', tokensIn: 1200, tokensOut: 340, costUsd: 0.0123 };

  // Act: release the lease, moving the job to `shipped` and attaching the report.
  const released = await coord.releaseLease(jobId, PID, leaseEpoch, 'shipped', {
    insights: reported,
    result: 'shipped',
  });
  expect(released.ok).toBe(true);

  // Assert: exactly one run exists and it carries the report plus an end time.
  const runs = await repo.listRunsByJob(jobId);
  expect(runs.length).toBe(1);
  const run = runs[0]!;
  expect(run.insights).toMatchObject(reported);
  expect(run.result).toBe('shipped');
  expect(run.endedAt).toBeTruthy();
});
it('operator reject/cancel are idempotent on already-terminal jobs (no epoch bump)', async () => {
const { job } = await coord.submitJob(PID, input());
await coord.claimNextJob(factory());

View File

@ -821,7 +821,8 @@ export async function releaseLease(
jobId: string,
productId: string,
leaseEpoch: number,
stage?: FleetStage
stage?: FleetStage,
report?: { insights?: FleetRunDoc['insights']; result?: FleetRunDoc['result'] }
): Promise<FenceResult<FleetLeaseDoc>> {
const job = await repo.getJob(jobId, productId);
if (!job) return { ok: false, reason: 'not_found' };
@ -831,15 +832,27 @@ export async function releaseLease(
const res = await repo.revUpdateLease(jobId, lease.rev, { status: 'released' });
if (!res.ok) return { ok: false, reason: 'conflict' };
if (stage) await repo.revUpdateJob(jobId, productId, job.rev, { stage });
// Record the factory's reported cost/token metrics + outcome on the current run.
if (report?.insights || report?.result) {
const runId = `${jobId}:run:${job.attempts}`;
await repo.updateRun(runId, jobId, {
...(report.insights ? { insights: report.insights } : {}),
...(report.result ? { result: report.result } : {}),
endedAt: new Date().toISOString(),
});
}
await repo.appendEvent({ jobId, productId, type: 'lease_released', data: { leaseEpoch, stage } });
return { ok: true, doc: res.doc };
}
// ── Operator actions (§14 Phase 3 — approve/ship/reject/requeue) ──────────────
export const OPERATOR_ACTIONS = ['requeue', 'reject', 'cancel'] as const;
export const OPERATOR_ACTIONS = ['requeue', 'reject', 'cancel', 'ship'] as const;
export type OperatorActionKind = (typeof OPERATOR_ACTIONS)[number];
/** Bounded retries for the operator-action optimistic-concurrency loop. */
const OPERATOR_ACTION_MAX_RETRIES = 5;
export type OperatorActionResult =
| { ok: true; doc: FleetJobDoc }
| { ok: false; reason: 'not_found' | 'conflict' | 'invalid_state' };
@ -862,58 +875,72 @@ export async function operatorAction(
productId: string,
action: OperatorActionKind
): Promise<OperatorActionResult> {
const job = await repo.getJob(jobId, productId);
if (!job) return { ok: false, reason: 'not_found' };
if (job.stage === 'shipped') return { ok: false, reason: 'invalid_state' };
// Retry on optimistic-concurrency conflict (real Cosmos read-after-write lag),
// mirroring submitReview — a single attempt can spuriously 409 right after a
// prior transition (e.g. review->testing) has not yet settled.
for (let attempt = 0; attempt < OPERATOR_ACTION_MAX_RETRIES; attempt++) {
const job = await repo.getJob(jobId, productId);
if (!job) return { ok: false, reason: 'not_found' };
// Idempotent terminal actions: re-rejecting a dead_letter / re-cancelling a
// failed job is a no-op (don't bump the epoch or append a duplicate event).
if (action === 'reject' && job.stage === 'dead_letter') return { ok: true, doc: job };
if (action === 'cancel' && job.stage === 'failed') return { ok: true, doc: job };
// Idempotent terminal actions: re-shipping/rejecting/cancelling a job already
// in that terminal state is a no-op (don't bump the epoch or append a dup event).
if (action === 'ship' && job.stage === 'shipped') return { ok: true, doc: job };
if (job.stage === 'shipped') return { ok: false, reason: 'invalid_state' };
if (action === 'reject' && job.stage === 'dead_letter') return { ok: true, doc: job };
if (action === 'cancel' && job.stage === 'failed') return { ok: true, doc: job };
const newEpoch = job.leaseEpoch + 1; // fence any current holder
const newEpoch = job.leaseEpoch + 1; // fence any current holder
let stage: FleetStage;
let blockedReason: string | undefined;
if (action === 'requeue') {
const unmet = await unmetDeps(job);
stage = unmet.length > 0 ? 'blocked' : 'queued';
blockedReason = unmet.length > 0 ? `waiting on: ${unmet.join(', ')}` : undefined;
} else if (action === 'reject') {
stage = 'dead_letter';
} else {
stage = 'failed';
}
let stage: FleetStage;
let blockedReason: string | undefined;
if (action === 'requeue') {
const unmet = await unmetDeps(job);
stage = unmet.length > 0 ? 'blocked' : 'queued';
blockedReason = unmet.length > 0 ? `waiting on: ${unmet.join(', ')}` : undefined;
} else if (action === 'reject') {
stage = 'dead_letter';
} else if (action === 'ship') {
// Operator/controller marks the job done (e.g. after the review gate routes
// it to `testing`, where no factory holds a lease to release it).
stage = 'shipped';
} else {
stage = 'failed';
}
const res = await repo.revUpdateJob(jobId, productId, job.rev, {
stage,
leaseEpoch: newEpoch,
blockedReason,
});
if (!res.ok) {
return { ok: false, reason: res.reason === 'not_found' ? 'not_found' : 'conflict' };
}
// Free the seat: release any still-held lease, fencing it with the new epoch so
// a stale renewal cannot resurrect it (mirrors the reaper, §25.3).
const lease = await repo.getLease(jobId);
if (lease && lease.status === 'held') {
await repo.revUpdateLease(jobId, lease.rev, {
status: 'released',
const res = await repo.revUpdateJob(jobId, productId, job.rev, {
stage,
leaseEpoch: newEpoch,
holderFactoryId: undefined,
blockedReason,
});
if (!res.ok) {
if (res.reason === 'not_found') return { ok: false, reason: 'not_found' };
// conflict -> brief backoff so a read-after-write (real Cosmos) settles, then re-read
await new Promise(r => globalThis.setTimeout(r, 40 * (attempt + 1)));
continue;
}
// Free the seat: release any still-held lease, fencing it with the new epoch so
// a stale renewal cannot resurrect it (mirrors the reaper, §25.3).
const lease = await repo.getLease(jobId);
if (lease && lease.status === 'held') {
await repo.revUpdateLease(jobId, lease.rev, {
status: 'released',
leaseEpoch: newEpoch,
holderFactoryId: undefined,
});
}
await repo.appendEvent({
jobId,
productId,
type: 'operator_action',
actor: 'operator',
data: { action, leaseEpoch: newEpoch, returnedTo: stage },
});
return { ok: true, doc: res.doc };
}
await repo.appendEvent({
jobId,
productId,
type: 'operator_action',
actor: 'operator',
data: { action, leaseEpoch: newEpoch, returnedTo: stage },
});
return { ok: true, doc: res.doc };
return { ok: false, reason: 'conflict' };
}
// ── Multi-reviewer human gate (§14 Phase 3 — review-policy routing) ────────────

View File

@ -52,6 +52,85 @@ describe('fleetRoutes', () => {
expect(res.statusCode).toBe(400);
});
it('gated lifecycle via routes: submit -> claim -> building -> review -> approve -> ship -> metrics', async () => {
  const app = await buildApp();

  // submit: create the job; every later per-job URL is rooted at its id
  const submitRes = await submit(app, {
    idempotencyKey: 'gated-1',
    bodyMd: '# build feature',
    priority: 'high',
  });
  const jobId = JSON.parse(submitRes.body).job.id as string;
  const jobUrl = `/api/fleet/jobs/${jobId}`;

  // claim: a factory takes the job; keep the epoch for the fenced patch below
  const claimRes = await app.inject({
    method: 'POST',
    url: '/api/fleet/claim',
    payload: { factoryId: 'fac_1', capabilities: [] },
  });
  const leaseEpoch = JSON.parse(claimRes.body).job.leaseEpoch as number;

  // building: fenced stage patch using the claimed epoch
  const buildingRes = await app.inject({
    method: 'PATCH',
    url: jobUrl,
    payload: { leaseEpoch, stage: 'building' },
  });
  expect(buildingRes.statusCode).toBe(200);

  // review: open the gate with a single required approval
  const reviewRequestRes = await app.inject({
    method: 'POST',
    url: `${jobUrl}/review/request`,
    payload: { requiredApprovals: 1, reviewers: ['rev@x'] },
  });
  expect(reviewRequestRes.statusCode).toBe(200);
  expect(JSON.parse(reviewRequestRes.body).stage).toBe('review');

  // approve: the one required approval routes the job to `testing`
  const approveRes = await app.inject({
    method: 'POST',
    url: `${jobUrl}/review`,
    payload: { reviewer: 'rev@x', decision: 'approve' },
  });
  expect(approveRes.statusCode).toBe(200);
  expect(JSON.parse(approveRes.body).stage).toBe('testing');

  // ship: operator action from `testing` — the gap this closes (no lease
  // holder after the gate)
  const shipRes = await app.inject({ method: 'POST', url: `${jobUrl}/actions/ship` });
  expect(shipRes.statusCode).toBe(200);
  expect(JSON.parse(shipRes.body).stage).toBe('shipped');

  // metrics: the shipped job is counted in the per-stage breakdown
  const metricsRes = await app.inject({ method: 'GET', url: '/api/fleet/metrics' });
  expect(JSON.parse(metricsRes.body).jobs.byStage.shipped).toBe(1);
});
it('release with insights records run cost/tokens (factory reporting)', async () => {
  const app = await buildApp();

  // Submit and claim a job so there is a lease to release.
  const submitRes = await submit(app, { idempotencyKey: 'ins-1', bodyMd: '# task' });
  const jobId = JSON.parse(submitRes.body).job.id as string;
  const claimRes = await app.inject({
    method: 'POST',
    url: '/api/fleet/claim',
    payload: { factoryId: 'fac_1', capabilities: [] },
  });
  const leaseEpoch = JSON.parse(claimRes.body).job.leaseEpoch as number;

  // Release the lease, reporting the run's metrics and outcome in the same call.
  const releaseRes = await app.inject({
    method: 'POST',
    url: `/api/fleet/jobs/${jobId}/lease/release`,
    payload: {
      leaseEpoch,
      stage: 'shipped',
      insights: { model: 'claude-sonnet-4', tokensIn: 100, tokensOut: 20, costUsd: 0.01 },
      result: 'shipped',
    },
  });
  expect(releaseRes.statusCode).toBe(200);

  // The first run returned by the runs endpoint carries what was reported.
  const runsRes = await app.inject({ method: 'GET', url: `/api/fleet/jobs/${jobId}/runs` });
  const firstRun = JSON.parse(runsRes.body).runs[0];
  expect(firstRun.insights.costUsd).toBe(0.01);
  expect(firstRun.insights.tokensIn).toBe(100);
  expect(firstRun.result).toBe('shipped');
  expect(firstRun.endedAt).toBeTruthy();
});
it('full lifecycle: submit -> list -> claim -> patch(fenced) -> renew -> release', async () => {
const app = await buildApp();
const sub = await submit(app, { idempotencyKey: 'k1', bodyMd: '# task', priority: 'high' });

View File

@ -244,7 +244,10 @@ export async function fleetRoutes(app: FastifyInstance) {
const pid = getRequestProductId(req);
const parsed = ReleaseLeaseSchema.safeParse(req.body);
if (!parsed.success) badRequest(parsed.error.issues);
const res = await coordinator.releaseLease(id, pid, parsed.data.leaseEpoch, parsed.data.stage);
const res = await coordinator.releaseLease(id, pid, parsed.data.leaseEpoch, parsed.data.stage, {
insights: parsed.data.insights,
result: parsed.data.result,
});
if (!res.ok) {
if (res.reason === 'not_found') throw new NotFoundError('Job or lease not found');
if (res.reason === 'fenced') throw new ConflictError('stale leaseEpoch — release fenced');

View File

@ -408,6 +408,10 @@ export type RenewLeaseInput = z.infer<typeof RenewLeaseSchema>;
/**
 * Validation schema for the body of a lease-release request.
 *
 * Fields:
 * - `leaseEpoch` — required, non-negative integer.
 * - `stage` — optional; must be one of `FLEET_STAGES` when present.
 * - `insights` — optional; validated by `InsightsSchema` when present.
 * - `result` — optional; must be one of `RUN_RESULTS` when present.
 */
export const ReleaseLeaseSchema = z.object({
leaseEpoch: z.number().int().nonnegative(),
stage: z.enum(FLEET_STAGES).optional(),
// A factory reports cost/token/effort metrics + the run outcome when it
// releases the lease at the end of a work unit. Recorded on the current run.
insights: InsightsSchema.optional(),
result: z.enum(RUN_RESULTS).optional(),
});
/** Static type inferred from `ReleaseLeaseSchema`. */
export type ReleaseLeaseInput = z.infer<typeof ReleaseLeaseSchema>;