352 lines
11 KiB
JavaScript
352 lines
11 KiB
JavaScript
#!/usr/bin/env node
|
|
/**
|
|
* Referrals Partition Key Migration — Backfill Script
|
|
*
|
|
* Usage:
|
|
* npx tsx scripts/migrate-referrals.ts [options]
|
|
*
|
|
* Options:
|
|
* --productId <id> Product ID to migrate (default: all products)
|
|
* --batchSize <n> Batch size for backfill (default: 100)
|
|
* --verify Run consistency verification after backfill
|
|
* --mode <mode> Set migration mode (dual-write | new-only | old-only)
|
|
* --dryRun Preview changes without writing
|
|
*
|
|
* Examples:
|
|
* npx tsx scripts/migrate-referrals.ts --productId lysnrai --verify
|
|
* npx tsx scripts/migrate-referrals.ts --batchSize 50 --dryRun
|
|
* npx tsx scripts/migrate-referrals.ts --mode new-only
|
|
*/
|
|
|
|
import { CosmosClient, type Container } from '@azure/cosmos';
|
|
import { config } from '../src/lib/config.js';
|
|
import type { ReferralDoc } from '../src/modules/referrals/types.js';
|
|
|
|
/** Command-line options accepted by this script, as produced by `parseArgs`. */
interface MigrationOptions {
  /** Number of documents written concurrently in each backfill batch. */
  batchSize: number;
  /** When true, documents are read and counted but never written. */
  dryRun: boolean;
  /** When true, compare the old and new containers after each product's backfill. */
  verify: boolean;
  /** Restrict the run to one product; when omitted, every product found in the old container is processed. */
  productId?: string;
  /** When set, only the migration mode is recorded and no backfill runs. */
  mode?: 'dual-write' | 'new-only' | 'old-only';
}
|
|
|
|
/**
 * Parses command-line flags into {@link MigrationOptions}, applying defaults
 * (`batchSize` 100, `verify` false, `dryRun` false).
 *
 * Invalid input is rejected instead of being silently ignored. Each of these
 * throws an Error:
 * - an unknown flag;
 * - a flag whose value is missing;
 * - a `--batchSize` that is not a positive integer;
 * - an unrecognised `--mode`.
 *
 * For a data-migration script this matters. A typo such as `--dry-run` must
 * not turn a preview into a real write. A dangling `--productId` must not
 * widen the run to every product. A dangling `--mode` must not fall through
 * to a full backfill.
 *
 * `--help` prints usage and exits the process with status 0.
 *
 * @param argv - Arguments to parse. Defaults to this process's CLI arguments,
 *   without the node binary and script path.
 * @returns The parsed options.
 * @throws Error on any unrecognised flag or invalid or missing value.
 */
function parseArgs(argv: readonly string[] = process.argv.slice(2)): MigrationOptions {
  const validModes = ['dual-write', 'new-only', 'old-only'] as const;
  const options: MigrationOptions = {
    batchSize: 100,
    verify: false,
    dryRun: false,
  };

  // Returns the value that follows the flag at `index`. A missing value, or
  // one that is itself a flag, is an error rather than a silent `undefined`.
  const valueAfter = (index: number, flag: string): string => {
    const value = argv[index + 1];
    if (value === undefined || value.startsWith('--')) {
      throw new Error(`Missing value for ${flag}`);
    }
    return value;
  };

  for (let i = 0; i < argv.length; i++) {
    const arg = argv[i];
    switch (arg) {
      case '--productId':
        // `i++` hands the flag's index to valueAfter, then skips past its value.
        options.productId = valueAfter(i++, arg);
        break;
      case '--batchSize': {
        const raw = valueAfter(i++, arg);
        const size = Number(raw);
        // 0 or a negative step would make the backfill batch loop never
        // terminate; NaN would make it silently migrate nothing.
        if (!Number.isInteger(size) || size <= 0) {
          throw new Error(`--batchSize must be a positive integer, got "${raw}"`);
        }
        options.batchSize = size;
        break;
      }
      case '--verify':
        options.verify = true;
        break;
      case '--mode': {
        const raw = valueAfter(i++, arg);
        const mode = validModes.find((m) => m === raw);
        if (mode === undefined) {
          throw new Error(`Invalid mode: ${raw}. Must be one of: ${validModes.join(', ')}`);
        }
        options.mode = mode;
        break;
      }
      case '--dryRun':
        options.dryRun = true;
        break;
      case '--help':
        console.log(`
Referrals Partition Key Migration — Backfill Script

Usage: npx tsx scripts/migrate-referrals.ts [options]

Options:
  --productId <id>    Product ID to migrate (default: all)
  --batchSize <n>     Batch size for backfill (default: 100)
  --verify            Run consistency verification after backfill
  --mode <mode>       Set migration mode (dual-write|new-only|old-only)
  --dryRun            Preview changes without writing
  --help              Show this help

Examples:
  npx tsx scripts/migrate-referrals.ts --productId lysnrai --verify
  npx tsx scripts/migrate-referrals.ts --batchSize 50 --dryRun
  npx tsx scripts/migrate-referrals.ts --mode new-only
`);
        process.exit(0);
        break;
      default:
        throw new Error(`Unknown option: ${arg} (run with --help for usage)`);
    }
  }

  return options;
}
|
|
|
|
/**
 * Builds Cosmos DB container handles for the legacy `referrals` container and
 * its replacement `referrals_v2`.
 *
 * Each connection setting is taken from `config` first and from the
 * environment variable of the same name second. The database name falls back
 * to `lysnrai` when neither source provides one.
 *
 * @returns Handles for the legacy (`oldContainer`) and new (`newContainer`) containers.
 * @throws Error when no endpoint or key can be resolved.
 */
async function getContainers(): Promise<{
  oldContainer: Container;
  newContainer: Container;
}> {
  const cosmosEndpoint = config.COSMOS_ENDPOINT || process.env.COSMOS_ENDPOINT;
  const cosmosKey = config.COSMOS_KEY || process.env.COSMOS_KEY;
  const databaseName = config.COSMOS_DATABASE || process.env.COSMOS_DATABASE || 'lysnrai';

  const credentialsMissing = !cosmosEndpoint || !cosmosKey;
  if (credentialsMissing) {
    throw new Error('COSMOS_ENDPOINT and COSMOS_KEY must be set');
  }

  const db = new CosmosClient({ endpoint: cosmosEndpoint, key: cosmosKey }).database(databaseName);
  const oldContainer = db.container('referrals');
  const newContainer = db.container('referrals_v2');
  return { oldContainer, newContainer };
}
|
|
|
|
/**
 * Lists the distinct `productId` values found in the legacy container.
 * Documents with no `productId` property are excluded by the query.
 *
 * @param oldContainer - Legacy `referrals` container to scan.
 * @returns Every distinct product id, in the order the query returns them.
 */
async function getProductIds(oldContainer: Container): Promise<string[]> {
  const distinctProducts = {
    query: 'SELECT DISTINCT VALUE c.productId FROM c WHERE IS_DEFINED(c.productId)',
  };
  const response = await oldContainer.items.query<string>(distinctProducts).fetchAll();
  return response.resources;
}
|
|
|
|
/**
 * Copies one product's referral documents from the legacy container into the
 * new container.
 *
 * - Documents whose `id` already appears in the new container for this product
 *   are skipped without a write.
 * - Documents lacking a `referrerId` are reported as errors and not written,
 *   because the new container needs that field for its partition key.
 * - A create that fails with code 409 (conflict) is counted as `skipped`.
 *   Any other create failure becomes an error entry.
 *
 * In dry-run mode nothing is written, but both containers are still read. The
 * preview therefore reports what a real run would do: `migrated` is the number
 * of documents that would be created, and `errors` lists documents that would
 * be rejected for a missing `referrerId`.
 *
 * @param oldContainer - Legacy container to read from.
 * @param newContainer - Target container to write to.
 * @param productId - Product whose documents are copied.
 * @param batchSize - Maximum number of create requests issued concurrently.
 *   Must be a positive integer.
 * @param dryRun - When true, report what would happen without writing.
 * @returns Created (or would-be-created) count, conflict-skipped count, and
 *   one message per failed document.
 * @throws RangeError if `batchSize` is not a positive integer. Without this
 *   check the batch loop would never terminate.
 */
async function backfillProduct(
  oldContainer: Container,
  newContainer: Container,
  productId: string,
  batchSize: number,
  dryRun: boolean
): Promise<{ migrated: number; skipped: number; errors: string[] }> {
  if (!Number.isInteger(batchSize) || batchSize <= 0) {
    throw new RangeError(`batchSize must be a positive integer, got ${batchSize}`);
  }

  const result = { migrated: 0, skipped: 0, errors: [] as string[] };

  // Get all docs from old container for this product
  const { resources: oldDocs } = await oldContainer.items
    .query<ReferralDoc>({
      query: 'SELECT * FROM c WHERE c.productId = @productId',
      parameters: [{ name: '@productId', value: productId }],
    })
    .fetchAll();

  if (oldDocs.length === 0) {
    console.log(`[${productId}] No documents in old container`);
    return result;
  }

  console.log(`[${productId}] Found ${oldDocs.length} documents in old container`);

  // Only ids are needed to detect already-migrated documents, so project them
  // instead of pulling every full document out of the new container.
  const { resources: existingIdList } = await newContainer.items
    .query<string>({
      query: 'SELECT VALUE c.id FROM c WHERE c.productId = @productId',
      parameters: [{ name: '@productId', value: productId }],
    })
    .fetchAll();

  const existingIds = new Set(existingIdList);
  console.log(`[${productId}] ${existingIds.size} documents already in new container`);

  const pending = oldDocs.filter((d) => !existingIds.has(d.id));
  console.log(`[${productId}] ${pending.length} documents need migration`);

  // Split off documents that cannot be written. This happens before the
  // dry-run exit so the preview reports them too.
  const toMigrate: ReferralDoc[] = [];
  for (const doc of pending) {
    if (doc.referrerId) {
      toMigrate.push(doc);
    } else {
      result.errors.push(`Doc ${doc.id}: missing referrerId (required for new PK)`);
    }
  }

  if (dryRun) {
    console.log(`[${productId}] DRY RUN — would migrate ${toMigrate.length} documents`);
    result.migrated = toMigrate.length;
    return result;
  }

  const totalBatches = Math.ceil(toMigrate.length / batchSize);
  for (let start = 0; start < toMigrate.length; start += batchSize) {
    const batch = toMigrate.slice(start, start + batchSize);
    const batchNum = Math.floor(start / batchSize) + 1;

    console.log(
      `[${productId}] Processing batch ${batchNum}/${totalBatches} (${batch.length} docs)`
    );

    await Promise.all(
      batch.map(async (doc) => {
        try {
          await newContainer.items.create(doc);
          result.migrated++;
        } catch (err: unknown) {
          // 409 Conflict: an item with this id already exists in the target,
          // e.g. one written after the existence snapshot above.
          const code = (err as { code?: unknown } | null | undefined)?.code;
          if (code === 409) {
            result.skipped++;
          } else {
            const message = err instanceof Error ? err.message : String(err);
            result.errors.push(`Doc ${doc.id}: ${message}`);
          }
        }
      })
    );
  }

  return result;
}
|
|
|
|
/**
 * Compares one product's documents across the legacy and new containers.
 *
 * Issues reported per document id:
 * - `status mismatch: <old> vs <new>` and `referrerId mismatch: <old> vs <new>`
 *   for ids present in both containers;
 * - `only in new (missing in old)` for ids present only in the new container;
 * - `pending backfill (in old, missing in new)` for ids present only in the
 *   legacy container.
 *
 * Only `status` and `referrerId` are compared. Other fields are not checked.
 *
 * @param oldContainer - Legacy container.
 * @param newContainer - New container.
 * @param productId - Product whose documents are compared.
 * @returns The issues found, and `totalChecked`: the number of distinct ids
 *   seen across both containers. An id present in both counts once.
 */
async function verifyConsistency(
  oldContainer: Container,
  newContainer: Container,
  productId: string
): Promise<{ inconsistencies: { id: string; issue: string }[]; totalChecked: number }> {
  const productQuery = () => ({
    query: 'SELECT * FROM c WHERE c.productId = @productId',
    parameters: [{ name: '@productId', value: productId }],
  });

  const [{ resources: oldDocs }, { resources: newDocs }] = await Promise.all([
    oldContainer.items.query<ReferralDoc>(productQuery()).fetchAll(),
    newContainer.items.query<ReferralDoc>(productQuery()).fetchAll(),
  ]);

  const oldById = new Map(oldDocs.map((d) => [d.id, d]));
  const newById = new Map(newDocs.map((d) => [d.id, d]));
  const inconsistencies: { id: string; issue: string }[] = [];

  // Docs in the new container: compare against the legacy copy, or flag orphans.
  for (const [id, newDoc] of newById) {
    const oldDoc = oldById.get(id);
    if (oldDoc === undefined) {
      inconsistencies.push({ id, issue: 'only in new (missing in old)' });
      continue;
    }
    if (oldDoc.status !== newDoc.status) {
      inconsistencies.push({ id, issue: `status mismatch: ${oldDoc.status} vs ${newDoc.status}` });
    }
    if (oldDoc.referrerId !== newDoc.referrerId) {
      inconsistencies.push({
        id,
        issue: `referrerId mismatch: ${oldDoc.referrerId} vs ${newDoc.referrerId}`,
      });
    }
  }

  // Docs only in the legacy container have not been backfilled yet.
  for (const id of oldById.keys()) {
    if (!newById.has(id)) {
      inconsistencies.push({ id, issue: 'pending backfill (in old, missing in new)' });
    }
  }

  const distinctIds = new Set([...oldById.keys(), ...newById.keys()]);
  return { inconsistencies, totalChecked: distinctIds.size };
}
|
|
|
|
/**
 * Records the requested migration mode in this process's environment as
 * `REFERRAL_MIGRATION_MODE`.
 *
 * Nothing is persisted. The value lives only in the current process's
 * environment, so the deployment config must be updated separately.
 *
 * @param mode - One of `dual-write`, `new-only`, `old-only`.
 * @throws Error (surfaced as a rejected promise) for any other value.
 */
async function setMigrationMode(mode: string): Promise<void> {
  const allowedModes = new Set(['dual-write', 'new-only', 'old-only']);
  const isAllowed = allowedModes.has(mode);
  if (!isAllowed) {
    const choices = Array.from(allowedModes).join(', ');
    throw new Error(`Invalid mode: ${mode}. Must be one of: ${choices}`);
  }

  process.env.REFERRAL_MIGRATION_MODE = mode;
  console.log(`Migration mode set to: ${mode}`);
  console.log('Note: This is in-memory only. Update deployment config for persistence.');
}
|
|
|
|
/**
 * CLI entry point.
 *
 * With `--mode`, it only records the migration mode and returns without
 * opening any containers. Otherwise it backfills each requested product
 * (every product in the legacy container when `--productId` is absent). With
 * `--verify`, outside dry-run mode, it then compares the two containers for
 * that product.
 *
 * The exit status is non-zero when any document failed to migrate, or when
 * verification found inconsistencies other than documents still pending
 * backfill. The status is set through `process.exitCode` instead of calling
 * `process.exit()`, so console output still being flushed to a pipe is not
 * cut off.
 */
async function main(): Promise<void> {
  const options = parseArgs();

  // Prints up to `limit` entries, followed by a count of any that were omitted.
  const printCapped = (entries: readonly string[], limit = 5): void => {
    for (const entry of entries.slice(0, limit)) {
      console.log(` - ${entry}`);
    }
    if (entries.length > limit) {
      console.log(` ... and ${entries.length - limit} more`);
    }
  };
  const isPendingBackfill = (issue: string): boolean => issue.includes('pending backfill');

  console.log('===============================================');
  console.log('Referrals Partition Key Migration — Backfill Script');
  console.log('===============================================');
  console.log(`Options: ${JSON.stringify(options, null, 2)}`);
  console.log('');

  // Handle mode-only operation
  if (options.mode) {
    await setMigrationMode(options.mode);
    return;
  }

  const { oldContainer, newContainer } = await getContainers();
  console.log('Connected to Cosmos DB');

  // Get product IDs to migrate
  const productIds = options.productId ? [options.productId] : await getProductIds(oldContainer);
  console.log(`Migrating for products: ${productIds.join(', ') || '(none found)'}`);
  console.log('');

  const runVerification = options.verify && !options.dryRun;
  if (options.verify && options.dryRun) {
    console.log('Note: --verify is skipped in dry-run mode.');
  }

  let totalMigrated = 0;
  let totalSkipped = 0;
  let totalErrors = 0;
  let totalInconsistencies = 0;

  // Backfill each product
  for (const productId of productIds) {
    console.log(`\n--- Processing ${productId} ---`);
    const result = await backfillProduct(
      oldContainer,
      newContainer,
      productId,
      options.batchSize,
      options.dryRun
    );

    console.log(` Migrated: ${result.migrated}`);
    console.log(` Skipped: ${result.skipped}`);
    if (result.errors.length > 0) {
      console.log(` Errors: ${result.errors.length}`);
      printCapped(result.errors);
    }

    totalMigrated += result.migrated;
    totalSkipped += result.skipped;
    totalErrors += result.errors.length;

    if (runVerification) {
      console.log(`\n Verifying consistency...`);
      const { inconsistencies } = await verifyConsistency(oldContainer, newContainer, productId);
      const pendingCount = inconsistencies.filter((i) => isPendingBackfill(i.issue)).length;
      const realInconsistencies = inconsistencies.filter((i) => !isPendingBackfill(i.issue));

      if (realInconsistencies.length === 0) {
        console.log(` Consistency check passed (pending: ${pendingCount})`);
      } else {
        console.log(` WARNING: ${realInconsistencies.length} inconsistencies found:`);
        printCapped(realInconsistencies.map((i) => `${i.id}: ${i.issue}`));
        totalInconsistencies += realInconsistencies.length;
      }
    }
  }

  // Summary
  console.log('\n===============================================');
  console.log(options.dryRun ? 'Migration Summary (DRY RUN — nothing written)' : 'Migration Summary');
  console.log('===============================================');
  console.log(`Total migrated: ${totalMigrated}`);
  console.log(`Total skipped: ${totalSkipped}`);
  console.log(`Total errors: ${totalErrors}`);
  if (runVerification) {
    console.log(`Total inconsistencies: ${totalInconsistencies}`);
  }

  if (totalErrors > 0 || totalInconsistencies > 0) {
    process.exitCode = 1;
    return;
  }

  if (options.dryRun) {
    console.log('\nDry run completed. Re-run without --dryRun to apply these changes.');
    return;
  }

  console.log('\nMigration completed successfully!');
  console.log('');
  console.log('Next steps:');
  console.log('1. Run verification: npx tsx scripts/migrate-referrals.ts --verify');
  console.log('2. Switch to new-only mode: npx tsx scripts/migrate-referrals.ts --mode new-only');
  console.log('3. Monitor for issues, then delete old container when confident');
}
|
|
|
|
// Reports any error that escapes main() and terminates with a failing status.
const handleFatalError = (error: unknown): never => {
  console.error('Migration failed:', error);
  process.exit(1);
};

void main().catch(handleFatalError);
|