feat(analytics): deepen analytics rollups — aggregation, summary dashboard, top metrics

- types.ts: add AggregateRollupsSchema, SummaryQuerySchema, TopMetricsSchema, AnalyticsSummary
- repository.ts: add aggregateDailyToWeekly, aggregateDailyToMonthly (merge daily rollups)
- repository.ts: add getSummary (trend + top metrics over N days), getTopMetrics (per-date)
- routes.ts: 3 new endpoints — POST /analytics/aggregate, GET /analytics/summary, GET /analytics/top-metrics
- analytics.test.ts: 11 new tests (25 total) for aggregate, summary, top-metrics schemas
- Existing 14 tests unchanged
This commit is contained in:
saravanakumardb1 2026-03-20 00:30:07 -07:00
parent b52ace83d0
commit 1efbb9340d
4 changed files with 296 additions and 4 deletions

View File

@ -3,7 +3,14 @@
*/
import { describe, it, expect } from 'vitest';
import { IngestMetricSchema, IngestBatchSchema, QueryRollupsSchema } from './types.js';
import {
IngestMetricSchema,
IngestBatchSchema,
QueryRollupsSchema,
AggregateRollupsSchema,
SummaryQuerySchema,
TopMetricsSchema,
} from './types.js';
import { toDailyKey, toWeeklyKey, toMonthlyKey, getDateKey } from './repository.js';
// ── Schema Validation ────────────────────────────────────────
@ -100,3 +107,78 @@ describe('date key helpers', () => {
expect(getDateKey(d, 'weekly')).toMatch(/^2026-W\d{2}$/);
});
});
// ── Aggregate Rollups Schema ────────────────────────────────
describe('AggregateRollupsSchema', () => {
it('accepts weekly target', () => {
const result = AggregateRollupsSchema.parse({ targetPeriod: 'weekly' });
expect(result.targetPeriod).toBe('weekly');
expect(result.sourceDate).toBeUndefined();
});
it('accepts monthly target with date', () => {
const result = AggregateRollupsSchema.parse({
targetPeriod: 'monthly',
sourceDate: '2026-03-15',
});
expect(result.targetPeriod).toBe('monthly');
expect(result.sourceDate).toBe('2026-03-15');
});
it('rejects daily as target', () => {
expect(() => AggregateRollupsSchema.parse({ targetPeriod: 'daily' })).toThrow();
});
it('rejects missing targetPeriod', () => {
expect(() => AggregateRollupsSchema.parse({})).toThrow();
});
});
// ── Summary Query Schema ────────────────────────────────────
describe('SummaryQuerySchema', () => {
it('defaults to 30 days', () => {
const result = SummaryQuerySchema.parse({});
expect(result.days).toBe(30);
});
it('accepts custom days', () => {
const result = SummaryQuerySchema.parse({ days: '7' });
expect(result.days).toBe(7);
});
it('rejects 0 days', () => {
expect(() => SummaryQuerySchema.parse({ days: 0 })).toThrow();
});
it('rejects > 365 days', () => {
expect(() => SummaryQuerySchema.parse({ days: 400 })).toThrow();
});
});
// ── Top Metrics Schema ──────────────────────────────────────
describe('TopMetricsSchema', () => {
it('accepts defaults', () => {
const result = TopMetricsSchema.parse({});
expect(result.period).toBe('daily');
expect(result.limit).toBe(10);
expect(result.date).toBeUndefined();
});
it('accepts all params', () => {
const result = TopMetricsSchema.parse({
period: 'weekly',
date: '2026-W12',
limit: '5',
});
expect(result.period).toBe('weekly');
expect(result.date).toBe('2026-W12');
expect(result.limit).toBe(5);
});
it('rejects limit > 50', () => {
expect(() => TopMetricsSchema.parse({ limit: 100 })).toThrow();
});
});

View File

@ -3,7 +3,12 @@
*/
import { getRegisteredContainer } from '@bytelyst/cosmos';
import type { AnalyticsRollupDoc, RollupPeriod, IngestMetricInput } from './types.js';
import type {
AnalyticsRollupDoc,
RollupPeriod,
IngestMetricInput,
AnalyticsSummary,
} from './types.js';
function getContainer() {
return getRegisteredContainer('analytics_rollups');
@ -142,3 +147,133 @@ export async function getRollup(
return null;
}
}
// ── Rollup Aggregation (daily → weekly/monthly) ─────────────
export async function aggregateDailyToWeekly(productId: string, dateStr: string): Promise<number> {
const date = new Date(dateStr);
const weekKey = toWeeklyKey(date);
// Find the Monday of this week
const dayOfWeek = date.getDay();
const monday = new Date(date);
monday.setDate(date.getDate() - ((dayOfWeek + 6) % 7));
const dailies = await queryRollups(productId, 'daily', toDailyKey(monday), dateStr);
if (dailies.length === 0) return 0;
const merged: Record<string, number> = {};
for (const d of dailies) {
for (const [k, v] of Object.entries(d.metrics)) {
merged[k] = (merged[k] ?? 0) + v;
}
}
const id = `${productId}:weekly:${weekKey}`;
const now = new Date().toISOString();
const doc: AnalyticsRollupDoc = {
id,
productId,
period: 'weekly',
date: weekKey,
metrics: merged,
createdAt: now,
updatedAt: now,
};
try {
await getContainer().item(id, productId).replace(doc);
} catch {
await getContainer().items.create(doc);
}
return Object.values(merged).reduce((a, b) => a + b, 0);
}
export async function aggregateDailyToMonthly(productId: string, dateStr: string): Promise<number> {
const date = new Date(dateStr);
const monthKey = toMonthlyKey(date);
const firstDay = `${monthKey}-01`;
const dailies = await queryRollups(productId, 'daily', firstDay, dateStr);
if (dailies.length === 0) return 0;
const merged: Record<string, number> = {};
for (const d of dailies) {
for (const [k, v] of Object.entries(d.metrics)) {
merged[k] = (merged[k] ?? 0) + v;
}
}
const id = `${productId}:monthly:${monthKey}`;
const now = new Date().toISOString();
const doc: AnalyticsRollupDoc = {
id,
productId,
period: 'monthly',
date: monthKey,
metrics: merged,
createdAt: now,
updatedAt: now,
};
try {
await getContainer().item(id, productId).replace(doc);
} catch {
await getContainer().items.create(doc);
}
return Object.values(merged).reduce((a, b) => a + b, 0);
}
// ── Summary Dashboard ───────────────────────────────────────
export async function getSummary(productId: string, days: number): Promise<AnalyticsSummary> {
const to = new Date();
const from = new Date();
from.setDate(to.getDate() - days);
const rollups = await queryRollups(productId, 'daily', toDailyKey(from), toDailyKey(to));
const metricTotals: Record<string, number> = {};
const trend: { date: string; total: number }[] = [];
for (const r of rollups) {
let dayTotal = 0;
for (const [k, v] of Object.entries(r.metrics)) {
metricTotals[k] = (metricTotals[k] ?? 0) + v;
dayTotal += v;
}
trend.push({ date: r.date, total: dayTotal });
}
const totalEvents = Object.values(metricTotals).reduce((a, b) => a + b, 0);
const topMetrics = Object.entries(metricTotals)
.map(([metric, total]) => ({ metric, total }))
.sort((a, b) => b.total - a.total)
.slice(0, 10);
return {
productId,
days,
totalEvents,
uniqueMetrics: Object.keys(metricTotals).length,
topMetrics,
trend,
};
}
// ── Top Metrics for a Date ──────────────────────────────────
export async function getTopMetrics(
productId: string,
period: RollupPeriod,
dateKey: string,
limit: number
): Promise<{ metric: string; value: number }[]> {
const rollup = await getRollup(productId, period, dateKey);
if (!rollup) return [];
return Object.entries(rollup.metrics)
.map(([metric, value]) => ({ metric, value }))
.sort((a, b) => b.value - a.value)
.slice(0, limit);
}

