/**
 * Sync Engine — Core implementation
 *
 * Offline-first sync with:
 * - Queue persistence via pluggable StorageAdapter
 * - Deduplication (collapse updates to same entity + id)
 * - Exponential backoff retry (configurable base/max delay)
 * - Conflict detection via HTTP 409 + configurable resolution strategies
 * - Connectivity detection with auto-flush on reconnect
 * - Telemetry integration for sync success/failure/conflict tracking
 * - onPull callback so consumers merge pulled data into local store
 *
 * @module @bytelyst/sync/engine
 */

import type {
  SyncEngine,
  SyncEngineConfig,
  SyncItem,
  SyncResult,
  SyncStatus,
  SyncStatusInfo,
  SyncStatusCallback,
  EntityName,
  SyncOperation,
  ConflictStrategy,
  Conflict,
} from './types.js';

// ─────────────────────────────────────────────────────────────────────────────
// Constants
// ─────────────────────────────────────────────────────────────────────────────

const DEFAULT_MAX_RETRIES = 5;
const DEFAULT_RETRY_BASE_DELAY_MS = 1000;
const DEFAULT_RETRY_MAX_DELAY_MS = 30_000;
const QUEUE_KEY = 'queue';
const LAST_SYNC_KEY = 'lastSync';

/** Matches a standalone "409" token or the word "conflict" (any case) in an error message. */
const CONFLICT_MESSAGE_PATTERN = /\b409\b|conflict/i;

/** Matches the 4xx status codes that are never worth retrying when found in an error message. */
const NON_RETRIABLE_MESSAGE_PATTERN = /\b(400|401|403|404|405|406|410|422)\b/;

// ─────────────────────────────────────────────────────────────────────────────
// HTTP 409 Conflict Error
// ─────────────────────────────────────────────────────────────────────────────

/**
 * Signals that the server rejected a push because it holds a different version.
 * `remoteData` carries the server-side copy when the thrower knows it.
 */
export class SyncConflictError extends Error {
  constructor(public remoteData: unknown) {
    super('Sync conflict: server has newer version');
    this.name = 'SyncConflictError';
  }
}

// ─────────────────────────────────────────────────────────────────────────────
// Helpers
// ─────────────────────────────────────────────────────────────────────────────

/**
 * Compute exponential backoff with jitter.
 *
 * The base delay is `baseMs * 2^attempt`, clamped to `maxMs`. Up to 10% random
 * jitter is then added on top of the clamped value, so the result lies in
 * `[delay, delay * 1.1)` and may exceed `maxMs` by at most 10%.
 *
 * @param attempt Zero-based attempt index.
 * @param baseMs Delay for attempt 0, in milliseconds.
 * @param maxMs Upper bound applied before jitter, in milliseconds.
 * @returns Delay in milliseconds.
 */
export function computeBackoff(attempt: number, baseMs: number, maxMs: number): number {
  const delay = Math.min(baseMs * Math.pow(2, attempt), maxMs);
  const jitter = delay * 0.1 * Math.random();
  return delay + jitter;
}

/** Resolve after `ms` milliseconds. */
function sleep(ms: number): Promise<void> {
  return new Promise(resolve => setTimeout(resolve, ms));
}

/** Best-effort human-readable message for an arbitrary thrown value. */
function describeError(error: unknown): string {
  if (error instanceof Error) return error.message;
  if (typeof error === 'object' && error !== null) {
    const { message } = error as { message?: unknown };
    if (typeof message === 'string') return message;
  }
  return String(error);
}

/**
 * Read a numeric HTTP status from an error object, if it carries one.
 * NOTE(review): the apiClient's error shape is not visible in this file;
 * `status` / `statusCode` are only honoured when they are numbers, otherwise
 * classification falls back to inspecting the message text.
 */
function getHttpStatus(error: unknown): number | undefined {
  if (typeof error !== 'object' || error === null) return undefined;
  const { status, statusCode } = error as { status?: unknown; statusCode?: unknown };
  if (typeof status === 'number') return status;
  if (typeof statusCode === 'number') return statusCode;
  return undefined;
}

// ─────────────────────────────────────────────────────────────────────────────
// Sync Engine Implementation
// ─────────────────────────────────────────────────────────────────────────────

/**
 * Queue-backed sync engine.
 *
 * Local changes are appended to a queue persisted through `config.storage`,
 * pushed through `config.apiClient`, and remote changes are pulled per entity
 * and handed to `config.onPull`.
 */
export class SyncEngineImpl implements SyncEngine {
  /** User config with the retry settings guaranteed to be present. */
  private config: Required<
    Pick<SyncEngineConfig, 'maxRetries' | 'retryBaseDelayMs' | 'retryMaxDelayMs'>
  > &
    SyncEngineConfig;
  private status: SyncStatus = 'idle';
  /** Length of the queue as of the last save performed by this instance. */
  private queueLength = 0;
  private lastSyncAt?: string;
  private lastError?: string;
  private statusListeners: Set<SyncStatusCallback> = new Set();
  private onlineHandler: (() => void) | null = null;
  private offlineHandler: (() => void) | null = null;
  private destroyed = false;

  /**
   * @param config Engine configuration; retry settings default to
   *   5 retries, 1000 ms base delay and 30 000 ms max delay when omitted.
   */
  constructor(config: SyncEngineConfig) {
    this.config = {
      maxRetries: DEFAULT_MAX_RETRIES,
      retryBaseDelayMs: DEFAULT_RETRY_BASE_DELAY_MS,
      retryMaxDelayMs: DEFAULT_RETRY_MAX_DELAY_MS,
      ...config,
    };
    this.setupConnectivityDetection();
  }

  // ───────────────────────────────────────────────────────────────────────────
  // Core Operations
  // ───────────────────────────────────────────────────────────────────────────

  /**
   * Enqueue a local change.
   *
   * A queued item with the same dedup key (entity + id, or entity + serialized
   * data when there is no id) and the same operation is replaced in place
   * instead of being queued twice.
   */
  async push(
    entity: EntityName,
    data: unknown,
    operation: SyncOperation = 'create'
  ): Promise<void> {
    const item: SyncItem = {
      id: this.generateId(),
      entity,
      operation,
      data,
      timestamp: new Date().toISOString(),
      retryCount: 0,
    };

    const existingQueue = await this.getQueue();
    const dedupKey = this.getDedupKey(entity, data);
    const existingIndex = existingQueue.findIndex(
      i => this.getDedupKey(i.entity, i.data) === dedupKey && i.operation === operation
    );

    if (existingIndex >= 0) {
      existingQueue[existingIndex] = item;
    } else {
      existingQueue.push(item);
    }

    await this.saveQueue(existingQueue);
    this.notifyStatus();
  }

  /**
   * Enqueue a delete for `entity`/`id`, discarding every queued item that
   * targets the same entity + id (they would be superseded by the delete).
   */
  async delete(entity: EntityName, id: string): Promise<void> {
    // Also remove any pending create/update for this entity+id
    const queue = await this.getQueue();
    const dedupKey = `${entity}:${id}`;
    const filtered = queue.filter(i => this.getDedupKey(i.entity, i.data) !== dedupKey);

    const item: SyncItem = {
      id: this.generateId(),
      entity,
      operation: 'delete',
      data: { id },
      timestamp: new Date().toISOString(),
      retryCount: 0,
    };
    filtered.push(item);

    await this.saveQueue(filtered);
    this.notifyStatus();
  }

  /**
   * Pull every configured entity and hand the items to `onPull`.
   *
   * A failure of one entity does not stop the others, but it marks the result
   * as unsuccessful and prevents the stored last-sync time from advancing:
   * the next pull sends that time as `since`, so advancing it after a failure
   * would skip data that was never received.
   *
   * @returns Counters for this pull; `success` is false when any entity failed.
   */
  async pull(): Promise<SyncResult> {
    const result = this.emptyResult();
    this.setStatus('syncing');

    try {
      for (const [entityName, entityConfig] of Object.entries(this.config.entities)) {
        try {
          const count = await this.pullEntity(entityName, entityConfig.endpoint);
          result.pulled += count;
        } catch (error) {
          result.errors++;
          this.trackTelemetry('sync_pull_error', entityName, error);
        }
      }

      result.timestamp = new Date().toISOString();

      if (result.errors > 0) {
        // Consistent with pushQueue(): any error makes the result unsuccessful.
        result.success = false;
        this.setStatus('error', `${result.errors} entity pull(s) failed`);
      } else {
        await this.setLastSyncTime(result.timestamp);
        this.lastSyncAt = result.timestamp;
        this.setStatus('idle');
      }
    } catch (error) {
      result.success = false;
      this.setStatus('error', error instanceof Error ? error.message : 'Unknown error');
    }

    return result;
  }

  /**
   * Push the queue, then pull. Returns an unsuccessful empty result and sets
   * the status to 'offline' when the environment reports no connectivity.
   */
  async fullSync(): Promise<SyncResult> {
    if (!this.isOnline()) {
      this.setStatus('offline');
      return { ...this.emptyResult(), success: false };
    }

    const pushResult = await this.pushQueue();
    const pullResult = await this.pull();

    const combined: SyncResult = {
      success: pushResult.success && pullResult.success,
      pushed: pushResult.pushed,
      pulled: pullResult.pulled,
      conflicts: pushResult.conflicts + pullResult.conflicts,
      errors: pushResult.errors + pullResult.errors,
      timestamp: new Date().toISOString(),
    };

    this.trackTelemetry('sync_complete', '*', undefined, {
      pushed: combined.pushed,
      pulled: combined.pulled,
      conflicts: combined.conflicts,
      errors: combined.errors,
    });

    return combined;
  }

  // ───────────────────────────────────────────────────────────────────────────
  // Queue Management
  // ───────────────────────────────────────────────────────────────────────────

  /** Load the persisted queue; anything that is not an array is treated as empty. */
  private async getQueue(): Promise<SyncItem[]> {
    const stored: unknown = await this.config.storage.getItem(QUEUE_KEY);
    return Array.isArray(stored) ? (stored as SyncItem[]) : [];
  }

  /** Persist the queue and refresh the cached length. */
  private async saveQueue(queue: SyncItem[]): Promise<void> {
    await this.config.storage.setItem(QUEUE_KEY, queue);
    this.queueLength = queue.length;
  }

  /**
   * Push every queued item once (each with its own in-call retries).
   *
   * Outcome per item:
   * - success, or conflict that was resolved → removed from the queue;
   * - unresolved conflict → dropped and counted as an error;
   * - other failure with retries left → kept for the next flush;
   * - other failure with retries exhausted → dropped and counted as an error.
   *
   * Leaves the status at 'syncing' on normal return; the caller sets the final
   * status. If anything throws, the status is set to 'error' before rethrowing
   * so the engine is never left stuck in 'syncing'.
   */
  private async pushQueue(): Promise<SyncResult> {
    // Copy so that an adapter returning a shared array cannot grow our snapshot mid-loop.
    const snapshot = [...(await this.getQueue())];
    const result = this.emptyResult();
    if (snapshot.length === 0) return result;

    this.setStatus('syncing');

    try {
      const retryLater: SyncItem[] = [];

      for (const item of snapshot) {
        try {
          await this.pushItemWithRetry(item);
          result.pushed++;
        } catch (error) {
          if (error instanceof SyncConflictError) {
            result.conflicts++;
            const resolved = await this.handleConflict(item, error.remoteData);
            if (resolved) {
              result.pushed++;
            } else {
              result.errors++;
              // The item leaves the queue here, same as an exhausted retry.
              this.trackTelemetry('sync_push_dropped', item.entity, error);
            }
          } else if (item.retryCount < this.config.maxRetries) {
            item.retryCount++;
            item.lastError = describeError(error);
            retryLater.push(item);
          } else {
            result.errors++;
            this.trackTelemetry('sync_push_dropped', item.entity, error);
          }
        }
      }

      // push()/delete()/clearQueue() may have rewritten the stored queue while
      // the network calls above were in flight. Re-read it so that
      //  - items enqueued meanwhile are kept (they are not in our snapshot), and
      //  - items removed meanwhile are not resurrected by our retry list.
      const current = await this.getQueue();
      const currentIds = new Set(current.map(i => i.id));
      const snapshotIds = new Set(snapshot.map(i => i.id));
      await this.saveQueue([
        ...retryLater.filter(i => currentIds.has(i.id)),
        ...current.filter(i => !snapshotIds.has(i.id)),
      ]);

      if (result.errors > 0) {
        result.success = false;
      }

      return result;
    } catch (error) {
      this.setStatus('error', describeError(error));
      throw error;
    }
  }

  /**
   * Send one queued item, retrying transient failures with exponential backoff.
   *
   * create → POST endpoint; update → PATCH endpoint/id; delete → DELETE endpoint/id
   * (the id suffix is only added when the item data has a truthy `id`).
   *
   * @throws SyncConflictError when the failure is classified as a conflict.
   * @throws the original error for non-retriable failures or once attempts run out.
   */
  private async pushItemWithRetry(item: SyncItem): Promise<void> {
    const entityConfig = this.config.entities[item.entity];
    if (!entityConfig) {
      throw new Error(`Unknown entity: ${item.entity}`);
    }

    const dataId = (item.data as { id?: string })?.id;
    const path =
      (item.operation === 'delete' || item.operation === 'update') && dataId
        ? `${entityConfig.endpoint}/${dataId}`
        : entityConfig.endpoint;

    const method =
      item.operation === 'delete' ? 'DELETE' : item.operation === 'update' ? 'PATCH' : 'POST';

    const headers: Record<string, string> = {};
    if (method !== 'DELETE') {
      headers['Content-Type'] = 'application/json';
    }

    // Attempt with exponential backoff on transient failures
    let lastError: unknown;
    const maxAttempts = Math.max(1, this.config.maxRetries - item.retryCount);

    for (let attempt = 0; attempt < maxAttempts; attempt++) {
      try {
        await this.config.apiClient.fetch(path, {
          method,
          headers,
          body: method !== 'DELETE' ? JSON.stringify(item.data) : undefined,
        });

        // Success — track and return
        this.trackTelemetry('sync_push_success', item.entity);
        return;
      } catch (error) {
        lastError = error;

        // Check for conflict (409) — don't retry, let caller handle
        if (error instanceof SyncConflictError) {
          throw error;
        }
        if (this.isConflictError(error)) {
          // Preserve whatever remote copy the error carries for conflict resolution.
          throw new SyncConflictError(this.extractRemoteData(error));
        }

        // Non-retriable errors — throw immediately
        if (this.isNonRetriable(error)) {
          throw error;
        }

        // Transient error — backoff and retry
        if (attempt < maxAttempts - 1) {
          const delay = computeBackoff(
            attempt,
            this.config.retryBaseDelayMs,
            this.config.retryMaxDelayMs
          );
          await sleep(delay);
        }
      }
    }

    throw lastError;
  }

  /**
   * Pull one entity, passing the stored last-sync time as `since` when present.
   *
   * @returns Number of items received.
   * @throws when the API client reports an error, so the caller can count it.
   */
  private async pullEntity(entityName: string, endpoint: string): Promise<number> {
    const lastSync = await this.getLastSyncTime();
    const path = lastSync ? `${endpoint}?since=${encodeURIComponent(lastSync)}` : endpoint;

    const response = await this.config.apiClient.safeFetch<{ items: unknown[] }>(path);

    if (response.error) {
      throw response.error instanceof Error
        ? response.error
        : new Error(`Pull failed for ${entityName}: ${describeError(response.error)}`);
    }
    if (!response.data) {
      return 0;
    }

    const items = response.data.items ?? [];

    if (items.length > 0 && this.config.onPull) {
      await this.config.onPull(entityName, items);
    }

    return items.length;
  }

  // ───────────────────────────────────────────────────────────────────────────
  // Conflict Resolution
  // ───────────────────────────────────────────────────────────────────────────

  /**
   * Resolve a push conflict with the entity's configured strategy.
   *
   * @returns true when the conflict was resolved (remote accepted, or the
   *   winning data was re-pushed); false when resolution or the re-push failed.
   */
  private async handleConflict(item: SyncItem, remoteData: unknown): Promise<boolean> {
    const entityConfig = this.config.entities[item.entity];
    if (!entityConfig) return false;

    const strategy = entityConfig.conflictStrategy;
    this.trackTelemetry('sync_conflict', item.entity, undefined, { strategy });

    try {
      const winner = await this.resolveConflict(item, remoteData, strategy);

      if (winner === remoteData) {
        // Server wins — nothing to push, consumer gets remote via onPull.
        // Skip the callback when the remote copy is unknown rather than
        // handing the consumer an `undefined` item to merge.
        if (remoteData != null && this.config.onPull) {
          await this.config.onPull(item.entity, [remoteData]);
        }
        return true;
      }

      const dataId = (winner as { id?: string } | null | undefined)?.id;
      const endpoint = entityConfig.endpoint;
      const path = dataId ? `${endpoint}/${dataId}` : endpoint;

      if (item.operation === 'delete' && winner === item.data) {
        // The local delete won: repeat the delete. A PUT of the `{ id }`
        // payload would overwrite the record instead of removing it.
        await this.config.apiClient.fetch(path, {
          method: 'DELETE',
          headers: {},
          body: undefined,
        });
        return true;
      }

      // Client data wins — re-push with force
      await this.config.apiClient.fetch(path, {
        method: 'PUT',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(winner),
      });
      return true;
    } catch {
      // Reported to the caller through the false return (counted as an error there).
      return false;
    }
  }

