diff --git a/services/cowork-service/src/lib/ipc-bridge.test.ts b/services/cowork-service/src/lib/ipc-bridge.test.ts index 5c56bf09..fafed170 100644 --- a/services/cowork-service/src/lib/ipc-bridge.test.ts +++ b/services/cowork-service/src/lib/ipc-bridge.test.ts @@ -179,3 +179,95 @@ describe('IpcBridge convenience methods', () => { }); }); }); + +describe('reverse IPC (incoming requests)', () => { + it('onIncomingRequest registers a handler', () => { + const bridge = new IpcBridge(); + const handler = vi.fn().mockResolvedValue({ ok: true }); + bridge.onIncomingRequest(handler); + // Handler is registered (no public getter, but we test via handleLine below) + expect(handler).not.toHaveBeenCalled(); + }); + + it('handleLine routes incoming requests to the registered handler', async () => { + const bridge = new IpcBridge(); + const handler = vi.fn().mockResolvedValue({ response: 'hello', provider: 'test', model: 'test-model' }); + bridge.onIncomingRequest(handler); + + // Simulate an incoming JSON-RPC request line from Rust + const incomingLine = JSON.stringify({ + jsonrpc: '2.0', + id: 42, + method: 'intercept_llm', + params: { messages: [{ role: 'user', content: 'hi' }], model: 'test-model' }, + }); + + // Access private handleLine via any cast (test-only) + (bridge as any).handleLine(incomingLine); + + // Wait for async handler + await new Promise((r) => setTimeout(r, 10)); + + expect(handler).toHaveBeenCalledWith('intercept_llm', { + messages: [{ role: 'user', content: 'hi' }], + model: 'test-model', + }); + }); + + it('handleLine sends error response when no handler is registered', () => { + const bridge = new IpcBridge(); + const sendSpy = vi.spyOn(bridge as any, 'sendResponse'); + + const incomingLine = JSON.stringify({ + jsonrpc: '2.0', + id: 99, + method: 'unknown_method', + params: {}, + }); + + (bridge as any).handleLine(incomingLine); + + expect(sendSpy).toHaveBeenCalledWith(99, undefined, { + code: -32601, + message: "No handler for 'unknown_method'", + }); + }); + + it('handleLine sends error response when handler throws', async () => { + const bridge = new IpcBridge(); + const handler = vi.fn().mockRejectedValue(new Error('LLM router not initialized')); + bridge.onIncomingRequest(handler); + const sendSpy = vi.spyOn(bridge as any, 'sendResponse'); + + const incomingLine = JSON.stringify({ + jsonrpc: '2.0', + id: 50, + method: 'intercept_llm', + params: { messages: [] }, + }); + + (bridge as any).handleLine(incomingLine); + await new Promise((r) => setTimeout(r, 10)); + + expect(sendSpy).toHaveBeenCalledWith(50, undefined, { + code: -32000, + message: 'LLM router not initialized', + }); + }); + + it('handleLine distinguishes incoming requests from responses', () => { + const bridge = new IpcBridge(); + const handler = vi.fn().mockResolvedValue({}); + bridge.onIncomingRequest(handler); + + // Response (no method field) — should not call handler + const responseLine = JSON.stringify({ jsonrpc: '2.0', id: 1, result: { ok: true } }); + (bridge as any).handleLine(responseLine); + expect(handler).not.toHaveBeenCalled(); + + // Request (has method field) — should call handler + const requestLine = JSON.stringify({ jsonrpc: '2.0', id: 2, method: 'test', params: {} }); + (bridge as any).handleLine(requestLine); + expect(handler).toHaveBeenCalledWith('test', {}); + }); +}); diff --git a/services/cowork-service/src/lib/ipc-bridge.ts b/services/cowork-service/src/lib/ipc-bridge.ts index 0badaeeb..5ae4a7da 100644 --- a/services/cowork-service/src/lib/ipc-bridge.ts +++ b/services/cowork-service/src/lib/ipc-bridge.ts @@ -193,6 +193,19 @@ export class IpcBridge { return this.call('update_verdict', { verdict, auth }); } + // ── Incoming request handler (Rust → TS, reverse IPC) ── + + private incomingHandler: ((method: string, params: Record) => Promise) | null = null; + + /** + * Register a handler for incoming requests FROM the Rust runtime. + * Used for LLM interception: Rust sends `intercept_llm` requests, + * cowork-service routes them through the multi-provider LLM router. + */ + onIncomingRequest(handler: (method: string, params: Record) => Promise): void { + this.incomingHandler = handler; + } + /** Send shutdown and close the child process. */ async shutdown(): Promise { if (!this.child) return; @@ -214,14 +227,22 @@ export class IpcBridge { // ── Private ── private handleLine(line: string): void { - let resp: IpcResponse; + let msg: Record; try { - resp = JSON.parse(line); + msg = JSON.parse(line); } catch { - this.log.error(`IPC: unparseable response: ${line.slice(0, 200)}`); + this.log.error(`IPC: unparseable line: ${line.slice(0, 200)}`); return; } + // Incoming request FROM Rust (has 'method' field — reverse IPC) + if (typeof msg.method === 'string') { + this.handleIncoming(msg as unknown as IpcRequest); + return; + } + + // Normal response TO a pending call + const resp = msg as unknown as IpcResponse; const pending = this.pending.get(resp.id); if (!pending) { this.log.error(`IPC: unexpected response id=${resp.id}`); @@ -233,6 +254,28 @@ export class IpcBridge { pending.resolve(resp); } + /** Handle an incoming request from the Rust runtime (reverse IPC). */ + private handleIncoming(req: IpcRequest): void { + if (!this.incomingHandler) { + this.sendResponse(req.id, undefined, { code: -32601, message: `No handler for '${req.method}'` }); + return; + } + + this.incomingHandler(req.method, req.params) + .then((result) => this.sendResponse(req.id, result)) + .catch((err) => this.sendResponse(req.id, undefined, { + code: -32000, + message: err instanceof Error ? err.message : 'Handler error', + })); + } + + /** Send a JSON-RPC response back to the Rust runtime. */ + private sendResponse(id: number, result?: unknown, error?: { code: number; message: string }): void { + if (!this.child?.stdin?.writable) return; + const resp: IpcResponse = { jsonrpc: '2.0', id, ...(error ? { error } : { result }) }; + this.child.stdin.write(JSON.stringify(resp) + '\n'); + } + private rejectAllPending(err: Error): void { for (const [_id, p] of this.pending) { clearTimeout(p.timer); diff --git a/services/cowork-service/src/server.test.ts b/services/cowork-service/src/server.test.ts index 070b03a4..4c13bd6e 100644 --- a/services/cowork-service/src/server.test.ts +++ b/services/cowork-service/src/server.test.ts @@ -55,6 +55,7 @@ vi.mock('./lib/ipc-bridge.js', () => ({ isRunning: false, start: vi.fn(async () => { throw new Error('no binary in test'); }), shutdown: vi.fn(async () => undefined), + onIncomingRequest: vi.fn(), })), })); vi.mock('./lib/flush-scheduler.js', () => ({ diff --git a/services/cowork-service/src/server.ts b/services/cowork-service/src/server.ts index 3e2ae64a..fb4af849 100644 --- a/services/cowork-service/src/server.ts +++ b/services/cowork-service/src/server.ts @@ -23,7 +23,7 @@ import { config } from './lib/config.js'; import { productConfig, PRODUCT_ID } from './lib/product-config.js'; import { getIpcBridge } from './lib/ipc-bridge.js'; import { getFlushScheduler } from './lib/flush-scheduler.js'; -import { initLlmRouter } from './lib/llm-router.js'; +import { initLlmRouter, getLlmRouter, isLlmRouterReady } from './lib/llm-router.js'; import { llmRoutes } from './modules/llm/routes.js'; import { auditRoutes } from './modules/audit/routes.js'; import { usageRoutes } from './modules/usage/routes.js'; @@ -94,6 +94,58 @@ try { app.log.warn({ err }, 'LLM router not available — no provider API keys configured'); } +// Register reverse-IPC handler for LLM interception (Rust → TS) +// When the Rust runtime needs a non-Anthropic LLM call, it sends intercept_llm +// and cowork-service routes through the multi-provider LLM router. +bridge.onIncomingRequest(async (method, params) => { + if (method === 'intercept_llm') { + if (!isLlmRouterReady()) { + throw new Error('LLM router not initialized — no providers configured'); + } + const messages = (params.messages as Array<{ role: string; content: string }>) ?? []; + if (messages.length === 0) throw new Error('messages array is required'); + + const startMs = Date.now(); + const result = await getLlmRouter().chat({ + messages: messages.map(m => ({ role: m.role as 'system' | 'user' | 'assistant', content: m.content })), + model: (params.model as string) || undefined, + temperature: (params.temperature as number) || undefined, + max_tokens: (params.max_tokens as number) || undefined, + }); + + // Record spend for budget tracking (best-effort) + const costUsd = (result as Record).costUsd as number | undefined; + if (costUsd && params.auth && params.taskId) { + bridge.recordSpend( + result.model, + 0, // token counts not always available from router + 0, + costUsd, + params.auth as Record, + params.taskId as string, + ).catch((err) => app.log.warn({ err }, 'Failed to record LLM spend')); + } + + app.log.info({ + method: 'intercept_llm', + provider: result.provider, + model: result.model, + latencyMs: Date.now() - startMs, + attempts: result.attempts, + }, 'LLM interception completed'); + + return { + response: result.response, + provider: result.provider, + model: result.model, + totalLatencyMs: result.totalLatencyMs, + attempts: result.attempts, + }; + } + + throw new Error(`Unknown incoming method: ${method}`); +}); + // Start flush scheduler (periodic drain of IPC buffers → platform-service) const scheduler = getFlushScheduler(app.log); if (bridge.isRunning) {