Enforce GPU-only cluster participation

- Cluster clients now refuse to connect without GPU capabilities (CUDA/ROCm)
- Cluster master rejects authentication from clients without GPU backends
- Local master node only appears in cluster nodes list if GPU backends are available
- Master already prevented launching local worker processes without GPUs
- Systems without GPUs cannot participate in distributed processing
- Clear error messages when GPU requirements are not met
- Maintains cluster integrity by ensuring every node contributes GPU compute (the shared backend filter is sketched below)
parent abec9e31
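
The same CUDA/ROCm filter now appears in three places: the client's `connect()`, the master's `_handle_auth()`, and `api_cluster_nodes()`. As a sketch only (no such helper is introduced by this change; the names below are illustrative), the shared rule could be factored out like this:

```python
# Hedged sketch: a single helper capturing the GPU-only rule repeated in this change.
# The constant and function names are illustrative, not part of the diff.
GPU_BACKENDS = ('cuda', 'rocm')

def gpu_backends_of(available_backends):
    """Return only the backends that count as GPU-capable (CUDA/ROCm)."""
    return [b for b in available_backends if b in GPU_BACKENDS]

# A CPU-only host yields an empty list and is therefore refused everywhere.
print(gpu_backends_of(['cpu']))          # []
print(gpu_backends_of(['cuda', 'cpu']))  # ['cuda']
```
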
@@ -54,6 +54,17 @@ class ClusterClient:
     async def connect(self) -> bool:
         """Connect to cluster master via secure websocket."""
         try:
+            # Detect available backends first
+            from .compat import detect_gpu_backends, get_available_backends
+            gpu_info = detect_gpu_backends()
+            available_backends = get_available_backends()
+
+            # Check if we have any GPU backends available
+            gpu_backends = [b for b in available_backends if b in ['cuda', 'rocm']]
+            if not gpu_backends:
+                print("No GPU backends detected (CUDA/ROCm). Cluster client requires GPU capabilities to connect.")
+                return False
+
             # Create SSL context that accepts self-signed certificates
             ssl_context = ssl.create_default_context()
             ssl_context.check_hostname = False
@@ -62,11 +73,6 @@ class ClusterClient:
             uri = f"wss://{self.host}:{self.port}/cluster"
             self.websocket = await websockets.connect(uri, ssl=ssl_context)
 
-            # Detect available backends
-            from .compat import detect_gpu_backends, get_available_backends
-            gpu_info = detect_gpu_backends()
-            available_backends = get_available_backends()
-
             # Get hostname
             import socket
             hostname = socket.gethostname()
......
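
After connecting, the client gathers its detected backends and hostname and authenticates against the master, whose `_handle_auth()` (below) reads `gpu_info.available_backends` out of `client_info`. The payload below is a sketch inferred from the fields the master reads; the `type` value and the exact field layout are assumptions, not confirmed by this diff.

```python
# Assumed shape of the client's authentication message (illustrative only).
import json

auth_message = {
    'type': 'auth',                            # message type: assumed name
    'token': 'example-cluster-token',          # shared token issued by the master
    'client_info': {
        'weight': 100,                         # load-balancing weight (master defaults to 100)
        'gpu_info': {
            'available_backends': ['cuda'],    # must include 'cuda' or 'rocm' to pass auth
        },
    },
}
payload = json.dumps(auth_message)  # sent over the wss:// cluster connection
```
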
@@ -186,7 +186,7 @@ class ClusterMaster:
             return {'type': 'error', 'message': 'Unknown message type'}
 
-    def _handle_auth(self, message: Dict[str, Any], client_sock: socket.socket) -> Dict[str, Any]:
+    def _handle_auth(self, message: Dict[str, Any], websocket: websockets.WebSocketServerProtocol) -> Dict[str, Any]:
         """Handle client authentication."""
         token = message.get('token')
         client_info = message.get('client_info', {})
@@ -194,6 +194,14 @@
         if not token:
             return {'type': 'auth_failed', 'message': 'No token provided'}
 
+        # Check if client has GPU capabilities
+        gpu_info = client_info.get('gpu_info', {})
+        available_backends = gpu_info.get('available_backends', [])
+        gpu_backends = [b for b in available_backends if b in ['cuda', 'rocm']]
+
+        if not gpu_backends:
+            return {'type': 'auth_failed', 'message': 'Client must have GPU capabilities (CUDA or ROCm) to join cluster'}
+
         # Generate client ID from token
         client_id = hashlib.sha256(token.encode()).hexdigest()[:16]
@@ -202,7 +210,7 @@
             'token': token,
             'info': client_info,
             'weight': client_info.get('weight', 100),
-            'gpu_info': client_info.get('gpu_info', {}),
+            'gpu_info': gpu_info,
             'connected_at': time.time(),
             'last_seen': time.time()
         }
@@ -214,7 +222,7 @@
            self.weight = 0
            print("First client connected - changing master weight to 0 (automatic)")
 
-        print(f"Client {client_id} authenticated")
+        print(f"Client {client_id} authenticated with GPU backends: {gpu_backends}")
         return {'type': 'auth_success', 'client_id': client_id}
 
     def _handle_register_processes(self, message: Dict[str, Any], websocket: websockets.WebSocketServerProtocol) -> Dict[str, Any]:
......
@@ -486,61 +486,61 @@ def api_cluster_nodes():
         nodes.append(node_data)
 
-    # Detect and aggregate local worker processes on master
-    local_workers = detect_local_workers()
-    if local_workers:
-        # Group local workers by type
-        local_analysis = [w for w in local_workers if w['type'] == 'analysis']
-        local_training = [w for w in local_workers if w['type'] == 'training']
-
-        # Calculate combined GPU info for local node
-        total_gpus = sum(w.get('gpus', 0) for w in local_workers)
-        all_gpu_memory = []
-        seen_memory = set()
-        for w in local_workers:
-            for mem in w.get('gpu_memory', []):
-                if mem not in seen_memory:
-                    all_gpu_memory.append(mem)
-                    seen_memory.add(mem)
-        total_memory = sum(w.get('total_memory', 0) for w in local_workers)
-
-        # Worker summary for local node
-        worker_types = []
-        if local_analysis:
-            backends = set(w['backend'] for w in local_analysis)
-            worker_types.append(f"Analysis ({', '.join(backends)})")
-        if local_training:
-            backends = set(w['backend'] for w in local_training)
-            worker_types.append(f"Training ({', '.join(backends)})")
-
-        # Use the longest uptime as representative
-        max_uptime = max((w.get('uptime_seconds', 0) for w in local_workers), default=0)
-
-        # Get available GPU backends for local system
-        from vidai.compat import get_available_backends
-        local_available_gpu_backends = [b for b in get_available_backends() if b in ['cuda', 'rocm']]
-
-        local_node = {
-            'token': 'local',
-            'token_name': 'Local Master Node',
-            'hostname': 'localhost',
-            'gpus': total_gpus,
-            'gpu_memory': all_gpu_memory,
-            'total_memory': total_memory,
-            'workers_available': len(local_workers),
-            'worker_types': worker_types,
-            'workers_summary': ', '.join(worker_types) if worker_types else 'No workers',
-            'ip_address': '127.0.0.1',
-            'connected': True,
-            'last_seen': current_time,
-            'uptime_seconds': max_uptime,
-            'active_jobs': 0,  # Placeholder
-            'completed_jobs': 0,  # Placeholder
-            'weight': 0,  # Local workers don't participate in cluster load balancing
-            'is_local': True,
-            'mixed_gpu': len(local_available_gpu_backends) > 1,
-            'available_gpu_backends': local_available_gpu_backends
-        }
-        nodes.append(local_node)
+    # Only show local node if GPU backends are available
+    from vidai.compat import get_available_backends
+    local_available_gpu_backends = [b for b in get_available_backends() if b in ['cuda', 'rocm']]
+
+    if local_available_gpu_backends:
+        local_workers = detect_local_workers()
+        if local_workers:
+            # Group local workers by type
+            local_analysis = [w for w in local_workers if w['type'] == 'analysis']
+            local_training = [w for w in local_workers if w['type'] == 'training']
+
+            # Calculate combined GPU info for local node
+            total_gpus = sum(w.get('gpus', 0) for w in local_workers)
+            all_gpu_memory = []
+            seen_memory = set()
+            for w in local_workers:
+                for mem in w.get('gpu_memory', []):
+                    if mem not in seen_memory:
+                        all_gpu_memory.append(mem)
+                        seen_memory.add(mem)
+            total_memory = sum(w.get('total_memory', 0) for w in local_workers)
+
+            # Worker summary for local node
+            worker_types = []
+            if local_analysis:
+                backends = set(w['backend'] for w in local_analysis)
+                worker_types.append(f"Analysis ({', '.join(backends)})")
+            if local_training:
+                backends = set(w['backend'] for w in local_training)
+                worker_types.append(f"Training ({', '.join(backends)})")
+
+            # Use the longest uptime as representative
+            max_uptime = max((w.get('uptime_seconds', 0) for w in local_workers), default=0)
+
+            local_node = {
+                'token': 'local',
+                'token_name': 'Local Master Node',
+                'hostname': 'localhost',
+                'gpus': total_gpus,
+                'gpu_memory': all_gpu_memory,
+                'total_memory': total_memory,
+                'workers_available': len(local_workers),
+                'worker_types': worker_types,
+                'workers_summary': ', '.join(worker_types) if worker_types else 'No workers',
+                'ip_address': '127.0.0.1',
+                'connected': True,
+                'last_seen': current_time,
+                'uptime_seconds': max_uptime,
+                'active_jobs': 0,  # Placeholder
+                'completed_jobs': 0,  # Placeholder
+                'weight': 0,  # Local workers don't participate in cluster load balancing
+                'is_local': True,
+                'mixed_gpu': len(local_available_gpu_backends) > 1,
+                'available_gpu_backends': local_available_gpu_backends
+            }
+            nodes.append(local_node)
 
     # Sort: active first, then by last_seen desc
     nodes.sort(key=lambda x: (not x['connected'], -x['last_seen']))
......
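
A minimal sketch (separate from the change itself) of how the closing sort key orders the node list: connected nodes come first, and within each group the most recently seen node leads.

```python
# Demonstration of the sort key used at the end of api_cluster_nodes().
nodes = [
    {'token': 'a', 'connected': False, 'last_seen': 300},
    {'token': 'b', 'connected': True,  'last_seen': 100},
    {'token': 'c', 'connected': True,  'last_seen': 200},
]
nodes.sort(key=lambda x: (not x['connected'], -x['last_seen']))
print([n['token'] for n in nodes])  # ['c', 'b', 'a'] -- active first, newest first
```
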