  /**
   * Pick the winning data for a conflict.
   *
   * - 'server-wins' → remote; 'client-wins' → local;
   * - 'last-write-wins' → local only if its timestamp is strictly newer than
   *   the remote `updatedAt` (a missing `updatedAt` counts as 1970-01-01);
   * - 'manual' → whatever `onConflict` returns, or remote when no handler is set;
   * - anything else → remote.
   */
  private async resolveConflict(
    item: SyncItem,
    remoteData: unknown,
    strategy: ConflictStrategy
  ): Promise<unknown> {
    switch (strategy) {
      case 'server-wins':
        return remoteData;

      case 'client-wins':
        return item.data;

      case 'last-write-wins': {
        const localTime = new Date(item.timestamp).getTime();
        const remoteTime = new Date(
          (remoteData as { updatedAt?: string })?.updatedAt ?? '1970-01-01'
        ).getTime();
        return localTime > remoteTime ? item.data : remoteData;
      }

      case 'manual': {
        if (this.config.onConflict) {
          const conflict: Conflict = {
            entity: item.entity,
            localItem: item,
            remoteData,
          };
          return await this.config.onConflict(conflict);
        }
        // No handler — fall back to server-wins
        return remoteData;
      }

      default:
        return remoteData;
    }
  }

  // ───────────────────────────────────────────────────────────────────────────
  // Connectivity Detection
  // ───────────────────────────────────────────────────────────────────────────

  /** Register window online/offline listeners when a `window` is available. */
  private setupConnectivityDetection(): void {
    if (typeof globalThis === 'undefined') return;

    const win = typeof window !== 'undefined' ? window : undefined;
    if (!win?.addEventListener) return;

    this.onlineHandler = () => {
      this.setStatus('idle');
      // Fire-and-forget: nobody awaits an event handler, so swallow the
      // rejection here. pushQueue() has already recorded it in the status.
      this.flush().catch(() => undefined);
    };

    this.offlineHandler = () => {
      this.setStatus('offline');
    };

    win.addEventListener('online', this.onlineHandler);
    win.addEventListener('offline', this.offlineHandler);
  }

  /** Connectivity as reported by `navigator.onLine`, defaulting to online. */
  private isOnline(): boolean {
    if (typeof navigator !== 'undefined' && typeof navigator.onLine === 'boolean') {
      return navigator.onLine;
    }
    return true; // Assume online in non-browser environments (Node.js, SSR)
  }

  /**
   * Push the queue unless the engine is destroyed or already syncing.
   *
   * The status always leaves 'syncing' afterwards: 'idle' on a clean result,
   * 'error' otherwise. Leaving it at 'syncing' would make every later flush
   * return early on the guard above.
   */
  async flush(): Promise<void> {
    if (this.destroyed || this.status === 'syncing') return;

    const result = await this.pushQueue();
    if (result.success && result.errors === 0) {
      this.setStatus('idle');
    } else {
      this.setStatus('error', `${result.errors} queued item(s) failed to sync`);
    }
  }

  /** Stop auto-flushing: remove connectivity listeners and drop status listeners. */
  destroy(): void {
    this.destroyed = true;

    const win = typeof window !== 'undefined' ? window : undefined;
    if (win) {
      if (this.onlineHandler) win.removeEventListener('online', this.onlineHandler);
      if (this.offlineHandler) win.removeEventListener('offline', this.offlineHandler);
    }

    this.statusListeners.clear();
  }

  // ───────────────────────────────────────────────────────────────────────────
  // Status & Monitoring
  // ───────────────────────────────────────────────────────────────────────────

  /** Queue length as of the last save performed by this instance. */
  getQueueLength(): number {
    return this.queueLength;
  }

  /** Snapshot of the current status fields. */
  getStatus(): SyncStatusInfo {
    return {
      status: this.status,
      queueLength: this.queueLength,
      lastSyncAt: this.lastSyncAt,
      lastError: this.lastError,
    };
  }

  /**
   * Subscribe to status notifications.
   * @returns A function that removes the subscription.
   */
  onStatusChange(callback: SyncStatusCallback): () => void {
    this.statusListeners.add(callback);
    return () => this.statusListeners.delete(callback);
  }

  /** Update the status (and `lastError` when a message is given), then notify listeners. */
  private setStatus(status: SyncStatus, error?: string): void {
    this.status = status;
    if (error) this.lastError = error;
    this.notifyStatus();
  }

  /** Send the current status snapshot to every listener. */
  private notifyStatus(): void {
    const info = this.getStatus();
    this.statusListeners.forEach(cb => cb(info));
  }

  // ───────────────────────────────────────────────────────────────────────────
  // Utility
  // ───────────────────────────────────────────────────────────────────────────

  /** Discard every queued item. */
  async clearQueue(): Promise<void> {
    await this.saveQueue([]);
    this.notifyStatus();
  }

  /** Reset retry bookkeeping on every queued item, then flush. */
  async reprocessFailed(): Promise<void> {
    const queue = await this.getQueue();
    const reset = queue.map(item => ({
      ...item,
      retryCount: 0,
      lastError: undefined,
    }));
    await this.saveQueue(reset);
    await this.flush();
  }

  /** Stored last-sync time, or undefined when none (or a non-string) is stored. */
  private async getLastSyncTime(): Promise<string | undefined> {
    const stored: unknown = await this.config.storage.getItem(LAST_SYNC_KEY);
    return typeof stored === 'string' && stored ? stored : undefined;
  }

  private async setLastSyncTime(timestamp: string): Promise<void> {
    await this.config.storage.setItem(LAST_SYNC_KEY, timestamp);
  }

  /** Queue item id: current epoch milliseconds plus a random base-36 suffix. */
  private generateId(): string {
    return `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
  }

  /** `entity:id` when the data has a truthy id, otherwise `entity:` + serialized data. */
  private getDedupKey(entity: string, data: unknown): string {
    const id = (data as { id?: string })?.id;
    return id ? `${entity}:${id}` : `${entity}:${JSON.stringify(data)}`;
  }

  /** Fresh successful result with all counters at zero. */
  private emptyResult(): SyncResult {
    return {
      success: true,
      pushed: 0,
      pulled: 0,
      conflicts: 0,
      errors: 0,
      timestamp: new Date().toISOString(),
    };
  }

  // ───────────────────────────────────────────────────────────────────────────
  // Error Classification
  // ───────────────────────────────────────────────────────────────────────────

  /**
   * True for SyncConflictError, a numeric status of 409, or a message that
   * contains a standalone "409" or the word "conflict". The "409" match is
   * token-based so digits inside ids or durations do not count.
   */
  private isConflictError(error: unknown): boolean {
    if (error instanceof SyncConflictError) return true;
    if (getHttpStatus(error) === 409) return true;
    return CONFLICT_MESSAGE_PATTERN.test(describeError(error));
  }

  /** 4xx errors (except 408, 429) are non-retriable. */
  private isNonRetriable(error: unknown): boolean {
    const status = getHttpStatus(error);
    if (status !== undefined) {
      return status >= 400 && status < 500 && status !== 408 && status !== 429;
    }
    return NON_RETRIABLE_MESSAGE_PATTERN.test(describeError(error));
  }

  /**
   * Remote copy attached to a conflict error, if any. Only an explicit
   * `remoteData` property is trusted; guessing at other fields could feed an
   * error envelope into the consumer's store.
   */
  private extractRemoteData(error: unknown): unknown {
    if (error instanceof SyncConflictError) return error.remoteData;
    if (typeof error === 'object' && error !== null && 'remoteData' in error) {
      return (error as { remoteData?: unknown }).remoteData;
    }
    return undefined;
  }

  // ───────────────────────────────────────────────────────────────────────────
  // Telemetry
  // ───────────────────────────────────────────────────────────────────────────

  /**
   * Emit a telemetry event when a telemetry client is configured.
   * NOTE(review): the type arguments on `extra` / `metrics` were reconstructed;
   * confirm them against the telemetry client's `trackEvent` signature.
   */
  private trackTelemetry(
    eventName: string,
    entity: string,
    error?: unknown,
    extra?: Record<string, unknown>
  ): void {
    if (!this.config.telemetryClient) return;

    try {
      this.config.telemetryClient.trackEvent('sync', 'sync-engine', eventName, {
        tags: {
          productId: this.config.productId,
          entity,
          ...(error ? { error: describeError(error) } : {}),
        },
        metrics: extra as Record<string, number> | undefined,
      });
    } catch {
      // Telemetry should never break sync
    }
  }
}

// ─────────────────────────────────────────────────────────────────────────────
// Factory Function
// ─────────────────────────────────────────────────────────────────────────────

/** Create a sync engine for the given configuration. */
export function createSyncEngine(config: SyncEngineConfig): SyncEngine {
  return new SyncEngineImpl(config);
}