View File

@ -6,8 +6,24 @@
import type { FastifyInstance } from 'fastify';
import { UnauthorizedError, ForbiddenError } from '../../lib/errors.js';
import { getRequestProductId } from '../../lib/request-context.js';
import { IngestMetricSchema, IngestBatchSchema, QueryRollupsSchema } from './types.js';
import { ingestMetric, ingestBatch, queryRollups } from './repository.js';
import {
IngestMetricSchema,
IngestBatchSchema,
QueryRollupsSchema,
AggregateRollupsSchema,
SummaryQuerySchema,
TopMetricsSchema,
} from './types.js';
import {
ingestMetric,
ingestBatch,
queryRollups,
aggregateDailyToWeekly,
aggregateDailyToMonthly,
getSummary,
getTopMetrics,
toDailyKey,
} from './repository.js';
function requireAuth(req: { jwtPayload?: { sub: string; role?: string } }): string {
if (!req.jwtPayload?.sub) throw new UnauthorizedError('Authentication required');
@ -47,4 +63,36 @@ export async function analyticsRoutes(app: FastifyInstance): Promise<void> {
const { period, from, to, metric } = QueryRollupsSchema.parse(req.query);
return queryRollups(productId, period, from, to, metric);
});
// ── Admin: Trigger rollup aggregation ───────────────────
app.post('/analytics/aggregate', async req => {
requireAdmin(req);
const productId = getRequestProductId(req);
const { targetPeriod, sourceDate } = AggregateRollupsSchema.parse(req.body);
const date = sourceDate ?? toDailyKey(new Date(Date.now() - 86_400_000));
const totalEvents =
targetPeriod === 'weekly'
? await aggregateDailyToWeekly(productId, date)
: await aggregateDailyToMonthly(productId, date);
return { targetPeriod, sourceDate: date, totalEvents };
});
// ── Admin: Summary dashboard ────────────────────────────
app.get('/analytics/summary', async req => {
requireAdmin(req);
const productId = getRequestProductId(req);
const { days } = SummaryQuerySchema.parse(req.query);
return getSummary(productId, days);
});
// ── Admin: Top metrics for a date ───────────────────────
app.get('/analytics/top-metrics', async req => {
requireAdmin(req);
const productId = getRequestProductId(req);
const { period, date, limit } = TopMetricsSchema.parse(req.query);
const dateKey = date ?? toDailyKey(new Date());
return getTopMetrics(productId, period, dateKey, limit);
});
}

View File

@ -47,6 +47,33 @@ export const QueryRollupsSchema = z.object({
metric: z.string().optional(), // filter to specific metric
});
export const AggregateRollupsSchema = z.object({
targetPeriod: z.enum(['weekly', 'monthly']),
sourceDate: z.string().optional(), // YYYY-MM-DD to aggregate from; defaults to yesterday
});
export const SummaryQuerySchema = z.object({
days: z.coerce.number().int().min(1).max(365).default(30),
});
export const TopMetricsSchema = z.object({
period: z.enum(['daily', 'weekly', 'monthly']).default('daily'),
date: z.string().optional(), // specific date key; defaults to today
limit: z.coerce.number().int().min(1).max(50).default(10),
});
export interface AnalyticsSummary {
productId: string;
days: number;
totalEvents: number;
uniqueMetrics: number;
topMetrics: { metric: string; total: number }[];
trend: { date: string; total: number }[];
}
export type IngestMetricInput = z.infer<typeof IngestMetricSchema>;
export type IngestBatchInput = z.infer<typeof IngestBatchSchema>;
export type QueryRollupsInput = z.infer<typeof QueryRollupsSchema>;
export type AggregateRollupsInput = z.infer<typeof AggregateRollupsSchema>;
export type SummaryQueryInput = z.infer<typeof SummaryQuerySchema>;
export type TopMetricsInput = z.infer<typeof TopMetricsSchema>;