feat(extraction): Phase 5 observability + error handling (5.7-5.12)
- 5.7: Enhanced structured logging with userId, productId, cacheHit, tokenCount - 5.8: Metrics module (counters + histograms) + /extract/metrics endpoint - 5.9: Grafana dashboard config for extraction-service (Loki queries) - 5.10: Error mapping — sidecar errors → proper HTTP status codes (408, 429, 502, 503) - 5.11: Circuit breaker for Python sidecar (5 failures → 30s OPEN) - 5.12: Graceful degradation — circuit open returns 503, cached results still served - 46 TS tests passing
This commit is contained in:
parent
9c8a3169dc
commit
b8c0a73e89
85
services/extraction-service/src/lib/circuit-breaker.ts
Normal file
85
services/extraction-service/src/lib/circuit-breaker.ts
Normal file
@ -0,0 +1,85 @@
|
|||||||
|
/**
|
||||||
|
* Simple circuit breaker for the Python sidecar.
|
||||||
|
*
|
||||||
|
* States: CLOSED (normal) → OPEN (fail fast) → HALF_OPEN (probe)
|
||||||
|
* Opens after `failureThreshold` consecutive failures.
|
||||||
|
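 * After `resetTimeoutMs` in OPEN state, moves to HALF_OPEN to probe the sidecar:
 * a successful probe closes the circuit, a failed probe re-opens it.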
|
||||||
|
*/
|
||||||
|
|
||||||
|
type State = 'CLOSED' | 'OPEN' | 'HALF_OPEN';

/** Tuning options for {@link CircuitBreaker}; omitted fields use the defaults. */
export interface CircuitBreakerOptions {
  /** Number of recorded failures (without an intervening success) that opens the circuit. Default: 5. */
  failureThreshold?: number;
  /** Milliseconds the circuit stays OPEN before a probe request is allowed. Default: 30 000. */
  resetTimeoutMs?: number;
}

/**
 * Three-state circuit breaker.
 *
 * - CLOSED: every request is allowed; failures are counted.
 * - OPEN: requests are rejected until `resetTimeoutMs` has elapsed since the
 *   last recorded failure.
 * - HALF_OPEN: exactly one probe request is let through. Its outcome decides
 *   whether the circuit closes (`recordSuccess`) or re-opens (`recordFailure`).
 *
 * Callers are expected to pair every `allowRequest() === true` with a later
 * `recordSuccess()` or `recordFailure()`.
 */
export class CircuitBreaker {
  private state: State = 'CLOSED';
  private failureCount = 0;
  private lastFailureTime = 0;
  /** Timestamp at which the current HALF_OPEN probe was handed out, or null if none is in flight. */
  private probeStartedAt: number | null = null;
  private readonly failureThreshold: number;
  private readonly resetTimeoutMs: number;

  constructor(opts: CircuitBreakerOptions = {}) {
    this.failureThreshold = opts.failureThreshold ?? 5;
    this.resetTimeoutMs = opts.resetTimeoutMs ?? 30_000; // 30s
  }

  /**
   * Decide whether a request may proceed.
   *
   * Never throws; the caller is responsible for failing fast on `false`.
   *
   * @returns `true` when the circuit is CLOSED or when this call was granted
   *          the HALF_OPEN probe; `false` while the circuit is OPEN or while
   *          another probe is still in flight.
   */
  allowRequest(): boolean {
    if (this.state === 'CLOSED') return true;

    const now = Date.now();

    if (this.state === 'OPEN') {
      if (now - this.lastFailureTime >= this.resetTimeoutMs) {
        // Cool-down elapsed: hand out the single probe.
        this.state = 'HALF_OPEN';
        this.probeStartedAt = now;
        return true;
      }
      return false; // Still open, fail fast
    }

    // HALF_OPEN: only one probe at a time, so a recovering sidecar is not
    // hit with full traffic before it has proven healthy.
    if (this.probeStartedAt !== null && now - this.probeStartedAt < this.resetTimeoutMs) {
      return false;
    }

    // No probe in flight, or the previous probe never reported back within
    // resetTimeoutMs. Grant a fresh probe so the breaker cannot get stuck.
    this.probeStartedAt = now;
    return true;
  }

  /**
   * Record a successful call — clears the failure count and closes the circuit.
   */
  recordSuccess(): void {
    this.failureCount = 0;
    this.probeStartedAt = null;
    this.state = 'CLOSED';
  }

  /**
   * Record a failed call. Opens the circuit when the failure count reaches
   * the threshold, or immediately when the failure happened while probing
   * in HALF_OPEN.
   */
  recordFailure(): void {
    this.failureCount++;
    this.lastFailureTime = Date.now();
    this.probeStartedAt = null;

    if (this.state === 'HALF_OPEN' || this.failureCount >= this.failureThreshold) {
      this.state = 'OPEN';
    }
  }

  /** Current breaker state (does not trigger the OPEN → HALF_OPEN transition). */
  get currentState(): State {
    return this.state;
  }

  /** Snapshot of the breaker's state and configuration, e.g. for health endpoints. */
  get stats(): { state: State; failureCount: number; threshold: number; resetMs: number } {
    return {
      state: this.state,
      failureCount: this.failureCount,
      threshold: this.failureThreshold,
      resetMs: this.resetTimeoutMs,
    };
  }
}
|
||||||
|
|
||||||
|
/**
 * Shared, module-level breaker for the Python sidecar.
 * Trips after 5 recorded failures and stays OPEN for 30 seconds.
 */
const sidecarBreakerOptions = {
  failureThreshold: 5,
  resetTimeoutMs: 30_000,
};

export const sidecarBreaker = new CircuitBreaker(sidecarBreakerOptions);
|
||||||
107
services/extraction-service/src/lib/metrics.ts
Normal file
107
services/extraction-service/src/lib/metrics.ts
Normal file
@ -0,0 +1,107 @@
|
|||||||
|
/**
|
||||||
|
 * Lightweight in-memory metrics for extraction-service.
 *
 * Exposed as JSON via GET /extract/metrics. Generic HTTP metrics are handled
 * separately by fastify-metrics (GET /metrics).
|
||||||
|
* Custom counters/histograms for extraction-specific telemetry.
|
||||||
|
*/
|
||||||
|
|
||||||
|
// ── In-memory counters (fastify-metrics handles HTTP metrics) ────
|
||||||
|
// These are simple counters since prom-client may not be directly available.
|
||||||
|
// They're exposed via the /extract/metrics endpoint.
|
||||||
|
|
||||||
|
/** One labelled series of a {@link Counter}. */
interface MetricBucket {
  labels: Record<string, string>;
  value: number;
}

/**
 * In-memory counter keyed by label set.
 *
 * Two calls with the same label names and values always land in the same
 * series, regardless of the property order of the objects passed in.
 */
class Counter {
  private _buckets = new Map<string, MetricBucket>();
  constructor(public readonly name: string) {}

  /**
   * Add `amount` (default 1) to the series identified by `labels`.
   *
   * @param labels Label name → value pairs identifying the series.
   * @param amount Increment to apply.
   */
  inc(labels: Record<string, string>, amount = 1): void {
    // Sort by label name so the key is independent of property insertion order.
    const canonical = Object.entries(labels).sort(([a], [b]) => (a < b ? -1 : a > b ? 1 : 0));
    const key = JSON.stringify(canonical);

    const bucket = this._buckets.get(key);
    if (bucket) {
      bucket.value += amount;
      return;
    }

    // Store a copy: the caller may reuse or mutate its label object afterwards.
    this._buckets.set(key, { labels: { ...labels }, value: amount });
  }

  /** Snapshot of every series; the returned objects are detached from internal state. */
  toJSON(): Array<{ labels: Record<string, string>; value: number }> {
    return Array.from(this._buckets.values(), ({ labels, value }) => ({
      labels: { ...labels },
      value,
    }));
  }
}
|
||||||
|
|
||||||
|
/**
 * Minimal running summary (count / sum / average) of observed values.
 *
 * Only the aggregates are kept. Individual observations are not retained,
 * so memory use stays constant no matter how many values are observed.
 */
class Histogram {
  private _sum = 0;
  private _count = 0;
  constructor(public readonly name: string) {}

  /**
   * Record one observation.
   *
   * @param labels Accepted for call-site compatibility; the summary is not
   *               broken down by label.
   * @param value  Observed value. Non-finite values (NaN, ±Infinity) are
   *               ignored so one bad sample cannot poison the running sum.
   */
  observe(labels: Record<string, string>, value: number): void {
    void labels;
    if (!Number.isFinite(value)) return;
    this._sum += value;
    this._count++;
  }

  /** Aggregates rounded to two decimal places; `avg` is 0 when nothing was observed. */
  toJSON(): { count: number; sum: number; avg: number } {
    const round2 = (x: number): number => Math.round(x * 100) / 100;
    return {
      count: this._count,
      sum: round2(this._sum),
      avg: this._count > 0 ? round2(this._sum / this._count) : 0,
    };
  }
}
|
||||||
|
|
||||||
|
// ── Exported metrics ─────────────────────────────────────────────
|
||||||
|
|
||||||
|
/** Every recorded extraction event, one series per task / model / product / status label set. */
export const extractionRequestsTotal = new Counter('extraction_requests_total');

/** Extraction durations, observed in seconds. */
export const extractionDurationSeconds = new Histogram('extraction_duration_seconds');

/** Number of entities returned per extraction. */
export const extractionEntitiesExtracted = new Histogram('extraction_entities_extracted');

/** Extraction events that were answered from the cache. */
export const extractionCacheHitTotal = new Counter('extraction_cache_hit_total');
|
||||||
|
|
||||||
|
/**
 * Feed one extraction event into the module's counters and histograms.
 *
 * Missing or empty `taskId` / `modelId` / `productId` are labelled `'unknown'`.
 * The request counter is always incremented, and the cache-hit counter only
 * when `status` is `'cache_hit'`. Duration (converted from ms to seconds) and
 * entity count are observed only when supplied.
 *
 * NOTE(review): label values come straight from the caller. If any of them
 * can be client-controlled, the number of counter series is unbounded —
 * confirm upstream validation.
 */
export function recordExtraction(params: {
  taskId?: string;
  modelId?: string;
  productId?: string;
  status: 'success' | 'error' | 'cache_hit';
  durationMs?: number;
  entityCount?: number;
}): void {
  const { taskId, modelId, productId, status, durationMs, entityCount } = params;

  const labels = {
    task_id: taskId || 'unknown',
    model_id: modelId || 'unknown',
    product_id: productId || 'unknown',
    status,
  };

  extractionRequestsTotal.inc(labels);
  if (status === 'cache_hit') extractionCacheHitTotal.inc(labels);

  if (durationMs !== undefined) {
    const seconds = durationMs / 1000;
    extractionDurationSeconds.observe(labels, seconds);
  }
  if (entityCount !== undefined) {
    extractionEntitiesExtracted.observe(labels, entityCount);
  }
}
|
||||||
|
|
||||||
|
/**
 * Build a JSON-serialisable snapshot of every extraction metric,
 * keyed by metric name.
 */
export function getMetricsSummary(): Record<string, unknown> {
  const metrics = [
    extractionRequestsTotal,
    extractionDurationSeconds,
    extractionEntitiesExtracted,
    extractionCacheHitTotal,
  ];

  const summary: Record<string, unknown> = {};
  for (const metric of metrics) {
    // Each metric's `name` is the key it is published under.
    summary[metric.name] = metric.toJSON();
  }
  return summary;
}
|
||||||
@ -11,6 +11,8 @@ import {
|
|||||||
} from '../../lib/python-bridge.js';
|
} from '../../lib/python-bridge.js';
|
||||||
import { BadRequestError } from '../../lib/errors.js';
|
import { BadRequestError } from '../../lib/errors.js';
|
||||||
import { checkQuota, incrementUsage, getUsageSummary } from './usage.js';
|
import { checkQuota, incrementUsage, getUsageSummary } from './usage.js';
|
||||||
|
import { recordExtraction, getMetricsSummary } from '../../lib/metrics.js';
|
||||||
|
import { sidecarBreaker } from '../../lib/circuit-breaker.js';
|
||||||
|
|
||||||
// ── In-memory LRU cache ────────────────────────────────────────
|
// ── In-memory LRU cache ────────────────────────────────────────
|
||||||
const CACHE_TTL_MS = parseInt(process.env.EXTRACTION_CACHE_TTL_MS || '86400000', 10); // 24h
|
const CACHE_TTL_MS = parseInt(process.env.EXTRACTION_CACHE_TTL_MS || '86400000', 10); // 24h
|
||||||
@ -97,12 +99,28 @@ export async function extractRoutes(app: FastifyInstance) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
req.log.info({ taskId, modelId, textLength: text.length }, 'extraction request');
|
req.log.info(
|
||||||
|
{
|
||||||
|
taskId,
|
||||||
|
modelId,
|
||||||
|
textLength: text.length,
|
||||||
|
userId,
|
||||||
|
userPlan,
|
||||||
|
productId: req.headers['x-product-id'],
|
||||||
|
},
|
||||||
|
'extraction request'
|
||||||
|
);
|
||||||
|
|
||||||
// Check cache
|
// Check cache
|
||||||
const cached = cacheGet(text, taskId, modelId);
|
const cached = cacheGet(text, taskId, modelId);
|
||||||
if (cached) {
|
if (cached) {
|
||||||
req.log.info({ taskId }, 'cache hit');
|
req.log.info({ taskId, userId, cacheHit: true }, 'cache hit');
|
||||||
|
recordExtraction({
|
||||||
|
taskId,
|
||||||
|
modelId,
|
||||||
|
productId: req.headers['x-product-id'] as string,
|
||||||
|
status: 'cache_hit',
|
||||||
|
});
|
||||||
reply.header('X-Extraction-Cache', 'HIT');
|
reply.header('X-Extraction-Cache', 'HIT');
|
||||||
return reply.send({
|
return reply.send({
|
||||||
extractions: cached.extractions,
|
extractions: cached.extractions,
|
||||||
@ -118,32 +136,90 @@ export async function extractRoutes(app: FastifyInstance) {
|
|||||||
|
|
||||||
reply.header('X-Extraction-Cache', 'MISS');
|
reply.header('X-Extraction-Cache', 'MISS');
|
||||||
|
|
||||||
const result = await sidecarExtract(
|
// Circuit breaker — fail fast if sidecar is down
|
||||||
{
|
if (!sidecarBreaker.allowRequest()) {
|
||||||
text,
|
recordExtraction({ taskId, modelId, status: 'error' });
|
||||||
task_id: taskId,
|
return reply.status(503).send({
|
||||||
task_prompt: taskPrompt,
|
error: 'Extraction service temporarily unavailable (circuit open)',
|
||||||
examples: examples?.map(e => ({
|
circuitState: sidecarBreaker.currentState,
|
||||||
text: e.text,
|
});
|
||||||
extractions: e.extractions.map(ex => ({
|
}
|
||||||
extraction_class: ex.extraction_class,
|
|
||||||
extraction_text: ex.extraction_text,
|
let result;
|
||||||
attributes: ex.attributes,
|
try {
|
||||||
|
result = await sidecarExtract(
|
||||||
|
{
|
||||||
|
text,
|
||||||
|
task_id: taskId,
|
||||||
|
task_prompt: taskPrompt,
|
||||||
|
examples: examples?.map(e => ({
|
||||||
|
text: e.text,
|
||||||
|
extractions: e.extractions.map(ex => ({
|
||||||
|
extraction_class: ex.extraction_class,
|
||||||
|
extraction_text: ex.extraction_text,
|
||||||
|
attributes: ex.attributes,
|
||||||
|
})),
|
||||||
})),
|
})),
|
||||||
})),
|
model_id: modelId,
|
||||||
model_id: modelId,
|
extraction_passes: options?.extractionPasses,
|
||||||
extraction_passes: options?.extractionPasses,
|
max_workers: options?.maxWorkers,
|
||||||
max_workers: options?.maxWorkers,
|
max_char_buffer: options?.maxCharBuffer,
|
||||||
max_char_buffer: options?.maxCharBuffer,
|
},
|
||||||
},
|
requestId
|
||||||
requestId
|
);
|
||||||
);
|
sidecarBreaker.recordSuccess();
|
||||||
|
} catch (err) {
|
||||||
|
sidecarBreaker.recordFailure();
|
||||||
|
const message = err instanceof Error ? err.message : 'Sidecar error';
|
||||||
|
req.log.error({ error: message, taskId, userId }, 'extraction failed');
|
||||||
|
recordExtraction({
|
||||||
|
taskId,
|
||||||
|
modelId,
|
||||||
|
productId: req.headers['x-product-id'] as string,
|
||||||
|
status: 'error',
|
||||||
|
});
|
||||||
|
|
||||||
|
// Map sidecar errors to proper HTTP status codes (5.10)
|
||||||
|
if (message.includes('timeout') || message.includes('Timeout')) {
|
||||||
|
return reply.status(408).send({ error: 'Extraction timed out', detail: message });
|
||||||
|
}
|
||||||
|
if (message.includes('429') || message.includes('rate limit')) {
|
||||||
|
return reply
|
||||||
|
.status(429)
|
||||||
|
.send({ error: 'Upstream LLM rate limit', detail: message, retryAfter: 60 });
|
||||||
|
}
|
||||||
|
if (message.includes('Model') && message.includes('unavailable')) {
|
||||||
|
return reply.status(503).send({ error: 'Model unavailable', detail: message });
|
||||||
|
}
|
||||||
|
if (message.includes('400') || message.includes('Invalid')) {
|
||||||
|
return reply.status(400).send({ error: 'Invalid extraction request', detail: message });
|
||||||
|
}
|
||||||
|
return reply.status(502).send({ error: 'Extraction sidecar error', detail: message });
|
||||||
|
}
|
||||||
|
|
||||||
cachePut(text, taskId, modelId, result);
|
cachePut(text, taskId, modelId, result);
|
||||||
if (userId) incrementUsage(userId, userPlan);
|
if (userId) incrementUsage(userId, userPlan);
|
||||||
|
recordExtraction({
|
||||||
|
taskId,
|
||||||
|
modelId,
|
||||||
|
productId: req.headers['x-product-id'] as string,
|
||||||
|
status: 'success',
|
||||||
|
durationMs: result.metadata.duration_ms,
|
||||||
|
entityCount: result.extractions.length,
|
||||||
|
});
|
||||||
|
|
||||||
req.log.info(
|
req.log.info(
|
||||||
{ entityCount: result.extractions.length, durationMs: result.metadata.duration_ms },
|
{
|
||||||
|
taskId,
|
||||||
|
modelId,
|
||||||
|
entityCount: result.extractions.length,
|
||||||
|
durationMs: result.metadata.duration_ms,
|
||||||
|
tokenCount: result.metadata.token_count,
|
||||||
|
charCount: result.metadata.char_count,
|
||||||
|
userId,
|
||||||
|
productId: req.headers['x-product-id'],
|
||||||
|
cacheHit: false,
|
||||||
|
},
|
||||||
'extraction complete'
|
'extraction complete'
|
||||||
);
|
);
|
||||||
|
|
||||||
@ -229,6 +305,13 @@ export async function extractRoutes(app: FastifyInstance) {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
/**
|
||||||
|
* GET /extract/metrics — Extraction metrics summary.
|
||||||
|
*/
|
||||||
|
app.get('/extract/metrics', async (_req, reply) => {
|
||||||
|
return reply.send(getMetricsSummary());
|
||||||
|
});
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* GET /extract/usage — Per-user extraction usage (admin).
|
* GET /extract/usage — Per-user extraction usage (admin).
|
||||||
*/
|
*/
|
||||||
|
|||||||
@ -0,0 +1,95 @@
|
|||||||
|
{
|
||||||
|
"annotations": { "list": [] },
|
||||||
|
"editable": true,
|
||||||
|
"fiscalYearStartMonth": 0,
|
||||||
|
"graphTooltip": 0,
|
||||||
|
"id": null,
|
||||||
|
"links": [],
|
||||||
|
"panels": [
|
||||||
|
{
|
||||||
|
"title": "Extraction Requests (Loki logs)",
|
||||||
|
"type": "timeseries",
|
||||||
|
"gridPos": { "h": 8, "w": 12, "x": 0, "y": 0 },
|
||||||
|
"targets": [
|
||||||
|
{
|
||||||
|
"datasource": { "type": "loki", "uid": "loki" },
|
||||||
|
"expr": "count_over_time({container=\"extraction-service\"} |= \"extraction complete\" [5m])",
|
||||||
|
"legendFormat": "Extractions / 5m"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"title": "Cache Hits vs Misses",
|
||||||
|
"type": "timeseries",
|
||||||
|
"gridPos": { "h": 8, "w": 12, "x": 12, "y": 0 },
|
||||||
|
"targets": [
|
||||||
|
{
|
||||||
|
"datasource": { "type": "loki", "uid": "loki" },
|
||||||
|
"expr": "count_over_time({container=\"extraction-service\"} |= \"cache hit\" [5m])",
|
||||||
|
"legendFormat": "Cache Hits / 5m"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"datasource": { "type": "loki", "uid": "loki" },
|
||||||
|
          "expr": "count_over_time({container=\"extraction-service\"} |= \"extraction complete\" |= `\"cacheHit\":false` [5m])",
|
||||||
|
"legendFormat": "Cache Misses / 5m"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"title": "Errors",
|
||||||
|
"type": "timeseries",
|
||||||
|
"gridPos": { "h": 8, "w": 12, "x": 0, "y": 8 },
|
||||||
|
"targets": [
|
||||||
|
{
|
||||||
|
"datasource": { "type": "loki", "uid": "loki" },
|
||||||
|
"expr": "count_over_time({container=\"extraction-service\"} |= \"error\" [5m])",
|
||||||
|
"legendFormat": "Errors / 5m"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"title": "Quota Exceeded (429s)",
|
||||||
|
"type": "stat",
|
||||||
|
"gridPos": { "h": 8, "w": 12, "x": 12, "y": 8 },
|
||||||
|
"targets": [
|
||||||
|
{
|
||||||
|
"datasource": { "type": "loki", "uid": "loki" },
|
||||||
|
"expr": "count_over_time({container=\"extraction-service\"} |= \"quota exceeded\" [24h])",
|
||||||
|
"legendFormat": "429s Today"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"title": "Python Sidecar Health",
|
||||||
|
"type": "stat",
|
||||||
|
"gridPos": { "h": 4, "w": 6, "x": 0, "y": 16 },
|
||||||
|
"targets": [
|
||||||
|
{
|
||||||
|
"datasource": { "type": "loki", "uid": "loki" },
|
||||||
|
"expr": "count_over_time({container=\"extraction-service\"} |= \"Sidecar unavailable\" [1h])",
|
||||||
|
"legendFormat": "Sidecar Errors (1h)"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"title": "Extraction Logs",
|
||||||
|
"type": "logs",
|
||||||
|
"gridPos": { "h": 10, "w": 24, "x": 0, "y": 20 },
|
||||||
|
"targets": [
|
||||||
|
{
|
||||||
|
"datasource": { "type": "loki", "uid": "loki" },
|
||||||
|
"expr": "{container=\"extraction-service\"}"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"schemaVersion": 39,
|
||||||
|
"tags": ["extraction", "lysnrai"],
|
||||||
|
"templating": { "list": [] },
|
||||||
|
"time": { "from": "now-6h", "to": "now" },
|
||||||
|
"timepicker": {},
|
||||||
|
"timezone": "",
|
||||||
|
"title": "Extraction Service",
|
||||||
|
"uid": "extraction-service",
|
||||||
|
"version": 1
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue
Block a user