Merge fix/fleet-run-result-on-ship: run result mirrors shipped + docs
This commit is contained in:
commit
60989d7b62
@ -121,23 +121,54 @@ The UI calls platform-service fleet endpoints via `/api/fleet/[...path]` proxy.
|
||||
| ------------------ | ----------------------- | ----------------------------------- |
|
||||
| `PLATFORM_API_URL` | `http://localhost:4003` | Platform-service base URL for proxy |
|
||||
|
||||
## Job Lifecycle & Shipping (testing → shipped)
|
||||
|
||||
Stages: `queued → assigned → building → review → testing → shipped` (plus `blocked`,
|
||||
`failed`, `dead_letter`). A factory drives `assigned → building → review`, then runs
|
||||
its local verify gate.
|
||||
|
||||
A job reaches the terminal `shipped` stage in one of two ways. Because the
|
||||
`testing → shipped` transition has no claimable lease holder after the review gate,
|
||||
it must be driven explicitly by one of:
|
||||
|
||||
1. **Factory autoship** (`AQ_FLEET_AUTOSHIP=1` on the agent-queue factory): when the
|
||||
factory's local verify passes it reports `testing`, then advances the coordinator
|
||||
job `testing → shipped` autonomously (the factory's verify **is** the test phase).
|
||||
This is the autonomous `submit → … → shipped` path. Default off.
|
||||
2. **`ship` operator action** (`POST /fleet/jobs/:id/actions/:action` with
|
||||
`:action` = `ship`): an operator/controller marks a non-terminal job `shipped`. Lease-free
|
||||
(works after the human review gate), idempotent, and retries on optimistic-
|
||||
concurrency conflict.
|
||||
|
||||
With `AQ_FLEET_AUTOSHIP=0` (default) a verify-passing job rests at `testing` for the
|
||||
**human review gate** (`review/request` + multi-reviewer `review` approve) or a manual
|
||||
`ship`.
|
||||
|
||||
Whenever a job reaches `shipped` (autoship PATCH, `ship` action, or a terminal lease
|
||||
release), the coordinator mirrors the outcome onto the latest **run**
|
||||
(`result = 'shipped'`, `endedAt` set) and — if budgets are enabled — accrues that
|
||||
run's `insights.costUsd`. This keeps the dashboard's per-run result/cost/tokens
|
||||
consistent with the job stage.
|
||||
|
||||
## API Reference Summary
|
||||
|
||||
| Endpoint | Method | Phase | Notes |
|
||||
| ---------------------------------- | ------ | ----- | -------------------------------------------------- |
|
||||
| `/fleet/jobs` | GET | 2 | List jobs (query: stage, productId, limit, offset) |
|
||||
| `/fleet/jobs` | POST | 2 | Submit job (+ optional children[] for DAG) |
|
||||
| `/fleet/jobs/:id` | GET | 2 | Get job |
|
||||
| `/fleet/jobs/:id` | PATCH | 2 | Update stage (fenced) |
|
||||
| `/fleet/jobs/:id/claim` | POST | 2 | Factory claims next job |
|
||||
| `/fleet/jobs/:id/children` | POST | 3 | Add children to existing job |
|
||||
| `/fleet/jobs/:id/dag` | GET | 3 | Get DAG subtree |
|
||||
| `/fleet/factories` | GET | 2 | List factories |
|
||||
| `/fleet/factories/:id/heartbeat` | POST | 2 | Factory heartbeat |
|
||||
| `/fleet/budgets/:productId` | GET | 3 | Get budget |
|
||||
| `/fleet/budgets/:productId` | PUT | 3 | Upsert budget |
|
||||
| `/fleet/budgets/:productId/pause` | POST | 3 | Pause budget |
|
||||
| `/fleet/budgets/:productId/resume` | POST | 3 | Resume budget |
|
||||
| Endpoint | Method | Phase | Notes |
|
||||
| ---------------------------------- | ------ | ----- | --------------------------------------------------------- |
|
||||
| `/fleet/jobs` | GET | 2 | List jobs (query: stage, productId, limit, offset) |
|
||||
| `/fleet/jobs` | POST | 2 | Submit job (+ optional children[] for DAG) |
|
||||
| `/fleet/jobs/:id` | GET | 2 | Get job |
|
||||
| `/fleet/jobs/:id` | PATCH | 2 | Update stage (fenced) |
|
||||
| `/fleet/jobs/:id/actions/:action` | POST | 3 | Operator action: `requeue` / `reject` / `cancel` / `ship` |
|
||||
| `/fleet/jobs/:id/lease/release` | POST | 2 | Release lease (optional `stage`, `insights`, `result`) |
|
||||
| `/fleet/jobs/:id/claim` | POST | 2 | Factory claims next job |
|
||||
| `/fleet/jobs/:id/children` | POST | 3 | Add children to existing job |
|
||||
| `/fleet/jobs/:id/dag` | GET | 3 | Get DAG subtree |
|
||||
| `/fleet/factories` | GET | 2 | List factories |
|
||||
| `/fleet/factories/:id/heartbeat` | POST | 2 | Factory heartbeat |
|
||||
| `/fleet/budgets/:productId` | GET | 3 | Get budget |
|
||||
| `/fleet/budgets/:productId` | PUT | 3 | Upsert budget |
|
||||
| `/fleet/budgets/:productId/pause` | POST | 3 | Pause budget |
|
||||
| `/fleet/budgets/:productId/resume` | POST | 3 | Resume budget |
|
||||
|
||||
## Architecture Decisions
|
||||
|
||||
|
||||
@ -609,6 +609,26 @@ export interface PatchJobInputInternal {
|
||||
blockedReason?: string;
|
||||
}
|
||||
|
||||
/**
 * Mirror a terminal `shipped` job stage onto the job's most recent run.
 *
 * The most recent run is the one with the highest `attempt`. If that run is not
 * already marked `shipped`, its `result` is set to `'shipped'` and, when it has
 * no `endedAt` yet, `endedAt` is stamped with the current time. A run that is
 * already `shipped` is left untouched, so repeated calls are harmless.
 *
 * @param jobId - Job whose runs are inspected.
 * @returns The most recent run as read from the run list (the object fetched
 *   before any update is issued), or `undefined` when the job has no runs.
 */
async function markLatestRunShipped(jobId: string): Promise<FleetRunDoc | undefined> {
  // Pick the run with the highest attempt number; stays undefined for an empty list.
  let newest: FleetRunDoc | undefined;
  for (const run of await repo.listRunsByJob(jobId)) {
    if (!newest || run.attempt > newest.attempt) {
      newest = run;
    }
  }

  // Nothing to update: no runs at all, or the latest run is already shipped.
  if (!newest || newest.result === 'shipped') {
    return newest;
  }

  // Preserve an existing end timestamp; only stamp one when it is missing.
  const endedAtPatch = newest.endedAt ? {} : { endedAt: new Date().toISOString() };
  await repo.updateRun(newest.id, jobId, { result: 'shipped', ...endedAtPatch });

  return newest;
}
|
||||
|
||||
export async function patchJobFenced(
|
||||
jobId: string,
|
||||
productId: string,
|
||||
@ -638,22 +658,20 @@ export async function patchJobFenced(
|
||||
await maybeUnblockParent(job.parentId, productId);
|
||||
}
|
||||
|
||||
// Phase 3 budgets: accrue spend when a job ships (terminal success).
|
||||
// Flag-gated + best-effort — an accrual failure NEVER fails the transition.
|
||||
// Idempotent per run via `<jobId>:<leaseEpoch>` so duplicate ship transitions do
|
||||
// not double-count. Accrue the run's ACTUAL cost (insights.costUsd) so spentUsd
|
||||
// and the costBurndown (which also reads insights.costUsd) agree.
|
||||
if (patch.stage === 'shipped' && isBudgetsEnabled()) {
|
||||
// On ship (terminal success): mirror the outcome onto the run, and accrue spend.
|
||||
// Best-effort — never fail the transition. Budget accrual is flag-gated and
|
||||
// idempotent per run via `<jobId>:<leaseEpoch>`, using the run's ACTUAL cost
|
||||
// (insights.costUsd) so spentUsd and the costBurndown agree.
|
||||
if (patch.stage === 'shipped') {
|
||||
try {
|
||||
const runId = `${jobId}:${job.leaseEpoch}`;
|
||||
const runs = await repo.listRunsByJob(jobId);
|
||||
const latest = runs.reduce<FleetRunDoc | undefined>(
|
||||
(acc, r) => (!acc || r.attempt > acc.attempt ? r : acc),
|
||||
undefined
|
||||
);
|
||||
await accrueSpend(productId, latest?.insights?.costUsd ?? 0, runId);
|
||||
// Run-level result mirrors the terminal stage (ungated).
|
||||
const latest = await markLatestRunShipped(jobId);
|
||||
// Budgets (flag-gated): accrue the run's actual cost, idempotent per run.
|
||||
if (isBudgetsEnabled()) {
|
||||
await accrueSpend(productId, latest?.insights?.costUsd ?? 0, `${jobId}:${job.leaseEpoch}`);
|
||||
}
|
||||
} catch {
|
||||
// swallow — budget accounting is downstream of the job lifecycle
|
||||
// swallow — run/budget bookkeeping is downstream of the job lifecycle
|
||||
}
|
||||
}
|
||||
|
||||
@ -984,6 +1002,15 @@ export async function operatorAction(
|
||||
data: { action, leaseEpoch: newEpoch, returnedTo: stage },
|
||||
});
|
||||
|
||||
// Operator `ship` is a terminal success — mirror it onto the run result.
|
||||
if (stage === 'shipped') {
|
||||
try {
|
||||
await markLatestRunShipped(jobId);
|
||||
} catch {
|
||||
// best-effort — run bookkeeping never fails the operator action
|
||||
}
|
||||
}
|
||||
|
||||
return { ok: true, doc: res.doc };
|
||||
}
|
||||
return { ok: false, reason: 'conflict' };
|
||||
|
||||
Loading…
Reference in New Issue
Block a user