// learning_ai_invt_trdg/backend/reconcileNoGoQtyMismatchOnce.ts
//
// 426 lines
// 16 KiB
// TypeScript

import 'dotenv/config';
import { createHash, randomUUID } from 'crypto';
import { config, loadDynamicConfig } from '../src/config/index.js';
import { ConnectorFactory } from '../src/connectors/factory.js';
import { normalizeOrderAction, normalizeTradeSide } from '../src/domain/tradingEnums.js';
import { healthTracker } from '../src/services/healthTracker.js';
import {
ReconciliationBackfillAuditInsert,
ReconciliationBackfillOrderInsert,
supabaseService
} from '../src/services/SupabaseService.js';
import { TradeExecutor } from '../src/services/TradeExecutor.js';
import logger from '../src/utils/logger.js';
import { buildAlpacaSubTag } from '../src/utils/alpacaSubTag.js';
/** Command-line switches understood by this script (produced by `parseOptions`). */
type CliOptions = {
// True when `--apply` was passed; otherwise the run is treated as a dry run.
apply: boolean;
// Profile ids collected from `--profile=<id>` arguments; an empty set means "no profile filter".
profileIds: Set<string>;
};
/** Normalized fill data for a single exchange order (produced by `normalizeExchangeEvidence`). */
type ExchangeEvidence = {
// Exchange-side order id; used as the lookup key against DB `order_id` values.
orderId: string;
// Normalized side of the exchange order.
side: 'BUY' | 'SELL';
// Quantity reported by the exchange row (always greater than EPSILON once normalized).
qty: number;
// Price reported by the exchange row; 0 when no usable price field was present.
price: number;
// Fill time as an ISO-8601 string.
filledAtIso: string;
};
/** Per-trade quantity aggregate built from lifecycle order rows (see `buildOpenTradeSlices`). */
type TradeSlice = {
// Symbol taken from the first row seen for this trade.
symbol: string;
// Trade identifier shared by all rows aggregated into this slice.
tradeId: string;
// Side on which the position was opened.
entrySide: 'BUY' | 'SELL';
// Sum of quantities from rows classified as ENTRY.
entryQty: number;
// Sum of quantities from all other rows.
exitQty: number;
// entryQty - exitQty, rounded to 8 decimal places.
openQty: number;
};
/** A backfill order row proposed by this script, paired with the exchange order that justified it. */
type ProposedCandidate = {
// The order row that will be upserted when running with `--apply`.
order: ReconciliationBackfillOrderInsert;
// Id of the exchange order whose fill evidence produced this candidate.
exchangeOrderId: string;
};
// Tolerance below which a quantity is treated as zero in the comparisons throughout this file.
const EPSILON = 1e-8;
// Lookback window, in hours, passed to `fetchExchangeClosedOrders` when collecting exchange evidence.
const MAX_LOOKBACK_HOURS = 240;
// Only NO_GO audit rows carrying exactly this reason are selected for repair.
const NO_GO_REASON = 'missing_fill_evidence_for_large_remainder';
/**
 * Turns the raw command-line arguments into a `CliOptions` value.
 *
 * Recognized arguments:
 *  - `--apply`          switches from dry-run to apply mode.
 *  - `--profile=<id>`   adds a profile id to the filter set; may be repeated,
 *                       blank ids are ignored.
 * Every other argument is silently ignored.
 *
 * @param argv Arguments after the node binary and script path.
 * @returns The parsed options; `apply` defaults to false and `profileIds` to an empty set.
 */
const parseOptions = (argv: string[]): CliOptions => {
  const profileFlag = '--profile=';
  const selectedProfiles = new Set<string>();
  let shouldApply = false;

  argv.forEach((token) => {
    if (token === '--apply') {
      shouldApply = true;
    } else if (token.startsWith(profileFlag)) {
      const profileId = String(token.slice(profileFlag.length) || '').trim();
      if (profileId) selectedProfiles.add(profileId);
    }
  });

  return { apply: shouldApply, profileIds: selectedProfiles };
};
/**
 * Coerces an arbitrary value to a finite number.
 *
 * @param value Anything `Number()` accepts.
 * @returns The numeric value, or 0 when the coercion yields NaN or an infinity.
 */
const toNumber = (value: unknown): number => {
  const candidate = Number(value);
  if (Number.isFinite(candidate)) {
    return candidate;
  }
  return 0;
};
/**
 * Converts a loosely-typed timestamp into epoch milliseconds.
 *
 * - Numbers above 1e12 are taken as milliseconds; other positive numbers are
 *   taken as seconds and multiplied by 1000.
 * - Strings are trimmed; a positive numeric string follows the number rule,
 *   anything else goes through `Date.parse`.
 * - Every unusable input (non-positive, non-finite, blank, unparseable, or a
 *   non-string/non-number type) falls back to the current time.
 *
 * @param value Timestamp candidate of unknown shape.
 * @returns Epoch milliseconds.
 */
const toTimestampMs = (value: unknown): number => {
  // Interprets a raw epoch number as either milliseconds or seconds.
  const fromEpochNumber = (epoch: number): number => {
    if (!Number.isFinite(epoch) || !(epoch > 0)) return Date.now();
    return epoch > 1_000_000_000_000 ? epoch : epoch * 1000;
  };

  if (typeof value === 'number') return fromEpochNumber(value);
  if (typeof value !== 'string') return Date.now();

  const text = value.trim();
  if (text.length === 0) return Date.now();

  const asNumber = Number(text);
  if (Number.isFinite(asNumber) && asNumber > 0) return fromEpochNumber(asNumber);

  const asDate = Date.parse(text);
  return Number.isFinite(asDate) && asDate > 0 ? asDate : Date.now();
};
/**
 * Derives a deterministic order id for a backfilled row.
 *
 * The same (profile, trade, exchange order, fill time) tuple always yields the
 * same id, so repeated runs address the same row.
 *
 * @param profileId Profile that owns the trade.
 * @param tradeId Trade being repaired.
 * @param exchangeOrderId Exchange order supplying the fill evidence.
 * @param filledAtIso Fill time as an ISO-8601 string.
 * @returns `BFILL-` followed by the hex MD5 of the colon-joined inputs.
 */
const buildBackfillOrderId = (
  profileId: string,
  tradeId: string,
  exchangeOrderId: string,
  filledAtIso: string
): string => {
  const fingerprint = `${profileId}:${tradeId}:${exchangeOrderId}:${filledAtIso}`;
  // MD5 is used purely as a compact fingerprint here, not for security.
  const hasher = createHash('md5');
  hasher.update(fingerprint);
  const hex = hasher.digest('hex');
  return 'BFILL-' + hex;
};
/**
 * Returns the side an exit order must have to close a position opened on `entrySide`.
 *
 * @param entrySide Side of the opening order.
 * @returns `SELL` for a `BUY` entry, `BUY` otherwise.
 */
const expectedExitSide = (entrySide: 'BUY' | 'SELL'): 'BUY' | 'SELL' => {
  if (entrySide !== 'BUY') {
    return 'BUY';
  }
  return 'SELL';
};
/**
 * Tells whether a credential value is missing or still a template placeholder.
 *
 * @param value Credential string, possibly undefined.
 * @returns True when the trimmed value is empty or equals `your_key` / `your_secret`.
 */
const isPlaceholder = (value: string | undefined): boolean => {
  const candidate = String(value || '').trim();
  return candidate.length === 0 || ['your_key', 'your_secret'].includes(candidate);
};
/**
 * Aggregates lifecycle order rows into per-trade slices and keeps only the
 * trades that still have open quantity.
 *
 * Rows are processed in ascending timestamp order (`timestamp`, falling back to
 * `created_at`). Rows without a trade id, without a symbol, or with a quantity
 * not above EPSILON are skipped. A row counts as ENTRY when its normalized
 * action says so, or — when no action is present — when its side matches the
 * slice's entry side; everything else is accumulated as exit quantity.
 *
 * @param rows Lifecycle order rows for one profile.
 * @returns Map of trade id to slice, limited to slices whose openQty exceeds EPSILON.
 */
const buildOpenTradeSlices = (rows: any[]): Map<string, TradeSlice> => {
  const rowTime = (row: any): number => toTimestampMs(row.timestamp ?? row.created_at ?? 0);
  const chronological = Array.from(rows).sort((left, right) => rowTime(left) - rowTime(right));

  const byTrade = new Map<string, TradeSlice>();

  for (const row of chronological) {
    const tradeId = String(row.trade_id || '').trim();
    if (!tradeId) continue;

    const symbol = String(row.symbol || '').trim();
    if (!symbol) continue;

    const qty = toNumber(row.qty ?? row.quantity);
    if (!(qty > EPSILON)) continue;

    const side = normalizeTradeSide(String(row.side || 'BUY'));
    const explicitAction = normalizeOrderAction(row.action || undefined);

    // Get-or-create: the first usable row of a trade seeds symbol and entry side.
    let slice = byTrade.get(tradeId);
    if (slice === undefined) {
      slice = { symbol, tradeId, entrySide: side, entryQty: 0, exitQty: 0, openQty: 0 };
      byTrade.set(tradeId, slice);
    }

    const action = explicitAction
      ? explicitAction
      : side === slice.entrySide
        ? 'ENTRY'
        : 'EXIT';

    if (action !== 'ENTRY') {
      slice.exitQty += qty;
    } else {
      // Until some entry quantity is recorded, the first ENTRY row decides the entry side.
      if (!(slice.entryQty > EPSILON)) {
        slice.entrySide = side;
      }
      slice.entryQty += qty;
    }

    slice.openQty = Number((slice.entryQty - slice.exitQty).toFixed(8));
  }

  // Drop trades that are fully closed (or over-closed).
  const closedTradeIds: string[] = [];
  byTrade.forEach((slice, tradeId) => {
    if (!(slice.openQty > EPSILON)) closedTradeIds.push(tradeId);
  });
  for (const tradeId of closedTradeIds) {
    byTrade.delete(tradeId);
  }

  return byTrade;
};
/**
 * Indexes raw exchange order rows by order id in a uniform shape.
 *
 * Field names vary between row shapes, so each value is read from the first
 * non-nullish field among several candidates. Rows without an id, or with a
 * quantity not above EPSILON, are skipped. When several rows share an id the
 * last one wins.
 *
 * @param rows Raw closed-order rows from the exchange (null/undefined tolerated).
 * @returns Map of exchange order id to normalized evidence.
 */
const normalizeExchangeEvidence = (rows: any[]): Map<string, ExchangeEvidence> => {
  // Mirrors a `a ?? b ?? c` chain: first non-nullish field, else the last field's value.
  const firstPresent = (row: any, keys: string[]): unknown => {
    let picked: unknown;
    for (const key of keys) {
      picked = row?.[key];
      if (picked != null) break;
    }
    return picked;
  };

  const evidenceById = new Map<string, ExchangeEvidence>();

  for (const row of rows || []) {
    const orderId = String(row?.id || row?.order_id || '').trim();
    if (!orderId) continue;

    const side = normalizeTradeSide(String(row?.side || 'BUY'));
    const qty = toNumber(firstPresent(row, ['filled_qty', 'filledQty', 'qty', 'amount', 'size']));
    const price = toNumber(firstPresent(row, ['filled_avg_price', 'avg_price', 'price', 'limit_price']));
    if (!(qty > EPSILON)) continue;

    const filledAtMs = toTimestampMs(
      firstPresent(row, ['filled_at', 'filledAt', 'updated_at', 'closed_at', 'submitted_at', 'timestamp'])
    );
    const filledAtIso = new Date(filledAtMs).toISOString();

    evidenceById.set(orderId, { orderId, side, qty, price, filledAtIso });
  }

  return evidenceById;
};
/**
 * One-shot repair pass for trades previously audited as NO_GO with reason
 * `NO_GO_REASON`.
 *
 * Steps:
 *  1. Parse CLI options, silence the logger, load dynamic config and record a
 *     PAUSED trading-control state on the health tracker.
 *  2. Load active users/profiles and the NO_GO audit rows; group the affected
 *     trade ids per profile (optionally restricted by `--profile=`).
 *  3. For each profile, rebuild open trade slices from the DB, fetch closed
 *     orders from the exchange, and for every DB EXIT row whose exchange order
 *     reports a larger filled quantity propose a backfill EXIT row for the
 *     difference (capped by the trade's remaining open quantity).
 *  4. Write audit rows for all proposals; in `--apply` mode also upsert the
 *     proposed orders and write a second set of audit rows with the outcome.
 *  5. Print a JSON summary to stdout.
 *
 * @throws Error when the order upsert reports failure; errors from the
 *         services called here are not caught and propagate to the caller.
 */
