Merge: Phase 2 direct tracker -> fleet module wiring (§10) (#tracker)

This commit is contained in:
saravanakumardb1 2026-05-30 01:52:26 -07:00
commit a8538db774
5 changed files with 564 additions and 0 deletions

View File

@ -138,6 +138,24 @@ export async function deleteJob(id: string, productId: string): Promise<void> {
await jobs().delete(id, productId);
}
/**
* Record the last tracker Item status echoed for a job (§10 round-trip idempotency).
* Deliberately does NOT bump `rev` it is bookkeeping for the downstream echo, not a
* lifecycle transition, so it never interferes with the coordinator's fenced CAS.
*/
export async function setTrackerEchoedStatus(
id: string,
productId: string,
status: string
): Promise<FleetJobDoc | null> {
const cur = await jobs().findById(id, productId);
if (!cur) return null;
return jobs().update(id, productId, {
trackerEchoedStatus: status,
updatedAt: new Date().toISOString(),
});
}
// ── Runs ────────────────────────────────────────────────────────────────────
export async function createRun(doc: FleetRunDoc): Promise<FleetRunDoc> {

View File

@ -27,6 +27,7 @@ import * as repo from './repository.js';
import * as coordinator from './coordinator.js';
import * as artifactsBlob from './artifacts-blob.js';
import * as enrollment from './enrollment.js';
import * as trackerBridge from './tracker-bridge.js';
import {
SubmitJobSchema,
ListJobsQuerySchema,
@ -39,6 +40,8 @@ import {
EnrollFactorySchema,
RotateTokenSchema,
RevokeTokenSchema,
IngestItemSchema,
EchoJobSchema,
} from './types.js';
function badRequest(issues: { message: string }[]): never {
@ -99,6 +102,9 @@ export async function fleetRoutes(app: FastifyInstance) {
}
throw new ConflictError('concurrent update conflict — retry');
}
// §10: opt-in (FLEET_TRACKER_ECHO, default OFF) downstream echo of the new
// stage to the linked tracker Item. Never blocks/fails the transition.
await trackerBridge.maybeEchoOnTransition(pid, id, req.log);
return res.doc;
});
@ -160,6 +166,8 @@ export async function fleetRoutes(app: FastifyInstance) {
if (res.reason === 'fenced') throw new ConflictError('stale leaseEpoch — release fenced');
throw new ConflictError('lease release conflict — retry');
}
// §10: release often carries a terminal stage (shipped/failed) — echo it (opt-in).
if (parsed.data.stage) await trackerBridge.maybeEchoOnTransition(pid, id, req.log);
return res.doc;
});
@ -313,4 +321,26 @@ export async function fleetRoutes(app: FastifyInstance) {
});
return { revoked };
});
// ── Tracker bridge (§10): ingest an Item as a job (idempotent, scheduled by §7) ──
app.post('/fleet/tracker/ingest', async (req, reply) => {
await extractAuth(req);
const parsed = IngestItemSchema.safeParse(req.body);
if (!parsed.success) badRequest(parsed.error.issues);
const pid = getRequestProductId(req);
const result = await trackerBridge.ingestItemAsJob(pid, parsed.data.itemId, {
priority: parsed.data.priority,
});
reply.code(result.outcome === 'created' ? 201 : 200);
return { outcome: result.outcome, job: result.job };
});
// ── Tracker bridge (§10): manually echo a job's current outcome to its Item ──
app.post('/fleet/tracker/echo', async req => {
await extractAuth(req);
const parsed = EchoJobSchema.safeParse(req.body);
if (!parsed.success) badRequest(parsed.error.issues);
const pid = getRequestProductId(req);
return trackerBridge.echoJobToItem(pid, parsed.data.jobId, req.log);
});
}

View File

@ -0,0 +1,256 @@
/**
* Tracker fleet bridge (§10) in-process round-trip on the memory providers.
* Service tests call the bridge directly; route tests use Fastify inject to exercise
* the gated auto-echo (FLEET_TRACKER_ECHO, default OFF). Auth + productId are mocked
* exactly like the other fleet routes tests; NO live HTTP.
*/
import Fastify, { type FastifyInstance } from 'fastify';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { MemoryDatastoreProvider } from '@bytelyst/datastore';
import { _resetDatastoreProvider, setProvider } from '../../lib/datastore.js';
import * as itemsRepo from '../items/repository.js';
import * as commentsRepo from '../comments/repository.js';
import type { TrackerItemDoc } from '../items/types.js';
import * as repo from './repository.js';
import * as coordinator from './coordinator.js';
import * as bridge from './tracker-bridge.js';
import { SubmitJobSchema } from './types.js';
vi.mock('../../lib/auth.js', () => ({
extractAuth: vi.fn(async () => ({ sub: 'user_1', role: 'admin' })),
}));
vi.mock('../../lib/request-context.js', () => ({
getRequestProductId: () => 'lysnrai',
}));
const PID = 'lysnrai';
function makeItem(over: Partial<TrackerItemDoc> = {}): TrackerItemDoc {
const now = new Date().toISOString();
return {
id: 'IT-1',
productId: PID,
type: 'task',
status: 'open',
priority: 'medium',
title: 'Fix the widget',
description: 'Do A then B.',
labels: [],
assignee: null,
reportedBy: 'user_1',
source: 'internal',
visibility: 'internal',
voteCount: 0,
commentCount: 0,
priorityOrder: 2,
targetRelease: null,
createdAt: now,
updatedAt: now,
...over,
};
}
async function buildApp(): Promise<FastifyInstance> {
const { fleetRoutes } = await import('./routes.js');
const app = Fastify({ logger: false });
await app.register(fleetRoutes, { prefix: '/api' });
return app;
}
describe('fleet tracker bridge (§10)', () => {
beforeEach(() => {
setProvider(new MemoryDatastoreProvider());
delete process.env.FLEET_TRACKER_ECHO;
});
afterEach(() => {
_resetDatastoreProvider();
delete process.env.FLEET_TRACKER_ECHO;
vi.restoreAllMocks();
});
// ── ingest ──
it('ingest creates exactly one claimable job (trackerItemId, bodyMd, stable key)', async () => {
await itemsRepo.create(makeItem());
const { job, outcome } = await bridge.ingestItemAsJob(PID, 'IT-1');
expect(outcome).toBe('created');
expect(job.trackerItemId).toBe('IT-1');
expect(job.idempotencyKey).toBe('tracker-IT-1');
expect(job.bodyMd).toContain('Do A then B.');
expect(job.stage).toBe('queued');
// claimable through the normal scheduler path (claim unchanged)
const claim = await coordinator.claimNextJob({
productId: PID,
factoryId: 'fac_1',
capabilities: [],
leaseSeconds: 900,
});
expect(claim?.job.id).toBe(job.id);
});
it('ingest maps labels → priority + capabilities', async () => {
await itemsRepo.create(
makeItem({ labels: ['engine-class:agentic-coder', 'priority:high', 'cap:os:mac'] })
);
const { job } = await bridge.ingestItemAsJob(PID, 'IT-1');
expect(job.priority).toBe('high');
expect(job.capabilities).toEqual(['os:mac']);
expect(job.engineClass).toBe('agentic-coder');
});
it('ingest is idempotent (same Item twice → one job, deduplicated)', async () => {
await itemsRepo.create(makeItem());
const first = await bridge.ingestItemAsJob(PID, 'IT-1');
const second = await bridge.ingestItemAsJob(PID, 'IT-1');
expect(second.outcome).toBe('deduplicated');
expect(second.job.id).toBe(first.job.id);
const all = await repo.findJobsByIdempotencyKey(PID, 'tracker-IT-1');
expect(all).toHaveLength(1);
});
it('ingest for a foreign productId → not found (isolation)', async () => {
await itemsRepo.create(makeItem());
await expect(bridge.ingestItemAsJob('other-product', 'IT-1')).rejects.toThrow(/not found/i);
});
// ── echo ──
it('echo round-trip: queued→building→shipped drives Item in_progress→done with metrics-only notes', async () => {
await itemsRepo.create(makeItem({ description: 'SECRET-BODY-SENTINEL do the work' }));
const { job } = await bridge.ingestItemAsJob(PID, 'IT-1');
const e1 = await bridge.echoJobToItem(PID, job.id);
expect(e1.echoed).toBe('in_progress');
expect((await itemsRepo.getById('IT-1'))?.status).toBe('in_progress');
await repo.updateJob(job.id, PID, { stage: 'shipped' });
const e2 = await bridge.echoJobToItem(PID, job.id);
expect(e2.echoed).toBe('done');
expect((await itemsRepo.getById('IT-1'))?.status).toBe('done');
// metrics-only comments — never the prompt body
const comments = await commentsRepo.listByItem('IT-1');
expect(comments.length).toBeGreaterThanOrEqual(2);
for (const c of comments) {
expect(c.body).not.toContain('SECRET-BODY-SENTINEL');
expect(c.body).toContain('attempts=');
}
});
it('echo failed → Item wont_fix (the Item vocabulary has no "blocked")', async () => {
await itemsRepo.create(makeItem());
const { job } = await bridge.ingestItemAsJob(PID, 'IT-1');
await repo.updateJob(job.id, PID, { stage: 'failed' });
const res = await bridge.echoJobToItem(PID, job.id);
expect(res.echoed).toBe('wont_fix');
expect((await itemsRepo.getById('IT-1'))?.status).toBe('wont_fix');
});
it('echo is idempotent (re-echo of an unchanged stage → no duplicate Item write)', async () => {
await itemsRepo.create(makeItem());
const { job } = await bridge.ingestItemAsJob(PID, 'IT-1');
await bridge.echoJobToItem(PID, job.id);
const after1 = (await itemsRepo.getById('IT-1'))?.commentCount;
const res = await bridge.echoJobToItem(PID, job.id);
expect(res.echoed).toBe('in_progress');
const after2 = (await itemsRepo.getById('IT-1'))?.commentCount;
expect(after2).toBe(after1); // no extra comment written
expect(await commentsRepo.listByItem('IT-1')).toHaveLength(after1 ?? 0);
});
it('echo is non-fatal: an items-write failure → { echoed: null, error }, job untouched', async () => {
await itemsRepo.create(makeItem());
const { job } = await bridge.ingestItemAsJob(PID, 'IT-1');
vi.spyOn(itemsRepo, 'update').mockRejectedValueOnce(new Error('cosmos down'));
const res = await bridge.echoJobToItem(PID, job.id);
expect(res.echoed).toBeNull();
expect(res.error).toMatch(/cosmos down/);
// job state untouched: not marked echoed, still queued
const after = await repo.getJob(job.id, PID);
expect(after?.trackerEchoedStatus).toBeUndefined();
expect(after?.stage).toBe('queued');
});
it('echo no-op for a job with no trackerItemId', async () => {
const { job } = await coordinator.submitJob(
PID,
SubmitJobSchema.parse({ idempotencyKey: 'plain', bodyMd: '# t' })
);
const res = await bridge.echoJobToItem(PID, job.id);
expect(res.echoed).toBeNull();
});
// ── route + flag gating ──
it('auto-echo OFF (default flag): a PATCH transition performs ZERO items writes', async () => {
await itemsRepo.create(makeItem());
const app = await buildApp();
const ingest = await app.inject({
method: 'POST',
url: '/api/fleet/tracker/ingest',
payload: { itemId: 'IT-1' },
});
expect(ingest.statusCode).toBe(201);
const jobId = JSON.parse(ingest.body).job.id as string;
const claim = await app.inject({
method: 'POST',
url: '/api/fleet/claim',
payload: { factoryId: 'fac_1', capabilities: [] },
});
const leaseEpoch = JSON.parse(claim.body).job.leaseEpoch as number;
const patch = await app.inject({
method: 'PATCH',
url: `/api/fleet/jobs/${jobId}`,
payload: { stage: 'building', leaseEpoch },
});
expect(patch.statusCode).toBe(200);
// flag OFF ⇒ Item untouched, no comments
expect((await itemsRepo.getById('IT-1'))?.status).toBe('open');
expect(await commentsRepo.listByItem('IT-1')).toHaveLength(0);
});
it('auto-echo ON (FLEET_TRACKER_ECHO=1): a PATCH transition echoes to the Item', async () => {
await itemsRepo.create(makeItem());
const app = await buildApp();
const ingest = await app.inject({
method: 'POST',
url: '/api/fleet/tracker/ingest',
payload: { itemId: 'IT-1' },
});
const jobId = JSON.parse(ingest.body).job.id as string;
const claim = await app.inject({
method: 'POST',
url: '/api/fleet/claim',
payload: { factoryId: 'fac_1', capabilities: [] },
});
const leaseEpoch = JSON.parse(claim.body).job.leaseEpoch as number;
process.env.FLEET_TRACKER_ECHO = '1';
const patch = await app.inject({
method: 'PATCH',
url: `/api/fleet/jobs/${jobId}`,
payload: { stage: 'building', leaseEpoch },
});
expect(patch.statusCode).toBe(200);
expect((await itemsRepo.getById('IT-1'))?.status).toBe('in_progress');
});
it('manual echo route returns the echoed status', async () => {
await itemsRepo.create(makeItem());
const app = await buildApp();
const ingest = await app.inject({
method: 'POST',
url: '/api/fleet/tracker/ingest',
payload: { itemId: 'IT-1' },
});
const jobId = JSON.parse(ingest.body).job.id as string;
const echo = await app.inject({
method: 'POST',
url: '/api/fleet/tracker/echo',
payload: { jobId },
});
expect(echo.statusCode).toBe(200);
expect(JSON.parse(echo.body).echoed).toBe('in_progress');
});
});

View File

@ -0,0 +1,241 @@
/**
* Tracker fleet bridge (§10 / §24.5) the in-process, no-shell-hop round-trip.
*
* `ingestItemAsJob` turns a tracker Item into a fleet job submitted through the
* coordinator (so it is scheduled by the §7 router like any other job), reusing the
* existing `trackerItemId` field + a stable idempotency-key so re-ingest dedupes.
*
* `echoJobToItem` mirrors the job's lifecycle back onto the Item (status + a
* metrics-only comment). Echo is BEST-EFFORT and downstream: an items-write failure
* is surfaced as `{ echoed: null, error }` and is NEVER allowed to fail the job.
*
* This module calls the items + comments + fleet repositories DIRECTLY no HTTP,
* no Fastify types here.
*/
import { randomUUID } from 'node:crypto';
import { NotFoundError } from '../../lib/errors.js';
import * as itemsRepo from '../items/repository.js';
import * as commentsRepo from '../comments/repository.js';
import type { ItemStatus, TrackerItemDoc } from '../items/types.js';
import * as repo from './repository.js';
import * as coordinator from './coordinator.js';
import type { SubmitResult } from './coordinator.js';
import {
FLEET_ENGINE_CLASSES,
FLEET_PRIORITIES,
SubmitJobSchema,
type FleetEngineClass,
type FleetJobDoc,
type FleetPriority,
type FleetStage,
} from './types.js';
/** Minimal logger shape (pino/req.log compatible) — keeps Fastify out of this module. */
export interface BridgeLogger {
error: (obj: unknown, msg?: string) => void;
}
/** Config flag (default OFF): auto-echo on server-side stage transitions. */
export function trackerEchoEnabled(): boolean {
const v = (process.env.FLEET_TRACKER_ECHO ?? '').trim().toLowerCase();
return v === '1' || v === 'true' || v === 'yes' || v === 'on';
}
// ── Item → job ────────────────────────────────────────────────────────────────
interface LabelHints {
priority?: FleetPriority;
engineClass?: FleetEngineClass;
profile?: string;
capabilities: string[];
}
/** Parse manifest hints carried as Item labels: engine-class:/profile:/priority:/cap:. */
function parseLabelHints(labels: string[]): LabelHints {
const hints: LabelHints = { capabilities: [] };
for (const raw of labels) {
const label = raw.trim();
if (label.startsWith('engine-class:')) {
const v = label.slice('engine-class:'.length);
if ((FLEET_ENGINE_CLASSES as readonly string[]).includes(v))
hints.engineClass = v as FleetEngineClass;
} else if (label.startsWith('profile:')) {
hints.profile = label.slice('profile:'.length);
} else if (label.startsWith('priority:')) {
const v = label.slice('priority:'.length);
if ((FLEET_PRIORITIES as readonly string[]).includes(v)) hints.priority = v as FleetPriority;
} else if (label.startsWith('cap:')) {
const v = label.slice('cap:'.length);
if (v) hints.capabilities.push(v);
}
}
return hints;
}
function coerceItemPriority(p: string): FleetPriority | undefined {
return (FLEET_PRIORITIES as readonly string[]).includes(p) ? (p as FleetPriority) : undefined;
}
/** Stable, idempotent derived key so re-ingesting the same Item never duplicates. */
export function trackerIdempotencyKey(itemId: string): string {
return `tracker-${itemId}`;
}
export interface IngestOptions {
/** Override the derived priority (else label hint, else the Item's own priority). */
priority?: FleetPriority;
}
/**
* Read a tracker Item and submit it as a fleet job (idempotent on
* `tracker-<itemId>`). Foreign productId / unknown item NotFoundError.
*/
export async function ingestItemAsJob(
productId: string,
itemId: string,
opts?: IngestOptions
): Promise<SubmitResult> {
const item = await itemsRepo.getById(itemId);
if (!item || item.productId !== productId) {
throw new NotFoundError(`tracker item '${itemId}' not found`);
}
const hints = parseLabelHints(item.labels);
const priority =
opts?.priority ?? hints.priority ?? coerceItemPriority(item.priority) ?? 'medium';
const description = item.description?.trim() ?? '';
const bodyMd = description ? `# ${item.title}\n\n${item.description}` : `# ${item.title}`;
const input = SubmitJobSchema.parse({
idempotencyKey: trackerIdempotencyKey(itemId),
bodyMd,
priority,
capabilities: hints.capabilities,
engineClass: hints.engineClass,
profile: hints.profile,
trackerItemId: itemId,
});
return coordinator.submitJob(productId, input);
}
// ── Job → Item (one-way echo, §24.5) ───────────────────────────────────────────
export interface EchoResult {
echoed: ItemStatus | null;
error?: string;
}
/**
* Map a fleet stage to a tracker Item status (full lifecycle, both directions).
* The Item vocabulary has no `blocked`, so a terminal failure maps to `wont_fix`.
*/
export function stageToItemStatus(stage: FleetStage): ItemStatus {
switch (stage) {
case 'shipped':
return 'done';
case 'failed':
case 'dead_letter':
return 'wont_fix';
default:
// queued | blocked | assigned | building | review | testing
return 'in_progress';
}
}
async function writeMetricsComment(
job: FleetJobDoc,
item: TrackerItemDoc,
status: ItemStatus
): Promise<void> {
const runs = await repo.listRunsByJob(job.id);
const latest = runs[runs.length - 1];
// Metrics ONLY — never the prompt body (job.bodyMd) or any secret.
const parts: string[] = [
`agent-queue fleet: job ${job.id}${job.stage} (item status=${status}).`,
`attempts=${job.attempts}.`,
];
if (latest?.startedAt && latest?.endedAt) {
const dur = Math.max(
0,
Math.round((Date.parse(latest.endedAt) - Date.parse(latest.startedAt)) / 1000)
);
if (Number.isFinite(dur)) parts.push(`duration=${dur}s.`);
}
const ins = latest?.insights;
if (ins) {
if (ins.tokensIn !== undefined || ins.tokensOut !== undefined) {
parts.push(`tokens=${ins.tokensIn ?? 0}/${ins.tokensOut ?? 0}.`);
}
if (ins.costUsd !== undefined) parts.push(`cost_usd=${ins.costUsd}.`);
}
const now = new Date().toISOString();
await commentsRepo.create({
id: `cmt_${randomUUID()}`,
itemId: item.id,
productId: item.productId,
authorId: 'fleet-coordinator',
authorEmail: null,
body: parts.join(' '),
createdAt: now,
updatedAt: now,
});
await itemsRepo.incrementCommentCount(item.id, 1);
}
/**
* Echo a job's current stage onto its linked Item (status + metrics comment).
* No trackerItemId `{ echoed: null }`. Idempotent: an unchanged outcome is a
* no-op. An items-write failure is non-fatal `{ echoed: null, error }`.
* A missing/foreign job throws NotFoundError (productId isolation).
*/
export async function echoJobToItem(
productId: string,
jobId: string,
log?: BridgeLogger
): Promise<EchoResult> {
const job = await repo.getJob(jobId, productId);
if (!job) throw new NotFoundError(`fleet job '${jobId}' not found`);
if (!job.trackerItemId) return { echoed: null };
const status = stageToItemStatus(job.stage);
if (job.trackerEchoedStatus === status) return { echoed: status };
try {
const item = await itemsRepo.getById(job.trackerItemId);
if (!item || item.productId !== productId) {
return { echoed: null, error: `linked item '${job.trackerItemId}' not found for product` };
}
await itemsRepo.update(job.trackerItemId, { status });
await writeMetricsComment(job, item, status);
await repo.setTrackerEchoedStatus(jobId, productId, status);
return { echoed: status };
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
log?.error(
{ jobId, itemId: job.trackerItemId, error: message },
'fleet tracker echo failed (non-fatal)'
);
return { echoed: null, error: message };
}
}
/**
* Opt-in (FLEET_TRACKER_ECHO) best-effort echo invoked after a server-side stage
* transition. Flag OFF no-op (zero items writes). Never throws into the caller's
* transition path.
*/
export async function maybeEchoOnTransition(
productId: string,
jobId: string,
log?: BridgeLogger
): Promise<void> {
if (!trackerEchoEnabled()) return;
try {
await echoJobToItem(productId, jobId, log);
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
log?.error({ jobId, error: message }, 'fleet tracker auto-echo failed (non-fatal)');
}
}

View File

@ -160,6 +160,12 @@ export const FleetJobDocSchema = z.object({
leaseEpoch: z.number().int().nonnegative().default(0),
rev: z.number().int().nonnegative().default(0),
blockedReason: z.string().optional(),
/**
* Last Item status echoed to the linked tracker Item (§10 round-trip). Used to
* make re-echo of an unchanged outcome a no-op. Optional + provider-managed
* absent on jobs that were never echoed / have no trackerItemId.
*/
trackerEchoedStatus: z.string().optional(),
createdAt: z.string(),
updatedAt: z.string(),
});
@ -392,3 +398,16 @@ export const RevokeTokenSchema = z.object({
tokenId: z.string().min(1).optional(),
});
export type RevokeTokenInput = z.infer<typeof RevokeTokenSchema>;
/** Tracker bridge (§10): ingest a tracker Item as a fleet job. */
export const IngestItemSchema = z.object({
itemId: z.string().min(1),
priority: z.enum(FLEET_PRIORITIES).optional(),
});
export type IngestItemInput = z.infer<typeof IngestItemSchema>;
/** Tracker bridge (§10): manually echo a job's current outcome to its Item. */
export const EchoJobSchema = z.object({
jobId: z.string().min(1),
});
export type EchoJobInput = z.infer<typeof EchoJobSchema>;