diff --git a/services/platform-service/src/lib/cosmos-init.ts b/services/platform-service/src/lib/cosmos-init.ts index eecbab2e..bd53980f 100644 --- a/services/platform-service/src/lib/cosmos-init.ts +++ b/services/platform-service/src/lib/cosmos-init.ts @@ -34,6 +34,10 @@ const CONTAINER_DEFS: Record = { daily_briefs: { partitionKeyPath: '/userId' }, reflections: { partitionKeyPath: '/userId' }, brain_insights: { partitionKeyPath: '/userId' }, + // Telemetry (client diagnostics — see docs/WINDSURF/CLIENT_TELEMETRY_DESIGN.md) + telemetry_events: { partitionKeyPath: '/pk', defaultTtl: 30 * 86400 }, + telemetry_error_clusters: { partitionKeyPath: '/pk', defaultTtl: 90 * 86400 }, + telemetry_collection_policies: { partitionKeyPath: '/productId' }, }; export async function initCosmosIfNeeded(): Promise { diff --git a/services/platform-service/src/modules/telemetry/repository.ts b/services/platform-service/src/modules/telemetry/repository.ts new file mode 100644 index 00000000..65975bb4 --- /dev/null +++ b/services/platform-service/src/modules/telemetry/repository.ts @@ -0,0 +1,254 @@ +/** + * Telemetry repository — Cosmos DB CRUD for events, policies, and clusters. + */ + +import { getContainer } from '../../lib/cosmos.js'; +import type { + TelemetryEventDoc, + TelemetryCollectionPolicyDoc, + TelemetryErrorCluster, + TelemetryQueryInput, +} from './types.js'; + +// ─── Container accessors ──────────────────────────────────────────── + +function eventsContainer() { + return getContainer('telemetry_events'); +} + +function policiesContainer() { + return getContainer('telemetry_collection_policies'); +} + +function clustersContainer() { + return getContainer('telemetry_error_clusters'); +} + +// ─── Events ───────────────────────────────────────────────────────── + +export async function upsertEvent(doc: TelemetryEventDoc): Promise { + await eventsContainer().items.upsert(doc); +} + +export async function upsertEventsBatch(docs: TelemetryEventDoc[]): Promise { + // Cosmos DB doesn't have native batch across partitions. + // We upsert individually; for v1 this is acceptable. + // TODO: Group by pk and use bulk operations for same-partition batches. + const promises = docs.map(doc => eventsContainer().items.upsert(doc)); + await Promise.all(promises); +} + +export async function queryEvents( + productId: string, + input: TelemetryQueryInput +): Promise<{ events: TelemetryEventDoc[]; continuationToken?: string }> { + const conditions: string[] = ['c.productId = @productId']; + const parameters: Array<{ name: string; value: string | number | boolean }> = [ + { name: '@productId', value: productId }, + ]; + + if (input.userId) { + conditions.push('c.userId = @userId'); + parameters.push({ name: '@userId', value: input.userId }); + } + if (input.anonymousInstallId) { + conditions.push('c.anonymousInstallId = @anonymousInstallId'); + parameters.push({ name: '@anonymousInstallId', value: input.anonymousInstallId }); + } + if (input.platform) { + conditions.push('c.platform = @platform'); + parameters.push({ name: '@platform', value: input.platform }); + } + if (input.channel) { + conditions.push('c.channel = @channel'); + parameters.push({ name: '@channel', value: input.channel }); + } + if (input.osFamily) { + conditions.push('c.osFamily = @osFamily'); + parameters.push({ name: '@osFamily', value: input.osFamily }); + } + if (input.appVersion) { + conditions.push('c.appVersion = @appVersion'); + parameters.push({ name: '@appVersion', value: input.appVersion }); + } + if (input.buildNumber) { + conditions.push('c.buildNumber = @buildNumber'); + parameters.push({ name: '@buildNumber', value: input.buildNumber }); + } + if (input.module) { + conditions.push('c.module = @module'); + parameters.push({ name: '@module', value: input.module }); + } + if (input.eventName) { + conditions.push('c.eventName = @eventName'); + parameters.push({ name: '@eventName', value: input.eventName }); + } + if (input.eventType) { + conditions.push('c.eventType = @eventType'); + parameters.push({ name: '@eventType', value: input.eventType }); + } + if (input.from) { + conditions.push('c.occurredAt >= @from'); + parameters.push({ name: '@from', value: input.from }); + } + if (input.to) { + conditions.push('c.occurredAt <= @to'); + parameters.push({ name: '@to', value: input.to }); + } + + const query = `SELECT * FROM c WHERE ${conditions.join(' AND ')} ORDER BY c.occurredAt DESC`; + + const iterator = eventsContainer().items.query( + { query, parameters }, + { + maxItemCount: input.limit, + continuationToken: input.continuationToken || undefined, + } + ); + + const { resources, continuationToken } = await iterator.fetchNext(); + return { + events: resources, + continuationToken: continuationToken || undefined, + }; +} + +export async function deleteEventsByUserId(productId: string, userId: string): Promise { + // Find all events for this user, then delete them + const { resources } = await eventsContainer() + .items.query<{ id: string; pk: string }>({ + query: 'SELECT c.id, c.pk FROM c WHERE c.productId = @productId AND c.userId = @userId', + parameters: [ + { name: '@productId', value: productId }, + { name: '@userId', value: userId }, + ], + }) + .fetchAll(); + + let deleted = 0; + for (const doc of resources) { + try { + await eventsContainer().item(doc.id, doc.pk).delete(); + deleted++; + } catch { + // Skip docs that fail to delete (already deleted, etc.) + } + } + return deleted; +} + +// ─── Policies ─────────────────────────────────────────────────────── + +export async function listPolicies(productId: string): Promise { + const { resources } = await policiesContainer() + .items.query({ + query: 'SELECT * FROM c WHERE c.productId = @productId ORDER BY c.priority DESC', + parameters: [{ name: '@productId', value: productId }], + }) + .fetchAll(); + return resources; +} + +export async function getPolicy( + id: string, + productId: string +): Promise { + try { + const { resource } = await policiesContainer() + .item(id, productId) + .read(); + return resource ?? null; + } catch { + return null; + } +} + +export async function createPolicy( + doc: TelemetryCollectionPolicyDoc +): Promise { + const { resource } = await policiesContainer().items.create(doc); + return resource as TelemetryCollectionPolicyDoc; +} + +export async function updatePolicy( + id: string, + productId: string, + updates: Partial +): Promise { + try { + const { resource: existing } = await policiesContainer() + .item(id, productId) + .read(); + if (!existing) return null; + const merged = { ...existing, ...updates, updatedAt: new Date().toISOString() }; + const { resource } = await policiesContainer().item(id, productId).replace(merged); + return resource as TelemetryCollectionPolicyDoc; + } catch { + return null; + } +} + +export async function deletePolicy(id: string, productId: string): Promise { + try { + await policiesContainer().item(id, productId).delete(); + return true; + } catch { + return false; + } +} + +// ─── Clusters ─────────────────────────────────────────────────────── + +export async function listClusters( + productId: string, + filters?: { + platform?: string; + module?: string; + from?: string; + to?: string; + limit?: number; + } +): Promise { + const conditions: string[] = ['c.productId = @productId']; + const parameters: Array<{ name: string; value: string | number | boolean }> = [ + { name: '@productId', value: productId }, + ]; + + if (filters?.platform) { + conditions.push('c.platform = @platform'); + parameters.push({ name: '@platform', value: filters.platform }); + } + if (filters?.module) { + conditions.push('c.module = @module'); + parameters.push({ name: '@module', value: filters.module }); + } + if (filters?.from) { + conditions.push('c.lastSeenAt >= @from'); + parameters.push({ name: '@from', value: filters.from }); + } + if (filters?.to) { + conditions.push('c.lastSeenAt <= @to'); + parameters.push({ name: '@to', value: filters.to }); + } + + const limit = filters?.limit ?? 50; + const query = `SELECT TOP ${limit} * FROM c WHERE ${conditions.join(' AND ')} ORDER BY c.totalCount DESC`; + + const { resources } = await clustersContainer() + .items.query({ query, parameters }) + .fetchAll(); + return resources; +} + +export async function upsertCluster(cluster: TelemetryErrorCluster): Promise { + await clustersContainer().items.upsert(cluster); +} + +export async function getCluster(id: string, pk: string): Promise { + try { + const { resource } = await clustersContainer().item(id, pk).read(); + return resource ?? null; + } catch { + return null; + } +} diff --git a/services/platform-service/src/modules/telemetry/routes.ts b/services/platform-service/src/modules/telemetry/routes.ts new file mode 100644 index 00000000..233d0bf9 --- /dev/null +++ b/services/platform-service/src/modules/telemetry/routes.ts @@ -0,0 +1,535 @@ +/** + * Telemetry REST endpoints. + * + * POST /telemetry/events — batch ingest (any auth) + * GET /telemetry/config — collection config for clients + * GET /telemetry/query — admin query + * GET /telemetry/clusters — admin error clusters + * GET /telemetry/policies — list policies (admin) + * POST /telemetry/policies — create policy (admin) + * PUT /telemetry/policies/:id — update policy (admin) + * DELETE /telemetry/policies/:id — delete policy (admin) + * DELETE /telemetry/user/:userId — GDPR erasure (admin) + */ + +import type { FastifyInstance } from 'fastify'; +import { randomUUID } from 'node:crypto'; +import { createHash } from 'node:crypto'; +import { getRequestProductId } from '../../lib/request-context.js'; +import { BadRequestError, NotFoundError, UnauthorizedError } from '../../lib/errors.js'; +import { hashUserFlag } from '../flags/routes.js'; +import * as repo from './repository.js'; +import { + TelemetryIngestRequestSchema, + CreatePolicySchema, + UpdatePolicySchema, + TelemetryQuerySchema, + type TelemetryEventDoc, + type TelemetryCollectionPolicyDoc, + type TelemetryCollectionConfig, + type TelemetryErrorCluster, +} from './types.js'; + +// ─── Helpers ──────────────────────────────────────────────────────── + +const DEFAULT_EVENT_TTL_DAYS = parseInt(process.env.TELEMETRY_EVENT_TTL_DAYS ?? '30', 10); +const DEFAULT_CLUSTER_TTL_DAYS = parseInt(process.env.TELEMETRY_CLUSTER_TTL_DAYS ?? '90', 10); +const TELEMETRY_ENABLED = process.env.TELEMETRY_ENABLED !== 'false'; +const PII_SCAN_ENABLED = process.env.TELEMETRY_PII_SCAN_ENABLED !== 'false'; +const CLIENT_BATCH_SIZE = parseInt(process.env.TELEMETRY_CLIENT_BATCH_SIZE ?? '20', 10); +const CLIENT_FLUSH_MS = parseInt(process.env.TELEMETRY_CLIENT_FLUSH_MS ?? '60000', 10); +const CLIENT_MAX_QUEUE = parseInt(process.env.TELEMETRY_CLIENT_MAX_QUEUE ?? '200', 10); + +/** PII patterns — reject events containing these. */ +const PII_PATTERNS = [ + /\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z]{2,}\b/i, // email + /\b\d{3}[-.]?\d{3}[-.]?\d{4}\b/, // US phone + /\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b/, // credit card + /\b\d{3}-\d{2}-\d{4}\b/, // SSN +]; + +function containsPII(text: string): boolean { + if (!PII_SCAN_ENABLED) return false; + return PII_PATTERNS.some(p => p.test(text)); +} + +function computePk(productId: string, occurredAt: string, platform: string): string { + const date = new Date(occurredAt); + const yyyyMM = `${date.getUTCFullYear()}${String(date.getUTCMonth() + 1).padStart(2, '0')}`; + return `${productId}:${yyyyMM}:${platform}`; +} + +function normalizeMessage(msg: string): string { + return msg + .replace(/[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}/gi, '') + .replace(/\d+/g, '') + .replace(/\/[\w/.]+/g, '') + .toLowerCase() + .trim(); +} + +function generateFingerprint(event: TelemetryEventDoc): string { + const input = [ + event.platform, + event.channel, + event.module, + event.eventName, + event.errorDomain ?? '', + event.errorCode ?? '', + normalizeMessage(event.message ?? ''), + ].join(':'); + return createHash('sha256').update(input).digest('hex').substring(0, 16); +} + +function requireAdmin(req: { jwtPayload?: { role?: string } }): void { + if (req.jwtPayload?.role !== 'admin') { + throw new UnauthorizedError('Admin access required'); + } +} + +// ─── Policy evaluation helpers ────────────────────────────────────── + +interface ClientContext { + userId?: string; + anonymousInstallId?: string; + platform?: string; + channel?: string; + osFamily?: string; + appVersion?: string; + buildNumber?: string; + releaseChannel?: string; + countryCode?: string; + regionCode?: string; +} + +function policyMatchesContext(policy: TelemetryCollectionPolicyDoc, ctx: ClientContext): boolean { + const t = policy.targeting; + + // Check time bounds + const now = new Date().toISOString(); + if (policy.startsAt && now < policy.startsAt) return false; + if (policy.expiresAt && now > policy.expiresAt) return false; + + // User targeting + if (t.userIds?.length && ctx.userId && !t.userIds.includes(ctx.userId)) return false; + if ( + t.anonymousInstallIds?.length && + ctx.anonymousInstallId && + !t.anonymousInstallIds.includes(ctx.anonymousInstallId) + ) + return false; + + // Platform targeting + if (t.platforms?.length && ctx.platform && !t.platforms.includes(ctx.platform as never)) + return false; + if (t.channels?.length && ctx.channel && !t.channels.includes(ctx.channel as never)) return false; + if (t.osFamilies?.length && ctx.osFamily && !t.osFamilies.includes(ctx.osFamily as never)) + return false; + + // Version targeting + if (t.appVersions?.length && ctx.appVersion && !t.appVersions.includes(ctx.appVersion)) + return false; + if (t.buildNumbers?.length && ctx.buildNumber && !t.buildNumbers.includes(ctx.buildNumber)) + return false; + if (t.buildNumberRange && ctx.buildNumber) { + const bn = parseInt(ctx.buildNumber, 10); + if (!isNaN(bn)) { + if (t.buildNumberRange.min !== undefined && bn < t.buildNumberRange.min) return false; + if (t.buildNumberRange.max !== undefined && bn > t.buildNumberRange.max) return false; + } + } + + // Region targeting + if (t.countryCodes?.length && ctx.countryCode && !t.countryCodes.includes(ctx.countryCode)) + return false; + if (t.regionCodes?.length && ctx.regionCode && !t.regionCodes.includes(ctx.regionCode)) + return false; + + // Release channel targeting + if ( + t.releaseChannels?.length && + ctx.releaseChannel && + !t.releaseChannels.includes(ctx.releaseChannel as never) + ) + return false; + + // Percentage rollout (deterministic) + if (t.percentage !== undefined && t.percentage < 100) { + const identity = ctx.userId || ctx.anonymousInstallId || ''; + if (!identity) return false; + const bucket = hashUserFlag(identity, `telemetry_policy_${policy.id}`); + if (bucket >= t.percentage) return false; + } + + return true; +} + +function mergePolicies(policies: TelemetryCollectionPolicyDoc[]): TelemetryCollectionConfig { + if (policies.length === 0) { + // Hardcoded default + return { + enabled: TELEMETRY_ENABLED, + eventTypes: ['warn', 'error', 'fatal'], + modules: [], + samplingRates: { debug: 0, info: 0, warn: 0.5, error: 1, fatal: 1 }, + batchSize: CLIENT_BATCH_SIZE, + flushIntervalMs: CLIENT_FLUSH_MS, + maxQueueSize: CLIENT_MAX_QUEUE, + }; + } + + // Policies are already sorted by priority DESC + const eventTypesSet = new Set(); + const modulesSet = new Set(); + const rates: Record = { debug: 0, info: 0, warn: 0, error: 0, fatal: 0 }; + let enabled = TELEMETRY_ENABLED; + + for (const p of policies) { + // Union event types + for (const et of p.eventTypes) { + eventTypesSet.add(et); + // Highest-priority policy's rate wins per event type + if (rates[et] === 0) { + rates[et] = p.samplingRate; + } + } + // Union modules + for (const m of p.modules) { + modulesSet.add(m); + } + // samplingRate 0 from highest-priority policy disables everything + if (p.priority >= 999 && p.samplingRate === 0) { + enabled = false; + } + } + + return { + enabled, + eventTypes: Array.from(eventTypesSet), + modules: Array.from(modulesSet), + samplingRates: { + debug: rates.debug, + info: rates.info, + warn: rates.warn, + error: rates.error, + fatal: rates.fatal, + }, + batchSize: CLIENT_BATCH_SIZE, + flushIntervalMs: CLIENT_FLUSH_MS, + maxQueueSize: CLIENT_MAX_QUEUE, + }; +} + +// ─── Cluster update ───────────────────────────────────────────────── + +async function updateClusterForEvent(event: TelemetryEventDoc): Promise { + if (!['warn', 'error', 'fatal'].includes(event.eventType)) return; + + const fingerprint = event.fingerprint || generateFingerprint(event); + const date = new Date(event.occurredAt); + const yyyyMM = `${date.getUTCFullYear()}${String(date.getUTCMonth() + 1).padStart(2, '0')}`; + const clusterId = `${fingerprint}:${yyyyMM}`; + const pk = `${event.productId}:${event.platform}:${event.module}`; + + const existing = await repo.getCluster(clusterId, pk); + + if (existing) { + existing.totalCount++; + existing.lastSeenAt = event.receivedAt; + existing.sampleMessage = event.message; + existing.sampleErrorDomain = event.errorDomain; + existing.sampleErrorCode = event.errorCode; + + // Update affected users (dedup, cap 100) + const uid = event.userId || event.anonymousInstallId; + if (uid && event.userId && !existing.affectedUserIds.includes(uid)) { + if (existing.affectedUserIds.length < 100) existing.affectedUserIds.push(uid); + } + if (uid && event.anonymousInstallId && !existing.affectedInstallIds.includes(uid)) { + if (existing.affectedInstallIds.length < 100) existing.affectedInstallIds.push(uid); + } + if (!existing.affectedOsFamilies.includes(event.osFamily)) { + existing.affectedOsFamilies.push(event.osFamily); + } + + // Update version breakdown + const vEntry = existing.affectedVersions.find( + v => v.appVersion === event.appVersion && v.buildNumber === event.buildNumber + ); + if (vEntry) { + vEntry.count++; + vEntry.lastSeenAt = event.receivedAt; + } else if (existing.affectedVersions.length < 50) { + existing.affectedVersions.push({ + appVersion: event.appVersion, + buildNumber: event.buildNumber, + count: 1, + lastSeenAt: event.receivedAt, + }); + } + + // Escalate severity + const severityOrder = { warn: 0, error: 1, fatal: 2 }; + const eventSev = event.eventType as 'warn' | 'error' | 'fatal'; + if ((severityOrder[eventSev] ?? 0) > (severityOrder[existing.severity] ?? 0)) { + existing.severity = eventSev; + } + + await repo.upsertCluster(existing); + } else { + const newCluster: TelemetryErrorCluster = { + id: clusterId, + pk, + productId: event.productId, + fingerprint, + platform: event.platform, + channel: event.channel, + module: event.module, + eventName: event.eventName, + affectedVersions: [ + { + appVersion: event.appVersion, + buildNumber: event.buildNumber, + count: 1, + lastSeenAt: event.receivedAt, + }, + ], + firstSeenAt: event.receivedAt, + lastSeenAt: event.receivedAt, + totalCount: 1, + affectedUserIds: event.userId ? [event.userId] : [], + affectedInstallIds: event.anonymousInstallId ? [event.anonymousInstallId] : [], + affectedOsFamilies: [event.osFamily], + sampleErrorDomain: event.errorDomain, + sampleErrorCode: event.errorCode, + sampleMessage: event.message, + severity: event.eventType as 'warn' | 'error' | 'fatal', + ttl: DEFAULT_CLUSTER_TTL_DAYS * 86400, + }; + await repo.upsertCluster(newCluster); + } +} + +// ─── Routes ───────────────────────────────────────────────────────── + +export async function telemetryRoutes(app: FastifyInstance) { + // ── Batch ingest ────────────────────────────────────────────── + app.post('/telemetry/events', async (req, reply) => { + if (!TELEMETRY_ENABLED) { + return { accepted: 0, rejected: 0, serverTime: new Date().toISOString() }; + } + + // Auth: JWT or X-Install-Token + const installToken = req.headers['x-install-token'] as string | undefined; + if (!req.jwtPayload && !installToken) { + throw new UnauthorizedError('JWT or X-Install-Token required'); + } + + const parsed = TelemetryIngestRequestSchema.safeParse(req.body); + if (!parsed.success) { + throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; ')); + } + + const { productId, events } = parsed.data; + const now = new Date().toISOString(); + const ttl = DEFAULT_EVENT_TTL_DAYS * 86400; + + let accepted = 0; + let rejected = 0; + const errors: Array<{ index: number; reason: string }> = []; + const docsToInsert: TelemetryEventDoc[] = []; + + for (let i = 0; i < events.length; i++) { + const event = events[i]; + + // Validate productId matches request-level + if (event.productId !== productId) { + errors.push({ index: i, reason: `productId mismatch: expected ${productId}` }); + rejected++; + continue; + } + + // PII scan + const fieldsToScan = [event.message, event.eventName, event.errorDomain].filter(Boolean); + if (fieldsToScan.some(f => containsPII(f!))) { + errors.push({ index: i, reason: 'PII detected' }); + rejected++; + continue; + } + + const pk = computePk(productId, event.occurredAt, event.platform); + const doc: TelemetryEventDoc = { + ...event, + pk, + receivedAt: now, + ttl, + }; + + docsToInsert.push(doc); + accepted++; + } + + // Batch upsert (idempotent by event id) + if (docsToInsert.length > 0) { + await repo.upsertEventsBatch(docsToInsert); + + // Fire-and-forget cluster updates for warn/error/fatal + for (const doc of docsToInsert) { + if (['warn', 'error', 'fatal'].includes(doc.eventType)) { + updateClusterForEvent(doc).catch(err => { + req.log.warn({ err, eventId: doc.id }, 'Cluster update failed'); + }); + } + } + } + + reply.code(accepted > 0 ? 200 : 400); + return { + accepted, + rejected, + errors: errors.length > 0 ? errors : undefined, + serverTime: now, + }; + }); + + // ── Collection config (client poll) ─────────────────────────── + app.get('/telemetry/config', async req => { + if (!TELEMETRY_ENABLED) { + return { + enabled: false, + eventTypes: [], + modules: [], + samplingRates: { debug: 0, info: 0, warn: 0, error: 0, fatal: 0 }, + batchSize: CLIENT_BATCH_SIZE, + flushIntervalMs: CLIENT_FLUSH_MS, + maxQueueSize: CLIENT_MAX_QUEUE, + } satisfies TelemetryCollectionConfig; + } + + const productId = getRequestProductId(req); + const ctx: ClientContext = req.query as ClientContext; + + const allPolicies = await repo.listPolicies(productId); + const activePolicies = allPolicies.filter(p => p.enabled && policyMatchesContext(p, ctx)); + + return mergePolicies(activePolicies); + }); + + // ── Admin: query events ─────────────────────────────────────── + app.get('/telemetry/query', async req => { + requireAdmin(req); + const productId = getRequestProductId(req); + const parsed = TelemetryQuerySchema.safeParse(req.query); + if (!parsed.success) { + throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; ')); + } + const result = await repo.queryEvents(productId, parsed.data); + return { + events: result.events, + total: result.events.length, + continuationToken: result.continuationToken, + }; + }); + + // ── Admin: error clusters ───────────────────────────────────── + app.get('/telemetry/clusters', async req => { + requireAdmin(req); + const productId = getRequestProductId(req); + const { + platform, + module: mod, + from, + to, + limit, + } = req.query as { + platform?: string; + module?: string; + from?: string; + to?: string; + limit?: string; + }; + const clusters = await repo.listClusters(productId, { + platform, + module: mod, + from, + to, + limit: limit ? parseInt(limit, 10) : undefined, + }); + return { clusters, total: clusters.length }; + }); + + // ── Admin: list policies ────────────────────────────────────── + app.get('/telemetry/policies', async req => { + requireAdmin(req); + const productId = getRequestProductId(req); + return { policies: await repo.listPolicies(productId) }; + }); + + // ── Admin: create policy ────────────────────────────────────── + app.post('/telemetry/policies', async (req, reply) => { + requireAdmin(req); + const productId = getRequestProductId(req); + const parsed = CreatePolicySchema.safeParse(req.body); + if (!parsed.success) { + throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; ')); + } + + const now = new Date().toISOString(); + const doc: TelemetryCollectionPolicyDoc = { + id: randomUUID(), + productId, + ...parsed.data, + createdAt: now, + updatedAt: now, + createdBy: req.jwtPayload?.sub ?? 'unknown', + }; + + const created = await repo.createPolicy(doc); + reply.code(201); + return created; + }); + + // ── Admin: update policy ────────────────────────────────────── + app.put('/telemetry/policies/:id', async req => { + requireAdmin(req); + const { id } = req.params as { id: string }; + const productId = getRequestProductId(req); + + const parsed = UpdatePolicySchema.safeParse(req.body); + if (!parsed.success) { + throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; ')); + } + + // Convert null to undefined for optional fields (Zod nullable → TS optional) + const updates: Record = { ...parsed.data }; + if (updates.startsAt === null) updates.startsAt = undefined; + if (updates.expiresAt === null) updates.expiresAt = undefined; + + const updated = await repo.updatePolicy( + id, + productId, + updates as Partial + ); + if (!updated) throw new NotFoundError('Policy not found'); + return updated; + }); + + // ── Admin: delete policy ────────────────────────────────────── + app.delete('/telemetry/policies/:id', async req => { + requireAdmin(req); + const { id } = req.params as { id: string }; + const productId = getRequestProductId(req); + const deleted = await repo.deletePolicy(id, productId); + if (!deleted) throw new NotFoundError('Policy not found'); + return { success: true }; + }); + + // ── Admin: GDPR erasure ─────────────────────────────────────── + app.delete('/telemetry/user/:userId', async req => { + requireAdmin(req); + const { userId } = req.params as { userId: string }; + const productId = getRequestProductId(req); + const eventsDeleted = await repo.deleteEventsByUserId(productId, userId); + return { userId, eventsDeleted, clustersUpdated: 0 }; + }); +} diff --git a/services/platform-service/src/modules/telemetry/telemetry.test.ts b/services/platform-service/src/modules/telemetry/telemetry.test.ts new file mode 100644 index 00000000..199bea96 --- /dev/null +++ b/services/platform-service/src/modules/telemetry/telemetry.test.ts @@ -0,0 +1,317 @@ +/** + * Telemetry module tests — types, policy evaluation, fingerprinting. + */ + +import { describe, it, expect } from 'vitest'; +import { + TelemetryEventSchema, + TelemetryIngestRequestSchema, + CreatePolicySchema, + UpdatePolicySchema, + TelemetryQuerySchema, +} from './types.js'; + +// ─── Minimal valid event for reuse ────────────────────────────────── + +function validEvent(overrides: Record = {}) { + return { + id: '550e8400-e29b-41d4-a716-446655440000', + productId: 'lysnrai', + anonymousInstallId: '660e8400-e29b-41d4-a716-446655440001', + sessionId: 'sess_abc123', + platform: 'ios', + channel: 'keyboard_extension', + osFamily: 'ios', + appVersion: '1.0.0', + buildNumber: '26', + releaseChannel: 'beta', + eventType: 'error', + module: 'keyboard_dictation', + eventName: 'recognition_failed', + occurredAt: '2026-02-17T08:00:00.000Z', + ...overrides, + }; +} + +// ─── TelemetryEventSchema ─────────────────────────────────────────── + +describe('TelemetryEventSchema', () => { + it('accepts a valid event with anonymousInstallId', () => { + const result = TelemetryEventSchema.safeParse(validEvent()); + expect(result.success).toBe(true); + }); + + it('accepts a valid event with userId', () => { + const result = TelemetryEventSchema.safeParse( + validEvent({ userId: 'usr_abc', anonymousInstallId: undefined }) + ); + expect(result.success).toBe(true); + }); + + it('accepts event with both userId and anonymousInstallId', () => { + const result = TelemetryEventSchema.safeParse(validEvent({ userId: 'usr_abc' })); + expect(result.success).toBe(true); + }); + + it('rejects event with neither userId nor anonymousInstallId', () => { + const result = TelemetryEventSchema.safeParse(validEvent({ anonymousInstallId: undefined })); + expect(result.success).toBe(false); + }); + + it('rejects empty id', () => { + const result = TelemetryEventSchema.safeParse(validEvent({ id: 'not-a-uuid' })); + expect(result.success).toBe(false); + }); + + it('rejects invalid platform', () => { + const result = TelemetryEventSchema.safeParse(validEvent({ platform: 'symbian' })); + expect(result.success).toBe(false); + }); + + it('rejects invalid channel', () => { + const result = TelemetryEventSchema.safeParse(validEvent({ channel: 'fax_machine' })); + expect(result.success).toBe(false); + }); + + it('rejects invalid osFamily', () => { + const result = TelemetryEventSchema.safeParse(validEvent({ osFamily: 'beos' })); + expect(result.success).toBe(false); + }); + + it('rejects invalid eventType', () => { + const result = TelemetryEventSchema.safeParse(validEvent({ eventType: 'trace' })); + expect(result.success).toBe(false); + }); + + it('rejects message > 512 chars', () => { + const result = TelemetryEventSchema.safeParse(validEvent({ message: 'x'.repeat(513) })); + expect(result.success).toBe(false); + }); + + it('accepts message at exactly 512 chars', () => { + const result = TelemetryEventSchema.safeParse(validEvent({ message: 'x'.repeat(512) })); + expect(result.success).toBe(true); + }); + + it('rejects stackTrace > 8192 chars', () => { + const result = TelemetryEventSchema.safeParse( + validEvent({ stackTrace: 'x'.repeat(8193), eventType: 'fatal' }) + ); + expect(result.success).toBe(false); + }); + + it('accepts optional fields', () => { + const result = TelemetryEventSchema.safeParse( + validEvent({ + feature: 'voice_typing', + errorDomain: 'kAFAssistantErrorDomain', + errorCode: '209', + message: 'Recognition timed out', + fingerprint: 'abc123def456', + tags: { backend: 'azure' }, + metrics: { duration_ms: 5000 }, + context: { dictation: { backend: 'azure' } }, + osVersion: 'iOS 18.2', + deviceModel: 'iPhone17,3', + locale: 'en-US', + timezone: 'America/Los_Angeles', + }) + ); + expect(result.success).toBe(true); + }); + + it('validates all platform enum values', () => { + for (const p of ['ios', 'android', 'web', 'desktop']) { + const result = TelemetryEventSchema.safeParse(validEvent({ platform: p })); + expect(result.success).toBe(true); + } + }); + + it('validates all channel enum values', () => { + for (const c of [ + 'mobile_app', + 'keyboard_extension', + 'web_app', + 'desktop_app', + 'backend_service', + ]) { + const result = TelemetryEventSchema.safeParse(validEvent({ channel: c })); + expect(result.success).toBe(true); + } + }); + + it('validates all osFamily enum values', () => { + for (const os of ['ios', 'android', 'macos', 'windows', 'linux', 'chromeos', 'other']) { + const result = TelemetryEventSchema.safeParse(validEvent({ osFamily: os })); + expect(result.success).toBe(true); + } + }); + + it('validates all eventType enum values', () => { + for (const et of ['debug', 'info', 'warn', 'error', 'fatal']) { + const result = TelemetryEventSchema.safeParse(validEvent({ eventType: et })); + expect(result.success).toBe(true); + } + }); + + it('validates all releaseChannel enum values', () => { + for (const rc of ['dev', 'beta', 'prod']) { + const result = TelemetryEventSchema.safeParse(validEvent({ releaseChannel: rc })); + expect(result.success).toBe(true); + } + }); +}); + +// ─── TelemetryIngestRequestSchema ─────────────────────────────────── + +describe('TelemetryIngestRequestSchema', () => { + it('accepts valid batch', () => { + const result = TelemetryIngestRequestSchema.safeParse({ + productId: 'lysnrai', + events: [validEvent()], + }); + expect(result.success).toBe(true); + }); + + it('rejects empty events array', () => { + const result = TelemetryIngestRequestSchema.safeParse({ + productId: 'lysnrai', + events: [], + }); + expect(result.success).toBe(false); + }); + + it('rejects > 50 events', () => { + const events = Array.from({ length: 51 }, (_, i) => + validEvent({ id: `550e8400-e29b-41d4-a716-44665544${String(i).padStart(4, '0')}` }) + ); + const result = TelemetryIngestRequestSchema.safeParse({ + productId: 'lysnrai', + events, + }); + expect(result.success).toBe(false); + }); + + it('accepts clientClockSkewMs', () => { + const result = TelemetryIngestRequestSchema.safeParse({ + productId: 'lysnrai', + events: [validEvent()], + clientClockSkewMs: -500, + }); + expect(result.success).toBe(true); + }); +}); + +// ─── CreatePolicySchema ───────────────────────────────────────────── + +describe('CreatePolicySchema', () => { + it('accepts minimal policy', () => { + const result = CreatePolicySchema.safeParse({ name: 'Test policy' }); + expect(result.success).toBe(true); + if (result.success) { + expect(result.data.enabled).toBe(true); + expect(result.data.priority).toBe(50); + expect(result.data.eventTypes).toEqual(['warn', 'error', 'fatal']); + expect(result.data.samplingRate).toBe(1.0); + } + }); + + it('accepts full policy with targeting', () => { + const result = CreatePolicySchema.safeParse({ + name: 'Debug iOS keyboard', + description: 'Debug user X iOS keyboard dictation', + enabled: true, + priority: 100, + eventTypes: ['debug', 'info', 'warn', 'error', 'fatal'], + modules: ['keyboard_dictation'], + samplingRate: 1.0, + targeting: { + userIds: ['usr_abc'], + platforms: ['ios'], + channels: ['keyboard_extension'], + osFamilies: ['ios'], + buildNumberRange: { min: 1, max: 25 }, + countryCodes: ['US'], + regionCodes: ['US:WA'], + releaseChannels: ['beta'], + percentage: 100, + }, + startsAt: '2026-02-17T00:00:00.000Z', + expiresAt: '2026-02-20T00:00:00.000Z', + }); + expect(result.success).toBe(true); + }); + + it('rejects empty name', () => { + const result = CreatePolicySchema.safeParse({ name: '' }); + expect(result.success).toBe(false); + }); + + it('rejects samplingRate > 1', () => { + const result = CreatePolicySchema.safeParse({ name: 'Test', samplingRate: 1.5 }); + expect(result.success).toBe(false); + }); + + it('rejects priority > 999', () => { + const result = CreatePolicySchema.safeParse({ name: 'Test', priority: 1000 }); + expect(result.success).toBe(false); + }); +}); + +// ─── UpdatePolicySchema ───────────────────────────────────────────── + +describe('UpdatePolicySchema', () => { + it('accepts partial updates', () => { + const result = UpdatePolicySchema.safeParse({ enabled: false, priority: 10 }); + expect(result.success).toBe(true); + }); + + it('accepts empty update', () => { + const result = UpdatePolicySchema.safeParse({}); + expect(result.success).toBe(true); + }); + + it('accepts nullable startsAt/expiresAt (to clear them)', () => { + const result = UpdatePolicySchema.safeParse({ startsAt: null, expiresAt: null }); + expect(result.success).toBe(true); + }); +}); + +// ─── TelemetryQuerySchema ─────────────────────────────────────────── + +describe('TelemetryQuerySchema', () => { + it('accepts empty query (defaults)', () => { + const result = TelemetryQuerySchema.safeParse({}); + expect(result.success).toBe(true); + if (result.success) { + expect(result.data.limit).toBe(50); + } + }); + + it('accepts full filter set', () => { + const result = TelemetryQuerySchema.safeParse({ + userId: 'usr_abc', + platform: 'ios', + channel: 'keyboard_extension', + module: 'keyboard_dictation', + eventType: 'error', + from: '2026-02-01T00:00:00.000Z', + to: '2026-02-17T23:59:59.000Z', + limit: 100, + }); + expect(result.success).toBe(true); + }); + + it('rejects limit > 200', () => { + const result = TelemetryQuerySchema.safeParse({ limit: 201 }); + expect(result.success).toBe(false); + }); + + it('coerces string limit to number', () => { + const result = TelemetryQuerySchema.safeParse({ limit: '25' }); + expect(result.success).toBe(true); + if (result.success) { + expect(result.data.limit).toBe(25); + } + }); +}); diff --git a/services/platform-service/src/modules/telemetry/types.ts b/services/platform-service/src/modules/telemetry/types.ts new file mode 100644 index 00000000..475caf41 --- /dev/null +++ b/services/platform-service/src/modules/telemetry/types.ts @@ -0,0 +1,255 @@ +/** + * Telemetry types — event schema, collection policies, and cluster types. + * See docs/WINDSURF/CLIENT_TELEMETRY_DESIGN.md for full design. + */ + +import { z } from 'zod'; + +// ─── Enums ────────────────────────────────────────────────────────── + +export const PlatformEnum = z.enum(['ios', 'android', 'web', 'desktop']); +export const ChannelEnum = z.enum([ + 'mobile_app', + 'keyboard_extension', + 'web_app', + 'desktop_app', + 'backend_service', +]); +export const OsFamilyEnum = z.enum([ + 'ios', + 'android', + 'macos', + 'windows', + 'linux', + 'chromeos', + 'other', +]); +export const EventTypeEnum = z.enum(['debug', 'info', 'warn', 'error', 'fatal']); +export const ReleaseChannelEnum = z.enum(['dev', 'beta', 'prod']); + +// ─── Telemetry Event ──────────────────────────────────────────────── + +export const TelemetryEventSchema = z + .object({ + // Identity + id: z.string().uuid(), + productId: z.string().min(1), + userId: z.string().optional(), + anonymousInstallId: z.string().uuid().optional(), + sessionId: z.string().min(1), + requestId: z.string().optional(), + + // Source classification + platform: PlatformEnum, + channel: ChannelEnum, + osFamily: OsFamilyEnum, + osVersion: z.string().optional(), + deviceModel: z.string().optional(), + locale: z.string().optional(), + timezone: z.string().optional(), + + // App release + appVersion: z.string().min(1), + buildNumber: z.string().min(1), + releaseChannel: ReleaseChannelEnum, + + // Event semantics + eventType: EventTypeEnum, + module: z.string().min(1), + feature: z.string().optional(), + eventName: z.string().min(1), + + // Error & diagnostics + errorDomain: z.string().optional(), + errorCode: z.string().optional(), + message: z.string().max(512).optional(), + stackTrace: z.string().max(8192).optional(), + fingerprint: z.string().optional(), + + // Structured metadata + tags: z.record(z.string().max(128)).optional(), + metrics: z.record(z.number()).optional(), + context: z.record(z.unknown()).optional(), + + // Timing (client provides occurredAt) + occurredAt: z.string().datetime(), + }) + .refine(e => e.userId || e.anonymousInstallId, { + message: 'At least one of userId or anonymousInstallId is required', + }); + +export type TelemetryEvent = z.infer; + +/** Server-enriched event document stored in Cosmos. */ +export interface TelemetryEventDoc extends TelemetryEvent { + pk: string; // ${productId}:${yyyyMM}:${platform} + receivedAt: string; + ttl: number; // Cosmos TTL in seconds + countryCode?: string; + regionCode?: string; +} + +// ─── Batch Ingest Request ─────────────────────────────────────────── + +export const TelemetryIngestRequestSchema = z.object({ + productId: z.string().min(1), + events: z.array(TelemetryEventSchema).min(1).max(50), + clientClockSkewMs: z.number().optional(), +}); + +export type TelemetryIngestRequest = z.infer; + +export interface TelemetryIngestResponse { + accepted: number; + rejected: number; + errors?: Array<{ index: number; reason: string }>; + serverTime: string; +} + +// ─── Collection Policy ────────────────────────────────────────────── + +export const TelemetryTargetingSchema = z.object({ + userIds: z.array(z.string()).optional(), + anonymousInstallIds: z.array(z.string()).optional(), + platforms: z.array(PlatformEnum).optional(), + channels: z.array(ChannelEnum).optional(), + osFamilies: z.array(OsFamilyEnum).optional(), + appVersions: z.array(z.string()).optional(), + appVersionRange: z + .object({ + min: z.string().optional(), + max: z.string().optional(), + }) + .optional(), + buildNumbers: z.array(z.string()).optional(), + buildNumberRange: z + .object({ + min: z.number().optional(), + max: z.number().optional(), + }) + .optional(), + countryCodes: z.array(z.string()).optional(), + regionCodes: z.array(z.string()).optional(), + releaseChannels: z.array(ReleaseChannelEnum).optional(), + percentage: z.number().min(0).max(100).optional(), +}); + +export const CreatePolicySchema = z.object({ + name: z.string().min(1).max(200), + description: z.string().default(''), + enabled: z.boolean().default(true), + priority: z.number().int().min(0).max(999).default(50), + eventTypes: z.array(EventTypeEnum).default(['warn', 'error', 'fatal']), + modules: z.array(z.string()).default([]), + samplingRate: z.number().min(0).max(1).default(1.0), + targeting: TelemetryTargetingSchema.default({}), + startsAt: z.string().datetime().optional(), + expiresAt: z.string().datetime().optional(), +}); + +export const UpdatePolicySchema = z.object({ + name: z.string().min(1).max(200).optional(), + description: z.string().optional(), + enabled: z.boolean().optional(), + priority: z.number().int().min(0).max(999).optional(), + eventTypes: z.array(EventTypeEnum).optional(), + modules: z.array(z.string()).optional(), + samplingRate: z.number().min(0).max(1).optional(), + targeting: TelemetryTargetingSchema.optional(), + startsAt: z.string().datetime().optional().nullable(), + expiresAt: z.string().datetime().optional().nullable(), +}); + +export type CreatePolicyInput = z.infer; +export type UpdatePolicyInput = z.infer; + +export interface TelemetryCollectionPolicyDoc { + id: string; + productId: string; + name: string; + description: string; + enabled: boolean; + priority: number; + eventTypes: string[]; + modules: string[]; + samplingRate: number; + targeting: z.infer; + startsAt?: string; + expiresAt?: string; + createdAt: string; + updatedAt: string; + createdBy: string; +} + +// ─── Collection Config Response (for clients) ────────────────────── + +export interface TelemetryCollectionConfig { + enabled: boolean; + eventTypes: string[]; + modules: string[]; + samplingRates: { + debug: number; + info: number; + warn: number; + error: number; + fatal: number; + }; + batchSize: number; + flushIntervalMs: number; + maxQueueSize: number; +} + +// ─── Error Cluster ────────────────────────────────────────────────── + +export interface TelemetryErrorCluster { + id: string; // ${fingerprint}:${yyyyMM} + pk: string; // ${productId}:${platform}:${module} + productId: string; + fingerprint: string; + + platform: string; + channel: string; + module: string; + eventName: string; + + affectedVersions: Array<{ + appVersion: string; + buildNumber: string; + count: number; + lastSeenAt: string; + }>; + + firstSeenAt: string; + lastSeenAt: string; + totalCount: number; + affectedUserIds: string[]; + affectedInstallIds: string[]; + affectedOsFamilies: string[]; + + sampleErrorDomain?: string; + sampleErrorCode?: string; + sampleMessage?: string; + severity: 'warn' | 'error' | 'fatal'; + ttl: number; +} + +// ─── Query / Admin types ──────────────────────────────────────────── + +export const TelemetryQuerySchema = z.object({ + userId: z.string().optional(), + anonymousInstallId: z.string().optional(), + platform: z.string().optional(), + channel: z.string().optional(), + osFamily: z.string().optional(), + appVersion: z.string().optional(), + buildNumber: z.string().optional(), + module: z.string().optional(), + eventName: z.string().optional(), + eventType: z.string().optional(), + from: z.string().datetime().optional(), + to: z.string().datetime().optional(), + limit: z.coerce.number().int().min(1).max(200).default(50), + continuationToken: z.string().optional(), +}); + +export type TelemetryQueryInput = z.infer; diff --git a/services/platform-service/src/server.ts b/services/platform-service/src/server.ts index ed0007fc..88d7e21a 100644 --- a/services/platform-service/src/server.ts +++ b/services/platform-service/src/server.ts @@ -46,6 +46,7 @@ import { publicRoutes } from './modules/public/routes.js'; import { tokenRoutes } from './modules/tokens/routes.js'; import { themeRoutes } from './modules/themes/routes.js'; import { waitlistRoutes } from './modules/waitlist/routes.js'; +import { telemetryRoutes } from './modules/telemetry/routes.js'; import { initCosmosIfNeeded } from './lib/cosmos-init.js'; import { config } from './lib/config.js'; @@ -116,6 +117,8 @@ await app.register(tokenRoutes, { prefix: '/api' }); await app.register(themeRoutes, { prefix: '/api' }); // Waitlist module (pre-launch signups — public + admin routes) await app.register(waitlistRoutes, { prefix: '/api' }); +// Telemetry module (client ingest + admin query + policies) +await app.register(telemetryRoutes, { prefix: '/api' }); // Public routes — no auth, registered at top level await app.register(publicRoutes, { prefix: '/api' });