CoderAI timeouts

parent 1bbd81d6
......@@ -68,6 +68,9 @@ class PendingCoderAIRequest:
stream_queue: asyncio.Queue | None = None
event_log: list[Dict[str, Any]] = field(default_factory=list)
request_snapshot: Dict[str, Any] = field(default_factory=dict)
# Updated by keepalive/pending messages from coderai to extend deadline.
last_keepalive: float = 0.0
keepalive_timeout: float = 0.0
@dataclass
......@@ -595,7 +598,29 @@ class CoderAIBroker:
async def wait_reply() -> Dict[str, Any]:
if use_local_fast_path:
return await asyncio.wait_for(future, timeout=timeout)
# Poll with short intervals so keepalive messages can extend the deadline.
# asyncio.shield() prevents cancelling the original future on each interval
# timeout, letting us reuse the same future across multiple poll rounds.
deadline = time.time() + timeout
while True:
remaining = deadline - time.time()
if remaining <= 0:
raise asyncio.TimeoutError()
try:
return await asyncio.wait_for(
asyncio.shield(future), timeout=min(remaining, 10.0)
)
except asyncio.TimeoutError:
async with self._lock:
pending = self._pending.get(request_id)
if pending and pending.last_keepalive > 0:
new_deadline = pending.last_keepalive + pending.keepalive_timeout
if new_deadline > deadline:
deadline = new_deadline
logger.info(
"CoderAI request %s deadline extended by keepalive to +%.0fs",
request_id, pending.keepalive_timeout,
)
# Slow path: poll the shared cache for the reply.
deadline = time.time() + timeout
......@@ -607,8 +632,19 @@ class CoderAIBroker:
return reply
if future.done():
return future.result()
# Periodically verify the session is still alive on the remote node.
now = time.time()
# Extend deadline if keepalive received.
async with self._lock:
pending = self._pending.get(request_id)
if pending and pending.last_keepalive > 0:
new_deadline = pending.last_keepalive + pending.keepalive_timeout
if new_deadline > deadline:
deadline = new_deadline
logger.info(
"CoderAI request %s deadline extended by keepalive to +%.0fs",
request_id, pending.keepalive_timeout,
)
# Periodically verify the session is still alive on the remote node.
if now - last_liveness >= LIVENESS_CHECK_INTERVAL:
live = self._cache.broker_get(
self._session_meta_key(provider_id, resolved_client_id)
......@@ -625,7 +661,8 @@ class CoderAIBroker:
await asyncio.sleep(0.1)
raise asyncio.TimeoutError()
result = await asyncio.wait_for(wait_reply(), timeout=timeout)
# Deadline is managed inside wait_reply(); no outer wait_for wrapper needed.
result = await wait_reply()
if not future.done():
future.set_result(result)
return result
......@@ -646,6 +683,23 @@ class CoderAIBroker:
if not request_id:
return
event = message.get("event")
status = message.get("status")
# Keepalive/pending: coderai signals it's still working (e.g. downloading a model).
# Extend the deadline without resolving the future.
if event == "pending" or status == "pending":
async with self._lock:
pending = self._pending.get(request_id)
if pending:
pending.last_keepalive = time.time()
est = float((message.get("payload") or {}).get("estimated_timeout", 1800))
pending.keepalive_timeout = est
logger.info(
"CoderAI request %s keepalive received, deadline extended by %.0fs",
request_id, est,
)
return
if event in {"chunk", "progress", "output", "log", "data", "done", "completed"}:
await self._publish_stream_response(message)
return
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment