Remove queue processing from QueueManager

- QueueManager now only handles job submission and management
- Job processing is handled exclusively by cluster master
- Eliminates duplicate queue processing between web and cluster processes
parent c243e798
......@@ -18,28 +18,18 @@
Queue management for concurrent processing.
"""
import threading
import time
from typing import List, Dict, Any, Optional
from .database import (
add_to_queue, get_pending_queue_items, update_queue_status,
add_to_queue, update_queue_status,
get_queue_status, get_user_queue_items, get_queue_position
)
from .config import get_max_concurrent_jobs
from .comm import Message
class QueueManager:
"""Manages processing queue and concurrent job execution."""
"""Manages job submission to the processing queue."""
def __init__(self):
print("QueueManager initialized", flush=True)
self.active_jobs = 0
self.max_concurrent = get_max_concurrent_jobs()
self.lock = threading.Lock()
self.running = True
self.worker_thread = threading.Thread(target=self._process_queue, daemon=True)
self.worker_thread.start()
self.last_status_print = 0 # Timestamp of last status message
def submit_job(self, user_id: int, request_type: str, data: dict, priority: int = 0) -> int:
......@@ -110,156 +100,6 @@ class QueueManager:
return True
def _can_start_job(self, job: Dict[str, Any]) -> bool:
"""Check if a job can be started (worker available)."""
from .config import get_analysis_backend, get_training_backend
from .backend import worker_sockets
request_type = job['request_type']
if request_type == 'analyze':
process_type = 'analysis'
elif request_type == 'train':
process_type = 'training'
else:
process_type = request_type
# Check for distributed worker via TCP query
if self._has_distributed_worker(process_type):
return True
# For local processing, always allow if no distributed workers
# The backend will handle worker availability
return True
def _has_distributed_worker(self, process_type: str) -> bool:
"""Check if there is a distributed worker available for the process type."""
from .database import get_connected_cluster_clients
clients = get_connected_cluster_clients()
return len(clients) > 0
def _process_queue(self) -> None:
"""Background thread to process queued jobs."""
print("Queue manager started", flush=True)
while self.running:
try:
with self.lock:
if self.active_jobs < self.max_concurrent:
pending = get_pending_queue_items()
if pending:
current_time = time.time()
if current_time - self.last_status_print >= 10:
print(f"Found {len(pending)} pending job(s)", flush=True)
self.last_status_print = current_time
job = pending[0] # Get highest priority job
if self._can_start_job(job):
self._start_job(job)
else:
if current_time - self.last_status_print >= 10:
print(f"Job {job['id']} waiting for available workers", flush=True)
self.last_status_print = current_time
time.sleep(1) # Check every second
except Exception as e:
print(f"Queue processing error: {e}", flush=True)
time.sleep(5)
def _start_job(self, job: Dict[str, Any]) -> None:
"""Start processing a job."""
update_queue_status(job['id'], 'processing')
self.active_jobs += 1
# Start job in separate thread
threading.Thread(target=self._execute_job, args=(job,), daemon=True).start()
def _execute_job(self, job: Dict[str, Any]) -> None:
"""Execute the job by sending to appropriate worker (local or remote)."""
try:
from .config import is_cluster_client, get_cluster_host, get_cluster_token
if is_cluster_client():
# Running as cluster client - forward to cluster master
self._execute_remote_job(job)
else:
# Running as master - use local workers or distributed
self._execute_local_or_distributed_job(job)
except Exception as e:
update_queue_status(job['id'], 'failed', error=str(e))
finally:
with self.lock:
self.active_jobs -= 1
def _execute_remote_job(self, job: Dict[str, Any]) -> None:
"""Execute job via cluster master."""
from .cluster_client import ClusterClient
from .config import get_cluster_host, get_cluster_port, get_cluster_token
# This would be handled by the cluster client bridge
# For now, simulate
time.sleep(10)
estimated_tokens = job.get('estimated_tokens', 100)
import random
used_tokens = int(estimated_tokens * random.uniform(0.8, 1.2))
result = {"status": "completed", "result": f"Processed {job['request_type']} via cluster"}
update_queue_status(job['id'], 'completed', result, used_tokens=used_tokens)
def _execute_local_or_distributed_job(self, job: Dict[str, Any]) -> None:
"""Execute job using cluster master for both local and distributed processing."""
# Always use cluster master for job processing
# The cluster master will handle worker selection (local or distributed)
from .database import update_queue_status
update_queue_status(job['id'], 'processing', {'status': 'Waiting for assignment'})
retry_count = job.get('retry_count', 0)
if retry_count > 0:
print(f"Job {job['id']} retry {retry_count}, marked for processing")
else:
print(f"Job {job['id']} marked for processing")
def _send_to_distributed_worker(self, job: Dict[str, Any], worker_key: str) -> None:
"""Send job to distributed worker."""
from .cluster_master import cluster_master
# Parse worker key
client_id, proc_name = worker_key.split(':', 1)
# Send job to the appropriate client
if client_id in cluster_master.client_sockets:
# Add job_id to data for cancellation checking
job_data = job['data'].copy()
job_data['job_id'] = job['id']
message = {
'type': 'job_request',
'job_id': job['id'],
'request_type': job['request_type'],
'data': job_data
}
cluster_master.client_sockets[client_id].sendall(
json.dumps(message).encode('utf-8') + b'\n'
)
# Wait for completion (simplified - in real implementation would be async)
time.sleep(10)
estimated_tokens = job.get('estimated_tokens', 100)
import random
used_tokens = int(estimated_tokens * random.uniform(0.8, 1.2))
result = {"status": "completed", "result": f"Processed {job['request_type']} on distributed worker"}
update_queue_status(job['id'], 'completed', result, used_tokens=used_tokens)
else:
# Worker not available, fall back to local
self._execute_local_job(job)
def stop(self) -> None:
"""Stop the queue manager."""
self.running = False
if self.worker_thread.is_alive():
self.worker_thread.join(timeout=5)
# Global queue manager instance
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment