Fix job scheduling: use proper cluster master methods

- Updated queue manager to use select_worker_for_job() and assign_job_to_worker() instead of the unimplemented assign_job_with_model()
- Now properly implements VRAM-aware worker selection based on model requirements
- Jobs are assigned to a distributed worker when one with sufficient VRAM is available
- Falls back to local processing when no suitable distributed worker is found
- Added proper error handling and logging for the job assignment process
parent b25f154f
...@@ -129,20 +129,31 @@ class QueueManager: ...@@ -129,20 +129,31 @@ class QueueManager:
# Determine process type # Determine process type
process_type = job['request_type'] # 'analyze' or 'train' process_type = job['request_type'] # 'analyze' or 'train'
model_path = job['data'].get('model_path', 'Qwen/Qwen2.5-VL-7B-Instruct')
# Use advanced job scheduling # Use advanced job scheduling with VRAM requirements
try: try:
job_id = cluster_master.assign_job_with_model(process_type, job['data']) worker_key = cluster_master.select_worker_for_job(process_type, model_path, job['data'])
if worker_key:
# Assign job to the selected worker
job_id = cluster_master.assign_job_to_worker(worker_key, job['data'])
if job_id: if job_id:
# Job assigned successfully, mark as processing and store job_id # Job assigned successfully, mark as processing and store job_id
from .database import update_queue_status from .database import update_queue_status
update_queue_status(job['id'], 'processing', {'job_id': job_id, 'status': 'Assigned to worker'}, job_id=job_id) update_queue_status(job['id'], 'processing', {'job_id': job_id, 'status': 'Assigned to worker'}, job_id=job_id)
print(f"Job {job['id']} assigned to worker {worker_key} with job_id {job_id}")
else:
# Failed to assign job, fall back to local processing
print(f"Failed to assign job {job['id']} to worker {worker_key}, falling back to local processing")
self._execute_local_job(job)
else: else:
# No distributed worker available, fall back to local processing # No distributed worker available, fall back to local processing
print(f"No distributed worker available for job {job['id']}, falling back to local processing") print(f"No suitable distributed worker available for job {job['id']}, falling back to local processing")
self._execute_local_job(job) self._execute_local_job(job)
except Exception as e: except Exception as e:
print(f"Error in distributed job execution for job {job['id']}: {e}")
from .database import update_queue_status from .database import update_queue_status
update_queue_status(job['id'], 'failed', error_message=str(e)) update_queue_status(job['id'], 'failed', error_message=str(e))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment