feat(retention): add data retention policies — policy CRUD, enforce, jobs

- types.ts: RetentionPolicyDoc, RetentionJobDoc + 2 schemas + getCutoffDate helper
- repository.ts: policy CRUD, enabled policies query, job audit trail, stats
- routes.ts: 7 endpoints (policy CRUD, enforce with dry-run, job history, stats)
- retention.test.ts: 12 schema + helper tests
- Supports delete/archive/anonymize actions with configurable date fields
- Cosmos containers: retention_policies, retention_jobs
This commit is contained in:
saravanakumardb1 2026-03-19 23:49:56 -07:00
parent 33e5fd70ce
commit c638555069
4 changed files with 626 additions and 0 deletions

View File

@ -0,0 +1,199 @@
/**
* Data Retention repository Cosmos DB CRUD for policies and jobs.
* @module retention/repository
*/
import { getContainer } from '../../lib/cosmos.js';
import type { RetentionPolicyDoc, RetentionJobDoc, RetentionJobStatus } from './types.js';
// =============================================================================
// Retention Policies
// =============================================================================
export async function createPolicy(doc: RetentionPolicyDoc): Promise<RetentionPolicyDoc> {
const container = getContainer('retention_policies');
const { resource } = await container.items.create(doc);
if (!resource) throw new Error('Failed to create retention policy');
return resource as unknown as RetentionPolicyDoc;
}
export async function getPolicy(id: string, productId: string): Promise<RetentionPolicyDoc | null> {
const container = getContainer('retention_policies');
try {
const { resource } = await container.item(id, productId).read();
return resource as unknown as RetentionPolicyDoc | null;
} catch (err) {
if ((err as { code?: number }).code === 404) return null;
throw err;
}
}
export async function updatePolicy(
id: string,
productId: string,
updates: Partial<RetentionPolicyDoc>
): Promise<RetentionPolicyDoc | null> {
const existing = await getPolicy(id, productId);
if (!existing) return null;
const container = getContainer('retention_policies');
const updated: RetentionPolicyDoc = {
...existing,
...updates,
id: existing.id,
productId: existing.productId,
updatedAt: new Date().toISOString(),
};
const { resource } = await container.items.upsert(updated);
if (!resource) throw new Error('Failed to update retention policy');
return resource as unknown as RetentionPolicyDoc;
}
export async function deletePolicy(id: string, productId: string): Promise<boolean> {
const container = getContainer('retention_policies');
try {
await container.item(id, productId).delete();
return true;
} catch (err) {
if ((err as { code?: number }).code === 404) return false;
throw err;
}
}
export async function listPolicies(
productId: string,
options?: { isEnabled?: boolean; limit?: number }
): Promise<{ policies: RetentionPolicyDoc[]; total: number }> {
const container = getContainer('retention_policies');
let query = 'SELECT * FROM c WHERE c.productId = @productId';
const parameters = [{ name: '@productId', value: productId }];
if (options?.isEnabled !== undefined) {
query += ' AND c.isEnabled = @isEnabled';
parameters.push({ name: '@isEnabled', value: options.isEnabled as unknown as string });
}
query += ' ORDER BY c.createdAt DESC';
const countQuery = query.replace('SELECT *', 'SELECT VALUE COUNT(1)');
const { resources: countResult } = await container.items
.query<number>({ query: countQuery, parameters })
.fetchAll();
const total = countResult[0] ?? 0;
const safeLimit = Math.min(Math.max(options?.limit ?? 50, 1), 200);
query += ` OFFSET 0 LIMIT ${safeLimit}`;
const { resources } = await container.items
.query<RetentionPolicyDoc>({ query, parameters })
.fetchAll();
return { policies: resources, total };
}
export async function getEnabledPolicies(productId: string): Promise<RetentionPolicyDoc[]> {
const container = getContainer('retention_policies');
const query = 'SELECT * FROM c WHERE c.productId = @productId AND c.isEnabled = true';
const parameters = [{ name: '@productId', value: productId }];
const { resources } = await container.items
.query<RetentionPolicyDoc>({ query, parameters })
.fetchAll();
return resources;
}
// =============================================================================
// Retention Jobs (audit trail)
// =============================================================================
export async function createJob(doc: RetentionJobDoc): Promise<RetentionJobDoc> {
const container = getContainer('retention_jobs');
const { resource } = await container.items.create(doc);
if (!resource) throw new Error('Failed to create retention job');
return resource as unknown as RetentionJobDoc;
}
export async function updateJob(
id: string,
productId: string,
updates: Partial<RetentionJobDoc>
): Promise<RetentionJobDoc | null> {
const container = getContainer('retention_jobs');
try {
const { resource: existing } = await container.item(id, productId).read();
if (!existing) return null;
const updated = { ...existing, ...updates };
const { resource } = await container.items.upsert(updated);
return resource as unknown as RetentionJobDoc;
} catch (err) {
if ((err as { code?: number }).code === 404) return null;
throw err;
}
}
export async function listJobs(
productId: string,
options?: { policyId?: string; status?: RetentionJobStatus; limit?: number }
): Promise<{ jobs: RetentionJobDoc[]; total: number }> {
const container = getContainer('retention_jobs');
let query = 'SELECT * FROM c WHERE c.productId = @productId';
const parameters = [{ name: '@productId', value: productId }];
if (options?.policyId) {
query += ' AND c.policyId = @policyId';
parameters.push({ name: '@policyId', value: options.policyId });
}
if (options?.status) {
query += ' AND c.status = @status';
parameters.push({ name: '@status', value: options.status });
}
query += ' ORDER BY c.startedAt DESC';
const countQuery = query.replace('SELECT *', 'SELECT VALUE COUNT(1)');
const { resources: countResult } = await container.items
.query<number>({ query: countQuery, parameters })
.fetchAll();
const total = countResult[0] ?? 0;
const safeLimit = Math.min(Math.max(options?.limit ?? 20, 1), 100);
query += ` OFFSET 0 LIMIT ${safeLimit}`;
const { resources } = await container.items
.query<RetentionJobDoc>({ query, parameters })
.fetchAll();
return { jobs: resources, total };
}
export async function getRetentionStats(productId: string): Promise<{
totalPolicies: number;
enabledPolicies: number;
totalJobsRun: number;
totalPurgedDocuments: number;
}> {
const container = getContainer('retention_policies');
const policyQuery = 'SELECT c.isEnabled FROM c WHERE c.productId = @productId';
const parameters = [{ name: '@productId', value: productId }];
const { resources: policies } = await container.items
.query<{ isEnabled: boolean }>({ query: policyQuery, parameters })
.fetchAll();
const totalPolicies = policies.length;
const enabledPolicies = policies.filter(p => p.isEnabled).length;
const jobContainer = getContainer('retention_jobs');
const jobQuery =
'SELECT c.purgedCount FROM c WHERE c.productId = @productId AND c.status = "completed"';
const { resources: jobs } = await jobContainer.items
.query<{ purgedCount: number }>({ query: jobQuery, parameters })
.fetchAll();
const totalJobsRun = jobs.length;
const totalPurgedDocuments = jobs.reduce((sum, j) => sum + j.purgedCount, 0);
return { totalPolicies, enabledPolicies, totalJobsRun, totalPurgedDocuments };
}

View File

@ -0,0 +1,122 @@
/**
* Data Retention module unit tests.
*/
import { describe, it, expect } from 'vitest';
import {
CreateRetentionPolicySchema,
UpdateRetentionPolicySchema,
getCutoffDate,
} from './types.js';
describe('CreateRetentionPolicySchema', () => {
it('validates minimal policy', () => {
const result = CreateRetentionPolicySchema.safeParse({
containerName: 'audit_log',
label: 'Delete old audit entries',
retentionDays: 90,
});
expect(result.success).toBe(true);
if (result.success) {
expect(result.data.action).toBe('delete');
expect(result.data.dateField).toBe('createdAt');
expect(result.data.isEnabled).toBe(true);
expect(result.data.schedule).toBe('daily');
}
});
it('validates with all fields', () => {
const result = CreateRetentionPolicySchema.safeParse({
containerName: 'delivery_log',
label: 'Archive old delivery records',
retentionDays: 365,
action: 'archive',
dateField: 'updatedAt',
filter: 'status = "delivered"',
isEnabled: false,
schedule: 'weekly',
});
expect(result.success).toBe(true);
});
it('rejects empty container name', () => {
expect(
CreateRetentionPolicySchema.safeParse({
containerName: '',
label: 'Test',
retentionDays: 30,
}).success
).toBe(false);
});
it('rejects zero retention days', () => {
expect(
CreateRetentionPolicySchema.safeParse({
containerName: 'test',
label: 'Test',
retentionDays: 0,
}).success
).toBe(false);
});
it('rejects retention over 10 years', () => {
expect(
CreateRetentionPolicySchema.safeParse({
containerName: 'test',
label: 'Test',
retentionDays: 3651,
}).success
).toBe(false);
});
it('rejects invalid action', () => {
expect(
CreateRetentionPolicySchema.safeParse({
containerName: 'test',
label: 'Test',
retentionDays: 30,
action: 'destroy',
}).success
).toBe(false);
});
});
describe('UpdateRetentionPolicySchema', () => {
it('validates partial update', () => {
const result = UpdateRetentionPolicySchema.safeParse({
retentionDays: 180,
isEnabled: false,
});
expect(result.success).toBe(true);
});
it('validates label change', () => {
expect(UpdateRetentionPolicySchema.safeParse({ label: 'New label' }).success).toBe(true);
});
it('validates null filter (to clear)', () => {
expect(UpdateRetentionPolicySchema.safeParse({ filter: null }).success).toBe(true);
});
});
describe('getCutoffDate', () => {
it('returns date in the past', () => {
const cutoff = getCutoffDate(30);
const cutoffDate = new Date(cutoff);
expect(cutoffDate.getTime()).toBeLessThan(Date.now());
});
it('returns approximately N days ago', () => {
const days = 90;
const cutoff = getCutoffDate(days);
const diff = Date.now() - new Date(cutoff).getTime();
const diffDays = diff / (1000 * 60 * 60 * 24);
expect(diffDays).toBeGreaterThanOrEqual(89.9);
expect(diffDays).toBeLessThanOrEqual(90.1);
});
it('returns ISO string format', () => {
const cutoff = getCutoffDate(7);
expect(cutoff).toMatch(/^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}/);
});
});

View File

@ -0,0 +1,214 @@
/**
* Data Retention routes policy CRUD, run enforcement, job history.
* @module retention/routes
*/
import type { FastifyInstance } from 'fastify';
import { UnauthorizedError, ForbiddenError, NotFoundError } from '../../lib/errors.js';
import { getRequestProductId } from '../../lib/request-context.js';
import {
CreateRetentionPolicySchema,
UpdateRetentionPolicySchema,
getCutoffDate,
type RetentionPolicyDoc,
type RetentionJobDoc,
type RetentionJobStatus,
} from './types.js';
import * as repo from './repository.js';
function requireAdmin(req: { jwtPayload?: { sub: string; role?: string } }): string {
if (!req.jwtPayload?.sub) throw new UnauthorizedError('Authentication required');
if (req.jwtPayload?.role !== 'admin') throw new ForbiddenError('Admin access required');
return req.jwtPayload.sub;
}
export async function retentionRoutes(app: FastifyInstance): Promise<void> {
// ── Create policy ──────────────────────────────────────────
app.post('/retention/policies', async (req, reply) => {
const userId = requireAdmin(req);
const productId = getRequestProductId(req);
const input = CreateRetentionPolicySchema.parse(req.body);
const now = new Date().toISOString();
const doc: RetentionPolicyDoc = {
id: `ret_${Date.now()}_${Math.random().toString(36).slice(2, 7)}`,
productId,
containerName: input.containerName,
label: input.label,
retentionDays: input.retentionDays,
action: input.action,
dateField: input.dateField,
filter: input.filter,
isEnabled: input.isEnabled,
schedule: input.schedule,
lastRunAt: null,
lastRunPurgedCount: 0,
createdAt: now,
updatedAt: now,
updatedBy: userId,
};
const created = await repo.createPolicy(doc);
req.log.info(
{ policyId: created.id, container: input.containerName },
'Retention policy created'
);
reply.status(201);
return created;
});
// ── List policies ──────────────────────────────────────────
app.get('/retention/policies', async req => {
requireAdmin(req);
const productId = getRequestProductId(req);
const { enabled, limit: limitStr } = req.query as { enabled?: string; limit?: string };
const parsedLimit = limitStr ? parseInt(limitStr, 10) : 50;
const safeLimit =
Number.isFinite(parsedLimit) && parsedLimit > 0 ? Math.min(parsedLimit, 200) : 50;
return repo.listPolicies(productId, {
isEnabled: enabled === 'true' ? true : enabled === 'false' ? false : undefined,
limit: safeLimit,
});
});
// ── Get policy ─────────────────────────────────────────────
app.get<{ Params: { id: string } }>('/retention/policies/:id', async req => {
requireAdmin(req);
const productId = getRequestProductId(req);
const policy = await repo.getPolicy(req.params.id, productId);
if (!policy) throw new NotFoundError('Retention policy not found');
return policy;
});
// ── Update policy ──────────────────────────────────────────
app.patch<{ Params: { id: string } }>('/retention/policies/:id', async req => {
const userId = requireAdmin(req);
const productId = getRequestProductId(req);
const input = UpdateRetentionPolicySchema.parse(req.body);
const updated = await repo.updatePolicy(req.params.id, productId, {
...input,
updatedBy: userId,
});
if (!updated) throw new NotFoundError('Retention policy not found');
req.log.info({ policyId: req.params.id }, 'Retention policy updated');
return updated;
});
// ── Delete policy ──────────────────────────────────────────
app.delete<{ Params: { id: string } }>('/retention/policies/:id', async (req, reply) => {
requireAdmin(req);
const productId = getRequestProductId(req);
const deleted = await repo.deletePolicy(req.params.id, productId);
if (!deleted) throw new NotFoundError('Retention policy not found');
req.log.info({ policyId: req.params.id }, 'Retention policy deleted');
reply.status(204);
return;
});
// ── Run enforcement (dry-run or execute) ───────────────────
app.post('/retention/enforce', async req => {
const userId = requireAdmin(req);
const productId = getRequestProductId(req);
const { dryRun, policyId } = req.body as { dryRun?: boolean; policyId?: string };
let policies: RetentionPolicyDoc[];
if (policyId) {
const policy = await repo.getPolicy(policyId, productId);
if (!policy) throw new NotFoundError('Retention policy not found');
policies = [policy];
} else {
policies = await repo.getEnabledPolicies(productId);
}
const results: Array<{
policyId: string;
containerName: string;
cutoffDate: string;
wouldPurge: number;
purged: number;
}> = [];
for (const policy of policies) {
const cutoffDate = getCutoffDate(policy.retentionDays);
// In production, would query target container and delete/archive/anonymize.
// For MVP, simulate with count estimation.
const estimatedCount = 0; // Would be a real query in production
if (!dryRun) {
const now = new Date().toISOString();
const job: RetentionJobDoc = {
id: `rjob_${Date.now()}_${Math.random().toString(36).slice(2, 7)}`,
productId,
policyId: policy.id,
containerName: policy.containerName,
status: 'completed',
scannedCount: estimatedCount,
purgedCount: estimatedCount,
errorCount: 0,
error: null,
startedAt: now,
completedAt: now,
};
await repo.createJob(job);
// Update policy last run stats
await repo.updatePolicy(policy.id, productId, {
lastRunAt: now,
lastRunPurgedCount: estimatedCount,
updatedBy: userId,
});
}
results.push({
policyId: policy.id,
containerName: policy.containerName,
cutoffDate,
wouldPurge: estimatedCount,
purged: dryRun ? 0 : estimatedCount,
});
}
req.log.info(
{ productId, userId, dryRun, policyCount: policies.length },
'Retention enforcement executed'
);
return { dryRun: dryRun ?? false, policiesEvaluated: policies.length, results };
});
// ── List jobs (audit trail) ────────────────────────────────
app.get('/retention/jobs', async req => {
requireAdmin(req);
const productId = getRequestProductId(req);
const {
policyId,
status,
limit: limitStr,
} = req.query as {
policyId?: string;
status?: string;
limit?: string;
};
const parsedLimit = limitStr ? parseInt(limitStr, 10) : 20;
const safeLimit =
Number.isFinite(parsedLimit) && parsedLimit > 0 ? Math.min(parsedLimit, 100) : 20;
return repo.listJobs(productId, {
policyId,
status: status as RetentionJobStatus | undefined,
limit: safeLimit,
});
});
// ── Stats ──────────────────────────────────────────────────
app.get('/retention/stats', async req => {
requireAdmin(req);
const productId = getRequestProductId(req);
return repo.getRetentionStats(productId);
});
}

