fix(platform-service): make fleet job claim truly atomic via datastore updateIfMatch

The foundation's revUpdateJob/revUpdateLease did a read -> rev-check -> write with
await points between them, so two CONCURRENT claims could both read the same rev,
both pass the check, and both write — a double-assignment the old (sequential) race
test could not catch.

Rewire revUpdateJob/revUpdateLease to delegate to the datastore's updateIfMatch,
which performs the compare and the write as one indivisible operation (Cosmos
If-Match; synchronous compare-set on memory). The coordinator's tryClaimJob keeps
identical external behavior (ok/conflict) but is now genuinely single-winner.

Upgrades the coordinator tests to prove atomicity under TRUE concurrency:
- two contenders via Promise.all -> exactly one ok, one conflict; assigned once;
  one run; one lease; leaseEpoch 1.
- N-claimer (15) stress via Promise.all -> one ok, N-1 conflicts, no double-assignment.
- N concurrent claimNextJob for one job -> exactly one non-null claim.
- N concurrent lease renewals -> exactly one wins.

Verified these concurrent tests FAIL against the old read-check-write (double-assign)
and pass after the fix.
This commit is contained in:
saravanakumardb1 2026-05-29 20:59:08 -07:00
parent 40fd0e05ad
commit 33c1d8d5fa
2 changed files with 68 additions and 25 deletions

View File

@ -46,13 +46,16 @@ describe('fleet coordinator', () => {
expect(job.productId).toBe(PID);
});
// ── ATOMIC CLAIM RACE ──
it('atomic claim: two contenders on the same job version — exactly one wins', async () => {
// ── ATOMIC CLAIM RACE (TRUE concurrency, not sequential) ──
it('atomic claim: two TRULY concurrent contenders on the same job version — exactly one wins', async () => {
const { job } = await coord.submitJob(PID, input());
// Both contenders see the SAME job version (rev). The CAS picks one winner.
const a = await coord.tryClaimJob(job, factory({ factoryId: 'fac_A' }));
const b = await coord.tryClaimJob(job, factory({ factoryId: 'fac_B' }));
// Both contenders see the SAME freshly-read job version and race via Promise.all.
// The datastore compare-and-set (updateIfMatch) admits exactly one winner.
const [a, b] = await Promise.all([
coord.tryClaimJob(job, factory({ factoryId: 'fac_A' })),
coord.tryClaimJob(job, factory({ factoryId: 'fac_B' })),
]);
const oks = [a, b].filter(r => r.ok);
const conflicts = [a, b].filter(r => !r.ok);
@ -60,7 +63,7 @@ describe('fleet coordinator', () => {
expect(conflicts).toHaveLength(1);
expect(conflicts[0].ok === false && conflicts[0].reason).toBe('conflict');
// no double-assignment: the job is assigned exactly once, single run, single holder
// no double-assignment: assigned exactly once, single run, single lease holder
const stored = await repo.getJob(job.id, PID);
expect(stored?.stage).toBe('assigned');
expect(stored?.attempts).toBe(1);
@ -69,6 +72,45 @@ describe('fleet coordinator', () => {
expect(lease?.leaseEpoch).toBe(1);
});
it('atomic claim: N concurrent contenders (stress) — exactly one ok, N-1 conflicts, no double-assignment', async () => {
const N = 15;
const { job } = await coord.submitJob(PID, input());
const results = await Promise.all(
Array.from({ length: N }, (_, i) =>
coord.tryClaimJob(job, factory({ factoryId: `fac_${i}` }))
)
);
expect(results.filter(r => r.ok)).toHaveLength(1);
expect(results.filter(r => !r.ok && r.reason === 'conflict')).toHaveLength(N - 1);
// exactly one run + one assigned job + leaseEpoch 1
expect(await repo.listRunsByJob(job.id)).toHaveLength(1);
const stored = await repo.getJob(job.id, PID);
expect(stored?.stage).toBe('assigned');
expect(stored?.attempts).toBe(1);
expect((await repo.getLease(job.id))?.leaseEpoch).toBe(1);
});
it('claimNextJob: N concurrent claimers for a single queued job — exactly one succeeds', async () => {
await coord.submitJob(PID, input());
const N = 12;
const results = await Promise.all(
Array.from({ length: N }, (_, i) => coord.claimNextJob(factory({ factoryId: `f_${i}` })))
);
expect(results.filter(r => r !== null)).toHaveLength(1);
});
it('lease renew under contention — exactly one concurrent renewal wins', async () => {
const { job } = await coord.submitJob(PID, input());
const claim = await coord.claimNextJob(factory());
const epoch = claim!.job.leaseEpoch;
const N = 10;
const results = await Promise.all(
Array.from({ length: N }, () => coord.renewLease(job.id, PID, epoch, 600))
);
expect(results.filter(r => r.ok)).toHaveLength(1);
expect(results.filter(r => !r.ok && r.reason === 'conflict')).toHaveLength(N - 1);
});
// ── PRIORITY + AGE SELECTION ──
it('claimNextJob returns highest-priority then oldest', async () => {
await coord.submitJob(PID, input({ idempotencyKey: 'low-old', priority: 'low' }));

View File

@ -109,22 +109,25 @@ export async function updateJob(
});
}
/** Compare-and-swap on `rev` — the atomic-claim / fenced-transition primitive. */
/**
* Compare-and-swap on `rev` the atomic-claim / fenced-transition primitive.
* Delegates to the datastore's `updateIfMatch`, which performs the compare and the
* write as one indivisible operation (Cosmos If-Match; a synchronous compare-set on
* the memory provider). This is genuinely atomic under TRUE concurrency there is
* no read check write with an intervening await that two callers could interleave.
*/
export async function revUpdateJob(
id: string,
productId: string,
expectedRev: number,
updates: Partial<FleetJobDoc>
): Promise<RevResult<FleetJobDoc>> {
const cur = await jobs().findById(id, productId);
if (!cur) return { ok: false, reason: 'not_found' };
if ((cur.rev ?? 0) !== expectedRev) return { ok: false, reason: 'conflict' };
const doc = await jobs().update(id, productId, {
...updates,
rev: expectedRev + 1,
updatedAt: new Date().toISOString(),
});
return { ok: true, doc };
return jobs().updateIfMatch(
id,
productId,
{ rev: expectedRev },
{ ...updates, updatedAt: new Date().toISOString() }
);
}
export async function deleteJob(id: string, productId: string): Promise<void> {
@ -161,20 +164,18 @@ export async function createLease(doc: FleetLeaseDoc): Promise<FleetLeaseDoc> {
return leases().create(doc);
}
/** Compare-and-swap on a lease's `rev` — atomic via the datastore updateIfMatch. */
export async function revUpdateLease(
jobId: string,
expectedRev: number,
updates: Partial<FleetLeaseDoc>
): Promise<RevResult<FleetLeaseDoc>> {
const cur = await leases().findById(jobId, jobId);
if (!cur) return { ok: false, reason: 'not_found' };
if ((cur.rev ?? 0) !== expectedRev) return { ok: false, reason: 'conflict' };
const doc = await leases().update(jobId, jobId, {
...updates,
rev: expectedRev + 1,
updatedAt: new Date().toISOString(),
});
return { ok: true, doc };
return leases().updateIfMatch(
jobId,
jobId,
{ rev: expectedRev },
{ ...updates, updatedAt: new Date().toISOString() }
);
}
export async function listExpiredLeases(nowIso: string): Promise<FleetLeaseDoc[]> {