feat(fleet): reaper/GC telemetry in /fleet/metrics + lease TTL backstop

Observability + defense-in-depth for the recovery/GC machinery:

- The reaper now accumulates process-wide telemetry (getReaperStats): cumulative
  expired/stale reclaims and per-container GC deletions, plus startedAt/last-run
  timestamps. GET /fleet/metrics returns it under a `reaper` field so operators
  can see recovery activity (dead_letter counts/alerts were already added).
- Cosmos TTL backstop on fleet_leases (2 days): a held lease is renewed
  continuously so it never expires while active; only finished leases age out,
  matching the ~24h app GC. Purely defense-in-depth behind the reaper, which still
  OWNS recovery (requeue + epoch bump + checkpoint). TTL is deliberately NOT set on
  fleet_events (ids are <jobId>:evt:<seq> with seq=count, so partial TTL deletion
  could collide ids); events/runs/jobs are pruned by the cascade GC instead.

Memory provider ignores defaultTtl, so tests/dev are unaffected.

Generated with [Devin](https://cli.devin.ai/docs)

Co-Authored-By: Devin <158243242+devin-ai-integration[bot]@users.noreply.github.com>
This commit is contained in:
saravanakumardb1 2026-06-01 12:18:22 -07:00
parent 141435fe95
commit 5413c0e789
4 changed files with 93 additions and 3 deletions

View File

@ -200,7 +200,15 @@ const CONTAINER_DEFS: Record<string, ContainerConfig> = {
],
},
fleet_runs: { partitionKeyPath: '/jobId' },
fleet_leases: { partitionKeyPath: '/jobId' },
// TTL backstop (2 days) for lease docs: a HELD lease is renewed continuously so
// its _ts stays fresh and it never expires while active; only finished
// (released/expired) leases — which stop being written — age out. This is purely
// defense-in-depth behind the app-level reaper/GC (which deletes finished leases
// after ~24h and owns recovery: requeue + epoch bump + checkpoint). NOTE: TTL is
// deliberately NOT set on fleet_events (event ids are `<jobId>:evt:<seq>` where
// seq = current count, so partial TTL deletion could collide ids) — events/runs/
// jobs are pruned by the cascade GC instead.
fleet_leases: { partitionKeyPath: '/jobId', defaultTtl: 2 * 86400 },
fleet_factories: { partitionKeyPath: '/productId' },
fleet_profiles: { partitionKeyPath: '/productId' },
fleet_events: { partitionKeyPath: '/jobId' },

View File

