learning_ai_invt_trdg/backend/src/services/profileRepository.ts

672 lines
26 KiB
TypeScript

import { getContainer } from '@bytelyst/cosmos';
import { randomUUID } from 'node:crypto';
import { config } from '../config/index.js';
import logger from '../utils/logger.js';
import type { supabaseService } from './SupabaseService.js';
type LegacySupabaseService = typeof supabaseService;
export interface TradingUserProfile {
user_id: string;
first_name: string;
last_name: string;
email: string;
role: string;
trade_enable: boolean;
ALPACA_API_KEY?: string;
ALPACA_SECRET_KEY?: string;
REAL_ALPACA_API_KEY?: string;
REAL_ALPACA_SECRET_KEY?: string;
drop_threshold_for_buy?: number;
gain_threshold_for_sell?: number;
market_poll_interval_in_seconds?: number;
}
export interface TradeProfileRecord {
id: string;
user_id: string;
name: string;
allocated_capital: number;
risk_per_trade_percent: number;
symbols: string;
is_active: boolean;
strategy_config: Record<string, unknown>;
created_at?: string;
updated_at?: string;
}
export interface TradeProfileCapitalSummary {
allocatedCapital: number;
isActive: boolean;
userId?: string;
}
interface TradeProfileDocument extends TradeProfileRecord {
productId: string;
type: 'trade_profile';
createdAt: string;
updatedAt: string;
}
const PROFILE_CONTAINER = 'trade_profiles';
const USER_PROFILE_CONTAINER = 'trading_users';
interface TradingUserProfileDocument extends TradingUserProfile {
id: string;
productId: string;
type: 'trading_user';
createdAt: string;
updatedAt: string;
}
function isCosmosConfigured(): boolean {
return Boolean(config.COSMOS_ENDPOINT && config.COSMOS_KEY);
}
function normalizeProfile(row: Partial<TradeProfileRecord> | null | undefined): TradeProfileRecord | null {
const id = String(row?.id || '').trim();
const userId = String(row?.user_id || '').trim();
if (!id || !userId) {
return null;
}
return {
id,
user_id: userId,
name: String(row?.name || 'Untitled Strategy').trim() || 'Untitled Strategy',
allocated_capital: Number(row?.allocated_capital || 0),
risk_per_trade_percent: Number(row?.risk_per_trade_percent || 0),
symbols: String(row?.symbols || 'BTC/USDT'),
is_active: Boolean(row?.is_active),
strategy_config: (row?.strategy_config && typeof row.strategy_config === 'object')
? row.strategy_config as Record<string, unknown>
: {},
created_at: row?.created_at ? String(row.created_at) : undefined,
updated_at: row?.updated_at ? String(row.updated_at) : undefined,
};
}
function buildDefaultTradeProfile(userId: string): TradeProfileRecord {
const timestamp = new Date().toISOString();
return {
id: randomUUID(),
user_id: userId,
name: 'My First Strategy',
allocated_capital: 1000,
risk_per_trade_percent: 1,
symbols: 'BTC/USDT, ETH/USDT',
is_active: false,
strategy_config: {
rules: [
{ ruleId: 'TrendBiasRule', enabled: true, params: { fastPeriod: 50, slowPeriod: 200 } },
{ ruleId: 'MomentumRule', enabled: true, params: { rsiPeriod: 14, overbought: 70, oversold: 30 } },
{ ruleId: 'ZoneRule', enabled: true, params: { zonePercent: 1.5 } },
{ ruleId: 'SessionRule', enabled: true, params: { sessions: 'London,NY' } },
{ ruleId: 'EntryTriggerRule', enabled: true, params: { showPatterns: true } },
{ ruleId: 'RiskManagementRule', enabled: true, params: { maxRisk: 2.0 } },
{ ruleId: 'AIAnalysisRule', enabled: false, params: { minConfidence: 0.7 } },
],
riskLimits: { maxDailyLossUsd: 50, maxOpenTrades: 3, maxConsecutiveLosses: 2 },
execution: { orderType: 'market', cooldownMinutes: 30, entryMode: 'both' },
},
created_at: timestamp,
updated_at: timestamp,
};
}
function normalizeTradingUserProfile(
row: Partial<TradingUserProfile> | null | undefined,
fallbackUserId?: string
): TradingUserProfile | null {
const userId = String(row?.user_id || fallbackUserId || '').trim();
if (!userId) {
return null;
}
return {
user_id: userId,
first_name: String(row?.first_name || ''),
last_name: String(row?.last_name || ''),
email: String(row?.email || ''),
role: String(row?.role || 'member'),
trade_enable: Boolean(row?.trade_enable ?? true),
ALPACA_API_KEY: row?.ALPACA_API_KEY,
ALPACA_SECRET_KEY: row?.ALPACA_SECRET_KEY,
REAL_ALPACA_API_KEY: row?.REAL_ALPACA_API_KEY,
REAL_ALPACA_SECRET_KEY: row?.REAL_ALPACA_SECRET_KEY,
drop_threshold_for_buy: Number(row?.drop_threshold_for_buy ?? 0),
gain_threshold_for_sell: Number(row?.gain_threshold_for_sell ?? 0),
market_poll_interval_in_seconds: Number(row?.market_poll_interval_in_seconds ?? 0),
};
}
function toTradingUserProfileDocument(profile: TradingUserProfile): TradingUserProfileDocument {
const now = new Date().toISOString();
return {
...profile,
id: profile.user_id,
productId: config.PRODUCT_ID,
type: 'trading_user',
createdAt: now,
updatedAt: now,
};
}
async function getTradingUserProfileFromCosmos(userId: string): Promise<TradingUserProfile | null> {
if (!isCosmosConfigured() || !userId) {
return null;
}
const container = getContainer(USER_PROFILE_CONTAINER);
const { resources } = await container.items.query<TradingUserProfileDocument>({
query: 'SELECT TOP 1 * FROM c WHERE c.productId = @productId AND c.type = @type AND c.user_id = @userId',
parameters: [
{ name: '@productId', value: config.PRODUCT_ID },
{ name: '@type', value: 'trading_user' },
{ name: '@userId', value: userId },
],
}).fetchAll();
return normalizeTradingUserProfile(resources[0], userId);
}
async function upsertTradingUserProfileToCosmos(profile: TradingUserProfile): Promise<void> {
if (!isCosmosConfigured()) return;
const container = getContainer(USER_PROFILE_CONTAINER);
await container.items.upsert<TradingUserProfileDocument>(toTradingUserProfileDocument(profile));
}
async function listProfilesFromCosmos(userId: string): Promise<TradeProfileRecord[]> {
if (!isCosmosConfigured()) {
return [];
}
const container = getContainer(PROFILE_CONTAINER);
const { resources } = await container.items.query<TradeProfileDocument>({
query: 'SELECT * FROM c WHERE c.productId = @productId AND c.user_id = @userId AND c.type = @type ORDER BY c.createdAt DESC',
parameters: [
{ name: '@productId', value: config.PRODUCT_ID },
{ name: '@userId', value: userId },
{ name: '@type', value: 'trade_profile' },
],
}).fetchAll();
return resources
.map((resource) => normalizeProfile({
...resource,
created_at: resource.createdAt,
updated_at: resource.updatedAt,
}))
.filter((profile): profile is TradeProfileRecord => Boolean(profile));
}
async function listAllProfilesFromCosmos(): Promise<TradeProfileRecord[]> {
if (!isCosmosConfigured()) {
return [];
}
const container = getContainer(PROFILE_CONTAINER);
const { resources } = await container.items.query<TradeProfileDocument>({
query: 'SELECT * FROM c WHERE c.productId = @productId AND c.type = @type ORDER BY c.createdAt DESC',
parameters: [
{ name: '@productId', value: config.PRODUCT_ID },
{ name: '@type', value: 'trade_profile' },
],
}).fetchAll();
return resources
.map((resource) => normalizeProfile({
...resource,
created_at: resource.createdAt,
updated_at: resource.updatedAt,
}))
.filter((profile): profile is TradeProfileRecord => Boolean(profile));
}
async function listProfilesFromSupabase(userId: string, legacyService?: LegacySupabaseService): Promise<TradeProfileRecord[]> {
const client = legacyService?.getClient?.();
if (!client) {
return [];
}
try {
const { data, error } = await client
.from('trade_profiles')
.select('id,user_id,name,allocated_capital,risk_per_trade_percent,symbols,is_active,strategy_config,created_at,updated_at')
.eq('user_id', userId)
.order('created_at', { ascending: false });
if (error || !Array.isArray(data)) {
return [];
}
return data
.map((row) => normalizeProfile(row as TradeProfileRecord))
.filter((profile): profile is TradeProfileRecord => Boolean(profile));
} catch (error) {
logger.warn(`[Profiles] Legacy profile read failed: ${error instanceof Error ? error.message : 'unknown error'}`);
return [];
}
}
async function seedProfilesToCosmos(profiles: TradeProfileRecord[]): Promise<TradeProfileRecord[]> {
if (!isCosmosConfigured() || profiles.length === 0) {
return profiles;
}
const container = getContainer(PROFILE_CONTAINER);
await Promise.all(profiles.map((profile) => container.items.upsert<TradeProfileDocument>({
...profile,
productId: config.PRODUCT_ID,
type: 'trade_profile',
createdAt: profile.created_at || new Date().toISOString(),
updatedAt: profile.updated_at || new Date().toISOString(),
})));
return profiles;
}
async function listAllProfilesFromSupabase(legacyService?: LegacySupabaseService): Promise<TradeProfileRecord[]> {
const client = legacyService?.getClient?.();
if (!client) {
return [];
}
try {
const { data, error } = await client
.from('trade_profiles')
.select('id,user_id,name,allocated_capital,risk_per_trade_percent,symbols,is_active,strategy_config,created_at,updated_at')
.order('created_at', { ascending: false });
if (error || !Array.isArray(data)) {
return [];
}
return data
.map((row) => normalizeProfile(row as TradeProfileRecord))
.filter((profile): profile is TradeProfileRecord => Boolean(profile));
} catch (error) {
logger.warn(`[Profiles] Legacy global profile read failed: ${error instanceof Error ? error.message : 'unknown error'}`);
return [];
}
}
async function getProfileFromCosmos(profileId: string): Promise<TradeProfileRecord | null> {
if (!isCosmosConfigured() || !profileId) {
return null;
}
const container = getContainer(PROFILE_CONTAINER);
const { resources } = await container.items.query<TradeProfileDocument>({
query: 'SELECT TOP 1 * FROM c WHERE c.productId = @productId AND c.type = @type AND c.id = @id',
parameters: [
{ name: '@productId', value: config.PRODUCT_ID },
{ name: '@type', value: 'trade_profile' },
{ name: '@id', value: profileId },
],
}).fetchAll();
const resource = resources[0];
if (!resource) {
return null;
}
return normalizeProfile({
...resource,
created_at: resource.createdAt,
updated_at: resource.updatedAt,
});
}
async function getProfileFromSupabase(profileId: string, legacyService?: LegacySupabaseService): Promise<TradeProfileRecord | null> {
const client = legacyService?.getClient?.();
if (!client || !profileId) {
return null;
}
try {
const { data, error } = await client
.from('trade_profiles')
.select('id,user_id,name,allocated_capital,risk_per_trade_percent,symbols,is_active,strategy_config,created_at,updated_at')
.eq('id', profileId)
.maybeSingle();
if (error || !data) {
return null;
}
return normalizeProfile(data as TradeProfileRecord);
} catch (error) {
logger.warn(`[Profiles] Legacy profile lookup failed: ${error instanceof Error ? error.message : 'unknown error'}`);
return null;
}
}
async function mirrorProfileToSupabase(profile: TradeProfileRecord, legacyService?: LegacySupabaseService): Promise<void> {
const client = legacyService?.getClient?.();
if (!client) return;
try {
const { error } = await client
.from('trade_profiles')
.upsert({
...profile,
created_at: profile.created_at || new Date().toISOString(),
updated_at: profile.updated_at || new Date().toISOString(),
}, { onConflict: 'id' });
if (error) {
logger.warn(`[Profiles] Legacy profile mirror failed: ${error.message}`);
}
} catch (error) {
logger.warn(`[Profiles] Legacy profile mirror failed: ${error instanceof Error ? error.message : 'unknown error'}`);
}
}
async function deleteProfileFromSupabase(profileId: string, userId: string, legacyService?: LegacySupabaseService): Promise<void> {
const client = legacyService?.getClient?.();
if (!client) return;
try {
const { error } = await client
.from('trade_profiles')
.delete()
.eq('id', profileId)
.eq('user_id', userId);
if (error) {
logger.warn(`[Profiles] Legacy profile delete failed: ${error.message}`);
}
} catch (error) {
logger.warn(`[Profiles] Legacy profile delete failed: ${error instanceof Error ? error.message : 'unknown error'}`);
}
}
export async function listTradeProfilesForUser(userId: string, legacyService?: LegacySupabaseService): Promise<TradeProfileRecord[]> {
if (!isCosmosConfigured()) {
return listProfilesFromSupabase(userId, legacyService);
}
try {
const cosmosProfiles = await listProfilesFromCosmos(userId);
if (cosmosProfiles.length > 0) {
return cosmosProfiles;
}
const seededProfiles = await seedProfilesToCosmos(await listProfilesFromSupabase(userId, legacyService));
if (seededProfiles.length > 0) {
logger.info(`[Profiles] Seeded ${seededProfiles.length} user profiles from legacy store into Cosmos for user ${userId}.`);
}
return seededProfiles;
} catch (error) {
logger.warn(`[Profiles] Cosmos profile read/seed failed for user ${userId}: ${error instanceof Error ? error.message : 'unknown error'}`);
return [];
}
}
export async function listAllTradeProfiles(legacyService?: LegacySupabaseService): Promise<TradeProfileRecord[]> {
if (!isCosmosConfigured()) {
return listAllProfilesFromSupabase(legacyService);
}
try {
const cosmosProfiles = await listAllProfilesFromCosmos();
if (cosmosProfiles.length > 0) {
return cosmosProfiles;
}
const seededProfiles = await seedProfilesToCosmos(await listAllProfilesFromSupabase(legacyService));
if (seededProfiles.length > 0) {
logger.info(`[Profiles] Seeded ${seededProfiles.length} profiles from legacy store into Cosmos.`);
}
return seededProfiles;
} catch (error) {
logger.warn(`[Profiles] Cosmos global profile read/seed failed: ${error instanceof Error ? error.message : 'unknown error'}`);
return [];
}
}
export async function listActiveTradeProfiles(legacyService?: LegacySupabaseService): Promise<TradeProfileRecord[]> {
const profiles = await listAllTradeProfiles(legacyService);
return profiles.filter((profile) => Boolean(profile.is_active));
}
export async function getTradeProfileById(profileId: string, legacyService?: LegacySupabaseService): Promise<TradeProfileRecord | null> {
const normalizedId = String(profileId || '').trim();
if (!normalizedId) {
return null;
}
if (!isCosmosConfigured()) {
return getProfileFromSupabase(normalizedId, legacyService);
}
try {
const cosmosProfile = await getProfileFromCosmos(normalizedId);
if (cosmosProfile) {
return cosmosProfile;
}
const legacyProfile = await getProfileFromSupabase(normalizedId, legacyService);
if (!legacyProfile) {
return null;
}
await seedProfilesToCosmos([legacyProfile]);
logger.info(`[Profiles] Seeded profile ${normalizedId} from legacy store into Cosmos.`);
return legacyProfile;
} catch (error) {
logger.warn(`[Profiles] Cosmos profile lookup/seed failed for ${normalizedId}: ${error instanceof Error ? error.message : 'unknown error'}`);
return null;
}
}
export async function getTradeProfileCapital(profileId: string, legacyService?: LegacySupabaseService): Promise<TradeProfileCapitalSummary | null> {
const profile = await getTradeProfileById(profileId, legacyService);
if (!profile) {
return null;
}
return {
allocatedCapital: Number(profile.allocated_capital || 0),
isActive: Boolean(profile.is_active),
userId: profile.user_id || undefined,
};
}
export async function getTradeProfileForUser(profileId: string, userId: string, legacyService?: LegacySupabaseService): Promise<TradeProfileRecord | null> {
const profile = await getTradeProfileById(profileId, legacyService);
if (!profile || String(profile.user_id || '').trim() !== String(userId || '').trim()) {
return null;
}
return profile;
}
export async function ensureDefaultTradeProfileForUser(userId: string, legacyService?: LegacySupabaseService): Promise<TradeProfileRecord[]> {
const profiles = await listTradeProfilesForUser(userId, legacyService);
if (profiles.length > 0) {
return profiles;
}
const created = await saveTradeProfileForUser(buildDefaultTradeProfile(userId), userId, legacyService);
return [created];
}
export async function saveTradeProfileForUser(
input: Partial<TradeProfileRecord>,
userId: string,
legacyService?: LegacySupabaseService
): Promise<TradeProfileRecord> {
const now = new Date().toISOString();
const normalized = normalizeProfile({
...input,
id: String(input.id || randomUUID()),
user_id: userId,
created_at: input.created_at || now,
updated_at: now,
});
if (!normalized) {
throw new Error('Invalid trade profile payload');
}
if (isCosmosConfigured()) {
try {
const container = getContainer(PROFILE_CONTAINER);
await container.items.upsert<TradeProfileDocument>({
...normalized,
productId: config.PRODUCT_ID,
type: 'trade_profile',
createdAt: normalized.created_at || now,
updatedAt: normalized.updated_at || now,
});
} catch (error) {
logger.warn(`[Profiles] Cosmos profile upsert failed: ${error instanceof Error ? error.message : 'unknown error'}`);
}
}
await mirrorProfileToSupabase(normalized, legacyService);
return normalized;
}
export async function deleteTradeProfileForUser(
profileId: string,
userId: string,
legacyService?: LegacySupabaseService
): Promise<void> {
if (!profileId || !userId) {
return;
}
if (isCosmosConfigured()) {
try {
const container = getContainer(PROFILE_CONTAINER);
await container.item(profileId, userId).delete();
} catch (error) {
logger.warn(`[Profiles] Cosmos profile delete failed: ${error instanceof Error ? error.message : 'unknown error'}`);
}
}
await deleteProfileFromSupabase(profileId, userId, legacyService);
}
export async function getCurrentUserProfile(
userId: string,
fallback: Partial<TradingUserProfile> = {},
legacyService?: LegacySupabaseService
): Promise<TradingUserProfile> {
if (isCosmosConfigured()) {
try {
const cosmosProfile = await getTradingUserProfileFromCosmos(userId);
if (cosmosProfile) {
return cosmosProfile;
}
} catch (error) {
logger.warn(`[Profiles] Cosmos user profile read failed for ${userId}: ${error instanceof Error ? error.message : 'unknown error'}`);
}
}
const client = legacyService?.getClient?.();
if (client) {
try {
const { data, error } = await client
.from('users')
.select('user_id,first_name,last_name,email,role,trade_enable,ALPACA_API_KEY,ALPACA_SECRET_KEY,REAL_ALPACA_API_KEY,REAL_ALPACA_SECRET_KEY,drop_threshold_for_buy,gain_threshold_for_sell,market_poll_interval_in_seconds')
.eq('user_id', userId)
.maybeSingle();
if (!error && data) {
const normalized = {
user_id: String((data as any).user_id || userId),
first_name: String((data as any).first_name || fallback.first_name || ''),
last_name: String((data as any).last_name || fallback.last_name || ''),
email: String((data as any).email || fallback.email || ''),
role: String((data as any).role || fallback.role || 'member'),
trade_enable: Boolean((data as any).trade_enable ?? fallback.trade_enable ?? true),
ALPACA_API_KEY: (data as any).ALPACA_API_KEY || fallback.ALPACA_API_KEY,
ALPACA_SECRET_KEY: (data as any).ALPACA_SECRET_KEY || fallback.ALPACA_SECRET_KEY,
REAL_ALPACA_API_KEY: (data as any).REAL_ALPACA_API_KEY || fallback.REAL_ALPACA_API_KEY,
REAL_ALPACA_SECRET_KEY: (data as any).REAL_ALPACA_SECRET_KEY || fallback.REAL_ALPACA_SECRET_KEY,
drop_threshold_for_buy: Number((data as any).drop_threshold_for_buy ?? fallback.drop_threshold_for_buy ?? 0),
gain_threshold_for_sell: Number((data as any).gain_threshold_for_sell ?? fallback.gain_threshold_for_sell ?? 0),
market_poll_interval_in_seconds: Number((data as any).market_poll_interval_in_seconds ?? fallback.market_poll_interval_in_seconds ?? 0),
};
await upsertTradingUserProfileToCosmos(normalized);
return normalized;
}
} catch (error) {
logger.warn(`[Profiles] Legacy user profile read failed: ${error instanceof Error ? error.message : 'unknown error'}`);
}
}
return {
user_id: userId,
first_name: String(fallback.first_name || ''),
last_name: String(fallback.last_name || ''),
email: String(fallback.email || ''),
role: String(fallback.role || 'member'),
trade_enable: Boolean(fallback.trade_enable ?? true),
ALPACA_API_KEY: fallback.ALPACA_API_KEY,
ALPACA_SECRET_KEY: fallback.ALPACA_SECRET_KEY,
REAL_ALPACA_API_KEY: fallback.REAL_ALPACA_API_KEY,
REAL_ALPACA_SECRET_KEY: fallback.REAL_ALPACA_SECRET_KEY,
drop_threshold_for_buy: Number(fallback.drop_threshold_for_buy ?? 0),
gain_threshold_for_sell: Number(fallback.gain_threshold_for_sell ?? 0),
market_poll_interval_in_seconds: Number(fallback.market_poll_interval_in_seconds ?? 0),
};
}
export async function saveCurrentUserProfile(
userId: string,
input: Partial<TradingUserProfile>,
fallback: Partial<TradingUserProfile> = {},
legacyService?: LegacySupabaseService
): Promise<TradingUserProfile> {
const existing = await getCurrentUserProfile(userId, fallback, legacyService);
const merged: TradingUserProfile = {
...existing,
...input,
user_id: userId,
email: String(input.email ?? existing.email ?? fallback.email ?? ''),
role: String(input.role ?? existing.role ?? fallback.role ?? 'member'),
first_name: String(input.first_name ?? existing.first_name ?? fallback.first_name ?? ''),
last_name: String(input.last_name ?? existing.last_name ?? fallback.last_name ?? ''),
trade_enable: Boolean(input.trade_enable ?? existing.trade_enable ?? fallback.trade_enable ?? true),
drop_threshold_for_buy: Number(input.drop_threshold_for_buy ?? existing.drop_threshold_for_buy ?? fallback.drop_threshold_for_buy ?? 0),
gain_threshold_for_sell: Number(input.gain_threshold_for_sell ?? existing.gain_threshold_for_sell ?? fallback.gain_threshold_for_sell ?? 0),
market_poll_interval_in_seconds: Number(input.market_poll_interval_in_seconds ?? existing.market_poll_interval_in_seconds ?? fallback.market_poll_interval_in_seconds ?? 0),
};
try {
await upsertTradingUserProfileToCosmos(merged);
} catch (error) {
logger.warn(`[Profiles] Cosmos user profile save failed: ${error instanceof Error ? error.message : 'unknown error'}`);
}
const client = legacyService?.getClient?.();
if (client) {
try {
const { error } = await client
.from('users')
.upsert({
user_id: userId,
first_name: merged.first_name,
last_name: merged.last_name,
email: merged.email,
role: merged.role,
trade_enable: merged.trade_enable,
ALPACA_API_KEY: merged.ALPACA_API_KEY ?? null,
ALPACA_SECRET_KEY: merged.ALPACA_SECRET_KEY ?? null,
REAL_ALPACA_API_KEY: merged.REAL_ALPACA_API_KEY ?? null,
REAL_ALPACA_SECRET_KEY: merged.REAL_ALPACA_SECRET_KEY ?? null,
drop_threshold_for_buy: merged.drop_threshold_for_buy,
gain_threshold_for_sell: merged.gain_threshold_for_sell,
market_poll_interval_in_seconds: merged.market_poll_interval_in_seconds,
}, { onConflict: 'user_id' });
if (error) {
throw new Error(error.message);
}
} catch (error) {
logger.warn(`[Profiles] Legacy user profile save failed: ${error instanceof Error ? error.message : 'unknown error'}`);
}
}
return merged;
}