feat(extraction): Phase 5 observability + error handling (5.7-5.12)

- 5.7: Enhanced structured logging with userId, productId, cacheHit, tokenCount
- 5.8: Metrics module (counters + histograms) + /extract/metrics endpoint
- 5.9: Grafana dashboard config for extraction-service (Loki queries)
- 5.10: Error mapping — sidecar errors → proper HTTP status codes (408, 429, 502, 503)
- 5.11: Circuit breaker for Python sidecar (5 failures → 30s OPEN)
- 5.12: Graceful degradation — circuit open returns 503, cached results still served
- 46 TS tests passing
This commit is contained in:
saravanakumardb1 2026-02-14 14:04:59 -08:00
parent 9c8a3169dc
commit b8c0a73e89
4 changed files with 392 additions and 22 deletions

View File

@ -0,0 +1,85 @@
/**
* Simple circuit breaker for the Python sidecar.
*
* States: CLOSED (normal) → OPEN (fail fast) → HALF_OPEN (probe)
* Opens after `failureThreshold` consecutive failures.
* After `resetTimeoutMs` in OPEN state it moves to HALF_OPEN; a recorded success then returns it to CLOSED.
*/
/** Lifecycle states of the breaker. */
type State = 'CLOSED' | 'OPEN' | 'HALF_OPEN';

/** Optional tuning for {@link CircuitBreaker}. */
export interface CircuitBreakerOptions {
  /** Consecutive failures required to trip the breaker. Defaults to 5. */
  failureThreshold?: number;
  /** Milliseconds the breaker stays OPEN before a probe is admitted. Defaults to 30 000. */
  resetTimeoutMs?: number;
}

/**
 * Consecutive-failure circuit breaker.
 *
 * Callers ask {@link CircuitBreaker.allowRequest} before doing the guarded
 * work and then report the outcome through `recordSuccess()` or
 * `recordFailure()`.
 */
export class CircuitBreaker {
  private state: State = 'CLOSED';
  private failureCount = 0;
  private lastFailureTime = 0;
  /** `Date.now()` at which the current HALF_OPEN probe was admitted, or `null` when none is pending. */
  private probeStartedAt: number | null = null;
  private readonly failureThreshold: number;
  private readonly resetTimeoutMs: number;

  constructor(opts: CircuitBreakerOptions = {}) {
    this.failureThreshold = opts.failureThreshold ?? 5;
    this.resetTimeoutMs = opts.resetTimeoutMs ?? 30_000; // 30s
  }

  /**
   * Decide whether a request may proceed. Never throws.
   *
   * - CLOSED: always allowed.
   * - OPEN: rejected until `resetTimeoutMs` has passed since the last
   *   recorded failure; the first call after that becomes the probe and
   *   moves the breaker to HALF_OPEN.
   * - HALF_OPEN: rejected while a probe is pending, so a recovering
   *   dependency is not hit by the full traffic burst.
   *
   * @returns `true` if the caller may proceed, `false` to fail fast.
   */
  allowRequest(): boolean {
    if (this.state === 'CLOSED') return true;

    const now = Date.now();

    if (this.state === 'OPEN') {
      if (now - this.lastFailureTime < this.resetTimeoutMs) {
        return false; // Still cooling down, fail fast
      }
      this.state = 'HALF_OPEN';
      this.probeStartedAt = now;
      return true; // This caller is the probe
    }

    // HALF_OPEN: only one probe at a time. The probe "lease" expires after
    // resetTimeoutMs so a caller that never reports an outcome cannot wedge
    // the breaker in HALF_OPEN forever.
    const probePending =
      this.probeStartedAt !== null && now - this.probeStartedAt < this.resetTimeoutMs;
    if (probePending) return false;

    this.probeStartedAt = now;
    return true;
  }

  /**
   * Record a successful call — clears the failure streak and closes the breaker.
   */
  recordSuccess(): void {
    this.failureCount = 0;
    this.probeStartedAt = null;
    this.state = 'CLOSED';
  }

  /**
   * Record a failed call — trips the breaker to OPEN once the streak reaches
   * `failureThreshold`. A failed HALF_OPEN probe re-opens it immediately,
   * because the streak is only ever cleared by a success.
   */
  recordFailure(): void {
    this.failureCount++;
    this.lastFailureTime = Date.now();
    this.probeStartedAt = null;
    if (this.failureCount >= this.failureThreshold) {
      this.state = 'OPEN';
    }
  }

  /** Current state, without triggering any OPEN → HALF_OPEN transition. */
  get currentState(): State {
    return this.state;
  }

  /** Snapshot of state and configuration for diagnostics. */
  get stats(): { state: State; failureCount: number; threshold: number; resetMs: number } {
    return {
      state: this.state,
      failureCount: this.failureCount,
      threshold: this.failureThreshold,
      resetMs: this.resetTimeoutMs,
    };
  }
}
/**
 * Shared breaker instance for the Python sidecar: trips after 5 consecutive
 * failures and waits 30 seconds before admitting a probe.
 */
export const sidecarBreaker = new CircuitBreaker({ resetTimeoutMs: 30_000, failureThreshold: 5 });

View File

@ -0,0 +1,107 @@
/**
* Extraction-specific metrics for extraction-service.
*
* Generic HTTP metrics are handled by fastify-metrics. The custom
* counters/histograms defined here are kept in memory and exposed as a
* JSON summary via GET /extract/metrics.
*/
// ── In-memory counters (fastify-metrics handles HTTP metrics) ────
// These are simple counters since prom-client may not be directly available.
// They're exposed via the /extract/metrics endpoint.
/** One counter series: a label set and its accumulated value. */
interface MetricBucket {
  labels: Record<string, string>;
  value: number;
}

/**
 * In-memory counter partitioned by label set.
 *
 * Two label objects with the same key/value pairs always land in the same
 * bucket, regardless of the order in which their keys were written.
 */
class Counter {
  private _buckets = new Map<string, MetricBucket>();

  constructor(public readonly name: string) {}

  /**
   * Add `amount` (default 1) to the bucket identified by `labels`.
   *
   * @param labels Label key/value pairs identifying the series.
   * @param amount Increment to apply.
   */
  inc(labels: Record<string, string>, amount = 1): void {
    const key = Counter.bucketKey(labels);
    const existing = this._buckets.get(key);
    if (existing) {
      existing.value += amount;
      return;
    }
    // Copy the labels so later mutation of the caller's object cannot
    // silently rewrite a stored series.
    this._buckets.set(key, { labels: { ...labels }, value: amount });
  }

  /**
   * Snapshot of all buckets in first-seen order. The returned objects are
   * copies, so callers cannot alter the counter through them.
   */
  toJSON(): Array<{ labels: Record<string, string>; value: number }> {
    return Array.from(this._buckets.values(), ({ labels, value }) => ({
      labels: { ...labels },
      value,
    }));
  }

  /** Build a map key that is independent of label insertion order. */
  private static bucketKey(labels: Record<string, string>): string {
    const sorted = Object.entries(labels).sort(([a], [b]) => (a < b ? -1 : a > b ? 1 : 0));
    return JSON.stringify(sorted);
  }
}
/**
 * Running aggregate (count / sum / average) of observed values.
 *
 * Only the aggregates are kept: memory use is constant no matter how many
 * observations are recorded.
 */
class Histogram {
  private _sum = 0;
  private _count = 0;

  constructor(public readonly name: string) {}

  /**
   * Record one observation.
   *
   * @param labels Accepted for call-site symmetry with the counters; the
   *   summary is not broken down by label, so it is not stored.
   * @param value Observed value. Non-finite values (NaN, ±Infinity) are
   *   ignored, because a single one would corrupt `sum` and `avg` for the
   *   rest of the process lifetime.
   */
  observe(labels: Record<string, string>, value: number): void {
    void labels;
    if (!Number.isFinite(value)) return;
    this._sum += value;
    this._count++;
  }

  /** Aggregates with `sum` and `avg` rounded to two decimals; `avg` is 0 when empty. */
  toJSON(): { count: number; sum: number; avg: number } {
    const round2 = (x: number): number => Math.round(x * 100) / 100;
    return {
      count: this._count,
      sum: round2(this._sum),
      avg: this._count > 0 ? round2(this._sum / this._count) : 0,
    };
  }
}
// ── Exported metrics ─────────────────────────────────────────────
/** Count of extraction requests, one series per label set. */
export const extractionRequestsTotal = new Counter('extraction_requests_total');

/** Count of extraction requests answered from the cache. */
export const extractionCacheHitTotal = new Counter('extraction_cache_hit_total');

/** Aggregate of extraction durations, in seconds. */
export const extractionDurationSeconds = new Histogram('extraction_duration_seconds');

/** Aggregate of the number of entities returned per extraction. */
export const extractionEntitiesExtracted = new Histogram('extraction_entities_extracted');
/**
 * Feed a single extraction outcome into the extraction metrics.
 *
 * Every call bumps the request counter. A `cache_hit` status additionally
 * bumps the cache-hit counter. Duration and entity count are observed only
 * when supplied.
 *
 * @param params.taskId Task identifier; falsy values are labelled 'unknown'.
 * @param params.modelId Model identifier; falsy values are labelled 'unknown'.
 * @param params.productId Product identifier; falsy values are labelled 'unknown'.
 * @param params.status Outcome of the request.
 * @param params.durationMs Duration in milliseconds; recorded in seconds.
 * @param params.entityCount Number of entities extracted.
 */
export function recordExtraction(params: {
  taskId?: string;
  modelId?: string;
  productId?: string;
  status: 'success' | 'error' | 'cache_hit';
  durationMs?: number;
  entityCount?: number;
}): void {
  const { taskId, modelId, productId, status, durationMs, entityCount } = params;

  const labels = {
    task_id: taskId ? taskId : 'unknown',
    model_id: modelId ? modelId : 'unknown',
    product_id: productId ? productId : 'unknown',
    status,
  };

  extractionRequestsTotal.inc(labels);

  const servedFromCache = status === 'cache_hit';
  if (servedFromCache) {
    extractionCacheHitTotal.inc(labels);
  }

  if (durationMs !== undefined) {
    const seconds = durationMs / 1000;
    extractionDurationSeconds.observe(labels, seconds);
  }

  if (entityCount !== undefined) {
    extractionEntitiesExtracted.observe(labels, entityCount);
  }
}
/**
 * Collect the current value of every extraction metric into one
 * JSON-serialisable object keyed by metric name.
 *
 * @returns Map from metric name to that metric's `toJSON()` snapshot.
 */
export function getMetricsSummary(): Record<string, unknown> {
  const summary: Record<string, unknown> = {};
  summary.extraction_requests_total = extractionRequestsTotal.toJSON();
  summary.extraction_duration_seconds = extractionDurationSeconds.toJSON();
  summary.extraction_entities_extracted = extractionEntitiesExtracted.toJSON();
  summary.extraction_cache_hit_total = extractionCacheHitTotal.toJSON();
  return summary;
}

View File

