diff --git a/backend/src/index.ts b/backend/src/index.ts index da9534b..7c89847 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -20,6 +20,7 @@ import { reconciliationService } from './services/reconciliationService.js'; import { reconciliationWatchdogAutoResumeService } from './services/reconciliationWatchdogAutoResumeService.js'; import { listActiveTradeProfiles } from './services/profileRepository.js'; import { listActiveTradingUsers } from './services/userRepository.js'; +import * as runtimeOrderRepository from './services/runtimeOrderRepository.js'; async function main() { logger.info(`Starting ${config.PRODUCT_ID} trading backend...`); @@ -937,7 +938,7 @@ async function main() { logger.warn(`[Reconcile] ${failedSyncs}/${results.length} profile sync tasks failed during exchange reconciliation.`); } - const staleBacklog = await supabaseService.getStaleOrders(5); + const staleBacklog = await runtimeOrderRepository.getStaleOrders(5); for (const ctx of userContexts) { if (!ctx.profileId || ctx.profileId === 'global' || ctx.profileId.startsWith('default-')) continue; diff --git a/backend/src/scripts/cleanupStaleOrders.ts b/backend/src/scripts/cleanupStaleOrders.ts index 550fc67..269ce01 100644 --- a/backend/src/scripts/cleanupStaleOrders.ts +++ b/backend/src/scripts/cleanupStaleOrders.ts @@ -7,7 +7,7 @@ * Usage: npm run cleanup-stale-orders */ -import { supabaseService } from '../services/SupabaseService.js'; +import * as runtimeOrderRepository from '../services/runtimeOrderRepository.js'; import logger from '../utils/logger.js'; async function cleanupStaleOrders() { @@ -15,7 +15,7 @@ async function cleanupStaleOrders() { try { // Get orders older than 24 hours in pending_new status - const veryOldOrders = await supabaseService.getStaleOrders(24 * 60); // 24 hours in minutes + const veryOldOrders = await runtimeOrderRepository.getStaleOrders(24 * 60); // 24 hours in minutes if (!veryOldOrders || veryOldOrders.length === 0) { logger.info('[Cleanup] No very old stale orders found. Database is clean! ✅'); @@ -32,7 +32,7 @@ async function cleanupStaleOrders() { logger.info(`[Cleanup] Marking order ${orderId} as 'unknown' (age: ${ageHours}h, symbol: ${order.symbol})`); - await supabaseService.updateOrderStatus?.(orderId, 'unknown'); + await runtimeOrderRepository.updateOrderStatus(orderId, 'unknown'); updated++; } diff --git a/backend/src/scripts/revertExpiredOrders.ts b/backend/src/scripts/revertExpiredOrders.ts index 2680c14..4c7c622 100644 --- a/backend/src/scripts/revertExpiredOrders.ts +++ b/backend/src/scripts/revertExpiredOrders.ts @@ -8,7 +8,7 @@ * Usage: npm run revert-expired-orders */ -import { supabaseService } from '../services/SupabaseService.js'; +import * as runtimeOrderRepository from '../services/runtimeOrderRepository.js'; import logger from '../utils/logger.js'; async function revertExpiredOrders() { @@ -16,7 +16,7 @@ async function revertExpiredOrders() { try { // Find orders with status 'expired' or 'unknown' - const expiredOrders = await supabaseService.getExpiredOrUnknownOrders(); + const expiredOrders = await runtimeOrderRepository.getExpiredOrUnknownOrders(); if (!expiredOrders || expiredOrders.length === 0) { logger.info('[Revert] No expired or unknown orders found. Nothing to do! ✅'); @@ -33,7 +33,7 @@ async function revertExpiredOrders() { // Use updateOrderStatus to reset status // Note: filledAt is undefined since we are resetting to pending - await supabaseService.updateOrderStatus?.(orderId, 'pending_new'); + await runtimeOrderRepository.updateOrderStatus(orderId, 'pending_new'); updated++; } diff --git a/backend/src/services/executionManager.ts b/backend/src/services/executionManager.ts index ace0a25..82f0349 100644 --- a/backend/src/services/executionManager.ts +++ b/backend/src/services/executionManager.ts @@ -1,12 +1,12 @@ import { config } from '../config/index.js'; import { IExchangeConnector } from '../connectors/types.js'; import { RiskEngine, RiskProfile } from './riskEngine.js'; -import { MarketContext, RuleResult, SignalDirection } from '../strategies/rules/types.js'; -import logger from '../utils/logger.js'; -import { supabaseService } from './SupabaseService.js'; -import { Notifier } from './notifier.js'; -import { ApiServer } from './apiServer.js'; -import { SymbolMapper } from '../utils/symbolMapper.js'; +import { MarketContext, RuleResult, SignalDirection } from '../strategies/rules/types.js'; +import logger from '../utils/logger.js'; +import { Notifier } from './notifier.js'; +import { ApiServer } from './apiServer.js'; +import { SymbolMapper } from '../utils/symbolMapper.js'; +import * as runtimeOrderRepository from './runtimeOrderRepository.js'; let deprecationWarned = false; @@ -159,7 +159,7 @@ export class ExecutionManager { // Log order to database if (this.userId !== 'global') { - supabaseService.logOrder({ + runtimeOrderRepository.logOrder({ user_id: this.userId, order_id: order.id || undefined, symbol, @@ -254,7 +254,7 @@ export class ExecutionManager { // Log to Supabase if (this.userId !== 'global') { - supabaseService.logTransaction({ + runtimeOrderRepository.logTransaction({ user_id: this.userId, symbol, side: pos.side, @@ -356,7 +356,7 @@ export class ExecutionManager { // Log to Supabase if (this.userId !== 'global') { - supabaseService.logOrder({ + runtimeOrderRepository.logOrder({ user_id: this.userId, order_id: order.id, symbol, diff --git a/backend/src/services/reconciliationExitBackfillService.ts b/backend/src/services/reconciliationExitBackfillService.ts index 2e3e517..683bf27 100644 --- a/backend/src/services/reconciliationExitBackfillService.ts +++ b/backend/src/services/reconciliationExitBackfillService.ts @@ -7,9 +7,9 @@ import { observabilityService } from './observabilityService.js'; import { FilledLifecycleOrderRow, ReconciliationBackfillAuditInsert, - ReconciliationBackfillOrderInsert, - supabaseService + ReconciliationBackfillOrderInsert } from './SupabaseService.js'; +import * as runtimeOrderRepository from './runtimeOrderRepository.js'; import type { TradeExecutor } from './TradeExecutor.js'; import { extractOrderSubTag, @@ -544,7 +544,7 @@ export class ReconciliationExitBackfillService { }; } - const auditReady = await supabaseService.isReconciliationBackfillAuditAvailable(); + const auditReady = await runtimeOrderRepository.isReconciliationBackfillAuditAvailable(); if (!auditReady) { observabilityService.emitEvent({ type: 'SYSTEM_ERROR', @@ -565,7 +565,7 @@ export class ReconciliationExitBackfillService { }; } - const lifecycleRows = await supabaseService.getFilledLifecycleOrdersForProfile(profileId); + const lifecycleRows = await runtimeOrderRepository.getFilledLifecycleOrdersForProfile(profileId); const openTrades = buildOpenTradeSlices(profileId, lifecycleRows); const managedSymbolTokens = buildManagedBotSymbolTokenSet(); const scopedOpenTrades = openTrades.filter((trade) => { @@ -594,7 +594,7 @@ export class ReconciliationExitBackfillService { }; } - const pendingRows = await supabaseService.getOpenOrdersForProfile(profileId); + const pendingRows = await runtimeOrderRepository.getOpenOrdersForProfile(profileId); const pendingTradeIds = new Set( (pendingRows || []) .map((row) => String((row as any)?.trade_id || '').trim()) @@ -829,7 +829,7 @@ export class ReconciliationExitBackfillService { } const proposedOrderIds = proposedRows.map((row) => row.order.order_id); - const existingBefore = await supabaseService.getExistingOrderIds(proposedOrderIds, profileId); + const existingBefore = await runtimeOrderRepository.getExistingOrderIds(proposedOrderIds, profileId); const baseAuditRows: ReconciliationBackfillAuditInsert[] = proposedRows.map((row) => ({ batch_id: batchId, profile_id: profileId, @@ -855,7 +855,7 @@ export class ReconciliationExitBackfillService { const preAuditRows = dryRun ? [...baseAuditRows, ...noGoAuditRows, ...advisoryAuditRows] : [...baseAuditRows, ...noGoAuditRows, ...advisoryAuditRows]; - const preAuditSaved = await supabaseService.insertReconciliationBackfillAuditRows(preAuditRows); + const preAuditSaved = await runtimeOrderRepository.insertReconciliationBackfillAuditRows(preAuditRows); if (!preAuditSaved) { return { attempted: true, @@ -872,7 +872,7 @@ export class ReconciliationExitBackfillService { let insertedRows = 0; if (!dryRun && proposedRows.length > 0) { - const applyOk = await supabaseService.upsertReconciliationBackfillOrders(proposedRows.map((row) => row.order)); + const applyOk = await runtimeOrderRepository.upsertReconciliationBackfillOrders(proposedRows.map((row) => row.order)); if (!applyOk) { observabilityService.emitEvent({ type: 'SYSTEM_ERROR', @@ -893,7 +893,7 @@ export class ReconciliationExitBackfillService { }; } - const existingAfter = await supabaseService.getExistingOrderIds(proposedOrderIds, profileId); + const existingAfter = await runtimeOrderRepository.getExistingOrderIds(proposedOrderIds, profileId); insertedRows = proposedRows.filter((row) => !existingBefore.has(row.order.order_id) && existingAfter.has(row.order.order_id)).length; const postAuditRows: ReconciliationBackfillAuditInsert[] = proposedRows.map((row) => ({ @@ -918,7 +918,7 @@ export class ReconciliationExitBackfillService { }, applied_at: !existingBefore.has(row.order.order_id) ? new Date().toISOString() : null })); - const postSaved = await supabaseService.insertReconciliationBackfillAuditRows(postAuditRows); + const postSaved = await runtimeOrderRepository.insertReconciliationBackfillAuditRows(postAuditRows); if (!postSaved) { logger.error(`[ReconcileBackfill] Failed to persist post-apply audit rows for batch ${batchId}`); } diff --git a/backend/src/services/reconciliationOrderCoverageService.ts b/backend/src/services/reconciliationOrderCoverageService.ts index 93b146d..e9dcdd4 100644 --- a/backend/src/services/reconciliationOrderCoverageService.ts +++ b/backend/src/services/reconciliationOrderCoverageService.ts @@ -14,8 +14,8 @@ import { } from '../utils/alpacaSubTag.js'; import { healthTracker } from './healthTracker.js'; import { observabilityService } from './observabilityService.js'; -import { supabaseService } from './SupabaseService.js'; import type { TradeExecutor } from './TradeExecutor.js'; +import * as runtimeOrderRepository from './runtimeOrderRepository.js'; type CoverageAction = 'ENTRY' | 'EXIT'; @@ -288,7 +288,7 @@ export class ReconciliationOrderCoverageService { limitPerPage: fetchLimitPerPage, maxPages: fetchMaxPages }); - const knownTradeIds = await supabaseService.getKnownTradeIdsForProfile(ctx.profileId, tradeIdLookbackRows); + const knownTradeIds = await runtimeOrderRepository.getKnownTradeIdsForProfile(ctx.profileId, tradeIdLookbackRows); const candidateByOrderId = new Map(); const unattributedRows: UnattributedOrderSample[] = []; @@ -428,7 +428,7 @@ export class ReconciliationOrderCoverageService { let actionableUnattributedRows = unattributedRows; if (unattributedRows.length > 0) { - const legacyKnownIds = await supabaseService.getExistingOrderIds( + const legacyKnownIds = await runtimeOrderRepository.getExistingOrderIds( unattributedRows.map((row) => row.orderId), ctx.profileId ); @@ -524,7 +524,7 @@ export class ReconciliationOrderCoverageService { result.eligibleOrders = candidates.length; if (candidates.length === 0) return result; - const existing = await supabaseService.getExistingOrderIds( + const existing = await runtimeOrderRepository.getExistingOrderIds( candidates.map((row) => row.orderId) ); const missing = candidates.filter((row) => !existing.has(row.orderId)); @@ -536,10 +536,10 @@ export class ReconciliationOrderCoverageService { result.skippedMaxInsertLimit = Math.max(0, missing.length - toInsert.length); for (const row of toInsert) { - await supabaseService.logOrder(row.payload); + await runtimeOrderRepository.logOrder(row.payload); } - const insertedSet = await supabaseService.getExistingOrderIds( + const insertedSet = await runtimeOrderRepository.getExistingOrderIds( toInsert.map((row) => row.orderId) ); result.insertedRows = toInsert.filter((row) => insertedSet.has(row.orderId)).length; diff --git a/backend/src/services/reconciliationParityHeartbeatService.ts b/backend/src/services/reconciliationParityHeartbeatService.ts index 9790b1d..3acb7c1 100644 --- a/backend/src/services/reconciliationParityHeartbeatService.ts +++ b/backend/src/services/reconciliationParityHeartbeatService.ts @@ -8,6 +8,7 @@ import { getTradeProfileCapital } from './profileRepository.js'; import type { TradeExecutor } from './TradeExecutor.js'; import { buildAlpacaSubTag } from '../utils/alpacaSubTag.js'; import { normalizeBotSymbolToken } from '../utils/botSymbolScope.js'; +import * as runtimeOrderRepository from './runtimeOrderRepository.js'; type TradeSide = 'BUY' | 'SELL'; @@ -109,7 +110,7 @@ const normalizeTradeSlices = async ( profileId: string, symbol: string ): Promise => { - const virtualPosition = await supabaseService.getVirtualOpenPosition(profileId, symbol); + const virtualPosition = await runtimeOrderRepository.getVirtualOpenPosition(profileId, symbol); if (!virtualPosition || !(toNumber(virtualPosition.qty) > 0)) return []; const tradeIds = Array.from(new Set( @@ -121,7 +122,7 @@ const normalizeTradeSlices = async ( const slices: TradeSlice[] = []; for (const tradeId of tradeIds) { - const slice = await supabaseService.getVirtualOpenPositionForTrade(profileId, symbol, tradeId); + const slice = await runtimeOrderRepository.getVirtualOpenPositionForTrade(profileId, symbol, tradeId); if (!slice || !(toNumber(slice.qty) > 0)) continue; slices.push({ symbol: String(slice.symbol || symbol).trim() || symbol, @@ -325,7 +326,7 @@ export class ReconciliationParityHeartbeatService { }; if (requireSubTagAttribution) { - let attributed = await supabaseService.hasLifecycleEntryOrderWithProfileSubTag( + let attributed = await runtimeOrderRepository.hasLifecycleEntryOrderWithProfileSubTag( trade.tradeId, profileId, trade.symbol @@ -333,7 +334,7 @@ export class ReconciliationParityHeartbeatService { let attributionMode: 'subtag' | 'legacy_entry' | null = attributed ? 'subtag' : null; if (!attributed && allowLegacyEntryAttribution) { - const legacyAttributed = await supabaseService.hasLifecycleEntryOrder( + const legacyAttributed = await runtimeOrderRepository.hasLifecycleEntryOrder( trade.tradeId, profileId, trade.symbol @@ -388,7 +389,7 @@ export class ReconciliationParityHeartbeatService { } const synthetic = this.buildSyntheticExitPayload(ctx, trade); - const existingIds = await supabaseService.getExistingOrderIds([synthetic.orderId], profileId); + const existingIds = await runtimeOrderRepository.getExistingOrderIds([synthetic.orderId], profileId); if (existingIds.has(synthetic.orderId)) { this.tradeParityState.delete(stateKey); continue; @@ -410,7 +411,7 @@ export class ReconciliationParityHeartbeatService { continue; } - await supabaseService.logOrder(synthetic.payload); + await runtimeOrderRepository.logOrder(synthetic.payload); await ctx.executor.reconcileExitFill( synthetic.payload, Number(synthetic.payload.price || 0), diff --git a/backend/src/services/runtimeOrderRepository.ts b/backend/src/services/runtimeOrderRepository.ts index 43f286c..03b1e91 100644 --- a/backend/src/services/runtimeOrderRepository.ts +++ b/backend/src/services/runtimeOrderRepository.ts @@ -89,3 +89,9 @@ export const repairMissingSubTagsForProfile = (...args: Parameters) => supabaseService.getStaleOrders(...args); + +export const getExpiredOrUnknownOrders = (...args: Parameters) => + supabaseService.getExpiredOrUnknownOrders(...args); + +export const isReconciliationBackfillAuditAvailable = (...args: Parameters) => + supabaseService.isReconciliationBackfillAuditAvailable(...args); diff --git a/backend/src/services/tradeMonitor.ts b/backend/src/services/tradeMonitor.ts index 170f460..50c345c 100644 --- a/backend/src/services/tradeMonitor.ts +++ b/backend/src/services/tradeMonitor.ts @@ -7,8 +7,8 @@ import { ApiServer } from './apiServer.js'; import { healthTracker } from './healthTracker.js'; import { observabilityService } from './observabilityService.js'; import { buildManagedBotSymbolTokenSet, isManagedBotSymbol } from '../utils/botSymbolScope.js'; -import { supabaseService } from './SupabaseService.js'; import { extractOrderSubTag, subTagBelongsToProfile, subTagHintsTrade } from '../utils/alpacaSubTag.js'; +import * as runtimeOrderRepository from './runtimeOrderRepository.js'; export class TradeMonitor { private interval: NodeJS.Timeout | null = null; @@ -335,7 +335,7 @@ export class TradeMonitor { pendingOrders.delete(orderId); logger.info(`[TradeMonitor] Limit order ${orderId} cancelled due to timeout.`); try { - await supabaseService.updateOrderStatus(orderId, 'canceled'); + await runtimeOrderRepository.updateOrderStatus(orderId, 'canceled'); } catch (e) { logger.warn(`[TradeMonitor] Failed to update DB status for timed-out order ${orderId}: ${e}`); } @@ -344,7 +344,7 @@ export class TradeMonitor { logger.warn('[TradeMonitor] Exchange does not support cancelOrder. Removing from local tracking.'); pendingOrders.delete(orderId); try { - await supabaseService.updateOrderStatus(orderId, 'canceled'); + await runtimeOrderRepository.updateOrderStatus(orderId, 'canceled'); } catch (e) { logger.warn(`[TradeMonitor] Failed to update DB status for timed-out order ${orderId}: ${e}`); }