From 560c95a599da5f499f0f0f455bf30775083e8fe3 Mon Sep 17 00:00:00 2001 From: Saravana Achu Mac Date: Sat, 4 Apr 2026 16:37:54 -0700 Subject: [PATCH] refactor: move capital ledger to cosmos-first repository --- backend/src/services/CapitalLedger.ts | 151 +++++++----------- .../src/services/capitalLedgerRepository.ts | 137 ++++++++++++++++ docs/ROADMAP.md | 22 ++- 3 files changed, 206 insertions(+), 104 deletions(-) create mode 100644 backend/src/services/capitalLedgerRepository.ts diff --git a/backend/src/services/CapitalLedger.ts b/backend/src/services/CapitalLedger.ts index b713545..592479b 100644 --- a/backend/src/services/CapitalLedger.ts +++ b/backend/src/services/CapitalLedger.ts @@ -2,6 +2,7 @@ import logger from '../utils/logger.js'; import { config } from '../config/index.js'; import { supabaseService } from './SupabaseService.js'; import { getTradeProfileCapital } from './profileRepository.js'; +import { getCapitalLedger, upsertCapitalLedger } from './capitalLedgerRepository.js'; export interface CapitalLedgerRecord { profile_id: string; @@ -34,34 +35,20 @@ export class CapitalLedger { } private async ensureLedger(profileId: string, allocatedCapital?: number): Promise { - const client = supabaseService.getClient(); try { - if (!client) { - logger.error(`[CapitalLedger] ensureLedger aborted for ${profileId}: Supabase client unavailable (fail-closed).`); - return null; - } const profileCapital = await getTradeProfileCapital(profileId, supabaseService); const allocation = toNumeric(allocatedCapital ?? profileCapital?.allocatedCapital ?? config.TOTAL_CAPITAL); + const existing = await getCapitalLedger(profileId, supabaseService); + const nextRecord: CapitalLedgerRecord = { + profile_id: profileId, + allocated_capital: allocation, + reserved_for_orders: toNumeric(existing?.reserved_for_orders), + reserved_for_positions: toNumeric(existing?.reserved_for_positions), + realized_pnl: toNumeric(existing?.realized_pnl), + updated_at: new Date().toISOString() + }; - const { data, error } = await client - .from('capital_ledgers') - .upsert({ - profile_id: profileId, - allocated_capital: allocation - }, { onConflict: 'profile_id' }) - .select('*') - .maybeSingle(); - - if (error) { - if (this.isRpcNetworkFailure(error)) { - logger.error(`[CapitalLedger] ensureLedger RPC network failure for ${profileId}, aborting ledger mutation (fail-closed): ${error.message}`); - return null; - } - logger.error(`[CapitalLedger] ensureLedger failed: ${error.message}`); - return null; - } - - return data as CapitalLedgerRecord; + return await upsertCapitalLedger(nextRecord, supabaseService); } catch (err: any) { if (this.isRpcNetworkFailure(err)) { logger.error(`[CapitalLedger] ensureLedger network failure for ${profileId}, aborting ledger mutation (fail-closed): ${err.message}`); @@ -72,69 +59,34 @@ export class CapitalLedger { } } - private async rpc(fn: string, args: Record): Promise { - const client = supabaseService.getClient(); - if (!client) { - logger.error(`[CapitalLedger] ${fn} aborted: Supabase client unavailable (fail-closed).`); - return null; - } - try { - const { data, error } = await client.rpc(fn, args); - if (error) { - if (this.isRpcNetworkFailure(error)) { - logger.error(`[CapitalLedger] ${fn} RPC network failure, rejecting mutation (fail-closed): ${error.message}`); - return null; - } - logger.error(`[CapitalLedger] ${fn} failed: ${error.message}`); - return null; - } - return data as T; - } catch (err: any) { - if (this.isRpcNetworkFailure(err)) { - logger.error(`[CapitalLedger] ${fn} network failure, rejecting mutation (fail-closed): ${err.message}`); - return null; - } - logger.error(`[CapitalLedger] ${fn} unexpected error: ${err.message}`); - return null; - } - } - - private async mutate(profileId: string, fn: () => Promise) { + private async mutate( + profileId: string, + fn: (current: CapitalLedgerRecord) => Promise + ) { return this.withLock(profileId, async () => { const ledger = await this.ensureLedger(profileId); if (!ledger) return null; - return fn(); + return fn(ledger); }); } public async getLedger(profileId: string): Promise { - const client = supabaseService.getClient(); - if (!client) return null; - const { data, error } = await client - .from('capital_ledgers') - .select('*') - .eq('profile_id', profileId) - .maybeSingle(); - - if (error) { - if (this.isRpcNetworkFailure(error)) { - logger.error(`[CapitalLedger] getLedger RPC network failure for ${profileId}, returning null (fail-closed): ${error.message}`); - return null; - } - logger.error(`[CapitalLedger] getLedger failed: ${error.message}`); - return null; - } - - return data as CapitalLedgerRecord; + return getCapitalLedger(profileId, supabaseService); } public async reserveForOrder(profileId: string, amount: number): Promise { if (!profileId || amount <= 0) return false; - const result = await this.mutate(profileId, async () => { - return this.rpc('fn_reserve_for_order', { - p_profile: profileId, - p_amount: amount - }); + const result = await this.mutate(profileId, async (ledger) => { + const available = this.availableCapital(ledger); + if (available + 1e-8 < amount) { + return null; + } + + return upsertCapitalLedger({ + ...ledger, + reserved_for_orders: toNumeric(ledger.reserved_for_orders) + amount, + updated_at: new Date().toISOString() + }, supabaseService); }); if (result) return true; @@ -157,43 +109,48 @@ export class CapitalLedger { public async releaseOrderReservation(profileId: string, amount: number): Promise { if (!profileId || amount <= 0) return; - await this.mutate(profileId, async () => { - return this.rpc('fn_release_order_reservation', { - p_profile: profileId, - p_amount: amount - }); + await this.mutate(profileId, async (ledger) => { + return upsertCapitalLedger({ + ...ledger, + reserved_for_orders: Math.max(0, toNumeric(ledger.reserved_for_orders) - amount), + updated_at: new Date().toISOString() + }, supabaseService); }); } public async adjustPositionReservation(profileId: string, delta: number): Promise { if (!profileId || delta === 0) return; - await this.mutate(profileId, async () => { - return this.rpc('fn_adjust_position_reservation', { - p_profile: profileId, - p_delta: delta - }); + await this.mutate(profileId, async (ledger) => { + return upsertCapitalLedger({ + ...ledger, + reserved_for_positions: Math.max(0, toNumeric(ledger.reserved_for_positions) + delta), + updated_at: new Date().toISOString() + }, supabaseService); }); } public async recordRealizedPnl(profileId: string, delta: number): Promise { if (!profileId || delta === 0) return; - await this.mutate(profileId, async () => { - return this.rpc('fn_record_realized_pnl', { - p_profile: profileId, - p_delta: delta - }); + await this.mutate(profileId, async (ledger) => { + return upsertCapitalLedger({ + ...ledger, + realized_pnl: toNumeric(ledger.realized_pnl) + delta, + updated_at: new Date().toISOString() + }, supabaseService); }); } public async rebuildLedger(profileId: string, reservedOrders: number, reservedPositions: number): Promise { if (!profileId) return; await this.withLock(profileId, async () => { - await this.ensureLedger(profileId); - await this.rpc('fn_rebuild_ledger', { - p_profile: profileId, - p_reserved_orders: reservedOrders, - p_reserved_positions: reservedPositions - }); + const ledger = await this.ensureLedger(profileId); + if (!ledger) return null; + return upsertCapitalLedger({ + ...ledger, + reserved_for_orders: Math.max(0, toNumeric(reservedOrders)), + reserved_for_positions: Math.max(0, toNumeric(reservedPositions)), + updated_at: new Date().toISOString() + }, supabaseService); }); } diff --git a/backend/src/services/capitalLedgerRepository.ts b/backend/src/services/capitalLedgerRepository.ts new file mode 100644 index 0000000..2825907 --- /dev/null +++ b/backend/src/services/capitalLedgerRepository.ts @@ -0,0 +1,137 @@ +import { getContainer } from '@bytelyst/cosmos'; +import { config } from '../config/index.js'; +import logger from '../utils/logger.js'; +import type { supabaseService } from './SupabaseService.js'; +import type { CapitalLedgerRecord } from './CapitalLedger.js'; + +type LegacySupabaseService = typeof supabaseService; + +const CONTAINER_NAME = 'capital_ledgers'; + +interface CapitalLedgerDocument { + id: string; + productId: string; + profile_id: string; + allocated_capital: number; + reserved_for_orders: number; + reserved_for_positions: number; + realized_pnl: number; + updated_at: string; +} + +const toNumeric = (value: unknown): number => { + const numeric = Number(value); + return Number.isFinite(numeric) ? numeric : 0; +}; + +function isCosmosConfigured(): boolean { + return Boolean(config.COSMOS_ENDPOINT && config.COSMOS_KEY); +} + +function toLedgerRecord(doc: Partial | null | undefined): CapitalLedgerRecord | null { + const profileId = String(doc?.profile_id || '').trim(); + if (!profileId) return null; + + return { + profile_id: profileId, + allocated_capital: toNumeric(doc?.allocated_capital), + reserved_for_orders: toNumeric(doc?.reserved_for_orders), + reserved_for_positions: toNumeric(doc?.reserved_for_positions), + realized_pnl: toNumeric(doc?.realized_pnl), + updated_at: String(doc?.updated_at || new Date().toISOString()) + }; +} + +function toLedgerDocument(record: CapitalLedgerRecord): CapitalLedgerDocument { + return { + id: record.profile_id, + productId: config.PRODUCT_ID, + ...record + }; +} + +async function readFromCosmos(profileId: string): Promise { + const container = getContainer(CONTAINER_NAME); + try { + const { resource } = await container.item(profileId, config.PRODUCT_ID).read(); + return toLedgerRecord(resource); + } catch (error) { + const code = (error as { code?: number })?.code; + if (code === 404) return null; + throw error; + } +} + +async function writeToCosmos(record: CapitalLedgerRecord): Promise { + const container = getContainer(CONTAINER_NAME); + const { resource } = await container.items.upsert(toLedgerDocument(record)); + return toLedgerRecord(resource as unknown as CapitalLedgerDocument); +} + +export async function getCapitalLedger(profileId: string, legacyService?: LegacySupabaseService): Promise { + if (!profileId) return null; + + if (isCosmosConfigured()) { + try { + const cosmosRecord = await readFromCosmos(profileId); + if (cosmosRecord) return cosmosRecord; + } catch (error) { + logger.warn(`[CapitalLedgerRepo] Cosmos read failed, falling back to legacy store: ${error instanceof Error ? error.message : 'unknown error'}`); + } + } + + const client = legacyService?.getClient?.(); + if (!client) return null; + + const { data, error } = await client + .from('capital_ledgers') + .select('*') + .eq('profile_id', profileId) + .maybeSingle(); + + if (error) { + logger.error(`[CapitalLedgerRepo] Legacy read failed for ${profileId}: ${error.message}`); + return null; + } + + return toLedgerRecord(data as CapitalLedgerDocument); +} + +export async function upsertCapitalLedger( + record: CapitalLedgerRecord, + legacyService?: LegacySupabaseService +): Promise { + if (!record.profile_id) return null; + + if (isCosmosConfigured()) { + try { + const saved = await writeToCosmos(record); + if (saved) return saved; + } catch (error) { + logger.warn(`[CapitalLedgerRepo] Cosmos upsert failed, falling back to legacy store: ${error instanceof Error ? error.message : 'unknown error'}`); + } + } + + const client = legacyService?.getClient?.(); + if (!client) return null; + + const { data, error } = await client + .from('capital_ledgers') + .upsert({ + profile_id: record.profile_id, + allocated_capital: record.allocated_capital, + reserved_for_orders: record.reserved_for_orders, + reserved_for_positions: record.reserved_for_positions, + realized_pnl: record.realized_pnl, + updated_at: record.updated_at + }, { onConflict: 'profile_id' }) + .select('*') + .maybeSingle(); + + if (error) { + logger.error(`[CapitalLedgerRepo] Legacy upsert failed for ${record.profile_id}: ${error.message}`); + return null; + } + + return toLedgerRecord(data as CapitalLedgerDocument); +} diff --git a/docs/ROADMAP.md b/docs/ROADMAP.md index 9369646..363235d 100644 --- a/docs/ROADMAP.md +++ b/docs/ROADMAP.md @@ -32,10 +32,13 @@ It assumes: - [x] Mobile migrated into `mobile/` with product identity, shared runtime bootstrap, launch-time kill-switch gate, platform-service auth, live backend polling plus websocket-backed updates, startup/error telemetry capture, secure session storage with invalidation handling, and explicit degraded/offline status surfacing - [x] Backend now accepts common-platform JWTs with legacy Supabase fallback and persists global trading-control state through Cosmos-backed control storage - [x] Dynamic config now flows through backend control-plane APIs with Cosmos-first storage and legacy Supabase fallback +- [x] Backend snapshots now use a Cosmos-first repository with legacy fallback +- [x] Distributed entry and reconciliation locks now use a Cosmos-first repository with legacy fallback +- [x] Capital ledger persistence now uses a Cosmos-first repository with legacy fallback - [x] Mobile platform auth requests now use the common React Native platform SDK - [x] Root verification and lint flows now run successfully without sandbox-hostile script harness behavior - [-] DRY cleanup completed for runtime/config/bootstrap concerns, shared websocket auth helpers, and platform-session handling, but not yet for all data-plane persistence flows -- [!] Full common-platform data-plane replacement remains a follow-up; backend and web still retain legacy Supabase data access for trading records and configuration tables +- [!] Full common-platform data-plane replacement remains a follow-up; backend and web still retain legacy Supabase access for profile risk aggregates, trading records, and some configuration/history tables ## 3. Guiding Rules @@ -218,6 +221,9 @@ Make backend the stable authority before web and mobile migrate heavily onto it. - [x] Add runtime control endpoints - [x] Add platform-JWT verification with legacy fallback - [x] Add Cosmos-backed global trading-control persistence +- [x] Move snapshots to Cosmos-first repository flow with legacy fallback +- [x] Move distributed runtime locks to Cosmos-first repository flow with legacy fallback +- [x] Move capital ledger persistence to Cosmos-first repository flow with legacy fallback - [-] Standardize admin controls and audit logging - [ ] Define admin audit event schema - [ ] Define durable state ownership between memory, database, and exchange sync @@ -430,6 +436,7 @@ Validate that the new monorepo is safer and more coherent than the legacy setup - [ ] Add websocket auth model and namespaces - [ ] Add runtime control endpoints - [ ] Add telemetry and health integration +- [x] Add Cosmos-first repository layer for snapshots, distributed locks, and capital ledger persistence - [ ] Add reconciliation and safety docs - [ ] Define admin audit event schema @@ -466,6 +473,7 @@ Validate that the new monorepo is safer and more coherent than the legacy setup - [ ] Mobile overview/alerts/positions/history - [ ] DRY cleanup - [x] Verification and cutover docs +- [-] Backend Cosmos-first repository migration for safety-critical persistence ### Recommended Rollout Order @@ -513,8 +521,8 @@ Validate that the new monorepo is safer and more coherent than the legacy setup ### Recommended Cutover Approach -- [ ] Build target contracts in the new repo -- [ ] Validate backend behavior in isolation +- [x] Build target contracts in the new repo +- [x] Validate backend behavior in isolation - [ ] Migrate internal web usage - [ ] Release mobile in controlled beta - [ ] Switch operational ownership only after monitoring and support confidence is established @@ -585,7 +593,7 @@ Reason: ## 16. Immediate Next Steps -- [ ] Approve PRD and roadmap direction -- [ ] Scaffold the root monorepo structure -- [ ] Add `shared/product.json`, root `package.json`, `pnpm-workspace.yaml`, and `.env.example` -- [ ] Define backend contract skeleton before porting large amounts of legacy code +- [ ] Finish profile risk/PnL aggregate repository migration off legacy Supabase +- [ ] Finish remaining web direct legacy data-table reads and writes behind backend APIs +- [ ] Replace remaining transitional web auth compatibility surfaces with fully common-platform-native session handling +- [ ] Add release smoke coverage for web auth/kill-switch and mobile auth/kill-switch flows