#!/usr/bin/env node
/**
 * Referrals Partition Key Migration — Backfill Script
 *
 * Copies referral documents from the `referrals` container into the
 * `referrals_v2` container, optionally verifying that documents present in
 * both containers agree.
 *
 * Usage:
 *   npx tsx scripts/migrate-referrals.ts [options]
 *
 * Options:
 *   --productId   Product ID to migrate (default: all products)
 *   --batchSize   Batch size for backfill (default: 100)
 *   --verify      Run consistency verification after backfill
 *   --mode        Set migration mode (dual-write | new-only | old-only)
 *   --dryRun      Preview changes without writing
 *
 * Examples:
 *   npx tsx scripts/migrate-referrals.ts --productId lysnrai --verify
 *   npx tsx scripts/migrate-referrals.ts --batchSize 50 --dryRun
 *   npx tsx scripts/migrate-referrals.ts --mode new-only
 */
import { CosmosClient, type Container } from '@azure/cosmos';
import { config } from '../src/lib/config.js';
import type { ReferralDoc } from '../src/modules/referrals/types.js';

/** Migration modes accepted by `--mode`. */
const MIGRATION_MODES = ['dual-write', 'new-only', 'old-only'] as const;

type MigrationMode = (typeof MIGRATION_MODES)[number];

/** Options collected from the command line. */
interface MigrationOptions {
  productId?: string;
  batchSize: number;
  verify: boolean;
  mode?: MigrationMode;
  dryRun: boolean;
}

const HELP_TEXT = `
Referrals Partition Key Migration — Backfill Script

Usage:
  npx tsx scripts/migrate-referrals.ts [options]

Options:
  --productId   Product ID to migrate (default: all)
  --batchSize   Batch size for backfill (default: 100)
  --verify      Run consistency verification after backfill
  --mode        Set migration mode (dual-write|new-only|old-only)
  --dryRun      Preview changes without writing
  --help        Show this help

Examples:
  npx tsx scripts/migrate-referrals.ts --productId lysnrai --verify
  npx tsx scripts/migrate-referrals.ts --batchSize 50 --dryRun
  npx tsx scripts/migrate-referrals.ts --mode new-only
`;

/** Type guard for the `--mode` value. */
function isMigrationMode(value: string): value is MigrationMode {
  return (MIGRATION_MODES as readonly string[]).includes(value);
}

/** True when a batch size can drive the batching loop forward. */
function isValidBatchSize(value: number): boolean {
  return Number.isInteger(value) && value >= 1;
}

/** Extracts a printable message from an arbitrary thrown value. */
function describeError(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

/** True when a thrown value carries `code === 409` (document already exists). */
function isConflictError(err: unknown): boolean {
  return (
    typeof err === 'object' &&
    err !== null &&
    'code' in err &&
    (err as { code?: unknown }).code === 409
  );
}

/**
 * Parses command-line arguments into {@link MigrationOptions}.
 *
 * `--help` prints the usage text and exits the process with status 0.
 *
 * @param argv Arguments to parse; defaults to the process arguments.
 * @returns The parsed options, with defaults applied for anything omitted.
 * @throws Error when a value-taking flag has no value, when `--batchSize` is
 *   not a positive integer, when `--mode` is not a known mode, or when an
 *   unrecognised argument is given.
 */
function parseArgs(argv: readonly string[] = process.argv.slice(2)): MigrationOptions {
  const options: MigrationOptions = {
    batchSize: 100,
    verify: false,
    dryRun: false,
  };

  let index = 0;

  // Consumes the token following a value-taking flag. Rejecting a missing
  // value (or another flag in its place) stops `--batchSize --verify` from
  // silently swallowing `--verify`.
  const takeValue = (flag: string): string => {
    const value = argv[index];
    if (value === undefined || value.startsWith('--')) {
      throw new Error(`Option ${flag} requires a value`);
    }
    index += 1;
    return value;
  };

  while (index < argv.length) {
    const arg = argv[index];
    index += 1;

    switch (arg) {
      case '--productId':
        options.productId = takeValue(arg);
        break;
      case '--batchSize': {
        const raw = takeValue(arg);
        const parsed = Number(raw);
        // Zero, negative, fractional or NaN sizes would stall or skip the
        // batching loop, so reject them up front.
        if (!isValidBatchSize(parsed)) {
          throw new Error(`Invalid --batchSize: ${raw}. Must be a positive integer`);
        }
        options.batchSize = parsed;
        break;
      }
      case '--verify':
        options.verify = true;
        break;
      case '--mode': {
        const raw = takeValue(arg);
        if (!isMigrationMode(raw)) {
          throw new Error(
            `Invalid mode: ${raw}. Must be one of: ${MIGRATION_MODES.join(', ')}`
          );
        }
        options.mode = raw;
        break;
      }
      case '--dryRun':
        options.dryRun = true;
        break;
      case '--help':
        console.log(HELP_TEXT);
        process.exit(0);
        break;
      default:
        // A mistyped flag such as `--dryrun` must not fall through to a real
        // write run.
        throw new Error(`Unknown option: ${arg}`);
    }
  }

  return options;
}

/**
 * Builds handles to the source (`referrals`) and target (`referrals_v2`)
 * containers.
 *
 * Connection settings come from `config`, falling back to the process
 * environment; the database name defaults to `lysnrai`.
 *
 * @throws Error when the endpoint or key is not configured.
 */
async function getContainers(): Promise<{
  oldContainer: Container;
  newContainer: Container;
}> {
  const endpoint = config.COSMOS_ENDPOINT || process.env.COSMOS_ENDPOINT;
  const key = config.COSMOS_KEY || process.env.COSMOS_KEY;
  const database = config.COSMOS_DATABASE || process.env.COSMOS_DATABASE || 'lysnrai';

  if (!endpoint || !key) {
    throw new Error('COSMOS_ENDPOINT and COSMOS_KEY must be set');
  }

  const client = new CosmosClient({ endpoint, key });
  const db = client.database(database);

  return {
    oldContainer: db.container('referrals'),
    newContainer: db.container('referrals_v2'),
  };
}

/**
 * Lists the distinct `productId` values present in the old container.
 *
 * @param oldContainer Source container to scan.
 * @returns Every distinct, defined `productId`.
 */
async function getProductIds(oldContainer: Container): Promise<string[]> {
  const { resources } = await oldContainer.items
    .query({
      query: 'SELECT DISTINCT VALUE c.productId FROM c WHERE IS_DEFINED(c.productId)',
    })
    .fetchAll();

  return resources;
}

/**
 * Copies one product's documents from the old container to the new one.
 *
 * Documents whose id already exists in the new container for this product are
 * left alone. Documents without a `referrerId` are reported as errors rather
 * than written. A create that fails with code 409 is counted as skipped.
 *
 * @param oldContainer Source container.
 * @param newContainer Target container.
 * @param productId Product whose documents are migrated.
 * @param batchSize Number of documents created concurrently per batch.
 * @param dryRun When true, nothing is written and every old document is
 *   reported as migrated.
 * @returns Counts of migrated and skipped documents plus per-document error
 *   messages.
 * @throws RangeError when `batchSize` is not a positive integer.
 */
async function backfillProduct(
  oldContainer: Container,
  newContainer: Container,
  productId: string,
  batchSize: number,
  dryRun: boolean
): Promise<{ migrated: number; skipped: number; errors: string[] }> {
  // A non-positive step would keep the batch loop below from ever advancing.
  if (!isValidBatchSize(batchSize)) {
    throw new RangeError(`batchSize must be a positive integer, got ${batchSize}`);
  }

  const result = { migrated: 0, skipped: 0, errors: [] as string[] };

  // Get all docs from old container for this product
  const { resources: oldDocs } = await oldContainer.items
    .query({
      query: 'SELECT * FROM c WHERE c.productId = @productId',
      parameters: [{ name: '@productId', value: productId }],
    })
    .fetchAll();

  if (oldDocs.length === 0) {
    console.log(`[${productId}] No documents in old container`);
    return result;
  }

  console.log(`[${productId}] Found ${oldDocs.length} documents in old container`);

  if (dryRun) {
    console.log(`[${productId}] DRY RUN — would migrate ${oldDocs.length} documents`);
    return { migrated: oldDocs.length, skipped: 0, errors: [] };
  }

  // Only the ids are needed to decide what is already migrated, so project
  // them instead of fetching whole documents.
  const { resources: existingIdList } = await newContainer.items
    .query({
      query: 'SELECT VALUE c.id FROM c WHERE c.productId = @productId',
      parameters: [{ name: '@productId', value: productId }],
    })
    .fetchAll();

  const existingIds = new Set<string>(existingIdList);
  console.log(`[${productId}] ${existingIds.size} documents already in new container`);

  // Filter docs needing migration
  const toMigrate = oldDocs.filter((d) => !existingIds.has(d.id));
  console.log(`[${productId}] ${toMigrate.length} documents need migration`);

  if (toMigrate.length === 0) {
    return result;
  }

  // Migrate in batches: documents within a batch are created concurrently,
  // batches run one after another.
  const totalBatches = Math.ceil(toMigrate.length / batchSize);
  for (let i = 0; i < toMigrate.length; i += batchSize) {
    const batch = toMigrate.slice(i, i + batchSize);
    const batchNum = Math.floor(i / batchSize) + 1;

    console.log(
      `[${productId}] Processing batch ${batchNum}/${totalBatches} (${batch.length} docs)`
    );

    await Promise.all(
      batch.map(async (doc) => {
        try {
          if (!doc.referrerId) {
            result.errors.push(`Doc ${doc.id}: missing referrerId (required for new PK)`);
            return;
          }

          await newContainer.items.create(doc);
          result.migrated++;
        } catch (err: unknown) {
          if (isConflictError(err)) {
            // Created in the new container since the existing-id snapshot.
            result.skipped++;
          } else {
            result.errors.push(`Doc ${doc.id}: ${describeError(err)}`);
          }
        }
      })
    );
  }

  return result;
}

/**
 * Compares one product's documents across the old and new containers.
 *
 * Documents present in both are checked for matching `status` and
 * `referrerId`. Documents present only in the old container are reported with
 * a "pending backfill" issue. Documents present only in the new container are
 * not reported.
 *
 * @param oldContainer Source container.
 * @param newContainer Target container.
 * @param productId Product to verify.
 * @returns The issues found and `totalChecked`, the sum of the document counts
 *   read from both containers.
 */
async function verifyConsistency(
  oldContainer: Container,
  newContainer: Container,
  productId: string
): Promise<{ inconsistencies: { id: string; issue: string }[]; totalChecked: number }> {
  const inconsistencies: { id: string; issue: string }[] = [];

  const [{ resources: oldDocs }, { resources: newDocs }] = await Promise.all([
    oldContainer.items
      .query({
        query: 'SELECT * FROM c WHERE c.productId = @productId',
        parameters: [{ name: '@productId', value: productId }],
      })
      .fetchAll(),
    newContainer.items
      .query({
        query: 'SELECT * FROM c WHERE c.productId = @productId',
        parameters: [{ name: '@productId', value: productId }],
      })
      .fetchAll(),
  ]);

  const oldMap = new Map(oldDocs.map((d) => [d.id, d]));
  const newMap = new Map(newDocs.map((d) => [d.id, d]));

  // Check docs in both containers for consistency
  for (const [id, newDoc] of newMap) {
    if (!oldMap.has(id)) continue;
    const oldDoc = oldMap.get(id);

    if (oldDoc.status !== newDoc.status) {
      inconsistencies.push({
        id,
        issue: `status mismatch: ${oldDoc.status} vs ${newDoc.status}`,
      });
    }
    if (oldDoc.referrerId !== newDoc.referrerId) {
      inconsistencies.push({ id, issue: 'referrerId mismatch' });
    }
  }

  // Check for docs only in old (pending backfill)
  for (const [id] of oldMap) {
    if (!newMap.has(id)) {
      inconsistencies.push({ id, issue: 'pending backfill (in old, missing in new)' });
    }
  }

  return { inconsistencies, totalChecked: oldDocs.length + newDocs.length };
}

/**
 * Validates a migration mode and records it in `REFERRAL_MIGRATION_MODE` for
 * the current process only.
 *
 * @param mode Requested mode.
 * @throws Error when `mode` is not one of the supported modes.
 */
async function setMigrationMode(mode: string): Promise<void> {
  // Validate mode
  if (!isMigrationMode(mode)) {
    throw new Error(`Invalid mode: ${mode}. Must be one of: ${MIGRATION_MODES.join(', ')}`);
  }

  // Set environment variable for current process
  process.env.REFERRAL_MIGRATION_MODE = mode;

  console.log(`Migration mode set to: ${mode}`);
  console.log('Note: This is in-memory only. Update deployment config for persistence.');
}

/**
 * Script entry point: parses options, then either sets the migration mode or
 * backfills (and optionally verifies) each selected product.
 *
 * Exits the process with status 1 when any document failed to migrate.
 */
async function main(): Promise<void> {
  const options = parseArgs();

  console.log('===============================================');
  console.log('Referrals Partition Key Migration — Backfill Script');
  console.log('===============================================');
  console.log(`Options: ${JSON.stringify(options, null, 2)}`);
  console.log('');

  // Handle mode-only operation
  if (options.mode) {
    await setMigrationMode(options.mode);
    return;
  }

  const { oldContainer, newContainer } = await getContainers();
  console.log('Connected to Cosmos DB');

  // Get product IDs to migrate
  const productIds = options.productId
    ? [options.productId]
    : await getProductIds(oldContainer);

  console.log(`Migrating for products: ${productIds.join(', ')}`);
  console.log('');

  let totalMigrated = 0;
  let totalSkipped = 0;
  let totalErrors = 0;

  // Backfill each product
  for (const productId of productIds) {
    console.log(`\n--- Processing ${productId} ---`);

    const result = await backfillProduct(
      oldContainer,
      newContainer,
      productId,
      options.batchSize,
      options.dryRun
    );

    console.log(`  Migrated: ${result.migrated}`);
    console.log(`  Skipped: ${result.skipped}`);
    if (result.errors.length > 0) {
      console.log(`  Errors: ${result.errors.length}`);
      result.errors.slice(0, 5).forEach((e) => console.log(`    - ${e}`));
      if (result.errors.length > 5) {
        console.log(`    ... and ${result.errors.length - 5} more`);
      }
    }

    totalMigrated += result.migrated;
    totalSkipped += result.skipped;
    totalErrors += result.errors.length;

    // Run verification if requested
    if (options.verify && !options.dryRun) {
      console.log(`\n  Verifying consistency...`);
      const verifyResult = await verifyConsistency(oldContainer, newContainer, productId);

      // "pending backfill" entries are expected mid-migration, so they are
      // counted separately from genuine mismatches.
      const realInconsistencies = verifyResult.inconsistencies.filter(
        (i) => !i.issue.includes('pending backfill')
      );
      const pendingCount = verifyResult.inconsistencies.length - realInconsistencies.length;

      if (realInconsistencies.length === 0) {
        console.log(`  Consistency check passed (pending: ${pendingCount})`);
      } else {
        console.log(`  WARNING: ${realInconsistencies.length} inconsistencies found:`);
        realInconsistencies
          .slice(0, 5)
          .forEach((i) => console.log(`    - ${i.id}: ${i.issue}`));
      }
    }
  }

  // Summary
  console.log('\n===============================================');
  console.log('Migration Summary');
  console.log('===============================================');
  console.log(`Total migrated: ${totalMigrated}`);
  console.log(`Total skipped: ${totalSkipped}`);
  console.log(`Total errors: ${totalErrors}`);

  if (totalErrors > 0) {
    process.exit(1);
  }

  console.log('\nMigration completed successfully!');
  console.log('');
  console.log('Next steps:');
  console.log('1. Run verification: npx tsx scripts/migrate-referrals.ts --verify');
  console.log('2. Switch to new-only mode: npx tsx scripts/migrate-referrals.ts --mode new-only');
  console.log('3. Monitor for issues, then delete old container when confident');
}

main().catch((err) => {
  console.error('Migration failed:', err);
  process.exit(1);
});