video: cap CPU cores + thermal-manage RIFE interpolation

rife-ncnn-vulkan and the ffmpeg frame extract/encode were grabbing all cores
and ran with no ongoing thermal control. Now:

- _cpu_thread_limit() mirrors coderai's half-the-cores cap (honours the
  OMP_NUM_THREADS set at import). All ffmpeg calls in the upscale + interpolate
  paths pass -threads N and are CPU-pinned via a sched_setaffinity preexec_fn;
  rife gets -j capped and the same affinity pin — so neither can saturate 24
  cores.
- RIFE is one opaque subprocess, so it now runs under a watcher thread that
  SIGSTOPs it when the GPU/CPU exceeds the configured thermal-high threshold and
  SIGCONTs it once cooled (the subprocess analogue of the upscaler's per-frame
  thermal gate), and terminates it on task cancel. Per-frame progress preserved.
Co-Authored-By: 's avatarClaude Opus 4.8 <noreply@anthropic.com>
parent cc2436ed
......@@ -29,6 +29,7 @@ import asyncio
import base64
import io
import os
import signal
import subprocess
import tempfile
import time
......@@ -2099,8 +2100,8 @@ def _model_upscale_video(model_name: str, in_path: str, factor: int,
frames_dir = tempfile.mkdtemp()
out_dir = tempfile.mkdtemp()
temps += [frames_dir, out_dir]
r = subprocess.run(['ffmpeg', '-y', '-i', in_path, f'{frames_dir}/%08d.png'],
capture_output=True)
r = subprocess.run(_ffmpeg(['-i', in_path, f'{frames_dir}/%08d.png']),
capture_output=True, preexec_fn=_cpu_affinity_preexec())
if r.returncode != 0:
raise RuntimeError(
f"frame extraction failed: {r.stderr.decode(errors='replace')}")
......@@ -2159,12 +2160,12 @@ def _model_upscale_video(model_name: str, in_path: str, factor: int,
out = tempfile.mktemp(suffix='_up.mp4')
temps.append(out)
cmd = ['ffmpeg', '-y', '-framerate', f'{fps}', '-i', f'{out_dir}/%08d.png']
cmd = _ffmpeg(['-framerate', f'{fps}', '-i', f'{out_dir}/%08d.png'])
# Carry the original audio track if present.
cmd += ['-i', in_path, '-map', '0:v:0', '-map', '1:a:0?',
'-c:v', 'libx264', '-pix_fmt', 'yuv420p', '-c:a', 'copy',
'-shortest', out]
r = subprocess.run(cmd, capture_output=True)
r = subprocess.run(cmd, capture_output=True, preexec_fn=_cpu_affinity_preexec())
if r.returncode != 0:
raise RuntimeError(
f"reassembly failed: {r.stderr.decode(errors='replace')}")
......@@ -2183,6 +2184,38 @@ def _count_video_frames(path: str) -> int:
return 0
def _cpu_thread_limit() -> int:
"""CPU thread budget for helper subprocesses (ffmpeg, rife I/O). Mirrors
coderai's global cap (half the cores, set as OMP_NUM_THREADS at import) so
frame extraction/encoding and interpolation don't grab every core."""
try:
v = int(os.environ.get("OMP_NUM_THREADS", "") or 0)
if v > 0:
return v
except ValueError:
pass
return max(1, (os.cpu_count() or 4) // 2)
def _cpu_affinity_preexec():
"""preexec_fn that pins a child (ffmpeg/rife) to the first N CPUs (N =
_cpu_thread_limit) so it physically cannot saturate all cores, regardless of
how many internal threads it spawns. No-op where sched_setaffinity is absent."""
n = _cpu_thread_limit()
def _fn():
try:
if hasattr(os, "sched_setaffinity"):
os.sched_setaffinity(0, set(range(min(n, os.cpu_count() or n))))
except Exception:
pass
return _fn
def _ffmpeg(args: list) -> list:
"""Prefix an ffmpeg arg list with a thread cap so it doesn't use all cores."""
return ['ffmpeg', '-y', '-threads', str(_cpu_thread_limit())] + list(args)
def _find_rife_binary():
"""Locate the rife-ncnn-vulkan executable: PATH first, then the common
user/system install dirs (the prebuilt release may not be on the server's
......@@ -2329,43 +2362,95 @@ def _rife_interpolate(path: str, multiplier: int, temps: list) -> str:
frames_dir = tempfile.mkdtemp()
out_dir = tempfile.mkdtemp()
temps += [frames_dir, out_dir]
_aff = _cpu_affinity_preexec()
# ffmpeg here only extracts/encodes frames (a muxer), never interpolates —
# the actual interpolation is done by the RIFE neural network.
r = subprocess.run(['ffmpeg', '-y', '-i', path, f'{frames_dir}/%08d.png'],
capture_output=True)
# the actual interpolation is done by the RIFE neural network. Thread-capped
# + CPU-pinned so it doesn't grab every core.
r = subprocess.run(_ffmpeg(['-i', path, f'{frames_dir}/%08d.png']),
capture_output=True, preexec_fn=_aff)
if r.returncode != 0:
raise RuntimeError(
f"frame extraction failed: {r.stderr.decode(errors='replace')}")
# Watch the output frame dir so the (opaque) rife subprocess still
# reports progress as it writes interpolated frames.
# -n sets the exact target frame count so any multiplier works (rife's
# default only doubles); -g pins the GPU to the configured NVIDIA device;
# -j caps the load/proc/save thread counts (CPU PNG I/O).
_jl = max(1, min(4, _cpu_thread_limit()))
_cmd = [rife_bin, '-i', frames_dir, '-o', out_dir, '-m', _model_arg,
'-g', str(_rife_gpu_id(rife_bin)), '-j', f'{_jl}:2:{_jl}']
if out_frames:
_cmd += ['-n', str(out_frames)]
# Thermal-managed run: rife is one opaque subprocess, so a watcher thread
# both reports per-frame progress AND enforces thermal limits by
# SIGSTOP-ing the process when the GPU/CPU gets too hot and SIGCONT-ing it
# once cooled — the subprocess equivalent of the upscaler's per-frame gate.
_vid_progress_reset(max(1, out_frames))
_vid_progress["phase"] = "interpolating"
import threading, glob as _glob
import threading, glob as _glob, time as _time
try:
from codai.models.thermal import (
_settings_from_global_args as _therm_settings,
read_gpu_temp as _rgt, read_cpu_temp as _rct)
_ts = _therm_settings()
except Exception:
_ts, _rgt, _rct = None, None, None
proc = subprocess.Popen(_cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, preexec_fn=_aff)
_stop = threading.Event()
_paused = {"v": False}
def _watch():
_last_t = 0.0
while not _stop.is_set():
n = len(_glob.glob(f'{out_dir}/*.png'))
_vid_progress_step(min(n, out_frames) if out_frames else n)
if out_frames:
try: task_registry.step(_tid, min(n, out_frames))
except Exception: pass
# Cancellation → kill the process.
try:
task_registry.raise_if_cancelled(_tid)
except TaskCancelled:
try: proc.send_signal(signal.SIGCONT); proc.terminate()
except Exception: pass
_stop.set(); break
except Exception:
pass
# Thermal gate (every poll_seconds): pause/resume the subprocess.
now = _time.monotonic()
if _ts is not None and (now - _last_t) >= float(getattr(_ts, "poll_seconds", 5.0)):
_last_t = now
gt = _rgt() if (_rgt and _ts.gpu_enabled) else None
ct = _rct() if (_rct and _ts.cpu_enabled) else None
too_hot = ((gt is not None and gt >= _ts.gpu_high) or
(ct is not None and ct >= _ts.cpu_high))
cool = ((gt is None or gt <= _ts.gpu_resume) and
(ct is None or ct <= _ts.cpu_resume))
if too_hot and not _paused["v"]:
try:
proc.send_signal(signal.SIGSTOP); _paused["v"] = True
_vid_progress["phase"] = "interpolating (thermal pause)"
import logging
logging.getLogger(__name__).warning(
"rife paused — too hot (GPU %s / CPU %s °C)", gt, ct)
except Exception: pass
elif _paused["v"] and cool:
try:
proc.send_signal(signal.SIGCONT); _paused["v"] = False
_vid_progress["phase"] = "interpolating"
except Exception: pass
_stop.wait(0.5)
_w = threading.Thread(target=_watch, daemon=True); _w.start()
# -n sets the exact target frame count so any multiplier works (rife's
# default only doubles); -g pins the GPU to the configured NVIDIA device.
_cmd = [rife_bin, '-i', frames_dir, '-o', out_dir, '-m', _model_arg,
'-g', str(_rife_gpu_id(rife_bin))]
if out_frames:
_cmd += ['-n', str(out_frames)]
r = subprocess.run(_cmd, capture_output=True)
_stop.set(); _w.join(timeout=1)
if r.returncode != 0:
_out, _err = proc.communicate()
_stop.set(); _w.join(timeout=2)
if proc.returncode != 0:
raise RuntimeError(
f"rife-ncnn-vulkan failed: {r.stderr.decode(errors='replace')}")
r = subprocess.run(['ffmpeg', '-y', '-r', str(multiplier * 8), '-i',
f'{out_dir}/%08d.png', '-c:v', 'libx264', out],
capture_output=True)
f"rife-ncnn-vulkan failed: {(_err or b'').decode(errors='replace')}")
r = subprocess.run(
_ffmpeg(['-r', str(multiplier * 8), '-i', f'{out_dir}/%08d.png',
'-c:v', 'libx264', out]),
capture_output=True, preexec_fn=_aff)
if r.returncode != 0 or not os.path.exists(out):
raise RuntimeError(
f"reassembly failed: {r.stderr.decode(errors='replace')}")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment