Change local job monitoring to poll database instead of TCP

- Use get_queue_by_job_id to check job status
- More reliable than TCP polling for local jobs
parent 08a9a99e
...@@ -628,35 +628,41 @@ class ClusterMaster: ...@@ -628,35 +628,41 @@ class ClusterMaster:
return None return None
async def _monitor_job_result(self, job_id: str, process_type: str) -> None: async def _monitor_job_result(self, job_id: str, process_type: str) -> None:
"""Monitor for job result from local backend.""" """Monitor for job result by polling database."""
try: try:
from .comm import SocketCommunicator from .database import get_queue_by_job_id
from .config import get_backend_web_port import json
# Poll for result # Poll for result
for _ in range(300): # Poll for up to 5 minutes (300 * 1s) for _ in range(300): # Poll for up to 5 minutes (300 * 1s)
try: try:
backend_comm = SocketCommunicator(host='localhost', port=get_backend_web_port(), comm_type='tcp') queue_entry = get_queue_by_job_id(job_id)
backend_comm.connect() if queue_entry:
status = queue_entry['status']
# Send get_result request if status == 'completed':
result_request = Message( result = json.loads(queue_entry['result']) if queue_entry['result'] else {}
msg_type='get_result',
msg_id=f'poll_{job_id}',
data={'request_id': job_id}
)
backend_comm.send_message(result_request)
# Try to receive response
response = backend_comm.receive_message()
if response and response.msg_type in ['analyze_response', 'train_response']:
result_data = response.data
print(f"Received result for job {job_id}") print(f"Received result for job {job_id}")
# Handle result # Handle result
await self._handle_job_result({ await self._handle_job_result({
'job_id': job_id, 'job_id': job_id,
'result': result_data 'result': result
})
# Clean up
if job_id in self.pending_jobs:
del self.pending_jobs[job_id]
return
elif status == 'failed':
error = queue_entry.get('error', 'Unknown error')
result = {'status': 'failed', 'error': error}
print(f"Job {job_id} failed: {error}")
# Handle failed result
await self._handle_job_result({
'job_id': job_id,
'result': result
}) })
# Clean up # Clean up
...@@ -664,13 +670,13 @@ class ClusterMaster: ...@@ -664,13 +670,13 @@ class ClusterMaster:
del self.pending_jobs[job_id] del self.pending_jobs[job_id]
return return
elif response and response.msg_type == 'result_pending': elif status == 'processing':
# Result not ready yet, wait and try again # Still processing, wait and try again
await asyncio.sleep(1) await asyncio.sleep(1)
continue continue
except Exception as e: except Exception as e:
print(f"Error polling for job {job_id} result: {e}") print(f"Error polling for job {job_id} status: {e}")
await asyncio.sleep(1) await asyncio.sleep(1)
# Timeout - job took too long # Timeout - job took too long
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment