Implement client failure tracking and exclusion

- Added a consecutive_failures counter and a failing flag to per-client tracking state
- Increment failure counter on job failures, reset on success
- Mark clients as failing after 3 consecutive failures
- Exclude failing clients from worker selection in all methods
- Reset failure tracking when clients reconnect
- This prevents problematic clients from receiving jobs until they reconnect
parent 20795a80
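
For reference, a minimal sketch (not part of the commit) of the failure-tracking rule the bullet points above describe. The helper name and the bare clients dict are illustrative stand-ins for ClusterMaster's internal state; only the threshold of 3 consecutive failures comes from the diff below.

# Illustrative sketch of the per-client failure-tracking rule described above.
FAILURE_THRESHOLD = 3  # matches the threshold used in the diff below

def record_job_result(clients, client_id, job_failed):
    """Update one client's consecutive-failure counter and 'failing' flag."""
    client = clients.get(client_id)
    if client is None:
        return
    if job_failed:
        client['consecutive_failures'] += 1
        if client['consecutive_failures'] >= FAILURE_THRESHOLD:
            client['failing'] = True
    else:
        # Any successful job clears the streak and re-enables the client.
        client['consecutive_failures'] = 0
        client['failing'] = False
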
@@ -252,7 +252,9 @@ class ClusterMaster:
             'gpu_info': gpu_info,
             'available_backends': available_backends,
             'connected': True,
-            'last_seen': time.time()
+            'last_seen': time.time(),
+            'consecutive_failures': 0,  # Track consecutive job failures
+            'failing': False  # Mark as failing after 3 consecutive failures
         }
         # If this is the first client and weight wasn't explicitly set, change master weight to 0
@@ -371,6 +373,10 @@ class ClusterMaster:
             client_id = self.processes[proc_key]['client_id']
             client_info = self.clients[client_id]
+            # Skip failing clients
+            if client_info.get('failing', False):
+                continue
             # Check if client has GPU
             gpu_info = client_info.get('gpu_info', {})
             has_gpu = gpu_info.get('cuda', False) or gpu_info.get('rocm', False)
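
The same failing guard is repeated in each worker-selection loop below; conceptually the filter reduces to something like this sketch (the generator name is made up, and processes/clients stand in for the dicts used in the diff).

# Illustrative only: workers whose owning client is flagged as failing are
# skipped during selection, exactly like the `continue` guard above.
def eligible_workers(processes, clients):
    for proc_key, proc in processes.items():
        client_info = clients.get(proc.get('client_id'), {})
        if client_info.get('failing', False):
            continue  # excluded until the client reconnects or succeeds again
        yield proc_key, client_info
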
@@ -593,17 +599,51 @@ class ClusterMaster:
         result = message.get('result', {})
         if job_id:
-            # Mark job as completed
+            # Check if job failed
+            job_failed = result.get('status') == 'failed'
+            # Always clean up resources
             self.complete_job(job_id, result)
+            # Get the client that executed this job
+            job_info = self.active_jobs.get(job_id)
+            client_id = job_info.get('client_id') if job_info else None
+            # Track consecutive failures per client
+            if client_id and client_id in self.clients:
+                if job_failed:
+                    self.clients[client_id]['consecutive_failures'] += 1
+                    if self.clients[client_id]['consecutive_failures'] >= 3:
+                        self.clients[client_id]['failing'] = True
+                        print(f"Client {client_id} marked as failing after {self.clients[client_id]['consecutive_failures']} consecutive failures")
+                else:
+                    # Reset failure counter on success
+                    if self.clients[client_id]['consecutive_failures'] > 0:
+                        print(f"Client {client_id} failure counter reset (was {self.clients[client_id]['consecutive_failures']})")
+                    self.clients[client_id]['consecutive_failures'] = 0
+                    self.clients[client_id]['failing'] = False
             # Update database if this job_id corresponds to a queue job
             # The job_id from cluster master is like "job_1234567890_123"
             # We need to find the corresponding queue entry
             from .database import get_queue_by_job_id, update_queue_status
             queue_entry = get_queue_by_job_id(job_id)
             if queue_entry:
-                update_queue_status(queue_entry['id'], 'completed', result)
-                print(f"Job {job_id} completed")
+                if job_failed:
+                    # Get current retry count
+                    current_retry_count = queue_entry.get('retry_count', 0)
+                    new_retry_count = current_retry_count + 1
+                    if new_retry_count >= 5:
+                        # Mark as permanently failed after 5 retries
+                        update_queue_status(queue_entry['id'], 'failed', result, error=f'Job failed after {new_retry_count} attempts: {result.get("error", "Unknown error")}', retry_count=new_retry_count)
+                        print(f"Job {job_id} failed permanently after {new_retry_count} attempts")
+                    else:
+                        # Re-queue failed jobs with incremented retry count
+                        update_queue_status(queue_entry['id'], 'queued', result, error=f'Job failed on worker (attempt {new_retry_count}/5): {result.get("error", "Unknown error")}', retry_count=new_retry_count)
+                        print(f"Job {job_id} failed, re-queued (attempt {new_retry_count}/5)")
+                else:
+                    update_queue_status(queue_entry['id'], 'completed', result)
+                    print(f"Job {job_id} completed")
             # TODO: Store result for retrieval by web interface
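
The re-queue branch above caps retries at 5 attempts before marking a queue entry as permanently failed; as a standalone decision rule it might look like the following sketch (the function and its tuple return value are illustrative, not part of the module).

# Illustrative distillation of the retry policy above: a failed job is
# re-queued until it has failed MAX_ATTEMPTS times, then marked 'failed'.
MAX_ATTEMPTS = 5

def next_queue_state(current_retry_count):
    """Return (new_status, new_retry_count) for a job that just failed."""
    new_retry_count = current_retry_count + 1
    if new_retry_count >= MAX_ATTEMPTS:
        return 'failed', new_retry_count
    return 'queued', new_retry_count
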
@@ -764,10 +804,16 @@ class ClusterMaster:
         for proc_key, proc_weight in queue:
             client_id = self.processes[proc_key]['client_id']
-            client_gpu_info = self.clients[client_id]['gpu_info']
+            client_info = self.clients[client_id]
+            # Skip failing clients
+            if client_info.get('failing', False):
+                continue
+            client_gpu_info = client_info['gpu_info']
             has_gpu = client_gpu_info.get('cuda', False) or client_gpu_info.get('rocm', False)
-            client_weight = self.clients[client_id]['weight']
+            client_weight = client_info['weight']
             combined_weight = client_weight * proc_weight
             if has_gpu:
@@ -806,6 +852,12 @@ class ClusterMaster:
         for proc_key, proc_weight in self.process_queue.get(process_type, []):
             client_id = self.processes[proc_key]['client_id']
             client_info = self.clients[client_id]
+            # Skip failing clients
+            if client_info.get('failing', False):
+                print(f"DEBUG: Skipping failing client {client_id}")
+                continue
             gpu_info = client_info.get('gpu_info', {})
             print(f"DEBUG: Checking worker {proc_key} on client {client_id}")
@@ -1020,7 +1072,9 @@ class ClusterMaster:
                 update_queue_status(job['id'], 'processing', {'job_id': job_id, 'status': 'Assigned to worker'}, job_id=job_id)
                 print(f"Assigned job {job['id']} to worker {worker_key} with job_id {job_id}")
             else:
-                print(f"Failed to assign job {job['id']} to worker {worker_key}")
+                print(f"Failed to assign job {job['id']} to worker {worker_key}, re-queuing")
+                from .database import update_queue_status
+                update_queue_status(job['id'], 'queued', error='Failed to assign to worker, re-queued')
         else:
             print(f"No suitable worker found for job {job['id']}, re-queuing")
             from .database import update_queue_status
...