feat(telemetry): add telemetry module — ingest, config, query, clusters, policies (34 tests)

This commit is contained in:
saravanakumardb1 2026-02-17 09:06:43 -08:00
parent 083cf029c1
commit ce4c4ff53d
6 changed files with 1368 additions and 0 deletions

View File

@ -34,6 +34,10 @@ const CONTAINER_DEFS: Record<string, ContainerConfig> = {
daily_briefs: { partitionKeyPath: '/userId' },
reflections: { partitionKeyPath: '/userId' },
brain_insights: { partitionKeyPath: '/userId' },
// Telemetry (client diagnostics — see docs/WINDSURF/CLIENT_TELEMETRY_DESIGN.md)
telemetry_events: { partitionKeyPath: '/pk', defaultTtl: 30 * 86400 },
telemetry_error_clusters: { partitionKeyPath: '/pk', defaultTtl: 90 * 86400 },
telemetry_collection_policies: { partitionKeyPath: '/productId' },
};
export async function initCosmosIfNeeded(): Promise<void> {

View File

@ -0,0 +1,254 @@
/**
* Telemetry repository — Cosmos DB CRUD for events, policies, and clusters.
*/
import { getContainer } from '../../lib/cosmos.js';
import type {
TelemetryEventDoc,
TelemetryCollectionPolicyDoc,
TelemetryErrorCluster,
TelemetryQueryInput,
} from './types.js';
// ─── Container accessors ────────────────────────────────────────────
/** Resolves the Cosmos container handle registered as `telemetry_events`. */
function eventsContainer() {
  const containerName = 'telemetry_events';
  return getContainer(containerName);
}
/** Resolves the Cosmos container handle registered as `telemetry_collection_policies`. */
function policiesContainer() {
  const containerName = 'telemetry_collection_policies';
  return getContainer(containerName);
}
/** Resolves the Cosmos container handle registered as `telemetry_error_clusters`. */
function clustersContainer() {
  const containerName = 'telemetry_error_clusters';
  return getContainer(containerName);
}
// ─── Events ─────────────────────────────────────────────────────────
/**
 * Writes a single telemetry event with upsert semantics.
 *
 * @param doc Event document to store in the events container.
 */
export async function upsertEvent(doc: TelemetryEventDoc): Promise<void> {
  const container = eventsContainer();
  await container.items.upsert(doc);
}
/**
 * Upserts every document in `docs`, issuing all writes concurrently and
 * resolving once all of them have settled successfully. Rejects with the
 * first failure (`Promise.all` semantics).
 *
 * @param docs Event documents to store.
 */
export async function upsertEventsBatch(docs: TelemetryEventDoc[]): Promise<void> {
  // Cosmos DB doesn't have native batch across partitions.
  // Each document is upserted on its own; for v1 this is acceptable.
  // TODO: Group by pk and use bulk operations for same-partition batches.
  const pending: Array<Promise<unknown>> = [];
  for (const doc of docs) {
    pending.push(eventsContainer().items.upsert(doc));
  }
  await Promise.all(pending);
}
/**
 * Runs a filtered, paginated query over telemetry events of one product.
 *
 * Every truthy filter on `input` becomes an `AND`-ed, parameterised condition;
 * results are ordered by `occurredAt` descending and a single page is fetched.
 *
 * @param productId Product whose events are queried (always applied).
 * @param input Optional filters plus `limit` (page size) and `continuationToken`.
 * @returns The fetched page and, when present, the token for the next page.
 */
export async function queryEvents(
  productId: string,
  input: TelemetryQueryInput
): Promise<{ events: TelemetryEventDoc[]; continuationToken?: string }> {
  // Filters that translate to `c.<field> = @<field>`, in the order they are emitted.
  const equalityFilters = [
    'userId',
    'anonymousInstallId',
    'platform',
    'channel',
    'osFamily',
    'appVersion',
    'buildNumber',
    'module',
    'eventName',
    'eventType',
  ] as const;

  const conditions: string[] = ['c.productId = @productId'];
  const parameters: Array<{ name: string; value: string | number | boolean }> = [
    { name: '@productId', value: productId },
  ];
  const addFilter = (clause: string, name: string, value: string | number | boolean): void => {
    conditions.push(clause);
    parameters.push({ name, value });
  };

  for (const field of equalityFilters) {
    const value = input[field];
    if (value) {
      addFilter(`c.${field} = @${field}`, `@${field}`, value);
    }
  }
  // Time window bounds are inclusive on both ends.
  if (input.from) {
    addFilter('c.occurredAt >= @from', '@from', input.from);
  }
  if (input.to) {
    addFilter('c.occurredAt <= @to', '@to', input.to);
  }

  const query = `SELECT * FROM c WHERE ${conditions.join(' AND ')} ORDER BY c.occurredAt DESC`;
  const page = await eventsContainer()
    .items.query<TelemetryEventDoc>(
      { query, parameters },
      {
        maxItemCount: input.limit,
        continuationToken: input.continuationToken || undefined,
      }
    )
    .fetchNext();

  return {
    events: page.resources,
    continuationToken: page.continuationToken || undefined,
  };
}
/**
 * Deletes every stored event belonging to `userId` within `productId`.
 *
 * Matching documents are looked up first (id + partition key), then removed one
 * at a time. A failed delete is skipped rather than aborting the whole run.
 *
 * @returns The number of documents whose delete call succeeded.
 */
export async function deleteEventsByUserId(productId: string, userId: string): Promise<number> {
  const lookup = {
    query: 'SELECT c.id, c.pk FROM c WHERE c.productId = @productId AND c.userId = @userId',
    parameters: [
      { name: '@productId', value: productId },
      { name: '@userId', value: userId },
    ],
  };
  const { resources: matches } = await eventsContainer()
    .items.query<{ id: string; pk: string }>(lookup)
    .fetchAll();

  // Best-effort single delete: reports success/failure instead of throwing.
  const tryDelete = async (id: string, pk: string): Promise<boolean> => {
    try {
      await eventsContainer().item(id, pk).delete();
      return true;
    } catch {
      // Skip docs that fail to delete (already deleted, etc.)
      return false;
    }
  };

  let deleted = 0;
  for (const { id, pk } of matches) {
    if (await tryDelete(id, pk)) {
      deleted += 1;
    }
  }
  return deleted;
}
// ─── Policies ───────────────────────────────────────────────────────
/**
 * Fetches all collection policies of a product, highest `priority` first.
 *
 * @param productId Product whose policies are listed.
 */
export async function listPolicies(productId: string): Promise<TelemetryCollectionPolicyDoc[]> {
  const spec = {
    query: 'SELECT * FROM c WHERE c.productId = @productId ORDER BY c.priority DESC',
    parameters: [{ name: '@productId', value: productId }],
  };
  const result = await policiesContainer()
    .items.query<TelemetryCollectionPolicyDoc>(spec)
    .fetchAll();
  return result.resources;
}
/**
 * Point-reads one collection policy.
 *
 * @param id Policy document id.
 * @param productId Partition key value of the policy.
 * @returns The policy, or `null` when it does not exist.
 * @throws Any failure other than "not found" (HTTP 404). Previously every
 *   error was reported as `null`, which made throttling or connectivity
 *   problems indistinguishable from a missing policy.
 */
export async function getPolicy(
  id: string,
  productId: string
): Promise<TelemetryCollectionPolicyDoc | null> {
  try {
    const { resource } = await policiesContainer()
      .item(id, productId)
      .read<TelemetryCollectionPolicyDoc>();
    return resource ?? null;
  } catch (err: unknown) {
    // Cosmos errors expose the HTTP status as `code`; `statusCode` is checked too for safety.
    const { code, statusCode } = (err ?? {}) as { code?: unknown; statusCode?: unknown };
    if (code === 404 || statusCode === 404) return null;
    throw err;
  }
}
/**
 * Inserts a new collection policy document.
 *
 * @param doc Fully populated policy document.
 * @returns The `resource` of the create response, cast to the policy type.
 */
export async function createPolicy(
  doc: TelemetryCollectionPolicyDoc
): Promise<TelemetryCollectionPolicyDoc> {
  const response = await policiesContainer().items.create(doc);
  return response.resource as TelemetryCollectionPolicyDoc;
}
/**
 * Applies a partial update to an existing policy (read, merge, replace).
 *
 * @param id Policy document id.
 * @param productId Partition key value of the policy.
 * @param updates Fields to overwrite; `updatedAt` is always refreshed.
 * @returns The replaced document, or `null` when the policy does not exist.
 * @throws Any failure other than "not found" (HTTP 404). Previously all errors
 *   were turned into `null`, so callers reported throttling or server errors
 *   as "policy not found".
 *
 * NOTE(review): the read and the replace are not tied together by an ETag
 * precondition, so concurrent updates can overwrite each other — confirm
 * whether that is acceptable.
 */
export async function updatePolicy(
  id: string,
  productId: string,
  updates: Partial<TelemetryCollectionPolicyDoc>
): Promise<TelemetryCollectionPolicyDoc | null> {
  try {
    const item = policiesContainer().item(id, productId);
    const { resource: existing } = await item.read<TelemetryCollectionPolicyDoc>();
    if (!existing) return null;
    const merged = {
      ...existing,
      ...updates,
      // The document key must never be rewritten by a patch payload.
      id: existing.id,
      productId: existing.productId,
      updatedAt: new Date().toISOString(),
    };
    const { resource } = await item.replace(merged);
    return resource as TelemetryCollectionPolicyDoc;
  } catch (err: unknown) {
    // Only a genuine 404 means "no such policy"; everything else must surface.
    const { code, statusCode } = (err ?? {}) as { code?: unknown; statusCode?: unknown };
    if (code === 404 || statusCode === 404) return null;
    throw err;
  }
}
/**
 * Deletes one collection policy.
 *
 * @param id Policy document id.
 * @param productId Partition key value of the policy.
 * @returns `true` when the delete succeeded, `false` when the policy does not exist.
 * @throws Any failure other than "not found" (HTTP 404), so that transient or
 *   server-side errors are no longer reported as a missing policy.
 */
export async function deletePolicy(id: string, productId: string): Promise<boolean> {
  try {
    await policiesContainer().item(id, productId).delete();
    return true;
  } catch (err: unknown) {
    const { code, statusCode } = (err ?? {}) as { code?: unknown; statusCode?: unknown };
    if (code === 404 || statusCode === 404) return false;
    throw err;
  }
}
// ─── Clusters ───────────────────────────────────────────────────────
/**
 * Lists error clusters of a product, most frequent (`totalCount`) first.
 *
 * @param productId Product whose clusters are listed (always applied).
 * @param filters Optional equality filters (`platform`, `module`), an inclusive
 *   `lastSeenAt` window (`from` / `to`) and a `limit` on the number of rows.
 *   `limit` is spliced into the query text as `TOP <n>`, so it is sanitised
 *   first: fractions are truncated, and anything that is not a non-negative
 *   safe integer (NaN, Infinity, negatives) falls back to the default of 50.
 * @returns The matching clusters.
 */
export async function listClusters(
  productId: string,
  filters?: {
    platform?: string;
    module?: string;
    from?: string;
    to?: string;
    limit?: number;
  }
): Promise<TelemetryErrorCluster[]> {
  const DEFAULT_LIMIT = 50;

  const conditions: string[] = ['c.productId = @productId'];
  const parameters: Array<{ name: string; value: string | number | boolean }> = [
    { name: '@productId', value: productId },
  ];
  if (filters?.platform) {
    conditions.push('c.platform = @platform');
    parameters.push({ name: '@platform', value: filters.platform });
  }
  if (filters?.module) {
    conditions.push('c.module = @module');
    parameters.push({ name: '@module', value: filters.module });
  }
  if (filters?.from) {
    conditions.push('c.lastSeenAt >= @from');
    parameters.push({ name: '@from', value: filters.from });
  }
  if (filters?.to) {
    conditions.push('c.lastSeenAt <= @to');
    parameters.push({ name: '@to', value: filters.to });
  }

  // The route layer parses `limit` with parseInt, which yields NaN for
  // non-numeric input; never let such a value reach the query text.
  const requested = filters?.limit;
  const truncated = typeof requested === 'number' ? Math.trunc(requested) : NaN;
  const limit = Number.isSafeInteger(truncated) && truncated >= 0 ? truncated : DEFAULT_LIMIT;

  const query = `SELECT TOP ${limit} * FROM c WHERE ${conditions.join(' AND ')} ORDER BY c.totalCount DESC`;
  const { resources } = await clustersContainer()
    .items.query<TelemetryErrorCluster>({ query, parameters })
    .fetchAll();
  return resources;
}
/**
 * Writes an error cluster document with upsert semantics.
 *
 * @param cluster Cluster document to store.
 */
export async function upsertCluster(cluster: TelemetryErrorCluster): Promise<void> {
  const container = clustersContainer();
  await container.items.upsert(cluster);
}
/**
 * Point-reads one error cluster.
 *
 * @param id Cluster document id.
 * @param pk Partition key value of the cluster.
 * @returns The cluster, or `null` when it does not exist.
 * @throws Any failure other than "not found" (HTTP 404).
 *
 * Why errors are no longer swallowed: the cluster updater treats `null` as
 * "no cluster yet" and upserts a brand-new document with `totalCount: 1`.
 * Reporting a throttled or failed read as `null` therefore overwrote the
 * existing aggregate. Propagating the error lets the caller skip the update.
 */
export async function getCluster(id: string, pk: string): Promise<TelemetryErrorCluster | null> {
  try {
    const { resource } = await clustersContainer().item(id, pk).read<TelemetryErrorCluster>();
    return resource ?? null;
  } catch (err: unknown) {
    const { code, statusCode } = (err ?? {}) as { code?: unknown; statusCode?: unknown };
    if (code === 404 || statusCode === 404) return null;
    throw err;
  }
}

View File

@ -0,0 +1,535 @@
/**
* Telemetry REST endpoints.
*
* POST   /telemetry/events        — batch ingest (JWT or X-Install-Token)
* GET    /telemetry/config        — collection config for clients
* GET    /telemetry/query         — query events (admin)
* GET    /telemetry/clusters      — list error clusters (admin)
* GET    /telemetry/policies      — list policies (admin)
* POST   /telemetry/policies      — create policy (admin)
* PUT    /telemetry/policies/:id  — update policy (admin)
* DELETE /telemetry/policies/:id  — delete policy (admin)
* DELETE /telemetry/user/:userId  — GDPR erasure (admin)
*/
import type { FastifyInstance } from 'fastify';
import { randomUUID } from 'node:crypto';
import { createHash } from 'node:crypto';
import { getRequestProductId } from '../../lib/request-context.js';
import { BadRequestError, NotFoundError, UnauthorizedError } from '../../lib/errors.js';
import { hashUserFlag } from '../flags/routes.js';
import * as repo from './repository.js';
import {
TelemetryIngestRequestSchema,
CreatePolicySchema,
UpdatePolicySchema,
TelemetryQuerySchema,
type TelemetryEventDoc,
type TelemetryCollectionPolicyDoc,
type TelemetryCollectionConfig,
type TelemetryErrorCluster,
} from './types.js';
// ─── Helpers ────────────────────────────────────────────────────────
/**
 * Reads an integer from the environment.
 *
 * @param name Environment variable name.
 * @param fallback Value used when the variable is unset, empty, or does not
 *   start with a number (i.e. `parseInt` yields NaN).
 * @returns The parsed integer, or `fallback`.
 */
function readIntEnv(name: string, fallback: number): number {
  const parsed = parseInt(process.env[name] ?? '', 10);
  return Number.isNaN(parsed) ? fallback : parsed;
}

// Module configuration, read once at load time. A malformed numeric value no
// longer produces NaN (which would flow into document TTLs and client config);
// the documented default is used instead.

/** Days before a stored event expires (converted to a `ttl` in seconds on write). */
const DEFAULT_EVENT_TTL_DAYS = readIntEnv('TELEMETRY_EVENT_TTL_DAYS', 30);
/** Days before a newly created error cluster expires. */
const DEFAULT_CLUSTER_TTL_DAYS = readIntEnv('TELEMETRY_CLUSTER_TTL_DAYS', 90);
/** Master switch; only the exact string 'false' disables telemetry. */
const TELEMETRY_ENABLED = process.env.TELEMETRY_ENABLED !== 'false';
/** PII scanning switch; only the exact string 'false' disables it. */
const PII_SCAN_ENABLED = process.env.TELEMETRY_PII_SCAN_ENABLED !== 'false';
/** Client batching hints returned in the collection config. */
const CLIENT_BATCH_SIZE = readIntEnv('TELEMETRY_CLIENT_BATCH_SIZE', 20);
const CLIENT_FLUSH_MS = readIntEnv('TELEMETRY_CLIENT_FLUSH_MS', 60000);
const CLIENT_MAX_QUEUE = readIntEnv('TELEMETRY_CLIENT_MAX_QUEUE', 200);
// Individual detectors, named so each heuristic is self-describing.
const PII_EMAIL_PATTERN = /\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z]{2,}\b/i;
const PII_US_PHONE_PATTERN = /\b\d{3}[-.]?\d{3}[-.]?\d{4}\b/;
const PII_CREDIT_CARD_PATTERN = /\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b/;
const PII_SSN_PATTERN = /\b\d{3}-\d{2}-\d{4}\b/;

/**
 * PII patterns — events containing a match for any of these are rejected.
 * Order: email, US phone, credit card, SSN.
 */
const PII_PATTERNS = [
  PII_EMAIL_PATTERN,
  PII_US_PHONE_PATTERN,
  PII_CREDIT_CARD_PATTERN,
  PII_SSN_PATTERN,
];
/**
 * Reports whether `text` matches any of the PII patterns.
 * Always returns false when PII scanning is switched off.
 */
function containsPII(text: string): boolean {
  if (PII_SCAN_ENABLED) {
    for (const pattern of PII_PATTERNS) {
      if (pattern.test(text)) return true;
    }
  }
  return false;
}
/**
 * Builds the event partition key `<productId>:<yyyyMM>:<platform>`, where
 * `yyyyMM` is the UTC year and zero-padded UTC month of `occurredAt`.
 *
 * @param productId Product the event belongs to.
 * @param occurredAt Timestamp string accepted by the `Date` constructor.
 * @param platform Platform identifier of the event.
 */
function computePk(productId: string, occurredAt: string, platform: string): string {
  const when = new Date(occurredAt);
  const year = when.getUTCFullYear();
  const month = String(when.getUTCMonth() + 1).padStart(2, '0');
  return [productId, `${year}${month}`, platform].join(':');
}
/**
 * Reduces a message to a stable shape for fingerprinting by masking variable
 * parts, then lower-casing and trimming.
 *
 * Steps, in order: UUIDs become `<UUID>`, digit runs become `<N>`, and
 * slash-prefixed runs of word characters, `/` or `.` become `<PATH>`. Because
 * the result is lower-cased last, the placeholders appear as `<uuid>`, `<n>`
 * and `<path>` in the output.
 *
 * NOTE(review): digits are masked before paths, so a path containing digits
 * is split around the `<N>` placeholder (e.g. `/users/42/x` becomes
 * `<path><n><path>`). Changing the order would change stored fingerprints.
 */
function normalizeMessage(msg: string): string {
  const uuidPattern = /[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}/gi;
  const digitRun = /\d+/g;
  const pathLike = /\/[\w/.]+/g;

  let masked = msg.replace(uuidPattern, '<UUID>');
  masked = masked.replace(digitRun, '<N>');
  masked = masked.replace(pathLike, '<PATH>');
  return masked.toLowerCase().trim();
}
/**
 * Derives a 16-hex-character fingerprint for an event.
 *
 * The fingerprint is the start of the SHA-256 digest of the colon-joined
 * platform, channel, module, event name, error domain, error code and
 * normalized message (missing optional parts contribute an empty string).
 */
function generateFingerprint(event: TelemetryEventDoc): string {
  const parts = [
    event.platform,
    event.channel,
    event.module,
    event.eventName,
    event.errorDomain ?? '',
    event.errorCode ?? '',
    normalizeMessage(event.message ?? ''),
  ];
  const hasher = createHash('sha256');
  hasher.update(parts.join(':'));
  return hasher.digest('hex').slice(0, 16);
}
/**
 * Guards admin-only handlers.
 *
 * @param req Request carrying the decoded JWT payload, if any.
 * @throws UnauthorizedError when the JWT role is anything other than `admin`
 *   (including when there is no JWT payload at all).
 */
function requireAdmin(req: { jwtPayload?: { role?: string } }): void {
  const role = req.jwtPayload?.role;
  if (role === 'admin') return;
  throw new UnauthorizedError('Admin access required');
}
// ─── Policy evaluation helpers ──────────────────────────────────────
/**
 * Attributes a client reports about itself when polling for its collection
 * config. The config route fills this directly from the request query string
 * (unvalidated cast), and policy targeting is evaluated against it. Every
 * field is optional; a targeting rule is only applied when the corresponding
 * field is present.
 */
interface ClientContext {
// Identity — either may be used as the rollout-bucketing identity.
userId?: string;
anonymousInstallId?: string;
// Compared against the policy's platforms / channels / osFamilies lists.
platform?: string;
channel?: string;
osFamily?: string;
// Compared against appVersions / buildNumbers; buildNumber is also parsed as
// an integer for buildNumberRange checks.
appVersion?: string;
buildNumber?: string;
releaseChannel?: string;
// Compared against countryCodes / regionCodes.
countryCode?: string;
regionCode?: string;
}
/**
 * Decides whether a collection policy applies to the given client context.
 *
 * A policy matches when all of the following hold:
 *  - the current time is inside the optional `startsAt` / `expiresAt` window
 *    (ISO strings are compared lexicographically);
 *  - for every non-empty targeting list, the client's value is in the list;
 *  - the client's build number, if numeric, is inside `buildNumberRange`;
 *  - for a rollout below 100 %, the client has an identity whose hash bucket
 *    is below the percentage.
 *
 * NOTE(review): list and range rules are skipped when the client did not send
 * the corresponding value, so e.g. a policy targeting specific `userIds` also
 * matches clients that send no `userId` — confirm this is intended.
 */
function policyMatchesContext(policy: TelemetryCollectionPolicyDoc, ctx: ClientContext): boolean {
  const targeting = policy.targeting;

  // Time window
  const nowIso = new Date().toISOString();
  const { startsAt, expiresAt } = policy;
  if (startsAt && nowIso < startsAt) return false;
  if (expiresAt && nowIso > expiresAt) return false;

  // A list rule excludes the client only when both the list and the client's
  // value are present and the value is not a member of the list.
  const isExcluded = (
    allowed: readonly string[] | null | undefined,
    actual: string | undefined
  ): boolean => {
    if (!allowed || allowed.length === 0) return false;
    if (!actual) return false;
    return !allowed.includes(actual);
  };
  const listRules: Array<[readonly string[] | null | undefined, string | undefined]> = [
    [targeting.userIds, ctx.userId],
    [targeting.anonymousInstallIds, ctx.anonymousInstallId],
    [targeting.platforms, ctx.platform],
    [targeting.channels, ctx.channel],
    [targeting.osFamilies, ctx.osFamily],
    [targeting.appVersions, ctx.appVersion],
    [targeting.buildNumbers, ctx.buildNumber],
    [targeting.countryCodes, ctx.countryCode],
    [targeting.regionCodes, ctx.regionCode],
    [targeting.releaseChannels, ctx.releaseChannel],
  ];
  if (listRules.some(([allowed, actual]) => isExcluded(allowed, actual))) return false;

  // Build number range (ignored when the build number is not numeric)
  const range = targeting.buildNumberRange;
  if (range && ctx.buildNumber) {
    const build = parseInt(ctx.buildNumber, 10);
    if (!isNaN(build)) {
      if (range.min !== undefined && build < range.min) return false;
      if (range.max !== undefined && build > range.max) return false;
    }
  }

  // Percentage rollout (deterministic per identity and policy)
  const percentage = targeting.percentage;
  if (percentage !== undefined && percentage < 100) {
    const identity = ctx.userId || ctx.anonymousInstallId || '';
    if (!identity) return false;
    const bucket = hashUserFlag(identity, `telemetry_policy_${policy.id}`);
    if (bucket >= percentage) return false;
  }

  return true;
}
/**
 * Collapses the policies that apply to a client into one collection config.
 *
 * @param policies Matching policies, expected in descending priority order
 *   (the order in which the repository returns them).
 * @returns With no policies: the built-in default (warn/error/fatal, warn
 *   sampled at 50 %). Otherwise: the union of all event types and modules,
 *   and per event type the sampling rate of the highest-priority policy that
 *   lists it. Event types listed by no policy keep a rate of 0.
 *
 * Fix: the previous implementation used "rate is still 0" to mean "not yet
 * assigned", so a high-priority policy with `samplingRate: 0` was silently
 * overridden by a lower-priority one. Assignment is now tracked explicitly.
 */
function mergePolicies(policies: TelemetryCollectionPolicyDoc[]): TelemetryCollectionConfig {
  if (policies.length === 0) {
    // Hardcoded default
    return {
      enabled: TELEMETRY_ENABLED,
      eventTypes: ['warn', 'error', 'fatal'],
      modules: [],
      samplingRates: { debug: 0, info: 0, warn: 0.5, error: 1, fatal: 1 },
      batchSize: CLIENT_BATCH_SIZE,
      flushIntervalMs: CLIENT_FLUSH_MS,
      maxQueueSize: CLIENT_MAX_QUEUE,
    };
  }

  // Policies are already sorted by priority DESC
  const eventTypesSet = new Set<string>();
  const modulesSet = new Set<string>();
  const rates: Record<string, number> = { debug: 0, info: 0, warn: 0, error: 0, fatal: 0 };
  // Event types whose rate has been fixed by a higher-priority policy.
  const rateAssigned = new Set<string>();
  let enabled = TELEMETRY_ENABLED;

  for (const p of policies) {
    // Union event types
    for (const et of p.eventTypes) {
      eventTypesSet.add(et);
      // Highest-priority policy's rate wins per event type, even when it is 0
      if (!rateAssigned.has(et)) {
        rateAssigned.add(et);
        rates[et] = p.samplingRate;
      }
    }
    // Union modules
    for (const m of p.modules) {
      modulesSet.add(m);
    }
    // A priority >= 999 policy with samplingRate 0 acts as a kill switch
    if (p.priority >= 999 && p.samplingRate === 0) {
      enabled = false;
    }
  }

  return {
    enabled,
    eventTypes: Array.from(eventTypesSet),
    modules: Array.from(modulesSet),
    samplingRates: {
      debug: rates.debug,
      info: rates.info,
      warn: rates.warn,
      error: rates.error,
      fatal: rates.fatal,
    },
    batchSize: CLIENT_BATCH_SIZE,
    flushIntervalMs: CLIENT_FLUSH_MS,
    maxQueueSize: CLIENT_MAX_QUEUE,
  };
}
// ─── Cluster update ─────────────────────────────────────────────────
/**
 * Folds one warn/error/fatal event into its monthly error cluster.
 *
 * The cluster id is `<fingerprint>:<yyyyMM>` (UTC month of `occurredAt`) and
 * its partition key is `<productId>:<platform>:<module>`. An existing cluster
 * gets its counters, samples, affected users/installs/OS families, version
 * breakdown and severity updated; otherwise a new cluster is created.
 * Events of any other type are ignored.
 *
 * Fix: an event carrying both `userId` and `anonymousInstallId` used to add
 * the *user id* to `affectedInstallIds` (a shared `uid` variable resolved to
 * the user id first). Each list now receives its own identifier.
 *
 * NOTE(review): this is a read-modify-write without concurrency control;
 * events processed in parallel for the same cluster can lose increments.
 */
async function updateClusterForEvent(event: TelemetryEventDoc): Promise<void> {
  if (!['warn', 'error', 'fatal'].includes(event.eventType)) return;

  const fingerprint = event.fingerprint || generateFingerprint(event);
  const date = new Date(event.occurredAt);
  const yyyyMM = `${date.getUTCFullYear()}${String(date.getUTCMonth() + 1).padStart(2, '0')}`;
  const clusterId = `${fingerprint}:${yyyyMM}`;
  const pk = `${event.productId}:${event.platform}:${event.module}`;

  const existing = await repo.getCluster(clusterId, pk);
  if (existing) {
    existing.totalCount++;
    existing.lastSeenAt = event.receivedAt;
    // The most recent event always becomes the cluster's sample.
    existing.sampleMessage = event.message;
    existing.sampleErrorDomain = event.errorDomain;
    existing.sampleErrorCode = event.errorCode;

    // Update affected users and installs (dedup, cap 100 each)
    const { userId, anonymousInstallId } = event;
    if (
      userId &&
      !existing.affectedUserIds.includes(userId) &&
      existing.affectedUserIds.length < 100
    ) {
      existing.affectedUserIds.push(userId);
    }
    if (
      anonymousInstallId &&
      !existing.affectedInstallIds.includes(anonymousInstallId) &&
      existing.affectedInstallIds.length < 100
    ) {
      existing.affectedInstallIds.push(anonymousInstallId);
    }
    if (!existing.affectedOsFamilies.includes(event.osFamily)) {
      existing.affectedOsFamilies.push(event.osFamily);
    }

    // Update version breakdown (cap 50 distinct versions)
    const vEntry = existing.affectedVersions.find(
      v => v.appVersion === event.appVersion && v.buildNumber === event.buildNumber
    );
    if (vEntry) {
      vEntry.count++;
      vEntry.lastSeenAt = event.receivedAt;
    } else if (existing.affectedVersions.length < 50) {
      existing.affectedVersions.push({
        appVersion: event.appVersion,
        buildNumber: event.buildNumber,
        count: 1,
        lastSeenAt: event.receivedAt,
      });
    }

    // Escalate severity (never downgrade)
    const severityOrder = { warn: 0, error: 1, fatal: 2 };
    const eventSev = event.eventType as 'warn' | 'error' | 'fatal';
    if ((severityOrder[eventSev] ?? 0) > (severityOrder[existing.severity] ?? 0)) {
      existing.severity = eventSev;
    }

    await repo.upsertCluster(existing);
  } else {
    const newCluster: TelemetryErrorCluster = {
      id: clusterId,
      pk,
      productId: event.productId,
      fingerprint,
      platform: event.platform,
      channel: event.channel,
      module: event.module,
      eventName: event.eventName,
      affectedVersions: [
        {
          appVersion: event.appVersion,
          buildNumber: event.buildNumber,
          count: 1,
          lastSeenAt: event.receivedAt,
        },
      ],
      firstSeenAt: event.receivedAt,
      lastSeenAt: event.receivedAt,
      totalCount: 1,
      affectedUserIds: event.userId ? [event.userId] : [],
      affectedInstallIds: event.anonymousInstallId ? [event.anonymousInstallId] : [],
      affectedOsFamilies: [event.osFamily],
      sampleErrorDomain: event.errorDomain,
      sampleErrorCode: event.errorCode,
      sampleMessage: event.message,
      severity: event.eventType as 'warn' | 'error' | 'fatal',
      ttl: DEFAULT_CLUSTER_TTL_DAYS * 86400,
    };
    await repo.upsertCluster(newCluster);
  }
}
// ─── Routes ─────────────────────────────────────────────────────────
/**
 * Registers all telemetry HTTP routes on the given Fastify instance:
 * batch ingest, client collection config, and the admin endpoints for
 * querying events, listing error clusters, managing collection policies and
 * erasing a user's events.
 *
 * Admin endpoints call `requireAdmin` first, which throws unless the JWT role
 * is `admin`. Request bodies and the event query string are validated with
 * the Zod schemas from `./types.js`; validation failures become a
 * `BadRequestError` carrying the joined issue messages.
 *
 * @param app Fastify instance to register the routes on.
 */
export async function telemetryRoutes(app: FastifyInstance) {
// ── Batch ingest ──────────────────────────────────────────────
// Validates the batch, rejects individual events (productId mismatch, PII),
// stores the accepted ones and schedules cluster updates. Responds 200 when
// at least one event was accepted, 400 otherwise.
app.post('/telemetry/events', async (req, reply) => {
// Telemetry switched off: acknowledge without storing anything.
if (!TELEMETRY_ENABLED) {
return { accepted: 0, rejected: 0, serverTime: new Date().toISOString() };
}
// Auth: JWT or X-Install-Token
// NOTE(review): only the presence of the X-Install-Token header is checked
// here; its value is not verified in this handler — confirm it is validated upstream.
const installToken = req.headers['x-install-token'] as string | undefined;
if (!req.jwtPayload && !installToken) {
throw new UnauthorizedError('JWT or X-Install-Token required');
}
const parsed = TelemetryIngestRequestSchema.safeParse(req.body);
if (!parsed.success) {
throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; '));
}
const { productId, events } = parsed.data;
// One receive timestamp and TTL (in seconds) shared by the whole batch.
const now = new Date().toISOString();
const ttl = DEFAULT_EVENT_TTL_DAYS * 86400;
let accepted = 0;
let rejected = 0;
// Per-event rejection reasons, keyed by the event's index in the request.
const errors: Array<{ index: number; reason: string }> = [];
const docsToInsert: TelemetryEventDoc[] = [];
for (let i = 0; i < events.length; i++) {
const event = events[i];
// Validate productId matches request-level
if (event.productId !== productId) {
errors.push({ index: i, reason: `productId mismatch: expected ${productId}` });
rejected++;
continue;
}
// PII scan
// Only message, eventName and errorDomain are scanned; absent fields are skipped.
const fieldsToScan = [event.message, event.eventName, event.errorDomain].filter(Boolean);
if (fieldsToScan.some(f => containsPII(f!))) {
errors.push({ index: i, reason: 'PII detected' });
rejected++;
continue;
}
// Stored document = client event + partition key, receive time and TTL.
const pk = computePk(productId, event.occurredAt, event.platform);
const doc: TelemetryEventDoc = {
...event,
pk,
receivedAt: now,
ttl,
};
docsToInsert.push(doc);
accepted++;
}
// Batch upsert (idempotent by event id)
if (docsToInsert.length > 0) {
await repo.upsertEventsBatch(docsToInsert);
// Fire-and-forget cluster updates for warn/error/fatal
// Not awaited: a failed cluster update is logged and does not affect the response.
for (const doc of docsToInsert) {
if (['warn', 'error', 'fatal'].includes(doc.eventType)) {
updateClusterForEvent(doc).catch(err => {
req.log.warn({ err, eventId: doc.id }, 'Cluster update failed');
});
}
}
}
// 400 when every event in the batch was rejected.
reply.code(accepted > 0 ? 200 : 400);
return {
accepted,
rejected,
errors: errors.length > 0 ? errors : undefined,
serverTime: now,
};
});
// ── Collection config (client poll) ───────────────────────────
// Returns the merged config of all enabled policies matching the caller's
// self-reported context, or an all-off config when telemetry is disabled.
app.get('/telemetry/config', async req => {
if (!TELEMETRY_ENABLED) {
return {
enabled: false,
eventTypes: [],
modules: [],
samplingRates: { debug: 0, info: 0, warn: 0, error: 0, fatal: 0 },
batchSize: CLIENT_BATCH_SIZE,
flushIntervalMs: CLIENT_FLUSH_MS,
maxQueueSize: CLIENT_MAX_QUEUE,
} satisfies TelemetryCollectionConfig;
}
const productId = getRequestProductId(req);
// The query string is used as the client context without schema validation.
const ctx: ClientContext = req.query as ClientContext;
const allPolicies = await repo.listPolicies(productId);
const activePolicies = allPolicies.filter(p => p.enabled && policyMatchesContext(p, ctx));
return mergePolicies(activePolicies);
});
// ── Admin: query events ───────────────────────────────────────
// `total` is the size of the returned page, not the overall match count.
app.get('/telemetry/query', async req => {
requireAdmin(req);
const productId = getRequestProductId(req);
const parsed = TelemetryQuerySchema.safeParse(req.query);
if (!parsed.success) {
throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; '));
}
const result = await repo.queryEvents(productId, parsed.data);
return {
events: result.events,
total: result.events.length,
continuationToken: result.continuationToken,
};
});
// ── Admin: error clusters ─────────────────────────────────────
// Filters are read straight from the query string (no schema validation).
app.get('/telemetry/clusters', async req => {
requireAdmin(req);
const productId = getRequestProductId(req);
const {
platform,
module: mod,
from,
to,
limit,
} = req.query as {
platform?: string;
module?: string;
from?: string;
to?: string;
limit?: string;
};
const clusters = await repo.listClusters(productId, {
platform,
module: mod,
from,
to,
// parseInt yields NaN for a non-numeric `limit`; that value is passed through as-is.
limit: limit ? parseInt(limit, 10) : undefined,
});
return { clusters, total: clusters.length };
});
// ── Admin: list policies ──────────────────────────────────────
app.get('/telemetry/policies', async req => {
requireAdmin(req);
const productId = getRequestProductId(req);
return { policies: await repo.listPolicies(productId) };
});
// ── Admin: create policy ──────────────────────────────────────
// Assigns a fresh UUID and audit fields, responds 201 with the stored policy.
app.post('/telemetry/policies', async (req, reply) => {
requireAdmin(req);
const productId = getRequestProductId(req);
const parsed = CreatePolicySchema.safeParse(req.body);
if (!parsed.success) {
throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; '));
}
const now = new Date().toISOString();
const doc: TelemetryCollectionPolicyDoc = {
id: randomUUID(),
productId,
...parsed.data,
createdAt: now,
updatedAt: now,
createdBy: req.jwtPayload?.sub ?? 'unknown',
};
const created = await repo.createPolicy(doc);
reply.code(201);
return created;
});
// ── Admin: update policy ──────────────────────────────────────
// Partial update; a null startsAt/expiresAt clears the stored value.
app.put('/telemetry/policies/:id', async req => {
requireAdmin(req);
const { id } = req.params as { id: string };
const productId = getRequestProductId(req);
const parsed = UpdatePolicySchema.safeParse(req.body);
if (!parsed.success) {
throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; '));
}
// Convert null to undefined for optional fields (Zod nullable → TS optional)
const updates: Record<string, unknown> = { ...parsed.data };
if (updates.startsAt === null) updates.startsAt = undefined;
if (updates.expiresAt === null) updates.expiresAt = undefined;
const updated = await repo.updatePolicy(
id,
productId,
updates as Partial<TelemetryCollectionPolicyDoc>
);
// The repository returns null when it could not produce an updated policy.
if (!updated) throw new NotFoundError('Policy not found');
return updated;
});
// ── Admin: delete policy ──────────────────────────────────────
app.delete('/telemetry/policies/:id', async req => {
requireAdmin(req);
const { id } = req.params as { id: string };
const productId = getRequestProductId(req);
const deleted = await repo.deletePolicy(id, productId);
if (!deleted) throw new NotFoundError('Policy not found');
return { success: true };
});
// ── Admin: GDPR erasure ───────────────────────────────────────
// Deletes the user's stored events only. Error clusters are not modified
// here; `clustersUpdated` is a constant 0.
// NOTE(review): clusters keep the user id in `affectedUserIds` — confirm
// whether erasure must also scrub clusters.
app.delete('/telemetry/user/:userId', async req => {
requireAdmin(req);
const { userId } = req.params as { userId: string };
const productId = getRequestProductId(req);
const eventsDeleted = await repo.deleteEventsByUserId(productId, userId);
return { userId, eventsDeleted, clustersUpdated: 0 };
});
}

View File

@ -0,0 +1,317 @@
/**
* Telemetry module tests — Zod schema validation for events, ingest batches, policies, and queries.
*/
import { describe, it, expect } from 'vitest';
import {
TelemetryEventSchema,
TelemetryIngestRequestSchema,
CreatePolicySchema,
UpdatePolicySchema,
TelemetryQuerySchema,
} from './types.js';
// ─── Minimal valid event for reuse ──────────────────────────────────
/**
 * Builds a minimal schema-valid telemetry event for tests.
 *
 * @param overrides Properties to replace or add; an explicit `undefined`
 *   overrides the baseline value with `undefined`.
 */
function validEvent(overrides: Record<string, unknown> = {}) {
  const baseline = {
    id: '550e8400-e29b-41d4-a716-446655440000',
    productId: 'lysnrai',
    anonymousInstallId: '660e8400-e29b-41d4-a716-446655440001',
    sessionId: 'sess_abc123',
    platform: 'ios',
    channel: 'keyboard_extension',
    osFamily: 'ios',
    appVersion: '1.0.0',
    buildNumber: '26',
    releaseChannel: 'beta',
    eventType: 'error',
    module: 'keyboard_dictation',
    eventName: 'recognition_failed',
    occurredAt: '2026-02-17T08:00:00.000Z',
  };
  return Object.assign({}, baseline, overrides);
}
// ─── TelemetryEventSchema ───────────────────────────────────────────
/**
 * TelemetryEventSchema: accepted shapes, rejected shapes, and exhaustive
 * coverage of every enum value. Cases are table-driven; each table row
 * becomes its own test.
 *
 * Fix: the case previously titled 'rejects empty id' actually submits the
 * non-empty string 'not-a-uuid'; the title now says what is being tested.
 */
describe('TelemetryEventSchema', () => {
  // [test title, overrides applied to the baseline valid event]
  const acceptedCases: Array<[string, Record<string, unknown>]> = [
    ['accepts a valid event with anonymousInstallId', {}],
    ['accepts a valid event with userId', { userId: 'usr_abc', anonymousInstallId: undefined }],
    ['accepts event with both userId and anonymousInstallId', { userId: 'usr_abc' }],
    ['accepts message at exactly 512 chars', { message: 'x'.repeat(512) }],
    [
      'accepts optional fields',
      {
        feature: 'voice_typing',
        errorDomain: 'kAFAssistantErrorDomain',
        errorCode: '209',
        message: 'Recognition timed out',
        fingerprint: 'abc123def456',
        tags: { backend: 'azure' },
        metrics: { duration_ms: 5000 },
        context: { dictation: { backend: 'azure' } },
        osVersion: 'iOS 18.2',
        deviceModel: 'iPhone17,3',
        locale: 'en-US',
        timezone: 'America/Los_Angeles',
      },
    ],
  ];

  const rejectedCases: Array<[string, Record<string, unknown>]> = [
    ['rejects event with neither userId nor anonymousInstallId', { anonymousInstallId: undefined }],
    ['rejects non-UUID id', { id: 'not-a-uuid' }],
    ['rejects invalid platform', { platform: 'symbian' }],
    ['rejects invalid channel', { channel: 'fax_machine' }],
    ['rejects invalid osFamily', { osFamily: 'beos' }],
    ['rejects invalid eventType', { eventType: 'trace' }],
    ['rejects message > 512 chars', { message: 'x'.repeat(513) }],
    ['rejects stackTrace > 8192 chars', { stackTrace: 'x'.repeat(8193), eventType: 'fatal' }],
  ];

  // [field name, every value the schema is expected to accept for it]
  const enumCases: Array<[string, string[]]> = [
    ['platform', ['ios', 'android', 'web', 'desktop']],
    [
      'channel',
      ['mobile_app', 'keyboard_extension', 'web_app', 'desktop_app', 'backend_service'],
    ],
    ['osFamily', ['ios', 'android', 'macos', 'windows', 'linux', 'chromeos', 'other']],
    ['eventType', ['debug', 'info', 'warn', 'error', 'fatal']],
    ['releaseChannel', ['dev', 'beta', 'prod']],
  ];

  for (const [title, overrides] of acceptedCases) {
    it(title, () => {
      const result = TelemetryEventSchema.safeParse(validEvent(overrides));
      expect(result.success).toBe(true);
    });
  }

  for (const [title, overrides] of rejectedCases) {
    it(title, () => {
      const result = TelemetryEventSchema.safeParse(validEvent(overrides));
      expect(result.success).toBe(false);
    });
  }

  for (const [field, values] of enumCases) {
    it(`validates all ${field} enum values`, () => {
      for (const value of values) {
        const result = TelemetryEventSchema.safeParse(validEvent({ [field]: value }));
        expect(result.success).toBe(true);
      }
    });
  }
});
// ─── TelemetryIngestRequestSchema ───────────────────────────────────
/** TelemetryIngestRequestSchema: batch size limits and optional clock skew. */
describe('TelemetryIngestRequestSchema', () => {
  // Parses a batch for the test product; `extra` adds request-level fields.
  const parseBatch = (events: unknown[], extra: Record<string, unknown> = {}) =>
    TelemetryIngestRequestSchema.safeParse({ productId: 'lysnrai', events, ...extra });

  it('accepts valid batch', () => {
    expect(parseBatch([validEvent()]).success).toBe(true);
  });

  it('rejects empty events array', () => {
    expect(parseBatch([]).success).toBe(false);
  });

  it('rejects > 50 events', () => {
    // 51 events with distinct, well-formed ids: one more than the limit.
    const oversized: unknown[] = [];
    for (let i = 0; i < 51; i++) {
      const suffix = String(i).padStart(4, '0');
      oversized.push(validEvent({ id: `550e8400-e29b-41d4-a716-44665544${suffix}` }));
    }
    expect(parseBatch(oversized).success).toBe(false);
  });

  it('accepts clientClockSkewMs', () => {
    expect(parseBatch([validEvent()], { clientClockSkewMs: -500 }).success).toBe(true);
  });
});
// ─── CreatePolicySchema ─────────────────────────────────────────────
/** CreatePolicySchema: defaults of a minimal policy, full targeting, and bounds. */
describe('CreatePolicySchema', () => {
  it('accepts minimal policy', () => {
    const outcome = CreatePolicySchema.safeParse({ name: 'Test policy' });
    expect(outcome.success).toBe(true);
    if (!outcome.success) return;
    // Defaults filled in by the schema.
    const { enabled, priority, eventTypes, samplingRate } = outcome.data;
    expect(enabled).toBe(true);
    expect(priority).toBe(50);
    expect(eventTypes).toEqual(['warn', 'error', 'fatal']);
    expect(samplingRate).toBe(1.0);
  });

  it('accepts full policy with targeting', () => {
    const fullPolicy = {
      name: 'Debug iOS keyboard',
      description: 'Debug user X iOS keyboard dictation',
      enabled: true,
      priority: 100,
      eventTypes: ['debug', 'info', 'warn', 'error', 'fatal'],
      modules: ['keyboard_dictation'],
      samplingRate: 1.0,
      targeting: {
        userIds: ['usr_abc'],
        platforms: ['ios'],
        channels: ['keyboard_extension'],
        osFamilies: ['ios'],
        buildNumberRange: { min: 1, max: 25 },
        countryCodes: ['US'],
        regionCodes: ['US:WA'],
        releaseChannels: ['beta'],
        percentage: 100,
      },
      startsAt: '2026-02-17T00:00:00.000Z',
      expiresAt: '2026-02-20T00:00:00.000Z',
    };
    expect(CreatePolicySchema.safeParse(fullPolicy).success).toBe(true);
  });

  // [test title, payload the schema must reject]
  const rejectedPayloads: Array<[string, Record<string, unknown>]> = [
    ['rejects empty name', { name: '' }],
    ['rejects samplingRate > 1', { name: 'Test', samplingRate: 1.5 }],
    ['rejects priority > 999', { name: 'Test', priority: 1000 }],
  ];
  for (const [title, payload] of rejectedPayloads) {
    it(title, () => {
      expect(CreatePolicySchema.safeParse(payload).success).toBe(false);
    });
  }
});
// ─── UpdatePolicySchema ─────────────────────────────────────────────
/** UpdatePolicySchema: every field is optional and time bounds may be nulled. */
describe('UpdatePolicySchema', () => {
  // [test title, payload the schema must accept]
  const acceptedUpdates: Array<[string, Record<string, unknown>]> = [
    ['accepts partial updates', { enabled: false, priority: 10 }],
    ['accepts empty update', {}],
    ['accepts nullable startsAt/expiresAt (to clear them)', { startsAt: null, expiresAt: null }],
  ];
  for (const [title, payload] of acceptedUpdates) {
    it(title, () => {
      expect(UpdatePolicySchema.safeParse(payload).success).toBe(true);
    });
  }
});
// ─── TelemetryQuerySchema ───────────────────────────────────────────
// Exercises TelemetryQuerySchema: the default limit, a full filter set, the
// upper limit bound, and coercion of a string limit to a number.
describe('TelemetryQuerySchema', () => {
  it('accepts empty query (defaults)', () => {
    const parsed = TelemetryQuerySchema.safeParse({});
    expect(parsed.success).toBe(true);
    if (parsed.success) {
      expect(parsed.data.limit).toBe(50);
    }
  });

  it('accepts full filter set', () => {
    const filters = {
      userId: 'usr_abc',
      platform: 'ios',
      channel: 'keyboard_extension',
      module: 'keyboard_dictation',
      eventType: 'error',
      from: '2026-02-01T00:00:00.000Z',
      to: '2026-02-17T23:59:59.000Z',
      limit: 100,
    };
    const { success } = TelemetryQuerySchema.safeParse(filters);
    expect(success).toBe(true);
  });

  it('rejects limit > 200', () => {
    const { success } = TelemetryQuerySchema.safeParse({ limit: 201 });
    expect(success).toBe(false);
  });

  it('coerces string limit to number', () => {
    // The limit is passed as a string and must come back as a number.
    const parsed = TelemetryQuerySchema.safeParse({ limit: '25' });
    expect(parsed.success).toBe(true);
    if (parsed.success) {
      expect(parsed.data.limit).toBe(25);
    }
  });
});

View File

@ -0,0 +1,255 @@
/**
* Telemetry types — event schema, collection policies, and cluster types.
* See docs/WINDSURF/CLIENT_TELEMETRY_DESIGN.md for full design.
*/
import { z } from 'zod';
// ─── Enums ──────────────────────────────────────────────────────────
// Allowed literal values for each classification field, declared once as
// readonly tuples and wrapped in a Zod enum below.
const PLATFORM_VALUES = ['ios', 'android', 'web', 'desktop'] as const;
const CHANNEL_VALUES = [
  'mobile_app',
  'keyboard_extension',
  'web_app',
  'desktop_app',
  'backend_service',
] as const;
const OS_FAMILY_VALUES = [
  'ios',
  'android',
  'macos',
  'windows',
  'linux',
  'chromeos',
  'other',
] as const;
const EVENT_TYPE_VALUES = ['debug', 'info', 'warn', 'error', 'fatal'] as const;
const RELEASE_CHANNEL_VALUES = ['dev', 'beta', 'prod'] as const;

/** Platform values accepted by the event and targeting schemas. */
export const PlatformEnum = z.enum(PLATFORM_VALUES);

/** Channel values accepted by the event and targeting schemas. */
export const ChannelEnum = z.enum(CHANNEL_VALUES);

/** Operating-system family values accepted by the event and targeting schemas. */
export const OsFamilyEnum = z.enum(OS_FAMILY_VALUES);

/** Event type values accepted by the event and policy schemas. */
export const EventTypeEnum = z.enum(EVENT_TYPE_VALUES);

/** Release channel values accepted by the event and targeting schemas. */
export const ReleaseChannelEnum = z.enum(RELEASE_CHANNEL_VALUES);
// ─── Telemetry Event ────────────────────────────────────────────────
// The event shape is assembled from the groups below. They are spread in this
// exact order so the resulting key order matches a single flat object literal.

// Identity
const eventIdentityFields = {
  id: z.string().uuid(),
  productId: z.string().min(1),
  userId: z.string().optional(),
  anonymousInstallId: z.string().uuid().optional(),
  sessionId: z.string().min(1),
  requestId: z.string().optional(),
};

// Source classification
const eventSourceFields = {
  platform: PlatformEnum,
  channel: ChannelEnum,
  osFamily: OsFamilyEnum,
  osVersion: z.string().optional(),
  deviceModel: z.string().optional(),
  locale: z.string().optional(),
  timezone: z.string().optional(),
};

// App release
const eventReleaseFields = {
  appVersion: z.string().min(1),
  buildNumber: z.string().min(1),
  releaseChannel: ReleaseChannelEnum,
};

// Event semantics
const eventSemanticsFields = {
  eventType: EventTypeEnum,
  module: z.string().min(1),
  feature: z.string().optional(),
  eventName: z.string().min(1),
};

// Error & diagnostics
const eventDiagnosticsFields = {
  errorDomain: z.string().optional(),
  errorCode: z.string().optional(),
  message: z.string().max(512).optional(),
  stackTrace: z.string().max(8192).optional(),
  fingerprint: z.string().optional(),
};

// Structured metadata
const eventMetadataFields = {
  tags: z.record(z.string().max(128)).optional(),
  metrics: z.record(z.number()).optional(),
  context: z.record(z.unknown()).optional(),
};

// Timing (client provides occurredAt)
const eventTimingFields = {
  occurredAt: z.string().datetime(),
};

/**
 * Schema for a single client-submitted telemetry event.
 *
 * On top of the per-field rules, an event is only valid when it carries a
 * non-empty `userId` or an `anonymousInstallId`.
 */
export const TelemetryEventSchema = z
  .object({
    ...eventIdentityFields,
    ...eventSourceFields,
    ...eventReleaseFields,
    ...eventSemanticsFields,
    ...eventDiagnosticsFields,
    ...eventMetadataFields,
    ...eventTimingFields,
  })
  .refine(event => Boolean(event.userId || event.anonymousInstallId), {
    message: 'At least one of userId or anonymousInstallId is required',
  });

/** A telemetry event as produced by parsing with TelemetryEventSchema. */
export type TelemetryEvent = z.infer<typeof TelemetryEventSchema>;
/**
 * Server-enriched event document stored in Cosmos.
 *
 * Carries every TelemetryEvent field plus the storage and enrichment fields
 * declared below.
 */
export interface TelemetryEventDoc extends TelemetryEvent {
  /** Partition key, formatted as `${productId}:${yyyyMM}:${platform}`. */
  pk: string;
  /** NOTE(review): presumably the server receive time as an ISO-8601 string — confirm against the ingest code. */
  receivedAt: string;
  /** Cosmos TTL in seconds. */
  ttl: number;
  /** NOTE(review): countryCode/regionCode look like server-side geo enrichment; where they come from is not visible here. */
  countryCode?: string;
  regionCode?: string;
}
// ─── Batch Ingest Request ───────────────────────────────────────────
// Upper bound on the number of events accepted in one ingest request.
const MAX_INGEST_BATCH_SIZE = 50;

/**
 * Schema for a batch ingest request: a product id, between 1 and
 * MAX_INGEST_BATCH_SIZE events, and an optional client clock skew in
 * milliseconds.
 */
export const TelemetryIngestRequestSchema = z.object({
  productId: z.string().min(1),
  events: z.array(TelemetryEventSchema).min(1).max(MAX_INGEST_BATCH_SIZE),
  clientClockSkewMs: z.number().optional(),
});

/** A batch ingest request as produced by parsing with TelemetryIngestRequestSchema. */
export type TelemetryIngestRequest = z.infer<typeof TelemetryIngestRequestSchema>;
/**
 * Response body for a batch ingest request.
 *
 * NOTE(review): `accepted`/`rejected` are presumably event counts and
 * `errors[].index` presumably refers to the position in the submitted batch —
 * confirm against the ingest handler.
 */
export interface TelemetryIngestResponse {
  accepted: number;
  rejected: number;
  /** Optional per-event failure details. */
  errors?: { index: number; reason: string }[];
  serverTime: string;
}
// ─── Collection Policy ──────────────────────────────────────────────
// Optional list of free-form strings, shared by several targeting fields.
const targetingStringList = z.array(z.string()).optional();

// Optional app-version range; both bounds are strings and individually optional.
const appVersionRangeSchema = z
  .object({
    min: z.string().optional(),
    max: z.string().optional(),
  })
  .optional();

// Optional build-number range; both bounds are numbers and individually optional.
const buildNumberRangeSchema = z
  .object({
    min: z.number().optional(),
    max: z.number().optional(),
  })
  .optional();

