Fix job execution in cluster: implement proper job assignment and result handling

- Fixed process type mapping in queue manager ('analyze' -> 'analysis', 'train' -> 'training')
- Implemented actual job sending in cluster master assign_job_to_worker()
- Modified cluster client to forward jobs to local backend and monitor results
- Added result polling mechanism for cluster jobs
- Jobs should now execute on connected cluster workers instead of remaining queued

The issue was that jobs were being assigned but never sent to workers. Now:
1. Queue manager selects worker using VRAM-aware logic
2. Cluster master assigns job and sends it via websocket
3. Cluster client receives job and forwards to local backend
4. Cluster client polls backend for results and sends back to master
5. Results are properly returned to web interface
parent 0de46dee
...@@ -49,6 +49,7 @@ class ClusterClient: ...@@ -49,6 +49,7 @@ class ClusterClient:
self.local_processes = {} # type: Dict[str, subprocess.Popen] self.local_processes = {} # type: Dict[str, subprocess.Popen]
self.process_weights = {} # type: Dict[str, int] self.process_weights = {} # type: Dict[str, int]
self.process_models = {} # type: Dict[str, str] self.process_models = {} # type: Dict[str, str]
self.pending_jobs = {} # type: Dict[str, asyncio.Task] # job_id -> result monitoring task
self.loop = None self.loop = None
async def connect(self) -> bool: async def connect(self) -> bool:
...@@ -292,57 +293,102 @@ class ClusterClient: ...@@ -292,57 +293,102 @@ class ClusterClient:
async def _handle_job_assignment(self, message: Dict[str, Any]) -> None: async def _handle_job_assignment(self, message: Dict[str, Any]) -> None:
"""Handle job assignment from master.""" """Handle job assignment from master."""
job_id = message.get('job_id') job_id = message.get('job_id')
process_type = message.get('process_type', 'analysis')
job_data = message.get('job_data', {}) job_data = message.get('job_data', {})
# Extract job parameters print(f"Received job assignment: {job_id} for process type: {process_type}")
request_type = job_data.get('request_type', 'analyze')
model_path = job_data.get('model_path', 'Qwen/Qwen2.5-VL-7B-Instruct') # Forward job to local backend, which will route it to the appropriate worker
media_path = job_data.get('local_path') try:
prompt = job_data.get('prompt', 'Describe this image.') from .comm import SocketCommunicator
interval = job_data.get('interval', 10) from .compat import get_socket_path
# Forward to appropriate local worker # Connect to local backend
worker_type = f'{request_type}_cuda' # Assume CUDA for now backend_comm = SocketCommunicator(socket_path=get_socket_path('worker'), comm_type='unix')
if worker_type in self.local_processes: backend_comm.connect()
# Send job to local worker process
import json # Send job to backend
job_message = { job_message = {
'msg_type': f'{request_type}_request', 'msg_type': f'{process_type}_request',
'msg_id': job_id, 'msg_id': job_id,
'data': { 'data': job_data
'model_path': model_path,
'local_path': media_path,
'prompt': prompt,
'interval': interval
}
} }
# Send to worker via socket or other mechanism backend_comm.send_message(job_message)
# For now, assume workers listen on sockets print(f"Job {job_id} forwarded to local backend for {process_type} processing")
try:
import socket
worker_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
worker_socket.connect(f'/tmp/vidai_{worker_type}.sock')
worker_socket.sendall(json.dumps(job_message).encode('utf-8'))
worker_socket.close()
# Wait for result (simplified) # Start monitoring for result
await asyncio.sleep(1) # Placeholder self.pending_jobs[job_id] = asyncio.create_task(self._monitor_job_result(job_id))
except Exception as e: except Exception as e:
print(f"Failed to send job to local worker: {e}") print(f"Failed to forward job {job_id} to local backend: {e}")
await self._send_message({
'type': 'job_result',
'job_id': job_id,
'result': {'status': 'failed', 'error': str(e)}
})
else:
await self._send_message({ await self._send_message({
'type': 'job_result', 'type': 'job_result',
'job_id': job_id, 'job_id': job_id,
'result': {'status': 'failed', 'error': f'No local {worker_type} worker available'} 'result': {'status': 'failed', 'error': f'Failed to forward to worker: {str(e)}'}
}) })
async def _monitor_job_result(self, job_id: str) -> None:
"""Monitor for job result from local backend."""
try:
from .comm import SocketCommunicator
from .compat import get_socket_path
# Poll for result
for _ in range(300): # Poll for up to 5 minutes (300 * 1s)
try:
backend_comm = SocketCommunicator(socket_path=get_socket_path('worker'), comm_type='unix')
backend_comm.connect()
# Send get_result request
result_request = {
'msg_type': 'get_result',
'msg_id': f'poll_{job_id}',
'data': {'request_id': job_id}
}
backend_comm.send_message(result_request)
# Try to receive response
response = backend_comm.receive_message()
if response and response.get('msg_type') in ['analyze_response', 'train_response']:
result_data = response.get('data', {})
print(f"Received result for job {job_id}")
# Send result back to cluster master
await self._send_message({
'type': 'job_result',
'job_id': job_id,
'result': result_data
})
# Clean up
if job_id in self.pending_jobs:
del self.pending_jobs[job_id]
return
elif response and response.get('msg_type') == 'result_pending':
# Result not ready yet, wait and try again
await asyncio.sleep(1)
continue
except Exception as e:
print(f"Error polling for job {job_id} result: {e}")
await asyncio.sleep(1)
# Timeout - job took too long
print(f"Job {job_id} timed out waiting for result")
await self._send_message({
'type': 'job_result',
'job_id': job_id,
'result': {'status': 'failed', 'error': 'Job timed out'}
})
except Exception as e:
print(f"Error monitoring job {job_id}: {e}")
finally:
if job_id in self.pending_jobs:
del self.pending_jobs[job_id]
async def _handle_receive_file(self, message: Dict[str, Any]) -> None: async def _handle_receive_file(self, message: Dict[str, Any]) -> None:
"""Handle receiving a file from master.""" """Handle receiving a file from master."""
filename = message.get('filename') filename = message.get('filename')
......
...@@ -454,11 +454,34 @@ class ClusterMaster: ...@@ -454,11 +454,34 @@ class ClusterMaster:
if media_path and client_id in self.client_websockets: if media_path and client_id in self.client_websockets:
self._transfer_job_files(client_id, job_data, job_id) self._transfer_job_files(client_id, job_data, job_id)
# Send job assignment (simplified for now - would need async handling in real implementation) # Send job assignment
if client_id in self.client_websockets: if client_id in self.client_websockets:
# For synchronous version, we'll skip the websocket send for now try:
# In a real implementation, this would need to be handled asynchronously # Create the job assignment message
pass job_message = {
'type': 'job_assignment',
'job_id': job_id,
'process_type': self.processes[worker_key]['name'].split('_')[0], # 'analysis' or 'training'
'job_data': job_data
}
# Send via websocket (async)
asyncio.create_task(self.client_websockets[client_id].send(json.dumps(job_message)))
print(f"Job {job_id} assigned to worker {worker_key} on client {client_id}")
except Exception as e:
print(f"Failed to send job {job_id} to worker {worker_key}: {e}")
# Clean up the failed assignment
self.worker_jobs[worker_key].remove(job_id)
self.worker_vram_usage[worker_key] -= vram_required
del self.active_jobs[job_id]
return None
else:
print(f"Client {client_id} not connected, cannot assign job {job_id}")
# Clean up the failed assignment
self.worker_jobs[worker_key].remove(job_id)
self.worker_vram_usage[worker_key] -= vram_required
del self.active_jobs[job_id]
return None
return job_id return job_id
......
...@@ -127,8 +127,14 @@ class QueueManager: ...@@ -127,8 +127,14 @@ class QueueManager:
"""Execute job using local workers or distributed cluster.""" """Execute job using local workers or distributed cluster."""
from .cluster_master import cluster_master from .cluster_master import cluster_master
# Determine process type # Determine process type - map to cluster master naming convention
process_type = job['request_type'] # 'analyze' or 'train' request_type = job['request_type']
if request_type == 'analyze':
process_type = 'analysis'
elif request_type == 'train':
process_type = 'training'
else:
process_type = request_type
model_path = job['data'].get('model_path', 'Qwen/Qwen2.5-VL-7B-Instruct') model_path = job['data'].get('model_path', 'Qwen/Qwen2.5-VL-7B-Instruct')
# Use advanced job scheduling with VRAM requirements # Use advanced job scheduling with VRAM requirements
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment