Add job result monitoring for local job assignment

- Local jobs now monitor for completion and handle results
- Prevents jobs from hanging indefinitely when their result is never retrieved
parent 9b864708
@@ -608,6 +608,9 @@ class ClusterMaster:
             backend_comm.send_message(job_message)
             print(f"Job {job_id} forwarded to local backend for {process_type} processing")
+            # Start monitoring for result
+            self.pending_jobs[job_id] = asyncio.create_task(self._monitor_job_result(job_id, process_type))
             return job_id
         except Exception as e:
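
Note: storing the task in `self.pending_jobs` also keeps a strong reference to it, which matters because the event loop holds only weak references to tasks created with `asyncio.create_task`; an unreferenced monitor could be garbage-collected before it finishes. The dictionary then gives a natural hook for shutdown. A minimal sketch of such a cleanup step, assuming `pending_jobs` maps job IDs to tasks (`shutdown_monitors` is hypothetical, not part of this commit):

```python
import asyncio

async def shutdown_monitors(pending_jobs: dict[str, asyncio.Task]) -> None:
    """Cancel outstanding monitor tasks and wait for them to unwind."""
    for task in pending_jobs.values():
        task.cancel()  # raises CancelledError inside the monitor coroutine
    # return_exceptions=True so one failed monitor does not mask the others
    await asyncio.gather(*pending_jobs.values(), return_exceptions=True)
    pending_jobs.clear()
```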
@@ -618,6 +621,65 @@ class ClusterMaster:
                 del self.active_jobs[job_id]
             return None
+
+    async def _monitor_job_result(self, job_id: str, process_type: str) -> None:
+        """Monitor for job result from local backend."""
+        try:
+            from .comm import SocketCommunicator
+            from .config import get_backend_web_port
+
+            # Poll for result
+            for _ in range(300):  # Poll for up to 5 minutes (300 * 1s)
+                try:
+                    backend_comm = SocketCommunicator(host='localhost', port=get_backend_web_port(), comm_type='tcp')
+                    backend_comm.connect()
+
+                    # Send get_result request
+                    result_request = Message(
+                        msg_type='get_result',
+                        msg_id=f'poll_{job_id}',
+                        data={'request_id': job_id}
+                    )
+                    backend_comm.send_message(result_request)
+
+                    # Try to receive response
+                    response = backend_comm.receive_message()
+
+                    if response and response.get('msg_type') in ['analyze_response', 'train_response']:
+                        result_data = response.get('data', {})
+                        print(f"Received result for job {job_id}")
+
+                        # Handle result
+                        await self._handle_job_result({
+                            'job_id': job_id,
+                            'result': result_data
+                        })
+
+                        # Clean up
+                        if job_id in self.pending_jobs:
+                            del self.pending_jobs[job_id]
+                        return
+                    elif response and response.get('msg_type') == 'result_pending':
+                        # Result not ready yet, wait and try again
+                        await asyncio.sleep(1)
+                        continue
+                except Exception as e:
+                    print(f"Error polling for job {job_id} result: {e}")
+
+                await asyncio.sleep(1)
+
+            # Timeout - job took too long
+            print(f"Job {job_id} timed out waiting for result")
+            await self._handle_job_result({
+                'job_id': job_id,
+                'result': {'status': 'failed', 'error': 'Job timed out'}
+            })
+        except Exception as e:
+            print(f"Error monitoring job {job_id}: {e}")
+        finally:
+            if job_id in self.pending_jobs:
+                del self.pending_jobs[job_id]
 
     async def _transfer_job_files(self, client_id: str, job_data: dict, job_id: str) -> None:
         """Transfer job files to client."""
         media_path = job_data.get('local_path')
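
The 300-iteration counter approximates a 5-minute limit, but each iteration also spends unbounded time in `connect()` and `receive_message()`, so the actual wait can exceed 5 minutes. A wall-clock deadline keeps the bound tight. A minimal sketch of that variant, with `fetch_result` as a hypothetical stand-in for one `get_result` round trip:

```python
import asyncio
from typing import Optional

async def fetch_result(job_id: str) -> Optional[dict]:
    # Hypothetical stub: the real version would send 'get_result' over the
    # socket and return None while the backend reports the result as pending.
    return None

async def poll_for_result(job_id: str, interval: float = 1.0, timeout: float = 300.0) -> dict:
    """Poll until a result arrives or the wall-clock deadline passes."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while loop.time() < deadline:
        result = await fetch_result(job_id)
        if result is not None:
            return result
        await asyncio.sleep(interval)  # still pending; back off before retrying
    return {'status': 'failed', 'error': 'Job timed out'}
```

If `SocketCommunicator` is synchronous, wrapping each round trip in `asyncio.to_thread(...)` would also keep the blocking socket I/O from stalling the event loop while this coroutine polls.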
@@ -1047,8 +1109,8 @@ class ClusterMaster:
                 total_vram = 0  # CPU has no VRAM limit
                 print(f"DEBUG: CPU-only worker detected")
 
-            # Check if worker has enough VRAM (skip check for CPU workers)
-            has_sufficient_vram = total_vram >= required_vram_gb or not has_gpu
+            # Check if worker has enough VRAM (skip check for CPU workers or local workers)
+            has_sufficient_vram = total_vram >= required_vram_gb or not has_gpu or client_id == 'local'
             if has_sufficient_vram:
                 available_workers.append((proc_key, client_info['weight'], total_vram))
                 print(f"DEBUG: Worker {proc_key} accepted (VRAM: {total_vram}GB, required: {required_vram_gb}GB)")