From 5c4c001f35c903cce10eb4adcad9d05d3a52336c Mon Sep 17 00:00:00 2001 From: Saravana Achu Mac Date: Sat, 4 Apr 2026 16:33:27 -0700 Subject: [PATCH] refactor: move distributed locks to cosmos-first repository --- .../src/services/distributedLockRepository.ts | 186 ++++++++++++++++++ .../src/services/distributedLockService.ts | 46 +++++ 2 files changed, 232 insertions(+) create mode 100644 backend/src/services/distributedLockRepository.ts diff --git a/backend/src/services/distributedLockRepository.ts b/backend/src/services/distributedLockRepository.ts new file mode 100644 index 0000000..0e70ebe --- /dev/null +++ b/backend/src/services/distributedLockRepository.ts @@ -0,0 +1,186 @@ +import { getContainer } from '@bytelyst/cosmos'; +import { config } from '../config/index.js'; +import logger from '../utils/logger.js'; + +const CONTAINER_NAME = 'runtime_locks'; + +type LockScope = 'entry' | 'reconciliation'; + +interface RuntimeLockDocument { + id: string; + productId: string; + scope: LockScope; + profileId: string; + symbol?: string; + owner: string; + expiresAt: number; + updatedAt: string; + ttl: number; +} + +function isCosmosConfigured(): boolean { + return Boolean(config.COSMOS_ENDPOINT && config.COSMOS_KEY); +} + +function buildEntryLockId(profileId: string, symbol?: string): string { + return `entry:${profileId}:${String(symbol || '').trim()}`; +} + +function buildReconciliationLockId(profileId: string): string { + return `reconciliation:${profileId}`; +} + +async function readLock(id: string): Promise { + const container = getContainer(CONTAINER_NAME); + try { + const { resource } = await container.item(id, config.PRODUCT_ID).read(); + return resource || null; + } catch (error) { + const code = (error as { code?: number })?.code; + if (code === 404) return null; + throw error; + } +} + +async function writeLock(doc: RuntimeLockDocument): Promise { + const container = getContainer(CONTAINER_NAME); + await container.items.upsert(doc); + return true; +} + +async function deleteLock(id: string): Promise { + const container = getContainer(CONTAINER_NAME); + try { + await container.item(id, config.PRODUCT_ID).delete(); + return true; + } catch (error) { + const code = (error as { code?: number })?.code; + if (code === 404) return false; + throw error; + } +} + +function isActive(doc: RuntimeLockDocument | null | undefined): boolean { + return Boolean(doc && Number(doc.expiresAt) > Date.now()); +} + +export async function tryAcquireEntryLock( + profileId: string, + symbol: string | undefined, + owner: string, + ttlSeconds: number +): Promise { + if (!profileId || !owner) return false; + if (!isCosmosConfigured()) return false; + + try { + const id = buildEntryLockId(profileId, symbol); + const existing = await readLock(id); + if (isActive(existing) && existing?.owner !== owner) { + return false; + } + + await writeLock({ + id, + productId: config.PRODUCT_ID, + scope: 'entry', + profileId, + symbol: String(symbol || '').trim() || undefined, + owner, + expiresAt: Date.now() + Math.max(1, ttlSeconds) * 1000, + updatedAt: new Date().toISOString(), + ttl: Math.max(1, ttlSeconds) + }); + return true; + } catch (error) { + logger.warn(`[DistributedLockRepo] Cosmos entry-lock acquire failed: ${error instanceof Error ? error.message : 'unknown error'}`); + return false; + } +} + +export async function releaseEntryLock( + profileId: string, + symbol: string | undefined, + owner: string +): Promise { + if (!profileId || !owner) return false; + if (!isCosmosConfigured()) return false; + + try { + const id = buildEntryLockId(profileId, symbol); + const existing = await readLock(id); + if (!existing) return false; + if (existing.owner !== owner) return false; + return await deleteLock(id); + } catch (error) { + logger.warn(`[DistributedLockRepo] Cosmos entry-lock release failed: ${error instanceof Error ? error.message : 'unknown error'}`); + return false; + } +} + +export async function tryAcquireReconciliationLock( + profileId: string, + owner: string, + ttlSeconds: number +): Promise { + if (!profileId || !owner) return false; + if (!isCosmosConfigured()) return false; + + try { + const id = buildReconciliationLockId(profileId); + const existing = await readLock(id); + if (isActive(existing) && existing?.owner !== owner) { + return false; + } + + await writeLock({ + id, + productId: config.PRODUCT_ID, + scope: 'reconciliation', + profileId, + owner, + expiresAt: Date.now() + Math.max(1, ttlSeconds) * 1000, + updatedAt: new Date().toISOString(), + ttl: Math.max(1, ttlSeconds) + }); + return true; + } catch (error) { + logger.warn(`[DistributedLockRepo] Cosmos reconciliation-lock acquire failed: ${error instanceof Error ? error.message : 'unknown error'}`); + return false; + } +} + +export async function releaseReconciliationLock( + profileId: string, + owner: string +): Promise { + if (!profileId || !owner) return false; + if (!isCosmosConfigured()) return false; + + try { + const id = buildReconciliationLockId(profileId); + const existing = await readLock(id); + if (!existing) return false; + if (existing.owner !== owner) return false; + return await deleteLock(id); + } catch (error) { + logger.warn(`[DistributedLockRepo] Cosmos reconciliation-lock release failed: ${error instanceof Error ? error.message : 'unknown error'}`); + return false; + } +} + +export async function isEntryLockActive( + profileId: string, + symbol: string | undefined +): Promise { + if (!profileId) return false; + if (!isCosmosConfigured()) return false; + + try { + const existing = await readLock(buildEntryLockId(profileId, symbol)); + return isActive(existing); + } catch (error) { + logger.warn(`[DistributedLockRepo] Cosmos entry-lock status failed: ${error instanceof Error ? error.message : 'unknown error'}`); + return true; + } +} diff --git a/backend/src/services/distributedLockService.ts b/backend/src/services/distributedLockService.ts index 9be4c3e..70aa874 100644 --- a/backend/src/services/distributedLockService.ts +++ b/backend/src/services/distributedLockService.ts @@ -2,6 +2,7 @@ import logger from '../utils/logger.js'; import { supabaseService } from './SupabaseService.js'; import { healthTracker } from './healthTracker.js'; import { observabilityService } from './observabilityService.js'; +import * as distributedLockRepository from './distributedLockRepository.js'; const normalizeSymbol = (symbol?: string): string => { return String(symbol || '').trim(); @@ -9,6 +10,15 @@ const normalizeSymbol = (symbol?: string): string => { export class DistributedLockService { async tryAcquireRowLock(profileId: string, symbol: string | undefined, owner: string, ttlSeconds: number = 30): Promise { + if (!profileId || !owner) return false; + const cosmosAcquired = await distributedLockRepository.tryAcquireEntryLock(profileId, symbol, owner, ttlSeconds); + if (cosmosAcquired) { + return true; + } + return this.tryAcquireRowLockLegacy(profileId, symbol, owner, ttlSeconds); + } + + private async tryAcquireRowLockLegacy(profileId: string, symbol: string | undefined, owner: string, ttlSeconds: number = 30): Promise { if (!profileId || !owner) return false; const normalizedSymbol = normalizeSymbol(symbol); const client = supabaseService.getClient(); @@ -41,6 +51,15 @@ export class DistributedLockService { } async releaseRowLock(profileId: string, symbol: string | undefined, owner: string): Promise { + if (!profileId || !owner) return false; + const cosmosReleased = await distributedLockRepository.releaseEntryLock(profileId, symbol, owner); + if (cosmosReleased) { + return true; + } + return this.releaseRowLockLegacy(profileId, symbol, owner); + } + + private async releaseRowLockLegacy(profileId: string, symbol: string | undefined, owner: string): Promise { if (!profileId || !owner) return false; const normalizedSymbol = normalizeSymbol(symbol); const client = supabaseService.getClient(); @@ -68,6 +87,15 @@ export class DistributedLockService { } async tryAcquireReconciliationLock(profileId: string, owner: string, ttlSeconds: number = 30): Promise { + if (!profileId || !owner) return false; + const cosmosAcquired = await distributedLockRepository.tryAcquireReconciliationLock(profileId, owner, ttlSeconds); + if (cosmosAcquired) { + return true; + } + return this.tryAcquireReconciliationLockLegacy(profileId, owner, ttlSeconds); + } + + private async tryAcquireReconciliationLockLegacy(profileId: string, owner: string, ttlSeconds: number = 30): Promise { if (!profileId || !owner) return false; const client = supabaseService.getClient(); if (!client) return false; @@ -94,6 +122,15 @@ export class DistributedLockService { } async releaseReconciliationLock(profileId: string, owner: string): Promise { + if (!profileId || !owner) return false; + const cosmosReleased = await distributedLockRepository.releaseReconciliationLock(profileId, owner); + if (cosmosReleased) { + return true; + } + return this.releaseReconciliationLockLegacy(profileId, owner); + } + + private async releaseReconciliationLockLegacy(profileId: string, owner: string): Promise { if (!profileId || !owner) return false; const client = supabaseService.getClient(); if (!client) return false; @@ -114,6 +151,15 @@ export class DistributedLockService { } async isEntryInProgress(profileId: string, symbol?: string): Promise { + if (!profileId) return false; + const cosmosState = await distributedLockRepository.isEntryLockActive(profileId, symbol); + if (cosmosState) { + return true; + } + return this.isEntryInProgressLegacy(profileId, symbol); + } + + private async isEntryInProgressLegacy(profileId: string, symbol?: string): Promise { if (!profileId) return false; const normalizedSymbol = normalizeSymbol(symbol); const client = supabaseService.getClient();