# src/mcp/manager.py """Synchronous manager for multiple MCPClient instances.""" import asyncio import json import logging import threading from typing import Any # Use relative imports within the mcp package from custom_mcp.client import MCPClient # Configure basic logging # Consider moving this to the main app entry point if not already done logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") logger = logging.getLogger(__name__) # Define reasonable timeouts for sync calls (should be slightly longer than async timeouts) INITIALIZE_TIMEOUT = 60.0 # Seconds SHUTDOWN_TIMEOUT = 30.0 # Seconds LIST_ALL_TOOLS_TIMEOUT = 30.0 # Seconds EXECUTE_TOOL_TIMEOUT = 120.0 # Seconds class SyncMCPManager: """ Manages the lifecycle of multiple MCPClient instances and provides a synchronous interface to interact with them using a background event loop. """ def __init__(self, config_path: str = "config/mcp_config.json"): """ Initializes the manager, loads config, but does not start servers yet. Args: config_path: Path to the MCP server configuration JSON file. """ self.config_path = config_path self.config: dict[str, Any] | None = None # Stores server_name -> MCPClient instance self.servers: dict[str, MCPClient] = {} self.initialized = False self._lock = threading.Lock() self._loop: asyncio.AbstractEventLoop | None = None self._thread: threading.Thread | None = None logger.info(f"Initializing SyncMCPManager with config path: {config_path}") self._load_config() def _load_config(self): """Load MCP configuration from JSON file.""" logger.debug(f"Attempting to load MCP config from: {self.config_path}") try: # Using direct file access with open(self.config_path) as f: self.config = json.load(f) logger.info("MCP configuration loaded successfully.") logger.debug(f"Config content: {self.config}") except FileNotFoundError: logger.error(f"MCP config file not found at {self.config_path}") self.config = None except json.JSONDecodeError as e: logger.error(f"Error decoding JSON from MCP config file {self.config_path}: {e}") self.config = None except Exception as e: logger.error(f"Error loading MCP config from {self.config_path}: {e}", exc_info=True) self.config = None # --- Background Event Loop Management --- def _run_event_loop(self): """Target function for the background event loop thread.""" try: self._loop = asyncio.new_event_loop() asyncio.set_event_loop(self._loop) self._loop.run_forever() finally: if self._loop and not self._loop.is_closed(): # Clean up remaining tasks before closing try: tasks = asyncio.all_tasks(self._loop) if tasks: logger.debug(f"Cancelling {len(tasks)} outstanding tasks before closing loop...") for task in tasks: task.cancel() # Allow cancellation to propagate self._loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True)) logger.debug("Outstanding tasks cancelled.") self._loop.run_until_complete(self._loop.shutdown_asyncgens()) except Exception as e: logger.error(f"Error during event loop cleanup: {e}") finally: self._loop.close() asyncio.set_event_loop(None) logger.info("Event loop thread finished.") def _start_event_loop_thread(self): """Starts the background event loop thread if not already running.""" if self._thread is None or not self._thread.is_alive(): self._thread = threading.Thread(target=self._run_event_loop, name="MCPEventLoop", daemon=True) self._thread.start() logger.info("Event loop thread started.") # Wait briefly for the loop to become available and running while self._loop is None or not self._loop.is_running(): # Use time.sleep in sync context import time time.sleep(0.01) logger.debug("Event loop is running.") def _stop_event_loop_thread(self): """Stops the background event loop thread.""" if self._loop and self._loop.is_running(): logger.info("Requesting event loop stop...") self._loop.call_soon_threadsafe(self._loop.stop) if self._thread and self._thread.is_alive(): logger.info("Waiting for event loop thread to join...") self._thread.join(timeout=5) if self._thread.is_alive(): logger.warning("Event loop thread did not stop gracefully.") self._loop = None self._thread = None logger.info("Event loop stopped.") # --- Public Synchronous Interface --- def initialize(self) -> bool: """ Initializes and starts all configured MCP servers synchronously. Returns: True if all servers started successfully, False otherwise. """ logger.info("Manager initialization requested.") if not self.config or not self.config.get("mcpServers"): logger.warning("Initialization skipped: No valid configuration loaded.") return False with self._lock: if self.initialized: logger.debug("Initialization skipped: Already initialized.") return True self._start_event_loop_thread() if not self._loop: logger.error("Failed to start event loop for initialization.") return False logger.info("Submitting asynchronous server initialization...") # Prepare coroutine to start all clients async def _async_init_all(): tasks = [] for server_name, server_config in self.config["mcpServers"].items(): command = server_config.get("command") args = server_config.get("args", []) config_env = server_config.get("env", {}) if not command: logger.error(f"Skipping server {server_name}: Missing 'command'.") continue client = MCPClient(server_name, command, args, config_env) self.servers[server_name] = client tasks.append(client.start()) # Append the start coroutine results = await asyncio.gather(*tasks, return_exceptions=True) # Check results - True means success, False or Exception means failure all_success = True failed_servers = [] for i, result in enumerate(results): server_name = list(self.config["mcpServers"].keys())[i] # Assumes order is maintained if isinstance(result, Exception) or result is False: all_success = False failed_servers.append(server_name) # Remove failed client from managed servers if server_name in self.servers: del self.servers[server_name] logger.error(f"Failed to start client for server '{server_name}'. Result/Error: {result}") if not all_success: logger.error(f"Initialization failed for servers: {failed_servers}") return all_success # Run the initialization coroutine in the background loop future = asyncio.run_coroutine_threadsafe(_async_init_all(), self._loop) try: success = future.result(timeout=INITIALIZE_TIMEOUT) if success: logger.info("Asynchronous initialization completed successfully.") self.initialized = True else: logger.error("Asynchronous initialization failed.") self.initialized = False # Attempt to clean up any partially started servers self.shutdown() # Call sync shutdown except TimeoutError: logger.error(f"Initialization timed out after {INITIALIZE_TIMEOUT}s.") self.initialized = False self.shutdown() # Clean up success = False except Exception as e: logger.error(f"Exception during initialization future result: {e}", exc_info=True) self.initialized = False self.shutdown() # Clean up success = False return self.initialized def shutdown(self): """Shuts down all managed MCP servers synchronously.""" logger.info("Manager shutdown requested.") with self._lock: # Check servers dict too, in case init was partial if not self.initialized and not self.servers: logger.debug("Shutdown skipped: Not initialized or no servers running.") # Ensure loop is stopped if it exists if self._thread and self._thread.is_alive(): self._stop_event_loop_thread() return if not self._loop or not self._loop.is_running(): logger.warning("Shutdown requested but event loop not running. Attempting direct cleanup.") # Attempt direct cleanup if loop isn't running (shouldn't happen ideally) # This part is tricky as MCPClient.stop is async. # For simplicity, we might just log and rely on process termination on app exit. # Or, try a temporary loop just for shutdown? Let's stick to stopping the thread for now. self.servers = {} self.initialized = False if self._thread and self._thread.is_alive(): self._stop_event_loop_thread() return logger.info("Submitting asynchronous server shutdown...") # Prepare coroutine to stop all clients async def _async_shutdown_all(): tasks = [client.stop() for client in self.servers.values()] if tasks: await asyncio.gather(*tasks, return_exceptions=True) # Run the shutdown coroutine in the background loop future = asyncio.run_coroutine_threadsafe(_async_shutdown_all(), self._loop) try: future.result(timeout=SHUTDOWN_TIMEOUT) logger.info("Asynchronous shutdown completed.") except TimeoutError: logger.error(f"Shutdown timed out after {SHUTDOWN_TIMEOUT}s. Event loop will be stopped.") # Processes might still be running, OS will clean up on exit hopefully except Exception as e: logger.error(f"Exception during shutdown future result: {e}", exc_info=True) finally: # Always mark as uninitialized and clear servers dict self.servers = {} self.initialized = False # Stop the background thread self._stop_event_loop_thread() logger.info("Manager shutdown complete.") def list_all_tools(self) -> list[dict[str, Any]]: """ Retrieves tools from all initialized MCP servers synchronously. Returns: A list of tool definitions in the standard internal format, aggregated from all servers. Returns empty list on failure. """ if not self.initialized or not self.servers: logger.warning("Cannot list tools: Manager not initialized or no servers running.") return [] if not self._loop or not self._loop.is_running(): logger.error("Cannot list tools: Event loop not running.") return [] logger.info(f"Requesting tools from {len(self.servers)} servers...") # Prepare coroutine to list tools from all clients async def _async_list_all(): tasks = [] server_names_in_order = [] for server_name, client in self.servers.items(): tasks.append(client.list_tools()) server_names_in_order.append(server_name) results = await asyncio.gather(*tasks, return_exceptions=True) all_tools = [] for i, result in enumerate(results): server_name = server_names_in_order[i] if isinstance(result, Exception): logger.error(f"Error listing tools for server '{server_name}': {result}") elif result is None: # MCPClient.list_tools returns None on timeout/error logger.error(f"Failed to list tools for server '{server_name}' (timeout or error).") elif isinstance(result, list): # Add server_name to each tool definition for tool in result: tool["server_name"] = server_name all_tools.extend(result) logger.debug(f"Received {len(result)} tools from {server_name}") else: logger.error(f"Unexpected result type ({type(result)}) when listing tools for {server_name}.") return all_tools # Run the coroutine in the background loop future = asyncio.run_coroutine_threadsafe(_async_list_all(), self._loop) try: aggregated_tools = future.result(timeout=LIST_ALL_TOOLS_TIMEOUT) logger.info(f"Aggregated {len(aggregated_tools)} tools from all servers.") return aggregated_tools except TimeoutError: logger.error(f"Listing all tools timed out after {LIST_ALL_TOOLS_TIMEOUT}s.") return [] except Exception as e: logger.error(f"Exception during listing all tools future result: {e}", exc_info=True) return [] def execute_tool(self, server_name: str, tool_name: str, arguments: dict[str, Any]) -> dict[str, Any] | None: """ Executes a specific tool on the designated MCP server synchronously. Args: server_name: The name of the server hosting the tool. tool_name: The name of the tool to execute. arguments: A dictionary of arguments for the tool. Returns: The result content from the tool execution (dict), an error dict ({"error": ...}), or None on timeout/comm failure. """ if not self.initialized: logger.warning(f"Cannot execute tool '{tool_name}' on {server_name}: Manager not initialized.") return None client = self.servers.get(server_name) if not client: logger.error(f"Cannot execute tool: Server '{server_name}' not found.") return None if not self._loop or not self._loop.is_running(): logger.error(f"Cannot execute tool '{tool_name}': Event loop not running.") return None logger.info(f"Executing tool '{tool_name}' on server '{server_name}' with args: {arguments}") # Run the client's call_tool coroutine in the background loop future = asyncio.run_coroutine_threadsafe(client.call_tool(tool_name, arguments), self._loop) try: result = future.result(timeout=EXECUTE_TOOL_TIMEOUT) # MCPClient.call_tool returns the result dict or an error dict or None if result is None: logger.error(f"Tool execution '{tool_name}' on {server_name} failed (timeout or comm error).") elif isinstance(result, dict) and "error" in result: logger.error(f"Tool execution '{tool_name}' on {server_name} returned error: {result['error']}") else: logger.info(f"Tool '{tool_name}' execution successful.") return result # Return result dict, error dict, or None except TimeoutError: logger.error(f"Tool execution timed out after {EXECUTE_TOOL_TIMEOUT}s for '{tool_name}' on {server_name}.") return None except Exception as e: logger.error(f"Exception during tool execution future result for '{tool_name}' on {server_name}: {e}", exc_info=True) return None