front: show model-loading progress on the Tasks page

During a GIL-heavy from_pretrained the engine's event loop is blocked, so its
/internal/engine-state poll times out and the engine looked "down" with an empty
task list — the real loading task never reached the front. Parse load progress
from the engine's log stream (which the front already pumps) into Engine.loading
and surface it as a synthetic 'loading' task (with live step/total) in
_merge_engine_tasks, even when the primary engine is the blocked one. Cleared on
"Model loaded successfully" or the next successful poll.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
parent bc9a8352
......@@ -341,6 +341,32 @@ class FrontProxy:
t["engine"] = e.name
merged.append(t)
seen.add(t.get("id"))
# Synthetic "loading" tasks parsed from the log stream, for any engine that
# is loading a model but whose event loop is GIL-blocked (so its real
# loading task never reached us). Skip if a real loading task for the same
# engine already surfaced above.
have_loading = {(t.get("engine"), t.get("model")) for t in merged
if isinstance(t, dict) and t.get("kind") == "loading"}
for e in self.registry.all():
ld = e.loading
if not ld or (e.name, ld.get("model")) in have_loading:
continue
merged.append({
"id": f"loading-{e.name}",
"kind": "loading",
"title": f"Loading {ld.get('model') or 'model'}",
"model": ld.get("model") or "",
"status": "running",
"step": ld.get("step", 0),
"total": ld.get("total", 0),
"rate": 0.0,
"message": ld.get("message") or "Loading",
"engine": e.name,
"active": True,
"cancellable": False,
"pausable": False,
"restartable": False,
})
return merged
# -------------------------------------------------------------------- proxy
......
......@@ -23,6 +23,7 @@ import atexit
import collections
import json
import os
import re
import shutil
import signal
import socket
......@@ -313,16 +314,54 @@ class EngineSupervisor:
)
engine.proc = proc
tail = self._logs.setdefault(engine.id, collections.deque(maxlen=30))
threading.Thread(target=self._pump_logs, args=(tag, proc, tail),
threading.Thread(target=self._pump_logs, args=(tag, proc, tail, engine),
daemon=True).start()
@staticmethod
def _pump_logs(tag, proc, tail):
    """Forward a child process's output to our stdout, keeping a recent tail.

    Each line read from ``proc.stdout`` has trailing whitespace stripped.
    Lines that are empty after stripping are dropped; every other line is
    appended to ``tail`` and echoed as ``[tag] line`` with an immediate flush.
    Returns when ``proc.stdout`` is exhausted.
    """
    for raw in proc.stdout:
        text = raw.rstrip()
        if not text:
            # Nothing left after stripping: neither record nor echo it.
            continue
        tail.append(text)
        print(f"[{tag}] {text}", flush=True)
# Matches a tqdm-style progress line, e.g. "Loading weights: 12%|██ | 50/427 [..]".
# Groups: 1 = description (the text before the colon that precedes the
# percentage), 2 = current step, 3 = total. Anchored at the start of the line.
_PROGRESS_RE = re.compile(r'^(.*?):\s*\d+%\|.*?\|\s*(\d+)/(\d+)')
# Matches a log line announcing the start of a model load. Group 1 is the rest
# of the line after the marker, starting at its first non-whitespace character.
_LOAD_START_RE = re.compile(
r'(?:Loading model on demand|Loading HuggingFace model|Loading model):\s*(\S.*)')
# Matches log lines that end a load, whether it succeeded or failed.
_LOAD_DONE_RE = re.compile(r'Model loaded successfully|failed to load|Error loading')
def _pump_logs(self, tag, proc, tail, engine):
    """Forward a child process's output and feed it to the load-progress parser.

    Each line read from ``proc.stdout`` has trailing whitespace stripped.
    Lines that are empty after stripping are dropped. Every other line is
    appended to ``tail``, echoed as ``[tag] line`` with an immediate flush,
    and then passed to ``self._note_load_progress`` together with ``engine``.
    Returns when ``proc.stdout`` is exhausted.
    """
    # Both stages are lazy, so each line is handled as soon as it is read.
    stripped = (raw.rstrip() for raw in proc.stdout)
    for text in filter(None, stripped):
        tail.append(text)
        print(f"[{tag}] {text}", flush=True)
        self._note_load_progress(engine, text)
def _note_load_progress(self, engine, line):
    """Update ``engine.loading`` from one line of the engine's log stream.

    Lets the front show a 'loading' task even while the engine cannot be
    polled. Matching is best-effort and this method never raises.

    Args:
        engine: object whose ``loading`` attribute (a dict or ``None``) is
            replaced by this method.
        line: one log line, already stripped of trailing whitespace.

    Effects, checked in this order:
        * start marker (``_LOAD_START_RE``): ``loading`` becomes a fresh state
          whose ``model`` is the first whitespace-delimited token after the
          marker, with ``step`` and ``total`` at 0.
        * done/failed marker (``_LOAD_DONE_RE``): ``loading`` becomes ``None``.
        * progress bar (``_PROGRESS_RE``): ``message``, ``step`` and ``total``
          are refreshed. If no load is being tracked, a new state is started
          only for a known model-load bar description.
    """
    try:
        started = self._LOAD_START_RE.search(line)
        if started:
            # group(1) begins with a non-whitespace character, so split()
            # always yields a token; it also copes with tabs, not just spaces.
            engine.loading = {"model": started.group(1).split()[0],
                              "message": "Loading", "step": 0, "total": 0}
            return
        if self._LOAD_DONE_RE.search(line):
            engine.loading = None
            return

        # Progress bars redraw with "\r". If the reader did not split on it,
        # several redraws arrive as one line; only the last one is current.
        latest = line.rsplit("\r", 1)[-1]
        bar = self._PROGRESS_RE.match(latest)
        if not bar:
            return
        desc = bar.group(1).strip() or "Loading"

        # Read the shared attribute once: it can be cleared elsewhere while
        # this log pump is running.
        current = engine.loading
        if current is None:
            # Only treat known model-load bars as a load (ignore unrelated tqdm,
            # e.g. LoRA training steps which have their own task).
            if not re.search(r'Loading (weights|checkpoint shards|'
                             r'pipeline components|shards)|Fetching', desc,
                             re.IGNORECASE):
                return
            current = {"model": "", "message": "", "step": 0, "total": 0}
        step, total = int(bar.group(2)), int(bar.group(3))
        # Publish a new dict instead of mutating the one readers may hold, so
        # they always see a consistent message/step/total.
        engine.loading = {**current,
                          "message": f"{desc}: {step}/{total}",
                          "step": step, "total": total}
    except Exception:
        # Deliberate best-effort: a parsing problem must never stop the caller
        # from forwarding logs.
        pass
# -------------------------------------------------------------------- lifecycle
def _set_primary(self, engines) -> None:
......@@ -369,13 +408,19 @@ class EngineSupervisor:
if r.status_code == 200:
d = r.json()
healthy = True
loaded = d.get("loaded_models") or []
self.registry.update_state(
engine.id, healthy=True,
loaded_models=d.get("loaded_models") or [],
loaded_models=loaded,
vram=d.get("vram"),
tasks=d.get("tasks") or [],
cooling=d.get("cooling"),
)
# Backstop: the engine answered, so it's no longer GIL-blocked
# loading. Clear the log-parsed loading state (the real task,
# if any, now comes through the poll).
if engine.loading is not None:
engine.loading = None
else:
self.registry.update_state(engine.id, healthy=False)
except Exception:
......
......@@ -53,6 +53,9 @@ class Engine:
vram: Optional[dict] = None
tasks: list = field(default_factory=list) # running/queued tasks on this engine
cooling: Optional[dict] = None # thermal cooldown state, or None when not cooling
loading: Optional[dict] = None # model-load progress parsed from logs (or None);
# surfaced as a synthetic task while the engine's
# event loop is GIL-blocked and can't be polled
last_ok: float = 0.0 # monotonic time of last successful poll
proc: object = None # subprocess.Popen (set by the supervisor)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment