Add progress updates and ping mechanism to worker_training.py

parent 80b5ce51
...@@ -29,8 +29,8 @@ import time ...@@ -29,8 +29,8 @@ import time
from .comm import SocketCommunicator, Message from .comm import SocketCommunicator, Message
from .config import get_comm_type, get_backend_worker_port, get_debug from .config import get_comm_type, get_backend_worker_port, get_debug
def train_model(train_path, output_model, description): def train_model(train_path, output_model, description, comm, job_id):
"""Perform training.""" """Perform training with progress updates."""
if get_debug(): if get_debug():
print(f"DEBUG: Starting training with videotrain for output_model {output_model}") print(f"DEBUG: Starting training with videotrain for output_model {output_model}")
desc_file = os.path.join(train_path, "description.txt") desc_file = os.path.join(train_path, "description.txt")
...@@ -39,13 +39,39 @@ def train_model(train_path, output_model, description): ...@@ -39,13 +39,39 @@ def train_model(train_path, output_model, description):
# Assume videotrain is available # Assume videotrain is available
cmd = ["python", "videotrain", train_path, "--output_dir", output_model] cmd = ["python", "videotrain", train_path, "--output_dir", output_model]
result = subprocess.run(cmd, capture_output=True, text=True) proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
# Send initial progress
progress_msg = Message('progress', f'progress_{job_id}', {
'job_id': job_id,
'stage': 'training_started',
'progress': 10,
'message': 'Training started'
})
comm.send_message(progress_msg)
print(f"PROGRESS: Job {job_id} - 10% - Training started")
last_ping = time.time()
while proc.poll() is None:
# Send ping every 30 seconds
if time.time() - last_ping > 30:
ping_msg = Message('ping', f'ping_{job_id}_{int(time.time())}', {
'job_id': job_id,
'timestamp': time.time()
})
comm.send_message(ping_msg)
print(f"PING: Job {job_id} - Keeping connection alive")
last_ping = time.time()
time.sleep(1)
# Get result
stdout, stderr = proc.communicate()
if get_debug(): if get_debug():
print(f"DEBUG: Training subprocess completed with returncode {result.returncode}") print(f"DEBUG: Training subprocess completed with returncode {proc.returncode}")
if result.returncode == 0: if proc.returncode == 0:
return "Training completed!" return "Training completed!"
else: else:
return f"Training failed: {result.stderr}" return f"Training failed: {stderr}"
def worker_process(backend_type: str): def worker_process(backend_type: str):
"""Main worker process.""" """Main worker process."""
...@@ -77,7 +103,15 @@ def worker_process(backend_type: str): ...@@ -77,7 +103,15 @@ def worker_process(backend_type: str):
print(f"PROGRESS: Job {message.msg_id} accepted - Starting training") print(f"PROGRESS: Job {message.msg_id} accepted - Starting training")
if get_debug(): if get_debug():
print(f"DEBUG: Starting training for job {message.msg_id}") print(f"DEBUG: Starting training for job {message.msg_id}")
result = train_model(train_dir, output_model, description) result = train_model(train_dir, output_model, description, comm, message.msg_id)
# Send final progress
progress_msg = Message('progress', f'progress_{message.msg_id}', {
'job_id': message.msg_id,
'stage': 'training_completed',
'progress': 100,
'message': 'Training completed'
})
comm.send_message(progress_msg)
print(f"PROGRESS: Job {message.msg_id} - 100% - Training completed") print(f"PROGRESS: Job {message.msg_id} - 100% - Training completed")
if get_debug(): if get_debug():
print(f"DEBUG: Training completed for job {message.msg_id}") print(f"DEBUG: Training completed for job {message.msg_id}")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment