learning_ai_invt_trdg/backend/src/services/reconciliationService.ts

506 lines
24 KiB
TypeScript

import { randomUUID } from 'crypto';
import logger from '../utils/logger.js';
import { TradeExecutor } from './TradeExecutor.js';
import { distributedLockService } from './distributedLockService.js';
import { healthTracker } from './healthTracker.js';
import { observabilityService } from './observabilityService.js';
import { normalizeOrderAction } from '../domain/tradingEnums.js';
import { reconciliationOrderCoverageService } from './reconciliationOrderCoverageService.js';
import { reconciliationExitBackfillService } from './reconciliationExitBackfillService.js';
import { reconciliationSubTagRepairService } from './reconciliationSubTagRepairService.js';
import { reconciliationParityHeartbeatService } from './reconciliationParityHeartbeatService.js';
import { buildManagedBotSymbolTokenSet, isManagedBotSymbol } from '../utils/botSymbolScope.js';
import { extractOrderSubTag } from '../utils/alpacaSubTag.js';
import { config } from '../config/index.js';
import * as runtimeOrderRepository from './runtimeOrderRepository.js';
/**
 * Inputs required to reconcile a single profile.
 */
export interface ReconciliationContext {
/** Profile being reconciled; used as the distributed-lock key and in log/event payloads. */
profileId: string;
/** User id attached to emitted events, log entries and logged order rows. */
userId: string;
/** Executor used to fetch the exchange's open orders and to apply fill/cancel corrections. */
executor: TradeExecutor;
/** Optional symbol list; this file only forwards it unchanged to the parity heartbeat step. */
monitoredSymbols?: string[];
}
/**
 * Summary of one reconciliation pass for a profile.
 */
export interface ReconciliationResult {
/** False when the reconciliation lock could not be acquired or the pass failed with an error. */
processed: boolean;
/**
 * Number of status corrections: open DB orders whose exchange status differs, plus
 * exchange open orders that matched a recently closed DB order.
 */
mismatchCount: number;
/** Open DB orders for which no matching exchange open order was found. */
missingFromExchange: number;
/**
 * Exchange open orders with no DB match, plus coverage-step orders reported missing
 * that were not inserted.
 */
missingInDb: number;
/** No-go trade count copied from the EXIT backfill step. */
noGoTrades: number;
/** Per-reason no-go counts copied from the EXIT backfill step. */
noGoReasonCounts: Record<string, number>;
/** No-go samples from the EXIT backfill step, stamped with the profile id. */
noGoSamples: Array<{
profileId: string;
symbol: string;
tradeId: string;
reason: string;
}>;
/** The parity* fields are copied from the parity heartbeat step when it was attempted; otherwise 0. */
parityMismatchTrades: number;
parityQuarantinedTrades: number;
parityAutoClosedTrades: number;
parityMaxMismatchNotionalUsd: number;
parityTotalMismatchNotionalUsd: number;
/** True when the integrity watchdog thresholds were met or the parity heartbeat reported a trigger. */
integrityWatchdogTriggered: boolean;
/** Failure message; only set when the pass threw. */
error?: string;
}
/**
 * Maps a raw order status onto a canonical form so DB and exchange statuses
 * can be compared directly.
 *
 * The input is trimmed and lower-cased, then aliases are collapsed:
 * `new` / `accepted` / `pending` become `pending_new`, `partially-filled`
 * becomes `partially_filled`, and `cancelled` becomes `canceled`.
 * Any other status is returned as-is (trimmed, lower-cased).
 *
 * @param statusRaw Raw status value; may be missing.
 * @returns The canonical status, or an empty string when no status is present.
 */
const normalizeComparableStatus = (statusRaw?: string | null): string => {
    const canonical = String(statusRaw || '').trim().toLowerCase();
    switch (canonical) {
        case 'new':
        case 'accepted':
        case 'pending':
            return 'pending_new';
        case 'partially-filled':
            return 'partially_filled';
        case 'cancelled':
        case 'canceled':
            return 'canceled';
        default:
            // Covers the empty string as well as every status without an alias.
            return canonical;
    }
};
/**
 * Collects every identifier an order carries, in priority order, normalised
 * (trimmed, lower-cased) for case-insensitive matching.
 *
 * Priority: `order_id`, `id`, `client_order_id`, `clientOrderId`, `trade_id`, `tradeId`.
 * Missing, empty or otherwise falsy identifiers are skipped; duplicates are kept.
 *
 * @param order Order-like object from the DB or the exchange.
 * @returns The normalised identifiers, or an empty array when `order` is falsy.
 */
const identifyOrderKeys = (order: any): string[] => {
    if (!order) return [];
    const fieldsByPriority = ['order_id', 'id', 'client_order_id', 'clientOrderId', 'trade_id', 'tradeId'];
    const keys: string[] = [];
    for (const field of fieldsByPriority) {
        const normalized = String(order[field] || '').trim().toLowerCase();
        if (normalized) keys.push(normalized);
    }
    return keys;
};
/**
 * Classifies an order as an ENTRY or an EXIT.
 *
 * The order's explicit `action` wins when `normalizeOrderAction` returns a
 * truthy value for it. Otherwise the `side` decides: `sell` / `short`
 * (case-insensitive, trimmed) is treated as EXIT and everything else,
 * including a missing side, as ENTRY.
 *
 * @param order Order-like object from the DB or the exchange.
 * @returns `'ENTRY'` or `'EXIT'`.
 */
const determineAction = (order: any): 'ENTRY' | 'EXIT' => {
    const explicitAction = normalizeOrderAction(order?.action);
    if (explicitAction) return explicitAction;
    const normalizedSide = String(order?.side || '').trim().toLowerCase();
    const isExitSide = normalizedSide === 'sell' || normalizedSide === 'short';
    return isExitSide ? 'EXIT' : 'ENTRY';
};
/**
 * Resolves the quantity to use when reconciling a fill.
 *
 * Fields are probed in order (`filled_qty`, `filledQty`, `filled_quantity`,
 * `qty`, `amount`, `size`) and the first one that converts to a finite,
 * strictly positive number wins. The trailing fields are the order's size
 * fields, used as a fallback when no filled-quantity field is usable.
 *
 * @param order Order-like object from the DB or the exchange.
 * @returns The resolved quantity, or 0 when no field qualifies.
 */
const determineFillQty = (order: any): number => {
    const rawValues = [
        order?.filled_qty,
        order?.filledQty,
        order?.filled_quantity,
        order?.qty,
        order?.amount,
        order?.size
    ];
    let resolvedQty = 0;
    // `some` stops at the first usable value, so later fields are never converted.
    rawValues.some((raw) => {
        const numeric = Number(raw);
        const usable = Number.isFinite(numeric) && numeric > 0;
        if (usable) resolvedQty = numeric;
        return usable;
    });
    return resolvedQty;
};
/**
 * Resolves the price to use when reconciling a fill.
 *
 * Fields are probed in order (`filled_avg_price`, `avg_price`, `price`,
 * `requestedPrice`, `requested_price`) and the first one that converts to a
 * finite, strictly positive number wins.
 *
 * @param order Order-like object from the DB or the exchange.
 * @returns The resolved price, or 0 when no field qualifies.
 */
const determineFillPrice = (order: any): number => {
    const rawValues = [
        order?.filled_avg_price,
        order?.avg_price,
        order?.price,
        order?.requestedPrice,
        order?.requested_price
    ];
    let resolvedPrice = 0;
    // `some` stops at the first usable value, so later fields are never converted.
    rawValues.some((raw) => {
        const numeric = Number(raw);
        const usable = Number.isFinite(numeric) && numeric > 0;
        if (usable) resolvedPrice = numeric;
        return usable;
    });
    return resolvedPrice;
};
/**
 * Reads an order's symbol, preferring `symbol` over `symbol_name`.
 *
 * @param order Order-like object from the DB or the exchange.
 * @returns The trimmed symbol, or an empty string when neither field is set.
 */
const extractOrderSymbol = (order: any): string => {
    const rawSymbol = order?.symbol || order?.symbol_name || '';
    return String(rawSymbol).trim();
};
/**
 * Runs a reconciliation pass for a single trading profile.
 *
 * A pass compares the locally persisted open / recently closed orders with the
 * exchange's open orders, applies corrections through the profile's executor,
 * and then runs the follow-up steps in a fixed order: order coverage, EXIT
 * backfill (plus integrity watchdog), sub-tag repair and the parity heartbeat.
 * The whole pass is guarded by a per-profile distributed lock.
 */
export class ReconciliationService {
    /** Epoch-ms timestamp of the last "safety step skipped" alert, keyed by alert scope. */
    private skippedSafetyStepAlertByScope = new Map<string, number>();
    /** Epoch-ms timestamp of the last integrity-watchdog alert, keyed by profile id. */
    private integrityWatchdogLastEmittedAt = new Map<string, number>();

    /**
     * Parses a numeric configuration value.
     *
     * Falsy values (unset, `0`, `''`) resolve to `fallback`, matching the
     * `value || fallback` convention used for these settings. Values that do
     * not convert to a finite number also resolve to `fallback`, so a malformed
     * setting can no longer turn a threshold or throttle into `NaN` (which
     * would make every comparison against it false).
     */
    private static resolveNumericSetting(raw: unknown, fallback: number): number {
        const parsed = Number(raw || fallback);
        return Number.isFinite(parsed) ? parsed : fallback;
    }

    /** Extracts a printable message from an arbitrary thrown value. */
    private static describeError(error: unknown): string {
        const candidate = error as { message?: unknown } | null | undefined;
        return String(candidate?.message || error || 'unknown_error');
    }

    /** Builds a result with every counter zeroed and the given `processed` flag. */
    private static createEmptyResult(processed: boolean): ReconciliationResult {
        return {
            processed,
            mismatchCount: 0,
            missingFromExchange: 0,
            missingInDb: 0,
            noGoTrades: 0,
            noGoReasonCounts: {},
            noGoSamples: [],
            parityMismatchTrades: 0,
            parityQuarantinedTrades: 0,
            parityAutoClosedTrades: 0,
            parityMaxMismatchNotionalUsd: 0,
            parityTotalMismatchNotionalUsd: 0,
            integrityWatchdogTriggered: false
        };
    }

    /**
     * Decides whether a "safety step skipped" alert may be emitted for `scope`,
     * and records the emission time when it may.
     *
     * @returns `false` while the previous alert for the same scope is younger
     *          than `RECONCILIATION_SLO_ALERT_THROTTLE_MS` (default 600 000 ms).
     */
    private shouldEmitSkippedSafetyStepAlert(scope: string): boolean {
        const key = String(scope || 'global').trim() || 'global';
        const now = Date.now();
        const throttleMs = Math.max(
            0,
            ReconciliationService.resolveNumericSetting(config.RECONCILIATION_SLO_ALERT_THROTTLE_MS, 600_000)
        );
        const last = this.skippedSafetyStepAlertByScope.get(key) || 0;
        if (throttleMs > 0 && (now - last) < throttleMs) return false;
        this.skippedSafetyStepAlertByScope.set(key, now);
        return true;
    }

    /**
     * Reconciles one profile end to end.
     *
     * @param ctx Profile, user and executor to reconcile with.
     * @returns The pass summary. `processed` is `false` when the lock was held
     *          by someone else or the pass failed; in the failure case `error`
     *          carries the message and the counters reflect the work done
     *          before the failure.
     */
    async reconcileProfile(ctx: ReconciliationContext): Promise<ReconciliationResult> {
        const lockOwner = `${process.pid}-${randomUUID()}`;
        // The third argument is passed to the lock service as-is
        // (presumably a lease/TTL value; confirm against distributedLockService).
        const acquired = await distributedLockService.tryAcquireReconciliationLock(ctx.profileId, lockOwner, 30);
        if (!acquired) {
            healthTracker.incrementReconciliationLockContention();
            observabilityService.incrementReconciliationLockContention();
            return ReconciliationService.createEmptyResult(false);
        }
        const result = ReconciliationService.createEmptyResult(true);
        try {
            // Step order matters: later steps read counters written by earlier ones.
            await this.reconcileOrderBooks(ctx, result);
            await this.runOrderCoverageStep(ctx, result);
            await this.runExitBackfillStep(ctx, result);
            await this.runSubTagRepairStep(ctx);
            await this.runParityHeartbeatStep(ctx, result);
            return result;
        } catch (error: unknown) {
            const message = ReconciliationService.describeError(error);
            logger.error(`[Reconcile] Profile ${ctx.profileId} failed: ${message}`);
            observabilityService.emitEvent({
                type: 'RECONCILIATION_DEGRADED',
                severity: 'ERROR',
                message: `Reconciliation failed for profile ${ctx.profileId}: ${message}`,
                profileId: ctx.profileId,
                userId: ctx.userId
            });
            return {
                ...result,
                processed: false,
                error: message
            };
        } finally {
            // A throw from `finally` would replace the value returned above, so a
            // failed release is logged instead of discarding the computed result.
            try {
                await distributedLockService.releaseReconciliationLock(ctx.profileId, lockOwner);
            } catch (releaseError: unknown) {
                logger.error(
                    `[Reconcile] Failed to release reconciliation lock for profile ${ctx.profileId}: ${ReconciliationService.describeError(releaseError)}`
                );
            }
        }
    }

    /**
     * Compares DB orders with exchange open orders (restricted to managed bot
     * symbols) and applies a correction for every difference found.
     * Updates `mismatchCount`, `missingFromExchange` and `missingInDb` on `result`.
     */
    private async reconcileOrderBooks(ctx: ReconciliationContext, result: ReconciliationResult): Promise<void> {
        const [dbOpenOrders, dbClosedOrders] = await Promise.all([
            runtimeOrderRepository.getOpenOrdersForProfile(ctx.profileId),
            runtimeOrderRepository.getRecentlyClosedOrdersForProfile(ctx.profileId, 10)
        ]);
        const exchangeOrders = await ctx.executor.fetchExchangeOpenOrders();
        const rawDbOpenOrders = dbOpenOrders || [];
        const rawDbClosedOrders = dbClosedOrders || [];
        const rawExchangeOrders = exchangeOrders || [];

        const managedSymbolTokens = buildManagedBotSymbolTokenSet();
        const inManagedScope = (order: any): boolean => {
            const symbol = extractOrderSymbol(order);
            // Orders without a symbol cannot be classified, so they stay in scope.
            if (!symbol) return true;
            return isManagedBotSymbol(symbol, managedSymbolTokens);
        };
        const scopedDbOpenOrders = rawDbOpenOrders.filter((order) => inManagedScope(order));
        const scopedDbClosedOrders = rawDbClosedOrders.filter((order) => inManagedScope(order));
        const scopedExchangeOrders = rawExchangeOrders.filter((order) => inManagedScope(order));
        const droppedCount = (rawDbOpenOrders.length - scopedDbOpenOrders.length)
            + (rawDbClosedOrders.length - scopedDbClosedOrders.length)
            + (rawExchangeOrders.length - scopedExchangeOrders.length);
        if (droppedCount > 0) {
            logger.info('[Reconcile] Skipped non-bot symbols during parity reconciliation', {
                event: 'reconciliation_symbol_scope_filtered',
                profileId: ctx.profileId,
                dropped: droppedCount
            });
        }

        const exchangeLookup = this.indexByUniqueKey(scopedExchangeOrders);
        const exchangeHandled = new Set<string>();
        const closedLookup = this.indexByUniqueKey(scopedDbClosedOrders);
        const closedHandled = new Set<string>();

        // Pass 1: every open DB order should have a counterpart on the exchange.
        for (const dbOrder of scopedDbOpenOrders) {
            const match = this.findMatchingOrder(dbOrder, exchangeLookup, exchangeHandled);
            if (match) {
                const normalizedExchangeStatus = normalizeComparableStatus(match.status);
                const normalizedDbStatus = normalizeComparableStatus(dbOrder.status);
                if (normalizedExchangeStatus && normalizedExchangeStatus !== normalizedDbStatus) {
                    result.mismatchCount += 1;
                    await this.processStatusChange(ctx, dbOrder, match, normalizedExchangeStatus);
                }
            } else {
                result.missingFromExchange += 1;
                await this.handleMissingExchange(ctx, dbOrder);
            }
        }

        // Pass 2: exchange orders not claimed in pass 1 are either open on the exchange
        // while recorded as recently closed locally, or unknown to the DB altogether.
        for (const exchangeOrder of scopedExchangeOrders) {
            const key = this.identifyUniqueKey(exchangeOrder);
            if (!key || exchangeHandled.has(key)) continue;
            const closedMatch = this.matchOrderWithoutHandling(exchangeOrder, closedLookup, closedHandled);
            const normalizedStatus = normalizeComparableStatus(exchangeOrder.status);
            if (closedMatch) {
                result.mismatchCount += 1;
                await this.processStatusChange(ctx, closedMatch, exchangeOrder, normalizedStatus);
                continue;
            }
            result.missingInDb += 1;
            await this.handleExchangeOnly(ctx, exchangeOrder);
        }
    }

    /**
     * Emits a throttled `RECONCILIATION_DEGRADED` event for a follow-up step that
     * reported it was skipped. A `disabled` reason is raised as ERROR, any other
     * reason as WARN.
     */
    private reportSkippedSafetyStep(
        ctx: ReconciliationContext,
        stepKey: string,
        stepLabel: string,
        skippedReason: string
    ): void {
        if (!this.shouldEmitSkippedSafetyStepAlert(`${ctx.profileId}:${stepKey}:${skippedReason}`)) return;
        observabilityService.emitEvent({
            type: 'RECONCILIATION_DEGRADED',
            severity: skippedReason === 'disabled' ? 'ERROR' : 'WARN',
            message: `${stepLabel} skipped (${skippedReason}) for profile ${ctx.profileId}.`,
            profileId: ctx.profileId,
            userId: ctx.userId
        });
    }

    /**
     * Runs the order-coverage step and adds the orders it reported missing but
     * did not insert to `result.missingInDb`.
     */
    private async runOrderCoverageStep(ctx: ReconciliationContext, result: ReconciliationResult): Promise<void> {
        const coverageResult = await reconciliationOrderCoverageService.runProfile(ctx);
        if (!coverageResult.attempted && coverageResult.skippedReason) {
            this.reportSkippedSafetyStep(ctx, 'coverage', 'Order coverage reconciliation', coverageResult.skippedReason);
        }
        if (!coverageResult.attempted) return;

        const hasFindings = coverageResult.missingInDb > 0
            || coverageResult.insertedRows > 0
            || coverageResult.skippedUnmappedTrade > 0
            || coverageResult.skippedUnmappedAction > 0;
        if (hasFindings) {
            logger.warn('[Reconcile] Missing-order coverage evaluated', {
                event: 'reconciliation_order_coverage_evaluated',
                profileId: ctx.profileId,
                userId: ctx.userId,
                dryRun: coverageResult.dryRun,
                scannedOrders: coverageResult.scannedOrders,
                filledLikeOrders: coverageResult.filledLikeOrders,
                botOwnedOrders: coverageResult.botOwnedOrders,
                eligibleOrders: coverageResult.eligibleOrders,
                missingInDb: coverageResult.missingInDb,
                insertedRows: coverageResult.insertedRows,
                skippedNotBotOwned: coverageResult.skippedNotBotOwned,
                skippedNotBotOwnedActionable: coverageResult.skippedNotBotOwnedActionable,
                skippedNotBotOwnedLegacyInDb: coverageResult.skippedNotBotOwnedLegacyInDb,
                skippedNotBotOwnedBeforeBaseline: coverageResult.skippedNotBotOwnedBeforeBaseline,
                skippedUnmappedTrade: coverageResult.skippedUnmappedTrade,
                skippedUnmappedAction: coverageResult.skippedUnmappedAction,
                skippedMissingFillData: coverageResult.skippedMissingFillData,
                skippedMissingOrderId: coverageResult.skippedMissingOrderId,
                skippedExisting: coverageResult.skippedExisting,
                skippedMaxInsertLimit: coverageResult.skippedMaxInsertLimit
            });
        }
        const unresolvedCoverageMissing = Math.max(0, coverageResult.missingInDb - coverageResult.insertedRows);
        result.missingInDb += unresolvedCoverageMissing;
    }

    /**
     * Runs the EXIT backfill step, copies its no-go figures onto `result`, and
     * evaluates the integrity watchdog against the accumulated `missingInDb`
     * count and the backfill's no-go count.
     */
    private async runExitBackfillStep(ctx: ReconciliationContext, result: ReconciliationResult): Promise<void> {
        const backfillResult = await reconciliationExitBackfillService.runProfile(ctx);
        if (!backfillResult.attempted && backfillResult.skippedReason) {
            this.reportSkippedSafetyStep(ctx, 'backfill', 'EXIT backfill reconciliation', backfillResult.skippedReason);
        }
        if (backfillResult.attempted && (backfillResult.proposedRows > 0 || backfillResult.noGoTrades > 0)) {
            logger.warn('[Reconcile] EXIT backfill evaluated', {
                event: 'reconciliation_exit_backfill_profile',
                profileId: ctx.profileId,
                userId: ctx.userId,
                batchId: backfillResult.batchId,
                dryRun: backfillResult.dryRun,
                openTradeCandidates: backfillResult.openTradeCandidates,
                proposedRows: backfillResult.proposedRows,
                insertedRows: backfillResult.insertedRows,
                noGoTrades: backfillResult.noGoTrades
            });
        }
        result.noGoTrades = backfillResult.noGoTrades;
        result.noGoReasonCounts = { ...(backfillResult.noGoReasonCounts || {}) };
        result.noGoSamples = (backfillResult.noGoSamples || []).map((sample) => ({
            profileId: ctx.profileId,
            symbol: String(sample?.symbol || '').trim(),
            tradeId: String(sample?.tradeId || '').trim(),
            reason: String(sample?.reason || '').trim() || 'unknown'
        }));
        result.integrityWatchdogTriggered = this.evaluateIntegrityWatchdog(ctx, result.missingInDb, backfillResult.noGoTrades);
        if (!backfillResult.attempted && backfillResult.skippedReason === 'audit_table_missing') {
            logger.error(`[Reconcile] EXIT backfill skipped for ${ctx.profileId}: audit_table_missing`);
        }
    }

    /** Runs the sub-tag repair step and logs when it reports itself unsupported. */
    private async runSubTagRepairStep(ctx: ReconciliationContext): Promise<void> {
        const subTagRepairResult = await reconciliationSubTagRepairService.runProfile({
            profileId: ctx.profileId,
            userId: ctx.userId
        });
        if (subTagRepairResult.unsupported) {
            logger.error(`[Reconcile] Sub-tag repair skipped for ${ctx.profileId}: unsupported_column`);
        }
    }

    /**
     * Runs the parity heartbeat step and copies its figures onto `result` when it
     * was attempted. A watchdog trigger reported by the heartbeat is OR-ed into
     * the flag already set by the integrity watchdog.
     */
    private async runParityHeartbeatStep(ctx: ReconciliationContext, result: ReconciliationResult): Promise<void> {
        const parityResult = await reconciliationParityHeartbeatService.runProfile({
            profileId: ctx.profileId,
            userId: ctx.userId,
            executor: ctx.executor,
            monitoredSymbols: ctx.monitoredSymbols
        });
        if (parityResult.attempted) {
            result.parityMismatchTrades = parityResult.mismatchTrades;
            result.parityQuarantinedTrades = parityResult.quarantinedTrades;
            result.parityAutoClosedTrades = parityResult.autoClosedTrades;
            result.parityMaxMismatchNotionalUsd = parityResult.maxMismatchNotionalUsd;
            result.parityTotalMismatchNotionalUsd = parityResult.totalMismatchNotionalUsd;
            result.integrityWatchdogTriggered = result.integrityWatchdogTriggered || parityResult.integrityWatchdogTriggered;
        } else if (parityResult.skippedReason) {
            logger.info(`[Reconcile] Parity heartbeat skipped for ${ctx.profileId}: ${parityResult.skippedReason}`);
        }
    }

    /**
     * Checks the missing-in-DB and no-go counts against their configured
     * thresholds (each at least 1) and raises a throttled ERROR event when
     * either is met.
     *
     * @returns `true` whenever a threshold is met, including when the alert
     *          itself was suppressed by the throttle; `false` when the watchdog
     *          is disabled or no threshold is met.
     */
    private evaluateIntegrityWatchdog(ctx: ReconciliationContext, missingInDb: number, noGoTrades: number): boolean {
        if (!config.ENABLE_RECON_INTEGRITY_WATCHDOG) return false;
        const missingDbThreshold = Math.max(
            1,
            ReconciliationService.resolveNumericSetting(config.RECON_INTEGRITY_WATCHDOG_MISSING_DB_THRESHOLD, 1)
        );
        const noGoThreshold = Math.max(
            1,
            ReconciliationService.resolveNumericSetting(config.RECON_INTEGRITY_WATCHDOG_NO_GO_THRESHOLD, 1)
        );
        const exceedsThreshold = missingInDb >= missingDbThreshold || noGoTrades >= noGoThreshold;
        if (!exceedsThreshold) return false;

        const key = String(ctx.profileId || '').trim() || 'global';
        const now = Date.now();
        const throttleMs = Math.max(
            0,
            ReconciliationService.resolveNumericSetting(config.RECON_INTEGRITY_WATCHDOG_THROTTLE_MS, 600_000)
        );
        const last = this.integrityWatchdogLastEmittedAt.get(key) || 0;
        if (throttleMs > 0 && (now - last) < throttleMs) {
            // Still triggered; only the duplicate alert is suppressed.
            return true;
        }
        this.integrityWatchdogLastEmittedAt.set(key, now);
        const message = `Reconciliation integrity watchdog triggered for ${ctx.profileId}: missing_in_db=${missingInDb} (threshold=${missingDbThreshold}), no_go=${noGoTrades} (threshold=${noGoThreshold}).`;
        logger.error(`[Reconcile] ${message}`);
        observabilityService.emitEvent({
            type: 'RECONCILIATION_DEGRADED',
            severity: 'ERROR',
            message,
            profileId: ctx.profileId,
            userId: ctx.userId
        });
        return true;
    }

    /** Returns the order's highest-priority identifier, or `null` when it has none. */
    private identifyUniqueKey(order: any): string | null {
        const keys = identifyOrderKeys(order);
        return keys.length > 0 ? keys[0] : null;
    }

    /**
     * Indexes orders by their highest-priority identifier. Orders without any
     * identifier are left out; a later order with the same key replaces an
     * earlier one.
     */
    private indexByUniqueKey(orders: any[]): Map<string, any> {
        const lookup = new Map<string, any>();
        for (const order of orders) {
            const key = this.identifyUniqueKey(order);
            if (key) lookup.set(key, order);
        }
        return lookup;
    }

    /**
     * Finds the first not-yet-handled order in `lookup` that shares an identifier
     * with `dbOrder`, and marks it handled so it cannot be matched twice.
     *
     * @param dbOrder Order whose identifiers are probed, in priority order.
     * @param lookup Candidate orders indexed by their highest-priority identifier.
     * @param handled Unique keys of candidates that were already matched; mutated.
     * @returns The matched candidate, or `null` when none is available.
     */
    private findMatchingOrder(dbOrder: any, lookup: Map<string, any>, handled: Set<string>): any | null {
        for (const key of identifyOrderKeys(dbOrder)) {
            const candidate = lookup.get(key);
            if (!candidate) continue;
            const uniqueKey = this.identifyUniqueKey(candidate);
            if (!uniqueKey || handled.has(uniqueKey)) continue;
            handled.add(uniqueKey);
            return candidate;
        }
        return null;
    }

    /**
     * Same matching as {@link findMatchingOrder}. Despite the name, the matched
     * candidate IS recorded in `handled`; the method is kept under this name for
     * existing call sites.
     */
    private matchOrderWithoutHandling(order: any, lookup: Map<string, any>, handled: Set<string>): any | null {
        return this.findMatchingOrder(order, lookup, handled);
    }

    /**
     * Forwards a reconciled status to the executor: fills go to the entry or exit
     * fill handler depending on `action`, cancels go to the cancel handler, and
     * any other status is a no-op.
     */
    private async applyExecutorCorrection(
        ctx: ReconciliationContext,
        action: 'ENTRY' | 'EXIT',
        order: any,
        normalizedStatus: string
    ): Promise<void> {
        if (normalizedStatus === 'filled' || normalizedStatus === 'partially_filled') {
            const fillPrice = determineFillPrice(order);
            const fillQty = determineFillQty(order);
            if (action === 'ENTRY') {
                await ctx.executor.reconcileEntryFill(order, fillPrice, fillQty);
            } else {
                await ctx.executor.reconcileExitFill(order, fillPrice, fillQty);
            }
        } else if (normalizedStatus === 'canceled') {
            await ctx.executor.reconcileCancel(order);
        }
    }

    /**
     * Records and applies a status correction for an order known to both sides.
     * The action is derived from the DB order when available; the executor is
     * always called with the exchange order.
     */
    private async processStatusChange(ctx: ReconciliationContext, dbOrder: any, exchangeOrder: any, normalizedStatus: string): Promise<void> {
        const action = determineAction(dbOrder || exchangeOrder);
        await runtimeOrderRepository.logOrder(this.buildLifecyclePayload(ctx, dbOrder, exchangeOrder, normalizedStatus));
        const idSource = exchangeOrder || dbOrder;
        const orderId = String(idSource?.order_id || idSource?.id || '').trim();
        const tradeId = String(idSource?.trade_id || idSource?.tradeId || '').trim();
        logger.info('Reconciliation correction applied', {
            event: 'reconciliation_correction',
            profileId: ctx.profileId,
            userId: ctx.userId,
            orderId,
            tradeId,
            status: normalizedStatus,
            action
        });
        await this.applyExecutorCorrection(ctx, action, exchangeOrder, normalizedStatus);
    }

    /**
     * Handles an open DB order that has no counterpart among the exchange's open
     * orders: logs a lifecycle row and asks the executor to reconcile a cancel.
     */
    private async handleMissingExchange(ctx: ReconciliationContext, dbOrder: any): Promise<void> {
        // NOTE(review): no forced status is passed, so the logged row carries the DB
        // order's current status rather than a cancel status. Confirm this is intended.
        await runtimeOrderRepository.logOrder(this.buildLifecyclePayload(ctx, dbOrder));
        logger.info('Reconciliation cancel added (missing exchange)', {
            event: 'reconciliation_cancel_missing_exchange',
            profileId: ctx.profileId,
            userId: ctx.userId,
            orderId: String(dbOrder?.order_id || dbOrder?.id || '').trim(),
            tradeId: String(dbOrder?.trade_id || dbOrder?.tradeId || '').trim(),
            symbol: String(dbOrder?.symbol || '')
        });
        await ctx.executor.reconcileCancel(dbOrder);
    }

    /**
     * Handles an exchange order that is unknown to the DB: logs a lifecycle row
     * for it and forwards a fill or cancel status to the executor.
     */
    private async handleExchangeOnly(ctx: ReconciliationContext, exchangeOrder: any): Promise<void> {
        const normalizedStatus = normalizeComparableStatus(exchangeOrder.status);
        await runtimeOrderRepository.logOrder(this.buildLifecyclePayload(ctx, exchangeOrder, undefined, normalizedStatus));
        logger.info('Reconciliation discovery added', {
            event: 'reconciliation_exchange_discovery',
            profileId: ctx.profileId,
            userId: ctx.userId,
            orderId: String(exchangeOrder?.order_id || exchangeOrder?.id || '').trim(),
            tradeId: String(exchangeOrder?.trade_id || exchangeOrder?.tradeId || '').trim(),
            symbol: String(exchangeOrder?.symbol || exchangeOrder?.symbol_name || '')
        });
        await this.applyExecutorCorrection(ctx, determineAction(exchangeOrder), exchangeOrder, normalizedStatus);
    }

    /**
     * Builds the row passed to `runtimeOrderRepository.logOrder`.
     *
     * Field values are taken from the exchange order when one is given, otherwise
     * from the DB order. `forcedStatus` overrides the status of either order.
     */
    private buildLifecyclePayload(ctx: ReconciliationContext, dbOrder?: any, exchangeOrder?: any, forcedStatus?: string) {
        const order = exchangeOrder || dbOrder || {};
        return {
            user_id: ctx.userId,
            profile_id: ctx.profileId,
            order_id: String(order.order_id || order.id || order.client_order_id || '').trim(),
            symbol: String(order.symbol || order.symbol_name || '').trim(),
            type: order.type || order.order_type || 'market',
            side: order.side || order.direction || 'BUY',
            qty: Number(order.qty ?? order.quantity ?? order.amount ?? 0),
            price: Number(order.price ?? order.limit_price ?? order.avg_price ?? 0),
            status: forcedStatus || exchangeOrder?.status || dbOrder?.status,
            timestamp: order.timestamp || Date.now(),
            trade_id: order.trade_id || order.tradeId,
            action: normalizeOrderAction(order.action) || determineAction(order),
            sub_tag: extractOrderSubTag(order) || undefined
        };
    }
}
/**
 * Shared module-level instance. The alert-throttle maps live on the instance, so
 * callers using this export share the same throttle state.
 */
export const reconciliationService = new ReconciliationService();