From 283383561c88b3ffff3ec68db6aa60056b46426f Mon Sep 17 00:00:00 2001 From: Saravanakumar D Date: Sat, 30 May 2026 18:12:11 -0700 Subject: [PATCH] feat: complete operator job actions (requeue/reject/cancel) Adds lease-free operator lifecycle control to the fleet control plane: - coordinator.operatorAction() fences any current factory holder by bumping leaseEpoch (mirrors the reaper), preserves checkpoint, and routes the job: requeue -> queued (or blocked if deps unmet), reject -> dead_letter, cancel -> failed. Shipped jobs are terminal (invalid_state). - POST /fleet/jobs/:id/actions/:action route (400 on unknown action) - fleet-client.operatorAction() + Requeue/Cancel/Reject buttons on job detail - Tests: +5 coordinator, +1 routes, +2 fleet-client (platform fleet 140 green, tracker-web 212 green) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../src/__tests__/fleet-client.test.ts | 22 ++++++ .../app/dashboard/fleet/jobs/[id]/page.tsx | 59 ++++++++++++-- .../tracker-web/src/lib/fleet-client.ts | 10 +++ .../src/modules/fleet/coordinator.test.ts | 77 +++++++++++++++++++ .../src/modules/fleet/coordinator.ts | 71 +++++++++++++++++ .../src/modules/fleet/routes.test.ts | 24 ++++++ .../src/modules/fleet/routes.ts | 24 +++++- 7 files changed, 281 insertions(+), 6 deletions(-) diff --git a/dashboards/tracker-web/src/__tests__/fleet-client.test.ts b/dashboards/tracker-web/src/__tests__/fleet-client.test.ts index 1ff89021..53c067cf 100644 --- a/dashboards/tracker-web/src/__tests__/fleet-client.test.ts +++ b/dashboards/tracker-web/src/__tests__/fleet-client.test.ts @@ -15,6 +15,7 @@ import { listJobs, getJob, patchJob, + operatorAction, getJobRuns, getJobEvents, getJobArtifacts, @@ -76,6 +77,27 @@ describe('fleet-client', () => { }); }); + describe('operatorAction', () => { + it('sends POST to /jobs/:id/actions/:action', async () => { + fetchSpy.mockResolvedValue({ id: 'j1', stage: 'queued' }); + const res = await operatorAction('j1', 'requeue'); + expect(res.stage).toBe('queued'); + expect(fetchSpy).toHaveBeenCalledWith( + '/jobs/j1/actions/requeue', + expect.objectContaining({ method: 'POST' }) + ); + }); + + it('supports reject and cancel actions', async () => { + fetchSpy.mockResolvedValue({ id: 'j1', stage: 'dead_letter' }); + await operatorAction('j1', 'reject'); + expect(fetchSpy).toHaveBeenCalledWith( + '/jobs/j1/actions/reject', + expect.objectContaining({ method: 'POST' }) + ); + }); + }); + describe('getJobRuns', () => { it('returns runs array', async () => { fetchSpy.mockResolvedValue({ runs: [{ id: 'r1', attempt: 1 }] }); diff --git a/dashboards/tracker-web/src/app/dashboard/fleet/jobs/[id]/page.tsx b/dashboards/tracker-web/src/app/dashboard/fleet/jobs/[id]/page.tsx index 2909428f..b6f8603b 100644 --- a/dashboards/tracker-web/src/app/dashboard/fleet/jobs/[id]/page.tsx +++ b/dashboards/tracker-web/src/app/dashboard/fleet/jobs/[id]/page.tsx @@ -13,6 +13,8 @@ import { getJobArtifacts, getJobDag, patchJob, + operatorAction, + type OperatorAction, type FleetJob, type FleetRun, type FleetEvent, @@ -32,6 +34,7 @@ export default function FleetJobDetailPage() { const [dag, setDag] = useState(null); const [loading, setLoading] = useState(true); const [shipping, setShipping] = useState(false); + const [acting, setActing] = useState(null); const refresh = useCallback(async () => { try { @@ -72,6 +75,20 @@ export default function FleetJobDetailPage() { } }; + const handleAction = async (action: OperatorAction) => { + if (!job) return; + setActing(action); + try { + const updated = await operatorAction(jobId, action); + setJob(updated); + await refresh(); + } catch { + /* show error in production */ + } finally { + setActing(null); + } + }; + if (loading) { return (
@@ -97,11 +114,43 @@ export default function FleetJobDetailPage() {
- {job.stage !== 'shipped' && job.stage !== 'failed' && ( - - )} +
+ {job.stage !== 'shipped' && job.stage !== 'failed' && ( + + )} + {job.stage !== 'shipped' && ( + + )} + {job.stage !== 'shipped' && job.stage !== 'failed' && job.stage !== 'dead_letter' && ( + <> + + + + )} +
{/* Job metadata */} diff --git a/dashboards/tracker-web/src/lib/fleet-client.ts b/dashboards/tracker-web/src/lib/fleet-client.ts index 3ba0fbb1..46f512d9 100644 --- a/dashboards/tracker-web/src/lib/fleet-client.ts +++ b/dashboards/tracker-web/src/lib/fleet-client.ts @@ -141,6 +141,16 @@ export async function patchJob( return apiFetch(`/jobs/${id}`, { method: 'PATCH', body: JSON.stringify(body) }); } +export type OperatorAction = 'requeue' | 'reject' | 'cancel'; + +/** + * Operator-initiated lifecycle action (no lease required). The coordinator + * fences any current factory holder by bumping the lease epoch. + */ +export async function operatorAction(id: string, action: OperatorAction): Promise { + return apiFetch(`/jobs/${id}/actions/${action}`, { method: 'POST' }); +} + export async function getJobRuns(jobId: string): Promise<{ runs: FleetRun[] }> { return apiFetch(`/jobs/${jobId}/runs`); } diff --git a/services/platform-service/src/modules/fleet/coordinator.test.ts b/services/platform-service/src/modules/fleet/coordinator.test.ts index 35dc4bfb..38272583 100644 --- a/services/platform-service/src/modules/fleet/coordinator.test.ts +++ b/services/platform-service/src/modules/fleet/coordinator.test.ts @@ -743,4 +743,81 @@ describe('fleet coordinator — Phase 3 per-product budgets', () => { const claim = await coord.claimNextJob(factory({ productId: OTHER })); expect(claim).not.toBeNull(); }); + + // ── Phase 3: OPERATOR ACTIONS (requeue / reject / cancel) ── + it('operator requeue: building job returns to queued, bumps epoch, fences the worker, preserves checkpoint', async () => { + const { job } = await coord.submitJob(PID, input()); + const claim = await coord.claimNextJob(factory()); + const epoch0 = claim!.job.leaseEpoch; + await coord.patchJobFenced(job.id, PID, { + leaseEpoch: epoch0, + stage: 'building', + checkpoint: { wipBranch: 'aq/wip/op', wipBase: 'b1', wipCommit: 'c1' }, + }); + + const res = await coord.operatorAction(job.id, PID, 'requeue'); + expect(res.ok).toBe(true); + + const after = await repo.getJob(job.id, PID); + expect(after?.stage).toBe('queued'); + expect(after?.leaseEpoch).toBe(epoch0 + 1); // fenced + expect(after?.checkpoint?.wipBranch).toBe('aq/wip/op'); // preserved + expect((await repo.getLease(job.id))?.status).toBe('released'); + + // the displaced worker (old epoch) can no longer write + const zombie = await coord.patchJobFenced(job.id, PID, { + leaseEpoch: epoch0, + stage: 'shipped', + }); + expect(zombie.ok).toBe(false); + if (!zombie.ok) expect(zombie.reason).toBe('fenced'); + }); + + it('operator requeue: a job with unmet deps returns to blocked, not queued', async () => { + await coord.submitJob(PID, input({ idempotencyKey: 'dep' })); + const { job } = await coord.submitJob(PID, input({ idempotencyKey: 'child', deps: ['dep'] })); + expect(job.stage).toBe('blocked'); // dep is still queued, not done + + const res = await coord.operatorAction(job.id, PID, 'requeue'); + expect(res.ok).toBe(true); + const after = await repo.getJob(job.id, PID); + expect(after?.stage).toBe('blocked'); + expect(after?.blockedReason).toContain('dep'); + }); + + it('operator reject: job moves to dead_letter', async () => { + const { job } = await coord.submitJob(PID, input()); + await coord.claimNextJob(factory()); + const res = await coord.operatorAction(job.id, PID, 'reject'); + expect(res.ok).toBe(true); + expect((await repo.getJob(job.id, PID))?.stage).toBe('dead_letter'); + }); + + it('operator cancel: job moves to failed and emits an operator_action event', async () => { + const { job } = await coord.submitJob(PID, input()); + await coord.claimNextJob(factory()); + const res = await coord.operatorAction(job.id, PID, 'cancel'); + expect(res.ok).toBe(true); + expect((await repo.getJob(job.id, PID))?.stage).toBe('failed'); + const events = await repo.listEvents(job.id); + const op = events.find(e => e.type === 'operator_action'); + expect(op?.data.action).toBe('cancel'); + expect(op?.actor).toBe('operator'); + }); + + it('operator action: shipped job is terminal (invalid_state); unknown job is not_found', async () => { + const { job } = await coord.submitJob(PID, input()); + const claim = await coord.claimNextJob(factory()); + await coord.patchJobFenced(job.id, PID, { + leaseEpoch: claim!.job.leaseEpoch, + stage: 'shipped', + }); + const shipped = await coord.operatorAction(job.id, PID, 'requeue'); + expect(shipped.ok).toBe(false); + if (!shipped.ok) expect(shipped.reason).toBe('invalid_state'); + + const missing = await coord.operatorAction('nope', PID, 'requeue'); + expect(missing.ok).toBe(false); + if (!missing.ok) expect(missing.reason).toBe('not_found'); + }); }); diff --git a/services/platform-service/src/modules/fleet/coordinator.ts b/services/platform-service/src/modules/fleet/coordinator.ts index 604ad2c7..8df0689d 100644 --- a/services/platform-service/src/modules/fleet/coordinator.ts +++ b/services/platform-service/src/modules/fleet/coordinator.ts @@ -828,6 +828,77 @@ export async function releaseLease( return { ok: true, doc: res.doc }; } +// ── Operator actions (§14 Phase 3 — approve/ship/reject/requeue) ────────────── + +export const OPERATOR_ACTIONS = ['requeue', 'reject', 'cancel'] as const; +export type OperatorActionKind = (typeof OPERATOR_ACTIONS)[number]; + +export type OperatorActionResult = + | { ok: true; doc: FleetJobDoc } + | { ok: false; reason: 'not_found' | 'conflict' | 'invalid_state' }; + +/** + * Operator-initiated lifecycle action from the control plane. Unlike + * `patchJobFenced`, the operator does NOT hold a lease — so this bumps the + * `leaseEpoch` to FENCE any current factory holder (a late report from the + * displaced worker is rejected by `fenced()`), mirroring the reaper (§25.3). + * + * - `requeue`: return the job to `queued` (or `blocked` if deps unmet) for a + * fresh claim. The checkpoint pointer is preserved (resume-friendly). + * - `reject`: move the job to `dead_letter` (operator gives up on it). + * - `cancel`: move the job to `failed` (stop work, not a dead letter). + * + * A successfully shipped job is terminal and cannot be acted on. + */ +export async function operatorAction( + jobId: string, + productId: string, + action: OperatorActionKind +): Promise { + const job = await repo.getJob(jobId, productId); + if (!job) return { ok: false, reason: 'not_found' }; + if (job.stage === 'shipped') return { ok: false, reason: 'invalid_state' }; + + const newEpoch = job.leaseEpoch + 1; // fence any current holder + + let stage: FleetStage; + let blockedReason: string | undefined; + if (action === 'requeue') { + const unmet = await unmetDeps(job); + stage = unmet.length > 0 ? 'blocked' : 'queued'; + blockedReason = unmet.length > 0 ? `waiting on: ${unmet.join(', ')}` : undefined; + } else if (action === 'reject') { + stage = 'dead_letter'; + } else { + stage = 'failed'; + } + + const res = await repo.revUpdateJob(jobId, productId, job.rev, { + stage, + leaseEpoch: newEpoch, + blockedReason, + }); + if (!res.ok) { + return { ok: false, reason: res.reason === 'not_found' ? 'not_found' : 'conflict' }; + } + + // Free the seat: release any still-held lease (best-effort, fenced by newEpoch). + const lease = await repo.getLease(jobId); + if (lease && lease.status === 'held') { + await repo.revUpdateLease(jobId, lease.rev, { status: 'released' }); + } + + await repo.appendEvent({ + jobId, + productId, + type: 'operator_action', + actor: 'operator', + data: { action, leaseEpoch: newEpoch, returnedTo: stage }, + }); + + return { ok: true, doc: res.doc }; +} + // ── Heartbeat (§8) ──────────────────────────────────────────────────────────── export interface HeartbeatContext { diff --git a/services/platform-service/src/modules/fleet/routes.test.ts b/services/platform-service/src/modules/fleet/routes.test.ts index b0aa7380..de2f7e87 100644 --- a/services/platform-service/src/modules/fleet/routes.test.ts +++ b/services/platform-service/src/modules/fleet/routes.test.ts @@ -141,4 +141,28 @@ describe('fleetRoutes', () => { const res = await app.inject({ method: 'GET', url: '/api/fleet/jobs/nope' }); expect(res.statusCode).toBe(404); }); + + it('operator action: requeue returns a claimed job to queued; unknown action is 400', async () => { + const app = await buildApp(); + const sub = await submit(app, { idempotencyKey: 'k1', bodyMd: '# task' }); + const jobId = JSON.parse(sub.body).job.id as string; + await app.inject({ + method: 'POST', + url: '/api/fleet/claim', + payload: { factoryId: 'fac_1', capabilities: [] }, + }); + + const requeue = await app.inject({ + method: 'POST', + url: `/api/fleet/jobs/${jobId}/actions/requeue`, + }); + expect(requeue.statusCode).toBe(200); + expect(JSON.parse(requeue.body).stage).toBe('queued'); + + const bad = await app.inject({ + method: 'POST', + url: `/api/fleet/jobs/${jobId}/actions/explode`, + }); + expect(bad.statusCode).toBe(400); + }); }); diff --git a/services/platform-service/src/modules/fleet/routes.ts b/services/platform-service/src/modules/fleet/routes.ts index ce8affd7..0c608d03 100644 --- a/services/platform-service/src/modules/fleet/routes.ts +++ b/services/platform-service/src/modules/fleet/routes.ts @@ -110,7 +110,29 @@ export async function fleetRoutes(app: FastifyInstance) { return res.doc; }); - // ── Atomic claim ── + // ── Operator actions (requeue / reject / cancel) ── + // Operator-initiated, lease-free lifecycle control from the fleet control plane. + // Fences any current factory holder by bumping the lease epoch (§14 Phase 3). + app.post('/fleet/jobs/:id/actions/:action', async req => { + await extractAuth(req); + const { id, action } = req.params as { id: string; action: string }; + const pid = getRequestProductId(req); + if (!(coordinator.OPERATOR_ACTIONS as readonly string[]).includes(action)) { + throw new BadRequestError( + `unknown operator action '${action}' (expected: ${coordinator.OPERATOR_ACTIONS.join(', ')})` + ); + } + const res = await coordinator.operatorAction(id, pid, action as coordinator.OperatorActionKind); + if (!res.ok) { + if (res.reason === 'not_found') throw new NotFoundError('Job not found'); + if (res.reason === 'invalid_state') { + throw new ConflictError('job is shipped (terminal) — operator action not allowed'); + } + throw new ConflictError('concurrent update conflict — retry'); + } + await trackerBridge.maybeEchoOnTransition(pid, id, req.log); + return res.doc; + }); app.post('/fleet/claim', async req => { await extractAuth(req); const parsed = ClaimSchema.safeParse(req.body);