learning_ai_invt_trdg/backend/src/index.ts

1445 lines
68 KiB
TypeScript

import { config, validateConfig, loadDynamicConfig } from './config/index.js';
import { ConnectorFactory } from './connectors/factory.js';
import { DirectionTracker, SignalType } from './strategies/directionTracker.js';
import { ProStrategyEngine } from './strategies/ProStrategyEngine.js';
import { SignalDirection } from './strategies/rules/types.js';
import { Notifier } from './services/notifier.js';
import { ApiServer } from './services/apiServer.js';
import logger from './utils/logger.js';
import { IExchangeConnector } from './connectors/types.js';
import { TradeExecutor } from './services/TradeExecutor.js';
import { AutoTrader, AutoTraderExecutionOutcome, PortfolioGuardResult } from './services/AutoTrader.js';
import { ManualTrader } from './services/ManualTrader.js';
import { TradeMonitor } from './services/tradeMonitor.js';
import { OrderStatusSyncEvent, OrderStatusSyncService } from './services/OrderStatusSyncService.js';
import { healthTracker } from './services/healthTracker.js';
import { observabilityService } from './services/observabilityService.js';
import { tradingTelemetry } from './services/tradingTelemetry.js';
import { reconciliationService } from './services/reconciliationService.js';
import { reconciliationWatchdogAutoResumeService } from './services/reconciliationWatchdogAutoResumeService.js';
import { getCurrentUserProfile, listActiveTradeProfiles } from './services/profileRepository.js';
import { listActiveTradingUsers } from './services/userRepository.js';
import * as runtimeOrderRepository from './services/runtimeOrderRepository.js';
import { listManualEntries, saveManualEntryForUser, type ManualEntryRecord } from './services/manualEntryRepository.js';
const SIMPLE_WORKFLOW_TYPE = 'simple';
const SIMPLE_WORKER_INTERVAL_MS = 15_000;
const toPositiveNumber = (value: unknown): number | null => {
const numeric = Number(value);
return Number.isFinite(numeric) && numeric > 0 ? numeric : null;
};
const toNonNegativeNumber = (value: unknown): number | null => {
const numeric = Number(value);
return Number.isFinite(numeric) && numeric >= 0 ? numeric : null;
};
const resolveSimpleDesiredQty = (entry: ManualEntryRecord, referencePrice: number): number | null => {
const sizingMode = String(entry.sizing_mode || 'quantity').trim().toLowerCase();
if (sizingMode === 'amount') {
const amountUsd = toPositiveNumber(entry.amount_usd);
if (!(amountUsd && amountUsd > 0) || !(referencePrice > 0)) return null;
const derivedQty = amountUsd / referencePrice;
return derivedQty > 0 ? derivedQty : null;
}
return toPositiveNumber(entry.quantity);
};
const normalizeTriggerMode = (value: unknown): 'dollar' | 'percent' | null => {
const normalized = String(value || '').trim().toLowerCase();
if (normalized === 'dollar' || normalized === 'percent') return normalized;
return null;
};
const isSimpleWorkflowEntry = (entry: ManualEntryRecord | null | undefined): entry is ManualEntryRecord => {
return String(entry?.workflow_type || '').trim().toLowerCase() === SIMPLE_WORKFLOW_TYPE;
};
const isSimpleBuyEntry = (entry: ManualEntryRecord): boolean => {
return String(entry.simple_side || '').trim().toLowerCase() === 'buy';
};
const isSimpleSellEntry = (entry: ManualEntryRecord): boolean => {
return String(entry.simple_side || '').trim().toLowerCase() === 'sell';
};
const isSimpleSubmittedStatus = (status?: string | null): boolean => {
return String(status || '').trim().toLowerCase() === 'simple_entry_submitted';
};
const isSimpleBoughtStatus = (status?: string | null): boolean => {
return String(status || '').trim().toLowerCase() === 'simple_bought';
};
const isSimpleSellArmedStatus = (status?: string | null): boolean => {
return String(status || '').trim().toLowerCase() === 'simple_armed_sell';
};
const isSimpleExitSubmittedStatus = (status?: string | null): boolean => {
return String(status || '').trim().toLowerCase() === 'simple_exit_submitted';
};
const shouldArmSimpleBuy = (entry: ManualEntryRecord): boolean => {
return String(entry.status || '').trim().toLowerCase() === 'simple_armed_buy';
};
const computeSimpleBuyTriggerPrice = (entry: ManualEntryRecord): number | null => {
const referencePrice = toPositiveNumber(entry.reference_price);
const threshold = toNonNegativeNumber(entry.drop_threshold_for_buy);
const mode = normalizeTriggerMode(entry.drop_trigger_mode);
if (!referencePrice || threshold === null || !mode) return null;
if (mode === 'dollar') {
const triggerPrice = referencePrice - threshold;
return triggerPrice > 0 ? Number(triggerPrice.toFixed(4)) : null;
}
const triggerPrice = referencePrice * (1 - (threshold / 100));
return triggerPrice > 0 ? Number(triggerPrice.toFixed(4)) : null;
};
const computeSimpleProfitTargetPrice = (entryPrice: number, entry: ManualEntryRecord): number | null => {
const threshold = toPositiveNumber(entry.gain_threshold_for_sell);
const mode = normalizeTriggerMode(entry.profit_target_mode);
if (!(entryPrice > 0) || !threshold || !mode) return null;
if (mode === 'dollar') {
return Number((entryPrice + threshold).toFixed(4));
}
return Number((entryPrice * (1 + (threshold / 100))).toFixed(4));
};
const resolvePreferredUserAlpacaCredentials = (user: {
ALPACA_API_KEY?: string;
ALPACA_SECRET_KEY?: string;
REAL_ALPACA_API_KEY?: string;
REAL_ALPACA_SECRET_KEY?: string;
}): { key: string; secret: string; source: 'paper' | 'live' | 'none' } => {
const paperKey = String(user.ALPACA_API_KEY || '').trim();
const paperSecret = String(user.ALPACA_SECRET_KEY || '').trim();
const liveKey = String(user.REAL_ALPACA_API_KEY || '').trim();
const liveSecret = String(user.REAL_ALPACA_SECRET_KEY || '').trim();
if (config.PAPER_TRADING) {
if (paperKey && paperSecret) return { key: paperKey, secret: paperSecret, source: 'paper' };
if (liveKey && liveSecret) return { key: liveKey, secret: liveSecret, source: 'live' };
} else {
if (liveKey && liveSecret) return { key: liveKey, secret: liveSecret, source: 'live' };
if (paperKey && paperSecret) return { key: paperKey, secret: paperSecret, source: 'paper' };
}
return { key: '', secret: '', source: 'none' };
};
async function main() {
logger.info(`Starting ${config.PRODUCT_ID} trading backend...`);
validateConfig();
// Telemetry init runs here (after bootstrap.ts Key Vault resolution)
tradingTelemetry.init();
tradingTelemetry.trackEvent('lifecycle', 'trading_loop', 'server_start', {
tags: { product: config.PRODUCT_ID, env: process.env.NODE_ENV ?? 'development' },
});
// --- 0. Primary Account Setup (for Market Data) ---
await loadDynamicConfig();
let dynamicConfigRefreshInFlight = false;
const refreshDynamicConfig = async () => {
if (dynamicConfigRefreshInFlight) return;
dynamicConfigRefreshInFlight = true;
try {
await loadDynamicConfig();
} catch (error: any) {
logger.error(`[Config] Dynamic refresh failed: ${error.message || error}`);
} finally {
dynamicConfigRefreshInFlight = false;
}
};
const dynamicConfigRefreshTimer = setInterval(
refreshDynamicConfig,
Math.max(15_000, Number(config.DYNAMIC_CONFIG_REFRESH_MS || 60_000))
);
if (typeof dynamicConfigRefreshTimer.unref === 'function') {
dynamicConfigRefreshTimer.unref();
}
// --- 0. Load user configuration (Cosmos-first; legacy Supabase optional) ---
logger.info('Fetching active trading users...');
let users = await listActiveTradingUsers();
// --- 1. Identify Primary Key (for Data Fetching) ---
const isPlaceholder = (val: string) => !val || val === 'your_key' || val === 'your_secret';
const preferredPrimaryUserCredentials = users.length > 0 ? resolvePreferredUserAlpacaCredentials(users[0]) : { key: '', secret: '', source: 'none' as const };
const primaryAlpacaKey = !isPlaceholder(config.ALPACA_API_KEY) ? config.ALPACA_API_KEY : preferredPrimaryUserCredentials.key;
const primaryAlpacaSecret = !isPlaceholder(config.ALPACA_API_SECRET) ? config.ALPACA_API_SECRET : preferredPrimaryUserCredentials.secret;
logger.info(`🚨 Bot Initialized for ${config.SYMBOLS.length} Symbols: ${config.SYMBOLS.join(', ')}`);
// --- 0. Initialize Modular Exchanges ---
const dataExchange = ConnectorFactory.getCustomConnector(
config.DATA_PROVIDER,
primaryAlpacaKey,
primaryAlpacaSecret,
{
paper: !isPlaceholder(config.ALPACA_API_KEY)
? config.PAPER_TRADING
: preferredPrimaryUserCredentials.source === 'paper',
}
);
const apiServer = new ApiServer(config.API_PORT);
apiServer.pruneSymbols(config.SYMBOLS);
const proEngine = new ProStrategyEngine(dataExchange);
const notifier = new Notifier();
const tracker = new DirectionTracker();
interface UserContext {
userId: string;
email: string;
profileId: string;
profileName: string;
profileSettings?: any;
monitoredSymbols: string[];
executor: TradeExecutor;
autoTrader: AutoTrader;
manualTrader: ManualTrader;
monitor: TradeMonitor;
orderSync: OrderStatusSyncService;
}
const userContexts: UserContext[] = [];
const orphanOrderSyncByUser = new Map<string, OrderStatusSyncService>();
let startedInFallbackMode = false;
const parseSymbols = (symbolsRaw?: string | null): string[] =>
String(symbolsRaw || '')
.split(',')
.map((value) => value.trim())
.filter(Boolean);
const resolveProfileSymbols = (profileSettings?: any): string[] => {
const profileSymbols = parseSymbols(profileSettings?.symbols);
return profileSymbols.length > 0 ? profileSymbols : config.SYMBOLS;
};
const collectMonitoredSymbols = (): string[] => {
const symbols = new Set<string>(config.SYMBOLS);
for (const ctx of userContexts) {
for (const symbol of (ctx.monitoredSymbols || [])) {
if (symbol) symbols.add(symbol);
}
}
return Array.from(symbols);
};
const getSimpleWorkerContext = (entry: ManualEntryRecord): UserContext | null => {
const requestedProfileId = String(entry.profile_id || '').trim();
const requestedUserId = String(entry.user_id || '').trim();
if (requestedProfileId) {
const byProfile = userContexts.find((ctx) => ctx.profileId === requestedProfileId);
if (byProfile) return byProfile;
}
if (requestedUserId) {
const byUser = userContexts.find((ctx) => ctx.userId === requestedUserId);
if (byUser) return byUser;
}
return null;
};
const resolveSimpleMarketPrice = (entry: ManualEntryRecord): number | null => {
const symbol = String(entry.symbol || '').trim().toUpperCase();
if (!symbol) return null;
const livePrice = Number(apiServer.getState().symbols?.[symbol]?.price || 0);
return Number.isFinite(livePrice) && livePrice > 0 ? livePrice : null;
};
const bindSimpleBoughtPosition = async (entry: ManualEntryRecord, ctx: UserContext): Promise<boolean> => {
const linkedTradeId = String(entry.linked_trade_id || '').trim();
const activePosition = linkedTradeId
? ctx.executor.getActivePosition(entry.symbol, linkedTradeId)
: ctx.executor.getActivePosition(entry.symbol);
if (!activePosition) {
return false;
}
await saveManualEntryForUser(entry.user_id, {
...entry,
profile_id: entry.profile_id || ctx.profileId,
linked_trade_id: activePosition.tradeId || entry.linked_trade_id,
entry_price: activePosition.entryPrice,
filled_quantity: activePosition.size,
buy_time: entry.buy_time || new Date().toISOString(),
status: 'simple_bought',
active: true,
});
return true;
};
let simpleWorkerRunning = false;
const runSimpleWorker = async () => {
if (simpleWorkerRunning) return;
simpleWorkerRunning = true;
try {
const entries = (await listManualEntries()).filter((entry) => entry.active && isSimpleWorkflowEntry(entry));
const processedSimpleExitKeys = new Set<string>();
for (const entry of entries) {
const symbol = String(entry.symbol || '').trim().toUpperCase();
if (!symbol) continue;
const ctx = getSimpleWorkerContext(entry);
if (!ctx) continue;
if (shouldArmSimpleBuy(entry)) {
const reboundExistingPosition = String(entry.linked_trade_id || '').trim()
? await bindSimpleBoughtPosition(entry, ctx)
: false;
if (reboundExistingPosition) {
logger.info(`[SimpleWorker] Rebound armed buy setup to existing open holding for ${symbol}`);
continue;
}
const currentPrice = resolveSimpleMarketPrice(entry) ?? toPositiveNumber(entry.reference_price);
const triggerPrice = computeSimpleBuyTriggerPrice(entry);
if (!(currentPrice && currentPrice > 0) || !triggerPrice) continue;
const desiredQty = resolveSimpleDesiredQty(entry, currentPrice);
const threshold = toNonNegativeNumber(entry.drop_threshold_for_buy);
if (desiredQty === null) continue;
if (currentPrice > triggerPrice) continue;
const result = await ctx.manualTrader.executeRequest(
symbol,
'buy',
desiredQty,
threshold === 0 ? 'market' : 'limit',
threshold === 0 ? undefined : triggerPrice,
currentPrice,
entry.user_id,
undefined,
undefined,
{
allowExistingPosition: true,
tradeIdHint: String(entry.linked_trade_id || '').trim() || `TRD-SIMPLE-${String(entry.stock_instance_id || '').replace(/[^A-Za-z0-9]/g, '').slice(0, 24)}`,
concurrencyKey: String(entry.stock_instance_id || '').trim() || symbol
}
);
if (!result.success) {
logger.warn(`[SimpleWorker] Buy trigger failed for ${symbol}: ${result.error || 'unknown error'}`);
continue;
}
await saveManualEntryForUser(entry.user_id, {
...entry,
profile_id: entry.profile_id || ctx.profileId,
linked_trade_id: result.tradeId || entry.linked_trade_id,
filled_quantity: result.adjustedQty ?? entry.filled_quantity,
buy_time: new Date().toISOString(),
status: 'simple_entry_submitted',
active: true,
});
continue;
}
const currentPrice = resolveSimpleMarketPrice(entry);
if (!(currentPrice && currentPrice > 0)) continue;
if (isSimpleSubmittedStatus(entry.status)) {
await bindSimpleBoughtPosition(entry, ctx);
continue;
}
if (!(isSimpleBoughtStatus(entry.status) || isSimpleSellArmedStatus(entry.status) || isSimpleExitSubmittedStatus(entry.status))) {
continue;
}
const linkedTradeId = String(entry.linked_trade_id || '').trim();
const exitDedupKey = linkedTradeId ? `${symbol}::${linkedTradeId}` : symbol;
if (processedSimpleExitKeys.has(exitDedupKey)) {
continue;
}
const exitLifecycle = ctx.executor.getExitLifecycle(symbol);
if (
exitLifecycle.state === 'initiated'
|| exitLifecycle.state === 'order_placed'
|| exitLifecycle.state === 'verifying'
|| exitLifecycle.state === 'quarantined'
) {
processedSimpleExitKeys.add(exitDedupKey);
continue;
}
const activePosition = linkedTradeId
? ctx.executor.getActivePosition(symbol, linkedTradeId)
: ctx.executor.getActivePosition(symbol);
if (isSimpleExitSubmittedStatus(entry.status)) {
if (!activePosition) {
await saveManualEntryForUser(entry.user_id, {
...entry,
active: false,
status: 'sellCompleted',
sell_time: entry.sell_time || new Date().toISOString(),
sell_price: currentPrice,
});
}
continue;
}
if (!activePosition) {
if (isSimpleBoughtStatus(entry.status)) {
await bindSimpleBoughtPosition(entry, ctx);
}
continue;
}
const activeEntryPrice = toPositiveNumber(entry.entry_price) || activePosition.entryPrice;
const targetPrice = computeSimpleProfitTargetPrice(activeEntryPrice, entry);
if (!targetPrice || currentPrice < targetPrice) continue;
const exitResult = await ctx.manualTrader.executeExit(symbol, currentPrice, 'Simple target hit', linkedTradeId || undefined);
if (!exitResult.success) {
logger.warn(`[SimpleWorker] Exit trigger failed for ${symbol}: ${exitResult.error || 'unknown error'}`);
processedSimpleExitKeys.add(exitDedupKey);
continue;
}
processedSimpleExitKeys.add(exitDedupKey);
await saveManualEntryForUser(entry.user_id, {
...entry,
profile_id: entry.profile_id || ctx.profileId,
linked_trade_id: linkedTradeId || activePosition.tradeId,
entry_price: activeEntryPrice,
filled_quantity: entry.filled_quantity ?? activePosition.size,
status: 'simple_exit_submitted',
active: true,
});
}
} catch (error: any) {
logger.error(`[SimpleWorker] Error during simple setup scan: ${error.message || error}`);
} finally {
simpleWorkerRunning = false;
}
};
const buildOrderSyncHandler = (
executor: TradeExecutor,
scopeLabel: string
) => async (event: OrderStatusSyncEvent): Promise<void> => {
const normalizedAction = (event.action || '').toUpperCase();
const normalizedStatus = (event.status || '').toLowerCase();
if (event.quarantined) {
logger.error(`[OrderSync] Quarantined order ${event.orderId} (${event.symbol}) in ${scopeLabel}. Manual review required.`);
return;
}
if (normalizedAction !== 'EXIT') return;
if (normalizedStatus !== 'filled' && normalizedStatus !== 'partially_filled') return;
const active = executor.getActivePosition(event.symbol, event.tradeId);
if (!active) return;
const fallbackPrice = active.entryPrice;
const exitPrice = event.fillPrice && event.fillPrice > 0 ? event.fillPrice : fallbackPrice;
const fillQty = event.fillQty && event.fillQty > 0
? Math.min(event.fillQty, active.size)
: (normalizedStatus === 'filled' ? active.size : undefined);
if (normalizedStatus === 'partially_filled' && (!fillQty || fillQty <= 0)) {
logger.error(`[OrderSync] EXIT partial fill missing qty for ${event.orderId} (${event.symbol}) in ${scopeLabel}. Manual review required.`);
return;
}
const applied = await executor.applyExitFill(
event.symbol,
exitPrice,
fillQty,
'Order Sync Exit Fill',
event.tradeId
);
if (!applied.success) {
logger.error(`[OrderSync] Failed to apply EXIT fill for ${event.orderId} (${event.symbol}) in ${scopeLabel}: ${applied.error || 'unknown'}`);
return;
}
if (!applied.fullyClosed) {
logger.info(`[OrderSync] Applied partial EXIT for ${event.symbol} in ${scopeLabel}: filled=${applied.appliedQty}, remaining=${applied.remainingSize}`);
}
};
const buildPortfolioGuard = (
userId: string,
currentExecutor: TradeExecutor,
initialProfileSettings?: any
) => async (): Promise<PortfolioGuardResult> => {
const relatedContexts = userContexts.filter((ctx) => ctx.executor.getUserId() === userId);
let accountOpenTrades = 0;
let accountCommittedCapital = 0;
let accountAllocatedCapital = 0;
for (const ctx of relatedContexts) {
const positions = Array.from(ctx.executor.getAllPositions().values());
accountOpenTrades += positions.length;
accountCommittedCapital += positions.reduce((sum, pos) => sum + (pos.entryPrice * pos.size), 0);
accountAllocatedCapital += Number(ctx.profileSettings?.allocated_capital || 0);
}
// During context bootstrap, include current profile if it is not in the shared list yet.
if (!relatedContexts.some((ctx) => ctx.executor === currentExecutor)) {
const currentPositions = Array.from(currentExecutor.getAllPositions().values());
accountOpenTrades += currentPositions.length;
accountCommittedCapital += currentPositions.reduce((sum, pos) => sum + (pos.entryPrice * pos.size), 0);
accountAllocatedCapital += Number(initialProfileSettings?.allocated_capital || 0);
}
if (accountOpenTrades >= config.MAX_OPEN_TRADES_PER_ACCOUNT) {
return {
allowed: false,
reason: `max account open trades reached (${config.MAX_OPEN_TRADES_PER_ACCOUNT})`
};
}
if (accountAllocatedCapital > 0 && accountCommittedCapital >= accountAllocatedCapital) {
return {
allowed: false,
reason: `account capital fully committed (${accountCommittedCapital.toFixed(2)} / ${accountAllocatedCapital.toFixed(2)})`
};
}
return { allowed: true };
};
// --- Helper: Build a UserContext from a profile ---
async function buildProfileContext(profile: any): Promise<UserContext | null> {
const snapshotUser = users.find(u => u.user_id === profile.user_id);
const currentProfile = await getCurrentUserProfile(String(profile.user_id || '').trim(), snapshotUser || {
user_id: String(profile.user_id || '').trim(),
trade_enable: true,
});
if (!currentProfile) {
logger.warn(`⚠️ Profile ${profile.name} owner not found in active users. Skipping.`);
return null;
}
const user = {
user_id: String(currentProfile.user_id || ''),
first_name: String(currentProfile.first_name || ''),
last_name: String(currentProfile.last_name || ''),
email: String(currentProfile.email || ''),
FMP_API_KEY: String(currentProfile.FMP_API_KEY || ''),
ALPACA_API_KEY: String(currentProfile.ALPACA_API_KEY || ''),
ALPACA_SECRET_KEY: String(currentProfile.ALPACA_SECRET_KEY || ''),
REAL_ALPACA_API_KEY: String(currentProfile.REAL_ALPACA_API_KEY || ''),
REAL_ALPACA_SECRET_KEY: String(currentProfile.REAL_ALPACA_SECRET_KEY || ''),
role: String(currentProfile.role || 'member'),
trade_enable: Boolean(currentProfile.trade_enable),
drop_threshold_for_buy: Number(currentProfile.drop_threshold_for_buy || 0),
gain_threshold_for_sell: Number(currentProfile.gain_threshold_for_sell || 0),
market_poll_interval_in_seconds: Number(currentProfile.market_poll_interval_in_seconds || 0),
};
const existingUserIndex = users.findIndex((candidate) => candidate.user_id === user.user_id);
if (existingUserIndex >= 0) {
users[existingUserIndex] = user;
} else {
users.push(user);
}
const preferredCredentials = resolvePreferredUserAlpacaCredentials(user);
const userKey = preferredCredentials.key;
const userSecret = preferredCredentials.secret;
if (!userKey || !userSecret) {
logger.warn(`⚠️ User ${user.email} missing keys for profile ${profile.name}. Skipping.`);
return null;
}
const userExecExchange = ConnectorFactory.getCustomConnector(
config.EXECUTION_PROVIDER,
userKey,
userSecret,
{ paper: preferredCredentials.source === 'paper' }
);
const userExecutor = new TradeExecutor(userExecExchange, apiServer, user.user_id, profile.id);
userExecutor.setProfileSettings(profile);
const monitoredSymbols = resolveProfileSymbols(profile);
await userExecutor.syncPositions(monitoredSymbols);
await userExecutor.rebuildStartupState();
await userExecutor.crossValidateCapitalLedger(monitoredSymbols);
const userAutoTrader = new AutoTrader(
userExecutor,
userExecExchange,
buildPortfolioGuard(user.user_id, userExecutor, profile)
);
const userManualTrader = new ManualTrader(userExecutor);
apiServer.registerManualTrader(profile.id, userManualTrader);
const userMonitor = new TradeMonitor(userExecExchange, userExecutor, apiServer);
userMonitor.start();
const userOrderSync = new OrderStatusSyncService(
userExecExchange,
config.ORDER_SYNC_INTERVAL_MS,
profile.id,
buildOrderSyncHandler(userExecutor, `profile=${profile.id}`)
);
userOrderSync.start();
if (!orphanOrderSyncByUser.has(user.user_id)) {
const orphanOrderSync = new OrderStatusSyncService(
userExecExchange,
config.ORDER_SYNC_INTERVAL_MS,
undefined,
undefined,
{
userId: user.user_id,
profileNullOnly: true
}
);
orphanOrderSync.start();
orphanOrderSyncByUser.set(user.user_id, orphanOrderSync);
}
return {
userId: user.user_id,
email: user.email,
profileId: profile.id,
profileName: profile.name,
profileSettings: profile,
monitoredSymbols,
executor: userExecutor,
autoTrader: userAutoTrader,
manualTrader: userManualTrader,
monitor: userMonitor,
orderSync: userOrderSync
};
}
// --- 0. Initialize Signal Subscribers (Users & Profiles) ---
const profiles = await listActiveTradeProfiles();
if (profiles.length > 0) {
logger.info(`👥 Initializing Execution Managers for ${profiles.length} Trade Profiles...`);
for (const profile of profiles) {
const ctx = await buildProfileContext(profile);
if (ctx) {
userContexts.push(ctx);
logger.info(`✅ Profile Ready: ${ctx.profileName} (${ctx.email}) [profile_id: ${ctx.profileId}]`);
}
}
} else if (users.length > 0) {
startedInFallbackMode = true;
// Fallback to one-per-user if no profiles table entries exist
logger.info(`👥 No specific profiles found. Falling back to multi-user default...`);
for (const user of users) {
const preferredCredentials = resolvePreferredUserAlpacaCredentials(user);
const userKey = preferredCredentials.key;
const userSecret = preferredCredentials.secret;
if (!userKey || !userSecret) continue;
const defaultProfileId = `default-${user.user_id}`;
const userExecExchange = ConnectorFactory.getCustomConnector(
config.EXECUTION_PROVIDER,
userKey,
userSecret,
{ paper: preferredCredentials.source === 'paper' }
);
const userExecutor = new TradeExecutor(userExecExchange, apiServer, user.user_id, defaultProfileId);
await userExecutor.syncPositions(config.SYMBOLS);
await userExecutor.rebuildStartupState();
await userExecutor.crossValidateCapitalLedger(config.SYMBOLS);
const userAutoTrader = new AutoTrader(
userExecutor,
userExecExchange,
buildPortfolioGuard(user.user_id, userExecutor, { allocated_capital: config.TOTAL_CAPITAL })
);
const userManualTrader = new ManualTrader(userExecutor);
apiServer.registerManualTrader(defaultProfileId, userManualTrader);
const userMonitor = new TradeMonitor(userExecExchange, userExecutor, apiServer);
userMonitor.start();
const userOrderSync = new OrderStatusSyncService(
userExecExchange,
config.ORDER_SYNC_INTERVAL_MS,
defaultProfileId,
buildOrderSyncHandler(userExecutor, `profile=${defaultProfileId}`)
);
userOrderSync.start();
if (!orphanOrderSyncByUser.has(user.user_id)) {
const orphanOrderSync = new OrderStatusSyncService(
userExecExchange,
config.ORDER_SYNC_INTERVAL_MS,
undefined,
undefined,
{
userId: user.user_id,
profileNullOnly: true
}
);
orphanOrderSync.start();
orphanOrderSyncByUser.set(user.user_id, orphanOrderSync);
}
userContexts.push({
userId: user.user_id,
email: user.email,
profileId: defaultProfileId,
profileName: 'Default',
monitoredSymbols: [...config.SYMBOLS],
executor: userExecutor,
autoTrader: userAutoTrader,
manualTrader: userManualTrader,
monitor: userMonitor,
orderSync: userOrderSync
});
}
if (!isPlaceholder(config.ALPACA_API_KEY) && !isPlaceholder(config.ALPACA_API_SECRET)) {
logger.info(`⚠️ No DB Users. Initializing Legacy Single-User Mode from .env`);
const executionExchange = ConnectorFactory.getCustomConnector(config.EXECUTION_PROVIDER, config.ALPACA_API_KEY, config.ALPACA_API_SECRET);
const executor = new TradeExecutor(executionExchange, apiServer, 'global', 'global');
await executor.syncPositions(config.SYMBOLS);
await executor.rebuildStartupState();
await executor.crossValidateCapitalLedger(config.SYMBOLS);
const autoTrader = new AutoTrader(
executor,
executionExchange,
buildPortfolioGuard('global', executor, { allocated_capital: config.TOTAL_CAPITAL })
);
const manualTrader = new ManualTrader(executor);
// --- Register using 'global' ID ---
apiServer.registerManualTrader('global', manualTrader);
const tradeMonitor = new TradeMonitor(executionExchange, executor, apiServer);
tradeMonitor.start();
const globalOrderSync = new OrderStatusSyncService(
executionExchange,
config.ORDER_SYNC_INTERVAL_MS,
'global',
buildOrderSyncHandler(executor, 'profile=global')
);
globalOrderSync.start();
userContexts.push({
userId: 'global',
email: 'Global .env User',
profileId: 'global',
profileName: 'Default',
monitoredSymbols: [...config.SYMBOLS],
executor,
autoTrader,
manualTrader,
monitor: tradeMonitor,
orderSync: globalOrderSync
});
}
}
if (userContexts.length === 0) {
logger.error('❌ No valid execution users found (DB or .env). Bot cannot trade.');
}
observabilityService.registerProfileProvider(() =>
userContexts
.map((ctx) => String(ctx.profileId || '').trim())
.filter(Boolean)
);
observabilityService.startCapitalWatchdog();
// --- PROFILE HOT-RELOAD: Sync new/updated/deactivated profiles periodically ---
setInterval(async () => {
try {
users = await listActiveTradingUsers();
const latestProfiles = await listActiveTradeProfiles();
const currentIds = new Set(userContexts.map(c => c.profileId));
const latestIds = new Set(latestProfiles.map((p: any) => p.id));
// 1. ADD new profiles that don't exist yet
for (const profile of latestProfiles) {
if (!currentIds.has(profile.id)) {
logger.info(`🆕 [ProfileSync] New profile detected: ${profile.name} (${profile.id}). Initializing...`);
const ctx = await buildProfileContext(profile);
if (ctx) {
userContexts.push(ctx);
logger.info(`✅ [ProfileSync] Profile hot-loaded: ${ctx.profileName} (${ctx.email})`);
}
}
}
if (startedInFallbackMode && latestProfiles.length > 0) {
for (let i = userContexts.length - 1; i >= 0; i--) {
const ctx = userContexts[i];
const isFallbackContext = ctx.profileId.startsWith('default-') || ctx.profileId === 'global';
if (!isFallbackContext) continue;
logger.info(`[ProfileSync] Retiring fallback context ${ctx.profileId} now that DB profiles are active.`);
ctx.monitor.stop();
ctx.orderSync.stop();
ctx.executor.dispose();
apiServer.unregisterManualTrader(ctx.profileId);
const removedUserId = ctx.userId;
userContexts.splice(i, 1);
const hasSameUserContext = userContexts.some((candidate) => candidate.userId === removedUserId);
if (!hasSameUserContext) {
const orphanSync = orphanOrderSyncByUser.get(removedUserId);
if (orphanSync) {
orphanSync.stop();
orphanOrderSyncByUser.delete(removedUserId);
}
}
}
startedInFallbackMode = false;
}
// 2. REMOVE profiles that were deactivated or deleted
for (let i = userContexts.length - 1; i >= 0; i--) {
const ctx = userContexts[i];
// Skip non-DB profiles (legacy/global)
if (ctx.profileId.startsWith('default-') || ctx.profileId === 'global') continue;
if (!latestIds.has(ctx.profileId)) {
logger.info(`🗑️ [ProfileSync] Profile removed/deactivated: ${ctx.profileName} (${ctx.profileId}). Stopping monitor.`);
ctx.monitor.stop();
ctx.orderSync.stop();
ctx.executor.dispose();
apiServer.unregisterManualTrader(ctx.profileId);
const removedUserId = ctx.userId;
userContexts.splice(i, 1);
const hasSameUserContext = userContexts.some((candidate) => candidate.userId === removedUserId);
if (!hasSameUserContext) {
const orphanSync = orphanOrderSyncByUser.get(removedUserId);
if (orphanSync) {
orphanSync.stop();
orphanOrderSyncByUser.delete(removedUserId);
}
}
}
}
// 3. UPDATE settings for existing profiles (strategy_config, symbols, capital, etc.)
for (const profile of latestProfiles) {
const ctx = userContexts.find(c => c.profileId === profile.id);
if (ctx) {
const nextSymbols = resolveProfileSymbols(profile);
const previousSymbols = ctx.monitoredSymbols || [];
const didSymbolsChange =
previousSymbols.length !== nextSymbols.length
|| previousSymbols.some((symbol, idx) => symbol !== nextSymbols[idx]);
// Update profileSettings in-place so the next trading loop picks up changes
ctx.profileSettings = profile;
ctx.profileName = profile.name;
ctx.executor.setProfileSettings(profile);
ctx.monitoredSymbols = nextSymbols;
if (didSymbolsChange) {
const resyncSymbols = Array.from(new Set([...previousSymbols, ...nextSymbols]));
logger.info(`[ProfileSync] Symbols changed for ${ctx.profileName} (${ctx.profileId}). Re-syncing: ${resyncSymbols.join(', ')}`);
await ctx.executor.syncPositions(resyncSymbols);
}
}
}
const monitoredSymbols = collectMonitoredSymbols();
apiServer.pruneSymbols(monitoredSymbols);
} catch (err: any) {
logger.error(`[ProfileSync] Error during profile sync: ${err.message}`);
}
}, config.PROFILE_SYNC_INTERVAL_MS);
apiServer.updateSettings({
executionMode: config.ENABLE_TRADING ? (config.PAPER_TRADING ? 'Paper' : 'Live') : 'Alerts',
riskPerTrade: config.PRO_STRATEGY.PARAMETERS.RISK_PER_TRADE,
totalCapital: config.TOTAL_CAPITAL,
maxOpenTrades: config.MAX_OPEN_TRADES,
isAlgoEnabled: config.ENABLE_TRADING,
enabledRules: proEngine.getEnabledRules()
});
logger.info(`Monitoring ${collectMonitoredSymbols().join(', ')} via DATA=${config.DATA_PROVIDER} and EXECUTION=${config.EXECUTION_PROVIDER}`);
void runSimpleWorker();
const simpleWorkerTimer = setInterval(() => {
void runSimpleWorker();
}, SIMPLE_WORKER_INTERVAL_MS);
if (typeof simpleWorkerTimer.unref === 'function') {
simpleWorkerTimer.unref();
}
// --- State variables for periodic alerts (Global Signal State) ---
const assetState = new Map<string, {
price: number;
signal: SignalType;
ema: number;
rsi: number;
entryPrice: number | null; // Used for Legacy Low-stress mode (Single user assumption in legacy code)
}>();
// Initialize state
collectMonitoredSymbols().forEach(s => {
assetState.set(s, {
price: 0,
signal: SignalType.NONE,
ema: 0,
rsi: 0,
entryPrice: null
});
});
apiServer.updateSettings({
enabledRules: proEngine.getEnabledRules()
});
let isTradingLoopRunning = false;
let isReconciliationRunning = false;
// --- 1. Main Trading Loop (Signal Changes) ---
const tradingLoop = async () => {
if (healthTracker.isTradingPaused()) {
logger.info('[Loop] Trading control is PAUSED. Skipping trading cycle.');
apiServer.publishHealthSnapshot({ broadcast: true });
return;
}
if (isTradingLoopRunning) {
logger.warn('[Loop] Previous trading cycle is still running. Skipping this interval tick.');
return;
}
isTradingLoopRunning = true;
const loopStartedAt = Date.now();
apiServer.updateRuntimeHealth({
tradingLoopRunning: true,
tradingLoopLastStartedAt: loopStartedAt
});
let loopSucceeded = true;
try {
const monitoredSymbols = collectMonitoredSymbols();
for (const symbol of monitoredSymbols) {
try {
// ... (pro mode check omitted) ...
const useProMode = config.PRO_STRATEGY?.ENABLED_RULES?.length > 0;
if (!assetState.has(symbol)) {
assetState.set(symbol, {
price: 0,
signal: SignalType.NONE,
ema: 0,
rsi: 0,
entryPrice: null
});
}
const state = assetState.get(symbol)!;
if (useProMode) {
// --- PER-PROFILE STRATEGY EXECUTION ---
// Each profile can have its own enabled rules via strategy_config.rules[]
// We run ProEngine per-profile so each profile only triggers on its own rules.
let baselineResult: any = null;
const sharedContext = await proEngine.buildMarketContext(symbol);
if (!sharedContext) {
continue;
}
const context = sharedContext;
const profileSignals: Record<string, {
profileName?: string;
signal: string;
passed: boolean;
reason?: string;
execution?: {
status: AutoTraderExecutionOutcome['status'];
code: string;
reason: string;
orderId?: string;
};
rules?: Record<string, any>;
}> = {};
const aggregatedRuleBuckets = new Map<string, {
passed: number;
failed: number;
pending: number;
skipped: number;
confidences: number[];
}>();
const directionalSignals: SignalDirection[] = [];
let currentDisplaySignal = state.signal as unknown as string;
if (userContexts.length > 0) {
const profileEvaluations = await Promise.allSettled(userContexts.map(async (userCtx) => {
const profileRules = userCtx.profileSettings?.strategy_config?.rules;
const minRulePassRatio = userCtx.profileSettings?.strategy_config?.execution?.minRulePassRatio ?? 1.0;
const result = await proEngine.evaluateContext(sharedContext, profileRules, minRulePassRatio);
if (!result) return null;
let executionOutcome: AutoTraderExecutionOutcome | null = null;
if (context) {
executionOutcome = await userCtx.autoTrader.handleSignal(symbol, result, context, userCtx.profileSettings);
}
return {
profileId: userCtx.profileId,
profileName: userCtx.profileName,
result,
executionOutcome
};
}));
for (const evaluation of profileEvaluations) {
if (evaluation.status !== 'fulfilled' || !evaluation.value) continue;
const { profileId, profileName, result, executionOutcome } = evaluation.value;
const profileRuleStatuses = result?.metadata?.ruleStatuses || {};
if (!baselineResult) {
baselineResult = result;
}
profileSignals[profileId] = {
profileName,
signal: String(result.signal || SignalDirection.NONE),
passed: Boolean(result.passed),
reason: result.reason,
execution: executionOutcome || undefined,
rules: profileRuleStatuses
};
if (result.passed && result.signal && result.signal !== SignalDirection.NONE) {
directionalSignals.push(result.signal as SignalDirection);
}
for (const [ruleName, ruleState] of Object.entries(profileRuleStatuses)) {
const bucket = aggregatedRuleBuckets.get(ruleName) || {
passed: 0,
failed: 0,
pending: 0,
skipped: 0,
confidences: []
};
const normalizedRuleState = ruleState as {
passed?: boolean;
isPending?: boolean;
isSkipped?: boolean;
metadata?: { confidence?: number };
};
if (normalizedRuleState.isSkipped) {
bucket.skipped += 1;
} else if (normalizedRuleState.isPending) {
bucket.pending += 1;
} else if (normalizedRuleState.passed) {
bucket.passed += 1;
} else {
bucket.failed += 1;
}
const confidence = Number(normalizedRuleState.metadata?.confidence);
if (Number.isFinite(confidence)) {
bucket.confidences.push(confidence);
}
aggregatedRuleBuckets.set(ruleName, bucket);
}
}
if (directionalSignals.length > 0) {
const uniqueSignals = Array.from(new Set(directionalSignals));
currentDisplaySignal = uniqueSignals.length === 1
? uniqueSignals[0]
: 'MIXED';
} else {
currentDisplaySignal = 'NONE';
}
} else {
baselineResult = await proEngine.evaluateContext(sharedContext);
if (baselineResult?.passed && baselineResult.signal !== SignalDirection.NONE) {
currentDisplaySignal = String(baselineResult.signal);
} else {
currentDisplaySignal = 'NONE';
}
}
if (currentDisplaySignal !== (state.signal as unknown as string)) {
if (currentDisplaySignal === SignalDirection.BUY || currentDisplaySignal === SignalDirection.SELL) {
const message = `PRO SIGNAL: ${currentDisplaySignal}\nAsset: ${symbol}`;
await notifier.sendAlert(message);
apiServer.addAlert('signal', symbol, `${currentDisplaySignal} Signal`);
} else if (currentDisplaySignal === 'MIXED') {
apiServer.addAlert('info', symbol, `Mixed profile signals on ${symbol}; review profile signal panel.`);
}
}
state.signal = currentDisplaySignal as unknown as SignalType;
if (context && context.currentPrice) {
state.price = context.currentPrice;
state.ema = context.ema20_1h || 0;
state.rsi = context.rsi_1h || 0;
let displayPos = null;
const symbolPositions = userContexts
.map((ctx) => ctx.executor.getActivePosition(symbol))
.filter((pos): pos is NonNullable<typeof pos> => !!pos);
if (symbolPositions.length === 1) {
const pos = symbolPositions[0];
const pnl = (state.price - pos.entryPrice) * pos.size * (pos.side === SignalDirection.BUY ? 1 : -1);
const pnlPercent = ((state.price - pos.entryPrice) / pos.entryPrice) * 100 * (pos.side === SignalDirection.BUY ? 1 : -1);
displayPos = {
...pos,
side: pos.side as 'BUY' | 'SELL',
unrealizedPnl: pnl,
unrealizedPnlPercent: pnlPercent,
marketValue: state.price * pos.size
};
}
const aggregatedRules: Record<string, { passed: boolean; reason: string; metadata?: any }> = {};
for (const [ruleName, bucket] of aggregatedRuleBuckets.entries()) {
const totalObserved = bucket.passed + bucket.failed;
const avgConfidence = bucket.confidences.length > 0
? bucket.confidences.reduce((sum, val) => sum + val, 0) / bucket.confidences.length
: undefined;
aggregatedRules[ruleName] = {
passed: totalObserved > 0 ? bucket.failed === 0 : false,
reason: `${bucket.passed} pass / ${bucket.failed} fail / ${bucket.pending} pending / ${bucket.skipped} skipped`,
metadata: avgConfidence !== undefined
? { confidence: Number(avgConfidence.toFixed(2)) }
: undefined
};
}
if (!Object.keys(aggregatedRules).length && baselineResult?.metadata?.ruleStatuses) {
for (const [ruleName, ruleState] of Object.entries(baselineResult.metadata.ruleStatuses)) {
const normalized = ruleState as { passed?: boolean; reason?: string; metadata?: any };
aggregatedRules[ruleName] = {
passed: !!normalized.passed,
reason: normalized.reason || 'No reason provided',
metadata: normalized.metadata
};
}
}
apiServer.updateSymbol(symbol, {
price: state.price,
change24h: context.change24h,
changeToday: context.changeToday,
session: context.session,
volatility: context.volatility,
signal: currentDisplaySignal,
tradingMode: config.ENABLE_TRADING ? (config.PAPER_TRADING ? 'Paper' : 'Live') : 'Alerts',
activePosition: displayPos,
profileSignals,
indicators: {
ema20_1h: context.ema20_1h,
ema20_15m: context.ema20_15m,
ema50_4h: context.ema50_4h,
ema200_4h: context.ema200_4h,
rsi_1h: context.rsi_1h,
rsi_15m: context.rsi_15m
},
rules: aggregatedRules
});
} else if (baselineResult === null) {
logger.warn(`[Dashboard] Insufficient data for ${symbol}.`);
}
// Continue to next symbol after handling PRO logic
continue;
}
// --- LEGACY LOGIC BELOW (Only runs if Pro Msg is disabled) ---
const candles = await dataExchange.fetchOHLCV(symbol, config.TIMEFRAME);
if (!candles || candles.length === 0) {
logger.warn(`No data received for ${symbol}.`);
continue;
}
const legacyResult = tracker.calculateDirection(candles);
state.price = candles[candles.length - 1].close;
// Track changes
if (legacyResult.signal !== state.signal) { // Simple change check
if (config.ENABLE_TREND_ALERTS) {
const message = `🚨 *Trend Signal Alert* 🚨\nSignal: ${legacyResult.signal}\nAsset: ${symbol}\nPrice: ${state.price}\nEMA-20: ${legacyResult.ema.toFixed(2)}\nRSI-14: ${legacyResult.rsi.toFixed(2)}\nTimeframe: ${config.TIMEFRAME}`;
await notifier.sendAlert(message);
}
// Update state
state.signal = legacyResult.signal;
state.ema = legacyResult.ema;
state.rsi = legacyResult.rsi;
// Low Stress Logic (Legacy - Only Global State Supported for now in Legacy Mode)
if (config.LOW_STRESS_MODE) {
if (legacyResult.signal === SignalType.BUY) {
state.entryPrice = state.price;
logger.info(`[Low-Stress] ${symbol} Entry Price set at ${state.entryPrice}`);
} else if (legacyResult.signal === SignalType.SELL) {
state.entryPrice = null;
logger.info(`[Low-Stress] ${symbol} Entry Price reset.`);
}
}
} else {
// Update heartbeat values even if no signal change
state.ema = legacyResult.ema;
state.rsi = legacyResult.rsi;
}
logger.info(`[${symbol}] Signal: ${state.signal} | Price: ${state.price} | EMA: ${legacyResult.ema.toFixed(2)} | RSI: ${legacyResult.rsi.toFixed(2)}`);
// Monitor Low-Stress Thresholds
if (config.LOW_STRESS_MODE && state.entryPrice) {
const percentChange = ((state.price - state.entryPrice) / state.entryPrice) * 100;
if (percentChange >= 1.5) {
const tpMessage = `💰 *Take Profit Target Reached (+1.5%)* 💰\nAsset: ${symbol}\nEntry: ${state.entryPrice}\nCurrent: ${state.price}\nProfit: ${percentChange.toFixed(2)}%\nPlan: $10/Day Low-Stress ✅`;
await notifier.sendAlert(tpMessage);
state.entryPrice = null;
} else if (percentChange <= -0.7) {
const slMessage = `⚠️ *Stop Loss Buffer Hit (-0.7%)* ⚠️\nAsset: ${symbol}\nEntry: ${state.entryPrice}\nCurrent: ${state.price}\nLoss: ${percentChange.toFixed(2)}%\nPlan: Risk Managed 🛡️`;
await notifier.sendAlert(slMessage);
state.entryPrice = null;
}
}
} catch (error) {
logger.error(`Error in loop for ${symbol}:`, error);
}
// Configurable delay between symbols to separate logs/requests
await new Promise(r => setTimeout(r, config.SYMBOL_DELAY_MS));
}
// --- Broadcast ALL positions across ALL profiles to dashboard ---
const allPositions: any[] = [];
for (const userCtx of userContexts) {
const posMap = userCtx.executor.getAllPositions();
for (const [, pos] of posMap.entries()) {
const symbolKey = pos.symbol;
const livePrice = assetState.get(symbolKey)?.price || pos.entryPrice;
const pnl = (livePrice - pos.entryPrice) * pos.size * (pos.side === SignalDirection.BUY ? 1 : -1);
const pnlPercent = pos.entryPrice > 0
? ((livePrice - pos.entryPrice) / pos.entryPrice) * 100 * (pos.side === SignalDirection.BUY ? 1 : -1)
: 0;
allPositions.push({
id: pos.tradeId || `${userCtx.profileId}-${symbolKey}`,
symbol: symbolKey,
side: pos.side as 'BUY' | 'SELL',
size: pos.size,
entryPrice: pos.entryPrice,
currentPrice: livePrice,
stopLoss: pos.stopLoss || 0,
takeProfit: pos.takeProfit || 0,
unrealizedPnl: pnl,
unrealizedPnlPercent: pnlPercent,
marketValue: livePrice * pos.size,
userId: userCtx.executor.getUserId(),
profileId: userCtx.profileId,
profileName: userCtx.profileName,
tradeId: pos.tradeId
});
}
}
apiServer.updatePositions(allPositions);
} catch (error) {
loopSucceeded = false;
throw error;
} finally {
const completedAt = Date.now();
const durationMs = completedAt - loopStartedAt;
apiServer.updateRuntimeHealth({
tradingLoopRunning: false,
tradingLoopLastCompletedAt: completedAt,
tradingLoopLastDurationMs: durationMs
});
observabilityService.recordTradingLoop(durationMs);
isTradingLoopRunning = false;
healthTracker.recordTradingLoop(loopSucceeded);
apiServer.publishHealthSnapshot({ broadcast: true });
}
};
// Initial run
await tradingLoop();
setInterval(tradingLoop, config.POLLING_INTERVAL);
// --- 1b. Periodic Exchange Reconciliation ---
const reconcileAllProfiles = async () => {
if (isReconciliationRunning) return;
isReconciliationRunning = true;
const startedAt = Date.now();
apiServer.updateRuntimeHealth({
reconciliationRunning: true,
reconciliationLastRunAt: startedAt
});
let success = true;
let mismatchCount = 0;
let missingFromExchange = 0;
let missingInDb = 0;
let noGoTrades = 0;
const noGoReasonCounts = new Map<string, number>();
const noGoSamples: Array<{ profileId: string; symbol: string; tradeId: string; reason: string }> = [];
let parityMismatchTrades = 0;
let parityQuarantinedTrades = 0;
let parityAutoClosedTrades = 0;
let parityMaxMismatchNotionalUsd = 0;
let parityTotalMismatchNotionalUsd = 0;
let integrityWatchdogTriggered = false;
let failedProfiles = 0;
try {
const results = await Promise.allSettled(
userContexts.map(async (ctx) => {
await ctx.executor.syncPositions(ctx.monitoredSymbols && ctx.monitoredSymbols.length > 0 ? ctx.monitoredSymbols : config.SYMBOLS);
})
);
const failedSyncs = results.filter((result) => result.status === 'rejected').length;
if (failedSyncs > 0) {
success = false;
logger.warn(`[Reconcile] ${failedSyncs}/${results.length} profile sync tasks failed during exchange reconciliation.`);
}
const staleBacklog = await runtimeOrderRepository.getStaleOrders(5);
for (const ctx of userContexts) {
if (!ctx.profileId || ctx.profileId === 'global' || ctx.profileId.startsWith('default-')) continue;
const reconResult = await reconciliationService.reconcileProfile({
profileId: ctx.profileId,
userId: ctx.userId,
executor: ctx.executor,
monitoredSymbols: ctx.monitoredSymbols
});
if (reconResult.error) {
failedProfiles += 1;
success = false;
logger.error(`[Reconcile] Profile ${ctx.profileId} failed reconciliation: ${reconResult.error}`);
continue;
}
if (!reconResult.processed) continue;
mismatchCount += reconResult.mismatchCount;
missingFromExchange += reconResult.missingFromExchange;
missingInDb += reconResult.missingInDb;
noGoTrades += reconResult.noGoTrades;
for (const [reason, count] of Object.entries(reconResult.noGoReasonCounts || {})) {
const key = String(reason || 'unknown').trim() || 'unknown';
noGoReasonCounts.set(key, (noGoReasonCounts.get(key) || 0) + Math.max(0, Number(count) || 0));
}
if (noGoSamples.length < 10) {
for (const sample of (reconResult.noGoSamples || [])) {
if (noGoSamples.length >= 10) break;
noGoSamples.push({
profileId: String(sample?.profileId || '').trim(),
symbol: String(sample?.symbol || '').trim(),
tradeId: String(sample?.tradeId || '').trim(),
reason: String(sample?.reason || '').trim() || 'unknown'
});
}
}
parityMismatchTrades += reconResult.parityMismatchTrades;
parityQuarantinedTrades += reconResult.parityQuarantinedTrades;
parityAutoClosedTrades += reconResult.parityAutoClosedTrades;
parityMaxMismatchNotionalUsd = Math.max(parityMaxMismatchNotionalUsd, reconResult.parityMaxMismatchNotionalUsd);
parityTotalMismatchNotionalUsd += reconResult.parityTotalMismatchNotionalUsd;
integrityWatchdogTriggered = integrityWatchdogTriggered || reconResult.integrityWatchdogTriggered;
}
if (mismatchCount > 0 || missingFromExchange > 0 || missingInDb > 0) {
logger.warn(`[Reconcile] Parity mismatches: ${mismatchCount}, missing-from-exchange: ${missingFromExchange}, missing-in-db: ${missingInDb}`);
}
if (noGoTrades > 0) {
const noGoReasonSummary = Object.fromEntries(
Array.from(noGoReasonCounts.entries())
.sort((a, b) => b[1] - a[1])
);
logger.warn(`[Reconcile] Integrity NO_GO backlog: ${noGoTrades} trade lifecycle rows require manual evidence review. reasons=${JSON.stringify(noGoReasonSummary)}`);
}
if (parityMismatchTrades > 0 || parityAutoClosedTrades > 0 || parityQuarantinedTrades > 0) {
logger.warn(`[Reconcile] Parity heartbeat: mismatched_trades=${parityMismatchTrades}, auto_closed=${parityAutoClosedTrades}, quarantined=${parityQuarantinedTrades}, max_notional=$${parityMaxMismatchNotionalUsd.toFixed(2)}, total_notional=$${parityTotalMismatchNotionalUsd.toFixed(2)}`);
}
apiServer.updateRuntimeHealth({
staleOrderBacklog: staleBacklog.length,
parityMismatchCount: mismatchCount,
exchangeConnectivity: failedSyncs > 0 ? 'degraded' : 'healthy',
reconciliationMismatchCount: mismatchCount,
reconciliationMissingFromExchange: missingFromExchange,
reconciliationMissingInDb: missingInDb,
reconciliationNoGoTrades: noGoTrades,
reconciliationParityMismatchTrades: parityMismatchTrades,
reconciliationParityQuarantinedTrades: parityQuarantinedTrades,
reconciliationParityAutoClosedTrades: parityAutoClosedTrades,
reconciliationParityMaxMismatchNotionalUsd: parityMaxMismatchNotionalUsd,
reconciliationParityTotalMismatchNotionalUsd: Number(parityTotalMismatchNotionalUsd.toFixed(2)),
reconciliationIntegrityWatchdogTriggered: integrityWatchdogTriggered,
reconciliationFailedProfiles: failedProfiles
});
} catch (err: any) {
success = false;
logger.error(`[Reconcile] Failed reconciliation cycle: ${err.message}`);
apiServer.updateRuntimeHealth({
exchangeConnectivity: 'degraded'
});
} finally {
const completedAt = Date.now();
const durationMs = completedAt - startedAt;
apiServer.updateRuntimeHealth({
reconciliationRunning: false,
reconciliationLastDurationMs: durationMs
});
observabilityService.recordReconciliationLoop(durationMs, mismatchCount, missingFromExchange, missingInDb);
healthTracker.recordReconciliationLoop(success, {
mismatchCount,
missingFromExchange,
missingInDb,
noGoTrades,
noGoReasonCounts: Object.fromEntries(noGoReasonCounts.entries()),
noGoSamples,
integrityWatchdogTriggered
});
reconciliationWatchdogAutoResumeService.evaluateCycle({
success,
mismatchCount,
missingFromExchange,
missingInDb,
noGoTrades,
parityMismatchTrades,
parityQuarantinedTrades,
failedProfiles,
integrityWatchdogTriggered
});
apiServer.publishHealthSnapshot({ broadcast: true });
isReconciliationRunning = false;
}
};
await reconcileAllProfiles();
setInterval(reconcileAllProfiles, config.MONITOR_INTERVAL_MS);
// --- 2. Periodic Pulse Alert (Every 4 Hours) ---
const FOUR_HOURS = 4 * 60 * 60 * 1000;
setInterval(async () => {
if (config.ENABLE_PULSE_ALERTS) {
logger.info('[System] Sending periodic 4-hour pulse alert...');
for (const [symbol, state] of assetState.entries()) {
// Formatting:
// 🕒 *Status: BTC/USD*
// Signal: BUY | Price: 90000 | ...
const pulseMessage = `🕒 *4-Hour Status: ${symbol}* 🕒\nCurrent Signal: ${state.signal}\nPrice: ${state.price}\nEMA-20: ${state.ema.toFixed(2)}\nRSI-14: ${state.rsi.toFixed(2)}\nStatus: Monitoring Active ✅`;
await notifier.sendAlert(pulseMessage);
// Tiny delay to avoid spamming webhook if many assets
await new Promise(r => setTimeout(r, 1000));
}
}
}, FOUR_HOURS);
logger.info(`[System] Periodic alerts scheduled for every 4 hours.`);
}
main().catch(err => {
logger.error('Critical Error:', err);
tradingTelemetry.trackEvent('error', 'trading_loop', 'fatal_error', {
message: err?.message ?? String(err),
});
void tradingTelemetry.shutdown();
process.exit(1);
});