import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js';

/** Settings that control how `MCPClient` launches and talks to the MCP server. */
export interface MCPConfig {
  /** Passed to `StdioClientTransport` as the `command` to spawn. */
  serverCommand: string;
  /** Passed to the server command as ONE argument; it is not split on whitespace. */
  serverArgs: string;
  /** Forwarded to the server process as the `MCP_ENV_FILE` environment variable. */
  envFile: string;
  /** Time budget for a single tool-call attempt, in milliseconds. */
  timeoutMs: number;
  /** Lifetime of cached read-operation results, in seconds. */
  cacheTtlSec: number;
  /**
   * Total number of attempts made per tool call (the first try included).
   * Values below 1, or non-finite values, are treated as 1 so that a tool
   * call is always attempted at least once.
   */
  maxRetries: number;
  /**
   * Optional logger override. When omitted, the client logs to the global
   * `console` (matching the previous behaviour). Consumers in a Fastify /
   * pino / structlog context should pass their own logger so MCP output is
   * captured alongside the rest of the service's structured logs.
   *
   * Each method is invoked with a message string followed by zero or more
   * structured arguments, mirroring the standard Console API.
   */
  logger?: McpLogger;
}

/**
 * Minimal logger interface consumed by `MCPClient`. Compatible with `console`
 * (which is the default), pino, Fastify's `request.log`, and any other
 * logger that exposes `debug` / `info` / `warn` / `error` methods accepting
 * variadic arguments.
 */
export interface McpLogger {
  debug(message: string, ...args: unknown[]): void;
  info(message: string, ...args: unknown[]): void;
  warn(message: string, ...args: unknown[]): void;
  error(message: string, ...args: unknown[]): void;
}

/** Tool descriptor returned by `MCPClient.listTools()`. */
export interface Tool {
  name: string;
  description?: string;
  inputSchema?: any;
}

/** A cached tool result together with the moment it was stored and its TTL. */
interface CacheEntry {
  data: any;
  /** `Date.now()` value at insertion time. */
  timestamp: number;
  /** Time to live in seconds. */
  ttl: number;
}

/** Request counter for one rate-limit window. */
interface RateLimitEntry {
  count: number;
  /** Epoch milliseconds after which the window is considered expired. */
  resetTime: number;
}

/** Substrings that mark a tool name as a cacheable read operation. */
const READ_OPERATIONS: readonly string[] = [
  'get_quote',
  'get_bars',
  'get_positions',
  'get_account',
  'get_news',
  'get_orders',
];

/** Lower-case substrings of an error message that make a retry pointless. */
const NON_RETRIABLE_PATTERNS: readonly string[] = [
  'unauthorized',
  'forbidden',
  'invalid',
  'not found',
  'authentication',
];

/** Lower-case substrings of an argument name that trigger redaction in audit logs. */
const SENSITIVE_KEY_FRAGMENTS: readonly string[] = [
  'api_key',
  'secret',
  'password',
  'token',
  'key',
];

/**
 * Extracts a printable message from an arbitrary thrown value without
 * assuming it is an `Error` (anything, including `null`, can be thrown).
 */
function errorMessage(error: unknown): string {
  if (error instanceof Error) {
    return error.message;
  }
  if (typeof error === 'object' && error !== null && 'message' in error) {
    return String((error as { message: unknown }).message);
  }
  return String(error);
}

/**
 * Client wrapper around the MCP SDK `Client` over a stdio transport.
 *
 * On top of the raw SDK client it adds: a fixed-window rate limit, audit
 * logging with argument redaction, a TTL cache for read-style tools, a
 * per-attempt timeout, and retries with exponential backoff.
 */
export class MCPClient {
  private client: Client | null = null;
  private transport: StdioClientTransport | null = null;
  private isConnected: boolean = false;
  private cache: Map<string, CacheEntry> = new Map();
  private rateLimitMap: Map<string, RateLimitEntry> = new Map();
  private config: MCPConfig;
  private readonly log: McpLogger;
  private readonly RATE_LIMIT_WINDOW = 60000; // 1 minute window
  private readonly RATE_LIMIT_MAX_REQUESTS = 200; // Max 200 requests per minute

  /**
   * @param config Connection, retry, cache and logging settings.
   */
  constructor(config: MCPConfig) {
    this.config = config;
    // Default to console when no logger is provided. The `console` global
    // already implements the McpLogger interface (debug/info/warn/error
    // are all variadic), so no shim is required.
    this.log = config.logger ?? console;
  }

  /**
   * Spawns the configured server command and connects the SDK client to it.
   * Calling it while already connected only logs a warning.
   *
   * @throws Re-throws whatever the SDK raised while connecting; in that case
   *   the half-initialised client/transport are released first.
   */
  async connect(): Promise<void> {
    if (this.isConnected) {
      this.log.warn('[MCP] Already connected');
      return;
    }

    try {
      this.log.info(
        `[MCP] Connecting to server via ${this.config.serverCommand} ${this.config.serverArgs}`
      );

      this.client = new Client(
        {
          name: 'bytelyst-mcp-client',
          version: '1.0.0',
        },
        {
          capabilities: {},
        }
      );

      this.transport = new StdioClientTransport({
        command: this.config.serverCommand,
        args: [this.config.serverArgs],
        // NOTE(review): only these two variables are supplied; depending on
        // the SDK version the child may not inherit PATH etc. — confirm.
        env: {
          MCP_ENV_FILE: this.config.envFile,
          MCP_CLIENT: 'backend',
        },
      });

      await this.client.connect(this.transport);
      this.isConnected = true;
      this.log.info('[MCP] Successfully connected to MCP server');
    } catch (error: unknown) {
      this.log.error(`[MCP] Failed to connect: ${errorMessage(error)}`);
      this.isConnected = false;

      // Drop the stale handles so a later connect() starts clean, and make a
      // best-effort attempt to stop a server process that may have started.
      const staleTransport = this.transport;
      this.client = null;
      this.transport = null;
      if (staleTransport) {
        try {
          await staleTransport.close();
        } catch {
          // Best effort only: the original connection error is what matters.
        }
      }

      throw error;
    }
  }

  /**
   * Closes the client and transport. Never throws: close errors are logged,
   * and the connection state is reset whether or not closing succeeded.
   */
  async disconnect(): Promise<void> {
    if (!this.isConnected) {
      return;
    }

    try {
      await this.client?.close();
      await this.transport?.close();
      this.log.info('[MCP] Disconnected from MCP server');
    } catch (error: unknown) {
      this.log.error(`[MCP] Error during disconnect: ${errorMessage(error)}`);
    } finally {
      // Always release the handles, even when close() failed, so the instance
      // does not keep references to a dead client/transport.
      this.isConnected = false;
      this.client = null;
      this.transport = null;
    }
  }

  /**
   * Invokes a tool on the MCP server.
   *
   * Order of operations: rate-limit check, audit log, cache lookup, then up
   * to `maxRetries` attempts (at least one), each bounded by `timeoutMs`,
   * with exponential backoff (1s, 2s, 4s, ...) between attempts.
   *
   * @param toolName Name of the tool to call.
   * @param args Arguments forwarded to the tool; also part of the cache key.
   * @returns The first element of the tool result's `content` array.
   * @throws Error when not connected, when the rate limit is exceeded, or
   *   the last error observed once all attempts are used up or a
   *   non-retriable error occurs.
   */
  async callTool(toolName: string, args: any = {}): Promise<any> {
    const client = this.client;
    if (!this.isConnected || !client) {
      throw new Error('MCP client is not connected');
    }

    // Check rate limits before proceeding
    this.checkRateLimit();

    // Audit log MCP tool calls with sanitized parameters
    this.auditLogToolCall(toolName, args);

    // Check cache for read operations
    const cacheKey = this.getCacheKey(toolName, args);
    const cached = this.getFromCache(cacheKey);
    if (cached !== null) {
      this.log.debug(`[MCP] Cache hit for ${toolName}`);
      return cached;
    }

    // Guarantee at least one attempt, even for maxRetries <= 0 or NaN.
    const configured = this.config.maxRetries;
    const maxAttempts = Number.isFinite(configured) ? Math.max(1, Math.floor(configured)) : 1;

    let lastError: unknown = null;
    let attemptsMade = 0;

    for (let attempt = 0; attempt < maxAttempts; attempt++) {
      attemptsMade = attempt + 1;
      try {
        this.log.debug(`[MCP] Calling tool ${toolName} (attempt ${attempt + 1}/${maxAttempts})`);

        const result = await this.withTimeout(
          client.callTool({
            name: toolName,
            arguments: args,
          }),
          this.config.timeoutMs
        );

        const response = result.content as any[];
        if (response && response.length > 0) {
          const data = response[0];

          // Cache successful read operations
          if (this.isReadOperation(toolName)) {
            this.setCache(cacheKey, data);
          }

          // Audit log successful tool execution
          this.auditLogToolSuccess(toolName);
          this.log.info(`[MCP] Tool ${toolName} executed successfully`);
          return data;
        }

        throw new Error(`Empty response from tool ${toolName}`);
      } catch (error: unknown) {
        lastError = error;
        const message = errorMessage(error);
        this.log.warn(`[MCP] Tool ${toolName} attempt ${attempt + 1} failed: ${message}`);

        // Audit log failed tool execution
        this.auditLogToolFailure(toolName, message);

        // Don't retry on certain errors
        if (this.isNonRetriableError(error)) {
          break;
        }

        // Exponential backoff (skipped after the final attempt)
        if (attempt < maxAttempts - 1) {
          await this.sleep(Math.pow(2, attempt) * 1000);
        }
      }
    }

    // Report the attempts actually made; a non-retriable error stops early.
    this.log.error(`[MCP] Tool ${toolName} failed after ${attemptsMade} attempts`);
    throw lastError ?? new Error(`Tool ${toolName} failed`);
  }

  /**
   * Lists the tools exposed by the connected server.
   *
   * @returns Name, description and input schema of every tool.
   * @throws Error when not connected; re-throws SDK errors after logging.
   */
  async listTools(): Promise<Tool[]> {
    if (!this.isConnected || !this.client) {
      throw new Error('MCP client is not connected');
    }

    try {
      const response = await this.client.listTools();
      return response.tools.map(
        (tool: Tool): Tool => ({
          name: tool.name,
          description: tool.description,
          inputSchema: tool.inputSchema,
        })
      );
    } catch (error: unknown) {
      this.log.error(`[MCP] Failed to list tools: ${errorMessage(error)}`);
      throw error;
    }
  }

  /**
   * Counts one request against the single global fixed window and rejects
   * the request when the window's budget is already spent.
   *
   * @throws Error when `RATE_LIMIT_MAX_REQUESTS` has been reached.
   */
  private checkRateLimit(): void {
    const now = Date.now();
    const key = 'global';

    let entry = this.rateLimitMap.get(key);
    if (!entry || now > entry.resetTime) {
      // First request, or the previous window expired: start a new window.
      entry = {
        count: 0,
        resetTime: now + this.RATE_LIMIT_WINDOW,
      };
      this.rateLimitMap.set(key, entry);
    }

    if (entry.count >= this.RATE_LIMIT_MAX_REQUESTS) {
      const resetTime = new Date(entry.resetTime).toISOString();
      throw new Error(
        `Rate limit exceeded. Maximum ${this.RATE_LIMIT_MAX_REQUESTS} requests per ${this.RATE_LIMIT_WINDOW / 1000} seconds. Resets at ${resetTime}`
      );
    }

    entry.count++;
  }

  /** Builds the cache key from the tool name and the JSON form of its arguments. */
  private getCacheKey(toolName: string, args: any): string {
    return `${toolName}:${JSON.stringify(args)}`;
  }

  /**
   * Looks up a cached value, evicting it when its TTL has elapsed.
   *
   * @returns The cached data, or `null` when absent or expired.
   */
  private getFromCache(key: string): any | null {
    const entry = this.cache.get(key);
    if (!entry) {
      return null;
    }

    const now = Date.now();
    // `ttl` is stored in seconds while timestamps are in milliseconds.
    if (now - entry.timestamp > entry.ttl * 1000) {
      this.cache.delete(key);
      return null;
    }

    return entry.data;
  }

  /** Stores `data` under `key` with the configured TTL, stamped with the current time. */
  private setCache(key: string, data: any): void {
    this.cache.set(key, {
      data,
      timestamp: Date.now(),
      ttl: this.config.cacheTtlSec,
    });
  }

  /** Drops every cached entry. */
  private clearCache(): void {
    this.cache.clear();
    this.log.debug('[MCP] Cache cleared');
  }

  /** True when the lower-cased tool name contains one of the known read-operation names. */
  private isReadOperation(toolName: string): boolean {
    const lowered = toolName.toLowerCase();
    return READ_OPERATIONS.some(op => lowered.includes(op));
  }

  /**
   * True when the error's message contains a pattern for which retrying
   * cannot help. Accepts any thrown value, including `null`/`undefined`.
   */
  private isNonRetriableError(error: unknown): boolean {
    const message = errorMessage(error).toLowerCase();
    return NON_RETRIABLE_PATTERNS.some(pattern => message.includes(pattern));
  }

  /**
   * Races `operation` against a timer and rejects with a timeout error when
   * the timer fires first. The timer is always cleared afterwards so a
   * finished call does not leave a pending timer behind.
   *
   * @param operation The promise to bound.
   * @param ms Time budget in milliseconds.
   * @throws Error `Operation timed out after <ms>ms` when the budget is exceeded.
   */
  private async withTimeout<T>(operation: Promise<T>, ms: number): Promise<T> {
    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeout = new Promise<never>((_, reject) => {
      timer = setTimeout(() => reject(new Error(`Operation timed out after ${ms}ms`)), ms);
    });

    try {
      return await Promise.race([operation, timeout]);
    } finally {
      clearTimeout(timer);
    }
  }

  /** Resolves after `ms` milliseconds. */
  private sleep(ms: number): Promise<void> {
    return new Promise<void>(resolve => setTimeout(resolve, ms));
  }

  /**
   * Checks that the server still answers by listing its tools.
   *
   * @returns `true` when connected and `listTools()` succeeded; otherwise `false`.
   */
  async healthCheck(): Promise<boolean> {
    try {
      if (!this.isConnected) {
        return false;
      }

      // Try to list tools as a simple health check
      await this.listTools();
      return true;
    } catch (error: unknown) {
      this.log.warn(`[MCP] Health check failed: ${String(error)}`);
      return false;
    }
  }

  /** Returns the connection flag maintained by `connect()` / `disconnect()`. */
  getConnectionStatus(): boolean {
    return this.isConnected;
  }

  /** Returns the number of cache entries and their keys (expired entries included). */
  getCacheStats(): { size: number; keys: string[] } {
    return {
      size: this.cache.size,
      keys: Array.from(this.cache.keys()),
    };
  }

  /** Writes an audit line for a tool invocation with sensitive arguments redacted. */
  private auditLogToolCall(toolName: string, args: any): void {
    // Security: Sanitize sensitive parameters before logging
    const sanitizedArgs = this.sanitizeArgs(args);
    this.log.info(`[MCP-AUDIT] Tool called: ${toolName}, Args: ${JSON.stringify(sanitizedArgs)}`);
  }

  /** Writes an audit line for a successful tool execution. */
  private auditLogToolSuccess(toolName: string): void {
    this.log.info(`[MCP-AUDIT] Tool succeeded: ${toolName}`);
  }

  /** Writes an audit line for a failed tool execution attempt. */
  private auditLogToolFailure(toolName: string, error: string): void {
    this.log.error(`[MCP-AUDIT] Tool failed: ${toolName}, Error: ${error}`);
  }

  /**
   * Returns a copy of `args` in which every property whose name contains a
   * sensitive fragment is replaced by `'[REDACTED]'`. Nested plain objects
   * and arrays are processed too, so secrets below the top level do not
   * reach the audit log. The input is never mutated.
   */
  private sanitizeArgs(args: any): any {
    if (args == null) {
      return {};
    }
    return this.redact(args, new Set<object>());
  }

  /**
   * Recursive worker for `sanitizeArgs`. `ancestors` holds the objects on the
   * current path so reference cycles end in a `'[Circular]'` marker instead
   * of unbounded recursion.
   */
  private redact(value: unknown, ancestors: Set<object>): unknown {
    if (typeof value !== 'object' || value === null) {
      return value;
    }
    if (ancestors.has(value)) {
      return '[Circular]';
    }

    if (!Array.isArray(value)) {
      // Only plain objects are copied; Date, Map, class instances etc. are
      // passed through untouched so their own JSON serialisation still applies.
      const proto = Object.getPrototypeOf(value);
      if (proto !== Object.prototype && proto !== null) {
        return value;
      }
    }

    ancestors.add(value);
    try {
      if (Array.isArray(value)) {
        return value.map(item => this.redact(item, ancestors));
      }

      const sanitized: Record<string, unknown> = {};
      for (const [key, entry] of Object.entries(value)) {
        const lowered = key.toLowerCase();
        const isSensitive = SENSITIVE_KEY_FRAGMENTS.some(fragment => lowered.includes(fragment));
        sanitized[key] = isSensitive ? '[REDACTED]' : this.redact(entry, ancestors);
      }
      return sanitized;
    } finally {
      // Leaving this branch: the same object may legitimately appear again
      // elsewhere in the tree (shared reference, not a cycle).
      ancestors.delete(value);
    }
  }
}