Fix cluster master to run jobs locally when no clients connected

- Register local processes in cluster master when weight > 0
- Handle local job assignment by forwarding to backend via TCP
- Allows jobs to run locally when no cluster clients are connected
parent 6b673482
...@@ -33,6 +33,7 @@ import sys ...@@ -33,6 +33,7 @@ import sys
from typing import Dict, Any, List, Optional from typing import Dict, Any, List, Optional
from collections import defaultdict from collections import defaultdict
import socketserver import socketserver
from .comm import Message
class ClusterMaster: class ClusterMaster:
...@@ -143,6 +144,10 @@ class ClusterMaster: ...@@ -143,6 +144,10 @@ class ClusterMaster:
start_server = websockets.serve(self._handle_client, self.host, self.port, ssl=ssl_context) start_server = websockets.serve(self._handle_client, self.host, self.port, ssl=ssl_context)
await start_server await start_server
# Register local processes if master has weight
if self.weight > 0:
await self._register_local_processes()
# Start management loop in background # Start management loop in background
asyncio.create_task(self._management_loop()) asyncio.create_task(self._management_loop())
...@@ -150,6 +155,47 @@ class ClusterMaster: ...@@ -150,6 +155,47 @@ class ClusterMaster:
while self.running: while self.running:
await asyncio.sleep(1) await asyncio.sleep(1)
async def _register_local_processes(self) -> None:
    """Register the master itself as a local client for job execution.

    Creates a synthetic client entry (``local_master``) plus one process
    entry per (backend, process-type) pair and inserts them into the
    scheduling queues, so jobs can run locally when no remote cluster
    clients are connected. Called only when ``self.weight > 0``.
    """
    from .compat import detect_gpu_backends, get_available_backends

    gpu_info = detect_gpu_backends()
    available_backends = get_available_backends()

    client_id = "local_master"
    hostname = "localhost"

    # Register the master as if it were a connected client so the normal
    # scheduling path can select it without special-casing.
    self.clients[client_id] = {
        'token': 'local',
        'hostname': hostname,
        'ip_address': '127.0.0.1',
        'weight': self.weight,
        'gpu_info': gpu_info,
        'available_backends': available_backends,
        'connected': True,
        'last_seen': time.time(),
        'consecutive_failures': 0,
        'failing': False
    }

    # One process per backend/type; analysis is weighted higher than
    # training so the scheduler prefers it.
    registered = 0
    for backend in available_backends:
        for proc_type in ('analysis', 'training'):
            proc_name = f'{proc_type}_{backend}'
            proc_key = f"{client_id}:{proc_name}"
            weight = 10 if proc_type == 'analysis' else 5
            self.processes[proc_key] = {
                'client_id': client_id,
                'name': proc_name,
                'weight': weight,
                'model': 'Qwen/Qwen2.5-VL-7B-Instruct',
                'status': 'active'
            }
            self.process_queue[proc_type].append((proc_key, weight))
            registered += 1

    # Sort each queue once, after all inserts. (Previously the sort ran
    # inside the inner loop — a full re-sort per appended entry.)
    for proc_type in ('analysis', 'training'):
        self.process_queue[proc_type].sort(key=lambda entry: entry[1], reverse=True)

    print(f"Registered {registered} local processes for backends: {available_backends}")
async def _handle_client(self, websocket: websockets.WebSocketServerProtocol) -> None: async def _handle_client(self, websocket: websockets.WebSocketServerProtocol) -> None:
"""Handle a client websocket connection.""" """Handle a client websocket connection."""
client_id = None client_id = None
...@@ -481,7 +527,10 @@ class ClusterMaster: ...@@ -481,7 +527,10 @@ class ClusterMaster:
await self._transfer_job_files(client_id, job_data, job_id) await self._transfer_job_files(client_id, job_data, job_id)
# Send job assignment # Send job assignment
if client_id in self.client_websockets: if client_id == "local_master":
# Handle local job assignment
return await self._assign_local_job(worker_key, job_data)
elif client_id in self.client_websockets:
try: try:
# Create the job assignment message # Create the job assignment message
job_message = { job_message = {
...@@ -515,6 +564,59 @@ class ClusterMaster: ...@@ -515,6 +564,59 @@ class ClusterMaster:
return job_id return job_id
async def _assign_local_job(self, worker_key: str, job_data: dict) -> Optional[str]:
    """Assign a job to a local worker by forwarding it to the backend over TCP.

    Books the job into ``active_jobs`` / ``worker_jobs`` / ``worker_vram_usage``,
    then sends a ``<type>_request`` Message to the local backend web port.

    Returns:
        The generated job id on success, or ``None`` if forwarding failed
        (all bookkeeping for the job is rolled back in that case).
    """
    import uuid
    from .models import estimate_model_vram_requirements

    client_id = self.processes[worker_key]['client_id']
    model_path = job_data.get('model_path', 'Qwen/Qwen2.5-VL-7B-Instruct')
    # Process names look like "<type>_<backend>" (e.g. "analysis_cuda").
    process_type = self.processes[worker_key]['name'].split('_')[0]

    # Unique job id. A uuid suffix replaces the original
    # hash(str(job_data)) % 1000, which is salt-randomized per process
    # and collision-prone for jobs created within the same second.
    job_id = f"job_{int(time.time())}_{uuid.uuid4().hex[:8]}"

    # Track the job before dispatch so scheduler state stays consistent.
    vram_required = estimate_model_vram_requirements(model_path)
    self.active_jobs[job_id] = {
        'worker_key': worker_key,
        'client_id': client_id,
        'model_path': model_path,
        'vram_required': vram_required,
        'start_time': time.time(),
        'job_data': job_data
    }
    self.worker_jobs[worker_key].append(job_id)
    self.worker_vram_usage[worker_key] += vram_required

    # Forward job to local backend via TCP.
    try:
        from .comm import SocketCommunicator
        from .config import get_backend_web_port

        # Connect to the local backend web server.
        backend_comm = SocketCommunicator(host='localhost', port=get_backend_web_port(), comm_type='tcp')
        backend_comm.connect()
        try:
            job_message = Message(
                msg_type=f'{process_type}_request',
                msg_id=job_id,
                data=job_data
            )
            backend_comm.send_message(job_message)
        finally:
            # Release the TCP connection (the original leaked it).
            # NOTE(review): assumes SocketCommunicator exposes close();
            # guarded so a missing method does not mask send errors.
            closer = getattr(backend_comm, 'close', None)
            if callable(closer):
                closer()

        print(f"Job {job_id} forwarded to local backend for {process_type} processing")
        return job_id
    except Exception as e:
        print(f"Failed to forward job {job_id} to local backend: {e}")
        # Roll back all bookkeeping for the failed job.
        self.worker_jobs[worker_key].remove(job_id)
        self.worker_vram_usage[worker_key] -= vram_required
        del self.active_jobs[job_id]
        return None
async def _transfer_job_files(self, client_id: str, job_data: dict, job_id: str) -> None: async def _transfer_job_files(self, client_id: str, job_data: dict, job_id: str) -> None:
"""Transfer job files to client.""" """Transfer job files to client."""
media_path = job_data.get('local_path') media_path = job_data.get('local_path')
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment