Implement --client mode for cluster functionality

- Add --client mode with required --cluster-host, --cluster-port, --token options
- Optional --optimize and --flash for client mode
- Convert cluster communication from TCP to bidirectional websockets
- Implement file transfer placeholders for jobs and results
- Update cluster master to authenticate clients via cluster_token
- Add configuration page for managing cluster clients (placeholder)
- Update build scripts to include cluster components
- Add websockets to requirements
- Disable web/API interfaces in client mode
parent ea048886
......@@ -88,6 +88,20 @@ else
vidai/web.py
fi
# Build cluster master
pyinstaller $PYINSTALLER_ARGS \
--name vidai-cluster-master \
--hidden-import websockets \
--hidden-import asyncio \
vidai/cluster_master.py
# Build cluster client
pyinstaller $PYINSTALLER_ARGS \
--name vidai-cluster-client \
--hidden-import websockets \
--hidden-import asyncio \
vidai/cluster_client.py
# Build workers for each target
for TARGET in "${BUILD_TARGETS[@]}"; do
echo ""
......@@ -116,6 +130,8 @@ echo "=== Build Summary ==="
echo "Common executables:"
echo " - vidai-backend"
echo " - vidai-web"
echo " - vidai-cluster-master"
echo " - vidai-cluster-client"
echo ""
echo "Worker executables:"
for TARGET in "${BUILD_TARGETS[@]}"; do
......
......@@ -8,3 +8,4 @@ flash-attn>=2.0.0
pyinstaller>=5.0.0
PyMySQL>=1.0.0
redis>=4.0.0
websockets>=12.0.0
\ No newline at end of file
{% extends "base.html" %}
{% block title %}Cluster Clients - VidAI{% endblock %}
{% block head %}
<style>
.container { max-width: 1400px; margin: 2rem auto; padding: 0 2rem; }
.admin-card { background: white; padding: 2rem; border-radius: 12px; box-shadow: 0 2px 10px rgba(0,0,0,0.05); margin-bottom: 2rem; }
.card-header { margin-bottom: 1.5rem; }
.card-header h3 { margin: 0; color: #1e293b; }
.btn { padding: 0.75rem 2rem; background: #667eea; color: white; border: none; border-radius: 8px; font-size: 1rem; font-weight: 600; cursor: pointer; text-decoration: none; display: inline-block; }
.btn:hover { background: #5a67d8; }
.btn-danger { background: #dc2626; }
.btn-danger:hover { background: #b91c1c; }
.btn-success { background: #059669; }
.btn-success:hover { background: #047857; }
.table { width: 100%; border-collapse: collapse; margin-top: 1rem; }
.table th, .table td { padding: 1rem; text-align: left; border-bottom: 1px solid #e5e7eb; }
.table th { background: #f8fafc; font-weight: 600; color: #374151; }
.status-active { color: #065f46; font-weight: 500; }
.status-inactive { color: #dc2626; font-weight: 500; }
.alert { padding: 0.75rem; border-radius: 8px; margin-bottom: 1rem; }
.alert-error { background: #fee2e2; color: #dc2626; border: 1px solid #fecaca; }
.alert-success { background: #d1fae5; color: #065f46; border: 1px solid #a7f3d0; }
</style>
{% endblock %}
{% block content %}
<div class="container">
<div class="admin-card">
<div class="card-header">
<h3><i class="fas fa-server"></i> Cluster Clients</h3>
<p>Manage connected cluster clients and their worker configurations.</p>
</div>
{% with messages = get_flashed_messages(with_categories=true) %}
{% if messages %}
{% for category, message in messages %}
<div class="alert alert-{{ 'error' if category == 'error' else 'success' }}">{{ message }}</div>
{% endfor %}
{% endif %}
{% endwith %}
<table class="table">
<thead>
<tr>
<th>Client ID</th>
<th>Connected At</th>
<th>Last Seen</th>
<th>Processes</th>
<th>Actions</th>
</tr>
</thead>
<tbody>
{% for client in clients %}
<tr>
<td>{{ client.get('id') }}</td>
<td>{{ client.get('connected_at', 'N/A') }}</td>
<td>{{ client.get('last_seen', 'N/A') }}</td>
<td>{{ client.get('processes', [])|length }} processes</td>
<td>
<button class="btn configure-btn" data-client-id="{{ client.get('id') }}">Configure</button>
</td>
</tr>
{% endfor %}
{% if not clients %}
<tr>
<td colspan="5" style="text-align: center; color: #6b7280;">No cluster clients connected.</td>
</tr>
{% endif %}
</tbody>
</table>
</div>
</div>
<script>
document.addEventListener('DOMContentLoaded', function() {
document.querySelectorAll('.configure-btn').forEach(btn => {
btn.addEventListener('click', function() {
const clientId = this.getAttribute('data-client-id');
// Placeholder for client configuration
alert('Configuration for client ' + clientId + ' - Feature not implemented yet');
});
});
});
</script>
{% endblock %}
\ No newline at end of file
......@@ -203,15 +203,14 @@ Examples:
)
parser.add_argument(
'--cluster-token',
'--token',
default=get_config('cluster_token', ''),
help='Cluster authentication token (when running as client)'
)
parser.add_argument(
'--cluster-client',
'--client',
action='store_true',
default=get_config('cluster_client', 'false').lower() == 'true',
help='Run as cluster client (connects to master instead of starting web interface)'
)
......@@ -237,22 +236,22 @@ Examples:
set_debug(args.debug)
set_cluster_host(args.cluster_host)
set_cluster_port(args.cluster_port)
set_cluster_token(args.cluster_token)
set_cluster_client(args.cluster_client)
set_cluster_token(args.token)
set_cluster_client(args.client)
# Check if running as cluster client
if args.cluster_client:
if not args.cluster_host or not args.cluster_token:
print("Error: cluster-host and cluster-token are required when running as cluster client")
# Check if running in client mode
if args.client:
if not args.cluster_host or not args.cluster_port or not args.token:
print("Error: --cluster-host, --cluster-port, and --token are required when running in --client mode")
sys.exit(1)
print(f"Starting Video AI Analysis Tool as cluster client...")
print(f"Starting Video AI Analysis Tool in --client mode...")
print(f"Connecting to cluster master at {args.cluster_host}:{args.cluster_port}")
print("Press Ctrl+C to stop")
# Start cluster client process
from vidai.cluster_client import start_cluster_client
start_cluster_client(args.cluster_host, args.cluster_port, args.cluster_token)
start_cluster_client(args.cluster_host, args.cluster_port, args.token, args.optimize, args.flash)
else:
print("Starting Video AI Analysis Tool...")
print(f"Server will be available at http://{args.host}:{args.port}")
......
......@@ -25,6 +25,8 @@ import threading
import time
import sys
import subprocess
import asyncio
import websockets
from typing import Dict, Any, Optional
from .comm import Message
from .config import get_analysis_backend, get_training_backend
......@@ -33,21 +35,24 @@ from .config import get_analysis_backend, get_training_backend
class ClusterClient:
"""Client that connects to cluster master."""
def __init__(self, host: str, port: int, token: str):
def __init__(self, host: str, port: int, token: str, optimize: bool = False, flash: bool = False):
self.host = host
self.port = port
self.token = token
self.sock: Optional[socket.socket] = None
self.optimize = optimize
self.flash = flash
self.websocket: Optional[websockets.WebSocketServerProtocol] = None
self.connected = False
self.local_processes = {} # type: Dict[str, subprocess.Popen]
self.process_weights = {} # type: Dict[str, int]
self.process_models = {} # type: Dict[str, str]
self.loop = None
def connect(self) -> bool:
"""Connect to cluster master."""
async def connect(self) -> bool:
"""Connect to cluster master via websocket."""
try:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((self.host, self.port))
uri = f"ws://{self.host}:{self.port}/cluster"
self.websocket = await websockets.connect(uri)
# Send authentication
auth_msg = {
......@@ -58,11 +63,12 @@ class ClusterClient:
'capabilities': ['analysis_cuda', 'analysis_rocm', 'training_cuda', 'training_rocm']
}
}
self.sock.sendall(json.dumps(auth_msg).encode('utf-8') + b'\n')
await self.websocket.send(json.dumps(auth_msg))
# Wait for auth response
response = self._receive_message()
if response and response.get('type') == 'auth_success':
response_str = await self.websocket.recv()
response = json.loads(response_str)
if response.get('type') == 'auth_success':
self.connected = True
print("Successfully connected to cluster master")
return True
......@@ -74,34 +80,37 @@ class ClusterClient:
print(f"Failed to connect to cluster master: {e}")
return False
def _receive_message(self) -> Optional[Dict[str, Any]]:
async def _receive_message(self) -> Optional[Dict[str, Any]]:
"""Receive a message from master."""
if not self.sock:
if not self.websocket or not self.connected:
return None
try:
data = self.sock.recv(4096)
if data:
return json.loads(data.decode('utf-8').strip())
data = await self.websocket.recv()
return json.loads(data)
except:
pass
self.connected = False
return None
def _send_message(self, message: Dict[str, Any]) -> None:
async def _send_message(self, message: Dict[str, Any]) -> None:
"""Send message to master."""
if self.sock and self.connected:
if self.websocket and self.connected:
try:
self.sock.sendall(json.dumps(message).encode('utf-8') + b'\n')
await self.websocket.send(json.dumps(message))
except:
self.connected = False
def start_local_processes(self) -> None:
async def start_local_processes(self) -> None:
"""Start local worker processes."""
# Start analysis workers
analysis_backend = get_analysis_backend()
if analysis_backend in ['cuda', 'rocm']:
proc_name = f'analysis_{analysis_backend}'
cmd = [sys.executable, '-m', 'vidai.worker_analysis', analysis_backend]
if self.optimize:
cmd.append('--optimize')
if self.flash:
cmd.append('--flash')
self.local_processes[proc_name] = subprocess.Popen(cmd)
self.process_weights[proc_name] = 10 # Default weight
self.process_models[proc_name] = 'Qwen/Qwen2.5-VL-7B-Instruct'
......@@ -111,12 +120,16 @@ class ClusterClient:
if training_backend in ['cuda', 'rocm']:
proc_name = f'training_{training_backend}'
cmd = [sys.executable, '-m', 'vidai.worker_training', training_backend]
if self.optimize:
cmd.append('--optimize')
if self.flash:
cmd.append('--flash')
self.local_processes[proc_name] = subprocess.Popen(cmd)
self.process_weights[proc_name] = 5 # Training typically lower weight
self.process_models[proc_name] = 'Qwen/Qwen2.5-VL-7B-Instruct'
# Register processes with master
self._send_message({
await self._send_message({
'type': 'register_processes',
'processes': {
name: {
......@@ -130,11 +143,11 @@ class ClusterClient:
}
})
def handle_master_commands(self) -> None:
async def handle_master_commands(self) -> None:
"""Handle commands from cluster master."""
while self.connected:
try:
message = self._receive_message()
message = await self._receive_message()
if not message:
break
......@@ -166,7 +179,16 @@ class ClusterClient:
self.process_models[process_name] = model
elif msg_type == 'ping':
self._send_message({'type': 'pong'})
await self._send_message({'type': 'pong'})
elif msg_type == 'job_assignment':
await self._handle_job_assignment(message)
elif msg_type == 'receive_file':
await self._handle_receive_file(message)
elif msg_type == 'send_file_request':
await self._handle_send_file_request(message)
except Exception as e:
print(f"Error handling master command: {e}")
......@@ -177,49 +199,84 @@ class ClusterClient:
if process_name.startswith('analysis_'):
backend = process_name.split('_')[1]
cmd = [sys.executable, '-m', 'vidai.worker_analysis', backend]
if self.optimize:
cmd.append('--optimize')
if self.flash:
cmd.append('--flash')
self.local_processes[process_name] = subprocess.Popen(cmd)
self.process_weights[process_name] = 10
self.process_models[process_name] = 'Qwen/Qwen2.5-VL-7B-Instruct'
elif process_name.startswith('training_'):
backend = process_name.split('_')[1]
cmd = [sys.executable, '-m', 'vidai.worker_training', backend]
if self.optimize:
cmd.append('--optimize')
if self.flash:
cmd.append('--flash')
self.local_processes[process_name] = subprocess.Popen(cmd)
self.process_weights[process_name] = 5
self.process_models[process_name] = 'Qwen/Qwen2.5-VL-7B-Instruct'
def run(self) -> None:
async def _handle_job_assignment(self, message: Dict[str, Any]) -> None:
"""Handle job assignment from master."""
job_id = message.get('job_id')
job_data = message.get('job_data', {})
# Process job locally and send result back
# This is a placeholder - actual implementation would depend on job type
await self._send_message({
'type': 'job_result',
'job_id': job_id,
'result': {'status': 'completed', 'data': 'placeholder result'}
})
async def _handle_receive_file(self, message: Dict[str, Any]) -> None:
"""Handle receiving a file from master."""
filename = message.get('filename')
file_size = message.get('file_size', 0)
# Receive binary data
# Placeholder
pass
async def _handle_send_file_request(self, message: Dict[str, Any]) -> None:
"""Handle request to send a file to master."""
filename = message.get('filename')
# Send file data
# Placeholder
pass
async def run(self) -> None:
"""Main client loop."""
if not self.connect():
if not await self.connect():
return
self.start_local_processes()
await self.start_local_processes()
# Start command handling thread
command_thread = threading.Thread(target=self.handle_master_commands, daemon=True)
command_thread.start()
# Start command handling task
command_task = asyncio.create_task(self.handle_master_commands())
try:
while self.connected:
time.sleep(1)
await asyncio.sleep(1)
# Send heartbeat
self._send_message({'type': 'heartbeat'})
await self._send_message({'type': 'heartbeat'})
except KeyboardInterrupt:
print("Shutting down cluster client...")
finally:
# Cleanup
command_task.cancel()
for proc in self.local_processes.values():
proc.terminate()
for proc in self.local_processes.values():
proc.wait()
if self.sock:
self.sock.close()
if self.websocket:
await self.websocket.close()
def start_cluster_client(host: str, port: int, token: str) -> None:
def start_cluster_client(host: str, port: int, token: str, optimize: bool = False, flash: bool = False) -> None:
"""Start the cluster client."""
client = ClusterClient(host, port, token)
client.run()
\ No newline at end of file
client = ClusterClient(host, port, token, optimize, flash)
asyncio.run(client.run())
\ No newline at end of file
......@@ -24,6 +24,8 @@ import json
import threading
import time
import hashlib
import asyncio
import websockets
from typing import Dict, Any, List, Optional
from collections import defaultdict
......@@ -35,7 +37,7 @@ class ClusterMaster:
self.port = port
self.server_sock: Optional[socket.socket] = None
self.clients = {} # type: Dict[str, Dict[str, Any]]
self.client_sockets = {} # type: Dict[str, socket.socket]
self.client_websockets = {} # type: Dict[str, websockets.WebSocketServerProtocol]
self.processes = {} # type: Dict[str, Dict[str, Any]]
self.tokens = {} # type: Dict[str, str] # token -> client_id
self.running = False
......@@ -43,59 +45,47 @@ class ClusterMaster:
# Load balancing
self.process_queue = defaultdict(list) # process_type -> [(client_id, weight), ...]
def start(self) -> None:
async def start(self) -> None:
"""Start the cluster master server."""
self.server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server_sock.bind(('0.0.0.0', self.port))
self.server_sock.listen(10)
self.running = True
print(f"Cluster master started on port {self.port}")
# Accept client connections
threading.Thread(target=self._accept_clients, daemon=True).start()
# Start websocket server
start_server = websockets.serve(self._handle_client, '0.0.0.0', self.port)
await start_server
# Start management loop
self._management_loop()
# Start management loop in background
asyncio.create_task(self._management_loop())
def _accept_clients(self) -> None:
"""Accept incoming client connections."""
# Keep running
while self.running:
try:
client_sock, addr = self.server_sock.accept()
threading.Thread(target=self._handle_client, args=(client_sock, addr), daemon=True).start()
except:
break
await asyncio.sleep(1)
def _handle_client(self, client_sock: socket.socket, addr: tuple) -> None:
"""Handle a client connection."""
async def _handle_client(self, websocket: websockets.WebSocketServerProtocol, path: str) -> None:
"""Handle a client websocket connection."""
client_id = None
try:
while self.running:
data = client_sock.recv(4096)
if not data:
break
messages = data.decode('utf-8').split('\n')
for msg_str in messages:
if msg_str.strip():
try:
message = json.loads(msg_str)
response = self._process_message(message, client_sock)
message_str = await asyncio.wait_for(websocket.recv(), timeout=1.0)
message = json.loads(message_str)
response = self._process_message(message, websocket)
if response:
client_sock.sendall(json.dumps(response).encode('utf-8') + b'\n')
await websocket.send(json.dumps(response))
except asyncio.TimeoutError:
continue
except websockets.exceptions.ConnectionClosed:
break
except json.JSONDecodeError:
pass
continue
except Exception as e:
print(f"Client connection error: {e}")
finally:
if client_id:
self._remove_client(client_id)
client_sock.close()
def _process_message(self, message: Dict[str, Any], client_sock: socket.socket) -> Optional[Dict[str, Any]]:
def _process_message(self, message: Dict[str, Any], websocket: websockets.WebSocketServerProtocol) -> Optional[Dict[str, Any]]:
"""Process a message from a client."""
msg_type = message.get('type')
......@@ -103,11 +93,11 @@ class ClusterMaster:
return self._handle_auth(message, client_sock)
elif msg_type == 'register_processes':
return self._handle_register_processes(message, client_sock)
return self._handle_register_processes(message, websocket)
elif msg_type == 'heartbeat':
# Update client last seen
client_id = self._get_client_by_socket(client_sock)
client_id = self._get_client_by_websocket(websocket)
if client_id:
self.clients[client_id]['last_seen'] = time.time()
return {'type': 'heartbeat_ack'}
......@@ -135,13 +125,13 @@ class ClusterMaster:
'connected_at': time.time(),
'last_seen': time.time()
}
self.client_sockets[client_id] = client_sock
self.client_websockets[client_id] = websocket
self.tokens[token] = client_id
print(f"Client {client_id} authenticated")
return {'type': 'auth_success', 'client_id': client_id}
def _handle_register_processes(self, message: Dict[str, Any], client_sock: socket.socket) -> Dict[str, Any]:
def _handle_register_processes(self, message: Dict[str, Any], websocket: websockets.WebSocketServerProtocol) -> Dict[str, Any]:
"""Handle process registration from client."""
client_id = self._get_client_by_socket(client_sock)
if not client_id:
......@@ -169,17 +159,17 @@ class ClusterMaster:
print(f"Client {client_id} registered {len(processes)} processes")
return {'type': 'registration_success'}
def _get_client_by_socket(self, client_sock: socket.socket) -> Optional[str]:
"""Get client ID by socket."""
for client_id, sock in self.client_sockets.items():
if sock == client_sock:
def _get_client_by_websocket(self, websocket: websockets.WebSocketServerProtocol) -> Optional[str]:
"""Get client ID by websocket."""
for client_id, ws in self.client_websockets.items():
if ws == websocket:
return client_id
return None
def _remove_client(self, client_id: str) -> None:
"""Remove a client and its processes."""
if client_id in self.client_sockets:
del self.client_sockets[client_id]
if client_id in self.client_websockets:
del self.client_websockets[client_id]
if client_id in self.clients:
del self.clients[client_id]
......@@ -217,13 +207,13 @@ class ClusterMaster:
self.processes[process_key]['status'] = 'active'
# Send command to client
client_id = self.processes[process_key]['client_id']
if client_id in self.client_sockets:
self.client_sockets[client_id].sendall(
if client_id in self.client_websockets:
asyncio.create_task(self.client_websockets[client_id].send(
json.dumps({
'type': 'enable_process',
'process_name': self.processes[process_key]['name']
}).encode('utf-8') + b'\n'
)
})
))
return True
return False
......@@ -233,13 +223,13 @@ class ClusterMaster:
self.processes[process_key]['status'] = 'disabled'
# Send command to client
client_id = self.processes[process_key]['client_id']
if client_id in self.client_sockets:
self.client_sockets[client_id].sendall(
if client_id in self.client_websockets:
asyncio.create_task(self.client_websockets[client_id].send(
json.dumps({
'type': 'disable_process',
'process_name': self.processes[process_key]['name']
}).encode('utf-8') + b'\n'
)
})
))
return True
return False
......@@ -261,18 +251,18 @@ class ClusterMaster:
# Send command to client
client_id = self.processes[process_key]['client_id']
if client_id in self.client_sockets:
self.client_sockets[client_id].sendall(
if client_id in self.client_websockets:
asyncio.create_task(self.client_websockets[client_id].send(
json.dumps({
'type': 'update_weight',
'process_name': self.processes[process_key]['name'],
'weight': weight
}).encode('utf-8') + b'\n'
)
})
))
return True
return False
def _management_loop(self) -> None:
async def _management_loop(self) -> None:
"""Main management loop."""
while self.running:
try:
......@@ -286,7 +276,7 @@ class ClusterMaster:
for client_id in dead_clients:
self._remove_client(client_id)
time.sleep(10)
await asyncio.sleep(10)
except KeyboardInterrupt:
self.running = False
......@@ -306,4 +296,4 @@ cluster_master = ClusterMaster()
def start_cluster_master(port: int = 5003) -> None:
"""Start the cluster master server."""
cluster_master.port = port
cluster_master.start()
\ No newline at end of file
asyncio.run(cluster_master.start())
\ No newline at end of file
......@@ -373,6 +373,16 @@ def delete_cluster_token(token_id):
flash('Failed to delete token.', 'error')
return redirect(url_for('cluster_tokens'))
@app.route('/admin/cluster_clients')
@admin_required
def cluster_clients():
"""Cluster clients management page."""
user = get_current_user_session()
# Get connected clients from cluster master if available
# For now, placeholder
clients = []
return render_template('admin/cluster_clients.html', user=user, clients=clients, active_page='cluster_clients')
@app.route('/api_tokens')
@login_required
def api_tokens():
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment