feat(platform-service): direct tracker -> fleet module wiring (§10 round-trip)
In-process tracker<->fleet bridge — no shell hop. Closes the §10 "direct
tracker->module calls" box.
- tracker-bridge.ts (new):
* ingestItemAsJob(productId, itemId, opts?) — reads the Item via the items
repository (foreign/unknown → NotFoundError), maps title/description → bodyMd
(verbatim) + labels (engine-class:/profile:/priority:/cap:) → manifest hints,
sets trackerItemId + a stable idempotency-key `tracker-<itemId>`, and submits
through coordinator.submitJob — so re-ingest dedupes and the job is scheduled by
the §7 router via the unchanged claim path.
* echoJobToItem(productId, jobId, log?) — mirrors stage → Item status
(queued/assigned/building/review/testing → in_progress; shipped → done;
failed/dead_letter → wont_fix) + a metrics-ONLY comment (attempts/duration/
tokens/cost — never the prompt body/secrets). Idempotent via the job's
`trackerEchoedStatus`; best-effort + non-fatal (items-write failure →
{ echoed: null, error }, never thrown into the job lifecycle). productId-scoped.
- Auto-echo wired into the PATCH + lease/release transitions, GATED by
FLEET_TRACKER_ECHO (default OFF → behavior byte-for-byte unchanged); never blocks
or fails the transition.
- Routes (additive): POST /fleet/tracker/ingest, POST /fleet/tracker/echo
(auth + getRequestProductId, productId-scoped).
- types.ts: optional FleetJobDoc.trackerEchoedStatus (reuses the existing
trackerItemId field; no parallel schema) + Ingest/Echo request schemas.
- repository.ts: setTrackerEchoedStatus (no rev bump — never interferes with the
fenced claim CAS).
Reuses the items + comments contracts directly (no HTTP). Does not touch
claimNextJob or the scheduler. productId on every doc; no any/console.log.
This commit is contained in:
parent
07c0d304bc
commit
b2ce22d81c
@ -138,6 +138,24 @@ export async function deleteJob(id: string, productId: string): Promise<void> {
|
||||
await jobs().delete(id, productId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Record the last tracker Item status echoed for a job (§10 round-trip idempotency).
|
||||
* Deliberately does NOT bump `rev` — it is bookkeeping for the downstream echo, not a
|
||||
* lifecycle transition, so it never interferes with the coordinator's fenced CAS.
|
||||
*/
|
||||
export async function setTrackerEchoedStatus(
|
||||
id: string,
|
||||
productId: string,
|
||||
status: string
|
||||
): Promise<FleetJobDoc | null> {
|
||||
const cur = await jobs().findById(id, productId);
|
||||
if (!cur) return null;
|
||||
return jobs().update(id, productId, {
|
||||
trackerEchoedStatus: status,
|
||||
updatedAt: new Date().toISOString(),
|
||||
});
|
||||
}
|
||||
|
||||
// ── Runs ────────────────────────────────────────────────────────────────────
|
||||
|
||||
export async function createRun(doc: FleetRunDoc): Promise<FleetRunDoc> {
|
||||
|
||||
@ -27,6 +27,7 @@ import * as repo from './repository.js';
|
||||
import * as coordinator from './coordinator.js';
|
||||
import * as artifactsBlob from './artifacts-blob.js';
|
||||
import * as enrollment from './enrollment.js';
|
||||
import * as trackerBridge from './tracker-bridge.js';
|
||||
import {
|
||||
SubmitJobSchema,
|
||||
ListJobsQuerySchema,
|
||||
@ -39,6 +40,8 @@ import {
|
||||
EnrollFactorySchema,
|
||||
RotateTokenSchema,
|
||||
RevokeTokenSchema,
|
||||
IngestItemSchema,
|
||||
EchoJobSchema,
|
||||
} from './types.js';
|
||||
|
||||
function badRequest(issues: { message: string }[]): never {
|
||||
@ -99,6 +102,9 @@ export async function fleetRoutes(app: FastifyInstance) {
|
||||
}
|
||||
throw new ConflictError('concurrent update conflict — retry');
|
||||
}
|
||||
// §10: opt-in (FLEET_TRACKER_ECHO, default OFF) downstream echo of the new
|
||||
// stage to the linked tracker Item. Never blocks/fails the transition.
|
||||
await trackerBridge.maybeEchoOnTransition(pid, id, req.log);
|
||||
return res.doc;
|
||||
});
|
||||
|
||||
@ -160,6 +166,8 @@ export async function fleetRoutes(app: FastifyInstance) {
|
||||
if (res.reason === 'fenced') throw new ConflictError('stale leaseEpoch — release fenced');
|
||||
throw new ConflictError('lease release conflict — retry');
|
||||
}
|
||||
// §10: release often carries a terminal stage (shipped/failed) — echo it (opt-in).
|
||||
if (parsed.data.stage) await trackerBridge.maybeEchoOnTransition(pid, id, req.log);
|
||||
return res.doc;
|
||||
});
|
||||
|
||||
@ -313,4 +321,26 @@ export async function fleetRoutes(app: FastifyInstance) {
|
||||
});
|
||||
return { revoked };
|
||||
});
|
||||
|
||||
// ── Tracker bridge (§10): ingest an Item as a job (idempotent, scheduled by §7) ──
|
||||
app.post('/fleet/tracker/ingest', async (req, reply) => {
|
||||
await extractAuth(req);
|
||||
const parsed = IngestItemSchema.safeParse(req.body);
|
||||
if (!parsed.success) badRequest(parsed.error.issues);
|
||||
const pid = getRequestProductId(req);
|
||||
const result = await trackerBridge.ingestItemAsJob(pid, parsed.data.itemId, {
|
||||
priority: parsed.data.priority,
|
||||
});
|
||||
reply.code(result.outcome === 'created' ? 201 : 200);
|
||||
return { outcome: result.outcome, job: result.job };
|
||||
});
|
||||
|
||||
// ── Tracker bridge (§10): manually echo a job's current outcome to its Item ──
|
||||
app.post('/fleet/tracker/echo', async req => {
|
||||
await extractAuth(req);
|
||||
const parsed = EchoJobSchema.safeParse(req.body);
|
||||
if (!parsed.success) badRequest(parsed.error.issues);
|
||||
const pid = getRequestProductId(req);
|
||||
return trackerBridge.echoJobToItem(pid, parsed.data.jobId, req.log);
|
||||
});
|
||||
}
|
||||
|
||||
@ -0,0 +1,256 @@
|
||||
/**
|
||||
* Tracker ↔ fleet bridge (§10) — in-process round-trip on the memory providers.
|
||||
* Service tests call the bridge directly; route tests use Fastify inject to exercise
|
||||
* the gated auto-echo (FLEET_TRACKER_ECHO, default OFF). Auth + productId are mocked
|
||||
* exactly like the other fleet routes tests; NO live HTTP.
|
||||
*/
|
||||
|
||||
import Fastify, { type FastifyInstance } from 'fastify';
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
import { MemoryDatastoreProvider } from '@bytelyst/datastore';
|
||||
import { _resetDatastoreProvider, setProvider } from '../../lib/datastore.js';
|
||||
import * as itemsRepo from '../items/repository.js';
|
||||
import * as commentsRepo from '../comments/repository.js';
|
||||
import type { TrackerItemDoc } from '../items/types.js';
|
||||
import * as repo from './repository.js';
|
||||
import * as coordinator from './coordinator.js';
|
||||
import * as bridge from './tracker-bridge.js';
|
||||
import { SubmitJobSchema } from './types.js';
|
||||
|
||||
vi.mock('../../lib/auth.js', () => ({
|
||||
extractAuth: vi.fn(async () => ({ sub: 'user_1', role: 'admin' })),
|
||||
}));
|
||||
vi.mock('../../lib/request-context.js', () => ({
|
||||
getRequestProductId: () => 'lysnrai',
|
||||
}));
|
||||
|
||||
const PID = 'lysnrai';
|
||||
|
||||
function makeItem(over: Partial<TrackerItemDoc> = {}): TrackerItemDoc {
|
||||
const now = new Date().toISOString();
|
||||
return {
|
||||
id: 'IT-1',
|
||||
productId: PID,
|
||||
type: 'task',
|
||||
status: 'open',
|
||||
priority: 'medium',
|
||||
title: 'Fix the widget',
|
||||
description: 'Do A then B.',
|
||||
labels: [],
|
||||
assignee: null,
|
||||
reportedBy: 'user_1',
|
||||
source: 'internal',
|
||||
visibility: 'internal',
|
||||
voteCount: 0,
|
||||
commentCount: 0,
|
||||
priorityOrder: 2,
|
||||
targetRelease: null,
|
||||
createdAt: now,
|
||||
updatedAt: now,
|
||||
...over,
|
||||
};
|
||||
}
|
||||
|
||||
async function buildApp(): Promise<FastifyInstance> {
|
||||
const { fleetRoutes } = await import('./routes.js');
|
||||
const app = Fastify({ logger: false });
|
||||
await app.register(fleetRoutes, { prefix: '/api' });
|
||||
return app;
|
||||
}
|
||||
|
||||
describe('fleet tracker bridge (§10)', () => {
|
||||
beforeEach(() => {
|
||||
setProvider(new MemoryDatastoreProvider());
|
||||
delete process.env.FLEET_TRACKER_ECHO;
|
||||
});
|
||||
afterEach(() => {
|
||||
_resetDatastoreProvider();
|
||||
delete process.env.FLEET_TRACKER_ECHO;
|
||||
vi.restoreAllMocks();
|
||||
});
|
||||
|
||||
// ── ingest ──
|
||||
it('ingest creates exactly one claimable job (trackerItemId, bodyMd, stable key)', async () => {
|
||||
await itemsRepo.create(makeItem());
|
||||
const { job, outcome } = await bridge.ingestItemAsJob(PID, 'IT-1');
|
||||
expect(outcome).toBe('created');
|
||||
expect(job.trackerItemId).toBe('IT-1');
|
||||
expect(job.idempotencyKey).toBe('tracker-IT-1');
|
||||
expect(job.bodyMd).toContain('Do A then B.');
|
||||
expect(job.stage).toBe('queued');
|
||||
|
||||
// claimable through the normal scheduler path (claim unchanged)
|
||||
const claim = await coordinator.claimNextJob({
|
||||
productId: PID,
|
||||
factoryId: 'fac_1',
|
||||
capabilities: [],
|
||||
leaseSeconds: 900,
|
||||
});
|
||||
expect(claim?.job.id).toBe(job.id);
|
||||
});
|
||||
|
||||
it('ingest maps labels → priority + capabilities', async () => {
|
||||
await itemsRepo.create(
|
||||
makeItem({ labels: ['engine-class:agentic-coder', 'priority:high', 'cap:os:mac'] })
|
||||
);
|
||||
const { job } = await bridge.ingestItemAsJob(PID, 'IT-1');
|
||||
expect(job.priority).toBe('high');
|
||||
expect(job.capabilities).toEqual(['os:mac']);
|
||||
expect(job.engineClass).toBe('agentic-coder');
|
||||
});
|
||||
|
||||
it('ingest is idempotent (same Item twice → one job, deduplicated)', async () => {
|
||||
await itemsRepo.create(makeItem());
|
||||
const first = await bridge.ingestItemAsJob(PID, 'IT-1');
|
||||
const second = await bridge.ingestItemAsJob(PID, 'IT-1');
|
||||
expect(second.outcome).toBe('deduplicated');
|
||||
expect(second.job.id).toBe(first.job.id);
|
||||
const all = await repo.findJobsByIdempotencyKey(PID, 'tracker-IT-1');
|
||||
expect(all).toHaveLength(1);
|
||||
});
|
||||
|
||||
it('ingest for a foreign productId → not found (isolation)', async () => {
|
||||
await itemsRepo.create(makeItem());
|
||||
await expect(bridge.ingestItemAsJob('other-product', 'IT-1')).rejects.toThrow(/not found/i);
|
||||
});
|
||||
|
||||
// ── echo ──
|
||||
it('echo round-trip: queued→building→shipped drives Item in_progress→done with metrics-only notes', async () => {
|
||||
await itemsRepo.create(makeItem({ description: 'SECRET-BODY-SENTINEL do the work' }));
|
||||
const { job } = await bridge.ingestItemAsJob(PID, 'IT-1');
|
||||
|
||||
const e1 = await bridge.echoJobToItem(PID, job.id);
|
||||
expect(e1.echoed).toBe('in_progress');
|
||||
expect((await itemsRepo.getById('IT-1'))?.status).toBe('in_progress');
|
||||
|
||||
await repo.updateJob(job.id, PID, { stage: 'shipped' });
|
||||
const e2 = await bridge.echoJobToItem(PID, job.id);
|
||||
expect(e2.echoed).toBe('done');
|
||||
expect((await itemsRepo.getById('IT-1'))?.status).toBe('done');
|
||||
|
||||
// metrics-only comments — never the prompt body
|
||||
const comments = await commentsRepo.listByItem('IT-1');
|
||||
expect(comments.length).toBeGreaterThanOrEqual(2);
|
||||
for (const c of comments) {
|
||||
expect(c.body).not.toContain('SECRET-BODY-SENTINEL');
|
||||
expect(c.body).toContain('attempts=');
|
||||
}
|
||||
});
|
||||
|
||||
it('echo failed → Item wont_fix (the Item vocabulary has no "blocked")', async () => {
|
||||
await itemsRepo.create(makeItem());
|
||||
const { job } = await bridge.ingestItemAsJob(PID, 'IT-1');
|
||||
await repo.updateJob(job.id, PID, { stage: 'failed' });
|
||||
const res = await bridge.echoJobToItem(PID, job.id);
|
||||
expect(res.echoed).toBe('wont_fix');
|
||||
expect((await itemsRepo.getById('IT-1'))?.status).toBe('wont_fix');
|
||||
});
|
||||
|
||||
it('echo is idempotent (re-echo of an unchanged stage → no duplicate Item write)', async () => {
|
||||
await itemsRepo.create(makeItem());
|
||||
const { job } = await bridge.ingestItemAsJob(PID, 'IT-1');
|
||||
await bridge.echoJobToItem(PID, job.id);
|
||||
const after1 = (await itemsRepo.getById('IT-1'))?.commentCount;
|
||||
const res = await bridge.echoJobToItem(PID, job.id);
|
||||
expect(res.echoed).toBe('in_progress');
|
||||
const after2 = (await itemsRepo.getById('IT-1'))?.commentCount;
|
||||
expect(after2).toBe(after1); // no extra comment written
|
||||
expect(await commentsRepo.listByItem('IT-1')).toHaveLength(after1 ?? 0);
|
||||
});
|
||||
|
||||
it('echo is non-fatal: an items-write failure → { echoed: null, error }, job untouched', async () => {
|
||||
await itemsRepo.create(makeItem());
|
||||
const { job } = await bridge.ingestItemAsJob(PID, 'IT-1');
|
||||
vi.spyOn(itemsRepo, 'update').mockRejectedValueOnce(new Error('cosmos down'));
|
||||
const res = await bridge.echoJobToItem(PID, job.id);
|
||||
expect(res.echoed).toBeNull();
|
||||
expect(res.error).toMatch(/cosmos down/);
|
||||
// job state untouched: not marked echoed, still queued
|
||||
const after = await repo.getJob(job.id, PID);
|
||||
expect(after?.trackerEchoedStatus).toBeUndefined();
|
||||
expect(after?.stage).toBe('queued');
|
||||
});
|
||||
|
||||
it('echo no-op for a job with no trackerItemId', async () => {
|
||||
const { job } = await coordinator.submitJob(
|
||||
PID,
|
||||
SubmitJobSchema.parse({ idempotencyKey: 'plain', bodyMd: '# t' })
|
||||
);
|
||||
const res = await bridge.echoJobToItem(PID, job.id);
|
||||
expect(res.echoed).toBeNull();
|
||||
});
|
||||
|
||||
// ── route + flag gating ──
|
||||
it('auto-echo OFF (default flag): a PATCH transition performs ZERO items writes', async () => {
|
||||
await itemsRepo.create(makeItem());
|
||||
const app = await buildApp();
|
||||
const ingest = await app.inject({
|
||||
method: 'POST',
|
||||
url: '/api/fleet/tracker/ingest',
|
||||
payload: { itemId: 'IT-1' },
|
||||
});
|
||||
expect(ingest.statusCode).toBe(201);
|
||||
const jobId = JSON.parse(ingest.body).job.id as string;
|
||||
|
||||
const claim = await app.inject({
|
||||
method: 'POST',
|
||||
url: '/api/fleet/claim',
|
||||
payload: { factoryId: 'fac_1', capabilities: [] },
|
||||
});
|
||||
const leaseEpoch = JSON.parse(claim.body).job.leaseEpoch as number;
|
||||
|
||||
const patch = await app.inject({
|
||||
method: 'PATCH',
|
||||
url: `/api/fleet/jobs/${jobId}`,
|
||||
payload: { stage: 'building', leaseEpoch },
|
||||
});
|
||||
expect(patch.statusCode).toBe(200);
|
||||
// flag OFF ⇒ Item untouched, no comments
|
||||
expect((await itemsRepo.getById('IT-1'))?.status).toBe('open');
|
||||
expect(await commentsRepo.listByItem('IT-1')).toHaveLength(0);
|
||||
});
|
||||
|
||||
it('auto-echo ON (FLEET_TRACKER_ECHO=1): a PATCH transition echoes to the Item', async () => {
|
||||
await itemsRepo.create(makeItem());
|
||||
const app = await buildApp();
|
||||
const ingest = await app.inject({
|
||||
method: 'POST',
|
||||
url: '/api/fleet/tracker/ingest',
|
||||
payload: { itemId: 'IT-1' },
|
||||
});
|
||||
const jobId = JSON.parse(ingest.body).job.id as string;
|
||||
const claim = await app.inject({
|
||||
method: 'POST',
|
||||
url: '/api/fleet/claim',
|
||||
payload: { factoryId: 'fac_1', capabilities: [] },
|
||||
});
|
||||
const leaseEpoch = JSON.parse(claim.body).job.leaseEpoch as number;
|
||||
|
||||
process.env.FLEET_TRACKER_ECHO = '1';
|
||||
const patch = await app.inject({
|
||||
method: 'PATCH',
|
||||
url: `/api/fleet/jobs/${jobId}`,
|
||||
payload: { stage: 'building', leaseEpoch },
|
||||
});
|
||||
expect(patch.statusCode).toBe(200);
|
||||
expect((await itemsRepo.getById('IT-1'))?.status).toBe('in_progress');
|
||||
});
|
||||
|
||||
it('manual echo route returns the echoed status', async () => {
|
||||
await itemsRepo.create(makeItem());
|
||||
const app = await buildApp();
|
||||
const ingest = await app.inject({
|
||||
method: 'POST',
|
||||
url: '/api/fleet/tracker/ingest',
|
||||
payload: { itemId: 'IT-1' },
|
||||
});
|
||||
const jobId = JSON.parse(ingest.body).job.id as string;
|
||||
const echo = await app.inject({
|
||||
method: 'POST',
|
||||
url: '/api/fleet/tracker/echo',
|
||||
payload: { jobId },
|
||||
});
|
||||
expect(echo.statusCode).toBe(200);
|
||||
expect(JSON.parse(echo.body).echoed).toBe('in_progress');
|
||||
});
|
||||
});
|
||||
241
services/platform-service/src/modules/fleet/tracker-bridge.ts
Normal file
241
services/platform-service/src/modules/fleet/tracker-bridge.ts
Normal file
@ -0,0 +1,241 @@
|
||||
/**
|
||||
* Tracker ↔ fleet bridge (§10 / §24.5) — the in-process, no-shell-hop round-trip.
|
||||
*
|
||||
* `ingestItemAsJob` turns a tracker Item into a fleet job submitted through the
|
||||
* coordinator (so it is scheduled by the §7 router like any other job), reusing the
|
||||
* existing `trackerItemId` field + a stable idempotency-key so re-ingest dedupes.
|
||||
*
|
||||
* `echoJobToItem` mirrors the job's lifecycle back onto the Item (status + a
|
||||
* metrics-only comment). Echo is BEST-EFFORT and downstream: an items-write failure
|
||||
* is surfaced as `{ echoed: null, error }` and is NEVER allowed to fail the job.
|
||||
*
|
||||
* This module calls the items + comments + fleet repositories DIRECTLY — no HTTP,
|
||||
* no Fastify types here.
|
||||
*/
|
||||
|
||||
import { randomUUID } from 'node:crypto';
|
||||
import { NotFoundError } from '../../lib/errors.js';
|
||||
import * as itemsRepo from '../items/repository.js';
|
||||
import * as commentsRepo from '../comments/repository.js';
|
||||
import type { ItemStatus, TrackerItemDoc } from '../items/types.js';
|
||||
import * as repo from './repository.js';
|
||||
import * as coordinator from './coordinator.js';
|
||||
import type { SubmitResult } from './coordinator.js';
|
||||
import {
|
||||
FLEET_ENGINE_CLASSES,
|
||||
FLEET_PRIORITIES,
|
||||
SubmitJobSchema,
|
||||
type FleetEngineClass,
|
||||
type FleetJobDoc,
|
||||
type FleetPriority,
|
||||
type FleetStage,
|
||||
} from './types.js';
|
||||
|
||||
/** Minimal logger shape (pino/req.log compatible) — keeps Fastify out of this module. */
|
||||
export interface BridgeLogger {
|
||||
error: (obj: unknown, msg?: string) => void;
|
||||
}
|
||||
|
||||
/** Config flag (default OFF): auto-echo on server-side stage transitions. */
|
||||
export function trackerEchoEnabled(): boolean {
|
||||
const v = (process.env.FLEET_TRACKER_ECHO ?? '').trim().toLowerCase();
|
||||
return v === '1' || v === 'true' || v === 'yes' || v === 'on';
|
||||
}
|
||||
|
||||
// ── Item → job ────────────────────────────────────────────────────────────────
|
||||
|
||||
interface LabelHints {
|
||||
priority?: FleetPriority;
|
||||
engineClass?: FleetEngineClass;
|
||||
profile?: string;
|
||||
capabilities: string[];
|
||||
}
|
||||
|
||||
/** Parse manifest hints carried as Item labels: engine-class:/profile:/priority:/cap:. */
|
||||
function parseLabelHints(labels: string[]): LabelHints {
|
||||
const hints: LabelHints = { capabilities: [] };
|
||||
for (const raw of labels) {
|
||||
const label = raw.trim();
|
||||
if (label.startsWith('engine-class:')) {
|
||||
const v = label.slice('engine-class:'.length);
|
||||
if ((FLEET_ENGINE_CLASSES as readonly string[]).includes(v))
|
||||
hints.engineClass = v as FleetEngineClass;
|
||||
} else if (label.startsWith('profile:')) {
|
||||
hints.profile = label.slice('profile:'.length);
|
||||
} else if (label.startsWith('priority:')) {
|
||||
const v = label.slice('priority:'.length);
|
||||
if ((FLEET_PRIORITIES as readonly string[]).includes(v)) hints.priority = v as FleetPriority;
|
||||
} else if (label.startsWith('cap:')) {
|
||||
const v = label.slice('cap:'.length);
|
||||
if (v) hints.capabilities.push(v);
|
||||
}
|
||||
}
|
||||
return hints;
|
||||
}
|
||||
|
||||
function coerceItemPriority(p: string): FleetPriority | undefined {
|
||||
return (FLEET_PRIORITIES as readonly string[]).includes(p) ? (p as FleetPriority) : undefined;
|
||||
}
|
||||
|
||||
/** Stable, idempotent derived key so re-ingesting the same Item never duplicates. */
|
||||
export function trackerIdempotencyKey(itemId: string): string {
|
||||
return `tracker-${itemId}`;
|
||||
}
|
||||
|
||||
export interface IngestOptions {
|
||||
/** Override the derived priority (else label hint, else the Item's own priority). */
|
||||
priority?: FleetPriority;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read a tracker Item and submit it as a fleet job (idempotent on
|
||||
* `tracker-<itemId>`). Foreign productId / unknown item ⇒ NotFoundError.
|
||||
*/
|
||||
export async function ingestItemAsJob(
|
||||
productId: string,
|
||||
itemId: string,
|
||||
opts?: IngestOptions
|
||||
): Promise<SubmitResult> {
|
||||
const item = await itemsRepo.getById(itemId);
|
||||
if (!item || item.productId !== productId) {
|
||||
throw new NotFoundError(`tracker item '${itemId}' not found`);
|
||||
}
|
||||
|
||||
const hints = parseLabelHints(item.labels);
|
||||
const priority =
|
||||
opts?.priority ?? hints.priority ?? coerceItemPriority(item.priority) ?? 'medium';
|
||||
const description = item.description?.trim() ?? '';
|
||||
const bodyMd = description ? `# ${item.title}\n\n${item.description}` : `# ${item.title}`;
|
||||
|
||||
const input = SubmitJobSchema.parse({
|
||||
idempotencyKey: trackerIdempotencyKey(itemId),
|
||||
bodyMd,
|
||||
priority,
|
||||
capabilities: hints.capabilities,
|
||||
engineClass: hints.engineClass,
|
||||
profile: hints.profile,
|
||||
trackerItemId: itemId,
|
||||
});
|
||||
|
||||
return coordinator.submitJob(productId, input);
|
||||
}
|
||||
|
||||
// ── Job → Item (one-way echo, §24.5) ───────────────────────────────────────────
|
||||
|
||||
export interface EchoResult {
|
||||
echoed: ItemStatus | null;
|
||||
error?: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Map a fleet stage to a tracker Item status (full lifecycle, both directions).
|
||||
* The Item vocabulary has no `blocked`, so a terminal failure maps to `wont_fix`.
|
||||
*/
|
||||
export function stageToItemStatus(stage: FleetStage): ItemStatus {
|
||||
switch (stage) {
|
||||
case 'shipped':
|
||||
return 'done';
|
||||
case 'failed':
|
||||
case 'dead_letter':
|
||||
return 'wont_fix';
|
||||
default:
|
||||
// queued | blocked | assigned | building | review | testing
|
||||
return 'in_progress';
|
||||
}
|
||||
}
|
||||
|
||||
async function writeMetricsComment(
|
||||
job: FleetJobDoc,
|
||||
item: TrackerItemDoc,
|
||||
status: ItemStatus
|
||||
): Promise<void> {
|
||||
const runs = await repo.listRunsByJob(job.id);
|
||||
const latest = runs[runs.length - 1];
|
||||
// Metrics ONLY — never the prompt body (job.bodyMd) or any secret.
|
||||
const parts: string[] = [
|
||||
`agent-queue fleet: job ${job.id} → ${job.stage} (item status=${status}).`,
|
||||
`attempts=${job.attempts}.`,
|
||||
];
|
||||
if (latest?.startedAt && latest?.endedAt) {
|
||||
const dur = Math.max(
|
||||
0,
|
||||
Math.round((Date.parse(latest.endedAt) - Date.parse(latest.startedAt)) / 1000)
|
||||
);
|
||||
if (Number.isFinite(dur)) parts.push(`duration=${dur}s.`);
|
||||
}
|
||||
const ins = latest?.insights;
|
||||
if (ins) {
|
||||
if (ins.tokensIn !== undefined || ins.tokensOut !== undefined) {
|
||||
parts.push(`tokens=${ins.tokensIn ?? 0}/${ins.tokensOut ?? 0}.`);
|
||||
}
|
||||
if (ins.costUsd !== undefined) parts.push(`cost_usd=${ins.costUsd}.`);
|
||||
}
|
||||
const now = new Date().toISOString();
|
||||
await commentsRepo.create({
|
||||
id: `cmt_${randomUUID()}`,
|
||||
itemId: item.id,
|
||||
productId: item.productId,
|
||||
authorId: 'fleet-coordinator',
|
||||
authorEmail: null,
|
||||
body: parts.join(' '),
|
||||
createdAt: now,
|
||||
updatedAt: now,
|
||||
});
|
||||
await itemsRepo.incrementCommentCount(item.id, 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Echo a job's current stage onto its linked Item (status + metrics comment).
|
||||
* No trackerItemId ⇒ `{ echoed: null }`. Idempotent: an unchanged outcome is a
|
||||
* no-op. An items-write failure is non-fatal ⇒ `{ echoed: null, error }`.
|
||||
* A missing/foreign job throws NotFoundError (productId isolation).
|
||||
*/
|
||||
export async function echoJobToItem(
|
||||
productId: string,
|
||||
jobId: string,
|
||||
log?: BridgeLogger
|
||||
): Promise<EchoResult> {
|
||||
const job = await repo.getJob(jobId, productId);
|
||||
if (!job) throw new NotFoundError(`fleet job '${jobId}' not found`);
|
||||
if (!job.trackerItemId) return { echoed: null };
|
||||
|
||||
const status = stageToItemStatus(job.stage);
|
||||
if (job.trackerEchoedStatus === status) return { echoed: status };
|
||||
|
||||
try {
|
||||
const item = await itemsRepo.getById(job.trackerItemId);
|
||||
if (!item || item.productId !== productId) {
|
||||
return { echoed: null, error: `linked item '${job.trackerItemId}' not found for product` };
|
||||
}
|
||||
await itemsRepo.update(job.trackerItemId, { status });
|
||||
await writeMetricsComment(job, item, status);
|
||||
await repo.setTrackerEchoedStatus(jobId, productId, status);
|
||||
return { echoed: status };
|
||||
} catch (err) {
|
||||
const message = err instanceof Error ? err.message : String(err);
|
||||
log?.error(
|
||||
{ jobId, itemId: job.trackerItemId, error: message },
|
||||
'fleet tracker echo failed (non-fatal)'
|
||||
);
|
||||
return { echoed: null, error: message };
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Opt-in (FLEET_TRACKER_ECHO) best-effort echo invoked after a server-side stage
|
||||
* transition. Flag OFF ⇒ no-op (zero items writes). Never throws into the caller's
|
||||
* transition path.
|
||||
*/
|
||||
export async function maybeEchoOnTransition(
|
||||
productId: string,
|
||||
jobId: string,
|
||||
log?: BridgeLogger
|
||||
): Promise<void> {
|
||||
if (!trackerEchoEnabled()) return;
|
||||
try {
|
||||
await echoJobToItem(productId, jobId, log);
|
||||
} catch (err) {
|
||||
const message = err instanceof Error ? err.message : String(err);
|
||||
log?.error({ jobId, error: message }, 'fleet tracker auto-echo failed (non-fatal)');
|
||||
}
|
||||
}
|
||||
@ -160,6 +160,12 @@ export const FleetJobDocSchema = z.object({
|
||||
leaseEpoch: z.number().int().nonnegative().default(0),
|
||||
rev: z.number().int().nonnegative().default(0),
|
||||
blockedReason: z.string().optional(),
|
||||
/**
|
||||
* Last Item status echoed to the linked tracker Item (§10 round-trip). Used to
|
||||
* make re-echo of an unchanged outcome a no-op. Optional + provider-managed —
|
||||
* absent on jobs that were never echoed / have no trackerItemId.
|
||||
*/
|
||||
trackerEchoedStatus: z.string().optional(),
|
||||
createdAt: z.string(),
|
||||
updatedAt: z.string(),
|
||||
});
|
||||
@ -392,3 +398,16 @@ export const RevokeTokenSchema = z.object({
|
||||
tokenId: z.string().min(1).optional(),
|
||||
});
|
||||
export type RevokeTokenInput = z.infer<typeof RevokeTokenSchema>;
|
||||
|
||||
/** Tracker bridge (§10): ingest a tracker Item as a fleet job. */
|
||||
export const IngestItemSchema = z.object({
|
||||
itemId: z.string().min(1),
|
||||
priority: z.enum(FLEET_PRIORITIES).optional(),
|
||||
});
|
||||
export type IngestItemInput = z.infer<typeof IngestItemSchema>;
|
||||
|
||||
/** Tracker bridge (§10): manually echo a job's current outcome to its Item. */
|
||||
export const EchoJobSchema = z.object({
|
||||
jobId: z.string().min(1),
|
||||
});
|
||||
export type EchoJobInput = z.infer<typeof EchoJobSchema>;
|
||||
|
||||
Loading…
Reference in New Issue
Block a user