Add ping forwarding from backend to cluster_master to reset job timeouts

parent 3425ca22
...@@ -197,6 +197,16 @@ def handle_worker_message(message: Message, client_sock) -> None: ...@@ -197,6 +197,16 @@ def handle_worker_message(message: Message, client_sock) -> None:
# Update progress timestamp to reset timeout # Update progress timestamp to reset timeout
job_progress_times[job_id] = time.time() job_progress_times[job_id] = time.time()
print(f"PING received for job {job_id} - resetting timeout") print(f"PING received for job {job_id} - resetting timeout")
# Notify cluster master of ping to reset job timeout
try:
import socket
notification_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
notification_sock.connect(('localhost', 5004)) # Cluster master port + 1
notification_sock.sendall(f"ping:{job_id}".encode())
notification_sock.close()
print(f"Notified cluster master of ping for job {job_id}")
except Exception as e:
print(f"Failed to notify cluster master of ping: {e}")
elif message.msg_type in ['analyze_response', 'train_response']: elif message.msg_type in ['analyze_response', 'train_response']:
# Store result for web to poll # Store result for web to poll
pending_results[message.msg_id] = message pending_results[message.msg_id] = message
......
...@@ -242,6 +242,11 @@ class ClusterMaster: ...@@ -242,6 +242,11 @@ class ClusterMaster:
print("Received worker registration notification - checking for pending jobs") print("Received worker registration notification - checking for pending jobs")
# Trigger immediate job assignment check # Trigger immediate job assignment check
await self._check_pending_jobs() await self._check_pending_jobs()
elif message.startswith("ping:"):
job_id = message.split(":", 1)[1]
if job_id in self.active_jobs:
self.active_jobs[job_id]['last_progress'] = time.time()
print(f"Ping received for job {job_id} - resetting timeout")
except Exception as e: except Exception as e:
print(f"Notification handling error: {e}") print(f"Notification handling error: {e}")
finally: finally:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment