feat(platform-service): fleet artifacts + blob wiring (§13)

Artifact pointers in fleet_artifacts; large outputs in @bytelyst/blob (never
Cosmos). Routes: POST/GET /fleet/jobs/:id/artifacts, GET/DELETE
/fleet/artifacts/:id with short-lived SAS. 7 artifact tests.
This commit is contained in:
saravanakumardb1 2026-05-29 23:11:45 -07:00
parent 7930e8b0bd
commit b65e818f3d
7 changed files with 502 additions and 13 deletions

View File

@ -0,0 +1,112 @@
/**
* Fleet artifacts blob wiring (§13/§26).
*
* Large run outputs (logs, coverage, screenshots, build output) are written to
* blob storage; only a POINTER (blob key + size/content-type/sha256) is persisted
* in the `fleet_artifacts` Cosmos container. The bytes NEVER touch Cosmos — this
* keeps documents well under the doc-size / RU ceilings no matter how large a log
* gets. Read access is granted via a short-lived SAS URL minted on demand from the
* stored key (the URL itself is never persisted).
*
* Blob key scheme (deterministic, product- and job-scoped):
* fleet/<productId>/<jobId>/<artifactId>-<kind>
*/
import { createHash, randomUUID } from 'node:crypto';
import { getBucket, generateSasUrl } from '../../lib/blob.js';
import * as repo from './repository.js';
import { FleetArtifactDocSchema, type FleetArtifactDoc, type FleetArtifactKind } from './types.js';
/**
 * Container holding all fleet run-output blobs. The same name is passed to
 * `getBucket` and `generateSasUrl` in this module, so uploads, deletes and SAS
 * URLs all target one container.
 */
export const FLEET_ARTIFACTS_CONTAINER = 'fleet-artifacts';
/** SAS read-URL lifetime. Short-lived — callers re-issue via getArtifactDownload. */
export const ARTIFACT_SAS_TTL_MINUTES = 15;
/**
 * A persisted artifact pointer paired with a freshly-minted SAS read URL.
 * This is the result shape of `uploadArtifact` and (when found) `getArtifactDownload`.
 */
export interface ArtifactWithDownload {
// Pointer document as returned by the repository (metadata only, no bytes).
artifact: FleetArtifactDoc;
// Read URL minted per call from the pointer's blob key; not stored on the doc.
downloadUrl: string;
}
/** Input to `uploadArtifact`: ownership, category, the raw bytes and their content type. */
export interface UploadArtifactArgs {
// Owning product; becomes part of the blob key and is stored on the pointer.
productId: string;
// Job the artifact belongs to; becomes part of the blob key and is stored on the pointer.
jobId: string;
// Artifact category; appended to the blob key and stored on the pointer.
kind: FleetArtifactKind;
// Raw artifact content. Uploaded to blob storage and hashed (sha256); never written to the pointer doc.
bytes: Buffer;
// Content type forwarded to the blob upload and recorded on the pointer.
contentType: string;
// Optional run identifier, copied onto the pointer as-is.
runId?: string;
}
/**
 * Build the storage key under which an artifact's bytes are written.
 *
 * The key is a pure function of its inputs and follows the layout
 * `fleet/<productId>/<jobId>/<artifactId>-<kind>`. Segments are interpolated
 * verbatim — no escaping or validation happens here.
 *
 * @param productId - Owning product; first segment after `fleet`.
 * @param jobId - Job the artifact belongs to; second segment.
 * @param artifactId - Artifact id; forms the leaf name together with `kind`.
 * @param kind - Artifact category, appended to the leaf after a hyphen.
 * @returns The slash-separated blob key.
 */
export function artifactBlobKey(
  productId: string,
  jobId: string,
  artifactId: string,
  kind: FleetArtifactKind
): string {
  const leaf = `${artifactId}-${kind}`;
  return ['fleet', productId, jobId, leaf].join('/');
}
/**
 * Upload artifact bytes to blob storage and persist the Cosmos pointer.
 *
 * Flow: mint an `art_<uuid>` id, derive the blob key, hash the bytes, upload
 * them, then build + validate the pointer doc and store it via the repository.
 * The bytes live only in blob — the returned doc carries no inline payload.
 *
 * If validating or persisting the pointer fails AFTER the upload succeeded, the
 * just-written blob is deleted again (best effort) so a failed call does not
 * leave an unreferenced blob behind; the original error is then rethrown.
 *
 * @param args - Ownership (`productId`, `jobId`, optional `runId`), `kind`,
 *   the raw `bytes` and their `contentType`.
 * @returns The pointer doc as returned by the repository plus a short-lived SAS
 *   read URL for the uploaded blob.
 * @throws Whatever the blob upload, schema validation, repository write or SAS
 *   generation throws; errors are not translated here.
 */
export async function uploadArtifact(args: UploadArtifactArgs): Promise<ArtifactWithDownload> {
  const id = `art_${randomUUID()}`;
  const blobKey = artifactBlobKey(args.productId, args.jobId, id, args.kind);
  // The digest depends only on the caller's bytes, so compute it before any I/O.
  const sha256 = createHash('sha256').update(args.bytes).digest('hex');

  const bucket = await getBucket(FLEET_ARTIFACTS_CONTAINER);
  const meta = await bucket.upload(blobKey, args.bytes, { contentType: args.contentType });

  let artifact: FleetArtifactDoc;
  try {
    const doc: FleetArtifactDoc = FleetArtifactDocSchema.parse({
      id,
      productId: args.productId,
      jobId: args.jobId,
      runId: args.runId,
      kind: args.kind,
      blobKey,
      contentType: args.contentType,
      // Prefer the size reported by the upload; fall back to the buffer length.
      sizeBytes: meta.size ?? args.bytes.length,
      sha256,
      createdAt: new Date().toISOString(),
    });
    artifact = await repo.createArtifact(doc);
  } catch (err: unknown) {
    // No pointer was stored, so nothing references the blob: remove it rather
    // than leaking storage. Cleanup is best effort — a failure here must not
    // mask the error that actually aborted the upload.
    try {
      await bucket.delete(blobKey);
    } catch {
      /* intentionally ignored; the original error is rethrown below */
    }
    throw err;
  }

  const downloadUrl = await issueDownloadUrl(blobKey);
  return { artifact, downloadUrl };
}
/**
 * Look up an artifact pointer for the given product and pair it with a newly
 * minted short-lived SAS read URL.
 *
 * @param id - Artifact id to look up.
 * @param productId - Product the artifact must belong to.
 * @returns The pointer plus a fresh download URL, or `null` when the repository
 *   finds no artifact with that id for that product.
 */
export async function getArtifactDownload(
  id: string,
  productId: string
): Promise<ArtifactWithDownload | null> {
  const pointer = await repo.getArtifact(id, productId);
  if (pointer) {
    const freshUrl = await issueDownloadUrl(pointer.blobKey);
    return { artifact: pointer, downloadUrl: freshUrl };
  }
  return null;
}
/**
 * Delete an artifact's pointer and its backing blob, scoped to the owning product.
 *
 * The pointer is removed first through `repo.deleteArtifact`, which resolves
 * the doc for `(id, productId)` and returns it; the returned `blobKey` is then
 * used to delete the blob. This needs a single repository call and means a
 * failure part-way through can never leave a pointer that references a blob
 * which no longer exists.
 *
 * NOTE: if the blob delete throws, the pointer is already gone and the error
 * propagates to the caller; the blob may then remain in storage unreferenced.
 *
 * @param id - Artifact id to delete.
 * @param productId - Product the artifact must belong to.
 * @returns `true` when a pointer was found and removed, `false` when nothing
 *   matched for that product.
 */
export async function deleteArtifact(id: string, productId: string): Promise<boolean> {
  const removed = await repo.deleteArtifact(id, productId);
  if (!removed) return false;
  const bucket = await getBucket(FLEET_ARTIFACTS_CONTAINER);
  await bucket.delete(removed.blobKey);
  return true;
}
/**
 * Mint a SAS URL for `blobKey` in the fleet artifacts container, requesting the
 * `'r'` permission and a lifetime of `ARTIFACT_SAS_TTL_MINUTES`.
 */
function issueDownloadUrl(blobKey: string): Promise<string> {
  const container = FLEET_ARTIFACTS_CONTAINER;
  const ttlMinutes = ARTIFACT_SAS_TTL_MINUTES;
  return generateSasUrl(container, blobKey, 'r', ttlMinutes);
}

View File

@ -0,0 +1,251 @@
/**
* Fleet artifacts blob wiring (§13/§26).
*
* Runs on the in-memory datastore + in-memory blob provider. The central
* guarantee under test: artifact BYTES live in blob storage and only a POINTER
* (blobKey + size/content-type/sha256) is persisted in Cosmos — never the bytes.
*
* Auth + productId resolution are mocked exactly as the items / fleet routes
* tests do. The non-route service tests don't touch those mocks.
*/
// Select the in-memory blob provider before the storage singleton is created.
// NOTE(review): static `import` declarations are hoisted above this assignment,
// so this presumably relies on the blob client reading the variable lazily
// (e.g. on first `getBucket`) rather than at module load — confirm.
process.env.STORAGE_PROVIDER = 'memory';
import Fastify, { type FastifyInstance } from 'fastify';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { MemoryDatastoreProvider } from '@bytelyst/datastore';
import { _resetBlobClient, getBucket } from '@bytelyst/blob';
import { _resetDatastoreProvider, setProvider } from '../../lib/datastore.js';
import * as repo from './repository.js';
import * as artifactsBlob from './artifacts-blob.js';
// Stub auth: every request resolves to a fixed admin identity.
vi.mock('../../lib/auth.js', () => ({
extractAuth: vi.fn(async () => ({ sub: 'user_1', role: 'admin' })),
}));
// Stub product resolution: every request resolves to product 'lysnrai'.
vi.mock('../../lib/request-context.js', () => ({
getRequestProductId: () => 'lysnrai',
}));
// Product id used by the service-level tests; identical to the value the
// mocked getRequestProductId returns for route-level tests.
const PID = 'lysnrai';
/** Fields that would indicate bytes were (wrongly) inlined into the Cosmos doc. */
const INLINE_PAYLOAD_FIELDS = [
'contentBase64',
'payload',
'bytes',
'data',
'content',
'body',
'blob',
];
/**
 * Create a fresh Fastify instance (logging disabled) with the fleet routes
 * mounted under the `/api` prefix. The routes module is imported dynamically
 * when this helper runs rather than at file load.
 */
async function buildApp(): Promise<FastifyInstance> {
  const routesModule = await import('./routes.js');
  const server = Fastify({ logger: false });
  await server.register(routesModule.fleetRoutes, { prefix: '/api' });
  return server;
}
// Per-test setup: install a new in-memory datastore provider and reset the blob
// client — presumably so documents and blobs do not carry over between cases.
beforeEach(() => {
setProvider(new MemoryDatastoreProvider());
_resetBlobClient();
});
// Per-test teardown: reset the datastore provider and blob client again, and
// clear recorded calls on the mocks.
afterEach(() => {
_resetDatastoreProvider();
_resetBlobClient();
vi.clearAllMocks();
});
// Service-level tests: call the artifacts-blob and repository functions
// directly (no HTTP layer involved).
describe('fleet artifacts — blob service', () => {
it('upload writes bytes to blob and persists a pointer-only Cosmos doc', async () => {
const bytes = Buffer.from('hello log output');
const { artifact, downloadUrl } = await artifactsBlob.uploadArtifact({
productId: PID,
jobId: 'j1',
kind: 'log',
bytes,
contentType: 'text/plain',
});
// pointer metadata
expect(artifact.productId).toBe(PID);
expect(artifact.jobId).toBe('j1');
expect(artifact.kind).toBe('log');
expect(artifact.contentType).toBe('text/plain');
expect(artifact.sizeBytes).toBe(bytes.length);
expect(artifact.blobKey).toBe(`fleet/${PID}/j1/${artifact.id}-log`);
// Only the digest's shape (64 lowercase hex chars) is checked, not its value.
expect(artifact.sha256).toMatch(/^[0-9a-f]{64}$/);
// NOTE(review): 'signed=true' is presumably the marker the in-memory blob
// provider puts on the SAS URLs it generates — confirm.
expect(downloadUrl).toContain('signed=true');
// the persisted Cosmos doc carries NO inline payload — bytes are not in Cosmos
const stored = await repo.getArtifact(artifact.id, PID);
expect(stored).not.toBeNull();
for (const field of INLINE_PAYLOAD_FIELDS) {
expect(stored).not.toHaveProperty(field);
}
// only the pointer/metadata fields are present
expect(stored).toMatchObject({
id: artifact.id,
productId: PID,
jobId: 'j1',
kind: 'log',
blobKey: artifact.blobKey,
contentType: 'text/plain',
sizeBytes: bytes.length,
});
// the actual bytes DO live in blob storage
const bucket = await getBucket(artifactsBlob.FLEET_ARTIFACTS_CONTAINER);
const fromBlob = await bucket.download(artifact.blobKey);
expect(fromBlob.toString()).toBe('hello log output');
});
it('list by job is partition-isolated (returns only that job)', async () => {
// Seed two artifacts under job jA and one under job jB, all for the same product.
await artifactsBlob.uploadArtifact({
productId: PID,
jobId: 'jA',
kind: 'log',
bytes: Buffer.from('a1'),
contentType: 'text/plain',
});
await artifactsBlob.uploadArtifact({
productId: PID,
jobId: 'jA',
kind: 'coverage',
bytes: Buffer.from('a2'),
contentType: 'application/json',
});
await artifactsBlob.uploadArtifact({
productId: PID,
jobId: 'jB',
kind: 'screenshot',
bytes: Buffer.from('b1'),
contentType: 'image/png',
});
const a = await repo.listArtifactsByJob('jA');
const b = await repo.listArtifactsByJob('jB');
expect(a).toHaveLength(2);
expect(a.every(x => x.jobId === 'jA')).toBe(true);
expect(b).toHaveLength(1);
expect(b[0].jobId).toBe('jB');
});
it('get re-issues a fresh SAS URL; a large (>Cosmos-safe) payload still succeeds (blob offload)', async () => {
// 3 MB — comfortably beyond the ~2 MB Cosmos document ceiling, so this can
// only succeed because the bytes are offloaded to blob, not stored inline.
const big = Buffer.alloc(3 * 1024 * 1024, 0x61);
const { artifact } = await artifactsBlob.uploadArtifact({
productId: PID,
jobId: 'jbig',
kind: 'build',
bytes: big,
contentType: 'application/octet-stream',
});
expect(artifact.sizeBytes).toBe(3 * 1024 * 1024);
const dl = await artifactsBlob.getArtifactDownload(artifact.id, PID);
expect(dl).not.toBeNull();
expect(dl?.downloadUrl).toContain('signed=true');
expect(dl?.downloadUrl).toContain(artifact.blobKey);
// product-scoped: a foreign product cannot fetch it
expect(await artifactsBlob.getArtifactDownload(artifact.id, 'other-product')).toBeNull();
// bytes round-trip from blob at full size
const bucket = await getBucket(artifactsBlob.FLEET_ARTIFACTS_CONTAINER);
expect((await bucket.download(artifact.blobKey)).length).toBe(3 * 1024 * 1024);
});
it('delete removes both the pointer and the backing blob', async () => {
const { artifact } = await artifactsBlob.uploadArtifact({
productId: PID,
jobId: 'jd',
kind: 'other',
bytes: Buffer.from('tmp'),
contentType: 'text/plain',
});
const bucket = await getBucket(artifactsBlob.FLEET_ARTIFACTS_CONTAINER);
// Precondition: the blob exists before the delete.
expect(await bucket.exists(artifact.blobKey)).toBe(true);
expect(await artifactsBlob.deleteArtifact(artifact.id, PID)).toBe(true);
expect(await repo.getArtifact(artifact.id, PID)).toBeNull();
expect(await bucket.exists(artifact.blobKey)).toBe(false);
// idempotent / unknown → false
expect(await artifactsBlob.deleteArtifact(artifact.id, PID)).toBe(false);
});
});
// Route-level tests: drive the Fastify app through inject(). Auth and product
// resolution come from the vi.mock stubs declared earlier in this file.
describe('fleet artifacts — routes (fastify inject)', () => {
// Encode a string as base64, the form the upload body carries in `contentBase64`.
const b64 = (s: string) => Buffer.from(s).toString('base64');
it('POST upload → 201 with pointer + SAS; GET list → the pointer; bytes not echoed', async () => {
const app = await buildApp();
const res = await app.inject({
method: 'POST',
url: '/api/fleet/jobs/jr1/artifacts',
payload: { kind: 'log', contentType: 'text/plain', contentBase64: b64('route log bytes') },
});
expect(res.statusCode).toBe(201);
const body = JSON.parse(res.body);
expect(body.artifact.jobId).toBe('jr1');
expect(body.artifact.blobKey).toBe(`fleet/${PID}/jr1/${body.artifact.id}-log`);
expect(body.artifact.sizeBytes).toBe(Buffer.from('route log bytes').length);
expect(body.downloadUrl).toContain('signed=true');
// The response pointer must not echo the uploaded bytes under any payload-like field.
for (const field of INLINE_PAYLOAD_FIELDS) {
expect(body.artifact).not.toHaveProperty(field);
}
const list = await app.inject({ method: 'GET', url: '/api/fleet/jobs/jr1/artifacts' });
expect(list.statusCode).toBe(200);
const arts = JSON.parse(list.body).artifacts;
expect(arts).toHaveLength(1);
expect(arts[0].id).toBe(body.artifact.id);
});
it('GET /fleet/artifacts/:id returns a fresh SAS URL; DELETE removes it', async () => {
const app = await buildApp();
// Upload first so there is an artifact id to fetch and delete.
const up = await app.inject({
method: 'POST',
url: '/api/fleet/jobs/jr2/artifacts',
payload: {
kind: 'coverage',
contentType: 'application/json',
contentBase64: b64('{"pct":91}'),
},
});
const id = JSON.parse(up.body).artifact.id as string;
const got = await app.inject({ method: 'GET', url: `/api/fleet/artifacts/${id}` });
expect(got.statusCode).toBe(200);
const gotBody = JSON.parse(got.body);
expect(gotBody.artifact.id).toBe(id);
expect(gotBody.downloadUrl).toContain('signed=true');
const del = await app.inject({ method: 'DELETE', url: `/api/fleet/artifacts/${id}` });
expect(del.statusCode).toBe(200);
expect(JSON.parse(del.body).deleted).toBe(true);
// gone now
const after = await app.inject({ method: 'GET', url: `/api/fleet/artifacts/${id}` });
expect(after.statusCode).toBe(404);
});
it('rejects an invalid upload body (400) and unknown artifact ids (404)', async () => {
const app = await buildApp();
const bad = await app.inject({
method: 'POST',
url: '/api/fleet/jobs/jr3/artifacts',
payload: { contentBase64: b64('x') }, // missing kind
});
expect(bad.statusCode).toBe(400);
// 'nope' was never uploaded, so both by-id routes must report not-found.
const getMissing = await app.inject({ method: 'GET', url: '/api/fleet/artifacts/nope' });
expect(getMissing.statusCode).toBe(404);
const delMissing = await app.inject({ method: 'DELETE', url: '/api/fleet/artifacts/nope' });
expect(delMissing.statusCode).toBe(404);
});
});

View File

@ -171,17 +171,29 @@ describe('fleet repository', () => {
expect(events.map(e => e.type)).toEqual(['submitted', 'assigned', 'transition']);
});
it('artifacts: create + list', async () => {
it('artifacts: create + listByJob + get + delete (pointer only)', async () => {
await repo.createArtifact({
id: 'art_1',
productId: PID,
jobId: 'j',
kind: 'coverage',
blobUrl: 'https://b/x',
blobKey: 'fleet/lysnrai/j/art_1-coverage',
contentType: 'application/json',
sizeBytes: 42,
createdAt: now,
});
const arts = await repo.listArtifacts('j');
const arts = await repo.listArtifactsByJob('j');
expect(arts).toHaveLength(1);
expect(arts[0].blobUrl).toBe('https://b/x');
expect(arts[0].blobKey).toBe('fleet/lysnrai/j/art_1-coverage');
// get is product-scoped (wrong product → null)
expect((await repo.getArtifact('art_1', PID))?.contentType).toBe('application/json');
expect(await repo.getArtifact('art_1', 'other-product')).toBeNull();
// delete returns the removed doc and clears the partition
const removed = await repo.deleteArtifact('art_1', PID);
expect(removed?.id).toBe('art_1');
expect(await repo.listArtifactsByJob('j')).toHaveLength(0);
expect(await repo.deleteArtifact('art_1', PID)).toBeNull();
});
});

View File

@ -247,12 +247,39 @@ export async function listEvents(jobId: string): Promise<FleetEventDoc[]> {
return docs;
}
// ── Artifacts ─────────────────────────────────────────────────────────────────
// ── Artifacts (pointers only — bytes live in blob, never Cosmos; §13) ──────────
/**
 * Store an artifact pointer document in the artifacts container.
 *
 * @param doc - The pointer document to create.
 * @returns Whatever the container's `create` call resolves to for that document.
 */
export async function createArtifact(doc: FleetArtifactDoc): Promise<FleetArtifactDoc> {
  const created = await artifacts().create(doc);
  return created;
}
export async function listArtifacts(jobId: string): Promise<FleetArtifactDoc[]> {
/**
 * List every artifact pointer recorded for a job, sorted by `createdAt`
 * ascending (oldest first). The container is partitioned by `/jobId`, so the
 * `jobId` filter keeps the query inside a single partition.
 *
 * @param jobId - Job whose artifact pointers should be returned.
 */
export async function listArtifactsByJob(jobId: string): Promise<FleetArtifactDoc[]> {
  const pointers = await artifacts().findMany({ filter: { jobId }, sort: { createdAt: 1 } });
  return pointers;
}
/**
 * Fetch a single artifact pointer by id for a specific product.
 *
 * Because the container is partitioned by `/jobId` (not `/productId`), this is
 * a small filtered query capped at one result rather than a point read. The
 * `productId` predicate is what enforces ownership for the by-id routes:
 * foreign-product / unknown id → null.
 *
 * @param id - Artifact id.
 * @param productId - Product the artifact must belong to.
 * @returns The first matching pointer, or `null` when the query returns nothing.
 */
export async function getArtifact(id: string, productId: string): Promise<FleetArtifactDoc | null> {
  const [match] = await artifacts().findMany({ filter: { id, productId }, limit: 1 });
  return match ?? null;
}
/**
 * Remove an artifact pointer by id for a specific product.
 *
 * The pointer is located first (which also yields its `jobId`, the partition
 * value passed to the container delete) and then deleted.
 *
 * @param id - Artifact id.
 * @param productId - Product the artifact must belong to.
 * @returns The removed pointer — so callers can also clean up the backing
 *   blob — or `null` when no pointer matched.
 */
export async function deleteArtifact(
  id: string,
  productId: string
): Promise<FleetArtifactDoc | null> {
  const target = await getArtifact(id, productId);
  if (target) {
    await artifacts().delete(id, target.jobId);
    return target;
  }
  return null;
}

View File

@ -11,6 +11,10 @@
* POST /fleet/factories/heartbeat factory liveness
* GET /fleet/jobs/:id/runs job run history
* GET /fleet/jobs/:id/events append-only event stream
* POST /fleet/jobs/:id/artifacts upload a run output (base64 body → blob + pointer)
* GET /fleet/jobs/:id/artifacts list a job's artifact pointers
* GET /fleet/artifacts/:artifactId pointer + fresh short-lived SAS download URL
* DELETE /fleet/artifacts/:artifactId delete pointer (and backing blob)
*
* All routes require auth + a resolved productId, exactly like the items module.
*/
@ -21,6 +25,7 @@ import { BadRequestError, ConflictError, NotFoundError } from '../../lib/errors.
import { extractAuth } from '../../lib/auth.js';
import * as repo from './repository.js';
import * as coordinator from './coordinator.js';
import * as artifactsBlob from './artifacts-blob.js';
import {
SubmitJobSchema,
ListJobsQuerySchema,
@ -29,6 +34,7 @@ import {
RenewLeaseSchema,
ReleaseLeaseSchema,
HeartbeatSchema,
UploadArtifactSchema,
} from './types.js';
function badRequest(issues: { message: string }[]): never {
@ -179,4 +185,53 @@ export async function fleetRoutes(app: FastifyInstance) {
const events = await repo.listEvents(id);
return { events };
});
// ── Artifacts: upload (base64 body → blob + pointer) ──
// Validates the body against UploadArtifactSchema (400 on failure), decodes the
// base64 content (400 when it decodes to zero bytes), hands the bytes to the
// blob service and answers 201 with the pointer plus a SAS download URL.
// A `productId` in the body takes precedence over the request-resolved product.
app.post('/fleet/jobs/:id/artifacts', async (req, reply) => {
  await extractAuth(req);
  const params = req.params as { id: string };
  const result = UploadArtifactSchema.safeParse(req.body);
  if (!result.success) badRequest(result.error.issues);
  const input = result.data;
  const pid = input.productId ? input.productId : getRequestProductId(req);
  const bytes = Buffer.from(input.contentBase64, 'base64');
  if (bytes.length === 0) badRequest([{ message: 'contentBase64 decoded to empty bytes' }]);
  const uploaded = await artifactsBlob.uploadArtifact({
    productId: pid,
    jobId: params.id,
    kind: input.kind,
    bytes,
    contentType: input.contentType,
    runId: input.runId,
  });
  reply.code(201);
  return { artifact: uploaded.artifact, downloadUrl: uploaded.downloadUrl };
});
// ── Artifacts: list a job's pointers ──
// Returns the pointer docs stored for the job in the URL, restricted to the
// caller's resolved product. The repository query filters by jobId only, so
// the product check is applied here; this keeps the list route consistent with
// the by-id GET/DELETE routes, which are product-scoped, and stops one product
// from reading another product's pointers by guessing a job id.
app.get('/fleet/jobs/:id/artifacts', async req => {
  await extractAuth(req);
  const { id: jobId } = req.params as { id: string };
  const pid = getRequestProductId(req);
  const forJob = await repo.listArtifactsByJob(jobId);
  const artifacts = forJob.filter(a => a.productId === pid);
  return { artifacts };
});
// ── Artifacts: pointer + fresh SAS download URL ──
// Resolves the artifact for the caller's product and returns it together with
// a newly minted download URL; responds with NotFoundError when nothing matches.
app.get('/fleet/artifacts/:artifactId', async req => {
  await extractAuth(req);
  const params = req.params as { artifactId: string };
  const productId = getRequestProductId(req);
  const result = await artifactsBlob.getArtifactDownload(params.artifactId, productId);
  if (result) return result;
  throw new NotFoundError('Artifact not found');
});
// ── Artifacts: delete pointer (and backing blob) ──
// Deletes the artifact for the caller's product via the blob service and
// answers `{ deleted: true }`; responds with NotFoundError when nothing matched.
app.delete('/fleet/artifacts/:artifactId', async req => {
  await extractAuth(req);
  const params = req.params as { artifactId: string };
  const productId = getRequestProductId(req);
  const removed = await artifactsBlob.deleteArtifact(params.artifactId, productId);
  if (removed) return { deleted: true };
  throw new NotFoundError('Artifact not found');
});
}

View File

@ -160,18 +160,22 @@ describe('FleetProfileDocSchema / FleetEventDocSchema / FleetArtifactDocSchema',
const { type: _t, ...bad } = valid;
expect(FleetEventDocSchema.safeParse(bad).success).toBe(false);
});
it('accepts a valid artifact and rejects missing blobUrl', () => {
it('accepts a valid artifact pointer and rejects missing blobKey', () => {
const valid = {
id: 'art_1',
productId: 'lysnrai',
jobId: 'fjob_1',
kind: 'coverage',
blobUrl: 'https://b/x',
blobKey: 'fleet/lysnrai/fjob_1/art_1-coverage',
contentType: 'application/json',
sizeBytes: 1234,
createdAt: now,
};
expect(FleetArtifactDocSchema.safeParse(valid).success).toBe(true);
const { blobUrl: _b, ...bad } = valid;
const { blobKey: _b, ...bad } = valid;
expect(FleetArtifactDocSchema.safeParse(bad).success).toBe(false);
// kind is a closed enum — an unknown kind is rejected
expect(FleetArtifactDocSchema.safeParse({ ...valid, kind: 'bogus' }).success).toBe(false);
});
});

View File

@ -76,6 +76,10 @@ export type LeaseStatus = (typeof LEASE_STATUS)[number];
export const JOB_KINDS = ['leaf', 'composite'] as const;
export type JobKind = (typeof JOB_KINDS)[number];
/** Artifact categories (§13/§26). Large outputs live in blob; Cosmos holds pointers only. */
export const FLEET_ARTIFACT_KINDS = ['log', 'coverage', 'screenshot', 'build', 'other'] as const;
/** Union of the literal kinds listed in FLEET_ARTIFACT_KINDS, derived from the tuple itself. */
export type FleetArtifactKind = (typeof FLEET_ARTIFACT_KINDS)[number];
// ── Shared value objects ─────────────────────────────────────────────────────
export const CheckpointSchema = z.object({
@ -232,15 +236,25 @@ export const FleetEventDocSchema = z.object({
});
export type FleetEventDoc = z.infer<typeof FleetEventDocSchema>;
/** FleetArtifactDoc — pointer to a blob-stored artifact (pk `/jobId`). No inline logs. */
/**
* FleetArtifactDoc — a POINTER to a blob-stored run output (pk `/jobId`).
*
* Large outputs (logs, coverage, screenshots, build output) are written to blob
* storage; only this pointer (blob key + size/content-type/sha256 metadata) lives
* in Cosmos — NEVER the bytes themselves (doc-size + RU limits, §13). The
* short-lived SAS read URL is minted on demand from `blobKey` and is intentionally
* NOT persisted on the doc.
*/
export const FleetArtifactDocSchema = z.object({
id: z.string(),
productId: z.string().min(1),
jobId: z.string().min(1),
runId: z.string().optional(),
kind: z.string().min(1),
blobUrl: z.string().min(1),
sizeBytes: z.number().int().nonnegative().optional(),
kind: z.enum(FLEET_ARTIFACT_KINDS),
blobKey: z.string().min(1),
contentType: z.string().min(1),
sizeBytes: z.number().int().nonnegative(),
sha256: z.string().optional(),
createdAt: z.string(),
});
export type FleetArtifactDoc = z.infer<typeof FleetArtifactDocSchema>;
@ -315,3 +329,17 @@ export const HeartbeatSchema = z.object({
seatLimit: z.number().int().positive().optional(),
});
export type HeartbeatInput = z.infer<typeof HeartbeatSchema>;
/**
 * Upload an artifact for a job. The bytes are carried base64-encoded in the JSON
 * body (large content is offloaded to blob server-side; nothing is stored inline
 * in Cosmos). `productId` may override the request-resolved product.
 */
export const UploadArtifactSchema = z.object({
// Optional; when present it must be a non-empty string.
productId: z.string().min(1).optional(),
// Optional run identifier; non-empty when present.
runId: z.string().min(1).optional(),
// Must be one of FLEET_ARTIFACT_KINDS; any other value fails validation.
kind: z.enum(FLEET_ARTIFACT_KINDS),
// Falls back to 'application/octet-stream' when omitted.
contentType: z.string().min(1).default('application/octet-stream'),
// Non-empty string. NOTE(review): only the length is checked here — the schema
// does not verify that the value is well-formed base64.
contentBase64: z.string().min(1),
});
/** Parsed (output) shape of UploadArtifactSchema. */
export type UploadArtifactInput = z.infer<typeof UploadArtifactSchema>;