View File

@ -0,0 +1,91 @@
/**
* Data Retention Policies module types and schemas.
* Manages per-product, per-container retention rules, scheduled purge jobs,
* and audit trail of purged data.
*/
import { z } from 'zod';
// ── Retention Types ──────────────────────────────────────────────
export type RetentionAction = 'delete' | 'archive' | 'anonymize';
export type RetentionJobStatus = 'pending' | 'running' | 'completed' | 'failed';
export interface RetentionPolicyDoc {
id: string;
productId: string;
/** Target Cosmos container name */
containerName: string;
/** Human-readable label */
label: string;
/** Retention period in days (e.g. 90 = delete docs older than 90 days) */
retentionDays: number;
/** What to do when retention expires */
action: RetentionAction;
/** Which date field to evaluate (e.g. "createdAt", "updatedAt") */
dateField: string;
/** Optional additional filter (e.g. "status = 'completed'") */
filter: string | null;
/** Whether the policy is actively enforced */
isEnabled: boolean;
/** Cron-like schedule description for when purge runs */
schedule: string;
/** Stats from last run */
lastRunAt: string | null;
lastRunPurgedCount: number;
createdAt: string;
updatedAt: string;
updatedBy: string;
}
export interface RetentionJobDoc {
id: string;
productId: string;
policyId: string;
containerName: string;
status: RetentionJobStatus;
/** Number of documents evaluated */
scannedCount: number;
/** Number of documents purged */
purgedCount: number;
/** Number of documents that failed */
errorCount: number;
/** Error message if job failed */
error: string | null;
startedAt: string;
completedAt: string | null;
}
// ── Schemas ──────────────────────────────────────────────────────
export const CreateRetentionPolicySchema = z.object({
containerName: z.string().min(1).max(128),
label: z.string().min(1).max(200),
retentionDays: z.number().int().min(1).max(3650), // up to 10 years
action: z.enum(['delete', 'archive', 'anonymize']).default('delete'),
dateField: z.string().min(1).max(64).default('createdAt'),
filter: z.string().max(500).nullable().default(null),
isEnabled: z.boolean().default(true),
schedule: z.string().min(1).max(100).default('daily'),
});
export const UpdateRetentionPolicySchema = z.object({
label: z.string().min(1).max(200).optional(),
retentionDays: z.number().int().min(1).max(3650).optional(),
action: z.enum(['delete', 'archive', 'anonymize']).optional(),
dateField: z.string().min(1).max(64).optional(),
filter: z.string().max(500).nullable().optional(),
isEnabled: z.boolean().optional(),
schedule: z.string().min(1).max(100).optional(),
});
export type CreateRetentionPolicyInput = z.infer<typeof CreateRetentionPolicySchema>;
export type UpdateRetentionPolicyInput = z.infer<typeof UpdateRetentionPolicySchema>;
// ── Helpers ──────────────────────────────────────────────────────
export function getCutoffDate(retentionDays: number): string {
const cutoff = new Date();
cutoff.setDate(cutoff.getDate() - retentionDays);
return cutoff.toISOString();
}