const run = async (): Promise<void> => {
const options = parseOptions(process.argv.slice(2));
// Keep stdout clean: the only intended output is the JSON summary at the end.
logger.silent = true;
await loadDynamicConfig(supabaseService);
// Record that trading is paused for the duration of this maintenance run.
healthTracker.recordTradingControl({
mode: 'PAUSED',
lastChangedBy: 'maintenance-script',
lastChangedAt: Date.now(),
reason: 'NO_GO mismatch repair cycle'
});
const [users, profiles] = await Promise.all([
supabaseService.getActiveUsers(),
supabaseService.getActiveProfiles()
]);
// Index users by id so each profile can find its owner's credentials.
const userById = new Map<string, any>();
for (const user of users || []) {
const userId = String((user as any)?.user_id || '').trim();
if (!userId) continue;
userById.set(userId, user);
}
// NOTE(review): only one page of up to 1000 NO_GO rows is requested (offset 0, no pagination
// loop) — confirm that is enough for the audit table's size.
const noGoAudit = await supabaseService.getReconciliationBackfillAuditRows({
decisions: ['NO_GO'],
limit: 1000,
offset: 0
});
// Collect the distinct (profile, trade) pairs flagged with the targeted NO_GO reason.
const seenNoGoTrade = new Set<string>();
const noGoTradeByProfile = new Map<string, Set<string>>();
for (const row of noGoAudit.rows || []) {
const reason = String(row.reason || '').trim();
if (reason !== NO_GO_REASON) continue;
const tradeId = String(row.trade_id || '').trim();
const profileId = String(row.profile_id || '').trim();
if (!tradeId || !profileId) continue;
// Honor the optional --profile= filter.
if (options.profileIds.size > 0 && !options.profileIds.has(profileId)) continue;
const dedupe = `${profileId}::${tradeId}`;
if (seenNoGoTrade.has(dedupe)) continue;
seenNoGoTrade.add(dedupe);
const list = noGoTradeByProfile.get(profileId) || new Set<string>();
list.add(tradeId);
noGoTradeByProfile.set(profileId, list);
}
// One batch id ties together every audit row written by this run.
const batchId = `RECON-BFILL-NOGO-MISMATCH-${randomUUID()}`;
const proposedRows: ProposedCandidate[] = [];
const auditRows: ReconciliationBackfillAuditInsert[] = [];
const skipped: Array<Record<string, any>> = [];
for (const profile of profiles || []) {
const profileId = String(profile?.id || '').trim();
if (!profileId) continue;
const targetTrades = noGoTradeByProfile.get(profileId);
if (!targetTrades || targetTrades.size === 0) continue;
const userId = String(profile?.user_id || '').trim();
const user = userById.get(userId);
if (!user) {
skipped.push({ profileId, reason: 'user_not_found' });
continue;
}
// Paper and live trading use different credential fields on the user record.
const apiKey = config.PAPER_TRADING ? user.ALPACA_API_KEY : user.REAL_ALPACA_API_KEY;
const apiSecret = config.PAPER_TRADING ? user.ALPACA_SECRET_KEY : user.REAL_ALPACA_SECRET_KEY;
if (isPlaceholder(apiKey) || isPlaceholder(apiSecret)) {
skipped.push({ profileId, reason: 'missing_exchange_credentials' });
continue;
}
const lifecycleRows = await supabaseService.getFilledLifecycleOrdersForProfile(profileId);
const slices = buildOpenTradeSlices(lifecycleRows);
// Only symbols of targeted trades that are still open need exchange evidence.
const symbols = Array.from(new Set(
Array.from(targetTrades.values())
.map((tradeId) => slices.get(tradeId)?.symbol)
.filter(Boolean) as string[]
));
if (symbols.length === 0) continue;
const connector = ConnectorFactory.getCustomConnector(config.EXECUTION_PROVIDER, apiKey, apiSecret);
const executor = new TradeExecutor(connector, undefined, userId, profileId);
// NOTE(review): this call sits outside the try block, so dispose() is not reached if it throws.
executor.setProfileSettings(profile);
// NOTE(review): there is no catch here — an error for one profile propagates out of run(),
// which means the audit rows collected so far are never written.
try {
const exchangeRows = await executor.fetchExchangeClosedOrders(symbols, MAX_LOOKBACK_HOURS);
const evidenceByOrderId = normalizeExchangeEvidence(exchangeRows || []);
for (const tradeId of targetTrades.values()) {
const slice = slices.get(tradeId);
if (!slice || !(slice.openQty > EPSILON)) continue;
const expectedSide = expectedExitSide(slice.entrySide);
// DB rows for this trade that are explicit EXITs on the closing side.
const exitRows = lifecycleRows.filter((row) => {
return String(row.trade_id || '').trim() === tradeId
&& normalizeOrderAction(row.action || undefined) === 'EXIT'
&& (normalizeTradeSide(String(row.side || '')) === expectedSide);
});
// Open quantity still unexplained; reduced by each proposed backfill.
let remaining = slice.openQty;
let evidenceUsed = 0;
for (const exitRow of exitRows) {
if (!(remaining > EPSILON)) break;
const orderId = String(exitRow.order_id || '').trim();
if (!orderId) continue;
// Evidence is matched strictly by order id (DB order_id === exchange order id).
const evidence = evidenceByOrderId.get(orderId);
if (!evidence) continue;
if (evidence.side !== expectedSide) continue;
const dbQty = toNumber(exitRow.qty ?? exitRow.quantity);
const exchangeQty = toNumber(evidence.qty);
// Shortfall: how much more the exchange filled than the DB row recorded.
const missingFromOrder = Number((exchangeQty - dbQty).toFixed(8));
if (!(missingFromOrder > EPSILON)) continue;
// Never backfill more than the trade still has open.
const applyQty = Math.min(remaining, missingFromOrder);
if (!(applyQty > EPSILON)) continue;
const filledAtIso = evidence.filledAtIso;
const backfillOrderId = buildBackfillOrderId(profileId, tradeId, orderId, filledAtIso);
const subTag = buildAlpacaSubTag({
profileId,
tradeId,
intent: 'EXIT'
}) || undefined;
// Backfill rows are written as filled market EXIT orders with source 'BOT'.
proposedRows.push({
order: {
user_id: userId,
profile_id: profileId,
order_id: backfillOrderId,
symbol: slice.symbol,
type: 'market',
side: expectedSide,
qty: Number(applyQty.toFixed(8)),
quantity: Number(applyQty.toFixed(8)),
price: Number(toNumber(evidence.price).toFixed(8)),
status: 'filled',
timestamp: toTimestampMs(filledAtIso),
filled_at: filledAtIso,
trade_id: tradeId,
action: 'EXIT',
source: 'BOT',
sub_tag: subTag
},
exchangeOrderId: orderId
});
// Pre-apply audit record; the decision reflects whether this is a dry run.
auditRows.push({
batch_id: batchId,
profile_id: profileId,
symbol: slice.symbol,
trade_id: tradeId,
exchange_order_id: orderId,
exchange_client_order_id: null,
backfill_order_id: backfillOrderId,
filled_qty: Number(applyQty.toFixed(8)),
filled_price: Number(toNumber(evidence.price).toFixed(8)),
filled_at: filledAtIso,
dry_run: !options.apply,
decision: options.apply ? 'PENDING_APPLY' : 'DRY_RUN',
reason: 'existing_exit_order_qty_mismatch',
metadata: {
openQtyBefore: slice.openQty,
expectedSide,
dbExitQty: dbQty,
exchangeFilledQty: exchangeQty,
missingFromOrder,
applyQty
}
});
remaining = Number((remaining - applyQty).toFixed(8));
evidenceUsed += 1;
}
// Whatever is left could not be explained by same-order quantity mismatches.
if (remaining > EPSILON) {
skipped.push({
profileId,
tradeId,
symbol: slice.symbol,
reason: 'insufficient_same_order_evidence',
remaining,
evidenceUsed
});
}
}
} finally {
executor.dispose();
}
}
// Written in both modes (and also when auditRows is empty).
await supabaseService.insertReconciliationBackfillAuditRows(auditRows);
let insertedRows = 0;
if (options.apply && proposedRows.length > 0) {
const proposedOrderIds = proposedRows.map((row) => row.order.order_id);
// Snapshot which ids already exist so new inserts can be told apart from pre-existing rows.
const existingBefore = await supabaseService.getExistingOrderIds(proposedOrderIds);
const ok = await supabaseService.upsertReconciliationBackfillOrders(proposedRows.map((row) => row.order));
if (!ok) throw new Error('Failed to upsert mismatch backfill rows.');
const existingAfter = await supabaseService.getExistingOrderIds(proposedOrderIds);
// Count only ids that were absent before the upsert and present after it.
insertedRows = proposedRows.filter((row) => !existingBefore.has(row.order.order_id) && existingAfter.has(row.order.order_id)).length;
// Second audit pass recording the outcome per proposal: APPLIED or SKIP_EXISTING.
const postAuditRows: ReconciliationBackfillAuditInsert[] = proposedRows.map((row) => ({
batch_id: batchId,
profile_id: row.order.profile_id,
symbol: row.order.symbol,
trade_id: row.order.trade_id,
exchange_order_id: row.exchangeOrderId,
exchange_client_order_id: null,
backfill_order_id: row.order.order_id,
filled_qty: row.order.qty,
filled_price: row.order.price,
filled_at: row.order.filled_at || null,
dry_run: false,
decision: existingBefore.has(row.order.order_id) ? 'SKIP_EXISTING' : 'APPLIED',
reason: existingBefore.has(row.order.order_id) ? 'already_exists' : 'inserted_existing_exit_order_qty_mismatch',
metadata: {
matchedBy: 'existing_exit_order_qty_mismatch'
},
applied_at: !existingBefore.has(row.order.order_id) ? new Date().toISOString() : null
}));
await supabaseService.insertReconciliationBackfillAuditRows(postAuditRows);
}
// Machine-readable summary of the run.
console.log(JSON.stringify({
mode: options.apply ? 'apply' : 'dry-run',
batchId,
proposedRows: proposedRows.length,
insertedRows,
skipped
}, null, 2));
};
// Script entry point: start the repair pass; on any failure print the error
// as JSON on stderr and terminate with exit code 1.
run().catch((failure: unknown) => {
  let message: string;
  if (failure instanceof Error) {
    message = failure.message;
  } else {
    message = String(failure);
  }
  const payload = JSON.stringify({ error: message }, null, 2);
  console.error(payload);
  process.exit(1);
});