Fix streaming response handling for OpenAI async iterators

parent 0e5fab02
...@@ -764,114 +764,86 @@ class AutoselectHandler: ...@@ -764,114 +764,86 @@ class AutoselectHandler:
logger.info(f"Autoselect stream response type: {type(response)}") logger.info(f"Autoselect stream response type: {type(response)}")
# Check if this is a Google streaming response # Handle streaming responses based on chunk type rather than iterator type
# Google's generate_content_stream() returns a sync iterator with chunks that have 'candidates' attribute if hasattr(response, '__aiter__'):
is_google_stream = hasattr(response, '__iter__') and not hasattr(response, '__aiter__') # Async iterator (OpenAI/Anthropic style)
logger.info(f"Handling async stream response")
# Test the first chunk to verify if it's a Google response async for chunk in response:
if is_google_stream: try:
try: logger.debug(f"Chunk type: {type(chunk)}")
# Get the first chunk to test logger.debug(f"Chunk: {chunk}")
import itertools
first_chunk = next(iter(response)) chunk_dict = chunk.model_dump() if hasattr(chunk, 'model_dump') else chunk
# Check if it's a Google chunk by looking for 'candidates' attribute import json
if hasattr(first_chunk, 'candidates'): yield f"data: {json.dumps(chunk_dict)}\n\n".encode('utf-8')
logger.info(f"Confirmed Google streaming response") except Exception as chunk_error:
else: error_msg = str(chunk_error)
logger.warning(f"Response is sync iterator but not Google format - treating as OpenAI/Anthropic stream") if "tool" in error_msg.lower():
is_google_stream = False logger.error(f"Tool call error during streaming: {error_msg}")
# Recreate the iterator with the first chunk logger.error(f"Chunk type: {type(chunk)}")
response = itertools.chain([first_chunk], response) logger.error(f"Chunk content: {chunk}")
except Exception as e: raise
logger.error(f"Error testing stream type: {e}") else:
is_google_stream = False logger.warning(f"Error serializing chunk: {error_msg}")
else: logger.warning(f"Chunk type: {type(chunk)}")
logger.info(f"Not a sync iterator - treating as OpenAI/Anthropic async stream") logger.warning(f"Chunk content: {chunk}")
continue
logger.info(f"Is Google streaming response: {is_google_stream}") elif hasattr(response, '__iter__'):
# Sync iterator - determine if it's Google or other format
if is_google_stream: logger.info(f"Handling sync stream response")
# Handle Google's synchronous streaming response
# Convert Google chunks to OpenAI format
chunk_id = 0 chunk_id = 0
for chunk in response: for chunk in response:
try: try:
logger.debug(f"Google chunk type: {type(chunk)}") logger.debug(f"Chunk type: {type(chunk)}")
logger.debug(f"Google chunk: {chunk}") logger.debug(f"Chunk: {chunk}")
# Extract text from Google chunk
chunk_text = ""
try:
if hasattr(chunk, 'candidates') and chunk.candidates:
candidate = chunk.candidates[0] if chunk.candidates else None
if candidate and hasattr(candidate, 'content') and candidate.content:
if hasattr(candidate.content, 'parts') and candidate.content.parts:
for part in candidate.content.parts:
if hasattr(part, 'text') and part.text:
chunk_text += part.text
except Exception as e:
logger.error(f"Error extracting text from Google chunk: {e}")
# Create OpenAI-compatible chunk
openai_chunk = {
"id": f"google-{request_data['model']}-{int(time.time())}-chunk-{chunk_id}",
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": request_data['model'],
"choices": [{
"index": 0,
"delta": {
"content": chunk_text
},
"finish_reason": None
}]
}
chunk_id += 1
logger.debug(f"OpenAI chunk: {openai_chunk}")
# Serialize as JSON # Check if it's a Google-style chunk with candidates
import json if hasattr(chunk, 'candidates'):
yield f"data: {json.dumps(openai_chunk)}\n\n".encode('utf-8') logger.debug("Processing Google-style chunk")
chunk_text = ""
try:
if chunk.candidates:
candidate = chunk.candidates[0] if chunk.candidates else None
if candidate and hasattr(candidate, 'content') and candidate.content:
if hasattr(candidate.content, 'parts') and candidate.content.parts:
for part in candidate.content.parts:
if hasattr(part, 'text') and part.text:
chunk_text += part.text
except Exception as e:
logger.error(f"Error extracting text from Google chunk: {e}")
openai_chunk = {
"id": f"google-{request_data['model']}-{int(time.time())}-chunk-{chunk_id}",
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": request_data['model'],
"choices": [{
"index": 0,
"delta": {
"content": chunk_text
},
"finish_reason": None
}]
}
import json
yield f"data: {json.dumps(openai_chunk)}\n\n".encode('utf-8')
chunk_id += 1
else:
logger.debug("Processing other sync stream chunk")
# For other sync stream formats (like OpenAI's sync stream)
chunk_dict = chunk.model_dump() if hasattr(chunk, 'model_dump') else chunk
import json
yield f"data: {json.dumps(chunk_dict)}\n\n".encode('utf-8')
except Exception as chunk_error: except Exception as chunk_error:
error_msg = str(chunk_error) error_msg = str(chunk_error)
logger.error(f"Error processing Google chunk: {error_msg}") logger.error(f"Error processing chunk: {error_msg}")
logger.error(f"Chunk type: {type(chunk)}") logger.error(f"Chunk type: {type(chunk)}")
logger.error(f"Chunk content: {chunk}") logger.error(f"Chunk content: {chunk}")
# Skip this chunk and continue
continue continue
else: else:
# Handle OpenAI/Anthropic streaming responses (async iterators) logger.warning(f"Unknown stream type: {type(response)}")
if hasattr(response, '__aiter__'):
# It's an async iterator
async for chunk in response:
try:
# Debug: Log chunk type and content before serialization
logger.debug(f"Chunk type: {type(chunk)}")
logger.debug(f"Chunk: {chunk}")
# Convert chunk to dict and serialize as JSON
chunk_dict = chunk.model_dump() if hasattr(chunk, 'model_dump') else chunk
import json
yield f"data: {json.dumps(chunk_dict)}\n\n".encode('utf-8')
except Exception as chunk_error:
# Handle errors during chunk serialization (e.g., tool calls without tool_choice)
# This is a critical error - the model is trying to call tools without proper configuration
error_msg = str(chunk_error)
if "tool" in error_msg.lower():
logger.error(f"Tool call error during streaming: {error_msg}")
logger.error(f"Chunk type: {type(chunk)}")
logger.error(f"Chunk content: {chunk}")
# Re-raise to trigger retry in rotation handler
raise
else:
logger.warning(f"Error serializing chunk: {error_msg}")
logger.warning(f"Chunk type: {type(chunk)}")
logger.warning(f"Chunk content: {chunk}")
# Skip this chunk and continue with the next one
continue
else:
logger.warning(f"Unknown stream type: {type(response)}")
except Exception as e: except Exception as e:
logger.error(f"Error in streaming response: {str(e)}", exc_info=True) logger.error(f"Error in streaming response: {str(e)}", exc_info=True)
import json import json
......
...@@ -293,15 +293,15 @@ async def rotation_chat_completions(request: Request, body: ChatCompletionReques ...@@ -293,15 +293,15 @@ async def rotation_chat_completions(request: Request, body: ChatCompletionReques
logger.error(f"Error processing Google chunk: {str(chunk_error)}") logger.error(f"Error processing Google chunk: {str(chunk_error)}")
continue continue
else: else:
# Handle OpenAI/Anthropic streaming responses (async iterators) # Handle OpenAI/Anthropic streaming responses (async iterators)
for chunk in response: async for chunk in response:
try: try:
chunk_dict = chunk.model_dump() if hasattr(chunk, 'model_dump') else chunk chunk_dict = chunk.model_dump() if hasattr(chunk, 'model_dump') else chunk
import json import json
yield f"data: {json.dumps(chunk_dict)}\n\n".encode('utf-8') yield f"data: {json.dumps(chunk_dict)}\n\n".encode('utf-8')
except Exception as chunk_error: except Exception as chunk_error:
logger.warning(f"Error serializing chunk: {str(chunk_error)}") logger.warning(f"Error serializing chunk: {str(chunk_error)}")
continue continue
except Exception as e: except Exception as e:
logger.error(f"Error in streaming response: {str(e)}") logger.error(f"Error in streaming response: {str(e)}")
import json import json
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment