Fix cluster client communication by starting local backend process

- Modified cluster client to start a local backend process alongside workers
- Backend process handles communication between cluster client and local workers
- Fixed process cleanup to properly terminate backend and worker processes
- This resolves the timeout issue when cluster client forwards jobs to local backend
parent e28db173
...@@ -129,7 +129,7 @@ class ClusterClient: ...@@ -129,7 +129,7 @@ class ClusterClient:
self.connected = False self.connected = False
async def start_local_processes(self) -> None: async def start_local_processes(self) -> None:
"""Start local worker processes based on available backends (GPU and CPU).""" """Start local backend and worker processes based on available backends (GPU and CPU)."""
from .compat import detect_gpu_backends, get_available_backends from .compat import detect_gpu_backends, get_available_backends
gpu_info = detect_gpu_backends() gpu_info = detect_gpu_backends()
...@@ -140,6 +140,14 @@ class ClusterClient: ...@@ -140,6 +140,14 @@ class ClusterClient:
# Only start processes if not already started # Only start processes if not already started
if not self.local_processes: if not self.local_processes:
# Start local backend process first
backend_cmd = [sys.executable, '-m', 'vidai.backend']
self.local_processes['backend'] = subprocess.Popen(backend_cmd)
print("Started local backend process")
# Give backend time to start up
await asyncio.sleep(2)
# Start analysis workers for available backends (including CPU) # Start analysis workers for available backends (including CPU)
for backend in available_backends: for backend in available_backends:
proc_name = f'analysis_{backend}' proc_name = f'analysis_{backend}'
...@@ -481,10 +489,13 @@ class ClusterClient: ...@@ -481,10 +489,13 @@ class ClusterClient:
except: except:
pass pass
# Clear the processes dict # Clear the processes dict (but keep backend)
backend_proc = self.local_processes.get('backend')
self.local_processes.clear() self.local_processes.clear()
self.process_weights.clear() self.process_weights.clear()
self.process_models.clear() self.process_models.clear()
if backend_proc:
self.local_processes['backend'] = backend_proc
# Check if backend is available # Check if backend is available
from .compat import get_available_backends from .compat import get_available_backends
...@@ -709,9 +720,15 @@ class ClusterClient: ...@@ -709,9 +720,15 @@ class ClusterClient:
# Final cleanup # Final cleanup
for proc in self.local_processes.values(): for proc in self.local_processes.values():
try:
proc.terminate() proc.terminate()
except:
pass
for proc in self.local_processes.values(): for proc in self.local_processes.values():
try:
proc.wait() proc.wait()
except:
pass
def start_cluster_client(host: str, port: int, token: str, optimize: bool = False, flash: bool = False, weight: int = 100, shared_dir: str = None) -> None: def start_cluster_client(host: str, port: int, token: str, optimize: bool = False, flash: bool = False, weight: int = 100, shared_dir: str = None) -> None:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment