diff --git a/services/platform-service/src/modules/fleet/repository.test.ts b/services/platform-service/src/modules/fleet/repository.test.ts new file mode 100644 index 00000000..cdf5b6b5 --- /dev/null +++ b/services/platform-service/src/modules/fleet/repository.test.ts @@ -0,0 +1,187 @@ +/** + * Fleet repository — CRUD round-trips, list filters, appendEvent ordering, and + * the rev compare-and-swap. Runs on the in-memory datastore provider. + */ + +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { MemoryDatastoreProvider } from '@bytelyst/datastore'; +import { _resetDatastoreProvider, setProvider } from '../../lib/datastore.js'; +import * as repo from './repository.js'; +import type { FleetJobDoc } from './types.js'; + +const PID = 'lysnrai'; +const now = '2026-05-30T00:00:00.000Z'; + +function jobDoc(over: Partial = {}): FleetJobDoc { + return { + id: 'fjob_1', + productId: PID, + stage: 'queued', + idempotencyKey: 'task-1', + contentHash: 'h1', + bodyMd: '# task', + manifestSnapshot: { + priority: 'medium', + capabilities: [], + prefersEngine: [], + allowedScope: [], + deps: [], + }, + priority: 'medium', + priorityOrder: 2, + capabilities: [], + deps: [], + kind: 'leaf', + attempts: 0, + leaseEpoch: 0, + rev: 0, + createdAt: now, + updatedAt: now, + ...over, + }; +} + +describe('fleet repository', () => { + beforeEach(() => setProvider(new MemoryDatastoreProvider())); + afterEach(() => _resetDatastoreProvider()); + + it('jobs: create / getById / list by stage + idempotencyKey', async () => { + await repo.createJob(jobDoc({ id: 'fjob_1', idempotencyKey: 'a', stage: 'queued' })); + await repo.createJob(jobDoc({ id: 'fjob_2', idempotencyKey: 'b', stage: 'blocked' })); + + expect((await repo.getJob('fjob_1', PID))?.idempotencyKey).toBe('a'); + expect(await repo.getJob('missing', PID)).toBeNull(); + + const queued = await repo.listJobs({ productId: PID, stage: 'queued' }); + expect(queued.map(j => j.id)).toEqual(['fjob_1']); + + const byKey = await repo.findJobsByIdempotencyKey(PID, 'b'); + expect(byKey).toHaveLength(1); + expect(byKey[0].id).toBe('fjob_2'); + }); + + it('jobs: revUpdate is a compare-and-swap', async () => { + await repo.createJob(jobDoc({ id: 'fjob_cas', rev: 0, stage: 'queued' })); + + const first = await repo.revUpdateJob('fjob_cas', PID, 0, { stage: 'assigned' }); + expect(first.ok).toBe(true); + if (first.ok) expect(first.doc.rev).toBe(1); + + // a second writer using the stale rev (0) must conflict + const stale = await repo.revUpdateJob('fjob_cas', PID, 0, { stage: 'building' }); + expect(stale.ok).toBe(false); + if (!stale.ok) expect(stale.reason).toBe('conflict'); + + const missing = await repo.revUpdateJob('nope', PID, 0, { stage: 'failed' }); + expect(missing.ok).toBe(false); + if (!missing.ok) expect(missing.reason).toBe('not_found'); + }); + + it('runs: create + list ordered by attempt', async () => { + await repo.createRun({ + id: 'r2', + productId: PID, + jobId: 'j', + attempt: 2, + engine: 'devin', + startedAt: now, + insights: {}, + }); + await repo.createRun({ + id: 'r1', + productId: PID, + jobId: 'j', + attempt: 1, + engine: 'devin', + startedAt: now, + insights: {}, + }); + const runs = await repo.listRunsByJob('j'); + expect(runs.map(r => r.attempt)).toEqual([1, 2]); + }); + + it('leases: create + revUpdate', async () => { + await repo.createLease({ + id: 'j', + productId: PID, + jobId: 'j', + leaseEpoch: 1, + renewals: 0, + status: 'held', + rev: 0, + updatedAt: now, + }); + const res = await repo.revUpdateLease('j', 0, { renewals: 1 }); + expect(res.ok).toBe(true); + if (res.ok) expect(res.doc.renewals).toBe(1); + expect((await repo.getLease('j'))?.rev).toBe(1); + }); + + it('factories: upsert + list by productId', async () => { + await repo.upsertFactory({ + id: 'fac_1', + productId: PID, + factoryId: 'fac_1', + descriptor: {}, + capabilities: ['os:mac'], + health: 'ok', + load: 0, + seatLimit: 1, + lastHeartbeatAt: now, + }); + await repo.upsertFactory({ + id: 'fac_1', + productId: PID, + factoryId: 'fac_1', + descriptor: {}, + capabilities: ['os:mac'], + health: 'degraded', + load: 2, + seatLimit: 1, + lastHeartbeatAt: now, + }); + const list = await repo.listFactories(PID); + expect(list).toHaveLength(1); + expect(list[0].health).toBe('degraded'); + }); + + it('profiles: create + get', async () => { + await repo.createProfile({ + id: 'prof_1', + productId: PID, + name: 'backend', + version: 1, + snapshot: { persona: 'x' }, + createdAt: now, + }); + expect((await repo.getProfile('prof_1', PID))?.name).toBe('backend'); + }); + + it('events: appendEvent yields an ordered, append-only stream', async () => { + await repo.appendEvent({ jobId: 'j', productId: PID, type: 'submitted' }); + await repo.appendEvent({ jobId: 'j', productId: PID, type: 'assigned', actor: 'fac_1' }); + await repo.appendEvent({ + jobId: 'j', + productId: PID, + type: 'transition', + data: { stage: 'building' }, + }); + const events = await repo.listEvents('j'); + expect(events.map(e => e.seq)).toEqual([0, 1, 2]); + expect(events.map(e => e.type)).toEqual(['submitted', 'assigned', 'transition']); + }); + + it('artifacts: create + list', async () => { + await repo.createArtifact({ + id: 'art_1', + productId: PID, + jobId: 'j', + kind: 'coverage', + blobUrl: 'https://b/x', + createdAt: now, + }); + const arts = await repo.listArtifacts('j'); + expect(arts).toHaveLength(1); + expect(arts[0].blobUrl).toBe('https://b/x'); + }); +}); diff --git a/services/platform-service/src/modules/fleet/repository.ts b/services/platform-service/src/modules/fleet/repository.ts new file mode 100644 index 00000000..8f2cee4a --- /dev/null +++ b/services/platform-service/src/modules/fleet/repository.ts @@ -0,0 +1,257 @@ +/** + * Fleet repositories — one per fleet_* container, cloud-agnostic via @bytelyst/datastore. + * + * Partition keys (see lib/cosmos-init.ts): + * fleet_jobs /productId fleet_runs /jobId + * fleet_leases /jobId fleet_factories /productId + * fleet_profiles /productId fleet_events /jobId + * fleet_artifacts /jobId + * + * Optimistic concurrency: jobs and leases carry a monotonic `rev` token. + * `revUpdate*` is a compare-and-swap — it writes only when the stored `rev` + * still equals the caller's expected `rev`, otherwise it reports a conflict + * WITHOUT writing. In production (Cosmos) this maps to an `_etag` / If-Match + * conditional replace; on the memory provider it is enforced by re-reading the + * current `rev` immediately before the write, which is exact for the sequential + * calls the coordinator + tests make (see coordinator.claimNextJob). + */ + +import type { DocumentCollection } from '@bytelyst/datastore'; +import { getCollection } from '../../lib/datastore.js'; +import type { + FleetArtifactDoc, + FleetEventDoc, + FleetFactoryDoc, + FleetJobDoc, + FleetLeaseDoc, + FleetProfileDoc, + FleetRunDoc, + FleetStage, +} from './types.js'; + +// ── Collections ─────────────────────────────────────────────────────────────── + +function jobs(): DocumentCollection { + return getCollection('fleet_jobs', '/productId'); +} +function runs(): DocumentCollection { + return getCollection('fleet_runs', '/jobId'); +} +function leases(): DocumentCollection { + return getCollection('fleet_leases', '/jobId'); +} +function factories(): DocumentCollection { + return getCollection('fleet_factories', '/productId'); +} +function profiles(): DocumentCollection { + return getCollection('fleet_profiles', '/productId'); +} +function events(): DocumentCollection { + return getCollection('fleet_events', '/jobId'); +} +function artifacts(): DocumentCollection { + return getCollection('fleet_artifacts', '/jobId'); +} + +/** Result of a compare-and-swap update. */ +export type RevResult = { ok: true; doc: T } | { ok: false; reason: 'not_found' | 'conflict' }; + +// ── Jobs ────────────────────────────────────────────────────────────────────── + +export async function createJob(doc: FleetJobDoc): Promise { + return jobs().create(doc); +} + +export async function getJob(id: string, productId: string): Promise { + return jobs().findById(id, productId); +} + +export interface ListJobsFilter { + productId: string; + stage?: FleetStage; + idempotencyKey?: string; + limit?: number; + offset?: number; +} + +export async function listJobs(f: ListJobsFilter): Promise { + const filter: Record = { productId: f.productId }; + if (f.stage) filter.stage = f.stage; + if (f.idempotencyKey) filter.idempotencyKey = f.idempotencyKey; + return jobs().findMany({ + filter, + sort: { priorityOrder: 1, createdAt: 1 }, + offset: f.offset, + limit: f.limit, + }); +} + +/** All jobs for a product sharing an idempotency-key (dedupe lookups). */ +export async function findJobsByIdempotencyKey( + productId: string, + idempotencyKey: string +): Promise { + return jobs().findMany({ filter: { productId, idempotencyKey } }); +} + +/** Unconditional merge update (use only when concurrency is not contended). */ +export async function updateJob( + id: string, + productId: string, + updates: Partial +): Promise { + const cur = await jobs().findById(id, productId); + if (!cur) return null; + return jobs().update(id, productId, { + ...updates, + rev: (cur.rev ?? 0) + 1, + updatedAt: new Date().toISOString(), + }); +} + +/** Compare-and-swap on `rev` — the atomic-claim / fenced-transition primitive. */ +export async function revUpdateJob( + id: string, + productId: string, + expectedRev: number, + updates: Partial +): Promise> { + const cur = await jobs().findById(id, productId); + if (!cur) return { ok: false, reason: 'not_found' }; + if ((cur.rev ?? 0) !== expectedRev) return { ok: false, reason: 'conflict' }; + const doc = await jobs().update(id, productId, { + ...updates, + rev: expectedRev + 1, + updatedAt: new Date().toISOString(), + }); + return { ok: true, doc }; +} + +export async function deleteJob(id: string, productId: string): Promise { + await jobs().delete(id, productId); +} + +// ── Runs ──────────────────────────────────────────────────────────────────── + +export async function createRun(doc: FleetRunDoc): Promise { + return runs().create(doc); +} + +export async function updateRun( + id: string, + jobId: string, + updates: Partial +): Promise { + const cur = await runs().findById(id, jobId); + if (!cur) return null; + return runs().update(id, jobId, updates); +} + +export async function listRunsByJob(jobId: string): Promise { + return runs().findMany({ filter: { jobId }, sort: { attempt: 1 } }); +} + +// ── Leases ────────────────────────────────────────────────────────────────── + +export async function getLease(jobId: string): Promise { + return leases().findById(jobId, jobId); +} + +export async function createLease(doc: FleetLeaseDoc): Promise { + return leases().create(doc); +} + +export async function revUpdateLease( + jobId: string, + expectedRev: number, + updates: Partial +): Promise> { + const cur = await leases().findById(jobId, jobId); + if (!cur) return { ok: false, reason: 'not_found' }; + if ((cur.rev ?? 0) !== expectedRev) return { ok: false, reason: 'conflict' }; + const doc = await leases().update(jobId, jobId, { + ...updates, + rev: expectedRev + 1, + updatedAt: new Date().toISOString(), + }); + return { ok: true, doc }; +} + +export async function listExpiredLeases(nowIso: string): Promise { + return leases().findMany({ + filter: { status: 'held', expiresAt: { $lt: nowIso } }, + }); +} + +// ── Factories ───────────────────────────────────────────────────────────────── + +export async function getFactory( + factoryId: string, + productId: string +): Promise { + return factories().findById(factoryId, productId); +} + +export async function upsertFactory(doc: FleetFactoryDoc): Promise { + return factories().upsert(doc); +} + +export async function listFactories(productId: string): Promise { + return factories().findMany({ filter: { productId } }); +} + +// ── Profiles ──────────────────────────────────────────────────────────────── + +export async function createProfile(doc: FleetProfileDoc): Promise { + return profiles().create(doc); +} + +export async function getProfile(id: string, productId: string): Promise { + return profiles().findById(id, productId); +} + +export async function listProfiles(productId: string): Promise { + return profiles().findMany({ filter: { productId } }); +} + +// ── Events (append-only) ──────────────────────────────────────────────────── + +export interface AppendEventInput { + jobId: string; + productId: string; + type: string; + actor?: string; + data?: Record; +} + +/** Append an ordered event to a job's stream; `seq` is monotonic per job. */ +export async function appendEvent(input: AppendEventInput): Promise { + const existing = await events().findMany({ filter: { jobId: input.jobId } }); + const seq = existing.length; + const doc: FleetEventDoc = { + id: `${input.jobId}:evt:${seq}`, + productId: input.productId, + jobId: input.jobId, + seq, + type: input.type, + at: new Date().toISOString(), + actor: input.actor, + data: input.data ?? {}, + }; + return events().create(doc); +} + +export async function listEvents(jobId: string): Promise { + const docs = await events().findMany({ filter: { jobId }, sort: { seq: 1 } }); + return docs; +} + +// ── Artifacts ───────────────────────────────────────────────────────────────── + +export async function createArtifact(doc: FleetArtifactDoc): Promise { + return artifacts().create(doc); +} + +export async function listArtifacts(jobId: string): Promise { + return artifacts().findMany({ filter: { jobId }, sort: { createdAt: 1 } }); +}