learning_ai_common_plat/packages/cosmos/src/containers.ts

125 lines
3.7 KiB
TypeScript

/**
* Container registry for dashboards that need partition key validation
* and createIfNotExists support.
*/
import { Container, PartitionKeyDefinition, type Database } from '@azure/cosmos';
import { getCosmosClient, getDatabase } from './client.js';
import type { ContainerConfig } from './types.js';
const _registry: Map<string, ContainerConfig> = new Map();
const _containerCache: Map<string, Container> = new Map();
/**
* Register containers with their partition key configuration.
* Call once at app startup before any getRegisteredContainer() calls.
*/
export function registerContainers(definitions: Record<string, ContainerConfig>): void {
for (const [name, config] of Object.entries(definitions)) {
_registry.set(name, config);
}
}
/**
* Get a container that was previously registered.
* Throws if the container name is unknown.
*/
export function getRegisteredContainer(name: string): Container {
if (!_registry.has(name)) {
throw new Error(`Unknown container '${name}'. Valid: ${[..._registry.keys()].join(', ')}`);
}
let container = _containerCache.get(name);
if (!container) {
container = getDatabase().container(name);
_containerCache.set(name, container);
}
return container;
}
/**
* Create all registered containers if they don't exist.
* Call from a seed script or on first deploy.
*/
export async function initializeAllContainers(): Promise<void> {
const client = getCosmosClient();
const dbId = process.env.COSMOS_DATABASE || 'lysnrai';
const database = await createDatabaseSafe(client, dbId);
for (const [name, config] of _registry.entries()) {
await createContainerSafe(database, name, config);
}
}
function sleep(ms: number): Promise<void> {
return new Promise(resolve => globalThis.setTimeout(resolve, ms));
}
function isCosmosConflict(err: unknown): boolean {
const e = err as { code?: number; statusCode?: number; message?: string } | null;
if (!e) return false;
if (e.code === 409 || e.statusCode === 409) return true;
return (e.message || '').toLowerCase().includes('already exists');
}
function isCosmosNotFound(err: unknown): boolean {
const e = err as { code?: number; statusCode?: number; message?: string } | null;
if (!e) return false;
if (e.code === 404 || e.statusCode === 404) return true;
return (e.message || '').toLowerCase().includes('not found');
}
async function createDatabaseSafe(
client: ReturnType<typeof getCosmosClient>,
dbId: string
): Promise<Database> {
try {
const { database } = await client.databases.createIfNotExists({ id: dbId });
return database;
} catch (err) {
// createIfNotExists is not atomic; concurrent create can race and throw a conflict.
if (isCosmosConflict(err)) return client.database(dbId);
throw err;
}
}
async function createContainerSafe(
database: Database,
name: string,
config: ContainerConfig
): Promise<void> {
const payload = {
id: name,
partitionKey: {
paths: [config.partitionKeyPath],
} as PartitionKeyDefinition,
...(config.defaultTtl != null && { defaultTtl: config.defaultTtl }),
};
for (let attempt = 0; attempt < 3; attempt += 1) {
try {
await database.containers.createIfNotExists(payload);
return;
} catch (err) {
if (isCosmosConflict(err)) return; // Container was created by another process.
// Sometimes the database/container metadata isn't immediately visible after creation.
if (isCosmosNotFound(err) && attempt < 2) {
await sleep(250 * (attempt + 1));
continue;
}
throw err;
}
}
}
/**
* Reset the registry (useful for testing).
* @internal
*/
export function _resetRegistry(): void {
_registry.clear();
_containerCache.clear();
}