From 8eb02c48aa4c91a364cdb0ec0b16bce2c9370ed8 Mon Sep 17 00:00:00 2001 From: saravanakumardb1 Date: Fri, 29 May 2026 20:20:46 -0700 Subject: [PATCH] feat(platform-service): fleet REST routes + module registration (P2 foundation) Guarded REST under /api (auth + productId, like items): POST /fleet/jobs (idempotent submit), GET /fleet/jobs (by stage/idempotencyKey), GET /fleet/jobs/:id, PATCH /fleet/jobs/:id (fenced transition), POST /fleet/claim, lease renew/release, factories/heartbeat, and runs/events streams. Every body validated with the Zod schemas; fenced/conflict map to 409, missing to 404, invalid to 400. Registers fleetRoutes in server.ts next to itemRoutes. Routes tested via Fastify inject on the memory provider (real coordinator). --- .../src/modules/fleet/routes.test.ts | 144 ++++++++++++++ .../src/modules/fleet/routes.ts | 182 ++++++++++++++++++ services/platform-service/src/server.ts | 3 + 3 files changed, 329 insertions(+) create mode 100644 services/platform-service/src/modules/fleet/routes.test.ts create mode 100644 services/platform-service/src/modules/fleet/routes.ts diff --git a/services/platform-service/src/modules/fleet/routes.test.ts b/services/platform-service/src/modules/fleet/routes.test.ts new file mode 100644 index 00000000..b0aa7380 --- /dev/null +++ b/services/platform-service/src/modules/fleet/routes.test.ts @@ -0,0 +1,144 @@ +/** + * Fleet routes — Fastify inject, real coordinator/repo on the memory provider. + * Auth + productId resolution are mocked (as in the items module routes test). + */ + +import Fastify, { type FastifyInstance } from 'fastify'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { MemoryDatastoreProvider } from '@bytelyst/datastore'; +import { _resetDatastoreProvider, setProvider } from '../../lib/datastore.js'; + +vi.mock('../../lib/auth.js', () => ({ + extractAuth: vi.fn(async () => ({ sub: 'user_1', role: 'admin' })), +})); +vi.mock('../../lib/request-context.js', () => ({ + getRequestProductId: () => 'lysnrai', +})); + +async function buildApp(): Promise { + const { fleetRoutes } = await import('./routes.js'); + // Fastify's default error handler maps a thrown ServiceError (which carries a + // `.statusCode`) to the right HTTP status — same as the items routes test. + const app = Fastify({ logger: false }); + await app.register(fleetRoutes, { prefix: '/api' }); + return app; +} + +async function submit(app: FastifyInstance, body: Record) { + return app.inject({ method: 'POST', url: '/api/fleet/jobs', payload: body }); +} + +describe('fleetRoutes', () => { + beforeEach(() => setProvider(new MemoryDatastoreProvider())); + afterEach(() => { + _resetDatastoreProvider(); + vi.clearAllMocks(); + }); + + it('POST /fleet/jobs submits (201) and is idempotent (200 dedup)', async () => { + const app = await buildApp(); + const r1 = await submit(app, { idempotencyKey: 'k1', bodyMd: '# task' }); + expect(r1.statusCode).toBe(201); + expect(JSON.parse(r1.body).outcome).toBe('created'); + + const r2 = await submit(app, { idempotencyKey: 'k1', bodyMd: '# task' }); + expect(r2.statusCode).toBe(200); + expect(JSON.parse(r2.body).outcome).toBe('deduplicated'); + }); + + it('POST /fleet/jobs rejects an invalid body (400)', async () => { + const app = await buildApp(); + const res = await submit(app, { idempotencyKey: 'k1' }); // missing bodyMd + expect(res.statusCode).toBe(400); + }); + + it('full lifecycle: submit -> list -> claim -> patch(fenced) -> renew -> release', async () => { + const app = await buildApp(); + const sub = await submit(app, { idempotencyKey: 'k1', bodyMd: '# task', priority: 'high' }); + const jobId = JSON.parse(sub.body).job.id as string; + + const list = await app.inject({ method: 'GET', url: '/api/fleet/jobs?stage=queued' }); + expect(list.statusCode).toBe(200); + expect(JSON.parse(list.body).jobs).toHaveLength(1); + + const claim = await app.inject({ + method: 'POST', + url: '/api/fleet/claim', + payload: { factoryId: 'fac_1', capabilities: [] }, + }); + expect(claim.statusCode).toBe(200); + const claimed = JSON.parse(claim.body); + expect(claimed.claimed).toBe(true); + expect(claimed.job.stage).toBe('assigned'); + const epoch = claimed.job.leaseEpoch as number; + expect(epoch).toBe(1); + + // fenced: stale epoch rejected (409) + const stale = await app.inject({ + method: 'PATCH', + url: `/api/fleet/jobs/${jobId}`, + payload: { leaseEpoch: epoch - 1, stage: 'building' }, + }); + expect(stale.statusCode).toBe(409); + + // current epoch succeeds + const patch = await app.inject({ + method: 'PATCH', + url: `/api/fleet/jobs/${jobId}`, + payload: { leaseEpoch: epoch, stage: 'building' }, + }); + expect(patch.statusCode).toBe(200); + expect(JSON.parse(patch.body).stage).toBe('building'); + + const renew = await app.inject({ + method: 'POST', + url: `/api/fleet/jobs/${jobId}/lease/renew`, + payload: { leaseEpoch: epoch, leaseSeconds: 600 }, + }); + expect(renew.statusCode).toBe(200); + expect(JSON.parse(renew.body).renewals).toBe(1); + + const release = await app.inject({ + method: 'POST', + url: `/api/fleet/jobs/${jobId}/lease/release`, + payload: { leaseEpoch: epoch, stage: 'review' }, + }); + expect(release.statusCode).toBe(200); + expect(JSON.parse(release.body).status).toBe('released'); + + const events = await app.inject({ method: 'GET', url: `/api/fleet/jobs/${jobId}/events` }); + const types = JSON.parse(events.body).events.map((e: { type: string }) => e.type); + expect(types).toContain('submitted'); + expect(types).toContain('assigned'); + }); + + it('POST /fleet/claim returns claimed:false when nothing is eligible', async () => { + const app = await buildApp(); + const res = await app.inject({ + method: 'POST', + url: '/api/fleet/claim', + payload: { factoryId: 'fac_1' }, + }); + expect(res.statusCode).toBe(200); + expect(JSON.parse(res.body).claimed).toBe(false); + }); + + it('POST /fleet/factories/heartbeat upserts a factory', async () => { + const app = await buildApp(); + const res = await app.inject({ + method: 'POST', + url: '/api/fleet/factories/heartbeat', + payload: { factoryId: 'fac_1', capabilities: ['os:mac'], health: 'ok' }, + }); + expect(res.statusCode).toBe(200); + const body = JSON.parse(res.body); + expect(body.ok).toBe(true); + expect(body.factory.factoryId).toBe('fac_1'); + }); + + it('GET /fleet/jobs/:id returns 404 when missing', async () => { + const app = await buildApp(); + const res = await app.inject({ method: 'GET', url: '/api/fleet/jobs/nope' }); + expect(res.statusCode).toBe(404); + }); +}); diff --git a/services/platform-service/src/modules/fleet/routes.ts b/services/platform-service/src/modules/fleet/routes.ts new file mode 100644 index 00000000..0d630aea --- /dev/null +++ b/services/platform-service/src/modules/fleet/routes.ts @@ -0,0 +1,182 @@ +/** + * Fleet REST endpoints — agent gigafactory coordinator. + * + * POST /fleet/jobs submit a job (idempotent) + * GET /fleet/jobs list jobs (by stage / idempotencyKey) + * GET /fleet/jobs/:id get one job + * PATCH /fleet/jobs/:id fenced state transition (carries leaseEpoch) + * POST /fleet/claim atomic claim for a factory + * POST /fleet/jobs/:id/lease/renew renew a held lease + * POST /fleet/jobs/:id/lease/release release a held lease + * POST /fleet/factories/heartbeat factory liveness + * GET /fleet/jobs/:id/runs job run history + * GET /fleet/jobs/:id/events append-only event stream + * + * All routes require auth + a resolved productId, exactly like the items module. + */ + +import type { FastifyInstance } from 'fastify'; +import { getRequestProductId } from '../../lib/request-context.js'; +import { BadRequestError, ConflictError, NotFoundError } from '../../lib/errors.js'; +import { extractAuth } from '../../lib/auth.js'; +import * as repo from './repository.js'; +import * as coordinator from './coordinator.js'; +import { + SubmitJobSchema, + ListJobsQuerySchema, + PatchJobSchema, + ClaimSchema, + RenewLeaseSchema, + ReleaseLeaseSchema, + HeartbeatSchema, +} from './types.js'; + +function badRequest(issues: { message: string }[]): never { + throw new BadRequestError(issues.map(i => i.message).join('; ')); +} + +export async function fleetRoutes(app: FastifyInstance) { + // ── Submit (idempotent) ── + app.post('/fleet/jobs', async (req, reply) => { + await extractAuth(req); + const parsed = SubmitJobSchema.safeParse(req.body); + if (!parsed.success) badRequest(parsed.error.issues); + const pid = parsed.data.productId || getRequestProductId(req); + const result = await coordinator.submitJob(pid, parsed.data); + reply.code(result.outcome === 'created' ? 201 : 200); + return { outcome: result.outcome, job: result.job }; + }); + + // ── List ── + app.get('/fleet/jobs', async req => { + await extractAuth(req); + const parsed = ListJobsQuerySchema.safeParse(req.query); + if (!parsed.success) badRequest(parsed.error.issues); + const q = parsed.data; + const pid = q.productId || getRequestProductId(req); + const jobs = await repo.listJobs({ + productId: pid, + stage: q.stage, + idempotencyKey: q.idempotencyKey, + limit: q.limit, + offset: q.offset, + }); + return { jobs, limit: q.limit, offset: q.offset }; + }); + + // ── Get one ── + app.get('/fleet/jobs/:id', async req => { + await extractAuth(req); + const { id } = req.params as { id: string }; + const pid = getRequestProductId(req); + const job = await repo.getJob(id, pid); + if (!job) throw new NotFoundError('Job not found'); + return job; + }); + + // ── Fenced state transition ── + app.patch('/fleet/jobs/:id', async req => { + await extractAuth(req); + const { id } = req.params as { id: string }; + const pid = getRequestProductId(req); + const parsed = PatchJobSchema.safeParse(req.body); + if (!parsed.success) badRequest(parsed.error.issues); + const res = await coordinator.patchJobFenced(id, pid, parsed.data); + if (!res.ok) { + if (res.reason === 'not_found') throw new NotFoundError('Job not found'); + if (res.reason === 'fenced') { + throw new ConflictError('stale leaseEpoch — transition fenced (job reassigned)'); + } + throw new ConflictError('concurrent update conflict — retry'); + } + return res.doc; + }); + + // ── Atomic claim ── + app.post('/fleet/claim', async req => { + await extractAuth(req); + const parsed = ClaimSchema.safeParse(req.body); + if (!parsed.success) badRequest(parsed.error.issues); + const pid = parsed.data.productId || getRequestProductId(req); + const claim = await coordinator.claimNextJob({ + productId: pid, + factoryId: parsed.data.factoryId, + capabilities: parsed.data.capabilities, + leaseSeconds: parsed.data.leaseSeconds, + }); + if (!claim) return { claimed: false }; + return { claimed: true, ...claim }; + }); + + // ── Lease renew ── + app.post('/fleet/jobs/:id/lease/renew', async req => { + await extractAuth(req); + const { id } = req.params as { id: string }; + const pid = getRequestProductId(req); + const parsed = RenewLeaseSchema.safeParse(req.body); + if (!parsed.success) badRequest(parsed.error.issues); + const res = await coordinator.renewLease( + id, + pid, + parsed.data.leaseEpoch, + parsed.data.leaseSeconds + ); + if (!res.ok) { + if (res.reason === 'not_found') throw new NotFoundError('Job or lease not found'); + if (res.reason === 'fenced') throw new ConflictError('stale leaseEpoch — renew fenced'); + throw new ConflictError('lease renew conflict — retry'); + } + return res.doc; + }); + + // ── Lease release ── + app.post('/fleet/jobs/:id/lease/release', async req => { + await extractAuth(req); + const { id } = req.params as { id: string }; + const pid = getRequestProductId(req); + const parsed = ReleaseLeaseSchema.safeParse(req.body); + if (!parsed.success) badRequest(parsed.error.issues); + const res = await coordinator.releaseLease(id, pid, parsed.data.leaseEpoch, parsed.data.stage); + if (!res.ok) { + if (res.reason === 'not_found') throw new NotFoundError('Job or lease not found'); + if (res.reason === 'fenced') throw new ConflictError('stale leaseEpoch — release fenced'); + throw new ConflictError('lease release conflict — retry'); + } + return res.doc; + }); + + // ── Factory heartbeat ── + app.post('/fleet/factories/heartbeat', async req => { + await extractAuth(req); + const parsed = HeartbeatSchema.safeParse(req.body); + if (!parsed.success) badRequest(parsed.error.issues); + const pid = parsed.data.productId || getRequestProductId(req); + await coordinator.heartbeat({ + productId: pid, + factoryId: parsed.data.factoryId, + descriptor: parsed.data.descriptor, + capabilities: parsed.data.capabilities, + health: parsed.data.health, + load: parsed.data.load, + seatLimit: parsed.data.seatLimit, + }); + const factory = await repo.getFactory(parsed.data.factoryId, pid); + return { ok: true, factory }; + }); + + // ── Runs ── + app.get('/fleet/jobs/:id/runs', async req => { + await extractAuth(req); + const { id } = req.params as { id: string }; + const runs = await repo.listRunsByJob(id); + return { runs }; + }); + + // ── Events ── + app.get('/fleet/jobs/:id/events', async req => { + await extractAuth(req); + const { id } = req.params as { id: string }; + const events = await repo.listEvents(id); + return { events }; + }); +} diff --git a/services/platform-service/src/server.ts b/services/platform-service/src/server.ts index 9d0546ae..13303694 100644 --- a/services/platform-service/src/server.ts +++ b/services/platform-service/src/server.ts @@ -63,6 +63,7 @@ import { licenseRoutes } from './modules/licenses/routes.js'; import { stripeRoutes } from './modules/stripe/routes.js'; import { settingsRoutes } from './modules/settings/routes.js'; import { itemRoutes } from './modules/items/routes.js'; +import { fleetRoutes } from './modules/fleet/routes.js'; import { commentRoutes } from './modules/comments/routes.js'; import { voteRoutes } from './modules/votes/routes.js'; import { publicRoutes } from './modules/public/routes.js'; @@ -207,6 +208,8 @@ await app.register(stripeRoutes, { prefix: '/api' }); await app.register(settingsRoutes, { prefix: '/api' }); // Tracker modules (merged from tracker-service) await app.register(itemRoutes, { prefix: '/api' }); +// Agent Gigafactory — fleet coordinator (jobs/claim/lease/fence/reaper) +await app.register(fleetRoutes, { prefix: '/api' }); await app.register(commentRoutes, { prefix: '/api' }); await app.register(voteRoutes, { prefix: '/api' }); // API tokens module