import { randomUUID } from 'crypto';
import logger from '../utils/logger.js';
import { TradeExecutor } from './TradeExecutor.js';
import { distributedLockService } from './distributedLockService.js';
import { healthTracker } from './healthTracker.js';
import { observabilityService } from './observabilityService.js';
import { normalizeOrderAction } from '../domain/tradingEnums.js';
import { reconciliationOrderCoverageService } from './reconciliationOrderCoverageService.js';
import { reconciliationExitBackfillService } from './reconciliationExitBackfillService.js';
import { reconciliationSubTagRepairService } from './reconciliationSubTagRepairService.js';
import { reconciliationParityHeartbeatService } from './reconciliationParityHeartbeatService.js';
import { buildManagedBotSymbolTokenSet, isManagedBotSymbol } from '../utils/botSymbolScope.js';
import { extractOrderSubTag } from '../utils/alpacaSubTag.js';
import { config } from '../config/index.js';
import * as runtimeOrderRepository from './runtimeOrderRepository.js';

/** Inputs needed to reconcile a single profile. */
export interface ReconciliationContext {
  profileId: string;
  userId: string;
  executor: TradeExecutor;
  monitoredSymbols?: string[];
}

/** Counters and flags produced by one `reconcileProfile` run. */
export interface ReconciliationResult {
  /** `false` when the lock was not acquired or when the run failed part-way. */
  processed: boolean;
  mismatchCount: number;
  missingFromExchange: number;
  missingInDb: number;
  noGoTrades: number;
  noGoReasonCounts: Record<string, number>;
  noGoSamples: Array<{
    profileId: string;
    symbol: string;
    tradeId: string;
    reason: string;
  }>;
  parityMismatchTrades: number;
  parityQuarantinedTrades: number;
  parityAutoClosedTrades: number;
  parityMaxMismatchNotionalUsd: number;
  parityTotalMismatchNotionalUsd: number;
  integrityWatchdogTriggered: boolean;
  /** Set only when the run failed; carries the stringified error. */
  error?: string;
}

/** Builds a fresh result with every counter zeroed and every collection empty. */
const createEmptyResult = (processed: boolean): ReconciliationResult => ({
  processed,
  mismatchCount: 0,
  missingFromExchange: 0,
  missingInDb: 0,
  noGoTrades: 0,
  noGoReasonCounts: {},
  noGoSamples: [],
  parityMismatchTrades: 0,
  parityQuarantinedTrades: 0,
  parityAutoClosedTrades: 0,
  parityMaxMismatchNotionalUsd: 0,
  parityTotalMismatchNotionalUsd: 0,
  integrityWatchdogTriggered: false
});

/**
 * Produces a log-friendly message from a caught value: its `message` property
 * when truthy, otherwise the stringified value, otherwise `'unknown_error'`.
 */
const describeError = (error: unknown): string => {
  const message =
    typeof error === 'object' && error !== null ? (error as { message?: unknown }).message : undefined;
  return String(message || error || 'unknown_error');
};

/**
 * Lower-cases and trims a status and folds known aliases together so that
 * database and exchange statuses can be compared with `===`.
 * Empty / nullish input yields `''`.
 */
const normalizeComparableStatus = (statusRaw?: string | null): string => {
  const status = String(statusRaw || '').trim().toLowerCase();
  if (!status) return '';
  if (status === 'new' || status === 'accepted' || status === 'pending') return 'pending_new';
  if (status === 'partially-filled') return 'partially_filled';
  if (status === 'cancelled' || status === 'canceled') return 'canceled';
  return status;
};

/**
 * Collects every non-empty identifier found on an order, normalized to
 * trimmed lower-case, in priority order (order id first, trade id last).
 */
const identifyOrderKeys = (order: any): string[] => {
  if (!order) return [];
  return [
    order?.order_id,
    order?.id,
    order?.client_order_id,
    order?.clientOrderId,
    order?.trade_id,
    order?.tradeId
  ]
    .map((value) => String(value || '').trim().toLowerCase())
    .filter(Boolean);
};

/**
 * Resolves whether an order is an ENTRY or an EXIT. An explicit, recognised
 * `action` wins; otherwise a `sell`/`short` side means EXIT and anything else ENTRY.
 */
const determineAction = (order: any): 'ENTRY' | 'EXIT' => {
  const candidate = normalizeOrderAction(order?.action);
  if (candidate) return candidate;
  const side = String(order?.side || '').trim().toLowerCase();
  if (side === 'sell' || side === 'short') return 'EXIT';
  return 'ENTRY';
};

/** Returns the first candidate that converts to a finite number greater than zero, else 0. */
const firstPositiveNumber = (candidates: readonly unknown[]): number => {
  for (const candidate of candidates) {
    const parsed = Number(candidate);
    if (Number.isFinite(parsed) && parsed > 0) return parsed;
  }
  return 0;
};

/** Best-effort filled quantity: filled-quantity fields first, then the order's own size fields. */
const determineFillQty = (order: any): number =>
  firstPositiveNumber([
    order?.filled_qty,
    order?.filledQty,
    order?.filled_quantity,
    order?.qty,
    order?.amount,
    order?.size
  ]);

/** Best-effort fill price: average fill price fields first, then the requested price fields. */
const determineFillPrice = (order: any): number =>
  firstPositiveNumber([
    order?.filled_avg_price,
    order?.avg_price,
    order?.price,
    order?.requestedPrice,
    order?.requested_price
  ]);

/** Reads the order's symbol (`symbol` or `symbol_name`), trimmed; `''` when absent. */
const extractOrderSymbol = (order: any): string => {
  return String(order?.symbol || order?.symbol_name || '').trim();
};

/**
 * Compares a profile's open orders in the database with the exchange's open
 * orders, applies corrections through the profile's executor, and then runs the
 * coverage, EXIT-backfill, sub-tag repair and parity-heartbeat steps.
 */
export class ReconciliationService {
  /** Last emission time (epoch ms) of the "safety step skipped" alert, per scope key. */
  private readonly skippedSafetyStepAlertByScope = new Map<string, number>();
  /** Last emission time (epoch ms) of the integrity watchdog alert, per profile key. */
  private readonly integrityWatchdogLastEmittedAt = new Map<string, number>();

  /**
   * Throttle gate for "safety step skipped" alerts.
   * @returns `true` (and records the emission time) when the alert may be sent now.
   */
  private shouldEmitSkippedSafetyStepAlert(scope: string): boolean {
    const key = String(scope || 'global').trim() || 'global';
    const now = Date.now();
    // NOTE(review): because of `||`, a configured value of 0 falls back to 600_000;
    // only a negative value produces throttleMs === 0 — confirm whether 0 should disable throttling.
    const throttleMs = Math.max(0, Number(config.RECONCILIATION_SLO_ALERT_THROTTLE_MS || 600_000));
    const last = this.skippedSafetyStepAlertByScope.get(key) || 0;
    if (throttleMs > 0 && (now - last) < throttleMs) return false;
    this.skippedSafetyStepAlertByScope.set(key, now);
    return true;
  }

  /**
   * Runs one reconciliation pass for a profile under the reconciliation lock.
   *
   * @returns A zeroed result with `processed: false` when the lock is held elsewhere;
   *   the accumulated result on success; or the partially accumulated result with
   *   `processed: false` and `error` set when any step throws.
   */
  async reconcileProfile(ctx: ReconciliationContext): Promise<ReconciliationResult> {
    const lockOwner = `${process.pid}-${randomUUID()}`;
    const acquired = await distributedLockService.tryAcquireReconciliationLock(ctx.profileId, lockOwner, 30);
    if (!acquired) {
      healthTracker.incrementReconciliationLockContention();
      observabilityService.incrementReconciliationLockContention();
      return createEmptyResult(false);
    }

    const result = createEmptyResult(true);
    try {
      await this.reconcileOrderParity(ctx, result);
      await this.runOrderCoverageStep(ctx, result);
      await this.runExitBackfillStep(ctx, result);
      await this.runSubTagRepairStep(ctx);
      await this.runParityHeartbeatStep(ctx, result);
      return result;
    } catch (error: unknown) {
      const message = describeError(error);
      logger.error(`[Reconcile] Profile ${ctx.profileId} failed: ${message}`);
      observabilityService.emitEvent({
        type: 'RECONCILIATION_DEGRADED',
        severity: 'ERROR',
        message: `Reconciliation failed for profile ${ctx.profileId}: ${message}`,
        profileId: ctx.profileId,
        userId: ctx.userId
      });
      return { ...result, processed: false, error: message };
    } finally {
      await this.releaseLockSafely(ctx.profileId, lockOwner);
    }
  }

  /**
   * Releases the reconciliation lock without letting a release failure escape.
   * A throw inside `finally` would replace the value already being returned, so
   * the failure is logged instead.
   */
  private async releaseLockSafely(profileId: string, lockOwner: string): Promise<void> {
    try {
      await distributedLockService.releaseReconciliationLock(profileId, lockOwner);
    } catch (error: unknown) {
      logger.warn('[Reconcile] Failed to release reconciliation lock', {
        event: 'reconciliation_lock_release_failed',
        profileId,
        error: describeError(error)
      });
    }
  }

  /**
   * Loads DB and exchange orders, restricts them to managed bot symbols, and
   * reconciles the two sets, updating the mismatch / missing counters on `result`.
   */
  private async reconcileOrderParity(ctx: ReconciliationContext, result: ReconciliationResult): Promise<void> {
    const [dbOpenOrders, dbClosedOrders] = await Promise.all([
      runtimeOrderRepository.getOpenOrdersForProfile(ctx.profileId),
      runtimeOrderRepository.getRecentlyClosedOrdersForProfile(ctx.profileId, 10)
    ]);
    const exchangeOrders = await ctx.executor.fetchExchangeOpenOrders();

    const rawDbOpenOrders = dbOpenOrders || [];
    const rawDbClosedOrders = dbClosedOrders || [];
    const rawExchangeOrders = exchangeOrders || [];

    const managedSymbolTokens = buildManagedBotSymbolTokenSet();
    // Orders without a symbol stay in scope; only orders with a non-managed symbol are dropped.
    const inManagedScope = (order: any): boolean => {
      const symbol = extractOrderSymbol(order);
      if (!symbol) return true;
      return isManagedBotSymbol(symbol, managedSymbolTokens);
    };

    const scopedDbOpenOrders = rawDbOpenOrders.filter((order) => inManagedScope(order));
    const scopedDbClosedOrders = rawDbClosedOrders.filter((order) => inManagedScope(order));
    const scopedExchangeOrders = rawExchangeOrders.filter((order) => inManagedScope(order));

    const droppedCount =
      (rawDbOpenOrders.length - scopedDbOpenOrders.length) +
      (rawDbClosedOrders.length - scopedDbClosedOrders.length) +
      (rawExchangeOrders.length - scopedExchangeOrders.length);
    if (droppedCount > 0) {
      logger.info('[Reconcile] Skipped non-bot symbols during parity reconciliation', {
        event: 'reconciliation_symbol_scope_filtered',
        profileId: ctx.profileId,
        dropped: droppedCount
      });
    }

    // NOTE(review): lookups are indexed only by each order's first identifier, while
    // matching probes with every identifier of the other side — confirm that is intended.
    const exchangeLookup = new Map<string, any>();
    const exchangeHandled = new Set<string>();
    const closedLookup = new Map<string, any>();
    const closedHandled = new Set<string>();

    scopedExchangeOrders.forEach((order) => {
      const key = this.identifyUniqueKey(order);
      if (!key) return;
      exchangeLookup.set(key, order);
    });
    scopedDbClosedOrders.forEach((order) => {
      const key = this.identifyUniqueKey(order);
      if (!key) return;
      closedLookup.set(key, order);
    });

    // Pass 1: every open DB order must have a counterpart among the exchange's open orders.
    for (const dbOrder of scopedDbOpenOrders) {
      const match = this.findMatchingOrder(dbOrder, exchangeLookup, exchangeHandled);
      if (match) {
        const normalizedExchangeStatus = normalizeComparableStatus(match.status);
        const normalizedDbStatus = normalizeComparableStatus(dbOrder.status);
        if (normalizedExchangeStatus && normalizedExchangeStatus !== normalizedDbStatus) {
          result.mismatchCount += 1;
          await this.processStatusChange(ctx, dbOrder, match, normalizedExchangeStatus);
        }
      } else {
        result.missingFromExchange += 1;
        await this.handleMissingExchange(ctx, dbOrder);
      }
    }

    // Pass 2: exchange orders not claimed in pass 1 are either recently closed in the DB
    // (status drift) or unknown to the DB altogether.
    for (const exchangeOrder of scopedExchangeOrders) {
      const key = this.identifyUniqueKey(exchangeOrder);
      if (!key || exchangeHandled.has(key)) continue;

      const closedMatch = this.matchOrderWithoutHandling(exchangeOrder, closedLookup, closedHandled);
      const normalizedStatus = normalizeComparableStatus(exchangeOrder.status);
      if (closedMatch) {
        result.mismatchCount += 1;
        await this.processStatusChange(ctx, closedMatch, exchangeOrder, normalizedStatus);
        continue;
      }

      result.missingInDb += 1;
      await this.handleExchangeOnly(ctx, exchangeOrder);
    }
  }

  /**
   * Runs the missing-order coverage step, alerts when it was skipped, and adds
   * the orders it found but did not insert to `result.missingInDb`.
   */
  private async runOrderCoverageStep(ctx: ReconciliationContext, result: ReconciliationResult): Promise<void> {
    const coverageResult = await reconciliationOrderCoverageService.runProfile(ctx);

    if (!coverageResult.attempted && coverageResult.skippedReason) {
      if (this.shouldEmitSkippedSafetyStepAlert(`${ctx.profileId}:coverage:${coverageResult.skippedReason}`)) {
        observabilityService.emitEvent({
          type: 'RECONCILIATION_DEGRADED',
          severity: coverageResult.skippedReason === 'disabled' ? 'ERROR' : 'WARN',
          message: `Order coverage reconciliation skipped (${coverageResult.skippedReason}) for profile ${ctx.profileId}.`,
          profileId: ctx.profileId,
          userId: ctx.userId
        });
      }
    }

    if (coverageResult.attempted && (
      coverageResult.missingInDb > 0 ||
      coverageResult.insertedRows > 0 ||
      coverageResult.skippedUnmappedTrade > 0 ||
      coverageResult.skippedUnmappedAction > 0
    )) {
      logger.warn('[Reconcile] Missing-order coverage evaluated', {
        event: 'reconciliation_order_coverage_evaluated',
        profileId: ctx.profileId,
        userId: ctx.userId,
        dryRun: coverageResult.dryRun,
        scannedOrders: coverageResult.scannedOrders,
        filledLikeOrders: coverageResult.filledLikeOrders,
        botOwnedOrders: coverageResult.botOwnedOrders,
        eligibleOrders: coverageResult.eligibleOrders,
        missingInDb: coverageResult.missingInDb,
        insertedRows: coverageResult.insertedRows,
        skippedNotBotOwned: coverageResult.skippedNotBotOwned,
        skippedNotBotOwnedActionable: coverageResult.skippedNotBotOwnedActionable,
        skippedNotBotOwnedLegacyInDb: coverageResult.skippedNotBotOwnedLegacyInDb,
        skippedNotBotOwnedBeforeBaseline: coverageResult.skippedNotBotOwnedBeforeBaseline,
        skippedUnmappedTrade: coverageResult.skippedUnmappedTrade,
        skippedUnmappedAction: coverageResult.skippedUnmappedAction,
        skippedMissingFillData: coverageResult.skippedMissingFillData,
        skippedMissingOrderId: coverageResult.skippedMissingOrderId,
        skippedExisting: coverageResult.skippedExisting,
        skippedMaxInsertLimit: coverageResult.skippedMaxInsertLimit
      });
    }

    if (coverageResult.attempted) {
      // Only orders that were found missing and NOT inserted still count as missing.
      const unresolvedCoverageMissing = Math.max(0, coverageResult.missingInDb - coverageResult.insertedRows);
      result.missingInDb += unresolvedCoverageMissing;
    }
  }

  /**
   * Runs the EXIT backfill step, copies its no-go statistics onto `result`, and
   * evaluates the integrity watchdog against the counters gathered so far.
   */
  private async runExitBackfillStep(ctx: ReconciliationContext, result: ReconciliationResult): Promise<void> {
    const backfillResult = await reconciliationExitBackfillService.runProfile(ctx);

    if (!backfillResult.attempted && backfillResult.skippedReason) {
      if (this.shouldEmitSkippedSafetyStepAlert(`${ctx.profileId}:backfill:${backfillResult.skippedReason}`)) {
        observabilityService.emitEvent({
          type: 'RECONCILIATION_DEGRADED',
          severity: backfillResult.skippedReason === 'disabled' ? 'ERROR' : 'WARN',
          message: `EXIT backfill reconciliation skipped (${backfillResult.skippedReason}) for profile ${ctx.profileId}.`,
          profileId: ctx.profileId,
          userId: ctx.userId
        });
      }
    }

    if (backfillResult.attempted && (backfillResult.proposedRows > 0 || backfillResult.noGoTrades > 0)) {
      logger.warn('[Reconcile] EXIT backfill evaluated', {
        event: 'reconciliation_exit_backfill_profile',
        profileId: ctx.profileId,
        userId: ctx.userId,
        batchId: backfillResult.batchId,
        dryRun: backfillResult.dryRun,
        openTradeCandidates: backfillResult.openTradeCandidates,
        proposedRows: backfillResult.proposedRows,
        insertedRows: backfillResult.insertedRows,
        noGoTrades: backfillResult.noGoTrades
      });
    }

    result.noGoTrades = backfillResult.noGoTrades;
    result.noGoReasonCounts = { ...(backfillResult.noGoReasonCounts || {}) };
    result.noGoSamples = (backfillResult.noGoSamples || []).map((sample) => ({
      profileId: ctx.profileId,
      symbol: String(sample?.symbol || '').trim(),
      tradeId: String(sample?.tradeId || '').trim(),
      reason: String(sample?.reason || '').trim() || 'unknown'
    }));
    result.integrityWatchdogTriggered = this.evaluateIntegrityWatchdog(ctx, result.missingInDb, backfillResult.noGoTrades);

    if (!backfillResult.attempted && backfillResult.skippedReason === 'audit_table_missing') {
      logger.error(`[Reconcile] EXIT backfill skipped for ${ctx.profileId}: audit_table_missing`);
    }
  }

  /** Runs the sub-tag repair step and logs when the service reports it as unsupported. */
  private async runSubTagRepairStep(ctx: ReconciliationContext): Promise<void> {
    const subTagRepairResult = await reconciliationSubTagRepairService.runProfile({
      profileId: ctx.profileId,
      userId: ctx.userId
    });
    if (subTagRepairResult.unsupported) {
      logger.error(`[Reconcile] Sub-tag repair skipped for ${ctx.profileId}: unsupported_column`);
    }
  }

  /** Runs the parity heartbeat step and copies its metrics onto `result` when it was attempted. */
  private async runParityHeartbeatStep(ctx: ReconciliationContext, result: ReconciliationResult): Promise<void> {
    const parityResult = await reconciliationParityHeartbeatService.runProfile({
      profileId: ctx.profileId,
      userId: ctx.userId,
      executor: ctx.executor,
      monitoredSymbols: ctx.monitoredSymbols
    });

    if (parityResult.attempted) {
      result.parityMismatchTrades = parityResult.mismatchTrades;
      result.parityQuarantinedTrades = parityResult.quarantinedTrades;
      result.parityAutoClosedTrades = parityResult.autoClosedTrades;
      result.parityMaxMismatchNotionalUsd = parityResult.maxMismatchNotionalUsd;
      result.parityTotalMismatchNotionalUsd = parityResult.totalMismatchNotionalUsd;
      // Never clears a trigger already raised by the watchdog evaluation.
      result.integrityWatchdogTriggered = result.integrityWatchdogTriggered || parityResult.integrityWatchdogTriggered;
    } else if (parityResult.skippedReason) {
      logger.info(`[Reconcile] Parity heartbeat skipped for ${ctx.profileId}: ${parityResult.skippedReason}`);
    }
  }

  /**
   * Decides whether the integrity watchdog fires for this run.
   *
   * @returns `false` when the watchdog is disabled or neither counter reaches its
   *   threshold; `true` otherwise. The error log and event are emitted at most once
   *   per throttle window per profile, but `true` is returned even while throttled.
   */
  private evaluateIntegrityWatchdog(ctx: ReconciliationContext, missingInDb: number, noGoTrades: number): boolean {
    if (!config.ENABLE_RECON_INTEGRITY_WATCHDOG) return false;

    const missingDbThreshold = Math.max(1, Number(config.RECON_INTEGRITY_WATCHDOG_MISSING_DB_THRESHOLD || 1));
    const noGoThreshold = Math.max(1, Number(config.RECON_INTEGRITY_WATCHDOG_NO_GO_THRESHOLD || 1));
    const exceedsThreshold = missingInDb >= missingDbThreshold || noGoTrades >= noGoThreshold;
    if (!exceedsThreshold) return false;

    const key = String(ctx.profileId || '').trim() || 'global';
    const now = Date.now();
    const throttleMs = Math.max(0, Number(config.RECON_INTEGRITY_WATCHDOG_THROTTLE_MS || 600_000));
    const last = this.integrityWatchdogLastEmittedAt.get(key) || 0;
    if (throttleMs > 0 && (now - last) < throttleMs) {
      // Still triggered; only the alert emission is suppressed.
      return true;
    }
    this.integrityWatchdogLastEmittedAt.set(key, now);

    const message = `Reconciliation integrity watchdog triggered for ${ctx.profileId}: missing_in_db=${missingInDb} (threshold=${missingDbThreshold}), no_go=${noGoTrades} (threshold=${noGoThreshold}).`;
    logger.error(`[Reconcile] ${message}`);
    observabilityService.emitEvent({
      type: 'RECONCILIATION_DEGRADED',
      severity: 'ERROR',
      message,
      profileId: ctx.profileId,
      userId: ctx.userId
    });
    return true;
  }

  /** Returns the highest-priority identifier of an order, or `null` when it has none. */
  private identifyUniqueKey(order: any): string | null {
    const keys = identifyOrderKeys(order);
    return keys[0] ?? null;
  }

  /**
   * Probes `lookup` with each identifier of `order` and returns the first hit
   * whose own unique key has not been claimed yet, marking it as claimed in `handled`.
   */
  private claimFirstUnhandledMatch(order: any, lookup: Map<string, any>, handled: Set<string>): any | null {
    for (const key of identifyOrderKeys(order)) {
      const candidate = lookup.get(key);
      if (!candidate) continue;
      const uniqueKey = this.identifyUniqueKey(candidate);
      if (!uniqueKey || handled.has(uniqueKey)) continue;
      handled.add(uniqueKey);
      return candidate;
    }
    return null;
  }

  /** Finds the exchange order matching a DB order and marks it as handled. */
  private findMatchingOrder(dbOrder: any, lookup: Map<string, any>, handled: Set<string>): any | null {
    return this.claimFirstUnhandledMatch(dbOrder, lookup, handled);
  }

  /**
   * Finds the entry in `lookup` matching `order`. Despite the name, the match IS
   * recorded in the `handled` set that is passed in.
   */
  private matchOrderWithoutHandling(order: any, lookup: Map<string, any>, handled: Set<string>): any | null {
    return this.claimFirstUnhandledMatch(order, lookup, handled);
  }

  /**
   * Forwards an exchange-side status to the executor: fills go to the entry or
   * exit fill handler depending on `action`, cancels go to `reconcileCancel`,
   * and every other status is ignored.
   */
  private async applyExecutorCorrection(
    ctx: ReconciliationContext,
    order: any,
    action: 'ENTRY' | 'EXIT',
    normalizedStatus: string
  ): Promise<void> {
    if (normalizedStatus === 'filled' || normalizedStatus === 'partially_filled') {
      const fillPrice = determineFillPrice(order);
      const fillQty = determineFillQty(order);
      if (action === 'ENTRY') {
        await ctx.executor.reconcileEntryFill(order, fillPrice, fillQty);
      } else {
        await ctx.executor.reconcileExitFill(order, fillPrice, fillQty);
      }
    } else if (normalizedStatus === 'canceled') {
      await ctx.executor.reconcileCancel(order);
    }
  }

  /**
   * Records a status correction for an order known to both sides and applies the
   * exchange's status through the executor. The action is taken from the DB
   * order when present, otherwise from the exchange order.
   */
  private async processStatusChange(
    ctx: ReconciliationContext,
    dbOrder: any,
    exchangeOrder: any,
    normalizedStatus: string
  ): Promise<void> {
    const action = determineAction(dbOrder || exchangeOrder);
    await runtimeOrderRepository.logOrder(this.buildLifecyclePayload(ctx, dbOrder, exchangeOrder, normalizedStatus));

    const idSource = exchangeOrder || dbOrder;
    const orderId = String(idSource?.order_id || idSource?.id || '').trim();
    const tradeId = String(idSource?.trade_id || idSource?.tradeId || '').trim();
    logger.info('Reconciliation correction applied', {
      event: 'reconciliation_correction',
      profileId: ctx.profileId,
      userId: ctx.userId,
      orderId,
      tradeId,
      status: normalizedStatus,
      action
    });

    await this.applyExecutorCorrection(ctx, exchangeOrder, action, normalizedStatus);
  }

  /** Handles a DB open order with no exchange counterpart: logs it and asks the executor to cancel it. */
  private async handleMissingExchange(ctx: ReconciliationContext, dbOrder: any): Promise<void> {
    // NOTE(review): no forced status is passed, so the logged row carries the DB order's
    // current status rather than 'canceled' — confirm that is intended.
    await runtimeOrderRepository.logOrder(this.buildLifecyclePayload(ctx, dbOrder));
    logger.info('Reconciliation cancel added (missing exchange)', {
      event: 'reconciliation_cancel_missing_exchange',
      profileId: ctx.profileId,
      userId: ctx.userId,
      orderId: String(dbOrder?.order_id || dbOrder?.id || '').trim(),
      tradeId: String(dbOrder?.trade_id || dbOrder?.tradeId || '').trim(),
      symbol: String(dbOrder?.symbol || '')
    });
    await ctx.executor.reconcileCancel(dbOrder);
  }

  /** Handles an exchange order the DB does not know: logs it and applies its status through the executor. */
  private async handleExchangeOnly(ctx: ReconciliationContext, exchangeOrder: any): Promise<void> {
    const normalizedStatus = normalizeComparableStatus(exchangeOrder.status);
    await runtimeOrderRepository.logOrder(this.buildLifecyclePayload(ctx, exchangeOrder, undefined, normalizedStatus));
    logger.info('Reconciliation discovery added', {
      event: 'reconciliation_exchange_discovery',
      profileId: ctx.profileId,
      userId: ctx.userId,
      orderId: String(exchangeOrder?.order_id || exchangeOrder?.id || '').trim(),
      tradeId: String(exchangeOrder?.trade_id || exchangeOrder?.tradeId || '').trim(),
      symbol: String(exchangeOrder?.symbol || exchangeOrder?.symbol_name || '')
    });

    await this.applyExecutorCorrection(ctx, exchangeOrder, determineAction(exchangeOrder), normalizedStatus);
  }

  /**
   * Builds the row passed to `runtimeOrderRepository.logOrder`. Field values come
   * from the exchange order when given, otherwise from the DB order. The status is
   * `forcedStatus` when truthy, else the exchange status, else the DB status.
   */
  private buildLifecyclePayload(ctx: ReconciliationContext, dbOrder?: any, exchangeOrder?: any, forcedStatus?: string) {
    const order = exchangeOrder || dbOrder || {};
    return {
      user_id: ctx.userId,
      profile_id: ctx.profileId,
      order_id: String(order.order_id || order.id || order.client_order_id || '').trim(),
      symbol: String(order.symbol || order.symbol_name || '').trim(),
      type: order.type || order.order_type || 'market',
      side: order.side || order.direction || 'BUY',
      qty: Number(order.qty ?? order.quantity ?? order.amount ?? 0),
      price: Number(order.price ?? order.limit_price ?? order.avg_price ?? 0),
      status: forcedStatus || exchangeOrder?.status || dbOrder?.status,
      timestamp: order.timestamp || Date.now(),
      trade_id: order.trade_id || order.tradeId,
      action: normalizeOrderAction(order.action) || determineAction(order),
      sub_tag: extractOrderSubTag(order) || undefined
    };
  }
}

export const reconciliationService = new ReconciliationService();