learning_ai_invt_trdg/backend/src/services/runtimeOrderRepository.ts

1343 lines
48 KiB
TypeScript

import { randomUUID } from 'node:crypto';
import { config } from '../config/index.js';
import {
normalizeOrderAction,
normalizeOrderStatus,
normalizeOrderType,
normalizeTradeSide
} from '../domain/tradingEnums.js';
import logger from '../utils/logger.js';
import {
buildAlpacaSubTag,
isBytelystSubTag,
shouldAttachAlpacaSubTag,
subTagBelongsToProfile,
type AlpacaSubTagIntent
} from '../utils/alpacaSubTag.js';
import { SymbolMapper } from '../utils/symbolMapper.js';
import type {
FilledLifecycleOrderRow,
ReconciliationBackfillAuditInsert,
ReconciliationBackfillAuditQuery,
ReconciliationBackfillAuditRow,
ReconciliationBackfillBatchSummary,
ReconciliationBackfillOrderInsert,
ReconciliationSubTagRepairSummary,
StaleOrderScope,
VirtualOpenPosition
} from './tradingPersistenceTypes.js';
import {
ORDER_CONTAINER,
RECONCILIATION_AUDIT_CONTAINER,
TRADE_HISTORY_CONTAINER,
buildBaseDocument,
buildDocId,
clampNumber,
isCosmosConfigured,
nowIso,
queryDocuments,
toOptionalNumber,
toOptionalString,
upsertDocument
} from './tradingRecordStore.js';
/**
 * Order document shape used by this module when reading from and writing to
 * `ORDER_CONTAINER`: every `FilledLifecycleOrderRow` field plus the document
 * envelope fields below.
 */
type OrderDocument = FilledLifecycleOrderRow & {
// Document identifier.
id: string;
// Product scope; queries in this module filter on it with `config.PRODUCT_ID`.
productId: string;
// Document discriminator; always the literal 'trade_order'.
type: 'trade_order';
// Timestamps as strings; writers in this module populate them via `nowIso()`.
created_at: string;
updated_at: string;
};
/**
 * Trade-history document shape used by this module when reading from and
 * writing to `TRADE_HISTORY_CONTAINER`.
 */
type TradeHistoryDocument = {
// Document identifier.
id: string;
// Product scope; queries in this module filter on it with `config.PRODUCT_ID`.
productId: string;
// Document discriminator; always the literal 'trade_history'.
type: 'trade_history';
user_id: string;
profile_id?: string;
symbol: string;
side: string;
entry_price: number;
exit_price: number;
size: number;
pnl: number;
pnl_percent: number;
// Free-text close reason; readers in this module treat reasons containing
// "partial exit" (case-insensitive) as non-final rows.
reason: string;
// NOTE(review): presumably epoch milliseconds (the writer defaults it to Date.now()) — confirm.
timestamp: number;
created_at: string;
updated_at: string;
stop_loss?: number;
take_profit?: number;
rules_metadata?: Record<string, any>;
trade_id?: string;
source?: 'BOT' | 'MANUAL';
};
/**
 * Reconciliation audit document shape used with `RECONCILIATION_AUDIT_CONTAINER`.
 * Takes every field of `ReconciliationBackfillAuditRow` except `id`, which is
 * redeclared here as a string document id.
 */
type ReconciliationAuditDocument = Omit<ReconciliationBackfillAuditRow, 'id'> & {
// Document identifier (string, unlike the row type's `id`).
id: string;
// Product scope; queries in this module filter on it with `config.PRODUCT_ID`.
productId: string;
// Document discriminator; always the literal 'reconciliation_backfill_audit'.
type: 'reconciliation_backfill_audit';
updated_at: string;
};
// Status groups passed as `statuses` filters to `queryOrders`. Both the
// underscore and hyphen spellings of "partially filled" are listed;
// `queryOrders` runs each entry through `normalizeOrderStatus` before querying.
// Statuses of orders that are still working.
const OPEN_ORDER_STATUSES = ['pending_new', 'accepted', 'pending', 'new', 'partially_filled', 'partially-filled'];
// Statuses treated as closed; note that 'unknown' is included in this group.
const CLOSED_ORDER_STATUSES = ['filled', 'canceled', 'expired', 'rejected', 'unknown'];
// Statuses that carry filled quantity (fully or partially).
const FILLED_ORDER_STATUSES = ['filled', 'partially_filled', 'partially-filled'];
/**
 * Returns the quantity threshold configured in `config.MIN_POSITION_QTY`.
 *
 * @returns The configured value when it is a finite number greater than zero;
 *          otherwise 0.0001.
 */
function dustThresholdQty(): number {
  const fallbackQty = 0.0001;
  const candidate = Number(config.MIN_POSITION_QTY || 0.0001);
  const usable = Number.isFinite(candidate) && candidate > 0;
  return usable ? candidate : fallbackQty;
}
/**
 * Reports whether Cosmos persistence is configured.
 *
 * @returns The result of `isCosmosConfigured()`.
 */
function cosmosEnabled(): boolean {
  const configured = isCosmosConfigured();
  return configured;
}
/**
 * Guard used before any persistence call.
 *
 * @throws Error when `cosmosEnabled()` reports that Cosmos is not configured.
 */
function ensureCosmos(): void {
  if (cosmosEnabled()) {
    return;
  }
  throw new Error('Cosmos DB is required for runtime trading persistence');
}
/**
 * Checks whether a value is a canonical hyphenated UUID.
 *
 * The value is stringified and trimmed first. Only version digits 1-5 and
 * variant digits 8, 9, a or b are accepted; matching is case-insensitive.
 *
 * @param value Candidate identifier; null/undefined/empty yield false.
 * @returns True when the trimmed value matches the UUID pattern.
 */
function isUuid(value: string | undefined | null): boolean {
  const candidate = String(value || '').trim();
  const uuidPattern = /^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i;
  return uuidPattern.test(candidate);
}
/**
 * Maps an order status to a numeric rank used to decide which of two
 * statuses wins during a merge (higher wins).
 *
 * The status is trimmed and lower-cased before matching. Any status not
 * listed below (including 'pending_new' and the empty string) ranks 1.
 *
 * @param status Raw order status.
 * @returns 6 for filled; 5 for canceled/expired/rejected/unknown; 4 for
 *          partially filled (either spelling); 3 for accepted/pending/new;
 *          otherwise 1.
 */
function orderStatusRank(status: string): number {
  const key = String(status || '').trim().toLowerCase();
  switch (key) {
    case 'filled':
      return 6;
    case 'canceled':
    case 'expired':
    case 'rejected':
    case 'unknown':
      return 5;
    case 'partially_filled':
    case 'partially-filled':
      return 4;
    case 'accepted':
    case 'pending':
    case 'new':
      return 3;
    default:
      return 1;
  }
}
/**
 * Chooses between a stored and an incoming status using `orderStatusRank`.
 * The stored status wins ties.
 *
 * @param existingStatus Status already persisted.
 * @param incomingStatus Status from the new update.
 * @returns The winning status passed through `normalizeOrderStatus`.
 */
function pickMostReliableOrderStatus(existingStatus: string, incomingStatus: string): string {
  const incomingOutranks = orderStatusRank(existingStatus) < orderStatusRank(incomingStatus);
  if (incomingOutranks) {
    return normalizeOrderStatus(incomingStatus);
  }
  return normalizeOrderStatus(existingStatus);
}
/**
 * Resolves the lifecycle action of an order.
 *
 * An explicit action that normalizes to 'ENTRY' or 'EXIT' is used as is.
 * Otherwise the action is derived from the side (defaulting to 'BUY' when
 * absent): a normalized 'SELL' maps to 'EXIT', anything else to 'ENTRY'.
 *
 * @param action Raw action value, possibly missing.
 * @param side Raw trade side, possibly missing.
 * @returns 'ENTRY' or 'EXIT'.
 */
function inferLifecycleAction(action: unknown, side: unknown): 'ENTRY' | 'EXIT' {
  const explicit = normalizeOrderAction(String(action || ''));
  if (explicit === 'ENTRY' || explicit === 'EXIT') {
    return explicit;
  }
  const tradeSide = normalizeTradeSide(String(side || 'BUY'));
  if (tradeSide === 'SELL') {
    return 'EXIT';
  }
  return 'ENTRY';
}
/**
 * Converts a raw order action into a sub-tag intent.
 *
 * @param action Raw action value, possibly missing.
 * @returns 'ENTRY' or 'EXIT' when the action normalizes to one of them;
 *          otherwise 'UNKNOWN'.
 */
function resolveSubTagIntent(action: unknown): AlpacaSubTagIntent {
  const explicit = normalizeOrderAction(String(action || ''));
  return explicit === 'ENTRY' || explicit === 'EXIT' ? explicit : 'UNKNOWN';
}
/**
 * Determines the sub-tag to persist with an order.
 *
 * An explicit `sub_tag` (or `subTag`) on the order is returned trimmed.
 * Otherwise a tag is derived with `buildAlpacaSubTag`, but only when both a
 * profile id and a trade id are present and `shouldAttachAlpacaSubTag`
 * allows tagging for the profile.
 *
 * @param order Order fields relevant to tagging.
 * @returns The explicit or derived sub-tag, or undefined when none applies.
 */
function resolvePersistedOrderSubTag(order: {
  profile_id?: string;
  trade_id?: string;
  action?: string;
  sub_tag?: string;
  subTag?: string;
}): string | undefined {
  const provided = String(order.sub_tag || order.subTag || '').trim();
  if (provided) {
    return provided;
  }
  const profileId = String(order.profile_id || '').trim();
  const tradeId = String(order.trade_id || '').trim();
  // Both identifiers are required before the tagging policy is even consulted.
  if (!profileId || !tradeId) {
    return undefined;
  }
  if (!shouldAttachAlpacaSubTag({ profileId })) {
    return undefined;
  }
  const derived = buildAlpacaSubTag({
    profileId,
    tradeId,
    intent: resolveSubTagIntent(order.action)
  });
  return derived || undefined;
}
/**
 * Builds the de-duplicated list of symbol spellings to match when querying
 * orders for a symbol.
 *
 * The list holds the trimmed symbol, its upper-cased form and, when
 * `SymbolMapper.toTradeSymbol` yields a value for the configured execution
 * provider (default 'alpaca'), that mapped symbol and its upper-cased form.
 *
 * @param symbol Raw symbol.
 * @returns Candidate symbols in insertion order; empty when the symbol is blank.
 */
