Rate limit console messages in cluster master

- Added last_job_status_print timestamp to ClusterMaster class
- Modified _management_loop to only print job status messages once every 10 seconds
- This prevents console spam when jobs are waiting for workers
parent d98510b6
...@@ -60,6 +60,9 @@ class ClusterMaster: ...@@ -60,6 +60,9 @@ class ClusterMaster:
self.worker_jobs = defaultdict(list) # type: Dict[str, List[str]] # worker_key -> [job_ids] self.worker_jobs = defaultdict(list) # type: Dict[str, List[str]] # worker_key -> [job_ids]
self.worker_vram_usage = defaultdict(int) # type: Dict[str, int] # worker_key -> current VRAM usage in GB self.worker_vram_usage = defaultdict(int) # type: Dict[str, int] # worker_key -> current VRAM usage in GB
# Rate limiting for console messages
self.last_job_status_print = 0 # Timestamp of last job status message
def _generate_ssl_cert(self) -> ssl.SSLContext: def _generate_ssl_cert(self) -> ssl.SSLContext:
"""Generate self-signed SSL certificate for secure websockets.""" """Generate self-signed SSL certificate for secure websockets."""
cert_file = 'cluster.crt' cert_file = 'cluster.crt'
...@@ -1056,14 +1059,18 @@ class ClusterMaster: ...@@ -1056,14 +1059,18 @@ class ClusterMaster:
jobs = cursor.fetchall() jobs = cursor.fetchall()
conn.close() conn.close()
if jobs: current_time = time.time()
print(f"Found {len(jobs)} processing job(s) without assigned job_id") if jobs and (current_time - self.last_job_status_print) >= 10:
print(f"Found {len(jobs)} pending job(s)")
self.last_job_status_print = current_time
for job_row in jobs: for job_row in jobs:
job = dict(job_row) job = dict(job_row)
job['data'] = json.loads(job['data']) if job['data'] else None job['data'] = json.loads(job['data']) if job['data'] else None
process_type = 'analysis' if job['request_type'] == 'analyze' else ('training' if job['request_type'] == 'train' else job['request_type']) process_type = 'analysis' if job['request_type'] == 'analyze' else ('training' if job['request_type'] == 'train' else job['request_type'])
print(f"Attempting to assign job {job['id']} for {process_type}") if (current_time - self.last_job_status_print) >= 10:
print(f"Job {job['id']} waiting for available workers")
self.last_job_status_print = current_time
worker_key = self.select_worker_for_job(process_type, job['data'].get('model_path', 'Qwen/Qwen2.5-VL-7B-Instruct'), job['data']) worker_key = self.select_worker_for_job(process_type, job['data'].get('model_path', 'Qwen/Qwen2.5-VL-7B-Instruct'), job['data'])
if worker_key: if worker_key:
job_id = self.assign_job_to_worker(worker_key, job['data']) job_id = self.assign_job_to_worker(worker_key, job['data'])
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment