learning_ai_invt_trdg/backend/src/services/observabilityService.ts

310 lines
12 KiB
TypeScript

import logger from '../utils/logger.js';
import { config } from '../config/index.js';
import { capitalLedger } from './CapitalLedger.js';
import { OperationalEvent, OperationalEventType, OperationalEventSeverity } from '../domain/operationalEvents.js';
import { metrics } from './MetricsService.js';
import crypto from 'crypto';
/**
 * Point-in-time snapshot of the values tracked by `ObservabilityService`,
 * as assembled by `ObservabilityService.getSummary()`.
 */
export interface ObservabilitySummary {
/** State recorded by the most recent `recordTradingLoop` call. */
tradingLoop: {
/** Duration in milliseconds reported for the latest run (0 until a run is recorded). */
durationMs: number;
/** `Date.now()` timestamp of the latest recorded run, or `null` if none was recorded. */
lastRunAt: number | null;
/** Whether the latest run is recent enough relative to `config.POLLING_INTERVAL`. */
healthy: boolean;
};
/** State recorded by the most recent `recordMonitorLoop` call. */
monitorLoop: {
/** Duration in milliseconds reported for the latest run (0 until a run is recorded). */
durationMs: number;
/** `Date.now()` timestamp of the latest recorded run, or `null` if none was recorded. */
lastRunAt: number | null;
};
/** State recorded by the most recent `recordReconciliationLoop` call. */
reconciliation: {
/** Duration in milliseconds reported for the latest run (0 until a run is recorded). */
durationMs: number;
/** `Date.now()` timestamp of the latest recorded run, or `null` if none was recorded. */
lastRunAt: number | null;
/** Mismatch count reported by the latest run. */
mismatchCount: number;
/** "Missing from exchange" count reported by the latest run. */
missingFromExchange: number;
/** "Missing in DB" count reported by the latest run. */
missingInDb: number;
/** Whether the latest run is recent enough relative to `config.MONITOR_INTERVAL_MS`. */
healthy: boolean;
};
/** Cumulative lock-contention counters since the service instance was created. */
lockContention: {
/** Number of `incrementEntryLockContention` calls. */
entry: number;
/** Number of `incrementReconciliationLockContention` calls. */
reconciliation: number;
};
/** Cumulative number of capital-invariant violations detected by the capital watchdog. */
capitalInvariantViolations: number;
}
/**
 * In-process observability hub.
 *
 * Tracks the last run of the trading, monitor and reconciliation loops,
 * reconciliation SLO degradation, lock-contention counters and capital-invariant
 * violations. Values are mirrored to the shared `metrics` registry, and
 * operational events are kept in a bounded, newest-first in-memory buffer that
 * can be forwarded to a single subscriber.
 */
export class ObservabilityService {
  private lastTradingLoopAt: number | null = null;
  private lastTradingLoopDuration = 0;
  private lastMonitorLoopAt: number | null = null;
  private lastMonitorLoopDuration = 0;
  private lastReconciliationAt: number | null = null;
  private lastReconciliationDuration = 0;
  private lastReconciliationMismatch = 0;
  private lastReconciliationMissingFromExchange = 0;
  private lastReconciliationMissingInDb = 0;
  /** Number of consecutive reconciliation runs classified as degraded. */
  private reconciliationDegradedStreak = 0;
  /** Timestamp (ms) of the last reconciliation SLO alert; 0 means "never alerted". */
  private reconciliationLastSloAlertAt = 0;
  private entryLockContentionsCount = 0;
  private reconciliationLockContentionsCount = 0;
  private capitalInvariantViolationsCount = 0;
  /** Per-profile timestamp (ms) of the last capital-invariant alert, used for throttling. */
  private capitalInvariantLastAlertAt = new Map<string, number>();
  private profileProvider: (() => string[]) | null = null;
  private capitalWatchdog?: NodeJS.Timeout;
  private watchdogRunning = false;
  /** Newest-first buffer of emitted operational events. */
  private readonly eventBuffer: OperationalEvent[] = [];
  /** Buffer capacity: the configured value (default 2000) clamped to [50, 10000]. */
  private readonly MAX_EVENTS = Math.max(50, Math.min(10_000, Number(config.OPERATIONAL_EVENTS_MAX_BUFFER || 2000)));
  private onEventCallback: ((event: OperationalEvent) => void) | null = null;

  /**
   * Records a completed trading-loop iteration.
   *
   * @param durationMs Wall-clock duration of the iteration in milliseconds.
   */
  recordTradingLoop(durationMs: number): void {
    this.lastTradingLoopAt = this.recordSubsystemRun('trading', durationMs);
    this.lastTradingLoopDuration = durationMs;
  }

  /**
   * Records a completed monitor-loop iteration.
   *
   * @param durationMs Wall-clock duration of the iteration in milliseconds.
   */
  recordMonitorLoop(durationMs: number): void {
    this.lastMonitorLoopAt = this.recordSubsystemRun('monitor', durationMs);
    this.lastMonitorLoopDuration = durationMs;
  }

  /**
   * Records a completed reconciliation run and evaluates the reconciliation SLO.
   *
   * A run is "degraded" when any count reaches its configured threshold. Once the
   * degraded streak reaches the configured length, and the alert throttle window
   * has elapsed, a `RECONCILIATION_DEGRADED` event is emitted and a warning logged.
   *
   * @param durationMs Wall-clock duration of the run in milliseconds.
   * @param mismatchCount Number of mismatches found by the run.
   * @param missingFromExchange Number of items reported missing from the exchange.
   * @param missingInDb Number of items reported missing in the database.
   */
  recordReconciliationLoop(durationMs: number, mismatchCount: number, missingFromExchange: number, missingInDb: number): void {
    const now = this.recordSubsystemRun('reconciliation', durationMs);
    if (mismatchCount > 0) {
      metrics.reconciliationMismatchesTotal.inc(mismatchCount);
    }
    metrics.reconciliationMissingItemsCount.labels('exchange').set(missingFromExchange);
    metrics.reconciliationMissingItemsCount.labels('db').set(missingInDb);

    this.lastReconciliationDuration = durationMs;
    this.lastReconciliationAt = now;
    this.lastReconciliationMismatch = mismatchCount;
    this.lastReconciliationMissingFromExchange = missingFromExchange;
    this.lastReconciliationMissingInDb = missingInDb;

    const mismatchThreshold = Math.max(0, Number(config.RECONCILIATION_SLO_MISMATCH_THRESHOLD || 1));
    const missingExchangeThreshold = Math.max(0, Number(config.RECONCILIATION_SLO_MISSING_EXCHANGE_THRESHOLD || 1));
    const missingDbThreshold = Math.max(0, Number(config.RECONCILIATION_SLO_MISSING_DB_THRESHOLD || 1));
    const degraded = mismatchCount >= mismatchThreshold
      || missingFromExchange >= missingExchangeThreshold
      || missingInDb >= missingDbThreshold;

    // A single clean run resets the streak.
    this.reconciliationDegradedStreak = degraded ? this.reconciliationDegradedStreak + 1 : 0;
    if (!degraded) return;

    const alertStreak = Math.max(1, Number(config.RECONCILIATION_SLO_ALERT_STREAK || 2));
    const throttleMs = Math.max(0, Number(config.RECONCILIATION_SLO_ALERT_THROTTLE_MS || 600_000));
    const canAlert = throttleMs <= 0 || (now - this.reconciliationLastSloAlertAt) >= throttleMs;
    if (this.reconciliationDegradedStreak < alertStreak || !canAlert) return;

    this.reconciliationLastSloAlertAt = now;
    const message = `Reconciliation degraded (${this.reconciliationDegradedStreak} cycles): mismatches=${mismatchCount}, missing_from_exchange=${missingFromExchange}, missing_in_db=${missingInDb}`;
    this.emitEvent({
      type: 'RECONCILIATION_DEGRADED',
      severity: 'WARN',
      message
    });
    logger.warn(message, {
      event: 'reconciliation_slo_degraded',
      mismatchCount,
      missingFromExchange,
      missingInDb,
      degradedStreak: this.reconciliationDegradedStreak
    });
  }

  /** Increments the cumulative entry-lock contention counter. */
  incrementEntryLockContention(): void {
    this.entryLockContentionsCount += 1;
  }

  /** Increments the cumulative reconciliation-lock contention counter. */
  incrementReconciliationLockContention(): void {
    this.reconciliationLockContentionsCount += 1;
  }

  /**
   * Observes the latency of an exchange API call, labelled with `config.PROVIDER`.
   *
   * @param operation Label identifying the exchange operation.
   * @param durationMs Call duration in milliseconds (recorded in seconds).
   */
  observeExchangeLatency(operation: string, durationMs: number): void {
    metrics.exchangeApiLatencySeconds.labels(config.PROVIDER, operation).observe(durationMs / 1000);
  }

  /**
   * Placeholder hook for unsupported-feature tracking; currently a no-op.
   *
   * @param feature Name of the unsupported feature (unused for now).
   */
  incrementUnsupportedFeature(feature: string): void {
    // We could add a counter for this too if needed, but not in current scope
  }

  /**
   * Registers the function that supplies the profile ids inspected by the
   * capital watchdog. Until one is registered, the watchdog does nothing.
   */
  registerProfileProvider(provider: () => string[]): void {
    this.profileProvider = provider;
  }

  /**
   * Starts the periodic capital-invariant check. Calls after the first are ignored.
   *
   * @param intervalMs Delay between checks; defaults to `config.CAPITAL_WATCHDOG_INTERVAL_MS`.
   */
  startCapitalWatchdog(intervalMs: number = config.CAPITAL_WATCHDOG_INTERVAL_MS): void {
    if (this.watchdogRunning) return;
    this.watchdogRunning = true;
    this.capitalWatchdog = setInterval(() => {
      // The check is async: without a handler, a ledger/provider failure would
      // surface as an unhandled promise rejection instead of a logged error.
      this.checkCapitalInvariant().catch((error: unknown) => {
        logger.error('Capital invariant check failed', {
          event: 'capital_invariant_check_error',
          error: this.describeError(error)
        });
      });
    }, intervalMs);
    // Do not let the watchdog timer alone keep the process alive.
    this.capitalWatchdog.unref();
  }

  /**
   * Appends an operational event to the buffer, counts it in metrics and
   * forwards it to the subscriber, if any.
   *
   * The event is assigned a random UUID and the current timestamp. A subscriber
   * that throws is logged and otherwise ignored, so event emission never fails
   * the caller because of a faulty consumer.
   */
  emitEvent(payload: {
    type: OperationalEventType;
    severity: OperationalEventSeverity;
    message: string;
    profileId?: string;
    userId?: string;
    symbol?: string;
  }): void {
    const event: OperationalEvent = {
      id: crypto.randomUUID(),
      timestamp: Date.now(),
      ...payload
    };

    // Push to Prometheus
    metrics.operationalEventsTotal.labels(
      event.severity,
      event.type,
      event.profileId || 'global',
      event.symbol || 'NA'
    ).inc();

    // Newest first; drop the oldest entry once the cap is exceeded.
    this.eventBuffer.unshift(event);
    if (this.eventBuffer.length > this.MAX_EVENTS) {
      this.eventBuffer.pop();
    }

    if (!this.onEventCallback) return;
    try {
      this.onEventCallback(event);
    } catch (error: unknown) {
      logger.error('Operational event subscriber failed', {
        event: 'operational_event_subscriber_error',
        eventType: event.type,
        error: this.describeError(error)
      });
    }
  }

  /** Returns a shallow copy of the buffered events, newest first. */
  getEvents(): OperationalEvent[] {
    return [...this.eventBuffer];
  }

  /** Removes all buffered events. */
  clearEvents(): void {
    this.eventBuffer.length = 0;
  }

  /** Sets the single event subscriber, replacing any previously registered one. */
  subscribe(callback: (event: OperationalEvent) => void): void {
    this.onEventCallback = callback;
  }

  /**
   * Refreshes the per-subsystem aliveness gauges from the last-run timestamps
   * and returns the output of `metrics.getMetrics()`.
   */
  async metrics() {
    // Update aliveness gauges before scraping
    const tradingAlive = this.isFresh(this.lastTradingLoopAt, config.POLLING_INTERVAL);
    const monitorAlive = this.isFresh(this.lastMonitorLoopAt, config.MONITOR_INTERVAL_MS);
    const reconAlive = this.isFresh(this.lastReconciliationAt, config.MONITOR_INTERVAL_MS);
    metrics.subsystemAlive.labels('trading').set(tradingAlive ? 1 : 0);
    metrics.subsystemAlive.labels('monitor').set(monitorAlive ? 1 : 0);
    metrics.subsystemAlive.labels('reconciliation').set(reconAlive ? 1 : 0);
    return metrics.getMetrics();
  }

  /** Returns the content type reported by the metrics registry. */
  contentType() {
    return metrics.getContentType();
  }

  /** Builds a snapshot of the currently tracked loop, lock and capital state. */
  getSummary(): ObservabilitySummary {
    return {
      tradingLoop: {
        durationMs: this.lastTradingLoopDuration,
        lastRunAt: this.lastTradingLoopAt,
        healthy: this.isFresh(this.lastTradingLoopAt, config.POLLING_INTERVAL)
      },
      monitorLoop: {
        durationMs: this.lastMonitorLoopDuration,
        lastRunAt: this.lastMonitorLoopAt
      },
      reconciliation: {
        durationMs: this.lastReconciliationDuration,
        lastRunAt: this.lastReconciliationAt,
        mismatchCount: this.lastReconciliationMismatch,
        missingFromExchange: this.lastReconciliationMissingFromExchange,
        missingInDb: this.lastReconciliationMissingInDb,
        healthy: this.isFresh(this.lastReconciliationAt, config.MONITOR_INTERVAL_MS)
      },
      lockContention: {
        entry: this.entryLockContentionsCount,
        reconciliation: this.reconciliationLockContentionsCount
      },
      capitalInvariantViolations: this.capitalInvariantViolationsCount
    };
  }

  /** Returns loop-freshness flags and the current reconciliation degraded streak. */
  sloFlags() {
    return {
      tradingLoopHealthy: this.isFresh(this.lastTradingLoopAt, config.POLLING_INTERVAL),
      reconciliationHealthy: this.isFresh(this.lastReconciliationAt, config.MONITOR_INTERVAL_MS),
      reconciliationDegradedStreak: this.reconciliationDegradedStreak
    };
  }

  /**
   * Updates the duration, last-run and aliveness metrics for one subsystem.
   *
   * @returns The timestamp (ms) used for the update, so callers can store the
   * same instant in their in-memory state.
   */
  private recordSubsystemRun(subsystem: string, durationMs: number): number {
    const now = Date.now();
    metrics.subsystemDurationSeconds.labels(subsystem).observe(durationMs / 1000);
    metrics.subsystemLastRunTimestamp.labels(subsystem).set(now / 1000);
    metrics.subsystemAlive.labels(subsystem).set(1);
    return now;
  }

  /**
   * Checks every profile from the registered provider: publishes its capital
   * utilization and reports profiles whose available capital is below `-epsilon`.
   *
   * Every violation is counted; the event and error log are throttled per
   * profile by `config.CAPITAL_INVARIANT_ALERT_THROTTLE_MS`.
   */
  private async checkCapitalInvariant(): Promise<void> {
    if (!this.profileProvider) return;
    const profileIds = this.profileProvider().filter(Boolean);
    const epsilonUsd = Math.max(0, Number(config.CAPITAL_INVARIANT_EPSILON_USD || 0));
    const epsilonPct = Math.max(0, Number(config.CAPITAL_INVARIANT_EPSILON_PCT || 0));
    const throttleMs = Math.max(0, Number(config.CAPITAL_INVARIANT_ALERT_THROTTLE_MS || 0));

    for (const profileId of profileIds) {
      const ledger = await capitalLedger.getLedger(profileId);
      if (!ledger) continue;

      const available = capitalLedger.availableCapital(ledger);
      const allocated = Number(ledger.allocated_capital || 0);
      // Fall back to 1 to avoid dividing by zero when nothing is allocated.
      const denominator = allocated > 0 ? allocated : 1;
      const utilization = (ledger.reserved_for_orders + ledger.reserved_for_positions) / denominator * 100;
      // Tolerance is the larger of the absolute and the allocation-relative epsilon.
      const epsilon = Math.max(epsilonUsd, Math.abs(allocated) * epsilonPct);
      metrics.profileUtilizationPercent.labels(profileId).set(utilization);

      if (available >= -epsilon) continue;

      metrics.capitalInvariantViolationsTotal.labels(profileId).inc();
      this.capitalInvariantViolationsCount += 1;

      const now = Date.now();
      const lastAlertAt = this.capitalInvariantLastAlertAt.get(profileId) || 0;
      const shouldEmit = throttleMs <= 0 || (now - lastAlertAt) >= throttleMs;
      if (!shouldEmit) continue;

      this.capitalInvariantLastAlertAt.set(profileId, now);
      this.emitEvent({
        type: 'INSUFFICIENT_BUYING_POWER',
        severity: 'ERROR',
        message: `Capital invariant violation: available capital is negative ($${available.toFixed(2)})`,
        profileId
      });
      logger.error('Capital invariant violation detected', {
        event: 'capital_invariant_violation',
        profileId,
        allocated: ledger.allocated_capital,
        reservedOrders: ledger.reserved_for_orders,
        reservedPositions: ledger.reserved_for_positions,
        realizedPnl: ledger.realized_pnl,
        available
      });
    }
  }

  /**
   * Returns true when `lastRun` is set and no older than the larger of twice
   * the interval and 120 seconds.
   */
  private isFresh(lastRun: number | null, intervalMs: number): boolean {
    if (!lastRun) return false;
    return (Date.now() - lastRun) <= Math.max(intervalMs * 2, 120_000);
  }

  /** Converts an unknown thrown value into a loggable message string. */
  private describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }
}
/** Module-level `ObservabilityService` instance, created when this module is first imported. */
export const observabilityService = new ObservabilityService();