fix: harden operator action lease release + idempotent terminal actions

- release lease with fenced epoch (leaseEpoch+1, clear holder) so a stale
  renewal cannot resurrect a held lease after operator displacement
- reject on dead_letter / cancel on failed are now idempotent no-ops
  (no epoch bump, no duplicate event)
- add coordinator test for terminal idempotency

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
Saravanakumar D 2026-05-30 18:15:26 -07:00
parent 283383561c
commit c8ab43d3ae
2 changed files with 29 additions and 2 deletions

View File

@ -820,4 +820,21 @@ describe('fleet coordinator — Phase 3 per-product budgets', () => {
expect(missing.ok).toBe(false);
if (!missing.ok) expect(missing.reason).toBe('not_found');
});
it('operator reject/cancel are idempotent on already-terminal jobs (no epoch bump)', async () => {
const { job } = await coord.submitJob(PID, input());
await coord.claimNextJob(factory());
await coord.operatorAction(job.id, PID, 'reject');
const afterFirst = await repo.getJob(job.id, PID);
const epochAfterReject = afterFirst!.leaseEpoch;
// re-reject is a no-op: stage stays dead_letter, epoch unchanged, no new event
const again = await coord.operatorAction(job.id, PID, 'reject');
expect(again.ok).toBe(true);
const afterSecond = await repo.getJob(job.id, PID);
expect(afterSecond?.stage).toBe('dead_letter');
expect(afterSecond?.leaseEpoch).toBe(epochAfterReject);
const rejectEvents = (await repo.listEvents(job.id)).filter(e => e.type === 'operator_action');
expect(rejectEvents).toHaveLength(1);
});
});

View File

@ -859,6 +859,11 @@ export async function operatorAction(
if (!job) return { ok: false, reason: 'not_found' };
if (job.stage === 'shipped') return { ok: false, reason: 'invalid_state' };
// Idempotent terminal actions: re-rejecting a dead_letter / re-cancelling a
// failed job is a no-op (don't bump the epoch or append a duplicate event).
if (action === 'reject' && job.stage === 'dead_letter') return { ok: true, doc: job };
if (action === 'cancel' && job.stage === 'failed') return { ok: true, doc: job };
const newEpoch = job.leaseEpoch + 1; // fence any current holder
let stage: FleetStage;
@ -882,10 +887,15 @@ export async function operatorAction(
return { ok: false, reason: res.reason === 'not_found' ? 'not_found' : 'conflict' };
}
// Free the seat: release any still-held lease (best-effort, fenced by newEpoch).
// Free the seat: release any still-held lease, fencing it with the new epoch so
// a stale renewal cannot resurrect it (mirrors the reaper, §25.3).
const lease = await repo.getLease(jobId);
if (lease && lease.status === 'held') {
await repo.revUpdateLease(jobId, lease.rev, { status: 'released' });
await repo.revUpdateLease(jobId, lease.rev, {
status: 'released',
leaseEpoch: newEpoch,
holderFactoryId: undefined,
});
}
await repo.appendEvent({