diff --git a/services/platform-service/src/modules/fleet/repository.ts b/services/platform-service/src/modules/fleet/repository.ts index dfc95547..905bdbe1 100644 --- a/services/platform-service/src/modules/fleet/repository.ts +++ b/services/platform-service/src/modules/fleet/repository.ts @@ -138,6 +138,24 @@ export async function deleteJob(id: string, productId: string): Promise { await jobs().delete(id, productId); } +/** + * Record the last tracker Item status echoed for a job (§10 round-trip idempotency). + * Deliberately does NOT bump `rev` — it is bookkeeping for the downstream echo, not a + * lifecycle transition, so it never interferes with the coordinator's fenced CAS. + */ +export async function setTrackerEchoedStatus( + id: string, + productId: string, + status: string +): Promise { + const cur = await jobs().findById(id, productId); + if (!cur) return null; + return jobs().update(id, productId, { + trackerEchoedStatus: status, + updatedAt: new Date().toISOString(), + }); +} + // ── Runs ──────────────────────────────────────────────────────────────────── export async function createRun(doc: FleetRunDoc): Promise { diff --git a/services/platform-service/src/modules/fleet/routes.ts b/services/platform-service/src/modules/fleet/routes.ts index 3e6144ef..b1405566 100644 --- a/services/platform-service/src/modules/fleet/routes.ts +++ b/services/platform-service/src/modules/fleet/routes.ts @@ -27,6 +27,7 @@ import * as repo from './repository.js'; import * as coordinator from './coordinator.js'; import * as artifactsBlob from './artifacts-blob.js'; import * as enrollment from './enrollment.js'; +import * as trackerBridge from './tracker-bridge.js'; import { SubmitJobSchema, ListJobsQuerySchema, @@ -39,6 +40,8 @@ import { EnrollFactorySchema, RotateTokenSchema, RevokeTokenSchema, + IngestItemSchema, + EchoJobSchema, } from './types.js'; function badRequest(issues: { message: string }[]): never { @@ -99,6 +102,9 @@ export async function fleetRoutes(app: FastifyInstance) { } throw new ConflictError('concurrent update conflict — retry'); } + // §10: opt-in (FLEET_TRACKER_ECHO, default OFF) downstream echo of the new + // stage to the linked tracker Item. Never blocks/fails the transition. + await trackerBridge.maybeEchoOnTransition(pid, id, req.log); return res.doc; }); @@ -160,6 +166,8 @@ export async function fleetRoutes(app: FastifyInstance) { if (res.reason === 'fenced') throw new ConflictError('stale leaseEpoch — release fenced'); throw new ConflictError('lease release conflict — retry'); } + // §10: release often carries a terminal stage (shipped/failed) — echo it (opt-in). + if (parsed.data.stage) await trackerBridge.maybeEchoOnTransition(pid, id, req.log); return res.doc; }); @@ -313,4 +321,26 @@ export async function fleetRoutes(app: FastifyInstance) { }); return { revoked }; }); + + // ── Tracker bridge (§10): ingest an Item as a job (idempotent, scheduled by §7) ── + app.post('/fleet/tracker/ingest', async (req, reply) => { + await extractAuth(req); + const parsed = IngestItemSchema.safeParse(req.body); + if (!parsed.success) badRequest(parsed.error.issues); + const pid = getRequestProductId(req); + const result = await trackerBridge.ingestItemAsJob(pid, parsed.data.itemId, { + priority: parsed.data.priority, + }); + reply.code(result.outcome === 'created' ? 201 : 200); + return { outcome: result.outcome, job: result.job }; + }); + + // ── Tracker bridge (§10): manually echo a job's current outcome to its Item ── + app.post('/fleet/tracker/echo', async req => { + await extractAuth(req); + const parsed = EchoJobSchema.safeParse(req.body); + if (!parsed.success) badRequest(parsed.error.issues); + const pid = getRequestProductId(req); + return trackerBridge.echoJobToItem(pid, parsed.data.jobId, req.log); + }); } diff --git a/services/platform-service/src/modules/fleet/tracker-bridge.test.ts b/services/platform-service/src/modules/fleet/tracker-bridge.test.ts new file mode 100644 index 00000000..1483e8ca --- /dev/null +++ b/services/platform-service/src/modules/fleet/tracker-bridge.test.ts @@ -0,0 +1,256 @@ +/** + * Tracker ↔ fleet bridge (§10) — in-process round-trip on the memory providers. + * Service tests call the bridge directly; route tests use Fastify inject to exercise + * the gated auto-echo (FLEET_TRACKER_ECHO, default OFF). Auth + productId are mocked + * exactly like the other fleet routes tests; NO live HTTP. + */ + +import Fastify, { type FastifyInstance } from 'fastify'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { MemoryDatastoreProvider } from '@bytelyst/datastore'; +import { _resetDatastoreProvider, setProvider } from '../../lib/datastore.js'; +import * as itemsRepo from '../items/repository.js'; +import * as commentsRepo from '../comments/repository.js'; +import type { TrackerItemDoc } from '../items/types.js'; +import * as repo from './repository.js'; +import * as coordinator from './coordinator.js'; +import * as bridge from './tracker-bridge.js'; +import { SubmitJobSchema } from './types.js'; + +vi.mock('../../lib/auth.js', () => ({ + extractAuth: vi.fn(async () => ({ sub: 'user_1', role: 'admin' })), +})); +vi.mock('../../lib/request-context.js', () => ({ + getRequestProductId: () => 'lysnrai', +})); + +const PID = 'lysnrai'; + +function makeItem(over: Partial = {}): TrackerItemDoc { + const now = new Date().toISOString(); + return { + id: 'IT-1', + productId: PID, + type: 'task', + status: 'open', + priority: 'medium', + title: 'Fix the widget', + description: 'Do A then B.', + labels: [], + assignee: null, + reportedBy: 'user_1', + source: 'internal', + visibility: 'internal', + voteCount: 0, + commentCount: 0, + priorityOrder: 2, + targetRelease: null, + createdAt: now, + updatedAt: now, + ...over, + }; +} + +async function buildApp(): Promise { + const { fleetRoutes } = await import('./routes.js'); + const app = Fastify({ logger: false }); + await app.register(fleetRoutes, { prefix: '/api' }); + return app; +} + +describe('fleet tracker bridge (§10)', () => { + beforeEach(() => { + setProvider(new MemoryDatastoreProvider()); + delete process.env.FLEET_TRACKER_ECHO; + }); + afterEach(() => { + _resetDatastoreProvider(); + delete process.env.FLEET_TRACKER_ECHO; + vi.restoreAllMocks(); + }); + + // ── ingest ── + it('ingest creates exactly one claimable job (trackerItemId, bodyMd, stable key)', async () => { + await itemsRepo.create(makeItem()); + const { job, outcome } = await bridge.ingestItemAsJob(PID, 'IT-1'); + expect(outcome).toBe('created'); + expect(job.trackerItemId).toBe('IT-1'); + expect(job.idempotencyKey).toBe('tracker-IT-1'); + expect(job.bodyMd).toContain('Do A then B.'); + expect(job.stage).toBe('queued'); + + // claimable through the normal scheduler path (claim unchanged) + const claim = await coordinator.claimNextJob({ + productId: PID, + factoryId: 'fac_1', + capabilities: [], + leaseSeconds: 900, + }); + expect(claim?.job.id).toBe(job.id); + }); + + it('ingest maps labels → priority + capabilities', async () => { + await itemsRepo.create( + makeItem({ labels: ['engine-class:agentic-coder', 'priority:high', 'cap:os:mac'] }) + ); + const { job } = await bridge.ingestItemAsJob(PID, 'IT-1'); + expect(job.priority).toBe('high'); + expect(job.capabilities).toEqual(['os:mac']); + expect(job.engineClass).toBe('agentic-coder'); + }); + + it('ingest is idempotent (same Item twice → one job, deduplicated)', async () => { + await itemsRepo.create(makeItem()); + const first = await bridge.ingestItemAsJob(PID, 'IT-1'); + const second = await bridge.ingestItemAsJob(PID, 'IT-1'); + expect(second.outcome).toBe('deduplicated'); + expect(second.job.id).toBe(first.job.id); + const all = await repo.findJobsByIdempotencyKey(PID, 'tracker-IT-1'); + expect(all).toHaveLength(1); + }); + + it('ingest for a foreign productId → not found (isolation)', async () => { + await itemsRepo.create(makeItem()); + await expect(bridge.ingestItemAsJob('other-product', 'IT-1')).rejects.toThrow(/not found/i); + }); + + // ── echo ── + it('echo round-trip: queued→building→shipped drives Item in_progress→done with metrics-only notes', async () => { + await itemsRepo.create(makeItem({ description: 'SECRET-BODY-SENTINEL do the work' })); + const { job } = await bridge.ingestItemAsJob(PID, 'IT-1'); + + const e1 = await bridge.echoJobToItem(PID, job.id); + expect(e1.echoed).toBe('in_progress'); + expect((await itemsRepo.getById('IT-1'))?.status).toBe('in_progress'); + + await repo.updateJob(job.id, PID, { stage: 'shipped' }); + const e2 = await bridge.echoJobToItem(PID, job.id); + expect(e2.echoed).toBe('done'); + expect((await itemsRepo.getById('IT-1'))?.status).toBe('done'); + + // metrics-only comments — never the prompt body + const comments = await commentsRepo.listByItem('IT-1'); + expect(comments.length).toBeGreaterThanOrEqual(2); + for (const c of comments) { + expect(c.body).not.toContain('SECRET-BODY-SENTINEL'); + expect(c.body).toContain('attempts='); + } + }); + + it('echo failed → Item wont_fix (the Item vocabulary has no "blocked")', async () => { + await itemsRepo.create(makeItem()); + const { job } = await bridge.ingestItemAsJob(PID, 'IT-1'); + await repo.updateJob(job.id, PID, { stage: 'failed' }); + const res = await bridge.echoJobToItem(PID, job.id); + expect(res.echoed).toBe('wont_fix'); + expect((await itemsRepo.getById('IT-1'))?.status).toBe('wont_fix'); + }); + + it('echo is idempotent (re-echo of an unchanged stage → no duplicate Item write)', async () => { + await itemsRepo.create(makeItem()); + const { job } = await bridge.ingestItemAsJob(PID, 'IT-1'); + await bridge.echoJobToItem(PID, job.id); + const after1 = (await itemsRepo.getById('IT-1'))?.commentCount; + const res = await bridge.echoJobToItem(PID, job.id); + expect(res.echoed).toBe('in_progress'); + const after2 = (await itemsRepo.getById('IT-1'))?.commentCount; + expect(after2).toBe(after1); // no extra comment written + expect(await commentsRepo.listByItem('IT-1')).toHaveLength(after1 ?? 0); + }); + + it('echo is non-fatal: an items-write failure → { echoed: null, error }, job untouched', async () => { + await itemsRepo.create(makeItem()); + const { job } = await bridge.ingestItemAsJob(PID, 'IT-1'); + vi.spyOn(itemsRepo, 'update').mockRejectedValueOnce(new Error('cosmos down')); + const res = await bridge.echoJobToItem(PID, job.id); + expect(res.echoed).toBeNull(); + expect(res.error).toMatch(/cosmos down/); + // job state untouched: not marked echoed, still queued + const after = await repo.getJob(job.id, PID); + expect(after?.trackerEchoedStatus).toBeUndefined(); + expect(after?.stage).toBe('queued'); + }); + + it('echo no-op for a job with no trackerItemId', async () => { + const { job } = await coordinator.submitJob( + PID, + SubmitJobSchema.parse({ idempotencyKey: 'plain', bodyMd: '# t' }) + ); + const res = await bridge.echoJobToItem(PID, job.id); + expect(res.echoed).toBeNull(); + }); + + // ── route + flag gating ── + it('auto-echo OFF (default flag): a PATCH transition performs ZERO items writes', async () => { + await itemsRepo.create(makeItem()); + const app = await buildApp(); + const ingest = await app.inject({ + method: 'POST', + url: '/api/fleet/tracker/ingest', + payload: { itemId: 'IT-1' }, + }); + expect(ingest.statusCode).toBe(201); + const jobId = JSON.parse(ingest.body).job.id as string; + + const claim = await app.inject({ + method: 'POST', + url: '/api/fleet/claim', + payload: { factoryId: 'fac_1', capabilities: [] }, + }); + const leaseEpoch = JSON.parse(claim.body).job.leaseEpoch as number; + + const patch = await app.inject({ + method: 'PATCH', + url: `/api/fleet/jobs/${jobId}`, + payload: { stage: 'building', leaseEpoch }, + }); + expect(patch.statusCode).toBe(200); + // flag OFF ⇒ Item untouched, no comments + expect((await itemsRepo.getById('IT-1'))?.status).toBe('open'); + expect(await commentsRepo.listByItem('IT-1')).toHaveLength(0); + }); + + it('auto-echo ON (FLEET_TRACKER_ECHO=1): a PATCH transition echoes to the Item', async () => { + await itemsRepo.create(makeItem()); + const app = await buildApp(); + const ingest = await app.inject({ + method: 'POST', + url: '/api/fleet/tracker/ingest', + payload: { itemId: 'IT-1' }, + }); + const jobId = JSON.parse(ingest.body).job.id as string; + const claim = await app.inject({ + method: 'POST', + url: '/api/fleet/claim', + payload: { factoryId: 'fac_1', capabilities: [] }, + }); + const leaseEpoch = JSON.parse(claim.body).job.leaseEpoch as number; + + process.env.FLEET_TRACKER_ECHO = '1'; + const patch = await app.inject({ + method: 'PATCH', + url: `/api/fleet/jobs/${jobId}`, + payload: { stage: 'building', leaseEpoch }, + }); + expect(patch.statusCode).toBe(200); + expect((await itemsRepo.getById('IT-1'))?.status).toBe('in_progress'); + }); + + it('manual echo route returns the echoed status', async () => { + await itemsRepo.create(makeItem()); + const app = await buildApp(); + const ingest = await app.inject({ + method: 'POST', + url: '/api/fleet/tracker/ingest', + payload: { itemId: 'IT-1' }, + }); + const jobId = JSON.parse(ingest.body).job.id as string; + const echo = await app.inject({ + method: 'POST', + url: '/api/fleet/tracker/echo', + payload: { jobId }, + }); + expect(echo.statusCode).toBe(200); + expect(JSON.parse(echo.body).echoed).toBe('in_progress'); + }); +}); diff --git a/services/platform-service/src/modules/fleet/tracker-bridge.ts b/services/platform-service/src/modules/fleet/tracker-bridge.ts new file mode 100644 index 00000000..63955c7b --- /dev/null +++ b/services/platform-service/src/modules/fleet/tracker-bridge.ts @@ -0,0 +1,241 @@ +/** + * Tracker ↔ fleet bridge (§10 / §24.5) — the in-process, no-shell-hop round-trip. + * + * `ingestItemAsJob` turns a tracker Item into a fleet job submitted through the + * coordinator (so it is scheduled by the §7 router like any other job), reusing the + * existing `trackerItemId` field + a stable idempotency-key so re-ingest dedupes. + * + * `echoJobToItem` mirrors the job's lifecycle back onto the Item (status + a + * metrics-only comment). Echo is BEST-EFFORT and downstream: an items-write failure + * is surfaced as `{ echoed: null, error }` and is NEVER allowed to fail the job. + * + * This module calls the items + comments + fleet repositories DIRECTLY — no HTTP, + * no Fastify types here. + */ + +import { randomUUID } from 'node:crypto'; +import { NotFoundError } from '../../lib/errors.js'; +import * as itemsRepo from '../items/repository.js'; +import * as commentsRepo from '../comments/repository.js'; +import type { ItemStatus, TrackerItemDoc } from '../items/types.js'; +import * as repo from './repository.js'; +import * as coordinator from './coordinator.js'; +import type { SubmitResult } from './coordinator.js'; +import { + FLEET_ENGINE_CLASSES, + FLEET_PRIORITIES, + SubmitJobSchema, + type FleetEngineClass, + type FleetJobDoc, + type FleetPriority, + type FleetStage, +} from './types.js'; + +/** Minimal logger shape (pino/req.log compatible) — keeps Fastify out of this module. */ +export interface BridgeLogger { + error: (obj: unknown, msg?: string) => void; +} + +/** Config flag (default OFF): auto-echo on server-side stage transitions. */ +export function trackerEchoEnabled(): boolean { + const v = (process.env.FLEET_TRACKER_ECHO ?? '').trim().toLowerCase(); + return v === '1' || v === 'true' || v === 'yes' || v === 'on'; +} + +// ── Item → job ──────────────────────────────────────────────────────────────── + +interface LabelHints { + priority?: FleetPriority; + engineClass?: FleetEngineClass; + profile?: string; + capabilities: string[]; +} + +/** Parse manifest hints carried as Item labels: engine-class:/profile:/priority:/cap:. */ +function parseLabelHints(labels: string[]): LabelHints { + const hints: LabelHints = { capabilities: [] }; + for (const raw of labels) { + const label = raw.trim(); + if (label.startsWith('engine-class:')) { + const v = label.slice('engine-class:'.length); + if ((FLEET_ENGINE_CLASSES as readonly string[]).includes(v)) + hints.engineClass = v as FleetEngineClass; + } else if (label.startsWith('profile:')) { + hints.profile = label.slice('profile:'.length); + } else if (label.startsWith('priority:')) { + const v = label.slice('priority:'.length); + if ((FLEET_PRIORITIES as readonly string[]).includes(v)) hints.priority = v as FleetPriority; + } else if (label.startsWith('cap:')) { + const v = label.slice('cap:'.length); + if (v) hints.capabilities.push(v); + } + } + return hints; +} + +function coerceItemPriority(p: string): FleetPriority | undefined { + return (FLEET_PRIORITIES as readonly string[]).includes(p) ? (p as FleetPriority) : undefined; +} + +/** Stable, idempotent derived key so re-ingesting the same Item never duplicates. */ +export function trackerIdempotencyKey(itemId: string): string { + return `tracker-${itemId}`; +} + +export interface IngestOptions { + /** Override the derived priority (else label hint, else the Item's own priority). */ + priority?: FleetPriority; +} + +/** + * Read a tracker Item and submit it as a fleet job (idempotent on + * `tracker-`). Foreign productId / unknown item ⇒ NotFoundError. + */ +export async function ingestItemAsJob( + productId: string, + itemId: string, + opts?: IngestOptions +): Promise { + const item = await itemsRepo.getById(itemId); + if (!item || item.productId !== productId) { + throw new NotFoundError(`tracker item '${itemId}' not found`); + } + + const hints = parseLabelHints(item.labels); + const priority = + opts?.priority ?? hints.priority ?? coerceItemPriority(item.priority) ?? 'medium'; + const description = item.description?.trim() ?? ''; + const bodyMd = description ? `# ${item.title}\n\n${item.description}` : `# ${item.title}`; + + const input = SubmitJobSchema.parse({ + idempotencyKey: trackerIdempotencyKey(itemId), + bodyMd, + priority, + capabilities: hints.capabilities, + engineClass: hints.engineClass, + profile: hints.profile, + trackerItemId: itemId, + }); + + return coordinator.submitJob(productId, input); +} + +// ── Job → Item (one-way echo, §24.5) ─────────────────────────────────────────── + +export interface EchoResult { + echoed: ItemStatus | null; + error?: string; +} + +/** + * Map a fleet stage to a tracker Item status (full lifecycle, both directions). + * The Item vocabulary has no `blocked`, so a terminal failure maps to `wont_fix`. + */ +export function stageToItemStatus(stage: FleetStage): ItemStatus { + switch (stage) { + case 'shipped': + return 'done'; + case 'failed': + case 'dead_letter': + return 'wont_fix'; + default: + // queued | blocked | assigned | building | review | testing + return 'in_progress'; + } +} + +async function writeMetricsComment( + job: FleetJobDoc, + item: TrackerItemDoc, + status: ItemStatus +): Promise { + const runs = await repo.listRunsByJob(job.id); + const latest = runs[runs.length - 1]; + // Metrics ONLY — never the prompt body (job.bodyMd) or any secret. + const parts: string[] = [ + `agent-queue fleet: job ${job.id} → ${job.stage} (item status=${status}).`, + `attempts=${job.attempts}.`, + ]; + if (latest?.startedAt && latest?.endedAt) { + const dur = Math.max( + 0, + Math.round((Date.parse(latest.endedAt) - Date.parse(latest.startedAt)) / 1000) + ); + if (Number.isFinite(dur)) parts.push(`duration=${dur}s.`); + } + const ins = latest?.insights; + if (ins) { + if (ins.tokensIn !== undefined || ins.tokensOut !== undefined) { + parts.push(`tokens=${ins.tokensIn ?? 0}/${ins.tokensOut ?? 0}.`); + } + if (ins.costUsd !== undefined) parts.push(`cost_usd=${ins.costUsd}.`); + } + const now = new Date().toISOString(); + await commentsRepo.create({ + id: `cmt_${randomUUID()}`, + itemId: item.id, + productId: item.productId, + authorId: 'fleet-coordinator', + authorEmail: null, + body: parts.join(' '), + createdAt: now, + updatedAt: now, + }); + await itemsRepo.incrementCommentCount(item.id, 1); +} + +/** + * Echo a job's current stage onto its linked Item (status + metrics comment). + * No trackerItemId ⇒ `{ echoed: null }`. Idempotent: an unchanged outcome is a + * no-op. An items-write failure is non-fatal ⇒ `{ echoed: null, error }`. + * A missing/foreign job throws NotFoundError (productId isolation). + */ +export async function echoJobToItem( + productId: string, + jobId: string, + log?: BridgeLogger +): Promise { + const job = await repo.getJob(jobId, productId); + if (!job) throw new NotFoundError(`fleet job '${jobId}' not found`); + if (!job.trackerItemId) return { echoed: null }; + + const status = stageToItemStatus(job.stage); + if (job.trackerEchoedStatus === status) return { echoed: status }; + + try { + const item = await itemsRepo.getById(job.trackerItemId); + if (!item || item.productId !== productId) { + return { echoed: null, error: `linked item '${job.trackerItemId}' not found for product` }; + } + await itemsRepo.update(job.trackerItemId, { status }); + await writeMetricsComment(job, item, status); + await repo.setTrackerEchoedStatus(jobId, productId, status); + return { echoed: status }; + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + log?.error( + { jobId, itemId: job.trackerItemId, error: message }, + 'fleet tracker echo failed (non-fatal)' + ); + return { echoed: null, error: message }; + } +} + +/** + * Opt-in (FLEET_TRACKER_ECHO) best-effort echo invoked after a server-side stage + * transition. Flag OFF ⇒ no-op (zero items writes). Never throws into the caller's + * transition path. + */ +export async function maybeEchoOnTransition( + productId: string, + jobId: string, + log?: BridgeLogger +): Promise { + if (!trackerEchoEnabled()) return; + try { + await echoJobToItem(productId, jobId, log); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + log?.error({ jobId, error: message }, 'fleet tracker auto-echo failed (non-fatal)'); + } +} diff --git a/services/platform-service/src/modules/fleet/types.ts b/services/platform-service/src/modules/fleet/types.ts index a6eb5f86..ed644902 100644 --- a/services/platform-service/src/modules/fleet/types.ts +++ b/services/platform-service/src/modules/fleet/types.ts @@ -160,6 +160,12 @@ export const FleetJobDocSchema = z.object({ leaseEpoch: z.number().int().nonnegative().default(0), rev: z.number().int().nonnegative().default(0), blockedReason: z.string().optional(), + /** + * Last Item status echoed to the linked tracker Item (§10 round-trip). Used to + * make re-echo of an unchanged outcome a no-op. Optional + provider-managed — + * absent on jobs that were never echoed / have no trackerItemId. + */ + trackerEchoedStatus: z.string().optional(), createdAt: z.string(), updatedAt: z.string(), }); @@ -392,3 +398,16 @@ export const RevokeTokenSchema = z.object({ tokenId: z.string().min(1).optional(), }); export type RevokeTokenInput = z.infer; + +/** Tracker bridge (§10): ingest a tracker Item as a fleet job. */ +export const IngestItemSchema = z.object({ + itemId: z.string().min(1), + priority: z.enum(FLEET_PRIORITIES).optional(), +}); +export type IngestItemInput = z.infer; + +/** Tracker bridge (§10): manually echo a job's current outcome to its Item. */ +export const EchoJobSchema = z.object({ + jobId: z.string().min(1), +}); +export type EchoJobInput = z.infer;