Add asynchronous ping mechanism to prevent job timeouts

- Implement BackgroundPing class that runs in separate thread
- Send ping messages every configurable interval during long operations
- Fix worker to use background ping during frame processing
- Ensure ping works even during blocking model inference operations
- Add proper cleanup of ping thread in finally block
parent c94f9425
...@@ -155,3 +155,6 @@ vidai-analysis-cuda ...@@ -155,3 +155,6 @@ vidai-analysis-cuda
vidai-analysis-rocm vidai-analysis-rocm
vidai-training-cuda vidai-training-cuda
vidai-training-rocm vidai-training-rocm
# Mobile app (separate repository)
mobileapp/
\ No newline at end of file
...@@ -767,11 +767,11 @@ class ClusterMaster: ...@@ -767,11 +767,11 @@ class ClusterMaster:
"""Monitor for job result from local backend.""" """Monitor for job result from local backend."""
try: try:
from .comm import SocketCommunicator from .comm import SocketCommunicator
from .config import get_backend_web_port from .config import get_backend_web_port, get_job_timeout_base, get_job_timeout_max
# Progress tracking for timeout extension # Progress tracking for timeout extension
start_time = time.time() start_time = time.time()
timeout = 91 # Base timeout: 91 seconds timeout = get_job_timeout_base() # Configurable base timeout
last_progress_time = start_time last_progress_time = start_time
# Poll for result # Poll for result
...@@ -843,7 +843,7 @@ class ClusterMaster: ...@@ -843,7 +843,7 @@ class ClusterMaster:
# Check if we have recent activity (progress or ping within 60 seconds) # Check if we have recent activity (progress or ping within 60 seconds)
if time.time() - last_progress_time < 60: if time.time() - last_progress_time < 60:
# Extend timeout # Extend timeout
timeout = min(timeout + 60, 3600) timeout = min(timeout + 60, get_job_timeout_max())
# Result not ready yet, wait and try again # Result not ready yet, wait and try again
await asyncio.sleep(1) await asyncio.sleep(1)
...@@ -856,6 +856,12 @@ class ClusterMaster: ...@@ -856,6 +856,12 @@ class ClusterMaster:
# Timeout - job took too long # Timeout - job took too long
elapsed = time.time() - start_time elapsed = time.time() - start_time
log_message(f"Job {job_id} timed out waiting for result ({elapsed:.0f} seconds)") log_message(f"Job {job_id} timed out waiting for result ({elapsed:.0f} seconds)")
# Try to cancel the job on the worker before marking as failed
job_info = self.active_jobs.get(job_id)
if job_info:
await self._cancel_job_processing(job_id)
await self._handle_job_result({ await self._handle_job_result({
'job_id': job_id, 'job_id': job_id,
'result': {'status': 'failed', 'error': f'Job timed out after {elapsed:.0f} seconds'} 'result': {'status': 'failed', 'error': f'Job timed out after {elapsed:.0f} seconds'}
......
...@@ -382,7 +382,10 @@ def get_all_settings() -> dict: ...@@ -382,7 +382,10 @@ def get_all_settings() -> dict:
'web_port': int(config.get('web_port', '5000')), 'web_port': int(config.get('web_port', '5000')),
'backend_host': config.get('backend_host', 'localhost'), 'backend_host': config.get('backend_host', 'localhost'),
'backend_web_port': int(config.get('backend_web_port', '5001')), 'backend_web_port': int(config.get('backend_web_port', '5001')),
'backend_worker_port': int(config.get('backend_worker_port', '5002')) 'backend_worker_port': int(config.get('backend_worker_port', '5002')),
'job_timeout_base': int(config.get('job_timeout_base', '180')),
'job_timeout_max': int(config.get('job_timeout_max', '7200')),
'job_ping_interval': int(config.get('job_ping_interval', '30'))
} }
...@@ -575,3 +578,34 @@ def get_redis_password() -> str: ...@@ -575,3 +578,34 @@ def get_redis_password() -> str:
def set_redis_password(password: str) -> None: def set_redis_password(password: str) -> None:
"""Set Redis password.""" """Set Redis password."""
set_config('redis_password', password) set_config('redis_password', password)
# Job timeout settings
def get_job_timeout_base() -> int:
    """Return the base job timeout, in seconds.

    Reads the ``job_timeout_base`` config entry (falling back to the
    string ``'180'``) and converts the stored value to ``int``.
    """
    raw_value = get_config('job_timeout_base', '180')
    return int(raw_value)
def set_job_timeout_base(timeout: int) -> None:
    """Store the base job timeout, in seconds.

    The value is converted to ``str`` before being written to the
    ``job_timeout_base`` config entry.
    """
    serialized = str(timeout)
    set_config('job_timeout_base', serialized)
def get_job_timeout_max() -> int:
    """Return the maximum job timeout, in seconds.

    Reads the ``job_timeout_max`` config entry (falling back to the
    string ``'7200'``) and converts the stored value to ``int``.
    """
    raw_value = get_config('job_timeout_max', '7200')
    return int(raw_value)
def set_job_timeout_max(timeout: int) -> None:
    """Store the maximum job timeout, in seconds.

    The value is converted to ``str`` before being written to the
    ``job_timeout_max`` config entry.
    """
    serialized = str(timeout)
    set_config('job_timeout_max', serialized)
def get_job_ping_interval() -> int:
    """Return the job ping interval, in seconds.

    Reads the ``job_ping_interval`` config entry (falling back to the
    string ``'30'``) and converts the stored value to ``int``.
    """
    raw_value = get_config('job_ping_interval', '30')
    return int(raw_value)
def set_job_ping_interval(interval: int) -> None:
    """Store the job ping interval, in seconds.

    The value is converted to ``str`` before being written to the
    ``job_ping_interval`` config entry.
    """
    serialized = str(interval)
    set_config('job_ping_interval', serialized)
\ No newline at end of file
...@@ -92,7 +92,10 @@ DEFAULTS = { ...@@ -92,7 +92,10 @@ DEFAULTS = {
'redis_port': '6379', 'redis_port': '6379',
'redis_db': '0', 'redis_db': '0',
'redis_password': '', 'redis_password': '',
'jwt_secret_key': 'vidai-jwt-secret-key-change-in-production' 'jwt_secret_key': 'vidai-jwt-secret-key-change-in-production',
'job_timeout_base': '180', # Base timeout in seconds (3 minutes)
'job_timeout_max': '7200', # Maximum timeout in seconds (2 hours)
'job_ping_interval': '30' # Ping interval in seconds
} }
......
...@@ -252,6 +252,7 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm ...@@ -252,6 +252,7 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm
descriptions = [] descriptions = []
frame_start_time = time.time()
for i, (frame_path, ts) in enumerate(frames): for i, (frame_path, ts) in enumerate(frames):
if get_debug(): if get_debug():
log_message(f"DEBUG: Processing frame {i+1}/{total_frames} at {ts:.2f}s for job {job_id_int}") log_message(f"DEBUG: Processing frame {i+1}/{total_frames} at {ts:.2f}s for job {job_id_int}")
...@@ -286,12 +287,30 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm ...@@ -286,12 +287,30 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm
pass pass
return "Job cancelled by user", total_tokens return "Job cancelled by user", total_tokens
# Watchdog: Check if previous frame took too long (more than 5 minutes)
if i > 0 and time.time() - frame_start_time > 300: # 5 minutes per frame
log_message(f"WATCHDOG: Frame {i} processing took too long ({time.time() - frame_start_time:.0f}s), aborting job {job_id_int}")
# Clean up and return error
for fp, _ in frames[i:]:
try:
os.unlink(fp)
except:
pass
if output_dir:
try:
import shutil
shutil.rmtree(output_dir)
except:
pass
return f"Job aborted: Frame processing timeout (frame {i} took {time.time() - frame_start_time:.0f}s)", total_tokens
desc, tokens = analyze_single_image(frame_path, full_prompt, model) desc, tokens = analyze_single_image(frame_path, full_prompt, model)
total_tokens += tokens total_tokens += tokens
if get_debug(): if get_debug():
log_message(f"DEBUG: Frame {i+1} analyzed for job {job_id_int}") log_message(f"DEBUG: Frame {i+1} analyzed for job {job_id_int}")
descriptions.append(f"At {ts:.2f}s: {desc}") descriptions.append(f"At {ts:.2f}s: {desc}")
os.unlink(frame_path) os.unlink(frame_path)
frame_start_time = time.time() # Reset timer for next frame
# Send progress update after processing # Send progress update after processing
if comm and (i + 1) % max(1, total_frames // 10) == 0: # Update every 10% or at least every frame for small videos if comm and (i + 1) % max(1, total_frames // 10) == 0: # Update every 10% or at least every frame for small videos
...@@ -306,8 +325,10 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm ...@@ -306,8 +325,10 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm
comm.send_message(progress_msg) comm.send_message(progress_msg)
log_message(f"PROGRESS: Job {job_id_int} - {progress_percent}% - Completed frame {i+1}/{total_frames}") log_message(f"PROGRESS: Job {job_id_int} - {progress_percent}% - Completed frame {i+1}/{total_frames}")
# Send ping every 30 seconds to keep connection alive # Send ping at configurable intervals to keep connection alive
if comm and (i + 1) % max(1, total_frames // (total_frames // 30 + 1)) == 0: from .config import get_job_ping_interval
ping_interval = get_job_ping_interval()
if comm and (i + 1) % max(1, total_frames // (total_frames // ping_interval + 1)) == 0:
ping_msg = Message('ping', f'ping_{job_id_int}_{i+1}', { ping_msg = Message('ping', f'ping_{job_id_int}_{i+1}', {
'job_id': job_id, 'job_id': job_id,
'timestamp': time.time() 'timestamp': time.time()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment