diff --git a/services/platform-service/src/modules/fleet/cleanup.test.ts b/services/platform-service/src/modules/fleet/cleanup.test.ts new file mode 100644 index 00000000..256c8fc5 --- /dev/null +++ b/services/platform-service/src/modules/fleet/cleanup.test.ts @@ -0,0 +1,224 @@ +/** + * Fleet cleanup/recovery — stale-factory lease reclaim + garbage-collection sweep. + * Memory provider; deterministic (no real clock — `now` is injected). + */ + +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { MemoryDatastoreProvider } from '@bytelyst/datastore'; +import { _resetDatastoreProvider, setProvider } from '../../lib/datastore.js'; +import * as repo from './repository.js'; +import * as coord from './coordinator.js'; +import { + FleetJobDocSchema, + type FleetJobDoc, + type FleetLeaseDoc, + type SubmitJobInput, +} from './types.js'; + +const PID = 'lysnrai'; + +function input(over: Partial = {}): SubmitJobInput { + return { + idempotencyKey: 'task-1', + bodyMd: '# do the thing', + priority: 'medium', + capabilities: [], + prefersEngine: [], + allowedScope: [], + deps: [], + kind: 'leaf', + ...over, + }; +} + +const factory = (over = {}) => ({ + productId: PID, + factoryId: 'fac_1', + capabilities: [] as string[], + leaseSeconds: 900, + ...over, +}); + +describe('fleet cleanup — reclaimStaleFactoryLeases', () => { + beforeEach(() => setProvider(new MemoryDatastoreProvider())); + afterEach(() => _resetDatastoreProvider()); + + it('reclaims a held lease whose holder factory stopped heartbeating', async () => { + const { job } = await coord.submitJob(PID, input()); + await coord.heartbeat({ productId: PID, factoryId: 'fac_1', capabilities: [], load: 0 }); + const claim = await coord.claimNextJob(factory()); + expect(claim?.job.stage).toBe('assigned'); + + // Factory goes silent: rewrite its heartbeat to ~5 minutes ago. + const fac = await repo.getFactory('fac_1', PID); + await repo.upsertFactory({ + ...fac!, + lastHeartbeatAt: new Date(Date.now() - 5 * 60_000).toISOString(), + }); + + const res = await coord.reclaimStaleFactoryLeases(Date.now()); + expect(res.reaped).toBe(1); + expect(res.jobIds).toEqual([job.id]); + + const after = await repo.getJob(job.id, PID); + expect(after?.stage).toBe('queued'); // returned to the queue + expect(after?.leaseEpoch).toBe(claim!.job.leaseEpoch + 1); // zombie fenced + expect((await repo.getLease(job.id))?.status).toBe('expired'); + }); + + it('reclaims a held lease whose holder factory never registered (vanished)', async () => { + const { job } = await coord.submitJob(PID, input()); + await coord.claimNextJob(factory()); // no heartbeat ⇒ no factory doc + const res = await coord.reclaimStaleFactoryLeases(Date.now()); + expect(res.reaped).toBe(1); + expect((await repo.getJob(job.id, PID))?.stage).toBe('queued'); + }); + + it('leaves a fresh factory\u2019s lease untouched', async () => { + await coord.submitJob(PID, input()); + await coord.heartbeat({ productId: PID, factoryId: 'fac_1', capabilities: [], load: 0 }); + const claim = await coord.claimNextJob(factory()); + const res = await coord.reclaimStaleFactoryLeases(Date.now()); + expect(res.reaped).toBe(0); + expect((await repo.getJob(claim!.job.id, PID))?.stage).toBe('assigned'); + }); +}); + +describe('fleet cleanup — sweepFleetGarbage', () => { + beforeEach(() => setProvider(new MemoryDatastoreProvider())); + afterEach(() => _resetDatastoreProvider()); + + const now = Date.now(); + + /** Seed a terminal job directly (the repo write paths force updatedAt=now, so we + * go through createJob to control the backdated updatedAt the GC keys on). */ + async function seedTerminalJob( + id: string, + ageMs: number, + stage = 'shipped' + ): Promise { + const at = new Date(now - ageMs).toISOString(); + const doc = FleetJobDocSchema.parse({ + id, + productId: PID, + stage, + idempotencyKey: id, + contentHash: 'h', + bodyMd: '# done', + manifestSnapshot: { + priority: 'medium', + capabilities: [], + prefersEngine: [], + allowedScope: [], + deps: [], + }, + priority: 'medium', + priorityOrder: 2, + capabilities: [], + deps: [], + kind: 'leaf', + attempts: 1, + leaseEpoch: 0, + rev: 0, + createdAt: at, + updatedAt: at, + }); + return repo.createJob(doc); + } + + async function seedFinishedLease(jobId: string, status: 'expired' | 'released', ageMs: number) { + const doc: FleetLeaseDoc = { + id: jobId, + productId: PID, + jobId, + status, + leaseEpoch: 1, + renewals: 0, + rev: 0, + updatedAt: new Date(now - ageMs).toISOString(), + }; + await repo.createLease(doc); + } + + it('deletes finished leases older than the TTL, keeps recent ones', async () => { + await seedFinishedLease('old', 'released', 48 * 60 * 60 * 1000); // 2 days + await seedFinishedLease('recent', 'expired', 60_000); // 1 min + + const res = await coord.sweepFleetGarbage({ now }); + expect(res.leasesDeleted).toBe(1); + expect(await repo.getLease('old')).toBeNull(); + expect(await repo.getLease('recent')).not.toBeNull(); + }); + + it('deletes long-dead factory docs, keeps live ones', async () => { + await repo.upsertFactory({ + id: 'dead', + productId: PID, + factoryId: 'dead', + descriptor: {}, + capabilities: [], + health: 'ok', + load: 0, + seatLimit: 1, + lastHeartbeatAt: new Date(now - 10 * 24 * 60 * 60 * 1000).toISOString(), // 10 days + }); + await repo.upsertFactory({ + id: 'live', + productId: PID, + factoryId: 'live', + descriptor: {}, + capabilities: [], + health: 'ok', + load: 0, + seatLimit: 1, + lastHeartbeatAt: new Date(now).toISOString(), + }); + + const res = await coord.sweepFleetGarbage({ now }); + expect(res.factoriesDeleted).toBe(1); + expect(await repo.getFactory('dead', PID)).toBeNull(); + expect(await repo.getFactory('live', PID)).not.toBeNull(); + }); + + it('never prunes terminal jobs/history by default (jobRetentionMs unset)', async () => { + const job = await seedTerminalJob('jt_old', 365 * 24 * 60 * 60 * 1000); // 1 year + const res = await coord.sweepFleetGarbage({ now }); + expect(res.jobsDeleted).toBe(0); + expect(await repo.getJob(job.id, PID)).not.toBeNull(); + }); + + it('prunes old terminal jobs and cascades runs/events when retention is set', async () => { + const job = await seedTerminalJob('jt_40d', 40 * 24 * 60 * 60 * 1000); // 40 days + await repo.createRun({ + id: 'run_1', + productId: PID, + jobId: job.id, + attempt: 1, + engine: 'devin', + startedAt: new Date(now).toISOString(), + insights: {}, + }); + await repo.appendEvent({ jobId: job.id, productId: PID, type: 'submitted' }); + + const res = await coord.sweepFleetGarbage({ + now, + jobRetentionMs: 30 * 24 * 60 * 60 * 1000, // 30-day retention + }); + expect(res.jobsDeleted).toBe(1); + expect(res.runsDeleted).toBe(1); + expect(res.eventsDeleted).toBeGreaterThanOrEqual(1); + expect(await repo.getJob(job.id, PID)).toBeNull(); + expect(await repo.listRunsByJob(job.id)).toHaveLength(0); + expect(await repo.listEvents(job.id)).toHaveLength(0); + }); + + it('keeps terminal jobs newer than the retention window', async () => { + const job = await seedTerminalJob('jt_5d', 5 * 24 * 60 * 60 * 1000); // 5 days + const res = await coord.sweepFleetGarbage({ + now, + jobRetentionMs: 30 * 24 * 60 * 60 * 1000, + }); + expect(res.jobsDeleted).toBe(0); + expect(await repo.getJob(job.id, PID)).not.toBeNull(); + }); +}); diff --git a/services/platform-service/src/modules/fleet/coordinator.ts b/services/platform-service/src/modules/fleet/coordinator.ts index 4de03feb..0f016dff 100644 --- a/services/platform-service/src/modules/fleet/coordinator.ts +++ b/services/platform-service/src/modules/fleet/coordinator.ts @@ -22,6 +22,7 @@ import { execFile } from 'node:child_process'; import { promisify } from 'node:util'; import { BadRequestError, ConflictError } from '../../lib/errors.js'; import * as repo from './repository.js'; +import { deleteArtifact as deleteArtifactWithBlob } from './artifacts-blob.js'; import { selectJob, selectPreemptionVictim, @@ -1739,6 +1740,48 @@ export interface ReapResult { jobIds: string[]; } +/** + * Reclaim ONE held lease's job: fence the zombie holder (bump leaseEpoch), return + * the job to queued/blocked preserving its checkpoint, mark the lease terminal, and + * emit an audit event. Returns the reclaimed jobId, or null when there was nothing + * to do (job gone, already resting, or a lost CAS race — all idempotent no-ops). + * Shared by the expiry reaper and the stale-factory reclaim so both behave identically. + */ +async function reclaimLeaseJob( + lease: FleetLeaseDoc, + eventType: 'lease_expired' | 'factory_stale' +): Promise { + const job = await repo.getJob(lease.jobId, lease.productId); + if (!job) return null; + if (!ACTIVE_STAGES.includes(job.stage)) return null; // already resting; nothing to reclaim + const newEpoch = job.leaseEpoch + 1; // fence the zombie holder + const unmet = await unmetDeps(job); + const stage: FleetStage = unmet.length > 0 ? 'blocked' : 'queued'; + // checkpoint pointer is intentionally preserved on the job (resume-friendly) + const jobRes = await repo.revUpdateJob(job.id, lease.productId, job.rev, { + stage, + leaseEpoch: newEpoch, + blockedReason: unmet.length > 0 ? `waiting on: ${unmet.join(', ')}` : undefined, + }); + if (!jobRes.ok) return null; + await repo.revUpdateLease(lease.jobId, lease.rev, { + status: 'expired', + leaseEpoch: newEpoch, + holderFactoryId: undefined, + }); + await repo.appendEvent({ + jobId: job.id, + productId: lease.productId, + type: eventType, + data: { + leaseEpoch: newEpoch, + returnedTo: stage, + ...(lease.holderFactoryId ? { holderFactoryId: lease.holderFactoryId } : {}), + }, + }); + return job.id; +} + /** * Reclaim jobs whose lease has expired. Cosmos TTL would only DELETE the lease * doc — it cannot return the job to `queued`, bump the fencing epoch, or preserve @@ -1749,31 +1792,153 @@ export async function reapExpiredLeases(nowIso: string): Promise { const expired = await repo.listExpiredLeases(nowIso); const jobIds: string[] = []; for (const lease of expired) { - const job = await repo.getJob(lease.jobId, lease.productId); - if (!job) continue; - if (!ACTIVE_STAGES.includes(job.stage)) continue; // already resting; nothing to reclaim - const newEpoch = job.leaseEpoch + 1; // fence the zombie holder - const unmet = await unmetDeps(job); - const stage: FleetStage = unmet.length > 0 ? 'blocked' : 'queued'; - // checkpoint pointer is intentionally preserved on the job (resume-friendly) - const jobRes = await repo.revUpdateJob(job.id, lease.productId, job.rev, { - stage, - leaseEpoch: newEpoch, - blockedReason: unmet.length > 0 ? `waiting on: ${unmet.join(', ')}` : undefined, - }); - if (!jobRes.ok) continue; - await repo.revUpdateLease(lease.jobId, lease.rev, { - status: 'expired', - leaseEpoch: newEpoch, - holderFactoryId: undefined, - }); - await repo.appendEvent({ - jobId: job.id, - productId: lease.productId, - type: 'lease_expired', - data: { leaseEpoch: newEpoch, returnedTo: stage }, - }); - jobIds.push(job.id); + const id = await reclaimLeaseJob(lease, 'lease_expired'); + if (id) jobIds.push(id); } return { reaped: jobIds.length, jobIds }; } + +/** + * Reclaim held leases whose holder factory has gone stale (missed heartbeats past + * `staleMaxAgeMs`) or vanished entirely, without waiting out the (much longer) + * lease TTL. A crashed/network-partitioned factory stops heartbeating long before + * its 900s lease expires; this recovers its job within one stale window instead. + * Idempotent and safe alongside the expiry reaper (both go through reclaimLeaseJob). + */ +export async function reclaimStaleFactoryLeases( + nowMs: number, + staleMaxAgeMs = DEFAULT_STALE_FACTORY_MS +): Promise { + const [held, factories] = await Promise.all([repo.listHeldLeases(), repo.listAllFactories()]); + // factoryId → most-recent heartbeat (a factory can appear once per product). + const lastBeat = new Map(); + for (const f of factories) { + const t = Date.parse(f.lastHeartbeatAt); + if (Number.isNaN(t)) continue; + lastBeat.set(f.factoryId, Math.max(lastBeat.get(f.factoryId) ?? 0, t)); + } + const jobIds: string[] = []; + for (const lease of held) { + const holder = lease.holderFactoryId; + if (!holder) continue; + const beat = lastBeat.get(holder); + // Reclaim when the holder is unknown (vanished) or hasn't heartbeat recently. + const isStale = beat === undefined || nowMs - beat > staleMaxAgeMs; + if (!isStale) continue; + const id = await reclaimLeaseJob(lease, 'factory_stale'); + if (id) jobIds.push(id); + } + return { reaped: jobIds.length, jobIds }; +} + +/** Terminal job stages eligible for retention GC (resting, never to run again). */ +export const GC_TERMINAL_STAGES: readonly FleetStage[] = ['shipped', 'failed', 'dead_letter']; + +export interface SweepOptions { + /** Authoritative now (ms epoch). */ + now: number; + /** Delete finished (expired/released) lease docs older than this. Default 24h. */ + finishedLeaseTtlMs?: number; + /** Delete factory docs with no heartbeat for longer than this. Default 7d. */ + deadFactoryTtlMs?: number; + /** When > 0, delete terminal jobs (and cascade runs/events/artifacts/lease) last + * updated longer ago than this. 0/undefined ⇒ DISABLED (never deletes history). */ + jobRetentionMs?: number; +} + +export interface SweepResult { + leasesDeleted: number; + factoriesDeleted: number; + jobsDeleted: number; + runsDeleted: number; + eventsDeleted: number; + artifactsDeleted: number; +} + +const DEFAULT_FINISHED_LEASE_TTL_MS = 24 * 60 * 60 * 1000; // 1 day +const DEFAULT_DEAD_FACTORY_TTL_MS = 7 * 24 * 60 * 60 * 1000; // 7 days + +/** + * Garbage-collect fleet bookkeeping so the containers don't grow without bound. + * + * On by default (safe — these are coordination ephemera, not history): + * • finished leases (expired/released) older than `finishedLeaseTtlMs` + * • factory docs with no heartbeat for longer than `deadFactoryTtlMs` (a live + * host simply re-registers on its next heartbeat) + * + * Opt-in only (destroys audit history — `jobRetentionMs` must be explicitly > 0): + * • terminal jobs older than `jobRetentionMs`, cascading to their runs, events, + * artifact pointers (+ backing blobs, best-effort), and lease. + * + * Every deletion is best-effort: one failure is swallowed so the sweep makes + * forward progress and retries on the next pass. + */ +export async function sweepFleetGarbage(opts: SweepOptions): Promise { + const now = opts.now; + const finishedLeaseTtlMs = opts.finishedLeaseTtlMs ?? DEFAULT_FINISHED_LEASE_TTL_MS; + const deadFactoryTtlMs = opts.deadFactoryTtlMs ?? DEFAULT_DEAD_FACTORY_TTL_MS; + const jobRetentionMs = opts.jobRetentionMs ?? 0; + const result: SweepResult = { + leasesDeleted: 0, + factoriesDeleted: 0, + jobsDeleted: 0, + runsDeleted: 0, + eventsDeleted: 0, + artifactsDeleted: 0, + }; + + // 1) Finished leases (ephemeral coordination state) past their TTL. + const leaseCutoff = new Date(now - finishedLeaseTtlMs).toISOString(); + for (const lease of await repo.listFinishedLeasesOlderThan(leaseCutoff)) { + try { + await repo.deleteLease(lease.jobId); + result.leasesDeleted++; + } catch { + /* best-effort */ + } + } + + // 2) Long-dead factory docs (a live host re-registers on next heartbeat). + for (const f of await repo.listAllFactories()) { + const beat = Date.parse(f.lastHeartbeatAt); + if (Number.isNaN(beat) || now - beat <= deadFactoryTtlMs) continue; + try { + await repo.deleteFactory(f.factoryId, f.productId); + result.factoriesDeleted++; + } catch { + /* best-effort */ + } + } + + // 3) Terminal-job retention cascade — OPT-IN only (destroys history). + if (jobRetentionMs > 0) { + const jobCutoff = new Date(now - jobRetentionMs).toISOString(); + for (const job of await repo.listTerminalJobsOlderThan(jobCutoff, GC_TERMINAL_STAGES)) { + try { + for (const run of await repo.listRunsByJob(job.id)) { + await repo.deleteRun(run.id, job.id); + result.runsDeleted++; + } + for (const ev of await repo.listEvents(job.id)) { + await repo.deleteEvent(ev.id, job.id); + result.eventsDeleted++; + } + for (const art of await repo.listArtifactsByJob(job.id, job.productId)) { + try { + await deleteArtifactWithBlob(art.id, job.productId); + } catch { + await repo.deleteArtifact(art.id, job.productId); // pointer at least + } + result.artifactsDeleted++; + } + await repo.deleteLease(job.id).catch(() => {}); + await repo.deleteJob(job.id, job.productId); + result.jobsDeleted++; + } catch { + /* best-effort: leave this job for the next pass */ + } + } + } + + return result; +} diff --git a/services/platform-service/src/modules/fleet/reaper.test.ts b/services/platform-service/src/modules/fleet/reaper.test.ts index 8c3c6bea..0f7dffbe 100644 --- a/services/platform-service/src/modules/fleet/reaper.test.ts +++ b/services/platform-service/src/modules/fleet/reaper.test.ts @@ -6,21 +6,39 @@ import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; -const { reapSpy } = vi.hoisted(() => ({ reapSpy: vi.fn() })); +const { reapSpy, staleSpy, sweepSpy } = vi.hoisted(() => ({ + reapSpy: vi.fn(), + staleSpy: vi.fn(), + sweepSpy: vi.fn(), +})); vi.mock('./coordinator.js', () => ({ reapExpiredLeases: reapSpy, + reclaimStaleFactoryLeases: staleSpy, + sweepFleetGarbage: sweepSpy, })); import { startLeaseReaper, stopLeaseReaper, isLeaseReaperRunning } from './reaper.js'; const log = { info: vi.fn(), warn: vi.fn() }; +const emptyGc = { + leasesDeleted: 0, + factoriesDeleted: 0, + jobsDeleted: 0, + runsDeleted: 0, + eventsDeleted: 0, + artifactsDeleted: 0, +}; describe('fleet lease reaper', () => { beforeEach(() => { vi.useFakeTimers(); reapSpy.mockReset(); + staleSpy.mockReset(); + sweepSpy.mockReset(); reapSpy.mockResolvedValue({ reaped: 0, jobIds: [] }); + staleSpy.mockResolvedValue({ reaped: 0, jobIds: [] }); + sweepSpy.mockResolvedValue({ ...emptyGc }); log.info.mockReset(); log.warn.mockReset(); }); @@ -47,11 +65,30 @@ describe('fleet lease reaper', () => { expect(reapSpy).toHaveBeenCalledWith(expect.stringMatching(/^\d{4}-\d{2}-\d{2}T/)); }); - it('logs only when something was reclaimed', async () => { + it('also reclaims stale-factory leases each pass', async () => { + startLeaseReaper(log); + await vi.advanceTimersByTimeAsync(0); // flush the boot pass (stale runs after an await) + expect(staleSpy).toHaveBeenCalledTimes(1); + await vi.advanceTimersByTimeAsync(30_000); + expect(staleSpy).toHaveBeenCalledTimes(2); + }); + + it('logs a combined count when something was reclaimed', async () => { reapSpy.mockResolvedValueOnce({ reaped: 2, jobIds: ['fjob_a', 'fjob_b'] }); + staleSpy.mockResolvedValueOnce({ reaped: 1, jobIds: ['fjob_c'] }); startLeaseReaper(log); await vi.advanceTimersByTimeAsync(0); - expect(log.info).toHaveBeenCalledWith(expect.stringContaining('reclaimed 2')); + expect(log.info).toHaveBeenCalledWith(expect.stringContaining('reclaimed 3 job(s)')); + }); + + it('runs the GC sweep on boot then throttles it (hourly, not every tick)', async () => { + startLeaseReaper(log); + await vi.advanceTimersByTimeAsync(0); // flush the boot pass (sweep runs after awaits) + expect(sweepSpy).toHaveBeenCalledTimes(1); // boot sweep + await vi.advanceTimersByTimeAsync(30_000); // a fast reclaim tick — no sweep + expect(sweepSpy).toHaveBeenCalledTimes(1); + await vi.advanceTimersByTimeAsync(60 * 60 * 1000); // cross the hourly boundary + expect(sweepSpy).toHaveBeenCalledTimes(2); }); it('is idempotent: a second start does not double-schedule', async () => { @@ -75,7 +112,7 @@ describe('fleet lease reaper', () => { reapSpy.mockRejectedValueOnce(new Error('cosmos unavailable')); startLeaseReaper(log); await vi.advanceTimersByTimeAsync(0); // let the rejected boot pass settle - expect(log.warn).toHaveBeenCalledWith(expect.stringContaining('reap pass failed')); + expect(log.warn).toHaveBeenCalledWith(expect.stringContaining('reclaim pass failed')); // The interval is still scheduled and the next pass runs. reapSpy.mockResolvedValueOnce({ reaped: 0, jobIds: [] }); diff --git a/services/platform-service/src/modules/fleet/reaper.ts b/services/platform-service/src/modules/fleet/reaper.ts index 63d15637..6b72333b 100644 --- a/services/platform-service/src/modules/fleet/reaper.ts +++ b/services/platform-service/src/modules/fleet/reaper.ts @@ -1,47 +1,93 @@ /** - * Fleet lease reaper — background loop that reclaims jobs whose factory died. + * Fleet lease reaper — background loop that recovers and garbage-collects fleet + * state. Each pass does two recovery scans plus a periodic GC sweep: * - * `coordinator.reapExpiredLeases` implements the full §25 recovery (fence the - * zombie holder via a leaseEpoch bump, return the job to queued/blocked, preserve - * the checkpoint pointer), but it is a pure function that does nothing until - * something calls it. Without a scheduler a crashed/disconnected factory's job - * stays stuck in an active stage forever. This module is that scheduler: a single - * process-wide timer (leases are queried across all products), started once at - * server boot and stopped on graceful shutdown — mirroring the diagnostics - * trigger job pattern. + * • reapExpiredLeases — reclaim jobs whose lease TTL elapsed (§25 recovery: + * fence the zombie via a leaseEpoch bump, return the + * job to queued/blocked, preserve the checkpoint). + * • reclaimStaleFactoryLeases — reclaim held leases of factories that stopped + * heartbeating, long before the (much longer) TTL. + * • sweepFleetGarbage — (less frequently) delete finished leases and dead + * factory docs, and — only when retention is set — + * prune terminal jobs and their runs/events/artifacts. + * + * All of this is implemented as pure coordinator functions that do nothing until + * something calls them; this module is that scheduler. A single process-wide timer + * (the scans query across all products), started once at server boot and stopped + * on graceful shutdown, mirroring the diagnostics trigger-job pattern. */ -import { reapExpiredLeases } from './coordinator.js'; +import { reapExpiredLeases, reclaimStaleFactoryLeases, sweepFleetGarbage } from './coordinator.js'; -/** How often to scan for expired leases. Well under the runner's 90s stale +/** How often to scan for expired/stale leases. Well under the runner's 90s stale * threshold so a dead factory's job is reclaimed promptly. */ const REAP_INTERVAL_MS = 30_000; +/** Run the (heavier, deletion) GC sweep at most this often, regardless of the + * faster recovery cadence above. */ +const SWEEP_INTERVAL_MS = 60 * 60 * 1000; // hourly + +/** Terminal-job retention from env: `FLEET_GC_RETENTION_DAYS`. 0/unset ⇒ jobs and + * their history are NEVER deleted (only ephemeral leases/dead-factory docs are). */ +function jobRetentionMs(): number { + const days = Number(process.env.FLEET_GC_RETENTION_DAYS ?? '0'); + return Number.isFinite(days) && days > 0 ? days * 24 * 60 * 60 * 1000 : 0; +} + let reaperInterval: ReturnType | null = null; +let lastSweepMs = 0; interface ReaperLog { info: (...a: unknown[]) => void; warn?: (...a: unknown[]) => void; } -/** One reap pass. Never throws — a transient datastore error is logged and the - * next tick retries (recovery must not crash the service). */ +/** One reap pass: expiry reclaim + stale-factory reclaim, then a throttled GC + * sweep. Never throws — a transient datastore error is logged and the next tick + * retries (recovery must not crash the service). */ async function runReapPass(log?: ReaperLog): Promise { + const nowMs = Date.now(); try { - const { reaped, jobIds } = await reapExpiredLeases(new Date().toISOString()); - if (reaped > 0) { - log?.info(`[fleet-reaper] reclaimed ${reaped} expired-lease job(s): ${jobIds.join(', ')}`); + const expired = await reapExpiredLeases(new Date(nowMs).toISOString()); + const stale = await reclaimStaleFactoryLeases(nowMs); + const reclaimed = [...expired.jobIds, ...stale.jobIds]; + if (reclaimed.length > 0) { + log?.info( + `[fleet-reaper] reclaimed ${reclaimed.length} job(s) ` + + `(${expired.reaped} expired-lease, ${stale.reaped} stale-factory): ${reclaimed.join(', ')}` + ); } } catch (err) { const msg = err instanceof Error ? err.message : String(err); - (log?.warn ?? log?.info)?.(`[fleet-reaper] reap pass failed: ${msg}`); + (log?.warn ?? log?.info)?.(`[fleet-reaper] reclaim pass failed: ${msg}`); + } + + // Throttled GC sweep — deletions, so run far less often than recovery. + if (nowMs - lastSweepMs < SWEEP_INTERVAL_MS) return; + lastSweepMs = nowMs; + try { + const gc = await sweepFleetGarbage({ now: nowMs, jobRetentionMs: jobRetentionMs() }); + const total = + gc.leasesDeleted + gc.factoriesDeleted + gc.jobsDeleted + gc.runsDeleted + gc.eventsDeleted; + if (total > 0) { + log?.info( + `[fleet-reaper] gc: ${gc.leasesDeleted} lease(s), ${gc.factoriesDeleted} factory(ies), ` + + `${gc.jobsDeleted} job(s) + ${gc.runsDeleted} run(s)/${gc.eventsDeleted} event(s)/` + + `${gc.artifactsDeleted} artifact(s)` + ); + } + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + (log?.warn ?? log?.info)?.(`[fleet-reaper] gc sweep failed: ${msg}`); } } /** Start the lease reaper. Idempotent: a second call while running is a no-op. */ export function startLeaseReaper(log?: ReaperLog): void { if (reaperInterval) return; - log?.info(`[fleet-reaper] starting (every ${REAP_INTERVAL_MS / 1000}s)`); + // Let the first tick run the GC sweep too (don't skip it for an hour on boot). + lastSweepMs = 0; + log?.info(`[fleet-reaper] starting (reclaim every ${REAP_INTERVAL_MS / 1000}s)`); // Run once on boot so a restart promptly recovers jobs orphaned while down. void runReapPass(log); reaperInterval = globalThis.setInterval(() => { diff --git a/services/platform-service/src/modules/fleet/repository.ts b/services/platform-service/src/modules/fleet/repository.ts index fb6abe49..382d9e8d 100644 --- a/services/platform-service/src/modules/fleet/repository.ts +++ b/services/platform-service/src/modules/fleet/repository.ts @@ -190,6 +190,17 @@ export async function deleteJob(id: string, productId: string): Promise { await jobs().delete(id, productId); } +/** Terminal jobs (cross-partition) last updated before `cutoffIso` — retention GC + * candidates. `stages` is the terminal set the caller considers prunable. */ +export async function listTerminalJobsOlderThan( + cutoffIso: string, + stages: readonly FleetStage[] +): Promise { + return jobs().findMany({ + filter: { stage: { $in: stages as unknown as string[] }, updatedAt: { $lt: cutoffIso } }, + }); +} + /** * Record the last tracker Item status echoed for a job (§10 round-trip idempotency). * Deliberately does NOT bump `rev` — it is bookkeeping for the downstream echo, not a @@ -228,6 +239,10 @@ export async function listRunsByJob(jobId: string): Promise { return runs().findMany({ filter: { jobId }, sort: { attempt: 1 } }); } +export async function deleteRun(id: string, jobId: string): Promise { + await runs().delete(id, jobId); +} + /** All runs for a product (cross-partition) — used for cost aggregation. */ export async function listRunsByProduct(productId: string): Promise { return runs().findMany({ filter: { productId } }); @@ -263,6 +278,23 @@ export async function listExpiredLeases(nowIso: string): Promise { + return leases().findMany({ filter: { status: 'held' } }); +} + +/** Finished (expired/released) leases last touched before `cutoffIso` — GC candidates. + * These are pure coordination ephemera (not audit history), safe to delete. */ +export async function listFinishedLeasesOlderThan(cutoffIso: string): Promise { + return leases().findMany({ + filter: { status: { $in: ['expired', 'released'] }, updatedAt: { $lt: cutoffIso } }, + }); +} + +export async function deleteLease(jobId: string): Promise { + await leases().delete(jobId, jobId); +} + // ── Factories ───────────────────────────────────────────────────────────────── export async function getFactory( @@ -280,6 +312,16 @@ export async function listFactories(productId: string): Promise { + return factories().findMany({}); +} + +export async function deleteFactory(factoryId: string, productId: string): Promise { + await factories().delete(factoryId, productId); +} + // ── Profiles ──────────────────────────────────────────────────────────────── export async function createProfile(doc: FleetProfileDoc): Promise { @@ -326,6 +368,10 @@ export async function listEvents(jobId: string): Promise { return docs; } +export async function deleteEvent(id: string, jobId: string): Promise { + await events().delete(id, jobId); +} + // ── Artifacts (pointers only — bytes live in blob, never Cosmos; §13) ────────── export async function createArtifact(doc: FleetArtifactDoc): Promise {