tasks: show model downloads on the Tasks page

Surface out-of-process download workers (tracked in _download_status) as
first-class tasks in /admin/api/tasks, alongside generations, training and
queued requests. They render with a percentage progress bar plus a
filename / rate / ETA readout, and can be cancelled from the Tasks page
(routed through a shared _cancel_download_session helper) or removed once
finished/failed.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
parent a2460385
...@@ -827,7 +827,7 @@ def _run_download_thread(session_id: str, model_id: str, file_pattern: str, pq): ...@@ -827,7 +827,7 @@ def _run_download_thread(session_id: str, model_id: str, file_pattern: str, pq):
import os import os
status = {"session_id": session_id, "model_id": model_id, "file_pattern": file_pattern, status = {"session_id": session_id, "model_id": model_id, "file_pattern": file_pattern,
"status": "starting", "status": "starting", "started_at": time.time(),
"percent": 0, "filename": "", "rate": 0, "eta": None} "percent": 0, "filename": "", "rate": 0, "eta": None}
_download_status[session_id] = status _download_status[session_id] = status
...@@ -1075,15 +1075,16 @@ async def api_list_downloads(username: str = Depends(require_admin)): ...@@ -1075,15 +1075,16 @@ async def api_list_downloads(username: str = Depends(require_admin)):
return list(_download_status.values()) return list(_download_status.values())
@router.post("/admin/api/download-cancel/{session_id}", summary="Cancel a download") def _cancel_download_session(session_id: str) -> bool:
async def api_cancel_download(session_id: str, username: str = Depends(require_admin)): """Cancel an active download by flagging the session and terminating its worker
"""Cancel an active download by terminating its worker process immediately. process. Returns False if there is no such download session.
Flagging the session (so the supervisor classifies it as cancelled, not Flagging the session (so the supervisor classifies it as cancelled, not
failed) and killing the child process tears down every HF chunk connection at failed) and killing the child process tears down every HF chunk connection at
once — the supervisor's relay loop then exits cleanly.""" once — the supervisor's relay loop then exits cleanly. Shared by the dedicated
download-cancel endpoint and the unified Tasks-page cancel path."""
if session_id not in _download_sessions and session_id not in _download_status: if session_id not in _download_sessions and session_id not in _download_status:
raise HTTPException(status_code=404, detail="Download session not found") return False
_download_cancelled.add(session_id) _download_cancelled.add(session_id)
proc = _download_procs.get(session_id) proc = _download_procs.get(session_id)
if proc is not None and proc.poll() is None: if proc is not None and proc.poll() is None:
...@@ -1091,6 +1092,14 @@ async def api_cancel_download(session_id: str, username: str = Depends(require_a ...@@ -1091,6 +1092,14 @@ async def api_cancel_download(session_id: str, username: str = Depends(require_a
proc.terminate() proc.terminate()
except Exception: except Exception:
pass pass
return True
@router.post("/admin/api/download-cancel/{session_id}", summary="Cancel a download")
async def api_cancel_download(session_id: str, username: str = Depends(require_admin)):
    """Cancel an active download by terminating its worker process immediately."""
    # The shared helper does the flag-and-terminate work and reports whether
    # the session id was known at all.
    session_found = _cancel_download_session(session_id)
    if session_found:
        return {"success": True}
    # Unknown session id: surface it as a 404 rather than a silent success.
    raise HTTPException(status_code=404, detail="Download session not found")
...@@ -2253,6 +2262,32 @@ async def api_turboquant_info(username: str = Depends(require_admin)): ...@@ -2253,6 +2262,32 @@ async def api_turboquant_info(username: str = Depends(require_admin)):
# --- Task / queue management --- # --- Task / queue management ---
def _human_bytes(n: float) -> str:
"""Compact human-readable byte size (e.g. 45.2 MB) for download readouts."""
try:
n = float(n)
except (TypeError, ValueError):
return "0 B"
for unit in ("B", "KB", "MB", "GB", "TB"):
if n < 1024 or unit == "TB":
return f"{n:.0f} {unit}" if unit == "B" else f"{n:.1f} {unit}"
n /= 1024
return f"{n:.1f} TB"
def _human_duration(seconds: float) -> str:
"""Compact h/m/s duration (e.g. 3m 12s) for download ETAs."""
try:
s = int(seconds)
except (TypeError, ValueError):
return ""
if s < 60:
return f"{s}s"
if s < 3600:
return f"{s // 60}m {s % 60}s"
return f"{s // 3600}h {(s % 3600) // 60}m"
@router.get("/admin/api/tasks", summary="List active and recent tasks") @router.get("/admin/api/tasks", summary="List active and recent tasks")
def api_tasks(username: str = Depends(require_admin)): def api_tasks(username: str = Depends(require_admin)):
"""Unified live view of long-running work: in-flight / recent generations """Unified live view of long-running work: in-flight / recent generations
...@@ -2327,6 +2362,46 @@ def api_tasks(username: str = Depends(require_admin)): ...@@ -2327,6 +2362,46 @@ def api_tasks(username: str = Depends(require_admin)):
"restartable": False, "restartable": False,
}) })
# Model downloads run in out-of-process workers tracked in `_download_status`,
# not the task registry. Surface them here so the Tasks page shows downloads
# in progress alongside generations, training and queued requests.
for sid, d in list(_download_status.items()):
if sid in seen:
continue
seen.add(sid)
dstatus = d.get("status") or "starting"
active = dstatus in ("starting", "downloading")
pct = int(round(d.get("percent") or 0))
bits = []
fn = d.get("filename") or ""
if fn:
bits.append(fn)
rate = d.get("rate") or 0
if active and rate:
bits.append(f"{_human_bytes(rate)}/s")
eta = d.get("eta")
if active and eta:
bits.append(f"ETA {_human_duration(eta)}")
if d.get("error"):
bits.append(str(d["error"]))
elif not fn and d.get("last_info"):
bits.append(str(d["last_info"]))
tasks.append({
"id": sid,
"kind": "download",
"title": d.get("model_id") or "",
"model": d.get("file_pattern") or "",
"status": "running" if active else dstatus,
"step": pct, "total": 100,
"percent": pct,
"message": " · ".join(bits),
"started_at": d.get("started_at"),
"active": active,
"cancellable": active,
"pausable": False,
"restartable": False,
})
# Successfully-finished work is dropped from the live list — a "done" job is # Successfully-finished work is dropped from the live list — a "done" job is
# no longer actionable, so it shouldn't clutter the view. Terminal-but-notable # no longer actionable, so it shouldn't clutter the view. Terminal-but-notable
# states (cancelled / error / interrupted) stay, so they can be inspected, # states (cancelled / error / interrupted) stay, so they can be inspected,
...@@ -2478,6 +2553,9 @@ def _do_task_cancel(task_id: str) -> bool: ...@@ -2478,6 +2553,9 @@ def _do_task_cancel(task_id: str) -> bool:
in-memory task registry.""" in-memory task registry."""
from codai.tasks import task_registry from codai.tasks import task_registry
from codai.api.loras import cancel_job from codai.api.loras import cancel_job
# Download workers live in their own session registry, not the task registry.
if _cancel_download_session(task_id):
return True
if cancel_job(task_id): if cancel_job(task_id):
return True return True
return task_registry.cancel(task_id) return task_registry.cancel(task_id)
...@@ -2506,6 +2584,14 @@ async def api_task_remove(task_id: str, username: str = Depends(require_admin)): ...@@ -2506,6 +2584,14 @@ async def api_task_remove(task_id: str, username: str = Depends(require_admin)):
remove a task that is still active — cancel it first.""" remove a task that is still active — cancel it first."""
from codai.tasks import task_registry from codai.tasks import task_registry
from codai.api.loras import remove_job from codai.api.loras import remove_job
# Finished/failed download session — drop it from the live status map.
d = _download_status.get(task_id)
if d is not None:
if d.get("status") in ("starting", "downloading"):
raise HTTPException(status_code=409, detail="Download is still active — cancel it first")
_download_status.pop(task_id, None)
_download_sessions.pop(task_id, None)
return {"ok": True, "task_id": task_id, "removed": True}
# Training job (durable record) first. # Training job (durable record) first.
if remove_job(task_id): if remove_job(task_id):
return {"ok": True, "task_id": task_id, "removed": True} return {"ok": True, "task_id": task_id, "removed": True}
......
...@@ -88,13 +88,20 @@ function fmtTime(s) { ...@@ -88,13 +88,20 @@ function fmtTime(s) {
} catch { return ''; } } catch { return ''; }
} }
const KIND_LABEL = {training:'Training', image:'Image', video:'Video', upscale:'Upscale', interpolate:'Interpolate', audio:'Audio', text:'Text', tts:'Speech (TTS)', transcription:'Transcription', embedding:'Embedding', spatial:'3D / Spatial', pipeline:'Pipeline', request:'Request', loading:'Loading'}; const KIND_LABEL = {training:'Training', image:'Image', video:'Video', upscale:'Upscale', interpolate:'Interpolate', audio:'Audio', text:'Text', tts:'Speech (TTS)', transcription:'Transcription', embedding:'Embedding', spatial:'3D / Spatial', pipeline:'Pipeline', request:'Request', loading:'Loading', download:'Download'};
const STATUS_BADGE = { const STATUS_BADGE = {
running:'badge-admin', queued:'badge-user', done:'badge-ok', error:'badge-err', running:'badge-admin', queued:'badge-user', done:'badge-ok', error:'badge-err',
cancelled:'badge-user', interrupted:'badge-warn' cancelled:'badge-user', interrupted:'badge-warn'
}; };
function progressBar(t) { function progressBar(t) {
// Downloads report a 0-100 percent; render a percentage bar (the filename /
// rate / ETA detail is shown in the status cell, so it isn't repeated here).
if (t.kind === 'download') {
const pct = Math.max(0, Math.min(100, Math.round(t.percent || t.step || 0)));
return `<div class="progress"><div class="progress-fill" style="width:${pct}%"></div></div>
<span class="dim small">${pct}%</span>`;
}
const total = t.total || 0, step = t.step || 0; const total = t.total || 0, step = t.step || 0;
// Live throughput for text generation (tokens/s), shown while running. // Live throughput for text generation (tokens/s), shown while running.
const rate = (t.rate && t.status === 'running') const rate = (t.rate && t.status === 'running')
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment