From bdbb0a8ce4df3438ba17f996ac8004526d9eb814 Mon Sep 17 00:00:00 2001 From: saravanakumardb1 Date: Mon, 1 Jun 2026 13:17:25 -0700 Subject: [PATCH] feat(fleet): cost/latency-aware engine routing + per-engine circuit breaker MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds two additive, flag-gated routing refinements on top of the §7 scoring core; both default OFF so the deterministic claim path is unchanged. - FLEET_COST_ROUTING: soft engineQuality term (weight 0.4) biases routing toward the historically cheaper/faster engine, derived from per-engine insights.costUsd + run duration. No-history engines stay neutral, so the nudge can only demote demonstrably costly engines, never penalise new ones. - FLEET_ENGINE_BREAKER: per-(factory, engine) circuit breaker. releaseLease always records outcomes (observable via /fleet/metrics engineBreakers); when enabled, an OPEN pair is routed around. Only ever restricts the candidate set — never forces a route. The scheduler stays pure: history lookup + availability gate are injected predicates. New engineQuality term contributes 0 unless a lookup is supplied, preserving every existing score/breakdown. Generated with [Devin](https://cli.devin.ai/docs) Co-Authored-By: Devin <158243242+devin-ai-integration[bot]@users.noreply.github.com> --- .../src/modules/fleet/README.md | 24 +++ .../src/modules/fleet/coordinator.test.ts | 140 ++++++++++++++++ .../src/modules/fleet/coordinator.ts | 32 ++++ .../src/modules/fleet/engine-breaker.test.ts | 113 +++++++++++++ .../src/modules/fleet/engine-breaker.ts | 158 ++++++++++++++++++ .../src/modules/fleet/engine-stats.test.ts | 102 +++++++++++ .../src/modules/fleet/engine-stats.ts | 126 ++++++++++++++ .../src/modules/fleet/routes.ts | 5 +- .../src/modules/fleet/scheduler.test.ts | 53 ++++++ .../src/modules/fleet/scheduler.ts | 57 ++++++- 10 files changed, 808 insertions(+), 2 deletions(-) create mode 100644 services/platform-service/src/modules/fleet/engine-breaker.test.ts create mode 100644 services/platform-service/src/modules/fleet/engine-breaker.ts create mode 100644 services/platform-service/src/modules/fleet/engine-stats.test.ts create mode 100644 services/platform-service/src/modules/fleet/engine-stats.ts diff --git a/services/platform-service/src/modules/fleet/README.md b/services/platform-service/src/modules/fleet/README.md index d89ed7bc..99f3e388 100644 --- a/services/platform-service/src/modules/fleet/README.md +++ b/services/platform-service/src/modules/fleet/README.md @@ -59,6 +59,30 @@ from WIP), and marks the lease `expired`. Idempotent — a reaped lease is no lo the lease doc; it cannot requeue the job, bump the epoch, or keep the checkpoint), so the reaper — not TTL — owns recovery. +## Cost/quality-aware routing (§2) + +Two **additive, flag-gated** routing refinements layer on top of the §7 scoring +core. Both default OFF, so the deterministic claim path is byte-for-byte unchanged +unless explicitly enabled. + +- **Soft engine preference (`FLEET_COST_ROUTING`).** `engine-stats.ts` + (`computeEngineQuality`) aggregates each run's `insights.costUsd` + duration + (`endedAt − startedAt`) into a per-engine `[0,1]` quality, anchored so the + cheapest/fastest engine scores 1 and pricier/slower engines decay proportionally. + The scheduler's `engineQuality` term (weight `0.4`, below affinity) gently biases + routing toward the cheaper/faster engine among otherwise-comparable candidates. + Engines with no history are **neutral (1)** — history can only demote a + demonstrably costly engine, never penalise a new one. The term contributes `0` + unless the lookup is injected, so default scoring is untouched. +- **Per-engine circuit breaker (`FLEET_ENGINE_BREAKER`).** `engine-breaker.ts` + tracks failures per `(factoryId, engine)` (CLOSED → OPEN after N consecutive + failures → HALF_OPEN probe after a reset window). `releaseLease` always **records** + the outcome (so breaker state is observable in `/fleet/metrics → engineBreakers` + even before enforcement); when the flag is on, an OPEN pair is removed from that + factory's candidate set so a repeatedly-failing engine (e.g. `codex` erroring on a + box) is routed _around_. The breaker only ever **restricts** — it never forces a + route. Jobs without a concrete `engine` resolve on the runner and are never gated. + ## Submit semantics (idempotency + deps) - same `idempotencyKey` + identical `bodyMd` → returns the existing job (dedup). diff --git a/services/platform-service/src/modules/fleet/coordinator.test.ts b/services/platform-service/src/modules/fleet/coordinator.test.ts index 2066c4b5..ac2ad1a8 100644 --- a/services/platform-service/src/modules/fleet/coordinator.test.ts +++ b/services/platform-service/src/modules/fleet/coordinator.test.ts @@ -1402,3 +1402,143 @@ describe('fleet coordinator — draft lifecycle (save / edit / submit)', () => { if (!res.ok) expect(res.reason).toBe('not_found'); }); }); + +// ── §2: per-engine circuit breaker + cost/latency-aware routing ─────────────── + +import { engineBreaker, getEngineBreakerSnapshot } from './engine-breaker.js'; + +const sleep = (ms: number) => new Promise(r => setTimeout(r, ms)); + +describe('fleet coordinator §2 — engine circuit breaker', () => { + beforeEach(() => { + setProvider(new MemoryDatastoreProvider()); + engineBreaker.clear(); + }); + afterEach(() => { + _resetDatastoreProvider(); + engineBreaker.clear(); + delete process.env.FLEET_ENGINE_BREAKER; + }); + + it('records a failed attempt against the (factory, engine) pair on release', async () => { + const { job } = await coord.submitJob(PID, input({ engine: 'codex' })); + const claim = await coord.claimNextJob(factory({ capabilities: ['engine:codex'] })); + await coord.releaseLease(job.id, PID, claim!.job.leaseEpoch, 'failed', { + result: 'failed', + insights: { engine: 'codex' }, + }); + const snap = getEngineBreakerSnapshot(); + expect(snap).toContainEqual( + expect.objectContaining({ factoryId: 'fac_1', engine: 'codex', failureCount: 1 }) + ); + }); + + it('a shipped release records success and resets the pair', async () => { + engineBreaker.recordFailure('fac_1', 'codex'); + const { job } = await coord.submitJob(PID, input({ engine: 'codex' })); + const claim = await coord.claimNextJob(factory({ capabilities: ['engine:codex'] })); + await coord.releaseLease(job.id, PID, claim!.job.leaseEpoch, 'shipped', { + result: 'shipped', + insights: { engine: 'codex' }, + }); + expect(getEngineBreakerSnapshot()).toContainEqual( + expect.objectContaining({ factoryId: 'fac_1', engine: 'codex', failureCount: 0 }) + ); + }); + + it('when enabled, an OPEN (factory, engine) breaker is NOT routed that engine', async () => { + process.env.FLEET_ENGINE_BREAKER = '1'; + engineBreaker.recordFailure('fac_1', 'codex'); + engineBreaker.recordFailure('fac_1', 'codex'); + engineBreaker.recordFailure('fac_1', 'codex'); // → OPEN (default threshold 3) + await coord.submitJob(PID, input({ engine: 'codex' })); + const claim = await coord.claimNextJob(factory({ capabilities: ['engine:codex'] })); + expect(claim).toBeNull(); + }); + + it('the breaker routes AROUND a broken engine to a healthy candidate', async () => { + process.env.FLEET_ENGINE_BREAKER = '1'; + engineBreaker.recordFailure('fac_1', 'codex'); + engineBreaker.recordFailure('fac_1', 'codex'); + engineBreaker.recordFailure('fac_1', 'codex'); // codex OPEN on fac_1 + await coord.submitJob(PID, input({ idempotencyKey: 'c', engine: 'codex' })); + await coord.submitJob(PID, input({ idempotencyKey: 'd', engine: 'devin' })); + const claim = await coord.claimNextJob( + factory({ capabilities: ['engine:codex', 'engine:devin'] }) + ); + expect(claim?.job.engine).toBe('devin'); + }); + + it('with the flag OFF, an OPEN breaker does NOT restrict routing (default behavior)', async () => { + delete process.env.FLEET_ENGINE_BREAKER; + engineBreaker.recordFailure('fac_1', 'codex'); + engineBreaker.recordFailure('fac_1', 'codex'); + engineBreaker.recordFailure('fac_1', 'codex'); + await coord.submitJob(PID, input({ engine: 'codex' })); + const claim = await coord.claimNextJob(factory({ capabilities: ['engine:codex'] })); + expect(claim?.job.engine).toBe('codex'); + }); +}); + +describe('fleet coordinator §2 — cost/latency-aware routing', () => { + beforeEach(() => { + setProvider(new MemoryDatastoreProvider()); + engineBreaker.clear(); + }); + afterEach(() => { + _resetDatastoreProvider(); + delete process.env.FLEET_COST_ROUTING; + }); + + /** Seed two completed runs so devin reads as cheap+fast and codex as pricey+slow. */ + async function seedHistory(): Promise { + const t0 = Date.parse('2026-06-01T00:00:00.000Z'); + await repo.createRun({ + id: 'seed-devin', + productId: PID, + jobId: 'seed-devin-job', + attempt: 1, + factoryId: 'fac_1', + engine: 'devin', + startedAt: new Date(t0).toISOString(), + endedAt: new Date(t0 + 1000).toISOString(), + insights: { engine: 'devin', costUsd: 1 }, + }); + await repo.createRun({ + id: 'seed-codex', + productId: PID, + jobId: 'seed-codex-job', + attempt: 1, + factoryId: 'fac_1', + engine: 'codex', + startedAt: new Date(t0).toISOString(), + endedAt: new Date(t0 + 8000).toISOString(), + insights: { engine: 'codex', costUsd: 8 }, + }); + } + + it('prefers the historically cheaper/faster engine over the older candidate', async () => { + await seedHistory(); + // codex submitted FIRST (older) — without cost routing the age tie-break picks it. + await coord.submitJob(PID, input({ idempotencyKey: 'codex-job', engine: 'codex' })); + await sleep(3); + await coord.submitJob(PID, input({ idempotencyKey: 'devin-job', engine: 'devin' })); + const both = factory({ capabilities: ['engine:codex', 'engine:devin'] }); + + process.env.FLEET_COST_ROUTING = '1'; + const claim = await coord.claimNextJob(both); + expect(claim?.job.engine).toBe('devin'); // cheaper engine wins the nudge + }); + + it('with the flag OFF, the age tie-break stands (older codex wins) — default behavior', async () => { + await seedHistory(); + await coord.submitJob(PID, input({ idempotencyKey: 'codex-job', engine: 'codex' })); + await sleep(3); + await coord.submitJob(PID, input({ idempotencyKey: 'devin-job', engine: 'devin' })); + const both = factory({ capabilities: ['engine:codex', 'engine:devin'] }); + + delete process.env.FLEET_COST_ROUTING; + const claim = await coord.claimNextJob(both); + expect(claim?.job.engine).toBe('codex'); // older job wins without the nudge + }); +}); diff --git a/services/platform-service/src/modules/fleet/coordinator.ts b/services/platform-service/src/modules/fleet/coordinator.ts index 91166080..274945a9 100644 --- a/services/platform-service/src/modules/fleet/coordinator.ts +++ b/services/platform-service/src/modules/fleet/coordinator.ts @@ -37,7 +37,10 @@ import { type SchedulerWeights, type FleetWeightRegistry, resolveWeights, + resolveJobEngine, } from './scheduler.js'; +import { engineBreaker, isEngineBreakerEnabled } from './engine-breaker.js'; +import { computeEngineQuality, isCostRoutingEnabled } from './engine-stats.js'; import { ACTIVE_STAGES, FLEET_STAGES, @@ -559,6 +562,19 @@ export async function claimNextJob(ctx: ClaimContext): Promise number) | undefined; + if (isCostRoutingEnabled()) { + const runs = await repo.listRunsByProduct(ctx.productId); + engineQuality = computeEngineQuality(runs).lookup; + } + // §2 per-engine circuit breaker: stop routing an engine that keeps failing on + // THIS factory (flag-gated; only RESTRICTS the candidate set, never forces one). + const isEngineAvailable = isEngineBreakerEnabled() + ? (engine: string): boolean => engineBreaker.allow(ctx.factoryId, engine) + : undefined; + for (let i = 0; i < CLAIM_MAX_RETRIES; i++) { // Phase 3: budget enforcement (FLEET_BUDGETS flag) if (isBudgetsEnabled()) { @@ -595,6 +611,8 @@ export async function claimNextJob(ctx: ClaimContext): Promise { + it('a fresh (factory, engine) pair is CLOSED ⇒ routable', () => { + const b = new EngineCircuitBreaker(); + expect(b.allow('f1', 'codex', T0)).toBe(true); + expect(b.isOpen('f1', 'codex', T0)).toBe(false); + }); + + it('opens after the failure threshold of consecutive failures', () => { + const b = new EngineCircuitBreaker({ failureThreshold: 3, resetTimeoutMs: 60_000 }); + b.recordFailure('f1', 'codex', T0); + b.recordFailure('f1', 'codex', T0); + expect(b.allow('f1', 'codex', T0)).toBe(true); // 2 < 3 — still closed + b.recordFailure('f1', 'codex', T0); + expect(b.allow('f1', 'codex', T0)).toBe(false); // 3rd failure trips it OPEN + }); + + it('a success before the threshold resets the failure count', () => { + const b = new EngineCircuitBreaker({ failureThreshold: 3 }); + b.recordFailure('f1', 'codex', T0); + b.recordFailure('f1', 'codex', T0); + b.recordSuccess('f1', 'codex'); + b.recordFailure('f1', 'codex', T0); + b.recordFailure('f1', 'codex', T0); + expect(b.allow('f1', 'codex', T0)).toBe(true); // count restarted ⇒ 2 < 3 + }); + + it('isolates per factory AND per engine', () => { + const b = new EngineCircuitBreaker({ failureThreshold: 1 }); + b.recordFailure('f1', 'codex', T0); + expect(b.allow('f1', 'codex', T0)).toBe(false); // tripped + expect(b.allow('f2', 'codex', T0)).toBe(true); // other factory unaffected + expect(b.allow('f1', 'devin', T0)).toBe(true); // other engine unaffected + }); +}); + +describe('engine-breaker §2 — half-open recovery', () => { + it('after the reset window it allows exactly one probe (HALF_OPEN)', () => { + const b = new EngineCircuitBreaker({ failureThreshold: 1, resetTimeoutMs: 60_000 }); + b.recordFailure('f1', 'codex', T0); + expect(b.allow('f1', 'codex', T0 + 30_000)).toBe(false); // still inside the window + expect(b.allow('f1', 'codex', T0 + 60_000)).toBe(true); // window elapsed ⇒ probe + }); + + it('a successful probe closes the breaker', () => { + const b = new EngineCircuitBreaker({ failureThreshold: 1, resetTimeoutMs: 60_000 }); + b.recordFailure('f1', 'codex', T0); + b.allow('f1', 'codex', T0 + 60_000); // → HALF_OPEN + b.recordSuccess('f1', 'codex'); + expect(b.allow('f1', 'codex', T0 + 61_000)).toBe(true); + expect(b.isOpen('f1', 'codex', T0 + 61_000)).toBe(false); + }); + + it('a failed probe immediately re-opens the breaker', () => { + const b = new EngineCircuitBreaker({ failureThreshold: 3, resetTimeoutMs: 60_000 }); + b.recordFailure('f1', 'codex', T0); + b.recordFailure('f1', 'codex', T0); + b.recordFailure('f1', 'codex', T0); // OPEN + b.allow('f1', 'codex', T0 + 60_000); // → HALF_OPEN (probe allowed) + b.recordFailure('f1', 'codex', T0 + 61_000); // probe fails + expect(b.allow('f1', 'codex', T0 + 61_000)).toBe(false); // re-opened, fresh window + }); +}); + +describe('engine-breaker §2 — snapshot + clear', () => { + it('snapshot reports tracked pairs and reflects the reset window', () => { + const b = new EngineCircuitBreaker({ failureThreshold: 1, resetTimeoutMs: 60_000 }); + b.recordFailure('f1', 'codex', T0); + const open = b.snapshot(T0 + 1_000); + expect(open).toHaveLength(1); + expect(open[0]).toMatchObject({ factoryId: 'f1', engine: 'codex', state: 'OPEN' }); + expect(open[0].lastFailureAt).toBe(new Date(T0).toISOString()); + // once the window elapses the snapshot shows HALF_OPEN (a probe is available) + const half = b.snapshot(T0 + 60_000); + expect(half[0].state).toBe('HALF_OPEN'); + }); + + it('clear drops all state', () => { + const b = new EngineCircuitBreaker({ failureThreshold: 1 }); + b.recordFailure('f1', 'codex', T0); + b.clear(); + expect(b.snapshot(T0)).toEqual([]); + expect(b.allow('f1', 'codex', T0)).toBe(true); + }); +}); + +describe('engine-breaker §2 — feature flag', () => { + it('isEngineBreakerEnabled defaults OFF and honors truthy env values', () => { + const prev = process.env.FLEET_ENGINE_BREAKER; + try { + delete process.env.FLEET_ENGINE_BREAKER; + expect(isEngineBreakerEnabled()).toBe(false); + process.env.FLEET_ENGINE_BREAKER = '1'; + expect(isEngineBreakerEnabled()).toBe(true); + process.env.FLEET_ENGINE_BREAKER = 'true'; + expect(isEngineBreakerEnabled()).toBe(true); + process.env.FLEET_ENGINE_BREAKER = 'off'; + expect(isEngineBreakerEnabled()).toBe(false); + } finally { + if (prev === undefined) delete process.env.FLEET_ENGINE_BREAKER; + else process.env.FLEET_ENGINE_BREAKER = prev; + } + }); +}); diff --git a/services/platform-service/src/modules/fleet/engine-breaker.ts b/services/platform-service/src/modules/fleet/engine-breaker.ts new file mode 100644 index 00000000..0f47640a --- /dev/null +++ b/services/platform-service/src/modules/fleet/engine-breaker.ts @@ -0,0 +1,158 @@ +/** + * Per-engine circuit breaker for fleet routing (§2 cost/quality routing). + * + * Tracks failures per `(factoryId, engine)` pair. When a concrete engine + * repeatedly fails on a given factory (e.g. `codex` erroring on a box that lacks + * a working install), the breaker OPENS for that pair and the scheduler stops + * routing that engine THERE — without ever forcing a job onto a worse factory. + * The breaker only ever RESTRICTS the candidate set; it never overrides the + * deterministic §7 scoring or forces a route. + * + * States mirror the sidecar breaker (extraction-service): CLOSED (normal) → OPEN + * (skip-route) → HALF_OPEN (allow one probe after the reset window). A single + * probe success closes the breaker; a probe failure re-opens it. + * + * This module is PURE w.r.t. I/O — state is in-memory and process-wide (one + * coordinator process owns the claim path, matching the reaper-stats pattern). + * Time is injectable so the logic is fully unit-testable without a real clock. + */ + +type BreakerState = 'CLOSED' | 'OPEN' | 'HALF_OPEN'; + +export interface EngineBreakerOptions { + /** Consecutive failures on a (factory, engine) before the breaker OPENS. */ + failureThreshold?: number; + /** How long a breaker stays OPEN before allowing a single HALF_OPEN probe. */ + resetTimeoutMs?: number; +} + +/** One (factory, engine) entry — its current state plus a snapshot for operators. */ +export interface EngineBreakerEntry { + factoryId: string; + engine: string; + state: BreakerState; + failureCount: number; + lastFailureAt: string | null; +} + +interface Cell { + state: BreakerState; + failureCount: number; + lastFailureMs: number; +} + +const DEFAULT_FAILURE_THRESHOLD = 3; +const DEFAULT_RESET_TIMEOUT_MS = 5 * 60_000; // 5 minutes + +function key(factoryId: string, engine: string): string { + return `${factoryId}\u0000${engine}`; +} + +/** + * A registry of per-`(factoryId, engine)` circuit breakers. Construct directly in + * tests (with injected time); the coordinator uses the shared module singleton. + */ +export class EngineCircuitBreaker { + private readonly cells = new Map(); + private readonly failureThreshold: number; + private readonly resetTimeoutMs: number; + + constructor(opts: EngineBreakerOptions = {}) { + this.failureThreshold = opts.failureThreshold ?? DEFAULT_FAILURE_THRESHOLD; + this.resetTimeoutMs = opts.resetTimeoutMs ?? DEFAULT_RESET_TIMEOUT_MS; + } + + /** + * Whether a job pinned to `engine` may be routed to `factoryId` right now. + * CLOSED ⇒ yes. OPEN ⇒ no, until the reset window elapses, after which we move + * to HALF_OPEN and allow exactly one probe. May mutate state (OPEN→HALF_OPEN), + * mirroring the sidecar breaker's `allowRequest`. + */ + allow(factoryId: string, engine: string, now: number = Date.now()): boolean { + const cell = this.cells.get(key(factoryId, engine)); + if (!cell || cell.state === 'CLOSED') return true; + if (cell.state === 'OPEN') { + if (now - cell.lastFailureMs >= this.resetTimeoutMs) { + cell.state = 'HALF_OPEN'; + return true; // allow a single probe + } + return false; + } + return true; // HALF_OPEN — the probe is in flight; allow it through + } + + /** Inverse of {@link allow} — convenience for "is this route blocked?". */ + isOpen(factoryId: string, engine: string, now: number = Date.now()): boolean { + return !this.allow(factoryId, engine, now); + } + + /** Record a failed run of `engine` on `factoryId`; may trip the breaker OPEN. */ + recordFailure(factoryId: string, engine: string, now: number = Date.now()): void { + const k = key(factoryId, engine); + const cell = this.cells.get(k) ?? { state: 'CLOSED', failureCount: 0, lastFailureMs: 0 }; + cell.failureCount += 1; + cell.lastFailureMs = now; + // A failed probe (HALF_OPEN) re-opens immediately; otherwise open on threshold. + if (cell.state === 'HALF_OPEN' || cell.failureCount >= this.failureThreshold) { + cell.state = 'OPEN'; + } + this.cells.set(k, cell); + } + + /** Record a successful run of `engine` on `factoryId` — closes/resets the pair. */ + recordSuccess(factoryId: string, engine: string): void { + const k = key(factoryId, engine); + const cell = this.cells.get(k); + if (!cell) return; // never failed ⇒ stays implicitly CLOSED + cell.state = 'CLOSED'; + cell.failureCount = 0; + this.cells.set(k, cell); + } + + /** + * Operator snapshot of every tracked pair, with OPEN→HALF_OPEN reset applied as + * of `now` so the view reflects what routing would actually do. Read-only: + * callers get a copy and cannot mutate internal state. + */ + snapshot(now: number = Date.now()): EngineBreakerEntry[] { + const out: EngineBreakerEntry[] = []; + for (const [k, cell] of this.cells) { + const sep = k.indexOf('\u0000'); + const factoryId = k.slice(0, sep); + const engine = k.slice(sep + 1); + const open = cell.state === 'OPEN' && now - cell.lastFailureMs < this.resetTimeoutMs; + out.push({ + factoryId, + engine, + state: open ? 'OPEN' : cell.state === 'OPEN' ? 'HALF_OPEN' : cell.state, + failureCount: cell.failureCount, + lastFailureAt: cell.lastFailureMs ? new Date(cell.lastFailureMs).toISOString() : null, + }); + } + return out; + } + + /** Drop all state (tests / operator reset). */ + clear(): void { + this.cells.clear(); + } +} + +/** Process-wide breaker shared by the claim path + metrics (one coordinator). */ +export const engineBreaker = new EngineCircuitBreaker(); + +/** Snapshot of the shared breaker for operator metrics / the fleet dashboard. */ +export function getEngineBreakerSnapshot(now: number = Date.now()): EngineBreakerEntry[] { + return engineBreaker.snapshot(now); +} + +/** + * `FLEET_ENGINE_BREAKER` env gate — default OFF. The breaker always RECORDS + * outcomes (so its state is observable in metrics even before enforcement), but + * only RESTRICTS routing when this flag is enabled. Keeps default behavior + * byte-for-byte unchanged. + */ +export function isEngineBreakerEnabled(): boolean { + const v = (process.env.FLEET_ENGINE_BREAKER ?? '').trim().toLowerCase(); + return v === '1' || v === 'true' || v === 'on' || v === 'yes'; +} diff --git a/services/platform-service/src/modules/fleet/engine-stats.test.ts b/services/platform-service/src/modules/fleet/engine-stats.test.ts new file mode 100644 index 00000000..3288d552 --- /dev/null +++ b/services/platform-service/src/modules/fleet/engine-stats.test.ts @@ -0,0 +1,102 @@ +/** + * Per-engine quality from run history (§2) — pure unit tests. + */ + +import { describe, expect, it } from 'vitest'; +import { computeEngineQuality, isCostRoutingEnabled } from './engine-stats.js'; +import type { FleetRunDoc } from './types.js'; + +let seq = 0; +function run(over: Partial & { engine: string }): FleetRunDoc { + seq += 1; + return { + id: `r${seq}`, + productId: 'lysnrai', + jobId: `j${seq}`, + attempt: 1, + startedAt: '2026-06-01T00:00:00.000Z', + insights: {}, + ...over, + } as FleetRunDoc; +} + +/** A run with a cost and a duration (ms) on a given engine. */ +function costed(engine: string, costUsd: number, durationMs: number): FleetRunDoc { + const start = '2026-06-01T00:00:00.000Z'; + const end = new Date(Date.parse(start) + durationMs).toISOString(); + return run({ engine, startedAt: start, endedAt: end, insights: { costUsd } }); +} + +describe('computeEngineQuality §2', () => { + it('cheapest + fastest engine anchors quality at 1; pricier/slower decays', () => { + const { lookup, byEngine } = computeEngineQuality([ + costed('devin', 1, 1000), + costed('codex', 4, 4000), + ]); + expect(lookup('devin')).toBeCloseTo(1); + // codex: costScore 1/4, durScore 1000/4000 ⇒ (0.25 + 0.25)/2 = 0.25 + expect(lookup('codex')).toBeCloseTo(0.25); + expect(byEngine.get('codex')?.avgCostUsd).toBe(4); + expect(byEngine.get('codex')?.samples).toBe(1); + }); + + it('averages multiple runs per engine', () => { + const { byEngine } = computeEngineQuality([costed('devin', 2, 1000), costed('devin', 4, 3000)]); + expect(byEngine.get('devin')?.avgCostUsd).toBe(3); + expect(byEngine.get('devin')?.avgDurationMs).toBe(2000); + }); + + it('unknown / no-history engines are neutral (1) — never penalised', () => { + const { lookup } = computeEngineQuality([costed('devin', 5, 5000)]); + expect(lookup('claude')).toBe(1); + expect(lookup(undefined)).toBe(1); + }); + + it('a single engine with data scores 1 (term is inert with no peer to compare)', () => { + const { lookup } = computeEngineQuality([costed('devin', 9, 9000)]); + expect(lookup('devin')).toBeCloseTo(1); + }); + + it('uses whichever signal is present (cost-only / duration-only)', () => { + const { lookup } = computeEngineQuality([ + run({ engine: 'devin', insights: { costUsd: 1 } }), // cost only, no endedAt + run({ engine: 'codex', insights: { costUsd: 5 } }), + ]); + expect(lookup('devin')).toBeCloseTo(1); + expect(lookup('codex')).toBeCloseTo(0.2); + }); + + it('ignores runs with no engine, "unknown" engine, or no usable signal', () => { + const { byEngine } = computeEngineQuality([ + run({ engine: 'unknown', insights: { costUsd: 3 } }), + run({ engine: 'devin', insights: {} }), // no cost, no end + costed('codex', 2, 2000), + ]); + expect(byEngine.has('unknown')).toBe(false); + expect(byEngine.has('devin')).toBe(false); + expect(byEngine.has('codex')).toBe(true); + }); + + it('empty history ⇒ everything neutral', () => { + const { lookup, byEngine } = computeEngineQuality([]); + expect(byEngine.size).toBe(0); + expect(lookup('devin')).toBe(1); + }); +}); + +describe('isCostRoutingEnabled §2', () => { + it('defaults OFF and honors truthy env values', () => { + const prev = process.env.FLEET_COST_ROUTING; + try { + delete process.env.FLEET_COST_ROUTING; + expect(isCostRoutingEnabled()).toBe(false); + process.env.FLEET_COST_ROUTING = 'on'; + expect(isCostRoutingEnabled()).toBe(true); + process.env.FLEET_COST_ROUTING = '0'; + expect(isCostRoutingEnabled()).toBe(false); + } finally { + if (prev === undefined) delete process.env.FLEET_COST_ROUTING; + else process.env.FLEET_COST_ROUTING = prev; + } + }); +}); diff --git a/services/platform-service/src/modules/fleet/engine-stats.ts b/services/platform-service/src/modules/fleet/engine-stats.ts new file mode 100644 index 00000000..c52e966f --- /dev/null +++ b/services/platform-service/src/modules/fleet/engine-stats.ts @@ -0,0 +1,126 @@ +/** + * Per-engine quality from run history (§2 cost/latency-aware routing). + * + * PURE + synchronous: given a set of completed runs, derive a `(engine) => [0,1]` + * quality lookup the scheduler's soft `engineQuality` term consumes. 1 = the + * historically cheapest + fastest engine; lower = more expensive / slower. + * + * Design choices that keep this a safe, gentle nudge: + * • Scores are RELATIVE: the cheapest observed engine anchors costScore = 1, the + * fastest anchors durationScore = 1; others decay proportionally. With a single + * engine (or no data) every engine scores 1 ⇒ the term is inert. + * • Engines with NO history return the neutral 1 — history can only DEMOTE a + * demonstrably costly/slow engine, never penalise a new/unknown one. + * • Cost and duration are averaged independently and combined 50/50; an engine + * with only one signal uses whichever it has. + */ + +import type { FleetRunDoc } from './types.js'; + +interface Accum { + costSum: number; + costN: number; + durSum: number; + durN: number; +} + +/** Per-engine averages plus the final blended quality in [0,1]. */ +export interface EngineQualityEntry { + engine: string; + avgCostUsd: number | null; + avgDurationMs: number | null; + /** Runs that contributed at least one signal (cost or duration). */ + samples: number; + quality: number; +} + +export interface EngineQuality { + byEngine: Map; + /** Lookup for the scheduler: unknown / no-history engines ⇒ neutral 1. */ + lookup: (engine: string | undefined) => number; +} + +/** The engine a run actually used: the factory-reported concrete engine wins, + * else the engine the run was created with. */ +function runEngine(run: FleetRunDoc): string | undefined { + return run.insights?.engine ?? run.engine; +} + +function runDurationMs(run: FleetRunDoc): number | null { + if (!run.endedAt) return null; + const start = Date.parse(run.startedAt); + const end = Date.parse(run.endedAt); + if (Number.isNaN(start) || Number.isNaN(end) || end < start) return null; + return end - start; +} + +/** + * Build the per-engine quality lookup from `runs`. Only runs with a known engine + * and at least one usable signal (positive cost or a valid duration) contribute. + */ +export function computeEngineQuality(runs: readonly FleetRunDoc[]): EngineQuality { + const acc = new Map(); + for (const run of runs) { + const engine = runEngine(run); + if (!engine || engine === 'unknown') continue; + const cost = run.insights?.costUsd; + const dur = runDurationMs(run); + const hasCost = typeof cost === 'number' && cost > 0; + if (!hasCost && dur === null) continue; + const a = acc.get(engine) ?? { costSum: 0, costN: 0, durSum: 0, durN: 0 }; + if (hasCost) { + a.costSum += cost as number; + a.costN += 1; + } + if (dur !== null) { + a.durSum += dur; + a.durN += 1; + } + acc.set(engine, a); + } + + // Anchors: the cheapest avg cost and the fastest avg duration across engines. + let minAvgCost = Infinity; + let minAvgDur = Infinity; + const avgs = new Map(); + for (const [engine, a] of acc) { + const cost = a.costN > 0 ? a.costSum / a.costN : null; + const dur = a.durN > 0 ? a.durSum / a.durN : null; + avgs.set(engine, { cost, dur, samples: Math.max(a.costN, a.durN) }); + if (cost !== null && cost < minAvgCost) minAvgCost = cost; + if (dur !== null && dur < minAvgDur) minAvgDur = dur; + } + + const byEngine = new Map(); + for (const [engine, { cost, dur, samples }] of avgs) { + // Relative scores in (0,1]: cheapest/fastest = 1, others decay proportionally. + const costScore = cost !== null && cost > 0 ? minAvgCost / cost : cost === 0 ? 1 : null; + const durScore = dur !== null && dur > 0 ? minAvgDur / dur : dur === 0 ? 1 : null; + const parts = [costScore, durScore].filter((s): s is number => s !== null); + const quality = parts.length > 0 ? parts.reduce((s, v) => s + v, 0) / parts.length : 1; + byEngine.set(engine, { + engine, + avgCostUsd: cost, + avgDurationMs: dur, + samples, + quality, + }); + } + + const lookup = (engine: string | undefined): number => { + if (!engine) return 1; + return byEngine.get(engine)?.quality ?? 1; + }; + + return { byEngine, lookup }; +} + +/** + * `FLEET_COST_ROUTING` env gate — default OFF. When enabled, the coordinator + * injects the engine-quality lookup into the scheduler so cost/latency history + * gently biases routing. Off ⇒ the soft term stays inert (no behavior change). + */ +export function isCostRoutingEnabled(): boolean { + const v = (process.env.FLEET_COST_ROUTING ?? '').trim().toLowerCase(); + return v === '1' || v === 'true' || v === 'on' || v === 'yes'; +} diff --git a/services/platform-service/src/modules/fleet/routes.ts b/services/platform-service/src/modules/fleet/routes.ts index e3426893..a62f704f 100644 --- a/services/platform-service/src/modules/fleet/routes.ts +++ b/services/platform-service/src/modules/fleet/routes.ts @@ -34,6 +34,7 @@ import * as artifactsBlob from './artifacts-blob.js'; import * as enrollment from './enrollment.js'; import * as trackerBridge from './tracker-bridge.js'; import { getReaperStats } from './reaper.js'; +import { getEngineBreakerSnapshot } from './engine-breaker.js'; import { SubmitJobSchema, ListJobsQuerySchema, @@ -449,7 +450,9 @@ export async function fleetRoutes(app: FastifyInstance) { const metrics = await coordinator.fleetMetrics(pid); // Attach process-wide recovery/GC telemetry (the reaper runs across products, // so these counters are global — operator visibility into recovery activity). - return { ...metrics, reaper: getReaperStats() }; + // engineBreakers exposes any (factory, engine) pairs currently tripped/probing + // so the dashboard can flag routing that's being restricted (§2). + return { ...metrics, reaper: getReaperStats(), engineBreakers: getEngineBreakerSnapshot() }; }); // ── M0 RU gate: per-product queue version (cheap ~1 RU point read) ── diff --git a/services/platform-service/src/modules/fleet/scheduler.test.ts b/services/platform-service/src/modules/fleet/scheduler.test.ts index 9b58be3e..a09c955c 100644 --- a/services/platform-service/src/modules/fleet/scheduler.test.ts +++ b/services/platform-service/src/modules/fleet/scheduler.test.ts @@ -223,6 +223,59 @@ describe('scheduler §7 — affinity', () => { }); }); +describe('scheduler §2 — soft engine-quality term', () => { + it('is inert (0) when no engineQuality lookup is provided', () => { + const j = job({ id: 'j', engine: 'codex' }); + expect(scoreCandidate(j, fac(), ctx()).breakdown.engineQuality).toBe(0); + }); + + it('prefers the historically cheaper/faster engine among eligible candidates', () => { + const cheap = job({ id: 'cheap', engine: 'devin' }); + const pricey = job({ id: 'pricey', engine: 'codex' }); + // devin scores high (1), codex low (0.2) — same factory, all else equal. + const quality = (e: string | undefined) => (e === 'devin' ? 1 : e === 'codex' ? 0.2 : 1); + const c = ctx({ engineQuality: quality }); + expect(selectJob([pricey, cheap], fac(), c)?.id).toBe('cheap'); + expect(scoreCandidate(cheap, fac(), c).breakdown.engineQuality).toBeGreaterThan( + scoreCandidate(pricey, fac(), c).breakdown.engineQuality + ); + }); + + it('is only a gentle nudge — a real signal (starvation) dominates it', () => { + // The cheap engine is fresh; the pricey engine has aged a long time. The + // anti-starvation boost (weight 1.5) outweighs the engine-quality nudge (0.4), + // so the aged pricey-engine job still wins — the nudge never overpowers a + // genuine routing signal. + const cheapFresh = job({ id: 'cheapFresh', engine: 'devin', createdAt: iso(0) }); + const priceyAged = job({ id: 'priceyAged', engine: 'codex', createdAt: iso(40 * 60_000) }); + const quality = (e: string | undefined) => (e === 'devin' ? 1 : 0); + const c = ctx({ engineQuality: quality }); + expect(selectJob([cheapFresh, priceyAged], fac(), c)?.id).toBe('priceyAged'); + }); +}); + +describe('scheduler §2 — hard engine-availability gate', () => { + it('excludes a concrete-engine job when the engine is unavailable (e.g. breaker open)', () => { + const codexJob = job({ id: 'codex', engine: 'codex' }); + const blocked = ctx({ isEngineAvailable: e => e !== 'codex' }); + expect(selectJob([codexJob], fac(), blocked)).toBeNull(); + expect(selectJob([codexJob], fac(), ctx())?.id).toBe('codex'); // available by default + }); + + it('routes around an unavailable engine to an available candidate', () => { + const codexJob = job({ id: 'codex', engine: 'codex' }); + const devinJob = job({ id: 'devin', engine: 'devin' }); + const c = ctx({ isEngineAvailable: e => e !== 'codex' }); + expect(selectJob([codexJob, devinJob], fac(), c)?.id).toBe('devin'); + }); + + it('never gates a job without a concrete engine (resolves on the runner)', () => { + const abstract = job({ id: 'abs' }); // no concrete engine + const c = ctx({ isEngineAvailable: () => false }); + expect(selectJob([abstract], fac(), c)?.id).toBe('abs'); + }); +}); + describe('scheduler §7 — breakdown & determinism', () => { it('breakdown is per-weighted-term and sums to the score', () => { const j = job({ id: 'j', priority: 'high', budget: { usd: 3 }, createdAt: iso(90_000) }); diff --git a/services/platform-service/src/modules/fleet/scheduler.ts b/services/platform-service/src/modules/fleet/scheduler.ts index 2da1e6c3..a0f45a1a 100644 --- a/services/platform-service/src/modules/fleet/scheduler.ts +++ b/services/platform-service/src/modules/fleet/scheduler.ts @@ -47,6 +47,10 @@ export interface SchedulerWeights { /** w6 — starvation: subtracts a freshness penalty that decays as a job ages, * so an aged job outranks an equally-prioritised fresh one (anti-starvation). */ starvation: number; + /** w7 — engine quality: a SOFT preference for the historically cheaper/faster + * engine among eligible candidates (§2). Contributes 0 unless the caller injects + * a `ctx.engineQuality` lookup, so it never alters default scoring. */ + engineQuality: number; } /** @@ -62,6 +66,10 @@ export const DEFAULT_WEIGHTS: SchedulerWeights = { costFit: 0.75, health: 1.0, starvation: 1.5, + // A gentle nudge (0.4) — below affinity — so cost/latency history only breaks + // near-ties between otherwise-comparable engines; it never overrides hard signals. + // Inert (contributes 0) unless `ctx.engineQuality` is supplied. + engineQuality: 0.4, }; /** Aging config for the starvation term (fixed this phase). */ @@ -108,6 +116,20 @@ export interface SchedulerContext { costCeilingUsd?: number; /** Override the starvation aging config (fixed defaults otherwise). */ starvation?: StarvationConfig; + /** + * SOFT engine-quality lookup (§2): maps a candidate's engine to a [0,1] score + * where 1 = historically cheapest/fastest and lower = more expensive/slower. + * Omitted ⇒ the engineQuality term contributes 0 (default scoring is unchanged). + * The coordinator derives this from per-engine run history (cost + duration). + */ + engineQuality?: (engine: string | undefined) => number; + /** + * HARD engine-availability gate (§2): when supplied, a job pinned to a concrete + * `engine` is eligible only if this returns true for that engine. The coordinator + * composes the per-engine circuit breaker and per-engine budget here. It can only + * RESTRICT routing (never force one). Omitted ⇒ all engines available. + */ + isEngineAvailable?: (engine: string) => boolean; } /** Per-term, already-weighted contributions. Sums to `score` (starvation signed −). */ @@ -118,6 +140,7 @@ export interface ScoreBreakdown { costFit: number; health: number; starvation: number; + engineQuality: number; } export interface ScoredCandidate { @@ -229,6 +252,31 @@ function starvationPenaltyTerm(job: FleetJobDoc, ctx: SchedulerContext): number return clamp01(1 - aged / cfg.buckets); } +/** The engine a candidate will (most likely) run on: an explicit concrete engine, + * else the first prefers-engine hint, else the abstract class. Used by the soft + * engine-quality term and the hard availability gate. */ +export function resolveJobEngine(job: FleetJobDoc): string | undefined { + return job.engine ?? job.manifestSnapshot?.prefersEngine?.[0] ?? job.engineClass; +} + +/** w7 term — soft engine-quality preference in [0,1]. Inert (0) unless the caller + * injects `ctx.engineQuality`; an unknown engine returns whatever the lookup says + * (the coordinator's lookup treats no-history engines as neutral = 1, so history + * can only DEMOTE demonstrably expensive/slow engines, never penalise new ones). */ +function engineQualityTerm(job: FleetJobDoc, ctx: SchedulerContext): number { + if (!ctx.engineQuality) return 0; + return clamp01(ctx.engineQuality(resolveJobEngine(job))); +} + +/** Hard availability gate: a concrete-engine job is eligible only when the injected + * predicate allows that engine. Jobs without a concrete engine resolve on the + * runner and are never gated here (mirrors `engineEligible`). No predicate ⇒ all. */ +export function engineAvailable(job: FleetJobDoc, ctx: SchedulerContext): boolean { + if (!ctx.isEngineAvailable) return true; + if (!job.engine) return true; + return ctx.isEngineAvailable(job.engine); +} + // ── Public scoring API ──────────────────────────────────────────────────────── /** @@ -249,6 +297,7 @@ export function scoreCandidate( costFit: weights.costFit * costFitTerm(job, ctx), health: weights.health * healthTerm(factory), starvation: -weights.starvation * starvationPenaltyTerm(job, ctx), + engineQuality: weights.engineQuality * engineQualityTerm(job, ctx), }; const score = breakdown.capabilityFit + @@ -256,7 +305,8 @@ export function scoreCandidate( breakdown.load + breakdown.costFit + breakdown.health + - breakdown.starvation; + breakdown.starvation + + breakdown.engineQuality; return { score, breakdown }; } @@ -296,6 +346,10 @@ export function resolveWeights( health: requestOverride?.health ?? productEntry?.health ?? DEFAULT_WEIGHTS.health, starvation: requestOverride?.starvation ?? productEntry?.starvation ?? DEFAULT_WEIGHTS.starvation, + engineQuality: + requestOverride?.engineQuality ?? + productEntry?.engineQuality ?? + DEFAULT_WEIGHTS.engineQuality, }; } @@ -398,6 +452,7 @@ export function selectJob( !isAwaitingRetryBackoff(job, ctx.now) && capabilitiesSubset(job.capabilities ?? [], factory.capabilities) && engineEligible(job, factory.capabilities) && + engineAvailable(job, ctx) && depsSatisfied(job) ); if (eligible.length === 0) return null;