From 8f51570da7196fac027b553ed554029223641ef1 Mon Sep 17 00:00:00 2001 From: saravanakumardb1 Date: Fri, 29 May 2026 20:20:30 -0700 Subject: [PATCH] =?UTF-8?q?feat(platform-service):=20fleet=20coordinator?= =?UTF-8?q?=20=E2=80=94=20claim/lease/fence/heartbeat/reaper=20(P2=20found?= =?UTF-8?q?ation)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The concurrency core (§4/§7/§8/§18/§25): - claimNextJob: priority+age selection over queued/dep-satisfied jobs whose caps are a subset of the factory's, then tryClaimJob does a rev CAS to flip to assigned + acquire the lease — exactly one contender wins, no double-assignment. - leases + fencing: acquire/reclaim bumps leaseEpoch; patchJobFenced/renew/release reject a call whose leaseEpoch < job.leaseEpoch (zombie worker can't overwrite). - heartbeat + isFactoryStale for factory liveness. - reapExpiredLeases: returns expired-lease jobs to queued/blocked, bumps the epoch (fencing the dead holder), preserves the checkpoint pointer (resume), marks the lease expired; idempotent. Documents why Cosmos TTL cannot do this. - submit: idempotent (dedup/supersede/409) + submit-time dependency cycle detection; deps gating (shipped, or testing when depsMode:soft). Tests drive the atomic-claim race, fencing, and reaper deterministically via the rev CAS (no real threads). 
--- .../src/modules/fleet/coordinator.test.ts | 240 +++++++++ .../src/modules/fleet/coordinator.ts | 503 ++++++++++++++++++ 2 files changed, 743 insertions(+) create mode 100644 services/platform-service/src/modules/fleet/coordinator.test.ts create mode 100644 services/platform-service/src/modules/fleet/coordinator.ts diff --git a/services/platform-service/src/modules/fleet/coordinator.test.ts b/services/platform-service/src/modules/fleet/coordinator.test.ts new file mode 100644 index 00000000..d24f0751 --- /dev/null +++ b/services/platform-service/src/modules/fleet/coordinator.test.ts @@ -0,0 +1,240 @@ +/** + * Fleet coordinator — the concurrency core. Memory provider; deterministic + * (race driven via the rev compare-and-swap, not real threads). + */ + +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { MemoryDatastoreProvider } from '@bytelyst/datastore'; +import { ConflictError, BadRequestError } from '@bytelyst/errors'; +import { _resetDatastoreProvider, setProvider } from '../../lib/datastore.js'; +import * as repo from './repository.js'; +import * as coord from './coordinator.js'; +import type { SubmitJobInput } from './types.js'; + +const PID = 'lysnrai'; + +function input(over: Partial = {}): SubmitJobInput { + return { + idempotencyKey: 'task-1', + bodyMd: '# do the thing', + priority: 'medium', + capabilities: [], + prefersEngine: [], + allowedScope: [], + deps: [], + kind: 'leaf', + ...over, + }; +} + +const factory = (over = {}) => ({ + productId: PID, + factoryId: 'fac_1', + capabilities: [] as string[], + leaseSeconds: 900, + ...over, +}); + +describe('fleet coordinator', () => { + beforeEach(() => setProvider(new MemoryDatastoreProvider())); + afterEach(() => _resetDatastoreProvider()); + + it('submit: new job is queued with no deps', async () => { + const { job, outcome } = await coord.submitJob(PID, input()); + expect(outcome).toBe('created'); + expect(job.stage).toBe('queued'); + expect(job.productId).toBe(PID); + 
}); + + // ── ATOMIC CLAIM RACE ── + it('atomic claim: two contenders on the same job version — exactly one wins', async () => { + const { job } = await coord.submitJob(PID, input()); + + // Both contenders see the SAME job version (rev). The CAS picks one winner. + const a = await coord.tryClaimJob(job, factory({ factoryId: 'fac_A' })); + const b = await coord.tryClaimJob(job, factory({ factoryId: 'fac_B' })); + + const oks = [a, b].filter(r => r.ok); + const conflicts = [a, b].filter(r => !r.ok); + expect(oks).toHaveLength(1); + expect(conflicts).toHaveLength(1); + expect(conflicts[0].ok === false && conflicts[0].reason).toBe('conflict'); + + // no double-assignment: the job is assigned exactly once, single run, single holder + const stored = await repo.getJob(job.id, PID); + expect(stored?.stage).toBe('assigned'); + expect(stored?.attempts).toBe(1); + expect(await repo.listRunsByJob(job.id)).toHaveLength(1); + const lease = await repo.getLease(job.id); + expect(lease?.leaseEpoch).toBe(1); + }); + + // ── PRIORITY + AGE SELECTION ── + it('claimNextJob returns highest-priority then oldest', async () => { + await coord.submitJob(PID, input({ idempotencyKey: 'low-old', priority: 'low' })); + await coord.submitJob(PID, input({ idempotencyKey: 'crit-new', priority: 'critical' })); + await coord.submitJob(PID, input({ idempotencyKey: 'med', priority: 'medium' })); + + const claim = await coord.claimNextJob(factory()); + expect(claim?.job.idempotencyKey).toBe('crit-new'); + }); + + it('claimNextJob respects capability subset', async () => { + await coord.submitJob(PID, input({ idempotencyKey: 'needs-mac', capabilities: ['os:mac'] })); + const noCaps = await coord.claimNextJob(factory({ capabilities: [] })); + expect(noCaps).toBeNull(); + const withCaps = await coord.claimNextJob(factory({ capabilities: ['os:mac', 'has:git'] })); + expect(withCaps?.job.idempotencyKey).toBe('needs-mac'); + }); + + // ── DEPS GATING ── + it('deps: a job with unmet deps is blocked and not 
claimable until the dep ships', async () => { + await coord.submitJob(PID, input({ idempotencyKey: 'A' })); + const { job: b } = await coord.submitJob(PID, input({ idempotencyKey: 'B', deps: ['A'] })); + expect(b.stage).toBe('blocked'); + + // first claim returns A (B is blocked) + const first = await coord.claimNextJob(factory()); + expect(first?.job.idempotencyKey).toBe('A'); + // B still not claimable + expect(await coord.claimNextJob(factory())).toBeNull(); + + // ship A → B becomes claimable + const a = await repo.getJob(first!.job.id, PID); + await repo.updateJob(a!.id, PID, { stage: 'shipped' }); + const second = await coord.claimNextJob(factory()); + expect(second?.job.idempotencyKey).toBe('B'); + }); + + it('deps-mode soft: dep satisfied when dependency is in testing', async () => { + await coord.submitJob(PID, input({ idempotencyKey: 'A' })); + const { job: c } = await coord.submitJob( + PID, + input({ idempotencyKey: 'C', deps: ['A'], depsMode: 'soft' }) + ); + expect(c.stage).toBe('blocked'); + const a = (await repo.findJobsByIdempotencyKey(PID, 'A'))[0]; + await repo.updateJob(a.id, PID, { stage: 'testing' }); + const unmet = await coord.unmetDeps((await repo.getJob(c.id, PID))!); + expect(unmet).toEqual([]); + }); + + it('cycle detection: a cyclic submit is rejected', async () => { + await coord.submitJob(PID, input({ idempotencyKey: 'B', deps: ['A'] })); + await expect( + coord.submitJob(PID, input({ idempotencyKey: 'A', deps: ['B'] })) + ).rejects.toBeInstanceOf(BadRequestError); + }); + + // ── FENCING ── + it('fencing: a stale leaseEpoch is rejected; the current epoch succeeds', async () => { + const { job } = await coord.submitJob(PID, input()); + const claim = await coord.claimNextJob(factory()); + expect(claim).not.toBeNull(); + const epoch = claim!.job.leaseEpoch; // 1 + + const stale = await coord.patchJobFenced(job.id, PID, { + leaseEpoch: epoch - 1, + stage: 'building', + }); + expect(stale.ok).toBe(false); + if (!stale.ok) 
expect(stale.reason).toBe('fenced'); + + const current = await coord.patchJobFenced(job.id, PID, { + leaseEpoch: epoch, + stage: 'building', + }); + expect(current.ok).toBe(true); + if (current.ok) expect(current.doc.stage).toBe('building'); + }); + + // ── REAPER ── + it('reaper: expired lease returns the job to queued, bumps epoch, preserves checkpoint; idempotent', async () => { + const { job } = await coord.submitJob(PID, input()); + const claim = await coord.claimNextJob(factory()); + const epoch0 = claim!.job.leaseEpoch; + + // worker checkpoints WIP, then dies; force the lease to be expired + await coord.patchJobFenced(job.id, PID, { + leaseEpoch: epoch0, + stage: 'building', + checkpoint: { wipBranch: 'aq/wip/x', wipBase: 'base1', wipCommit: 'c1' }, + }); + const lease = await repo.getLease(job.id); + await repo.revUpdateLease(job.id, lease!.rev, { expiresAt: '2000-01-01T00:00:00.000Z' }); + + const res = await coord.reapExpiredLeases(new Date().toISOString()); + expect(res.reaped).toBe(1); + + const reclaimed = await repo.getJob(job.id, PID); + expect(reclaimed?.stage).toBe('queued'); + expect(reclaimed?.leaseEpoch).toBe(epoch0 + 1); // fenced + expect(reclaimed?.checkpoint?.wipBranch).toBe('aq/wip/x'); // preserved + expect((await repo.getLease(job.id))?.status).toBe('expired'); + + // idempotent — running again reaps nothing + const again = await coord.reapExpiredLeases(new Date().toISOString()); + expect(again.reaped).toBe(0); + }); + + it('reaper: the zombie (old epoch) is fenced out after reclaim', async () => { + const { job } = await coord.submitJob(PID, input()); + const claim = await coord.claimNextJob(factory()); + const zombieEpoch = claim!.job.leaseEpoch; + await coord.patchJobFenced(job.id, PID, { leaseEpoch: zombieEpoch, stage: 'building' }); + const lease = await repo.getLease(job.id); + await repo.revUpdateLease(job.id, lease!.rev, { expiresAt: '2000-01-01T00:00:00.000Z' }); + await coord.reapExpiredLeases(new Date().toISOString()); + + // 
zombie tries to write with its now-stale epoch -> fenced + const zombie = await coord.patchJobFenced(job.id, PID, { + leaseEpoch: zombieEpoch, + stage: 'shipped', + }); + expect(zombie.ok).toBe(false); + if (!zombie.ok) expect(zombie.reason).toBe('fenced'); + }); + + // ── HEARTBEAT ── + it('heartbeat updates lastHeartbeatAt/health; staleness is detectable', async () => { + await coord.heartbeat({ + productId: PID, + factoryId: 'fac_1', + capabilities: ['os:mac'], + health: 'ok', + load: 1, + }); + const fac = await repo.getFactory('fac_1', PID); + expect(fac?.health).toBe('ok'); + expect(fac?.capabilities).toEqual(['os:mac']); + expect(coord.isFactoryStale(fac!, Date.now(), 60_000)).toBe(false); + const oldNow = new Date(fac!.lastHeartbeatAt).getTime() + 120_000; + expect(coord.isFactoryStale(fac!, oldNow, 60_000)).toBe(true); + }); + + // ── IDEMPOTENCY ── + it('idempotent submit: same key+content => 1 job', async () => { + const a = await coord.submitJob(PID, input({ idempotencyKey: 'k', bodyMd: 'same' })); + const b = await coord.submitJob(PID, input({ idempotencyKey: 'k', bodyMd: 'same' })); + expect(b.outcome).toBe('deduplicated'); + expect(b.job.id).toBe(a.job.id); + expect(await repo.findJobsByIdempotencyKey(PID, 'k')).toHaveLength(1); + }); + + it('idempotent submit: same key+changed content while queued => superseded', async () => { + const a = await coord.submitJob(PID, input({ idempotencyKey: 'k', bodyMd: 'v1' })); + const b = await coord.submitJob(PID, input({ idempotencyKey: 'k', bodyMd: 'v2' })); + expect(b.outcome).toBe('superseded'); + expect(b.job.id).toBe(a.job.id); + expect(b.job.bodyMd).toBe('v2'); + expect(await repo.findJobsByIdempotencyKey(PID, 'k')).toHaveLength(1); + }); + + it('idempotent submit: same key+changed content past queued => 409', async () => { + await coord.submitJob(PID, input({ idempotencyKey: 'k', bodyMd: 'v1' })); + await coord.claimNextJob(factory()); // job now assigned (past queued) + await expect( + 
coord.submitJob(PID, input({ idempotencyKey: 'k', bodyMd: 'v2' })) + ).rejects.toBeInstanceOf(ConflictError); + }); +}); diff --git a/services/platform-service/src/modules/fleet/coordinator.ts b/services/platform-service/src/modules/fleet/coordinator.ts new file mode 100644 index 00000000..1d259c54 --- /dev/null +++ b/services/platform-service/src/modules/fleet/coordinator.ts @@ -0,0 +1,503 @@ +/** + * Fleet coordinator — the concurrency core (Phase 2 §4/§7/§8/§18/§25). + * + * Responsibilities: + * - submitJob idempotent submit + submit-time dependency cycle detection + * - claimNextJob atomic, single-winner claim (priority+age, deps + capability + * gated) via a rev compare-and-swap, then lease acquisition + * - patchJobFenced fenced state transition (rejects a stale leaseEpoch) + * - renewLease / releaseLease + * - heartbeat factory liveness + * - reapExpiredLeases reclaim dead-worker jobs (bumps leaseEpoch to fence the + * zombie, returns the job to queued/blocked, preserves checkpoint) + * + * Concurrency model: `rev` is an optimistic-concurrency token on jobs + leases. + * Every contended mutation goes through repository.revUpdate*, which writes only + * if the stored `rev` still matches — so under contention EXACTLY ONE caller wins + * and losers get a `conflict` and retry. Maps to Cosmos `_etag` / If-Match in prod. + */ + +import { createHash } from 'node:crypto'; +import { BadRequestError, ConflictError } from '../../lib/errors.js'; +import * as repo from './repository.js'; +import { + ACTIVE_STAGES, + DEP_DONE_HARD, + DEP_DONE_SOFT, + PRIORITY_ORDER, + type FleetJobDoc, + type FleetLeaseDoc, + type FleetRunDoc, + type FleetStage, + type SubmitJobInput, +} from './types.js'; + +const CLAIM_MAX_RETRIES = 8; + +export function contentHash(bodyMd: string): string { + return createHash('sha256').update(bodyMd).digest('hex'); +} + +/** Every required capability token must be advertised by the factory. 
*/
+export function capabilitiesSubset(required: string[], available: string[]): boolean {
+  // An empty `required` list is satisfied by any factory (every() on [] is true).
+  const advertised = new Set(available);
+  return required.every(token => advertised.has(token));
+}
+
+// ── Dependency evaluation (§5) ────────────────────────────────────────────────
+
+/**
+ * Unmet dependency keys for a job given the current store state.
+ *
+ * A dependency key counts as met when at least one job stored under that
+ * idempotency key (same product) is in a stage listed in DEP_DONE_SOFT (when
+ * the job's depsMode is 'soft') or DEP_DONE_HARD (otherwise).
+ *
+ * @returns the unmet keys in `job.deps` order; duplicates in `deps` are kept.
+ */
+export async function unmetDeps(job: FleetJobDoc): Promise<string[]> {
+  const deps = job.deps ?? [];
+  if (deps.length === 0) return [];
+  const done = job.depsMode === 'soft' ? DEP_DONE_SOFT : DEP_DONE_HARD;
+
+  // The lookups are independent: issue one per distinct key, concurrently,
+  // instead of awaiting them one after another.
+  const distinctKeys = [...new Set(deps)];
+  const verdicts = await Promise.all(
+    distinctKeys.map(async depKey => {
+      const matches = await repo.findJobsByIdempotencyKey(job.productId, depKey);
+      return [depKey, matches.some(m => done.includes(m.stage))] as const;
+    })
+  );
+  const satisfiedByKey = new Map(verdicts);
+  return deps.filter(depKey => satisfiedByKey.get(depKey) !== true);
+}
+
+/** Resting stage implied by the job's dependencies: 'blocked' while any is unmet, else 'queued'. */
+async function stageForDeps(job: FleetJobDoc): Promise<{ stage: FleetStage; unmet: string[] }> {
+  const unmet = await unmetDeps(job);
+  return { stage: unmet.length > 0 ? 'blocked' : 'queued', unmet };
+}
+
+/**
+ * Submit-time cycle detection: would a job with `newKey` depending on `newDeps`
+ * create a cycle in the idempotency-key dependency graph (existing jobs + new)?
+ */
+export async function wouldCreateCycle(
+  productId: string,
+  newKey: string,
+  newDeps: string[]
+): Promise<boolean> {
+  if (newDeps.includes(newKey)) return true; // self-dependency
+  // Breadth-first walk over the deps of already-stored jobs, one level per pass.
+  const visited = new Set<string>();
+  let frontier = [...newDeps];
+  while (frontier.length > 0) {
+    const next: string[] = [];
+    for (const key of frontier) {
+      if (key === newKey) return true; // reached back to the new node
+      if (visited.has(key)) continue;
+      visited.add(key);
+      const matches = await repo.findJobsByIdempotencyKey(productId, key);
+      // `deps` is treated as possibly absent elsewhere in this module (unmetDeps),
+      // so do not spread it unguarded here.
+      for (const m of matches) next.push(...(m.deps ?? []));
+    }
+    frontier = next;
+  }
+  return false;
+}
+
+// ── Submit (idempotent) ───────────────────────────────────────────────────────
+
+export interface SubmitResult {
+  job: FleetJobDoc;
+  outcome: 'created' | 'deduplicated' | 'superseded';
+}
+
+/** Manifest fields copied verbatim from a submission onto the job's `manifestSnapshot`. */
+function manifestSnapshotOf(input: SubmitJobInput): FleetJobDoc['manifestSnapshot'] {
+  return {
+    priority: input.priority,
+    capabilities: input.capabilities,
+    engineClass: input.engineClass,
+    profile: input.profile,
+    prefersEngine: input.prefersEngine,
+    allowedScope: input.allowedScope,
+    deps: input.deps,
+    depsMode: input.depsMode,
+    budget: input.budget,
+    retry: input.retry,
+  };
+}
+
+/**
+ * Idempotent submit.
+ *
+ * - same key + identical content hash  => existing job, outcome 'deduplicated'
+ * - same key + different content, a job still queued/blocked
+ *                                      => that job is rewritten in place, 'superseded'
+ * - same key + different content, no queued/blocked job => ConflictError
+ * - unknown key                        => new job, 'created'
+ *
+ * The resting stage ('queued' or 'blocked') is derived from the dependencies.
+ *
+ * @throws BadRequestError when the submission would create a dependency cycle.
+ * @throws ConflictError when the key is held by a job that can no longer be
+ *   superseded, or when the supersede write loses the rev compare-and-swap.
+ */
+export async function submitJob(productId: string, input: SubmitJobInput): Promise<SubmitResult> {
+  const hash = contentHash(input.bodyMd);
+  const existingForKey = await repo.findJobsByIdempotencyKey(productId, input.idempotencyKey);
+
+  // Idempotency (§4): same key + identical content => return existing (no dup).
+  const identical = existingForKey.find(j => j.contentHash === hash);
+  if (identical) return { job: identical, outcome: 'deduplicated' };
+
+  if (existingForKey.length > 0) {
+    // same key, different content
+    const supersedable = existingForKey.find(j => j.stage === 'queued' || j.stage === 'blocked');
+    if (!supersedable) {
+      throw new ConflictError(
+        `idempotency-key '${input.idempotencyKey}' already in use by an in-flight/terminal job with different content`
+      );
+    }
+    // supersede the still-queued/blocked job in place
+    if (await wouldCreateCycle(productId, input.idempotencyKey, input.deps)) {
+      throw new BadRequestError('dependency cycle detected — submission rejected');
+    }
+    const refreshed = applyInputToJob(supersedable, input, hash);
+    const { stage } = await stageForDeps(refreshed);
+    // Guard the write with the rev we read: a factory may have claimed this job
+    // since. An unconditional update would reset an assigned job back to
+    // queued/blocked and clobber its leaseEpoch/attempts.
+    const written = await repo.revUpdateJob(supersedable.id, productId, supersedable.rev, {
+      ...stripIdentity(refreshed),
+      stage,
+    });
+    if (!written.ok) {
+      throw new ConflictError(
+        `idempotency-key '${input.idempotencyKey}' changed while being superseded — retry the submission`
+      );
+    }
+    await repo.appendEvent({
+      jobId: supersedable.id,
+      productId,
+      type: 'superseded',
+      data: { idempotencyKey: input.idempotencyKey },
+    });
+    return { job: written.doc, outcome: 'superseded' };
+  }
+
+  // brand-new job
+  if (await wouldCreateCycle(productId, input.idempotencyKey, input.deps)) {
+    throw new BadRequestError('dependency cycle detected — submission rejected');
+  }
+  const now = new Date().toISOString();
+  // NOTE(review): `crypto` here is the global; only `createHash` is imported from
+  // 'node:crypto' — confirm the target Node version exposes the global.
+  const id = `fjob_${crypto.randomUUID()}`;
+  const base: FleetJobDoc = {
+    id,
+    productId,
+    stage: 'queued',
+    idempotencyKey: input.idempotencyKey,
+    contentHash: hash,
+    bodyMd: input.bodyMd,
+    manifestSnapshot: manifestSnapshotOf(input),
+    priority: input.priority,
+    priorityOrder: PRIORITY_ORDER[input.priority],
+    capabilities: input.capabilities,
+    engineClass: input.engineClass,
+    profile: input.profile,
+    deps: input.deps,
+    depsMode: input.depsMode,
+    budget: input.budget,
+    retry: input.retry,
+    kind: input.kind,
+    parentId: input.parentId,
+    trackerItemId: input.trackerItemId,
+    attempts: 0,
+    leaseEpoch: 0,
+    rev: 0,
+    createdAt: now,
+    updatedAt: now,
+  };
+  const { stage } = await stageForDeps(base);
+  base.stage = stage;
+  const created = await repo.createJob(base);
+  await repo.appendEvent({
+    jobId: id,
+    productId,
+    type: 'submitted',
+    data: { stage, idempotencyKey: input.idempotencyKey },
+  });
+  return { job: created, outcome: 'created' };
+}
+
+/** Copy of `job` with the content/manifest fields replaced from `input`; other fields are kept. */
+function applyInputToJob(job: FleetJobDoc, input: SubmitJobInput, hash: string): FleetJobDoc {
+  return {
+    ...job,
+    contentHash: hash,
+    bodyMd: input.bodyMd,
+    priority: input.priority,
+    priorityOrder: PRIORITY_ORDER[input.priority],
+    capabilities: input.capabilities,
+    engineClass: input.engineClass,
+    profile: input.profile,
+    deps: input.deps,
+    depsMode: input.depsMode,
+    budget: input.budget,
+    retry: input.retry,
+    manifestSnapshot: manifestSnapshotOf(input),
+  };
+}
+
+/** Drops the fields a patch must never carry: id, productId, createdAt and rev. */
+function stripIdentity(job: FleetJobDoc): Partial<FleetJobDoc> {
+  const { id: _id, productId: _pid, createdAt: _c, rev: _r, ...rest } = job;
+  return rest;
+}
+
+// ── Atomic claim (§4/§7) ────────────────────────────────────────────────────
+
+export interface ClaimContext {
+  productId: string;
+  factoryId: string;
+  capabilities: string[];
+  leaseSeconds: number;
+}
+
+export interface ClaimResult {
+  job: FleetJobDoc;
+  lease: FleetLeaseDoc;
+  run: FleetRunDoc;
+}
+
+/** A job is eligible for a factory iff queued/blocked-with-met-deps + caps subset. */
+async function eligibleForClaim(job: FleetJobDoc, factoryCaps: string[]): Promise<boolean> {
+  // Cheap, synchronous rejections first; the dependency check hits the store.
+  if (job.stage !== 'queued' && job.stage !== 'blocked') return false;
+  if (!capabilitiesSubset(job.capabilities, factoryCaps)) return false;
+  const unmet = await unmetDeps(job);
+  return unmet.length === 0;
+}
+
+/**
+ * Try to claim ONE specific job version. The rev compare-and-swap is the single
+ * point at which exactly one contender wins; a stale `rev` => conflict (no write,
+ * no double-assignment).
+ *
+ * Once the job CAS is won, the lease MUST be brought in line with the new epoch.
+ * The lease write is itself a rev CAS and can lose to a concurrent lease writer,
+ * so it is retried against a fresh read rather than silently keeping the stale
+ * lease document.
+ *
+ * @throws ConflictError when the lease cannot be acquired after
+ *   LEASE_ACQUIRE_MAX_RETRIES attempts (the job has already been flipped to
+ *   `assigned` at that point, so the failure must not be hidden).
+ */
+export async function tryClaimJob(
+  job: FleetJobDoc,
+  ctx: ClaimContext
+): Promise<
+  // NOTE(review): the type arguments were lost in this copy of the patch; the
+  // failure arm is derived from revUpdateJob, whose failure is returned as-is.
+  | { ok: true; doc: ClaimResult }
+  | Extract<Awaited<ReturnType<typeof repo.revUpdateJob>>, { ok: false }>
+> {
+  const newEpoch = job.leaseEpoch + 1;
+  const attempt = job.attempts + 1;
+  const claimed = await repo.revUpdateJob(job.id, ctx.productId, job.rev, {
+    stage: 'assigned',
+    leaseEpoch: newEpoch,
+    attempts: attempt,
+  });
+  if (!claimed.ok) return claimed;
+
+  const now = Date.now();
+  const expiresAt = new Date(now + ctx.leaseSeconds * 1000).toISOString();
+  const nowIso = new Date(now).toISOString();
+
+  const lease = await acquireLeaseForClaim(job.id, ctx, newEpoch, expiresAt, nowIso);
+
+  const run = await repo.createRun({
+    id: `${job.id}:run:${attempt}`,
+    productId: ctx.productId,
+    jobId: job.id,
+    attempt,
+    factoryId: ctx.factoryId,
+    engine: job.engineClass ?? 'unknown',
+    startedAt: nowIso,
+    insights: {},
+  });
+
+  await repo.appendEvent({
+    jobId: job.id,
+    productId: ctx.productId,
+    type: 'assigned',
+    actor: ctx.factoryId,
+    data: { leaseEpoch: newEpoch, attempt },
+  });
+
+  return { ok: true, doc: { job: claimed.doc, lease, run } };
+}
+
+const LEASE_ACQUIRE_MAX_RETRIES = 4;
+
+/** Create the job's lease, or take over the existing one, for the claim that just won. */
+async function acquireLeaseForClaim(
+  jobId: string,
+  ctx: ClaimContext,
+  leaseEpoch: number,
+  expiresAt: string,
+  nowIso: string
+): Promise<FleetLeaseDoc> {
+  for (let i = 0; i < LEASE_ACQUIRE_MAX_RETRIES; i++) {
+    const current = await repo.getLease(jobId);
+    if (!current) {
+      return repo.createLease({
+        id: jobId,
+        productId: ctx.productId,
+        jobId,
+        holderFactoryId: ctx.factoryId,
+        expiresAt,
+        leaseEpoch,
+        renewals: 0,
+        status: 'held',
+        rev: 0,
+        updatedAt: nowIso,
+      });
+    }
+    const updated = await repo.revUpdateLease(jobId, current.rev, {
+      holderFactoryId: ctx.factoryId,
+      expiresAt,
+      leaseEpoch,
+      renewals: 0,
+      status: 'held',
+    });
+    if (updated.ok) return updated.doc;
+    // Lost the lease CAS to another lease writer: re-read and try again.
+  }
+  throw new ConflictError(`could not acquire the lease for job '${jobId}' at epoch ${leaseEpoch}`);
+}
+
+/** Select the highest-priority, oldest eligible job and atomically claim it.
*/
+export async function claimNextJob(ctx: ClaimContext): Promise<ClaimResult | null> {
+  for (let round = 0; round < CLAIM_MAX_RETRIES; round++) {
+    const candidates = await repo.listJobs({ productId: ctx.productId });
+    // Rank first (priority, then age), then take the first eligible job. This
+    // picks the same job as filtering-then-sorting but stops evaluating
+    // dependencies as soon as a winner is found.
+    const ranked = [...candidates].sort(
+      (a, b) => a.priorityOrder - b.priorityOrder || a.createdAt.localeCompare(b.createdAt)
+    );
+    let best: FleetJobDoc | undefined;
+    for (const job of ranked) {
+      if (await eligibleForClaim(job, ctx.capabilities)) {
+        best = job;
+        break;
+      }
+    }
+    if (!best) return null;
+    const result = await tryClaimJob(best, ctx);
+    if (result.ok) return result.doc;
+    // conflict / not_found: another writer touched this version — re-select and retry
+  }
+  return null;
+}
+
+// ── Fenced transitions + leases (§4/§8) ──────────────────────────────────────
+
+export type FenceResult<T> =
+  | { ok: true; doc: T }
+  | { ok: false; reason: 'not_found' | 'fenced' | 'conflict' };
+
+function fenced(job: FleetJobDoc, leaseEpoch: number): boolean {
+  // a call older than the current epoch is a stale/zombie worker — reject it
+  return leaseEpoch < job.leaseEpoch;
+}
+
+export interface PatchJobInputInternal {
+  leaseEpoch: number;
+  stage?: FleetStage;
+  checkpoint?: FleetJobDoc['checkpoint'];
+  blockedReason?: string;
+}
+
+/**
+ * Fenced state transition: applies `stage` / `checkpoint` / `blockedReason` to the
+ * job unless the caller's `leaseEpoch` is older than the job's.
+ *
+ * @returns `fenced` for a stale epoch, `not_found` when the job is missing, and
+ *   `conflict` when the rev compare-and-swap loses to a concurrent writer.
+ */
+export async function patchJobFenced(
+  jobId: string,
+  productId: string,
+  patch: PatchJobInputInternal
+): Promise<FenceResult<FleetJobDoc>> {
+  const job = await repo.getJob(jobId, productId);
+  if (!job) return { ok: false, reason: 'not_found' };
+  if (fenced(job, patch.leaseEpoch)) return { ok: false, reason: 'fenced' };
+
+  const updates: Partial<FleetJobDoc> = {};
+  if (patch.stage) updates.stage = patch.stage;
+  if (patch.checkpoint) updates.checkpoint = patch.checkpoint;
+  if (patch.blockedReason !== undefined) updates.blockedReason = patch.blockedReason;
+
+  // The write is conditional on the rev that the fence check was made against.
+  const res = await repo.revUpdateJob(jobId, productId, job.rev, updates);
+  if (!res.ok) return { ok: false, reason: res.reason === 'not_found' ? 'not_found' : 'conflict' };
+  await repo.appendEvent({
+    jobId,
+    productId,
+    type: 'transition',
+    data: { stage: patch.stage ?? job.stage, leaseEpoch: patch.leaseEpoch },
+  });
+  return { ok: true, doc: res.doc };
+}
+
+/** Extend the job's lease by `leaseSeconds` from now; rejected when `leaseEpoch` is stale. */
+export async function renewLease(
+  jobId: string,
+  productId: string,
+  leaseEpoch: number,
+  leaseSeconds: number
+): Promise<FenceResult<FleetLeaseDoc>> {
+  const job = await repo.getJob(jobId, productId);
+  if (!job) return { ok: false, reason: 'not_found' };
+  if (fenced(job, leaseEpoch)) return { ok: false, reason: 'fenced' };
+  const lease = await repo.getLease(jobId);
+  if (!lease) return { ok: false, reason: 'not_found' };
+  const expiresAt = new Date(Date.now() + leaseSeconds * 1000).toISOString();
+  const res = await repo.revUpdateLease(jobId, lease.rev, {
+    expiresAt,
+    renewals: lease.renewals + 1,
+    status: 'held',
+  });
+  if (!res.ok) return { ok: false, reason: 'conflict' };
+  await repo.appendEvent({ jobId, productId, type: 'lease_renewed', data: { leaseEpoch } });
+  return { ok: true, doc: res.doc };
+}
+
+/**
+ * Release the job's lease, optionally moving the job to `stage`.
+ *
+ * The stage change is written FIRST and its compare-and-swap result is checked.
+ * If it loses, nothing has been released and the caller gets `conflict` /
+ * `not_found` and can retry. Releasing the lease first could report success
+ * while the job kept its old stage with no held lease.
+ */
+export async function releaseLease(
+  jobId: string,
+  productId: string,
+  leaseEpoch: number,
+  stage?: FleetStage
+): Promise<FenceResult<FleetLeaseDoc>> {
+  const job = await repo.getJob(jobId, productId);
+  if (!job) return { ok: false, reason: 'not_found' };
+  if (fenced(job, leaseEpoch)) return { ok: false, reason: 'fenced' };
+  const lease = await repo.getLease(jobId);
+  if (!lease) return { ok: false, reason: 'not_found' };
+  if (stage) {
+    const moved = await repo.revUpdateJob(jobId, productId, job.rev, { stage });
+    if (!moved.ok) {
+      return { ok: false, reason: moved.reason === 'not_found' ? 'not_found' : 'conflict' };
+    }
+  }
+  const res = await repo.revUpdateLease(jobId, lease.rev, { status: 'released' });
+  if (!res.ok) return { ok: false, reason: 'conflict' };
+  await repo.appendEvent({ jobId, productId, type: 'lease_released', data: { leaseEpoch, stage } });
+  return { ok: true, doc: res.doc };
+}
+
+// ── Heartbeat (§8) ────────────────────────────────────────────────────────────
+
+export interface HeartbeatContext {
+  productId: string;
+  factoryId: string;
+  // NOTE(review): the type arguments were lost in this copy of the patch;
+  // `Record<string, unknown>` is assumed — confirm against the factory doc type.
+  descriptor?: Record<string, unknown>;
+  capabilities?: string[];
+  health?: FleetFactoryHealthInput;
+  load?: number;
+  seatLimit?: number;
+}
+type FleetFactoryHealthInput = 'ok' | 'degraded' | 'down';
+
+/**
+ * Record a factory heartbeat: upserts the factory document with
+ * `lastHeartbeatAt` set to now. Fields omitted from `ctx` keep their stored
+ * value, or fall back to a default ({}, [], 'ok', 0, 1) for a new factory.
+ */
+export async function heartbeat(ctx: HeartbeatContext): Promise<FenceResult<true>> {
+  const now = new Date().toISOString();
+  const existing = await repo.getFactory(ctx.factoryId, ctx.productId);
+  await repo.upsertFactory({
+    id: ctx.factoryId,
+    productId: ctx.productId,
+    factoryId: ctx.factoryId,
+    descriptor: ctx.descriptor ?? existing?.descriptor ?? {},
+    capabilities: ctx.capabilities ?? existing?.capabilities ?? [],
+    health: ctx.health ?? existing?.health ?? 'ok',
+    load: ctx.load ?? existing?.load ?? 0,
+    seatLimit: ctx.seatLimit ?? existing?.seatLimit ?? 1,
+    lastHeartbeatAt: now,
+  });
+  return { ok: true, doc: true };
+}
+
+/**
+ * A factory is stale if its last heartbeat is older than `maxAgeMs`.
+ * An unparseable `lastHeartbeatAt` is treated as stale: liveness cannot be shown,
+ * and a NaN comparison would otherwise report such a factory as fresh forever.
+ */
+export function isFactoryStale(
+  factory: { lastHeartbeatAt: string },
+  nowMs: number,
+  maxAgeMs: number
+): boolean {
+  const lastBeatMs = new Date(factory.lastHeartbeatAt).getTime();
+  if (Number.isNaN(lastBeatMs)) return true;
+  return nowMs - lastBeatMs > maxAgeMs;
+}
+
+// ── Reaper (§25.3) ────────────────────────────────────────────────────────────
+
+export interface ReapResult {
+  reaped: number;
+  jobIds: string[];
+}
+
+/**
+ * Reclaim jobs whose lease has expired. Cosmos TTL would only DELETE the lease
+ * doc — it cannot return the job to `queued`, bump the fencing epoch, or preserve
+ * the checkpoint — so the reaper (not TTL) owns recovery. Idempotent: a reaped
+ * lease becomes `expired` and is no longer selected by listExpiredLeases.
+ *
+ * Per expired lease: skip when the job is missing or its stage is not in
+ * ACTIVE_STAGES; otherwise bump the job's leaseEpoch, return it to
+ * 'queued' / 'blocked' (by unmet deps) through a rev compare-and-swap, then mark
+ * the lease `expired`.
+ *
+ * @param nowIso cut-off passed to repo.listExpiredLeases.
+ * @returns the ids of the jobs reclaimed by this pass.
+ */
+export async function reapExpiredLeases(nowIso: string): Promise<ReapResult> {
+  const expired = await repo.listExpiredLeases(nowIso);
+  const jobIds: string[] = [];
+  for (const listed of expired) {
+    // The listing is a snapshot. If the lease was written since (renewed,
+    // released, re-claimed), leave it for the next pass instead of taking the
+    // job away on stale information.
+    const lease = await repo.getLease(listed.jobId);
+    if (!lease || lease.rev !== listed.rev) continue;
+
+    const job = await repo.getJob(lease.jobId, lease.productId);
+    if (!job) continue;
+    if (!ACTIVE_STAGES.includes(job.stage)) continue; // already resting; nothing to reclaim
+    const newEpoch = job.leaseEpoch + 1; // fence the zombie holder
+    const unmet = await unmetDeps(job);
+    const stage: FleetStage = unmet.length > 0 ? 'blocked' : 'queued';
+    // checkpoint pointer is intentionally preserved on the job (resume-friendly)
+    const jobRes = await repo.revUpdateJob(job.id, lease.productId, job.rev, {
+      stage,
+      leaseEpoch: newEpoch,
+      // NOTE(review): whether an `undefined` value clears a stored blockedReason
+      // depends on the repository's merge semantics — confirm.
+      blockedReason: unmet.length > 0 ? `waiting on: ${unmet.join(', ')}` : undefined,
+    });
+    if (!jobRes.ok) continue;
+    await markLeaseExpired(lease, newEpoch);
+    await repo.appendEvent({
+      jobId: job.id,
+      productId: lease.productId,
+      type: 'lease_expired',
+      data: { leaseEpoch: newEpoch, returnedTo: stage },
+    });
+    jobIds.push(job.id);
+  }
+  return { reaped: jobIds.length, jobIds };
+}
+
+const REAP_LEASE_MAX_RETRIES = 3;
+
+/**
+ * Mark a reclaimed job's lease `expired` at `fencedEpoch`, retrying the rev
+ * compare-and-swap against a fresh read when it loses. A lease already at a
+ * later epoch was taken by a claim made after the fence and is left untouched.
+ */
+async function markLeaseExpired(lease: FleetLeaseDoc, fencedEpoch: number): Promise<void> {
+  let current: FleetLeaseDoc | null | undefined = lease;
+  for (let i = 0; i < REAP_LEASE_MAX_RETRIES && current; i++) {
+    if (current.leaseEpoch > fencedEpoch) return;
+    const res = await repo.revUpdateLease(current.jobId, current.rev, {
+      status: 'expired',
+      leaseEpoch: fencedEpoch,
+      holderFactoryId: undefined,
+    });
+    if (res.ok) return;
+    current = await repo.getLease(current.jobId);
+  }
+}