feat: Implement GoogleProvider for Google Generative AI integration
- Added GoogleProvider class to handle chat completions with the Google Gemini API.
- Implemented client initialization and response handling for streaming and non-streaming responses.
- Created utility functions for tool conversion, response parsing, and content extraction.
- Removed legacy tool conversion utilities from the tools module.
- Enhanced logging for better traceability of API interactions and error handling.
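For orientation, a minimal sketch of how the new response helpers are intended to be driven. The SDK calls and model name below are illustrative assumptions (in the committed code the responses come through GoogleProvider's chat-completion path rather than from calling the SDK directly):

from google import genai

from src.providers.google_provider.response import get_content, get_streaming_content, get_usage

client = genai.Client(api_key="...")  # assumed setup; the provider normally owns this client
stream = client.models.generate_content_stream(model="gemini-2.0-flash", contents="Hello")
for text_chunk in get_streaming_content(stream):
    print(text_chunk, end="", flush=True)

response = client.models.generate_content(model="gemini-2.0-flash", contents="Hello")
print(get_content(response))
print(get_usage(response))  # e.g. {'prompt_tokens': 3, 'completion_tokens': 42}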
src/providers/google_provider/response.py (new file, 205 lines)
@@ -0,0 +1,205 @@
# src/providers/google_provider/response.py

"""
Response handling utilities specific to the Google Generative AI provider.

Includes functions for:
- Extracting content from streaming responses.
- Extracting content from non-streaming responses.
- Extracting token usage information.
"""

import itertools
import json
import logging
from collections.abc import Generator
from typing import Any

from google.genai.types import GenerateContentResponse

logger = logging.getLogger(__name__)

def get_streaming_content(response: Any) -> Generator[str, None, None]:
    """
    Yields content chunks (text) from a Google streaming response iterator.

    Args:
        response: The streaming response iterator returned by `generate_content(stream=True)`.

    Yields:
        String chunks of the generated text content. May yield JSON strings
        containing error information if errors occur during streaming.
    """
    logger.debug("Processing Google stream...")
    full_delta = ""
    try:
        # Check if the response itself is an error indicator (e.g., from create_chat_completion error handling)
        if isinstance(response, dict) and "error" in response:
            yield json.dumps(response)
            logger.error(f"Stream processing stopped due to initial error: {response['error']}")
            return

        # Check whether response is an error iterator from create_chat_completion:
        # error iterators yield JSON strings, while valid streams yield chunk objects.
        if hasattr(response, "__iter__") and not hasattr(response, "candidates"):
            first_item = next(response, None)
            if first_item and isinstance(first_item, str):
                try:
                    error_data = json.loads(first_item)
                    if "error" in error_data:
                        yield first_item  # Yield the error JSON
                        yield from response
                        logger.error(f"Stream processing stopped due to yielded error: {error_data['error']}")
                        return
                except json.JSONDecodeError:
                    # Not a JSON error payload; treat it as regular content.
                    yield first_item
                    full_delta += first_item
            elif first_item is not None:
                # The first chunk was consumed while peeking; chain it back onto
                # the front of the iterator so the main loop below still sees it.
                response = itertools.chain([first_item], response)

        # Process the stream chunk by chunk
        for chunk in response:
            # Check for errors embedded within the stream chunks (uncommon for Google, but handled defensively)
            if isinstance(chunk, dict) and "error" in chunk:
                yield json.dumps(chunk)
                logger.error(f"Error encountered during Google stream: {chunk['error']}")
                continue  # Keep processing any remaining chunks.

            # Extract text content
            delta = ""
            try:
                if hasattr(chunk, "text"):
                    delta = chunk.text
                elif hasattr(chunk, "candidates") and chunk.candidates:
                    # Content can also arrive nested under candidates in a stream chunk;
                    # check the first candidate's first part for text.
                    first_candidate = chunk.candidates[0]
                    if hasattr(first_candidate, "content") and hasattr(first_candidate.content, "parts") and first_candidate.content.parts:
                        first_part = first_candidate.content.parts[0]
                        if hasattr(first_part, "text"):
                            delta = first_part.text
            except Exception as e:
                logger.warning(f"Could not extract text from stream chunk: {chunk}. Error: {e}", exc_info=True)
                delta = ""  # Ensure delta is a string

            if delta:
                full_delta += delta
                yield delta

            # Detect function calls during the stream (optional, for logging/early detection)
            try:
                if hasattr(chunk, "candidates") and chunk.candidates:
                    for part in chunk.candidates[0].content.parts:
                        if hasattr(part, "function_call") and part.function_call:
                            logger.debug(f"Function call detected during stream: {part.function_call.name}")
                            # Note: We don't yield the function call itself here, just the text.
                            # Function calls are typically processed after the stream completes.
                            break  # Found a function call in this chunk
            except Exception:
                # Ignore errors during optional function call detection in stream
                pass

        logger.debug(f"Google stream finished. Total delta length: {len(full_delta)}")

    except StopIteration:
        logger.debug("Google stream finished (StopIteration).")  # Normal end of iteration
    except Exception as e:
        logger.error(f"Error processing Google stream: {e}", exc_info=True)
        # Yield a final error message
        yield json.dumps({"error": f"Stream processing error: {str(e)}"})


def get_content(response: GenerateContentResponse | dict[str, Any]) -> str:
    """
    Extracts the full text content from a non-streaming Google response.

    Args:
        response: The non-streaming response object (`GenerateContentResponse`) or
            an error dictionary.

    Returns:
        The concatenated text content, or an error message string.
    """
    try:
        # Handle error dictionary case
        if isinstance(response, dict) and "error" in response:
            logger.error(f"Cannot get content from error response: {response['error']}")
            return f"[Error: {response['error']}]"

        # Handle successful GenerateContentResponse object
        if hasattr(response, "text"):
            # The `.text` attribute usually provides the concatenated text content
            # directly (it may be None if the response contains no text parts).
            content = response.text or ""
            logger.debug(f"Extracted content (length {len(content)}) from response.text.")
            return content
        elif hasattr(response, "candidates") and response.candidates:
            # Fallback: manually concatenate text from parts if .text is missing
            first_candidate = response.candidates[0]
            if hasattr(first_candidate, "content") and hasattr(first_candidate.content, "parts"):
                text_parts = []
                for part in first_candidate.content.parts:
                    if hasattr(part, "text"):
                        text_parts.append(part.text)
                    # Only text content is of interest here; function calls etc. are ignored.
                content = "".join(text_parts)
                logger.debug(f"Extracted content (length {len(content)}) from response candidates' parts.")
                return content
            else:
                logger.warning("Google response candidate has no content or parts.")
                return ""  # Return empty string if no text found
        else:
            logger.warning(f"Could not extract content from Google response: No 'text' or valid 'candidates'. Response type: {type(response)}")
            return ""  # Return empty string if no text found
    except AttributeError as ae:
        logger.error(f"Attribute error extracting content from Google response: {ae}. Response object: {response}", exc_info=True)
        return f"[Error extracting content: Attribute missing - {str(ae)}]"
    except Exception as e:
        logger.error(f"Unexpected error extracting content from Google response: {e}", exc_info=True)
        return f"[Error extracting content: {str(e)}]"


def get_usage(response: GenerateContentResponse | dict[str, Any]) -> dict[str, int] | None:
    """
    Extracts token usage information from a Google response object.

    Args:
        response: The response object (`GenerateContentResponse`) or an error dictionary.

    Returns:
        A dictionary containing 'prompt_tokens' and 'completion_tokens', or None if
        usage information is unavailable or an error occurred.
    """
    try:
        # Handle error dictionary case
        if isinstance(response, dict) and "error" in response:
            logger.warning("Cannot get usage from error response.")
            return None

        # Check for usage metadata in the response object
        if hasattr(response, "usage_metadata"):
            metadata = response.usage_metadata
            # Google uses prompt_token_count and candidates_token_count
            usage = {
                "prompt_tokens": getattr(metadata, "prompt_token_count", 0),
                "completion_tokens": getattr(metadata, "candidates_token_count", 0),
                # Google also provides total_token_count, which could be added if needed:
                # "total_tokens": getattr(metadata, "total_token_count", 0),
            }
            # Ensure values are integers, treating missing/None counts as 0
            usage = {k: int(v or 0) for k, v in usage.items()}
            logger.debug(f"Extracted usage from Google response metadata: {usage}")
            return usage
        else:
            # Error dicts were handled above, so this is an unexpected response shape.
            logger.warning(f"Could not extract usage from Google response object of type {type(response)}. No 'usage_metadata' attribute found.")
            return None
    except AttributeError as ae:
        logger.error(f"Attribute error extracting usage from Google response: {ae}. Response object: {response}", exc_info=True)
        return None
    except Exception as e:
        logger.error(f"Unexpected error extracting usage from Google response: {e}", exc_info=True)
        return None
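Since get_streaming_content reports failures in-band as JSON strings, a consumer that must separate errors from ordinary model text has to probe each chunk. A minimal sketch follows; iter_text_or_raise is a hypothetical helper, not part of this commit, relying only on the convention above that error payloads are JSON objects with an "error" key:

import json
from collections.abc import Generator, Iterable


def iter_text_or_raise(chunks: Iterable[str]) -> Generator[str, None, None]:
    # Hypothetical consumer-side helper: surface the in-band JSON error payloads
    # yielded by get_streaming_content as exceptions, and pass text through.
    for chunk in chunks:
        try:
            payload = json.loads(chunk)
            if isinstance(payload, dict) and "error" in payload:
                raise RuntimeError(f"Google stream error: {payload['error']}")
        except json.JSONDecodeError:
            pass  # Ordinary model text, not a JSON error payload.
        yield chunk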