feat(platform-service): fleet repositories with rev compare-and-swap (P2 foundation)

One repository per fleet_* container on the @bytelyst/datastore abstraction
(memory + cosmos): create/getById/list (by productId, stage, idempotencyKey),
partition-aware single-partition queries, ordered append-only appendEvent, and
runs/leases/factories/profiles/artifacts CRUD. Adds revUpdateJob/revUpdateLease —
a `rev`-token compare-and-swap that writes only when the stored rev still matches
(the optimistic-concurrency primitive for atomic claim + fenced transitions;
maps to Cosmos _etag/If-Match in production).
This commit is contained in:
saravanakumardb1 2026-05-29 20:20:15 -07:00
parent 721d3fcb48
commit fada354df8
2 changed files with 444 additions and 0 deletions

View File

@ -0,0 +1,187 @@
/**
* Fleet repository CRUD round-trips, list filters, appendEvent ordering, and
* the rev compare-and-swap. Runs on the in-memory datastore provider.
*/
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { MemoryDatastoreProvider } from '@bytelyst/datastore';
import { _resetDatastoreProvider, setProvider } from '../../lib/datastore.js';
import * as repo from './repository.js';
import type { FleetJobDoc } from './types.js';
const PID = 'lysnrai';
const now = '2026-05-30T00:00:00.000Z';
function jobDoc(over: Partial<FleetJobDoc> = {}): FleetJobDoc {
return {
id: 'fjob_1',
productId: PID,
stage: 'queued',
idempotencyKey: 'task-1',
contentHash: 'h1',
bodyMd: '# task',
manifestSnapshot: {
priority: 'medium',
capabilities: [],
prefersEngine: [],
allowedScope: [],
deps: [],
},
priority: 'medium',
priorityOrder: 2,
capabilities: [],
deps: [],
kind: 'leaf',
attempts: 0,
leaseEpoch: 0,
rev: 0,
createdAt: now,
updatedAt: now,
...over,
};
}
describe('fleet repository', () => {
beforeEach(() => setProvider(new MemoryDatastoreProvider()));
afterEach(() => _resetDatastoreProvider());
it('jobs: create / getById / list by stage + idempotencyKey', async () => {
await repo.createJob(jobDoc({ id: 'fjob_1', idempotencyKey: 'a', stage: 'queued' }));
await repo.createJob(jobDoc({ id: 'fjob_2', idempotencyKey: 'b', stage: 'blocked' }));
expect((await repo.getJob('fjob_1', PID))?.idempotencyKey).toBe('a');
expect(await repo.getJob('missing', PID)).toBeNull();
const queued = await repo.listJobs({ productId: PID, stage: 'queued' });
expect(queued.map(j => j.id)).toEqual(['fjob_1']);
const byKey = await repo.findJobsByIdempotencyKey(PID, 'b');
expect(byKey).toHaveLength(1);
expect(byKey[0].id).toBe('fjob_2');
});
it('jobs: revUpdate is a compare-and-swap', async () => {
await repo.createJob(jobDoc({ id: 'fjob_cas', rev: 0, stage: 'queued' }));
const first = await repo.revUpdateJob('fjob_cas', PID, 0, { stage: 'assigned' });
expect(first.ok).toBe(true);
if (first.ok) expect(first.doc.rev).toBe(1);
// a second writer using the stale rev (0) must conflict
const stale = await repo.revUpdateJob('fjob_cas', PID, 0, { stage: 'building' });
expect(stale.ok).toBe(false);
if (!stale.ok) expect(stale.reason).toBe('conflict');
const missing = await repo.revUpdateJob('nope', PID, 0, { stage: 'failed' });
expect(missing.ok).toBe(false);
if (!missing.ok) expect(missing.reason).toBe('not_found');
});
it('runs: create + list ordered by attempt', async () => {
await repo.createRun({
id: 'r2',
productId: PID,
jobId: 'j',
attempt: 2,
engine: 'devin',
startedAt: now,
insights: {},
});
await repo.createRun({
id: 'r1',
productId: PID,
jobId: 'j',
attempt: 1,
engine: 'devin',
startedAt: now,
insights: {},
});
const runs = await repo.listRunsByJob('j');
expect(runs.map(r => r.attempt)).toEqual([1, 2]);
});
it('leases: create + revUpdate', async () => {
await repo.createLease({
id: 'j',
productId: PID,
jobId: 'j',
leaseEpoch: 1,
renewals: 0,
status: 'held',
rev: 0,
updatedAt: now,
});
const res = await repo.revUpdateLease('j', 0, { renewals: 1 });
expect(res.ok).toBe(true);
if (res.ok) expect(res.doc.renewals).toBe(1);
expect((await repo.getLease('j'))?.rev).toBe(1);
});
it('factories: upsert + list by productId', async () => {
await repo.upsertFactory({
id: 'fac_1',
productId: PID,
factoryId: 'fac_1',
descriptor: {},
capabilities: ['os:mac'],
health: 'ok',
load: 0,
seatLimit: 1,
lastHeartbeatAt: now,
});
await repo.upsertFactory({
id: 'fac_1',
productId: PID,
factoryId: 'fac_1',
descriptor: {},
capabilities: ['os:mac'],
health: 'degraded',
load: 2,
seatLimit: 1,
lastHeartbeatAt: now,
});
const list = await repo.listFactories(PID);
expect(list).toHaveLength(1);
expect(list[0].health).toBe('degraded');
});
it('profiles: create + get', async () => {
await repo.createProfile({
id: 'prof_1',
productId: PID,
name: 'backend',
version: 1,
snapshot: { persona: 'x' },
createdAt: now,
});
expect((await repo.getProfile('prof_1', PID))?.name).toBe('backend');
});
it('events: appendEvent yields an ordered, append-only stream', async () => {
await repo.appendEvent({ jobId: 'j', productId: PID, type: 'submitted' });
await repo.appendEvent({ jobId: 'j', productId: PID, type: 'assigned', actor: 'fac_1' });
await repo.appendEvent({
jobId: 'j',
productId: PID,
type: 'transition',
data: { stage: 'building' },
});
const events = await repo.listEvents('j');
expect(events.map(e => e.seq)).toEqual([0, 1, 2]);
expect(events.map(e => e.type)).toEqual(['submitted', 'assigned', 'transition']);
});
it('artifacts: create + list', async () => {
await repo.createArtifact({
id: 'art_1',
productId: PID,
jobId: 'j',
kind: 'coverage',
blobUrl: 'https://b/x',
createdAt: now,
});
const arts = await repo.listArtifacts('j');
expect(arts).toHaveLength(1);
expect(arts[0].blobUrl).toBe('https://b/x');
});
});

View File

@ -0,0 +1,257 @@
/**
* Fleet repositories one per fleet_* container, cloud-agnostic via @bytelyst/datastore.
*
* Partition keys (see lib/cosmos-init.ts):
* fleet_jobs /productId fleet_runs /jobId
* fleet_leases /jobId fleet_factories /productId
* fleet_profiles /productId fleet_events /jobId
* fleet_artifacts /jobId
*
* Optimistic concurrency: jobs and leases carry a monotonic `rev` token.
* `revUpdate*` is a compare-and-swap it writes only when the stored `rev`
* still equals the caller's expected `rev`, otherwise it reports a conflict
* WITHOUT writing. In production (Cosmos) this maps to an `_etag` / If-Match
* conditional replace; on the memory provider it is enforced by re-reading the
* current `rev` immediately before the write, which is exact for the sequential
* calls the coordinator + tests make (see coordinator.claimNextJob).
*/
import type { DocumentCollection } from '@bytelyst/datastore';
import { getCollection } from '../../lib/datastore.js';
import type {
FleetArtifactDoc,
FleetEventDoc,
FleetFactoryDoc,
FleetJobDoc,
FleetLeaseDoc,
FleetProfileDoc,
FleetRunDoc,
FleetStage,
} from './types.js';
// ── Collections ───────────────────────────────────────────────────────────────
function jobs(): DocumentCollection<FleetJobDoc> {
return getCollection<FleetJobDoc>('fleet_jobs', '/productId');
}
function runs(): DocumentCollection<FleetRunDoc> {
return getCollection<FleetRunDoc>('fleet_runs', '/jobId');
}
function leases(): DocumentCollection<FleetLeaseDoc> {
return getCollection<FleetLeaseDoc>('fleet_leases', '/jobId');
}
function factories(): DocumentCollection<FleetFactoryDoc> {
return getCollection<FleetFactoryDoc>('fleet_factories', '/productId');
}
function profiles(): DocumentCollection<FleetProfileDoc> {
return getCollection<FleetProfileDoc>('fleet_profiles', '/productId');
}
function events(): DocumentCollection<FleetEventDoc> {
return getCollection<FleetEventDoc>('fleet_events', '/jobId');
}
function artifacts(): DocumentCollection<FleetArtifactDoc> {
return getCollection<FleetArtifactDoc>('fleet_artifacts', '/jobId');
}
/** Result of a compare-and-swap update. */
export type RevResult<T> = { ok: true; doc: T } | { ok: false; reason: 'not_found' | 'conflict' };
// ── Jobs ──────────────────────────────────────────────────────────────────────
export async function createJob(doc: FleetJobDoc): Promise<FleetJobDoc> {
return jobs().create(doc);
}
export async function getJob(id: string, productId: string): Promise<FleetJobDoc | null> {
return jobs().findById(id, productId);
}
export interface ListJobsFilter {
productId: string;
stage?: FleetStage;
idempotencyKey?: string;
limit?: number;
offset?: number;
}
export async function listJobs(f: ListJobsFilter): Promise<FleetJobDoc[]> {
const filter: Record<string, string> = { productId: f.productId };
if (f.stage) filter.stage = f.stage;
if (f.idempotencyKey) filter.idempotencyKey = f.idempotencyKey;
return jobs().findMany({
filter,
sort: { priorityOrder: 1, createdAt: 1 },
offset: f.offset,
limit: f.limit,
});
}
/** All jobs for a product sharing an idempotency-key (dedupe lookups). */
export async function findJobsByIdempotencyKey(
productId: string,
idempotencyKey: string
): Promise<FleetJobDoc[]> {
return jobs().findMany({ filter: { productId, idempotencyKey } });
}
/** Unconditional merge update (use only when concurrency is not contended). */
export async function updateJob(
id: string,
productId: string,
updates: Partial<FleetJobDoc>
): Promise<FleetJobDoc | null> {
const cur = await jobs().findById(id, productId);
if (!cur) return null;
return jobs().update(id, productId, {
...updates,
rev: (cur.rev ?? 0) + 1,
updatedAt: new Date().toISOString(),
});
}
/** Compare-and-swap on `rev` — the atomic-claim / fenced-transition primitive. */
export async function revUpdateJob(
id: string,
productId: string,
expectedRev: number,
updates: Partial<FleetJobDoc>
): Promise<RevResult<FleetJobDoc>> {
const cur = await jobs().findById(id, productId);
if (!cur) return { ok: false, reason: 'not_found' };
if ((cur.rev ?? 0) !== expectedRev) return { ok: false, reason: 'conflict' };
const doc = await jobs().update(id, productId, {
...updates,
rev: expectedRev + 1,
updatedAt: new Date().toISOString(),
});
return { ok: true, doc };
}
export async function deleteJob(id: string, productId: string): Promise<void> {
await jobs().delete(id, productId);
}
// ── Runs ────────────────────────────────────────────────────────────────────
export async function createRun(doc: FleetRunDoc): Promise<FleetRunDoc> {
return runs().create(doc);
}
export async function updateRun(
id: string,
jobId: string,
updates: Partial<FleetRunDoc>
): Promise<FleetRunDoc | null> {
const cur = await runs().findById(id, jobId);
if (!cur) return null;
return runs().update(id, jobId, updates);
}
export async function listRunsByJob(jobId: string): Promise<FleetRunDoc[]> {
return runs().findMany({ filter: { jobId }, sort: { attempt: 1 } });
}
// ── Leases ──────────────────────────────────────────────────────────────────
export async function getLease(jobId: string): Promise<FleetLeaseDoc | null> {
return leases().findById(jobId, jobId);
}
export async function createLease(doc: FleetLeaseDoc): Promise<FleetLeaseDoc> {
return leases().create(doc);
}
export async function revUpdateLease(
jobId: string,
expectedRev: number,
updates: Partial<FleetLeaseDoc>
): Promise<RevResult<FleetLeaseDoc>> {
const cur = await leases().findById(jobId, jobId);
if (!cur) return { ok: false, reason: 'not_found' };
if ((cur.rev ?? 0) !== expectedRev) return { ok: false, reason: 'conflict' };
const doc = await leases().update(jobId, jobId, {
...updates,
rev: expectedRev + 1,
updatedAt: new Date().toISOString(),
});
return { ok: true, doc };
}
export async function listExpiredLeases(nowIso: string): Promise<FleetLeaseDoc[]> {
return leases().findMany({
filter: { status: 'held', expiresAt: { $lt: nowIso } },
});
}
// ── Factories ─────────────────────────────────────────────────────────────────
export async function getFactory(
factoryId: string,
productId: string
): Promise<FleetFactoryDoc | null> {
return factories().findById(factoryId, productId);
}
export async function upsertFactory(doc: FleetFactoryDoc): Promise<FleetFactoryDoc> {
return factories().upsert(doc);
}
export async function listFactories(productId: string): Promise<FleetFactoryDoc[]> {
return factories().findMany({ filter: { productId } });
}
// ── Profiles ────────────────────────────────────────────────────────────────
export async function createProfile(doc: FleetProfileDoc): Promise<FleetProfileDoc> {
return profiles().create(doc);
}
export async function getProfile(id: string, productId: string): Promise<FleetProfileDoc | null> {
return profiles().findById(id, productId);
}
export async function listProfiles(productId: string): Promise<FleetProfileDoc[]> {
return profiles().findMany({ filter: { productId } });
}
// ── Events (append-only) ────────────────────────────────────────────────────
export interface AppendEventInput {
jobId: string;
productId: string;
type: string;
actor?: string;
data?: Record<string, unknown>;
}
/** Append an ordered event to a job's stream; `seq` is monotonic per job. */
export async function appendEvent(input: AppendEventInput): Promise<FleetEventDoc> {
const existing = await events().findMany({ filter: { jobId: input.jobId } });
const seq = existing.length;
const doc: FleetEventDoc = {
id: `${input.jobId}:evt:${seq}`,
productId: input.productId,
jobId: input.jobId,
seq,
type: input.type,
at: new Date().toISOString(),
actor: input.actor,
data: input.data ?? {},
};
return events().create(doc);
}
export async function listEvents(jobId: string): Promise<FleetEventDoc[]> {
const docs = await events().findMany({ filter: { jobId }, sort: { seq: 1 } });
return docs;
}
// ── Artifacts ─────────────────────────────────────────────────────────────────
export async function createArtifact(doc: FleetArtifactDoc): Promise<FleetArtifactDoc> {
return artifacts().create(doc);
}
export async function listArtifacts(jobId: string): Promise<FleetArtifactDoc[]> {
return artifacts().findMany({ filter: { jobId }, sort: { createdAt: 1 } });
}