From 84dc3486876eb77c6605a97a89127135daeb830d Mon Sep 17 00:00:00 2001
From: saravanakumardb1
Date: Fri, 20 Mar 2026 03:20:31 -0700
Subject: [PATCH] =?UTF-8?q?feat(platform):=20Phase=202=20=E2=80=94=20Agent?=
 =?UTF-8?q?=20Runtime=20Orchestration?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- New: agents/executor.ts — full agent execution engine
  - Multi-step pipeline: prompt_assembly → tool_execution → finalize
  - AbortController-based cancellation with in-memory tracking
  - Token usage aggregation across tool calls
  - Review-required tool gating (pauses run, returns review_required)
  - Step event streaming for SSE consumers
- New: agents/tool-registry.ts — global tool registry
  - Register/list/validate tools with risk levels + review flags
- New: agents/executor-routes.ts — 10 endpoints, 14 tests
  - POST /agents/execute, POST /runs/:id/cancel
  - GET /agents/active-runs, GET /runs/:id/children, GET /runs/:id/tree
  - GET /agents/:id/metrics, GET /runs/:id/stream (SSE)
  - GET /tools, POST /tools/validate, POST /agents/:id/schedule
- Enhanced: runs/repository.ts — added listChildRuns() for DAG query
- 1,307 tests passing (14 new)
---
 .../modules/agents/executor-routes.test.ts    | 267 +++++++++++++
 .../src/modules/agents/executor-routes.ts     | 280 ++++++++++++++
 .../src/modules/agents/executor.ts            | 357 ++++++++++++++++++
 .../src/modules/agents/tool-registry.ts       |  87 +++++
 .../src/modules/runs/repository.ts            |   8 +
 services/platform-service/src/server.ts       |   3 +
 6 files changed, 1002 insertions(+)
 create mode 100644 services/platform-service/src/modules/agents/executor-routes.test.ts
 create mode 100644 services/platform-service/src/modules/agents/executor-routes.ts
 create mode 100644 services/platform-service/src/modules/agents/executor.ts
 create mode 100644 services/platform-service/src/modules/agents/tool-registry.ts

diff --git a/services/platform-service/src/modules/agents/executor-routes.test.ts 
b/services/platform-service/src/modules/agents/executor-routes.test.ts
new file mode 100644
index 00000000..6afc7a97
--- /dev/null
+++ b/services/platform-service/src/modules/agents/executor-routes.test.ts
@@ -0,0 +1,267 @@
+import Fastify from 'fastify';
+import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
+
+const executorMock = {
+  executeAgent: vi.fn(),
+  cancelAgentRun: vi.fn(),
+  getActiveAgentRuns: vi.fn(),
+  onStepEvent: vi.fn(),
+};
+
+const toolRegistryMock = {
+  listTools: vi.fn(),
+  validateToolBindings: vi.fn(),
+};
+
+const runRepoMock = {
+  getRun: vi.fn(),
+  listRuns: vi.fn(),
+  listChildRuns: vi.fn(),
+  listRunSteps: vi.fn(),
+  updateRun: vi.fn(),
+};
+
+const jobRunnerMock = {
+  ensureJobDefinitions: vi.fn(),
+};
+
+const jobRegistryMock = {
+  registerJob: vi.fn(),
+};
+
+vi.mock('./executor.js', () => executorMock);
+vi.mock('./tool-registry.js', () => toolRegistryMock);
+vi.mock('../runs/repository.js', () => runRepoMock);
+vi.mock('../runs/types.js', () => ({}));
+vi.mock('../jobs/runner.js', () => jobRunnerMock);
+vi.mock('../jobs/registry.js', () => jobRegistryMock);
+
+async function buildApp(payload?: { sub: string; productId: string; role?: string }) {
+  const { agentExecutorRoutes } = await import('./executor-routes.js');
+  const app = Fastify({ logger: false });
+  if (payload) {
+    app.addHook('onRequest', async req => {
+      req.jwtPayload = payload;
+    });
+  }
+  await app.register(agentExecutorRoutes, { prefix: '/api' });
+  return app;
+}
+
+describe('agentExecutorRoutes', () => {
+  beforeEach(() => {
+    vi.clearAllMocks();
+  });
+
+  afterEach(() => {
+    vi.restoreAllMocks();
+  });
+
+  // ── Execute agent ───────────────────────────────────────
+
+  it('POST /agents/execute executes an agent', async () => {
+    executorMock.executeAgent.mockResolvedValue({
+      runId: 'run_1',
+      status: 'succeeded',
+      tokenUsage: { prompt: 100, completion: 50, total: 150 },
+      stepsCompleted: 3,
+      durationMs: 500,
+    });
+    const app = await buildApp({ sub: 'user_1', productId: 'lysnrai' });
+
+    const res = await app.inject({
+      method: 'POST',
+      url: '/api/agents/execute',
+      payload: { agentId: 'agt_1', input: { query: 'hello' } },
+    });
+
+    expect(res.statusCode).toBe(200);
+    expect(executorMock.executeAgent).toHaveBeenCalledWith(
+      expect.objectContaining({ agentId: 'agt_1', userId: 'user_1' })
+    );
+  });
+
+  it('POST /agents/execute rejects invalid input', async () => {
+    const app = await buildApp({ sub: 'user_1', productId: 'lysnrai' });
+
+    const res = await app.inject({
+      method: 'POST',
+      url: '/api/agents/execute',
+      payload: {},
+    });
+
+    expect(res.statusCode).toBe(400);
+  });
+
+  it('POST /agents/execute rejects unauthenticated', async () => {
+    const app = await buildApp();
+    const res = await app.inject({
+      method: 'POST',
+      url: '/api/agents/execute',
+      payload: { agentId: 'agt_1' },
+    });
+    expect(res.statusCode).toBe(403);
+  });
+
+  // ── Cancel run ──────────────────────────────────────────
+
+  it('POST /runs/:id/cancel cancels an active run', async () => {
+    executorMock.cancelAgentRun.mockReturnValue(true);
+    const app = await buildApp({ sub: 'user_1', productId: 'lysnrai' });
+
+    const res = await app.inject({ method: 'POST', url: '/api/runs/run_1/cancel' });
+    expect(res.statusCode).toBe(200);
+    expect(JSON.parse(res.body)).toEqual({ cancelled: true, runId: 'run_1' });
+  });
+
+  it('POST /runs/:id/cancel falls back to DB update', async () => {
+    executorMock.cancelAgentRun.mockReturnValue(false);
+    runRepoMock.updateRun.mockResolvedValue({ id: 'run_1', status: 'cancelled' });
+    const app = await buildApp({ sub: 'user_1', productId: 'lysnrai' });
+
+    const res = await app.inject({ method: 'POST', url: '/api/runs/run_1/cancel' });
+    expect(res.statusCode).toBe(200);
+    expect(runRepoMock.updateRun).toHaveBeenCalledWith(
+      'run_1',
+      'lysnrai',
+      expect.objectContaining({ status: 'cancelled' })
+    );
+  });
+
+  // ── Active runs ─────────────────────────────────────────
+
+  it('GET /agents/active-runs returns active runs', async () => {
+    executorMock.getActiveAgentRuns.mockReturnValue(['run_1', 'run_2']);
+    const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
+
+    const res = await app.inject({ method: 'GET', url: '/api/agents/active-runs' });
+    expect(res.statusCode).toBe(200);
+    expect(JSON.parse(res.body)).toEqual({ activeRuns: ['run_1', 'run_2'] });
+  });
+
+  it('GET /agents/active-runs requires admin', async () => {
+    const app = await buildApp({ sub: 'user_1', productId: 'lysnrai', role: 'viewer' });
+    const res = await app.inject({ method: 'GET', url: '/api/agents/active-runs' });
+    expect(res.statusCode).toBe(403);
+  });
+
+  // ── DAG / children ──────────────────────────────────────
+
+  it('GET /runs/:id/children returns child runs', async () => {
+    runRepoMock.listChildRuns.mockResolvedValue([{ id: 'run_child_1', parentRunId: 'run_1' }]);
+    const app = await buildApp({ sub: 'user_1', productId: 'lysnrai' });
+
+    const res = await app.inject({ method: 'GET', url: '/api/runs/run_1/children' });
+    expect(res.statusCode).toBe(200);
+    expect(runRepoMock.listChildRuns).toHaveBeenCalledWith('lysnrai', 'run_1');
+  });
+
+  it('GET /runs/:id/tree returns full run tree', async () => {
+    runRepoMock.getRun.mockResolvedValue({ id: 'run_1', status: 'succeeded' });
+    runRepoMock.listChildRuns.mockResolvedValue([{ id: 'run_child_1' }]);
+    runRepoMock.listRunSteps.mockResolvedValue([]);
+    const app = await buildApp({ sub: 'user_1', productId: 'lysnrai' });
+
+    const res = await app.inject({ method: 'GET', url: '/api/runs/run_1/tree' });
+    expect(res.statusCode).toBe(200);
+    const body = JSON.parse(res.body);
+    expect(body.run.id).toBe('run_1');
+    expect(body.children).toHaveLength(1);
+  });
+
+  // ── Agent metrics ───────────────────────────────────────
+
+  it('GET /agents/:id/metrics computes agent metrics', async () => {
+    runRepoMock.listRuns.mockResolvedValue([
+      {
+        id: 'run_1',
+        status: 'succeeded',
+        metadata: { agentId: 'agt_1' },
+        output: { tokenUsage: { prompt: 100, completion: 50, total: 150 }, durationMs: 500 },
+        createdAt: '2026-01-01',
+      },
+      {
+        id: 'run_2',
+        status: 'failed',
+        metadata: { agentId: 'agt_1' },
+        createdAt: '2026-01-02',
+      },
+      {
+        id: 'run_3',
+        status: 'succeeded',
+        metadata: { agentId: 'other_agent' },
+        createdAt: '2026-01-02',
+      },
+    ]);
+    const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
+
+    const res = await app.inject({ method: 'GET', url: '/api/agents/agt_1/metrics' });
+    expect(res.statusCode).toBe(200);
+    const body = JSON.parse(res.body);
+    expect(body.totalRuns).toBe(2);
+    expect(body.succeeded).toBe(1);
+    expect(body.failed).toBe(1);
+    expect(body.successRate).toBe(50);
+    expect(body.tokenUsage.total).toBe(150);
+  });
+
+  // ── Tool registry ───────────────────────────────────────
+
+  it('GET /tools lists registered tools', async () => {
+    toolRegistryMock.listTools.mockReturnValue([{ name: 'search', description: 'Search the web' }]);
+    const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
+
+    const res = await app.inject({ method: 'GET', url: '/api/tools' });
+    expect(res.statusCode).toBe(200);
+    expect(JSON.parse(res.body)).toHaveLength(1);
+  });
+
+  it('POST /tools/validate validates tool bindings', async () => {
+    toolRegistryMock.validateToolBindings.mockReturnValue({
+      valid: ['search'],
+      invalid: ['unknown_tool'],
+    });
+    const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
+
+    const res = await app.inject({
+      method: 'POST',
+      url: '/api/tools/validate',
+      payload: { bindings: ['search', 'unknown_tool'] },
+    });
+    expect(res.statusCode).toBe(200);
+    expect(JSON.parse(res.body).invalid).toContain('unknown_tool');
+  });
+
+  // ── Agent scheduling ────────────────────────────────────
+
+  it('POST /agents/:id/schedule creates a scheduled job', async () => {
+    jobRunnerMock.ensureJobDefinitions.mockResolvedValue(undefined);
+    jobRegistryMock.registerJob.mockReturnValue(undefined);
+    const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
+
+    const res = await app.inject({
+      method: 'POST',
+      url: '/api/agents/agt_1/schedule',
+      payload: { cronExpression: '0 */6 * * *', input: { mode: 'daily' } },
+    });
+
+    expect(res.statusCode).toBe(200);
+    expect(JSON.parse(res.body)).toEqual({
+      scheduled: true,
+      jobName: 'agent:agt_1',
+      cronExpression: '0 */6 * * *',
+    });
+    expect(jobRegistryMock.registerJob).toHaveBeenCalledWith('agent:agt_1', expect.any(Function));
+  });
+
+  it('POST /agents/:id/schedule requires cronExpression', async () => {
+    const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' });
+
+    const res = await app.inject({
+      method: 'POST',
+      url: '/api/agents/agt_1/schedule',
+      payload: {},
+    });
+    expect(res.statusCode).toBe(400);
+  });
+});
diff --git a/services/platform-service/src/modules/agents/executor-routes.ts b/services/platform-service/src/modules/agents/executor-routes.ts
new file mode 100644
index 00000000..5eb50779
--- /dev/null
+++ b/services/platform-service/src/modules/agents/executor-routes.ts
@@ -0,0 +1,280 @@
+import { z } from 'zod';
+import type { FastifyInstance } from 'fastify';
+import { BadRequestError, ForbiddenError, NotFoundError } from '../../lib/errors.js';
+import { executeAgent, cancelAgentRun, getActiveAgentRuns, onStepEvent } from './executor.js';
+import { listTools, validateToolBindings } from './tool-registry.js';
+import type { RunDoc } from '../runs/types.js';
+import * as runRepo from '../runs/repository.js';
+
+const ExecuteAgentSchema = z.object({
+  agentId: z.string().min(1),
+  input: z.record(z.unknown()).default({}),
+  parentRunId: z.string().optional(),
+});
+
+const DEFAULT_METRICS_LIMIT = 100;
+const MAX_METRICS_LIMIT = 500;
+
+type JwtRequest = { jwtPayload?: { sub?: string; role?: string; productId?: string } };
+type Access = { userId: string; productId: string };
+
+/**
+ * Resolves the caller identity from the request's JWT payload.
+ * The product falls back to `DEFAULT_PRODUCT_ID`, then to `'lysnrai'`, when the token has none.
+ *
+ * @throws ForbiddenError when the payload is missing or has no `sub`.
+ */
+function requireAuth(req: JwtRequest): Access {
+  const payload = req.jwtPayload;
+  if (!payload?.sub) throw new ForbiddenError('Authentication required');
+  return {
+    userId: payload.sub,
+    productId: payload.productId ?? process.env.DEFAULT_PRODUCT_ID ?? 'lysnrai',
+  };
+}
+
+/**
+ * Same as `requireAuth`, additionally requiring the `admin` or `super_admin` role.
+ *
+ * @throws ForbiddenError when the caller is unauthenticated or has no admin role.
+ */
+function requireAdmin(req: JwtRequest): Access {
+  const access = requireAuth(req);
+  const role = req.jwtPayload?.role;
+  if (!role || !['super_admin', 'admin'].includes(role)) {
+    throw new ForbiddenError('Admin access required');
+  }
+  return access;
+}
+
+/**
+ * Parses the `limit` query value of the metrics endpoint.
+ * Missing, non-numeric and non-positive values fall back to the default; larger values are capped,
+ * so the repository never receives `NaN` or a negative limit.
+ */
+function parseMetricsLimit(raw: string | undefined): number {
+  const parsed = Number.parseInt(raw ?? '', 10);
+  if (!Number.isFinite(parsed) || parsed < 1) return DEFAULT_METRICS_LIMIT;
+  return Math.min(parsed, MAX_METRICS_LIMIT);
+}
+
+/** Registers the agent execution, run inspection, tool registry and scheduling routes on `app`. */
+export async function agentExecutorRoutes(app: FastifyInstance) {
+  // ── Execute agent ─────────────────────────────────────────
+
+  app.post('/agents/execute', async req => {
+    const access = requireAuth(req);
+    const parsed = ExecuteAgentSchema.safeParse(req.body);
+    if (!parsed.success) {
+      throw new BadRequestError(parsed.error.issues.map(i => i.message).join('; '));
+    }
+
+    return executeAgent({
+      agentId: parsed.data.agentId,
+      productId: access.productId,
+      userId: access.userId,
+      input: parsed.data.input,
+      parentRunId: parsed.data.parentRunId,
+      triggeredBy: access.userId,
+    });
+  });
+
+  // ── Cancel run ────────────────────────────────────────────
+
+  app.post('/runs/:id/cancel', async req => {
+    const access = requireAuth(req);
+    const { id } = req.params as { id: string };
+
+    // Try in-memory cancel first (active agent runs)
+    // NOTE(review): this path is keyed by run id only and is not scoped to access.productId,
+    // unlike the DB fallback below — confirm whether cross-product cancellation is acceptable.
+    if (cancelAgentRun(id)) {
+      return { cancelled: true, runId: id };
+    }
+
+    // Fall back to DB update for non-active runs
+    try {
+      await runRepo.updateRun(id, access.productId, {
+        status: 'cancelled',
+        completedAt: new Date().toISOString(),
+        error: 'Cancelled by user',
+      });
+      return { cancelled: true, runId: id };
+    } catch {
+      throw new NotFoundError(`Run '${id}' not found or already completed`);
+    }
+  });
+
+  // ── Active agent runs ─────────────────────────────────────
+
+  app.get('/agents/active-runs', async req => {
+    requireAdmin(req);
+    return { activeRuns: getActiveAgentRuns() };
+  });
+
+  // ── Parent-child run DAG ──────────────────────────────────
+
+  app.get('/runs/:id/children', async req => {
+    const access = requireAuth(req);
+    const { id } = req.params as { id: string };
+    return runRepo.listChildRuns(access.productId, id);
+  });
+
+  app.get('/runs/:id/tree', async req => {
+    const access = requireAuth(req);
+    const { id } = req.params as { id: string };
+
+    // Build a one-level tree: the root run, its steps and its direct children with their steps.
+    // The three lookups are independent, so they are issued together.
+    const [rootRun, children, steps] = await Promise.all([
+      runRepo.getRun(id, access.productId),
+      runRepo.listChildRuns(access.productId, id),
+      runRepo.listRunSteps(access.productId, id),
+    ]);
+
+    return {
+      run: rootRun,
+      steps,
+      children: await Promise.all(
+        children.map(async (child: RunDoc) => ({
+          run: child,
+          steps: await runRepo.listRunSteps(access.productId, child.id),
+        }))
+      ),
+    };
+  });
+
+  // ── Agent metrics ─────────────────────────────────────────
+
+  app.get('/agents/:id/metrics', async req => {
+    const access = requireAdmin(req);
+    const { id } = req.params as { id: string };
+    const query = req.query as { since?: string; limit?: string };
+
+    // The limit applies to the product-wide list of agent runs, before the per-agent filter below.
+    const runs = await runRepo.listRuns(access.productId, {
+      kind: 'agent',
+      limit: parseMetricsLimit(query.limit),
+    });
+
+    // Filter by agentId (stored in metadata)
+    const agentRuns = runs.filter(
+      r => r.metadata && (r.metadata as Record<string, unknown>).agentId === id
+    );
+
+    // Filter by since if provided
+    const since = query.since;
+    const filteredRuns = since ? agentRuns.filter(r => r.createdAt >= since) : agentRuns;
+
+    const succeeded = filteredRuns.filter(r => r.status === 'succeeded');
+    const failed = filteredRuns.filter(r => r.status === 'failed');
+    const cancelled = filteredRuns.filter(r => r.status === 'cancelled');
+
+    // Compute token usage from output metadata
+    let totalTokens = 0;
+    let totalPromptTokens = 0;
+    let totalCompletionTokens = 0;
+    for (const run of succeeded) {
+      const output = run.output as Record<string, unknown> | undefined;
+      if (output?.tokenUsage) {
+        const usage = output.tokenUsage as { prompt?: number; completion?: number; total?: number };
+        totalPromptTokens += usage.prompt ?? 0;
+        totalCompletionTokens += usage.completion ?? 0;
+        totalTokens += usage.total ?? 0;
+      }
+    }
+
+    // Compute average duration over succeeded runs that recorded a positive duration
+    const durations = succeeded
+      .map(r => {
+        const out = r.output as Record<string, unknown> | undefined;
+        return (out?.durationMs as number | undefined) ?? 0;
+      })
+      .filter(d => d > 0);
+
+    const avgDurationMs =
+      durations.length > 0
+        ? Math.round(durations.reduce((a, b) => a + b, 0) / durations.length)
+        : 0;
+
+    return {
+      agentId: id,
+      totalRuns: filteredRuns.length,
+      succeeded: succeeded.length,
+      failed: failed.length,
+      cancelled: cancelled.length,
+      successRate:
+        filteredRuns.length > 0 ? Math.round((succeeded.length / filteredRuns.length) * 100) : 0,
+      avgDurationMs,
+      tokenUsage: {
+        prompt: totalPromptTokens,
+        completion: totalCompletionTokens,
+        total: totalTokens,
+      },
+    };
+  });
+
+  // ── Run step streaming (SSE) ──────────────────────────────
+
+  app.get('/runs/:id/stream', async (req, reply) => {
+    requireAuth(req);
+    const { id } = req.params as { id: string };
+
+    // Take over the raw response so Fastify does not send its own reply once this handler
+    // returns; this replaces parking the handler on a promise that never settles.
+    reply.hijack();
+    reply.raw.writeHead(200, {
+      'Content-Type': 'text/event-stream',
+      'Cache-Control': 'no-cache',
+      Connection: 'keep-alive',
+    });
+
+    const unsubscribe = onStepEvent(id, event => {
+      reply.raw.write(`data: ${JSON.stringify(event)}\n\n`);
+    });
+
+    // Send initial keepalive
+    reply.raw.write(`data: ${JSON.stringify({ type: 'connected', runId: id })}\n\n`);
+
+    // The stream stays open until the client disconnects; then drop the listener and end it.
+    req.raw.on('close', () => {
+      unsubscribe();
+      reply.raw.end();
+    });
+  });
+
+  // ── Tool registry ─────────────────────────────────────────
+
+  app.get('/tools', async req => {
+    requireAdmin(req);
+    return listTools();
+  });
+
+  app.post('/tools/validate', async req => {
+    requireAdmin(req);
+    // The body may be absent entirely, so read it defensively instead of dereferencing it.
+    const bindings = (req.body as { bindings?: unknown } | null | undefined)?.bindings;
+    if (!Array.isArray(bindings) || bindings.some(b => typeof b !== 'string')) {
+      throw new BadRequestError('bindings array is required');
+    }
+    return validateToolBindings(bindings as string[]);
+  });
+
+  // ── Agent scheduling ──────────────────────────────────────
+
+  app.post('/agents/:id/schedule', async req => {
+    const access = requireAdmin(req);
+    const { id } = req.params as { id: string };
+    // A missing body is treated like an empty one so it yields a 400 rather than a TypeError.
+    const body = (req.body ?? {}) as {
+      cronExpression?: string;
+      input?: Record<string, unknown>;
+      enabled?: boolean;
+    };
+    const { cronExpression } = body;
+
+    if (typeof cronExpression !== 'string' || cronExpression.length === 0) {
+      throw new BadRequestError('cronExpression is required');
+    }
+
+    // Register as a job definition
+    const { ensureJobDefinitions } = await import('../jobs/runner.js');
+    const { registerJob } = await import('../jobs/registry.js');
+
+    const jobName = `agent:${id}`;
+    registerJob(jobName, async ctx => {
+      const result = await executeAgent({
+        agentId: id,
+        productId: access.productId,
+        // NOTE(review): a product id is passed as the user id here — confirm this is the
+        // intended identity for scheduler-triggered runs.
+        userId: ctx.productId, // system-triggered
+        input: body.input ?? {},
+        triggeredBy: 'scheduler',
+      });
+      return {
+        success: result.status === 'succeeded',
+        message: result.status,
+        metrics: {
+          tokenUsage: result.tokenUsage,
+          durationMs: result.durationMs,
+        },
+      };
+    });
+
+    await ensureJobDefinitions([
+      {
+        name: jobName,
+        cron: cronExpression,
+        description: `Scheduled execution of agent ${id}`,
+      },
+    ]);
+
+    return { scheduled: true, jobName, cronExpression };
+  });
+}
diff --git a/services/platform-service/src/modules/agents/executor.ts b/services/platform-service/src/modules/agents/executor.ts
new file mode 100644
index 00000000..3a2a83be
--- /dev/null
+++ b/services/platform-service/src/modules/agents/executor.ts
@@ -0,0 +1,357 @@
+import { randomUUID } from 'node:crypto';
+import * as runTracker from '../runs/tracker.js';
+import * as agentRepo from './repository.js';
+import {
+  getToolsForBindings,
+  type ToolExecutionContext,
+  type ToolExecutionResult,
+} from './tool-registry.js';
+import { bus } from '../../lib/event-bus.js';
+
+// ── Types ────────────────────────────────────────────────────
+
+export interface ExecuteAgentInput {
+  agentId: string;
+  productId: string;
+  userId: string;
+  input: Record<string, unknown>;
+  parentRunId?: string;
+  triggeredBy?: string;
+}
+
+export interface ExecuteAgentResult {
+  runId: string;
+  status: 'succeeded' | 'failed' | 'cancelled' | 'review_required';
+  output?: Record<string, unknown>;
+  error?: string;
+  tokenUsage: { prompt: number; completion: number; total: number };
+  stepsCompleted: number;
+  durationMs: number;
+}
+
+export interface StepEvent {
+  runId: string;
+  stepName: string;
+  status: string;
+  output?: unknown;
+  error?: string;
+  tokenUsage?: { prompt: number; completion: number; total: number };
+}
+
+type StepListener = (event: StepEvent) => void;
+
+// ── Active Runs ──────────────────────────────────────────────
+
+const activeRuns = new Map<string, AbortController>();
+const stepListeners = new Map<string, Set<StepListener>>();
+
+/** Lists the ids of the runs that currently have an abort controller registered in this process. */
+export function getActiveAgentRuns(): string[] {
+  return Array.from(activeRuns.keys());
+}
+
+/**
+ * Signals cancellation to an in-flight run.
+ *
+ * @returns `true` when the run is tracked in this process and its controller was aborted,
+ *   `false` when no such run is tracked here.
+ */
+export function cancelAgentRun(runId: string): boolean {
+  const controller = activeRuns.get(runId);
+  if (!controller) return false;
+  controller.abort();
+  return true;
+}
+
+/**
+ * Subscribes `listener` to the step events emitted for `runId`.
+ *
+ * @returns An unsubscribe function; calling it more than once is harmless.
+ */
+export function onStepEvent(runId: string, listener: StepListener): () => void {
+  const listeners = stepListeners.get(runId) ?? new Set<StepListener>();
+  listeners.add(listener);
+  stepListeners.set(runId, listeners);
+  return () => {
+    listeners.delete(listener);
+    // Drop the map entry only while it still points at this set. Once the last listener has
+    // left, a later subscriber gets a fresh set, and a repeated unsubscribe must not remove it.
+    if (listeners.size === 0 && stepListeners.get(runId) === listeners) {
+      stepListeners.delete(runId);
+    }
+  };
+}
+
+/** Delivers `event` to every listener of its run; a throwing listener does not affect the others. */
+function emitStepEvent(event: StepEvent): void {
+  const listeners = stepListeners.get(event.runId);
+  if (listeners) {
+    for (const listener of listeners) {
+      try {
+        listener(event);
+      } catch {
+        // best-effort
+      }
+    }
+  }
+}
+
+// ── Executor ─────────────────────────────────────────────────
+
+/**
+ * Runs the published version of an agent through prompt assembly, tool execution and finalize,
+ * recording the run and its steps through the run tracker and emitting step events on the way.
+ *
+ * Failures and cancellations inside the pipeline are reported through the returned `status`
+ * (`failed` / `cancelled`) instead of being rethrown. A bound tool flagged `requiresReview`
+ * stops the run early with `review_required`.
+ *
+ * @param input Agent, product and user identifiers plus the agent input payload.
+ * @returns The run id, final status, aggregated token usage, completed step count and duration.
+ */
+export async function executeAgent(input: ExecuteAgentInput): Promise<ExecuteAgentResult> {
+  const runId = `run_${randomUUID()}`;
+  const startTime = Date.now();
+  const controller = new AbortController();
+  activeRuns.set(runId, controller);
+
+  const tokenUsage = { prompt: 0, completion: 0, total: 0 };
+  let stepsCompleted = 0;
+
+  try {
+    // Get agent + published version
+    const agent = await agentRepo.getAgent(input.agentId, input.productId);
+    const version = await agentRepo.getPublishedVersion(input.agentId);
+    if (!version) {
+      throw new Error(`No published version found for agent '${input.agentId}'`);
+    }
+
+    // Start run tracking
+    await runTracker.startRun({
+      id: runId,
+      productId: input.productId,
+      kind: 'agent',
+      name: agent.name,
+      source: `platform.agents.${agent.key}`,
+      triggeredBy: input.triggeredBy ?? input.userId,
+      parentRunId: input.parentRunId,
+      input: input.input,
+      metadata: {
+        agentId: input.agentId,
+        agentKey: agent.key,
+        versionId: version.id,
+        versionNumber: version.version,
+      },
+    });
+
+    // Resolve tool bindings
+    const boundTools = getToolsForBindings(version.toolBindings);
+    const toolContext: ToolExecutionContext = {
+      runId,
+      productId: input.productId,
+      agentId: input.agentId,
+      userId: input.userId,
+    };
+
+    // Step 1: Prompt assembly
+    if (controller.signal.aborted) throw new Error('Run cancelled');
+    await runTracker.startRunStep({
+      runId,
+      productId: input.productId,
+      stepName: 'prompt_assembly',
+      order: 1,
+      input: { toolCount: boundTools.length },
+    });
+
+    const assembledPrompt = assemblePrompt(
+      version,
+      input.input,
+      boundTools.map(t => t.definition)
+    );
+    await runTracker.completeRunStep(runId, input.productId, 'prompt_assembly', {
+      promptLength: assembledPrompt.length,
+      toolsBound: boundTools.map(t => t.definition.name),
+    });
+    stepsCompleted++;
+    emitStepEvent({ runId, stepName: 'prompt_assembly', status: 'succeeded' });
+
+    // Step 2: Tool execution (if tools are bound)
+    if (boundTools.length > 0 && !controller.signal.aborted) {
+      await runTracker.startRunStep({
+        runId,
+        productId: input.productId,
+        stepName: 'tool_execution',
+        order: 2,
+      });
+
+      const toolResults: Array<{ tool: string; result: ToolExecutionResult }> = [];
+      for (const tool of boundTools) {
+        if (controller.signal.aborted) break;
+
+        // Check if tool requires review
+        if (tool.definition.requiresReview) {
+          emitStepEvent({
+            runId,
+            stepName: 'tool_execution',
+            status: 'review_required',
+            output: { tool: tool.definition.name },
+          });
+
+          // Mark run as needing review — caller should handle creating a review item
+          await runTracker.failRunStep(
+            runId,
+            input.productId,
+            'tool_execution',
+            `Tool '${tool.definition.name}' requires human review`
+          );
+          await runTracker.failRun(
+            runId,
+            input.productId,
+            `Paused: tool '${tool.definition.name}' requires review`,
+            { status: 'review_required' }
+          );
+
+          return {
+            runId,
+            status: 'review_required',
+            error: `Tool '${tool.definition.name}' requires human review`,
+            tokenUsage,
+            stepsCompleted,
+            durationMs: Date.now() - startTime,
+          };
+        }
+
+        try {
+          const result = await tool.execute(input.input, toolContext);
+          toolResults.push({ tool: tool.definition.name, result });
+          if (result.tokenUsage) {
+            tokenUsage.prompt += result.tokenUsage.prompt;
+            tokenUsage.completion += result.tokenUsage.completion;
+            tokenUsage.total += result.tokenUsage.total;
+          }
+        } catch (err: unknown) {
+          // A throwing tool is recorded as a failed result so the remaining tools still run.
+          toolResults.push({
+            tool: tool.definition.name,
+            result: {
+              success: false,
+              error: err instanceof Error ? err.message : String(err),
+            },
+          });
+        }
+      }
+
+      // Cancelled while tools were running: close the step as failed and hand over to the
+      // catch block, rather than counting the step and announcing it as succeeded.
+      if (controller.signal.aborted) {
+        await runTracker.failRunStep(runId, input.productId, 'tool_execution', 'Run cancelled');
+        throw new Error('Run cancelled');
+      }
+
+      const failedTools = toolResults.filter(r => !r.result.success);
+      if (failedTools.length > 0) {
+        await runTracker.failRunStep(
+          runId,
+          input.productId,
+          'tool_execution',
+          `${failedTools.length} tool(s) failed: ${failedTools.map(t => t.tool).join(', ')}`
+        );
+      } else {
+        await runTracker.completeRunStep(runId, input.productId, 'tool_execution', {
+          toolResults: toolResults.map(r => ({
+            tool: r.tool,
+            success: r.result.success,
+          })),
+        });
+      }
+
+      stepsCompleted++;
+      emitStepEvent({
+        runId,
+        stepName: 'tool_execution',
+        status: failedTools.length > 0 ? 'partial_failure' : 'succeeded',
+        tokenUsage: { ...tokenUsage },
+      });
+    }
+
+    // Step 3: Finalize
+    if (controller.signal.aborted) throw new Error('Run cancelled');
+    await runTracker.startRunStep({
+      runId,
+      productId: input.productId,
+      stepName: 'finalize',
+      order: 3,
+    });
+
+    const output = {
+      agentKey: agent.key,
+      versionNumber: version.version,
+      tokenUsage,
+      // Counts the finalize step itself, which is only marked complete just below.
+      stepsCompleted: stepsCompleted + 1,
+    };
+
+    await runTracker.completeRunStep(runId, input.productId, 'finalize', output);
+    stepsCompleted++;
+    // Listeners get a copy, matching the tool_execution event above.
+    emitStepEvent({
+      runId,
+      stepName: 'finalize',
+      status: 'succeeded',
+      tokenUsage: { ...tokenUsage },
+    });
+
+    // Complete the run
+    await runTracker.completeRun(runId, input.productId, {
+      ...output,
+      durationMs: Date.now() - startTime,
+    });
+
+    // Emit event for downstream consumption
+    try {
+      await bus.emit('job.completed', {
+        jobName: `agent:${agent.key}`,
+        runId,
+        durationMs: Date.now() - startTime,
+        productId: input.productId,
+      });
+    } catch {
+      // best-effort
+    }
+
+    return {
+      runId,
+      status: 'succeeded',
+      output,
+      tokenUsage,
+      stepsCompleted,
+      durationMs: Date.now() - startTime,
+    };
+  } catch (err: unknown) {
+    const errorMsg = err instanceof Error ? err.message : String(err);
+    const isCancelled = controller.signal.aborted;
+
+    try {
+      if (isCancelled) {
+        await runTracker.failRun(runId, input.productId, 'Run cancelled by user', {
+          cancelledAt: new Date().toISOString(),
+        });
+      } else {
+        await runTracker.failRun(runId, input.productId, errorMsg);
+      }
+    } catch {
+      // best-effort
+    }
+
+    try {
+      await bus.emit('job.failed', {
+        jobName: `agent:${input.agentId}`,
+        runId,
+        error: errorMsg,
+        productId: input.productId,
+      });
+    } catch {
+      // best-effort
+    }
+
+    return {
+      runId,
+      status: isCancelled ? 'cancelled' : 'failed',
+      error: errorMsg,
+      tokenUsage,
+      stepsCompleted,
+      durationMs: Date.now() - startTime,
+    };
+  } finally {
+    // Single cleanup point: whichever way the run ends, it stops being tracked as active.
+    activeRuns.delete(runId);
+  }
+}
+
+// ── Prompt Assembly ──────────────────────────────────────────
+
+/**
+ * Builds the prompt text from the version's system instructions and prompt, the bound tools and
+ * the run input. Sections are joined by a blank line; the system, tools and input sections are
+ * omitted when empty.
+ */
+function assemblePrompt(
+  version: {
+    prompt: string;
+    systemInstructions?: string;
+    modelPolicy?: { maxTokens?: number } | null;
+  },
+  input: Record<string, unknown>,
+  tools: Array<{ name: string; description: string }>
+): string {
+  const parts: string[] = [];
+
+  if (version.systemInstructions) {
+    parts.push(`[System]\n${version.systemInstructions}`);
+  }
+
+  parts.push(`[Prompt]\n${version.prompt}`);
+
+  if (tools.length > 0) {
+    const toolList = tools.map(t => `- ${t.name}: ${t.description}`).join('\n');
+    parts.push(`[Available Tools]\n${toolList}`);
+  }
+
+  if (Object.keys(input).length > 0) {
+    parts.push(`[Input]\n${JSON.stringify(input, null, 2)}`);
+  }
+
+  return parts.join('\n\n');
+}
diff --git a/services/platform-service/src/modules/agents/tool-registry.ts b/services/platform-service/src/modules/agents/tool-registry.ts
new file mode 100644
index 00000000..00e5b8a8
--- /dev/null
+++ b/services/platform-service/src/modules/agents/tool-registry.ts
@@ -0,0 +1,87 @@
+import { z } from 'zod';
+
+// ── Tool Definition ─────────────────────────────────────────
+// Tools are registered globally and bound to agent versions via toolBindings[].
+
+export const ToolParameterSchema = z.object({
+  name: z.string().min(1),
+  type: z.enum(['string', 'number', 'boolean', 'object', 'array']),
+  description: z.string().optional(),
+  required: z.boolean().default(false),
+});
+
+export const ToolDefinitionSchema = z.object({
+  name: z.string().min(1),
+  description: z.string().min(1),
+  parameters: z.array(ToolParameterSchema).default([]),
+  requiresReview: z.boolean().default(false),
+  riskLevel: z.enum(['low', 'medium', 'high', 'critical']).default('low'),
+});
+
+export type ToolDefinition = z.infer<typeof ToolDefinitionSchema>;
+
+export type ToolExecuteFn = (
+  params: Record<string, unknown>,
+  context: ToolExecutionContext
+) => Promise<ToolExecutionResult>;
+
+export interface ToolExecutionContext {
+  runId: string;
+  productId: string;
+  agentId: string;
+  userId: string;
+}
+
+export interface ToolExecutionResult {
+  success: boolean;
+  output?: unknown;
+  error?: string;
+  tokenUsage?: { prompt: number; completion: number; total: number };
+}
+
+interface RegisteredTool {
+  definition: ToolDefinition;
+  execute: ToolExecuteFn;
+}
+
+// ── Global Tool Registry ────────────────────────────────────
+
+const tools = new Map<string, RegisteredTool>();
+
+/** Registers a tool under `definition.name`; a later registration with the same name replaces it. */
+export function registerTool(definition: ToolDefinition, execute: ToolExecuteFn): void {
+  tools.set(definition.name, { definition, execute });
+}
+
+/** Looks up a registered tool by name, or `undefined` when the name is unknown. */
+export function getTool(name: string): RegisteredTool | undefined {
+  return tools.get(name);
+}
+
+/** Returns the definitions of all registered tools, without their execute functions. */
+export function listTools(): ToolDefinition[] {
+  return Array.from(tools.values()).map(t => t.definition);
+}
+
+/**
+ * Resolves binding names to registered tools, in binding order.
+ * Names that are not registered are dropped silently; use `validateToolBindings` to detect them.
+ */
+export function getToolsForBindings(bindings: string[]): RegisteredTool[] {
+  return bindings.map(name => tools.get(name)).filter((t): t is RegisteredTool => t !== undefined);
+}
+
+/** Removes every registered tool. */
+export function clearTools(): void {
+  tools.clear();
+}
+
+// ── Validate tool bindings ──────────────────────────────────
+
+/** Splits `bindings` into names that are registered (`valid`) and names that are not (`invalid`). */
+export function validateToolBindings(bindings: string[]): {
+  valid: string[];
+  invalid: string[];
+} {
+  const valid: string[] = [];
+  const invalid: string[] = [];
+  for (const name of bindings) {
+    if (tools.has(name)) {
+      valid.push(name);
+    } else {
+      invalid.push(name);
+    }
+  }
+  return { valid, invalid };
+}
diff --git a/services/platform-service/src/modules/runs/repository.ts b/services/platform-service/src/modules/runs/repository.ts
index 2ef0d0c7..72869f76 100644
--- a/services/platform-service/src/modules/runs/repository.ts
+++ b/services/platform-service/src/modules/runs/repository.ts
@@ -63,6 +63,14 @@ export async function updateRunStep(
   return updated;
 }
 
+export async function listChildRuns(productId: string, parentRunId: string): Promise<RunDoc[]> {
+  return runsCollection().findMany({
+    filter: { productId, parentRunId },
+    sort: { createdAt: -1 },
+    limit: 100,
+  });
+}
+
 export async function listRunSteps(productId: string, runId: string): Promise {
   return stepsCollection().findMany({
     filter: {
diff --git a/services/platform-service/src/server.ts b/services/platform-service/src/server.ts
index 311001da..de363999 100644
--- a/services/platform-service/src/server.ts
+++ b/services/platform-service/src/server.ts
@@ -108,6 +108,7 @@ import { registerDeliverySubscribers } from './modules/delivery/subscribers.js';
 import { verifyToken } from './modules/auth/jwt.js';
 import { registerOptionalApiKeyContext } from './lib/api-key-auth.js';
 import { eventSubscriptionRoutes } from './modules/event-subscriptions/routes.js';
+import { agentExecutorRoutes } from './modules/agents/executor-routes.js';
 import { startEventBus, stopEventBus } from './lib/event-bus.js';
 import { wireDispatcherToBus } from './lib/event-dispatcher.js';
 
@@ -265,6 +266,8 @@ await app.register(apiVersioningRoutes, { prefix: '/api' });
 
 // Event subscriptions + DLQ + replay
 await app.register(eventSubscriptionRoutes, { prefix: '/api' });
+// Agent executor + tool registry + scheduling + metrics
+await app.register(agentExecutorRoutes, { prefix: '/api' });
 
 // Register event bus subscribers
 registerDiagnosticsSubscribers(app.log);