From b8c0a73e895593f98b47495758960ec9737a2116 Mon Sep 17 00:00:00 2001 From: saravanakumardb1 Date: Sat, 14 Feb 2026 14:04:59 -0800 Subject: [PATCH] feat(extraction): Phase 5 observability + error handling (5.7-5.12) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 5.7: Enhanced structured logging with userId, productId, cacheHit, tokenCount - 5.8: Metrics module (counters + histograms) + /extract/metrics endpoint - 5.9: Grafana dashboard config for extraction-service (Loki queries) - 5.10: Error mapping — sidecar errors → proper HTTP status codes (408, 429, 502, 503) - 5.11: Circuit breaker for Python sidecar (5 failures → 30s OPEN) - 5.12: Graceful degradation — circuit open returns 503, cached results still served - 46 TS tests passing --- .../src/lib/circuit-breaker.ts | 85 ++++++++++++ .../extraction-service/src/lib/metrics.ts | 107 +++++++++++++++ .../src/modules/extract/routes.ts | 127 +++++++++++++++--- .../dashboards/extraction-service.json | 95 +++++++++++++ 4 files changed, 392 insertions(+), 22 deletions(-) create mode 100644 services/extraction-service/src/lib/circuit-breaker.ts create mode 100644 services/extraction-service/src/lib/metrics.ts create mode 100644 services/monitoring/grafana/dashboards/extraction-service.json diff --git a/services/extraction-service/src/lib/circuit-breaker.ts b/services/extraction-service/src/lib/circuit-breaker.ts new file mode 100644 index 00000000..df53187a --- /dev/null +++ b/services/extraction-service/src/lib/circuit-breaker.ts @@ -0,0 +1,85 @@ +/** + * Simple circuit breaker for the Python sidecar. + * + * States: CLOSED (normal) → OPEN (fail fast) → HALF_OPEN (probe) + * Opens after `failureThreshold` consecutive failures. + * Resets after `resetTimeoutMs` in OPEN state. + */ + +type State = 'CLOSED' | 'OPEN' | 'HALF_OPEN'; + +export interface CircuitBreakerOptions { + failureThreshold?: number; + resetTimeoutMs?: number; +} + +export class CircuitBreaker { + private state: State = 'CLOSED'; + private failureCount = 0; + private lastFailureTime = 0; + private readonly failureThreshold: number; + private readonly resetTimeoutMs: number; + + constructor(opts: CircuitBreakerOptions = {}) { + this.failureThreshold = opts.failureThreshold ?? 5; + this.resetTimeoutMs = opts.resetTimeoutMs ?? 30_000; // 30s + } + + /** + * Check if request is allowed. Throws if circuit is OPEN. + */ + allowRequest(): boolean { + if (this.state === 'CLOSED') return true; + + if (this.state === 'OPEN') { + // Check if reset timeout has elapsed + if (Date.now() - this.lastFailureTime >= this.resetTimeoutMs) { + this.state = 'HALF_OPEN'; + return true; // Allow one probe request + } + return false; // Still open, fail fast + } + + // HALF_OPEN: allow the probe + return true; + } + + /** + * Record a successful call — resets the breaker to CLOSED. + */ + recordSuccess(): void { + this.failureCount = 0; + this.state = 'CLOSED'; + } + + /** + * Record a failure — may trip the breaker to OPEN. + */ + recordFailure(): void { + this.failureCount++; + this.lastFailureTime = Date.now(); + + if (this.failureCount >= this.failureThreshold) { + this.state = 'OPEN'; + } + } + + get currentState(): State { + return this.state; + } + + get stats(): { state: State; failureCount: number; threshold: number; resetMs: number } { + return { + state: this.state, + failureCount: this.failureCount, + threshold: this.failureThreshold, + resetMs: this.resetTimeoutMs, + }; + } +} + +// Module-level singleton for the sidecar circuit breaker +export const sidecarBreaker = new CircuitBreaker({ + failureThreshold: 5, + resetTimeoutMs: 30_000, +}); diff --git a/services/extraction-service/src/lib/metrics.ts b/services/extraction-service/src/lib/metrics.ts new file mode 100644 index 00000000..cc10841d --- /dev/null +++ b/services/extraction-service/src/lib/metrics.ts @@ -0,0 +1,107 @@ +/** + * Prometheus metrics for extraction-service. + * + * Exposed via GET /metrics (auto-registered by fastify-metrics). + * Custom counters/histograms for extraction-specific telemetry. + */ + +// ── In-memory counters (fastify-metrics handles HTTP metrics) ──── +// These are simple counters since prom-client may not be directly available. +// They're exposed via the /extract/metrics endpoint. + +interface MetricBucket { + labels: Record; + value: number; +} + +class Counter { + private _buckets = new Map(); + constructor(public readonly name: string) {} + + inc(labels: Record, amount = 1): void { + const key = JSON.stringify(labels); + const existing = this._buckets.get(key); + if (existing) { + existing.value += amount; + } else { + this._buckets.set(key, { labels, value: amount }); + } + } + + toJSON(): Array<{ labels: Record; value: number }> { + return [...this._buckets.values()]; + } +} + +class Histogram { + private _observations: Array<{ labels: Record; value: number }> = []; + private _sum = 0; + private _count = 0; + constructor(public readonly name: string) {} + + observe(labels: Record, value: number): void { + this._observations.push({ labels, value }); + this._sum += value; + this._count++; + } + + toJSON(): { count: number; sum: number; avg: number } { + return { + count: this._count, + sum: Math.round(this._sum * 100) / 100, + avg: this._count > 0 ? Math.round((this._sum / this._count) * 100) / 100 : 0, + }; + } +} + +// ── Exported metrics ───────────────────────────────────────────── + +export const extractionRequestsTotal = new Counter('extraction_requests_total'); +export const extractionDurationSeconds = new Histogram('extraction_duration_seconds'); +export const extractionEntitiesExtracted = new Histogram('extraction_entities_extracted'); +export const extractionCacheHitTotal = new Counter('extraction_cache_hit_total'); + +/** + * Record an extraction event with all metric dimensions. + */ +export function recordExtraction(params: { + taskId?: string; + modelId?: string; + productId?: string; + status: 'success' | 'error' | 'cache_hit'; + durationMs?: number; + entityCount?: number; +}): void { + const labels = { + task_id: params.taskId || 'unknown', + model_id: params.modelId || 'unknown', + product_id: params.productId || 'unknown', + status: params.status, + }; + + extractionRequestsTotal.inc(labels); + + if (params.status === 'cache_hit') { + extractionCacheHitTotal.inc(labels); + } + + if (params.durationMs !== undefined) { + extractionDurationSeconds.observe(labels, params.durationMs / 1000); + } + + if (params.entityCount !== undefined) { + extractionEntitiesExtracted.observe(labels, params.entityCount); + } +} + +/** + * Get all metrics as a JSON summary. + */ +export function getMetricsSummary(): Record { + return { + extraction_requests_total: extractionRequestsTotal.toJSON(), + extraction_duration_seconds: extractionDurationSeconds.toJSON(), + extraction_entities_extracted: extractionEntitiesExtracted.toJSON(), + extraction_cache_hit_total: extractionCacheHitTotal.toJSON(), + }; +} diff --git a/services/extraction-service/src/modules/extract/routes.ts b/services/extraction-service/src/modules/extract/routes.ts index 9ace2b45..4030cc41 100644 --- a/services/extraction-service/src/modules/extract/routes.ts +++ b/services/extraction-service/src/modules/extract/routes.ts @@ -11,6 +11,8 @@ import { } from '../../lib/python-bridge.js'; import { BadRequestError } from '../../lib/errors.js'; import { checkQuota, incrementUsage, getUsageSummary } from './usage.js'; +import { recordExtraction, getMetricsSummary } from '../../lib/metrics.js'; +import { sidecarBreaker } from '../../lib/circuit-breaker.js'; // ── In-memory LRU cache ──────────────────────────────────────── const CACHE_TTL_MS = parseInt(process.env.EXTRACTION_CACHE_TTL_MS || '86400000', 10); // 24h @@ -97,12 +99,28 @@ export async function extractRoutes(app: FastifyInstance) { } } - req.log.info({ taskId, modelId, textLength: text.length }, 'extraction request'); + req.log.info( + { + taskId, + modelId, + textLength: text.length, + userId, + userPlan, + productId: req.headers['x-product-id'], + }, + 'extraction request' + ); // Check cache const cached = cacheGet(text, taskId, modelId); if (cached) { - req.log.info({ taskId }, 'cache hit'); + req.log.info({ taskId, userId, cacheHit: true }, 'cache hit'); + recordExtraction({ + taskId, + modelId, + productId: req.headers['x-product-id'] as string, + status: 'cache_hit', + }); reply.header('X-Extraction-Cache', 'HIT'); return reply.send({ extractions: cached.extractions, @@ -118,32 +136,90 @@ export async function extractRoutes(app: FastifyInstance) { reply.header('X-Extraction-Cache', 'MISS'); - const result = await sidecarExtract( - { - text, - task_id: taskId, - task_prompt: taskPrompt, - examples: examples?.map(e => ({ - text: e.text, - extractions: e.extractions.map(ex => ({ - extraction_class: ex.extraction_class, - extraction_text: ex.extraction_text, - attributes: ex.attributes, + // Circuit breaker — fail fast if sidecar is down + if (!sidecarBreaker.allowRequest()) { + recordExtraction({ taskId, modelId, status: 'error' }); + return reply.status(503).send({ + error: 'Extraction service temporarily unavailable (circuit open)', + circuitState: sidecarBreaker.currentState, + }); + } + + let result; + try { + result = await sidecarExtract( + { + text, + task_id: taskId, + task_prompt: taskPrompt, + examples: examples?.map(e => ({ + text: e.text, + extractions: e.extractions.map(ex => ({ + extraction_class: ex.extraction_class, + extraction_text: ex.extraction_text, + attributes: ex.attributes, + })), })), - })), - model_id: modelId, - extraction_passes: options?.extractionPasses, - max_workers: options?.maxWorkers, - max_char_buffer: options?.maxCharBuffer, - }, - requestId - ); + model_id: modelId, + extraction_passes: options?.extractionPasses, + max_workers: options?.maxWorkers, + max_char_buffer: options?.maxCharBuffer, + }, + requestId + ); + sidecarBreaker.recordSuccess(); + } catch (err) { + sidecarBreaker.recordFailure(); + const message = err instanceof Error ? err.message : 'Sidecar error'; + req.log.error({ error: message, taskId, userId }, 'extraction failed'); + recordExtraction({ + taskId, + modelId, + productId: req.headers['x-product-id'] as string, + status: 'error', + }); + + // Map sidecar errors to proper HTTP status codes (5.10) + if (message.includes('timeout') || message.includes('Timeout')) { + return reply.status(408).send({ error: 'Extraction timed out', detail: message }); + } + if (message.includes('429') || message.includes('rate limit')) { + return reply + .status(429) + .send({ error: 'Upstream LLM rate limit', detail: message, retryAfter: 60 }); + } + if (message.includes('Model') && message.includes('unavailable')) { + return reply.status(503).send({ error: 'Model unavailable', detail: message }); + } + if (message.includes('400') || message.includes('Invalid')) { + return reply.status(400).send({ error: 'Invalid extraction request', detail: message }); + } + return reply.status(502).send({ error: 'Extraction sidecar error', detail: message }); + } cachePut(text, taskId, modelId, result); if (userId) incrementUsage(userId, userPlan); + recordExtraction({ + taskId, + modelId, + productId: req.headers['x-product-id'] as string, + status: 'success', + durationMs: result.metadata.duration_ms, + entityCount: result.extractions.length, + }); req.log.info( - { entityCount: result.extractions.length, durationMs: result.metadata.duration_ms }, + { + taskId, + modelId, + entityCount: result.extractions.length, + durationMs: result.metadata.duration_ms, + tokenCount: result.metadata.token_count, + charCount: result.metadata.char_count, + userId, + productId: req.headers['x-product-id'], + cacheHit: false, + }, 'extraction complete' ); @@ -229,6 +305,13 @@ export async function extractRoutes(app: FastifyInstance) { } }); + /** + * GET /extract/metrics — Extraction metrics summary. + */ + app.get('/extract/metrics', async (_req, reply) => { + return reply.send(getMetricsSummary()); + }); + /** * GET /extract/usage — Per-user extraction usage (admin). */ diff --git a/services/monitoring/grafana/dashboards/extraction-service.json b/services/monitoring/grafana/dashboards/extraction-service.json new file mode 100644 index 00000000..161838cd --- /dev/null +++ b/services/monitoring/grafana/dashboards/extraction-service.json @@ -0,0 +1,95 @@ +{ + "annotations": { "list": [] }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": null, + "links": [], + "panels": [ + { + "title": "Extraction Requests (Loki logs)", + "type": "timeseries", + "gridPos": { "h": 8, "w": 12, "x": 0, "y": 0 }, + "targets": [ + { + "datasource": { "type": "loki", "uid": "loki" }, + "expr": "count_over_time({container=\"extraction-service\"} |= \"extraction complete\" [5m])", + "legendFormat": "Extractions / 5m" + } + ] + }, + { + "title": "Cache Hits vs Misses", + "type": "timeseries", + "gridPos": { "h": 8, "w": 12, "x": 12, "y": 0 }, + "targets": [ + { + "datasource": { "type": "loki", "uid": "loki" }, + "expr": "count_over_time({container=\"extraction-service\"} |= \"cache hit\" [5m])", + "legendFormat": "Cache Hits / 5m" + }, + { + "datasource": { "type": "loki", "uid": "loki" }, + "expr": "count_over_time({container=\"extraction-service\"} |= \"extraction complete\" |= \"cacheHit\":false [5m])", + "legendFormat": "Cache Misses / 5m" + } + ] + }, + { + "title": "Errors", + "type": "timeseries", + "gridPos": { "h": 8, "w": 12, "x": 0, "y": 8 }, + "targets": [ + { + "datasource": { "type": "loki", "uid": "loki" }, + "expr": "count_over_time({container=\"extraction-service\"} |= \"error\" [5m])", + "legendFormat": "Errors / 5m" + } + ] + }, + { + "title": "Quota Exceeded (429s)", + "type": "stat", + "gridPos": { "h": 8, "w": 12, "x": 12, "y": 8 }, + "targets": [ + { + "datasource": { "type": "loki", "uid": "loki" }, + "expr": "count_over_time({container=\"extraction-service\"} |= \"quota exceeded\" [24h])", + "legendFormat": "429s Today" + } + ] + }, + { + "title": "Python Sidecar Health", + "type": "stat", + "gridPos": { "h": 4, "w": 6, "x": 0, "y": 16 }, + "targets": [ + { + "datasource": { "type": "loki", "uid": "loki" }, + "expr": "count_over_time({container=\"extraction-service\"} |= \"Sidecar unavailable\" [1h])", + "legendFormat": "Sidecar Errors (1h)" + } + ] + }, + { + "title": "Extraction Logs", + "type": "logs", + "gridPos": { "h": 10, "w": 24, "x": 0, "y": 20 }, + "targets": [ + { + "datasource": { "type": "loki", "uid": "loki" }, + "expr": "{container=\"extraction-service\"}" + } + ] + } + ], + "schemaVersion": 39, + "tags": ["extraction", "lysnrai"], + "templating": { "list": [] }, + "time": { "from": "now-6h", "to": "now" }, + "timepicker": {}, + "timezone": "", + "title": "Extraction Service", + "uid": "extraction-service", + "version": 1 +}