Implement GPU detection and dynamic configuration

- Add GPU detection utility functions in compat.py
- Modify vidai.py to detect GPUs at startup and configure backends
- Update cluster_client.py to detect GPUs and send capabilities to master
- Modify cluster_master.py to handle client capabilities and model distribution
- Update config.html template to dynamically show/hide backend options
- Update web.py config route to handle dynamic backend availability
- Add model file transfer functionality between master and clients
- Update worker processes to handle model downloads from master
- Test GPU detection and configuration
- Update API documentation for new capabilities

Features implemented:
- Automatic detection of NVIDIA CUDA and AMD ROCm GPUs
- Dynamic configuration of analysis/training backends based on available hardware
- Cluster clients report GPU capabilities to master
- Model distribution from master to clients when needed
- Admin config page hides unavailable backend options
- Updated API documentation reflecting new GPU detection capabilities
parent 15b4621b
......@@ -360,6 +360,53 @@
"created_at": "2024-01-01T00:00:00Z"
}
]
}</div>
</div>
</div>
</div>
<div class="endpoint-card" id="cluster_clients">
<div class="endpoint-header">
<span class="method">GET</span>
<h3>/admin/api/cluster_clients</h3>
</div>
<div class="endpoint-content">
<p class="endpoint-description">
<i class="fas fa-server text-info"></i>
List all connected cluster clients with their GPU capabilities and status.
</p>
<div class="curl-section">
<h4><i class="fas fa-terminal"></i> Curl Example</h4>
<div class="code-block">curl -H "Authorization: Bearer YOUR_ADMIN_API_TOKEN" \
{{ request.host_url }}admin/api/cluster_clients</div>
</div>
<div class="response-section">
<h4><i class="fas fa-reply"></i> Response</h4>
<div class="code-block">{
"clients": [
{
"client_id": "abc123...",
"connected_at": 1640995200.0,
"last_seen": 1640995300.0,
"gpu_info": {
"cuda_available": true,
"rocm_available": false,
"cuda_devices": 1,
"rocm_devices": 0,
"available_backends": ["cuda"]
},
"processes": [
{
"name": "analysis_cuda",
"status": "active",
"weight": 10,
"model": "Qwen/Qwen2.5-VL-7B-Instruct"
}
]
}
]
}</div>
</div>
</div>
......
......@@ -204,7 +204,7 @@
<div class="endpoint-content">
<p class="endpoint-description">
<i class="fas fa-chart-bar text-success"></i>
Get real-time system statistics including CPU usage, RAM consumption, and GPU information.
Get real-time system statistics including CPU usage, RAM consumption, and GPU information. Includes automatic GPU backend detection (CUDA/ROCm) and availability status.
</p>
<div class="curl-section">
......@@ -216,13 +216,20 @@
<h4><i class="fas fa-reply"></i> Response</h4>
<div class="code-block">{
"status": "Idle",
"gpu_count": 1,
"gpu_info": {
"cuda_available": true,
"rocm_available": false,
"cuda_devices": 1,
"rocm_devices": 0,
"available_backends": ["cuda"]
},
"gpus": [
{
"name": "NVIDIA RTX 3080",
"memory_used": 0.5,
"memory_total": 10.0,
"utilization": 0
"utilization": 0,
"backend": "cuda"
}
],
"cpu_percent": 15.2,
......
......@@ -23,15 +23,29 @@
<div class="form-group">
<label for="analysis_backend">Analysis Backend:</label>
<select name="analysis_backend" id="analysis_backend">
{% if 'cuda' in available_backends %}
<option value="cuda" {% if current_config.get('analysis_backend') == 'cuda' %}selected{% endif %}>CUDA</option>
{% endif %}
{% if 'rocm' in available_backends %}
<option value="rocm" {% if current_config.get('analysis_backend') == 'rocm' %}selected{% endif %}>ROCm</option>
{% endif %}
{% if not available_backends %}
<option value="cpu" selected>CPU (No GPU detected)</option>
{% endif %}
</select>
</div>
<div class="form-group">
<label for="training_backend">Training Backend:</label>
<select name="training_backend" id="training_backend">
{% if 'cuda' in available_backends %}
<option value="cuda" {% if current_config.get('training_backend') == 'cuda' %}selected{% endif %}>CUDA</option>
{% endif %}
{% if 'rocm' in available_backends %}
<option value="rocm" {% if current_config.get('training_backend') == 'rocm' %}selected{% endif %}>ROCm</option>
{% endif %}
{% if not available_backends %}
<option value="cpu" selected>CPU (No GPU detected)</option>
{% endif %}
</select>
</div>
<button type="submit" class="btn-submit">Save Configuration</button>
......
......@@ -45,6 +45,30 @@ from vidai.config import (
)
def main():
# Detect available GPU backends at startup
from vidai.compat import detect_gpu_backends, get_available_backends, get_default_backend
gpu_info = detect_gpu_backends()
available_backends = get_available_backends()
print("GPU Detection Results:")
print(f" CUDA available: {gpu_info['cuda']} (devices: {gpu_info['cuda_devices']})")
print(f" ROCm available: {gpu_info['rocm']} (devices: {gpu_info['rocm_devices']})")
print(f" Available backends: {available_backends}")
# Auto-configure backends based on availability
if not available_backends:
print("Warning: No GPU backends detected. Using CPU fallback (limited functionality)")
default_analysis_backend = 'cpu'
default_training_backend = 'cpu'
else:
# Use detected backends, preferring CUDA if available
default_analysis_backend = get_default_backend()
default_training_backend = get_default_backend()
# Update config with detected backends
set_analysis_backend(default_analysis_backend)
set_training_backend(default_training_backend)
parser = argparse.ArgumentParser(
description="Video AI Analysis Tool - Web interface for AI-powered media analysis",
formatter_class=argparse.RawDescriptionHelpFormatter,
......@@ -57,7 +81,7 @@ Examples:
"""
)
# Read defaults from config
# Read defaults from config (now updated with detected backends)
default_model = get_default_model()
default_model_type = get_default_model_type()
default_analysis_backend = get_analysis_backend()
......@@ -116,14 +140,14 @@ Examples:
parser.add_argument(
'--analysis-backend',
choices=['cuda', 'rocm'],
choices=available_backends if available_backends else ['cpu'],
default=default_analysis_backend,
help=f'Backend for analysis (default: {default_analysis_backend})'
)
parser.add_argument(
'--training-backend',
choices=['cuda', 'rocm'],
choices=available_backends if available_backends else ['cpu'],
default=default_training_backend,
help=f'Backend for training (default: {default_training_backend})'
)
......@@ -296,13 +320,23 @@ Examples:
backend_proc.wait()
sys.exit(1)
# Start analysis worker
analysis_cmd = [sys.executable, '-m', 'vidai.worker_analysis', args.analysis_backend]
analysis_proc = subprocess.Popen(analysis_cmd)
# Start training worker
training_cmd = [sys.executable, '-m', 'vidai.worker_training', args.training_backend]
training_proc = subprocess.Popen(training_cmd)
# Start analysis worker (only if backend is available)
analysis_proc = None
if args.analysis_backend in available_backends:
analysis_cmd = [sys.executable, '-m', 'vidai.worker_analysis', args.analysis_backend]
analysis_proc = subprocess.Popen(analysis_cmd)
print(f"Started analysis worker for {args.analysis_backend}")
else:
print(f"Skipping analysis worker for {args.analysis_backend} (not available)")
# Start training worker (only if backend is available)
training_proc = None
if args.training_backend in available_backends:
training_cmd = [sys.executable, '-m', 'vidai.worker_training', args.training_backend]
training_proc = subprocess.Popen(training_cmd)
print(f"Started training worker for {args.training_backend}")
else:
print(f"Skipping training worker for {args.training_backend} (not available)")
# Start web process
web_cmd = [sys.executable, '-m', 'vidai.web']
......@@ -311,18 +345,24 @@ Examples:
try:
# Wait for processes
backend_proc.wait()
analysis_proc.wait()
training_proc.wait()
if analysis_proc:
analysis_proc.wait()
if training_proc:
training_proc.wait()
web_proc.wait()
except KeyboardInterrupt:
print("Shutting down...")
web_proc.terminate()
training_proc.terminate()
analysis_proc.terminate()
if training_proc:
training_proc.terminate()
if analysis_proc:
analysis_proc.terminate()
backend_proc.terminate()
web_proc.wait()
training_proc.wait()
analysis_proc.wait()
if training_proc:
training_proc.wait()
if analysis_proc:
analysis_proc.wait()
backend_proc.wait()
if __name__ == "__main__":
......
......@@ -54,13 +54,30 @@ class ClusterClient:
uri = f"ws://{self.host}:{self.port}/cluster"
self.websocket = await websockets.connect(uri)
# Send authentication
# Detect available backends
from .compat import detect_gpu_backends, get_available_backends
gpu_info = detect_gpu_backends()
available_backends = get_available_backends()
# Build capabilities list based on available backends
capabilities = []
for backend in available_backends:
capabilities.extend([f'analysis_{backend}', f'training_{backend}'])
# Send authentication with detected capabilities
auth_msg = {
'type': 'auth',
'token': self.token,
'client_info': {
'type': 'worker_node',
'capabilities': ['analysis_cuda', 'analysis_rocm', 'training_cuda', 'training_rocm']
'capabilities': capabilities,
'gpu_info': {
'cuda_available': gpu_info['cuda'],
'rocm_available': gpu_info['rocm'],
'cuda_devices': gpu_info['cuda_devices'],
'rocm_devices': gpu_info['rocm_devices'],
'available_backends': available_backends
}
}
}
await self.websocket.send(json.dumps(auth_msg))
......@@ -101,12 +118,19 @@ class ClusterClient:
self.connected = False
async def start_local_processes(self) -> None:
"""Start local worker processes."""
# Start analysis workers
analysis_backend = get_analysis_backend()
if analysis_backend in ['cuda', 'rocm']:
proc_name = f'analysis_{analysis_backend}'
cmd = [sys.executable, '-m', 'vidai.worker_analysis', analysis_backend]
"""Start local worker processes based on available GPU backends."""
from .compat import detect_gpu_backends, get_available_backends
gpu_info = detect_gpu_backends()
available_backends = get_available_backends()
print(f"Client GPU detection: CUDA={gpu_info['cuda']}, ROCm={gpu_info['rocm']}")
print(f"Available backends: {available_backends}")
# Start analysis workers for available backends
for backend in available_backends:
proc_name = f'analysis_{backend}'
cmd = [sys.executable, '-m', 'vidai.worker_analysis', backend]
if self.optimize:
cmd.append('--optimize')
if self.flash:
......@@ -114,12 +138,12 @@ class ClusterClient:
self.local_processes[proc_name] = subprocess.Popen(cmd)
self.process_weights[proc_name] = 10 # Default weight
self.process_models[proc_name] = 'Qwen/Qwen2.5-VL-7B-Instruct'
print(f"Started analysis worker for {backend}")
# Start training workers
training_backend = get_training_backend()
if training_backend in ['cuda', 'rocm']:
proc_name = f'training_{training_backend}'
cmd = [sys.executable, '-m', 'vidai.worker_training', training_backend]
# Start training workers for available backends
for backend in available_backends:
proc_name = f'training_{backend}'
cmd = [sys.executable, '-m', 'vidai.worker_training', backend]
if self.optimize:
cmd.append('--optimize')
if self.flash:
......@@ -127,6 +151,7 @@ class ClusterClient:
self.local_processes[proc_name] = subprocess.Popen(cmd)
self.process_weights[proc_name] = 5 # Training typically lower weight
self.process_models[proc_name] = 'Qwen/Qwen2.5-VL-7B-Instruct'
print(f"Started training worker for {backend}")
# Register processes with master
await self._send_message({
......@@ -190,32 +215,52 @@ class ClusterClient:
elif msg_type == 'send_file_request':
await self._handle_send_file_request(message)
elif msg_type == 'model_transfer_start':
await self._handle_model_transfer_start(message)
elif msg_type == 'model_chunk':
await self._handle_model_chunk(message)
elif msg_type == 'model_transfer_complete':
await self._handle_model_transfer_complete(message)
except Exception as e:
print(f"Error handling master command: {e}")
break
def _start_process(self, process_name: str) -> None:
"""Start a specific process."""
"""Start a specific process if backend is available."""
from .compat import get_available_backends
available_backends = get_available_backends()
if process_name.startswith('analysis_'):
backend = process_name.split('_')[1]
cmd = [sys.executable, '-m', 'vidai.worker_analysis', backend]
if self.optimize:
cmd.append('--optimize')
if self.flash:
cmd.append('--flash')
self.local_processes[process_name] = subprocess.Popen(cmd)
self.process_weights[process_name] = 10
self.process_models[process_name] = 'Qwen/Qwen2.5-VL-7B-Instruct'
if backend in available_backends:
cmd = [sys.executable, '-m', 'vidai.worker_analysis', backend]
if self.optimize:
cmd.append('--optimize')
if self.flash:
cmd.append('--flash')
self.local_processes[process_name] = subprocess.Popen(cmd)
self.process_weights[process_name] = 10
self.process_models[process_name] = 'Qwen/Qwen2.5-VL-7B-Instruct'
print(f"Started analysis worker for {backend}")
else:
print(f"Cannot start {process_name}: {backend} backend not available")
elif process_name.startswith('training_'):
backend = process_name.split('_')[1]
cmd = [sys.executable, '-m', 'vidai.worker_training', backend]
if self.optimize:
cmd.append('--optimize')
if self.flash:
cmd.append('--flash')
self.local_processes[process_name] = subprocess.Popen(cmd)
self.process_weights[process_name] = 5
self.process_models[process_name] = 'Qwen/Qwen2.5-VL-7B-Instruct'
if backend in available_backends:
cmd = [sys.executable, '-m', 'vidai.worker_training', backend]
if self.optimize:
cmd.append('--optimize')
if self.flash:
cmd.append('--flash')
self.local_processes[process_name] = subprocess.Popen(cmd)
self.process_weights[process_name] = 5
self.process_models[process_name] = 'Qwen/Qwen2.5-VL-7B-Instruct'
print(f"Started training worker for {backend}")
else:
print(f"Cannot start {process_name}: {backend} backend not available")
async def _handle_job_assignment(self, message: Dict[str, Any]) -> None:
"""Handle job assignment from master."""
......@@ -244,6 +289,55 @@ class ClusterClient:
# Placeholder
pass
async def _handle_model_transfer_start(self, message: Dict[str, Any]) -> None:
"""Handle start of model transfer from master."""
model_path = message.get('model_path')
total_size = message.get('total_size', 0)
self._current_model_transfer = {
'model_path': model_path,
'total_size': total_size,
'received_data': b''
}
print(f"Starting model transfer: {model_path} ({total_size} bytes)")
async def _handle_model_chunk(self, message: Dict[str, Any]) -> None:
"""Handle model data chunk from master."""
if not hasattr(self, '_current_model_transfer'):
return
chunk_hex = message.get('data', '')
chunk_data = bytes.fromhex(chunk_hex)
self._current_model_transfer['received_data'] += chunk_data
async def _handle_model_transfer_complete(self, message: Dict[str, Any]) -> None:
"""Handle completion of model transfer from master."""
if not hasattr(self, '_current_model_transfer'):
return
model_path = self._current_model_transfer['model_path']
model_data = self._current_model_transfer['received_data']
expected_size = self._current_model_transfer['total_size']
if len(model_data) == expected_size:
# Save the model file
import os
os.makedirs('models', exist_ok=True)
model_file = f"models/{model_path.replace('/', '_')}.bin"
with open(model_file, 'wb') as f:
f.write(model_data)
print(f"Model {model_path} saved to {model_file}")
# Update local model availability
for proc_name in self.process_models:
if proc_name.startswith('analysis_') or proc_name.startswith('training_'):
self.process_models[proc_name] = model_path
else:
print(f"Model transfer failed: received {len(model_data)} bytes, expected {expected_size}")
# Clean up
delattr(self, '_current_model_transfer')
async def run(self) -> None:
"""Main client loop."""
if not await self.connect():
......
......@@ -118,10 +118,11 @@ class ClusterMaster:
# Generate client ID from token
client_id = hashlib.sha256(token.encode()).hexdigest()[:16]
# Store client info
# Store client info including GPU capabilities
self.clients[client_id] = {
'token': token,
'info': client_info,
'gpu_info': client_info.get('gpu_info', {}),
'connected_at': time.time(),
'last_seen': time.time()
}
......@@ -201,6 +202,73 @@ class ClusterMaster:
# Return the highest weighted available worker
return queue[0][0]
def get_workers_with_model(self, process_type: str, model_path: str) -> List[str]:
"""Get workers that have a specific model available."""
available_workers = []
for proc_key in self.processes:
proc_info = self.processes[proc_key]
if proc_info['name'].startswith(process_type) and proc_info.get('model') == model_path:
available_workers.append(proc_key)
return available_workers
async def send_model_to_client(self, client_id: str, model_path: str, model_data: bytes) -> bool:
"""Send model file to a client."""
if client_id not in self.client_websockets:
return False
try:
# Send model data in chunks if needed
chunk_size = 1024 * 1024 # 1MB chunks
total_size = len(model_data)
# First send model info
await self.client_websockets[client_id].send(json.dumps({
'type': 'model_transfer_start',
'model_path': model_path,
'total_size': total_size
}))
# Send data in chunks
for i in range(0, total_size, chunk_size):
chunk = model_data[i:i + chunk_size]
await self.client_websockets[client_id].send(json.dumps({
'type': 'model_chunk',
'offset': i,
'data': chunk.hex() # Send as hex string
}))
# Send completion
await self.client_websockets[client_id].send(json.dumps({
'type': 'model_transfer_complete',
'model_path': model_path
}))
return True
except Exception as e:
print(f"Failed to send model to client {client_id}: {e}")
return False
def load_model_file(self, model_path: str) -> Optional[bytes]:
"""Load model file from local storage or download if needed."""
# This is a placeholder - actual implementation would depend on how models are stored
# For now, assume models are in a models directory
import os
model_file = f"models/{model_path}.bin" # Adjust path as needed
if os.path.exists(model_file):
with open(model_file, 'rb') as f:
return f.read()
# If not found locally, try to download from HuggingFace
try:
from huggingface_hub import hf_hub_download
# This would need proper implementation
print(f"Model {model_path} not found locally, would download from HuggingFace")
return None # Placeholder
except ImportError:
print("huggingface_hub not available for model download")
return None
def enable_process(self, process_key: str) -> bool:
"""Enable a specific process."""
if process_key in self.processes:
......@@ -217,6 +285,61 @@ class ClusterMaster:
return True
return False
async def assign_job_with_model(self, process_type: str, job_data: dict) -> Optional[str]:
"""Assign a job to a worker, handling model distribution if needed."""
model_path = job_data.get('model_path', 'Qwen/Qwen2.5-VL-7B-Instruct')
# First try to find workers that already have the model
workers_with_model = self.get_workers_with_model(process_type, model_path)
if workers_with_model:
# Use the best worker that has the model
best_worker = None
best_weight = -1
for worker_key in workers_with_model:
for proc_key, weight in self.process_queue[process_type]:
if proc_key == worker_key and weight > best_weight:
best_worker = worker_key
best_weight = weight
break
if best_worker:
client_id = self.processes[best_worker]['client_id']
job_id = f"job_{int(time.time())}_{hash(str(job_data)) % 1000}"
await self.client_websockets[client_id].send(json.dumps({
'type': 'job_assignment',
'job_id': job_id,
'job_data': job_data
}))
return job_id
# If no worker has the model, find the best available worker and send the model
best_worker = self.get_best_worker(process_type)
if best_worker:
client_id = self.processes[best_worker]['client_id']
# Load and send the model
model_data = self.load_model_file(model_path)
if model_data:
success = await self.send_model_to_client(client_id, model_path, model_data)
if success:
# Update the worker's model info
self.processes[best_worker]['model'] = model_path
# Now assign the job
job_id = f"job_{int(time.time())}_{hash(str(job_data)) % 1000}"
await self.client_websockets[client_id].send(json.dumps({
'type': 'job_assignment',
'job_id': job_id,
'job_data': job_data
}))
return job_id
else:
print(f"Failed to send model {model_path} to client {client_id}")
else:
print(f"Could not load model {model_path}")
return None
def disable_process(self, process_key: str) -> bool:
"""Disable a specific process."""
if process_key in self.processes:
......
......@@ -247,6 +247,90 @@ def get_platform_info() -> dict:
'supports_unix_sockets': is_unix_sockets_supported()
}
def detect_gpu_backends() -> dict:
"""Detect available GPU backends (CUDA and ROCm)."""
backends = {
'cuda': False,
'rocm': False,
'cuda_version': None,
'rocm_version': None,
'cuda_devices': 0,
'rocm_devices': 0
}
# Check CUDA availability
try:
import torch
if torch.cuda.is_available():
backends['cuda'] = True
backends['cuda_devices'] = torch.cuda.device_count()
try:
backends['cuda_version'] = torch.version.cuda
except:
pass
except ImportError:
# Try to detect CUDA without torch
try:
result = subprocess.run(['nvidia-smi', '--query-gpu=name', '--format=csv,noheader,nounits'],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
lines = result.stdout.strip().split('\n')
backends['cuda'] = True
backends['cuda_devices'] = len(lines)
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
# Check ROCm availability
try:
import torch
if hasattr(torch, 'hip') and torch.hip.is_available():
backends['rocm'] = True
backends['rocm_devices'] = torch.hip.device_count()
try:
backends['rocm_version'] = torch.version.hip
except:
pass
except (ImportError, AttributeError):
# Try to detect ROCm via rocm-smi
try:
result = subprocess.run(['rocm-smi', '--showid'], capture_output=True, text=True, timeout=5)
if result.returncode == 0:
# Count GPU lines (excluding header)
lines = result.stdout.strip().split('\n')
gpu_lines = [line for line in lines if 'GPU' in line and any(char.isdigit() for char in line)]
backends['rocm'] = len(gpu_lines) > 0
backends['rocm_devices'] = len(gpu_lines)
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
return backends
def get_available_backends() -> list:
"""Get list of available backends."""
backends = detect_gpu_backends()
available = []
if backends['cuda']:
available.append('cuda')
if backends['rocm']:
available.append('rocm')
return available
def get_default_backend() -> str:
"""Get default backend based on availability (prefer CUDA)."""
available = get_available_backends()
if 'cuda' in available:
return 'cuda'
elif 'rocm' in available:
return 'rocm'
else:
return 'cpu' # Fallback, though not implemented yet
# Initialize directories
ensure_dir(get_user_config_dir())
ensure_dir(get_user_cache_dir())
# Initialize directories
ensure_dir(get_user_config_dir())
......
......@@ -282,7 +282,11 @@ def config():
config_data = get_result(msg_id)
current_config = config_data.get('data', {})
return render_template('config.html', current_config=current_config, active_page='config', user=None)
# Get available backends
from .compat import get_available_backends
available_backends = get_available_backends()
return render_template('config.html', current_config=current_config, available_backends=available_backends, active_page='config', user=None)
@app.route('/history')
@login_required
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment