feat(platform): Phase 2 — Agent Runtime Orchestration
- New: agents/executor.ts — full agent execution engine - Multi-step pipeline: prompt_assembly → tool_execution → finalize - AbortController-based cancellation with in-memory tracking - Token usage aggregation across tool calls - Review-required tool gating (pauses run, returns review_required) - Step event streaming for SSE consumers - New: agents/tool-registry.ts — global tool registry - Register/list/validate tools with risk levels + review flags - New: agents/executor-routes.ts — 10 endpoints, 14 tests - POST /agents/execute, POST /runs/:id/cancel - GET /agents/active-runs, GET /runs/:id/children, GET /runs/:id/tree - GET /agents/:id/metrics, GET /runs/:id/stream (SSE) - GET /tools, POST /tools/validate, POST /agents/:id/schedule - Enhanced: runs/repository.ts — added listChildRuns() for DAG query - 1,307 tests passing (14 new)
This commit is contained in:
parent
15e24e5710
commit
84dc348687
@ -0,0 +1,267 @@
|
||||
import Fastify from 'fastify';
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
// ── Module mocks ────────────────────────────────────────────
// Each module that executor-routes imports is swapped for an object of bare vi.fn()
// stubs. The routes module itself is only imported inside buildApp(), i.e. after these
// objects have been initialised, so the vi.mock factories can safely return them.

/** Replacement for ./executor.js. */
const executorMock = {
  executeAgent: vi.fn(),
  cancelAgentRun: vi.fn(),
  getActiveAgentRuns: vi.fn(),
  onStepEvent: vi.fn(),
};
vi.mock('./executor.js', () => executorMock);

/** Replacement for ./tool-registry.js. */
const toolRegistryMock = {
  listTools: vi.fn(),
  validateToolBindings: vi.fn(),
};
vi.mock('./tool-registry.js', () => toolRegistryMock);

/** Replacement for ../runs/repository.js. */
const runRepoMock = {
  getRun: vi.fn(),
  listRuns: vi.fn(),
  listChildRuns: vi.fn(),
  listRunSteps: vi.fn(),
  updateRun: vi.fn(),
};
vi.mock('../runs/repository.js', () => runRepoMock);

// ../runs/types.js is imported by the routes for types only; an empty module suffices.
vi.mock('../runs/types.js', () => ({}));

/** Replacement for ../jobs/runner.js. */
const jobRunnerMock = {
  ensureJobDefinitions: vi.fn(),
};
vi.mock('../jobs/runner.js', () => jobRunnerMock);

/** Replacement for ../jobs/registry.js. */
const jobRegistryMock = {
  registerJob: vi.fn(),
};
vi.mock('../jobs/registry.js', () => jobRegistryMock);
|
||||
|
||||
/**
 * Creates a Fastify instance with the executor routes registered under the `/api` prefix.
 *
 * @param payload When provided, an `onRequest` hook assigns it to `req.jwtPayload` on every
 *   request, so the routes see an authenticated caller. When omitted, no hook is installed
 *   and requests carry no JWT payload.
 * @returns The configured Fastify instance (not listening; intended for `inject`).
 */
async function buildApp(payload?: { sub: string; productId: string; role?: string }) {
  // Imported lazily so the vi.mock replacements are in effect when the module loads.
  const routesModule = await import('./executor-routes.js');
  const server = Fastify({ logger: false });

  if (payload) {
    server.addHook('onRequest', async request => {
      request.jwtPayload = payload;
    });
  }

  await server.register(routesModule.agentExecutorRoutes, { prefix: '/api' });
  return server;
}
|
||||
|
||||
describe('agentExecutorRoutes', () => {
  // Fastify instances created during a test. They are closed in afterEach so that no
  // server instance (and its plugin lifecycle hooks) outlives the test that created it.
  const openApps: Array<Awaited<ReturnType<typeof buildApp>>> = [];

  /** Builds an app via buildApp() and registers it for cleanup. */
  async function makeApp(payload?: Parameters<typeof buildApp>[0]) {
    const app = await buildApp(payload);
    openApps.push(app);
    return app;
  }

  const user = { sub: 'user_1', productId: 'lysnrai' };
  const admin = { sub: 'admin_1', productId: 'lysnrai', role: 'admin' };

  beforeEach(() => {
    vi.clearAllMocks();
  });

  afterEach(async () => {
    vi.restoreAllMocks();
    // splice(0) empties the tracking array while handing back the instances to close.
    await Promise.all(openApps.splice(0).map(app => app.close()));
  });

  // ── Execute agent ───────────────────────────────────────

  it('POST /agents/execute executes an agent', async () => {
    executorMock.executeAgent.mockResolvedValue({
      runId: 'run_1',
      status: 'succeeded',
      tokenUsage: { prompt: 100, completion: 50, total: 150 },
      stepsCompleted: 3,
      durationMs: 500,
    });
    const app = await makeApp(user);

    const res = await app.inject({
      method: 'POST',
      url: '/api/agents/execute',
      payload: { agentId: 'agt_1', input: { query: 'hello' } },
    });

    expect(res.statusCode).toBe(200);
    expect(executorMock.executeAgent).toHaveBeenCalledWith(
      expect.objectContaining({ agentId: 'agt_1', userId: 'user_1' })
    );
  });

  it('POST /agents/execute rejects invalid input', async () => {
    const app = await makeApp(user);

    const res = await app.inject({
      method: 'POST',
      url: '/api/agents/execute',
      payload: {},
    });

    expect(res.statusCode).toBe(400);
  });

  it('POST /agents/execute rejects unauthenticated', async () => {
    const app = await makeApp();
    const res = await app.inject({
      method: 'POST',
      url: '/api/agents/execute',
      payload: { agentId: 'agt_1' },
    });
    expect(res.statusCode).toBe(403);
  });

  // ── Cancel run ──────────────────────────────────────────

  it('POST /runs/:id/cancel cancels an active run', async () => {
    executorMock.cancelAgentRun.mockReturnValue(true);
    const app = await makeApp(user);

    const res = await app.inject({ method: 'POST', url: '/api/runs/run_1/cancel' });
    expect(res.statusCode).toBe(200);
    expect(res.json()).toEqual({ cancelled: true, runId: 'run_1' });
  });

  it('POST /runs/:id/cancel falls back to DB update', async () => {
    executorMock.cancelAgentRun.mockReturnValue(false);
    runRepoMock.updateRun.mockResolvedValue({ id: 'run_1', status: 'cancelled' });
    const app = await makeApp(user);

    const res = await app.inject({ method: 'POST', url: '/api/runs/run_1/cancel' });
    expect(res.statusCode).toBe(200);
    expect(runRepoMock.updateRun).toHaveBeenCalledWith(
      'run_1',
      'lysnrai',
      expect.objectContaining({ status: 'cancelled' })
    );
  });

  // ── Active runs ─────────────────────────────────────────

  it('GET /agents/active-runs returns active runs', async () => {
    executorMock.getActiveAgentRuns.mockReturnValue(['run_1', 'run_2']);
    const app = await makeApp(admin);

    const res = await app.inject({ method: 'GET', url: '/api/agents/active-runs' });
    expect(res.statusCode).toBe(200);
    expect(res.json()).toEqual({ activeRuns: ['run_1', 'run_2'] });
  });

  it('GET /agents/active-runs requires admin', async () => {
    const app = await makeApp({ sub: 'user_1', productId: 'lysnrai', role: 'viewer' });
    const res = await app.inject({ method: 'GET', url: '/api/agents/active-runs' });
    expect(res.statusCode).toBe(403);
  });

  // ── DAG / children ──────────────────────────────────────

  it('GET /runs/:id/children returns child runs', async () => {
    runRepoMock.listChildRuns.mockResolvedValue([{ id: 'run_child_1', parentRunId: 'run_1' }]);
    const app = await makeApp(user);

    const res = await app.inject({ method: 'GET', url: '/api/runs/run_1/children' });
    expect(res.statusCode).toBe(200);
    expect(runRepoMock.listChildRuns).toHaveBeenCalledWith('lysnrai', 'run_1');
  });

  it('GET /runs/:id/tree returns full run tree', async () => {
    runRepoMock.getRun.mockResolvedValue({ id: 'run_1', status: 'succeeded' });
    runRepoMock.listChildRuns.mockResolvedValue([{ id: 'run_child_1' }]);
    runRepoMock.listRunSteps.mockResolvedValue([]);
    const app = await makeApp(user);

    const res = await app.inject({ method: 'GET', url: '/api/runs/run_1/tree' });
    expect(res.statusCode).toBe(200);
    const body = res.json();
    expect(body.run.id).toBe('run_1');
    expect(body.children).toHaveLength(1);
  });

  // ── Agent metrics ───────────────────────────────────────

  it('GET /agents/:id/metrics computes agent metrics', async () => {
    runRepoMock.listRuns.mockResolvedValue([
      {
        id: 'run_1',
        status: 'succeeded',
        metadata: { agentId: 'agt_1' },
        output: { tokenUsage: { prompt: 100, completion: 50, total: 150 }, durationMs: 500 },
        createdAt: '2026-01-01',
      },
      {
        id: 'run_2',
        status: 'failed',
        metadata: { agentId: 'agt_1' },
        createdAt: '2026-01-02',
      },
      {
        // Belongs to a different agent and must be excluded from the totals.
        id: 'run_3',
        status: 'succeeded',
        metadata: { agentId: 'other_agent' },
        createdAt: '2026-01-02',
      },
    ]);
    const app = await makeApp(admin);

    const res = await app.inject({ method: 'GET', url: '/api/agents/agt_1/metrics' });
    expect(res.statusCode).toBe(200);
    const body = res.json();
    expect(body.totalRuns).toBe(2);
    expect(body.succeeded).toBe(1);
    expect(body.failed).toBe(1);
    expect(body.successRate).toBe(50);
    expect(body.tokenUsage.total).toBe(150);
  });

  // ── Tool registry ───────────────────────────────────────

  it('GET /tools lists registered tools', async () => {
    toolRegistryMock.listTools.mockReturnValue([{ name: 'search', description: 'Search the web' }]);
    const app = await makeApp(admin);

    const res = await app.inject({ method: 'GET', url: '/api/tools' });
    expect(res.statusCode).toBe(200);
    expect(res.json()).toHaveLength(1);
  });

  it('POST /tools/validate validates tool bindings', async () => {
    toolRegistryMock.validateToolBindings.mockReturnValue({
      valid: ['search'],
      invalid: ['unknown_tool'],
    });
    const app = await makeApp(admin);

    const res = await app.inject({
      method: 'POST',
      url: '/api/tools/validate',
      payload: { bindings: ['search', 'unknown_tool'] },
    });
    expect(res.statusCode).toBe(200);
    expect(res.json().invalid).toContain('unknown_tool');
  });

  // ── Agent scheduling ────────────────────────────────────

  it('POST /agents/:id/schedule creates a scheduled job', async () => {
    jobRunnerMock.ensureJobDefinitions.mockResolvedValue(undefined);
    jobRegistryMock.registerJob.mockReturnValue(undefined);
    const app = await makeApp(admin);

    const res = await app.inject({
      method: 'POST',
      url: '/api/agents/agt_1/schedule',
      payload: { cronExpression: '0 */6 * * *', input: { mode: 'daily' } },
    });

    expect(res.statusCode).toBe(200);
    expect(res.json()).toEqual({
      scheduled: true,
      jobName: 'agent:agt_1',
      cronExpression: '0 */6 * * *',
    });
    expect(jobRegistryMock.registerJob).toHaveBeenCalledWith('agent:agt_1', expect.any(Function));
  });

  it('POST /agents/:id/schedule requires cronExpression', async () => {
    const app = await makeApp(admin);

    const res = await app.inject({
      method: 'POST',
      url: '/api/agents/agt_1/schedule',
      payload: {},
    });
    expect(res.statusCode).toBe(400);
  });
});
|
||||
280
services/platform-service/src/modules/agents/executor-routes.ts
Normal file
280
services/platform-service/src/modules/agents/executor-routes.ts
Normal file
@ -0,0 +1,280 @@
|
||||
import { z } from 'zod';
|
||||
import type { FastifyInstance } from 'fastify';
|
||||
import { BadRequestError, ForbiddenError, NotFoundError } from '../../lib/errors.js';
|
||||
import { executeAgent, cancelAgentRun, getActiveAgentRuns, onStepEvent } from './executor.js';
|
||||
import { listTools, validateToolBindings } from './tool-registry.js';
|
||||
import type { RunDoc } from '../runs/types.js';
|
||||
import * as runRepo from '../runs/repository.js';
|
||||
|
||||
/** Shape of the JSON body accepted by POST /agents/execute. */
const ExecuteAgentSchema = z.object({
  // Agent to run; an empty string is rejected.
  agentId: z.string().min(1),
  // Arbitrary key/value input for the agent; defaults to {} when the field is absent.
  input: z.record(z.unknown()).default({}),
  // Optional id of a parent run, forwarded unchanged to the executor.
  parentRunId: z.string().optional(),
});
|
||||
|
||||
/**
 * Resolves the caller identity from the request's JWT payload.
 *
 * @param req Any object that may carry a `jwtPayload`.
 * @returns The caller's user id (`sub`) and product id. The product id falls back to the
 *   `DEFAULT_PRODUCT_ID` environment variable and then to `'lysnrai'`.
 * @throws ForbiddenError when there is no payload or its `sub` is missing/empty.
 */
function requireAuth(req: { jwtPayload?: { sub?: string; role?: string; productId?: string } }): {
  userId: string;
  productId: string;
} {
  const userId = req.jwtPayload?.sub;
  if (!userId) {
    throw new ForbiddenError('Authentication required');
  }

  const productId = req.jwtPayload?.productId ?? process.env.DEFAULT_PRODUCT_ID ?? 'lysnrai';
  return { userId, productId };
}
|
||||
|
||||
/**
 * Like requireAuth, but additionally demands an administrative role.
 *
 * @param req Any object that may carry a `jwtPayload`.
 * @returns The same identity object requireAuth produces.
 * @throws ForbiddenError when the caller is unauthenticated (checked first) or when the
 *   payload's `role` is neither `'super_admin'` nor `'admin'`.
 */
function requireAdmin(req: { jwtPayload?: { sub?: string; role?: string; productId?: string } }): {
  userId: string;
  productId: string;
} {
  // Authentication is verified before the role so the auth error takes precedence.
  const identity = requireAuth(req);

  const role = req.jwtPayload?.role;
  const hasAdminRole = role === 'super_admin' || role === 'admin';
  if (!hasAdminRole) {
    throw new ForbiddenError('Admin access required');
  }

  return identity;
}
|
||||
|
||||
/**
 * Turns the raw `limit` query-string value into a usable page size.
 * Missing, non-numeric, zero or negative values yield `fallback`; larger values are capped at `max`.
 */
function parseRunLimit(raw: string | undefined, fallback = 100, max = 500): number {
  const parsed = Number.parseInt(raw ?? '', 10);
  if (!Number.isFinite(parsed) || parsed < 1) return fallback;
  return Math.min(parsed, max);
}

/**
 * Fastify plugin registering the agent runtime endpoints: execute, cancel, active runs,
 * run children/tree, per-agent metrics, SSE step streaming, tool registry and scheduling.
 *
 * Every handler authenticates through requireAuth/requireAdmin, which throw ForbiddenError.
 *
 * @param app Fastify instance (or encapsulated scope) to register the routes on.
 */
export async function agentExecutorRoutes(app: FastifyInstance) {
  // ── Execute agent ─────────────────────────────────────────

  app.post('/agents/execute', async req => {
    const access = requireAuth(req);
    const parsed = ExecuteAgentSchema.safeParse(req.body);
    if (!parsed.success) {
      throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; '));
    }

    return executeAgent({
      agentId: parsed.data.agentId,
      productId: access.productId,
      userId: access.userId,
      input: parsed.data.input,
      parentRunId: parsed.data.parentRunId,
      triggeredBy: access.userId,
    });
  });

  // ── Cancel run ────────────────────────────────────────────

  app.post('/runs/:id/cancel', async req => {
    const access = requireAuth(req);
    const { id } = req.params as { id: string };

    // Try in-memory cancel first (active agent runs).
    // NOTE(review): this path does not check that the run belongs to access.productId,
    // unlike the DB fallback below — confirm whether cross-product cancellation is intended.
    if (cancelAgentRun(id)) {
      return { cancelled: true, runId: id };
    }

    // Fall back to DB update for non-active runs
    try {
      await runRepo.updateRun(id, access.productId, {
        status: 'cancelled',
        completedAt: new Date().toISOString(),
        error: 'Cancelled by user',
      });
      return { cancelled: true, runId: id };
    } catch {
      // NOTE(review): every updateRun failure is reported as "not found"; an
      // infrastructure error would surface as a 404 as well.
      throw new NotFoundError(`Run '${id}' not found or already completed`);
    }
  });

  // ── Active agent runs ─────────────────────────────────────

  app.get('/agents/active-runs', async req => {
    requireAdmin(req);
    return { activeRuns: getActiveAgentRuns() };
  });

  // ── Parent-child run DAG ──────────────────────────────────

  app.get('/runs/:id/children', async req => {
    const access = requireAuth(req);
    const { id } = req.params as { id: string };
    return runRepo.listChildRuns(access.productId, id);
  });

  app.get('/runs/:id/tree', async req => {
    const access = requireAuth(req);
    const { id } = req.params as { id: string };

    // The root run, its direct children and its steps are independent lookups.
    const [rootRun, children, steps] = await Promise.all([
      runRepo.getRun(id, access.productId),
      runRepo.listChildRuns(access.productId, id),
      runRepo.listRunSteps(access.productId, id),
    ]);

    // Guard against a repository that signals "missing" with a nullish value, so the
    // endpoint never answers 200 with an empty root.
    if (!rootRun) {
      throw new NotFoundError(`Run '${id}' not found`);
    }

    // Only one level of children is expanded; each child is paired with its own steps.
    return {
      run: rootRun,
      steps,
      children: await Promise.all(
        children.map(async (child: RunDoc) => ({
          run: child,
          steps: await runRepo.listRunSteps(access.productId, child.id),
        }))
      ),
    };
  });

  // ── Agent metrics ─────────────────────────────────────────

  app.get('/agents/:id/metrics', async req => {
    const access = requireAdmin(req);
    const { id } = req.params as { id: string };
    const query = req.query as { since?: string; limit?: string };

    const runs = await runRepo.listRuns(access.productId, {
      kind: 'agent',
      limit: parseRunLimit(query.limit),
    });

    // Filter by agentId (stored in metadata)
    const agentRuns = runs.filter(
      r => r.metadata && (r.metadata as Record<string, unknown>).agentId === id
    );

    // Filter by since if provided. createdAt and since are compared as given, with `>=`.
    const since = query.since;
    const filteredRuns = since ? agentRuns.filter(r => r.createdAt >= since) : agentRuns;

    const succeeded = filteredRuns.filter(r => r.status === 'succeeded');
    const failed = filteredRuns.filter(r => r.status === 'failed');
    const cancelled = filteredRuns.filter(r => r.status === 'cancelled');

    // Token usage and durations are aggregated over succeeded runs only.
    let totalTokens = 0;
    let totalPromptTokens = 0;
    let totalCompletionTokens = 0;
    const durations: number[] = [];
    for (const run of succeeded) {
      const output = run.output as Record<string, unknown> | undefined;
      if (output?.tokenUsage) {
        const usage = output.tokenUsage as { prompt?: number; completion?: number; total?: number };
        totalPromptTokens += usage.prompt ?? 0;
        totalCompletionTokens += usage.completion ?? 0;
        totalTokens += usage.total ?? 0;
      }
      // Runs without a positive durationMs do not take part in the average.
      const durationMs = (output?.durationMs as number | undefined) ?? 0;
      if (durationMs > 0) durations.push(durationMs);
    }

    const avgDurationMs =
      durations.length > 0
        ? Math.round(durations.reduce((a, b) => a + b, 0) / durations.length)
        : 0;

    return {
      agentId: id,
      totalRuns: filteredRuns.length,
      succeeded: succeeded.length,
      failed: failed.length,
      cancelled: cancelled.length,
      // Whole-number percentage; 0 when there are no runs to avoid dividing by zero.
      successRate:
        filteredRuns.length > 0 ? Math.round((succeeded.length / filteredRuns.length) * 100) : 0,
      avgDurationMs,
      tokenUsage: {
        prompt: totalPromptTokens,
        completion: totalCompletionTokens,
        total: totalTokens,
      },
    };
  });

  // ── Run step streaming (SSE) ──────────────────────────────

  app.get('/runs/:id/stream', async (req, reply) => {
    requireAuth(req);
    const { id } = req.params as { id: string };

    // The response is written directly on the raw socket, so tell Fastify not to send
    // anything itself once this handler returns.
    reply.hijack();
    reply.raw.writeHead(200, {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      Connection: 'keep-alive',
    });

    const unsubscribe = onStepEvent(id, event => {
      reply.raw.write(`data: ${JSON.stringify(event)}\n\n`);
    });

    // Send initial keepalive
    reply.raw.write(`data: ${JSON.stringify({ type: 'connected', runId: id })}\n\n`);

    // Keep the handler pending only while the connection is open. Listening on the
    // response detects the client going away and lets the handler settle, instead of
    // leaving a promise that never resolves.
    await new Promise<void>(resolve => {
      reply.raw.on('close', () => {
        unsubscribe();
        resolve();
      });
    });
  });

  // ── Tool registry ─────────────────────────────────────────

  app.get('/tools', async req => {
    requireAdmin(req);
    return listTools();
  });

  app.post('/tools/validate', async req => {
    requireAdmin(req);
    // req.body is undefined/null when the request has no JSON body.
    const body = (req.body ?? {}) as { bindings?: unknown };
    const bindings = body.bindings;
    if (!Array.isArray(bindings) || !bindings.every(b => typeof b === 'string')) {
      throw new BadRequestError('bindings array is required');
    }
    return validateToolBindings(bindings as string[]);
  });

  // ── Agent scheduling ──────────────────────────────────────

  app.post('/agents/:id/schedule', async req => {
    const access = requireAdmin(req);
    const { id } = req.params as { id: string };
    // req.body is undefined/null when the request has no JSON body.
    const body = (req.body ?? {}) as {
      cronExpression?: string;
      input?: Record<string, unknown>;
      enabled?: boolean;
    };

    const cronExpression = body.cronExpression;
    if (typeof cronExpression !== 'string' || cronExpression.length === 0) {
      throw new BadRequestError('cronExpression is required');
    }
    const scheduledInput = body.input ?? {};

    // Register as a job definition
    const { ensureJobDefinitions } = await import('../jobs/runner.js');
    const { registerJob } = await import('../jobs/registry.js');

    const jobName = `agent:${id}`;
    registerJob(jobName, async ctx => {
      const result = await executeAgent({
        agentId: id,
        productId: access.productId,
        // NOTE(review): the job context's productId is passed as the user id for
        // system-triggered runs — confirm this is the intended identity.
        userId: ctx.productId, // system-triggered
        input: scheduledInput,
        triggeredBy: 'scheduler',
      });
      return {
        success: result.status === 'succeeded',
        message: result.status,
        metrics: {
          tokenUsage: result.tokenUsage,
          durationMs: result.durationMs,
        },
      };
    });

    await ensureJobDefinitions([
      {
        name: jobName,
        cron: cronExpression,
        description: `Scheduled execution of agent ${id}`,
      },
    ]);

    return { scheduled: true, jobName, cronExpression };
  });
}
|
||||
357
services/platform-service/src/modules/agents/executor.ts
Normal file
357
services/platform-service/src/modules/agents/executor.ts
Normal file
@ -0,0 +1,357 @@
|
||||
import { randomUUID } from 'node:crypto';
|
||||
import * as runTracker from '../runs/tracker.js';
|
||||
import * as agentRepo from './repository.js';
|
||||
import {
|
||||
getToolsForBindings,
|
||||
type ToolExecutionContext,
|
||||
type ToolExecutionResult,
|
||||
} from './tool-registry.js';
|
||||
import { bus } from '../../lib/event-bus.js';
|
||||
|
||||
// ── Types ────────────────────────────────────────────────────
|
||||
|
||||
/** Arguments accepted by executeAgent(). */
export interface ExecuteAgentInput {
  /** Id of the agent whose published version should run. */
  agentId: string;
  /** Product the agent is looked up in and the run is recorded under. */
  productId: string;
  /** User on whose behalf the run executes; also handed to tools via their context. */
  userId: string;
  /** Free-form input passed to prompt assembly and to every bound tool. */
  input: Record<string, unknown>;
  /** Optional id of the run that spawned this one. */
  parentRunId?: string;
  /** Who or what started the run; executeAgent falls back to `userId` when omitted. */
  triggeredBy?: string;
}

/** Outcome returned by executeAgent(). */
export interface ExecuteAgentResult {
  /** Identifier generated for this run. */
  runId: string;
  /** Terminal state of the run. */
  status: 'succeeded' | 'failed' | 'cancelled' | 'review_required';
  /** Final output; set on success. */
  output?: Record<string, unknown>;
  /** Human-readable reason; set for every non-succeeded status. */
  error?: string;
  /** Token counts summed over the tool results that reported usage. */
  tokenUsage: { prompt: number; completion: number; total: number };
  /** Number of pipeline steps that finished before the run ended. */
  stepsCompleted: number;
  /** Wall-clock time from the start of executeAgent to its return, in milliseconds. */
  durationMs: number;
}
|
||||
|
||||
/** Notification describing the state of one pipeline step of a run. */
export interface StepEvent {
  /** Run the step belongs to; listeners are looked up by this id. */
  runId: string;
  /** Name of the step, e.g. 'prompt_assembly'. */
  stepName: string;
  /** Step state as reported by the emitter, e.g. 'succeeded'. */
  status: string;
  output?: unknown;
  error?: string;
  tokenUsage?: { prompt: number; completion: number; total: number };
}

type StepListener = (event: StepEvent) => void;

// ── Active Runs ──────────────────────────────────────────────

// In-memory state for this process: the AbortController of every run currently
// executing, and the step-event subscribers registered per run id.
const activeRuns = new Map<string, AbortController>();
const stepListeners = new Map<string, Set<StepListener>>();

/** Returns the ids of all runs currently tracked as active in this process. */
export function getActiveAgentRuns(): string[] {
  return Array.from(activeRuns.keys());
}

/**
 * Signals cancellation to an active run by aborting its controller.
 *
 * @returns `true` if the run was active and has been signalled, `false` if it is unknown here.
 */
export function cancelAgentRun(runId: string): boolean {
  const controller = activeRuns.get(runId);
  if (!controller) return false;
  controller.abort();
  return true;
}

/**
 * Subscribes `listener` to the step events of one run.
 *
 * @param runId Run whose events should be delivered.
 * @param listener Callback invoked synchronously for each emitted event.
 * @returns An unsubscribe function. It may be called more than once; a stale call never
 *   affects listeners that subscribed to the same run later.
 */
export function onStepEvent(runId: string, listener: StepListener): () => void {
  let listeners = stepListeners.get(runId);
  if (!listeners) {
    listeners = new Set();
    stepListeners.set(runId, listeners);
  }
  listeners.add(listener);

  const subscribedTo = listeners;
  return () => {
    subscribedTo.delete(listener);
    // Drop the registry entry only if it is still the set this listener joined. Once a
    // set has been emptied and removed, a newer set may live under the same run id and
    // must survive a repeated unsubscribe call.
    if (subscribedTo.size === 0 && stepListeners.get(runId) === subscribedTo) {
      stepListeners.delete(runId);
    }
  };
}

/** Delivers `event` to every listener registered for its run. */
function emitStepEvent(event: StepEvent): void {
  const listeners = stepListeners.get(event.runId);
  if (!listeners) return;

  for (const listener of listeners) {
    try {
      listener(event);
    } catch {
      // best-effort: a throwing listener must not break the run or the other listeners
    }
  }
}
|
||||
|
||||
// ── Executor ─────────────────────────────────────────────────
|
||||
|
||||
/**
 * Runs an agent's published version through the pipeline
 * prompt_assembly → tool_execution (only when tools are bound) → finalize.
 *
 * The run is registered in `activeRuns` for its whole duration so cancelAgentRun() can
 * abort it; the entry is removed on every exit path. Failures are not thrown: they are
 * reported through the returned `status` / `error`.
 *
 * @param input Agent, caller and payload for the run.
 * @returns The run id, terminal status, aggregated token usage, number of completed steps
 *   and elapsed time. `status` is 'review_required' when a bound tool is flagged
 *   `requiresReview`, 'cancelled' when the run was aborted, 'failed' on any other error.
 */
export async function executeAgent(input: ExecuteAgentInput): Promise<ExecuteAgentResult> {
  const runId = `run_${randomUUID()}`;
  const startTime = Date.now();
  const controller = new AbortController();
  activeRuns.set(runId, controller);

  const tokenUsage = { prompt: 0, completion: 0, total: 0 };
  let stepsCompleted = 0;

  try {
    // Get agent + published version
    const agent = await agentRepo.getAgent(input.agentId, input.productId);
    const version = await agentRepo.getPublishedVersion(input.agentId);
    if (!version) {
      throw new Error(`No published version found for agent '${input.agentId}'`);
    }

    // Start run tracking
    await runTracker.startRun({
      id: runId,
      productId: input.productId,
      kind: 'agent',
      name: agent.name,
      source: `platform.agents.${agent.key}`,
      triggeredBy: input.triggeredBy ?? input.userId,
      parentRunId: input.parentRunId,
      input: input.input,
      metadata: {
        agentId: input.agentId,
        agentKey: agent.key,
        versionId: version.id,
        versionNumber: version.version,
      },
    });

    // Resolve tool bindings
    const boundTools = getToolsForBindings(version.toolBindings);
    const toolContext: ToolExecutionContext = {
      runId,
      productId: input.productId,
      agentId: input.agentId,
      userId: input.userId,
    };

    // Step 1: Prompt assembly
    if (controller.signal.aborted) throw new Error('Run cancelled');
    await runTracker.startRunStep({
      runId,
      productId: input.productId,
      stepName: 'prompt_assembly',
      order: 1,
      input: { toolCount: boundTools.length },
    });

    const assembledPrompt = assemblePrompt(
      version,
      input.input,
      boundTools.map(t => t.definition)
    );
    await runTracker.completeRunStep(runId, input.productId, 'prompt_assembly', {
      promptLength: assembledPrompt.length,
      toolsBound: boundTools.map(t => t.definition.name),
    });
    stepsCompleted++;
    emitStepEvent({ runId, stepName: 'prompt_assembly', status: 'succeeded' });

    // Step 2: Tool execution (if tools are bound)
    if (boundTools.length > 0 && !controller.signal.aborted) {
      await runTracker.startRunStep({
        runId,
        productId: input.productId,
        stepName: 'tool_execution',
        order: 2,
      });

      const toolResults: Array<{ tool: string; result: ToolExecutionResult }> = [];
      for (const tool of boundTools) {
        if (controller.signal.aborted) break;

        // A review-gated tool stops the run before the tool is executed.
        if (tool.definition.requiresReview) {
          emitStepEvent({
            runId,
            stepName: 'tool_execution',
            status: 'review_required',
            output: { tool: tool.definition.name },
          });

          // Mark run as needing review — caller should handle creating a review item
          await runTracker.failRunStep(
            runId,
            input.productId,
            'tool_execution',
            `Tool '${tool.definition.name}' requires human review`
          );
          await runTracker.failRun(
            runId,
            input.productId,
            `Paused: tool '${tool.definition.name}' requires review`,
            { status: 'review_required' }
          );

          return {
            runId,
            status: 'review_required',
            error: `Tool '${tool.definition.name}' requires human review`,
            tokenUsage,
            stepsCompleted,
            durationMs: Date.now() - startTime,
          };
        }

        try {
          const result = await tool.execute(input.input, toolContext);
          toolResults.push({ tool: tool.definition.name, result });
          if (result.tokenUsage) {
            tokenUsage.prompt += result.tokenUsage.prompt;
            tokenUsage.completion += result.tokenUsage.completion;
            tokenUsage.total += result.tokenUsage.total;
          }
        } catch (err: unknown) {
          // A throwing tool is recorded as a failed result; the remaining tools still run.
          toolResults.push({
            tool: tool.definition.name,
            result: {
              success: false,
              error: err instanceof Error ? err.message : String(err),
            },
          });
        }
      }

      if (controller.signal.aborted) {
        // Cancelled while tools were running: close the step out as failed and leave
        // through the cancellation path, without counting the step as completed or
        // announcing it as succeeded.
        await runTracker.failRunStep(runId, input.productId, 'tool_execution', 'Run cancelled');
        emitStepEvent({ runId, stepName: 'tool_execution', status: 'cancelled' });
        throw new Error('Run cancelled');
      }

      const failedTools = toolResults.filter(r => !r.result.success);
      if (failedTools.length > 0) {
        await runTracker.failRunStep(
          runId,
          input.productId,
          'tool_execution',
          `${failedTools.length} tool(s) failed: ${failedTools.map(t => t.tool).join(', ')}`
        );
      } else {
        await runTracker.completeRunStep(runId, input.productId, 'tool_execution', {
          toolResults: toolResults.map(r => ({
            tool: r.tool,
            success: r.result.success,
          })),
        });
      }

      // Tool failures mark the step as failed but do not stop the run.
      stepsCompleted++;
      emitStepEvent({
        runId,
        stepName: 'tool_execution',
        status: failedTools.length > 0 ? 'partial_failure' : 'succeeded',
        tokenUsage: { ...tokenUsage },
      });
    }

    // Step 3: Finalize
    if (controller.signal.aborted) throw new Error('Run cancelled');
    await runTracker.startRunStep({
      runId,
      productId: input.productId,
      stepName: 'finalize',
      order: 3,
    });

    const output = {
      agentKey: agent.key,
      versionNumber: version.version,
      tokenUsage,
      // +1 accounts for the finalize step itself, which is counted just below.
      stepsCompleted: stepsCompleted + 1,
    };

    await runTracker.completeRunStep(runId, input.productId, 'finalize', output);
    stepsCompleted++;
    // Emit a copy so listeners never hold a reference to the live accumulator.
    emitStepEvent({
      runId,
      stepName: 'finalize',
      status: 'succeeded',
      tokenUsage: { ...tokenUsage },
    });

    // Complete the run
    await runTracker.completeRun(runId, input.productId, {
      ...output,
      durationMs: Date.now() - startTime,
    });

    // Emit event for downstream consumption
    try {
      await bus.emit('job.completed', {
        jobName: `agent:${agent.key}`,
        runId,
        durationMs: Date.now() - startTime,
        productId: input.productId,
      });
    } catch {
      // best-effort
    }

    return {
      runId,
      status: 'succeeded',
      output,
      tokenUsage,
      stepsCompleted,
      durationMs: Date.now() - startTime,
    };
  } catch (err: unknown) {
    const errorMsg = err instanceof Error ? err.message : String(err);
    // Any error raised after the controller was aborted is reported as a cancellation.
    const isCancelled = controller.signal.aborted;

    try {
      if (isCancelled) {
        await runTracker.failRun(runId, input.productId, 'Run cancelled by user', {
          cancelledAt: new Date().toISOString(),
        });
      } else {
        await runTracker.failRun(runId, input.productId, errorMsg);
      }
    } catch {
      // best-effort: the run may not have been started yet (e.g. agent lookup failed)
    }

    try {
      await bus.emit('job.failed', {
        jobName: `agent:${input.agentId}`,
        runId,
        error: errorMsg,
        productId: input.productId,
      });
    } catch {
      // best-effort
    }

    return {
      runId,
      status: isCancelled ? 'cancelled' : 'failed',
      error: errorMsg,
      tokenUsage,
      stepsCompleted,
      durationMs: Date.now() - startTime,
    };
  } finally {
    // Single cleanup point: the run stops being cancellable on every exit path.
    activeRuns.delete(runId);
  }
}
|
||||
|
||||
// ── Prompt Assembly ──────────────────────────────────────────
|
||||
|
||||
/**
 * Builds the prompt text for an agent version as a sequence of labelled sections
 * separated by blank lines, in the order: [System], [Prompt], [Available Tools], [Input].
 *
 * @param version Agent version; `systemInstructions` is included only when non-empty.
 * @param input Run input; rendered as pretty-printed JSON unless it has no keys.
 * @param tools Bound tool definitions; listed one per line as `- name: description`.
 * @returns The assembled prompt. The [Prompt] section is always present.
 */
function assemblePrompt(
  version: {
    prompt: string;
    systemInstructions?: string;
    modelPolicy?: { maxTokens?: number } | null;
  },
  input: Record<string, unknown>,
  tools: Array<{ name: string; description: string }>
): string {
  const systemSection = version.systemInstructions
    ? [`[System]\n${version.systemInstructions}`]
    : [];

  const promptSection = `[Prompt]\n${version.prompt}`;

  const toolSection =
    tools.length > 0
      ? [`[Available Tools]\n${tools.map(t => `- ${t.name}: ${t.description}`).join('\n')}`]
      : [];

  const inputSection =
    Object.keys(input).length > 0 ? [`[Input]\n${JSON.stringify(input, null, 2)}`] : [];

  return [...systemSection, promptSection, ...toolSection, ...inputSection].join('\n\n');
}
|
||||
@ -0,0 +1,87 @@
|
||||
import { z } from 'zod';
|
||||
|
||||
// ── Tool Definition ─────────────────────────────────────────
|
||||
// Tools are registered globally and bound to agent versions via toolBindings[].
|
||||
|
||||
/** Value kinds a tool parameter may declare. */
const TOOL_PARAMETER_TYPES = ['string', 'number', 'boolean', 'object', 'array'] as const;

/**
 * Schema for a single parameter accepted by a tool.
 *
 * `name` must be non-empty, `description` is optional, and `required`
 * defaults to `false` when omitted.
 */
export const ToolParameterSchema = z.object({
  name: z.string().min(1),
  type: z.enum(TOOL_PARAMETER_TYPES),
  description: z.string().optional(),
  required: z.boolean().default(false),
});
|
||||
|
||||
/** Risk classifications a tool may carry, in the order declared by the schema. */
const TOOL_RISK_LEVELS = ['low', 'medium', 'high', 'critical'] as const;

/**
 * Schema for a tool's static definition.
 *
 * `name` and `description` must be non-empty. When omitted, `parameters`
 * defaults to an empty list, `requiresReview` to `false`, and `riskLevel`
 * to `'low'`.
 */
export const ToolDefinitionSchema = z.object({
  name: z.string().min(1),
  description: z.string().min(1),
  parameters: z.array(ToolParameterSchema).default([]),
  requiresReview: z.boolean().default(false),
  riskLevel: z.enum(TOOL_RISK_LEVELS).default('low'),
});
|
||||
|
||||
/** Static description of a tool, as inferred from {@link ToolDefinitionSchema}. */
export type ToolDefinition = z.infer<typeof ToolDefinitionSchema>;
|
||||
|
||||
/**
 * Signature of a tool implementation: receives the call parameters and the
 * execution context, and resolves to a {@link ToolExecutionResult}.
 */
export type ToolExecuteFn = (
  params: Record<string, unknown>,
  context: ToolExecutionContext,
) => Promise<ToolExecutionResult>;
|
||||
|
||||
/**
 * Identifiers handed to a tool when it is executed.
 * NOTE(review): field meanings are inferred from their names — confirm against the executor.
 */
export interface ToolExecutionContext {
  /** Id of the run the tool call belongs to. */
  runId: string;
  /** Id of the product the run is scoped to. */
  productId: string;
  /** Id of the agent performing the call. */
  agentId: string;
  /** Id of the user associated with the run. */
  userId: string;
}
|
||||
|
||||
/** Outcome reported by a tool implementation. */
export interface ToolExecutionResult {
  /** Whether the tool call succeeded. */
  success: boolean;
  /** Tool-specific payload, if any. */
  output?: unknown;
  /** Error message, if any. */
  error?: string;
  /** Optional token counts reported by the tool call. */
  tokenUsage?: {
    prompt: number;
    completion: number;
    total: number;
  };
}
|
||||
|
||||
/** Registry entry pairing a tool's definition with its implementation. */
interface RegisteredTool {
  /** Static description of the tool. */
  definition: ToolDefinition;
  /** Function invoked to run the tool. */
  execute: ToolExecuteFn;
}
|
||||
|
||||
// ── Global Tool Registry ────────────────────────────────────
|
||||
|
||||
/** Module-level registry of tools, keyed by tool name. */
const tools: Map<string, RegisteredTool> = new Map();
|
||||
|
||||
/**
 * Adds a tool to the global registry under `definition.name`.
 * An existing entry with the same name is replaced.
 *
 * @param definition Static description of the tool.
 * @param execute    Implementation invoked when the tool runs.
 */
export function registerTool(definition: ToolDefinition, execute: ToolExecuteFn): void {
  const entry: RegisteredTool = { definition, execute };
  tools.set(definition.name, entry);
}
|
||||
|
||||
/**
 * Looks up a registered tool by name.
 *
 * @param name Tool name used at registration time.
 * @returns The registry entry, or `undefined` when no tool has that name.
 */
export function getTool(name: string): RegisteredTool | undefined {
  const entry = tools.get(name);
  return entry;
}
|
||||
|
||||
/**
 * Returns the definitions of all registered tools, in registration (Map insertion) order.
 *
 * @returns A new array of tool definitions; implementations are not included.
 */
export function listTools(): ToolDefinition[] {
  const definitions: ToolDefinition[] = [];
  for (const { definition } of tools.values()) {
    definitions.push(definition);
  }
  return definitions;
}
|
||||
|
||||
/**
 * Resolves a list of tool names to their registry entries.
 * Names with no registered tool are skipped; the order of `bindings` is preserved.
 *
 * @param bindings Tool names to resolve.
 * @returns The registry entries found for the given names.
 */
export function getToolsForBindings(bindings: string[]): RegisteredTool[] {
  const resolved: RegisteredTool[] = [];
  for (const name of bindings) {
    const entry = tools.get(name);
    if (entry !== undefined) {
      resolved.push(entry);
    }
  }
  return resolved;
}
|
||||
|
||||
/** Removes every tool from the global registry. */
export function clearTools(): void {
  tools.clear();
}
|
||||
|
||||
// ── Validate tool bindings ──────────────────────────────────
|
||||
|
||||
/**
 * Splits tool names into those that are registered and those that are not.
 * Each name is checked independently, so duplicates in `bindings` appear
 * as duplicates in the result; input order is preserved within each list.
 *
 * @param bindings Tool names to check against the registry.
 * @returns `valid` holds the registered names, `invalid` the unknown ones.
 */
export function validateToolBindings(bindings: string[]): {
  valid: string[];
  invalid: string[];
} {
  const valid: string[] = [];
  const invalid: string[] = [];

  for (const name of bindings) {
    const bucket = tools.has(name) ? valid : invalid;
    bucket.push(name);
  }

  return { valid, invalid };
}
|
||||
@ -63,6 +63,14 @@ export async function updateRunStep(
|
||||
return updated;
|
||||
}
|
||||
|
||||
/**
 * Lists the runs whose `parentRunId` equals the given run id, scoped to one product.
 *
 * Results are sorted on `createdAt` with direction `-1`
 * (presumably descending, i.e. newest first — confirm against the collection API).
 *
 * @param productId   Product the runs must belong to.
 * @param parentRunId Id of the parent run whose children are requested.
 * @param limit       Maximum number of child runs to return. Defaults to 100, the
 *                    previously hard-coded cap, so existing two-argument callers are unaffected.
 * @returns The matching child runs, at most `limit` of them.
 */
export async function listChildRuns(
  productId: string,
  parentRunId: string,
  limit = 100,
): Promise<RunDoc[]> {
  return runsCollection().findMany({
    filter: { productId, parentRunId },
    sort: { createdAt: -1 },
    limit,
  });
}
|
||||
|
||||
export async function listRunSteps(productId: string, runId: string): Promise<RunStepDoc[]> {
|
||||
return stepsCollection().findMany({
|
||||
filter: {
|
||||
|
||||
@ -108,6 +108,7 @@ import { registerDeliverySubscribers } from './modules/delivery/subscribers.js';
|
||||
import { verifyToken } from './modules/auth/jwt.js';
|
||||
import { registerOptionalApiKeyContext } from './lib/api-key-auth.js';
|
||||
import { eventSubscriptionRoutes } from './modules/event-subscriptions/routes.js';
|
||||
import { agentExecutorRoutes } from './modules/agents/executor-routes.js';
|
||||
import { startEventBus, stopEventBus } from './lib/event-bus.js';
|
||||
import { wireDispatcherToBus } from './lib/event-dispatcher.js';
|
||||
|
||||
@ -265,6 +266,8 @@ await app.register(apiVersioningRoutes, { prefix: '/api' });
|
||||
|
||||
// Event subscriptions + DLQ + replay
|
||||
await app.register(eventSubscriptionRoutes, { prefix: '/api' });
|
||||
// Agent executor + tool registry + scheduling + metrics
|
||||
await app.register(agentExecutorRoutes, { prefix: '/api' });
|
||||
|
||||
// Register event bus subscribers
|
||||
registerDiagnosticsSubscribers(app.log);
|
||||
|
||||
Loading…
Reference in New Issue
Block a user