function buildLifecycleSymbolCandidates(symbol: string): string[] {
  const base = String(symbol || '').trim();
  if (!base) {
    return [];
  }
  const executionProvider = String(config.EXECUTION_PROVIDER || '').trim() || 'alpaca';
  const candidates = new Set<string>();
  candidates.add(base);
  candidates.add(base.toUpperCase());
  try {
    const tradeSymbol = SymbolMapper.toTradeSymbol(base, executionProvider);
    if (tradeSymbol) {
      candidates.add(tradeSymbol);
      candidates.add(String(tradeSymbol).toUpperCase());
    }
  } catch {
    // Mapping failed: fall back to the direct spellings collected above.
  }
  return Array.from(candidates).filter(Boolean);
}
/**
 * Coerces a number or string into an epoch-milliseconds timestamp.
 *
 * - Finite numbers above 1e12 are returned unchanged (already milliseconds).
 * - Other finite positive numbers are multiplied by 1000 (treated as seconds).
 * - Purely numeric strings are converted to a number and handled as above.
 * - Other strings go through `Date.parse`; a finite positive result is used.
 *
 * @param value Raw timestamp of unknown type.
 * @param fallback Value returned when `value` cannot be interpreted.
 * @returns Epoch milliseconds, or `fallback`.
 */
function toTimestampMs(value: unknown, fallback: number): number {
  if (typeof value === 'number') {
    if (!Number.isFinite(value) || value <= 0) {
      return fallback;
    }
    // Magnitude heuristic: values above 1e12 are already in milliseconds.
    return value > 1_000_000_000_000 ? value : value * 1000;
  }
  if (typeof value !== 'string') {
    return fallback;
  }
  const text = value.trim();
  if (/^\d+(\.\d+)?$/.test(text)) {
    return toTimestampMs(Number(text), fallback);
  }
  const parsedMs = Date.parse(text);
  return Number.isFinite(parsedMs) && parsedMs > 0 ? parsedMs : fallback;
}
/**
 * Resolves an order row's time in epoch milliseconds.
 *
 * Prefers `row.timestamp`; when that cannot be interpreted, uses
 * `row.created_at`; when neither works, returns `fallback`.
 *
 * @param row Order row (any subset of fields).
 * @param fallback Value used when no timestamp can be derived.
 * @returns Epoch milliseconds, or `fallback`.
 */
function orderTimestamp(row: Partial<FilledLifecycleOrderRow>, fallback: number): number {
  const createdAtMs = toTimestampMs(row.created_at, fallback);
  return toTimestampMs(row.timestamp, createdAtMs);
}
/**
 * Returns a copy of `rows` sorted oldest-first by `orderTimestamp`.
 *
 * Despite the name, the sort key is `timestamp` when available and
 * `created_at` only as a fallback; rows with neither sort as 0.
 * The input array is not mutated.
 *
 * @param rows Order rows to sort.
 * @returns A new, ascending-sorted array.
 */
function sortByCreatedAtAsc<T extends Partial<FilledLifecycleOrderRow>>(rows: T[]): T[] {
  const sorted = rows.slice();
  sorted.sort((left, right) => orderTimestamp(left, 0) - orderTimestamp(right, 0));
  return sorted;
}
/**
 * Converts a raw order payload into an `OrderDocument` ready for upsert.
 *
 * String fields are trimmed or made optional, enums are normalized, numeric
 * fields pass through `clampNumber`/`toOptionalNumber`, `quantity` mirrors
 * `qty`, and the sub-tag is resolved via `resolvePersistedOrderSubTag`.
 * When no action is supplied it is inferred from the side.
 *
 * NOTE(review): the payload's `type` (the normalized order type) shares its
 * key with the document discriminator, and the final spread forces
 * `type: 'trade_order'`, so the order type does not appear to survive in the
 * returned document. Confirm this is intended.
 *
 * @param order Raw order fields.
 * @returns The normalized order document.
 */
function normalizeOrderDocument(order: {
  user_id: string;
  profile_id?: string;
  order_id?: string;
  symbol: string;
  type: string;
  side: string;
  qty: number;
  price: number;
  status: string;
  timestamp: number;
  stop_loss?: number;
  take_profit?: number;
  trade_id?: string;
  action?: string;
  sub_tag?: string;
  subTag?: string;
}): OrderDocument {
  const status = normalizeOrderStatus(order.status);
  const action = normalizeOrderAction(order.action) || inferLifecycleAction(undefined, order.side);
  const base = buildBaseDocument(
    'trade_order',
    {
      user_id: String(order.user_id || '').trim(),
      profile_id: toOptionalString(order.profile_id),
      order_id: toOptionalString(order.order_id),
      symbol: String(order.symbol || '').trim(),
      type: normalizeOrderType(order.type),
      side: normalizeTradeSide(order.side),
      qty: clampNumber(order.qty),
      quantity: clampNumber(order.qty),
      price: clampNumber(order.price),
      status,
      timestamp: clampNumber(order.timestamp, Date.now()),
      stop_loss: toOptionalNumber(order.stop_loss),
      take_profit: toOptionalNumber(order.take_profit),
      trade_id: toOptionalString(order.trade_id),
      action,
      sub_tag: resolvePersistedOrderSubTag(order),
    },
    // A random UUID is always part of the id inputs.
    buildDocId('trade_order', order.profile_id, order.order_id, order.trade_id, randomUUID())
  ) as OrderDocument;
  const document: OrderDocument = {
    ...base,
    type: 'trade_order'
  };
  return document;
}
/**
 * Runs a parameterized query against `ORDER_CONTAINER` for `trade_order`
 * documents of the configured product.
 *
 * Every supplied filter is ANDed. `orderId` matches either `c.order_id` or
 * the document `c.id`. Status filters are normalized before querying.
 * Results are ordered by `orderBy` (default `created_at`), descending unless
 * `ascending` is set, and limited to `limit` rows (default 1000, clamped to
 * 1..10000).
 *
 * @param params Optional filters, ordering and limit.
 * @returns Matching order documents.
 * @throws Error when Cosmos is not configured.
 */
async function queryOrders(params?: {
  userId?: string;
  profileId?: string;
  orderId?: string;
  tradeId?: string;
  symbol?: string;
  symbols?: string[];
  statuses?: string[];
  action?: string;
  requireSubTag?: boolean;
  orderBy?: 'created_at' | 'updated_at' | 'timestamp';
  ascending?: boolean;
  limit?: number;
}): Promise<OrderDocument[]> {
  ensureCosmos();
  const clauses: string[] = ['c.productId = @productId', 'c.type = @type'];
  const bindings: Array<{ name: string; value: unknown }> = [
    { name: '@productId', value: config.PRODUCT_ID },
    { name: '@type', value: 'trade_order' },
  ];
  // Adds one WHERE clause together with the parameter it references.
  const constrain = (clause: string, name: string, value: unknown): void => {
    clauses.push(clause);
    bindings.push({ name, value });
  };

  if (params?.userId) constrain('c.user_id = @userId', '@userId', params.userId);
  if (params?.profileId) constrain('c.profile_id = @profileId', '@profileId', params.profileId);
  if (params?.orderId) constrain('(c.order_id = @orderId OR c.id = @orderId)', '@orderId', params.orderId);
  if (params?.tradeId) constrain('c.trade_id = @tradeId', '@tradeId', params.tradeId);
  if (params?.symbol) constrain('c.symbol = @symbol', '@symbol', params.symbol);
  if (params?.symbols && params.symbols.length > 0) {
    constrain('ARRAY_CONTAINS(@symbols, c.symbol)', '@symbols', params.symbols);
  }
  if (params?.statuses && params.statuses.length > 0) {
    constrain(
      'ARRAY_CONTAINS(@statuses, c.status)',
      '@statuses',
      params.statuses.map((status) => normalizeOrderStatus(status))
    );
  }
  if (params?.action) {
    constrain('c.action = @action', '@action', normalizeOrderAction(params.action) || params.action);
  }
  if (params?.requireSubTag) {
    clauses.push('IS_DEFINED(c.sub_tag) AND c.sub_tag != ""');
  }

  const sortField = params?.orderBy || 'created_at';
  const sortDirection = params?.ascending ? 'ASC' : 'DESC';
  const requestedRows = Math.floor(Number(params?.limit || 1000));
  const top = Math.max(1, Math.min(10_000, requestedRows));
  const sql = `SELECT TOP ${top} * FROM c WHERE ${clauses.join(' AND ')} ORDER BY c.${sortField} ${sortDirection}`;
  return await queryDocuments<OrderDocument>(ORDER_CONTAINER, sql, bindings);
}
/**
 * Upserts an order document into `ORDER_CONTAINER`.
 *
 * `updated_at` is always overwritten with the current time; `created_at` is
 * kept when present and otherwise set to the same current time.
 *
 * @param document Order document to persist.
 * @returns The document as returned by `upsertDocument`.
 */
async function upsertOrderDocument(document: OrderDocument): Promise<OrderDocument> {
  const stamp = nowIso();
  const toPersist: OrderDocument = {
    ...document,
    updated_at: stamp,
    created_at: String(document.created_at || stamp),
  };
  return await upsertDocument<OrderDocument>(ORDER_CONTAINER, toPersist);
}
/**
 * Runs a parameterized query against `TRADE_HISTORY_CONTAINER` for
 * `trade_history` documents of the configured product.
 *
 * Supplied filters are ANDed. Results are ordered by `created_at`
 * descending and limited to `limit` rows (default 5000, clamped to 1..10000).
 *
 * @param params Optional filters and limit.
 * @returns Matching trade-history documents.
 * @throws Error when Cosmos is not configured.
 */
async function queryTradeHistory(params?: {
  userId?: string;
  profileId?: string;
  tradeId?: string;
  symbol?: string;
  limit?: number;
}): Promise<TradeHistoryDocument[]> {
  ensureCosmos();
  const clauses: string[] = ['c.productId = @productId', 'c.type = @type'];
  const bindings: Array<{ name: string; value: unknown }> = [
    { name: '@productId', value: config.PRODUCT_ID },
    { name: '@type', value: 'trade_history' },
  ];
  // Adds one WHERE clause together with the parameter it references.
  const constrain = (clause: string, name: string, value: unknown): void => {
    clauses.push(clause);
    bindings.push({ name, value });
  };

  if (params?.userId) constrain('c.user_id = @userId', '@userId', params.userId);
  if (params?.profileId) constrain('c.profile_id = @profileId', '@profileId', params.profileId);
  if (params?.tradeId) constrain('c.trade_id = @tradeId', '@tradeId', params.tradeId);
  if (params?.symbol) constrain('c.symbol = @symbol', '@symbol', params.symbol);

  const requestedRows = Math.floor(Number(params?.limit || 5000));
  const top = Math.max(1, Math.min(10_000, requestedRows));
  const sql = `SELECT TOP ${top} * FROM c WHERE ${clauses.join(' AND ')} ORDER BY c.created_at DESC`;
  return await queryDocuments<TradeHistoryDocument>(TRADE_HISTORY_CONTAINER, sql, bindings);
}
/**
 * Normalizes a closed-trade record and upserts it into
 * `TRADE_HISTORY_CONTAINER`.
 *
 * Numeric fields pass through `clampNumber`/`toOptionalNumber`, the symbol
 * and reason are trimmed, the side is normalized, `timestamp` defaults to
 * the current time and `source` defaults to 'BOT'. A random UUID is always
 * part of the document id inputs.
 *
 * @param transaction Closed-trade fields.
 * @returns The document as returned by `upsertDocument`.
 * @throws Error when Cosmos is not configured.
 */
async function saveTradeHistoryDocument(transaction: {
  user_id: string;
  profile_id?: string;
  symbol: string;
  side: string;
  entry_price: number;
  exit_price: number;
  size: number;
  pnl: number;
  pnl_percent: number;
  reason: string;
  timestamp: number;
  stop_loss?: number;
  take_profit?: number;
  rules_metadata?: Record<string, any>;
  trade_id?: string;
  source?: 'BOT' | 'MANUAL';
}): Promise<TradeHistoryDocument> {
  ensureCosmos();
  const historyDoc = buildBaseDocument(
    'trade_history',
    {
      user_id: transaction.user_id,
      profile_id: toOptionalString(transaction.profile_id),
      symbol: String(transaction.symbol || '').trim(),
      side: normalizeTradeSide(transaction.side),
      entry_price: clampNumber(transaction.entry_price),
      exit_price: clampNumber(transaction.exit_price),
      size: clampNumber(transaction.size),
      pnl: clampNumber(transaction.pnl),
      pnl_percent: clampNumber(transaction.pnl_percent),
      reason: String(transaction.reason || '').trim(),
      timestamp: clampNumber(transaction.timestamp, Date.now()),
      stop_loss: toOptionalNumber(transaction.stop_loss),
      take_profit: toOptionalNumber(transaction.take_profit),
      rules_metadata: transaction.rules_metadata,
      trade_id: toOptionalString(transaction.trade_id),
      source: transaction.source || 'BOT',
    },
    buildDocId('trade_history', transaction.profile_id, transaction.trade_id, transaction.timestamp, randomUUID())
  );
  const saved = await upsertDocument<TradeHistoryDocument>(
    TRADE_HISTORY_CONTAINER,
    historyDoc as TradeHistoryDocument
  );
  return saved;
}
/**
 * Runs a parameterized query against `RECONCILIATION_AUDIT_CONTAINER` for
 * `reconciliation_backfill_audit` documents of the configured product.
 *
 * Supplied filters are ANDed. Results are ordered by `created_at`
 * descending and limited to `limit` rows (default 5000, clamped to 1..10000).
 *
 * @param params Optional filters and limit.
 * @returns Matching audit documents.
 * @throws Error when Cosmos is not configured.
 */
async function queryAuditDocuments(params?: {
  batchId?: string;
  profileId?: string;
  symbol?: string;
  decisions?: string[];
  limit?: number;
}): Promise<ReconciliationAuditDocument[]> {
  ensureCosmos();
  const clauses: string[] = ['c.productId = @productId', 'c.type = @type'];
  const bindings: Array<{ name: string; value: unknown }> = [
    { name: '@productId', value: config.PRODUCT_ID },
    { name: '@type', value: 'reconciliation_backfill_audit' },
  ];
  // Adds one WHERE clause together with the parameter it references.
  const constrain = (clause: string, name: string, value: unknown): void => {
    clauses.push(clause);
    bindings.push({ name, value });
  };

  if (params?.batchId) constrain('c.batch_id = @batchId', '@batchId', params.batchId);
  if (params?.profileId) constrain('c.profile_id = @profileId', '@profileId', params.profileId);
  if (params?.symbol) constrain('c.symbol = @symbol', '@symbol', params.symbol);
  if (params?.decisions && params.decisions.length > 0) {
    constrain('ARRAY_CONTAINS(@decisions, c.decision)', '@decisions', params.decisions);
  }

  const requestedRows = Math.floor(Number(params?.limit || 5000));
  const top = Math.max(1, Math.min(10_000, requestedRows));
  const sql = `SELECT TOP ${top} * FROM c WHERE ${clauses.join(' AND ')} ORDER BY c.created_at DESC`;
  return await queryDocuments<ReconciliationAuditDocument>(RECONCILIATION_AUDIT_CONTAINER, sql, bindings);
}
/**
 * Persists a closed-trade record as trade history (best effort).
 *
 * Failures are logged and swallowed; this function does not rethrow.
 *
 * @param transaction Closed-trade fields to persist.
 */
export async function logTransaction(transaction: {
  user_id: string;
  profile_id?: string;
  symbol: string;
  side: string;
  entry_price: number;
  exit_price: number;
  size: number;
  pnl: number;
  pnl_percent: number;
  reason: string;
  timestamp: number;
  stop_loss?: number;
  take_profit?: number;
  rules_metadata?: Record<string, any>;
  trade_id?: string;
  source?: 'BOT' | 'MANUAL';
}) {
  try {
    await saveTradeHistoryDocument(transaction);
    const { user_id: userId, symbol } = transaction;
    logger.info(`Logged trade history for ${userId} (${symbol}) to Cosmos`);
  } catch (error: any) {
    // Best effort: a persistence failure must not break the trading flow.
    logger.error(`[RuntimeOrderRepo] Trade history persistence failed: ${error.message}`);
  }
}
/**
 * Persists an order, merging with an already stored document for the same
 * `order_id` (best effort: failures are logged and swallowed).
 *
 * Merge rules when a stored document exists:
 * - status: the higher-ranked status wins (`orderStatusRank`), stored wins ties;
 * - qty/price: stored values are kept only when the stored status strictly
 *   outranks the incoming one;
 * - timestamp: the later of the two;
 * - sub_tag / filled_at: the incoming value when set, else the stored one;
 * - created_at: the stored value is preserved;
 * - any other field: the incoming value overrides the stored one only when
 *   it is defined, so an update that omits optional fields (for example
 *   stop_loss or take_profit) no longer erases what was stored.
 *
 * @param order Raw order fields.
 */
export async function logOrder(order: {
  user_id: string;
  profile_id?: string;
  order_id?: string;
  symbol: string;
  type: string;
  side: string;
  qty: number;
  price: number;
  status: string;
  timestamp: number;
  stop_loss?: number;
  take_profit?: number;
  trade_id?: string;
  action?: string;
  sub_tag?: string;
  subTag?: string;
}) {
  try {
    ensureCosmos();
    const incoming = normalizeOrderDocument(order);
    const existingRows = incoming.order_id
      ? await queryOrders({
        orderId: String(incoming.order_id),
        profileId: String(incoming.profile_id || '').trim() || undefined,
        limit: 1
      })
      : [];
    const existing = existingRows[0];
    if (!existing) {
      await upsertOrderDocument(incoming);
      return;
    }

    const existingStatus = normalizeOrderStatus(String(existing.status || 'pending_new'));
    const mergedStatus = pickMostReliableOrderStatus(existingStatus, String(incoming.status || 'pending_new'));
    // Stored qty/price are kept only when the stored status is strictly more reliable.
    const existingOutranks = orderStatusRank(existingStatus) > orderStatusRank(String(incoming.status || ''));
    const mergedQty = existingOutranks
      ? clampNumber(existing.qty || existing.quantity)
      : clampNumber(incoming.qty || incoming.quantity);
    const mergedPrice = existingOutranks
      ? clampNumber(existing.price)
      : clampNumber(incoming.price);
    const mergedTimestamp = Math.max(orderTimestamp(existing, 0), orderTimestamp(incoming, 0));
    const mergedSubTag = toOptionalString(incoming.sub_tag) || toOptionalString(existing.sub_tag);

    // Drop undefined-valued keys so that spreading the incoming document
    // cannot overwrite stored values with `undefined`.
    const definedIncoming = Object.fromEntries(
      Object.entries(incoming).filter(([, value]) => value !== undefined)
    ) as Partial<OrderDocument>;

    await upsertOrderDocument({
      ...existing,
      ...definedIncoming,
      id: existing.id,
      // Keep the original creation time; created_at-based ordering and
      // staleness checks depend on it not moving on every update.
      created_at: existing.created_at || incoming.created_at,
      status: mergedStatus,
      qty: mergedQty,
      quantity: mergedQty,
      price: mergedPrice,
      timestamp: mergedTimestamp,
      filled_at: incoming.filled_at || existing.filled_at,
      sub_tag: mergedSubTag,
    });
  } catch (error: any) {
    // Best effort: a persistence failure must not break the trading flow.
    logger.error(`[RuntimeOrderRepo] Order persistence failed: ${error.message}`);
  }
}
/**
 * Lists a profile's orders whose status is in `OPEN_ORDER_STATUSES`,
 * ordered by `created_at` ascending (at most 2000 rows).
 *
 * @param profileId Profile to query; a falsy id yields an empty list.
 * @returns Matching order documents.
 */
export async function getOpenOrdersForProfile(profileId: string): Promise<any[]> {
  if (!profileId) {
    return [];
  }
  const openOrders = await queryOrders({
    profileId,
    statuses: OPEN_ORDER_STATUSES,
    orderBy: 'created_at',
    ascending: true,
    limit: 2000
  });
  return openOrders;
}
/**
 * Lists a profile's orders in `CLOSED_ORDER_STATUSES` whose `updated_at`
 * falls within the last `minutes` minutes.
 *
 * The query fetches the most recently updated rows first (newest 2000), so
 * recent closes are found even for profiles with a long order history. The
 * returned list is ordered by `updated_at` ascending.
 *
 * @param profileId Profile to query; a falsy id yields an empty list.
 * @param minutes Look-back window in minutes (floored, minimum 1). A
 *                non-finite value falls back to the default of 10.
 * @returns Recently closed order documents, oldest first.
 */
export async function getRecentlyClosedOrdersForProfile(profileId: string, minutes: number = 10): Promise<any[]> {
  if (!profileId) return [];
  const windowMinutes = Number.isFinite(minutes) ? Math.max(1, Math.floor(minutes)) : 10;
  const sinceMs = Date.now() - windowMinutes * 60 * 1000;
  // Newest first: with an ascending query the row cap would keep only the
  // oldest closed orders and the recency filter below would discard them all.
  const newestFirst = await queryOrders({
    profileId,
    statuses: CLOSED_ORDER_STATUSES,
    orderBy: 'updated_at',
    ascending: false,
    limit: 2000
  });
  const recent = newestFirst.filter((row) => toTimestampMs(row.updated_at, 0) >= sinceMs);
  // `filter` produced a fresh array, so reversing in place is safe; callers
  // keep receiving rows oldest-first.
  return recent.reverse();
}
/**
 * Lists a profile's orders with status 'pending_new' (at most 1000 rows,
 * using the default ordering of `queryOrders`).
 *
 * @param profileId Profile to query; a falsy id yields an empty list.
 * @returns Matching order documents.
 */
export async function getPendingOrdersForProfile(profileId: string): Promise<any[]> {
  if (!profileId) {
    return [];
  }
  const pendingStatuses = ['pending_new'];
  return await queryOrders({ profileId, statuses: pendingStatuses, limit: 1000 });
}
/**
 * Fetches the order with the highest `timestamp` for a user and symbol.
 *
 * @param userId User filter.
 * @param symbol Symbol filter.
 * @returns The latest order document, or null when none matches.
 */
export async function getLatestOrder(userId: string, symbol: string): Promise<any | null> {
  const [latest] = await queryOrders({
    userId,
    symbol,
    orderBy: 'timestamp',
    ascending: false,
    limit: 1
  });
  return latest || null;
}
/**
 * Fetches a summary of the most recently created order for a trade id.
 *
 * @param tradeId Trade identifier (trimmed). A blank id returns null instead
 *                of running a query without a trade filter, which would
 *                return an unrelated order.
 * @param profileId Optional profile filter (trimmed; blank means no filter).
 * @returns Selected order fields (order_id, status, qty, price, symbol,
 *          action, stop_loss, take_profit), or null when nothing matches.
 */
export async function getOrderByTradeId(tradeId: string, profileId?: string): Promise<any | null> {
  const normalizedTradeId = String(tradeId || '').trim();
  if (!normalizedTradeId) return null;
  const rows = await queryOrders({
    tradeId: normalizedTradeId,
    profileId: String(profileId || '').trim() || undefined,
    orderBy: 'created_at',
    ascending: false,
    limit: 1
  });
  const row = rows[0];
  if (!row) return null;
  return {
    order_id: row.order_id,
    status: row.status,
    qty: row.qty,
    price: row.price,
    symbol: row.symbol,
    action: row.action,
    stop_loss: row.stop_loss,
    take_profit: row.take_profit,
  };
}
/**
 * Fetches the most recently created ENTRY order for a symbol.
 *
 * @param profileId Optional profile filter (trimmed; blank means no filter).
 * @param symbol Symbol filter.
 * @param userId Optional user filter (trimmed; blank means no filter).
 * @returns The latest ENTRY order document, or null when none matches.
 */
export async function getLatestEntryOrder(profileId: string | undefined, symbol: string, userId?: string): Promise<any | null> {
  const scopedProfileId = String(profileId || '').trim() || undefined;
  const scopedUserId = String(userId || '').trim() || undefined;
  const [latest] = await queryOrders({
    profileId: scopedProfileId,
    userId: scopedUserId,
    symbol,
    action: 'ENTRY',
    orderBy: 'created_at',
    ascending: false,
    limit: 1
  });
  return latest || null;
}
/**
 * Fetches the filled (or partially filled) ENTRY order with the highest
 * `timestamp` for a symbol.
 *
 * Scoping: when a non-blank profile id is given the query is scoped by
 * profile only; otherwise it is scoped by user. The choice is made on the
 * trimmed profile id, so a whitespace-only profile id falls back to the
 * user filter instead of leaving the query without either scope.
 *
 * @param userId User filter, used when no profile id is supplied.
 * @param symbol Symbol filter.
 * @param profileId Optional profile filter.
 * @returns The latest filled ENTRY order document, or null when none matches.
 */
export async function getLatestFilledEntry(userId: string, symbol: string, profileId?: string): Promise<any | null> {
  const scopedProfileId = String(profileId || '').trim() || undefined;
  const rows = await queryOrders({
    profileId: scopedProfileId,
    userId: scopedProfileId ? undefined : userId,
    symbol,
    action: 'ENTRY',
    statuses: ['filled', 'partially_filled'],
    orderBy: 'timestamp',
    ascending: false,
    limit: 1
  });
  return rows[0] || null;
}
/**
 * Finds the most recently created filled ENTRY order for a profile and
 * symbol that carries risk levels, i.e. a positive `stop_loss` or
 * `take_profit`. At most the 250 newest matching orders are inspected.
 *
 * @param profileId Profile filter.
 * @param symbol Symbol filter.
 * @param side Optional side the order must match after normalization.
 * @returns The first qualifying order document, or null.
 */
export async function getLatestEntryRiskOrder(profileId: string, symbol: string, side?: 'BUY' | 'SELL'): Promise<any | null> {
  const newestFirst = await queryOrders({
    profileId,
    symbol,
    action: 'ENTRY',
    statuses: ['filled', 'partially_filled'],
    orderBy: 'created_at',
    ascending: false,
    limit: 250
  });
  for (const candidate of newestFirst) {
    const sideMismatch = Boolean(side) && normalizeTradeSide(String(candidate.side || 'BUY')) !== side;
    if (sideMismatch) {
      continue;
    }
    if (clampNumber(candidate.stop_loss) > 0 || clampNumber(candidate.take_profit) > 0) {
      return candidate;
    }
  }
  return null;
}
/**
 * Reports whether a trade has at least one order in an active status
 * (pending_new, accepted, new, partially_filled).
 *
 * @param tradeId Trade identifier (trimmed). A blank id returns false
 *                instead of running a query without a trade filter, which
 *                could match any active order.
 * @param profileId Optional profile filter; applied only when it is a UUID.
 * @returns True when an active order exists for the trade.
 */
export async function hasActiveOrderForTradeId(tradeId: string, profileId?: string): Promise<boolean> {
  const normalizedTradeId = String(tradeId || '').trim();
  if (!normalizedTradeId) return false;
  const rows = await queryOrders({
    tradeId: normalizedTradeId,
    profileId: isUuid(profileId) ? profileId : undefined,
    statuses: ['pending_new', 'accepted', 'new', 'partially_filled'],
    limit: 1
  });
  return rows.length > 0;
}
/**
 * Reports whether a trade has a final trade-history row, i.e. one whose
 * reason does not contain "partial exit" (case-insensitive). Up to 100
 * history rows are inspected.
 *
 * @param tradeId Trade identifier (trimmed). A blank id returns false
 *                instead of running a query without a trade filter, which
 *                would inspect unrelated trades.
 * @param profileId Optional profile filter (trimmed; blank means no filter).
 * @param symbol Optional symbol filter.
 * @returns True when a non-partial history row exists for the trade.
 */
export async function hasFinalizedTradeHistory(tradeId: string, profileId?: string, symbol?: string): Promise<boolean> {
  const normalizedTradeId = String(tradeId || '').trim();
  if (!normalizedTradeId) return false;
  const rows = await queryTradeHistory({
    tradeId: normalizedTradeId,
    profileId: String(profileId || '').trim() || undefined,
    symbol,
    limit: 100
  });
  return rows.some((row) => !String(row.reason || '').toLowerCase().includes('partial exit'));
}
/**
 * Reports whether a trade has a filled (or partially filled) order whose
 * lifecycle action resolves to 'ENTRY' via `inferLifecycleAction`. Up to
 * 250 orders are inspected.
 *
 * @param tradeId Trade identifier (trimmed). A blank id returns false
 *                instead of running a query without a trade filter, which
 *                would match entries of unrelated trades.
 * @param profileId Optional profile filter; applied only when it is a UUID.
 * @param symbol Optional symbol filter.
 * @returns True when a filled entry order exists for the trade.
 */
export async function hasLifecycleEntryOrder(tradeId: string, profileId?: string, symbol?: string): Promise<boolean> {
  const normalizedTradeId = String(tradeId || '').trim();
  if (!normalizedTradeId) return false;
  const rows = await queryOrders({
    tradeId: normalizedTradeId,
    profileId: isUuid(profileId) ? profileId : undefined,
    symbol,
    statuses: ['filled', 'partially_filled'],
    limit: 250
  });
  return rows.some((row) => inferLifecycleAction(row.action, row.side) === 'ENTRY');
}
/**
 * Reports whether a trade has a filled ENTRY order whose sub-tag passes
 * `isBytelystSubTag` and `subTagBelongsToProfile` for the given profile.
 * Up to 250 tagged orders are inspected.
 *
 * A blank trade id or profile id returns false: without them the query
 * would lose its trade/profile filter and ownership could not be checked.
 * The same trimmed profile id is used for the query and the ownership check.
 *
 * @param tradeId Trade identifier (trimmed).
 * @param profileId Profile identifier (trimmed).
 * @param symbol Optional symbol filter.
 * @returns True when a profile-owned, tagged entry order exists.
 */
export async function hasLifecycleEntryOrderWithProfileSubTag(tradeId: string, profileId: string, symbol?: string): Promise<boolean> {
  const normalizedTradeId = String(tradeId || '').trim();
  const normalizedProfileId = String(profileId || '').trim();
  if (!normalizedTradeId || !normalizedProfileId) return false;
  const rows = await queryOrders({
    tradeId: normalizedTradeId,
    profileId: normalizedProfileId,
    symbol,
    statuses: ['filled', 'partially_filled'],
    action: 'ENTRY',
    requireSubTag: true,
    limit: 250
  });
  return rows.some((row) => {
    const subTag = String(row.sub_tag || '').trim();
    return Boolean(subTag && isBytelystSubTag(subTag) && subTagBelongsToProfile(subTag, normalizedProfileId));
  });
}
/**
 * Decides whether a trade's lifecycle is closed.
 *
 * 1. Filled orders are summed per lifecycle action; the trade is closed when
 *    the filled exit quantity covers the filled entry quantity (1e-8 tolerance).
 * 2. Otherwise trade history is consulted: the trade is closed when any
 *    non-"partial exit" row exists, or when the partial-exit sizes together
 *    cover the entry quantity.
 *
 * @param tradeId Trade identifier (trimmed). A blank id returns false
 *                instead of aggregating orders of unrelated trades.
 * @param profileId Optional profile filter; applied only when it is a UUID.
 * @param symbol Optional symbol filter.
 * @returns True when the trade is considered closed.
 */
export async function isTradeLifecycleClosed(tradeId: string, profileId?: string, symbol?: string): Promise<boolean> {
  const normalizedTradeId = String(tradeId || '').trim();
  if (!normalizedTradeId) return false;
  const scopedProfileId = isUuid(profileId) ? profileId : undefined;

  const rows = await queryOrders({
    tradeId: normalizedTradeId,
    profileId: scopedProfileId,
    symbol,
    statuses: ['filled', 'partially_filled'],
    limit: 2000
  });
  let entryQty = 0;
  let exitQty = 0;
  for (const row of rows) {
    const qty = clampNumber(row.qty || row.quantity);
    if (!(qty > 0)) continue;
    const action = inferLifecycleAction(row.action, row.side);
    if (action === 'ENTRY') entryQty += qty;
    if (action === 'EXIT') exitQty += qty;
  }
  // The small tolerance absorbs floating-point drift in summed quantities.
  if (entryQty > 0 && exitQty >= entryQty - 1e-8) {
    return true;
  }

  const historyRows = await queryTradeHistory({
    tradeId: normalizedTradeId,
    profileId: scopedProfileId,
    symbol,
    limit: 250
  });
  if (historyRows.length === 0) return false;
  let finalizedRows = 0;
  let partialExitQty = 0;
  for (const row of historyRows) {
    const reason = String(row.reason || '').toLowerCase();
    if (reason.includes('partial exit')) {
      partialExitQty += clampNumber(row.size);
      continue;
    }
    finalizedRows += 1;
  }
  return finalizedRows > 0 || (entryQty > 0 && partialExitQty >= entryQty - 1e-8);
}
/**
 * Returns the subset of `orderIds` that already exist as persisted orders.
 *
 * A single scan of the most recent orders resolves the common case. Ids not
 * found in that window are then looked up individually (in small concurrent
 * batches), because an order older than the scan window would otherwise be
 * reported as missing.
 *
 * @param orderIds Order ids to check; blanks and duplicates are ignored.
 * @param profileId Optional profile filter; applied only when it is a UUID.
 * @returns The set of ids (trimmed) that were found.
 */
export async function getExistingOrderIds(orderIds: string[], profileId?: string): Promise<Set<string>> {
  const wanted = new Set(orderIds.map((id) => String(id || '').trim()).filter(Boolean));
  if (wanted.size === 0) return new Set();
  const scopedProfileId = isUuid(profileId) ? profileId : undefined;
  const found = new Set<string>();

  // Fast path: one scan over the most recent orders.
  const recentRows = await queryOrders({
    profileId: scopedProfileId,
    limit: Math.max(wanted.size, 100)
  });
  for (const row of recentRows) {
    const orderId = String(row.order_id || '').trim();
    if (orderId && wanted.has(orderId)) {
      found.add(orderId);
    }
  }

  // Slow path: targeted lookups for ids outside the scan window.
  const unresolved = Array.from(wanted).filter((id) => !found.has(id));
  const lookupBatchSize = 25;
  for (let start = 0; start < unresolved.length; start += lookupBatchSize) {
    const batch = unresolved.slice(start, start + lookupBatchSize);
    const matches = await Promise.all(batch.map(async (id) => {
      const rows = await queryOrders({ orderId: id, profileId: scopedProfileId, limit: 5 });
      // The orderId filter also matches on the document id, so confirm the
      // match against order_id before counting it.
      return rows.some((row) => String(row.order_id || '').trim() === id) ? id : undefined;
    }));
    for (const id of matches) {
      if (id) found.add(id);
    }
  }
  return found;
}
/**
 * Collects the distinct trade ids found on a profile's most recently
 * created orders.
 *
 * @param profileId Profile filter.
 * @param limit Maximum number of orders to read and of ids to return
 *              (the returned count is clamped to 1..10000).
 * @returns Distinct, trimmed trade ids in newest-order-first order.
 */
export async function getKnownTradeIdsForProfile(profileId: string, limit: number = 2000): Promise<string[]> {
  const newestFirst = await queryOrders({
    profileId,
    orderBy: 'created_at',
    ascending: false,
    limit
  });
  const distinct = new Set<string>();
  newestFirst.forEach((row) => {
    const tradeId = String(row.trade_id || '').trim();
    if (tradeId) {
      distinct.add(tradeId);
    }
  });
  const cap = Math.max(1, Math.min(10000, Math.floor(limit)));
  return Array.from(distinct).slice(0, cap);
}
/**
 * Sets the status of the stored order(s) matching an order id (at most two
 * documents, since the lookup matches both `order_id` and the document id).
 *
 * The status is written unconditionally after normalization. `filled_at`,
 * `price` and `qty`/`quantity` are replaced only when a value is supplied
 * (and, for price/qty, positive); otherwise the stored values are kept.
 *
 * @param orderId Order identifier (trimmed).
 * @param status New status.
 * @param filledAt Optional fill time.
 * @param price Optional fill price; ignored unless positive.
 * @param qty Optional filled quantity; ignored unless positive.
 */
export async function updateOrderStatus(orderId: string, status: string, filledAt?: Date, price?: number, qty?: number) {
  const matches = await queryOrders({ orderId: String(orderId || '').trim(), limit: 2 });
  const timestamp = nowIso();
  const priceOverride = price && price > 0 ? price : undefined;
  const qtyOverride = qty && qty > 0 ? qty : undefined;
  const writes = matches.map(async (stored) => {
    await upsertOrderDocument({
      ...stored,
      status: normalizeOrderStatus(status),
      updated_at: timestamp,
      filled_at: filledAt ? filledAt.toISOString() : stored.filled_at,
      price: priceOverride ?? stored.price,
      qty: qtyOverride ?? stored.qty,
      quantity: qtyOverride ?? (stored.quantity || stored.qty),
    });
  });
  await Promise.all(writes);
}
/**
 * Loads filled / partially filled orders for a user and/or profile,
 * optionally restricted to a set of symbols, sorted oldest-first.
 *
 * `truncated` reports whether the query hit its row cap. It is evaluated on
 * the raw query result (before symbol filtering) against the limit that
 * `queryOrders` actually applies, which is clamped to 10,000 rows, so the
 * flag is raised even though the requested limit may be larger.
 *
 * @param options Scope filters, optional symbol list and `maxRows`
 *                (default 50,000, clamped to 1,000..200,000).
 * @returns The matching rows and the truncation flag.
 */
async function fetchFilledLifecycleOrders(options: {
  userId?: string;
  profileId?: string;
  symbols?: string[];
  maxRows?: number;
}): Promise<{ rows: FilledLifecycleOrderRow[]; truncated: boolean }> {
  // Mirrors the upper bound queryOrders clamps every limit to.
  const queryRowCap = 10_000;
  const limit = Math.max(1000, Math.min(200_000, Math.floor(Number(options.maxRows || 50_000))));
  const effectiveLimit = Math.min(limit, queryRowCap);
  const rows = await queryOrders({
    userId: String(options.userId || '').trim() || undefined,
    profileId: String(options.profileId || '').trim() || undefined,
    statuses: FILLED_ORDER_STATUSES,
    limit
  });
  const safeSymbols = Array.isArray(options.symbols) ? new Set(options.symbols.filter(Boolean)) : null;
  const filtered = safeSymbols
    ? rows.filter((row) => safeSymbols.has(String(row.symbol || '')))
    : rows;
  return {
    rows: sortByCreatedAtAsc(filtered),
    // Compare the unfiltered row count: symbol filtering shrinks the result
    // and would otherwise hide that the query itself was cut off.
    truncated: rows.length >= effectiveLimit
  };
}
/**
 * Loads a profile's filled lifecycle orders, oldest-first.
 *
 * @param profileId Profile filter.
 * @param symbols Optional symbol restriction.
 * @returns The matching rows (the truncation flag is discarded).
 */
export async function getFilledLifecycleOrdersForProfile(profileId: string, symbols?: string[]): Promise<FilledLifecycleOrderRow[]> {
  const { rows } = await fetchFilledLifecycleOrders({ profileId, symbols });
  return rows;
}
/**
 * Loads a user's filled lifecycle orders, oldest-first.
 *
 * @param options User id plus optional profile, symbols and row limit.
 * @returns The matching rows and the truncation flag.
 */
export async function getFilledLifecycleOrdersForUser(options: {
  userId: string;
  profileId?: string;
  symbols?: string[];
  maxRows?: number;
}): Promise<{ rows: FilledLifecycleOrderRow[]; truncated: boolean }> {
  const result = await fetchFilledLifecycleOrders(options);
  return result;
}
/**
 * Loads filled lifecycle orders without a user filter, oldest-first.
 *
 * @param options Optional profile, symbols and row limit.
 * @returns The matching rows and the truncation flag.
 */
export async function getFilledLifecycleOrdersGlobal(options?: {
  profileId?: string;
  symbols?: string[];
  maxRows?: number;
}): Promise<{ rows: FilledLifecycleOrderRow[]; truncated: boolean }> {
  const scope = options || {};
  const result = await fetchFilledLifecycleOrders(scope);
  return result;
}
/**
 * Upserts reconciliation audit rows into `RECONCILIATION_AUDIT_CONTAINER`,
 * all in parallel.
 *
 * `created_at` prefers `applied_at`, then `reverted_at`, then the current
 * time; `updated_at` prefers `reverted_at`, then `applied_at`, then the
 * current time. The row's position in the batch is part of the document id
 * inputs.
 *
 * @param rows Audit rows to persist.
 * @returns True when every row was written; false when any write failed
 *          (the error is logged, not rethrown).
 */
export async function insertReconciliationBackfillAuditRows(rows: ReconciliationBackfillAuditInsert[]): Promise<boolean> {
  try {
    ensureCosmos();
    const writes = rows.map(async (auditRow, position) => {
      const auditDoc = buildBaseDocument(
        'reconciliation_backfill_audit',
        {
          ...auditRow,
          created_at: auditRow.applied_at || auditRow.reverted_at || nowIso(),
          updated_at: auditRow.reverted_at || auditRow.applied_at || nowIso(),
          id: String(position + 1),
        },
        buildDocId('reconciliation_audit', auditRow.batch_id, auditRow.trade_id, auditRow.exchange_order_id, position)
      );
      await upsertDocument<ReconciliationAuditDocument>(
        RECONCILIATION_AUDIT_CONTAINER,
        auditDoc as ReconciliationAuditDocument
      );
    });
    await Promise.all(writes);
    return true;
  } catch (error: any) {
    logger.error(`[RuntimeOrderRepo] Failed to save reconciliation audit rows: ${error.message}`);
    return false;
  }
}
/**
 * Persists backfilled orders: each row goes through `logOrder`, and rows
 * with a `filled_at` additionally get their status, fill time, price and
 * quantity applied via `updateOrderStatus`. Rows are processed in parallel.
 *
 * NOTE(review): `logOrder` catches and logs its own errors, so a failed
 * order write does not make this function return false; only a failure in
 * `updateOrderStatus` does.
 *
 * @param rows Backfill orders to persist.
 * @returns False when an error propagated (it is logged); otherwise true.
 */
export async function upsertReconciliationBackfillOrders(rows: ReconciliationBackfillOrderInsert[]): Promise<boolean> {
  try {
    const tasks = rows.map(async (backfill) => {
      await logOrder({
        user_id: backfill.user_id,
        profile_id: backfill.profile_id,
        order_id: backfill.order_id,
        symbol: backfill.symbol,
        type: backfill.type,
        side: backfill.side,
        qty: backfill.qty || backfill.quantity,
        price: backfill.price,
        status: backfill.status,
        timestamp: backfill.timestamp,
        trade_id: backfill.trade_id,
        action: backfill.action,
        sub_tag: backfill.sub_tag,
      });
      if (!backfill.filled_at) {
        return;
      }
      await updateOrderStatus(
        backfill.order_id,
        backfill.status,
        new Date(backfill.filled_at),
        backfill.price,
        backfill.qty || backfill.quantity
      );
    });
    await Promise.all(tasks);
    return true;
  } catch (error: any) {
    logger.error(`[RuntimeOrderRepo] Failed to upsert reconciliation backfill orders: ${error.message}`);
    return false;
  }
}
/**
 * Returns one page of reconciliation audit rows, newest first.
 *
 * The query fetches `offset + limit` rows (bounded by the 10,000-row cap of
 * `queryAuditDocuments`) so that pages beyond the first can be served;
 * fetching only `limit` rows would leave every later page empty. The
 * optional `fromIso` / `toIso` bounds are applied to `created_at` after the
 * fetch, so `totalCount` counts only rows within the fetched window.
 *
 * @param query Filters plus `offset` (default 0) and `limit` (default 500,
 *              clamped to 1..5000).
 * @returns The page of rows (with numeric ids) and the filtered row count.
 */
export async function getReconciliationBackfillAuditRows(query: ReconciliationBackfillAuditQuery): Promise<{ rows: ReconciliationBackfillAuditRow[]; totalCount: number }> {
  const offset = Math.max(0, Math.floor(Number(query.offset || 0)));
  const limit = Math.max(1, Math.min(5000, Math.floor(Number(query.limit || 500))));
  const rows = await queryAuditDocuments({
    batchId: query.batchId,
    profileId: query.profileId,
    symbol: query.symbol,
    decisions: query.decisions,
    // Read far enough to cover the requested page, not just its size.
    limit: Math.min(10_000, offset + limit)
  });
  // An unparsable or missing bound becomes NaN, which makes the comparison
  // false and therefore leaves that side of the range open.
  const fromMs = query.fromIso ? Date.parse(query.fromIso) : Number.NaN;
  const toMs = query.toIso ? Date.parse(query.toIso) : Number.NaN;
  const filtered = rows.filter((row) => {
    const createdAt = toTimestampMs(row.created_at, 0);
    if (createdAt < fromMs) return false;
    if (createdAt > toMs) return false;
    return true;
  });
  return {
    rows: filtered.slice(offset, offset + limit).map((row, index) => ({
      ...row,
      // Document ids are strings; expose a numeric id, falling back to the
      // row's position within the page when the id is not numeric.
      id: Number.parseInt(String(row.id || index + 1), 10) || (index + 1)
    })),
    totalCount: filtered.length
  };
}
/**
 * Aggregates reconciliation audit rows into one summary per batch.
 *
 * Up to 10,000 audit rows are read (optionally filtered by profile and
 * symbol). Each summary tracks the first and last `created_at` seen, the
 * distinct profile ids and symbols, the row count, a per-decision count and
 * the number of dry-run, applied and reverted rows. Summaries whose last
 * seen time lies outside `fromIso`..`toIso` are dropped; the remainder is
 * sorted by last seen time descending and cut to `limit` (default 200,
 * clamped to 1..1000).
 *
 * @param query Optional filters, time range and limit.
 * @returns Batch summaries, most recently seen first.
 */
export async function getReconciliationBackfillBatchSummaries(query?: {
  profileId?: string;
  symbol?: string;
  fromIso?: string;
  toIso?: string;
  limit?: number;
}): Promise<ReconciliationBackfillBatchSummary[]> {
  const auditRows = await queryAuditDocuments({
    profileId: query?.profileId,
    symbol: query?.symbol,
    limit: 10_000
  });

  const summaryByBatch = new Map<string, ReconciliationBackfillBatchSummary>();
  for (const auditRow of auditRows) {
    const batchId = String(auditRow.batch_id || '').trim();
    if (!batchId) {
      continue;
    }
    const seenAt = String(auditRow.created_at || nowIso());
    const summary = summaryByBatch.get(batchId) || {
      batchId,
      firstSeenAt: seenAt,
      lastSeenAt: seenAt,
      profileIds: [],
      symbols: [],
      totalRows: 0,
      byDecision: {},
      dryRunRows: 0,
      appliedRows: 0,
      revertedRows: 0,
    };
    // Timestamps are compared as strings, exactly as stored.
    if (!(summary.firstSeenAt < seenAt)) {
      summary.firstSeenAt = seenAt;
    }
    if (!(summary.lastSeenAt > seenAt)) {
      summary.lastSeenAt = seenAt;
    }
    if (auditRow.profile_id && !summary.profileIds.includes(auditRow.profile_id)) {
      summary.profileIds.push(auditRow.profile_id);
    }
    if (auditRow.symbol && !summary.symbols.includes(auditRow.symbol)) {
      summary.symbols.push(auditRow.symbol);
    }
    summary.totalRows += 1;
    summary.byDecision[auditRow.decision] = (summary.byDecision[auditRow.decision] || 0) + 1;
    if (auditRow.dry_run) summary.dryRunRows += 1;
    if (auditRow.applied_at) summary.appliedRows += 1;
    if (auditRow.reverted_at) summary.revertedRows += 1;
    summaryByBatch.set(batchId, summary);
  }

  const fromMs = query?.fromIso ? Date.parse(query.fromIso) : 0;
  const toMs = query?.toIso ? Date.parse(query.toIso) : Number.POSITIVE_INFINITY;
  const inRange = Array.from(summaryByBatch.values()).filter((summary) => {
    const lastSeenMs = Date.parse(summary.lastSeenAt);
    return lastSeenMs >= fromMs && lastSeenMs <= toMs;
  });
  inRange.sort((left, right) => Date.parse(right.lastSeenAt) - Date.parse(left.lastSeenAt));
  const maxSummaries = Math.max(1, Math.min(1000, Math.floor(Number(query?.limit || 200))));
  return inRange.slice(0, maxSummaries);
}
/**
 * Reverts a backfill batch: for every audit row of the batch that carries a
 * `backfill_order_id`, the order's status is set to 'canceled' and the audit
 * row is stamped with `reverted_at`. Rows are processed in parallel and
 * failures are collected per order instead of aborting the batch.
 *
 * @param batchId Batch identifier (trimmed); up to 5000 audit rows are read.
 * @returns The number of rows reverted and one "<orderId>: <message>" entry
 *          per failure.
 */
export async function revertBackfillBatch(batchId: string): Promise<{ reverted: number; errors: string[] }> {
  const auditRows = await queryAuditDocuments({ batchId: String(batchId || '').trim(), limit: 5000 });
  const errors: string[] = [];
  let reverted = 0;

  const revertOne = async (auditRow: ReconciliationAuditDocument): Promise<void> => {
    const orderId = String(auditRow.backfill_order_id || '').trim();
    if (!orderId) {
      // Nothing was backfilled for this audit row.
      return;
    }
    try {
      await updateOrderStatus(orderId, 'canceled');
      await upsertDocument<ReconciliationAuditDocument>(RECONCILIATION_AUDIT_CONTAINER, {
        ...auditRow,
        reverted_at: nowIso(),
        updated_at: nowIso(),
      });
      reverted += 1;
    } catch (error: any) {
      errors.push(`${orderId}: ${error.message}`);
    }
  };

  await Promise.all(auditRows.map((auditRow) => revertOne(auditRow)));
  return { reverted, errors };
}
/**
 * Derives and stores missing sub-tags on a profile's recent orders.
 *
 * Orders created within the look-back window whose `source` is not 'MANUAL'
 * are inspected. Each inspected row lands in exactly one bucket:
 * already tagged, no profile id, no trade id, tag could not be derived, or
 * eligible. Eligible rows are updated unless `dryRun` is set.
 *
 * `scannedRows` counts every inspected row, so it equals the sum of the
 * skip counters plus `eligibleRows`. `skippedAlreadyTagged` is now actually
 * populated; tagged rows used to be filtered out before the counting loop,
 * which left that counter permanently at zero.
 *
 * @param options Profile id, look-back in hours (default 720), maximum rows
 *                to read (default 500, clamped to 1..5000) and dry-run flag.
 * @returns The repair summary; `unsupported: true` when tagging is not
 *          enabled for the profile.
 */
export async function repairMissingSubTagsForProfile(options: {
  profileId: string;
  lookbackHours: number;
  maxRows: number;
  dryRun: boolean;
}): Promise<ReconciliationSubTagRepairSummary> {
  const summary: ReconciliationSubTagRepairSummary = {
    attempted: true,
    scannedRows: 0,
    eligibleRows: 0,
    updatedRows: 0,
    skippedNoProfile: 0,
    skippedNoTrade: 0,
    skippedTagDisabled: 0,
    skippedAlreadyTagged: 0,
    dryRun: Boolean(options.dryRun)
  };
  if (!shouldAttachAlpacaSubTag({ profileId: options.profileId })) {
    return {
      ...summary,
      unsupported: true
    };
  }
  const sinceMs = Date.now() - Math.max(1, Math.floor(options.lookbackHours || 720)) * 60 * 60 * 1000;
  const rows = await queryOrders({
    profileId: options.profileId,
    limit: Math.max(1, Math.min(5000, Math.floor(Number(options.maxRows || 500))))
  });
  // Rows inside the look-back window, excluding manually sourced orders.
  const inspected = rows.filter((row) => {
    const createdAt = toTimestampMs(row.created_at, 0);
    return createdAt >= sinceMs && String(row.source || '').trim() !== 'MANUAL';
  });
  summary.scannedRows = inspected.length;
  for (const row of inspected) {
    if (String(row.sub_tag || '').trim()) {
      summary.skippedAlreadyTagged += 1;
      continue;
    }
    const profileId = String(row.profile_id || '').trim();
    if (!profileId) {
      summary.skippedNoProfile += 1;
      continue;
    }
    const tradeId = String(row.trade_id || '').trim();
    if (!tradeId) {
      summary.skippedNoTrade += 1;
      continue;
    }
    const derived = resolvePersistedOrderSubTag({
      profile_id: profileId,
      trade_id: tradeId,
      action: String(row.action || ''),
    });
    if (!derived) {
      summary.skippedTagDisabled += 1;
      continue;
    }
    summary.eligibleRows += 1;
    if (summary.dryRun) continue;
    // Writes are issued one at a time rather than in parallel.
    await upsertOrderDocument({
      ...row,
      sub_tag: derived,
      updated_at: nowIso(),
    });
    summary.updatedRows += 1;
  }
  return summary;
}
/**
 * Lists orders still in a pending-like status that were created more than
 * `staleThresholdMinutes` minutes ago.
 *
 * At most 250 rows are requested, oldest first, in the statuses `pending_new`,
 * `pending`, `accepted` and `new`; the age and scope checks are applied to that page.
 *
 * @param staleThresholdMinutes - Minimum age, in minutes, for an order to count as stale.
 * @param scope - Either a profile id, or a `StaleOrderScope` with optional
 *   `profileId`, `userId`, `profileNullOnly` and `includeOrphanUserOrders`.
 * @returns The stale order rows that satisfy the scope.
 */
export async function getStaleOrders(staleThresholdMinutes: number = 5, scope?: string | StaleOrderScope): Promise<any[]> {
  const resolvedScope: StaleOrderScope = typeof scope === 'string' ? { profileId: scope } : (scope || {});
  const cutoffMs = Date.now() - staleThresholdMinutes * 60 * 1000;
  const cleanId = (value: unknown): string | undefined => String(value || '').trim() || undefined;

  const pendingOrders = await queryOrders({
    profileId: cleanId(resolvedScope.profileId),
    userId: cleanId(resolvedScope.userId),
    statuses: ['pending_new', 'pending', 'accepted', 'new'],
    orderBy: 'created_at',
    ascending: true,
    limit: 250
  });

  const isStaleInScope = (order: any): boolean => {
    const createdMs = toTimestampMs(order.created_at, 0);
    // Negated conjunction on purpose: rejects unparsable (0 / NaN) timestamps too.
    const isOldEnough = createdMs > 0 && createdMs < cutoffMs;
    if (!isOldEnough) return false;

    if (resolvedScope.profileNullOnly && order.profile_id) return false;

    // NOTE(review): if queryOrders filters strictly on profileId, profile-less
    // "orphan" rows may never reach this branch — confirm against queryOrders.
    if (resolvedScope.includeOrphanUserOrders && resolvedScope.profileId) {
      const ownedByProfile = order.profile_id === resolvedScope.profileId;
      const orphanOfUser = !order.profile_id && order.user_id === resolvedScope.userId;
      return ownedByProfile || orphanOfUser;
    }

    return true;
  };

  return pendingOrders.filter(isStaleInScope);
}
/**
 * Fetches up to 2000 orders whose status is `expired` or `unknown`.
 *
 * @returns The matching order rows as returned by `queryOrders`.
 */
export async function getExpiredOrUnknownOrders(): Promise<any[]> {
  const matches = await queryOrders({
    statuses: ['expired', 'unknown'],
    limit: 2000
  });
  return matches;
}
/**
 * Reports whether the reconciliation backfill audit can be used.
 *
 * @returns Whatever `cosmosEnabled()` reports.
 */
export async function isReconciliationBackfillAuditAvailable(): Promise<boolean> {
  const available = cosmosEnabled();
  return available;
}
/**
 * Rebuilds the net open position for a profile/symbol from its persisted fills.
 *
 * Up to 5000 `filled` / `partially_filled` order rows are replayed in timestamp
 * order into one ledger per trade id (entry quantity and notional versus exit
 * quantity). Rows without a `trade_id` are attached to the first queued open
 * trade on the opposite side when they act as exits, and otherwise receive a
 * synthetic `__legacy__-…` id (reported as `TRD-LEGACY-…`). Ledgers whose
 * remaining quantity exceeds the dust threshold are summed per side, and the
 * side with the larger remaining quantity is returned.
 *
 * The per-trade entry price is the quantity-weighted average over entry fills
 * that carry a positive price; fills without a usable price are left out of
 * both the numerator and the denominator so they cannot dilute the average.
 *
 * @param profileId - Profile whose orders are replayed.
 * @param symbol - Symbol to inspect; expanded with `buildLifecycleSymbolCandidates`.
 * @returns The dominant open position, or `null` when there are no symbol
 *   candidates or no quantity remains open.
 */
