feat(fleet): M0 RU gate — cheap per-product queue version + skip-claim

Adds fleet_queue_state (monotonic version per product), bumped on job create +
every stage change in the repository layer (best-effort, never fails a job
write), and a GET /fleet/queue-state read endpoint. Lets a polling factory
detect "work changed" with a ~1 RU point read instead of a full listJobs scan
on every claim. Registers the container; tests cover the bump + endpoint.

See agent-queue docs/GIGAFACTORY/FLEET_DISPATCH_REDESIGN.md §8/§12 (M0).

Generated with [Devin](https://cli.devin.ai/docs)

Co-Authored-By: Devin <158243242+devin-ai-integration[bot]@users.noreply.github.com>
This commit is contained in:
saravanakumardb1 2026-05-31 23:18:00 -07:00
parent 5bc72cf221
commit ba7db0008d
6 changed files with 116 additions and 3 deletions

View File

@ -206,6 +206,9 @@ const CONTAINER_DEFS: Record<string, ContainerConfig> = {
fleet_events: { partitionKeyPath: '/jobId' },
fleet_artifacts: { partitionKeyPath: '/jobId' },
fleet_factory_tokens: { partitionKeyPath: '/productId' },
// M0 RU gate: per-product monotonic "work changed" counter (see
// docs/GIGAFACTORY/FLEET_DISPATCH_REDESIGN.md §8/§12).
fleet_queue_state: { partitionKeyPath: '/productId' },
};
export async function initCosmosIfNeeded(): Promise<void> {

View File

@ -77,6 +77,33 @@ describe('fleet repository', () => {
if (!missing.ok) expect(missing.reason).toBe('not_found');
});
it('queue-state (M0 gate): version starts at 0, bumps on create + stage change', async () => {
  const jobId = 'fjob_q';
  const currentVersion = () => repo.getQueueVersion(PID);

  // Nothing has been written for this product yet, so the version reads as 0.
  expect(await currentVersion()).toBe(0);

  // Submitting a job (createJob) bumps the version exactly once.
  await repo.createJob(jobDoc({ id: jobId, rev: 0, stage: 'queued' }));
  const versionAfterCreate = await currentVersion();
  expect(versionAfterCreate).toBe(1);

  // A winning CAS that carries a stage change bumps it again.
  const staged = await repo.revUpdateJob(jobId, PID, 0, { stage: 'assigned' });
  expect(staged.ok).toBe(true);
  expect(await currentVersion()).toBe(2);

  // A winning CAS with no stage in the patch (e.g. lease renewal) leaves it untouched.
  const leaseOnly = await repo.revUpdateJob(jobId, PID, 1, { leaseEpoch: 5 });
  expect(leaseOnly.ok).toBe(true);
  expect(await currentVersion()).toBe(2);

  // A losing CAS (stale rev) leaves it untouched as well.
  const staleAttempt = await repo.revUpdateJob(jobId, PID, 0, { stage: 'building' });
  expect(staleAttempt.ok).toBe(false);
  expect(await currentVersion()).toBe(2);

  // The counter is tracked per product: an unrelated product is still at 0.
  expect(await repo.getQueueVersion('other-product')).toBe(0);
});
it('runs: create + list ordered by attempt', async () => {
await repo.createRun({
id: 'r2',

View File

@ -27,6 +27,7 @@ import type {
FleetJobDoc,
FleetLeaseDoc,
FleetProfileDoc,
FleetQueueStateDoc,
FleetRunDoc,
FleetStage,
} from './types.js';
@ -60,14 +61,47 @@ function factoryTokens(): DocumentCollection<FleetFactoryTokenDoc> {
/** Accessor for the `fleet_budgets` collection (partition key path `/productId`). */
function budgets(): DocumentCollection<FleetBudgetDoc> {
  const collection = getCollection<FleetBudgetDoc>('fleet_budgets', '/productId');
  return collection;
}
/** Accessor for the `fleet_queue_state` collection (partition key path `/productId`). */
function queueState(): DocumentCollection<FleetQueueStateDoc> {
  const collection = getCollection<FleetQueueStateDoc>('fleet_queue_state', '/productId');
  return collection;
}
/**
 * Outcome of a compare-and-swap update: either the resulting document, or the
 * reason the update did not apply (`not_found` or `conflict`).
 */
export type RevResult<T> =
  | { ok: true; doc: T }
  | { ok: false; reason: 'not_found' | 'conflict' };
// ── Queue state (M0 RU gate — per-product monotonic "work changed" counter) ─────
/**
 * Looks up the current queue version for a product.
 *
 * The state document is fetched by id with `productId` used as both the
 * document id and the partition key, which is intended to be a cheap (~1 RU)
 * point read.
 *
 * @param productId Product whose queue version is requested.
 * @returns The stored version, or 0 when no state document (or no version) exists.
 */
export async function getQueueVersion(productId: string): Promise<number> {
  const stateDoc = await queueState().findById(productId, productId);
  if (stateDoc == null) return 0;
  return stateDoc.version ?? 0;
}
/**
 * Increments a product's queue version by one, writing the state document via
 * upsert so the first bump also creates it.
 *
 * Called on job create and on stage changes so a polling factory can detect
 * "something changed" with a cheap point read instead of a full `listJobs`
 * scan. The gate is only an optimization, so this function NEVER throws — any
 * error raised while reading or writing the counter is swallowed so that it
 * cannot fail the underlying job write.
 *
 * The counter is read and then upserted as two separate calls, so the bump is
 * not exact under concurrency.
 *
 * @param productId Product whose queue version should advance.
 */
export async function bumpQueueVersion(productId: string): Promise<void> {
  try {
    const existing = await queueState().findById(productId, productId);
    const previousVersion = existing?.version ?? 0;
    const nextState = {
      id: productId,
      productId,
      version: previousVersion + 1,
      updatedAt: new Date().toISOString(),
    };
    await queueState().upsert(nextState);
  } catch {
    // Intentionally ignored: a failed bump must never fail the job write.
  }
}
// ── Jobs ──────────────────────────────────────────────────────────────────────
/**
 * Persists a new job document and then bumps the owning product's queue
 * version so polling factories notice that new work appeared.
 *
 * The bump runs only after the create has succeeded. `bumpQueueVersion` is
 * best-effort and swallows its own errors, so it cannot turn a successful
 * create into a failure.
 *
 * @param doc Job document to create; its `productId` selects the counter to bump.
 * @returns The document as returned by the jobs collection's `create`.
 */
export async function createJob(doc: FleetJobDoc): Promise<FleetJobDoc> {
  // No early `return jobs().create(doc)` here: it would skip the bump below.
  const created = await jobs().create(doc);
  await bumpQueueVersion(doc.productId); // new queued work appeared
  return created;
}
export async function getJob(id: string, productId: string): Promise<FleetJobDoc | null> {
@ -118,11 +152,13 @@ export async function updateJob(
): Promise<FleetJobDoc | null> {
const cur = await jobs().findById(id, productId);
if (!cur) return null;
return jobs().update(id, productId, {
const updated = await jobs().update(id, productId, {
...updates,
rev: (cur.rev ?? 0) + 1,
updatedAt: new Date().toISOString(),
});
if (updates.stage !== undefined) await bumpQueueVersion(productId); // claimable set changed
return updated;
}
/**
@ -138,12 +174,16 @@ export async function revUpdateJob(
expectedRev: number,
updates: Partial<FleetJobDoc>
): Promise<RevResult<FleetJobDoc>> {
return jobs().updateIfMatch(
const res = await jobs().updateIfMatch(
id,
productId,
{ rev: expectedRev },
{ ...updates, updatedAt: new Date().toISOString() }
);
// A successful stage transition changes the claimable set — bump the gate so
// polling factories notice. Only on the winning CAS + when stage actually moved.
if (res.ok && updates.stage !== undefined) await bumpQueueVersion(productId);
return res;
}
export async function deleteJob(id: string, productId: string): Promise<void> {

View File

@ -52,6 +52,21 @@ describe('fleetRoutes', () => {
expect(res.statusCode).toBe(400);
});
it('GET /fleet/queue-state (M0 gate) returns 0, then advances after a submit', async () => {
  const app = await buildApp();
  const readQueueState = () => app.inject({ method: 'GET', url: '/api/fleet/queue-state' });

  // Before any job exists the endpoint answers 200 with version 0.
  const before = await readQueueState();
  expect(before.statusCode).toBe(200);
  const initialVersion = JSON.parse(before.body).version as number;
  expect(initialVersion).toBe(0);

  // Submitting a job must move the version forward.
  await submit(app, { idempotencyKey: 'qs-1', bodyMd: '# task' });

  const after = await readQueueState();
  const payload = JSON.parse(after.body);
  expect(payload.productId).toBe('lysnrai');
  expect(payload.version).toBeGreaterThan(initialVersion);
});
it('gated lifecycle via routes: submit -> claim -> building -> review -> approve -> ship -> metrics', async () => {
const app = await buildApp();
const sub = await submit(app, {

View File

@ -13,6 +13,7 @@
* GET /fleet/jobs/:id/events append-only event stream
* GET /fleet/jobs/:id/events/stream live event stream (SSE, resumable)
* GET /fleet/metrics fleet metrics + alerts (queue depth, utilization)
* GET /fleet/queue-state per-product queue version (M0 RU gate — cheap point read)
* POST /fleet/jobs/:id/review/request route a building job into the review gate
* POST /fleet/jobs/:id/review submit a reviewer decision (approve/reject)
* POST /fleet/jobs/:id/artifacts upload a run output (base64 body blob + pointer)
@ -401,6 +402,16 @@ export async function fleetRoutes(app: FastifyInstance) {
return coordinator.fleetMetrics(pid);
});
// ── M0 RU gate: per-product queue version (cheap ~1 RU point read) ──
// Intended for polling factories: read this on every tick and run the
// expensive claim only when `version` differs from the one seen on the
// previous attempt. See docs/GIGAFACTORY/FLEET_DISPATCH_REDESIGN.md §8/§12.
app.get('/fleet/queue-state', async req => {
  await extractAuth(req);
  const productId = getRequestProductId(req);
  const version = await repo.getQueueVersion(productId);
  return { productId, version };
});
// ── Artifacts: upload (base64 body → blob + pointer) ──
app.post('/fleet/jobs/:id/artifacts', async (req, reply) => {
await extractAuth(req);

View File

@ -524,6 +524,23 @@ export const FleetBudgetDocSchema = z.object({
});
/** Document type inferred from `FleetBudgetDocSchema`. */
export type FleetBudgetDoc = z.infer<typeof FleetBudgetDocSchema>;
/**
 * FleetQueueStateDoc — a per-product monotonic "work changed" counter (pk `/productId`).
 * The M0 RU gate (docs/GIGAFACTORY/FLEET_DISPATCH_REDESIGN.md §8/§12): a factory
 * point-reads `version` (~1 RU) and only runs the expensive claim when it has
 * changed since its last attempt. Bumped on job create + every stage change. The
 * bump is best-effort and not exact under concurrency (the gate is an
 * optimization, not a correctness boundary) — a factory's periodic safety claim
 * backstops any missed bump.
 */
export const FleetQueueStateDocSchema = z.object({
// Document id.
id: z.string(),
// Owning product; must be a non-empty string.
productId: z.string().min(1),
// Non-negative integer counter; defaults to 0 when absent from the input.
version: z.number().int().nonnegative().default(0),
// Last-write timestamp, validated only as a string.
updatedAt: z.string(),
});
/** Document type inferred from `FleetQueueStateDocSchema`. */
export type FleetQueueStateDoc = z.infer<typeof FleetQueueStateDocSchema>;
/** Upsert a product's budget config. */
export const UpsertBudgetSchema = z.object({
ceilingUsd: z.number().nonnegative(),