import logger from '../utils/logger.js';
import { config } from '../config/index.js';
import { capitalLedger } from './CapitalLedger.js';
import { OperationalEvent, OperationalEventType, OperationalEventSeverity } from '../domain/operationalEvents.js';
import { metrics } from './MetricsService.js';
import crypto from 'crypto';

/**
 * Point-in-time snapshot of loop timings, reconciliation results and counters,
 * as returned by {@link ObservabilityService.getSummary}.
 */
export interface ObservabilitySummary {
  tradingLoop: {
    durationMs: number;
    lastRunAt: number | null;
    healthy: boolean;
  };
  monitorLoop: {
    durationMs: number;
    lastRunAt: number | null;
  };
  reconciliation: {
    durationMs: number;
    lastRunAt: number | null;
    mismatchCount: number;
    missingFromExchange: number;
    missingInDb: number;
    healthy: boolean;
  };
  lockContention: {
    entry: number;
    reconciliation: number;
  };
  capitalInvariantViolations: number;
}

/**
 * Reads a numeric setting with the same `value || fallback` semantics used
 * throughout this file (so `0`, `''`, `null` and `undefined` all select the
 * fallback), and additionally returns the fallback when the value does not
 * parse to a finite number. Without that second guard a malformed setting
 * becomes `NaN`, and every comparison against it is silently `false`.
 */
function numericSetting(value: unknown, fallback: number): number {
  const parsed = Number(value || fallback);
  return Number.isFinite(parsed) ? parsed : fallback;
}

/** Renders a caught value as a string suitable for log metadata. */
function describeError(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/**
 * Collects loop timings, reconciliation results, lock-contention counters and
 * capital-invariant checks; forwards them to the shared `metrics` registry and
 * keeps a bounded, newest-first in-memory buffer of operational events.
 */
export class ObservabilityService {
  private lastTradingLoopAt: number | null = null;
  private lastTradingLoopDuration = 0;
  private lastMonitorLoopAt: number | null = null;
  private lastMonitorLoopDuration = 0;
  private lastReconciliationAt: number | null = null;
  private lastReconciliationDuration = 0;
  private lastReconciliationMismatch = 0;
  private lastReconciliationMissingFromExchange = 0;
  private lastReconciliationMissingInDb = 0;
  private reconciliationDegradedStreak = 0;
  private reconciliationLastSloAlertAt = 0;
  private entryLockContentionsCount = 0;
  private reconciliationLockContentionsCount = 0;
  private capitalInvariantViolationsCount = 0;
  /** Profile id -> epoch ms of the last capital-invariant alert emitted for it. */
  private capitalInvariantLastAlertAt = new Map<string, number>();
  private profileProvider: (() => string[]) | null = null;
  private capitalWatchdog?: NodeJS.Timeout;
  private watchdogRunning = false;
  /** True while a watchdog sweep is awaiting ledgers; used to skip overlapping ticks. */
  private capitalCheckInFlight = false;
  /** Newest event first (see `emitEvent`). */
  private readonly eventBuffer: OperationalEvent[] = [];
  /** Buffer capacity, clamped to the range [50, 10000]; defaults to 2000. */
  private readonly MAX_EVENTS = Math.max(
    50,
    Math.min(10_000, numericSetting(config.OPERATIONAL_EVENTS_MAX_BUFFER, 2000))
  );
  private onEventCallback: ((event: OperationalEvent) => void) | null = null;

  constructor() { }

  /**
   * Records one completed trading-loop iteration.
   * @param durationMs Wall-clock duration of the iteration in milliseconds.
   */
  recordTradingLoop(durationMs: number): void {
    metrics.subsystemDurationSeconds.labels('trading').observe(durationMs / 1000);
    metrics.subsystemLastRunTimestamp.labels('trading').set(Date.now() / 1000);
    metrics.subsystemAlive.labels('trading').set(1);
    this.lastTradingLoopDuration = durationMs;
    this.lastTradingLoopAt = Date.now();
  }

  /**
   * Records one completed monitor-loop iteration.
   * @param durationMs Wall-clock duration of the iteration in milliseconds.
   */
  recordMonitorLoop(durationMs: number): void {
    metrics.subsystemDurationSeconds.labels('monitor').observe(durationMs / 1000);
    metrics.subsystemLastRunTimestamp.labels('monitor').set(Date.now() / 1000);
    metrics.subsystemAlive.labels('monitor').set(1);
    this.lastMonitorLoopDuration = durationMs;
    this.lastMonitorLoopAt = Date.now();
  }

  /**
   * Records one reconciliation pass and evaluates the reconciliation SLO.
   *
   * A pass is "degraded" when any of the three counts reaches its configured
   * threshold. A `RECONCILIATION_DEGRADED` event and a warning log are emitted
   * once the degraded streak reaches the configured length, subject to the
   * alert throttle.
   *
   * @param durationMs Duration of the pass in milliseconds.
   * @param mismatchCount Number of mismatches found in this pass.
   * @param missingFromExchange Items reported as missing from the exchange.
   * @param missingInDb Items reported as missing in the database.
   */
  recordReconciliationLoop(
    durationMs: number,
    mismatchCount: number,
    missingFromExchange: number,
    missingInDb: number
  ): void {
    metrics.subsystemDurationSeconds.labels('reconciliation').observe(durationMs / 1000);
    metrics.subsystemLastRunTimestamp.labels('reconciliation').set(Date.now() / 1000);
    metrics.subsystemAlive.labels('reconciliation').set(1);

    if (mismatchCount > 0) {
      metrics.reconciliationMismatchesTotal.inc(mismatchCount);
    }
    metrics.reconciliationMissingItemsCount.labels('exchange').set(missingFromExchange);
    metrics.reconciliationMissingItemsCount.labels('db').set(missingInDb);

    this.lastReconciliationDuration = durationMs;
    this.lastReconciliationAt = Date.now();
    this.lastReconciliationMismatch = mismatchCount;
    this.lastReconciliationMissingFromExchange = missingFromExchange;
    this.lastReconciliationMissingInDb = missingInDb;

    const mismatchThreshold = Math.max(0, numericSetting(config.RECONCILIATION_SLO_MISMATCH_THRESHOLD, 1));
    const missingExchangeThreshold = Math.max(0, numericSetting(config.RECONCILIATION_SLO_MISSING_EXCHANGE_THRESHOLD, 1));
    const missingDbThreshold = Math.max(0, numericSetting(config.RECONCILIATION_SLO_MISSING_DB_THRESHOLD, 1));

    const degraded =
      mismatchCount >= mismatchThreshold ||
      missingFromExchange >= missingExchangeThreshold ||
      missingInDb >= missingDbThreshold;

    if (degraded) {
      this.reconciliationDegradedStreak += 1;
    } else {
      this.reconciliationDegradedStreak = 0;
    }

    const alertStreak = Math.max(1, numericSetting(config.RECONCILIATION_SLO_ALERT_STREAK, 2));
    const throttleMs = Math.max(0, numericSetting(config.RECONCILIATION_SLO_ALERT_THROTTLE_MS, 600_000));
    const now = Date.now();
    const canAlert = throttleMs <= 0 || (now - this.reconciliationLastSloAlertAt) >= throttleMs;

    if (degraded && this.reconciliationDegradedStreak >= alertStreak && canAlert) {
      this.reconciliationLastSloAlertAt = now;
      const message = `Reconciliation degraded (${this.reconciliationDegradedStreak} cycles): mismatches=${mismatchCount}, missing_from_exchange=${missingFromExchange}, missing_in_db=${missingInDb}`;
      this.emitEvent({ type: 'RECONCILIATION_DEGRADED', severity: 'WARN', message });
      logger.warn(message, {
        event: 'reconciliation_slo_degraded',
        mismatchCount,
        missingFromExchange,
        missingInDb,
        degradedStreak: this.reconciliationDegradedStreak
      });
    }
  }

  /** Counts one contention on the entry lock. */
  incrementEntryLockContention(): void {
    this.entryLockContentionsCount += 1;
  }

  /** Counts one contention on the reconciliation lock. */
  incrementReconciliationLockContention(): void {
    this.reconciliationLockContentionsCount += 1;
  }

  /**
   * Observes the latency of one exchange API call, labelled with the
   * configured provider and the given operation name.
   */
  observeExchangeLatency(operation: string, durationMs: number): void {
    metrics.exchangeApiLatencySeconds.labels(config.PROVIDER, operation).observe(durationMs / 1000);
  }

  /** Currently a no-op placeholder; the feature name is not recorded. */
  incrementUnsupportedFeature(feature: string): void {
    // We could add a counter for this too if needed, but not in current scope
  }

  /** Registers the function the capital watchdog calls to obtain profile ids. */
  registerProfileProvider(provider: () => string[]): void {
    this.profileProvider = provider;
  }

  /**
   * Starts the periodic capital-invariant check. Calling it again while the
   * watchdog is running is a no-op. The timer is unref'd, so it does not keep
   * the process alive on its own.
   *
   * @param intervalMs Period between checks; defaults to
   *   `config.CAPITAL_WATCHDOG_INTERVAL_MS`.
   */
  startCapitalWatchdog(intervalMs: number = config.CAPITAL_WATCHDOG_INTERVAL_MS): void {
    if (this.watchdogRunning) return;
    this.watchdogRunning = true;
    this.capitalWatchdog = setInterval(() => {
      // runCapitalCheck never rejects, so discarding the promise is safe.
      void this.runCapitalCheck();
    }, intervalMs);
    this.capitalWatchdog.unref();
  }

  /** Stops the capital watchdog if it is running; safe to call repeatedly. */
  stopCapitalWatchdog(): void {
    if (this.capitalWatchdog) {
      clearInterval(this.capitalWatchdog);
      this.capitalWatchdog = undefined;
    }
    this.watchdogRunning = false;
  }

  /**
   * Timer entry point: runs one sweep, skipping the tick if the previous sweep
   * is still awaiting ledgers, and converts any failure into a log entry so a
   * rejected promise never escapes the timer callback.
   */
  private async runCapitalCheck(): Promise<void> {
    if (this.capitalCheckInFlight) return;
    this.capitalCheckInFlight = true;
    try {
      await this.checkCapitalInvariant();
    } catch (error: unknown) {
      logger.error('Capital watchdog check failed', {
        event: 'capital_watchdog_error',
        error: describeError(error)
      });
    } finally {
      this.capitalCheckInFlight = false;
    }
  }

  /**
   * Checks every profile returned by the registered provider. Profiles are
   * processed sequentially; a failure on one profile is logged and does not
   * prevent the remaining profiles from being checked.
   */
  private async checkCapitalInvariant(): Promise<void> {
    if (!this.profileProvider) return;

    const profileIds = this.profileProvider().filter(Boolean);
    const limits = {
      epsilonUsd: Math.max(0, numericSetting(config.CAPITAL_INVARIANT_EPSILON_USD, 0)),
      epsilonPct: Math.max(0, numericSetting(config.CAPITAL_INVARIANT_EPSILON_PCT, 0)),
      throttleMs: Math.max(0, numericSetting(config.CAPITAL_INVARIANT_ALERT_THROTTLE_MS, 0))
    };

    for (const profileId of profileIds) {
      try {
        await this.checkProfileCapital(profileId, limits);
      } catch (error: unknown) {
        logger.error('Capital invariant check failed for profile', {
          event: 'capital_invariant_check_error',
          profileId,
          error: describeError(error)
        });
      }
    }
  }

  /**
   * Publishes the utilization gauge for one profile and, when its available
   * capital is below `-epsilon`, counts a violation and emits a throttled
   * alert (event + error log).
   */
  private async checkProfileCapital(
    profileId: string,
    limits: { epsilonUsd: number; epsilonPct: number; throttleMs: number }
  ): Promise<void> {
    const ledger = await capitalLedger.getLedger(profileId);
    if (!ledger) return;

    const available = capitalLedger.availableCapital(ledger);
    const allocated = Number(ledger.allocated_capital || 0);
    // Coerced the same way as `allocated` so all ledger amounts are read consistently.
    const reserved = Number(ledger.reserved_for_orders || 0) + Number(ledger.reserved_for_positions || 0);
    // Avoid dividing by zero (or a negative allocation) when computing utilization.
    const denominator = allocated > 0 ? allocated : 1;
    const utilization = reserved / denominator * 100;
    // epsilonPct is applied as a direct multiplier of the allocation.
    // NOTE(review): that treats it as a fraction despite the PCT name - confirm intended units.
    const epsilon = Math.max(limits.epsilonUsd, Math.abs(allocated) * limits.epsilonPct);

    metrics.profileUtilizationPercent.labels(profileId).set(utilization);

    if (available >= -epsilon) return;

    // Violations are always counted; only the alert below is throttled.
    metrics.capitalInvariantViolationsTotal.labels(profileId).inc();
    this.capitalInvariantViolationsCount += 1;

    const now = Date.now();
    const lastAlertAt = this.capitalInvariantLastAlertAt.get(profileId) || 0;
    const shouldEmit = limits.throttleMs <= 0 || (now - lastAlertAt) >= limits.throttleMs;
    if (!shouldEmit) return;

    this.capitalInvariantLastAlertAt.set(profileId, now);
    this.emitEvent({
      type: 'INSUFFICIENT_BUYING_POWER',
      severity: 'ERROR',
      message: `Capital invariant violation: available capital is negative ($${available.toFixed(2)})`,
      profileId
    });
    logger.error('Capital invariant violation detected', {
      event: 'capital_invariant_violation',
      profileId,
      allocated: ledger.allocated_capital,
      reservedOrders: ledger.reserved_for_orders,
      reservedPositions: ledger.reserved_for_positions,
      realizedPnl: ledger.realized_pnl,
      available
    });
  }

  /**
   * Stamps the payload with a fresh UUID and the current time, increments the
   * operational-events counter, stores the event at the front of the bounded
   * buffer and notifies the subscriber, if any.
   *
   * A subscriber that throws is logged and otherwise ignored, so recording an
   * event never fails the caller.
   */
  emitEvent(payload: {
    type: OperationalEventType;
    severity: OperationalEventSeverity;
    message: string;
    profileId?: string;
    userId?: string;
    symbol?: string;
  }): void {
    const event: OperationalEvent = {
      id: crypto.randomUUID(),
      timestamp: Date.now(),
      ...payload
    };

    // Push to Prometheus
    metrics.operationalEventsTotal.labels(
      event.severity,
      event.type,
      event.profileId || 'global',
      event.symbol || 'NA'
    ).inc();

    this.eventBuffer.unshift(event);
    if (this.eventBuffer.length > this.MAX_EVENTS) {
      // Newest is at the front, so the oldest event is dropped from the back.
      this.eventBuffer.pop();
    }

    if (this.onEventCallback) {
      try {
        this.onEventCallback(event);
      } catch (error: unknown) {
        logger.error('Operational event subscriber threw', {
          event: 'operational_event_subscriber_error',
          eventType: event.type,
          error: describeError(error)
        });
      }
    }
  }

  /** Returns a shallow copy of the buffered events, newest first. */
  getEvents(): OperationalEvent[] {
    return [...this.eventBuffer];
  }

  /** Empties the event buffer. */
  clearEvents(): void {
    this.eventBuffer.length = 0;
  }

  /** Sets the single event subscriber, replacing any previously registered one. */
  subscribe(callback: (event: OperationalEvent) => void): void {
    this.onEventCallback = callback;
  }

  /**
   * Refreshes the per-subsystem aliveness gauges from the last recorded run
   * times and returns whatever `metrics.getMetrics()` produces.
   */
  async metrics() {
    // Update aliveness gauges before scraping
    const tradingAlive = this.isFresh(this.lastTradingLoopAt, config.POLLING_INTERVAL);
    const monitorAlive = this.isFresh(this.lastMonitorLoopAt, config.MONITOR_INTERVAL_MS);
    const reconAlive = this.isFresh(this.lastReconciliationAt, config.MONITOR_INTERVAL_MS);

    metrics.subsystemAlive.labels('trading').set(tradingAlive ? 1 : 0);
    metrics.subsystemAlive.labels('monitor').set(monitorAlive ? 1 : 0);
    metrics.subsystemAlive.labels('reconciliation').set(reconAlive ? 1 : 0);

    return metrics.getMetrics();
  }

  /** Returns the content type reported by the metrics registry. */
  contentType() {
    return metrics.getContentType();
  }

  /** Builds a snapshot of the latest loop timings, reconciliation results and counters. */
  getSummary(): ObservabilitySummary {
    return {
      tradingLoop: {
        durationMs: this.lastTradingLoopDuration,
        lastRunAt: this.lastTradingLoopAt,
        healthy: this.isFresh(this.lastTradingLoopAt, config.POLLING_INTERVAL)
      },
      monitorLoop: {
        durationMs: this.lastMonitorLoopDuration,
        lastRunAt: this.lastMonitorLoopAt
      },
      reconciliation: {
        durationMs: this.lastReconciliationDuration,
        lastRunAt: this.lastReconciliationAt,
        mismatchCount: this.lastReconciliationMismatch,
        missingFromExchange: this.lastReconciliationMissingFromExchange,
        missingInDb: this.lastReconciliationMissingInDb,
        healthy: this.isFresh(this.lastReconciliationAt, config.MONITOR_INTERVAL_MS)
      },
      lockContention: {
        entry: this.entryLockContentionsCount,
        reconciliation: this.reconciliationLockContentionsCount
      },
      capitalInvariantViolations: this.capitalInvariantViolationsCount
    };
  }

  /** Returns the freshness flags and the current reconciliation degraded streak. */
  sloFlags() {
    return {
      tradingLoopHealthy: this.isFresh(this.lastTradingLoopAt, config.POLLING_INTERVAL),
      reconciliationHealthy: this.isFresh(this.lastReconciliationAt, config.MONITOR_INTERVAL_MS),
      reconciliationDegradedStreak: this.reconciliationDegradedStreak
    };
  }

  /**
   * A subsystem is fresh when it has run at least once and its last run is no
   * older than twice its interval, with a floor of 120 seconds.
   */
  private isFresh(lastRun: number | null, intervalMs: number): boolean {
    if (!lastRun) return false;
    return (Date.now() - lastRun) <= Math.max(intervalMs * 2, 120_000);
  }
}

/** Shared process-wide instance. */
export const observabilityService = new ObservabilityService();