@ -18,7 +18,12 @@ vi.mock('./coordinator.js', () => ({
sweepFleetGarbage: sweepSpy,
}));
import { startLeaseReaper, stopLeaseReaper, isLeaseReaperRunning } from './reaper.js';
import {
startLeaseReaper,
stopLeaseReaper,
isLeaseReaperRunning,
getReaperStats,
} from './reaper.js';
const log = { info: vi.fn(), warn: vi.fn() };
const emptyGc = {
@ -109,6 +114,25 @@ describe('fleet lease reaper', () => {
expect(() => stopLeaseReaper()).not.toThrow();
});
it('accumulates reclaim + gc telemetry in getReaperStats', async () => {
const before = getReaperStats().totals;
reapSpy.mockResolvedValueOnce({ reaped: 2, jobIds: ['a', 'b'] });
staleSpy.mockResolvedValueOnce({ reaped: 1, jobIds: ['c'] });
sweepSpy.mockResolvedValueOnce({ ...emptyGc, leasesDeleted: 4, tokensDeleted: 2 });
startLeaseReaper(log);
await vi.advanceTimersByTimeAsync(0); // flush boot pass (reclaim + sweep)
const s = getReaperStats();
expect(s.running).toBe(true);
expect(s.startedAt).toBeTruthy();
expect(s.totals.expiredReclaimed - before.expiredReclaimed).toBe(2);
expect(s.totals.staleReclaimed - before.staleReclaimed).toBe(1);
expect(s.totals.leasesDeleted - before.leasesDeleted).toBe(4);
expect(s.totals.tokensDeleted - before.tokensDeleted).toBe(2);
expect(s.lastReclaimAt).toBeTruthy();
expect(s.lastSweepAt).toBeTruthy();
});
it('swallows a failing pass and keeps the timer alive', async () => {
reapSpy.mockRejectedValueOnce(new Error('cosmos unavailable'));
startLeaseReaper(log);

View File

@ -42,6 +42,48 @@ interface ReaperLog {
warn?: (...a: unknown[]) => void;
}
/** Cumulative reaper/GC activity since the process started — operator telemetry. */
export interface ReaperStats {
running: boolean;
startedAt: string | null;
lastReclaimAt: string | null;
lastSweepAt: string | null;
totals: {
expiredReclaimed: number;
staleReclaimed: number;
leasesDeleted: number;
factoriesDeleted: number;
tokensDeleted: number;
jobsDeleted: number;
runsDeleted: number;
eventsDeleted: number;
artifactsDeleted: number;
};
}
const stats: ReaperStats = {
running: false,
startedAt: null,
lastReclaimAt: null,
lastSweepAt: null,
totals: {
expiredReclaimed: 0,
staleReclaimed: 0,
leasesDeleted: 0,
factoriesDeleted: 0,
tokensDeleted: 0,
jobsDeleted: 0,
runsDeleted: 0,
eventsDeleted: 0,
artifactsDeleted: 0,
},
};
/** Snapshot of cumulative reaper/GC activity (process-wide; surfaced in metrics). */
export function getReaperStats(): ReaperStats {
return { ...stats, running: reaperInterval !== null, totals: { ...stats.totals } };
}
/** One reap pass: expiry reclaim + stale-factory reclaim, then a throttled GC
* sweep. Never throws a transient datastore error is logged and the next tick
* retries (recovery must not crash the service). */
@ -52,6 +94,9 @@ async function runReapPass(log?: ReaperLog): Promise<void> {
const stale = await reclaimStaleFactoryLeases(nowMs);
const reclaimed = [...expired.jobIds, ...stale.jobIds];
if (reclaimed.length > 0) {
stats.lastReclaimAt = new Date(nowMs).toISOString();
stats.totals.expiredReclaimed += expired.reaped;
stats.totals.staleReclaimed += stale.reaped;
log?.info(
`[fleet-reaper] reclaimed ${reclaimed.length} job(s) ` +
`(${expired.reaped} expired-lease, ${stale.reaped} stale-factory): ${reclaimed.join(', ')}`
@ -67,6 +112,14 @@ async function runReapPass(log?: ReaperLog): Promise<void> {
lastSweepMs = nowMs;
try {
const gc = await sweepFleetGarbage({ now: nowMs, jobRetentionMs: jobRetentionMs() });
stats.lastSweepAt = new Date(nowMs).toISOString();
stats.totals.leasesDeleted += gc.leasesDeleted;
stats.totals.factoriesDeleted += gc.factoriesDeleted;
stats.totals.tokensDeleted += gc.tokensDeleted;
stats.totals.jobsDeleted += gc.jobsDeleted;
stats.totals.runsDeleted += gc.runsDeleted;
stats.totals.eventsDeleted += gc.eventsDeleted;
stats.totals.artifactsDeleted += gc.artifactsDeleted;
const total =
gc.leasesDeleted +
gc.factoriesDeleted +
@ -92,6 +145,7 @@ export function startLeaseReaper(log?: ReaperLog): void {
if (reaperInterval) return;
// Let the first tick run the GC sweep too (don't skip it for an hour on boot).
lastSweepMs = 0;
stats.startedAt = new Date().toISOString();
log?.info(`[fleet-reaper] starting (reclaim every ${REAP_INTERVAL_MS / 1000}s)`);
// Run once on boot so a restart promptly recovers jobs orphaned while down.
void runReapPass(log);

View File

@ -33,6 +33,7 @@ import * as coordinator from './coordinator.js';
import * as artifactsBlob from './artifacts-blob.js';
import * as enrollment from './enrollment.js';
import * as trackerBridge from './tracker-bridge.js';
import { getReaperStats } from './reaper.js';
import {
SubmitJobSchema,
ListJobsQuerySchema,
@ -445,7 +446,10 @@ export async function fleetRoutes(app: FastifyInstance) {
app.get('/fleet/metrics', async req => {
await extractAuth(req);
const pid = getRequestProductId(req);
return coordinator.fleetMetrics(pid);
const metrics = await coordinator.fleetMetrics(pid);
// Attach process-wide recovery/GC telemetry (the reaper runs across products,
// so these counters are global — operator visibility into recovery activity).
return { ...metrics, reaper: getReaperStats() };
});
// ── M0 RU gate: per-product queue version (cheap ~1 RU point read) ──