Fix cluster client Message object and job re-assignment issues

- Fixed cluster_client.py to send proper Message objects instead of dicts to backend_comm.send_message()
- Modified queue.py to prevent failed jobs from being immediately re-assigned to distributed processing
- Jobs with retry_count > 0 now use local processing to avoid loops with failing distributed workers
parent e449b4a6
......@@ -309,11 +309,11 @@ class ClusterClient:
backend_comm.connect()
# Send job to backend
job_message = {
'msg_type': f'{process_type}_request',
'msg_id': job_id,
'data': job_data
}
job_message = Message(
msg_type=f'{process_type}_request',
msg_id=job_id,
data=job_data
)
backend_comm.send_message(job_message)
print(f"Job {job_id} forwarded to local backend for {process_type} processing")
......@@ -342,11 +342,11 @@ class ClusterClient:
backend_comm.connect()
# Send get_result request
result_request = {
'msg_type': 'get_result',
'msg_id': f'poll_{job_id}',
'data': {'request_id': job_id}
}
result_request = Message(
msg_type='get_result',
msg_id=f'poll_{job_id}',
data={'request_id': job_id}
)
backend_comm.send_message(result_request)
# Try to receive response
......
......@@ -207,15 +207,22 @@ class QueueManager:
else:
process_type = request_type
# Check if distributed worker is available
if self._has_distributed_worker(process_type):
# Check if this job has failed before (has retry_count)
retry_count = job.get('retry_count', 0)
# For jobs that have failed before, prefer local processing to avoid
# immediate re-assignment to the same failing distributed worker
if retry_count == 0 and self._has_distributed_worker(process_type):
# Mark as processing, cluster master will assign it
from .database import update_queue_status
update_queue_status(job['id'], 'processing', {'status': 'Waiting for assignment'})
print(f"Job {job['id']} marked for distributed processing")
return
# Fall back to local processing
# Fall back to local processing (also used for retried jobs)
if retry_count > 0:
print(f"Job {job['id']} failed before (retry {retry_count}), using local processing")
else:
print(f"No suitable distributed worker available for job {job['id']}, falling back to local processing")
self._execute_local_job(job)
......
Markdown is supported
0% Try again or attach a new file
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment