learning_ai_common_plat/packages/sync/src/engine.ts
2026-03-19 21:25:30 -07:00

604 lines
22 KiB
TypeScript

/**
* Sync Engine — Core implementation
*
* Offline-first sync with:
* - Queue persistence via pluggable StorageAdapter
* - Deduplication (collapse updates to same entity + id)
* - Exponential backoff retry (configurable base/max delay)
* - Conflict detection via HTTP 409 + configurable resolution strategies
* - Connectivity detection with auto-flush on reconnect
* - Telemetry integration for sync success/failure/conflict tracking
* - onPull callback so consumers merge pulled data into local store
*
* @module @bytelyst/sync/engine
*/
import type {
SyncEngine,
SyncEngineConfig,
SyncItem,
SyncResult,
SyncStatus,
SyncStatusInfo,
SyncStatusCallback,
EntityName,
SyncOperation,
ConflictStrategy,
Conflict,
} from './types.js';
// ─────────────────────────────────────────────────────────────────────────────
// Constants
// ─────────────────────────────────────────────────────────────────────────────
/** Fallback for `maxRetries` when the engine config does not provide one. */
const DEFAULT_MAX_RETRIES = 5;
/** Fallback for `retryBaseDelayMs` (milliseconds) when the engine config does not provide one. */
const DEFAULT_RETRY_BASE_DELAY_MS = 1000;
/** Fallback for `retryMaxDelayMs` (milliseconds) when the engine config does not provide one. */
const DEFAULT_RETRY_MAX_DELAY_MS = 30_000;
/** Storage-adapter key under which the pending sync queue is persisted. */
const QUEUE_KEY = 'queue';
/** Storage-adapter key under which `pull()` records its ISO timestamp. */
const LAST_SYNC_KEY = 'lastSync';
// ─────────────────────────────────────────────────────────────────────────────
// HTTP 409 Conflict Error
// ─────────────────────────────────────────────────────────────────────────────
/**
 * Error signalling that the server rejected a push because it holds a
 * conflicting version of the entity.
 *
 * `remoteData` carries whatever the thrower supplied as the server's copy; it
 * may be `undefined` when the server payload is not available.
 */
export class SyncConflictError extends Error {
  public remoteData: unknown;

  constructor(remoteData: unknown) {
    super('Sync conflict: server has newer version');
    this.remoteData = remoteData;
    this.name = 'SyncConflictError';
  }
}
// ─────────────────────────────────────────────────────────────────────────────
// Helpers
// ─────────────────────────────────────────────────────────────────────────────
/**
 * Compute an exponential backoff delay with additive jitter.
 *
 * The exponential term `baseMs * 2^attempt` is capped at `maxMs`, then up to
 * 10% of the capped value is added as random jitter. Because the jitter is
 * added after the cap, the result can exceed `maxMs` by up to 10%.
 *
 * @param attempt Zero-based attempt number.
 * @param baseMs Delay for attempt 0, in milliseconds.
 * @param maxMs Cap applied to the exponential term, in milliseconds.
 * @returns Delay in milliseconds, jitter included.
 */
export function computeBackoff(attempt: number, baseMs: number, maxMs: number): number {
  const exponential = baseMs * 2 ** attempt;
  const capped = Math.min(exponential, maxMs);
  return capped + capped * 0.1 * Math.random();
}
/**
 * Resolve after roughly `ms` milliseconds, using `setTimeout`.
 *
 * @param ms Delay in milliseconds.
 */
function sleep(ms: number): Promise<void> {
  return new Promise<void>(done => {
    setTimeout(done, ms);
  });
}
// ─────────────────────────────────────────────────────────────────────────────
// Sync Engine Implementation
// ─────────────────────────────────────────────────────────────────────────────
/**
 * Offline-first sync engine.
 *
 * Local mutations are appended to a queue persisted through `config.storage`
 * and pushed to the server through `config.apiClient`. Remote changes are
 * pulled per configured entity and handed to `config.onPull`.
 *
 * Invariants maintained by this class:
 * - Items enqueued while a push is in flight are never dropped.
 * - The status never stays at `'syncing'` once a push or pull has finished.
 * - The pull cursor (`lastSync`) only advances after a fully successful pull.
 */
export class SyncEngineImpl implements SyncEngine {
  /** Caller configuration with the three retry settings guaranteed to be set. */
  private config: Required<
    Pick<SyncEngineConfig, 'maxRetries' | 'retryBaseDelayMs' | 'retryMaxDelayMs'>
  > &
    SyncEngineConfig;
  private status: SyncStatus = 'idle';
  /** Cached length of the persisted queue; refreshed on every queue read/write. */
  private queueLength = 0;
  private lastSyncAt?: string;
  private lastError?: string;
  private statusListeners: Set<SyncStatusCallback> = new Set();
  private onlineHandler: (() => void) | null = null;
  private offlineHandler: (() => void) | null = null;
  private destroyed = false;

  /**
   * @param config Engine configuration. Missing (or explicitly `undefined`)
   *   retry settings fall back to the module defaults.
   */
  constructor(config: SyncEngineConfig) {
    this.config = {
      ...config,
      // Applied after the spread so an explicit `undefined` cannot erase a default.
      maxRetries: config.maxRetries ?? DEFAULT_MAX_RETRIES,
      retryBaseDelayMs: config.retryBaseDelayMs ?? DEFAULT_RETRY_BASE_DELAY_MS,
      retryMaxDelayMs: config.retryMaxDelayMs ?? DEFAULT_RETRY_MAX_DELAY_MS,
    };
    this.setupConnectivityDetection();
  }

  // ───────────────────────────────────────────────────────────────────────────
  // Core Operations
  // ───────────────────────────────────────────────────────────────────────────

  /**
   * Enqueue a create/update for later delivery.
   *
   * A queued item with the same dedup key and the same operation is replaced
   * in place, which also resets its retry count.
   *
   * @param entity Name of a configured entity.
   * @param data Payload to send; an `id` field, when present, identifies the record.
   * @param operation Kind of mutation; defaults to `'create'`.
   */
  async push(
    entity: EntityName,
    data: unknown,
    operation: SyncOperation = 'create'
  ): Promise<void> {
    const item: SyncItem = {
      id: this.generateId(),
      entity,
      operation,
      data,
      timestamp: new Date().toISOString(),
      retryCount: 0,
    };
    const queue = await this.getQueue();
    const dedupKey = this.getDedupKey(entity, data);
    const existingIndex = queue.findIndex(
      queued =>
        queued.operation === operation &&
        this.getDedupKey(queued.entity, queued.data) === dedupKey
    );
    if (existingIndex >= 0) {
      queue[existingIndex] = item;
    } else {
      queue.push(item);
    }
    await this.saveQueue(queue);
    this.notifyStatus();
  }

  /**
   * Enqueue a delete and discard any queued operation for the same entity + id.
   *
   * @param entity Name of a configured entity.
   * @param id Identifier of the record to delete.
   */
  async delete(entity: EntityName, id: string): Promise<void> {
    const queue = await this.getQueue();
    const dedupKey = `${entity}:${id}`;
    // Pending creates/updates for this record are pointless once it is deleted.
    const kept = queue.filter(
      queued => this.getDedupKey(queued.entity, queued.data) !== dedupKey
    );
    kept.push({
      id: this.generateId(),
      entity,
      operation: 'delete',
      data: { id },
      timestamp: new Date().toISOString(),
      retryCount: 0,
    });
    await this.saveQueue(kept);
    this.notifyStatus();
  }

  /**
   * Pull remote changes for every configured entity.
   *
   * Entities are pulled independently; one failure does not stop the others.
   * The persisted cursor only advances when every entity succeeded, so a
   * failed entity is requested again from the old cursor on the next pull.
   *
   * @returns Counts of pulled items and errors; `success` is `false` when any
   *   entity failed.
   */
  async pull(): Promise<SyncResult> {
    const result = this.emptyResult();
    this.setStatus('syncing');
    try {
      // Captured before the requests go out: changes made on the server while
      // the pull is running fall after this cursor and are picked up next time.
      const cursor = new Date().toISOString();
      let firstFailure: string | undefined;
      for (const [entityName, entityConfig] of Object.entries(this.config.entities)) {
        try {
          result.pulled += await this.pullEntity(entityName, entityConfig.endpoint);
        } catch (error) {
          result.errors++;
          firstFailure = firstFailure ?? this.describeError(error);
          this.trackTelemetry('sync_pull_error', entityName, error);
        }
      }
      result.timestamp = new Date().toISOString();
      if (result.errors === 0) {
        await this.setLastSyncTime(cursor);
        this.lastSyncAt = result.timestamp;
        this.setStatus('idle');
      } else {
        result.success = false;
        this.setStatus('error', firstFailure);
      }
    } catch (error) {
      result.success = false;
      this.setStatus('error', error instanceof Error ? error.message : 'Unknown error');
    }
    return result;
  }

