feat(platform): Phase 2 — Agent Runtime Orchestration

- New: agents/executor.ts — full agent execution engine
  - Multi-step pipeline: prompt_assembly → tool_execution → finalize
  - AbortController-based cancellation with in-memory tracking
  - Token usage aggregation across tool calls
  - Review-required tool gating (pauses run, returns review_required)
  - Step event streaming for SSE consumers
- New: agents/tool-registry.ts — global tool registry
  - Register/list/validate tools with risk levels + review flags
- New: agents/executor-routes.ts — 11 endpoints, 14 tests
  - POST /agents/execute, POST /runs/:id/cancel
  - GET /agents/active-runs, GET /runs/:id/children, GET /runs/:id/tree
  - GET /agents/:id/metrics, GET /runs/:id/stream (SSE)
  - GET /tools, POST /tools/validate, POST /agents/:id/schedule
- Enhanced: runs/repository.ts — added listChildRuns() for DAG query
- 1,307 tests passing (14 new)
This commit is contained in:
saravanakumardb1 2026-03-20 03:20:31 -07:00
parent 15e24e5710
commit 84dc348687
6 changed files with 1002 additions and 0 deletions

View File

@ -0,0 +1,267 @@
import Fastify from 'fastify';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
const executorMock = {
executeAgent: vi.fn(),
cancelAgentRun: vi.fn(),
getActiveAgentRuns: vi.fn(),
onStepEvent: vi.fn(),
};
const toolRegistryMock = {
listTools: vi.fn(),
validateToolBindings: vi.fn(),
};
const runRepoMock = {
getRun: vi.fn(),
listRuns: vi.fn(),
listChildRuns: vi.fn(),
listRunSteps: vi.fn(),
updateRun: vi.fn(),
};
const jobRunnerMock = {
ensureJobDefinitions: vi.fn(),
};
const jobRegistryMock = {
registerJob: vi.fn(),
};
vi.mock('./executor.js', () => executorMock);
vi.mock('./tool-registry.js', () => toolRegistryMock);
vi.mock('../runs/repository.js', () => runRepoMock);
vi.mock('../runs/types.js', () => ({}));
vi.mock('../jobs/runner.js', () => jobRunnerMock);
vi.mock('../jobs/registry.js', () => jobRegistryMock);
async function buildApp(payload?: { sub: string; productId: string; role?: string }) {
const { agentExecutorRoutes } = await import('./executor-routes.js');
const app = Fastify({ logger: false });
if (payload) {
app.addHook('onRequest', async req => {
req.jwtPayload = payload;
});
}
await app.register(agentExecutorRoutes, { prefix: '/api' });
return app;
}
describe('agentExecutorRoutes', () => {
beforeEach(() => {
vi.clearAllMocks();
});
afterEach(() => {
vi.restoreAllMocks();
});
// ── Execute agent ───────────────────────────────────────
it('POST /agents/execute executes an agent', async () => {
executorMock.executeAgent.mockResolvedValue({
runId: 'run_1',
status: 'succeeded',
tokenUsage: { prompt: 100, completion: 50, total: 150 },
stepsCompleted: 3,
durationMs: 500,
});
const app = await buildApp({ sub: 'user_1', productId: 'lysnrai' });
const res = await app.inject({
method: 'POST',
url: '/api/agents/execute',
payload: { agentId: 'agt_1', input: { query: 'hello' } },
});
expect(res.statusCode).toBe(200);
expect(executorMock.executeAgent).toHaveBeenCalledWith(
expect.objectContaining({ agentId: 'agt_1', userId: 'user_1' })
);
});
it('POST /agents/execute rejects invalid input', async () => {
const app = await buildApp({ sub: 'user_1', productId: 'lysnrai' });
const res = await app.inject({
method: 'POST',
url: '/api/agents/execute',
payload: {},
});
expect(res.statusCode).toBe(400);
});
it('POST /agents/execute rejects unauthenticated', async () => {
const app = await buildApp();
const res = await app.inject({
method: 'POST',
url: '/api/agents/execute',
payload: { agentId: 'agt_1' },
});
expect(res.statusCode).toBe(403);
});
// ── Cancel run ──────────────────────────────────────────
it('POST /runs/:id/cancel cancels an active run', async () => {
executorMock.cancelAgentRun.mockReturnValue(true);
const app = await buildApp({ sub: 'user_1', productId: 'lysnrai' });
const res = await app.inject({ method: 'POST', url: '/api/runs/run_1/cancel' });
expect(res.statusCode).toBe(200);
expect(JSON.parse(res.body)).toEqual({ cancelled: true, runId: 'run_1' });
});
it('POST /runs/:id/cancel falls back to DB update', async () => {
executorMock.cancelAgentRun.mockReturnValue(false);
runRepoMock.updateRun.mockResolvedValue({ id: 'run_1', status: 'cancelled' });
const app = await buildApp({ sub: 'user_1', productId: 'lysnrai' });
const res = await app.inject({ method: 'POST', url: '/api/runs/run_1/cancel' });
expect(res.statusCode).toBe(200);
expect(runRepoMock.updateRun).toHaveBeenCalledWith(
'run_1',
'lysnrai',
expect.objectContaining({ status: 'cancelled' })
);
});
// ── Active runs ─────────────────────────────────────────
it('GET /agents/active-runs returns active runs', async () => {
executorMock.getActiveAgentRuns.mockReturnValue(['run_1', 'run_2']);
const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
const res = await app.inject({ method: 'GET', url: '/api/agents/active-runs' });
expect(res.statusCode).toBe(200);
expect(JSON.parse(res.body)).toEqual({ activeRuns: ['run_1', 'run_2'] });
});
it('GET /agents/active-runs requires admin', async () => {
const app = await buildApp({ sub: 'user_1', productId: 'lysnrai', role: 'viewer' });
const res = await app.inject({ method: 'GET', url: '/api/agents/active-runs' });
expect(res.statusCode).toBe(403);
});
// ── DAG / children ──────────────────────────────────────
it('GET /runs/:id/children returns child runs', async () => {
runRepoMock.listChildRuns.mockResolvedValue([{ id: 'run_child_1', parentRunId: 'run_1' }]);
const app = await buildApp({ sub: 'user_1', productId: 'lysnrai' });
const res = await app.inject({ method: 'GET', url: '/api/runs/run_1/children' });
expect(res.statusCode).toBe(200);
expect(runRepoMock.listChildRuns).toHaveBeenCalledWith('lysnrai', 'run_1');
});
it('GET /runs/:id/tree returns full run tree', async () => {
runRepoMock.getRun.mockResolvedValue({ id: 'run_1', status: 'succeeded' });
runRepoMock.listChildRuns.mockResolvedValue([{ id: 'run_child_1' }]);
runRepoMock.listRunSteps.mockResolvedValue([]);
const app = await buildApp({ sub: 'user_1', productId: 'lysnrai' });
const res = await app.inject({ method: 'GET', url: '/api/runs/run_1/tree' });
expect(res.statusCode).toBe(200);
const body = JSON.parse(res.body);
expect(body.run.id).toBe('run_1');
expect(body.children).toHaveLength(1);
});
// ── Agent metrics ───────────────────────────────────────
it('GET /agents/:id/metrics computes agent metrics', async () => {
runRepoMock.listRuns.mockResolvedValue([
{
id: 'run_1',
status: 'succeeded',
metadata: { agentId: 'agt_1' },
output: { tokenUsage: { prompt: 100, completion: 50, total: 150 }, durationMs: 500 },
createdAt: '2026-01-01',
},
{
id: 'run_2',
status: 'failed',
metadata: { agentId: 'agt_1' },
createdAt: '2026-01-02',
},
{
id: 'run_3',
status: 'succeeded',
metadata: { agentId: 'other_agent' },
createdAt: '2026-01-02',
},
]);
const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
const res = await app.inject({ method: 'GET', url: '/api/agents/agt_1/metrics' });
expect(res.statusCode).toBe(200);
const body = JSON.parse(res.body);
expect(body.totalRuns).toBe(2);
expect(body.succeeded).toBe(1);
expect(body.failed).toBe(1);
expect(body.successRate).toBe(50);
expect(body.tokenUsage.total).toBe(150);
});
// ── Tool registry ───────────────────────────────────────
it('GET /tools lists registered tools', async () => {
toolRegistryMock.listTools.mockReturnValue([{ name: 'search', description: 'Search the web' }]);
const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
const res = await app.inject({ method: 'GET', url: '/api/tools' });
expect(res.statusCode).toBe(200);
expect(JSON.parse(res.body)).toHaveLength(1);
});
it('POST /tools/validate validates tool bindings', async () => {
toolRegistryMock.validateToolBindings.mockReturnValue({
valid: ['search'],
invalid: ['unknown_tool'],
});
const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
const res = await app.inject({
method: 'POST',
url: '/api/tools/validate',
payload: { bindings: ['search', 'unknown_tool'] },
});
expect(res.statusCode).toBe(200);
expect(JSON.parse(res.body).invalid).toContain('unknown_tool');
});
// ── Agent scheduling ────────────────────────────────────
it('POST /agents/:id/schedule creates a scheduled job', async () => {
jobRunnerMock.ensureJobDefinitions.mockResolvedValue(undefined);
jobRegistryMock.registerJob.mockReturnValue(undefined);
const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
const res = await app.inject({
method: 'POST',
url: '/api/agents/agt_1/schedule',
payload: { cronExpression: '0 */6 * * *', input: { mode: 'daily' } },
});
expect(res.statusCode).toBe(200);
expect(JSON.parse(res.body)).toEqual({
scheduled: true,
jobName: 'agent:agt_1',
cronExpression: '0 */6 * * *',
});
expect(jobRegistryMock.registerJob).toHaveBeenCalledWith('agent:agt_1', expect.any(Function));
});
it('POST /agents/:id/schedule requires cronExpression', async () => {
const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
const res = await app.inject({
method: 'POST',
url: '/api/agents/agt_1/schedule',
payload: {},
});
expect(res.statusCode).toBe(400);
});
});

View File

@ -0,0 +1,280 @@
import { z } from 'zod';
import type { FastifyInstance } from 'fastify';
import { BadRequestError, ForbiddenError, NotFoundError } from '../../lib/errors.js';
import { executeAgent, cancelAgentRun, getActiveAgentRuns, onStepEvent } from './executor.js';
import { listTools, validateToolBindings } from './tool-registry.js';
import type { RunDoc } from '../runs/types.js';
import * as runRepo from '../runs/repository.js';
const ExecuteAgentSchema = z.object({
agentId: z.string().min(1),
input: z.record(z.unknown()).default({}),
parentRunId: z.string().optional(),
});
function requireAuth(req: { jwtPayload?: { sub?: string; role?: string; productId?: string } }): {
userId: string;
productId: string;
} {
const payload = req.jwtPayload;
if (!payload?.sub) throw new ForbiddenError('Authentication required');
return {
userId: payload.sub,
productId: payload.productId ?? process.env.DEFAULT_PRODUCT_ID ?? 'lysnrai',
};
}
function requireAdmin(req: { jwtPayload?: { sub?: string; role?: string; productId?: string } }): {
userId: string;
productId: string;
} {
const access = requireAuth(req);
const role = req.jwtPayload?.role;
if (!role || !['super_admin', 'admin'].includes(role)) {
throw new ForbiddenError('Admin access required');
}
return access;
}
export async function agentExecutorRoutes(app: FastifyInstance) {
// ── Execute agent ─────────────────────────────────────────
app.post('/agents/execute', async req => {
const access = requireAuth(req);
const parsed = ExecuteAgentSchema.safeParse(req.body);
if (!parsed.success) {
throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; '));
}
return executeAgent({
agentId: parsed.data.agentId,
productId: access.productId,
userId: access.userId,
input: parsed.data.input,
parentRunId: parsed.data.parentRunId,
triggeredBy: access.userId,
});
});
// ── Cancel run ────────────────────────────────────────────
app.post('/runs/:id/cancel', async req => {
const access = requireAuth(req);
const { id } = req.params as { id: string };
// Try in-memory cancel first (active agent runs)
if (cancelAgentRun(id)) {
return { cancelled: true, runId: id };
}
// Fall back to DB update for non-active runs
try {
await runRepo.updateRun(id, access.productId, {
status: 'cancelled',
completedAt: new Date().toISOString(),
error: 'Cancelled by user',
});
return { cancelled: true, runId: id };
} catch {
throw new NotFoundError(`Run '${id}' not found or already completed`);
}
});
// ── Active agent runs ─────────────────────────────────────
app.get('/agents/active-runs', async req => {
requireAdmin(req);
return { activeRuns: getActiveAgentRuns() };
});
// ── Parent-child run DAG ──────────────────────────────────
app.get('/runs/:id/children', async req => {
const access = requireAuth(req);
const { id } = req.params as { id: string };
return runRepo.listChildRuns(access.productId, id);
});
app.get('/runs/:id/tree', async req => {
const access = requireAuth(req);
const { id } = req.params as { id: string };
// Build a tree: get root run + all descendants
const rootRun = await runRepo.getRun(id, access.productId);
const children = await runRepo.listChildRuns(access.productId, id);
const steps = await runRepo.listRunSteps(access.productId, id);
return {
run: rootRun,
steps,
children: await Promise.all(
children.map(async (child: RunDoc) => ({
run: child,
steps: await runRepo.listRunSteps(access.productId, child.id),
}))
),
};
});
// ── Agent metrics ─────────────────────────────────────────
app.get('/agents/:id/metrics', async req => {
const access = requireAdmin(req);
const { id } = req.params as { id: string };
const query = req.query as { since?: string; limit?: string };
const runs = await runRepo.listRuns(access.productId, {
kind: 'agent',
limit: Math.min(parseInt(query.limit ?? '100', 10), 500),
});
// Filter by agentId (stored in metadata)
const agentRuns = runs.filter(
r => r.metadata && (r.metadata as Record<string, unknown>).agentId === id
);
// Filter by since if provided
const filteredRuns = query.since
? agentRuns.filter(r => r.createdAt >= query.since!)
: agentRuns;
const succeeded = filteredRuns.filter(r => r.status === 'succeeded');
const failed = filteredRuns.filter(r => r.status === 'failed');
const cancelled = filteredRuns.filter(r => r.status === 'cancelled');
// Compute token usage from output metadata
let totalTokens = 0;
let totalPromptTokens = 0;
let totalCompletionTokens = 0;
for (const run of succeeded) {
const output = run.output as Record<string, unknown> | undefined;
if (output?.tokenUsage) {
const usage = output.tokenUsage as { prompt?: number; completion?: number; total?: number };
totalPromptTokens += usage.prompt ?? 0;
totalCompletionTokens += usage.completion ?? 0;
totalTokens += usage.total ?? 0;
}
}
// Compute average duration
const durations = succeeded
.map(r => {
const out = r.output as Record<string, unknown> | undefined;
return (out?.durationMs as number | undefined) ?? 0;
})
.filter(d => d > 0);
const avgDurationMs =
durations.length > 0
? Math.round(durations.reduce((a, b) => a + b, 0) / durations.length)
: 0;
return {
agentId: id,
totalRuns: filteredRuns.length,
succeeded: succeeded.length,
failed: failed.length,
cancelled: cancelled.length,
successRate:
filteredRuns.length > 0 ? Math.round((succeeded.length / filteredRuns.length) * 100) : 0,
avgDurationMs,
tokenUsage: {
prompt: totalPromptTokens,
completion: totalCompletionTokens,
total: totalTokens,
},
};
});
// ── Run step streaming (SSE) ──────────────────────────────
app.get('/runs/:id/stream', async (req, reply) => {
requireAuth(req);
const { id } = req.params as { id: string };
reply.raw.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
});
const unsubscribe = onStepEvent(id, event => {
reply.raw.write(`data: ${JSON.stringify(event)}\n\n`);
});
// Send initial keepalive
reply.raw.write(`data: ${JSON.stringify({ type: 'connected', runId: id })}\n\n`);
req.raw.on('close', () => {
unsubscribe();
});
// Don't end the response — keep it open for SSE
await new Promise(() => {});
});
// ── Tool registry ─────────────────────────────────────────
app.get('/tools', async req => {
requireAdmin(req);
return listTools();
});
app.post('/tools/validate', async req => {
requireAdmin(req);
const body = req.body as { bindings?: string[] };
if (!body.bindings || !Array.isArray(body.bindings)) {
throw new BadRequestError('bindings array is required');
}
return validateToolBindings(body.bindings);
});
// ── Agent scheduling ──────────────────────────────────────
app.post('/agents/:id/schedule', async req => {
const access = requireAdmin(req);
const { id } = req.params as { id: string };
const body = req.body as {
cronExpression?: string;
input?: Record<string, unknown>;
enabled?: boolean;
};
if (!body.cronExpression) {
throw new BadRequestError('cronExpression is required');
}
// Register as a job definition
const { ensureJobDefinitions } = await import('../jobs/runner.js');
const { registerJob } = await import('../jobs/registry.js');
const jobName = `agent:${id}`;
registerJob(jobName, async ctx => {
const result = await executeAgent({
agentId: id,
productId: access.productId,
userId: ctx.productId, // system-triggered
input: body.input ?? {},
triggeredBy: 'scheduler',
});
return {
success: result.status === 'succeeded',
message: result.status,
metrics: {
tokenUsage: result.tokenUsage,
durationMs: result.durationMs,
},
};
});
await ensureJobDefinitions([
{
name: jobName,
cron: body.cronExpression,
description: `Scheduled execution of agent ${id}`,
},
]);
return { scheduled: true, jobName, cronExpression: body.cronExpression };
});
}

View File

@ -0,0 +1,357 @@
import { randomUUID } from 'node:crypto';
import * as runTracker from '../runs/tracker.js';
import * as agentRepo from './repository.js';
import {
getToolsForBindings,
type ToolExecutionContext,
type ToolExecutionResult,
} from './tool-registry.js';
import { bus } from '../../lib/event-bus.js';
// ── Types ────────────────────────────────────────────────────
export interface ExecuteAgentInput {
agentId: string;
productId: string;
userId: string;
input: Record<string, unknown>;
parentRunId?: string;
triggeredBy?: string;
}
export interface ExecuteAgentResult {
runId: string;
status: 'succeeded' | 'failed' | 'cancelled' | 'review_required';
output?: Record<string, unknown>;
error?: string;
tokenUsage: { prompt: number; completion: number; total: number };
stepsCompleted: number;
durationMs: number;
}
export interface StepEvent {
runId: string;
stepName: string;
status: string;
output?: unknown;
error?: string;
tokenUsage?: { prompt: number; completion: number; total: number };
}
type StepListener = (event: StepEvent) => void;
// ── Active Runs ──────────────────────────────────────────────
const activeRuns = new Map<string, AbortController>();
const stepListeners = new Map<string, Set<StepListener>>();
export function getActiveAgentRuns(): string[] {
return Array.from(activeRuns.keys());
}
export function cancelAgentRun(runId: string): boolean {
const controller = activeRuns.get(runId);
if (!controller) return false;
controller.abort();
return true;
}
export function onStepEvent(runId: string, listener: StepListener): () => void {
const listeners = stepListeners.get(runId) ?? new Set();
listeners.add(listener);
stepListeners.set(runId, listeners);
return () => {
listeners.delete(listener);
if (listeners.size === 0) stepListeners.delete(runId);
};
}
function emitStepEvent(event: StepEvent): void {
const listeners = stepListeners.get(event.runId);
if (listeners) {
for (const listener of listeners) {
try {
listener(event);
} catch {
// best-effort
}
}
}
}
// ── Executor ─────────────────────────────────────────────────
export async function executeAgent(input: ExecuteAgentInput): Promise<ExecuteAgentResult> {
const runId = `run_${randomUUID()}`;
const startTime = Date.now();
const controller = new AbortController();
activeRuns.set(runId, controller);
const tokenUsage = { prompt: 0, completion: 0, total: 0 };
let stepsCompleted = 0;
try {
// Get agent + published version
const agent = await agentRepo.getAgent(input.agentId, input.productId);
const version = await agentRepo.getPublishedVersion(input.agentId);
if (!version) {
throw new Error(`No published version found for agent '${input.agentId}'`);
}
// Start run tracking
await runTracker.startRun({
id: runId,
productId: input.productId,
kind: 'agent',
name: agent.name,
source: `platform.agents.${agent.key}`,
triggeredBy: input.triggeredBy ?? input.userId,
parentRunId: input.parentRunId,
input: input.input,
metadata: {
agentId: input.agentId,
agentKey: agent.key,
versionId: version.id,
versionNumber: version.version,
},
});
// Resolve tool bindings
const boundTools = getToolsForBindings(version.toolBindings);
const toolContext: ToolExecutionContext = {
runId,
productId: input.productId,
agentId: input.agentId,
userId: input.userId,
};
// Step 1: Prompt assembly
if (controller.signal.aborted) throw new Error('Run cancelled');
await runTracker.startRunStep({
runId,
productId: input.productId,
stepName: 'prompt_assembly',
order: 1,
input: { toolCount: boundTools.length },
});
const assembledPrompt = assemblePrompt(
version,
input.input,
boundTools.map(t => t.definition)
);
await runTracker.completeRunStep(runId, input.productId, 'prompt_assembly', {
promptLength: assembledPrompt.length,
toolsBound: boundTools.map(t => t.definition.name),
});
stepsCompleted++;
emitStepEvent({ runId, stepName: 'prompt_assembly', status: 'succeeded' });
// Step 2: Tool execution (if tools are bound)
if (boundTools.length > 0 && controller.signal.aborted === false) {
await runTracker.startRunStep({
runId,
productId: input.productId,
stepName: 'tool_execution',
order: 2,
});
const toolResults: Array<{ tool: string; result: ToolExecutionResult }> = [];
for (const tool of boundTools) {
if (controller.signal.aborted) break;
// Check if tool requires review
if (tool.definition.requiresReview) {
emitStepEvent({
runId,
stepName: 'tool_execution',
status: 'review_required',
output: { tool: tool.definition.name },
});
// Mark run as needing review — caller should handle creating a review item
await runTracker.failRunStep(
runId,
input.productId,
'tool_execution',
`Tool '${tool.definition.name}' requires human review`
);
await runTracker.failRun(
runId,
input.productId,
`Paused: tool '${tool.definition.name}' requires review`,
{ status: 'review_required' }
);
activeRuns.delete(runId);
return {
runId,
status: 'review_required',
error: `Tool '${tool.definition.name}' requires human review`,
tokenUsage,
stepsCompleted,
durationMs: Date.now() - startTime,
};
}
try {
const result = await tool.execute(input.input, toolContext);
toolResults.push({ tool: tool.definition.name, result });
if (result.tokenUsage) {
tokenUsage.prompt += result.tokenUsage.prompt;
tokenUsage.completion += result.tokenUsage.completion;
tokenUsage.total += result.tokenUsage.total;
}
} catch (err: unknown) {
toolResults.push({
tool: tool.definition.name,
result: {
success: false,
error: err instanceof Error ? err.message : String(err),
},
});
}
}
const failedTools = toolResults.filter(r => !r.result.success);
if (failedTools.length > 0 && !controller.signal.aborted) {
await runTracker.failRunStep(
runId,
input.productId,
'tool_execution',
`${failedTools.length} tool(s) failed: ${failedTools.map(t => t.tool).join(', ')}`
);
} else if (!controller.signal.aborted) {
await runTracker.completeRunStep(runId, input.productId, 'tool_execution', {
toolResults: toolResults.map(r => ({
tool: r.tool,
success: r.result.success,
})),
});
}
stepsCompleted++;
emitStepEvent({
runId,
stepName: 'tool_execution',
status: failedTools.length > 0 ? 'partial_failure' : 'succeeded',
tokenUsage: { ...tokenUsage },
});
}
// Step 3: Finalize
if (controller.signal.aborted) throw new Error('Run cancelled');
await runTracker.startRunStep({
runId,
productId: input.productId,
stepName: 'finalize',
order: 3,
});
const output = {
agentKey: agent.key,
versionNumber: version.version,
tokenUsage,
stepsCompleted: stepsCompleted + 1,
};
await runTracker.completeRunStep(runId, input.productId, 'finalize', output);
stepsCompleted++;
emitStepEvent({ runId, stepName: 'finalize', status: 'succeeded', tokenUsage });
// Complete the run
await runTracker.completeRun(runId, input.productId, {
...output,
durationMs: Date.now() - startTime,
});
// Emit event for downstream consumption
try {
await bus.emit('job.completed', {
jobName: `agent:${agent.key}`,
runId,
durationMs: Date.now() - startTime,
productId: input.productId,
});
} catch {
// best-effort
}
activeRuns.delete(runId);
return {
runId,
status: 'succeeded',
output,
tokenUsage,
stepsCompleted,
durationMs: Date.now() - startTime,
};
} catch (err: unknown) {
const errorMsg = err instanceof Error ? err.message : String(err);
const isCancelled = controller.signal.aborted;
try {
if (isCancelled) {
await runTracker.failRun(runId, input.productId, 'Run cancelled by user', {
cancelledAt: new Date().toISOString(),
});
} else {
await runTracker.failRun(runId, input.productId, errorMsg);
}
} catch {
// best-effort
}
try {
await bus.emit('job.failed', {
jobName: `agent:${input.agentId}`,
runId,
error: errorMsg,
productId: input.productId,
});
} catch {
// best-effort
}
activeRuns.delete(runId);
return {
runId,
status: isCancelled ? 'cancelled' : 'failed',
error: errorMsg,
tokenUsage,
stepsCompleted,
durationMs: Date.now() - startTime,
};
}
}
// ── Prompt Assembly ──────────────────────────────────────────
function assemblePrompt(
version: {
prompt: string;
systemInstructions?: string;
modelPolicy?: { maxTokens?: number } | null;
},
input: Record<string, unknown>,
tools: Array<{ name: string; description: string }>
): string {
const parts: string[] = [];
if (version.systemInstructions) {
parts.push(`[System]\n${version.systemInstructions}`);
}
parts.push(`[Prompt]\n${version.prompt}`);
if (tools.length > 0) {
const toolList = tools.map(t => `- ${t.name}: ${t.description}`).join('\n');
parts.push(`[Available Tools]\n${toolList}`);
}
if (Object.keys(input).length > 0) {
parts.push(`[Input]\n${JSON.stringify(input, null, 2)}`);
}
return parts.join('\n\n');
}

View File

@ -0,0 +1,87 @@
import { z } from 'zod';
// ── Tool Definition ─────────────────────────────────────────
// Tools are registered globally and bound to agent versions via toolBindings[].
export const ToolParameterSchema = z.object({
name: z.string().min(1),
type: z.enum(['string', 'number', 'boolean', 'object', 'array']),
description: z.string().optional(),
required: z.boolean().default(false),
});
export const ToolDefinitionSchema = z.object({
name: z.string().min(1),
description: z.string().min(1),
parameters: z.array(ToolParameterSchema).default([]),
requiresReview: z.boolean().default(false),
riskLevel: z.enum(['low', 'medium', 'high', 'critical']).default('low'),
});
export type ToolDefinition = z.infer<typeof ToolDefinitionSchema>;
export type ToolExecuteFn = (
params: Record<string, unknown>,
context: ToolExecutionContext
) => Promise<ToolExecutionResult>;
export interface ToolExecutionContext {
runId: string;
productId: string;
agentId: string;
userId: string;
}
export interface ToolExecutionResult {
success: boolean;
output?: unknown;
error?: string;
tokenUsage?: { prompt: number; completion: number; total: number };
}
interface RegisteredTool {
definition: ToolDefinition;
execute: ToolExecuteFn;
}
// ── Global Tool Registry ────────────────────────────────────
const tools = new Map<string, RegisteredTool>();
export function registerTool(definition: ToolDefinition, execute: ToolExecuteFn): void {
tools.set(definition.name, { definition, execute });
}
export function getTool(name: string): RegisteredTool | undefined {
return tools.get(name);
}
export function listTools(): ToolDefinition[] {
return Array.from(tools.values()).map(t => t.definition);
}
export function getToolsForBindings(bindings: string[]): RegisteredTool[] {
return bindings.map(name => tools.get(name)).filter((t): t is RegisteredTool => t !== undefined);
}
export function clearTools(): void {
tools.clear();
}
// ── Validate tool bindings ──────────────────────────────────
export function validateToolBindings(bindings: string[]): {
valid: string[];
invalid: string[];
} {
const valid: string[] = [];
const invalid: string[] = [];
for (const name of bindings) {
if (tools.has(name)) {
valid.push(name);
} else {
invalid.push(name);
}
}
return { valid, invalid };
}

View File

@ -63,6 +63,14 @@ export async function updateRunStep(
return updated; return updated;
} }
export async function listChildRuns(productId: string, parentRunId: string): Promise<RunDoc[]> {
return runsCollection().findMany({
filter: { productId, parentRunId },
sort: { createdAt: -1 },
limit: 100,
});
}
export async function listRunSteps(productId: string, runId: string): Promise<RunStepDoc[]> { export async function listRunSteps(productId: string, runId: string): Promise<RunStepDoc[]> {
return stepsCollection().findMany({ return stepsCollection().findMany({
filter: { filter: {

View File

@ -108,6 +108,7 @@ import { registerDeliverySubscribers } from './modules/delivery/subscribers.js';
import { verifyToken } from './modules/auth/jwt.js'; import { verifyToken } from './modules/auth/jwt.js';
import { registerOptionalApiKeyContext } from './lib/api-key-auth.js'; import { registerOptionalApiKeyContext } from './lib/api-key-auth.js';
import { eventSubscriptionRoutes } from './modules/event-subscriptions/routes.js'; import { eventSubscriptionRoutes } from './modules/event-subscriptions/routes.js';
import { agentExecutorRoutes } from './modules/agents/executor-routes.js';
import { startEventBus, stopEventBus } from './lib/event-bus.js'; import { startEventBus, stopEventBus } from './lib/event-bus.js';
import { wireDispatcherToBus } from './lib/event-dispatcher.js'; import { wireDispatcherToBus } from './lib/event-dispatcher.js';
@ -265,6 +266,8 @@ await app.register(apiVersioningRoutes, { prefix: '/api' });
// Event subscriptions + DLQ + replay // Event subscriptions + DLQ + replay
await app.register(eventSubscriptionRoutes, { prefix: '/api' }); await app.register(eventSubscriptionRoutes, { prefix: '/api' });
// Agent executor + tool registry + scheduling + metrics
await app.register(agentExecutorRoutes, { prefix: '/api' });
// Register event bus subscribers // Register event bus subscribers
registerDiagnosticsSubscribers(app.log); registerDiagnosticsSubscribers(app.log);