// learning_ai_invt_trdg/backend/src/services/OrderStatusSyncService.ts
//
// 506 lines
// 22 KiB
// TypeScript

import { IExchangeConnector } from '../connectors/types.js';
import logger from '../utils/logger.js';
import { config } from '../config/index.js';
import { SymbolMapper } from '../utils/symbolMapper.js';
import { healthTracker } from './healthTracker.js';
import * as runtimeOrderRepository from './runtimeOrderRepository.js';
/**
 * Coerce an arbitrary thrown value into an `Error` so callers can rely on
 * `.message` being present.
 *
 * - `Error` instances are returned untouched (identity is preserved).
 * - Objects are serialised with `JSON.stringify`; if serialisation throws
 *   (circular references, BigInt members) or yields no text, the value's
 *   `String(...)` form is used instead, so this helper never throws itself.
 * - `null` / `undefined` become `Error('Unknown error')`.
 * - Every other primitive is stringified.
 *
 * @param value Whatever was caught in a `catch` clause.
 * @returns An `Error` describing `value`.
 */
const normalizeThrown = (value: unknown): Error => {
  if (value instanceof Error) return value;

  if (value && typeof value === 'object') {
    let serialized: string | undefined;
    try {
      // May return undefined (e.g. a toJSON() that returns undefined) or throw.
      serialized = JSON.stringify(value);
    } catch {
      serialized = undefined;
    }
    return new Error(serialized || String(value));
  }

  return new Error(String(value ?? 'Unknown error'));
};
/**
 * Order row as consumed by the order status sync.
 *
 * Only `symbol` is required; every other column is optional because rows may
 * come from different queries and from legacy data.
 */
export interface OrderSyncOrderRecord {
  /** Row identifier; used as the order identifier when `order_id` is absent. */
  id?: string;
  /** Preferred order identifier; this is what is sent to the exchange lookup. */
  order_id?: string;
  /** Symbol as stored in the DB; it is mapped to the execution provider's format before querying. */
  symbol: string;
  /** Status currently stored in the DB; compared case-insensitively with the exchange status. */
  status?: string;
  /** Stored quantity; compared with the exchange-reported filled quantity to detect drift. */
  qty?: number | string;
  /** Stored price; compared with the exchange-reported average fill price to detect drift. */
  price?: number | string;
  /** Order action; the sync checks it against `EXIT` (case-insensitively). */
  action?: string;
  /**
   * Order side. The sync reads it only for rows that have no `action`,
   * treating `SELL` as exit-like. Declared here so that read is type-safe.
   */
  side?: string;
  /** Identifier of the trade lifecycle this order belongs to, when known. */
  trade_id?: string;
  /** Owning user, forwarded unchanged in status-change events. */
  user_id?: string;
  /** Owning profile, forwarded in events and used for lifecycle lookups. */
  profile_id?: string;
  /** Timestamp columns; only `created_at` is read by the sync (to compute order age). */
  filled_at?: string;
  updated_at?: string;
  created_at?: string;
}
/**
 * Payload handed to the optional `onStatusChange` callback of
 * `OrderStatusSyncService` after the sync has written a new status or
 * refreshed fill data for an order.
 */
export interface OrderStatusSyncEvent {
// Identifier the sync used for the order (`order_id`, falling back to `id`).
orderId: string;
// Symbol exactly as stored on the DB row.
symbol: string;
// Lower-cased DB status before this sync ('' when the row had none).
previousStatus: string;
// Status that was written: the lower-cased exchange status, 'canceled', or 'unknown'.
status: string;
// The next four fields are copied unchanged from the DB row.
action?: string;
tradeId?: string;
userId?: string;
profileId?: string;
// Fill details; only set on events produced from an exchange-reported order.
filledAt?: Date;
fillPrice?: number;
fillQty?: number;
// True only when the order was marked 'unknown' for manual review.
quarantined?: boolean;
}
/**
 * Optional scoping for the stale-order query.
 *
 * All three values are forwarded as-is (together with the service's profile id)
 * to `runtimeOrderRepository.getStaleOrders`; the exact filtering semantics
 * live in that repository. NOTE(review): confirm there before relying on them.
 */
export interface OrderStatusSyncScopeOptions {
// Restrict the sync to one user's orders (also shown in the startup log label).
userId?: string;
// Presumably also includes the user's orders that have no profile — verify in the repository.
includeOrphanUserOrders?: boolean;
// Presumably limits the query to orders whose profile is null — verify in the repository.
profileNullOnly?: boolean;
}
/**
 * OrderStatusSyncService
 *
 * Periodically re-checks orders that the database still reports as stale
 * (and, for UUID-identified profiles, recently closed orders) against the
 * exchange, then reconciles the stored status and fill data.
 *
 * This prevents stale data in the database when orders are filled but the
 * status update fails or is missed.
 *
 * Orders the exchange no longer knows about are handled conservatively: they
 * must be older than the configured grace window and be seen missing for the
 * configured number of consecutive runs before any DB state is changed.
 */
export class OrderStatusSyncService {
  private static readonly UUID_PATTERN = /^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i;

  /** DB statuses that are left untouched when the exchange has no record of the order. */
  private static readonly CLOSED_STATUSES: ReadonlySet<string> = new Set([
    'filled',
    'partially_filled',
    'partially-filled',
    'canceled',
    'expired',
    'rejected',
    'unknown'
  ]);

  /** Phrases (lower-case) that identify an "order does not exist" error. */
  private static readonly MISSING_ORDER_PHRASES: readonly string[] = [
    'not found',
    'unknown order',
    'does not exist',
    'no such order'
  ];

  /**
   * Matches a stand-alone 404 in lower-cased text. A 404 embedded in a longer
   * alphanumeric token (for example inside an order ID) must not count.
   */
  private static readonly HTTP_404_PATTERN = /(?<![a-z0-9])404(?![a-z0-9])/;

  /** Capabilities already warned about; shared across instances to avoid log spam. */
  private static warnedCapabilities = new Set<string>();

  private interval: ReturnType<typeof setInterval> | null = null;
  private isRunning = false;

  /**
   * Counters of the most recent completed run. `updated` counts every order
   * that was processed without throwing and was not quarantined, including
   * orders whose status turned out to be unchanged.
   */
  private lastStats = {
    updated: 0,
    failed: 0,
    notFound: 0,
    quarantined: 0,
    lastRunAt: 0
  };

  /** Consecutive "missing on exchange" sightings per order ID. */
  private missingExchangeDetections = new Map<string, { count: number; lastSeenAt: number }>();

  /**
   * @param exchange Connector used to look orders up on the exchange.
   * @param syncIntervalMs Delay between background runs, in milliseconds.
   * @param profileId Optional profile the sync is scoped to.
   * @param onStatusChange Optional callback invoked after each DB change; its failures are logged, not rethrown.
   * @param scopeOptions Extra scoping forwarded to the stale-order query.
   */
  constructor(
    private exchange: IExchangeConnector,
    private syncIntervalMs: number = config.ORDER_SYNC_INTERVAL_MS,
    private profileId?: string,
    private onStatusChange?: (event: OrderStatusSyncEvent) => Promise<void> | void,
    private scopeOptions: OrderStatusSyncScopeOptions = {}
  ) { }

  /**
   * Start the background sync service. Runs one sync immediately and then
   * one every `syncIntervalMs`. Calling it while already started only logs a warning.
   */
  public start(): void {
    if (this.interval) {
      logger.warn('[OrderSync] Service already running');
      return;
    }

    const scopeParts: string[] = [];
    if (this.profileId) scopeParts.push(`profile=${this.profileId}`);
    if (this.scopeOptions.userId) scopeParts.push(`user=${this.scopeOptions.userId}`);
    if (this.scopeOptions.profileNullOnly) scopeParts.push('profile=null');
    if (this.scopeOptions.includeOrphanUserOrders) scopeParts.push('include-orphans=true');
    const scopeLabel = scopeParts.length > 0 ? scopeParts.join(',') : 'global';

    const cadenceLabel = this.syncIntervalMs >= 60_000
      ? `${(this.syncIntervalMs / 60_000).toFixed(this.syncIntervalMs % 60_000 === 0 ? 0 : 1)} minute(s)`
      : `${Math.max(1, Math.round(this.syncIntervalMs / 1000))} second(s)`;
    logger.info(`[OrderSync] Starting order status sync (${scopeLabel}, every ${cadenceLabel})...`);

    // Run immediately on start, then periodically.
    this.runInBackground();
    this.interval = setInterval(() => this.runInBackground(), this.syncIntervalMs);
  }

  /**
   * Stop the background sync service. A run that is already in flight is not interrupted.
   */
  public stop(): void {
    if (this.interval) {
      clearInterval(this.interval);
      this.interval = null;
      logger.info('[OrderSync] Service stopped');
    }
  }

  /**
   * Manually trigger a sync (useful for testing or on-demand sync).
   * Resolves once the run has finished, or immediately if another run is in progress.
   */
  public async triggerSync(): Promise<void> {
    logger.info('[OrderSync] Manual sync triggered');
    await this.syncStaleOrders();
  }

  /** Snapshot (copy) of the counters recorded by the most recent completed run. */
  public getLastStats() {
    return { ...this.lastStats };
  }

  /** Fire-and-forget wrapper for timer-driven runs so a rejection is logged instead of left unhandled. */
  private runInBackground(): void {
    this.syncStaleOrders().catch((error: unknown) => {
      logger.error(`[OrderSync] Sync error: ${OrderStatusSyncService.errorMessage(error)}`);
    });
  }

  /** Best-effort human-readable message for any caught value. */
  private static errorMessage(error: unknown): string {
    const message = (error as { message?: unknown } | null | undefined)?.message;
    if (typeof message === 'string') return message;
    return normalizeThrown(error).message;
  }

  /**
   * Parse an exchange-supplied numeric field. `null`, `undefined` and blank
   * strings mean "no value" and yield `undefined`; `Number(null)` would
   * otherwise turn a missing fill price into 0.
   */
  private static toOptionalNumber(raw: unknown): number | undefined {
    if (raw === undefined || raw === null) return undefined;
    if (typeof raw === 'string' && raw.trim() === '') return undefined;
    return Number(raw);
  }

  /**
   * True when a positive exchange value differs from the stored value by more
   * than `tolerance` (relative), or when the stored value is unusable (non-finite or <= 0).
   */
  private static hasDrift(exchangeValue: number | undefined, dbValue: number, tolerance: number): boolean {
    if (exchangeValue === undefined || !(exchangeValue > 0)) return false;
    if (!Number.isFinite(dbValue) || dbValue <= 0) return true;
    const scale = Math.max(Math.abs(exchangeValue), Math.abs(dbValue), 1e-9);
    return Math.abs(exchangeValue - dbValue) / scale > tolerance;
  }

  /**
   * Main sync logic: collect candidate orders from the DB, de-duplicate them
   * by order ID, and reconcile each one against the exchange sequentially.
   * Overlapping invocations are skipped while a run is in progress.
   */
  private async syncStaleOrders(): Promise<void> {
    if (this.isRunning) {
      logger.debug('[OrderSync] Previous sync still running, skipping...');
      return;
    }
    this.isRunning = true;

    try {
      const staleOrders = await this.getStaleOrders();
      const recentClosedOrders = await this.getRecentClosedOrders();

      // First occurrence wins, so a stale row takes precedence over a recent-closed duplicate.
      const candidateOrders = new Map<string, OrderSyncOrderRecord>();
      for (const row of [...staleOrders, ...recentClosedOrders]) {
        const key = String(row.order_id || row.id || '').trim();
        if (key && !candidateOrders.has(key)) {
          candidateOrders.set(key, row);
        }
      }

      if (candidateOrders.size === 0) {
        logger.debug('[OrderSync] No stale or recent-closed orders found');
        return;
      }

      const profileSuffix = this.profileId ? ` (profile=${this.profileId})` : '';
      logger.info(`[OrderSync] Found ${staleOrders.length} stale + ${recentClosedOrders.length} recent-closed orders to check${profileSuffix}`);

      let updated = 0;
      let failed = 0;
      let notFound = 0;
      let quarantined = 0;

      for (const order of candidateOrders.values()) {
        try {
          const result = await this.syncOrderStatus(order);
          if (result === 'quarantined') {
            quarantined++;
          } else {
            updated++;
          }
        } catch (error: unknown) {
          if (OrderStatusSyncService.errorMessage(error).includes('not found')) {
            notFound++;
          } else {
            failed++;
          }
        }
      }

      this.lastStats = { updated, failed, notFound, quarantined, lastRunAt: Date.now() };
      logger.info(`[OrderSync] Sync complete: ${updated} updated, ${notFound} not found on exchange, ${quarantined} quarantined, ${failed} failed${profileSuffix}`);
    } catch (error: unknown) {
      logger.error(`[OrderSync] Sync error: ${OrderStatusSyncService.errorMessage(error)}`);
    } finally {
      // Release the guard first: if the health hook throws, later runs must still be possible.
      this.isRunning = false;
      healthTracker.recordOrderSyncLoop();
    }
  }

  /**
   * Fetch the stale orders for this service's scope.
   * Returns an empty list when the query fails or yields nothing.
   */
  private async getStaleOrders(): Promise<OrderSyncOrderRecord[]> {
    try {
      const staleOrders = await runtimeOrderRepository.getStaleOrders(config.STALE_ORDER_THRESHOLD_MINUTES, {
        profileId: this.profileId,
        userId: this.scopeOptions.userId,
        includeOrphanUserOrders: this.scopeOptions.includeOrphanUserOrders,
        profileNullOnly: this.scopeOptions.profileNullOnly
      });
      return staleOrders || [];
    } catch (error: unknown) {
      logger.error(`[OrderSync] Error fetching stale orders: ${OrderStatusSyncService.errorMessage(error)}`);
      return [];
    }
  }

  /**
   * Fetch recently closed orders for the configured profile.
   * Skipped (empty list) unless the profile ID is a UUID; failures also yield an empty list.
   */
  private async getRecentClosedOrders(): Promise<OrderSyncOrderRecord[]> {
    const profileId = String(this.profileId || '').trim();
    if (!profileId || !OrderStatusSyncService.UUID_PATTERN.test(profileId)) return [];
    try {
      const rows = await runtimeOrderRepository.getRecentlyClosedOrdersForProfile(
        profileId,
        config.ORDER_SYNC_RECENT_CLOSED_LOOKBACK_MINUTES
      );
      return rows || [];
    } catch (error: unknown) {
      logger.error(`[OrderSync] Error fetching recent closed orders: ${OrderStatusSyncService.errorMessage(error)}`);
      return [];
    }
  }

  /**
   * Invoke the optional status-change callback. Callback failures are logged
   * and swallowed so they cannot abort the sync.
   */
  private async emitStatusChange(event: OrderStatusSyncEvent): Promise<void> {
    if (!this.onStatusChange) return;
    try {
      await this.onStatusChange(event);
    } catch (error: unknown) {
      logger.error(`[OrderSync] onStatusChange callback failed for ${event.orderId}: ${OrderStatusSyncService.errorMessage(error)}`);
    }
  }

  /**
   * Decide whether an exchange error means "the order does not exist"
   * (as opposed to a transient failure that should simply be retried later).
   * Inspects the error's message, `details`, and `code`/`status`/`statusCode`.
   */
  private isMissingOrderError(error: unknown): boolean {
    const fields = (error && typeof error === 'object' ? error : {}) as {
      message?: unknown;
      details?: unknown;
      code?: unknown;
      status?: unknown;
      statusCode?: unknown;
    };
    const message = String(fields.message || error || '').toLowerCase();
    const details = String(fields.details || '').toLowerCase();
    const code = String(fields.code || fields.status || fields.statusCode || '').toLowerCase();
    const corpus = `${message} ${details} ${code}`;

    return (
      OrderStatusSyncService.MISSING_ORDER_PHRASES.some((phrase) => corpus.includes(phrase))
      || OrderStatusSyncService.HTTP_404_PATTERN.test(corpus)
    );
  }

  /**
   * Check a single order's status on the exchange and update the DB.
   *
   * @returns `'quarantined'` when the order was marked `unknown` for manual
   *   review; `'updated'` in every other non-throwing case (including "nothing changed").
   * @throws Error (normalised) when a repository call fails.
   */
  private async syncOrderStatus(order: OrderSyncOrderRecord): Promise<'updated' | 'quarantined'> {
    // Fields this method reads from the exchange's order object.
    type ExchangeOrderFields = {
      status?: unknown;
      filled_avg_price?: unknown;
      filled_qty?: unknown;
      filledQty?: unknown;
      filled_quantity?: unknown;
      filled_at?: unknown;
    };

    const orderId = order.order_id || order.id;
    if (!orderId) {
      logger.warn('[OrderSync] Order missing ID, skipping');
      return 'updated';
    }

    try {
      // Without getOrder there is nothing to compare against; leave the row alone.
      if (!this.exchange.getOrder) {
        if (!OrderStatusSyncService.warnedCapabilities.has('getOrder')) {
          OrderStatusSyncService.warnedCapabilities.add('getOrder');
          logger.warn('[OrderSync] Exchange does not support getOrder. Background sync will be limited.');
        }
        return 'updated';
      }

      let exchangeOrder: ExchangeOrderFields | null | undefined = null;
      try {
        // Ensure symbol is mapped to exchange format (e.g. BTC/USDT -> BTC/USD for Alpaca)
        const tradeSymbol = SymbolMapper.toTradeSymbol(order.symbol, config.EXECUTION_PROVIDER);
        const fetched: unknown = await this.exchange.getOrder(orderId, tradeSymbol);
        exchangeOrder = fetched as ExchangeOrderFields | null | undefined;
      } catch (error: unknown) {
        const reason = OrderStatusSyncService.errorMessage(error);
        if (!this.isMissingOrderError(error)) {
          // Transient exchange failure: keep existing status and retry later.
          logger.debug(`[OrderSync] Could not fetch order ${orderId} from exchange: ${reason}`);
          return 'updated';
        }
        logger.warn(`[OrderSync] Order ${orderId} not found on exchange (exception path): ${reason}`);
      }

      if (!exchangeOrder) {
        return await this.reconcileMissingOrder(order, orderId);
      }

      // The exchange knows the order, so any earlier "missing" sightings are void.
      this.missingExchangeDetections.delete(orderId);

      const exchangeStatus = String(exchangeOrder.status || '').toLowerCase();
      const fillPrice = OrderStatusSyncService.toOptionalNumber(exchangeOrder.filled_avg_price);
      const fillQty = OrderStatusSyncService.toOptionalNumber(
        exchangeOrder.filled_qty ?? exchangeOrder.filledQty ?? exchangeOrder.filled_quantity
      );
      // Only finite numbers are persisted or emitted.
      const reportedFillPrice = Number.isFinite(fillPrice) ? fillPrice : undefined;
      const reportedFillQty = Number.isFinite(fillQty) ? fillQty : undefined;

      const fillLike = exchangeStatus === 'filled' || exchangeStatus === 'partially_filled';
      const qtyDrift = fillLike
        && OrderStatusSyncService.hasDrift(reportedFillQty, Number(order.qty ?? 0), 1e-6);
      const priceDrift = fillLike
        && OrderStatusSyncService.hasDrift(reportedFillPrice, Number(order.price ?? 0), 5e-4);
      const fillDataDrift = qtyDrift || priceDrift;

      // For fills, use the exchange timestamp when it parses; otherwise fall back to "now".
      let exchangeFilledAt: Date | undefined;
      if (fillLike) {
        const rawFilledAt = String(exchangeOrder.filled_at || '').trim();
        const parsed = rawFilledAt ? Date.parse(rawFilledAt) : Number.NaN;
        exchangeFilledAt = Number.isFinite(parsed) && parsed > 0 ? new Date(parsed) : new Date();
      }

      const statusChanged = exchangeStatus !== '' && exchangeStatus !== order.status?.toLowerCase();
      if (!statusChanged && !fillDataDrift) {
        logger.debug(`[OrderSync] Order ${orderId} status unchanged: ${exchangeStatus}`);
        return 'updated';
      }

      if (statusChanged) {
        logger.info(`[OrderSync] Updating order ${orderId} (${order.symbol}): ${order.status} -> ${exchangeStatus}`);
      } else {
        logger.info(`[OrderSync] Refreshing fill data for ${orderId} (${order.symbol}): qty ${order.qty} -> ${fillQty}, price ${order.price} -> ${fillPrice}`);
      }

      // In the drift-only case exchangeStatus is non-empty and equals the stored status.
      await runtimeOrderRepository.updateOrderStatus(
        orderId,
        exchangeStatus,
        exchangeFilledAt,
        reportedFillPrice,
        reportedFillQty
      );
      await this.emitStatusChange({
        orderId,
        symbol: order.symbol,
        previousStatus: (order.status || '').toLowerCase(),
        status: exchangeStatus,
        action: order.action,
        tradeId: order.trade_id,
        userId: order.user_id,
        profileId: order.profile_id,
        filledAt: exchangeFilledAt,
        fillPrice: reportedFillPrice,
        fillQty: reportedFillQty
      });
      return 'updated';
    } catch (error: unknown) {
      logger.error(`[OrderSync] Failed to sync order ${orderId}: ${OrderStatusSyncService.errorMessage(error)}`);
      throw normalizeThrown(error);
    }
  }

  /**
   * Handle an order the exchange has no record of.
   *
   * Already-closed rows are preserved. Otherwise the order must be past the
   * grace window and confirmed missing often enough; it is then canceled if
   * its lifecycle is closed, quarantined as `unknown` if older than 24 hours,
   * or left unchanged.
   */
  private async reconcileMissingOrder(
    order: OrderSyncOrderRecord,
    orderId: string
  ): Promise<'updated' | 'quarantined'> {
    const normalizedCurrentStatus = String(order.status || '').toLowerCase();
    if (OrderStatusSyncService.CLOSED_STATUSES.has(normalizedCurrentStatus)) {
      this.missingExchangeDetections.delete(orderId);
      logger.debug(`[OrderSync] Closed order ${orderId} missing from exchange; preserving DB state (${normalizedCurrentStatus}).`);
      return 'updated';
    }

    // Rows without created_at are treated as brand new (age 0).
    const createdAt = order.created_at ? new Date(order.created_at).getTime() : Date.now();
    const orderAge = Date.now() - createdAt;
    const ageHours = orderAge / (1000 * 60 * 60);
    const ageMinutes = orderAge / (1000 * 60);
    const graceMinutes = Math.max(0, Number(config.ORDER_SYNC_MISSING_GRACE_MINUTES || 0));
    const minConfirmations = Math.max(1, Math.floor(Number(config.ORDER_SYNC_MISSING_CONFIRMATION_COUNT || 1)));

    if (ageMinutes < graceMinutes) {
      logger.debug(`[OrderSync] Order ${orderId} missing on exchange but within grace window (${ageMinutes.toFixed(1)}m < ${graceMinutes}m).`);
      return 'updated';
    }

    // Require several consecutive sightings before mutating any state.
    const seenBefore = this.missingExchangeDetections.get(orderId)?.count ?? 0;
    const sightings = seenBefore + 1;
    this.missingExchangeDetections.set(orderId, { count: sightings, lastSeenAt: Date.now() });
    if (sightings < minConfirmations) {
      logger.warn(`[OrderSync] Order ${orderId} missing from exchange (confirmation ${sightings}/${minConfirmations}). Deferring state mutation.`);
      return 'updated';
    }

    const normalizedAction = String(order.action || '').toUpperCase();
    const normalizedSide = String((order as { side?: unknown }).side || '').toUpperCase();

    // If lifecycle is already closed, mark stale order canceled so dashboard
    // does not keep ghost active rows. Applies to EXIT and legacy rows without action.
    if (order.trade_id) {
      const isLifecycleClosed = await runtimeOrderRepository.isTradeLifecycleClosed(
        order.trade_id,
        order.profile_id,
        order.symbol
      );
      if (isLifecycleClosed) {
        logger.warn(`[OrderSync] Order ${orderId} (${order.symbol}) not found on exchange, but lifecycle ${order.trade_id} is closed. Marking as canceled.`);
        await this.resolveMissingOrder(order, orderId, 'canceled');
        return 'updated';
      }
    }

    // Legacy safety: older EXIT-like orders may be missing trade_id.
    // For dedicated profiles, resolve by profile+symbol virtual lifecycle state.
    const isExitLikeWithoutTradeId = !order.trade_id
      && !!order.profile_id
      && (normalizedAction === 'EXIT' || (!normalizedAction && normalizedSide === 'SELL'));
    if (isExitLikeWithoutTradeId && order.profile_id) {
      try {
        const virtualPosition = await runtimeOrderRepository.getVirtualOpenPosition(order.profile_id, order.symbol);
        if (!virtualPosition) {
          logger.warn(`[OrderSync] Legacy EXIT-like order ${orderId} (${order.symbol}) has no open virtual lifecycle. Marking as canceled.`);
          await this.resolveMissingOrder(order, orderId, 'canceled');
          return 'updated';
        }
      } catch (virtualErr: unknown) {
        // Best effort: fall through to the age-based handling below.
        logger.warn(`[OrderSync] Failed virtual lifecycle check for legacy order ${orderId}: ${OrderStatusSyncService.errorMessage(virtualErr)}`);
      }
    }

    // Older than 24 hours and still not found: quarantine as 'unknown' for manual review.
    if (ageHours > 24) {
      logger.error(`[OrderSync][QUARANTINE] Order ${orderId} not found and >24h old. Marking as UNKNOWN for manual review.`);
      await this.resolveMissingOrder(order, orderId, 'unknown');
      return 'quarantined';
    }

    logger.debug(`[OrderSync] Order ${orderId} (${order.symbol}) not found on exchange (age: ${ageHours.toFixed(1)}h) - keeping existing status`);
    return 'updated';
  }

  /**
   * Persist the terminal status chosen for a missing order, clear its
   * detection counter, and notify the status-change callback.
   * `'unknown'` events carry `quarantined: true`.
   */
  private async resolveMissingOrder(
    order: OrderSyncOrderRecord,
    orderId: string,
    status: 'canceled' | 'unknown'
  ): Promise<void> {
    await runtimeOrderRepository.updateOrderStatus(orderId, status);
    this.missingExchangeDetections.delete(orderId);
    await this.emitStatusChange({
      orderId,
      symbol: order.symbol,
      previousStatus: (order.status || '').toLowerCase(),
      status,
      action: order.action,
      tradeId: order.trade_id,
      userId: order.user_id,
      profileId: order.profile_id,
      ...(status === 'unknown' ? { quarantined: true } : {})
    });
  }
}