feat(fleet): operator re-drive for dead-letter jobs + dead-letter alert/UI
Closes the loop on the retry automation — a job that exhausts its retries lands in dead_letter with no way to recover it: - New `redrive` operator action: requeues the job AND grants a fresh retry budget by anchoring a new `attemptsBase` to the current `attempts` (and clearing any retryNotBefore backoff). `attempts` stays monotonic so run ids never collide; a plain `requeue` leaves the budget exhausted and would instantly re-dead-letter. The retry policy now measures used budget as `attempts - attemptsBase`. - fleetMetrics raises a `dead_letter` warning alert when any job is dead-lettered. - tracker-web: a "Re-drive" button on dead_letter/failed jobs; the timeline already renders the retry_scheduled / dead_letter / pr_merged / pr_merge_failed / factory_stale events generically. Backward compatible: attemptsBase defaults to 0 and old docs without it read as 0. Generated with [Devin](https://cli.devin.ai/docs) Co-Authored-By: Devin <158243242+devin-ai-integration[bot]@users.noreply.github.com>
This commit is contained in:
parent
9e0afc23d2
commit
a6adaee835
@ -229,6 +229,16 @@ export default function FleetJobDetailPage() {
|
||||
{acting === 'requeue' ? 'Requeuing...' : 'Requeue'}
|
||||
</Button>
|
||||
)}
|
||||
{(job.stage === 'dead_letter' || job.stage === 'failed') && (
|
||||
<Button
|
||||
onClick={() => handleAction('redrive')}
|
||||
disabled={acting !== null}
|
||||
aria-label="Re-drive this job with a fresh retry budget"
|
||||
title="Requeue with a fresh retry budget (resets the attempt counter)"
|
||||
>
|
||||
{acting === 'redrive' ? 'Re-driving...' : 'Re-drive ↻'}
|
||||
</Button>
|
||||
)}
|
||||
{job.stage !== 'shipped' && job.stage !== 'failed' && job.stage !== 'dead_letter' && (
|
||||
<>
|
||||
<Button
|
||||
|
||||
@ -260,7 +260,7 @@ export async function patchJob(id: string, body: PatchJobBody): Promise<FleetJob
|
||||
return apiFetch(`/jobs/${id}`, { method: 'PATCH', body: JSON.stringify(body) });
|
||||
}
|
||||
|
||||
export type OperatorAction = 'requeue' | 'reject' | 'cancel' | 'ship';
|
||||
export type OperatorAction = 'requeue' | 'reject' | 'cancel' | 'ship' | 'redrive';
|
||||
|
||||
/**
|
||||
* Operator-initiated lifecycle action (no lease required). The coordinator
|
||||
|
||||
@ -228,6 +228,7 @@ export async function submitJob(productId: string, input: SubmitJobInput): Promi
|
||||
verify: input.verify,
|
||||
autoMerge: input.autoMerge,
|
||||
attempts: 0,
|
||||
attemptsBase: 0,
|
||||
leaseEpoch: 0,
|
||||
rev: 0,
|
||||
createdAt: now,
|
||||
@ -290,6 +291,7 @@ export async function submitJob(productId: string, input: SubmitJobInput): Promi
|
||||
kind: 'leaf',
|
||||
parentId: id,
|
||||
attempts: 0,
|
||||
attemptsBase: 0,
|
||||
leaseEpoch: 0,
|
||||
rev: 0,
|
||||
createdAt: childNow,
|
||||
@ -960,6 +962,7 @@ export async function submitChildren(
|
||||
kind: 'leaf',
|
||||
parentId,
|
||||
attempts: 0,
|
||||
attemptsBase: 0,
|
||||
leaseEpoch: 0,
|
||||
rev: 0,
|
||||
createdAt: now,
|
||||
@ -1114,8 +1117,10 @@ export function decideFailureOutcome(job: FleetJobDoc, result?: string): Failure
|
||||
const retryable =
|
||||
!!policy && policy.max > 0 && classes.length > 0 && classes.some(c => on.includes(c));
|
||||
if (!retryable) return { kind: 'failed' };
|
||||
if (job.attempts >= policy!.max) return { kind: 'dead_letter' };
|
||||
return { kind: 'retry', backoffMs: parseBackoffMs(policy!.backoff, job.attempts) };
|
||||
// Budget is consumed since the last redrive: attempts - attemptsBase.
|
||||
const used = job.attempts - (job.attemptsBase ?? 0);
|
||||
if (used >= policy!.max) return { kind: 'dead_letter' };
|
||||
return { kind: 'retry', backoffMs: parseBackoffMs(policy!.backoff, used) };
|
||||
}
|
||||
|
||||
export async function releaseLease(
|
||||
@ -1202,7 +1207,7 @@ export async function releaseLease(
|
||||
|
||||
// ── Operator actions (§14 Phase 3 — approve/ship/reject/requeue) ──────────────
|
||||
|
||||
export const OPERATOR_ACTIONS = ['requeue', 'reject', 'cancel', 'ship'] as const;
|
||||
export const OPERATOR_ACTIONS = ['requeue', 'reject', 'cancel', 'ship', 'redrive'] as const;
|
||||
export type OperatorActionKind = (typeof OPERATOR_ACTIONS)[number];
|
||||
|
||||
/** Bounded retries for the operator-action optimistic-concurrency loop. */
|
||||
@ -1220,6 +1225,11 @@ export type OperatorActionResult =
|
||||
*
|
||||
* - `requeue`: return the job to `queued` (or `blocked` if deps unmet) for a
|
||||
* fresh claim. The checkpoint pointer is preserved (resume-friendly).
|
||||
* - `redrive`: like `requeue`, but ALSO grants a FRESH retry budget by anchoring
|
||||
* `attemptsBase` to the current `attempts` and clearing any `retryNotBefore`
|
||||
* backoff. (`attempts` stays monotonic so run ids never collide.) This is how
|
||||
* an operator re-drives a `dead_letter`/`failed` job — a plain requeue leaves
|
||||
* the budget exhausted, so the job would immediately dead-letter again.
|
||||
* - `reject`: move the job to `dead_letter` (operator gives up on it).
|
||||
* - `cancel`: move the job to `failed` (stop work, not a dead letter).
|
||||
*
|
||||
@ -1248,7 +1258,10 @@ export async function operatorAction(
|
||||
|
||||
let stage: FleetStage;
|
||||
let blockedReason: string | undefined;
|
||||
if (action === 'requeue') {
|
||||
// `redrive` additionally resets the retry budget so the job gets a full set of
|
||||
// attempts again (a plain requeue keeps attempts at max → instant re-dead-letter).
|
||||
const resetRetry = action === 'redrive';
|
||||
if (action === 'requeue' || action === 'redrive') {
|
||||
const unmet = await unmetDeps(job);
|
||||
stage = unmet.length > 0 ? 'blocked' : 'queued';
|
||||
blockedReason = unmet.length > 0 ? `waiting on: ${unmet.join(', ')}` : undefined;
|
||||
@ -1266,6 +1279,7 @@ export async function operatorAction(
|
||||
stage,
|
||||
leaseEpoch: newEpoch,
|
||||
blockedReason,
|
||||
...(resetRetry ? { attemptsBase: job.attempts, retryNotBefore: undefined } : {}),
|
||||
});
|
||||
if (!res.ok) {
|
||||
if (res.reason === 'not_found') return { ok: false, reason: 'not_found' };
|
||||
@ -1859,6 +1873,14 @@ export async function fleetMetrics(
|
||||
message: `${stale} factory(ies) have not sent a heartbeat within ${Math.round(staleMaxAgeMs / 1000)}s.`,
|
||||
});
|
||||
}
|
||||
const deadLetter = byStage.dead_letter ?? 0;
|
||||
if (deadLetter > 0) {
|
||||
alerts.push({
|
||||
level: 'warning',
|
||||
code: 'dead_letter',
|
||||
message: `${deadLetter} job(s) in dead_letter — exhausted retries or rejected; needs operator triage.`,
|
||||
});
|
||||
}
|
||||
|
||||
return {
|
||||
productId,
|
||||
|
||||
@ -151,4 +151,54 @@ describe('fleet retry — releaseLease enforcement', () => {
|
||||
expect(c2?.job.id).toBe(job.id);
|
||||
expect(c2?.job.attempts).toBe(2);
|
||||
});
|
||||
|
||||
it('operator redrive grants a dead_letter job a FRESH retry budget (queued, no backoff)', async () => {
|
||||
const { job } = await coord.submitJob(PID, input({ retry: { max: 2, on: ['timeout'] } }));
|
||||
// Burn the whole budget: two attempts → dead_letter.
|
||||
const c1 = await claim();
|
||||
await coord.releaseLease(job.id, PID, c1.job.leaseEpoch, 'failed', { result: 'timeout' });
|
||||
const c2 = await coord.claimNextJob(factory());
|
||||
await coord.releaseLease(job.id, PID, c2!.job.leaseEpoch, 'failed', { result: 'timeout' });
|
||||
expect((await repo.getJob(job.id, PID))?.stage).toBe('dead_letter');
|
||||
|
||||
const res = await coord.operatorAction(job.id, PID, 'redrive');
|
||||
expect(res.ok).toBe(true);
|
||||
const after = await repo.getJob(job.id, PID);
|
||||
expect(after?.stage).toBe('queued');
|
||||
expect(after?.attemptsBase).toBe(after?.attempts); // budget anchored to now
|
||||
expect(after?.retryNotBefore).toBeUndefined(); // claimable immediately
|
||||
|
||||
// Fresh budget: the next failure RETRIES (used 1 < max 2) instead of dead-lettering,
|
||||
// and the re-claim doesn't collide on run id (attempts stays monotonic).
|
||||
const c3 = await coord.claimNextJob(factory());
|
||||
expect(c3?.job.id).toBe(job.id);
|
||||
expect(c3!.job.attempts).toBe(3); // monotonic
|
||||
await coord.releaseLease(job.id, PID, c3!.job.leaseEpoch, 'failed', { result: 'timeout' });
|
||||
expect((await repo.getJob(job.id, PID))?.stage).toBe('queued');
|
||||
});
|
||||
|
||||
it('a plain requeue leaves the budget exhausted (re-dead-letters); redrive resets it', async () => {
|
||||
const { job } = await coord.submitJob(PID, input({ retry: { max: 1, on: ['timeout'] } }));
|
||||
const c1 = await claim();
|
||||
await coord.releaseLease(job.id, PID, c1.job.leaseEpoch, 'failed', { result: 'timeout' });
|
||||
// requeue: budget still exhausted → one more attempt re-dead-letters.
|
||||
await coord.operatorAction(job.id, PID, 'requeue');
|
||||
const c2 = await coord.claimNextJob(factory());
|
||||
await coord.releaseLease(job.id, PID, c2!.job.leaseEpoch, 'failed', { result: 'timeout' });
|
||||
expect((await repo.getJob(job.id, PID))?.stage).toBe('dead_letter');
|
||||
// redrive: anchors the budget so the next attempt is fresh again.
|
||||
await coord.operatorAction(job.id, PID, 'redrive');
|
||||
const j = await repo.getJob(job.id, PID);
|
||||
expect(j?.attemptsBase).toBe(j?.attempts);
|
||||
expect(j?.stage).toBe('queued');
|
||||
});
|
||||
|
||||
it('fleetMetrics raises a dead_letter alert when jobs are dead-lettered', async () => {
|
||||
const { job } = await coord.submitJob(PID, input({ retry: { max: 1, on: ['timeout'] } }));
|
||||
const c = await claim();
|
||||
await coord.releaseLease(job.id, PID, c.job.leaseEpoch, 'failed', { result: 'timeout' });
|
||||
const metrics = await coord.fleetMetrics(PID);
|
||||
expect(metrics.jobs.byStage.dead_letter).toBe(1);
|
||||
expect(metrics.alerts.map(a => a.code)).toContain('dead_letter');
|
||||
});
|
||||
});
|
||||
|
||||
@ -204,6 +204,13 @@ export const FleetJobDocSchema = z.object({
|
||||
autoMerge: z.boolean().optional(),
|
||||
checkpoint: CheckpointSchema.optional(),
|
||||
attempts: z.number().int().nonnegative().default(0),
|
||||
/**
|
||||
* Attempt count at which the CURRENT retry budget started (operator `redrive`
|
||||
* sets this to `attempts` to grant a fresh budget). The retry policy compares
|
||||
* `attempts - attemptsBase` against `retry.max`, so `attempts` stays monotonic
|
||||
* (run ids are keyed by it) while the budget can be reset. Default 0.
|
||||
*/
|
||||
attemptsBase: z.number().int().nonnegative().default(0),
|
||||
leaseEpoch: z.number().int().nonnegative().default(0),
|
||||
rev: z.number().int().nonnegative().default(0),
|
||||
blockedReason: z.string().optional(),
|
||||
|
||||
Loading…
Reference in New Issue
Block a user