refactor: seed cosmos stores and standardize request ids

This commit is contained in:
Saravana Achu Mac 2026-04-04 17:16:18 -07:00
parent 790213513f
commit 77c7b32ac0
18 changed files with 184 additions and 41 deletions

View File

@ -2,6 +2,7 @@ import express, { NextFunction, Request, Response } from 'express';
import { createServer } from 'http';
import { Server, Socket } from 'socket.io';
import cors from 'cors';
import { randomUUID } from 'node:crypto';
import logger from '../utils/logger.js';
import fs from 'fs';
import path from 'path';
@ -54,6 +55,7 @@ interface AuthenticatedRequest extends Request {
authEmail?: string;
authDisplayName?: string;
authPlan?: string;
requestId?: string;
}
interface RateLimitBucket {
@ -1216,6 +1218,13 @@ export class ApiServer {
},
credentials: true
}));
this.app.use((req: AuthenticatedRequest, res: Response, next: NextFunction) => {
const inbound = String(req.headers['x-request-id'] || '').trim();
const requestId = inbound || `backend-${randomUUID()}`;
req.requestId = requestId;
res.setHeader('x-request-id', requestId);
next();
});
this.app.use(express.json());
}

View File

@ -68,18 +68,7 @@ async function writeToCosmos(record: CapitalLedgerRecord): Promise<CapitalLedger
return toLedgerRecord(resource as unknown as CapitalLedgerDocument);
}
export async function getCapitalLedger(profileId: string, legacyService?: LegacySupabaseService): Promise<CapitalLedgerRecord | null> {
if (!profileId) return null;
if (isCosmosConfigured()) {
try {
const cosmosRecord = await readFromCosmos(profileId);
if (cosmosRecord) return cosmosRecord;
} catch (error) {
logger.warn(`[CapitalLedgerRepo] Cosmos read failed, falling back to legacy store: ${error instanceof Error ? error.message : 'unknown error'}`);
}
}
async function readFromLegacy(profileId: string, legacyService?: LegacySupabaseService): Promise<CapitalLedgerRecord | null> {
const client = legacyService?.getClient?.();
if (!client) return null;
@ -97,6 +86,30 @@ export async function getCapitalLedger(profileId: string, legacyService?: Legacy
return toLedgerRecord(data as CapitalLedgerDocument);
}
export async function getCapitalLedger(profileId: string, legacyService?: LegacySupabaseService): Promise<CapitalLedgerRecord | null> {
if (!profileId) return null;
if (!isCosmosConfigured()) {
return readFromLegacy(profileId, legacyService);
}
if (isCosmosConfigured()) {
try {
const cosmosRecord = await readFromCosmos(profileId);
if (cosmosRecord) return cosmosRecord;
const legacyRecord = await readFromLegacy(profileId, legacyService);
if (!legacyRecord) return null;
await writeToCosmos(legacyRecord);
logger.info(`[CapitalLedgerRepo] Seeded capital ledger ${profileId} from legacy store into Cosmos.`);
return legacyRecord;
} catch (error) {
logger.warn(`[CapitalLedgerRepo] Cosmos read/seed failed for ${profileId}: ${error instanceof Error ? error.message : 'unknown error'}`);
return null;
}
}
return null;
}
export async function upsertCapitalLedger(
record: CapitalLedgerRecord,
legacyService?: LegacySupabaseService

View File

@ -76,6 +76,28 @@ async function listFromSupabase(legacyService?: LegacySupabaseService): Promise<
}
}
async function seedCosmosFromLegacy(legacyService?: LegacySupabaseService): Promise<DynamicConfigEntry[]> {
const legacyEntries = await listFromSupabase(legacyService);
if (!legacyEntries.length) {
return [];
}
const now = new Date().toISOString();
const container = getContainer(CONTAINER_NAME);
await Promise.all(legacyEntries.map((entry) => container.items.upsert<DynamicConfigDocument>({
id: entry.key,
productId: config.PRODUCT_ID,
key: entry.key,
value: entry.value,
description: entry.description || '',
updated_at: entry.updated_at || now,
updatedAt: entry.updated_at || now,
})));
logger.info(`[DynamicConfig] Seeded ${legacyEntries.length} config entries from legacy store into Cosmos.`);
return legacyEntries;
}
async function mirrorToSupabase(entries: DynamicConfigEntry[], legacyService?: LegacySupabaseService): Promise<void> {
const client = legacyService?.getClient?.();
if (!client || entries.length === 0) return;
@ -99,16 +121,20 @@ async function mirrorToSupabase(entries: DynamicConfigEntry[], legacyService?: L
}
export async function listDynamicConfigEntries(legacyService?: LegacySupabaseService): Promise<DynamicConfigEntry[]> {
if (!isCosmosConfigured()) {
return listFromSupabase(legacyService);
}
try {
const cosmosEntries = await listFromCosmos();
if (cosmosEntries.length > 0) {
return cosmosEntries;
}
return await seedCosmosFromLegacy(legacyService);
} catch (error) {
logger.warn(`[DynamicConfig] Cosmos read failed, falling back to legacy store: ${error instanceof Error ? error.message : 'unknown error'}`);
logger.warn(`[DynamicConfig] Cosmos read/seed failed: ${error instanceof Error ? error.message : 'unknown error'}`);
return [];
}
return listFromSupabase(legacyService);
}
export async function upsertDynamicConfigEntries(

View File

@ -178,6 +178,22 @@ async function listProfilesFromSupabase(userId: string, legacyService?: LegacySu
}
}
async function seedProfilesToCosmos(profiles: TradeProfileRecord[]): Promise<TradeProfileRecord[]> {
if (!isCosmosConfigured() || profiles.length === 0) {
return profiles;
}
const container = getContainer(PROFILE_CONTAINER);
await Promise.all(profiles.map((profile) => container.items.upsert<TradeProfileDocument>({
...profile,
productId: config.PRODUCT_ID,
type: 'trade_profile',
createdAt: profile.created_at || new Date().toISOString(),
updatedAt: profile.updated_at || new Date().toISOString(),
})));
return profiles;
}
async function listAllProfilesFromSupabase(legacyService?: LegacySupabaseService): Promise<TradeProfileRecord[]> {
const client = legacyService?.getClient?.();
if (!client) {
@ -295,29 +311,45 @@ async function deleteProfileFromSupabase(profileId: string, userId: string, lega
}
export async function listTradeProfilesForUser(userId: string, legacyService?: LegacySupabaseService): Promise<TradeProfileRecord[]> {
if (!isCosmosConfigured()) {
return listProfilesFromSupabase(userId, legacyService);
}
try {
const cosmosProfiles = await listProfilesFromCosmos(userId);
if (cosmosProfiles.length > 0) {
return cosmosProfiles;
}
const seededProfiles = await seedProfilesToCosmos(await listProfilesFromSupabase(userId, legacyService));
if (seededProfiles.length > 0) {
logger.info(`[Profiles] Seeded ${seededProfiles.length} user profiles from legacy store into Cosmos for user ${userId}.`);
}
return seededProfiles;
} catch (error) {
logger.warn(`[Profiles] Cosmos profile read failed, falling back to legacy store: ${error instanceof Error ? error.message : 'unknown error'}`);
logger.warn(`[Profiles] Cosmos profile read/seed failed for user ${userId}: ${error instanceof Error ? error.message : 'unknown error'}`);
return [];
}
return listProfilesFromSupabase(userId, legacyService);
}
export async function listAllTradeProfiles(legacyService?: LegacySupabaseService): Promise<TradeProfileRecord[]> {
if (!isCosmosConfigured()) {
return listAllProfilesFromSupabase(legacyService);
}
try {
const cosmosProfiles = await listAllProfilesFromCosmos();
if (cosmosProfiles.length > 0) {
return cosmosProfiles;
}
const seededProfiles = await seedProfilesToCosmos(await listAllProfilesFromSupabase(legacyService));
if (seededProfiles.length > 0) {
logger.info(`[Profiles] Seeded ${seededProfiles.length} profiles from legacy store into Cosmos.`);
}
return seededProfiles;
} catch (error) {
logger.warn(`[Profiles] Cosmos global profile read failed, falling back to legacy store: ${error instanceof Error ? error.message : 'unknown error'}`);
logger.warn(`[Profiles] Cosmos global profile read/seed failed: ${error instanceof Error ? error.message : 'unknown error'}`);
return [];
}
return listAllProfilesFromSupabase(legacyService);
}
export async function listActiveTradeProfiles(legacyService?: LegacySupabaseService): Promise<TradeProfileRecord[]> {
@ -331,16 +363,26 @@ export async function getTradeProfileById(profileId: string, legacyService?: Leg
return null;
}
if (!isCosmosConfigured()) {
return getProfileFromSupabase(normalizedId, legacyService);
}
try {
const cosmosProfile = await getProfileFromCosmos(normalizedId);
if (cosmosProfile) {
return cosmosProfile;
}
const legacyProfile = await getProfileFromSupabase(normalizedId, legacyService);
if (!legacyProfile) {
return null;
}
await seedProfilesToCosmos([legacyProfile]);
logger.info(`[Profiles] Seeded profile ${normalizedId} from legacy store into Cosmos.`);
return legacyProfile;
} catch (error) {
logger.warn(`[Profiles] Cosmos profile lookup failed, falling back to legacy store: ${error instanceof Error ? error.message : 'unknown error'}`);
logger.warn(`[Profiles] Cosmos profile lookup/seed failed for ${normalizedId}: ${error instanceof Error ? error.message : 'unknown error'}`);
return null;
}
return getProfileFromSupabase(normalizedId, legacyService);
}
export async function getTradeProfileCapital(profileId: string, legacyService?: LegacySupabaseService): Promise<TradeProfileCapitalSummary | null> {

View File

@ -49,6 +49,12 @@ export async function loadLatestBotStateSnapshot(
): Promise<{ state: unknown } | null> {
if (!ownerId) return null;
const loadLegacySnapshot = async () => legacyService?.loadLatestBotStateSnapshot(ownerId) ?? null;
if (!isCosmosConfigured()) {
return loadLegacySnapshot();
}
if (isCosmosConfigured()) {
try {
const container = getContainer(CONTAINER_NAME);
@ -58,15 +64,28 @@ export async function loadLatestBotStateSnapshot(
if (resource?.state !== undefined) {
return { state: resource.state };
}
const legacySnapshot = await loadLegacySnapshot();
if (!legacySnapshot) {
return null;
}
await container.items.upsert<BotStateSnapshotDocument>({
id: buildSnapshotId(ownerId),
productId: config.PRODUCT_ID,
ownerId,
state: legacySnapshot.state,
updatedAt: new Date().toISOString()
});
logger.info(`[Snapshots] Seeded snapshot for owner ${ownerId} from legacy store into Cosmos.`);
return legacySnapshot;
} catch (error) {
const code = (error as { code?: number })?.code;
if (code !== 404) {
logger.warn(`[Snapshots] Cosmos load failed, falling back to legacy store: ${error instanceof Error ? error.message : 'unknown error'}`);
logger.warn(`[Snapshots] Cosmos load/seed failed for ${ownerId}: ${error instanceof Error ? error.message : 'unknown error'}`);
}
return null;
}
}
return legacyService?.loadLatestBotStateSnapshot(ownerId) ?? null;
return null;
}
export async function saveBotStateSnapshot(

View File

@ -103,6 +103,12 @@ pnpm lint
- web lint
- mobile lint
## Request Tracing
- the main web and mobile API paths now attach `x-request-id`
- backend HTTP responses echo `x-request-id` so browser/app logs can be correlated with backend logs
- during incident review, treat `x-request-id` as the primary request correlation key across client and backend traces
## Staged Cutover
### Order
@ -116,7 +122,7 @@ pnpm lint
### Backend cutover
- deploy backend with platform JWT support and Cosmos-backed trading controls enabled
- keep legacy Supabase fallback enabled during first production bake
- allow legacy Supabase reads only for controlled migration seeding where a Cosmos-native repository is not complete yet
- confirm runtime control reads/writes work through backend APIs
- confirm `dynamic_config` and trading-control containers are readable and writable
- confirm unauthorized requests are rejected and tenant-scoped reads are enforced

View File

@ -31,17 +31,18 @@ It assumes:
- [x] Web migrated into `web/` with shared runtime, shared kill-switch gate, shared telemetry bootstrap, normalized backend URL resolution, and common-platform-native session handling
- [x] Mobile migrated into `mobile/` with product identity, shared runtime bootstrap, launch-time kill-switch gate, platform-service auth, live backend polling plus websocket-backed updates, startup/error telemetry capture, secure session storage with invalidation handling, and explicit degraded/offline status surfacing
- [x] Backend now accepts common-platform JWTs with legacy Supabase fallback and persists global trading-control state through Cosmos-backed control storage
- [x] Dynamic config now flows through backend control-plane APIs with Cosmos-first storage and legacy Supabase fallback
- [x] Backend snapshots now use a Cosmos-first repository with legacy fallback
- [x] Dynamic config now flows through backend control-plane APIs with Cosmos-first storage and one-time legacy seeding during migration
- [x] Backend snapshots now use a Cosmos-first repository with one-time legacy seeding during migration
- [x] Distributed entry and reconciliation locks now use a Cosmos-first repository with legacy fallback
- [x] Capital ledger persistence now uses a Cosmos-first repository with legacy fallback
- [x] Capital ledger persistence now uses a Cosmos-first repository with one-time legacy seeding during migration
- [x] Mobile platform auth requests now use the common React Native platform SDK
- [x] Backend risk and PnL aggregate reads now flow through repository abstractions instead of direct legacy service calls
- [x] Web history, profile, marketplace, config, and manual-entry flows now run through backend APIs instead of browser-side table access
- [x] Release smoke coverage now exists for web auth and product accessibility flows, with a tracked mobile release smoke checklist in operations
- [x] Request ID propagation is now standardized across the main web/mobile API paths and echoed by backend HTTP responses
- [x] Root verification and lint flows now run successfully without sandbox-hostile script harness behavior
- [-] DRY cleanup completed for runtime/config/bootstrap concerns, shared websocket auth helpers, and platform-session handling, but not yet for all persistence and flag/correlation concerns
- [!] Full common-platform data-plane replacement remains a follow-up where legacy Supabase fallback still exists beneath backend repositories for selected trading records during migration
- [-] DRY cleanup completed for runtime/config/bootstrap concerns, shared websocket auth helpers, platform-session handling, and request tracing, but not yet for all persistence and feature-flag concerns
- [!] Full common-platform data-plane replacement remains a follow-up where selected trading-record repositories still depend on legacy Supabase storage because Cosmos-native equivalents are not finished yet
## 3. Guiding Rules
@ -176,7 +177,7 @@ Ensure all surfaces adopt one consistent platform model for auth, kill switch, t
- [x] Define kill-switch semantics across web, mobile, and backend
- [x] Define ownership split between product accessibility controls and trading-behavior controls
- [x] Define telemetry envelope fields
- [ ] Define correlation ID and request propagation strategy
- [x] Define correlation ID and request propagation strategy
- [ ] Define feature flag ownership and evaluation model
- [x] Define system-of-record ownership by concern
- [x] Define degraded-platform fallback behavior

View File

@ -1,5 +1,6 @@
import { secureSessionStorage, clearMobileSessionStorage, MOBILE_SESSION_STORAGE_KEY } from '@/lib/secureSessionStorage';
import { createMobilePlatformSdk, mobileRuntime } from '@/lib/runtime';
import { createRequestId } from '../../shared/request-id.js';
export interface PlatformAuthUser {
id: string;
@ -50,6 +51,9 @@ async function platformRequest<T>(
const sdk = createPlatformAuthSdk(options?.accessToken ?? null);
const response = await sdk.fetch(path, {
method: options?.method || 'GET',
headers: {
'x-request-id': createRequestId('mobile-auth')
},
body: options?.body ? JSON.stringify(options.body) : undefined,
});

6
shared/request-id.ts Normal file
View File

@ -0,0 +1,6 @@
export function createRequestId(prefix: string = 'req'): string {
const safePrefix = String(prefix || 'req').replace(/[^a-zA-Z0-9_-]/g, '').slice(0, 16) || 'req';
const time = Date.now().toString(36);
const random = Math.random().toString(36).slice(2, 10);
return `${safePrefix}-${time}-${random}`;
}

View File

@ -1,6 +1,7 @@
import { getPlatformAccessToken } from '../lib/authSession';
import type { BacktestRequestPayload, BacktestResult } from './types';
import { tradingRuntime } from '../lib/runtime';
import { createRequestId } from '../../../shared/request-id.js';
export const runBacktestApi = async (payload: BacktestRequestPayload): Promise<BacktestResult> => {
const accessToken = await getPlatformAccessToken();
@ -10,7 +11,8 @@ export const runBacktestApi = async (payload: BacktestRequestPayload): Promise<B
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${accessToken}`
Authorization: `Bearer ${accessToken}`,
'x-request-id': createRequestId('web-backtest')
},
body: JSON.stringify(payload)
});

View File

@ -1,6 +1,6 @@
// @vitest-environment jsdom
import { beforeEach, describe, expect, it, vi } from 'vitest';
import { render, screen, waitFor } from '@testing-library/react';
import { fireEvent, render, screen, waitFor } from '@testing-library/react';
import userEvent from '@testing-library/user-event';
import { ChatControl } from './ChatControl';
@ -91,8 +91,7 @@ describe('ChatControl DOM flow', () => {
await user.click(screen.getByRole('button', { name: /Edit Params/i }));
const capitalInput = screen.getByPlaceholderText('Capital');
await user.clear(capitalInput);
await user.type(capitalInput, '2500');
fireEvent.change(capitalInput, { target: { value: '2500' } });
await user.click(screen.getByRole('button', { name: /Apply to Dashboard/i }));

View File

@ -1,3 +1,5 @@
import { createRequestId } from '../../../shared/request-id.js';
const AUTH_STORAGE_PREFIX = 'invttrdg_web';
const ACCESS_TOKEN_KEY = `${AUTH_STORAGE_PREFIX}_access_token`;
const REFRESH_TOKEN_KEY = `${AUTH_STORAGE_PREFIX}_refresh_token`;
@ -136,6 +138,7 @@ async function platformRequest<T>(
headers: {
'Content-Type': 'application/json',
'x-product-id': runtimeModule.tradingRuntime.productId,
'x-request-id': createRequestId('web-auth'),
...(options?.accessToken ? { Authorization: `Bearer ${options.accessToken}` } : {}),
},
body: options?.body ? JSON.stringify(options.body) : undefined,

View File

@ -1,5 +1,6 @@
import { getPlatformAccessToken } from './authSession';
import { tradingRuntime } from './runtime';
import { createRequestId } from '../../../shared/request-id.js';
export interface DynamicConfigItem {
key: string;
@ -12,6 +13,7 @@ export async function fetchDynamicConfigItems(): Promise<DynamicConfigItem[]> {
const response = await fetch(`${tradingRuntime.tradingApiUrl}/api/admin/config/dynamic`, {
headers: {
Authorization: `Bearer ${accessToken}`,
'x-request-id': createRequestId('web-config'),
},
});
@ -30,6 +32,7 @@ export async function upsertDynamicConfigItems(items: DynamicConfigItem[]): Prom
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${accessToken}`,
'x-request-id': createRequestId('web-config'),
},
body: JSON.stringify({ items }),
});

View File

@ -1,5 +1,6 @@
import { getPlatformAccessToken } from './authSession';
import { tradingRuntime } from './runtime';
import { createRequestId } from '../../../shared/request-id.js';
export interface ManualEntryPayload {
stock_instance_id?: string;
@ -33,6 +34,7 @@ async function apiRequest<T>(path: string, init?: RequestInit): Promise<T> {
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${accessToken}`,
'x-request-id': createRequestId('web-entries'),
...(init?.headers || {}),
},
});

View File

@ -1,5 +1,6 @@
import { getPlatformAccessToken } from './authSession';
import { tradingRuntime } from './runtime';
import { createRequestId } from '../../../shared/request-id.js';
export interface StrategyPresetPayload {
id: string;
@ -20,7 +21,8 @@ async function apiRequest<T>(path: string, init?: RequestInit): Promise<T> {
...init,
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${getPlatformAccessToken()}`,
Authorization: `Bearer ${await getPlatformAccessToken()}`,
'x-request-id': createRequestId('web-market'),
...(init?.headers || {}),
},
});

View File

@ -1,5 +1,6 @@
import { getPlatformAccessToken } from './authSession';
import { tradingRuntime } from './runtime';
import { createRequestId } from '../../../shared/request-id.js';
export interface PositionsBootstrapResponse {
entries: any[];
@ -12,7 +13,8 @@ async function apiRequest<T>(path: string): Promise<T> {
const response = await fetch(`${tradingRuntime.tradingApiUrl}${path}`, {
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${getPlatformAccessToken()}`,
Authorization: `Bearer ${await getPlatformAccessToken()}`,
'x-request-id': createRequestId('web-positions'),
},
});

View File

@ -1,5 +1,6 @@
import { getPlatformAccessToken } from './authSession';
import { tradingRuntime } from './runtime';
import { createRequestId } from '../../../shared/request-id.js';
export interface TradeProfilePayload {
id?: string;
@ -41,6 +42,7 @@ async function apiRequest<T>(path: string, init?: RequestInit): Promise<T> {
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${accessToken}`,
'x-request-id': createRequestId('web-profile'),
...(init?.headers || {}),
},
});

View File

@ -1,5 +1,6 @@
import { getPlatformAccessToken } from './authSession';
import { tradingRuntime } from './runtime';
import { createRequestId } from '../../../shared/request-id.js';
export interface TradeHistoryApiRow {
id?: string;
@ -30,7 +31,8 @@ export async function fetchTradeHistory(options?: { scope?: 'user' | 'all'; limi
const response = await fetch(`${tradingRuntime.tradingApiUrl}/api/trade-history${params.toString() ? `?${params.toString()}` : ''}`, {
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${getPlatformAccessToken()}`,
Authorization: `Bearer ${await getPlatformAccessToken()}`,
'x-request-id': createRequestId('web-history'),
},
});