export async function getVirtualOpenPosition(profileId: string, symbol: string): Promise<VirtualOpenPosition | null> {
  const symbolCandidates = buildLifecycleSymbolCandidates(symbol);
  if (!symbolCandidates.length) return null;
  const rows = await queryOrders({
    profileId,
    symbols: symbolCandidates,
    statuses: ['filled', 'partially_filled'],
    limit: 5000
  });

  // Running totals for a single trade id.
  type TradeLedger = {
    tradeId: string;
    side: 'BUY' | 'SELL';
    entryQty: number;
    // Portion of entryQty that came with a positive price (denominator of the average).
    entryPricedQty: number;
    entryNotional: number;
    entryLastPrice: number;
    exitQty: number;
    userId?: string;
    stopLoss: number;
    takeProfit: number;
    lastTs: number;
  };
  // Sum of all still-open ledgers that share an entry side.
  type SideAggregate = {
    side: 'BUY' | 'SELL';
    qty: number;
    notional: number;
    userId?: string;
    stopLoss: number;
    takeProfit: number;
    tradeIds: string[];
    primaryTradeId: string;
    primaryTs: number;
  };

  // Replay order: by timestamp, with the original row index as a stable tie-breaker.
  const orderedRows = rows
    .map((row, index) => ({ row, index, ts: orderTimestamp(row, index) }))
    .sort((a, b) => (a.ts - b.ts) || (a.index - b.index));

  const ledgerByTrade = new Map<string, TradeLedger>();
  const entrySideByTrade = new Map<string, 'BUY' | 'SELL'>();
  // Trade ids with a recorded entry and no exit yet, per entry side, in replay order.
  const openTradeQueueBySide: Record<'BUY' | 'SELL', string[]> = { BUY: [], SELL: [] };

  const normalizeToken = (value: string): string => value.replace(/[^A-Za-z0-9]/g, '').slice(0, 24) || 'token';
  const profileToken = normalizeToken(profileId);
  const symbolToken = normalizeToken(symbol);
  let syntheticCounter = 0;
  // Builds a unique placeholder id for rows that were stored without a trade id.
  const buildSyntheticTradeId = (side: 'BUY' | 'SELL', ts: number): string => {
    syntheticCounter += 1;
    const tsToken = Number.isFinite(ts) && ts > 0 ? Math.trunc(ts) : syntheticCounter;
    return `__legacy__-${profileToken}-${symbolToken}-${side}-${tsToken}-${String(syntheticCounter).padStart(4, '0')}`;
  };

  for (const { row, ts } of orderedRows) {
    const qty = clampNumber(row.qty || row.quantity);
    if (qty <= 0) continue;
    const rowSide = normalizeTradeSide(row.side || 'BUY');
    const rawTradeId = String(row.trade_id || '').trim();
    const oppositeSide: 'BUY' | 'SELL' = rowSide === 'BUY' ? 'SELL' : 'BUY';
    const explicitAction = normalizeOrderAction(row.action || undefined);
    let tradeId = rawTradeId;
    let action = explicitAction;

    // No action and no trade id: treat the row as an exit only if something is open on the other side.
    if (!action && !tradeId) {
      action = openTradeQueueBySide[oppositeSide].length > 0 ? 'EXIT' : 'ENTRY';
    }
    if (!tradeId) {
      if (action === 'EXIT' && openTradeQueueBySide[oppositeSide].length > 0) {
        tradeId = openTradeQueueBySide[oppositeSide][0];
      } else {
        tradeId = buildSyntheticTradeId(action === 'EXIT' ? oppositeSide : rowSide, ts);
      }
    }
    // Trade id known but no action: compare against the side the trade was entered on.
    if (!action) {
      const knownEntrySide = entrySideByTrade.get(tradeId);
      action = knownEntrySide ? (rowSide === knownEntrySide ? 'ENTRY' : 'EXIT') : inferLifecycleAction(undefined, row.side);
    }

    let ledger = ledgerByTrade.get(tradeId);
    if (!ledger) {
      ledger = {
        tradeId,
        side: rowSide,
        entryQty: 0,
        entryPricedQty: 0,
        entryNotional: 0,
        entryLastPrice: 0,
        exitQty: 0,
        userId: toOptionalString(row.user_id),
        stopLoss: 0,
        takeProfit: 0,
        lastTs: ts
      };
      ledgerByTrade.set(tradeId, ledger);
    }

    if (action === 'ENTRY') {
      // The first entry fixes the trade's side (the ledger may have been created by an exit row).
      if (ledger.entryQty === 0) ledger.side = rowSide;
      ledger.entryQty += qty;
      entrySideByTrade.set(tradeId, ledger.side);
      if (!openTradeQueueBySide[ledger.side].includes(tradeId)) {
        openTradeQueueBySide[ledger.side].push(tradeId);
      }
      const price = clampNumber(row.price);
      if (price > 0) {
        ledger.entryNotional += price * qty;
        ledger.entryPricedQty += qty;
        ledger.entryLastPrice = price;
      }
      const stopLoss = clampNumber(row.stop_loss);
      const takeProfit = clampNumber(row.take_profit);
      if (stopLoss > 0) ledger.stopLoss = stopLoss;
      if (takeProfit > 0) ledger.takeProfit = takeProfit;
    } else {
      ledger.exitQty += qty;
      // Drop the trade from the open queue; if it is not queued, drop the first queued trade instead.
      const queue = openTradeQueueBySide[oppositeSide];
      const idx = queue.findIndex((value) => value === tradeId);
      if (idx >= 0) queue.splice(idx, 1);
      else if (queue.length > 0) queue.shift();
    }
    if (row.user_id) ledger.userId = String(row.user_id);
    ledger.lastTs = Math.max(ledger.lastTs, ts);
  }

  const aggregateBySide = new Map<'BUY' | 'SELL', SideAggregate>();
  for (const tradeLedger of ledgerByTrade.values()) {
    const remainingQty = tradeLedger.entryQty - tradeLedger.exitQty;
    if (remainingQty <= dustThresholdQty()) continue;
    // Average only over priced fills; fall back to the last seen entry price.
    const weightedEntryPrice = tradeLedger.entryPricedQty > 0 && tradeLedger.entryNotional > 0
      ? tradeLedger.entryNotional / tradeLedger.entryPricedQty
      : tradeLedger.entryLastPrice;
    if (!(weightedEntryPrice > 0)) continue;
    const normalizedTradeId = tradeLedger.tradeId.startsWith('__legacy__-')
      ? `TRD-LEGACY-${tradeLedger.tradeId.slice('__legacy__-'.length)}`
      : tradeLedger.tradeId;

    let aggregate = aggregateBySide.get(tradeLedger.side);
    if (!aggregate) {
      aggregate = {
        side: tradeLedger.side,
        qty: 0,
        notional: 0,
        userId: tradeLedger.userId,
        stopLoss: tradeLedger.stopLoss,
        takeProfit: tradeLedger.takeProfit,
        tradeIds: [],
        primaryTradeId: normalizedTradeId,
        primaryTs: tradeLedger.lastTs
      };
      aggregateBySide.set(tradeLedger.side, aggregate);
    }
    aggregate.qty += remainingQty;
    aggregate.notional += remainingQty * weightedEntryPrice;
    if (!aggregate.tradeIds.includes(normalizedTradeId)) aggregate.tradeIds.push(normalizedTradeId);
    // The most recently touched trade supplies the primary id, user and protective levels.
    if (tradeLedger.lastTs >= aggregate.primaryTs) {
      aggregate.primaryTs = tradeLedger.lastTs;
      aggregate.primaryTradeId = normalizedTradeId;
      aggregate.userId = tradeLedger.userId || aggregate.userId;
      if (tradeLedger.stopLoss > 0) aggregate.stopLoss = tradeLedger.stopLoss;
      if (tradeLedger.takeProfit > 0) aggregate.takeProfit = tradeLedger.takeProfit;
    }
  }

  // Pick the side holding the larger remaining quantity.
  let dominant: SideAggregate | null = null;
  for (const candidate of aggregateBySide.values()) {
    if (!dominant || candidate.qty > dominant.qty) {
      dominant = candidate;
    }
  }
  if (!dominant || dominant.qty <= 1e-8) return null;

  const entryPrice = dominant.notional / dominant.qty;
  return {
    profileId,
    symbol,
    side: dominant.side,
    qty: Number(dominant.qty.toFixed(8)),
    entryPrice: Number(entryPrice.toFixed(8)),
    stopLoss: Number((dominant.stopLoss || 0).toFixed(8)),
    takeProfit: Number((dominant.takeProfit || 0).toFixed(8)),
    userId: dominant.userId,
    tradeId: dominant.primaryTradeId,
    tradeIds: dominant.tradeIds
  };
}
/**
 * Rebuilds the open position that belongs to one specific trade id.
 *
 * Up to 1000 `filled` / `partially_filled` rows for the profile, symbol and
 * trade id are replayed oldest-first. Entry fills add to the entered quantity
 * and exit fills subtract from it. Rows without an explicit action are
 * classified by comparing their side to the first entry side seen, or via
 * `inferLifecycleAction` when no entry has been seen yet.
 *
 * The entry price is the quantity-weighted average over entry fills that carry
 * a positive price; fills without a usable price are excluded from both the
 * numerator and the denominator so they cannot dilute the average.
 *
 * @param profileId - Profile whose orders are replayed.
 * @param symbol - Symbol to inspect; expanded with `buildLifecycleSymbolCandidates`.
 * @param tradeId - Trade id to reconstruct; trimmed before use.
 * @returns The remaining position for the trade, or `null` when there are no
 *   symbol candidates, no matching rows, no priced entry, or nothing above the
 *   dust threshold remains.
 */
export async function getVirtualOpenPositionForTrade(profileId: string, symbol: string, tradeId: string): Promise<VirtualOpenPosition | null> {
  const symbolCandidates = buildLifecycleSymbolCandidates(symbol);
  if (!symbolCandidates.length) return null;
  const normalizedTradeId = String(tradeId || '').trim();
  const rows = await queryOrders({
    profileId,
    tradeId: normalizedTradeId,
    symbols: symbolCandidates,
    statuses: ['filled', 'partially_filled'],
    orderBy: 'created_at',
    ascending: true,
    limit: 1000
  });
  if (!rows.length) return null;

  let entrySide: 'BUY' | 'SELL' | null = null;
  let entryQty = 0;
  // Portion of entryQty that came with a positive price (denominator of the average).
  let pricedEntryQty = 0;
  let entryNotional = 0;
  let exitQty = 0;
  let stopLoss = 0;
  let takeProfit = 0;
  let userId: string | undefined;

  for (const row of sortByCreatedAtAsc(rows)) {
    const qty = clampNumber(row.qty || row.quantity);
    if (qty <= 0) continue;
    const side = normalizeTradeSide(row.side || 'BUY');
    let action = normalizeOrderAction(row.action || undefined);
    if (!action) {
      action = entrySide ? (side === entrySide ? 'ENTRY' : 'EXIT') : inferLifecycleAction(undefined, row.side);
    }
    if (action === 'ENTRY') {
      // The first entry fixes the side of the trade.
      if (!entrySide) entrySide = side;
      entryQty += qty;
      const price = clampNumber(row.price);
      if (price > 0) {
        entryNotional += price * qty;
        pricedEntryQty += qty;
      }
      // Later entries overwrite the protective levels only when they provide one.
      const sl = clampNumber(row.stop_loss);
      const tp = clampNumber(row.take_profit);
      if (sl > 0) stopLoss = sl;
      if (tp > 0) takeProfit = tp;
    } else {
      exitQty += qty;
    }
    userId = toOptionalString(row.user_id) || userId;
  }

  const remainingQty = entryQty - exitQty;
  // entryNotional > 0 guarantees pricedEntryQty > 0, so the division below is safe.
  if (!(remainingQty > dustThresholdQty()) || !(entryNotional > 0) || !entrySide) return null;

  return {
    profileId,
    symbol,
    side: entrySide,
    qty: Number(remainingQty.toFixed(8)),
    entryPrice: Number((entryNotional / pricedEntryQty).toFixed(8)),
    stopLoss: Number((stopLoss || 0).toFixed(8)),
    takeProfit: Number((takeProfit || 0).toFixed(8)),
    userId,
    tradeId: normalizedTradeId,
    tradeIds: [normalizedTradeId]
  };
}