refactor(backend): migrate ChronoMind repositories to @bytelyst/datastore

This commit is contained in:
saravanakumardb1 2026-03-02 01:45:16 -08:00
parent d6b1bb6f63
commit 03d4b7def9
9 changed files with 238 additions and 329 deletions

View File

@ -12,6 +12,7 @@
"@bytelyst/auth": "file:../../learning_ai_common_plat/packages/auth", "@bytelyst/auth": "file:../../learning_ai_common_plat/packages/auth",
"@bytelyst/config": "file:../../learning_ai_common_plat/packages/config", "@bytelyst/config": "file:../../learning_ai_common_plat/packages/config",
"@bytelyst/cosmos": "file:../../learning_ai_common_plat/packages/cosmos", "@bytelyst/cosmos": "file:../../learning_ai_common_plat/packages/cosmos",
"@bytelyst/datastore": "file:../../learning_ai_common_plat/packages/datastore",
"@bytelyst/errors": "file:../../learning_ai_common_plat/packages/errors", "@bytelyst/errors": "file:../../learning_ai_common_plat/packages/errors",
"@bytelyst/fastify-core": "file:../../learning_ai_common_plat/packages/fastify-core", "@bytelyst/fastify-core": "file:../../learning_ai_common_plat/packages/fastify-core",
"fastify": "^5.2.1", "fastify": "^5.2.1",
@ -64,6 +65,21 @@
"@azure/cosmos": ">=4.0.0" "@azure/cosmos": ">=4.0.0"
} }
}, },
"../../learning_ai_common_plat/packages/datastore": {
"name": "@bytelyst/datastore",
"version": "0.1.0",
"devDependencies": {
"vitest": "^3.0.0"
},
"peerDependencies": {
"@azure/cosmos": ">=4.0.0"
},
"peerDependenciesMeta": {
"@azure/cosmos": {
"optional": true
}
}
},
"../../learning_ai_common_plat/packages/errors": { "../../learning_ai_common_plat/packages/errors": {
"name": "@bytelyst/errors", "name": "@bytelyst/errors",
"version": "0.1.0" "version": "0.1.0"
@ -76,6 +92,7 @@
}, },
"devDependencies": { "devDependencies": {
"@fastify/swagger": "^9.7.0", "@fastify/swagger": "^9.7.0",
"@fastify/swagger-ui": "^5.2.5",
"fastify-metrics": "^10.6.0" "fastify-metrics": "^10.6.0"
}, },
"peerDependencies": { "peerDependencies": {
@ -332,6 +349,10 @@
"resolved": "../../learning_ai_common_plat/packages/cosmos", "resolved": "../../learning_ai_common_plat/packages/cosmos",
"link": true "link": true
}, },
"node_modules/@bytelyst/datastore": {
"resolved": "../../learning_ai_common_plat/packages/datastore",
"link": true
},
"node_modules/@bytelyst/errors": { "node_modules/@bytelyst/errors": {
"resolved": "../../learning_ai_common_plat/packages/errors", "resolved": "../../learning_ai_common_plat/packages/errors",
"link": true "link": true
@ -2494,6 +2515,7 @@
"integrity": "sha512-w+N7Hifpc3gRjZ63vYBXA56dvvRlNWRczTdmCBBa+CotUzAPf5b7YMdMR/8CQoeYE5LX3W4wj6RYTgonm1b9DA==", "integrity": "sha512-w+N7Hifpc3gRjZ63vYBXA56dvvRlNWRczTdmCBBa+CotUzAPf5b7YMdMR/8CQoeYE5LX3W4wj6RYTgonm1b9DA==",
"dev": true, "dev": true,
"license": "MIT", "license": "MIT",
"peer": true,
"dependencies": { "dependencies": {
"esbuild": "^0.27.0", "esbuild": "^0.27.0",
"fdir": "^6.5.0", "fdir": "^6.5.0",

View File

@ -16,6 +16,7 @@
"@bytelyst/auth": "file:../../learning_ai_common_plat/packages/auth", "@bytelyst/auth": "file:../../learning_ai_common_plat/packages/auth",
"@bytelyst/config": "file:../../learning_ai_common_plat/packages/config", "@bytelyst/config": "file:../../learning_ai_common_plat/packages/config",
"@bytelyst/cosmos": "file:../../learning_ai_common_plat/packages/cosmos", "@bytelyst/cosmos": "file:../../learning_ai_common_plat/packages/cosmos",
"@bytelyst/datastore": "file:../../learning_ai_common_plat/packages/datastore",
"@bytelyst/errors": "file:../../learning_ai_common_plat/packages/errors", "@bytelyst/errors": "file:../../learning_ai_common_plat/packages/errors",
"@bytelyst/fastify-core": "file:../../learning_ai_common_plat/packages/fastify-core", "@bytelyst/fastify-core": "file:../../learning_ai_common_plat/packages/fastify-core",
"@azure/cosmos": "^4.2.0", "@azure/cosmos": "^4.2.0",

View File

@ -0,0 +1,67 @@
/**
* Cloud-agnostic datastore bridge for chronomind-backend.
*
* Wraps @bytelyst/datastore with ChronoMind container registry config.
* Repositories import getCollection() from here instead of getContainer() from cosmos.
*
* Migration: Replace `import { getContainer } from '../../lib/cosmos.js'`
* with `import { getCollection } from '../../lib/datastore.js'`
*/
import {
type DatastoreProvider,
type DocumentCollection,
type BaseDocument,
setDatastore,
CosmosDatastoreProvider,
MemoryDatastoreProvider,
} from '@bytelyst/datastore';
let _provider: DatastoreProvider | null = null;
/**
* Initialize the datastore provider.
* Call once at service startup (before any repository calls).
*/
export function initDatastore(): DatastoreProvider {
if (_provider) return _provider;
const dbProvider = (process.env.DB_PROVIDER || 'cosmos').toLowerCase();
if (dbProvider === 'memory') {
_provider = new MemoryDatastoreProvider();
} else {
_provider = new CosmosDatastoreProvider();
}
setDatastore(_provider);
return _provider;
}
/**
* Inject a provider directly (for testing).
*/
export function setProvider(provider: DatastoreProvider): void {
_provider = provider;
}
/**
* Get a typed collection from the datastore.
* Drop-in replacement for getContainer() returns a DocumentCollection instead of a Cosmos Container.
*/
export function getCollection<T extends BaseDocument = BaseDocument>(
name: string,
partitionKeyPath: string = '/productId'
): DocumentCollection<T> {
if (!_provider) {
initDatastore();
}
return _provider!.getCollection<T>(name, partitionKeyPath);
}
/**
* @internal for testing only
*/
export function _resetDatastoreProvider(): void {
_provider = null;
}

View File

@ -1,5 +1,5 @@
/** /**
* Households repository Cosmos DB CRUD for household membership. * Households repository cloud-agnostic CRUD for household membership.
* *
* Container: households (partition key: /id) * Container: households (partition key: /id)
* *
@ -7,37 +7,30 @@
* partitioned by their own /id since multiple users share the same doc. * partitioned by their own /id since multiple users share the same doc.
*/ */
import { getContainer } from '../../lib/cosmos.js'; import { getCollection } from '../../lib/datastore.js';
import type { HouseholdDoc, HouseholdQuery } from './types.js'; import type { HouseholdDoc, HouseholdQuery } from './types.js';
function container() { function collection() {
return getContainer('households'); return getCollection<HouseholdDoc>('households', '/id');
} }
export async function getHousehold(id: string): Promise<HouseholdDoc | null> { export async function getHousehold(id: string): Promise<HouseholdDoc | null> {
try { return collection().findById(id, id);
const { resource } = await container().item(id, id).read<HouseholdDoc>();
return resource ?? null;
} catch {
return null;
}
} }
export async function createHousehold(doc: HouseholdDoc): Promise<HouseholdDoc> { export async function createHousehold(doc: HouseholdDoc): Promise<HouseholdDoc> {
const { resource } = await container().items.create(doc); return collection().create(doc);
return resource as HouseholdDoc;
} }
export async function replaceHousehold(doc: HouseholdDoc): Promise<HouseholdDoc> { export async function replaceHousehold(doc: HouseholdDoc): Promise<HouseholdDoc> {
const { resource } = await container().item(doc.id, doc.id).replace(doc); return collection().upsert(doc);
return resource as HouseholdDoc;
} }
export async function deleteHousehold(id: string): Promise<boolean> { export async function deleteHousehold(id: string): Promise<boolean> {
try { try {
const existing = await getHousehold(id); const existing = await getHousehold(id);
if (!existing) return false; if (!existing) return false;
await container().item(id, id).delete(); await collection().delete(id, id);
return true; return true;
} catch { } catch {
return false; return false;
@ -49,47 +42,24 @@ export async function listHouseholdsForUser(
productId: string, productId: string,
query: HouseholdQuery query: HouseholdQuery
): Promise<{ items: HouseholdDoc[]; total: number }> { ): Promise<{ items: HouseholdDoc[]; total: number }> {
const countResult = await container() // ARRAY_CONTAINS on object arrays requires post-filtering (not expressible as simple filters)
.items.query<number>({ const all = await collection().findMany({ filter: { productId } });
query: const matched = all.filter(h => h.members.some(m => m.userId === userId));
'SELECT VALUE COUNT(1) FROM c WHERE c.productId = @productId AND ARRAY_CONTAINS(c.members, { "userId": @userId }, true)',
parameters: [
{ name: '@productId', value: productId },
{ name: '@userId', value: userId },
],
})
.fetchAll();
const total = countResult.resources[0] ?? 0;
const { resources } = await container() const total = matched.length;
.items.query<HouseholdDoc>({
query:
'SELECT * FROM c WHERE c.productId = @productId AND ARRAY_CONTAINS(c.members, { "userId": @userId }, true) ORDER BY c.createdAt DESC OFFSET @offset LIMIT @limit',
parameters: [
{ name: '@productId', value: productId },
{ name: '@userId', value: userId },
{ name: '@offset', value: query.offset },
{ name: '@limit', value: query.limit },
],
})
.fetchAll();
return { items: resources, total }; // Sort by createdAt DESC, then paginate
matched.sort((a, b) => b.createdAt.localeCompare(a.createdAt));
const items = matched.slice(query.offset, query.offset + query.limit);
return { items, total };
} }
export async function findHouseholdByInviteCode( export async function findHouseholdByInviteCode(
code: string, code: string,
productId: string productId: string
): Promise<HouseholdDoc | null> { ): Promise<HouseholdDoc | null> {
const { resources } = await container() // ARRAY_CONTAINS on object arrays requires post-filtering
.items.query<HouseholdDoc>({ const all = await collection().findMany({ filter: { productId } });
query: return all.find(h => h.invites.some(i => i.code === code && i.status === 'pending')) ?? null;
'SELECT * FROM c WHERE c.productId = @productId AND ARRAY_CONTAINS(c.invites, { "code": @code, "status": "pending" }, true)',
parameters: [
{ name: '@productId', value: productId },
{ name: '@code', value: code },
],
})
.fetchAll();
return resources[0] ?? null;
} }

View File

@ -1,14 +1,15 @@
/** /**
* Routines repository Cosmos DB CRUD + sync + batch upsert. * Routines repository cloud-agnostic CRUD + sync + batch upsert.
* *
* Container: routines (partition key: /userId) * Container: routines (partition key: /userId)
*/ */
import { getContainer } from '../../lib/cosmos.js'; import { getCollection } from '../../lib/datastore.js';
import type { RoutineDoc, RoutineQuery, BatchUpsertRoutinesResult } from './types.js'; import type { RoutineDoc, RoutineQuery, BatchUpsertRoutinesResult } from './types.js';
import type { FilterMap } from '@bytelyst/datastore';
function container() { function collection() {
return getContainer('routines'); return getCollection<RoutineDoc>('routines', '/userId');
} }
export async function listRoutines( export async function listRoutines(
@ -16,63 +17,32 @@ export async function listRoutines(
productId: string, productId: string,
query: RoutineQuery query: RoutineQuery
): Promise<{ items: RoutineDoc[]; total: number }> { ): Promise<{ items: RoutineDoc[]; total: number }> {
const conditions: string[] = ['c.userId = @userId', 'c.productId = @productId']; const filter: FilterMap = { userId, productId };
const params: { name: string; value: string | number | boolean }[] = [
{ name: '@userId', value: userId },
{ name: '@productId', value: productId },
];
if (query.status) { if (query.status) filter.status = query.status;
conditions.push('c.status = @status'); if (query.isTemplate !== undefined) filter.isTemplate = query.isTemplate;
params.push({ name: '@status', value: query.status }); if (query.category) filter.category = query.category;
}
if (query.isTemplate !== undefined) {
conditions.push('c.isTemplate = @isTemplate');
params.push({ name: '@isTemplate', value: query.isTemplate });
}
if (query.category) {
conditions.push('c.category = @category');
params.push({ name: '@category', value: query.category });
}
const where = `WHERE ${conditions.join(' AND ')}`; const sortDir = query.sortOrder === 'asc' ? 1 : -1;
const sortField = `c.${query.sortBy}`;
const orderDir = query.sortOrder.toUpperCase();
const countResult = await container() const total = await collection().count(filter);
.items.query<number>({
query: `SELECT VALUE COUNT(1) FROM c ${where}`,
parameters: params,
})
.fetchAll();
const total = countResult.resources[0] ?? 0;
const { resources } = await container() const items = await collection().findMany({
.items.query<RoutineDoc>({ filter,
query: `SELECT * FROM c ${where} ORDER BY ${sortField} ${orderDir} OFFSET @offset LIMIT @limit`, sort: { [query.sortBy]: sortDir } as Record<string, 1 | -1>,
parameters: [ offset: query.offset,
...params, limit: query.limit,
{ name: '@offset', value: query.offset }, });
{ name: '@limit', value: query.limit },
],
})
.fetchAll();
return { items: resources, total }; return { items, total };
} }
export async function getRoutine(id: string, userId: string): Promise<RoutineDoc | null> { export async function getRoutine(id: string, userId: string): Promise<RoutineDoc | null> {
try { return collection().findById(id, userId);
const { resource } = await container().item(id, userId).read<RoutineDoc>();
return resource ?? null;
} catch {
return null;
}
} }
export async function createRoutine(doc: RoutineDoc): Promise<RoutineDoc> { export async function createRoutine(doc: RoutineDoc): Promise<RoutineDoc> {
const { resource } = await container().items.create(doc); return collection().create(doc);
return resource as RoutineDoc;
} }
export async function updateRoutine( export async function updateRoutine(
@ -82,7 +52,7 @@ export async function updateRoutine(
expectedSyncVersion: number expectedSyncVersion: number
): Promise<{ doc: RoutineDoc | null; conflict: boolean; serverVersion?: number }> { ): Promise<{ doc: RoutineDoc | null; conflict: boolean; serverVersion?: number }> {
try { try {
const { resource: existing } = await container().item(id, userId).read<RoutineDoc>(); const existing = await collection().findById(id, userId);
if (!existing) return { doc: null, conflict: false }; if (!existing) return { doc: null, conflict: false };
if (expectedSyncVersion <= existing.syncVersion) { if (expectedSyncVersion <= existing.syncVersion) {
@ -96,8 +66,8 @@ export async function updateRoutine(
syncVersion: expectedSyncVersion, syncVersion: expectedSyncVersion,
lastSyncedAt: now, lastSyncedAt: now,
}; };
const { resource } = await container().item(id, userId).replace(merged); const doc = await collection().upsert(merged);
return { doc: resource as RoutineDoc, conflict: false }; return { doc, conflict: false };
} catch { } catch {
return { doc: null, conflict: false }; return { doc: null, conflict: false };
} }
@ -105,9 +75,9 @@ export async function updateRoutine(
export async function deleteRoutine(id: string, userId: string): Promise<boolean> { export async function deleteRoutine(id: string, userId: string): Promise<boolean> {
try { try {
const { resource: existing } = await container().item(id, userId).read<RoutineDoc>(); const existing = await collection().findById(id, userId);
if (!existing) return false; if (!existing) return false;
await container().item(id, userId).delete(); await collection().delete(id, userId);
return true; return true;
} catch { } catch {
return false; return false;
@ -120,19 +90,11 @@ export async function getRoutinesSince(
sinceTimestamp: string, sinceTimestamp: string,
limit: number limit: number
): Promise<RoutineDoc[]> { ): Promise<RoutineDoc[]> {
const { resources } = await container() return collection().findMany({
.items.query<RoutineDoc>({ filter: { userId, productId, lastSyncedAt: { $gte: sinceTimestamp } },
query: sort: { lastSyncedAt: 1 },
'SELECT * FROM c WHERE c.userId = @userId AND c.productId = @productId AND c.lastSyncedAt >= @since ORDER BY c.lastSyncedAt ASC OFFSET 0 LIMIT @limit', limit,
parameters: [ });
{ name: '@userId', value: userId },
{ name: '@productId', value: productId },
{ name: '@since', value: sinceTimestamp },
{ name: '@limit', value: limit },
],
})
.fetchAll();
return resources;
} }
export async function batchUpsertRoutines( export async function batchUpsertRoutines(
@ -158,7 +120,7 @@ export async function batchUpsertRoutines(
productId, productId,
lastSyncedAt: now, lastSyncedAt: now,
} as RoutineDoc; } as RoutineDoc;
await container().item(routine.id, userId).replace(merged); await collection().upsert(merged);
synced.push(routine.id); synced.push(routine.id);
} else { } else {
conflicts.push({ id: routine.id, serverVersion: existing.syncVersion }); conflicts.push({ id: routine.id, serverVersion: existing.syncVersion });
@ -169,8 +131,8 @@ export async function batchUpsertRoutines(
userId, userId,
productId, productId,
lastSyncedAt: now, lastSyncedAt: now,
}; } as RoutineDoc;
await container().items.create(doc); await collection().create(doc);
synced.push(routine.id); synced.push(routine.id);
} }
} catch (err) { } catch (err) {

View File

@ -1,14 +1,15 @@
/** /**
* Shared timers repository Cosmos DB CRUD for household shared timers. * Shared timers repository cloud-agnostic CRUD for household shared timers.
* *
* Container: shared_timers (partition key: /householdId) * Container: shared_timers (partition key: /householdId)
*/ */
import { getContainer } from '../../lib/cosmos.js'; import { getCollection } from '../../lib/datastore.js';
import type { SharedTimerDoc, SharedTimerQuery } from './types.js'; import type { SharedTimerDoc, SharedTimerQuery } from './types.js';
import type { FilterMap } from '@bytelyst/datastore';
function container() { function collection() {
return getContainer('shared_timers'); return getCollection<SharedTimerDoc>('shared_timers', '/householdId');
} }
export async function listSharedTimers( export async function listSharedTimers(
@ -16,74 +17,45 @@ export async function listSharedTimers(
productId: string, productId: string,
query: SharedTimerQuery query: SharedTimerQuery
): Promise<{ items: SharedTimerDoc[]; total: number }> { ): Promise<{ items: SharedTimerDoc[]; total: number }> {
const conditions: string[] = ['c.householdId = @householdId', 'c.productId = @productId']; const filter: FilterMap = { householdId, productId };
const params: { name: string; value: string | number }[] = [
{ name: '@householdId', value: householdId },
{ name: '@productId', value: productId },
];
if (query.state) { if (query.state) filter.state = query.state;
conditions.push('c.state = @state'); if (query.type) filter.type = query.type;
params.push({ name: '@state', value: query.state });
}
if (query.type) {
conditions.push('c.type = @type');
params.push({ name: '@type', value: query.type });
}
const where = `WHERE ${conditions.join(' AND ')}`; const sortDir = query.sortOrder === 'asc' ? 1 : -1;
const sortField = `c.${query.sortBy}`;
const orderDir = query.sortOrder.toUpperCase();
const countResult = await container() const total = await collection().count(filter);
.items.query<number>({
query: `SELECT VALUE COUNT(1) FROM c ${where}`,
parameters: params,
})
.fetchAll();
const total = countResult.resources[0] ?? 0;
const { resources } = await container() const items = await collection().findMany({
.items.query<SharedTimerDoc>({ filter,
query: `SELECT * FROM c ${where} ORDER BY ${sortField} ${orderDir} OFFSET @offset LIMIT @limit`, sort: { [query.sortBy]: sortDir } as Record<string, 1 | -1>,
parameters: [ offset: query.offset,
...params, limit: query.limit,
{ name: '@offset', value: query.offset }, });
{ name: '@limit', value: query.limit },
],
})
.fetchAll();
return { items: resources, total }; return { items, total };
} }
export async function getSharedTimer( export async function getSharedTimer(
id: string, id: string,
householdId: string householdId: string
): Promise<SharedTimerDoc | null> { ): Promise<SharedTimerDoc | null> {
try { return collection().findById(id, householdId);
const { resource } = await container().item(id, householdId).read<SharedTimerDoc>();
return resource ?? null;
} catch {
return null;
}
} }
export async function createSharedTimer(doc: SharedTimerDoc): Promise<SharedTimerDoc> { export async function createSharedTimer(doc: SharedTimerDoc): Promise<SharedTimerDoc> {
const { resource } = await container().items.create(doc); return collection().create(doc);
return resource as SharedTimerDoc;
} }
export async function replaceSharedTimer(doc: SharedTimerDoc): Promise<SharedTimerDoc> { export async function replaceSharedTimer(doc: SharedTimerDoc): Promise<SharedTimerDoc> {
const { resource } = await container().item(doc.id, doc.householdId).replace(doc); return collection().upsert(doc);
return resource as SharedTimerDoc;
} }
export async function deleteSharedTimer(id: string, householdId: string): Promise<boolean> { export async function deleteSharedTimer(id: string, householdId: string): Promise<boolean> {
try { try {
const existing = await getSharedTimer(id, householdId); const existing = await getSharedTimer(id, householdId);
if (!existing) return false; if (!existing) return false;
await container().item(id, householdId).delete(); await collection().delete(id, householdId);
return true; return true;
} catch { } catch {
return false; return false;

View File

@ -1,14 +1,15 @@
/** /**
* Timers repository Cosmos DB CRUD + sync + batch upsert. * Timers repository cloud-agnostic CRUD + sync + batch upsert.
* *
* Container: timers (partition key: /userId) * Container: timers (partition key: /userId)
*/ */
import { getContainer } from '../../lib/cosmos.js'; import { getCollection } from '../../lib/datastore.js';
import type { TimerDoc, TimerQuery, BatchUpsertResult } from './types.js'; import type { TimerDoc, TimerQuery, BatchUpsertResult } from './types.js';
import type { FilterMap } from '@bytelyst/datastore';
function container() { function collection() {
return getContainer('timers'); return getCollection<TimerDoc>('timers', '/userId');
} }
export async function listTimers( export async function listTimers(
@ -16,69 +17,33 @@ export async function listTimers(
productId: string, productId: string,
query: TimerQuery query: TimerQuery
): Promise<{ items: TimerDoc[]; total: number }> { ): Promise<{ items: TimerDoc[]; total: number }> {
const conditions: string[] = ['c.userId = @userId', 'c.productId = @productId']; const filter: FilterMap = { userId, productId };
const params: { name: string; value: string | number }[] = [
{ name: '@userId', value: userId },
{ name: '@productId', value: productId },
];
if (query.state) { if (query.state) filter.state = query.state;
conditions.push('c.state = @state'); if (query.type) filter.type = query.type;
params.push({ name: '@state', value: query.state }); if (query.urgency) filter.urgency = query.urgency;
} if (query.category) filter.category = query.category;
if (query.type) {
conditions.push('c.type = @type');
params.push({ name: '@type', value: query.type });
}
if (query.urgency) {
conditions.push('c.urgency = @urgency');
params.push({ name: '@urgency', value: query.urgency });
}
if (query.category) {
conditions.push('c.category = @category');
params.push({ name: '@category', value: query.category });
}
const where = `WHERE ${conditions.join(' AND ')}`; const sortDir = query.sortOrder === 'asc' ? 1 : -1;
const sortField = `c.${query.sortBy}`;
const orderDir = query.sortOrder.toUpperCase();
// Count query const total = await collection().count(filter);
const countResult = await container()
.items.query<number>({
query: `SELECT VALUE COUNT(1) FROM c ${where}`,
parameters: params,
})
.fetchAll();
const total = countResult.resources[0] ?? 0;
// Data query with pagination const items = await collection().findMany({
const { resources } = await container() filter,
.items.query<TimerDoc>({ sort: { [query.sortBy]: sortDir } as Record<string, 1 | -1>,
query: `SELECT * FROM c ${where} ORDER BY ${sortField} ${orderDir} OFFSET @offset LIMIT @limit`, offset: query.offset,
parameters: [ limit: query.limit,
...params, });
{ name: '@offset', value: query.offset },
{ name: '@limit', value: query.limit },
],
})
.fetchAll();
return { items: resources, total }; return { items, total };
} }
export async function getTimer(id: string, userId: string): Promise<TimerDoc | null> { export async function getTimer(id: string, userId: string): Promise<TimerDoc | null> {
try { return collection().findById(id, userId);
const { resource } = await container().item(id, userId).read<TimerDoc>();
return resource ?? null;
} catch {
return null;
}
} }
export async function createTimer(doc: TimerDoc): Promise<TimerDoc> { export async function createTimer(doc: TimerDoc): Promise<TimerDoc> {
const { resource } = await container().items.create(doc); return collection().create(doc);
return resource as TimerDoc;
} }
export async function updateTimer( export async function updateTimer(
@ -88,7 +53,7 @@ export async function updateTimer(
expectedSyncVersion: number expectedSyncVersion: number
): Promise<{ doc: TimerDoc | null; conflict: boolean; serverVersion?: number }> { ): Promise<{ doc: TimerDoc | null; conflict: boolean; serverVersion?: number }> {
try { try {
const { resource: existing } = await container().item(id, userId).read<TimerDoc>(); const existing = await collection().findById(id, userId);
if (!existing) return { doc: null, conflict: false }; if (!existing) return { doc: null, conflict: false };
// Optimistic concurrency: reject stale writes // Optimistic concurrency: reject stale writes
@ -103,8 +68,8 @@ export async function updateTimer(
syncVersion: expectedSyncVersion, syncVersion: expectedSyncVersion,
lastSyncedAt: now, lastSyncedAt: now,
}; };
const { resource } = await container().item(id, userId).replace(merged); const doc = await collection().upsert(merged);
return { doc: resource as TimerDoc, conflict: false }; return { doc, conflict: false };
} catch { } catch {
return { doc: null, conflict: false }; return { doc: null, conflict: false };
} }
@ -112,9 +77,9 @@ export async function updateTimer(
export async function deleteTimer(id: string, userId: string): Promise<boolean> { export async function deleteTimer(id: string, userId: string): Promise<boolean> {
try { try {
const { resource: existing } = await container().item(id, userId).read<TimerDoc>(); const existing = await collection().findById(id, userId);
if (!existing) return false; if (!existing) return false;
await container().item(id, userId).delete(); await collection().delete(id, userId);
return true; return true;
} catch { } catch {
return false; return false;
@ -127,19 +92,11 @@ export async function getTimersSince(
sinceTimestamp: string, sinceTimestamp: string,
limit: number limit: number
): Promise<TimerDoc[]> { ): Promise<TimerDoc[]> {
const { resources } = await container() return collection().findMany({
.items.query<TimerDoc>({ filter: { userId, productId, lastSyncedAt: { $gte: sinceTimestamp } },
query: sort: { lastSyncedAt: 1 },
'SELECT * FROM c WHERE c.userId = @userId AND c.productId = @productId AND c.lastSyncedAt >= @since ORDER BY c.lastSyncedAt ASC OFFSET 0 LIMIT @limit', limit,
parameters: [ });
{ name: '@userId', value: userId },
{ name: '@productId', value: productId },
{ name: '@since', value: sinceTimestamp },
{ name: '@limit', value: limit },
],
})
.fetchAll();
return resources;
} }
export async function batchUpsert( export async function batchUpsert(
@ -166,7 +123,7 @@ export async function batchUpsert(
productId, productId,
lastSyncedAt: now, lastSyncedAt: now,
}; };
await container().item(timer.id, userId).replace(merged); await collection().upsert(merged);
synced.push(timer.id); synced.push(timer.id);
} else { } else {
conflicts.push({ id: timer.id, serverVersion: existing.syncVersion }); conflicts.push({ id: timer.id, serverVersion: existing.syncVersion });
@ -179,7 +136,7 @@ export async function batchUpsert(
productId, productId,
lastSyncedAt: now, lastSyncedAt: now,
} as TimerDoc; } as TimerDoc;
await container().items.create(doc); await collection().create(doc);
synced.push(timer.id); synced.push(timer.id);
} }
} catch (err) { } catch (err) {

View File

@ -1,4 +1,4 @@
import { getContainer } from '../../lib/cosmos.js'; import { getCollection } from '../../lib/datastore.js';
import { NotFoundError, ConflictError } from '../../lib/errors.js'; import { NotFoundError, ConflictError } from '../../lib/errors.js';
import type { import type {
WebhookSubscriptionDoc, WebhookSubscriptionDoc,
@ -8,15 +8,12 @@ import type {
WebhookEventType, WebhookEventType,
} from './types.js'; } from './types.js';
const SUBS_CONTAINER = 'webhook_subscriptions'; function subsCollection() {
const EVENTS_CONTAINER = 'webhook_events'; return getCollection<WebhookSubscriptionDoc>('webhook_subscriptions', '/userId');
function subsContainer() {
return getContainer(SUBS_CONTAINER);
} }
function eventsContainer() { function eventsCollection() {
return getContainer(EVENTS_CONTAINER); return getCollection<WebhookEventDoc>('webhook_events', '/subscriptionId');
} }
// ── Subscription CRUD ───────────────────────────────────────── // ── Subscription CRUD ─────────────────────────────────────────
@ -25,29 +22,18 @@ export async function listSubscriptions(
userId: string, userId: string,
productId: string productId: string
): Promise<WebhookSubscriptionDoc[]> { ): Promise<WebhookSubscriptionDoc[]> {
const { resources } = await subsContainer() return subsCollection().findMany({
.items.query<WebhookSubscriptionDoc>( filter: { userId, productId },
{ sort: { createdAt: -1 },
query: });
'SELECT * FROM c WHERE c.userId = @userId AND c.productId = @productId ORDER BY c.createdAt DESC',
parameters: [
{ name: '@userId', value: userId },
{ name: '@productId', value: productId },
],
},
{ partitionKey: userId }
)
.fetchAll();
return resources;
} }
export async function getSubscription(id: string, userId: string): Promise<WebhookSubscriptionDoc> { export async function getSubscription(id: string, userId: string): Promise<WebhookSubscriptionDoc> {
const { resource } = await subsContainer().item(id, userId).read<WebhookSubscriptionDoc>(); const doc = await subsCollection().findById(id, userId);
if (!resource) { if (!doc) {
throw new NotFoundError(`Webhook subscription '${id}' not found`); throw new NotFoundError(`Webhook subscription '${id}' not found`);
} }
return resource; return doc;
} }
export async function createSubscription( export async function createSubscription(
@ -73,10 +59,9 @@ export async function createSubscription(
}; };
try { try {
const { resource } = await subsContainer().items.create(doc); return await subsCollection().create(doc);
return resource as WebhookSubscriptionDoc;
} catch (err: unknown) { } catch (err: unknown) {
if (err && typeof err === 'object' && 'code' in err && err.code === 409) { if (err instanceof Error && err.message.includes('already exists')) {
throw new ConflictError(`Subscription '${id}' already exists`); throw new ConflictError(`Subscription '${id}' already exists`);
} }
throw err; throw err;
@ -96,13 +81,12 @@ export async function updateSubscription(
updatedAt: new Date().toISOString(), updatedAt: new Date().toISOString(),
}; };
const { resource } = await subsContainer().item(id, userId).replace(updated); return subsCollection().upsert(updated);
return resource as WebhookSubscriptionDoc;
} }
export async function deleteSubscription(id: string, userId: string): Promise<void> { export async function deleteSubscription(id: string, userId: string): Promise<void> {
await getSubscription(id, userId); // verify exists await getSubscription(id, userId); // verify exists
await subsContainer().item(id, userId).delete(); await subsCollection().delete(id, userId);
} }
// ── Find Subscriptions for Event ────────────────────────────── // ── Find Subscriptions for Event ──────────────────────────────
@ -112,22 +96,10 @@ export async function findSubscriptionsForEvent(
productId: string, productId: string,
eventType: WebhookEventType eventType: WebhookEventType
): Promise<WebhookSubscriptionDoc[]> { ): Promise<WebhookSubscriptionDoc[]> {
const { resources } = await subsContainer() // events is a string[] — $contains works for primitive arrays in memory provider
.items.query<WebhookSubscriptionDoc>( return subsCollection().findMany({
{ filter: { userId, productId, active: true, events: { $contains: eventType } },
query: });
'SELECT * FROM c WHERE c.userId = @userId AND c.productId = @productId AND c.active = true AND ARRAY_CONTAINS(c.events, @eventType)',
parameters: [
{ name: '@userId', value: userId },
{ name: '@productId', value: productId },
{ name: '@eventType', value: eventType },
],
},
{ partitionKey: userId }
)
.fetchAll();
return resources;
} }
// ── Increment Failure Count ─────────────────────────────────── // ── Increment Failure Count ───────────────────────────────────
@ -139,54 +111,38 @@ export async function incrementFailureCount(id: string, userId: string): Promise
// Auto-disable after 10 consecutive failures // Auto-disable after 10 consecutive failures
const active = failureCount < 10; const active = failureCount < 10;
await subsContainer() await subsCollection().upsert({
.item(id, userId) ...existing,
.replace({ failureCount,
...existing, active,
failureCount, updatedAt: new Date().toISOString(),
active, });
updatedAt: new Date().toISOString(),
});
} }
export async function resetFailureCount(id: string, userId: string): Promise<void> { export async function resetFailureCount(id: string, userId: string): Promise<void> {
const existing = await getSubscription(id, userId); const existing = await getSubscription(id, userId);
await subsContainer() await subsCollection().upsert({
.item(id, userId) ...existing,
.replace({ failureCount: 0,
...existing, lastDeliveryAt: new Date().toISOString(),
failureCount: 0, updatedAt: new Date().toISOString(),
lastDeliveryAt: new Date().toISOString(), });
updatedAt: new Date().toISOString(),
});
} }
// ── Event Log ───────────────────────────────────────────────── // ── Event Log ─────────────────────────────────────────────────
export async function createEvent(doc: WebhookEventDoc): Promise<WebhookEventDoc> { export async function createEvent(doc: WebhookEventDoc): Promise<WebhookEventDoc> {
const { resource } = await eventsContainer().items.create(doc); return eventsCollection().create(doc);
return resource as WebhookEventDoc;
} }
export async function updateEvent(doc: WebhookEventDoc): Promise<WebhookEventDoc> { export async function updateEvent(doc: WebhookEventDoc): Promise<WebhookEventDoc> {
const { resource } = await eventsContainer().item(doc.id, doc.subscriptionId).replace(doc); return eventsCollection().upsert(doc);
return resource as WebhookEventDoc;
} }
export async function listEvents(subscriptionId: string, limit = 50): Promise<WebhookEventDoc[]> { export async function listEvents(subscriptionId: string, limit = 50): Promise<WebhookEventDoc[]> {
const { resources } = await eventsContainer() return eventsCollection().findMany({
.items.query<WebhookEventDoc>( filter: { subscriptionId },
{ sort: { createdAt: -1 },
query: limit,
'SELECT TOP @limit * FROM c WHERE c.subscriptionId = @subscriptionId ORDER BY c.createdAt DESC', });
parameters: [
{ name: '@subscriptionId', value: subscriptionId },
{ name: '@limit', value: limit },
],
},
{ partitionKey: subscriptionId }
)
.fetchAll();
return resources;
} }

View File

@ -13,6 +13,7 @@ import { householdRoutes } from './modules/households/routes.js';
import { sharedTimerRoutes } from './modules/shared-timers/routes.js'; import { sharedTimerRoutes } from './modules/shared-timers/routes.js';
import { webhookRoutes } from './modules/webhooks/routes.js'; import { webhookRoutes } from './modules/webhooks/routes.js';
import { initCosmosIfNeeded } from './lib/cosmos-init.js'; import { initCosmosIfNeeded } from './lib/cosmos-init.js';
import { initDatastore } from './lib/datastore.js';
import { config } from './lib/config.js'; import { config } from './lib/config.js';
import { jwtVerify } from 'jose'; import { jwtVerify } from 'jose';
@ -21,6 +22,7 @@ import type { JwtPayload } from './lib/request-context.js';
const jwtSecret = new TextEncoder().encode(config.JWT_SECRET); const jwtSecret = new TextEncoder().encode(config.JWT_SECRET);
await initCosmosIfNeeded(); await initCosmosIfNeeded();
initDatastore();
const app = await createServiceApp({ const app = await createServiceApp({
name: 'chronomind-backend', name: 'chronomind-backend',