From 33c1d8d5fa98d371bc6fe01d86b4b4137c8aa548 Mon Sep 17 00:00:00 2001 From: saravanakumardb1 Date: Fri, 29 May 2026 20:59:08 -0700 Subject: [PATCH] fix(platform-service): make fleet job claim truly atomic via datastore updateIfMatch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The foundation's revUpdateJob/revUpdateLease did a read -> rev-check -> write with await points between them, so two CONCURRENT claims could both read the same rev, both pass the check, and both write — a double-assignment the old (sequential) race test could not catch. Rewire revUpdateJob/revUpdateLease to delegate to the datastore's updateIfMatch, which performs the compare and the write as one indivisible operation (Cosmos If-Match; synchronous compare-set on memory). The coordinator's tryClaimJob keeps identical external behavior (ok/conflict) but is now genuinely single-winner. Upgrades the coordinator tests to prove atomicity under TRUE concurrency: - two contenders via Promise.all -> exactly one ok, one conflict; assigned once; one run; one lease; leaseEpoch 1. - N-claimer (15) stress via Promise.all -> one ok, N-1 conflicts, no double-assignment. - N concurrent claimNextJob for one job -> exactly one non-null claim. - N concurrent lease renewals -> exactly one wins. Verified these concurrent tests FAIL against the old read-check-write (double-assign) and pass after the fix. --- .../src/modules/fleet/coordinator.test.ts | 54 ++++++++++++++++--- .../src/modules/fleet/repository.ts | 39 +++++++------- 2 files changed, 68 insertions(+), 25 deletions(-) diff --git a/services/platform-service/src/modules/fleet/coordinator.test.ts b/services/platform-service/src/modules/fleet/coordinator.test.ts index d24f0751..74966e2e 100644 --- a/services/platform-service/src/modules/fleet/coordinator.test.ts +++ b/services/platform-service/src/modules/fleet/coordinator.test.ts @@ -46,13 +46,16 @@ describe('fleet coordinator', () => { expect(job.productId).toBe(PID); }); - // ── ATOMIC CLAIM RACE ── - it('atomic claim: two contenders on the same job version — exactly one wins', async () => { + // ── ATOMIC CLAIM RACE (TRUE concurrency, not sequential) ── + it('atomic claim: two TRULY concurrent contenders on the same job version — exactly one wins', async () => { const { job } = await coord.submitJob(PID, input()); - // Both contenders see the SAME job version (rev). The CAS picks one winner. - const a = await coord.tryClaimJob(job, factory({ factoryId: 'fac_A' })); - const b = await coord.tryClaimJob(job, factory({ factoryId: 'fac_B' })); + // Both contenders see the SAME freshly-read job version and race via Promise.all. + // The datastore compare-and-set (updateIfMatch) admits exactly one winner. + const [a, b] = await Promise.all([ + coord.tryClaimJob(job, factory({ factoryId: 'fac_A' })), + coord.tryClaimJob(job, factory({ factoryId: 'fac_B' })), + ]); const oks = [a, b].filter(r => r.ok); const conflicts = [a, b].filter(r => !r.ok); @@ -60,7 +63,7 @@ describe('fleet coordinator', () => { expect(conflicts).toHaveLength(1); expect(conflicts[0].ok === false && conflicts[0].reason).toBe('conflict'); - // no double-assignment: the job is assigned exactly once, single run, single holder + // no double-assignment: assigned exactly once, single run, single lease holder const stored = await repo.getJob(job.id, PID); expect(stored?.stage).toBe('assigned'); expect(stored?.attempts).toBe(1); @@ -69,6 +72,45 @@ describe('fleet coordinator', () => { expect(lease?.leaseEpoch).toBe(1); }); + it('atomic claim: N concurrent contenders (stress) — exactly one ok, N-1 conflicts, no double-assignment', async () => { + const N = 15; + const { job } = await coord.submitJob(PID, input()); + const results = await Promise.all( + Array.from({ length: N }, (_, i) => + coord.tryClaimJob(job, factory({ factoryId: `fac_${i}` })) + ) + ); + expect(results.filter(r => r.ok)).toHaveLength(1); + expect(results.filter(r => !r.ok && r.reason === 'conflict')).toHaveLength(N - 1); + // exactly one run + one assigned job + leaseEpoch 1 + expect(await repo.listRunsByJob(job.id)).toHaveLength(1); + const stored = await repo.getJob(job.id, PID); + expect(stored?.stage).toBe('assigned'); + expect(stored?.attempts).toBe(1); + expect((await repo.getLease(job.id))?.leaseEpoch).toBe(1); + }); + + it('claimNextJob: N concurrent claimers for a single queued job — exactly one succeeds', async () => { + await coord.submitJob(PID, input()); + const N = 12; + const results = await Promise.all( + Array.from({ length: N }, (_, i) => coord.claimNextJob(factory({ factoryId: `f_${i}` }))) + ); + expect(results.filter(r => r !== null)).toHaveLength(1); + }); + + it('lease renew under contention — exactly one concurrent renewal wins', async () => { + const { job } = await coord.submitJob(PID, input()); + const claim = await coord.claimNextJob(factory()); + const epoch = claim!.job.leaseEpoch; + const N = 10; + const results = await Promise.all( + Array.from({ length: N }, () => coord.renewLease(job.id, PID, epoch, 600)) + ); + expect(results.filter(r => r.ok)).toHaveLength(1); + expect(results.filter(r => !r.ok && r.reason === 'conflict')).toHaveLength(N - 1); + }); + // ── PRIORITY + AGE SELECTION ── it('claimNextJob returns highest-priority then oldest', async () => { await coord.submitJob(PID, input({ idempotencyKey: 'low-old', priority: 'low' })); diff --git a/services/platform-service/src/modules/fleet/repository.ts b/services/platform-service/src/modules/fleet/repository.ts index 8f2cee4a..e9fb0373 100644 --- a/services/platform-service/src/modules/fleet/repository.ts +++ b/services/platform-service/src/modules/fleet/repository.ts @@ -109,22 +109,25 @@ export async function updateJob( }); } -/** Compare-and-swap on `rev` — the atomic-claim / fenced-transition primitive. */ +/** + * Compare-and-swap on `rev` — the atomic-claim / fenced-transition primitive. + * Delegates to the datastore's `updateIfMatch`, which performs the compare and the + * write as one indivisible operation (Cosmos If-Match; a synchronous compare-set on + * the memory provider). This is genuinely atomic under TRUE concurrency — there is + * no read → check → write with an intervening await that two callers could interleave. + */ export async function revUpdateJob( id: string, productId: string, expectedRev: number, updates: Partial ): Promise> { - const cur = await jobs().findById(id, productId); - if (!cur) return { ok: false, reason: 'not_found' }; - if ((cur.rev ?? 0) !== expectedRev) return { ok: false, reason: 'conflict' }; - const doc = await jobs().update(id, productId, { - ...updates, - rev: expectedRev + 1, - updatedAt: new Date().toISOString(), - }); - return { ok: true, doc }; + return jobs().updateIfMatch( + id, + productId, + { rev: expectedRev }, + { ...updates, updatedAt: new Date().toISOString() } + ); } export async function deleteJob(id: string, productId: string): Promise { @@ -161,20 +164,18 @@ export async function createLease(doc: FleetLeaseDoc): Promise { return leases().create(doc); } +/** Compare-and-swap on a lease's `rev` — atomic via the datastore updateIfMatch. */ export async function revUpdateLease( jobId: string, expectedRev: number, updates: Partial ): Promise> { - const cur = await leases().findById(jobId, jobId); - if (!cur) return { ok: false, reason: 'not_found' }; - if ((cur.rev ?? 0) !== expectedRev) return { ok: false, reason: 'conflict' }; - const doc = await leases().update(jobId, jobId, { - ...updates, - rev: expectedRev + 1, - updatedAt: new Date().toISOString(), - }); - return { ok: true, doc }; + return leases().updateIfMatch( + jobId, + jobId, + { rev: expectedRev }, + { ...updates, updatedAt: new Date().toISOString() } + ); } export async function listExpiredLeases(nowIso: string): Promise {