feat: complete operator job actions (requeue/reject/cancel)

Adds lease-free operator lifecycle control to the fleet control plane:
- coordinator.operatorAction() fences any current factory holder by bumping
  leaseEpoch (mirrors the reaper), preserves checkpoint, and routes the job:
  requeue -> queued (or blocked if deps unmet), reject -> dead_letter,
  cancel -> failed. Shipped jobs are terminal (invalid_state).
- POST /fleet/jobs/:id/actions/:action route (400 on unknown action)
- fleet-client.operatorAction() + Requeue/Cancel/Reject buttons on job detail
- Tests: +5 coordinator, +1 routes, +2 fleet-client (platform fleet 140 green,
  tracker-web 212 green)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
Saravanakumar D 2026-05-30 18:12:11 -07:00
parent 0f903b935a
commit 283383561c
7 changed files with 281 additions and 6 deletions

View File

@ -15,6 +15,7 @@ import {
listJobs,
getJob,
patchJob,
operatorAction,
getJobRuns,
getJobEvents,
getJobArtifacts,
@ -76,6 +77,27 @@ describe('fleet-client', () => {
});
});
describe('operatorAction', () => {
it('sends POST to /jobs/:id/actions/:action', async () => {
fetchSpy.mockResolvedValue({ id: 'j1', stage: 'queued' });
const res = await operatorAction('j1', 'requeue');
expect(res.stage).toBe('queued');
expect(fetchSpy).toHaveBeenCalledWith(
'/jobs/j1/actions/requeue',
expect.objectContaining({ method: 'POST' })
);
});
it('supports reject and cancel actions', async () => {
fetchSpy.mockResolvedValue({ id: 'j1', stage: 'dead_letter' });
await operatorAction('j1', 'reject');
expect(fetchSpy).toHaveBeenCalledWith(
'/jobs/j1/actions/reject',
expect.objectContaining({ method: 'POST' })
);
});
});
describe('getJobRuns', () => {
it('returns runs array', async () => {
fetchSpy.mockResolvedValue({ runs: [{ id: 'r1', attempt: 1 }] });

View File

@ -13,6 +13,8 @@ import {
getJobArtifacts,
getJobDag,
patchJob,
operatorAction,
type OperatorAction,
type FleetJob,
type FleetRun,
type FleetEvent,
@ -32,6 +34,7 @@ export default function FleetJobDetailPage() {
const [dag, setDag] = useState<DagNode | null>(null);
const [loading, setLoading] = useState(true);
const [shipping, setShipping] = useState(false);
const [acting, setActing] = useState<OperatorAction | null>(null);
const refresh = useCallback(async () => {
try {
@ -72,6 +75,20 @@ export default function FleetJobDetailPage() {
}
};
const handleAction = async (action: OperatorAction) => {
if (!job) return;
setActing(action);
try {
const updated = await operatorAction(jobId, action);
setJob(updated);
await refresh();
} catch {
/* show error in production */
} finally {
setActing(null);
}
};
if (loading) {
return (
<div className="p-6">
@ -97,11 +114,43 @@ export default function FleetJobDetailPage() {
<div className="p-6 space-y-8">
<div className="flex items-center justify-between">
<PageHeader title={job.idempotencyKey} />
{job.stage !== 'shipped' && job.stage !== 'failed' && (
<Button onClick={handleShip} disabled={shipping} aria-label="Ship this job">
{shipping ? 'Shipping...' : 'Ship ✓'}
</Button>
)}
<div className="flex items-center gap-2">
{job.stage !== 'shipped' && job.stage !== 'failed' && (
<Button onClick={handleShip} disabled={shipping} aria-label="Ship this job">
{shipping ? 'Shipping...' : 'Ship ✓'}
</Button>
)}
{job.stage !== 'shipped' && (
<Button
variant="secondary"
onClick={() => handleAction('requeue')}
disabled={acting !== null}
aria-label="Requeue this job"
>
{acting === 'requeue' ? 'Requeuing...' : 'Requeue'}
</Button>
)}
{job.stage !== 'shipped' && job.stage !== 'failed' && job.stage !== 'dead_letter' && (
<>
<Button
variant="destructive"
onClick={() => handleAction('cancel')}
disabled={acting !== null}
aria-label="Cancel this job"
>
{acting === 'cancel' ? 'Cancelling...' : 'Cancel'}
</Button>
<Button
variant="destructive"
onClick={() => handleAction('reject')}
disabled={acting !== null}
aria-label="Reject this job to dead letter"
>
{acting === 'reject' ? 'Rejecting...' : 'Reject'}
</Button>
</>
)}
</div>
</div>
{/* Job metadata */}

View File

@ -141,6 +141,16 @@ export async function patchJob(
return apiFetch(`/jobs/${id}`, { method: 'PATCH', body: JSON.stringify(body) });
}
export type OperatorAction = 'requeue' | 'reject' | 'cancel';
/**
* Operator-initiated lifecycle action (no lease required). The coordinator
* fences any current factory holder by bumping the lease epoch.
*/
export async function operatorAction(id: string, action: OperatorAction): Promise<FleetJob> {
return apiFetch(`/jobs/${id}/actions/${action}`, { method: 'POST' });
}
export async function getJobRuns(jobId: string): Promise<{ runs: FleetRun[] }> {
return apiFetch(`/jobs/${jobId}/runs`);
}

View File

@ -743,4 +743,81 @@ describe('fleet coordinator — Phase 3 per-product budgets', () => {
const claim = await coord.claimNextJob(factory({ productId: OTHER }));
expect(claim).not.toBeNull();
});
// ── Phase 3: OPERATOR ACTIONS (requeue / reject / cancel) ──
it('operator requeue: building job returns to queued, bumps epoch, fences the worker, preserves checkpoint', async () => {
const { job } = await coord.submitJob(PID, input());
const claim = await coord.claimNextJob(factory());
const epoch0 = claim!.job.leaseEpoch;
await coord.patchJobFenced(job.id, PID, {
leaseEpoch: epoch0,
stage: 'building',
checkpoint: { wipBranch: 'aq/wip/op', wipBase: 'b1', wipCommit: 'c1' },
});
const res = await coord.operatorAction(job.id, PID, 'requeue');
expect(res.ok).toBe(true);
const after = await repo.getJob(job.id, PID);
expect(after?.stage).toBe('queued');
expect(after?.leaseEpoch).toBe(epoch0 + 1); // fenced
expect(after?.checkpoint?.wipBranch).toBe('aq/wip/op'); // preserved
expect((await repo.getLease(job.id))?.status).toBe('released');
// the displaced worker (old epoch) can no longer write
const zombie = await coord.patchJobFenced(job.id, PID, {
leaseEpoch: epoch0,
stage: 'shipped',
});
expect(zombie.ok).toBe(false);
if (!zombie.ok) expect(zombie.reason).toBe('fenced');
});
it('operator requeue: a job with unmet deps returns to blocked, not queued', async () => {
await coord.submitJob(PID, input({ idempotencyKey: 'dep' }));
const { job } = await coord.submitJob(PID, input({ idempotencyKey: 'child', deps: ['dep'] }));
expect(job.stage).toBe('blocked'); // dep is still queued, not done
const res = await coord.operatorAction(job.id, PID, 'requeue');
expect(res.ok).toBe(true);
const after = await repo.getJob(job.id, PID);
expect(after?.stage).toBe('blocked');
expect(after?.blockedReason).toContain('dep');
});
it('operator reject: job moves to dead_letter', async () => {
const { job } = await coord.submitJob(PID, input());
await coord.claimNextJob(factory());
const res = await coord.operatorAction(job.id, PID, 'reject');
expect(res.ok).toBe(true);
expect((await repo.getJob(job.id, PID))?.stage).toBe('dead_letter');
});
it('operator cancel: job moves to failed and emits an operator_action event', async () => {
const { job } = await coord.submitJob(PID, input());
await coord.claimNextJob(factory());
const res = await coord.operatorAction(job.id, PID, 'cancel');
expect(res.ok).toBe(true);
expect((await repo.getJob(job.id, PID))?.stage).toBe('failed');
const events = await repo.listEvents(job.id);
const op = events.find(e => e.type === 'operator_action');
expect(op?.data.action).toBe('cancel');
expect(op?.actor).toBe('operator');
});
it('operator action: shipped job is terminal (invalid_state); unknown job is not_found', async () => {
const { job } = await coord.submitJob(PID, input());
const claim = await coord.claimNextJob(factory());
await coord.patchJobFenced(job.id, PID, {
leaseEpoch: claim!.job.leaseEpoch,
stage: 'shipped',
});
const shipped = await coord.operatorAction(job.id, PID, 'requeue');
expect(shipped.ok).toBe(false);
if (!shipped.ok) expect(shipped.reason).toBe('invalid_state');
const missing = await coord.operatorAction('nope', PID, 'requeue');
expect(missing.ok).toBe(false);
if (!missing.ok) expect(missing.reason).toBe('not_found');
});
});

View File

@ -828,6 +828,77 @@ export async function releaseLease(
return { ok: true, doc: res.doc };
}
// ── Operator actions (§14 Phase 3 — approve/ship/reject/requeue) ──────────────
export const OPERATOR_ACTIONS = ['requeue', 'reject', 'cancel'] as const;
export type OperatorActionKind = (typeof OPERATOR_ACTIONS)[number];
export type OperatorActionResult =
| { ok: true; doc: FleetJobDoc }
| { ok: false; reason: 'not_found' | 'conflict' | 'invalid_state' };
/**
* Operator-initiated lifecycle action from the control plane. Unlike
* `patchJobFenced`, the operator does NOT hold a lease so this bumps the
* `leaseEpoch` to FENCE any current factory holder (a late report from the
* displaced worker is rejected by `fenced()`), mirroring the reaper (§25.3).
*
* - `requeue`: return the job to `queued` (or `blocked` if deps unmet) for a
* fresh claim. The checkpoint pointer is preserved (resume-friendly).
* - `reject`: move the job to `dead_letter` (operator gives up on it).
* - `cancel`: move the job to `failed` (stop work, not a dead letter).
*
* A successfully shipped job is terminal and cannot be acted on.
*/
export async function operatorAction(
jobId: string,
productId: string,
action: OperatorActionKind
): Promise<OperatorActionResult> {
const job = await repo.getJob(jobId, productId);
if (!job) return { ok: false, reason: 'not_found' };
if (job.stage === 'shipped') return { ok: false, reason: 'invalid_state' };
const newEpoch = job.leaseEpoch + 1; // fence any current holder
let stage: FleetStage;
let blockedReason: string | undefined;
if (action === 'requeue') {
const unmet = await unmetDeps(job);
stage = unmet.length > 0 ? 'blocked' : 'queued';
blockedReason = unmet.length > 0 ? `waiting on: ${unmet.join(', ')}` : undefined;
} else if (action === 'reject') {
stage = 'dead_letter';
} else {
stage = 'failed';
}
const res = await repo.revUpdateJob(jobId, productId, job.rev, {
stage,
leaseEpoch: newEpoch,
blockedReason,
});
if (!res.ok) {
return { ok: false, reason: res.reason === 'not_found' ? 'not_found' : 'conflict' };
}
// Free the seat: release any still-held lease (best-effort, fenced by newEpoch).
const lease = await repo.getLease(jobId);
if (lease && lease.status === 'held') {
await repo.revUpdateLease(jobId, lease.rev, { status: 'released' });
}
await repo.appendEvent({
jobId,
productId,
type: 'operator_action',
actor: 'operator',
data: { action, leaseEpoch: newEpoch, returnedTo: stage },
});
return { ok: true, doc: res.doc };
}
// ── Heartbeat (§8) ────────────────────────────────────────────────────────────
export interface HeartbeatContext {

View File

@ -141,4 +141,28 @@ describe('fleetRoutes', () => {
const res = await app.inject({ method: 'GET', url: '/api/fleet/jobs/nope' });
expect(res.statusCode).toBe(404);
});
it('operator action: requeue returns a claimed job to queued; unknown action is 400', async () => {
const app = await buildApp();
const sub = await submit(app, { idempotencyKey: 'k1', bodyMd: '# task' });
const jobId = JSON.parse(sub.body).job.id as string;
await app.inject({
method: 'POST',
url: '/api/fleet/claim',
payload: { factoryId: 'fac_1', capabilities: [] },
});
const requeue = await app.inject({
method: 'POST',
url: `/api/fleet/jobs/${jobId}/actions/requeue`,
});
expect(requeue.statusCode).toBe(200);
expect(JSON.parse(requeue.body).stage).toBe('queued');
const bad = await app.inject({
method: 'POST',
url: `/api/fleet/jobs/${jobId}/actions/explode`,
});
expect(bad.statusCode).toBe(400);
});
});

View File

@ -110,7 +110,29 @@ export async function fleetRoutes(app: FastifyInstance) {
return res.doc;
});
// ── Atomic claim ──
// ── Operator actions (requeue / reject / cancel) ──
// Operator-initiated, lease-free lifecycle control from the fleet control plane.
// Fences any current factory holder by bumping the lease epoch (§14 Phase 3).
app.post('/fleet/jobs/:id/actions/:action', async req => {
await extractAuth(req);
const { id, action } = req.params as { id: string; action: string };
const pid = getRequestProductId(req);
if (!(coordinator.OPERATOR_ACTIONS as readonly string[]).includes(action)) {
throw new BadRequestError(
`unknown operator action '${action}' (expected: ${coordinator.OPERATOR_ACTIONS.join(', ')})`
);
}
const res = await coordinator.operatorAction(id, pid, action as coordinator.OperatorActionKind);
if (!res.ok) {
if (res.reason === 'not_found') throw new NotFoundError('Job not found');
if (res.reason === 'invalid_state') {
throw new ConflictError('job is shipped (terminal) — operator action not allowed');
}
throw new ConflictError('concurrent update conflict — retry');
}
await trackerBridge.maybeEchoOnTransition(pid, id, req.log);
return res.doc;
});
app.post('/fleet/claim', async req => {
await extractAuth(req);
const parsed = ClaimSchema.safeParse(req.body);