diff --git a/services/platform-service/src/modules/fleet/reaper.test.ts b/services/platform-service/src/modules/fleet/reaper.test.ts new file mode 100644 index 00000000..8c3c6bea --- /dev/null +++ b/services/platform-service/src/modules/fleet/reaper.test.ts @@ -0,0 +1,85 @@ +/** + * Fleet lease reaper — verifies the background loop actually drives + * coordinator.reapExpiredLeases (start/stop/idempotency, immediate + interval + * passes, and that a failing pass never throws out of the timer). + */ + +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; + +const { reapSpy } = vi.hoisted(() => ({ reapSpy: vi.fn() })); + +vi.mock('./coordinator.js', () => ({ + reapExpiredLeases: reapSpy, +})); + +import { startLeaseReaper, stopLeaseReaper, isLeaseReaperRunning } from './reaper.js'; + +const log = { info: vi.fn(), warn: vi.fn() }; + +describe('fleet lease reaper', () => { + beforeEach(() => { + vi.useFakeTimers(); + reapSpy.mockReset(); + reapSpy.mockResolvedValue({ reaped: 0, jobIds: [] }); + log.info.mockReset(); + log.warn.mockReset(); + }); + + afterEach(() => { + stopLeaseReaper(); + vi.useRealTimers(); + }); + + it('runs a pass immediately on start, then on each interval', async () => { + startLeaseReaper(log); + expect(isLeaseReaperRunning()).toBe(true); + expect(reapSpy).toHaveBeenCalledTimes(1); // immediate boot pass + + await vi.advanceTimersByTimeAsync(30_000); + expect(reapSpy).toHaveBeenCalledTimes(2); + + await vi.advanceTimersByTimeAsync(30_000); + expect(reapSpy).toHaveBeenCalledTimes(3); + }); + + it('passes a fresh ISO timestamp to reapExpiredLeases', async () => { + startLeaseReaper(log); + expect(reapSpy).toHaveBeenCalledWith(expect.stringMatching(/^\d{4}-\d{2}-\d{2}T/)); + }); + + it('logs only when something was reclaimed', async () => { + reapSpy.mockResolvedValueOnce({ reaped: 2, jobIds: ['fjob_a', 'fjob_b'] }); + startLeaseReaper(log); + await vi.advanceTimersByTimeAsync(0); + expect(log.info).toHaveBeenCalledWith(expect.stringContaining('reclaimed 2')); + }); + + it('is idempotent: a second start does not double-schedule', async () => { + startLeaseReaper(log); + startLeaseReaper(log); + expect(reapSpy).toHaveBeenCalledTimes(1); + await vi.advanceTimersByTimeAsync(30_000); + expect(reapSpy).toHaveBeenCalledTimes(2); // one interval, not two + }); + + it('stop halts further passes and is idempotent', async () => { + startLeaseReaper(log); + stopLeaseReaper(); + expect(isLeaseReaperRunning()).toBe(false); + await vi.advanceTimersByTimeAsync(90_000); + expect(reapSpy).toHaveBeenCalledTimes(1); // only the boot pass ran + expect(() => stopLeaseReaper()).not.toThrow(); + }); + + it('swallows a failing pass and keeps the timer alive', async () => { + reapSpy.mockRejectedValueOnce(new Error('cosmos unavailable')); + startLeaseReaper(log); + await vi.advanceTimersByTimeAsync(0); // let the rejected boot pass settle + expect(log.warn).toHaveBeenCalledWith(expect.stringContaining('reap pass failed')); + + // The interval is still scheduled and the next pass runs. + reapSpy.mockResolvedValueOnce({ reaped: 0, jobIds: [] }); + await vi.advanceTimersByTimeAsync(30_000); + expect(reapSpy).toHaveBeenCalledTimes(2); + }); +}); diff --git a/services/platform-service/src/modules/fleet/reaper.ts b/services/platform-service/src/modules/fleet/reaper.ts new file mode 100644 index 00000000..63d15637 --- /dev/null +++ b/services/platform-service/src/modules/fleet/reaper.ts @@ -0,0 +1,63 @@ +/** + * Fleet lease reaper — background loop that reclaims jobs whose factory died. + * + * `coordinator.reapExpiredLeases` implements the full §25 recovery (fence the + * zombie holder via a leaseEpoch bump, return the job to queued/blocked, preserve + * the checkpoint pointer), but it is a pure function that does nothing until + * something calls it. Without a scheduler a crashed/disconnected factory's job + * stays stuck in an active stage forever. This module is that scheduler: a single + * process-wide timer (leases are queried across all products), started once at + * server boot and stopped on graceful shutdown — mirroring the diagnostics + * trigger job pattern. + */ + +import { reapExpiredLeases } from './coordinator.js'; + +/** How often to scan for expired leases. Well under the runner's 90s stale + * threshold so a dead factory's job is reclaimed promptly. */ +const REAP_INTERVAL_MS = 30_000; + +let reaperInterval: ReturnType | null = null; + +interface ReaperLog { + info: (...a: unknown[]) => void; + warn?: (...a: unknown[]) => void; +} + +/** One reap pass. Never throws — a transient datastore error is logged and the + * next tick retries (recovery must not crash the service). */ +async function runReapPass(log?: ReaperLog): Promise { + try { + const { reaped, jobIds } = await reapExpiredLeases(new Date().toISOString()); + if (reaped > 0) { + log?.info(`[fleet-reaper] reclaimed ${reaped} expired-lease job(s): ${jobIds.join(', ')}`); + } + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + (log?.warn ?? log?.info)?.(`[fleet-reaper] reap pass failed: ${msg}`); + } +} + +/** Start the lease reaper. Idempotent: a second call while running is a no-op. */ +export function startLeaseReaper(log?: ReaperLog): void { + if (reaperInterval) return; + log?.info(`[fleet-reaper] starting (every ${REAP_INTERVAL_MS / 1000}s)`); + // Run once on boot so a restart promptly recovers jobs orphaned while down. + void runReapPass(log); + reaperInterval = globalThis.setInterval(() => { + void runReapPass(log); + }, REAP_INTERVAL_MS); +} + +/** Stop the lease reaper (graceful shutdown). Idempotent. */ +export function stopLeaseReaper(): void { + if (reaperInterval) { + globalThis.clearInterval(reaperInterval); + reaperInterval = null; + } +} + +/** Test/diagnostics helper: is the reaper currently scheduled? */ +export function isLeaseReaperRunning(): boolean { + return reaperInterval !== null; +} diff --git a/services/platform-service/src/server.ts b/services/platform-service/src/server.ts index 13303694..bd3566f2 100644 --- a/services/platform-service/src/server.ts +++ b/services/platform-service/src/server.ts @@ -77,6 +77,7 @@ import { crashTriggerRoutes } from './modules/diagnostics/crash-trigger.js'; import { sessionReplayRoutes } from './modules/diagnostics/session-replay-routes.js'; import { performanceProfileRoutes } from './modules/diagnostics/performance-profile-routes.js'; import { startTriggerEvaluationJob } from './modules/diagnostics/trigger-job.js'; +import { startLeaseReaper, stopLeaseReaper } from './modules/fleet/reaper.js'; import { broadcastRoutes } from './modules/broadcasts/routes.js'; import { surveyRoutes } from './modules/surveys/routes.js'; import { jobRoutes } from './modules/jobs/routes.js'; @@ -360,4 +361,10 @@ app.addHook('onClose', async () => { // Start diagnostic trigger evaluation job (Phase 4) startTriggerEvaluationJob(app.log); +// Start the fleet lease reaper — reclaims jobs whose factory died (§25 recovery). +startLeaseReaper(app.log); +app.addHook('onClose', async () => { + stopLeaseReaper(); +}); + await startService(app, { port: config.PORT, host: config.HOST });