feat(fleet): cost/latency-aware engine routing + per-engine circuit breaker

Adds two additive, flag-gated routing refinements on top of the §7 scoring
core; both default OFF so the deterministic claim path is unchanged.

- FLEET_COST_ROUTING: soft engineQuality term (weight 0.4) biases routing
  toward the historically cheaper/faster engine, derived from per-engine
  insights.costUsd + run duration. No-history engines stay neutral, so the
  nudge can only demote demonstrably costly engines, never penalise new ones.
- FLEET_ENGINE_BREAKER: per-(factory, engine) circuit breaker. releaseLease
  always records outcomes (observable via /fleet/metrics engineBreakers);
  when enabled, an OPEN pair is routed around. Only ever restricts the
  candidate set — never forces a route.

The scheduler stays pure: history lookup + availability gate are injected
predicates. New engineQuality term contributes 0 unless a lookup is supplied,
preserving every existing score/breakdown.

Generated with [Devin](https://cli.devin.ai/docs)

Co-Authored-By: Devin <158243242+devin-ai-integration[bot]@users.noreply.github.com>
This commit is contained in:
saravanakumardb1 2026-06-01 13:17:25 -07:00
parent 5413c0e789
commit bdbb0a8ce4
10 changed files with 808 additions and 2 deletions

View File

@ -59,6 +59,30 @@ from WIP), and marks the lease `expired`. Idempotent — a reaped lease is no lo
the lease doc; it cannot requeue the job, bump the epoch, or keep the checkpoint),
so the reaper — not TTL — owns recovery.
## Cost/quality-aware routing (§2)
Two **additive, flag-gated** routing refinements layer on top of the §7 scoring
core. Both default OFF, so the deterministic claim path is byte-for-byte unchanged
unless explicitly enabled.
- **Soft engine preference (`FLEET_COST_ROUTING`).** `engine-stats.ts`
(`computeEngineQuality`) aggregates each run's `insights.costUsd` + duration
(`endedAt - startedAt`) into a per-engine `[0,1]` quality, anchored so the
cheapest/fastest engine scores 1 and pricier/slower engines decay proportionally.
The scheduler's `engineQuality` term (weight `0.4`, below affinity) gently biases
routing toward the cheaper/faster engine among otherwise-comparable candidates.
Engines with no history are **neutral (1)** — history can only demote a
demonstrably costly engine, never penalise a new one. The term contributes `0`
unless the lookup is injected, so default scoring is untouched.
- **Per-engine circuit breaker (`FLEET_ENGINE_BREAKER`).** `engine-breaker.ts`
tracks failures per `(factoryId, engine)` (CLOSED → OPEN after N consecutive
failures → HALF_OPEN probe after a reset window). `releaseLease` always **records**
the outcome (so breaker state is observable in `/fleet/metrics → engineBreakers`
even before enforcement); when the flag is on, an OPEN pair is removed from that
factory's candidate set so a repeatedly-failing engine (e.g. `codex` erroring on a
box) is routed _around_. The breaker only ever **restricts** — it never forces a
route. Jobs without a concrete `engine` resolve on the runner and are never gated.
## Submit semantics (idempotency + deps)
- same `idempotencyKey` + identical `bodyMd` → returns the existing job (dedup).

View File

@ -1402,3 +1402,143 @@ describe('fleet coordinator — draft lifecycle (save / edit / submit)', () => {
if (!res.ok) expect(res.reason).toBe('not_found');
});
});
// ── §2: per-engine circuit breaker + cost/latency-aware routing ───────────────
import { engineBreaker, getEngineBreakerSnapshot } from './engine-breaker.js';
/** Resolve after roughly `ms` milliseconds (used to space out job submissions). */
const sleep = (ms: number) =>
  new Promise(resolve => {
    setTimeout(resolve, ms);
  });
describe('fleet coordinator §2 — engine circuit breaker', () => {
  /** Record three consecutive codex failures on fac_1 — enough to trip the default threshold (3). */
  function openCodexOnFac1(): void {
    for (let n = 0; n < 3; n += 1) {
      engineBreaker.recordFailure('fac_1', 'codex');
    }
  }

  // Every test starts from a fresh in-memory datastore and an empty breaker registry.
  beforeEach(() => {
    setProvider(new MemoryDatastoreProvider());
    engineBreaker.clear();
  });

  // Undo whatever a test touched, including the enforcement flag.
  afterEach(() => {
    _resetDatastoreProvider();
    engineBreaker.clear();
    delete process.env.FLEET_ENGINE_BREAKER;
  });

  it('records a failed attempt against the (factory, engine) pair on release', async () => {
    const submitted = await coord.submitJob(PID, input({ engine: 'codex' }));
    const claimed = await coord.claimNextJob(factory({ capabilities: ['engine:codex'] }));

    await coord.releaseLease(submitted.job.id, PID, claimed!.job.leaseEpoch, 'failed', {
      result: 'failed',
      insights: { engine: 'codex' },
    });

    const entries = getEngineBreakerSnapshot();
    expect(entries).toContainEqual(
      expect.objectContaining({ factoryId: 'fac_1', engine: 'codex', failureCount: 1 })
    );
  });

  it('a shipped release records success and resets the pair', async () => {
    // One prior failure so there is a tracked cell for the success to reset.
    engineBreaker.recordFailure('fac_1', 'codex');
    const submitted = await coord.submitJob(PID, input({ engine: 'codex' }));
    const claimed = await coord.claimNextJob(factory({ capabilities: ['engine:codex'] }));

    await coord.releaseLease(submitted.job.id, PID, claimed!.job.leaseEpoch, 'shipped', {
      result: 'shipped',
      insights: { engine: 'codex' },
    });

    expect(getEngineBreakerSnapshot()).toContainEqual(
      expect.objectContaining({ factoryId: 'fac_1', engine: 'codex', failureCount: 0 })
    );
  });

  it('when enabled, an OPEN (factory, engine) breaker is NOT routed that engine', async () => {
    process.env.FLEET_ENGINE_BREAKER = '1';
    openCodexOnFac1(); // → OPEN (default threshold 3)

    await coord.submitJob(PID, input({ engine: 'codex' }));
    const claimed = await coord.claimNextJob(factory({ capabilities: ['engine:codex'] }));

    expect(claimed).toBeNull();
  });

  it('the breaker routes AROUND a broken engine to a healthy candidate', async () => {
    process.env.FLEET_ENGINE_BREAKER = '1';
    openCodexOnFac1(); // codex OPEN on fac_1

    await coord.submitJob(PID, input({ idempotencyKey: 'c', engine: 'codex' }));
    await coord.submitJob(PID, input({ idempotencyKey: 'd', engine: 'devin' }));
    const claimed = await coord.claimNextJob(
      factory({ capabilities: ['engine:codex', 'engine:devin'] })
    );

    expect(claimed?.job.engine).toBe('devin');
  });

  it('with the flag OFF, an OPEN breaker does NOT restrict routing (default behavior)', async () => {
    delete process.env.FLEET_ENGINE_BREAKER;
    openCodexOnFac1();

    await coord.submitJob(PID, input({ engine: 'codex' }));
    const claimed = await coord.claimNextJob(factory({ capabilities: ['engine:codex'] }));

    expect(claimed?.job.engine).toBe('codex');
  });
});
describe('fleet coordinator §2 — cost/latency-aware routing', () => {
  beforeEach(() => {
    setProvider(new MemoryDatastoreProvider());
    engineBreaker.clear();
  });

  afterEach(() => {
    _resetDatastoreProvider();
    delete process.env.FLEET_COST_ROUTING;
  });

  /** Seed two completed runs so devin reads as cheap+fast and codex as pricey+slow. */
  async function seedHistory(): Promise<void> {
    const t0 = Date.parse('2026-06-01T00:00:00.000Z');
    const seeds = [
      { engine: 'devin', durationMs: 1000, costUsd: 1 },
      { engine: 'codex', durationMs: 8000, costUsd: 8 },
    ] as const;

    // Sequential on purpose: devin's run is created before codex's.
    for (const { engine, durationMs, costUsd } of seeds) {
      await repo.createRun({
        id: `seed-${engine}`,
        productId: PID,
        jobId: `seed-${engine}-job`,
        attempt: 1,
        factoryId: 'fac_1',
        engine,
        startedAt: new Date(t0).toISOString(),
        endedAt: new Date(t0 + durationMs).toISOString(),
        insights: { engine, costUsd },
      });
    }
  }

  /**
   * Seed history, then queue a codex job followed (a few ms later) by a devin
   * job, and return a factory able to run either engine.
   */
  async function queueCodexThenDevin() {
    await seedHistory();
    await coord.submitJob(PID, input({ idempotencyKey: 'codex-job', engine: 'codex' }));
    await sleep(3);
    await coord.submitJob(PID, input({ idempotencyKey: 'devin-job', engine: 'devin' }));
    return factory({ capabilities: ['engine:codex', 'engine:devin'] });
  }

  it('prefers the historically cheaper/faster engine over the older candidate', async () => {
    // codex is submitted FIRST (older) — without cost routing the age tie-break picks it.
    const both = await queueCodexThenDevin();

    process.env.FLEET_COST_ROUTING = '1';
    const claimed = await coord.claimNextJob(both);

    expect(claimed?.job.engine).toBe('devin'); // cheaper engine wins the nudge
  });

  it('with the flag OFF, the age tie-break stands (older codex wins) — default behavior', async () => {
    const both = await queueCodexThenDevin();

    delete process.env.FLEET_COST_ROUTING;
    const claimed = await coord.claimNextJob(both);

    expect(claimed?.job.engine).toBe('codex'); // older job wins without the nudge
  });
});

View File

@ -37,7 +37,10 @@ import {
type SchedulerWeights,
type FleetWeightRegistry,
resolveWeights,
resolveJobEngine,
} from './scheduler.js';
import { engineBreaker, isEngineBreakerEnabled } from './engine-breaker.js';
import { computeEngineQuality, isCostRoutingEnabled } from './engine-stats.js';
import {
ACTIVE_STAGES,
FLEET_STAGES,
@ -559,6 +562,19 @@ export async function claimNextJob(ctx: ClaimContext): Promise<ClaimResult | nul
const weights = resolveWeights(weightRegistry, ctx.productId, ctx.weights);
const seatLimit = ctx.seatLimit ?? 1;
// §2 cost/latency-aware routing: derive a per-engine quality lookup from run
// history once (flag-gated; off ⇒ the soft engineQuality term stays inert).
let engineQuality: ((engine: string | undefined) => number) | undefined;
if (isCostRoutingEnabled()) {
const runs = await repo.listRunsByProduct(ctx.productId);
engineQuality = computeEngineQuality(runs).lookup;
}
// §2 per-engine circuit breaker: stop routing an engine that keeps failing on
// THIS factory (flag-gated; only RESTRICTS the candidate set, never forces one).
const isEngineAvailable = isEngineBreakerEnabled()
? (engine: string): boolean => engineBreaker.allow(ctx.factoryId, engine)
: undefined;
for (let i = 0; i < CLAIM_MAX_RETRIES; i++) {
// Phase 3: budget enforcement (FLEET_BUDGETS flag)
if (isBudgetsEnabled()) {
@ -595,6 +611,8 @@ export async function claimNextJob(ctx: ClaimContext): Promise<ClaimResult | nul
factoryEngines: ctx.factoryEngines,
warmScopes: ctx.warmScopes,
costCeilingUsd: ctx.costCeilingUsd,
engineQuality,
isEngineAvailable,
};
pick = selectJob(candidates, factory, schedulerCtx, weights);
}
@ -1201,6 +1219,20 @@ export async function releaseLease(
endedAt: new Date().toISOString(),
});
}
// §2 per-engine circuit breaker bookkeeping: attribute this attempt's outcome to
// the (factory, engine) pair so a repeatedly-failing engine on a box can be routed
// around. We RECORD regardless of the enforcement flag (so breaker state is
// observable in metrics even before enforcement is enabled); only the claim-time
// restriction is gated. The concrete engine the factory ran wins, else the job's.
const breakerFactory = lease.holderFactoryId;
const breakerEngine = report?.insights?.engine ?? resolveJobEngine(job);
if (breakerFactory && breakerEngine && breakerEngine !== 'unknown') {
if (isFailure) {
engineBreaker.recordFailure(breakerFactory, breakerEngine);
} else if (stage === 'review' || stage === 'testing' || stage === 'shipped') {
engineBreaker.recordSuccess(breakerFactory, breakerEngine);
}
}
await repo.appendEvent({ jobId, productId, type: 'lease_released', data: { leaseEpoch, stage } });
return { ok: true, doc: res.doc };
}

View File

@ -0,0 +1,113 @@
/**
* Per-engine circuit breaker (§2) — pure, deterministic unit tests.
* Time is injected so no real clock is involved.
*/
import { describe, expect, it } from 'vitest';
import { EngineCircuitBreaker, isEngineBreakerEnabled } from './engine-breaker.js';
/** Fixed reference instant injected as "now" so no real clock is involved. */
const T0 = Date.parse('2026-06-01T00:00:00.000Z');

describe('engine-breaker §2 — state machine', () => {
  /** Record `count` failures for (f1, codex), all stamped at T0. */
  const failCodexOnF1 = (breaker: EngineCircuitBreaker, count: number): void => {
    for (let n = 0; n < count; n += 1) {
      breaker.recordFailure('f1', 'codex', T0);
    }
  };

  it('a fresh (factory, engine) pair is CLOSED ⇒ routable', () => {
    const breaker = new EngineCircuitBreaker();

    expect(breaker.allow('f1', 'codex', T0)).toBe(true);
    expect(breaker.isOpen('f1', 'codex', T0)).toBe(false);
  });

  it('opens after the failure threshold of consecutive failures', () => {
    const breaker = new EngineCircuitBreaker({ failureThreshold: 3, resetTimeoutMs: 60_000 });

    failCodexOnF1(breaker, 2);
    expect(breaker.allow('f1', 'codex', T0)).toBe(true); // 2 < 3 — still closed

    failCodexOnF1(breaker, 1);
    expect(breaker.allow('f1', 'codex', T0)).toBe(false); // 3rd failure trips it OPEN
  });

  it('a success before the threshold resets the failure count', () => {
    const breaker = new EngineCircuitBreaker({ failureThreshold: 3 });

    failCodexOnF1(breaker, 2);
    breaker.recordSuccess('f1', 'codex');
    failCodexOnF1(breaker, 2);

    expect(breaker.allow('f1', 'codex', T0)).toBe(true); // count restarted ⇒ 2 < 3
  });

  it('isolates per factory AND per engine', () => {
    const breaker = new EngineCircuitBreaker({ failureThreshold: 1 });

    failCodexOnF1(breaker, 1);

    expect(breaker.allow('f1', 'codex', T0)).toBe(false); // tripped
    expect(breaker.allow('f2', 'codex', T0)).toBe(true); // other factory unaffected
    expect(breaker.allow('f1', 'devin', T0)).toBe(true); // other engine unaffected
  });
});
describe('engine-breaker §2 — half-open recovery', () => {
  /** Reset window shared by every breaker in this suite. */
  const WINDOW_MS = 60_000;

  it('after the reset window it allows exactly one probe (HALF_OPEN)', () => {
    const breaker = new EngineCircuitBreaker({ failureThreshold: 1, resetTimeoutMs: WINDOW_MS });
    breaker.recordFailure('f1', 'codex', T0);

    const insideWindow = breaker.allow('f1', 'codex', T0 + 30_000);
    const atWindowEnd = breaker.allow('f1', 'codex', T0 + WINDOW_MS);

    expect(insideWindow).toBe(false); // still inside the window
    expect(atWindowEnd).toBe(true); // window elapsed ⇒ probe
  });

  it('a successful probe closes the breaker', () => {
    const breaker = new EngineCircuitBreaker({ failureThreshold: 1, resetTimeoutMs: WINDOW_MS });
    breaker.recordFailure('f1', 'codex', T0);

    breaker.allow('f1', 'codex', T0 + WINDOW_MS); // → HALF_OPEN
    breaker.recordSuccess('f1', 'codex');

    const afterProbe = T0 + 61_000;
    expect(breaker.allow('f1', 'codex', afterProbe)).toBe(true);
    expect(breaker.isOpen('f1', 'codex', afterProbe)).toBe(false);
  });

  it('a failed probe immediately re-opens the breaker', () => {
    const breaker = new EngineCircuitBreaker({ failureThreshold: 3, resetTimeoutMs: WINDOW_MS });
    for (let n = 0; n < 3; n += 1) {
      breaker.recordFailure('f1', 'codex', T0); // third one ⇒ OPEN
    }

    breaker.allow('f1', 'codex', T0 + WINDOW_MS); // → HALF_OPEN (probe allowed)
    const probeFailedAt = T0 + 61_000;
    breaker.recordFailure('f1', 'codex', probeFailedAt); // probe fails

    expect(breaker.allow('f1', 'codex', probeFailedAt)).toBe(false); // re-opened, fresh window
  });
});
describe('engine-breaker §2 — snapshot + clear', () => {
  it('snapshot reports tracked pairs and reflects the reset window', () => {
    const breaker = new EngineCircuitBreaker({ failureThreshold: 1, resetTimeoutMs: 60_000 });
    breaker.recordFailure('f1', 'codex', T0);

    // Inside the window the pair is reported as OPEN with the failure timestamp.
    const whileOpen = breaker.snapshot(T0 + 1_000);
    expect(whileOpen).toHaveLength(1);
    expect(whileOpen[0]).toMatchObject({ factoryId: 'f1', engine: 'codex', state: 'OPEN' });
    expect(whileOpen[0].lastFailureAt).toBe(new Date(T0).toISOString());

    // once the window elapses the snapshot shows HALF_OPEN (a probe is available)
    const afterWindow = breaker.snapshot(T0 + 60_000);
    expect(afterWindow[0].state).toBe('HALF_OPEN');
  });

  it('clear drops all state', () => {
    const breaker = new EngineCircuitBreaker({ failureThreshold: 1 });
    breaker.recordFailure('f1', 'codex', T0);

    breaker.clear();

    expect(breaker.snapshot(T0)).toEqual([]);
    expect(breaker.allow('f1', 'codex', T0)).toBe(true);
  });
});
describe('engine-breaker §2 — feature flag', () => {
  it('isEngineBreakerEnabled defaults OFF and honors truthy env values', () => {
    // Remember the ambient value so the test leaves the environment untouched.
    const saved = process.env.FLEET_ENGINE_BREAKER;
    const expectations: Array<[value: string, enabled: boolean]> = [
      ['1', true],
      ['true', true],
      ['off', false],
    ];

    try {
      delete process.env.FLEET_ENGINE_BREAKER;
      expect(isEngineBreakerEnabled()).toBe(false);

      for (const [value, enabled] of expectations) {
        process.env.FLEET_ENGINE_BREAKER = value;
        expect(isEngineBreakerEnabled()).toBe(enabled);
      }
    } finally {
      if (saved === undefined) {
        delete process.env.FLEET_ENGINE_BREAKER;
      } else {
        process.env.FLEET_ENGINE_BREAKER = saved;
      }
    }
  });
});

View File

@ -0,0 +1,158 @@
/**
* Per-engine circuit breaker for fleet routing (§2 cost/quality routing).
*
* Tracks failures per `(factoryId, engine)` pair. When a concrete engine
* repeatedly fails on a given factory (e.g. `codex` erroring on a box that lacks
* a working install), the breaker OPENS for that pair and the scheduler stops
* routing that engine THERE — without ever forcing a job onto a worse factory.
* The breaker only ever RESTRICTS the candidate set; it never overrides the
* deterministic §7 scoring or forces a route.
*
* States mirror the sidecar breaker (extraction-service): CLOSED (normal) → OPEN
* (skip-route) → HALF_OPEN (allow one probe after the reset window). A single
* probe success closes the breaker; a probe failure re-opens it.
*
* This module is PURE w.r.t. I/O — state is in-memory and process-wide (one
* coordinator process owns the claim path, matching the reaper-stats pattern).
* Time is injectable so the logic is fully unit-testable without a real clock.
*/
/** Lifecycle state of one `(factoryId, engine)` breaker cell. */
type BreakerState = 'CLOSED' | 'OPEN' | 'HALF_OPEN';

/** Tuning knobs for {@link EngineCircuitBreaker}; omitted fields use the module defaults. */
export interface EngineBreakerOptions {
  /** Consecutive failures on a (factory, engine) before the breaker OPENS. */
  failureThreshold?: number;
  /** How long a breaker stays OPEN before routing is allowed again as a HALF_OPEN probe. */
  resetTimeoutMs?: number;
}

/** One (factory, engine) entry — its current state plus a snapshot for operators. */
export interface EngineBreakerEntry {
  factoryId: string;
  engine: string;
  state: BreakerState;
  failureCount: number;
  /** ISO timestamp of the most recent failure, or `null` when none is recorded. */
  lastFailureAt: string | null;
}

/** Internal mutable record kept per `(factoryId, engine)` key. */
interface Cell {
  state: BreakerState;
  /** Failures recorded since the last success. */
  failureCount: number;
  /** Epoch milliseconds of the most recent failure; `0` means "none recorded". */
  lastFailureMs: number;
}

const DEFAULT_FAILURE_THRESHOLD = 3;
const DEFAULT_RESET_TIMEOUT_MS = 5 * 60_000; // 5 minutes

/** Compose the map key for a pair; NUL separates the parts so `snapshot` can split them again. */
function key(factoryId: string, engine: string): string {
  return `${factoryId}\u0000${engine}`;
}

/**
 * A registry of per-`(factoryId, engine)` circuit breakers. Construct directly in
 * tests (with injected time); the coordinator uses the shared module singleton.
 */
export class EngineCircuitBreaker {
  private readonly cells = new Map<string, Cell>();
  private readonly failureThreshold: number;
  private readonly resetTimeoutMs: number;

  /**
   * @param opts Optional threshold / reset-window overrides; missing fields fall
   *   back to 3 failures and a 5-minute window.
   */
  constructor(opts: EngineBreakerOptions = {}) {
    this.failureThreshold = opts.failureThreshold ?? DEFAULT_FAILURE_THRESHOLD;
    this.resetTimeoutMs = opts.resetTimeoutMs ?? DEFAULT_RESET_TIMEOUT_MS;
  }

  /** True once `resetTimeoutMs` has passed since the cell's most recent failure. */
  private resetWindowElapsed(cell: Cell, now: number): boolean {
    return now - cell.lastFailureMs >= this.resetTimeoutMs;
  }

  /**
   * Whether a job pinned to `engine` may be routed to `factoryId` right now.
   *
   * CLOSED (or untracked) ⇒ yes. OPEN ⇒ no until the reset window elapses; the
   * first call after that moves the cell to HALF_OPEN and returns true. While
   * HALF_OPEN every call returns true until an outcome is recorded via
   * {@link recordSuccess} or {@link recordFailure}. May mutate state
   * (OPEN → HALF_OPEN), mirroring the sidecar breaker's `allowRequest`.
   *
   * @param now Epoch milliseconds to evaluate against (injectable for tests).
   */
  allow(factoryId: string, engine: string, now: number = Date.now()): boolean {
    const cell = this.cells.get(key(factoryId, engine));
    if (!cell || cell.state === 'CLOSED') return true;
    if (cell.state === 'OPEN') {
      if (!this.resetWindowElapsed(cell, now)) return false;
      cell.state = 'HALF_OPEN'; // window elapsed ⇒ let a probe through
    }
    return true; // HALF_OPEN — a probe is in flight; allow it through
  }

  /**
   * Whether routing `engine` to `factoryId` is blocked at `now`. Returns the
   * same answer as `!allow(...)` but is a pure query: unlike {@link allow} it
   * never performs the OPEN → HALF_OPEN transition, so inspecting the breaker
   * cannot change what routing does next.
   */
  isOpen(factoryId: string, engine: string, now: number = Date.now()): boolean {
    const cell = this.cells.get(key(factoryId, engine));
    if (!cell || cell.state !== 'OPEN') return false;
    return !this.resetWindowElapsed(cell, now);
  }

  /** Record a failed run of `engine` on `factoryId`; may trip the breaker OPEN. */
  recordFailure(factoryId: string, engine: string, now: number = Date.now()): void {
    const k = key(factoryId, engine);
    const cell = this.cells.get(k) ?? { state: 'CLOSED', failureCount: 0, lastFailureMs: 0 };
    cell.failureCount += 1;
    cell.lastFailureMs = now; // restarts the reset window
    // A failed probe (HALF_OPEN) re-opens immediately; otherwise open on threshold.
    if (cell.state === 'HALF_OPEN' || cell.failureCount >= this.failureThreshold) {
      cell.state = 'OPEN';
    }
    this.cells.set(k, cell);
  }

  /** Record a successful run of `engine` on `factoryId` — closes/resets the pair. */
  recordSuccess(factoryId: string, engine: string): void {
    const cell = this.cells.get(key(factoryId, engine));
    if (!cell) return; // never failed ⇒ stays implicitly CLOSED
    // The cell is already stored in the map, so mutating it in place is enough.
    cell.state = 'CLOSED';
    cell.failureCount = 0;
  }

  /**
   * Operator snapshot of every tracked pair, with the OPEN → HALF_OPEN reset
   * applied as of `now` so the view reflects what routing would actually do.
   * Read-only: entries are fresh objects, so callers cannot mutate internal state.
   */
  snapshot(now: number = Date.now()): EngineBreakerEntry[] {
    const out: EngineBreakerEntry[] = [];
    for (const [k, cell] of this.cells) {
      const sep = k.indexOf('\u0000');
      // An OPEN cell whose window has elapsed would be probed on the next allow().
      const probeReady = cell.state === 'OPEN' && this.resetWindowElapsed(cell, now);
      out.push({
        factoryId: k.slice(0, sep),
        engine: k.slice(sep + 1),
        state: probeReady ? 'HALF_OPEN' : cell.state,
        failureCount: cell.failureCount,
        lastFailureAt: cell.lastFailureMs ? new Date(cell.lastFailureMs).toISOString() : null,
      });
    }
    return out;
  }

  /** Drop all state (tests / operator reset). */
  clear(): void {
    this.cells.clear();
  }
}
/** The single in-process breaker instance used by both the claim path and metrics. */
export const engineBreaker = new EngineCircuitBreaker();

/**
 * Operator/dashboard view of the shared breaker.
 *
 * @param now Epoch milliseconds the snapshot is evaluated against.
 * @returns One entry per tracked `(factoryId, engine)` pair.
 */
export function getEngineBreakerSnapshot(now: number = Date.now()): EngineBreakerEntry[] {
  const entries = engineBreaker.snapshot(now);
  return entries;
}
/**
 * `FLEET_ENGINE_BREAKER` env gate — default OFF.
 *
 * Reads the variable on every call, trims it and lower-cases it; only `1`,
 * `true`, `on` and `yes` count as enabled. Anything else (including unset)
 * leaves enforcement off.
 *
 * @returns `true` when the environment variable holds one of the accepted truthy values.
 */
export function isEngineBreakerEnabled(): boolean {
  const raw = process.env.FLEET_ENGINE_BREAKER ?? '';
  switch (raw.trim().toLowerCase()) {
    case '1':
    case 'true':
    case 'on':
    case 'yes':
      return true;
    default:
      return false;
  }
}

View File

@ -0,0 +1,102 @@
/**
* Per-engine quality from run history (§2) — pure unit tests.
*/
import { describe, expect, it } from 'vitest';
import { computeEngineQuality, isCostRoutingEnabled } from './engine-stats.js';
import type { FleetRunDoc } from './types.js';
/** Monotonic counter that gives every fabricated run a distinct id / jobId. */
let seq = 0;

/**
 * Build a run document for tests: sensible defaults first, then `over` applied
 * on top so any field (including `id`) can be overridden.
 */
function run(over: Partial<FleetRunDoc> & { engine: string }): FleetRunDoc {
  seq += 1;
  const defaults = {
    id: `r${seq}`,
    productId: 'lysnrai',
    jobId: `j${seq}`,
    attempt: 1,
    startedAt: '2026-06-01T00:00:00.000Z',
    insights: {},
  };
  return { ...defaults, ...over } as FleetRunDoc;
}
/**
 * Fabricate a finished run on `engine` that cost `costUsd` and lasted
 * `durationMs`, measured from a fixed start instant.
 */
function costed(engine: string, costUsd: number, durationMs: number): FleetRunDoc {
  const startedAt = '2026-06-01T00:00:00.000Z';
  const endedAtMs = Date.parse(startedAt) + durationMs;
  return run({
    engine,
    startedAt,
    endedAt: new Date(endedAtMs).toISOString(),
    insights: { costUsd },
  });
}
describe('computeEngineQuality §2', () => {
  it('cheapest + fastest engine anchors quality at 1; pricier/slower decays', () => {
    const history = [costed('devin', 1, 1000), costed('codex', 4, 4000)];
    const quality = computeEngineQuality(history);

    expect(quality.lookup('devin')).toBeCloseTo(1);
    // codex: costScore 1/4, durScore 1000/4000 ⇒ (0.25 + 0.25)/2 = 0.25
    expect(quality.lookup('codex')).toBeCloseTo(0.25);

    const codex = quality.byEngine.get('codex');
    expect(codex?.avgCostUsd).toBe(4);
    expect(codex?.samples).toBe(1);
  });

  it('averages multiple runs per engine', () => {
    const quality = computeEngineQuality([costed('devin', 2, 1000), costed('devin', 4, 3000)]);
    const devin = quality.byEngine.get('devin');

    expect(devin?.avgCostUsd).toBe(3);
    expect(devin?.avgDurationMs).toBe(2000);
  });

  it('unknown / no-history engines are neutral (1) — never penalised', () => {
    const quality = computeEngineQuality([costed('devin', 5, 5000)]);

    expect(quality.lookup('claude')).toBe(1);
    expect(quality.lookup(undefined)).toBe(1);
  });

  it('a single engine with data scores 1 (term is inert with no peer to compare)', () => {
    const quality = computeEngineQuality([costed('devin', 9, 9000)]);

    expect(quality.lookup('devin')).toBeCloseTo(1);
  });

  it('uses whichever signal is present (cost-only / duration-only)', () => {
    const costOnlyRuns = [
      run({ engine: 'devin', insights: { costUsd: 1 } }), // cost only, no endedAt
      run({ engine: 'codex', insights: { costUsd: 5 } }),
    ];
    const quality = computeEngineQuality(costOnlyRuns);

    expect(quality.lookup('devin')).toBeCloseTo(1);
    expect(quality.lookup('codex')).toBeCloseTo(0.2);
  });

  it('ignores runs with no engine, "unknown" engine, or no usable signal', () => {
    const quality = computeEngineQuality([
      run({ engine: 'unknown', insights: { costUsd: 3 } }),
      run({ engine: 'devin', insights: {} }), // no cost, no end
      costed('codex', 2, 2000),
    ]);
    const tracked = quality.byEngine;

    expect(tracked.has('unknown')).toBe(false);
    expect(tracked.has('devin')).toBe(false);
    expect(tracked.has('codex')).toBe(true);
  });

  it('empty history ⇒ everything neutral', () => {
    const quality = computeEngineQuality([]);

    expect(quality.byEngine.size).toBe(0);
    expect(quality.lookup('devin')).toBe(1);
  });
});
describe('isCostRoutingEnabled §2', () => {
  it('defaults OFF and honors truthy env values', () => {
    // Keep the ambient value so the environment is restored afterwards.
    const saved = process.env.FLEET_COST_ROUTING;
    const expectations: Array<[value: string, enabled: boolean]> = [
      ['on', true],
      ['0', false],
    ];

    try {
      delete process.env.FLEET_COST_ROUTING;
      expect(isCostRoutingEnabled()).toBe(false);

      for (const [value, enabled] of expectations) {
        process.env.FLEET_COST_ROUTING = value;
        expect(isCostRoutingEnabled()).toBe(enabled);
      }
    } finally {
      if (saved === undefined) {
        delete process.env.FLEET_COST_ROUTING;
      } else {
        process.env.FLEET_COST_ROUTING = saved;
      }
    }
  });
});

View File

@ -0,0 +1,126 @@
/**
* Per-engine quality from run history (§2 — cost/latency-aware routing).
*
* PURE + synchronous: given a set of completed runs, derive a `(engine) => [0,1]`
* quality lookup the scheduler's soft `engineQuality` term consumes. 1 = the
* historically cheapest + fastest engine; lower = more expensive / slower.
*
* Design choices that keep this a safe, gentle nudge:
* - Scores are RELATIVE: the cheapest observed engine anchors costScore = 1, the
*   fastest anchors durationScore = 1; others decay proportionally. With a single
*   engine (or no data) every engine scores 1 ⇒ the term is inert.
* - Engines with NO history return the neutral 1 — history can only DEMOTE a
*   demonstrably costly/slow engine, never penalise a new/unknown one.
* - Cost and duration are averaged independently and combined 50/50; an engine
*   with only one signal uses whichever it has.
*/
import type { FleetRunDoc } from './types.js';
/** Running totals for one engine while scanning the run history. */
interface Accum {
  costSum: number;
  costN: number;
  durSum: number;
  durN: number;
  /** Runs that contributed at least one signal (cost, duration, or both). */
  runs: number;
}

/** Per-engine averages plus the final blended quality in [0,1]. */
export interface EngineQualityEntry {
  engine: string;
  avgCostUsd: number | null;
  avgDurationMs: number | null;
  /** Runs that contributed at least one signal (cost or duration). */
  samples: number;
  quality: number;
}

export interface EngineQuality {
  byEngine: Map<string, EngineQualityEntry>;
  /** Lookup for the scheduler: unknown / no-history engines ⇒ neutral 1. */
  lookup: (engine: string | undefined) => number;
}

/** The engine a run actually used: the factory-reported concrete engine wins,
 * else the engine the run was created with. */
function runEngine(run: FleetRunDoc): string | undefined {
  return run.insights?.engine ?? run.engine;
}

/**
 * Wall-clock duration of a run in milliseconds, or `null` when the run has no
 * `endedAt`, either timestamp fails to parse, or the end precedes the start.
 */
function runDurationMs(run: FleetRunDoc): number | null {
  if (!run.endedAt) return null;
  const start = Date.parse(run.startedAt);
  const end = Date.parse(run.endedAt);
  if (Number.isNaN(start) || Number.isNaN(end) || end < start) return null;
  return end - start;
}

/**
 * Score an average against the best (smallest) average observed: the anchor
 * engine gets 1 and larger averages decay as `best / avg`. A zero average
 * scores 1; a missing average yields `null` (signal absent).
 */
function relativeScore(avg: number | null, best: number): number | null {
  if (avg === null) return null;
  if (avg > 0) return best / avg;
  return avg === 0 ? 1 : null;
}

/**
 * Build the per-engine quality lookup from `runs`. Only runs with a known engine
 * and at least one usable signal (a finite positive cost or a valid duration)
 * contribute.
 *
 * @param runs Run history to aggregate; not mutated.
 * @returns `byEngine` with the per-engine averages and blended quality, plus a
 *   `lookup` that returns 1 for engines without history.
 */
export function computeEngineQuality(runs: readonly FleetRunDoc[]): EngineQuality {
  const acc = new Map<string, Accum>();
  for (const run of runs) {
    const engine = runEngine(run);
    if (!engine || engine === 'unknown') continue;

    const rawCost = run.insights?.costUsd;
    // Non-finite costs (NaN / ±Infinity) would poison the averages and yield a
    // NaN quality, so they are treated as "no cost signal".
    const cost =
      typeof rawCost === 'number' && Number.isFinite(rawCost) && rawCost > 0 ? rawCost : null;
    const dur = runDurationMs(run);
    if (cost === null && dur === null) continue;

    const a = acc.get(engine) ?? { costSum: 0, costN: 0, durSum: 0, durN: 0, runs: 0 };
    // Count the run once, regardless of which signals it carried, so `samples`
    // is right even when some runs have only a cost and others only a duration.
    a.runs += 1;
    if (cost !== null) {
      a.costSum += cost;
      a.costN += 1;
    }
    if (dur !== null) {
      a.durSum += dur;
      a.durN += 1;
    }
    acc.set(engine, a);
  }

  // Anchors: the cheapest avg cost and the fastest avg duration across engines.
  let minAvgCost = Infinity;
  let minAvgDur = Infinity;
  const avgs = new Map<string, { cost: number | null; dur: number | null; samples: number }>();
  for (const [engine, a] of acc) {
    const cost = a.costN > 0 ? a.costSum / a.costN : null;
    const dur = a.durN > 0 ? a.durSum / a.durN : null;
    avgs.set(engine, { cost, dur, samples: a.runs });
    if (cost !== null && cost < minAvgCost) minAvgCost = cost;
    if (dur !== null && dur < minAvgDur) minAvgDur = dur;
  }

  const byEngine = new Map<string, EngineQualityEntry>();
  for (const [engine, { cost, dur, samples }] of avgs) {
    // Relative scores in (0,1]: cheapest/fastest = 1, others decay proportionally.
    const parts = [relativeScore(cost, minAvgCost), relativeScore(dur, minAvgDur)].filter(
      (s): s is number => s !== null
    );
    // Blend whichever signals exist with equal weight; no signal ⇒ neutral 1.
    const quality = parts.length > 0 ? parts.reduce((s, v) => s + v, 0) / parts.length : 1;
    byEngine.set(engine, {
      engine,
      avgCostUsd: cost,
      avgDurationMs: dur,
      samples,
      quality,
    });
  }

  const lookup = (engine: string | undefined): number => {
    if (!engine) return 1;
    return byEngine.get(engine)?.quality ?? 1;
  };
  return { byEngine, lookup };
}
/**
 * `FLEET_COST_ROUTING` env gate — default OFF.
 *
 * The variable is read on each call, trimmed and lower-cased; `1`, `true`,
 * `on` and `yes` enable the gate, every other value (or no value) disables it.
 *
 * @returns `true` when the environment variable holds one of the accepted truthy values.
 */
export function isCostRoutingEnabled(): boolean {
  const normalized = (process.env.FLEET_COST_ROUTING ?? '').trim().toLowerCase();
  return ['1', 'true', 'on', 'yes'].includes(normalized);
}

View File

@ -34,6 +34,7 @@ import * as artifactsBlob from './artifacts-blob.js';
import * as enrollment from './enrollment.js';
import * as trackerBridge from './tracker-bridge.js';
import { getReaperStats } from './reaper.js';
import { getEngineBreakerSnapshot } from './engine-breaker.js';
import {
SubmitJobSchema,
ListJobsQuerySchema,
@ -449,7 +450,9 @@ export async function fleetRoutes(app: FastifyInstance) {
const metrics = await coordinator.fleetMetrics(pid);
// Attach process-wide recovery/GC telemetry (the reaper runs across products,
// so these counters are global — operator visibility into recovery activity).
return { ...metrics, reaper: getReaperStats() };
// engineBreakers exposes any (factory, engine) pairs currently tripped/probing
// so the dashboard can flag routing that's being restricted (§2).
return { ...metrics, reaper: getReaperStats(), engineBreakers: getEngineBreakerSnapshot() };
});
// ── M0 RU gate: per-product queue version (cheap ~1 RU point read) ──

View File

@ -223,6 +223,59 @@ describe('scheduler §7 — affinity', () => {
});
});
describe('scheduler §2 — soft engine-quality term', () => {
  it('is inert (0) when no engineQuality lookup is provided', () => {
    const codexJob = job({ id: 'j', engine: 'codex' });

    const { breakdown } = scoreCandidate(codexJob, fac(), ctx());

    expect(breakdown.engineQuality).toBe(0);
  });

  it('prefers the historically cheaper/faster engine among eligible candidates', () => {
    const cheap = job({ id: 'cheap', engine: 'devin' });
    const pricey = job({ id: 'pricey', engine: 'codex' });
    // devin scores high (1), codex low (0.2) — same factory, all else equal.
    const quality = (engine: string | undefined): number => {
      if (engine === 'devin') return 1;
      if (engine === 'codex') return 0.2;
      return 1;
    };
    const withQuality = ctx({ engineQuality: quality });

    expect(selectJob([pricey, cheap], fac(), withQuality)?.id).toBe('cheap');

    const cheapTerm = scoreCandidate(cheap, fac(), withQuality).breakdown.engineQuality;
    const priceyTerm = scoreCandidate(pricey, fac(), withQuality).breakdown.engineQuality;
    expect(cheapTerm).toBeGreaterThan(priceyTerm);
  });

  it('is only a gentle nudge — a real signal (starvation) dominates it', () => {
    // The cheap engine is fresh; the pricey engine has aged a long time. The
    // anti-starvation boost (weight 1.5) outweighs the engine-quality nudge (0.4),
    // so the aged pricey-engine job still wins — the nudge never overpowers a
    // genuine routing signal.
    const cheapFresh = job({ id: 'cheapFresh', engine: 'devin', createdAt: iso(0) });
    const priceyAged = job({ id: 'priceyAged', engine: 'codex', createdAt: iso(40 * 60_000) });
    const quality = (engine: string | undefined): number => (engine === 'devin' ? 1 : 0);
    const withQuality = ctx({ engineQuality: quality });

    const picked = selectJob([cheapFresh, priceyAged], fac(), withQuality);

    expect(picked?.id).toBe('priceyAged');
  });
});
// Hard engine-availability gate (§2): an optional, injected predicate that can
// remove concrete-engine jobs from the candidate set.
describe('scheduler §2 — hard engine-availability gate', () => {
  // Predicate shared by the gating cases: every engine except codex is available.
  const allButCodex = (engine: string): boolean => engine !== 'codex';

  it('excludes a concrete-engine job when the engine is unavailable (e.g. breaker open)', () => {
    const codexJob = job({ id: 'codex', engine: 'codex' });
    const blocked = ctx({ isEngineAvailable: allButCodex });

    const whenBlocked = selectJob([codexJob], fac(), blocked);
    expect(whenBlocked).toBeNull();

    // With no predicate supplied, the same job is selectable.
    const whenOpen = selectJob([codexJob], fac(), ctx());
    expect(whenOpen?.id).toBe('codex'); // available by default
  });

  it('routes around an unavailable engine to an available candidate', () => {
    const codexJob = job({ id: 'codex', engine: 'codex' });
    const devinJob = job({ id: 'devin', engine: 'devin' });
    const gated = ctx({ isEngineAvailable: allButCodex });

    const picked = selectJob([codexJob, devinJob], fac(), gated);
    expect(picked?.id).toBe('devin');
  });

  it('never gates a job without a concrete engine (resolves on the runner)', () => {
    // The job has no `engine` field, and the predicate rejects everything.
    const abstract = job({ id: 'abs' });
    const denyAll = ctx({ isEngineAvailable: () => false });

    const picked = selectJob([abstract], fac(), denyAll);
    expect(picked?.id).toBe('abs');
  });
});
describe('scheduler §7 — breakdown & determinism', () => {
it('breakdown is per-weighted-term and sums to the score', () => {
const j = job({ id: 'j', priority: 'high', budget: { usd: 3 }, createdAt: iso(90_000) });

View File

@ -47,6 +47,10 @@ export interface SchedulerWeights {
/** w6 starvation: subtracts a freshness penalty that decays as a job ages,
* so an aged job outranks an equally-prioritised fresh one (anti-starvation). */
starvation: number;
/** w7 engine quality: a SOFT preference for the historically cheaper/faster
* engine among eligible candidates (§2). Contributes 0 unless the caller injects
* a `ctx.engineQuality` lookup, so it never alters default scoring. */
engineQuality: number;
}
/**
@ -62,6 +66,10 @@ export const DEFAULT_WEIGHTS: SchedulerWeights = {
costFit: 0.75,
health: 1.0,
starvation: 1.5,
// A gentle nudge (0.4) — below affinity — so cost/latency history only breaks
// near-ties between otherwise-comparable engines; it never overrides hard signals.
// Inert (contributes 0) unless `ctx.engineQuality` is supplied.
engineQuality: 0.4,
};
/** Aging config for the starvation term (fixed this phase). */
@ -108,6 +116,20 @@ export interface SchedulerContext {
costCeilingUsd?: number;
/** Override the starvation aging config (fixed defaults otherwise). */
starvation?: StarvationConfig;
/**
* SOFT engine-quality lookup (§2): maps a candidate's engine to a [0,1] score
* where 1 = historically cheapest/fastest and lower = more expensive/slower.
* When omitted, the engineQuality term contributes 0 (default scoring is unchanged).
* The coordinator derives this from per-engine run history (cost + duration).
*/
engineQuality?: (engine: string | undefined) => number;
/**
* HARD engine-availability gate (§2): when supplied, a job pinned to a concrete
* `engine` is eligible only if this returns true for that engine. The coordinator
* composes the per-engine circuit breaker and per-engine budget here. It can only
* RESTRICT routing (never force one). When omitted, all engines are available.
*/
isEngineAvailable?: (engine: string) => boolean;
}
/** Per-term, already-weighted contributions. Sums to `score` (the starvation term is signed negative). */
@ -118,6 +140,7 @@ export interface ScoreBreakdown {
costFit: number;
health: number;
starvation: number;
engineQuality: number;
}
export interface ScoredCandidate {
@ -229,6 +252,31 @@ function starvationPenaltyTerm(job: FleetJobDoc, ctx: SchedulerContext): number
return clamp01(1 - aged / cfg.buckets);
}
/**
 * Best guess at the engine a candidate will run on.
 *
 * Sources are tried in order: the job's explicit `engine`, then the first entry
 * of the manifest snapshot's `prefersEngine` list, then `engineClass`. Only
 * `null`/`undefined` fall through to the next source, so an empty-string
 * `engine` is returned as-is.
 *
 * @param job - Candidate job to inspect.
 * @returns The first source that is neither `null` nor `undefined`; otherwise
 *   whatever `job.engineClass` holds.
 */
export function resolveJobEngine(job: FleetJobDoc): string | undefined {
  const pinned = job.engine;
  if (pinned != null) return pinned;

  const firstHint = job.manifestSnapshot?.prefersEngine?.[0];
  if (firstHint != null) return firstHint;

  return job.engineClass;
}
/**
 * w7 term — soft engine-quality preference.
 *
 * Returns 0 when the caller supplied no `ctx.engineQuality` lookup, which keeps
 * the term inert. Otherwise the lookup is asked about the job's resolved engine
 * (see `resolveJobEngine`) and its answer is passed through `clamp01`. The
 * scheduler applies no fallback of its own for unknown engines: whatever the
 * lookup returns is what gets scored.
 *
 * @param job - Candidate job being scored.
 * @param ctx - Scheduler context, optionally carrying the quality lookup.
 * @returns The unweighted engine-quality term.
 */
function engineQualityTerm(job: FleetJobDoc, ctx: SchedulerContext): number {
  if (ctx.engineQuality) {
    const engine = resolveJobEngine(job);
    const rawQuality = ctx.engineQuality(engine);
    return clamp01(rawQuality);
  }
  return 0;
}
/**
 * Hard availability gate for a candidate job.
 *
 * The injected `ctx.isEngineAvailable` predicate is consulted only when it is
 * present and the job has a truthy `engine`; its verdict is then returned.
 * Every other case (no predicate, or no concrete engine on the job) is treated
 * as available. Only `job.engine` is inspected here — prefers-engine hints and
 * `engineClass` are never passed to the predicate.
 *
 * @param job - Candidate job to check.
 * @param ctx - Scheduler context, optionally carrying the availability predicate.
 * @returns `true` when the job is not gated, otherwise the predicate's result.
 */
export function engineAvailable(job: FleetJobDoc, ctx: SchedulerContext): boolean {
  if (ctx.isEngineAvailable && job.engine) {
    return ctx.isEngineAvailable(job.engine);
  }
  return true;
}
// ── Public scoring API ────────────────────────────────────────────────────────
/**
@ -249,6 +297,7 @@ export function scoreCandidate(
costFit: weights.costFit * costFitTerm(job, ctx),
health: weights.health * healthTerm(factory),
starvation: -weights.starvation * starvationPenaltyTerm(job, ctx),
engineQuality: weights.engineQuality * engineQualityTerm(job, ctx),
};
const score =
breakdown.capabilityFit +
@ -256,7 +305,8 @@ export function scoreCandidate(
breakdown.load +
breakdown.costFit +
breakdown.health +
breakdown.starvation;
breakdown.starvation +
breakdown.engineQuality;
return { score, breakdown };
}
@ -296,6 +346,10 @@ export function resolveWeights(
health: requestOverride?.health ?? productEntry?.health ?? DEFAULT_WEIGHTS.health,
starvation:
requestOverride?.starvation ?? productEntry?.starvation ?? DEFAULT_WEIGHTS.starvation,
engineQuality:
requestOverride?.engineQuality ??
productEntry?.engineQuality ??
DEFAULT_WEIGHTS.engineQuality,
};
}
@ -398,6 +452,7 @@ export function selectJob(
!isAwaitingRetryBackoff(job, ctx.now) &&
capabilitiesSubset(job.capabilities ?? [], factory.capabilities) &&
engineEligible(job, factory.capabilities) &&
engineAvailable(job, ctx) &&
depsSatisfied(job)
);
if (eligible.length === 0) return null;