learning_ai_invt_trdg/backend/reconcileTradeHistoryLifecycle.ts

373 lines
13 KiB
TypeScript

import 'dotenv/config';
import { createClient } from '@supabase/supabase-js';
type OrderRow = {
id: string;
order_id?: string | null;
user_id?: string | null;
profile_id?: string | null;
symbol?: string | null;
trade_id?: string | null;
action?: string | null;
side?: string | null;
qty?: number | string | null;
price?: number | string | null;
status?: string | null;
created_at?: string | null;
};
type HistoryRow = {
id: string;
user_id?: string | null;
profile_id?: string | null;
symbol?: string | null;
trade_id?: string | null;
side?: string | null;
size?: number | string | null;
entry_price?: number | string | null;
exit_price?: number | string | null;
pnl?: number | string | null;
pnl_percent?: number | string | null;
reason?: string | null;
source?: string | null;
timestamp?: number | string | null;
created_at?: string | null;
};
type CanonicalTrade = {
tradeId: string;
userId: string | null;
profileId: string | null;
symbol: string;
side: 'BUY' | 'SELL';
closedQty: number;
avgEntry: number;
avgExit: number;
pnl: number;
pnlPercent: number;
};
const PAGE_SIZE = 1000;
const EPS = 1e-8;
const DEFAULT_START = '2026-02-12T00:00:00.000Z';
const CANONICAL_REASON = '[RECONCILED_CANONICAL] Order lifecycle reconciled from orders';
const ZEROED_PREFIX = '[RECONCILED_TO_ORDERS]';
const args = process.argv.slice(2);
const applyMode = args.includes('--apply');
const startArg = args.find((arg) => arg.startsWith('--start='));
const startIso = startArg ? String(startArg.split('=')[1] || '').trim() : DEFAULT_START;
const tradeIds = args.filter((arg) => !arg.startsWith('--'));
const supabaseUrl = String(process.env.SUPABASE_URL || '').trim();
const supabaseKey = String(
process.env.SUPABASE_KEY
|| process.env.SUPABASE_SERVICE_ROLE_KEY
|| process.env.SUPABASE_ANON_KEY
|| ''
).trim();
if (!supabaseUrl || !supabaseKey) {
throw new Error('Missing Supabase credentials. Expected SUPABASE_URL + SUPABASE_KEY/SUPABASE_SERVICE_ROLE_KEY.');
}
const supabase = createClient(supabaseUrl, supabaseKey);
const toNumber = (value: unknown): number => {
const num = Number(value);
return Number.isFinite(num) ? num : 0;
};
const normalizeSide = (side: string | null | undefined): 'BUY' | 'SELL' => {
const normalized = String(side || '').trim().toUpperCase();
return normalized === 'SELL' || normalized === 'SHORT' ? 'SELL' : 'BUY';
};
const normalizeAction = (action: string | null | undefined): 'ENTRY' | 'EXIT' | undefined => {
const normalized = String(action || '').trim().toUpperCase();
if (normalized === 'ENTRY' || normalized === 'EXIT') return normalized;
return undefined;
};
const inferAction = (row: OrderRow): 'ENTRY' | 'EXIT' | undefined => {
const explicit = normalizeAction(row.action);
if (explicit) return explicit;
if (String(row.trade_id || '').trim().length === 0) return undefined;
return normalizeSide(row.side) === 'BUY' ? 'ENTRY' : 'EXIT';
};
const startsWithIgnoreCase = (value: string | null | undefined, prefix: string): boolean => {
return String(value || '').toLowerCase().startsWith(prefix.toLowerCase());
};
const sortByCreatedAt = <T extends { created_at?: string | null }>(rows: T[]): T[] => {
return [...rows].sort((a, b) => {
const aTs = Date.parse(String(a.created_at || '')) || 0;
const bTs = Date.parse(String(b.created_at || '')) || 0;
return aTs - bTs;
});
};
const fetchPaged = async <T>(table: 'orders' | 'trade_history', columns: string): Promise<T[]> => {
const out: T[] = [];
let offset = 0;
for (;;) {
const { data, error } = await supabase
.from(table)
.select(columns)
.gte('created_at', startIso)
.order('created_at', { ascending: true })
.range(offset, offset + PAGE_SIZE - 1);
if (error) throw error;
const chunk = (data || []) as T[];
if (!chunk.length) break;
out.push(...chunk);
if (chunk.length < PAGE_SIZE) break;
offset += PAGE_SIZE;
}
return out;
};
const getCanonicalFromOrders = (tradeId: string, rows: OrderRow[]): CanonicalTrade | null => {
const ordered = sortByCreatedAt(rows);
if (!ordered.length) return null;
type Lot = { qty: number; price: number };
const lots: Lot[] = [];
let entrySide: 'BUY' | 'SELL' | null = null;
let closedQty = 0;
let closedEntryNotional = 0;
let closedExitNotional = 0;
let realizedPnl = 0;
let userId: string | null = null;
let profileId: string | null = null;
let symbol = '';
for (const row of ordered) {
const qty = toNumber(row.qty);
const price = toNumber(row.price);
if (!(qty > 0) || !(price > 0)) continue;
if (!userId && row.user_id) userId = row.user_id;
if (profileId === null && row.profile_id !== undefined) profileId = row.profile_id || null;
if (!symbol) symbol = String(row.symbol || '').trim();
const action = inferAction(row);
if (!action) continue;
const side = normalizeSide(row.side);
if (action === 'ENTRY') {
if (!entrySide) entrySide = side;
if (side !== entrySide) continue;
lots.push({ qty, price });
continue;
}
if (!entrySide) continue;
const expectedExitSide = entrySide === 'BUY' ? 'SELL' : 'BUY';
if (side !== expectedExitSide) continue;
let remaining = qty;
while (remaining > EPS && lots.length > 0) {
const lot = lots[0];
const closeQty = Math.min(remaining, lot.qty);
if (closeQty <= EPS) break;
lot.qty -= closeQty;
remaining -= closeQty;
closedQty += closeQty;
closedEntryNotional += closeQty * lot.price;
closedExitNotional += closeQty * price;
realizedPnl += entrySide === 'BUY'
? (price - lot.price) * closeQty
: (lot.price - price) * closeQty;
if (lot.qty <= EPS) lots.shift();
}
}
if (!(closedQty > EPS) || !(closedEntryNotional > 0) || !(closedExitNotional > 0) || !entrySide) {
return null;
}
const avgEntry = closedEntryNotional / closedQty;
const avgExit = closedExitNotional / closedQty;
const pnlPercent = avgEntry > 0
? ((avgExit - avgEntry) / avgEntry) * 100 * (entrySide === 'BUY' ? 1 : -1)
: 0;
return {
tradeId,
userId,
profileId,
symbol,
side: entrySide,
closedQty,
avgEntry,
avgExit,
pnl: realizedPnl,
pnlPercent
};
};
const run = async (): Promise<void> => {
const orderColumns = 'id,order_id,user_id,profile_id,symbol,trade_id,action,side,qty,price,status,created_at';
const historyColumns = 'id,user_id,profile_id,symbol,trade_id,side,size,entry_price,exit_price,pnl,pnl_percent,reason,source,timestamp,created_at';
let orderRows: OrderRow[] = [];
let historyRows: HistoryRow[] = [];
if (tradeIds.length > 0) {
for (const tradeId of tradeIds) {
const { data: oData, error: oError } = await supabase
.from('orders')
.select(orderColumns)
.eq('trade_id', tradeId)
.in('status', ['filled', 'partially_filled'])
.order('created_at', { ascending: true })
.limit(5000);
if (oError) throw oError;
orderRows.push(...((oData || []) as OrderRow[]));
const { data: hData, error: hError } = await supabase
.from('trade_history')
.select(historyColumns)
.eq('trade_id', tradeId)
.order('created_at', { ascending: true })
.limit(5000);
if (hError) throw hError;
historyRows.push(...((hData || []) as HistoryRow[]));
}
} else {
orderRows = await fetchPaged<OrderRow>('orders', orderColumns);
orderRows = orderRows.filter((row) => ['filled', 'partially_filled'].includes(String(row.status || '').toLowerCase()));
historyRows = await fetchPaged<HistoryRow>('trade_history', historyColumns);
}
const ordersByTrade = new Map<string, OrderRow[]>();
for (const row of orderRows) {
const tradeId = String(row.trade_id || '').trim();
if (!tradeId) continue;
const list = ordersByTrade.get(tradeId) || [];
list.push(row);
ordersByTrade.set(tradeId, list);
}
const historyByTrade = new Map<string, HistoryRow[]>();
for (const row of historyRows) {
const tradeId = String(row.trade_id || '').trim();
if (!tradeId) continue;
const list = historyByTrade.get(tradeId) || [];
list.push(row);
historyByTrade.set(tradeId, list);
}
const targets: Array<{
tradeId: string;
canonical: CanonicalTrade;
currentPnl: number;
diff: number;
history: HistoryRow[];
}> = [];
for (const [tradeId, rows] of ordersByTrade.entries()) {
const canonical = getCanonicalFromOrders(tradeId, rows);
if (!canonical) continue;
const history = historyByTrade.get(tradeId) || [];
const currentPnl = history.reduce((sum, row) => sum + toNumber(row.pnl), 0);
const diff = Number((canonical.pnl - currentPnl).toFixed(8));
if (Math.abs(diff) <= 0.02) continue;
targets.push({
tradeId,
canonical,
currentPnl,
diff,
history
});
}
targets.sort((a, b) => Math.abs(b.diff) - Math.abs(a.diff));
console.log(JSON.stringify({
mode: applyMode ? 'apply' : 'dry-run',
startIso,
explicitTradeIds: tradeIds.length > 0 ? tradeIds : null,
totalOrders: orderRows.length,
totalHistory: historyRows.length,
targets: targets.map((target) => ({
trade_id: target.tradeId,
current_history_pnl: Number(target.currentPnl.toFixed(8)),
canonical_order_pnl: Number(target.canonical.pnl.toFixed(8)),
diff_order_minus_history: target.diff,
history_rows: target.history.length
}))
}, null, 2));
if (!applyMode || targets.length === 0) return;
for (const target of targets) {
const rowsToNeutralize = target.history.filter((row) => {
const pnl = toNumber(row.pnl);
if (Math.abs(pnl) <= EPS) return false;
return !startsWithIgnoreCase(row.reason, ZEROED_PREFIX)
&& !startsWithIgnoreCase(row.reason, CANONICAL_REASON);
});
for (const row of rowsToNeutralize) {
const nextReason = startsWithIgnoreCase(row.reason, ZEROED_PREFIX)
? String(row.reason || '')
: `${ZEROED_PREFIX} ${String(row.reason || 'Lifecycle row superseded by canonical order-derived aggregate').trim()}`;
const { error } = await supabase
.from('trade_history')
.update({
pnl: 0,
pnl_percent: 0,
reason: nextReason
})
.eq('id', row.id);
if (error) throw error;
}
const canonicalPayload = {
user_id: target.canonical.userId,
profile_id: target.canonical.profileId,
symbol: target.canonical.symbol,
trade_id: target.canonical.tradeId,
side: target.canonical.side,
size: Number(target.canonical.closedQty.toFixed(8)),
entry_price: Number(target.canonical.avgEntry.toFixed(8)),
exit_price: Number(target.canonical.avgExit.toFixed(8)),
pnl: Number(target.canonical.pnl.toFixed(8)),
pnl_percent: Number(target.canonical.pnlPercent.toFixed(8)),
reason: CANONICAL_REASON,
source: 'BOT',
timestamp: Date.now()
};
const existingCanonical = target.history.find((row) => startsWithIgnoreCase(row.reason, CANONICAL_REASON));
if (existingCanonical) {
const { error } = await supabase
.from('trade_history')
.update(canonicalPayload)
.eq('id', existingCanonical.id);
if (error) throw error;
} else {
const { error } = await supabase
.from('trade_history')
.insert([canonicalPayload]);
if (error) throw error;
}
}
};
run().catch((error) => {
console.error('[reconcileTradeHistoryLifecycle] failed:', error);
process.exit(1);
});