From 5401fad419e5b6e08c1e8542d7c6fe96ccfc6c04 Mon Sep 17 00:00:00 2001 From: saravanakumardb1 Date: Mon, 2 Mar 2026 02:02:47 -0800 Subject: [PATCH] refactor(platform-service): migrate webhooks/routes and migrations/runner from cosmos.js to datastore --- .../platform-service/src/migrations/runner.ts | 42 +++++++++---------- .../src/modules/webhooks/routes.ts | 7 ++-- 2 files changed, 23 insertions(+), 26 deletions(-) diff --git a/services/platform-service/src/migrations/runner.ts b/services/platform-service/src/migrations/runner.ts index 72c29bf0..2d748b7d 100644 --- a/services/platform-service/src/migrations/runner.ts +++ b/services/platform-service/src/migrations/runner.ts @@ -1,12 +1,13 @@ -import { getContainer } from '../lib/cosmos.js'; +import { getCollection } from '../lib/datastore.js'; +import type { BaseDocument, DocumentCollection } from '@bytelyst/datastore'; import { MIGRATIONS } from './registry.js'; import type { MigrationDoc } from './types.js'; const CONTAINER = 'migrations'; const PRODUCT_ID = 'platform'; -function container() { - return getContainer(CONTAINER); +function collection(): DocumentCollection { + return getCollection(CONTAINER, '/productId'); } /** @@ -25,13 +26,11 @@ export async function runPendingMigrations(): Promise<{ // Fetch already-applied migrations let appliedVersions: Set; try { - const { resources } = await container() - .items.query({ - query: "SELECT c.version FROM c WHERE c.productId = @pid AND c.status = 'applied'", - parameters: [{ name: '@pid', value: PRODUCT_ID }], - }) - .fetchAll(); - appliedVersions = new Set(resources.map(r => r.version)); + const results = await collection().rawQuery<{ version: number }>( + "SELECT c.version FROM c WHERE c.productId = @pid AND c.status = 'applied'", + { pid: PRODUCT_ID } + ); + appliedVersions = new Set(results.map(r => r.version)); } catch { // Container may not exist yet — treat as empty appliedVersions = new Set(); @@ -44,7 +43,7 @@ export async function runPendingMigrations(): Promise<{ } const startTime = Date.now(); - const doc: MigrationDoc = { + const doc = { id: migration.name, productId: PRODUCT_ID, version: migration.version, @@ -52,7 +51,7 @@ export async function runPendingMigrations(): Promise<{ description: migration.description, appliedAt: new Date().toISOString(), durationMs: 0, - status: 'applied', + status: 'applied' as MigrationDoc['status'], }; try { @@ -63,14 +62,16 @@ export async function runPendingMigrations(): Promise<{ } catch (err) { doc.durationMs = Date.now() - startTime; doc.status = 'failed'; - doc.error = err instanceof Error ? err.message : String(err); + (doc as MigrationDoc).error = err instanceof Error ? err.message : String(err); failed.push(migration.name); - process.stderr.write(`[migrations] FAILED ${migration.name}: ${doc.error}\n`); + process.stderr.write( + `[migrations] FAILED ${migration.name}: ${(doc as MigrationDoc).error}\n` + ); } // Record migration result (best-effort) try { - await container().items.upsert(doc); + await collection().upsert(doc as MigrationDoc & BaseDocument); } catch (recordErr) { process.stderr.write( `[migrations] Failed to record migration ${migration.name}: ${recordErr}\n` @@ -95,13 +96,10 @@ export async function runPendingMigrations(): Promise<{ */ export async function listMigrations(): Promise { try { - const { resources } = await container() - .items.query({ - query: 'SELECT * FROM c WHERE c.productId = @pid ORDER BY c.version ASC', - parameters: [{ name: '@pid', value: PRODUCT_ID }], - }) - .fetchAll(); - return resources; + return await collection().rawQuery( + 'SELECT * FROM c WHERE c.productId = @pid ORDER BY c.version ASC', + { pid: PRODUCT_ID } + ); } catch { return []; } diff --git a/services/platform-service/src/modules/webhooks/routes.ts b/services/platform-service/src/modules/webhooks/routes.ts index 2fcd6f46..ddf722a2 100644 --- a/services/platform-service/src/modules/webhooks/routes.ts +++ b/services/platform-service/src/modules/webhooks/routes.ts @@ -119,10 +119,9 @@ export async function webhookRoutes(app: FastifyInstance) { sub.secret = newSecret; sub.updatedAt = new Date().toISOString(); - await repo.updateSubscription(id, productId, {}); - // Direct replace to update secret (not exposed via UpdateSchema) - const container = (await import('../../lib/cosmos.js')).getContainer('webhook_subscriptions'); - await container.item(id, productId).replace(sub); + // upsert the full doc to persist the new secret (not exposed via UpdateSchema) + const { getCollection } = await import('../../lib/datastore.js'); + await getCollection('webhook_subscriptions', '/productId').upsert(sub); return { secret: newSecret, message: 'Secret rotated. Update your webhook consumer.' }; });