Add --cluster-shared-dir option for optimized file transfers

- Add --shared-dir argument to cluster_master.py and cluster_client.py
- Implement shared directory file transfer for model files
- Falls back to websocket transfer if shared directory unavailable
- Update cluster client to handle model_shared_file messages
- Add documentation for shared directory feature in architecture.md
- Maintain backward compatibility with existing websocket transfers
parent 3c309139
......@@ -290,6 +290,21 @@ Master Node (Port 5003) ◄── Cluster Control ──► Worker Node
(Port 5002)
```
### Shared Directory File Transfer
For improved performance in cluster environments, both master and client support a `--shared-dir` option (exposed as `--cluster-shared-dir` on the combined main entry point):
```
Master: python -m vidai.cluster_master --shared-dir /mnt/cluster_shared
Client: python -m vidai.cluster_client master.host token --shared-dir /mnt/cluster_shared
```
When configured:
- Model files are placed in the shared directory instead of being transferred via websocket
- Reduces network overhead for large model files
- Requires the shared directory to be accessible by both master and all clients
- Falls back to websocket transfer if shared directory is unavailable
## Deployment
### Development
......
......@@ -297,6 +297,12 @@ Examples:
help='Client weight for job distribution (default: 100, higher = more jobs)'
)
parser.add_argument(
'--cluster-shared-dir',
default=None,
help='Shared directory for cluster file transfers (must be accessible by both master and clients)'
)
parser.add_argument(
'--no-gpu',
action='store_true',
......@@ -350,7 +356,7 @@ Examples:
# Start cluster client process
from vidai.cluster_client import start_cluster_client
start_cluster_client(args.cluster_host, args.cluster_port, args.token, args.optimize, args.flash, args.weight)
start_cluster_client(args.cluster_host, args.cluster_port, args.token, args.optimize, args.flash, args.weight, args.cluster_shared_dir)
else:
print("Starting Video AI Analysis Tool...")
print(f"Server will be available at http://{args.host}:{args.port}")
......
......@@ -36,13 +36,14 @@ from .config import get_analysis_backend, get_training_backend
class ClusterClient:
"""Client that connects to cluster master."""
def __init__(self, host: str, port: int, token: str, optimize: bool = False, flash: bool = False, weight: int = 100):
def __init__(self, host: str, port: int, token: str, optimize: bool = False, flash: bool = False, weight: int = 100, shared_dir: str = None):
self.host = host
self.port = port
self.token = token
self.optimize = optimize
self.flash = flash
self.client_weight = weight # Overall client weight for job distribution
self.shared_dir = shared_dir # Shared directory for file transfers
self.websocket: Optional[websockets.WebSocketServerProtocol] = None
self.connected = False
self.local_processes = {} # type: Dict[str, subprocess.Popen]
......@@ -84,6 +85,7 @@ class ClusterClient:
'hostname': hostname,
'capabilities': capabilities,
'weight': self.client_weight,
'shared_dir': self.shared_dir,
'gpu_info': {
'cuda_available': gpu_info['cuda'],
'rocm_available': gpu_info['rocm'],
......@@ -237,6 +239,9 @@ class ClusterClient:
elif msg_type == 'model_transfer_complete':
await self._handle_model_transfer_complete(message)
elif msg_type == 'model_shared_file':
await self._handle_model_shared_file(message)
except Exception as e:
print(f"Error handling master command: {e}")
break
......@@ -351,6 +356,43 @@ class ClusterClient:
# Clean up
delattr(self, '_current_model_transfer')
async def _handle_model_shared_file(self, message: Dict[str, Any]) -> None:
"""Handle model file available in shared directory."""
model_path = message.get('model_path')
shared_file_path = message.get('shared_file_path')
total_size = message.get('total_size', 0)
if not self.shared_dir:
print(f"Received shared file message but no shared directory configured: {shared_file_path}")
return
try:
# Verify the file exists and has the correct size
if os.path.exists(shared_file_path):
actual_size = os.path.getsize(shared_file_path)
if actual_size == total_size:
# Copy file to local models directory
import shutil
os.makedirs('models', exist_ok=True)
local_model_file = f"models/{model_path.replace('/', '_')}.bin"
shutil.copy2(shared_file_path, local_model_file)
print(f"Model {model_path} copied from shared directory: {shared_file_path} -> {local_model_file}")
# Update local model availability
for proc_name in self.process_models:
if proc_name.startswith('analysis_') or proc_name.startswith('training_'):
self.process_models[proc_name] = model_path
# Clean up shared file (optional - master might want to keep it for other clients)
# os.remove(shared_file_path)
else:
print(f"Shared file size mismatch: expected {total_size}, got {actual_size}")
else:
print(f"Shared file not found: {shared_file_path}")
except Exception as e:
print(f"Error handling shared model file {shared_file_path}: {e}")
async def run(self) -> None:
"""Main client loop."""
if not await self.connect():
......@@ -383,7 +425,29 @@ class ClusterClient:
await self.websocket.close()
def start_cluster_client(host: str, port: int, token: str, optimize: bool = False, flash: bool = False, weight: int = 100, shared_dir: str = None) -> None:
    """Start the cluster client and block until it exits.

    Args:
        host: Cluster master hostname or address.
        port: Cluster master port.
        token: Authentication token presented to the master.
        optimize: Enable optimized frame extraction.
        flash: Enable Flash Attention.
        weight: Client weight for job distribution (higher = more jobs).
        shared_dir: Optional shared directory used for model file
            transfers instead of chunked websocket transfers.
    """
    client = ClusterClient(host, port, token, optimize, flash, weight, shared_dir)
    asyncio.run(client.run())
if __name__ == "__main__":
    # Standalone entry point: python -m vidai.cluster_client HOST TOKEN ...
    import argparse

    parser = argparse.ArgumentParser(description='VidAI Cluster Client')
    parser.add_argument('host', help='Cluster master host')
    parser.add_argument('token', help='Authentication token')
    parser.add_argument('--port', type=int, default=5003, help='Cluster master port (default: 5003)')
    parser.add_argument('--optimize', action='store_true', help='Optimize frame extraction')
    parser.add_argument('--flash', action='store_true', help='Enable Flash Attention')
    parser.add_argument('--weight', type=int, default=100, help='Client weight for job distribution (default: 100)')
    parser.add_argument('--shared-dir', help='Shared directory for file transfers')

    args = parser.parse_args()

    print("Starting VidAI Cluster Client")
    print(f"Connecting to {args.host}:{args.port}")
    if args.shared_dir:
        print(f"Using shared directory: {args.shared_dir}")

    start_cluster_client(args.host, args.port, args.token, args.optimize, args.flash, args.weight, args.shared_dir)
......@@ -36,8 +36,9 @@ from collections import defaultdict
class ClusterMaster:
"""Master server for cluster coordination."""
def __init__(self, port: int = 5003):
def __init__(self, port: int = 5003, shared_dir: str = None):
self.port = port
self.shared_dir = shared_dir
self.server_sock: Optional[socket.socket] = None
self.clients = {} # type: Dict[str, Dict[str, Any]]
self.client_websockets = {} # type: Dict[str, websockets.WebSocketServerProtocol]
......@@ -309,41 +310,76 @@ class ClusterMaster:
return available_workers
async def send_model_to_client(self, client_id: str, model_path: str, model_data: bytes) -> bool:
"""Send model file to a client."""
"""Send model file to a client via websocket or shared directory."""
if client_id not in self.client_websockets:
return False
try:
# Send model data in chunks if needed
chunk_size = 1024 * 1024 # 1MB chunks
total_size = len(model_data)
# First send model info
await self.client_websockets[client_id].send(json.dumps({
'type': 'model_transfer_start',
'model_path': model_path,
'total_size': total_size
}))
# Send data in chunks
for i in range(0, total_size, chunk_size):
chunk = model_data[i:i + chunk_size]
# Check if shared directory is available and client supports it
client_info = self.clients.get(client_id, {})
client_shared_dir = client_info.get('info', {}).get('shared_dir')
if self.shared_dir and client_shared_dir:
# Use shared directory for file transfer
try:
# Ensure shared directory exists
os.makedirs(self.shared_dir, exist_ok=True)
# Create a unique filename for this transfer
import hashlib
transfer_id = hashlib.md5(f"{client_id}:{model_path}:{time.time()}".encode()).hexdigest()[:16]
shared_file_path = os.path.join(self.shared_dir, f"model_{transfer_id}.bin")
# Write model data to shared file
with open(shared_file_path, 'wb') as f:
f.write(model_data)
# Send file path to client
await self.client_websockets[client_id].send(json.dumps({
'type': 'model_chunk',
'offset': i,
'data': chunk.hex() # Send as hex string
'type': 'model_shared_file',
'model_path': model_path,
'shared_file_path': shared_file_path,
'total_size': len(model_data)
}))
# Send completion
await self.client_websockets[client_id].send(json.dumps({
'type': 'model_transfer_complete',
'model_path': model_path
}))
print(f"Model {model_path} placed in shared directory for client {client_id}: {shared_file_path}")
return True
return True
except Exception as e:
print(f"Failed to send model to client {client_id}: {e}")
return False
except Exception as e:
print(f"Failed to use shared directory for model transfer to client {client_id}: {e}")
# Fall back to websocket transfer
else:
# Use websocket transfer (original method)
try:
# Send model data in chunks if needed
chunk_size = 1024 * 1024 # 1MB chunks
total_size = len(model_data)
# First send model info
await self.client_websockets[client_id].send(json.dumps({
'type': 'model_transfer_start',
'model_path': model_path,
'total_size': total_size
}))
# Send data in chunks
for i in range(0, total_size, chunk_size):
chunk = model_data[i:i + chunk_size]
await self.client_websockets[client_id].send(json.dumps({
'type': 'model_chunk',
'offset': i,
'data': chunk.hex() # Send as hex string
}))
# Send completion
await self.client_websockets[client_id].send(json.dumps({
'type': 'model_transfer_complete',
'model_path': model_path
}))
return True
except Exception as e:
print(f"Failed to send model to client {client_id}: {e}")
return False
def load_model_file(self, model_path: str) -> Optional[bytes]:
"""Load model file from local storage or download if needed."""
......@@ -572,7 +608,28 @@ class ClusterMaster:
cluster_master = ClusterMaster()
def start_cluster_master(port: int = 5003, shared_dir: str = None) -> None:
    """Configure the module-level ClusterMaster and run it (blocking).

    Args:
        port: TCP port the master listens on.
        shared_dir: Optional shared directory used for model file
            transfers instead of chunked websocket transfers.
    """
    # Configure the shared singleton rather than constructing a new
    # instance so other modules referencing ``cluster_master`` see the
    # same object.
    cluster_master.port = port
    cluster_master.shared_dir = shared_dir
    asyncio.run(cluster_master.start())
if __name__ == "__main__":
    # Standalone entry point: python -m vidai.cluster_master ...
    import argparse

    parser = argparse.ArgumentParser(description='VidAI Cluster Master')
    parser.add_argument('--port', type=int, default=5003, help='Port to listen on (default: 5003)')
    parser.add_argument('--shared-dir', help='Shared directory for file transfers')

    args = parser.parse_args()

    print(f"Starting VidAI Cluster Master on port {args.port}")
    if args.shared_dir:
        print(f"Using shared directory: {args.shared_dir}")
        # Validate shared directory exists; create it if missing.
        if not os.path.exists(args.shared_dir):
            print(f"Warning: Shared directory {args.shared_dir} does not exist. Creating it.")
            os.makedirs(args.shared_dir, exist_ok=True)

    start_cluster_master(args.port, args.shared_dir)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment