Add debug logging to worker processes to determine where processing stops

parent 6742c12e
@@ -165,27 +165,35 @@ def check_job_cancelled(job_id):
 def analyze_media(media_path, prompt, model_path, interval=10, job_id=None):
     """Analyze media using dynamic model loading."""
+    print(f"DEBUG: Starting analyze_media for job {job_id}, media_path={media_path}")
     torch.cuda.empty_cache()
 
     # Get model with reference counting
+    print(f"DEBUG: Loading model {model_path} for job {job_id}")
     model = get_or_load_model(model_path)
+    print(f"DEBUG: Model loaded for job {job_id}")
 
     # Get system prompt
+    print(f"DEBUG: Retrieving system prompt for job {job_id}")
     try:
         from .config import get_system_prompt_content
         system_prompt = get_system_prompt_content()
         full_prompt = system_prompt + " " + prompt if system_prompt else prompt
     except:
         full_prompt = prompt
+    print(f"DEBUG: Full prompt set for job {job_id}")
 
     if is_video(media_path):
+        print(f"DEBUG: Detected video, extracting frames for job {job_id}")
         frames, output_dir = extract_frames(media_path, interval, optimize=True)
         total_frames = len(frames)
         descriptions = []
 
         for i, (frame_path, ts) in enumerate(frames):
+            print(f"DEBUG: Processing frame {i+1}/{total_frames} at {ts:.2f}s for job {job_id}")
             # Check for cancellation
-            if queue_id and check_job_cancelled(queue_id):
+            if job_id and check_job_cancelled(job_id):
+                print(f"DEBUG: Job {job_id} cancelled during frame processing")
                 # Clean up and return cancelled message
                 for fp, _ in frames[i:]:
                     try:
@@ -201,6 +209,7 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None):
                 return "Job cancelled by user"
 
             desc = analyze_single_image(frame_path, full_prompt, model)
+            print(f"DEBUG: Frame {i+1} analyzed for job {job_id}")
             descriptions.append(f"At {ts:.2f}s: {desc}")
             os.unlink(frame_path)
@@ -208,8 +217,10 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None):
         import shutil
         shutil.rmtree(output_dir)
 
+        print(f"DEBUG: All frames processed, generating summary for job {job_id}")
         # Check for cancellation before summary
-        if queue_id and check_job_cancelled(queue_id):
+        if job_id and check_job_cancelled(job_id):
+            print(f"DEBUG: Job {job_id} cancelled before summary")
             return "Job cancelled by user"
 
         # Generate summary
@@ -222,14 +233,18 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None):
         # Use text-only model for summary
         summary = model.generate(f"Summarize the video based on frame descriptions: {' '.join(descriptions)}", max_new_tokens=256)
+        print(f"DEBUG: Summary generated for job {job_id}")
 
         result = f"Frame Descriptions:\n" + "\n".join(descriptions) + f"\n\nSummary:\n{summary}"
         return result
     else:
+        print(f"DEBUG: Detected image, analyzing for job {job_id}")
         # Check for cancellation before processing image
-        if queue_id and check_job_cancelled(queue_id):
+        if job_id and check_job_cancelled(job_id):
+            print(f"DEBUG: Job {job_id} cancelled before image analysis")
             return "Job cancelled by user"
 
         result = analyze_single_image(media_path, full_prompt, model)
+        print(f"DEBUG: Image analysis completed for job {job_id}")
 
         torch.cuda.empty_cache()
         return result
...
@@ -31,6 +31,7 @@ from .config import get_comm_type
 def train_model(train_path, output_model, description):
     """Perform training."""
+    print(f"DEBUG: Starting training with videotrain for output_model {output_model}")
     desc_file = os.path.join(train_path, "description.txt")
     with open(desc_file, "w") as f:
         f.write(description)
@@ -38,6 +39,7 @@ def train_model(train_path, output_model, description):
     # Assume videotrain is available
     cmd = ["python", "videotrain", train_path, "--output_dir", output_model]
     result = subprocess.run(cmd, capture_output=True, text=True)
+    print(f"DEBUG: Training subprocess completed with returncode {result.returncode}")
     if result.returncode == 0:
         return "Training completed!"
     else:
@@ -60,17 +62,22 @@ def worker_process(backend_type: str):
         try:
             message = comm.receive_message()
             if message and message.msg_type == 'train_request':
+                print(f"DEBUG: Worker received train_request: {message.msg_id}")
                 data = message.data
                 output_model = data.get('output_model', './VideoModel')
                 description = data.get('description', '')
                 train_dir = data.get('train_dir', '')
 
                 if train_dir and os.path.isdir(train_dir):
+                    print(f"DEBUG: Starting training for job {message.msg_id}")
                     result = train_model(train_dir, output_model, description)
+                    print(f"DEBUG: Training completed for job {message.msg_id}")
                 else:
                     result = "No valid training directory provided"
+                    print(f"DEBUG: No valid training directory for job {message.msg_id}")
 
                 response = Message('train_response', message.msg_id, {'message': result})
+                print(f"DEBUG: Sending train_response for job {message.msg_id}")
                 comm.send_message(response)
             time.sleep(0.1)
         except Exception as e:
...
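
Side note: these prints go straight to each worker's stdout, which is enough to see where processing stops. If they are kept past debugging, they could be routed through Python's standard `logging` module to get timestamps and a process id for free; a minimal sketch (the logger name, level, and format here are illustrative assumptions, not part of this commit):

```python
import logging

# One-time setup per worker process; level and format are assumptions.
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(levelname)s [pid %(process)d] %(message)s",
)
log = logging.getLogger("worker")

# Equivalent of one of the prints added here, using lazy %-formatting
# so the string is only built when DEBUG is enabled:
# log.debug("Starting analyze_media for job %s, media_path=%s", job_id, media_path)
```

The timestamps also make it easier to tell a stalled stage from a slow one when several workers interleave output.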