fix(cosmos): support composite indexes; add fleet_jobs (priorityOrder, createdAt)
Azure Cosmos cannot serve a multi-field ORDER BY without a matching composite index (the local emulator is lenient, real Cosmos returns HTTP 400). The fleet listJobs() query orders by (priorityOrder, createdAt), which broke GET /api/fleet/metrics and /api/fleet/jobs on real Cosmos. - ContainerConfig gains an optional `compositeIndexes` field - container init applies it on create AND reconciles it onto existing containers (createIfNotExists never updates an existing index policy) - fleet_jobs declares the (priorityOrder ASC, createdAt ASC) composite index Verified live against Azure Cosmos: both endpoints now return 200. Generated with [Devin](https://cli.devin.ai/docs) Co-Authored-By: Devin <158243242+devin-ai-integration[bot]@users.noreply.github.com>
This commit is contained in:
parent
c1f85050a0
commit
e0a904c7ea
@ -83,11 +83,72 @@ async function createDatabaseSafe(
|
||||
}
|
||||
}
|
||||
|
||||
function buildIndexingPolicy(config: ContainerConfig) {
|
||||
if (!config.compositeIndexes || config.compositeIndexes.length === 0) return undefined;
|
||||
return {
|
||||
indexingMode: 'consistent' as const,
|
||||
automatic: true,
|
||||
includedPaths: [{ path: '/*' }],
|
||||
compositeIndexes: config.compositeIndexes.map(group =>
|
||||
group.map(p => ({ path: p.path, order: p.order ?? 'ascending' }))
|
||||
),
|
||||
};
|
||||
}
|
||||
|
||||
/** Normalize composite indexes for order-insensitive equality (default = ascending). */
|
||||
function normalizeComposites(composites: unknown): string {
|
||||
const arr = Array.isArray(composites) ? composites : [];
|
||||
return JSON.stringify(
|
||||
arr.map((group: unknown) =>
|
||||
(Array.isArray(group) ? group : []).map((p: { path?: string; order?: string }) => ({
|
||||
path: p.path,
|
||||
order: p.order ?? 'ascending',
|
||||
}))
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Reconcile composite indexes onto an EXISTING container. `createIfNotExists`
|
||||
* never updates an existing container's indexing policy, so we read the current
|
||||
* definition and replace it only when the composite indexes differ.
|
||||
*/
|
||||
async function ensureIndexingPolicy(
|
||||
database: Database,
|
||||
name: string,
|
||||
config: ContainerConfig
|
||||
): Promise<void> {
|
||||
const desired = buildIndexingPolicy(config);
|
||||
if (!desired) return;
|
||||
const container = database.container(name);
|
||||
const { resource: def } = await container.read();
|
||||
if (!def) return;
|
||||
const current = (def.indexingPolicy ?? {}) as { compositeIndexes?: unknown };
|
||||
if (
|
||||
normalizeComposites(current.compositeIndexes) === normalizeComposites(desired.compositeIndexes)
|
||||
) {
|
||||
return; // already in place
|
||||
}
|
||||
await container.replace({
|
||||
id: name,
|
||||
partitionKey: def.partitionKey as PartitionKeyDefinition,
|
||||
...(def.defaultTtl != null && { defaultTtl: def.defaultTtl }),
|
||||
indexingPolicy: {
|
||||
...(def.indexingPolicy ?? {}),
|
||||
indexingMode: 'consistent',
|
||||
automatic: true,
|
||||
includedPaths: def.indexingPolicy?.includedPaths ?? desired.includedPaths,
|
||||
compositeIndexes: desired.compositeIndexes,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
async function createContainerSafe(
|
||||
database: Database,
|
||||
name: string,
|
||||
config: ContainerConfig
|
||||
): Promise<void> {
|
||||
const indexingPolicy = buildIndexingPolicy(config);
|
||||
const payload = {
|
||||
id: name,
|
||||
partitionKey: {
|
||||
@ -95,14 +156,27 @@ async function createContainerSafe(
|
||||
kind: 'Hash',
|
||||
} as PartitionKeyDefinition,
|
||||
...(config.defaultTtl != null && { defaultTtl: config.defaultTtl }),
|
||||
...(indexingPolicy && { indexingPolicy }),
|
||||
};
|
||||
|
||||
for (let attempt = 0; attempt < 3; attempt += 1) {
|
||||
try {
|
||||
await database.containers.createIfNotExists(payload);
|
||||
// createIfNotExists won't update an existing container's index policy.
|
||||
if (indexingPolicy) await ensureIndexingPolicy(database, name, config);
|
||||
return;
|
||||
} catch (err) {
|
||||
if (isCosmosConflict(err)) return; // Container was created by another process.
|
||||
if (isCosmosConflict(err)) {
|
||||
// Container already existed — still reconcile its indexing policy (best-effort).
|
||||
if (indexingPolicy) {
|
||||
try {
|
||||
await ensureIndexingPolicy(database, name, config);
|
||||
} catch {
|
||||
/* reconciliation is best-effort; container is usable */
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Sometimes the database/container metadata isn't immediately visible after creation.
|
||||
if (isCosmosNotFound(err) && attempt < 2) {
|
||||
|
||||
@ -5,4 +5,4 @@ export {
|
||||
initializeAllContainers,
|
||||
_resetRegistry,
|
||||
} from './containers.js';
|
||||
export type { ContainerConfig } from './types.js';
|
||||
export type { ContainerConfig, CompositeIndexPath } from './types.js';
|
||||
|
||||
@ -1,4 +1,19 @@
|
||||
/** A single path within a composite index, with its sort order. */
|
||||
export interface CompositeIndexPath {
|
||||
path: string;
|
||||
order?: 'ascending' | 'descending';
|
||||
}
|
||||
|
||||
export interface ContainerConfig {
|
||||
partitionKeyPath: string;
|
||||
defaultTtl?: number | null;
|
||||
/**
|
||||
* Composite indexes for multi-field ORDER BY queries. Azure Cosmos cannot
|
||||
* serve a query that orders by two or more fields without a matching
|
||||
* composite index (the local emulator is lenient, real Cosmos is not).
|
||||
* Each entry is an ordered list of paths, e.g.
|
||||
* `[[{ path: '/priorityOrder' }, { path: '/createdAt' }]]`.
|
||||
* Applied on container creation and reconciled onto existing containers.
|
||||
*/
|
||||
compositeIndexes?: CompositeIndexPath[][];
|
||||
}
|
||||
|
||||
@ -188,7 +188,17 @@ const CONTAINER_DEFS: Record<string, ContainerConfig> = {
|
||||
translations: { partitionKeyPath: '/locale' },
|
||||
i18n_locales: { partitionKeyPath: '/locale' },
|
||||
// Agent Gigafactory — fleet coordinator (see modules/fleet/README.md)
|
||||
fleet_jobs: { partitionKeyPath: '/productId' },
|
||||
fleet_jobs: {
|
||||
partitionKeyPath: '/productId',
|
||||
// listJobs() orders by (priorityOrder, createdAt) — Azure Cosmos requires a
|
||||
// matching composite index to serve multi-field ORDER BY (e.g. fleetMetrics).
|
||||
compositeIndexes: [
|
||||
[
|
||||
{ path: '/priorityOrder', order: 'ascending' },
|
||||
{ path: '/createdAt', order: 'ascending' },
|
||||
],
|
||||
],
|
||||
},
|
||||
fleet_runs: { partitionKeyPath: '/jobId' },
|
||||
fleet_leases: { partitionKeyPath: '/jobId' },
|
||||
fleet_factories: { partitionKeyPath: '/productId' },
|
||||
|
||||
Loading…
Reference in New Issue
Block a user