Merge PR #29: Phase 2 atomic-claim hardening — true single-winner claim
Adds @bytelyst/datastore updateIfMatch (Cosmos If-Match/_etag + process-atomic memory impl); rewires fleet revUpdate* to conditional writes. Concurrent claim proven via Promise.all + N-claimer stress (fails on old read-check-write, passes now). datastore 48 + fleet 53 green; builds clean; no consumer regressed.
This commit is contained in:
commit
6f6e005114
128
packages/datastore/src/__tests__/cosmos.test.ts
Normal file
128
packages/datastore/src/__tests__/cosmos.test.ts
Normal file
@ -0,0 +1,128 @@
|
|||||||
|
/**
|
||||||
|
* CosmosDatastoreProvider.updateIfMatch — verifies the If-Match (_etag) optimistic
|
||||||
|
* concurrency mapping against a fake @azure/cosmos: a matching etag writes + bumps;
|
||||||
|
* a stale etag surfaces as HTTP 412 -> conflict; a missing doc -> not_found.
|
||||||
|
*
|
||||||
|
* Uses a fake Cosmos client (no live Cosmos) that enforces accessCondition IfMatch.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import { describe, it, expect, beforeEach, vi } from 'vitest';
|
||||||
|
|
||||||
|
interface StoredRec {
|
||||||
|
doc: Record<string, unknown>;
|
||||||
|
etag: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
const h = vi.hoisted(() => {
|
||||||
|
const store = new Map<string, StoredRec>();
|
||||||
|
let etagSeq = 0;
|
||||||
|
return {
|
||||||
|
store,
|
||||||
|
seed(id: string, doc: Record<string, unknown>): void {
|
||||||
|
etagSeq += 1;
|
||||||
|
store.set(id, { doc: { ...doc, _etag: `e${etagSeq}` }, etag: `e${etagSeq}` });
|
||||||
|
},
|
||||||
|
nextEtag(): string {
|
||||||
|
etagSeq += 1;
|
||||||
|
return `e${etagSeq}`;
|
||||||
|
},
|
||||||
|
reset(): void {
|
||||||
|
store.clear();
|
||||||
|
etagSeq = 0;
|
||||||
|
},
|
||||||
|
};
|
||||||
|
});
|
||||||
|
|
||||||
|
vi.mock('@azure/cosmos', () => {
|
||||||
|
type CosmosErr = Error & { code?: number };
|
||||||
|
function err(code: number): CosmosErr {
|
||||||
|
const e = new Error(`cosmos ${code}`) as CosmosErr;
|
||||||
|
e.code = code;
|
||||||
|
return e;
|
||||||
|
}
|
||||||
|
class CosmosClient {
|
||||||
|
constructor(_opts: unknown) {}
|
||||||
|
database(_name: string) {
|
||||||
|
return {
|
||||||
|
container: (_c: string) => ({
|
||||||
|
item: (id: string, _pk: string) => ({
|
||||||
|
read() {
|
||||||
|
const rec = h.store.get(id);
|
||||||
|
if (!rec) throw err(404);
|
||||||
|
return Promise.resolve({ resource: { ...rec.doc, _etag: rec.etag } });
|
||||||
|
},
|
||||||
|
replace(
|
||||||
|
doc: Record<string, unknown>,
|
||||||
|
options?: { accessCondition?: { type: string; condition?: string } }
|
||||||
|
) {
|
||||||
|
const rec = h.store.get(id);
|
||||||
|
if (!rec) throw err(404);
|
||||||
|
const cond = options?.accessCondition?.condition;
|
||||||
|
if (cond !== undefined && cond !== rec.etag) throw err(412);
|
||||||
|
const etag = h.nextEtag();
|
||||||
|
const saved = { ...doc, _etag: etag };
|
||||||
|
h.store.set(id, { doc: saved, etag });
|
||||||
|
return Promise.resolve({ resource: saved });
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
}),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return { CosmosClient };
|
||||||
|
});
|
||||||
|
|
||||||
|
import { CosmosDatastoreProvider } from '../providers/cosmos.js';
|
||||||
|
import type { BaseDocument, DocumentCollection } from '../types.js';
|
||||||
|
|
||||||
|
interface RevDoc extends BaseDocument {
|
||||||
|
name: string;
|
||||||
|
rev?: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
describe('CosmosDatastoreProvider.updateIfMatch', () => {
|
||||||
|
let col: DocumentCollection<RevDoc>;
|
||||||
|
beforeEach(() => {
|
||||||
|
h.reset();
|
||||||
|
const provider = new CosmosDatastoreProvider({
|
||||||
|
endpoint: 'https://fake.documents.azure.com:443/',
|
||||||
|
key: 'fake-key',
|
||||||
|
database: 'test',
|
||||||
|
});
|
||||||
|
col = provider.getCollection<RevDoc>('rev', '/productId');
|
||||||
|
h.seed('1', { id: '1', productId: 'p', name: 'A', rev: 0 });
|
||||||
|
});
|
||||||
|
|
||||||
|
it('matching etag -> writes via If-Match and returns ok', async () => {
|
||||||
|
const cur = await col.findById('1', 'p');
|
||||||
|
const res = await col.updateIfMatch('1', 'p', { etag: cur!._etag }, { name: 'B' });
|
||||||
|
expect(res.ok).toBe(true);
|
||||||
|
if (res.ok) {
|
||||||
|
expect(res.doc.name).toBe('B');
|
||||||
|
expect(res.doc.rev).toBe(1);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it('stale etag -> Cosmos 412 -> conflict, no overwrite', async () => {
|
||||||
|
const cur = await col.findById('1', 'p');
|
||||||
|
const staleEtag = cur!._etag;
|
||||||
|
await col.updateIfMatch('1', 'p', { etag: staleEtag }, { name: 'B' }); // etag rotates
|
||||||
|
const res = await col.updateIfMatch('1', 'p', { etag: staleEtag }, { name: 'C' });
|
||||||
|
expect(res.ok).toBe(false);
|
||||||
|
if (!res.ok) expect(res.reason).toBe('conflict');
|
||||||
|
expect((await col.findById('1', 'p'))?.name).toBe('B');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('stale rev -> conflict before write', async () => {
|
||||||
|
await col.updateIfMatch('1', 'p', { rev: 0 }, { name: 'B' }); // rev -> 1
|
||||||
|
const res = await col.updateIfMatch('1', 'p', { rev: 0 }, { name: 'C' });
|
||||||
|
expect(res.ok).toBe(false);
|
||||||
|
if (!res.ok) expect(res.reason).toBe('conflict');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('missing doc -> not_found', async () => {
|
||||||
|
const res = await col.updateIfMatch('missing', 'p', { rev: 0 }, { name: 'X' });
|
||||||
|
expect(res.ok).toBe(false);
|
||||||
|
if (!res.ok) expect(res.reason).toBe('not_found');
|
||||||
|
});
|
||||||
|
});
|
||||||
@ -196,6 +196,71 @@ describe('MemoryDatastoreProvider', () => {
|
|||||||
await expect(collection.rawQuery('SELECT * FROM c')).rejects.toThrow('not supported');
|
await expect(collection.rawQuery('SELECT * FROM c')).rejects.toThrow('not supported');
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('updateIfMatch (optimistic concurrency)', () => {
|
||||||
|
interface RevDoc extends BaseDocument {
|
||||||
|
name: string;
|
||||||
|
rev?: number;
|
||||||
|
}
|
||||||
|
let revCol: DocumentCollection<RevDoc>;
|
||||||
|
beforeEach(async () => {
|
||||||
|
revCol = provider.getCollection<RevDoc>('rev', '/productId');
|
||||||
|
await revCol.create({ id: '1', productId: 'p', name: 'A', rev: 0 });
|
||||||
|
});
|
||||||
|
|
||||||
|
it('matching rev writes the patch and bumps rev + _etag', async () => {
|
||||||
|
const res = await revCol.updateIfMatch('1', 'p', { rev: 0 }, { name: 'B' });
|
||||||
|
expect(res.ok).toBe(true);
|
||||||
|
if (res.ok) {
|
||||||
|
expect(res.doc.name).toBe('B');
|
||||||
|
expect(res.doc.rev).toBe(1);
|
||||||
|
expect(res.doc._etag).toBeDefined();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it('stale rev => conflict and NO write', async () => {
|
||||||
|
await revCol.updateIfMatch('1', 'p', { rev: 0 }, { name: 'B' }); // rev -> 1
|
||||||
|
const res = await revCol.updateIfMatch('1', 'p', { rev: 0 }, { name: 'C' });
|
||||||
|
expect(res.ok).toBe(false);
|
||||||
|
if (!res.ok) expect(res.reason).toBe('conflict');
|
||||||
|
// value unchanged by the losing write
|
||||||
|
expect((await revCol.findById('1', 'p'))?.name).toBe('B');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('matching etag writes; stale etag => conflict', async () => {
|
||||||
|
const first = await revCol.updateIfMatch('1', 'p', { rev: 0 }, { name: 'B' });
|
||||||
|
const goodEtag = first.ok ? first.doc._etag : undefined;
|
||||||
|
const ok = await revCol.updateIfMatch('1', 'p', { etag: goodEtag }, { name: 'C' });
|
||||||
|
expect(ok.ok).toBe(true);
|
||||||
|
const stale = await revCol.updateIfMatch('1', 'p', { etag: goodEtag }, { name: 'D' });
|
||||||
|
expect(stale.ok).toBe(false);
|
||||||
|
if (!stale.ok) expect(stale.reason).toBe('conflict');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('missing doc => not_found', async () => {
|
||||||
|
const res = await revCol.updateIfMatch('nope', 'p', { rev: 0 }, { name: 'X' });
|
||||||
|
expect(res.ok).toBe(false);
|
||||||
|
if (!res.ok) expect(res.reason).toBe('not_found');
|
||||||
|
});
|
||||||
|
|
||||||
|
// THE PROOF: N concurrent compare-and-set on the SAME version — exactly one wins.
|
||||||
|
it('is atomic under TRUE concurrency: N concurrent claims => exactly one winner', async () => {
|
||||||
|
const N = 25;
|
||||||
|
const results = await Promise.all(
|
||||||
|
Array.from({ length: N }, (_, i) =>
|
||||||
|
revCol.updateIfMatch('1', 'p', { rev: 0 }, { name: `claimer-${i}` })
|
||||||
|
)
|
||||||
|
);
|
||||||
|
const winners = results.filter(r => r.ok);
|
||||||
|
const conflicts = results.filter(r => !r.ok);
|
||||||
|
expect(winners).toHaveLength(1);
|
||||||
|
expect(conflicts).toHaveLength(N - 1);
|
||||||
|
expect(conflicts.every(r => !r.ok && r.reason === 'conflict')).toBe(true);
|
||||||
|
// the stored doc reflects exactly one applied write at rev 1
|
||||||
|
const stored = await revCol.findById('1', 'p');
|
||||||
|
expect(stored?.rev).toBe(1);
|
||||||
|
});
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('matchesFilter', () => {
|
describe('matchesFilter', () => {
|
||||||
|
|||||||
@ -10,6 +10,8 @@ export type {
|
|||||||
DocumentCollection,
|
DocumentCollection,
|
||||||
DatastoreProvider,
|
DatastoreProvider,
|
||||||
DatastoreProviderType,
|
DatastoreProviderType,
|
||||||
|
ConcurrencyToken,
|
||||||
|
UpdateIfMatchResult,
|
||||||
} from './types.js';
|
} from './types.js';
|
||||||
|
|
||||||
export { getDatastore, createDatastoreProvider, setDatastore, _resetDatastore } from './factory.js';
|
export { getDatastore, createDatastoreProvider, setDatastore, _resetDatastore } from './factory.js';
|
||||||
|
|||||||
@ -10,9 +10,11 @@ import type {
|
|||||||
AggregateQuery,
|
AggregateQuery,
|
||||||
BaseDocument,
|
BaseDocument,
|
||||||
CollectionQuery,
|
CollectionQuery,
|
||||||
|
ConcurrencyToken,
|
||||||
DatastoreProvider,
|
DatastoreProvider,
|
||||||
DocumentCollection,
|
DocumentCollection,
|
||||||
FilterMap,
|
FilterMap,
|
||||||
|
UpdateIfMatchResult,
|
||||||
} from '../types.js';
|
} from '../types.js';
|
||||||
|
|
||||||
export interface CosmosProviderConfig {
|
export interface CosmosProviderConfig {
|
||||||
@ -145,6 +147,56 @@ class CosmosCollection<T extends BaseDocument> implements DocumentCollection<T>
|
|||||||
return resource as T;
|
return resource as T;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Atomic compare-and-set via Cosmos optimistic concurrency. Conditions the
|
||||||
|
* replace on the document `_etag` (`accessCondition: { type: 'IfMatch' }`), so
|
||||||
|
* the server rejects the write (HTTP 412) if any other writer committed since
|
||||||
|
* the version this caller read — exactly one concurrent claimer wins. A 412 maps
|
||||||
|
* to `{ ok: false, reason: 'conflict' }`, a 404 to `'not_found'`. `rev` is also
|
||||||
|
* compared (when supplied) and bumped, for parity with the memory provider.
|
||||||
|
*/
|
||||||
|
async updateIfMatch(
|
||||||
|
id: string,
|
||||||
|
partitionKey: string,
|
||||||
|
expected: ConcurrencyToken,
|
||||||
|
patch: Partial<T>
|
||||||
|
): Promise<UpdateIfMatchResult<T>> {
|
||||||
|
const c = await this.container();
|
||||||
|
let existing: T | undefined;
|
||||||
|
try {
|
||||||
|
const read = await c.item(id, partitionKey).read<T>();
|
||||||
|
existing = read.resource;
|
||||||
|
} catch (err: unknown) {
|
||||||
|
if ((err as { code?: number })?.code === 404) return { ok: false, reason: 'not_found' };
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
if (!existing) return { ok: false, reason: 'not_found' };
|
||||||
|
|
||||||
|
const cur = existing as T & { rev?: number };
|
||||||
|
// If the caller pinned a rev and it already moved on, a concurrent writer
|
||||||
|
// committed before our read — conflict without attempting the write.
|
||||||
|
if (expected.rev !== undefined && (cur.rev ?? 0) !== expected.rev) {
|
||||||
|
return { ok: false, reason: 'conflict' };
|
||||||
|
}
|
||||||
|
// Condition the write on the caller's etag when given, else the just-read
|
||||||
|
// etag — either way the IfMatch guards the read→replace window server-side.
|
||||||
|
const condition = expected.etag ?? cur._etag;
|
||||||
|
const nextRev = (cur.rev ?? 0) + 1;
|
||||||
|
const merged = { ...existing, ...patch, rev: nextRev } as T;
|
||||||
|
|
||||||
|
try {
|
||||||
|
const { resource } = await c.item(id, partitionKey).replace<T>(merged, {
|
||||||
|
accessCondition: condition ? { type: 'IfMatch', condition } : undefined,
|
||||||
|
});
|
||||||
|
return { ok: true, doc: resource as T };
|
||||||
|
} catch (err: unknown) {
|
||||||
|
const code = (err as { code?: number })?.code;
|
||||||
|
if (code === 412) return { ok: false, reason: 'conflict' };
|
||||||
|
if (code === 404) return { ok: false, reason: 'not_found' };
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async upsert(doc: T): Promise<T> {
|
async upsert(doc: T): Promise<T> {
|
||||||
const c = await this.container();
|
const c = await this.container();
|
||||||
const { resource } = await c.items.upsert<T>(doc);
|
const { resource } = await c.items.upsert<T>(doc);
|
||||||
|
|||||||
@ -9,9 +9,11 @@ import type {
|
|||||||
AggregateQuery,
|
AggregateQuery,
|
||||||
BaseDocument,
|
BaseDocument,
|
||||||
CollectionQuery,
|
CollectionQuery,
|
||||||
|
ConcurrencyToken,
|
||||||
DatastoreProvider,
|
DatastoreProvider,
|
||||||
DocumentCollection,
|
DocumentCollection,
|
||||||
FilterMap,
|
FilterMap,
|
||||||
|
UpdateIfMatchResult,
|
||||||
} from '../types.js';
|
} from '../types.js';
|
||||||
|
|
||||||
function deepClone<T>(obj: T): T {
|
function deepClone<T>(obj: T): T {
|
||||||
@ -123,6 +125,34 @@ class MemoryCollection<T extends BaseDocument> implements DocumentCollection<T>
|
|||||||
return deepClone(merged);
|
return deepClone(merged);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Atomic compare-and-set. The entire get → compare → set runs SYNCHRONOUSLY in
|
||||||
|
* one block with NO await/yield, so under the single-threaded event loop two
|
||||||
|
* concurrent callers can never interleave between the compare and the set — the
|
||||||
|
* first wins (bumps `rev` + `_etag`), the rest see a mismatch and get `conflict`.
|
||||||
|
*/
|
||||||
|
async updateIfMatch(
|
||||||
|
id: string,
|
||||||
|
_partitionKey: string,
|
||||||
|
expected: ConcurrencyToken,
|
||||||
|
patch: Partial<T>
|
||||||
|
): Promise<UpdateIfMatchResult<T>> {
|
||||||
|
const existing = this.docs.get(id);
|
||||||
|
if (!existing) return { ok: false, reason: 'not_found' };
|
||||||
|
const rec = existing as BaseDocument & { rev?: number };
|
||||||
|
const curRev = typeof rec.rev === 'number' ? rec.rev : 0;
|
||||||
|
if (expected.rev !== undefined && expected.rev !== curRev) {
|
||||||
|
return { ok: false, reason: 'conflict' };
|
||||||
|
}
|
||||||
|
if (expected.etag !== undefined && expected.etag !== rec._etag) {
|
||||||
|
return { ok: false, reason: 'conflict' };
|
||||||
|
}
|
||||||
|
const nextRev = curRev + 1;
|
||||||
|
const merged = { ...existing, ...patch, rev: nextRev, _etag: `m${nextRev}` } as T;
|
||||||
|
this.docs.set(id, deepClone(merged));
|
||||||
|
return { ok: true, doc: deepClone(merged) };
|
||||||
|
}
|
||||||
|
|
||||||
async upsert(doc: T): Promise<T> {
|
async upsert(doc: T): Promise<T> {
|
||||||
const clone = deepClone(doc);
|
const clone = deepClone(doc);
|
||||||
this.docs.set(doc.id, clone);
|
this.docs.set(doc.id, clone);
|
||||||
|
|||||||
@ -10,6 +10,11 @@
|
|||||||
export interface BaseDocument {
|
export interface BaseDocument {
|
||||||
id: string;
|
id: string;
|
||||||
productId: string;
|
productId: string;
|
||||||
|
/**
|
||||||
|
* Optimistic-concurrency tag surfaced by the provider (Cosmos `_etag`). Optional
|
||||||
|
* and provider-managed — callers read it back and pass it to `updateIfMatch`.
|
||||||
|
*/
|
||||||
|
_etag?: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Filter operators ────────────────────────────────────────────────────────
|
// ── Filter operators ────────────────────────────────────────────────────────
|
||||||
@ -57,6 +62,24 @@ export interface AggregationField {
|
|||||||
alias: string;
|
alias: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── Optimistic concurrency ──────────────────────────────────────────────────
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The version a conditional write expects the stored document to still be at.
|
||||||
|
* `etag` is the provider tag (Cosmos `_etag`); `rev` is a portable monotonic
|
||||||
|
* counter maintained for parity (and used by the memory provider). Provide either
|
||||||
|
* or both — the write succeeds only if every supplied token still matches.
|
||||||
|
*/
|
||||||
|
export interface ConcurrencyToken {
|
||||||
|
etag?: string;
|
||||||
|
rev?: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Result of a conditional (compare-and-swap) write. */
|
||||||
|
export type UpdateIfMatchResult<T> =
|
||||||
|
| { ok: true; doc: T }
|
||||||
|
| { ok: false; reason: 'conflict' | 'not_found' };
|
||||||
|
|
||||||
// ── Document collection interface ───────────────────────────────────────────
|
// ── Document collection interface ───────────────────────────────────────────
|
||||||
|
|
||||||
export interface DocumentCollection<T extends BaseDocument = BaseDocument> {
|
export interface DocumentCollection<T extends BaseDocument = BaseDocument> {
|
||||||
@ -78,6 +101,21 @@ export interface DocumentCollection<T extends BaseDocument = BaseDocument> {
|
|||||||
/** Update a document by ID + partition key (merge semantics). */
|
/** Update a document by ID + partition key (merge semantics). */
|
||||||
update(id: string, partitionKey: string, updates: Partial<T>): Promise<T>;
|
update(id: string, partitionKey: string, updates: Partial<T>): Promise<T>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Optimistic-concurrency update: merge `patch` into the document only if its
|
||||||
|
* stored version still matches `expected` (Cosmos If-Match on `_etag`; an
|
||||||
|
* atomic compare-and-set on `rev` for the memory provider). Bumps `rev` and
|
||||||
|
* surfaces a fresh `_etag` on success. Never overwrites a doc that changed —
|
||||||
|
* returns `{ ok: false, reason: 'conflict' }` instead, and `'not_found'` when
|
||||||
|
* the document is absent. This is the single-winner primitive for atomic claims.
|
||||||
|
*/
|
||||||
|
updateIfMatch(
|
||||||
|
id: string,
|
||||||
|
partitionKey: string,
|
||||||
|
expected: ConcurrencyToken,
|
||||||
|
patch: Partial<T>
|
||||||
|
): Promise<UpdateIfMatchResult<T>>;
|
||||||
|
|
||||||
/** Upsert a document (create or replace). */
|
/** Upsert a document (create or replace). */
|
||||||
upsert(doc: T): Promise<T>;
|
upsert(doc: T): Promise<T>;
|
||||||
|
|
||||||
|
|||||||
@ -46,13 +46,16 @@ describe('fleet coordinator', () => {
|
|||||||
expect(job.productId).toBe(PID);
|
expect(job.productId).toBe(PID);
|
||||||
});
|
});
|
||||||
|
|
||||||
// ── ATOMIC CLAIM RACE ──
|
// ── ATOMIC CLAIM RACE (TRUE concurrency, not sequential) ──
|
||||||
it('atomic claim: two contenders on the same job version — exactly one wins', async () => {
|
it('atomic claim: two TRULY concurrent contenders on the same job version — exactly one wins', async () => {
|
||||||
const { job } = await coord.submitJob(PID, input());
|
const { job } = await coord.submitJob(PID, input());
|
||||||
|
|
||||||
// Both contenders see the SAME job version (rev). The CAS picks one winner.
|
// Both contenders see the SAME freshly-read job version and race via Promise.all.
|
||||||
const a = await coord.tryClaimJob(job, factory({ factoryId: 'fac_A' }));
|
// The datastore compare-and-set (updateIfMatch) admits exactly one winner.
|
||||||
const b = await coord.tryClaimJob(job, factory({ factoryId: 'fac_B' }));
|
const [a, b] = await Promise.all([
|
||||||
|
coord.tryClaimJob(job, factory({ factoryId: 'fac_A' })),
|
||||||
|
coord.tryClaimJob(job, factory({ factoryId: 'fac_B' })),
|
||||||
|
]);
|
||||||
|
|
||||||
const oks = [a, b].filter(r => r.ok);
|
const oks = [a, b].filter(r => r.ok);
|
||||||
const conflicts = [a, b].filter(r => !r.ok);
|
const conflicts = [a, b].filter(r => !r.ok);
|
||||||
@ -60,7 +63,7 @@ describe('fleet coordinator', () => {
|
|||||||
expect(conflicts).toHaveLength(1);
|
expect(conflicts).toHaveLength(1);
|
||||||
expect(conflicts[0].ok === false && conflicts[0].reason).toBe('conflict');
|
expect(conflicts[0].ok === false && conflicts[0].reason).toBe('conflict');
|
||||||
|
|
||||||
// no double-assignment: the job is assigned exactly once, single run, single holder
|
// no double-assignment: assigned exactly once, single run, single lease holder
|
||||||
const stored = await repo.getJob(job.id, PID);
|
const stored = await repo.getJob(job.id, PID);
|
||||||
expect(stored?.stage).toBe('assigned');
|
expect(stored?.stage).toBe('assigned');
|
||||||
expect(stored?.attempts).toBe(1);
|
expect(stored?.attempts).toBe(1);
|
||||||
@ -69,6 +72,45 @@ describe('fleet coordinator', () => {
|
|||||||
expect(lease?.leaseEpoch).toBe(1);
|
expect(lease?.leaseEpoch).toBe(1);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('atomic claim: N concurrent contenders (stress) — exactly one ok, N-1 conflicts, no double-assignment', async () => {
|
||||||
|
const N = 15;
|
||||||
|
const { job } = await coord.submitJob(PID, input());
|
||||||
|
const results = await Promise.all(
|
||||||
|
Array.from({ length: N }, (_, i) =>
|
||||||
|
coord.tryClaimJob(job, factory({ factoryId: `fac_${i}` }))
|
||||||
|
)
|
||||||
|
);
|
||||||
|
expect(results.filter(r => r.ok)).toHaveLength(1);
|
||||||
|
expect(results.filter(r => !r.ok && r.reason === 'conflict')).toHaveLength(N - 1);
|
||||||
|
// exactly one run + one assigned job + leaseEpoch 1
|
||||||
|
expect(await repo.listRunsByJob(job.id)).toHaveLength(1);
|
||||||
|
const stored = await repo.getJob(job.id, PID);
|
||||||
|
expect(stored?.stage).toBe('assigned');
|
||||||
|
expect(stored?.attempts).toBe(1);
|
||||||
|
expect((await repo.getLease(job.id))?.leaseEpoch).toBe(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('claimNextJob: N concurrent claimers for a single queued job — exactly one succeeds', async () => {
|
||||||
|
await coord.submitJob(PID, input());
|
||||||
|
const N = 12;
|
||||||
|
const results = await Promise.all(
|
||||||
|
Array.from({ length: N }, (_, i) => coord.claimNextJob(factory({ factoryId: `f_${i}` })))
|
||||||
|
);
|
||||||
|
expect(results.filter(r => r !== null)).toHaveLength(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('lease renew under contention — exactly one concurrent renewal wins', async () => {
|
||||||
|
const { job } = await coord.submitJob(PID, input());
|
||||||
|
const claim = await coord.claimNextJob(factory());
|
||||||
|
const epoch = claim!.job.leaseEpoch;
|
||||||
|
const N = 10;
|
||||||
|
const results = await Promise.all(
|
||||||
|
Array.from({ length: N }, () => coord.renewLease(job.id, PID, epoch, 600))
|
||||||
|
);
|
||||||
|
expect(results.filter(r => r.ok)).toHaveLength(1);
|
||||||
|
expect(results.filter(r => !r.ok && r.reason === 'conflict')).toHaveLength(N - 1);
|
||||||
|
});
|
||||||
|
|
||||||
// ── PRIORITY + AGE SELECTION ──
|
// ── PRIORITY + AGE SELECTION ──
|
||||||
it('claimNextJob returns highest-priority then oldest', async () => {
|
it('claimNextJob returns highest-priority then oldest', async () => {
|
||||||
await coord.submitJob(PID, input({ idempotencyKey: 'low-old', priority: 'low' }));
|
await coord.submitJob(PID, input({ idempotencyKey: 'low-old', priority: 'low' }));
|
||||||
|
|||||||
@ -109,22 +109,25 @@ export async function updateJob(
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Compare-and-swap on `rev` — the atomic-claim / fenced-transition primitive. */
|
/**
|
||||||
|
* Compare-and-swap on `rev` — the atomic-claim / fenced-transition primitive.
|
||||||
|
* Delegates to the datastore's `updateIfMatch`, which performs the compare and the
|
||||||
|
* write as one indivisible operation (Cosmos If-Match; a synchronous compare-set on
|
||||||
|
* the memory provider). This is genuinely atomic under TRUE concurrency — there is
|
||||||
|
* no read → check → write with an intervening await that two callers could interleave.
|
||||||
|
*/
|
||||||
export async function revUpdateJob(
|
export async function revUpdateJob(
|
||||||
id: string,
|
id: string,
|
||||||
productId: string,
|
productId: string,
|
||||||
expectedRev: number,
|
expectedRev: number,
|
||||||
updates: Partial<FleetJobDoc>
|
updates: Partial<FleetJobDoc>
|
||||||
): Promise<RevResult<FleetJobDoc>> {
|
): Promise<RevResult<FleetJobDoc>> {
|
||||||
const cur = await jobs().findById(id, productId);
|
return jobs().updateIfMatch(
|
||||||
if (!cur) return { ok: false, reason: 'not_found' };
|
id,
|
||||||
if ((cur.rev ?? 0) !== expectedRev) return { ok: false, reason: 'conflict' };
|
productId,
|
||||||
const doc = await jobs().update(id, productId, {
|
{ rev: expectedRev },
|
||||||
...updates,
|
{ ...updates, updatedAt: new Date().toISOString() }
|
||||||
rev: expectedRev + 1,
|
);
|
||||||
updatedAt: new Date().toISOString(),
|
|
||||||
});
|
|
||||||
return { ok: true, doc };
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function deleteJob(id: string, productId: string): Promise<void> {
|
export async function deleteJob(id: string, productId: string): Promise<void> {
|
||||||
@ -161,20 +164,18 @@ export async function createLease(doc: FleetLeaseDoc): Promise<FleetLeaseDoc> {
|
|||||||
return leases().create(doc);
|
return leases().create(doc);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Compare-and-swap on a lease's `rev` — atomic via the datastore updateIfMatch. */
|
||||||
export async function revUpdateLease(
|
export async function revUpdateLease(
|
||||||
jobId: string,
|
jobId: string,
|
||||||
expectedRev: number,
|
expectedRev: number,
|
||||||
updates: Partial<FleetLeaseDoc>
|
updates: Partial<FleetLeaseDoc>
|
||||||
): Promise<RevResult<FleetLeaseDoc>> {
|
): Promise<RevResult<FleetLeaseDoc>> {
|
||||||
const cur = await leases().findById(jobId, jobId);
|
return leases().updateIfMatch(
|
||||||
if (!cur) return { ok: false, reason: 'not_found' };
|
jobId,
|
||||||
if ((cur.rev ?? 0) !== expectedRev) return { ok: false, reason: 'conflict' };
|
jobId,
|
||||||
const doc = await leases().update(jobId, jobId, {
|
{ rev: expectedRev },
|
||||||
...updates,
|
{ ...updates, updatedAt: new Date().toISOString() }
|
||||||
rev: expectedRev + 1,
|
);
|
||||||
updatedAt: new Date().toISOString(),
|
|
||||||
});
|
|
||||||
return { ok: true, doc };
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function listExpiredLeases(nowIso: string): Promise<FleetLeaseDoc[]> {
|
export async function listExpiredLeases(nowIso: string): Promise<FleetLeaseDoc[]> {
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user