Fix progress message job_id to use integer ID instead of message ID for proper backend storage and web polling
parent 91ace43b
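Context for the change: per the commit message, the backend stores progress records and the web UI polls them by the job's integer ID, so a progress payload keyed by the queue message ID is never found by the poller. The sketch below is a minimal, hypothetical illustration of that mismatch under that assumption; it is not code from this repository, and ProgressStore, save_progress, and get_progress are invented names.

# Hypothetical sketch of why the integer job ID matters for storage and polling.
# Assumption: the backend keys progress records by the job's integer primary key,
# while the queue message ID is an opaque string. Saving under the message ID
# therefore produces records the web poller never finds.

class ProgressStore:
    """Minimal in-memory stand-in for the backend's progress storage."""

    def __init__(self):
        self._progress = {}  # integer job ID -> latest progress payload

    def save_progress(self, job_id, payload):
        self._progress[job_id] = payload

    def get_progress(self, job_id):
        # The web UI polls with the integer ID it received when the job was created.
        return self._progress.get(job_id)


store = ProgressStore()

# Before the fix: the worker keyed progress by the message ID string,
# so a poll for job 12 sees nothing.
store.save_progress("progress_msg_7f3a", {"stage": "frame_analysis", "progress": 40})
print(store.get_progress(12))   # -> None

# After the fix: the worker passes job_id_int, and the poll succeeds.
store.save_progress(12, {"stage": "frame_analysis", "progress": 40})
print(store.get_progress(12))   # -> {'stage': 'frame_analysis', 'progress': 40}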
@@ -173,21 +173,21 @@ def check_job_cancelled(job_id):
     except:
         return False
 
-def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm=None):
+def analyze_media(media_path, prompt, model_path, interval=10, job_id_int=None, comm=None):
     """Analyze media using dynamic model loading."""
     print(f"DEBUG: Starting analyze_media for job {job_id}, media_path={media_path}")
 
     # Send initial progress update
     if comm:
-        progress_msg = Message('progress', f'progress_{job_id}', {
-            'job_id': job_id,
+        progress_msg = Message('progress', f'progress_{job_id_int}', {
+            'job_id': job_id_int,
             'stage': 'initializing',
             'progress': 5,
             'message': 'Initializing analysis job'
         })
         comm.send_message(progress_msg)
         if get_debug():
-            print(f"PROGRESS: Job {job_id} - 5% - Initializing analysis job")
+            print(f"PROGRESS: Job {job_id_int} - 5% - Initializing analysis job")
 
     torch.cuda.empty_cache()
     total_tokens = 0
@@ -199,15 +199,15 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm
 
     # Send progress update after model loading
     if comm:
-        progress_msg = Message('progress', f'progress_{job_id}', {
-            'job_id': job_id,
+        progress_msg = Message('progress', f'progress_{job_id_int}', {
+            'job_id': job_id_int,
             'stage': 'model_loaded',
             'progress': 8,
             'message': f'Model {model_path.split("/")[-1]} loaded successfully'
         })
         comm.send_message(progress_msg)
         if get_debug():
-            print(f"PROGRESS: Job {job_id} - 8% - Model loaded successfully")
+            print(f"PROGRESS: Job {job_id_int} - 8% - Model loaded successfully")
 
     # Get system prompt
     print(f"DEBUG: Retrieving system prompt for job {job_id}")
@@ -226,15 +226,15 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm
 
     # Send progress update for frame extraction
     if comm:
-        progress_msg = Message('progress', f'progress_{job_id}', {
-            'job_id': job_id,
+        progress_msg = Message('progress', f'progress_{job_id_int}', {
+            'job_id': job_id_int,
             'stage': 'frame_extraction',
             'progress': 10,
             'message': f'Extracted {total_frames} frames'
         })
         comm.send_message(progress_msg)
         if get_debug():
-            print(f"PROGRESS: Job {job_id} - 10% - Extracted {total_frames} frames")
+            print(f"PROGRESS: Job {job_id_int} - 10% - Extracted {total_frames} frames")
 
     descriptions = []
@@ -244,15 +244,15 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm
 
         # Send progress update before processing
        if comm:
             progress_percent = 10 + int(i / total_frames * 70) # 10-80% for frame processing
-            progress_msg = Message('progress', f'progress_{job_id}', {
-                'job_id': job_id,
+            progress_msg = Message('progress', f'progress_{job_id_int}', {
+                'job_id': job_id_int,
                 'stage': 'frame_analysis',
                 'progress': progress_percent,
                 'message': f'Processing frame {i+1}/{total_frames} at {ts:.1f}s'
             })
             comm.send_message(progress_msg)
             if get_debug():
-                print(f"PROGRESS: Job {job_id} - {progress_percent}% - Processing frame {i+1}/{total_frames}")
+                print(f"PROGRESS: Job {job_id_int} - {progress_percent}% - Processing frame {i+1}/{total_frames}")
 
         # Check for cancellation
         if job_id and check_job_cancelled(job_id):
@@ -280,15 +280,15 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm
 
         # Send progress update after processing
         if comm and (i + 1) % max(1, total_frames // 10) == 0: # Update every 10% or at least every frame for small videos
             progress_percent = 10 + int((i + 1) / total_frames * 70)
-            progress_msg = Message('progress', f'progress_{job_id}', {
-                'job_id': job_id,
+            progress_msg = Message('progress', f'progress_{job_id_int}', {
+                'job_id': job_id_int,
                 'stage': 'frame_analysis',
                 'progress': progress_percent,
                 'message': f'Completed frame {i+1}/{total_frames} ({progress_percent}%)'
             })
             comm.send_message(progress_msg)
             if get_debug():
-                print(f"PROGRESS: Job {job_id} - {progress_percent}% - Completed frame {i+1}/{total_frames}")
+                print(f"PROGRESS: Job {job_id_int} - {progress_percent}% - Completed frame {i+1}/{total_frames}")
 
         if output_dir:
             import shutil
@@ -298,15 +298,15 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm
 
     # Send progress update for summary generation
    if comm:
-        progress_msg = Message('progress', f'progress_{job_id}', {
-            'job_id': job_id,
+        progress_msg = Message('progress', f'progress_{job_id_int}', {
+            'job_id': job_id_int,
             'stage': 'summary_generation',
             'progress': 85,
             'message': 'Generating video summary'
         })
         comm.send_message(progress_msg)
         if get_debug():
-            print(f"PROGRESS: Job {job_id} - 85% - Generating video summary")
+            print(f"PROGRESS: Job {job_id_int} - 85% - Generating video summary")
 
     # Check for cancellation before summary
     if job_id and check_job_cancelled(job_id):
@@ -338,15 +338,15 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm
 
     # Send final progress update
     if comm:
-        progress_msg = Message('progress', f'progress_{job_id}', {
-            'job_id': job_id,
+        progress_msg = Message('progress', f'progress_{job_id_int}', {
+            'job_id': job_id_int,
             'stage': 'completed',
             'progress': 100,
             'message': 'Analysis completed'
         })
         comm.send_message(progress_msg)
         if get_debug():
-            print(f"PROGRESS: Job {job_id} - 100% - Analysis completed")
+            print(f"PROGRESS: Job {job_id_int} - 100% - Analysis completed")
 
     result = f"Frame Descriptions:\n" + "\n".join(descriptions) + f"\n\nSummary:\n{summary}"
     return result, total_tokens
@@ -355,15 +355,15 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm
 
     # Send progress update for image analysis start
     if comm:
-        progress_msg = Message('progress', f'progress_{job_id}', {
-            'job_id': job_id,
+        progress_msg = Message('progress', f'progress_{job_id_int}', {
+            'job_id': job_id_int,
             'stage': 'image_analysis',
             'progress': 20,
             'message': 'Starting image analysis'
         })
         comm.send_message(progress_msg)
         if get_debug():
-            print(f"PROGRESS: Job {job_id} - 20% - Starting image analysis")
+            print(f"PROGRESS: Job {job_id_int} - 20% - Starting image analysis")
 
     # Check for cancellation before processing image
     if job_id and check_job_cancelled(job_id):
@@ -372,15 +372,15 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm
 
     # Send progress update before model inference
     if comm:
-        progress_msg = Message('progress', f'progress_{job_id}', {
-            'job_id': job_id,
+        progress_msg = Message('progress', f'progress_{job_id_int}', {
+            'job_id': job_id_int,
             'stage': 'image_processing',
             'progress': 50,
             'message': 'Processing image with AI model'
         })
         comm.send_message(progress_msg)
         if get_debug():
-            print(f"PROGRESS: Job {job_id} - 50% - Processing image with AI model")
+            print(f"PROGRESS: Job {job_id_int} - 50% - Processing image with AI model")
 
     result, tokens = analyze_single_image(media_path, full_prompt, model)
     total_tokens += tokens
@@ -388,27 +388,27 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm
 
     # Send progress update for completion
     if comm:
-        progress_msg = Message('progress', f'progress_{job_id}', {
-            'job_id': job_id,
+        progress_msg = Message('progress', f'progress_{job_id_int}', {
+            'job_id': job_id_int,
             'stage': 'finalizing',
             'progress': 90,
             'message': 'Finalizing analysis results'
         })
         comm.send_message(progress_msg)
         if get_debug():
-            print(f"PROGRESS: Job {job_id} - 90% - Finalizing analysis results")
+            print(f"PROGRESS: Job {job_id_int} - 90% - Finalizing analysis results")
 
     # Send final progress update
     if comm:
-        progress_msg = Message('progress', f'progress_{job_id}', {
-            'job_id': job_id,
+        progress_msg = Message('progress', f'progress_{job_id_int}', {
+            'job_id': job_id_int,
             'stage': 'completed',
             'progress': 100,
             'message': 'Image analysis completed successfully'
         })
         comm.send_message(progress_msg)
         if get_debug():
-            print(f"PROGRESS: Job {job_id} - 100% - Image analysis completed successfully")
+            print(f"PROGRESS: Job {job_id_int} - 100% - Image analysis completed successfully")
 
     torch.cuda.empty_cache()
     return result, total_tokens
@@ -448,7 +448,7 @@ def worker_process(backend_type: str):
             if get_debug():
                 print(f"PROGRESS: Job {message.msg_id} accepted - Starting analysis")
             print(f"DEBUG: Starting analysis of {media_path} with model {model_path} for job {job_id}")
-            result, tokens_used = analyze_media(media_path, prompt, model_path, interval, job_id, comm)
+            result, tokens_used = analyze_media(media_path, prompt, model_path, interval, job_id_int, comm)
             print(f"DEBUG: Analysis completed for job {message.msg_id}, used {tokens_used} tokens")
 
             # Release model reference (don't unload yet, per requirements)