Refactor cluster nodes display to show nodes instead of individual workers

- Modified API to aggregate workers per node instead of showing each worker separately
- Each cluster node now appears as a single row with summarized worker information
- Workers column shows count and types: '2 workers - Analysis (CUDA), Training (ROCm)'
- Local workers are grouped into a single 'Local Master Node' entry
- Updated frontend to display worker summaries with detailed breakdown
- Updated API documentation to reflect new response format with workers_summary field
parent 27e73381
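As a quick illustration of the grouping described in the commit message (not part of the commit itself): the old API emitted one row per worker, while the new API keys rows by hostname:token and folds each node's workers into a single entry. The sketch below is hypothetical; the sample hostnames, token, and helper variables are made up, but the field names match the diff that follows.

# Hypothetical sketch of the per-node grouping and summary string.
workers = [
    {"hostname": "gpu-box", "token": "abc123", "type": "analysis", "backend": "CUDA"},
    {"hostname": "gpu-box", "token": "abc123", "type": "training", "backend": "ROCm"},
]

node_map = {}
for w in workers:
    key = f"{w['hostname']}:{w['token']}"
    node_map.setdefault(key, {"hostname": w["hostname"], "workers": []})["workers"].append(w)

for node in node_map.values():
    types = [f"{w['type'].title()} ({w['backend']})" for w in node["workers"]]
    print(f"{node['hostname']}: {len(node['workers'])} workers - {', '.join(types)}")
# -> gpu-box: 2 workers - Analysis (CUDA), Training (ROCm)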
@@ -94,7 +94,7 @@
     <thead>
         <tr>
             <th>Status</th>
-            <th>Token Name</th>
+            <th>Node Name</th>
             <th>Hostname</th>
             <th>Weight</th>
             <th>GPUs</th>
@@ -198,7 +198,10 @@ function renderNodesTable() {
                 ${node.gpu_memory.length > 0 ? node.gpu_memory.join('<br>') : 'No GPU info'}
                 ${node.total_memory > 0 ? `<br><strong>Total: ${node.total_memory}GB</strong>` : ''}
             </td>
-            <td>${node.workers_available}</td>
+            <td>
+                <strong>${node.workers_available}</strong> workers<br>
+                <small>${node.workers_summary || 'No workers'}</small>
+            </td>
             <td>${node.ip_address}</td>
             <td>${formatUptime(node.uptime_seconds || 0)}</td>
             <td>${node.active_jobs || 0}</td>
...
@@ -450,6 +450,7 @@
     "gpu_memory": ["CUDA Device 0: 8GB VRAM"],
     "total_memory": 8,
     "workers_available": 2,
+    "workers_summary": "Analysis (CUDA), Training (ROCm)",
     "ip_address": "192.168.1.100",
     "connected": true,
     "last_seen": 1640995300.0,
...
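For context, a client consuming the documented response might read the new field as below. The route URL and the top-level "nodes" list are assumptions, since only the node object itself appears in the API docs hunk above.

# Sketch of a consumer of the new workers_summary field; URL and the
# top-level "nodes" key are assumptions not shown in this diff.
import requests

resp = requests.get("http://localhost:5000/api/cluster/nodes", timeout=5)
for node in resp.json().get("nodes", []):
    print(f"{node['hostname']}: {node['workers_available']} workers - "
          f"{node.get('workers_summary', 'No workers')}")
# Example output line: gpu-box: 2 workers - Analysis (CUDA), Training (ROCm)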
@@ -392,87 +392,134 @@ def api_cluster_nodes():
     total_active_jobs = 0
     total_completed_jobs = 0

-    # Get active clients
+    # Get active clients - group by hostname/token (one row per node)
+    node_map = {}
     for client_id, client_info in cluster_master.clients.items():
         hostname = client_info['info'].get('hostname', 'unknown')
         token = client_info['token']
         token_name = worker_tokens.get(token, 'Unknown Token')

-        gpu_info = client_info.get('gpu_info', {})
-        cuda_devices = gpu_info.get('cuda_devices', 0)
-        rocm_devices = gpu_info.get('rocm_devices', 0)
-
-        # Get GPU memory info (placeholder - would need actual GPU detection)
-        gpu_memory = []
-        if cuda_devices > 0:
-            # Placeholder values - in real implementation, get from GPU detection
-            gpu_memory.extend([f"CUDA Device {i}: 8GB VRAM" for i in range(cuda_devices)])
-        if rocm_devices > 0:
-            gpu_memory.extend([f"ROCm Device {i}: 16GB VRAM" for i in range(rocm_devices)])
-        total_memory = sum([8 if 'CUDA' in mem else 16 if 'ROCm' in mem else 0 for mem in gpu_memory])
-
-        # Get IP address (placeholder - would need to get from websocket)
-        ip_address = '127.0.0.1'  # Placeholder
-
-        # Get workers available (processes)
-        workers_available = len([p for p in cluster_master.processes.values() if p['client_id'] == client_id])
-
-        # Calculate uptime
-        connected_at = client_info.get('connected_at', current_time)
-        uptime_seconds = current_time - connected_at
-
-        # Job statistics (placeholder - would need integration with job queue)
-        # In a real implementation, track jobs per client
-        active_jobs = 0  # Placeholder
-        completed_jobs = 0  # Placeholder
-
-        total_active_jobs += active_jobs
-        total_completed_jobs += completed_jobs
-
-        nodes.append({
-            'token': token,
-            'token_name': token_name,
-            'hostname': hostname,
-            'gpus': len(gpu_memory),
-            'gpu_memory': gpu_memory,
-            'total_memory': total_memory,
-            'workers_available': workers_available,
-            'ip_address': ip_address,
-            'connected': True,
-            'last_seen': client_info.get('last_seen', 0),
-            'uptime_seconds': uptime_seconds,
-            'active_jobs': active_jobs,
-            'completed_jobs': completed_jobs,
-            'weight': client_info.get('weight', 100),
-            'is_local': False
-        })
-
-    # Detect local worker processes on master
+        node_key = f"{hostname}:{token}"
+
+        if node_key not in node_map:
+            gpu_info = client_info.get('gpu_info', {})
+            cuda_devices = gpu_info.get('cuda_devices', 0)
+            rocm_devices = gpu_info.get('rocm_devices', 0)
+
+            # Get GPU memory info
+            gpu_memory = []
+            if cuda_devices > 0:
+                gpu_memory.extend([f"CUDA Device {i}: 8GB VRAM" for i in range(cuda_devices)])
+            if rocm_devices > 0:
+                gpu_memory.extend([f"ROCm Device {i}: 16GB VRAM" for i in range(rocm_devices)])
+            total_memory = sum([8 if 'CUDA' in mem else 16 if 'ROCm' in mem else 0 for mem in gpu_memory])
+
+            # Calculate uptime
+            connected_at = client_info.get('connected_at', current_time)
+            uptime_seconds = current_time - connected_at
+
+            node_map[node_key] = {
+                'token': token,
+                'token_name': token_name,
+                'hostname': hostname,
+                'gpus': len(gpu_memory),
+                'gpu_memory': gpu_memory,
+                'total_memory': total_memory,
+                'ip_address': '127.0.0.1',  # Placeholder
+                'connected': True,
+                'last_seen': client_info.get('last_seen', 0),
+                'uptime_seconds': uptime_seconds,
+                'active_jobs': 0,  # Placeholder
+                'completed_jobs': 0,  # Placeholder
+                'weight': client_info.get('weight', 100),
+                'is_local': False,
+                'workers': []  # Will collect worker details
+            }
+
+        # Add worker processes for this node
+        node_workers = [p for p in cluster_master.processes.values() if p['client_id'] == client_id]
+        for proc in node_workers:
+            worker_info = {
+                'type': proc['name'].split('_')[0],  # analysis or training
+                'backend': proc['name'].split('_')[1] if '_' in proc['name'] else 'unknown',
+                'weight': proc.get('weight', 10),
+                'model': proc.get('model', 'default'),
+                'status': proc.get('status', 'active')
+            }
+            node_map[node_key]['workers'].append(worker_info)
+
+    # Convert node_map to nodes list
+    for node_key, node_data in node_map.items():
+        # Summarize workers
+        workers_available = len(node_data['workers'])
+        worker_types = []
+        if any(w['type'] == 'analysis' for w in node_data['workers']):
+            analysis_backends = set(w['backend'] for w in node_data['workers'] if w['type'] == 'analysis')
+            worker_types.append(f"Analysis ({', '.join(analysis_backends)})")
+        if any(w['type'] == 'training' for w in node_data['workers']):
+            training_backends = set(w['backend'] for w in node_data['workers'] if w['type'] == 'training')
+            worker_types.append(f"Training ({', '.join(training_backends)})")
+
+        node_data['workers_available'] = workers_available
+        node_data['worker_types'] = worker_types
+        node_data['workers_summary'] = ', '.join(worker_types) if worker_types else 'No workers'
+
+        total_active_jobs += node_data['active_jobs']
+        total_completed_jobs += node_data['completed_jobs']
+
+        nodes.append(node_data)
+
+    # Detect and aggregate local worker processes on master
     local_workers = detect_local_workers()
-    for worker in local_workers:
-        nodes.append({
+    if local_workers:
+        # Group local workers by type
+        local_analysis = [w for w in local_workers if w['type'] == 'analysis']
+        local_training = [w for w in local_workers if w['type'] == 'training']
+
+        # Calculate combined GPU info for local node
+        total_gpus = sum(w.get('gpus', 0) for w in local_workers)
+        all_gpu_memory = []
+        seen_memory = set()
+        for w in local_workers:
+            for mem in w.get('gpu_memory', []):
+                if mem not in seen_memory:
+                    all_gpu_memory.append(mem)
+                    seen_memory.add(mem)
+        total_memory = sum(w.get('total_memory', 0) for w in local_workers)
+
+        # Worker summary for local node
+        worker_types = []
+        if local_analysis:
+            backends = set(w['backend'] for w in local_analysis)
+            worker_types.append(f"Analysis ({', '.join(backends)})")
+        if local_training:
+            backends = set(w['backend'] for w in local_training)
+            worker_types.append(f"Training ({', '.join(backends)})")
+
+        # Use the longest uptime as representative
+        max_uptime = max((w.get('uptime_seconds', 0) for w in local_workers), default=0)
+
+        local_node = {
             'token': 'local',
-            'token_name': f'Local {worker["type"].title()} Worker',
+            'token_name': 'Local Master Node',
             'hostname': 'localhost',
-            'gpus': worker.get('gpus', 0),
-            'gpu_memory': worker.get('gpu_memory', []),
-            'total_memory': worker.get('total_memory', 0),
-            'workers_available': 1,
+            'gpus': total_gpus,
+            'gpu_memory': all_gpu_memory,
+            'total_memory': total_memory,
+            'workers_available': len(local_workers),
+            'worker_types': worker_types,
+            'workers_summary': ', '.join(worker_types) if worker_types else 'No workers',
             'ip_address': '127.0.0.1',
             'connected': True,
             'last_seen': current_time,
-            'uptime_seconds': worker.get('uptime_seconds', 0),
+            'uptime_seconds': max_uptime,
             'active_jobs': 0,  # Placeholder
             'completed_jobs': 0,  # Placeholder
             'weight': 0,  # Local workers don't participate in cluster load balancing
-            'is_local': True,
-            'backend': worker.get('backend', 'unknown')
-        })
-
-    # Get recently disconnected clients (last 10 that were connected in last 10 minutes)
-    # This is a simplified version - in real implementation, you'd need persistent storage
-    # For now, just return active ones
+            'is_local': True
+        }
+        nodes.append(local_node)

     # Sort: active first, then by last_seen desc
     nodes.sort(key=lambda x: (not x['connected'], -x['last_seen']))
...
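The aggregation logic above derives each worker's type and backend from its process name via split('_'). A standalone sketch of that step is shown below; the "type_backend[_index]" naming convention is an assumption implied by the split calls but not spelled out in the diff, and the backend set is sorted here only for deterministic output.

# Mirror of the summary logic above, runnable without the cluster objects.
def build_summary(process_names):
    workers = [
        {
            "type": name.split("_")[0],  # analysis or training
            "backend": name.split("_")[1] if "_" in name else "unknown",
        }
        for name in process_names
    ]
    worker_types = []
    for wtype, label in (("analysis", "Analysis"), ("training", "Training")):
        backends = sorted({w["backend"] for w in workers if w["type"] == wtype})
        if backends:
            worker_types.append(f"{label} ({', '.join(backends)})")
    return ", ".join(worker_types) if worker_types else "No workers"

assert build_summary(["analysis_CUDA_0", "training_ROCm_0"]) == "Analysis (CUDA), Training (ROCm)"
assert build_summary([]) == "No workers"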