diff --git a/packages/datastore/src/__tests__/cosmos.test.ts b/packages/datastore/src/__tests__/cosmos.test.ts
new file mode 100644
index 00000000..6bc86e5f
--- /dev/null
+++ b/packages/datastore/src/__tests__/cosmos.test.ts
@@ -0,0 +1,154 @@
+/**
+ * CosmosDatastoreProvider.updateIfMatch — verifies the If-Match (_etag) optimistic
+ * concurrency mapping against a fake @azure/cosmos: a matching etag writes + bumps;
+ * a stale caller etag -> conflict with no write; a commit landing between the
+ * provider's read and its replace surfaces as HTTP 412 -> conflict; a missing doc
+ * -> not_found.
+ *
+ * Uses a fake Cosmos client (no live Cosmos) that enforces accessCondition IfMatch.
+ */
+
+import { describe, it, expect, beforeEach, vi } from 'vitest';
+
+interface StoredRec {
+  doc: Record<string, unknown>;
+  etag: string;
+}
+
+const h = vi.hoisted(() => {
+  const store = new Map<string, StoredRec>();
+  let etagSeq = 0;
+  let beforeReplace: (() => void) | undefined;
+  return {
+    store,
+    seed(id: string, doc: Record<string, unknown>): void {
+      etagSeq += 1;
+      store.set(id, { doc: { ...doc, _etag: `e${etagSeq}` }, etag: `e${etagSeq}` });
+    },
+    nextEtag(): string {
+      etagSeq += 1;
+      return `e${etagSeq}`;
+    },
+    /** Arm a one-shot callback that runs at the start of the next fake `replace`. */
+    onNextReplace(fn: () => void): void {
+      beforeReplace = fn;
+    },
+    /** Disarm and run the armed callback, if any (called by the fake `replace`). */
+    fireBeforeReplace(): void {
+      const fn = beforeReplace;
+      beforeReplace = undefined;
+      fn?.();
+    },
+    reset(): void {
+      store.clear();
+      etagSeq = 0;
+      beforeReplace = undefined;
+    },
+  };
+});
+
+vi.mock('@azure/cosmos', () => {
+  type CosmosErr = Error & { code?: number };
+  function err(code: number): CosmosErr {
+    const e = new Error(`cosmos ${code}`) as CosmosErr;
+    e.code = code;
+    return e;
+  }
+  class CosmosClient {
+    constructor(_opts: unknown) {}
+    database(_name: string) {
+      return {
+        container: (_c: string) => ({
+          item: (id: string, _pk: string) => ({
+            read() {
+              const rec = h.store.get(id);
+              if (!rec) throw err(404);
+              return Promise.resolve({ resource: { ...rec.doc, _etag: rec.etag } });
+            },
+            replace(
+              doc: Record<string, unknown>,
+              options?: { accessCondition?: { type: string; condition?: string } }
+            ) {
+              // Lets a test land a rival write inside the provider's read→replace window.
+              h.fireBeforeReplace();
+              const rec = h.store.get(id);
+              if (!rec) throw err(404);
+              const cond = options?.accessCondition?.condition;
+              if (cond !== undefined && cond !== rec.etag) throw err(412);
+              const etag = h.nextEtag();
+              const saved = { ...doc, _etag: etag };
+              h.store.set(id, { doc: saved, etag });
+              return Promise.resolve({ resource: saved });
+            },
+          }),
+        }),
+      };
+    }
+  }
+  return { CosmosClient };
+});
+
+import { CosmosDatastoreProvider } from '../providers/cosmos.js';
+import type { BaseDocument, DocumentCollection } from '../types.js';
+
+interface RevDoc extends BaseDocument {
+  name: string;
+  rev?: number;
+}
+
+describe('CosmosDatastoreProvider.updateIfMatch', () => {
+  let col: DocumentCollection<RevDoc>;
+  beforeEach(() => {
+    h.reset();
+    const provider = new CosmosDatastoreProvider({
+      endpoint: 'https://fake.documents.azure.com:443/',
+      key: 'fake-key',
+      database: 'test',
+    });
+    col = provider.getCollection<RevDoc>('rev', '/productId');
+    h.seed('1', { id: '1', productId: 'p', name: 'A', rev: 0 });
+  });
+
+  it('matching etag -> writes via If-Match and returns ok', async () => {
+    const cur = await col.findById('1', 'p');
+    const res = await col.updateIfMatch('1', 'p', { etag: cur!._etag }, { name: 'B' });
+    expect(res.ok).toBe(true);
+    if (res.ok) {
+      expect(res.doc.name).toBe('B');
+      expect(res.doc.rev).toBe(1);
+    }
+  });
+
+  it('stale etag -> conflict, no overwrite', async () => {
+    const cur = await col.findById('1', 'p');
+    const staleEtag = cur!._etag;
+    await col.updateIfMatch('1', 'p', { etag: staleEtag }, { name: 'B' }); // etag rotates
+    const res = await col.updateIfMatch('1', 'p', { etag: staleEtag }, { name: 'C' });
+    expect(res.ok).toBe(false);
+    if (!res.ok) expect(res.reason).toBe('conflict');
+    expect((await col.findById('1', 'p'))?.name).toBe('B');
+  });
+
+  it('rival commit between read and replace -> Cosmos 412 -> conflict', async () => {
+    const cur = await col.findById('1', 'p');
+    // Simulate a rival writer landing after the provider's read but before its replace.
+    h.onNextReplace(() => h.seed('1', { id: '1', productId: 'p', name: 'Z', rev: 1 }));
+    const res = await col.updateIfMatch('1', 'p', { etag: cur!._etag }, { name: 'B' });
+    expect(res.ok).toBe(false);
+    if (!res.ok) expect(res.reason).toBe('conflict');
+    expect((await col.findById('1', 'p'))?.name).toBe('Z');
+  });
+
+  it('stale rev -> conflict before write', async () => {
+    await col.updateIfMatch('1', 'p', { rev: 0 }, { name: 'B' }); // rev -> 1
+    const res = await col.updateIfMatch('1', 'p', { rev: 0 }, { name: 'C' });
+    expect(res.ok).toBe(false);
+    if (!res.ok) expect(res.reason).toBe('conflict');
+  });
+
+  it('missing doc -> not_found', async () => {
+    const res = await col.updateIfMatch('missing', 'p', { rev: 0 }, { name: 'X' });
+    expect(res.ok).toBe(false);
+    if (!res.ok) expect(res.reason).toBe('not_found');
+  });
+});
diff --git a/packages/datastore/src/__tests__/memory.test.ts b/packages/datastore/src/__tests__/memory.test.ts
index 9652d2fc..c831909f 100644
--- a/packages/datastore/src/__tests__/memory.test.ts
+++ b/packages/datastore/src/__tests__/memory.test.ts
@@ -196,6 +196,71 @@ describe('MemoryDatastoreProvider', () => {
       await expect(collection.rawQuery('SELECT * FROM c')).rejects.toThrow('not supported');
     });
   });
