From 03d4b7def9dd62b0dc36b53402173b2f4231d730 Mon Sep 17 00:00:00 2001 From: saravanakumardb1 Date: Mon, 2 Mar 2026 01:45:16 -0800 Subject: [PATCH] refactor(backend): migrate ChronoMind repositories to @bytelyst/datastore --- backend/package-lock.json | 22 ++++ backend/package.json | 1 + backend/src/lib/datastore.ts | 67 ++++++++++ backend/src/modules/households/repository.ts | 70 +++------- backend/src/modules/routines/repository.ts | 104 +++++---------- .../src/modules/shared-timers/repository.ts | 70 +++------- backend/src/modules/timers/repository.ts | 109 +++++----------- backend/src/modules/webhooks/repository.ts | 122 ++++++------------ backend/src/server.ts | 2 + 9 files changed, 238 insertions(+), 329 deletions(-) create mode 100644 backend/src/lib/datastore.ts diff --git a/backend/package-lock.json b/backend/package-lock.json index 3d042a6..3a2eb73 100644 --- a/backend/package-lock.json +++ b/backend/package-lock.json @@ -12,6 +12,7 @@ "@bytelyst/auth": "file:../../learning_ai_common_plat/packages/auth", "@bytelyst/config": "file:../../learning_ai_common_plat/packages/config", "@bytelyst/cosmos": "file:../../learning_ai_common_plat/packages/cosmos", + "@bytelyst/datastore": "file:../../learning_ai_common_plat/packages/datastore", "@bytelyst/errors": "file:../../learning_ai_common_plat/packages/errors", "@bytelyst/fastify-core": "file:../../learning_ai_common_plat/packages/fastify-core", "fastify": "^5.2.1", @@ -64,6 +65,21 @@ "@azure/cosmos": ">=4.0.0" } }, + "../../learning_ai_common_plat/packages/datastore": { + "name": "@bytelyst/datastore", + "version": "0.1.0", + "devDependencies": { + "vitest": "^3.0.0" + }, + "peerDependencies": { + "@azure/cosmos": ">=4.0.0" + }, + "peerDependenciesMeta": { + "@azure/cosmos": { + "optional": true + } + } + }, "../../learning_ai_common_plat/packages/errors": { "name": "@bytelyst/errors", "version": "0.1.0" @@ -76,6 +92,7 @@ }, "devDependencies": { "@fastify/swagger": "^9.7.0", + "@fastify/swagger-ui": "^5.2.5", "fastify-metrics": "^10.6.0" }, "peerDependencies": { @@ -332,6 +349,10 @@ "resolved": "../../learning_ai_common_plat/packages/cosmos", "link": true }, + "node_modules/@bytelyst/datastore": { + "resolved": "../../learning_ai_common_plat/packages/datastore", + "link": true + }, "node_modules/@bytelyst/errors": { "resolved": "../../learning_ai_common_plat/packages/errors", "link": true @@ -2494,6 +2515,7 @@ "integrity": "sha512-w+N7Hifpc3gRjZ63vYBXA56dvvRlNWRczTdmCBBa+CotUzAPf5b7YMdMR/8CQoeYE5LX3W4wj6RYTgonm1b9DA==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "esbuild": "^0.27.0", "fdir": "^6.5.0", diff --git a/backend/package.json b/backend/package.json index f1af60c..36c0e9d 100644 --- a/backend/package.json +++ b/backend/package.json @@ -16,6 +16,7 @@ "@bytelyst/auth": "file:../../learning_ai_common_plat/packages/auth", "@bytelyst/config": "file:../../learning_ai_common_plat/packages/config", "@bytelyst/cosmos": "file:../../learning_ai_common_plat/packages/cosmos", + "@bytelyst/datastore": "file:../../learning_ai_common_plat/packages/datastore", "@bytelyst/errors": "file:../../learning_ai_common_plat/packages/errors", "@bytelyst/fastify-core": "file:../../learning_ai_common_plat/packages/fastify-core", "@azure/cosmos": "^4.2.0", diff --git a/backend/src/lib/datastore.ts b/backend/src/lib/datastore.ts new file mode 100644 index 0000000..7e9dd82 --- /dev/null +++ b/backend/src/lib/datastore.ts @@ -0,0 +1,67 @@ +/** + * Cloud-agnostic datastore bridge for chronomind-backend. + * + * Wraps @bytelyst/datastore with ChronoMind container registry config. + * Repositories import getCollection() from here instead of getContainer() from cosmos. + * + * Migration: Replace `import { getContainer } from '../../lib/cosmos.js'` + * with `import { getCollection } from '../../lib/datastore.js'` + */ + +import { + type DatastoreProvider, + type DocumentCollection, + type BaseDocument, + setDatastore, + CosmosDatastoreProvider, + MemoryDatastoreProvider, +} from '@bytelyst/datastore'; + +let _provider: DatastoreProvider | null = null; + +/** + * Initialize the datastore provider. + * Call once at service startup (before any repository calls). + */ +export function initDatastore(): DatastoreProvider { + if (_provider) return _provider; + + const dbProvider = (process.env.DB_PROVIDER || 'cosmos').toLowerCase(); + + if (dbProvider === 'memory') { + _provider = new MemoryDatastoreProvider(); + } else { + _provider = new CosmosDatastoreProvider(); + } + + setDatastore(_provider); + return _provider; +} + +/** + * Inject a provider directly (for testing). + */ +export function setProvider(provider: DatastoreProvider): void { + _provider = provider; +} + +/** + * Get a typed collection from the datastore. + * Drop-in replacement for getContainer() — returns a DocumentCollection instead of a Cosmos Container. + */ +export function getCollection( + name: string, + partitionKeyPath: string = '/productId' +): DocumentCollection { + if (!_provider) { + initDatastore(); + } + return _provider!.getCollection(name, partitionKeyPath); +} + +/** + * @internal — for testing only + */ +export function _resetDatastoreProvider(): void { + _provider = null; +} diff --git a/backend/src/modules/households/repository.ts b/backend/src/modules/households/repository.ts index 377d158..dac3220 100644 --- a/backend/src/modules/households/repository.ts +++ b/backend/src/modules/households/repository.ts @@ -1,5 +1,5 @@ /** - * Households repository — Cosmos DB CRUD for household membership. + * Households repository — cloud-agnostic CRUD for household membership. * * Container: households (partition key: /id) * @@ -7,37 +7,30 @@ * partitioned by their own /id since multiple users share the same doc. */ -import { getContainer } from '../../lib/cosmos.js'; +import { getCollection } from '../../lib/datastore.js'; import type { HouseholdDoc, HouseholdQuery } from './types.js'; -function container() { - return getContainer('households'); +function collection() { + return getCollection('households', '/id'); } export async function getHousehold(id: string): Promise { - try { - const { resource } = await container().item(id, id).read(); - return resource ?? null; - } catch { - return null; - } + return collection().findById(id, id); } export async function createHousehold(doc: HouseholdDoc): Promise { - const { resource } = await container().items.create(doc); - return resource as HouseholdDoc; + return collection().create(doc); } export async function replaceHousehold(doc: HouseholdDoc): Promise { - const { resource } = await container().item(doc.id, doc.id).replace(doc); - return resource as HouseholdDoc; + return collection().upsert(doc); } export async function deleteHousehold(id: string): Promise { try { const existing = await getHousehold(id); if (!existing) return false; - await container().item(id, id).delete(); + await collection().delete(id, id); return true; } catch { return false; @@ -49,47 +42,24 @@ export async function listHouseholdsForUser( productId: string, query: HouseholdQuery ): Promise<{ items: HouseholdDoc[]; total: number }> { - const countResult = await container() - .items.query({ - query: - 'SELECT VALUE COUNT(1) FROM c WHERE c.productId = @productId AND ARRAY_CONTAINS(c.members, { "userId": @userId }, true)', - parameters: [ - { name: '@productId', value: productId }, - { name: '@userId', value: userId }, - ], - }) - .fetchAll(); - const total = countResult.resources[0] ?? 0; + // ARRAY_CONTAINS on object arrays requires post-filtering (not expressible as simple filters) + const all = await collection().findMany({ filter: { productId } }); + const matched = all.filter(h => h.members.some(m => m.userId === userId)); - const { resources } = await container() - .items.query({ - query: - 'SELECT * FROM c WHERE c.productId = @productId AND ARRAY_CONTAINS(c.members, { "userId": @userId }, true) ORDER BY c.createdAt DESC OFFSET @offset LIMIT @limit', - parameters: [ - { name: '@productId', value: productId }, - { name: '@userId', value: userId }, - { name: '@offset', value: query.offset }, - { name: '@limit', value: query.limit }, - ], - }) - .fetchAll(); + const total = matched.length; - return { items: resources, total }; + // Sort by createdAt DESC, then paginate + matched.sort((a, b) => b.createdAt.localeCompare(a.createdAt)); + const items = matched.slice(query.offset, query.offset + query.limit); + + return { items, total }; } export async function findHouseholdByInviteCode( code: string, productId: string ): Promise { - const { resources } = await container() - .items.query({ - query: - 'SELECT * FROM c WHERE c.productId = @productId AND ARRAY_CONTAINS(c.invites, { "code": @code, "status": "pending" }, true)', - parameters: [ - { name: '@productId', value: productId }, - { name: '@code', value: code }, - ], - }) - .fetchAll(); - return resources[0] ?? null; + // ARRAY_CONTAINS on object arrays requires post-filtering + const all = await collection().findMany({ filter: { productId } }); + return all.find(h => h.invites.some(i => i.code === code && i.status === 'pending')) ?? null; } diff --git a/backend/src/modules/routines/repository.ts b/backend/src/modules/routines/repository.ts index 6892b69..d2c89e5 100644 --- a/backend/src/modules/routines/repository.ts +++ b/backend/src/modules/routines/repository.ts @@ -1,14 +1,15 @@ /** - * Routines repository — Cosmos DB CRUD + sync + batch upsert. + * Routines repository — cloud-agnostic CRUD + sync + batch upsert. * * Container: routines (partition key: /userId) */ -import { getContainer } from '../../lib/cosmos.js'; +import { getCollection } from '../../lib/datastore.js'; import type { RoutineDoc, RoutineQuery, BatchUpsertRoutinesResult } from './types.js'; +import type { FilterMap } from '@bytelyst/datastore'; -function container() { - return getContainer('routines'); +function collection() { + return getCollection('routines', '/userId'); } export async function listRoutines( @@ -16,63 +17,32 @@ export async function listRoutines( productId: string, query: RoutineQuery ): Promise<{ items: RoutineDoc[]; total: number }> { - const conditions: string[] = ['c.userId = @userId', 'c.productId = @productId']; - const params: { name: string; value: string | number | boolean }[] = [ - { name: '@userId', value: userId }, - { name: '@productId', value: productId }, - ]; + const filter: FilterMap = { userId, productId }; - if (query.status) { - conditions.push('c.status = @status'); - params.push({ name: '@status', value: query.status }); - } - if (query.isTemplate !== undefined) { - conditions.push('c.isTemplate = @isTemplate'); - params.push({ name: '@isTemplate', value: query.isTemplate }); - } - if (query.category) { - conditions.push('c.category = @category'); - params.push({ name: '@category', value: query.category }); - } + if (query.status) filter.status = query.status; + if (query.isTemplate !== undefined) filter.isTemplate = query.isTemplate; + if (query.category) filter.category = query.category; - const where = `WHERE ${conditions.join(' AND ')}`; - const sortField = `c.${query.sortBy}`; - const orderDir = query.sortOrder.toUpperCase(); + const sortDir = query.sortOrder === 'asc' ? 1 : -1; - const countResult = await container() - .items.query({ - query: `SELECT VALUE COUNT(1) FROM c ${where}`, - parameters: params, - }) - .fetchAll(); - const total = countResult.resources[0] ?? 0; + const total = await collection().count(filter); - const { resources } = await container() - .items.query({ - query: `SELECT * FROM c ${where} ORDER BY ${sortField} ${orderDir} OFFSET @offset LIMIT @limit`, - parameters: [ - ...params, - { name: '@offset', value: query.offset }, - { name: '@limit', value: query.limit }, - ], - }) - .fetchAll(); + const items = await collection().findMany({ + filter, + sort: { [query.sortBy]: sortDir } as Record, + offset: query.offset, + limit: query.limit, + }); - return { items: resources, total }; + return { items, total }; } export async function getRoutine(id: string, userId: string): Promise { - try { - const { resource } = await container().item(id, userId).read(); - return resource ?? null; - } catch { - return null; - } + return collection().findById(id, userId); } export async function createRoutine(doc: RoutineDoc): Promise { - const { resource } = await container().items.create(doc); - return resource as RoutineDoc; + return collection().create(doc); } export async function updateRoutine( @@ -82,7 +52,7 @@ export async function updateRoutine( expectedSyncVersion: number ): Promise<{ doc: RoutineDoc | null; conflict: boolean; serverVersion?: number }> { try { - const { resource: existing } = await container().item(id, userId).read(); + const existing = await collection().findById(id, userId); if (!existing) return { doc: null, conflict: false }; if (expectedSyncVersion <= existing.syncVersion) { @@ -96,8 +66,8 @@ export async function updateRoutine( syncVersion: expectedSyncVersion, lastSyncedAt: now, }; - const { resource } = await container().item(id, userId).replace(merged); - return { doc: resource as RoutineDoc, conflict: false }; + const doc = await collection().upsert(merged); + return { doc, conflict: false }; } catch { return { doc: null, conflict: false }; } @@ -105,9 +75,9 @@ export async function updateRoutine( export async function deleteRoutine(id: string, userId: string): Promise { try { - const { resource: existing } = await container().item(id, userId).read(); + const existing = await collection().findById(id, userId); if (!existing) return false; - await container().item(id, userId).delete(); + await collection().delete(id, userId); return true; } catch { return false; @@ -120,19 +90,11 @@ export async function getRoutinesSince( sinceTimestamp: string, limit: number ): Promise { - const { resources } = await container() - .items.query({ - query: - 'SELECT * FROM c WHERE c.userId = @userId AND c.productId = @productId AND c.lastSyncedAt >= @since ORDER BY c.lastSyncedAt ASC OFFSET 0 LIMIT @limit', - parameters: [ - { name: '@userId', value: userId }, - { name: '@productId', value: productId }, - { name: '@since', value: sinceTimestamp }, - { name: '@limit', value: limit }, - ], - }) - .fetchAll(); - return resources; + return collection().findMany({ + filter: { userId, productId, lastSyncedAt: { $gte: sinceTimestamp } }, + sort: { lastSyncedAt: 1 }, + limit, + }); } export async function batchUpsertRoutines( @@ -158,7 +120,7 @@ export async function batchUpsertRoutines( productId, lastSyncedAt: now, } as RoutineDoc; - await container().item(routine.id, userId).replace(merged); + await collection().upsert(merged); synced.push(routine.id); } else { conflicts.push({ id: routine.id, serverVersion: existing.syncVersion }); @@ -169,8 +131,8 @@ export async function batchUpsertRoutines( userId, productId, lastSyncedAt: now, - }; - await container().items.create(doc); + } as RoutineDoc; + await collection().create(doc); synced.push(routine.id); } } catch (err) { diff --git a/backend/src/modules/shared-timers/repository.ts b/backend/src/modules/shared-timers/repository.ts index 7593029..77cf7de 100644 --- a/backend/src/modules/shared-timers/repository.ts +++ b/backend/src/modules/shared-timers/repository.ts @@ -1,14 +1,15 @@ /** - * Shared timers repository — Cosmos DB CRUD for household shared timers. + * Shared timers repository — cloud-agnostic CRUD for household shared timers. * * Container: shared_timers (partition key: /householdId) */ -import { getContainer } from '../../lib/cosmos.js'; +import { getCollection } from '../../lib/datastore.js'; import type { SharedTimerDoc, SharedTimerQuery } from './types.js'; +import type { FilterMap } from '@bytelyst/datastore'; -function container() { - return getContainer('shared_timers'); +function collection() { + return getCollection('shared_timers', '/householdId'); } export async function listSharedTimers( @@ -16,74 +17,45 @@ export async function listSharedTimers( productId: string, query: SharedTimerQuery ): Promise<{ items: SharedTimerDoc[]; total: number }> { - const conditions: string[] = ['c.householdId = @householdId', 'c.productId = @productId']; - const params: { name: string; value: string | number }[] = [ - { name: '@householdId', value: householdId }, - { name: '@productId', value: productId }, - ]; + const filter: FilterMap = { householdId, productId }; - if (query.state) { - conditions.push('c.state = @state'); - params.push({ name: '@state', value: query.state }); - } - if (query.type) { - conditions.push('c.type = @type'); - params.push({ name: '@type', value: query.type }); - } + if (query.state) filter.state = query.state; + if (query.type) filter.type = query.type; - const where = `WHERE ${conditions.join(' AND ')}`; - const sortField = `c.${query.sortBy}`; - const orderDir = query.sortOrder.toUpperCase(); + const sortDir = query.sortOrder === 'asc' ? 1 : -1; - const countResult = await container() - .items.query({ - query: `SELECT VALUE COUNT(1) FROM c ${where}`, - parameters: params, - }) - .fetchAll(); - const total = countResult.resources[0] ?? 0; + const total = await collection().count(filter); - const { resources } = await container() - .items.query({ - query: `SELECT * FROM c ${where} ORDER BY ${sortField} ${orderDir} OFFSET @offset LIMIT @limit`, - parameters: [ - ...params, - { name: '@offset', value: query.offset }, - { name: '@limit', value: query.limit }, - ], - }) - .fetchAll(); + const items = await collection().findMany({ + filter, + sort: { [query.sortBy]: sortDir } as Record, + offset: query.offset, + limit: query.limit, + }); - return { items: resources, total }; + return { items, total }; } export async function getSharedTimer( id: string, householdId: string ): Promise { - try { - const { resource } = await container().item(id, householdId).read(); - return resource ?? null; - } catch { - return null; - } + return collection().findById(id, householdId); } export async function createSharedTimer(doc: SharedTimerDoc): Promise { - const { resource } = await container().items.create(doc); - return resource as SharedTimerDoc; + return collection().create(doc); } export async function replaceSharedTimer(doc: SharedTimerDoc): Promise { - const { resource } = await container().item(doc.id, doc.householdId).replace(doc); - return resource as SharedTimerDoc; + return collection().upsert(doc); } export async function deleteSharedTimer(id: string, householdId: string): Promise { try { const existing = await getSharedTimer(id, householdId); if (!existing) return false; - await container().item(id, householdId).delete(); + await collection().delete(id, householdId); return true; } catch { return false; diff --git a/backend/src/modules/timers/repository.ts b/backend/src/modules/timers/repository.ts index 197ffd9..f59faa8 100644 --- a/backend/src/modules/timers/repository.ts +++ b/backend/src/modules/timers/repository.ts @@ -1,14 +1,15 @@ /** - * Timers repository — Cosmos DB CRUD + sync + batch upsert. + * Timers repository — cloud-agnostic CRUD + sync + batch upsert. * * Container: timers (partition key: /userId) */ -import { getContainer } from '../../lib/cosmos.js'; +import { getCollection } from '../../lib/datastore.js'; import type { TimerDoc, TimerQuery, BatchUpsertResult } from './types.js'; +import type { FilterMap } from '@bytelyst/datastore'; -function container() { - return getContainer('timers'); +function collection() { + return getCollection('timers', '/userId'); } export async function listTimers( @@ -16,69 +17,33 @@ export async function listTimers( productId: string, query: TimerQuery ): Promise<{ items: TimerDoc[]; total: number }> { - const conditions: string[] = ['c.userId = @userId', 'c.productId = @productId']; - const params: { name: string; value: string | number }[] = [ - { name: '@userId', value: userId }, - { name: '@productId', value: productId }, - ]; + const filter: FilterMap = { userId, productId }; - if (query.state) { - conditions.push('c.state = @state'); - params.push({ name: '@state', value: query.state }); - } - if (query.type) { - conditions.push('c.type = @type'); - params.push({ name: '@type', value: query.type }); - } - if (query.urgency) { - conditions.push('c.urgency = @urgency'); - params.push({ name: '@urgency', value: query.urgency }); - } - if (query.category) { - conditions.push('c.category = @category'); - params.push({ name: '@category', value: query.category }); - } + if (query.state) filter.state = query.state; + if (query.type) filter.type = query.type; + if (query.urgency) filter.urgency = query.urgency; + if (query.category) filter.category = query.category; - const where = `WHERE ${conditions.join(' AND ')}`; - const sortField = `c.${query.sortBy}`; - const orderDir = query.sortOrder.toUpperCase(); + const sortDir = query.sortOrder === 'asc' ? 1 : -1; - // Count query - const countResult = await container() - .items.query({ - query: `SELECT VALUE COUNT(1) FROM c ${where}`, - parameters: params, - }) - .fetchAll(); - const total = countResult.resources[0] ?? 0; + const total = await collection().count(filter); - // Data query with pagination - const { resources } = await container() - .items.query({ - query: `SELECT * FROM c ${where} ORDER BY ${sortField} ${orderDir} OFFSET @offset LIMIT @limit`, - parameters: [ - ...params, - { name: '@offset', value: query.offset }, - { name: '@limit', value: query.limit }, - ], - }) - .fetchAll(); + const items = await collection().findMany({ + filter, + sort: { [query.sortBy]: sortDir } as Record, + offset: query.offset, + limit: query.limit, + }); - return { items: resources, total }; + return { items, total }; } export async function getTimer(id: string, userId: string): Promise { - try { - const { resource } = await container().item(id, userId).read(); - return resource ?? null; - } catch { - return null; - } + return collection().findById(id, userId); } export async function createTimer(doc: TimerDoc): Promise { - const { resource } = await container().items.create(doc); - return resource as TimerDoc; + return collection().create(doc); } export async function updateTimer( @@ -88,7 +53,7 @@ export async function updateTimer( expectedSyncVersion: number ): Promise<{ doc: TimerDoc | null; conflict: boolean; serverVersion?: number }> { try { - const { resource: existing } = await container().item(id, userId).read(); + const existing = await collection().findById(id, userId); if (!existing) return { doc: null, conflict: false }; // Optimistic concurrency: reject stale writes @@ -103,8 +68,8 @@ export async function updateTimer( syncVersion: expectedSyncVersion, lastSyncedAt: now, }; - const { resource } = await container().item(id, userId).replace(merged); - return { doc: resource as TimerDoc, conflict: false }; + const doc = await collection().upsert(merged); + return { doc, conflict: false }; } catch { return { doc: null, conflict: false }; } @@ -112,9 +77,9 @@ export async function updateTimer( export async function deleteTimer(id: string, userId: string): Promise { try { - const { resource: existing } = await container().item(id, userId).read(); + const existing = await collection().findById(id, userId); if (!existing) return false; - await container().item(id, userId).delete(); + await collection().delete(id, userId); return true; } catch { return false; @@ -127,19 +92,11 @@ export async function getTimersSince( sinceTimestamp: string, limit: number ): Promise { - const { resources } = await container() - .items.query({ - query: - 'SELECT * FROM c WHERE c.userId = @userId AND c.productId = @productId AND c.lastSyncedAt >= @since ORDER BY c.lastSyncedAt ASC OFFSET 0 LIMIT @limit', - parameters: [ - { name: '@userId', value: userId }, - { name: '@productId', value: productId }, - { name: '@since', value: sinceTimestamp }, - { name: '@limit', value: limit }, - ], - }) - .fetchAll(); - return resources; + return collection().findMany({ + filter: { userId, productId, lastSyncedAt: { $gte: sinceTimestamp } }, + sort: { lastSyncedAt: 1 }, + limit, + }); } export async function batchUpsert( @@ -166,7 +123,7 @@ export async function batchUpsert( productId, lastSyncedAt: now, }; - await container().item(timer.id, userId).replace(merged); + await collection().upsert(merged); synced.push(timer.id); } else { conflicts.push({ id: timer.id, serverVersion: existing.syncVersion }); @@ -179,7 +136,7 @@ export async function batchUpsert( productId, lastSyncedAt: now, } as TimerDoc; - await container().items.create(doc); + await collection().create(doc); synced.push(timer.id); } } catch (err) { diff --git a/backend/src/modules/webhooks/repository.ts b/backend/src/modules/webhooks/repository.ts index 0030e8f..fc56cc6 100644 --- a/backend/src/modules/webhooks/repository.ts +++ b/backend/src/modules/webhooks/repository.ts @@ -1,4 +1,4 @@ -import { getContainer } from '../../lib/cosmos.js'; +import { getCollection } from '../../lib/datastore.js'; import { NotFoundError, ConflictError } from '../../lib/errors.js'; import type { WebhookSubscriptionDoc, @@ -8,15 +8,12 @@ import type { WebhookEventType, } from './types.js'; -const SUBS_CONTAINER = 'webhook_subscriptions'; -const EVENTS_CONTAINER = 'webhook_events'; - -function subsContainer() { - return getContainer(SUBS_CONTAINER); +function subsCollection() { + return getCollection('webhook_subscriptions', '/userId'); } -function eventsContainer() { - return getContainer(EVENTS_CONTAINER); +function eventsCollection() { + return getCollection('webhook_events', '/subscriptionId'); } // ── Subscription CRUD ───────────────────────────────────────── @@ -25,29 +22,18 @@ export async function listSubscriptions( userId: string, productId: string ): Promise { - const { resources } = await subsContainer() - .items.query( - { - query: - 'SELECT * FROM c WHERE c.userId = @userId AND c.productId = @productId ORDER BY c.createdAt DESC', - parameters: [ - { name: '@userId', value: userId }, - { name: '@productId', value: productId }, - ], - }, - { partitionKey: userId } - ) - .fetchAll(); - - return resources; + return subsCollection().findMany({ + filter: { userId, productId }, + sort: { createdAt: -1 }, + }); } export async function getSubscription(id: string, userId: string): Promise { - const { resource } = await subsContainer().item(id, userId).read(); - if (!resource) { + const doc = await subsCollection().findById(id, userId); + if (!doc) { throw new NotFoundError(`Webhook subscription '${id}' not found`); } - return resource; + return doc; } export async function createSubscription( @@ -73,10 +59,9 @@ export async function createSubscription( }; try { - const { resource } = await subsContainer().items.create(doc); - return resource as WebhookSubscriptionDoc; + return await subsCollection().create(doc); } catch (err: unknown) { - if (err && typeof err === 'object' && 'code' in err && err.code === 409) { + if (err instanceof Error && err.message.includes('already exists')) { throw new ConflictError(`Subscription '${id}' already exists`); } throw err; @@ -96,13 +81,12 @@ export async function updateSubscription( updatedAt: new Date().toISOString(), }; - const { resource } = await subsContainer().item(id, userId).replace(updated); - return resource as WebhookSubscriptionDoc; + return subsCollection().upsert(updated); } export async function deleteSubscription(id: string, userId: string): Promise { await getSubscription(id, userId); // verify exists - await subsContainer().item(id, userId).delete(); + await subsCollection().delete(id, userId); } // ── Find Subscriptions for Event ────────────────────────────── @@ -112,22 +96,10 @@ export async function findSubscriptionsForEvent( productId: string, eventType: WebhookEventType ): Promise { - const { resources } = await subsContainer() - .items.query( - { - query: - 'SELECT * FROM c WHERE c.userId = @userId AND c.productId = @productId AND c.active = true AND ARRAY_CONTAINS(c.events, @eventType)', - parameters: [ - { name: '@userId', value: userId }, - { name: '@productId', value: productId }, - { name: '@eventType', value: eventType }, - ], - }, - { partitionKey: userId } - ) - .fetchAll(); - - return resources; + // events is a string[] — $contains works for primitive arrays in memory provider + return subsCollection().findMany({ + filter: { userId, productId, active: true, events: { $contains: eventType } }, + }); } // ── Increment Failure Count ─────────────────────────────────── @@ -139,54 +111,38 @@ export async function incrementFailureCount(id: string, userId: string): Promise // Auto-disable after 10 consecutive failures const active = failureCount < 10; - await subsContainer() - .item(id, userId) - .replace({ - ...existing, - failureCount, - active, - updatedAt: new Date().toISOString(), - }); + await subsCollection().upsert({ + ...existing, + failureCount, + active, + updatedAt: new Date().toISOString(), + }); } export async function resetFailureCount(id: string, userId: string): Promise { const existing = await getSubscription(id, userId); - await subsContainer() - .item(id, userId) - .replace({ - ...existing, - failureCount: 0, - lastDeliveryAt: new Date().toISOString(), - updatedAt: new Date().toISOString(), - }); + await subsCollection().upsert({ + ...existing, + failureCount: 0, + lastDeliveryAt: new Date().toISOString(), + updatedAt: new Date().toISOString(), + }); } // ── Event Log ───────────────────────────────────────────────── export async function createEvent(doc: WebhookEventDoc): Promise { - const { resource } = await eventsContainer().items.create(doc); - return resource as WebhookEventDoc; + return eventsCollection().create(doc); } export async function updateEvent(doc: WebhookEventDoc): Promise { - const { resource } = await eventsContainer().item(doc.id, doc.subscriptionId).replace(doc); - return resource as WebhookEventDoc; + return eventsCollection().upsert(doc); } export async function listEvents(subscriptionId: string, limit = 50): Promise { - const { resources } = await eventsContainer() - .items.query( - { - query: - 'SELECT TOP @limit * FROM c WHERE c.subscriptionId = @subscriptionId ORDER BY c.createdAt DESC', - parameters: [ - { name: '@subscriptionId', value: subscriptionId }, - { name: '@limit', value: limit }, - ], - }, - { partitionKey: subscriptionId } - ) - .fetchAll(); - - return resources; + return eventsCollection().findMany({ + filter: { subscriptionId }, + sort: { createdAt: -1 }, + limit, + }); } diff --git a/backend/src/server.ts b/backend/src/server.ts index 93a6d58..4834bf6 100644 --- a/backend/src/server.ts +++ b/backend/src/server.ts @@ -13,6 +13,7 @@ import { householdRoutes } from './modules/households/routes.js'; import { sharedTimerRoutes } from './modules/shared-timers/routes.js'; import { webhookRoutes } from './modules/webhooks/routes.js'; import { initCosmosIfNeeded } from './lib/cosmos-init.js'; +import { initDatastore } from './lib/datastore.js'; import { config } from './lib/config.js'; import { jwtVerify } from 'jose'; @@ -21,6 +22,7 @@ import type { JwtPayload } from './lib/request-context.js'; const jwtSecret = new TextEncoder().encode(config.JWT_SECRET); await initCosmosIfNeeded(); +initDatastore(); const app = await createServiceApp({ name: 'chronomind-backend',