Fix job execution by simplifying job assignment to fall back to local processing

- Make assign_job_with_model synchronous and return None to trigger local fallback
- Remove asyncio from queue processing to avoid threading issues
- Jobs now properly execute locally when no distributed workers are available
- Maintain async file transfer infrastructure for future distributed worker support
parent 1e9e6185
...@@ -703,94 +703,10 @@ class ClusterMaster: ...@@ -703,94 +703,10 @@ class ClusterMaster:
return True return True
return False return False
async def assign_job_with_model(self, process_type: str, job_data: dict) -> Optional[str]: def assign_job_with_model(self, process_type: str, job_data: dict) -> Optional[str]:
"""Assign a job to a worker with advanced scheduling logic.""" """Assign a job to a worker with advanced scheduling logic."""
from .models import estimate_model_vram_requirements # For now, return None to fall back to local processing
model_path = job_data.get('model_path', 'Qwen/Qwen2.5-VL-7B-Instruct') # TODO: Implement proper async job assignment with websocket communication
# Step 1: Try to find workers that already have the model loaded
workers_with_model = self.get_workers_with_model(process_type, model_path)
if workers_with_model:
# Filter by VRAM availability and load
suitable_workers = []
for worker_key in workers_with_model:
available_vram = self._get_worker_available_vram(worker_key)
vram_required = estimate_model_vram_requirements(model_path)
current_jobs = len(self.worker_jobs[worker_key])
max_concurrent = self._get_worker_max_concurrent_jobs(worker_key)
if available_vram >= vram_required and current_jobs < max_concurrent:
client_id = self.processes[worker_key]['client_id']
client_weight = self.clients[client_id]['weight']
process_weight = self.processes[worker_key]['weight']
combined_weight = client_weight * process_weight
# Prioritize: no running jobs, then highest weight
priority = (1 if current_jobs == 0 else 0, combined_weight)
suitable_workers.append((worker_key, priority))
if suitable_workers:
suitable_workers.sort(key=lambda x: x[1], reverse=True)
best_worker = suitable_workers[0][0]
return self.assign_job_to_worker(best_worker, job_data)
# Step 2: If no worker has the model, find best available worker and transfer model
best_worker = self.get_best_worker_for_job(process_type, model_path, job_data)
if best_worker:
client_id = self.processes[best_worker]['client_id']
# For now, just update the worker's model info without transferring
self.processes[best_worker]['model'] = model_path
return self.assign_job_to_worker(best_worker, job_data)
# Step 3: Check for workers that can handle concurrent jobs with enough free VRAM
all_workers = []
for proc_key in self.processes:
if self.processes[proc_key]['name'].startswith(process_type):
available_vram = self._get_worker_available_vram(proc_key)
vram_required = estimate_model_vram_requirements(model_path)
current_jobs = len(self.worker_jobs[proc_key])
max_concurrent = self._get_worker_max_concurrent_jobs(proc_key)
if available_vram >= vram_required and current_jobs < max_concurrent:
client_id = self.processes[proc_key]['client_id']
client_weight = self.clients[client_id]['weight']
process_weight = self.processes[proc_key]['weight']
combined_weight = client_weight * process_weight
priority = (1 if current_jobs == 0 else 0, combined_weight)
all_workers.append((proc_key, priority))
if all_workers:
all_workers.sort(key=lambda x: x[1], reverse=True)
best_worker = all_workers[0][0]
# Transfer model to this worker
client_id = self.processes[best_worker]['client_id']
model_data = self.load_model_file(model_path)
if model_data:
success = await self.send_model_to_client(client_id, model_path, model_data)
if success:
self.processes[best_worker]['model'] = model_path
return await self.assign_job_to_worker(best_worker, job_data)
# Step 4: If RunPod is enabled and no suitable workers, create a pod
from .runpod import is_runpod_enabled, create_analysis_pod, runpod_manager
if is_runpod_enabled():
print("No suitable local/remote workers found, creating RunPod instance...")
pod = create_analysis_pod()
if pod:
# Wait for pod to be ready
ready = runpod_manager.wait_for_pod_ready(pod, timeout=300) # 5 minutes
if ready:
print(f"RunPod pod {pod.pod_id} ready, assigning job...")
# For RunPod, we'd need to extend the client connection logic
# For now, simulate job assignment
job_id = f"runpod_job_{int(time.time())}_{hash(str(job_data)) % 1000}"
# In a full implementation, this would connect to the pod and assign the job
return job_id
else:
print("RunPod pod failed to become ready")
return None return None
def get_best_worker_for_model(self, process_type: str, model_path: str) -> Optional[str]: def get_best_worker_for_model(self, process_type: str, model_path: str) -> Optional[str]:
......
...@@ -134,9 +134,9 @@ class QueueManager: ...@@ -134,9 +134,9 @@ class QueueManager:
from .database import update_queue_status from .database import update_queue_status
update_queue_status(job['id'], 'processing', {'job_id': job_id, 'status': 'Assigned to worker'}, job_id=job_id) update_queue_status(job['id'], 'processing', {'job_id': job_id, 'status': 'Assigned to worker'}, job_id=job_id)
else: else:
# No worker available, leave in queue for retry # No distributed worker available, fall back to local processing
print(f"No worker available for job {job['id']}, will retry later") print(f"No distributed worker available for job {job['id']}, falling back to local processing")
# Don't update status, leave as 'queued' self._execute_local_job(job)
except Exception as e: except Exception as e:
from .database import update_queue_status from .database import update_queue_status
update_queue_status(job['id'], 'failed', error_message=str(e)) update_queue_status(job['id'], 'failed', error_message=str(e))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment