learning_ai_common_plat/packages/datastore/src/providers/cosmos.ts
saravanakumardb1 40fd0e05ad feat(datastore): add updateIfMatch optimistic-concurrency write (If-Match / atomic CAS)
Adds an additive, backward-compatible conditional write to the datastore abstraction
so consumers can do true single-winner compare-and-swap:

  updateIfMatch(id, partitionKey, expected: { etag?, rev? }, patch)
    -> { ok: true, doc } | { ok: false, reason: 'conflict' | 'not_found' }

- types: ConcurrencyToken + UpdateIfMatchResult; optional _etag on BaseDocument
  (provider-managed, surfaced on reads); new method on DocumentCollection; exported.
- memory provider: get -> compare -> set in ONE synchronous block (no await/yield),
  so two concurrent callers cannot interleave under the single-threaded event loop —
  the first wins and bumps rev + _etag, the rest get conflict. True in-process atomicity.
- cosmos provider: conditional replace with accessCondition { type: 'IfMatch',
  condition: _etag }; translates Cosmos 412 -> conflict, 404 -> not_found; also
  compares/bumps rev for parity.

Existing method signatures are unchanged (additive only). Tests: memory match/stale/
missing + an N-concurrent Promise.all atomicity proof; cosmos If-Match mapping via a
fake @azure/cosmos (match writes, stale etag -> 412 conflict, missing -> not_found).
2026-05-29 20:58:55 -07:00

296 lines
9.6 KiB
TypeScript

/**
* Azure Cosmos DB datastore provider.
*
* Wraps @azure/cosmos SDK behind the cloud-agnostic DocumentCollection interface.
* Translates FilterMap queries to Cosmos SQL.
*/
import { filterToCosmosSQL } from '../filter.js';
import type {
AggregateQuery,
BaseDocument,
CollectionQuery,
ConcurrencyToken,
DatastoreProvider,
DocumentCollection,
FilterMap,
UpdateIfMatchResult,
} from '../types.js';
/**
 * Connection settings for {@link CosmosDatastoreProvider}.
 *
 * When no config object is passed to the provider constructor, the values are
 * read from the COSMOS_ENDPOINT, COSMOS_KEY and COSMOS_DATABASE environment
 * variables instead.
 */
export interface CosmosProviderConfig {
/** Account endpoint, handed to the CosmosClient as `endpoint`. */
endpoint: string;
/** Account key, handed to the CosmosClient as `key`. */
key: string;
/** Name of the database that every collection of this provider lives in. */
database: string;
}
/**
 * DatastoreProvider backed by Azure Cosmos DB.
 *
 * The `@azure/cosmos` SDK is imported lazily on first use, so constructing the
 * provider performs no I/O. Collections are cached per
 * `(container name, partition key path)` pair.
 */
export class CosmosDatastoreProvider implements DatastoreProvider {
  /** Client created by the first successful connect; kept for the provider's lifetime. */
  private client: import('@azure/cosmos').CosmosClient | null = null;
  /**
   * In-flight or completed lazy initialisation. Caching the promise (rather
   * than the resolved Database) means concurrent first callers share a single
   * CosmosClient instead of each constructing their own while the dynamic
   * import is still pending.
   */
  private databasePromise: Promise<import('@azure/cosmos').Database> | null = null;
  private config: CosmosProviderConfig;
  private collections = new Map<string, CosmosCollection<BaseDocument>>();

  /**
   * @param config Explicit connection settings. When omitted, COSMOS_ENDPOINT
   *   and COSMOS_KEY are required environment variables and COSMOS_DATABASE
   *   falls back to `'lysnrai'` when unset or empty.
   * @throws Error if a required environment variable is missing.
   */
  constructor(config?: CosmosProviderConfig) {
    this.config = config ?? {
      endpoint: getEnvOrThrow('COSMOS_ENDPOINT'),
      key: getEnvOrThrow('COSMOS_KEY'),
      database: process.env.COSMOS_DATABASE || 'lysnrai',
    };
  }

  /**
   * Returns the shared Database handle, creating the client on first use.
   * A failed initialisation is not cached, so the next call retries.
   */
  private getDatabase(): Promise<import('@azure/cosmos').Database> {
    let pending = this.databasePromise;
    if (!pending) {
      pending = this.connect();
      this.databasePromise = pending;
      const started = pending;
      // Drop the cached promise on failure so a later call can retry. The
      // rejection itself still reaches whoever awaits the returned promise.
      started.catch(() => {
        if (this.databasePromise === started) this.databasePromise = null;
      });
    }
    return pending;
  }

  /** Loads the SDK, builds the client and resolves the configured database. */
  private async connect(): Promise<import('@azure/cosmos').Database> {
    const { CosmosClient } = await import('@azure/cosmos');
    const client = new CosmosClient({
      endpoint: this.config.endpoint,
      key: this.config.key,
    });
    this.client = client;
    return client.database(this.config.database);
  }

  /**
   * Returns the collection wrapper for a container, creating and caching it on
   * first request.
   *
   * @param name Container name.
   * @param partitionKeyPath Partition key path of the container (e.g. `/userId`).
   */
  getCollection<T extends BaseDocument = BaseDocument>(
    name: string,
    partitionKeyPath: string
  ): DocumentCollection<T> {
    const cacheKey = `${name}:${partitionKeyPath}`;
    let collection = this.collections.get(cacheKey);
    if (!collection) {
      collection = new CosmosCollection<BaseDocument>(name, partitionKeyPath, () =>
        this.getDatabase()
      );
      this.collections.set(cacheKey, collection);
    }
    return collection as unknown as DocumentCollection<T>;
  }

  /**
   * Health probe: resolves `true` when the database can be read, `false` on
   * any failure (including a failure to load the SDK). Never rejects.
   */
  async isHealthy(): Promise<boolean> {
    try {
      const db = await this.getDatabase();
      await db.read();
      return true;
    } catch {
      return false;
    }
  }
}
/**
 * DocumentCollection implementation over a single Cosmos container.
 *
 * The database handle is obtained through the injected `getDatabase` callback
 * on every operation, so the collection itself holds no connection state.
 *
 * NOTE(review): field names taken from `select`, `sort`, `groupBy` and
 * aggregation specs are interpolated into the SQL text unescaped; only filter
 * values go through query parameters. Callers must not pass untrusted field
 * names.
 */
class CosmosCollection<T extends BaseDocument> implements DocumentCollection<T> {
  /**
   * LIMIT emitted when a query asks for an offset without a limit. Cosmos SQL
   * requires OFFSET and LIMIT to appear together, so "no limit" is expressed
   * as the largest 32-bit signed integer.
   */
  private static readonly UNBOUNDED_LIMIT = 2147483647;

  constructor(
    private containerName: string,
    private partitionKeyPath: string,
    private getDatabase: () => Promise<import('@azure/cosmos').Database>
  ) {}

  /** Resolves the container handle for this collection. */
  private async container() {
    const db = await this.getDatabase();
    return db.container(this.containerName);
  }

