refactor: align reconciliation runtime flows with repository

This commit is contained in:
Saravana Achu Mac 2026-04-04 16:28:59 -07:00
parent d1da7ec70c
commit 1f2b7bdf89
9 changed files with 49 additions and 41 deletions

View File

@ -20,6 +20,7 @@ import { reconciliationService } from './services/reconciliationService.js';
import { reconciliationWatchdogAutoResumeService } from './services/reconciliationWatchdogAutoResumeService.js'; import { reconciliationWatchdogAutoResumeService } from './services/reconciliationWatchdogAutoResumeService.js';
import { listActiveTradeProfiles } from './services/profileRepository.js'; import { listActiveTradeProfiles } from './services/profileRepository.js';
import { listActiveTradingUsers } from './services/userRepository.js'; import { listActiveTradingUsers } from './services/userRepository.js';
import * as runtimeOrderRepository from './services/runtimeOrderRepository.js';
async function main() { async function main() {
logger.info(`Starting ${config.PRODUCT_ID} trading backend...`); logger.info(`Starting ${config.PRODUCT_ID} trading backend...`);
@ -937,7 +938,7 @@ async function main() {
logger.warn(`[Reconcile] ${failedSyncs}/${results.length} profile sync tasks failed during exchange reconciliation.`); logger.warn(`[Reconcile] ${failedSyncs}/${results.length} profile sync tasks failed during exchange reconciliation.`);
} }
const staleBacklog = await supabaseService.getStaleOrders(5); const staleBacklog = await runtimeOrderRepository.getStaleOrders(5);
for (const ctx of userContexts) { for (const ctx of userContexts) {
if (!ctx.profileId || ctx.profileId === 'global' || ctx.profileId.startsWith('default-')) continue; if (!ctx.profileId || ctx.profileId === 'global' || ctx.profileId.startsWith('default-')) continue;

View File

@ -7,7 +7,7 @@
* Usage: npm run cleanup-stale-orders * Usage: npm run cleanup-stale-orders
*/ */
import { supabaseService } from '../services/SupabaseService.js'; import * as runtimeOrderRepository from '../services/runtimeOrderRepository.js';
import logger from '../utils/logger.js'; import logger from '../utils/logger.js';
async function cleanupStaleOrders() { async function cleanupStaleOrders() {
@ -15,7 +15,7 @@ async function cleanupStaleOrders() {
try { try {
// Get orders older than 24 hours in pending_new status // Get orders older than 24 hours in pending_new status
const veryOldOrders = await supabaseService.getStaleOrders(24 * 60); // 24 hours in minutes const veryOldOrders = await runtimeOrderRepository.getStaleOrders(24 * 60); // 24 hours in minutes
if (!veryOldOrders || veryOldOrders.length === 0) { if (!veryOldOrders || veryOldOrders.length === 0) {
logger.info('[Cleanup] No very old stale orders found. Database is clean! ✅'); logger.info('[Cleanup] No very old stale orders found. Database is clean! ✅');
@ -32,7 +32,7 @@ async function cleanupStaleOrders() {
logger.info(`[Cleanup] Marking order ${orderId} as 'unknown' (age: ${ageHours}h, symbol: ${order.symbol})`); logger.info(`[Cleanup] Marking order ${orderId} as 'unknown' (age: ${ageHours}h, symbol: ${order.symbol})`);
await supabaseService.updateOrderStatus?.(orderId, 'unknown'); await runtimeOrderRepository.updateOrderStatus(orderId, 'unknown');
updated++; updated++;
} }

View File

@ -8,7 +8,7 @@
* Usage: npm run revert-expired-orders * Usage: npm run revert-expired-orders
*/ */
import { supabaseService } from '../services/SupabaseService.js'; import * as runtimeOrderRepository from '../services/runtimeOrderRepository.js';
import logger from '../utils/logger.js'; import logger from '../utils/logger.js';
async function revertExpiredOrders() { async function revertExpiredOrders() {
@ -16,7 +16,7 @@ async function revertExpiredOrders() {
try { try {
// Find orders with status 'expired' or 'unknown' // Find orders with status 'expired' or 'unknown'
const expiredOrders = await supabaseService.getExpiredOrUnknownOrders(); const expiredOrders = await runtimeOrderRepository.getExpiredOrUnknownOrders();
if (!expiredOrders || expiredOrders.length === 0) { if (!expiredOrders || expiredOrders.length === 0) {
logger.info('[Revert] No expired or unknown orders found. Nothing to do! ✅'); logger.info('[Revert] No expired or unknown orders found. Nothing to do! ✅');
@ -33,7 +33,7 @@ async function revertExpiredOrders() {
// Use updateOrderStatus to reset status // Use updateOrderStatus to reset status
// Note: filledAt is undefined since we are resetting to pending // Note: filledAt is undefined since we are resetting to pending
await supabaseService.updateOrderStatus?.(orderId, 'pending_new'); await runtimeOrderRepository.updateOrderStatus(orderId, 'pending_new');
updated++; updated++;
} }

View File

@ -1,12 +1,12 @@
import { config } from '../config/index.js'; import { config } from '../config/index.js';
import { IExchangeConnector } from '../connectors/types.js'; import { IExchangeConnector } from '../connectors/types.js';
import { RiskEngine, RiskProfile } from './riskEngine.js'; import { RiskEngine, RiskProfile } from './riskEngine.js';
import { MarketContext, RuleResult, SignalDirection } from '../strategies/rules/types.js'; import { MarketContext, RuleResult, SignalDirection } from '../strategies/rules/types.js';
import logger from '../utils/logger.js'; import logger from '../utils/logger.js';
import { supabaseService } from './SupabaseService.js'; import { Notifier } from './notifier.js';
import { Notifier } from './notifier.js'; import { ApiServer } from './apiServer.js';
import { ApiServer } from './apiServer.js'; import { SymbolMapper } from '../utils/symbolMapper.js';
import { SymbolMapper } from '../utils/symbolMapper.js'; import * as runtimeOrderRepository from './runtimeOrderRepository.js';
let deprecationWarned = false; let deprecationWarned = false;
@ -159,7 +159,7 @@ export class ExecutionManager {
// Log order to database // Log order to database
if (this.userId !== 'global') { if (this.userId !== 'global') {
supabaseService.logOrder({ runtimeOrderRepository.logOrder({
user_id: this.userId, user_id: this.userId,
order_id: order.id || undefined, order_id: order.id || undefined,
symbol, symbol,
@ -254,7 +254,7 @@ export class ExecutionManager {
// Log to Supabase // Log to Supabase
if (this.userId !== 'global') { if (this.userId !== 'global') {
supabaseService.logTransaction({ runtimeOrderRepository.logTransaction({
user_id: this.userId, user_id: this.userId,
symbol, symbol,
side: pos.side, side: pos.side,
@ -356,7 +356,7 @@ export class ExecutionManager {
// Log to Supabase // Log to Supabase
if (this.userId !== 'global') { if (this.userId !== 'global') {
supabaseService.logOrder({ runtimeOrderRepository.logOrder({
user_id: this.userId, user_id: this.userId,
order_id: order.id, order_id: order.id,
symbol, symbol,

View File

@ -7,9 +7,9 @@ import { observabilityService } from './observabilityService.js';
import { import {
FilledLifecycleOrderRow, FilledLifecycleOrderRow,
ReconciliationBackfillAuditInsert, ReconciliationBackfillAuditInsert,
ReconciliationBackfillOrderInsert, ReconciliationBackfillOrderInsert
supabaseService
} from './SupabaseService.js'; } from './SupabaseService.js';
import * as runtimeOrderRepository from './runtimeOrderRepository.js';
import type { TradeExecutor } from './TradeExecutor.js'; import type { TradeExecutor } from './TradeExecutor.js';
import { import {
extractOrderSubTag, extractOrderSubTag,
@ -544,7 +544,7 @@ export class ReconciliationExitBackfillService {
}; };
} }
const auditReady = await supabaseService.isReconciliationBackfillAuditAvailable(); const auditReady = await runtimeOrderRepository.isReconciliationBackfillAuditAvailable();
if (!auditReady) { if (!auditReady) {
observabilityService.emitEvent({ observabilityService.emitEvent({
type: 'SYSTEM_ERROR', type: 'SYSTEM_ERROR',
@ -565,7 +565,7 @@ export class ReconciliationExitBackfillService {
}; };
} }
const lifecycleRows = await supabaseService.getFilledLifecycleOrdersForProfile(profileId); const lifecycleRows = await runtimeOrderRepository.getFilledLifecycleOrdersForProfile(profileId);
const openTrades = buildOpenTradeSlices(profileId, lifecycleRows); const openTrades = buildOpenTradeSlices(profileId, lifecycleRows);
const managedSymbolTokens = buildManagedBotSymbolTokenSet(); const managedSymbolTokens = buildManagedBotSymbolTokenSet();
const scopedOpenTrades = openTrades.filter((trade) => { const scopedOpenTrades = openTrades.filter((trade) => {
@ -594,7 +594,7 @@ export class ReconciliationExitBackfillService {
}; };
} }
const pendingRows = await supabaseService.getOpenOrdersForProfile(profileId); const pendingRows = await runtimeOrderRepository.getOpenOrdersForProfile(profileId);
const pendingTradeIds = new Set( const pendingTradeIds = new Set(
(pendingRows || []) (pendingRows || [])
.map((row) => String((row as any)?.trade_id || '').trim()) .map((row) => String((row as any)?.trade_id || '').trim())
@ -829,7 +829,7 @@ export class ReconciliationExitBackfillService {
} }
const proposedOrderIds = proposedRows.map((row) => row.order.order_id); const proposedOrderIds = proposedRows.map((row) => row.order.order_id);
const existingBefore = await supabaseService.getExistingOrderIds(proposedOrderIds, profileId); const existingBefore = await runtimeOrderRepository.getExistingOrderIds(proposedOrderIds, profileId);
const baseAuditRows: ReconciliationBackfillAuditInsert[] = proposedRows.map((row) => ({ const baseAuditRows: ReconciliationBackfillAuditInsert[] = proposedRows.map((row) => ({
batch_id: batchId, batch_id: batchId,
profile_id: profileId, profile_id: profileId,
@ -855,7 +855,7 @@ export class ReconciliationExitBackfillService {
const preAuditRows = dryRun const preAuditRows = dryRun
? [...baseAuditRows, ...noGoAuditRows, ...advisoryAuditRows] ? [...baseAuditRows, ...noGoAuditRows, ...advisoryAuditRows]
: [...baseAuditRows, ...noGoAuditRows, ...advisoryAuditRows]; : [...baseAuditRows, ...noGoAuditRows, ...advisoryAuditRows];
const preAuditSaved = await supabaseService.insertReconciliationBackfillAuditRows(preAuditRows); const preAuditSaved = await runtimeOrderRepository.insertReconciliationBackfillAuditRows(preAuditRows);
if (!preAuditSaved) { if (!preAuditSaved) {
return { return {
attempted: true, attempted: true,
@ -872,7 +872,7 @@ export class ReconciliationExitBackfillService {
let insertedRows = 0; let insertedRows = 0;
if (!dryRun && proposedRows.length > 0) { if (!dryRun && proposedRows.length > 0) {
const applyOk = await supabaseService.upsertReconciliationBackfillOrders(proposedRows.map((row) => row.order)); const applyOk = await runtimeOrderRepository.upsertReconciliationBackfillOrders(proposedRows.map((row) => row.order));
if (!applyOk) { if (!applyOk) {
observabilityService.emitEvent({ observabilityService.emitEvent({
type: 'SYSTEM_ERROR', type: 'SYSTEM_ERROR',
@ -893,7 +893,7 @@ export class ReconciliationExitBackfillService {
}; };
} }
const existingAfter = await supabaseService.getExistingOrderIds(proposedOrderIds, profileId); const existingAfter = await runtimeOrderRepository.getExistingOrderIds(proposedOrderIds, profileId);
insertedRows = proposedRows.filter((row) => !existingBefore.has(row.order.order_id) && existingAfter.has(row.order.order_id)).length; insertedRows = proposedRows.filter((row) => !existingBefore.has(row.order.order_id) && existingAfter.has(row.order.order_id)).length;
const postAuditRows: ReconciliationBackfillAuditInsert[] = proposedRows.map((row) => ({ const postAuditRows: ReconciliationBackfillAuditInsert[] = proposedRows.map((row) => ({
@ -918,7 +918,7 @@ export class ReconciliationExitBackfillService {
}, },
applied_at: !existingBefore.has(row.order.order_id) ? new Date().toISOString() : null applied_at: !existingBefore.has(row.order.order_id) ? new Date().toISOString() : null
})); }));
const postSaved = await supabaseService.insertReconciliationBackfillAuditRows(postAuditRows); const postSaved = await runtimeOrderRepository.insertReconciliationBackfillAuditRows(postAuditRows);
if (!postSaved) { if (!postSaved) {
logger.error(`[ReconcileBackfill] Failed to persist post-apply audit rows for batch ${batchId}`); logger.error(`[ReconcileBackfill] Failed to persist post-apply audit rows for batch ${batchId}`);
} }

View File

@ -14,8 +14,8 @@ import {
} from '../utils/alpacaSubTag.js'; } from '../utils/alpacaSubTag.js';
import { healthTracker } from './healthTracker.js'; import { healthTracker } from './healthTracker.js';
import { observabilityService } from './observabilityService.js'; import { observabilityService } from './observabilityService.js';
import { supabaseService } from './SupabaseService.js';
import type { TradeExecutor } from './TradeExecutor.js'; import type { TradeExecutor } from './TradeExecutor.js';
import * as runtimeOrderRepository from './runtimeOrderRepository.js';
type CoverageAction = 'ENTRY' | 'EXIT'; type CoverageAction = 'ENTRY' | 'EXIT';
@ -288,7 +288,7 @@ export class ReconciliationOrderCoverageService {
limitPerPage: fetchLimitPerPage, limitPerPage: fetchLimitPerPage,
maxPages: fetchMaxPages maxPages: fetchMaxPages
}); });
const knownTradeIds = await supabaseService.getKnownTradeIdsForProfile(ctx.profileId, tradeIdLookbackRows); const knownTradeIds = await runtimeOrderRepository.getKnownTradeIdsForProfile(ctx.profileId, tradeIdLookbackRows);
const candidateByOrderId = new Map<string, MissingOrderCandidate>(); const candidateByOrderId = new Map<string, MissingOrderCandidate>();
const unattributedRows: UnattributedOrderSample[] = []; const unattributedRows: UnattributedOrderSample[] = [];
@ -428,7 +428,7 @@ export class ReconciliationOrderCoverageService {
let actionableUnattributedRows = unattributedRows; let actionableUnattributedRows = unattributedRows;
if (unattributedRows.length > 0) { if (unattributedRows.length > 0) {
const legacyKnownIds = await supabaseService.getExistingOrderIds( const legacyKnownIds = await runtimeOrderRepository.getExistingOrderIds(
unattributedRows.map((row) => row.orderId), unattributedRows.map((row) => row.orderId),
ctx.profileId ctx.profileId
); );
@ -524,7 +524,7 @@ export class ReconciliationOrderCoverageService {
result.eligibleOrders = candidates.length; result.eligibleOrders = candidates.length;
if (candidates.length === 0) return result; if (candidates.length === 0) return result;
const existing = await supabaseService.getExistingOrderIds( const existing = await runtimeOrderRepository.getExistingOrderIds(
candidates.map((row) => row.orderId) candidates.map((row) => row.orderId)
); );
const missing = candidates.filter((row) => !existing.has(row.orderId)); const missing = candidates.filter((row) => !existing.has(row.orderId));
@ -536,10 +536,10 @@ export class ReconciliationOrderCoverageService {
result.skippedMaxInsertLimit = Math.max(0, missing.length - toInsert.length); result.skippedMaxInsertLimit = Math.max(0, missing.length - toInsert.length);
for (const row of toInsert) { for (const row of toInsert) {
await supabaseService.logOrder(row.payload); await runtimeOrderRepository.logOrder(row.payload);
} }
const insertedSet = await supabaseService.getExistingOrderIds( const insertedSet = await runtimeOrderRepository.getExistingOrderIds(
toInsert.map((row) => row.orderId) toInsert.map((row) => row.orderId)
); );
result.insertedRows = toInsert.filter((row) => insertedSet.has(row.orderId)).length; result.insertedRows = toInsert.filter((row) => insertedSet.has(row.orderId)).length;

View File

@ -8,6 +8,7 @@ import { getTradeProfileCapital } from './profileRepository.js';
import type { TradeExecutor } from './TradeExecutor.js'; import type { TradeExecutor } from './TradeExecutor.js';
import { buildAlpacaSubTag } from '../utils/alpacaSubTag.js'; import { buildAlpacaSubTag } from '../utils/alpacaSubTag.js';
import { normalizeBotSymbolToken } from '../utils/botSymbolScope.js'; import { normalizeBotSymbolToken } from '../utils/botSymbolScope.js';
import * as runtimeOrderRepository from './runtimeOrderRepository.js';
type TradeSide = 'BUY' | 'SELL'; type TradeSide = 'BUY' | 'SELL';
@ -109,7 +110,7 @@ const normalizeTradeSlices = async (
profileId: string, profileId: string,
symbol: string symbol: string
): Promise<TradeSlice[]> => { ): Promise<TradeSlice[]> => {
const virtualPosition = await supabaseService.getVirtualOpenPosition(profileId, symbol); const virtualPosition = await runtimeOrderRepository.getVirtualOpenPosition(profileId, symbol);
if (!virtualPosition || !(toNumber(virtualPosition.qty) > 0)) return []; if (!virtualPosition || !(toNumber(virtualPosition.qty) > 0)) return [];
const tradeIds = Array.from(new Set( const tradeIds = Array.from(new Set(
@ -121,7 +122,7 @@ const normalizeTradeSlices = async (
const slices: TradeSlice[] = []; const slices: TradeSlice[] = [];
for (const tradeId of tradeIds) { for (const tradeId of tradeIds) {
const slice = await supabaseService.getVirtualOpenPositionForTrade(profileId, symbol, tradeId); const slice = await runtimeOrderRepository.getVirtualOpenPositionForTrade(profileId, symbol, tradeId);
if (!slice || !(toNumber(slice.qty) > 0)) continue; if (!slice || !(toNumber(slice.qty) > 0)) continue;
slices.push({ slices.push({
symbol: String(slice.symbol || symbol).trim() || symbol, symbol: String(slice.symbol || symbol).trim() || symbol,
@ -325,7 +326,7 @@ export class ReconciliationParityHeartbeatService {
}; };
if (requireSubTagAttribution) { if (requireSubTagAttribution) {
let attributed = await supabaseService.hasLifecycleEntryOrderWithProfileSubTag( let attributed = await runtimeOrderRepository.hasLifecycleEntryOrderWithProfileSubTag(
trade.tradeId, trade.tradeId,
profileId, profileId,
trade.symbol trade.symbol
@ -333,7 +334,7 @@ export class ReconciliationParityHeartbeatService {
let attributionMode: 'subtag' | 'legacy_entry' | null = attributed ? 'subtag' : null; let attributionMode: 'subtag' | 'legacy_entry' | null = attributed ? 'subtag' : null;
if (!attributed && allowLegacyEntryAttribution) { if (!attributed && allowLegacyEntryAttribution) {
const legacyAttributed = await supabaseService.hasLifecycleEntryOrder( const legacyAttributed = await runtimeOrderRepository.hasLifecycleEntryOrder(
trade.tradeId, trade.tradeId,
profileId, profileId,
trade.symbol trade.symbol
@ -388,7 +389,7 @@ export class ReconciliationParityHeartbeatService {
} }
const synthetic = this.buildSyntheticExitPayload(ctx, trade); const synthetic = this.buildSyntheticExitPayload(ctx, trade);
const existingIds = await supabaseService.getExistingOrderIds([synthetic.orderId], profileId); const existingIds = await runtimeOrderRepository.getExistingOrderIds([synthetic.orderId], profileId);
if (existingIds.has(synthetic.orderId)) { if (existingIds.has(synthetic.orderId)) {
this.tradeParityState.delete(stateKey); this.tradeParityState.delete(stateKey);
continue; continue;
@ -410,7 +411,7 @@ export class ReconciliationParityHeartbeatService {
continue; continue;
} }
await supabaseService.logOrder(synthetic.payload); await runtimeOrderRepository.logOrder(synthetic.payload);
await ctx.executor.reconcileExitFill( await ctx.executor.reconcileExitFill(
synthetic.payload, synthetic.payload,
Number(synthetic.payload.price || 0), Number(synthetic.payload.price || 0),

View File

@ -89,3 +89,9 @@ export const repairMissingSubTagsForProfile = (...args: Parameters<typeof supaba
export const getStaleOrders = (...args: Parameters<typeof supabaseService.getStaleOrders>) => export const getStaleOrders = (...args: Parameters<typeof supabaseService.getStaleOrders>) =>
supabaseService.getStaleOrders(...args); supabaseService.getStaleOrders(...args);
export const getExpiredOrUnknownOrders = (...args: Parameters<typeof supabaseService.getExpiredOrUnknownOrders>) =>
supabaseService.getExpiredOrUnknownOrders(...args);
export const isReconciliationBackfillAuditAvailable = (...args: Parameters<typeof supabaseService.isReconciliationBackfillAuditAvailable>) =>
supabaseService.isReconciliationBackfillAuditAvailable(...args);

View File

@ -7,8 +7,8 @@ import { ApiServer } from './apiServer.js';
import { healthTracker } from './healthTracker.js'; import { healthTracker } from './healthTracker.js';
import { observabilityService } from './observabilityService.js'; import { observabilityService } from './observabilityService.js';
import { buildManagedBotSymbolTokenSet, isManagedBotSymbol } from '../utils/botSymbolScope.js'; import { buildManagedBotSymbolTokenSet, isManagedBotSymbol } from '../utils/botSymbolScope.js';
import { supabaseService } from './SupabaseService.js';
import { extractOrderSubTag, subTagBelongsToProfile, subTagHintsTrade } from '../utils/alpacaSubTag.js'; import { extractOrderSubTag, subTagBelongsToProfile, subTagHintsTrade } from '../utils/alpacaSubTag.js';
import * as runtimeOrderRepository from './runtimeOrderRepository.js';
export class TradeMonitor { export class TradeMonitor {
private interval: NodeJS.Timeout | null = null; private interval: NodeJS.Timeout | null = null;
@ -335,7 +335,7 @@ export class TradeMonitor {
pendingOrders.delete(orderId); pendingOrders.delete(orderId);
logger.info(`[TradeMonitor] Limit order ${orderId} cancelled due to timeout.`); logger.info(`[TradeMonitor] Limit order ${orderId} cancelled due to timeout.`);
try { try {
await supabaseService.updateOrderStatus(orderId, 'canceled'); await runtimeOrderRepository.updateOrderStatus(orderId, 'canceled');
} catch (e) { } catch (e) {
logger.warn(`[TradeMonitor] Failed to update DB status for timed-out order ${orderId}: ${e}`); logger.warn(`[TradeMonitor] Failed to update DB status for timed-out order ${orderId}: ${e}`);
} }
@ -344,7 +344,7 @@ export class TradeMonitor {
logger.warn('[TradeMonitor] Exchange does not support cancelOrder. Removing from local tracking.'); logger.warn('[TradeMonitor] Exchange does not support cancelOrder. Removing from local tracking.');
pendingOrders.delete(orderId); pendingOrders.delete(orderId);
try { try {
await supabaseService.updateOrderStatus(orderId, 'canceled'); await runtimeOrderRepository.updateOrderStatus(orderId, 'canceled');
} catch (e) { } catch (e) {
logger.warn(`[TradeMonitor] Failed to update DB status for timed-out order ${orderId}: ${e}`); logger.warn(`[TradeMonitor] Failed to update DB status for timed-out order ${orderId}: ${e}`);
} }