From 42d27d8a4ffcfc84303291d1474c3c5b35d1f2fa Mon Sep 17 00:00:00 2001 From: saravanakumardb1 Date: Mon, 1 Jun 2026 11:40:33 -0700 Subject: [PATCH] =?UTF-8?q?feat(fleet):=20enforce=20job.retry=20=E2=80=94?= =?UTF-8?q?=20auto-requeue,=20backoff,=20and=20dead-letter?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit job.retry (max / on / backoff) was persisted but never enforced: a failed attempt just went to `failed` and required a manual operator requeue. Now, when a factory releases a lease reporting a failure, the coordinator applies the policy: - retryable result (matches a retry.on class) with attempts remaining ⇒ requeue (queued, or blocked if deps are now unmet) with a retry backoff; - retryable but attempts exhausted ⇒ dead_letter; - no policy or non-retryable result (capability_mismatch/no_engine) ⇒ failed, exactly as before (behavior-preserving). Backoff is honored via a new job.retryNotBefore timestamp; the scheduler skips a queued job until it elapses (new pure isAwaitingRetryBackoff gate in selectJob). parseBackoffMs supports "<n>", "<n>s|m|h", "<n>ms", and "exp" (30s·2^(n-1), capped 1h). retry_scheduled / dead_letter audit events are emitted. decideFailureOutcome and parseBackoffMs are pure and unit-tested (retry.test.ts), plus scheduler-gate and end-to-end releaseLease coverage. 
Generated with [Devin](https://cli.devin.ai/docs) Co-Authored-By: Devin <158243242+devin-ai-integration[bot]@users.noreply.github.com> --- .../src/modules/fleet/coordinator.ts | 117 ++++++++++++- .../src/modules/fleet/retry.test.ts | 154 ++++++++++++++++++ .../src/modules/fleet/scheduler.test.ts | 17 ++ .../src/modules/fleet/scheduler.ts | 9 + .../src/modules/fleet/types.ts | 6 + 5 files changed, 302 insertions(+), 1 deletion(-) create mode 100644 services/platform-service/src/modules/fleet/retry.test.ts diff --git a/services/platform-service/src/modules/fleet/coordinator.ts b/services/platform-service/src/modules/fleet/coordinator.ts index 0f016dff..463978f2 100644 --- a/services/platform-service/src/modules/fleet/coordinator.ts +++ b/services/platform-service/src/modules/fleet/coordinator.ts @@ -1018,6 +1018,79 @@ export async function renewLease( return { ok: true, doc: res.doc }; } +/** Run results that denote a failed attempt (vs. a successful review/testing/shipped). */ +const FAILURE_RESULTS: readonly string[] = [ + 'failed', + 'timeout', + 'verify_failed', + 'capability_mismatch', + 'no_engine', + 'retries_exhausted', +]; + +/** Map a run result onto the retry classes (`retry.on`) it can satisfy. Deterministic + * config failures (capability_mismatch/no_engine) map to nothing — retrying can't help. */ +function retryClassesForResult(result?: string): string[] { + switch (result) { + case 'timeout': + return ['timeout']; + case 'verify_failed': + return ['verify_failed']; + case 'failed': + return ['agent_error', 'crash']; + default: + return []; + } +} + +/** Parse a `retry.backoff` hint into milliseconds for the given (1-based) attempt. + * Supports `"<n>"` (seconds), `"<n>s|m|h"`, `"<n>ms"`, and `"exp"`/`"exponential"` (30s·2^(n-1), + * capped at 1h). Anything unparseable ⇒ 0 (immediate requeue). 
*/ +export function parseBackoffMs(backoff: string | undefined, attempt: number): number { + if (!backoff) return 0; + const b = backoff.trim().toLowerCase(); + if (b === 'exp' || b === 'exponential') { + const base = 30_000; + return Math.min(base * 2 ** Math.max(0, attempt - 1), 60 * 60 * 1000); + } + const m = b.match(/^(\d+)\s*(ms|s|m|h)?$/); + if (!m) return 0; + const n = Number(m[1]); + switch (m[2]) { + case 'ms': + return n; + case 'm': + return n * 60_000; + case 'h': + return n * 60 * 60_000; + case 's': + default: + return n * 1_000; // bare number ⇒ seconds + } +} + +export type FailureDecision = + | { kind: 'retry'; backoffMs: number } + | { kind: 'dead_letter' } + | { kind: 'failed' }; + +/** + * Decide what becomes of a job whose attempt failed with `result`, per its + * `retry` policy: retry (when the result matches a `retry.on` class and attempts + * remain), dead-letter (matching class but attempts exhausted), or plain failed + * (no policy / non-retryable result — preserves the prior behavior). Pure. + */ +export function decideFailureOutcome(job: FleetJobDoc, result?: string): FailureDecision { + const policy = job.retry; + const classes = retryClassesForResult(result); + const on = (policy?.on ?? 
[]) as readonly string[]; + const retryable = + !!policy && policy.max > 0 && classes.length > 0 && classes.some(c => on.includes(c)); + if (!retryable) return { kind: 'failed' }; + if (job.attempts >= policy!.max) return { kind: 'dead_letter' }; + return { kind: 'retry', backoffMs: parseBackoffMs(policy!.backoff, job.attempts) }; +} + export async function releaseLease( jobId: string, productId: string, @@ -1038,7 +1111,49 @@ export async function releaseLease( if (!lease) return { ok: false, reason: 'not_found' }; const res = await repo.revUpdateLease(jobId, lease.rev, { status: 'released' }); if (!res.ok) return { ok: false, reason: 'conflict' }; - if (stage) await repo.revUpdateJob(jobId, productId, job.rev, { stage }); + + // A failed attempt (by reported result, or an explicit `failed` stage) is routed + // through the retry policy: auto-requeue with backoff, dead-letter when attempts + // are exhausted, else plain `failed` (no policy / non-retryable). Success stages + // (review/testing/shipped) pass through unchanged. + const isFailure = report?.result ? FAILURE_RESULTS.includes(report.result) : stage === 'failed'; + if (stage && isFailure) { + const decision = decideFailureOutcome(job, report?.result ?? 'failed'); + if (decision.kind === 'retry') { + const unmet = await unmetDeps(job); + const nextStage: FleetStage = unmet.length > 0 ? 'blocked' : 'queued'; + const retryNotBefore = new Date(Date.now() + decision.backoffMs).toISOString(); + await repo.revUpdateJob(jobId, productId, job.rev, { + stage: nextStage, + retryNotBefore, + blockedReason: unmet.length > 0 ? `waiting on: ${unmet.join(', ')}` : undefined, + }); + await repo.appendEvent({ + jobId, + productId, + type: 'retry_scheduled', + data: { + attempt: job.attempts, + max: job.retry?.max, + result: report?.result ?? 
'failed', + retryNotBefore, + returnedTo: nextStage, + }, + }); + } else if (decision.kind === 'dead_letter') { + await repo.revUpdateJob(jobId, productId, job.rev, { stage: 'dead_letter' }); + await repo.appendEvent({ + jobId, + productId, + type: 'dead_letter', + data: { attempts: job.attempts, max: job.retry?.max, result: report?.result ?? 'failed' }, + }); + } else { + await repo.revUpdateJob(jobId, productId, job.rev, { stage }); + } + } else if (stage) { + await repo.revUpdateJob(jobId, productId, job.rev, { stage }); + } // Record the factory's reported metrics + outcome + PR deliverable on the run. if (report?.insights || report?.result || report?.prUrl || report?.branch || report?.prState) { const runId = `${jobId}:run:${job.attempts}`; diff --git a/services/platform-service/src/modules/fleet/retry.test.ts b/services/platform-service/src/modules/fleet/retry.test.ts new file mode 100644 index 00000000..e11538c4 --- /dev/null +++ b/services/platform-service/src/modules/fleet/retry.test.ts @@ -0,0 +1,154 @@ +/** + * Fleet retry / dead-letter automation — enforces job.retry on a failed attempt: + * auto-requeue (with backoff) while attempts remain, dead-letter when exhausted, + * plain `failed` when there's no policy or the result isn't retryable. 
+ */ + +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { MemoryDatastoreProvider } from '@bytelyst/datastore'; +import { _resetDatastoreProvider, setProvider } from '../../lib/datastore.js'; +import * as repo from './repository.js'; +import * as coord from './coordinator.js'; +import { parseBackoffMs, decideFailureOutcome } from './coordinator.js'; +import type { FleetJobDoc, SubmitJobInput } from './types.js'; + +const PID = 'lysnrai'; + +function input(over: Partial<SubmitJobInput> = {}): SubmitJobInput { + return { + idempotencyKey: 'task-1', + bodyMd: '# do the thing', + priority: 'medium', + capabilities: [], + prefersEngine: [], + allowedScope: [], + deps: [], + kind: 'leaf', + ...over, + }; +} + +const factory = (over = {}) => ({ + productId: PID, + factoryId: 'fac_1', + capabilities: [] as string[], + leaseSeconds: 900, + ...over, +}); + +/** Claim the queued job (assigns it + creates the lease) and return the claim; callers read `c.job.leaseEpoch`. */ +async function claim() { + const c = await coord.claimNextJob(factory()); + if (!c) throw new Error('expected a claim'); + return c; +} + +describe('parseBackoffMs', () => { + it('parses seconds/minutes/hours and bare numbers', () => { + expect(parseBackoffMs('30s', 1)).toBe(30_000); + expect(parseBackoffMs('5m', 1)).toBe(5 * 60_000); + expect(parseBackoffMs('1h', 1)).toBe(60 * 60_000); + expect(parseBackoffMs('45', 1)).toBe(45_000); // bare ⇒ seconds + expect(parseBackoffMs('250ms', 1)).toBe(250); + }); + it('grows exponentially (capped at 1h) and defaults to 0', () => { + expect(parseBackoffMs('exp', 1)).toBe(30_000); + expect(parseBackoffMs('exp', 2)).toBe(60_000); + expect(parseBackoffMs('exp', 99)).toBe(60 * 60 * 1000); // capped + expect(parseBackoffMs(undefined, 1)).toBe(0); + expect(parseBackoffMs('garbage', 1)).toBe(0); + }); +}); + +describe('decideFailureOutcome (pure)', () => { + const base = { attempts: 1 } as FleetJobDoc; + it('no policy ⇒ failed', () => { + expect(decideFailureOutcome({ ...base } as 
FleetJobDoc, 'timeout').kind).toBe('failed'); + }); + it('non-retryable result ⇒ failed even with a policy', () => { + const job = { ...base, retry: { max: 3, on: ['timeout'] } } as FleetJobDoc; + expect(decideFailureOutcome(job, 'no_engine').kind).toBe('failed'); + expect(decideFailureOutcome(job, 'capability_mismatch').kind).toBe('failed'); + }); + it('matching class with attempts left ⇒ retry', () => { + const job = { ...base, attempts: 1, retry: { max: 3, on: ['timeout'] } } as FleetJobDoc; + expect(decideFailureOutcome(job, 'timeout').kind).toBe('retry'); + }); + it('matching class but attempts exhausted ⇒ dead_letter', () => { + const job = { ...base, attempts: 3, retry: { max: 3, on: ['timeout'] } } as FleetJobDoc; + expect(decideFailureOutcome(job, 'timeout').kind).toBe('dead_letter'); + }); + it("generic 'failed' matches agent_error/crash classes", () => { + const job = { ...base, attempts: 1, retry: { max: 2, on: ['agent_error'] } } as FleetJobDoc; + expect(decideFailureOutcome(job, 'failed').kind).toBe('retry'); + }); +}); + +describe('fleet retry — releaseLease enforcement', () => { + beforeEach(() => setProvider(new MemoryDatastoreProvider())); + afterEach(() => _resetDatastoreProvider()); + + it('requeues a retryable failure and sets a backoff', async () => { + const { job } = await coord.submitJob(PID, input({ retry: { max: 3, on: ['timeout'] } })); + const c = await claim(); + const res = await coord.releaseLease(job.id, PID, c.job.leaseEpoch, 'failed', { + result: 'timeout', + }); + expect(res.ok).toBe(true); + + const after = await repo.getJob(job.id, PID); + expect(after?.stage).toBe('queued'); // back in the queue, not failed + // No backoff configured ⇒ retryNotBefore is ~now (not in the future) ⇒ claimable. 
+ expect(Date.parse(after!.retryNotBefore!)).toBeLessThanOrEqual(Date.now()); + }); + + it('honors backoff: a requeued job is not claimable until retryNotBefore', async () => { + const { job } = await coord.submitJob( + PID, + input({ retry: { max: 3, on: ['timeout'], backoff: '1h' } }) + ); + const c = await claim(); + await coord.releaseLease(job.id, PID, c.job.leaseEpoch, 'failed', { result: 'timeout' }); + + const after = await repo.getJob(job.id, PID); + expect(after?.stage).toBe('queued'); + expect(after?.retryNotBefore).toBeTruthy(); + expect(Date.parse(after!.retryNotBefore!)).toBeGreaterThan(Date.now()); + + // The scheduler must NOT hand it out while backing off. + expect(await coord.claimNextJob(factory())).toBeNull(); + }); + + it('dead-letters once attempts are exhausted', async () => { + const { job } = await coord.submitJob(PID, input({ retry: { max: 1, on: ['timeout'] } })); + const c = await claim(); // attempts becomes 1 (== max) + await coord.releaseLease(job.id, PID, c.job.leaseEpoch, 'failed', { result: 'timeout' }); + + const after = await repo.getJob(job.id, PID); + expect(after?.stage).toBe('dead_letter'); + }); + + it('a non-retryable failure stays failed (no policy)', async () => { + const { job } = await coord.submitJob(PID, input()); + const c = await claim(); + await coord.releaseLease(job.id, PID, c.job.leaseEpoch, 'failed', { result: 'failed' }); + expect((await repo.getJob(job.id, PID))?.stage).toBe('failed'); + }); + + it('a successful release is unaffected by retry logic', async () => { + const { job } = await coord.submitJob(PID, input({ retry: { max: 3, on: ['timeout'] } })); + const c = await claim(); + await coord.releaseLease(job.id, PID, c.job.leaseEpoch, 'review', { result: 'review' }); + expect((await repo.getJob(job.id, PID))?.stage).toBe('review'); + }); + + it('an immediate-backoff retry is claimable again on the next attempt', async () => { + const { job } = await coord.submitJob(PID, input({ retry: { max: 3, on: 
['timeout'] } })); + const c1 = await claim(); + await coord.releaseLease(job.id, PID, c1.job.leaseEpoch, 'failed', { result: 'timeout' }); + // backoff unset ⇒ retryNotBefore ~= now ⇒ immediately re-claimable + const c2 = await coord.claimNextJob(factory()); + expect(c2?.job.id).toBe(job.id); + expect(c2?.job.attempts).toBe(2); + }); +}); diff --git a/services/platform-service/src/modules/fleet/scheduler.test.ts b/services/platform-service/src/modules/fleet/scheduler.test.ts index 2f53c583..9b58be3e 100644 --- a/services/platform-service/src/modules/fleet/scheduler.test.ts +++ b/services/platform-service/src/modules/fleet/scheduler.test.ts @@ -10,6 +10,7 @@ import { selectJob, capabilitiesSubset, engineEligible, + isAwaitingRetryBackoff, type SchedulerContext, type SchedulerFactory, } from './scheduler.js'; @@ -127,6 +128,22 @@ describe('scheduler §7 — concrete-engine routing gate', () => { }); }); +describe('scheduler §7 — retry backoff gate', () => { + it('isAwaitingRetryBackoff: future retryNotBefore is backing off, past/absent is not', () => { + expect(isAwaitingRetryBackoff(job({ id: 'j' }), NOW)).toBe(false); + expect(isAwaitingRetryBackoff(job({ id: 'j', retryNotBefore: iso(-60_000) }), NOW)).toBe(true); + expect(isAwaitingRetryBackoff(job({ id: 'j', retryNotBefore: iso(60_000) }), NOW)).toBe(false); + }); + + it('selectJob skips a queued job still serving its retry backoff', () => { + const backingOff = job({ id: 'boff', retryNotBefore: iso(-5 * 60_000) }); // 5 min in the future + expect(selectJob([backingOff], fac(), ctx())).toBeNull(); + // Once the backoff has elapsed it becomes claimable again. 
+ const elapsedCtx = ctx({ now: NOW + 10 * 60_000 }); + expect(selectJob([backingOff], fac(), elapsedCtx)?.id).toBe('boff'); + }); +}); + describe('scheduler §7 — priority + age tie-breaks (all else equal)', () => { it('priority dominates when scores tie', () => { const low = job({ id: 'low', priority: 'low' }); diff --git a/services/platform-service/src/modules/fleet/scheduler.ts b/services/platform-service/src/modules/fleet/scheduler.ts index 703de200..2da1e6c3 100644 --- a/services/platform-service/src/modules/fleet/scheduler.ts +++ b/services/platform-service/src/modules/fleet/scheduler.ts @@ -156,6 +156,14 @@ export function engineEligible(job: FleetJobDoc, available: string[]): boolean { return available.includes(`engine:${job.engine}`); } +/** True while a job is serving a retry backoff (`retryNotBefore` in the future): + * it is queued but must not be claimed yet. No `retryNotBefore` ⇒ never backing off. */ +export function isAwaitingRetryBackoff(job: FleetJobDoc, now: number): boolean { + if (!job.retryNotBefore) return false; + const t = Date.parse(job.retryNotBefore); + return !Number.isNaN(t) && t > now; +} + function overlaps(a: readonly string[], b: readonly string[]): boolean { if (a.length === 0 || b.length === 0) return false; const set = new Set(b); @@ -387,6 +395,7 @@ export function selectJob( const eligible = candidates.filter( job => (job.stage === 'queued' || job.stage === 'blocked') && + !isAwaitingRetryBackoff(job, ctx.now) && capabilitiesSubset(job.capabilities ?? 
[], factory.capabilities) && engineEligible(job, factory.capabilities) && depsSatisfied(job) diff --git a/services/platform-service/src/modules/fleet/types.ts b/services/platform-service/src/modules/fleet/types.ts index 30c480af..f7e9659b 100644 --- a/services/platform-service/src/modules/fleet/types.ts +++ b/services/platform-service/src/modules/fleet/types.ts @@ -207,6 +207,12 @@ export const FleetJobDocSchema = z.object({ leaseEpoch: z.number().int().nonnegative().default(0), rev: z.number().int().nonnegative().default(0), blockedReason: z.string().optional(), + /** + * Earliest time this job may be claimed again (ISO). Set by the retry policy + * when a failed attempt is auto-requeued with a backoff; the scheduler skips a + * queued job until `now >= retryNotBefore`. Absent ⇒ immediately claimable. + */ + retryNotBefore: z.string().optional(), /** * Multi-reviewer human gate state (§14 Phase 3). Both are present only while a * job is (or has been) in `review`; `requestReview` (re)sets them.