learning_ai_common_plat/packages/sync/src/sync.test.ts
2026-03-19 21:25:30 -07:00

609 lines
23 KiB
TypeScript

/**
* Sync Engine Tests — 25+ tests
*
* Covers: queue persistence, retry with backoff, conflict resolution (all 4
* strategies), deduplication, connectivity, onPull callback, telemetry,
* delete consolidation, multiple entities, status monitoring, destroy.
*
* @module @bytelyst/sync/sync.test
*/
import { describe, it, expect, beforeEach, vi } from 'vitest';
import { createSyncEngine, InMemoryAdapter, computeBackoff, SyncConflictError } from './index.js';
import type { SyncStatusInfo, SyncEngineConfig, EntityConfig } from './types.js';
import type { ApiClient, ApiResult } from '@bytelyst/api-client';
import type { TelemetryClient } from '@bytelyst/telemetry-client';
// ─────────────────────────────────────────────────────────────────────────────
// Helpers
// ─────────────────────────────────────────────────────────────────────────────
interface MockApiClient extends ApiClient {
getRequests(): { path: string; options?: RequestInit }[];
setFetchBehavior(fn: (path: string, options?: RequestInit) => unknown): void;
setSafeFetchBehavior(fn: (path: string) => unknown): void;
}
function createMockApiClient(): MockApiClient {
const requests: { path: string; options?: RequestInit }[] = [];
let fetchBehavior: ((path: string, options?: RequestInit) => unknown) | null = null;
let safeFetchBehavior: ((path: string) => unknown) | null = null;
return {
fetch: async <T>(path: string, options?: RequestInit): Promise<T> => {
requests.push({ path, options });
if (fetchBehavior) return fetchBehavior(path, options) as T;
return {} as T;
},
safeFetch: async <T>(path: string, options?: RequestInit): Promise<ApiResult<T>> => {
requests.push({ path, options });
if (safeFetchBehavior) return safeFetchBehavior(path) as ApiResult<T>;
return { data: { items: [] } as unknown as T, error: null };
},
getRequests: () => requests,
setFetchBehavior: fn => {
fetchBehavior = fn;
},
setSafeFetchBehavior: fn => {
safeFetchBehavior = fn;
},
};
}
function createMockTelemetry(): TelemetryClient & { events: { eventName: string }[] } {
const events: { eventName: string }[] = [];
return {
init: vi.fn(),
trackEvent: (eventType: string, module: string, eventName: string) => {
events.push({ eventName });
},
flush: vi.fn(),
shutdown: vi.fn(),
getInstallId: () => 'test-install',
getSessionId: () => 'test-session',
events,
};
}
const TASKS_ENTITY: EntityConfig = {
endpoint: '/tasks',
partitionKey: 'userId',
conflictStrategy: 'server-wins',
};
function makeConfig(
storage: InMemoryAdapter,
apiClient: MockApiClient,
overrides?: Partial<SyncEngineConfig>
): SyncEngineConfig {
return {
productId: 'test',
entities: { tasks: TASKS_ENTITY },
storage,
apiClient,
maxRetries: 3,
retryBaseDelayMs: 1, // fast for tests
retryMaxDelayMs: 10,
...overrides,
};
}
// ─────────────────────────────────────────────────────────────────────────────
// Storage Adapter Tests
// ─────────────────────────────────────────────────────────────────────────────
describe('InMemoryAdapter', () => {
it('stores and retrieves items', () => {
const s = new InMemoryAdapter();
s.setItem('k', { x: 1 });
expect(s.getItem<{ x: number }>('k')).toEqual({ x: 1 });
});
it('returns null for missing keys', () => {
expect(new InMemoryAdapter().getItem('nope')).toBeNull();
});
it('lists all keys', () => {
const s = new InMemoryAdapter();
s.setItem('a', 1);
s.setItem('b', 2);
expect(s.keys()).toEqual(expect.arrayContaining(['a', 'b']));
});
it('removes items', () => {
const s = new InMemoryAdapter();
s.setItem('a', 1);
s.removeItem('a');
expect(s.getItem('a')).toBeNull();
});
it('clears all items', () => {
const s = new InMemoryAdapter();
s.setItem('a', 1);
s.setItem('b', 2);
s.clear();
expect(s.keys()).toHaveLength(0);
});
});
// ─────────────────────────────────────────────────────────────────────────────
// computeBackoff
// ─────────────────────────────────────────────────────────────────────────────
describe('computeBackoff', () => {
it('returns increasing delays', () => {
const d0 = computeBackoff(0, 1000, 30000);
const d1 = computeBackoff(1, 1000, 30000);
const d2 = computeBackoff(2, 1000, 30000);
// d0 ~ 1000, d1 ~ 2000, d2 ~ 4000 (+ jitter)
expect(d0).toBeLessThan(d1);
expect(d1).toBeLessThan(d2);
});
it('caps at maxMs', () => {
const d = computeBackoff(20, 1000, 5000);
expect(d).toBeLessThanOrEqual(5500); // 5000 + 10% jitter max
});
});
// ─────────────────────────────────────────────────────────────────────────────
// Sync Engine — Core Operations
// ─────────────────────────────────────────────────────────────────────────────
describe('Sync Engine', () => {
let storage: InMemoryAdapter;
let apiClient: MockApiClient;
beforeEach(() => {
storage = new InMemoryAdapter();
apiClient = createMockApiClient();
});
// ─── Creation ──────────────────────────────────────────────────────────
it('creates engine with all interface methods', () => {
const engine = createSyncEngine(makeConfig(storage, apiClient));
expect(engine.push).toBeTypeOf('function');
expect(engine.delete).toBeTypeOf('function');
expect(engine.pull).toBeTypeOf('function');
expect(engine.fullSync).toBeTypeOf('function');
expect(engine.getQueueLength).toBeTypeOf('function');
expect(engine.getStatus).toBeTypeOf('function');
expect(engine.onStatusChange).toBeTypeOf('function');
expect(engine.clearQueue).toBeTypeOf('function');
expect(engine.reprocessFailed).toBeTypeOf('function');
expect(engine.flush).toBeTypeOf('function');
expect(engine.destroy).toBeTypeOf('function');
});
// ─── Queue Persistence ─────────────────────────────────────────────────
it('persists queue across engine instances (simulated restart)', async () => {
const engine1 = createSyncEngine(makeConfig(storage, apiClient));
await engine1.push('tasks', { id: 't1', title: 'persist me' });
engine1.destroy();
// "Restart" — new engine, same storage
const engine2 = createSyncEngine(makeConfig(storage, apiClient));
const result = await engine2.fullSync();
expect(result.pushed).toBe(1);
const reqs = apiClient.getRequests();
const postReq = reqs.find(r => r.options?.method === 'POST');
expect(postReq).toBeDefined();
expect(postReq!.path).toBe('/tasks');
});
// ─── Push & Deduplication ──────────────────────────────────────────────
it('deduplicates updates to same entity+id', async () => {
const engine = createSyncEngine(makeConfig(storage, apiClient));
await engine.push('tasks', { id: '1', title: 'v1' }, 'update');
await engine.push('tasks', { id: '1', title: 'v2' }, 'update');
expect(engine.getQueueLength()).toBe(1);
const result = await engine.fullSync();
expect(result.pushed).toBe(1);
// The last value should be sent
const patchReq = apiClient.getRequests().find(r => r.options?.method === 'PATCH');
expect(patchReq).toBeDefined();
expect(patchReq!.path).toBe('/tasks/1');
const body = JSON.parse(patchReq!.options!.body as string);
expect(body.title).toBe('v2');
});
it('does not deduplicate different operations on same id', async () => {
const engine = createSyncEngine(makeConfig(storage, apiClient));
await engine.push('tasks', { id: '1', title: 'create' }, 'create');
await engine.push('tasks', { id: '1', title: 'update' }, 'update');
expect(engine.getQueueLength()).toBe(2);
});
it('does not deduplicate items without id', async () => {
const engine = createSyncEngine(makeConfig(storage, apiClient));
await engine.push('tasks', { title: 'Task A' });
await engine.push('tasks', { title: 'Task B' });
expect(engine.getQueueLength()).toBe(2);
});
// ─── Delete ────────────────────────────────────────────────────────────
it('delete removes pending create/update for same entity+id', async () => {
const engine = createSyncEngine(makeConfig(storage, apiClient));
await engine.push('tasks', { id: 'x', title: 'created' });
await engine.push('tasks', { id: 'x', title: 'updated' }, 'update');
// Now delete should collapse the above
await engine.delete('tasks', 'x');
expect(engine.getQueueLength()).toBe(1);
const result = await engine.fullSync();
expect(result.pushed).toBe(1);
const delReq = apiClient.getRequests().find(r => r.options?.method === 'DELETE');
expect(delReq).toBeDefined();
expect(delReq!.path).toBe('/tasks/x');
});
// ─── Pull + onPull Callback ────────────────────────────────────────────
it('invokes onPull with pulled items', async () => {
const pulled: { entity: string; items: unknown[] }[] = [];
apiClient.setSafeFetchBehavior(() => ({
data: { items: [{ id: 'r1', title: 'Remote Task' }] },
error: null,
}));
const engine = createSyncEngine(
makeConfig(storage, apiClient, {
onPull: (entity, items) => {
pulled.push({ entity, items });
},
})
);
const result = await engine.pull();
expect(result.pulled).toBe(1);
expect(pulled).toHaveLength(1);
expect(pulled[0].entity).toBe('tasks');
expect(pulled[0].items).toHaveLength(1);
});
it('pull appends ?since= parameter after first sync', async () => {
const engine = createSyncEngine(makeConfig(storage, apiClient));
await engine.pull(); // first pull — no since
const firstReq = apiClient.getRequests().find(r => r.path.startsWith('/tasks'));
expect(firstReq!.path).toBe('/tasks');
await engine.pull(); // second pull — should have since=
const allReqs = apiClient.getRequests().filter(r => r.path.startsWith('/tasks'));
const secondReq = allReqs[allReqs.length - 1];
expect(secondReq.path).toContain('?since=');
});
// ─── fullSync ──────────────────────────────────────────────────────────
it('fullSync pushes then pulls', async () => {
const engine = createSyncEngine(makeConfig(storage, apiClient));
await engine.push('tasks', { id: 't1', title: 'local' });
const result = await engine.fullSync();
expect(result.pushed).toBe(1);
expect(engine.getQueueLength()).toBe(0);
expect(engine.getStatus().lastSyncAt).toBeTruthy();
});
// ─── Retry with Backoff ────────────────────────────────────────────────
it('retries on transient errors and keeps item in queue', async () => {
let callCount = 0;
apiClient.setFetchBehavior(() => {
callCount++;
throw new Error('500 Internal Server Error');
});
const engine = createSyncEngine(makeConfig(storage, apiClient, { maxRetries: 3 }));
await engine.push('tasks', { id: 'fail', title: 'will fail' });
await engine.flush();
// Item should still be in queue with incremented retryCount
expect(engine.getQueueLength()).toBe(1);
// Multiple fetch attempts were made (backoff retries within pushItemWithRetry)
expect(callCount).toBeGreaterThan(1);
});
it('drops item after exceeding maxRetries', async () => {
apiClient.setFetchBehavior(() => {
throw new Error('500');
});
const engine = createSyncEngine(makeConfig(storage, apiClient, { maxRetries: 1 }));
await engine.push('tasks', { id: 'drop', title: 'drop me' });
// First flush: pushItemWithRetry exhausts attempts, pushQueue increments retryCount
await engine.flush();
// Second flush: retryCount >= maxRetries → dropped
await engine.flush();
expect(engine.getQueueLength()).toBe(0);
});
// ─── Conflict Resolution ───────────────────────────────────────────────
it('server-wins: accepts remote data on conflict', async () => {
const pulled: unknown[][] = [];
apiClient.setFetchBehavior(() => {
throw new SyncConflictError({ id: 'c1', title: 'Server Version' });
});
const engine = createSyncEngine(
makeConfig(storage, apiClient, {
entities: {
tasks: { endpoint: '/tasks', partitionKey: 'userId', conflictStrategy: 'server-wins' },
},
onPull: (_entity, items) => {
pulled.push(items);
},
})
);
await engine.push('tasks', { id: 'c1', title: 'Client Version' });
const result = await engine.fullSync();
expect(result.conflicts).toBe(1);
// server-wins: onPull should have been called with remote data
expect(pulled.length).toBeGreaterThanOrEqual(1);
});
it('client-wins: re-pushes local data on conflict', async () => {
let callIdx = 0;
apiClient.setFetchBehavior(() => {
callIdx++;
if (callIdx === 1) {
throw new SyncConflictError({ id: 'c2', title: 'Server' });
}
return {}; // Second call (PUT) succeeds
});
const engine = createSyncEngine(
makeConfig(storage, apiClient, {
entities: {
tasks: { endpoint: '/tasks', partitionKey: 'userId', conflictStrategy: 'client-wins' },
},
})
);
await engine.push('tasks', { id: 'c2', title: 'Client' });
const result = await engine.fullSync();
expect(result.conflicts).toBe(1);
expect(result.pushed).toBe(1); // conflict resolved → counted as pushed
// Should have made a PUT request with client data
const putReq = apiClient.getRequests().find(r => r.options?.method === 'PUT');
expect(putReq).toBeDefined();
});
it('last-write-wins: picks newer timestamp', async () => {
const pulled: unknown[][] = [];
const oldDate = '2020-01-01T00:00:00.000Z';
apiClient.setFetchBehavior(() => {
throw new SyncConflictError({ id: 'c3', title: 'Server', updatedAt: oldDate });
});
const engine = createSyncEngine(
makeConfig(storage, apiClient, {
entities: {
tasks: {
endpoint: '/tasks',
partitionKey: 'userId',
conflictStrategy: 'last-write-wins',
},
},
onPull: (_entity, items) => {
pulled.push(items);
},
})
);
// Client push will have a newer timestamp than 2020
await engine.push('tasks', { id: 'c3', title: 'Client Newer' });
const result = await engine.fullSync();
expect(result.conflicts).toBe(1);
// Client is newer → should NOT have called onPull with server data
// Instead it should have re-pushed (PUT)
const putReq = apiClient.getRequests().find(r => r.options?.method === 'PUT');
expect(putReq).toBeDefined();
});
it('manual: calls onConflict handler', async () => {
apiClient.setFetchBehavior((_path, options) => {
const method = options?.method;
if (method === 'POST') {
throw new SyncConflictError({ id: 'c4', title: 'Server' });
}
return {};
});
const onConflict = vi.fn().mockResolvedValue({ id: 'c4', title: 'Merged' });
const engine = createSyncEngine(
makeConfig(storage, apiClient, {
entities: {
tasks: { endpoint: '/tasks', partitionKey: 'userId', conflictStrategy: 'manual' },
},
onConflict,
})
);
await engine.push('tasks', { id: 'c4', title: 'Client' });
const result = await engine.fullSync();
expect(result.conflicts).toBe(1);
expect(onConflict).toHaveBeenCalledTimes(1);
expect(onConflict.mock.calls[0][0]).toMatchObject({
entity: 'tasks',
remoteData: { id: 'c4', title: 'Server' },
});
});
// ─── Multiple Entities ─────────────────────────────────────────────────
it('handles multiple entity types', async () => {
const pulled: { entity: string; items: unknown[] }[] = [];
apiClient.setSafeFetchBehavior(path => {
if (path.startsWith('/tasks')) {
return { data: { items: [{ id: 't1' }] }, error: null };
}
if (path.startsWith('/notes')) {
return { data: { items: [{ id: 'n1' }, { id: 'n2' }] }, error: null };
}
return { data: { items: [] }, error: null };
});
const engine = createSyncEngine(
makeConfig(storage, apiClient, {
entities: {
tasks: { endpoint: '/tasks', partitionKey: 'userId', conflictStrategy: 'server-wins' },
notes: { endpoint: '/notes', partitionKey: 'userId', conflictStrategy: 'client-wins' },
},
onPull: (entity, items) => {
pulled.push({ entity, items });
},
})
);
await engine.push('tasks', { id: 't-new', title: 'Task' });
await engine.push('notes', { id: 'n-new', body: 'Note' });
const result = await engine.fullSync();
expect(result.pushed).toBe(2);
expect(result.pulled).toBe(3); // 1 task + 2 notes
expect(pulled).toHaveLength(2);
});
// ─── Status Monitoring ─────────────────────────────────────────────────
it('returns correct initial status', () => {
const engine = createSyncEngine(makeConfig(storage, apiClient));
const status = engine.getStatus();
expect(status.status).toBe('idle');
expect(status.queueLength).toBe(0);
expect(status.lastSyncAt).toBeUndefined();
});
it('notifies listeners on status changes', async () => {
const engine = createSyncEngine(makeConfig(storage, apiClient));
const statuses: SyncStatusInfo[] = [];
engine.onStatusChange(s => statuses.push({ ...s }));
await engine.push('tasks', { id: 'x', title: 'X' });
await engine.fullSync();
const statusNames = statuses.map(s => s.status);
expect(statusNames).toContain('syncing');
expect(statusNames).toContain('idle');
});
it('unsubscribe stops notifications', async () => {
const engine = createSyncEngine(makeConfig(storage, apiClient));
const statuses: string[] = [];
const unsub = engine.onStatusChange(s => statuses.push(s.status));
await engine.push('tasks', { title: 'A' });
const countBefore = statuses.length;
unsub();
await engine.push('tasks', { title: 'B' });
expect(statuses.length).toBe(countBefore);
});
// ─── clearQueue ────────────────────────────────────────────────────────
it('clearQueue empties the queue', async () => {
const engine = createSyncEngine(makeConfig(storage, apiClient));
await engine.push('tasks', { title: 'A' });
await engine.push('tasks', { title: 'B' });
expect(engine.getQueueLength()).toBe(2);
await engine.clearQueue();
expect(engine.getQueueLength()).toBe(0);
const result = await engine.fullSync();
expect(result.pushed).toBe(0);
});
// ─── reprocessFailed ───────────────────────────────────────────────────
it('reprocessFailed resets retry counts and re-flushes', async () => {
let failCount = 0;
apiClient.setFetchBehavior(() => {
failCount++;
if (failCount <= 2) throw new Error('transient');
return {};
});
const engine = createSyncEngine(makeConfig(storage, apiClient, { maxRetries: 1 }));
await engine.push('tasks', { id: 'rp', title: 'reprocess' });
await engine.flush(); // fails, item stays in queue
expect(engine.getQueueLength()).toBe(1);
// Now make API succeed and reprocess
apiClient.setFetchBehavior(() => ({}));
await engine.reprocessFailed();
expect(engine.getQueueLength()).toBe(0);
});
// ─── Telemetry Integration ─────────────────────────────────────────────
it('tracks sync events via telemetry client', async () => {
const telemetry = createMockTelemetry();
const engine = createSyncEngine(
makeConfig(storage, apiClient, {
telemetryClient: telemetry,
})
);
await engine.push('tasks', { id: 't1', title: 'test' });
await engine.fullSync();
const eventNames = telemetry.events.map(e => e.eventName);
expect(eventNames).toContain('sync_push_success');
expect(eventNames).toContain('sync_complete');
});
it('telemetry tracks push errors', async () => {
const telemetry = createMockTelemetry();
apiClient.setFetchBehavior(() => {
throw new Error('400 Bad Request');
});
const engine = createSyncEngine(
makeConfig(storage, apiClient, {
telemetryClient: telemetry,
maxRetries: 1,
})
);
await engine.push('tasks', { id: 'bad', title: 'fail' });
await engine.flush();
await engine.flush(); // second flush drops item
const eventNames = telemetry.events.map(e => e.eventName);
expect(eventNames).toContain('sync_push_dropped');
});
// ─── Destroy ───────────────────────────────────────────────────────────
it('destroy prevents further flush', async () => {
const engine = createSyncEngine(makeConfig(storage, apiClient));
await engine.push('tasks', { title: 'orphan' });
engine.destroy();
await engine.flush(); // should be no-op after destroy
// Item still in queue (flush was no-op)
expect(engine.getQueueLength()).toBe(1);
});
});