feat(fleet): stale-factory lease reclaim + bounded GC sweep

Two recovery/cleanup gaps left the coordinator's containers growing without
bound and jobs stuck longer than necessary:

- reclaimStaleFactoryLeases: a crashed/partitioned factory stops heartbeating
  ~90s before its 900s lease TTL expires; the reaper now reclaims held leases of
  stale (or vanished) holders within one stale window, via the same fence +
  checkpoint-preserving path as the expiry reaper (refactored into reclaimLeaseJob).

- sweepFleetGarbage: deletes ephemeral coordination state on by default (finished
  expired/released leases past a 24h TTL; factory docs with no heartbeat for 7d —
  a live host just re-registers). Terminal-job retention (jobs + their runs/events/
  artifacts+blobs) is OPT-IN only via FLEET_GC_RETENTION_DAYS (default 0 = never
  delete history). Every delete is best-effort so one failure can't stall the sweep.

Both are wired into the existing reaper loop: recovery scans run every 30s, the
deletion sweep is throttled to hourly. New repo helpers (listHeldLeases,
listFinishedLeasesOlderThan, deleteLease, listAllFactories, deleteFactory,
listTerminalJobsOlderThan, deleteRun, deleteEvent) back the new coordinator
functions. Covered by cleanup.test.ts + expanded reaper.test.ts.

Generated with [Devin](https://cli.devin.ai/docs)

Co-Authored-By: Devin <158243242+devin-ai-integration[bot]@users.noreply.github.com>
This commit is contained in:
saravanakumardb1 2026-06-01 11:34:14 -07:00
parent 0bf8be9be5
commit 68bfa3dbd8
5 changed files with 565 additions and 47 deletions

View File

@ -0,0 +1,224 @@
/**
* Fleet cleanup/recovery stale-factory lease reclaim + garbage-collection sweep.
* Memory provider; deterministic (no real clock `now` is injected).
*/
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { MemoryDatastoreProvider } from '@bytelyst/datastore';
import { _resetDatastoreProvider, setProvider } from '../../lib/datastore.js';
import * as repo from './repository.js';
import * as coord from './coordinator.js';
import {
FleetJobDocSchema,
type FleetJobDoc,
type FleetLeaseDoc,
type SubmitJobInput,
} from './types.js';
const PID = 'lysnrai';
function input(over: Partial<SubmitJobInput> = {}): SubmitJobInput {
return {
idempotencyKey: 'task-1',
bodyMd: '# do the thing',
priority: 'medium',
capabilities: [],
prefersEngine: [],
allowedScope: [],
deps: [],
kind: 'leaf',
...over,
};
}
const factory = (over = {}) => ({
productId: PID,
factoryId: 'fac_1',
capabilities: [] as string[],
leaseSeconds: 900,
...over,
});
describe('fleet cleanup — reclaimStaleFactoryLeases', () => {
beforeEach(() => setProvider(new MemoryDatastoreProvider()));
afterEach(() => _resetDatastoreProvider());
it('reclaims a held lease whose holder factory stopped heartbeating', async () => {
const { job } = await coord.submitJob(PID, input());
await coord.heartbeat({ productId: PID, factoryId: 'fac_1', capabilities: [], load: 0 });
const claim = await coord.claimNextJob(factory());
expect(claim?.job.stage).toBe('assigned');
// Factory goes silent: rewrite its heartbeat to ~5 minutes ago.
const fac = await repo.getFactory('fac_1', PID);
await repo.upsertFactory({
...fac!,
lastHeartbeatAt: new Date(Date.now() - 5 * 60_000).toISOString(),
});
const res = await coord.reclaimStaleFactoryLeases(Date.now());
expect(res.reaped).toBe(1);
expect(res.jobIds).toEqual([job.id]);
const after = await repo.getJob(job.id, PID);
expect(after?.stage).toBe('queued'); // returned to the queue
expect(after?.leaseEpoch).toBe(claim!.job.leaseEpoch + 1); // zombie fenced
expect((await repo.getLease(job.id))?.status).toBe('expired');
});
it('reclaims a held lease whose holder factory never registered (vanished)', async () => {
const { job } = await coord.submitJob(PID, input());
await coord.claimNextJob(factory()); // no heartbeat ⇒ no factory doc
const res = await coord.reclaimStaleFactoryLeases(Date.now());
expect(res.reaped).toBe(1);
expect((await repo.getJob(job.id, PID))?.stage).toBe('queued');
});
it('leaves a fresh factory\u2019s lease untouched', async () => {
await coord.submitJob(PID, input());
await coord.heartbeat({ productId: PID, factoryId: 'fac_1', capabilities: [], load: 0 });
const claim = await coord.claimNextJob(factory());
const res = await coord.reclaimStaleFactoryLeases(Date.now());
expect(res.reaped).toBe(0);
expect((await repo.getJob(claim!.job.id, PID))?.stage).toBe('assigned');
});
});
describe('fleet cleanup — sweepFleetGarbage', () => {
beforeEach(() => setProvider(new MemoryDatastoreProvider()));
afterEach(() => _resetDatastoreProvider());
const now = Date.now();
/** Seed a terminal job directly (the repo write paths force updatedAt=now, so we
* go through createJob to control the backdated updatedAt the GC keys on). */
async function seedTerminalJob(
id: string,
ageMs: number,
stage = 'shipped'
): Promise<FleetJobDoc> {
const at = new Date(now - ageMs).toISOString();
const doc = FleetJobDocSchema.parse({
id,
productId: PID,
stage,
idempotencyKey: id,
contentHash: 'h',
bodyMd: '# done',
manifestSnapshot: {
priority: 'medium',
capabilities: [],
prefersEngine: [],
allowedScope: [],
deps: [],
},
priority: 'medium',
priorityOrder: 2,
capabilities: [],
deps: [],
kind: 'leaf',
attempts: 1,
leaseEpoch: 0,
rev: 0,
createdAt: at,
updatedAt: at,
});
return repo.createJob(doc);
}
async function seedFinishedLease(jobId: string, status: 'expired' | 'released', ageMs: number) {
const doc: FleetLeaseDoc = {
id: jobId,
productId: PID,
jobId,
status,
leaseEpoch: 1,
renewals: 0,
rev: 0,
updatedAt: new Date(now - ageMs).toISOString(),
};
await repo.createLease(doc);
}
it('deletes finished leases older than the TTL, keeps recent ones', async () => {
await seedFinishedLease('old', 'released', 48 * 60 * 60 * 1000); // 2 days
await seedFinishedLease('recent', 'expired', 60_000); // 1 min
const res = await coord.sweepFleetGarbage({ now });
expect(res.leasesDeleted).toBe(1);
expect(await repo.getLease('old')).toBeNull();
expect(await repo.getLease('recent')).not.toBeNull();
});
it('deletes long-dead factory docs, keeps live ones', async () => {
await repo.upsertFactory({
id: 'dead',
productId: PID,
factoryId: 'dead',
descriptor: {},
capabilities: [],
health: 'ok',
load: 0,
seatLimit: 1,
lastHeartbeatAt: new Date(now - 10 * 24 * 60 * 60 * 1000).toISOString(), // 10 days
});
await repo.upsertFactory({
id: 'live',
productId: PID,
factoryId: 'live',
descriptor: {},
capabilities: [],
health: 'ok',
load: 0,
seatLimit: 1,
lastHeartbeatAt: new Date(now).toISOString(),
});
const res = await coord.sweepFleetGarbage({ now });
expect(res.factoriesDeleted).toBe(1);
expect(await repo.getFactory('dead', PID)).toBeNull();
expect(await repo.getFactory('live', PID)).not.toBeNull();
});
it('never prunes terminal jobs/history by default (jobRetentionMs unset)', async () => {
const job = await seedTerminalJob('jt_old', 365 * 24 * 60 * 60 * 1000); // 1 year
const res = await coord.sweepFleetGarbage({ now });
expect(res.jobsDeleted).toBe(0);
expect(await repo.getJob(job.id, PID)).not.toBeNull();
});
it('prunes old terminal jobs and cascades runs/events when retention is set', async () => {
const job = await seedTerminalJob('jt_40d', 40 * 24 * 60 * 60 * 1000); // 40 days
await repo.createRun({
id: 'run_1',
productId: PID,
jobId: job.id,
attempt: 1,
engine: 'devin',
startedAt: new Date(now).toISOString(),
insights: {},
});
await repo.appendEvent({ jobId: job.id, productId: PID, type: 'submitted' });
const res = await coord.sweepFleetGarbage({
now,
jobRetentionMs: 30 * 24 * 60 * 60 * 1000, // 30-day retention
});
expect(res.jobsDeleted).toBe(1);
expect(res.runsDeleted).toBe(1);
expect(res.eventsDeleted).toBeGreaterThanOrEqual(1);
expect(await repo.getJob(job.id, PID)).toBeNull();
expect(await repo.listRunsByJob(job.id)).toHaveLength(0);
expect(await repo.listEvents(job.id)).toHaveLength(0);
});
it('keeps terminal jobs newer than the retention window', async () => {
const job = await seedTerminalJob('jt_5d', 5 * 24 * 60 * 60 * 1000); // 5 days
const res = await coord.sweepFleetGarbage({
now,
jobRetentionMs: 30 * 24 * 60 * 60 * 1000,
});
expect(res.jobsDeleted).toBe(0);
expect(await repo.getJob(job.id, PID)).not.toBeNull();
});
});

View File

@ -22,6 +22,7 @@ import { execFile } from 'node:child_process';
import { promisify } from 'node:util';
import { BadRequestError, ConflictError } from '../../lib/errors.js';
import * as repo from './repository.js';
import { deleteArtifact as deleteArtifactWithBlob } from './artifacts-blob.js';
import {
selectJob,
selectPreemptionVictim,
@ -1739,6 +1740,48 @@ export interface ReapResult {
jobIds: string[];
}
/**
* Reclaim ONE held lease's job: fence the zombie holder (bump leaseEpoch), return
* the job to queued/blocked preserving its checkpoint, mark the lease terminal, and
* emit an audit event. Returns the reclaimed jobId, or null when there was nothing
* to do (job gone, already resting, or a lost CAS race all idempotent no-ops).
* Shared by the expiry reaper and the stale-factory reclaim so both behave identically.
*/
async function reclaimLeaseJob(
lease: FleetLeaseDoc,
eventType: 'lease_expired' | 'factory_stale'
): Promise<string | null> {
const job = await repo.getJob(lease.jobId, lease.productId);
if (!job) return null;
if (!ACTIVE_STAGES.includes(job.stage)) return null; // already resting; nothing to reclaim
const newEpoch = job.leaseEpoch + 1; // fence the zombie holder
const unmet = await unmetDeps(job);
const stage: FleetStage = unmet.length > 0 ? 'blocked' : 'queued';
// checkpoint pointer is intentionally preserved on the job (resume-friendly)
const jobRes = await repo.revUpdateJob(job.id, lease.productId, job.rev, {
stage,
leaseEpoch: newEpoch,
blockedReason: unmet.length > 0 ? `waiting on: ${unmet.join(', ')}` : undefined,
});
if (!jobRes.ok) return null;
await repo.revUpdateLease(lease.jobId, lease.rev, {
status: 'expired',
leaseEpoch: newEpoch,
holderFactoryId: undefined,
});
await repo.appendEvent({
jobId: job.id,
productId: lease.productId,
type: eventType,
data: {
leaseEpoch: newEpoch,
returnedTo: stage,
...(lease.holderFactoryId ? { holderFactoryId: lease.holderFactoryId } : {}),
},
});
return job.id;
}
/**
* Reclaim jobs whose lease has expired. Cosmos TTL would only DELETE the lease
* doc it cannot return the job to `queued`, bump the fencing epoch, or preserve
@ -1749,31 +1792,153 @@ export async function reapExpiredLeases(nowIso: string): Promise<ReapResult> {
const expired = await repo.listExpiredLeases(nowIso);
const jobIds: string[] = [];
for (const lease of expired) {
const job = await repo.getJob(lease.jobId, lease.productId);
if (!job) continue;
if (!ACTIVE_STAGES.includes(job.stage)) continue; // already resting; nothing to reclaim
const newEpoch = job.leaseEpoch + 1; // fence the zombie holder
const unmet = await unmetDeps(job);
const stage: FleetStage = unmet.length > 0 ? 'blocked' : 'queued';
// checkpoint pointer is intentionally preserved on the job (resume-friendly)
const jobRes = await repo.revUpdateJob(job.id, lease.productId, job.rev, {
stage,
leaseEpoch: newEpoch,
blockedReason: unmet.length > 0 ? `waiting on: ${unmet.join(', ')}` : undefined,
});
if (!jobRes.ok) continue;
await repo.revUpdateLease(lease.jobId, lease.rev, {
status: 'expired',
leaseEpoch: newEpoch,
holderFactoryId: undefined,
});
await repo.appendEvent({
jobId: job.id,
productId: lease.productId,
type: 'lease_expired',
data: { leaseEpoch: newEpoch, returnedTo: stage },
});
jobIds.push(job.id);
const id = await reclaimLeaseJob(lease, 'lease_expired');
if (id) jobIds.push(id);
}
return { reaped: jobIds.length, jobIds };
}
/**
* Reclaim held leases whose holder factory has gone stale (missed heartbeats past
* `staleMaxAgeMs`) or vanished entirely, without waiting out the (much longer)
* lease TTL. A crashed/network-partitioned factory stops heartbeating long before
* its 900s lease expires; this recovers its job within one stale window instead.
* Idempotent and safe alongside the expiry reaper (both go through reclaimLeaseJob).
*/
export async function reclaimStaleFactoryLeases(
nowMs: number,
staleMaxAgeMs = DEFAULT_STALE_FACTORY_MS
): Promise<ReapResult> {
const [held, factories] = await Promise.all([repo.listHeldLeases(), repo.listAllFactories()]);
// factoryId → most-recent heartbeat (a factory can appear once per product).
const lastBeat = new Map<string, number>();
for (const f of factories) {
const t = Date.parse(f.lastHeartbeatAt);
if (Number.isNaN(t)) continue;
lastBeat.set(f.factoryId, Math.max(lastBeat.get(f.factoryId) ?? 0, t));
}
const jobIds: string[] = [];
for (const lease of held) {
const holder = lease.holderFactoryId;
if (!holder) continue;
const beat = lastBeat.get(holder);
// Reclaim when the holder is unknown (vanished) or hasn't heartbeat recently.
const isStale = beat === undefined || nowMs - beat > staleMaxAgeMs;
if (!isStale) continue;
const id = await reclaimLeaseJob(lease, 'factory_stale');
if (id) jobIds.push(id);
}
return { reaped: jobIds.length, jobIds };
}
/** Terminal job stages eligible for retention GC (resting, never to run again). */
export const GC_TERMINAL_STAGES: readonly FleetStage[] = ['shipped', 'failed', 'dead_letter'];
export interface SweepOptions {
/** Authoritative now (ms epoch). */
now: number;
/** Delete finished (expired/released) lease docs older than this. Default 24h. */
finishedLeaseTtlMs?: number;
/** Delete factory docs with no heartbeat for longer than this. Default 7d. */
deadFactoryTtlMs?: number;
/** When > 0, delete terminal jobs (and cascade runs/events/artifacts/lease) last
* updated longer ago than this. 0/undefined DISABLED (never deletes history). */
jobRetentionMs?: number;
}
export interface SweepResult {
leasesDeleted: number;
factoriesDeleted: number;
jobsDeleted: number;
runsDeleted: number;
eventsDeleted: number;
artifactsDeleted: number;
}
const DEFAULT_FINISHED_LEASE_TTL_MS = 24 * 60 * 60 * 1000; // 1 day
const DEFAULT_DEAD_FACTORY_TTL_MS = 7 * 24 * 60 * 60 * 1000; // 7 days
/**
* Garbage-collect fleet bookkeeping so the containers don't grow without bound.
*
* On by default (safe these are coordination ephemera, not history):
* finished leases (expired/released) older than `finishedLeaseTtlMs`
* factory docs with no heartbeat for longer than `deadFactoryTtlMs` (a live
* host simply re-registers on its next heartbeat)
*
* Opt-in only (destroys audit history `jobRetentionMs` must be explicitly > 0):
* terminal jobs older than `jobRetentionMs`, cascading to their runs, events,
* artifact pointers (+ backing blobs, best-effort), and lease.
*
* Every deletion is best-effort: one failure is swallowed so the sweep makes
* forward progress and retries on the next pass.
*/
export async function sweepFleetGarbage(opts: SweepOptions): Promise<SweepResult> {
const now = opts.now;
const finishedLeaseTtlMs = opts.finishedLeaseTtlMs ?? DEFAULT_FINISHED_LEASE_TTL_MS;
const deadFactoryTtlMs = opts.deadFactoryTtlMs ?? DEFAULT_DEAD_FACTORY_TTL_MS;
const jobRetentionMs = opts.jobRetentionMs ?? 0;
const result: SweepResult = {
leasesDeleted: 0,
factoriesDeleted: 0,
jobsDeleted: 0,
runsDeleted: 0,
eventsDeleted: 0,
artifactsDeleted: 0,
};
// 1) Finished leases (ephemeral coordination state) past their TTL.
const leaseCutoff = new Date(now - finishedLeaseTtlMs).toISOString();
for (const lease of await repo.listFinishedLeasesOlderThan(leaseCutoff)) {
try {
await repo.deleteLease(lease.jobId);
result.leasesDeleted++;
} catch {
/* best-effort */
}
}
// 2) Long-dead factory docs (a live host re-registers on next heartbeat).
for (const f of await repo.listAllFactories()) {
const beat = Date.parse(f.lastHeartbeatAt);
if (Number.isNaN(beat) || now - beat <= deadFactoryTtlMs) continue;
try {
await repo.deleteFactory(f.factoryId, f.productId);
result.factoriesDeleted++;
} catch {
/* best-effort */
}
}
// 3) Terminal-job retention cascade — OPT-IN only (destroys history).
if (jobRetentionMs > 0) {
const jobCutoff = new Date(now - jobRetentionMs).toISOString();
for (const job of await repo.listTerminalJobsOlderThan(jobCutoff, GC_TERMINAL_STAGES)) {
try {
for (const run of await repo.listRunsByJob(job.id)) {
await repo.deleteRun(run.id, job.id);
result.runsDeleted++;
}
for (const ev of await repo.listEvents(job.id)) {
await repo.deleteEvent(ev.id, job.id);
result.eventsDeleted++;
}
for (const art of await repo.listArtifactsByJob(job.id, job.productId)) {
try {
await deleteArtifactWithBlob(art.id, job.productId);
} catch {
await repo.deleteArtifact(art.id, job.productId); // pointer at least
}
result.artifactsDeleted++;
}
await repo.deleteLease(job.id).catch(() => {});
await repo.deleteJob(job.id, job.productId);
result.jobsDeleted++;
} catch {
/* best-effort: leave this job for the next pass */
}
}
}
return result;
}

View File

@ -6,21 +6,39 @@
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
const { reapSpy } = vi.hoisted(() => ({ reapSpy: vi.fn() }));
const { reapSpy, staleSpy, sweepSpy } = vi.hoisted(() => ({
reapSpy: vi.fn(),
staleSpy: vi.fn(),
sweepSpy: vi.fn(),
}));
vi.mock('./coordinator.js', () => ({
reapExpiredLeases: reapSpy,
reclaimStaleFactoryLeases: staleSpy,
sweepFleetGarbage: sweepSpy,
}));
import { startLeaseReaper, stopLeaseReaper, isLeaseReaperRunning } from './reaper.js';
const log = { info: vi.fn(), warn: vi.fn() };
const emptyGc = {
leasesDeleted: 0,
factoriesDeleted: 0,
jobsDeleted: 0,
runsDeleted: 0,
eventsDeleted: 0,
artifactsDeleted: 0,
};
describe('fleet lease reaper', () => {
beforeEach(() => {
vi.useFakeTimers();
reapSpy.mockReset();
staleSpy.mockReset();
sweepSpy.mockReset();
reapSpy.mockResolvedValue({ reaped: 0, jobIds: [] });
staleSpy.mockResolvedValue({ reaped: 0, jobIds: [] });
sweepSpy.mockResolvedValue({ ...emptyGc });
log.info.mockReset();
log.warn.mockReset();
});
@ -47,11 +65,30 @@ describe('fleet lease reaper', () => {
expect(reapSpy).toHaveBeenCalledWith(expect.stringMatching(/^\d{4}-\d{2}-\d{2}T/));
});
it('logs only when something was reclaimed', async () => {
it('also reclaims stale-factory leases each pass', async () => {
startLeaseReaper(log);
await vi.advanceTimersByTimeAsync(0); // flush the boot pass (stale runs after an await)
expect(staleSpy).toHaveBeenCalledTimes(1);
await vi.advanceTimersByTimeAsync(30_000);
expect(staleSpy).toHaveBeenCalledTimes(2);
});
it('logs a combined count when something was reclaimed', async () => {
reapSpy.mockResolvedValueOnce({ reaped: 2, jobIds: ['fjob_a', 'fjob_b'] });
staleSpy.mockResolvedValueOnce({ reaped: 1, jobIds: ['fjob_c'] });
startLeaseReaper(log);
await vi.advanceTimersByTimeAsync(0);
expect(log.info).toHaveBeenCalledWith(expect.stringContaining('reclaimed 2'));
expect(log.info).toHaveBeenCalledWith(expect.stringContaining('reclaimed 3 job(s)'));
});
it('runs the GC sweep on boot then throttles it (hourly, not every tick)', async () => {
startLeaseReaper(log);
await vi.advanceTimersByTimeAsync(0); // flush the boot pass (sweep runs after awaits)
expect(sweepSpy).toHaveBeenCalledTimes(1); // boot sweep
await vi.advanceTimersByTimeAsync(30_000); // a fast reclaim tick — no sweep
expect(sweepSpy).toHaveBeenCalledTimes(1);
await vi.advanceTimersByTimeAsync(60 * 60 * 1000); // cross the hourly boundary
expect(sweepSpy).toHaveBeenCalledTimes(2);
});
it('is idempotent: a second start does not double-schedule', async () => {
@ -75,7 +112,7 @@ describe('fleet lease reaper', () => {
reapSpy.mockRejectedValueOnce(new Error('cosmos unavailable'));
startLeaseReaper(log);
await vi.advanceTimersByTimeAsync(0); // let the rejected boot pass settle
expect(log.warn).toHaveBeenCalledWith(expect.stringContaining('reap pass failed'));
expect(log.warn).toHaveBeenCalledWith(expect.stringContaining('reclaim pass failed'));
// The interval is still scheduled and the next pass runs.
reapSpy.mockResolvedValueOnce({ reaped: 0, jobIds: [] });

View File

@ -1,47 +1,93 @@
/**
* Fleet lease reaper background loop that reclaims jobs whose factory died.
* Fleet lease reaper background loop that recovers and garbage-collects fleet
* state. Each pass does two recovery scans plus a periodic GC sweep:
*
* `coordinator.reapExpiredLeases` implements the full §25 recovery (fence the
* zombie holder via a leaseEpoch bump, return the job to queued/blocked, preserve
* the checkpoint pointer), but it is a pure function that does nothing until
* something calls it. Without a scheduler a crashed/disconnected factory's job
* stays stuck in an active stage forever. This module is that scheduler: a single
* process-wide timer (leases are queried across all products), started once at
* server boot and stopped on graceful shutdown mirroring the diagnostics
* trigger job pattern.
* reapExpiredLeases reclaim jobs whose lease TTL elapsed (§25 recovery:
* fence the zombie via a leaseEpoch bump, return the
* job to queued/blocked, preserve the checkpoint).
* reclaimStaleFactoryLeases reclaim held leases of factories that stopped
* heartbeating, long before the (much longer) TTL.
* sweepFleetGarbage (less frequently) delete finished leases and dead
* factory docs, and only when retention is set
* prune terminal jobs and their runs/events/artifacts.
*
* All of this is implemented as pure coordinator functions that do nothing until
* something calls them; this module is that scheduler. A single process-wide timer
* (the scans query across all products), started once at server boot and stopped
* on graceful shutdown, mirroring the diagnostics trigger-job pattern.
*/
import { reapExpiredLeases } from './coordinator.js';
import { reapExpiredLeases, reclaimStaleFactoryLeases, sweepFleetGarbage } from './coordinator.js';
/** How often to scan for expired leases. Well under the runner's 90s stale
/** How often to scan for expired/stale leases. Well under the runner's 90s stale
* threshold so a dead factory's job is reclaimed promptly. */
const REAP_INTERVAL_MS = 30_000;
/** Run the (heavier, deletion) GC sweep at most this often, regardless of the
* faster recovery cadence above. */
const SWEEP_INTERVAL_MS = 60 * 60 * 1000; // hourly
/** Terminal-job retention from env: `FLEET_GC_RETENTION_DAYS`. 0/unset jobs and
* their history are NEVER deleted (only ephemeral leases/dead-factory docs are). */
function jobRetentionMs(): number {
const days = Number(process.env.FLEET_GC_RETENTION_DAYS ?? '0');
return Number.isFinite(days) && days > 0 ? days * 24 * 60 * 60 * 1000 : 0;
}
let reaperInterval: ReturnType<typeof globalThis.setInterval> | null = null;
let lastSweepMs = 0;
interface ReaperLog {
info: (...a: unknown[]) => void;
warn?: (...a: unknown[]) => void;
}
/** One reap pass. Never throws a transient datastore error is logged and the
* next tick retries (recovery must not crash the service). */
/** One reap pass: expiry reclaim + stale-factory reclaim, then a throttled GC
* sweep. Never throws a transient datastore error is logged and the next tick
* retries (recovery must not crash the service). */
async function runReapPass(log?: ReaperLog): Promise<void> {
const nowMs = Date.now();
try {
const { reaped, jobIds } = await reapExpiredLeases(new Date().toISOString());
if (reaped > 0) {
log?.info(`[fleet-reaper] reclaimed ${reaped} expired-lease job(s): ${jobIds.join(', ')}`);
const expired = await reapExpiredLeases(new Date(nowMs).toISOString());
const stale = await reclaimStaleFactoryLeases(nowMs);
const reclaimed = [...expired.jobIds, ...stale.jobIds];
if (reclaimed.length > 0) {
log?.info(
`[fleet-reaper] reclaimed ${reclaimed.length} job(s) ` +
`(${expired.reaped} expired-lease, ${stale.reaped} stale-factory): ${reclaimed.join(', ')}`
);
}
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
(log?.warn ?? log?.info)?.(`[fleet-reaper] reap pass failed: ${msg}`);
(log?.warn ?? log?.info)?.(`[fleet-reaper] reclaim pass failed: ${msg}`);
}
// Throttled GC sweep — deletions, so run far less often than recovery.
if (nowMs - lastSweepMs < SWEEP_INTERVAL_MS) return;
lastSweepMs = nowMs;
try {
const gc = await sweepFleetGarbage({ now: nowMs, jobRetentionMs: jobRetentionMs() });
const total =
gc.leasesDeleted + gc.factoriesDeleted + gc.jobsDeleted + gc.runsDeleted + gc.eventsDeleted;
if (total > 0) {
log?.info(
`[fleet-reaper] gc: ${gc.leasesDeleted} lease(s), ${gc.factoriesDeleted} factory(ies), ` +
`${gc.jobsDeleted} job(s) + ${gc.runsDeleted} run(s)/${gc.eventsDeleted} event(s)/` +
`${gc.artifactsDeleted} artifact(s)`
);
}
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
(log?.warn ?? log?.info)?.(`[fleet-reaper] gc sweep failed: ${msg}`);
}
}
/** Start the lease reaper. Idempotent: a second call while running is a no-op. */
export function startLeaseReaper(log?: ReaperLog): void {
if (reaperInterval) return;
log?.info(`[fleet-reaper] starting (every ${REAP_INTERVAL_MS / 1000}s)`);
// Let the first tick run the GC sweep too (don't skip it for an hour on boot).
lastSweepMs = 0;
log?.info(`[fleet-reaper] starting (reclaim every ${REAP_INTERVAL_MS / 1000}s)`);
// Run once on boot so a restart promptly recovers jobs orphaned while down.
void runReapPass(log);
reaperInterval = globalThis.setInterval(() => {

View File

@ -190,6 +190,17 @@ export async function deleteJob(id: string, productId: string): Promise<void> {
await jobs().delete(id, productId);
}
/** Terminal jobs (cross-partition) last updated before `cutoffIso` retention GC
* candidates. `stages` is the terminal set the caller considers prunable. */
export async function listTerminalJobsOlderThan(
cutoffIso: string,
stages: readonly FleetStage[]
): Promise<FleetJobDoc[]> {
return jobs().findMany({
filter: { stage: { $in: stages as unknown as string[] }, updatedAt: { $lt: cutoffIso } },
});
}
/**
* Record the last tracker Item status echoed for a job (§10 round-trip idempotency).
* Deliberately does NOT bump `rev` it is bookkeeping for the downstream echo, not a
@ -228,6 +239,10 @@ export async function listRunsByJob(jobId: string): Promise<FleetRunDoc[]> {
return runs().findMany({ filter: { jobId }, sort: { attempt: 1 } });
}
export async function deleteRun(id: string, jobId: string): Promise<void> {
await runs().delete(id, jobId);
}
/** All runs for a product (cross-partition) — used for cost aggregation. */
export async function listRunsByProduct(productId: string): Promise<FleetRunDoc[]> {
return runs().findMany({ filter: { productId } });
@ -263,6 +278,23 @@ export async function listExpiredLeases(nowIso: string): Promise<FleetLeaseDoc[]
});
}
/** All currently-held leases (cross-partition) — used by stale-factory reclaim. */
export async function listHeldLeases(): Promise<FleetLeaseDoc[]> {
return leases().findMany({ filter: { status: 'held' } });
}
/** Finished (expired/released) leases last touched before `cutoffIso` GC candidates.
* These are pure coordination ephemera (not audit history), safe to delete. */
export async function listFinishedLeasesOlderThan(cutoffIso: string): Promise<FleetLeaseDoc[]> {
return leases().findMany({
filter: { status: { $in: ['expired', 'released'] }, updatedAt: { $lt: cutoffIso } },
});
}
export async function deleteLease(jobId: string): Promise<void> {
await leases().delete(jobId, jobId);
}
// ── Factories ─────────────────────────────────────────────────────────────────
export async function getFactory(
@ -280,6 +312,16 @@ export async function listFactories(productId: string): Promise<FleetFactoryDoc[
return factories().findMany({ filter: { productId } });
}
/** All registered factories across every product (cross-partition) used by the
* reaper to find stale holders and GC long-dead factory docs. */
export async function listAllFactories(): Promise<FleetFactoryDoc[]> {
return factories().findMany({});
}
export async function deleteFactory(factoryId: string, productId: string): Promise<void> {
await factories().delete(factoryId, productId);
}
// ── Profiles ────────────────────────────────────────────────────────────────
export async function createProfile(doc: FleetProfileDoc): Promise<FleetProfileDoc> {
@ -326,6 +368,10 @@ export async function listEvents(jobId: string): Promise<FleetEventDoc[]> {
return docs;
}
export async function deleteEvent(id: string, jobId: string): Promise<void> {
await events().delete(id, jobId);
}
// ── Artifacts (pointers only — bytes live in blob, never Cosmos; §13) ──────────
export async function createArtifact(doc: FleetArtifactDoc): Promise<FleetArtifactDoc> {