Unify job processing through cluster master

- Always use cluster master for job assignment, even for local jobs
- Remove separate local processing path
- Local processes are treated the same as remote ones, except for auto weight adjustment
parent 32ef8692
...@@ -206,36 +206,17 @@ class QueueManager: ...@@ -206,36 +206,17 @@ class QueueManager:
update_queue_status(job['id'], 'completed', result, used_tokens=used_tokens) update_queue_status(job['id'], 'completed', result, used_tokens=used_tokens)
def _execute_local_or_distributed_job(self, job: Dict[str, Any]) -> None:
    """Execute job using cluster master for both local and distributed processing."""
    # All jobs go through the cluster master now; it performs worker
    # selection (local or distributed) and owns the retry logic.
    from .database import update_queue_status

    # Mark the job as awaiting assignment by the cluster master.
    update_queue_status(job['id'], 'processing', {'status': 'Waiting for assignment'})

    # retry_count > 0 means this job has failed at least once before;
    # log that so retries are visible in the output.
    retry_count = job.get('retry_count', 0)
    if retry_count > 0:
        print(f"Job {job['id']} retry {retry_count}, marked for processing")
    else:
        print(f"Job {job['id']} marked for processing")
def _send_to_distributed_worker(self, job: Dict[str, Any], worker_key: str) -> None: def _send_to_distributed_worker(self, job: Dict[str, Any], worker_key: str) -> None:
"""Send job to distributed worker.""" """Send job to distributed worker."""
...@@ -273,53 +254,6 @@ class QueueManager: ...@@ -273,53 +254,6 @@ class QueueManager:
# Worker not available, fall back to local # Worker not available, fall back to local
self._execute_local_job(job) self._execute_local_job(job)
def _execute_local_job(self, job: Dict[str, Any]) -> None:
    """Execute job using local workers."""
    from .backend import handle_web_message, get_result

    # Carry the job id inside the payload so the backend can honor
    # cancellation requests for this job.
    payload = job['data'].copy()
    payload['job_id'] = job['id']

    request = Message(
        msg_type=f"{job['request_type']}_request",
        msg_id=str(job['id']),
        data=payload
    )

    # Hand the request to the backend.
    reply = handle_web_message(request)
    if reply and reply.msg_type == 'error':
        # Immediate error - re-queue the job instead of failing
        update_queue_status(job['id'], 'queued', error=reply.data.get('error', 'No workers available, re-queued'))
        return

    # The backend processes asynchronously; poll once per second for up
    # to 10 minutes (600 polls) for a terminal outcome.
    for _ in range(600):
        poll = get_result(str(job['id']))
        if 'data' in poll:
            outcome = poll['data']
            if 'result' in outcome:
                # Success - record simulated token usage around the estimate.
                import random
                estimated_tokens = job.get('estimated_tokens', 100)
                used_tokens = int(estimated_tokens * random.uniform(0.8, 1.2))
                update_queue_status(job['id'], 'completed', outcome, used_tokens=used_tokens)
                return
            if 'error' in outcome:
                # Backend reported a failure for this job.
                update_queue_status(job['id'], 'failed', error=outcome['error'])
                return
        elif poll.get('error'):
            # Timeout or other error
            update_queue_status(job['id'], 'failed', error=poll['error'])
            return
        time.sleep(1)  # Wait 1 second before polling again

    # No terminal result within the polling window.
    update_queue_status(job['id'], 'failed', error='Job processing timeout')
def stop(self) -> None: def stop(self) -> None:
"""Stop the queue manager.""" """Stop the queue manager."""
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment