front: drain in-flight requests before bouncing an engine

An engine restart (admin button / config change) previously SIGTERM'd the
process immediately, severing any active SSE stream mid-response — the client
saw httpcore.RemoteProtocolError "peer closed connection without sending
complete message body".

Now restart_engine marks the engine `draining` first: the router stops routing
NEW requests to it (Engine.is_alive() reports false while draining, and the poll
loop can't flip it back healthy), and the supervisor waits up to
server.engine_restart_drain_grace seconds (default 30, 0 = immediate) for the
in-flight count to reach zero before killing the process. Stragglers past the
grace window are still bounced.

In-flight is tracked per engine in the front proxy: proxy() increments the
engine's counter just before sending the upstream request, and decrements it
once the streamed response has been fully relayed and closed (or immediately,
if the send itself fails).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
parent 0a7d343a
...@@ -50,3 +50,6 @@ coderai-runtime/ ...@@ -50,3 +50,6 @@ coderai-runtime/
# Video editor sessions + generated media (runtime artifacts) # Video editor sessions + generated media (runtime artifacts)
video_editor/sessions/ video_editor/sessions/
tools/coderai_media/ tools/coderai_media/
.oci-build.log
.oci-rebuild.sh
.oci-*.log
...@@ -52,6 +52,9 @@ class ServerConfig: ...@@ -52,6 +52,9 @@ class ServerConfig:
engine_gpus: Optional[list] = None # explicit GPU indices, e.g. [0, 1]; None = auto engine_gpus: Optional[list] = None # explicit GPU indices, e.g. [0, 1]; None = auto
proxy_status_timeout: float = 2.0 # short timeout for UI/status proxying (seconds) proxy_status_timeout: float = 2.0 # short timeout for UI/status proxying (seconds)
proxy_max_inflight: int = 64 # max concurrent proxied requests through the front proxy_max_inflight: int = 64 # max concurrent proxied requests through the front
engine_restart_drain_grace: float = 30.0 # on engine restart, wait this many seconds
# for in-flight requests to finish before
# killing the process (0 = bounce immediately)
# Explicit, heterogeneous engine declarations. Auto GPU detection only finds # Explicit, heterogeneous engine declarations. Auto GPU detection only finds
# NVIDIA cards and assumes one backend, and CUDA vs Vulkan device enumeration is # NVIDIA cards and assumes one backend, and CUDA vs Vulkan device enumeration is
# inconsistent — so for mixed setups (e.g. an NVIDIA + a Radeon card, where the # inconsistent — so for mixed setups (e.g. an NVIDIA + a Radeon card, where the
...@@ -559,6 +562,7 @@ class ConfigManager: ...@@ -559,6 +562,7 @@ class ConfigManager:
"engine_gpus": self.config.server.engine_gpus, "engine_gpus": self.config.server.engine_gpus,
"proxy_status_timeout": self.config.server.proxy_status_timeout, "proxy_status_timeout": self.config.server.proxy_status_timeout,
"proxy_max_inflight": self.config.server.proxy_max_inflight, "proxy_max_inflight": self.config.server.proxy_max_inflight,
"engine_restart_drain_grace": self.config.server.engine_restart_drain_grace,
"engine_specs": self.config.server.engine_specs, "engine_specs": self.config.server.engine_specs,
"default_engine": self.config.server.default_engine, "default_engine": self.config.server.default_engine,
}, },
......
...@@ -578,19 +578,29 @@ class FrontProxy: ...@@ -578,19 +578,29 @@ class FrontProxy:
rp_req = self._long.build_request( rp_req = self._long.build_request(
method, url, headers=headers, params=request.query_params, method, url, headers=headers, params=request.query_params,
content=content) content=content)
# Count this as in-flight on the chosen engine so a restart can drain it:
# decremented only once the response is fully streamed (or send failed).
engine.enter_request()
try: try:
rp_resp = await self._long.send(rp_req, stream=True) rp_resp = await self._long.send(rp_req, stream=True)
except Exception as exc: except Exception as exc:
engine.exit_request()
return JSONResponse( return JSONResponse(
{"error": f"Engine#{engine.id} unreachable: {exc}"}, status_code=502) {"error": f"Engine#{engine.id} unreachable: {exc}"}, status_code=502)
async def _release():
try:
await rp_resp.aclose()
finally:
engine.exit_request()
resp_headers = self._filter_headers(rp_resp.headers, _DROP_RESP) resp_headers = self._filter_headers(rp_resp.headers, _DROP_RESP)
return StreamingResponse( return StreamingResponse(
rp_resp.aiter_raw(), rp_resp.aiter_raw(),
status_code=rp_resp.status_code, status_code=rp_resp.status_code,
headers=dict(resp_headers), headers=dict(resp_headers),
media_type=rp_resp.headers.get("content-type"), media_type=rp_resp.headers.get("content-type"),
background=BackgroundTask(rp_resp.aclose), background=BackgroundTask(_release),
) )
# ----------------------------------------------------------------- status # ----------------------------------------------------------------- status
......
...@@ -31,6 +31,7 @@ import subprocess ...@@ -31,6 +31,7 @@ import subprocess
import sys import sys
import threading import threading
import time import time
from typing import Optional
import httpx import httpx
...@@ -528,31 +529,62 @@ class EngineSupervisor: ...@@ -528,31 +529,62 @@ class EngineSupervisor:
time.sleep(1.0) # avoid a tight crash loop time.sleep(1.0) # avoid a tight crash loop
self._spawn(engine) self._spawn(engine)
def restart_engine(self, engine_id: int) -> bool: def restart_engine(self, engine_id: int, drain_grace: Optional[float] = None) -> bool:
"""Forcibly kill and respawn one engine (e.g. it's stuck in a loop). """Forcibly kill and respawn one engine (e.g. it's stuck in a loop).
Before killing, mark the engine ``draining`` so the router stops sending it
NEW requests, and wait up to ``drain_grace`` seconds for in-flight (streaming)
requests to finish — so a config-triggered bounce doesn't sever active SSE
streams mid-response. After the grace window any stragglers are dropped.
Holds the restart lock so the poll loop's own respawn can't double-spawn.""" Holds the restart lock so the poll loop's own respawn can't double-spawn."""
engine = self.registry.get(engine_id) engine = self.registry.get(engine_id)
if engine is None: if engine is None:
return False return False
if drain_grace is None:
drain_grace = float(getattr(self.config.server,
"engine_restart_drain_grace", 30.0) or 0.0)
with self._restart_lock: with self._restart_lock:
proc = engine.proc proc = engine.proc
if proc is not None and proc.poll() is None: if proc is not None and proc.poll() is None and drain_grace > 0:
try: engine.draining = True
proc.terminate() self.registry.update_state(engine_id, healthy=False)
proc.wait(timeout=8) deadline = time.time() + drain_grace
except Exception: waited = False
pass while engine.inflight > 0 and time.time() < deadline \
if proc.poll() is None: and not self._stopped.is_set():
if not waited:
print(f"[front] draining engine#{engine_id} ({engine.name}): "
f"waiting for {engine.inflight} in-flight request(s) "
f"(up to {drain_grace:.0f}s)", flush=True)
waited = True
time.sleep(0.25)
if engine.inflight > 0:
print(f"[front] drain grace elapsed; bouncing engine#{engine_id} "
f"with {engine.inflight} request(s) still in flight",
flush=True)
elif waited:
print(f"[front] engine#{engine_id} drained cleanly", flush=True)
try:
proc = engine.proc
if proc is not None and proc.poll() is None:
try: try:
proc.kill() proc.terminate()
proc.wait(timeout=3) proc.wait(timeout=8)
except Exception: except Exception:
pass pass
self.registry.update_state(engine_id, healthy=False) if proc.poll() is None:
print(f"[front] restarting engine#{engine_id} ({engine.name}) on request", try:
flush=True) proc.kill()
self._spawn(engine) proc.wait(timeout=3)
except Exception:
pass
self.registry.update_state(engine_id, healthy=False)
print(f"[front] restarting engine#{engine_id} ({engine.name}) on request",
flush=True)
self._spawn(engine)
finally:
engine.draining = False
return True return True
def wait_ready(self, timeout: float = 1800.0) -> bool: def wait_ready(self, timeout: float = 1800.0) -> bool:
......
...@@ -71,6 +71,10 @@ class Engine: ...@@ -71,6 +71,10 @@ class Engine:
# event loop is GIL-blocked and can't be polled # event loop is GIL-blocked and can't be polled
last_ok: float = 0.0 # monotonic time of last successful poll last_ok: float = 0.0 # monotonic time of last successful poll
proc: object = None # subprocess.Popen (set by the supervisor) proc: object = None # subprocess.Popen (set by the supervisor)
draining: bool = False # restart pending: stop routing NEW requests here
# and let in-flight ones finish (drain grace period)
inflight: int = 0 # proxied requests currently streaming through
_inflight_lock: object = field(default_factory=threading.Lock, repr=False, compare=False)
def __post_init__(self): def __post_init__(self):
if not self.url: if not self.url:
...@@ -89,13 +93,27 @@ class Engine: ...@@ -89,13 +93,27 @@ class Engine:
An engine mid-generation can't answer the health poll and reads as An engine mid-generation can't answer the health poll and reads as
unhealthy, but it's the right place to send a request pinned/assigned to unhealthy, but it's the right place to send a request pinned/assigned to
it — the request queues on its gen-lock instead of duplicating the model it — the request queues on its gen-lock instead of duplicating the model
elsewhere. A None proc means externally managed; assume alive.""" elsewhere. A None proc means externally managed; assume alive.
While draining (a restart is pending) it reports not-alive so the router
diverts new traffic elsewhere and the existing requests can finish."""
if self.draining:
return False
p = self.proc p = self.proc
try: try:
return p is None or p.poll() is None return p is None or p.poll() is None
except Exception: except Exception:
return True return True
def enter_request(self) -> None:
with self._inflight_lock:
self.inflight += 1
def exit_request(self) -> None:
with self._inflight_lock:
if self.inflight > 0:
self.inflight -= 1
class EngineRegistry: class EngineRegistry:
def __init__(self): def __init__(self):
...@@ -151,6 +169,8 @@ class EngineRegistry: ...@@ -151,6 +169,8 @@ class EngineRegistry:
e = self._engines.get(engine_id) e = self._engines.get(engine_id)
if not e: if not e:
return return
if e.draining: # a restart is pending — stay out of rotation
healthy = False
e.healthy = healthy e.healthy = healthy
if healthy: if healthy:
e.last_ok = time.monotonic() e.last_ok = time.monotonic()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment