From 7930e8b0bda00237787f2b961cb7c057eda6ec04 Mon Sep 17 00:00:00 2001 From: saravanakumardb1 Date: Fri, 29 May 2026 23:03:40 -0700 Subject: [PATCH 1/2] =?UTF-8?q?feat(platform-service):=20Phase=202=20sched?= =?UTF-8?q?uler/router=20core=20(=C2=A77)=20+=20wire=20into=20atomic=20cla?= =?UTF-8?q?im?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a pure, fixed-weight scoring engine that decides WHICH queued job a claiming factory gets, and wire it into coordinator.claimNextJob (the atomic rev-CAS claim in tryClaimJob is unchanged). scheduler.ts (pure, synchronous, no I/O): - scoreCandidate(job, factory, ctx, weights?) -> { score, breakdown } score = w1*capabilityFit + w2*affinity + w3*(1/(1+load)) + w4*costFit(budget) + w5*health - w6*starvationPenalty(age); breakdown is per-weighted-term and sums to score (explainability / Phase-3 readiness). - selectJob(candidates, factory, ctx, weights?) -> FleetJobDoc | null filters to stage-eligible + deps-satisfied (injected pure predicate) + capability-subset (+ down-health floor), ranks by score, deterministic tie-break: higher priority -> older createdAt -> lower cost class. - Fixed default weights + bucketed anti-starvation aging (Phase 3 = tunable weights + preemption; intentionally NOT built here). coordinator.ts (candidate-ranking section only): - claimNextJob now resolves deps (store-backed) into a pure predicate, builds the factory view + authoritative now, and selects via selectJob; tryClaimJob CAS / lease / fence logic untouched. ClaimContext gains additive optional scheduler inputs (health/load/seatLimit/factoryEngines/warmScopes/costCeilingUsd). The pure capability-subset predicate moved into scheduler.ts and is re-exported. Tests: scheduler.test.ts (16) covers capability hard-filter, priority/age tie-breaks, load, health (+ down floor), starvation, cost fit, affinity, breakdown sum, determinism, empty/no-eligible. 
coordinator.test.ts adds score-driven selection, health floor, and ordered drain; all prior fleet tests stay green. Generated with [Devin](https://cli.devin.ai/docs) Co-Authored-By: Devin <158243242+devin-ai-integration[bot]@users.noreply.github.com> --- .../src/modules/fleet/coordinator.test.ts | 36 +++ .../src/modules/fleet/coordinator.ts | 78 +++-- .../src/modules/fleet/scheduler.test.ts | 203 +++++++++++++ .../src/modules/fleet/scheduler.ts | 280 ++++++++++++++++++ 4 files changed, 576 insertions(+), 21 deletions(-) create mode 100644 services/platform-service/src/modules/fleet/scheduler.test.ts create mode 100644 services/platform-service/src/modules/fleet/scheduler.ts diff --git a/services/platform-service/src/modules/fleet/coordinator.test.ts b/services/platform-service/src/modules/fleet/coordinator.test.ts index 74966e2e..78ecf83f 100644 --- a/services/platform-service/src/modules/fleet/coordinator.test.ts +++ b/services/platform-service/src/modules/fleet/coordinator.test.ts @@ -279,4 +279,40 @@ describe('fleet coordinator', () => { coord.submitJob(PID, input({ idempotencyKey: 'k', bodyMd: 'v2' })) ).rejects.toBeInstanceOf(ConflictError); }); + + // ── §7 SCORE-DRIVEN SELECTION (Phase 2 scheduler wired into claimNextJob) ── + it('selection now follows the §7 score: within-budget cost-fit beats an older same-priority job', async () => { + // jobB is submitted first (older) and is the same priority, but its budget blows + // the factory cost ceiling. The old priority+age rule would have taken the older + // jobB; the scorer prefers the within-budget jobA. 
+ await coord.submitJob( + PID, + input({ idempotencyKey: 'B-old-expensive', priority: 'medium', budget: { usd: 100 } }) + ); + await coord.submitJob( + PID, + input({ idempotencyKey: 'A-new-cheap', priority: 'medium', budget: { usd: 5 } }) + ); + const claim = await coord.claimNextJob(factory({ costCeilingUsd: 10 })); + expect(claim?.job.idempotencyKey).toBe('A-new-cheap'); + }); + + it('claimNextJob: a factory below the health floor (down) claims nothing; a healthy one does', async () => { + await coord.submitJob(PID, input()); + expect(await coord.claimNextJob(factory({ health: 'down' }))).toBeNull(); + const ok = await coord.claimNextJob(factory({ health: 'ok' })); + expect(ok).not.toBeNull(); + }); + + it('claimNextJob drains in score order: highest priority first, then the next, then null', async () => { + await coord.submitJob(PID, input({ idempotencyKey: 'low', priority: 'low' })); + await coord.submitJob(PID, input({ idempotencyKey: 'crit', priority: 'critical' })); + expect((await coord.claimNextJob(factory({ factoryId: 'f1' })))?.job.idempotencyKey).toBe( + 'crit' + ); + expect((await coord.claimNextJob(factory({ factoryId: 'f2' })))?.job.idempotencyKey).toBe( + 'low' + ); + expect(await coord.claimNextJob(factory({ factoryId: 'f3' }))).toBeNull(); + }); }); diff --git a/services/platform-service/src/modules/fleet/coordinator.ts b/services/platform-service/src/modules/fleet/coordinator.ts index 1d259c54..6f333a16 100644 --- a/services/platform-service/src/modules/fleet/coordinator.ts +++ b/services/platform-service/src/modules/fleet/coordinator.ts @@ -20,6 +20,7 @@ import { createHash } from 'node:crypto'; import { BadRequestError, ConflictError } from '../../lib/errors.js'; import * as repo from './repository.js'; +import { selectJob, type SchedulerContext, type SchedulerFactory } from './scheduler.js'; import { ACTIVE_STAGES, DEP_DONE_HARD, @@ -38,11 +39,10 @@ export function contentHash(bodyMd: string): string { return 
createHash('sha256').update(bodyMd).digest('hex'); } -/** Every required capability token must be advertised by the factory. */ -export function capabilitiesSubset(required: string[], available: string[]): boolean { - const set = new Set(available); - return required.every(token => set.has(token)); -} +// The capability-subset predicate + the §7 scoring/selection engine live in the +// pure, unit-tested scheduler module. Re-export the predicate here to preserve the +// coordinator's public surface (claimNextJob now ranks candidates via selectJob). +export { capabilitiesSubset } from './scheduler.js'; // ── Dependency evaluation (§5) ──────────────────────────────────────────────── @@ -228,6 +228,19 @@ export interface ClaimContext { factoryId: string; capabilities: string[]; leaseSeconds: number; + // ── §7 scheduler inputs (additive, optional — sane defaults below) ── + /** Factory health; below the floor (`down`) the factory claims nothing. */ + health?: 'ok' | 'degraded' | 'down'; + /** Current factory load (busier ⇒ lower score). */ + load?: number; + /** Per-engine seat limit (carried for scoring/future seat-aware routing). */ + seatLimit?: number; + /** Engines this factory runs (prefers-engine affinity). */ + factoryEngines?: string[]; + /** Scopes (repos/locks) the factory has warm (repo-stickiness affinity). */ + warmScopes?: string[]; + /** Factory/budget cost ceiling in USD (cost-fit). */ + costCeilingUsd?: number; } export interface ClaimResult { @@ -236,12 +249,18 @@ export interface ClaimResult { run: FleetRunDoc; } -/** A job is eligible for a factory iff queued/blocked-with-met-deps + caps subset. 
*/ -async function eligibleForClaim(job: FleetJobDoc, factoryCaps: string[]): Promise<boolean> { - if (job.stage !== 'queued' && job.stage !== 'blocked') return false; - if (!capabilitiesSubset(job.capabilities, factoryCaps)) return false; - const unmet = await unmetDeps(job); - return unmet.length === 0; +/** + * Resolve which stage-eligible (queued/blocked) jobs currently have their deps + * satisfied. This is the store-backed (async) half of eligibility; the pure + * capability-subset filter + §7 scoring + tie-break are applied by `selectJob`. + */ +async function depsSatisfiedIds(jobs: FleetJobDoc[]): Promise<Set<string>> { + const satisfied = new Set<string>(); + for (const job of jobs) { + if (job.stage !== 'queued' && job.stage !== 'blocked') continue; + if ((await unmetDeps(job)).length === 0) satisfied.add(job.id); + } + return satisfied; } /** @@ -314,19 +333,36 @@ export async function tryClaimJob( return { ok: true, doc: { job: claimed.doc, lease, run } }; } -/** Select the highest-priority, oldest eligible job and atomically claim it. */ +/** + * Select the best eligible job via the §7 scoring engine and atomically claim it. + * + * The coordinator owns all I/O: it lists candidates, resolves deps (store-backed) + * into a pure predicate, and builds the factory view + authoritative `now`. The + * pure `selectJob` then applies the capability hard-filter + fixed-weight scoring + * + deterministic tie-break (priority → age → cost class). The atomic + * single-winner guarantee remains entirely in `tryClaimJob`'s rev compare-and-swap, + * which is unchanged — on conflict we re-select and retry. + */ export async function claimNextJob(ctx: ClaimContext): Promise<ClaimResult | null> { + const factory: SchedulerFactory = { + capabilities: ctx.capabilities, + health: ctx.health ?? 'ok', + load: ctx.load ?? 0, + seatLimit: ctx.seatLimit ?? 
1, + }; for (let i = 0; i < CLAIM_MAX_RETRIES; i++) { const candidates = await repo.listJobs({ productId: ctx.productId }); - const eligible: FleetJobDoc[] = []; - for (const job of candidates) { - if (await eligibleForClaim(job, ctx.capabilities)) eligible.push(job); - } - if (eligible.length === 0) return null; - eligible.sort( - (a, b) => a.priorityOrder - b.priorityOrder || a.createdAt.localeCompare(b.createdAt) - ); - const result = await tryClaimJob(eligible[0], ctx); + const satisfied = await depsSatisfiedIds(candidates); + const schedulerCtx: SchedulerContext = { + now: Date.now(), // coordinator-authoritative time + isDepsSatisfied: job => satisfied.has(job.id), + factoryEngines: ctx.factoryEngines, + warmScopes: ctx.warmScopes, + costCeilingUsd: ctx.costCeilingUsd, + }; + const pick = selectJob(candidates, factory, schedulerCtx); + if (!pick) return null; + const result = await tryClaimJob(pick, ctx); if (result.ok) return result.doc; if (result.reason === 'not_found') continue; // conflict: another factory won this version — re-select and retry diff --git a/services/platform-service/src/modules/fleet/scheduler.test.ts b/services/platform-service/src/modules/fleet/scheduler.test.ts new file mode 100644 index 00000000..b34e6fc4 --- /dev/null +++ b/services/platform-service/src/modules/fleet/scheduler.test.ts @@ -0,0 +1,203 @@ +/** + * Fleet scheduler / router core (§7) — pure, deterministic unit tests. + * No datastore, no real clock: time is injected via ctx.now. + */ + +import { describe, expect, it } from 'vitest'; +import { + DEFAULT_WEIGHTS, + scoreCandidate, + selectJob, + capabilitiesSubset, + type SchedulerContext, + type SchedulerFactory, +} from './scheduler.js'; +import { PRIORITY_ORDER, type FleetJobDoc, type FleetPriority } from './types.js'; + +const NOW = Date.parse('2026-05-29T12:00:00.000Z'); +const iso = (msAgo: number) => new Date(NOW - msAgo).toISOString(); + +/** Build a minimal valid FleetJobDoc for scoring. 
*/ +function job(over: Partial<FleetJobDoc> & { id: string }): FleetJobDoc { + const priority: FleetPriority = over.priority ?? 'medium'; + const manifest: FleetJobDoc['manifestSnapshot'] = { + priority, + capabilities: over.capabilities ?? [], + prefersEngine: [], + allowedScope: [], + deps: [], + ...(over.manifestSnapshot ?? {}), + }; + return { + productId: 'lysnrai', + stage: 'queued', + idempotencyKey: over.id, + contentHash: 'h', + bodyMd: '# task', + capabilities: [], + deps: [], + kind: 'leaf', + attempts: 0, + leaseEpoch: 0, + rev: 0, + createdAt: iso(0), + updatedAt: iso(0), + ...over, + priority, + priorityOrder: over.priorityOrder ?? PRIORITY_ORDER[priority], + manifestSnapshot: manifest, + }; +} + +const fac = (over: Partial<SchedulerFactory> = {}): SchedulerFactory => ({ + capabilities: [], + health: 'ok', + load: 0, + ...over, +}); + +const ctx = (over: Partial<SchedulerContext> = {}): SchedulerContext => ({ now: NOW, ...over }); + +describe('scheduler §7 — capability hard filter', () => { + it('a factory missing a required capability never selects that job', () => { + const needsMac = job({ id: 'needs-mac', capabilities: ['os:mac'] }); + expect(selectJob([needsMac], fac({ capabilities: [] }), ctx())).toBeNull(); + expect(selectJob([needsMac], fac({ capabilities: ['os:mac', 'has:git'] }), ctx())?.id).toBe( + 'needs-mac' + ); + }); + + it('among candidates, only capability-subset jobs are eligible', () => { + const a = job({ id: 'a', capabilities: ['os:mac'] }); + const b = job({ id: 'b', capabilities: ['os:linux'] }); // factory cannot run this + const pick = selectJob([a, b], fac({ capabilities: ['os:mac'] }), ctx()); + expect(pick?.id).toBe('a'); + }); + + it('capabilitiesSubset predicate', () => { + expect(capabilitiesSubset(['a', 'b'], ['a', 'b', 'c'])).toBe(true); + expect(capabilitiesSubset(['a', 'z'], ['a', 'b'])).toBe(false); + expect(capabilitiesSubset([], ['a'])).toBe(true); + }); +}); + +describe('scheduler §7 — priority + age tie-breaks (all else equal)', () => { + it('priority 
dominates when scores tie', () => { + const low = job({ id: 'low', priority: 'low' }); + const med = job({ id: 'med', priority: 'medium' }); + const high = job({ id: 'high', priority: 'high' }); + const crit = job({ id: 'crit', priority: 'critical' }); + expect(selectJob([low, med, high, crit], fac(), ctx())?.id).toBe('crit'); + }); + + it('age breaks ties deterministically — older wins among equal priority', () => { + // both within the same aging bucket ⇒ equal starvation ⇒ score tie ⇒ age tie-break + const older = job({ id: 'older', priority: 'medium', createdAt: iso(5_000) }); + const newer = job({ id: 'newer', priority: 'medium', createdAt: iso(1_000) }); + expect(selectJob([newer, older], fac(), ctx())?.id).toBe('older'); + }); +}); + +describe('scheduler §7 — load & health', () => { + it('a higher-load factory scores lower (1/(1+load))', () => { + const j = job({ id: 'j' }); + const idle = scoreCandidate(j, fac({ load: 0 }), ctx()).score; + const busy = scoreCandidate(j, fac({ load: 5 }), ctx()).score; + expect(idle).toBeGreaterThan(busy); + }); + + it('degraded health scores lower than ok', () => { + const j = job({ id: 'j' }); + const ok = scoreCandidate(j, fac({ health: 'ok' }), ctx()).score; + const degraded = scoreCandidate(j, fac({ health: 'degraded' }), ctx()).score; + expect(ok).toBeGreaterThan(degraded); + }); + + it('a down factory is filtered out entirely (health floor)', () => { + const j = job({ id: 'j' }); + expect(selectJob([j], fac({ health: 'down' }), ctx())).toBeNull(); + }); +}); + +describe('scheduler §7 — starvation (anti-starvation aging)', () => { + it('an aged low-priority job outranks a fresh low-priority one', () => { + const fresh = job({ id: 'fresh', priority: 'low', createdAt: iso(0) }); + const aged = job({ id: 'aged', priority: 'low', createdAt: iso(40 * 60_000) }); + expect(selectJob([fresh, aged], fac(), ctx())?.id).toBe('aged'); + // and the aged job's standalone score is higher + expect(scoreCandidate(aged, fac(), 
ctx()).score).toBeGreaterThan( + scoreCandidate(fresh, fac(), ctx()).score + ); + }); +}); + +describe('scheduler §7 — cost fit', () => { + it('a job exceeding the cost ceiling is penalized and ranked last', () => { + const within = job({ id: 'within', budget: { usd: 5 } }); + const over = job({ id: 'over', budget: { usd: 100 } }); + const c = ctx({ costCeilingUsd: 10 }); + expect(selectJob([over, within], fac(), c)?.id).toBe('within'); + expect(scoreCandidate(over, fac(), c).score).toBeLessThan( + scoreCandidate(within, fac(), c).score + ); + }); + + it('no ceiling ⇒ cost is neutral (full costFit)', () => { + const j = job({ id: 'j', budget: { usd: 999 } }); + expect(scoreCandidate(j, fac(), ctx()).breakdown.costFit).toBeCloseTo(DEFAULT_WEIGHTS.costFit); + }); +}); + +describe('scheduler §7 — affinity', () => { + it('prefers-engine match raises affinity', () => { + const j = job({ + id: 'j', + manifestSnapshot: { prefersEngine: ['devin'] } as FleetJobDoc['manifestSnapshot'], + }); + const hit = scoreCandidate(j, fac(), ctx({ factoryEngines: ['devin'] })).breakdown.affinity; + const miss = scoreCandidate(j, fac(), ctx({ factoryEngines: ['claude'] })).breakdown.affinity; + expect(hit).toBeGreaterThan(miss); + }); +}); + +describe('scheduler §7 — breakdown & determinism', () => { + it('breakdown is per-weighted-term and sums to the score', () => { + const j = job({ id: 'j', priority: 'high', budget: { usd: 3 }, createdAt: iso(90_000) }); + const c = ctx({ costCeilingUsd: 10, factoryEngines: ['devin'] }); + const { score, breakdown } = scoreCandidate(j, fac({ load: 2, health: 'degraded' }), c); + const sum = + breakdown.capabilityFit + + breakdown.affinity + + breakdown.load + + breakdown.costFit + + breakdown.health + + breakdown.starvation; + expect(sum).toBeCloseTo(score, 12); + expect(breakdown.starvation).toBeLessThanOrEqual(0); // signed penalty + }); + + it('selectJob is deterministic — same inputs ⇒ same pick across runs', () => { + const cands = [ + job({ id: 
'a', priority: 'medium', createdAt: iso(1_000) }), + job({ id: 'b', priority: 'high', createdAt: iso(2_000) }), + job({ id: 'c', priority: 'high', createdAt: iso(3_000) }), + ]; + const picks = Array.from({ length: 5 }, () => selectJob(cands, fac(), ctx())?.id); + expect(new Set(picks).size).toBe(1); + // highest priority (b/c are 'high' > a's 'medium'); b vs c tie on score+priority, + // so the age tie-break wins → c is older (created 3s ago vs b's 2s ago). + expect(picks[0]).toBe('c'); + }); + + it('empty candidates ⇒ null; no eligible (none queued/blocked) ⇒ null', () => { + expect(selectJob([], fac(), ctx())).toBeNull(); + const assigned = job({ id: 'x', stage: 'assigned' }); + expect(selectJob([assigned], fac(), ctx())).toBeNull(); + }); + + it('blocked jobs are eligible only when the deps predicate says so', () => { + const blocked = job({ id: 'blk', stage: 'blocked' }); + expect(selectJob([blocked], fac(), ctx({ isDepsSatisfied: () => false }))).toBeNull(); + expect(selectJob([blocked], fac(), ctx({ isDepsSatisfied: () => true }))?.id).toBe('blk'); + }); +}); diff --git a/services/platform-service/src/modules/fleet/scheduler.ts b/services/platform-service/src/modules/fleet/scheduler.ts new file mode 100644 index 00000000..58fb0e2b --- /dev/null +++ b/services/platform-service/src/modules/fleet/scheduler.ts @@ -0,0 +1,280 @@ +/** + * Fleet scheduler / router core — the deterministic, fixed-weight scoring engine + * that decides WHICH job a claiming factory gets (Phase 2, §7 of the gigafactory + * roadmap). + * + * This module is PURE and SYNCHRONOUS: no datastore calls, no clock reads, no env. + * Everything it needs is passed in — health/load/seatLimit from the factory view, + * age from `job.createdAt` vs `ctx.now` (coordinator-authoritative time), and the + * deps-satisfied predicate (the coordinator resolves deps asynchronously and hands + * us a pure predicate). 
That keeps the scoring fully unit-testable and lets the + * coordinator own all I/O and the atomic compare-and-swap claim. + * + * Phasing (§7): Phase 2 ships the deterministic filter + fixed-weight scoring. + * Phase 3 adds tunable weights, preemption, and the explainability UI; Phase 5 + * learns the weights. We deliberately do NOT build tunable weights or preemption + * here — only the fixed-weight core + a per-term breakdown for explainability. + * + * Scoring formula (§7): + * score = w1·capabilityFit + * + w2·affinity(prefersEngine / repo-stickiness) + * + w3·(1 / (1 + load)) + * + w4·costFit(budget) + * + w5·health + * − w6·starvationPenalty(age) + * + * Selection: filter to deps-satisfied + capability-subset (+ health floor), rank + * by score, then a deterministic tie-break: higher priority → older createdAt → + * lower cost class. + */ + +import type { FactoryHealth, FleetJobDoc } from './types.js'; + +// ── Weights (fixed this phase; overridable via a passed-in object, NOT env) ── + +/** Fixed-weight config for the §7 scoring terms. Phase 3 makes these tunable. */ +export interface SchedulerWeights { + /** w1 — hard capability fit (satisfied requirement ratio). */ + capabilityFit: number; + /** w2 — affinity: prefers-engine match + warm-scope (repo) stickiness. */ + affinity: number; + /** w3 — inverse load `1/(1+load)`; a busier factory scores lower. */ + load: number; + /** w4 — cost fit: penalize jobs whose budget exceeds the factory cost ceiling. */ + costFit: number; + /** w5 — factory health (ok=1, degraded=0.5; `down` is filtered out, not scored). */ + health: number; + /** w6 — starvation: subtracts a freshness penalty that decays as a job ages, + * so an aged job outranks an equally-prioritised fresh one (anti-starvation). */ + starvation: number; +} + +/** + * Phase-2 fixed defaults. 
Rationale: capability + health are hard signals (1.0); + * load matters strongly (1.0); cost is a moderate guard (0.75); affinity is a + * gentle nudge (0.5); starvation is weighted high enough (1.5) to lift an aged + * job above a same-priority fresh peer. Tunable per-product weights are Phase 3. + */ +export const DEFAULT_WEIGHTS: SchedulerWeights = { + capabilityFit: 1.0, + affinity: 0.5, + load: 1.0, + costFit: 0.75, + health: 1.0, + starvation: 1.5, +}; + +/** Aging config for the starvation term (fixed this phase). */ +export interface StarvationConfig { + /** Width of one aging bucket, in ms. Ages within a bucket score identically, + * so jobs submitted close together tie (and fall through to the priority + * tie-break) rather than being separated by sub-second noise. */ + intervalMs: number; + /** Number of buckets after which the freshness penalty fully decays to 0. */ + buckets: number; +} + +export const DEFAULT_STARVATION: StarvationConfig = { + intervalMs: 60_000, // 1 minute + buckets: 30, // fully de-penalised after ~30 minutes of waiting +}; + +// ── Factory view + context the engine consumes ────────────────────────────── + +/** + * The factory fields the scorer needs — a structural subset of FleetFactoryDoc + * (so a real factory doc is assignable), but also satisfiable from the claim + * context so the coordinator need not always load the full doc. + */ +export interface SchedulerFactory { + capabilities: string[]; + health?: FactoryHealth; + load?: number; + seatLimit?: number; +} + +/** Pure context: authoritative time + injected deps predicate + affinity/cost hints. */ +export interface SchedulerContext { + /** Coordinator-authoritative now (ms epoch). Drives the starvation/age term. */ + now: number; + /** Deps gate (the coordinator resolves deps async and passes a pure predicate). + * Omitted ⇒ treated as satisfied. */ + isDepsSatisfied?: (job: FleetJobDoc) => boolean; + /** Engines the claiming factory runs (for prefers-engine affinity). 
*/ + factoryEngines?: string[]; + /** Scopes (repos/locks) the factory has warm (for repo-stickiness affinity). */ + warmScopes?: string[]; + /** The factory/budget cost ceiling in USD for cost-fit. Omitted ⇒ unconstrained. */ + costCeilingUsd?: number; + /** Override the starvation aging config (fixed defaults otherwise). */ + starvation?: StarvationConfig; +} + +/** Per-term, already-weighted contributions. Sums to `score` (starvation signed −). */ +export interface ScoreBreakdown { + capabilityFit: number; + affinity: number; + load: number; + costFit: number; + health: number; + starvation: number; +} + +export interface ScoredCandidate { + score: number; + breakdown: ScoreBreakdown; +} + +// ── Pure predicates / helpers ──────────────────────────────────────────────── + +/** Every required capability token must be advertised by the factory (hard gate). */ +export function capabilitiesSubset(required: string[], available: string[]): boolean { + const set = new Set(available); + return required.every(token => set.has(token)); +} + +function overlaps(a: readonly string[], b: readonly string[]): boolean { + if (a.length === 0 || b.length === 0) return false; + const set = new Set(b); + return a.some(x => set.has(x)); +} + +const HEALTH_SCORE: Record<FactoryHealth, number> = { ok: 1, degraded: 0.5, down: 0 }; + +function clamp01(n: number): number { + if (n < 0) return 0; + if (n > 1) return 1; + return n; +} + +/** w1 term — satisfied-requirement ratio (1 when the hard subset holds). */ +function capabilityFitTerm(job: FleetJobDoc, factory: SchedulerFactory): number { + const required = job.capabilities ?? []; + if (required.length === 0) return 1; + const have = new Set(factory.capabilities); + const matched = required.reduce((n, cap) => (have.has(cap) ? n + 1 : n), 0); + return matched / required.length; +} + +/** w2 term — prefers-engine match + warm-scope stickiness, each contributing half. 
*/ +function affinityTerm(job: FleetJobDoc, ctx: SchedulerContext): number { + const prefers = job.manifestSnapshot?.prefersEngine ?? []; + const scopes = job.manifestSnapshot?.allowedScope ?? []; + const prefersScore = prefers.length > 0 && overlaps(prefers, ctx.factoryEngines ?? []) ? 1 : 0; + const stickyScore = scopes.length > 0 && overlaps(scopes, ctx.warmScopes ?? []) ? 1 : 0; + return clamp01((prefersScore + stickyScore) / 2); +} + +/** w3 term — inverse load. */ +function loadTerm(factory: SchedulerFactory): number { + const load = factory.load ?? 0; + return 1 / (1 + Math.max(0, load)); +} + +/** w4 term — 1 when within the cost ceiling (or unconstrained), decays toward 0 + * the further a job's budget exceeds the ceiling. */ +function costFitTerm(job: FleetJobDoc, ctx: SchedulerContext): number { + const budget = job.budget?.usd; + const ceiling = ctx.costCeilingUsd; + if (budget === undefined || ceiling === undefined) return 1; + if (ceiling <= 0) return budget <= 0 ? 1 : 0; + if (budget <= ceiling) return 1; + return clamp01(ceiling / budget); +} + +/** w5 term — factory health as a [0,1] score. */ +function healthTerm(factory: SchedulerFactory): number { + return HEALTH_SCORE[factory.health ?? 'ok']; +} + +/** Freshness penalty in [0,1]: 1 for a brand-new job, decaying to 0 as it ages + * past `buckets` aging intervals. Subtracted from the score, so an aged job + * loses less and rises above an equally-prioritised fresh peer (anti-starvation). + * Bucketing makes near-simultaneous submissions tie (→ priority tie-break). */ +function starvationPenaltyTerm(job: FleetJobDoc, ctx: SchedulerContext): number { + const cfg = ctx.starvation ?? DEFAULT_STARVATION; + const ageMs = Math.max(0, ctx.now - Date.parse(job.createdAt)); + const aged = Math.floor(ageMs / cfg.intervalMs); + return clamp01(1 - aged / cfg.buckets); +} + +// ── Public scoring API ──────────────────────────────────────────────────────── + +/** + * Score one (job, factory) pair. 
Returns the total `score` plus the per-term, + * already-weighted `breakdown` (the six terms sum to `score`) for explainability + * (§7 / Phase-3 readiness). Pure + synchronous. + */ +export function scoreCandidate( + job: FleetJobDoc, + factory: SchedulerFactory, + ctx: SchedulerContext, + weights: SchedulerWeights = DEFAULT_WEIGHTS +): ScoredCandidate { + const breakdown: ScoreBreakdown = { + capabilityFit: weights.capabilityFit * capabilityFitTerm(job, factory), + affinity: weights.affinity * affinityTerm(job, ctx), + load: weights.load * loadTerm(factory), + costFit: weights.costFit * costFitTerm(job, ctx), + health: weights.health * healthTerm(factory), + starvation: -weights.starvation * starvationPenaltyTerm(job, ctx), + }; + const score = + breakdown.capabilityFit + + breakdown.affinity + + breakdown.load + + breakdown.costFit + + breakdown.health + + breakdown.starvation; + return { score, breakdown }; +} + +/** Cost class used as the final tie-break (lower USD budget = lower class first). */ +function costClass(job: FleetJobDoc): number { + return job.budget?.usd ?? 0; +} + +/** Scores within EPSILON are treated as a tie and fall through to the §7 tie-break. */ +const SCORE_EPSILON = 1e-9; + +/** + * Pick the best job for `factory` from `candidates`: + * 1. filter to stage queued/blocked + deps-satisfied (ctx predicate) + + * hard capability-subset, and drop everything if the factory is `down` + * (health floor — filtered, not merely down-weighted, per §7); + * 2. rank by `scoreCandidate` (descending); + * 3. deterministic tie-break: higher priority → older createdAt → lower cost class. + * Returns `null` when nothing is eligible. Pure + synchronous + deterministic. + */ +export function selectJob( + candidates: FleetJobDoc[], + factory: SchedulerFactory, + ctx: SchedulerContext, + weights: SchedulerWeights = DEFAULT_WEIGHTS +): FleetJobDoc | null { + // Health floor: a factory below the floor is excluded entirely (§7). + if ((factory.health ?? 
'ok') === 'down') return null; + + const depsSatisfied = ctx.isDepsSatisfied ?? (() => true); + const eligible = candidates.filter( + job => + (job.stage === 'queued' || job.stage === 'blocked') && + capabilitiesSubset(job.capabilities ?? [], factory.capabilities) && + depsSatisfied(job) + ); + if (eligible.length === 0) return null; + + const scored = eligible.map(job => ({ + job, + score: scoreCandidate(job, factory, ctx, weights).score, + })); + scored.sort((a, b) => { + if (Math.abs(b.score - a.score) > SCORE_EPSILON) return b.score - a.score; // higher score first + if (a.job.priorityOrder !== b.job.priorityOrder) + return a.job.priorityOrder - b.job.priorityOrder; // higher priority (lower order) first + const ageCmp = a.job.createdAt.localeCompare(b.job.createdAt); // older (earlier ISO) first + if (ageCmp !== 0) return ageCmp; + return costClass(a.job) - costClass(b.job); // lower cost class first + }); + return scored[0].job; +} From b65e818f3dd0a1a372451105c8678dbcfa77e0ac Mon Sep 17 00:00:00 2001 From: saravanakumardb1 Date: Fri, 29 May 2026 23:11:45 -0700 Subject: [PATCH 2/2] =?UTF-8?q?feat(platform-service):=20fleet=20artifacts?= =?UTF-8?q?=20+=20blob=20wiring=20(=C2=A713)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Artifact pointers in fleet_artifacts; large outputs in @bytelyst/blob (never Cosmos). Routes: POST/GET /fleet/jobs/:id/artifacts, GET/DELETE /fleet/artifacts/:id with short-lived SAS. 7 artifact tests. 
--- .../src/modules/fleet/artifacts-blob.ts | 112 ++++++++ .../src/modules/fleet/artifacts.test.ts | 251 ++++++++++++++++++ .../src/modules/fleet/repository.test.ts | 20 +- .../src/modules/fleet/repository.ts | 31 ++- .../src/modules/fleet/routes.ts | 55 ++++ .../src/modules/fleet/types.test.ts | 10 +- .../src/modules/fleet/types.ts | 36 ++- 7 files changed, 502 insertions(+), 13 deletions(-) create mode 100644 services/platform-service/src/modules/fleet/artifacts-blob.ts create mode 100644 services/platform-service/src/modules/fleet/artifacts.test.ts diff --git a/services/platform-service/src/modules/fleet/artifacts-blob.ts b/services/platform-service/src/modules/fleet/artifacts-blob.ts new file mode 100644 index 00000000..93a98f04 --- /dev/null +++ b/services/platform-service/src/modules/fleet/artifacts-blob.ts @@ -0,0 +1,112 @@ +/** + * Fleet artifacts — blob wiring (§13/§26). + * + * Large run outputs (logs, coverage, screenshots, build output) are written to + * blob storage; only a POINTER (blob key + size/content-type/sha256) is persisted + * in the `fleet_artifacts` Cosmos container. The bytes NEVER touch Cosmos — this + * keeps documents well under the doc-size / RU ceilings no matter how large a log + * gets. Read access is granted via a short-lived SAS URL minted on demand from the + * stored key (the URL itself is never persisted). + * + * Blob key scheme (deterministic, product- and job-scoped): + * fleet/<productId>/<jobId>/<artifactId>-<kind> + */ + +import { createHash, randomUUID } from 'node:crypto'; +import { getBucket, generateSasUrl } from '../../lib/blob.js'; +import * as repo from './repository.js'; +import { FleetArtifactDocSchema, type FleetArtifactDoc, type FleetArtifactKind } from './types.js'; + +/** Container holding all fleet run-output blobs. */ +export const FLEET_ARTIFACTS_CONTAINER = 'fleet-artifacts'; + +/** SAS read-URL lifetime. Short-lived — callers re-issue via getArtifactDownload. 
*/ +export const ARTIFACT_SAS_TTL_MINUTES = 15; + +/** A persisted artifact pointer paired with a freshly-minted SAS read URL. */ +export interface ArtifactWithDownload { + artifact: FleetArtifactDoc; + downloadUrl: string; +} + +export interface UploadArtifactArgs { + productId: string; + jobId: string; + kind: FleetArtifactKind; + bytes: Buffer; + contentType: string; + runId?: string; +} + +/** Deterministic blob key for an artifact. */ +export function artifactBlobKey( + productId: string, + jobId: string, + artifactId: string, + kind: FleetArtifactKind +): string { + return `fleet/${productId}/${jobId}/${artifactId}-${kind}`; +} + +/** + * Upload artifact bytes to blob storage and persist the Cosmos pointer. + * Returns the pointer doc plus a short-lived SAS read URL. The bytes live only in + * blob — the returned doc carries no inline payload. + */ +export async function uploadArtifact(args: UploadArtifactArgs): Promise<ArtifactWithDownload> { + const id = `art_${randomUUID()}`; + const blobKey = artifactBlobKey(args.productId, args.jobId, id, args.kind); + + const bucket = await getBucket(FLEET_ARTIFACTS_CONTAINER); + const meta = await bucket.upload(blobKey, args.bytes, { contentType: args.contentType }); + + const sha256 = createHash('sha256').update(args.bytes).digest('hex'); + + const doc: FleetArtifactDoc = FleetArtifactDocSchema.parse({ + id, + productId: args.productId, + jobId: args.jobId, + runId: args.runId, + kind: args.kind, + blobKey, + contentType: args.contentType, + sizeBytes: meta.size ?? args.bytes.length, + sha256, + createdAt: new Date().toISOString(), + }); + + const artifact = await repo.createArtifact(doc); + const downloadUrl = await issueDownloadUrl(blobKey); + return { artifact, downloadUrl }; +} + +/** + * Re-issue a fresh short-lived SAS read URL for an existing artifact, scoped to + * its owning product. Returns null when the artifact does not exist for that product. 
+ */ +export async function getArtifactDownload( + id: string, + productId: string +): Promise { + const artifact = await repo.getArtifact(id, productId); + if (!artifact) return null; + const downloadUrl = await issueDownloadUrl(artifact.blobKey); + return { artifact, downloadUrl }; +} + +/** + * Delete an artifact's pointer (and its backing blob), scoped to its owning + * product. Returns false when nothing matched. + */ +export async function deleteArtifact(id: string, productId: string): Promise { + const artifact = await repo.getArtifact(id, productId); + if (!artifact) return false; + const bucket = await getBucket(FLEET_ARTIFACTS_CONTAINER); + await bucket.delete(artifact.blobKey); + await repo.deleteArtifact(id, productId); + return true; +} + +function issueDownloadUrl(blobKey: string): Promise { + return generateSasUrl(FLEET_ARTIFACTS_CONTAINER, blobKey, 'r', ARTIFACT_SAS_TTL_MINUTES); +} diff --git a/services/platform-service/src/modules/fleet/artifacts.test.ts b/services/platform-service/src/modules/fleet/artifacts.test.ts new file mode 100644 index 00000000..b96c1b22 --- /dev/null +++ b/services/platform-service/src/modules/fleet/artifacts.test.ts @@ -0,0 +1,251 @@ +/** + * Fleet artifacts — blob wiring (§13/§26). + * + * Runs on the in-memory datastore + in-memory blob provider. The central + * guarantee under test: artifact BYTES live in blob storage and only a POINTER + * (blobKey + size/content-type/sha256) is persisted in Cosmos — never the bytes. + * + * Auth + productId resolution are mocked exactly as the items / fleet routes + * tests do. The non-route service tests don't touch those mocks. + */ + +// Select the in-memory blob provider before the storage singleton is created. 
+process.env.STORAGE_PROVIDER = 'memory'; + +import Fastify, { type FastifyInstance } from 'fastify'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { MemoryDatastoreProvider } from '@bytelyst/datastore'; +import { _resetBlobClient, getBucket } from '@bytelyst/blob'; +import { _resetDatastoreProvider, setProvider } from '../../lib/datastore.js'; +import * as repo from './repository.js'; +import * as artifactsBlob from './artifacts-blob.js'; + +vi.mock('../../lib/auth.js', () => ({ + extractAuth: vi.fn(async () => ({ sub: 'user_1', role: 'admin' })), +})); +vi.mock('../../lib/request-context.js', () => ({ + getRequestProductId: () => 'lysnrai', +})); + +const PID = 'lysnrai'; + +/** Fields that would indicate bytes were (wrongly) inlined into the Cosmos doc. */ +const INLINE_PAYLOAD_FIELDS = [ + 'contentBase64', + 'payload', + 'bytes', + 'data', + 'content', + 'body', + 'blob', +]; + +async function buildApp(): Promise { + const { fleetRoutes } = await import('./routes.js'); + const app = Fastify({ logger: false }); + await app.register(fleetRoutes, { prefix: '/api' }); + return app; +} + +beforeEach(() => { + setProvider(new MemoryDatastoreProvider()); + _resetBlobClient(); +}); +afterEach(() => { + _resetDatastoreProvider(); + _resetBlobClient(); + vi.clearAllMocks(); +}); + +describe('fleet artifacts — blob service', () => { + it('upload writes bytes to blob and persists a pointer-only Cosmos doc', async () => { + const bytes = Buffer.from('hello log output'); + const { artifact, downloadUrl } = await artifactsBlob.uploadArtifact({ + productId: PID, + jobId: 'j1', + kind: 'log', + bytes, + contentType: 'text/plain', + }); + + // pointer metadata + expect(artifact.productId).toBe(PID); + expect(artifact.jobId).toBe('j1'); + expect(artifact.kind).toBe('log'); + expect(artifact.contentType).toBe('text/plain'); + expect(artifact.sizeBytes).toBe(bytes.length); + expect(artifact.blobKey).toBe(`fleet/${PID}/j1/${artifact.id}-log`); + 
expect(artifact.sha256).toMatch(/^[0-9a-f]{64}$/); + expect(downloadUrl).toContain('signed=true'); + + // the persisted Cosmos doc carries NO inline payload — bytes are not in Cosmos + const stored = await repo.getArtifact(artifact.id, PID); + expect(stored).not.toBeNull(); + for (const field of INLINE_PAYLOAD_FIELDS) { + expect(stored).not.toHaveProperty(field); + } + // only the pointer/metadata fields are present + expect(stored).toMatchObject({ + id: artifact.id, + productId: PID, + jobId: 'j1', + kind: 'log', + blobKey: artifact.blobKey, + contentType: 'text/plain', + sizeBytes: bytes.length, + }); + + // the actual bytes DO live in blob storage + const bucket = await getBucket(artifactsBlob.FLEET_ARTIFACTS_CONTAINER); + const fromBlob = await bucket.download(artifact.blobKey); + expect(fromBlob.toString()).toBe('hello log output'); + }); + + it('list by job is partition-isolated (returns only that job)', async () => { + await artifactsBlob.uploadArtifact({ + productId: PID, + jobId: 'jA', + kind: 'log', + bytes: Buffer.from('a1'), + contentType: 'text/plain', + }); + await artifactsBlob.uploadArtifact({ + productId: PID, + jobId: 'jA', + kind: 'coverage', + bytes: Buffer.from('a2'), + contentType: 'application/json', + }); + await artifactsBlob.uploadArtifact({ + productId: PID, + jobId: 'jB', + kind: 'screenshot', + bytes: Buffer.from('b1'), + contentType: 'image/png', + }); + + const a = await repo.listArtifactsByJob('jA'); + const b = await repo.listArtifactsByJob('jB'); + expect(a).toHaveLength(2); + expect(a.every(x => x.jobId === 'jA')).toBe(true); + expect(b).toHaveLength(1); + expect(b[0].jobId).toBe('jB'); + }); + + it('get re-issues a fresh SAS URL; a large (>Cosmos-safe) payload still succeeds (blob offload)', async () => { + // 3 MB — comfortably beyond the ~2 MB Cosmos document ceiling, so this can + // only succeed because the bytes are offloaded to blob, not stored inline. 
+ const big = Buffer.alloc(3 * 1024 * 1024, 0x61); + const { artifact } = await artifactsBlob.uploadArtifact({ + productId: PID, + jobId: 'jbig', + kind: 'build', + bytes: big, + contentType: 'application/octet-stream', + }); + expect(artifact.sizeBytes).toBe(3 * 1024 * 1024); + + const dl = await artifactsBlob.getArtifactDownload(artifact.id, PID); + expect(dl).not.toBeNull(); + expect(dl?.downloadUrl).toContain('signed=true'); + expect(dl?.downloadUrl).toContain(artifact.blobKey); + + // product-scoped: a foreign product cannot fetch it + expect(await artifactsBlob.getArtifactDownload(artifact.id, 'other-product')).toBeNull(); + // bytes round-trip from blob at full size + const bucket = await getBucket(artifactsBlob.FLEET_ARTIFACTS_CONTAINER); + expect((await bucket.download(artifact.blobKey)).length).toBe(3 * 1024 * 1024); + }); + + it('delete removes both the pointer and the backing blob', async () => { + const { artifact } = await artifactsBlob.uploadArtifact({ + productId: PID, + jobId: 'jd', + kind: 'other', + bytes: Buffer.from('tmp'), + contentType: 'text/plain', + }); + const bucket = await getBucket(artifactsBlob.FLEET_ARTIFACTS_CONTAINER); + expect(await bucket.exists(artifact.blobKey)).toBe(true); + + expect(await artifactsBlob.deleteArtifact(artifact.id, PID)).toBe(true); + expect(await repo.getArtifact(artifact.id, PID)).toBeNull(); + expect(await bucket.exists(artifact.blobKey)).toBe(false); + + // idempotent / unknown → false + expect(await artifactsBlob.deleteArtifact(artifact.id, PID)).toBe(false); + }); +}); + +describe('fleet artifacts — routes (fastify inject)', () => { + const b64 = (s: string) => Buffer.from(s).toString('base64'); + + it('POST upload → 201 with pointer + SAS; GET list → the pointer; bytes not echoed', async () => { + const app = await buildApp(); + const res = await app.inject({ + method: 'POST', + url: '/api/fleet/jobs/jr1/artifacts', + payload: { kind: 'log', contentType: 'text/plain', contentBase64: b64('route log 
bytes') }, + }); + expect(res.statusCode).toBe(201); + const body = JSON.parse(res.body); + expect(body.artifact.jobId).toBe('jr1'); + expect(body.artifact.blobKey).toBe(`fleet/${PID}/jr1/${body.artifact.id}-log`); + expect(body.artifact.sizeBytes).toBe(Buffer.from('route log bytes').length); + expect(body.downloadUrl).toContain('signed=true'); + for (const field of INLINE_PAYLOAD_FIELDS) { + expect(body.artifact).not.toHaveProperty(field); + } + + const list = await app.inject({ method: 'GET', url: '/api/fleet/jobs/jr1/artifacts' }); + expect(list.statusCode).toBe(200); + const arts = JSON.parse(list.body).artifacts; + expect(arts).toHaveLength(1); + expect(arts[0].id).toBe(body.artifact.id); + }); + + it('GET /fleet/artifacts/:id returns a fresh SAS URL; DELETE removes it', async () => { + const app = await buildApp(); + const up = await app.inject({ + method: 'POST', + url: '/api/fleet/jobs/jr2/artifacts', + payload: { + kind: 'coverage', + contentType: 'application/json', + contentBase64: b64('{"pct":91}'), + }, + }); + const id = JSON.parse(up.body).artifact.id as string; + + const got = await app.inject({ method: 'GET', url: `/api/fleet/artifacts/${id}` }); + expect(got.statusCode).toBe(200); + const gotBody = JSON.parse(got.body); + expect(gotBody.artifact.id).toBe(id); + expect(gotBody.downloadUrl).toContain('signed=true'); + + const del = await app.inject({ method: 'DELETE', url: `/api/fleet/artifacts/${id}` }); + expect(del.statusCode).toBe(200); + expect(JSON.parse(del.body).deleted).toBe(true); + + // gone now + const after = await app.inject({ method: 'GET', url: `/api/fleet/artifacts/${id}` }); + expect(after.statusCode).toBe(404); + }); + + it('rejects an invalid upload body (400) and unknown artifact ids (404)', async () => { + const app = await buildApp(); + + const bad = await app.inject({ + method: 'POST', + url: '/api/fleet/jobs/jr3/artifacts', + payload: { contentBase64: b64('x') }, // missing kind + }); + expect(bad.statusCode).toBe(400); + + 
const getMissing = await app.inject({ method: 'GET', url: '/api/fleet/artifacts/nope' }); + expect(getMissing.statusCode).toBe(404); + + const delMissing = await app.inject({ method: 'DELETE', url: '/api/fleet/artifacts/nope' }); + expect(delMissing.statusCode).toBe(404); + }); +}); diff --git a/services/platform-service/src/modules/fleet/repository.test.ts b/services/platform-service/src/modules/fleet/repository.test.ts index cdf5b6b5..e7c393dc 100644 --- a/services/platform-service/src/modules/fleet/repository.test.ts +++ b/services/platform-service/src/modules/fleet/repository.test.ts @@ -171,17 +171,29 @@ describe('fleet repository', () => { expect(events.map(e => e.type)).toEqual(['submitted', 'assigned', 'transition']); }); - it('artifacts: create + list', async () => { + it('artifacts: create + listByJob + get + delete (pointer only)', async () => { await repo.createArtifact({ id: 'art_1', productId: PID, jobId: 'j', kind: 'coverage', - blobUrl: 'https://b/x', + blobKey: 'fleet/lysnrai/j/art_1-coverage', + contentType: 'application/json', + sizeBytes: 42, createdAt: now, }); - const arts = await repo.listArtifacts('j'); + const arts = await repo.listArtifactsByJob('j'); expect(arts).toHaveLength(1); - expect(arts[0].blobUrl).toBe('https://b/x'); + expect(arts[0].blobKey).toBe('fleet/lysnrai/j/art_1-coverage'); + + // get is product-scoped (wrong product → null) + expect((await repo.getArtifact('art_1', PID))?.contentType).toBe('application/json'); + expect(await repo.getArtifact('art_1', 'other-product')).toBeNull(); + + // delete returns the removed doc and clears the partition + const removed = await repo.deleteArtifact('art_1', PID); + expect(removed?.id).toBe('art_1'); + expect(await repo.listArtifactsByJob('j')).toHaveLength(0); + expect(await repo.deleteArtifact('art_1', PID)).toBeNull(); }); }); diff --git a/services/platform-service/src/modules/fleet/repository.ts b/services/platform-service/src/modules/fleet/repository.ts index e9fb0373..a298855f 
100644 --- a/services/platform-service/src/modules/fleet/repository.ts +++ b/services/platform-service/src/modules/fleet/repository.ts @@ -247,12 +247,39 @@ export async function listEvents(jobId: string): Promise { return docs; } -// ── Artifacts ───────────────────────────────────────────────────────────────── +// ── Artifacts (pointers only — bytes live in blob, never Cosmos; §13) ────────── export async function createArtifact(doc: FleetArtifactDoc): Promise { return artifacts().create(doc); } -export async function listArtifacts(jobId: string): Promise { +/** All artifact pointers for a job, oldest-first (single partition — pk `/jobId`). */ +export async function listArtifactsByJob(jobId: string): Promise { return artifacts().findMany({ filter: { jobId }, sort: { createdAt: 1 } }); } + +/** + * Fetch one artifact pointer by id, scoped to its owning product. The container + * is partitioned by `/jobId` (not `/productId`), so this is a small filtered + * query rather than a point read; the `productId` predicate enforces ownership + * for the by-id routes (foreign-product / unknown id → null). + */ +export async function getArtifact(id: string, productId: string): Promise { + const found = await artifacts().findMany({ filter: { id, productId }, limit: 1 }); + return found[0] ?? null; +} + +/** + * Delete an artifact pointer by id, scoped to its owning product. Resolves the + * partition (`jobId`) from the located doc, then deletes. Returns the removed doc + * (so callers can also clean up the backing blob) or null when not found. 
+ */ +export async function deleteArtifact( + id: string, + productId: string +): Promise { + const doc = await getArtifact(id, productId); + if (!doc) return null; + await artifacts().delete(id, doc.jobId); + return doc; +} diff --git a/services/platform-service/src/modules/fleet/routes.ts b/services/platform-service/src/modules/fleet/routes.ts index 0d630aea..12b80706 100644 --- a/services/platform-service/src/modules/fleet/routes.ts +++ b/services/platform-service/src/modules/fleet/routes.ts @@ -11,6 +11,10 @@ * POST /fleet/factories/heartbeat factory liveness * GET /fleet/jobs/:id/runs job run history * GET /fleet/jobs/:id/events append-only event stream + * POST /fleet/jobs/:id/artifacts upload a run output (base64 body → blob + pointer) + * GET /fleet/jobs/:id/artifacts list a job's artifact pointers + * GET /fleet/artifacts/:artifactId pointer + fresh short-lived SAS download URL + * DELETE /fleet/artifacts/:artifactId delete pointer (and backing blob) * * All routes require auth + a resolved productId, exactly like the items module. */ @@ -21,6 +25,7 @@ import { BadRequestError, ConflictError, NotFoundError } from '../../lib/errors. 
import { extractAuth } from '../../lib/auth.js'; import * as repo from './repository.js'; import * as coordinator from './coordinator.js'; +import * as artifactsBlob from './artifacts-blob.js'; import { SubmitJobSchema, ListJobsQuerySchema, @@ -29,6 +34,7 @@ import { RenewLeaseSchema, ReleaseLeaseSchema, HeartbeatSchema, + UploadArtifactSchema, } from './types.js'; function badRequest(issues: { message: string }[]): never { @@ -179,4 +185,53 @@ export async function fleetRoutes(app: FastifyInstance) { const events = await repo.listEvents(id); return { events }; }); + + // ── Artifacts: upload (base64 body → blob + pointer) ── + app.post('/fleet/jobs/:id/artifacts', async (req, reply) => { + await extractAuth(req); + const { id: jobId } = req.params as { id: string }; + const parsed = UploadArtifactSchema.safeParse(req.body); + if (!parsed.success) badRequest(parsed.error.issues); + const pid = parsed.data.productId || getRequestProductId(req); + const bytes = Buffer.from(parsed.data.contentBase64, 'base64'); + if (bytes.length === 0) badRequest([{ message: 'contentBase64 decoded to empty bytes' }]); + const { artifact, downloadUrl } = await artifactsBlob.uploadArtifact({ + productId: pid, + jobId, + kind: parsed.data.kind, + bytes, + contentType: parsed.data.contentType, + runId: parsed.data.runId, + }); + reply.code(201); + return { artifact, downloadUrl }; + }); + + // ── Artifacts: list a job's pointers ── + app.get('/fleet/jobs/:id/artifacts', async req => { + await extractAuth(req); + const { id: jobId } = req.params as { id: string }; + const artifacts = await repo.listArtifactsByJob(jobId); + return { artifacts }; + }); + + // ── Artifacts: pointer + fresh SAS download URL ── + app.get('/fleet/artifacts/:artifactId', async req => { + await extractAuth(req); + const { artifactId } = req.params as { artifactId: string }; + const pid = getRequestProductId(req); + const found = await artifactsBlob.getArtifactDownload(artifactId, pid); + if (!found) throw new 
NotFoundError('Artifact not found'); + return found; + }); + + // ── Artifacts: delete pointer (and backing blob) ── + app.delete('/fleet/artifacts/:artifactId', async req => { + await extractAuth(req); + const { artifactId } = req.params as { artifactId: string }; + const pid = getRequestProductId(req); + const deleted = await artifactsBlob.deleteArtifact(artifactId, pid); + if (!deleted) throw new NotFoundError('Artifact not found'); + return { deleted: true }; + }); } diff --git a/services/platform-service/src/modules/fleet/types.test.ts b/services/platform-service/src/modules/fleet/types.test.ts index 5cdb93ff..f944445f 100644 --- a/services/platform-service/src/modules/fleet/types.test.ts +++ b/services/platform-service/src/modules/fleet/types.test.ts @@ -160,18 +160,22 @@ describe('FleetProfileDocSchema / FleetEventDocSchema / FleetArtifactDocSchema', const { type: _t, ...bad } = valid; expect(FleetEventDocSchema.safeParse(bad).success).toBe(false); }); - it('accepts a valid artifact and rejects missing blobUrl', () => { + it('accepts a valid artifact pointer and rejects missing blobKey', () => { const valid = { id: 'art_1', productId: 'lysnrai', jobId: 'fjob_1', kind: 'coverage', - blobUrl: 'https://b/x', + blobKey: 'fleet/lysnrai/fjob_1/art_1-coverage', + contentType: 'application/json', + sizeBytes: 1234, createdAt: now, }; expect(FleetArtifactDocSchema.safeParse(valid).success).toBe(true); - const { blobUrl: _b, ...bad } = valid; + const { blobKey: _b, ...bad } = valid; expect(FleetArtifactDocSchema.safeParse(bad).success).toBe(false); + // kind is a closed enum — an unknown kind is rejected + expect(FleetArtifactDocSchema.safeParse({ ...valid, kind: 'bogus' }).success).toBe(false); }); }); diff --git a/services/platform-service/src/modules/fleet/types.ts b/services/platform-service/src/modules/fleet/types.ts index f20c859f..5cc13e4b 100644 --- a/services/platform-service/src/modules/fleet/types.ts +++ 
b/services/platform-service/src/modules/fleet/types.ts @@ -76,6 +76,10 @@ export type LeaseStatus = (typeof LEASE_STATUS)[number]; export const JOB_KINDS = ['leaf', 'composite'] as const; export type JobKind = (typeof JOB_KINDS)[number]; +/** Artifact categories (§13/§26). Large outputs live in blob; Cosmos holds pointers only. */ +export const FLEET_ARTIFACT_KINDS = ['log', 'coverage', 'screenshot', 'build', 'other'] as const; +export type FleetArtifactKind = (typeof FLEET_ARTIFACT_KINDS)[number]; + // ── Shared value objects ───────────────────────────────────────────────────── export const CheckpointSchema = z.object({ @@ -232,15 +236,25 @@ export const FleetEventDocSchema = z.object({ }); export type FleetEventDoc = z.infer; -/** FleetArtifactDoc — pointer to a blob-stored artifact (pk `/jobId`). No inline logs. */ +/** + * FleetArtifactDoc — a POINTER to a blob-stored run output (pk `/jobId`). + * + * Large outputs (logs, coverage, screenshots, build output) are written to blob + * storage; only this pointer (blob key + size/content-type/sha256 metadata) lives + * in Cosmos — NEVER the bytes themselves (doc-size + RU limits, §13). The + * short-lived SAS read URL is minted on demand from `blobKey` and is intentionally + * NOT persisted on the doc. + */ export const FleetArtifactDocSchema = z.object({ id: z.string(), productId: z.string().min(1), jobId: z.string().min(1), runId: z.string().optional(), - kind: z.string().min(1), - blobUrl: z.string().min(1), - sizeBytes: z.number().int().nonnegative().optional(), + kind: z.enum(FLEET_ARTIFACT_KINDS), + blobKey: z.string().min(1), + contentType: z.string().min(1), + sizeBytes: z.number().int().nonnegative(), + sha256: z.string().optional(), createdAt: z.string(), }); export type FleetArtifactDoc = z.infer; @@ -315,3 +329,17 @@ export const HeartbeatSchema = z.object({ seatLimit: z.number().int().positive().optional(), }); export type HeartbeatInput = z.infer; + +/** + * Upload an artifact for a job. 
The bytes are carried base64-encoded in the JSON + * body (large content is offloaded to blob server-side; nothing is stored inline + * in Cosmos). `productId` may override the request-resolved product. + */ +export const UploadArtifactSchema = z.object({ + productId: z.string().min(1).optional(), + runId: z.string().min(1).optional(), + kind: z.enum(FLEET_ARTIFACT_KINDS), + contentType: z.string().min(1).default('application/octet-stream'), + contentBase64: z.string().min(1), +}); +export type UploadArtifactInput = z.infer;