Dolphin MCP Implementation Plan

Overview

This document outlines the plan for integrating Dolphin MCP into the existing Streamlit chat application. The integration will let the chat app call MCP tools from models served through an OpenAI-compatible API (the sample configuration below points at OpenRouter).

1. Configuration Setup

Enhanced config.ini Structure

We'll expand the existing [mcp] section in config.ini:

[openai]
api_key = your_api_key
base_url = https://openrouter.ai/api/v1
model = deepseek/deepseek-chat-v3-0324

[mcp]
enabled = true
# Server configurations in INI format
server.example.command = uvx
server.example.args = mcp-server-example
server.example.env.API_KEY = your-api-key

Configuration Parsing

The OpenAIClient will parse this into a format Dolphin MCP can use:

def parse_mcp_config(self):
    if not self.config.has_section('mcp'):
        return None
    
    mcp_config = {"mcpServers": {}, "models": []}
    
    # Check if MCP is enabled
    if not self.config['mcp'].getboolean('enabled', False):
        return None
    
    # Parse server configurations
    for key in self.config['mcp']:
        if key.startswith('server.') and '.' in key[7:]:
            parts = key.split('.')
            server_name = parts[1]
            config_key = '.'.join(parts[2:])
            
            if server_name not in mcp_config["mcpServers"]:
                mcp_config["mcpServers"][server_name] = {
                    "command": "",
                    "args": [],
                    "env": {}
                }
            
            if config_key == 'command':
                mcp_config["mcpServers"][server_name]["command"] = self.config['mcp'][key]
            elif config_key == 'args':
                mcp_config["mcpServers"][server_name]["args"] = self.config['mcp'][key].split()
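            elif config_key.startswith('env.'):
                # Note: configparser lowercases option names by default, so
                # API_KEY arrives here as api_key unless optionxform = str
                # is set on the parser before config.ini is read.
                env_key = config_key[4:]
                mcp_config["mcpServers"][server_name]["env"][env_key] = self.config['mcp'][key]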
    
    # Add model configuration from existing OpenAI settings
    model_config = {
        "model": self.config['openai']['model'],
        "provider": "openai",
        "apiKey": self.config['openai']['api_key'],
        "apiBase": self.config['openai']['base_url'],
        "systemMessage": "You are a helpful assistant that can use tools."
    }
    mcp_config["models"].append(model_config)
    
    return mcp_config
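
The key-splitting logic above can be tried in isolation. The following is a minimal sketch, not the application's actual parser: `parse_servers` and `SAMPLE_INI` are hypothetical names, and it assumes the parser is created with `optionxform = str` so that environment variable names keep their case.

```python
import configparser

# Same shape as the [mcp] section shown earlier in this plan.
SAMPLE_INI = """
[mcp]
enabled = true
server.example.command = uvx
server.example.args = mcp-server-example
server.example.env.API_KEY = your-api-key
"""

def parse_servers(ini_text):
    """Return {server_name: {"command", "args", "env"}} from an [mcp] section."""
    parser = configparser.ConfigParser()
    # configparser lowercases option names by default; keep them as written
    # so that environment variable names such as API_KEY survive.
    parser.optionxform = str
    parser.read_string(ini_text)

    servers = {}
    if not parser.has_section("mcp"):
        return servers
    section = parser["mcp"]
    if not section.getboolean("enabled", fallback=False):
        return servers

    for key, value in section.items():
        if not key.startswith("server."):
            continue
        parts = key.split(".", 2)  # ["server", <name>, <rest of key>]
        if len(parts) < 3:
            continue
        _, name, rest = parts
        entry = servers.setdefault(name, {"command": "", "args": [], "env": {}})
        if rest == "command":
            entry["command"] = value
        elif rest == "args":
            entry["args"] = value.split()
        elif rest.startswith("env."):
            entry["env"][rest[len("env."):]] = value
    return servers

servers = parse_servers(SAMPLE_INI)
print(servers)
```

Splitting with `key.split(".", 2)` keeps everything after the server name in one piece, so an `env.` key that itself contains dots is not truncated.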

2. Code Changes

OpenAIClient Modifications

  1. Import Dolphin MCP components:
from dolphin_mcp import run_interaction
  2. Update initialization to include MCP:
def __init__(self):
    # Existing OpenAI client setup
    self.tools = []  # Will store available MCP tools
    # No need to create MCPClient directly
    # run_interaction will handle provider selection
  3. Update get_chat_response to use MCP tools:
async def get_chat_response(self, messages):
    mcp_config = self.parse_mcp_config()
    
    if not mcp_config:
        # Fall back to standard OpenAI if MCP not enabled
        return self.client.chat.completions.create(
            model=self.config['openai']['model'],
            messages=messages,
            stream=True
        )
    
    # Use Dolphin MCP with our parsed config
    return await run_interaction(
        user_query=messages[-1]["content"],
        model_name=self.config['openai']['model'],
        config=mcp_config,  # Pass the config dict directly
        stream=True
    )

3. Server Management with Synchronous Wrapper

To properly manage MCP servers in a Streamlit context, we'll implement a synchronous wrapper:

import asyncio
import threading
from dolphin_mcp import MCPClient, run_interaction

class SyncMCPManager:
    """Synchronous wrapper for MCP server management"""
    
    def __init__(self, config):
        self.config = config
        self.servers = {}
        self.initialized = False
        self._lock = threading.Lock()
    
    def initialize(self):
        """Initialize and start all MCP servers synchronously"""
        if self.initialized:
            return True
        
        with self._lock:
            if self.initialized:  # Double-check after acquiring lock
                return True
                
            if not self.config or "mcpServers" not in self.config:
                return False
                
            # Run the async initialization in a synchronous wrapper
            loop = asyncio.new_event_loop()
            try:
                success = loop.run_until_complete(self._async_initialize())
            finally:
                loop.close()  # Release the loop even if initialization raises
            
            self.initialized = success
            return success
    
    async def _async_initialize(self):
        """Async implementation of server initialization"""
        success = True
        for server_name, server_config in self.config["mcpServers"].items():
            client = MCPClient(
                server_name=server_name,
                command=server_config.get("command"),
                args=server_config.get("args", []),
                env=server_config.get("env", {})
            )
            
            ok = await client.start()
            if ok:
                # Get available tools
                tools = await client.list_tools()
                self.servers[server_name] = {
                    "client": client,
                    "tools": tools
                }
            else:
                success = False
                print(f"Failed to start MCP server: {server_name}")
        
        return success
    
    def shutdown(self):
        """Shut down all MCP servers synchronously"""
        if not self.initialized:
            return
            
        with self._lock:
            if not self.initialized:
                return
                
            loop = asyncio.new_event_loop()
            try:
                loop.run_until_complete(self._async_shutdown())
            finally:
                loop.close()  # Release the loop even if a server fails to stop
            
            self.servers = {}
            self.initialized = False
    
    async def _async_shutdown(self):
        """Async implementation of server shutdown"""
        for server_info in self.servers.values():
            await server_info["client"].stop()
    
    def process_query(self, query, model_name=None):
        """Process a query using MCP tools synchronously"""
        if not self.initialized:
            self.initialize()
            
        if not self.initialized:
            return {"error": "Failed to initialize MCP servers"}
            
        loop = asyncio.new_event_loop()
        try:
            result = loop.run_until_complete(self._async_process_query(query, model_name))
        finally:
            loop.close()  # Release the loop even if the query raises
        
        return result
    
    async def _async_process_query(self, query, model_name=None):
        """Async implementation of query processing"""
        return await run_interaction(
            user_query=query,
            model_name=model_name,
            config=self.config,
            stream=False
        )
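
The wrapper above creates a fresh event loop for each call, so a client started on one loop is later stopped on another. One alternative worth evaluating is a single long-lived loop running in a background thread, with synchronous code submitting coroutines to it. The sketch below illustrates that pattern with the standard library only; `BackgroundLoop` and `fake_start` are hypothetical names, and `fake_start` merely stands in for an async server start-up call.

```python
import asyncio
import threading

class BackgroundLoop:
    """Runs one asyncio event loop in a daemon thread until close() is called."""

    def __init__(self):
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def run(self, coro, timeout=None):
        """Submit a coroutine from synchronous code and block for its result."""
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        return future.result(timeout)

    def close(self):
        """Stop the loop, wait for the thread to exit, then free the loop."""
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()


async def fake_start(name):
    # Hypothetical stand-in for an async server start-up call.
    await asyncio.sleep(0.01)
    return f"{name} started"

bg = BackgroundLoop()
first = bg.run(fake_start("example"))
second = bg.run(fake_start("other"))  # Same loop as the first call
bg.close()
```

Because every coroutine runs on the same loop, objects created during start-up remain usable for later calls and for shutdown. Whether this suits Dolphin MCP's client lifecycle would need to be verified against the library.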

Streamlit Integration Example

# In app.py
import atexit

import streamlit as st
from openai_client import OpenAIClient
from mcp_manager import SyncMCPManager

# Initialize once per process; st.cache_resource survives Streamlit reruns
@st.cache_resource
def get_mcp_manager():
    client = OpenAIClient()
    mcp_config = client.parse_mcp_config()
    if mcp_config:
        manager = SyncMCPManager(mcp_config)
        manager.initialize()  # Start servers immediately
        # Register cleanup here so it is added once, not on every script rerun
        atexit.register(manager.shutdown)
        return manager
    return None

# Get or initialize the MCP manager
mcp_manager = get_mcp_manager()

def handle_user_input():
    if prompt := st.chat_input("Type your message..."):
        if mcp_manager and mcp_manager.initialized:
            response = mcp_manager.process_query(
                prompt, 
                # The manager's config already carries the model entry
                model_name=mcp_manager.config["models"][0]["model"]
            )
            # Handle response...

4. Streamlit App Updates

  1. Add tool usage indicators to the UI
  2. Handle streaming responses with tool calls
  3. Add error handling for MCP operations
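
For the error-handling item, one possible shape is a thin wrapper that turns every outcome into the same dictionary, so the UI code has a single branch for failures. This is a sketch under assumptions: `safe_process_query` and the two stub managers are hypothetical, and the only behavior taken from this plan is that `process_query` may return a dict containing an `"error"` key.

```python
def safe_process_query(manager, prompt, model_name=None):
    """Call manager.process_query and normalize every outcome to one dict shape."""
    try:
        result = manager.process_query(prompt, model_name=model_name)
    except Exception as exc:  # Broad on purpose: tool failures vary widely
        return {"ok": False, "text": "", "error": f"{type(exc).__name__}: {exc}"}
    # SyncMCPManager reports initialization failure as {"error": ...}
    if isinstance(result, dict) and "error" in result:
        return {"ok": False, "text": "", "error": str(result["error"])}
    return {"ok": True, "text": str(result), "error": None}


class _FailingManager:
    """Stub that simulates a tool call raising an exception."""
    def process_query(self, prompt, model_name=None):
        raise TimeoutError("tool call timed out")

class _WorkingManager:
    """Stub that simulates a successful response."""
    def process_query(self, prompt, model_name=None):
        return "42"

failed = safe_process_query(_FailingManager(), "hi")
worked = safe_process_query(_WorkingManager(), "hi")
```

In the Streamlit handler, a result with `ok` set to `False` could be shown with `st.error`, and any other result rendered as a normal assistant message.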

5. Testing Strategy

Phase 1: Basic Integration

  • Verify MCP client initialization
  • Test tool discovery
  • Exercise simple tool calls

Phase 2: Full Integration

  • End-to-end testing with real queries
  • Error scenario testing
  • Performance testing

6. Future Enhancements

  1. Tool discovery UI
  2. Tool configuration interface
  3. Multiple MCP server support
  4. Advanced error recovery

Implementation Timeline

  1. Week 1: Basic integration and testing
  2. Week 2: Full UI integration
  3. Week 3: Comprehensive testing
  4. Week 4: Deployment and monitoring

Potential Challenges

  1. Tool Compatibility: Ensuring tools work with OpenAI's function calling
  2. Error Handling: Robust error recovery when tools fail
  3. Performance: Minimizing latency from tool calls
  4. Security: Properly handling sensitive data in tool calls
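
To make the tool compatibility point concrete: an MCP tool is described by a `name`, a `description`, and a JSON Schema under `inputSchema`, while OpenAI-style function calling expects that schema under `function.parameters`. The sketch below shows the mapping; `mcp_tool_to_openai` and the `get_weather` tool are hypothetical, and Dolphin MCP may already perform an equivalent conversion internally.

```python
def mcp_tool_to_openai(tool):
    """Map an MCP tool description (as a dict) to an OpenAI tools entry."""
    # Fall back to an empty object schema when the tool declares no inputs.
    schema = tool.get("inputSchema") or {"type": "object", "properties": {}}
    return {
        "type": "function",
        "function": {
            "name": tool["name"],
            "description": tool.get("description", ""),
            "parameters": schema,
        },
    }

# Hypothetical tool, used only to illustrate the mapping.
example_tool = {
    "name": "get_weather",
    "description": "Look up the current weather for a city.",
    "inputSchema": {
        "type": "object",
        "properties": {"city": {"type": "string"}},
        "required": ["city"],
    },
}

converted = mcp_tool_to_openai(example_tool)
print(converted["function"]["name"])
```

Tools whose schemas use JSON Schema features that a given model provider does not accept would still need case-by-case handling, which is the compatibility risk noted in the list above.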