""" Response handling utilities specific to the Google Generative AI provider. Includes functions for: - Extracting content from streaming responses. - Extracting content from non-streaming responses. - Extracting token usage information. """ import json import logging from collections.abc import Generator from typing import Any from google.genai.types import GenerateContentResponse logger = logging.getLogger(__name__) def get_streaming_content(response: Any) -> Generator[str, None, None]: """ Yields content chunks (text) from a Google streaming response iterator. Args: response: The streaming response iterator returned by `generate_content(stream=True)`. Yields: String chunks of the generated text content. May yield JSON strings containing error information if errors occur during streaming. """ logger.debug("Processing Google stream...") full_delta = "" try: if isinstance(response, dict) and "error" in response: yield json.dumps(response) logger.error(f"Stream processing stopped due to initial error: {response['error']}") return if hasattr(response, "__iter__") and not hasattr(response, "candidates"): first_item = next(response, None) if first_item and isinstance(first_item, str): try: error_data = json.loads(first_item) if "error" in error_data: yield first_item yield from response logger.error(f"Stream processing stopped due to yielded error: {error_data['error']}") return except json.JSONDecodeError: yield first_item elif first_item: pass for chunk in response: if isinstance(chunk, dict) and "error" in chunk: yield json.dumps(chunk) logger.error(f"Error encountered during Google stream: {chunk['error']}") continue delta = "" try: if hasattr(chunk, "text"): delta = chunk.text elif hasattr(chunk, "candidates") and chunk.candidates: first_candidate = chunk.candidates[0] if hasattr(first_candidate, "content") and hasattr(first_candidate.content, "parts") and first_candidate.content.parts: first_part = first_candidate.content.parts[0] if hasattr(first_part, "text"): delta = first_part.text except Exception as e: logger.warning(f"Could not extract text from stream chunk: {chunk}. Error: {e}", exc_info=True) delta = "" if delta: full_delta += delta yield delta try: if hasattr(chunk, "candidates") and chunk.candidates: for part in chunk.candidates[0].content.parts: if hasattr(part, "function_call") and part.function_call: logger.debug(f"Function call detected during stream: {part.function_call.name}") break except Exception: pass logger.debug(f"Google stream finished. Total delta length: {len(full_delta)}") except StopIteration: logger.debug("Google stream finished (StopIteration).") except Exception as e: logger.error(f"Error processing Google stream: {e}", exc_info=True) yield json.dumps({"error": f"Stream processing error: {str(e)}"}) def get_content(response: GenerateContentResponse | dict[str, Any]) -> str: """ Extracts the full text content from a non-streaming Google response. Args: response: The non-streaming response object (`GenerateContentResponse`) or an error dictionary. Returns: The concatenated text content, or an error message string. """ try: if isinstance(response, dict) and "error" in response: logger.error(f"Cannot get content from error dict: {response['error']}") return f"[Error: {response['error']}]" if not isinstance(response, GenerateContentResponse): logger.error(f"Cannot get content: Expected GenerateContentResponse or error dict, got {type(response)}") return f"[Error: Unexpected response type {type(response)}]" if hasattr(response, "text") and response.text: content = response.text logger.debug(f"Extracted content (length {len(content)}) from response.text.") return content if hasattr(response, "candidates") and response.candidates: first_candidate = response.candidates[0] if hasattr(first_candidate, "content") and first_candidate.content and hasattr(first_candidate.content, "parts") and first_candidate.content.parts: text_parts = [part.text for part in first_candidate.content.parts if hasattr(part, "text")] if text_parts: content = "".join(text_parts) logger.debug(f"Extracted content (length {len(content)}) from response candidate parts.") return content else: logger.warning("Google response candidate parts contained no text.") return "" else: logger.warning("Google response candidate has no valid content or parts.") return "" else: logger.warning(f"Could not extract content from Google response: No .text or valid candidates found. Response: {response}") return "" except AttributeError as ae: logger.error(f"Attribute error extracting content from Google response: {ae}. Response type: {type(response)}", exc_info=True) return f"[Error extracting content: Attribute missing - {str(ae)}]" except Exception as e: logger.error(f"Unexpected error extracting content from Google response: {e}", exc_info=True) return f"[Error extracting content: {str(e)}]" def get_usage(response: GenerateContentResponse | dict[str, Any]) -> dict[str, int] | None: """ Extracts token usage information from a Google response object. Args: response: The response object (`GenerateContentResponse`) or an error dictionary. Returns: A dictionary containing 'prompt_tokens' and 'completion_tokens', or None if usage information is unavailable or an error occurred. """ try: if isinstance(response, dict) and "error" in response: logger.warning(f"Cannot get usage from error dict: {response['error']}") return None if not isinstance(response, GenerateContentResponse): logger.warning(f"Cannot get usage: Expected GenerateContentResponse or error dict, got {type(response)}") return None metadata = getattr(response, "usage_metadata", None) if metadata: prompt_tokens = getattr(metadata, "prompt_token_count", 0) completion_tokens = getattr(metadata, "candidates_token_count", 0) usage = { "prompt_tokens": int(prompt_tokens), "completion_tokens": int(completion_tokens), } logger.debug(f"Extracted usage from Google response metadata: {usage}") return usage else: logger.warning(f"Could not extract usage from Google response object: No 'usage_metadata' attribute found. Response: {response}") return None except AttributeError as ae: logger.error(f"Attribute error extracting usage from Google response: {ae}. Response type: {type(response)}", exc_info=True) return None except Exception as e: logger.error(f"Unexpected error extracting usage from Google response: {e}", exc_info=True) return None