Modify the cluster master to poll the database for jobs to assign

parent ee3095c3
......@@ -32,6 +32,7 @@ import ipaddress
import sys
from typing import Dict, Any, List, Optional
from collections import defaultdict
import socketserver
class ClusterMaster:
......@@ -995,7 +996,27 @@ class ClusterMaster:
for client_id in dead_clients:
self._remove_client(client_id)
await asyncio.sleep(10)
# Poll for jobs to assign
from .database import get_db_connection
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute('SELECT * FROM processing_queue WHERE status = ? AND (job_id IS NULL OR job_id = ?)', ('processing', ''))
jobs = cursor.fetchall()
conn.close()
for job_row in jobs:
job = dict(job_row)
job['data'] = json.loads(job['data']) if job['data'] else None
process_type = 'analysis' if job['request_type'] == 'analyze' else ('training' if job['request_type'] == 'train' else job['request_type'])
worker_key = self.select_worker_for_job(process_type, job['data'].get('model_path', 'Qwen/Qwen2.5-VL-7B-Instruct'), job['data'])
if worker_key:
job_id = self.assign_job_to_worker(worker_key, job['data'])
if job_id:
from .database import update_queue_status
update_queue_status(job['id'], 'processing', {'job_id': job_id, 'status': 'Assigned to worker'}, job_id=job_id)
print(f"Assigned job {job['id']} to worker {worker_key} with job_id {job_id}")
await asyncio.sleep(5) # Poll every 5 seconds
except KeyboardInterrupt:
self.running = False
......
......@@ -191,8 +191,6 @@ class QueueManager:
def _execute_local_or_distributed_job(self, job: Dict[str, Any]) -> None:
"""Execute job using local workers or distributed cluster."""
from .cluster_master import cluster_master
# Determine process type - map to cluster master naming convention
request_type = job['request_type']
if request_type == 'analyze':
......@@ -201,33 +199,18 @@ class QueueManager:
process_type = 'training'
else:
process_type = request_type
model_path = job['data'].get('model_path', 'Qwen/Qwen2.5-VL-7B-Instruct')
# Use advanced job scheduling with VRAM requirements
try:
worker_key = cluster_master.select_worker_for_job(process_type, model_path, job['data'])
if worker_key:
# Assign job to the selected worker
job_id = cluster_master.assign_job_to_worker(worker_key, job['data'])
if job_id:
# Job assigned successfully, mark as processing and store job_id
# Check if distributed worker is available
if self._has_distributed_worker(process_type):
# Mark as processing, cluster master will assign it
from .database import update_queue_status
update_queue_status(job['id'], 'processing', {'job_id': job_id, 'status': 'Assigned to worker'}, job_id=job_id)
print(f"Job {job['id']} assigned to worker {worker_key} with job_id {job_id}")
else:
# Failed to assign job, fall back to local processing
print(f"Failed to assign job {job['id']} to worker {worker_key}, falling back to local processing")
self._execute_local_job(job)
else:
# No distributed worker available, fall back to local processing
update_queue_status(job['id'], 'processing', {'status': 'Waiting for assignment'})
print(f"Job {job['id']} marked for distributed processing")
return
# Fall back to local processing
print(f"No suitable distributed worker available for job {job['id']}, falling back to local processing")
self._execute_local_job(job)
except Exception as e:
print(f"Error in distributed job execution for job {job['id']}: {e}")
from .database import update_queue_status
update_queue_status(job['id'], 'failed', error_message=str(e))
def _send_to_distributed_worker(self, job: Dict[str, Any], worker_key: str) -> None:
"""Send job to distributed worker."""
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment