# src/providers/anthropic_provider.py
import json
import logging
from collections.abc import Generator
from typing import Any

from anthropic import Anthropic, Stream
from anthropic.types import Message, MessageStreamEvent, TextDelta

# Absolute imports throughout, per the Ruff configuration for this package
from src.llm_models import MODELS
from src.providers.base import BaseProvider
from src.tools.conversion import convert_to_anthropic_tools

logger = logging.getLogger(__name__)


class AnthropicProvider(BaseProvider):
    """Provider implementation for Anthropic Claude models."""

    def __init__(self, api_key: str, base_url: str | None = None):
        # The Anthropic SDK talks to its default endpoint; the effective base URL
        # is still stored on the parent so callers can inspect it.
        effective_base_url = base_url or MODELS.get("anthropic", {}).get("endpoint")
        super().__init__(api_key, effective_base_url)
        logger.info("Initializing AnthropicProvider")
        try:
            self.client = Anthropic(api_key=self.api_key)
        except Exception as e:
            logger.error(f"Failed to initialize Anthropic client: {e}", exc_info=True)
            raise

    def _convert_messages(
        self, messages: list[dict[str, Any]]
    ) -> tuple[str | None, list[dict[str, Any]]]:
        """Converts the standard message format to Anthropic's format, extracting the system prompt."""
        anthropic_messages = []
        system_prompt = None

        for i, message in enumerate(messages):
            role = message.get("role")
            content = message.get("content")

            if role == "system":
                if i == 0:
                    system_prompt = content
                    logger.debug("Extracted system prompt for Anthropic.")
                else:
                    # A system message that is not at the start cannot use the
                    # `system` parameter; downgrade it to a user message.
                    logger.warning("System message found not at the beginning. Treating as user message.")
                    anthropic_messages.append({"role": "user", "content": f"[System Note]\n{content}"})
                continue

            if role == "tool":
                # Tool results map to a `tool_result` content block inside a user
                # message; the matching `tool_use` block lives in the preceding
                # assistant turn, which the follow-up logic must preserve.
                tool_use_id = message.get("tool_call_id")
                anthropic_messages.append(
                    {
                        "role": "user",
                        "content": [
                            {"type": "tool_result", "tool_use_id": tool_use_id, "content": content}
                        ],
                    }
                )
                continue

            if role == "assistant":
                # Both plain text and structured content (e.g. tool_use blocks
                # from a previous turn) pass through unchanged.
                anthropic_messages.append({"role": "assistant", "content": content})
                continue

            if role == "user":
                anthropic_messages.append({"role": "user", "content": content})
                continue

            logger.warning(f"Unsupported role '{role}' in message conversion for Anthropic.")

        # Anthropic requires the conversation to start with a user message,
        # whether or not a system prompt is set.
        if anthropic_messages and anthropic_messages[0]["role"] != "user":
            logger.warning("Anthropic conversation must start with a user message. Prepending placeholder user message.")
            anthropic_messages.insert(0, {"role": "user", "content": "[Start of conversation]"})

        return system_prompt, anthropic_messages

    def create_chat_completion(
        self,
        messages: list[dict[str, str]],
        model: str,
        temperature: float = 0.4,
        max_tokens: int | None = None,  # Anthropic requires max_tokens
        stream: bool = True,
        tools: list[dict[str, Any]] | None = None,
    ) -> Stream[MessageStreamEvent] | Message:
        """Creates a chat completion using the Anthropic API."""
        logger.debug(f"Anthropic create_chat_completion called. Stream: {stream}, Tools: {bool(tools)}")

        # Anthropic requires max_tokens on every request.
        if max_tokens is None:
            max_tokens = 4096
            logger.warning(f"max_tokens not provided for Anthropic, defaulting to {max_tokens}")

        system_prompt, anthropic_messages = self._convert_messages(messages)

        try:
            completion_params = {
                "model": model,
                "messages": anthropic_messages,
                "temperature": temperature,
                "max_tokens": max_tokens,
                "stream": stream,
            }
            if system_prompt:
                completion_params["system"] = system_prompt
            if tools:
                # tool_choice is left at the API default, letting the model
                # decide whether to call a tool.
                completion_params["tools"] = tools

            # Remove None values defensively (max_tokens is always set by now).
            completion_params = {k: v for k, v in completion_params.items() if v is not None}

            # Summarize the last two messages for logging, truncating long strings.
            log_params = completion_params.copy()
            if "messages" in log_params:
                log_params["messages"] = [
                    {
                        k: (v[:100] + "..." if isinstance(v, str) and len(v) > 100 else v)
                        for k, v in msg.items()
                    }
                    for msg in log_params["messages"][-2:]
                ]
            tools_log = log_params.get("tools", "Not Present")
            logger.debug(
                f"Calling Anthropic API. Model: {log_params.get('model')}, "
                f"Stream: {log_params.get('stream')}, System: {bool(log_params.get('system'))}, "
                f"Tools: {tools_log}"
            )
            logger.debug(f"Full API Params (messages summarized): {log_params}")

            response = self.client.messages.create(**completion_params)
            logger.debug("Anthropic API call successful.")
            return response
        except Exception as e:
            logger.error(f"Anthropic API error: {e}", exc_info=True)
            raise

    def get_streaming_content(self, response: Stream[MessageStreamEvent]) -> Generator[str, None, None]:
        """Yields text content chunks from an Anthropic streaming response."""
        logger.debug("Processing Anthropic stream...")
        full_delta = ""
        try:
            for event in response:
                if event.type == "content_block_delta":
                    # Only TextDelta carries streamable text; other delta types
                    # such as InputJSONDelta (tool arguments) are ignored here.
                    if isinstance(event.delta, TextDelta):
                        delta_text = event.delta.text
                        if delta_text:
                            full_delta += delta_text
                            yield delta_text
                elif event.type == "message_start":
                    logger.debug(f"Anthropic stream started. Model: {event.message.model}")
                elif event.type == "message_stop":
                    # The stop_reason lives on the final message state rather than
                    # on the stop event itself, so only the event is logged here.
logger.debug("Anthropic stream message_stop event received.") elif event.type == "content_block_start": if event.content_block.type == "tool_use": logger.debug(f"Anthropic stream detected tool use start: ID {event.content_block.id}, Name: {event.content_block.name}") elif event.type == "content_block_stop": logger.debug(f"Anthropic stream detected content block stop. Index: {event.index}") logger.debug(f"Anthropic stream finished. Total delta length: {len(full_delta)}") except Exception as e: logger.error(f"Error processing Anthropic stream: {e}", exc_info=True) yield json.dumps({"error": f"Stream processing error: {str(e)}"}) def get_content(self, response: Message) -> str: """Extracts content from a non-streaming Anthropic response.""" try: # Combine text content from all text blocks text_content = "".join([block.text for block in response.content if block.type == "text"]) logger.debug(f"Extracted content (length {len(text_content)}) from non-streaming Anthropic response.") return text_content except Exception as e: logger.error(f"Error extracting content from Anthropic response: {e}", exc_info=True) return f"[Error extracting content: {str(e)}]" def has_tool_calls(self, response: Stream[MessageStreamEvent] | Message) -> bool: """Checks if the Anthropic response contains tool calls.""" try: if isinstance(response, Message): # Non-streaming # Check stop reason and content blocks has_tool_use_block = any(block.type == "tool_use" for block in response.content) has_calls = response.stop_reason == "tool_use" or has_tool_use_block logger.debug(f"Non-streaming Anthropic response check: stop_reason='{response.stop_reason}', has_tool_use_block={has_tool_use_block}. Result: {has_calls}") return has_calls elif isinstance(response, Stream): # Cannot reliably check an unconsumed stream without consuming it. # The LLMClient should handle this by checking after consumption or based on stop_reason if available post-stream. logger.warning("has_tool_calls check on an Anthropic stream is unreliable before consumption.") return False else: logger.warning(f"has_tool_calls received unexpected type for Anthropic: {type(response)}") return False except Exception as e: logger.error(f"Error checking for Anthropic tool calls: {e}", exc_info=True) return False def parse_tool_calls(self, response: Message) -> list[dict[str, Any]]: """Parses tool calls from a non-streaming Anthropic response.""" parsed_calls = [] try: if not isinstance(response, Message): logger.error(f"parse_tool_calls expects Anthropic Message, got {type(response)}") return [] if response.stop_reason != "tool_use": logger.debug("No tool use indicated by stop_reason.") # return [] # Might still have tool_use blocks even if stop_reason isn't tool_use? Check API docs. Let's check content anyway. 
            tool_use_blocks = [block for block in response.content if block.type == "tool_use"]
            if not tool_use_blocks:
                logger.debug("No 'tool_use' content blocks found in Anthropic response.")
                return []

            logger.debug(f"Parsing {len(tool_use_blocks)} 'tool_use' blocks from Anthropic response.")
            for block in tool_use_blocks:
                # Tool names may be prefixed "server__tool", mirroring the OpenAI provider.
                parts = block.name.split("__", 1)
                if len(parts) == 2:
                    server_name, func_name = parts
                else:
                    logger.warning(f"Could not determine server_name from Anthropic tool name '{block.name}'.")
                    server_name = None
                    func_name = block.name

                parsed_calls.append(
                    {
                        "id": block.id,
                        "server_name": server_name,
                        "function_name": func_name,
                        # Anthropic returns `input` as a dict; serialize it to a JSON
                        # string so downstream handling matches the OpenAI provider.
                        "arguments": json.dumps(block.input),
                    }
                )
            return parsed_calls
        except Exception as e:
            logger.error(f"Error parsing Anthropic tool calls: {e}", exc_info=True)
            return []

    def format_tool_results(self, tool_call_id: str, result: Any) -> dict[str, Any]:
        """Formats a tool result for an Anthropic follow-up request."""
        # Anthropic expects a 'tool_result' content block whose content is a string.
        try:
            content_str = json.dumps(result) if isinstance(result, dict) else str(result)
        except Exception as e:
            logger.error(f"Error JSON-encoding tool result for Anthropic {tool_call_id}: {e}")
            content_str = json.dumps({"error": "Failed to encode tool result", "original_type": str(type(result))})

        logger.debug(f"Formatting Anthropic tool result for call ID {tool_call_id}")
        # The caller must place this block inside a "user" role message's content list.
        return {
            "type": "tool_result",
            "tool_use_id": tool_call_id,
            "content": content_str,
            # Optionally add is_error=True if the result indicates an error.
        }

    def convert_tools(self, tools: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Converts the internal tool format to Anthropic's format."""
        logger.debug(f"Converting {len(tools)} tools to Anthropic format.")
        try:
            # The conversion function handles the server__tool name prefixing.
            anthropic_tools = convert_to_anthropic_tools(tools)
            logger.debug(f"Tool conversion result: {anthropic_tools}")
            return anthropic_tools
        except Exception as e:
            logger.error(f"Error during Anthropic tool conversion: {e}", exc_info=True)
            return []

    # Helper needed by LLMClient's tool-handling logic (mirrors the OpenAI provider).
    def get_original_message_with_calls(self, response: Message) -> dict[str, Any]:
        """Extracts the assistant's message containing tool calls for Anthropic."""
        try:
            if isinstance(response, Message) and any(block.type == "tool_use" for block in response.content):
                # For Anthropic, the Message itself is the assistant's turn; return
                # a dict representation of it, including the tool_use blocks.
                # Convert the Pydantic content blocks to plain dicts.
                content_list = [block.model_dump(exclude_unset=True) for block in response.content]
                return {"role": "assistant", "content": content_list}
            else:
                logger.warning("Could not extract original message with tool calls from Anthropic response.")
                return {"role": "assistant", "content": "[Could not extract tool calls message]"}
        except Exception as e:
            logger.error(f"Error extracting original Anthropic message with calls: {e}", exc_info=True)
            return {"role": "assistant", "content": f"[Error extracting tool calls message: {str(e)}]"}
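

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not exercised by the application): shows how
# a caller such as LLMClient might drive this provider for a streaming reply.
# Assumes an ANTHROPIC_API_KEY environment variable; the model id below is a
# placeholder, so substitute whichever Claude model your deployment targets.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import os

    logging.basicConfig(level=logging.INFO)
    provider = AnthropicProvider(api_key=os.environ["ANTHROPIC_API_KEY"])

    stream = provider.create_chat_completion(
        messages=[
            {"role": "system", "content": "You are a terse assistant."},
            {"role": "user", "content": "Say hello in one word."},
        ],
        model="claude-3-5-sonnet-latest",  # placeholder model id
        max_tokens=64,
        stream=True,
    )
    # get_streaming_content filters the event stream down to text deltas.
    for chunk in provider.get_streaming_content(stream):
        print(chunk, end="", flush=True)
    print()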