Fix import error for get_result and modify queue to keep jobs queued when no workers available

parent 55528540
...@@ -29,6 +29,26 @@ worker_sockets = {} # type: dict ...@@ -29,6 +29,26 @@ worker_sockets = {} # type: dict
pending_results = {} # msg_id -> result message pending_results = {} # msg_id -> result message
def get_result(msg_id: str) -> dict:
"""Poll for result from backend via socket."""
import uuid
from . import comm
try:
# Send get_result request
result_msg = Message('get_result', str(uuid.uuid4()), {'request_id': msg_id})
comm.send_message(result_msg)
# Wait for response
response = comm.receive_message(timeout=5)
if response:
return response.data
else:
return {'error': 'Timeout waiting for result'}
except Exception as e:
return {'error': str(e)}
def handle_web_message(message: Message) -> Message: def handle_web_message(message: Message) -> Message:
"""Handle messages from web interface.""" """Handle messages from web interface."""
if message.msg_type == 'analyze_request': if message.msg_type == 'analyze_request':
......
...@@ -88,6 +88,41 @@ class QueueManager: ...@@ -88,6 +88,41 @@ class QueueManager:
return True return True
def _can_start_job(self, job: Dict[str, Any]) -> bool:
"""Check if a job can be started (worker available)."""
from .cluster_master import cluster_master
from .config import get_analysis_backend, get_training_backend
from .backend import worker_sockets
request_type = job['request_type']
if request_type == 'analyze':
process_type = 'analysis'
elif request_type == 'train':
process_type = 'training'
else:
process_type = request_type
model_path = job['data'].get('model_path', 'Qwen/Qwen2.5-VL-7B-Instruct')
# Check for distributed worker
worker_key = cluster_master.select_worker_for_job(process_type, model_path, job['data'])
if worker_key:
return True
# Check for local worker
if request_type == 'analyze':
backend = get_analysis_backend()
worker_key_local = f'analysis_{backend}'
elif request_type == 'train':
backend = get_training_backend()
worker_key_local = f'training_{backend}'
else:
return False
if worker_key_local in worker_sockets:
return True
return False
def _process_queue(self) -> None: def _process_queue(self) -> None:
"""Background thread to process queued jobs.""" """Background thread to process queued jobs."""
while self.running: while self.running:
...@@ -97,6 +132,7 @@ class QueueManager: ...@@ -97,6 +132,7 @@ class QueueManager:
pending = get_pending_queue_items() pending = get_pending_queue_items()
if pending: if pending:
job = pending[0] # Get highest priority job job = pending[0] # Get highest priority job
if self._can_start_job(job):
self._start_job(job) self._start_job(job)
time.sleep(1) # Check every second time.sleep(1) # Check every second
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment