From fe36296196be0d5a70946fa69950d2998f93ab72 Mon Sep 17 00:00:00 2001 From: Saravana Achu Mac Date: Sat, 4 Apr 2026 01:14:37 -0700 Subject: [PATCH] feat(runtime): add platform runtime projection api --- .../ECOSYSTEM_IMPLEMENTATION_TRACKER.md | 13 +- ...PHASE4_PERSONAL_TIMELINE_EXECUTION_PLAN.md | 2 +- ...5_AGENT_RUNTIME_CONTRACT_EXECUTION_PLAN.md | 20 +- .../src/modules/agent-runtime/routes.test.ts | 131 ++++++++++++ .../src/modules/agent-runtime/routes.ts | 191 ++++++++++++++++++ services/platform-service/src/server.test.ts | 1 + services/platform-service/src/server.ts | 2 + 7 files changed, 355 insertions(+), 5 deletions(-) create mode 100644 services/platform-service/src/modules/agent-runtime/routes.test.ts create mode 100644 services/platform-service/src/modules/agent-runtime/routes.ts diff --git a/docs/ecosystem/ECOSYSTEM_IMPLEMENTATION_TRACKER.md b/docs/ecosystem/ECOSYSTEM_IMPLEMENTATION_TRACKER.md index 3676b052..3a3cb690 100644 --- a/docs/ecosystem/ECOSYSTEM_IMPLEMENTATION_TRACKER.md +++ b/docs/ecosystem/ECOSYSTEM_IMPLEMENTATION_TRACKER.md @@ -200,7 +200,7 @@ These should be resolved before claiming the ecosystem docs are fully implementa - `3f2482b` - [x] add first hosted timeline ingest/query API in `platform-service` Commits: - - `TBD` + - `e377351` Status note: - platform-service now persists timeline items in `timeline_items` - `POST /api/timeline/ingest` accepts canonical ecosystem events and stores unified timeline items @@ -219,7 +219,16 @@ These should be resolved before claiming the ecosystem docs are fully implementa - [x] validate FlowMonk-style scheduled execution against the shared runtime contract Commits: - `3f2482b` -- [ ] wire first product implementations to emit the shared runtime objects +- [x] add first runtime projection + dispatch validation API in `platform-service` + Commits: + - `TBD` + Status note: + - `GET /api/agent-runtime/sessions` projects platform sessions into the shared `AgentSession` shape + - `GET /api/agent-runtime/runs` projects platform runs into the shared `AgentRun` shape + - `POST /api/agent-runtime/dispatch/validate` validates `AgentDispatchRequest` payloads against the canonical schema +- [ ] wire first product implementations to emit the shared runtime objects directly from Cowork and FlowMonk + Status note: + - FlowMonk runtime-emitter implementation work was started, but this clone cannot currently verify it because the repo depends on a local npm registry at `http://localhost:3300` and backend dependencies are not installed --- diff --git a/docs/ecosystem/PHASE4_PERSONAL_TIMELINE_EXECUTION_PLAN.md b/docs/ecosystem/PHASE4_PERSONAL_TIMELINE_EXECUTION_PLAN.md index f1f1ede2..21e18ee0 100644 --- a/docs/ecosystem/PHASE4_PERSONAL_TIMELINE_EXECUTION_PLAN.md +++ b/docs/ecosystem/PHASE4_PERSONAL_TIMELINE_EXECUTION_PLAN.md @@ -86,4 +86,4 @@ Observed baseline: ## 6. Commits - `3f2482b` timeline contract and aggregator baseline -- `TBD` platform-service timeline ingest + query baseline +- `e377351` platform-service timeline ingest + query baseline diff --git a/docs/ecosystem/PHASE5_AGENT_RUNTIME_CONTRACT_EXECUTION_PLAN.md b/docs/ecosystem/PHASE5_AGENT_RUNTIME_CONTRACT_EXECUTION_PLAN.md index cad61375..a674f613 100644 --- a/docs/ecosystem/PHASE5_AGENT_RUNTIME_CONTRACT_EXECUTION_PLAN.md +++ b/docs/ecosystem/PHASE5_AGENT_RUNTIME_CONTRACT_EXECUTION_PLAN.md @@ -1,7 +1,7 @@ # Phase 5 Execution Plan > **Flow:** Shared agent runtime contract baseline -> **Status:** Implemented baseline +> **Status:** Baseline implemented, platform integration in progress > **Owner:** `learning_ai_common_plat` > **Purpose:** Turn the runtime contract draft into concrete schemas for sessions, tasks, todos, runs, approvals, dispatch, and action logs. @@ -15,6 +15,7 @@ Phase 5 defines and implements: 2. dispatch metadata contract 3. approval checkpoint contract 4. action-log contract +5. the first product-facing runtime projection and dispatch-validation API This phase establishes the contract layer needed before cross-product runtime adoption in Cowork, FlowMonk, and future agent surfaces. @@ -25,6 +26,8 @@ This phase establishes the contract layer needed before cross-product runtime ad - `packages/events/src/agent-runtime.ts` - `packages/events/src/agent-runtime.test.ts` - `packages/events/src/index.ts` +- `services/platform-service/src/modules/agent-runtime/routes.ts` +- `services/platform-service/src/modules/agent-runtime/routes.test.ts` --- @@ -44,12 +47,17 @@ This phase establishes the contract layer needed before cross-product runtime ad ## 4. Verification - `cd learning_ai_common_plat/packages/events && pnpm exec vitest run src/agent-runtime.test.ts` +- `cd learning_ai_common_plat && pnpm --filter @lysnrai/platform-service exec vitest run src/modules/agent-runtime/routes.test.ts src/server.test.ts` +- `cd learning_ai_common_plat && pnpm --filter @lysnrai/platform-service exec tsc --noEmit` Observed baseline: - Cowork-style remote dispatch parses against the shared runtime contract - FlowMonk-style scheduled execution parses against the same contract - approval and action-log primitives now have one canonical schema surface +- platform-service now exposes `GET /api/agent-runtime/sessions` as a shared runtime-session projection +- platform-service now exposes `GET /api/agent-runtime/runs` as a shared runtime-run projection +- platform-service now exposes `POST /api/agent-runtime/dispatch/validate` against the canonical dispatch schema --- @@ -60,10 +68,18 @@ Observed baseline: - [x] define session, task, todo, run, and approval schemas - [x] define action-log contract - [x] verify Cowork-style and FlowMonk-style runtime examples -- [ ] wire first product implementations to emit the shared runtime objects +- [x] add first product-facing runtime projection and dispatch validation API in platform-service +- [ ] wire first product implementations to emit the shared runtime objects directly from Cowork and FlowMonk --- ## 6. Commits - `3f2482b` runtime contract schema baseline +- `TBD` platform-service runtime projection + dispatch validation + +## 7. Remaining Gaps + +- Cowork still needs to emit the shared runtime objects directly from its own task orchestration flow. +- FlowMonk runtime-emitter code was started locally, but verification in this clone is blocked because the repo depends on a local npm registry at `http://localhost:3300` and its backend `node_modules` are missing. +- run-vs-session semantics for queued work still need a stricter mapping than the current projection fallback. diff --git a/services/platform-service/src/modules/agent-runtime/routes.test.ts b/services/platform-service/src/modules/agent-runtime/routes.test.ts new file mode 100644 index 00000000..4952e847 --- /dev/null +++ b/services/platform-service/src/modules/agent-runtime/routes.test.ts @@ -0,0 +1,131 @@ +import Fastify from 'fastify'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; + +const runRepoMock = { + listRuns: vi.fn(), +}; + +const sessionRepoMock = { + listAllUserSessions: vi.fn(), +}; + +vi.mock('../runs/repository.js', () => runRepoMock); +vi.mock('../sessions/repository.js', () => sessionRepoMock); + +async function buildApp(payload?: { sub: string; productId: string; role?: string }) { + const { agentRuntimeRoutes } = await import('./routes.js'); + const app = Fastify({ logger: false }); + if (payload) { + app.addHook('onRequest', async req => { + req.jwtPayload = payload; + }); + } + await app.register(agentRuntimeRoutes, { prefix: '/api' }); + return app; +} + +describe('agentRuntimeRoutes', () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + it('GET /agent-runtime/sessions projects session docs into the shared runtime shape', async () => { + sessionRepoMock.listAllUserSessions.mockResolvedValue([ + { + id: 'sess_1', + productId: 'lysnrai', + userId: 'user_1', + platform: 'web', + ipAddress: '127.0.0.1', + userAgent: 'test', + lastActiveAt: '2026-04-04T18:00:00.000Z', + createdAt: '2026-04-04T17:00:00.000Z', + expiresAt: '2099-04-04T17:00:00.000Z', + }, + ]); + const app = await buildApp({ sub: 'user_1', productId: 'lysnrai' }); + + const res = await app.inject({ + method: 'GET', + url: '/api/agent-runtime/sessions', + }); + + expect(res.statusCode).toBe(200); + const body = JSON.parse(res.body); + expect(body.sessions[0]).toMatchObject({ + sessionId: 'sess_1', + productId: 'lysnrai', + userId: 'user_1', + status: 'active', + }); + expect(body.sessions[0].dispatchContext.originSurface).toBe('web'); + }); + + it('GET /agent-runtime/runs projects run docs into the shared runtime shape', async () => { + runRepoMock.listRuns.mockResolvedValue([ + { + id: 'run_1', + productId: 'lysnrai', + kind: 'agent', + name: 'runtime-check', + source: 'dispatch', + status: 'succeeded', + createdAt: '2026-04-04T18:00:00.000Z', + updatedAt: '2026-04-04T18:10:00.000Z', + completedAt: '2026-04-04T18:10:00.000Z', + metadata: { + sessionId: 'sess_1', + correlationId: 'corr_1', + }, + }, + ]); + const app = await buildApp({ sub: 'admin_1', productId: 'lysnrai', role: 'admin' }); + + const res = await app.inject({ + method: 'GET', + url: '/api/agent-runtime/runs?limit=10', + }); + + expect(res.statusCode).toBe(200); + const body = JSON.parse(res.body); + expect(body.runs[0]).toMatchObject({ + runId: 'run_1', + sessionId: 'sess_1', + productId: 'lysnrai', + status: 'completed', + correlationId: 'corr_1', + }); + }); + + it('POST /agent-runtime/dispatch/validate validates the shared dispatch contract', async () => { + const app = await buildApp({ sub: 'user_1', productId: 'lysnrai' }); + + const res = await app.inject({ + method: 'POST', + url: '/api/agent-runtime/dispatch/validate', + payload: { + dispatchId: 'dispatch_1', + targetProductId: 'lysnrai', + targetExecutor: 'generic-agent', + userId: 'user_1', + title: 'Review latest transcript', + intent: 'Summarize and prepare a memory candidate.', + artifactRefs: ['art_1'], + memoryRefs: [], + dispatchContext: { + originSurface: 'web', + originProductId: 'lysnrai', + dispatchMode: 'interactive', + initiatedAt: '2026-04-04T18:00:00.000Z', + }, + }, + }); + + expect(res.statusCode).toBe(200); + expect(JSON.parse(res.body).valid).toBe(true); + }); +}); diff --git a/services/platform-service/src/modules/agent-runtime/routes.ts b/services/platform-service/src/modules/agent-runtime/routes.ts new file mode 100644 index 00000000..db7cbaa4 --- /dev/null +++ b/services/platform-service/src/modules/agent-runtime/routes.ts @@ -0,0 +1,191 @@ +import type { FastifyInstance } from 'fastify'; +import { + AgentDispatchRequestSchema, + AgentRunSchema, + AgentSessionSchema, + type AgentRun, + type AgentSession, +} from '@bytelyst/events'; +import { requireJwtOrApiKey } from '../../lib/api-key-auth.js'; +import { BadRequestError, ForbiddenError } from '../../lib/errors.js'; +import * as runRepo from '../runs/repository.js'; +import * as sessionRepo from '../sessions/repository.js'; + +function mapOriginSurface( + platform: string +): 'browser' | 'mobile' | 'desktop' | 'web' | 'product-api' { + switch (platform) { + case 'ios': + case 'android': + return 'mobile'; + case 'desktop': + return 'desktop'; + case 'web': + return 'web'; + default: + return 'product-api'; + } +} + +function mapSessionStatus(session: { + revokedAt?: string; + expiresAt: string; +}): AgentSession['status'] { + if (session.revokedAt) return 'cancelled'; + if (new Date(session.expiresAt).getTime() < Date.now()) return 'completed'; + return 'active'; +} + +function mapRunStatus(status: string): AgentRun['status'] { + switch (status) { + case 'running': + return 'running'; + case 'succeeded': + return 'completed'; + case 'failed': + return 'failed'; + case 'cancelled': + return 'cancelled'; + case 'queued': + default: + return 'paused'; + } +} + +function toAgentSession(doc: { + id: string; + productId: string; + userId: string; + platform: string; + createdAt: string; + lastActiveAt: string; + expiresAt: string; + revokedAt?: string; +}): AgentSession { + return AgentSessionSchema.parse({ + sessionId: doc.id, + productId: doc.productId, + userId: doc.userId, + status: mapSessionStatus(doc), + startedAt: doc.createdAt, + updatedAt: doc.lastActiveAt, + resumable: !doc.revokedAt, + currentTaskId: null, + memoryRefs: [], + artifactRefs: [], + approvalRefs: [], + dispatchContext: { + originSurface: mapOriginSurface(doc.platform), + originProductId: doc.productId, + dispatchMode: 'interactive', + initiatedAt: doc.createdAt, + }, + }); +} + +function toAgentRun(doc: { + id: string; + productId: string; + status: string; + createdAt: string; + completedAt?: string; + metadata?: Record; +}): AgentRun { + return AgentRunSchema.parse({ + runId: doc.id, + sessionId: + typeof doc.metadata?.sessionId === 'string' && doc.metadata.sessionId.length > 0 + ? doc.metadata.sessionId + : `sessionless_${doc.id}`, + productId: doc.productId, + status: mapRunStatus(doc.status), + startedAt: doc.createdAt, + completedAt: doc.completedAt ?? null, + checkpointArtifactId: + typeof doc.metadata?.checkpointArtifactId === 'string' + ? doc.metadata.checkpointArtifactId + : null, + correlationId: + typeof doc.metadata?.correlationId === 'string' ? doc.metadata.correlationId : null, + }); +} + +export async function agentRuntimeRoutes(app: FastifyInstance) { + app.get('/agent-runtime/sessions', async req => { + const access = await requireJwtOrApiKey(req, { + allowJwt: true, + apiKeyScopes: ['jobs:read'], + apiKeyTokenTypes: ['service_api'], + }); + + const requestedUserId = + typeof (req.query as { userId?: string }).userId === 'string' + ? (req.query as { userId?: string }).userId + : undefined; + + if (access.source === 'jwt') { + const isAdmin = req.jwtPayload?.role === 'admin' || req.jwtPayload?.role === 'super_admin'; + if (!isAdmin && requestedUserId && requestedUserId !== access.actorId) { + throw new ForbiddenError('Non-admin callers can only access their own runtime sessions'); + } + } + + const userId = access.source === 'jwt' ? (requestedUserId ?? access.actorId) : requestedUserId; + if (!userId) { + throw new BadRequestError('userId is required for API key callers'); + } + + const docs = await sessionRepo.listAllUserSessions(userId); + return { + sessions: docs.map(toAgentSession), + count: docs.length, + }; + }); + + app.get('/agent-runtime/runs', async req => { + const access = await requireJwtOrApiKey(req, { + jwtRoles: ['super_admin', 'admin'], + apiKeyScopes: ['jobs:read'], + apiKeyTokenTypes: ['service_api'], + rateLimitKey: 'jobs:read', + }); + + const limitRaw = (req.query as { limit?: string }).limit; + const limit = limitRaw ? Number(limitRaw) : 20; + const docs = await runRepo.listRuns(access.productId, { + limit, + kind: undefined, + status: undefined, + }); + + return { + runs: docs.map(toAgentRun), + count: docs.length, + }; + }); + + app.post('/agent-runtime/dispatch/validate', async req => { + const access = await requireJwtOrApiKey(req, { + allowJwt: true, + apiKeyScopes: ['jobs:write'], + apiKeyTokenTypes: ['service_api'], + rateLimitKey: 'jobs:write', + }); + + const parsed = AgentDispatchRequestSchema.safeParse(req.body); + if (!parsed.success) { + throw new BadRequestError(parsed.error.issues.map(issue => issue.message).join('; ')); + } + + if (parsed.data.targetProductId !== access.productId) { + throw new BadRequestError( + `Dispatch target product '${parsed.data.targetProductId}' must match authenticated product '${access.productId}'` + ); + } + + return { + valid: true, + dispatchRequest: parsed.data, + }; + }); +} diff --git a/services/platform-service/src/server.test.ts b/services/platform-service/src/server.test.ts index 5ef8aa06..75d12f75 100644 --- a/services/platform-service/src/server.test.ts +++ b/services/platform-service/src/server.test.ts @@ -44,6 +44,7 @@ vi.mock('@bytelyst/fastify-core', () => ({ vi.mock('./modules/products/routes.js', () => ({ productRoutes: vi.fn() })); vi.mock('./modules/products/cache.js', () => ({ loadProductCache: loadProductCacheMock })); vi.mock('./modules/auth/routes.js', () => ({ authRoutes: vi.fn() })); +vi.mock('./modules/agent-runtime/routes.js', () => ({ agentRuntimeRoutes: vi.fn() })); vi.mock('./modules/audit/routes.js', () => ({ auditRoutes: vi.fn() })); vi.mock('./modules/notifications/routes.js', () => ({ notificationRoutes: vi.fn() })); vi.mock('./modules/timeline/routes.js', () => ({ timelineRoutes: vi.fn() })); diff --git a/services/platform-service/src/server.ts b/services/platform-service/src/server.ts index 210acbea..899d022d 100644 --- a/services/platform-service/src/server.ts +++ b/services/platform-service/src/server.ts @@ -29,6 +29,7 @@ import { oauthRoutes } from './modules/auth/oauth/routes.js'; import { mfaRoutes } from './modules/auth/mfa/routes.js'; import { passkeyRoutes } from './modules/auth/passkeys/routes.js'; import { deviceRoutes } from './modules/auth/devices/routes.js'; +import { agentRuntimeRoutes } from './modules/agent-runtime/routes.js'; import { loginEventRoutes } from './modules/auth/login-events/routes.js'; import { pushApprovalRoutes } from './modules/auth/push-approvals/routes.js'; import { qrAuthRoutes } from './modules/auth/qr-auth/routes.js'; @@ -167,6 +168,7 @@ await app.register(oauthRoutes, { prefix: '/api' }); await app.register(mfaRoutes, { prefix: '/api' }); await app.register(passkeyRoutes, { prefix: '/api' }); await app.register(deviceRoutes, { prefix: '/api' }); +await app.register(agentRuntimeRoutes, { prefix: '/api' }); await app.register(loginEventRoutes, { prefix: '/api' }); await app.register(pushApprovalRoutes, { prefix: '/api' }); await app.register(qrAuthRoutes, { prefix: '/api' });