Run llama.cpp stream in background thread to prevent blocking

parent a9d112b5
......@@ -1047,19 +1047,47 @@ class VulkanBackend(ModelBackend):
total_content = ""
chunk_count = 0
queue = asyncio.Queue()
stream_done = asyncio.Event()
stream_error = None
def run_stream():
"""Run the synchronous stream in a thread."""
nonlocal stream_error
try:
print(f"DEBUG: generate_chat_stream: Calling create_chat_completion with tools={tools}")
stream = self.model.create_chat_completion(
messages=messages,
max_tokens=max_tokens,
temperature=temperature,
top_p=top_p,
stop=stop or [],
tools=tools,
stream=True,
)
print(f"DEBUG: generate_chat_stream: Got stream object: {type(stream)}")
for chunk in stream:
# Put chunk in queue for async consumer
asyncio.run_coroutine_threadsafe(queue.put(chunk), loop)
asyncio.run_coroutine_threadsafe(queue.put(None), loop) # Sentinel
except Exception as e:
stream_error = e
asyncio.run_coroutine_threadsafe(queue.put(None), loop)
# Get the event loop
loop = asyncio.get_event_loop()
# Start stream in background thread
import threading
thread = threading.Thread(target=run_stream)
thread.start()
try:
print(f"DEBUG: generate_chat_stream: Calling create_chat_completion with tools={tools}")
stream = self.model.create_chat_completion(
messages=messages,
max_tokens=max_tokens,
temperature=temperature,
top_p=top_p,
stop=stop or [],
tools=tools,
stream=True,
)
print(f"DEBUG: generate_chat_stream: Got stream object: {type(stream)}")
for chunk in stream:
while True:
chunk = await queue.get()
if chunk is None: # Sentinel
break
chunk_count += 1
print(f"DEBUG: generate_chat_stream: Raw chunk {chunk_count}: {repr(chunk)}")
delta = chunk["choices"][0].get("delta", {})
......@@ -1070,6 +1098,7 @@ class VulkanBackend(ModelBackend):
if content:
total_content += content
yield content
print(f"DEBUG: generate_chat_stream yielded {chunk_count} chunks, total content length: {len(total_content)}")
if chunk_count == 0 or not total_content.strip():
print(f"DEBUG: Empty stream from create_chat_completion, using fallback")
......@@ -1086,6 +1115,8 @@ class VulkanBackend(ModelBackend):
yield chunk
else:
print(f"DEBUG: Stream completed with {chunk_count} chunks")
finally:
thread.join(timeout=5)
def _manual_format_messages(self, messages: List[Dict]) -> str:
"""Manual fallback for formatting messages when create_chat_completion fails."""
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment