learning_ai_invt_trdg/backend/src/services/reconciliationExitBackfillService.ts

954 lines
36 KiB
TypeScript

import { createHash, randomUUID } from 'crypto';
import { config } from '../config/index.js';
import logger from '../utils/logger.js';
import { normalizeOrderAction, normalizeOrderStatus, normalizeTradeSide } from '../domain/tradingEnums.js';
import { healthTracker } from './healthTracker.js';
import { observabilityService } from './observabilityService.js';
import type {
FilledLifecycleOrderRow,
ReconciliationBackfillAuditInsert,
ReconciliationBackfillOrderInsert
} from './tradingPersistenceTypes.js';
import * as runtimeOrderRepository from './runtimeOrderRepository.js';
import type { TradeExecutor } from './TradeExecutor.js';
import {
extractOrderSubTag,
isBytelystSubTag,
subTagBelongsToProfile,
subTagHintsTrade
} from '../utils/alpacaSubTag.js';
import {
buildManagedBotSymbolTokenSet,
isManagedBotSymbol,
normalizeBotSymbolToken
} from '../utils/botSymbolScope.js';
// Quantity tolerance: values at or below this are treated as zero throughout this module
// (every quantity check is written as `> EPSILON`).
const EPSILON = 1e-8;
/**
 * How an exchange fill was attributed to a local trade. Each member corresponds to
 * one allocation pass in `allocateEvidence`, listed here in the order the passes run.
 */
type MatchMethod =
| 'trade_id'
| 'sub_tag'
| 'client_order_id'
| 'qty_unique'
| 'single_open_trade';
/**
 * Per-trade ledger summary produced by `buildOpenTradeSlices` for trades whose
 * recorded entry quantity still exceeds the recorded exit quantity.
 */
type OpenTradeSlice = {
profileId: string;
userId: string;
// Symbol exactly as stored on the first lifecycle row seen for the trade (trimmed).
symbol: string;
// `normalizeBotSymbolToken(symbol)`; used to group trades per symbol.
symbolToken: string;
tradeId: string;
entrySide: 'BUY' | 'SELL';
// Sum of quantities of rows classified as ENTRY.
entryQty: number;
// Sum of quantities of rows classified as anything other than ENTRY.
exitQty: number;
// entryQty - exitQty, rounded to 8 decimal places.
openQty: number;
// Largest row timestamp (epoch milliseconds) seen for the trade.
latestTimestampMs: number;
};
/**
 * A filled (or partially filled) exchange order normalized by `normalizeEvidenceOrders`
 * into the fields the allocation passes need.
 */
type ExchangeFillEvidence = {
symbolToken: string;
exchangeOrderId: string;
// Empty string when the exchange payload carries no client order id.
clientOrderId: string;
// Explicit trade id found on the exchange payload (`trade_id` / `tradeId`), or ''.
tradeIdHint: string;
// Result of `extractOrderSubTag(order)`; treated as absent when falsy.
subTag: string;
side: 'BUY' | 'SELL';
qty: number;
// 0 when no positive price field was found on the payload.
price: number;
// ISO-8601 rendering of `filledAtMs`.
filledAtIso: string;
// Epoch milliseconds.
filledAtMs: number;
};
/** Exchange fill evidence that has been attributed to a trade, tagged with the pass that matched it. */
type AssignedEvidence = ExchangeFillEvidence & {
matchedBy: MatchMethod;
};
/**
 * An EXIT order row proposed for insertion, together with the exchange identifiers
 * that justify it (copied into the audit rows).
 */
type ProposedBackfillCandidate = {
order: ReconciliationBackfillOrderInsert;
exchangeOrderId: string;
exchangeClientOrderId?: string;
exchangeSubTag?: string;
// 'dust' marks a synthetic row that closes a sub-threshold remainder rather than a real fill.
matchedBy: MatchMethod | 'dust';
};
/** Inputs for a single-profile backfill run. */
export interface ReconciliationExitBackfillContext {
profileId: string;
// Used as `user_id` on proposed rows when the trade's own lifecycle rows carry none.
userId: string;
// Queried for the exchange position and closed orders of each symbol with open trades.
executor: TradeExecutor;
}
/** Summary returned by `ReconciliationExitBackfillService.runProfile`. */
export interface ReconciliationExitBackfillResult {
// False when the run was skipped by a precondition; `skippedReason` then says why.
attempted: boolean;
skippedReason?: string;
// Only set once the run got far enough to allocate a batch id.
batchId?: string;
dryRun: boolean;
// Number of open trades left after filtering to bot-managed symbols.
openTradeCandidates: number;
// Number of EXIT rows proposed (real fills plus dust remainders).
proposedRows: number;
// Proposed rows that were absent before the apply step and present after it.
insertedRows: number;
// Count of NO_GO audit rows; advisory skips (pending order / active position) are not included.
noGoTrades: number;
noGoReasonCounts: Record<string, number>;
noGoSamples: Array<{
symbol: string;
tradeId: string;
reason: string;
}>;
}
/**
 * Coerces an arbitrary value with `Number(...)`.
 *
 * @returns The coerced number, or 0 when the coercion yields NaN or an infinity.
 */
const toNumber = (value: unknown): number => {
  const numeric = Number(value);
  if (!Number.isFinite(numeric)) {
    return 0;
  }
  return numeric;
};
/**
 * Converts a loosely typed timestamp into epoch milliseconds.
 *
 * - Numbers above 1e12 are taken as milliseconds; other positive finite numbers are
 *   taken as seconds and multiplied by 1000.
 * - Purely numeric strings are converted to a number and handled as above.
 * - Any other string is passed to `Date.parse`.
 *
 * @param value    Raw timestamp (number, string, or anything else).
 * @param fallback Returned when `value` cannot be converted to a positive timestamp.
 */
const toTimestampMs = (value: unknown, fallback: number): number => {
  switch (typeof value) {
    case 'number': {
      if (!Number.isFinite(value) || value <= 0) return fallback;
      return value > 1_000_000_000_000 ? value : value * 1000;
    }
    case 'string': {
      const text = value.trim();
      if (/^\d+(\.\d+)?$/.test(text)) {
        return toTimestampMs(Number(text), fallback);
      }
      const epochMs = Date.parse(value);
      return Number.isFinite(epochMs) && epochMs > 0 ? epochMs : fallback;
    }
    default:
      return fallback;
  }
};
/**
 * Renders epoch milliseconds as an ISO-8601 string.
 * Non-finite or non-positive inputs are replaced by the current time.
 */
const toIso = (timestampMs: number): string => {
  const usable = Number.isFinite(timestampMs) && timestampMs > 0;
  const instant = new Date(usable ? timestampMs : Date.now());
  return instant.toISOString();
};
/**
 * Extracts the absolute size of an exchange position payload.
 * Fields are probed in priority order; the first one with a non-zero magnitude wins.
 *
 * @returns The absolute quantity, or 0 when no field holds a usable value.
 */
const positionQty = (position: any): number => {
  const fieldsByPriority = ['qty', 'size', 'position_qty', 'positionQty', 'contracts'];
  for (const field of fieldsByPriority) {
    const magnitude = Math.abs(toNumber(position?.[field]));
    if (magnitude > 0) {
      return magnitude;
    }
  }
  return 0;
};
/**
 * Reads the filled quantity from an exchange order payload.
 * Fields are probed in priority order (explicit fill fields first, then the generic
 * `qty` / `amount` / `size`); the first strictly positive value wins.
 *
 * @returns The quantity, or 0 when no field holds a positive number.
 */
const fillQty = (order: any): number => {
  const fieldsByPriority = [
    'filled_qty',
    'filledQty',
    'filled_quantity',
    'filled',
    'qty',
    'amount',
    'size'
  ];
  for (const field of fieldsByPriority) {
    const quantity = toNumber(order?.[field]);
    if (quantity > 0) {
      return quantity;
    }
  }
  return 0;
};
/**
 * Reads the fill price from an exchange order payload, preferring the average
 * fill price fields over the order's own `price` / `limit_price`.
 *
 * @returns The first strictly positive price found, or 0 when there is none.
 */
const fillPrice = (order: any): number => {
  const fieldsByPriority = ['filled_avg_price', 'avg_price', 'price', 'limit_price'];
  for (const field of fieldsByPriority) {
    const price = toNumber(order?.[field]);
    if (price > 0) {
      return price;
    }
  }
  return 0;
};
/**
 * Resolves the fill time of an exchange order as epoch milliseconds.
 * Timestamp fields are probed in priority order; the first that converts to a
 * positive timestamp wins. When none does, the current time is returned.
 */
const filledAtMs = (order: any): number => {
  const fieldsByPriority = [
    'filled_at',
    'filledAt',
    'updated_at',
    'closed_at',
    'timestamp',
    'submitted_at'
  ];
  for (const field of fieldsByPriority) {
    const epochMs = toTimestampMs(order?.[field], 0);
    if (epochMs > 0) {
      return epochMs;
    }
  }
  return Date.now();
};
/**
 * Reads `config.RECON_EXIT_BACKFILL_PROFILE_ALLOWLIST` into a set of trimmed,
 * non-empty profile ids. A non-array config value yields an empty set.
 */
const parseProfileAllowlist = (): Set<string> => {
  const configured = config.RECON_EXIT_BACKFILL_PROFILE_ALLOWLIST;
  const allowlist = new Set<string>();
  if (!Array.isArray(configured)) {
    return allowlist;
  }
  for (const entry of configured) {
    const profileId = String(entry || '').trim();
    if (profileId) {
      allowlist.add(profileId);
    }
  }
  return allowlist;
};
/**
 * Derives a deterministic order id for a backfilled row.
 * The id is `BFILL-` followed by the hex MD5 digest of the four inputs joined with `:`,
 * so the same inputs always produce the same id.
 */
const buildBackfillOrderId = (
  profileId: string,
  tradeId: string,
  exchangeOrderId: string,
  filledAtIso: string
): string => {
  const fingerprint = [profileId, tradeId, exchangeOrderId, filledAtIso].join(':');
  const hasher = createHash('md5');
  hasher.update(fingerprint);
  return 'BFILL-' + hasher.digest('hex');
};
/**
 * Converts raw exchange orders into fill evidence for one symbol.
 *
 * An order is kept only when its normalized status is `filled` or `partially_filled`,
 * its fill quantity exceeds EPSILON, its symbol token (when present) equals `symbolKey`,
 * and it carries an order id. A missing side is normalized from the literal `'BUY'`.
 *
 * @param symbolKey      Normalized symbol token the evidence must belong to.
 * @param exchangeOrders Raw exchange order payloads (nullish is treated as empty).
 * @returns Evidence rows sorted by fill time, oldest first.
 */
