Fix Google GenAI streaming response handling

parent 77c08ee2
......@@ -761,6 +761,8 @@ class AutoselectHandler:
{**request_data, "stream": True}
)
logger.info(f"Autoselect stream response type: {type(response)}")
# Check if this is a Google streaming response (synchronous iterator)
# Google's generate_content_stream() returns a sync iterator, not async
is_google_stream = hasattr(response, '__iter__') and not hasattr(response, '__aiter__')
......@@ -790,10 +792,10 @@ class AutoselectHandler:
# Create OpenAI-compatible chunk
openai_chunk = {
"id": f"google-{selected_model_id}-{int(time.time())}-chunk-{chunk_id}",
"id": f"google-{request_data['model']}-{int(time.time())}-chunk-{chunk_id}",
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": selected_model_id,
"model": request_data['model'],
"choices": [{
"index": 0,
"delta": {
......@@ -818,7 +820,9 @@ class AutoselectHandler:
continue
else:
# Handle OpenAI/Anthropic streaming responses (async iterators)
for chunk in response:
if hasattr(response, '__aiter__'):
# It's an async iterator
async for chunk in response:
try:
# Debug: Log chunk type and content before serialization
logger.debug(f"Chunk type: {type(chunk)}")
......@@ -844,8 +848,10 @@ class AutoselectHandler:
logger.warning(f"Chunk content: {chunk}")
# Skip this chunk and continue with the next one
continue
else:
logger.warning(f"Unknown stream type: {type(response)}")
except Exception as e:
logger.error(f"Error in streaming response: {str(e)}")
logger.error(f"Error in streaming response: {str(e)}", exc_info=True)
import json
error_dict = {"error": str(e)}
yield f"data: {json.dumps(error_dict)}\n\n".encode('utf-8')
......
......@@ -216,9 +216,61 @@ async def rotation_chat_completions(request: Request, body: ChatCompletionReques
if not rotation_config:
raise HTTPException(status_code=400, detail=f"Rotation {body.model} not found")
# Check if this is a Google streaming response
async def stream_generator():
try:
response = await rotation_handler.handle_rotation_request(body.model, body_dict)
# Check if response is a Google-style streaming response (sync iterator)
is_google_stream = hasattr(response, '__iter__') and not hasattr(response, '__aiter__')
logger.debug(f"Rotation stream type: {'Google' if is_google_stream else 'OpenAI/Anthropic'}")
if is_google_stream:
# Handle Google's synchronous streaming response
chunk_id = 0
for chunk in response:
try:
logger.debug(f"Google chunk type: {type(chunk)}")
logger.debug(f"Google chunk: {chunk}")
# Extract text from Google chunk
chunk_text = ""
try:
if hasattr(chunk, 'candidates') and chunk.candidates:
candidate = chunk.candidates[0] if chunk.candidates else None
if candidate and hasattr(candidate, 'content') and candidate.content:
if hasattr(candidate.content, 'parts') and candidate.content.parts:
for part in candidate.content.parts:
if hasattr(part, 'text') and part.text:
chunk_text += part.text
except Exception as e:
logger.error(f"Error extracting text from Google chunk: {e}")
# Create OpenAI-compatible chunk
openai_chunk = {
"id": f"google-{body.model}-{int(time.time())}-chunk-{chunk_id}",
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": body.model,
"choices": [{
"index": 0,
"delta": {
"content": chunk_text
},
"finish_reason": None
}]
}
chunk_id += 1
logger.debug(f"OpenAI chunk: {openai_chunk}")
import json
yield f"data: {json.dumps(openai_chunk)}\n\n".encode('utf-8')
except Exception as chunk_error:
logger.error(f"Error processing Google chunk: {str(chunk_error)}")
continue
else:
# Handle OpenAI/Anthropic streaming responses (async iterators)
for chunk in response:
try:
chunk_dict = chunk.model_dump() if hasattr(chunk, 'model_dump') else chunk
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment