feat(cowork-service): add runtime projection routes

This commit is contained in:
Saravana Achu Mac 2026-04-04 01:52:04 -07:00
parent 21a6596434
commit 023826e413
8 changed files with 8996 additions and 2922 deletions

View File

@ -240,11 +240,13 @@ These should be resolved before claiming the ecosystem docs are fully implementa
- the first hosted internal runtime UI supports projected session review, projected run review, and dispatch payload validation
- [ ] wire first product implementations to emit the shared runtime objects directly from Cowork and FlowMonk
Status note:
- Cowork product-native runtime projections are now implemented in cowork-service
- `COMMIT_PENDING` adds `GET /api/agent-runtime/sessions`, `GET /api/agent-runtime/runs`, `GET /api/agent-runtime/approvals`, and `POST /api/agent-runtime/dispatch/validate`
- FlowMonk runtime-emitter implementation work was started, but this clone cannot currently verify it because the repo depends on a local npm registry at `http://localhost:3300` and backend dependencies are not installed
### 6.1 Remaining Direct Runtime TODOs
- Cowork: emit canonical `AgentSession`, `AgentTask`, `AgentRun`, and `AgentApprovalCheckpoint` objects from live task orchestration.
- Cowork: add `AgentTask`/`AgentTodo` direct projections once the product exposes first-class task/todo entities beyond current task-run/session surfaces.
- Cowork: attach canonical event IDs to approval and audit trails so ActionTrail lineage can stop using fallback/null semantics.
- FlowMonk: finish direct runtime-emitter integration once the local npm registry and backend dependencies are available again.
- Platform-service: refine the `queued -> paused` projection fallback once run-vs-session semantics are finalized.

View File

@ -1,7 +1,7 @@
# Phase 5 Execution Plan
> **Flow:** Shared agent runtime contract baseline
> **Status:** Baseline implemented, platform integration and first hosted UI in progress
> **Status:** Baseline implemented, Cowork product integration implemented, FlowMonk follow-up pending
> **Owner:** `learning_ai_common_plat`
> **Purpose:** Turn the runtime contract draft into concrete schemas for sessions, tasks, todos, runs, approvals, dispatch, and action logs.
@ -66,6 +66,12 @@ Observed baseline:
- platform-service now exposes `POST /api/agent-runtime/dispatch/validate` against the canonical dispatch schema
- admin-web now exposes the first hosted internal runtime console at `/agent-runtime`
- the hosted UI supports projected session review, projected run review, and dispatch payload validation
- cowork-service now exposes direct product-native runtime projections at:
- `GET /api/agent-runtime/sessions`
- `GET /api/agent-runtime/runs`
- `GET /api/agent-runtime/approvals`
- `POST /api/agent-runtime/dispatch/validate`
- Cowork projections now emit canonical `AgentSession`, `AgentRun`, and `AgentApprovalCheckpoint` objects from the product backend instead of only relying on platform projections
---
@ -86,9 +92,10 @@ Observed baseline:
- `3f2482b` runtime contract schema baseline
- `fe36296` platform-service runtime projection + dispatch validation
- `71ef2ac` admin-web hosted runtime console
- `COMMIT_PENDING` cowork-service runtime projection routes
## 7. Remaining Gaps
- Cowork still needs to emit the shared runtime objects directly from its own task orchestration flow.
- Cowork now emits shared runtime projections from cowork-service, but Rust-side canonical event IDs are still missing on approval/audit records.
- FlowMonk runtime-emitter code was started locally, but verification in this clone is blocked because the repo depends on a local npm registry at `http://localhost:3300` and its backend `node_modules` are missing.
- run-vs-session semantics for queued work still need a stricter mapping than the current projection fallback.

11435
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff

View File

@ -18,6 +18,7 @@
"@bytelyst/backend-flags": "workspace:*",
"@bytelyst/backend-telemetry": "workspace:*",
"@bytelyst/config": "workspace:*",
"@bytelyst/events": "workspace:*",
"@bytelyst/errors": "workspace:*",
"@bytelyst/fastify-auth": "workspace:*",
"@bytelyst/fastify-core": "workspace:*",

View File

@ -0,0 +1,190 @@
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import Fastify from 'fastify';
import { agentRuntimeRoutes } from './routes.js';
import { setIpcBridge } from '../../lib/ipc-bridge.js';
vi.mock('../../lib/config.js', () => ({
config: {
PLATFORM_SERVICE_URL: 'http://mock-platform:4003',
},
}));
vi.mock('../../lib/product-config.js', () => ({
PRODUCT_ID: 'clawcowork',
productConfig: {
backendPort: 4009,
},
}));
vi.mock('../../lib/request-context.js', () => ({
getUserId: vi.fn(() => 'demo-user'),
}));
const mockFetch = vi.fn();
vi.stubGlobal('fetch', mockFetch);
let app: ReturnType<typeof Fastify>;
const call = vi.fn();
beforeEach(async () => {
setIpcBridge({
isRunning: true,
call,
} as any);
app = Fastify({ logger: false });
app.decorateRequest('jwtPayload', null);
await app.register(agentRuntimeRoutes);
call.mockReset();
mockFetch.mockReset();
});
afterEach(async () => {
setIpcBridge(null);
await app.close();
});
describe('agent runtime routes', () => {
it('projects IPC sessions into shared AgentSession objects', async () => {
call.mockResolvedValue({
result: {
sessions: [
{
id: 'sess-1',
userId: 'demo-user',
currentTaskId: 'task-1',
createdAt: '2026-04-04T08:00:00.000Z',
updatedAt: '2026-04-04T08:05:00.000Z',
},
],
},
});
const res = await app.inject({ method: 'GET', url: '/api/agent-runtime/sessions' });
expect(res.statusCode).toBe(200);
const body = JSON.parse(res.payload);
expect(body.sessions[0]).toMatchObject({
sessionId: 'sess-1',
userId: 'demo-user',
productId: 'clawcowork',
currentTaskId: 'task-1',
status: 'active',
});
});
it('projects IPC tasks into shared AgentRun objects', async () => {
call.mockResolvedValue({
result: {
tasks: [
{
id: 'task-1',
status: 'running',
createdAt: '2026-04-04T08:00:00.000Z',
updatedAt: '2026-04-04T08:10:00.000Z',
sessionId: 'sess-1',
correlationId: 'corr-1',
},
],
},
});
const res = await app.inject({ method: 'GET', url: '/api/agent-runtime/runs' });
expect(res.statusCode).toBe(200);
const body = JSON.parse(res.payload);
expect(body.runs[0]).toMatchObject({
runId: 'task-1',
sessionId: 'sess-1',
productId: 'clawcowork',
status: 'running',
correlationId: 'corr-1',
});
});
it('projects approval audit records into AgentApprovalCheckpoint objects', async () => {
mockFetch
.mockResolvedValueOnce({
ok: true,
json: async () => ({
records: [
{
id: 'audit-1',
action: 'approval_granted',
timestamp: '2026-04-04T09:00:00.000Z',
details: {
taskId: 'task-1',
sessionId: 'sess-1',
tool: 'rm',
resolverSurface: 'desktop',
},
},
],
}),
})
.mockResolvedValueOnce({
ok: true,
json: async () => ({ records: [] }),
});
const res = await app.inject({ method: 'GET', url: '/api/agent-runtime/approvals' });
expect(res.statusCode).toBe(200);
const body = JSON.parse(res.payload);
expect(body.approvals[0]).toMatchObject({
approvalId: 'audit-1',
sessionId: 'sess-1',
runId: 'task-1',
status: 'approved',
resolverSurface: 'desktop',
});
});
it('validates Cowork-targeted dispatch payloads', async () => {
const res = await app.inject({
method: 'POST',
url: '/api/agent-runtime/dispatch/validate',
payload: {
dispatchId: 'dispatch-1',
targetProductId: 'clawcowork',
targetExecutor: 'cowork',
userId: 'demo-user',
title: 'Review workspace changes',
intent: 'Summarize the repo changes and create a patch plan.',
artifactRefs: [],
memoryRefs: [],
dispatchContext: {
originSurface: 'desktop',
originProductId: 'clawcowork',
dispatchMode: 'interactive',
initiatedAt: '2026-04-04T10:00:00.000Z',
},
},
});
expect(res.statusCode).toBe(200);
expect(JSON.parse(res.payload).valid).toBe(true);
});
it('rejects dispatch payloads for other products', async () => {
const res = await app.inject({
method: 'POST',
url: '/api/agent-runtime/dispatch/validate',
payload: {
dispatchId: 'dispatch-1',
targetProductId: 'other-product',
targetExecutor: 'cowork',
userId: 'demo-user',
title: 'Review workspace changes',
intent: 'Summarize the repo changes and create a patch plan.',
artifactRefs: [],
memoryRefs: [],
dispatchContext: {
originSurface: 'desktop',
originProductId: 'clawcowork',
dispatchMode: 'interactive',
initiatedAt: '2026-04-04T10:00:00.000Z',
},
},
});
expect(res.statusCode).toBe(400);
});
});

View File

@ -0,0 +1,270 @@
import type { FastifyInstance, FastifyRequest } from 'fastify';
import {
AgentApprovalCheckpointSchema,
AgentDispatchRequestSchema,
AgentRunSchema,
AgentSessionSchema,
type AgentApprovalCheckpoint,
type AgentRun,
type AgentSession,
} from '@bytelyst/events';
import { BadRequestError } from '@bytelyst/errors';
import { config } from '../../lib/config.js';
import { getIpcBridge } from '../../lib/ipc-bridge.js';
import { PRODUCT_ID } from '../../lib/product-config.js';
import { getUserId } from '../../lib/request-context.js';
function buildAuth(req: FastifyRequest): Record<string, unknown> {
const userId = getUserId(req);
const role = (req.jwtPayload as Record<string, unknown> | undefined)?.role ?? 'user';
return { userId, role, productId: PRODUCT_ID, isPlatformAuth: !!req.jwtPayload };
}
function asIsoString(value: unknown, fallback: string): string {
return typeof value === 'string' && !Number.isNaN(Date.parse(value)) ? value : fallback;
}
function mapTaskStatus(status: unknown): AgentRun['status'] {
switch (status) {
case 'running':
return 'running';
case 'completed':
return 'completed';
case 'failed':
return 'failed';
case 'cancelled':
return 'cancelled';
case 'pending':
default:
return 'paused';
}
}
function toAgentSession(session: Record<string, unknown>, fallbackNow: string): AgentSession {
const sessionId =
typeof session.id === 'string' && session.id.length > 0 ? session.id : `session_${fallbackNow}`;
const createdAt = asIsoString(
session.createdAt ?? session.startedAt ?? session.updatedAt,
fallbackNow
);
const updatedAt = asIsoString(session.updatedAt ?? session.lastActiveAt ?? createdAt, createdAt);
return AgentSessionSchema.parse({
sessionId,
productId: PRODUCT_ID,
userId:
typeof session.userId === 'string' && session.userId.length > 0
? session.userId
: 'unknown-user',
status: session.waitingApproval ? 'waiting-approval' : 'active',
startedAt: createdAt,
updatedAt,
resumable: true,
currentTaskId:
typeof session.currentTaskId === 'string' && session.currentTaskId.length > 0
? session.currentTaskId
: null,
memoryRefs: [],
artifactRefs: [],
approvalRefs: [],
dispatchContext: {
originSurface: 'desktop',
originProductId: PRODUCT_ID,
dispatchMode: 'interactive',
initiatedAt: createdAt,
},
});
}
function toAgentRun(task: Record<string, unknown>, fallbackNow: string): AgentRun {
const runId = typeof task.id === 'string' && task.id.length > 0 ? task.id : `run_${fallbackNow}`;
const startedAt = asIsoString(task.createdAt ?? task.startedAt, fallbackNow);
const status = mapTaskStatus(task.status);
const completedAt =
status === 'completed' || status === 'failed' || status === 'cancelled'
? asIsoString(task.updatedAt ?? task.completedAt, startedAt)
: null;
return AgentRunSchema.parse({
runId,
sessionId:
typeof task.sessionId === 'string' && task.sessionId.length > 0 ? task.sessionId : runId,
productId: PRODUCT_ID,
status,
startedAt,
completedAt,
checkpointArtifactId:
typeof task.checkpointArtifactId === 'string' ? task.checkpointArtifactId : null,
correlationId:
typeof task.correlationId === 'string'
? task.correlationId
: typeof task.id === 'string'
? task.id
: null,
});
}
function mapApprovalStatus(action: unknown): AgentApprovalCheckpoint['status'] {
switch (action) {
case 'approval_granted':
return 'approved';
case 'approval_denied':
return 'denied';
default:
return 'requested';
}
}
function mapResolverSurface(surface: unknown): 'mobile' | 'web' | 'desktop' | null {
switch (surface) {
case 'mobile':
case 'web':
case 'desktop':
return surface;
default:
return null;
}
}
function toApprovalCheckpoint(
record: Record<string, unknown>,
fallbackNow: string
): AgentApprovalCheckpoint {
const details =
record.details && typeof record.details === 'object'
? (record.details as Record<string, unknown>)
: {};
const status = mapApprovalStatus(record.action);
const requestedAt = asIsoString(record.timestamp ?? record.createdAt, fallbackNow);
return AgentApprovalCheckpointSchema.parse({
approvalId:
typeof record.id === 'string' && record.id.length > 0 ? record.id : `approval_${fallbackNow}`,
sessionId:
typeof details.sessionId === 'string' && details.sessionId.length > 0
? details.sessionId
: typeof details.taskId === 'string' && details.taskId.length > 0
? details.taskId
: 'unknown-session',
runId:
typeof details.runId === 'string' && details.runId.length > 0
? details.runId
: typeof details.taskId === 'string' && details.taskId.length > 0
? details.taskId
: 'unknown-run',
actionLabel:
typeof details.inputSummary === 'string' && details.inputSummary.length > 0
? details.inputSummary
: typeof details.tool === 'string' && details.tool.length > 0
? `Approve ${details.tool}`
: 'Approval required',
riskLevel: status === 'denied' ? 'high' : 'medium',
status,
requestedAt,
resolvedAt: status === 'requested' ? null : requestedAt,
resolverSurface: mapResolverSurface(details.resolverSurface),
});
}
async function fetchApprovalRecords(req: FastifyRequest): Promise<Record<string, unknown>[]> {
const params = new URLSearchParams({
days: '30',
limit: '100',
});
const actions = ['approval_granted', 'approval_denied'];
const records: Record<string, unknown>[] = [];
for (const action of actions) {
params.set('action', action);
const res = await fetch(`${config.PLATFORM_SERVICE_URL}/audit?${params.toString()}`, {
headers: {
'x-product-id': PRODUCT_ID,
'x-request-id': req.id,
},
});
if (!res.ok) {
throw new BadRequestError(`Platform returned ${res.status} while fetching approval records`);
}
const body = (await res.json()) as { records?: Record<string, unknown>[] };
records.push(...(Array.isArray(body.records) ? body.records : []));
}
return records;
}
export async function agentRuntimeRoutes(app: FastifyInstance) {
const bridge = getIpcBridge();
app.get('/api/agent-runtime/sessions', async req => {
if (!bridge.isRunning) {
return { sessions: [], count: 0 };
}
const resp = await bridge.call('list_sessions', { auth: buildAuth(req) });
if (resp.error) throw new BadRequestError(resp.error.message);
const raw =
(resp.result as { sessions?: Record<string, unknown>[] } | undefined)?.sessions ?? [];
const now = new Date().toISOString();
const sessions = raw.map(session => toAgentSession(session, now));
return {
sessions,
count: sessions.length,
};
});
app.get('/api/agent-runtime/runs', async req => {
if (!bridge.isRunning) {
return { runs: [], count: 0 };
}
const resp = await bridge.call('list_tasks', { auth: buildAuth(req) });
if (resp.error) throw new BadRequestError(resp.error.message);
const raw = (resp.result as { tasks?: Record<string, unknown>[] } | undefined)?.tasks ?? [];
const now = new Date().toISOString();
const runs = raw.map(task => toAgentRun(task, now));
return {
runs,
count: runs.length,
};
});
app.get('/api/agent-runtime/approvals', async req => {
const records = await fetchApprovalRecords(req);
const now = new Date().toISOString();
const approvals = records.map(record => toApprovalCheckpoint(record, now));
return {
approvals,
count: approvals.length,
};
});
app.post('/api/agent-runtime/dispatch/validate', async req => {
const parsed = AgentDispatchRequestSchema.safeParse(req.body);
if (!parsed.success) {
throw new BadRequestError(parsed.error.issues.map(issue => issue.message).join('; '));
}
if (parsed.data.targetProductId !== PRODUCT_ID) {
throw new BadRequestError(
`Dispatch target product '${parsed.data.targetProductId}' must match '${PRODUCT_ID}'`
);
}
if (parsed.data.targetExecutor !== 'cowork' && parsed.data.targetExecutor !== 'generic-agent') {
throw new BadRequestError('Cowork runtime only accepts cowork or generic-agent dispatch');
}
return {
valid: true,
dispatchRequest: parsed.data,
};
});
}

View File

@ -83,6 +83,7 @@ vi.mock('./modules/sessions/routes.js', () => ({ sessionRoutes: vi.fn() }));
vi.mock('./modules/plugins/routes.js', () => ({ pluginRoutes: vi.fn() }));
vi.mock('./modules/schedule/routes.js', () => ({ scheduleRoutes: vi.fn() }));
vi.mock('./modules/push/routes.js', () => ({ pushRoutes: vi.fn() }));
vi.mock('./modules/agent-runtime/routes.js', () => ({ agentRuntimeRoutes: vi.fn() }));
describe('cowork-service bootstrap', () => {
beforeEach(() => {
@ -102,9 +103,9 @@ describe('cowork-service bootstrap', () => {
expect(opts.version).toBe('0.1.0');
expect(opts.readiness).toBe(true);
// health + task + llm + audit + usage + notifications + extraction + marketplace + sessions + plugins + schedule + push = 12 register calls + 1 JWT
// health + task + llm + audit + usage + notifications + extraction + marketplace + sessions + plugins + schedule + push + agent-runtime = 13 register calls + 1 JWT
expect(registerOptionalJwtContextMock).toHaveBeenCalledOnce();
expect(appMock.register).toHaveBeenCalledTimes(12);
expect(appMock.register).toHaveBeenCalledTimes(13);
expect(startServiceMock).toHaveBeenCalledWith(appMock, { port: 4009, host: '0.0.0.0' });
});
});

View File

@ -34,6 +34,7 @@ import { sessionRoutes } from './modules/sessions/routes.js';
import { pluginRoutes } from './modules/plugins/routes.js';
import { scheduleRoutes } from './modules/schedule/routes.js';
import { pushRoutes } from './modules/push/routes.js';
import { agentRuntimeRoutes } from './modules/agent-runtime/routes.js';
import type { JwtPayload } from './lib/request-context.js';
const jwtSecret = new TextEncoder().encode(config.JWT_SECRET);
@ -72,6 +73,7 @@ await app.register(sessionRoutes);
await app.register(pluginRoutes);
await app.register(scheduleRoutes);
await app.register(pushRoutes);
await app.register(agentRuntimeRoutes);
// Bootstrap endpoint (same pattern as FlowMonk, ActionTrail, etc.)
app.get('/api/bootstrap', async () => ({