diff --git a/services/platform-service/src/lib/cosmos-init.ts b/services/platform-service/src/lib/cosmos-init.ts index 84d2a98a..a0e93928 100644 --- a/services/platform-service/src/lib/cosmos-init.ts +++ b/services/platform-service/src/lib/cosmos-init.ts @@ -187,6 +187,14 @@ const CONTAINER_DEFS: Record = { // i18n (P3) translations: { partitionKeyPath: '/locale' }, i18n_locales: { partitionKeyPath: '/locale' }, + // Agent Gigafactory — fleet coordinator (see modules/fleet/README.md) + fleet_jobs: { partitionKeyPath: '/productId' }, + fleet_runs: { partitionKeyPath: '/jobId' }, + fleet_leases: { partitionKeyPath: '/jobId' }, + fleet_factories: { partitionKeyPath: '/productId' }, + fleet_profiles: { partitionKeyPath: '/productId' }, + fleet_events: { partitionKeyPath: '/jobId' }, + fleet_artifacts: { partitionKeyPath: '/jobId' }, }; export async function initCosmosIfNeeded(): Promise { diff --git a/services/platform-service/src/modules/fleet/README.md b/services/platform-service/src/modules/fleet/README.md new file mode 100644 index 00000000..a9c21af0 --- /dev/null +++ b/services/platform-service/src/modules/fleet/README.md @@ -0,0 +1,79 @@ +# Fleet module — agent gigafactory coordinator (Phase 2 foundation) + +Product-agnostic, cloud-agnostic coordinator for distributed agent jobs. This is +the durable backend that supersedes the single-host stand-ins built in the +`agent-queue` (devops-tools) repo. Everything runs on the `@bytelyst/datastore` +abstraction, so all tests execute on `DB_PROVIDER=memory` (no Cosmos/network). + +Spec: `../learning_ai_devops_tools/agent-queue/docs/GIGAFACTORY_ROADMAP.md` +(§4 core contract, §7 scheduler/claim, §8 factory/lease/heartbeat, §13 containers, +§18 failure model, §25 durability/recovery, §26 insights). + +## Containers (partition keys) + +| Container | PK | Purpose | +| ----------------- | ------------ | --------------------------------------------------------------------------------------------------------------------------------------- | +| `fleet_jobs` | `/productId` | durable job: `manifestSnapshot`, verbatim `bodyMd`, `stage`, `idempotencyKey`, `deps`, `checkpoint`, `priority`, `rev`, `leaseEpoch`, … | +| `fleet_runs` | `/jobId` | one execution attempt: engine, timings, `result`, `insights` (tokens/cost/diff) | +| `fleet_leases` | `/jobId` | single-holder lease: `holderFactoryId`, `expiresAt`, `leaseEpoch`, `status` | +| `fleet_factories` | `/productId` | registered worker host: `capabilities`, `health`, `load`, `seatLimit`, `lastHeartbeatAt` | +| `fleet_profiles` | `/productId` | immutable, versioned profile snapshot | +| `fleet_events` | `/jobId` | append-only audit/event stream (monotonic `seq`) | +| `fleet_artifacts` | `/jobId` | pointers to blob-stored artifacts (no inline logs) | + +Every document carries `productId`. Containers are registered in +`lib/cosmos-init.ts`. + +## Concurrency protocol + +**Optimistic concurrency (`rev`).** Jobs and leases carry a monotonic `rev` token. +`repository.revUpdate*` is a compare-and-swap: it writes only if the stored `rev` +still equals the caller's expected `rev`, else it reports `conflict` and writes +nothing. In production (Cosmos) this maps to `_etag` / `If-Match`; on the memory +provider it is enforced by re-reading `rev` immediately before the write, which is +exact for the sequential calls the coordinator and tests make. + +**Atomic claim (`claimNextJob`).** Select the highest-priority, oldest job that is +`queued` (or `blocked` with now-satisfied deps) and whose `capabilities` are a +subset of the factory's, then `tryClaimJob` does a `rev` CAS to flip it to +`assigned` and acquire/create its lease. Under contention exactly one factory wins +the CAS; losers get `conflict` and re-select. No double-assignment, ever. + +**Leases + fencing.** Acquiring/reclaiming a lease increments `leaseEpoch`. Every +worker mutation (`patchJobFenced`, `renewLease`, `releaseLease`) carries its +`leaseEpoch`; a call whose epoch is `< job.leaseEpoch` is rejected (`fenced`) — a +stale/zombie worker can never overwrite a reassigned job. + +**Heartbeat.** `heartbeat(factory)` upserts `lastHeartbeatAt` + health/load; +`isFactoryStale` detects a missed-heartbeat factory. + +**Reaper.** `reapExpiredLeases(now)` scans `held` leases with `expiresAt < now`, +bumps `leaseEpoch` (fencing the dead holder), returns the job to `queued` (or +`blocked` if deps are now unmet) **preserving the `checkpoint` pointer** (resume +from WIP), and marks the lease `expired`. Idempotent — a reaped lease is no longer +`held`, so a second pass reaps nothing. Cosmos TTL cannot do this (it only deletes +the lease doc; it cannot requeue the job, bump the epoch, or keep the checkpoint), +so the reaper — not TTL — owns recovery. + +## Submit semantics (idempotency + deps) + +- same `idempotencyKey` + identical `bodyMd` → returns the existing job (dedup). +- same key + different content while still `queued`/`blocked` → supersede in place. +- same key + different content once past `queued` → `409 Conflict`. +- a job with unmet `deps` is `blocked` (a dep is met at `shipped`, or `testing` + when `depsMode: soft`); submit-time cycle detection rejects cyclic graphs. + +## REST (under `/api`, auth + productId) + +`POST /fleet/jobs` · `GET /fleet/jobs` · `GET /fleet/jobs/:id` · +`PATCH /fleet/jobs/:id` (fenced) · `POST /fleet/claim` · +`POST /fleet/jobs/:id/lease/renew` · `POST /fleet/jobs/:id/lease/release` · +`POST /fleet/factories/heartbeat` · `GET /fleet/jobs/:id/runs` · +`GET /fleet/jobs/:id/events`. + +## Files + +`types.ts` (Zod schemas → inferred types) · `repository.ts` (per-container repos + +`revUpdate` CAS) · `coordinator.ts` (claim/lease/fence/heartbeat/reaper + submit) · +`routes.ts` (REST) · `*.test.ts` (schema, repo, coordinator incl. the atomic-claim +race / fencing / reaper, and route inject tests). diff --git a/services/platform-service/src/modules/fleet/coordinator.test.ts b/services/platform-service/src/modules/fleet/coordinator.test.ts new file mode 100644 index 00000000..d24f0751 --- /dev/null +++ b/services/platform-service/src/modules/fleet/coordinator.test.ts @@ -0,0 +1,240 @@ +/** + * Fleet coordinator — the concurrency core. Memory provider; deterministic + * (race driven via the rev compare-and-swap, not real threads). + */ + +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { MemoryDatastoreProvider } from '@bytelyst/datastore'; +import { ConflictError, BadRequestError } from '@bytelyst/errors'; +import { _resetDatastoreProvider, setProvider } from '../../lib/datastore.js'; +import * as repo from './repository.js'; +import * as coord from './coordinator.js'; +import type { SubmitJobInput } from './types.js'; + +const PID = 'lysnrai'; + +function input(over: Partial = {}): SubmitJobInput { + return { + idempotencyKey: 'task-1', + bodyMd: '# do the thing', + priority: 'medium', + capabilities: [], + prefersEngine: [], + allowedScope: [], + deps: [], + kind: 'leaf', + ...over, + }; +} + +const factory = (over = {}) => ({ + productId: PID, + factoryId: 'fac_1', + capabilities: [] as string[], + leaseSeconds: 900, + ...over, +}); + +describe('fleet coordinator', () => { + beforeEach(() => setProvider(new MemoryDatastoreProvider())); + afterEach(() => _resetDatastoreProvider()); + + it('submit: new job is queued with no deps', async () => { + const { job, outcome } = await coord.submitJob(PID, input()); + expect(outcome).toBe('created'); + expect(job.stage).toBe('queued'); + expect(job.productId).toBe(PID); + }); + + // ── ATOMIC CLAIM RACE ── + it('atomic claim: two contenders on the same job version — exactly one wins', async () => { + const { job } = await coord.submitJob(PID, input()); + + // Both contenders see the SAME job version (rev). The CAS picks one winner. + const a = await coord.tryClaimJob(job, factory({ factoryId: 'fac_A' })); + const b = await coord.tryClaimJob(job, factory({ factoryId: 'fac_B' })); + + const oks = [a, b].filter(r => r.ok); + const conflicts = [a, b].filter(r => !r.ok); + expect(oks).toHaveLength(1); + expect(conflicts).toHaveLength(1); + expect(conflicts[0].ok === false && conflicts[0].reason).toBe('conflict'); + + // no double-assignment: the job is assigned exactly once, single run, single holder + const stored = await repo.getJob(job.id, PID); + expect(stored?.stage).toBe('assigned'); + expect(stored?.attempts).toBe(1); + expect(await repo.listRunsByJob(job.id)).toHaveLength(1); + const lease = await repo.getLease(job.id); + expect(lease?.leaseEpoch).toBe(1); + }); + + // ── PRIORITY + AGE SELECTION ── + it('claimNextJob returns highest-priority then oldest', async () => { + await coord.submitJob(PID, input({ idempotencyKey: 'low-old', priority: 'low' })); + await coord.submitJob(PID, input({ idempotencyKey: 'crit-new', priority: 'critical' })); + await coord.submitJob(PID, input({ idempotencyKey: 'med', priority: 'medium' })); + + const claim = await coord.claimNextJob(factory()); + expect(claim?.job.idempotencyKey).toBe('crit-new'); + }); + + it('claimNextJob respects capability subset', async () => { + await coord.submitJob(PID, input({ idempotencyKey: 'needs-mac', capabilities: ['os:mac'] })); + const noCaps = await coord.claimNextJob(factory({ capabilities: [] })); + expect(noCaps).toBeNull(); + const withCaps = await coord.claimNextJob(factory({ capabilities: ['os:mac', 'has:git'] })); + expect(withCaps?.job.idempotencyKey).toBe('needs-mac'); + }); + + // ── DEPS GATING ── + it('deps: a job with unmet deps is blocked and not claimable until the dep ships', async () => { + await coord.submitJob(PID, input({ idempotencyKey: 'A' })); + const { job: b } = await coord.submitJob(PID, input({ idempotencyKey: 'B', deps: ['A'] })); + expect(b.stage).toBe('blocked'); + + // first claim returns A (B is blocked) + const first = await coord.claimNextJob(factory()); + expect(first?.job.idempotencyKey).toBe('A'); + // B still not claimable + expect(await coord.claimNextJob(factory())).toBeNull(); + + // ship A → B becomes claimable + const a = await repo.getJob(first!.job.id, PID); + await repo.updateJob(a!.id, PID, { stage: 'shipped' }); + const second = await coord.claimNextJob(factory()); + expect(second?.job.idempotencyKey).toBe('B'); + }); + + it('deps-mode soft: dep satisfied when dependency is in testing', async () => { + await coord.submitJob(PID, input({ idempotencyKey: 'A' })); + const { job: c } = await coord.submitJob( + PID, + input({ idempotencyKey: 'C', deps: ['A'], depsMode: 'soft' }) + ); + expect(c.stage).toBe('blocked'); + const a = (await repo.findJobsByIdempotencyKey(PID, 'A'))[0]; + await repo.updateJob(a.id, PID, { stage: 'testing' }); + const unmet = await coord.unmetDeps((await repo.getJob(c.id, PID))!); + expect(unmet).toEqual([]); + }); + + it('cycle detection: a cyclic submit is rejected', async () => { + await coord.submitJob(PID, input({ idempotencyKey: 'B', deps: ['A'] })); + await expect( + coord.submitJob(PID, input({ idempotencyKey: 'A', deps: ['B'] })) + ).rejects.toBeInstanceOf(BadRequestError); + }); + + // ── FENCING ── + it('fencing: a stale leaseEpoch is rejected; the current epoch succeeds', async () => { + const { job } = await coord.submitJob(PID, input()); + const claim = await coord.claimNextJob(factory()); + expect(claim).not.toBeNull(); + const epoch = claim!.job.leaseEpoch; // 1 + + const stale = await coord.patchJobFenced(job.id, PID, { + leaseEpoch: epoch - 1, + stage: 'building', + }); + expect(stale.ok).toBe(false); + if (!stale.ok) expect(stale.reason).toBe('fenced'); + + const current = await coord.patchJobFenced(job.id, PID, { + leaseEpoch: epoch, + stage: 'building', + }); + expect(current.ok).toBe(true); + if (current.ok) expect(current.doc.stage).toBe('building'); + }); + + // ── REAPER ── + it('reaper: expired lease returns the job to queued, bumps epoch, preserves checkpoint; idempotent', async () => { + const { job } = await coord.submitJob(PID, input()); + const claim = await coord.claimNextJob(factory()); + const epoch0 = claim!.job.leaseEpoch; + + // worker checkpoints WIP, then dies; force the lease to be expired + await coord.patchJobFenced(job.id, PID, { + leaseEpoch: epoch0, + stage: 'building', + checkpoint: { wipBranch: 'aq/wip/x', wipBase: 'base1', wipCommit: 'c1' }, + }); + const lease = await repo.getLease(job.id); + await repo.revUpdateLease(job.id, lease!.rev, { expiresAt: '2000-01-01T00:00:00.000Z' }); + + const res = await coord.reapExpiredLeases(new Date().toISOString()); + expect(res.reaped).toBe(1); + + const reclaimed = await repo.getJob(job.id, PID); + expect(reclaimed?.stage).toBe('queued'); + expect(reclaimed?.leaseEpoch).toBe(epoch0 + 1); // fenced + expect(reclaimed?.checkpoint?.wipBranch).toBe('aq/wip/x'); // preserved + expect((await repo.getLease(job.id))?.status).toBe('expired'); + + // idempotent — running again reaps nothing + const again = await coord.reapExpiredLeases(new Date().toISOString()); + expect(again.reaped).toBe(0); + }); + + it('reaper: the zombie (old epoch) is fenced out after reclaim', async () => { + const { job } = await coord.submitJob(PID, input()); + const claim = await coord.claimNextJob(factory()); + const zombieEpoch = claim!.job.leaseEpoch; + await coord.patchJobFenced(job.id, PID, { leaseEpoch: zombieEpoch, stage: 'building' }); + const lease = await repo.getLease(job.id); + await repo.revUpdateLease(job.id, lease!.rev, { expiresAt: '2000-01-01T00:00:00.000Z' }); + await coord.reapExpiredLeases(new Date().toISOString()); + + // zombie tries to write with its now-stale epoch -> fenced + const zombie = await coord.patchJobFenced(job.id, PID, { + leaseEpoch: zombieEpoch, + stage: 'shipped', + }); + expect(zombie.ok).toBe(false); + if (!zombie.ok) expect(zombie.reason).toBe('fenced'); + }); + + // ── HEARTBEAT ── + it('heartbeat updates lastHeartbeatAt/health; staleness is detectable', async () => { + await coord.heartbeat({ + productId: PID, + factoryId: 'fac_1', + capabilities: ['os:mac'], + health: 'ok', + load: 1, + }); + const fac = await repo.getFactory('fac_1', PID); + expect(fac?.health).toBe('ok'); + expect(fac?.capabilities).toEqual(['os:mac']); + expect(coord.isFactoryStale(fac!, Date.now(), 60_000)).toBe(false); + const oldNow = new Date(fac!.lastHeartbeatAt).getTime() + 120_000; + expect(coord.isFactoryStale(fac!, oldNow, 60_000)).toBe(true); + }); + + // ── IDEMPOTENCY ── + it('idempotent submit: same key+content => 1 job', async () => { + const a = await coord.submitJob(PID, input({ idempotencyKey: 'k', bodyMd: 'same' })); + const b = await coord.submitJob(PID, input({ idempotencyKey: 'k', bodyMd: 'same' })); + expect(b.outcome).toBe('deduplicated'); + expect(b.job.id).toBe(a.job.id); + expect(await repo.findJobsByIdempotencyKey(PID, 'k')).toHaveLength(1); + }); + + it('idempotent submit: same key+changed content while queued => superseded', async () => { + const a = await coord.submitJob(PID, input({ idempotencyKey: 'k', bodyMd: 'v1' })); + const b = await coord.submitJob(PID, input({ idempotencyKey: 'k', bodyMd: 'v2' })); + expect(b.outcome).toBe('superseded'); + expect(b.job.id).toBe(a.job.id); + expect(b.job.bodyMd).toBe('v2'); + expect(await repo.findJobsByIdempotencyKey(PID, 'k')).toHaveLength(1); + }); + + it('idempotent submit: same key+changed content past queued => 409', async () => { + await coord.submitJob(PID, input({ idempotencyKey: 'k', bodyMd: 'v1' })); + await coord.claimNextJob(factory()); // job now assigned (past queued) + await expect( + coord.submitJob(PID, input({ idempotencyKey: 'k', bodyMd: 'v2' })) + ).rejects.toBeInstanceOf(ConflictError); + }); +}); diff --git a/services/platform-service/src/modules/fleet/coordinator.ts b/services/platform-service/src/modules/fleet/coordinator.ts new file mode 100644 index 00000000..1d259c54 --- /dev/null +++ b/services/platform-service/src/modules/fleet/coordinator.ts @@ -0,0 +1,503 @@ +/** + * Fleet coordinator — the concurrency core (Phase 2 §4/§7/§8/§18/§25). + * + * Responsibilities: + * - submitJob idempotent submit + submit-time dependency cycle detection + * - claimNextJob atomic, single-winner claim (priority+age, deps + capability + * gated) via a rev compare-and-swap, then lease acquisition + * - patchJobFenced fenced state transition (rejects a stale leaseEpoch) + * - renewLease / releaseLease + * - heartbeat factory liveness + * - reapExpiredLeases reclaim dead-worker jobs (bumps leaseEpoch to fence the + * zombie, returns the job to queued/blocked, preserves checkpoint) + * + * Concurrency model: `rev` is an optimistic-concurrency token on jobs + leases. + * Every contended mutation goes through repository.revUpdate*, which writes only + * if the stored `rev` still matches — so under contention EXACTLY ONE caller wins + * and losers get a `conflict` and retry. Maps to Cosmos `_etag` / If-Match in prod. + */ + +import { createHash } from 'node:crypto'; +import { BadRequestError, ConflictError } from '../../lib/errors.js'; +import * as repo from './repository.js'; +import { + ACTIVE_STAGES, + DEP_DONE_HARD, + DEP_DONE_SOFT, + PRIORITY_ORDER, + type FleetJobDoc, + type FleetLeaseDoc, + type FleetRunDoc, + type FleetStage, + type SubmitJobInput, +} from './types.js'; + +const CLAIM_MAX_RETRIES = 8; + +export function contentHash(bodyMd: string): string { + return createHash('sha256').update(bodyMd).digest('hex'); +} + +/** Every required capability token must be advertised by the factory. */ +export function capabilitiesSubset(required: string[], available: string[]): boolean { + const set = new Set(available); + return required.every(token => set.has(token)); +} + +// ── Dependency evaluation (§5) ──────────────────────────────────────────────── + +/** Unmet dependency keys for a job given the current store state. */ +export async function unmetDeps(job: FleetJobDoc): Promise { + if (!job.deps || job.deps.length === 0) return []; + const done = job.depsMode === 'soft' ? DEP_DONE_SOFT : DEP_DONE_HARD; + const unmet: string[] = []; + for (const depKey of job.deps) { + const matches = await repo.findJobsByIdempotencyKey(job.productId, depKey); + const satisfied = matches.some(m => done.includes(m.stage)); + if (!satisfied) unmet.push(depKey); + } + return unmet; +} + +async function stageForDeps(job: FleetJobDoc): Promise<{ stage: FleetStage; unmet: string[] }> { + const unmet = await unmetDeps(job); + return { stage: unmet.length > 0 ? 'blocked' : 'queued', unmet }; +} + +/** + * Submit-time cycle detection: would a job with `newKey` depending on `newDeps` + * create a cycle in the idempotency-key dependency graph (existing jobs + new)? + */ +export async function wouldCreateCycle( + productId: string, + newKey: string, + newDeps: string[] +): Promise { + if (newDeps.includes(newKey)) return true; // self-dependency + const visited = new Set(); + let frontier = [...newDeps]; + while (frontier.length > 0) { + const next: string[] = []; + for (const key of frontier) { + if (key === newKey) return true; // reached back to the new node + if (visited.has(key)) continue; + visited.add(key); + const matches = await repo.findJobsByIdempotencyKey(productId, key); + for (const m of matches) next.push(...m.deps); + } + frontier = next; + } + return false; +} + +// ── Submit (idempotent) ─────────────────────────────────────────────────────── + +export interface SubmitResult { + job: FleetJobDoc; + outcome: 'created' | 'deduplicated' | 'superseded'; +} + +export async function submitJob(productId: string, input: SubmitJobInput): Promise { + const hash = contentHash(input.bodyMd); + const existingForKey = await repo.findJobsByIdempotencyKey(productId, input.idempotencyKey); + + // Idempotency (§4): same key + identical content => return existing (no dup). + const identical = existingForKey.find(j => j.contentHash === hash); + if (identical) return { job: identical, outcome: 'deduplicated' }; + + if (existingForKey.length > 0) { + // same key, different content + const supersedable = existingForKey.find(j => j.stage === 'queued' || j.stage === 'blocked'); + if (!supersedable) { + throw new ConflictError( + `idempotency-key '${input.idempotencyKey}' already in use by an in-flight/terminal job with different content` + ); + } + // supersede the still-queued/blocked job in place + if (await wouldCreateCycle(productId, input.idempotencyKey, input.deps)) { + throw new BadRequestError('dependency cycle detected — submission rejected'); + } + const refreshed = applyInputToJob(supersedable, input, hash); + const { stage } = await stageForDeps(refreshed); + const updated = await repo.updateJob(supersedable.id, productId, { + ...stripIdentity(refreshed), + stage, + }); + await repo.appendEvent({ + jobId: supersedable.id, + productId, + type: 'superseded', + data: { idempotencyKey: input.idempotencyKey }, + }); + return { job: updated ?? supersedable, outcome: 'superseded' }; + } + + // brand-new job + if (await wouldCreateCycle(productId, input.idempotencyKey, input.deps)) { + throw new BadRequestError('dependency cycle detected — submission rejected'); + } + const now = new Date().toISOString(); + const id = `fjob_${crypto.randomUUID()}`; + const base: FleetJobDoc = { + id, + productId, + stage: 'queued', + idempotencyKey: input.idempotencyKey, + contentHash: hash, + bodyMd: input.bodyMd, + manifestSnapshot: { + priority: input.priority, + capabilities: input.capabilities, + engineClass: input.engineClass, + profile: input.profile, + prefersEngine: input.prefersEngine, + allowedScope: input.allowedScope, + deps: input.deps, + depsMode: input.depsMode, + budget: input.budget, + retry: input.retry, + }, + priority: input.priority, + priorityOrder: PRIORITY_ORDER[input.priority], + capabilities: input.capabilities, + engineClass: input.engineClass, + profile: input.profile, + deps: input.deps, + depsMode: input.depsMode, + budget: input.budget, + retry: input.retry, + kind: input.kind, + parentId: input.parentId, + trackerItemId: input.trackerItemId, + attempts: 0, + leaseEpoch: 0, + rev: 0, + createdAt: now, + updatedAt: now, + }; + const { stage } = await stageForDeps(base); + base.stage = stage; + const created = await repo.createJob(base); + await repo.appendEvent({ + jobId: id, + productId, + type: 'submitted', + data: { stage, idempotencyKey: input.idempotencyKey }, + }); + return { job: created, outcome: 'created' }; +} + +function applyInputToJob(job: FleetJobDoc, input: SubmitJobInput, hash: string): FleetJobDoc { + return { + ...job, + contentHash: hash, + bodyMd: input.bodyMd, + priority: input.priority, + priorityOrder: PRIORITY_ORDER[input.priority], + capabilities: input.capabilities, + engineClass: input.engineClass, + profile: input.profile, + deps: input.deps, + depsMode: input.depsMode, + budget: input.budget, + retry: input.retry, + manifestSnapshot: { + priority: input.priority, + capabilities: input.capabilities, + engineClass: input.engineClass, + profile: input.profile, + prefersEngine: input.prefersEngine, + allowedScope: input.allowedScope, + deps: input.deps, + depsMode: input.depsMode, + budget: input.budget, + retry: input.retry, + }, + }; +} + +function stripIdentity(job: FleetJobDoc): Partial { + const { id: _id, productId: _pid, createdAt: _c, rev: _r, ...rest } = job; + return rest; +} + +// ── Atomic claim (§4/§7) ──────────────────────────────────────────────────── + +export interface ClaimContext { + productId: string; + factoryId: string; + capabilities: string[]; + leaseSeconds: number; +} + +export interface ClaimResult { + job: FleetJobDoc; + lease: FleetLeaseDoc; + run: FleetRunDoc; +} + +/** A job is eligible for a factory iff queued/blocked-with-met-deps + caps subset. */ +async function eligibleForClaim(job: FleetJobDoc, factoryCaps: string[]): Promise { + if (job.stage !== 'queued' && job.stage !== 'blocked') return false; + if (!capabilitiesSubset(job.capabilities, factoryCaps)) return false; + const unmet = await unmetDeps(job); + return unmet.length === 0; +} + +/** + * Try to claim ONE specific job version. The rev compare-and-swap is the single + * point at which exactly one contender wins; a stale `rev` => conflict (no write, + * no double-assignment). + */ +export async function tryClaimJob( + job: FleetJobDoc, + ctx: ClaimContext +): Promise> { + const newEpoch = job.leaseEpoch + 1; + const attempt = job.attempts + 1; + const claimed = await repo.revUpdateJob(job.id, ctx.productId, job.rev, { + stage: 'assigned', + leaseEpoch: newEpoch, + attempts: attempt, + }); + if (!claimed.ok) return claimed; + + const now = Date.now(); + const expiresAt = new Date(now + ctx.leaseSeconds * 1000).toISOString(); + const nowIso = new Date(now).toISOString(); + + const existingLease = await repo.getLease(job.id); + let lease: FleetLeaseDoc; + if (existingLease) { + const updated = await repo.revUpdateLease(job.id, existingLease.rev, { + holderFactoryId: ctx.factoryId, + expiresAt, + leaseEpoch: newEpoch, + renewals: 0, + status: 'held', + }); + lease = updated.ok ? updated.doc : existingLease; + } else { + lease = await repo.createLease({ + id: job.id, + productId: ctx.productId, + jobId: job.id, + holderFactoryId: ctx.factoryId, + expiresAt, + leaseEpoch: newEpoch, + renewals: 0, + status: 'held', + rev: 0, + updatedAt: nowIso, + }); + } + + const run = await repo.createRun({ + id: `${job.id}:run:${attempt}`, + productId: ctx.productId, + jobId: job.id, + attempt, + factoryId: ctx.factoryId, + engine: job.engineClass ?? 'unknown', + startedAt: nowIso, + insights: {}, + }); + + await repo.appendEvent({ + jobId: job.id, + productId: ctx.productId, + type: 'assigned', + actor: ctx.factoryId, + data: { leaseEpoch: newEpoch, attempt }, + }); + + return { ok: true, doc: { job: claimed.doc, lease, run } }; +} + +/** Select the highest-priority, oldest eligible job and atomically claim it. */ +export async function claimNextJob(ctx: ClaimContext): Promise { + for (let i = 0; i < CLAIM_MAX_RETRIES; i++) { + const candidates = await repo.listJobs({ productId: ctx.productId }); + const eligible: FleetJobDoc[] = []; + for (const job of candidates) { + if (await eligibleForClaim(job, ctx.capabilities)) eligible.push(job); + } + if (eligible.length === 0) return null; + eligible.sort( + (a, b) => a.priorityOrder - b.priorityOrder || a.createdAt.localeCompare(b.createdAt) + ); + const result = await tryClaimJob(eligible[0], ctx); + if (result.ok) return result.doc; + if (result.reason === 'not_found') continue; + // conflict: another factory won this version — re-select and retry + } + return null; +} + +// ── Fenced transitions + leases (§4/§8) ────────────────────────────────────── + +export type FenceResult = + | { ok: true; doc: T } + | { ok: false; reason: 'not_found' | 'fenced' | 'conflict' }; + +function fenced(job: FleetJobDoc, leaseEpoch: number): boolean { + // a call older than the current epoch is a stale/zombie worker — reject it + return leaseEpoch < job.leaseEpoch; +} + +export interface PatchJobInputInternal { + leaseEpoch: number; + stage?: FleetStage; + checkpoint?: FleetJobDoc['checkpoint']; + blockedReason?: string; +} + +export async function patchJobFenced( + jobId: string, + productId: string, + patch: PatchJobInputInternal +): Promise> { + const job = await repo.getJob(jobId, productId); + if (!job) return { ok: false, reason: 'not_found' }; + if (fenced(job, patch.leaseEpoch)) return { ok: false, reason: 'fenced' }; + + const updates: Partial = {}; + if (patch.stage) updates.stage = patch.stage; + if (patch.checkpoint) updates.checkpoint = patch.checkpoint; + if (patch.blockedReason !== undefined) updates.blockedReason = patch.blockedReason; + + const res = await repo.revUpdateJob(jobId, productId, job.rev, updates); + if (!res.ok) return { ok: false, reason: res.reason === 'not_found' ? 'not_found' : 'conflict' }; + await repo.appendEvent({ + jobId, + productId, + type: 'transition', + data: { stage: patch.stage ?? job.stage, leaseEpoch: patch.leaseEpoch }, + }); + return { ok: true, doc: res.doc }; +} + +export async function renewLease( + jobId: string, + productId: string, + leaseEpoch: number, + leaseSeconds: number +): Promise> { + const job = await repo.getJob(jobId, productId); + if (!job) return { ok: false, reason: 'not_found' }; + if (fenced(job, leaseEpoch)) return { ok: false, reason: 'fenced' }; + const lease = await repo.getLease(jobId); + if (!lease) return { ok: false, reason: 'not_found' }; + const expiresAt = new Date(Date.now() + leaseSeconds * 1000).toISOString(); + const res = await repo.revUpdateLease(jobId, lease.rev, { + expiresAt, + renewals: lease.renewals + 1, + status: 'held', + }); + if (!res.ok) return { ok: false, reason: 'conflict' }; + await repo.appendEvent({ jobId, productId, type: 'lease_renewed', data: { leaseEpoch } }); + return { ok: true, doc: res.doc }; +} + +export async function releaseLease( + jobId: string, + productId: string, + leaseEpoch: number, + stage?: FleetStage +): Promise> { + const job = await repo.getJob(jobId, productId); + if (!job) return { ok: false, reason: 'not_found' }; + if (fenced(job, leaseEpoch)) return { ok: false, reason: 'fenced' }; + const lease = await repo.getLease(jobId); + if (!lease) return { ok: false, reason: 'not_found' }; + const res = await repo.revUpdateLease(jobId, lease.rev, { status: 'released' }); + if (!res.ok) return { ok: false, reason: 'conflict' }; + if (stage) await repo.revUpdateJob(jobId, productId, job.rev, { stage }); + await repo.appendEvent({ jobId, productId, type: 'lease_released', data: { leaseEpoch, stage } }); + return { ok: true, doc: res.doc }; +} + +// ── Heartbeat (§8) ──────────────────────────────────────────────────────────── + +export interface HeartbeatContext { + productId: string; + factoryId: string; + descriptor?: Record; + capabilities?: string[]; + health?: FleetFactoryHealthInput; + load?: number; + seatLimit?: number; +} +type FleetFactoryHealthInput = 'ok' | 'degraded' | 'down'; + +export async function heartbeat(ctx: HeartbeatContext): Promise> { + const now = new Date().toISOString(); + const existing = await repo.getFactory(ctx.factoryId, ctx.productId); + await repo.upsertFactory({ + id: ctx.factoryId, + productId: ctx.productId, + factoryId: ctx.factoryId, + descriptor: ctx.descriptor ?? existing?.descriptor ?? {}, + capabilities: ctx.capabilities ?? existing?.capabilities ?? [], + health: ctx.health ?? existing?.health ?? 'ok', + load: ctx.load ?? existing?.load ?? 0, + seatLimit: ctx.seatLimit ?? existing?.seatLimit ?? 1, + lastHeartbeatAt: now, + }); + return { ok: true, doc: true }; +} + +/** A factory is stale if its last heartbeat is older than `maxAgeMs`. */ +export function isFactoryStale( + factory: { lastHeartbeatAt: string }, + nowMs: number, + maxAgeMs: number +): boolean { + return nowMs - new Date(factory.lastHeartbeatAt).getTime() > maxAgeMs; +} + +// ── Reaper (§25.3) ──────────────────────────────────────────────────────────── + +export interface ReapResult { + reaped: number; + jobIds: string[]; +} + +/** + * Reclaim jobs whose lease has expired. Cosmos TTL would only DELETE the lease + * doc — it cannot return the job to `queued`, bump the fencing epoch, or preserve + * the checkpoint — so the reaper (not TTL) owns recovery. Idempotent: a reaped + * lease becomes `expired` and is no longer selected by listExpiredLeases. + */ +export async function reapExpiredLeases(nowIso: string): Promise { + const expired = await repo.listExpiredLeases(nowIso); + const jobIds: string[] = []; + for (const lease of expired) { + const job = await repo.getJob(lease.jobId, lease.productId); + if (!job) continue; + if (!ACTIVE_STAGES.includes(job.stage)) continue; // already resting; nothing to reclaim + const newEpoch = job.leaseEpoch + 1; // fence the zombie holder + const unmet = await unmetDeps(job); + const stage: FleetStage = unmet.length > 0 ? 'blocked' : 'queued'; + // checkpoint pointer is intentionally preserved on the job (resume-friendly) + const jobRes = await repo.revUpdateJob(job.id, lease.productId, job.rev, { + stage, + leaseEpoch: newEpoch, + blockedReason: unmet.length > 0 ? `waiting on: ${unmet.join(', ')}` : undefined, + }); + if (!jobRes.ok) continue; + await repo.revUpdateLease(lease.jobId, lease.rev, { + status: 'expired', + leaseEpoch: newEpoch, + holderFactoryId: undefined, + }); + await repo.appendEvent({ + jobId: job.id, + productId: lease.productId, + type: 'lease_expired', + data: { leaseEpoch: newEpoch, returnedTo: stage }, + }); + jobIds.push(job.id); + } + return { reaped: jobIds.length, jobIds }; +} diff --git a/services/platform-service/src/modules/fleet/repository.test.ts b/services/platform-service/src/modules/fleet/repository.test.ts new file mode 100644 index 00000000..cdf5b6b5 --- /dev/null +++ b/services/platform-service/src/modules/fleet/repository.test.ts @@ -0,0 +1,187 @@ +/** + * Fleet repository — CRUD round-trips, list filters, appendEvent ordering, and + * the rev compare-and-swap. Runs on the in-memory datastore provider. + */ + +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { MemoryDatastoreProvider } from '@bytelyst/datastore'; +import { _resetDatastoreProvider, setProvider } from '../../lib/datastore.js'; +import * as repo from './repository.js'; +import type { FleetJobDoc } from './types.js'; + +const PID = 'lysnrai'; +const now = '2026-05-30T00:00:00.000Z'; + +function jobDoc(over: Partial = {}): FleetJobDoc { + return { + id: 'fjob_1', + productId: PID, + stage: 'queued', + idempotencyKey: 'task-1', + contentHash: 'h1', + bodyMd: '# task', + manifestSnapshot: { + priority: 'medium', + capabilities: [], + prefersEngine: [], + allowedScope: [], + deps: [], + }, + priority: 'medium', + priorityOrder: 2, + capabilities: [], + deps: [], + kind: 'leaf', + attempts: 0, + leaseEpoch: 0, + rev: 0, + createdAt: now, + updatedAt: now, + ...over, + }; +} + +describe('fleet repository', () => { + beforeEach(() => setProvider(new MemoryDatastoreProvider())); + afterEach(() => _resetDatastoreProvider()); + + it('jobs: create / getById / list by stage + idempotencyKey', async () => { + await repo.createJob(jobDoc({ id: 'fjob_1', idempotencyKey: 'a', stage: 'queued' })); + await repo.createJob(jobDoc({ id: 'fjob_2', idempotencyKey: 'b', stage: 'blocked' })); + + expect((await repo.getJob('fjob_1', PID))?.idempotencyKey).toBe('a'); + expect(await repo.getJob('missing', PID)).toBeNull(); + + const queued = await repo.listJobs({ productId: PID, stage: 'queued' }); + expect(queued.map(j => j.id)).toEqual(['fjob_1']); + + const byKey = await repo.findJobsByIdempotencyKey(PID, 'b'); + expect(byKey).toHaveLength(1); + expect(byKey[0].id).toBe('fjob_2'); + }); + + it('jobs: revUpdate is a compare-and-swap', async () => { + await repo.createJob(jobDoc({ id: 'fjob_cas', rev: 0, stage: 'queued' })); + + const first = await repo.revUpdateJob('fjob_cas', PID, 0, { stage: 'assigned' }); + expect(first.ok).toBe(true); + if (first.ok) expect(first.doc.rev).toBe(1); + + // a second writer using the stale rev (0) must conflict + const stale = await repo.revUpdateJob('fjob_cas', PID, 0, { stage: 'building' }); + expect(stale.ok).toBe(false); + if (!stale.ok) expect(stale.reason).toBe('conflict'); + + const missing = await repo.revUpdateJob('nope', PID, 0, { stage: 'failed' }); + expect(missing.ok).toBe(false); + if (!missing.ok) expect(missing.reason).toBe('not_found'); + }); + + it('runs: create + list ordered by attempt', async () => { + await repo.createRun({ + id: 'r2', + productId: PID, + jobId: 'j', + attempt: 2, + engine: 'devin', + startedAt: now, + insights: {}, + }); + await repo.createRun({ + id: 'r1', + productId: PID, + jobId: 'j', + attempt: 1, + engine: 'devin', + startedAt: now, + insights: {}, + }); + const runs = await repo.listRunsByJob('j'); + expect(runs.map(r => r.attempt)).toEqual([1, 2]); + }); + + it('leases: create + revUpdate', async () => { + await repo.createLease({ + id: 'j', + productId: PID, + jobId: 'j', + leaseEpoch: 1, + renewals: 0, + status: 'held', + rev: 0, + updatedAt: now, + }); + const res = await repo.revUpdateLease('j', 0, { renewals: 1 }); + expect(res.ok).toBe(true); + if (res.ok) expect(res.doc.renewals).toBe(1); + expect((await repo.getLease('j'))?.rev).toBe(1); + }); + + it('factories: upsert + list by productId', async () => { + await repo.upsertFactory({ + id: 'fac_1', + productId: PID, + factoryId: 'fac_1', + descriptor: {}, + capabilities: ['os:mac'], + health: 'ok', + load: 0, + seatLimit: 1, + lastHeartbeatAt: now, + }); + await repo.upsertFactory({ + id: 'fac_1', + productId: PID, + factoryId: 'fac_1', + descriptor: {}, + capabilities: ['os:mac'], + health: 'degraded', + load: 2, + seatLimit: 1, + lastHeartbeatAt: now, + }); + const list = await repo.listFactories(PID); + expect(list).toHaveLength(1); + expect(list[0].health).toBe('degraded'); + }); + + it('profiles: create + get', async () => { + await repo.createProfile({ + id: 'prof_1', + productId: PID, + name: 'backend', + version: 1, + snapshot: { persona: 'x' }, + createdAt: now, + }); + expect((await repo.getProfile('prof_1', PID))?.name).toBe('backend'); + }); + + it('events: appendEvent yields an ordered, append-only stream', async () => { + await repo.appendEvent({ jobId: 'j', productId: PID, type: 'submitted' }); + await repo.appendEvent({ jobId: 'j', productId: PID, type: 'assigned', actor: 'fac_1' }); + await repo.appendEvent({ + jobId: 'j', + productId: PID, + type: 'transition', + data: { stage: 'building' }, + }); + const events = await repo.listEvents('j'); + expect(events.map(e => e.seq)).toEqual([0, 1, 2]); + expect(events.map(e => e.type)).toEqual(['submitted', 'assigned', 'transition']); + }); + + it('artifacts: create + list', async () => { + await repo.createArtifact({ + id: 'art_1', + productId: PID, + jobId: 'j', + kind: 'coverage', + blobUrl: 'https://b/x', + createdAt: now, + }); + const arts = await repo.listArtifacts('j'); + expect(arts).toHaveLength(1); + expect(arts[0].blobUrl).toBe('https://b/x'); + }); +}); diff --git a/services/platform-service/src/modules/fleet/repository.ts b/services/platform-service/src/modules/fleet/repository.ts new file mode 100644 index 00000000..8f2cee4a --- /dev/null +++ b/services/platform-service/src/modules/fleet/repository.ts @@ -0,0 +1,257 @@ +/** + * Fleet repositories — one per fleet_* container, cloud-agnostic via @bytelyst/datastore. + * + * Partition keys (see lib/cosmos-init.ts): + * fleet_jobs /productId fleet_runs /jobId + * fleet_leases /jobId fleet_factories /productId + * fleet_profiles /productId fleet_events /jobId + * fleet_artifacts /jobId + * + * Optimistic concurrency: jobs and leases carry a monotonic `rev` token. + * `revUpdate*` is a compare-and-swap — it writes only when the stored `rev` + * still equals the caller's expected `rev`, otherwise it reports a conflict + * WITHOUT writing. In production (Cosmos) this maps to an `_etag` / If-Match + * conditional replace; on the memory provider it is enforced by re-reading the + * current `rev` immediately before the write, which is exact for the sequential + * calls the coordinator + tests make (see coordinator.claimNextJob). + */ + +import type { DocumentCollection } from '@bytelyst/datastore'; +import { getCollection } from '../../lib/datastore.js'; +import type { + FleetArtifactDoc, + FleetEventDoc, + FleetFactoryDoc, + FleetJobDoc, + FleetLeaseDoc, + FleetProfileDoc, + FleetRunDoc, + FleetStage, +} from './types.js'; + +// ── Collections ─────────────────────────────────────────────────────────────── + +function jobs(): DocumentCollection { + return getCollection('fleet_jobs', '/productId'); +} +function runs(): DocumentCollection { + return getCollection('fleet_runs', '/jobId'); +} +function leases(): DocumentCollection { + return getCollection('fleet_leases', '/jobId'); +} +function factories(): DocumentCollection { + return getCollection('fleet_factories', '/productId'); +} +function profiles(): DocumentCollection { + return getCollection('fleet_profiles', '/productId'); +} +function events(): DocumentCollection { + return getCollection('fleet_events', '/jobId'); +} +function artifacts(): DocumentCollection { + return getCollection('fleet_artifacts', '/jobId'); +} + +/** Result of a compare-and-swap update. */ +export type RevResult = { ok: true; doc: T } | { ok: false; reason: 'not_found' | 'conflict' }; + +// ── Jobs ────────────────────────────────────────────────────────────────────── + +export async function createJob(doc: FleetJobDoc): Promise { + return jobs().create(doc); +} + +export async function getJob(id: string, productId: string): Promise { + return jobs().findById(id, productId); +} + +export interface ListJobsFilter { + productId: string; + stage?: FleetStage; + idempotencyKey?: string; + limit?: number; + offset?: number; +} + +export async function listJobs(f: ListJobsFilter): Promise { + const filter: Record = { productId: f.productId }; + if (f.stage) filter.stage = f.stage; + if (f.idempotencyKey) filter.idempotencyKey = f.idempotencyKey; + return jobs().findMany({ + filter, + sort: { priorityOrder: 1, createdAt: 1 }, + offset: f.offset, + limit: f.limit, + }); +} + +/** All jobs for a product sharing an idempotency-key (dedupe lookups). */ +export async function findJobsByIdempotencyKey( + productId: string, + idempotencyKey: string +): Promise { + return jobs().findMany({ filter: { productId, idempotencyKey } }); +} + +/** Unconditional merge update (use only when concurrency is not contended). */ +export async function updateJob( + id: string, + productId: string, + updates: Partial +): Promise { + const cur = await jobs().findById(id, productId); + if (!cur) return null; + return jobs().update(id, productId, { + ...updates, + rev: (cur.rev ?? 0) + 1, + updatedAt: new Date().toISOString(), + }); +} + +/** Compare-and-swap on `rev` — the atomic-claim / fenced-transition primitive. */ +export async function revUpdateJob( + id: string, + productId: string, + expectedRev: number, + updates: Partial +): Promise> { + const cur = await jobs().findById(id, productId); + if (!cur) return { ok: false, reason: 'not_found' }; + if ((cur.rev ?? 0) !== expectedRev) return { ok: false, reason: 'conflict' }; + const doc = await jobs().update(id, productId, { + ...updates, + rev: expectedRev + 1, + updatedAt: new Date().toISOString(), + }); + return { ok: true, doc }; +} + +export async function deleteJob(id: string, productId: string): Promise { + await jobs().delete(id, productId); +} + +// ── Runs ──────────────────────────────────────────────────────────────────── + +export async function createRun(doc: FleetRunDoc): Promise { + return runs().create(doc); +} + +export async function updateRun( + id: string, + jobId: string, + updates: Partial +): Promise { + const cur = await runs().findById(id, jobId); + if (!cur) return null; + return runs().update(id, jobId, updates); +} + +export async function listRunsByJob(jobId: string): Promise { + return runs().findMany({ filter: { jobId }, sort: { attempt: 1 } }); +} + +// ── Leases ────────────────────────────────────────────────────────────────── + +export async function getLease(jobId: string): Promise { + return leases().findById(jobId, jobId); +} + +export async function createLease(doc: FleetLeaseDoc): Promise { + return leases().create(doc); +} + +export async function revUpdateLease( + jobId: string, + expectedRev: number, + updates: Partial +): Promise> { + const cur = await leases().findById(jobId, jobId); + if (!cur) return { ok: false, reason: 'not_found' }; + if ((cur.rev ?? 0) !== expectedRev) return { ok: false, reason: 'conflict' }; + const doc = await leases().update(jobId, jobId, { + ...updates, + rev: expectedRev + 1, + updatedAt: new Date().toISOString(), + }); + return { ok: true, doc }; +} + +export async function listExpiredLeases(nowIso: string): Promise { + return leases().findMany({ + filter: { status: 'held', expiresAt: { $lt: nowIso } }, + }); +} + +// ── Factories ───────────────────────────────────────────────────────────────── + +export async function getFactory( + factoryId: string, + productId: string +): Promise { + return factories().findById(factoryId, productId); +} + +export async function upsertFactory(doc: FleetFactoryDoc): Promise { + return factories().upsert(doc); +} + +export async function listFactories(productId: string): Promise { + return factories().findMany({ filter: { productId } }); +} + +// ── Profiles ──────────────────────────────────────────────────────────────── + +export async function createProfile(doc: FleetProfileDoc): Promise { + return profiles().create(doc); +} + +export async function getProfile(id: string, productId: string): Promise { + return profiles().findById(id, productId); +} + +export async function listProfiles(productId: string): Promise { + return profiles().findMany({ filter: { productId } }); +} + +// ── Events (append-only) ──────────────────────────────────────────────────── + +export interface AppendEventInput { + jobId: string; + productId: string; + type: string; + actor?: string; + data?: Record; +} + +/** Append an ordered event to a job's stream; `seq` is monotonic per job. */ +export async function appendEvent(input: AppendEventInput): Promise { + const existing = await events().findMany({ filter: { jobId: input.jobId } }); + const seq = existing.length; + const doc: FleetEventDoc = { + id: `${input.jobId}:evt:${seq}`, + productId: input.productId, + jobId: input.jobId, + seq, + type: input.type, + at: new Date().toISOString(), + actor: input.actor, + data: input.data ?? {}, + }; + return events().create(doc); +} + +export async function listEvents(jobId: string): Promise { + const docs = await events().findMany({ filter: { jobId }, sort: { seq: 1 } }); + return docs; +} + +// ── Artifacts ───────────────────────────────────────────────────────────────── + +export async function createArtifact(doc: FleetArtifactDoc): Promise { + return artifacts().create(doc); +} + +export async function listArtifacts(jobId: string): Promise { + return artifacts().findMany({ filter: { jobId }, sort: { createdAt: 1 } }); +} diff --git a/services/platform-service/src/modules/fleet/routes.test.ts b/services/platform-service/src/modules/fleet/routes.test.ts new file mode 100644 index 00000000..b0aa7380 --- /dev/null +++ b/services/platform-service/src/modules/fleet/routes.test.ts @@ -0,0 +1,144 @@ +/** + * Fleet routes — Fastify inject, real coordinator/repo on the memory provider. + * Auth + productId resolution are mocked (as in the items module routes test). + */ + +import Fastify, { type FastifyInstance } from 'fastify'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { MemoryDatastoreProvider } from '@bytelyst/datastore'; +import { _resetDatastoreProvider, setProvider } from '../../lib/datastore.js'; + +vi.mock('../../lib/auth.js', () => ({ + extractAuth: vi.fn(async () => ({ sub: 'user_1', role: 'admin' })), +})); +vi.mock('../../lib/request-context.js', () => ({ + getRequestProductId: () => 'lysnrai', +})); + +async function buildApp(): Promise { + const { fleetRoutes } = await import('./routes.js'); + // Fastify's default error handler maps a thrown ServiceError (which carries a + // `.statusCode`) to the right HTTP status — same as the items routes test. + const app = Fastify({ logger: false }); + await app.register(fleetRoutes, { prefix: '/api' }); + return app; +} + +async function submit(app: FastifyInstance, body: Record) { + return app.inject({ method: 'POST', url: '/api/fleet/jobs', payload: body }); +} + +describe('fleetRoutes', () => { + beforeEach(() => setProvider(new MemoryDatastoreProvider())); + afterEach(() => { + _resetDatastoreProvider(); + vi.clearAllMocks(); + }); + + it('POST /fleet/jobs submits (201) and is idempotent (200 dedup)', async () => { + const app = await buildApp(); + const r1 = await submit(app, { idempotencyKey: 'k1', bodyMd: '# task' }); + expect(r1.statusCode).toBe(201); + expect(JSON.parse(r1.body).outcome).toBe('created'); + + const r2 = await submit(app, { idempotencyKey: 'k1', bodyMd: '# task' }); + expect(r2.statusCode).toBe(200); + expect(JSON.parse(r2.body).outcome).toBe('deduplicated'); + }); + + it('POST /fleet/jobs rejects an invalid body (400)', async () => { + const app = await buildApp(); + const res = await submit(app, { idempotencyKey: 'k1' }); // missing bodyMd + expect(res.statusCode).toBe(400); + }); + + it('full lifecycle: submit -> list -> claim -> patch(fenced) -> renew -> release', async () => { + const app = await buildApp(); + const sub = await submit(app, { idempotencyKey: 'k1', bodyMd: '# task', priority: 'high' }); + const jobId = JSON.parse(sub.body).job.id as string; + + const list = await app.inject({ method: 'GET', url: '/api/fleet/jobs?stage=queued' }); + expect(list.statusCode).toBe(200); + expect(JSON.parse(list.body).jobs).toHaveLength(1); + + const claim = await app.inject({ + method: 'POST', + url: '/api/fleet/claim', + payload: { factoryId: 'fac_1', capabilities: [] }, + }); + expect(claim.statusCode).toBe(200); + const claimed = JSON.parse(claim.body); + expect(claimed.claimed).toBe(true); + expect(claimed.job.stage).toBe('assigned'); + const epoch = claimed.job.leaseEpoch as number; + expect(epoch).toBe(1); + + // fenced: stale epoch rejected (409) + const stale = await app.inject({ + method: 'PATCH', + url: `/api/fleet/jobs/${jobId}`, + payload: { leaseEpoch: epoch - 1, stage: 'building' }, + }); + expect(stale.statusCode).toBe(409); + + // current epoch succeeds + const patch = await app.inject({ + method: 'PATCH', + url: `/api/fleet/jobs/${jobId}`, + payload: { leaseEpoch: epoch, stage: 'building' }, + }); + expect(patch.statusCode).toBe(200); + expect(JSON.parse(patch.body).stage).toBe('building'); + + const renew = await app.inject({ + method: 'POST', + url: `/api/fleet/jobs/${jobId}/lease/renew`, + payload: { leaseEpoch: epoch, leaseSeconds: 600 }, + }); + expect(renew.statusCode).toBe(200); + expect(JSON.parse(renew.body).renewals).toBe(1); + + const release = await app.inject({ + method: 'POST', + url: `/api/fleet/jobs/${jobId}/lease/release`, + payload: { leaseEpoch: epoch, stage: 'review' }, + }); + expect(release.statusCode).toBe(200); + expect(JSON.parse(release.body).status).toBe('released'); + + const events = await app.inject({ method: 'GET', url: `/api/fleet/jobs/${jobId}/events` }); + const types = JSON.parse(events.body).events.map((e: { type: string }) => e.type); + expect(types).toContain('submitted'); + expect(types).toContain('assigned'); + }); + + it('POST /fleet/claim returns claimed:false when nothing is eligible', async () => { + const app = await buildApp(); + const res = await app.inject({ + method: 'POST', + url: '/api/fleet/claim', + payload: { factoryId: 'fac_1' }, + }); + expect(res.statusCode).toBe(200); + expect(JSON.parse(res.body).claimed).toBe(false); + }); + + it('POST /fleet/factories/heartbeat upserts a factory', async () => { + const app = await buildApp(); + const res = await app.inject({ + method: 'POST', + url: '/api/fleet/factories/heartbeat', + payload: { factoryId: 'fac_1', capabilities: ['os:mac'], health: 'ok' }, + }); + expect(res.statusCode).toBe(200); + const body = JSON.parse(res.body); + expect(body.ok).toBe(true); + expect(body.factory.factoryId).toBe('fac_1'); + }); + + it('GET /fleet/jobs/:id returns 404 when missing', async () => { + const app = await buildApp(); + const res = await app.inject({ method: 'GET', url: '/api/fleet/jobs/nope' }); + expect(res.statusCode).toBe(404); + }); +}); diff --git a/services/platform-service/src/modules/fleet/routes.ts b/services/platform-service/src/modules/fleet/routes.ts new file mode 100644 index 00000000..0d630aea --- /dev/null +++ b/services/platform-service/src/modules/fleet/routes.ts @@ -0,0 +1,182 @@ +/** + * Fleet REST endpoints — agent gigafactory coordinator. + * + * POST /fleet/jobs submit a job (idempotent) + * GET /fleet/jobs list jobs (by stage / idempotencyKey) + * GET /fleet/jobs/:id get one job + * PATCH /fleet/jobs/:id fenced state transition (carries leaseEpoch) + * POST /fleet/claim atomic claim for a factory + * POST /fleet/jobs/:id/lease/renew renew a held lease + * POST /fleet/jobs/:id/lease/release release a held lease + * POST /fleet/factories/heartbeat factory liveness + * GET /fleet/jobs/:id/runs job run history + * GET /fleet/jobs/:id/events append-only event stream + * + * All routes require auth + a resolved productId, exactly like the items module. + */ + +import type { FastifyInstance } from 'fastify'; +import { getRequestProductId } from '../../lib/request-context.js'; +import { BadRequestError, ConflictError, NotFoundError } from '../../lib/errors.js'; +import { extractAuth } from '../../lib/auth.js'; +import * as repo from './repository.js'; +import * as coordinator from './coordinator.js'; +import { + SubmitJobSchema, + ListJobsQuerySchema, + PatchJobSchema, + ClaimSchema, + RenewLeaseSchema, + ReleaseLeaseSchema, + HeartbeatSchema, +} from './types.js'; + +function badRequest(issues: { message: string }[]): never { + throw new BadRequestError(issues.map(i => i.message).join('; ')); +} + +export async function fleetRoutes(app: FastifyInstance) { + // ── Submit (idempotent) ── + app.post('/fleet/jobs', async (req, reply) => { + await extractAuth(req); + const parsed = SubmitJobSchema.safeParse(req.body); + if (!parsed.success) badRequest(parsed.error.issues); + const pid = parsed.data.productId || getRequestProductId(req); + const result = await coordinator.submitJob(pid, parsed.data); + reply.code(result.outcome === 'created' ? 201 : 200); + return { outcome: result.outcome, job: result.job }; + }); + + // ── List ── + app.get('/fleet/jobs', async req => { + await extractAuth(req); + const parsed = ListJobsQuerySchema.safeParse(req.query); + if (!parsed.success) badRequest(parsed.error.issues); + const q = parsed.data; + const pid = q.productId || getRequestProductId(req); + const jobs = await repo.listJobs({ + productId: pid, + stage: q.stage, + idempotencyKey: q.idempotencyKey, + limit: q.limit, + offset: q.offset, + }); + return { jobs, limit: q.limit, offset: q.offset }; + }); + + // ── Get one ── + app.get('/fleet/jobs/:id', async req => { + await extractAuth(req); + const { id } = req.params as { id: string }; + const pid = getRequestProductId(req); + const job = await repo.getJob(id, pid); + if (!job) throw new NotFoundError('Job not found'); + return job; + }); + + // ── Fenced state transition ── + app.patch('/fleet/jobs/:id', async req => { + await extractAuth(req); + const { id } = req.params as { id: string }; + const pid = getRequestProductId(req); + const parsed = PatchJobSchema.safeParse(req.body); + if (!parsed.success) badRequest(parsed.error.issues); + const res = await coordinator.patchJobFenced(id, pid, parsed.data); + if (!res.ok) { + if (res.reason === 'not_found') throw new NotFoundError('Job not found'); + if (res.reason === 'fenced') { + throw new ConflictError('stale leaseEpoch — transition fenced (job reassigned)'); + } + throw new ConflictError('concurrent update conflict — retry'); + } + return res.doc; + }); + + // ── Atomic claim ── + app.post('/fleet/claim', async req => { + await extractAuth(req); + const parsed = ClaimSchema.safeParse(req.body); + if (!parsed.success) badRequest(parsed.error.issues); + const pid = parsed.data.productId || getRequestProductId(req); + const claim = await coordinator.claimNextJob({ + productId: pid, + factoryId: parsed.data.factoryId, + capabilities: parsed.data.capabilities, + leaseSeconds: parsed.data.leaseSeconds, + }); + if (!claim) return { claimed: false }; + return { claimed: true, ...claim }; + }); + + // ── Lease renew ── + app.post('/fleet/jobs/:id/lease/renew', async req => { + await extractAuth(req); + const { id } = req.params as { id: string }; + const pid = getRequestProductId(req); + const parsed = RenewLeaseSchema.safeParse(req.body); + if (!parsed.success) badRequest(parsed.error.issues); + const res = await coordinator.renewLease( + id, + pid, + parsed.data.leaseEpoch, + parsed.data.leaseSeconds + ); + if (!res.ok) { + if (res.reason === 'not_found') throw new NotFoundError('Job or lease not found'); + if (res.reason === 'fenced') throw new ConflictError('stale leaseEpoch — renew fenced'); + throw new ConflictError('lease renew conflict — retry'); + } + return res.doc; + }); + + // ── Lease release ── + app.post('/fleet/jobs/:id/lease/release', async req => { + await extractAuth(req); + const { id } = req.params as { id: string }; + const pid = getRequestProductId(req); + const parsed = ReleaseLeaseSchema.safeParse(req.body); + if (!parsed.success) badRequest(parsed.error.issues); + const res = await coordinator.releaseLease(id, pid, parsed.data.leaseEpoch, parsed.data.stage); + if (!res.ok) { + if (res.reason === 'not_found') throw new NotFoundError('Job or lease not found'); + if (res.reason === 'fenced') throw new ConflictError('stale leaseEpoch — release fenced'); + throw new ConflictError('lease release conflict — retry'); + } + return res.doc; + }); + + // ── Factory heartbeat ── + app.post('/fleet/factories/heartbeat', async req => { + await extractAuth(req); + const parsed = HeartbeatSchema.safeParse(req.body); + if (!parsed.success) badRequest(parsed.error.issues); + const pid = parsed.data.productId || getRequestProductId(req); + await coordinator.heartbeat({ + productId: pid, + factoryId: parsed.data.factoryId, + descriptor: parsed.data.descriptor, + capabilities: parsed.data.capabilities, + health: parsed.data.health, + load: parsed.data.load, + seatLimit: parsed.data.seatLimit, + }); + const factory = await repo.getFactory(parsed.data.factoryId, pid); + return { ok: true, factory }; + }); + + // ── Runs ── + app.get('/fleet/jobs/:id/runs', async req => { + await extractAuth(req); + const { id } = req.params as { id: string }; + const runs = await repo.listRunsByJob(id); + return { runs }; + }); + + // ── Events ── + app.get('/fleet/jobs/:id/events', async req => { + await extractAuth(req); + const { id } = req.params as { id: string }; + const events = await repo.listEvents(id); + return { events }; + }); +} diff --git a/services/platform-service/src/modules/fleet/types.test.ts b/services/platform-service/src/modules/fleet/types.test.ts new file mode 100644 index 00000000..5cdb93ff --- /dev/null +++ b/services/platform-service/src/modules/fleet/types.test.ts @@ -0,0 +1,219 @@ +/** + * Fleet schema validation — valid docs pass; missing productId / bad enum fail. + */ + +import { describe, it, expect } from 'vitest'; +import { + FleetJobDocSchema, + FleetRunDocSchema, + FleetLeaseDocSchema, + FleetFactoryDocSchema, + FleetProfileDocSchema, + FleetEventDocSchema, + FleetArtifactDocSchema, + SubmitJobSchema, + PatchJobSchema, + ClaimSchema, + FLEET_STAGES, + FLEET_PRIORITIES, +} from './types.js'; + +const now = '2026-05-30T00:00:00.000Z'; + +const validJob = { + id: 'fjob_1', + productId: 'lysnrai', + stage: 'queued', + idempotencyKey: 'task-1', + contentHash: 'abc', + bodyMd: '# task', + manifestSnapshot: { + priority: 'medium', + capabilities: [], + prefersEngine: [], + allowedScope: [], + deps: [], + }, + priority: 'medium', + priorityOrder: 2, + capabilities: [], + deps: [], + kind: 'leaf', + attempts: 0, + leaseEpoch: 0, + rev: 0, + createdAt: now, + updatedAt: now, +}; + +describe('FleetJobDocSchema', () => { + it('accepts a valid job', () => { + expect(FleetJobDocSchema.safeParse(validJob).success).toBe(true); + }); + it('rejects a missing productId', () => { + const { productId: _omit, ...bad } = validJob; + expect(FleetJobDocSchema.safeParse(bad).success).toBe(false); + }); + it('rejects an empty productId', () => { + expect(FleetJobDocSchema.safeParse({ ...validJob, productId: '' }).success).toBe(false); + }); + it('rejects an invalid stage enum', () => { + expect(FleetJobDocSchema.safeParse({ ...validJob, stage: 'done' }).success).toBe(false); + }); + it('rejects an invalid priority enum', () => { + expect(FleetJobDocSchema.safeParse({ ...validJob, priority: 'urgent' }).success).toBe(false); + }); +}); + +describe('FleetRunDocSchema', () => { + const validRun = { + id: 'fjob_1:run:1', + productId: 'lysnrai', + jobId: 'fjob_1', + attempt: 1, + engine: 'devin', + startedAt: now, + insights: { tokensIn: 10, tokensOut: 5 }, + }; + it('accepts a valid run', () => { + expect(FleetRunDocSchema.safeParse(validRun).success).toBe(true); + }); + it('rejects missing productId', () => { + const { productId: _o, ...bad } = validRun; + expect(FleetRunDocSchema.safeParse(bad).success).toBe(false); + }); + it('rejects attempt <= 0', () => { + expect(FleetRunDocSchema.safeParse({ ...validRun, attempt: 0 }).success).toBe(false); + }); +}); + +describe('FleetLeaseDocSchema', () => { + const validLease = { + id: 'fjob_1', + productId: 'lysnrai', + jobId: 'fjob_1', + leaseEpoch: 1, + renewals: 0, + status: 'held', + rev: 0, + updatedAt: now, + }; + it('accepts a valid lease', () => { + expect(FleetLeaseDocSchema.safeParse(validLease).success).toBe(true); + }); + it('rejects an invalid status', () => { + expect(FleetLeaseDocSchema.safeParse({ ...validLease, status: 'open' }).success).toBe(false); + }); + it('rejects missing productId', () => { + const { productId: _o, ...bad } = validLease; + expect(FleetLeaseDocSchema.safeParse(bad).success).toBe(false); + }); +}); + +describe('FleetFactoryDocSchema', () => { + const validFactory = { + id: 'fac_1', + productId: 'lysnrai', + factoryId: 'fac_1', + descriptor: { os: 'mac' }, + capabilities: ['os:mac'], + health: 'ok', + load: 0, + seatLimit: 2, + lastHeartbeatAt: now, + }; + it('accepts a valid factory', () => { + expect(FleetFactoryDocSchema.safeParse(validFactory).success).toBe(true); + }); + it('rejects invalid health enum', () => { + expect(FleetFactoryDocSchema.safeParse({ ...validFactory, health: 'sick' }).success).toBe( + false + ); + }); +}); + +describe('FleetProfileDocSchema / FleetEventDocSchema / FleetArtifactDocSchema', () => { + it('accepts a valid profile and rejects missing productId', () => { + const valid = { + id: 'prof_1', + productId: 'lysnrai', + name: 'backend', + version: 1, + snapshot: {}, + createdAt: now, + }; + expect(FleetProfileDocSchema.safeParse(valid).success).toBe(true); + const { productId: _o, ...bad } = valid; + expect(FleetProfileDocSchema.safeParse(bad).success).toBe(false); + }); + it('accepts a valid event and rejects missing type', () => { + const valid = { + id: 'fjob_1:evt:0', + productId: 'lysnrai', + jobId: 'fjob_1', + seq: 0, + type: 'submitted', + at: now, + data: {}, + }; + expect(FleetEventDocSchema.safeParse(valid).success).toBe(true); + const { type: _t, ...bad } = valid; + expect(FleetEventDocSchema.safeParse(bad).success).toBe(false); + }); + it('accepts a valid artifact and rejects missing blobUrl', () => { + const valid = { + id: 'art_1', + productId: 'lysnrai', + jobId: 'fjob_1', + kind: 'coverage', + blobUrl: 'https://b/x', + createdAt: now, + }; + expect(FleetArtifactDocSchema.safeParse(valid).success).toBe(true); + const { blobUrl: _b, ...bad } = valid; + expect(FleetArtifactDocSchema.safeParse(bad).success).toBe(false); + }); +}); + +describe('request schemas', () => { + it('SubmitJobSchema applies defaults', () => { + const parsed = SubmitJobSchema.safeParse({ idempotencyKey: 'k', bodyMd: '# do' }); + expect(parsed.success).toBe(true); + if (parsed.success) { + expect(parsed.data.priority).toBe('medium'); + expect(parsed.data.kind).toBe('leaf'); + expect(parsed.data.deps).toEqual([]); + } + }); + it('SubmitJobSchema rejects empty bodyMd / idempotencyKey', () => { + expect(SubmitJobSchema.safeParse({ idempotencyKey: '', bodyMd: 'x' }).success).toBe(false); + expect(SubmitJobSchema.safeParse({ idempotencyKey: 'k', bodyMd: '' }).success).toBe(false); + }); + it('PatchJobSchema requires leaseEpoch', () => { + expect(PatchJobSchema.safeParse({ stage: 'building' }).success).toBe(false); + expect(PatchJobSchema.safeParse({ leaseEpoch: 1, stage: 'building' }).success).toBe(true); + }); + it('ClaimSchema requires factoryId', () => { + expect(ClaimSchema.safeParse({ capabilities: [] }).success).toBe(false); + expect(ClaimSchema.safeParse({ factoryId: 'fac_1' }).success).toBe(true); + }); +}); + +describe('enum constants', () => { + it('stages match the agent-queue lifecycle', () => { + expect(FLEET_STAGES).toEqual([ + 'queued', + 'blocked', + 'assigned', + 'building', + 'review', + 'testing', + 'shipped', + 'failed', + 'dead_letter', + ]); + }); + it('priorities', () => { + expect(FLEET_PRIORITIES).toEqual(['critical', 'high', 'medium', 'low']); + }); +}); diff --git a/services/platform-service/src/modules/fleet/types.ts b/services/platform-service/src/modules/fleet/types.ts new file mode 100644 index 00000000..f20c859f --- /dev/null +++ b/services/platform-service/src/modules/fleet/types.ts @@ -0,0 +1,317 @@ +/** + * Fleet module — types for the agent-gigafactory coordinator (Phase 2 foundation). + * + * Durable data model for distributed agent jobs: jobs, runs, leases, factories, + * profiles, events, and artifacts. Product-agnostic — every document carries a + * `productId`. Zod schemas are the source of truth; doc/input types are inferred + * from them (no `any`). + * + * Field names + lifecycle mirror the agent-queue gigafactory spec + * (../learning_ai_devops_tools/agent-queue/docs/GIGAFACTORY_ROADMAP.md §4/§7/§8/§13). + */ + +import { z } from 'zod'; + +// ── Enums ─────────────────────────────────────────────────────────────────── + +/** Canonical job lifecycle (§11). `blocked` = unmet deps; `dead_letter` = retries exhausted. */ +export const FLEET_STAGES = [ + 'queued', + 'blocked', + 'assigned', + 'building', + 'review', + 'testing', + 'shipped', + 'failed', + 'dead_letter', +] as const; +export type FleetStage = (typeof FLEET_STAGES)[number]; + +/** Stages from which a worker may still progress / that count as "in flight". */ +export const ACTIVE_STAGES: readonly FleetStage[] = ['assigned', 'building', 'review', 'testing']; +/** Stages that satisfy a hard dependency / a soft dependency. */ +export const DEP_DONE_HARD: readonly FleetStage[] = ['shipped']; +export const DEP_DONE_SOFT: readonly FleetStage[] = ['shipped', 'testing']; + +export const FLEET_PRIORITIES = ['critical', 'high', 'medium', 'low'] as const; +export type FleetPriority = (typeof FLEET_PRIORITIES)[number]; +export const PRIORITY_ORDER: Record = { + critical: 0, + high: 1, + medium: 2, + low: 3, +}; + +export const FLEET_ENGINE_CLASSES = ['agentic-coder', 'chat-coder', 'review-only'] as const; +export type FleetEngineClass = (typeof FLEET_ENGINE_CLASSES)[number]; + +export const DEPS_MODES = ['hard', 'soft'] as const; +export type DepsMode = (typeof DEPS_MODES)[number]; + +/** Failure classes a `retry.on` may name (mirrors agent-queue). */ +export const RETRY_CLASSES = ['timeout', 'verify_failed', 'crash', 'agent_error'] as const; +export type RetryClass = (typeof RETRY_CLASSES)[number]; + +/** Terminal run results recorded on a FleetRunDoc. */ +export const RUN_RESULTS = [ + 'review', + 'testing', + 'shipped', + 'failed', + 'timeout', + 'verify_failed', + 'capability_mismatch', + 'no_engine', + 'retries_exhausted', +] as const; +export type RunResult = (typeof RUN_RESULTS)[number]; + +export const FACTORY_HEALTH = ['ok', 'degraded', 'down'] as const; +export type FactoryHealth = (typeof FACTORY_HEALTH)[number]; + +export const LEASE_STATUS = ['held', 'expired', 'released'] as const; +export type LeaseStatus = (typeof LEASE_STATUS)[number]; + +export const JOB_KINDS = ['leaf', 'composite'] as const; +export type JobKind = (typeof JOB_KINDS)[number]; + +// ── Shared value objects ───────────────────────────────────────────────────── + +export const CheckpointSchema = z.object({ + wipBranch: z.string(), + wipBase: z.string().optional(), + wipCommit: z.string().optional(), +}); + +export const BudgetSchema = z.object({ + usd: z.number().nonnegative().optional(), + tokens: z.number().nonnegative().optional(), + wall: z.number().nonnegative().optional(), +}); + +export const RetryPolicySchema = z.object({ + max: z.number().int().nonnegative(), + backoff: z.string().optional(), + on: z.array(z.enum(RETRY_CLASSES)).default([]), +}); + +export const ManifestSnapshotSchema = z.object({ + priority: z.enum(FLEET_PRIORITIES).default('medium'), + capabilities: z.array(z.string()).default([]), + engineClass: z.enum(FLEET_ENGINE_CLASSES).optional(), + profile: z.string().optional(), + prefersEngine: z.array(z.string()).default([]), + allowedScope: z.array(z.string()).default([]), + deps: z.array(z.string()).default([]), + depsMode: z.enum(DEPS_MODES).optional(), + budget: BudgetSchema.optional(), + retry: RetryPolicySchema.optional(), +}); + +export const InsightsSchema = z.object({ + model: z.string().optional(), + tokensIn: z.number().optional(), + tokensOut: z.number().optional(), + tokensCached: z.number().optional(), + costUsd: z.number().optional(), + estimated: z.boolean().optional(), + turns: z.number().optional(), + toolCalls: z.number().optional(), + filesChanged: z.number().optional(), + linesAdded: z.number().optional(), + linesDeleted: z.number().optional(), +}); + +// ── Container documents ─────────────────────────────────────────────────────── + +/** + * FleetJobDoc — the durable job (pk `/productId`). + * `rev` is the optimistic-concurrency token used by the coordinator's atomic + * claim + fenced transitions (maps to Cosmos `_etag` / If-Match in production; + * see repository.revUpdate). + */ +export const FleetJobDocSchema = z.object({ + id: z.string(), + productId: z.string().min(1), + stage: z.enum(FLEET_STAGES), + idempotencyKey: z.string().min(1), + contentHash: z.string(), + bodyMd: z.string(), + manifestSnapshot: ManifestSnapshotSchema, + priority: z.enum(FLEET_PRIORITIES), + priorityOrder: z.number().int(), + capabilities: z.array(z.string()).default([]), + engineClass: z.enum(FLEET_ENGINE_CLASSES).optional(), + profile: z.string().optional(), + deps: z.array(z.string()).default([]), + depsMode: z.enum(DEPS_MODES).optional(), + budget: BudgetSchema.optional(), + retry: RetryPolicySchema.optional(), + kind: z.enum(JOB_KINDS).default('leaf'), + parentId: z.string().optional(), + trackerItemId: z.string().optional(), + checkpoint: CheckpointSchema.optional(), + attempts: z.number().int().nonnegative().default(0), + leaseEpoch: z.number().int().nonnegative().default(0), + rev: z.number().int().nonnegative().default(0), + blockedReason: z.string().optional(), + createdAt: z.string(), + updatedAt: z.string(), +}); +export type FleetJobDoc = z.infer; + +/** FleetRunDoc — one execution attempt of a job (pk `/jobId`). */ +export const FleetRunDocSchema = z.object({ + id: z.string(), + productId: z.string().min(1), + jobId: z.string().min(1), + attempt: z.number().int().positive(), + factoryId: z.string().optional(), + engine: z.string(), + profileSnapshot: z.record(z.string(), z.unknown()).optional(), + startedAt: z.string(), + endedAt: z.string().optional(), + exit: z.number().int().optional(), + verifyResult: z.enum(['pass', 'fail']).optional(), + result: z.enum(RUN_RESULTS).optional(), + insights: InsightsSchema.default({}), +}); +export type FleetRunDoc = z.infer; + +/** FleetLeaseDoc — the single-holder lease for a job (pk `/jobId`). */ +export const FleetLeaseDocSchema = z.object({ + id: z.string(), + productId: z.string().min(1), + jobId: z.string().min(1), + holderFactoryId: z.string().optional(), + expiresAt: z.string().optional(), + leaseEpoch: z.number().int().nonnegative().default(0), + renewals: z.number().int().nonnegative().default(0), + status: z.enum(LEASE_STATUS).default('held'), + rev: z.number().int().nonnegative().default(0), + updatedAt: z.string(), +}); +export type FleetLeaseDoc = z.infer; + +/** FleetFactoryDoc — a registered worker host (pk `/productId`). */ +export const FleetFactoryDocSchema = z.object({ + id: z.string(), + productId: z.string().min(1), + factoryId: z.string().min(1), + descriptor: z.record(z.string(), z.unknown()).default({}), + capabilities: z.array(z.string()).default([]), + health: z.enum(FACTORY_HEALTH).default('ok'), + load: z.number().nonnegative().default(0), + seatLimit: z.number().int().positive().default(1), + lastHeartbeatAt: z.string(), +}); +export type FleetFactoryDoc = z.infer; + +/** FleetProfileDoc — an immutable, versioned profile snapshot (pk `/productId`). */ +export const FleetProfileDocSchema = z.object({ + id: z.string(), + productId: z.string().min(1), + name: z.string().min(1), + version: z.number().int().positive(), + snapshot: z.record(z.string(), z.unknown()), + createdAt: z.string(), +}); +export type FleetProfileDoc = z.infer; + +/** FleetEventDoc — append-only audit/event stream entry (pk `/jobId`). */ +export const FleetEventDocSchema = z.object({ + id: z.string(), + productId: z.string().min(1), + jobId: z.string().min(1), + seq: z.number().int().nonnegative(), + type: z.string().min(1), + at: z.string(), + actor: z.string().optional(), + data: z.record(z.string(), z.unknown()).default({}), +}); +export type FleetEventDoc = z.infer; + +/** FleetArtifactDoc — pointer to a blob-stored artifact (pk `/jobId`). No inline logs. */ +export const FleetArtifactDocSchema = z.object({ + id: z.string(), + productId: z.string().min(1), + jobId: z.string().min(1), + runId: z.string().optional(), + kind: z.string().min(1), + blobUrl: z.string().min(1), + sizeBytes: z.number().int().nonnegative().optional(), + createdAt: z.string(), +}); +export type FleetArtifactDoc = z.infer; + +// ── API request schemas (routes) ────────────────────────────────────────────── + +export const SubmitJobSchema = z.object({ + productId: z.string().min(1).optional(), + idempotencyKey: z.string().min(1), + bodyMd: z.string().min(1), + priority: z.enum(FLEET_PRIORITIES).default('medium'), + capabilities: z.array(z.string()).default([]), + engineClass: z.enum(FLEET_ENGINE_CLASSES).optional(), + profile: z.string().optional(), + prefersEngine: z.array(z.string()).default([]), + allowedScope: z.array(z.string()).default([]), + deps: z.array(z.string()).default([]), + depsMode: z.enum(DEPS_MODES).optional(), + budget: BudgetSchema.optional(), + retry: RetryPolicySchema.optional(), + kind: z.enum(JOB_KINDS).default('leaf'), + parentId: z.string().optional(), + trackerItemId: z.string().optional(), +}); +export type SubmitJobInput = z.infer; + +export const ListJobsQuerySchema = z.object({ + productId: z.string().optional(), + stage: z.enum(FLEET_STAGES).optional(), + idempotencyKey: z.string().optional(), + limit: z.coerce.number().int().min(1).max(200).default(50), + offset: z.coerce.number().int().min(0).default(0), +}); +export type ListJobsQuery = z.infer; + +/** Fenced state transition — worker MUST carry its leaseEpoch. */ +export const PatchJobSchema = z.object({ + leaseEpoch: z.number().int().nonnegative(), + stage: z.enum(FLEET_STAGES).optional(), + checkpoint: CheckpointSchema.optional(), + blockedReason: z.string().optional(), +}); +export type PatchJobInput = z.infer; + +export const ClaimSchema = z.object({ + productId: z.string().min(1).optional(), + factoryId: z.string().min(1), + capabilities: z.array(z.string()).default([]), + leaseSeconds: z.number().int().positive().max(86400).default(900), +}); +export type ClaimInput = z.infer; + +export const RenewLeaseSchema = z.object({ + leaseEpoch: z.number().int().nonnegative(), + leaseSeconds: z.number().int().positive().max(86400).default(900), +}); +export type RenewLeaseInput = z.infer; + +export const ReleaseLeaseSchema = z.object({ + leaseEpoch: z.number().int().nonnegative(), + stage: z.enum(FLEET_STAGES).optional(), +}); +export type ReleaseLeaseInput = z.infer; + +export const HeartbeatSchema = z.object({ + productId: z.string().min(1).optional(), + factoryId: z.string().min(1), + descriptor: z.record(z.string(), z.unknown()).optional(), + capabilities: z.array(z.string()).optional(), + health: z.enum(FACTORY_HEALTH).optional(), + load: z.number().nonnegative().optional(), + seatLimit: z.number().int().positive().optional(), +}); +export type HeartbeatInput = z.infer; diff --git a/services/platform-service/src/server.ts b/services/platform-service/src/server.ts index 9d0546ae..13303694 100644 --- a/services/platform-service/src/server.ts +++ b/services/platform-service/src/server.ts @@ -63,6 +63,7 @@ import { licenseRoutes } from './modules/licenses/routes.js'; import { stripeRoutes } from './modules/stripe/routes.js'; import { settingsRoutes } from './modules/settings/routes.js'; import { itemRoutes } from './modules/items/routes.js'; +import { fleetRoutes } from './modules/fleet/routes.js'; import { commentRoutes } from './modules/comments/routes.js'; import { voteRoutes } from './modules/votes/routes.js'; import { publicRoutes } from './modules/public/routes.js'; @@ -207,6 +208,8 @@ await app.register(stripeRoutes, { prefix: '/api' }); await app.register(settingsRoutes, { prefix: '/api' }); // Tracker modules (merged from tracker-service) await app.register(itemRoutes, { prefix: '/api' }); +// Agent Gigafactory — fleet coordinator (jobs/claim/lease/fence/reaper) +await app.register(fleetRoutes, { prefix: '/api' }); await app.register(commentRoutes, { prefix: '/api' }); await app.register(voteRoutes, { prefix: '/api' }); // API tokens module