learning_ai_common_plat/services/cowork-service/src/lib/ipc-bridge.ts
saravanakumardb1 d838cd658b feat(cowork-service): IPC LLM interception — Rust runtime delegates LLM calls to multi-provider router
Add reverse-IPC protocol support: Rust runtime can send intercept_llm
requests to cowork-service, which routes them through @bytelyst/llm-router.

Changes:
- ipc-bridge.ts: handleLine now detects incoming requests (has 'method' field)
  vs normal responses. New handleIncoming() + sendResponse() for reverse IPC.
  New onIncomingRequest() to register the handler.
- server.ts: Wires intercept_llm handler — validates messages, calls
  getLlmRouter().chat(), records spend for budget tracking, logs provider/model.
- ipc-bridge.test.ts: 5 new tests for reverse IPC (handler registration,
  routing, error handling, request vs response disambiguation).
- server.test.ts: Updated IPC bridge mock with onIncomingRequest.

Test count: 85 (was 80)
2026-04-03 01:11:34 -07:00

304 lines
9.8 KiB
TypeScript

/**
* IPC Bridge — spawns the Rust cowork-orchestrator as a child process and
* communicates via JSON-RPC 2.0 over stdin/stdout.
*
* This is the TypeScript counterpart of `ipc_bridge.rs` in cowork-orchestrator.
* cowork-service spawns `cowork-orchestrator --ipc-bridge` and forwards all
* task/feature/audit/telemetry/budget requests through this bridge.
*
* Protocol: line-delimited JSON-RPC 2.0 (one JSON object per line).
*/
import { spawn, type ChildProcess } from 'node:child_process';
import { createInterface, type Interface as ReadlineInterface } from 'node:readline';
import { config } from './config.js';
import type { IpcRequest, IpcResponse } from '../modules/tasks/types.js';
// ── Types ──
/**
* Constructor options for the IPC bridge. Every field is optional; the bridge
* constructor substitutes a default for each one that is omitted.
*/
export interface IpcBridgeOptions {
/** Path to cowork-orchestrator binary. Falls back to `config.RUST_RUNTIME_BIN` when omitted. */
bin?: string;
/** Extra CLI args (e.g., --admin-policy path). Appended after the `--ipc-bridge` flag, which the bridge always passes first. */
args?: string[];
/** Timeout for individual IPC calls (ms). Falls back to `config.RUST_RUNTIME_TIMEOUT_MS` when omitted. */
timeoutMs?: number;
/** Environment variables passed to the child process, layered on top of the parent's `process.env`. */
env?: Record<string, string>;
/** Logger with `info` and `error` sinks. When omitted, both sinks are no-ops, so nothing is logged. */
logger?: { info: (msg: string) => void; error: (msg: string) => void };
}
/**
* Bookkeeping for one in-flight outbound call. The bridge stores one entry per
* request id and removes it when the call settles.
*/
type PendingRequest = {
/** Settles the caller's promise with the response whose id matched this request. */
resolve: (value: IpcResponse) => void;
/** Fails the caller's promise (used for timeout, write failure, child exit and shutdown). */
reject: (err: Error) => void;
/** Per-call timeout timer; cleared as soon as the call settles some other way. */
timer: ReturnType<typeof setTimeout>;
};
// ── IPC Bridge ──
/**
 * JSON-RPC 2.0 client for the Rust `cowork-orchestrator` child process.
 *
 * Outbound: {@link IpcBridge.call} writes one request per line to the child's
 * stdin and settles when a line carrying the same `id` arrives on stdout, or
 * rejects on timeout, write failure, child exit or shutdown.
 *
 * Inbound ("reverse IPC"): any stdout line whose `method` is a string is
 * treated as a request from the Rust side and dispatched to the handler
 * registered through {@link IpcBridge.onIncomingRequest}.
 */
export class IpcBridge {
  private child: ChildProcess | null = null;
  private rl: ReadlineInterface | null = null;
  private nextId = 1;
  private pending = new Map<number, PendingRequest>();
  private readonly bin: string;
  private readonly args: string[];
  private readonly timeoutMs: number;
  private readonly childEnv: Record<string, string>;
  private readonly log: { info: (msg: string) => void; error: (msg: string) => void };
  private _initialized = false;
  /** Handler for requests that originate from the Rust runtime; `null` until registered. */
  private incomingHandler: ((method: string, params: Record<string, unknown>) => Promise<unknown>) | null = null;

  /**
   * @param opts Optional overrides; anything omitted falls back to `config`
   *   (binary path, timeout), an empty env overlay, or a silent logger.
   */
  constructor(opts: IpcBridgeOptions = {}) {
    this.bin = opts.bin ?? config.RUST_RUNTIME_BIN;
    this.args = ['--ipc-bridge', ...(opts.args ?? [])];
    this.timeoutMs = opts.timeoutMs ?? config.RUST_RUNTIME_TIMEOUT_MS;
    this.childEnv = opts.env ?? {};
    this.log = opts.logger ?? { info: () => {}, error: () => {} };
  }

  /**
   * Spawn the Rust child process and perform the initialize handshake.
   *
   * @returns The response to the `initialize` call.
   * @throws If the bridge is already started, or if the handshake fails. On a
   *   failed handshake the child is terminated and the bridge is reset so that
   *   `start()` can be called again.
   */
  async start(): Promise<IpcResponse> {
    if (this.child) {
      throw new Error('IPC bridge already started');
    }

    const env = { ...process.env, ...this.childEnv };
    if (config.ANTHROPIC_API_KEY) {
      env.ANTHROPIC_API_KEY = config.ANTHROPIC_API_KEY;
    }

    const child: ChildProcess = spawn(this.bin, this.args, {
      stdio: ['pipe', 'pipe', 'pipe'],
      env,
    });
    this.child = child;

    child.on('error', (err) => {
      this.log.error(`IPC child process error: ${err.message}`);
      // A child without a pid never spawned, and 'exit' is not guaranteed to
      // follow an 'error'. Release it here so callers are not left waiting
      // for the per-call timeout and the bridge can be started again.
      if (child.pid === undefined) {
        this.releaseChild(child, new Error(`IPC child process error: ${err.message}`));
      }
    });
    child.on('exit', (code, signal) => {
      this.log.info(`IPC child process exited: code=${code} signal=${signal}`);
      this.releaseChild(child, new Error(`IPC child process exited (code=${code})`));
    });

    // Writing to a child that has gone away surfaces as an 'error' event on
    // stdin; without a listener that event would be thrown as uncaught.
    child.stdin?.on('error', (err) => {
      this.log.error(`IPC stdin error: ${err.message}`);
    });

    // Read JSON-RPC messages line by line from stdout
    if (child.stdout) {
      const rl = createInterface({ input: child.stdout });
      rl.on('line', (line) => this.handleLine(line));
      this.rl = rl;
    }

    // Pipe stderr to logger
    if (child.stderr) {
      const errRl = createInterface({ input: child.stderr });
      errRl.on('line', (line) => this.log.error(`[rust] ${line}`));
    }

    // Perform initialize handshake
    let resp: IpcResponse;
    try {
      resp = await this.call('initialize', {});
    } catch (err) {
      // Do not leave a half-started child behind. Skip when the exit/error
      // handler has already released this child.
      if (this.child === child) {
        child.kill('SIGTERM');
        this.releaseChild(child, new Error('IPC bridge failed to initialize'));
      }
      throw err;
    }

    // NOTE(review): a response carrying `error` is still treated as a
    // successful handshake here, as before — confirm that is intended.
    this._initialized = true;
    this.log.info(`IPC bridge initialized: protocol=${resp.result && (resp.result as Record<string, unknown>).protocolVersion}`);
    return resp;
  }

  /** Whether the bridge child process is running and initialized. */
  get isRunning(): boolean {
    return this._initialized && this.child !== null && this.child.exitCode === null;
  }

  /**
   * Send a JSON-RPC call and await the response.
   *
   * @param method JSON-RPC method name.
   * @param params Parameters object sent as `params`.
   * @returns The raw response object (which may itself carry an `error`).
   * @throws If the bridge is not started, the write fails, the call times out
   *   after `timeoutMs`, or the child exits / the bridge shuts down first.
   */
  async call(method: string, params: Record<string, unknown>): Promise<IpcResponse> {
    const stdin = this.child?.stdin;
    if (!stdin?.writable) {
      throw new Error('IPC bridge not started');
    }

    const id = this.nextId++;
    const request: IpcRequest = { jsonrpc: '2.0', id, method, params };
    // Serialise before registering anything so that a non-serialisable
    // payload cannot leave a dangling timer or pending entry behind.
    const line = JSON.stringify(request) + '\n';

    return new Promise<IpcResponse>((resolve, reject) => {
      const timer = setTimeout(() => {
        this.pending.delete(id);
        reject(new Error(`IPC call '${method}' timed out after ${this.timeoutMs}ms`));
      }, this.timeoutMs);
      this.pending.set(id, { resolve, reject, timer });

      stdin.write(line, (err) => {
        if (!err) return;
        clearTimeout(timer);
        this.pending.delete(id);
        reject(new Error(`IPC write failed: ${err.message}`));
      });
    });
  }

  // ── Convenience methods matching Rust IPC handler methods ──

  /** Call `submit_task` with the goal, folder, auth context and optional model/plugins. */
  async submitTask(
    goal: string,
    folder: string,
    auth: Record<string, unknown>,
    opts: { model?: string; plugins?: string[] } = {},
  ): Promise<IpcResponse> {
    return this.call('submit_task', { goal, folder, auth, ...opts });
  }

  /** Call `get_task_status` for one task. */
  async getTaskStatus(taskId: string, auth: Record<string, unknown>): Promise<IpcResponse> {
    return this.call('get_task_status', { taskId, auth });
  }

  /** Call `cancel_task` for one task. */
  async cancelTask(taskId: string, auth: Record<string, unknown>): Promise<IpcResponse> {
    return this.call('cancel_task', { taskId, auth });
  }

  /** Call `list_tasks`; `status` is only sent when it is a non-empty string. */
  async listTasks(auth: Record<string, unknown>, status?: string): Promise<IpcResponse> {
    return this.call('list_tasks', { auth, ...(status ? { status } : {}) });
  }

  /** Call `get_features`. */
  async getFeatures(auth: Record<string, unknown>): Promise<IpcResponse> {
    return this.call('get_features', { auth });
  }

  /** Call `update_flags` with a flag-name → boolean map. */
  async updateFlags(flags: Record<string, boolean>, auth: Record<string, unknown>): Promise<IpcResponse> {
    return this.call('update_flags', { flags, auth });
  }

  /** Call `flush_audit`. */
  async flushAudit(auth: Record<string, unknown>): Promise<IpcResponse> {
    return this.call('flush_audit', { auth });
  }

  /** Call `flush_telemetry`. */
  async flushTelemetry(auth: Record<string, unknown>): Promise<IpcResponse> {
    return this.call('flush_telemetry', { auth });
  }

  /** Call `record_spend`; an omitted `taskId` is dropped during serialisation. */
  async recordSpend(
    model: string,
    inputTokens: number,
    outputTokens: number,
    costUsd: number,
    auth: Record<string, unknown>,
    taskId?: string,
  ): Promise<IpcResponse> {
    return this.call('record_spend', { model, inputTokens, outputTokens, costUsd, auth, taskId });
  }

  /** Call `flush_budget`. */
  async flushBudget(auth: Record<string, unknown>): Promise<IpcResponse> {
    return this.call('flush_budget', { auth });
  }

  /** Call `update_verdict`. */
  async updateVerdict(verdict: Record<string, unknown>, auth: Record<string, unknown>): Promise<IpcResponse> {
    return this.call('update_verdict', { verdict, auth });
  }

  // ── Incoming request handler (Rust → TS, reverse IPC) ──

  /**
   * Register a handler for incoming requests FROM the Rust runtime.
   * Used for LLM interception: Rust sends `intercept_llm` requests,
   * cowork-service routes them through the multi-provider LLM router.
   *
   * Only one handler is kept; registering again replaces the previous one.
   * The handler's resolved value becomes the response `result`; a rejection
   * or throw becomes a `-32000` error response.
   */
  onIncomingRequest(handler: (method: string, params: Record<string, unknown>) => Promise<unknown>): void {
    this.incomingHandler = handler;
  }

  /**
   * Send shutdown and close the child process.
   *
   * Best-effort: a failing `shutdown` call is ignored, after which the child
   * is sent SIGTERM and every still-pending call is rejected. Does nothing
   * when the bridge is not started.
   */
  async shutdown(): Promise<void> {
    const child = this.child;
    if (!child) return;
    try {
      await this.call('shutdown', {});
    } catch {
      // Ignore — process may already be exiting
    }
    // Only tear down the child this shutdown was issued for: the exit handler
    // may already have released it while the call above was in flight.
    if (this.child === child) {
      child.kill('SIGTERM');
      this.releaseChild(child, new Error('IPC bridge shutting down'));
    }
  }

  // ── Private ──

  /**
   * Forget `child` and fail every pending call with `reason`, but only if
   * `child` is still the bridge's current process. The guard stops a late
   * event from an old child from wiping the state of a newer one.
   */
  private releaseChild(child: ChildProcess, reason: Error): void {
    if (this.child !== child) return;
    this.rejectAllPending(reason);
    this.child = null;
    this.rl = null;
    this._initialized = false;
  }

  /** Parse one stdout line and route it as an incoming request or as a response to a pending call. */
  private handleLine(line: string): void {
    // Tolerate blank separator lines.
    if (line.trim() === '') return;

    let parsed: unknown;
    try {
      parsed = JSON.parse(line);
    } catch {
      this.log.error(`IPC: unparseable line: ${line.slice(0, 200)}`);
      return;
    }
    // `null`, numbers, strings and arrays are valid JSON but not JSON-RPC
    // messages; reading `.method` off `null` would throw inside the listener.
    if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) {
      this.log.error(`IPC: ignoring non-object message: ${line.slice(0, 200)}`);
      return;
    }
    const msg = parsed as Record<string, unknown>;

    // Incoming request FROM Rust (has 'method' field — reverse IPC)
    if (typeof msg.method === 'string') {
      this.handleIncoming(msg as unknown as IpcRequest);
      return;
    }

    // Normal response TO a pending call
    const resp = msg as unknown as IpcResponse;
    const pending = this.pending.get(resp.id);
    if (!pending) {
      this.log.error(`IPC: unexpected response id=${resp.id}`);
      return;
    }
    clearTimeout(pending.timer);
    this.pending.delete(resp.id);
    pending.resolve(resp);
  }

  /** Handle an incoming request from the Rust runtime (reverse IPC). */
  private handleIncoming(req: IpcRequest): void {
    const handler = this.incomingHandler;
    if (!handler) {
      this.sendResponse(req.id, undefined, { code: -32601, message: `No handler for '${req.method}'` });
      return;
    }

    // The message comes off the wire, so `params` may be absent.
    const params: Record<string, unknown> = req.params ?? {};

    // Invoke synchronously, but funnel a synchronous throw into the same
    // rejection path as an async failure so it cannot escape the listener.
    let outcome: Promise<unknown>;
    try {
      outcome = Promise.resolve(handler(req.method, params));
    } catch (err) {
      outcome = Promise.reject(err);
    }

    outcome
      .then((result) => this.sendResponse(req.id, result))
      .catch((err: unknown) =>
        this.sendResponse(req.id, undefined, {
          code: -32000,
          message: err instanceof Error ? err.message : 'Handler error',
        }),
      )
      .catch((err: unknown) => {
        // Even the error reply could not be sent; log rather than leave an
        // unhandled rejection.
        this.log.error(
          `IPC: failed to respond to '${req.method}': ${err instanceof Error ? err.message : String(err)}`,
        );
      });
  }

  /**
   * Send a JSON-RPC response back to the Rust runtime. Silently does nothing
   * when the child's stdin is gone or no longer writable.
   */
  private sendResponse(id: number, result?: unknown, error?: { code: number; message: string }): void {
    const stdin = this.child?.stdin;
    if (!stdin?.writable) return;
    // JSON.stringify drops `undefined` members, so an undefined result is
    // sent as `null` to keep the `result` member present on success.
    const resp: IpcResponse = { jsonrpc: '2.0', id, ...(error ? { error } : { result: result ?? null }) };
    stdin.write(JSON.stringify(resp) + '\n');
  }

  /** Reject every in-flight call with `err`, cancel its timer, and empty the pending map. */
  private rejectAllPending(err: Error): void {
    for (const entry of this.pending.values()) {
      clearTimeout(entry.timer);
      entry.reject(err);
    }
    this.pending.clear();
  }
}
// ── Singleton ──
/** Shared bridge instance; `null` until first requested or after being cleared. */
let _bridge: IpcBridge | null = null;

/**
 * Return the shared IPC bridge, constructing one with default options the
 * first time it is asked for (or the first time after it was cleared).
 */
export function getIpcBridge(): IpcBridge {
  const existing = _bridge;
  if (existing !== null) {
    return existing;
  }
  const created = new IpcBridge();
  _bridge = created;
  return created;
}

/**
 * Replace the shared bridge instance (for testing). Passing `null` clears it,
 * so the next `getIpcBridge()` call builds a fresh default bridge.
 */
export function setIpcBridge(bridge: IpcBridge | null): void {
  _bridge = bridge;
}