refactor(platform-service): migrate webhooks/routes and migrations/runner from cosmos.js to datastore

This commit is contained in:
saravanakumardb1 2026-03-02 02:02:47 -08:00
parent 8d9fc4b8d4
commit 5401fad419
2 changed files with 23 additions and 26 deletions

View File

@ -1,12 +1,13 @@
import { getContainer } from '../lib/cosmos.js'; import { getCollection } from '../lib/datastore.js';
import type { BaseDocument, DocumentCollection } from '@bytelyst/datastore';
import { MIGRATIONS } from './registry.js'; import { MIGRATIONS } from './registry.js';
import type { MigrationDoc } from './types.js'; import type { MigrationDoc } from './types.js';
const CONTAINER = 'migrations'; const CONTAINER = 'migrations';
const PRODUCT_ID = 'platform'; const PRODUCT_ID = 'platform';
function container() { function collection(): DocumentCollection<MigrationDoc & BaseDocument> {
return getContainer(CONTAINER); return getCollection<MigrationDoc & BaseDocument>(CONTAINER, '/productId');
} }
/** /**
@ -25,13 +26,11 @@ export async function runPendingMigrations(): Promise<{
// Fetch already-applied migrations // Fetch already-applied migrations
let appliedVersions: Set<number>; let appliedVersions: Set<number>;
try { try {
const { resources } = await container() const results = await collection().rawQuery<{ version: number }>(
.items.query<MigrationDoc>({ "SELECT c.version FROM c WHERE c.productId = @pid AND c.status = 'applied'",
query: "SELECT c.version FROM c WHERE c.productId = @pid AND c.status = 'applied'", { pid: PRODUCT_ID }
parameters: [{ name: '@pid', value: PRODUCT_ID }], );
}) appliedVersions = new Set(results.map(r => r.version));
.fetchAll();
appliedVersions = new Set(resources.map(r => r.version));
} catch { } catch {
// Container may not exist yet — treat as empty // Container may not exist yet — treat as empty
appliedVersions = new Set(); appliedVersions = new Set();
@ -44,7 +43,7 @@ export async function runPendingMigrations(): Promise<{
} }
const startTime = Date.now(); const startTime = Date.now();
const doc: MigrationDoc = { const doc = {
id: migration.name, id: migration.name,
productId: PRODUCT_ID, productId: PRODUCT_ID,
version: migration.version, version: migration.version,
@ -52,7 +51,7 @@ export async function runPendingMigrations(): Promise<{
description: migration.description, description: migration.description,
appliedAt: new Date().toISOString(), appliedAt: new Date().toISOString(),
durationMs: 0, durationMs: 0,
status: 'applied', status: 'applied' as MigrationDoc['status'],
}; };
try { try {
@ -63,14 +62,16 @@ export async function runPendingMigrations(): Promise<{
} catch (err) { } catch (err) {
doc.durationMs = Date.now() - startTime; doc.durationMs = Date.now() - startTime;
doc.status = 'failed'; doc.status = 'failed';
doc.error = err instanceof Error ? err.message : String(err); (doc as MigrationDoc).error = err instanceof Error ? err.message : String(err);
failed.push(migration.name); failed.push(migration.name);
process.stderr.write(`[migrations] FAILED ${migration.name}: ${doc.error}\n`); process.stderr.write(
`[migrations] FAILED ${migration.name}: ${(doc as MigrationDoc).error}\n`
);
} }
// Record migration result (best-effort) // Record migration result (best-effort)
try { try {
await container().items.upsert(doc); await collection().upsert(doc as MigrationDoc & BaseDocument);
} catch (recordErr) { } catch (recordErr) {
process.stderr.write( process.stderr.write(
`[migrations] Failed to record migration ${migration.name}: ${recordErr}\n` `[migrations] Failed to record migration ${migration.name}: ${recordErr}\n`
@ -95,13 +96,10 @@ export async function runPendingMigrations(): Promise<{
*/ */
export async function listMigrations(): Promise<MigrationDoc[]> { export async function listMigrations(): Promise<MigrationDoc[]> {
try { try {
const { resources } = await container() return await collection().rawQuery<MigrationDoc>(
.items.query<MigrationDoc>({ 'SELECT * FROM c WHERE c.productId = @pid ORDER BY c.version ASC',
query: 'SELECT * FROM c WHERE c.productId = @pid ORDER BY c.version ASC', { pid: PRODUCT_ID }
parameters: [{ name: '@pid', value: PRODUCT_ID }], );
})
.fetchAll();
return resources;
} catch { } catch {
return []; return [];
} }

View File

@ -119,10 +119,9 @@ export async function webhookRoutes(app: FastifyInstance) {
sub.secret = newSecret; sub.secret = newSecret;
sub.updatedAt = new Date().toISOString(); sub.updatedAt = new Date().toISOString();
await repo.updateSubscription(id, productId, {}); // upsert the full doc to persist the new secret (not exposed via UpdateSchema)
// Direct replace to update secret (not exposed via UpdateSchema) const { getCollection } = await import('../../lib/datastore.js');
const container = (await import('../../lib/cosmos.js')).getContainer('webhook_subscriptions'); await getCollection('webhook_subscriptions', '/productId').upsert(sub);
await container.item(id, productId).replace(sub);
return { secret: newSecret, message: 'Secret rotated. Update your webhook consumer.' }; return { secret: newSecret, message: 'Secret rotated. Update your webhook consumer.' };
}); });