refactor: move capital ledger to cosmos-first repository
This commit is contained in:
parent
5c4c001f35
commit
560c95a599
@ -2,6 +2,7 @@ import logger from '../utils/logger.js';
|
||||
import { config } from '../config/index.js';
|
||||
import { supabaseService } from './SupabaseService.js';
|
||||
import { getTradeProfileCapital } from './profileRepository.js';
|
||||
import { getCapitalLedger, upsertCapitalLedger } from './capitalLedgerRepository.js';
|
||||
|
||||
export interface CapitalLedgerRecord {
|
||||
profile_id: string;
|
||||
@ -34,34 +35,20 @@ export class CapitalLedger {
|
||||
}
|
||||
|
||||
private async ensureLedger(profileId: string, allocatedCapital?: number): Promise<CapitalLedgerRecord | null> {
|
||||
const client = supabaseService.getClient();
|
||||
try {
|
||||
if (!client) {
|
||||
logger.error(`[CapitalLedger] ensureLedger aborted for ${profileId}: Supabase client unavailable (fail-closed).`);
|
||||
return null;
|
||||
}
|
||||
const profileCapital = await getTradeProfileCapital(profileId, supabaseService);
|
||||
const allocation = toNumeric(allocatedCapital ?? profileCapital?.allocatedCapital ?? config.TOTAL_CAPITAL);
|
||||
const existing = await getCapitalLedger(profileId, supabaseService);
|
||||
const nextRecord: CapitalLedgerRecord = {
|
||||
profile_id: profileId,
|
||||
allocated_capital: allocation,
|
||||
reserved_for_orders: toNumeric(existing?.reserved_for_orders),
|
||||
reserved_for_positions: toNumeric(existing?.reserved_for_positions),
|
||||
realized_pnl: toNumeric(existing?.realized_pnl),
|
||||
updated_at: new Date().toISOString()
|
||||
};
|
||||
|
||||
const { data, error } = await client
|
||||
.from('capital_ledgers')
|
||||
.upsert({
|
||||
profile_id: profileId,
|
||||
allocated_capital: allocation
|
||||
}, { onConflict: 'profile_id' })
|
||||
.select('*')
|
||||
.maybeSingle();
|
||||
|
||||
if (error) {
|
||||
if (this.isRpcNetworkFailure(error)) {
|
||||
logger.error(`[CapitalLedger] ensureLedger RPC network failure for ${profileId}, aborting ledger mutation (fail-closed): ${error.message}`);
|
||||
return null;
|
||||
}
|
||||
logger.error(`[CapitalLedger] ensureLedger failed: ${error.message}`);
|
||||
return null;
|
||||
}
|
||||
|
||||
return data as CapitalLedgerRecord;
|
||||
return await upsertCapitalLedger(nextRecord, supabaseService);
|
||||
} catch (err: any) {
|
||||
if (this.isRpcNetworkFailure(err)) {
|
||||
logger.error(`[CapitalLedger] ensureLedger network failure for ${profileId}, aborting ledger mutation (fail-closed): ${err.message}`);
|
||||
@ -72,69 +59,34 @@ export class CapitalLedger {
|
||||
}
|
||||
}
|
||||
|
||||
private async rpc<T>(fn: string, args: Record<string, unknown>): Promise<T | null> {
|
||||
const client = supabaseService.getClient();
|
||||
if (!client) {
|
||||
logger.error(`[CapitalLedger] ${fn} aborted: Supabase client unavailable (fail-closed).`);
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
const { data, error } = await client.rpc(fn, args);
|
||||
if (error) {
|
||||
if (this.isRpcNetworkFailure(error)) {
|
||||
logger.error(`[CapitalLedger] ${fn} RPC network failure, rejecting mutation (fail-closed): ${error.message}`);
|
||||
return null;
|
||||
}
|
||||
logger.error(`[CapitalLedger] ${fn} failed: ${error.message}`);
|
||||
return null;
|
||||
}
|
||||
return data as T;
|
||||
} catch (err: any) {
|
||||
if (this.isRpcNetworkFailure(err)) {
|
||||
logger.error(`[CapitalLedger] ${fn} network failure, rejecting mutation (fail-closed): ${err.message}`);
|
||||
return null;
|
||||
}
|
||||
logger.error(`[CapitalLedger] ${fn} unexpected error: ${err.message}`);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private async mutate(profileId: string, fn: () => Promise<CapitalLedgerRecord | null>) {
|
||||
private async mutate(
|
||||
profileId: string,
|
||||
fn: (current: CapitalLedgerRecord) => Promise<CapitalLedgerRecord | null>
|
||||
) {
|
||||
return this.withLock(profileId, async () => {
|
||||
const ledger = await this.ensureLedger(profileId);
|
||||
if (!ledger) return null;
|
||||
return fn();
|
||||
return fn(ledger);
|
||||
});
|
||||
}
|
||||
|
||||
public async getLedger(profileId: string): Promise<CapitalLedgerRecord | null> {
|
||||
const client = supabaseService.getClient();
|
||||
if (!client) return null;
|
||||
const { data, error } = await client
|
||||
.from('capital_ledgers')
|
||||
.select('*')
|
||||
.eq('profile_id', profileId)
|
||||
.maybeSingle();
|
||||
|
||||
if (error) {
|
||||
if (this.isRpcNetworkFailure(error)) {
|
||||
logger.error(`[CapitalLedger] getLedger RPC network failure for ${profileId}, returning null (fail-closed): ${error.message}`);
|
||||
return null;
|
||||
}
|
||||
logger.error(`[CapitalLedger] getLedger failed: ${error.message}`);
|
||||
return null;
|
||||
}
|
||||
|
||||
return data as CapitalLedgerRecord;
|
||||
return getCapitalLedger(profileId, supabaseService);
|
||||
}
|
||||
|
||||
public async reserveForOrder(profileId: string, amount: number): Promise<boolean> {
|
||||
if (!profileId || amount <= 0) return false;
|
||||
const result = await this.mutate(profileId, async () => {
|
||||
return this.rpc<CapitalLedgerRecord>('fn_reserve_for_order', {
|
||||
p_profile: profileId,
|
||||
p_amount: amount
|
||||
});
|
||||
const result = await this.mutate(profileId, async (ledger) => {
|
||||
const available = this.availableCapital(ledger);
|
||||
if (available + 1e-8 < amount) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return upsertCapitalLedger({
|
||||
...ledger,
|
||||
reserved_for_orders: toNumeric(ledger.reserved_for_orders) + amount,
|
||||
updated_at: new Date().toISOString()
|
||||
}, supabaseService);
|
||||
});
|
||||
if (result) return true;
|
||||
|
||||
@ -157,43 +109,48 @@ export class CapitalLedger {
|
||||
|
||||
public async releaseOrderReservation(profileId: string, amount: number): Promise<void> {
|
||||
if (!profileId || amount <= 0) return;
|
||||
await this.mutate(profileId, async () => {
|
||||
return this.rpc<CapitalLedgerRecord>('fn_release_order_reservation', {
|
||||
p_profile: profileId,
|
||||
p_amount: amount
|
||||
});
|
||||
await this.mutate(profileId, async (ledger) => {
|
||||
return upsertCapitalLedger({
|
||||
...ledger,
|
||||
reserved_for_orders: Math.max(0, toNumeric(ledger.reserved_for_orders) - amount),
|
||||
updated_at: new Date().toISOString()
|
||||
}, supabaseService);
|
||||
});
|
||||
}
|
||||
|
||||
public async adjustPositionReservation(profileId: string, delta: number): Promise<void> {
|
||||
if (!profileId || delta === 0) return;
|
||||
await this.mutate(profileId, async () => {
|
||||
return this.rpc<CapitalLedgerRecord>('fn_adjust_position_reservation', {
|
||||
p_profile: profileId,
|
||||
p_delta: delta
|
||||
});
|
||||
await this.mutate(profileId, async (ledger) => {
|
||||
return upsertCapitalLedger({
|
||||
...ledger,
|
||||
reserved_for_positions: Math.max(0, toNumeric(ledger.reserved_for_positions) + delta),
|
||||
updated_at: new Date().toISOString()
|
||||
}, supabaseService);
|
||||
});
|
||||
}
|
||||
|
||||
public async recordRealizedPnl(profileId: string, delta: number): Promise<void> {
|
||||
if (!profileId || delta === 0) return;
|
||||
await this.mutate(profileId, async () => {
|
||||
return this.rpc<CapitalLedgerRecord>('fn_record_realized_pnl', {
|
||||
p_profile: profileId,
|
||||
p_delta: delta
|
||||
});
|
||||
await this.mutate(profileId, async (ledger) => {
|
||||
return upsertCapitalLedger({
|
||||
...ledger,
|
||||
realized_pnl: toNumeric(ledger.realized_pnl) + delta,
|
||||
updated_at: new Date().toISOString()
|
||||
}, supabaseService);
|
||||
});
|
||||
}
|
||||
|
||||
public async rebuildLedger(profileId: string, reservedOrders: number, reservedPositions: number): Promise<void> {
|
||||
if (!profileId) return;
|
||||
await this.withLock(profileId, async () => {
|
||||
await this.ensureLedger(profileId);
|
||||
await this.rpc<CapitalLedgerRecord>('fn_rebuild_ledger', {
|
||||
p_profile: profileId,
|
||||
p_reserved_orders: reservedOrders,
|
||||
p_reserved_positions: reservedPositions
|
||||
});
|
||||
const ledger = await this.ensureLedger(profileId);
|
||||
if (!ledger) return null;
|
||||
return upsertCapitalLedger({
|
||||
...ledger,
|
||||
reserved_for_orders: Math.max(0, toNumeric(reservedOrders)),
|
||||
reserved_for_positions: Math.max(0, toNumeric(reservedPositions)),
|
||||
updated_at: new Date().toISOString()
|
||||
}, supabaseService);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
137
backend/src/services/capitalLedgerRepository.ts
Normal file
137
backend/src/services/capitalLedgerRepository.ts
Normal file
@ -0,0 +1,137 @@
|
||||
import { getContainer } from '@bytelyst/cosmos';
|
||||
import { config } from '../config/index.js';
|
||||
import logger from '../utils/logger.js';
|
||||
import type { supabaseService } from './SupabaseService.js';
|
||||
import type { CapitalLedgerRecord } from './CapitalLedger.js';
|
||||
|
||||
type LegacySupabaseService = typeof supabaseService;
|
||||
|
||||
const CONTAINER_NAME = 'capital_ledgers';
|
||||
|
||||
interface CapitalLedgerDocument {
|
||||
id: string;
|
||||
productId: string;
|
||||
profile_id: string;
|
||||
allocated_capital: number;
|
||||
reserved_for_orders: number;
|
||||
reserved_for_positions: number;
|
||||
realized_pnl: number;
|
||||
updated_at: string;
|
||||
}
|
||||
|
||||
const toNumeric = (value: unknown): number => {
|
||||
const numeric = Number(value);
|
||||
return Number.isFinite(numeric) ? numeric : 0;
|
||||
};
|
||||
|
||||
function isCosmosConfigured(): boolean {
|
||||
return Boolean(config.COSMOS_ENDPOINT && config.COSMOS_KEY);
|
||||
}
|
||||
|
||||
function toLedgerRecord(doc: Partial<CapitalLedgerDocument> | null | undefined): CapitalLedgerRecord | null {
|
||||
const profileId = String(doc?.profile_id || '').trim();
|
||||
if (!profileId) return null;
|
||||
|
||||
return {
|
||||
profile_id: profileId,
|
||||
allocated_capital: toNumeric(doc?.allocated_capital),
|
||||
reserved_for_orders: toNumeric(doc?.reserved_for_orders),
|
||||
reserved_for_positions: toNumeric(doc?.reserved_for_positions),
|
||||
realized_pnl: toNumeric(doc?.realized_pnl),
|
||||
updated_at: String(doc?.updated_at || new Date().toISOString())
|
||||
};
|
||||
}
|
||||
|
||||
function toLedgerDocument(record: CapitalLedgerRecord): CapitalLedgerDocument {
|
||||
return {
|
||||
id: record.profile_id,
|
||||
productId: config.PRODUCT_ID,
|
||||
...record
|
||||
};
|
||||
}
|
||||
|
||||
async function readFromCosmos(profileId: string): Promise<CapitalLedgerRecord | null> {
|
||||
const container = getContainer(CONTAINER_NAME);
|
||||
try {
|
||||
const { resource } = await container.item(profileId, config.PRODUCT_ID).read<CapitalLedgerDocument>();
|
||||
return toLedgerRecord(resource);
|
||||
} catch (error) {
|
||||
const code = (error as { code?: number })?.code;
|
||||
if (code === 404) return null;
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async function writeToCosmos(record: CapitalLedgerRecord): Promise<CapitalLedgerRecord | null> {
|
||||
const container = getContainer(CONTAINER_NAME);
|
||||
const { resource } = await container.items.upsert(toLedgerDocument(record));
|
||||
return toLedgerRecord(resource as unknown as CapitalLedgerDocument);
|
||||
}
|
||||
|
||||
export async function getCapitalLedger(profileId: string, legacyService?: LegacySupabaseService): Promise<CapitalLedgerRecord | null> {
|
||||
if (!profileId) return null;
|
||||
|
||||
if (isCosmosConfigured()) {
|
||||
try {
|
||||
const cosmosRecord = await readFromCosmos(profileId);
|
||||
if (cosmosRecord) return cosmosRecord;
|
||||
} catch (error) {
|
||||
logger.warn(`[CapitalLedgerRepo] Cosmos read failed, falling back to legacy store: ${error instanceof Error ? error.message : 'unknown error'}`);
|
||||
}
|
||||
}
|
||||
|
||||
const client = legacyService?.getClient?.();
|
||||
if (!client) return null;
|
||||
|
||||
const { data, error } = await client
|
||||
.from('capital_ledgers')
|
||||
.select('*')
|
||||
.eq('profile_id', profileId)
|
||||
.maybeSingle();
|
||||
|
||||
if (error) {
|
||||
logger.error(`[CapitalLedgerRepo] Legacy read failed for ${profileId}: ${error.message}`);
|
||||
return null;
|
||||
}
|
||||
|
||||
return toLedgerRecord(data as CapitalLedgerDocument);
|
||||
}
|
||||
|
||||
export async function upsertCapitalLedger(
|
||||
record: CapitalLedgerRecord,
|
||||
legacyService?: LegacySupabaseService
|
||||
): Promise<CapitalLedgerRecord | null> {
|
||||
if (!record.profile_id) return null;
|
||||
|
||||
if (isCosmosConfigured()) {
|
||||
try {
|
||||
const saved = await writeToCosmos(record);
|
||||
if (saved) return saved;
|
||||
} catch (error) {
|
||||
logger.warn(`[CapitalLedgerRepo] Cosmos upsert failed, falling back to legacy store: ${error instanceof Error ? error.message : 'unknown error'}`);
|
||||
}
|
||||
}
|
||||
|
||||
const client = legacyService?.getClient?.();
|
||||
if (!client) return null;
|
||||
|
||||
const { data, error } = await client
|
||||
.from('capital_ledgers')
|
||||
.upsert({
|
||||
profile_id: record.profile_id,
|
||||
allocated_capital: record.allocated_capital,
|
||||
reserved_for_orders: record.reserved_for_orders,
|
||||
reserved_for_positions: record.reserved_for_positions,
|
||||
realized_pnl: record.realized_pnl,
|
||||
updated_at: record.updated_at
|
||||
}, { onConflict: 'profile_id' })
|
||||
.select('*')
|
||||
.maybeSingle();
|
||||
|
||||
if (error) {
|
||||
logger.error(`[CapitalLedgerRepo] Legacy upsert failed for ${record.profile_id}: ${error.message}`);
|
||||
return null;
|
||||
}
|
||||
|
||||
return toLedgerRecord(data as CapitalLedgerDocument);
|
||||
}
|
||||
@ -32,10 +32,13 @@ It assumes:
|
||||
- [x] Mobile migrated into `mobile/` with product identity, shared runtime bootstrap, launch-time kill-switch gate, platform-service auth, live backend polling plus websocket-backed updates, startup/error telemetry capture, secure session storage with invalidation handling, and explicit degraded/offline status surfacing
|
||||
- [x] Backend now accepts common-platform JWTs with legacy Supabase fallback and persists global trading-control state through Cosmos-backed control storage
|
||||
- [x] Dynamic config now flows through backend control-plane APIs with Cosmos-first storage and legacy Supabase fallback
|
||||
- [x] Backend snapshots now use a Cosmos-first repository with legacy fallback
|
||||
- [x] Distributed entry and reconciliation locks now use a Cosmos-first repository with legacy fallback
|
||||
- [x] Capital ledger persistence now uses a Cosmos-first repository with legacy fallback
|
||||
- [x] Mobile platform auth requests now use the common React Native platform SDK
|
||||
- [x] Root verification and lint flows now run successfully without sandbox-hostile script harness behavior
|
||||
- [-] DRY cleanup completed for runtime/config/bootstrap concerns, shared websocket auth helpers, and platform-session handling, but not yet for all data-plane persistence flows
|
||||
- [!] Full common-platform data-plane replacement remains a follow-up; backend and web still retain legacy Supabase data access for trading records and configuration tables
|
||||
- [!] Full common-platform data-plane replacement remains a follow-up; backend and web still retain legacy Supabase access for profile risk aggregates, trading records, and some configuration/history tables
|
||||
|
||||
## 3. Guiding Rules
|
||||
|
||||
@ -218,6 +221,9 @@ Make backend the stable authority before web and mobile migrate heavily onto it.
|
||||
- [x] Add runtime control endpoints
|
||||
- [x] Add platform-JWT verification with legacy fallback
|
||||
- [x] Add Cosmos-backed global trading-control persistence
|
||||
- [x] Move snapshots to Cosmos-first repository flow with legacy fallback
|
||||
- [x] Move distributed runtime locks to Cosmos-first repository flow with legacy fallback
|
||||
- [x] Move capital ledger persistence to Cosmos-first repository flow with legacy fallback
|
||||
- [-] Standardize admin controls and audit logging
|
||||
- [ ] Define admin audit event schema
|
||||
- [ ] Define durable state ownership between memory, database, and exchange sync
|
||||
@ -430,6 +436,7 @@ Validate that the new monorepo is safer and more coherent than the legacy setup
|
||||
- [ ] Add websocket auth model and namespaces
|
||||
- [ ] Add runtime control endpoints
|
||||
- [ ] Add telemetry and health integration
|
||||
- [x] Add Cosmos-first repository layer for snapshots, distributed locks, and capital ledger persistence
|
||||
- [ ] Add reconciliation and safety docs
|
||||
- [ ] Define admin audit event schema
|
||||
|
||||
@ -466,6 +473,7 @@ Validate that the new monorepo is safer and more coherent than the legacy setup
|
||||
- [ ] Mobile overview/alerts/positions/history
|
||||
- [ ] DRY cleanup
|
||||
- [x] Verification and cutover docs
|
||||
- [-] Backend Cosmos-first repository migration for safety-critical persistence
|
||||
|
||||
### Recommended Rollout Order
|
||||
|
||||
@ -513,8 +521,8 @@ Validate that the new monorepo is safer and more coherent than the legacy setup
|
||||
|
||||
### Recommended Cutover Approach
|
||||
|
||||
- [ ] Build target contracts in the new repo
|
||||
- [ ] Validate backend behavior in isolation
|
||||
- [x] Build target contracts in the new repo
|
||||
- [x] Validate backend behavior in isolation
|
||||
- [ ] Migrate internal web usage
|
||||
- [ ] Release mobile in controlled beta
|
||||
- [ ] Switch operational ownership only after monitoring and support confidence is established
|
||||
@ -585,7 +593,7 @@ Reason:
|
||||
|
||||
## 16. Immediate Next Steps
|
||||
|
||||
- [ ] Approve PRD and roadmap direction
|
||||
- [ ] Scaffold the root monorepo structure
|
||||
- [ ] Add `shared/product.json`, root `package.json`, `pnpm-workspace.yaml`, and `.env.example`
|
||||
- [ ] Define backend contract skeleton before porting large amounts of legacy code
|
||||
- [ ] Finish profile risk/PnL aggregate repository migration off legacy Supabase
|
||||
- [ ] Finish remaining web direct legacy data-table reads and writes behind backend APIs
|
||||
- [ ] Replace remaining transitional web auth compatibility surfaces with fully common-platform-native session handling
|
||||
- [ ] Add release smoke coverage for web auth/kill-switch and mobile auth/kill-switch flows
|
||||
|
||||
Loading…
Reference in New Issue
Block a user