const normalizeEvidenceOrders = (symbolKey: string, exchangeOrders: any[]): ExchangeFillEvidence[] => {
  const toEvidence = (order: any): ExchangeFillEvidence | null => {
    const normalizedStatus = normalizeOrderStatus(String(order?.status || ''));
    const isFill = normalizedStatus === 'filled' || normalizedStatus === 'partially_filled';
    if (!isFill) return null;

    const filledQuantity = fillQty(order);
    if (!(filledQuantity > EPSILON)) return null;

    // Orders without a symbol are accepted; only an explicit mismatch is rejected.
    const token = normalizeBotSymbolToken(order?.symbol);
    if (token && token !== symbolKey) return null;

    const exchangeOrderId = String(order?.id || order?.order_id || '').trim();
    if (!exchangeOrderId) return null;

    const normalizedSide = normalizeTradeSide(String(order?.side || 'BUY'));
    const filledMs = filledAtMs(order);
    return {
      symbolToken: symbolKey,
      exchangeOrderId,
      clientOrderId: String(order?.client_order_id || order?.clientOrderId || '').trim(),
      tradeIdHint: String(order?.trade_id || order?.tradeId || '').trim(),
      subTag: extractOrderSubTag(order),
      side: normalizedSide,
      qty: filledQuantity,
      price: fillPrice(order),
      filledAtIso: toIso(filledMs),
      filledAtMs: filledMs
    };
  };

  const collected: ExchangeFillEvidence[] = [];
  for (const rawOrder of exchangeOrders || []) {
    const evidenceRow = toEvidence(rawOrder);
    if (evidenceRow !== null) {
      collected.push(evidenceRow);
    }
  }
  collected.sort((left, right) => left.filledAtMs - right.filledAtMs);
  return collected;
};
/**
 * Folds a profile's filled lifecycle rows into per-trade ledgers and returns the
 * trades that are still open (entry quantity exceeds exit quantity by more than EPSILON).
 *
 * Rows are processed oldest first. A row counts as ENTRY when its normalized action is
 * 'ENTRY', or, when it has no action, when its side equals the ledger's entry side.
 * Every other row counts towards the exit quantity. Rows without a trade id, a symbol
 * or a quantity above EPSILON are ignored.
 *
 * @param profileId Profile the rows belong to; copied onto every slice.
 * @param rows      Filled lifecycle order rows for that profile.
 */
const buildOpenTradeSlices = (profileId: string, rows: FilledLifecycleOrderRow[]): OpenTradeSlice[] => {
  type TradeLedger = Omit<OpenTradeSlice, 'openQty'>;

  const rowTimestamp = (row: FilledLifecycleOrderRow, fallback: number): number =>
    toTimestampMs(row.timestamp ?? row.created_at ?? 0, fallback);

  const chronological = [...rows].sort((left, right) => rowTimestamp(left, 0) - rowTimestamp(right, 0));
  const ledgers = new Map<string, TradeLedger>();

  for (const row of chronological) {
    const tradeId = String(row.trade_id || '').trim();
    if (!tradeId) continue;
    const symbol = String(row.symbol || '').trim();
    if (!symbol) continue;
    const rowQty = toNumber(row.qty ?? row.quantity);
    if (!(rowQty > EPSILON)) continue;

    const rowSide = normalizeTradeSide(String(row.side || 'BUY'));
    const declaredAction = normalizeOrderAction(row.action || undefined);
    const ledgerKey = `${profileId}::${tradeId}`;
    const rowTs = rowTimestamp(row, Date.now());

    let ledger = ledgers.get(ledgerKey);
    if (!ledger) {
      // The first row seen for a trade provisionally defines its entry side.
      ledger = {
        profileId,
        userId: String(row.user_id || '').trim(),
        symbol,
        symbolToken: normalizeBotSymbolToken(symbol),
        tradeId,
        entrySide: rowSide,
        entryQty: 0,
        exitQty: 0,
        latestTimestampMs: rowTs
      };
      ledgers.set(ledgerKey, ledger);
    }

    const effectiveAction = declaredAction || (rowSide === ledger.entrySide ? 'ENTRY' : 'EXIT');
    if (effectiveAction === 'ENTRY') {
      // Until some entry quantity is booked, an ENTRY row overrides the provisional side.
      if (!(ledger.entryQty > EPSILON)) {
        ledger.entrySide = rowSide;
      }
      ledger.entryQty += rowQty;
    } else {
      ledger.exitQty += rowQty;
    }

    ledger.latestTimestampMs = Math.max(ledger.latestTimestampMs, rowTs);
    if (!ledger.userId) {
      ledger.userId = String(row.user_id || '').trim();
    }
  }

  const openSlices: OpenTradeSlice[] = [];
  ledgers.forEach((ledger) => {
    const openQty = Number((ledger.entryQty - ledger.exitQty).toFixed(8));
    if (!(openQty > EPSILON)) return;
    const { latestTimestampMs, ...identityAndTotals } = ledger;
    openSlices.push({ ...identityAndTotals, openQty, latestTimestampMs });
  });
  return openSlices;
};
/** Returns the side that closes a position opened on `entrySide`. */
const expectedExitSide = (entrySide: 'BUY' | 'SELL'): 'BUY' | 'SELL' => {
  if (entrySide === 'BUY') {
    return 'SELL';
  }
  return 'BUY';
};
/**
 * Case-insensitive prefix test: compares the first `prefix.length` characters of
 * `value` with `prefix` after lower-casing both.
 */
const startsWithCaseInsensitive = (value: string, prefix: string): boolean => {
  const head = value.substring(0, prefix.length);
  const loweredHead = head.toLowerCase();
  const loweredPrefix = prefix.toLowerCase();
  return loweredHead === loweredPrefix;
};
/**
 * Extracts a trade id from a client order id of the form
 * `bytelyst-<profileId>-<tradeId>[-exit|-entry]` (prefix and suffix matched case-insensitively).
 *
 * @returns `{ tradeId }` when the id carries the profile's prefix and a non-empty trade
 *          part; `null` otherwise.
 */
const parseTradeFromClientOrderId = (
  profileId: string,
  clientOrderIdRaw: string
): { tradeId: string } | null => {
  const normalizedId = String(clientOrderIdRaw || '').trim();
  const expectedPrefix = `bytelyst-${profileId}-`;
  if (!normalizedId || !startsWithCaseInsensitive(normalizedId, expectedPrefix)) {
    return null;
  }

  const remainder = normalizedId.slice(expectedPrefix.length).trim();
  if (!remainder) {
    return null;
  }

  // Strip a trailing action marker; '-exit' is checked before '-entry'.
  const loweredRemainder = remainder.toLowerCase();
  for (const marker of ['-exit', '-entry']) {
    if (!loweredRemainder.endsWith(marker)) continue;
    const tradeId = remainder.slice(0, -marker.length).trim();
    return tradeId ? { tradeId } : null;
  }
  return { tradeId: remainder };
};
/**
 * Computes the quantity below which a remainder counts as dust for a trade:
 * the larger of the configured absolute quantity and the configured relative
 * fraction of `openQty`. Negative config values and negative `openQty` are clamped to 0.
 */
const dustThresholdForTrade = (openQty: number): number => {
  const absoluteFloor = Math.max(0, toNumber(config.RECON_EXIT_BACKFILL_DUST_ABS_QTY));
  const relativeFraction = Math.max(0, toNumber(config.RECON_EXIT_BACKFILL_DUST_REL_PCT));
  const relativeFloor = relativeFraction * Math.max(openQty, 0);
  return absoluteFloor >= relativeFloor ? absoluteFloor : relativeFloor;
};
/**
 * Counts audit rows per reason. A missing or blank reason is counted under `unknown`.
 *
 * @param rows Audit rows to tally (nullish is treated as empty).
 */
const summarizeNoGoReasons = (rows: ReconciliationBackfillAuditInsert[]): Record<string, number> => {
  const tally: Record<string, number> = {};
  (rows || []).forEach((row) => {
    const reason = String(row?.reason || 'unknown').trim() || 'unknown';
    const previous = tally[reason] || 0;
    tally[reason] = previous + 1;
  });
  return tally;
};
/**
 * Returns up to `limit` leading audit rows reduced to symbol, trade id and reason,
 * for inclusion in the run result. Blank reasons become `unknown`.
 *
 * @param rows  Audit rows (nullish is treated as empty).
 * @param limit Maximum number of samples; negative values yield none.
 */
const noGoSamples = (rows: ReconciliationBackfillAuditInsert[], limit = 5): Array<{ symbol: string; tradeId: string; reason: string }> => {
  const cap = Math.max(0, limit);
  const head = (rows || []).slice(0, cap);
  const samples: Array<{ symbol: string; tradeId: string; reason: string }> = [];
  for (const row of head) {
    const symbol = String(row?.symbol || '').trim();
    const tradeId = String(row?.trade_id || '').trim();
    const reason = String(row?.reason || '').trim() || 'unknown';
    samples.push({ symbol, tradeId, reason });
  }
  return samples;
};
/**
 * Decides whether an exchange fill is recent enough to be an exit of `trade`.
 *
 * A fill is eligible when it happened no earlier than `graceMs` before the trade's
 * latest recorded activity. When either timestamp is missing or non-positive the
 * check cannot be made and the fill is treated as eligible.
 *
 * @param trade   Open trade whose `latestTimestampMs` anchors the comparison.
 * @param row     Exchange fill being considered.
 * @param graceMs Tolerance in milliseconds; negative values are clamped to 0.
 *                NaN is treated as 0 (see below).
 */
const isTradeTemporallyEligible = (
  trade: OpenTradeSlice,
  row: ExchangeFillEvidence,
  graceMs: number
): boolean => {
  const tradeTs = toNumber(trade.latestTimestampMs);
  const rowTs = toNumber(row.filledAtMs);
  if (!(tradeTs > 0) || !(rowTs > 0)) return true;
  // Math.floor and Math.max both propagate NaN, and any comparison against NaN is false,
  // so a NaN grace would silently reject every fill. Fall back to "no grace" instead.
  const safeGraceMs = Number.isNaN(graceMs) ? 0 : Math.max(0, Math.floor(graceMs));
  return rowTs + safeGraceMs >= tradeTs;
};
/**
 * Attributes exchange fills to the open trades of a single symbol.
 *
 * Fills are matched in successive passes, strongest signal first; a fill assigned in
 * one pass is removed from the pool and not reconsidered. Within each pass the pool is
 * walked from its last element to its first.
 *
 * @param trades    Open trades for the symbol.
 * @param evidence  Candidate exchange fills for the symbol.
 * @param profileId Profile used to validate sub-tags and client order ids.
 * @param options   `allowHeuristicMatch` enables passes 4 and 5; `fillAfterTradeGraceMs`
 *                  is forwarded to `isTradeTemporallyEligible`.
 * @returns `byTrade`: fills assigned per trade id (in assignment order);
 *          `unmatched`: fills that no pass could assign.
 */
const allocateEvidence = (
trades: OpenTradeSlice[],
evidence: ExchangeFillEvidence[],
profileId: string,
options: {
allowHeuristicMatch: boolean;
fillAfterTradeGraceMs: number;
}
): {
byTrade: Map<string, AssignedEvidence[]>;
unmatched: ExchangeFillEvidence[];
} => {
const byTrade = new Map<string, AssignedEvidence[]>();
const unmatched: ExchangeFillEvidence[] = [];
// Quantity still unexplained per trade; starts at the trade's open quantity.
const remainingByTrade = new Map<string, number>();
const tradeById = new Map<string, OpenTradeSlice>();
trades.forEach((trade) => {
remainingByTrade.set(trade.tradeId, trade.openQty);
tradeById.set(trade.tradeId, trade);
});
// Tries to attach `row` to `tradeId`. Refuses when the trade is unknown, the fill is on
// the wrong side, the fill is too old for the trade, or the trade has nothing left to
// explain. On success the whole fill quantity is deducted (clamped at 0).
const assign = (tradeId: string, row: ExchangeFillEvidence, matchedBy: MatchMethod): boolean => {
const trade = tradeById.get(tradeId);
if (!trade) return false;
const expectedSide = expectedExitSide(trade.entrySide);
if (row.side !== expectedSide) return false;
if (!isTradeTemporallyEligible(trade, row, options.fillAfterTradeGraceMs)) return false;
const remaining = toNumber(remainingByTrade.get(tradeId));
if (!(remaining > EPSILON)) return false;
const list = byTrade.get(tradeId) || [];
list.push({ ...row, matchedBy });
byTrade.set(tradeId, list);
remainingByTrade.set(tradeId, Math.max(0, remaining - row.qty));
return true;
};
// Working copy; the caller's array is never mutated.
const unassigned = [...evidence];
// Pass 1: explicit trade_id from exchange order payload.
// (Each pass iterates backwards so that splice() does not shift indices still to be visited.)
for (let i = unassigned.length - 1; i >= 0; i--) {
const row = unassigned[i];
const hinted = String(row.tradeIdHint || '').trim();
if (!hinted) continue;
if (assign(hinted, row, 'trade_id')) {
unassigned.splice(i, 1);
}
}
// Pass 2: bytelyst sub-tag carries deterministic trade token.
for (let i = unassigned.length - 1; i >= 0; i--) {
const row = unassigned[i];
if (!row.subTag || !isBytelystSubTag(row.subTag)) continue;
if (!subTagBelongsToProfile(row.subTag, profileId)) continue;
const matchingTrades = trades.filter((trade) => subTagHintsTrade(row.subTag, trade.tradeId));
// Ambiguous (or no) sub-tag match: leave the fill for later passes.
if (matchingTrades.length !== 1) continue;
if (assign(matchingTrades[0].tradeId, row, 'sub_tag')) {
unassigned.splice(i, 1);
}
}
// Pass 3: client_order_id contains trade_id marker.
for (let i = unassigned.length - 1; i >= 0; i--) {
const row = unassigned[i];
const resolved = parseTradeFromClientOrderId(profileId, row.clientOrderId);
if (!resolved?.tradeId) continue;
if (assign(resolved.tradeId, row, 'client_order_id')) {
unassigned.splice(i, 1);
}
}
// Pass 4: if multiple trades are open, assign only when a unique qty match exists.
if (options.allowHeuristicMatch) {
for (let i = unassigned.length - 1; i >= 0; i--) {
const row = unassigned[i];
const candidates = trades.filter((trade) => {
if (row.side !== expectedExitSide(trade.entrySide)) return false;
if (!isTradeTemporallyEligible(trade, row, options.fillAfterTradeGraceMs)) return false;
const remaining = toNumber(remainingByTrade.get(trade.tradeId));
if (!(remaining > EPSILON)) return false;
// The fill must equal the trade's remaining quantity to within the dust threshold.
const threshold = dustThresholdForTrade(remaining);
return Math.abs(remaining - row.qty) <= threshold;
});
if (candidates.length !== 1) continue;
if (assign(candidates[0].tradeId, row, 'qty_unique')) {
unassigned.splice(i, 1);
}
}
// Pass 5: safe fallback for single-trade symbols.
// NOTE(review): this pass sits inside the allowHeuristicMatch block, so it is skipped
// when heuristics are disabled — confirm that is intended.
if (trades.length === 1 && unassigned.length > 0) {
const tradeId = trades[0].tradeId;
for (let i = unassigned.length - 1; i >= 0; i--) {
const row = unassigned[i];
if (assign(tradeId, row, 'single_open_trade')) {
unassigned.splice(i, 1);
}
}
}
}
unmatched.push(...unassigned);
return { byTrade, unmatched };
};
/**
 * Finds trades that are still open in the local order ledger although the exchange has
 * already filled their exits, and proposes (dry run) or inserts (apply) the missing
 * EXIT order rows. Every decision is recorded in the reconciliation backfill audit table.
 */
export class ReconciliationExitBackfillService {
  /**
   * Runs the EXIT backfill for one profile.
   *
   * Flow:
   *  1. Preconditions: feature flag, optional profile allowlist, optional "bot must be
   *     paused" requirement, audit table availability. Any failure returns
   *     `attempted: false` with a `skippedReason`.
   *  2. Build open trades from filled lifecycle rows and keep only bot-managed symbols.
   *  3. Per symbol: fetch the exchange position and closed orders, scope the fills by
   *     sub-tag / attribution, and allocate them to trades via `allocateEvidence`.
   *  4. Per trade: propose one EXIT row per used fill; close a sub-threshold remainder
   *     with a synthetic dust row when the exchange position is flat; otherwise record a
   *     NO_GO or advisory audit row.
   *  5. Persist the pre-apply audit rows; unless in dry-run mode, upsert the proposed
   *     orders and persist post-apply audit rows.
   *
   * Exceptions thrown by the repository or the executor are not caught here.
   *
   * @param ctx Profile id, fallback user id and the executor used to query the exchange.
   * @returns Counters describing what was proposed, inserted and blocked.
   */
  async runProfile(ctx: ReconciliationExitBackfillContext): Promise<ReconciliationExitBackfillResult> {
    const dryRun = Boolean(config.RECON_EXIT_BACKFILL_DRY_RUN);
    const requireStrongAttribution = Boolean(config.RECON_EXIT_BACKFILL_REQUIRE_STRONG_ATTRIBUTION);
    const allowHeuristicMatch = Boolean(config.RECON_EXIT_BACKFILL_ALLOW_HEURISTIC_MATCH);

    // A non-numeric config value would turn the grace window into NaN, which makes every
    // temporal comparison downstream false; fall back to the default instead.
    // NOTE(review): `||` also maps a configured 0 to the default — confirm that is intended.
    const defaultGraceMinutes = 5;
    const configuredGraceMinutes = Number(
      config.RECON_EXIT_BACKFILL_FILL_AFTER_TRADE_GRACE_MINUTES || defaultGraceMinutes
    );
    const graceMinutes = Number.isNaN(configuredGraceMinutes) ? defaultGraceMinutes : configuredGraceMinutes;
    const fillAfterTradeGraceMs = Math.max(0, Math.floor(graceMinutes) * 60 * 1000);

    const profileAllowlist = parseProfileAllowlist();
    const profileId = String(ctx.profileId || '').trim();

    // Result for runs that end before any candidate was evaluated.
    const emptyOutcome = (attempted: boolean, skippedReason?: string): ReconciliationExitBackfillResult => ({
      attempted,
      ...(skippedReason ? { skippedReason } : {}),
      dryRun,
      openTradeCandidates: 0,
      proposedRows: 0,
      insertedRows: 0,
      noGoTrades: 0,
      noGoReasonCounts: {},
      noGoSamples: []
    });

    if (!config.ENABLE_RECON_EXIT_BACKFILL) {
      return emptyOutcome(false, 'disabled');
    }
    if (profileAllowlist.size > 0 && !profileAllowlist.has(profileId)) {
      return emptyOutcome(false, 'profile_not_allowlisted');
    }
    if (config.RECON_EXIT_BACKFILL_REQUIRE_PAUSE && !healthTracker.isPaused()) {
      return emptyOutcome(false, 'pause_required');
    }

    // Nothing is proposed or written unless the audit trail can be recorded.
    const auditReady = await runtimeOrderRepository.isReconciliationBackfillAuditAvailable();
    if (!auditReady) {
      observabilityService.emitEvent({
        type: 'SYSTEM_ERROR',
        severity: 'ERROR',
        message: `Reconciliation EXIT backfill aborted: audit table missing for profile ${profileId}`,
        profileId
      });
      return emptyOutcome(false, 'audit_table_missing');
    }

    const lifecycleRows = await runtimeOrderRepository.getFilledLifecycleOrdersForProfile(profileId);
    const openTrades = buildOpenTradeSlices(profileId, lifecycleRows);
    const managedSymbolTokens = buildManagedBotSymbolTokenSet();
    const scopedOpenTrades = openTrades.filter(
      (trade) => managedSymbolTokens.has(trade.symbolToken) || isManagedBotSymbol(trade.symbol, managedSymbolTokens)
    );
    if (openTrades.length !== scopedOpenTrades.length) {
      logger.info('[ReconcileBackfill] Skipped non-bot open trades', {
        event: 'reconciliation_backfill_symbol_scope_filtered',
        profileId,
        dropped: openTrades.length - scopedOpenTrades.length
      });
    }
    if (scopedOpenTrades.length === 0) {
      return emptyOutcome(true);
    }

    // Trades with an order still pending locally are never backfilled.
    const pendingRows = await runtimeOrderRepository.getOpenOrdersForProfile(profileId);
    const pendingTradeIds = new Set(
      (pendingRows || [])
        .map((row) => String((row as any)?.trade_id || '').trim())
        .filter(Boolean)
    );

    const batchId = `RECON-BFILL-${randomUUID()}`;
    const proposedRows: ProposedBackfillCandidate[] = [];
    const noGoAuditRows: ReconciliationBackfillAuditInsert[] = [];
    const advisoryAuditRows: ReconciliationBackfillAuditInsert[] = [];

    // Result for runs that reached candidate evaluation.
    const buildOutcome = (inserted: number): ReconciliationExitBackfillResult => ({
      attempted: true,
      batchId,
      dryRun,
      openTradeCandidates: scopedOpenTrades.length,
      proposedRows: proposedRows.length,
      insertedRows: inserted,
      noGoTrades: noGoAuditRows.length,
      noGoReasonCounts: summarizeNoGoReasons(noGoAuditRows),
      noGoSamples: noGoSamples(noGoAuditRows)
    });

    const tradesBySymbol = new Map<string, OpenTradeSlice[]>();
    for (const trade of scopedOpenTrades) {
      const group = tradesBySymbol.get(trade.symbolToken) || [];
      group.push(trade);
      tradesBySymbol.set(trade.symbolToken, group);
    }

    // Loop-invariant configuration, read once.
    const flatQtyThreshold = Math.max(0, toNumber(config.RECON_EXIT_BACKFILL_DUST_ABS_QTY));
    const fetchLimitPerPage = Math.max(
      1,
      Math.min(500, Math.floor(Number(config.RECON_ORDER_COVERAGE_FETCH_LIMIT_PER_PAGE || 500)))
    );
    const fetchMaxPages = Math.max(
      1,
      Math.min(100, Math.floor(Number(config.RECON_ORDER_COVERAGE_MAX_FETCH_PAGES || 8)))
    );

    for (const tradesForSymbol of tradesBySymbol.values()) {
      const canonicalSymbol = tradesForSymbol[0].symbol;
      const symbolKey = tradesForSymbol[0].symbolToken;

      const exchangePosition = await ctx.executor.fetchExchangePosition(canonicalSymbol);
      const exchangeOpenQty = positionQty(exchangePosition);
      const symbolIsFlat = exchangeOpenQty <= flatQtyThreshold;

      const exchangeClosed = await ctx.executor.fetchExchangeClosedOrders(
        [canonicalSymbol],
        config.RECON_EXIT_BACKFILL_LOOKBACK_HOURS,
        { limitPerPage: fetchLimitPerPage, maxPages: fetchMaxPages }
      );
      const evidence = normalizeEvidenceOrders(symbolKey, exchangeClosed);

      // Drop counters are for logging only. They count DROPPED rows; a row with a
      // malformed sub-tag that is dropped for lack of attribution is counted in both
      // droppedMalformedSubTag and droppedUnattributed.
      let droppedForeignSubTag = 0;
      let droppedUnattributed = 0;
      let droppedMalformedSubTag = 0;
      const scopedEvidence = evidence.filter((row) => {
        const hasTradeHint = String(row.tradeIdHint || '').trim().length > 0;
        const hasClientCorrelation = Boolean(parseTradeFromClientOrderId(profileId, row.clientOrderId)?.tradeId);
        if (row.subTag) {
          if (!isBytelystSubTag(row.subTag)) {
            // A sub-tag that is not ours is tolerated only when attribution is not
            // required or another identifier ties the fill to this profile.
            const keep = !requireStrongAttribution || hasTradeHint || hasClientCorrelation;
            if (!keep) {
              droppedMalformedSubTag += 1;
              droppedUnattributed += 1;
            }
            return keep;
          }
          if (!subTagBelongsToProfile(row.subTag, profileId)) {
            droppedForeignSubTag += 1;
            return false;
          }
        }
        const attributed = hasTradeHint
          || hasClientCorrelation
          || Boolean(row.subTag && isBytelystSubTag(row.subTag) && subTagBelongsToProfile(row.subTag, profileId));
        if (requireStrongAttribution && !attributed) {
          droppedUnattributed += 1;
          return false;
        }
        return true;
      });

      const droppedEvidence = evidence.length - scopedEvidence.length;
      if (droppedEvidence > 0) {
        logger.info('[ReconcileBackfill] Scoped exchange evidence by attribution/sub-tag', {
          event: 'reconciliation_backfill_subtag_scope',
          profileId,
          symbol: canonicalSymbol,
          totalEvidence: evidence.length,
          keptEvidence: scopedEvidence.length,
          droppedEvidence,
          droppedForeignSubTag,
          droppedMalformedSubTag,
          droppedUnattributed,
          requireStrongAttribution
        });
      }

      const allocation = allocateEvidence(tradesForSymbol, scopedEvidence, profileId, {
        allowHeuristicMatch,
        fillAfterTradeGraceMs
      });

      for (const trade of tradesForSymbol) {
        if (pendingTradeIds.has(trade.tradeId)) {
          advisoryAuditRows.push({
            batch_id: batchId,
            profile_id: profileId,
            symbol: trade.symbol,
            trade_id: trade.tradeId,
            dry_run: dryRun,
            decision: 'SKIP_PENDING_ORDER',
            reason: 'pending_order_blocker',
            metadata: {
              openQty: trade.openQty
            }
          });
          continue;
        }

        // Apply fills oldest first until the open quantity is explained.
        const assigned = (allocation.byTrade.get(trade.tradeId) || [])
          .sort((a, b) => a.filledAtMs - b.filledAtMs);
        const threshold = dustThresholdForTrade(trade.openQty);
        let remaining = trade.openQty;
        let usedEvidenceRows = 0;

        for (const row of assigned) {
          if (!(remaining > EPSILON)) break;
          // A fill larger than what is left only closes what is left.
          const applyQty = Math.min(remaining, row.qty);
          if (!(applyQty > EPSILON)) continue;
          const roundedQty = Number(applyQty.toFixed(8));
          proposedRows.push({
            order: {
              user_id: trade.userId || ctx.userId,
              profile_id: profileId,
              order_id: buildBackfillOrderId(profileId, trade.tradeId, row.exchangeOrderId, row.filledAtIso),
              symbol: trade.symbol,
              type: 'market',
              side: row.side,
              qty: roundedQty,
              quantity: roundedQty,
              price: Number(row.price.toFixed(8)),
              status: 'filled',
              timestamp: row.filledAtMs,
              filled_at: row.filledAtIso,
              trade_id: trade.tradeId,
              action: 'EXIT',
              source: 'BOT',
              sub_tag: row.subTag || undefined
            },
            exchangeOrderId: row.exchangeOrderId,
            exchangeClientOrderId: row.clientOrderId || undefined,
            exchangeSubTag: row.subTag || undefined,
            matchedBy: row.matchedBy
          });
          remaining = Number((remaining - applyQty).toFixed(8));
          usedEvidenceRows += 1;
        }

        if (!(remaining > EPSILON)) continue;

        if (remaining <= threshold && symbolIsFlat) {
          // The exchange is flat and only dust is left: close it with a synthetic row
          // stamped with the last assigned fill (or the trade's latest activity).
          const lastAssigned = assigned[assigned.length - 1];
          const dustFilledAtIso = lastAssigned ? lastAssigned.filledAtIso : toIso(trade.latestTimestampMs);
          const dustTs = lastAssigned ? lastAssigned.filledAtMs : trade.latestTimestampMs;
          const dustExchangeOrderId = `DUST-REMAINDER-${trade.tradeId}`;
          const dustQty = Number(remaining.toFixed(8));
          proposedRows.push({
            order: {
              user_id: trade.userId || ctx.userId,
              profile_id: profileId,
              order_id: buildBackfillOrderId(profileId, trade.tradeId, dustExchangeOrderId, dustFilledAtIso),
              symbol: trade.symbol,
              type: 'market',
              side: expectedExitSide(trade.entrySide),
              qty: dustQty,
              quantity: dustQty,
              // Price is 0 when no fill was assigned to the trade.
              price: Number((lastAssigned?.price || 0).toFixed(8)),
              status: 'filled',
              timestamp: dustTs,
              filled_at: dustFilledAtIso,
              trade_id: trade.tradeId,
              action: 'EXIT',
              source: 'BOT',
              sub_tag: lastAssigned?.subTag || undefined
            },
            exchangeOrderId: dustExchangeOrderId,
            exchangeSubTag: lastAssigned?.subTag || undefined,
            matchedBy: 'dust'
          });
          continue;
        }

        const reason = !symbolIsFlat && usedEvidenceRows === 0
          ? `exchange_not_flat:${exchangeOpenQty}`
          : (!symbolIsFlat && remaining <= threshold)
            ? 'dust_remainder_blocked_exchange_not_flat'
            : 'missing_fill_evidence_for_large_remainder';
        // A position that is still live on the exchange is advisory, not a NO_GO.
        const isAdvisoryBlockedActive =
          reason.startsWith('exchange_not_flat:')
          || reason === 'dust_remainder_blocked_exchange_not_flat';
        const destination = isAdvisoryBlockedActive ? advisoryAuditRows : noGoAuditRows;
        destination.push({
          batch_id: batchId,
          profile_id: profileId,
          symbol: trade.symbol,
          trade_id: trade.tradeId,
          dry_run: dryRun,
          decision: isAdvisoryBlockedActive ? 'SKIP_ACTIVE_POSITION' : 'NO_GO',
          reason,
          metadata: {
            openQty: trade.openQty,
            evidenceRows: assigned.length,
            evidenceRowsUsed: usedEvidenceRows,
            remaining,
            dustThreshold: threshold,
            unmatchedEvidenceCount: allocation.unmatched.length,
            exchangeOpenQty,
            symbolIsFlat,
            requireStrongAttribution,
            allowHeuristicMatch,
            fillAfterTradeGraceMs
          }
        });
      }
    }

    const proposedOrderIds = proposedRows.map((row) => row.order.order_id);
    const existingBefore = await runtimeOrderRepository.getExistingOrderIds(proposedOrderIds, profileId);

    // Audit fields shared by the pre-apply and post-apply row of a proposed order.
    const describeCandidate = (row: ProposedBackfillCandidate) => ({
      batch_id: batchId,
      profile_id: profileId,
      symbol: row.order.symbol,
      trade_id: row.order.trade_id,
      exchange_order_id: row.exchangeOrderId,
      exchange_client_order_id: row.exchangeClientOrderId || null,
      backfill_order_id: row.order.order_id,
      filled_qty: row.order.qty,
      filled_price: row.order.price,
      filled_at: row.order.filled_at || null,
      metadata: {
        action: row.order.action,
        status: row.order.status,
        matchedBy: row.matchedBy,
        exchangeSubTag: row.exchangeSubTag || null
      }
    });

    const baseAuditRows: ReconciliationBackfillAuditInsert[] = proposedRows.map((row) => ({
      ...describeCandidate(row),
      dry_run: dryRun,
      decision: dryRun ? 'DRY_RUN' : 'PENDING_APPLY',
      reason: existingBefore.has(row.order.order_id) ? 'already_exists' : 'eligible'
    }));

    // The audit trail is written before anything is applied; if that fails, nothing is applied.
    const preAuditRows = [...baseAuditRows, ...noGoAuditRows, ...advisoryAuditRows];
    const preAuditSaved = await runtimeOrderRepository.insertReconciliationBackfillAuditRows(preAuditRows);
    if (!preAuditSaved) {
      return buildOutcome(0);
    }

    let insertedRows = 0;
    if (!dryRun && proposedRows.length > 0) {
      const applyOk = await runtimeOrderRepository.upsertReconciliationBackfillOrders(
        proposedRows.map((row) => row.order)
      );
      if (!applyOk) {
        observabilityService.emitEvent({
          type: 'SYSTEM_ERROR',
          severity: 'ERROR',
          message: `Reconciliation EXIT backfill apply failed for profile ${profileId}. Batch ${batchId}`,
          profileId
        });
        return buildOutcome(0);
      }

      // Count only rows that were absent before the upsert and present after it.
      const existingAfter = await runtimeOrderRepository.getExistingOrderIds(proposedOrderIds, profileId);
      insertedRows = proposedRows.filter(
        (row) => !existingBefore.has(row.order.order_id) && existingAfter.has(row.order.order_id)
      ).length;

      // NOTE(review): 'APPLIED' is derived from existingBefore only; a row missing from
      // existingAfter is still recorded as applied — confirm whether that is acceptable.
      const appliedAtIso = new Date().toISOString();
      const postAuditRows: ReconciliationBackfillAuditInsert[] = proposedRows.map((row) => {
        const alreadyExisted = existingBefore.has(row.order.order_id);
        return {
          ...describeCandidate(row),
          dry_run: false,
          decision: alreadyExisted ? 'SKIP_EXISTING' : 'APPLIED',
          reason: alreadyExisted ? 'already_exists' : 'inserted',
          applied_at: alreadyExisted ? null : appliedAtIso
        };
      });
      const postSaved = await runtimeOrderRepository.insertReconciliationBackfillAuditRows(postAuditRows);
      if (!postSaved) {
        logger.error(`[ReconcileBackfill] Failed to persist post-apply audit rows for batch ${batchId}`);
      }
    }

    logger.info('[ReconcileBackfill] EXIT backfill evaluated', {
      event: 'reconciliation_exit_backfill',
      profileId,
      batchId,
      dryRun,
      openTradeCandidates: scopedOpenTrades.length,
      proposedRows: proposedRows.length,
      insertedRows,
      noGoTrades: noGoAuditRows.length,
      advisoryBlockedTrades: advisoryAuditRows.length
    });
    return buildOutcome(insertedRows);
  }
}
// Module-level instance of the service, constructed when this module is first loaded.
export const reconciliationExitBackfillService = new ReconciliationExitBackfillService();