Fix streaming chunk serialization error

parent e9a244cd
......@@ -222,39 +222,15 @@ async def rotation_chat_completions(request: Request, body: ChatCompletionReques
try:
response = await rotation_handler.handle_rotation_request(body.model, body_dict)
# Check if this is a Google streaming response
# Google's generate_content_stream() returns a sync iterator with chunks that have 'candidates' attribute
is_google_stream = hasattr(response, '__iter__') and not hasattr(response, '__aiter__')
# Test the first chunk to verify if it's a Google response
if is_google_stream:
try:
# Get the first chunk to test
import itertools
first_chunk = next(iter(response))
# Check if it's a Google chunk by looking for 'candidates' attribute
if hasattr(first_chunk, 'candidates'):
logger.debug(f"Confirmed Google streaming response")
else:
logger.warning(f"Response is sync iterator but not Google format - treating as OpenAI/Anthropic stream")
is_google_stream = False
# Recreate the iterator with the first chunk
response = itertools.chain([first_chunk], response)
except Exception as e:
logger.error(f"Error testing stream type: {e}")
is_google_stream = False
else:
logger.debug(f"Not a sync iterator - treating as OpenAI/Anthropic async stream")
logger.debug(f"Rotation stream type: {'Google' if is_google_stream else 'OpenAI/Anthropic'}")
if is_google_stream:
# Handle Google's synchronous streaming response
# Check if this is a generator (sync iterator) response
if hasattr(response, '__iter__') and not hasattr(response, '__aiter__'):
logger.debug("Handling synchronous generator stream response")
# This is likely a Google streaming response
chunk_id = 0
for chunk in response:
try:
logger.debug(f"Google chunk type: {type(chunk)}")
logger.debug(f"Google chunk: {chunk}")
logger.debug(f"Chunk type: {type(chunk)}")
logger.debug(f"Chunk: {chunk}")
# Extract text from Google chunk
chunk_text = ""
......@@ -267,7 +243,7 @@ async def rotation_chat_completions(request: Request, body: ChatCompletionReques
if hasattr(part, 'text') and part.text:
chunk_text += part.text
except Exception as e:
logger.error(f"Error extracting text from Google chunk: {e}")
logger.error(f"Error extracting text from chunk: {e}")
# Create OpenAI-compatible chunk
openai_chunk = {
......@@ -290,18 +266,51 @@ async def rotation_chat_completions(request: Request, body: ChatCompletionReques
import json
yield f"data: {json.dumps(openai_chunk)}\n\n".encode('utf-8')
except Exception as chunk_error:
logger.error(f"Error processing Google chunk: {str(chunk_error)}")
logger.error(f"Error processing chunk: {str(chunk_error)}")
continue
else:
elif hasattr(response, '__aiter__'):
# Handle OpenAI/Anthropic streaming responses (async iterators)
chunk_id = 0
async for chunk in response:
try:
chunk_dict = chunk.model_dump() if hasattr(chunk, 'model_dump') else chunk
# Extract text from Google chunk
chunk_text = ""
try:
if hasattr(chunk, 'candidates') and chunk.candidates:
candidate = chunk.candidates[0] if chunk.candidates else None
if candidate and hasattr(candidate, 'content') and candidate.content:
if hasattr(candidate.content, 'parts') and candidate.content.parts:
for part in candidate.content.parts:
if hasattr(part, 'text') and part.text:
chunk_text += part.text
except Exception as e:
logger.error(f"Error extracting text from chunk: {e}")
# Create OpenAI-compatible chunk
chunk_dict = {
"id": f"google-{body.model}-{int(time.time())}-chunk-{chunk_id}",
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": body.model,
"choices": [{
"index": 0,
"delta": {
"content": chunk_text
},
"finish_reason": None
}]
}
chunk_id += 1
import json
yield f"data: {json.dumps(chunk_dict)}\n\n".encode('utf-8')
except Exception as chunk_error:
logger.warning(f"Error serializing chunk: {str(chunk_error)}")
continue
else:
# Handle other types of responses
logger.warning(f"Unknown response type: {type(response)}")
import json
yield f"data: {json.dumps({'error': 'Unknown response type'})}\n\n".encode('utf-8')
except Exception as e:
logger.error(f"Error in streaming response: {str(e)}")
import json
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment