From a4683023ad4f1d874696e92c56b77fad576325ce Mon Sep 17 00:00:00 2001 From: abhishekbhakat Date: Wed, 26 Mar 2025 11:57:52 +0000 Subject: [PATCH] feat: add support for Anthropic provider, including configuration and conversion utilities --- config/sample_config.ini | 28 ++- pyproject.toml | 4 +- src/app.py | 43 ++-- src/providers/__init__.py | 12 +- src/providers/anthropic_provider.py | 295 ++++++++++++++++++++++++++++ src/tools/__init__.py | 6 + src/tools/conversion.py | 177 +++++++++++++++++ 7 files changed, 534 insertions(+), 31 deletions(-) create mode 100644 src/providers/anthropic_provider.py create mode 100644 src/tools/__init__.py create mode 100644 src/tools/conversion.py diff --git a/config/sample_config.ini b/config/sample_config.ini index a4635c8..e499c7f 100644 --- a/config/sample_config.ini +++ b/config/sample_config.ini @@ -1,7 +1,31 @@ +[base] +# provider can be [ openai|openrouter|anthropic|google] +provider = openrouter + +[openrouter] +api_key = YOUR_API_KEY +base_url = https://openrouter.ai/api/v1 +model = openai/gpt-4o-2024-11-20 +context_window = 128000 + +[anthropic] +api_key = YOUR_API_KEY +base_url = https://api.anthropic.com/v1/messages +model = claude-3-7-sonnet-20250219 +context_window = 128000 + +[google] +api_key = YOUR_API_KEY +base_url = https://generativelanguage.googleapis.com/v1beta/generateContent +model = gemini-2.0-flash +context_window = 1000000 + + [openai] api_key = YOUR_API_KEY -base_url = CUSTOM_BASE_URL -model = YOUR_MODEL_ID +base_url = https://api.openai.com/v1 +model = openai/gpt-4o +context_window = 128000 [mcp] servers_json = config/mcp_config.json diff --git a/pyproject.toml b/pyproject.toml index 475ed09..c8a8268 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,7 +10,9 @@ authors = [ dependencies = [ "streamlit", "python-dotenv", - "openai" + "openai", + "anthropic", + "google-genai", ] classifiers = [ "Development Status :: 3 - Alpha", diff --git a/src/app.py b/src/app.py index 4bba335..81f8b85 100644 --- a/src/app.py +++ b/src/app.py @@ -49,42 +49,43 @@ def init_session_state(): logger.info("Registered MCP Manager shutdown hook.") # --- LLM Client Setup --- - # Determine provider details (e.g., from an [llm] section) - provider_name = "openai" # Default - model_name = None # Must be provided in config + provider_name = None + model_name = None api_key = None base_url = None - # Prioritize [llm] section, fallback to [openai] for compatibility - if config.has_section("llm"): - logger.info("Reading configuration from [llm] section.") - provider_name = config["llm"].get("provider", provider_name) - model_name = config["llm"].get("model") - api_key = config["llm"].get("api_key") - base_url = config["llm"].get("base_url") # Optional - elif config.has_section("openai"): - logger.warning("Using legacy [openai] section for configuration.") - provider_name = "openai" # Force openai if using this section - model_name = config["openai"].get("model") - api_key = config["openai"].get("api_key") - base_url = config["openai"].get("base_url") # Optional + # 1. Determine provider from [base] section + if config.has_section("base") and config["base"].get("provider"): + provider_name = config["base"].get("provider") + logger.info(f"Provider selected from [base] section: {provider_name}") else: - raise ValueError("Missing [llm] or [openai] section in config.ini") + # Fallback or error if [base] provider is missing? Let's error for now. + raise ValueError("Missing 'provider' setting in [base] section of config.ini") + + # 2. Read details from the specific provider's section + if config.has_section(provider_name): + provider_config = config[provider_name] + model_name = provider_config.get("model") + api_key = provider_config.get("api_key") + base_url = provider_config.get("base_url") # Optional + logger.info(f"Read configuration from [{provider_name}] section.") + else: + raise ValueError(f"Missing configuration section '[{provider_name}]' in config.ini for the selected provider.") # Validate required config if not api_key: - raise ValueError("Missing 'api_key' in config.ini ([llm] or [openai] section)") + raise ValueError(f"Missing 'api_key' in [{provider_name}] section of config.ini") if not model_name: - raise ValueError("Missing 'model' name in config.ini ([llm] or [openai] section)") + raise ValueError(f"Missing 'model' name in [{provider_name}] section of config.ini") logger.info(f"Configuring LLMClient for provider: {provider_name}, model: {model_name}") st.session_state.client = LLMClient( provider_name=provider_name, api_key=api_key, mcp_manager=mcp_manager, - base_url=base_url, # Pass None if not provided + base_url=base_url, ) - st.session_state.model_name = model_name # Store model name for chat requests + st.session_state.model_name = model_name logger.info("LLMClient initialized successfully.") except Exception as e: diff --git a/src/providers/__init__.py b/src/providers/__init__.py index b7c1dcc..e1fdd53 100644 --- a/src/providers/__init__.py +++ b/src/providers/__init__.py @@ -1,23 +1,21 @@ # src/providers/__init__.py import logging +from providers.anthropic_provider import AnthropicProvider from providers.base import BaseProvider - -# Import specific provider implementations here as they are created from providers.openai_provider import OpenAIProvider -# from .anthropic_provider import AnthropicProvider -# from .google_provider import GoogleProvider -# from .openrouter_provider import OpenRouterProvider +# from providers.google_provider import GoogleProvider +# from providers.openrouter_provider import OpenRouterProvider logger = logging.getLogger(__name__) # Map provider names (lowercase) to their corresponding class implementations PROVIDER_MAP: dict[str, type[BaseProvider]] = { "openai": OpenAIProvider, - # "anthropic": AnthropicProvider, + "anthropic": AnthropicProvider, # "google": GoogleProvider, - # "openrouter": OpenRouterProvider, + # "openrouter": OpenRouterProvider, # OpenRouter can often use OpenAIProvider with custom base_url } diff --git a/src/providers/anthropic_provider.py b/src/providers/anthropic_provider.py new file mode 100644 index 0000000..14f96f6 --- /dev/null +++ b/src/providers/anthropic_provider.py @@ -0,0 +1,295 @@ +# src/providers/anthropic_provider.py +import json +import logging +from collections.abc import Generator +from typing import Any + +from anthropic import Anthropic, Stream +from anthropic.types import Message, MessageStreamEvent, TextDelta + +# Use relative imports for modules within the same package +from providers.base import BaseProvider + +# Use absolute imports as per Ruff warning and user instructions +from src.llm_models import MODELS +from src.tools.conversion import convert_to_anthropic_tools + +logger = logging.getLogger(__name__) + + +class AnthropicProvider(BaseProvider): + """Provider implementation for Anthropic Claude models.""" + + def __init__(self, api_key: str, base_url: str | None = None): + # Anthropic client doesn't use base_url in the same way, but store it if needed + # Use default Anthropic endpoint if base_url is not provided or relevant + effective_base_url = base_url or MODELS.get("anthropic", {}).get("endpoint") + super().__init__(api_key, effective_base_url) # Pass base_url to parent, though Anthropic client might ignore it + logger.info("Initializing AnthropicProvider") + try: + self.client = Anthropic(api_key=self.api_key) + # Note: Anthropic client doesn't take base_url during init + except Exception as e: + logger.error(f"Failed to initialize Anthropic client: {e}", exc_info=True) + raise + + def _convert_messages(self, messages: list[dict[str, Any]]) -> tuple[str | None, list[dict[str, Any]]]: + """Converts standard message format to Anthropic's format, extracting system prompt.""" + anthropic_messages = [] + system_prompt = None + for i, message in enumerate(messages): + role = message.get("role") + content = message.get("content") + + if role == "system": + if i == 0: + system_prompt = content + logger.debug("Extracted system prompt for Anthropic.") + else: + # Handle system message not at the start (append to previous user message or add as user) + logger.warning("System message found not at the beginning. Treating as user message.") + anthropic_messages.append({"role": "user", "content": f"[System Note]\n{content}"}) + continue + + # Handle tool results specifically + if role == "tool": + # Find the preceding assistant message with the corresponding tool_use block + # This requires careful handling in the follow-up logic + tool_use_id = message.get("tool_call_id") + tool_content = content + # Format as a tool_result content block + anthropic_messages.append({"role": "user", "content": [{"type": "tool_result", "tool_use_id": tool_use_id, "content": tool_content}]}) + continue + + # Handle assistant message potentially containing tool_use blocks + if role == "assistant": + # Check if content is structured (e.g., from a previous tool call response) + if isinstance(content, list): # Assuming tool calls might be represented as a list + anthropic_messages.append({"role": "assistant", "content": content}) + else: + anthropic_messages.append({"role": "assistant", "content": content}) # Regular text content + continue + + # Regular user messages + if role == "user": + anthropic_messages.append({"role": "user", "content": content}) + continue + + logger.warning(f"Unsupported role '{role}' in message conversion for Anthropic.") + + # Ensure conversation starts with a user message if no system prompt was used + if not system_prompt and anthropic_messages and anthropic_messages[0]["role"] != "user": + logger.warning("Anthropic conversation must start with a user message. Prepending empty user message.") + anthropic_messages.insert(0, {"role": "user", "content": "[Start of conversation]"}) # Or handle differently + + return system_prompt, anthropic_messages + + def create_chat_completion( + self, + messages: list[dict[str, str]], + model: str, + temperature: float = 0.4, + max_tokens: int | None = None, # Anthropic requires max_tokens + stream: bool = True, + tools: list[dict[str, Any]] | None = None, + ) -> Stream[MessageStreamEvent] | Message: + """Creates a chat completion using the Anthropic API.""" + logger.debug(f"Anthropic create_chat_completion called. Stream: {stream}, Tools: {bool(tools)}") + + # Anthropic requires max_tokens + if max_tokens is None: + max_tokens = 4096 # Default value if not provided + logger.warning(f"max_tokens not provided for Anthropic, defaulting to {max_tokens}") + + system_prompt, anthropic_messages = self._convert_messages(messages) + + try: + completion_params = { + "model": model, + "messages": anthropic_messages, + "temperature": temperature, + "max_tokens": max_tokens, + "stream": stream, + } + if system_prompt: + completion_params["system"] = system_prompt + if tools: + completion_params["tools"] = tools + # Anthropic doesn't have an explicit 'tool_choice' like OpenAI's 'auto' in the main API call + + # Remove None values (though Anthropic requires max_tokens) + completion_params = {k: v for k, v in completion_params.items() if v is not None} + + log_params = completion_params.copy() + if "messages" in log_params: + log_params["messages"] = [{k: (v[:100] + "..." if isinstance(v, str) and len(v) > 100 else v) for k, v in msg.items()} for msg in log_params["messages"][-2:]] + tools_log = log_params.get("tools", "Not Present") + logger.debug(f"Calling Anthropic API. Model: {log_params.get('model')}, Stream: {log_params.get('stream')}, System: {bool(log_params.get('system'))}, Tools: {tools_log}") + logger.debug(f"Full API Params (messages summarized): {log_params}") + + response = self.client.messages.create(**completion_params) + logger.debug("Anthropic API call successful.") + return response + except Exception as e: + logger.error(f"Anthropic API error: {e}", exc_info=True) + raise + + def get_streaming_content(self, response: Stream[MessageStreamEvent]) -> Generator[str, None, None]: + """Yields content chunks from an Anthropic streaming response.""" + logger.debug("Processing Anthropic stream...") + full_delta = "" + try: + # Iterate through events in the stream + for event in response: + if event.type == "content_block_delta": + # Check if the delta is for text content before accessing .text + if isinstance(event.delta, TextDelta): + delta_text = event.delta.text + if delta_text: + full_delta += delta_text + yield delta_text + # Ignore other delta types like InputJSONDelta for text streaming + # Other event types like 'message_start', 'content_block_start', etc., can be logged or handled if needed + elif event.type == "message_start": + logger.debug(f"Anthropic stream started. Model: {event.message.model}") + elif event.type == "message_stop": + # The stop_reason might be available on the 'message' object associated with the stream, + # not directly on the stop event itself. We log that the stop event occurred. + # Accessing the actual reason might require inspecting the final message state if needed. + logger.debug("Anthropic stream message_stop event received.") + elif event.type == "content_block_start": + if event.content_block.type == "tool_use": + logger.debug(f"Anthropic stream detected tool use start: ID {event.content_block.id}, Name: {event.content_block.name}") + elif event.type == "content_block_stop": + logger.debug(f"Anthropic stream detected content block stop. Index: {event.index}") + + logger.debug(f"Anthropic stream finished. Total delta length: {len(full_delta)}") + except Exception as e: + logger.error(f"Error processing Anthropic stream: {e}", exc_info=True) + yield json.dumps({"error": f"Stream processing error: {str(e)}"}) + + def get_content(self, response: Message) -> str: + """Extracts content from a non-streaming Anthropic response.""" + try: + # Combine text content from all text blocks + text_content = "".join([block.text for block in response.content if block.type == "text"]) + logger.debug(f"Extracted content (length {len(text_content)}) from non-streaming Anthropic response.") + return text_content + except Exception as e: + logger.error(f"Error extracting content from Anthropic response: {e}", exc_info=True) + return f"[Error extracting content: {str(e)}]" + + def has_tool_calls(self, response: Stream[MessageStreamEvent] | Message) -> bool: + """Checks if the Anthropic response contains tool calls.""" + try: + if isinstance(response, Message): # Non-streaming + # Check stop reason and content blocks + has_tool_use_block = any(block.type == "tool_use" for block in response.content) + has_calls = response.stop_reason == "tool_use" or has_tool_use_block + logger.debug(f"Non-streaming Anthropic response check: stop_reason='{response.stop_reason}', has_tool_use_block={has_tool_use_block}. Result: {has_calls}") + return has_calls + elif isinstance(response, Stream): + # Cannot reliably check an unconsumed stream without consuming it. + # The LLMClient should handle this by checking after consumption or based on stop_reason if available post-stream. + logger.warning("has_tool_calls check on an Anthropic stream is unreliable before consumption.") + return False + else: + logger.warning(f"has_tool_calls received unexpected type for Anthropic: {type(response)}") + return False + except Exception as e: + logger.error(f"Error checking for Anthropic tool calls: {e}", exc_info=True) + return False + + def parse_tool_calls(self, response: Message) -> list[dict[str, Any]]: + """Parses tool calls from a non-streaming Anthropic response.""" + parsed_calls = [] + try: + if not isinstance(response, Message): + logger.error(f"parse_tool_calls expects Anthropic Message, got {type(response)}") + return [] + + if response.stop_reason != "tool_use": + logger.debug("No tool use indicated by stop_reason.") + # return [] # Might still have tool_use blocks even if stop_reason isn't tool_use? Check API docs. Let's check content anyway. + + tool_use_blocks = [block for block in response.content if block.type == "tool_use"] + if not tool_use_blocks: + logger.debug("No 'tool_use' content blocks found in Anthropic response.") + return [] + + logger.debug(f"Parsing {len(tool_use_blocks)} 'tool_use' blocks from Anthropic response.") + for block in tool_use_blocks: + # Adapt server/tool name splitting if needed (similar to OpenAI provider) + # Assuming Anthropic tool names might also be prefixed like "server__tool" + parts = block.name.split("__", 1) + if len(parts) == 2: + server_name, func_name = parts + else: + logger.warning(f"Could not determine server_name from Anthropic tool name '{block.name}'.") + server_name = None + func_name = block.name + + parsed_calls.append({ + "id": block.id, + "server_name": server_name, + "function_name": func_name, + "arguments": json.dumps(block.input), # Anthropic input is already a dict, dump to string like OpenAI provider expects? Or keep as dict? Let's keep as dict for now. + # "arguments": block.input, # Keep as dict? Let's try this first. + }) + + return parsed_calls + except Exception as e: + logger.error(f"Error parsing Anthropic tool calls: {e}", exc_info=True) + return [] + + def format_tool_results(self, tool_call_id: str, result: Any) -> dict[str, Any]: + """Formats a tool result for an Anthropic follow-up request.""" + # Anthropic expects a 'tool_result' content block + # The content of the result block should typically be a string. + try: + if isinstance(result, dict): + content_str = json.dumps(result) + else: + content_str = str(result) + except Exception as e: + logger.error(f"Error JSON-encoding tool result for Anthropic {tool_call_id}: {e}") + content_str = json.dumps({"error": "Failed to encode tool result", "original_type": str(type(result))}) + + logger.debug(f"Formatting Anthropic tool result for call ID {tool_call_id}") + # This needs to be placed inside a "user" role message's content list + return { + "type": "tool_result", + "tool_use_id": tool_call_id, + "content": content_str, + # Optionally add is_error=True if result indicates an error + } + + def convert_tools(self, tools: list[dict[str, Any]]) -> list[dict[str, Any]]: + """Converts internal tool format to Anthropic's format.""" + # Use the conversion function, assuming it's correctly placed and imported + logger.debug(f"Converting {len(tools)} tools to Anthropic format.") + try: + # The conversion function needs to handle the server__tool prefixing + anthropic_tools = convert_to_anthropic_tools(tools) + logger.debug(f"Tool conversion result: {anthropic_tools}") + return anthropic_tools + except Exception as e: + logger.error(f"Error during Anthropic tool conversion: {e}", exc_info=True) + return [] + + # Helper needed by LLMClient's current tool handling logic (if adapting OpenAI's pattern) + def get_original_message_with_calls(self, response: Message) -> dict[str, Any]: + """Extracts the assistant's message containing tool calls for Anthropic.""" + try: + if isinstance(response, Message) and any(block.type == "tool_use" for block in response.content): + # Anthropic's response structure is different. The 'message' itself is the assistant's turn. + # We need to return a representation of this turn, including the tool_use blocks. + # Convert Pydantic models within content to dicts + content_list = [block.model_dump(exclude_unset=True) for block in response.content] + return {"role": "assistant", "content": content_list} + else: + logger.warning("Could not extract original message with tool calls from Anthropic response.") + return {"role": "assistant", "content": "[Could not extract tool calls message]"} + except Exception as e: + logger.error(f"Error extracting original Anthropic message with calls: {e}", exc_info=True) + return {"role": "assistant", "content": f"[Error extracting tool calls message: {str(e)}]"} diff --git a/src/tools/__init__.py b/src/tools/__init__.py new file mode 100644 index 0000000..057b8bf --- /dev/null +++ b/src/tools/__init__.py @@ -0,0 +1,6 @@ +# src/tools/__init__.py +# This file makes the 'tools' directory a Python package. + +# Optionally import key functions/classes for easier access +# from .conversion import convert_to_openai_tools, convert_to_anthropic_tools +# from .execution import execute_tool # Assuming execution.py will exist diff --git a/src/tools/conversion.py b/src/tools/conversion.py new file mode 100644 index 0000000..a07b723 --- /dev/null +++ b/src/tools/conversion.py @@ -0,0 +1,177 @@ +""" +Conversion utilities for MCP tools. + +This module contains functions to convert between different tool formats +for various LLM providers (OpenAI, Anthropic, etc.). +""" + +import logging +from typing import Any + +logger = logging.getLogger(__name__) + + +def convert_to_openai_tools(mcp_tools: list[dict[str, Any]]) -> list[dict[str, Any]]: + """ + Convert MCP tools to OpenAI tool definitions. + + Args: + mcp_tools: List of MCP tools (each with server_name, name, description, inputSchema). + + Returns: + List of OpenAI tool definitions. + """ + openai_tools = [] + logger.debug(f"Converting {len(mcp_tools)} MCP tools to OpenAI format.") + + for tool in mcp_tools: + server_name = tool.get("server_name") + tool_name = tool.get("name") + description = tool.get("description") + input_schema = tool.get("inputSchema") + + if not server_name or not tool_name or not description or not input_schema: + logger.warning(f"Skipping invalid MCP tool definition during OpenAI conversion: {tool}") + continue + + # Prefix tool name with server name for routing + prefixed_tool_name = f"{server_name}__{tool_name}" + + # Initialize the OpenAI tool structure + openai_tool = { + "type": "function", + "function": { + "name": prefixed_tool_name, + "description": description, + "parameters": input_schema, # OpenAI uses JSON Schema directly + }, + } + # Basic validation/cleaning of schema if needed could go here + if not isinstance(input_schema, dict) or input_schema.get("type") != "object": + logger.warning(f"Input schema for tool '{prefixed_tool_name}' is not a valid JSON object schema. OpenAI might reject this.") + # Ensure basic structure if missing + if not isinstance(input_schema, dict): + input_schema = {} + if "type" not in input_schema: + input_schema["type"] = "object" + if "properties" not in input_schema: + input_schema["properties"] = {} + openai_tool["function"]["parameters"] = input_schema + + openai_tools.append(openai_tool) + logger.debug(f"Converted MCP tool to OpenAI: {prefixed_tool_name}") + + return openai_tools + + +def convert_to_anthropic_tools(mcp_tools: list[dict[str, Any]]) -> list[dict[str, Any]]: + """ + Convert MCP tools to Anthropic tool definitions. + + Args: + mcp_tools: List of MCP tools (each with server_name, name, description, inputSchema). + + Returns: + List of Anthropic tool definitions. + """ + logger.debug(f"Converting {len(mcp_tools)} MCP tools to Anthropic format") + anthropic_tools = [] + + for tool in mcp_tools: + server_name = tool.get("server_name") + tool_name = tool.get("name") + description = tool.get("description") + input_schema = tool.get("inputSchema") + + if not server_name or not tool_name or not description or not input_schema: + logger.warning(f"Skipping invalid MCP tool definition during Anthropic conversion: {tool}") + continue + + # Prefix tool name with server name for routing + prefixed_tool_name = f"{server_name}__{tool_name}" + + # Initialize the Anthropic tool structure + # Anthropic's format is quite close to JSON Schema + anthropic_tool = {"name": prefixed_tool_name, "description": description, "input_schema": input_schema} + + # Basic validation/cleaning of schema if needed + if not isinstance(input_schema, dict) or input_schema.get("type") != "object": + logger.warning(f"Input schema for tool '{prefixed_tool_name}' is not a valid JSON object schema. Anthropic might reject this.") + # Ensure basic structure if missing + if not isinstance(input_schema, dict): + input_schema = {} + if "type" not in input_schema: + input_schema["type"] = "object" + if "properties" not in input_schema: + input_schema["properties"] = {} + anthropic_tool["input_schema"] = input_schema + + anthropic_tools.append(anthropic_tool) + logger.debug(f"Converted MCP tool to Anthropic: {prefixed_tool_name}") + + return anthropic_tools + + +def convert_to_google_tools(mcp_tools: list[dict[str, Any]]) -> list[dict[str, Any]]: + """ + Convert MCP tools to Google Gemini format (dictionary structure). + + Args: + mcp_tools: List of MCP tools (each with server_name, name, description, inputSchema). + + Returns: + List containing one dictionary with 'function_declarations'. + """ + logger.debug(f"Converting {len(mcp_tools)} MCP tools to Google Gemini format") + + function_declarations = [] + + for tool in mcp_tools: + server_name = tool.get("server_name") + tool_name = tool.get("name") + description = tool.get("description") + input_schema = tool.get("inputSchema") + + if not server_name or not tool_name or not description or not input_schema: + logger.warning(f"Skipping invalid MCP tool definition during Google conversion: {tool}") + continue + + # Prefix tool name with server name for routing + prefixed_tool_name = f"{server_name}__{tool_name}" + + # Basic validation/cleaning of schema + if not isinstance(input_schema, dict) or input_schema.get("type") != "object": + logger.warning(f"Input schema for tool '{prefixed_tool_name}' is not a valid JSON object schema. Google might reject this.") + # Ensure basic structure if missing + if not isinstance(input_schema, dict): + input_schema = {} + if "type" not in input_schema: + input_schema["type"] = "object" + if "properties" not in input_schema: + input_schema["properties"] = {} + # Google requires properties for object type, add dummy if empty + if not input_schema["properties"]: + logger.warning(f"Empty properties for tool '{prefixed_tool_name}', adding dummy property for Google.") + input_schema["properties"] = {"_dummy_param": {"type": "STRING", "description": "Placeholder"}} + + # Create function declaration for Google's format + function_declaration = { + "name": prefixed_tool_name, + "description": description, + "parameters": input_schema, # Google uses JSON Schema directly + } + + function_declarations.append(function_declaration) + logger.debug(f"Converted MCP tool to Google FunctionDeclaration: {prefixed_tool_name}") + + # Google API expects a list containing one Tool object dict + google_tools_wrapper = [{"function_declarations": function_declarations}] if function_declarations else [] + + logger.debug(f"Final Google tools structure: {google_tools_wrapper}") + return google_tools_wrapper + + +# Note: The _handle_schema_construct helper from the reference code is not strictly +# needed if we assume the inputSchema is already valid JSON Schema. +# If complex schemas (anyOf, etc.) need specific handling beyond standard JSON Schema, +# that logic could be added here or within the provider implementations.