Fix job re-queuing logic to prevent fallback to local processing

- Modified queue.py to allow retried jobs to use distributed processing when available
- Fixed async coroutine warning by adding await to _transfer_job_files call
- Jobs that fail on clients will now be properly re-queued for distributed processing instead of falling back to local workers that may not exist
parent d5d30329
...@@ -478,7 +478,7 @@ class ClusterMaster: ...@@ -478,7 +478,7 @@ class ClusterMaster:
# Handle file transfer # Handle file transfer
media_path = job_data.get('local_path') media_path = job_data.get('local_path')
if media_path and client_id in self.client_websockets: if media_path and client_id in self.client_websockets:
self._transfer_job_files(client_id, job_data, job_id) await self._transfer_job_files(client_id, job_data, job_id)
# Send job assignment # Send job assignment
if client_id in self.client_websockets: if client_id in self.client_websockets:
......
...@@ -210,20 +210,22 @@ class QueueManager: ...@@ -210,20 +210,22 @@ class QueueManager:
# Check if this job has failed before (has retry_count) # Check if this job has failed before (has retry_count)
retry_count = job.get('retry_count', 0) retry_count = job.get('retry_count', 0)
# For jobs that have failed before, prefer local processing to avoid # Check if distributed workers are available
# immediate re-assignment to the same failing distributed worker has_distributed = self._has_distributed_worker(process_type)
if retry_count == 0 and self._has_distributed_worker(process_type):
# Mark as processing, cluster master will assign it if has_distributed:
# Use distributed processing for all jobs when available
# The cluster master will handle retry logic and worker selection
from .database import update_queue_status from .database import update_queue_status
update_queue_status(job['id'], 'processing', {'status': 'Waiting for assignment'}) update_queue_status(job['id'], 'processing', {'status': 'Waiting for assignment'})
if retry_count > 0:
print(f"Job {job['id']} retry {retry_count}, marked for distributed processing")
else:
print(f"Job {job['id']} marked for distributed processing") print(f"Job {job['id']} marked for distributed processing")
return return
# Fall back to local processing (also used for retried jobs) # No distributed workers available, fall back to local processing
if retry_count > 0: print(f"No distributed workers available for job {job['id']}, falling back to local processing")
print(f"Job {job['id']} failed before (retry {retry_count}), using local processing")
else:
print(f"No suitable distributed worker available for job {job['id']}, falling back to local processing")
self._execute_local_job(job) self._execute_local_job(job)
def _send_to_distributed_worker(self, job: Dict[str, Any], worker_key: str) -> None: def _send_to_distributed_worker(self, job: Dict[str, Any], worker_key: str) -> None:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment