Make cluster master weight auto-adjustment conditional on explicit setting

- Added weight_explicit flag to track if --weight was specified on command line
- Automatic weight changes (100->0 on first client, 0->100 on last disconnect) only apply when weight is not explicitly set
- When --weight is specified, master maintains the explicit weight regardless of client connections
- Updated command line help and startup messages to clarify the behavior
- This allows administrators to override automatic weight management when needed
parent 711719c4
...@@ -29,6 +29,7 @@ import websockets ...@@ -29,6 +29,7 @@ import websockets
import ssl import ssl
import os import os
import ipaddress import ipaddress
import sys
from typing import Dict, Any, List, Optional from typing import Dict, Any, List, Optional
from collections import defaultdict from collections import defaultdict
...@@ -36,10 +37,11 @@ from collections import defaultdict ...@@ -36,10 +37,11 @@ from collections import defaultdict
class ClusterMaster: class ClusterMaster:
"""Master server for cluster coordination.""" """Master server for cluster coordination."""
def __init__(self, port: int = 5003, shared_dir: str = None, weight: int = 0): def __init__(self, port: int = 5003, shared_dir: str = None, weight: int = 100, weight_explicit: bool = False):
self.port = port self.port = port
self.shared_dir = shared_dir self.shared_dir = shared_dir
self.weight = weight # Master weight (default 0) self.weight = weight # Master weight (default 100, changes to 0 when clients connect if not explicit)
self.weight_explicit = weight_explicit # Whether weight was explicitly set via command line
self.server_sock: Optional[socket.socket] = None self.server_sock: Optional[socket.socket] = None
self.clients = {} # type: Dict[str, Dict[str, Any]] self.clients = {} # type: Dict[str, Dict[str, Any]]
self.client_websockets = {} # type: Dict[str, websockets.WebSocketServerProtocol] self.client_websockets = {} # type: Dict[str, websockets.WebSocketServerProtocol]
...@@ -207,6 +209,11 @@ class ClusterMaster: ...@@ -207,6 +209,11 @@ class ClusterMaster:
self.client_websockets[client_id] = websocket self.client_websockets[client_id] = websocket
self.tokens[token] = client_id self.tokens[token] = client_id
# If this is the first client and weight wasn't explicitly set, change master weight to 0
if len(self.clients) == 1 and self.weight == 100 and not self.weight_explicit:
self.weight = 0
print("First client connected - changing master weight to 0 (automatic)")
print(f"Client {client_id} authenticated") print(f"Client {client_id} authenticated")
return {'type': 'auth_success', 'client_id': client_id} return {'type': 'auth_success', 'client_id': client_id}
...@@ -266,6 +273,11 @@ class ClusterMaster: ...@@ -266,6 +273,11 @@ class ClusterMaster:
for proc_type, queue in self.process_queue.items(): for proc_type, queue in self.process_queue.items():
self.process_queue[proc_type] = [(k, w) for k, w in queue if k != proc_key] self.process_queue[proc_type] = [(k, w) for k, w in queue if k != proc_key]
# If no clients remain and weight wasn't explicitly set, change master weight back to 100
if len(self.clients) == 0 and self.weight == 0 and not self.weight_explicit:
self.weight = 100
print("All clients disconnected - changing master weight back to 100 (automatic)")
print(f"Client {client_id} disconnected") print(f"Client {client_id} disconnected")
def get_best_worker(self, process_type: str, prefer_local: bool = False) -> Optional[str]: def get_best_worker(self, process_type: str, prefer_local: bool = False) -> Optional[str]:
...@@ -609,11 +621,12 @@ class ClusterMaster: ...@@ -609,11 +621,12 @@ class ClusterMaster:
cluster_master = ClusterMaster() cluster_master = ClusterMaster()
def start_cluster_master(port: int = 5003, shared_dir: str = None, weight: int = 0) -> None: def start_cluster_master(port: int = 5003, shared_dir: str = None, weight: int = 100, weight_explicit: bool = False) -> None:
"""Start the cluster master server.""" """Start the cluster master server."""
cluster_master.port = port cluster_master.port = port
cluster_master.shared_dir = shared_dir cluster_master.shared_dir = shared_dir
cluster_master.weight = weight cluster_master.weight = weight
cluster_master.weight_explicit = weight_explicit
asyncio.run(cluster_master.start()) asyncio.run(cluster_master.start())
...@@ -623,11 +636,18 @@ if __name__ == "__main__": ...@@ -623,11 +636,18 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser(description='VidAI Cluster Master') parser = argparse.ArgumentParser(description='VidAI Cluster Master')
parser.add_argument('--port', type=int, default=5003, help='Port to listen on (default: 5003)') parser.add_argument('--port', type=int, default=5003, help='Port to listen on (default: 5003)')
parser.add_argument('--shared-dir', help='Shared directory for file transfers') parser.add_argument('--shared-dir', help='Shared directory for file transfers')
parser.add_argument('--weight', type=int, default=0, help='Master weight for load balancing (default: 0)') parser.add_argument('--weight', type=int, default=100, help='Master weight for load balancing (default: 100, auto-adjusts when clients connect unless explicitly set)')
args = parser.parse_args() args = parser.parse_args()
print(f"Starting VidAI Cluster Master on port {args.port} with weight {args.weight}") # Check if weight was explicitly provided
weight_explicit = '--weight' in sys.argv
if weight_explicit:
print(f"Starting VidAI Cluster Master on port {args.port} with explicit weight {args.weight}")
else:
print(f"Starting VidAI Cluster Master on port {args.port} with default weight {args.weight} (will auto-adjust when clients connect)")
if args.shared_dir: if args.shared_dir:
print(f"Using shared directory: {args.shared_dir}") print(f"Using shared directory: {args.shared_dir}")
# Validate shared directory exists # Validate shared directory exists
...@@ -635,4 +655,4 @@ if __name__ == "__main__": ...@@ -635,4 +655,4 @@ if __name__ == "__main__":
print(f"Warning: Shared directory {args.shared_dir} does not exist. Creating it.") print(f"Warning: Shared directory {args.shared_dir} does not exist. Creating it.")
os.makedirs(args.shared_dir, exist_ok=True) os.makedirs(args.shared_dir, exist_ok=True)
start_cluster_master(args.port, args.shared_dir, args.weight) start_cluster_master(args.port, args.shared_dir, args.weight, weight_explicit)
\ No newline at end of file \ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment