
Add real-time GPU usage monitoring to cluster nodes - clients send GPU stats every 5 seconds, the master stores them and serves them via the API, and the web UI shows per-GPU usage bars and utilization
parent 112f7140
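
For reference, the new client-to-master message introduced by this commit carries one entry per GPU with memory and utilization figures. A minimal sketch of the payload built by _collect_gpu_stats() and sent by monitor_gpu_usage(); the device name and numbers are illustrative, not real measurements:

# Shape of the 'gpu_stats_update' message sent every 5 seconds
# (illustrative values only)
gpu_stats_update_example = {
    'type': 'gpu_stats_update',
    'gpu_stats': {
        'backend': 'cuda',
        'gpus': [
            {
                'device_id': 0,
                'name': 'NVIDIA GeForce RTX 3090',  # illustrative device name
                'memory_used': 4.2,     # GB, from nvmlDeviceGetMemoryInfo
                'memory_total': 24.0,   # GB
                'utilization': 37,      # percent, from nvmlDeviceGetUtilizationRates
                'backend': 'cuda'
            }
        ]
    }
}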
@@ -186,7 +186,20 @@ function renderNodesTable() {
            <td>${node.weight || 100}</td>
            <td>${node.gpus}</td>
            <td>
                ${node.gpu_usage_info && node.gpu_usage_info.length > 0 ? node.gpu_usage_info.map(gpu => `
                    <div class="gpu-info" style="margin-bottom: 0.5rem;">
                        <strong>${gpu.name}</strong><br>
                        <div class="gpu-stats" style="margin-top: 0.25rem;">
                            <div class="memory-bar" style="width: 100%; height: 8px; background: #e5e7eb; border-radius: 4px; overflow: hidden;">
                                <div class="memory-used" style="height: 100%; background: linear-gradient(90deg, #10b981 0%, #f59e0b 70%, #ef4444 100%); width: ${(gpu.used_vram / gpu.total_vram * 100).toFixed(1)}%; transition: width 0.3s ease;"></div>
                            </div>
                            <span class="memory-text" style="font-size: 0.75rem; color: #6b7280; margin-top: 0.125rem; display: block;">
                                ${gpu.used_vram.toFixed(1)}GB / ${gpu.total_vram.toFixed(1)}GB
                                ${gpu.utilization > 0 ? ` • GPU: ${gpu.utilization}%` : ''}
                            </span>
                        </div>
                    </div>
                `).join('') : node.gpu_memory.length > 0 ? node.gpu_memory.join('<br>') : 'No GPU info'}
                ${node.total_memory > 0 ? `<br><strong>Total: ${node.total_memory}GB</strong>` : ''}
            </td>
            <td>
...
@@ -453,6 +453,95 @@ class ClusterClient:
            # Clean up
            delattr(self, '_current_model_transfer')

    async def monitor_gpu_usage(self) -> None:
        """Periodically monitor and send GPU usage stats to master."""
        while self.connected:
            try:
                # Collect current GPU stats
                gpu_stats = self._collect_gpu_stats()
                if gpu_stats:
                    # Send GPU stats to master
                    await self._send_message({
                        'type': 'gpu_stats_update',
                        'gpu_stats': gpu_stats
                    })
                # Send every 5 seconds
                await asyncio.sleep(5)
            except Exception as e:
                print(f"Error monitoring GPU usage: {e}")
                await asyncio.sleep(5)

    def _collect_gpu_stats(self) -> Dict[str, Any]:
        """Collect current GPU usage statistics."""
        try:
            # Try to get actual GPU stats using pynvml
            # (NVIDIA Management Library, provided by the nvidia-ml-py package)
            import pynvml
            pynvml.nvmlInit()
            device_count = pynvml.nvmlDeviceGetCount()
            gpu_stats = []
            for i in range(device_count):
                handle = pynvml.nvmlDeviceGetHandleByIndex(i)
                name = pynvml.nvmlDeviceGetName(handle)
                memory_info = pynvml.nvmlDeviceGetMemoryInfo(handle)
                utilization = pynvml.nvmlDeviceGetUtilizationRates(handle)
                gpu_stats.append({
                    'device_id': i,
                    'name': name.decode('utf-8') if isinstance(name, bytes) else str(name),
                    'memory_used': memory_info.used / 1024**3,  # Convert bytes to GB
                    'memory_total': memory_info.total / 1024**3,
                    'utilization': utilization.gpu,
                    'backend': 'cuda'
                })
            pynvml.nvmlShutdown()
            return {'gpus': gpu_stats, 'backend': 'cuda'}
        except ImportError:
            # Fallback to PyTorch-only stats if pynvml is not available
            try:
                import torch
                if torch.cuda.is_available():
                    gpu_stats = []
                    for i in range(torch.cuda.device_count()):
                        gpu_stats.append({
                            'device_id': i,
                            'name': torch.cuda.get_device_name(i),
                            'memory_used': torch.cuda.memory_allocated(i) / 1024**3,  # GB
                            'memory_total': torch.cuda.get_device_properties(i).total_memory / 1024**3,
                            'utilization': 0,  # pynvml required for actual utilization
                            'backend': 'cuda'
                        })
                    return {'gpus': gpu_stats, 'backend': 'cuda'}
            except Exception:
                pass
        except Exception as e:
            print(f"Error collecting GPU stats with pynvml: {e}")
            # Fallback to PyTorch if pynvml fails
            try:
                import torch
                if torch.cuda.is_available():
                    gpu_stats = []
                    for i in range(torch.cuda.device_count()):
                        gpu_stats.append({
                            'device_id': i,
                            'name': torch.cuda.get_device_name(i),
                            'memory_used': torch.cuda.memory_allocated(i) / 1024**3,  # GB
                            'memory_total': torch.cuda.get_device_properties(i).total_memory / 1024**3,
                            'utilization': 0,
                            'backend': 'cuda'
                        })
                    return {'gpus': gpu_stats, 'backend': 'cuda'}
            except Exception:
                pass
        return None

    async def _handle_restart_workers(self, message: Dict[str, Any]) -> None:
        """Handle restart workers command from master."""
        backend = message.get('backend', 'cuda')
@@ -671,6 +760,9 @@ class ClusterClient:
        # Start command handling task
        command_task = asyncio.create_task(self.handle_master_commands())

        # Start GPU monitoring task
        gpu_monitor_task = asyncio.create_task(self.monitor_gpu_usage())

        connection_lost = False
        try:
            while self.connected:
@@ -690,10 +782,15 @@ class ClusterClient:
        finally:
            # Cleanup for this connection
            command_task.cancel()
            gpu_monitor_task.cancel()
            try:
                await command_task
            except:
                pass
            try:
                await gpu_monitor_task
            except:
                pass
            # Don't terminate processes on reconnection, just close websocket
            if self.websocket:
...
@@ -248,6 +248,17 @@ class ClusterMaster:
            self.clients[client_id]['last_seen'] = time.time()
            return {'type': 'heartbeat_ack'}
        elif msg_type == 'gpu_stats_update':
            # Update GPU usage stats for a client
            client_id = self._get_client_by_websocket(websocket)
            if client_id:
                gpu_stats = message.get('gpu_stats', {})
                # Store GPU stats in client info for API access
                if client_id in self.clients:
                    self.clients[client_id]['gpu_stats'] = gpu_stats
                    self.clients[client_id]['gpu_stats_updated'] = time.time()
            return {'type': 'gpu_stats_ack'}
        elif msg_type == 'pong':
            return None  # No response needed
...
@@ -755,10 +755,41 @@ def api_cluster_nodes():
        gpu_info = client.get('gpu_info', {})
        available_backends = client.get('available_backends', [])

        # Get GPU memory info - prefer live stats from cluster_master if available
        gpu_memory = []
        total_memory = 0
        gpu_usage_info = []

        # Check for live GPU stats from cluster master
        from .cluster_master import cluster_master
        client_id = None
        for cid, client_info in cluster_master.clients.items():
            if client_info.get('token') == token:
                client_id = cid
                break

        if client_id and client_id in cluster_master.clients:
            live_gpu_stats = cluster_master.clients[client_id].get('gpu_stats')
            if live_gpu_stats and 'gpus' in live_gpu_stats:
                # Use live GPU stats
                for gpu in live_gpu_stats['gpus']:
                    device_id = gpu.get('device_id', 0)
                    name = gpu.get('name', f'GPU {device_id}')
                    total_vram = gpu.get('memory_total', 0)
                    used_vram = gpu.get('memory_used', 0)
                    utilization = gpu.get('utilization', 0)
                    gpu_memory.append(f"{name}: {total_vram:.1f}GB VRAM")
                    gpu_usage_info.append({
                        'name': name,
                        'total_vram': total_vram,
                        'used_vram': used_vram,
                        'utilization': utilization
                    })
                    total_memory += total_vram

        # Fallback to static device info if no live stats
        if not gpu_memory:
            # Use CUDA device info if available
            cuda_device_info = gpu_info.get('cuda_device_info', [])
            for device in cuda_device_info:
@@ -766,6 +797,12 @@ def api_cluster_nodes():
                vram_gb = device.get('vram_gb', 8)
                name = device.get('name', f'CUDA Device {device_id}')
                gpu_memory.append(f"{name}: {vram_gb}GB VRAM")
                gpu_usage_info.append({
                    'name': name,
                    'total_vram': vram_gb,
                    'used_vram': 0,  # No live data
                    'utilization': 0
                })
                total_memory += vram_gb

            # Use ROCm device info if available
@@ -775,6 +812,12 @@ def api_cluster_nodes():
                vram_gb = device.get('vram_gb', 16)
                name = device.get('name', f'ROCm Device {device_id}')
                gpu_memory.append(f"{name}: {vram_gb}GB VRAM")
                gpu_usage_info.append({
                    'name': name,
                    'total_vram': vram_gb,
                    'used_vram': 0,  # No live data
                    'utilization': 0
                })
                total_memory += vram_gb
@@ -782,10 +825,26 @@ def api_cluster_nodes():
            cuda_devices = gpu_info.get('cuda_devices', 0)
            rocm_devices = gpu_info.get('rocm_devices', 0)
            if cuda_devices > 0:
                for i in range(cuda_devices):
                    name = f"CUDA Device {i}"
                    gpu_memory.append(f"{name}: 8GB VRAM")
                    gpu_usage_info.append({
                        'name': name,
                        'total_vram': 8,
                        'used_vram': 0,
                        'utilization': 0
                    })
                total_memory += cuda_devices * 8
            if rocm_devices > 0:
                for i in range(rocm_devices):
                    name = f"ROCm Device {i}"
                    gpu_memory.append(f"{name}: 16GB VRAM")
                    gpu_usage_info.append({
                        'name': name,
                        'total_vram': 16,
                        'used_vram': 0,
                        'utilization': 0
                    })
                total_memory += rocm_devices * 16

        # Calculate uptime from connected_at (when client first connected)
...
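
On the API side, each node entry returned by api_cluster_nodes() gains a gpu_usage_info list alongside the existing gpu_memory strings; the renderNodesTable() change above reads exactly these fields. A minimal sketch of the relevant slice of one node object, with illustrative values and other per-node fields omitted:

# Per-node fields consumed by the updated renderNodesTable()
node_example = {
    'gpu_memory': ['NVIDIA GeForce RTX 3090: 24.0GB VRAM'],  # illustrative name
    'total_memory': 24.0,
    'gpu_usage_info': [
        {
            'name': 'NVIDIA GeForce RTX 3090',
            'total_vram': 24.0,   # GB
            'used_vram': 4.2,     # GB; 0 when only static device info is available
            'utilization': 37     # percent; 0 without live pynvml stats
        }
    ]
}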