Fix job assignment timing issue by queuing jobs in backend when worker not registered

parent c0d4ffe7
......@@ -27,6 +27,7 @@ from .queue import queue_manager
worker_sockets = {} # type: dict
pending_jobs = {} # worker_key -> list of messages
pending_results = {} # msg_id -> result message
job_progress = {} # job_id -> progress data
job_progress_times = {} # job_id -> last_progress_timestamp
......@@ -75,8 +76,11 @@ def handle_web_message(message: Message, client_sock=None) -> Message:
print(f"Backend sent to worker {worker_key}")
return Message('success', message.msg_id, {'status': 'forwarded'})
else:
print(f"Backend: Worker {worker_key} not found in worker_sockets")
return Message('error', message.msg_id, {'error': f'Worker {worker_key} not available'})
print(f"Backend: Worker {worker_key} not found in worker_sockets, queuing job")
if worker_key not in pending_jobs:
pending_jobs[worker_key] = []
pending_jobs[worker_key].append(message)
return Message('success', message.msg_id, {'status': 'queued'})
elif message.msg_type == 'train_request':
backend = get_training_backend()
worker_key = f'training_{backend}'
......@@ -143,6 +147,24 @@ def handle_worker_message(message: Message, client_sock) -> None:
print(f"Worker {worker_type} registered")
print(f"DEBUG: worker_sockets keys after register: {list(worker_sockets.keys())}")
# Send any pending jobs for this worker
if worker_type in pending_jobs:
import json
for pending_msg in pending_jobs[worker_type]:
msg_dict = {
'msg_type': pending_msg.msg_type,
'msg_id': pending_msg.msg_id,
'data': pending_msg.data
}
msg_json = json.dumps(msg_dict) + '\n'
print(f"Backend sending pending job to worker {worker_type}: {msg_json.strip()}")
try:
worker_sockets[worker_type].sendall(msg_json.encode('utf-8'))
print(f"Backend sent pending job to worker {worker_type}")
except Exception as e:
print(f"Failed to send pending job to worker {worker_type}: {e}")
del pending_jobs[worker_type]
# Notify cluster master that a worker has registered
try:
import socket
......
Markdown is supported
0% Try again or attach a new file
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment