diff --git a/docs/GIGAFACTORY/FLEET_CONTROL_PLANE.md b/docs/GIGAFACTORY/FLEET_CONTROL_PLANE.md index a510c9c8..5c9977d8 100644 --- a/docs/GIGAFACTORY/FLEET_CONTROL_PLANE.md +++ b/docs/GIGAFACTORY/FLEET_CONTROL_PLANE.md @@ -121,23 +121,54 @@ The UI calls platform-service fleet endpoints via `/api/fleet/[...path]` proxy. | ------------------ | ----------------------- | ----------------------------------- | | `PLATFORM_API_URL` | `http://localhost:4003` | Platform-service base URL for proxy | +## Job Lifecycle & Shipping (testing → shipped) + +Stages: `queued → assigned → building → review → testing → shipped` (plus `blocked`, +`failed`, `dead_letter`). A factory drives `assigned → building → review`, then runs +its local verify gate. + +There are two ways a job reaches the terminal `shipped` stage (the +`testing → shipped` transition has no claimable lease holder after the review gate, +so it is driven by one of): + +1. **Factory autoship** (`AQ_FLEET_AUTOSHIP=1` on the agent-queue factory): when the + factory's local verify passes it reports `testing`, then advances the coordinator + job `testing → shipped` autonomously (the factory's verify **is** the test phase). + This is the autonomous `submit → … → shipped` path. Default off. +2. **`ship` operator action** (`POST /fleet/jobs/:id/actions/:action` with + `ship`): an operator/controller marks a non-terminal job `shipped`. Lease-free + (works after the human review gate), idempotent, and retries on optimistic- + concurrency conflict. + +With `AQ_FLEET_AUTOSHIP=0` (default) a verify-passing job rests at `testing` for the +**human review gate** (`review/request` + multi-reviewer `review` approve) or a manual +`ship`. + +Whenever a job reaches `shipped` (autoship PATCH, `ship` action, or a terminal lease +release), the coordinator mirrors the outcome onto the latest **run** +(`result = 'shipped'`, `endedAt` set) and — if budgets are enabled — accrues that +run's `insights.costUsd`. So the dashboard's per-run result/cost/tokens stay +consistent with the job stage. + ## API Reference Summary -| Endpoint | Method | Phase | Notes | -| ---------------------------------- | ------ | ----- | -------------------------------------------------- | -| `/fleet/jobs` | GET | 2 | List jobs (query: stage, productId, limit, offset) | -| `/fleet/jobs` | POST | 2 | Submit job (+ optional children[] for DAG) | -| `/fleet/jobs/:id` | GET | 2 | Get job | -| `/fleet/jobs/:id` | PATCH | 2 | Update stage (fenced) | -| `/fleet/jobs/:id/claim` | POST | 2 | Factory claims next job | -| `/fleet/jobs/:id/children` | POST | 3 | Add children to existing job | -| `/fleet/jobs/:id/dag` | GET | 3 | Get DAG subtree | -| `/fleet/factories` | GET | 2 | List factories | -| `/fleet/factories/:id/heartbeat` | POST | 2 | Factory heartbeat | -| `/fleet/budgets/:productId` | GET | 3 | Get budget | -| `/fleet/budgets/:productId` | PUT | 3 | Upsert budget | -| `/fleet/budgets/:productId/pause` | POST | 3 | Pause budget | -| `/fleet/budgets/:productId/resume` | POST | 3 | Resume budget | +| Endpoint | Method | Phase | Notes | +| ---------------------------------- | ------ | ----- | --------------------------------------------------------- | +| `/fleet/jobs` | GET | 2 | List jobs (query: stage, productId, limit, offset) | +| `/fleet/jobs` | POST | 2 | Submit job (+ optional children[] for DAG) | +| `/fleet/jobs/:id` | GET | 2 | Get job | +| `/fleet/jobs/:id` | PATCH | 2 | Update stage (fenced) | +| `/fleet/jobs/:id/actions/:action` | POST | 3 | Operator action: `requeue` / `reject` / `cancel` / `ship` | +| `/fleet/jobs/:id/lease/release` | POST | 2 | Release lease (optional `stage`, `insights`, `result`) | +| `/fleet/jobs/:id/claim` | POST | 2 | Factory claims next job | +| `/fleet/jobs/:id/children` | POST | 3 | Add children to existing job | +| `/fleet/jobs/:id/dag` | GET | 3 | Get DAG subtree | +| `/fleet/factories` | GET | 2 | List factories | +| `/fleet/factories/:id/heartbeat` | POST | 2 | Factory heartbeat | +| `/fleet/budgets/:productId` | GET | 3 | Get budget | +| `/fleet/budgets/:productId` | PUT | 3 | Upsert budget | +| `/fleet/budgets/:productId/pause` | POST | 3 | Pause budget | +| `/fleet/budgets/:productId/resume` | POST | 3 | Resume budget | ## Architecture Decisions diff --git a/services/platform-service/src/modules/fleet/coordinator.ts b/services/platform-service/src/modules/fleet/coordinator.ts index 8adfe0cc..dacf02df 100644 --- a/services/platform-service/src/modules/fleet/coordinator.ts +++ b/services/platform-service/src/modules/fleet/coordinator.ts @@ -609,6 +609,26 @@ export interface PatchJobInputInternal { blockedReason?: string; } +/** + * Mark a job's latest run as shipped so the run-level `result` mirrors the + * terminal job stage (a job can reach `shipped` via autoship PATCH, the `ship` + * operator action, or a lease release). Idempotent; returns the latest run. + */ +async function markLatestRunShipped(jobId: string): Promise { + const runs = await repo.listRunsByJob(jobId); + const latest = runs.reduce( + (acc, r) => (!acc || r.attempt > acc.attempt ? r : acc), + undefined + ); + if (latest && latest.result !== 'shipped') { + await repo.updateRun(latest.id, jobId, { + result: 'shipped', + ...(latest.endedAt ? {} : { endedAt: new Date().toISOString() }), + }); + } + return latest; +} + export async function patchJobFenced( jobId: string, productId: string, @@ -638,22 +658,20 @@ export async function patchJobFenced( await maybeUnblockParent(job.parentId, productId); } - // Phase 3 budgets: accrue spend when a job ships (terminal success). - // Flag-gated + best-effort — an accrual failure NEVER fails the transition. - // Idempotent per run via `:` so duplicate ship transitions do - // not double-count. Accrue the run's ACTUAL cost (insights.costUsd) so spentUsd - // and the costBurndown (which also reads insights.costUsd) agree. - if (patch.stage === 'shipped' && isBudgetsEnabled()) { + // On ship (terminal success): mirror the outcome onto the run, and accrue spend. + // Best-effort — never fail the transition. Budget accrual is flag-gated and + // idempotent per run via `:`, using the run's ACTUAL cost + // (insights.costUsd) so spentUsd and the costBurndown agree. + if (patch.stage === 'shipped') { try { - const runId = `${jobId}:${job.leaseEpoch}`; - const runs = await repo.listRunsByJob(jobId); - const latest = runs.reduce( - (acc, r) => (!acc || r.attempt > acc.attempt ? r : acc), - undefined - ); - await accrueSpend(productId, latest?.insights?.costUsd ?? 0, runId); + // Run-level result mirrors the terminal stage (ungated). + const latest = await markLatestRunShipped(jobId); + // Budgets (flag-gated): accrue the run's actual cost, idempotent per run. + if (isBudgetsEnabled()) { + await accrueSpend(productId, latest?.insights?.costUsd ?? 0, `${jobId}:${job.leaseEpoch}`); + } } catch { - // swallow — budget accounting is downstream of the job lifecycle + // swallow — run/budget bookkeeping is downstream of the job lifecycle } } @@ -984,6 +1002,15 @@ export async function operatorAction( data: { action, leaseEpoch: newEpoch, returnedTo: stage }, }); + // Operator `ship` is a terminal success — mirror it onto the run result. + if (stage === 'shipped') { + try { + await markLatestRunShipped(jobId); + } catch { + // best-effort — run bookkeeping never fails the operator action + } + } + return { ok: true, doc: res.doc }; } return { ok: false, reason: 'conflict' };