Township gen overhaul + coderai thermal/offload/eviction fixes

Township fight-video generator (tools/gen_township_fighters.py):
- 16:9 native resolution: default 832x480 video + matching keyframes
  (configurable video_size); square 512 was off-distribution for Wan2.2.
- Split-and-chain rendering: single-render cap (default 50f); clips/outcomes
  longer than the cap render as chained sub-renders (last frame seeds the next)
  concatenated into one continuous shot, parts discarded — Matches page unchanged.
  Planned-clip ceiling raised to 480f.
- Separate outcome min/max frames (default 40/70), same split-chain path.
- Configurable short/long final-assembly intervals; clip count derives from the
  long target + fps so the long cut always fills.
- Prompt continuity: deterministic wardrobe+environment clause on every clip,
  replan clip and outcome; stronger LLM system prompts; updated default suffix.
- Run page: configurable fighter/environment counts + reference-image counts;
  moved "Include female fighters" into the Characters card; suggested
  steps/rank/weight guide table; per-profile LoRA train defaults now mirror the
  run-page config (lora_* for characters, env_lora_* for environments).
- Matches: "Remove match completely" (files + keyframes + prompts.json entry).
- Renamed the prompts step to "Generate matches prompts"; removed the gallery page.

coderai:
- images.py: fix NameError ('model_key' undefined) that silently skipped
  proactive VRAM eviction before every image load.
- thermal.py: cross-worker cooldown — when one generation pauses for heat, all
  parallel generations now back off until the resume threshold; add process-tree
  CPU% reader (100%/core).
- video.py/manager.py/main.py: offload ref-leak fix, offloaded-load VRAM guard,
  wire --pipeline-cache flags.
- Tasks page CPU tile shows process-tree CPU% scaled to cores.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
parent eeb3bba1
......@@ -2144,11 +2144,15 @@ async def api_system_stats(username: str = Depends(require_admin)):
best-effort and may be null when a sensor/metric is unavailable."""
from codai.models import thermal
cpu = {"util": None, "temp": thermal.read_cpu_temp()}
# CPU tile = coderai process-tree usage, scaled 100% PER CORE (0..100*cores),
# not the all-core average (which reads misleadingly low when work is on a few
# cores). `cores` lets the UI scale the bar to full capacity = cores*100%.
cpu = {"util": thermal.read_process_tree_cpu(), "temp": thermal.read_cpu_temp(),
"cores": None}
ram = None
try:
import psutil
cpu["util"] = psutil.cpu_percent(interval=None)
cpu["cores"] = psutil.cpu_count()
vm = psutil.virtual_memory()
ram = {"used": vm.used / 1e9, "total": vm.total / 1e9, "percent": vm.percent}
except Exception:
......
......@@ -107,18 +107,25 @@ function actions(t) {
}
// ---- Live hardware telemetry ----
function _utilClass(pct){ return pct == null ? 'sys-ok' : (pct >= 90 ? 'sys-hot' : pct >= 70 ? 'sys-warn' : 'sys-ok'); }
// `frac` is a 0-100 fraction OF CAPACITY (the bar fill + colour are driven by it).
function _utilClass(frac){ return frac == null ? 'sys-ok' : (frac >= 90 ? 'sys-hot' : frac >= 70 ? 'sys-warn' : 'sys-ok'); }
// Map a temperature reading to its CSS class: no class when the reading is
// missing, "hot" from 90 upward, "warn" from 80 upward, otherwise "ok".
function _tempClass(t){
  if (t == null) return '';
  if (t >= 90) return 'sys-temp-hot';
  if (t >= 80) return 'sys-temp-warn';
  return 'sys-temp-ok';
}
function _bar(pct){
const p = pct == null ? 0 : Math.max(0, Math.min(100, pct));
return `<div class="sys-bar ${_utilClass(pct)}"><span style="width:${p}%"></span></div>`;
function _bar(frac){
const p = frac == null ? 0 : Math.max(0, Math.min(100, frac));
return `<div class="sys-bar ${_utilClass(frac)}"><span style="width:${p}%"></span></div>`;
}
function _utilTile(name, pct, temp){
// `max` = full-scale value for the bar (default 100). The CPU tile passes
// cores*100 so the displayed % can run 0..cores*100 (100% per core) while the bar
// still fills 0..100% of total capacity.
function _utilTile(name, pct, temp, max){
const full = max || 100;
const frac = pct == null ? null : (pct / full * 100);
const valTxt = pct == null ? 'n/a' : `${Math.round(pct)}%`;
const subTxt = (max && max > 100) ? `utilization (max ${max}%)` : 'utilization';
const tempTxt = temp == null ? '<span class="dim">temp n/a</span>'
: `<span class="${_tempClass(temp)}">${Math.round(temp)}°C</span>`;
return `<div class="sys-head"><span class="sys-name">${name}</span><span class="sys-val">${valTxt}</span></div>`
+ _bar(pct) + `<div class="sys-sub"><span>utilization</span>${tempTxt}</div>`;
+ _bar(frac) + `<div class="sys-sub"><span>${subTxt}</span>${tempTxt}</div>`;
}
function _memTile(name, used, total, pct){
const valTxt = (used == null || total == null) ? 'n/a' : `${used.toFixed(1)} / ${total.toFixed(1)} GB`;
......@@ -130,7 +137,7 @@ async function loadSystemStats(){
try {
const s = await fetch(ROOT_PATH + '/admin/api/system-stats').then(r => r.json());
const cpu = s.cpu || {}, gpu = s.gpu || {}, ram = s.ram || {}, vram = s.vram || {};
document.getElementById('tile-cpu').innerHTML = _utilTile('CPU', cpu.util, cpu.temp);
document.getElementById('tile-cpu').innerHTML = _utilTile('CPU', cpu.util, cpu.temp, (cpu.cores || 1) * 100);
document.getElementById('tile-gpu').innerHTML = _utilTile('GPU', gpu.util, gpu.temp);
document.getElementById('tile-ram').innerHTML = _memTile('RAM', ram.used, ram.total, ram.percent);
document.getElementById('tile-vram').innerHTML =
......
......@@ -482,7 +482,7 @@ def _load_diffusers_pipeline(model_name: str, global_args, model_config: dict =
# Needed VRAM for this model (config used_vram_gb, with quant/offload
# factors applied) — 0 when it can't be determined.
_key = None
for _k in (model_key, model_name, f"image:{model_name}"):
for _k in (f"image:{model_name}", model_name):
if _k in _mmm.config:
_key = _k
break
......
......@@ -701,11 +701,21 @@ def _free_pipeline_vram(pipe) -> None:
pass
except Exception:
pass
_c = None # drop the loop leftover ref to the last component
for _cn in list(_comps):
try:
setattr(pipe, _cn, None)
except Exception:
pass
# CRITICAL: `_comps` (and `_c` above) hold STRONG refs to every
# component (transformer/transformer_2/text_encoder/vae). They stay
# in this function's scope through the gc.collect()/empty_cache()
# below, so without clearing them first the weights are never
# actually released — empty_cache() reclaims nothing, the GPU stays
# full, and the offload-reload retry OOMs on a 0.4 GB-free card,
# cascading through every fallback (each a full ~30-min reload).
_comps.clear()
_comps = None
except Exception:
pass
for _ in range(3):
......@@ -1038,6 +1048,14 @@ def _load_video_pipeline(model_name: str, device: str, mode: str, offload: str =
def _report_loaded(pipe, strategy: str) -> None:
"""Print a post-load summary: strategy, device placement, memory state."""
_enable_vae_memory_opts(pipe)
# Record the actual strategy used (after any OOM fallbacks) so the caller
# knows whether the post-load VRAM delta reflects the FULL model footprint
# (full GPU) or just the slice that happens to be resident under offload
# (which must NOT overwrite a real measurement — see record_vram_delta).
try:
pipe._coderai_load_strategy = strategy
except Exception:
pass
print(f" ✓ Video pipeline loaded — strategy: {strategy}")
_report_device_map(pipe)
_report_offload_dir_size()
......@@ -2510,8 +2528,15 @@ async def video_generations(request: VideoGenerationRequest,
multi_model_manager.current_model_key = model_key
# Record the real VRAM used. record_vram_delta only persists when no
# used_vram_gb is configured (it writes the separate measured_vram_gb).
# Under ANY offload strategy the weights live on CPU/disk, so the GPU
# delta is a meaningless ~0 — never let it overwrite a real full-GPU
# measurement (that bug saved measured_vram_gb=0.05 and made the next
# start mis-pick full-GPU and OOM-cascade).
try:
multi_model_manager.record_vram_delta(model_key, _vram_before)
_strat = str(getattr(pipe, '_coderai_load_strategy', '') or '')
_was_offloaded = bool(_strat) and not _strat.startswith('full GPU')
multi_model_manager.record_vram_delta(
model_key, _vram_before, offloaded=_was_offloaded)
except Exception:
pass
......
......@@ -748,6 +748,12 @@ def main():
global_args.ram = config.offload.manual_ram_gb
global_args.offload_strategy = config.offload.strategy
global_args.no_ram = config.offload.no_ram
# Pipeline disk-cache flags must be carried onto global_args — pipeline_cache.
# enabled()/_force_rebuild() read them via get_global_args(). Without this the
# cache silently never engages (the startup banner reads the raw args, so it
# still claims "enabled", masking the gap).
global_args.pipeline_cache = getattr(args, "pipeline_cache", False)
global_args.rebuild_pipeline_cache = getattr(args, "rebuild_pipeline_cache", False)
global_args.load_in_4bit = config.offload.load_in_4bit
global_args.load_in_8bit = config.offload.load_in_8bit
global_args.flash_attn = config.offload.flash_attention
......
......@@ -1642,15 +1642,26 @@ class MultiModelManager:
"""Call immediately before loading a model; returns a snapshot for delta measurement."""
return self._free_vram_snapshot()
def record_vram_delta(self, model_key: str, free_before: float) -> None:
def record_vram_delta(self, model_key: str, free_before: float,
offloaded: bool = False) -> None:
"""Call immediately after a model finishes loading to record actual VRAM consumed.
If the measured value exceeds the stored estimate by more than 10%, the real
value is written back into the model config and persisted to models.json so
future eviction decisions use the accurate figure.
``offloaded`` MUST be True when the model was loaded with any CPU/disk
offload strategy (model/sequential/group/balanced/disk). In that case the
weights are not resident on the GPU, so the measured delta is a tiny,
meaningless lower bound — recording it would clobber a real full-GPU
measurement and make the next start under-estimate the footprint, pick a
full-GPU load, and OOM. So we skip recording entirely and keep the prior
estimate/measurement intact.
"""
if free_before < 0:
return
if offloaded:
return
free_after = self._free_vram_snapshot()
if free_after < 0:
return
......
......@@ -63,6 +63,14 @@ def get_cooldown_state() -> dict:
return dict(_cooldown_state)
def _cooldown_active() -> bool:
    """Report whether any worker is currently inside the cooldown wait loop.

    Parallel workers consult this so they can join an in-progress pause
    (cross-worker hysteresis) rather than racing ahead the moment their own
    single temperature read dips below the high trigger.
    """
    # Snapshot the waiter count under the lock, then compare outside it.
    with _cooldown_lock:
        waiters = _cooldown_waiters
    return waiters > 0
def _cooldown_enter() -> None:
global _cooldown_waiters
with _cooldown_lock:
......@@ -302,6 +310,49 @@ def read_gpu_util() -> Optional[float]:
return val
# Persistent psutil.Process handles, so cpu_percent() can report usage *since the
# previous call* without blocking. Keyed by pid.
_proc_cpu_cache: dict = {}


def read_process_tree_cpu() -> Optional[float]:
    """Return the CPU% of this process plus all of its (recursive) children.

    Scale is 100% PER CORE: a single fully-used core is 100%, so the value
    ranges 0 .. 100*cpu_count (e.g. 24 cores -> up to 2400%).

    The call is non-blocking: each process is measured with
    ``cpu_percent(None)``, i.e. usage since the previous call on the same
    handle. Handles are therefore kept in ``_proc_cpu_cache`` between calls.
    A process seen for the first time is only primed and contributes nothing
    this round, so the first reading after start is 0.0 and corrects on the
    next poll.

    Returns:
        The summed percentage rounded to one decimal, or ``None`` when psutil
        cannot be imported or the process tree cannot be enumerated.
    """
    try:
        import psutil
    except Exception:
        return None
    try:
        root = psutil.Process()
        procs = [root] + root.children(recursive=True)
    except Exception:
        return None

    live: dict = {}
    total = 0.0
    for proc in procs:
        try:
            pid = proc.pid
            cached = _proc_cpu_cache.get(pid)
            # Reuse the cached handle only if it still refers to the SAME
            # process. psutil compares Process objects by PID and creation
            # time, so a PID recycled by the OS for a new child is detected
            # here and re-primed instead of being measured against the dead
            # process's CPU-time baseline.
            if cached is not None and cached == proc:
                total += cached.cpu_percent(None)  # usage since last call
                live[pid] = cached
            else:
                proc.cpu_percent(None)  # prime; contributes ~0 this round
                live[pid] = proc
        except Exception:
            # Best-effort: a process that vanished mid-scan is simply dropped
            # (and thereby evicted from the cache below).
            pass

    # Keep only handles for processes that are still part of the tree.
    _proc_cpu_cache.clear()
    _proc_cpu_cache.update(live)
    return round(total, 1)
def read_cpu_temp_avg(samples: int = 3, max_seconds: float = 3.0) -> Optional[float]:
"""Averaged CPU temperature for stable resume/cooldown decisions.
......@@ -463,17 +514,35 @@ def wait_until_safe(settings: Optional[ThermalSettings] = None,
hot.append(("GPU", gpu_t, settings.gpu_resume))
if settings.cpu_enabled and cpu_t is not None and cpu_t >= settings.cpu_high:
hot.append(("CPU", cpu_t, settings.cpu_resume))
if not hot:
# Cross-worker hysteresis: a thermal pause is a GLOBAL hardware event. When a
# parallel worker is already cooling down, every OTHER running generation must
# back off too — otherwise the others keep the box hot and the first worker
# can never reach the (lower) resume threshold. So even when our own single
# read is below the high trigger, join the pause while temps are still above
# the resume line and a cooldown is already in progress.
joined = False
if not hot and _cooldown_active():
if (settings.gpu_enabled and gpu_t is not None and gpu_t > settings.gpu_resume) or \
(settings.cpu_enabled and cpu_t is not None and cpu_t > settings.cpu_resume):
joined = True
if not hot and not joined:
_dbg(f"within safe limits — serving immediately{desc0}")
return
# Enter cooldown: wait until *every* triggered sensor is at/below resume.
desc = f" ({context})" if context else ""
trig = ", ".join(f"{lbl} {t:.0f}°C>={settings.gpu_high if lbl=='GPU' else settings.cpu_high:.0f}°C"
for lbl, t, _ in hot)
print(f"[thermal] Hardware too hot{desc}: {trig} — pausing requests "
f"until cooldown (GPU<={settings.gpu_resume:.0f}°C / "
f"CPU<={settings.cpu_resume:.0f}°C)")
if hot:
trig = ", ".join(f"{lbl} {t:.0f}°C>={settings.gpu_high if lbl=='GPU' else settings.cpu_high:.0f}°C"
for lbl, t, _ in hot)
print(f"[thermal] Hardware too hot{desc}: {trig} — pausing requests "
f"until cooldown (GPU<={settings.gpu_resume:.0f}°C / "
f"CPU<={settings.cpu_resume:.0f}°C)")
else:
# Joined an already-active cooldown started by another parallel worker.
print(f"[thermal] Joining active cooldown{desc} — another generation is "
f"paused; backing off until temps reach resume "
f"(GPU<={settings.gpu_resume:.0f}°C / CPU<={settings.cpu_resume:.0f}°C)")
waited = 0.0
_cooldown_enter()
try:
......
......@@ -124,6 +124,22 @@ class TaskRegistry:
if total is not None:
t.total = int(total)
def current_loading_task(self) -> Optional[str]:
    """Return the id of the most recently started running ``loading`` task.

    The tqdm progress capture runs in the load's executor thread and has no
    handle to the task id, so it uses this lookup to find the live loading
    entry to publish shard/component progress onto.

    A task's start time is ``started_at``, falling back to ``created_at``
    and then 0.0. On equal start times the task iterated later wins.
    Returns ``None`` when no task qualifies.
    """
    newest_id = None
    newest_start = -1.0
    with self._lock:
        for task_id, task in self._tasks.items():
            if task.kind != "loading" or task.status != "running":
                continue
            started = task.started_at or task.created_at or 0.0
            if started >= newest_start:
                newest_start = started
                newest_id = task_id
    return newest_id
def finish(self, tid: str, status: str = "done", message: str = "") -> None:
with self._lock:
t = self._tasks.get(tid)
......@@ -261,6 +277,92 @@ def wait_if_paused(task_id: Optional[str]) -> None:
task_registry.wait_if_paused(task_id)
# --- tqdm progress capture for model loads ---------------------------------
# diffusers / transformers / huggingface_hub emit their load progress through
# tqdm ("Loading checkpoint shards", "Loading pipeline components", "Loading
# weights", download bars). We monkeypatch the base tqdm class for the duration
# of a load so those bars publish step/total/desc onto the live `loading` task —
# turning the Tasks-page "working…" into the same detailed progress the terminal
# shows. Ref-counted so concurrent/nested loads share one patch.
_tqdm_patch_lock = threading.Lock()
_tqdm_patch_depth = 0
_tqdm_orig: Dict[str, object] = {}
def _publish_loading_progress(desc, n, total):
    """Publish one progress reading onto the live ``loading`` task, if any.

    Args:
        desc: Progress-bar description; may be empty/None and may carry a
            trailing ":" which is stripped.
        n: Current count; coerced with ``int`` (falsy -> 0).
        total: Expected count; coerced with ``int`` (falsy -> 0). A
            non-positive total is reported as "unknown" (``None``).

    Does nothing when there is no running loading task or when ``n`` /
    ``total`` cannot be converted to integers.
    """
    task_id = task_registry.current_loading_task()
    if not task_id:
        return
    try:
        done = int(n or 0)
        expected = int(total or 0)
    except (TypeError, ValueError):
        return

    # tqdm may store the desc with a trailing ": " (set_description) — normalise.
    label = "Loading"
    if desc:
        label = str(desc).strip().rstrip(":").strip() or "Loading"

    has_total = expected > 0
    # The step carries the numeric progress; the message carries the phase.
    task_registry.step(task_id, done, expected if has_total else None)
    if has_total:
        message = f"{label}: {done}/{expected}"
    else:
        message = label
    task_registry.update(task_id, message=message)
def _install_tqdm_capture():
    """Patch ``tqdm.std.tqdm`` so its bars publish progress to the loading task.

    Ref-counted via ``_tqdm_patch_depth``: only the first (outermost) call
    patches ``update`` and ``close``; later calls just bump the depth. The
    originals are recorded in ``_tqdm_orig`` so ``_remove_tqdm_capture`` can
    restore them. If tqdm cannot be imported the depth is still incremented
    (keeping install/remove calls balanced) but nothing is patched.

    The patched methods hold the original methods in their closures rather
    than looking them up in ``_tqdm_orig`` at call time. That dict is cleared
    on removal, so a tqdm call in another thread that had already resolved
    the patched method would otherwise hit a ``KeyError`` outside the
    best-effort ``try`` and break the library's own progress update.
    """
    global _tqdm_patch_depth
    with _tqdm_patch_lock:
        _tqdm_patch_depth += 1
        if _tqdm_patch_depth > 1:
            return  # already patched by an outer/concurrent load
        try:
            from tqdm import std as _tqdm_std
        except Exception:
            return
        cls = _tqdm_std.tqdm
        orig_update = cls.update
        orig_close = cls.close
        _tqdm_orig['update'] = orig_update
        _tqdm_orig['close'] = orig_close
        _tqdm_orig['cls'] = cls

        def _patched_update(self, n=1):
            # Always run the real update first; publishing is best-effort.
            result = orig_update(self, n)
            try:
                if not getattr(self, 'disable', False):
                    _publish_loading_progress(
                        getattr(self, 'desc', ''), getattr(self, 'n', 0),
                        getattr(self, 'total', 0))
            except Exception:
                pass
            return result

        def _patched_close(self):
            try:
                # On close, report the bar as complete (n == total) when the
                # bar is enabled and has a known total.
                if not getattr(self, 'disable', False) and getattr(self, 'total', 0):
                    _publish_loading_progress(
                        getattr(self, 'desc', ''), getattr(self, 'total', 0),
                        getattr(self, 'total', 0))
            except Exception:
                pass
            return orig_close(self)

        cls.update = _patched_update
        cls.close = _patched_close
def _remove_tqdm_capture():
    """Undo one ``_install_tqdm_capture`` call.

    Decrements the patch depth; only when it reaches zero are the saved
    ``update`` / ``close`` methods put back on the patched class and the
    saved-originals dict emptied. A call with no matching install (depth
    already <= 0) is ignored.
    """
    global _tqdm_patch_depth
    with _tqdm_patch_lock:
        if _tqdm_patch_depth <= 0:
            return
        _tqdm_patch_depth -= 1
        if _tqdm_patch_depth > 0:
            return  # another load still needs the patch
        patched_cls = _tqdm_orig.get('cls')
        if patched_cls is not None:
            for attr in ('update', 'close'):
                if attr in _tqdm_orig:
                    setattr(patched_cls, attr, _tqdm_orig[attr])
        _tqdm_orig.clear()
@contextmanager
def loading_task(model: str, *, model_type: str = "model", title: Optional[str] = None):
"""Context manager that shows a model load as a Tasks-page entry.
......@@ -268,16 +370,21 @@ def loading_task(model: str, *, model_type: str = "model", title: Optional[str]
Model loading can't be paused or cancelled (it's a single blocking
``from_pretrained`` / ``Llama(...)`` call), so the task is registered
non-cancellable and non-pausable — the Tasks UI shows it with no action
buttons. The task finishes ``done`` on success or ``error`` on exception.
Re-entrant guard: a nested load of the same model_key reuses no task; each
call is independent (loads don't nest in practice)."""
buttons. While the context is active, tqdm progress bars emitted by
diffusers/transformers/hf_hub are captured and published onto the task as
step/total + a phase message ("Loading checkpoint shards: 7/12"), so the UI
mirrors the terminal instead of a bare "working…". The task finishes ``done``
on success or ``error`` on exception."""
label = title or f"Loading {model}"
tid = task_registry.register(
"loading", title=label, model=model or "", status="running",
cancellable=False, restartable=False, pausable=False)
_install_tqdm_capture()
try:
yield tid
task_registry.finish(tid, "done")
except BaseException as e: # noqa: BLE001 — record then re-raise
task_registry.finish(tid, "error", str(e)[:200] or e.__class__.__name__)
raise
finally:
_remove_tqdm_capture()
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment