feat(fleet): correlation-id tracing + capacity autoscaling signal (ops #4/#5)

Thread a trace-context correlation id across the coordinator<->runner boundary
so a logical work-unit (job -> claim -> run -> ship) is stitchable end to end,
and add an advisory capacity autoscaling signal an external scaler can consume.

Tracing (#4):
- Mint/propagate a correlationId at submit from the inbound
  x-correlation-id/traceparent/x-request-id (else generate ftr_<uuid>); persist
  it on the job, inherit onto the run + lease at claim, and stamp every
  lifecycle event (submitted/assigned/transition/lease_renewed/lease_released/
  retry_scheduled/dead_letter). Children of a composite job share the parent id.
- Echo it back on the x-correlation-id response header (submit/claim/renew/
  release/patch) so a factory can carry it forward, and bind it to req.log.
- New pure trace.ts (header resolution incl. W3C traceparent trace-id).

Autoscaling signal (#5):
- New pure autoscaler.ts turns a product FleetMetrics + saturation alerts
  (no_live_capacity/saturated/queue_starvation) into an auditable scale
  recommendation (action/recommendedSeats/delta/urgency/signals).
  budget_exhausted suppresses scale-out; idle slack reclaims down to a floor.
  Thresholds tunable via FLEET_AUTOSCALE_* env.
- GET /fleet/autoscale (per-product) + GET /fleet/autoscale/all (global, admin
  or scrape token). Documented the env vars in .env.example.

Tests: +29 (trace 10, tracing 7, autoscaler 12); full suite 1846 green; lint + tsc clean.

Generated with [Devin](https://cli.devin.ai/docs)

Co-Authored-By: Devin <158243242+devin-ai-integration[bot]@users.noreply.github.com>
This commit is contained in:
saravanakumardb1 2026-06-01 22:43:56 -07:00
parent 93d1caf4a2
commit 321cfe7546
10 changed files with 915 additions and 8 deletions

View File

@ -111,3 +111,10 @@ FLEET_COST_ROUTING=
FLEET_ENGINE_BREAKER=
FLEET_BUDGETS=
FLEET_TENANT_ENFORCEMENT=
# Capacity autoscaling signal (§5) — tunes the advisory scale recommendation
# served at GET /api/fleet/autoscale[/all] (consumed by an external scaler).
# All optional; unset keys fall back to the in-code defaults shown below.
FLEET_AUTOSCALE_SCALE_OUT_PCT=85
FLEET_AUTOSCALE_SCALE_IN_PCT=20
FLEET_AUTOSCALE_MAX_STEP=5
FLEET_AUTOSCALE_MIN_SEATS=0

View File

@ -0,0 +1,227 @@
/**
* Capacity autoscaling signal (§5) — pure recommender unit tests + a route test.
*/
import Fastify, { type FastifyInstance } from 'fastify';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { MemoryDatastoreProvider } from '@bytelyst/datastore';
import { _resetDatastoreProvider, setProvider } from '../../lib/datastore.js';
import {
recommendScaling,
scalingThresholdsFromEnv,
DEFAULT_SCALING_THRESHOLDS,
} from './autoscaler.js';
import type { FleetMetrics, FleetAlert } from './coordinator.js';
// Stub auth so every request made in this file resolves as an admin caller.
vi.mock('../../lib/auth.js', () => ({
extractAuth: vi.fn(async () => ({ sub: 'user_1', role: 'admin' })),
}));
// Pin the request's product to 'lysnrai' — the same productId the metrics()
// fixture below reports.
vi.mock('../../lib/request-context.js', () => ({
getRequestProductId: () => 'lysnrai',
requireProductAccess: async () => 'lysnrai',
}));
/**
 * Build a `FleetMetrics` fixture for product 'lysnrai'.
 *
 * The baseline is an empty queue plus one live, healthy factory exposing four
 * unused seats. `over.jobs` / `over.factories` are shallow-merged on top of
 * that baseline; `over.alerts` replaces the (empty) alert list wholesale.
 */
function metrics(
  over: {
    jobs?: Partial<FleetMetrics['jobs']>;
    factories?: Partial<FleetMetrics['factories']>;
    alerts?: FleetAlert[];
  } = {}
): FleetMetrics {
  const jobs: FleetMetrics['jobs'] = {
    total: 0,
    byStage: {} as FleetMetrics['jobs']['byStage'],
    queueDepth: 0,
    blocked: 0,
    active: 0,
    oldestQueuedAgeMs: null,
    ...over.jobs,
  };
  const factories: FleetMetrics['factories'] = {
    total: 1,
    live: 1,
    stale: 0,
    byHealth: { ok: 1, degraded: 0, down: 0 },
    seatsUsed: 0,
    seatsTotal: 4,
    utilizationPct: 0,
    ...over.factories,
  };
  const alerts = over.alerts ?? [];
  return {
    productId: 'lysnrai',
    generatedAt: '2026-06-01T00:00:00.000Z',
    jobs,
    factories,
    budget: null,
    alerts,
  };
}
/** Build a `FleetAlert` fixture whose message mirrors its code; `level` defaults to 'warning'. */
const alert = (code: string, level: FleetAlert['level'] = 'warning'): FleetAlert => {
  const fixture: FleetAlert = { code, level, message: code };
  return fixture;
};
// Pure-policy tests: each case feeds a hand-built metrics snapshot to
// recommendScaling and checks the action / seat target / delta it derives.
describe('recommendScaling', () => {
// No alerts, empty queue, and 50% utilization is above the default 20%
// scale-in threshold, so neither scale-out nor scale-in applies.
it('holds when capacity is balanced against demand', () => {
const r = recommendScaling(
metrics({ factories: { seatsUsed: 2, seatsTotal: 4, utilizationPct: 50 } })
);
expect(r.action).toBe('hold');
expect(r.delta).toBe(0);
expect(r.recommendedSeats).toBe(4);
});
// Zero seats + a no_live_capacity alert: the add is sized by the queue depth.
it('CRITICAL scale-out on no_live_capacity even from a cold (zero-seat) fleet', () => {
const r = recommendScaling(
metrics({
jobs: { queueDepth: 3, total: 3 },
factories: { total: 0, live: 0, seatsUsed: 0, seatsTotal: 0, utilizationPct: 0 },
alerts: [alert('no_live_capacity', 'critical')],
})
);
expect(r.action).toBe('scale_out');
expect(r.urgency).toBe('critical');
expect(r.recommendedSeats).toBe(3); // min(queueDepth=3, maxStep=5)
expect(r.delta).toBe(3);
});
// A backlog of 20 exceeds the default step cap, so only 5 seats are added.
it('scales out on saturation, bounded by maxScaleOutStep', () => {
const r = recommendScaling(
metrics({
jobs: { queueDepth: 20, total: 24 },
factories: { seatsUsed: 4, seatsTotal: 4, utilizationPct: 100 },
alerts: [alert('saturated')],
})
);
expect(r.action).toBe('scale_out');
expect(r.delta).toBe(DEFAULT_SCALING_THRESHOLDS.maxScaleOutStep); // capped at 5
expect(r.recommendedSeats).toBe(4 + 5);
});
// The starvation alert alone triggers scale-out, sized by the single queued job.
it('scales out on queue_starvation', () => {
const r = recommendScaling(
metrics({
jobs: { queueDepth: 1, total: 1, oldestQueuedAgeMs: 999_999 },
factories: { seatsUsed: 1, seatsTotal: 2, utilizationPct: 50 },
alerts: [alert('queue_starvation')],
})
);
expect(r.action).toBe('scale_out');
expect(r.delta).toBe(1);
});
// No alert is firing: 90% utilization >= the 85% threshold with 2 queued jobs.
it('scales out on high utilization with a backlog (no explicit alert)', () => {
const r = recommendScaling(
metrics({
jobs: { queueDepth: 2, total: 6 },
factories: { seatsUsed: 4, seatsTotal: 4, utilizationPct: 90 },
}),
{ scaleOutUtilizationPct: 85 }
);
expect(r.action).toBe('scale_out');
expect(r.delta).toBe(2);
});
// budget_exhausted wins over the co-firing saturated alert.
it('budget_exhausted suppresses scale-out (claims are blocked) → hold', () => {
const r = recommendScaling(
metrics({
jobs: { queueDepth: 5, total: 5 },
factories: { seatsUsed: 4, seatsTotal: 4, utilizationPct: 100 },
alerts: [alert('saturated'), alert('budget_exhausted', 'critical')],
})
);
expect(r.action).toBe('hold');
expect(r.reason).toMatch(/budget/i);
});
// Idle fleet at 16% (<= the default 20% scale-in threshold): reclaim down to
// the one seat still in use.
it('scales in idle slack down to in-use seats / floor', () => {
const r = recommendScaling(
metrics({
jobs: { queueDepth: 0, active: 0, total: 0 },
factories: { seatsUsed: 1, seatsTotal: 6, utilizationPct: 16 },
})
);
expect(r.action).toBe('scale_in');
expect(r.recommendedSeats).toBe(1); // max(minSeats=0, seatsUsed=1)
expect(r.delta).toBe(-5);
});
// active > 0 means the fleet is not idle, so no seats are reclaimed.
it('does NOT scale in while work is in flight', () => {
const r = recommendScaling(
metrics({
jobs: { queueDepth: 0, active: 2, total: 2 },
factories: { seatsUsed: 2, seatsTotal: 6, utilizationPct: 33 },
})
);
expect(r.action).toBe('hold');
});
// With nothing in use, the target stops at the configured floor of 2 seats.
it('respects a minSeats floor on scale-in', () => {
const r = recommendScaling(
metrics({
jobs: { queueDepth: 0, active: 0, total: 0 },
factories: { seatsUsed: 0, seatsTotal: 6, utilizationPct: 0 },
}),
{ minSeats: 2 }
);
expect(r.action).toBe('scale_in');
expect(r.recommendedSeats).toBe(2);
});
});
// Env-parsing tests. The FLEET_AUTOSCALE_* keys are cleared BEFORE as well as
// after every case: clearing only afterwards lets values already present in the
// ambient environment (e.g. a loaded .env) leak into the first test and break
// the "nothing is configured" expectation.
describe('scalingThresholdsFromEnv', () => {
  const KEYS = [
    'FLEET_AUTOSCALE_SCALE_OUT_PCT',
    'FLEET_AUTOSCALE_SCALE_IN_PCT',
    'FLEET_AUTOSCALE_MAX_STEP',
    'FLEET_AUTOSCALE_MIN_SEATS',
  ];
  /** Remove every autoscale override from the process environment. */
  const clearKeys = (): void => {
    for (const k of KEYS) delete process.env[k];
  };
  beforeEach(clearKeys);
  afterEach(clearKeys);
  it('returns an empty override set when nothing is configured', () => {
    expect(scalingThresholdsFromEnv()).toEqual({});
  });
  it('parses configured overrides and ignores invalid values', () => {
    process.env.FLEET_AUTOSCALE_SCALE_OUT_PCT = '70';
    process.env.FLEET_AUTOSCALE_MAX_STEP = '10';
    // Not numeric → dropped from the override set rather than reported as NaN.
    process.env.FLEET_AUTOSCALE_MIN_SEATS = 'not-a-number';
    expect(scalingThresholdsFromEnv()).toEqual({ scaleOutUtilizationPct: 70, maxScaleOutStep: 10 });
  });
});
/**
 * Create a logger-less Fastify instance with the fleet routes mounted under
 * `/api`. The routes module is imported lazily, at call time.
 */
async function buildApp(): Promise<FastifyInstance> {
  const routesModule = await import('./routes.js');
  const instance = Fastify({ logger: false });
  await instance.register(routesModule.fleetRoutes, { prefix: '/api' });
  return instance;
}
// Route-level smoke test against an in-memory datastore: the endpoint must
// report `hold` for an empty fleet and flip to a critical `scale_out` once a job
// is queued with no factory registered.
describe('GET /fleet/autoscale (route)', () => {
  beforeEach(() => setProvider(new MemoryDatastoreProvider()));
  afterEach(() => {
    _resetDatastoreProvider();
    vi.clearAllMocks();
  });
  it('returns hold for an empty fleet and scale_out once jobs queue with no capacity', async () => {
    const app = await buildApp();
    try {
      const idle = await app.inject({ method: 'GET', url: '/api/fleet/autoscale' });
      expect(idle.statusCode).toBe(200);
      expect(JSON.parse(idle.body).action).toBe('hold');
      // Queue a job with no factories registered → no_live_capacity → scale_out.
      const submitted = await app.inject({
        method: 'POST',
        url: '/api/fleet/jobs',
        payload: { idempotencyKey: 'as-1', bodyMd: '# task' },
      });
      // Fail here, with a clear message, if the submit itself was rejected
      // (a freshly created job is answered with 201).
      expect(submitted.statusCode).toBe(201);
      const hot = await app.inject({ method: 'GET', url: '/api/fleet/autoscale' });
      expect(hot.statusCode).toBe(200);
      const body = JSON.parse(hot.body);
      expect(body.action).toBe('scale_out');
      expect(body.urgency).toBe('critical');
      expect(body.signals.queueDepth).toBe(1);
      expect(body.delta).toBeGreaterThanOrEqual(1);
    } finally {
      // Release the Fastify instance even when an assertion above throws.
      await app.close();
    }
  });
});

View File

@ -0,0 +1,220 @@
/**
* Capacity autoscaling signal (§5).
*
* Turns a product's point-in-time `FleetMetrics` (queue depth, seat utilization,
* and the derived saturation alerts: `no_live_capacity`, `saturated`,
* `queue_starvation`) into a concrete, auditable scale recommendation an
* external scaler can consume (pull model — `GET /fleet/autoscale`). PURE +
* synchronous: the route does the I/O (compute metrics) and hands the snapshot
* here, keeping the policy fully unit-testable and free of side effects.
*
* The recommendation is advisory: it never starts/stops factories itself, it
* reports a desired seat target + delta + the raw signals that drove it, so a
* downstream controller (or a human) makes the actual call.
*/
import type { FleetMetrics } from './coordinator.js';
/** Direction of a scale recommendation: add seats, remove seats, or leave capacity as is. */
export type ScalingAction = 'scale_out' | 'scale_in' | 'hold';
/**
* Tunable thresholds for the scaling policy. Overrides can be resolved from the
* `FLEET_AUTOSCALE_*` env vars; any key left out falls back to
* `DEFAULT_SCALING_THRESHOLDS`.
*/
export interface ScalingThresholds {
/** Seat utilization (%) at or above which a backlog triggers scale-out (inclusive). */
scaleOutUtilizationPct: number;
/** Seat utilization (%) at or below which idle capacity may be reclaimed (inclusive). */
scaleInUtilizationPct: number;
/** Max seats to recommend adding in a single step (rate-limits scale-out). */
maxScaleOutStep: number;
/** Never recommend scaling below this many seats (keeps a warm floor). */
minSeats: number;
}
/**
* In-code defaults, used for every threshold the caller does not override:
* scale out at >= 85% utilization, scale in at <= 20%, add at most 5 seats per
* step, and allow scale-in all the way down to 0 seats.
*/
export const DEFAULT_SCALING_THRESHOLDS: ScalingThresholds = {
scaleOutUtilizationPct: 85,
scaleInUtilizationPct: 20,
maxScaleOutStep: 5,
minSeats: 0,
};
/**
* Raw inputs behind a recommendation, echoed back for auditability. Every field
* except `pressure` and `alertCodes` is copied verbatim from the metrics
* snapshot (`jobs.*` / `factories.*`).
*/
export interface ScalingSignals {
/** `jobs.queueDepth` from the snapshot. */
queueDepth: number;
/** `jobs.blocked` from the snapshot (reported only; not part of `pressure`). */
blocked: number;
/** `jobs.active` from the snapshot; must be 0 for the fleet to count as idle. */
active: number;
/** `factories.seatsUsed` from the snapshot. */
seatsUsed: number;
/** `factories.seatsTotal` from the snapshot — the baseline `delta` is measured against. */
seatsTotal: number;
/** `factories.utilizationPct` from the snapshot. */
utilizationPct: number;
/** Runnable backlog driving scale-out pressure (queued jobs; blocked excluded). */
pressure: number;
/** Codes of the alerts currently firing for the product. */
alertCodes: string[];
}
/** Advisory scale recommendation for one product, as returned by `recommendScaling`. */
export interface ScalingRecommendation {
/** Product the recommendation applies to (copied from the metrics snapshot). */
productId: string;
/** Recommended direction; `hold` always carries a `delta` of 0. */
action: ScalingAction;
/** Absolute recommended seat target. */
recommendedSeats: number;
/** recommendedSeats - current seatsTotal (positive = add, negative = remove). */
delta: number;
/** Human-readable justification. */
reason: string;
/** Urgency: `critical` when work is queued with zero usable capacity. */
urgency: 'critical' | 'warning' | 'none';
/** The raw signals the decision was derived from. */
signals: ScalingSignals;
/** Timestamp of the metrics snapshot the recommendation was computed from. */
generatedAt: string;
}
/** Round to the nearest integer, then floor the result at zero. */
function clampNonNeg(n: number): number {
  const nearest = Math.round(n);
  return Math.max(nearest, 0);
}
/**
 * Read env var `name` as a finite, non-negative number. Returns `undefined`
 * when the variable is unset, blank, not numeric, non-finite, or negative.
 */
function envNum(name: string): number | undefined {
  const raw = process.env[name];
  if (raw === undefined) return undefined;
  if (raw.trim() === '') return undefined;
  const parsed = Number(raw);
  if (!Number.isFinite(parsed) || parsed < 0) return undefined;
  return parsed;
}
/**
 * Collect threshold overrides from the `FLEET_AUTOSCALE_*` env vars. Only keys
 * whose variable parses to a valid number are present in the result; the rest
 * are omitted so a consumer can layer the result over
 * {@link DEFAULT_SCALING_THRESHOLDS}. Kept here so the route stays a thin
 * pass-through.
 */
export function scalingThresholdsFromEnv(): Partial<ScalingThresholds> {
  // Threshold field → env var that overrides it.
  const bindings: ReadonlyArray<readonly [keyof ScalingThresholds, string]> = [
    ['scaleOutUtilizationPct', 'FLEET_AUTOSCALE_SCALE_OUT_PCT'],
    ['scaleInUtilizationPct', 'FLEET_AUTOSCALE_SCALE_IN_PCT'],
    ['maxScaleOutStep', 'FLEET_AUTOSCALE_MAX_STEP'],
    ['minSeats', 'FLEET_AUTOSCALE_MIN_SEATS'],
  ];
  const overrides: Partial<ScalingThresholds> = {};
  for (const [field, envName] of bindings) {
    const value = envNum(envName);
    if (value !== undefined) overrides[field] = value;
  }
  return overrides;
}
/**
 * Compute a scale recommendation for one product. Decision order (highest
 * urgency first):
 *
 * 1. `no_live_capacity` (queued work, zero live healthy seats) → CRITICAL
 *    scale-out, even from a cold zero-seat fleet.
 * 2. `saturated` / `queue_starvation` / high utilization with a backlog →
 *    scale-out by the smaller of the backlog and `maxScaleOutStep`.
 * 3. Idle (no backlog, no in-flight work, utilization ≤ scale-in threshold) with
 *    slack seats above `minSeats` → reclaim idle capacity.
 * 4. Otherwise → hold.
 *
 * A `budget_exhausted` alert SUPPRESSES scale-out only (steps 1–2): new claims
 * are blocked by the budget, so more seats can't drain the queue. Idle slack is
 * still reclaimed (step 3); when nothing can be reclaimed the result is a hold
 * that names the budget as the reason.
 *
 * `maxScaleOutStep` is floored to a whole number and never treated as less
 * than 1, so a `scale_out` always carries a positive delta that stays within
 * the configured cap.
 *
 * @param metrics Point-in-time fleet snapshot for the product.
 * @param thresholds Overrides layered over {@link DEFAULT_SCALING_THRESHOLDS}.
 * @returns The advisory recommendation, including the raw signals behind it.
 */
export function recommendScaling(
  metrics: FleetMetrics,
  thresholds: Partial<ScalingThresholds> = {}
): ScalingRecommendation {
  const t = { ...DEFAULT_SCALING_THRESHOLDS, ...thresholds };
  // Normalise the step cap: a value below 1 would produce a `scale_out` with a
  // zero delta, and a fractional one could round to more seats than the cap.
  const maxStep = Number.isFinite(t.maxScaleOutStep)
    ? Math.max(1, Math.floor(t.maxScaleOutStep))
    : DEFAULT_SCALING_THRESHOLDS.maxScaleOutStep;
  const { queueDepth, blocked, active } = metrics.jobs;
  const { seatsUsed, seatsTotal, utilizationPct } = metrics.factories;
  const alertCodes = metrics.alerts.map(a => a.code);
  // Only queued jobs count as scale-out pressure; blocked jobs are excluded.
  const pressure = queueDepth;
  const signals: ScalingSignals = {
    queueDepth,
    blocked,
    active,
    seatsUsed,
    seatsTotal,
    utilizationPct,
    pressure,
    alertCodes,
  };

  const hold = (reason: string): ScalingRecommendation => ({
    productId: metrics.productId,
    action: 'hold',
    recommendedSeats: seatsTotal,
    delta: 0,
    reason,
    urgency: 'none',
    signals,
    generatedAt: metrics.generatedAt,
  });

  const scaleOut = (
    addSeats: number,
    reason: string,
    urgency: 'critical' | 'warning'
  ): ScalingRecommendation => {
    // Always add at least one seat, never more than the per-step cap.
    const add = Math.min(Math.max(1, addSeats), maxStep);
    const recommendedSeats = clampNonNeg(seatsTotal + add);
    return {
      productId: metrics.productId,
      action: 'scale_out',
      recommendedSeats,
      delta: recommendedSeats - seatsTotal,
      reason,
      urgency,
      signals,
      generatedAt: metrics.generatedAt,
    };
  };

  // Budget exhaustion blocks claims — more seats won't help, so every scale-out
  // branch is skipped. Scale-in below is deliberately NOT gated by it.
  const budgetExhausted = alertCodes.includes('budget_exhausted');

  if (!budgetExhausted) {
    // 1. No usable capacity for queued work — most urgent.
    if (alertCodes.includes('no_live_capacity')) {
      return scaleOut(
        pressure,
        `${queueDepth} job(s) queued with no live, healthy factory — provision capacity now.`,
        'critical'
      );
    }
    // 2. Saturation / starvation / sustained high utilization with a backlog.
    if (alertCodes.includes('saturated')) {
      return scaleOut(
        pressure,
        `All ${seatsTotal} seat(s) busy with ${queueDepth} job(s) still queued — add capacity.`,
        'warning'
      );
    }
    if (alertCodes.includes('queue_starvation')) {
      return scaleOut(
        pressure,
        `Oldest queued job has waited past the starvation threshold — add capacity to drain it.`,
        'warning'
      );
    }
    if (pressure > 0 && seatsTotal > 0 && utilizationPct >= t.scaleOutUtilizationPct) {
      return scaleOut(
        pressure,
        `Seat utilization ${utilizationPct}% ≥ ${t.scaleOutUtilizationPct}% with ${queueDepth} queued — add capacity.`,
        'warning'
      );
    }
  }

  // 3. Idle slack — reclaim capacity down to the floor (never below in-use seats).
  const idle = queueDepth === 0 && active === 0;
  if (idle && seatsTotal > t.minSeats && utilizationPct <= t.scaleInUtilizationPct) {
    const recommendedSeats = clampNonNeg(Math.max(t.minSeats, seatsUsed));
    if (recommendedSeats < seatsTotal) {
      return {
        productId: metrics.productId,
        action: 'scale_in',
        recommendedSeats,
        delta: recommendedSeats - seatsTotal,
        reason: `No queued or in-flight work and ${utilizationPct}% utilization — reclaim idle seats down to ${recommendedSeats}.`,
        urgency: 'none',
        signals,
        generatedAt: metrics.generatedAt,
      };
    }
  }

  // 4. Nothing to do; name the budget when it is what suppressed scale-out.
  if (budgetExhausted) {
    return hold('Budget exhausted — claims are blocked, so scaling out cannot drain the queue.');
  }
  return hold('Capacity is balanced against current demand.');
}

View File

@ -149,7 +149,12 @@ export interface SubmitResult {
outcome: 'created' | 'deduplicated' | 'superseded';
}
export async function submitJob(productId: string, input: SubmitJobInput): Promise<SubmitResult> {
export async function submitJob(
productId: string,
input: SubmitJobInput,
opts?: { correlationId?: string }
): Promise<SubmitResult> {
const correlationId = opts?.correlationId;
const hash = contentHash(input.bodyMd);
const existingForKey = await repo.findJobsByIdempotencyKey(productId, input.idempotencyKey);
@ -234,6 +239,7 @@ export async function submitJob(productId: string, input: SubmitJobInput): Promi
attemptsBase: 0,
leaseEpoch: 0,
rev: 0,
correlationId,
createdAt: now,
updatedAt: now,
};
@ -297,6 +303,8 @@ export async function submitJob(productId: string, input: SubmitJobInput): Promi
attemptsBase: 0,
leaseEpoch: 0,
rev: 0,
// Children share the parent's trace so a composite job is one work-unit.
correlationId,
createdAt: childNow,
updatedAt: childNow,
};
@ -306,6 +314,7 @@ export async function submitJob(productId: string, input: SubmitJobInput): Promi
await repo.appendEvent({
jobId: childId,
productId,
correlationId,
type: 'submitted',
data: { stage: childStage, idempotencyKey: child.idempotencyKey, parentId: id },
});
@ -314,6 +323,7 @@ export async function submitJob(productId: string, input: SubmitJobInput): Promi
await repo.appendEvent({
jobId: id,
productId,
correlationId,
type: 'submitted',
data: { stage: base.stage, idempotencyKey: input.idempotencyKey, childCount: children.length },
});
@ -488,6 +498,9 @@ export async function tryClaimJob(
const expiresAt = new Date(now + ctx.leaseSeconds * 1000).toISOString();
const nowIso = new Date(now).toISOString();
// The run/lease inherit the job's trace so the whole work-unit shares one id.
const correlationId = job.correlationId;
const existingLease = await repo.getLease(job.id);
let lease: FleetLeaseDoc;
if (existingLease) {
@ -497,6 +510,7 @@ export async function tryClaimJob(
leaseEpoch: newEpoch,
renewals: 0,
status: 'held',
correlationId,
});
lease = updated.ok ? updated.doc : existingLease;
} else {
@ -510,6 +524,7 @@ export async function tryClaimJob(
renewals: 0,
status: 'held',
rev: 0,
correlationId,
updatedAt: nowIso,
});
}
@ -525,6 +540,7 @@ export async function tryClaimJob(
engine: job.engine ?? job.engineClass ?? 'unknown',
startedAt: nowIso,
insights: {},
correlationId,
});
await repo.appendEvent({
@ -532,6 +548,7 @@ export async function tryClaimJob(
productId: ctx.productId,
type: 'assigned',
actor: ctx.factoryId,
correlationId,
data: { leaseEpoch: newEpoch, attempt },
});
@ -846,6 +863,7 @@ export async function patchJobFenced(
jobId,
productId,
type: 'transition',
correlationId: job.correlationId,
data: { stage: patch.stage ?? job.stage, leaseEpoch: patch.leaseEpoch },
});
@ -1082,7 +1100,13 @@ export async function renewLease(
status: 'held',
});
if (!res.ok) return { ok: false, reason: 'conflict' };
await repo.appendEvent({ jobId, productId, type: 'lease_renewed', data: { leaseEpoch } });
await repo.appendEvent({
jobId,
productId,
type: 'lease_renewed',
correlationId: job.correlationId,
data: { leaseEpoch },
});
return { ok: true, doc: res.doc };
}
@ -1202,6 +1226,7 @@ export async function releaseLease(
jobId,
productId,
type: 'retry_scheduled',
correlationId: job.correlationId,
data: {
attempt: job.attempts,
max: job.retry?.max,
@ -1216,6 +1241,7 @@ export async function releaseLease(
jobId,
productId,
type: 'dead_letter',
correlationId: job.correlationId,
data: { attempts: job.attempts, max: job.retry?.max, result: report?.result ?? 'failed' },
});
} else {
@ -1253,7 +1279,13 @@ export async function releaseLease(
engineBreaker.recordSuccess(breakerFactory, breakerEngine);
}
}
await repo.appendEvent({ jobId, productId, type: 'lease_released', data: { leaseEpoch, stage } });
await repo.appendEvent({
jobId,
productId,
type: 'lease_released',
correlationId: job.correlationId,
data: { leaseEpoch, stage },
});
return { ok: true, doc: res.doc };
}

View File

@ -343,6 +343,8 @@ export interface AppendEventInput {
productId: string;
type: string;
actor?: string;
/** Trace-context correlation id of the owning job (§4), stamped on the event. */
correlationId?: string;
data?: Record<string, unknown>;
}
@ -358,6 +360,7 @@ export async function appendEvent(input: AppendEventInput): Promise<FleetEventDo
type: input.type,
at: new Date().toISOString(),
actor: input.actor,
correlationId: input.correlationId,
data: input.data ?? {},
};
return events().create(doc);

View File

@ -13,6 +13,9 @@
* GET /fleet/jobs/:id/events append-only event stream
* GET /fleet/jobs/:id/events/stream live event stream (SSE, resumable)
* GET /fleet/metrics fleet metrics + alerts (queue depth, utilization)
* GET /fleet/metrics/prom Prometheus exposition (all products, labelled)
* GET /fleet/autoscale capacity scale recommendation for this product (§5)
* GET /fleet/autoscale/all capacity scale recommendations for all products (§5)
* GET /fleet/queue-state per-product queue version (M0 RU gate cheap point read)
* POST /fleet/jobs/:id/review/request route a building job into the review gate
* POST /fleet/jobs/:id/review submit a reviewer decision (approve/reject)
@ -36,6 +39,8 @@ import * as trackerBridge from './tracker-bridge.js';
import { getReaperStats } from './reaper.js';
import { getEngineBreakerSnapshot } from './engine-breaker.js';
import { renderFleetMetricsProm, scrapeTokenMatches } from './prometheus.js';
import { recommendScaling, scalingThresholdsFromEnv } from './autoscaler.js';
import { CORRELATION_HEADER, resolveCorrelationId } from './trace.js';
import { getAllProducts } from '../products/cache.js';
import {
SubmitJobSchema,
@ -77,6 +82,16 @@ async function requireOwnProduct(req: import('fastify').FastifyRequest): Promise
const delay = (ms: number): Promise<void> => new Promise(resolve => setTimeout(resolve, ms));
/**
 * Copy a job's trace-context correlation id onto the `x-correlation-id`
 * response header (§4), so an external factory can send it along on its later
 * renew/release calls and a tracer can stitch the work-unit across the
 * coordinator↔runner boundary. Does nothing when the id is absent or empty
 * (e.g. legacy jobs).
 */
function echoCorrelation(reply: import('fastify').FastifyReply, correlationId?: string): void {
  if (!correlationId) return;
  reply.header(CORRELATION_HEADER, correlationId);
}
/** Parse an integer query param, clamping to [min, max]; fall back to `fallback`. */
function clampInt(raw: string | undefined, fallback: number, min: number, max: number): number {
const n = Number.parseInt(raw ?? '', 10);
@ -91,7 +106,13 @@ export async function fleetRoutes(app: FastifyInstance) {
const parsed = SubmitJobSchema.safeParse(req.body);
if (!parsed.success) badRequest(parsed.error.issues);
const pid = parsed.data.productId || (await requireProductAccess(req));
const result = await coordinator.submitJob(pid, parsed.data);
// Mint/propagate the trace-context correlation id from the inbound request
// headers and bind it to the request logger so the whole submit path is
// correlatable (§4). The job persists it; the run/lease/events inherit it.
const correlationId = resolveCorrelationId(req.headers);
req.log = req.log.child({ correlationId });
const result = await coordinator.submitJob(pid, parsed.data, { correlationId });
echoCorrelation(reply, result.job.correlationId ?? correlationId);
reply.code(result.outcome === 'created' ? 201 : 200);
return { outcome: result.outcome, job: result.job };
});
@ -124,7 +145,7 @@ export async function fleetRoutes(app: FastifyInstance) {
});
// ── Fenced state transition ──
app.patch('/fleet/jobs/:id', async req => {
app.patch('/fleet/jobs/:id', async (req, reply) => {
await extractAuth(req);
const { id } = req.params as { id: string };
const pid = await requireProductAccess(req);
@ -138,6 +159,7 @@ export async function fleetRoutes(app: FastifyInstance) {
}
throw new ConflictError('concurrent update conflict — retry');
}
echoCorrelation(reply, res.doc.correlationId);
// §10: opt-in (FLEET_TRACKER_ECHO, default OFF) downstream echo of the new
// stage to the linked tracker Item. Never blocks/fails the transition.
await trackerBridge.maybeEchoOnTransition(pid, id, req.log);
@ -242,7 +264,7 @@ export async function fleetRoutes(app: FastifyInstance) {
return { ...res.doc, gate: res.gate };
});
app.post('/fleet/claim', async req => {
app.post('/fleet/claim', async (req, reply) => {
await extractAuth(req);
const parsed = ClaimSchema.safeParse(req.body);
if (!parsed.success) badRequest(parsed.error.issues);
@ -262,11 +284,18 @@ export async function fleetRoutes(app: FastifyInstance) {
leaseSeconds: parsed.data.leaseSeconds,
});
if (!claim) return { claimed: false };
// Hand the job's trace back to the factory so it can carry it on its
// renew/release calls — closing the loop across the coordinator↔runner
// boundary (§4). Also bind it to the request logger.
if (claim.job.correlationId) {
req.log = req.log.child({ correlationId: claim.job.correlationId });
}
echoCorrelation(reply, claim.job.correlationId);
return { claimed: true, ...claim };
});
// ── Lease renew ──
app.post('/fleet/jobs/:id/lease/renew', async req => {
app.post('/fleet/jobs/:id/lease/renew', async (req, reply) => {
await extractAuth(req);
const { id } = req.params as { id: string };
const pid = await requireProductAccess(req);
@ -283,11 +312,12 @@ export async function fleetRoutes(app: FastifyInstance) {
if (res.reason === 'fenced') throw new ConflictError('stale leaseEpoch — renew fenced');
throw new ConflictError('lease renew conflict — retry');
}
echoCorrelation(reply, res.doc.correlationId);
return res.doc;
});
// ── Lease release ──
app.post('/fleet/jobs/:id/lease/release', async req => {
app.post('/fleet/jobs/:id/lease/release', async (req, reply) => {
await extractAuth(req);
const { id } = req.params as { id: string };
const pid = await requireProductAccess(req);
@ -305,6 +335,7 @@ export async function fleetRoutes(app: FastifyInstance) {
if (res.reason === 'fenced') throw new ConflictError('stale leaseEpoch — release fenced');
throw new ConflictError('lease release conflict — retry');
}
echoCorrelation(reply, res.doc.correlationId);
// §10: release often carries a terminal stage (shipped/failed) — echo it (opt-in).
if (parsed.data.stage) await trackerBridge.maybeEchoOnTransition(pid, id, req.log);
return res.doc;
@ -484,6 +515,43 @@ export async function fleetRoutes(app: FastifyInstance) {
return body;
});
// ── Capacity autoscaling signal (§5) ──
// Per-product advisory scale recommendation. It is computed from the product's
// current fleet metrics (saturation alerts such as no_live_capacity / saturated /
// queue_starvation, plus seat utilization). Pull model: an external scaler GETs
// this and makes the call — the coordinator never starts/stops factories itself.
// Thresholds come from the FLEET_AUTOSCALE_* env vars.
app.get('/fleet/autoscale', async req => {
  await extractAuth(req);
  const productId = await requireProductAccess(req);
  const snapshot = await coordinator.fleetMetrics(productId);
  const thresholds = scalingThresholdsFromEnv();
  return recommendScaling(snapshot, thresholds);
});
// ── Capacity autoscaling signal — GLOBAL (all products) ──
// Single call for a fleet-wide scaler controller. Access requires either a
// bearer that matches the metrics scrape token, or an authenticated caller
// whose role is admin / super_admin.
app.get('/fleet/autoscale/all', async req => {
  const hasScrapeToken = scrapeTokenMatches(req.headers['authorization']);
  if (!hasScrapeToken) {
    const auth = await extractAuth(req);
    const isAdmin = auth.role === 'admin' || auth.role === 'super_admin';
    if (!isAdmin) {
      throw new ForbiddenError('admin role or a metrics scrape token is required');
    }
  }
  const thresholds = scalingThresholdsFromEnv();
  // Compute every product's recommendation concurrently.
  const pending = getAllProducts().map(async product => {
    const snapshot = await coordinator.fleetMetrics(product.productId);
    return recommendScaling(snapshot, thresholds);
  });
  const recommendations = await Promise.all(pending);
  return {
    generatedAt: new Date().toISOString(),
    recommendations,
    // Roll-up of everything that is not a hold, so a controller can act on it directly.
    actionable: recommendations.filter(rec => rec.action !== 'hold'),
  };
});
// ── M0 RU gate: per-product queue version (cheap ~1 RU point read) ──
// A polling factory reads this each tick and only runs the expensive claim when
// `version` changed since its last attempt. See

View File

@ -0,0 +1,83 @@
/**
* Fleet trace-context helpers (§4) — pure resolution-rule unit tests.
*/
import { describe, it, expect } from 'vitest';
import {
CORRELATION_HEADER,
genCorrelationId,
traceIdFromTraceparent,
correlationIdFromHeaders,
resolveCorrelationId,
} from './trace.js';
// A minted id must carry the `ftr_` prefix followed by a 36-character UUID
// body, and two consecutive calls must not collide.
describe('genCorrelationId', () => {
it('mints a prefixed, unique id', () => {
const a = genCorrelationId();
const b = genCorrelationId();
expect(a).toMatch(/^ftr_[0-9a-f-]{36}$/);
expect(a).not.toBe(b);
});
});
// traceparent layout: <version>-<trace-id>-<parent-id>-<flags>; only the
// 32-hex trace-id (second field) is extracted.
describe('traceIdFromTraceparent', () => {
it('extracts the 32-hex trace-id from a valid W3C traceparent', () => {
const tp = '00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01';
expect(traceIdFromTraceparent(tp)).toBe('4bf92f3577b34da6a3ce929d0e0e4736');
});
// An upper-case trace-id is accepted and normalised to lower case.
it('lowercases the trace-id', () => {
const tp = '00-4BF92F3577B34DA6A3CE929D0E0E4736-00f067aa0ba902b7-01';
expect(traceIdFromTraceparent(tp)).toBe('4bf92f3577b34da6a3ce929d0e0e4736');
});
// Covers: missing header, too few fields, non-hex trace-id, all-zero trace-id.
it('rejects malformed / all-zero / missing traceparents', () => {
expect(traceIdFromTraceparent(undefined)).toBeUndefined();
expect(traceIdFromTraceparent('garbage')).toBeUndefined();
expect(traceIdFromTraceparent('00-xyz-00f067aa0ba902b7-01')).toBeUndefined();
expect(
traceIdFromTraceparent('00-00000000000000000000000000000000-00f067aa0ba902b7-01')
).toBeUndefined();
});
});
// Precedence under test: x-correlation-id, then the traceparent trace-id, then
// x-request-id.
describe('correlationIdFromHeaders', () => {
// All three headers present: the explicit correlation header wins.
it('prefers x-correlation-id', () => {
expect(
correlationIdFromHeaders({
[CORRELATION_HEADER]: 'cid-1',
traceparent: '00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01',
'x-request-id': 'req-1',
})
).toBe('cid-1');
});
it('falls back to the traceparent trace-id, then x-request-id', () => {
expect(
correlationIdFromHeaders({
traceparent: '00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01',
'x-request-id': 'req-1',
})
).toBe('4bf92f3577b34da6a3ce929d0e0e4736');
expect(correlationIdFromHeaders({ 'x-request-id': 'req-1' })).toBe('req-1');
});
// A header that arrives as an array contributes only its first value.
it('collapses an array-valued header to its first entry', () => {
expect(correlationIdFromHeaders({ [CORRELATION_HEADER]: ['cid-a', 'cid-b'] })).toBe('cid-a');
});
// A whitespace-only value counts as absent.
it('returns undefined when no correlating header is present', () => {
expect(correlationIdFromHeaders({})).toBeUndefined();
expect(correlationIdFromHeaders({ [CORRELATION_HEADER]: ' ' })).toBeUndefined();
});
});
// resolveCorrelationId never returns undefined: it reuses an inbound id or
// mints a fresh `ftr_`-prefixed one.
describe('resolveCorrelationId', () => {
it('uses the inbound id when present', () => {
expect(resolveCorrelationId({ 'x-request-id': 'req-7' })).toBe('req-7');
});
it('mints a fresh id when the request carries none', () => {
expect(resolveCorrelationId({})).toMatch(/^ftr_/);
});
});

View File

@ -0,0 +1,70 @@
/**
* Fleet trace-context helpers (§4 ops tracing).
*
* A `correlationId` threads a single logical work-unit across the
* coordinator↔runner boundary: it is minted when a job is submitted, persisted
* on the job, then inherited by the run/lease and stamped on every lifecycle
* event (assigned → transition → lease_renewed → lease_released). The
* coordinator echoes it back to the runner on the `x-correlation-id` response
* header so an external factory can carry it on its subsequent renew/release
* calls, and a downstream tracer can stitch job → claim → run → ship together.
*
* PURE + dependency-light so the resolution rules stay unit-testable.
*/
import crypto from 'node:crypto';
/** Response/request header carrying the fleet correlation id. */
export const CORRELATION_HEADER = 'x-correlation-id';
/** Mint a new correlation id: a random UUID behind an `ftr_` prefix, so it cannot be mistaken for a job/run id. */
export function genCorrelationId(): string {
  const uuid = crypto.randomUUID();
  return 'ftr_' + uuid;
}
/** Header bag as Fastify exposes it (`string | string[] | undefined`). */
export type HeaderBag = Record<string, string | string[] | undefined>;

/**
 * Read a header case-insensitively, collapsing an array to its first value.
 *
 * The exact and lower-cased names are tried first (the common case for a bag
 * whose keys are already lower-case); if neither is present the keys are
 * scanned so a bag keyed with any other casing (e.g. `X-Correlation-Id`) still
 * matches.
 *
 * @param headers Header bag to read from.
 * @param name Header name, in any casing.
 * @returns The trimmed value, or `undefined` when the header is missing or blank.
 */
function header(headers: HeaderBag, name: string): string | undefined {
  const wanted = name.toLowerCase();
  let v = headers[name] ?? headers[wanted];
  if (v === undefined) {
    for (const [key, value] of Object.entries(headers)) {
      if (value !== undefined && key.toLowerCase() === wanted) {
        v = value;
        break;
      }
    }
  }
  const s = Array.isArray(v) ? v[0] : v;
  const trimmed = s?.trim();
  return trimmed ? trimmed : undefined;
}
/**
 * Pull the trace-id out of a W3C `traceparent` header value
 * (`<version>-<trace-id>-<parent-id>-<flags>`).
 *
 * @param traceparent - Raw header value; surrounding whitespace is ignored.
 * @returns The trace-id lowercased, or undefined when the value is missing,
 *   has fewer than four dash-separated fields, its second field is not exactly
 *   32 hex digits, or that field is all zeros (the spec's "invalid" value).
 */
export function traceIdFromTraceparent(traceparent: string | undefined): string | undefined {
  if (!traceparent) return undefined;

  const fields = traceparent.trim().split('-');
  if (fields.length < 4) return undefined;

  const candidate = fields[1];
  const isHex32 = /^[0-9a-f]{32}$/i.test(candidate);
  const isAllZero = /^0{32}$/.test(candidate);
  return isHex32 && !isAllZero ? candidate.toLowerCase() : undefined;
}
/**
 * Find the correlation id an inbound request already carries.
 *
 * Sources are consulted in precedence order and the first hit wins:
 *   1. an explicit `x-correlation-id`,
 *   2. the trace-id of a W3C `traceparent`,
 *   3. the propagated `x-request-id`.
 *
 * @returns The id, or undefined when none of the three is usable — the caller
 *   decides whether to mint one.
 */
export function correlationIdFromHeaders(headers: HeaderBag): string | undefined {
  const explicit = header(headers, CORRELATION_HEADER);
  if (explicit) return explicit;

  const traceId = traceIdFromTraceparent(header(headers, 'traceparent'));
  if (traceId) return traceId;

  return header(headers, 'x-request-id');
}
/**
 * Pick the correlation id for a new job: reuse whatever the inbound request
 * carries, otherwise mint a fresh one.
 *
 * @returns A non-empty correlation id, always.
 */
export function resolveCorrelationId(headers: HeaderBag): string {
  const inbound = correlationIdFromHeaders(headers);
  return inbound === undefined ? genCorrelationId() : inbound;
}

View File

@ -0,0 +1,182 @@
/**
 * Fleet correlation-id tracing (§4), end-to-end across the coordinator↔runner
 * boundary: a job minted with a correlation id propagates onto the run, lease,
 * and every lifecycle event, and the routes echo it on the `x-correlation-id`
 * response header so an external factory can carry it forward.
 */
import Fastify, { type FastifyInstance } from 'fastify';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { MemoryDatastoreProvider } from '@bytelyst/datastore';
import { _resetDatastoreProvider, setProvider } from '../../lib/datastore.js';
import * as repo from './repository.js';
import * as coord from './coordinator.js';
import { CORRELATION_HEADER } from './trace.js';
import type { SubmitJobInput } from './types.js';
// Stub auth so every request resolves to a fixed admin principal (`user_1`).
vi.mock('../../lib/auth.js', () => ({
extractAuth: vi.fn(async () => ({ sub: 'user_1', role: 'admin' })),
}));
// Stub request-context so both product lookups always answer 'lysnrai'
// (the same value as PID below).
vi.mock('../../lib/request-context.js', () => ({
getRequestProductId: () => 'lysnrai',
requireProductAccess: async () => 'lysnrai',
}));
// Product id passed to the coordinator calls and used by the factory() helper.
const PID = 'lysnrai';
/**
 * Build a job-submission payload for the tests: a medium-priority `leaf` job
 * with empty capability/engine/scope/dependency lists and idempotency key
 * `trace-1`.
 *
 * @param over - Fields to override on top of those defaults.
 */
function input(over: Partial<SubmitJobInput> = {}): SubmitJobInput {
  const submission: SubmitJobInput = {
    idempotencyKey: 'trace-1',
    bodyMd: '# do the thing',
    priority: 'medium',
    capabilities: [],
    prefersEngine: [],
    allowedScope: [],
    deps: [],
    kind: 'leaf',
    ...over,
  };
  return submission;
}
/**
 * Build the claim descriptor handed to `coord.claimNextJob`: factory `fac_1`
 * on product PID, no capabilities, a 900-second lease.
 *
 * @param over - Fields to override on top of those defaults.
 */
const factory = (over = {}) => {
  const descriptor = {
    productId: PID,
    factoryId: 'fac_1',
    capabilities: [] as string[],
    leaseSeconds: 900,
    ...over,
  };
  return descriptor;
};
/**
 * Create a Fastify instance (logging off) with the fleet routes mounted under
 * `/api`. The routes module is loaded with a dynamic import on each call.
 */
async function buildApp(): Promise<FastifyInstance> {
  const routesModule = await import('./routes.js');
  const server = Fastify({ logger: false });
  await server.register(routesModule.fleetRoutes, { prefix: '/api' });
  return server;
}
// Coordinator-level checks: call `coord.*` directly and assert the correlation
// id on the returned job/run/lease docs and on the events read back via `repo`.
describe('fleet correlation-id tracing (coordinator)', () => {
// Every test starts on a new MemoryDatastoreProvider; afterwards the provider
// is reset and mock call history is cleared.
beforeEach(() => setProvider(new MemoryDatastoreProvider()));
afterEach(() => {
_resetDatastoreProvider();
vi.clearAllMocks();
});
it('persists the correlation id on the job and stamps the submitted event', async () => {
// The id is supplied explicitly through the third (options) argument.
const { job } = await coord.submitJob(PID, input(), { correlationId: 'cid-abc' });
expect(job.correlationId).toBe('cid-abc');
const events = await repo.listEvents(job.id);
const submitted = events.find(e => e.type === 'submitted');
expect(submitted?.correlationId).toBe('cid-abc');
});
it('run + lease inherit the job correlation id at claim, and the assigned event carries it', async () => {
await coord.submitJob(PID, input(), { correlationId: 'cid-claim' });
// The claim itself passes no correlation id; run and lease must pick it up from the job.
const claim = await coord.claimNextJob(factory());
expect(claim).not.toBeNull();
expect(claim!.run.correlationId).toBe('cid-claim');
expect(claim!.lease.correlationId).toBe('cid-claim');
const events = await repo.listEvents(claim!.job.id);
expect(events.find(e => e.type === 'assigned')?.correlationId).toBe('cid-claim');
});
it('threads the correlation id across transition → release lifecycle events', async () => {
await coord.submitJob(PID, input(), { correlationId: 'cid-life' });
const claim = await coord.claimNextJob(factory());
const jobId = claim!.job.id;
// The lease epoch from the claim is passed to each of the three calls below.
const epoch = claim!.lease.leaseEpoch;
// Walk the job through a stage patch, a lease renewal, and a 'shipped' release.
await coord.patchJobFenced(jobId, PID, { leaseEpoch: epoch, stage: 'building' });
await coord.renewLease(jobId, PID, epoch, 900);
await coord.releaseLease(jobId, PID, epoch, 'shipped', { result: 'shipped' });
const events = await repo.listEvents(jobId);
// Every lifecycle event for this job shares the one correlation id.
for (const e of events) expect(e.correlationId).toBe('cid-life');
// arrayContaining: these five types must be present; extra event types are tolerated.
expect(events.map(e => e.type)).toEqual(
expect.arrayContaining([
'submitted',
'assigned',
'transition',
'lease_renewed',
'lease_released',
])
);
});
it('children of a composite job share the parent correlation id', async () => {
// Submit a parent with one inline child; only the parent submission names a correlation id.
const { job } = await coord.submitJob(
PID,
input({
idempotencyKey: 'parent-1',
children: [
{
idempotencyKey: 'child-1',
bodyMd: '# child',
priority: 'medium',
capabilities: [],
prefersEngine: [],
allowedScope: [],
deps: [],
},
],
}),
{ correlationId: 'cid-dag' }
);
expect(job.correlationId).toBe('cid-dag');
// The child is looked up by its own idempotency key and must carry the parent's id.
const children = await repo.findJobsByIdempotencyKey(PID, 'child-1');
expect(children[0]?.correlationId).toBe('cid-dag');
});
});
// Route-level checks: exercise the HTTP surface through `app.inject` and assert
// the correlation id on both the response header and the JSON body.
describe('fleet correlation-id tracing (routes)', () => {
// Every test starts on a new MemoryDatastoreProvider; afterwards the provider
// is reset and mock call history is cleared.
beforeEach(() => setProvider(new MemoryDatastoreProvider()));
afterEach(() => {
_resetDatastoreProvider();
vi.clearAllMocks();
});
it('POST /fleet/jobs honors an inbound x-correlation-id and echoes it back', async () => {
const app = await buildApp();
const res = await app.inject({
method: 'POST',
url: '/api/fleet/jobs',
headers: { [CORRELATION_HEADER]: 'inbound-cid' },
payload: { idempotencyKey: 'r-1', bodyMd: '# task' },
});
expect(res.statusCode).toBe(201);
// Same id must appear on the response header and on the persisted job.
expect(res.headers[CORRELATION_HEADER]).toBe('inbound-cid');
expect(JSON.parse(res.body).job.correlationId).toBe('inbound-cid');
});
it('POST /fleet/jobs mints + echoes a correlation id when the request carries none', async () => {
const app = await buildApp();
// No correlating header is sent, so the server has to generate the id.
const res = await app.inject({
method: 'POST',
url: '/api/fleet/jobs',
payload: { idempotencyKey: 'r-2', bodyMd: '# task' },
});
expect(res.statusCode).toBe(201);
const echoed = res.headers[CORRELATION_HEADER];
// Minted ids carry the `ftr_` prefix; header and body must agree on the value.
expect(echoed).toMatch(/^ftr_/);
expect(JSON.parse(res.body).job.correlationId).toBe(echoed);
});
it('POST /fleet/claim hands the job correlation id back on the response header', async () => {
const app = await buildApp();
// Seed one job with a known correlation id.
await app.inject({
method: 'POST',
url: '/api/fleet/jobs',
headers: { [CORRELATION_HEADER]: 'claim-cid' },
payload: { idempotencyKey: 'r-3', bodyMd: '# task' },
});
// The claim request sends no correlation header; the id must come from the claimed job.
const res = await app.inject({
method: 'POST',
url: '/api/fleet/claim',
payload: { factoryId: 'fac_1', capabilities: [] },
});
expect(res.statusCode).toBe(200);
expect(res.headers[CORRELATION_HEADER]).toBe('claim-cid');
const body = JSON.parse(res.body);
expect(body.claimed).toBe(true);
expect(body.run.correlationId).toBe('claim-cid');
});
});

View File

@ -232,6 +232,14 @@ export const FleetJobDocSchema = z.object({
* absent on jobs that were never echoed / have no trackerItemId.
*/
trackerEchoedStatus: z.string().optional(),
/**
 * Trace-context correlation id (§4) — minted at submission from the inbound
 * `x-correlation-id`/`traceparent`/`x-request-id` (or freshly generated). The
 * run/lease inherit it and every lifecycle event is stamped with it, so a
 * single logical work-unit can be stitched across the coordinator↔runner
 * boundary (job → claim → run → ship). Optional — absent on legacy jobs.
 */
correlationId: z.string().optional(),
createdAt: z.string(),
updatedAt: z.string(),
});
@ -256,6 +264,8 @@ export const FleetRunDocSchema = z.object({
prUrl: z.string().optional(),
branch: z.string().optional(),
prState: z.enum(['open', 'merged']).optional(),
/** Trace-context correlation id inherited from the job at claim time (§4). */
correlationId: z.string().optional(),
});
export type FleetRunDoc = z.infer<typeof FleetRunDocSchema>;
@ -270,6 +280,8 @@ export const FleetLeaseDocSchema = z.object({
renewals: z.number().int().nonnegative().default(0),
status: z.enum(LEASE_STATUS).default('held'),
rev: z.number().int().nonnegative().default(0),
/** Trace-context correlation id inherited from the job at claim time (§4). */
correlationId: z.string().optional(),
updatedAt: z.string(),
});
export type FleetLeaseDoc = z.infer<typeof FleetLeaseDocSchema>;
@ -338,6 +350,9 @@ export const FleetEventDocSchema = z.object({
type: z.string().min(1),
at: z.string(),
actor: z.string().optional(),
/** Trace-context correlation id of the owning job (§4) — stamped on every
 * lifecycle event so the stream is correlatable without joining to the job. */
correlationId: z.string().optional(),
data: z.record(z.string(), z.unknown()).default({}),
});
export type FleetEventDoc = z.infer<typeof FleetEventDocSchema>;