Fix Google GenAI streaming response handling

- Return Google's synchronous iterator directly from provider handler
- Detect Google streaming responses by checking for __iter__ but not __aiter__
- Convert Google chunks to OpenAI format in stream_generator
- Handle both sync (Google) and async (OpenAI/Anthropic) streaming responses
- Fix 'async_generator object is not iterable' error

This fixes streaming requests through the autoselect and rotation handlers
that were failing with the "'async_generator' object is not iterable" error.
parent 81e9a8f5
......@@ -146,6 +146,64 @@ class RequestHandler:
tools=request_data.get('tools'),
tool_choice=request_data.get('tool_choice')
)
# Check if this is a Google streaming response (synchronous iterator)
# Google's generate_content_stream() returns a sync iterator, not async
is_google_stream = hasattr(response, '__iter__') and not hasattr(response, '__aiter__')
logger.info(f"Is Google streaming response: {is_google_stream}")
if is_google_stream:
# Handle Google's synchronous streaming response
# Convert Google chunks to OpenAI format
chunk_id = 0
for chunk in response:
try:
logger.debug(f"Google chunk type: {type(chunk)}")
logger.debug(f"Google chunk: {chunk}")
# Extract text from Google chunk
chunk_text = ""
try:
if hasattr(chunk, 'candidates') and chunk.candidates:
candidate = chunk.candidates[0] if chunk.candidates else None
if candidate and hasattr(candidate, 'content') and candidate.content:
if hasattr(candidate.content, 'parts') and candidate.content.parts:
for part in candidate.content.parts:
if hasattr(part, 'text') and part.text:
chunk_text += part.text
except Exception as e:
logger.error(f"Error extracting text from Google chunk: {e}")
# Create OpenAI-compatible chunk
openai_chunk = {
"id": f"google-{request_data['model']}-{int(time.time())}-chunk-{chunk_id}",
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": request_data['model'],
"choices": [{
"index": 0,
"delta": {
"content": chunk_text
},
"finish_reason": None
}]
}
chunk_id += 1
logger.debug(f"OpenAI chunk: {openai_chunk}")
# Serialize as JSON
import json
yield f"data: {json.dumps(openai_chunk)}\n\n".encode('utf-8')
except Exception as chunk_error:
error_msg = str(chunk_error)
logger.error(f"Error processing Google chunk: {error_msg}")
logger.error(f"Chunk type: {type(chunk)}")
logger.error(f"Chunk content: {chunk}")
# Skip this chunk and continue
continue
else:
# Handle OpenAI/Anthropic streaming responses (async iterators)
for chunk in response:
try:
# Debug: Log chunk type and content before serialization
......@@ -702,6 +760,64 @@ class AutoselectHandler:
selected_model_id,
{**request_data, "stream": True}
)
# Check if this is a Google streaming response (synchronous iterator)
# Google's generate_content_stream() returns a sync iterator, not async
is_google_stream = hasattr(response, '__iter__') and not hasattr(response, '__aiter__')
logger.info(f"Is Google streaming response: {is_google_stream}")
if is_google_stream:
# Handle Google's synchronous streaming response
# Convert Google chunks to OpenAI format
chunk_id = 0
for chunk in response:
try:
logger.debug(f"Google chunk type: {type(chunk)}")
logger.debug(f"Google chunk: {chunk}")
# Extract text from Google chunk
chunk_text = ""
try:
if hasattr(chunk, 'candidates') and chunk.candidates:
candidate = chunk.candidates[0] if chunk.candidates else None
if candidate and hasattr(candidate, 'content') and candidate.content:
if hasattr(candidate.content, 'parts') and candidate.content.parts:
for part in candidate.content.parts:
if hasattr(part, 'text') and part.text:
chunk_text += part.text
except Exception as e:
logger.error(f"Error extracting text from Google chunk: {e}")
# Create OpenAI-compatible chunk
openai_chunk = {
"id": f"google-{selected_model_id}-{int(time.time())}-chunk-{chunk_id}",
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": selected_model_id,
"choices": [{
"index": 0,
"delta": {
"content": chunk_text
},
"finish_reason": None
}]
}
chunk_id += 1
logger.debug(f"OpenAI chunk: {openai_chunk}")
# Serialize as JSON
import json
yield f"data: {json.dumps(openai_chunk)}\n\n".encode('utf-8')
except Exception as chunk_error:
error_msg = str(chunk_error)
logger.error(f"Error processing Google chunk: {error_msg}")
logger.error(f"Chunk type: {type(chunk)}")
logger.error(f"Chunk content: {chunk}")
# Skip this chunk and continue
continue
else:
# Handle OpenAI/Anthropic streaming responses (async iterators)
for chunk in response:
try:
# Debug: Log chunk type and content before serialization
......
......@@ -161,54 +161,9 @@ class GoogleProviderHandler(BaseProviderHandler):
logging.info(f"GoogleProviderHandler: Streaming response received")
self.record_success()
# Create an async generator that yields OpenAI-compatible chunks
# Google's generate_content_stream() returns a synchronous iterator
# We need to wrap it in an async generator
async def stream_generator():
"""Wrap Google's synchronous streaming iterator in an async generator.

Iterates the `response` sync iterator (closed over from the enclosing
scope; presumably the result of generate_content_stream() — TODO confirm)
and yields one OpenAI-compatible chat.completion.chunk dict per Google
chunk. `model` is also taken from the enclosing scope.

Yields:
dict: an OpenAI-style streaming chunk with the extracted text in
choices[0].delta.content and finish_reason set to None.

Raises:
Exception: re-raised after logging if iteration itself fails; errors
while extracting text from an individual chunk are logged and the
chunk is emitted with empty content instead.
"""
try:
chunk_id = 0
# Iterate over the sync iterator
for chunk in response:
logging.info(f"GoogleProviderHandler: Processing stream chunk")
# Extract text from the chunk
chunk_text = ""
try:
# Defensive hasattr checks: chunk shape varies across SDK versions,
# so walk candidates[0].content.parts[*].text only if each level exists.
if hasattr(chunk, 'candidates') and chunk.candidates:
candidate = chunk.candidates[0] if chunk.candidates else None
if candidate and hasattr(candidate, 'content') and candidate.content:
if hasattr(candidate.content, 'parts') and candidate.content.parts:
for part in candidate.content.parts:
if hasattr(part, 'text') and part.text:
chunk_text += part.text
except Exception as e:
# Best-effort: a malformed chunk yields empty text rather than
# aborting the whole stream.
logging.error(f"Error extracting text from stream chunk: {e}")
# Create OpenAI-compatible chunk (complete object, not separate fields)
openai_chunk = {
"id": f"google-{model}-{int(time.time())}-chunk-{chunk_id}",
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": model,
"choices": [{
"index": 0,
"delta": {
"content": chunk_text
},
"finish_reason": None
}]
}
chunk_id += 1
logging.info(f"Yielding OpenAI chunk: {openai_chunk}")
# Yield the complete chunk object as a single value (not serialized here;
# the caller is responsible for SSE/JSON encoding)
yield openai_chunk
except Exception as e:
# Iteration-level failure: log with traceback, then propagate to the caller.
logging.error(f"Error in stream generator: {e}", exc_info=True)
raise
return stream_generator()
# Return the synchronous iterator directly
# The handler will iterate over it and convert to OpenAI format
return response
else:
# Non-streaming request
# Generate content using the google-genai client
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment