fix(fleet): schedule the lease reaper so dead-factory jobs are recovered

reapExpiredLeases implements the full section-25 recovery (fence the zombie
holder via a leaseEpoch bump, return the job to queued/blocked, preserve the
checkpoint) but nothing ever called it: no route, no cron, no timer. So when a
factory crashed, lost network, or shut down, its in-flight job stayed stuck in
an active stage forever and was never requeued — the recovery code was dormant.

Add a process-wide background reaper (leases are queried across all products)
that runs reapExpiredLeases every 30s, started at server boot and stopped on
graceful shutdown, mirroring the diagnostics trigger-job pattern. A failing pass
is logged and retried on the next tick rather than crashing the service.

Generated with [Devin](https://cli.devin.ai/docs)

Co-Authored-By: Devin <158243242+devin-ai-integration[bot]@users.noreply.github.com>
This commit is contained in:
saravanakumardb1 2026-06-01 11:11:14 -07:00
parent 2ed19464c5
commit 0bf8be9be5
3 changed files with 155 additions and 0 deletions

View File

@ -0,0 +1,85 @@
/**
* Fleet lease reaper verifies the background loop actually drives
* coordinator.reapExpiredLeases (start/stop/idempotency, immediate + interval
* passes, and that a failing pass never throws out of the timer).
*/
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
const { reapSpy } = vi.hoisted(() => ({ reapSpy: vi.fn() }));
vi.mock('./coordinator.js', () => ({
reapExpiredLeases: reapSpy,
}));
import { startLeaseReaper, stopLeaseReaper, isLeaseReaperRunning } from './reaper.js';
const log = { info: vi.fn(), warn: vi.fn() };
describe('fleet lease reaper', () => {
beforeEach(() => {
vi.useFakeTimers();
reapSpy.mockReset();
reapSpy.mockResolvedValue({ reaped: 0, jobIds: [] });
log.info.mockReset();
log.warn.mockReset();
});
afterEach(() => {
stopLeaseReaper();
vi.useRealTimers();
});
it('runs a pass immediately on start, then on each interval', async () => {
startLeaseReaper(log);
expect(isLeaseReaperRunning()).toBe(true);
expect(reapSpy).toHaveBeenCalledTimes(1); // immediate boot pass
await vi.advanceTimersByTimeAsync(30_000);
expect(reapSpy).toHaveBeenCalledTimes(2);
await vi.advanceTimersByTimeAsync(30_000);
expect(reapSpy).toHaveBeenCalledTimes(3);
});
it('passes a fresh ISO timestamp to reapExpiredLeases', async () => {
startLeaseReaper(log);
expect(reapSpy).toHaveBeenCalledWith(expect.stringMatching(/^\d{4}-\d{2}-\d{2}T/));
});
it('logs only when something was reclaimed', async () => {
reapSpy.mockResolvedValueOnce({ reaped: 2, jobIds: ['fjob_a', 'fjob_b'] });
startLeaseReaper(log);
await vi.advanceTimersByTimeAsync(0);
expect(log.info).toHaveBeenCalledWith(expect.stringContaining('reclaimed 2'));
});
it('is idempotent: a second start does not double-schedule', async () => {
startLeaseReaper(log);
startLeaseReaper(log);
expect(reapSpy).toHaveBeenCalledTimes(1);
await vi.advanceTimersByTimeAsync(30_000);
expect(reapSpy).toHaveBeenCalledTimes(2); // one interval, not two
});
it('stop halts further passes and is idempotent', async () => {
startLeaseReaper(log);
stopLeaseReaper();
expect(isLeaseReaperRunning()).toBe(false);
await vi.advanceTimersByTimeAsync(90_000);
expect(reapSpy).toHaveBeenCalledTimes(1); // only the boot pass ran
expect(() => stopLeaseReaper()).not.toThrow();
});
it('swallows a failing pass and keeps the timer alive', async () => {
reapSpy.mockRejectedValueOnce(new Error('cosmos unavailable'));
startLeaseReaper(log);
await vi.advanceTimersByTimeAsync(0); // let the rejected boot pass settle
expect(log.warn).toHaveBeenCalledWith(expect.stringContaining('reap pass failed'));
// The interval is still scheduled and the next pass runs.
reapSpy.mockResolvedValueOnce({ reaped: 0, jobIds: [] });
await vi.advanceTimersByTimeAsync(30_000);
expect(reapSpy).toHaveBeenCalledTimes(2);
});
});

View File

@ -0,0 +1,63 @@
/**
* Fleet lease reaper background loop that reclaims jobs whose factory died.
*
* `coordinator.reapExpiredLeases` implements the full §25 recovery (fence the
* zombie holder via a leaseEpoch bump, return the job to queued/blocked, preserve
* the checkpoint pointer), but it is a pure function that does nothing until
* something calls it. Without a scheduler a crashed/disconnected factory's job
* stays stuck in an active stage forever. This module is that scheduler: a single
* process-wide timer (leases are queried across all products), started once at
* server boot and stopped on graceful shutdown mirroring the diagnostics
* trigger job pattern.
*/
import { reapExpiredLeases } from './coordinator.js';
/** How often to scan for expired leases. Well under the runner's 90s stale
* threshold so a dead factory's job is reclaimed promptly. */
const REAP_INTERVAL_MS = 30_000;
let reaperInterval: ReturnType<typeof globalThis.setInterval> | null = null;
interface ReaperLog {
info: (...a: unknown[]) => void;
warn?: (...a: unknown[]) => void;
}
/** One reap pass. Never throws a transient datastore error is logged and the
* next tick retries (recovery must not crash the service). */
async function runReapPass(log?: ReaperLog): Promise<void> {
try {
const { reaped, jobIds } = await reapExpiredLeases(new Date().toISOString());
if (reaped > 0) {
log?.info(`[fleet-reaper] reclaimed ${reaped} expired-lease job(s): ${jobIds.join(', ')}`);
}
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
(log?.warn ?? log?.info)?.(`[fleet-reaper] reap pass failed: ${msg}`);
}
}
/** Start the lease reaper. Idempotent: a second call while running is a no-op. */
export function startLeaseReaper(log?: ReaperLog): void {
if (reaperInterval) return;
log?.info(`[fleet-reaper] starting (every ${REAP_INTERVAL_MS / 1000}s)`);
// Run once on boot so a restart promptly recovers jobs orphaned while down.
void runReapPass(log);
reaperInterval = globalThis.setInterval(() => {
void runReapPass(log);
}, REAP_INTERVAL_MS);
}
/** Stop the lease reaper (graceful shutdown). Idempotent. */
export function stopLeaseReaper(): void {
if (reaperInterval) {
globalThis.clearInterval(reaperInterval);
reaperInterval = null;
}
}
/** Test/diagnostics helper: is the reaper currently scheduled? */
export function isLeaseReaperRunning(): boolean {
return reaperInterval !== null;
}

View File

@ -77,6 +77,7 @@ import { crashTriggerRoutes } from './modules/diagnostics/crash-trigger.js';
import { sessionReplayRoutes } from './modules/diagnostics/session-replay-routes.js';
import { performanceProfileRoutes } from './modules/diagnostics/performance-profile-routes.js';
import { startTriggerEvaluationJob } from './modules/diagnostics/trigger-job.js';
import { startLeaseReaper, stopLeaseReaper } from './modules/fleet/reaper.js';
import { broadcastRoutes } from './modules/broadcasts/routes.js';
import { surveyRoutes } from './modules/surveys/routes.js';
import { jobRoutes } from './modules/jobs/routes.js';
@ -360,4 +361,10 @@ app.addHook('onClose', async () => {
// Start diagnostic trigger evaluation job (Phase 4)
startTriggerEvaluationJob(app.log);
// Start the fleet lease reaper — reclaims jobs whose factory died (§25 recovery).
startLeaseReaper(app.log);
app.addHook('onClose', async () => {
stopLeaseReaper();
});
await startService(app, { port: config.PORT, host: config.HOST });