import logger from '../utils/logger.js';
import { supabaseService } from './SupabaseService.js';
import { healthTracker } from './healthTracker.js';
import { observabilityService } from './observabilityService.js';
import * as distributedLockRepository from './distributedLockRepository.js';

/** Lock time-to-live, in seconds, used when the caller supplies none or a non-finite value. */
const DEFAULT_LOCK_TTL_SECONDS = 30;

/** Coerces an optional symbol to a trimmed string; a missing symbol becomes `''`. */
const normalizeSymbol = (symbol?: string): string => {
  return String(symbol || '').trim();
};

/**
 * Sanitises a TTL before it is sent to the legacy RPCs: non-finite values
 * (NaN, ±Infinity) fall back to the default, and finite values are raised to
 * at least one second.
 */
const normalizeTtlSeconds = (ttlSeconds: number): number =>
  Number.isFinite(ttlSeconds) ? Math.max(1, ttlSeconds) : DEFAULT_LOCK_TTL_SECONDS;

/**
 * Entry and reconciliation locks backed by two stores: `distributedLockRepository`
 * is consulted first, and the Supabase RPC ("legacy") implementation is used
 * whenever the repository call returns a falsy result.
 *
 * NOTE(review): the fallback triggers on ANY falsy repository result. Whether
 * `false` from the repository means "unavailable" or "held by another owner"
 * is not visible from this file — confirm that falling through to the legacy
 * store on contention cannot grant the same lock twice.
 */
export class DistributedLockService {
  /**
   * Attempts to take the entry lock for a profile/symbol pair.
   *
   * @param profileId Profile the lock belongs to; an empty value yields `false`.
   * @param symbol Optional symbol; the legacy path normalises it to a trimmed string.
   * @param owner Identifier of the lock holder; an empty value yields `false`.
   * @param ttlSeconds Requested lock lifetime in seconds.
   * @returns `true` when either store reports the lock as acquired.
   */
  async tryAcquireRowLock(
    profileId: string,
    symbol: string | undefined,
    owner: string,
    ttlSeconds: number = DEFAULT_LOCK_TTL_SECONDS,
  ): Promise<boolean> {
    if (!profileId || !owner) return false;

    const cosmosAcquired = await distributedLockRepository.tryAcquireEntryLock(
      profileId,
      symbol,
      owner,
      ttlSeconds,
    );
    if (cosmosAcquired) {
      return true;
    }

    return this.tryAcquireRowLockLegacy(profileId, symbol, owner, ttlSeconds);
  }

  /**
   * Legacy acquisition through the `fn_try_acquire_entry_lock_row` RPC.
   * Returns `false` when no Supabase client is available or the RPC errors
   * (fail closed), and records lock contention when the RPC succeeds but
   * returns no data.
   */
  private async tryAcquireRowLockLegacy(
    profileId: string,
    symbol: string | undefined,
    owner: string,
    ttlSeconds: number = DEFAULT_LOCK_TTL_SECONDS,
  ): Promise<boolean> {
    if (!profileId || !owner) return false;

    const normalizedSymbol = normalizeSymbol(symbol);
    const client = supabaseService.getClient();
    if (!client) return false;

    const ttl = normalizeTtlSeconds(ttlSeconds);
    const { data, error } = await client
      .rpc('fn_try_acquire_entry_lock_row', {
        p_profile_id: profileId,
        p_symbol: normalizedSymbol,
        p_owner: owner,
        p_ttl_seconds: ttl,
      })
      .maybeSingle();

    if (error) {
      // Both branches refuse the lock; they differ only in the log message.
      if (this.isRpcNetworkFailure(error)) {
        logger.error(`[DistributedLock] RPC network error while acquiring row lock for ${profileId}:${normalizedSymbol}; failing closed: ${error.message}`);
        return false;
      }
      logger.error(`[DistributedLock] Failed to acquire row lock for ${profileId}:${normalizedSymbol}: ${error.message}`);
      return false;
    }

    const acquired = Boolean(data);
    if (!acquired) {
      this.recordContention();
    }
    return acquired;
  }

  /**
   * Releases the entry lock for a profile/symbol pair.
   *
   * @returns `true` when either store reports the release as successful;
   *   `false` for empty `profileId`/`owner` or when neither store released it.
   */
  async releaseRowLock(profileId: string, symbol: string | undefined, owner: string): Promise<boolean> {
    if (!profileId || !owner) return false;

    const cosmosReleased = await distributedLockRepository.releaseEntryLock(profileId, symbol, owner);
    if (cosmosReleased) {
      return true;
    }

    return this.releaseRowLockLegacy(profileId, symbol, owner);
  }

  /**
   * Legacy release through the `fn_release_entry_lock_row` RPC. Errors are
   * logged at warn level and reported as `false`.
   */
  private async releaseRowLockLegacy(
    profileId: string,
    symbol: string | undefined,
    owner: string,
  ): Promise<boolean> {
    if (!profileId || !owner) return false;

    const normalizedSymbol = normalizeSymbol(symbol);
    const client = supabaseService.getClient();
    if (!client) return false;

    const { data, error } = await client
      .rpc('fn_release_entry_lock_row', {
        p_profile_id: profileId,
        p_symbol: normalizedSymbol,
        p_owner: owner,
      })
      .maybeSingle();

    if (error) {
      logger.warn(`[DistributedLock] Failed to release row lock for ${profileId}:${normalizedSymbol}: ${error.message}`);
      return false;
    }
    return Boolean(data);
  }

  /**
   * Heuristic: treats an error as a network failure when its message contains
   * "fetch failed" or "network" (case-insensitive). A missing error or message
   * is treated as not a network failure.
   */
  private isRpcNetworkFailure(error: { message?: unknown } | null | undefined): boolean {
    const message = String(error?.message || '').toLowerCase();
    return message.includes('fetch failed') || message.includes('network');
  }

  /**
   * Attempts to take the reconciliation lock for a profile.
   *
   * @param profileId Profile the lock belongs to; an empty value yields `false`.
   * @param owner Identifier of the lock holder; an empty value yields `false`.
   * @param ttlSeconds Requested lock lifetime in seconds.
   * @returns `true` when either store reports the lock as acquired.
   */
  async tryAcquireReconciliationLock(
    profileId: string,
    owner: string,
    ttlSeconds: number = DEFAULT_LOCK_TTL_SECONDS,
  ): Promise<boolean> {
    if (!profileId || !owner) return false;

    const cosmosAcquired = await distributedLockRepository.tryAcquireReconciliationLock(
      profileId,
      owner,
      ttlSeconds,
    );
    if (cosmosAcquired) {
      return true;
    }

    return this.tryAcquireReconciliationLockLegacy(profileId, owner, ttlSeconds);
  }

  /**
   * Legacy acquisition through the `fn_try_acquire_reconciliation_lock_row`
   * RPC. Returns `false` when no client is available or the RPC errors.
   * Unlike the entry-lock path, a non-acquired result does not record contention.
   */
  private async tryAcquireReconciliationLockLegacy(
    profileId: string,
    owner: string,
    ttlSeconds: number = DEFAULT_LOCK_TTL_SECONDS,
  ): Promise<boolean> {
    if (!profileId || !owner) return false;

    const client = supabaseService.getClient();
    if (!client) return false;

    const ttl = normalizeTtlSeconds(ttlSeconds);
    const { data, error } = await client
      .rpc('fn_try_acquire_reconciliation_lock_row', {
        p_profile_id: profileId,
        p_owner: owner,
        p_ttl_seconds: ttl,
      })
      .maybeSingle();

    if (error) {
      // Both branches refuse the lock; they differ only in the log message.
      if (this.isRpcNetworkFailure(error)) {
        logger.error(`[DistributedLock] RPC network error while acquiring reconciliation lock for ${profileId}; failing closed: ${error.message}`);
        return false;
      }
      logger.error(`[DistributedLock] Reconciliation lock acquisition failed for ${profileId}: ${error.message}`);
      return false;
    }
    return Boolean(data);
  }

  /**
   * Releases the reconciliation lock for a profile.
   *
   * @returns `true` when either store reports the release as successful.
   */
  async releaseReconciliationLock(profileId: string, owner: string): Promise<boolean> {
    if (!profileId || !owner) return false;

    const cosmosReleased = await distributedLockRepository.releaseReconciliationLock(profileId, owner);
    if (cosmosReleased) {
      return true;
    }

    return this.releaseReconciliationLockLegacy(profileId, owner);
  }

  /**
   * Legacy release through the `fn_release_reconciliation_lock_row` RPC.
   * Errors are logged at warn level and reported as `false`.
   */
  private async releaseReconciliationLockLegacy(profileId: string, owner: string): Promise<boolean> {
    if (!profileId || !owner) return false;

    const client = supabaseService.getClient();
    if (!client) return false;

    const { data, error } = await client
      .rpc('fn_release_reconciliation_lock_row', {
        p_profile_id: profileId,
        p_owner: owner,
      })
      .maybeSingle();

    if (error) {
      logger.warn(`[DistributedLock] Failed to release reconciliation lock for ${profileId}: ${error.message}`);
      return false;
    }
    return Boolean(data);
  }

  /**
   * Reports whether an entry is currently in progress for a profile/symbol pair.
   *
   * @returns `true` when the repository reports an active entry lock, otherwise
   *   the result of the legacy check; `false` for an empty `profileId`.
   */
  async isEntryInProgress(profileId: string, symbol?: string): Promise<boolean> {
    if (!profileId) return false;

    const cosmosState = await distributedLockRepository.isEntryLockActive(profileId, symbol);
    if (cosmosState) {
      return true;
    }

    return this.isEntryInProgressLegacy(profileId, symbol);
  }

  /**
   * Legacy check through the `fn_is_entry_in_progress` RPC. Note the
   * asymmetry: a missing client yields `false`, while an RPC error yields
   * `true` (the entry is assumed to be in progress when the check fails).
   */
  private async isEntryInProgressLegacy(profileId: string, symbol?: string): Promise<boolean> {
    if (!profileId) return false;

    const normalizedSymbol = normalizeSymbol(symbol);
    const client = supabaseService.getClient();
    if (!client) return false;

    const { data, error } = await client
      .rpc('fn_is_entry_in_progress', {
        p_profile: profileId,
        p_symbol: normalizedSymbol,
      })
      .maybeSingle();

    if (error) {
      logger.error(`[DistributedLock] Lifecycle check failed for ${profileId}:${normalizedSymbol}: ${error.message}`);
      return true;
    }
    return Boolean(data);
  }

  /** Increments the lock-contention counters on the health tracker and the observability service. */
  recordContention(): void {
    healthTracker.incrementLockContention();
    observabilityService.incrementEntryLockContention();
  }
}

/** Shared singleton instance. */
export const distributedLockService = new DistributedLockService();
/** Alias of {@link distributedLockService}. */
export const entryLockService = distributedLockService;