Merge: Phase 2 scheduler/router core (§7) + fleet artifacts/blob (§13)
Combined parallel work (file-disjoint). Scheduler: deterministic scoring router wired into atomic claim. Artifacts: blob-backed pointers in fleet_artifacts. fleet suite 79/79; platform-service 1591/1591; build clean.
This commit is contained in:
commit
4f9ec3332f
112
services/platform-service/src/modules/fleet/artifacts-blob.ts
Normal file
112
services/platform-service/src/modules/fleet/artifacts-blob.ts
Normal file
@ -0,0 +1,112 @@
|
||||
/**
|
||||
* Fleet artifacts — blob wiring (§13/§26).
|
||||
*
|
||||
* Large run outputs (logs, coverage, screenshots, build output) are written to
|
||||
* blob storage; only a POINTER (blob key + size/content-type/sha256) is persisted
|
||||
* in the `fleet_artifacts` Cosmos container. The bytes NEVER touch Cosmos — this
|
||||
* keeps documents well under the doc-size / RU ceilings no matter how large a log
|
||||
* gets. Read access is granted via a short-lived SAS URL minted on demand from the
|
||||
* stored key (the URL itself is never persisted).
|
||||
*
|
||||
* Blob key scheme (deterministic, product- and job-scoped):
|
||||
* fleet/<productId>/<jobId>/<artifactId>-<kind>
|
||||
*/
|
||||
|
||||
import { createHash, randomUUID } from 'node:crypto';
|
||||
import { getBucket, generateSasUrl } from '../../lib/blob.js';
|
||||
import * as repo from './repository.js';
|
||||
import { FleetArtifactDocSchema, type FleetArtifactDoc, type FleetArtifactKind } from './types.js';
|
||||
|
||||
/** Name of the blob container that stores every fleet run-output blob. */
export const FLEET_ARTIFACTS_CONTAINER = 'fleet-artifacts';

/**
 * Lifetime, in minutes, of a minted SAS read URL. Kept short on purpose —
 * callers obtain a fresh URL through getArtifactDownload instead of caching one.
 */
export const ARTIFACT_SAS_TTL_MINUTES = 15;
|
||||
|
||||
/**
 * Result shape shared by upload and download: the persisted pointer document
 * together with a SAS read URL minted for this call.
 */
export interface ArtifactWithDownload {
  /** The pointer document (blob key + metadata); carries no inline bytes. */
  artifact: FleetArtifactDoc;
  /** Short-lived SAS read URL for the backing blob; minted per call, never persisted. */
  downloadUrl: string;
}
|
||||
|
||||
/** Input to uploadArtifact: the owning scope, the raw bytes, and their metadata. */
export interface UploadArtifactArgs {
  /** Owning product; first scope segment of the blob key. */
  productId: string;
  /** Owning job; second scope segment of the blob key. */
  jobId: string;
  /** Artifact kind; appended to the artifact id in the blob key. */
  kind: FleetArtifactKind;
  /** Raw artifact bytes — written to blob storage and hashed (sha256) for the pointer. */
  bytes: Buffer;
  /** Content type passed to the blob upload and recorded on the pointer. */
  contentType: string;
  /** Optional run id recorded on the pointer. */
  runId?: string;
}
|
||||
|
||||
/**
 * Build the blob key for an artifact:
 *   fleet/<productId>/<jobId>/<artifactId>-<kind>
 *
 * Pure string formatting — the same inputs always produce the same key.
 */
export function artifactBlobKey(
  productId: string,
  jobId: string,
  artifactId: string,
  kind: FleetArtifactKind
): string {
  const scopePrefix = `fleet/${productId}/${jobId}`;
  const leafName = `${artifactId}-${kind}`;
  return `${scopePrefix}/${leafName}`;
}
|
||||
|
||||
/**
 * Upload artifact bytes to blob storage and persist the Cosmos pointer.
 *
 * Steps:
 *  1. Generate the artifact id and derive the blob key from it.
 *  2. Upload the bytes to the fleet artifacts bucket.
 *  3. Validate and persist the pointer doc (key + size/content-type/sha256).
 *  4. Mint a short-lived SAS read URL for the new blob.
 *
 * If step 3 fails (schema validation or datastore write), the just-uploaded
 * blob is deleted on a best-effort basis so no blob is left behind without a
 * pointer, and the original error is rethrown.
 *
 * @param args - Owning product/job, artifact kind, raw bytes and content type.
 * @returns The persisted pointer doc plus a SAS read URL. The doc carries no
 *   inline payload — the bytes live only in blob storage.
 * @throws Whatever the blob upload, schema validation, datastore write or SAS
 *   minting throws.
 */
export async function uploadArtifact(args: UploadArtifactArgs): Promise<ArtifactWithDownload> {
  const id = `art_${randomUUID()}`;
  const blobKey = artifactBlobKey(args.productId, args.jobId, id, args.kind);
  const sha256 = createHash('sha256').update(args.bytes).digest('hex');

  const bucket = await getBucket(FLEET_ARTIFACTS_CONTAINER);
  const meta = await bucket.upload(blobKey, args.bytes, { contentType: args.contentType });

  let artifact: FleetArtifactDoc;
  try {
    const doc: FleetArtifactDoc = FleetArtifactDocSchema.parse({
      id,
      productId: args.productId,
      jobId: args.jobId,
      runId: args.runId,
      kind: args.kind,
      blobKey,
      contentType: args.contentType,
      // Prefer the size reported by the upload; fall back to the buffer length.
      sizeBytes: meta.size ?? args.bytes.length,
      sha256,
      createdAt: new Date().toISOString(),
    });
    artifact = await repo.createArtifact(doc);
  } catch (err: unknown) {
    // The pointer was never persisted, so nothing references the blob we just
    // wrote. Remove it so it does not become an unreachable orphan.
    try {
      await bucket.delete(blobKey);
    } catch {
      // Best-effort cleanup: the original failure below is the error the caller
      // needs to see, so a cleanup failure must not mask it.
    }
    throw err;
  }

  const downloadUrl = await issueDownloadUrl(blobKey);
  return { artifact, downloadUrl };
}
|
||||
|
||||
/**
 * Look up an artifact pointer for the given product and mint a new short-lived
 * SAS read URL for its blob.
 *
 * @param id - Artifact id.
 * @param productId - Product the artifact must belong to.
 * @returns The pointer plus a fresh download URL, or null when no artifact with
 *   that id exists for that product.
 */
export async function getArtifactDownload(
  id: string,
  productId: string
): Promise<ArtifactWithDownload | null> {
  const pointer = await repo.getArtifact(id, productId);
  if (pointer) {
    return {
      artifact: pointer,
      downloadUrl: await issueDownloadUrl(pointer.blobKey),
    };
  }
  return null;
}
|
||||
|
||||
/**
 * Remove an artifact for the given product: the backing blob is deleted first,
 * then the pointer document.
 *
 * @param id - Artifact id.
 * @param productId - Product the artifact must belong to.
 * @returns true when an artifact was found and removed; false when no artifact
 *   with that id exists for that product.
 */
export async function deleteArtifact(id: string, productId: string): Promise<boolean> {
  const pointer = await repo.getArtifact(id, productId);
  if (pointer) {
    const store = await getBucket(FLEET_ARTIFACTS_CONTAINER);
    await store.delete(pointer.blobKey);
    await repo.deleteArtifact(id, productId);
    return true;
  }
  return false;
}
|
||||
|
||||
/**
 * Mint a SAS URL with the 'r' permission for `blobKey` in the fleet artifacts
 * container, valid for ARTIFACT_SAS_TTL_MINUTES minutes.
 */
function issueDownloadUrl(blobKey: string): Promise<string> {
  const permission = 'r';
  return generateSasUrl(FLEET_ARTIFACTS_CONTAINER, blobKey, permission, ARTIFACT_SAS_TTL_MINUTES);
}
|
||||
251
services/platform-service/src/modules/fleet/artifacts.test.ts
Normal file
251
services/platform-service/src/modules/fleet/artifacts.test.ts
Normal file
@ -0,0 +1,251 @@
|
||||
/**
|
||||
* Fleet artifacts — blob wiring (§13/§26).
|
||||
*
|
||||
* Runs on the in-memory datastore + in-memory blob provider. The central
|
||||
* guarantee under test: artifact BYTES live in blob storage and only a POINTER
|
||||
* (blobKey + size/content-type/sha256) is persisted in Cosmos — never the bytes.
|
||||
*
|
||||
* Auth + productId resolution are mocked exactly as the items / fleet routes
|
||||
* tests do. The non-route service tests don't touch those mocks.
|
||||
*/
|
||||
|
||||
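// Select the in-memory blob provider before the storage singleton is created.
// NOTE(review): static imports are hoisted above this assignment, so this relies
// on the blob client reading STORAGE_PROVIDER lazily (on first use) — confirm.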
|
||||
process.env.STORAGE_PROVIDER = 'memory';
|
||||
|
||||
import Fastify, { type FastifyInstance } from 'fastify';
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
import { MemoryDatastoreProvider } from '@bytelyst/datastore';
|
||||
import { _resetBlobClient, getBucket } from '@bytelyst/blob';
|
||||
import { _resetDatastoreProvider, setProvider } from '../../lib/datastore.js';
|
||||
import * as repo from './repository.js';
|
||||
import * as artifactsBlob from './artifacts-blob.js';
|
||||
|
||||
vi.mock('../../lib/auth.js', () => ({
|
||||
extractAuth: vi.fn(async () => ({ sub: 'user_1', role: 'admin' })),
|
||||
}));
|
||||
vi.mock('../../lib/request-context.js', () => ({
|
||||
getRequestProductId: () => 'lysnrai',
|
||||
}));
|
||||
|
||||
const PID = 'lysnrai';
|
||||
|
||||
/**
 * Property names that must never appear on a stored artifact doc — any of them
 * would mean the bytes were (wrongly) inlined into Cosmos.
 */
const INLINE_PAYLOAD_FIELDS = ['contentBase64', 'payload', 'bytes', 'data', 'content', 'body', 'blob'];
|
||||
|
||||
/**
 * Create a Fastify instance (logging disabled) with the fleet routes mounted
 * under `/api`. The routes module is imported dynamically at call time.
 */
async function buildApp(): Promise<FastifyInstance> {
  const routesModule = await import('./routes.js');
  const instance = Fastify({ logger: false });
  await instance.register(routesModule.fleetRoutes, { prefix: '/api' });
  return instance;
}
|
||||
|
||||
// Every test starts with a brand-new in-memory datastore and a reset blob client.
beforeEach(() => {
  const freshStore = new MemoryDatastoreProvider();
  setProvider(freshStore);
  _resetBlobClient();
});

// Tear down the datastore provider and blob client, and clear mock call history.
afterEach(() => {
  _resetDatastoreProvider();
  _resetBlobClient();
  vi.clearAllMocks();
});
|
||||
|
||||
describe('fleet artifacts — blob service', () => {
  /** Open the bucket the artifact service stores run-output bytes in. */
  const openBucket = () => getBucket(artifactsBlob.FLEET_ARTIFACTS_CONTAINER);

  it('upload writes bytes to blob and persists a pointer-only Cosmos doc', async () => {
    const text = 'hello log output';
    const payload = Buffer.from(text);
    const uploaded = await artifactsBlob.uploadArtifact({
      productId: PID,
      jobId: 'j1',
      kind: 'log',
      bytes: payload,
      contentType: 'text/plain',
    });
    const pointer = uploaded.artifact;

    // The returned pointer mirrors the upload arguments.
    expect(pointer.productId).toBe(PID);
    expect(pointer.jobId).toBe('j1');
    expect(pointer.kind).toBe('log');
    expect(pointer.contentType).toBe('text/plain');
    expect(pointer.sizeBytes).toBe(payload.length);
    expect(pointer.blobKey).toBe(`fleet/${PID}/j1/${pointer.id}-log`);
    expect(pointer.sha256).toMatch(/^[0-9a-f]{64}$/);
    expect(uploaded.downloadUrl).toContain('signed=true');

    // What was persisted must be a pointer only — none of the inline-payload fields.
    const persisted = await repo.getArtifact(pointer.id, PID);
    expect(persisted).not.toBeNull();
    INLINE_PAYLOAD_FIELDS.forEach(field => {
      expect(persisted).not.toHaveProperty(field);
    });
    expect(persisted).toMatchObject({
      id: pointer.id,
      productId: PID,
      jobId: 'j1',
      kind: 'log',
      blobKey: pointer.blobKey,
      contentType: 'text/plain',
      sizeBytes: payload.length,
    });

    // The bytes themselves are retrievable from blob storage.
    const bucket = await openBucket();
    const downloaded = await bucket.download(pointer.blobKey);
    expect(downloaded.toString()).toBe(text);
  });

  it('list by job is partition-isolated (returns only that job)', async () => {
    const uploads = [
      { jobId: 'jA', kind: 'log', text: 'a1', contentType: 'text/plain' },
      { jobId: 'jA', kind: 'coverage', text: 'a2', contentType: 'application/json' },
      { jobId: 'jB', kind: 'screenshot', text: 'b1', contentType: 'image/png' },
    ] as const;
    // Upload sequentially, in the listed order.
    for (const entry of uploads) {
      await artifactsBlob.uploadArtifact({
        productId: PID,
        jobId: entry.jobId,
        kind: entry.kind,
        bytes: Buffer.from(entry.text),
        contentType: entry.contentType,
      });
    }

    const forJobA = await repo.listArtifactsByJob('jA');
    const forJobB = await repo.listArtifactsByJob('jB');
    expect(forJobA).toHaveLength(2);
    expect(forJobA.every(x => x.jobId === 'jA')).toBe(true);
    expect(forJobB).toHaveLength(1);
    expect(forJobB[0].jobId).toBe('jB');
  });

  it('get re-issues a fresh SAS URL; a large (>Cosmos-safe) payload still succeeds (blob offload)', async () => {
    // 3 MB is beyond the ~2 MB Cosmos document ceiling, so success here implies
    // the bytes went to blob rather than being stored inline.
    const threeMb = 3 * 1024 * 1024;
    const { artifact: pointer } = await artifactsBlob.uploadArtifact({
      productId: PID,
      jobId: 'jbig',
      kind: 'build',
      bytes: Buffer.alloc(threeMb, 0x61),
      contentType: 'application/octet-stream',
    });
    expect(pointer.sizeBytes).toBe(threeMb);

    const reissued = await artifactsBlob.getArtifactDownload(pointer.id, PID);
    expect(reissued).not.toBeNull();
    expect(reissued?.downloadUrl).toContain('signed=true');
    expect(reissued?.downloadUrl).toContain(pointer.blobKey);

    // Product scoping: another product gets null for the same id.
    const foreign = await artifactsBlob.getArtifactDownload(pointer.id, 'other-product');
    expect(foreign).toBeNull();

    // The full payload round-trips from blob.
    const bucket = await openBucket();
    const roundTripped = await bucket.download(pointer.blobKey);
    expect(roundTripped.length).toBe(threeMb);
  });

  it('delete removes both the pointer and the backing blob', async () => {
    const { artifact: pointer } = await artifactsBlob.uploadArtifact({
      productId: PID,
      jobId: 'jd',
      kind: 'other',
      bytes: Buffer.from('tmp'),
      contentType: 'text/plain',
    });
    const bucket = await openBucket();
    expect(await bucket.exists(pointer.blobKey)).toBe(true);

    const firstDelete = await artifactsBlob.deleteArtifact(pointer.id, PID);
    expect(firstDelete).toBe(true);
    expect(await repo.getArtifact(pointer.id, PID)).toBeNull();
    expect(await bucket.exists(pointer.blobKey)).toBe(false);

    // A second delete of the same id finds nothing and reports false.
    const secondDelete = await artifactsBlob.deleteArtifact(pointer.id, PID);
    expect(secondDelete).toBe(false);
  });
});
|
||||
|
||||
describe('fleet artifacts — routes (fastify inject)', () => {
  /** Base64-encode a UTF-8 string for the upload body. */
  const toBase64 = (s: string) => Buffer.from(s).toString('base64');
  /** Parse a response body as JSON. */
  const parseBody = (response: { body: string }) => JSON.parse(response.body);

  it('POST upload → 201 with pointer + SAS; GET list → the pointer; bytes not echoed', async () => {
    const app = await buildApp();
    const logText = 'route log bytes';

    const uploadRes = await app.inject({
      method: 'POST',
      url: '/api/fleet/jobs/jr1/artifacts',
      payload: { kind: 'log', contentType: 'text/plain', contentBase64: toBase64(logText) },
    });
    expect(uploadRes.statusCode).toBe(201);

    const created = parseBody(uploadRes);
    expect(created.artifact.jobId).toBe('jr1');
    expect(created.artifact.blobKey).toBe(`fleet/${PID}/jr1/${created.artifact.id}-log`);
    expect(created.artifact.sizeBytes).toBe(Buffer.from(logText).length);
    expect(created.downloadUrl).toContain('signed=true');
    // The response must not echo the uploaded bytes under any payload-like field.
    INLINE_PAYLOAD_FIELDS.forEach(field => {
      expect(created.artifact).not.toHaveProperty(field);
    });

    const listRes = await app.inject({ method: 'GET', url: '/api/fleet/jobs/jr1/artifacts' });
    expect(listRes.statusCode).toBe(200);
    const listed = parseBody(listRes).artifacts;
    expect(listed).toHaveLength(1);
    expect(listed[0].id).toBe(created.artifact.id);
  });

  it('GET /fleet/artifacts/:id returns a fresh SAS URL; DELETE removes it', async () => {
    const app = await buildApp();

    const uploadRes = await app.inject({
      method: 'POST',
      url: '/api/fleet/jobs/jr2/artifacts',
      payload: {
        kind: 'coverage',
        contentType: 'application/json',
        contentBase64: toBase64('{"pct":91}'),
      },
    });
    const artifactId = parseBody(uploadRes).artifact.id as string;
    const artifactUrl = `/api/fleet/artifacts/${artifactId}`;

    const getRes = await app.inject({ method: 'GET', url: artifactUrl });
    expect(getRes.statusCode).toBe(200);
    const fetched = parseBody(getRes);
    expect(fetched.artifact.id).toBe(artifactId);
    expect(fetched.downloadUrl).toContain('signed=true');

    const deleteRes = await app.inject({ method: 'DELETE', url: artifactUrl });
    expect(deleteRes.statusCode).toBe(200);
    expect(parseBody(deleteRes).deleted).toBe(true);

    // After deletion the same id is no longer found.
    const getAgain = await app.inject({ method: 'GET', url: artifactUrl });
    expect(getAgain.statusCode).toBe(404);
  });

  it('rejects an invalid upload body (400) and unknown artifact ids (404)', async () => {
    const app = await buildApp();

    // Body without `kind` fails validation.
    const invalidUpload = await app.inject({
      method: 'POST',
      url: '/api/fleet/jobs/jr3/artifacts',
      payload: { contentBase64: toBase64('x') },
    });
    expect(invalidUpload.statusCode).toBe(400);

    const unknownGet = await app.inject({ method: 'GET', url: '/api/fleet/artifacts/nope' });
    expect(unknownGet.statusCode).toBe(404);

    const unknownDelete = await app.inject({ method: 'DELETE', url: '/api/fleet/artifacts/nope' });
    expect(unknownDelete.statusCode).toBe(404);
  });
});
|
||||
@ -279,4 +279,40 @@ describe('fleet coordinator', () => {
|
||||
coord.submitJob(PID, input({ idempotencyKey: 'k', bodyMd: 'v2' }))
|
||||
).rejects.toBeInstanceOf(ConflictError);
|
||||
});
|
||||
|
||||
// ── §7 SCORE-DRIVEN SELECTION (Phase 2 scheduler wired into claimNextJob) ──
|
||||
it('selection now follows the §7 score: within-budget cost-fit beats an older same-priority job', async () => {
|
||||
// jobB is submitted first (older) and is the same priority, but its budget blows
|
||||
// the factory cost ceiling. The old priority+age rule would have taken the older
|
||||
// jobB; the scorer prefers the within-budget jobA.
|
||||
await coord.submitJob(
|
||||
PID,
|
||||
input({ idempotencyKey: 'B-old-expensive', priority: 'medium', budget: { usd: 100 } })
|
||||
);
|
||||
await coord.submitJob(
|
||||
PID,
|
||||
input({ idempotencyKey: 'A-new-cheap', priority: 'medium', budget: { usd: 5 } })
|
||||
);
|
||||
const claim = await coord.claimNextJob(factory({ costCeilingUsd: 10 }));
|
||||
expect(claim?.job.idempotencyKey).toBe('A-new-cheap');
|
||||
});
|
||||
|
||||
it('claimNextJob: a factory below the health floor (down) claims nothing; a healthy one does', async () => {
|
||||
await coord.submitJob(PID, input());
|
||||
expect(await coord.claimNextJob(factory({ health: 'down' }))).toBeNull();
|
||||
const ok = await coord.claimNextJob(factory({ health: 'ok' }));
|
||||
expect(ok).not.toBeNull();
|
||||
});
|
||||
|
||||
it('claimNextJob drains in score order: highest priority first, then the next, then null', async () => {
|
||||
await coord.submitJob(PID, input({ idempotencyKey: 'low', priority: 'low' }));
|
||||
await coord.submitJob(PID, input({ idempotencyKey: 'crit', priority: 'critical' }));
|
||||
expect((await coord.claimNextJob(factory({ factoryId: 'f1' })))?.job.idempotencyKey).toBe(
|
||||
'crit'
|
||||
);
|
||||
expect((await coord.claimNextJob(factory({ factoryId: 'f2' })))?.job.idempotencyKey).toBe(
|
||||
'low'
|
||||
);
|
||||
expect(await coord.claimNextJob(factory({ factoryId: 'f3' }))).toBeNull();
|
||||
});
|
||||
});
|
||||
|
||||
@ -20,6 +20,7 @@
|
||||
import { createHash } from 'node:crypto';
|
||||
import { BadRequestError, ConflictError } from '../../lib/errors.js';
|
||||
import * as repo from './repository.js';
|
||||
import { selectJob, type SchedulerContext, type SchedulerFactory } from './scheduler.js';
|
||||
import {
|
||||
ACTIVE_STAGES,
|
||||
DEP_DONE_HARD,
|
||||
@ -38,11 +39,10 @@ export function contentHash(bodyMd: string): string {
|
||||
return createHash('sha256').update(bodyMd).digest('hex');
|
||||
}
|
||||
|
||||
/** Every required capability token must be advertised by the factory. */
|
||||
export function capabilitiesSubset(required: string[], available: string[]): boolean {
|
||||
const set = new Set(available);
|
||||
return required.every(token => set.has(token));
|
||||
}
|
||||
// The capability-subset predicate + the §7 scoring/selection engine live in the
|
||||
// pure, unit-tested scheduler module. Re-export the predicate here to preserve the
|
||||
// coordinator's public surface (claimNextJob now ranks candidates via selectJob).
|
||||
export { capabilitiesSubset } from './scheduler.js';
|
||||
|
||||
// ── Dependency evaluation (§5) ────────────────────────────────────────────────
|
||||
|
||||
@ -228,6 +228,19 @@ export interface ClaimContext {
|
||||
factoryId: string;
|
||||
capabilities: string[];
|
||||
leaseSeconds: number;
|
||||
// ── §7 scheduler inputs (additive, optional — sane defaults below) ──
|
||||
/** Factory health; below the floor (`down`) the factory claims nothing. */
|
||||
health?: 'ok' | 'degraded' | 'down';
|
||||
/** Current factory load (busier ⇒ lower score). */
|
||||
load?: number;
|
||||
/** Per-engine seat limit (carried for scoring/future seat-aware routing). */
|
||||
seatLimit?: number;
|
||||
/** Engines this factory runs (prefers-engine affinity). */
|
||||
factoryEngines?: string[];
|
||||
/** Scopes (repos/locks) the factory has warm (repo-stickiness affinity). */
|
||||
warmScopes?: string[];
|
||||
/** Factory/budget cost ceiling in USD (cost-fit). */
|
||||
costCeilingUsd?: number;
|
||||
}
|
||||
|
||||
export interface ClaimResult {
|
||||
@ -236,12 +249,18 @@ export interface ClaimResult {
|
||||
run: FleetRunDoc;
|
||||
}
|
||||
|
||||
/** A job is eligible for a factory iff queued/blocked-with-met-deps + caps subset. */
|
||||
async function eligibleForClaim(job: FleetJobDoc, factoryCaps: string[]): Promise<boolean> {
|
||||
if (job.stage !== 'queued' && job.stage !== 'blocked') return false;
|
||||
if (!capabilitiesSubset(job.capabilities, factoryCaps)) return false;
|
||||
const unmet = await unmetDeps(job);
|
||||
return unmet.length === 0;
|
||||
/**
|
||||
* Resolve which stage-eligible (queued/blocked) jobs currently have their deps
|
||||
* satisfied. This is the store-backed (async) half of eligibility; the pure
|
||||
* capability-subset filter + §7 scoring + tie-break are applied by `selectJob`.
|
||||
*/
|
||||
async function depsSatisfiedIds(jobs: FleetJobDoc[]): Promise<Set<string>> {
|
||||
const satisfied = new Set<string>();
|
||||
for (const job of jobs) {
|
||||
if (job.stage !== 'queued' && job.stage !== 'blocked') continue;
|
||||
if ((await unmetDeps(job)).length === 0) satisfied.add(job.id);
|
||||
}
|
||||
return satisfied;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -314,19 +333,36 @@ export async function tryClaimJob(
|
||||
return { ok: true, doc: { job: claimed.doc, lease, run } };
|
||||
}
|
||||
|
||||
/** Select the highest-priority, oldest eligible job and atomically claim it. */
|
||||
/**
|
||||
* Select the best eligible job via the §7 scoring engine and atomically claim it.
|
||||
*
|
||||
* The coordinator owns all I/O: it lists candidates, resolves deps (store-backed)
|
||||
* into a pure predicate, and builds the factory view + authoritative `now`. The
|
||||
* pure `selectJob` then applies the capability hard-filter + fixed-weight scoring
|
||||
* + deterministic tie-break (priority → age → cost class). The atomic
|
||||
* single-winner guarantee remains entirely in `tryClaimJob`'s rev compare-and-swap,
|
||||
* which is unchanged — on conflict we re-select and retry.
|
||||
*/
|
||||
export async function claimNextJob(ctx: ClaimContext): Promise<ClaimResult | null> {
|
||||
const factory: SchedulerFactory = {
|
||||
capabilities: ctx.capabilities,
|
||||
health: ctx.health ?? 'ok',
|
||||
load: ctx.load ?? 0,
|
||||
seatLimit: ctx.seatLimit ?? 1,
|
||||
};
|
||||
for (let i = 0; i < CLAIM_MAX_RETRIES; i++) {
|
||||
const candidates = await repo.listJobs({ productId: ctx.productId });
|
||||
const eligible: FleetJobDoc[] = [];
|
||||
for (const job of candidates) {
|
||||
if (await eligibleForClaim(job, ctx.capabilities)) eligible.push(job);
|
||||
}
|
||||
if (eligible.length === 0) return null;
|
||||
eligible.sort(
|
||||
(a, b) => a.priorityOrder - b.priorityOrder || a.createdAt.localeCompare(b.createdAt)
|
||||
);
|
||||
const result = await tryClaimJob(eligible[0], ctx);
|
||||
const satisfied = await depsSatisfiedIds(candidates);
|
||||
const schedulerCtx: SchedulerContext = {
|
||||
now: Date.now(), // coordinator-authoritative time
|
||||
isDepsSatisfied: job => satisfied.has(job.id),
|
||||
factoryEngines: ctx.factoryEngines,
|
||||
warmScopes: ctx.warmScopes,
|
||||
costCeilingUsd: ctx.costCeilingUsd,
|
||||
};
|
||||
const pick = selectJob(candidates, factory, schedulerCtx);
|
||||
if (!pick) return null;
|
||||
const result = await tryClaimJob(pick, ctx);
|
||||
if (result.ok) return result.doc;
|
||||
if (result.reason === 'not_found') continue;
|
||||
// conflict: another factory won this version — re-select and retry
|
||||
|
||||
@ -171,17 +171,29 @@ describe('fleet repository', () => {
|
||||
expect(events.map(e => e.type)).toEqual(['submitted', 'assigned', 'transition']);
|
||||
});
|
||||
|
||||
it('artifacts: create + list', async () => {
|
||||
it('artifacts: create + listByJob + get + delete (pointer only)', async () => {
|
||||
await repo.createArtifact({
|
||||
id: 'art_1',
|
||||
productId: PID,
|
||||
jobId: 'j',
|
||||
kind: 'coverage',
|
||||
blobUrl: 'https://b/x',
|
||||
blobKey: 'fleet/lysnrai/j/art_1-coverage',
|
||||
contentType: 'application/json',
|
||||
sizeBytes: 42,
|
||||
createdAt: now,
|
||||
});
|
||||
const arts = await repo.listArtifacts('j');
|
||||
const arts = await repo.listArtifactsByJob('j');
|
||||
expect(arts).toHaveLength(1);
|
||||
expect(arts[0].blobUrl).toBe('https://b/x');
|
||||
expect(arts[0].blobKey).toBe('fleet/lysnrai/j/art_1-coverage');
|
||||
|
||||
// get is product-scoped (wrong product → null)
|
||||
expect((await repo.getArtifact('art_1', PID))?.contentType).toBe('application/json');
|
||||
expect(await repo.getArtifact('art_1', 'other-product')).toBeNull();
|
||||
|
||||
// delete returns the removed doc and clears the partition
|
||||
const removed = await repo.deleteArtifact('art_1', PID);
|
||||
expect(removed?.id).toBe('art_1');
|
||||
expect(await repo.listArtifactsByJob('j')).toHaveLength(0);
|
||||
expect(await repo.deleteArtifact('art_1', PID)).toBeNull();
|
||||
});
|
||||
});
|
||||
|
||||
@ -247,12 +247,39 @@ export async function listEvents(jobId: string): Promise<FleetEventDoc[]> {
|
||||
return docs;
|
||||
}
|
||||
|
||||
// ── Artifacts ─────────────────────────────────────────────────────────────────
|
||||
// ── Artifacts (pointers only — bytes live in blob, never Cosmos; §13) ──────────
|
||||
|
||||
export async function createArtifact(doc: FleetArtifactDoc): Promise<FleetArtifactDoc> {
|
||||
return artifacts().create(doc);
|
||||
}
|
||||
|
||||
export async function listArtifacts(jobId: string): Promise<FleetArtifactDoc[]> {
|
||||
/** All artifact pointers for a job, oldest-first (single partition — pk `/jobId`). */
|
||||
export async function listArtifactsByJob(jobId: string): Promise<FleetArtifactDoc[]> {
|
||||
return artifacts().findMany({ filter: { jobId }, sort: { createdAt: 1 } });
|
||||
}
|
||||
|
||||
/**
 * Look up a single artifact pointer by id within one product.
 *
 * The container's partition key is `/jobId`, not `/productId`, so this runs as
 * a filtered query limited to one result instead of a point read. Filtering on
 * `productId` is what enforces ownership for the by-id routes: an unknown id or
 * an id owned by another product yields null.
 */
export async function getArtifact(id: string, productId: string): Promise<FleetArtifactDoc | null> {
  const [match] = await artifacts().findMany({ filter: { id, productId }, limit: 1 });
  return match ?? null;
}
|
||||
|
||||
/**
 * Remove one artifact pointer by id within one product.
 *
 * The doc is located first (product-scoped) so its `jobId` can be passed to the
 * delete call alongside the id.
 *
 * @returns The removed doc — which lets the caller clean up the backing blob —
 *   or null when no matching artifact exists.
 */
export async function deleteArtifact(
  id: string,
  productId: string
): Promise<FleetArtifactDoc | null> {
  const target = await getArtifact(id, productId);
  if (target) {
    await artifacts().delete(id, target.jobId);
    return target;
  }
  return null;
}
|
||||
|
||||
@ -11,6 +11,10 @@
|
||||
* POST /fleet/factories/heartbeat factory liveness
|
||||
* GET /fleet/jobs/:id/runs job run history
|
||||
* GET /fleet/jobs/:id/events append-only event stream
|
||||
* POST /fleet/jobs/:id/artifacts upload a run output (base64 body → blob + pointer)
|
||||
* GET /fleet/jobs/:id/artifacts list a job's artifact pointers
|
||||
* GET /fleet/artifacts/:artifactId pointer + fresh short-lived SAS download URL
|
||||
* DELETE /fleet/artifacts/:artifactId delete pointer (and backing blob)
|
||||
*
|
||||
* All routes require auth + a resolved productId, exactly like the items module.
|
||||
*/
|
||||
@ -21,6 +25,7 @@ import { BadRequestError, ConflictError, NotFoundError } from '../../lib/errors.
|
||||
import { extractAuth } from '../../lib/auth.js';
|
||||
import * as repo from './repository.js';
|
||||
import * as coordinator from './coordinator.js';
|
||||
import * as artifactsBlob from './artifacts-blob.js';
|
||||
import {
|
||||
SubmitJobSchema,
|
||||
ListJobsQuerySchema,
|
||||
@ -29,6 +34,7 @@ import {
|
||||
RenewLeaseSchema,
|
||||
ReleaseLeaseSchema,
|
||||
HeartbeatSchema,
|
||||
UploadArtifactSchema,
|
||||
} from './types.js';
|
||||
|
||||
function badRequest(issues: { message: string }[]): never {
|
||||
@ -179,4 +185,53 @@ export async function fleetRoutes(app: FastifyInstance) {
|
||||
const events = await repo.listEvents(id);
|
||||
return { events };
|
||||
});
|
||||
|
||||
// ── Artifacts: upload (base64 body → blob + pointer) ──
|
||||
app.post('/fleet/jobs/:id/artifacts', async (req, reply) => {
|
||||
await extractAuth(req);
|
||||
const { id: jobId } = req.params as { id: string };
|
||||
const parsed = UploadArtifactSchema.safeParse(req.body);
|
||||
if (!parsed.success) badRequest(parsed.error.issues);
|
||||
const pid = parsed.data.productId || getRequestProductId(req);
|
||||
const bytes = Buffer.from(parsed.data.contentBase64, 'base64');
|
||||
if (bytes.length === 0) badRequest([{ message: 'contentBase64 decoded to empty bytes' }]);
|
||||
const { artifact, downloadUrl } = await artifactsBlob.uploadArtifact({
|
||||
productId: pid,
|
||||
jobId,
|
||||
kind: parsed.data.kind,
|
||||
bytes,
|
||||
contentType: parsed.data.contentType,
|
||||
runId: parsed.data.runId,
|
||||
});
|
||||
reply.code(201);
|
||||
return { artifact, downloadUrl };
|
||||
});
|
||||
|
||||
// ── Artifacts: list a job's pointers ──
|
||||
app.get('/fleet/jobs/:id/artifacts', async req => {
|
||||
await extractAuth(req);
|
||||
const { id: jobId } = req.params as { id: string };
|
||||
const artifacts = await repo.listArtifactsByJob(jobId);
|
||||
return { artifacts };
|
||||
});
|
||||
|
||||
// ── Artifacts: pointer + fresh SAS download URL ──
|
||||
app.get('/fleet/artifacts/:artifactId', async req => {
|
||||
await extractAuth(req);
|
||||
const { artifactId } = req.params as { artifactId: string };
|
||||
const pid = getRequestProductId(req);
|
||||
const found = await artifactsBlob.getArtifactDownload(artifactId, pid);
|
||||
if (!found) throw new NotFoundError('Artifact not found');
|
||||
return found;
|
||||
});
|
||||
|
||||
// ── Artifacts: delete pointer (and backing blob) ──
|
||||
app.delete('/fleet/artifacts/:artifactId', async req => {
|
||||
await extractAuth(req);
|
||||
const { artifactId } = req.params as { artifactId: string };
|
||||
const pid = getRequestProductId(req);
|
||||
const deleted = await artifactsBlob.deleteArtifact(artifactId, pid);
|
||||
if (!deleted) throw new NotFoundError('Artifact not found');
|
||||
return { deleted: true };
|
||||
});
|
||||
}
|
||||
|
||||
203
services/platform-service/src/modules/fleet/scheduler.test.ts
Normal file
203
services/platform-service/src/modules/fleet/scheduler.test.ts
Normal file
@ -0,0 +1,203 @@
|
||||
/**
|
||||
* Fleet scheduler / router core (§7) — pure, deterministic unit tests.
|
||||
* No datastore, no real clock: time is injected via ctx.now.
|
||||
*/
|
||||
|
||||
import { describe, expect, it } from 'vitest';
|
||||
import {
|
||||
DEFAULT_WEIGHTS,
|
||||
scoreCandidate,
|
||||
selectJob,
|
||||
capabilitiesSubset,
|
||||
type SchedulerContext,
|
||||
type SchedulerFactory,
|
||||
} from './scheduler.js';
|
||||
import { PRIORITY_ORDER, type FleetJobDoc, type FleetPriority } from './types.js';
|
||||
|
||||
const NOW = Date.parse('2026-05-29T12:00:00.000Z');
|
||||
const iso = (msAgo: number) => new Date(NOW - msAgo).toISOString();
|
||||
|
||||
/**
 * Test fixture: a FleetJobDoc with defaults (queued, medium priority, created
 * "now"), overridden by whatever `over` supplies. `priority`, `priorityOrder`
 * and `manifestSnapshot` are always derived here so they stay consistent with
 * each other; `over.manifestSnapshot` fields are merged on top of the manifest
 * defaults.
 */
function job(over: Partial<FleetJobDoc> & { id: string }): FleetJobDoc {
  const priority: FleetPriority = over.priority ?? 'medium';
  const priorityOrder = over.priorityOrder ?? PRIORITY_ORDER[priority];
  const stamp = iso(0);

  const manifestSnapshot: FleetJobDoc['manifestSnapshot'] = {
    priority,
    capabilities: over.capabilities ?? [],
    prefersEngine: [],
    allowedScope: [],
    deps: [],
    ...(over.manifestSnapshot ?? {}),
  };

  return {
    // defaults
    productId: 'lysnrai',
    stage: 'queued',
    idempotencyKey: over.id,
    contentHash: 'h',
    bodyMd: '# task',
    capabilities: [],
    deps: [],
    kind: 'leaf',
    attempts: 0,
    leaseEpoch: 0,
    rev: 0,
    createdAt: stamp,
    updatedAt: stamp,
    // caller overrides
    ...over,
    // derived fields always win over anything in `over`
    priority,
    priorityOrder,
    manifestSnapshot,
  };
}
|
||||
|
||||
/** Test fixture: an idle, healthy factory with no capabilities, overridden field-by-field by `over`. */
const fac = (over: Partial<SchedulerFactory> = {}): SchedulerFactory => {
  const idleHealthy: SchedulerFactory = { capabilities: [], health: 'ok', load: 0 };
  return { ...idleHealthy, ...over };
};
|
||||
|
||||
/** Test fixture: a scheduler context pinned to the fixed clock `NOW`, with optional overrides. */
const ctx = (over: Partial<SchedulerContext> = {}): SchedulerContext => {
  return { now: NOW, ...over };
};
|
||||
|
||||
describe('scheduler §7 — capability hard filter', () => {
|
||||
it('a factory missing a required capability never selects that job', () => {
|
||||
const needsMac = job({ id: 'needs-mac', capabilities: ['os:mac'] });
|
||||
expect(selectJob([needsMac], fac({ capabilities: [] }), ctx())).toBeNull();
|
||||
expect(selectJob([needsMac], fac({ capabilities: ['os:mac', 'has:git'] }), ctx())?.id).toBe(
|
||||
'needs-mac'
|
||||
);
|
||||
});
|
||||
|
||||
it('among candidates, only capability-subset jobs are eligible', () => {
|
||||
const a = job({ id: 'a', capabilities: ['os:mac'] });
|
||||
const b = job({ id: 'b', capabilities: ['os:linux'] }); // factory cannot run this
|
||||
const pick = selectJob([a, b], fac({ capabilities: ['os:mac'] }), ctx());
|
||||
expect(pick?.id).toBe('a');
|
||||
});
|
||||
|
||||
it('capabilitiesSubset predicate', () => {
|
||||
expect(capabilitiesSubset(['a', 'b'], ['a', 'b', 'c'])).toBe(true);
|
||||
expect(capabilitiesSubset(['a', 'z'], ['a', 'b'])).toBe(false);
|
||||
expect(capabilitiesSubset([], ['a'])).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
describe('scheduler §7 — priority + age tie-breaks (all else equal)', () => {
|
||||
it('priority dominates when scores tie', () => {
|
||||
const low = job({ id: 'low', priority: 'low' });
|
||||
const med = job({ id: 'med', priority: 'medium' });
|
||||
const high = job({ id: 'high', priority: 'high' });
|
||||
const crit = job({ id: 'crit', priority: 'critical' });
|
||||
expect(selectJob([low, med, high, crit], fac(), ctx())?.id).toBe('crit');
|
||||
});
|
||||
|
||||
it('age breaks ties deterministically — older wins among equal priority', () => {
|
||||
// both within the same aging bucket ⇒ equal starvation ⇒ score tie ⇒ age tie-break
|
||||
const older = job({ id: 'older', priority: 'medium', createdAt: iso(5_000) });
|
||||
const newer = job({ id: 'newer', priority: 'medium', createdAt: iso(1_000) });
|
||||
expect(selectJob([newer, older], fac(), ctx())?.id).toBe('older');
|
||||
});
|
||||
});
|
||||
|
||||
describe('scheduler §7 — load & health', () => {
|
||||
it('a higher-load factory scores lower (1/(1+load))', () => {
|
||||
const j = job({ id: 'j' });
|
||||
const idle = scoreCandidate(j, fac({ load: 0 }), ctx()).score;
|
||||
const busy = scoreCandidate(j, fac({ load: 5 }), ctx()).score;
|
||||
expect(idle).toBeGreaterThan(busy);
|
||||
});
|
||||
|
||||
it('degraded health scores lower than ok', () => {
|
||||
const j = job({ id: 'j' });
|
||||
const ok = scoreCandidate(j, fac({ health: 'ok' }), ctx()).score;
|
||||
const degraded = scoreCandidate(j, fac({ health: 'degraded' }), ctx()).score;
|
||||
expect(ok).toBeGreaterThan(degraded);
|
||||
});
|
||||
|
||||
it('a down factory is filtered out entirely (health floor)', () => {
|
||||
const j = job({ id: 'j' });
|
||||
expect(selectJob([j], fac({ health: 'down' }), ctx())).toBeNull();
|
||||
});
|
||||
});
|
||||
|
||||
describe('scheduler §7 — starvation (anti-starvation aging)', () => {
|
||||
it('an aged low-priority job outranks a fresh low-priority one', () => {
|
||||
const fresh = job({ id: 'fresh', priority: 'low', createdAt: iso(0) });
|
||||
const aged = job({ id: 'aged', priority: 'low', createdAt: iso(40 * 60_000) });
|
||||
expect(selectJob([fresh, aged], fac(), ctx())?.id).toBe('aged');
|
||||
// and the aged job's standalone score is higher
|
||||
expect(scoreCandidate(aged, fac(), ctx()).score).toBeGreaterThan(
|
||||
scoreCandidate(fresh, fac(), ctx()).score
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe('scheduler §7 — cost fit', () => {
|
||||
it('a job exceeding the cost ceiling is penalized and ranked last', () => {
|
||||
const within = job({ id: 'within', budget: { usd: 5 } });
|
||||
const over = job({ id: 'over', budget: { usd: 100 } });
|
||||
const c = ctx({ costCeilingUsd: 10 });
|
||||
expect(selectJob([over, within], fac(), c)?.id).toBe('within');
|
||||
expect(scoreCandidate(over, fac(), c).score).toBeLessThan(
|
||||
scoreCandidate(within, fac(), c).score
|
||||
);
|
||||
});
|
||||
|
||||
it('no ceiling ⇒ cost is neutral (full costFit)', () => {
|
||||
const j = job({ id: 'j', budget: { usd: 999 } });
|
||||
expect(scoreCandidate(j, fac(), ctx()).breakdown.costFit).toBeCloseTo(DEFAULT_WEIGHTS.costFit);
|
||||
});
|
||||
});
|
||||
|
||||
describe('scheduler §7 — affinity', () => {
|
||||
it('prefers-engine match raises affinity', () => {
|
||||
const j = job({
|
||||
id: 'j',
|
||||
manifestSnapshot: { prefersEngine: ['devin'] } as FleetJobDoc['manifestSnapshot'],
|
||||
});
|
||||
const hit = scoreCandidate(j, fac(), ctx({ factoryEngines: ['devin'] })).breakdown.affinity;
|
||||
const miss = scoreCandidate(j, fac(), ctx({ factoryEngines: ['claude'] })).breakdown.affinity;
|
||||
expect(hit).toBeGreaterThan(miss);
|
||||
});
|
||||
});
|
||||
|
||||
describe('scheduler §7 — breakdown & determinism', () => {
|
||||
it('breakdown is per-weighted-term and sums to the score', () => {
|
||||
const j = job({ id: 'j', priority: 'high', budget: { usd: 3 }, createdAt: iso(90_000) });
|
||||
const c = ctx({ costCeilingUsd: 10, factoryEngines: ['devin'] });
|
||||
const { score, breakdown } = scoreCandidate(j, fac({ load: 2, health: 'degraded' }), c);
|
||||
const sum =
|
||||
breakdown.capabilityFit +
|
||||
breakdown.affinity +
|
||||
breakdown.load +
|
||||
breakdown.costFit +
|
||||
breakdown.health +
|
||||
breakdown.starvation;
|
||||
expect(sum).toBeCloseTo(score, 12);
|
||||
expect(breakdown.starvation).toBeLessThanOrEqual(0); // signed penalty
|
||||
});
|
||||
|
||||
it('selectJob is deterministic — same inputs ⇒ same pick across runs', () => {
|
||||
const cands = [
|
||||
job({ id: 'a', priority: 'medium', createdAt: iso(1_000) }),
|
||||
job({ id: 'b', priority: 'high', createdAt: iso(2_000) }),
|
||||
job({ id: 'c', priority: 'high', createdAt: iso(3_000) }),
|
||||
];
|
||||
const picks = Array.from({ length: 5 }, () => selectJob(cands, fac(), ctx())?.id);
|
||||
expect(new Set(picks).size).toBe(1);
|
||||
// highest priority (b/c are 'high' > a's 'medium'); b vs c tie on score+priority,
|
||||
// so the age tie-break wins → c is older (created 3s ago vs b's 2s ago).
|
||||
expect(picks[0]).toBe('c');
|
||||
});
|
||||
|
||||
it('empty candidates ⇒ null; no eligible (none queued/blocked) ⇒ null', () => {
|
||||
expect(selectJob([], fac(), ctx())).toBeNull();
|
||||
const assigned = job({ id: 'x', stage: 'assigned' });
|
||||
expect(selectJob([assigned], fac(), ctx())).toBeNull();
|
||||
});
|
||||
|
||||
it('blocked jobs are eligible only when the deps predicate says so', () => {
|
||||
const blocked = job({ id: 'blk', stage: 'blocked' });
|
||||
expect(selectJob([blocked], fac(), ctx({ isDepsSatisfied: () => false }))).toBeNull();
|
||||
expect(selectJob([blocked], fac(), ctx({ isDepsSatisfied: () => true }))?.id).toBe('blk');
|
||||
});
|
||||
});
|
||||
280
services/platform-service/src/modules/fleet/scheduler.ts
Normal file
280
services/platform-service/src/modules/fleet/scheduler.ts
Normal file
@ -0,0 +1,280 @@
|
||||
/**
|
||||
* Fleet scheduler / router core — the deterministic, fixed-weight scoring engine
|
||||
* that decides WHICH job a claiming factory gets (Phase 2, §7 of the gigafactory
|
||||
* roadmap).
|
||||
*
|
||||
* This module is PURE and SYNCHRONOUS: no datastore calls, no clock reads, no env.
|
||||
* Everything it needs is passed in — health/load/seatLimit from the factory view,
|
||||
* age from `job.createdAt` vs `ctx.now` (coordinator-authoritative time), and the
|
||||
* deps-satisfied predicate (the coordinator resolves deps asynchronously and hands
|
||||
* us a pure predicate). That keeps the scoring fully unit-testable and lets the
|
||||
* coordinator own all I/O and the atomic compare-and-swap claim.
|
||||
*
|
||||
* Phasing (§7): Phase 2 ships the deterministic filter + fixed-weight scoring.
|
||||
* Phase 3 adds tunable weights, preemption, and the explainability UI; Phase 5
|
||||
* learns the weights. We deliberately do NOT build tunable weights or preemption
|
||||
* here — only the fixed-weight core + a per-term breakdown for explainability.
|
||||
*
|
||||
* Scoring formula (§7):
|
||||
* score = w1·capabilityFit
|
||||
* + w2·affinity(prefersEngine / repo-stickiness)
|
||||
* + w3·(1 / (1 + load))
|
||||
* + w4·costFit(budget)
|
||||
* + w5·health
|
||||
* − w6·starvationPenalty(age)
|
||||
*
|
||||
* Selection: filter to deps-satisfied + capability-subset (+ health floor), rank
|
||||
* by score, then a deterministic tie-break: higher priority → older createdAt →
|
||||
* lower cost class.
|
||||
*/
|
||||
|
||||
import type { FactoryHealth, FleetJobDoc } from './types.js';
|
||||
|
||||
// ── Weights (fixed this phase; overridable via a passed-in object, NOT env) ──
|
||||
|
||||
/** Fixed-weight config for the §7 scoring terms. Phase 3 makes these tunable. */
|
||||
export interface SchedulerWeights {
|
||||
/** w1 — hard capability fit (satisfied requirement ratio). */
|
||||
capabilityFit: number;
|
||||
/** w2 — affinity: prefers-engine match + warm-scope (repo) stickiness. */
|
||||
affinity: number;
|
||||
/** w3 — inverse load `1/(1+load)`; a busier factory scores lower. */
|
||||
load: number;
|
||||
/** w4 — cost fit: penalize jobs whose budget exceeds the factory cost ceiling. */
|
||||
costFit: number;
|
||||
/** w5 — factory health (ok=1, degraded=0.5, down=0; `selectJob` drops a `down` factory before any scoring happens). */
|
||||
health: number;
|
||||
/** w6 — starvation: subtracts a freshness penalty that decays as a job ages,
|
||||
* so an aged job outranks an equally-prioritised fresh one (anti-starvation). */
|
||||
starvation: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Phase-2 fixed defaults. Rationale: capability + health are hard signals (1.0);
|
||||
* load matters strongly (1.0); cost is a moderate guard (0.75); affinity is a
|
||||
* gentle nudge (0.5); starvation is weighted high enough (1.5) to lift an aged
|
||||
* job above a same-priority fresh peer. Tunable per-product weights are Phase 3.
|
||||
*/
|
||||
export const DEFAULT_WEIGHTS: SchedulerWeights = {
|
||||
capabilityFit: 1.0,
|
||||
affinity: 0.5,
|
||||
load: 1.0,
|
||||
costFit: 0.75,
|
||||
health: 1.0,
|
||||
starvation: 1.5,
|
||||
};
|
||||
|
||||
/** Aging config for the starvation term (fixed this phase). */
|
||||
export interface StarvationConfig {
|
||||
/** Width of one aging bucket, in ms. Ages within a bucket score identically,
|
||||
* so jobs submitted close together tie (and fall through to the priority
|
||||
* tie-break) rather than being separated by sub-second noise. */
|
||||
intervalMs: number;
|
||||
/** Number of buckets after which the freshness penalty fully decays to 0. */
|
||||
buckets: number;
|
||||
}
|
||||
|
||||
export const DEFAULT_STARVATION: StarvationConfig = {
|
||||
intervalMs: 60_000, // 1 minute
|
||||
buckets: 30, // fully de-penalised after ~30 minutes of waiting
|
||||
};
|
||||
|
||||
// ── Factory view + context the engine consumes ──────────────────────────────
|
||||
|
||||
/**
|
||||
* The factory fields the scorer needs — a structural subset of FleetFactoryDoc
|
||||
* (so a real factory doc is assignable), but also satisfiable from the claim
|
||||
* context so the coordinator need not always load the full doc.
|
||||
*/
|
||||
export interface SchedulerFactory {
|
||||
capabilities: string[];
|
||||
health?: FactoryHealth;
|
||||
load?: number;
|
||||
seatLimit?: number;
|
||||
}
|
||||
|
||||
/** Pure context: authoritative time + injected deps predicate + affinity/cost hints. */
|
||||
export interface SchedulerContext {
|
||||
/** Coordinator-authoritative now (ms epoch). Drives the starvation/age term. */
|
||||
now: number;
|
||||
/** Deps gate (the coordinator resolves deps async and passes a pure predicate).
|
||||
* Omitted ⇒ treated as satisfied. */
|
||||
isDepsSatisfied?: (job: FleetJobDoc) => boolean;
|
||||
/** Engines the claiming factory runs (for prefers-engine affinity). */
|
||||
factoryEngines?: string[];
|
||||
/** Scopes (repos/locks) the factory has warm (for repo-stickiness affinity). */
|
||||
warmScopes?: string[];
|
||||
/** The factory/budget cost ceiling in USD for cost-fit. Omitted ⇒ unconstrained. */
|
||||
costCeilingUsd?: number;
|
||||
/** Override the starvation aging config (fixed defaults otherwise). */
|
||||
starvation?: StarvationConfig;
|
||||
}
|
||||
|
||||
/** Per-term, already-weighted contributions. Sums to `score` (starvation signed −). */
|
||||
export interface ScoreBreakdown {
|
||||
capabilityFit: number;
|
||||
affinity: number;
|
||||
load: number;
|
||||
costFit: number;
|
||||
health: number;
|
||||
starvation: number;
|
||||
}
|
||||
|
||||
export interface ScoredCandidate {
|
||||
score: number;
|
||||
breakdown: ScoreBreakdown;
|
||||
}
|
||||
|
||||
// ── Pure predicates / helpers ────────────────────────────────────────────────
|
||||
|
||||
/**
 * Hard capability gate: reports whether every token in `required` is also
 * present in `available`.
 *
 * Both parameters are `readonly` so callers holding readonly / `as const`
 * arrays can pass them without a cast; neither array is mutated.
 *
 * @param required - capability tokens a job demands. An empty list is trivially satisfied.
 * @param available - capability tokens the factory advertises.
 * @returns `true` when `required` is a subset of `available`, otherwise `false`.
 */
export function capabilitiesSubset(
  required: readonly string[],
  available: readonly string[]
): boolean {
  // Nothing required ⇒ satisfied; skip building the lookup set.
  if (required.length === 0) return true;
  const advertised = new Set(available);
  return required.every(token => advertised.has(token));
}
|
||||
|
||||
/** True when `a` and `b` share at least one element; false when either list is empty. */
function overlaps(a: readonly string[], b: readonly string[]): boolean {
  const lookup = new Set(b);
  for (const item of a) {
    if (lookup.has(item)) return true;
  }
  return false;
}
|
||||
|
||||
const HEALTH_SCORE: Record<FactoryHealth, number> = { ok: 1, degraded: 0.5, down: 0 };
|
||||
|
||||
/**
 * Clamp `n` into the closed interval [0, 1]. Values already inside the range
 * are returned unchanged; NaN fails both comparisons and is returned as-is.
 */
function clamp01(n: number): number {
  return n < 0 ? 0 : n > 1 ? 1 : n;
}
|
||||
|
||||
/**
 * w1 term — the fraction of the job's required capabilities that the factory
 * advertises. A job with no requirements scores 1.
 */
function capabilityFitTerm(job: FleetJobDoc, factory: SchedulerFactory): number {
  const needed = job.capabilities ?? [];
  if (needed.length === 0) return 1;

  const advertised = new Set(factory.capabilities);
  let hits = 0;
  for (const cap of needed) {
    if (advertised.has(cap)) hits += 1;
  }
  return hits / needed.length;
}
|
||||
|
||||
/**
 * w2 term — affinity in [0, 1], built from two equally weighted signals:
 * the job's `prefersEngine` overlapping the factory's engines, and the job's
 * `allowedScope` overlapping the factory's warm scopes. Each hit adds one half.
 */
function affinityTerm(job: FleetJobDoc, ctx: SchedulerContext): number {
  const snapshot = job.manifestSnapshot;
  const wantedEngines = snapshot?.prefersEngine ?? [];
  const jobScopes = snapshot?.allowedScope ?? [];

  let hits = 0;
  if (wantedEngines.length > 0 && overlaps(wantedEngines, ctx.factoryEngines ?? [])) {
    hits += 1;
  }
  if (jobScopes.length > 0 && overlaps(jobScopes, ctx.warmScopes ?? [])) {
    hits += 1;
  }
  return clamp01(hits / 2);
}
|
||||
|
||||
/**
 * w3 term — inverse load `1 / (1 + load)`. A missing load counts as 0 and a
 * negative load is floored at 0, so an idle factory scores 1.
 */
function loadTerm(factory: SchedulerFactory): number {
  const nonNegativeLoad = Math.max(0, factory.load ?? 0);
  return 1 / (1 + nonNegativeLoad);
}
|
||||
|
||||
/**
 * w4 term — cost fit in [0, 1].
 *
 * Returns 1 when either the job budget or the ceiling is absent, or when the
 * budget is within the ceiling. A non-positive ceiling admits only non-positive
 * budgets (1) and rejects everything else (0). Otherwise the term shrinks as
 * `ceiling / budget` the further the budget overshoots.
 */
function costFitTerm(job: FleetJobDoc, ctx: SchedulerContext): number {
  const ask = job.budget?.usd;
  const cap = ctx.costCeilingUsd;

  if (ask === undefined || cap === undefined) return 1;

  if (cap <= 0) {
    return ask <= 0 ? 1 : 0;
  }

  return ask <= cap ? 1 : clamp01(cap / ask);
}
|
||||
|
||||
/** w5 term — looks up the factory's health in `HEALTH_SCORE`; an unset health counts as `'ok'`. */
function healthTerm(factory: SchedulerFactory): number {
  const status: FactoryHealth = factory.health ?? 'ok';
  return HEALTH_SCORE[status];
}
|
||||
|
||||
/**
 * Freshness penalty in [0, 1]: 1 for a brand-new job, stepping down to 0 once
 * the job has waited `buckets` whole aging intervals. The caller subtracts it
 * from the score, so an aged job loses less and rises above an equally
 * prioritised fresh peer (anti-starvation). Age is floored to whole intervals,
 * so jobs created within the same interval receive the same penalty.
 *
 * The result is always a finite number in [0, 1], even for bad input:
 * - a non-positive or NaN `intervalMs` / `buckets` leaves no aging window, so
 *   the penalty is treated as fully decayed (0) instead of dividing by zero;
 * - an unparseable `job.createdAt` or NaN `ctx.now` makes the age unknown, so
 *   the job is treated as brand-new (1) instead of leaking NaN into the score.
 *
 * @param job - the candidate; only `createdAt` is read.
 * @param ctx - supplies `now` and, optionally, a `starvation` config override.
 * @returns the unweighted penalty in [0, 1].
 */
function starvationPenaltyTerm(job: FleetJobDoc, ctx: SchedulerContext): number {
  const { intervalMs, buckets } = ctx.starvation ?? DEFAULT_STARVATION;

  // `!(x > 0)` also catches NaN, which a plain `x <= 0` check would let through.
  if (!(intervalMs > 0) || !(buckets > 0)) return 0;

  const rawAgeMs = ctx.now - Date.parse(job.createdAt);
  // Date.parse yields NaN for a malformed timestamp; without this guard it would
  // survive Math.max/Math.floor/clamp01 and turn the whole score into NaN.
  if (Number.isNaN(rawAgeMs)) return 1;

  // A createdAt in the future (clock skew) counts as age 0.
  const ageMs = Math.max(0, rawAgeMs);
  const elapsedBuckets = Math.floor(ageMs / intervalMs);
  return clamp01(1 - elapsedBuckets / buckets);
}
|
||||
|
||||
// ── Public scoring API ────────────────────────────────────────────────────────
|
||||
|
||||
/**
 * Compute the §7 score for one (job, factory) pairing.
 *
 * Each of the six terms is multiplied by its weight; the starvation term is
 * negated so it acts as a penalty. The returned `breakdown` holds those
 * already-weighted contributions and `score` is their sum, so the breakdown
 * can be shown as an explanation of the total. Pure and synchronous.
 *
 * @param job - the candidate job.
 * @param factory - the claiming factory's capabilities / health / load view.
 * @param ctx - authoritative time plus affinity and cost hints.
 * @param weights - per-term weights; defaults to `DEFAULT_WEIGHTS`.
 * @returns the total score and its per-term breakdown.
 */
export function scoreCandidate(
  job: FleetJobDoc,
  factory: SchedulerFactory,
  ctx: SchedulerContext,
  weights: SchedulerWeights = DEFAULT_WEIGHTS
): ScoredCandidate {
  const capabilityFit = weights.capabilityFit * capabilityFitTerm(job, factory);
  const affinity = weights.affinity * affinityTerm(job, ctx);
  const load = weights.load * loadTerm(factory);
  const costFit = weights.costFit * costFitTerm(job, ctx);
  const health = weights.health * healthTerm(factory);
  const starvation = -weights.starvation * starvationPenaltyTerm(job, ctx);

  return {
    // Summed left-to-right in declaration order so the total matches the breakdown exactly.
    score: capabilityFit + affinity + load + costFit + health + starvation,
    breakdown: { capabilityFit, affinity, load, costFit, health, starvation },
  };
}
|
||||
|
||||
/** Final tie-break key: the job's USD budget, with a missing budget counted as 0 (lower sorts first). */
function costClass(job: FleetJobDoc): number {
  const usd = job.budget?.usd;
  return usd ?? 0;
}
|
||||
|
||||
/** Scores within EPSILON are treated as a tie and fall through to the §7 tie-break. */
|
||||
const SCORE_EPSILON = 1e-9;
|
||||
|
||||
/**
 * Pick the best job for `factory` from `candidates`.
 *
 * 1. Health floor: a `down` factory gets nothing (filtered, not down-weighted).
 * 2. Eligibility: stage is `queued` or `blocked`, the job's capabilities are a
 *    subset of the factory's, and `ctx.isDepsSatisfied` (default: always true)
 *    accepts the job.
 * 3. Ranking: highest `scoreCandidate` score wins. Scores within
 *    `SCORE_EPSILON` count as tied and fall through to the tie-break:
 *    lower `priorityOrder` → older `createdAt` → lower cost class.
 *
 * The winner is found in a single pass that keeps the best job seen so far.
 * On a full tie the earlier candidate in `candidates` is kept, matching what a
 * stable sort would return. `createdAt` is compared as a parsed instant, not as
 * locale-collated text, so the tie-break agrees with the starvation term and
 * does not depend on the host locale or the exact ISO formatting.
 *
 * @param candidates - jobs to consider; not mutated.
 * @param factory - the claiming factory view.
 * @param ctx - authoritative time, deps predicate, affinity / cost hints.
 * @param weights - per-term weights; defaults to `DEFAULT_WEIGHTS`.
 * @returns the selected job, or `null` when nothing is eligible.
 */
export function selectJob(
  candidates: FleetJobDoc[],
  factory: SchedulerFactory,
  ctx: SchedulerContext,
  weights: SchedulerWeights = DEFAULT_WEIGHTS
): FleetJobDoc | null {
  // Health floor: a factory below the floor is excluded entirely (§7).
  if ((factory.health ?? 'ok') === 'down') return null;

  const depsSatisfied = ctx.isDepsSatisfied ?? (() => true);

  let best: RankedJob | null = null;
  for (const job of candidates) {
    const eligible =
      (job.stage === 'queued' || job.stage === 'blocked') &&
      capabilitiesSubset(job.capabilities ?? [], factory.capabilities) &&
      depsSatisfied(job);
    if (!eligible) continue;

    const entry: RankedJob = { job, score: scoreCandidate(job, factory, ctx, weights).score };
    // Strictly-better replacement keeps the earliest candidate on a full tie.
    if (best === null || compareRanked(entry, best) < 0) best = entry;
  }
  return best === null ? null : best.job;
}

/** A candidate paired with its total score, as ranked by `selectJob`. */
interface RankedJob {
  job: FleetJobDoc;
  score: number;
}

/** Ranking comparator: negative when `a` should be picked ahead of `b`. */
function compareRanked(a: RankedJob, b: RankedJob): number {
  if (Math.abs(b.score - a.score) > SCORE_EPSILON) return b.score - a.score; // higher score first
  if (a.job.priorityOrder !== b.job.priorityOrder) {
    return a.job.priorityOrder - b.job.priorityOrder; // higher priority (lower order) first
  }
  const ageCmp = compareCreatedAt(a.job.createdAt, b.job.createdAt); // older first
  if (ageCmp !== 0) return ageCmp;
  return costClass(a.job) - costClass(b.job); // lower cost class first
}

/**
 * Order two `createdAt` strings oldest-first by the instant they denote. If
 * either fails to parse, fall back to a plain code-unit string comparison so
 * the result stays deterministic and locale-independent.
 */
function compareCreatedAt(a: string, b: string): number {
  const aMs = Date.parse(a);
  const bMs = Date.parse(b);
  if (!Number.isNaN(aMs) && !Number.isNaN(bMs)) return aMs - bMs;
  if (a === b) return 0;
  return a < b ? -1 : 1;
}
|
||||
@ -160,18 +160,22 @@ describe('FleetProfileDocSchema / FleetEventDocSchema / FleetArtifactDocSchema',
|
||||
const { type: _t, ...bad } = valid;
|
||||
expect(FleetEventDocSchema.safeParse(bad).success).toBe(false);
|
||||
});
|
||||
it('accepts a valid artifact and rejects missing blobUrl', () => {
|
||||
it('accepts a valid artifact pointer and rejects missing blobKey', () => {
|
||||
const valid = {
|
||||
id: 'art_1',
|
||||
productId: 'lysnrai',
|
||||
jobId: 'fjob_1',
|
||||
kind: 'coverage',
|
||||
blobUrl: 'https://b/x',
|
||||
blobKey: 'fleet/lysnrai/fjob_1/art_1-coverage',
|
||||
contentType: 'application/json',
|
||||
sizeBytes: 1234,
|
||||
createdAt: now,
|
||||
};
|
||||
expect(FleetArtifactDocSchema.safeParse(valid).success).toBe(true);
|
||||
const { blobUrl: _b, ...bad } = valid;
|
||||
const { blobKey: _b, ...bad } = valid;
|
||||
expect(FleetArtifactDocSchema.safeParse(bad).success).toBe(false);
|
||||
// kind is a closed enum — an unknown kind is rejected
|
||||
expect(FleetArtifactDocSchema.safeParse({ ...valid, kind: 'bogus' }).success).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@ -76,6 +76,10 @@ export type LeaseStatus = (typeof LEASE_STATUS)[number];
|
||||
export const JOB_KINDS = ['leaf', 'composite'] as const;
|
||||
export type JobKind = (typeof JOB_KINDS)[number];
|
||||
|
||||
/** Artifact categories (§13/§26). Large outputs live in blob; Cosmos holds pointers only. */
|
||||
export const FLEET_ARTIFACT_KINDS = ['log', 'coverage', 'screenshot', 'build', 'other'] as const;
|
||||
export type FleetArtifactKind = (typeof FLEET_ARTIFACT_KINDS)[number];
|
||||
|
||||
// ── Shared value objects ─────────────────────────────────────────────────────
|
||||
|
||||
export const CheckpointSchema = z.object({
|
||||
@ -232,15 +236,25 @@ export const FleetEventDocSchema = z.object({
|
||||
});
|
||||
export type FleetEventDoc = z.infer<typeof FleetEventDocSchema>;
|
||||
|
||||
/** FleetArtifactDoc — pointer to a blob-stored artifact (pk `/jobId`). No inline logs. */
|
||||
/**
|
||||
* FleetArtifactDoc — a POINTER to a blob-stored run output (pk `/jobId`).
|
||||
*
|
||||
* Large outputs (logs, coverage, screenshots, build output) are written to blob
|
||||
* storage; only this pointer (blob key + size/content-type/sha256 metadata) lives
|
||||
* in Cosmos — NEVER the bytes themselves (doc-size + RU limits, §13). The
|
||||
* short-lived SAS read URL is minted on demand from `blobKey` and is intentionally
|
||||
* NOT persisted on the doc.
|
||||
*/
|
||||
export const FleetArtifactDocSchema = z.object({
|
||||
id: z.string(),
|
||||
productId: z.string().min(1),
|
||||
jobId: z.string().min(1),
|
||||
runId: z.string().optional(),
|
||||
kind: z.string().min(1),
|
||||
blobUrl: z.string().min(1),
|
||||
sizeBytes: z.number().int().nonnegative().optional(),
|
||||
kind: z.enum(FLEET_ARTIFACT_KINDS),
|
||||
blobKey: z.string().min(1),
|
||||
contentType: z.string().min(1),
|
||||
sizeBytes: z.number().int().nonnegative(),
|
||||
sha256: z.string().optional(),
|
||||
createdAt: z.string(),
|
||||
});
|
||||
export type FleetArtifactDoc = z.infer<typeof FleetArtifactDocSchema>;
|
||||
@ -315,3 +329,17 @@ export const HeartbeatSchema = z.object({
|
||||
seatLimit: z.number().int().positive().optional(),
|
||||
});
|
||||
export type HeartbeatInput = z.infer<typeof HeartbeatSchema>;
|
||||
|
||||
/**
|
||||
* Upload an artifact for a job. The bytes are carried base64-encoded in the JSON
|
||||
* body (large content is offloaded to blob server-side; nothing is stored inline
|
||||
* in Cosmos). `productId` may override the request-resolved product.
|
||||
*/
|
||||
export const UploadArtifactSchema = z.object({
|
||||
productId: z.string().min(1).optional(),
|
||||
runId: z.string().min(1).optional(),
|
||||
kind: z.enum(FLEET_ARTIFACT_KINDS),
|
||||
contentType: z.string().min(1).default('application/octet-stream'),
|
||||
contentBase64: z.string().min(1),
|
||||
});
|
||||
export type UploadArtifactInput = z.infer<typeof UploadArtifactSchema>;
|
||||
|
||||
Loading…
Reference in New Issue
Block a user