  /**
   * Push the local queue, then pull remote changes.
   *
   * @returns Combined counts. When offline, nothing is attempted and
   *   `success` is `false`.
   */
  async fullSync(): Promise<SyncResult> {
    if (!this.isOnline()) {
      this.setStatus('offline');
      return { ...this.emptyResult(), success: false };
    }
    let pushResult: SyncResult;
    try {
      pushResult = await this.pushQueue();
    } catch (error) {
      // Do not leave the engine stuck in 'syncing' when the queue cannot be read/written.
      this.setStatus('error', this.describeError(error));
      throw error;
    }
    const pullResult = await this.pull();
    if (!pushResult.success && pullResult.success) {
      // pull() ended on 'idle'; the push failure must still be visible to listeners.
      this.setStatus('error', this.pushFailureMessage(pushResult));
    }
    const combined: SyncResult = {
      success: pushResult.success && pullResult.success,
      pushed: pushResult.pushed,
      pulled: pullResult.pulled,
      conflicts: pushResult.conflicts + pullResult.conflicts,
      errors: pushResult.errors + pullResult.errors,
      timestamp: new Date().toISOString(),
    };
    this.trackTelemetry('sync_complete', '*', undefined, {
      pushed: combined.pushed,
      pulled: combined.pulled,
      conflicts: combined.conflicts,
      errors: combined.errors,
    });
    return combined;
  }

  // ───────────────────────────────────────────────────────────────────────────
  // Queue Management
  // ───────────────────────────────────────────────────────────────────────────

  /** Read the persisted queue (empty when nothing is stored) and refresh the cached length. */
  private async getQueue(): Promise<SyncItem[]> {
    const stored = await this.config.storage.getItem<SyncItem[]>(QUEUE_KEY);
    const queue = stored || [];
    this.queueLength = queue.length;
    return queue;
  }

  /** Persist the queue and refresh the cached length. */
  private async saveQueue(queue: SyncItem[]): Promise<void> {
    await this.config.storage.setItem(QUEUE_KEY, queue);
    this.queueLength = queue.length;
  }

  /**
   * Attempt to deliver every queued item once (each with in-call retries).
   *
   * Items that fail transiently stay queued with an incremented retry count
   * until `maxRetries` is reached; after that they are dropped and counted as
   * errors. Conflicts are resolved immediately via `handleConflict`.
   *
   * @returns Counts for this pass; `success` is `false` when any item errored.
   */
  private async pushQueue(): Promise<SyncResult> {
    // Copy the array: a storage adapter may hand back a live reference that a
    // concurrent push() mutates while this loop is awaiting the network.
    const snapshot = [...(await this.getQueue())];
    const result = this.emptyResult();
    if (snapshot.length === 0) return result;
    this.setStatus('syncing');

    const retained = new Map<string, SyncItem>();
    for (const item of snapshot) {
      try {
        await this.pushItemWithRetry(item);
        result.pushed++;
      } catch (error) {
        if (error instanceof SyncConflictError) {
          result.conflicts++;
          if (await this.handleConflict(item, error.remoteData)) {
            result.pushed++;
          } else {
            result.errors++;
          }
        } else if (item.retryCount < this.config.maxRetries) {
          retained.set(item.id, {
            ...item,
            retryCount: item.retryCount + 1,
            lastError: this.describeError(error),
          });
        } else {
          result.errors++;
          this.trackTelemetry('sync_push_dropped', item.entity, error);
        }
      }
    }

    // Merge with whatever the queue looks like now instead of overwriting it:
    // - items enqueued while we were pushing are kept untouched;
    // - items from our snapshot survive only if they need a retry AND nobody
    //   removed or replaced them in the meantime (delete(), dedup, clearQueue()).
    const processedIds = new Set(snapshot.map(item => item.id));
    const current = await this.getQueue();
    const merged: SyncItem[] = [];
    for (const queued of current) {
      if (!processedIds.has(queued.id)) {
        merged.push(queued);
        continue;
      }
      const retry = retained.get(queued.id);
      if (retry) merged.push(retry);
    }
    await this.saveQueue(merged);

    if (result.errors > 0) {
      result.success = false;
    }
    return result;
  }

  /**
   * Send one queued item, retrying transient failures with exponential backoff.
   *
   * @throws SyncConflictError when the server reports a conflict (never retried).
   * @throws The original error for non-retriable failures, an unknown entity,
   *   or once the in-call attempts are exhausted.
   */
  private async pushItemWithRetry(item: SyncItem): Promise<void> {
    const entityConfig = this.config.entities[item.entity];
    if (!entityConfig) {
      throw new Error(`Unknown entity: ${item.entity}`);
    }
    const entityId = this.getEntityId(item.data);
    const targetsExisting = item.operation === 'delete' || item.operation === 'update';
    const path =
      targetsExisting && entityId !== undefined
        ? `${entityConfig.endpoint}/${entityId}`
        : entityConfig.endpoint;
    const method =
      item.operation === 'delete' ? 'DELETE' : item.operation === 'update' ? 'PATCH' : 'POST';
    const hasBody = method !== 'DELETE';
    const headers: Record<string, string> = {};
    if (hasBody) {
      headers['Content-Type'] = 'application/json';
    }
    const body = hasBody ? JSON.stringify(item.data) : undefined;

    let lastError: unknown;
    // Attempts already spent on earlier flushes shrink this call's budget.
    const maxAttempts = Math.max(1, this.config.maxRetries - item.retryCount);
    for (let attempt = 0; attempt < maxAttempts; attempt++) {
      try {
        await this.config.apiClient.fetch<unknown>(path, { method, headers, body });
        this.trackTelemetry('sync_push_success', item.entity);
        return;
      } catch (error) {
        lastError = error;
        // Conflicts are never retried; the caller resolves them.
        if (error instanceof SyncConflictError) {
          throw error;
        }
        if (this.isConflictError(error)) {
          throw new SyncConflictError(this.extractRemoteData(error));
        }
        if (this.isNonRetriable(error)) {
          throw error;
        }
        // Transient failure: back off unless this was the final attempt.
        if (attempt < maxAttempts - 1) {
          await sleep(
            computeBackoff(attempt, this.config.retryBaseDelayMs, this.config.retryMaxDelayMs)
          );
        }
      }
    }
    throw lastError;
  }

  /**
   * Pull one entity's changes since the stored cursor and pass them to `onPull`.
   *
   * @returns Number of items received.
   * @throws When the API client reports an error, so `pull()` can count the
   *   failure and hold the cursor back.
   */
  private async pullEntity(entityName: string, endpoint: string): Promise<number> {
    const lastSync = await this.getLastSyncTime();
    // Respect a query string already present on the endpoint.
    const separator = endpoint.includes('?') ? '&' : '?';
    const path = lastSync
      ? `${endpoint}${separator}since=${encodeURIComponent(lastSync)}`
      : endpoint;
    const response = await this.config.apiClient.safeFetch<{ items: unknown[] }>(path);
    if (response.error) {
      throw response.error instanceof Error
        ? response.error
        : new Error(`Pull failed for ${entityName}: ${this.describeError(response.error)}`);
    }
    if (!response.data) {
      return 0;
    }
    const items = response.data.items ?? [];
    if (items.length > 0 && this.config.onPull) {
      await this.config.onPull(entityName, items);
    }
    return items.length;
  }

  // ───────────────────────────────────────────────────────────────────────────
  // Conflict Resolution
  // ───────────────────────────────────────────────────────────────────────────

  /**
   * Resolve a conflict for `item` using the entity's configured strategy.
   *
   * When the remote side wins, the remote copy (if one is available) is handed
   * to `onPull`. When the local side wins, the change is re-sent: `DELETE` for
   * a queued delete, `PUT` with the winning payload otherwise.
   *
   * @returns `true` when the conflict was resolved, `false` when resolution or
   *   the re-send failed (the caller counts that as an error).
   */
  private async handleConflict(item: SyncItem, remoteData: unknown): Promise<boolean> {
    const entityConfig = this.config.entities[item.entity];
    if (!entityConfig) return false;
    const strategy = entityConfig.conflictStrategy;
    this.trackTelemetry('sync_conflict', item.entity, undefined, { strategy });
    try {
      const winner = await this.resolveConflict(item, remoteData, strategy);
      if (winner === remoteData) {
        // Server wins. Only forward a real payload; a conflict raised without
        // the server copy has nothing to merge.
        if (remoteData !== undefined && this.config.onPull) {
          await this.config.onPull(item.entity, [remoteData]);
        }
        return true;
      }

      const winnerId = this.getEntityId(winner);
      const path =
        winnerId !== undefined ? `${entityConfig.endpoint}/${winnerId}` : entityConfig.endpoint;
      if (item.operation === 'delete' && winner === item.data) {
        // A winning local delete must delete; a PUT of `{ id }` would overwrite
        // the server record with an empty shell instead.
        await this.config.apiClient.fetch(path, {
          method: 'DELETE',
          headers: {},
          body: undefined,
        });
        return true;
      }
      await this.config.apiClient.fetch(path, {
        method: 'PUT',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(winner),
      });
      return true;
    } catch (error) {
      // Reported as unresolved to the caller; keep the cause visible in telemetry.
      this.trackTelemetry('sync_conflict_error', item.entity, error);
      return false;
    }
  }

  /**
   * Pick the winning payload for a conflict.
   *
   * @returns `remoteData`, `item.data`, or whatever `config.onConflict`
   *   returns for the `'manual'` strategy.
   */
  private async resolveConflict(
    item: SyncItem,
    remoteData: unknown,
    strategy: ConflictStrategy
  ): Promise<unknown> {
    switch (strategy) {
      case 'server-wins':
        return remoteData;
      case 'client-wins':
        return item.data;
      case 'last-write-wins': {
        const localTime = new Date(item.timestamp).getTime();
        // A remote copy without `updatedAt` is treated as infinitely old.
        const remoteTime = new Date(
          (remoteData as { updatedAt?: string })?.updatedAt ?? '1970-01-01'
        ).getTime();
        return localTime > remoteTime ? item.data : remoteData;
      }
      case 'manual': {
        if (this.config.onConflict) {
          const conflict: Conflict = {
            entity: item.entity,
            localItem: item,
            remoteData,
          };
          return await this.config.onConflict(conflict);
        }
        // No handler — fall back to server-wins
        return remoteData;
      }
      default:
        return remoteData;
    }
  }

  // ───────────────────────────────────────────────────────────────────────────
  // Connectivity Detection
  // ───────────────────────────────────────────────────────────────────────────

  /** Register `online`/`offline` listeners when a `window` with events exists. */
  private setupConnectivityDetection(): void {
    if (typeof globalThis === 'undefined') return;
    const win = typeof window !== 'undefined' ? window : undefined;
    if (!win?.addEventListener) return;
    this.onlineHandler = () => {
      // Only leave the 'offline' state here; a sync that is already running
      // keeps its 'syncing' status so flush() does not start a second pass.
      if (this.status === 'offline') {
        this.setStatus('idle');
      }
      this.flush().catch(() => {
        // flush() has already recorded the failure in the status.
      });
    };
    this.offlineHandler = () => {
      this.setStatus('offline');
    };
    win.addEventListener('online', this.onlineHandler);
    win.addEventListener('offline', this.offlineHandler);
  }

  /** `navigator.onLine` when available; otherwise assumed online (Node.js, SSR). */
  private isOnline(): boolean {
    if (typeof navigator !== 'undefined' && typeof navigator.onLine === 'boolean') {
      return navigator.onLine;
    }
    return true;
  }

  /**
   * Push the queue once, unless the engine is destroyed, already syncing, or offline.
   *
   * The status ends on `'idle'` after a clean pass and on `'error'` otherwise.
   *
   * @throws When the queue cannot be read or written; the status is set to
   *   `'error'` before the error propagates.
   */
  async flush(): Promise<void> {
    if (this.destroyed || this.status === 'syncing') return;
    if (!this.isOnline()) {
      // Attempting now would only burn the items' retry budget.
      this.setStatus('offline');
      return;
    }
    try {
      const result = await this.pushQueue();
      if (result.success && result.errors === 0) {
        this.setStatus('idle');
      } else {
        this.setStatus('error', this.pushFailureMessage(result));
      }
    } catch (error) {
      this.setStatus('error', this.describeError(error));
      throw error;
    }
  }

  /** Stop reacting to connectivity events and drop all status listeners. */
  destroy(): void {
    this.destroyed = true;
    const win = typeof window !== 'undefined' ? window : undefined;
    if (win) {
      if (this.onlineHandler) win.removeEventListener('online', this.onlineHandler);
      if (this.offlineHandler) win.removeEventListener('offline', this.offlineHandler);
    }
    this.onlineHandler = null;
    this.offlineHandler = null;
    this.statusListeners.clear();
  }

  // ───────────────────────────────────────────────────────────────────────────
  // Status & Monitoring
  // ───────────────────────────────────────────────────────────────────────────

  /** Cached number of queued items, as of the last queue read or write. */
  getQueueLength(): number {
    return this.queueLength;
  }

  /** Snapshot of the current status, queue length, last sync time and last error. */
  getStatus(): SyncStatusInfo {
    return {
      status: this.status,
      queueLength: this.queueLength,
      lastSyncAt: this.lastSyncAt,
      lastError: this.lastError,
    };
  }

  /**
   * Subscribe to status changes.
   *
   * @returns Function that removes the subscription.
   */
  onStatusChange(callback: SyncStatusCallback): () => void {
    this.statusListeners.add(callback);
    return () => this.statusListeners.delete(callback);
  }

  /** Update the status (and the last error, when one is given) and notify listeners. */
  private setStatus(status: SyncStatus, error?: string): void {
    this.status = status;
    if (error) this.lastError = error;
    this.notifyStatus();
  }

  /** Send the current status snapshot to every listener. */
  private notifyStatus(): void {
    const info = this.getStatus();
    this.statusListeners.forEach(listener => listener(info));
  }

  // ───────────────────────────────────────────────────────────────────────────
  // Utility
  // ───────────────────────────────────────────────────────────────────────────

  /** Discard every queued item. */
  async clearQueue(): Promise<void> {
    await this.saveQueue([]);
    this.notifyStatus();
  }

  /** Reset retry bookkeeping on every queued item, then flush. */
  async reprocessFailed(): Promise<void> {
    const queue = await this.getQueue();
    const reset = queue.map(item => ({
      ...item,
      retryCount: 0,
      lastError: undefined,
    }));
    await this.saveQueue(reset);
    await this.flush();
  }

  private async getLastSyncTime(): Promise<string | undefined> {
    return (await this.config.storage.getItem<string>(LAST_SYNC_KEY)) || undefined;
  }

  private async setLastSyncTime(timestamp: string): Promise<void> {
    await this.config.storage.setItem(LAST_SYNC_KEY, timestamp);
  }

  /** Queue-item id: millisecond timestamp plus a random base-36 suffix. */
  private generateId(): string {
    return `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
  }

  /**
   * Identifier carried by a payload's `id` field, as a string.
   *
   * Non-empty strings and finite numbers (including `0`) count as ids;
   * anything else yields `undefined`.
   */
  private getEntityId(data: unknown): string | undefined {
    if (typeof data !== 'object' || data === null) return undefined;
    const id = (data as { id?: unknown }).id;
    if (typeof id === 'string') return id === '' ? undefined : id;
    if (typeof id === 'number' && Number.isFinite(id)) return String(id);
    return undefined;
  }

  /** Key used to collapse queue items: `entity:id`, or `entity:<json>` without an id. */
  private getDedupKey(entity: string, data: unknown): string {
    const id = this.getEntityId(data);
    return id !== undefined ? `${entity}:${id}` : `${entity}:${JSON.stringify(data)}`;
  }

  private emptyResult(): SyncResult {
    return {
      success: true,
      pushed: 0,
      pulled: 0,
      conflicts: 0,
      errors: 0,
      timestamp: new Date().toISOString(),
    };
  }

  /** Status text recorded when a push pass ends with errors. */
  private pushFailureMessage(result: SyncResult): string {
    return `${result.errors} queued item(s) failed to sync`;
  }

  // ───────────────────────────────────────────────────────────────────────────
  // Error Classification
  // ───────────────────────────────────────────────────────────────────────────

  /** Human-readable text for any thrown value; never `[object Object]`. */
  private describeError(error: unknown): string {
    if (error instanceof Error) return error.message;
    if (typeof error === 'string') return error;
    if (typeof error === 'object' && error !== null) {
      const message = (error as { message?: unknown }).message;
      if (typeof message === 'string') return message;
      try {
        return JSON.stringify(error) ?? String(error);
      } catch {
        return String(error);
      }
    }
    return String(error);
  }

  /**
   * Numeric `status` / `statusCode` carried by an error object, if any.
   * NOTE(review): assumes such a property, when numeric, is the HTTP status —
   * confirm against the apiClient's error shape.
   */
  private httpStatusOf(error: unknown): number | undefined {
    if (typeof error !== 'object' || error === null) return undefined;
    const candidate = error as { status?: unknown; statusCode?: unknown };
    if (typeof candidate.status === 'number') return candidate.status;
    if (typeof candidate.statusCode === 'number') return candidate.statusCode;
    return undefined;
  }

  /** Whether an error represents an HTTP 409 / conflict response. */
  private isConflictError(error: unknown): boolean {
    if (error instanceof SyncConflictError) return true;
    const status = this.httpStatusOf(error);
    if (status !== undefined) return status === 409;
    const msg = this.describeError(error);
    // Word boundary so unrelated numbers such as "4090" do not match.
    return /\b409\b/.test(msg) || /conflict/i.test(msg);
  }

  /** Whether retrying the same request cannot succeed (4xx other than 408, 409, 429). */
  private isNonRetriable(error: unknown): boolean {
    const status = this.httpStatusOf(error);
    if (status !== undefined) {
      return status >= 400 && status < 500 && status !== 408 && status !== 409 && status !== 429;
    }
    return /\b(400|401|403|404|405|406|410|422)\b/.test(this.describeError(error));
  }

  /**
   * Server copy attached to a conflict error, when there is one.
   * Only `SyncConflictError` carries it; for any other error shape the result
   * is `undefined`.
   */
  private extractRemoteData(error: unknown): unknown {
    if (error instanceof SyncConflictError) return error.remoteData;
    return undefined;
  }

  // ───────────────────────────────────────────────────────────────────────────
  // Telemetry
  // ───────────────────────────────────────────────────────────────────────────

  /**
   * Emit a telemetry event when a telemetry client is configured.
   *
   * Numeric entries of `extra` are sent as metrics; string and boolean entries
   * are sent as tags. Telemetry failures are deliberately ignored.
   */
  private trackTelemetry(
    eventName: string,
    entity: string,
    error?: unknown,
    extra?: Record<string, unknown>
  ): void {
    if (!this.config.telemetryClient) return;
    try {
      const metrics: Record<string, number> = {};
      const extraTags: Record<string, string> = {};
      for (const [key, value] of Object.entries(extra ?? {})) {
        if (typeof value === 'number') {
          metrics[key] = value;
        } else if (typeof value === 'string' || typeof value === 'boolean') {
          extraTags[key] = String(value);
        }
      }
      this.config.telemetryClient.trackEvent('sync', 'sync-engine', eventName, {
        tags: {
          productId: this.config.productId,
          entity,
          ...extraTags,
          ...(error ? { error: this.describeError(error) } : {}),
        },
        metrics: Object.keys(metrics).length > 0 ? metrics : undefined,
      });
    } catch {
      // Telemetry should never break sync
    }
  }
}
// ─────────────────────────────────────────────────────────────────────────────
// Factory Function
// ─────────────────────────────────────────────────────────────────────────────
/**
 * Build a sync engine from the given configuration.
 *
 * @param config Engine configuration, passed unchanged to `SyncEngineImpl`.
 * @returns The new engine, typed as the `SyncEngine` interface.
 */
export function createSyncEngine(config: SyncEngineConfig): SyncEngine {
  const engine: SyncEngine = new SyncEngineImpl(config);
  return engine;
}