Merge PR #28: Phase 2 Foundation — fleet module + coordinator (claim/lease/fence/heartbeat/reaper)

Foundation review: 50 fleet tests green, build clean, no regressions. NOTE: the
atomic-claim uses an in-module rev-CAS over an unconditional datastore write —
true cross-process atomicity requires an If-Match/_etag-conditional update in
@bytelyst/datastore (tracked P0 hardening follow-up before P2-S3).
This commit is contained in:
saravanakumardb1 2026-05-29 20:41:36 -07:00
commit 873955ac04
11 changed files with 2139 additions and 0 deletions

View File

@ -187,6 +187,14 @@ const CONTAINER_DEFS: Record<string, ContainerConfig> = {
// i18n (P3)
translations: { partitionKeyPath: '/locale' },
i18n_locales: { partitionKeyPath: '/locale' },
// Agent Gigafactory — fleet coordinator (see modules/fleet/README.md)
fleet_jobs: { partitionKeyPath: '/productId' },
fleet_runs: { partitionKeyPath: '/jobId' },
fleet_leases: { partitionKeyPath: '/jobId' },
fleet_factories: { partitionKeyPath: '/productId' },
fleet_profiles: { partitionKeyPath: '/productId' },
fleet_events: { partitionKeyPath: '/jobId' },
fleet_artifacts: { partitionKeyPath: '/jobId' },
};
export async function initCosmosIfNeeded(): Promise<void> {

View File

@ -0,0 +1,79 @@
# Fleet module — agent gigafactory coordinator (Phase 2 foundation)
Product-agnostic, cloud-agnostic coordinator for distributed agent jobs. This is
the durable backend that supersedes the single-host stand-ins built in the
`agent-queue` (devops-tools) repo. Everything runs on the `@bytelyst/datastore`
abstraction, so all tests execute on `DB_PROVIDER=memory` (no Cosmos/network).
Spec: `../learning_ai_devops_tools/agent-queue/docs/GIGAFACTORY_ROADMAP.md`
(§4 core contract, §7 scheduler/claim, §8 factory/lease/heartbeat, §13 containers,
§18 failure model, §25 durability/recovery, §26 insights).
## Containers (partition keys)
| Container | PK | Purpose |
| ----------------- | ------------ | --------------------------------------------------------------------------------------------------------------------------------------- |
| `fleet_jobs` | `/productId` | durable job: `manifestSnapshot`, verbatim `bodyMd`, `stage`, `idempotencyKey`, `deps`, `checkpoint`, `priority`, `rev`, `leaseEpoch`, … |
| `fleet_runs` | `/jobId` | one execution attempt: engine, timings, `result`, `insights` (tokens/cost/diff) |
| `fleet_leases` | `/jobId` | single-holder lease: `holderFactoryId`, `expiresAt`, `leaseEpoch`, `status` |
| `fleet_factories` | `/productId` | registered worker host: `capabilities`, `health`, `load`, `seatLimit`, `lastHeartbeatAt` |
| `fleet_profiles` | `/productId` | immutable, versioned profile snapshot |
| `fleet_events` | `/jobId` | append-only audit/event stream (monotonic `seq`) |
| `fleet_artifacts` | `/jobId` | pointers to blob-stored artifacts (no inline logs) |
Every document carries `productId`. Containers are registered in
`lib/cosmos-init.ts`.
## Concurrency protocol
**Optimistic concurrency (`rev`).** Jobs and leases carry a monotonic `rev` token.
`repository.revUpdate*` is a compare-and-swap: it writes only if the stored `rev`
still equals the caller's expected `rev`, else it reports `conflict` and writes
nothing. In production (Cosmos) this maps to `_etag` / `If-Match`; on the memory
provider it is enforced by re-reading `rev` immediately before the write, which is
exact for the sequential calls the coordinator and tests make.
**Atomic claim (`claimNextJob`).** Select the highest-priority, oldest job that is
`queued` (or `blocked` with now-satisfied deps) and whose `capabilities` are a
subset of the factory's, then `tryClaimJob` does a `rev` CAS to flip it to
`assigned` and acquire/create its lease. Under contention exactly one factory wins
the CAS; losers get `conflict` and re-select. No double-assignment, ever.
**Leases + fencing.** Acquiring/reclaiming a lease increments `leaseEpoch`. Every
worker mutation (`patchJobFenced`, `renewLease`, `releaseLease`) carries its
`leaseEpoch`; a call whose epoch is `< job.leaseEpoch` is rejected (`fenced`) — a
stale/zombie worker can never overwrite a reassigned job.
**Heartbeat.** `heartbeat(factory)` upserts `lastHeartbeatAt` + health/load;
`isFactoryStale` detects a missed-heartbeat factory.
**Reaper.** `reapExpiredLeases(now)` scans `held` leases with `expiresAt < now`,
bumps `leaseEpoch` (fencing the dead holder), returns the job to `queued` (or
`blocked` if deps are now unmet) **preserving the `checkpoint` pointer** (resume
from WIP), and marks the lease `expired`. Idempotent — a reaped lease is no longer
`held`, so a second pass reaps nothing. Cosmos TTL cannot do this (it only deletes
the lease doc; it cannot requeue the job, bump the epoch, or keep the checkpoint),
so the reaper — not TTL — owns recovery.
## Submit semantics (idempotency + deps)
- same `idempotencyKey` + identical `bodyMd` → returns the existing job (dedup).
- same key + different content while still `queued`/`blocked` → supersede in place.
- same key + different content once past `queued``409 Conflict`.
- a job with unmet `deps` is `blocked` (a dep is met at `shipped`, or `testing`
when `depsMode: soft`); submit-time cycle detection rejects cyclic graphs.
## REST (under `/api`, auth + productId)
`POST /fleet/jobs` · `GET /fleet/jobs` · `GET /fleet/jobs/:id` ·
`PATCH /fleet/jobs/:id` (fenced) · `POST /fleet/claim` ·
`POST /fleet/jobs/:id/lease/renew` · `POST /fleet/jobs/:id/lease/release` ·
`POST /fleet/factories/heartbeat` · `GET /fleet/jobs/:id/runs` ·
`GET /fleet/jobs/:id/events`.
## Files
`types.ts` (Zod schemas → inferred types) · `repository.ts` (per-container repos +
`revUpdate` CAS) · `coordinator.ts` (claim/lease/fence/heartbeat/reaper + submit) ·
`routes.ts` (REST) · `*.test.ts` (schema, repo, coordinator incl. the atomic-claim
race / fencing / reaper, and route inject tests).

View File

@ -0,0 +1,240 @@
/**
* Fleet coordinator the concurrency core. Memory provider; deterministic
* (race driven via the rev compare-and-swap, not real threads).
*/
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { MemoryDatastoreProvider } from '@bytelyst/datastore';
import { ConflictError, BadRequestError } from '@bytelyst/errors';
import { _resetDatastoreProvider, setProvider } from '../../lib/datastore.js';
import * as repo from './repository.js';
import * as coord from './coordinator.js';
import type { SubmitJobInput } from './types.js';
const PID = 'lysnrai';
function input(over: Partial<SubmitJobInput> = {}): SubmitJobInput {
return {
idempotencyKey: 'task-1',
bodyMd: '# do the thing',
priority: 'medium',
capabilities: [],
prefersEngine: [],
allowedScope: [],
deps: [],
kind: 'leaf',
...over,
};
}
const factory = (over = {}) => ({
productId: PID,
factoryId: 'fac_1',
capabilities: [] as string[],
leaseSeconds: 900,
...over,
});
describe('fleet coordinator', () => {
beforeEach(() => setProvider(new MemoryDatastoreProvider()));
afterEach(() => _resetDatastoreProvider());
it('submit: new job is queued with no deps', async () => {
const { job, outcome } = await coord.submitJob(PID, input());
expect(outcome).toBe('created');
expect(job.stage).toBe('queued');
expect(job.productId).toBe(PID);
});
// ── ATOMIC CLAIM RACE ──
it('atomic claim: two contenders on the same job version — exactly one wins', async () => {
const { job } = await coord.submitJob(PID, input());
// Both contenders see the SAME job version (rev). The CAS picks one winner.
const a = await coord.tryClaimJob(job, factory({ factoryId: 'fac_A' }));
const b = await coord.tryClaimJob(job, factory({ factoryId: 'fac_B' }));
const oks = [a, b].filter(r => r.ok);
const conflicts = [a, b].filter(r => !r.ok);
expect(oks).toHaveLength(1);
expect(conflicts).toHaveLength(1);
expect(conflicts[0].ok === false && conflicts[0].reason).toBe('conflict');
// no double-assignment: the job is assigned exactly once, single run, single holder
const stored = await repo.getJob(job.id, PID);
expect(stored?.stage).toBe('assigned');
expect(stored?.attempts).toBe(1);
expect(await repo.listRunsByJob(job.id)).toHaveLength(1);
const lease = await repo.getLease(job.id);
expect(lease?.leaseEpoch).toBe(1);
});
// ── PRIORITY + AGE SELECTION ──
it('claimNextJob returns highest-priority then oldest', async () => {
await coord.submitJob(PID, input({ idempotencyKey: 'low-old', priority: 'low' }));
await coord.submitJob(PID, input({ idempotencyKey: 'crit-new', priority: 'critical' }));
await coord.submitJob(PID, input({ idempotencyKey: 'med', priority: 'medium' }));
const claim = await coord.claimNextJob(factory());
expect(claim?.job.idempotencyKey).toBe('crit-new');
});
it('claimNextJob respects capability subset', async () => {
await coord.submitJob(PID, input({ idempotencyKey: 'needs-mac', capabilities: ['os:mac'] }));
const noCaps = await coord.claimNextJob(factory({ capabilities: [] }));
expect(noCaps).toBeNull();
const withCaps = await coord.claimNextJob(factory({ capabilities: ['os:mac', 'has:git'] }));
expect(withCaps?.job.idempotencyKey).toBe('needs-mac');
});
// ── DEPS GATING ──
it('deps: a job with unmet deps is blocked and not claimable until the dep ships', async () => {
await coord.submitJob(PID, input({ idempotencyKey: 'A' }));
const { job: b } = await coord.submitJob(PID, input({ idempotencyKey: 'B', deps: ['A'] }));
expect(b.stage).toBe('blocked');
// first claim returns A (B is blocked)
const first = await coord.claimNextJob(factory());
expect(first?.job.idempotencyKey).toBe('A');
// B still not claimable
expect(await coord.claimNextJob(factory())).toBeNull();
// ship A → B becomes claimable
const a = await repo.getJob(first!.job.id, PID);
await repo.updateJob(a!.id, PID, { stage: 'shipped' });
const second = await coord.claimNextJob(factory());
expect(second?.job.idempotencyKey).toBe('B');
});
it('deps-mode soft: dep satisfied when dependency is in testing', async () => {
await coord.submitJob(PID, input({ idempotencyKey: 'A' }));
const { job: c } = await coord.submitJob(
PID,
input({ idempotencyKey: 'C', deps: ['A'], depsMode: 'soft' })
);
expect(c.stage).toBe('blocked');
const a = (await repo.findJobsByIdempotencyKey(PID, 'A'))[0];
await repo.updateJob(a.id, PID, { stage: 'testing' });
const unmet = await coord.unmetDeps((await repo.getJob(c.id, PID))!);
expect(unmet).toEqual([]);
});
it('cycle detection: a cyclic submit is rejected', async () => {
await coord.submitJob(PID, input({ idempotencyKey: 'B', deps: ['A'] }));
await expect(
coord.submitJob(PID, input({ idempotencyKey: 'A', deps: ['B'] }))
).rejects.toBeInstanceOf(BadRequestError);
});
// ── FENCING ──
it('fencing: a stale leaseEpoch is rejected; the current epoch succeeds', async () => {
const { job } = await coord.submitJob(PID, input());
const claim = await coord.claimNextJob(factory());
expect(claim).not.toBeNull();
const epoch = claim!.job.leaseEpoch; // 1
const stale = await coord.patchJobFenced(job.id, PID, {
leaseEpoch: epoch - 1,
stage: 'building',
});
expect(stale.ok).toBe(false);
if (!stale.ok) expect(stale.reason).toBe('fenced');
const current = await coord.patchJobFenced(job.id, PID, {
leaseEpoch: epoch,
stage: 'building',
});
expect(current.ok).toBe(true);
if (current.ok) expect(current.doc.stage).toBe('building');
});
// ── REAPER ──
it('reaper: expired lease returns the job to queued, bumps epoch, preserves checkpoint; idempotent', async () => {
const { job } = await coord.submitJob(PID, input());
const claim = await coord.claimNextJob(factory());
const epoch0 = claim!.job.leaseEpoch;
// worker checkpoints WIP, then dies; force the lease to be expired
await coord.patchJobFenced(job.id, PID, {
leaseEpoch: epoch0,
stage: 'building',
checkpoint: { wipBranch: 'aq/wip/x', wipBase: 'base1', wipCommit: 'c1' },
});
const lease = await repo.getLease(job.id);
await repo.revUpdateLease(job.id, lease!.rev, { expiresAt: '2000-01-01T00:00:00.000Z' });
const res = await coord.reapExpiredLeases(new Date().toISOString());
expect(res.reaped).toBe(1);
const reclaimed = await repo.getJob(job.id, PID);
expect(reclaimed?.stage).toBe('queued');
expect(reclaimed?.leaseEpoch).toBe(epoch0 + 1); // fenced
expect(reclaimed?.checkpoint?.wipBranch).toBe('aq/wip/x'); // preserved
expect((await repo.getLease(job.id))?.status).toBe('expired');
// idempotent — running again reaps nothing
const again = await coord.reapExpiredLeases(new Date().toISOString());
expect(again.reaped).toBe(0);
});
it('reaper: the zombie (old epoch) is fenced out after reclaim', async () => {
const { job } = await coord.submitJob(PID, input());
const claim = await coord.claimNextJob(factory());
const zombieEpoch = claim!.job.leaseEpoch;
await coord.patchJobFenced(job.id, PID, { leaseEpoch: zombieEpoch, stage: 'building' });
const lease = await repo.getLease(job.id);
await repo.revUpdateLease(job.id, lease!.rev, { expiresAt: '2000-01-01T00:00:00.000Z' });
await coord.reapExpiredLeases(new Date().toISOString());
// zombie tries to write with its now-stale epoch -> fenced
const zombie = await coord.patchJobFenced(job.id, PID, {
leaseEpoch: zombieEpoch,
stage: 'shipped',
});
expect(zombie.ok).toBe(false);
if (!zombie.ok) expect(zombie.reason).toBe('fenced');
});
// ── HEARTBEAT ──
it('heartbeat updates lastHeartbeatAt/health; staleness is detectable', async () => {
await coord.heartbeat({
productId: PID,
factoryId: 'fac_1',
capabilities: ['os:mac'],
health: 'ok',
load: 1,
});
const fac = await repo.getFactory('fac_1', PID);
expect(fac?.health).toBe('ok');
expect(fac?.capabilities).toEqual(['os:mac']);
expect(coord.isFactoryStale(fac!, Date.now(), 60_000)).toBe(false);
const oldNow = new Date(fac!.lastHeartbeatAt).getTime() + 120_000;
expect(coord.isFactoryStale(fac!, oldNow, 60_000)).toBe(true);
});
// ── IDEMPOTENCY ──
it('idempotent submit: same key+content => 1 job', async () => {
const a = await coord.submitJob(PID, input({ idempotencyKey: 'k', bodyMd: 'same' }));
const b = await coord.submitJob(PID, input({ idempotencyKey: 'k', bodyMd: 'same' }));
expect(b.outcome).toBe('deduplicated');
expect(b.job.id).toBe(a.job.id);
expect(await repo.findJobsByIdempotencyKey(PID, 'k')).toHaveLength(1);
});
it('idempotent submit: same key+changed content while queued => superseded', async () => {
const a = await coord.submitJob(PID, input({ idempotencyKey: 'k', bodyMd: 'v1' }));
const b = await coord.submitJob(PID, input({ idempotencyKey: 'k', bodyMd: 'v2' }));
expect(b.outcome).toBe('superseded');
expect(b.job.id).toBe(a.job.id);
expect(b.job.bodyMd).toBe('v2');
expect(await repo.findJobsByIdempotencyKey(PID, 'k')).toHaveLength(1);
});
it('idempotent submit: same key+changed content past queued => 409', async () => {
await coord.submitJob(PID, input({ idempotencyKey: 'k', bodyMd: 'v1' }));
await coord.claimNextJob(factory()); // job now assigned (past queued)
await expect(
coord.submitJob(PID, input({ idempotencyKey: 'k', bodyMd: 'v2' }))
).rejects.toBeInstanceOf(ConflictError);
});
});

View File

@ -0,0 +1,503 @@
/**
* Fleet coordinator the concurrency core (Phase 2 §4/§7/§8/§18/§25).
*
* Responsibilities:
* - submitJob idempotent submit + submit-time dependency cycle detection
* - claimNextJob atomic, single-winner claim (priority+age, deps + capability
* gated) via a rev compare-and-swap, then lease acquisition
* - patchJobFenced fenced state transition (rejects a stale leaseEpoch)
* - renewLease / releaseLease
* - heartbeat factory liveness
* - reapExpiredLeases reclaim dead-worker jobs (bumps leaseEpoch to fence the
* zombie, returns the job to queued/blocked, preserves checkpoint)
*
* Concurrency model: `rev` is an optimistic-concurrency token on jobs + leases.
* Every contended mutation goes through repository.revUpdate*, which writes only
* if the stored `rev` still matches so under contention EXACTLY ONE caller wins
* and losers get a `conflict` and retry. Maps to Cosmos `_etag` / If-Match in prod.
*/
import { createHash } from 'node:crypto';
import { BadRequestError, ConflictError } from '../../lib/errors.js';
import * as repo from './repository.js';
import {
ACTIVE_STAGES,
DEP_DONE_HARD,
DEP_DONE_SOFT,
PRIORITY_ORDER,
type FleetJobDoc,
type FleetLeaseDoc,
type FleetRunDoc,
type FleetStage,
type SubmitJobInput,
} from './types.js';
const CLAIM_MAX_RETRIES = 8;
export function contentHash(bodyMd: string): string {
return createHash('sha256').update(bodyMd).digest('hex');
}
/** Every required capability token must be advertised by the factory. */
export function capabilitiesSubset(required: string[], available: string[]): boolean {
const set = new Set(available);
return required.every(token => set.has(token));
}
// ── Dependency evaluation (§5) ────────────────────────────────────────────────
/** Unmet dependency keys for a job given the current store state. */
export async function unmetDeps(job: FleetJobDoc): Promise<string[]> {
if (!job.deps || job.deps.length === 0) return [];
const done = job.depsMode === 'soft' ? DEP_DONE_SOFT : DEP_DONE_HARD;
const unmet: string[] = [];
for (const depKey of job.deps) {
const matches = await repo.findJobsByIdempotencyKey(job.productId, depKey);
const satisfied = matches.some(m => done.includes(m.stage));
if (!satisfied) unmet.push(depKey);
}
return unmet;
}
async function stageForDeps(job: FleetJobDoc): Promise<{ stage: FleetStage; unmet: string[] }> {
const unmet = await unmetDeps(job);
return { stage: unmet.length > 0 ? 'blocked' : 'queued', unmet };
}
/**
* Submit-time cycle detection: would a job with `newKey` depending on `newDeps`
* create a cycle in the idempotency-key dependency graph (existing jobs + new)?
*/
export async function wouldCreateCycle(
productId: string,
newKey: string,
newDeps: string[]
): Promise<boolean> {
if (newDeps.includes(newKey)) return true; // self-dependency
const visited = new Set<string>();
let frontier = [...newDeps];
while (frontier.length > 0) {
const next: string[] = [];
for (const key of frontier) {
if (key === newKey) return true; // reached back to the new node
if (visited.has(key)) continue;
visited.add(key);
const matches = await repo.findJobsByIdempotencyKey(productId, key);
for (const m of matches) next.push(...m.deps);
}
frontier = next;
}
return false;
}
// ── Submit (idempotent) ───────────────────────────────────────────────────────
export interface SubmitResult {
job: FleetJobDoc;
outcome: 'created' | 'deduplicated' | 'superseded';
}
export async function submitJob(productId: string, input: SubmitJobInput): Promise<SubmitResult> {
const hash = contentHash(input.bodyMd);
const existingForKey = await repo.findJobsByIdempotencyKey(productId, input.idempotencyKey);
// Idempotency (§4): same key + identical content => return existing (no dup).
const identical = existingForKey.find(j => j.contentHash === hash);
if (identical) return { job: identical, outcome: 'deduplicated' };
if (existingForKey.length > 0) {
// same key, different content
const supersedable = existingForKey.find(j => j.stage === 'queued' || j.stage === 'blocked');
if (!supersedable) {
throw new ConflictError(
`idempotency-key '${input.idempotencyKey}' already in use by an in-flight/terminal job with different content`
);
}
// supersede the still-queued/blocked job in place
if (await wouldCreateCycle(productId, input.idempotencyKey, input.deps)) {
throw new BadRequestError('dependency cycle detected — submission rejected');
}
const refreshed = applyInputToJob(supersedable, input, hash);
const { stage } = await stageForDeps(refreshed);
const updated = await repo.updateJob(supersedable.id, productId, {
...stripIdentity(refreshed),
stage,
});
await repo.appendEvent({
jobId: supersedable.id,
productId,
type: 'superseded',
data: { idempotencyKey: input.idempotencyKey },
});
return { job: updated ?? supersedable, outcome: 'superseded' };
}
// brand-new job
if (await wouldCreateCycle(productId, input.idempotencyKey, input.deps)) {
throw new BadRequestError('dependency cycle detected — submission rejected');
}
const now = new Date().toISOString();
const id = `fjob_${crypto.randomUUID()}`;
const base: FleetJobDoc = {
id,
productId,
stage: 'queued',
idempotencyKey: input.idempotencyKey,
contentHash: hash,
bodyMd: input.bodyMd,
manifestSnapshot: {
priority: input.priority,
capabilities: input.capabilities,
engineClass: input.engineClass,
profile: input.profile,
prefersEngine: input.prefersEngine,
allowedScope: input.allowedScope,
deps: input.deps,
depsMode: input.depsMode,
budget: input.budget,
retry: input.retry,
},
priority: input.priority,
priorityOrder: PRIORITY_ORDER[input.priority],
capabilities: input.capabilities,
engineClass: input.engineClass,
profile: input.profile,
deps: input.deps,
depsMode: input.depsMode,
budget: input.budget,
retry: input.retry,
kind: input.kind,
parentId: input.parentId,
trackerItemId: input.trackerItemId,
attempts: 0,
leaseEpoch: 0,
rev: 0,
createdAt: now,
updatedAt: now,
};
const { stage } = await stageForDeps(base);
base.stage = stage;
const created = await repo.createJob(base);
await repo.appendEvent({
jobId: id,
productId,
type: 'submitted',
data: { stage, idempotencyKey: input.idempotencyKey },
});
return { job: created, outcome: 'created' };
}
function applyInputToJob(job: FleetJobDoc, input: SubmitJobInput, hash: string): FleetJobDoc {
return {
...job,
contentHash: hash,
bodyMd: input.bodyMd,
priority: input.priority,
priorityOrder: PRIORITY_ORDER[input.priority],
capabilities: input.capabilities,
engineClass: input.engineClass,
profile: input.profile,
deps: input.deps,
depsMode: input.depsMode,
budget: input.budget,
retry: input.retry,
manifestSnapshot: {
priority: input.priority,
capabilities: input.capabilities,
engineClass: input.engineClass,
profile: input.profile,
prefersEngine: input.prefersEngine,
allowedScope: input.allowedScope,
deps: input.deps,
depsMode: input.depsMode,
budget: input.budget,
retry: input.retry,
},
};
}
function stripIdentity(job: FleetJobDoc): Partial<FleetJobDoc> {
const { id: _id, productId: _pid, createdAt: _c, rev: _r, ...rest } = job;
return rest;
}
// ── Atomic claim (§4/§7) ────────────────────────────────────────────────────
export interface ClaimContext {
productId: string;
factoryId: string;
capabilities: string[];
leaseSeconds: number;
}
export interface ClaimResult {
job: FleetJobDoc;
lease: FleetLeaseDoc;
run: FleetRunDoc;
}
/** A job is eligible for a factory iff queued/blocked-with-met-deps + caps subset. */
async function eligibleForClaim(job: FleetJobDoc, factoryCaps: string[]): Promise<boolean> {
if (job.stage !== 'queued' && job.stage !== 'blocked') return false;
if (!capabilitiesSubset(job.capabilities, factoryCaps)) return false;
const unmet = await unmetDeps(job);
return unmet.length === 0;
}
/**
* Try to claim ONE specific job version. The rev compare-and-swap is the single
* point at which exactly one contender wins; a stale `rev` => conflict (no write,
* no double-assignment).
*/
export async function tryClaimJob(
job: FleetJobDoc,
ctx: ClaimContext
): Promise<repo.RevResult<ClaimResult>> {
const newEpoch = job.leaseEpoch + 1;
const attempt = job.attempts + 1;
const claimed = await repo.revUpdateJob(job.id, ctx.productId, job.rev, {
stage: 'assigned',
leaseEpoch: newEpoch,
attempts: attempt,
});
if (!claimed.ok) return claimed;
const now = Date.now();
const expiresAt = new Date(now + ctx.leaseSeconds * 1000).toISOString();
const nowIso = new Date(now).toISOString();
const existingLease = await repo.getLease(job.id);
let lease: FleetLeaseDoc;
if (existingLease) {
const updated = await repo.revUpdateLease(job.id, existingLease.rev, {
holderFactoryId: ctx.factoryId,
expiresAt,
leaseEpoch: newEpoch,
renewals: 0,
status: 'held',
});
lease = updated.ok ? updated.doc : existingLease;
} else {
lease = await repo.createLease({
id: job.id,
productId: ctx.productId,
jobId: job.id,
holderFactoryId: ctx.factoryId,
expiresAt,
leaseEpoch: newEpoch,
renewals: 0,
status: 'held',
rev: 0,
updatedAt: nowIso,
});
}
const run = await repo.createRun({
id: `${job.id}:run:${attempt}`,
productId: ctx.productId,
jobId: job.id,
attempt,
factoryId: ctx.factoryId,
engine: job.engineClass ?? 'unknown',
startedAt: nowIso,
insights: {},
});
await repo.appendEvent({
jobId: job.id,
productId: ctx.productId,
type: 'assigned',
actor: ctx.factoryId,
data: { leaseEpoch: newEpoch, attempt },
});
return { ok: true, doc: { job: claimed.doc, lease, run } };
}
/** Select the highest-priority, oldest eligible job and atomically claim it. */
export async function claimNextJob(ctx: ClaimContext): Promise<ClaimResult | null> {
for (let i = 0; i < CLAIM_MAX_RETRIES; i++) {
const candidates = await repo.listJobs({ productId: ctx.productId });
const eligible: FleetJobDoc[] = [];
for (const job of candidates) {
if (await eligibleForClaim(job, ctx.capabilities)) eligible.push(job);
}
if (eligible.length === 0) return null;
eligible.sort(
(a, b) => a.priorityOrder - b.priorityOrder || a.createdAt.localeCompare(b.createdAt)
);
const result = await tryClaimJob(eligible[0], ctx);
if (result.ok) return result.doc;
if (result.reason === 'not_found') continue;
// conflict: another factory won this version — re-select and retry
}
return null;
}
// ── Fenced transitions + leases (§4/§8) ──────────────────────────────────────
export type FenceResult<T> =
| { ok: true; doc: T }
| { ok: false; reason: 'not_found' | 'fenced' | 'conflict' };
function fenced(job: FleetJobDoc, leaseEpoch: number): boolean {
// a call older than the current epoch is a stale/zombie worker — reject it
return leaseEpoch < job.leaseEpoch;
}
export interface PatchJobInputInternal {
leaseEpoch: number;
stage?: FleetStage;
checkpoint?: FleetJobDoc['checkpoint'];
blockedReason?: string;
}
export async function patchJobFenced(
jobId: string,
productId: string,
patch: PatchJobInputInternal
): Promise<FenceResult<FleetJobDoc>> {
const job = await repo.getJob(jobId, productId);
if (!job) return { ok: false, reason: 'not_found' };
if (fenced(job, patch.leaseEpoch)) return { ok: false, reason: 'fenced' };
const updates: Partial<FleetJobDoc> = {};
if (patch.stage) updates.stage = patch.stage;
if (patch.checkpoint) updates.checkpoint = patch.checkpoint;
if (patch.blockedReason !== undefined) updates.blockedReason = patch.blockedReason;
const res = await repo.revUpdateJob(jobId, productId, job.rev, updates);
if (!res.ok) return { ok: false, reason: res.reason === 'not_found' ? 'not_found' : 'conflict' };
await repo.appendEvent({
jobId,
productId,
type: 'transition',
data: { stage: patch.stage ?? job.stage, leaseEpoch: patch.leaseEpoch },
});
return { ok: true, doc: res.doc };
}
export async function renewLease(
jobId: string,
productId: string,
leaseEpoch: number,
leaseSeconds: number
): Promise<FenceResult<FleetLeaseDoc>> {
const job = await repo.getJob(jobId, productId);
if (!job) return { ok: false, reason: 'not_found' };
if (fenced(job, leaseEpoch)) return { ok: false, reason: 'fenced' };
const lease = await repo.getLease(jobId);
if (!lease) return { ok: false, reason: 'not_found' };
const expiresAt = new Date(Date.now() + leaseSeconds * 1000).toISOString();
const res = await repo.revUpdateLease(jobId, lease.rev, {
expiresAt,
renewals: lease.renewals + 1,
status: 'held',
});
if (!res.ok) return { ok: false, reason: 'conflict' };
await repo.appendEvent({ jobId, productId, type: 'lease_renewed', data: { leaseEpoch } });
return { ok: true, doc: res.doc };
}
export async function releaseLease(
jobId: string,
productId: string,
leaseEpoch: number,
stage?: FleetStage
): Promise<FenceResult<FleetLeaseDoc>> {
const job = await repo.getJob(jobId, productId);
if (!job) return { ok: false, reason: 'not_found' };
if (fenced(job, leaseEpoch)) return { ok: false, reason: 'fenced' };
const lease = await repo.getLease(jobId);
if (!lease) return { ok: false, reason: 'not_found' };
const res = await repo.revUpdateLease(jobId, lease.rev, { status: 'released' });
if (!res.ok) return { ok: false, reason: 'conflict' };
if (stage) await repo.revUpdateJob(jobId, productId, job.rev, { stage });
await repo.appendEvent({ jobId, productId, type: 'lease_released', data: { leaseEpoch, stage } });
return { ok: true, doc: res.doc };
}
// ── Heartbeat (§8) ────────────────────────────────────────────────────────────
export interface HeartbeatContext {
productId: string;
factoryId: string;
descriptor?: Record<string, unknown>;
capabilities?: string[];
health?: FleetFactoryHealthInput;
load?: number;
seatLimit?: number;
}
type FleetFactoryHealthInput = 'ok' | 'degraded' | 'down';
export async function heartbeat(ctx: HeartbeatContext): Promise<repo.RevResult<true>> {
const now = new Date().toISOString();
const existing = await repo.getFactory(ctx.factoryId, ctx.productId);
await repo.upsertFactory({
id: ctx.factoryId,
productId: ctx.productId,
factoryId: ctx.factoryId,
descriptor: ctx.descriptor ?? existing?.descriptor ?? {},
capabilities: ctx.capabilities ?? existing?.capabilities ?? [],
health: ctx.health ?? existing?.health ?? 'ok',
load: ctx.load ?? existing?.load ?? 0,
seatLimit: ctx.seatLimit ?? existing?.seatLimit ?? 1,
lastHeartbeatAt: now,
});
return { ok: true, doc: true };
}
/** A factory is stale if its last heartbeat is older than `maxAgeMs`. */
export function isFactoryStale(
factory: { lastHeartbeatAt: string },
nowMs: number,
maxAgeMs: number
): boolean {
return nowMs - new Date(factory.lastHeartbeatAt).getTime() > maxAgeMs;
}
// ── Reaper (§25.3) ────────────────────────────────────────────────────────────
export interface ReapResult {
reaped: number;
jobIds: string[];
}
/**
* Reclaim jobs whose lease has expired. Cosmos TTL would only DELETE the lease
* doc it cannot return the job to `queued`, bump the fencing epoch, or preserve
* the checkpoint so the reaper (not TTL) owns recovery. Idempotent: a reaped
* lease becomes `expired` and is no longer selected by listExpiredLeases.
*/
export async function reapExpiredLeases(nowIso: string): Promise<ReapResult> {
const expired = await repo.listExpiredLeases(nowIso);
const jobIds: string[] = [];
for (const lease of expired) {
const job = await repo.getJob(lease.jobId, lease.productId);
if (!job) continue;
if (!ACTIVE_STAGES.includes(job.stage)) continue; // already resting; nothing to reclaim
const newEpoch = job.leaseEpoch + 1; // fence the zombie holder
const unmet = await unmetDeps(job);
const stage: FleetStage = unmet.length > 0 ? 'blocked' : 'queued';
// checkpoint pointer is intentionally preserved on the job (resume-friendly)
const jobRes = await repo.revUpdateJob(job.id, lease.productId, job.rev, {
stage,
leaseEpoch: newEpoch,
blockedReason: unmet.length > 0 ? `waiting on: ${unmet.join(', ')}` : undefined,
});
if (!jobRes.ok) continue;
await repo.revUpdateLease(lease.jobId, lease.rev, {
status: 'expired',
leaseEpoch: newEpoch,
holderFactoryId: undefined,
});
await repo.appendEvent({
jobId: job.id,
productId: lease.productId,
type: 'lease_expired',
data: { leaseEpoch: newEpoch, returnedTo: stage },
});
jobIds.push(job.id);
}
return { reaped: jobIds.length, jobIds };
}

View File

@ -0,0 +1,187 @@
/**
* Fleet repository CRUD round-trips, list filters, appendEvent ordering, and
* the rev compare-and-swap. Runs on the in-memory datastore provider.
*/
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { MemoryDatastoreProvider } from '@bytelyst/datastore';
import { _resetDatastoreProvider, setProvider } from '../../lib/datastore.js';
import * as repo from './repository.js';
import type { FleetJobDoc } from './types.js';
const PID = 'lysnrai';
const now = '2026-05-30T00:00:00.000Z';
function jobDoc(over: Partial<FleetJobDoc> = {}): FleetJobDoc {
return {
id: 'fjob_1',
productId: PID,
stage: 'queued',
idempotencyKey: 'task-1',
contentHash: 'h1',
bodyMd: '# task',
manifestSnapshot: {
priority: 'medium',
capabilities: [],
prefersEngine: [],
allowedScope: [],
deps: [],
},
priority: 'medium',
priorityOrder: 2,
capabilities: [],
deps: [],
kind: 'leaf',
attempts: 0,
leaseEpoch: 0,
rev: 0,
createdAt: now,
updatedAt: now,
...over,
};
}
describe('fleet repository', () => {
beforeEach(() => setProvider(new MemoryDatastoreProvider()));
afterEach(() => _resetDatastoreProvider());
it('jobs: create / getById / list by stage + idempotencyKey', async () => {
await repo.createJob(jobDoc({ id: 'fjob_1', idempotencyKey: 'a', stage: 'queued' }));
await repo.createJob(jobDoc({ id: 'fjob_2', idempotencyKey: 'b', stage: 'blocked' }));
expect((await repo.getJob('fjob_1', PID))?.idempotencyKey).toBe('a');
expect(await repo.getJob('missing', PID)).toBeNull();
const queued = await repo.listJobs({ productId: PID, stage: 'queued' });
expect(queued.map(j => j.id)).toEqual(['fjob_1']);
const byKey = await repo.findJobsByIdempotencyKey(PID, 'b');
expect(byKey).toHaveLength(1);
expect(byKey[0].id).toBe('fjob_2');
});
it('jobs: revUpdate is a compare-and-swap', async () => {
await repo.createJob(jobDoc({ id: 'fjob_cas', rev: 0, stage: 'queued' }));
const first = await repo.revUpdateJob('fjob_cas', PID, 0, { stage: 'assigned' });
expect(first.ok).toBe(true);
if (first.ok) expect(first.doc.rev).toBe(1);
// a second writer using the stale rev (0) must conflict
const stale = await repo.revUpdateJob('fjob_cas', PID, 0, { stage: 'building' });
expect(stale.ok).toBe(false);
if (!stale.ok) expect(stale.reason).toBe('conflict');
const missing = await repo.revUpdateJob('nope', PID, 0, { stage: 'failed' });
expect(missing.ok).toBe(false);
if (!missing.ok) expect(missing.reason).toBe('not_found');
});
it('runs: create + list ordered by attempt', async () => {
await repo.createRun({
id: 'r2',
productId: PID,
jobId: 'j',
attempt: 2,
engine: 'devin',
startedAt: now,
insights: {},
});
await repo.createRun({
id: 'r1',
productId: PID,
jobId: 'j',
attempt: 1,
engine: 'devin',
startedAt: now,
insights: {},
});
const runs = await repo.listRunsByJob('j');
expect(runs.map(r => r.attempt)).toEqual([1, 2]);
});
it('leases: create + revUpdate', async () => {
await repo.createLease({
id: 'j',
productId: PID,
jobId: 'j',
leaseEpoch: 1,
renewals: 0,
status: 'held',
rev: 0,
updatedAt: now,
});
const res = await repo.revUpdateLease('j', 0, { renewals: 1 });
expect(res.ok).toBe(true);
if (res.ok) expect(res.doc.renewals).toBe(1);
expect((await repo.getLease('j'))?.rev).toBe(1);
});
it('factories: upsert + list by productId', async () => {
await repo.upsertFactory({
id: 'fac_1',
productId: PID,
factoryId: 'fac_1',
descriptor: {},
capabilities: ['os:mac'],
health: 'ok',
load: 0,
seatLimit: 1,
lastHeartbeatAt: now,
});
await repo.upsertFactory({
id: 'fac_1',
productId: PID,
factoryId: 'fac_1',
descriptor: {},
capabilities: ['os:mac'],
health: 'degraded',
load: 2,
seatLimit: 1,
lastHeartbeatAt: now,
});
const list = await repo.listFactories(PID);
expect(list).toHaveLength(1);
expect(list[0].health).toBe('degraded');
});
it('profiles: create + get', async () => {
await repo.createProfile({
id: 'prof_1',
productId: PID,
name: 'backend',
version: 1,
snapshot: { persona: 'x' },
createdAt: now,
});
expect((await repo.getProfile('prof_1', PID))?.name).toBe('backend');
});
it('events: appendEvent yields an ordered, append-only stream', async () => {
await repo.appendEvent({ jobId: 'j', productId: PID, type: 'submitted' });
await repo.appendEvent({ jobId: 'j', productId: PID, type: 'assigned', actor: 'fac_1' });
await repo.appendEvent({
jobId: 'j',
productId: PID,
type: 'transition',
data: { stage: 'building' },
});
const events = await repo.listEvents('j');
expect(events.map(e => e.seq)).toEqual([0, 1, 2]);
expect(events.map(e => e.type)).toEqual(['submitted', 'assigned', 'transition']);
});
it('artifacts: create + list', async () => {
await repo.createArtifact({
id: 'art_1',
productId: PID,
jobId: 'j',
kind: 'coverage',
blobUrl: 'https://b/x',
createdAt: now,
});
const arts = await repo.listArtifacts('j');
expect(arts).toHaveLength(1);
expect(arts[0].blobUrl).toBe('https://b/x');
});
});

View File

@ -0,0 +1,257 @@
/**
* Fleet repositories one per fleet_* container, cloud-agnostic via @bytelyst/datastore.
*
* Partition keys (see lib/cosmos-init.ts):
* fleet_jobs /productId fleet_runs /jobId
* fleet_leases /jobId fleet_factories /productId
* fleet_profiles /productId fleet_events /jobId
* fleet_artifacts /jobId
*
* Optimistic concurrency: jobs and leases carry a monotonic `rev` token.
* `revUpdate*` is a compare-and-swap it writes only when the stored `rev`
* still equals the caller's expected `rev`, otherwise it reports a conflict
* WITHOUT writing. In production (Cosmos) this maps to an `_etag` / If-Match
* conditional replace; on the memory provider it is enforced by re-reading the
* current `rev` immediately before the write, which is exact for the sequential
* calls the coordinator + tests make (see coordinator.claimNextJob).
*/
import type { DocumentCollection } from '@bytelyst/datastore';
import { getCollection } from '../../lib/datastore.js';
import type {
FleetArtifactDoc,
FleetEventDoc,
FleetFactoryDoc,
FleetJobDoc,
FleetLeaseDoc,
FleetProfileDoc,
FleetRunDoc,
FleetStage,
} from './types.js';
// ── Collections ───────────────────────────────────────────────────────────────
function jobs(): DocumentCollection<FleetJobDoc> {
return getCollection<FleetJobDoc>('fleet_jobs', '/productId');
}
function runs(): DocumentCollection<FleetRunDoc> {
return getCollection<FleetRunDoc>('fleet_runs', '/jobId');
}
function leases(): DocumentCollection<FleetLeaseDoc> {
return getCollection<FleetLeaseDoc>('fleet_leases', '/jobId');
}
function factories(): DocumentCollection<FleetFactoryDoc> {
return getCollection<FleetFactoryDoc>('fleet_factories', '/productId');
}
function profiles(): DocumentCollection<FleetProfileDoc> {
return getCollection<FleetProfileDoc>('fleet_profiles', '/productId');
}
function events(): DocumentCollection<FleetEventDoc> {
return getCollection<FleetEventDoc>('fleet_events', '/jobId');
}
function artifacts(): DocumentCollection<FleetArtifactDoc> {
return getCollection<FleetArtifactDoc>('fleet_artifacts', '/jobId');
}
/** Result of a compare-and-swap update. */
export type RevResult<T> = { ok: true; doc: T } | { ok: false; reason: 'not_found' | 'conflict' };
// ── Jobs ──────────────────────────────────────────────────────────────────────
export async function createJob(doc: FleetJobDoc): Promise<FleetJobDoc> {
return jobs().create(doc);
}
export async function getJob(id: string, productId: string): Promise<FleetJobDoc | null> {
return jobs().findById(id, productId);
}
export interface ListJobsFilter {
productId: string;
stage?: FleetStage;
idempotencyKey?: string;
limit?: number;
offset?: number;
}
export async function listJobs(f: ListJobsFilter): Promise<FleetJobDoc[]> {
const filter: Record<string, string> = { productId: f.productId };
if (f.stage) filter.stage = f.stage;
if (f.idempotencyKey) filter.idempotencyKey = f.idempotencyKey;
return jobs().findMany({
filter,
sort: { priorityOrder: 1, createdAt: 1 },
offset: f.offset,
limit: f.limit,
});
}
/** All jobs for a product sharing an idempotency-key (dedupe lookups). */
export async function findJobsByIdempotencyKey(
productId: string,
idempotencyKey: string
): Promise<FleetJobDoc[]> {
return jobs().findMany({ filter: { productId, idempotencyKey } });
}
/** Unconditional merge update (use only when concurrency is not contended). */
export async function updateJob(
id: string,
productId: string,
updates: Partial<FleetJobDoc>
): Promise<FleetJobDoc | null> {
const cur = await jobs().findById(id, productId);
if (!cur) return null;
return jobs().update(id, productId, {
...updates,
rev: (cur.rev ?? 0) + 1,
updatedAt: new Date().toISOString(),
});
}
/** Compare-and-swap on `rev` — the atomic-claim / fenced-transition primitive. */
export async function revUpdateJob(
id: string,
productId: string,
expectedRev: number,
updates: Partial<FleetJobDoc>
): Promise<RevResult<FleetJobDoc>> {
const cur = await jobs().findById(id, productId);
if (!cur) return { ok: false, reason: 'not_found' };
if ((cur.rev ?? 0) !== expectedRev) return { ok: false, reason: 'conflict' };
const doc = await jobs().update(id, productId, {
...updates,
rev: expectedRev + 1,
updatedAt: new Date().toISOString(),
});
return { ok: true, doc };
}
export async function deleteJob(id: string, productId: string): Promise<void> {
await jobs().delete(id, productId);
}
// ── Runs ────────────────────────────────────────────────────────────────────
export async function createRun(doc: FleetRunDoc): Promise<FleetRunDoc> {
return runs().create(doc);
}
export async function updateRun(
id: string,
jobId: string,
updates: Partial<FleetRunDoc>
): Promise<FleetRunDoc | null> {
const cur = await runs().findById(id, jobId);
if (!cur) return null;
return runs().update(id, jobId, updates);
}
export async function listRunsByJob(jobId: string): Promise<FleetRunDoc[]> {
return runs().findMany({ filter: { jobId }, sort: { attempt: 1 } });
}
// ── Leases ──────────────────────────────────────────────────────────────────
export async function getLease(jobId: string): Promise<FleetLeaseDoc | null> {
return leases().findById(jobId, jobId);
}
export async function createLease(doc: FleetLeaseDoc): Promise<FleetLeaseDoc> {
return leases().create(doc);
}
export async function revUpdateLease(
jobId: string,
expectedRev: number,
updates: Partial<FleetLeaseDoc>
): Promise<RevResult<FleetLeaseDoc>> {
const cur = await leases().findById(jobId, jobId);
if (!cur) return { ok: false, reason: 'not_found' };
if ((cur.rev ?? 0) !== expectedRev) return { ok: false, reason: 'conflict' };
const doc = await leases().update(jobId, jobId, {
...updates,
rev: expectedRev + 1,
updatedAt: new Date().toISOString(),
});
return { ok: true, doc };
}
export async function listExpiredLeases(nowIso: string): Promise<FleetLeaseDoc[]> {
return leases().findMany({
filter: { status: 'held', expiresAt: { $lt: nowIso } },
});
}
// ── Factories ─────────────────────────────────────────────────────────────────
export async function getFactory(
factoryId: string,
productId: string
): Promise<FleetFactoryDoc | null> {
return factories().findById(factoryId, productId);
}
export async function upsertFactory(doc: FleetFactoryDoc): Promise<FleetFactoryDoc> {
return factories().upsert(doc);
}
export async function listFactories(productId: string): Promise<FleetFactoryDoc[]> {
return factories().findMany({ filter: { productId } });
}
// ── Profiles ────────────────────────────────────────────────────────────────
export async function createProfile(doc: FleetProfileDoc): Promise<FleetProfileDoc> {
return profiles().create(doc);
}
export async function getProfile(id: string, productId: string): Promise<FleetProfileDoc | null> {
return profiles().findById(id, productId);
}
export async function listProfiles(productId: string): Promise<FleetProfileDoc[]> {
return profiles().findMany({ filter: { productId } });
}
// ── Events (append-only) ────────────────────────────────────────────────────
export interface AppendEventInput {
jobId: string;
productId: string;
type: string;
actor?: string;
data?: Record<string, unknown>;
}
/** Append an ordered event to a job's stream; `seq` is monotonic per job. */
export async function appendEvent(input: AppendEventInput): Promise<FleetEventDoc> {
const existing = await events().findMany({ filter: { jobId: input.jobId } });
const seq = existing.length;
const doc: FleetEventDoc = {
id: `${input.jobId}:evt:${seq}`,
productId: input.productId,
jobId: input.jobId,
seq,
type: input.type,
at: new Date().toISOString(),
actor: input.actor,
data: input.data ?? {},
};
return events().create(doc);
}
export async function listEvents(jobId: string): Promise<FleetEventDoc[]> {
const docs = await events().findMany({ filter: { jobId }, sort: { seq: 1 } });
return docs;
}
// ── Artifacts ─────────────────────────────────────────────────────────────────
export async function createArtifact(doc: FleetArtifactDoc): Promise<FleetArtifactDoc> {
return artifacts().create(doc);
}
export async function listArtifacts(jobId: string): Promise<FleetArtifactDoc[]> {
return artifacts().findMany({ filter: { jobId }, sort: { createdAt: 1 } });
}

View File

@ -0,0 +1,144 @@
/**
* Fleet routes Fastify inject, real coordinator/repo on the memory provider.
* Auth + productId resolution are mocked (as in the items module routes test).
*/
import Fastify, { type FastifyInstance } from 'fastify';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { MemoryDatastoreProvider } from '@bytelyst/datastore';
import { _resetDatastoreProvider, setProvider } from '../../lib/datastore.js';
vi.mock('../../lib/auth.js', () => ({
extractAuth: vi.fn(async () => ({ sub: 'user_1', role: 'admin' })),
}));
vi.mock('../../lib/request-context.js', () => ({
getRequestProductId: () => 'lysnrai',
}));
async function buildApp(): Promise<FastifyInstance> {
const { fleetRoutes } = await import('./routes.js');
// Fastify's default error handler maps a thrown ServiceError (which carries a
// `.statusCode`) to the right HTTP status — same as the items routes test.
const app = Fastify({ logger: false });
await app.register(fleetRoutes, { prefix: '/api' });
return app;
}
async function submit(app: FastifyInstance, body: Record<string, unknown>) {
return app.inject({ method: 'POST', url: '/api/fleet/jobs', payload: body });
}
describe('fleetRoutes', () => {
beforeEach(() => setProvider(new MemoryDatastoreProvider()));
afterEach(() => {
_resetDatastoreProvider();
vi.clearAllMocks();
});
it('POST /fleet/jobs submits (201) and is idempotent (200 dedup)', async () => {
const app = await buildApp();
const r1 = await submit(app, { idempotencyKey: 'k1', bodyMd: '# task' });
expect(r1.statusCode).toBe(201);
expect(JSON.parse(r1.body).outcome).toBe('created');
const r2 = await submit(app, { idempotencyKey: 'k1', bodyMd: '# task' });
expect(r2.statusCode).toBe(200);
expect(JSON.parse(r2.body).outcome).toBe('deduplicated');
});
it('POST /fleet/jobs rejects an invalid body (400)', async () => {
const app = await buildApp();
const res = await submit(app, { idempotencyKey: 'k1' }); // missing bodyMd
expect(res.statusCode).toBe(400);
});
it('full lifecycle: submit -> list -> claim -> patch(fenced) -> renew -> release', async () => {
const app = await buildApp();
const sub = await submit(app, { idempotencyKey: 'k1', bodyMd: '# task', priority: 'high' });
const jobId = JSON.parse(sub.body).job.id as string;
const list = await app.inject({ method: 'GET', url: '/api/fleet/jobs?stage=queued' });
expect(list.statusCode).toBe(200);
expect(JSON.parse(list.body).jobs).toHaveLength(1);
const claim = await app.inject({
method: 'POST',
url: '/api/fleet/claim',
payload: { factoryId: 'fac_1', capabilities: [] },
});
expect(claim.statusCode).toBe(200);
const claimed = JSON.parse(claim.body);
expect(claimed.claimed).toBe(true);
expect(claimed.job.stage).toBe('assigned');
const epoch = claimed.job.leaseEpoch as number;
expect(epoch).toBe(1);
// fenced: stale epoch rejected (409)
const stale = await app.inject({
method: 'PATCH',
url: `/api/fleet/jobs/${jobId}`,
payload: { leaseEpoch: epoch - 1, stage: 'building' },
});
expect(stale.statusCode).toBe(409);
// current epoch succeeds
const patch = await app.inject({
method: 'PATCH',
url: `/api/fleet/jobs/${jobId}`,
payload: { leaseEpoch: epoch, stage: 'building' },
});
expect(patch.statusCode).toBe(200);
expect(JSON.parse(patch.body).stage).toBe('building');
const renew = await app.inject({
method: 'POST',
url: `/api/fleet/jobs/${jobId}/lease/renew`,
payload: { leaseEpoch: epoch, leaseSeconds: 600 },
});
expect(renew.statusCode).toBe(200);
expect(JSON.parse(renew.body).renewals).toBe(1);
const release = await app.inject({
method: 'POST',
url: `/api/fleet/jobs/${jobId}/lease/release`,
payload: { leaseEpoch: epoch, stage: 'review' },
});
expect(release.statusCode).toBe(200);
expect(JSON.parse(release.body).status).toBe('released');
const events = await app.inject({ method: 'GET', url: `/api/fleet/jobs/${jobId}/events` });
const types = JSON.parse(events.body).events.map((e: { type: string }) => e.type);
expect(types).toContain('submitted');
expect(types).toContain('assigned');
});
it('POST /fleet/claim returns claimed:false when nothing is eligible', async () => {
const app = await buildApp();
const res = await app.inject({
method: 'POST',
url: '/api/fleet/claim',
payload: { factoryId: 'fac_1' },
});
expect(res.statusCode).toBe(200);
expect(JSON.parse(res.body).claimed).toBe(false);
});
it('POST /fleet/factories/heartbeat upserts a factory', async () => {
const app = await buildApp();
const res = await app.inject({
method: 'POST',
url: '/api/fleet/factories/heartbeat',
payload: { factoryId: 'fac_1', capabilities: ['os:mac'], health: 'ok' },
});
expect(res.statusCode).toBe(200);
const body = JSON.parse(res.body);
expect(body.ok).toBe(true);
expect(body.factory.factoryId).toBe('fac_1');
});
it('GET /fleet/jobs/:id returns 404 when missing', async () => {
const app = await buildApp();
const res = await app.inject({ method: 'GET', url: '/api/fleet/jobs/nope' });
expect(res.statusCode).toBe(404);
});
});

View File

@ -0,0 +1,182 @@
/**
* Fleet REST endpoints agent gigafactory coordinator.
*
* POST /fleet/jobs submit a job (idempotent)
* GET /fleet/jobs list jobs (by stage / idempotencyKey)
* GET /fleet/jobs/:id get one job
* PATCH /fleet/jobs/:id fenced state transition (carries leaseEpoch)
* POST /fleet/claim atomic claim for a factory
* POST /fleet/jobs/:id/lease/renew renew a held lease
* POST /fleet/jobs/:id/lease/release release a held lease
* POST /fleet/factories/heartbeat factory liveness
* GET /fleet/jobs/:id/runs job run history
* GET /fleet/jobs/:id/events append-only event stream
*
* All routes require auth + a resolved productId, exactly like the items module.
*/
import type { FastifyInstance } from 'fastify';
import { getRequestProductId } from '../../lib/request-context.js';
import { BadRequestError, ConflictError, NotFoundError } from '../../lib/errors.js';
import { extractAuth } from '../../lib/auth.js';
import * as repo from './repository.js';
import * as coordinator from './coordinator.js';
import {
SubmitJobSchema,
ListJobsQuerySchema,
PatchJobSchema,
ClaimSchema,
RenewLeaseSchema,
ReleaseLeaseSchema,
HeartbeatSchema,
} from './types.js';
function badRequest(issues: { message: string }[]): never {
throw new BadRequestError(issues.map(i => i.message).join('; '));
}
export async function fleetRoutes(app: FastifyInstance) {
// ── Submit (idempotent) ──
app.post('/fleet/jobs', async (req, reply) => {
await extractAuth(req);
const parsed = SubmitJobSchema.safeParse(req.body);
if (!parsed.success) badRequest(parsed.error.issues);
const pid = parsed.data.productId || getRequestProductId(req);
const result = await coordinator.submitJob(pid, parsed.data);
reply.code(result.outcome === 'created' ? 201 : 200);
return { outcome: result.outcome, job: result.job };
});
// ── List ──
app.get('/fleet/jobs', async req => {
await extractAuth(req);
const parsed = ListJobsQuerySchema.safeParse(req.query);
if (!parsed.success) badRequest(parsed.error.issues);
const q = parsed.data;
const pid = q.productId || getRequestProductId(req);
const jobs = await repo.listJobs({
productId: pid,
stage: q.stage,
idempotencyKey: q.idempotencyKey,
limit: q.limit,
offset: q.offset,
});
return { jobs, limit: q.limit, offset: q.offset };
});
// ── Get one ──
app.get('/fleet/jobs/:id', async req => {
await extractAuth(req);
const { id } = req.params as { id: string };
const pid = getRequestProductId(req);
const job = await repo.getJob(id, pid);
if (!job) throw new NotFoundError('Job not found');
return job;
});
// ── Fenced state transition ──
app.patch('/fleet/jobs/:id', async req => {
await extractAuth(req);
const { id } = req.params as { id: string };
const pid = getRequestProductId(req);
const parsed = PatchJobSchema.safeParse(req.body);
if (!parsed.success) badRequest(parsed.error.issues);
const res = await coordinator.patchJobFenced(id, pid, parsed.data);
if (!res.ok) {
if (res.reason === 'not_found') throw new NotFoundError('Job not found');
if (res.reason === 'fenced') {
throw new ConflictError('stale leaseEpoch — transition fenced (job reassigned)');
}
throw new ConflictError('concurrent update conflict — retry');
}
return res.doc;
});
// ── Atomic claim ──
app.post('/fleet/claim', async req => {
await extractAuth(req);
const parsed = ClaimSchema.safeParse(req.body);
if (!parsed.success) badRequest(parsed.error.issues);
const pid = parsed.data.productId || getRequestProductId(req);
const claim = await coordinator.claimNextJob({
productId: pid,
factoryId: parsed.data.factoryId,
capabilities: parsed.data.capabilities,
leaseSeconds: parsed.data.leaseSeconds,
});
if (!claim) return { claimed: false };
return { claimed: true, ...claim };
});
// ── Lease renew ──
app.post('/fleet/jobs/:id/lease/renew', async req => {
await extractAuth(req);
const { id } = req.params as { id: string };
const pid = getRequestProductId(req);
const parsed = RenewLeaseSchema.safeParse(req.body);
if (!parsed.success) badRequest(parsed.error.issues);
const res = await coordinator.renewLease(
id,
pid,
parsed.data.leaseEpoch,
parsed.data.leaseSeconds
);
if (!res.ok) {
if (res.reason === 'not_found') throw new NotFoundError('Job or lease not found');
if (res.reason === 'fenced') throw new ConflictError('stale leaseEpoch — renew fenced');
throw new ConflictError('lease renew conflict — retry');
}
return res.doc;
});
// ── Lease release ──
app.post('/fleet/jobs/:id/lease/release', async req => {
await extractAuth(req);
const { id } = req.params as { id: string };
const pid = getRequestProductId(req);
const parsed = ReleaseLeaseSchema.safeParse(req.body);
if (!parsed.success) badRequest(parsed.error.issues);
const res = await coordinator.releaseLease(id, pid, parsed.data.leaseEpoch, parsed.data.stage);
if (!res.ok) {
if (res.reason === 'not_found') throw new NotFoundError('Job or lease not found');
if (res.reason === 'fenced') throw new ConflictError('stale leaseEpoch — release fenced');
throw new ConflictError('lease release conflict — retry');
}
return res.doc;
});
// ── Factory heartbeat ──
app.post('/fleet/factories/heartbeat', async req => {
await extractAuth(req);
const parsed = HeartbeatSchema.safeParse(req.body);
if (!parsed.success) badRequest(parsed.error.issues);
const pid = parsed.data.productId || getRequestProductId(req);
await coordinator.heartbeat({
productId: pid,
factoryId: parsed.data.factoryId,
descriptor: parsed.data.descriptor,
capabilities: parsed.data.capabilities,
health: parsed.data.health,
load: parsed.data.load,
seatLimit: parsed.data.seatLimit,
});
const factory = await repo.getFactory(parsed.data.factoryId, pid);
return { ok: true, factory };
});
// ── Runs ──
app.get('/fleet/jobs/:id/runs', async req => {
await extractAuth(req);
const { id } = req.params as { id: string };
const runs = await repo.listRunsByJob(id);
return { runs };
});
// ── Events ──
app.get('/fleet/jobs/:id/events', async req => {
await extractAuth(req);
const { id } = req.params as { id: string };
const events = await repo.listEvents(id);
return { events };
});
}

View File

@ -0,0 +1,219 @@
/**
* Fleet schema validation valid docs pass; missing productId / bad enum fail.
*/
import { describe, it, expect } from 'vitest';
import {
FleetJobDocSchema,
FleetRunDocSchema,
FleetLeaseDocSchema,
FleetFactoryDocSchema,
FleetProfileDocSchema,
FleetEventDocSchema,
FleetArtifactDocSchema,
SubmitJobSchema,
PatchJobSchema,
ClaimSchema,
FLEET_STAGES,
FLEET_PRIORITIES,
} from './types.js';
const now = '2026-05-30T00:00:00.000Z';
const validJob = {
id: 'fjob_1',
productId: 'lysnrai',
stage: 'queued',
idempotencyKey: 'task-1',
contentHash: 'abc',
bodyMd: '# task',
manifestSnapshot: {
priority: 'medium',
capabilities: [],
prefersEngine: [],
allowedScope: [],
deps: [],
},
priority: 'medium',
priorityOrder: 2,
capabilities: [],
deps: [],
kind: 'leaf',
attempts: 0,
leaseEpoch: 0,
rev: 0,
createdAt: now,
updatedAt: now,
};
describe('FleetJobDocSchema', () => {
it('accepts a valid job', () => {
expect(FleetJobDocSchema.safeParse(validJob).success).toBe(true);
});
it('rejects a missing productId', () => {
const { productId: _omit, ...bad } = validJob;
expect(FleetJobDocSchema.safeParse(bad).success).toBe(false);
});
it('rejects an empty productId', () => {
expect(FleetJobDocSchema.safeParse({ ...validJob, productId: '' }).success).toBe(false);
});
it('rejects an invalid stage enum', () => {
expect(FleetJobDocSchema.safeParse({ ...validJob, stage: 'done' }).success).toBe(false);
});
it('rejects an invalid priority enum', () => {
expect(FleetJobDocSchema.safeParse({ ...validJob, priority: 'urgent' }).success).toBe(false);
});
});
describe('FleetRunDocSchema', () => {
const validRun = {
id: 'fjob_1:run:1',
productId: 'lysnrai',
jobId: 'fjob_1',
attempt: 1,
engine: 'devin',
startedAt: now,
insights: { tokensIn: 10, tokensOut: 5 },
};
it('accepts a valid run', () => {
expect(FleetRunDocSchema.safeParse(validRun).success).toBe(true);
});
it('rejects missing productId', () => {
const { productId: _o, ...bad } = validRun;
expect(FleetRunDocSchema.safeParse(bad).success).toBe(false);
});
it('rejects attempt <= 0', () => {
expect(FleetRunDocSchema.safeParse({ ...validRun, attempt: 0 }).success).toBe(false);
});
});
describe('FleetLeaseDocSchema', () => {
const validLease = {
id: 'fjob_1',
productId: 'lysnrai',
jobId: 'fjob_1',
leaseEpoch: 1,
renewals: 0,
status: 'held',
rev: 0,
updatedAt: now,
};
it('accepts a valid lease', () => {
expect(FleetLeaseDocSchema.safeParse(validLease).success).toBe(true);
});
it('rejects an invalid status', () => {
expect(FleetLeaseDocSchema.safeParse({ ...validLease, status: 'open' }).success).toBe(false);
});
it('rejects missing productId', () => {
const { productId: _o, ...bad } = validLease;
expect(FleetLeaseDocSchema.safeParse(bad).success).toBe(false);
});
});
describe('FleetFactoryDocSchema', () => {
const validFactory = {
id: 'fac_1',
productId: 'lysnrai',
factoryId: 'fac_1',
descriptor: { os: 'mac' },
capabilities: ['os:mac'],
health: 'ok',
load: 0,
seatLimit: 2,
lastHeartbeatAt: now,
};
it('accepts a valid factory', () => {
expect(FleetFactoryDocSchema.safeParse(validFactory).success).toBe(true);
});
it('rejects invalid health enum', () => {
expect(FleetFactoryDocSchema.safeParse({ ...validFactory, health: 'sick' }).success).toBe(
false
);
});
});
describe('FleetProfileDocSchema / FleetEventDocSchema / FleetArtifactDocSchema', () => {
it('accepts a valid profile and rejects missing productId', () => {
const valid = {
id: 'prof_1',
productId: 'lysnrai',
name: 'backend',
version: 1,
snapshot: {},
createdAt: now,
};
expect(FleetProfileDocSchema.safeParse(valid).success).toBe(true);
const { productId: _o, ...bad } = valid;
expect(FleetProfileDocSchema.safeParse(bad).success).toBe(false);
});
it('accepts a valid event and rejects missing type', () => {
const valid = {
id: 'fjob_1:evt:0',
productId: 'lysnrai',
jobId: 'fjob_1',
seq: 0,
type: 'submitted',
at: now,
data: {},
};
expect(FleetEventDocSchema.safeParse(valid).success).toBe(true);
const { type: _t, ...bad } = valid;
expect(FleetEventDocSchema.safeParse(bad).success).toBe(false);
});
it('accepts a valid artifact and rejects missing blobUrl', () => {
const valid = {
id: 'art_1',
productId: 'lysnrai',
jobId: 'fjob_1',
kind: 'coverage',
blobUrl: 'https://b/x',
createdAt: now,
};
expect(FleetArtifactDocSchema.safeParse(valid).success).toBe(true);
const { blobUrl: _b, ...bad } = valid;
expect(FleetArtifactDocSchema.safeParse(bad).success).toBe(false);
});
});
describe('request schemas', () => {
it('SubmitJobSchema applies defaults', () => {
const parsed = SubmitJobSchema.safeParse({ idempotencyKey: 'k', bodyMd: '# do' });
expect(parsed.success).toBe(true);
if (parsed.success) {
expect(parsed.data.priority).toBe('medium');
expect(parsed.data.kind).toBe('leaf');
expect(parsed.data.deps).toEqual([]);
}
});
it('SubmitJobSchema rejects empty bodyMd / idempotencyKey', () => {
expect(SubmitJobSchema.safeParse({ idempotencyKey: '', bodyMd: 'x' }).success).toBe(false);
expect(SubmitJobSchema.safeParse({ idempotencyKey: 'k', bodyMd: '' }).success).toBe(false);
});
it('PatchJobSchema requires leaseEpoch', () => {
expect(PatchJobSchema.safeParse({ stage: 'building' }).success).toBe(false);
expect(PatchJobSchema.safeParse({ leaseEpoch: 1, stage: 'building' }).success).toBe(true);
});
it('ClaimSchema requires factoryId', () => {
expect(ClaimSchema.safeParse({ capabilities: [] }).success).toBe(false);
expect(ClaimSchema.safeParse({ factoryId: 'fac_1' }).success).toBe(true);
});
});
describe('enum constants', () => {
it('stages match the agent-queue lifecycle', () => {
expect(FLEET_STAGES).toEqual([
'queued',
'blocked',
'assigned',
'building',
'review',
'testing',
'shipped',
'failed',
'dead_letter',
]);
});
it('priorities', () => {
expect(FLEET_PRIORITIES).toEqual(['critical', 'high', 'medium', 'low']);
});
});

View File

@ -0,0 +1,317 @@
/**
* Fleet module types for the agent-gigafactory coordinator (Phase 2 foundation).
*
* Durable data model for distributed agent jobs: jobs, runs, leases, factories,
* profiles, events, and artifacts. Product-agnostic every document carries a
* `productId`. Zod schemas are the source of truth; doc/input types are inferred
* from them (no `any`).
*
* Field names + lifecycle mirror the agent-queue gigafactory spec
* (../learning_ai_devops_tools/agent-queue/docs/GIGAFACTORY_ROADMAP.md §4/§7/§8/§13).
*/
import { z } from 'zod';
// ── Enums ───────────────────────────────────────────────────────────────────
/** Canonical job lifecycle (§11). `blocked` = unmet deps; `dead_letter` = retries exhausted. */
export const FLEET_STAGES = [
'queued',
'blocked',
'assigned',
'building',
'review',
'testing',
'shipped',
'failed',
'dead_letter',
] as const;
export type FleetStage = (typeof FLEET_STAGES)[number];
/** Stages from which a worker may still progress / that count as "in flight". */
export const ACTIVE_STAGES: readonly FleetStage[] = ['assigned', 'building', 'review', 'testing'];
/** Stages that satisfy a hard dependency / a soft dependency. */
export const DEP_DONE_HARD: readonly FleetStage[] = ['shipped'];
export const DEP_DONE_SOFT: readonly FleetStage[] = ['shipped', 'testing'];
export const FLEET_PRIORITIES = ['critical', 'high', 'medium', 'low'] as const;
export type FleetPriority = (typeof FLEET_PRIORITIES)[number];
export const PRIORITY_ORDER: Record<FleetPriority, number> = {
critical: 0,
high: 1,
medium: 2,
low: 3,
};
export const FLEET_ENGINE_CLASSES = ['agentic-coder', 'chat-coder', 'review-only'] as const;
export type FleetEngineClass = (typeof FLEET_ENGINE_CLASSES)[number];
export const DEPS_MODES = ['hard', 'soft'] as const;
export type DepsMode = (typeof DEPS_MODES)[number];
/** Failure classes a `retry.on` may name (mirrors agent-queue). */
export const RETRY_CLASSES = ['timeout', 'verify_failed', 'crash', 'agent_error'] as const;
export type RetryClass = (typeof RETRY_CLASSES)[number];
/** Terminal run results recorded on a FleetRunDoc. */
export const RUN_RESULTS = [
'review',
'testing',
'shipped',
'failed',
'timeout',
'verify_failed',
'capability_mismatch',
'no_engine',
'retries_exhausted',
] as const;
export type RunResult = (typeof RUN_RESULTS)[number];
export const FACTORY_HEALTH = ['ok', 'degraded', 'down'] as const;
export type FactoryHealth = (typeof FACTORY_HEALTH)[number];
export const LEASE_STATUS = ['held', 'expired', 'released'] as const;
export type LeaseStatus = (typeof LEASE_STATUS)[number];
export const JOB_KINDS = ['leaf', 'composite'] as const;
export type JobKind = (typeof JOB_KINDS)[number];
// ── Shared value objects ─────────────────────────────────────────────────────
export const CheckpointSchema = z.object({
wipBranch: z.string(),
wipBase: z.string().optional(),
wipCommit: z.string().optional(),
});
export const BudgetSchema = z.object({
usd: z.number().nonnegative().optional(),
tokens: z.number().nonnegative().optional(),
wall: z.number().nonnegative().optional(),
});
export const RetryPolicySchema = z.object({
max: z.number().int().nonnegative(),
backoff: z.string().optional(),
on: z.array(z.enum(RETRY_CLASSES)).default([]),
});
export const ManifestSnapshotSchema = z.object({
priority: z.enum(FLEET_PRIORITIES).default('medium'),
capabilities: z.array(z.string()).default([]),
engineClass: z.enum(FLEET_ENGINE_CLASSES).optional(),
profile: z.string().optional(),
prefersEngine: z.array(z.string()).default([]),
allowedScope: z.array(z.string()).default([]),
deps: z.array(z.string()).default([]),
depsMode: z.enum(DEPS_MODES).optional(),
budget: BudgetSchema.optional(),
retry: RetryPolicySchema.optional(),
});
export const InsightsSchema = z.object({
model: z.string().optional(),
tokensIn: z.number().optional(),
tokensOut: z.number().optional(),
tokensCached: z.number().optional(),
costUsd: z.number().optional(),
estimated: z.boolean().optional(),
turns: z.number().optional(),
toolCalls: z.number().optional(),
filesChanged: z.number().optional(),
linesAdded: z.number().optional(),
linesDeleted: z.number().optional(),
});
// ── Container documents ───────────────────────────────────────────────────────
/**
* FleetJobDoc the durable job (pk `/productId`).
* `rev` is the optimistic-concurrency token used by the coordinator's atomic
* claim + fenced transitions (maps to Cosmos `_etag` / If-Match in production;
* see repository.revUpdate).
*/
export const FleetJobDocSchema = z.object({
id: z.string(),
productId: z.string().min(1),
stage: z.enum(FLEET_STAGES),
idempotencyKey: z.string().min(1),
contentHash: z.string(),
bodyMd: z.string(),
manifestSnapshot: ManifestSnapshotSchema,
priority: z.enum(FLEET_PRIORITIES),
priorityOrder: z.number().int(),
capabilities: z.array(z.string()).default([]),
engineClass: z.enum(FLEET_ENGINE_CLASSES).optional(),
profile: z.string().optional(),
deps: z.array(z.string()).default([]),
depsMode: z.enum(DEPS_MODES).optional(),
budget: BudgetSchema.optional(),
retry: RetryPolicySchema.optional(),
kind: z.enum(JOB_KINDS).default('leaf'),
parentId: z.string().optional(),
trackerItemId: z.string().optional(),
checkpoint: CheckpointSchema.optional(),
attempts: z.number().int().nonnegative().default(0),
leaseEpoch: z.number().int().nonnegative().default(0),
rev: z.number().int().nonnegative().default(0),
blockedReason: z.string().optional(),
createdAt: z.string(),
updatedAt: z.string(),
});
export type FleetJobDoc = z.infer<typeof FleetJobDocSchema>;
/** FleetRunDoc — one execution attempt of a job (pk `/jobId`). */
export const FleetRunDocSchema = z.object({
id: z.string(),
productId: z.string().min(1),
jobId: z.string().min(1),
attempt: z.number().int().positive(),
factoryId: z.string().optional(),
engine: z.string(),
profileSnapshot: z.record(z.string(), z.unknown()).optional(),
startedAt: z.string(),
endedAt: z.string().optional(),
exit: z.number().int().optional(),
verifyResult: z.enum(['pass', 'fail']).optional(),
result: z.enum(RUN_RESULTS).optional(),
insights: InsightsSchema.default({}),
});
export type FleetRunDoc = z.infer<typeof FleetRunDocSchema>;
/** FleetLeaseDoc — the single-holder lease for a job (pk `/jobId`). */
export const FleetLeaseDocSchema = z.object({
id: z.string(),
productId: z.string().min(1),
jobId: z.string().min(1),
holderFactoryId: z.string().optional(),
expiresAt: z.string().optional(),
leaseEpoch: z.number().int().nonnegative().default(0),
renewals: z.number().int().nonnegative().default(0),
status: z.enum(LEASE_STATUS).default('held'),
rev: z.number().int().nonnegative().default(0),
updatedAt: z.string(),
});
export type FleetLeaseDoc = z.infer<typeof FleetLeaseDocSchema>;
/** FleetFactoryDoc — a registered worker host (pk `/productId`). */
export const FleetFactoryDocSchema = z.object({
id: z.string(),
productId: z.string().min(1),
factoryId: z.string().min(1),
descriptor: z.record(z.string(), z.unknown()).default({}),
capabilities: z.array(z.string()).default([]),
health: z.enum(FACTORY_HEALTH).default('ok'),
load: z.number().nonnegative().default(0),
seatLimit: z.number().int().positive().default(1),
lastHeartbeatAt: z.string(),
});
export type FleetFactoryDoc = z.infer<typeof FleetFactoryDocSchema>;
/** FleetProfileDoc — an immutable, versioned profile snapshot (pk `/productId`). */
export const FleetProfileDocSchema = z.object({
id: z.string(),
productId: z.string().min(1),
name: z.string().min(1),
version: z.number().int().positive(),
snapshot: z.record(z.string(), z.unknown()),
createdAt: z.string(),
});
export type FleetProfileDoc = z.infer<typeof FleetProfileDocSchema>;
/** FleetEventDoc — append-only audit/event stream entry (pk `/jobId`). */
export const FleetEventDocSchema = z.object({
id: z.string(),
productId: z.string().min(1),
jobId: z.string().min(1),
seq: z.number().int().nonnegative(),
type: z.string().min(1),
at: z.string(),
actor: z.string().optional(),
data: z.record(z.string(), z.unknown()).default({}),
});
export type FleetEventDoc = z.infer<typeof FleetEventDocSchema>;
/** FleetArtifactDoc — pointer to a blob-stored artifact (pk `/jobId`). No inline logs. */
export const FleetArtifactDocSchema = z.object({
id: z.string(),
productId: z.string().min(1),
jobId: z.string().min(1),
runId: z.string().optional(),
kind: z.string().min(1),
blobUrl: z.string().min(1),
sizeBytes: z.number().int().nonnegative().optional(),
createdAt: z.string(),
});
export type FleetArtifactDoc = z.infer<typeof FleetArtifactDocSchema>;
// ── API request schemas (routes) ──────────────────────────────────────────────
export const SubmitJobSchema = z.object({
productId: z.string().min(1).optional(),
idempotencyKey: z.string().min(1),
bodyMd: z.string().min(1),
priority: z.enum(FLEET_PRIORITIES).default('medium'),
capabilities: z.array(z.string()).default([]),
engineClass: z.enum(FLEET_ENGINE_CLASSES).optional(),
profile: z.string().optional(),
prefersEngine: z.array(z.string()).default([]),
allowedScope: z.array(z.string()).default([]),
deps: z.array(z.string()).default([]),
depsMode: z.enum(DEPS_MODES).optional(),
budget: BudgetSchema.optional(),
retry: RetryPolicySchema.optional(),
kind: z.enum(JOB_KINDS).default('leaf'),
parentId: z.string().optional(),
trackerItemId: z.string().optional(),
});
export type SubmitJobInput = z.infer<typeof SubmitJobSchema>;
export const ListJobsQuerySchema = z.object({
productId: z.string().optional(),
stage: z.enum(FLEET_STAGES).optional(),
idempotencyKey: z.string().optional(),
limit: z.coerce.number().int().min(1).max(200).default(50),
offset: z.coerce.number().int().min(0).default(0),
});
export type ListJobsQuery = z.infer<typeof ListJobsQuerySchema>;
/** Fenced state transition — worker MUST carry its leaseEpoch. */
export const PatchJobSchema = z.object({
leaseEpoch: z.number().int().nonnegative(),
stage: z.enum(FLEET_STAGES).optional(),
checkpoint: CheckpointSchema.optional(),
blockedReason: z.string().optional(),
});
export type PatchJobInput = z.infer<typeof PatchJobSchema>;
export const ClaimSchema = z.object({
productId: z.string().min(1).optional(),
factoryId: z.string().min(1),
capabilities: z.array(z.string()).default([]),
leaseSeconds: z.number().int().positive().max(86400).default(900),
});
export type ClaimInput = z.infer<typeof ClaimSchema>;
export const RenewLeaseSchema = z.object({
leaseEpoch: z.number().int().nonnegative(),
leaseSeconds: z.number().int().positive().max(86400).default(900),
});
export type RenewLeaseInput = z.infer<typeof RenewLeaseSchema>;
export const ReleaseLeaseSchema = z.object({
leaseEpoch: z.number().int().nonnegative(),
stage: z.enum(FLEET_STAGES).optional(),
});
export type ReleaseLeaseInput = z.infer<typeof ReleaseLeaseSchema>;
export const HeartbeatSchema = z.object({
productId: z.string().min(1).optional(),
factoryId: z.string().min(1),
descriptor: z.record(z.string(), z.unknown()).optional(),
capabilities: z.array(z.string()).optional(),
health: z.enum(FACTORY_HEALTH).optional(),
load: z.number().nonnegative().optional(),
seatLimit: z.number().int().positive().optional(),
});
export type HeartbeatInput = z.infer<typeof HeartbeatSchema>;

View File

@ -63,6 +63,7 @@ import { licenseRoutes } from './modules/licenses/routes.js';
import { stripeRoutes } from './modules/stripe/routes.js';
import { settingsRoutes } from './modules/settings/routes.js';
import { itemRoutes } from './modules/items/routes.js';
import { fleetRoutes } from './modules/fleet/routes.js';
import { commentRoutes } from './modules/comments/routes.js';
import { voteRoutes } from './modules/votes/routes.js';
import { publicRoutes } from './modules/public/routes.js';
@ -207,6 +208,8 @@ await app.register(stripeRoutes, { prefix: '/api' });
await app.register(settingsRoutes, { prefix: '/api' });
// Tracker modules (merged from tracker-service)
await app.register(itemRoutes, { prefix: '/api' });
// Agent Gigafactory — fleet coordinator (jobs/claim/lease/fence/reaper)
await app.register(fleetRoutes, { prefix: '/api' });
await app.register(commentRoutes, { prefix: '/api' });
await app.register(voteRoutes, { prefix: '/api' });
// API tokens module