Implement streaming support for Google GenAI provider

- Use generate_content_stream() for streaming requests
- Create async generator that yields OpenAI-compatible chunks
- Extract text from each stream chunk
- Generate unique chunk IDs
- Format chunks as chat.completion.chunk objects (one is sketched below)
- Include delta content in each chunk
- Maintain non-streaming functionality for regular requests
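
For reference, here is a minimal sketch of one such chunk and of the
server-sent-event framing an OpenAI-compatible endpoint typically
applies to it. The sse_encode helper and the gemini-2.0-flash model
name are illustrative, not part of this commit:

    import json
    import time

    def sse_encode(chunk: dict) -> str:
        # OpenAI-style streaming frames each chunk as a server-sent
        # event and ends the stream with a "data: [DONE]" sentinel.
        return f"data: {json.dumps(chunk)}\n\n"

    chunk = {
        "id": f"google-gemini-2.0-flash-{int(time.time())}-chunk-0",
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": "gemini-2.0-flash",
        "choices": [{"index": 0, "delta": {"content": "Hello"}, "finish_reason": None}],
    }
    print(sse_encode(chunk), end="")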

This fixes the streaming issue where the Google GenAI handler
returned a dict instead of an iterable for stream=True requests,
causing "'JSONResponse' object is not iterable" errors.
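
A minimal sketch of the route-level dispatch this return type
enables, assuming a FastAPI/Starlette app (which the JSONResponse
in the error suggests); the chat_completions wrapper and its
handler argument are illustrative:

    import inspect
    import json

    from fastapi.responses import JSONResponse, StreamingResponse

    async def chat_completions(handler, model, messages, stream=False):
        result = await handler.handle_request(model, messages, stream=stream)
        if inspect.isasyncgen(result):
            # An async generator must be streamed, not serialized.
            async def sse():
                async for chunk in result:
                    yield f"data: {json.dumps(chunk)}\n\n"
                yield "data: [DONE]\n\n"
            return StreamingResponse(sse(), media_type="text/event-stream")
        # A plain dict can be returned as an ordinary JSON body.
        return JSONResponse(result)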
parent 3c7bec4c
@@ -126,13 +126,14 @@ class GoogleProviderHandler(BaseProviderHandler):
     async def handle_request(self, model: str, messages: List[Dict], max_tokens: Optional[int] = None,
                              temperature: Optional[float] = 1.0, stream: Optional[bool] = False,
-                             tools: Optional[List[Dict]] = None, tool_choice: Optional[Union[str, Dict]] = None) -> Dict:
+                             tools: Optional[List[Dict]] = None, tool_choice: Optional[Union[str, Dict]] = None) -> Union[Dict, object]:
         if self.is_rate_limited():
             raise Exception("Provider rate limited")
         try:
             import logging
             logging.info(f"GoogleProviderHandler: Handling request for model {model}")
+            logging.info(f"GoogleProviderHandler: Stream: {stream}")
             if AISBF_DEBUG:
                 logging.info(f"GoogleProviderHandler: Messages: {messages}")
             else:
@@ -149,21 +150,83 @@ class GoogleProviderHandler(BaseProviderHandler):
             if max_tokens is not None:
                 config["max_output_tokens"] = max_tokens
-            # Generate content using the google-genai client
-            response = self.client.models.generate_content(
-                model=model,
-                contents=content,
-                config=config
-            )
-            logging.info(f"GoogleProviderHandler: Response received: {response}")
-            self.record_success()
-            # Extract content from the nested response structure
-            # The response has candidates[0].content.parts
-            response_text = ""
-            tool_calls = None
-            finish_reason = "stop"
+            # Handle streaming request
+            if stream:
+                logging.info(f"GoogleProviderHandler: Using streaming API")
+                response = self.client.models.generate_content_stream(
+                    model=model,
+                    contents=content,
+                    config=config
+                )
+                logging.info(f"GoogleProviderHandler: Streaming response received")
+                self.record_success()
+            else:
+                # Generate content using the google-genai client
+                response = self.client.models.generate_content(
+                    model=model,
+                    contents=content,
+                    config=config
+                )
+            # Handle streaming response
+            if stream:
+                logging.info(f"GoogleProviderHandler: Processing streaming response")
+                # Create an async generator that yields OpenAI-compatible chunks
+                async def stream_generator():
+                    try:
+                        chunk_id = 0
+                        for chunk in response:
+                            logging.info(f"GoogleProviderHandler: Processing stream chunk")
+                            # Extract text from the chunk
+                            chunk_text = ""
+                            try:
+                                if hasattr(chunk, 'candidates') and chunk.candidates:
+                                    candidate = chunk.candidates[0]
+                                    if candidate and hasattr(candidate, 'content') and candidate.content:
+                                        if hasattr(candidate.content, 'parts') and candidate.content.parts:
+                                            for part in candidate.content.parts:
+                                                if hasattr(part, 'text') and part.text:
+                                                    chunk_text += part.text
+                            except Exception as e:
+                                logging.error(f"Error extracting text from stream chunk: {e}")
+                            # Create an OpenAI-compatible chunk
+                            openai_chunk = {
+                                "id": f"google-{model}-{int(time.time())}-chunk-{chunk_id}",
+                                "object": "chat.completion.chunk",
+                                "created": int(time.time()),
+                                "model": model,
+                                "choices": [{
+                                    "index": 0,
+                                    "delta": {
+                                        "content": chunk_text
+                                    },
+                                    "finish_reason": None
+                                }]
+                            }
+                            chunk_id += 1
+                            logging.info(f"Yielding OpenAI chunk: {openai_chunk}")
+                            yield openai_chunk
+                    except Exception as e:
+                        logging.error(f"Error in stream generator: {e}", exc_info=True)
+                        raise
+                return stream_generator()
+            # Non-streaming response
+            logging.info(f"GoogleProviderHandler: Response received: {response}")
+            self.record_success()
+            # Extract content from the nested response structure
+            # The response has candidates[0].content.parts
+            response_text = ""
+            tool_calls = None
+            finish_reason = "stop"
             logging.info(f"=== GOOGLE RESPONSE PARSING START ===")
             logging.info(f"Response type: {type(response)}")
...