Fix Google GenAI streaming handler to use async generator

- Keep stream_generator as async function (not sync)
- Wrap Google's synchronous iterator in async generator
- Properly structure if/else for streaming vs non-streaming paths
- Fix 'client has been closed' error in streaming responses

This fixes the issue where streaming requests through autoselect
were failing with a 'Cannot send a request, as a client has been
closed' error.
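
The pattern, as a minimal standalone sketch (the names here are
illustrative, not the handler's real ones): an async generator drains
the synchronous iterator, so the server can `async for` over the stream
while the handler and its client are still in scope.

    import asyncio

    def sync_chunks():
        # Stand-in for Google's synchronous generate_content_stream() iterator
        yield from ["Hel", "lo", "!"]

    async def stream_generator():
        for chunk in sync_chunks():  # synchronous iteration inside an async def
            yield chunk              # each yield hands control back to the event loop's caller

    async def main():
        async for piece in stream_generator():
            print(piece)

    asyncio.run(main())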
@@ -160,8 +160,57 @@ class GoogleProviderHandler(BaseProviderHandler):
                )
                logging.info(f"GoogleProviderHandler: Streaming response received")
                self.record_success()

                # Create an async generator that yields OpenAI-compatible chunks.
                # Google's generate_content_stream() returns a synchronous
                # iterator, so we need to wrap it in an async generator.
                async def stream_generator():
                    try:
                        chunk_id = 0
                        # Iterate over the sync iterator
                        for chunk in response:
                            logging.info(f"GoogleProviderHandler: Processing stream chunk")

                            # Extract text from the chunk
                            chunk_text = ""
                            try:
                                if hasattr(chunk, 'candidates') and chunk.candidates:
                                    candidate = chunk.candidates[0]
                                    if candidate and hasattr(candidate, 'content') and candidate.content:
                                        if hasattr(candidate.content, 'parts') and candidate.content.parts:
                                            for part in candidate.content.parts:
                                                if hasattr(part, 'text') and part.text:
                                                    chunk_text += part.text
                            except Exception as e:
                                logging.error(f"Error extracting text from stream chunk: {e}")

                            # Create an OpenAI-compatible chunk (complete object, not separate fields)
                            openai_chunk = {
                                "id": f"google-{model}-{int(time.time())}-chunk-{chunk_id}",
                                "object": "chat.completion.chunk",
                                "created": int(time.time()),
                                "model": model,
                                "choices": [{
                                    "index": 0,
                                    "delta": {
                                        "content": chunk_text
                                    },
                                    "finish_reason": None
                                }]
                            }
                            chunk_id += 1
                            logging.info(f"Yielding OpenAI chunk: {openai_chunk}")
                            # Yield the complete chunk object as a single unit
                            yield openai_chunk
                    except Exception as e:
                        logging.error(f"Error in stream generator: {e}", exc_info=True)
                        raise

                return stream_generator()
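                # Note: the wrapper iterates Google's synchronous iterator
                # directly, so the event loop blocks while each chunk is
                # fetched; offloading each step to a thread (e.g. via
                # asyncio.to_thread) would avoid that, at the cost of
                # extra complexity.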
            else:
                # Non-streaming request:
                # generate content using the google-genai client
                response = self.client.models.generate_content(
                    model=model,
@@ -169,267 +218,216 @@ class GoogleProviderHandler(BaseProviderHandler):
                    config=config
                )
                # Non-streaming response
                logging.info(f"GoogleProviderHandler: Response received: {response}")
                self.record_success()

                # Extract content from the nested response structure:
                # the response has candidates[0].content.parts
                response_text = ""
                tool_calls = None
                finish_reason = "stop"
logging.info(f"=== GOOGLE RESPONSE PARSING START ===")
logging.info(f"Response type: {type(response)}")
logging.info(f"Response attributes: {dir(response)}")
try:
# Check if response has candidates
if hasattr(response, 'candidates'):
logging.info(f"Response has 'candidates' attribute")
logging.info(f"Candidates: {response.candidates}")
logging.info(f"Candidates type: {type(response.candidates)}")
logging.info(f"Candidates length: {len(response.candidates) if hasattr(response.candidates, '__len__') else 'N/A'}")
if response.candidates:
logging.info(f"Candidates is not empty, getting first candidate")
candidate = response.candidates[0]
logging.info(f"Candidate type: {type(candidate)}")
logging.info(f"Candidate attributes: {dir(candidate)}")
# Extract finish reason
if hasattr(candidate, 'finish_reason'):
logging.info(f"Candidate has 'finish_reason' attribute")
logging.info(f"Finish reason: {candidate.finish_reason}")
# Map Google finish reasons to OpenAI format
finish_reason_map = {
'STOP': 'stop',
'MAX_TOKENS': 'length',
'SAFETY': 'content_filter',
'RECITATION': 'content_filter',
'OTHER': 'stop'
}
google_finish_reason = str(candidate.finish_reason)
finish_reason = finish_reason_map.get(google_finish_reason, 'stop')
logging.info(f"Mapped finish reason: {finish_reason}")
else:
logging.warning(f"Candidate does NOT have 'finish_reason' attribute")
logging.info(f"=== GOOGLE RESPONSE PARSING START ===")
logging.info(f"Response type: {type(response)}")
logging.info(f"Response attributes: {dir(response)}")
try:
# Check if response has candidates
if hasattr(response, 'candidates'):
logging.info(f"Response has 'candidates' attribute")
logging.info(f"Candidates: {response.candidates}")
logging.info(f"Candidates type: {type(response.candidates)}")
logging.info(f"Candidates length: {len(response.candidates) if hasattr(response.candidates, '__len__') else 'N/A'}")
# Extract content
if hasattr(candidate, 'content'):
logging.info(f"Candidate has 'content' attribute")
logging.info(f"Content: {candidate.content}")
logging.info(f"Content type: {type(candidate.content)}")
logging.info(f"Content attributes: {dir(candidate.content)}")
if response.candidates:
logging.info(f"Candidates is not empty, getting first candidate")
candidate = response.candidates[0]
logging.info(f"Candidate type: {type(candidate)}")
logging.info(f"Candidate attributes: {dir(candidate)}")
# Extract finish reason
if hasattr(candidate, 'finish_reason'):
logging.info(f"Candidate has 'finish_reason' attribute")
logging.info(f"Finish reason: {candidate.finish_reason}")
# Map Google finish reasons to OpenAI format
finish_reason_map = {
'STOP': 'stop',
'MAX_TOKENS': 'length',
'SAFETY': 'content_filter',
'RECITATION': 'content_filter',
'OTHER': 'stop'
}
google_finish_reason = str(candidate.finish_reason)
finish_reason = finish_reason_map.get(google_finish_reason, 'stop')
logging.info(f"Mapped finish reason: {finish_reason}")
else:
logging.warning(f"Candidate does NOT have 'finish_reason' attribute")
if candidate.content:
logging.info(f"Content is not empty")
# Extract content
if hasattr(candidate, 'content'):
logging.info(f"Candidate has 'content' attribute")
logging.info(f"Content: {candidate.content}")
logging.info(f"Content type: {type(candidate.content)}")
logging.info(f"Content attributes: {dir(candidate.content)}")
if hasattr(candidate.content, 'parts'):
logging.info(f"Content has 'parts' attribute")
logging.info(f"Parts: {candidate.content.parts}")
logging.info(f"Parts type: {type(candidate.content.parts)}")
logging.info(f"Parts length: {len(candidate.content.parts) if hasattr(candidate.content.parts, '__len__') else 'N/A'}")
if candidate.content:
logging.info(f"Content is not empty")
if candidate.content.parts:
logging.info(f"Parts is not empty, processing all parts")
# Process all parts to extract text and tool calls
text_parts = []
openai_tool_calls = []
call_id = 0
if hasattr(candidate.content, 'parts'):
logging.info(f"Content has 'parts' attribute")
logging.info(f"Parts: {candidate.content.parts}")
logging.info(f"Parts type: {type(candidate.content.parts)}")
logging.info(f"Parts length: {len(candidate.content.parts) if hasattr(candidate.content.parts, '__len__') else 'N/A'}")
for idx, part in enumerate(candidate.content.parts):
logging.info(f"Processing part {idx}")
logging.info(f"Part type: {type(part)}")
logging.info(f"Part attributes: {dir(part)}")
if candidate.content.parts:
logging.info(f"Parts is not empty, processing all parts")
# Check for text content
if hasattr(part, 'text') and part.text:
logging.info(f"Part {idx} has 'text' attribute")
text_parts.append(part.text)
logging.info(f"Part {idx} text length: {len(part.text)}")
# Process all parts to extract text and tool calls
text_parts = []
openai_tool_calls = []
call_id = 0
# Check for function calls (Google's format)
if hasattr(part, 'function_call') and part.function_call:
logging.info(f"Part {idx} has 'function_call' attribute")
logging.info(f"Function call: {part.function_call}")
for idx, part in enumerate(candidate.content.parts):
logging.info(f"Processing part {idx}")
logging.info(f"Part type: {type(part)}")
logging.info(f"Part attributes: {dir(part)}")
# Convert Google function call to OpenAI format
try:
function_call = part.function_call
openai_tool_call = {
"id": f"call_{call_id}",
"type": "function",
"function": {
"name": function_call.name,
"arguments": function_call.args if hasattr(function_call, 'args') else {}
# Check for text content
if hasattr(part, 'text') and part.text:
logging.info(f"Part {idx} has 'text' attribute")
text_parts.append(part.text)
logging.info(f"Part {idx} text length: {len(part.text)}")
# Check for function calls (Google's format)
if hasattr(part, 'function_call') and part.function_call:
logging.info(f"Part {idx} has 'function_call' attribute")
logging.info(f"Function call: {part.function_call}")
# Convert Google function call to OpenAI format
try:
function_call = part.function_call
openai_tool_call = {
"id": f"call_{call_id}",
"type": "function",
"function": {
"name": function_call.name,
"arguments": function_call.args if hasattr(function_call, 'args') else {}
}
}
}
openai_tool_calls.append(openai_tool_call)
call_id += 1
logging.info(f"Converted function call to OpenAI format: {openai_tool_call}")
except Exception as e:
logging.error(f"Error converting function call: {e}", exc_info=True)
openai_tool_calls.append(openai_tool_call)
call_id += 1
logging.info(f"Converted function call to OpenAI format: {openai_tool_call}")
except Exception as e:
logging.error(f"Error converting function call: {e}", exc_info=True)
# Check for function response (tool output)
if hasattr(part, 'function_response') and part.function_response:
logging.info(f"Part {idx} has 'function_response' attribute")
logging.info(f"Function response: {part.function_response}")
# Function responses are typically handled in the request, not response
# But we log them for debugging
# Check for function response (tool output)
if hasattr(part, 'function_response') and part.function_response:
logging.info(f"Part {idx} has 'function_response' attribute")
logging.info(f"Function response: {part.function_response}")
# Function responses are typically handled in the request, not response
# But we log them for debugging
# Combine all text parts
response_text = "\n".join(text_parts)
logging.info(f"Combined text length: {len(response_text)}")
logging.info(f"Combined text (first 200 chars): {response_text[:200] if response_text else 'None'}")
# Set tool_calls if we have any
if openai_tool_calls:
tool_calls = openai_tool_calls
logging.info(f"Total tool calls: {len(tool_calls)}")
for tc in tool_calls:
logging.info(f" - {tc}")
# Combine all text parts
response_text = "\n".join(text_parts)
logging.info(f"Combined text length: {len(response_text)}")
logging.info(f"Combined text (first 200 chars): {response_text[:200] if response_text else 'None'}")
# Set tool_calls if we have any
if openai_tool_calls:
tool_calls = openai_tool_calls
logging.info(f"Total tool calls: {len(tool_calls)}")
for tc in tool_calls:
logging.info(f" - {tc}")
else:
logging.info(f"No tool calls found")
else:
logging.info(f"No tool calls found")
logging.error(f"Parts is empty")
else:
logging.error(f"Parts is empty")
logging.error(f"Content does NOT have 'parts' attribute")
else:
logging.error(f"Content does NOT have 'parts' attribute")
logging.error(f"Content is empty")
else:
logging.error(f"Content is empty")
logging.error(f"Candidate does NOT have 'content' attribute")
else:
logging.error(f"Candidate does NOT have 'content' attribute")
logging.error(f"Candidates is empty")
else:
logging.error(f"Candidates is empty")
else:
logging.error(f"Response does NOT have 'candidates' attribute")
logging.error(f"Response does NOT have 'candidates' attribute")
logging.info(f"Final response_text length: {len(response_text)}")
logging.info(f"Final response_text (first 200 chars): {response_text[:200] if response_text else 'None'}")
logging.info(f"Final tool_calls: {tool_calls}")
logging.info(f"Final finish_reason: {finish_reason}")
except Exception as e:
logging.error(f"GoogleProviderHandler: Exception during response parsing: {e}", exc_info=True)
response_text = ""
logging.info(f"Final response_text length: {len(response_text)}")
logging.info(f"Final response_text (first 200 chars): {response_text[:200] if response_text else 'None'}")
logging.info(f"Final tool_calls: {tool_calls}")
logging.info(f"Final finish_reason: {finish_reason}")
except Exception as e:
logging.error(f"GoogleProviderHandler: Exception during response parsing: {e}", exc_info=True)
response_text = ""
logging.info(f"=== GOOGLE RESPONSE PARSING END ===")
logging.info(f"=== GOOGLE RESPONSE PARSING END ===")
                # Extract usage metadata from the response
                prompt_tokens = 0
                completion_tokens = 0
                total_tokens = 0
                try:
                    if hasattr(response, 'usage_metadata') and response.usage_metadata:
                        usage_metadata = response.usage_metadata
                        prompt_tokens = getattr(usage_metadata, 'prompt_token_count', 0)
                        completion_tokens = getattr(usage_metadata, 'candidates_token_count', 0)
                        total_tokens = getattr(usage_metadata, 'total_token_count', 0)
                        logging.info(f"GoogleProviderHandler: Usage metadata - prompt: {prompt_tokens}, completion: {completion_tokens}, total: {total_tokens}")
                except Exception as e:
                    logging.warning(f"GoogleProviderHandler: Could not extract usage metadata: {e}")

                # Build the OpenAI-style response
                openai_response = {
                    "id": f"google-{model}-{int(time.time())}",
                    "object": "chat.completion",
                    "created": int(time.time()),
                    "model": model,
                    "choices": [{
                        "index": 0,
                        "message": {
                            "role": "assistant",
                            "content": response_text if response_text else None
                        },
                        "finish_reason": finish_reason
                    }],
                    "usage": {
                        "prompt_tokens": prompt_tokens,
                        "completion_tokens": completion_tokens,
                        "total_tokens": total_tokens
                    }
                }

                # Add tool_calls to the message if present
                if tool_calls:
                    openai_response["choices"][0]["message"]["tool_calls"] = tool_calls
                    # If there are tool calls, content should be None (OpenAI convention)
                    openai_response["choices"][0]["message"]["content"] = None
                    logging.info(f"Added tool_calls to response message")

                # Log the final response structure
                logging.info(f"=== FINAL OPENAI RESPONSE STRUCTURE ===")
                logging.info(f"Response type: {type(openai_response)}")
                logging.info(f"Response keys: {openai_response.keys()}")
                logging.info(f"Response id: {openai_response['id']}")
                logging.info(f"Response object: {openai_response['object']}")
                logging.info(f"Response created: {openai_response['created']}")
                logging.info(f"Response model: {openai_response['model']}")
                logging.info(f"Response choices count: {len(openai_response['choices'])}")
                logging.info(f"Response choices[0] index: {openai_response['choices'][0]['index']}")
                logging.info(f"Response choices[0] message role: {openai_response['choices'][0]['message']['role']}")
                # Content may be None (e.g. when tool calls are present), so guard the len()/slice
                final_content = openai_response['choices'][0]['message']['content']
                logging.info(f"Response choices[0] message content length: {len(final_content) if final_content else 0}")
                logging.info(f"Response choices[0] message content (first 200 chars): {final_content[:200] if final_content else 'None'}")
                logging.info(f"Response choices[0] finish_reason: {openai_response['choices'][0]['finish_reason']}")
                logging.info(f"Response usage: {openai_response['usage']}")
                logging.info(f"=== END FINAL OPENAI RESPONSE STRUCTURE ===")

                # Return the response dict directly without Pydantic validation;
                # Pydantic validation might be causing serialization issues.
                logging.info(f"GoogleProviderHandler: Returning response dict (no validation)")
                logging.info(f"Response dict keys: {openai_response.keys()}")
                return openai_response
        except Exception as e:
            import logging
            logging.error(f"GoogleProviderHandler: Error: {str(e)}", exc_info=True)