@ -11,6 +11,8 @@ import {
} from '../../lib/python-bridge.js';
import { BadRequestError } from '../../lib/errors.js';
import { checkQuota, incrementUsage, getUsageSummary } from './usage.js';
import { recordExtraction, getMetricsSummary } from '../../lib/metrics.js';
import { sidecarBreaker } from '../../lib/circuit-breaker.js';
// ── In-memory LRU cache ────────────────────────────────────────
const CACHE_TTL_MS = parseInt(process.env.EXTRACTION_CACHE_TTL_MS || '86400000', 10); // 24h
@ -97,12 +99,28 @@ export async function extractRoutes(app: FastifyInstance) {
}
}
req.log.info({ taskId, modelId, textLength: text.length }, 'extraction request');
req.log.info(
{
taskId,
modelId,
textLength: text.length,
userId,
userPlan,
productId: req.headers['x-product-id'],
},
'extraction request'
);
// Check cache
const cached = cacheGet(text, taskId, modelId);
if (cached) {
req.log.info({ taskId }, 'cache hit');
req.log.info({ taskId, userId, cacheHit: true }, 'cache hit');
recordExtraction({
taskId,
modelId,
productId: req.headers['x-product-id'] as string,
status: 'cache_hit',
});
reply.header('X-Extraction-Cache', 'HIT');
return reply.send({
extractions: cached.extractions,
@ -118,32 +136,90 @@ export async function extractRoutes(app: FastifyInstance) {
reply.header('X-Extraction-Cache', 'MISS');
const result = await sidecarExtract(
{
text,
task_id: taskId,
task_prompt: taskPrompt,
examples: examples?.map(e => ({
text: e.text,
extractions: e.extractions.map(ex => ({
extraction_class: ex.extraction_class,
extraction_text: ex.extraction_text,
attributes: ex.attributes,
// Circuit breaker — fail fast if sidecar is down
if (!sidecarBreaker.allowRequest()) {
recordExtraction({ taskId, modelId, status: 'error' });
return reply.status(503).send({
error: 'Extraction service temporarily unavailable (circuit open)',
circuitState: sidecarBreaker.currentState,
});
}
let result;
try {
result = await sidecarExtract(
{
text,
task_id: taskId,
task_prompt: taskPrompt,
examples: examples?.map(e => ({
text: e.text,
extractions: e.extractions.map(ex => ({
extraction_class: ex.extraction_class,
extraction_text: ex.extraction_text,
attributes: ex.attributes,
})),
})),
})),
model_id: modelId,
extraction_passes: options?.extractionPasses,
max_workers: options?.maxWorkers,
max_char_buffer: options?.maxCharBuffer,
},
requestId
);
model_id: modelId,
extraction_passes: options?.extractionPasses,
max_workers: options?.maxWorkers,
max_char_buffer: options?.maxCharBuffer,
},
requestId
);
sidecarBreaker.recordSuccess();
} catch (err) {
sidecarBreaker.recordFailure();
const message = err instanceof Error ? err.message : 'Sidecar error';
req.log.error({ error: message, taskId, userId }, 'extraction failed');
recordExtraction({
taskId,
modelId,
productId: req.headers['x-product-id'] as string,
status: 'error',
});
// Map sidecar errors to proper HTTP status codes (5.10)
if (message.includes('timeout') || message.includes('Timeout')) {
return reply.status(408).send({ error: 'Extraction timed out', detail: message });
}
if (message.includes('429') || message.includes('rate limit')) {
return reply
.status(429)
.send({ error: 'Upstream LLM rate limit', detail: message, retryAfter: 60 });
}
if (message.includes('Model') && message.includes('unavailable')) {
return reply.status(503).send({ error: 'Model unavailable', detail: message });
}
if (message.includes('400') || message.includes('Invalid')) {
return reply.status(400).send({ error: 'Invalid extraction request', detail: message });
}
return reply.status(502).send({ error: 'Extraction sidecar error', detail: message });
}
cachePut(text, taskId, modelId, result);
if (userId) incrementUsage(userId, userPlan);
recordExtraction({
taskId,
modelId,
productId: req.headers['x-product-id'] as string,
status: 'success',
durationMs: result.metadata.duration_ms,
entityCount: result.extractions.length,
});
req.log.info(
{ entityCount: result.extractions.length, durationMs: result.metadata.duration_ms },
{
taskId,
modelId,
entityCount: result.extractions.length,
durationMs: result.metadata.duration_ms,
tokenCount: result.metadata.token_count,
charCount: result.metadata.char_count,
userId,
productId: req.headers['x-product-id'],
cacheHit: false,
},
'extraction complete'
);
@ -229,6 +305,13 @@ export async function extractRoutes(app: FastifyInstance) {
}
});
/**
* GET /extract/metrics Extraction metrics summary.
*/
app.get('/extract/metrics', async (_req, reply) => {
return reply.send(getMetricsSummary());
});
/**
* GET /extract/usage Per-user extraction usage (admin).
*/

View File

@ -0,0 +1,95 @@
{
"annotations": { "list": [] },
"editable": true,
"fiscalYearStartMonth": 0,
"graphTooltip": 0,
"id": null,
"links": [],
"panels": [
{
"title": "Extraction Requests (Loki logs)",
"type": "timeseries",
"gridPos": { "h": 8, "w": 12, "x": 0, "y": 0 },
"targets": [
{
"datasource": { "type": "loki", "uid": "loki" },
"expr": "count_over_time({container=\"extraction-service\"} |= \"extraction complete\" [5m])",
"legendFormat": "Extractions / 5m"
}
]
},
{
"title": "Cache Hits vs Misses",
"type": "timeseries",
"gridPos": { "h": 8, "w": 12, "x": 12, "y": 0 },
"targets": [
{
"datasource": { "type": "loki", "uid": "loki" },
"expr": "count_over_time({container=\"extraction-service\"} |= \"cache hit\" [5m])",
"legendFormat": "Cache Hits / 5m"
},
{
"datasource": { "type": "loki", "uid": "loki" },
"expr": "count_over_time({container=\"extraction-service\"} |= \"extraction complete\" |= \"\\\"cacheHit\\\":false\" [5m])",
"legendFormat": "Cache Misses / 5m"
}
]
},
{
"title": "Errors",
"type": "timeseries",
"gridPos": { "h": 8, "w": 12, "x": 0, "y": 8 },
"targets": [
{
"datasource": { "type": "loki", "uid": "loki" },
"expr": "count_over_time({container=\"extraction-service\"} |= \"error\" [5m])",
"legendFormat": "Errors / 5m"
}
]
},
{
"title": "Quota Exceeded (429s)",
"type": "stat",
"gridPos": { "h": 8, "w": 12, "x": 12, "y": 8 },
"targets": [
{
"datasource": { "type": "loki", "uid": "loki" },
"expr": "count_over_time({container=\"extraction-service\"} |= \"quota exceeded\" [24h])",
"legendFormat": "429s Today"
}
]
},
{
"title": "Python Sidecar Health",
"type": "stat",
"gridPos": { "h": 4, "w": 6, "x": 0, "y": 16 },
"targets": [
{
"datasource": { "type": "loki", "uid": "loki" },
"expr": "count_over_time({container=\"extraction-service\"} |= \"Sidecar unavailable\" [1h])",
"legendFormat": "Sidecar Errors (1h)"
}
]
},
{
"title": "Extraction Logs",
"type": "logs",
"gridPos": { "h": 10, "w": 24, "x": 0, "y": 20 },
"targets": [
{
"datasource": { "type": "loki", "uid": "loki" },
"expr": "{container=\"extraction-service\"}"
}
]
}
],
"schemaVersion": 39,
"tags": ["extraction", "lysnrai"],
"templating": { "list": [] },
"time": { "from": "now-6h", "to": "now" },
"timepicker": {},
"timezone": "",
"title": "Extraction Service",
"uid": "extraction-service",
"version": 1
}