feat(fleet): enforce job.retry — auto-requeue, backoff, and dead-letter
job.retry (max / on / backoff) was persisted but never enforced: a failed attempt just went to `failed` and required a manual operator requeue. Now, when a factory releases a lease reporting a failure, the coordinator applies the policy: - retryable result (matches a retry.on class) with attempts remaining ⇒ requeue (queued, or blocked if deps are now unmet) with a retry backoff; - retryable but attempts exhausted ⇒ dead_letter; - no policy or non-retryable result (capability_mismatch/no_engine) ⇒ failed, exactly as before (behavior-preserving). Backoff is honored via a new job.retryNotBefore timestamp; the scheduler skips a queued job until it elapses (new pure isAwaitingRetryBackoff gate in selectJob). parseBackoffMs supports "<n>", "<n>s|m|h", "<n>ms", and "exp" (30s·2^(n-1), capped 1h). retry_scheduled / dead_letter audit events are emitted. decideFailureOutcome and parseBackoffMs are pure and unit-tested (retry.test.ts), plus scheduler-gate and end-to-end releaseLease coverage. Generated with [Devin](https://cli.devin.ai/docs) Co-Authored-By: Devin <158243242+devin-ai-integration[bot]@users.noreply.github.com>
This commit is contained in:
parent
68bfa3dbd8
commit
42d27d8a4f
@ -1018,6 +1018,79 @@ export async function renewLease(
|
||||
return { ok: true, doc: res.doc };
|
||||
}
|
||||
|
||||
/** Run results that denote a failed attempt (vs. a successful review/testing/shipped). */
|
||||
const FAILURE_RESULTS: readonly string[] = [
|
||||
'failed',
|
||||
'timeout',
|
||||
'verify_failed',
|
||||
'capability_mismatch',
|
||||
'no_engine',
|
||||
'retries_exhausted',
|
||||
];
|
||||
|
||||
/** Map a run result onto the retry classes (`retry.on`) it can satisfy. Deterministic
|
||||
* config failures (capability_mismatch/no_engine) map to nothing — retrying can't help. */
|
||||
function retryClassesForResult(result?: string): string[] {
|
||||
switch (result) {
|
||||
case 'timeout':
|
||||
return ['timeout'];
|
||||
case 'verify_failed':
|
||||
return ['verify_failed'];
|
||||
case 'failed':
|
||||
return ['agent_error', 'crash'];
|
||||
default:
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
/** Parse a `retry.backoff` hint into milliseconds for the given (1-based) attempt.
|
||||
* Supports `"<n>"` (seconds), `"<n>s|m|h"`, and `"exp"`/`"exponential"` (30s·2^(n-1),
|
||||
* capped at 1h). Anything unparseable ⇒ 0 (immediate requeue). */
|
||||
export function parseBackoffMs(backoff: string | undefined, attempt: number): number {
|
||||
if (!backoff) return 0;
|
||||
const b = backoff.trim().toLowerCase();
|
||||
if (b === 'exp' || b === 'exponential') {
|
||||
const base = 30_000;
|
||||
return Math.min(base * 2 ** Math.max(0, attempt - 1), 60 * 60 * 1000);
|
||||
}
|
||||
const m = b.match(/^(\d+)\s*(ms|s|m|h)?$/);
|
||||
if (!m) return 0;
|
||||
const n = Number(m[1]);
|
||||
switch (m[2]) {
|
||||
case 'ms':
|
||||
return n;
|
||||
case 'm':
|
||||
return n * 60_000;
|
||||
case 'h':
|
||||
return n * 60 * 60_000;
|
||||
case 's':
|
||||
default:
|
||||
return n * 1_000; // bare number ⇒ seconds
|
||||
}
|
||||
}
|
||||
|
||||
export type FailureDecision =
|
||||
| { kind: 'retry'; backoffMs: number }
|
||||
| { kind: 'dead_letter' }
|
||||
| { kind: 'failed' };
|
||||
|
||||
/**
|
||||
* Decide what becomes of a job whose attempt failed with `result`, per its
|
||||
* `retry` policy: retry (when the result matches a `retry.on` class and attempts
|
||||
* remain), dead-letter (matching class but attempts exhausted), or plain failed
|
||||
* (no policy / non-retryable result — preserves the prior behavior). Pure.
|
||||
*/
|
||||
export function decideFailureOutcome(job: FleetJobDoc, result?: string): FailureDecision {
|
||||
const policy = job.retry;
|
||||
const classes = retryClassesForResult(result);
|
||||
const on = (policy?.on ?? []) as readonly string[];
|
||||
const retryable =
|
||||
!!policy && policy.max > 0 && classes.length > 0 && classes.some(c => on.includes(c));
|
||||
if (!retryable) return { kind: 'failed' };
|
||||
if (job.attempts >= policy!.max) return { kind: 'dead_letter' };
|
||||
return { kind: 'retry', backoffMs: parseBackoffMs(policy!.backoff, job.attempts) };
|
||||
}
|
||||
|
||||
export async function releaseLease(
|
||||
jobId: string,
|
||||
productId: string,
|
||||
@ -1038,7 +1111,49 @@ export async function releaseLease(
|
||||
if (!lease) return { ok: false, reason: 'not_found' };
|
||||
const res = await repo.revUpdateLease(jobId, lease.rev, { status: 'released' });
|
||||
if (!res.ok) return { ok: false, reason: 'conflict' };
|
||||
if (stage) await repo.revUpdateJob(jobId, productId, job.rev, { stage });
|
||||
|
||||
// A failed attempt (by reported result, or an explicit `failed` stage) is routed
|
||||
// through the retry policy: auto-requeue with backoff, dead-letter when attempts
|
||||
// are exhausted, else plain `failed` (no policy / non-retryable). Success stages
|
||||
// (review/testing/shipped) pass through unchanged.
|
||||
const isFailure = report?.result ? FAILURE_RESULTS.includes(report.result) : stage === 'failed';
|
||||
if (stage && isFailure) {
|
||||
const decision = decideFailureOutcome(job, report?.result ?? 'failed');
|
||||
if (decision.kind === 'retry') {
|
||||
const unmet = await unmetDeps(job);
|
||||
const nextStage: FleetStage = unmet.length > 0 ? 'blocked' : 'queued';
|
||||
const retryNotBefore = new Date(Date.now() + decision.backoffMs).toISOString();
|
||||
await repo.revUpdateJob(jobId, productId, job.rev, {
|
||||
stage: nextStage,
|
||||
retryNotBefore,
|
||||
blockedReason: unmet.length > 0 ? `waiting on: ${unmet.join(', ')}` : undefined,
|
||||
});
|
||||
await repo.appendEvent({
|
||||
jobId,
|
||||
productId,
|
||||
type: 'retry_scheduled',
|
||||
data: {
|
||||
attempt: job.attempts,
|
||||
max: job.retry?.max,
|
||||
result: report?.result ?? 'failed',
|
||||
retryNotBefore,
|
||||
returnedTo: nextStage,
|
||||
},
|
||||
});
|
||||
} else if (decision.kind === 'dead_letter') {
|
||||
await repo.revUpdateJob(jobId, productId, job.rev, { stage: 'dead_letter' });
|
||||
await repo.appendEvent({
|
||||
jobId,
|
||||
productId,
|
||||
type: 'dead_letter',
|
||||
data: { attempts: job.attempts, max: job.retry?.max, result: report?.result ?? 'failed' },
|
||||
});
|
||||
} else {
|
||||
await repo.revUpdateJob(jobId, productId, job.rev, { stage });
|
||||
}
|
||||
} else if (stage) {
|
||||
await repo.revUpdateJob(jobId, productId, job.rev, { stage });
|
||||
}
|
||||
// Record the factory's reported metrics + outcome + PR deliverable on the run.
|
||||
if (report?.insights || report?.result || report?.prUrl || report?.branch || report?.prState) {
|
||||
const runId = `${jobId}:run:${job.attempts}`;
|
||||
|
||||
154
services/platform-service/src/modules/fleet/retry.test.ts
Normal file
154
services/platform-service/src/modules/fleet/retry.test.ts
Normal file
@ -0,0 +1,154 @@
|
||||
/**
|
||||
* Fleet retry / dead-letter automation — enforces job.retry on a failed attempt:
|
||||
* auto-requeue (with backoff) while attempts remain, dead-letter when exhausted,
|
||||
* plain `failed` when there's no policy or the result isn't retryable.
|
||||
*/
|
||||
|
||||
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
|
||||
import { MemoryDatastoreProvider } from '@bytelyst/datastore';
|
||||
import { _resetDatastoreProvider, setProvider } from '../../lib/datastore.js';
|
||||
import * as repo from './repository.js';
|
||||
import * as coord from './coordinator.js';
|
||||
import { parseBackoffMs, decideFailureOutcome } from './coordinator.js';
|
||||
import type { FleetJobDoc, SubmitJobInput } from './types.js';
|
||||
|
||||
const PID = 'lysnrai';
|
||||
|
||||
function input(over: Partial<SubmitJobInput> = {}): SubmitJobInput {
|
||||
return {
|
||||
idempotencyKey: 'task-1',
|
||||
bodyMd: '# do the thing',
|
||||
priority: 'medium',
|
||||
capabilities: [],
|
||||
prefersEngine: [],
|
||||
allowedScope: [],
|
||||
deps: [],
|
||||
kind: 'leaf',
|
||||
...over,
|
||||
};
|
||||
}
|
||||
|
||||
const factory = (over = {}) => ({
|
||||
productId: PID,
|
||||
factoryId: 'fac_1',
|
||||
capabilities: [] as string[],
|
||||
leaseSeconds: 900,
|
||||
...over,
|
||||
});
|
||||
|
||||
/** Claim the queued job (assigns it + creates the lease) and return its leaseEpoch. */
|
||||
async function claim() {
|
||||
const c = await coord.claimNextJob(factory());
|
||||
if (!c) throw new Error('expected a claim');
|
||||
return c;
|
||||
}
|
||||
|
||||
describe('parseBackoffMs', () => {
|
||||
it('parses seconds/minutes/hours and bare numbers', () => {
|
||||
expect(parseBackoffMs('30s', 1)).toBe(30_000);
|
||||
expect(parseBackoffMs('5m', 1)).toBe(5 * 60_000);
|
||||
expect(parseBackoffMs('1h', 1)).toBe(60 * 60_000);
|
||||
expect(parseBackoffMs('45', 1)).toBe(45_000); // bare ⇒ seconds
|
||||
expect(parseBackoffMs('250ms', 1)).toBe(250);
|
||||
});
|
||||
it('grows exponentially (capped at 1h) and defaults to 0', () => {
|
||||
expect(parseBackoffMs('exp', 1)).toBe(30_000);
|
||||
expect(parseBackoffMs('exp', 2)).toBe(60_000);
|
||||
expect(parseBackoffMs('exp', 99)).toBe(60 * 60 * 1000); // capped
|
||||
expect(parseBackoffMs(undefined, 1)).toBe(0);
|
||||
expect(parseBackoffMs('garbage', 1)).toBe(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe('decideFailureOutcome (pure)', () => {
|
||||
const base = { attempts: 1 } as FleetJobDoc;
|
||||
it('no policy ⇒ failed', () => {
|
||||
expect(decideFailureOutcome({ ...base } as FleetJobDoc, 'timeout').kind).toBe('failed');
|
||||
});
|
||||
it('non-retryable result ⇒ failed even with a policy', () => {
|
||||
const job = { ...base, retry: { max: 3, on: ['timeout'] } } as FleetJobDoc;
|
||||
expect(decideFailureOutcome(job, 'no_engine').kind).toBe('failed');
|
||||
expect(decideFailureOutcome(job, 'capability_mismatch').kind).toBe('failed');
|
||||
});
|
||||
it('matching class with attempts left ⇒ retry', () => {
|
||||
const job = { ...base, attempts: 1, retry: { max: 3, on: ['timeout'] } } as FleetJobDoc;
|
||||
expect(decideFailureOutcome(job, 'timeout').kind).toBe('retry');
|
||||
});
|
||||
it('matching class but attempts exhausted ⇒ dead_letter', () => {
|
||||
const job = { ...base, attempts: 3, retry: { max: 3, on: ['timeout'] } } as FleetJobDoc;
|
||||
expect(decideFailureOutcome(job, 'timeout').kind).toBe('dead_letter');
|
||||
});
|
||||
it("generic 'failed' matches agent_error/crash classes", () => {
|
||||
const job = { ...base, attempts: 1, retry: { max: 2, on: ['agent_error'] } } as FleetJobDoc;
|
||||
expect(decideFailureOutcome(job, 'failed').kind).toBe('retry');
|
||||
});
|
||||
});
|
||||
|
||||
describe('fleet retry — releaseLease enforcement', () => {
|
||||
beforeEach(() => setProvider(new MemoryDatastoreProvider()));
|
||||
afterEach(() => _resetDatastoreProvider());
|
||||
|
||||
it('requeues a retryable failure and sets a backoff', async () => {
|
||||
const { job } = await coord.submitJob(PID, input({ retry: { max: 3, on: ['timeout'] } }));
|
||||
const c = await claim();
|
||||
const res = await coord.releaseLease(job.id, PID, c.job.leaseEpoch, 'failed', {
|
||||
result: 'timeout',
|
||||
});
|
||||
expect(res.ok).toBe(true);
|
||||
|
||||
const after = await repo.getJob(job.id, PID);
|
||||
expect(after?.stage).toBe('queued'); // back in the queue, not failed
|
||||
// No backoff configured ⇒ retryNotBefore is ~now (not in the future) ⇒ claimable.
|
||||
expect(Date.parse(after!.retryNotBefore!)).toBeLessThanOrEqual(Date.now());
|
||||
});
|
||||
|
||||
it('honors backoff: a requeued job is not claimable until retryNotBefore', async () => {
|
||||
const { job } = await coord.submitJob(
|
||||
PID,
|
||||
input({ retry: { max: 3, on: ['timeout'], backoff: '1h' } })
|
||||
);
|
||||
const c = await claim();
|
||||
await coord.releaseLease(job.id, PID, c.job.leaseEpoch, 'failed', { result: 'timeout' });
|
||||
|
||||
const after = await repo.getJob(job.id, PID);
|
||||
expect(after?.stage).toBe('queued');
|
||||
expect(after?.retryNotBefore).toBeTruthy();
|
||||
expect(Date.parse(after!.retryNotBefore!)).toBeGreaterThan(Date.now());
|
||||
|
||||
// The scheduler must NOT hand it out while backing off.
|
||||
expect(await coord.claimNextJob(factory())).toBeNull();
|
||||
});
|
||||
|
||||
it('dead-letters once attempts are exhausted', async () => {
|
||||
const { job } = await coord.submitJob(PID, input({ retry: { max: 1, on: ['timeout'] } }));
|
||||
const c = await claim(); // attempts becomes 1 (== max)
|
||||
await coord.releaseLease(job.id, PID, c.job.leaseEpoch, 'failed', { result: 'timeout' });
|
||||
|
||||
const after = await repo.getJob(job.id, PID);
|
||||
expect(after?.stage).toBe('dead_letter');
|
||||
});
|
||||
|
||||
it('a non-retryable failure stays failed (no policy)', async () => {
|
||||
const { job } = await coord.submitJob(PID, input());
|
||||
const c = await claim();
|
||||
await coord.releaseLease(job.id, PID, c.job.leaseEpoch, 'failed', { result: 'failed' });
|
||||
expect((await repo.getJob(job.id, PID))?.stage).toBe('failed');
|
||||
});
|
||||
|
||||
it('a successful release is unaffected by retry logic', async () => {
|
||||
const { job } = await coord.submitJob(PID, input({ retry: { max: 3, on: ['timeout'] } }));
|
||||
const c = await claim();
|
||||
await coord.releaseLease(job.id, PID, c.job.leaseEpoch, 'review', { result: 'review' });
|
||||
expect((await repo.getJob(job.id, PID))?.stage).toBe('review');
|
||||
});
|
||||
|
||||
it('an immediate-backoff retry is claimable again on the next attempt', async () => {
|
||||
const { job } = await coord.submitJob(PID, input({ retry: { max: 3, on: ['timeout'] } }));
|
||||
const c1 = await claim();
|
||||
await coord.releaseLease(job.id, PID, c1.job.leaseEpoch, 'failed', { result: 'timeout' });
|
||||
// backoff unset ⇒ retryNotBefore ~= now ⇒ immediately re-claimable
|
||||
const c2 = await coord.claimNextJob(factory());
|
||||
expect(c2?.job.id).toBe(job.id);
|
||||
expect(c2?.job.attempts).toBe(2);
|
||||
});
|
||||
});
|
||||
@ -10,6 +10,7 @@ import {
|
||||
selectJob,
|
||||
capabilitiesSubset,
|
||||
engineEligible,
|
||||
isAwaitingRetryBackoff,
|
||||
type SchedulerContext,
|
||||
type SchedulerFactory,
|
||||
} from './scheduler.js';
|
||||
@ -127,6 +128,22 @@ describe('scheduler §7 — concrete-engine routing gate', () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe('scheduler §7 — retry backoff gate', () => {
|
||||
it('isAwaitingRetryBackoff: future retryNotBefore is backing off, past/absent is not', () => {
|
||||
expect(isAwaitingRetryBackoff(job({ id: 'j' }), NOW)).toBe(false);
|
||||
expect(isAwaitingRetryBackoff(job({ id: 'j', retryNotBefore: iso(-60_000) }), NOW)).toBe(true);
|
||||
expect(isAwaitingRetryBackoff(job({ id: 'j', retryNotBefore: iso(60_000) }), NOW)).toBe(false);
|
||||
});
|
||||
|
||||
it('selectJob skips a queued job still serving its retry backoff', () => {
|
||||
const backingOff = job({ id: 'boff', retryNotBefore: iso(-5 * 60_000) }); // 5 min in the future
|
||||
expect(selectJob([backingOff], fac(), ctx())).toBeNull();
|
||||
// Once the backoff has elapsed it becomes claimable again.
|
||||
const elapsedCtx = ctx({ now: NOW + 10 * 60_000 });
|
||||
expect(selectJob([backingOff], fac(), elapsedCtx)?.id).toBe('boff');
|
||||
});
|
||||
});
|
||||
|
||||
describe('scheduler §7 — priority + age tie-breaks (all else equal)', () => {
|
||||
it('priority dominates when scores tie', () => {
|
||||
const low = job({ id: 'low', priority: 'low' });
|
||||
|
||||
@ -156,6 +156,14 @@ export function engineEligible(job: FleetJobDoc, available: string[]): boolean {
|
||||
return available.includes(`engine:${job.engine}`);
|
||||
}
|
||||
|
||||
/** True while a job is serving a retry backoff (`retryNotBefore` in the future):
|
||||
* it is queued but must not be claimed yet. No `retryNotBefore` ⇒ never backing off. */
|
||||
export function isAwaitingRetryBackoff(job: FleetJobDoc, now: number): boolean {
|
||||
if (!job.retryNotBefore) return false;
|
||||
const t = Date.parse(job.retryNotBefore);
|
||||
return !Number.isNaN(t) && t > now;
|
||||
}
|
||||
|
||||
function overlaps(a: readonly string[], b: readonly string[]): boolean {
|
||||
if (a.length === 0 || b.length === 0) return false;
|
||||
const set = new Set(b);
|
||||
@ -387,6 +395,7 @@ export function selectJob(
|
||||
const eligible = candidates.filter(
|
||||
job =>
|
||||
(job.stage === 'queued' || job.stage === 'blocked') &&
|
||||
!isAwaitingRetryBackoff(job, ctx.now) &&
|
||||
capabilitiesSubset(job.capabilities ?? [], factory.capabilities) &&
|
||||
engineEligible(job, factory.capabilities) &&
|
||||
depsSatisfied(job)
|
||||
|
||||
@ -207,6 +207,12 @@ export const FleetJobDocSchema = z.object({
|
||||
leaseEpoch: z.number().int().nonnegative().default(0),
|
||||
rev: z.number().int().nonnegative().default(0),
|
||||
blockedReason: z.string().optional(),
|
||||
/**
|
||||
* Earliest time this job may be claimed again (ISO). Set by the retry policy
|
||||
* when a failed attempt is auto-requeued with a backoff; the scheduler skips a
|
||||
* queued job until `now >= retryNotBefore`. Absent ⇒ immediately claimable.
|
||||
*/
|
||||
retryNotBefore: z.string().optional(),
|
||||
/**
|
||||
* Multi-reviewer human gate state (§14 Phase 3). Both are present only while a
|
||||
* job is (or has been) in `review`; `requestReview` (re)sets them.
|
||||
|
||||
Loading…
Reference in New Issue
Block a user