refactor: move backend snapshots to cosmos-first repository
This commit is contained in:
parent
1f2b7bdf89
commit
733874bb6d
@ -14,6 +14,11 @@ import { observabilityService } from './observabilityService.js';
|
|||||||
import { isTradingAdmin, verifyTradingAccessToken } from './platformAuthService.js';
|
import { isTradingAdmin, verifyTradingAccessToken } from './platformAuthService.js';
|
||||||
import { loadGlobalTradingControl, saveGlobalTradingControl } from './tradingControlRepository.js';
|
import { loadGlobalTradingControl, saveGlobalTradingControl } from './tradingControlRepository.js';
|
||||||
import { listDynamicConfigEntries, upsertDynamicConfigEntries } from './dynamicConfigRepository.js';
|
import { listDynamicConfigEntries, upsertDynamicConfigEntries } from './dynamicConfigRepository.js';
|
||||||
|
import {
|
||||||
|
loadLatestBotStateSnapshot as loadLatestBotStateSnapshotFromRepository,
|
||||||
|
resolveSnapshotOwnerId as resolveSnapshotOwnerIdFromRepository,
|
||||||
|
saveBotStateSnapshot as saveBotStateSnapshotFromRepository
|
||||||
|
} from './snapshotRepository.js';
|
||||||
import {
|
import {
|
||||||
deleteTradeProfileForUser,
|
deleteTradeProfileForUser,
|
||||||
ensureDefaultTradeProfileForUser,
|
ensureDefaultTradeProfileForUser,
|
||||||
@ -1102,7 +1107,7 @@ export class ApiServer {
|
|||||||
|
|
||||||
private async resolveSnapshotOwnerId(): Promise<string | null> {
|
private async resolveSnapshotOwnerId(): Promise<string | null> {
|
||||||
if (this.snapshotOwnerId) return this.snapshotOwnerId;
|
if (this.snapshotOwnerId) return this.snapshotOwnerId;
|
||||||
const owner = await supabaseService.getSnapshotOwnerId();
|
const owner = await resolveSnapshotOwnerIdFromRepository(supabaseService);
|
||||||
this.snapshotOwnerId = owner;
|
this.snapshotOwnerId = owner;
|
||||||
return owner;
|
return owner;
|
||||||
}
|
}
|
||||||
@ -1111,9 +1116,9 @@ export class ApiServer {
|
|||||||
try {
|
try {
|
||||||
const ownerId = await this.resolveSnapshotOwnerId();
|
const ownerId = await this.resolveSnapshotOwnerId();
|
||||||
if (!ownerId) {
|
if (!ownerId) {
|
||||||
logger.warn('[API] Snapshot owner not resolved; skipping Supabase restore.');
|
logger.warn('[API] Snapshot owner not resolved; skipping snapshot restore.');
|
||||||
} else {
|
} else {
|
||||||
const snapshot = await supabaseService.loadLatestBotStateSnapshot(ownerId);
|
const snapshot = await loadLatestBotStateSnapshotFromRepository(ownerId, supabaseService);
|
||||||
if (snapshot && snapshot.state) {
|
if (snapshot && snapshot.state) {
|
||||||
const restoredState = snapshot.state as Partial<BotState>;
|
const restoredState = snapshot.state as Partial<BotState>;
|
||||||
this.state = {
|
this.state = {
|
||||||
@ -1128,7 +1133,7 @@ export class ApiServer {
|
|||||||
if (this.state.health.tradingControl) {
|
if (this.state.health.tradingControl) {
|
||||||
healthTracker.recordTradingControl(this.state.health.tradingControl);
|
healthTracker.recordTradingControl(this.state.health.tradingControl);
|
||||||
}
|
}
|
||||||
logger.info(`[API] Restored runtime state from Supabase snapshot (user=${ownerId}).`);
|
logger.info(`[API] Restored runtime state from snapshot repository (user=${ownerId}).`);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1178,7 +1183,7 @@ export class ApiServer {
|
|||||||
try {
|
try {
|
||||||
const ownerId = await this.resolveSnapshotOwnerId();
|
const ownerId = await this.resolveSnapshotOwnerId();
|
||||||
if (!ownerId) return;
|
if (!ownerId) return;
|
||||||
await supabaseService.saveBotStateSnapshot(ownerId, this.getPersistableState());
|
await saveBotStateSnapshotFromRepository(ownerId, this.getPersistableState(), supabaseService);
|
||||||
this.lastSnapshotWriteAt = Date.now();
|
this.lastSnapshotWriteAt = Date.now();
|
||||||
logger.info(`[API] Persisted snapshot for ${ownerId}. Interval: ${Math.round(elapsed / 1000)}s`);
|
logger.info(`[API] Persisted snapshot for ${ownerId}. Interval: ${Math.round(elapsed / 1000)}s`);
|
||||||
} catch (error: any) {
|
} catch (error: any) {
|
||||||
|
|||||||
97
backend/src/services/snapshotRepository.ts
Normal file
97
backend/src/services/snapshotRepository.ts
Normal file
@ -0,0 +1,97 @@
|
|||||||
|
import { getContainer } from '@bytelyst/cosmos';
|
||||||
|
import { config } from '../config/index.js';
|
||||||
|
import logger from '../utils/logger.js';
|
||||||
|
import type { supabaseService } from './SupabaseService.js';
|
||||||
|
import { listActiveTradingUsers } from './userRepository.js';
|
||||||
|
|
||||||
|
type LegacySupabaseService = typeof supabaseService;
|
||||||
|
|
||||||
|
const CONTAINER_NAME = 'bot_state_snapshots';
|
||||||
|
|
||||||
|
interface BotStateSnapshotDocument {
|
||||||
|
id: string;
|
||||||
|
productId: string;
|
||||||
|
ownerId: string;
|
||||||
|
state: unknown;
|
||||||
|
updatedAt: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
function isCosmosConfigured(): boolean {
|
||||||
|
return Boolean(config.COSMOS_ENDPOINT && config.COSMOS_KEY);
|
||||||
|
}
|
||||||
|
|
||||||
|
function isUuid(value: string): boolean {
|
||||||
|
return /^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i.test(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
function buildSnapshotId(ownerId: string): string {
|
||||||
|
return `${config.PRODUCT_ID}:${ownerId}`;
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function resolveSnapshotOwnerId(legacyService?: LegacySupabaseService): Promise<string | null> {
|
||||||
|
const configured = String(config.SNAPSHOT_USER_ID || '').trim().toLowerCase();
|
||||||
|
if (isUuid(configured)) {
|
||||||
|
return configured;
|
||||||
|
}
|
||||||
|
|
||||||
|
const users = await listActiveTradingUsers(legacyService);
|
||||||
|
const firstUserId = String(users[0]?.user_id || '').trim().toLowerCase();
|
||||||
|
if (isUuid(firstUserId)) {
|
||||||
|
return firstUserId;
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function loadLatestBotStateSnapshot(
|
||||||
|
ownerId: string,
|
||||||
|
legacyService?: LegacySupabaseService
|
||||||
|
): Promise<{ state: unknown } | null> {
|
||||||
|
if (!ownerId) return null;
|
||||||
|
|
||||||
|
if (isCosmosConfigured()) {
|
||||||
|
try {
|
||||||
|
const container = getContainer(CONTAINER_NAME);
|
||||||
|
const { resource } = await container
|
||||||
|
.item(buildSnapshotId(ownerId), config.PRODUCT_ID)
|
||||||
|
.read<BotStateSnapshotDocument>();
|
||||||
|
if (resource?.state !== undefined) {
|
||||||
|
return { state: resource.state };
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
const code = (error as { code?: number })?.code;
|
||||||
|
if (code !== 404) {
|
||||||
|
logger.warn(`[Snapshots] Cosmos load failed, falling back to legacy store: ${error instanceof Error ? error.message : 'unknown error'}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return legacyService?.loadLatestBotStateSnapshot(ownerId) ?? null;
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function saveBotStateSnapshot(
|
||||||
|
ownerId: string,
|
||||||
|
state: unknown,
|
||||||
|
legacyService?: LegacySupabaseService
|
||||||
|
): Promise<void> {
|
||||||
|
if (!ownerId) return;
|
||||||
|
|
||||||
|
if (isCosmosConfigured()) {
|
||||||
|
try {
|
||||||
|
const container = getContainer(CONTAINER_NAME);
|
||||||
|
const doc: BotStateSnapshotDocument = {
|
||||||
|
id: buildSnapshotId(ownerId),
|
||||||
|
productId: config.PRODUCT_ID,
|
||||||
|
ownerId,
|
||||||
|
state,
|
||||||
|
updatedAt: new Date().toISOString()
|
||||||
|
};
|
||||||
|
await container.items.upsert(doc);
|
||||||
|
return;
|
||||||
|
} catch (error) {
|
||||||
|
logger.warn(`[Snapshots] Cosmos upsert failed, falling back to legacy store: ${error instanceof Error ? error.message : 'unknown error'}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
await legacyService?.saveBotStateSnapshot(ownerId, state);
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue
Block a user