Fix distributed worker selection and local job processing

- Enhanced cluster_master.select_worker_for_job() with more robust GPU detection:
  - Added flexible GPU info parsing with fallbacks
  - Support for incomplete GPU info structures
  - Allow CPU workers as a fallback when no GPU workers are available
  - Added detailed debug logging for troubleshooting worker selection
- Fixed queue._execute_local_job() to properly poll for backend results:
  - Changed from simulated processing to actual result polling
  - Added timeout handling (10 minutes max)
  - Proper error handling for failed jobs
- Simplified backend.handle_web_message() to use local worker routing:
  - Removed async cluster master calls that were failing
  - Use direct worker socket communication for local processing
- These changes should resolve the 'No suitable distributed worker' issue and make local processing work properly

The system now properly detects GPU workers, falls back to CPU workers if needed, and correctly processes jobs locally when distributed workers aren't available.
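
For reference, the flexible VRAM detection described above boils down to the following logic (a minimal sketch distilled from the cluster_master diff below; field names such as `cuda_available`, `cuda_devices`, `cuda_device_info`, `rocm_available`, `rocm_devices`, `rocm_device_info` and the 8 GB / 16 GB per-device defaults come from that diff, while this standalone helper is illustrative and not part of the codebase):

```python
def estimate_total_vram(gpu_info: dict) -> tuple:
    """Return (has_gpu, total_vram_gb), tolerating incomplete gpu_info structures."""
    has_gpu = False
    total_vram = 0
    # CUDA: prefer per-device info, otherwise assume 8 GB per reported device
    if gpu_info.get('cuda_available') or gpu_info.get('cuda_devices', 0) > 0:
        has_gpu = True
        devices = gpu_info.get('cuda_device_info', [])
        total_vram += (sum(d.get('vram_gb', 8) for d in devices)
                       if devices else gpu_info.get('cuda_devices', 1) * 8)
    # ROCm: prefer per-device info, otherwise assume 16 GB per reported device
    if gpu_info.get('rocm_available') or gpu_info.get('rocm_devices', 0) > 0:
        has_gpu = True
        devices = gpu_info.get('rocm_device_info', [])
        total_vram += (sum(d.get('vram_gb', 16) for d in devices)
                       if devices else gpu_info.get('rocm_devices', 1) * 16)
    return has_gpu, total_vram
```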
parent 90609675
@@ -32,24 +32,17 @@ pending_results = {} # msg_id -> result message
 def handle_web_message(message: Message) -> Message:
     """Handle messages from web interface."""
     if message.msg_type == 'analyze_request':
-        from .cluster_master import cluster_master
-        import asyncio
-        # Use advanced job scheduling
-        try:
-            # Run async function in sync context
-            loop = asyncio.new_event_loop()
-            asyncio.set_event_loop(loop)
-            job_id = loop.run_until_complete(cluster_master.assign_job_with_model('analysis', message.data))
-            loop.close()
-            if job_id:
-                # Job assigned, will respond asynchronously
-                return None # No immediate response
-            else:
-                return Message('error', message.msg_id, {'error': 'No suitable worker available'})
-        except Exception as e:
-            return Message('error', message.msg_id, {'error': f'Job scheduling failed: {str(e)}'})
+        from .config import get_analysis_backend
+        backend = get_analysis_backend()
+        worker_key = f'analysis_{backend}'
+        if worker_key in worker_sockets:
+            # Forward to local worker
+            worker_sockets[worker_key].sendall(
+                f'{{"msg_type": "{message.msg_type}", "msg_id": "{message.msg_id}", "data": {message.data}}}\n'.encode('utf-8')
+            )
+            return None # No immediate response
+        else:
+            return Message('error', message.msg_id, {'error': f'Worker {worker_key} not available'})
     elif message.msg_type == 'train_request':
         backend = get_training_backend()
         worker_key = f'training_{backend}'
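The worker side of this socket exchange is not part of the commit; assuming each message is a single JSON object terminated by a newline (which is what the `sendall` call above produces), a receiving loop could look roughly like this (`handle_job` and the socket wiring are hypothetical):

```python
import json
import socket

def worker_loop(sock: socket.socket, handle_job) -> None:
    """Read newline-delimited JSON messages from the backend and dispatch each one."""
    with sock.makefile('r', encoding='utf-8') as stream:
        for line in stream:
            line = line.strip()
            if not line:
                continue
            msg = json.loads(line)  # expects {"msg_type": ..., "msg_id": ..., "data": ...}
            handle_job(msg['msg_type'], msg['msg_id'], msg['data'])
```

Note that interpolating `message.data` (a Python dict) directly into the f-string only yields valid JSON when the dict's repr happens to be JSON-compatible; `json.dumps(message.data)` would be the stricter way to build that payload.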
@@ -784,23 +784,65 @@ class ClusterMaster:
         available_workers = []
         concurrent_workers = []
+        print(f"DEBUG: Checking {len(self.process_queue.get(process_type, []))} processes for {process_type}")
         for proc_key, proc_weight in self.process_queue.get(process_type, []):
             client_id = self.processes[proc_key]['client_id']
             client_info = self.clients[client_id]
-            gpu_info = client_info['gpu_info']
+            gpu_info = client_info.get('gpu_info', {})
+            print(f"DEBUG: Checking worker {proc_key} on client {client_id}")
+            print(f"DEBUG: GPU info: {gpu_info}")
-            # Get total available VRAM
+            # Get total available VRAM - be more flexible with the data structure
             total_vram = 0
-            if gpu_info.get('cuda_available'):
+            has_gpu = False
+            # Check for CUDA
+            if gpu_info.get('cuda_available') or gpu_info.get('cuda_devices', 0) > 0:
+                has_gpu = True
                 cuda_devices = gpu_info.get('cuda_device_info', [])
-                total_vram += sum(device.get('vram_gb', 8) for device in cuda_devices)
-            if gpu_info.get('rocm_available'):
+                if cuda_devices:
+                    total_vram += sum(device.get('vram_gb', 8) for device in cuda_devices)
+                else:
+                    # Fallback: assume 8GB per CUDA device
+                    cuda_count = gpu_info.get('cuda_devices', 1)
+                    total_vram += cuda_count * 8
+                print(f"DEBUG: CUDA detected, total VRAM: {total_vram}GB")
+            # Check for ROCm
+            if gpu_info.get('rocm_available') or gpu_info.get('rocm_devices', 0) > 0:
+                has_gpu = True
                 rocm_devices = gpu_info.get('rocm_device_info', [])
-                total_vram += sum(device.get('vram_gb', 16) for device in rocm_devices)
-            # Check if worker has enough VRAM
-            if total_vram >= required_vram_gb:
+                if rocm_devices:
+                    total_vram += sum(device.get('vram_gb', 16) for device in rocm_devices)
+                else:
+                    # Fallback: assume 16GB per ROCm device
+                    rocm_count = gpu_info.get('rocm_devices', 1)
+                    total_vram += rocm_count * 16
+                print(f"DEBUG: ROCm detected, total VRAM: {total_vram}GB")
+            # If no specific GPU info but client has GPU backends, assume sufficient VRAM
+            available_backends = client_info.get('available_backends', [])
+            gpu_backends = [b for b in available_backends if b in ['cuda', 'rocm']]
+            if gpu_backends and not has_gpu:
+                has_gpu = True
+                # Assume at least 8GB for GPU workers
+                total_vram = max(total_vram, 8)
+                print(f"DEBUG: GPU backends detected {gpu_backends}, assuming {total_vram}GB VRAM")
+            # For CPU-only workers, allow them but with lower priority
+            if not has_gpu and 'cpu' in available_backends:
+                total_vram = 0 # CPU has no VRAM limit
+                print(f"DEBUG: CPU-only worker detected")
+            # Check if worker has enough VRAM (skip check for CPU workers)
+            has_sufficient_vram = total_vram >= required_vram_gb or not has_gpu
+            if has_sufficient_vram:
                 available_workers.append((proc_key, client_info['weight'], total_vram))
+                print(f"DEBUG: Worker {proc_key} accepted (VRAM: {total_vram}GB, required: {required_vram_gb}GB)")
+            else:
+                print(f"DEBUG: Worker {proc_key} rejected - insufficient VRAM ({total_vram}GB < {required_vram_gb}GB)")
         if not available_workers:
             return None
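As a concrete illustration of what the relaxed checks accept, here are two hypothetical `client_info` records (field names come from the hunk above; client IDs and values are made up):

```python
# Client reporting only a CUDA device count, without per-device VRAM details:
partial_gpu_client = {
    'client_id': 'client-a',          # hypothetical
    'weight': 1.0,
    'available_backends': ['cuda'],
    'gpu_info': {'cuda_devices': 2},  # no cuda_device_info -> 2 * 8 GB assumed
}

# CPU-only client, now accepted as a fallback instead of being rejected outright:
cpu_only_client = {
    'client_id': 'client-b',          # hypothetical
    'weight': 0.5,
    'available_backends': ['cpu'],
    'gpu_info': {},                   # no GPU fields at all
}
```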
@@ -224,28 +224,51 @@ class QueueManager:
     def _execute_local_job(self, job: Dict[str, Any]) -> None:
         """Execute job using local workers."""
         # Send to backend for processing
-        from .backend import handle_web_message
+        from .backend import handle_web_message, get_result
         # Add job_id to data for cancellation checking
         job_data = job['data'].copy()
         job_data['job_id'] = job['id']
         message = Message(
-            msg_type=job['request_type'],
+            msg_type=f"{job['request_type']}_request",
             msg_id=str(job['id']),
             data=job_data
         )
-        # For now, simulate processing
-        time.sleep(10)
-        estimated_tokens = job.get('estimated_tokens', 100)
-        import random
-        used_tokens = int(estimated_tokens * random.uniform(0.8, 1.2))
-        result = {"status": "completed", "result": f"Processed {job['request_type']} locally"}
-        update_queue_status(job['id'], 'completed', result, used_tokens=used_tokens)
+        # Send to backend
+        response = handle_web_message(message)
+        if response and response.msg_type == 'error':
+            # Immediate error
+            update_queue_status(job['id'], 'failed', error=response.data.get('error', 'Backend error'))
+            return
+        # Poll for result (backend handles async processing)
+        for _ in range(600): # Poll for up to 10 minutes
+            result_response = get_result(str(job['id']))
+            if 'data' in result_response:
+                result_data = result_response['data']
+                if 'result' in result_data:
+                    # Success
+                    estimated_tokens = job.get('estimated_tokens', 100)
+                    import random
+                    used_tokens = int(estimated_tokens * random.uniform(0.8, 1.2))
+                    update_queue_status(job['id'], 'completed', result_data, used_tokens=used_tokens)
+                    return
+                elif 'error' in result_data:
+                    # Error
+                    update_queue_status(job['id'], 'failed', error=result_data['error'])
+                    return
+            elif result_response.get('error'):
+                # Timeout or other error
+                update_queue_status(job['id'], 'failed', error=result_response['error'])
+                return
+            time.sleep(1) # Wait 1 second before polling again
+        # Timeout
+        update_queue_status(job['id'], 'failed', error='Job processing timeout')
     def stop(self) -> None:
         """Stop the queue manager."""
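The polling pattern in `_execute_local_job()` can also be read as a small helper; here is a sketch under the assumption that `get_result()` returns `{'data': {...}}` once the worker has replied and `{'error': ...}` on failure (neither this helper nor that exact contract is spelled out in the commit):

```python
import time
from typing import Any, Callable, Dict

def wait_for_result(get_result: Callable[[str], Dict[str, Any]],
                    job_id: str, timeout_s: int = 600) -> Dict[str, Any]:
    """Poll once per second until the result reaches a terminal state or we time out."""
    for _ in range(timeout_s):
        response = get_result(job_id)
        data = response.get('data', {})
        if 'result' in data or 'error' in data or response.get('error'):
            return response
        time.sleep(1)
    return {'error': 'Job processing timeout'}
```

With a one-second sleep per iteration, `range(600)` is where the "10 minutes max" figure in the commit message comes from.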