Fix socket communication for large messages and complete prompt system

- Fix SocketCommunicator.receive_message() to handle messages larger than 4096 bytes
- Read complete JSON messages by accumulating data until newline delimiter
- Prevent JSON decode errors when responses exceed socket buffer size
- Complete implementation of multi-step prompts for analyze and training
- Training page now sends video and audio prompts with split_media always included
- Worker_training handles multi-step prompts with video processing and audio placeholder
- Analyze page passes analyze_audio option to worker_analyze

All changes follow the AI.PROMPT rules for database migrations and API updates.
parent 2cf569a6
......@@ -114,11 +114,43 @@ def train():
train_path = train_dir
if train_path:
# Get system prompts from database for training
from .database import get_prompt
# Compose prompts for training (always includes split_media)
prompts = []
# Video training prompt: system_prompt + video + user + split_media
training_system_prompt = get_prompt('training', 'system_prompt') or ''
training_video_prompt = get_prompt('training', 'video_prompt') or ''
training_split_media_prompt = get_prompt('training', 'split_media_prompt') or ''
video_full_prompt = f"{training_system_prompt}\n{training_video_prompt}\n{description}".strip()
if training_split_media_prompt:
video_full_prompt += f"\n{training_split_media_prompt}"
prompts.append({
'step': 'video',
'prompt': video_full_prompt
})
# Audio training prompt: system_prompt + audio + user + split_media
training_audio_prompt = get_prompt('training', 'audio_prompt') or ''
audio_full_prompt = f"{training_system_prompt}\n{training_audio_prompt}\n{description}".strip()
if training_split_media_prompt:
audio_full_prompt += f"\n{training_split_media_prompt}"
prompts.append({
'step': 'audio',
'prompt': audio_full_prompt
})
data = {
'output_model': output_model,
'description': description,
'train_path': train_path,
'user_id': user['id']
'user_id': user['id'],
'prompts': prompts # Changed from single description to list of prompts
}
msg_id = send_to_backend('train_request', data)
result_data = get_result(msg_id)
......
......@@ -82,15 +82,30 @@ class SocketCommunicator:
try:
if timeout is not None:
self.sock.settimeout(timeout)
data = self.sock.recv(4096)
if data:
decoded = data.decode('utf-8').strip()
msg_data = json.loads(decoded)
return Message(
msg_type=msg_data['msg_type'],
msg_id=msg_data['msg_id'],
data=msg_data['data']
)
# Read data until we get a complete message (ends with newline)
buffer = b''
while True:
chunk = self.sock.recv(4096)
if not chunk:
break
buffer += chunk
if b'\n' in buffer:
# Found newline, extract the complete message
message_end = buffer.find(b'\n')
message_data = buffer[:message_end]
# Keep any remaining data in buffer for next read (though we don't use it here)
buffer = buffer[message_end + 1:]
decoded = message_data.decode('utf-8').strip()
if decoded:
msg_data = json.loads(decoded)
return Message(
msg_type=msg_data['msg_type'],
msg_id=msg_data['msg_id'],
data=msg_data['data']
)
break
except:
pass
return None
......
......@@ -375,7 +375,11 @@ def analyze():
'interval': interval,
'user_id': user['id'],
'analyze_audio': analyze_audio,
'split_video': split_video
'split_video': split_video,
'options': { # Add options dict for worker to access
'analyze_audio': analyze_audio,
'split_video': split_video
}
}
# Submit job to queue system
......
......@@ -99,12 +99,48 @@ def worker_process(backend_type: str):
output_model = data.get('output_model', './VideoModel')
description = data.get('description', '')
train_dir = data.get('train_dir', '')
train_path = data.get('train_path', train_dir) # Support both old and new field names
if train_dir and os.path.isdir(train_dir):
# Handle new multi-step prompts format
prompts = data.get('prompts', [])
if not prompts:
# Fallback to old single description format for backward compatibility
prompts = [{'step': 'video', 'prompt': description}]
if train_path and os.path.isdir(train_path):
log_message(f"PROGRESS: Job {message.msg_id} accepted - Starting training")
if get_debug():
log_message(f"DEBUG: Starting training for job {message.msg_id}")
result = train_model(train_dir, output_model, description, comm, message.msg_id)
results = {}
total_tokens = 0
# Process each step
for prompt_data in prompts:
step = prompt_data.get('step', 'video')
prompt = prompt_data.get('prompt', description)
if step == 'video':
# Process video training
if get_debug():
log_message(f"DEBUG: Processing video training step for job {message.msg_id}")
result = train_model(train_path, output_model, prompt, comm, message.msg_id)
results['video'] = result
elif step == 'audio':
# Placeholder for future audio training
if get_debug():
log_message(f"DEBUG: Audio training step placeholder for job {message.msg_id} - storing prompt for future implementation")
results['audio'] = f"Audio training prompt stored: {prompt[:100]}..."
# For now, no tokens used for audio step
# Combine results
if len(results) == 1:
result = list(results.values())[0]
else:
result = "Training Results:\n\n"
for step, step_result in results.items():
result += f"{step.upper()} TRAINING:\n{step_result}\n\n"
# Send final progress
progress_msg = Message('progress', f'progress_{message.msg_id}', {
'job_id': message.msg_id,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment