Implement advanced job scheduling workflow for analyze interface

- Add VRAM requirement estimation for models
- Implement intelligent worker selection based on VRAM, weight, and load
- Add support for concurrent job execution with VRAM tracking
- Integrate RunPod pod creation and management as a fallback
- Implement model loading/unloading logic to avoid redundant transfers
- Add file transfer handling for shared storage and WebSocket fallback
- Update queue system to use advanced cluster scheduling
- Add job_id column to processing_queue table
- Update web interface to submit jobs through queue system
- Fix function name references in cluster master
parent d370350d
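
For context, the scheduling entry point this commit wires the queue through is assign_job_with_model, which tries workers that already hold the model before considering any worker with enough free VRAM. A minimal usage sketch follows; the master object and the submit_analyze_job wrapper are illustrative, only assign_job_with_model and the default model path come from this commit:

    async def submit_analyze_job(master, file_path: str):
        """Hypothetical wrapper: submit one analyze job through the new scheduler."""
        job_data = {
            'model_path': 'Qwen/Qwen2.5-VL-7B-Instruct',  # default used in the diff
            'file_path': file_path,  # illustrative field name
        }
        # Tries workers that already have the model loaded, then any worker with
        # enough free VRAM; returns a job_id on success or None if nothing fits.
        return await master.assign_job_with_model('analyze', job_data)
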
@@ -335,7 +335,7 @@ class ClusterMaster:
     def get_best_worker_for_job(self, process_type: str, model_path: str, job_data: dict = None) -> Optional[str]:
         """Get the best worker for a job considering VRAM requirements, weight, and current load."""
-        from .models import get_model_vram_requirement
+        from .models import estimate_model_vram_requirements
         if process_type not in self.process_queue:
             return None

@@ -345,7 +345,7 @@ class ClusterMaster:
             return None
         # Get VRAM requirement for the model
-        vram_required = get_model_vram_requirement(model_path)
+        vram_required = estimate_model_vram_requirements(model_path)
         # Find workers that can handle this job
         candidate_workers = []

@@ -420,7 +420,7 @@ class ClusterMaster:
     async def assign_job_to_worker(self, worker_key: str, job_data: dict) -> Optional[str]:
         """Assign a job to a worker and handle file/model transfer."""
-        from .models import get_model_vram_requirement
+        from .models import estimate_model_vram_requirements
         import uuid
         client_id = self.processes[worker_key]['client_id']

@@ -448,7 +448,7 @@ class ClusterMaster:
             return None
         # Track the job
-        vram_required = get_model_vram_requirement(model_path)
+        vram_required = estimate_model_vram_requirements(model_path)
         self.active_jobs[job_id] = {
             'worker_key': worker_key,
             'client_id': client_id,

@@ -718,7 +718,7 @@ class ClusterMaster:
     async def assign_job_with_model(self, process_type: str, job_data: dict) -> Optional[str]:
         """Assign a job to a worker with advanced scheduling logic."""
-        from .models import get_model_vram_requirement
+        from .models import estimate_model_vram_requirements
         model_path = job_data.get('model_path', 'Qwen/Qwen2.5-VL-7B-Instruct')
         # Step 1: Try to find workers that already have the model loaded

@@ -728,7 +728,7 @@ class ClusterMaster:
         suitable_workers = []
         for worker_key in workers_with_model:
             available_vram = self._get_worker_available_vram(worker_key)
-            vram_required = get_model_vram_requirement(model_path)
+            vram_required = estimate_model_vram_requirements(model_path)
             current_jobs = len(self.worker_jobs[worker_key])
             max_concurrent = self._get_worker_max_concurrent_jobs(worker_key)

@@ -770,7 +770,7 @@ class ClusterMaster:
         for proc_key in self.processes:
             if self.processes[proc_key]['name'].startswith(process_type):
                 available_vram = self._get_worker_available_vram(proc_key)
-                vram_required = get_model_vram_requirement(model_path)
+                vram_required = estimate_model_vram_requirements(model_path)
                 current_jobs = len(self.worker_jobs[proc_key])
                 max_concurrent = self._get_worker_max_concurrent_jobs(proc_key)

@@ -851,6 +851,70 @@ class ClusterMaster:
         return None
+    def select_worker_for_job(self, process_type: str, model_path: str, job_data: dict) -> Optional[str]:
+        """Advanced worker selection based on VRAM requirements and job scheduling logic."""
+        from .models import estimate_model_vram_requirements
+        # Step 1: Determine VRAM required for the model
+        required_vram_gb = estimate_model_vram_requirements(model_path)
+        # Step 2: Determine workers with sufficient GPU memory
+        available_workers = []
+        concurrent_workers = []
+        for proc_key, proc_weight in self.process_queue.get(process_type, []):
+            client_id = self.processes[proc_key]['client_id']
+            client_info = self.clients[client_id]
+            gpu_info = client_info['gpu_info']
+            # Get total available VRAM across CUDA and ROCm devices
+            total_vram = 0
+            if gpu_info.get('cuda_available'):
+                cuda_devices = gpu_info.get('cuda_device_info', [])
+                total_vram += sum(device.get('vram_gb', 8) for device in cuda_devices)
+            if gpu_info.get('rocm_available'):
+                rocm_devices = gpu_info.get('rocm_device_info', [])
+                total_vram += sum(device.get('vram_gb', 16) for device in rocm_devices)
+            # Check if worker has enough VRAM
+            if total_vram >= required_vram_gb:
+                available_workers.append((proc_key, client_info['weight'], total_vram))
+        if not available_workers:
+            return None
+        # Step 3: Among workers with sufficient memory, select by weight and job status.
+        # First priority: workers with higher weight and no running jobs.
+        no_job_workers = []
+        for proc_key, weight, vram in available_workers:
+            # Check if worker has running jobs
+            current_jobs = len(self.worker_jobs[proc_key])
+            if current_jobs == 0:
+                no_job_workers.append((proc_key, weight, vram))
+        if no_job_workers:
+            # Sort by weight (highest first), then by available VRAM (highest first)
+            no_job_workers.sort(key=lambda x: (x[1], x[2]), reverse=True)
+            return no_job_workers[0][0]
+        # Step 4: If every capable worker is busy, check for concurrent job support
+        for proc_key, weight, vram in available_workers:
+            # Check if worker supports concurrent jobs and has enough free VRAM
+            current_jobs = len(self.worker_jobs[proc_key])
+            max_concurrent = self._get_worker_max_concurrent_jobs(proc_key)
+            current_vram_usage = self.worker_vram_usage[proc_key]
+            if current_jobs < max_concurrent and (vram - current_vram_usage) >= required_vram_gb:
+                concurrent_workers.append((proc_key, weight, vram - current_vram_usage))
+        if concurrent_workers:
+            # Sort by weight and remaining free VRAM
+            concurrent_workers.sort(key=lambda x: (x[1], x[2]), reverse=True)
+            return concurrent_workers[0][0]
+        return None
def disable_process(self, process_key: str) -> bool: def disable_process(self, process_key: str) -> bool:
"""Disable a specific process.""" """Disable a specific process."""
if process_key in self.processes: if process_key in self.processes:
......
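
The selection policy in select_worker_for_job reduces to: filter workers by total VRAM, prefer idle workers ordered by (weight, VRAM), then fall back to workers with spare concurrent slots whose free VRAM still fits the model. A distilled, self-contained sketch of that policy; the flat workers list layout is illustrative, not the cluster master's actual state:

    from typing import Optional

    def pick_worker(workers: list[dict], required_vram_gb: int) -> Optional[str]:
        """Each worker dict: key, weight, total_vram, jobs, max_jobs, vram_used (illustrative)."""
        capable = [w for w in workers if w['total_vram'] >= required_vram_gb]
        idle = [w for w in capable if w['jobs'] == 0]
        if idle:
            # Highest weight wins; total VRAM breaks ties
            return max(idle, key=lambda w: (w['weight'], w['total_vram']))['key']
        concurrent = [
            w for w in capable
            if w['jobs'] < w['max_jobs']
            and w['total_vram'] - w['vram_used'] >= required_vram_gb
        ]
        if concurrent:
            return max(concurrent, key=lambda w: (w['weight'], w['total_vram'] - w['vram_used']))['key']
        return None
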
@@ -618,6 +618,16 @@ def init_db(conn) -> None:
         # Column might already exist
         pass

+    # Add job_id column to processing_queue table if it doesn't exist
+    try:
+        if config['type'] == 'mysql':
+            cursor.execute('ALTER TABLE processing_queue ADD COLUMN job_id VARCHAR(100)')
+        else:
+            cursor.execute('ALTER TABLE processing_queue ADD COLUMN job_id TEXT')
+    except Exception:
+        # Column might already exist
+        pass
+
     # Insert default admin user if not exist
     import hashlib
     default_password = hashlib.sha256('admin'.encode()).hexdigest()
...
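
The migration above treats any ALTER failure as "column already exists", which can mask unrelated errors. An alternative sketch that checks the schema explicitly before altering, under the same config/cursor assumptions as the diff (the helper name is hypothetical, not from the commit):

    def job_id_column_exists(cursor, config: dict) -> bool:
        """Check whether processing_queue.job_id already exists (sketch, not from the commit)."""
        if config['type'] == 'mysql':
            cursor.execute(
                "SELECT COUNT(*) FROM information_schema.columns "
                "WHERE table_schema = DATABASE() "
                "AND table_name = 'processing_queue' AND column_name = 'job_id'"
            )
            return cursor.fetchone()[0] > 0
        # SQLite: PRAGMA table_info yields one row per column; field 1 is the name
        cursor.execute("PRAGMA table_info(processing_queue)")
        return any(row[1] == 'job_id' for row in cursor.fetchall())

    if not job_id_column_exists(cursor, config):
        col_type = 'VARCHAR(100)' if config['type'] == 'mysql' else 'TEXT'
        cursor.execute(f'ALTER TABLE processing_queue ADD COLUMN job_id {col_type}')
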
@@ -306,3 +306,47 @@ def unload_all_models() -> None:
     for model in _model_cache.values():
         model.unload_model()
     _model_cache.clear()
+
+
+def estimate_model_vram_requirements(model_path: str) -> int:
+    """Estimate VRAM requirements for a model in GB."""
+    model_path_lower = model_path.lower()
+    # Vision-language models
+    if 'qwen' in model_path_lower and ('vl' in model_path_lower or 'vision' in model_path_lower):
+        if '7b' in model_path_lower:
+            return 16  # Qwen2.5-VL-7B requires ~16GB VRAM
+        elif '3b' in model_path_lower:
+            return 8  # Qwen2.5-VL-3B requires ~8GB VRAM
+        elif '72b' in model_path_lower:
+            return 144  # Qwen2.5-VL-72B requires ~144GB VRAM
+        else:
+            return 24  # Default for other Qwen VL models
+    # Text-only models
+    elif 'llama' in model_path_lower:
+        if '70b' in model_path_lower:
+            return 40
+        elif '65b' in model_path_lower:
+            return 32
+        elif '30b' in model_path_lower:
+            return 16
+        elif '13b' in model_path_lower:
+            return 8
+        elif '7b' in model_path_lower:
+            return 4
+        else:
+            return 16
+    elif 'mistral' in model_path_lower:
+        if '7b' in model_path_lower:
+            return 4
+        else:
+            return 8
+    elif 'gpt' in model_path_lower or 'chatgpt' in model_path_lower:
+        # These are typically API-based, but if run locally:
+        return 8
+    # Default estimate
+    return 8
\ No newline at end of file
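
The estimator above only knows named model families and otherwise falls back to 8 GB. Where a parameter count is parseable from the model name, a rough rule of thumb is about 2 bytes per parameter for fp16 weights plus headroom for activations and KV cache; the handcrafted table encodes per-family knowledge this generic rule cannot match exactly. A hypothetical generic fallback, not part of this commit:

    import re

    def estimate_vram_from_param_count(model_path: str, default_gb: int = 8) -> int:
        """Rough fallback: fp16 weights at ~2 bytes/param, plus ~50% runtime headroom."""
        match = re.search(r'(\d+(?:\.\d+)?)b', model_path.lower())
        if not match:
            return default_gb
        params_billion = float(match.group(1))
        weight_gb = params_billion * 2  # fp16: 2 bytes per parameter
        return max(default_gb, round(weight_gb * 1.5))  # activations / KV-cache headroom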