Implement GPU prioritization and weight-based job distribution

- Added --weight parameter to client connections (default: 100)
- Modified cluster master to prioritize GPU-enabled clients for job distribution
- GPU clients always get precedence over CPU-only clients
- When no worker has the required model loaded, GPU clients are still preferred as targets for model distribution
- Client weights are combined with process weights for load balancing
- Higher weight = more jobs assigned to that client

Job distribution priority:
1. GPU clients with required model already loaded
2. CPU clients with required model already loaded
3. GPU clients (model will be sent)
4. CPU clients (model will be sent)

Within each category, workers are selected by combined weight (client weight * process weight).
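
As an illustration of the four categories and the weight tie-break, a minimal self-contained sketch with hypothetical workers (the real master reads these fields from its clients and processes tables, as shown in the diff below):

    # Hypothetical inventory: (worker_key, has_model, has_gpu, client_weight, process_weight)
    workers = [
        ('local_analysis_1', True,  False, 100, 2),   # CPU, model loaded
        ('node_a_analysis',  True,  True,  100, 1),   # GPU, model loaded
        ('node_b_analysis',  False, True,  300, 1),   # GPU, model must be sent
    ]

    def pick_worker(workers):
        # Category rank: GPU+model < CPU+model < GPU < CPU (lower is better)
        def category(w):
            _, has_model, has_gpu, _, _ = w
            return {(True, True): 0, (True, False): 1,
                    (False, True): 2, (False, False): 3}[(has_model, has_gpu)]
        best_rank = min(category(w) for w in workers)
        candidates = [w for w in workers if category(w) == best_rank]
        # Within the winning category, highest combined weight wins
        return max(candidates, key=lambda w: w[3] * w[4])[0]

    print(pick_worker(workers))  # -> 'node_a_analysis': GPU with model beats all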
@@ -238,6 +238,13 @@ Examples:
         help='Run as cluster client (connects to master instead of starting web interface)'
     )
     parser.add_argument(
+        '--weight',
+        type=int,
+        default=100,
+        help='Client weight for job distribution (default: 100, higher = more jobs)'
+    )
+    parser.add_argument(
         '--no-gpu',
         action='store_true',
@@ -281,7 +288,7 @@ Examples:
         # Start cluster client process
         from vidai.cluster_client import start_cluster_client
-        start_cluster_client(args.cluster_host, args.cluster_port, args.token, args.optimize, args.flash)
+        start_cluster_client(args.cluster_host, args.cluster_port, args.token, args.optimize, args.flash, args.weight)
     else:
         print("Starting Video AI Analysis Tool...")
         print(f"Server will be available at http://{args.host}:{args.port}")
@@ -35,12 +35,13 @@ from .config import get_analysis_backend, get_training_backend
 class ClusterClient:
     """Client that connects to cluster master."""
 
-    def __init__(self, host: str, port: int, token: str, optimize: bool = False, flash: bool = False):
+    def __init__(self, host: str, port: int, token: str, optimize: bool = False, flash: bool = False, weight: int = 100):
         self.host = host
         self.port = port
         self.token = token
         self.optimize = optimize
         self.flash = flash
+        self.client_weight = weight  # Overall client weight for job distribution
         self.websocket: Optional[websockets.WebSocketServerProtocol] = None
         self.connected = False
         self.local_processes = {}  # type: Dict[str, subprocess.Popen]
@@ -64,13 +65,14 @@ class ClusterClient:
         for backend in available_backends:
             capabilities.extend([f'analysis_{backend}', f'training_{backend}'])
 
-        # Send authentication with detected capabilities
+        # Send authentication with detected capabilities and weight
         auth_msg = {
             'type': 'auth',
             'token': self.token,
             'client_info': {
                 'type': 'worker_node',
                 'capabilities': capabilities,
+                'weight': self.client_weight,
                 'gpu_info': {
                     'cuda_available': gpu_info['cuda'],
                     'rocm_available': gpu_info['rocm'],
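Concretely, a CUDA-capable worker started with --weight 300 would send an auth message roughly like this (backend name and values are illustrative):

    # Illustrative auth message; 'weight' is the new field carried to the master
    auth_msg = {
        'type': 'auth',
        'token': 'SECRET',
        'client_info': {
            'type': 'worker_node',
            'capabilities': ['analysis_cuda', 'training_cuda'],
            'weight': 300,
            'gpu_info': {'cuda_available': True, 'rocm_available': False},
        },
    }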
@@ -370,7 +372,7 @@ class ClusterClient:
             await self.websocket.close()
 
 
-def start_cluster_client(host: str, port: int, token: str, optimize: bool = False, flash: bool = False) -> None:
+def start_cluster_client(host: str, port: int, token: str, optimize: bool = False, flash: bool = False, weight: int = 100) -> None:
     """Start the cluster client."""
-    client = ClusterClient(host, port, token, optimize, flash)
+    client = ClusterClient(host, port, token, optimize, flash, weight)
     asyncio.run(client.run())
\ No newline at end of file
@@ -118,10 +118,11 @@ class ClusterMaster:
         # Generate client ID from token
         client_id = hashlib.sha256(token.encode()).hexdigest()[:16]
 
-        # Store client info including GPU capabilities
+        # Store client info including GPU capabilities and weight
         self.clients[client_id] = {
             'token': token,
             'info': client_info,
+            'weight': client_info.get('weight', 100),
             'gpu_info': client_info.get('gpu_info', {}),
             'connected_at': time.time(),
             'last_seen': time.time()
@@ -190,7 +191,7 @@ class ClusterMaster:
         print(f"Client {client_id} disconnected")
 
-    def get_best_worker(self, process_type: str) -> Optional[str]:
+    def get_best_worker(self, process_type: str, prefer_local: bool = False) -> Optional[str]:
         """Get the best available worker for a process type."""
         if process_type not in self.process_queue:
             return None
@@ -199,8 +200,29 @@ class ClusterMaster:
         if not queue:
             return None
 
-        # Return the highest weighted available worker
-        return queue[0][0]
+        if prefer_local:
+            # Look for local workers first (those with "local_" prefix or similar)
+            local_workers = [(k, w) for k, w in queue if k.startswith('local_')]
+            if local_workers:
+                # Return the highest weighted local worker
+                local_workers.sort(key=lambda x: x[1], reverse=True)
+                return local_workers[0][0]
+
+        # Return the highest weighted available worker, ranking by the
+        # client weight combined with the per-process weight
+        weighted_workers = []
+        for proc_key, proc_weight in queue:
+            client_id = self.processes[proc_key]['client_id']
+            client_weight = self.clients[client_id]['weight']
+            # Combine client weight with process weight
+            combined_weight = client_weight * proc_weight
+            weighted_workers.append((proc_key, combined_weight))
+
+        if weighted_workers:
+            weighted_workers.sort(key=lambda x: x[1], reverse=True)
+            return weighted_workers[0][0]
+
+        return None
 
     def get_workers_with_model(self, process_type: str, model_path: str) -> List[str]:
         """Get workers that have a specific model available."""
@@ -292,28 +314,51 @@ class ClusterMaster:
         # First try to find workers that already have the model
         workers_with_model = self.get_workers_with_model(process_type, model_path)
         if workers_with_model:
-            # Use the best worker that has the model
-            best_worker = None
-            best_weight = -1
+            # Prioritize workers on clients with local GPUs
+            gpu_workers = []
+            cpu_workers = []
             for worker_key in workers_with_model:
-                for proc_key, weight in self.process_queue[process_type]:
-                    if proc_key == worker_key and weight > best_weight:
-                        best_worker = worker_key
-                        best_weight = weight
-                        break
+                client_id = self.processes[worker_key]['client_id']
+                client_gpu_info = self.clients[client_id]['gpu_info']
+                has_gpu = client_gpu_info.get('cuda_available', False) or client_gpu_info.get('rocm_available', False)
+                if has_gpu:
+                    gpu_workers.append(worker_key)
+                else:
+                    cpu_workers.append(worker_key)
+
+            # Prefer GPU workers first
+            candidate_workers = gpu_workers if gpu_workers else cpu_workers
+
+            if candidate_workers:
+                # Use the best worker from candidates (based on combined client + process weight)
+                best_worker = None
+                best_combined_weight = -1
+                for worker_key in candidate_workers:
+                    client_id = self.processes[worker_key]['client_id']
+                    client_weight = self.clients[client_id]['weight']
+                    process_weight = self.processes[worker_key]['weight']
+                    combined_weight = client_weight * process_weight
+                    if combined_weight > best_combined_weight:
+                        best_worker = worker_key
+                        best_combined_weight = combined_weight
 
-            if best_worker:
-                client_id = self.processes[best_worker]['client_id']
-                job_id = f"job_{int(time.time())}_{hash(str(job_data)) % 1000}"
-                await self.client_websockets[client_id].send(json.dumps({
-                    'type': 'job_assignment',
-                    'job_id': job_id,
-                    'job_data': job_data
-                }))
-                return job_id
+                if best_worker:
+                    client_id = self.processes[best_worker]['client_id']
+                    job_id = f"job_{int(time.time())}_{hash(str(job_data)) % 1000}"
+                    await self.client_websockets[client_id].send(json.dumps({
+                        'type': 'job_assignment',
+                        'job_id': job_id,
+                        'job_data': job_data
+                    }))
+                    return job_id
 
         # If no worker has the model, find the best available worker and send the model
-        best_worker = self.get_best_worker(process_type)
+        # Prioritize GPU-enabled clients
+        best_worker = self.get_best_worker_for_model(process_type, model_path)
         if best_worker:
             client_id = self.processes[best_worker]['client_id']
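For reference, the assignment message sent over the websocket looks roughly like this (job_data fields are hypothetical; note the job_id format above combines a timestamp with hash(str(job_data)) % 1000, so it is unique only per second and varies across interpreter runs under hash randomization):

    # Illustrative job_assignment payload; field names match the diff above,
    # values are hypothetical
    {
        'type': 'job_assignment',
        'job_id': 'job_1718000000_421',
        'job_data': {'process_type': 'analysis', 'model_path': '/models/example'},
    }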
@@ -340,6 +385,42 @@ class ClusterMaster:
         return None
 
+    def get_best_worker_for_model(self, process_type: str, model_path: str) -> Optional[str]:
+        """Get the best worker for a process type, considering model distribution."""
+        if process_type not in self.process_queue:
+            return None
+
+        queue = self.process_queue[process_type]
+        if not queue:
+            return None
+
+        # Separate GPU and CPU clients
+        gpu_workers = []
+        cpu_workers = []
+
+        for proc_key, proc_weight in queue:
+            client_id = self.processes[proc_key]['client_id']
+            client_gpu_info = self.clients[client_id]['gpu_info']
+            has_gpu = client_gpu_info.get('cuda_available', False) or client_gpu_info.get('rocm_available', False)
+            client_weight = self.clients[client_id]['weight']
+            combined_weight = client_weight * proc_weight
+
+            if has_gpu:
+                gpu_workers.append((proc_key, combined_weight))
+            else:
+                cpu_workers.append((proc_key, combined_weight))
+
+        # Prefer GPU workers first
+        candidates = gpu_workers if gpu_workers else cpu_workers
+
+        if candidates:
+            # Return the highest weighted worker
+            candidates.sort(key=lambda x: x[1], reverse=True)
+            return candidates[0][0]
+
+        return None
+
     def disable_process(self, process_key: str) -> bool:
         """Disable a specific process."""
         if process_key in self.processes:
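A standalone sanity check of the GPU-first rule: the GPU/CPU split happens before the weight comparison, so even a heavily weighted CPU-only client never outranks a GPU client (toy data mirroring get_best_worker_for_model):

    # Toy queue: (proc_key, process_weight); clients map to (client_weight, has_gpu)
    queue = [('cpu_proc', 5), ('gpu_proc', 1)]
    clients = {'cpu_proc': (500, False), 'gpu_proc': (100, True)}

    gpu_workers, cpu_workers = [], []
    for key, proc_weight in queue:
        client_weight, has_gpu = clients[key]
        target = gpu_workers if has_gpu else cpu_workers
        target.append((key, client_weight * proc_weight))

    candidates = gpu_workers or cpu_workers
    print(max(candidates, key=lambda x: x[1])[0])  # -> 'gpu_proc' (CPU's 2500 is never considered)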