diff --git a/services/platform-service/src/lib/cosmos-init.ts b/services/platform-service/src/lib/cosmos-init.ts
index 3e278004..41544842 100644
--- a/services/platform-service/src/lib/cosmos-init.ts
+++ b/services/platform-service/src/lib/cosmos-init.ts
@@ -206,6 +206,9 @@ const CONTAINER_DEFS: Record = {
   fleet_events: { partitionKeyPath: '/jobId' },
   fleet_artifacts: { partitionKeyPath: '/jobId' },
   fleet_factory_tokens: { partitionKeyPath: '/productId' },
+  // M0 RU gate: per-product monotonic "work changed" counter (see
+  // docs/GIGAFACTORY/FLEET_DISPATCH_REDESIGN.md §8/§12).
+  fleet_queue_state: { partitionKeyPath: '/productId' },
 };
 
 export async function initCosmosIfNeeded(): Promise {
diff --git a/services/platform-service/src/modules/fleet/repository.test.ts b/services/platform-service/src/modules/fleet/repository.test.ts
index 039794a7..3d6073fe 100644
--- a/services/platform-service/src/modules/fleet/repository.test.ts
+++ b/services/platform-service/src/modules/fleet/repository.test.ts
@@ -77,6 +77,33 @@ describe('fleet repository', () => {
     if (!missing.ok) expect(missing.reason).toBe('not_found');
   });
 
+  it('queue-state (M0 gate): version starts at 0, bumps on create + stage change', async () => {
+    expect(await repo.getQueueVersion(PID)).toBe(0);
+
+    // submit (createJob) bumps once
+    await repo.createJob(jobDoc({ id: 'fjob_q', rev: 0, stage: 'queued' }));
+    const afterCreate = await repo.getQueueVersion(PID);
+    expect(afterCreate).toBe(1);
+
+    // a successful stage-changing CAS bumps again
+    const ok = await repo.revUpdateJob('fjob_q', PID, 0, { stage: 'assigned' });
+    expect(ok.ok).toBe(true);
+    expect(await repo.getQueueVersion(PID)).toBe(2);
+
+    // a CAS WITHOUT a stage change (e.g. lease renewal) does NOT bump
+    const noStage = await repo.revUpdateJob('fjob_q', PID, 1, { leaseEpoch: 5 });
+    expect(noStage.ok).toBe(true);
+    expect(await repo.getQueueVersion(PID)).toBe(2);
+
+    // a CONFLICTING CAS (stale rev) does NOT bump
+    const stale = await repo.revUpdateJob('fjob_q', PID, 0, { stage: 'building' });
+    expect(stale.ok).toBe(false);
+    expect(await repo.getQueueVersion(PID)).toBe(2);
+
+    // the gate is scoped per product
+    expect(await repo.getQueueVersion('other-product')).toBe(0);
+  });
+
   it('runs: create + list ordered by attempt', async () => {
     await repo.createRun({
       id: 'r2',
diff --git a/services/platform-service/src/modules/fleet/repository.ts b/services/platform-service/src/modules/fleet/repository.ts
index 9e427f79..fb6abe49 100644
--- a/services/platform-service/src/modules/fleet/repository.ts
+++ b/services/platform-service/src/modules/fleet/repository.ts
@@ -27,6 +27,7 @@ import type {
   FleetJobDoc,
   FleetLeaseDoc,
   FleetProfileDoc,
+  FleetQueueStateDoc,
   FleetRunDoc,
   FleetStage,
 } from './types.js';
@@ -60,14 +61,56 @@ function factoryTokens(): DocumentCollection {
 function budgets(): DocumentCollection {
   return getCollection('fleet_budgets', '/productId');
 }
+function queueState(): DocumentCollection<FleetQueueStateDoc> {
+  return getCollection('fleet_queue_state', '/productId');
+}
 
 /** Result of a compare-and-swap update. */
 export type RevResult = { ok: true; doc: T } | { ok: false; reason: 'not_found' | 'conflict' };
 
+// ── Queue state (M0 RU gate — per-product monotonic "work changed" counter) ─────
+
+/** Current queue version for a product (0 if never bumped). A ~1 RU point read. */
+export async function getQueueVersion(productId: string): Promise<number> {
+  const doc = await queueState().findById(productId, productId);
+  return doc?.version ?? 0;
+}
+
+/**
+ * Best-effort bump of a product's queue version. Called on job create + every
+ * job update that carries a `stage`, so a polling factory can detect "something
+ * changed" with a cheap point read instead of a full `listJobs` scan.
+ *
+ * The read-then-upsert is not atomic: concurrent bumps for one product can
+ * collapse into one increment, or a late writer can move the counter backwards,
+ * so readers should compare for inequality rather than "greater than".
+ *
+ * NEVER throws — the gate is an optimization, so a bump failure is logged and
+ * must not fail the underlying job write.
+ */
+export async function bumpQueueVersion(productId: string): Promise<void> {
+  try {
+    const coll = queueState();
+    const cur = await coll.findById(productId, productId);
+    await coll.upsert({
+      id: productId,
+      productId,
+      version: (cur?.version ?? 0) + 1,
+      updatedAt: new Date().toISOString(),
+    });
+  } catch (err: unknown) {
+    // Best-effort: never fail the job write, but leave a trace so a persistently
+    // failing gate is visible. NOTE(review): use the service logger if one exists.
+    console.warn('[fleet] bumpQueueVersion failed (ignored)', { productId, err });
+  }
+}
+
 // ── Jobs ──────────────────────────────────────────────────────────────────────
 
 export async function createJob(doc: FleetJobDoc): Promise {
-  return jobs().create(doc);
+  const created = await jobs().create(doc);
+  await bumpQueueVersion(doc.productId); // new queued work appeared
+  return created;
 }
 
 export async function getJob(id: string, productId: string): Promise {
@@ -118,11 +161,13 @@ export async function updateJob(
 ): Promise {
   const cur = await jobs().findById(id, productId);
   if (!cur) return null;
-  return jobs().update(id, productId, {
+  const updated = await jobs().update(id, productId, {
     ...updates,
     rev: (cur.rev ?? 0) + 1,
     updatedAt: new Date().toISOString(),
   });
+  if (updates.stage !== undefined) await bumpQueueVersion(productId); // claimable set changed
+  return updated;
 }
 
 /**
@@ -138,12 +183,16 @@ export async function revUpdateJob(
   expectedRev: number,
   updates: Partial
 ): Promise> {
-  return jobs().updateIfMatch(
+  const res = await jobs().updateIfMatch(
     id,
     productId,
     { rev: expectedRev },
     { ...updates, updatedAt: new Date().toISOString() }
   );
+  // A winning CAS whose patch carries a `stage` may change the claimable set —
+  // bump the gate so polling factories notice. A losing CAS never bumps.
+  if (res.ok && updates.stage !== undefined) await bumpQueueVersion(productId);
+  return res;
 }
 
 export async function deleteJob(id: string, productId: string): Promise {
diff --git a/services/platform-service/src/modules/fleet/routes.test.ts b/services/platform-service/src/modules/fleet/routes.test.ts
index a748580f..a2af5316 100644
--- a/services/platform-service/src/modules/fleet/routes.test.ts
+++ b/services/platform-service/src/modules/fleet/routes.test.ts
@@ -52,6 +52,22 @@ describe('fleetRoutes', () => {
     expect(res.statusCode).toBe(400);
   });
 
+  it('GET /fleet/queue-state (M0 gate) returns 0, then advances after a submit', async () => {
+    const app = await buildApp();
+    const before = await app.inject({ method: 'GET', url: '/api/fleet/queue-state' });
+    expect(before.statusCode).toBe(200);
+    const v0 = JSON.parse(before.body).version as number;
+    expect(v0).toBe(0);
+
+    await submit(app, { idempotencyKey: 'qs-1', bodyMd: '# task' });
+
+    const after = await app.inject({ method: 'GET', url: '/api/fleet/queue-state' });
+    expect(after.statusCode).toBe(200);
+    const body = JSON.parse(after.body);
+    expect(body.productId).toBe('lysnrai');
+    expect(body.version).toBeGreaterThan(v0);
+  });
+
   it('gated lifecycle via routes: submit -> claim -> building -> review -> approve -> ship -> metrics', async () => {
     const app = await buildApp();
     const sub = await submit(app, {
diff --git a/services/platform-service/src/modules/fleet/routes.ts b/services/platform-service/src/modules/fleet/routes.ts
index c26fe1f7..bad5264e 100644
--- a/services/platform-service/src/modules/fleet/routes.ts
+++ b/services/platform-service/src/modules/fleet/routes.ts
@@ -13,6 +13,7 @@
  *   GET    /fleet/jobs/:id/events            append-only event stream
  *   GET    /fleet/jobs/:id/events/stream     live event stream (SSE, resumable)
  *   GET    /fleet/metrics                    fleet metrics + alerts (queue depth, utilization)
+ *   GET    /fleet/queue-state                per-product queue version (M0 RU gate — cheap point read)
  *   POST   /fleet/jobs/:id/review/request    route a building job into the review gate
  *   POST   /fleet/jobs/:id/review            submit a reviewer decision (approve/reject)
  *   POST   /fleet/jobs/:id/artifacts         upload a run output (base64 body → blob + pointer)
@@ -401,6 +402,16 @@ export async function fleetRoutes(app: FastifyInstance) {
     return coordinator.fleetMetrics(pid);
   });
 
+  // ── M0 RU gate: per-product queue version (cheap ~1 RU point read) ──
+  // A polling factory reads this each tick and only runs the expensive claim when
+  // `version` changed since its last attempt. See
+  // docs/GIGAFACTORY/FLEET_DISPATCH_REDESIGN.md §8/§12.
+  app.get('/fleet/queue-state', async req => {
+    await extractAuth(req);
+    const pid = getRequestProductId(req);
+    return { productId: pid, version: await repo.getQueueVersion(pid) };
+  });
+
   // ── Artifacts: upload (base64 body → blob + pointer) ──
   app.post('/fleet/jobs/:id/artifacts', async (req, reply) => {
     await extractAuth(req);
diff --git a/services/platform-service/src/modules/fleet/types.ts b/services/platform-service/src/modules/fleet/types.ts
index 62332dba..7112fd15 100644
--- a/services/platform-service/src/modules/fleet/types.ts
+++ b/services/platform-service/src/modules/fleet/types.ts
@@ -524,6 +524,23 @@ export const FleetBudgetDocSchema = z.object({
 });
 export type FleetBudgetDoc = z.infer;
 
+/**
+ * FleetQueueStateDoc — a per-product monotonic "work changed" counter (pk `/productId`).
+ * The M0 RU gate (docs/GIGAFACTORY/FLEET_DISPATCH_REDESIGN.md §8/§12): a factory
+ * point-reads `version` (~1 RU) and only runs the expensive claim when it has
+ * changed since its last attempt. Bumped on job create + every stage change. The
+ * bump is best-effort and not exact under concurrency (the gate is an
+ * optimization, not a correctness boundary) — a factory's periodic safety claim
+ * backstops any missed bump.
+ */
+export const FleetQueueStateDocSchema = z.object({
+  id: z.string(),
+  productId: z.string().min(1),
+  version: z.number().int().nonnegative().default(0),
+  updatedAt: z.string(),
+});
+export type FleetQueueStateDoc = z.infer<typeof FleetQueueStateDocSchema>;
+
 /** Upsert a product's budget config. */
 export const UpsertBudgetSchema = z.object({
   ceilingUsd: z.number().nonnegative(),