From 5413c0e78993e01f9cdeb7ae3d457fd98d8b5081 Mon Sep 17 00:00:00 2001 From: saravanakumardb1 Date: Mon, 1 Jun 2026 12:18:22 -0700 Subject: [PATCH] feat(fleet): reaper/GC telemetry in /fleet/metrics + lease TTL backstop Observability + defense-in-depth for the recovery/GC machinery: - The reaper now accumulates process-wide telemetry (getReaperStats): cumulative expired/stale reclaims and per-container GC deletions, plus startedAt/last-run timestamps. GET /fleet/metrics returns it under a `reaper` field so operators can see recovery activity (dead_letter counts/alerts were already added). - Cosmos TTL backstop on fleet_leases (2 days): a held lease is renewed continuously so it never expires while active; only finished leases age out, matching the ~24h app GC. Purely defense-in-depth behind the reaper, which still OWNS recovery (requeue + epoch bump + checkpoint). TTL is deliberately NOT set on fleet_events (ids are :evt: with seq=count, so partial TTL deletion could collide ids); events/runs/jobs are pruned by the cascade GC instead. Memory provider ignores defaultTtl, so tests/dev are unaffected. Generated with [Devin](https://cli.devin.ai/docs) Co-Authored-By: Devin <158243242+devin-ai-integration[bot]@users.noreply.github.com> --- .../platform-service/src/lib/cosmos-init.ts | 10 +++- .../src/modules/fleet/reaper.test.ts | 26 ++++++++- .../src/modules/fleet/reaper.ts | 54 +++++++++++++++++++ .../src/modules/fleet/routes.ts | 6 ++- 4 files changed, 93 insertions(+), 3 deletions(-) diff --git a/services/platform-service/src/lib/cosmos-init.ts b/services/platform-service/src/lib/cosmos-init.ts index 41544842..8b40e341 100644 --- a/services/platform-service/src/lib/cosmos-init.ts +++ b/services/platform-service/src/lib/cosmos-init.ts @@ -200,7 +200,15 @@ const CONTAINER_DEFS: Record = { ], }, fleet_runs: { partitionKeyPath: '/jobId' }, - fleet_leases: { partitionKeyPath: '/jobId' }, + // TTL backstop (2 days) for lease docs: a HELD lease is renewed continuously so + // its _ts stays fresh and it never expires while active; only finished + // (released/expired) leases — which stop being written — age out. This is purely + // defense-in-depth behind the app-level reaper/GC (which deletes finished leases + // after ~24h and owns recovery: requeue + epoch bump + checkpoint). NOTE: TTL is + // deliberately NOT set on fleet_events (event ids are `:evt:` where + // seq = current count, so partial TTL deletion could collide ids) — events/runs/ + // jobs are pruned by the cascade GC instead. + fleet_leases: { partitionKeyPath: '/jobId', defaultTtl: 2 * 86400 }, fleet_factories: { partitionKeyPath: '/productId' }, fleet_profiles: { partitionKeyPath: '/productId' }, fleet_events: { partitionKeyPath: '/jobId' }, diff --git a/services/platform-service/src/modules/fleet/reaper.test.ts b/services/platform-service/src/modules/fleet/reaper.test.ts index 8a95d887..22ee095b 100644 --- a/services/platform-service/src/modules/fleet/reaper.test.ts +++ b/services/platform-service/src/modules/fleet/reaper.test.ts @@ -18,7 +18,12 @@ vi.mock('./coordinator.js', () => ({ sweepFleetGarbage: sweepSpy, })); -import { startLeaseReaper, stopLeaseReaper, isLeaseReaperRunning } from './reaper.js'; +import { + startLeaseReaper, + stopLeaseReaper, + isLeaseReaperRunning, + getReaperStats, +} from './reaper.js'; const log = { info: vi.fn(), warn: vi.fn() }; const emptyGc = { @@ -109,6 +114,25 @@ describe('fleet lease reaper', () => { expect(() => stopLeaseReaper()).not.toThrow(); }); + it('accumulates reclaim + gc telemetry in getReaperStats', async () => { + const before = getReaperStats().totals; + reapSpy.mockResolvedValueOnce({ reaped: 2, jobIds: ['a', 'b'] }); + staleSpy.mockResolvedValueOnce({ reaped: 1, jobIds: ['c'] }); + sweepSpy.mockResolvedValueOnce({ ...emptyGc, leasesDeleted: 4, tokensDeleted: 2 }); + startLeaseReaper(log); + await vi.advanceTimersByTimeAsync(0); // flush boot pass (reclaim + sweep) + + const s = getReaperStats(); + expect(s.running).toBe(true); + expect(s.startedAt).toBeTruthy(); + expect(s.totals.expiredReclaimed - before.expiredReclaimed).toBe(2); + expect(s.totals.staleReclaimed - before.staleReclaimed).toBe(1); + expect(s.totals.leasesDeleted - before.leasesDeleted).toBe(4); + expect(s.totals.tokensDeleted - before.tokensDeleted).toBe(2); + expect(s.lastReclaimAt).toBeTruthy(); + expect(s.lastSweepAt).toBeTruthy(); + }); + it('swallows a failing pass and keeps the timer alive', async () => { reapSpy.mockRejectedValueOnce(new Error('cosmos unavailable')); startLeaseReaper(log); diff --git a/services/platform-service/src/modules/fleet/reaper.ts b/services/platform-service/src/modules/fleet/reaper.ts index bc683163..881a0393 100644 --- a/services/platform-service/src/modules/fleet/reaper.ts +++ b/services/platform-service/src/modules/fleet/reaper.ts @@ -42,6 +42,48 @@ interface ReaperLog { warn?: (...a: unknown[]) => void; } +/** Cumulative reaper/GC activity since the process started — operator telemetry. */ +export interface ReaperStats { + running: boolean; + startedAt: string | null; + lastReclaimAt: string | null; + lastSweepAt: string | null; + totals: { + expiredReclaimed: number; + staleReclaimed: number; + leasesDeleted: number; + factoriesDeleted: number; + tokensDeleted: number; + jobsDeleted: number; + runsDeleted: number; + eventsDeleted: number; + artifactsDeleted: number; + }; +} + +const stats: ReaperStats = { + running: false, + startedAt: null, + lastReclaimAt: null, + lastSweepAt: null, + totals: { + expiredReclaimed: 0, + staleReclaimed: 0, + leasesDeleted: 0, + factoriesDeleted: 0, + tokensDeleted: 0, + jobsDeleted: 0, + runsDeleted: 0, + eventsDeleted: 0, + artifactsDeleted: 0, + }, +}; + +/** Snapshot of cumulative reaper/GC activity (process-wide; surfaced in metrics). */ +export function getReaperStats(): ReaperStats { + return { ...stats, running: reaperInterval !== null, totals: { ...stats.totals } }; +} + /** One reap pass: expiry reclaim + stale-factory reclaim, then a throttled GC * sweep. Never throws — a transient datastore error is logged and the next tick * retries (recovery must not crash the service). */ @@ -52,6 +94,9 @@ async function runReapPass(log?: ReaperLog): Promise { const stale = await reclaimStaleFactoryLeases(nowMs); const reclaimed = [...expired.jobIds, ...stale.jobIds]; if (reclaimed.length > 0) { + stats.lastReclaimAt = new Date(nowMs).toISOString(); + stats.totals.expiredReclaimed += expired.reaped; + stats.totals.staleReclaimed += stale.reaped; log?.info( `[fleet-reaper] reclaimed ${reclaimed.length} job(s) ` + `(${expired.reaped} expired-lease, ${stale.reaped} stale-factory): ${reclaimed.join(', ')}` @@ -67,6 +112,14 @@ async function runReapPass(log?: ReaperLog): Promise { lastSweepMs = nowMs; try { const gc = await sweepFleetGarbage({ now: nowMs, jobRetentionMs: jobRetentionMs() }); + stats.lastSweepAt = new Date(nowMs).toISOString(); + stats.totals.leasesDeleted += gc.leasesDeleted; + stats.totals.factoriesDeleted += gc.factoriesDeleted; + stats.totals.tokensDeleted += gc.tokensDeleted; + stats.totals.jobsDeleted += gc.jobsDeleted; + stats.totals.runsDeleted += gc.runsDeleted; + stats.totals.eventsDeleted += gc.eventsDeleted; + stats.totals.artifactsDeleted += gc.artifactsDeleted; const total = gc.leasesDeleted + gc.factoriesDeleted + @@ -92,6 +145,7 @@ export function startLeaseReaper(log?: ReaperLog): void { if (reaperInterval) return; // Let the first tick run the GC sweep too (don't skip it for an hour on boot). lastSweepMs = 0; + stats.startedAt = new Date().toISOString(); log?.info(`[fleet-reaper] starting (reclaim every ${REAP_INTERVAL_MS / 1000}s)`); // Run once on boot so a restart promptly recovers jobs orphaned while down. void runReapPass(log); diff --git a/services/platform-service/src/modules/fleet/routes.ts b/services/platform-service/src/modules/fleet/routes.ts index 527af349..e3426893 100644 --- a/services/platform-service/src/modules/fleet/routes.ts +++ b/services/platform-service/src/modules/fleet/routes.ts @@ -33,6 +33,7 @@ import * as coordinator from './coordinator.js'; import * as artifactsBlob from './artifacts-blob.js'; import * as enrollment from './enrollment.js'; import * as trackerBridge from './tracker-bridge.js'; +import { getReaperStats } from './reaper.js'; import { SubmitJobSchema, ListJobsQuerySchema, @@ -445,7 +446,10 @@ export async function fleetRoutes(app: FastifyInstance) { app.get('/fleet/metrics', async req => { await extractAuth(req); const pid = getRequestProductId(req); - return coordinator.fleetMetrics(pid); + const metrics = await coordinator.fleetMetrics(pid); + // Attach process-wide recovery/GC telemetry (the reaper runs across products, + // so these counters are global — operator visibility into recovery activity). + return { ...metrics, reaper: getReaperStats() }; }); // ── M0 RU gate: per-product queue version (cheap ~1 RU point read) ──