/**
 * Targeting criteria attached to a collection policy. Every criterion is
 * optional, so an empty object is a valid targeting value.
 */
export const TelemetryTargetingSchema = z.object({
  userIds: targetingStringList,
  anonymousInstallIds: targetingStringList,
  platforms: z.array(PlatformEnum).optional(),
  channels: z.array(ChannelEnum).optional(),
  osFamilies: z.array(OsFamilyEnum).optional(),
  appVersions: targetingStringList,
  appVersionRange: appVersionRangeSchema,
  buildNumbers: targetingStringList,
  buildNumberRange: buildNumberRangeSchema,
  countryCodes: targetingStringList,
  regionCodes: targetingStringList,
  releaseChannels: z.array(ReleaseChannelEnum).optional(),
  percentage: z.number().min(0).max(100).optional(),
});
// Field validators shared by the create and update schemas, declared once so
// both schemas apply the same bounds.
const policyNameField = z.string().min(1).max(200);
const policyPriorityField = z.number().int().min(0).max(999);
const policyEventTypesField = z.array(EventTypeEnum);
const policyModulesField = z.array(z.string());
const policySamplingRateField = z.number().min(0).max(1);
const policyTimestampField = z.string().datetime();

/**
 * Schema for creating a collection policy. Only `name` is required; every
 * other field is either defaulted or optional.
 */
export const CreatePolicySchema = z.object({
  name: policyNameField,
  description: z.string().default(''),
  enabled: z.boolean().default(true),
  priority: policyPriorityField.default(50),
  eventTypes: policyEventTypesField.default(['warn', 'error', 'fatal']),
  modules: policyModulesField.default([]),
  samplingRate: policySamplingRateField.default(1.0),
  targeting: TelemetryTargetingSchema.default({}),
  startsAt: policyTimestampField.optional(),
  expiresAt: policyTimestampField.optional(),
});

/**
 * Schema for partially updating a collection policy. Every field is optional;
 * `startsAt` and `expiresAt` additionally accept null.
 */
export const UpdatePolicySchema = z.object({
  name: policyNameField.optional(),
  description: z.string().optional(),
  enabled: z.boolean().optional(),
  priority: policyPriorityField.optional(),
  eventTypes: policyEventTypesField.optional(),
  modules: policyModulesField.optional(),
  samplingRate: policySamplingRateField.optional(),
  targeting: TelemetryTargetingSchema.optional(),
  startsAt: policyTimestampField.optional().nullable(),
  expiresAt: policyTimestampField.optional().nullable(),
});

/** Parsed output of CreatePolicySchema (defaults already applied). */
export type CreatePolicyInput = z.infer<typeof CreatePolicySchema>;

/** Parsed output of UpdatePolicySchema. */
export type UpdatePolicyInput = z.infer<typeof UpdatePolicySchema>;
/**
 * Collection policy document.
 *
 * The fields from `name` through `expiresAt` share their names with the fields
 * of CreatePolicySchema; the remaining fields are identifiers and audit data.
 */
export interface TelemetryCollectionPolicyDoc {
  id: string;
  productId: string;
  name: string;
  description: string;
  enabled: boolean;
  priority: number;
  // Typed as plain strings here, whereas CreatePolicySchema restricts the
  // values to EventTypeEnum.
  eventTypes: string[];
  modules: string[];
  samplingRate: number;
  targeting: z.infer<typeof TelemetryTargetingSchema>;
  // NOTE(review): UpdatePolicySchema accepts null for startsAt/expiresAt, but
  // this type only allows string or absent — confirm how a null is persisted.
  startsAt?: string;
  expiresAt?: string;
  createdAt: string;
  updatedAt: string;
  createdBy: string;
}
// ─── Collection Config Response (for clients) ──────────────────────
/**
 * Collection configuration returned to clients.
 *
 * NOTE(review): the units and exact meaning of `batchSize`, `flushIntervalMs`
 * and `maxQueueSize` are defined by the code that builds this object, which is
 * not visible here.
 */
export interface TelemetryCollectionConfig {
  enabled: boolean;
  eventTypes: string[];
  modules: string[];
  /** One sampling rate for each of the five event types. */
  samplingRates: Record<'debug' | 'info' | 'warn' | 'error' | 'fatal', number>;
  batchSize: number;
  flushIntervalMs: number;
  maxQueueSize: number;
}
// ─── Error Cluster ──────────────────────────────────────────────────
/**
 * Error cluster document: one record per fingerprint, with occurrence counts
 * and lists of affected versions, users, installs and OS families.
 *
 * NOTE(review): how and when these fields are updated is decided by the
 * clustering code, which is not visible here.
 */
export interface TelemetryErrorCluster {
  /** Document id, formatted as `${fingerprint}:${yyyyMM}`. */
  id: string;
  /** Partition key, formatted as `${productId}:${platform}:${module}`. */
  pk: string;
  productId: string;
  fingerprint: string;
  platform: string;
  channel: string;
  module: string;
  eventName: string;
  /** Per app version / build number occurrence data. */
  affectedVersions: {
    appVersion: string;
    buildNumber: string;
    count: number;
    lastSeenAt: string;
  }[];
  firstSeenAt: string;
  lastSeenAt: string;
  totalCount: number;
  affectedUserIds: string[];
  affectedInstallIds: string[];
  affectedOsFamilies: string[];
  sampleErrorDomain?: string;
  sampleErrorCode?: string;
  sampleMessage?: string;
  /** Restricted to the three non-informational event types. */
  severity: 'warn' | 'error' | 'fatal';
  /** NOTE(review): presumably a Cosmos TTL in seconds, as on the event document — confirm. */
  ttl: number;
}
// ─── Query / Admin types ────────────────────────────────────────────
// Page-size bounds for the query endpoint.
const DEFAULT_QUERY_LIMIT = 50;
const MAX_QUERY_LIMIT = 200;

// Validators reused by the query fields below.
const optionalQueryString = z.string().optional();
const optionalQueryTimestamp = z.string().datetime().optional();

/**
 * Schema for telemetry query parameters. Every field is optional; `limit` is
 * coerced to a number, must be an integer between 1 and MAX_QUERY_LIMIT, and
 * defaults to DEFAULT_QUERY_LIMIT.
 *
 * NOTE(review): platform, channel, osFamily and eventType accept any string
 * here, whereas TelemetryEventSchema restricts them to enums — confirm whether
 * that looseness is intended.
 */
export const TelemetryQuerySchema = z.object({
  userId: optionalQueryString,
  anonymousInstallId: optionalQueryString,
  platform: optionalQueryString,
  channel: optionalQueryString,
  osFamily: optionalQueryString,
  appVersion: optionalQueryString,
  buildNumber: optionalQueryString,
  module: optionalQueryString,
  eventName: optionalQueryString,
  eventType: optionalQueryString,
  from: optionalQueryTimestamp,
  to: optionalQueryTimestamp,
  limit: z.coerce.number().int().min(1).max(MAX_QUERY_LIMIT).default(DEFAULT_QUERY_LIMIT),
  continuationToken: optionalQueryString,
});

/** Parsed output of TelemetryQuerySchema (limit default already applied). */
export type TelemetryQueryInput = z.infer<typeof TelemetryQuerySchema>;

View File

@ -46,6 +46,7 @@ import { publicRoutes } from './modules/public/routes.js';
import { tokenRoutes } from './modules/tokens/routes.js';
import { themeRoutes } from './modules/themes/routes.js';
import { waitlistRoutes } from './modules/waitlist/routes.js';
import { telemetryRoutes } from './modules/telemetry/routes.js';
import { initCosmosIfNeeded } from './lib/cosmos-init.js';
import { config } from './lib/config.js';
@ -116,6 +117,8 @@ await app.register(tokenRoutes, { prefix: '/api' });
await app.register(themeRoutes, { prefix: '/api' });
// Waitlist module (pre-launch signups — public + admin routes)
await app.register(waitlistRoutes, { prefix: '/api' });
// Telemetry module (client ingest + admin query + policies)
await app.register(telemetryRoutes, { prefix: '/api' });
// Public routes — no auth, registered at top level
await app.register(publicRoutes, { prefix: '/api' });