feat(extraction): Phase 5 observability + error handling (5.7-5.12)
- 5.7: Enhanced structured logging with userId, productId, cacheHit, tokenCount
- 5.8: Metrics module (counters + histograms) + /extract/metrics endpoint
- 5.9: Grafana dashboard config for extraction-service (Loki queries)
- 5.10: Error mapping — sidecar errors → proper HTTP status codes (400, 408, 429, 502, 503)
- 5.11: Circuit breaker for Python sidecar (5 consecutive failures → 30s OPEN)
- 5.12: Graceful degradation — circuit open returns 503, cached results still served
- 46 TS tests passing
This commit is contained in:
parent
9c8a3169dc
commit
b8c0a73e89
85
services/extraction-service/src/lib/circuit-breaker.ts
Normal file
85
services/extraction-service/src/lib/circuit-breaker.ts
Normal file
@ -0,0 +1,85 @@
|
||||
/**
|
||||
* Simple circuit breaker for the Python sidecar.
|
||||
*
|
||||
* States: CLOSED (normal) → OPEN (fail fast) → HALF_OPEN (probe)
|
||||
* Opens after `failureThreshold` consecutive failures.
|
||||
* Resets after `resetTimeoutMs` in OPEN state.
|
||||
*/
|
||||
|
||||
/** Lifecycle states of the breaker. */
type State = 'CLOSED' | 'OPEN' | 'HALF_OPEN';

/** Tuning knobs for {@link CircuitBreaker}; every field is optional. */
export interface CircuitBreakerOptions {
  /** Consecutive failures that trip the breaker. Defaults to 5. */
  failureThreshold?: number;
  /** Milliseconds the breaker stays OPEN before a probe is admitted. Defaults to 30 000. */
  resetTimeoutMs?: number;
}

/**
 * Minimal three-state circuit breaker.
 *
 * - CLOSED: every request is allowed; failures are counted.
 * - OPEN: requests are rejected until `resetTimeoutMs` has passed since the
 *   most recent recorded failure.
 * - HALF_OPEN: exactly one probe request is admitted; its outcome decides
 *   whether the breaker closes (success) or re-opens (failure).
 *
 * Callers are expected to pair every `allowRequest() === true` with either
 * `recordSuccess()` or `recordFailure()`.
 */
export class CircuitBreaker {
  private state: State = 'CLOSED';
  private failureCount = 0;
  private lastFailureTime = 0;
  /** Start time (ms epoch) of the outstanding HALF_OPEN probe, or null if none. */
  private probeStartedAt: number | null = null;
  private readonly failureThreshold: number;
  private readonly resetTimeoutMs: number;

  constructor(opts: CircuitBreakerOptions = {}) {
    this.failureThreshold = opts.failureThreshold ?? 5;
    this.resetTimeoutMs = opts.resetTimeoutMs ?? 30_000; // 30s
  }

  /**
   * Decide whether a request may proceed.
   *
   * Never throws: a rejected request is signalled by a `false` return value.
   *
   * @returns `true` when the caller may contact the protected service,
   *          `false` when it should fail fast.
   */
  allowRequest(): boolean {
    if (this.state === 'CLOSED') return true;

    const now = Date.now();

    if (this.state === 'OPEN') {
      // Still cooling down: fail fast.
      if (now - this.lastFailureTime < this.resetTimeoutMs) return false;

      // Cool-down elapsed: admit a single probe.
      this.state = 'HALF_OPEN';
      this.probeStartedAt = now;
      return true;
    }

    // HALF_OPEN: while a probe is outstanding, reject everyone else so a
    // still-broken service is not hit by all concurrent traffic at once.
    // The probe "lease" expires after resetTimeoutMs so that a probe whose
    // outcome is never recorded cannot wedge the breaker permanently.
    const probeOutstanding =
      this.probeStartedAt !== null && now - this.probeStartedAt < this.resetTimeoutMs;
    if (probeOutstanding) return false;

    this.probeStartedAt = now;
    return true;
  }

  /**
   * Record a successful call: clears the failure count and closes the breaker.
   */
  recordSuccess(): void {
    this.failureCount = 0;
    this.probeStartedAt = null;
    this.state = 'CLOSED';
  }

  /**
   * Record a failed call. Trips the breaker to OPEN once the consecutive
   * failure count reaches the threshold, or immediately when the failure
   * belongs to a HALF_OPEN probe.
   */
  recordFailure(): void {
    this.failureCount++;
    this.lastFailureTime = Date.now();
    this.probeStartedAt = null;

    if (this.state === 'HALF_OPEN' || this.failureCount >= this.failureThreshold) {
      this.state = 'OPEN';
    }
  }

  /** Current breaker state (read-only view). */
  get currentState(): State {
    return this.state;
  }

  /** Snapshot of the breaker's state and configuration, e.g. for diagnostics. */
  get stats(): { state: State; failureCount: number; threshold: number; resetMs: number } {
    return {
      state: this.state,
      failureCount: this.failureCount,
      threshold: this.failureThreshold,
      resetMs: this.resetTimeoutMs,
    };
  }
}

// Module-level singleton for the sidecar circuit breaker
export const sidecarBreaker = new CircuitBreaker({
  failureThreshold: 5,
  resetTimeoutMs: 30_000,
});
|
||||
107
services/extraction-service/src/lib/metrics.ts
Normal file
107
services/extraction-service/src/lib/metrics.ts
Normal file
@ -0,0 +1,107 @@
|
||||
/**
|
||||
* Prometheus metrics for extraction-service.
|
||||
*
|
||||
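* HTTP-level metrics are exposed via GET /metrics (auto-registered by fastify-metrics);
* the custom in-memory metrics defined here are served as JSON by GET /extract/metrics.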
|
||||
* Custom counters/histograms for extraction-specific telemetry.
|
||||
*/
|
||||
|
||||
// ── In-memory counters (fastify-metrics handles HTTP metrics) ────
|
||||
// These are simple counters since prom-client may not be directly available.
|
||||
// They're exposed via the /extract/metrics endpoint.
|
||||
|
||||
/** One counter series: a label set and its accumulated value. */
interface MetricBucket {
  labels: Record<string, string>;
  value: number;
}

/**
 * In-memory counter keyed by label set.
 *
 * Two label objects with the same key/value pairs always address the same
 * series, regardless of the order in which their keys were written.
 */
class Counter {
  private _buckets = new Map<string, MetricBucket>();

  constructor(public readonly name: string) {}

  /**
   * Add `amount` to the series identified by `labels`, creating it on first use.
   *
   * @param labels Label set identifying the series.
   * @param amount Increment to apply; defaults to 1.
   */
  inc(labels: Record<string, string>, amount = 1): void {
    const key = Counter.bucketKey(labels);
    const existing = this._buckets.get(key);
    if (existing) {
      existing.value += amount;
      return;
    }
    // Copy the labels so later mutation of the caller's object cannot
    // change what this series reports.
    this._buckets.set(key, { labels: { ...labels }, value: amount });
  }

  /**
   * Snapshot of every series in first-seen order.
   *
   * @returns Fresh objects; mutating them does not affect the counter.
   */
  toJSON(): Array<{ labels: Record<string, string>; value: number }> {
    return Array.from(this._buckets.values(), bucket => ({
      labels: { ...bucket.labels },
      value: bucket.value,
    }));
  }

  /**
   * Build an order-independent map key for a label set.
   * Plain `JSON.stringify(labels)` would depend on key insertion order.
   */
  private static bucketKey(labels: Record<string, string>): string {
    const sorted = Object.entries(labels).sort(([a], [b]) => (a < b ? -1 : a > b ? 1 : 0));
    return JSON.stringify(sorted);
  }
}
|
||||
|
||||
/**
 * Running count/sum aggregate for observed values.
 *
 * Only the aggregate is kept: individual observations are not retained, so
 * memory use stays constant no matter how many samples are recorded.
 */
class Histogram {
  private _sum = 0;
  private _count = 0;

  constructor(public readonly name: string) {}

  /**
   * Record one sample.
   *
   * @param labels Accepted for call-site symmetry with `Counter.inc`; the
   *               summary produced by `toJSON` is not broken down by label.
   * @param value  Sample value. Non-finite values (NaN, ±Infinity) are
   *               ignored so a single bad sample cannot poison sum/avg.
   */
  observe(labels: Record<string, string>, value: number): void {
    void labels;
    if (!Number.isFinite(value)) return;
    this._sum += value;
    this._count++;
  }

  /**
   * Summary of everything observed so far.
   *
   * @returns Sample count, plus sum and average rounded to two decimals
   *          (average is 0 when nothing has been observed).
   */
  toJSON(): { count: number; sum: number; avg: number } {
    const round2 = (x: number): number => Math.round(x * 100) / 100;
    return {
      count: this._count,
      sum: round2(this._sum),
      avg: this._count > 0 ? round2(this._sum / this._count) : 0,
    };
  }
}
|
||||
|
||||
// ── Exported metrics ─────────────────────────────────────────────
|
||||
|
||||
/** Count of extraction requests, one series per label set. */
export const extractionRequestsTotal = new Counter('extraction_requests_total');

/** Aggregate of extraction durations, observed in seconds. */
export const extractionDurationSeconds = new Histogram('extraction_duration_seconds');

/** Aggregate of the number of entities returned per extraction. */
export const extractionEntitiesExtracted = new Histogram('extraction_entities_extracted');

/** Count of requests answered from the cache, one series per label set. */
export const extractionCacheHitTotal = new Counter('extraction_cache_hit_total');
|
||||
|
||||
/**
 * Feed a single extraction outcome into the extraction metrics.
 *
 * Always bumps the request counter; additionally bumps the cache-hit counter
 * for `cache_hit` outcomes, and records duration / entity count when given.
 *
 * @param params.taskId      Task identifier; missing or empty becomes 'unknown'.
 * @param params.modelId     Model identifier; missing or empty becomes 'unknown'.
 * @param params.productId   Product identifier; missing or empty becomes 'unknown'.
 * @param params.status      Outcome of the request.
 * @param params.durationMs  Duration in milliseconds; stored in seconds.
 * @param params.entityCount Number of entities extracted.
 */
export function recordExtraction(params: {
  taskId?: string;
  modelId?: string;
  productId?: string;
  status: 'success' | 'error' | 'cache_hit';
  durationMs?: number;
  entityCount?: number;
}): void {
  const { taskId, modelId, productId, status, durationMs, entityCount } = params;

  // Shared label set attached to every metric touched below.
  const labels = {
    task_id: taskId || 'unknown',
    model_id: modelId || 'unknown',
    product_id: productId || 'unknown',
    status,
  };

  extractionRequestsTotal.inc(labels);

  if (status === 'cache_hit') extractionCacheHitTotal.inc(labels);

  // Milliseconds → seconds to match the metric's name.
  if (durationMs !== undefined) extractionDurationSeconds.observe(labels, durationMs / 1000);

  if (entityCount !== undefined) extractionEntitiesExtracted.observe(labels, entityCount);
}
|
||||
|
||||
/**
 * Collect the current value of every extraction metric into one plain object,
 * keyed by metric name.
 *
 * @returns The `toJSON()` snapshot of each exported counter and histogram.
 */
export function getMetricsSummary(): Record<string, unknown> {
  const summary: Record<string, unknown> = {};

  summary.extraction_requests_total = extractionRequestsTotal.toJSON();
  summary.extraction_duration_seconds = extractionDurationSeconds.toJSON();
  summary.extraction_entities_extracted = extractionEntitiesExtracted.toJSON();
  summary.extraction_cache_hit_total = extractionCacheHitTotal.toJSON();

  return summary;
}
|
||||
@ -11,6 +11,8 @@ import {
|
||||
} from '../../lib/python-bridge.js';
|
||||
import { BadRequestError } from '../../lib/errors.js';
|
||||
import { checkQuota, incrementUsage, getUsageSummary } from './usage.js';
|
||||
import { recordExtraction, getMetricsSummary } from '../../lib/metrics.js';
|
||||
import { sidecarBreaker } from '../../lib/circuit-breaker.js';
|
||||
|
||||
// ── In-memory LRU cache ────────────────────────────────────────
|
||||
const CACHE_TTL_MS = parseInt(process.env.EXTRACTION_CACHE_TTL_MS || '86400000', 10); // 24h
|
||||
@ -97,12 +99,28 @@ export async function extractRoutes(app: FastifyInstance) {
|
||||
}
|
||||
}
|
||||
|
||||
req.log.info({ taskId, modelId, textLength: text.length }, 'extraction request');
|
||||
req.log.info(
|
||||
{
|
||||
taskId,
|
||||
modelId,
|
||||
textLength: text.length,
|
||||
userId,
|
||||
userPlan,
|
||||
productId: req.headers['x-product-id'],
|
||||
},
|
||||
'extraction request'
|
||||
);
|
||||
|
||||
// Check cache
|
||||
const cached = cacheGet(text, taskId, modelId);
|
||||
if (cached) {
|
||||
req.log.info({ taskId }, 'cache hit');
|
||||
req.log.info({ taskId, userId, cacheHit: true }, 'cache hit');
|
||||
recordExtraction({
|
||||
taskId,
|
||||
modelId,
|
||||
productId: req.headers['x-product-id'] as string,
|
||||
status: 'cache_hit',
|
||||
});
|
||||
reply.header('X-Extraction-Cache', 'HIT');
|
||||
return reply.send({
|
||||
extractions: cached.extractions,
|
||||
@ -118,32 +136,90 @@ export async function extractRoutes(app: FastifyInstance) {
|
||||
|
||||
reply.header('X-Extraction-Cache', 'MISS');
|
||||
|
||||
const result = await sidecarExtract(
|
||||
{
|
||||
text,
|
||||
task_id: taskId,
|
||||
task_prompt: taskPrompt,
|
||||
examples: examples?.map(e => ({
|
||||
text: e.text,
|
||||
extractions: e.extractions.map(ex => ({
|
||||
extraction_class: ex.extraction_class,
|
||||
extraction_text: ex.extraction_text,
|
||||
attributes: ex.attributes,
|
||||
// Circuit breaker — fail fast if sidecar is down
|
||||
if (!sidecarBreaker.allowRequest()) {
|
||||
recordExtraction({ taskId, modelId, status: 'error' });
|
||||
return reply.status(503).send({
|
||||
error: 'Extraction service temporarily unavailable (circuit open)',
|
||||
circuitState: sidecarBreaker.currentState,
|
||||
});
|
||||
}
|
||||
|
||||
let result;
|
||||
try {
|
||||
result = await sidecarExtract(
|
||||
{
|
||||
text,
|
||||
task_id: taskId,
|
||||
task_prompt: taskPrompt,
|
||||
examples: examples?.map(e => ({
|
||||
text: e.text,
|
||||
extractions: e.extractions.map(ex => ({
|
||||
extraction_class: ex.extraction_class,
|
||||
extraction_text: ex.extraction_text,
|
||||
attributes: ex.attributes,
|
||||
})),
|
||||
})),
|
||||
})),
|
||||
model_id: modelId,
|
||||
extraction_passes: options?.extractionPasses,
|
||||
max_workers: options?.maxWorkers,
|
||||
max_char_buffer: options?.maxCharBuffer,
|
||||
},
|
||||
requestId
|
||||
);
|
||||
model_id: modelId,
|
||||
extraction_passes: options?.extractionPasses,
|
||||
max_workers: options?.maxWorkers,
|
||||
max_char_buffer: options?.maxCharBuffer,
|
||||
},
|
||||
requestId
|
||||
);
|
||||
sidecarBreaker.recordSuccess();
|
||||
} catch (err) {
|
||||
sidecarBreaker.recordFailure();
|
||||
const message = err instanceof Error ? err.message : 'Sidecar error';
|
||||
req.log.error({ error: message, taskId, userId }, 'extraction failed');
|
||||
recordExtraction({
|
||||
taskId,
|
||||
modelId,
|
||||
productId: req.headers['x-product-id'] as string,
|
||||
status: 'error',
|
||||
});
|
||||
|
||||
// Map sidecar errors to proper HTTP status codes (5.10)
|
||||
if (message.includes('timeout') || message.includes('Timeout')) {
|
||||
return reply.status(408).send({ error: 'Extraction timed out', detail: message });
|
||||
}
|
||||
if (message.includes('429') || message.includes('rate limit')) {
|
||||
return reply
|
||||
.status(429)
|
||||
.send({ error: 'Upstream LLM rate limit', detail: message, retryAfter: 60 });
|
||||
}
|
||||
if (message.includes('Model') && message.includes('unavailable')) {
|
||||
return reply.status(503).send({ error: 'Model unavailable', detail: message });
|
||||
}
|
||||
if (message.includes('400') || message.includes('Invalid')) {
|
||||
return reply.status(400).send({ error: 'Invalid extraction request', detail: message });
|
||||
}
|
||||
return reply.status(502).send({ error: 'Extraction sidecar error', detail: message });
|
||||
}
|
||||
|
||||
cachePut(text, taskId, modelId, result);
|
||||
if (userId) incrementUsage(userId, userPlan);
|
||||
recordExtraction({
|
||||
taskId,
|
||||
modelId,
|
||||
productId: req.headers['x-product-id'] as string,
|
||||
status: 'success',
|
||||
durationMs: result.metadata.duration_ms,
|
||||
entityCount: result.extractions.length,
|
||||
});
|
||||
|
||||
req.log.info(
|
||||
{ entityCount: result.extractions.length, durationMs: result.metadata.duration_ms },
|
||||
{
|
||||
taskId,
|
||||
modelId,
|
||||
entityCount: result.extractions.length,
|
||||
durationMs: result.metadata.duration_ms,
|
||||
tokenCount: result.metadata.token_count,
|
||||
charCount: result.metadata.char_count,
|
||||
userId,
|
||||
productId: req.headers['x-product-id'],
|
||||
cacheHit: false,
|
||||
},
|
||||
'extraction complete'
|
||||
);
|
||||
|
||||
@ -229,6 +305,13 @@ export async function extractRoutes(app: FastifyInstance) {
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* GET /extract/metrics — Extraction metrics summary.
|
||||
*/
|
||||
app.get('/extract/metrics', async (_req, reply) => {
|
||||
return reply.send(getMetricsSummary());
|
||||
});
|
||||
|
||||
/**
|
||||
* GET /extract/usage — Per-user extraction usage (admin).
|
||||
*/
|
||||
|
||||
@ -0,0 +1,95 @@
|
||||
{
|
||||
"annotations": { "list": [] },
|
||||
"editable": true,
|
||||
"fiscalYearStartMonth": 0,
|
||||
"graphTooltip": 0,
|
||||
"id": null,
|
||||
"links": [],
|
||||
"panels": [
|
||||
{
|
||||
"title": "Extraction Requests (Loki logs)",
|
||||
"type": "timeseries",
|
||||
"gridPos": { "h": 8, "w": 12, "x": 0, "y": 0 },
|
||||
"targets": [
|
||||
{
|
||||
"datasource": { "type": "loki", "uid": "loki" },
|
||||
"expr": "count_over_time({container=\"extraction-service\"} |= \"extraction complete\" [5m])",
|
||||
"legendFormat": "Extractions / 5m"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"title": "Cache Hits vs Misses",
|
||||
"type": "timeseries",
|
||||
"gridPos": { "h": 8, "w": 12, "x": 12, "y": 0 },
|
||||
"targets": [
|
||||
{
|
||||
"datasource": { "type": "loki", "uid": "loki" },
|
||||
"expr": "count_over_time({container=\"extraction-service\"} |= \"cache hit\" [5m])",
|
||||
"legendFormat": "Cache Hits / 5m"
|
||||
},
|
||||
{
|
||||
"datasource": { "type": "loki", "uid": "loki" },
|
||||
"expr": "count_over_time({container=\"extraction-service\"} |= \"extraction complete\" |= `\"cacheHit\":false` [5m])",
|
||||
"legendFormat": "Cache Misses / 5m"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"title": "Errors",
|
||||
"type": "timeseries",
|
||||
"gridPos": { "h": 8, "w": 12, "x": 0, "y": 8 },
|
||||
"targets": [
|
||||
{
|
||||
"datasource": { "type": "loki", "uid": "loki" },
|
||||
"expr": "count_over_time({container=\"extraction-service\"} |= \"error\" [5m])",
|
||||
"legendFormat": "Errors / 5m"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"title": "Quota Exceeded (429s)",
|
||||
"type": "stat",
|
||||
"gridPos": { "h": 8, "w": 12, "x": 12, "y": 8 },
|
||||
"targets": [
|
||||
{
|
||||
"datasource": { "type": "loki", "uid": "loki" },
|
||||
"expr": "count_over_time({container=\"extraction-service\"} |= \"quota exceeded\" [24h])",
|
||||
"legendFormat": "429s Today"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"title": "Python Sidecar Health",
|
||||
"type": "stat",
|
||||
"gridPos": { "h": 4, "w": 6, "x": 0, "y": 16 },
|
||||
"targets": [
|
||||
{
|
||||
"datasource": { "type": "loki", "uid": "loki" },
|
||||
"expr": "count_over_time({container=\"extraction-service\"} |= \"Sidecar unavailable\" [1h])",
|
||||
"legendFormat": "Sidecar Errors (1h)"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"title": "Extraction Logs",
|
||||
"type": "logs",
|
||||
"gridPos": { "h": 10, "w": 24, "x": 0, "y": 20 },
|
||||
"targets": [
|
||||
{
|
||||
"datasource": { "type": "loki", "uid": "loki" },
|
||||
"expr": "{container=\"extraction-service\"}"
|
||||
}
|
||||
]
|
||||
}
|
||||
],
|
||||
"schemaVersion": 39,
|
||||
"tags": ["extraction", "lysnrai"],
|
||||
"templating": { "list": [] },
|
||||
"time": { "from": "now-6h", "to": "now" },
|
||||
"timepicker": {},
|
||||
"timezone": "",
|
||||
"title": "Extraction Service",
|
||||
"uid": "extraction-service",
|
||||
"version": 1
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user