feat(platform-service): fleet coordinator — claim/lease/fence/heartbeat/reaper (P2 foundation)

The concurrency core (§4/§7/§8/§18/§25):
- claimNextJob: priority+age selection over queued/dep-satisfied jobs whose caps
  are a subset of the factory's, then tryClaimJob does a rev CAS to flip to
  assigned + acquire the lease — exactly one contender wins, no double-assignment.
- leases + fencing: acquire/reclaim bumps leaseEpoch; patchJobFenced/renew/release
  reject a call whose leaseEpoch < job.leaseEpoch (zombie worker can't overwrite).
- heartbeat + isFactoryStale for factory liveness.
- reapExpiredLeases: returns expired-lease jobs to queued/blocked, bumps the epoch
  (fencing the dead holder), preserves the checkpoint pointer (resume), marks the
  lease expired; idempotent. Documents why Cosmos TTL cannot do this.
- submit: idempotent (dedup/supersede/409) + submit-time dependency cycle
  detection; deps gating (shipped, or testing when depsMode:soft).

Tests drive the atomic-claim race, fencing, and reaper deterministically via the
rev CAS (no real threads).
This commit is contained in:
saravanakumardb1 2026-05-29 20:20:30 -07:00
parent fada354df8
commit 8f51570da7
2 changed files with 743 additions and 0 deletions

View File

@ -0,0 +1,240 @@
/**
* Fleet coordinator the concurrency core. Memory provider; deterministic
* (race driven via the rev compare-and-swap, not real threads).
*/
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { MemoryDatastoreProvider } from '@bytelyst/datastore';
import { ConflictError, BadRequestError } from '@bytelyst/errors';
import { _resetDatastoreProvider, setProvider } from '../../lib/datastore.js';
import * as repo from './repository.js';
import * as coord from './coordinator.js';
import type { SubmitJobInput } from './types.js';
const PID = 'lysnrai';
function input(over: Partial<SubmitJobInput> = {}): SubmitJobInput {
return {
idempotencyKey: 'task-1',
bodyMd: '# do the thing',
priority: 'medium',
capabilities: [],
prefersEngine: [],
allowedScope: [],
deps: [],
kind: 'leaf',
...over,
};
}
const factory = (over = {}) => ({
productId: PID,
factoryId: 'fac_1',
capabilities: [] as string[],
leaseSeconds: 900,
...over,
});
describe('fleet coordinator', () => {
beforeEach(() => setProvider(new MemoryDatastoreProvider()));
afterEach(() => _resetDatastoreProvider());
it('submit: new job is queued with no deps', async () => {
const { job, outcome } = await coord.submitJob(PID, input());
expect(outcome).toBe('created');
expect(job.stage).toBe('queued');
expect(job.productId).toBe(PID);
});
// ── ATOMIC CLAIM RACE ──
it('atomic claim: two contenders on the same job version — exactly one wins', async () => {
const { job } = await coord.submitJob(PID, input());
// Both contenders see the SAME job version (rev). The CAS picks one winner.
const a = await coord.tryClaimJob(job, factory({ factoryId: 'fac_A' }));
const b = await coord.tryClaimJob(job, factory({ factoryId: 'fac_B' }));
const oks = [a, b].filter(r => r.ok);
const conflicts = [a, b].filter(r => !r.ok);
expect(oks).toHaveLength(1);
expect(conflicts).toHaveLength(1);
expect(conflicts[0].ok === false && conflicts[0].reason).toBe('conflict');
// no double-assignment: the job is assigned exactly once, single run, single holder
const stored = await repo.getJob(job.id, PID);
expect(stored?.stage).toBe('assigned');
expect(stored?.attempts).toBe(1);
expect(await repo.listRunsByJob(job.id)).toHaveLength(1);
const lease = await repo.getLease(job.id);
expect(lease?.leaseEpoch).toBe(1);
});
// ── PRIORITY + AGE SELECTION ──
it('claimNextJob returns highest-priority then oldest', async () => {
await coord.submitJob(PID, input({ idempotencyKey: 'low-old', priority: 'low' }));
await coord.submitJob(PID, input({ idempotencyKey: 'crit-new', priority: 'critical' }));
await coord.submitJob(PID, input({ idempotencyKey: 'med', priority: 'medium' }));
const claim = await coord.claimNextJob(factory());
expect(claim?.job.idempotencyKey).toBe('crit-new');
});
it('claimNextJob respects capability subset', async () => {
await coord.submitJob(PID, input({ idempotencyKey: 'needs-mac', capabilities: ['os:mac'] }));
const noCaps = await coord.claimNextJob(factory({ capabilities: [] }));
expect(noCaps).toBeNull();
const withCaps = await coord.claimNextJob(factory({ capabilities: ['os:mac', 'has:git'] }));
expect(withCaps?.job.idempotencyKey).toBe('needs-mac');
});
// ── DEPS GATING ──
it('deps: a job with unmet deps is blocked and not claimable until the dep ships', async () => {
await coord.submitJob(PID, input({ idempotencyKey: 'A' }));
const { job: b } = await coord.submitJob(PID, input({ idempotencyKey: 'B', deps: ['A'] }));
expect(b.stage).toBe('blocked');
// first claim returns A (B is blocked)
const first = await coord.claimNextJob(factory());
expect(first?.job.idempotencyKey).toBe('A');
// B still not claimable
expect(await coord.claimNextJob(factory())).toBeNull();
// ship A → B becomes claimable
const a = await repo.getJob(first!.job.id, PID);
await repo.updateJob(a!.id, PID, { stage: 'shipped' });
const second = await coord.claimNextJob(factory());
expect(second?.job.idempotencyKey).toBe('B');
});
it('deps-mode soft: dep satisfied when dependency is in testing', async () => {
await coord.submitJob(PID, input({ idempotencyKey: 'A' }));
const { job: c } = await coord.submitJob(
PID,
input({ idempotencyKey: 'C', deps: ['A'], depsMode: 'soft' })
);
expect(c.stage).toBe('blocked');
const a = (await repo.findJobsByIdempotencyKey(PID, 'A'))[0];
await repo.updateJob(a.id, PID, { stage: 'testing' });
const unmet = await coord.unmetDeps((await repo.getJob(c.id, PID))!);
expect(unmet).toEqual([]);
});
it('cycle detection: a cyclic submit is rejected', async () => {
await coord.submitJob(PID, input({ idempotencyKey: 'B', deps: ['A'] }));
await expect(
coord.submitJob(PID, input({ idempotencyKey: 'A', deps: ['B'] }))
).rejects.toBeInstanceOf(BadRequestError);
});
// ── FENCING ──
it('fencing: a stale leaseEpoch is rejected; the current epoch succeeds', async () => {
const { job } = await coord.submitJob(PID, input());
const claim = await coord.claimNextJob(factory());
expect(claim).not.toBeNull();
const epoch = claim!.job.leaseEpoch; // 1
const stale = await coord.patchJobFenced(job.id, PID, {
leaseEpoch: epoch - 1,
stage: 'building',
});
expect(stale.ok).toBe(false);
if (!stale.ok) expect(stale.reason).toBe('fenced');
const current = await coord.patchJobFenced(job.id, PID, {
leaseEpoch: epoch,
stage: 'building',
});
expect(current.ok).toBe(true);
if (current.ok) expect(current.doc.stage).toBe('building');
});
// ── REAPER ──
it('reaper: expired lease returns the job to queued, bumps epoch, preserves checkpoint; idempotent', async () => {
const { job } = await coord.submitJob(PID, input());
const claim = await coord.claimNextJob(factory());
const epoch0 = claim!.job.leaseEpoch;
// worker checkpoints WIP, then dies; force the lease to be expired
await coord.patchJobFenced(job.id, PID, {
leaseEpoch: epoch0,
stage: 'building',
checkpoint: { wipBranch: 'aq/wip/x', wipBase: 'base1', wipCommit: 'c1' },
});
const lease = await repo.getLease(job.id);
await repo.revUpdateLease(job.id, lease!.rev, { expiresAt: '2000-01-01T00:00:00.000Z' });
const res = await coord.reapExpiredLeases(new Date().toISOString());
expect(res.reaped).toBe(1);
const reclaimed = await repo.getJob(job.id, PID);
expect(reclaimed?.stage).toBe('queued');
expect(reclaimed?.leaseEpoch).toBe(epoch0 + 1); // fenced
expect(reclaimed?.checkpoint?.wipBranch).toBe('aq/wip/x'); // preserved
expect((await repo.getLease(job.id))?.status).toBe('expired');
// idempotent — running again reaps nothing
const again = await coord.reapExpiredLeases(new Date().toISOString());
expect(again.reaped).toBe(0);
});
it('reaper: the zombie (old epoch) is fenced out after reclaim', async () => {
const { job } = await coord.submitJob(PID, input());
const claim = await coord.claimNextJob(factory());
const zombieEpoch = claim!.job.leaseEpoch;
await coord.patchJobFenced(job.id, PID, { leaseEpoch: zombieEpoch, stage: 'building' });
const lease = await repo.getLease(job.id);
await repo.revUpdateLease(job.id, lease!.rev, { expiresAt: '2000-01-01T00:00:00.000Z' });
await coord.reapExpiredLeases(new Date().toISOString());
// zombie tries to write with its now-stale epoch -> fenced
const zombie = await coord.patchJobFenced(job.id, PID, {
leaseEpoch: zombieEpoch,
stage: 'shipped',
});
expect(zombie.ok).toBe(false);
if (!zombie.ok) expect(zombie.reason).toBe('fenced');
});
// ── HEARTBEAT ──
it('heartbeat updates lastHeartbeatAt/health; staleness is detectable', async () => {
await coord.heartbeat({
productId: PID,
factoryId: 'fac_1',
capabilities: ['os:mac'],
health: 'ok',
load: 1,
});
const fac = await repo.getFactory('fac_1', PID);
expect(fac?.health).toBe('ok');
expect(fac?.capabilities).toEqual(['os:mac']);
expect(coord.isFactoryStale(fac!, Date.now(), 60_000)).toBe(false);
const oldNow = new Date(fac!.lastHeartbeatAt).getTime() + 120_000;
expect(coord.isFactoryStale(fac!, oldNow, 60_000)).toBe(true);
});
// ── IDEMPOTENCY ──
it('idempotent submit: same key+content => 1 job', async () => {
const a = await coord.submitJob(PID, input({ idempotencyKey: 'k', bodyMd: 'same' }));
const b = await coord.submitJob(PID, input({ idempotencyKey: 'k', bodyMd: 'same' }));
expect(b.outcome).toBe('deduplicated');
expect(b.job.id).toBe(a.job.id);
expect(await repo.findJobsByIdempotencyKey(PID, 'k')).toHaveLength(1);
});
it('idempotent submit: same key+changed content while queued => superseded', async () => {
const a = await coord.submitJob(PID, input({ idempotencyKey: 'k', bodyMd: 'v1' }));
const b = await coord.submitJob(PID, input({ idempotencyKey: 'k', bodyMd: 'v2' }));
expect(b.outcome).toBe('superseded');
expect(b.job.id).toBe(a.job.id);
expect(b.job.bodyMd).toBe('v2');
expect(await repo.findJobsByIdempotencyKey(PID, 'k')).toHaveLength(1);
});
it('idempotent submit: same key+changed content past queued => 409', async () => {
await coord.submitJob(PID, input({ idempotencyKey: 'k', bodyMd: 'v1' }));
await coord.claimNextJob(factory()); // job now assigned (past queued)
await expect(
coord.submitJob(PID, input({ idempotencyKey: 'k', bodyMd: 'v2' }))
).rejects.toBeInstanceOf(ConflictError);
});
});

View File

@ -0,0 +1,503 @@
/**
* Fleet coordinator the concurrency core (Phase 2 §4/§7/§8/§18/§25).
*
* Responsibilities:
* - submitJob idempotent submit + submit-time dependency cycle detection
* - claimNextJob atomic, single-winner claim (priority+age, deps + capability
* gated) via a rev compare-and-swap, then lease acquisition
* - patchJobFenced fenced state transition (rejects a stale leaseEpoch)
* - renewLease / releaseLease
* - heartbeat factory liveness
* - reapExpiredLeases reclaim dead-worker jobs (bumps leaseEpoch to fence the
* zombie, returns the job to queued/blocked, preserves checkpoint)
*
* Concurrency model: `rev` is an optimistic-concurrency token on jobs + leases.
* Every contended mutation goes through repository.revUpdate*, which writes only
* if the stored `rev` still matches so under contention EXACTLY ONE caller wins
* and losers get a `conflict` and retry. Maps to Cosmos `_etag` / If-Match in prod.
*/
import { createHash } from 'node:crypto';
import { BadRequestError, ConflictError } from '../../lib/errors.js';
import * as repo from './repository.js';
import {
ACTIVE_STAGES,
DEP_DONE_HARD,
DEP_DONE_SOFT,
PRIORITY_ORDER,
type FleetJobDoc,
type FleetLeaseDoc,
type FleetRunDoc,
type FleetStage,
type SubmitJobInput,
} from './types.js';
const CLAIM_MAX_RETRIES = 8;
export function contentHash(bodyMd: string): string {
return createHash('sha256').update(bodyMd).digest('hex');
}
/** Every required capability token must be advertised by the factory. */
export function capabilitiesSubset(required: string[], available: string[]): boolean {
const set = new Set(available);
return required.every(token => set.has(token));
}
// ── Dependency evaluation (§5) ────────────────────────────────────────────────
/** Unmet dependency keys for a job given the current store state. */
export async function unmetDeps(job: FleetJobDoc): Promise<string[]> {
if (!job.deps || job.deps.length === 0) return [];
const done = job.depsMode === 'soft' ? DEP_DONE_SOFT : DEP_DONE_HARD;
const unmet: string[] = [];
for (const depKey of job.deps) {
const matches = await repo.findJobsByIdempotencyKey(job.productId, depKey);
const satisfied = matches.some(m => done.includes(m.stage));
if (!satisfied) unmet.push(depKey);
}
return unmet;
}
async function stageForDeps(job: FleetJobDoc): Promise<{ stage: FleetStage; unmet: string[] }> {
const unmet = await unmetDeps(job);
return { stage: unmet.length > 0 ? 'blocked' : 'queued', unmet };
}
/**
* Submit-time cycle detection: would a job with `newKey` depending on `newDeps`
* create a cycle in the idempotency-key dependency graph (existing jobs + new)?
*/
export async function wouldCreateCycle(
productId: string,
newKey: string,
newDeps: string[]
): Promise<boolean> {
if (newDeps.includes(newKey)) return true; // self-dependency
const visited = new Set<string>();
let frontier = [...newDeps];
while (frontier.length > 0) {
const next: string[] = [];
for (const key of frontier) {
if (key === newKey) return true; // reached back to the new node
if (visited.has(key)) continue;
visited.add(key);
const matches = await repo.findJobsByIdempotencyKey(productId, key);
for (const m of matches) next.push(...m.deps);
}
frontier = next;
}
return false;
}
// ── Submit (idempotent) ───────────────────────────────────────────────────────
export interface SubmitResult {
job: FleetJobDoc;
outcome: 'created' | 'deduplicated' | 'superseded';
}
export async function submitJob(productId: string, input: SubmitJobInput): Promise<SubmitResult> {
const hash = contentHash(input.bodyMd);
const existingForKey = await repo.findJobsByIdempotencyKey(productId, input.idempotencyKey);
// Idempotency (§4): same key + identical content => return existing (no dup).
const identical = existingForKey.find(j => j.contentHash === hash);
if (identical) return { job: identical, outcome: 'deduplicated' };
if (existingForKey.length > 0) {
// same key, different content
const supersedable = existingForKey.find(j => j.stage === 'queued' || j.stage === 'blocked');
if (!supersedable) {
throw new ConflictError(
`idempotency-key '${input.idempotencyKey}' already in use by an in-flight/terminal job with different content`
);
}
// supersede the still-queued/blocked job in place
if (await wouldCreateCycle(productId, input.idempotencyKey, input.deps)) {
throw new BadRequestError('dependency cycle detected — submission rejected');
}
const refreshed = applyInputToJob(supersedable, input, hash);
const { stage } = await stageForDeps(refreshed);
const updated = await repo.updateJob(supersedable.id, productId, {
...stripIdentity(refreshed),
stage,
});
await repo.appendEvent({
jobId: supersedable.id,
productId,
type: 'superseded',
data: { idempotencyKey: input.idempotencyKey },
});
return { job: updated ?? supersedable, outcome: 'superseded' };
}
// brand-new job
if (await wouldCreateCycle(productId, input.idempotencyKey, input.deps)) {
throw new BadRequestError('dependency cycle detected — submission rejected');
}
const now = new Date().toISOString();
const id = `fjob_${crypto.randomUUID()}`;
const base: FleetJobDoc = {
id,
productId,
stage: 'queued',
idempotencyKey: input.idempotencyKey,
contentHash: hash,
bodyMd: input.bodyMd,
manifestSnapshot: {
priority: input.priority,
capabilities: input.capabilities,
engineClass: input.engineClass,
profile: input.profile,
prefersEngine: input.prefersEngine,
allowedScope: input.allowedScope,
deps: input.deps,
depsMode: input.depsMode,
budget: input.budget,
retry: input.retry,
},
priority: input.priority,
priorityOrder: PRIORITY_ORDER[input.priority],
capabilities: input.capabilities,
engineClass: input.engineClass,
profile: input.profile,
deps: input.deps,
depsMode: input.depsMode,
budget: input.budget,
retry: input.retry,
kind: input.kind,
parentId: input.parentId,
trackerItemId: input.trackerItemId,
attempts: 0,
leaseEpoch: 0,
rev: 0,
createdAt: now,
updatedAt: now,
};
const { stage } = await stageForDeps(base);
base.stage = stage;
const created = await repo.createJob(base);
await repo.appendEvent({
jobId: id,
productId,
type: 'submitted',
data: { stage, idempotencyKey: input.idempotencyKey },
});
return { job: created, outcome: 'created' };
}
function applyInputToJob(job: FleetJobDoc, input: SubmitJobInput, hash: string): FleetJobDoc {
return {
...job,
contentHash: hash,
bodyMd: input.bodyMd,
priority: input.priority,
priorityOrder: PRIORITY_ORDER[input.priority],
capabilities: input.capabilities,
engineClass: input.engineClass,
profile: input.profile,
deps: input.deps,
depsMode: input.depsMode,
budget: input.budget,
retry: input.retry,
manifestSnapshot: {
priority: input.priority,
capabilities: input.capabilities,
engineClass: input.engineClass,
profile: input.profile,
prefersEngine: input.prefersEngine,
allowedScope: input.allowedScope,
deps: input.deps,
depsMode: input.depsMode,
budget: input.budget,
retry: input.retry,
},
};
}
function stripIdentity(job: FleetJobDoc): Partial<FleetJobDoc> {
const { id: _id, productId: _pid, createdAt: _c, rev: _r, ...rest } = job;
return rest;
}
// ── Atomic claim (§4/§7) ────────────────────────────────────────────────────
export interface ClaimContext {
productId: string;
factoryId: string;
capabilities: string[];
leaseSeconds: number;
}
export interface ClaimResult {
job: FleetJobDoc;
lease: FleetLeaseDoc;
run: FleetRunDoc;
}
/** A job is eligible for a factory iff queued/blocked-with-met-deps + caps subset. */
async function eligibleForClaim(job: FleetJobDoc, factoryCaps: string[]): Promise<boolean> {
if (job.stage !== 'queued' && job.stage !== 'blocked') return false;
if (!capabilitiesSubset(job.capabilities, factoryCaps)) return false;
const unmet = await unmetDeps(job);
return unmet.length === 0;
}
/**
* Try to claim ONE specific job version. The rev compare-and-swap is the single
* point at which exactly one contender wins; a stale `rev` => conflict (no write,
* no double-assignment).
*/
export async function tryClaimJob(
job: FleetJobDoc,
ctx: ClaimContext
): Promise<repo.RevResult<ClaimResult>> {
const newEpoch = job.leaseEpoch + 1;
const attempt = job.attempts + 1;
const claimed = await repo.revUpdateJob(job.id, ctx.productId, job.rev, {
stage: 'assigned',
leaseEpoch: newEpoch,
attempts: attempt,
});
if (!claimed.ok) return claimed;
const now = Date.now();
const expiresAt = new Date(now + ctx.leaseSeconds * 1000).toISOString();
const nowIso = new Date(now).toISOString();
const existingLease = await repo.getLease(job.id);
let lease: FleetLeaseDoc;
if (existingLease) {
const updated = await repo.revUpdateLease(job.id, existingLease.rev, {
holderFactoryId: ctx.factoryId,
expiresAt,
leaseEpoch: newEpoch,
renewals: 0,
status: 'held',
});
lease = updated.ok ? updated.doc : existingLease;
} else {
lease = await repo.createLease({
id: job.id,
productId: ctx.productId,
jobId: job.id,
holderFactoryId: ctx.factoryId,
expiresAt,
leaseEpoch: newEpoch,
renewals: 0,
status: 'held',
rev: 0,
updatedAt: nowIso,
});
}
const run = await repo.createRun({
id: `${job.id}:run:${attempt}`,
productId: ctx.productId,
jobId: job.id,
attempt,
factoryId: ctx.factoryId,
engine: job.engineClass ?? 'unknown',
startedAt: nowIso,
insights: {},
});
await repo.appendEvent({
jobId: job.id,
productId: ctx.productId,
type: 'assigned',
actor: ctx.factoryId,
data: { leaseEpoch: newEpoch, attempt },
});
return { ok: true, doc: { job: claimed.doc, lease, run } };
}
/** Select the highest-priority, oldest eligible job and atomically claim it. */
export async function claimNextJob(ctx: ClaimContext): Promise<ClaimResult | null> {
for (let i = 0; i < CLAIM_MAX_RETRIES; i++) {
const candidates = await repo.listJobs({ productId: ctx.productId });
const eligible: FleetJobDoc[] = [];
for (const job of candidates) {
if (await eligibleForClaim(job, ctx.capabilities)) eligible.push(job);
}
if (eligible.length === 0) return null;
eligible.sort(
(a, b) => a.priorityOrder - b.priorityOrder || a.createdAt.localeCompare(b.createdAt)
);
const result = await tryClaimJob(eligible[0], ctx);
if (result.ok) return result.doc;
if (result.reason === 'not_found') continue;
// conflict: another factory won this version — re-select and retry
}
return null;
}
// ── Fenced transitions + leases (§4/§8) ──────────────────────────────────────
export type FenceResult<T> =
| { ok: true; doc: T }
| { ok: false; reason: 'not_found' | 'fenced' | 'conflict' };
function fenced(job: FleetJobDoc, leaseEpoch: number): boolean {
// a call older than the current epoch is a stale/zombie worker — reject it
return leaseEpoch < job.leaseEpoch;
}
export interface PatchJobInputInternal {
leaseEpoch: number;
stage?: FleetStage;
checkpoint?: FleetJobDoc['checkpoint'];
blockedReason?: string;
}
export async function patchJobFenced(
jobId: string,
productId: string,
patch: PatchJobInputInternal
): Promise<FenceResult<FleetJobDoc>> {
const job = await repo.getJob(jobId, productId);
if (!job) return { ok: false, reason: 'not_found' };
if (fenced(job, patch.leaseEpoch)) return { ok: false, reason: 'fenced' };
const updates: Partial<FleetJobDoc> = {};
if (patch.stage) updates.stage = patch.stage;
if (patch.checkpoint) updates.checkpoint = patch.checkpoint;
if (patch.blockedReason !== undefined) updates.blockedReason = patch.blockedReason;
const res = await repo.revUpdateJob(jobId, productId, job.rev, updates);
if (!res.ok) return { ok: false, reason: res.reason === 'not_found' ? 'not_found' : 'conflict' };
await repo.appendEvent({
jobId,
productId,
type: 'transition',
data: { stage: patch.stage ?? job.stage, leaseEpoch: patch.leaseEpoch },
});
return { ok: true, doc: res.doc };
}
export async function renewLease(
jobId: string,
productId: string,
leaseEpoch: number,
leaseSeconds: number
): Promise<FenceResult<FleetLeaseDoc>> {
const job = await repo.getJob(jobId, productId);
if (!job) return { ok: false, reason: 'not_found' };
if (fenced(job, leaseEpoch)) return { ok: false, reason: 'fenced' };
const lease = await repo.getLease(jobId);
if (!lease) return { ok: false, reason: 'not_found' };
const expiresAt = new Date(Date.now() + leaseSeconds * 1000).toISOString();
const res = await repo.revUpdateLease(jobId, lease.rev, {
expiresAt,
renewals: lease.renewals + 1,
status: 'held',
});
if (!res.ok) return { ok: false, reason: 'conflict' };
await repo.appendEvent({ jobId, productId, type: 'lease_renewed', data: { leaseEpoch } });
return { ok: true, doc: res.doc };
}
export async function releaseLease(
jobId: string,
productId: string,
leaseEpoch: number,
stage?: FleetStage
): Promise<FenceResult<FleetLeaseDoc>> {
const job = await repo.getJob(jobId, productId);
if (!job) return { ok: false, reason: 'not_found' };
if (fenced(job, leaseEpoch)) return { ok: false, reason: 'fenced' };
const lease = await repo.getLease(jobId);
if (!lease) return { ok: false, reason: 'not_found' };
const res = await repo.revUpdateLease(jobId, lease.rev, { status: 'released' });
if (!res.ok) return { ok: false, reason: 'conflict' };
if (stage) await repo.revUpdateJob(jobId, productId, job.rev, { stage });
await repo.appendEvent({ jobId, productId, type: 'lease_released', data: { leaseEpoch, stage } });
return { ok: true, doc: res.doc };
}
// ── Heartbeat (§8) ────────────────────────────────────────────────────────────
export interface HeartbeatContext {
productId: string;
factoryId: string;
descriptor?: Record<string, unknown>;
capabilities?: string[];
health?: FleetFactoryHealthInput;
load?: number;
seatLimit?: number;
}
type FleetFactoryHealthInput = 'ok' | 'degraded' | 'down';
export async function heartbeat(ctx: HeartbeatContext): Promise<repo.RevResult<true>> {
const now = new Date().toISOString();
const existing = await repo.getFactory(ctx.factoryId, ctx.productId);
await repo.upsertFactory({
id: ctx.factoryId,
productId: ctx.productId,
factoryId: ctx.factoryId,
descriptor: ctx.descriptor ?? existing?.descriptor ?? {},
capabilities: ctx.capabilities ?? existing?.capabilities ?? [],
health: ctx.health ?? existing?.health ?? 'ok',
load: ctx.load ?? existing?.load ?? 0,
seatLimit: ctx.seatLimit ?? existing?.seatLimit ?? 1,
lastHeartbeatAt: now,
});
return { ok: true, doc: true };
}
/** A factory is stale if its last heartbeat is older than `maxAgeMs`. */
export function isFactoryStale(
factory: { lastHeartbeatAt: string },
nowMs: number,
maxAgeMs: number
): boolean {
return nowMs - new Date(factory.lastHeartbeatAt).getTime() > maxAgeMs;
}
// ── Reaper (§25.3) ────────────────────────────────────────────────────────────
export interface ReapResult {
reaped: number;
jobIds: string[];
}
/**
* Reclaim jobs whose lease has expired. Cosmos TTL would only DELETE the lease
* doc it cannot return the job to `queued`, bump the fencing epoch, or preserve
* the checkpoint so the reaper (not TTL) owns recovery. Idempotent: a reaped
* lease becomes `expired` and is no longer selected by listExpiredLeases.
*/
export async function reapExpiredLeases(nowIso: string): Promise<ReapResult> {
const expired = await repo.listExpiredLeases(nowIso);
const jobIds: string[] = [];
for (const lease of expired) {
const job = await repo.getJob(lease.jobId, lease.productId);
if (!job) continue;
if (!ACTIVE_STAGES.includes(job.stage)) continue; // already resting; nothing to reclaim
const newEpoch = job.leaseEpoch + 1; // fence the zombie holder
const unmet = await unmetDeps(job);
const stage: FleetStage = unmet.length > 0 ? 'blocked' : 'queued';
// checkpoint pointer is intentionally preserved on the job (resume-friendly)
const jobRes = await repo.revUpdateJob(job.id, lease.productId, job.rev, {
stage,
leaseEpoch: newEpoch,
blockedReason: unmet.length > 0 ? `waiting on: ${unmet.join(', ')}` : undefined,
});
if (!jobRes.ok) continue;
await repo.revUpdateLease(lease.jobId, lease.rev, {
status: 'expired',
leaseEpoch: newEpoch,
holderFactoryId: undefined,
});
await repo.appendEvent({
jobId: job.id,
productId: lease.productId,
type: 'lease_expired',
data: { leaseEpoch: newEpoch, returnedTo: stage },
});
jobIds.push(job.id);
}
return { reaped: jobIds.length, jobIds };
}