Simplify async streaming using run_in_executor instead of manual thread

parent 8af5cf90
......@@ -1047,57 +1047,46 @@ class VulkanBackend(ModelBackend):
total_content = ""
chunk_count = 0
queue = asyncio.Queue()
stream_done = asyncio.Event()
stream_error = None
def run_stream():
"""Run the synchronous stream in a thread."""
nonlocal stream_error
try:
print(f"DEBUG: generate_chat_stream: Calling create_chat_completion with tools={tools}")
stream = self.model.create_chat_completion(
messages=messages,
max_tokens=max_tokens,
temperature=temperature,
top_p=top_p,
stop=stop or [],
tools=tools,
stream=True,
)
print(f"DEBUG: generate_chat_stream: Got stream object: {type(stream)}")
for chunk in stream:
# Put chunk in queue for async consumer
asyncio.run_coroutine_threadsafe(queue.put(chunk), loop)
asyncio.run_coroutine_threadsafe(queue.put(None), loop) # Sentinel
except Exception as e:
stream_error = e
asyncio.run_coroutine_threadsafe(queue.put(None), loop)
# Get the event loop
loop = asyncio.get_event_loop()
# Start stream in background thread
import threading
thread = threading.Thread(target=run_stream)
thread.start()
def sync_generator():
"""Synchronous generator that runs in executor."""
print(f"DEBUG: generate_chat_stream: Calling create_chat_completion with tools={tools}")
stream = self.model.create_chat_completion(
messages=messages,
max_tokens=max_tokens,
temperature=temperature,
top_p=top_p,
stop=stop or [],
tools=tools,
stream=True,
)
print(f"DEBUG: generate_chat_stream: Got stream object: {type(stream)}")
for chunk in stream:
yield chunk
try:
# Run synchronous generator in thread pool
loop = asyncio.get_event_loop()
gen = sync_generator()
while True:
chunk = await queue.get()
if chunk is None: # Sentinel
try:
chunk = await loop.run_in_executor(None, lambda: next(gen, None))
if chunk is None:
break
chunk_count += 1
print(f"DEBUG: generate_chat_stream: Raw chunk {chunk_count}: {repr(chunk)}")
delta = chunk["choices"][0].get("delta", {})
content = delta.get("content", "")
# Handle Qwen3's special thinking token - skip it and continue
# Qwen3 uses `<think>` tags for reasoning, we should pass through the content
if content:
total_content += content
yield content
except StopIteration:
break
chunk_count += 1
print(f"DEBUG: generate_chat_stream: Raw chunk {chunk_count}: {repr(chunk)}")
delta = chunk["choices"][0].get("delta", {})
content = delta.get("content", "")
# Handle Qwen3's special thinking token - skip it and continue
# Qwen3 uses `<think>` tags for reasoning, we should pass through the content
if content:
total_content += content
yield content
print(f"DEBUG: generate_chat_stream yielded {chunk_count} chunks, total content length: {len(total_content)}")
if chunk_count == 0 or not total_content.strip():
......@@ -1115,8 +1104,6 @@ class VulkanBackend(ModelBackend):
yield chunk
else:
print(f"DEBUG: Stream completed with {chunk_count} chunks")
finally:
thread.join(timeout=5)
def _manual_format_messages(self, messages: List[Dict]) -> str:
"""Manual fallback for formatting messages when create_chat_completion fails."""
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment