Add immediate job assignment notification when workers register

parent 28c25ebb
...@@ -144,6 +144,17 @@ def handle_worker_message(message: Message, client_sock) -> None: ...@@ -144,6 +144,17 @@ def handle_worker_message(message: Message, client_sock) -> None:
if worker_type: if worker_type:
worker_sockets[worker_type] = client_sock worker_sockets[worker_type] = client_sock
print(f"Worker {worker_type} registered") print(f"Worker {worker_type} registered")
# Notify cluster master that a worker has registered
try:
import socket
notification_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
notification_sock.connect(('localhost', 5004)) # Cluster master port + 1
notification_sock.sendall(b"worker_registered")
notification_sock.close()
print(f"Notified cluster master of worker {worker_type} registration")
except Exception as e:
print(f"Failed to notify cluster master: {e}")
elif message.msg_type == 'progress': elif message.msg_type == 'progress':
# Store progress update for web to poll and update progress timestamp # Store progress update for web to poll and update progress timestamp
progress_key = f"progress_{message.data.get('job_id')}" progress_key = f"progress_{message.data.get('job_id')}"
......
...@@ -146,6 +146,12 @@ class ClusterMaster: ...@@ -146,6 +146,12 @@ class ClusterMaster:
start_server = websockets.serve(self._handle_client, self.host, self.port, ssl=ssl_context) start_server = websockets.serve(self._handle_client, self.host, self.port, ssl=ssl_context)
await start_server await start_server
# Start notification server for backend notifications
notification_server = await asyncio.start_server(
self._handle_notification, 'localhost', self.port + 1
)
print(f"Cluster master notification server started on port {self.port + 1}")
# Register local processes if master has weight # Register local processes if master has weight
if self.weight > 0: if self.weight > 0:
await self._register_local_processes() await self._register_local_processes()
...@@ -227,6 +233,21 @@ class ClusterMaster: ...@@ -227,6 +233,21 @@ class ClusterMaster:
if client_id: if client_id:
self._remove_client(client_id) self._remove_client(client_id)
async def _handle_notification(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    """Serve one connection on the backend notification socket.

    Reads at most 1024 bytes from ``reader``, decodes them and strips
    surrounding whitespace. When the text is exactly ``worker_registered``
    the pending-job check is awaited right away; any other payload is
    ignored. Errors raised while reading or checking are printed rather
    than propagated, and ``writer`` is closed in every case.

    Args:
        reader: Stream carrying the notification payload.
        writer: Stream for the same connection; closed before returning.
    """
    try:
        payload = await reader.read(1024)
        notification = payload.decode().strip()
        if notification != "worker_registered":
            # Unknown notification: nothing to do besides closing the connection.
            return
        print("Received worker registration notification - checking for pending jobs")
        # A worker just came online, so look for queued work immediately.
        await self._check_pending_jobs()
    except Exception as exc:
        print(f"Notification handling error: {exc}")
    finally:
        writer.close()
        await writer.wait_closed()
async def _process_message(self, message: Dict[str, Any], websocket: websockets.WebSocketServerProtocol) -> Optional[Dict[str, Any]]: async def _process_message(self, message: Dict[str, Any], websocket: websockets.WebSocketServerProtocol) -> Optional[Dict[str, Any]]:
"""Process a message from a client.""" """Process a message from a client."""
msg_type = message.get('type') msg_type = message.get('type')
...@@ -1424,6 +1445,57 @@ class ClusterMaster: ...@@ -1424,6 +1445,57 @@ class ClusterMaster:
self._remove_client(client_id) self._remove_client(client_id)
return False return False
async def _check_pending_jobs(self) -> None:
    """Look up queued jobs and try to hand each one to a worker.

    Selects every row of ``processing_queue`` whose status is ``queued``.
    Each row that is not already tracked in ``self.active_jobs`` is marked
    ``processing``, a worker is chosen with ``select_worker_for_job`` and
    the job is sent with ``assign_job_to_worker``. If no worker is found,
    or the assignment returns a falsy job id, the row is put back to
    ``queued`` with an explanatory error.

    The "Found N pending job(s)" summary and the per-job "waiting" line are
    printed at most once every 10 seconds, tracked through
    ``self.last_job_status_print``.

    NOTE(review): this coroutine is awaited from both the management loop
    and the notification handler, and it suspends at
    ``assign_job_to_worker``. Two overlapping runs could presumably pick up
    the same row - confirm whether a lock is needed.
    """
    # Import once up front instead of on every loop iteration.
    from .database import get_db_connection, update_queue_status

    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute('SELECT * FROM processing_queue WHERE status = ?', ('queued',))
        jobs = cursor.fetchall()
    finally:
        # Release the connection even when the query itself fails.
        conn.close()

    current_time = time.time()
    should_print_status = bool(jobs) and (current_time - self.last_job_status_print) >= 10
    if should_print_status:
        print(f"Found {len(jobs)} pending job(s)")
        self.last_job_status_print = current_time

    # Queue request types that map onto differently named process types.
    process_type_aliases = {'analyze': 'analysis', 'train': 'training'}

    for job_row in jobs:
        job = dict(job_row)
        queue_id = job['id']

        # Check if already assigned
        if any(job_info.get('queue_id') == queue_id for job_info in self.active_jobs.values()):
            continue

        # Set status to processing before assigning
        update_queue_status(queue_id, 'processing')

        # An empty column (or a payload that is not a JSON object) used to
        # become None and crash on ``.get`` below, leaving the row stuck in
        # 'processing'. Fall back to an empty dict so defaults apply.
        parsed = json.loads(job['data']) if job['data'] else None
        job_data = parsed if isinstance(parsed, dict) else {}
        job['data'] = job_data

        request_type = job['request_type']
        process_type = process_type_aliases.get(request_type, request_type)

        if should_print_status:
            print(f"Job {queue_id} waiting for available workers")

        worker_key = self.select_worker_for_job(
            process_type,
            job_data.get('model_path', 'Qwen/Qwen2.5-VL-7B-Instruct'),
            job_data,
        )
        if not worker_key:
            print(f"No suitable worker found for job {queue_id}, re-queuing")
            update_queue_status(queue_id, 'queued', error='No suitable worker found, re-queued')
            continue

        job_id = await self.assign_job_to_worker(worker_key, job_data, queue_id=queue_id)
        if not job_id:
            print(f"Failed to assign job {queue_id} to worker {worker_key}, re-queuing")
            update_queue_status(queue_id, 'queued', error='Failed to assign to worker, re-queued')
            continue

        # Include worker information in the result
        worker_info = {
            'job_id': job_id,
            'status': 'Assigned to worker',
            'worker': worker_key,
            'worker_type': self.processes[worker_key]['name'],
        }
        update_queue_status(queue_id, 'processing', worker_info, job_id=job_id)
        print(f"Assigned job {queue_id} ({job_id}) to worker {worker_key}")
async def _management_loop(self) -> None: async def _management_loop(self) -> None:
"""Main management loop.""" """Main management loop."""
while self.running: while self.running:
...@@ -1441,54 +1513,7 @@ class ClusterMaster: ...@@ -1441,54 +1513,7 @@ class ClusterMaster:
self._remove_client(client_id) self._remove_client(client_id)
# Poll for jobs to assign # Poll for jobs to assign
from .database import get_db_connection await self._check_pending_jobs()
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute('SELECT * FROM processing_queue WHERE status = ?', ('queued',))
jobs = cursor.fetchall()
conn.close()
current_time = time.time()
should_print_status = jobs and (current_time - self.last_job_status_print) >= 10
if should_print_status:
print(f"Found {len(jobs)} pending job(s)")
self.last_job_status_print = current_time
for job_row in jobs:
job = dict(job_row)
# Check if already assigned
already_assigned = any(job_info.get('queue_id') == job['id'] for job_info in self.active_jobs.values())
if already_assigned:
continue
# Set status to processing before assigning
from .database import update_queue_status
update_queue_status(job['id'], 'processing')
job['data'] = json.loads(job['data']) if job['data'] else None
process_type = 'analysis' if job['request_type'] == 'analyze' else ('training' if job['request_type'] == 'train' else job['request_type'])
if should_print_status:
print(f"Job {job['id']} waiting for available workers")
worker_key = self.select_worker_for_job(process_type, job['data'].get('model_path', 'Qwen/Qwen2.5-VL-7B-Instruct'), job['data'])
if worker_key:
job_id = await self.assign_job_to_worker(worker_key, job['data'], queue_id=job['id'])
if job_id:
# Include worker information in the result
worker_info = {
'job_id': job_id,
'status': 'Assigned to worker',
'worker': worker_key,
'worker_type': self.processes[worker_key]['name']
}
update_queue_status(job['id'], 'processing', worker_info, job_id=job_id)
print(f"Assigned job {job['id']} ({job_id}) to worker {worker_key}")
else:
print(f"Failed to assign job {job['id']} to worker {worker_key}, re-queuing")
update_queue_status(job['id'], 'queued', error='Failed to assign to worker, re-queued')
else:
print(f"No suitable worker found for job {job['id']}, re-queuing")
update_queue_status(job['id'], 'queued', error='No suitable worker found, re-queued')
# Check for cancelled jobs that need to be stopped # Check for cancelled jobs that need to be stopped
cancelled_jobs = [] cancelled_jobs = []
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment