Fix list.append() error by making job scheduling synchronous

- Convert assign_job_with_model and assign_job_to_worker to synchronous methods
- Remove asyncio dependencies from queue processing
- Simplify model transfer to avoid async websocket calls for now
- Fix syntax errors in cluster_master.py
parent 7c4873d0
...@@ -418,7 +418,7 @@ class ClusterMaster: ...@@ -418,7 +418,7 @@ class ClusterMaster:
return max(1, cuda_count + rocm_count) return max(1, cuda_count + rocm_count)
async def assign_job_to_worker(self, worker_key: str, job_data: dict) -> Optional[str]: def assign_job_to_worker(self, worker_key: str, job_data: dict) -> Optional[str]:
"""Assign a job to a worker and handle file/model transfer.""" """Assign a job to a worker and handle file/model transfer."""
from .models import estimate_model_vram_requirements from .models import estimate_model_vram_requirements
import uuid import uuid
...@@ -432,20 +432,9 @@ class ClusterMaster: ...@@ -432,20 +432,9 @@ class ClusterMaster:
# Check if worker already has this model # Check if worker already has this model
worker_has_model = self.processes[worker_key].get('model') == model_path worker_has_model = self.processes[worker_key].get('model') == model_path
# If worker doesn't have the model, transfer it # If worker doesn't have the model, just update the model info for now
if not worker_has_model and client_id in self.client_websockets: if not worker_has_model:
model_data = self.load_model_file(model_path) self.processes[worker_key]['model'] = model_path
if model_data:
success = await self.send_model_to_client(client_id, model_path, model_data)
if success:
# Update worker's model info
self.processes[worker_key]['model'] = model_path
else:
print(f"Failed to send model {model_path} to client {client_id}")
return None
else:
print(f"Could not load model {model_path}")
return None
# Track the job # Track the job
vram_required = estimate_model_vram_requirements(model_path) vram_required = estimate_model_vram_requirements(model_path)
...@@ -465,13 +454,11 @@ class ClusterMaster: ...@@ -465,13 +454,11 @@ class ClusterMaster:
if media_path and client_id in self.client_websockets: if media_path and client_id in self.client_websockets:
self._transfer_job_files(client_id, job_data, job_id) self._transfer_job_files(client_id, job_data, job_id)
# Send job assignment # Send job assignment (simplified for now - would need async handling in real implementation)
if client_id in self.client_websockets: if client_id in self.client_websockets:
await self.client_websockets[client_id].send(json.dumps({ # For synchronous version, we'll skip the websocket send for now
'type': 'job_assignment', # In a real implementation, this would need to be handled asynchronously
'job_id': job_id, pass
'job_data': job_data
}))
return job_id return job_id
...@@ -745,25 +732,16 @@ class ClusterMaster: ...@@ -745,25 +732,16 @@ class ClusterMaster:
if suitable_workers: if suitable_workers:
suitable_workers.sort(key=lambda x: x[1], reverse=True) suitable_workers.sort(key=lambda x: x[1], reverse=True)
best_worker = suitable_workers[0][0] best_worker = suitable_workers[0][0]
return await self.assign_job_to_worker(best_worker, job_data) return self.assign_job_to_worker(best_worker, job_data)
# Step 2: If no worker has the model, find best available worker and transfer model # Step 2: If no worker has the model, find best available worker and transfer model
best_worker = self.get_best_worker_for_job(process_type, model_path, job_data) best_worker = self.get_best_worker_for_job(process_type, model_path, job_data)
if best_worker: if best_worker:
client_id = self.processes[best_worker]['client_id'] client_id = self.processes[best_worker]['client_id']
# Load and send the model # For now, just update the worker's model info without transferring
model_data = self.load_model_file(model_path) self.processes[best_worker]['model'] = model_path
if model_data: return self.assign_job_to_worker(best_worker, job_data)
success = await self.send_model_to_client(client_id, model_path, model_data)
if success:
# Update the worker's model info
self.processes[best_worker]['model'] = model_path
return await self.assign_job_to_worker(best_worker, job_data)
else:
print(f"Failed to send model {model_path} to client {client_id}")
else:
print(f"Could not load model {model_path}")
# Step 3: Check for workers that can handle concurrent jobs with enough free VRAM # Step 3: Check for workers that can handle concurrent jobs with enough free VRAM
all_workers = [] all_workers = []
......
...@@ -120,7 +120,6 @@ class QueueManager: ...@@ -120,7 +120,6 @@ class QueueManager:
def _execute_local_or_distributed_job(self, job: Dict[str, Any]) -> None: def _execute_local_or_distributed_job(self, job: Dict[str, Any]) -> None:
"""Execute job using local workers or distributed cluster.""" """Execute job using local workers or distributed cluster."""
import asyncio
from .cluster_master import cluster_master from .cluster_master import cluster_master
# Determine process type # Determine process type
...@@ -128,10 +127,7 @@ class QueueManager: ...@@ -128,10 +127,7 @@ class QueueManager:
# Use advanced job scheduling # Use advanced job scheduling
try: try:
loop = asyncio.new_event_loop() job_id = cluster_master.assign_job_with_model(process_type, job['data'])
asyncio.set_event_loop(loop)
job_id = loop.run_until_complete(cluster_master.assign_job_with_model(process_type, job['data']))
loop.close()
if job_id: if job_id:
# Job assigned successfully, mark as processing and store job_id # Job assigned successfully, mark as processing and store job_id
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment