feat(cowork-service): IPC LLM interception — Rust runtime delegates LLM calls to multi-provider router
Add reverse-IPC protocol support: Rust runtime can send intercept_llm requests to cowork-service, which routes them through @bytelyst/llm-router. Changes: - ipc-bridge.ts: handleLine now detects incoming requests (has 'method' field) vs normal responses. New handleIncoming() + sendResponse() for reverse IPC. New onIncomingRequest() to register the handler. - server.ts: Wires intercept_llm handler — validates messages, calls getLlmRouter().chat(), records spend for budget tracking, logs provider/model. - ipc-bridge.test.ts: 5 new tests for reverse IPC (handler registration, routing, error handling, request vs response disambiguation). - server.test.ts: Updated IPC bridge mock with onIncomingRequest. Test count: 85 (was 80)
This commit is contained in:
parent
a9d4c63e03
commit
d838cd658b
@ -179,3 +179,95 @@ describe('IpcBridge convenience methods', () => {
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('reverse IPC (incoming requests)', () => {
|
||||
it('onIncomingRequest registers a handler', () => {
|
||||
const bridge = new IpcBridge();
|
||||
const handler = vi.fn().mockResolvedValue({ ok: true });
|
||||
bridge.onIncomingRequest(handler);
|
||||
// Handler is registered (no public getter, but we test via handleLine below)
|
||||
expect(handler).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('handleLine routes incoming requests to the registered handler', async () => {
|
||||
const bridge = new IpcBridge();
|
||||
const handler = vi.fn().mockResolvedValue({ response: 'hello', provider: 'test', model: 'test-model' });
|
||||
bridge.onIncomingRequest(handler);
|
||||
|
||||
// Simulate an incoming JSON-RPC request line from Rust
|
||||
const incomingLine = JSON.stringify({
|
||||
jsonrpc: '2.0',
|
||||
id: 42,
|
||||
method: 'intercept_llm',
|
||||
params: { messages: [{ role: 'user', content: 'hi' }], model: 'test-model' },
|
||||
});
|
||||
|
||||
// Access private handleLine via any cast (test-only)
|
||||
(bridge as any).handleLine(incomingLine);
|
||||
|
||||
// Wait for async handler
|
||||
await new Promise((r) => setTimeout(r, 10));
|
||||
|
||||
expect(handler).toHaveBeenCalledWith('intercept_llm', {
|
||||
messages: [{ role: 'user', content: 'hi' }],
|
||||
model: 'test-model',
|
||||
});
|
||||
});
|
||||
|
||||
it('handleLine sends error response when no handler is registered', () => {
|
||||
const bridge = new IpcBridge();
|
||||
const sendSpy = vi.spyOn(bridge as any, 'sendResponse');
|
||||
|
||||
const incomingLine = JSON.stringify({
|
||||
jsonrpc: '2.0',
|
||||
id: 99,
|
||||
method: 'unknown_method',
|
||||
params: {},
|
||||
});
|
||||
|
||||
(bridge as any).handleLine(incomingLine);
|
||||
|
||||
expect(sendSpy).toHaveBeenCalledWith(99, undefined, {
|
||||
code: -32601,
|
||||
message: "No handler for 'unknown_method'",
|
||||
});
|
||||
});
|
||||
|
||||
it('handleLine sends error response when handler throws', async () => {
|
||||
const bridge = new IpcBridge();
|
||||
const handler = vi.fn().mockRejectedValue(new Error('LLM router not initialized'));
|
||||
bridge.onIncomingRequest(handler);
|
||||
const sendSpy = vi.spyOn(bridge as any, 'sendResponse');
|
||||
|
||||
const incomingLine = JSON.stringify({
|
||||
jsonrpc: '2.0',
|
||||
id: 50,
|
||||
method: 'intercept_llm',
|
||||
params: { messages: [] },
|
||||
});
|
||||
|
||||
(bridge as any).handleLine(incomingLine);
|
||||
await new Promise((r) => setTimeout(r, 10));
|
||||
|
||||
expect(sendSpy).toHaveBeenCalledWith(50, undefined, {
|
||||
code: -32000,
|
||||
message: 'LLM router not initialized',
|
||||
});
|
||||
});
|
||||
|
||||
it('handleLine distinguishes incoming requests from responses', () => {
|
||||
const bridge = new IpcBridge();
|
||||
const handler = vi.fn().mockResolvedValue({});
|
||||
bridge.onIncomingRequest(handler);
|
||||
|
||||
// Response (no method field) — should not call handler
|
||||
const responseLine = JSON.stringify({ jsonrpc: '2.0', id: 1, result: { ok: true } });
|
||||
(bridge as any).handleLine(responseLine);
|
||||
expect(handler).not.toHaveBeenCalled();
|
||||
|
||||
// Request (has method field) — should call handler
|
||||
const requestLine = JSON.stringify({ jsonrpc: '2.0', id: 2, method: 'test', params: {} });
|
||||
(bridge as any).handleLine(requestLine);
|
||||
expect(handler).toHaveBeenCalledWith('test', {});
|
||||
});
|
||||
});
|
||||
|
||||
@ -193,6 +193,19 @@ export class IpcBridge {
|
||||
return this.call('update_verdict', { verdict, auth });
|
||||
}
|
||||
|
||||
// ── Incoming request handler (Rust → TS, reverse IPC) ──
|
||||
|
||||
private incomingHandler: ((method: string, params: Record<string, unknown>) => Promise<unknown>) | null = null;
|
||||
|
||||
/**
|
||||
* Register a handler for incoming requests FROM the Rust runtime.
|
||||
* Used for LLM interception: Rust sends `intercept_llm` requests,
|
||||
* cowork-service routes them through the multi-provider LLM router.
|
||||
*/
|
||||
onIncomingRequest(handler: (method: string, params: Record<string, unknown>) => Promise<unknown>): void {
|
||||
this.incomingHandler = handler;
|
||||
}
|
||||
|
||||
/** Send shutdown and close the child process. */
|
||||
async shutdown(): Promise<void> {
|
||||
if (!this.child) return;
|
||||
@ -214,14 +227,22 @@ export class IpcBridge {
|
||||
// ── Private ──
|
||||
|
||||
private handleLine(line: string): void {
|
||||
let resp: IpcResponse;
|
||||
let msg: Record<string, unknown>;
|
||||
try {
|
||||
resp = JSON.parse(line);
|
||||
msg = JSON.parse(line);
|
||||
} catch {
|
||||
this.log.error(`IPC: unparseable response: ${line.slice(0, 200)}`);
|
||||
this.log.error(`IPC: unparseable line: ${line.slice(0, 200)}`);
|
||||
return;
|
||||
}
|
||||
|
||||
// Incoming request FROM Rust (has 'method' field — reverse IPC)
|
||||
if (typeof msg.method === 'string') {
|
||||
this.handleIncoming(msg as unknown as IpcRequest);
|
||||
return;
|
||||
}
|
||||
|
||||
// Normal response TO a pending call
|
||||
const resp = msg as unknown as IpcResponse;
|
||||
const pending = this.pending.get(resp.id);
|
||||
if (!pending) {
|
||||
this.log.error(`IPC: unexpected response id=${resp.id}`);
|
||||
@ -233,6 +254,28 @@ export class IpcBridge {
|
||||
pending.resolve(resp);
|
||||
}
|
||||
|
||||
/** Handle an incoming request from the Rust runtime (reverse IPC). */
|
||||
private handleIncoming(req: IpcRequest): void {
|
||||
if (!this.incomingHandler) {
|
||||
this.sendResponse(req.id, undefined, { code: -32601, message: `No handler for '${req.method}'` });
|
||||
return;
|
||||
}
|
||||
|
||||
this.incomingHandler(req.method, req.params)
|
||||
.then((result) => this.sendResponse(req.id, result))
|
||||
.catch((err) => this.sendResponse(req.id, undefined, {
|
||||
code: -32000,
|
||||
message: err instanceof Error ? err.message : 'Handler error',
|
||||
}));
|
||||
}
|
||||
|
||||
/** Send a JSON-RPC response back to the Rust runtime. */
|
||||
private sendResponse(id: number, result?: unknown, error?: { code: number; message: string }): void {
|
||||
if (!this.child?.stdin?.writable) return;
|
||||
const resp: IpcResponse = { jsonrpc: '2.0', id, ...(error ? { error } : { result }) };
|
||||
this.child.stdin.write(JSON.stringify(resp) + '\n');
|
||||
}
|
||||
|
||||
private rejectAllPending(err: Error): void {
|
||||
for (const [_id, p] of this.pending) {
|
||||
clearTimeout(p.timer);
|
||||
|
||||
@ -55,6 +55,7 @@ vi.mock('./lib/ipc-bridge.js', () => ({
|
||||
isRunning: false,
|
||||
start: vi.fn(async () => { throw new Error('no binary in test'); }),
|
||||
shutdown: vi.fn(async () => undefined),
|
||||
onIncomingRequest: vi.fn(),
|
||||
})),
|
||||
}));
|
||||
vi.mock('./lib/flush-scheduler.js', () => ({
|
||||
|
||||
@ -23,7 +23,7 @@ import { config } from './lib/config.js';
|
||||
import { productConfig, PRODUCT_ID } from './lib/product-config.js';
|
||||
import { getIpcBridge } from './lib/ipc-bridge.js';
|
||||
import { getFlushScheduler } from './lib/flush-scheduler.js';
|
||||
import { initLlmRouter } from './lib/llm-router.js';
|
||||
import { initLlmRouter, getLlmRouter, isLlmRouterReady } from './lib/llm-router.js';
|
||||
import { llmRoutes } from './modules/llm/routes.js';
|
||||
import { auditRoutes } from './modules/audit/routes.js';
|
||||
import { usageRoutes } from './modules/usage/routes.js';
|
||||
@ -94,6 +94,58 @@ try {
|
||||
app.log.warn({ err }, 'LLM router not available — no provider API keys configured');
|
||||
}
|
||||
|
||||
// Register reverse-IPC handler for LLM interception (Rust → TS)
|
||||
// When the Rust runtime needs a non-Anthropic LLM call, it sends intercept_llm
|
||||
// and cowork-service routes through the multi-provider LLM router.
|
||||
bridge.onIncomingRequest(async (method, params) => {
|
||||
if (method === 'intercept_llm') {
|
||||
if (!isLlmRouterReady()) {
|
||||
throw new Error('LLM router not initialized — no providers configured');
|
||||
}
|
||||
const messages = (params.messages as Array<{ role: string; content: string }>) ?? [];
|
||||
if (messages.length === 0) throw new Error('messages array is required');
|
||||
|
||||
const startMs = Date.now();
|
||||
const result = await getLlmRouter().chat({
|
||||
messages: messages.map(m => ({ role: m.role as 'system' | 'user' | 'assistant', content: m.content })),
|
||||
model: (params.model as string) || undefined,
|
||||
temperature: (params.temperature as number) || undefined,
|
||||
max_tokens: (params.max_tokens as number) || undefined,
|
||||
});
|
||||
|
||||
// Record spend for budget tracking (best-effort)
|
||||
const costUsd = (result as Record<string, unknown>).costUsd as number | undefined;
|
||||
if (costUsd && params.auth && params.taskId) {
|
||||
bridge.recordSpend(
|
||||
result.model,
|
||||
0, // token counts not always available from router
|
||||
0,
|
||||
costUsd,
|
||||
params.auth as Record<string, unknown>,
|
||||
params.taskId as string,
|
||||
).catch((err) => app.log.warn({ err }, 'Failed to record LLM spend'));
|
||||
}
|
||||
|
||||
app.log.info({
|
||||
method: 'intercept_llm',
|
||||
provider: result.provider,
|
||||
model: result.model,
|
||||
latencyMs: Date.now() - startMs,
|
||||
attempts: result.attempts,
|
||||
}, 'LLM interception completed');
|
||||
|
||||
return {
|
||||
response: result.response,
|
||||
provider: result.provider,
|
||||
model: result.model,
|
||||
totalLatencyMs: result.totalLatencyMs,
|
||||
attempts: result.attempts,
|
||||
};
|
||||
}
|
||||
|
||||
throw new Error(`Unknown incoming method: ${method}`);
|
||||
});
|
||||
|
||||
// Start flush scheduler (periodic drain of IPC buffers → platform-service)
|
||||
const scheduler = getFlushScheduler(app.log);
|
||||
if (bridge.isRunning) {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user