Collect all chunks in thread pool before yielding to avoid generator issues

parent bf2b3b0a
...@@ -1048,8 +1048,10 @@ class VulkanBackend(ModelBackend): ...@@ -1048,8 +1048,10 @@ class VulkanBackend(ModelBackend):
total_content = "" total_content = ""
chunk_count = 0 chunk_count = 0
def sync_generator(): # Collect all chunks synchronously then yield them
"""Synchronous generator that runs in executor.""" # This avoids issues with generators across thread boundaries
def collect_chunks():
"""Collect all chunks from the stream."""
print(f"DEBUG: generate_chat_stream: Calling create_chat_completion with tools={tools}") print(f"DEBUG: generate_chat_stream: Calling create_chat_completion with tools={tools}")
stream = self.model.create_chat_completion( stream = self.model.create_chat_completion(
messages=messages, messages=messages,
...@@ -1061,32 +1063,31 @@ class VulkanBackend(ModelBackend): ...@@ -1061,32 +1063,31 @@ class VulkanBackend(ModelBackend):
stream=True, stream=True,
) )
print(f"DEBUG: generate_chat_stream: Got stream object: {type(stream)}") print(f"DEBUG: generate_chat_stream: Got stream object: {type(stream)}")
chunks = []
for chunk in stream: for chunk in stream:
yield chunk chunks.append(chunk)
print(f"DEBUG: generate_chat_stream: Collected {len(chunks)} chunks")
return chunks
try: try:
# Run synchronous generator in thread pool # Run the collection in thread pool
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
gen = sync_generator() chunks = await loop.run_in_executor(None, collect_chunks)
while True: for chunk in chunks:
try: chunk_count += 1
chunk = await loop.run_in_executor(None, lambda: next(gen, None)) print(f"DEBUG: generate_chat_stream: Processing chunk {chunk_count}: {repr(chunk)}")
if chunk is None: delta = chunk["choices"][0].get("delta", {})
break content = delta.get("content", "")
chunk_count += 1 # Handle Qwen3's special thinking token - skip it and continue
print(f"DEBUG: generate_chat_stream: Raw chunk {chunk_count}: {repr(chunk)}") # Qwen3 uses `<think>` tags for reasoning, we should pass through the content
delta = chunk["choices"][0].get("delta", {}) if content:
content = delta.get("content", "") total_content += content
yield content
# Handle Qwen3's special thinking token - skip it and continue
# Qwen3 uses `<think>` tags for reasoning, we should pass through the content # Small yield to allow other async tasks
if content: await asyncio.sleep(0)
total_content += content
yield content
except StopIteration:
break
print(f"DEBUG: generate_chat_stream yielded {chunk_count} chunks, total content length: {len(total_content)}") print(f"DEBUG: generate_chat_stream yielded {chunk_count} chunks, total content length: {len(total_content)}")
if chunk_count == 0 or not total_content.strip(): if chunk_count == 0 or not total_content.strip():
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment