feat(platform-service): Phase 2 scheduler/router core (§7) + wire into atomic claim

Add a pure, fixed-weight scoring engine that decides WHICH queued job a claiming
factory gets, and wire it into coordinator.claimNextJob (the atomic rev-CAS claim
in tryClaimJob is unchanged).

scheduler.ts (pure, synchronous, no I/O):
- scoreCandidate(job, factory, ctx, weights?) -> { score, breakdown }
  score = w1*capabilityFit + w2*affinity + w3*(1/(1+load)) + w4*costFit(budget)
        + w5*health - w6*starvationPenalty(age); breakdown is per-weighted-term
        and sums to score (explainability / Phase-3 readiness).
- selectJob(candidates, factory, ctx, weights?) -> FleetJobDoc | null
  filters to stage-eligible + deps-satisfied (injected pure predicate) +
  capability-subset (+ down-health floor), ranks by score, deterministic
  tie-break: higher priority -> older createdAt -> lower cost class.
- Fixed default weights + bucketed anti-starvation aging (Phase 3 = tunable
  weights + preemption; intentionally NOT built here).

coordinator.ts (candidate-ranking section only):
- claimNextJob now resolves deps (store-backed) into a pure predicate, builds the
  factory view + authoritative now, and selects via selectJob; tryClaimJob CAS /
  lease / fence logic untouched. ClaimContext gains additive optional scheduler
  inputs (health/load/seatLimit/factoryEngines/warmScopes/costCeilingUsd). The
  pure capability-subset predicate moved into scheduler.ts and is re-exported.

Tests: scheduler.test.ts (16) covers capability hard-filter, priority/age
tie-breaks, load, health (+ down floor), starvation, cost fit, affinity, breakdown
sum, determinism, empty/no-eligible. coordinator.test.ts adds score-driven
selection, health floor, and ordered drain; all prior fleet tests stay green.

Generated with [Devin](https://cli.devin.ai/docs)

Co-Authored-By: Devin <158243242+devin-ai-integration[bot]@users.noreply.github.com>
This commit is contained in:
saravanakumardb1 2026-05-29 23:03:40 -07:00
parent 6f6e005114
commit 7930e8b0bd
4 changed files with 576 additions and 21 deletions

View File

@ -279,4 +279,40 @@ describe('fleet coordinator', () => {
coord.submitJob(PID, input({ idempotencyKey: 'k', bodyMd: 'v2' })) coord.submitJob(PID, input({ idempotencyKey: 'k', bodyMd: 'v2' }))
).rejects.toBeInstanceOf(ConflictError); ).rejects.toBeInstanceOf(ConflictError);
}); });
// ── §7 SCORE-DRIVEN SELECTION (Phase 2 scheduler wired into claimNextJob) ──
it('selection now follows the §7 score: within-budget cost-fit beats an older same-priority job', async () => {
// jobB is submitted first (older) and is the same priority, but its budget blows
// the factory cost ceiling. The old priority+age rule would have taken the older
// jobB; the scorer prefers the within-budget jobA.
await coord.submitJob(
PID,
input({ idempotencyKey: 'B-old-expensive', priority: 'medium', budget: { usd: 100 } })
);
await coord.submitJob(
PID,
input({ idempotencyKey: 'A-new-cheap', priority: 'medium', budget: { usd: 5 } })
);
const claim = await coord.claimNextJob(factory({ costCeilingUsd: 10 }));
expect(claim?.job.idempotencyKey).toBe('A-new-cheap');
});
it('claimNextJob: a factory below the health floor (down) claims nothing; a healthy one does', async () => {
await coord.submitJob(PID, input());
expect(await coord.claimNextJob(factory({ health: 'down' }))).toBeNull();
const ok = await coord.claimNextJob(factory({ health: 'ok' }));
expect(ok).not.toBeNull();
});
it('claimNextJob drains in score order: highest priority first, then the next, then null', async () => {
await coord.submitJob(PID, input({ idempotencyKey: 'low', priority: 'low' }));
await coord.submitJob(PID, input({ idempotencyKey: 'crit', priority: 'critical' }));
expect((await coord.claimNextJob(factory({ factoryId: 'f1' })))?.job.idempotencyKey).toBe(
'crit'
);
expect((await coord.claimNextJob(factory({ factoryId: 'f2' })))?.job.idempotencyKey).toBe(
'low'
);
expect(await coord.claimNextJob(factory({ factoryId: 'f3' }))).toBeNull();
});
}); });

View File

@ -20,6 +20,7 @@
import { createHash } from 'node:crypto'; import { createHash } from 'node:crypto';
import { BadRequestError, ConflictError } from '../../lib/errors.js'; import { BadRequestError, ConflictError } from '../../lib/errors.js';
import * as repo from './repository.js'; import * as repo from './repository.js';
import { selectJob, type SchedulerContext, type SchedulerFactory } from './scheduler.js';
import { import {
ACTIVE_STAGES, ACTIVE_STAGES,
DEP_DONE_HARD, DEP_DONE_HARD,
@ -38,11 +39,10 @@ export function contentHash(bodyMd: string): string {
return createHash('sha256').update(bodyMd).digest('hex'); return createHash('sha256').update(bodyMd).digest('hex');
} }
/** Every required capability token must be advertised by the factory. */ // The capability-subset predicate + the §7 scoring/selection engine live in the
export function capabilitiesSubset(required: string[], available: string[]): boolean { // pure, unit-tested scheduler module. Re-export the predicate here to preserve the
const set = new Set(available); // coordinator's public surface (claimNextJob now ranks candidates via selectJob).
return required.every(token => set.has(token)); export { capabilitiesSubset } from './scheduler.js';
}
// ── Dependency evaluation (§5) ──────────────────────────────────────────────── // ── Dependency evaluation (§5) ────────────────────────────────────────────────
@ -228,6 +228,19 @@ export interface ClaimContext {
factoryId: string; factoryId: string;
capabilities: string[]; capabilities: string[];
leaseSeconds: number; leaseSeconds: number;
// ── §7 scheduler inputs (additive, optional — sane defaults below) ──
/** Factory health; below the floor (`down`) the factory claims nothing. */
health?: 'ok' | 'degraded' | 'down';
/** Current factory load (busier ⇒ lower score). */
load?: number;
/** Per-engine seat limit (carried for scoring/future seat-aware routing). */
seatLimit?: number;
/** Engines this factory runs (prefers-engine affinity). */
factoryEngines?: string[];
/** Scopes (repos/locks) the factory has warm (repo-stickiness affinity). */
warmScopes?: string[];
/** Factory/budget cost ceiling in USD (cost-fit). */
costCeilingUsd?: number;
} }
export interface ClaimResult { export interface ClaimResult {
@ -236,12 +249,18 @@ export interface ClaimResult {
run: FleetRunDoc; run: FleetRunDoc;
} }
/** A job is eligible for a factory iff queued/blocked-with-met-deps + caps subset. */ /**
async function eligibleForClaim(job: FleetJobDoc, factoryCaps: string[]): Promise<boolean> { * Resolve which stage-eligible (queued/blocked) jobs currently have their deps
if (job.stage !== 'queued' && job.stage !== 'blocked') return false; * satisfied. This is the store-backed (async) half of eligibility; the pure
if (!capabilitiesSubset(job.capabilities, factoryCaps)) return false; * capability-subset filter + §7 scoring + tie-break are applied by `selectJob`.
const unmet = await unmetDeps(job); */
return unmet.length === 0; async function depsSatisfiedIds(jobs: FleetJobDoc[]): Promise<Set<string>> {
const satisfied = new Set<string>();
for (const job of jobs) {
if (job.stage !== 'queued' && job.stage !== 'blocked') continue;
if ((await unmetDeps(job)).length === 0) satisfied.add(job.id);
}
return satisfied;
} }
/** /**
@ -314,19 +333,36 @@ export async function tryClaimJob(
return { ok: true, doc: { job: claimed.doc, lease, run } }; return { ok: true, doc: { job: claimed.doc, lease, run } };
} }
/** Select the highest-priority, oldest eligible job and atomically claim it. */ /**
* Select the best eligible job via the §7 scoring engine and atomically claim it.
*
* The coordinator owns all I/O: it lists candidates, resolves deps (store-backed)
* into a pure predicate, and builds the factory view + authoritative `now`. The
* pure `selectJob` then applies the capability hard-filter + fixed-weight scoring
* + deterministic tie-break (priority age cost class). The atomic
* single-winner guarantee remains entirely in `tryClaimJob`'s rev compare-and-swap,
* which is unchanged on conflict we re-select and retry.
*/
export async function claimNextJob(ctx: ClaimContext): Promise<ClaimResult | null> { export async function claimNextJob(ctx: ClaimContext): Promise<ClaimResult | null> {
const factory: SchedulerFactory = {
capabilities: ctx.capabilities,
health: ctx.health ?? 'ok',
load: ctx.load ?? 0,
seatLimit: ctx.seatLimit ?? 1,
};
for (let i = 0; i < CLAIM_MAX_RETRIES; i++) { for (let i = 0; i < CLAIM_MAX_RETRIES; i++) {
const candidates = await repo.listJobs({ productId: ctx.productId }); const candidates = await repo.listJobs({ productId: ctx.productId });
const eligible: FleetJobDoc[] = []; const satisfied = await depsSatisfiedIds(candidates);
for (const job of candidates) { const schedulerCtx: SchedulerContext = {
if (await eligibleForClaim(job, ctx.capabilities)) eligible.push(job); now: Date.now(), // coordinator-authoritative time
} isDepsSatisfied: job => satisfied.has(job.id),
if (eligible.length === 0) return null; factoryEngines: ctx.factoryEngines,
eligible.sort( warmScopes: ctx.warmScopes,
(a, b) => a.priorityOrder - b.priorityOrder || a.createdAt.localeCompare(b.createdAt) costCeilingUsd: ctx.costCeilingUsd,
); };
const result = await tryClaimJob(eligible[0], ctx); const pick = selectJob(candidates, factory, schedulerCtx);
if (!pick) return null;
const result = await tryClaimJob(pick, ctx);
if (result.ok) return result.doc; if (result.ok) return result.doc;
if (result.reason === 'not_found') continue; if (result.reason === 'not_found') continue;
// conflict: another factory won this version — re-select and retry // conflict: another factory won this version — re-select and retry

View File

@ -0,0 +1,203 @@
/**
* Fleet scheduler / router core (§7) pure, deterministic unit tests.
* No datastore, no real clock: time is injected via ctx.now.
*/
import { describe, expect, it } from 'vitest';
import {
DEFAULT_WEIGHTS,
scoreCandidate,
selectJob,
capabilitiesSubset,
type SchedulerContext,
type SchedulerFactory,
} from './scheduler.js';
import { PRIORITY_ORDER, type FleetJobDoc, type FleetPriority } from './types.js';
const NOW = Date.parse('2026-05-29T12:00:00.000Z');
const iso = (msAgo: number) => new Date(NOW - msAgo).toISOString();
/** Build a minimal valid FleetJobDoc for scoring. */
function job(over: Partial<FleetJobDoc> & { id: string }): FleetJobDoc {
const priority: FleetPriority = over.priority ?? 'medium';
const manifest: FleetJobDoc['manifestSnapshot'] = {
priority,
capabilities: over.capabilities ?? [],
prefersEngine: [],
allowedScope: [],
deps: [],
...(over.manifestSnapshot ?? {}),
};
return {
productId: 'lysnrai',
stage: 'queued',
idempotencyKey: over.id,
contentHash: 'h',
bodyMd: '# task',
capabilities: [],
deps: [],
kind: 'leaf',
attempts: 0,
leaseEpoch: 0,
rev: 0,
createdAt: iso(0),
updatedAt: iso(0),
...over,
priority,
priorityOrder: over.priorityOrder ?? PRIORITY_ORDER[priority],
manifestSnapshot: manifest,
};
}
const fac = (over: Partial<SchedulerFactory> = {}): SchedulerFactory => ({
capabilities: [],
health: 'ok',
load: 0,
...over,
});
const ctx = (over: Partial<SchedulerContext> = {}): SchedulerContext => ({ now: NOW, ...over });
describe('scheduler §7 — capability hard filter', () => {
it('a factory missing a required capability never selects that job', () => {
const needsMac = job({ id: 'needs-mac', capabilities: ['os:mac'] });
expect(selectJob([needsMac], fac({ capabilities: [] }), ctx())).toBeNull();
expect(selectJob([needsMac], fac({ capabilities: ['os:mac', 'has:git'] }), ctx())?.id).toBe(
'needs-mac'
);
});
it('among candidates, only capability-subset jobs are eligible', () => {
const a = job({ id: 'a', capabilities: ['os:mac'] });
const b = job({ id: 'b', capabilities: ['os:linux'] }); // factory cannot run this
const pick = selectJob([a, b], fac({ capabilities: ['os:mac'] }), ctx());
expect(pick?.id).toBe('a');
});
it('capabilitiesSubset predicate', () => {
expect(capabilitiesSubset(['a', 'b'], ['a', 'b', 'c'])).toBe(true);
expect(capabilitiesSubset(['a', 'z'], ['a', 'b'])).toBe(false);
expect(capabilitiesSubset([], ['a'])).toBe(true);
});
});
describe('scheduler §7 — priority + age tie-breaks (all else equal)', () => {
it('priority dominates when scores tie', () => {
const low = job({ id: 'low', priority: 'low' });
const med = job({ id: 'med', priority: 'medium' });
const high = job({ id: 'high', priority: 'high' });
const crit = job({ id: 'crit', priority: 'critical' });
expect(selectJob([low, med, high, crit], fac(), ctx())?.id).toBe('crit');
});
it('age breaks ties deterministically — older wins among equal priority', () => {
// both within the same aging bucket ⇒ equal starvation ⇒ score tie ⇒ age tie-break
const older = job({ id: 'older', priority: 'medium', createdAt: iso(5_000) });
const newer = job({ id: 'newer', priority: 'medium', createdAt: iso(1_000) });
expect(selectJob([newer, older], fac(), ctx())?.id).toBe('older');
});
});
describe('scheduler §7 — load & health', () => {
it('a higher-load factory scores lower (1/(1+load))', () => {
const j = job({ id: 'j' });
const idle = scoreCandidate(j, fac({ load: 0 }), ctx()).score;
const busy = scoreCandidate(j, fac({ load: 5 }), ctx()).score;
expect(idle).toBeGreaterThan(busy);
});
it('degraded health scores lower than ok', () => {
const j = job({ id: 'j' });
const ok = scoreCandidate(j, fac({ health: 'ok' }), ctx()).score;
const degraded = scoreCandidate(j, fac({ health: 'degraded' }), ctx()).score;
expect(ok).toBeGreaterThan(degraded);
});
it('a down factory is filtered out entirely (health floor)', () => {
const j = job({ id: 'j' });
expect(selectJob([j], fac({ health: 'down' }), ctx())).toBeNull();
});
});
describe('scheduler §7 — starvation (anti-starvation aging)', () => {
it('an aged low-priority job outranks a fresh low-priority one', () => {
const fresh = job({ id: 'fresh', priority: 'low', createdAt: iso(0) });
const aged = job({ id: 'aged', priority: 'low', createdAt: iso(40 * 60_000) });
expect(selectJob([fresh, aged], fac(), ctx())?.id).toBe('aged');
// and the aged job's standalone score is higher
expect(scoreCandidate(aged, fac(), ctx()).score).toBeGreaterThan(
scoreCandidate(fresh, fac(), ctx()).score
);
});
});
describe('scheduler §7 — cost fit', () => {
it('a job exceeding the cost ceiling is penalized and ranked last', () => {
const within = job({ id: 'within', budget: { usd: 5 } });
const over = job({ id: 'over', budget: { usd: 100 } });
const c = ctx({ costCeilingUsd: 10 });
expect(selectJob([over, within], fac(), c)?.id).toBe('within');
expect(scoreCandidate(over, fac(), c).score).toBeLessThan(
scoreCandidate(within, fac(), c).score
);
});
it('no ceiling ⇒ cost is neutral (full costFit)', () => {
const j = job({ id: 'j', budget: { usd: 999 } });
expect(scoreCandidate(j, fac(), ctx()).breakdown.costFit).toBeCloseTo(DEFAULT_WEIGHTS.costFit);
});
});
describe('scheduler §7 — affinity', () => {
it('prefers-engine match raises affinity', () => {
const j = job({
id: 'j',
manifestSnapshot: { prefersEngine: ['devin'] } as FleetJobDoc['manifestSnapshot'],
});
const hit = scoreCandidate(j, fac(), ctx({ factoryEngines: ['devin'] })).breakdown.affinity;
const miss = scoreCandidate(j, fac(), ctx({ factoryEngines: ['claude'] })).breakdown.affinity;
expect(hit).toBeGreaterThan(miss);
});
});
describe('scheduler §7 — breakdown & determinism', () => {
it('breakdown is per-weighted-term and sums to the score', () => {
const j = job({ id: 'j', priority: 'high', budget: { usd: 3 }, createdAt: iso(90_000) });
const c = ctx({ costCeilingUsd: 10, factoryEngines: ['devin'] });
const { score, breakdown } = scoreCandidate(j, fac({ load: 2, health: 'degraded' }), c);
const sum =
breakdown.capabilityFit +
breakdown.affinity +
breakdown.load +
breakdown.costFit +
breakdown.health +
breakdown.starvation;
expect(sum).toBeCloseTo(score, 12);
expect(breakdown.starvation).toBeLessThanOrEqual(0); // signed penalty
});
it('selectJob is deterministic — same inputs ⇒ same pick across runs', () => {
const cands = [
job({ id: 'a', priority: 'medium', createdAt: iso(1_000) }),
job({ id: 'b', priority: 'high', createdAt: iso(2_000) }),
job({ id: 'c', priority: 'high', createdAt: iso(3_000) }),
];
const picks = Array.from({ length: 5 }, () => selectJob(cands, fac(), ctx())?.id);
expect(new Set(picks).size).toBe(1);
// highest priority (b/c are 'high' > a's 'medium'); b vs c tie on score+priority,
// so the age tie-break wins → c is older (created 3s ago vs b's 2s ago).
expect(picks[0]).toBe('c');
});
it('empty candidates ⇒ null; no eligible (none queued/blocked) ⇒ null', () => {
expect(selectJob([], fac(), ctx())).toBeNull();
const assigned = job({ id: 'x', stage: 'assigned' });
expect(selectJob([assigned], fac(), ctx())).toBeNull();
});
it('blocked jobs are eligible only when the deps predicate says so', () => {
const blocked = job({ id: 'blk', stage: 'blocked' });
expect(selectJob([blocked], fac(), ctx({ isDepsSatisfied: () => false }))).toBeNull();
expect(selectJob([blocked], fac(), ctx({ isDepsSatisfied: () => true }))?.id).toBe('blk');
});
});

View File

@ -0,0 +1,280 @@
/**
* Fleet scheduler / router core the deterministic, fixed-weight scoring engine
* that decides WHICH job a claiming factory gets (Phase 2, §7 of the gigafactory
* roadmap).
*
* This module is PURE and SYNCHRONOUS: no datastore calls, no clock reads, no env.
* Everything it needs is passed in health/load/seatLimit from the factory view,
* age from `job.createdAt` vs `ctx.now` (coordinator-authoritative time), and the
* deps-satisfied predicate (the coordinator resolves deps asynchronously and hands
* us a pure predicate). That keeps the scoring fully unit-testable and lets the
* coordinator own all I/O and the atomic compare-and-swap claim.
*
* Phasing (§7): Phase 2 ships the deterministic filter + fixed-weight scoring.
* Phase 3 adds tunable weights, preemption, and the explainability UI; Phase 5
* learns the weights. We deliberately do NOT build tunable weights or preemption
* here only the fixed-weight core + a per-term breakdown for explainability.
*
* Scoring formula (§7):
* score = w1·capabilityFit
* + w2·affinity(prefersEngine / repo-stickiness)
* + w3·(1 / (1 + load))
* + w4·costFit(budget)
* + w5·health
* w6·starvationPenalty(age)
*
* Selection: filter to deps-satisfied + capability-subset (+ health floor), rank
* by score, then a deterministic tie-break: higher priority older createdAt
* lower cost class.
*/
import type { FactoryHealth, FleetJobDoc } from './types.js';
// ── Weights (fixed this phase; overridable via a passed-in object, NOT env) ──
/** Fixed-weight config for the §7 scoring terms. Phase 3 makes these tunable. */
export interface SchedulerWeights {
/** w1 — hard capability fit (satisfied requirement ratio). */
capabilityFit: number;
/** w2 — affinity: prefers-engine match + warm-scope (repo) stickiness. */
affinity: number;
/** w3 — inverse load `1/(1+load)`; a busier factory scores lower. */
load: number;
/** w4 — cost fit: penalize jobs whose budget exceeds the factory cost ceiling. */
costFit: number;
/** w5 — factory health (ok=1, degraded=0.5; `down` is filtered out, not scored). */
health: number;
/** w6 starvation: subtracts a freshness penalty that decays as a job ages,
* so an aged job outranks an equally-prioritised fresh one (anti-starvation). */
starvation: number;
}
/**
* Phase-2 fixed defaults. Rationale: capability + health are hard signals (1.0);
* load matters strongly (1.0); cost is a moderate guard (0.75); affinity is a
* gentle nudge (0.5); starvation is weighted high enough (1.5) to lift an aged
* job above a same-priority fresh peer. Tunable per-product weights are Phase 3.
*/
export const DEFAULT_WEIGHTS: SchedulerWeights = {
capabilityFit: 1.0,
affinity: 0.5,
load: 1.0,
costFit: 0.75,
health: 1.0,
starvation: 1.5,
};
/** Aging config for the starvation term (fixed this phase). */
export interface StarvationConfig {
/** Width of one aging bucket, in ms. Ages within a bucket score identically,
* so jobs submitted close together tie (and fall through to the priority
* tie-break) rather than being separated by sub-second noise. */
intervalMs: number;
/** Number of buckets after which the freshness penalty fully decays to 0. */
buckets: number;
}
export const DEFAULT_STARVATION: StarvationConfig = {
intervalMs: 60_000, // 1 minute
buckets: 30, // fully de-penalised after ~30 minutes of waiting
};
// ── Factory view + context the engine consumes ──────────────────────────────
/**
* The factory fields the scorer needs a structural subset of FleetFactoryDoc
* (so a real factory doc is assignable), but also satisfiable from the claim
* context so the coordinator need not always load the full doc.
*/
export interface SchedulerFactory {
capabilities: string[];
health?: FactoryHealth;
load?: number;
seatLimit?: number;
}
/** Pure context: authoritative time + injected deps predicate + affinity/cost hints. */
export interface SchedulerContext {
/** Coordinator-authoritative now (ms epoch). Drives the starvation/age term. */
now: number;
/** Deps gate (the coordinator resolves deps async and passes a pure predicate).
* Omitted treated as satisfied. */
isDepsSatisfied?: (job: FleetJobDoc) => boolean;
/** Engines the claiming factory runs (for prefers-engine affinity). */
factoryEngines?: string[];
/** Scopes (repos/locks) the factory has warm (for repo-stickiness affinity). */
warmScopes?: string[];
/** The factory/budget cost ceiling in USD for cost-fit. Omitted ⇒ unconstrained. */
costCeilingUsd?: number;
/** Override the starvation aging config (fixed defaults otherwise). */
starvation?: StarvationConfig;
}
/** Per-term, already-weighted contributions. Sums to `score` (starvation signed ). */
export interface ScoreBreakdown {
capabilityFit: number;
affinity: number;
load: number;
costFit: number;
health: number;
starvation: number;
}
export interface ScoredCandidate {
score: number;
breakdown: ScoreBreakdown;
}
// ── Pure predicates / helpers ────────────────────────────────────────────────
/** Every required capability token must be advertised by the factory (hard gate). */
export function capabilitiesSubset(required: string[], available: string[]): boolean {
const set = new Set(available);
return required.every(token => set.has(token));
}
function overlaps(a: readonly string[], b: readonly string[]): boolean {
if (a.length === 0 || b.length === 0) return false;
const set = new Set(b);
return a.some(x => set.has(x));
}
const HEALTH_SCORE: Record<FactoryHealth, number> = { ok: 1, degraded: 0.5, down: 0 };
function clamp01(n: number): number {
if (n < 0) return 0;
if (n > 1) return 1;
return n;
}
/** w1 term — satisfied-requirement ratio (1 when the hard subset holds). */
function capabilityFitTerm(job: FleetJobDoc, factory: SchedulerFactory): number {
const required = job.capabilities ?? [];
if (required.length === 0) return 1;
const have = new Set(factory.capabilities);
const matched = required.reduce((n, cap) => (have.has(cap) ? n + 1 : n), 0);
return matched / required.length;
}
/** w2 term — prefers-engine match + warm-scope stickiness, each contributing half. */
function affinityTerm(job: FleetJobDoc, ctx: SchedulerContext): number {
const prefers = job.manifestSnapshot?.prefersEngine ?? [];
const scopes = job.manifestSnapshot?.allowedScope ?? [];
const prefersScore = prefers.length > 0 && overlaps(prefers, ctx.factoryEngines ?? []) ? 1 : 0;
const stickyScore = scopes.length > 0 && overlaps(scopes, ctx.warmScopes ?? []) ? 1 : 0;
return clamp01((prefersScore + stickyScore) / 2);
}
/** w3 term — inverse load. */
function loadTerm(factory: SchedulerFactory): number {
const load = factory.load ?? 0;
return 1 / (1 + Math.max(0, load));
}
/** w4 term 1 when within the cost ceiling (or unconstrained), decays toward 0
* the further a job's budget exceeds the ceiling. */
function costFitTerm(job: FleetJobDoc, ctx: SchedulerContext): number {
const budget = job.budget?.usd;
const ceiling = ctx.costCeilingUsd;
if (budget === undefined || ceiling === undefined) return 1;
if (ceiling <= 0) return budget <= 0 ? 1 : 0;
if (budget <= ceiling) return 1;
return clamp01(ceiling / budget);
}
/** w5 term — factory health as a [0,1] score. */
function healthTerm(factory: SchedulerFactory): number {
return HEALTH_SCORE[factory.health ?? 'ok'];
}
/** Freshness penalty in [0,1]: 1 for a brand-new job, decaying to 0 as it ages
* past `buckets` aging intervals. Subtracted from the score, so an aged job
* loses less and rises above an equally-prioritised fresh peer (anti-starvation).
* Bucketing makes near-simultaneous submissions tie ( priority tie-break). */
function starvationPenaltyTerm(job: FleetJobDoc, ctx: SchedulerContext): number {
const cfg = ctx.starvation ?? DEFAULT_STARVATION;
const ageMs = Math.max(0, ctx.now - Date.parse(job.createdAt));
const aged = Math.floor(ageMs / cfg.intervalMs);
return clamp01(1 - aged / cfg.buckets);
}
// ── Public scoring API ────────────────────────────────────────────────────────
/**
* Score one (job, factory) pair. Returns the total `score` plus the per-term,
* already-weighted `breakdown` (the six terms sum to `score`) for explainability
* (§7 / Phase-3 readiness). Pure + synchronous.
*/
export function scoreCandidate(
job: FleetJobDoc,
factory: SchedulerFactory,
ctx: SchedulerContext,
weights: SchedulerWeights = DEFAULT_WEIGHTS
): ScoredCandidate {
const breakdown: ScoreBreakdown = {
capabilityFit: weights.capabilityFit * capabilityFitTerm(job, factory),
affinity: weights.affinity * affinityTerm(job, ctx),
load: weights.load * loadTerm(factory),
costFit: weights.costFit * costFitTerm(job, ctx),
health: weights.health * healthTerm(factory),
starvation: -weights.starvation * starvationPenaltyTerm(job, ctx),
};
const score =
breakdown.capabilityFit +
breakdown.affinity +
breakdown.load +
breakdown.costFit +
breakdown.health +
breakdown.starvation;
return { score, breakdown };
}
/** Cost class used as the final tie-break (lower USD budget = lower class first). */
function costClass(job: FleetJobDoc): number {
return job.budget?.usd ?? 0;
}
/** Scores within EPSILON are treated as a tie and fall through to the §7 tie-break. */
const SCORE_EPSILON = 1e-9;
/**
* Pick the best job for `factory` from `candidates`:
* 1. filter to stage queued/blocked + deps-satisfied (ctx predicate) +
* hard capability-subset, and drop everything if the factory is `down`
* (health floor filtered, not merely down-weighted, per §7);
* 2. rank by `scoreCandidate` (descending);
* 3. deterministic tie-break: higher priority older createdAt lower cost class.
* Returns `null` when nothing is eligible. Pure + synchronous + deterministic.
*/
export function selectJob(
candidates: FleetJobDoc[],
factory: SchedulerFactory,
ctx: SchedulerContext,
weights: SchedulerWeights = DEFAULT_WEIGHTS
): FleetJobDoc | null {
// Health floor: a factory below the floor is excluded entirely (§7).
if ((factory.health ?? 'ok') === 'down') return null;
const depsSatisfied = ctx.isDepsSatisfied ?? (() => true);
const eligible = candidates.filter(
job =>
(job.stage === 'queued' || job.stage === 'blocked') &&
capabilitiesSubset(job.capabilities ?? [], factory.capabilities) &&
depsSatisfied(job)
);
if (eligible.length === 0) return null;
const scored = eligible.map(job => ({
job,
score: scoreCandidate(job, factory, ctx, weights).score,
}));
scored.sort((a, b) => {
if (Math.abs(b.score - a.score) > SCORE_EPSILON) return b.score - a.score; // higher score first
if (a.job.priorityOrder !== b.job.priorityOrder)
return a.job.priorityOrder - b.job.priorityOrder; // higher priority (lower order) first
const ageCmp = a.job.createdAt.localeCompare(b.job.createdAt); // older (earlier ISO) first
if (ageCmp !== 0) return ageCmp;
return costClass(a.job) - costClass(b.job); // lower cost class first
});
return scored[0].job;
}