Store cluster processes in database for queue to check available workers

parent fd1c1545
......@@ -23,8 +23,6 @@ import time
import threading
from .comm import SocketServer, Message
from .config import get_analysis_backend, get_training_backend, set_analysis_backend, set_training_backend
from .cluster_master import start_cluster_master
import threading
worker_sockets = {} # type: dict
......@@ -131,13 +129,6 @@ def backend_process() -> None:
worker_server = SocketServer(socket_path=worker_socket_path, comm_type='unix')
worker_server.start(worker_message_handler)
# Start cluster master in background thread
from .config import get_cluster_host, get_cluster_port
cluster_host = get_cluster_host() or '0.0.0.0'
cluster_port = get_cluster_port()
cluster_thread = threading.Thread(target=start_cluster_master, args=(cluster_host, cluster_port, None, None, False), daemon=True)
cluster_thread.start()
try:
while True:
time.sleep(1)
......
......@@ -290,6 +290,15 @@ class ClusterMaster:
# Sort by weight (highest first)
self.process_queue[proc_type].sort(key=lambda x: x[1], reverse=True)
# Save to database
from .database import get_db_connection
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute('INSERT INTO cluster_processes (client_id, process_name, weight, status) VALUES (?, ?, ?, ?)',
(client_id, proc_name, proc_info.get('weight', 10), proc_info.get('status', 'active')))
conn.commit()
conn.close()
print(f"Client {client_id} registered {len(processes)} processes")
return {'type': 'registration_success'}
......@@ -302,11 +311,18 @@ class ClusterMaster:
def _remove_client(self, client_id: str) -> None:
"""Remove a client and its processes."""
from .database import disconnect_cluster_client
from .database import disconnect_cluster_client, get_db_connection
# Mark as disconnected in database
disconnect_cluster_client(client_id)
# Delete processes from database
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute('DELETE FROM cluster_processes WHERE client_id = ?', (client_id,))
conn.commit()
conn.close()
if client_id in self.client_websockets:
del self.client_websockets[client_id]
......
......@@ -628,6 +628,32 @@ def init_db(conn) -> None:
# Column might already exist
pass
# Cluster processes table
if config['type'] == 'mysql':
cursor.execute('''
CREATE TABLE IF NOT EXISTS cluster_processes (
id INT AUTO_INCREMENT PRIMARY KEY,
client_id VARCHAR(32) NOT NULL,
process_name VARCHAR(255) NOT NULL,
weight INT DEFAULT 10,
status VARCHAR(20) DEFAULT 'active',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (client_id) REFERENCES cluster_clients (client_id) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
''')
else:
cursor.execute('''
CREATE TABLE IF NOT EXISTS cluster_processes (
id INTEGER PRIMARY KEY,
client_id TEXT NOT NULL,
process_name TEXT NOT NULL,
weight INTEGER DEFAULT 10,
status TEXT DEFAULT 'active',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (client_id) REFERENCES cluster_clients (client_id)
)
''')
# Insert default admin user if not exist
import hashlib
default_password = hashlib.sha256('admin'.encode()).hexdigest()
......
......@@ -90,7 +90,6 @@ class QueueManager:
def _can_start_job(self, job: Dict[str, Any]) -> bool:
"""Check if a job can be started (worker available)."""
from .cluster_master import cluster_master
from .config import get_analysis_backend, get_training_backend
from .backend import worker_sockets
......@@ -101,11 +100,9 @@ class QueueManager:
process_type = 'training'
else:
process_type = request_type
model_path = job['data'].get('model_path', 'Qwen/Qwen2.5-VL-7B-Instruct')
# Check for distributed worker
worker_key = cluster_master.select_worker_for_job(process_type, model_path, job['data'])
if worker_key:
# Check for distributed worker via TCP query
if self._has_distributed_worker(process_type):
return True
# Check for local worker
......@@ -123,6 +120,16 @@ class QueueManager:
return False
def _has_distributed_worker(self, process_type: str) -> bool:
"""Check if there is a distributed worker available for the process type."""
from .database import get_db_connection
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute('SELECT id FROM cluster_processes WHERE process_name LIKE ? AND status = ?', (f'{process_type}%', 'active'))
result = cursor.fetchone()
conn.close()
return result is not None
def _process_queue(self) -> None:
"""Background thread to process queued jobs."""
while self.running:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment