refactor: route backend order history access through repository
This commit is contained in:
parent
2b36fca143
commit
d1da7ec70c
@ -1,9 +1,9 @@
|
||||
import { supabaseService } from './SupabaseService.js';
|
||||
import { IExchangeConnector } from '../connectors/types.js';
|
||||
import logger from '../utils/logger.js';
|
||||
import { config } from '../config/index.js';
|
||||
import { SymbolMapper } from '../utils/symbolMapper.js';
|
||||
import { healthTracker } from './healthTracker.js';
|
||||
import * as runtimeOrderRepository from './runtimeOrderRepository.js';
|
||||
|
||||
const normalizeThrown = (value: unknown): Error => {
|
||||
if (value instanceof Error) return value;
|
||||
@ -131,12 +131,6 @@ export class OrderStatusSyncService {
|
||||
this.isRunning = true;
|
||||
|
||||
try {
|
||||
// Only sync if we have Supabase client
|
||||
if (!supabaseService) {
|
||||
logger.debug('[OrderSync] Supabase not configured, skipping sync');
|
||||
return;
|
||||
}
|
||||
|
||||
const staleOrders = await this.getStaleOrders();
|
||||
const recentClosedOrders = await this.getRecentClosedOrders();
|
||||
const candidateOrders = new Map<string, OrderSyncOrderRecord>();
|
||||
@ -199,7 +193,7 @@ export class OrderStatusSyncService {
|
||||
*/
|
||||
private async getStaleOrders(): Promise<OrderSyncOrderRecord[]> {
|
||||
try {
|
||||
const staleOrders = await supabaseService.getStaleOrders(config.STALE_ORDER_THRESHOLD_MINUTES, {
|
||||
const staleOrders = await runtimeOrderRepository.getStaleOrders(config.STALE_ORDER_THRESHOLD_MINUTES, {
|
||||
profileId: this.profileId,
|
||||
userId: this.scopeOptions.userId,
|
||||
includeOrphanUserOrders: this.scopeOptions.includeOrphanUserOrders,
|
||||
@ -216,7 +210,7 @@ export class OrderStatusSyncService {
|
||||
const profileId = String(this.profileId || '').trim();
|
||||
if (!profileId || !OrderStatusSyncService.UUID_PATTERN.test(profileId)) return [];
|
||||
try {
|
||||
const rows = await supabaseService.getRecentlyClosedOrdersForProfile(
|
||||
const rows = await runtimeOrderRepository.getRecentlyClosedOrdersForProfile(
|
||||
profileId,
|
||||
config.ORDER_SYNC_RECENT_CLOSED_LOOKBACK_MINUTES
|
||||
);
|
||||
@ -334,7 +328,7 @@ export class OrderStatusSyncService {
|
||||
// If lifecycle is already closed, mark stale order canceled so dashboard
|
||||
// does not keep ghost active rows. Applies to EXIT and legacy rows without action.
|
||||
if (order.trade_id) {
|
||||
const isLifecycleClosed = await supabaseService.isTradeLifecycleClosed(
|
||||
const isLifecycleClosed = await runtimeOrderRepository.isTradeLifecycleClosed(
|
||||
order.trade_id,
|
||||
order.profile_id,
|
||||
order.symbol
|
||||
@ -342,7 +336,7 @@ export class OrderStatusSyncService {
|
||||
|
||||
if (isLifecycleClosed) {
|
||||
logger.warn(`[OrderSync] Order ${orderId} (${order.symbol}) not found on exchange, but lifecycle ${order.trade_id} is closed. Marking as canceled.`);
|
||||
await supabaseService.updateOrderStatus?.(orderId, 'canceled');
|
||||
await runtimeOrderRepository.updateOrderStatus(orderId, 'canceled');
|
||||
this.missingExchangeDetections.delete(orderId);
|
||||
await this.emitStatusChange({
|
||||
orderId,
|
||||
@ -365,10 +359,10 @@ export class OrderStatusSyncService {
|
||||
&& (normalizedAction === 'EXIT' || (!normalizedAction && normalizedSide === 'SELL'));
|
||||
if (isExitLikeWithoutTradeId) {
|
||||
try {
|
||||
const virtualPosition = await supabaseService.getVirtualOpenPosition(order.profile_id!, order.symbol);
|
||||
const virtualPosition = await runtimeOrderRepository.getVirtualOpenPosition(order.profile_id!, order.symbol);
|
||||
if (!virtualPosition) {
|
||||
logger.warn(`[OrderSync] Legacy EXIT-like order ${orderId} (${order.symbol}) has no open virtual lifecycle. Marking as canceled.`);
|
||||
await supabaseService.updateOrderStatus?.(orderId, 'canceled');
|
||||
await runtimeOrderRepository.updateOrderStatus(orderId, 'canceled');
|
||||
this.missingExchangeDetections.delete(orderId);
|
||||
await this.emitStatusChange({
|
||||
orderId,
|
||||
@ -390,7 +384,7 @@ export class OrderStatusSyncService {
|
||||
// If order is older than 24 hours and not found, mark as expired to clean up UI
|
||||
if (ageHours > 24) {
|
||||
logger.error(`[OrderSync][QUARANTINE] Order ${orderId} not found and >24h old. Marking as UNKNOWN for manual review.`);
|
||||
await supabaseService.updateOrderStatus?.(orderId, 'unknown');
|
||||
await runtimeOrderRepository.updateOrderStatus(orderId, 'unknown');
|
||||
this.missingExchangeDetections.delete(orderId);
|
||||
await this.emitStatusChange({
|
||||
orderId,
|
||||
@ -440,7 +434,7 @@ export class OrderStatusSyncService {
|
||||
if (exchangeStatus && exchangeStatus !== order.status?.toLowerCase()) {
|
||||
logger.info(`[OrderSync] Updating order ${orderId} (${order.symbol}): ${order.status} → ${exchangeStatus}`);
|
||||
|
||||
await supabaseService.updateOrderStatus?.(
|
||||
await runtimeOrderRepository.updateOrderStatus(
|
||||
orderId,
|
||||
exchangeStatus,
|
||||
exchangeFilledAt,
|
||||
@ -463,7 +457,7 @@ export class OrderStatusSyncService {
|
||||
this.missingExchangeDetections.delete(orderId);
|
||||
} else if (fillDataDrift) {
|
||||
logger.info(`[OrderSync] Refreshing fill data for ${orderId} (${order.symbol}): qty ${order.qty} -> ${fillQty}, price ${order.price} -> ${fillPrice}`);
|
||||
await supabaseService.updateOrderStatus?.(
|
||||
await runtimeOrderRepository.updateOrderStatus(
|
||||
orderId,
|
||||
exchangeStatus || String(order.status || 'filled'),
|
||||
exchangeFilledAt,
|
||||
|
||||
@ -2,7 +2,7 @@
|
||||
import { IExchangeConnector, ExchangeCapabilities } from '../connectors/types.js';
|
||||
import { SignalDirection } from '../strategies/rules/types.js';
|
||||
import logger from '../utils/logger.js';
|
||||
import { supabaseService } from './SupabaseService.js';
|
||||
import * as runtimeOrderRepository from './runtimeOrderRepository.js';
|
||||
import { healthTracker } from './healthTracker.js';
|
||||
import type { ApiServer } from './apiServer.js';
|
||||
import { SymbolMapper } from '../utils/symbolMapper.js';
|
||||
@ -413,13 +413,13 @@ export class TradeExecutor {
|
||||
private async hasActiveTradeId(tradeId?: string): Promise<boolean> {
|
||||
const normalized = String(tradeId || '').trim();
|
||||
if (!normalized) return false;
|
||||
return await supabaseService.hasActiveOrderForTradeId(normalized, this.profileId);
|
||||
return await runtimeOrderRepository.hasActiveOrderForTradeId(normalized, this.profileId);
|
||||
}
|
||||
|
||||
private async isTradeAlreadyFinalized(tradeId?: string): Promise<boolean> {
|
||||
const normalized = String(tradeId || '').trim();
|
||||
if (!normalized) return false;
|
||||
return await supabaseService.hasFinalizedTradeHistory(normalized, this.profileId);
|
||||
return await runtimeOrderRepository.hasFinalizedTradeHistory(normalized, this.profileId);
|
||||
}
|
||||
private setExitLifecycle(
|
||||
symbol: string,
|
||||
@ -661,7 +661,7 @@ export class TradeExecutor {
|
||||
|
||||
private async populatePendingOrdersFromDb(): Promise<void> {
|
||||
this.pendingOrders.clear();
|
||||
const rows = await supabaseService.getOpenOrdersForProfile(this.profileId || '');
|
||||
const rows = await runtimeOrderRepository.getOpenOrdersForProfile(this.profileId || '');
|
||||
rows.forEach((row) => this.addPendingOrderFromRecord(row));
|
||||
}
|
||||
|
||||
@ -771,7 +771,7 @@ export class TradeExecutor {
|
||||
|
||||
let reservedPositions = 0;
|
||||
for (const symbol of config.SYMBOLS) {
|
||||
const virtualPosition = await supabaseService.getVirtualOpenPosition(ledgerProfileId, symbol);
|
||||
const virtualPosition = await runtimeOrderRepository.getVirtualOpenPosition(ledgerProfileId, symbol);
|
||||
if (virtualPosition && virtualPosition.qty > 0 && virtualPosition.entryPrice > 0) {
|
||||
reservedPositions += virtualPosition.qty * virtualPosition.entryPrice;
|
||||
}
|
||||
@ -816,7 +816,7 @@ export class TradeExecutor {
|
||||
let virtualNotional = 0;
|
||||
for (const symbol of symbols) {
|
||||
try {
|
||||
const virtualPosition = await supabaseService.getVirtualOpenPosition(profileId, symbol);
|
||||
const virtualPosition = await runtimeOrderRepository.getVirtualOpenPosition(profileId, symbol);
|
||||
if (virtualPosition && virtualPosition.qty > 0 && virtualPosition.entryPrice > 0) {
|
||||
virtualNotional += virtualPosition.qty * virtualPosition.entryPrice;
|
||||
}
|
||||
@ -1117,7 +1117,7 @@ export class TradeExecutor {
|
||||
));
|
||||
} catch (error: any) {
|
||||
if (this.isDuplicateOrderError(error)) {
|
||||
const existingOrderRow = await supabaseService.getOrderByTradeId(tradeId, ledgerProfileId);
|
||||
const existingOrderRow = await runtimeOrderRepository.getOrderByTradeId(tradeId, ledgerProfileId);
|
||||
const existingOrderId = String(existingOrderRow?.order_id || '').trim();
|
||||
if (!existingOrderId) {
|
||||
throw normalizeThrown(error);
|
||||
@ -1197,7 +1197,7 @@ export class TradeExecutor {
|
||||
if (!verifiedOrder || terminalStatuses.has(verifiedStatus)) {
|
||||
logger.warn(`[Executor] âš ï¸ Order ${order.id} was ${verifiedOrder?.status || 'lost'}. Not tracking position.`);
|
||||
// Update order status in DB
|
||||
await supabaseService.updateOrderStatus?.(order.id, verifiedOrder?.status || 'canceled');
|
||||
await runtimeOrderRepository.updateOrderStatus(order.id, verifiedOrder?.status || 'canceled');
|
||||
const pending = this.pendingOrders.get(order.id);
|
||||
await this.releasePendingOrderReservation(pending);
|
||||
this.pendingOrders.delete(order.id);
|
||||
@ -1221,7 +1221,7 @@ export class TradeExecutor {
|
||||
const filledQty = this.getFilledQuantity(verifiedOrder) || executionQty;
|
||||
|
||||
// ✅ Update order status in DB when filled
|
||||
await supabaseService.updateOrderStatus?.(
|
||||
await runtimeOrderRepository.updateOrderStatus(
|
||||
order.id,
|
||||
verifiedStatus,
|
||||
new Date(),
|
||||
@ -1695,7 +1695,7 @@ export class TradeExecutor {
|
||||
// Do NOT finalize trade locally unless exchange confirms a fill.
|
||||
if (!verifiedOrder || ['canceled', 'expired', 'rejected', 'unknown'].includes(verifiedStatus)) {
|
||||
logger.warn(`[Executor] âš ï¸ Exit order ${order.id} for ${symbol} ended as ${verifiedStatus || 'unknown'}. Keeping local position open.`);
|
||||
await supabaseService.updateOrderStatus?.(order.id, verifiedStatus || 'unknown');
|
||||
await runtimeOrderRepository.updateOrderStatus(order.id, verifiedStatus || 'unknown');
|
||||
this.setExitLifecycle(
|
||||
symbol,
|
||||
verifiedStatus === 'unknown' ? 'quarantined' : 'failed',
|
||||
@ -1709,13 +1709,13 @@ export class TradeExecutor {
|
||||
|
||||
// ✅ Update order status in DB for confirmed exit fills
|
||||
if (verifiedStatus === 'partially_filled' && (!normalizedFilledQty || normalizedFilledQty <= 0)) {
|
||||
await supabaseService.updateOrderStatus?.(order.id, verifiedStatus, new Date(), exitPrice > 0 ? exitPrice : undefined);
|
||||
await runtimeOrderRepository.updateOrderStatus(order.id, verifiedStatus, new Date(), exitPrice > 0 ? exitPrice : undefined);
|
||||
this.setExitLifecycle(symbol, 'quarantined', reason, 'partial_fill_missing_qty', order.id);
|
||||
await this.notifier.sendAlert(`PARTIAL EXIT QUARANTINED\nSymbol: ${symbol}\nOrder: ${order.id}\nAction: Fill qty missing, manual review required`);
|
||||
return { success: false, error: 'Partial exit fill qty missing' };
|
||||
}
|
||||
|
||||
await supabaseService.updateOrderStatus?.(
|
||||
await runtimeOrderRepository.updateOrderStatus(
|
||||
order.id,
|
||||
verifiedStatus || 'filled',
|
||||
new Date(),
|
||||
@ -1843,7 +1843,7 @@ export class TradeExecutor {
|
||||
|
||||
if (finalUserId !== 'global') {
|
||||
if (normalizedTradeId) {
|
||||
canLogPartial = await supabaseService.hasLifecycleEntryOrder(
|
||||
canLogPartial = await runtimeOrderRepository.hasLifecycleEntryOrder(
|
||||
normalizedTradeId,
|
||||
pos.profileId || this.profileId,
|
||||
symbol
|
||||
@ -1854,7 +1854,7 @@ export class TradeExecutor {
|
||||
}
|
||||
|
||||
if (canLogPartial) {
|
||||
await supabaseService.logTransaction({
|
||||
await runtimeOrderRepository.logTransaction({
|
||||
user_id: finalUserId,
|
||||
profile_id: pos.profileId || this.profileId,
|
||||
symbol,
|
||||
@ -1933,7 +1933,7 @@ export class TradeExecutor {
|
||||
try {
|
||||
// Look for the latest ENTRY order for this symbol/profile
|
||||
if (this.profileId) {
|
||||
const lastEntry = await supabaseService.getLatestEntryOrder(this.profileId, symbol, this.userId);
|
||||
const lastEntry = await runtimeOrderRepository.getLatestEntryOrder(this.profileId, symbol, this.userId);
|
||||
|
||||
if (lastEntry) {
|
||||
pos = {
|
||||
@ -1962,7 +1962,7 @@ export class TradeExecutor {
|
||||
|
||||
let hasEntryChain = true;
|
||||
if (normalizedTradeId) {
|
||||
hasEntryChain = await supabaseService.hasLifecycleEntryOrder(normalizedTradeId, profileScope, symbol);
|
||||
hasEntryChain = await runtimeOrderRepository.hasLifecycleEntryOrder(normalizedTradeId, profileScope, symbol);
|
||||
if (!hasEntryChain) {
|
||||
logger.warn(`[Executor] Suppressing finalize history for ${symbol}: lifecycle ${normalizedTradeId} has no ENTRY chain.`);
|
||||
}
|
||||
@ -1970,7 +1970,7 @@ export class TradeExecutor {
|
||||
|
||||
let alreadyFinalized = false;
|
||||
if (normalizedTradeId && hasEntryChain) {
|
||||
alreadyFinalized = await supabaseService.hasFinalizedTradeHistory(normalizedTradeId, profileScope, symbol);
|
||||
alreadyFinalized = await runtimeOrderRepository.hasFinalizedTradeHistory(normalizedTradeId, profileScope, symbol);
|
||||
}
|
||||
|
||||
const pnl = (exitPrice - pos.entryPrice) * pos.size * (pos.side === SignalDirection.BUY ? 1 : -1);
|
||||
@ -2014,7 +2014,7 @@ export class TradeExecutor {
|
||||
|
||||
const finalUserId = pos.userId || this.userId;
|
||||
if (finalUserId !== 'global') {
|
||||
await supabaseService.logTransaction({
|
||||
await runtimeOrderRepository.logTransaction({
|
||||
user_id: finalUserId,
|
||||
profile_id: profileScope,
|
||||
symbol,
|
||||
@ -2066,7 +2066,7 @@ export class TradeExecutor {
|
||||
const finalUserId = specificUserId || this.userId;
|
||||
const orderSubTag = extractOrderSubTag(order) || undefined;
|
||||
if (finalUserId !== 'global') {
|
||||
await supabaseService.logOrder({
|
||||
await runtimeOrderRepository.logOrder({
|
||||
user_id: finalUserId,
|
||||
profile_id: this.profileId,
|
||||
order_id: order.id,
|
||||
@ -2133,9 +2133,9 @@ export class TradeExecutor {
|
||||
SymbolMapper.toDataSymbol(tradeSymbol, config.EXECUTION_PROVIDER)
|
||||
].filter(Boolean)));
|
||||
|
||||
let virtualPosition = null as Awaited<ReturnType<typeof supabaseService.getVirtualOpenPosition>>;
|
||||
let virtualPosition = null as Awaited<ReturnType<typeof runtimeOrderRepository.getVirtualOpenPosition>>;
|
||||
for (const candidateSymbol of symbolCandidates) {
|
||||
virtualPosition = await supabaseService.getVirtualOpenPosition(this.profileId, candidateSymbol);
|
||||
virtualPosition = await runtimeOrderRepository.getVirtualOpenPosition(this.profileId, candidateSymbol);
|
||||
if (virtualPosition) break;
|
||||
}
|
||||
|
||||
@ -2177,7 +2177,7 @@ export class TradeExecutor {
|
||||
const recoveredSlices: PositionState[] = [];
|
||||
|
||||
for (const tradeId of virtualTradeIds) {
|
||||
const slice = await supabaseService.getVirtualOpenPositionForTrade(
|
||||
const slice = await runtimeOrderRepository.getVirtualOpenPositionForTrade(
|
||||
this.profileId,
|
||||
virtualPosition.symbol || symbol,
|
||||
tradeId
|
||||
@ -2187,7 +2187,7 @@ export class TradeExecutor {
|
||||
let recoveredStopLoss = Number(slice.stopLoss || 0);
|
||||
let recoveredTakeProfit = Number(slice.takeProfit || 0);
|
||||
if (recoveredStopLoss <= 0 || recoveredTakeProfit <= 0) {
|
||||
const riskFallback = await supabaseService.getLatestEntryRiskOrder(
|
||||
const riskFallback = await runtimeOrderRepository.getLatestEntryRiskOrder(
|
||||
this.profileId,
|
||||
slice.symbol || symbol,
|
||||
slice.side
|
||||
@ -2245,7 +2245,7 @@ export class TradeExecutor {
|
||||
let recoveredStopLoss = Number(virtualPosition.stopLoss || 0);
|
||||
let recoveredTakeProfit = Number(virtualPosition.takeProfit || 0);
|
||||
if (recoveredStopLoss <= 0 || recoveredTakeProfit <= 0) {
|
||||
const riskFallback = await supabaseService.getLatestEntryRiskOrder(
|
||||
const riskFallback = await runtimeOrderRepository.getLatestEntryRiskOrder(
|
||||
this.profileId,
|
||||
virtualPosition.symbol || symbol,
|
||||
virtualPosition.side
|
||||
@ -2323,7 +2323,7 @@ export class TradeExecutor {
|
||||
try {
|
||||
// Priority 1: Find the FILLED ENTRY for this specific symbol/user/profile
|
||||
// This ensures we link to the start of the trade lifecycle
|
||||
const entryOrder = await supabaseService.getLatestFilledEntry(this.userId, symbol, this.profileId);
|
||||
const entryOrder = await runtimeOrderRepository.getLatestFilledEntry(this.userId, symbol, this.profileId);
|
||||
|
||||
if (entryOrder) {
|
||||
if (entryOrder.trade_id) {
|
||||
@ -2335,7 +2335,7 @@ export class TradeExecutor {
|
||||
recoveredTp = entryOrder.take_profit || 0;
|
||||
} else {
|
||||
const scopedEntry = this.profileId
|
||||
? await supabaseService.getLatestEntryOrder(this.profileId, symbol, this.userId)
|
||||
? await runtimeOrderRepository.getLatestEntryOrder(this.profileId, symbol, this.userId)
|
||||
: null;
|
||||
|
||||
if (scopedEntry) {
|
||||
@ -2348,7 +2348,7 @@ export class TradeExecutor {
|
||||
}
|
||||
} else {
|
||||
// Legacy/global fallback: Check the very last order (might be a modification or partial fill)
|
||||
const lastOrder = await supabaseService.getLatestOrder(this.userId, symbol);
|
||||
const lastOrder = await runtimeOrderRepository.getLatestOrder(this.userId, symbol);
|
||||
if (lastOrder) {
|
||||
recoveredSl = lastOrder.stop_loss || 0;
|
||||
recoveredTp = lastOrder.take_profit || 0;
|
||||
@ -2394,7 +2394,7 @@ export class TradeExecutor {
|
||||
// --- NEW: Recover pending orders from DB for background sync ---
|
||||
if (this.profileId) {
|
||||
try {
|
||||
const pending = await supabaseService.getPendingOrdersForProfile(this.profileId);
|
||||
const pending = await runtimeOrderRepository.getPendingOrdersForProfile(this.profileId);
|
||||
for (const p of pending) {
|
||||
const pendingOrderId = String(p.order_id || '').trim();
|
||||
if (!pendingOrderId) {
|
||||
@ -2410,18 +2410,18 @@ export class TradeExecutor {
|
||||
const normalizedTradeId = String(p.trade_id || '').trim();
|
||||
|
||||
if (normalizedTradeId) {
|
||||
lifecycleClosed = await supabaseService.isTradeLifecycleClosed(
|
||||
lifecycleClosed = await runtimeOrderRepository.isTradeLifecycleClosed(
|
||||
normalizedTradeId,
|
||||
this.profileId,
|
||||
p.symbol
|
||||
);
|
||||
} else {
|
||||
const virtualPosition = await supabaseService.getVirtualOpenPosition(this.profileId, p.symbol);
|
||||
const virtualPosition = await runtimeOrderRepository.getVirtualOpenPosition(this.profileId, p.symbol);
|
||||
lifecycleClosed = !virtualPosition;
|
||||
}
|
||||
|
||||
if (lifecycleClosed) {
|
||||
await supabaseService.updateOrderStatus?.(pendingOrderId, 'canceled');
|
||||
await runtimeOrderRepository.updateOrderStatus(pendingOrderId, 'canceled');
|
||||
logger.warn(`[Executor] Auto-resolved stale pending EXIT ${pendingOrderId} for ${p.symbol} under profile ${this.profileId}.`);
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -30,6 +30,7 @@ import {
|
||||
listManualEntriesForUser,
|
||||
saveManualEntryForUser
|
||||
} from './manualEntryRepository.js';
|
||||
import * as runtimeOrderRepository from './runtimeOrderRepository.js';
|
||||
import { mergeOrderSnapshots, mergePositionSnapshots } from './stateMerge.js';
|
||||
import { OperationalEvent } from '../domain/operationalEvents.js';
|
||||
import { runBacktest } from '../backtest/index.js';
|
||||
@ -1319,11 +1320,11 @@ export class ApiServer {
|
||||
const deduped = new Map<string, any>();
|
||||
for (const profileRow of profilesInScope) {
|
||||
const profileScoped = isAdmin
|
||||
? await supabaseService.getFilledLifecycleOrdersGlobal({
|
||||
? await runtimeOrderRepository.getFilledLifecycleOrdersGlobal({
|
||||
profileId: String(profileRow.id),
|
||||
maxRows
|
||||
})
|
||||
: await supabaseService.getFilledLifecycleOrdersForUser({
|
||||
: await runtimeOrderRepository.getFilledLifecycleOrdersForUser({
|
||||
userId: authUserId,
|
||||
profileId: String(profileRow.id),
|
||||
maxRows
|
||||
@ -1354,11 +1355,11 @@ export class ApiServer {
|
||||
});
|
||||
} else {
|
||||
const lifecycleOrderResult = isAdmin
|
||||
? await supabaseService.getFilledLifecycleOrdersGlobal({
|
||||
? await runtimeOrderRepository.getFilledLifecycleOrdersGlobal({
|
||||
profileId: requestedProfileId || undefined,
|
||||
maxRows
|
||||
})
|
||||
: await supabaseService.getFilledLifecycleOrdersForUser({
|
||||
: await runtimeOrderRepository.getFilledLifecycleOrdersForUser({
|
||||
userId: authUserId,
|
||||
profileId: requestedProfileId || undefined,
|
||||
maxRows
|
||||
@ -1494,7 +1495,7 @@ export class ApiServer {
|
||||
}
|
||||
|
||||
try {
|
||||
const result = await supabaseService.revertBackfillBatch(batchId);
|
||||
const result = await runtimeOrderRepository.revertBackfillBatch(batchId);
|
||||
if (result.errors.length > 0) {
|
||||
res.status(500).json({ success: false, reverted: result.reverted, errors: result.errors });
|
||||
return;
|
||||
@ -2282,7 +2283,7 @@ export class ApiServer {
|
||||
}
|
||||
const toIso = toParam || undefined;
|
||||
|
||||
const result = await supabaseService.getReconciliationBackfillAuditRows({
|
||||
const result = await runtimeOrderRepository.getReconciliationBackfillAuditRows({
|
||||
profileId: profileId || undefined,
|
||||
symbol: symbol || undefined,
|
||||
batchId: batchId || undefined,
|
||||
@ -2332,7 +2333,7 @@ export class ApiServer {
|
||||
}
|
||||
const toIso = toParam || undefined;
|
||||
|
||||
const batches = await supabaseService.getReconciliationBackfillBatchSummaries({
|
||||
const batches = await runtimeOrderRepository.getReconciliationBackfillBatchSummaries({
|
||||
profileId: profileId || undefined,
|
||||
symbol: symbol || undefined,
|
||||
fromIso,
|
||||
|
||||
@ -1,7 +1,6 @@
|
||||
import { randomUUID } from 'crypto';
|
||||
import logger from '../utils/logger.js';
|
||||
import { TradeExecutor } from './TradeExecutor.js';
|
||||
import { supabaseService } from './SupabaseService.js';
|
||||
import { distributedLockService } from './distributedLockService.js';
|
||||
import { healthTracker } from './healthTracker.js';
|
||||
import { observabilityService } from './observabilityService.js';
|
||||
@ -13,6 +12,7 @@ import { reconciliationParityHeartbeatService } from './reconciliationParityHear
|
||||
import { buildManagedBotSymbolTokenSet, isManagedBotSymbol } from '../utils/botSymbolScope.js';
|
||||
import { extractOrderSubTag } from '../utils/alpacaSubTag.js';
|
||||
import { config } from '../config/index.js';
|
||||
import * as runtimeOrderRepository from './runtimeOrderRepository.js';
|
||||
|
||||
export interface ReconciliationContext {
|
||||
profileId: string;
|
||||
@ -151,8 +151,8 @@ export class ReconciliationService {
|
||||
|
||||
try {
|
||||
const [dbOpenOrders, dbClosedOrders] = await Promise.all([
|
||||
supabaseService.getOpenOrdersForProfile(ctx.profileId),
|
||||
supabaseService.getRecentlyClosedOrdersForProfile(ctx.profileId, 10)
|
||||
runtimeOrderRepository.getOpenOrdersForProfile(ctx.profileId),
|
||||
runtimeOrderRepository.getRecentlyClosedOrdersForProfile(ctx.profileId, 10)
|
||||
]);
|
||||
const exchangeOrders = await ctx.executor.fetchExchangeOpenOrders();
|
||||
const rawDbOpenOrders = dbOpenOrders || [];
|
||||
@ -418,7 +418,7 @@ export class ReconciliationService {
|
||||
|
||||
private async processStatusChange(ctx: ReconciliationContext, dbOrder: any, exchangeOrder: any, normalizedStatus: string) {
|
||||
const action = determineAction(dbOrder || exchangeOrder);
|
||||
await supabaseService.logOrder(this.buildLifecyclePayload(ctx, dbOrder, exchangeOrder, normalizedStatus));
|
||||
await runtimeOrderRepository.logOrder(this.buildLifecyclePayload(ctx, dbOrder, exchangeOrder, normalizedStatus));
|
||||
const orderId = String((exchangeOrder || dbOrder)?.order_id || (exchangeOrder || dbOrder)?.id || '').trim();
|
||||
const tradeId = String((exchangeOrder || dbOrder)?.trade_id || (exchangeOrder || dbOrder)?.tradeId || '').trim();
|
||||
logger.info('Reconciliation correction applied', {
|
||||
@ -447,7 +447,7 @@ export class ReconciliationService {
|
||||
}
|
||||
|
||||
private async handleMissingExchange(ctx: ReconciliationContext, dbOrder: any) {
|
||||
await supabaseService.logOrder(this.buildLifecyclePayload(ctx, dbOrder));
|
||||
await runtimeOrderRepository.logOrder(this.buildLifecyclePayload(ctx, dbOrder));
|
||||
logger.info('Reconciliation cancel added (missing exchange)', {
|
||||
event: 'reconciliation_cancel_missing_exchange',
|
||||
profileId: ctx.profileId,
|
||||
@ -461,7 +461,7 @@ export class ReconciliationService {
|
||||
|
||||
private async handleExchangeOnly(ctx: ReconciliationContext, exchangeOrder: any) {
|
||||
const normalizedStatus = normalizeComparableStatus(exchangeOrder.status);
|
||||
await supabaseService.logOrder(this.buildLifecyclePayload(ctx, exchangeOrder, undefined, normalizedStatus));
|
||||
await runtimeOrderRepository.logOrder(this.buildLifecyclePayload(ctx, exchangeOrder, undefined, normalizedStatus));
|
||||
logger.info('Reconciliation discovery added', {
|
||||
event: 'reconciliation_exchange_discovery',
|
||||
profileId: ctx.profileId,
|
||||
|
||||
91
backend/src/services/runtimeOrderRepository.ts
Normal file
91
backend/src/services/runtimeOrderRepository.ts
Normal file
@ -0,0 +1,91 @@
|
||||
import { supabaseService } from './SupabaseService.js';
|
||||
|
||||
export const logOrder = (...args: Parameters<typeof supabaseService.logOrder>) =>
|
||||
supabaseService.logOrder(...args);
|
||||
|
||||
export const logTransaction = (...args: Parameters<typeof supabaseService.logTransaction>) =>
|
||||
supabaseService.logTransaction(...args);
|
||||
|
||||
export const getOpenOrdersForProfile = (...args: Parameters<typeof supabaseService.getOpenOrdersForProfile>) =>
|
||||
supabaseService.getOpenOrdersForProfile(...args);
|
||||
|
||||
export const getRecentlyClosedOrdersForProfile = (...args: Parameters<typeof supabaseService.getRecentlyClosedOrdersForProfile>) =>
|
||||
supabaseService.getRecentlyClosedOrdersForProfile(...args);
|
||||
|
||||
export const getPendingOrdersForProfile = (...args: Parameters<typeof supabaseService.getPendingOrdersForProfile>) =>
|
||||
supabaseService.getPendingOrdersForProfile(...args);
|
||||
|
||||
export const getLatestOrder = (...args: Parameters<typeof supabaseService.getLatestOrder>) =>
|
||||
supabaseService.getLatestOrder(...args);
|
||||
|
||||
export const getOrderByTradeId = (...args: Parameters<typeof supabaseService.getOrderByTradeId>) =>
|
||||
supabaseService.getOrderByTradeId(...args);
|
||||
|
||||
export const getLatestEntryOrder = (...args: Parameters<typeof supabaseService.getLatestEntryOrder>) =>
|
||||
supabaseService.getLatestEntryOrder(...args);
|
||||
|
||||
export const getLatestFilledEntry = (...args: Parameters<typeof supabaseService.getLatestFilledEntry>) =>
|
||||
supabaseService.getLatestFilledEntry(...args);
|
||||
|
||||
export const getLatestEntryRiskOrder = (...args: Parameters<typeof supabaseService.getLatestEntryRiskOrder>) =>
|
||||
supabaseService.getLatestEntryRiskOrder(...args);
|
||||
|
||||
export const getVirtualOpenPosition = (...args: Parameters<typeof supabaseService.getVirtualOpenPosition>) =>
|
||||
supabaseService.getVirtualOpenPosition(...args);
|
||||
|
||||
export const getVirtualOpenPositionForTrade = (...args: Parameters<typeof supabaseService.getVirtualOpenPositionForTrade>) =>
|
||||
supabaseService.getVirtualOpenPositionForTrade(...args);
|
||||
|
||||
export const hasActiveOrderForTradeId = (...args: Parameters<typeof supabaseService.hasActiveOrderForTradeId>) =>
|
||||
supabaseService.hasActiveOrderForTradeId(...args);
|
||||
|
||||
export const hasFinalizedTradeHistory = (...args: Parameters<typeof supabaseService.hasFinalizedTradeHistory>) =>
|
||||
supabaseService.hasFinalizedTradeHistory(...args);
|
||||
|
||||
export const hasLifecycleEntryOrder = (...args: Parameters<typeof supabaseService.hasLifecycleEntryOrder>) =>
|
||||
supabaseService.hasLifecycleEntryOrder(...args);
|
||||
|
||||
export const hasLifecycleEntryOrderWithProfileSubTag = (...args: Parameters<typeof supabaseService.hasLifecycleEntryOrderWithProfileSubTag>) =>
|
||||
supabaseService.hasLifecycleEntryOrderWithProfileSubTag(...args);
|
||||
|
||||
export const isTradeLifecycleClosed = (...args: Parameters<typeof supabaseService.isTradeLifecycleClosed>) =>
|
||||
supabaseService.isTradeLifecycleClosed(...args);
|
||||
|
||||
export const getExistingOrderIds = (...args: Parameters<typeof supabaseService.getExistingOrderIds>) =>
|
||||
supabaseService.getExistingOrderIds(...args);
|
||||
|
||||
export const getKnownTradeIdsForProfile = (...args: Parameters<typeof supabaseService.getKnownTradeIdsForProfile>) =>
|
||||
supabaseService.getKnownTradeIdsForProfile(...args);
|
||||
|
||||
export const updateOrderStatus = (...args: Parameters<NonNullable<typeof supabaseService.updateOrderStatus>>) =>
|
||||
supabaseService.updateOrderStatus?.(...args);
|
||||
|
||||
export const getFilledLifecycleOrdersForProfile = (...args: Parameters<typeof supabaseService.getFilledLifecycleOrdersForProfile>) =>
|
||||
supabaseService.getFilledLifecycleOrdersForProfile(...args);
|
||||
|
||||
export const getFilledLifecycleOrdersForUser = (...args: Parameters<typeof supabaseService.getFilledLifecycleOrdersForUser>) =>
|
||||
supabaseService.getFilledLifecycleOrdersForUser(...args);
|
||||
|
||||
export const getFilledLifecycleOrdersGlobal = (...args: Parameters<typeof supabaseService.getFilledLifecycleOrdersGlobal>) =>
|
||||
supabaseService.getFilledLifecycleOrdersGlobal(...args);
|
||||
|
||||
export const insertReconciliationBackfillAuditRows = (...args: Parameters<typeof supabaseService.insertReconciliationBackfillAuditRows>) =>
|
||||
supabaseService.insertReconciliationBackfillAuditRows(...args);
|
||||
|
||||
export const upsertReconciliationBackfillOrders = (...args: Parameters<typeof supabaseService.upsertReconciliationBackfillOrders>) =>
|
||||
supabaseService.upsertReconciliationBackfillOrders(...args);
|
||||
|
||||
export const getReconciliationBackfillAuditRows = (...args: Parameters<typeof supabaseService.getReconciliationBackfillAuditRows>) =>
|
||||
supabaseService.getReconciliationBackfillAuditRows(...args);
|
||||
|
||||
export const getReconciliationBackfillBatchSummaries = (...args: Parameters<typeof supabaseService.getReconciliationBackfillBatchSummaries>) =>
|
||||
supabaseService.getReconciliationBackfillBatchSummaries(...args);
|
||||
|
||||
export const revertBackfillBatch = (...args: Parameters<typeof supabaseService.revertBackfillBatch>) =>
|
||||
supabaseService.revertBackfillBatch(...args);
|
||||
|
||||
export const repairMissingSubTagsForProfile = (...args: Parameters<typeof supabaseService.repairMissingSubTagsForProfile>) =>
|
||||
supabaseService.repairMissingSubTagsForProfile(...args);
|
||||
|
||||
export const getStaleOrders = (...args: Parameters<typeof supabaseService.getStaleOrders>) =>
|
||||
supabaseService.getStaleOrders(...args);
|
||||
Loading…
Reference in New Issue
Block a user