learning_ai_invt_trdg/backend/src/services/SupabaseService.ts
Saravana Achu Mac 4cfb446f57 feat(backend): WebSocket namespaces, audit persistence, tab flags, telemetry
- Add /trading and /admin named Socket.IO namespaces; root namespace kept for
  backward compat; admin namespace rejects non-admins at connect time
- Wire auditRepository.ts: persist TradeAuditEvent to Cosmos audit-events
  container (best-effort); expose GET /api/admin/audit for admin queries
- Add tradingTelemetry singleton (Node.js Map-based storage adapter); init
  and fatal-error tracking wired in index.ts main()
- Add TAB_MARKETPLACE_ENABLED / TAB_MEMBERSHIP_ENABLED config flags; expose
  tabs.* shape in GET /api/feature-flags response
- Fix SupabaseService URL validation (regex check before createClient)
- Wire check:api-contract and check:audit-repository into npm run test
- Switch @bytelyst/* deps to file:../vendor/* references

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-29 19:35:00 -04:00

2925 lines
115 KiB
TypeScript

import { createClient, SupabaseClient } from '@supabase/supabase-js';
import { config } from '../config/index.js';
import logger from '../utils/logger.js';
import {
normalizeOrderAction,
normalizeOrderStatus,
normalizeOrderType,
normalizeTradeSide
} from '../domain/tradingEnums.js';
import {
buildAlpacaSubTag,
shouldAttachAlpacaSubTag,
isBytelystSubTag,
subTagBelongsToProfile,
type AlpacaSubTagIntent
} from '../utils/alpacaSubTag.js';
import { SymbolMapper } from '../utils/symbolMapper.js';
import type {
FilledLifecycleOrderRow,
ReconciliationBackfillAuditInsert,
ReconciliationBackfillAuditQuery,
ReconciliationBackfillAuditRow,
ReconciliationBackfillBatchSummary,
ReconciliationBackfillOrderInsert,
ReconciliationSubTagRepairSummary,
StaleOrderScope,
VirtualOpenPosition
} from './tradingPersistenceTypes.js';
export type {
FilledLifecycleOrderRow,
ReconciliationBackfillAuditInsert,
ReconciliationBackfillAuditQuery,
ReconciliationBackfillAuditRow,
ReconciliationBackfillBatchSummary,
ReconciliationBackfillOrderInsert,
ReconciliationSubTagRepairSummary,
StaleOrderScope,
VirtualOpenPosition
} from './tradingPersistenceTypes.js';
import type { UserConfig } from './tradingUserTypes.js';
export type { UserConfig } from './tradingUserTypes.js';
class SupabaseService {
private client: SupabaseClient | null = null;
private tradeHistorySupportsSource: boolean | null = null;
private ordersSupportsSubTag: boolean | null = null;
private reconciliationBackfillAuditTableAvailable: boolean | null = null;
private snapshotOwnerId: string | null = null;
private readonly defaultRiskLimits = {
maxDailyLossUsd: 50,
maxOpenTrades: config.MAX_OPEN_TRADES,
maxConsecutiveLosses: 2
};
private readonly defaultExecution = {
orderType: 'market',
cooldownMinutes: 30,
entryMode: 'both'
};
constructor() {
const validUrl = /^https?:\/\//i.test(config.SUPABASE_URL ?? '');
if (validUrl && config.SUPABASE_KEY) {
this.client = createClient(config.SUPABASE_URL, config.SUPABASE_KEY);
} else {
logger.warn(
'Legacy Supabase URL/key not configured; Supabase-backed persistence is disabled. Cosmos-backed paths are unaffected.'
);
}
}
getClient(): SupabaseClient | null {
return this.client;
}
private isUuid(value: string | undefined | null): boolean {
const normalized = String(value || '').trim();
return /^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i.test(normalized);
}
private resolveSubTagIntent(action: unknown): AlpacaSubTagIntent {
const normalizedAction = normalizeOrderAction(String(action || ''));
if (normalizedAction === 'ENTRY' || normalizedAction === 'EXIT') {
return normalizedAction;
}
return 'UNKNOWN';
}
private buildLifecycleSymbolCandidates(symbol: string): string[] {
const normalized = String(symbol || '').trim();
if (!normalized) return [];
const provider = String(config.EXECUTION_PROVIDER || '').trim() || 'alpaca';
const variants = new Set<string>();
const push = (value: string) => {
const token = String(value || '').trim();
if (!token) return;
variants.add(token);
variants.add(token.toUpperCase());
};
push(normalized);
push(SymbolMapper.toTradeSymbol(normalized, provider));
push(SymbolMapper.toDataSymbol(normalized, provider));
return Array.from(variants.values());
}
private resolvePersistedOrderSubTag(order: {
profile_id?: string;
trade_id?: string;
action?: string;
sub_tag?: string;
subTag?: string;
}): string {
const explicitSubTag = String(order.sub_tag || order.subTag || '').trim();
if (explicitSubTag) return explicitSubTag;
const profileId = String(order.profile_id || '').trim();
if (!profileId) return '';
if (!shouldAttachAlpacaSubTag({ profileId })) return '';
const tradeId = String(order.trade_id || '').trim() || undefined;
const intent = this.resolveSubTagIntent(order.action);
return buildAlpacaSubTag({
profileId,
tradeId,
intent
}) || '';
}
private hydrateBackfillSubTags(rows: ReconciliationBackfillOrderInsert[]): ReconciliationBackfillOrderInsert[] {
return rows.map((row) => {
const subTag = this.resolvePersistedOrderSubTag({
profile_id: row.profile_id,
trade_id: row.trade_id,
action: row.action,
sub_tag: row.sub_tag
});
if (!subTag) return row;
if (subTag === String(row.sub_tag || '').trim()) return row;
return {
...row,
sub_tag: subTag
};
});
}
private inferLifecycleAction(actionRaw?: string | null, sideRaw?: string | null): 'ENTRY' | 'EXIT' | undefined {
const explicit = normalizeOrderAction(actionRaw || undefined);
if (explicit) return explicit;
const side = normalizeTradeSide(sideRaw || 'BUY');
return side === 'BUY' ? 'ENTRY' : 'EXIT';
}
private orderStatusRank(statusRaw?: string | null): number {
const status = normalizeOrderStatus(String(statusRaw || 'pending_new'));
if (status === 'filled') return 6;
if (status === 'partially_filled') return 5;
if (status === 'canceled' || status === 'rejected' || status === 'expired') return 4;
if (status === 'unknown') return 1;
return 2;
}
private pickMostReliableOrderStatus(currentStatusRaw?: string | null, incomingStatusRaw?: string | null): string {
const current = normalizeOrderStatus(String(currentStatusRaw || 'pending_new'));
const incoming = normalizeOrderStatus(String(incomingStatusRaw || 'pending_new'));
const currentRank = this.orderStatusRank(current);
const incomingRank = this.orderStatusRank(incoming);
return incomingRank >= currentRank ? incoming : current;
}
private decodeJwtPayload(token: string): Record<string, any> | null {
try {
const segments = token.split('.');
if (segments.length < 2) return null;
const payloadSegment = segments[1]
.replace(/-/g, '+')
.replace(/_/g, '/');
const padded = payloadSegment + '='.repeat((4 - (payloadSegment.length % 4)) % 4);
const json = Buffer.from(padded, 'base64').toString('utf8');
const parsed = JSON.parse(json);
return parsed && typeof parsed === 'object' ? parsed : null;
} catch {
return null;
}
}
async getActiveUsers(): Promise<UserConfig[]> {
if (!this.client) return [];
try {
const { data, error } = await this.client
.from('users')
.select('*')
.eq('trade_enable', true);
if (error) {
logger.error(`Supabase fetch error: ${error.message}`);
return [];
}
return data as UserConfig[];
} catch (err: any) {
logger.error(`Supabase unexpected error: ${err.message}`);
return [];
}
}
async logTransaction(transaction: {
user_id: string;
profile_id?: string;
symbol: string;
side: string;
entry_price: number;
exit_price: number;
size: number;
pnl: number;
pnl_percent: number;
reason: string;
timestamp: number;
stop_loss?: number;
take_profit?: number;
rules_metadata?: Record<string, any>;
trade_id?: string;
source?: 'BOT' | 'MANUAL';
}) {
if (!this.client) return;
try {
const { source: _source, ...transactionWithoutSource } = transaction;
const normalizedTransactionBase = {
...transactionWithoutSource,
side: normalizeTradeSide(transaction.side)
};
const normalizedTransactionWithSource = {
...normalizedTransactionBase,
source: transaction.source || 'BOT'
};
const shouldTrySource = this.tradeHistorySupportsSource !== false;
if (shouldTrySource) {
const { error: sourceInsertError } = await this.client
.from('trade_history')
.insert([normalizedTransactionWithSource]);
if (!sourceInsertError) {
this.tradeHistorySupportsSource = true;
logger.info(`✅ Logged trade to DB for user ${transaction.user_id} (${transaction.symbol})`);
return;
}
const missingSourceColumn = this.isMissingColumnError(sourceInsertError, 'trade_history', 'source');
if (!missingSourceColumn) {
logger.error(`Supabase transaction insert error: ${sourceInsertError.message}`);
return;
}
this.tradeHistorySupportsSource = false;
logger.warn('[Supabase] trade_history.source column not present. Retrying with legacy payload.');
}
const { error: fallbackError } = await this.client
.from('trade_history')
.insert([normalizedTransactionBase]);
if (fallbackError) {
logger.error(`Supabase legacy transaction insert error: ${fallbackError.message}`);
return;
}
logger.info(`✅ Logged trade to DB for user ${transaction.user_id} (${transaction.symbol})`);
} catch (err: any) {
logger.error(`Supabase unexpected transaction error: ${err.message}`);
}
}
private isMissingColumnError(error: any, tableName: string, columnName: string): boolean {
const message = String(error?.message || '').toLowerCase();
const details = String(error?.details || '').toLowerCase();
const hint = String(error?.hint || '').toLowerCase();
const column = columnName.toLowerCase();
const table = tableName.toLowerCase();
const mentionsMissingColumn =
message.includes(`could not find the '${column}' column`) ||
message.includes(`column ${table}.${column} does not exist`) ||
message.includes(`column "${column}" does not exist`) ||
details.includes(`column ${column}`) ||
hint.includes(`column ${column}`);
const mentionsTableContext =
message.includes(table) ||
details.includes(table) ||
message.includes('schema cache');
return mentionsMissingColumn && mentionsTableContext;
}
private isMissingRelationError(error: any, relationName: string): boolean {
const message = String(error?.message || '').toLowerCase();
const details = String(error?.details || '').toLowerCase();
const hint = String(error?.hint || '').toLowerCase();
const relation = relationName.toLowerCase();
return (
message.includes('does not exist')
&& (message.includes(relation) || details.includes(relation) || hint.includes(relation))
);
}
private isMissingOnConflictConstraint(error: any): boolean {
const message = String(error?.message || '').toLowerCase();
const details = String(error?.details || '').toLowerCase();
return (
message.includes('no unique or exclusion constraint matching the on conflict specification')
|| details.includes('no unique or exclusion constraint matching the on conflict specification')
);
}
async logOrder(order: {
user_id: string;
profile_id?: string;
order_id?: string;
symbol: string;
type: string;
side: string;
qty: number;
price: number;
status: string;
timestamp: number;
stop_loss?: number;
take_profit?: number;
trade_id?: string;
action?: string;
sub_tag?: string;
subTag?: string;
}) {
if (!this.client) return;
try {
const normalizedAction = normalizeOrderAction(order.action);
const explicitIncomingSubTag = String(order.sub_tag || order.subTag || '').trim();
const normalizedSubTag = this.resolvePersistedOrderSubTag({
profile_id: order.profile_id,
trade_id: order.trade_id,
action: normalizedAction,
sub_tag: order.sub_tag,
subTag: order.subTag
});
const canPersistSubTag = this.ordersSupportsSubTag !== false;
const normalizedOrder = {
...order,
type: normalizeOrderType(order.type),
side: normalizeTradeSide(order.side),
status: normalizeOrderStatus(order.status),
action: normalizedAction,
...(canPersistSubTag && normalizedSubTag ? { sub_tag: normalizedSubTag } : {})
};
const normalizedOrderId = String(normalizedOrder.order_id || '').trim();
const normalizedProfileId = String(normalizedOrder.profile_id || '').trim();
if (normalizedOrderId) {
let existingQuery = this.client
.from('orders')
.select('id,order_id,profile_id,status,trade_id,action,qty,price,timestamp,stop_loss,take_profit,sub_tag')
.eq('order_id', normalizedOrderId)
.order('created_at', { ascending: false })
.limit(1);
if (normalizedProfileId) {
existingQuery = existingQuery.eq('profile_id', normalizedProfileId);
}
const { data: existingRows, error: existingError } = await existingQuery;
if (existingError) {
logger.warn(`[Supabase] Existing-order lookup failed for ${normalizedOrderId}. Falling back to insert path: ${existingError.message}`);
} else if ((existingRows || []).length > 0) {
const existing = (existingRows || [])[0] as any;
const existingStatus = normalizeOrderStatus(String(existing?.status || 'pending_new'));
const mergedStatus = this.pickMostReliableOrderStatus(existingStatus, normalizedOrder.status);
const keepExistingTerminal = this.orderStatusRank(existingStatus) > this.orderStatusRank(normalizedOrder.status);
const existingQty = Number(existing?.qty || 0);
const existingPrice = Number(existing?.price || 0);
const existingTimestamp = Number(existing?.timestamp || 0);
const incomingTimestamp = Number(normalizedOrder.timestamp || 0);
const existingSubTag = String(existing?.sub_tag || '').trim();
const persistedSubTag = explicitIncomingSubTag
|| existingSubTag
|| normalizedSubTag
|| undefined;
const mergedPayload = {
...normalizedOrder,
profile_id: normalizedProfileId || existing?.profile_id || undefined,
trade_id: normalizedOrder.trade_id || existing?.trade_id || undefined,
action: normalizedOrder.action || normalizeOrderAction(existing?.action),
status: mergedStatus,
qty: keepExistingTerminal && Number.isFinite(existingQty) && existingQty > 0 ? existingQty : normalizedOrder.qty,
price: keepExistingTerminal && Number.isFinite(existingPrice) && existingPrice > 0 ? existingPrice : normalizedOrder.price,
timestamp: Math.max(
Number.isFinite(existingTimestamp) ? existingTimestamp : 0,
Number.isFinite(incomingTimestamp) ? incomingTimestamp : 0
),
stop_loss: normalizedOrder.stop_loss || Number(existing?.stop_loss || 0) || undefined,
take_profit: normalizedOrder.take_profit || Number(existing?.take_profit || 0) || undefined,
...(canPersistSubTag
? { sub_tag: persistedSubTag }
: {})
};
let updateQuery = this.client
.from('orders')
.update(mergedPayload)
.eq('order_id', normalizedOrderId);
if (normalizedProfileId || existing?.profile_id) {
updateQuery = updateQuery.eq('profile_id', normalizedProfileId || existing.profile_id);
}
let { error: updateError } = await updateQuery;
if (updateError && this.isMissingColumnError(updateError, 'orders', 'sub_tag')) {
this.ordersSupportsSubTag = false;
const { sub_tag: _subTag, ...legacyPayload } = mergedPayload as any;
let legacyUpdateQuery = this.client
.from('orders')
.update(legacyPayload)
.eq('order_id', normalizedOrderId);
if (normalizedProfileId || existing?.profile_id) {
legacyUpdateQuery = legacyUpdateQuery.eq('profile_id', normalizedProfileId || existing.profile_id);
}
const fallback = await legacyUpdateQuery;
updateError = fallback.error;
} else if (!updateError && mergedPayload.sub_tag) {
this.ordersSupportsSubTag = true;
}
if (updateError) {
logger.error(`[Supabase] Error de-duplicating order ${normalizedOrderId}: ${updateError.message}`);
} else {
logger.debug(`[Supabase] Upserted existing order by order_id=${normalizedOrderId}`);
}
return;
}
}
let { error } = await this.client
.from('orders')
.insert([normalizedOrder]);
if (error && this.isMissingColumnError(error, 'orders', 'sub_tag')) {
this.ordersSupportsSubTag = false;
const { sub_tag: _subTag, ...legacyPayload } = normalizedOrder as any;
const fallback = await this.client
.from('orders')
.insert([legacyPayload]);
error = fallback.error;
} else if (!error && (normalizedOrder as any).sub_tag) {
this.ordersSupportsSubTag = true;
}
if (error) {
logger.error(`[Supabase] Error logging order: ${error.message}`);
} else {
logger.info(`✅ Logged order to DB for user ${order.user_id} (${order.symbol})`);
}
} catch (err: any) {
logger.error(`Supabase unexpected order error: ${err.message}`);
}
}
async updateOrderStatus(orderId: string, status: string, filledAt?: Date, price?: number, qty?: number) {
if (!this.client) return;
try {
const updateData: any = {
status: normalizeOrderStatus(status),
updated_at: new Date().toISOString()
};
if (filledAt) {
updateData.filled_at = filledAt.toISOString();
}
if (price !== undefined && price > 0) {
updateData.price = price;
}
if (qty !== undefined && qty > 0) {
updateData.qty = qty;
}
// Try updating by both 'id' and 'order_id' fields for compatibility
const { error: error1 } = await this.client
.from('orders')
.update(updateData)
.eq('order_id', orderId);
const { error: error2 } = await this.client
.from('orders')
.update(updateData)
.eq('id', orderId);
if (error1 && error2) {
logger.error(`Supabase order update error: ${error1.message} / ${error2.message}`);
} else {
logger.debug(`[Supabase] Updated order ${orderId} status to ${status}`);
}
} catch (err: any) {
logger.error(`Supabase unexpected order update error: ${err.message}`);
}
}
async getLatestOrder(userId: string, symbol: string) {
if (!this.client) return null;
try {
const { data, error } = await this.client
.from('orders')
.select('*')
.eq('user_id', userId)
.eq('symbol', symbol)
.order('timestamp', { ascending: false })
.limit(1)
.maybeSingle();
if (error) {
logger.error(`[Supabase] Error fetching latest order for ${symbol}: ${error.message}`);
}
return data;
} catch (err: any) {
logger.error(`Supabase unexpected get order error: ${err.message}`);
return null;
}
}
async getOrderByTradeId(tradeId: string, profileId?: string) {
if (!this.client) return null;
const normalizedTradeId = String(tradeId || '').trim();
if (!normalizedTradeId) return null;
try {
let query = this.client
.from('orders')
.select('order_id,status,qty,price,symbol,action,stop_loss,take_profit')
.eq('trade_id', normalizedTradeId)
.order('created_at', { ascending: false })
.limit(1);
if (profileId) {
query = query.eq('profile_id', profileId);
}
const { data, error } = await query.maybeSingle();
if (error) {
logger.error(`[Supabase] Error fetching order for trade ${normalizedTradeId}: ${error.message}`);
return null;
}
return data;
} catch (err: any) {
logger.error(`Supabase unexpected get order by trade error: ${err.message}`);
return null;
}
}
async getActiveProfiles(): Promise<any[]> {
if (!this.client) return [];
try {
const { data, error } = await this.client
.from('trade_profiles')
.select('*')
.eq('is_active', true);
if (error) {
logger.error(`[Supabase] Error fetching profiles: ${error.message}`);
return [];
}
return (data || []).map((profile: any) => ({
...profile,
strategy_config: this.normalizeStrategyConfig(profile?.strategy_config)
}));
} catch (err: any) {
logger.error(`Supabase unexpected profile fetch error: ${err.message}`);
return [];
}
}
async getProfilesForUser(userId: string): Promise<Array<{
id: string;
user_id: string;
name: string;
allocated_capital: number;
is_active: boolean;
}>> {
if (!this.client || !this.isUuid(userId)) return [];
try {
const { data, error } = await this.client
.from('trade_profiles')
.select('id,user_id,name,allocated_capital,is_active')
.eq('user_id', userId)
.order('name', { ascending: true });
if (error) {
logger.error(`[Supabase] Error fetching profiles for user ${userId}: ${error.message}`);
return [];
}
return ((data || []) as any[]).map((row) => ({
id: String(row.id || ''),
user_id: String(row.user_id || ''),
name: String(row.name || row.id || 'Unnamed Profile'),
allocated_capital: Number(row.allocated_capital || 0),
is_active: Boolean(row.is_active)
})).filter((row) => this.isUuid(row.id) && this.isUuid(row.user_id));
} catch (err: any) {
logger.error(`[Supabase] Unexpected user profile fetch error for ${userId}: ${err.message}`);
return [];
}
}
async getAllProfiles(): Promise<Array<{
id: string;
user_id: string;
name: string;
allocated_capital: number;
is_active: boolean;
}>> {
if (!this.client) return [];
try {
const { data, error } = await this.client
.from('trade_profiles')
.select('id,user_id,name,allocated_capital,is_active')
.order('name', { ascending: true });
if (error) {
logger.error(`[Supabase] Error fetching all profiles: ${error.message}`);
return [];
}
return ((data || []) as any[]).map((row) => ({
id: String(row.id || ''),
user_id: String(row.user_id || ''),
name: String(row.name || row.id || 'Unnamed Profile'),
allocated_capital: Number(row.allocated_capital || 0),
is_active: Boolean(row.is_active)
})).filter((row) => this.isUuid(row.id) && this.isUuid(row.user_id));
} catch (err: any) {
logger.error(`[Supabase] Unexpected all profile fetch error: ${err.message}`);
return [];
}
}
private normalizeStrategyConfig(rawConfig: any): any {
const safeConfig = rawConfig && typeof rawConfig === 'object' && !Array.isArray(rawConfig)
? rawConfig
: {};
const rawRules = Array.isArray(safeConfig.rules) ? safeConfig.rules : [];
const rules = rawRules
.filter((rule: any) => rule && typeof rule === 'object' && typeof rule.ruleId === 'string')
.map((rule: any) => ({
ruleId: rule.ruleId,
enabled: Boolean(rule.enabled),
ruleType: (rule.ruleType === 'mandatory' || rule.ruleType === 'voting') ? rule.ruleType : undefined,
params: this.normalizeRuleParams(rule.ruleId, rule.params)
}));
const riskLimits = safeConfig.riskLimits && typeof safeConfig.riskLimits === 'object'
? safeConfig.riskLimits
: {};
const maxDailyLossUsd = Number(riskLimits.maxDailyLossUsd);
const maxOpenTrades = Number(riskLimits.maxOpenTrades);
const maxConsecutiveLosses = Number(riskLimits.maxConsecutiveLosses);
const dailyProfitTargetUsd = Number(riskLimits.dailyProfitTargetUsd);
const execution = safeConfig.execution && typeof safeConfig.execution === 'object'
? safeConfig.execution
: {};
const rawOrderType = String(execution.orderType || this.defaultExecution.orderType).toLowerCase();
const orderType = rawOrderType === 'limit' ? 'limit' : 'market';
const cooldownMinutes = Number(execution.cooldownMinutes);
const minRulePassRatio = Number(execution.minRulePassRatio);
const rawEntryMode = String(
execution.entryMode ?? (execution.longOnly ? 'long_only' : this.defaultExecution.entryMode)
).toLowerCase();
const entryMode = (rawEntryMode === 'long_only' || rawEntryMode === 'longonly' || rawEntryMode === 'buy_only')
? 'long_only'
: 'both';
return {
...safeConfig,
rules,
riskLimits: {
maxDailyLossUsd: Number.isFinite(maxDailyLossUsd) && maxDailyLossUsd > 0
? maxDailyLossUsd
: this.defaultRiskLimits.maxDailyLossUsd,
maxOpenTrades: Number.isFinite(maxOpenTrades) && maxOpenTrades > 0
? Math.floor(maxOpenTrades)
: this.defaultRiskLimits.maxOpenTrades,
maxConsecutiveLosses: Number.isFinite(maxConsecutiveLosses) && maxConsecutiveLosses >= 0
? Math.floor(maxConsecutiveLosses)
: this.defaultRiskLimits.maxConsecutiveLosses,
dailyProfitTargetUsd: Number.isFinite(dailyProfitTargetUsd) && dailyProfitTargetUsd > 0
? dailyProfitTargetUsd
: undefined
},
execution: {
...execution,
orderType,
cooldownMinutes: Number.isFinite(cooldownMinutes) && cooldownMinutes >= 0
? cooldownMinutes
: this.defaultExecution.cooldownMinutes,
entryMode,
minRulePassRatio: Number.isFinite(minRulePassRatio) && minRulePassRatio >= 0 && minRulePassRatio <= 1
? minRulePassRatio
: 1.0
}
};
}
private normalizeRuleParams(ruleId: string, rawParams: any): Record<string, any> {
const params = rawParams && typeof rawParams === 'object' && !Array.isArray(rawParams)
? { ...rawParams }
: {};
if (ruleId === 'SessionRule' && params.allowedSessions && !params.sessions) {
params.sessions = params.allowedSessions;
}
if (ruleId === 'AIAnalysisRule') {
if (params.confidenceThreshold !== undefined && params.minConfidence === undefined) {
params.minConfidence = params.confidenceThreshold;
}
const minConfidence = Number(params.minConfidence);
if (Number.isFinite(minConfidence) && minConfidence >= 0) {
params.minConfidence = minConfidence <= 1 ? minConfidence * 100 : minConfidence;
}
}
return params;
}
/**
* Returns today's realized net profit/loss (USD) for a profile.
* Can be positive (profit) or negative (loss).
*/
async getProfileDailyNetPnlUsd(profileId: string): Promise<number> {
if (!this.client) return 0;
try {
const now = new Date();
const startOfDayUtc = new Date(Date.UTC(
now.getUTCFullYear(),
now.getUTCMonth(),
now.getUTCDate()
));
const { data, error } = await this.client
.from('trade_history')
.select('pnl, created_at')
.eq('profile_id', profileId)
.gte('created_at', startOfDayUtc.toISOString())
.order('created_at', { ascending: false })
.limit(5000);
if (error) {
logger.error(`[Supabase] Error fetching daily net PnL for profile ${profileId}: ${error.message}`);
return 0;
}
const netPnl = (data || []).reduce((sum: number, row: any) => {
const pnl = Number(row?.pnl || 0);
return Number.isFinite(pnl) ? sum + pnl : sum;
}, 0);
return netPnl;
} catch (err: any) {
logger.error(`[Supabase] Unexpected daily net PnL error for profile ${profileId}: ${err.message}`);
return 0;
}
}
/**
* Returns today's realized loss (absolute USD) for a profile.
* Positive net pnl returns 0, negative net pnl returns abs(net).
*/
async getProfileDailyLossUsd(profileId: string): Promise<number> {
if (!this.client) return 0;
try {
const now = new Date();
const startOfDayUtc = new Date(Date.UTC(
now.getUTCFullYear(),
now.getUTCMonth(),
now.getUTCDate()
));
const { data, error } = await this.client
.from('trade_history')
.select('pnl, created_at')
.eq('profile_id', profileId)
.gte('created_at', startOfDayUtc.toISOString())
.order('created_at', { ascending: false })
.limit(5000);
if (error) {
logger.error(`[Supabase] Error fetching daily loss for profile ${profileId}: ${error.message}`);
return 0;
}
const netPnl = (data || []).reduce((sum: number, row: any) => {
const pnl = Number(row?.pnl || 0);
return Number.isFinite(pnl) ? sum + pnl : sum;
}, 0);
return netPnl < 0 ? Math.abs(netPnl) : 0;
} catch (err: any) {
logger.error(`[Supabase] Unexpected daily loss error for profile ${profileId}: ${err.message}`);
return 0;
}
}
/**
* Returns the current consecutive losing trade count for a profile.
* Stops counting at first non-losing trade in reverse chronological order.
*/
async getProfileConsecutiveLosses(profileId: string, lookback: number = 100): Promise<number> {
if (!this.client) return 0;
try {
const cappedLookback = Math.max(1, Math.min(500, Math.floor(lookback)));
const { data, error } = await this.client
.from('trade_history')
.select('pnl, created_at')
.eq('profile_id', profileId)
.order('created_at', { ascending: false })
.limit(cappedLookback);
if (error) {
logger.error(`[Supabase] Error fetching consecutive losses for profile ${profileId}: ${error.message}`);
return 0;
}
let consecutiveLosses = 0;
for (const row of data || []) {
const pnl = Number((row as any)?.pnl || 0);
if (!Number.isFinite(pnl) || pnl >= 0) break;
consecutiveLosses++;
}
return consecutiveLosses;
} catch (err: any) {
logger.error(`[Supabase] Unexpected consecutive loss error for profile ${profileId}: ${err.message}`);
return 0;
}
}
/**
* Get orders stuck in pending_new status for more than X minutes.
* Optionally scoped to a single profile for per-account reconciliation.
*/
async getStaleOrders(
staleThresholdMinutes: number = 5,
scope?: string | StaleOrderScope
): Promise<any[]> {
if (!this.client) return [];
try {
const thresholdTime = new Date(Date.now() - staleThresholdMinutes * 60 * 1000).toISOString();
const scopeObject: StaleOrderScope = typeof scope === 'string'
? { profileId: scope }
: (scope || {});
const profileIdRaw = String(scopeObject.profileId || '').trim();
const profileId = this.isUuid(profileIdRaw) ? profileIdRaw : '';
const userId = String(scopeObject.userId || '').trim();
const includeOrphanUserOrders = Boolean(scopeObject.includeOrphanUserOrders && userId);
const profileNullOnly = Boolean(scopeObject.profileNullOnly);
let query = this.client
.from('orders')
.select('*')
.in('status', ['pending_new', 'pending', 'accepted', 'new'])
.lt('created_at', thresholdTime)
.order('created_at', { ascending: true })
.limit(250); // Process in bounded batches
if (profileNullOnly) {
query = query.is('profile_id', null);
if (userId) {
query = query.eq('user_id', userId);
}
} else if (profileId && includeOrphanUserOrders) {
query = query.or(`profile_id.eq.${profileId},and(profile_id.is.null,user_id.eq.${userId})`);
} else if (profileId) {
query = query.eq('profile_id', profileId);
} else if (userId) {
query = query.eq('user_id', userId);
}
const { data, error } = await query;
if (error) {
logger.error(`[Supabase] Error fetching stale orders: ${error.message}`);
return [];
}
return data || [];
} catch (err: any) {
logger.error(`Supabase unexpected stale orders error: ${err.message}`);
return [];
}
}
/**
* Get orders marked as 'expired' or 'unknown' (useful for cleanup/revert)
*/
async getExpiredOrUnknownOrders(): Promise<any[]> {
if (!this.client) return [];
try {
const { data, error } = await this.client
.from('orders')
.select('*')
.in('status', ['expired', 'unknown']);
if (error) {
logger.error(`[Supabase] Error fetching expired orders: ${error.message}`);
return [];
}
return data || [];
} catch (err: any) {
logger.error(`Supabase unexpected expired fetch error: ${err.message}`);
return [];
}
}
async getPendingOrdersForProfile(profileId: string): Promise<any[]> {
if (!this.client) return [];
try {
const { data, error } = await this.client
.from('orders')
.select('*')
.eq('profile_id', profileId)
.eq('status', 'pending_new');
if (error) {
logger.error(`[Supabase] Error fetching pending orders for profile ${profileId}: ${error.message}`);
return [];
}
return data || [];
} catch (err: any) {
logger.error(`Supabase unexpected pending fetch error: ${err.message}`);
return [];
}
}
async getOpenOrdersForProfile(profileId: string): Promise<any[]> {
if (!this.client || !profileId) return [];
try {
const openStatuses = [
'pending_new',
'accepted',
'pending',
'new',
'partially_filled',
'partially-filled'
];
const { data, error } = await this.client
.from('orders')
.select('id,order_id,profile_id,symbol,status,action,trade_id,created_at')
.eq('profile_id', profileId)
.in('status', openStatuses)
.order('created_at', { ascending: true });
if (error) {
logger.error(`[Supabase] Error fetching open orders for profile ${profileId}: ${error.message}`);
return [];
}
return data || [];
} catch (err: any) {
logger.error(`Supabase unexpected open orders fetch error for profile ${profileId}: ${err.message}`);
return [];
}
}
async getRecentlyClosedOrdersForProfile(profileId: string, minutes: number = 10): Promise<any[]> {
if (!this.client || !profileId) return [];
const safeMinutes = Math.max(1, Math.floor(minutes));
const since = new Date(Date.now() - safeMinutes * 60 * 1000).toISOString();
try {
const { data, error } = await this.client
.from('orders')
.select('*')
.eq('profile_id', profileId)
.in('status', ['filled', 'canceled', 'expired', 'rejected', 'unknown'])
.gte('updated_at', since)
.order('updated_at', { ascending: true });
if (error) {
logger.error(`[Supabase] Error fetching recent closed orders for profile ${profileId}: ${error.message}`);
return [];
}
return data || [];
} catch (err: any) {
logger.error(`Supabase unexpected recent closed orders fetch error for profile ${profileId}: ${err.message}`);
return [];
}
}
private async fetchFilledLifecycleOrders(options: {
userId?: string;
profileId?: string;
symbols?: string[];
maxRows?: number;
}): Promise<{ rows: FilledLifecycleOrderRow[]; truncated: boolean }> {
if (!this.client) return { rows: [], truncated: false };
const safeUserId = this.isUuid(options.userId) ? String(options.userId) : '';
const safeProfileId = this.isUuid(options.profileId) ? String(options.profileId) : '';
const safeSymbols = Array.isArray(options.symbols)
? options.symbols.map((value) => String(value || '').trim()).filter(Boolean)
: [];
const maxRows = Math.max(1000, Math.min(200_000, Math.floor(Number(options.maxRows || 50_000))));
const statusFilter = ['filled', 'partially_filled', 'partially-filled'];
const columnVariants = [
'id,order_id,user_id,profile_id,symbol,trade_id,action,side,qty,quantity,price,status,source,sub_tag,stop_loss,take_profit,timestamp,created_at,filled_at,type',
'id,order_id,user_id,profile_id,symbol,trade_id,action,side,qty,quantity,price,status,source,stop_loss,take_profit,timestamp,created_at,filled_at,type',
'id,order_id,user_id,profile_id,symbol,trade_id,action,side,qty,quantity,price,status,source,timestamp,created_at,filled_at,type',
'id,order_id,user_id,profile_id,symbol,trade_id,action,side,qty,quantity,price,status,source,timestamp,created_at,type',
'id,order_id,user_id,profile_id,symbol,trade_id,action,side,qty,quantity,price,status,timestamp,created_at,type'
];
const rows: FilledLifecycleOrderRow[] = [];
const pageSize = 1000;
let offset = 0;
let truncated = false;
let selectedColumns = columnVariants[0];
const buildQuery = (columns: string) => {
let query = this.client!
.from('orders')
.select(columns)
.in('status', statusFilter)
.order('created_at', { ascending: true })
.range(offset, offset + pageSize - 1);
if (safeUserId) {
query = query.eq('user_id', safeUserId);
}
if (safeProfileId) {
query = query.eq('profile_id', safeProfileId);
}
if (safeSymbols.length > 0) {
query = query.in('symbol', safeSymbols);
}
return query;
};
try {
for (; ;) {
let data: FilledLifecycleOrderRow[] | null = null;
let finalError: any = null;
for (const columns of columnVariants) {
const { data: candidateData, error } = await buildQuery(columns);
if (!error) {
selectedColumns = columns;
data = (candidateData || []) as FilledLifecycleOrderRow[];
finalError = null;
break;
}
finalError = error;
const missingKnownColumn = this.isMissingColumnError(error, 'orders', 'sub_tag')
|| this.isMissingColumnError(error, 'orders', 'filled_at')
|| this.isMissingColumnError(error, 'orders', 'stop_loss')
|| this.isMissingColumnError(error, 'orders', 'take_profit')
|| this.isMissingColumnError(error, 'orders', 'quantity')
|| this.isMissingColumnError(error, 'orders', 'source');
if (!missingKnownColumn) {
break;
}
}
if (finalError) {
logger.error(`[Supabase] Error fetching filled lifecycle rows (columns=${selectedColumns}): ${finalError.message}`);
return { rows: [], truncated: false };
}
const batch = data || [];
if (batch.length === 0) break;
rows.push(...batch);
if (rows.length >= maxRows) {
rows.length = maxRows;
truncated = true;
break;
}
if (batch.length < pageSize) break;
offset += pageSize;
}
return { rows, truncated };
} catch (err: any) {
logger.error(`[Supabase] Unexpected filled lifecycle fetch error: ${err.message}`);
return { rows: [], truncated: false };
}
}
async getFilledLifecycleOrdersForProfile(profileId: string, symbols?: string[]): Promise<FilledLifecycleOrderRow[]> {
if (!profileId) return [];
const result = await this.fetchFilledLifecycleOrders({
profileId,
symbols
});
return result.rows;
}
async getFilledLifecycleOrdersForUser(options: {
userId: string;
profileId?: string;
symbols?: string[];
maxRows?: number;
}): Promise<{ rows: FilledLifecycleOrderRow[]; truncated: boolean }> {
const userId = String(options.userId || '').trim();
if (!this.isUuid(userId)) return { rows: [], truncated: false };
const profileId = String(options.profileId || '').trim();
return await this.fetchFilledLifecycleOrders({
userId,
profileId: profileId || undefined,
symbols: options.symbols,
maxRows: options.maxRows
});
}
async getFilledLifecycleOrdersGlobal(options?: {
profileId?: string;
symbols?: string[];
maxRows?: number;
}): Promise<{ rows: FilledLifecycleOrderRow[]; truncated: boolean }> {
const profileId = String(options?.profileId || '').trim();
return await this.fetchFilledLifecycleOrders({
profileId: profileId || undefined,
symbols: options?.symbols,
maxRows: options?.maxRows
});
}
async getExistingOrderIds(orderIds: string[], profileId?: string): Promise<Set<string>> {
if (!this.client) return new Set();
const normalizedIds = Array.from(new Set(orderIds.map((id) => String(id || '').trim()).filter(Boolean)));
if (normalizedIds.length === 0) return new Set();
const found = new Set<string>();
const chunkSize = 100;
for (let i = 0; i < normalizedIds.length; i += chunkSize) {
const chunk = normalizedIds.slice(i, i + chunkSize);
try {
let query = this.client
.from('orders')
.select('order_id')
.in('order_id', chunk)
.limit(chunk.length);
if (this.isUuid(profileId)) {
query = query.eq('profile_id', profileId);
}
const { data, error } = await query;
if (error) {
logger.error(`[Supabase] Error checking existing order ids: ${error.message}`);
continue;
}
for (const row of (data || []) as Array<{ order_id?: string | null }>) {
const orderId = String(row.order_id || '').trim();
if (orderId) found.add(orderId);
}
} catch (err: any) {
logger.error(`[Supabase] Unexpected existing order id lookup error: ${err.message}`);
}
}
return found;
}
async upsertReconciliationBackfillOrders(rows: ReconciliationBackfillOrderInsert[]): Promise<boolean> {
const client = this.client;
if (!client || rows.length === 0) return true;
const normalizedInputRows = this.hydrateBackfillSubTags(rows);
const stripSubTag = (inputRows: ReconciliationBackfillOrderInsert[]): ReconciliationBackfillOrderInsert[] =>
inputRows.map((row) => {
const { sub_tag: _subTag, ...rest } = row as any;
return rest as ReconciliationBackfillOrderInsert;
});
const stripFilledAt = (inputRows: ReconciliationBackfillOrderInsert[]): ReconciliationBackfillOrderInsert[] =>
inputRows.map((row) => {
const { filled_at: _filledAt, ...rest } = row as any;
return rest as ReconciliationBackfillOrderInsert;
});
const adaptRowsForMissingColumn = (
inputRows: ReconciliationBackfillOrderInsert[],
error: any
): { rows: ReconciliationBackfillOrderInsert[]; changed: boolean } => {
let rowsOut = inputRows;
let changed = false;
if (this.isMissingColumnError(error, 'orders', 'sub_tag')) {
this.ordersSupportsSubTag = false;
rowsOut = stripSubTag(rowsOut);
changed = true;
}
if (this.isMissingColumnError(error, 'orders', 'filled_at')) {
rowsOut = stripFilledAt(rowsOut);
changed = true;
}
return { rows: rowsOut, changed };
};
const tryInsertWithColumnFallback = async (inputRows: ReconciliationBackfillOrderInsert[]): Promise<{ error: any }> => {
let candidateRows = inputRows;
for (let attempt = 0; attempt < 3; attempt += 1) {
const result = await client
.from('orders')
.insert(candidateRows as any[]);
if (!result.error) {
if (candidateRows.some((row) => String((row as any).sub_tag || '').trim())) {
this.ordersSupportsSubTag = true;
}
return { error: null };
}
const adapted = adaptRowsForMissingColumn(candidateRows, result.error);
if (!adapted.changed) {
return { error: result.error };
}
candidateRows = adapted.rows;
}
return { error: new Error('Exceeded insert fallback attempts for backfill orders') };
};
const tryUpsertWithColumnFallback = async (inputRows: ReconciliationBackfillOrderInsert[]): Promise<{ error: any; rowsUsed: ReconciliationBackfillOrderInsert[] }> => {
let candidateRows = inputRows;
for (let attempt = 0; attempt < 3; attempt += 1) {
const result = await client
.from('orders')
.upsert(candidateRows as any[], {
onConflict: 'order_id',
ignoreDuplicates: true
});
if (!result.error) {
if (candidateRows.some((row) => String((row as any).sub_tag || '').trim())) {
this.ordersSupportsSubTag = true;
}
return { error: null, rowsUsed: candidateRows };
}
const adapted = adaptRowsForMissingColumn(candidateRows, result.error);
if (!adapted.changed) {
return { error: result.error, rowsUsed: candidateRows };
}
candidateRows = adapted.rows;
}
return {
error: new Error('Exceeded upsert fallback attempts for backfill orders'),
rowsUsed: inputRows
};
};
const dedupeRows = (input: ReconciliationBackfillOrderInsert[]): ReconciliationBackfillOrderInsert[] => {
const seen = new Set<string>();
const unique: ReconciliationBackfillOrderInsert[] = [];
for (const row of input) {
const profileId = String(row.profile_id || '').trim();
const orderId = String(row.order_id || '').trim();
if (!orderId) continue;
const key = `${profileId}::${orderId}`;
if (seen.has(key)) continue;
seen.add(key);
unique.push(row);
}
return unique;
};
const insertMissingRows = async (inputRows: ReconciliationBackfillOrderInsert[]): Promise<boolean> => {
const rowsWithSchemaHints = this.ordersSupportsSubTag === false
? stripSubTag(inputRows)
: inputRows;
const normalizedRows = dedupeRows(rowsWithSchemaHints);
if (normalizedRows.length === 0) return true;
const rowsByProfile = new Map<string, ReconciliationBackfillOrderInsert[]>();
for (const row of normalizedRows) {
const profileKey = String(row.profile_id || '').trim();
const list = rowsByProfile.get(profileKey) || [];
list.push(row);
rowsByProfile.set(profileKey, list);
}
const missingOnly: ReconciliationBackfillOrderInsert[] = [];
for (const [profileKey, profileRows] of rowsByProfile.entries()) {
const orderIds = profileRows.map((row) => String(row.order_id || '').trim()).filter(Boolean);
if (orderIds.length === 0) continue;
const existingIds = await this.getExistingOrderIds(orderIds, profileKey || undefined);
for (const row of profileRows) {
const orderId = String(row.order_id || '').trim();
if (!orderId || existingIds.has(orderId)) continue;
missingOnly.push(row);
}
}
if (missingOnly.length === 0) {
return true;
}
const { error: insertError } = await tryInsertWithColumnFallback(missingOnly);
if (!insertError) return true;
logger.error(`[Supabase] Backfill fallback insert failed: ${insertError.message}`);
return false;
};
try {
const rowsWithSchemaHints = this.ordersSupportsSubTag === false
? stripSubTag(normalizedInputRows)
: normalizedInputRows;
const { error, rowsUsed } = await tryUpsertWithColumnFallback(rowsWithSchemaHints);
if (!error) return true;
if (this.isMissingOnConflictConstraint(error)) {
logger.warn('[Supabase] orders.order_id lacks ON CONFLICT constraint. Falling back to pre-checked insert path.');
return await insertMissingRows(rowsUsed);
}
logger.error(`[Supabase] Backfill order upsert failed: ${error.message}`);
const { rows: fallbackRows } = adaptRowsForMissingColumn(rowsUsed, error);
const { error: fallbackError } = await tryUpsertWithColumnFallback(fallbackRows);
if (fallbackError && this.isMissingOnConflictConstraint(fallbackError)) {
logger.warn('[Supabase] orders.order_id lacks ON CONFLICT constraint on legacy schema fallback. Using pre-checked insert path.');
return await insertMissingRows(fallbackRows as ReconciliationBackfillOrderInsert[]);
}
if (fallbackError) {
logger.error(`[Supabase] Backfill order upsert fallback failed: ${fallbackError.message}`);
return false;
}
return true;
} catch (err: any) {
logger.error(`[Supabase] Unexpected backfill order upsert error: ${err.message}`);
return false;
}
}
async isReconciliationBackfillAuditAvailable(forceRefresh: boolean = false): Promise<boolean> {
if (!this.client) return false;
if (!forceRefresh && this.reconciliationBackfillAuditTableAvailable !== null) {
return this.reconciliationBackfillAuditTableAvailable;
}
try {
const { error } = await this.client
.from('reconciliation_backfill_audit')
.select('id')
.limit(1);
if (error) {
if (this.isMissingRelationError(error, 'reconciliation_backfill_audit')) {
this.reconciliationBackfillAuditTableAvailable = false;
return false;
}
logger.error(`[Supabase] Backfill audit table probe failed: ${error.message}`);
this.reconciliationBackfillAuditTableAvailable = false;
return false;
}
this.reconciliationBackfillAuditTableAvailable = true;
return true;
} catch (err: any) {
logger.error(`[Supabase] Unexpected backfill audit table probe error: ${err.message}`);
this.reconciliationBackfillAuditTableAvailable = false;
return false;
}
}
async insertReconciliationBackfillAuditRows(rows: ReconciliationBackfillAuditInsert[]): Promise<boolean> {
if (!this.client || rows.length === 0) return true;
const available = await this.isReconciliationBackfillAuditAvailable();
if (!available) {
logger.error('[Supabase] reconciliation_backfill_audit table is not available.');
return false;
}
try {
const { error } = await this.client
.from('reconciliation_backfill_audit')
.insert(rows);
if (error) {
logger.error(`[Supabase] Backfill audit insert failed: ${error.message}`);
return false;
}
return true;
} catch (err: any) {
logger.error(`[Supabase] Unexpected backfill audit insert error: ${err.message}`);
return false;
}
}
async getReconciliationBackfillAuditRows(query: ReconciliationBackfillAuditQuery): Promise<{ rows: ReconciliationBackfillAuditRow[]; totalCount: number }> {
if (!this.client) return { rows: [], totalCount: 0 };
const available = await this.isReconciliationBackfillAuditAvailable();
if (!available) return { rows: [], totalCount: 0 };
const safeLimit = Math.max(1, Math.min(500, Math.floor(Number(query.limit || 100))));
const safeOffset = Math.max(0, Math.floor(Number(query.offset || 0)));
try {
let builder = this.client
.from('reconciliation_backfill_audit')
.select(
'id,batch_id,profile_id,symbol,trade_id,exchange_order_id,exchange_client_order_id,backfill_order_id,filled_qty,filled_price,filled_at,dry_run,decision,reason,metadata,applied_at,reverted_at,created_at',
{ count: 'exact' }
)
.order('created_at', { ascending: false })
.range(safeOffset, safeOffset + safeLimit - 1);
const profileId = String(query.profileId || '').trim();
const symbol = String(query.symbol || '').trim();
const batchId = String(query.batchId || '').trim();
const fromIso = String(query.fromIso || '').trim();
const toIso = String(query.toIso || '').trim();
const decisions = Array.isArray(query.decisions)
? query.decisions.map((value) => String(value || '').trim()).filter(Boolean)
: [];
if (profileId) {
builder = builder.eq('profile_id', profileId);
}
if (symbol) {
builder = builder.eq('symbol', symbol);
}
if (batchId) {
builder = builder.eq('batch_id', batchId);
}
if (decisions.length > 0) {
builder = builder.in('decision', decisions);
}
if (fromIso) {
builder = builder.gte('created_at', fromIso);
}
if (toIso) {
builder = builder.lte('created_at', toIso);
}
const { data, error, count } = await builder;
if (error) {
logger.error(`[Supabase] Backfill audit row query failed: ${error.message}`);
return { rows: [], totalCount: 0 };
}
return {
rows: ((data || []) as ReconciliationBackfillAuditRow[]),
totalCount: Number(count || 0)
};
} catch (err: any) {
logger.error(`[Supabase] Unexpected backfill audit row query error: ${err.message}`);
return { rows: [], totalCount: 0 };
}
}
async getReconciliationBackfillBatchSummaries(query: {
profileId?: string;
symbol?: string;
fromIso?: string;
toIso?: string;
limit?: number;
}): Promise<ReconciliationBackfillBatchSummary[]> {
if (!this.client) return [];
const available = await this.isReconciliationBackfillAuditAvailable();
if (!available) return [];
const safeBatchLimit = Math.max(1, Math.min(100, Math.floor(Number(query.limit || 20))));
const scanLimit = Math.max(500, safeBatchLimit * 200);
try {
let builder = this.client
.from('reconciliation_backfill_audit')
.select('batch_id,profile_id,symbol,decision,dry_run,created_at,applied_at,reverted_at')
.order('created_at', { ascending: false })
.limit(scanLimit);
const profileId = String(query.profileId || '').trim();
const symbol = String(query.symbol || '').trim();
const fromIso = String(query.fromIso || '').trim();
const toIso = String(query.toIso || '').trim();
if (profileId) {
builder = builder.eq('profile_id', profileId);
}
if (symbol) {
builder = builder.eq('symbol', symbol);
}
if (fromIso) {
builder = builder.gte('created_at', fromIso);
}
if (toIso) {
builder = builder.lte('created_at', toIso);
}
const { data, error } = await builder;
if (error) {
logger.error(`[Supabase] Backfill batch summary query failed: ${error.message}`);
return [];
}
const rows = (data || []) as Array<{
batch_id?: string | null;
profile_id?: string | null;
symbol?: string | null;
decision?: string | null;
dry_run?: boolean | null;
created_at?: string | null;
applied_at?: string | null;
reverted_at?: string | null;
}>;
const summaries = new Map<string, ReconciliationBackfillBatchSummary>();
for (const row of rows) {
const batchId = String(row.batch_id || '').trim();
if (!batchId) continue;
const createdAt = String(row.created_at || '').trim();
if (!createdAt) continue;
let summary = summaries.get(batchId);
if (!summary) {
summary = {
batchId,
firstSeenAt: createdAt,
lastSeenAt: createdAt,
profileIds: [],
symbols: [],
totalRows: 0,
byDecision: {},
dryRunRows: 0,
appliedRows: 0,
revertedRows: 0
};
summaries.set(batchId, summary);
}
summary.totalRows += 1;
const decision = String(row.decision || '').trim() || 'UNKNOWN';
summary.byDecision[decision] = (summary.byDecision[decision] || 0) + 1;
if (row.dry_run) summary.dryRunRows += 1;
if (String(row.applied_at || '').trim()) summary.appliedRows += 1;
if (String(row.reverted_at || '').trim()) summary.revertedRows += 1;
const profile = String(row.profile_id || '').trim();
const symbolValue = String(row.symbol || '').trim();
if (profile && !summary.profileIds.includes(profile)) summary.profileIds.push(profile);
if (symbolValue && !summary.symbols.includes(symbolValue)) summary.symbols.push(symbolValue);
if (createdAt < summary.firstSeenAt) summary.firstSeenAt = createdAt;
if (createdAt > summary.lastSeenAt) summary.lastSeenAt = createdAt;
}
return Array.from(summaries.values())
.sort((a, b) => Date.parse(b.lastSeenAt) - Date.parse(a.lastSeenAt))
.slice(0, safeBatchLimit);
} catch (err: any) {
logger.error(`[Supabase] Unexpected backfill batch summary error: ${err.message}`);
return [];
}
}
/**
* Reverts a reconciliation backfill batch in a non-destructive way.
* Safety model:
* - never delete rows
* - status-only rollback on synthetic BFILL-* orders
* - keep immutable audit history with decision=reverted marker
*/
async revertBackfillBatch(batchId: string): Promise<{ reverted: number; errors: string[] }> {
const errors: string[] = [];
if (!this.client || !batchId) {
errors.push('Missing client or batchId');
return { reverted: 0, errors };
}
try {
// 1. Read applied rows for this batch.
const { data: auditRows, error: auditError } = await this.client
.from('reconciliation_backfill_audit')
.select('backfill_order_id')
.eq('batch_id', batchId)
.eq('decision', 'APPLIED');
if (auditError) {
errors.push(`Revert fetch audit mapping failed: ${auditError.message}`);
return { reverted: 0, errors };
}
const orderIds = Array.from(
new Set(
(auditRows || [])
.map((row: any) => String(row.backfill_order_id || '').trim())
.filter((id) => id.length > 0 && id.startsWith('BFILL-'))
)
);
if (orderIds.length > 0) {
// 2. Status-only rollback for synthetic backfill orders.
const { error: updateError } = await this.client
.from('orders')
.update({ status: 'canceled' })
.in('order_id', orderIds)
.like('order_id', 'BFILL-%');
if (updateError) {
errors.push(`Order status rollback failed: ${updateError.message}`);
return { reverted: 0, errors };
}
}
// 3. Mark audit rows as REVERTED (append-only audit).
const { error: markError } = await this.client
.from('reconciliation_backfill_audit')
.update({
decision: 'REVERTED',
reason: 'operator_revert_status_only',
reverted_at: new Date().toISOString()
})
.eq('batch_id', batchId)
.eq('decision', 'APPLIED');
if (markError) {
errors.push(`Audit mark REVERTED failed: ${markError.message}`);
}
return { reverted: orderIds.length, errors };
} catch (err: any) {
logger.error(`[Supabase] Unexpected backfill batch revert error: ${err.message}`);
errors.push(`Unexpected error: ${err.message}`);
return { reverted: 0, errors };
}
}
async hasActiveOrderForTradeId(tradeId: string, profileId?: string): Promise<boolean> {
if (!this.client) return false;
const normalizedTradeId = String(tradeId || '').trim();
if (!normalizedTradeId) return false;
try {
let query = this.client
.from('orders')
.select('id')
.eq('trade_id', normalizedTradeId)
.in('status', ['pending_new', 'accepted', 'new', 'partially_filled'])
.limit(1);
if (this.isUuid(profileId)) {
query = query.eq('profile_id', profileId);
}
const { data, error } = await query;
if (error) {
logger.error(`[Supabase] Error checking active trade ${normalizedTradeId}: ${error.message}`);
return false;
}
return (data || []).length > 0;
} catch (err: any) {
logger.error(`[Supabase] Unexpected active trade check error for ${normalizedTradeId}: ${err.message}`);
return false;
}
}
async getRecentOrdersForProfile(profileId: string, limit: number = 50): Promise<any[]> {
if (!this.client) return [];
try {
const safeLimit = Math.max(1, Math.min(500, Math.floor(limit)));
const { data, error } = await this.client
.from('orders')
.select('id,order_id,profile_id,symbol,status,action,trade_id,created_at')
.eq('profile_id', profileId)
.order('created_at', { ascending: false })
.limit(safeLimit);
if (error) {
logger.error(`[Supabase] Error fetching recent orders for profile ${profileId}: ${error.message}`);
return [];
}
return data || [];
} catch (err: any) {
logger.error(`[Supabase] Unexpected recent-order fetch error for profile ${profileId}: ${err.message}`);
return [];
}
}
async getKnownTradeIdsForProfile(profileId: string, limit: number = 2000): Promise<string[]> {
if (!this.client || !profileId) return [];
const safeLimit = Math.max(1, Math.min(10000, Math.floor(limit)));
const pageSize = Math.min(1000, safeLimit);
const tradeIds = new Set<string>();
let offset = 0;
try {
while (tradeIds.size < safeLimit) {
const { data, error } = await this.client
.from('orders')
.select('trade_id')
.eq('profile_id', profileId)
.not('trade_id', 'is', null)
.order('created_at', { ascending: false })
.range(offset, offset + pageSize - 1);
if (error) {
logger.error(`[Supabase] Error fetching known trade ids for profile ${profileId}: ${error.message}`);
return Array.from(tradeIds);
}
const chunk = (data || []) as Array<{ trade_id?: string | null }>;
if (chunk.length === 0) break;
for (const row of chunk) {
const tradeId = String(row.trade_id || '').trim();
if (!tradeId) continue;
tradeIds.add(tradeId);
if (tradeIds.size >= safeLimit) break;
}
if (chunk.length < pageSize) break;
offset += pageSize;
}
return Array.from(tradeIds);
} catch (err: any) {
logger.error(`[Supabase] Unexpected known trade-id fetch error for profile ${profileId}: ${err.message}`);
return Array.from(tradeIds);
}
}
async repairMissingSubTagsForProfile(options: {
profileId: string;
lookbackHours: number;
maxRows: number;
dryRun: boolean;
}): Promise<ReconciliationSubTagRepairSummary> {
const summary: ReconciliationSubTagRepairSummary = {
attempted: true,
scannedRows: 0,
eligibleRows: 0,
updatedRows: 0,
skippedNoProfile: 0,
skippedNoTrade: 0,
skippedTagDisabled: 0,
skippedAlreadyTagged: 0,
dryRun: Boolean(options.dryRun)
};
if (!this.client) {
return {
...summary,
attempted: false
};
}
const profileId = String(options.profileId || '').trim();
if (!this.isUuid(profileId)) {
return {
...summary,
attempted: false
};
}
if (this.ordersSupportsSubTag === false) {
return {
...summary,
unsupported: true
};
}
const lookbackHours = Math.max(1, Math.floor(Number(options.lookbackHours || 720)));
const maxRows = Math.max(1, Math.min(5000, Math.floor(Number(options.maxRows || 500))));
const sinceIso = new Date(Date.now() - lookbackHours * 60 * 60 * 1000).toISOString();
type RepairCandidateRow = {
id?: string | null;
profile_id?: string | null;
trade_id?: string | null;
action?: string | null;
side?: string | null;
sub_tag?: string | null;
source?: string | null;
};
let rows: RepairCandidateRow[] = [];
try {
const { data, error } = await this.client
.from('orders')
.select('id,profile_id,trade_id,action,side,sub_tag,source')
.eq('profile_id', profileId)
.is('sub_tag', null)
.or('source.is.null,source.neq.MANUAL')
.gte('created_at', sinceIso)
.order('created_at', { ascending: false })
.limit(maxRows);
if (error) {
if (this.isMissingColumnError(error, 'orders', 'sub_tag')) {
this.ordersSupportsSubTag = false;
return {
...summary,
unsupported: true
};
}
logger.error(`[Supabase] Missing sub_tag repair query failed for profile ${profileId}: ${error.message}`);
return summary;
}
rows = (data || []) as RepairCandidateRow[];
} catch (err: any) {
logger.error(`[Supabase] Unexpected missing sub_tag repair query error for profile ${profileId}: ${err.message}`);
return summary;
}
summary.scannedRows = rows.length;
if (rows.length === 0) return summary;
for (const row of rows) {
const rowId = String(row.id || '').trim();
if (!rowId) continue;
const rowProfileId = String(row.profile_id || '').trim();
if (!rowProfileId) {
summary.skippedNoProfile += 1;
continue;
}
const rowTradeId = String(row.trade_id || '').trim();
if (!rowTradeId) {
summary.skippedNoTrade += 1;
continue;
}
const existingTag = String(row.sub_tag || '').trim();
if (existingTag) {
summary.skippedAlreadyTagged += 1;
continue;
}
const action = normalizeOrderAction(row.action || undefined)
|| (normalizeTradeSide(String(row.side || 'BUY')) === 'SELL' ? 'EXIT' : 'ENTRY');
const derivedSubTag = this.resolvePersistedOrderSubTag({
profile_id: rowProfileId,
trade_id: rowTradeId,
action
});
if (!derivedSubTag) {
summary.skippedTagDisabled += 1;
continue;
}
summary.eligibleRows += 1;
if (summary.dryRun) continue;
try {
const { error } = await this.client
.from('orders')
.update({
sub_tag: derivedSubTag,
updated_at: new Date().toISOString()
})
.eq('id', rowId)
.is('sub_tag', null);
if (error) {
if (this.isMissingColumnError(error, 'orders', 'sub_tag')) {
this.ordersSupportsSubTag = false;
return {
...summary,
unsupported: true
};
}
logger.error(`[Supabase] Missing sub_tag repair update failed for profile ${profileId} row ${rowId}: ${error.message}`);
continue;
}
this.ordersSupportsSubTag = true;
summary.updatedRows += 1;
} catch (err: any) {
logger.error(`[Supabase] Unexpected missing sub_tag repair update error for profile ${profileId} row ${rowId}: ${err.message}`);
}
}
return summary;
}
/**
* Retrieves the latest FILLED ENTRY order to correctly link a trade chain.
* Searches by Profile ID (if provided) or User ID.
*/
async getLatestFilledEntry(userId: string, symbol: string, profileId?: string) {
if (!this.client) return null;
try {
let query = this.client
.from('orders')
.select('*')
.eq('symbol', symbol)
.eq('action', 'ENTRY')
.in('status', ['filled', 'partially_filled']);
const normalizedProfileId = String(profileId || '').trim();
if (this.isUuid(normalizedProfileId)) {
query = query.eq('profile_id', normalizedProfileId);
} else {
query = query.eq('user_id', userId);
}
const { data, error } = await query
.order('timestamp', { ascending: false }) // Use timestamp if available, fallback to created_at
.limit(1)
.maybeSingle();
if (error) {
logger.error(`[Supabase] Error fetching latest filled entry for ${symbol}: ${error.message}`);
return null;
}
return data;
} catch (err: any) {
logger.error(`Supabase unexpected filled entry fetch error: ${err.message}`);
return null;
}
}
/**
* Broad search for any recent relevant order to recover context
*/
async getLatestEntryOrder(profileId: string | undefined, symbol: string, userId?: string) {
if (!this.client) return null;
try {
let query = this.client
.from('orders')
.select('*')
.eq('symbol', symbol)
.eq('action', 'ENTRY')
.order('created_at', { ascending: false })
.limit(1);
const normalizedProfileId = String(profileId || '').trim();
if (this.isUuid(normalizedProfileId)) {
query = query.eq('profile_id', normalizedProfileId);
} else {
const normalizedUserId = String(userId || '').trim();
if (normalizedUserId) {
query = query.eq('user_id', normalizedUserId);
}
}
const { data, error } = await query.maybeSingle();
if (error) {
logger.error(`[Supabase] Error fetching latest entry order for ${symbol}: ${error.message}`);
}
return data;
} catch (err: any) {
logger.error(`Supabase unexpected get entry order error: ${err.message}`);
return null;
}
}
/**
* Finds the most recent filled ENTRY order with non-zero risk levels.
* Used as a fallback when virtual position reconstruction lacks SL/TP.
*/
async getLatestEntryRiskOrder(profileId: string, symbol: string, side?: 'BUY' | 'SELL') {
if (!this.client) return null;
try {
let query = this.client
.from('orders')
.select('*')
.eq('profile_id', profileId)
.eq('symbol', symbol)
.eq('action', 'ENTRY')
.in('status', ['filled', 'partially_filled'])
.or('stop_loss.gt.0,take_profit.gt.0')
.order('created_at', { ascending: false })
.limit(1);
if (side) {
query = query.eq('side', side);
}
const { data, error } = await query.maybeSingle();
if (error) {
logger.error(`[Supabase] Error fetching latest entry risk order for ${profileId}/${symbol}: ${error.message}`);
return null;
}
return data;
} catch (err: any) {
logger.error(`[Supabase] Unexpected latest entry risk order fetch error: ${err.message}`);
return null;
}
}
/**
* Returns true if lifecycle has at least one filled ENTRY order.
* This prevents synthetic/ambiguous trade_ids from generating fake PnL logs.
*/
async hasLifecycleEntryOrder(tradeId: string, profileId?: string, symbol?: string): Promise<boolean> {
if (!this.client) return false;
const normalizedTradeId = String(tradeId || '').trim();
if (!normalizedTradeId) return false;
try {
let query = this.client
.from('orders')
.select('id')
.eq('trade_id', normalizedTradeId)
.eq('action', 'ENTRY')
.in('status', ['filled', 'partially_filled'])
.limit(1);
if (this.isUuid(profileId)) {
query = query.eq('profile_id', profileId);
}
if (symbol) {
query = query.eq('symbol', symbol);
}
const { data, error } = await query;
if (error) {
logger.error(`[Supabase] Error checking lifecycle entry for ${normalizedTradeId}: ${error.message}. Failing closed.`);
return false;
}
if ((data || []).length > 0) {
return true;
}
// Legacy fallback: older rows can have NULL action for BUY entries.
let legacyQuery = this.client
.from('orders')
.select('id')
.eq('trade_id', normalizedTradeId)
.is('action', null)
.in('side', ['BUY', 'buy'])
.in('status', ['filled', 'partially_filled'])
.limit(1);
if (this.isUuid(profileId)) {
legacyQuery = legacyQuery.eq('profile_id', profileId);
}
if (symbol) {
legacyQuery = legacyQuery.eq('symbol', symbol);
}
const { data: legacyData, error: legacyError } = await legacyQuery;
if (legacyError) {
logger.error(`[Supabase] Error checking legacy lifecycle entry for ${normalizedTradeId}: ${legacyError.message}. Failing closed.`);
return false;
}
return (legacyData || []).length > 0;
} catch (err: any) {
logger.error(`[Supabase] Unexpected lifecycle entry check error for ${normalizedTradeId}: ${err.message}. Failing closed.`);
return false;
}
}
/**
* Strict attribution gate:
* returns true only when the ENTRY chain has at least one filled row carrying
* a Bytelyst sub-tag that maps back to the provided profile.
*/
async hasLifecycleEntryOrderWithProfileSubTag(tradeId: string, profileId: string, symbol?: string): Promise<boolean> {
if (!this.client) return false;
const normalizedTradeId = String(tradeId || '').trim();
const normalizedProfileId = String(profileId || '').trim();
if (!normalizedTradeId || !this.isUuid(normalizedProfileId)) return false;
try {
let query = this.client
.from('orders')
.select('sub_tag')
.eq('trade_id', normalizedTradeId)
.eq('profile_id', normalizedProfileId)
.eq('action', 'ENTRY')
.in('status', ['filled', 'partially_filled'])
.limit(250);
if (symbol) {
query = query.eq('symbol', symbol);
}
const { data, error } = await query;
if (error) {
logger.error(`[Supabase] Error checking lifecycle entry sub-tag attribution for ${normalizedTradeId}: ${error.message}`);
return false;
}
for (const row of (data || []) as Array<{ sub_tag?: string | null }>) {
const subTag = String(row?.sub_tag || '').trim();
if (!subTag) continue;
if (!isBytelystSubTag(subTag)) continue;
if (subTagBelongsToProfile(subTag, normalizedProfileId)) {
return true;
}
}
return false;
} catch (err: any) {
logger.error(`[Supabase] Unexpected lifecycle entry sub-tag attribution error for ${normalizedTradeId}: ${err.message}`);
return false;
}
}
/**
* Returns true when a trade lifecycle already has a finalized (non-partial) history row.
* Used to enforce idempotent close logging.
*/
async hasFinalizedTradeHistory(tradeId: string, profileId?: string, symbol?: string): Promise<boolean> {
if (!this.client) return false;
const normalizedTradeId = String(tradeId || '').trim();
if (!normalizedTradeId) return false;
try {
let query = this.client
.from('trade_history')
.select('id,reason')
.eq('trade_id', normalizedTradeId)
.limit(50);
if (profileId) {
query = query.eq('profile_id', profileId);
}
if (symbol) {
query = query.eq('symbol', symbol);
}
const { data, error } = await query;
if (error) {
logger.error(`[Supabase] Error checking finalized history for ${normalizedTradeId}: ${error.message}`);
return false;
}
return (data || []).some((row: any) => {
const reason = String(row?.reason || '').toLowerCase();
return !reason.includes('partial exit');
});
} catch (err: any) {
logger.error(`[Supabase] Unexpected finalized history check error for ${normalizedTradeId}: ${err.message}`);
return false;
}
}
/**
* Determines whether a lifecycle identified by trade_id is already closed.
* Used to resolve stale EXIT orders that remain pending_new after the lifecycle
* has been finalized by another exchange event.
*/
async isTradeLifecycleClosed(tradeId: string, profileId?: string, symbol?: string): Promise<boolean> {
if (!this.client) return false;
const normalizedTradeId = String(tradeId || '').trim();
if (!normalizedTradeId) return false;
try {
let orderQuery = this.client
.from('orders')
.select('action, side, qty, status')
.eq('trade_id', normalizedTradeId)
.in('status', ['filled', 'partially_filled']);
if (this.isUuid(profileId)) {
orderQuery = orderQuery.eq('profile_id', profileId);
}
if (symbol) {
orderQuery = orderQuery.eq('symbol', symbol);
}
const { data: orderRows, error: orderError } = await orderQuery.limit(1000);
if (orderError) {
logger.error(`[Supabase] Error checking lifecycle closure for ${normalizedTradeId}: ${orderError.message}`);
return false;
}
let entryQty = 0;
let exitQty = 0;
for (const row of orderRows || []) {
const action = this.inferLifecycleAction(
(row as any)?.action || undefined,
(row as any)?.side || undefined
);
const qty = Number((row as any)?.qty || 0);
if (!Number.isFinite(qty) || qty <= 0) continue;
if (action === 'ENTRY') {
entryQty += qty;
} else if (action === 'EXIT') {
exitQty += qty;
}
}
if (entryQty > 0 && exitQty >= entryQty - 1e-8) {
return true;
}
let historyQuery = this.client
.from('trade_history')
.select('id,reason,size')
.eq('trade_id', normalizedTradeId)
.limit(200);
if (this.isUuid(profileId)) {
historyQuery = historyQuery.eq('profile_id', profileId);
}
if (symbol) {
historyQuery = historyQuery.eq('symbol', symbol);
}
const { data: historyRows, error: historyError } = await historyQuery;
if (historyError) {
logger.error(`[Supabase] Error checking history closure for ${normalizedTradeId}: ${historyError.message}`);
return false;
}
const rows = historyRows || [];
if (!rows.length) return false;
let finalizedRows = 0;
let partialExitQty = 0;
for (const row of rows as any[]) {
const reason = String(row?.reason || '').toLowerCase();
const size = Number(row?.size || 0);
if (reason.includes('partial exit')) {
if (Number.isFinite(size) && size > 0) {
partialExitQty += size;
}
continue;
}
finalizedRows += 1;
}
if (finalizedRows > 0) {
return true;
}
if (entryQty > 0 && partialExitQty >= entryQty - 1e-8) {
return true;
}
return false;
} catch (err: any) {
logger.error(`[Supabase] Unexpected lifecycle closure check error for ${normalizedTradeId}: ${err.message}`);
return false;
}
}
/**
* Reconstructs a profile-scoped virtual open position from filled order lifecycle.
* This is the source of truth for dedicated profiles sharing a single exchange account.
*/
async getVirtualOpenPosition(profileId: string, symbol: string): Promise<VirtualOpenPosition | null> {
if (!this.client) return null;
type LedgerOrderRow = {
trade_id?: string | null;
action?: string | null;
side?: string | null;
qty?: number | string | null;
price?: number | string | null;
user_id?: string | null;
stop_loss?: number | string | null;
take_profit?: number | string | null;
timestamp?: number | string | null;
created_at?: string | null;
};
type TradeLedger = {
tradeId: string;
side: 'BUY' | 'SELL';
entryQty: number;
entryNotional: number;
entryLastPrice: number;
exitQty: number;
userId?: string;
stopLoss: number;
takeProfit: number;
lastTs: number;
};
type SideAggregate = {
side: 'BUY' | 'SELL';
qty: number;
notional: number;
userId?: string;
stopLoss: number;
takeProfit: number;
tradeIds: string[];
primaryTradeId: string;
primaryTs: number;
};
const toNumber = (value: any): number => {
const parsed = Number(value);
return Number.isFinite(parsed) ? parsed : 0;
};
const toTimestamp = (row: LedgerOrderRow, fallback: number): number => {
const ts = Number(row.timestamp);
if (Number.isFinite(ts) && ts > 0) return ts;
const createdAtTs = Date.parse(String(row.created_at || ''));
if (Number.isFinite(createdAtTs) && createdAtTs > 0) return createdAtTs;
return fallback;
};
try {
const symbolCandidates = this.buildLifecycleSymbolCandidates(symbol);
if (!symbolCandidates.length) return null;
const { data, error } = await this.client
.from('orders')
.select('trade_id, action, side, qty, price, user_id, stop_loss, take_profit, timestamp, created_at')
.eq('profile_id', profileId)
.in('symbol', symbolCandidates)
.in('status', ['filled', 'partially_filled'])
.order('created_at', { ascending: true })
.limit(2000);
if (error) {
logger.error(`[Supabase] Error computing virtual position for ${profileId}/${symbol}: ${error.message}`);
return null;
}
const rows = (data || []) as LedgerOrderRow[];
if (!rows.length) return null;
const orderedRows = rows
.map((row, index) => ({
row,
index,
ts: toTimestamp(row, index)
}))
.sort((a, b) => (a.ts - b.ts) || (a.index - b.index));
const ledgerByTrade = new Map<string, TradeLedger>();
const entrySideByTrade = new Map<string, 'BUY' | 'SELL'>();
const openTradeQueueBySide: Record<'BUY' | 'SELL', string[]> = { BUY: [], SELL: [] };
const normalizeToken = (value: string): string =>
value.replace(/[^A-Za-z0-9]/g, '').slice(0, 24) || 'token';
const profileToken = normalizeToken(profileId);
const symbolToken = normalizeToken(symbol);
let syntheticCounter = 0;
let inferredLegacyRows = 0;
const buildSyntheticTradeId = (side: 'BUY' | 'SELL', ts: number): string => {
syntheticCounter += 1;
const tsToken = Number.isFinite(ts) && ts > 0 ? Math.trunc(ts) : syntheticCounter;
return `__legacy__-${profileToken}-${symbolToken}-${side}-${tsToken}-${String(syntheticCounter).padStart(4, '0')}`;
};
for (const { row, ts } of orderedRows) {
const qty = toNumber(row.qty);
if (qty <= 0) continue;
const rowSide = normalizeTradeSide(row.side || 'BUY');
const rawTradeId = String(row.trade_id || '').trim();
const oppositeSide: 'BUY' | 'SELL' = rowSide === 'BUY' ? 'SELL' : 'BUY';
const explicitAction = normalizeOrderAction(row.action || undefined);
let tradeId = rawTradeId;
let action = explicitAction;
if (!action && !tradeId) {
action = openTradeQueueBySide[oppositeSide].length > 0 ? 'EXIT' : 'ENTRY';
inferredLegacyRows += 1;
}
if (!tradeId) {
if (action === 'EXIT' && openTradeQueueBySide[oppositeSide].length > 0) {
tradeId = openTradeQueueBySide[oppositeSide][0];
} else {
tradeId = buildSyntheticTradeId(action === 'EXIT' ? oppositeSide : rowSide, ts);
}
}
if (!action) {
const knownEntrySide = entrySideByTrade.get(tradeId);
if (knownEntrySide) {
action = rowSide === knownEntrySide ? 'ENTRY' : 'EXIT';
} else {
action = this.inferLifecycleAction(row.action || undefined, row.side || undefined);
}
}
let tradeLedger = ledgerByTrade.get(tradeId);
if (!tradeLedger) {
tradeLedger = {
tradeId,
side: rowSide,
entryQty: 0,
entryNotional: 0,
entryLastPrice: 0,
exitQty: 0,
userId: row.user_id || undefined,
stopLoss: 0,
takeProfit: 0,
lastTs: ts
};
ledgerByTrade.set(tradeId, tradeLedger);
}
if (action === 'ENTRY') {
if (tradeLedger.entryQty === 0) {
tradeLedger.side = rowSide;
}
tradeLedger.entryQty += qty;
entrySideByTrade.set(tradeId, tradeLedger.side);
if (!openTradeQueueBySide[tradeLedger.side].includes(tradeId)) {
openTradeQueueBySide[tradeLedger.side].push(tradeId);
}
const price = toNumber(row.price);
if (price > 0) {
tradeLedger.entryNotional += price * qty;
tradeLedger.entryLastPrice = price;
}
const stopLoss = toNumber(row.stop_loss);
const takeProfit = toNumber(row.take_profit);
if (stopLoss > 0) tradeLedger.stopLoss = stopLoss;
if (takeProfit > 0) tradeLedger.takeProfit = takeProfit;
} else {
tradeLedger.exitQty += qty;
const queue = openTradeQueueBySide[oppositeSide];
const tradeIdx = queue.findIndex((queuedTradeId) => queuedTradeId === tradeId);
if (tradeIdx >= 0) {
queue.splice(tradeIdx, 1);
} else if (queue.length > 0) {
queue.shift();
}
}
if (row.user_id) tradeLedger.userId = row.user_id;
tradeLedger.lastTs = Math.max(tradeLedger.lastTs, ts);
}
if (inferredLegacyRows > 0) {
logger.warn(`[Supabase] Inferred lifecycle action for ${inferredLegacyRows} legacy rows in ${profileId}/${symbol} (missing action/trade_id).`);
}
const aggregateBySide = new Map<'BUY' | 'SELL', SideAggregate>();
for (const tradeLedger of ledgerByTrade.values()) {
const remainingQty = tradeLedger.entryQty - tradeLedger.exitQty;
if (remainingQty <= 1e-8) continue;
const weightedEntryPrice = tradeLedger.entryQty > 0 && tradeLedger.entryNotional > 0
? (tradeLedger.entryNotional / tradeLedger.entryQty)
: tradeLedger.entryLastPrice;
if (!(weightedEntryPrice > 0)) continue;
const normalizedTradeId = tradeLedger.tradeId.startsWith('__legacy__-')
? `TRD-LEGACY-${tradeLedger.tradeId.slice('__legacy__-'.length)}`
: tradeLedger.tradeId;
let aggregate = aggregateBySide.get(tradeLedger.side);
if (!aggregate) {
aggregate = {
side: tradeLedger.side,
qty: 0,
notional: 0,
userId: tradeLedger.userId,
stopLoss: tradeLedger.stopLoss,
takeProfit: tradeLedger.takeProfit,
tradeIds: [],
primaryTradeId: normalizedTradeId,
primaryTs: tradeLedger.lastTs
};
aggregateBySide.set(tradeLedger.side, aggregate);
}
aggregate.qty += remainingQty;
aggregate.notional += remainingQty * weightedEntryPrice;
if (!aggregate.tradeIds.includes(normalizedTradeId)) {
aggregate.tradeIds.push(normalizedTradeId);
}
const currentPrimaryIsLegacy = aggregate.primaryTradeId.startsWith('TRD-LEGACY-');
const candidateIsLegacy = normalizedTradeId.startsWith('TRD-LEGACY-');
const shouldReplacePrimary =
(!candidateIsLegacy && currentPrimaryIsLegacy)
|| (candidateIsLegacy === currentPrimaryIsLegacy && tradeLedger.lastTs >= aggregate.primaryTs);
if (shouldReplacePrimary) {
aggregate.primaryTs = tradeLedger.lastTs;
aggregate.primaryTradeId = normalizedTradeId;
aggregate.userId = tradeLedger.userId || aggregate.userId;
if (tradeLedger.stopLoss > 0) aggregate.stopLoss = tradeLedger.stopLoss;
if (tradeLedger.takeProfit > 0) aggregate.takeProfit = tradeLedger.takeProfit;
}
}
if (!aggregateBySide.size) return null;
let dominantSidePosition: SideAggregate | null = null;
for (const candidate of aggregateBySide.values()) {
if (!dominantSidePosition || candidate.qty > dominantSidePosition.qty) {
dominantSidePosition = candidate;
}
}
if (!dominantSidePosition || dominantSidePosition.qty <= 1e-8) return null;
if (aggregateBySide.size > 1) {
logger.warn(`[Supabase] Mixed-side virtual position detected for ${profileId}/${symbol}. Using dominant side ${dominantSidePosition.side}.`);
}
let fallbackStopLoss = Number(dominantSidePosition.stopLoss || 0);
let fallbackTakeProfit = Number(dominantSidePosition.takeProfit || 0);
if (fallbackStopLoss <= 0 || fallbackTakeProfit <= 0) {
for (let i = orderedRows.length - 1; i >= 0; i--) {
const row = orderedRows[i].row;
const rowSide = normalizeTradeSide(row.side || 'BUY');
if (rowSide !== dominantSidePosition.side) continue;
const explicitAction = normalizeOrderAction(row.action || undefined);
const hasExplicitTradeId = String(row.trade_id || '').trim().length > 0;
if (!explicitAction && !hasExplicitTradeId) continue;
const inferredAction = explicitAction || (rowSide === 'BUY' ? 'ENTRY' : 'EXIT');
if (inferredAction !== 'ENTRY') continue;
const sl = toNumber(row.stop_loss);
const tp = toNumber(row.take_profit);
if (fallbackStopLoss <= 0 && sl > 0) fallbackStopLoss = sl;
if (fallbackTakeProfit <= 0 && tp > 0) fallbackTakeProfit = tp;
if (fallbackStopLoss > 0 && fallbackTakeProfit > 0) break;
}
}
const entryPrice = dominantSidePosition.notional / dominantSidePosition.qty;
return {
profileId,
symbol,
side: dominantSidePosition.side,
qty: Number(dominantSidePosition.qty.toFixed(8)),
entryPrice: Number(entryPrice.toFixed(8)),
stopLoss: Number(fallbackStopLoss || 0),
takeProfit: Number(fallbackTakeProfit || 0),
userId: dominantSidePosition.userId,
tradeId: dominantSidePosition.primaryTradeId,
tradeIds: dominantSidePosition.tradeIds
};
} catch (err: any) {
logger.error(`[Supabase] Unexpected virtual position reconstruction error for ${profileId}/${symbol}: ${err.message}`);
return null;
}
}
async getVirtualOpenPositionForTrade(profileId: string, symbol: string, tradeId: string): Promise<VirtualOpenPosition | null> {
if (!this.client) return null;
const normalizedTradeId = String(tradeId || '').trim();
if (!normalizedTradeId) return null;
type LedgerOrderRow = {
action?: string | null;
side?: string | null;
qty?: number | string | null;
price?: number | string | null;
user_id?: string | null;
stop_loss?: number | string | null;
take_profit?: number | string | null;
timestamp?: number | string | null;
created_at?: string | null;
};
const toNumber = (value: any): number => {
const parsed = Number(value);
return Number.isFinite(parsed) ? parsed : 0;
};
const toTimestamp = (row: LedgerOrderRow, fallback: number): number => {
const ts = Number(row.timestamp);
if (Number.isFinite(ts) && ts > 0) return ts;
const createdAtTs = Date.parse(String(row.created_at || ''));
if (Number.isFinite(createdAtTs) && createdAtTs > 0) return createdAtTs;
return fallback;
};
try {
const symbolCandidates = this.buildLifecycleSymbolCandidates(symbol);
if (!symbolCandidates.length) return null;
const { data, error } = await this.client
.from('orders')
.select('action, side, qty, price, user_id, stop_loss, take_profit, timestamp, created_at')
.eq('profile_id', profileId)
.in('symbol', symbolCandidates)
.eq('trade_id', normalizedTradeId)
.in('status', ['filled', 'partially_filled'])
.order('created_at', { ascending: true })
.limit(1000);
if (error) {
logger.error(`[Supabase] Error computing virtual trade slice for ${profileId}/${symbol}/${normalizedTradeId}: ${error.message}`);
return null;
}
const rows = (data || []) as LedgerOrderRow[];
if (!rows.length) return null;
let entrySide: 'BUY' | 'SELL' | null = null;
let entryQty = 0;
let entryNotional = 0;
let entryLastPrice = 0;
let exitQty = 0;
let stopLoss = 0;
let takeProfit = 0;
let userId: string | undefined;
const orderedRows = rows
.map((row, index) => ({ row, ts: toTimestamp(row, index), index }))
.sort((a, b) => (a.ts - b.ts) || (a.index - b.index))
.map((wrapped) => wrapped.row);
for (const row of orderedRows) {
const qty = toNumber(row.qty);
if (qty <= 0) continue;
const side = normalizeTradeSide(row.side || 'BUY');
const explicitAction = normalizeOrderAction(row.action || undefined);
let action = explicitAction;
if (!action) {
if (entrySide) {
action = side === entrySide ? 'ENTRY' : 'EXIT';
} else {
action = this.inferLifecycleAction(row.action || undefined, row.side || undefined);
}
}
if (action === 'ENTRY') {
if (!entrySide) {
entrySide = side;
}
entryQty += qty;
const price = toNumber(row.price);
if (price > 0) {
entryNotional += price * qty;
entryLastPrice = price;
}
const sl = toNumber(row.stop_loss);
const tp = toNumber(row.take_profit);
if (sl > 0) stopLoss = sl;
if (tp > 0) takeProfit = tp;
} else {
exitQty += qty;
}
if (row.user_id) userId = row.user_id;
}
if (!entrySide || entryQty <= 0) return null;
const remainingQty = entryQty - exitQty;
if (remainingQty <= 1e-8) return null;
const entryPrice = entryNotional > 0 ? (entryNotional / entryQty) : entryLastPrice;
if (!(entryPrice > 0)) return null;
if (stopLoss <= 0 || takeProfit <= 0) {
for (let i = orderedRows.length - 1; i >= 0; i--) {
const row = orderedRows[i];
const side = normalizeTradeSide(row.side || 'BUY');
const action = normalizeOrderAction(row.action || undefined) || (side === entrySide ? 'ENTRY' : 'EXIT');
if (action !== 'ENTRY') continue;
const sl = toNumber(row.stop_loss);
const tp = toNumber(row.take_profit);
if (stopLoss <= 0 && sl > 0) stopLoss = sl;
if (takeProfit <= 0 && tp > 0) takeProfit = tp;
if (stopLoss > 0 && takeProfit > 0) break;
}
}
return {
profileId,
symbol,
side: entrySide,
qty: Number(remainingQty.toFixed(8)),
entryPrice: Number(entryPrice.toFixed(8)),
stopLoss: Number(stopLoss || 0),
takeProfit: Number(takeProfit || 0),
userId,
tradeId: normalizedTradeId,
tradeIds: [normalizedTradeId]
};
} catch (err: any) {
logger.error(`[Supabase] Unexpected virtual trade slice error for ${profileId}/${symbol}/${normalizedTradeId}: ${err.message}`);
return null;
}
}
async verifyAccessToken(token: string): Promise<{ userId: string | null; error?: string }> {
if (!this.client) {
return { userId: null, error: 'Supabase client not configured' };
}
try {
const { data, error } = await this.client.auth.getUser(token);
if (error || !data.user) {
return { userId: null, error: error?.message || 'Invalid token' };
}
const claims = this.decodeJwtPayload(token);
if (!claims) {
return { userId: null, error: 'Invalid token claims' };
}
const requiredIssuer = config.SUPABASE_JWT_ISSUER;
if (requiredIssuer) {
const tokenIssuer = String(claims.iss || '');
if (tokenIssuer !== requiredIssuer) {
return { userId: null, error: 'Invalid token issuer' };
}
}
const requiredAudience = config.SUPABASE_JWT_AUDIENCE;
if (requiredAudience) {
const tokenAudience = claims.aud;
const isAudienceValid = Array.isArray(tokenAudience)
? tokenAudience.includes(requiredAudience)
: String(tokenAudience || '') === requiredAudience;
if (!isAudienceValid) {
return { userId: null, error: 'Invalid token audience' };
}
}
return { userId: data.user.id };
} catch (err: any) {
return { userId: null, error: err.message };
}
}
async isAdmin(userId: string): Promise<boolean> {
if (!this.client || !userId) return false;
try {
const { data, error } = await this.client
.from('users')
.select('role')
.eq('user_id', userId)
.maybeSingle();
if (error || !data) return false;
return String(data.role).toLowerCase() === 'admin';
} catch {
return false;
}
}
async getProfileOwner(profileId: string): Promise<string | null> {
if (!this.client) return null;
try {
const { data, error } = await this.client
.from('trade_profiles')
.select('user_id')
.eq('id', profileId)
.maybeSingle();
if (error || !data) return null;
return data.user_id || null;
} catch {
return null;
}
}
/**
* Returns profile allocation metadata used by capital guards.
*/
async getProfileCapital(profileId: string): Promise<{
allocatedCapital: number;
isActive: boolean;
userId?: string;
} | null> {
if (!this.client) return null;
try {
const { data, error } = await this.client
.from('trade_profiles')
.select('allocated_capital,is_active,user_id')
.eq('id', profileId)
.maybeSingle();
if (error || !data) return null;
const allocatedCapital = Number((data as any).allocated_capital || 0);
return {
allocatedCapital: Number.isFinite(allocatedCapital) && allocatedCapital > 0 ? allocatedCapital : 0,
isActive: Boolean((data as any).is_active),
userId: (data as any).user_id || undefined
};
} catch {
return null;
}
}
async getProfileForBacktest(profileId: string, userId: string): Promise<{
id: string;
user_id: string;
name: string;
symbols: string;
allocated_capital: number;
risk_per_trade_percent: number;
strategy_config: any;
} | null> {
if (!this.client) return null;
const normalizedProfileId = String(profileId || '').trim();
const normalizedUserId = String(userId || '').trim();
if (!this.isUuid(normalizedProfileId) || !this.isUuid(normalizedUserId)) return null;
try {
const { data, error } = await this.client
.from('trade_profiles')
.select('id,user_id,name,symbols,allocated_capital,risk_per_trade_percent,strategy_config')
.eq('id', normalizedProfileId)
.eq('user_id', normalizedUserId)
.maybeSingle();
if (error || !data) return null;
return {
id: String((data as any).id),
user_id: String((data as any).user_id),
name: String((data as any).name || ''),
symbols: String((data as any).symbols || ''),
allocated_capital: Number((data as any).allocated_capital || 0),
risk_per_trade_percent: Number((data as any).risk_per_trade_percent || 0),
strategy_config: this.normalizeStrategyConfig((data as any).strategy_config)
};
} catch {
return null;
}
}
async getSnapshotOwnerId(): Promise<string | null> {
if (this.snapshotOwnerId) return this.snapshotOwnerId;
const configured = String(config.SNAPSHOT_USER_ID || '').trim().toLowerCase();
if (this.isUuid(configured)) {
this.snapshotOwnerId = configured;
return configured;
}
if (!this.client) return null;
try {
const { data, error } = await this.client
.from('users')
.select('user_id')
.order('created_at', { ascending: true })
.limit(1)
.maybeSingle();
if (error || !data) {
return null;
}
const resolved = String(data.user_id || '').trim();
if (resolved && this.isUuid(resolved)) {
this.snapshotOwnerId = resolved;
return resolved;
}
return null;
} catch (err: any) {
logger.error(`[Supabase] Snapshot owner lookup failed: ${err.message}`);
return null;
}
}
async saveBotStateSnapshot(userId: string, state: unknown): Promise<void> {
if (!this.client || !userId) return;
try {
const { error } = await this.client
.from('bot_state_snapshots')
.insert([{ user_id: userId, state }]);
if (error) {
logger.warn(`[Supabase] Snapshot insert failed: ${error.message}`);
}
} catch (err: any) {
logger.error(`[Supabase] Unexpected snapshot insert error: ${err.message}`);
}
}
async loadLatestBotStateSnapshot(userId: string): Promise<{ state: unknown } | null> {
if (!this.client || !userId) return null;
try {
const { data, error } = await this.client
.from('bot_state_snapshots')
.select('state')
.eq('user_id', userId)
.order('created_at', { ascending: false })
.limit(1)
.maybeSingle();
if (error || !data) {
return null;
}
return { state: (data as any).state };
} catch (err: any) {
logger.error(`[Supabase] Snapshot load failed: ${err.message}`);
return null;
}
}
}
export const supabaseService = new SupabaseService();