feat(platform-service): fleet REST routes + module registration (P2 foundation)

Guarded REST under /api (auth + productId, like items): POST /fleet/jobs (idempotent
submit), GET /fleet/jobs (by stage/idempotencyKey), GET /fleet/jobs/:id, PATCH
/fleet/jobs/:id (fenced transition), POST /fleet/claim, lease renew/release,
factories/heartbeat, and runs/events streams. Every body validated with the Zod
schemas; fenced/conflict map to 409, missing to 404, invalid to 400. Registers
fleetRoutes in server.ts next to itemRoutes. Routes tested via Fastify inject on
the memory provider (real coordinator).
This commit is contained in:
saravanakumardb1 2026-05-29 20:20:46 -07:00
parent 8f51570da7
commit 8eb02c48aa
3 changed files with 329 additions and 0 deletions

View File

@ -0,0 +1,144 @@
/**
* Fleet routes Fastify inject, real coordinator/repo on the memory provider.
* Auth + productId resolution are mocked (as in the items module routes test).
*/
import Fastify, { type FastifyInstance } from 'fastify';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { MemoryDatastoreProvider } from '@bytelyst/datastore';
import { _resetDatastoreProvider, setProvider } from '../../lib/datastore.js';
vi.mock('../../lib/auth.js', () => ({
extractAuth: vi.fn(async () => ({ sub: 'user_1', role: 'admin' })),
}));
vi.mock('../../lib/request-context.js', () => ({
getRequestProductId: () => 'lysnrai',
}));
async function buildApp(): Promise<FastifyInstance> {
const { fleetRoutes } = await import('./routes.js');
// Fastify's default error handler maps a thrown ServiceError (which carries a
// `.statusCode`) to the right HTTP status — same as the items routes test.
const app = Fastify({ logger: false });
await app.register(fleetRoutes, { prefix: '/api' });
return app;
}
async function submit(app: FastifyInstance, body: Record<string, unknown>) {
return app.inject({ method: 'POST', url: '/api/fleet/jobs', payload: body });
}
describe('fleetRoutes', () => {
beforeEach(() => setProvider(new MemoryDatastoreProvider()));
afterEach(() => {
_resetDatastoreProvider();
vi.clearAllMocks();
});
it('POST /fleet/jobs submits (201) and is idempotent (200 dedup)', async () => {
const app = await buildApp();
const r1 = await submit(app, { idempotencyKey: 'k1', bodyMd: '# task' });
expect(r1.statusCode).toBe(201);
expect(JSON.parse(r1.body).outcome).toBe('created');
const r2 = await submit(app, { idempotencyKey: 'k1', bodyMd: '# task' });
expect(r2.statusCode).toBe(200);
expect(JSON.parse(r2.body).outcome).toBe('deduplicated');
});
it('POST /fleet/jobs rejects an invalid body (400)', async () => {
const app = await buildApp();
const res = await submit(app, { idempotencyKey: 'k1' }); // missing bodyMd
expect(res.statusCode).toBe(400);
});
it('full lifecycle: submit -> list -> claim -> patch(fenced) -> renew -> release', async () => {
const app = await buildApp();
const sub = await submit(app, { idempotencyKey: 'k1', bodyMd: '# task', priority: 'high' });
const jobId = JSON.parse(sub.body).job.id as string;
const list = await app.inject({ method: 'GET', url: '/api/fleet/jobs?stage=queued' });
expect(list.statusCode).toBe(200);
expect(JSON.parse(list.body).jobs).toHaveLength(1);
const claim = await app.inject({
method: 'POST',
url: '/api/fleet/claim',
payload: { factoryId: 'fac_1', capabilities: [] },
});
expect(claim.statusCode).toBe(200);
const claimed = JSON.parse(claim.body);
expect(claimed.claimed).toBe(true);
expect(claimed.job.stage).toBe('assigned');
const epoch = claimed.job.leaseEpoch as number;
expect(epoch).toBe(1);
// fenced: stale epoch rejected (409)
const stale = await app.inject({
method: 'PATCH',
url: `/api/fleet/jobs/${jobId}`,
payload: { leaseEpoch: epoch - 1, stage: 'building' },
});
expect(stale.statusCode).toBe(409);
// current epoch succeeds
const patch = await app.inject({
method: 'PATCH',
url: `/api/fleet/jobs/${jobId}`,
payload: { leaseEpoch: epoch, stage: 'building' },
});
expect(patch.statusCode).toBe(200);
expect(JSON.parse(patch.body).stage).toBe('building');
const renew = await app.inject({
method: 'POST',
url: `/api/fleet/jobs/${jobId}/lease/renew`,
payload: { leaseEpoch: epoch, leaseSeconds: 600 },
});
expect(renew.statusCode).toBe(200);
expect(JSON.parse(renew.body).renewals).toBe(1);
const release = await app.inject({
method: 'POST',
url: `/api/fleet/jobs/${jobId}/lease/release`,
payload: { leaseEpoch: epoch, stage: 'review' },
});
expect(release.statusCode).toBe(200);
expect(JSON.parse(release.body).status).toBe('released');
const events = await app.inject({ method: 'GET', url: `/api/fleet/jobs/${jobId}/events` });
const types = JSON.parse(events.body).events.map((e: { type: string }) => e.type);
expect(types).toContain('submitted');
expect(types).toContain('assigned');
});
it('POST /fleet/claim returns claimed:false when nothing is eligible', async () => {
const app = await buildApp();
const res = await app.inject({
method: 'POST',
url: '/api/fleet/claim',
payload: { factoryId: 'fac_1' },
});
expect(res.statusCode).toBe(200);
expect(JSON.parse(res.body).claimed).toBe(false);
});
it('POST /fleet/factories/heartbeat upserts a factory', async () => {
const app = await buildApp();
const res = await app.inject({
method: 'POST',
url: '/api/fleet/factories/heartbeat',
payload: { factoryId: 'fac_1', capabilities: ['os:mac'], health: 'ok' },
});
expect(res.statusCode).toBe(200);
const body = JSON.parse(res.body);
expect(body.ok).toBe(true);
expect(body.factory.factoryId).toBe('fac_1');
});
it('GET /fleet/jobs/:id returns 404 when missing', async () => {
const app = await buildApp();
const res = await app.inject({ method: 'GET', url: '/api/fleet/jobs/nope' });
expect(res.statusCode).toBe(404);
});
});

View File

@ -0,0 +1,182 @@
/**
* Fleet REST endpoints agent gigafactory coordinator.
*
* POST /fleet/jobs submit a job (idempotent)
* GET /fleet/jobs list jobs (by stage / idempotencyKey)
* GET /fleet/jobs/:id get one job
* PATCH /fleet/jobs/:id fenced state transition (carries leaseEpoch)
* POST /fleet/claim atomic claim for a factory
* POST /fleet/jobs/:id/lease/renew renew a held lease
* POST /fleet/jobs/:id/lease/release release a held lease
* POST /fleet/factories/heartbeat factory liveness
* GET /fleet/jobs/:id/runs job run history
* GET /fleet/jobs/:id/events append-only event stream
*
* All routes require auth + a resolved productId, exactly like the items module.
*/
import type { FastifyInstance } from 'fastify';
import { getRequestProductId } from '../../lib/request-context.js';
import { BadRequestError, ConflictError, NotFoundError } from '../../lib/errors.js';
import { extractAuth } from '../../lib/auth.js';
import * as repo from './repository.js';
import * as coordinator from './coordinator.js';
import {
SubmitJobSchema,
ListJobsQuerySchema,
PatchJobSchema,
ClaimSchema,
RenewLeaseSchema,
ReleaseLeaseSchema,
HeartbeatSchema,
} from './types.js';
function badRequest(issues: { message: string }[]): never {
throw new BadRequestError(issues.map(i => i.message).join('; '));
}
export async function fleetRoutes(app: FastifyInstance) {
// ── Submit (idempotent) ──
app.post('/fleet/jobs', async (req, reply) => {
await extractAuth(req);
const parsed = SubmitJobSchema.safeParse(req.body);
if (!parsed.success) badRequest(parsed.error.issues);
const pid = parsed.data.productId || getRequestProductId(req);
const result = await coordinator.submitJob(pid, parsed.data);
reply.code(result.outcome === 'created' ? 201 : 200);
return { outcome: result.outcome, job: result.job };
});
// ── List ──
app.get('/fleet/jobs', async req => {
await extractAuth(req);
const parsed = ListJobsQuerySchema.safeParse(req.query);
if (!parsed.success) badRequest(parsed.error.issues);
const q = parsed.data;
const pid = q.productId || getRequestProductId(req);
const jobs = await repo.listJobs({
productId: pid,
stage: q.stage,
idempotencyKey: q.idempotencyKey,
limit: q.limit,
offset: q.offset,
});
return { jobs, limit: q.limit, offset: q.offset };
});
// ── Get one ──
app.get('/fleet/jobs/:id', async req => {
await extractAuth(req);
const { id } = req.params as { id: string };
const pid = getRequestProductId(req);
const job = await repo.getJob(id, pid);
if (!job) throw new NotFoundError('Job not found');
return job;
});
// ── Fenced state transition ──
app.patch('/fleet/jobs/:id', async req => {
await extractAuth(req);
const { id } = req.params as { id: string };
const pid = getRequestProductId(req);
const parsed = PatchJobSchema.safeParse(req.body);
if (!parsed.success) badRequest(parsed.error.issues);
const res = await coordinator.patchJobFenced(id, pid, parsed.data);
if (!res.ok) {
if (res.reason === 'not_found') throw new NotFoundError('Job not found');
if (res.reason === 'fenced') {
throw new ConflictError('stale leaseEpoch — transition fenced (job reassigned)');
}
throw new ConflictError('concurrent update conflict — retry');
}
return res.doc;
});
// ── Atomic claim ──
app.post('/fleet/claim', async req => {
await extractAuth(req);
const parsed = ClaimSchema.safeParse(req.body);
if (!parsed.success) badRequest(parsed.error.issues);
const pid = parsed.data.productId || getRequestProductId(req);
const claim = await coordinator.claimNextJob({
productId: pid,
factoryId: parsed.data.factoryId,
capabilities: parsed.data.capabilities,
leaseSeconds: parsed.data.leaseSeconds,
});
if (!claim) return { claimed: false };
return { claimed: true, ...claim };
});
// ── Lease renew ──
app.post('/fleet/jobs/:id/lease/renew', async req => {
await extractAuth(req);
const { id } = req.params as { id: string };
const pid = getRequestProductId(req);
const parsed = RenewLeaseSchema.safeParse(req.body);
if (!parsed.success) badRequest(parsed.error.issues);
const res = await coordinator.renewLease(
id,
pid,
parsed.data.leaseEpoch,
parsed.data.leaseSeconds
);
if (!res.ok) {
if (res.reason === 'not_found') throw new NotFoundError('Job or lease not found');
if (res.reason === 'fenced') throw new ConflictError('stale leaseEpoch — renew fenced');
throw new ConflictError('lease renew conflict — retry');
}
return res.doc;
});
// ── Lease release ──
app.post('/fleet/jobs/:id/lease/release', async req => {
await extractAuth(req);
const { id } = req.params as { id: string };
const pid = getRequestProductId(req);
const parsed = ReleaseLeaseSchema.safeParse(req.body);
if (!parsed.success) badRequest(parsed.error.issues);
const res = await coordinator.releaseLease(id, pid, parsed.data.leaseEpoch, parsed.data.stage);
if (!res.ok) {
if (res.reason === 'not_found') throw new NotFoundError('Job or lease not found');
if (res.reason === 'fenced') throw new ConflictError('stale leaseEpoch — release fenced');
throw new ConflictError('lease release conflict — retry');
}
return res.doc;
});
// ── Factory heartbeat ──
app.post('/fleet/factories/heartbeat', async req => {
await extractAuth(req);
const parsed = HeartbeatSchema.safeParse(req.body);
if (!parsed.success) badRequest(parsed.error.issues);
const pid = parsed.data.productId || getRequestProductId(req);
await coordinator.heartbeat({
productId: pid,
factoryId: parsed.data.factoryId,
descriptor: parsed.data.descriptor,
capabilities: parsed.data.capabilities,
health: parsed.data.health,
load: parsed.data.load,
seatLimit: parsed.data.seatLimit,
});
const factory = await repo.getFactory(parsed.data.factoryId, pid);
return { ok: true, factory };
});
// ── Runs ──
app.get('/fleet/jobs/:id/runs', async req => {
await extractAuth(req);
const { id } = req.params as { id: string };
const runs = await repo.listRunsByJob(id);
return { runs };
});
// ── Events ──
app.get('/fleet/jobs/:id/events', async req => {
await extractAuth(req);
const { id } = req.params as { id: string };
const events = await repo.listEvents(id);
return { events };
});
}

View File

@ -63,6 +63,7 @@ import { licenseRoutes } from './modules/licenses/routes.js';
import { stripeRoutes } from './modules/stripe/routes.js'; import { stripeRoutes } from './modules/stripe/routes.js';
import { settingsRoutes } from './modules/settings/routes.js'; import { settingsRoutes } from './modules/settings/routes.js';
import { itemRoutes } from './modules/items/routes.js'; import { itemRoutes } from './modules/items/routes.js';
import { fleetRoutes } from './modules/fleet/routes.js';
import { commentRoutes } from './modules/comments/routes.js'; import { commentRoutes } from './modules/comments/routes.js';
import { voteRoutes } from './modules/votes/routes.js'; import { voteRoutes } from './modules/votes/routes.js';
import { publicRoutes } from './modules/public/routes.js'; import { publicRoutes } from './modules/public/routes.js';
@ -207,6 +208,8 @@ await app.register(stripeRoutes, { prefix: '/api' });
await app.register(settingsRoutes, { prefix: '/api' }); await app.register(settingsRoutes, { prefix: '/api' });
// Tracker modules (merged from tracker-service) // Tracker modules (merged from tracker-service)
await app.register(itemRoutes, { prefix: '/api' }); await app.register(itemRoutes, { prefix: '/api' });
// Agent Gigafactory — fleet coordinator (jobs/claim/lease/fence/reaper)
await app.register(fleetRoutes, { prefix: '/api' });
await app.register(commentRoutes, { prefix: '/api' }); await app.register(commentRoutes, { prefix: '/api' });
await app.register(voteRoutes, { prefix: '/api' }); await app.register(voteRoutes, { prefix: '/api' });
// API tokens module // API tokens module