  /** Partition key field name: the path without its leading slash (`/userId` -> `userId`). */
  private pkField(): string {
    return this.partitionKeyPath.replace(/^\//, '');
  }

  /** True when `err` carries the given numeric `code` (the SDK's HTTP status). */
  private static hasStatus(err: unknown, status: number): boolean {
    return (err as { code?: number } | null | undefined)?.code === status;
  }

  /**
   * Point-reads a document, normalising both "missing" shapes — an empty
   * `resource` and a thrown 404 — to `undefined`. Any other error propagates.
   */
  private async readExisting(
    c: import('@azure/cosmos').Container,
    id: string,
    partitionKey: string
  ): Promise<T | undefined> {
    try {
      const { resource } = await c.item(id, partitionKey).read<T>();
      return resource ?? undefined;
    } catch (err: unknown) {
      if (CosmosCollection.hasStatus(err, 404)) return undefined;
      throw err;
    }
  }

  /** Returns the document with the given id and partition key, or `null` if it does not exist. */
  async findById(id: string, partitionKey: string): Promise<T | null> {
    const c = await this.container();
    return (await this.readExisting(c, id, partitionKey)) ?? null;
  }

  /** Runs a SELECT built from `query` and returns every matching document. */
  async findMany(query?: CollectionQuery<T>): Promise<T[]> {
    const c = await this.container();
    const sql = this.buildSelectSQL(query);
    const { resources } = await c.items
      .query<T>({
        query: sql.query,
        parameters: sql.parameters as import('@azure/cosmos').SqlParameter[],
      })
      .fetchAll();
    return resources;
  }

  /** Returns the first document matching `query` (forces `limit: 1`), or `null`. */
  async findOne(query?: CollectionQuery<T>): Promise<T | null> {
    const results = await this.findMany({ ...query, limit: 1 });
    return results[0] ?? null;
  }

  /** Counts documents matching `filter` (all documents when omitted). */
  async count(filter?: FilterMap): Promise<number> {
    const c = await this.container();
    const whereClause = filter ? filterToCosmosSQL(filter) : { query: '', parameters: [] };
    const { resources } = await c.items
      .query<number>({
        query: `SELECT VALUE COUNT(1) FROM c ${whereClause.query}`,
        parameters: whereClause.parameters as import('@azure/cosmos').SqlParameter[],
      })
      .fetchAll();
    return resources[0] ?? 0;
  }

  /** Inserts a new document and returns the stored resource. */
  async create(doc: T): Promise<T> {
    const c = await this.container();
    const { resource } = await c.items.create<T>(doc);
    return resource as T;
  }

  /**
   * Read-merge-replace update: shallow-merges `updates` over the stored
   * document and writes the result back.
   *
   * The replace is NOT conditioned on the etag, so a concurrent writer between
   * the read and the replace is overwritten (last writer wins). Use
   * {@link updateIfMatch} when that matters.
   *
   * @throws Error `Document '<id>' not found` when the document does not
   *   exist — whether the SDK reports that as an empty resource or as a
   *   thrown 404, matching `findById` and `updateIfMatch`.
   */
  async update(id: string, partitionKey: string, updates: Partial<T>): Promise<T> {
    const c = await this.container();
    const existing = await this.readExisting(c, id, partitionKey);
    if (!existing) throw new Error(`Document '${id}' not found`);
    const merged = { ...existing, ...updates } as T;
    const { resource } = await c.item(id, partitionKey).replace<T>(merged);
    return resource as T;
  }

  /**
   * Atomic compare-and-set via Cosmos optimistic concurrency. Conditions the
   * replace on the document `_etag` (`accessCondition: { type: 'IfMatch' }`), so
   * the server rejects the write (HTTP 412) if any other writer committed since
   * the version this caller read — exactly one concurrent claimer wins. A 412 maps
   * to `{ ok: false, reason: 'conflict' }`, a 404 to `'not_found'`. `rev` is also
   * compared (when supplied) and bumped, for parity with the memory provider.
   *
   * @param expected Version the caller believes is current (`etag` and/or `rev`).
   * @param patch Fields shallow-merged over the stored document; `rev` is
   *   always overwritten with the bumped value.
   * @returns `{ ok: true, doc }` on success, otherwise `{ ok: false, reason }`.
   *   Errors other than 404/412 are rethrown.
   */
  async updateIfMatch(
    id: string,
    partitionKey: string,
    expected: ConcurrencyToken,
    patch: Partial<T>
  ): Promise<UpdateIfMatchResult<T>> {
    const c = await this.container();
    const existing = await this.readExisting(c, id, partitionKey);
    if (!existing) return { ok: false, reason: 'not_found' };

    const cur = existing as T & { rev?: number };
    const currentRev = cur.rev ?? 0;
    // If the caller pinned a rev and it already moved on, a concurrent writer
    // committed before our read — conflict without attempting the write.
    if (expected.rev !== undefined && currentRev !== expected.rev) {
      return { ok: false, reason: 'conflict' };
    }

    // Condition the write on the caller's etag when given, else the just-read
    // etag — either way the IfMatch guards the read→replace window server-side.
    const condition = expected.etag ?? cur._etag;
    const merged = { ...existing, ...patch, rev: currentRev + 1 } as T;
    try {
      const { resource } = await c.item(id, partitionKey).replace<T>(merged, {
        accessCondition: condition ? { type: 'IfMatch', condition } : undefined,
      });
      return { ok: true, doc: resource as T };
    } catch (err: unknown) {
      if (CosmosCollection.hasStatus(err, 412)) return { ok: false, reason: 'conflict' };
      // The document was deleted between our read and the replace.
      if (CosmosCollection.hasStatus(err, 404)) return { ok: false, reason: 'not_found' };
      throw err;
    }
  }

  /** Creates or fully replaces a document and returns the stored resource. */
  async upsert(doc: T): Promise<T> {
    const c = await this.container();
    const { resource } = await c.items.upsert<T>(doc);
    return resource as T;
  }

  /**
   * Deletes a document. Errors from the SDK propagate unchanged.
   * NOTE(review): a missing document presumably surfaces as a thrown 404 —
   * confirm whether callers expect delete to be idempotent.
   */
  async delete(id: string, partitionKey: string): Promise<void> {
    const c = await this.container();
    await c.item(id, partitionKey).delete();
  }

  /**
   * Runs a grouped aggregation.
   *
   * `count` aggregations become `COUNT(1)`; every other op is upper-cased and
   * applied to `c.<field>`. If `groupBy` is empty or absent at runtime the
   * query aggregates over the whole (filtered) container without a GROUP BY
   * instead of selecting the non-existent `c.undefined`.
   *
   * @throws Error when neither a `groupBy` field nor any aggregation is given,
   *   since no valid SELECT list can be built.
   */
  async aggregate<R = Record<string, unknown>>(query: AggregateQuery): Promise<R[]> {
    const c = await this.container();
    const whereClause = query.filter
      ? filterToCosmosSQL(query.filter)
      : { query: '', parameters: [] };
    const aggFields = (query.aggregations ?? [])
      .map(a => {
        const func = a.op === 'count' ? `COUNT(1)` : `${a.op.toUpperCase()}(c.${a.field})`;
        return `${func} AS ${a.alias}`;
      })
      .join(', ');
    const groupField = query.groupBy ? `c.${query.groupBy}` : '';
    const selectList = [groupField, aggFields].filter(part => part !== '').join(', ');
    if (!selectList) {
      throw new Error('aggregate() requires a groupBy field or at least one aggregation');
    }
    // Spacing is kept exactly as before for the groupBy + aggregations case.
    const groupClause = groupField ? ` GROUP BY ${groupField}` : '';
    const { resources } = await c.items
      .query<R>({
        query: `SELECT ${selectList} FROM c ${whereClause.query}${groupClause}`,
        parameters: whereClause.parameters as import('@azure/cosmos').SqlParameter[],
      })
      .fetchAll();
    return resources;
  }

  /**
   * Executes caller-supplied Cosmos SQL. Parameter names are prefixed with
   * `@` when the caller omitted it.
   */
  async rawQuery<R = unknown>(query: string, parameters?: Record<string, unknown>): Promise<R[]> {
    const c = await this.container();
    const cosmosParams = parameters
      ? Object.entries(parameters).map(([name, value]) => ({
          name: name.startsWith('@') ? name : `@${name}`,
          value,
        }))
      : [];
    const { resources } = await c.items
      .query<R>({
        query,
        parameters: cosmosParams as import('@azure/cosmos').SqlParameter[],
      })
      .fetchAll();
    return resources;
  }

  /**
   * Translates a CollectionQuery into Cosmos SQL text plus parameters.
   *
   * Clauses are emitted in the order SELECT, WHERE, ORDER BY, OFFSET/LIMIT.
   * A limit without an offset starts at 0; an offset without a limit is
   * honoured by pairing it with {@link CosmosCollection.UNBOUNDED_LIMIT}
   * (previously the offset was silently dropped).
   */
  private buildSelectSQL(query?: CollectionQuery<T>): {
    query: string;
    parameters: Array<{ name: string; value: unknown }>;
  } {
    const parts: string[] = [];

    // SELECT
    if (query?.select && query.select.length > 0) {
      const fields = query.select.map(f => `c.${f as string}`).join(', ');
      parts.push(`SELECT ${fields} FROM c`);
    } else {
      parts.push('SELECT * FROM c');
    }

    // WHERE
    const whereClause = query?.filter
      ? filterToCosmosSQL(query.filter)
      : { query: '', parameters: [] };
    const parameters = [...whereClause.parameters];
    if (whereClause.query) parts.push(whereClause.query);

    // ORDER BY
    if (query?.sort) {
      const orderParts = Object.entries(query.sort)
        .map(([field, dir]) => `c.${field} ${dir === 1 ? 'ASC' : 'DESC'}`)
        .join(', ');
      if (orderParts) parts.push(`ORDER BY ${orderParts}`);
    }

    // OFFSET / LIMIT — Cosmos only accepts the two together.
    if (query?.limit !== undefined) {
      parts.push(`OFFSET ${query.offset ?? 0} LIMIT ${query.limit}`);
    } else if (query?.offset !== undefined) {
      parts.push(`OFFSET ${query.offset} LIMIT ${CosmosCollection.UNBOUNDED_LIMIT}`);
    }

    return { query: parts.join(' '), parameters };
  }
}
/**
 * Reads a required environment variable.
 *
 * @param name Name of the variable to look up in `process.env`.
 * @returns The variable's value when it is set to a non-empty string.
 * @throws Error when the variable is unset or empty.
 */
function getEnvOrThrow(name: string): string {
  const found = process.env[name];
  if (found) {
    return found;
  }
  // Unset and empty are treated alike: neither is a usable setting.
  throw new Error(`Environment variable ${name} is required for CosmosDatastoreProvider`);
}