feat(telemetry): Phase 3 — geo enrichment, Prometheus metrics export endpoint
This commit is contained in:
parent
17f4e39c63
commit
2f61ea517c
@ -165,6 +165,57 @@ function emitAudit(
|
||||
auditRepo.create(doc).catch(() => {});
|
||||
}
|
||||
|
||||
// ─── Geo enrichment (best-effort IP → country/region) ─────────
|
||||
|
||||
const GEO_ENABLED = process.env.TELEMETRY_GEO_ENABLED === 'true';
|
||||
const GEO_API_URL = process.env.TELEMETRY_GEO_API_URL ?? ''; // e.g. http://ip-api.com/json/{ip}?fields=countryCode,region
|
||||
|
||||
interface GeoResult {
|
||||
countryCode?: string;
|
||||
regionCode?: string;
|
||||
}
|
||||
|
||||
const geoCache = new Map<string, { result: GeoResult; ts: number }>();
|
||||
const GEO_CACHE_TTL_MS = 24 * 60 * 60 * 1000; // 24 h
|
||||
|
||||
async function lookupGeo(ip: string): Promise<GeoResult> {
|
||||
if (!GEO_ENABLED || !GEO_API_URL || !ip || ip === '127.0.0.1' || ip === '::1') return {};
|
||||
|
||||
const cached = geoCache.get(ip);
|
||||
if (cached && Date.now() - cached.ts < GEO_CACHE_TTL_MS) return cached.result;
|
||||
|
||||
try {
|
||||
const url = GEO_API_URL.replace('{ip}', encodeURIComponent(ip));
|
||||
const res = await fetch(url, { signal: AbortSignal.timeout(2000) });
|
||||
if (!res.ok) return {};
|
||||
const data = (await res.json()) as Record<string, string>;
|
||||
const result: GeoResult = {
|
||||
countryCode: data.countryCode || undefined,
|
||||
regionCode:
|
||||
data.countryCode && data.region ? `${data.countryCode}:${data.region}` : undefined,
|
||||
};
|
||||
geoCache.set(ip, { result, ts: Date.now() });
|
||||
// Cap cache size
|
||||
if (geoCache.size > 10_000) {
|
||||
const oldest = geoCache.keys().next().value;
|
||||
if (oldest) geoCache.delete(oldest);
|
||||
}
|
||||
return result;
|
||||
} catch {
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
||||
function extractClientIp(req: {
|
||||
headers: Record<string, string | string[] | undefined>;
|
||||
ip?: string;
|
||||
}): string {
|
||||
const xff = req.headers['x-forwarded-for'];
|
||||
if (typeof xff === 'string') return xff.split(',')[0].trim();
|
||||
if (Array.isArray(xff) && xff.length > 0) return xff[0].split(',')[0].trim();
|
||||
return req.ip ?? '';
|
||||
}
|
||||
|
||||
/** PII patterns — reject events containing these. */
|
||||
const PII_PATTERNS = [
|
||||
/\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z]{2,}\b/i, // email
|
||||
@ -494,6 +545,12 @@ export async function telemetryRoutes(app: FastifyInstance) {
|
||||
metrics.totalDuplicatesDropped += dupCount;
|
||||
metrics.totalBatchRequests++;
|
||||
|
||||
// Geo enrichment (best-effort, non-blocking)
|
||||
const clientIp = extractClientIp(
|
||||
req as unknown as { headers: Record<string, string | string[] | undefined>; ip?: string }
|
||||
);
|
||||
const geo = await lookupGeo(clientIp);
|
||||
|
||||
const now = new Date().toISOString();
|
||||
const ttl = DEFAULT_EVENT_TTL_DAYS * 86400;
|
||||
|
||||
@ -527,6 +584,9 @@ export async function telemetryRoutes(app: FastifyInstance) {
|
||||
pk,
|
||||
receivedAt: now,
|
||||
ttl,
|
||||
// Geo enrichment from server-side IP lookup
|
||||
countryCode: geo.countryCode,
|
||||
regionCode: geo.regionCode,
|
||||
};
|
||||
|
||||
docsToInsert.push(doc);
|
||||
@ -764,9 +824,37 @@ export async function telemetryRoutes(app: FastifyInstance) {
|
||||
return updated;
|
||||
});
|
||||
|
||||
// ── Admin: ingestion metrics ──────────────────────────────
|
||||
// ── Admin: ingestion metrics (JSON) ─────────────────────
|
||||
app.get('/telemetry/metrics', async req => {
|
||||
requireAdmin(req);
|
||||
return metrics;
|
||||
});
|
||||
|
||||
// ── Prometheus OpenMetrics export ──────────────────────
|
||||
app.get('/telemetry/metrics/prometheus', async (req, reply) => {
|
||||
requireAdmin(req);
|
||||
const lines = [
|
||||
'# HELP telemetry_events_ingested_total Total events successfully ingested',
|
||||
'# TYPE telemetry_events_ingested_total counter',
|
||||
`telemetry_events_ingested_total ${metrics.totalEventsIngested}`,
|
||||
'# HELP telemetry_events_rejected_total Total events rejected (PII, mismatch, etc.)',
|
||||
'# TYPE telemetry_events_rejected_total counter',
|
||||
`telemetry_events_rejected_total ${metrics.totalEventsRejected}`,
|
||||
'# HELP telemetry_batch_requests_total Total batch ingest requests',
|
||||
'# TYPE telemetry_batch_requests_total counter',
|
||||
`telemetry_batch_requests_total ${metrics.totalBatchRequests}`,
|
||||
'# HELP telemetry_rate_limited_total Total rate-limited requests',
|
||||
'# TYPE telemetry_rate_limited_total counter',
|
||||
`telemetry_rate_limited_total ${metrics.totalRateLimited}`,
|
||||
'# HELP telemetry_pii_blocked_total Total events blocked for PII',
|
||||
'# TYPE telemetry_pii_blocked_total counter',
|
||||
`telemetry_pii_blocked_total ${metrics.totalPiiBlocked}`,
|
||||
'# HELP telemetry_duplicates_dropped_total Total duplicate events dropped',
|
||||
'# TYPE telemetry_duplicates_dropped_total counter',
|
||||
`telemetry_duplicates_dropped_total ${metrics.totalDuplicatesDropped}`,
|
||||
'',
|
||||
];
|
||||
reply.type('text/plain; version=0.0.4; charset=utf-8');
|
||||
return lines.join('\n');
|
||||
});
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user