fix(simple): support crypto fallback and scale-in entries

This commit is contained in:
root 2026-05-06 07:33:52 +00:00
parent e92236b764
commit 349cdae4a6
6 changed files with 185 additions and 21 deletions

View File

@ -2,15 +2,20 @@ import Alpaca from '@alpacahq/alpaca-trade-api';
import { config } from '../config/index.js';
import logger from '../utils/logger.js';
import { SymbolMapper } from '../utils/symbolMapper.js';
import { fetchCoinbaseCryptoCandles } from '../utils/cryptoMarketData.js';
import { AccountSnapshot, IExchangeConnector, Candle, ExchangeCapabilities, ExchangeOrderCorrelation } from './types.js';
export class AlpacaConnector implements IExchangeConnector {
private client: any;
private apiKey: string;
private apiSecret: string;
constructor(apiKey?: string, apiSecret?: string, options?: { paper?: boolean }) {
this.apiKey = apiKey || config.ALPACA_API_KEY;
this.apiSecret = apiSecret || config.ALPACA_API_SECRET;
this.client = new (Alpaca as any)({
keyId: apiKey || config.ALPACA_API_KEY,
secretKey: apiSecret || config.ALPACA_API_SECRET,
keyId: this.apiKey,
secretKey: this.apiSecret,
paper: options?.paper ?? config.PAPER_TRADING,
});
}
@ -47,7 +52,13 @@ export class AlpacaConnector implements IExchangeConnector {
return [];
}
try {
const dataSymbol = String(
symbol.includes('/')
? symbol.toUpperCase()
: SymbolMapper.toDataSymbol(symbol, 'alpaca')
).trim();
const formattedSymbol = symbol.replace('/', '');
const isCryptoSymbol = symbol.includes('/') || dataSymbol.includes('/');
let mappedTf = this.mapTimeframe(timeframe);
let fetchLimit = limit;
let needsAggregation = false;
@ -66,10 +77,17 @@ export class AlpacaConnector implements IExchangeConnector {
limit: fetchLimit,
};
if (config.ASSET_CLASS !== 'crypto') {
options.feed = 'iex';
if (isCryptoSymbol) {
return await fetchCoinbaseCryptoCandles({
symbol: dataSymbol,
timeframe,
start: options.start,
limit,
});
}
options.feed = 'iex';
const barsGenerator = this.client.getBarsV2(formattedSymbol, options);
const candles: Candle[] = [];
for await (const bar of barsGenerator) {

View File

@ -281,7 +281,9 @@ async function main() {
if (!ctx) continue;
if (shouldArmSimpleBuy(entry)) {
const reboundExistingPosition = await bindSimpleBoughtPosition(entry, ctx);
const reboundExistingPosition = String(entry.linked_trade_id || '').trim()
? await bindSimpleBoughtPosition(entry, ctx)
: false;
if (reboundExistingPosition) {
logger.info(`[SimpleWorker] Rebound armed buy setup to existing open holding for ${symbol}`);
continue;
@ -302,7 +304,10 @@ async function main() {
threshold === 0 ? 'market' : 'limit',
threshold === 0 ? undefined : triggerPrice,
currentPrice,
entry.user_id
entry.user_id,
undefined,
undefined,
{ allowExistingPosition: true }
);
if (!result.success) {
logger.warn(`[SimpleWorker] Buy trigger failed for ${symbol}: ${result.error || 'unknown error'}`);

View File

@ -64,7 +64,10 @@ export class ManualTrader {
priceHint?: number,
userId?: string,
sl?: number,
tp?: number
tp?: number,
options?: {
allowExistingPosition?: boolean;
}
): Promise<{ success: boolean; orderId?: string; tradeId?: string; error?: string; adjustedQty?: number; requestedQty?: number; remainingCapitalUsd?: number }> {
const signalSide = (side.toLowerCase() === 'buy') ? SignalDirection.BUY : SignalDirection.SELL;
const requestedQty = Number(qty);
@ -123,7 +126,8 @@ export class ManualTrader {
type === 'market' ? estimatedPrice : price,
sl,
tp,
userId
userId,
options
);
return {

View File

@ -932,15 +932,18 @@ export class TradeExecutor {
* Places a direct order to open a position.
* Does NOT check risk or strategy rules. Assumes caller has validated.
*/
public async openPosition(
symbol: string,
side: SignalDirection,
qty: number,
type: 'market' | 'limit' = 'market',
price?: number,
sl?: number,
tp?: number,
userIdOverride?: string
public async openPosition(
symbol: string,
side: SignalDirection,
qty: number,
type: 'market' | 'limit' = 'market',
price?: number,
sl?: number,
tp?: number,
userIdOverride?: string,
options?: {
allowExistingPosition?: boolean;
}
): Promise<{ success: boolean, orderId?: string, tradeId?: string, error?: string }> {
if (healthTracker.isPaused()) {
logger.info(`[TradeExecutor] 🛑 Entry BLOCKED for ${symbol}: Bot is PAUSED by admin.`);
@ -997,11 +1000,15 @@ export class TradeExecutor {
if (lifecycleActive) {
const corroborated = await this.hasOpenLifecycleEvidence(ledgerProfileId, normalizedSymbol);
if (corroborated) {
await releaseLockIfHeld();
logger.warn(`[Executor] Entry lifecycle already open for ${symbol} (profile=${ledgerProfileId})`);
return { success: false, error: 'Entry lifecycle already exists' };
if (!options?.allowExistingPosition) {
await releaseLockIfHeld();
logger.warn(`[Executor] Entry lifecycle already open for ${symbol} (profile=${ledgerProfileId})`);
return { success: false, error: 'Entry lifecycle already exists' };
}
logger.info(`[Executor] Allowing scale-in entry for ${symbol} despite existing lifecycle (profile=${ledgerProfileId})`);
} else {
logger.warn(`[Executor] Ignoring stale entry lifecycle marker for ${symbol} (profile=${ledgerProfileId})`);
}
logger.warn(`[Executor] Ignoring stale entry lifecycle marker for ${symbol} (profile=${ledgerProfileId})`);
}
if (await this.hasActiveTradeId(tradeId)) {
await releaseLockIfHeld();

View File

@ -51,6 +51,7 @@ import {
} from '../backtest/strategySafety.js';
import type { BacktestRequest, BacktestTimeframe } from '../backtest/types.js';
import { fetchFmpJson, FmpFetchError } from './fmpCache.js';
import { fetchCoinbaseCryptoCandles } from '../utils/cryptoMarketData.js';
import {
canonicalLifecycleService,
type CanonicalLifecycleProfileMeta
@ -2788,6 +2789,28 @@ RULES:
},
});
if (!r.ok) {
if (isCrypto) {
try {
const bars = await fetchCoinbaseCryptoCandles({
symbol,
timeframe,
start: start.toISOString(),
end: now.toISOString(),
limit,
});
return res.json({ symbol, period, bars: bars.map((b) => ({
ts: b.timestamp,
open: b.open,
high: b.high,
low: b.low,
close: b.close,
volume: b.volume,
})) });
} catch (fallbackError: any) {
const fallbackMessage = fallbackError?.message || String(fallbackError || '');
return res.status(503).json({ error: `Crypto bars fetch failed via Alpaca and Coinbase fallback: ${fallbackMessage}` });
}
}
const txt = await r.text().catch(() => '');
return res.status(r.status).json({ error: `Alpaca bars fetch failed: ${txt}` });
}

View File

@ -0,0 +1,107 @@
import type { Candle } from '../connectors/types.js';
function normalizeCryptoProductId(symbol: string): string {
const upper = String(symbol || '').trim().toUpperCase();
if (!upper) return upper;
if (upper.includes('/')) return upper.replace('/', '-');
if (upper.endsWith('USDT')) return `${upper.slice(0, -4)}-USDT`;
if (upper.endsWith('USD')) return `${upper.slice(0, -3)}-USD`;
return upper;
}
function toCoinbaseGranularity(timeframe: string): number {
const normalized = String(timeframe || '').trim().toLowerCase();
switch (normalized) {
case '1m':
case '1min':
return 60;
case '5m':
case '5min':
return 300;
case '15m':
case '15min':
return 900;
case '1h':
case '1hour':
return 3600;
case '4h':
case '4hour':
return 3600;
case '1d':
case '1day':
return 86400;
case '1w':
case '1week':
return 86400;
case '1month':
return 86400;
default:
return 3600;
}
}
function aggregateBars(candles: Candle[], factor: number): Candle[] {
const aggregated: Candle[] = [];
for (let i = 0; i < candles.length; i += factor) {
const chunk = candles.slice(i, i + factor);
if (!chunk.length) continue;
aggregated.push({
timestamp: chunk[0].timestamp,
open: chunk[0].open,
high: Math.max(...chunk.map((c) => c.high)),
low: Math.min(...chunk.map((c) => c.low)),
close: chunk[chunk.length - 1].close,
volume: chunk.reduce((sum, c) => sum + c.volume, 0),
});
}
return aggregated;
}
export async function fetchCoinbaseCryptoCandles(params: {
symbol: string;
timeframe: string;
start?: string;
end?: string;
limit?: number;
}): Promise<Candle[]> {
const productId = normalizeCryptoProductId(params.symbol);
const granularity = toCoinbaseGranularity(params.timeframe);
const query = new URLSearchParams({
granularity: String(granularity),
});
if (params.start) query.set('start', params.start);
if (params.end) query.set('end', params.end);
const response = await fetch(`https://api.exchange.coinbase.com/products/${encodeURIComponent(productId)}/candles?${query.toString()}`);
if (!response.ok) {
const txt = await response.text().catch(() => '');
throw new Error(`Coinbase candles fetch failed: ${response.status} ${txt || response.statusText}`);
}
const payload = await response.json() as any[];
const candles: Candle[] = Array.isArray(payload)
? payload.map((row: any) => ({
timestamp: Number(row[0]) * 1000,
low: Number(row[1]),
high: Number(row[2]),
open: Number(row[3]),
close: Number(row[4]),
volume: Number(row[5] || 0),
}))
: [];
const sorted = candles
.filter((c) => Number.isFinite(c.timestamp) && Number.isFinite(c.open) && Number.isFinite(c.high) && Number.isFinite(c.low) && Number.isFinite(c.close))
.sort((a, b) => a.timestamp - b.timestamp);
const normalizedTimeframe = String(params.timeframe || '').trim().toLowerCase();
const aggregated = normalizedTimeframe === '4h' || normalizedTimeframe === '4hour'
? aggregateBars(sorted, 4)
: sorted;
if (params.limit && params.limit > 0 && aggregated.length > params.limit) {
return aggregated.slice(-params.limit);
}
return aggregated;
}