learning_ai_invt_trdg/backend/reconcileCapitalLedgerState.ts

637 lines
22 KiB
TypeScript

import 'dotenv/config';
import { createClient } from '@supabase/supabase-js';
type CliOptions = {
apply: boolean;
includeInactive: boolean;
includeQuarantinedHistory: boolean;
profileIds: string[];
};
type ProfileRow = {
id: string;
name?: string | null;
is_active?: boolean | null;
allocated_capital?: number | string | null;
};
type LedgerRow = {
profile_id: string;
allocated_capital?: number | string | null;
reserved_for_orders?: number | string | null;
reserved_for_positions?: number | string | null;
realized_pnl?: number | string | null;
updated_at?: string | null;
};
type TradeHistoryRow = {
pnl?: number | string | null;
reason?: string | null;
};
type OrderRow = {
symbol?: string | null;
trade_id?: string | null;
action?: string | null;
side?: string | null;
qty?: number | string | null;
quantity?: number | string | null;
price?: number | string | null;
status?: string | null;
timestamp?: number | string | null;
created_at?: string | null;
};
type ProfileRepairReport = {
profileId: string;
profileName: string;
isActive: boolean;
allocatedCapital: number;
ordersAnalyzed: number;
historyRowsAnalyzed: number;
openOrderRowsWithoutPrice: number;
current: {
allocatedCapital: number;
reservedForOrders: number;
reservedForPositions: number;
realizedPnl: number;
};
target: {
allocatedCapital: number;
reservedForOrders: number;
reservedForPositions: number;
realizedPnl: number;
};
delta: {
allocatedCapital: number;
reservedForOrders: number;
reservedForPositions: number;
realizedPnl: number;
};
rawUtilizationBeforePct: number | null;
rawUtilizationAfterPct: number | null;
overAllocatedBefore: boolean;
overAllocatedAfter: boolean;
changed: boolean;
};
const PAGE_SIZE = 1000;
const EPSILON = 1e-8;
const OPEN_ORDER_STATUSES = new Set([
'pending_new',
'accepted',
'pending',
'new',
'partially_filled',
'partially-filled',
'partiallyfilled',
'partial_fill'
]);
const FILLED_STATUSES = new Set([
'filled',
'partially_filled',
'partially-filled',
'partiallyfilled',
'partial_fill'
]);
const parseArgs = (argv: string[]): CliOptions => {
const profileIds = new Set<string>();
let apply = false;
let includeInactive = false;
let includeQuarantinedHistory = false;
for (const raw of argv) {
const arg = String(raw || '').trim();
if (!arg) continue;
if (arg === '--apply') {
apply = true;
continue;
}
if (arg === '--include-inactive') {
includeInactive = true;
continue;
}
if (arg === '--include-quarantined-history') {
includeQuarantinedHistory = true;
continue;
}
if (arg.startsWith('--profile=')) {
const value = String(arg.slice('--profile='.length) || '').trim();
if (value) profileIds.add(value);
}
}
return {
apply,
includeInactive,
includeQuarantinedHistory,
profileIds: Array.from(profileIds)
};
};
const toNumber = (value: unknown): number => {
const parsed = Number(value);
return Number.isFinite(parsed) ? parsed : 0;
};
const round8 = (value: number): number => Number(value.toFixed(8));
const formatErrorPayload = (error: any): string => {
if (!error) return 'unknown_error';
if (error instanceof Error) {
return JSON.stringify({
message: error.message,
stack: error.stack || null
});
}
if (typeof error === 'object') {
return JSON.stringify({
message: String(error.message || ''),
code: String(error.code || ''),
details: String(error.details || ''),
hint: String(error.hint || ''),
raw: error
});
}
return JSON.stringify({ message: String(error) });
};
const failIfError = (error: any, context: string): void => {
if (!error) return;
throw new Error(`${context}: ${formatErrorPayload(error)}`);
};
const normalizeStatus = (status: unknown): string => {
const normalized = String(status || '').trim().toLowerCase();
if (normalized === 'partially-filled') return 'partially_filled';
if (normalized === 'partiallyfilled') return 'partially_filled';
if (normalized === 'partial_fill') return 'partially_filled';
return normalized;
};
const normalizeSide = (side: unknown): 'BUY' | 'SELL' => {
const normalized = String(side || '').trim().toUpperCase();
return normalized === 'SELL' || normalized === 'SHORT' ? 'SELL' : 'BUY';
};
const normalizeAction = (action: unknown): 'ENTRY' | 'EXIT' | undefined => {
const normalized = String(action || '').trim().toUpperCase();
if (normalized === 'ENTRY' || normalized === 'EXIT') return normalized;
return undefined;
};
const inferAction = (row: OrderRow, knownEntrySide?: 'BUY' | 'SELL'): 'ENTRY' | 'EXIT' => {
const explicit = normalizeAction(row.action);
if (explicit) return explicit;
const side = normalizeSide(row.side);
if (knownEntrySide) {
return side === knownEntrySide ? 'ENTRY' : 'EXIT';
}
return side === 'BUY' ? 'ENTRY' : 'EXIT';
};
const normalizeExecutionScopeSymbol = (symbol: unknown, provider: string): string => {
const raw = String(symbol || '')
.trim()
.toUpperCase()
.replace(/[\/_\-\s]/g, '');
if (!raw) return '';
if (provider === 'alpaca' && raw.endsWith('USDT')) {
return `${raw.slice(0, -4)}USD`;
}
return raw;
};
const toTimestamp = (row: OrderRow, fallback: number): number => {
const ts = Number(row.timestamp);
if (Number.isFinite(ts) && ts > 0) {
return ts > 1_000_000_000_000 ? ts : ts * 1000;
}
const created = Date.parse(String(row.created_at || ''));
if (Number.isFinite(created) && created > 0) return created;
return fallback;
};
const isQuarantinedHistoryReason = (reason: unknown): boolean => {
const normalized = String(reason || '').trim().toUpperCase();
return normalized.startsWith('[INVALID_')
|| normalized.startsWith('[DUPLICATE_')
|| normalized.startsWith('[RECONCILED_TO_');
};
const fetchPagedByProfile = async <T>(
supabase: any,
table: 'orders' | 'trade_history',
columns: string,
profileId: string
): Promise<T[]> => {
const rows: T[] = [];
let offset = 0;
for (;;) {
const { data, error } = await supabase
.from(table)
.select(columns)
.eq('profile_id', profileId)
.order('created_at', { ascending: true })
.range(offset, offset + PAGE_SIZE - 1);
failIfError(error, `fetchPagedByProfile:${table}:${profileId}`);
const chunk = (data || []) as T[];
if (!chunk.length) break;
rows.push(...chunk);
if (chunk.length < PAGE_SIZE) break;
offset += PAGE_SIZE;
}
return rows;
};
const fetchOrdersForProfile = async (
supabase: any,
profileId: string
): Promise<OrderRow[]> => {
const withQuantity = 'symbol,trade_id,action,side,qty,quantity,price,status,timestamp,created_at';
const withoutQuantity = 'symbol,trade_id,action,side,qty,price,status,timestamp,created_at';
try {
return await fetchPagedByProfile<OrderRow>(supabase, 'orders', withQuantity, profileId);
} catch (error: any) {
const message = String(error?.message || '').toLowerCase();
if (message.includes('column') && message.includes('quantity')) {
return await fetchPagedByProfile<OrderRow>(supabase, 'orders', withoutQuantity, profileId);
}
throw error;
}
};
const computeReservedForOpenEntryOrders = (orders: OrderRow[]): {
reserved: number;
rowsWithoutPrice: number;
} => {
let reserved = 0;
let rowsWithoutPrice = 0;
for (const row of orders) {
const status = normalizeStatus(row.status);
if (!OPEN_ORDER_STATUSES.has(status)) continue;
const action = inferAction(row);
if (action !== 'ENTRY') continue;
const qty = toNumber(row.qty);
const quantity = qty > 0 ? qty : toNumber(row.quantity);
if (!(quantity > 0)) continue;
const price = toNumber(row.price);
if (!(price > 0)) {
rowsWithoutPrice += 1;
continue;
}
reserved += quantity * price;
}
return {
reserved: round8(reserved),
rowsWithoutPrice
};
};
const computeOpenPositionNotional = (orders: OrderRow[], provider: string): number => {
type TradeLedger = {
side: 'BUY' | 'SELL';
entryQty: number;
entryNotional: number;
entryLastPrice: number;
exitQty: number;
};
const byExecutionSymbol = new Map<string, Array<{ row: OrderRow; ts: number; idx: number }>>();
let rowIndex = 0;
for (const row of orders) {
const status = normalizeStatus(row.status);
if (!FILLED_STATUSES.has(status)) continue;
const qty = toNumber(row.qty);
const quantity = qty > 0 ? qty : toNumber(row.quantity);
if (!(quantity > 0)) continue;
const symbolKey = normalizeExecutionScopeSymbol(row.symbol, provider);
if (!symbolKey) continue;
const bucket = byExecutionSymbol.get(symbolKey) || [];
bucket.push({
row,
ts: toTimestamp(row, rowIndex),
idx: rowIndex
});
byExecutionSymbol.set(symbolKey, bucket);
rowIndex += 1;
}
let reservedNotional = 0;
for (const [symbolKey, bucket] of Array.from(byExecutionSymbol.entries())) {
const ordered = [...bucket].sort((a, b) => (a.ts - b.ts) || (a.idx - b.idx));
const ledgerByTrade = new Map<string, TradeLedger>();
const entrySideByTrade = new Map<string, 'BUY' | 'SELL'>();
const openQueueBySide: Record<'BUY' | 'SELL', string[]> = { BUY: [], SELL: [] };
let syntheticCounter = 0;
const buildSyntheticTradeId = (side: 'BUY' | 'SELL', ts: number): string => {
syntheticCounter += 1;
const tsToken = Number.isFinite(ts) && ts > 0 ? Math.trunc(ts) : syntheticCounter;
return `__legacy__-${symbolKey}-${side}-${tsToken}-${String(syntheticCounter).padStart(4, '0')}`;
};
for (const wrapped of ordered) {
const row = wrapped.row;
const qtyRaw = toNumber(row.qty);
const qty = qtyRaw > 0 ? qtyRaw : toNumber(row.quantity);
if (!(qty > 0)) continue;
const rowSide = normalizeSide(row.side);
const oppositeSide: 'BUY' | 'SELL' = rowSide === 'BUY' ? 'SELL' : 'BUY';
const explicitAction = normalizeAction(row.action);
let action = explicitAction;
let tradeId = String(row.trade_id || '').trim();
if (!action && !tradeId) {
action = openQueueBySide[oppositeSide].length > 0 ? 'EXIT' : 'ENTRY';
}
if (!tradeId) {
if (action === 'EXIT' && openQueueBySide[oppositeSide].length > 0) {
tradeId = openQueueBySide[oppositeSide][0];
} else {
tradeId = buildSyntheticTradeId(action === 'EXIT' ? oppositeSide : rowSide, wrapped.ts);
}
}
if (!action) {
action = inferAction(row, entrySideByTrade.get(tradeId));
}
let tradeLedger = ledgerByTrade.get(tradeId);
if (!tradeLedger) {
tradeLedger = {
side: rowSide,
entryQty: 0,
entryNotional: 0,
entryLastPrice: 0,
exitQty: 0
};
ledgerByTrade.set(tradeId, tradeLedger);
}
if (action === 'ENTRY') {
if (tradeLedger.entryQty <= EPSILON) {
tradeLedger.side = rowSide;
}
tradeLedger.entryQty += qty;
entrySideByTrade.set(tradeId, tradeLedger.side);
if (!openQueueBySide[tradeLedger.side].includes(tradeId)) {
openQueueBySide[tradeLedger.side].push(tradeId);
}
const price = toNumber(row.price);
if (price > 0) {
tradeLedger.entryNotional += price * qty;
tradeLedger.entryLastPrice = price;
}
} else {
tradeLedger.exitQty += qty;
const queue = openQueueBySide[oppositeSide];
const idx = queue.findIndex((queuedTradeId) => queuedTradeId === tradeId);
if (idx >= 0) {
queue.splice(idx, 1);
} else if (queue.length > 0) {
queue.shift();
}
}
}
for (const tradeLedger of Array.from(ledgerByTrade.values())) {
const remainingQty = tradeLedger.entryQty - tradeLedger.exitQty;
if (!(remainingQty > EPSILON)) continue;
const weightedEntryPrice = tradeLedger.entryQty > EPSILON && tradeLedger.entryNotional > EPSILON
? tradeLedger.entryNotional / tradeLedger.entryQty
: tradeLedger.entryLastPrice;
if (!(weightedEntryPrice > 0)) continue;
reservedNotional += remainingQty * weightedEntryPrice;
}
}
return round8(reservedNotional);
};
const run = async (): Promise<void> => {
const options = parseArgs(process.argv.slice(2));
const supabaseUrl = String(process.env.SUPABASE_URL || '').trim();
const supabaseKey = String(
process.env.SUPABASE_KEY
|| process.env.SUPABASE_SERVICE_ROLE_KEY
|| process.env.SUPABASE_ANON_KEY
|| ''
).trim();
if (!supabaseUrl || !supabaseKey) {
throw new Error('Missing Supabase credentials. Expected SUPABASE_URL + SUPABASE_KEY/SUPABASE_SERVICE_ROLE_KEY.');
}
const provider = String(process.env.EXECUTION_PROVIDER || process.env.PROVIDER || 'alpaca').trim().toLowerCase() || 'alpaca';
const supabase = createClient(supabaseUrl, supabaseKey);
let profileQuery = supabase
.from('trade_profiles')
.select('id,name,is_active,allocated_capital')
.order('created_at', { ascending: true });
if (!options.includeInactive) {
profileQuery = profileQuery.eq('is_active', true);
}
if (options.profileIds.length > 0) {
profileQuery = profileQuery.in('id', options.profileIds);
}
const { data: profileData, error: profileError } = await profileQuery;
failIfError(profileError, 'load_profiles');
const profiles = (profileData || []) as ProfileRow[];
const profileIds = profiles.map((profile) => String(profile.id || '').trim()).filter(Boolean);
if (profileIds.length === 0) {
console.log(JSON.stringify({
mode: options.apply ? 'apply' : 'dry-run',
provider,
profilesProcessed: 0,
message: 'No matching profiles found.'
}, null, 2));
return;
}
const { data: ledgerData, error: ledgerError } = await supabase
.from('capital_ledgers')
.select('profile_id,allocated_capital,reserved_for_orders,reserved_for_positions,realized_pnl,updated_at')
.in('profile_id', profileIds);
failIfError(ledgerError, 'load_capital_ledgers');
const ledgerByProfile = new Map<string, LedgerRow>();
for (const row of (ledgerData || []) as LedgerRow[]) {
const profileId = String(row.profile_id || '').trim();
if (!profileId) continue;
ledgerByProfile.set(profileId, row);
}
const reports: ProfileRepairReport[] = [];
for (const profile of profiles) {
const profileId = String(profile.id || '').trim();
if (!profileId) continue;
const [historyRows, orders] = await Promise.all([
fetchPagedByProfile<TradeHistoryRow>(supabase, 'trade_history', 'pnl,reason', profileId),
fetchOrdersForProfile(supabase, profileId)
]);
const realizedFromHistory = historyRows.reduce((sum, row) => {
if (!options.includeQuarantinedHistory && isQuarantinedHistoryReason(row.reason)) {
return sum;
}
return sum + toNumber(row.pnl);
}, 0);
const openOrders = computeReservedForOpenEntryOrders(orders);
const reservedPositions = computeOpenPositionNotional(orders, provider);
const allocatedCapital = toNumber(profile.allocated_capital);
const currentLedger = ledgerByProfile.get(profileId);
const currentAllocated = toNumber(currentLedger?.allocated_capital);
const currentReservedOrders = toNumber(currentLedger?.reserved_for_orders);
const currentReservedPositions = toNumber(currentLedger?.reserved_for_positions);
const currentRealized = toNumber(currentLedger?.realized_pnl);
const targetAllocated = round8(allocatedCapital);
const targetReservedOrders = round8(openOrders.reserved);
const targetReservedPositions = round8(reservedPositions);
const targetRealized = round8(realizedFromHistory);
const deltaAllocated = round8(targetAllocated - currentAllocated);
const deltaReservedOrders = round8(targetReservedOrders - currentReservedOrders);
const deltaReservedPositions = round8(targetReservedPositions - currentReservedPositions);
const deltaRealized = round8(targetRealized - currentRealized);
const changed = !currentLedger
|| Math.abs(deltaAllocated) > 0.01
|| Math.abs(deltaReservedOrders) > 0.01
|| Math.abs(deltaReservedPositions) > 0.01
|| Math.abs(deltaRealized) > 0.01;
const utilizationBefore = currentAllocated > 0
? ((currentReservedOrders + currentReservedPositions) / currentAllocated) * 100
: null;
const utilizationAfter = targetAllocated > 0
? ((targetReservedOrders + targetReservedPositions) / targetAllocated) * 100
: null;
const overAllocatedBefore = currentAllocated > 0
? currentReservedOrders + currentReservedPositions > currentAllocated + EPSILON
: false;
const overAllocatedAfter = targetAllocated > 0
? targetReservedOrders + targetReservedPositions > targetAllocated + EPSILON
: false;
reports.push({
profileId,
profileName: String(profile.name || ''),
isActive: Boolean(profile.is_active),
allocatedCapital: targetAllocated,
ordersAnalyzed: orders.length,
historyRowsAnalyzed: historyRows.length,
openOrderRowsWithoutPrice: openOrders.rowsWithoutPrice,
current: {
allocatedCapital: round8(currentAllocated),
reservedForOrders: round8(currentReservedOrders),
reservedForPositions: round8(currentReservedPositions),
realizedPnl: round8(currentRealized)
},
target: {
allocatedCapital: targetAllocated,
reservedForOrders: targetReservedOrders,
reservedForPositions: targetReservedPositions,
realizedPnl: targetRealized
},
delta: {
allocatedCapital: deltaAllocated,
reservedForOrders: deltaReservedOrders,
reservedForPositions: deltaReservedPositions,
realizedPnl: deltaRealized
},
rawUtilizationBeforePct: utilizationBefore === null ? null : round8(utilizationBefore),
rawUtilizationAfterPct: utilizationAfter === null ? null : round8(utilizationAfter),
overAllocatedBefore,
overAllocatedAfter,
changed
});
}
const changedReports = reports.filter((report) => report.changed);
const payload = changedReports.map((report) => ({
profile_id: report.profileId,
allocated_capital: report.target.allocatedCapital,
reserved_for_orders: report.target.reservedForOrders,
reserved_for_positions: report.target.reservedForPositions,
realized_pnl: report.target.realizedPnl,
updated_at: new Date().toISOString()
}));
let appliedRows = 0;
if (options.apply && payload.length > 0) {
const { error: upsertError } = await supabase
.from('capital_ledgers')
.upsert(payload, { onConflict: 'profile_id' });
failIfError(upsertError, 'upsert_capital_ledgers');
appliedRows = payload.length;
}
const summary = {
mode: options.apply ? 'apply' : 'dry-run',
provider,
includeInactive: options.includeInactive,
includeQuarantinedHistory: options.includeQuarantinedHistory,
profilesProcessed: reports.length,
profilesChanged: changedReports.length,
appliedRows,
overAllocatedBeforeCount: reports.filter((report) => report.overAllocatedBefore).length,
overAllocatedAfterCount: reports.filter((report) => report.overAllocatedAfter).length,
totals: {
realizedDeltaApplied: round8(changedReports.reduce((sum, report) => sum + report.delta.realizedPnl, 0)),
reservedOrdersDeltaApplied: round8(changedReports.reduce((sum, report) => sum + report.delta.reservedForOrders, 0)),
reservedPositionsDeltaApplied: round8(changedReports.reduce((sum, report) => sum + report.delta.reservedForPositions, 0))
},
reports: reports.sort((left, right) => {
const leftScore = Math.abs(left.delta.realizedPnl) + Math.abs(left.delta.reservedForPositions);
const rightScore = Math.abs(right.delta.realizedPnl) + Math.abs(right.delta.reservedForPositions);
return rightScore - leftScore;
})
};
console.log(JSON.stringify(summary, null, 2));
};
run().catch((error) => {
console.error(JSON.stringify({
error: formatErrorPayload(error)
}, null, 2));
process.exit(1);
});