Fix race condition where jobs are sent before workers register - backend now...

Fix race condition where jobs are sent before workers register - backend now returns error for unavailable workers, cluster master re-queues jobs with delay
parent d4534415
...@@ -750,6 +750,32 @@ class ClusterMaster: ...@@ -750,6 +750,32 @@ class ClusterMaster:
del self.pending_jobs[job_id] del self.pending_jobs[job_id]
return return
elif response and response.msg_type == 'error':
# Check if it's a worker not available error - re-queue the job
error_msg = response.data.get('error', '')
if 'not available' in error_msg.lower() or 'not found' in error_msg.lower():
print(f"Worker not available for job {job_id}, re-queuing after delay: {error_msg}")
# Wait a bit for worker to register, then re-queue
await asyncio.sleep(2)
from .database import update_queue_status
update_queue_status(job_id, 'queued', error=f'Worker not available, re-queued: {error_msg}')
# Clean up
if job_id in self.pending_jobs:
del self.pending_jobs[job_id]
if job_id in self.active_jobs:
self.complete_job(job_id)
return
else:
# Other error, fail the job
print(f"Job {job_id} failed with error: {error_msg}")
await self._handle_job_result({
'job_id': job_id,
'result': {'status': 'failed', 'error': error_msg}
})
if job_id in self.pending_jobs:
del self.pending_jobs[job_id]
return
elif response and response.msg_type == 'result_pending': elif response and response.msg_type == 'result_pending':
# Result not ready yet, wait and try again # Result not ready yet, wait and try again
await asyncio.sleep(1) await asyncio.sleep(1)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment