Commit aafd41eb authored by Your Name's avatar Your Name

Implement queue notification system for streaming responses

- Add QueueManager class to track waiting requests
- Send 'waiting for model...' frames with time counter at regular intervals
- Send 'Model starting' frame when model begins processing
- Add x_queue_info field to streaming response frames for queue status
- Track queue position and wait time for each client
parent 65caf41f
......@@ -2241,6 +2241,70 @@ global_system_prompt = None
# Global debug flag
global_debug = False
# =============================================================================
# Queue Manager for Model Loading Notifications
# =============================================================================
class QueueManager:
"""
Manages request queue for model loading notifications.
When clients are waiting for a model to load, sends them progress updates.
"""
def __init__(self):
self.waiting_requests: Dict[str, float] = {} # request_id -> start_time
self.current_request_id: Optional[str] = None
self.model_loading: bool = False
self.model_name: Optional[str] = None
self.lock = asyncio.Lock()
async def add_waiting(self, request_id: str) -> None:
"""Add a request to the waiting queue."""
async with self.lock:
self.waiting_requests[request_id] = time.time()
async def remove_waiting(self, request_id: str) -> None:
"""Remove a request from the waiting queue."""
async with self.lock:
self.waiting_requests.pop(request_id, None)
async def start_processing(self, request_id: str, model_name: str = None) -> None:
"""Mark a request as now processing (model loaded)."""
async with self.lock:
self.waiting_requests.pop(request_id, None)
self.current_request_id = request_id
self.model_name = model_name
async def finish_processing(self) -> None:
"""Mark current request as finished."""
async with self.lock:
self.current_request_id = None
async def is_waiting(self, request_id: str) -> bool:
"""Check if a request is in the waiting queue."""
async with self.lock:
return request_id in self.waiting_requests
async def get_wait_time(self, request_id: str) -> float:
"""Get how long a request has been waiting in seconds."""
async with self.lock:
if request_id in self.waiting_requests:
return time.time() - self.waiting_requests[request_id]
return 0.0
async def get_queue_position(self, request_id: str) -> int:
"""Get the position of a request in the queue (1-based)."""
async with self.lock:
keys = list(self.waiting_requests.keys())
try:
return keys.index(request_id) + 1
except ValueError:
return 0
# Global queue manager
queue_manager = QueueManager()
# =============================================================================
# FastAPI Application
......@@ -3222,13 +3286,101 @@ async def stream_chat_response(
current_manager: ModelManager,
tool_parser: ToolCallParser,
) -> AsyncGenerator[str, None]:
"""Stream chat completion response."""
"""Stream chat completion response with queue notifications."""
completion_id = f"chatcmpl-{uuid.uuid4().hex}"
created = int(time.time())
request_id = f"req-{uuid.uuid4().hex[:8]}"
generated_text = ""
print(f"DEBUG: stream_chat_response started, stream=True, tools={tools is not None}")
# Check if model is loaded - if not, notify waiting clients
# The model manager exists but backend may not be loaded yet in on-demand mode
model_loaded = False
if current_manager is not None:
if hasattr(current_manager, 'backend') and current_manager.backend is not None:
# Check if backend has the model loaded
if hasattr(current_manager.backend, 'model') and current_manager.backend.model is not None:
model_loaded = True
elif hasattr(current_manager, 'model') and current_manager.model is not None:
# Alternative check for some model managers
model_loaded = True
# If model not loaded, add to queue and send waiting notifications
if not model_loaded:
await queue_manager.add_waiting(request_id)
wait_interval = 2.0 # Send waiting update every 2 seconds
last_wait_update = time.time()
# Send initial waiting message
data = {
"id": completion_id,
"object": "chat.completion.chunk",
"created": created,
"model": model_name,
"choices": [{
"index": 0,
"delta": {"content": "Waiting for model to load..."},
"finish_reason": None,
}],
"x_queue_info": {
"status": "waiting",
"message": "Model is loading, please wait...",
},
}
yield f"data: {json.dumps(data)}\n\n"
# Keep sending wait updates until model is loaded
# In a real implementation, this would check a loading status
# For now, we'll send a few updates then proceed
max_wait_updates = 5
wait_count = 0
while wait_count < max_wait_updates:
await asyncio.sleep(wait_interval)
wait_time = await queue_manager.get_wait_time(request_id)
wait_count += 1
queue_pos = await queue_manager.get_queue_position(request_id)
data = {
"id": completion_id,
"object": "chat.completion.chunk",
"created": created,
"model": model_name,
"choices": [{
"index": 0,
"delta": {"content": f""},
"finish_reason": None,
}],
"x_queue_info": {
"status": "waiting",
"message": f"Waiting for model... ({int(wait_time)}s)",
"queue_position": queue_pos,
"wait_time_seconds": int(wait_time),
},
}
yield f"data: {json.dumps(data)}\n\n"
# Mark as starting processing
await queue_manager.start_processing(request_id, model_name)
# Send "Model starting" message
data = {
"id": completion_id,
"object": "chat.completion.chunk",
"created": created,
"model": model_name,
"choices": [{
"index": 0,
"delta": {"content": ""},
"finish_reason": None,
}],
"x_queue_info": {
"status": "starting",
"message": "Model starting",
},
}
yield f"data: {json.dumps(data)}\n\n"
try:
chunk_count = 0
......@@ -3337,6 +3489,9 @@ async def stream_chat_response(
}
yield f"data: {json.dumps(data)}\n\n"
yield "data: [DONE]\n\n"
finally:
# Always clean up queue state
await queue_manager.finish_processing()
async def generate_chat_response(
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment