diff --git a/backend/src/services/AutoTrader.ts b/backend/src/services/AutoTrader.ts index 57f6682..b6b3f68 100644 --- a/backend/src/services/AutoTrader.ts +++ b/backend/src/services/AutoTrader.ts @@ -5,7 +5,6 @@ import { TradeExecutor } from './TradeExecutor.js'; import logger from '../utils/logger.js'; import { SymbolMapper } from '../utils/symbolMapper.js'; import { IExchangeConnector } from '../connectors/types.js'; -import { supabaseService } from './SupabaseService.js'; import { healthTracker } from './healthTracker.js'; import { getProfileConsecutiveLosses, @@ -334,7 +333,7 @@ export class AutoTrader { const maxDailyLossUsd = Number(riskLimits.maxDailyLossUsd); if (Number.isFinite(maxDailyLossUsd) && maxDailyLossUsd > 0) { - const dailyLossUsd = await getProfileDailyLossUsd(profileId, supabaseService); + const dailyLossUsd = await getProfileDailyLossUsd(profileId); if (dailyLossUsd >= maxDailyLossUsd) { return { allowed: false, @@ -345,7 +344,7 @@ export class AutoTrader { const dailyProfitTargetUsd = Number(riskLimits.dailyProfitTargetUsd); if (Number.isFinite(dailyProfitTargetUsd) && dailyProfitTargetUsd > 0) { - const dailyNetPnl = await getProfileDailyNetPnlUsd(profileId, supabaseService); + const dailyNetPnl = await getProfileDailyNetPnlUsd(profileId); if (dailyNetPnl >= dailyProfitTargetUsd) { return { allowed: false, @@ -356,7 +355,7 @@ export class AutoTrader { const maxConsecutiveLosses = Number(riskLimits.maxConsecutiveLosses); if (Number.isFinite(maxConsecutiveLosses) && maxConsecutiveLosses > 0) { - const consecutiveLosses = await getProfileConsecutiveLosses(profileId, 100, supabaseService); + const consecutiveLosses = await getProfileConsecutiveLosses(profileId, 100); if (consecutiveLosses >= maxConsecutiveLosses) { return { allowed: false, diff --git a/backend/src/services/apiServer.ts b/backend/src/services/apiServer.ts index 21f2600..ecc5d64 100644 --- a/backend/src/services/apiServer.ts +++ b/backend/src/services/apiServer.ts @@ -1845,7 +1845,7 @@ export class ApiServer { } try { - const entries = await listManualEntriesForUser(authUserId, supabaseService); + const entries = await listManualEntriesForUser(authUserId); res.json({ entries }); } catch (error: any) { res.status(500).json({ error: `Failed to load manual entries: ${error.message}` }); @@ -1867,16 +1867,14 @@ export class ApiServer { const orderLimit = Math.max(1, Math.min(5000, parseInt(String(req.query.limit || '5000'), 10) || 5000)); const [entries, orders, historyTradeKeys, profiles] = await Promise.all([ - listManualEntriesForUser(authUserId, supabaseService), + listManualEntriesForUser(authUserId), listRecentOrders({ userId: wantsAll ? undefined : authUserId, - limit: orderLimit, - legacyService: supabaseService + limit: orderLimit }), listRecentTradeHistoryKeys({ userId: wantsAll ? undefined : authUserId, - limit: orderLimit, - legacyService: supabaseService + limit: orderLimit }), wantsAll ? listAllTradeProfiles(supabaseService) @@ -1910,8 +1908,7 @@ export class ApiServer { const rows = await listRecentTradeHistory({ userId: wantsAll ? undefined : authUserId, - limit, - legacyService: supabaseService + limit }); res.json({ rows }); @@ -1928,7 +1925,7 @@ export class ApiServer { } try { - const entry = await saveManualEntryForUser(authUserId, req.body || {}, supabaseService); + const entry = await saveManualEntryForUser(authUserId, req.body || {}); res.status(201).json({ entry }); } catch (error: any) { res.status(400).json({ error: `Failed to save manual entry: ${error.message}` }); @@ -1946,7 +1943,7 @@ export class ApiServer { const entry = await saveManualEntryForUser(authUserId, { ...(req.body || {}), stock_instance_id: String(req.params.id || '').trim() - }, supabaseService); + }); res.json({ entry }); } catch (error: any) { res.status(400).json({ error: `Failed to update manual entry: ${error.message}` }); @@ -1961,7 +1958,7 @@ export class ApiServer { } try { - await deleteManualEntryForUser(authUserId, String(req.params.id || '').trim(), supabaseService); + await deleteManualEntryForUser(authUserId, String(req.params.id || '').trim()); res.json({ success: true }); } catch (error: any) { res.status(400).json({ error: `Failed to delete manual entry: ${error.message}` }); diff --git a/backend/src/services/manualEntryRepository.ts b/backend/src/services/manualEntryRepository.ts index 284c912..250dee5 100644 --- a/backend/src/services/manualEntryRepository.ts +++ b/backend/src/services/manualEntryRepository.ts @@ -1,8 +1,16 @@ import { randomUUID } from 'node:crypto'; +import { config } from '../config/index.js'; import logger from '../utils/logger.js'; -import type { supabaseService } from './SupabaseService.js'; - -type LegacySupabaseService = typeof supabaseService; +import { + MANUAL_ENTRY_CONTAINER, + buildBaseDocument, + buildDocId, + clampNumber, + queryDocuments, + toOptionalString, + upsertDocument +} from './tradingRecordStore.js'; +import { logTransaction } from './runtimeOrderRepository.js'; export interface ManualEntryRecord { stock_instance_id: string; @@ -25,13 +33,13 @@ export interface ManualEntryRecord { drop_threshold_for_buy?: number | null; } -function getClient(legacyService?: LegacySupabaseService) { - const client = legacyService?.getClient?.(); - if (!client) { - throw new Error('Manual entry store is not configured'); - } - return client; -} +type ManualEntryDocument = ManualEntryRecord & { + id: string; + productId: string; + type: 'manual_entry'; + created_at: string; + updated_at: string; +}; function normalizeNullableNumber(value: unknown): number | null | undefined { if (value === undefined) return undefined; @@ -70,119 +78,93 @@ function normalizeEntry(userId: string, input: Partial, exist }; } -async function insertHistoryRecord(userId: string, entry: ManualEntryRecord, reason: 'Manual Log' | 'Manual Close', legacyService?: LegacySupabaseService): Promise { - const client = getClient(legacyService); - const entryPrice = Number(entry.buy_price || 0); - const exitPrice = Number(entry.sell_price || 0); - const size = Number(entry.quantity || 0); +async function listManualEntryDocuments(userId: string): Promise { + const query = 'SELECT * FROM c WHERE c.productId = @productId AND c.type = @type AND c.user_id = @userId ORDER BY c.created_at DESC'; + return await queryDocuments(MANUAL_ENTRY_CONTAINER, query, [ + { name: '@productId', value: config.PRODUCT_ID }, + { name: '@type', value: 'manual_entry' }, + { name: '@userId', value: userId }, + ]); +} + +async function findManualEntryDocument(userId: string, entryId: string): Promise { + const rows = await queryDocuments(MANUAL_ENTRY_CONTAINER, 'SELECT TOP 1 * FROM c WHERE c.productId = @productId AND c.type = @type AND c.user_id = @userId AND c.stock_instance_id = @entryId', [ + { name: '@productId', value: config.PRODUCT_ID }, + { name: '@type', value: 'manual_entry' }, + { name: '@userId', value: userId }, + { name: '@entryId', value: entryId }, + ]); + return rows[0] || null; +} + +async function insertHistoryRecord(userId: string, entry: ManualEntryRecord, reason: 'Manual Log' | 'Manual Close'): Promise { + const entryPrice = clampNumber(entry.buy_price); + const exitPrice = clampNumber(entry.sell_price); + const size = clampNumber(entry.quantity); if (!(entryPrice > 0) || !(exitPrice > 0) || !(size > 0)) { return; } const pnl = (exitPrice - entryPrice) * size; const pnlPercent = entryPrice > 0 ? ((exitPrice - entryPrice) / entryPrice) * 100 : 0; - - const { error } = await client - .from('trade_history') - .insert([{ - user_id: userId, - symbol: entry.symbol, - side: 'BUY', - entry_price: entryPrice, - exit_price: exitPrice, - size, - pnl, - pnl_percent: pnlPercent, - source: 'MANUAL', - reason, - timestamp: Date.now() - }]); - - if (error) { - logger.warn(`[ManualEntries] Failed to insert history row: ${error.message}`); - } + await logTransaction({ + user_id: userId, + symbol: entry.symbol, + side: 'BUY', + entry_price: entryPrice, + exit_price: exitPrice, + size, + pnl, + pnl_percent: pnlPercent, + reason, + timestamp: Date.now(), + source: 'MANUAL' + }); } -export async function listManualEntriesForUser(userId: string, legacyService?: LegacySupabaseService): Promise { - const client = getClient(legacyService); - const { data, error } = await client - .from('stocks') - .select('*') - .eq('user_id', userId) - .order('created_at', { ascending: false }); - - if (error) { - throw new Error(error.message); - } - - return Array.isArray(data) ? data as ManualEntryRecord[] : []; +export async function listManualEntriesForUser(userId: string): Promise { + const rows = await listManualEntryDocuments(userId); + return rows as ManualEntryRecord[]; } -export async function saveManualEntryForUser( - userId: string, - input: Partial, - legacyService?: LegacySupabaseService -): Promise { - const client = getClient(legacyService); +export async function saveManualEntryForUser(userId: string, input: Partial): Promise { const entryId = String(input.stock_instance_id || '').trim(); - let existing: ManualEntryRecord | null = null; - - if (entryId) { - const { data, error } = await client - .from('stocks') - .select('*') - .eq('stock_instance_id', entryId) - .eq('user_id', userId) - .maybeSingle(); - if (error) { - throw new Error(error.message); - } - existing = data as ManualEntryRecord | null; - } - + const existing = entryId ? await findManualEntryDocument(userId, entryId) : null; const normalized = normalizeEntry(userId, input, existing); if (!normalized.symbol) { throw new Error('Symbol is required'); } + const payload = buildBaseDocument('manual_entry', { + ...normalized, + stock_instance_id: normalized.stock_instance_id, + }, existing?.id || buildDocId('manual_entry', userId, normalized.stock_instance_id)); + + await upsertDocument(MANUAL_ENTRY_CONTAINER, payload as ManualEntryDocument); + if (existing) { - const { error } = await client - .from('stocks') - .update(normalized) - .eq('stock_instance_id', normalized.stock_instance_id) - .eq('user_id', userId); - if (error) { - throw new Error(error.message); - } - if (normalized.sell_price && !existing.sell_price && normalized.buy_price) { - await insertHistoryRecord(userId, normalized, 'Manual Close', legacyService); + await insertHistoryRecord(userId, normalized, 'Manual Close'); } - return normalized; + } else if (normalized.sell_price && normalized.buy_price) { + await insertHistoryRecord(userId, normalized, 'Manual Log'); } - const { error } = await client - .from('stocks') - .insert([normalized]); - if (error) { - throw new Error(error.message); - } - - if (normalized.sell_price && normalized.buy_price) { - await insertHistoryRecord(userId, normalized, 'Manual Log', legacyService); - } return normalized; } -export async function deleteManualEntryForUser(userId: string, entryId: string, legacyService?: LegacySupabaseService): Promise { - const client = getClient(legacyService); - const { error } = await client - .from('stocks') - .delete() - .eq('stock_instance_id', entryId) - .eq('user_id', userId); - - if (error) { - throw new Error(error.message); +export async function deleteManualEntryForUser(userId: string, entryId: string): Promise { + const existing = await findManualEntryDocument(userId, entryId); + if (!existing) { + return; } + const tombstone = { + ...existing, + active: false, + status: 'deleted', + updated_at: new Date().toISOString(), + deleted_at: new Date().toISOString(), + }; + await upsertDocument(MANUAL_ENTRY_CONTAINER, tombstone); + logger.info(`[ManualEntries] Marked ${toOptionalString(entryId) || 'unknown'} deleted in Cosmos`); } diff --git a/backend/src/services/orderActivityRepository.ts b/backend/src/services/orderActivityRepository.ts index 50d815b..928c2bb 100644 --- a/backend/src/services/orderActivityRepository.ts +++ b/backend/src/services/orderActivityRepository.ts @@ -1,56 +1,51 @@ +import { config } from '../config/index.js'; import logger from '../utils/logger.js'; -import type { supabaseService } from './SupabaseService.js'; +import { ORDER_CONTAINER, queryDocuments } from './tradingRecordStore.js'; -type LegacySupabaseService = typeof supabaseService; - -function getClient(legacyService?: LegacySupabaseService) { - return legacyService?.getClient?.() ?? null; -} +type OrderActivityDocument = { + id?: string; + order_id?: string | null; + user_id?: string | null; + profile_id?: string | null; + symbol?: string | null; + type?: string | null; + side?: string | null; + qty?: number | string | null; + quantity?: number | string | null; + price?: number | string | null; + status?: string | null; + timestamp?: number | string | null; + filled_at?: string | null; + created_at?: string | null; + trade_id?: string | null; + action?: string | null; + source?: string | null; + stop_loss?: number | string | null; + take_profit?: number | string | null; + sub_tag?: string | null; +}; export async function listRecentOrders(options: { userId?: string; limit?: number; - legacyService?: LegacySupabaseService; }): Promise { - const client = getClient(options.legacyService); - if (!client) return []; - - const orderColumnsV3 = 'id,order_id,user_id,profile_id,symbol,type,side,qty,quantity,price,status,timestamp,filled_at,created_at,trade_id,action,source,stop_loss,take_profit,sub_tag'; - const orderColumnsV2 = 'id,order_id,user_id,profile_id,symbol,type,side,qty,quantity,price,status,timestamp,filled_at,created_at,trade_id,action,source,stop_loss,take_profit'; - const orderColumnsLegacy = 'id,order_id,user_id,profile_id,symbol,type,side,qty,price,status,timestamp,created_at'; - const limit = Math.max(1, Math.min(5000, Math.floor(Number(options.limit || 5000)))); - - const runQuery = async (columns: string) => { - let query = client - .from('orders') - .select(columns) - .order('created_at', { ascending: false }) - .limit(limit); + try { + const filters = ['c.productId = @productId', 'c.type = @type']; + const parameters: Array<{ name: string; value: unknown }> = [ + { name: '@productId', value: config.PRODUCT_ID }, + { name: '@type', value: 'trade_order' }, + ]; if (options.userId) { - query = query.eq('user_id', options.userId); + filters.push('c.user_id = @userId'); + parameters.push({ name: '@userId', value: options.userId }); } - return query; - }; - - try { - const { data: v3Data, error: v3Error } = await runQuery(orderColumnsV3); - if (!v3Error) return v3Data || []; - - logger.warn(`[OrderActivityRepo] V3 order query failed, falling back to v2 columns: ${v3Error.message}`); - const { data: v2Data, error: v2Error } = await runQuery(orderColumnsV2); - if (!v2Error) return v2Data || []; - - logger.warn(`[OrderActivityRepo] V2 order query failed, falling back to legacy columns: ${v2Error.message}`); - const { data: legacyData, error: legacyError } = await runQuery(orderColumnsLegacy); - if (legacyError) { - logger.error(`[OrderActivityRepo] Legacy order query failed: ${legacyError.message}`); - return []; - } - return legacyData || []; + const limit = Math.max(1, Math.min(5000, Math.floor(Number(options.limit || 5000)))); + const query = `SELECT TOP ${limit} * FROM c WHERE ${filters.join(' AND ')} ORDER BY c.created_at DESC`; + return await queryDocuments(ORDER_CONTAINER, query, parameters); } catch (error: any) { - logger.error(`[OrderActivityRepo] Recent order lookup unexpected error: ${error.message}`); + logger.error(`[OrderActivityRepo] Recent order lookup failed: ${error.message}`); return []; } } diff --git a/backend/src/services/reconciliationSubTagRepairService.ts b/backend/src/services/reconciliationSubTagRepairService.ts index aa4d04e..eb233e4 100644 --- a/backend/src/services/reconciliationSubTagRepairService.ts +++ b/backend/src/services/reconciliationSubTagRepairService.ts @@ -2,9 +2,9 @@ import { config } from '../config/index.js'; import logger from '../utils/logger.js'; import { observabilityService } from './observabilityService.js'; import { - type ReconciliationSubTagRepairSummary, - supabaseService + type ReconciliationSubTagRepairSummary } from './SupabaseService.js'; +import { repairMissingSubTagsForProfile } from './runtimeOrderRepository.js'; export interface ReconciliationSubTagRepairContext { profileId: string; @@ -36,7 +36,7 @@ export class ReconciliationSubTagRepairService { const lookbackHours = Math.max(1, Math.floor(Number(config.RECON_SUBTAG_REPAIR_LOOKBACK_HOURS || 720))); const maxUpdatesPerProfile = Math.max(1, Math.floor(Number(config.RECON_SUBTAG_REPAIR_MAX_UPDATES_PER_PROFILE || 500))); - const result = await supabaseService.repairMissingSubTagsForProfile({ + const result = await repairMissingSubTagsForProfile({ profileId: ctx.profileId, lookbackHours, maxRows: maxUpdatesPerProfile, diff --git a/backend/src/services/runtimeOrderRepository.ts b/backend/src/services/runtimeOrderRepository.ts index 03b1e91..90d256e 100644 --- a/backend/src/services/runtimeOrderRepository.ts +++ b/backend/src/services/runtimeOrderRepository.ts @@ -1,97 +1,1334 @@ -import { supabaseService } from './SupabaseService.js'; +import { randomUUID } from 'node:crypto'; +import { config } from '../config/index.js'; +import { + normalizeOrderAction, + normalizeOrderStatus, + normalizeOrderType, + normalizeTradeSide +} from '../domain/tradingEnums.js'; +import logger from '../utils/logger.js'; +import { + buildAlpacaSubTag, + isBytelystSubTag, + shouldAttachAlpacaSubTag, + subTagBelongsToProfile, + type AlpacaSubTagIntent +} from '../utils/alpacaSubTag.js'; +import { SymbolMapper } from '../utils/symbolMapper.js'; +import type { + FilledLifecycleOrderRow, + ReconciliationBackfillAuditInsert, + ReconciliationBackfillAuditQuery, + ReconciliationBackfillAuditRow, + ReconciliationBackfillBatchSummary, + ReconciliationBackfillOrderInsert, + ReconciliationSubTagRepairSummary, + StaleOrderScope, + VirtualOpenPosition +} from './SupabaseService.js'; +import { + ORDER_CONTAINER, + RECONCILIATION_AUDIT_CONTAINER, + TRADE_HISTORY_CONTAINER, + buildBaseDocument, + buildDocId, + clampNumber, + isCosmosConfigured, + nowIso, + queryDocuments, + toOptionalNumber, + toOptionalString, + upsertDocument +} from './tradingRecordStore.js'; -export const logOrder = (...args: Parameters) => - supabaseService.logOrder(...args); +type OrderDocument = FilledLifecycleOrderRow & { + id: string; + productId: string; + type: 'trade_order'; + created_at: string; + updated_at: string; +}; -export const logTransaction = (...args: Parameters) => - supabaseService.logTransaction(...args); +type TradeHistoryDocument = { + id: string; + productId: string; + type: 'trade_history'; + user_id: string; + profile_id?: string; + symbol: string; + side: string; + entry_price: number; + exit_price: number; + size: number; + pnl: number; + pnl_percent: number; + reason: string; + timestamp: number; + created_at: string; + updated_at: string; + stop_loss?: number; + take_profit?: number; + rules_metadata?: Record; + trade_id?: string; + source?: 'BOT' | 'MANUAL'; +}; -export const getOpenOrdersForProfile = (...args: Parameters) => - supabaseService.getOpenOrdersForProfile(...args); +type ReconciliationAuditDocument = Omit & { + id: string; + productId: string; + type: 'reconciliation_backfill_audit'; + updated_at: string; +}; -export const getRecentlyClosedOrdersForProfile = (...args: Parameters) => - supabaseService.getRecentlyClosedOrdersForProfile(...args); +const OPEN_ORDER_STATUSES = ['pending_new', 'accepted', 'pending', 'new', 'partially_filled', 'partially-filled']; +const CLOSED_ORDER_STATUSES = ['filled', 'canceled', 'expired', 'rejected', 'unknown']; +const FILLED_ORDER_STATUSES = ['filled', 'partially_filled', 'partially-filled']; -export const getPendingOrdersForProfile = (...args: Parameters) => - supabaseService.getPendingOrdersForProfile(...args); +function cosmosEnabled(): boolean { + return isCosmosConfigured(); +} -export const getLatestOrder = (...args: Parameters) => - supabaseService.getLatestOrder(...args); +function ensureCosmos(): void { + if (!cosmosEnabled()) { + throw new Error('Cosmos DB is required for runtime trading persistence'); + } +} -export const getOrderByTradeId = (...args: Parameters) => - supabaseService.getOrderByTradeId(...args); +function isUuid(value: string | undefined | null): boolean { + const normalized = String(value || '').trim(); + return /^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i.test(normalized); +} -export const getLatestEntryOrder = (...args: Parameters) => - supabaseService.getLatestEntryOrder(...args); +function orderStatusRank(status: string): number { + const normalized = String(status || '').trim().toLowerCase(); + if (normalized === 'filled') return 6; + if (['canceled', 'expired', 'rejected', 'unknown'].includes(normalized)) return 5; + if (['partially_filled', 'partially-filled'].includes(normalized)) return 4; + if (['accepted', 'pending', 'new'].includes(normalized)) return 3; + return 1; +} -export const getLatestFilledEntry = (...args: Parameters) => - supabaseService.getLatestFilledEntry(...args); +function pickMostReliableOrderStatus(existingStatus: string, incomingStatus: string): string { + return orderStatusRank(existingStatus) >= orderStatusRank(incomingStatus) + ? normalizeOrderStatus(existingStatus) + : normalizeOrderStatus(incomingStatus); +} -export const getLatestEntryRiskOrder = (...args: Parameters) => - supabaseService.getLatestEntryRiskOrder(...args); +function inferLifecycleAction(action: unknown, side: unknown): 'ENTRY' | 'EXIT' { + const normalizedAction = normalizeOrderAction(String(action || '')); + if (normalizedAction === 'ENTRY' || normalizedAction === 'EXIT') { + return normalizedAction; + } + return normalizeTradeSide(String(side || 'BUY')) === 'SELL' ? 'EXIT' : 'ENTRY'; +} -export const getVirtualOpenPosition = (...args: Parameters) => - supabaseService.getVirtualOpenPosition(...args); +function resolveSubTagIntent(action: unknown): AlpacaSubTagIntent { + const normalizedAction = normalizeOrderAction(String(action || '')); + if (normalizedAction === 'ENTRY' || normalizedAction === 'EXIT') { + return normalizedAction; + } + return 'UNKNOWN'; +} -export const getVirtualOpenPositionForTrade = (...args: Parameters) => - supabaseService.getVirtualOpenPositionForTrade(...args); +function resolvePersistedOrderSubTag(order: { + profile_id?: string; + trade_id?: string; + action?: string; + sub_tag?: string; + subTag?: string; +}): string | undefined { + const explicit = String(order.sub_tag || order.subTag || '').trim(); + if (explicit) return explicit; -export const hasActiveOrderForTradeId = (...args: Parameters) => - supabaseService.hasActiveOrderForTradeId(...args); + const profileId = String(order.profile_id || '').trim(); + const tradeId = String(order.trade_id || '').trim(); + if (!profileId || !tradeId || !shouldAttachAlpacaSubTag({ profileId })) { + return undefined; + } -export const hasFinalizedTradeHistory = (...args: Parameters) => - supabaseService.hasFinalizedTradeHistory(...args); + return buildAlpacaSubTag({ + profileId, + tradeId, + intent: resolveSubTagIntent(order.action) + }) || undefined; +} -export const hasLifecycleEntryOrder = (...args: Parameters) => - supabaseService.hasLifecycleEntryOrder(...args); +function buildLifecycleSymbolCandidates(symbol: string): string[] { + const normalized = String(symbol || '').trim(); + if (!normalized) return []; -export const hasLifecycleEntryOrderWithProfileSubTag = (...args: Parameters) => - supabaseService.hasLifecycleEntryOrderWithProfileSubTag(...args); + const provider = String(config.EXECUTION_PROVIDER || '').trim() || 'alpaca'; + const variants = new Set(); + variants.add(normalized); + variants.add(normalized.toUpperCase()); -export const isTradeLifecycleClosed = (...args: Parameters) => - supabaseService.isTradeLifecycleClosed(...args); + try { + const mapped = SymbolMapper.toTradeSymbol(normalized, provider); + if (mapped) { + variants.add(mapped); + variants.add(String(mapped).toUpperCase()); + } + } catch { + // Keep the direct symbol variants only. + } -export const getExistingOrderIds = (...args: Parameters) => - supabaseService.getExistingOrderIds(...args); + return Array.from(variants).filter(Boolean); +} -export const getKnownTradeIdsForProfile = (...args: Parameters) => - supabaseService.getKnownTradeIdsForProfile(...args); +function toTimestampMs(value: unknown, fallback: number): number { + if (typeof value === 'number') { + if (Number.isFinite(value) && value > 1_000_000_000_000) return value; + if (Number.isFinite(value) && value > 0) return value * 1000; + return fallback; + } + if (typeof value === 'string') { + const trimmed = value.trim(); + if (/^\d+(\.\d+)?$/.test(trimmed)) { + return toTimestampMs(Number(trimmed), fallback); + } + const parsed = Date.parse(trimmed); + if (Number.isFinite(parsed) && parsed > 0) return parsed; + } + return fallback; +} -export const updateOrderStatus = (...args: Parameters>) => - supabaseService.updateOrderStatus?.(...args); +function orderTimestamp(row: Partial, fallback: number): number { + return toTimestampMs(row.timestamp, toTimestampMs(row.created_at, fallback)); +} -export const getFilledLifecycleOrdersForProfile = (...args: Parameters) => - supabaseService.getFilledLifecycleOrdersForProfile(...args); +function sortByCreatedAtAsc>(rows: T[]): T[] { + return [...rows].sort((a, b) => orderTimestamp(a, 0) - orderTimestamp(b, 0)); +} -export const getFilledLifecycleOrdersForUser = (...args: Parameters) => - supabaseService.getFilledLifecycleOrdersForUser(...args); +function normalizeOrderDocument(order: { + user_id: string; + profile_id?: string; + order_id?: string; + symbol: string; + type: string; + side: string; + qty: number; + price: number; + status: string; + timestamp: number; + stop_loss?: number; + take_profit?: number; + trade_id?: string; + action?: string; + sub_tag?: string; + subTag?: string; +}): OrderDocument { + const normalizedStatus = normalizeOrderStatus(order.status); + const normalizedAction = normalizeOrderAction(order.action) || inferLifecycleAction(undefined, order.side); + const payload: OrderDocument = { + ...(buildBaseDocument('trade_order', { + user_id: String(order.user_id || '').trim(), + profile_id: toOptionalString(order.profile_id), + order_id: toOptionalString(order.order_id), + symbol: String(order.symbol || '').trim(), + type: normalizeOrderType(order.type), + side: normalizeTradeSide(order.side), + qty: clampNumber(order.qty), + quantity: clampNumber(order.qty), + price: clampNumber(order.price), + status: normalizedStatus, + timestamp: clampNumber(order.timestamp, Date.now()), + stop_loss: toOptionalNumber(order.stop_loss), + take_profit: toOptionalNumber(order.take_profit), + trade_id: toOptionalString(order.trade_id), + action: normalizedAction, + sub_tag: resolvePersistedOrderSubTag(order), + }, buildDocId('trade_order', order.profile_id, order.order_id, order.trade_id, randomUUID()))) as OrderDocument, + type: 'trade_order' + }; -export const getFilledLifecycleOrdersGlobal = (...args: Parameters) => - supabaseService.getFilledLifecycleOrdersGlobal(...args); + return payload; +} -export const insertReconciliationBackfillAuditRows = (...args: Parameters) => - supabaseService.insertReconciliationBackfillAuditRows(...args); +async function queryOrders(params?: { + userId?: string; + profileId?: string; + orderId?: string; + tradeId?: string; + symbol?: string; + symbols?: string[]; + statuses?: string[]; + action?: string; + requireSubTag?: boolean; + orderBy?: 'created_at' | 'updated_at' | 'timestamp'; + ascending?: boolean; + limit?: number; +}): Promise { + ensureCosmos(); -export const upsertReconciliationBackfillOrders = (...args: Parameters) => - supabaseService.upsertReconciliationBackfillOrders(...args); + const filters = ['c.productId = @productId', 'c.type = @type']; + const parameters: Array<{ name: string; value: unknown }> = [ + { name: '@productId', value: config.PRODUCT_ID }, + { name: '@type', value: 'trade_order' }, + ]; -export const getReconciliationBackfillAuditRows = (...args: Parameters) => - supabaseService.getReconciliationBackfillAuditRows(...args); + if (params?.userId) { + filters.push('c.user_id = @userId'); + parameters.push({ name: '@userId', value: params.userId }); + } + if (params?.profileId) { + filters.push('c.profile_id = @profileId'); + parameters.push({ name: '@profileId', value: params.profileId }); + } + if (params?.orderId) { + filters.push('(c.order_id = @orderId OR c.id = @orderId)'); + parameters.push({ name: '@orderId', value: params.orderId }); + } + if (params?.tradeId) { + filters.push('c.trade_id = @tradeId'); + parameters.push({ name: '@tradeId', value: params.tradeId }); + } + if (params?.symbol) { + filters.push('c.symbol = @symbol'); + parameters.push({ name: '@symbol', value: params.symbol }); + } + if (params?.symbols && params.symbols.length > 0) { + filters.push('ARRAY_CONTAINS(@symbols, c.symbol)'); + parameters.push({ name: '@symbols', value: params.symbols }); + } + if (params?.statuses && params.statuses.length > 0) { + filters.push('ARRAY_CONTAINS(@statuses, c.status)'); + parameters.push({ name: '@statuses', value: params.statuses.map((status) => normalizeOrderStatus(status)) }); + } + if (params?.action) { + filters.push('c.action = @action'); + parameters.push({ name: '@action', value: normalizeOrderAction(params.action) || params.action }); + } + if (params?.requireSubTag) { + filters.push('IS_DEFINED(c.sub_tag) AND c.sub_tag != ""'); + } -export const getReconciliationBackfillBatchSummaries = (...args: Parameters) => - supabaseService.getReconciliationBackfillBatchSummaries(...args); + const orderBy = params?.orderBy || 'created_at'; + const direction = params?.ascending ? 'ASC' : 'DESC'; + const limit = Math.max(1, Math.min(10_000, Math.floor(Number(params?.limit || 1000)))); -export const revertBackfillBatch = (...args: Parameters) => - supabaseService.revertBackfillBatch(...args); + const query = `SELECT TOP ${limit} * FROM c WHERE ${filters.join(' AND ')} ORDER BY c.${orderBy} ${direction}`; + return await queryDocuments(ORDER_CONTAINER, query, parameters); +} -export const repairMissingSubTagsForProfile = (...args: Parameters) => - supabaseService.repairMissingSubTagsForProfile(...args); +async function upsertOrderDocument(document: OrderDocument): Promise { + const now = nowIso(); + return await upsertDocument(ORDER_CONTAINER, { + ...document, + updated_at: now, + created_at: String(document.created_at || now), + }); +} -export const getStaleOrders = (...args: Parameters) => - supabaseService.getStaleOrders(...args); +async function queryTradeHistory(params?: { + userId?: string; + profileId?: string; + tradeId?: string; + symbol?: string; + limit?: number; +}): Promise { + ensureCosmos(); -export const getExpiredOrUnknownOrders = (...args: Parameters) => - supabaseService.getExpiredOrUnknownOrders(...args); + const filters = ['c.productId = @productId', 'c.type = @type']; + const parameters: Array<{ name: string; value: unknown }> = [ + { name: '@productId', value: config.PRODUCT_ID }, + { name: '@type', value: 'trade_history' }, + ]; -export const isReconciliationBackfillAuditAvailable = (...args: Parameters) => - supabaseService.isReconciliationBackfillAuditAvailable(...args); + if (params?.userId) { + filters.push('c.user_id = @userId'); + parameters.push({ name: '@userId', value: params.userId }); + } + if (params?.profileId) { + filters.push('c.profile_id = @profileId'); + parameters.push({ name: '@profileId', value: params.profileId }); + } + if (params?.tradeId) { + filters.push('c.trade_id = @tradeId'); + parameters.push({ name: '@tradeId', value: params.tradeId }); + } + if (params?.symbol) { + filters.push('c.symbol = @symbol'); + parameters.push({ name: '@symbol', value: params.symbol }); + } + + const limit = Math.max(1, Math.min(10_000, Math.floor(Number(params?.limit || 5000)))); + const query = `SELECT TOP ${limit} * FROM c WHERE ${filters.join(' AND ')} ORDER BY c.created_at DESC`; + return await queryDocuments(TRADE_HISTORY_CONTAINER, query, parameters); +} + +async function saveTradeHistoryDocument(transaction: { + user_id: string; + profile_id?: string; + symbol: string; + side: string; + entry_price: number; + exit_price: number; + size: number; + pnl: number; + pnl_percent: number; + reason: string; + timestamp: number; + stop_loss?: number; + take_profit?: number; + rules_metadata?: Record; + trade_id?: string; + source?: 'BOT' | 'MANUAL'; +}): Promise { + ensureCosmos(); + + const document = buildBaseDocument('trade_history', { + user_id: transaction.user_id, + profile_id: toOptionalString(transaction.profile_id), + symbol: String(transaction.symbol || '').trim(), + side: normalizeTradeSide(transaction.side), + entry_price: clampNumber(transaction.entry_price), + exit_price: clampNumber(transaction.exit_price), + size: clampNumber(transaction.size), + pnl: clampNumber(transaction.pnl), + pnl_percent: clampNumber(transaction.pnl_percent), + reason: String(transaction.reason || '').trim(), + timestamp: clampNumber(transaction.timestamp, Date.now()), + stop_loss: toOptionalNumber(transaction.stop_loss), + take_profit: toOptionalNumber(transaction.take_profit), + rules_metadata: transaction.rules_metadata, + trade_id: toOptionalString(transaction.trade_id), + source: transaction.source || 'BOT', + }, buildDocId('trade_history', transaction.profile_id, transaction.trade_id, transaction.timestamp, randomUUID())); + + return await upsertDocument(TRADE_HISTORY_CONTAINER, document as TradeHistoryDocument); +} + +async function queryAuditDocuments(params?: { + batchId?: string; + profileId?: string; + symbol?: string; + decisions?: string[]; + limit?: number; +}): Promise { + ensureCosmos(); + const filters = ['c.productId = @productId', 'c.type = @type']; + const parameters: Array<{ name: string; value: unknown }> = [ + { name: '@productId', value: config.PRODUCT_ID }, + { name: '@type', value: 'reconciliation_backfill_audit' }, + ]; + + if (params?.batchId) { + filters.push('c.batch_id = @batchId'); + parameters.push({ name: '@batchId', value: params.batchId }); + } + if (params?.profileId) { + filters.push('c.profile_id = @profileId'); + parameters.push({ name: '@profileId', value: params.profileId }); + } + if (params?.symbol) { + filters.push('c.symbol = @symbol'); + parameters.push({ name: '@symbol', value: params.symbol }); + } + if (params?.decisions && params.decisions.length > 0) { + filters.push('ARRAY_CONTAINS(@decisions, c.decision)'); + parameters.push({ name: '@decisions', value: params.decisions }); + } + + const limit = Math.max(1, Math.min(10_000, Math.floor(Number(params?.limit || 5000)))); + const query = `SELECT TOP ${limit} * FROM c WHERE ${filters.join(' AND ')} ORDER BY c.created_at DESC`; + return await queryDocuments(RECONCILIATION_AUDIT_CONTAINER, query, parameters); +} + +export async function logTransaction(transaction: { + user_id: string; + profile_id?: string; + symbol: string; + side: string; + entry_price: number; + exit_price: number; + size: number; + pnl: number; + pnl_percent: number; + reason: string; + timestamp: number; + stop_loss?: number; + take_profit?: number; + rules_metadata?: Record; + trade_id?: string; + source?: 'BOT' | 'MANUAL'; +}) { + try { + await saveTradeHistoryDocument(transaction); + logger.info(`Logged trade history for ${transaction.user_id} (${transaction.symbol}) to Cosmos`); + } catch (error: any) { + logger.error(`[RuntimeOrderRepo] Trade history persistence failed: ${error.message}`); + } +} + +export async function logOrder(order: { + user_id: string; + profile_id?: string; + order_id?: string; + symbol: string; + type: string; + side: string; + qty: number; + price: number; + status: string; + timestamp: number; + stop_loss?: number; + take_profit?: number; + trade_id?: string; + action?: string; + sub_tag?: string; + subTag?: string; +}) { + try { + ensureCosmos(); + const incoming = normalizeOrderDocument(order); + const existingRows = incoming.order_id + ? await queryOrders({ + orderId: String(incoming.order_id), + profileId: String(incoming.profile_id || '').trim() || undefined, + limit: 1 + }) + : []; + const existing = existingRows[0]; + + if (!existing) { + await upsertOrderDocument(incoming); + return; + } + + const existingStatus = normalizeOrderStatus(String(existing.status || 'pending_new')); + const mergedStatus = pickMostReliableOrderStatus(existingStatus, String(incoming.status || 'pending_new')); + const mergedQty = orderStatusRank(existingStatus) > orderStatusRank(String(incoming.status || '')) + ? clampNumber(existing.qty || existing.quantity) + : clampNumber(incoming.qty || incoming.quantity); + const mergedPrice = orderStatusRank(existingStatus) > orderStatusRank(String(incoming.status || '')) + ? clampNumber(existing.price) + : clampNumber(incoming.price); + const mergedTimestamp = Math.max(orderTimestamp(existing, 0), orderTimestamp(incoming, 0)); + const mergedSubTag = toOptionalString(incoming.sub_tag) || toOptionalString(existing.sub_tag); + + await upsertOrderDocument({ + ...existing, + ...incoming, + id: existing.id, + status: mergedStatus, + qty: mergedQty, + quantity: mergedQty, + price: mergedPrice, + timestamp: mergedTimestamp, + filled_at: incoming.filled_at || existing.filled_at, + sub_tag: mergedSubTag, + }); + } catch (error: any) { + logger.error(`[RuntimeOrderRepo] Order persistence failed: ${error.message}`); + } +} + +export async function getOpenOrdersForProfile(profileId: string): Promise { + if (!profileId) return []; + return await queryOrders({ + profileId, + statuses: OPEN_ORDER_STATUSES, + orderBy: 'created_at', + ascending: true, + limit: 2000 + }); +} + +export async function getRecentlyClosedOrdersForProfile(profileId: string, minutes: number = 10): Promise { + if (!profileId) return []; + const sinceMs = Date.now() - Math.max(1, Math.floor(minutes)) * 60 * 1000; + const rows = await queryOrders({ + profileId, + statuses: CLOSED_ORDER_STATUSES, + orderBy: 'updated_at', + ascending: true, + limit: 2000 + }); + return rows.filter((row) => toTimestampMs(row.updated_at, 0) >= sinceMs); +} + +export async function getPendingOrdersForProfile(profileId: string): Promise { + if (!profileId) return []; + return await queryOrders({ profileId, statuses: ['pending_new'], limit: 1000 }); +} + +export async function getLatestOrder(userId: string, symbol: string): Promise { + const rows = await queryOrders({ + userId, + symbol, + orderBy: 'timestamp', + ascending: false, + limit: 1 + }); + return rows[0] || null; +} + +export async function getOrderByTradeId(tradeId: string, profileId?: string): Promise { + const rows = await queryOrders({ + tradeId: String(tradeId || '').trim(), + profileId: String(profileId || '').trim() || undefined, + orderBy: 'created_at', + ascending: false, + limit: 1 + }); + const row = rows[0]; + if (!row) return null; + return { + order_id: row.order_id, + status: row.status, + qty: row.qty, + price: row.price, + symbol: row.symbol, + action: row.action, + stop_loss: row.stop_loss, + take_profit: row.take_profit, + }; +} + +export async function getLatestEntryOrder(profileId: string | undefined, symbol: string, userId?: string): Promise { + const rows = await queryOrders({ + profileId: String(profileId || '').trim() || undefined, + userId: String(userId || '').trim() || undefined, + symbol, + action: 'ENTRY', + orderBy: 'created_at', + ascending: false, + limit: 1 + }); + return rows[0] || null; +} + +export async function getLatestFilledEntry(userId: string, symbol: string, profileId?: string): Promise { + const rows = await queryOrders({ + profileId: String(profileId || '').trim() || undefined, + userId: profileId ? undefined : userId, + symbol, + action: 'ENTRY', + statuses: ['filled', 'partially_filled'], + orderBy: 'timestamp', + ascending: false, + limit: 1 + }); + return rows[0] || null; +} + +export async function getLatestEntryRiskOrder(profileId: string, symbol: string, side?: 'BUY' | 'SELL'): Promise { + const rows = await queryOrders({ + profileId, + symbol, + action: 'ENTRY', + statuses: ['filled', 'partially_filled'], + orderBy: 'created_at', + ascending: false, + limit: 250 + }); + return rows.find((row) => { + if (side && normalizeTradeSide(String(row.side || 'BUY')) !== side) return false; + return clampNumber(row.stop_loss) > 0 || clampNumber(row.take_profit) > 0; + }) || null; +} + +export async function hasActiveOrderForTradeId(tradeId: string, profileId?: string): Promise { + const rows = await queryOrders({ + tradeId: String(tradeId || '').trim(), + profileId: isUuid(profileId) ? profileId : undefined, + statuses: ['pending_new', 'accepted', 'new', 'partially_filled'], + limit: 1 + }); + return rows.length > 0; +} + +export async function hasFinalizedTradeHistory(tradeId: string, profileId?: string, symbol?: string): Promise { + const rows = await queryTradeHistory({ + tradeId: String(tradeId || '').trim(), + profileId: String(profileId || '').trim() || undefined, + symbol, + limit: 100 + }); + return rows.some((row) => !String(row.reason || '').toLowerCase().includes('partial exit')); +} + +export async function hasLifecycleEntryOrder(tradeId: string, profileId?: string, symbol?: string): Promise { + const rows = await queryOrders({ + tradeId: String(tradeId || '').trim(), + profileId: isUuid(profileId) ? profileId : undefined, + symbol, + statuses: ['filled', 'partially_filled'], + limit: 250 + }); + return rows.some((row) => inferLifecycleAction(row.action, row.side) === 'ENTRY'); +} + +export async function hasLifecycleEntryOrderWithProfileSubTag(tradeId: string, profileId: string, symbol?: string): Promise { + const rows = await queryOrders({ + tradeId: String(tradeId || '').trim(), + profileId: String(profileId || '').trim(), + symbol, + statuses: ['filled', 'partially_filled'], + action: 'ENTRY', + requireSubTag: true, + limit: 250 + }); + + return rows.some((row) => { + const subTag = String(row.sub_tag || '').trim(); + return Boolean(subTag && isBytelystSubTag(subTag) && subTagBelongsToProfile(subTag, profileId)); + }); +} + +export async function isTradeLifecycleClosed(tradeId: string, profileId?: string, symbol?: string): Promise { + const rows = await queryOrders({ + tradeId: String(tradeId || '').trim(), + profileId: isUuid(profileId) ? profileId : undefined, + symbol, + statuses: ['filled', 'partially_filled'], + limit: 2000 + }); + + let entryQty = 0; + let exitQty = 0; + for (const row of rows) { + const qty = clampNumber(row.qty || row.quantity); + if (!(qty > 0)) continue; + const action = inferLifecycleAction(row.action, row.side); + if (action === 'ENTRY') entryQty += qty; + if (action === 'EXIT') exitQty += qty; + } + if (entryQty > 0 && exitQty >= entryQty - 1e-8) { + return true; + } + + const historyRows = await queryTradeHistory({ + tradeId: String(tradeId || '').trim(), + profileId: isUuid(profileId) ? profileId : undefined, + symbol, + limit: 250 + }); + if (historyRows.length === 0) return false; + + let finalizedRows = 0; + let partialExitQty = 0; + for (const row of historyRows) { + const reason = String(row.reason || '').toLowerCase(); + const size = clampNumber(row.size); + if (reason.includes('partial exit')) { + partialExitQty += size; + continue; + } + finalizedRows += 1; + } + return finalizedRows > 0 || (entryQty > 0 && partialExitQty >= entryQty - 1e-8); +} + +export async function getExistingOrderIds(orderIds: string[], profileId?: string): Promise> { + const normalizedIds = Array.from(new Set(orderIds.map((id) => String(id || '').trim()).filter(Boolean))); + if (normalizedIds.length === 0) return new Set(); + + const rows = await queryOrders({ + profileId: isUuid(profileId) ? profileId : undefined, + limit: Math.max(normalizedIds.length, 100) + }); + + const found = new Set(); + for (const row of rows) { + const orderId = String(row.order_id || '').trim(); + if (orderId && normalizedIds.includes(orderId)) { + found.add(orderId); + } + } + return found; +} + +export async function getKnownTradeIdsForProfile(profileId: string, limit: number = 2000): Promise { + const rows = await queryOrders({ + profileId, + orderBy: 'created_at', + ascending: false, + limit + }); + const tradeIds = new Set(); + for (const row of rows) { + const tradeId = String(row.trade_id || '').trim(); + if (tradeId) { + tradeIds.add(tradeId); + } + } + return Array.from(tradeIds).slice(0, Math.max(1, Math.min(10000, Math.floor(limit)))); +} + +export async function updateOrderStatus(orderId: string, status: string, filledAt?: Date, price?: number, qty?: number) { + const rows = await queryOrders({ orderId: String(orderId || '').trim(), limit: 2 }); + const timestamp = nowIso(); + await Promise.all(rows.map(async (row) => { + await upsertOrderDocument({ + ...row, + status: normalizeOrderStatus(status), + updated_at: timestamp, + filled_at: filledAt ? filledAt.toISOString() : row.filled_at, + price: price && price > 0 ? price : row.price, + qty: qty && qty > 0 ? qty : row.qty, + quantity: qty && qty > 0 ? qty : (row.quantity || row.qty), + }); + })); +} + +async function fetchFilledLifecycleOrders(options: { + userId?: string; + profileId?: string; + symbols?: string[]; + maxRows?: number; +}): Promise<{ rows: FilledLifecycleOrderRow[]; truncated: boolean }> { + const limit = Math.max(1000, Math.min(200_000, Math.floor(Number(options.maxRows || 50_000)))); + const rows = await queryOrders({ + userId: String(options.userId || '').trim() || undefined, + profileId: String(options.profileId || '').trim() || undefined, + statuses: FILLED_ORDER_STATUSES, + limit + }); + const safeSymbols = Array.isArray(options.symbols) ? new Set(options.symbols.filter(Boolean)) : null; + const filtered = safeSymbols + ? rows.filter((row) => safeSymbols.has(String(row.symbol || ''))) + : rows; + return { + rows: sortByCreatedAtAsc(filtered), + truncated: filtered.length >= limit + }; +} + +export async function getFilledLifecycleOrdersForProfile(profileId: string, symbols?: string[]): Promise { + return (await fetchFilledLifecycleOrders({ profileId, symbols })).rows; +} + +export async function getFilledLifecycleOrdersForUser(options: { + userId: string; + profileId?: string; + symbols?: string[]; + maxRows?: number; +}): Promise<{ rows: FilledLifecycleOrderRow[]; truncated: boolean }> { + return await fetchFilledLifecycleOrders(options); +} + +export async function getFilledLifecycleOrdersGlobal(options?: { + profileId?: string; + symbols?: string[]; + maxRows?: number; +}): Promise<{ rows: FilledLifecycleOrderRow[]; truncated: boolean }> { + return await fetchFilledLifecycleOrders(options || {}); +} + +export async function insertReconciliationBackfillAuditRows(rows: ReconciliationBackfillAuditInsert[]): Promise { + try { + ensureCosmos(); + await Promise.all(rows.map(async (row, index) => { + const doc = buildBaseDocument('reconciliation_backfill_audit', { + ...row, + created_at: row.applied_at || row.reverted_at || nowIso(), + updated_at: row.reverted_at || row.applied_at || nowIso(), + id: String(index + 1), + }, buildDocId('reconciliation_audit', row.batch_id, row.trade_id, row.exchange_order_id, index)); + await upsertDocument(RECONCILIATION_AUDIT_CONTAINER, doc as ReconciliationAuditDocument); + })); + return true; + } catch (error: any) { + logger.error(`[RuntimeOrderRepo] Failed to save reconciliation audit rows: ${error.message}`); + return false; + } +} + +export async function upsertReconciliationBackfillOrders(rows: ReconciliationBackfillOrderInsert[]): Promise { + try { + await Promise.all(rows.map(async (row) => { + await logOrder({ + user_id: row.user_id, + profile_id: row.profile_id, + order_id: row.order_id, + symbol: row.symbol, + type: row.type, + side: row.side, + qty: row.qty || row.quantity, + price: row.price, + status: row.status, + timestamp: row.timestamp, + trade_id: row.trade_id, + action: row.action, + sub_tag: row.sub_tag, + }); + if (row.filled_at) { + await updateOrderStatus(row.order_id, row.status, new Date(row.filled_at), row.price, row.qty || row.quantity); + } + })); + return true; + } catch (error: any) { + logger.error(`[RuntimeOrderRepo] Failed to upsert reconciliation backfill orders: ${error.message}`); + return false; + } +} + +export async function getReconciliationBackfillAuditRows(query: ReconciliationBackfillAuditQuery): Promise<{ rows: ReconciliationBackfillAuditRow[]; totalCount: number }> { + const rows = await queryAuditDocuments({ + batchId: query.batchId, + profileId: query.profileId, + symbol: query.symbol, + decisions: query.decisions, + limit: Math.max(1, Math.min(5000, Math.floor(Number(query.limit || 500)))) + }); + + const filtered = rows.filter((row) => { + const createdAt = toTimestampMs(row.created_at, 0); + if (query.fromIso && createdAt < Date.parse(query.fromIso)) return false; + if (query.toIso && createdAt > Date.parse(query.toIso)) return false; + return true; + }); + + const offset = Math.max(0, Math.floor(Number(query.offset || 0))); + const limit = Math.max(1, Math.min(5000, Math.floor(Number(query.limit || 500)))); + return { + rows: filtered.slice(offset, offset + limit).map((row, index) => ({ + ...row, + id: Number.parseInt(String(row.id || index + 1), 10) || (index + 1) + })), + totalCount: filtered.length + }; +} + +export async function getReconciliationBackfillBatchSummaries(query?: { + profileId?: string; + symbol?: string; + fromIso?: string; + toIso?: string; + limit?: number; +}): Promise { + const rows = await queryAuditDocuments({ + profileId: query?.profileId, + symbol: query?.symbol, + limit: 10_000 + }); + + const summaryByBatch = new Map(); + for (const row of rows) { + const batchId = String(row.batch_id || '').trim(); + if (!batchId) continue; + const createdAt = String(row.created_at || nowIso()); + const existing = summaryByBatch.get(batchId) || { + batchId, + firstSeenAt: createdAt, + lastSeenAt: createdAt, + profileIds: [], + symbols: [], + totalRows: 0, + byDecision: {}, + dryRunRows: 0, + appliedRows: 0, + revertedRows: 0, + }; + existing.firstSeenAt = existing.firstSeenAt < createdAt ? existing.firstSeenAt : createdAt; + existing.lastSeenAt = existing.lastSeenAt > createdAt ? existing.lastSeenAt : createdAt; + if (row.profile_id && !existing.profileIds.includes(row.profile_id)) existing.profileIds.push(row.profile_id); + if (row.symbol && !existing.symbols.includes(row.symbol)) existing.symbols.push(row.symbol); + existing.totalRows += 1; + existing.byDecision[row.decision] = (existing.byDecision[row.decision] || 0) + 1; + if (row.dry_run) existing.dryRunRows += 1; + if (row.applied_at) existing.appliedRows += 1; + if (row.reverted_at) existing.revertedRows += 1; + summaryByBatch.set(batchId, existing); + } + + const fromMs = query?.fromIso ? Date.parse(query.fromIso) : 0; + const toMs = query?.toIso ? Date.parse(query.toIso) : Number.POSITIVE_INFINITY; + + const summaries = Array.from(summaryByBatch.values()) + .filter((summary) => { + const lastSeen = Date.parse(summary.lastSeenAt); + return lastSeen >= fromMs && lastSeen <= toMs; + }) + .sort((a, b) => Date.parse(b.lastSeenAt) - Date.parse(a.lastSeenAt)); + + return summaries.slice(0, Math.max(1, Math.min(1000, Math.floor(Number(query?.limit || 200))))); +} + +export async function revertBackfillBatch(batchId: string): Promise<{ reverted: number; errors: string[] }> { + const rows = await queryAuditDocuments({ batchId: String(batchId || '').trim(), limit: 5000 }); + const errors: string[] = []; + let reverted = 0; + + await Promise.all(rows.map(async (row) => { + const orderId = String(row.backfill_order_id || '').trim(); + if (!orderId) return; + try { + await updateOrderStatus(orderId, 'canceled'); + await upsertDocument(RECONCILIATION_AUDIT_CONTAINER, { + ...row, + reverted_at: nowIso(), + updated_at: nowIso(), + }); + reverted += 1; + } catch (error: any) { + errors.push(`${orderId}: ${error.message}`); + } + })); + + return { reverted, errors }; +} + +export async function repairMissingSubTagsForProfile(options: { + profileId: string; + lookbackHours: number; + maxRows: number; + dryRun: boolean; +}): Promise { + const summary: ReconciliationSubTagRepairSummary = { + attempted: true, + scannedRows: 0, + eligibleRows: 0, + updatedRows: 0, + skippedNoProfile: 0, + skippedNoTrade: 0, + skippedTagDisabled: 0, + skippedAlreadyTagged: 0, + dryRun: Boolean(options.dryRun) + }; + + if (!shouldAttachAlpacaSubTag({ profileId: options.profileId })) { + return { + ...summary, + unsupported: true + }; + } + + const sinceMs = Date.now() - Math.max(1, Math.floor(options.lookbackHours || 720)) * 60 * 60 * 1000; + const rows = await queryOrders({ + profileId: options.profileId, + limit: Math.max(1, Math.min(5000, Math.floor(Number(options.maxRows || 500)))) + }); + + const candidates = rows.filter((row) => { + const createdAt = toTimestampMs(row.created_at, 0); + return createdAt >= sinceMs && !String(row.sub_tag || '').trim() && String(row.source || '').trim() !== 'MANUAL'; + }); + summary.scannedRows = candidates.length; + + for (const row of candidates) { + const profileId = String(row.profile_id || '').trim(); + if (!profileId) { + summary.skippedNoProfile += 1; + continue; + } + const tradeId = String(row.trade_id || '').trim(); + if (!tradeId) { + summary.skippedNoTrade += 1; + continue; + } + if (String(row.sub_tag || '').trim()) { + summary.skippedAlreadyTagged += 1; + continue; + } + const derived = resolvePersistedOrderSubTag({ + profile_id: profileId, + trade_id: tradeId, + action: String(row.action || ''), + }); + if (!derived) { + summary.skippedTagDisabled += 1; + continue; + } + summary.eligibleRows += 1; + if (summary.dryRun) continue; + + await upsertOrderDocument({ + ...row, + sub_tag: derived, + updated_at: nowIso(), + }); + summary.updatedRows += 1; + } + + return summary; +} + +export async function getStaleOrders(staleThresholdMinutes: number = 5, scope?: string | StaleOrderScope): Promise { + const scopeObject: StaleOrderScope = typeof scope === 'string' ? { profileId: scope } : (scope || {}); + const threshold = Date.now() - staleThresholdMinutes * 60 * 1000; + const rows = await queryOrders({ + profileId: String(scopeObject.profileId || '').trim() || undefined, + userId: String(scopeObject.userId || '').trim() || undefined, + statuses: ['pending_new', 'pending', 'accepted', 'new'], + orderBy: 'created_at', + ascending: true, + limit: 250 + }); + + return rows.filter((row) => { + const createdAt = toTimestampMs(row.created_at, 0); + if (!(createdAt > 0 && createdAt < threshold)) return false; + if (scopeObject.profileNullOnly && row.profile_id) return false; + if (scopeObject.includeOrphanUserOrders && scopeObject.profileId) { + return row.profile_id === scopeObject.profileId || (!row.profile_id && row.user_id === scopeObject.userId); + } + return true; + }); +} + +export async function getExpiredOrUnknownOrders(): Promise { + return await queryOrders({ + statuses: ['expired', 'unknown'], + limit: 2000 + }); +} + +export async function isReconciliationBackfillAuditAvailable(): Promise { + return cosmosEnabled(); +} + +export async function getVirtualOpenPosition(profileId: string, symbol: string): Promise { + const symbolCandidates = buildLifecycleSymbolCandidates(symbol); + if (!symbolCandidates.length) return null; + + const rows = await queryOrders({ + profileId, + symbols: symbolCandidates, + statuses: ['filled', 'partially_filled'], + limit: 5000 + }); + + type TradeLedger = { + tradeId: string; + side: 'BUY' | 'SELL'; + entryQty: number; + entryNotional: number; + entryLastPrice: number; + exitQty: number; + userId?: string; + stopLoss: number; + takeProfit: number; + lastTs: number; + }; + + type SideAggregate = { + side: 'BUY' | 'SELL'; + qty: number; + notional: number; + userId?: string; + stopLoss: number; + takeProfit: number; + tradeIds: string[]; + primaryTradeId: string; + primaryTs: number; + }; + + const orderedRows = rows + .map((row, index) => ({ row, index, ts: orderTimestamp(row, index) })) + .sort((a, b) => (a.ts - b.ts) || (a.index - b.index)); + + const ledgerByTrade = new Map(); + const entrySideByTrade = new Map(); + const openTradeQueueBySide: Record<'BUY' | 'SELL', string[]> = { BUY: [], SELL: [] }; + const normalizeToken = (value: string): string => value.replace(/[^A-Za-z0-9]/g, '').slice(0, 24) || 'token'; + const profileToken = normalizeToken(profileId); + const symbolToken = normalizeToken(symbol); + let syntheticCounter = 0; + + const buildSyntheticTradeId = (side: 'BUY' | 'SELL', ts: number): string => { + syntheticCounter += 1; + const tsToken = Number.isFinite(ts) && ts > 0 ? Math.trunc(ts) : syntheticCounter; + return `__legacy__-${profileToken}-${symbolToken}-${side}-${tsToken}-${String(syntheticCounter).padStart(4, '0')}`; + }; + + for (const { row, ts } of orderedRows) { + const qty = clampNumber(row.qty || row.quantity); + if (qty <= 0) continue; + + const rowSide = normalizeTradeSide(row.side || 'BUY'); + const rawTradeId = String(row.trade_id || '').trim(); + const oppositeSide: 'BUY' | 'SELL' = rowSide === 'BUY' ? 'SELL' : 'BUY'; + const explicitAction = normalizeOrderAction(row.action || undefined); + let tradeId = rawTradeId; + let action = explicitAction; + + if (!action && !tradeId) { + action = openTradeQueueBySide[oppositeSide].length > 0 ? 'EXIT' : 'ENTRY'; + } + + if (!tradeId) { + if (action === 'EXIT' && openTradeQueueBySide[oppositeSide].length > 0) { + tradeId = openTradeQueueBySide[oppositeSide][0]; + } else { + tradeId = buildSyntheticTradeId(action === 'EXIT' ? oppositeSide : rowSide, ts); + } + } + + if (!action) { + const knownEntrySide = entrySideByTrade.get(tradeId); + action = knownEntrySide ? (rowSide === knownEntrySide ? 'ENTRY' : 'EXIT') : inferLifecycleAction(undefined, row.side); + } + + let ledger = ledgerByTrade.get(tradeId); + if (!ledger) { + ledger = { + tradeId, + side: rowSide, + entryQty: 0, + entryNotional: 0, + entryLastPrice: 0, + exitQty: 0, + userId: toOptionalString(row.user_id), + stopLoss: 0, + takeProfit: 0, + lastTs: ts + }; + ledgerByTrade.set(tradeId, ledger); + } + + if (action === 'ENTRY') { + if (ledger.entryQty === 0) ledger.side = rowSide; + ledger.entryQty += qty; + entrySideByTrade.set(tradeId, ledger.side); + if (!openTradeQueueBySide[ledger.side].includes(tradeId)) { + openTradeQueueBySide[ledger.side].push(tradeId); + } + const price = clampNumber(row.price); + if (price > 0) { + ledger.entryNotional += price * qty; + ledger.entryLastPrice = price; + } + const stopLoss = clampNumber(row.stop_loss); + const takeProfit = clampNumber(row.take_profit); + if (stopLoss > 0) ledger.stopLoss = stopLoss; + if (takeProfit > 0) ledger.takeProfit = takeProfit; + } else { + ledger.exitQty += qty; + const queue = openTradeQueueBySide[oppositeSide]; + const idx = queue.findIndex((value) => value === tradeId); + if (idx >= 0) queue.splice(idx, 1); + else if (queue.length > 0) queue.shift(); + } + if (row.user_id) ledger.userId = String(row.user_id); + ledger.lastTs = Math.max(ledger.lastTs, ts); + } + + const aggregateBySide = new Map<'BUY' | 'SELL', SideAggregate>(); + for (const tradeLedger of ledgerByTrade.values()) { + const remainingQty = tradeLedger.entryQty - tradeLedger.exitQty; + if (remainingQty <= 1e-8) continue; + const weightedEntryPrice = tradeLedger.entryQty > 0 && tradeLedger.entryNotional > 0 + ? tradeLedger.entryNotional / tradeLedger.entryQty + : tradeLedger.entryLastPrice; + if (!(weightedEntryPrice > 0)) continue; + + const normalizedTradeId = tradeLedger.tradeId.startsWith('__legacy__-') + ? `TRD-LEGACY-${tradeLedger.tradeId.slice('__legacy__-'.length)}` + : tradeLedger.tradeId; + + let aggregate = aggregateBySide.get(tradeLedger.side); + if (!aggregate) { + aggregate = { + side: tradeLedger.side, + qty: 0, + notional: 0, + userId: tradeLedger.userId, + stopLoss: tradeLedger.stopLoss, + takeProfit: tradeLedger.takeProfit, + tradeIds: [], + primaryTradeId: normalizedTradeId, + primaryTs: tradeLedger.lastTs + }; + aggregateBySide.set(tradeLedger.side, aggregate); + } + aggregate.qty += remainingQty; + aggregate.notional += remainingQty * weightedEntryPrice; + if (!aggregate.tradeIds.includes(normalizedTradeId)) aggregate.tradeIds.push(normalizedTradeId); + if (tradeLedger.lastTs >= aggregate.primaryTs) { + aggregate.primaryTs = tradeLedger.lastTs; + aggregate.primaryTradeId = normalizedTradeId; + aggregate.userId = tradeLedger.userId || aggregate.userId; + if (tradeLedger.stopLoss > 0) aggregate.stopLoss = tradeLedger.stopLoss; + if (tradeLedger.takeProfit > 0) aggregate.takeProfit = tradeLedger.takeProfit; + } + } + + let dominant: SideAggregate | null = null; + for (const candidate of aggregateBySide.values()) { + if (!dominant || candidate.qty > dominant.qty) { + dominant = candidate; + } + } + if (!dominant || dominant.qty <= 1e-8) return null; + + const entryPrice = dominant.notional / dominant.qty; + return { + profileId, + symbol, + side: dominant.side, + qty: Number(dominant.qty.toFixed(8)), + entryPrice: Number(entryPrice.toFixed(8)), + stopLoss: Number((dominant.stopLoss || 0).toFixed(8)), + takeProfit: Number((dominant.takeProfit || 0).toFixed(8)), + userId: dominant.userId, + tradeId: dominant.primaryTradeId, + tradeIds: dominant.tradeIds + }; +} + +export async function getVirtualOpenPositionForTrade(profileId: string, symbol: string, tradeId: string): Promise { + const symbolCandidates = buildLifecycleSymbolCandidates(symbol); + if (!symbolCandidates.length) return null; + const rows = await queryOrders({ + profileId, + tradeId: String(tradeId || '').trim(), + symbols: symbolCandidates, + statuses: ['filled', 'partially_filled'], + orderBy: 'created_at', + ascending: true, + limit: 1000 + }); + if (!rows.length) return null; + + let entrySide: 'BUY' | 'SELL' | null = null; + let entryQty = 0; + let entryNotional = 0; + let exitQty = 0; + let stopLoss = 0; + let takeProfit = 0; + let userId: string | undefined; + + for (const row of sortByCreatedAtAsc(rows)) { + const qty = clampNumber(row.qty || row.quantity); + if (qty <= 0) continue; + + const side = normalizeTradeSide(row.side || 'BUY'); + let action = normalizeOrderAction(row.action || undefined); + if (!action) { + action = entrySide ? (side === entrySide ? 'ENTRY' : 'EXIT') : inferLifecycleAction(undefined, row.side); + } + + if (action === 'ENTRY') { + if (!entrySide) entrySide = side; + entryQty += qty; + const price = clampNumber(row.price); + if (price > 0) entryNotional += price * qty; + const sl = clampNumber(row.stop_loss); + const tp = clampNumber(row.take_profit); + if (sl > 0) stopLoss = sl; + if (tp > 0) takeProfit = tp; + } else { + exitQty += qty; + } + userId = toOptionalString(row.user_id) || userId; + } + + const remainingQty = entryQty - exitQty; + if (!(remainingQty > 1e-8) || !(entryNotional > 0) || !entrySide) return null; + + return { + profileId, + symbol, + side: entrySide, + qty: Number(remainingQty.toFixed(8)), + entryPrice: Number((entryNotional / entryQty).toFixed(8)), + stopLoss: Number((stopLoss || 0).toFixed(8)), + takeProfit: Number((takeProfit || 0).toFixed(8)), + userId, + tradeId: String(tradeId || '').trim(), + tradeIds: [String(tradeId || '').trim()] + }; +} diff --git a/backend/src/services/tradeHistoryRepository.ts b/backend/src/services/tradeHistoryRepository.ts index d0ce806..2f72217 100644 --- a/backend/src/services/tradeHistoryRepository.ts +++ b/backend/src/services/tradeHistoryRepository.ts @@ -1,7 +1,6 @@ import logger from '../utils/logger.js'; -import type { supabaseService } from './SupabaseService.js'; - -type LegacySupabaseService = typeof supabaseService; +import { TRADE_HISTORY_CONTAINER, clampNumber, queryDocuments } from './tradingRecordStore.js'; +import { config } from '../config/index.js'; export interface TradeHistoryKeyRecord { trade_id: string; @@ -25,79 +24,78 @@ export interface TradeHistoryRow { source?: string; } +type TradeHistoryDocument = TradeHistoryRow & { + productId: string; + type: 'trade_history'; + user_id?: string; +}; + const startOfCurrentDayUtc = (): string => { const now = new Date(); return new Date(Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate())).toISOString(); }; -function getClient(legacyService?: LegacySupabaseService) { - return legacyService?.getClient?.() ?? null; +async function listTradeHistoryDocuments(options: { + userId?: string; + profileId?: string; + limit?: number; +}): Promise { + const filters = ['c.productId = @productId', 'c.type = @type']; + const parameters: Array<{ name: string; value: unknown }> = [ + { name: '@productId', value: config.PRODUCT_ID }, + { name: '@type', value: 'trade_history' }, + ]; + + if (options.userId) { + filters.push('c.user_id = @userId'); + parameters.push({ name: '@userId', value: options.userId }); + } + if (options.profileId) { + filters.push('c.profile_id = @profileId'); + parameters.push({ name: '@profileId', value: options.profileId }); + } + + const limit = Math.max(1, Math.min(5000, Math.floor(Number(options.limit || 5000)))); + const query = `SELECT TOP ${limit} * FROM c WHERE ${filters.join(' AND ')} ORDER BY c.created_at DESC`; + return await queryDocuments(TRADE_HISTORY_CONTAINER, query, parameters); } -export async function getProfileDailyNetPnlUsd(profileId: string, legacyService?: LegacySupabaseService): Promise { - const client = getClient(legacyService); - if (!client || !profileId) return 0; - +export async function getProfileDailyNetPnlUsd(profileId: string): Promise { + if (!profileId) return 0; try { - const { data, error } = await client - .from('trade_history') - .select('pnl, created_at') - .eq('profile_id', profileId) - .gte('created_at', startOfCurrentDayUtc()) - .order('created_at', { ascending: false }) - .limit(5000); - - if (error) { - logger.error(`[TradeHistoryRepo] Daily net PnL lookup failed for ${profileId}: ${error.message}`); - return 0; - } - - return (data || []).reduce((sum: number, row: any) => { - const pnl = Number(row?.pnl || 0); - return Number.isFinite(pnl) ? sum + pnl : sum; + const rows = await listTradeHistoryDocuments({ profileId, limit: 5000 }); + const todayIso = startOfCurrentDayUtc(); + return rows.reduce((sum, row) => { + if (String(row.created_at || '') < todayIso) return sum; + return sum + clampNumber(row.pnl); }, 0); } catch (error: any) { - logger.error(`[TradeHistoryRepo] Daily net PnL unexpected error for ${profileId}: ${error.message}`); + logger.error(`[TradeHistoryRepo] Daily net PnL lookup failed for ${profileId}: ${error.message}`); return 0; } } -export async function getProfileDailyLossUsd(profileId: string, legacyService?: LegacySupabaseService): Promise { - const netPnl = await getProfileDailyNetPnlUsd(profileId, legacyService); +export async function getProfileDailyLossUsd(profileId: string): Promise { + const netPnl = await getProfileDailyNetPnlUsd(profileId); return netPnl < 0 ? Math.abs(netPnl) : 0; } -export async function getProfileConsecutiveLosses( - profileId: string, - lookback: number = 100, - legacyService?: LegacySupabaseService -): Promise { - const client = getClient(legacyService); - if (!client || !profileId) return 0; - +export async function getProfileConsecutiveLosses(profileId: string, lookback: number = 100): Promise { + if (!profileId) return 0; try { - const cappedLookback = Math.max(1, Math.min(500, Math.floor(lookback))); - const { data, error } = await client - .from('trade_history') - .select('pnl, created_at') - .eq('profile_id', profileId) - .order('created_at', { ascending: false }) - .limit(cappedLookback); - - if (error) { - logger.error(`[TradeHistoryRepo] Consecutive loss lookup failed for ${profileId}: ${error.message}`); - return 0; - } - + const rows = await listTradeHistoryDocuments({ + profileId, + limit: Math.max(1, Math.min(500, Math.floor(lookback))) + }); let consecutiveLosses = 0; - for (const row of data || []) { - const pnl = Number((row as any)?.pnl || 0); - if (!Number.isFinite(pnl) || pnl >= 0) break; + for (const row of rows) { + const pnl = clampNumber(row.pnl); + if (pnl >= 0) break; consecutiveLosses += 1; } return consecutiveLosses; } catch (error: any) { - logger.error(`[TradeHistoryRepo] Consecutive loss unexpected error for ${profileId}: ${error.message}`); + logger.error(`[TradeHistoryRepo] Consecutive loss lookup failed for ${profileId}: ${error.message}`); return 0; } } @@ -105,34 +103,17 @@ export async function getProfileConsecutiveLosses( export async function listRecentTradeHistoryKeys(options: { userId?: string; limit?: number; - legacyService?: LegacySupabaseService; }): Promise { - const client = getClient(options.legacyService); - if (!client) return []; - try { - let query = client - .from('trade_history') - .select('trade_id,profile_id') - .order('created_at', { ascending: false }) - .limit(Math.max(1, Math.min(5000, Math.floor(Number(options.limit || 5000))))); - - if (options.userId) { - query = query.eq('user_id', options.userId); - } - - const { data, error } = await query; - if (error) { - logger.error(`[TradeHistoryRepo] Recent trade key lookup failed: ${error.message}`); - return []; - } - - return (data || []).map((row: any) => ({ - trade_id: String(row?.trade_id || '').trim(), - profile_id: row?.profile_id ? String(row.profile_id) : null - })).filter((row) => Boolean(row.trade_id)); + const rows = await listTradeHistoryDocuments(options); + return rows + .map((row) => ({ + trade_id: String(row.trade_id || '').trim(), + profile_id: row.profile_id ? String(row.profile_id) : null, + })) + .filter((row) => Boolean(row.trade_id)); } catch (error: any) { - logger.error(`[TradeHistoryRepo] Recent trade key unexpected error: ${error.message}`); + logger.error(`[TradeHistoryRepo] Recent trade key lookup failed: ${error.message}`); return []; } } @@ -140,31 +121,12 @@ export async function listRecentTradeHistoryKeys(options: { export async function listRecentTradeHistory(options: { userId?: string; limit?: number; - legacyService?: LegacySupabaseService; }): Promise { - const client = getClient(options.legacyService); - if (!client) return []; - try { - let query = client - .from('trade_history') - .select('id,timestamp,symbol,side,size,entry_price,exit_price,pnl,pnl_percent,reason,profile_id,created_at,trade_id,source') - .order('created_at', { ascending: false }) - .limit(Math.max(1, Math.min(5000, Math.floor(Number(options.limit || 5000))))); - - if (options.userId) { - query = query.eq('user_id', options.userId); - } - - const { data, error } = await query; - if (error) { - logger.error(`[TradeHistoryRepo] Recent trade history lookup failed: ${error.message}`); - return []; - } - - return (data || []) as TradeHistoryRow[]; + const rows = await listTradeHistoryDocuments(options); + return rows as TradeHistoryRow[]; } catch (error: any) { - logger.error(`[TradeHistoryRepo] Recent trade history unexpected error: ${error.message}`); + logger.error(`[TradeHistoryRepo] Recent trade history lookup failed: ${error.message}`); return []; } } diff --git a/backend/src/services/tradingRecordStore.ts b/backend/src/services/tradingRecordStore.ts new file mode 100644 index 0000000..8f0bcc7 --- /dev/null +++ b/backend/src/services/tradingRecordStore.ts @@ -0,0 +1,85 @@ +import { getContainer } from '@bytelyst/cosmos'; +import { randomUUID } from 'node:crypto'; +import { config } from '../config/index.js'; + +export const ORDER_CONTAINER = 'trade_orders'; +export const TRADE_HISTORY_CONTAINER = 'trade_history'; +export const MANUAL_ENTRY_CONTAINER = 'manual_entries'; +export const RECONCILIATION_AUDIT_CONTAINER = 'reconciliation_backfill_audit'; + +export interface TradingRecordDocument { + id: string; + productId: string; + type: string; + created_at: string; + updated_at: string; +} + +export function isCosmosConfigured(): boolean { + return Boolean(config.COSMOS_ENDPOINT && config.COSMOS_KEY && config.COSMOS_DATABASE); +} + +export function requireCosmosConfigured(): void { + if (!isCosmosConfigured()) { + throw new Error('Cosmos DB is not configured for trading record persistence'); + } +} + +export function nowIso(): string { + return new Date().toISOString(); +} + +export function clampNumber(value: unknown, fallback: number = 0): number { + const parsed = Number(value); + return Number.isFinite(parsed) ? parsed : fallback; +} + +export function toOptionalString(value: unknown): string | undefined { + const text = String(value || '').trim(); + return text || undefined; +} + +export function toOptionalNumber(value: unknown): number | undefined { + const parsed = Number(value); + return Number.isFinite(parsed) ? parsed : undefined; +} + +export function buildDocId(...parts: Array): string { + const normalized = parts + .map((part) => String(part || '').trim()) + .filter(Boolean) + .join('::'); + return normalized || randomUUID(); +} + +export async function queryDocuments(containerName: string, query: string, parameters: Array<{ name: string; value: unknown }>): Promise { + requireCosmosConfigured(); + const container = getContainer(containerName); + const { resources } = await container.items.query({ query, parameters: parameters as any }).fetchAll(); + return resources; +} + +export async function upsertDocument(containerName: string, document: T): Promise { + requireCosmosConfigured(); + const container = getContainer(containerName); + const { resource } = await container.items.upsert(document); + return (resource || document) as T; +} + +export async function deleteDocument(containerName: string, id: string, partitionKey?: string): Promise { + requireCosmosConfigured(); + const container = getContainer(containerName); + await container.item(id, partitionKey || id).delete(); +} + +export function buildBaseDocument(type: string, payload: Record, id?: string): TradingRecordDocument { + const timestamp = nowIso(); + return { + id: id || buildDocId(type, payload['profile_id'], payload['order_id'], payload['trade_id'], payload['user_id'], randomUUID()), + productId: config.PRODUCT_ID, + type, + created_at: String(payload['created_at'] || timestamp), + updated_at: String(payload['updated_at'] || timestamp), + ...payload, + }; +} diff --git a/docs/OPERATIONS.md b/docs/OPERATIONS.md index 1f7f773..b06a8b3 100644 --- a/docs/OPERATIONS.md +++ b/docs/OPERATIONS.md @@ -8,7 +8,7 @@ It covers: - local development setup - verification and CI expectations -- staged cutover from legacy repos +- staged rollout of the new monorepo deployment - rollback rules - release go/no-go checks - post-cutover monitoring @@ -25,7 +25,6 @@ It covers: - access to: - platform-service - Azure Cosmos DB - - optional legacy Supabase project during migration ### Workspace bootstrap @@ -70,18 +69,11 @@ pnpm --filter @bytelyst/trading-mobile dev - `COSMOS_KEY` - `COSMOS_DATABASE` -### Transitional legacy migration support - -- `SUPABASE_URL` -- `SUPABASE_KEY` -- `SUPABASE_JWT_ISSUER` -- `SUPABASE_JWT_AUDIENCE` - Rule: -- platform-service and Cosmos are the target system -- Supabase remains transitional only where trading persistence has not yet been migrated -- trading user profiles, dynamic config, trading controls, snapshots, capital ledgers, and strategy presets now have Cosmos-backed authority paths +- platform-service and Cosmos are the only supported production systems for this repo +- legacy repos may still be consulted as code references, but they are not runtime dependencies +- trading user profiles, dynamic config, trading controls, snapshots, capital ledgers, and strategy presets already use Cosmos-backed authority paths ## Verification Standard @@ -128,10 +120,9 @@ pnpm lint ### Backend cutover -- deploy backend with platform JWT support and Cosmos-backed trading controls enabled -- allow legacy Supabase reads only for controlled migration seeding where a Cosmos-native repository is not complete yet +- deploy backend with platform JWT support and Cosmos-backed control-plane and execution persistence enabled - confirm runtime control reads/writes work through backend APIs -- confirm `dynamic_config` and trading-control containers are readable and writable +- confirm `dynamic_config`, trading-control, order, trade-history, and manual-entry containers are readable and writable - confirm unauthorized requests are rejected and tenant-scoped reads are enforced ### Web cutover @@ -179,14 +170,14 @@ Release is `go` only if all of the following are true: - `pnpm smoke:release` passes - platform-service auth is reachable from web and mobile - Cosmos control-plane reads and writes succeed +- Cosmos execution-data reads and writes succeed - kill-switch and maintenance behavior are validated on web and mobile - backend tenant isolation checks are green - operator-safe mobile interventions are limited to approved actions only -- known migration-only legacy dependencies are documented +- no legacy runtime data dependency remains in critical public flows Release is `no-go` if any of the following are true: -- Supabase fallback is still required for a critical public flow that has no monitored contingency - auth source of truth is ambiguous in production - admin/runtime-control actions are not fully audited - rollback owner or rollback commands are unclear @@ -233,8 +224,9 @@ Manual mobile release smoke is still required before broad rollout: ## Known Remaining Gaps -- full trading data-plane migration away from legacy Supabase is not complete -- web still carries some legacy compatibility layers around auth/profile bootstrap +- Cosmos-only execution persistence is now in place for the main backend runtime paths, but dormant legacy code and one-off reference scripts still need cleanup +- web still carries some compatibility layers around auth/profile bootstrap +- root `pnpm verify` is currently blocked by a web Vitest localStorage harness issue that needs its own cleanup pass - mobile does not yet include push notification infrastructure - feature-flag ownership and correlation-ID propagation are not fully standardized yet diff --git a/docs/ROADMAP.md b/docs/ROADMAP.md index a0a8d7d..22697af 100644 --- a/docs/ROADMAP.md +++ b/docs/ROADMAP.md @@ -8,7 +8,8 @@ It assumes: - `learning_ai_fastgap` is the structural reference monorepo - `learning_ai_common_plat` is the shared ecosystem dependency source -- `bytelyst-trading-dashboard-web`, `bytelyst-trading-bot-service`, and `bytelyst-trading-dashboard-mob` are migration inputs, not the target architecture +- `bytelyst-trading-dashboard-web`, `bytelyst-trading-bot-service`, and `bytelyst-trading-dashboard-mob` are code-reference inputs, not the target architecture +- `learning_ai_invt_trdg` is a greenfield deployment with Cosmos DB as the only supported target persistence system ## 2. Status Model @@ -30,21 +31,23 @@ It assumes: - [x] Backend migrated into `backend/` and passing typecheck, build, test, and backend verification gates - [x] Web migrated into `web/` with shared runtime, shared kill-switch gate, shared telemetry bootstrap, normalized backend URL resolution, and common-platform-native session handling - [x] Mobile migrated into `mobile/` with product identity, shared runtime bootstrap, launch-time kill-switch gate, platform-service auth, live backend polling plus websocket-backed updates, startup/error telemetry capture, secure session storage with invalidation handling, and explicit degraded/offline status surfacing -- [x] Backend now accepts common-platform JWTs with legacy Supabase fallback and persists global trading-control state through Cosmos-backed control storage -- [x] Dynamic config now flows through backend control-plane APIs with Cosmos-first storage and one-time legacy seeding during migration -- [x] Backend snapshots now use a Cosmos-first repository with one-time legacy seeding during migration -- [x] Distributed entry and reconciliation locks now use a Cosmos-first repository with legacy fallback -- [x] Capital ledger persistence now uses a Cosmos-first repository with one-time legacy seeding during migration +- [x] Backend now accepts common-platform JWTs and persists global trading-control state through Cosmos-backed control storage +- [x] Dynamic config now flows through backend control-plane APIs with Cosmos-backed storage +- [x] Backend snapshots now use a Cosmos-backed repository +- [x] Distributed entry and reconciliation locks now use a Cosmos-backed repository +- [x] Capital ledger persistence now uses a Cosmos-backed repository - [x] Mobile platform auth requests now use the common React Native platform SDK - [x] Backend risk and PnL aggregate reads now flow through repository abstractions instead of direct legacy service calls - [x] Web history, profile, marketplace, config, and manual-entry flows now run through backend APIs instead of browser-side table access - [x] Release smoke coverage now exists for web auth and product accessibility flows, with a tracked mobile release smoke checklist in operations - [x] Request ID propagation is now standardized across the main web/mobile API paths and echoed by backend HTTP responses - [x] Backtest feature access now reads from an explicit backend feature-flags contract instead of scraping generic runtime config -- [x] Trading user profiles and marketplace presets now have Cosmos-backed authority paths with legacy seeding/mirroring during migration +- [x] Trading user profiles and marketplace presets now have Cosmos-backed authority paths +- [x] Runtime order, trade-history, manual-entry, order-activity, and reconciliation-audit repositories now use Cosmos-backed trading-record storage instead of the legacy service layer +- [x] Runtime sub-tag repair now operates through the Cosmos-backed order repository - [x] Root verification and lint flows now run successfully without sandbox-hostile script harness behavior - [-] DRY cleanup completed for runtime/config/bootstrap concerns, shared websocket auth helpers, platform-session handling, and request tracing, but not yet for all persistence and feature-flag concerns -- [!] Full common-platform data-plane replacement remains a follow-up where selected trading-record repositories still depend on legacy Supabase storage because Cosmos-native equivalents are not finished yet +- [-] Cosmos-only execution persistence is now in place for the main backend runtime paths; remaining cleanup is removing dormant legacy code paths and reference scripts ## 3. Guiding Rules @@ -52,7 +55,8 @@ It assumes: 2. Do not duplicate auth, kill switch, telemetry, or config bootstrap. 3. Do not move core trading moat into common-platform packages. 4. Do not preserve legacy repo boundaries inside the new monorepo if they block clarity. -5. Migrate by contract, not by uncontrolled file copying. +5. Rebuild by contract, not by uncontrolled file copying. +6. Do not preserve migration-only fallback behavior in the new runtime. ## 4. Critical Path @@ -182,8 +186,8 @@ Ensure all surfaces adopt one consistent platform model for auth, kill switch, t - [x] Define correlation ID and request propagation strategy - [x] Define feature flag ownership and evaluation model - [x] Define system-of-record ownership by concern -- [x] Define degraded-platform fallback behavior -- [x] Define transitional adapters needed for legacy auth flows +- [x] Define degraded-platform behavior +- [x] Define compatibility boundaries only where required to preserve domain behavior during derivation ### Deliverables @@ -225,11 +229,11 @@ Make backend the stable authority before web and mobile migrate heavily onto it. - [x] Integrate explicit kill-switch and maintenance semantics - [x] Assign backend enforcement for global trade halt, tenant disable, and profile disable - [x] Add runtime control endpoints -- [x] Add platform-JWT verification with legacy fallback +- [x] Add platform-JWT verification - [x] Add Cosmos-backed global trading-control persistence -- [x] Move snapshots to Cosmos-first repository flow with legacy fallback -- [x] Move distributed runtime locks to Cosmos-first repository flow with legacy fallback -- [x] Move capital ledger persistence to Cosmos-first repository flow with legacy fallback +- [x] Move snapshots to Cosmos-backed repository flow +- [x] Move distributed runtime locks to Cosmos-backed repository flow +- [x] Move capital ledger persistence to Cosmos-backed repository flow - [-] Standardize admin controls and audit logging - [ ] Define admin audit event schema - [ ] Define durable state ownership between memory, database, and exchange sync @@ -368,7 +372,7 @@ Build mobile as a real ecosystem surface, not a mock UI shell. ### Objective -Remove duplicated implementation patterns exposed during migration. +Remove duplicated implementation patterns exposed during derivation from the legacy repos. ### Checklist @@ -378,7 +382,7 @@ Remove duplicated implementation patterns exposed during migration. - [x] Consolidate telemetry boot and event fields - [x] Consolidate kill-switch UX and service-state handling - [x] Consolidate shared types for product contracts -- [ ] Remove temporary migration-only adapters that are no longer needed +- [-] Remove temporary derivation-only adapters that are no longer needed ### Guardrail @@ -408,7 +412,7 @@ Validate that the new monorepo is safer and more coherent than the legacy setup - [ ] Add backend contract tests - [x] Add web auth and kill-switch smoke tests - [x] Add mobile launch/auth/kill-switch smoke coverage -- [x] Add docs for local dev, CI, Docker, and fallback behaviors +- [x] Add docs for local dev, CI, Docker, and degraded-platform behaviors - [x] Define cutover sequencing from legacy repos - [x] Define rollback paths - [x] Define release go/no-go checklist @@ -479,7 +483,7 @@ Validate that the new monorepo is safer and more coherent than the legacy setup - [ ] Mobile overview/alerts/positions/history - [ ] DRY cleanup - [x] Verification and cutover docs -- [-] Backend Cosmos-first repository migration for safety-critical persistence +- [x] Backend Cosmos-authoritative repository implementation for safety-critical persistence ### Recommended Rollout Order @@ -505,10 +509,15 @@ Validate that the new monorepo is safer and more coherent than the legacy setup ### Risk: auth model becomes split between Supabase-specific flows and platform-service flows -- [ ] Mitigation: use an adapter strategy during migration +- [x] Mitigation: preserve domain behavior while removing migration-only storage fallbacks - [ ] Mitigation: define one authoritative session model early - [ ] Mitigation: document transitional behavior explicitly +### Risk: repo-level verification stays red due to test-harness drift instead of product regressions + +- [x] Mitigation: keep backend safety gates green while cutting over persistence +- [!] Mitigation: fix the current web Vitest `window.localStorage` harness issue before claiming a fully green root `pnpm verify` + ### Risk: kill switch becomes semantically overloaded - [ ] Mitigation: separate product maintenance mode from trade-halt control