From 40fd0e05ad113b257b8c496ac3ce2eae994982da Mon Sep 17 00:00:00 2001 From: saravanakumardb1 Date: Fri, 29 May 2026 20:58:55 -0700 Subject: [PATCH 1/2] feat(datastore): add updateIfMatch optimistic-concurrency write (If-Match / atomic CAS) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds an additive, backward-compatible conditional write to the datastore abstraction so consumers can do true single-winner compare-and-swap: updateIfMatch(id, partitionKey, expected: { etag?, rev? }, patch) -> { ok: true, doc } | { ok: false, reason: 'conflict' | 'not_found' } - types: ConcurrencyToken + UpdateIfMatchResult; optional _etag on BaseDocument (provider-managed, surfaced on reads); new method on DocumentCollection; exported. - memory provider: get -> compare -> set in ONE synchronous block (no await/yield), so two concurrent callers cannot interleave under the single-threaded event loop — the first wins and bumps rev + _etag, the rest get conflict. True in-process atomicity. - cosmos provider: conditional replace with accessCondition { type: 'IfMatch', condition: _etag }; translates Cosmos 412 -> conflict, 404 -> not_found; also compares/bumps rev for parity. Existing method signatures are unchanged (additive only). Tests: memory match/stale/ missing + an N-concurrent Promise.all atomicity proof; cosmos If-Match mapping via a fake @azure/cosmos (match writes, stale etag -> 412 conflict, missing -> not_found). --- .../datastore/src/__tests__/cosmos.test.ts | 128 ++++++++++++++++++ .../datastore/src/__tests__/memory.test.ts | 65 +++++++++ packages/datastore/src/index.ts | 2 + packages/datastore/src/providers/cosmos.ts | 52 +++++++ packages/datastore/src/providers/memory.ts | 30 ++++ packages/datastore/src/types.ts | 38 ++++++ 6 files changed, 315 insertions(+) create mode 100644 packages/datastore/src/__tests__/cosmos.test.ts diff --git a/packages/datastore/src/__tests__/cosmos.test.ts b/packages/datastore/src/__tests__/cosmos.test.ts new file mode 100644 index 00000000..6bc86e5f --- /dev/null +++ b/packages/datastore/src/__tests__/cosmos.test.ts @@ -0,0 +1,128 @@ +/** + * CosmosDatastoreProvider.updateIfMatch — verifies the If-Match (_etag) optimistic + * concurrency mapping against a fake @azure/cosmos: a matching etag writes + bumps; + * a stale etag surfaces as HTTP 412 -> conflict; a missing doc -> not_found. + * + * Uses a fake Cosmos client (no live Cosmos) that enforces accessCondition IfMatch. + */ + +import { describe, it, expect, beforeEach, vi } from 'vitest'; + +interface StoredRec { + doc: Record; + etag: string; +} + +const h = vi.hoisted(() => { + const store = new Map(); + let etagSeq = 0; + return { + store, + seed(id: string, doc: Record): void { + etagSeq += 1; + store.set(id, { doc: { ...doc, _etag: `e${etagSeq}` }, etag: `e${etagSeq}` }); + }, + nextEtag(): string { + etagSeq += 1; + return `e${etagSeq}`; + }, + reset(): void { + store.clear(); + etagSeq = 0; + }, + }; +}); + +vi.mock('@azure/cosmos', () => { + type CosmosErr = Error & { code?: number }; + function err(code: number): CosmosErr { + const e = new Error(`cosmos ${code}`) as CosmosErr; + e.code = code; + return e; + } + class CosmosClient { + constructor(_opts: unknown) {} + database(_name: string) { + return { + container: (_c: string) => ({ + item: (id: string, _pk: string) => ({ + read() { + const rec = h.store.get(id); + if (!rec) throw err(404); + return Promise.resolve({ resource: { ...rec.doc, _etag: rec.etag } }); + }, + replace( + doc: Record, + options?: { accessCondition?: { type: string; condition?: string } } + ) { + const rec = h.store.get(id); + if (!rec) throw err(404); + const cond = options?.accessCondition?.condition; + if (cond !== undefined && cond !== rec.etag) throw err(412); + const etag = h.nextEtag(); + const saved = { ...doc, _etag: etag }; + h.store.set(id, { doc: saved, etag }); + return Promise.resolve({ resource: saved }); + }, + }), + }), + }; + } + } + return { CosmosClient }; +}); + +import { CosmosDatastoreProvider } from '../providers/cosmos.js'; +import type { BaseDocument, DocumentCollection } from '../types.js'; + +interface RevDoc extends BaseDocument { + name: string; + rev?: number; +} + +describe('CosmosDatastoreProvider.updateIfMatch', () => { + let col: DocumentCollection; + beforeEach(() => { + h.reset(); + const provider = new CosmosDatastoreProvider({ + endpoint: 'https://fake.documents.azure.com:443/', + key: 'fake-key', + database: 'test', + }); + col = provider.getCollection('rev', '/productId'); + h.seed('1', { id: '1', productId: 'p', name: 'A', rev: 0 }); + }); + + it('matching etag -> writes via If-Match and returns ok', async () => { + const cur = await col.findById('1', 'p'); + const res = await col.updateIfMatch('1', 'p', { etag: cur!._etag }, { name: 'B' }); + expect(res.ok).toBe(true); + if (res.ok) { + expect(res.doc.name).toBe('B'); + expect(res.doc.rev).toBe(1); + } + }); + + it('stale etag -> Cosmos 412 -> conflict, no overwrite', async () => { + const cur = await col.findById('1', 'p'); + const staleEtag = cur!._etag; + await col.updateIfMatch('1', 'p', { etag: staleEtag }, { name: 'B' }); // etag rotates + const res = await col.updateIfMatch('1', 'p', { etag: staleEtag }, { name: 'C' }); + expect(res.ok).toBe(false); + if (!res.ok) expect(res.reason).toBe('conflict'); + expect((await col.findById('1', 'p'))?.name).toBe('B'); + }); + + it('stale rev -> conflict before write', async () => { + await col.updateIfMatch('1', 'p', { rev: 0 }, { name: 'B' }); // rev -> 1 + const res = await col.updateIfMatch('1', 'p', { rev: 0 }, { name: 'C' }); + expect(res.ok).toBe(false); + if (!res.ok) expect(res.reason).toBe('conflict'); + }); + + it('missing doc -> not_found', async () => { + const res = await col.updateIfMatch('missing', 'p', { rev: 0 }, { name: 'X' }); + expect(res.ok).toBe(false); + if (!res.ok) expect(res.reason).toBe('not_found'); + }); +}); diff --git a/packages/datastore/src/__tests__/memory.test.ts b/packages/datastore/src/__tests__/memory.test.ts index 9652d2fc..c831909f 100644 --- a/packages/datastore/src/__tests__/memory.test.ts +++ b/packages/datastore/src/__tests__/memory.test.ts @@ -196,6 +196,71 @@ describe('MemoryDatastoreProvider', () => { await expect(collection.rawQuery('SELECT * FROM c')).rejects.toThrow('not supported'); }); }); + + describe('updateIfMatch (optimistic concurrency)', () => { + interface RevDoc extends BaseDocument { + name: string; + rev?: number; + } + let revCol: DocumentCollection; + beforeEach(async () => { + revCol = provider.getCollection('rev', '/productId'); + await revCol.create({ id: '1', productId: 'p', name: 'A', rev: 0 }); + }); + + it('matching rev writes the patch and bumps rev + _etag', async () => { + const res = await revCol.updateIfMatch('1', 'p', { rev: 0 }, { name: 'B' }); + expect(res.ok).toBe(true); + if (res.ok) { + expect(res.doc.name).toBe('B'); + expect(res.doc.rev).toBe(1); + expect(res.doc._etag).toBeDefined(); + } + }); + + it('stale rev => conflict and NO write', async () => { + await revCol.updateIfMatch('1', 'p', { rev: 0 }, { name: 'B' }); // rev -> 1 + const res = await revCol.updateIfMatch('1', 'p', { rev: 0 }, { name: 'C' }); + expect(res.ok).toBe(false); + if (!res.ok) expect(res.reason).toBe('conflict'); + // value unchanged by the losing write + expect((await revCol.findById('1', 'p'))?.name).toBe('B'); + }); + + it('matching etag writes; stale etag => conflict', async () => { + const first = await revCol.updateIfMatch('1', 'p', { rev: 0 }, { name: 'B' }); + const goodEtag = first.ok ? first.doc._etag : undefined; + const ok = await revCol.updateIfMatch('1', 'p', { etag: goodEtag }, { name: 'C' }); + expect(ok.ok).toBe(true); + const stale = await revCol.updateIfMatch('1', 'p', { etag: goodEtag }, { name: 'D' }); + expect(stale.ok).toBe(false); + if (!stale.ok) expect(stale.reason).toBe('conflict'); + }); + + it('missing doc => not_found', async () => { + const res = await revCol.updateIfMatch('nope', 'p', { rev: 0 }, { name: 'X' }); + expect(res.ok).toBe(false); + if (!res.ok) expect(res.reason).toBe('not_found'); + }); + + // THE PROOF: N concurrent compare-and-set on the SAME version — exactly one wins. + it('is atomic under TRUE concurrency: N concurrent claims => exactly one winner', async () => { + const N = 25; + const results = await Promise.all( + Array.from({ length: N }, (_, i) => + revCol.updateIfMatch('1', 'p', { rev: 0 }, { name: `claimer-${i}` }) + ) + ); + const winners = results.filter(r => r.ok); + const conflicts = results.filter(r => !r.ok); + expect(winners).toHaveLength(1); + expect(conflicts).toHaveLength(N - 1); + expect(conflicts.every(r => !r.ok && r.reason === 'conflict')).toBe(true); + // the stored doc reflects exactly one applied write at rev 1 + const stored = await revCol.findById('1', 'p'); + expect(stored?.rev).toBe(1); + }); + }); }); describe('matchesFilter', () => { diff --git a/packages/datastore/src/index.ts b/packages/datastore/src/index.ts index e0718b84..6b30e398 100644 --- a/packages/datastore/src/index.ts +++ b/packages/datastore/src/index.ts @@ -10,6 +10,8 @@ export type { DocumentCollection, DatastoreProvider, DatastoreProviderType, + ConcurrencyToken, + UpdateIfMatchResult, } from './types.js'; export { getDatastore, createDatastoreProvider, setDatastore, _resetDatastore } from './factory.js'; diff --git a/packages/datastore/src/providers/cosmos.ts b/packages/datastore/src/providers/cosmos.ts index 059f9d8a..02b8a48f 100644 --- a/packages/datastore/src/providers/cosmos.ts +++ b/packages/datastore/src/providers/cosmos.ts @@ -10,9 +10,11 @@ import type { AggregateQuery, BaseDocument, CollectionQuery, + ConcurrencyToken, DatastoreProvider, DocumentCollection, FilterMap, + UpdateIfMatchResult, } from '../types.js'; export interface CosmosProviderConfig { @@ -145,6 +147,56 @@ class CosmosCollection implements DocumentCollection return resource as T; } + /** + * Atomic compare-and-set via Cosmos optimistic concurrency. Conditions the + * replace on the document `_etag` (`accessCondition: { type: 'IfMatch' }`), so + * the server rejects the write (HTTP 412) if any other writer committed since + * the version this caller read — exactly one concurrent claimer wins. A 412 maps + * to `{ ok: false, reason: 'conflict' }`, a 404 to `'not_found'`. `rev` is also + * compared (when supplied) and bumped, for parity with the memory provider. + */ + async updateIfMatch( + id: string, + partitionKey: string, + expected: ConcurrencyToken, + patch: Partial + ): Promise> { + const c = await this.container(); + let existing: T | undefined; + try { + const read = await c.item(id, partitionKey).read(); + existing = read.resource; + } catch (err: unknown) { + if ((err as { code?: number })?.code === 404) return { ok: false, reason: 'not_found' }; + throw err; + } + if (!existing) return { ok: false, reason: 'not_found' }; + + const cur = existing as T & { rev?: number }; + // If the caller pinned a rev and it already moved on, a concurrent writer + // committed before our read — conflict without attempting the write. + if (expected.rev !== undefined && (cur.rev ?? 0) !== expected.rev) { + return { ok: false, reason: 'conflict' }; + } + // Condition the write on the caller's etag when given, else the just-read + // etag — either way the IfMatch guards the read→replace window server-side. + const condition = expected.etag ?? cur._etag; + const nextRev = (cur.rev ?? 0) + 1; + const merged = { ...existing, ...patch, rev: nextRev } as T; + + try { + const { resource } = await c.item(id, partitionKey).replace(merged, { + accessCondition: condition ? { type: 'IfMatch', condition } : undefined, + }); + return { ok: true, doc: resource as T }; + } catch (err: unknown) { + const code = (err as { code?: number })?.code; + if (code === 412) return { ok: false, reason: 'conflict' }; + if (code === 404) return { ok: false, reason: 'not_found' }; + throw err; + } + } + async upsert(doc: T): Promise { const c = await this.container(); const { resource } = await c.items.upsert(doc); diff --git a/packages/datastore/src/providers/memory.ts b/packages/datastore/src/providers/memory.ts index 9d1cc3fa..80ba7ac9 100644 --- a/packages/datastore/src/providers/memory.ts +++ b/packages/datastore/src/providers/memory.ts @@ -9,9 +9,11 @@ import type { AggregateQuery, BaseDocument, CollectionQuery, + ConcurrencyToken, DatastoreProvider, DocumentCollection, FilterMap, + UpdateIfMatchResult, } from '../types.js'; function deepClone(obj: T): T { @@ -123,6 +125,34 @@ class MemoryCollection implements DocumentCollection return deepClone(merged); } + /** + * Atomic compare-and-set. The entire get → compare → set runs SYNCHRONOUSLY in + * one block with NO await/yield, so under the single-threaded event loop two + * concurrent callers can never interleave between the compare and the set — the + * first wins (bumps `rev` + `_etag`), the rest see a mismatch and get `conflict`. + */ + async updateIfMatch( + id: string, + _partitionKey: string, + expected: ConcurrencyToken, + patch: Partial + ): Promise> { + const existing = this.docs.get(id); + if (!existing) return { ok: false, reason: 'not_found' }; + const rec = existing as BaseDocument & { rev?: number }; + const curRev = typeof rec.rev === 'number' ? rec.rev : 0; + if (expected.rev !== undefined && expected.rev !== curRev) { + return { ok: false, reason: 'conflict' }; + } + if (expected.etag !== undefined && expected.etag !== rec._etag) { + return { ok: false, reason: 'conflict' }; + } + const nextRev = curRev + 1; + const merged = { ...existing, ...patch, rev: nextRev, _etag: `m${nextRev}` } as T; + this.docs.set(id, deepClone(merged)); + return { ok: true, doc: deepClone(merged) }; + } + async upsert(doc: T): Promise { const clone = deepClone(doc); this.docs.set(doc.id, clone); diff --git a/packages/datastore/src/types.ts b/packages/datastore/src/types.ts index 02fd04e9..e16f26f8 100644 --- a/packages/datastore/src/types.ts +++ b/packages/datastore/src/types.ts @@ -10,6 +10,11 @@ export interface BaseDocument { id: string; productId: string; + /** + * Optimistic-concurrency tag surfaced by the provider (Cosmos `_etag`). Optional + * and provider-managed — callers read it back and pass it to `updateIfMatch`. + */ + _etag?: string; } // ── Filter operators ──────────────────────────────────────────────────────── @@ -57,6 +62,24 @@ export interface AggregationField { alias: string; } +// ── Optimistic concurrency ────────────────────────────────────────────────── + +/** + * The version a conditional write expects the stored document to still be at. + * `etag` is the provider tag (Cosmos `_etag`); `rev` is a portable monotonic + * counter maintained for parity (and used by the memory provider). Provide either + * or both — the write succeeds only if every supplied token still matches. + */ +export interface ConcurrencyToken { + etag?: string; + rev?: number; +} + +/** Result of a conditional (compare-and-swap) write. */ +export type UpdateIfMatchResult = + | { ok: true; doc: T } + | { ok: false; reason: 'conflict' | 'not_found' }; + // ── Document collection interface ─────────────────────────────────────────── export interface DocumentCollection { @@ -78,6 +101,21 @@ export interface DocumentCollection { /** Update a document by ID + partition key (merge semantics). */ update(id: string, partitionKey: string, updates: Partial): Promise; + /** + * Optimistic-concurrency update: merge `patch` into the document only if its + * stored version still matches `expected` (Cosmos If-Match on `_etag`; an + * atomic compare-and-set on `rev` for the memory provider). Bumps `rev` and + * surfaces a fresh `_etag` on success. Never overwrites a doc that changed — + * returns `{ ok: false, reason: 'conflict' }` instead, and `'not_found'` when + * the document is absent. This is the single-winner primitive for atomic claims. + */ + updateIfMatch( + id: string, + partitionKey: string, + expected: ConcurrencyToken, + patch: Partial + ): Promise>; + /** Upsert a document (create or replace). */ upsert(doc: T): Promise; From 33c1d8d5fa98d371bc6fe01d86b4b4137c8aa548 Mon Sep 17 00:00:00 2001 From: saravanakumardb1 Date: Fri, 29 May 2026 20:59:08 -0700 Subject: [PATCH 2/2] fix(platform-service): make fleet job claim truly atomic via datastore updateIfMatch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The foundation's revUpdateJob/revUpdateLease did a read -> rev-check -> write with await points between them, so two CONCURRENT claims could both read the same rev, both pass the check, and both write — a double-assignment the old (sequential) race test could not catch. Rewire revUpdateJob/revUpdateLease to delegate to the datastore's updateIfMatch, which performs the compare and the write as one indivisible operation (Cosmos If-Match; synchronous compare-set on memory). The coordinator's tryClaimJob keeps identical external behavior (ok/conflict) but is now genuinely single-winner. Upgrades the coordinator tests to prove atomicity under TRUE concurrency: - two contenders via Promise.all -> exactly one ok, one conflict; assigned once; one run; one lease; leaseEpoch 1. - N-claimer (15) stress via Promise.all -> one ok, N-1 conflicts, no double-assignment. - N concurrent claimNextJob for one job -> exactly one non-null claim. - N concurrent lease renewals -> exactly one wins. Verified these concurrent tests FAIL against the old read-check-write (double-assign) and pass after the fix. --- .../src/modules/fleet/coordinator.test.ts | 54 ++++++++++++++++--- .../src/modules/fleet/repository.ts | 39 +++++++------- 2 files changed, 68 insertions(+), 25 deletions(-) diff --git a/services/platform-service/src/modules/fleet/coordinator.test.ts b/services/platform-service/src/modules/fleet/coordinator.test.ts index d24f0751..74966e2e 100644 --- a/services/platform-service/src/modules/fleet/coordinator.test.ts +++ b/services/platform-service/src/modules/fleet/coordinator.test.ts @@ -46,13 +46,16 @@ describe('fleet coordinator', () => { expect(job.productId).toBe(PID); }); - // ── ATOMIC CLAIM RACE ── - it('atomic claim: two contenders on the same job version — exactly one wins', async () => { + // ── ATOMIC CLAIM RACE (TRUE concurrency, not sequential) ── + it('atomic claim: two TRULY concurrent contenders on the same job version — exactly one wins', async () => { const { job } = await coord.submitJob(PID, input()); - // Both contenders see the SAME job version (rev). The CAS picks one winner. - const a = await coord.tryClaimJob(job, factory({ factoryId: 'fac_A' })); - const b = await coord.tryClaimJob(job, factory({ factoryId: 'fac_B' })); + // Both contenders see the SAME freshly-read job version and race via Promise.all. + // The datastore compare-and-set (updateIfMatch) admits exactly one winner. + const [a, b] = await Promise.all([ + coord.tryClaimJob(job, factory({ factoryId: 'fac_A' })), + coord.tryClaimJob(job, factory({ factoryId: 'fac_B' })), + ]); const oks = [a, b].filter(r => r.ok); const conflicts = [a, b].filter(r => !r.ok); @@ -60,7 +63,7 @@ describe('fleet coordinator', () => { expect(conflicts).toHaveLength(1); expect(conflicts[0].ok === false && conflicts[0].reason).toBe('conflict'); - // no double-assignment: the job is assigned exactly once, single run, single holder + // no double-assignment: assigned exactly once, single run, single lease holder const stored = await repo.getJob(job.id, PID); expect(stored?.stage).toBe('assigned'); expect(stored?.attempts).toBe(1); @@ -69,6 +72,45 @@ describe('fleet coordinator', () => { expect(lease?.leaseEpoch).toBe(1); }); + it('atomic claim: N concurrent contenders (stress) — exactly one ok, N-1 conflicts, no double-assignment', async () => { + const N = 15; + const { job } = await coord.submitJob(PID, input()); + const results = await Promise.all( + Array.from({ length: N }, (_, i) => + coord.tryClaimJob(job, factory({ factoryId: `fac_${i}` })) + ) + ); + expect(results.filter(r => r.ok)).toHaveLength(1); + expect(results.filter(r => !r.ok && r.reason === 'conflict')).toHaveLength(N - 1); + // exactly one run + one assigned job + leaseEpoch 1 + expect(await repo.listRunsByJob(job.id)).toHaveLength(1); + const stored = await repo.getJob(job.id, PID); + expect(stored?.stage).toBe('assigned'); + expect(stored?.attempts).toBe(1); + expect((await repo.getLease(job.id))?.leaseEpoch).toBe(1); + }); + + it('claimNextJob: N concurrent claimers for a single queued job — exactly one succeeds', async () => { + await coord.submitJob(PID, input()); + const N = 12; + const results = await Promise.all( + Array.from({ length: N }, (_, i) => coord.claimNextJob(factory({ factoryId: `f_${i}` }))) + ); + expect(results.filter(r => r !== null)).toHaveLength(1); + }); + + it('lease renew under contention — exactly one concurrent renewal wins', async () => { + const { job } = await coord.submitJob(PID, input()); + const claim = await coord.claimNextJob(factory()); + const epoch = claim!.job.leaseEpoch; + const N = 10; + const results = await Promise.all( + Array.from({ length: N }, () => coord.renewLease(job.id, PID, epoch, 600)) + ); + expect(results.filter(r => r.ok)).toHaveLength(1); + expect(results.filter(r => !r.ok && r.reason === 'conflict')).toHaveLength(N - 1); + }); + // ── PRIORITY + AGE SELECTION ── it('claimNextJob returns highest-priority then oldest', async () => { await coord.submitJob(PID, input({ idempotencyKey: 'low-old', priority: 'low' })); diff --git a/services/platform-service/src/modules/fleet/repository.ts b/services/platform-service/src/modules/fleet/repository.ts index 8f2cee4a..e9fb0373 100644 --- a/services/platform-service/src/modules/fleet/repository.ts +++ b/services/platform-service/src/modules/fleet/repository.ts @@ -109,22 +109,25 @@ export async function updateJob( }); } -/** Compare-and-swap on `rev` — the atomic-claim / fenced-transition primitive. */ +/** + * Compare-and-swap on `rev` — the atomic-claim / fenced-transition primitive. + * Delegates to the datastore's `updateIfMatch`, which performs the compare and the + * write as one indivisible operation (Cosmos If-Match; a synchronous compare-set on + * the memory provider). This is genuinely atomic under TRUE concurrency — there is + * no read → check → write with an intervening await that two callers could interleave. + */ export async function revUpdateJob( id: string, productId: string, expectedRev: number, updates: Partial ): Promise> { - const cur = await jobs().findById(id, productId); - if (!cur) return { ok: false, reason: 'not_found' }; - if ((cur.rev ?? 0) !== expectedRev) return { ok: false, reason: 'conflict' }; - const doc = await jobs().update(id, productId, { - ...updates, - rev: expectedRev + 1, - updatedAt: new Date().toISOString(), - }); - return { ok: true, doc }; + return jobs().updateIfMatch( + id, + productId, + { rev: expectedRev }, + { ...updates, updatedAt: new Date().toISOString() } + ); } export async function deleteJob(id: string, productId: string): Promise { @@ -161,20 +164,18 @@ export async function createLease(doc: FleetLeaseDoc): Promise { return leases().create(doc); } +/** Compare-and-swap on a lease's `rev` — atomic via the datastore updateIfMatch. */ export async function revUpdateLease( jobId: string, expectedRev: number, updates: Partial ): Promise> { - const cur = await leases().findById(jobId, jobId); - if (!cur) return { ok: false, reason: 'not_found' }; - if ((cur.rev ?? 0) !== expectedRev) return { ok: false, reason: 'conflict' }; - const doc = await leases().update(jobId, jobId, { - ...updates, - rev: expectedRev + 1, - updatedAt: new Date().toISOString(), - }); - return { ok: true, doc }; + return leases().updateIfMatch( + jobId, + jobId, + { rev: expectedRev }, + { ...updates, updatedAt: new Date().toISOString() } + ); } export async function listExpiredLeases(nowIso: string): Promise {