// learning_ai_invt_trdg/backend/src/connectors/alpaca.ts
//
// 461 lines
// 18 KiB
// TypeScript

import Alpaca from '@alpacahq/alpaca-trade-api';
import { config } from '../config/index.js';
import logger from '../utils/logger.js';
import { SymbolMapper } from '../utils/symbolMapper.js';
import { AccountSnapshot, IExchangeConnector, Candle, ExchangeCapabilities, ExchangeOrderCorrelation } from './types.js';
/**
 * Alpaca implementation of {@link IExchangeConnector}.
 *
 * Wraps the `@alpacahq/alpaca-trade-api` client for candles, order placement
 * and lookup, positions, the account snapshot and the market clock. Each
 * method first checks that the client exposes the SDK function it needs;
 * when it does not, read methods return an empty/null result and
 * `placeOrder` throws.
 */
export class AlpacaConnector implements IExchangeConnector {
  private static readonly MINUTE_MS = 60000;
  private static readonly HOUR_MS = 3600000;
  private static readonly DAY_MS = 86400000;
  /** Upper bound on generated client order ids (same cap the connector has always applied). */
  private static readonly MAX_CLIENT_ORDER_ID_LENGTH = 48;
  private static readonly CLIENT_ORDER_ID_PREFIX = 'bytelyst';

  // The SDK ships without usable typings here, so the client stays untyped.
  private client: any;

  /**
   * @param apiKey Optional key overriding `config.ALPACA_API_KEY`.
   * @param apiSecret Optional secret overriding `config.ALPACA_API_SECRET`.
   */
  constructor(apiKey?: string, apiSecret?: string) {
    this.client = new (Alpaca as any)({
      keyId: apiKey || config.ALPACA_API_KEY,
      secretKey: apiSecret || config.ALPACA_API_SECRET,
      paper: config.PAPER_TRADING,
    });
  }

  /** Reports which optional connector features are available for the configured asset class. */
  public getCapabilities(): ExchangeCapabilities {
    const nonCrypto = config.ASSET_CLASS !== 'crypto';
    return {
      fetchOpenOrders: true,
      fetchClosedOrders: true,
      shorting: nonCrypto, // Alpaca crypto shorting is limited
      margin: nonCrypto,
      leverage: nonCrypto,
      tradingWindow: true
    };
  }

  /** Best-effort, null-safe rendering of a caught value for log lines and rethrown errors. */
  private describeError(error: unknown): string {
    const message = (error as { message?: unknown } | null | undefined)?.message;
    return String(message || error);
  }

  /** True when a caught value looks like an HTTP 404 (status field or message text). */
  private isNotFoundError(error: unknown): boolean {
    const err = error as
      | { message?: unknown; statusCode?: unknown; response?: { status?: unknown } }
      | null
      | undefined;
    if (err?.response?.status === 404 || err?.statusCode === 404) return true;
    return typeof err?.message === 'string' && err.message.includes('404');
  }

  /** Translates bot timeframe codes (`1m`, `4h`, ...) to Alpaca's names; unknown values pass through. */
  private mapTimeframe(tf: string): string {
    const map: { [key: string]: string } = {
      '1m': '1Min',
      '5m': '5Min',
      '15m': '15Min',
      '1h': '1Hour',
      '4h': '4Hour',
      '1d': '1Day',
      '1Min': '1Min',
      '1Hour': '1Hour',
      '1Day': '1Day'
    };
    return map[tf] || tf;
  }

  /**
   * Duration of one bar for an Alpaca timeframe such as `5Min`, `1Hour` or `1Day`.
   * @returns Milliseconds per bar, or `undefined` when the name is not recognised.
   */
  private timeframeToMs(mappedTf: string): number | undefined {
    const match = /^(\d+)(Min|Hour|Day)$/.exec(mappedTf);
    if (!match) return undefined;
    const count = Number(match[1]);
    if (!(count > 0)) return undefined;
    switch (match[2]) {
      case 'Min':
        return count * AlpacaConnector.MINUTE_MS;
      case 'Hour':
        return count * AlpacaConnector.HOUR_MS;
      default:
        return count * AlpacaConnector.DAY_MS;
    }
  }

  /**
   * How far back to start a bar request so that roughly `barCount` bars of
   * `barMs` each fall inside the window.
   *
   * Crypto is treated as trading continuously, so only a small cushion is
   * added. Other assets are padded for overnight/weekend closures (about
   * 6.5 of 24 hours on 5 of 7 days for intraday bars) and never look back
   * less than five days, so a request made over a long weekend still
   * reaches the previous session.
   */
  private computeLookbackMs(barMs: number, barCount: number): number {
    const span = barMs * barCount;
    if (config.ASSET_CLASS === 'crypto') {
      return Math.ceil(span * 1.25) + barMs;
    }
    const closurePadding = barMs >= AlpacaConnector.DAY_MS ? 1.6 : 6;
    return Math.max(5 * AlpacaConnector.DAY_MS, Math.ceil(span * closurePadding));
  }

  /**
   * Fetches the most recent candles for a symbol, oldest first.
   *
   * The request window is sized from the timeframe and `limit` and only the
   * newest bars are kept. A fixed start combined with an API-side `limit`
   * would return the oldest bars of the window, because the bars endpoint
   * pages oldest-first from `start`. `4h`/`4Hour` is built by fetching
   * `1Hour` bars and aggregating them locally.
   *
   * @param symbol Symbol with or without a slash (the slash is stripped).
   * @param timeframe Bot or Alpaca timeframe code.
   * @param limit Maximum number of candles to return (default 100).
   * @returns Candles sorted by ascending timestamp; `[]` when the SDK lacks
   *          `getBarsV2` or no bars were returned.
   * @throws Rethrows any error raised by the SDK after logging it.
   */
  async fetchOHLCV(symbol: string, timeframe: string, limit: number = 100): Promise<Candle[]> {
    if (!this.client?.getBarsV2) {
      logger.warn(`[Alpaca] getBarsV2 capability missing. Skipping fetchOHLCV for ${symbol}`);
      return [];
    }
    try {
      const formattedSymbol = symbol.replace('/', '');
      let mappedTf = this.mapTimeframe(timeframe);
      let fetchLimit = limit;
      let needsAggregation = false;
      // Alpaca V2 Crypto does NOT support 4Hour. We must fetch 1Hour and aggregate.
      if (timeframe === '4h' || timeframe === '4Hour') {
        mappedTf = '1Hour';
        fetchLimit = limit * 4;
        needsAggregation = true;
        logger.info(`[Alpaca] 🔄 Aggregating 4h bars from 1h feed for ${symbol}`);
      }

      const barMs = this.timeframeToMs(mappedTf);
      const wantedBars = Number.isFinite(fetchLimit) && fetchLimit >= 1 ? Math.floor(fetchLimit) : 0;
      const sizedWindow = barMs !== undefined && wantedBars > 0;

      const options: any = { timeframe: mappedTf };
      if (barMs !== undefined && wantedBars > 0) {
        // No API-side limit: take everything in the sized window and keep the newest tail below.
        options.start = new Date(Date.now() - this.computeLookbackMs(barMs, wantedBars)).toISOString();
      } else {
        // Unknown timeframe or non-positive limit: keep the legacy fixed 7-day request.
        options.start = new Date(Date.now() - 7 * AlpacaConnector.DAY_MS).toISOString();
        options.limit = fetchLimit;
      }
      if (config.ASSET_CLASS !== 'crypto') {
        options.feed = 'iex';
      }

      const barsGenerator = this.client.getBarsV2(formattedSymbol, options);
      const candles: Candle[] = [];
      for await (const bar of barsGenerator) {
        candles.push({
          timestamp: new Date(bar.Timestamp).getTime(),
          open: bar.OpenPrice,
          high: bar.HighPrice,
          low: bar.LowPrice,
          close: bar.ClosePrice,
          volume: bar.Volume,
        });
      }
      if (candles.length === 0) {
        logger.warn(`[Alpaca] No bars found for ${formattedSymbol}.`);
        return [];
      }

      candles.sort((a, b) => a.timestamp - b.timestamp);
      const recent = sizedWindow ? candles.slice(-wantedBars) : candles;
      if (!needsAggregation) {
        return recent;
      }
      const aggregated = this.aggregateBars(recent, 4, barMs ?? AlpacaConnector.HOUR_MS);
      // The oldest bucket can be partial when the tail does not start on a 4h boundary; drop the surplus.
      return sizedWindow ? aggregated.slice(-Math.max(1, Math.floor(limit))) : aggregated;
    } catch (error: unknown) {
      logger.error(`[Alpaca] Error: ${this.describeError(error)}`);
      throw error;
    }
  }

  /**
   * Merges consecutive candles into larger candles of `factor * baseIntervalMs`.
   *
   * Candles are grouped by the epoch-aligned bucket their timestamp falls in,
   * not by array position, so a missing source bar or an arbitrary window
   * start cannot shift bucket boundaries. Each output candle carries its
   * bucket's start time.
   *
   * @param candles Source candles, sorted by ascending timestamp.
   * @param factor Number of source intervals per output candle.
   * @param baseIntervalMs Duration of one source candle (default one hour).
   */
  private aggregateBars(candles: Candle[], factor: number, baseIntervalMs: number = 3600000): Candle[] {
    const bucketMs = factor * baseIntervalMs;
    if (!(bucketMs > 0)) return candles.slice();

    const aggregated: Candle[] = [];
    let bucket:
      | { timestamp: number; open: number; high: number; low: number; close: number; volume: number }
      | undefined;
    for (const candle of candles) {
      const bucketStart = Math.floor(candle.timestamp / bucketMs) * bucketMs;
      if (bucket === undefined || bucket.timestamp !== bucketStart) {
        bucket = {
          timestamp: bucketStart,
          open: candle.open,
          high: candle.high,
          low: candle.low,
          close: candle.close,
          volume: candle.volume
        };
        aggregated.push(bucket);
        continue;
      }
      bucket.high = Math.max(bucket.high, candle.high);
      bucket.low = Math.min(bucket.low, candle.low);
      bucket.close = candle.close;
      bucket.volume += candle.volume;
    }
    return aggregated;
  }

  /**
   * Reduces a value to `[A-Za-z0-9_-]`, substitutes `fallback` when nothing
   * is left, and keeps only the last `maxLength` characters.
   */
  private sanitizeClientOrderToken(value: unknown, fallback: string, maxLength: number): string {
    const cleaned = String(value || '')
      .trim()
      .replace(/[^A-Za-z0-9_-]/g, '');
    const token = cleaned || fallback;
    if (token.length <= maxLength) return token;
    return token.slice(token.length - maxLength);
  }

  /**
   * Builds `bytelyst-<profile>-<trade>-<intent>` from order correlation data.
   *
   * When the id would exceed the 48-character cap, leading characters of the
   * profile token are dropped instead of cutting the end of the string, so
   * the intent suffix always survives and ids for different intents of the
   * same trade stay distinct. With token caps of 12/24/8 the profile token
   * always keeps at least five characters.
   *
   * @returns The id, or `undefined` when no trade id is supplied.
   */
  private buildCorrelationClientOrderId(correlation?: ExchangeOrderCorrelation): string | undefined {
    const tradeId = String(correlation?.tradeId || '').trim();
    if (!tradeId) return undefined;
    const profileToken = this.sanitizeClientOrderToken(correlation?.profileId || 'global', 'global', 12);
    const tradeToken = this.sanitizeClientOrderToken(tradeId, 'trade', 24);
    const intentToken = this.sanitizeClientOrderToken(correlation?.intent || 'unknown', 'unknown', 8).toLowerCase();
    const prefix = AlpacaConnector.CLIENT_ORDER_ID_PREFIX;
    const maxLength = AlpacaConnector.MAX_CLIENT_ORDER_ID_LENGTH;

    const candidate = `${prefix}-${profileToken}-${tradeToken}-${intentToken}`;
    if (candidate.length <= maxLength) return candidate;

    const overflow = candidate.length - maxLength;
    const shortenedProfile = profileToken.slice(Math.min(overflow, profileToken.length));
    // Final slice is only a safety net; with the token caps above it never trims.
    return `${prefix}-${shortenedProfile}-${tradeToken}-${intentToken}`.slice(0, maxLength);
  }

  /**
   * Submits an order.
   *
   * Stop-loss/take-profit are attached as a bracket order only for symbols
   * not treated as crypto (asset class `crypto`, or a symbol ending in
   * `USD`/`USDT`). The client order id is the explicit `clientOrderId` or,
   * failing that, one derived from `correlation`.
   *
   * @returns The order object from the SDK, with `client_order_id` and
   *          `subtag`/`sub_tag` filled in when the response omitted them.
   * @throws Error carrying the API response body (JSON) or the error message.
   */
  async placeOrder(
    symbol: string,
    side: 'buy' | 'sell',
    qty: number,
    type: 'market' | 'limit',
    price?: number,
    stopLoss?: number,
    takeProfit?: number,
    clientOrderId?: string,
    correlation?: ExchangeOrderCorrelation
  ): Promise<any> {
    try {
      const formattedSymbol = symbol.replace('/', '');
      const orderOptions: any = {
        symbol: formattedSymbol,
        qty,
        side,
        type,
        time_in_force: 'gtc'
      };
      if (type === 'limit') {
        // Fail fast instead of sending a limit order that has no limit price.
        if (!price) {
          throw new Error(`Limit order for ${formattedSymbol} requires a price`);
        }
        orderOptions.limit_price = price;
      }
      // Bracket Order (One-Cancels-Other for Profit/Stop)
      // NOTE: Alpaca does not support advanced orders for Crypto.
      // Our bot handles local SL/TP monitoring in TradeMonitor.ts, so we can skip exchange-side brackets for crypto.
      const isCrypto = config.ASSET_CLASS === 'crypto' || formattedSymbol.endsWith('USD') || formattedSymbol.endsWith('USDT');
      if ((stopLoss || takeProfit) && !isCrypto) {
        orderOptions.order_class = 'bracket';
        if (takeProfit) {
          orderOptions.take_profit = { limit_price: takeProfit };
        }
        if (stopLoss) {
          orderOptions.stop_loss = { stop_price: stopLoss };
        }
      }
      logger.info(`[Alpaca] Placing ${side} order for ${qty} ${formattedSymbol} (Raw: ${symbol})...`);
      const resolvedClientOrderId = clientOrderId || this.buildCorrelationClientOrderId(correlation);
      if (resolvedClientOrderId) {
        orderOptions.client_order_id = resolvedClientOrderId;
      }
      if (correlation?.subTag) {
        // Broker API omnibus field. Keep snake_case variant for compatibility.
        orderOptions.subtag = correlation.subTag;
      }
      if (!this.client?.createOrder) {
        throw new Error("Alpaca client createOrder capability missing");
      }
      const order = await this.client.createOrder(orderOptions);
      // Only decorate real response objects; an empty response is returned untouched.
      if (order && typeof order === 'object') {
        if (resolvedClientOrderId && !order.client_order_id) {
          order.client_order_id = resolvedClientOrderId;
        }
        if (correlation?.subTag) {
          order.subtag = order.subtag || correlation.subTag;
          order.sub_tag = order.sub_tag || correlation.subTag;
        }
      }
      return order;
    } catch (error: unknown) {
      const responseData = (error as { response?: { data?: unknown } } | null | undefined)?.response?.data;
      const errorData = responseData ? JSON.stringify(responseData) : this.describeError(error);
      logger.error(`[Alpaca] Order Error: ${errorData}`);
      throw new Error(errorData);
    }
  }

  /** Looks up an order by id; returns `null` when the SDK call is unavailable or fails. */
  async getOrder(orderId: string): Promise<any> {
    if (!this.client?.getOrder) return null;
    try {
      return await this.client.getOrder(orderId);
    } catch (error: unknown) {
      logger.error(`[Alpaca] GetOrder Error for ${orderId}: ${this.describeError(error)}`);
      return null;
    }
  }

  /** Cancels an order; resolves to `true` on success and `false` on any failure. */
  async cancelOrder(orderId: string): Promise<boolean> {
    if (!this.client?.cancelOrder) return false;
    try {
      await this.client.cancelOrder(orderId);
      logger.info(`[Alpaca] Order ${orderId} cancelled successfully.`);
      return true;
    } catch (error: unknown) {
      logger.error(`[Alpaca] CancelOrder Error for ${orderId}: ${this.describeError(error)}`);
      return false;
    }
  }

  /**
   * Reads buying power, cash and currency from the account.
   * Values that do not parse to a finite number become 0.
   * @returns The snapshot, or `null` when the SDK call is unavailable or fails.
   */
  async fetchAccountSnapshot(): Promise<AccountSnapshot | null> {
    if (!this.client?.getAccount) return null;
    try {
      const account = await this.client.getAccount();
      const parseNumber = (value: unknown): number => {
        const parsed = Number(value ?? 0);
        return Number.isFinite(parsed) ? parsed : 0;
      };
      return {
        buying_power: parseNumber(account.buying_power || account.buyingPower),
        cash: parseNumber(account.cash),
        currency: String(account.currency || 'USD'),
        timestamp: Date.now()
      };
    } catch (error: unknown) {
      logger.warn(`[Alpaca] Account snapshot failed: ${this.describeError(error)}`);
      return null;
    }
  }

  /**
   * Upper-cases a symbol and inserts a slash before a trailing `USDT`/`USD`
   * quote (`BTCUSD` -> `BTC/USD`). Symbols that already contain a slash, or
   * have no such suffix, are returned upper-cased.
   */
  private normalizeDataSymbol(symbol: string): string {
    const upper = String(symbol || '').toUpperCase();
    if (!upper) return upper;
    if (upper.includes('/')) return upper;
    if (upper.endsWith('USDT')) {
      return `${upper.slice(0, -4)}/USDT`;
    }
    if (upper.endsWith('USD')) {
      return `${upper.slice(0, -3)}/USD`;
    }
    return upper;
  }

  /**
   * Expands requested symbols into every spelling an order might carry: raw,
   * slash-normalised, the mapper's trade/data variants, and the USD/USDT twin.
   */
  private buildNormalizedSymbolTargets(symbols?: string[]): Set<string> {
    const targets = new Set<string>();
    if (!symbols || symbols.length === 0) return targets;
    for (const raw of symbols) {
      const input = String(raw || '').trim().toUpperCase();
      if (!input) continue;
      const normalizedInput = this.normalizeDataSymbol(input);
      const tradeVariant = SymbolMapper.toTradeSymbol(normalizedInput, 'alpaca').toUpperCase();
      const dataVariant = SymbolMapper.toDataSymbol(normalizedInput, 'alpaca').toUpperCase();
      targets.add(input);
      targets.add(normalizedInput);
      targets.add(tradeVariant);
      targets.add(dataVariant);
      targets.add(this.normalizeDataSymbol(tradeVariant));
      targets.add(this.normalizeDataSymbol(dataVariant));
      if (normalizedInput.endsWith('/USDT')) {
        targets.add(normalizedInput.replace('/USDT', '/USD'));
      }
      if (normalizedInput.endsWith('/USD')) {
        targets.add(normalizedInput.replace('/USD', '/USDT'));
      }
    }
    return targets;
  }

  /**
   * Lists open orders (newest first, up to 200), optionally filtered to `symbols`.
   * @returns Matching orders; `[]` when the SDK call is unavailable or fails.
   */
  async fetchOpenOrders(symbols?: string[]): Promise<any[]> {
    if (!this.client?.getOrders) return [];
    try {
      const opts: any = {
        status: 'open',
        direction: 'desc',
        limit: 200
      };
      const orders: any[] = await this.client.getOrders(opts);
      if (!symbols || symbols.length === 0) {
        return orders;
      }
      const normalizedTargets = this.buildNormalizedSymbolTargets(symbols);
      return orders.filter((order) => normalizedTargets.has(this.normalizeDataSymbol(order.symbol)));
    } catch (error: unknown) {
      logger.error(`[Alpaca] Fetch open orders failed: ${this.describeError(error)}`);
      return [];
    }
  }

  /**
   * Lists closed orders, newest first, paging backwards in time.
   *
   * Each page's oldest timestamp minus 1 ms becomes the next `until` cursor.
   * Paging stops when a page is empty or short, when it reaches
   * `options.after`, when the cursor cannot advance, or after `maxPages`
   * pages. Only the last case logs a truncation warning.
   *
   * @param symbols Optional symbol filter (any spelling).
   * @param options `after`: lower time bound; `limit`: page size, clamped to
   *        1..500 (default 500); `maxPages`: clamped to 1..100 (default 1).
   * @returns De-duplicated orders; `[]` when the SDK call is unavailable or fails.
   */
  async fetchClosedOrders(
    symbols?: string[],
    options?: {
      after?: Date;
      limit?: number;
      maxPages?: number;
    }
  ): Promise<any[]> {
    if (!this.client?.getOrders) return [];
    try {
      const limit = Math.max(1, Math.min(500, Math.floor(Number(options?.limit || 500))));
      const maxPages = Math.max(1, Math.min(100, Math.floor(Number(options?.maxPages || 1))));
      const normalizedTargets = this.buildNormalizedSymbolTargets(symbols);
      const after = options?.after instanceof Date && Number.isFinite(options.after.getTime())
        ? options.after
        : undefined;
      const afterMs = after ? after.getTime() : 0;

      // First parseable timestamp among the order's lifecycle fields, else 0.
      const parseOrderTimestampMs = (order: any): number => {
        const candidates = [
          order?.submitted_at,
          order?.created_at,
          order?.updated_at,
          order?.filled_at
        ];
        for (const candidate of candidates) {
          const parsed = Date.parse(String(candidate || '').trim());
          if (Number.isFinite(parsed) && parsed > 0) return parsed;
        }
        return 0;
      };
      const includeOrder = (order: any): boolean => {
        if (!symbols || symbols.length === 0) return true;
        return normalizedTargets.has(this.normalizeDataSymbol(order?.symbol));
      };

      const deduped = new Map<string, any>();
      let until: Date | undefined;
      let page = 0;
      // Set whenever paging ends for a reason other than the page cap.
      let stoppedBeforePageCap = false;
      let previousCursorMs = Number.POSITIVE_INFINITY;

      while (page < maxPages) {
        const opts: any = {
          status: 'closed',
          direction: 'desc',
          limit
        };
        if (after) {
          opts.after = after;
        }
        if (until instanceof Date && Number.isFinite(until.getTime())) {
          opts.until = until;
        }
        const batch: any[] = await this.client.getOrders(opts);
        if (!Array.isArray(batch) || batch.length === 0) {
          stoppedBeforePageCap = true;
          break;
        }
        page += 1;

        let oldestBatchTs = Number.POSITIVE_INFINITY;
        for (const order of batch) {
          const ts = parseOrderTimestampMs(order);
          if (ts > 0 && ts < oldestBatchTs) {
            oldestBatchTs = ts;
          }
          if (!includeOrder(order)) continue;
          const orderKey = String(order?.id || order?.order_id || order?.client_order_id || '').trim();
          if (orderKey && !deduped.has(orderKey)) {
            deduped.set(orderKey, order);
          }
        }

        const nextCursorMs = oldestBatchTs - 1;
        const cannotContinue =
          !Number.isFinite(oldestBatchTs) || oldestBatchTs <= 0 // no usable timestamp to page from
          || (afterMs > 0 && oldestBatchTs <= afterMs) // reached the requested lower bound
          || batch.length < limit // short page: nothing older remains
          || !(nextCursorMs > 0) || nextCursorMs >= previousCursorMs; // cursor failed to move back
        if (cannotContinue) {
          stoppedBeforePageCap = true;
          break;
        }
        previousCursorMs = nextCursorMs;
        until = new Date(nextCursorMs);
      }

      // Falling out of the loop means the last page was full and older orders may exist.
      if (!stoppedBeforePageCap) {
        logger.warn(`[Alpaca] fetchClosedOrders reached maxPages=${maxPages}. Results may be truncated for lookback window.`);
      }
      return Array.from(deduped.values());
    } catch (error: unknown) {
      logger.error(`[Alpaca] Fetch closed orders failed: ${this.describeError(error)}`);
      return [];
    }
  }

  /**
   * Fetches the open position for a symbol.
   * @returns The position, or `null` when none exists (HTTP 404) or the SDK
   *          call is unavailable.
   * @throws Error with the failure message for any non-404 error.
   */
  async getPosition(symbol: string): Promise<any> {
    if (!this.client?.getPosition) return null;
    try {
      // Alpaca Positions endpoint usually expects symbols without slashes (e.g. BTCUSD)
      // even if Market Data V2 uses BTC/USD.
      const formattedSymbol = symbol.replace('/', '');
      logger.info(`[Alpaca] Fetching position for ${formattedSymbol} (Raw: ${symbol})...`);
      return await this.client.getPosition(formattedSymbol);
    } catch (error: unknown) {
      // Alpaca throws 404 if no position exists
      if (this.isNotFoundError(error)) {
        return null;
      }
      const message = this.describeError(error);
      logger.error(`[Alpaca] GetPosition Error for ${symbol}: ${message}`);
      throw new Error(message);
    }
  }

  /**
   * Whether trading is currently possible.
   * @returns `true` for crypto; otherwise the market clock's `is_open`, or
   *          `null` when the clock is unavailable or malformed.
   */
  async isTradingWindowOpen(): Promise<boolean | null> {
    if (config.ASSET_CLASS === 'crypto') {
      return true;
    }
    if (!this.client?.getClock) return null;
    try {
      const clock = await this.client.getClock();
      return typeof clock?.is_open === 'boolean' ? clock.is_open : null;
    } catch (error: unknown) {
      logger.warn(`[Alpaca] Could not resolve market clock: ${this.describeError(error)}`);
      return null;
    }
  }
}