Add progress-based timeout extension to prevent premature job timeouts during...

Add progress-based timeout extension to prevent premature job timeouts during long-running operations
parent 159479f3
...@@ -28,6 +28,7 @@ from .queue import queue_manager ...@@ -28,6 +28,7 @@ from .queue import queue_manager
worker_sockets = {} # type: dict worker_sockets = {} # type: dict
pending_results = {} # msg_id -> result message pending_results = {} # msg_id -> result message
job_progress_times = {} # job_id -> last_progress_timestamp
def get_result(msg_id: str) -> dict: def get_result(msg_id: str) -> dict:
...@@ -128,9 +129,10 @@ def handle_worker_message(message: Message, client_sock) -> None: ...@@ -128,9 +129,10 @@ def handle_worker_message(message: Message, client_sock) -> None:
worker_sockets[worker_type] = client_sock worker_sockets[worker_type] = client_sock
print(f"Worker {worker_type} registered") print(f"Worker {worker_type} registered")
elif message.msg_type == 'progress': elif message.msg_type == 'progress':
# Store progress update for web to poll # Store progress update for web to poll and update progress timestamp
progress_key = f"progress_{message.data.get('job_id')}" progress_key = f"progress_{message.data.get('job_id')}"
pending_results[progress_key] = message pending_results[progress_key] = message
job_progress_times[message.data.get('job_id')] = time.time()
elif message.msg_type in ['analyze_response', 'train_response']: elif message.msg_type in ['analyze_response', 'train_response']:
# Store result for web to poll # Store result for web to poll
pending_results[message.msg_id] = message pending_results[message.msg_id] = message
......
...@@ -87,9 +87,31 @@ def send_to_backend(msg_type: str, data: dict) -> str: ...@@ -87,9 +87,31 @@ def send_to_backend(msg_type: str, data: dict) -> str:
print(f"Failed to send message to backend: {e}") print(f"Failed to send message to backend: {e}")
return msg_id return msg_id
def get_progress(job_id: str) -> dict:
"""Get progress for a job."""
try:
# Send get_progress request to backend
progress_msg = Message('get_progress', str(uuid.uuid4()), {'job_id': job_id})
comm.send_message(progress_msg)
# Try to receive response
response = comm.receive_message()
if response and response.msg_type == 'progress':
return response.data
elif response and response.msg_type == 'progress_pending':
return {'status': 'no_progress'}
except Exception as e:
print(f"Error getting progress: {e}")
return {}
def get_result(msg_id: str) -> dict: def get_result(msg_id: str) -> dict:
"""Poll for result from backend via socket.""" """Poll for result from backend via socket."""
for _ in range(100): # Poll for 10 seconds max_iterations = 100 # Base timeout: 10 seconds
progress_timeout = 300 # If progress received, extend to 5 minutes
has_progress = False
last_progress_time = time.time()
for i in range(max_iterations):
try: try:
# Send get_result request # Send get_result request
result_msg = Message('get_result', str(uuid.uuid4()), {'request_id': msg_id}) result_msg = Message('get_result', str(uuid.uuid4()), {'request_id': msg_id})
...@@ -104,10 +126,25 @@ def get_result(msg_id: str) -> dict: ...@@ -104,10 +126,25 @@ def get_result(msg_id: str) -> dict:
'data': response.data 'data': response.data
} }
elif response and response.msg_type == 'result_pending': elif response and response.msg_type == 'result_pending':
# Check if we have recent progress to extend timeout
progress_response = get_progress(msg_id)
if progress_response and 'progress' in progress_response:
has_progress = True
last_progress_time = time.time()
# Extend timeout when progress is active
if i >= max_iterations - 1:
max_iterations = min(max_iterations + 50, 500) # Add up to 5 more seconds per progress check
time.sleep(0.1) # Wait and try again time.sleep(0.1) # Wait and try again
continue continue
except: except:
time.sleep(0.1) time.sleep(0.1)
# If we have progress, extend timeout significantly
if has_progress and (time.time() - last_progress_time) < progress_timeout:
if i >= max_iterations - 1:
max_iterations += 100 # Add 10 more seconds when progress is recent
return {'error': 'Timeout waiting for result'} return {'error': 'Timeout waiting for result'}
@app.route('/') @app.route('/')
...@@ -579,28 +616,17 @@ def api_job_progress(job_id): ...@@ -579,28 +616,17 @@ def api_job_progress(job_id):
return {'error': 'Job not found or access denied'}, 404 return {'error': 'Job not found or access denied'}, 404
# Get progress from backend # Get progress from backend
try: progress_data = get_progress(str(job_id))
# Send get_progress request to backend if progress_data and 'progress' in progress_data:
result_msg = Message('get_progress', str(uuid.uuid4()), {'job_id': job_id}) return {
comm.send_message(result_msg) 'job_id': job_id,
'stage': progress_data.get('stage', 'unknown'),
# Try to receive response 'progress': progress_data.get('progress', 0),
response = comm.receive_message() 'message': progress_data.get('message', ''),
if response and response.msg_type == 'progress': 'timestamp': time.time()
return { }
'job_id': job_id, else:
'stage': response.data.get('stage', 'unknown'), return {'status': 'no_progress'}
'progress': response.data.get('progress', 0),
'message': response.data.get('message', ''),
'timestamp': time.time()
}
elif response and response.msg_type == 'progress_pending':
return {'status': 'no_progress'}
else:
return {'status': 'no_progress'}
except Exception as e:
print(f"Error getting progress: {e}")
return {'status': 'error', 'message': str(e)}
@app.route('/update_settings', methods=['POST']) @app.route('/update_settings', methods=['POST'])
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment