refactor: move convert_to_anthropic_tools function to tools.py for better organization
This commit is contained in:
@@ -4,8 +4,6 @@ from typing import Any
|
||||
|
||||
from anthropic.types import Message
|
||||
|
||||
from src.tools.conversion import convert_to_anthropic_tools
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -63,6 +61,54 @@ def format_tool_results(tool_call_id: str, result: Any) -> dict[str, Any]:
|
||||
return {"type": "tool_result", "tool_use_id": tool_call_id, "content": content_str}
|
||||
|
||||
|
||||
def convert_to_anthropic_tools(mcp_tools: list[dict[str, Any]]) -> list[dict[str, Any]]:
|
||||
"""
|
||||
Convert MCP tools to Anthropic tool definitions.
|
||||
|
||||
Args:
|
||||
mcp_tools: List of MCP tools (each with server_name, name, description, inputSchema).
|
||||
|
||||
Returns:
|
||||
List of Anthropic tool definitions.
|
||||
"""
|
||||
logger.debug(f"Converting {len(mcp_tools)} MCP tools to Anthropic format")
|
||||
anthropic_tools = []
|
||||
|
||||
for tool in mcp_tools:
|
||||
server_name = tool.get("server_name")
|
||||
tool_name = tool.get("name")
|
||||
description = tool.get("description")
|
||||
input_schema = tool.get("inputSchema")
|
||||
|
||||
if not server_name or not tool_name or not description or not input_schema:
|
||||
logger.warning(f"Skipping invalid MCP tool definition during Anthropic conversion: {tool}")
|
||||
continue
|
||||
|
||||
# Prefix tool name with server name for routing
|
||||
prefixed_tool_name = f"{server_name}__{tool_name}"
|
||||
|
||||
# Initialize the Anthropic tool structure
|
||||
# Anthropic's format is quite close to JSON Schema
|
||||
anthropic_tool = {"name": prefixed_tool_name, "description": description, "input_schema": input_schema}
|
||||
|
||||
# Basic validation/cleaning of schema if needed
|
||||
if not isinstance(input_schema, dict) or input_schema.get("type") != "object":
|
||||
logger.warning(f"Input schema for tool '{prefixed_tool_name}' is not a valid JSON object schema. Anthropic might reject this.")
|
||||
# Ensure basic structure if missing
|
||||
if not isinstance(input_schema, dict):
|
||||
input_schema = {}
|
||||
if "type" not in input_schema:
|
||||
input_schema["type"] = "object"
|
||||
if "properties" not in input_schema:
|
||||
input_schema["properties"] = {}
|
||||
anthropic_tool["input_schema"] = input_schema
|
||||
|
||||
anthropic_tools.append(anthropic_tool)
|
||||
logger.debug(f"Converted MCP tool to Anthropic: {prefixed_tool_name}")
|
||||
|
||||
return anthropic_tools
|
||||
|
||||
|
||||
def convert_tools(tools: list[dict[str, Any]]) -> list[dict[str, Any]]:
|
||||
logger.debug(f"Converting {len(tools)} tools to Anthropic format.")
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user