From 349cdae4a60432a53f82ef8026c437819e07a156 Mon Sep 17 00:00:00 2001 From: root Date: Wed, 6 May 2026 07:33:52 +0000 Subject: [PATCH] fix(simple): support crypto fallback and scale-in entries --- backend/src/connectors/alpaca.ts | 26 ++++++- backend/src/index.ts | 9 ++- backend/src/services/ManualTrader.ts | 8 +- backend/src/services/TradeExecutor.ts | 33 ++++---- backend/src/services/apiServer.ts | 23 ++++++ backend/src/utils/cryptoMarketData.ts | 107 ++++++++++++++++++++++++++ 6 files changed, 185 insertions(+), 21 deletions(-) create mode 100644 backend/src/utils/cryptoMarketData.ts diff --git a/backend/src/connectors/alpaca.ts b/backend/src/connectors/alpaca.ts index 7f11cf0..a97091f 100644 --- a/backend/src/connectors/alpaca.ts +++ b/backend/src/connectors/alpaca.ts @@ -2,15 +2,20 @@ import Alpaca from '@alpacahq/alpaca-trade-api'; import { config } from '../config/index.js'; import logger from '../utils/logger.js'; import { SymbolMapper } from '../utils/symbolMapper.js'; +import { fetchCoinbaseCryptoCandles } from '../utils/cryptoMarketData.js'; import { AccountSnapshot, IExchangeConnector, Candle, ExchangeCapabilities, ExchangeOrderCorrelation } from './types.js'; export class AlpacaConnector implements IExchangeConnector { private client: any; + private apiKey: string; + private apiSecret: string; constructor(apiKey?: string, apiSecret?: string, options?: { paper?: boolean }) { + this.apiKey = apiKey || config.ALPACA_API_KEY; + this.apiSecret = apiSecret || config.ALPACA_API_SECRET; this.client = new (Alpaca as any)({ - keyId: apiKey || config.ALPACA_API_KEY, - secretKey: apiSecret || config.ALPACA_API_SECRET, + keyId: this.apiKey, + secretKey: this.apiSecret, paper: options?.paper ?? config.PAPER_TRADING, }); } @@ -47,7 +52,13 @@ export class AlpacaConnector implements IExchangeConnector { return []; } try { + const dataSymbol = String( + symbol.includes('/') + ? symbol.toUpperCase() + : SymbolMapper.toDataSymbol(symbol, 'alpaca') + ).trim(); const formattedSymbol = symbol.replace('/', ''); + const isCryptoSymbol = symbol.includes('/') || dataSymbol.includes('/'); let mappedTf = this.mapTimeframe(timeframe); let fetchLimit = limit; let needsAggregation = false; @@ -66,10 +77,17 @@ export class AlpacaConnector implements IExchangeConnector { limit: fetchLimit, }; - if (config.ASSET_CLASS !== 'crypto') { - options.feed = 'iex'; + if (isCryptoSymbol) { + return await fetchCoinbaseCryptoCandles({ + symbol: dataSymbol, + timeframe, + start: options.start, + limit, + }); } + options.feed = 'iex'; + const barsGenerator = this.client.getBarsV2(formattedSymbol, options); const candles: Candle[] = []; for await (const bar of barsGenerator) { diff --git a/backend/src/index.ts b/backend/src/index.ts index c143686..bfa0034 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -281,7 +281,9 @@ async function main() { if (!ctx) continue; if (shouldArmSimpleBuy(entry)) { - const reboundExistingPosition = await bindSimpleBoughtPosition(entry, ctx); + const reboundExistingPosition = String(entry.linked_trade_id || '').trim() + ? await bindSimpleBoughtPosition(entry, ctx) + : false; if (reboundExistingPosition) { logger.info(`[SimpleWorker] Rebound armed buy setup to existing open holding for ${symbol}`); continue; @@ -302,7 +304,10 @@ async function main() { threshold === 0 ? 'market' : 'limit', threshold === 0 ? undefined : triggerPrice, currentPrice, - entry.user_id + entry.user_id, + undefined, + undefined, + { allowExistingPosition: true } ); if (!result.success) { logger.warn(`[SimpleWorker] Buy trigger failed for ${symbol}: ${result.error || 'unknown error'}`); diff --git a/backend/src/services/ManualTrader.ts b/backend/src/services/ManualTrader.ts index 48d2d14..1fbc6a8 100644 --- a/backend/src/services/ManualTrader.ts +++ b/backend/src/services/ManualTrader.ts @@ -64,7 +64,10 @@ export class ManualTrader { priceHint?: number, userId?: string, sl?: number, - tp?: number + tp?: number, + options?: { + allowExistingPosition?: boolean; + } ): Promise<{ success: boolean; orderId?: string; tradeId?: string; error?: string; adjustedQty?: number; requestedQty?: number; remainingCapitalUsd?: number }> { const signalSide = (side.toLowerCase() === 'buy') ? SignalDirection.BUY : SignalDirection.SELL; const requestedQty = Number(qty); @@ -123,7 +126,8 @@ export class ManualTrader { type === 'market' ? estimatedPrice : price, sl, tp, - userId + userId, + options ); return { diff --git a/backend/src/services/TradeExecutor.ts b/backend/src/services/TradeExecutor.ts index 51cc2b0..46b3c98 100644 --- a/backend/src/services/TradeExecutor.ts +++ b/backend/src/services/TradeExecutor.ts @@ -932,15 +932,18 @@ export class TradeExecutor { * Places a direct order to open a position. * Does NOT check risk or strategy rules. Assumes caller has validated. */ - public async openPosition( - symbol: string, - side: SignalDirection, - qty: number, - type: 'market' | 'limit' = 'market', - price?: number, - sl?: number, - tp?: number, - userIdOverride?: string + public async openPosition( + symbol: string, + side: SignalDirection, + qty: number, + type: 'market' | 'limit' = 'market', + price?: number, + sl?: number, + tp?: number, + userIdOverride?: string, + options?: { + allowExistingPosition?: boolean; + } ): Promise<{ success: boolean, orderId?: string, tradeId?: string, error?: string }> { if (healthTracker.isPaused()) { logger.info(`[TradeExecutor] 🛑 Entry BLOCKED for ${symbol}: Bot is PAUSED by admin.`); @@ -997,11 +1000,15 @@ export class TradeExecutor { if (lifecycleActive) { const corroborated = await this.hasOpenLifecycleEvidence(ledgerProfileId, normalizedSymbol); if (corroborated) { - await releaseLockIfHeld(); - logger.warn(`[Executor] Entry lifecycle already open for ${symbol} (profile=${ledgerProfileId})`); - return { success: false, error: 'Entry lifecycle already exists' }; + if (!options?.allowExistingPosition) { + await releaseLockIfHeld(); + logger.warn(`[Executor] Entry lifecycle already open for ${symbol} (profile=${ledgerProfileId})`); + return { success: false, error: 'Entry lifecycle already exists' }; + } + logger.info(`[Executor] Allowing scale-in entry for ${symbol} despite existing lifecycle (profile=${ledgerProfileId})`); + } else { + logger.warn(`[Executor] Ignoring stale entry lifecycle marker for ${symbol} (profile=${ledgerProfileId})`); } - logger.warn(`[Executor] Ignoring stale entry lifecycle marker for ${symbol} (profile=${ledgerProfileId})`); } if (await this.hasActiveTradeId(tradeId)) { await releaseLockIfHeld(); diff --git a/backend/src/services/apiServer.ts b/backend/src/services/apiServer.ts index 0f05d59..091e98e 100644 --- a/backend/src/services/apiServer.ts +++ b/backend/src/services/apiServer.ts @@ -51,6 +51,7 @@ import { } from '../backtest/strategySafety.js'; import type { BacktestRequest, BacktestTimeframe } from '../backtest/types.js'; import { fetchFmpJson, FmpFetchError } from './fmpCache.js'; +import { fetchCoinbaseCryptoCandles } from '../utils/cryptoMarketData.js'; import { canonicalLifecycleService, type CanonicalLifecycleProfileMeta @@ -2788,6 +2789,28 @@ RULES: }, }); if (!r.ok) { + if (isCrypto) { + try { + const bars = await fetchCoinbaseCryptoCandles({ + symbol, + timeframe, + start: start.toISOString(), + end: now.toISOString(), + limit, + }); + return res.json({ symbol, period, bars: bars.map((b) => ({ + ts: b.timestamp, + open: b.open, + high: b.high, + low: b.low, + close: b.close, + volume: b.volume, + })) }); + } catch (fallbackError: any) { + const fallbackMessage = fallbackError?.message || String(fallbackError || ''); + return res.status(503).json({ error: `Crypto bars fetch failed via Alpaca and Coinbase fallback: ${fallbackMessage}` }); + } + } const txt = await r.text().catch(() => ''); return res.status(r.status).json({ error: `Alpaca bars fetch failed: ${txt}` }); } diff --git a/backend/src/utils/cryptoMarketData.ts b/backend/src/utils/cryptoMarketData.ts new file mode 100644 index 0000000..395e0d8 --- /dev/null +++ b/backend/src/utils/cryptoMarketData.ts @@ -0,0 +1,107 @@ +import type { Candle } from '../connectors/types.js'; + +function normalizeCryptoProductId(symbol: string): string { + const upper = String(symbol || '').trim().toUpperCase(); + if (!upper) return upper; + if (upper.includes('/')) return upper.replace('/', '-'); + if (upper.endsWith('USDT')) return `${upper.slice(0, -4)}-USDT`; + if (upper.endsWith('USD')) return `${upper.slice(0, -3)}-USD`; + return upper; +} + +function toCoinbaseGranularity(timeframe: string): number { + const normalized = String(timeframe || '').trim().toLowerCase(); + switch (normalized) { + case '1m': + case '1min': + return 60; + case '5m': + case '5min': + return 300; + case '15m': + case '15min': + return 900; + case '1h': + case '1hour': + return 3600; + case '4h': + case '4hour': + return 3600; + case '1d': + case '1day': + return 86400; + case '1w': + case '1week': + return 86400; + case '1month': + return 86400; + default: + return 3600; + } +} + +function aggregateBars(candles: Candle[], factor: number): Candle[] { + const aggregated: Candle[] = []; + for (let i = 0; i < candles.length; i += factor) { + const chunk = candles.slice(i, i + factor); + if (!chunk.length) continue; + aggregated.push({ + timestamp: chunk[0].timestamp, + open: chunk[0].open, + high: Math.max(...chunk.map((c) => c.high)), + low: Math.min(...chunk.map((c) => c.low)), + close: chunk[chunk.length - 1].close, + volume: chunk.reduce((sum, c) => sum + c.volume, 0), + }); + } + return aggregated; +} + +export async function fetchCoinbaseCryptoCandles(params: { + symbol: string; + timeframe: string; + start?: string; + end?: string; + limit?: number; +}): Promise { + const productId = normalizeCryptoProductId(params.symbol); + const granularity = toCoinbaseGranularity(params.timeframe); + const query = new URLSearchParams({ + granularity: String(granularity), + }); + if (params.start) query.set('start', params.start); + if (params.end) query.set('end', params.end); + + const response = await fetch(`https://api.exchange.coinbase.com/products/${encodeURIComponent(productId)}/candles?${query.toString()}`); + if (!response.ok) { + const txt = await response.text().catch(() => ''); + throw new Error(`Coinbase candles fetch failed: ${response.status} ${txt || response.statusText}`); + } + + const payload = await response.json() as any[]; + const candles: Candle[] = Array.isArray(payload) + ? payload.map((row: any) => ({ + timestamp: Number(row[0]) * 1000, + low: Number(row[1]), + high: Number(row[2]), + open: Number(row[3]), + close: Number(row[4]), + volume: Number(row[5] || 0), + })) + : []; + + const sorted = candles + .filter((c) => Number.isFinite(c.timestamp) && Number.isFinite(c.open) && Number.isFinite(c.high) && Number.isFinite(c.low) && Number.isFinite(c.close)) + .sort((a, b) => a.timestamp - b.timestamp); + + const normalizedTimeframe = String(params.timeframe || '').trim().toLowerCase(); + const aggregated = normalizedTimeframe === '4h' || normalizedTimeframe === '4hour' + ? aggregateBars(sorted, 4) + : sorted; + + if (params.limit && params.limit > 0 && aggregated.length > params.limit) { + return aggregated.slice(-params.limit); + } + + return aggregated; +}