+
+  describe('updateIfMatch (optimistic concurrency)', () => {
+    interface RevDoc extends BaseDocument {
+      name: string;
+      rev?: number;
+    }
+    let revCol: DocumentCollection<RevDoc>;
+    beforeEach(async () => {
+      revCol = provider.getCollection<RevDoc>('rev', '/productId');
+      await revCol.create({ id: '1', productId: 'p', name: 'A', rev: 0 });
+    });
+
+    it('matching rev writes the patch and bumps rev + _etag', async () => {
+      const res = await revCol.updateIfMatch('1', 'p', { rev: 0 }, { name: 'B' });
+      expect(res.ok).toBe(true);
+      if (res.ok) {
+        expect(res.doc.name).toBe('B');
+        expect(res.doc.rev).toBe(1);
+        expect(res.doc._etag).toBeDefined();
+      }
+    });
+
+    it('stale rev => conflict and NO write', async () => {
+      await revCol.updateIfMatch('1', 'p', { rev: 0 }, { name: 'B' }); // rev -> 1
+      const res = await revCol.updateIfMatch('1', 'p', { rev: 0 }, { name: 'C' });
+      expect(res.ok).toBe(false);
+      if (!res.ok) expect(res.reason).toBe('conflict');
+      // value unchanged by the losing write
+      expect((await revCol.findById('1', 'p'))?.name).toBe('B');
+    });
+
+    it('matching etag writes; stale etag => conflict', async () => {
+      const first = await revCol.updateIfMatch('1', 'p', { rev: 0 }, { name: 'B' });
+      const goodEtag = first.ok ? first.doc._etag : undefined;
+      const ok = await revCol.updateIfMatch('1', 'p', { etag: goodEtag }, { name: 'C' });
+      expect(ok.ok).toBe(true);
+      const stale = await revCol.updateIfMatch('1', 'p', { etag: goodEtag }, { name: 'D' });
+      expect(stale.ok).toBe(false);
+      if (!stale.ok) expect(stale.reason).toBe('conflict');
+    });
+
+    it('missing doc => not_found', async () => {
+      const res = await revCol.updateIfMatch('nope', 'p', { rev: 0 }, { name: 'X' });
+      expect(res.ok).toBe(false);
+      if (!res.ok) expect(res.reason).toBe('not_found');
+    });
+
+    // THE PROOF: N concurrent compare-and-set on the SAME version — exactly one wins.
+    it('is atomic under TRUE concurrency: N concurrent claims => exactly one winner', async () => {
+      const N = 25;
+      const results = await Promise.all(
+        Array.from({ length: N }, (_, i) =>
+          revCol.updateIfMatch('1', 'p', { rev: 0 }, { name: `claimer-${i}` })
+        )
+      );
+      const winners = results.filter(r => r.ok);
+      const conflicts = results.filter(r => !r.ok);
+      expect(winners).toHaveLength(1);
+      expect(conflicts).toHaveLength(N - 1);
+      expect(conflicts.every(r => !r.ok && r.reason === 'conflict')).toBe(true);
+      // the stored doc reflects exactly one applied write at rev 1
+      const stored = await revCol.findById('1', 'p');
+      expect(stored?.rev).toBe(1);
+    });
+  });
 });
 
 describe('matchesFilter', () => {
diff --git a/packages/datastore/src/index.ts b/packages/datastore/src/index.ts
index e0718b84..6b30e398 100644
--- a/packages/datastore/src/index.ts
+++ b/packages/datastore/src/index.ts
@@ -10,6 +10,8 @@ export type {
   DocumentCollection,
   DatastoreProvider,
   DatastoreProviderType,
+  ConcurrencyToken,
+  UpdateIfMatchResult,
 } from './types.js';
 
 export { getDatastore, createDatastoreProvider, setDatastore, _resetDatastore } from './factory.js';
diff --git a/packages/datastore/src/providers/cosmos.ts b/packages/datastore/src/providers/cosmos.ts
index 059f9d8a..02b8a48f 100644
--- a/packages/datastore/src/providers/cosmos.ts
+++ b/packages/datastore/src/providers/cosmos.ts
@@ -10,9 +10,11 @@ import type {
   AggregateQuery,
   BaseDocument,
   CollectionQuery,
+  ConcurrencyToken,
   DatastoreProvider,
   DocumentCollection,
   FilterMap,
+  UpdateIfMatchResult,
 } from '../types.js';
 
 export interface CosmosProviderConfig {
@@ -145,6 +147,65 @@ class CosmosCollection<T extends BaseDocument> implements DocumentCollection<T>
     return resource as T;
   }
 
+  /**
+   * Atomic compare-and-set via Cosmos optimistic concurrency. Reads the current
+   * document and returns `conflict` without writing if any supplied token (`rev`
+   * and/or `etag`) no longer matches it. Otherwise conditions the replace on the
+   * `_etag` of the version just read (`accessCondition: { type: 'IfMatch' }`), so
+   * the server rejects the write (HTTP 412) if any other writer committed in the
+   * read→replace window — exactly one concurrent claimer wins. A 412 maps to
+   * `{ ok: false, reason: 'conflict' }`, a 404 to `'not_found'`. `rev` is bumped
+   * on success, for parity with the memory provider.
+   */
+  async updateIfMatch(
+    id: string,
+    partitionKey: string,
+    expected: ConcurrencyToken,
+    patch: Partial<T>
+  ): Promise<UpdateIfMatchResult<T>> {
+    const c = await this.container();
+    let existing: T | undefined;
+    try {
+      const read = await c.item(id, partitionKey).read();
+      existing = read.resource;
+    } catch (err: unknown) {
+      if ((err as { code?: number })?.code === 404) return { ok: false, reason: 'not_found' };
+      throw err;
+    }
+    if (!existing) return { ok: false, reason: 'not_found' };
+
+    const cur = existing as T & { rev?: number };
+    // If the caller pinned a rev and it already moved on, a concurrent writer
+    // committed before our read — conflict without attempting the write.
+    if (expected.rev !== undefined && (cur.rev ?? 0) !== expected.rev) {
+      return { ok: false, reason: 'conflict' };
+    }
+    // Same for a pinned etag. `merged` below is built from the version we just
+    // read, so that must be the version the caller pinned: conditioning the write
+    // on `expected.etag` while merging onto a different (e.g. stale-replica) read
+    // could pass If-Match and still clobber fields or rewind `rev`.
+    if (expected.etag !== undefined && expected.etag !== cur._etag) {
+      return { ok: false, reason: 'conflict' };
+    }
+    // Condition the write on the etag of the version we merged from — the IfMatch
+    // guards the read→replace window server-side.
+    const condition = cur._etag;
+    const nextRev = (cur.rev ?? 0) + 1;
+    const merged = { ...existing, ...patch, rev: nextRev } as T;
+
+    try {
+      const { resource } = await c.item(id, partitionKey).replace(merged, {
+        accessCondition: condition ? { type: 'IfMatch', condition } : undefined,
+      });
+      return { ok: true, doc: resource as T };
+    } catch (err: unknown) {
+      const code = (err as { code?: number })?.code;
+      if (code === 412) return { ok: false, reason: 'conflict' };
+      if (code === 404) return { ok: false, reason: 'not_found' };
+      throw err;
+    }
+  }
+
   async upsert(doc: T): Promise<T> {
     const c = await this.container();
     const { resource } = await c.items.upsert(doc);
diff --git a/packages/datastore/src/providers/memory.ts b/packages/datastore/src/providers/memory.ts
index 9d1cc3fa..80ba7ac9 100644
--- a/packages/datastore/src/providers/memory.ts
+++ b/packages/datastore/src/providers/memory.ts
@@ -9,9 +9,11 @@ import type {
   AggregateQuery,
   BaseDocument,
   CollectionQuery,
+  ConcurrencyToken,
   DatastoreProvider,
   DocumentCollection,
   FilterMap,
+  UpdateIfMatchResult,
 } from '../types.js';
 
 function deepClone<T>(obj: T): T {
@@ -123,6 +125,34 @@ class MemoryCollection<T extends BaseDocument> implements DocumentCollection<T>
     return deepClone(merged);
   }
 
+  /**
+   * Atomic compare-and-set. The entire get → compare → set runs SYNCHRONOUSLY in
+   * one block with NO await/yield, so under the single-threaded event loop two
+   * concurrent callers can never interleave between the compare and the set — the
+   * first wins (bumps `rev` + `_etag`), the rest see a mismatch and get `conflict`.
+   */
+  async updateIfMatch(
+    id: string,
+    _partitionKey: string,
+    expected: ConcurrencyToken,
+    patch: Partial<T>
+  ): Promise<UpdateIfMatchResult<T>> {
+    const existing = this.docs.get(id);
+    if (!existing) return { ok: false, reason: 'not_found' };
+    const rec = existing as BaseDocument & { rev?: number };
+    const curRev = typeof rec.rev === 'number' ? rec.rev : 0;
+    if (expected.rev !== undefined && expected.rev !== curRev) {
+      return { ok: false, reason: 'conflict' };
+    }
+    if (expected.etag !== undefined && expected.etag !== rec._etag) {
+      return { ok: false, reason: 'conflict' };
+    }
+    const nextRev = curRev + 1;
+    const merged = { ...existing, ...patch, rev: nextRev, _etag: `m${nextRev}` } as T;
+    this.docs.set(id, deepClone(merged));
+    return { ok: true, doc: deepClone(merged) };
+  }
+
   async upsert(doc: T): Promise<T> {
     const clone = deepClone(doc);
     this.docs.set(doc.id, clone);
diff --git a/packages/datastore/src/types.ts b/packages/datastore/src/types.ts
index 02fd04e9..e16f26f8 100644
--- a/packages/datastore/src/types.ts
+++ b/packages/datastore/src/types.ts
@@ -10,6 +10,11 @@
 export interface BaseDocument {
   id: string;
   productId: string;
+  /**
+   * Optimistic-concurrency tag surfaced by the provider (Cosmos `_etag`). Optional
+   * and provider-managed — callers read it back and pass it to `updateIfMatch`.
+   */
+  _etag?: string;
 }
 
 // ── Filter operators ────────────────────────────────────────────────────────
@@ -57,6 +62,24 @@ export interface AggregationField {
   alias: string;
 }
 
+// ── Optimistic concurrency ──────────────────────────────────────────────────
+
+/**
+ * The version a conditional write expects the stored document to still be at.
+ * `etag` is the provider tag (Cosmos `_etag`); `rev` is a portable monotonic
+ * counter maintained for parity (and used by the memory provider). Provide either
+ * or both — the write succeeds only if every supplied token still matches.
+ */
+export interface ConcurrencyToken {
+  etag?: string;
+  rev?: number;
+}
+
+/** Result of a conditional (compare-and-swap) write. */
+export type UpdateIfMatchResult<T> =
+  | { ok: true; doc: T }
+  | { ok: false; reason: 'conflict' | 'not_found' };
+
 // ── Document collection interface ───────────────────────────────────────────
 
 export interface DocumentCollection<T extends BaseDocument> {
@@ -78,5 +101,20 @@ export interface DocumentCollection<T extends BaseDocument> {
   /** Update a document by ID + partition key (merge semantics). */
   update(id: string, partitionKey: string, updates: Partial<T>): Promise<T>;
 
+  /**
+   * Optimistic-concurrency update: merge `patch` into the document only if its
+   * stored version still matches `expected` (Cosmos If-Match on `_etag`; an
+   * atomic compare-and-set on `rev` for the memory provider). Bumps `rev` and
+   * surfaces a fresh `_etag` on success. Never overwrites a doc that changed —
+   * returns `{ ok: false, reason: 'conflict' }` instead, and `'not_found'` when
+   * the document is absent. This is the single-winner primitive for atomic claims.
+   */
+  updateIfMatch(
+    id: string,
+    partitionKey: string,
+    expected: ConcurrencyToken,
+    patch: Partial<T>
+  ): Promise<UpdateIfMatchResult<T>>;
+
   /** Upsert a document (create or replace). */
   upsert(doc: T): Promise<T>;