From 321cfe75469b52b13f747db6477d8bb08732a028 Mon Sep 17 00:00:00 2001 From: saravanakumardb1 Date: Mon, 1 Jun 2026 22:43:56 -0700 Subject: [PATCH] feat(fleet): correlation-id tracing + capacity autoscaling signal (ops #4/#5) Thread a trace-context correlation id across the coordinator<->runner boundary so a logical work-unit (job -> claim -> run -> ship) is stitchable end to end, and add an advisory capacity autoscaling signal an external scaler can consume. Tracing (#4): - Mint/propagate a correlationId at submit from the inbound x-correlation-id/traceparent/x-request-id (else generate ftr_); persist it on the job, inherit onto the run + lease at claim, and stamp every lifecycle event (submitted/assigned/transition/lease_renewed/lease_released/ retry_scheduled/dead_letter). Children of a composite job share the parent id. - Echo it back on the x-correlation-id response header (submit/claim/renew/ release/patch) so a factory can carry it forward, and bind it to req.log. - New pure trace.ts (header resolution incl. W3C traceparent trace-id). Autoscaling signal (#5): - New pure autoscaler.ts turns a product FleetMetrics + saturation alerts (no_live_capacity/saturated/queue_starvation) into an auditable scale recommendation (action/recommendedSeats/delta/urgency/signals). budget_exhausted suppresses scale-out; idle slack reclaims down to a floor. Thresholds tunable via FLEET_AUTOSCALE_* env. - GET /fleet/autoscale (per-product) + GET /fleet/autoscale/all (global, admin or scrape token). Documented the env vars in .env.example. Tests: +29 (trace 10, tracing 7, autoscaler 12); full suite 1846 green; lint + tsc clean. 
Generated with [Devin](https://cli.devin.ai/docs) Co-Authored-By: Devin <158243242+devin-ai-integration[bot]@users.noreply.github.com> --- .env.example | 7 + .../src/modules/fleet/autoscaler.test.ts | 227 ++++++++++++++++++ .../src/modules/fleet/autoscaler.ts | 220 +++++++++++++++++ .../src/modules/fleet/coordinator.ts | 38 ++- .../src/modules/fleet/repository.ts | 3 + .../src/modules/fleet/routes.ts | 78 +++++- .../src/modules/fleet/trace.test.ts | 83 +++++++ .../src/modules/fleet/trace.ts | 70 ++++++ .../src/modules/fleet/tracing.test.ts | 182 ++++++++++++++ .../src/modules/fleet/types.ts | 15 ++ 10 files changed, 915 insertions(+), 8 deletions(-) create mode 100644 services/platform-service/src/modules/fleet/autoscaler.test.ts create mode 100644 services/platform-service/src/modules/fleet/autoscaler.ts create mode 100644 services/platform-service/src/modules/fleet/trace.test.ts create mode 100644 services/platform-service/src/modules/fleet/trace.ts create mode 100644 services/platform-service/src/modules/fleet/tracing.test.ts diff --git a/.env.example b/.env.example index 226ed6a9..55ed2807 100644 --- a/.env.example +++ b/.env.example @@ -111,3 +111,10 @@ FLEET_COST_ROUTING= FLEET_ENGINE_BREAKER= FLEET_BUDGETS= FLEET_TENANT_ENFORCEMENT= +# Capacity autoscaling signal (§5) — tunes the advisory scale recommendation +# served at GET /api/fleet/autoscale[/all] (consumed by an external scaler). +# All optional; unset keys fall back to the in-code defaults shown below. +FLEET_AUTOSCALE_SCALE_OUT_PCT=85 +FLEET_AUTOSCALE_SCALE_IN_PCT=20 +FLEET_AUTOSCALE_MAX_STEP=5 +FLEET_AUTOSCALE_MIN_SEATS=0 diff --git a/services/platform-service/src/modules/fleet/autoscaler.test.ts b/services/platform-service/src/modules/fleet/autoscaler.test.ts new file mode 100644 index 00000000..71bed8f6 --- /dev/null +++ b/services/platform-service/src/modules/fleet/autoscaler.test.ts @@ -0,0 +1,227 @@ +/** + * Capacity autoscaling signal (§5) — pure recommender unit tests + a route test. 
+ */ + +import Fastify, { type FastifyInstance } from 'fastify'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { MemoryDatastoreProvider } from '@bytelyst/datastore'; +import { _resetDatastoreProvider, setProvider } from '../../lib/datastore.js'; +import { + recommendScaling, + scalingThresholdsFromEnv, + DEFAULT_SCALING_THRESHOLDS, +} from './autoscaler.js'; +import type { FleetMetrics, FleetAlert } from './coordinator.js'; + +vi.mock('../../lib/auth.js', () => ({ + extractAuth: vi.fn(async () => ({ sub: 'user_1', role: 'admin' })), +})); +vi.mock('../../lib/request-context.js', () => ({ + getRequestProductId: () => 'lysnrai', + requireProductAccess: async () => 'lysnrai', +})); + +function metrics( + over: { + jobs?: Partial; + factories?: Partial; + alerts?: FleetAlert[]; + } = {} +): FleetMetrics { + return { + productId: 'lysnrai', + generatedAt: '2026-06-01T00:00:00.000Z', + jobs: { + total: 0, + byStage: {} as FleetMetrics['jobs']['byStage'], + queueDepth: 0, + blocked: 0, + active: 0, + oldestQueuedAgeMs: null, + ...over.jobs, + }, + factories: { + total: 1, + live: 1, + stale: 0, + byHealth: { ok: 1, degraded: 0, down: 0 }, + seatsUsed: 0, + seatsTotal: 4, + utilizationPct: 0, + ...over.factories, + }, + budget: null, + alerts: over.alerts ?? 
[], + }; +} + +const alert = (code: string, level: FleetAlert['level'] = 'warning'): FleetAlert => ({ + code, + level, + message: code, +}); + +describe('recommendScaling', () => { + it('holds when capacity is balanced against demand', () => { + const r = recommendScaling( + metrics({ factories: { seatsUsed: 2, seatsTotal: 4, utilizationPct: 50 } }) + ); + expect(r.action).toBe('hold'); + expect(r.delta).toBe(0); + expect(r.recommendedSeats).toBe(4); + }); + + it('CRITICAL scale-out on no_live_capacity even from a cold (zero-seat) fleet', () => { + const r = recommendScaling( + metrics({ + jobs: { queueDepth: 3, total: 3 }, + factories: { total: 0, live: 0, seatsUsed: 0, seatsTotal: 0, utilizationPct: 0 }, + alerts: [alert('no_live_capacity', 'critical')], + }) + ); + expect(r.action).toBe('scale_out'); + expect(r.urgency).toBe('critical'); + expect(r.recommendedSeats).toBe(3); // min(queueDepth=3, maxStep=5) + expect(r.delta).toBe(3); + }); + + it('scales out on saturation, bounded by maxScaleOutStep', () => { + const r = recommendScaling( + metrics({ + jobs: { queueDepth: 20, total: 24 }, + factories: { seatsUsed: 4, seatsTotal: 4, utilizationPct: 100 }, + alerts: [alert('saturated')], + }) + ); + expect(r.action).toBe('scale_out'); + expect(r.delta).toBe(DEFAULT_SCALING_THRESHOLDS.maxScaleOutStep); // capped at 5 + expect(r.recommendedSeats).toBe(4 + 5); + }); + + it('scales out on queue_starvation', () => { + const r = recommendScaling( + metrics({ + jobs: { queueDepth: 1, total: 1, oldestQueuedAgeMs: 999_999 }, + factories: { seatsUsed: 1, seatsTotal: 2, utilizationPct: 50 }, + alerts: [alert('queue_starvation')], + }) + ); + expect(r.action).toBe('scale_out'); + expect(r.delta).toBe(1); + }); + + it('scales out on high utilization with a backlog (no explicit alert)', () => { + const r = recommendScaling( + metrics({ + jobs: { queueDepth: 2, total: 6 }, + factories: { seatsUsed: 4, seatsTotal: 4, utilizationPct: 90 }, + }), + { scaleOutUtilizationPct: 85 } + 
); + expect(r.action).toBe('scale_out'); + expect(r.delta).toBe(2); + }); + + it('budget_exhausted suppresses scale-out (claims are blocked) → hold', () => { + const r = recommendScaling( + metrics({ + jobs: { queueDepth: 5, total: 5 }, + factories: { seatsUsed: 4, seatsTotal: 4, utilizationPct: 100 }, + alerts: [alert('saturated'), alert('budget_exhausted', 'critical')], + }) + ); + expect(r.action).toBe('hold'); + expect(r.reason).toMatch(/budget/i); + }); + + it('scales in idle slack down to in-use seats / floor', () => { + const r = recommendScaling( + metrics({ + jobs: { queueDepth: 0, active: 0, total: 0 }, + factories: { seatsUsed: 1, seatsTotal: 6, utilizationPct: 16 }, + }) + ); + expect(r.action).toBe('scale_in'); + expect(r.recommendedSeats).toBe(1); // max(minSeats=0, seatsUsed=1) + expect(r.delta).toBe(-5); + }); + + it('does NOT scale in while work is in flight', () => { + const r = recommendScaling( + metrics({ + jobs: { queueDepth: 0, active: 2, total: 2 }, + factories: { seatsUsed: 2, seatsTotal: 6, utilizationPct: 33 }, + }) + ); + expect(r.action).toBe('hold'); + }); + + it('respects a minSeats floor on scale-in', () => { + const r = recommendScaling( + metrics({ + jobs: { queueDepth: 0, active: 0, total: 0 }, + factories: { seatsUsed: 0, seatsTotal: 6, utilizationPct: 0 }, + }), + { minSeats: 2 } + ); + expect(r.action).toBe('scale_in'); + expect(r.recommendedSeats).toBe(2); + }); +}); + +describe('scalingThresholdsFromEnv', () => { + const KEYS = [ + 'FLEET_AUTOSCALE_SCALE_OUT_PCT', + 'FLEET_AUTOSCALE_SCALE_IN_PCT', + 'FLEET_AUTOSCALE_MAX_STEP', + 'FLEET_AUTOSCALE_MIN_SEATS', + ]; + afterEach(() => KEYS.forEach(k => delete process.env[k])); + + it('returns an empty override set when nothing is configured', () => { + expect(scalingThresholdsFromEnv()).toEqual({}); + }); + + it('parses configured overrides and ignores invalid values', () => { + process.env.FLEET_AUTOSCALE_SCALE_OUT_PCT = '70'; + process.env.FLEET_AUTOSCALE_MAX_STEP = '10'; + 
process.env.FLEET_AUTOSCALE_MIN_SEATS = 'not-a-number'; + expect(scalingThresholdsFromEnv()).toEqual({ scaleOutUtilizationPct: 70, maxScaleOutStep: 10 }); + }); +}); + +async function buildApp(): Promise { + const { fleetRoutes } = await import('./routes.js'); + const app = Fastify({ logger: false }); + await app.register(fleetRoutes, { prefix: '/api' }); + return app; +} + +describe('GET /fleet/autoscale (route)', () => { + beforeEach(() => setProvider(new MemoryDatastoreProvider())); + afterEach(() => { + _resetDatastoreProvider(); + vi.clearAllMocks(); + }); + + it('returns hold for an empty fleet and scale_out once jobs queue with no capacity', async () => { + const app = await buildApp(); + + const idle = await app.inject({ method: 'GET', url: '/api/fleet/autoscale' }); + expect(idle.statusCode).toBe(200); + expect(JSON.parse(idle.body).action).toBe('hold'); + + // Queue a job with no factories registered → no_live_capacity → scale_out. + await app.inject({ + method: 'POST', + url: '/api/fleet/jobs', + payload: { idempotencyKey: 'as-1', bodyMd: '# task' }, + }); + const hot = await app.inject({ method: 'GET', url: '/api/fleet/autoscale' }); + const body = JSON.parse(hot.body); + expect(body.action).toBe('scale_out'); + expect(body.urgency).toBe('critical'); + expect(body.signals.queueDepth).toBe(1); + expect(body.delta).toBeGreaterThanOrEqual(1); + }); +}); diff --git a/services/platform-service/src/modules/fleet/autoscaler.ts b/services/platform-service/src/modules/fleet/autoscaler.ts new file mode 100644 index 00000000..f8c3190c --- /dev/null +++ b/services/platform-service/src/modules/fleet/autoscaler.ts @@ -0,0 +1,220 @@ +/** + * Capacity autoscaling signal (§5). 
+ * + * Turns a product's point-in-time `FleetMetrics` (queue depth, seat utilization, + * and the derived saturation alerts: `no_live_capacity`, `saturated`, + * `queue_starvation`) into a concrete, auditable scale recommendation an + * external scaler can consume (pull model — `GET /fleet/autoscale`). PURE + + * synchronous: the route does the I/O (compute metrics) and hands the snapshot + * here, keeping the policy fully unit-testable and free of side effects. + * + * The recommendation is advisory: it never starts/stops factories itself, it + * reports a desired seat target + delta + the raw signals that drove it, so a + * downstream controller (or a human) makes the actual call. + */ + +import type { FleetMetrics } from './coordinator.js'; + +export type ScalingAction = 'scale_out' | 'scale_in' | 'hold'; + +/** Tunable thresholds (route resolves these from `FLEET_AUTOSCALE_*` env). */ +export interface ScalingThresholds { + /** Seat utilization at/above which a backlog should trigger scale-out (%). */ + scaleOutUtilizationPct: number; + /** Seat utilization at/below which idle capacity may be reclaimed (%). */ + scaleInUtilizationPct: number; + /** Max seats to recommend adding in a single step (rate-limits scale-out). */ + maxScaleOutStep: number; + /** Never recommend scaling below this many seats (keeps a warm floor). */ + minSeats: number; +} + +export const DEFAULT_SCALING_THRESHOLDS: ScalingThresholds = { + scaleOutUtilizationPct: 85, + scaleInUtilizationPct: 20, + maxScaleOutStep: 5, + minSeats: 0, +}; + +export interface ScalingSignals { + queueDepth: number; + blocked: number; + active: number; + seatsUsed: number; + seatsTotal: number; + utilizationPct: number; + /** Runnable backlog driving scale-out pressure (queued jobs; blocked excluded). */ + pressure: number; + /** Codes of the alerts currently firing for the product. 
*/ + alertCodes: string[]; +} + +export interface ScalingRecommendation { + productId: string; + action: ScalingAction; + /** Absolute recommended seat target. */ + recommendedSeats: number; + /** recommendedSeats - current seatsTotal (positive = add, negative = remove). */ + delta: number; + /** Human-readable justification. */ + reason: string; + /** Urgency: `critical` when work is queued with zero usable capacity. */ + urgency: 'critical' | 'warning' | 'none'; + signals: ScalingSignals; + generatedAt: string; +} + +/** Clamp to a non-negative integer. */ +function clampNonNeg(n: number): number { + return Math.max(0, Math.round(n)); +} + +/** Parse a non-negative number from env, falling back when unset/invalid. */ +function envNum(name: string): number | undefined { + const raw = process.env[name]; + if (raw === undefined || raw.trim() === '') return undefined; + const n = Number(raw); + return Number.isFinite(n) && n >= 0 ? n : undefined; +} + +/** + * Resolve threshold overrides from `FLEET_AUTOSCALE_*` env vars (any unset key + * falls back to {@link DEFAULT_SCALING_THRESHOLDS}). Isolated here so the route + * stays a thin pass-through and the defaults remain unit-testable. + */ +export function scalingThresholdsFromEnv(): Partial { + const out: Partial = {}; + const outPct = envNum('FLEET_AUTOSCALE_SCALE_OUT_PCT'); + const inPct = envNum('FLEET_AUTOSCALE_SCALE_IN_PCT'); + const step = envNum('FLEET_AUTOSCALE_MAX_STEP'); + const floor = envNum('FLEET_AUTOSCALE_MIN_SEATS'); + if (outPct !== undefined) out.scaleOutUtilizationPct = outPct; + if (inPct !== undefined) out.scaleInUtilizationPct = inPct; + if (step !== undefined) out.maxScaleOutStep = step; + if (floor !== undefined) out.minSeats = floor; + return out; +} + +/** + * Compute a scale recommendation for one product. Decision order (highest + * urgency first): + * + * 1. `no_live_capacity` (queued work, zero live healthy seats) → CRITICAL + * scale-out, even from a cold zero-seat fleet. + * 2. 
`saturated` / `queue_starvation` / high utilization with a backlog → + * scale-out by the smaller of the backlog and `maxScaleOutStep`. + * 3. Idle (no backlog, no in-flight work, utilization ≤ scale-in floor) with + * slack seats above `minSeats` → reclaim idle capacity. + * 4. Otherwise hold. + * + * A `budget_exhausted` alert SUPPRESSES scale-out (new claims are blocked by the + * budget, so more seats can't drain the queue) — it degrades to hold. + */ +export function recommendScaling( + metrics: FleetMetrics, + thresholds: Partial = {} +): ScalingRecommendation { + const t = { ...DEFAULT_SCALING_THRESHOLDS, ...thresholds }; + const { queueDepth, blocked, active } = metrics.jobs; + const { seatsUsed, seatsTotal, utilizationPct } = metrics.factories; + const alertCodes = metrics.alerts.map(a => a.code); + const pressure = queueDepth; + + const signals: ScalingSignals = { + queueDepth, + blocked, + active, + seatsUsed, + seatsTotal, + utilizationPct, + pressure, + alertCodes, + }; + + const hold = (reason: string): ScalingRecommendation => ({ + productId: metrics.productId, + action: 'hold', + recommendedSeats: seatsTotal, + delta: 0, + reason, + urgency: 'none', + signals, + generatedAt: metrics.generatedAt, + }); + + const scaleOut = ( + addSeats: number, + reason: string, + urgency: 'critical' | 'warning' + ): ScalingRecommendation => { + const add = Math.min(Math.max(1, addSeats), t.maxScaleOutStep); + const recommendedSeats = clampNonNeg(seatsTotal + add); + return { + productId: metrics.productId, + action: 'scale_out', + recommendedSeats, + delta: recommendedSeats - seatsTotal, + reason, + urgency, + signals, + generatedAt: metrics.generatedAt, + }; + }; + + // Budget exhaustion blocks claims — more seats won't help; do not scale out. + if (alertCodes.includes('budget_exhausted')) { + return hold('Budget exhausted — claims are blocked, so scaling out cannot drain the queue.'); + } + + // 1. No usable capacity for queued work — most urgent. 
+ if (alertCodes.includes('no_live_capacity')) { + return scaleOut( + pressure, + `${queueDepth} job(s) queued with no live, healthy factory — provision capacity now.`, + 'critical' + ); + } + + // 2. Saturation / starvation / sustained high utilization with a backlog. + if (alertCodes.includes('saturated')) { + return scaleOut( + pressure, + `All ${seatsTotal} seat(s) busy with ${queueDepth} job(s) still queued — add capacity.`, + 'warning' + ); + } + if (alertCodes.includes('queue_starvation')) { + return scaleOut( + pressure, + `Oldest queued job has waited past the starvation threshold — add capacity to drain it.`, + 'warning' + ); + } + if (pressure > 0 && seatsTotal > 0 && utilizationPct >= t.scaleOutUtilizationPct) { + return scaleOut( + pressure, + `Seat utilization ${utilizationPct}% ≥ ${t.scaleOutUtilizationPct}% with ${queueDepth} queued — add capacity.`, + 'warning' + ); + } + + // 3. Idle slack — reclaim capacity down to the floor (never below in-use seats). + const idle = queueDepth === 0 && active === 0; + if (idle && seatsTotal > t.minSeats && utilizationPct <= t.scaleInUtilizationPct) { + const recommendedSeats = clampNonNeg(Math.max(t.minSeats, seatsUsed)); + if (recommendedSeats < seatsTotal) { + return { + productId: metrics.productId, + action: 'scale_in', + recommendedSeats, + delta: recommendedSeats - seatsTotal, + reason: `No queued or in-flight work and ${utilizationPct}% utilization — reclaim idle seats down to ${recommendedSeats}.`, + urgency: 'none', + signals, + generatedAt: metrics.generatedAt, + }; + } + } + + return hold('Capacity is balanced against current demand.'); +} diff --git a/services/platform-service/src/modules/fleet/coordinator.ts b/services/platform-service/src/modules/fleet/coordinator.ts index c3018315..6eeae93b 100644 --- a/services/platform-service/src/modules/fleet/coordinator.ts +++ b/services/platform-service/src/modules/fleet/coordinator.ts @@ -149,7 +149,12 @@ export interface SubmitResult { outcome: 'created' | 
'deduplicated' | 'superseded'; } -export async function submitJob(productId: string, input: SubmitJobInput): Promise { +export async function submitJob( + productId: string, + input: SubmitJobInput, + opts?: { correlationId?: string } +): Promise { + const correlationId = opts?.correlationId; const hash = contentHash(input.bodyMd); const existingForKey = await repo.findJobsByIdempotencyKey(productId, input.idempotencyKey); @@ -234,6 +239,7 @@ export async function submitJob(productId: string, input: SubmitJobInput): Promi attemptsBase: 0, leaseEpoch: 0, rev: 0, + correlationId, createdAt: now, updatedAt: now, }; @@ -297,6 +303,8 @@ export async function submitJob(productId: string, input: SubmitJobInput): Promi attemptsBase: 0, leaseEpoch: 0, rev: 0, + // Children share the parent's trace so a composite job is one work-unit. + correlationId, createdAt: childNow, updatedAt: childNow, }; @@ -306,6 +314,7 @@ export async function submitJob(productId: string, input: SubmitJobInput): Promi await repo.appendEvent({ jobId: childId, productId, + correlationId, type: 'submitted', data: { stage: childStage, idempotencyKey: child.idempotencyKey, parentId: id }, }); @@ -314,6 +323,7 @@ export async function submitJob(productId: string, input: SubmitJobInput): Promi await repo.appendEvent({ jobId: id, productId, + correlationId, type: 'submitted', data: { stage: base.stage, idempotencyKey: input.idempotencyKey, childCount: children.length }, }); @@ -488,6 +498,9 @@ export async function tryClaimJob( const expiresAt = new Date(now + ctx.leaseSeconds * 1000).toISOString(); const nowIso = new Date(now).toISOString(); + // The run/lease inherit the job's trace so the whole work-unit shares one id. 
+ const correlationId = job.correlationId; + const existingLease = await repo.getLease(job.id); let lease: FleetLeaseDoc; if (existingLease) { @@ -497,6 +510,7 @@ export async function tryClaimJob( leaseEpoch: newEpoch, renewals: 0, status: 'held', + correlationId, }); lease = updated.ok ? updated.doc : existingLease; } else { @@ -510,6 +524,7 @@ export async function tryClaimJob( renewals: 0, status: 'held', rev: 0, + correlationId, updatedAt: nowIso, }); } @@ -525,6 +540,7 @@ export async function tryClaimJob( engine: job.engine ?? job.engineClass ?? 'unknown', startedAt: nowIso, insights: {}, + correlationId, }); await repo.appendEvent({ @@ -532,6 +548,7 @@ export async function tryClaimJob( productId: ctx.productId, type: 'assigned', actor: ctx.factoryId, + correlationId, data: { leaseEpoch: newEpoch, attempt }, }); @@ -846,6 +863,7 @@ export async function patchJobFenced( jobId, productId, type: 'transition', + correlationId: job.correlationId, data: { stage: patch.stage ?? job.stage, leaseEpoch: patch.leaseEpoch }, }); @@ -1082,7 +1100,13 @@ export async function renewLease( status: 'held', }); if (!res.ok) return { ok: false, reason: 'conflict' }; - await repo.appendEvent({ jobId, productId, type: 'lease_renewed', data: { leaseEpoch } }); + await repo.appendEvent({ + jobId, + productId, + type: 'lease_renewed', + correlationId: job.correlationId, + data: { leaseEpoch }, + }); return { ok: true, doc: res.doc }; } @@ -1202,6 +1226,7 @@ export async function releaseLease( jobId, productId, type: 'retry_scheduled', + correlationId: job.correlationId, data: { attempt: job.attempts, max: job.retry?.max, @@ -1216,6 +1241,7 @@ export async function releaseLease( jobId, productId, type: 'dead_letter', + correlationId: job.correlationId, data: { attempts: job.attempts, max: job.retry?.max, result: report?.result ?? 
'failed' }, }); } else { @@ -1253,7 +1279,13 @@ export async function releaseLease( engineBreaker.recordSuccess(breakerFactory, breakerEngine); } } - await repo.appendEvent({ jobId, productId, type: 'lease_released', data: { leaseEpoch, stage } }); + await repo.appendEvent({ + jobId, + productId, + type: 'lease_released', + correlationId: job.correlationId, + data: { leaseEpoch, stage }, + }); return { ok: true, doc: res.doc }; } diff --git a/services/platform-service/src/modules/fleet/repository.ts b/services/platform-service/src/modules/fleet/repository.ts index 59665cfa..5615c0d2 100644 --- a/services/platform-service/src/modules/fleet/repository.ts +++ b/services/platform-service/src/modules/fleet/repository.ts @@ -343,6 +343,8 @@ export interface AppendEventInput { productId: string; type: string; actor?: string; + /** Trace-context correlation id of the owning job (§4), stamped on the event. */ + correlationId?: string; data?: Record; } @@ -358,6 +360,7 @@ export async function appendEvent(input: AppendEventInput): Promise => new Promise(resolve => setTimeout(resolve, ms)); +/** + * Echo a job's trace-context correlation id back to the caller on the + * `x-correlation-id` response header (§4), so an external factory can carry it + * on its subsequent renew/release calls and a tracer can stitch the work-unit + * across the coordinator↔runner boundary. No-op when absent (legacy jobs). + */ +function echoCorrelation(reply: import('fastify').FastifyReply, correlationId?: string): void { + if (correlationId) reply.header(CORRELATION_HEADER, correlationId); +} + /** Parse an integer query param, clamping to [min, max]; fall back to `fallback`. */ function clampInt(raw: string | undefined, fallback: number, min: number, max: number): number { const n = Number.parseInt(raw ?? 
'', 10); @@ -91,7 +106,13 @@ export async function fleetRoutes(app: FastifyInstance) { const parsed = SubmitJobSchema.safeParse(req.body); if (!parsed.success) badRequest(parsed.error.issues); const pid = parsed.data.productId || (await requireProductAccess(req)); - const result = await coordinator.submitJob(pid, parsed.data); + // Mint/propagate the trace-context correlation id from the inbound request + // headers and bind it to the request logger so the whole submit path is + // correlatable (§4). The job persists it; the run/lease/events inherit it. + const correlationId = resolveCorrelationId(req.headers); + req.log = req.log.child({ correlationId }); + const result = await coordinator.submitJob(pid, parsed.data, { correlationId }); + echoCorrelation(reply, result.job.correlationId ?? correlationId); reply.code(result.outcome === 'created' ? 201 : 200); return { outcome: result.outcome, job: result.job }; }); @@ -124,7 +145,7 @@ export async function fleetRoutes(app: FastifyInstance) { }); // ── Fenced state transition ── - app.patch('/fleet/jobs/:id', async req => { + app.patch('/fleet/jobs/:id', async (req, reply) => { await extractAuth(req); const { id } = req.params as { id: string }; const pid = await requireProductAccess(req); @@ -138,6 +159,7 @@ export async function fleetRoutes(app: FastifyInstance) { } throw new ConflictError('concurrent update conflict — retry'); } + echoCorrelation(reply, res.doc.correlationId); // §10: opt-in (FLEET_TRACKER_ECHO, default OFF) downstream echo of the new // stage to the linked tracker Item. Never blocks/fails the transition. 
await trackerBridge.maybeEchoOnTransition(pid, id, req.log); @@ -242,7 +264,7 @@ export async function fleetRoutes(app: FastifyInstance) { return { ...res.doc, gate: res.gate }; }); - app.post('/fleet/claim', async req => { + app.post('/fleet/claim', async (req, reply) => { await extractAuth(req); const parsed = ClaimSchema.safeParse(req.body); if (!parsed.success) badRequest(parsed.error.issues); @@ -262,11 +284,18 @@ export async function fleetRoutes(app: FastifyInstance) { leaseSeconds: parsed.data.leaseSeconds, }); if (!claim) return { claimed: false }; + // Hand the job's trace back to the factory so it can carry it on its + // renew/release calls — closing the loop across the coordinator↔runner + // boundary (§4). Also bind it to the request logger. + if (claim.job.correlationId) { + req.log = req.log.child({ correlationId: claim.job.correlationId }); + } + echoCorrelation(reply, claim.job.correlationId); return { claimed: true, ...claim }; }); // ── Lease renew ── - app.post('/fleet/jobs/:id/lease/renew', async req => { + app.post('/fleet/jobs/:id/lease/renew', async (req, reply) => { await extractAuth(req); const { id } = req.params as { id: string }; const pid = await requireProductAccess(req); @@ -283,11 +312,12 @@ export async function fleetRoutes(app: FastifyInstance) { if (res.reason === 'fenced') throw new ConflictError('stale leaseEpoch — renew fenced'); throw new ConflictError('lease renew conflict — retry'); } + echoCorrelation(reply, res.doc.correlationId); return res.doc; }); // ── Lease release ── - app.post('/fleet/jobs/:id/lease/release', async req => { + app.post('/fleet/jobs/:id/lease/release', async (req, reply) => { await extractAuth(req); const { id } = req.params as { id: string }; const pid = await requireProductAccess(req); @@ -305,6 +335,7 @@ export async function fleetRoutes(app: FastifyInstance) { if (res.reason === 'fenced') throw new ConflictError('stale leaseEpoch — release fenced'); throw new ConflictError('lease release 
conflict — retry'); } + echoCorrelation(reply, res.doc.correlationId); // §10: release often carries a terminal stage (shipped/failed) — echo it (opt-in). if (parsed.data.stage) await trackerBridge.maybeEchoOnTransition(pid, id, req.log); return res.doc; @@ -484,6 +515,43 @@ export async function fleetRoutes(app: FastifyInstance) { return body; }); + // ── Capacity autoscaling signal (§5) ── + // Advisory scale recommendation for THIS product, derived from the live + // saturation alerts (no_live_capacity / saturated / queue_starvation) + seat + // utilization. Pull model: an external scaler GETs this and decides — the + // coordinator never starts/stops factories itself. Thresholds are tunable via + // FLEET_AUTOSCALE_* env. + app.get('/fleet/autoscale', async req => { + await extractAuth(req); + const pid = await requireProductAccess(req); + const metrics = await coordinator.fleetMetrics(pid); + return recommendScaling(metrics, scalingThresholdsFromEnv()); + }); + + // ── Capacity autoscaling signal — GLOBAL (all products) ── + // One call for a fleet-wide scaler controller. Auth mirrors the Prometheus + // exposition: a matching FLEET_METRICS_TOKEN bearer, else an admin JWT. + app.get('/fleet/autoscale/all', async req => { + if (!scrapeTokenMatches(req.headers['authorization'])) { + const auth = await extractAuth(req); + if (auth.role !== 'admin' && auth.role !== 'super_admin') { + throw new ForbiddenError('admin role or a metrics scrape token is required'); + } + } + const thresholds = scalingThresholdsFromEnv(); + const recommendations = await Promise.all( + getAllProducts().map(async p => + recommendScaling(await coordinator.fleetMetrics(p.productId), thresholds) + ) + ); + return { + generatedAt: new Date().toISOString(), + recommendations, + // Quick roll-up so a controller can act on the actionable subset. 
+ actionable: recommendations.filter(r => r.action !== 'hold'), + }; + }); + // ── M0 RU gate: per-product queue version (cheap ~1 RU point read) ── // A polling factory reads this each tick and only runs the expensive claim when // `version` changed since its last attempt. See diff --git a/services/platform-service/src/modules/fleet/trace.test.ts b/services/platform-service/src/modules/fleet/trace.test.ts new file mode 100644 index 00000000..eb13e4e1 --- /dev/null +++ b/services/platform-service/src/modules/fleet/trace.test.ts @@ -0,0 +1,83 @@ +/** + * Fleet trace-context helpers (§4) — pure resolution-rule unit tests. + */ + +import { describe, it, expect } from 'vitest'; +import { + CORRELATION_HEADER, + genCorrelationId, + traceIdFromTraceparent, + correlationIdFromHeaders, + resolveCorrelationId, +} from './trace.js'; + +describe('genCorrelationId', () => { + it('mints a prefixed, unique id', () => { + const a = genCorrelationId(); + const b = genCorrelationId(); + expect(a).toMatch(/^ftr_[0-9a-f-]{36}$/); + expect(a).not.toBe(b); + }); +}); + +describe('traceIdFromTraceparent', () => { + it('extracts the 32-hex trace-id from a valid W3C traceparent', () => { + const tp = '00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01'; + expect(traceIdFromTraceparent(tp)).toBe('4bf92f3577b34da6a3ce929d0e0e4736'); + }); + + it('lowercases the trace-id', () => { + const tp = '00-4BF92F3577B34DA6A3CE929D0E0E4736-00f067aa0ba902b7-01'; + expect(traceIdFromTraceparent(tp)).toBe('4bf92f3577b34da6a3ce929d0e0e4736'); + }); + + it('rejects malformed / all-zero / missing traceparents', () => { + expect(traceIdFromTraceparent(undefined)).toBeUndefined(); + expect(traceIdFromTraceparent('garbage')).toBeUndefined(); + expect(traceIdFromTraceparent('00-xyz-00f067aa0ba902b7-01')).toBeUndefined(); + expect( + traceIdFromTraceparent('00-00000000000000000000000000000000-00f067aa0ba902b7-01') + ).toBeUndefined(); + }); +}); + +describe('correlationIdFromHeaders', () => { + it('prefers 
x-correlation-id', () => { + expect( + correlationIdFromHeaders({ + [CORRELATION_HEADER]: 'cid-1', + traceparent: '00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01', + 'x-request-id': 'req-1', + }) + ).toBe('cid-1'); + }); + + it('falls back to the traceparent trace-id, then x-request-id', () => { + expect( + correlationIdFromHeaders({ + traceparent: '00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01', + 'x-request-id': 'req-1', + }) + ).toBe('4bf92f3577b34da6a3ce929d0e0e4736'); + expect(correlationIdFromHeaders({ 'x-request-id': 'req-1' })).toBe('req-1'); + }); + + it('collapses an array-valued header to its first entry', () => { + expect(correlationIdFromHeaders({ [CORRELATION_HEADER]: ['cid-a', 'cid-b'] })).toBe('cid-a'); + }); + + it('returns undefined when no correlating header is present', () => { + expect(correlationIdFromHeaders({})).toBeUndefined(); + expect(correlationIdFromHeaders({ [CORRELATION_HEADER]: ' ' })).toBeUndefined(); + }); +}); + +describe('resolveCorrelationId', () => { + it('uses the inbound id when present', () => { + expect(resolveCorrelationId({ 'x-request-id': 'req-7' })).toBe('req-7'); + }); + + it('mints a fresh id when the request carries none', () => { + expect(resolveCorrelationId({})).toMatch(/^ftr_/); + }); +}); diff --git a/services/platform-service/src/modules/fleet/trace.ts b/services/platform-service/src/modules/fleet/trace.ts new file mode 100644 index 00000000..03010726 --- /dev/null +++ b/services/platform-service/src/modules/fleet/trace.ts @@ -0,0 +1,70 @@ +/** + * Fleet trace-context helpers (§4 ops tracing). + * + * A `correlationId` threads a single logical work-unit across the + * coordinator↔runner boundary: it is minted when a job is submitted, persisted + * on the job, then inherited by the run/lease and stamped on every lifecycle + * event (assigned → transition → lease_renewed → lease_released). 
/**
 * Fleet trace-context helpers (trace.ts) — resolution rules for the fleet
 * correlation id.
 *
 * The coordinator echoes the correlation id back to the runner on the
 * `x-correlation-id` response header so an external factory can carry it on its
 * subsequent renew/release calls, and a downstream tracer can stitch
 * job → claim → run → ship together.
 *
 * PURE + dependency-light so the resolution rules stay unit-testable.
 */

import crypto from 'node:crypto';

/** Response/request header carrying the fleet correlation id. */
export const CORRELATION_HEADER = 'x-correlation-id';

/** Two hex digits: the `version` field of a W3C `traceparent`. */
const TRACEPARENT_VERSION_RE = /^[0-9a-f]{2}$/i;

/** Thirty-two hex digits: the `trace-id` field of a W3C `traceparent`. */
const TRACE_ID_RE = /^[0-9a-f]{32}$/i;

/** The all-zero trace-id, which the W3C spec defines as invalid. */
const ALL_ZERO_TRACE_ID = '0'.repeat(32);

/**
 * Mint a fresh correlation id of the form `ftr_<uuid>` (the `ftr_` prefix keeps
 * it distinct from job/run ids).
 *
 * @returns `ftr_` followed by a random UUID from `crypto.randomUUID()`.
 */
export function genCorrelationId(): string {
  return `ftr_${crypto.randomUUID()}`;
}

/** Header bag as Fastify exposes it (`string | string[] | undefined` per key). */
export type HeaderBag = Record<string, string | string[] | undefined>;

/**
 * Read a header case-insensitively, collapsing an array to its first value.
 *
 * @param headers Header bag to read from.
 * @param name Header name to look up (any casing).
 * @returns The trimmed value, or undefined when the header is absent, not a
 *   string, or blank after trimming.
 */
function header(headers: HeaderBag, name: string): string | undefined {
  const wanted = name.toLowerCase();

  // Fast path: exact key, then the lowercased key.
  let raw = headers[name] ?? headers[wanted];

  // Slow path: the bag may use some other casing (e.g. `X-Correlation-Id`), so
  // compare every key case-insensitively before giving up.
  if (raw == null) {
    for (const [key, value] of Object.entries(headers)) {
      if (value != null && key.toLowerCase() === wanted) {
        raw = value;
        break;
      }
    }
  }

  const first = Array.isArray(raw) ? raw[0] : raw;
  // Guard against an empty array or a non-string value slipping in at runtime.
  if (typeof first !== 'string') return undefined;

  const trimmed = first.trim();
  return trimmed === '' ? undefined : trimmed;
}

/**
 * Extract the 32-hex trace-id from a W3C `traceparent` header
 * (`<version>-<trace-id>-<parent-id>-<flags>`).
 *
 * @param traceparent Raw header value, if any.
 * @returns The lowercased trace-id, or undefined when the header is missing or
 *   malformed: fewer than four `-`-separated fields, a version that is not two
 *   hex digits or is `ff` (invalid per the spec), a trace-id that is not 32 hex
 *   digits, or the all-zero trace-id (the spec's "invalid" value).
 */
export function traceIdFromTraceparent(traceparent: string | undefined): string | undefined {
  if (!traceparent) return undefined;

  const [version, traceId, ...rest] = traceparent.trim().split('-');
  // version + trace-id + at least parent-id and flags. Extra trailing fields
  // are tolerated so headers from future versions still resolve.
  if (version === undefined || traceId === undefined || rest.length < 2) return undefined;

  if (!TRACEPARENT_VERSION_RE.test(version) || version.toLowerCase() === 'ff') return undefined;
  if (!TRACE_ID_RE.test(traceId)) return undefined;

  const normalized = traceId.toLowerCase();
  return normalized === ALL_ZERO_TRACE_ID ? undefined : normalized;
}

/**
 * Resolve the correlation id carried by an inbound request, in precedence
 * order: an explicit `x-correlation-id`, then the trace-id of a W3C
 * `traceparent`, then the propagated `x-request-id`.
 *
 * @param headers Inbound request headers.
 * @returns The first non-blank candidate, or undefined when the request
 *   carries none (the caller decides whether to mint one).
 */
export function correlationIdFromHeaders(headers: HeaderBag): string | undefined {
  // Each candidate is either a non-empty string or undefined, so `??` selects
  // the first one that is present.
  return (
    header(headers, CORRELATION_HEADER) ??
    traceIdFromTraceparent(header(headers, 'traceparent')) ??
    header(headers, 'x-request-id')
  );
}

/**
 * Resolve the correlation id for a new job: the inbound request's id when
 * present, else a freshly minted one.
 *
 * @param headers Inbound request headers.
 * @returns Always a non-empty string.
 */
export function resolveCorrelationId(headers: HeaderBag): string {
  return correlationIdFromHeaders(headers) ?? genCorrelationId();
}

// ─────────────────────────────────────────────────────────────────────────────
// Next file in this patch (new file mode 100644, index 00000000..b2ac4c92,
// hunk @@ -0,0 +1,182 @@):
//   services/platform-service/src/modules/fleet/tracing.test.ts
//
// Fleet correlation-id tracing (§4) — end-to-end across the coordinator↔runner
// boundary: a job minted with a correlation id propagates onto the run, lease,
// and every lifecycle event, and the routes echo it on the `x-correlation-id`
// response header so an external factory can carry it forward.
// ─────────────────────────────────────────────────────────────────────────────
+ */ + +import Fastify, { type FastifyInstance } from 'fastify'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { MemoryDatastoreProvider } from '@bytelyst/datastore'; +import { _resetDatastoreProvider, setProvider } from '../../lib/datastore.js'; +import * as repo from './repository.js'; +import * as coord from './coordinator.js'; +import { CORRELATION_HEADER } from './trace.js'; +import type { SubmitJobInput } from './types.js'; + +vi.mock('../../lib/auth.js', () => ({ + extractAuth: vi.fn(async () => ({ sub: 'user_1', role: 'admin' })), +})); +vi.mock('../../lib/request-context.js', () => ({ + getRequestProductId: () => 'lysnrai', + requireProductAccess: async () => 'lysnrai', +})); + +const PID = 'lysnrai'; + +function input(over: Partial = {}): SubmitJobInput { + return { + idempotencyKey: 'trace-1', + bodyMd: '# do the thing', + priority: 'medium', + capabilities: [], + prefersEngine: [], + allowedScope: [], + deps: [], + kind: 'leaf', + ...over, + }; +} + +const factory = (over = {}) => ({ + productId: PID, + factoryId: 'fac_1', + capabilities: [] as string[], + leaseSeconds: 900, + ...over, +}); + +async function buildApp(): Promise { + const { fleetRoutes } = await import('./routes.js'); + const app = Fastify({ logger: false }); + await app.register(fleetRoutes, { prefix: '/api' }); + return app; +} + +describe('fleet correlation-id tracing (coordinator)', () => { + beforeEach(() => setProvider(new MemoryDatastoreProvider())); + afterEach(() => { + _resetDatastoreProvider(); + vi.clearAllMocks(); + }); + + it('persists the correlation id on the job and stamps the submitted event', async () => { + const { job } = await coord.submitJob(PID, input(), { correlationId: 'cid-abc' }); + expect(job.correlationId).toBe('cid-abc'); + const events = await repo.listEvents(job.id); + const submitted = events.find(e => e.type === 'submitted'); + expect(submitted?.correlationId).toBe('cid-abc'); + }); + + it('run + lease inherit the 
job correlation id at claim, and the assigned event carries it', async () => { + await coord.submitJob(PID, input(), { correlationId: 'cid-claim' }); + const claim = await coord.claimNextJob(factory()); + expect(claim).not.toBeNull(); + expect(claim!.run.correlationId).toBe('cid-claim'); + expect(claim!.lease.correlationId).toBe('cid-claim'); + const events = await repo.listEvents(claim!.job.id); + expect(events.find(e => e.type === 'assigned')?.correlationId).toBe('cid-claim'); + }); + + it('threads the correlation id across transition → release lifecycle events', async () => { + await coord.submitJob(PID, input(), { correlationId: 'cid-life' }); + const claim = await coord.claimNextJob(factory()); + const jobId = claim!.job.id; + const epoch = claim!.lease.leaseEpoch; + + await coord.patchJobFenced(jobId, PID, { leaseEpoch: epoch, stage: 'building' }); + await coord.renewLease(jobId, PID, epoch, 900); + await coord.releaseLease(jobId, PID, epoch, 'shipped', { result: 'shipped' }); + + const events = await repo.listEvents(jobId); + // Every lifecycle event for this job shares the one correlation id. 
+ for (const e of events) expect(e.correlationId).toBe('cid-life'); + expect(events.map(e => e.type)).toEqual( + expect.arrayContaining([ + 'submitted', + 'assigned', + 'transition', + 'lease_renewed', + 'lease_released', + ]) + ); + }); + + it('children of a composite job share the parent correlation id', async () => { + const { job } = await coord.submitJob( + PID, + input({ + idempotencyKey: 'parent-1', + children: [ + { + idempotencyKey: 'child-1', + bodyMd: '# child', + priority: 'medium', + capabilities: [], + prefersEngine: [], + allowedScope: [], + deps: [], + }, + ], + }), + { correlationId: 'cid-dag' } + ); + expect(job.correlationId).toBe('cid-dag'); + const children = await repo.findJobsByIdempotencyKey(PID, 'child-1'); + expect(children[0]?.correlationId).toBe('cid-dag'); + }); +}); + +describe('fleet correlation-id tracing (routes)', () => { + beforeEach(() => setProvider(new MemoryDatastoreProvider())); + afterEach(() => { + _resetDatastoreProvider(); + vi.clearAllMocks(); + }); + + it('POST /fleet/jobs honors an inbound x-correlation-id and echoes it back', async () => { + const app = await buildApp(); + const res = await app.inject({ + method: 'POST', + url: '/api/fleet/jobs', + headers: { [CORRELATION_HEADER]: 'inbound-cid' }, + payload: { idempotencyKey: 'r-1', bodyMd: '# task' }, + }); + expect(res.statusCode).toBe(201); + expect(res.headers[CORRELATION_HEADER]).toBe('inbound-cid'); + expect(JSON.parse(res.body).job.correlationId).toBe('inbound-cid'); + }); + + it('POST /fleet/jobs mints + echoes a correlation id when the request carries none', async () => { + const app = await buildApp(); + const res = await app.inject({ + method: 'POST', + url: '/api/fleet/jobs', + payload: { idempotencyKey: 'r-2', bodyMd: '# task' }, + }); + expect(res.statusCode).toBe(201); + const echoed = res.headers[CORRELATION_HEADER]; + expect(echoed).toMatch(/^ftr_/); + expect(JSON.parse(res.body).job.correlationId).toBe(echoed); + }); + + it('POST /fleet/claim hands 
the job correlation id back on the response header', async () => { + const app = await buildApp(); + await app.inject({ + method: 'POST', + url: '/api/fleet/jobs', + headers: { [CORRELATION_HEADER]: 'claim-cid' }, + payload: { idempotencyKey: 'r-3', bodyMd: '# task' }, + }); + const res = await app.inject({ + method: 'POST', + url: '/api/fleet/claim', + payload: { factoryId: 'fac_1', capabilities: [] }, + }); + expect(res.statusCode).toBe(200); + expect(res.headers[CORRELATION_HEADER]).toBe('claim-cid'); + const body = JSON.parse(res.body); + expect(body.claimed).toBe(true); + expect(body.run.correlationId).toBe('claim-cid'); + }); +}); diff --git a/services/platform-service/src/modules/fleet/types.ts b/services/platform-service/src/modules/fleet/types.ts index 1cf52676..55cd3fa2 100644 --- a/services/platform-service/src/modules/fleet/types.ts +++ b/services/platform-service/src/modules/fleet/types.ts @@ -232,6 +232,14 @@ export const FleetJobDocSchema = z.object({ * absent on jobs that were never echoed / have no trackerItemId. */ trackerEchoedStatus: z.string().optional(), + /** + * Trace-context correlation id (§4) minted at submission from the inbound + * `x-correlation-id`/`traceparent`/`x-request-id` (or freshly generated). The + * run/lease inherit it and every lifecycle event is stamped with it, so a + * single logical work-unit can be stitched across the coordinator↔runner + * boundary (job → claim → run → ship). Optional — absent on legacy jobs. + */ + correlationId: z.string().optional(), createdAt: z.string(), updatedAt: z.string(), }); @@ -256,6 +264,8 @@ export const FleetRunDocSchema = z.object({ prUrl: z.string().optional(), branch: z.string().optional(), prState: z.enum(['open', 'merged']).optional(), + /** Trace-context correlation id inherited from the job at claim time (§4). 
*/ + correlationId: z.string().optional(), }); export type FleetRunDoc = z.infer; @@ -270,6 +280,8 @@ export const FleetLeaseDocSchema = z.object({ renewals: z.number().int().nonnegative().default(0), status: z.enum(LEASE_STATUS).default('held'), rev: z.number().int().nonnegative().default(0), + /** Trace-context correlation id inherited from the job at claim time (§4). */ + correlationId: z.string().optional(), updatedAt: z.string(), }); export type FleetLeaseDoc = z.infer; @@ -338,6 +350,9 @@ export const FleetEventDocSchema = z.object({ type: z.string().min(1), at: z.string(), actor: z.string().optional(), + /** Trace-context correlation id of the owning job (§4) — stamped on every + * lifecycle event so the stream is correlatable without joining to the job. */ + correlationId: z.string().optional(), data: z.record(z.string(), z.unknown()).default({}), }); export type FleetEventDoc = z.infer;