/**
 * Sync Engine — Core implementation
 *
 * @module @bytelyst/sync/engine
 */

import type {
  SyncEngine,
  SyncEngineConfig,
  SyncItem,
  SyncResult,
  SyncStatus,
  SyncStatusInfo,
  SyncStatusCallback,
  EntityName,
  SyncOperation,
  ConflictStrategy,
} from './types.js';

// ─────────────────────────────────────────────────────────────────────────────
// Constants
// ─────────────────────────────────────────────────────────────────────────────

const DEFAULT_MAX_RETRIES = 5;
const DEFAULT_RETRY_DELAY_MS = 1000;
const QUEUE_KEY = 'queue';
const LAST_SYNC_KEY = 'lastSync';

/** Extracts a human-readable message from an arbitrary thrown value. */
function describeError(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

// ─────────────────────────────────────────────────────────────────────────────
// Sync Engine Implementation
// ─────────────────────────────────────────────────────────────────────────────

/**
 * Queue-based sync engine.
 *
 * Local changes are appended to a queue persisted through `config.storage`
 * and sent through `config.apiClient` when {@link SyncEngineImpl.flush} or
 * {@link SyncEngineImpl.fullSync} runs. Remote changes are counted per
 * configured entity by {@link SyncEngineImpl.pull}.
 */
export class SyncEngineImpl implements SyncEngine {
  private config: SyncEngineConfig;
  private status: SyncStatus = 'idle';
  private queueLength = 0;
  private lastSyncAt?: string;
  private statusListeners: Set<SyncStatusCallback> = new Set();
  private connectivityListeners: (() => void)[] = [];

  /**
   * @param config Engine configuration. `maxRetries` and `retryDelayMs` fall
   *   back to the module defaults when the caller omits them.
   */
  constructor(config: SyncEngineConfig) {
    this.config = {
      maxRetries: DEFAULT_MAX_RETRIES,
      retryDelayMs: DEFAULT_RETRY_DELAY_MS,
      ...config,
    };
    this.setupConnectivityDetection();
  }

  // ───────────────────────────────────────────────────────────────────────────
  // Core Operations
  // ───────────────────────────────────────────────────────────────────────────

  /**
   * Queues a local change for the next push.
   *
   * A pending item with the same dedup key (see `getDedupKey`) is replaced by
   * the new item instead of being queued twice.
   *
   * @param entity Name of the entity the change belongs to.
   * @param data Payload to send; an `id` property is used for deduplication.
   * @param operation Kind of change; defaults to `'create'`.
   */
  async push(
    entity: EntityName,
    data: unknown,
    operation: SyncOperation = 'create'
  ): Promise<void> {
    const item: SyncItem = {
      id: this.generateId(),
      entity,
      operation,
      data,
      timestamp: new Date().toISOString(),
      retryCount: 0,
    };

    const queue = await this.getQueue();
    const dedupKey = this.getDedupKey(entity, data);
    const existingIndex = queue.findIndex(
      queued => this.getDedupKey(queued.entity, queued.data) === dedupKey
    );

    if (existingIndex >= 0) {
      // Newer data for the same record supersedes the pending item.
      queue[existingIndex] = item;
    } else {
      queue.push(item);
    }

    await this.saveQueue(queue);

    // Notify listeners of the new queue length without cancelling an
    // in-progress sync: forcing 'idle' here would let a second flush start.
    this.updateStatus(this.status === 'syncing' ? 'syncing' : 'idle');
  }

  /**
   * Queues a delete operation for the record with the given id.
   *
   * @param entity Name of the entity the record belongs to.
   * @param id Identifier of the record to delete.
   */
  async delete(entity: EntityName, id: string): Promise<void> {
    await this.push(entity, { id }, 'delete');
  }

  /**
   * Pulls changes for every configured entity.
   *
   * The last-sync cursor is only advanced when every entity was pulled without
   * error, so a failed entity is asked for the same window again next time.
   * The engine status always ends in `'idle'` or `'error'`.
   *
   * @returns Counters for this pull; `success` is `false` if any entity failed.
   */
  async pull(): Promise<SyncResult> {
    const result: SyncResult = {
      success: true,
      pushed: 0,
      pulled: 0,
      conflicts: 0,
      errors: 0,
      timestamp: new Date().toISOString(),
    };
    let lastErrorMessage: string | undefined;

    this.updateStatus('syncing');

    try {
      for (const [entityName, entityConfig] of Object.entries(this.config.entities)) {
        try {
          result.pulled += await this.pullEntity(entityName, entityConfig.endpoint);
        } catch (error) {
          result.errors++;
          lastErrorMessage = describeError(error);
          this.trackError('pull', entityName, error);
        }
      }

      if (result.errors === 0) {
        await this.setLastSyncTime(result.timestamp);
        this.lastSyncAt = result.timestamp;
      }
    } catch (error) {
      result.success = false;
      lastErrorMessage = error instanceof Error ? error.message : 'Unknown error';
    }

    if (result.errors > 0) {
      result.success = false;
    }

    // Always leave 'syncing'; otherwise flush() would be blocked indefinitely.
    if (result.success) {
      this.updateStatus('idle');
    } else {
      this.updateStatus('error', lastErrorMessage);
    }

    return result;
  }

  /**
   * Pushes the pending queue, then pulls remote changes.
   *
   * @returns Combined counters of both phases.
   */
  async fullSync(): Promise<SyncResult> {
    const pushResult = await this.pushQueue();
    const pullResult = await this.pull();

    return {
      success: pushResult.success && pullResult.success,
      pushed: pushResult.pushed,
      pulled: pullResult.pulled,
      conflicts: pushResult.conflicts + pullResult.conflicts,
      errors: pushResult.errors + pullResult.errors,
      timestamp: new Date().toISOString(),
    };
  }

  // ───────────────────────────────────────────────────────────────────────────
  // Queue Management
  // ───────────────────────────────────────────────────────────────────────────

  /** Loads the persisted queue; anything that is not an array is treated as empty. */
  private async getQueue(): Promise<SyncItem[]> {
    const stored: unknown = await this.config.storage.getItem(QUEUE_KEY);
    return Array.isArray(stored) ? (stored as SyncItem[]) : [];
  }

  /** Persists the queue and refreshes the cached queue length. */
  private async saveQueue(queue: SyncItem[]): Promise<void> {
    await this.config.storage.setItem(QUEUE_KEY, queue);
    this.queueLength = queue.length;
  }

  /**
   * Sends every queued item once.
   *
   * A failed item stays queued with `retryCount` incremented and `lastError`
   * recorded until it has failed more than `maxRetries` times; it is then
   * dropped and counted in `errors`. On failure the status is set to
   * `'error'`; on success it is left at `'syncing'` for the caller to settle.
   */
  private async pushQueue(): Promise<SyncResult> {
    const queue = await this.getQueue();
    const result: SyncResult = {
      success: true,
      pushed: 0,
      pulled: 0,
      conflicts: 0,
      errors: 0,
      timestamp: new Date().toISOString(),
    };

    if (queue.length === 0) {
      return result;
    }

    this.updateStatus('syncing');

    // `??` rather than `||` so an explicit maxRetries of 0 is honoured.
    const maxRetries = this.config.maxRetries ?? DEFAULT_MAX_RETRIES;
    const remaining: SyncItem[] = [];
    let lastErrorMessage: string | undefined;

    try {
      for (const item of queue) {
        try {
          await this.pushItem(item);
          result.pushed++;
        } catch (error) {
          const message = describeError(error);
          const attempts = item.retryCount ?? 0;
          if (attempts < maxRetries) {
            remaining.push({ ...item, retryCount: attempts + 1, lastError: message });
          } else {
            result.errors++;
            lastErrorMessage = message;
            this.trackError('push', item.entity, error);
          }
        }
      }

      await this.saveQueue(remaining);
    } catch (error) {
      // Storage failure: do not leave the engine stuck in 'syncing'.
      this.updateStatus('error', describeError(error));
      throw error;
    }

    if (result.errors > 0) {
      result.success = false;
      this.updateStatus('error', lastErrorMessage);
    }

    return result;
  }

  /**
   * Sends a single queued item to its entity endpoint.
   *
   * `delete` maps to `DELETE <endpoint>/<id>`, `update` to
   * `PATCH <endpoint>/<id>`, anything else to `POST <endpoint>`.
   *
   * @throws Error if the entity is not configured, if an update/delete payload
   *   has no usable `id`, or if the API client rejects.
   */
  private async pushItem(item: SyncItem): Promise<void> {
    const entityConfig = this.config.entities[item.entity];
    if (!entityConfig) {
      throw new Error(`Unknown entity: ${item.entity}`);
    }

    let path: string = entityConfig.endpoint;
    if (item.operation === 'delete' || item.operation === 'update') {
      const id = (item.data as { id?: unknown } | null | undefined)?.id;
      if ((typeof id !== 'string' && typeof id !== 'number') || id === '') {
        throw new Error(`Cannot ${item.operation} ${item.entity}: missing id`);
      }
      path = `${entityConfig.endpoint}/${encodeURIComponent(String(id))}`;
    }

    const method =
      item.operation === 'delete' ? 'DELETE' : item.operation === 'update' ? 'PATCH' : 'POST';

    await this.config.apiClient.fetch(path, {
      method,
      body: method !== 'DELETE' ? JSON.stringify(item.data) : undefined,
    });
  }

  /**
   * Fetches changes for one entity since the stored last-sync time.
   *
   * @returns Number of items the server returned (the consumer handles storage).
   * @throws Error if the API client reports an error for this entity.
   */
  private async pullEntity(entityName: string, endpoint: string): Promise<number> {
    const lastSync = await this.getLastSyncTime();
    const separator = endpoint.includes('?') ? '&' : '?';
    const path = lastSync
      ? `${endpoint}${separator}since=${encodeURIComponent(lastSync)}`
      : endpoint;

    const result = await this.config.apiClient.safeFetch<{ items: unknown[] }>(path);

    if (result.error) {
      throw result.error instanceof Error
        ? result.error
        : new Error(`Pull failed for ${entityName}: ${String(result.error)}`);
    }
    if (!result.data) {
      return 0;
    }

    return result.data.items?.length || 0;
  }

  // ───────────────────────────────────────────────────────────────────────────
  // Conflict Resolution
  // ───────────────────────────────────────────────────────────────────────────

  /**
   * Chooses between the local and the remote version of a record.
   *
   * For `'last-write-wins'` the local item timestamp is compared with the
   * remote `updatedAt`; a missing `updatedAt` counts as the epoch, and the
   * remote version wins ties. `'manual'` delegates to `config.onConflict`
   * when provided and otherwise falls back to the remote version.
   */
  private async resolveConflict(
    item: SyncItem,
    remoteData: unknown,
    strategy: ConflictStrategy
  ): Promise<unknown> {
    switch (strategy) {
      case 'server-wins':
        return remoteData;
      case 'client-wins':
        return item.data;
      case 'last-write-wins': {
        const localTime = new Date(item.timestamp).getTime();
        const remoteTime = new Date(
          (remoteData as { updatedAt?: string })?.updatedAt || 0
        ).getTime();
        return localTime > remoteTime ? item.data : remoteData;
      }
      case 'manual':
        if (this.config.onConflict) {
          return await this.config.onConflict(item, remoteData);
        }
        return remoteData;
      default:
        return remoteData;
    }
  }

  // ───────────────────────────────────────────────────────────────────────────
  // Connectivity
  // ───────────────────────────────────────────────────────────────────────────

  /** Flushes the queue whenever the browser reports that it is back online. */
  private setupConnectivityDetection(): void {
    if (typeof window !== 'undefined' && window.addEventListener) {
      const handleOnline = (): void => {
        // Fire-and-forget: report failures instead of leaving an unhandled rejection.
        this.flush().catch((error: unknown) => this.trackError('flush', 'queue', error));
        this.connectivityListeners.forEach(cb => cb());
      };
      window.addEventListener('online', handleOnline);
    }
  }

  /**
   * Reports whether the runtime believes it is online. Only an explicit
   * `navigator.onLine === false` counts as offline, so runtimes without that
   * property are treated as online.
   */
  private isOnline(): boolean {
    if (typeof navigator !== 'undefined') {
      return navigator.onLine !== false;
    }
    return true;
  }

  /** Pushes the pending queue unless a sync is already in progress. */
  async flush(): Promise<void> {
    if (this.status === 'syncing') return;

    const result = await this.pushQueue();
    if (result.success && result.errors === 0) {
      this.updateStatus('idle');
    }
  }

  // ───────────────────────────────────────────────────────────────────────────
  // Status & Monitoring
  // ───────────────────────────────────────────────────────────────────────────

  /** Returns the queue length as of the last queue write by this instance. */
  getQueueLength(): number {
    return this.queueLength;
  }

  /** Returns a snapshot of the current status, queue length and last sync time. */
  getStatus(): SyncStatusInfo {
    return {
      status: this.status,
      queueLength: this.queueLength,
      lastSyncAt: this.lastSyncAt,
    };
  }

  /**
   * Registers a status listener.
   *
   * @returns Function that removes the listener again.
   */
  onStatusChange(callback: SyncStatusCallback): () => void {
    this.statusListeners.add(callback);
    return () => {
      this.statusListeners.delete(callback);
    };
  }

  /** Records the new status and notifies every registered listener. */
  private updateStatus(status: SyncStatus, error?: string): void {
    this.status = status;
    const info: SyncStatusInfo = {
      status,
      queueLength: this.queueLength,
      lastSyncAt: this.lastSyncAt,
      lastError: error,
    };
    this.statusListeners.forEach(cb => cb(info));
  }

  // ───────────────────────────────────────────────────────────────────────────
  // Utility
  // ───────────────────────────────────────────────────────────────────────────

  /** Discards every pending item. */
  async clearQueue(): Promise<void> {
    await this.saveQueue([]);
  }

  /** Resets the retry bookkeeping of every queued item and flushes the queue. */
  async reprocessFailed(): Promise<void> {
    const queue = await this.getQueue();
    const reset = queue.map(item => ({
      ...item,
      retryCount: 0,
      lastError: undefined,
    }));
    await this.saveQueue(reset);
    await this.flush();
  }

  /** Reads the stored last-sync timestamp; anything but a non-empty string is ignored. */
  private async getLastSyncTime(): Promise<string | undefined> {
    const stored: unknown = await this.config.storage.getItem(LAST_SYNC_KEY);
    return typeof stored === 'string' && stored !== '' ? stored : undefined;
  }

  /** Persists the last-sync timestamp. */
  private async setLastSyncTime(timestamp: string): Promise<void> {
    await this.config.storage.setItem(LAST_SYNC_KEY, timestamp);
  }

  /** Builds a queue item id from the current time and a random base-36 suffix. */
  private generateId(): string {
    return `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
  }

  /** Dedup key: `entity:id` when the payload has a truthy id, else the serialized payload. */
  private getDedupKey(entity: string, data: unknown): string {
    const id = (data as { id?: string })?.id;
    return id ? `${entity}:${id}` : `${entity}:${JSON.stringify(data)}`;
  }

  /** Telemetry hook; currently a placeholder that reports nothing. */
  private trackError(_operation: string, _entity: string, _error: unknown): void {
    if (this.config.telemetryClient) {
      // Telemetry tracking would go here
    }
  }
}

// ─────────────────────────────────────────────────────────────────────────────
// Factory Function
// ─────────────────────────────────────────────────────────────────────────────

/** Creates a sync engine for the given configuration. */
export function createSyncEngine(config: SyncEngineConfig): SyncEngine {
  return new SyncEngineImpl(config);
}