Enhanced worker progress updates with more frequent and detailed messages

parent 0b69edd6
...@@ -27,6 +27,7 @@ import subprocess ...@@ -27,6 +27,7 @@ import subprocess
import json import json
import cv2 import cv2
import time import time
import uuid
from .comm import SocketCommunicator, Message from .comm import SocketCommunicator, Message
from .models import get_model from .models import get_model
from .config import get_system_prompt_content, get_comm_type, get_backend_worker_port from .config import get_system_prompt_content, get_comm_type, get_backend_worker_port
...@@ -175,6 +176,18 @@ def check_job_cancelled(job_id): ...@@ -175,6 +176,18 @@ def check_job_cancelled(job_id):
def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm=None): def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm=None):
"""Analyze media using dynamic model loading.""" """Analyze media using dynamic model loading."""
print(f"DEBUG: Starting analyze_media for job {job_id}, media_path={media_path}") print(f"DEBUG: Starting analyze_media for job {job_id}, media_path={media_path}")
# Send initial progress update
if comm:
progress_msg = Message('progress', f'progress_{job_id}', {
'job_id': job_id,
'stage': 'initializing',
'progress': 5,
'message': 'Initializing analysis job'
})
comm.send_message(progress_msg)
print(f"PROGRESS: Job {job_id} - 5% - Initializing analysis job")
torch.cuda.empty_cache() torch.cuda.empty_cache()
total_tokens = 0 total_tokens = 0
...@@ -183,6 +196,17 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm ...@@ -183,6 +196,17 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm
model = get_or_load_model(model_path) model = get_or_load_model(model_path)
print(f"DEBUG: Model loaded for job {job_id}") print(f"DEBUG: Model loaded for job {job_id}")
# Send progress update after model loading
if comm:
progress_msg = Message('progress', f'progress_{job_id}', {
'job_id': job_id,
'stage': 'model_loaded',
'progress': 8,
'message': f'Model {model_path.split("/")[-1]} loaded successfully'
})
comm.send_message(progress_msg)
print(f"PROGRESS: Job {job_id} - 8% - Model loaded successfully")
# Get system prompt # Get system prompt
print(f"DEBUG: Retrieving system prompt for job {job_id}") print(f"DEBUG: Retrieving system prompt for job {job_id}")
try: try:
...@@ -213,6 +237,19 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm ...@@ -213,6 +237,19 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm
for i, (frame_path, ts) in enumerate(frames): for i, (frame_path, ts) in enumerate(frames):
print(f"DEBUG: Processing frame {i+1}/{total_frames} at {ts:.2f}s for job {job_id}") print(f"DEBUG: Processing frame {i+1}/{total_frames} at {ts:.2f}s for job {job_id}")
# Send progress update before processing
if comm:
progress_percent = 10 + int(i / total_frames * 70) # 10-80% for frame processing
progress_msg = Message('progress', f'progress_{job_id}', {
'job_id': job_id,
'stage': 'frame_analysis',
'progress': progress_percent,
'message': f'Processing frame {i+1}/{total_frames} at {ts:.1f}s'
})
comm.send_message(progress_msg)
print(f"PROGRESS: Job {job_id} - {progress_percent}% - Processing frame {i+1}/{total_frames}")
# Check for cancellation # Check for cancellation
if job_id and check_job_cancelled(job_id): if job_id and check_job_cancelled(job_id):
print(f"DEBUG: Job {job_id} cancelled during frame processing") print(f"DEBUG: Job {job_id} cancelled during frame processing")
...@@ -236,17 +273,17 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm ...@@ -236,17 +273,17 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm
descriptions.append(f"At {ts:.2f}s: {desc}") descriptions.append(f"At {ts:.2f}s: {desc}")
os.unlink(frame_path) os.unlink(frame_path)
# Send progress update # Send progress update after processing
if comm: if comm and (i + 1) % max(1, total_frames // 10) == 0: # Update every 10% or at least every frame for small videos
progress_percent = 10 + int((i + 1) / total_frames * 70) # 10-80% for frame processing progress_percent = 10 + int((i + 1) / total_frames * 70)
progress_msg = Message('progress', f'progress_{job_id}', { progress_msg = Message('progress', f'progress_{job_id}', {
'job_id': job_id, 'job_id': job_id,
'stage': 'frame_analysis', 'stage': 'frame_analysis',
'progress': progress_percent, 'progress': progress_percent,
'message': f'Analyzed frame {i+1}/{total_frames}' 'message': f'Completed frame {i+1}/{total_frames} ({progress_percent}%)'
}) })
comm.send_message(progress_msg) comm.send_message(progress_msg)
print(f"PROGRESS: Job {job_id} - {progress_percent}% - Analyzed frame {i+1}/{total_frames}") print(f"PROGRESS: Job {job_id} - {progress_percent}% - Completed frame {i+1}/{total_frames}")
if output_dir: if output_dir:
import shutil import shutil
...@@ -309,34 +346,58 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm ...@@ -309,34 +346,58 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm
else: else:
print(f"DEBUG: Detected image, analyzing for job {job_id}") print(f"DEBUG: Detected image, analyzing for job {job_id}")
# Send progress update for image analysis # Send progress update for image analysis start
if comm: if comm:
progress_msg = Message('progress', f'progress_{job_id}', { progress_msg = Message('progress', f'progress_{job_id}', {
'job_id': job_id, 'job_id': job_id,
'stage': 'image_analysis', 'stage': 'image_analysis',
'progress': 50, 'progress': 20,
'message': 'Analyzing image' 'message': 'Starting image analysis'
}) })
comm.send_message(progress_msg) comm.send_message(progress_msg)
print(f"PROGRESS: Job {job_id} - 20% - Starting image analysis")
# Check for cancellation before processing image # Check for cancellation before processing image
if job_id and check_job_cancelled(job_id): if job_id and check_job_cancelled(job_id):
print(f"DEBUG: Job {job_id} cancelled before image analysis") print(f"DEBUG: Job {job_id} cancelled before image analysis")
return "Job cancelled by user", total_tokens return "Job cancelled by user", total_tokens
# Send progress update before model inference
if comm:
progress_msg = Message('progress', f'progress_{job_id}', {
'job_id': job_id,
'stage': 'image_processing',
'progress': 50,
'message': 'Processing image with AI model'
})
comm.send_message(progress_msg)
print(f"PROGRESS: Job {job_id} - 50% - Processing image with AI model")
result, tokens = analyze_single_image(media_path, full_prompt, model) result, tokens = analyze_single_image(media_path, full_prompt, model)
total_tokens += tokens total_tokens += tokens
print(f"DEBUG: Image analysis completed for job {job_id}") print(f"DEBUG: Image analysis completed for job {job_id}")
# Send progress update for completion
if comm:
progress_msg = Message('progress', f'progress_{job_id}', {
'job_id': job_id,
'stage': 'finalizing',
'progress': 90,
'message': 'Finalizing analysis results'
})
comm.send_message(progress_msg)
print(f"PROGRESS: Job {job_id} - 90% - Finalizing analysis results")
# Send final progress update # Send final progress update
if comm: if comm:
progress_msg = Message('progress', f'progress_{job_id}', { progress_msg = Message('progress', f'progress_{job_id}', {
'job_id': job_id, 'job_id': job_id,
'stage': 'completed', 'stage': 'completed',
'progress': 100, 'progress': 100,
'message': 'Analysis completed' 'message': 'Image analysis completed successfully'
}) })
comm.send_message(progress_msg) comm.send_message(progress_msg)
print(f"PROGRESS: Job {job_id} - 100% - Image analysis completed successfully")
torch.cuda.empty_cache() torch.cuda.empty_cache()
return result, total_tokens return result, total_tokens
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment