Replace remaining print() statements in backend.py with log_message() calls

- Replaced all print statements in backend.py with timestamped log_message calls
- Includes worker registration, job forwarding, progress updates, ping handling, and startup/shutdown messages
- All backend logging now uses consistent timestamped format
parent c145c950
...@@ -72,12 +72,12 @@ def handle_web_message(message: Message, client_sock=None) -> Message: ...@@ -72,12 +72,12 @@ def handle_web_message(message: Message, client_sock=None) -> Message:
'data': message.data 'data': message.data
} }
msg_json = json.dumps(msg_dict) + '\n' msg_json = json.dumps(msg_dict) + '\n'
print(f"Backend sending to worker: {msg_json.strip()}") log_message(f"Backend sending to worker: {msg_json.strip()}")
worker_sockets[worker_key].sendall(msg_json.encode('utf-8')) worker_sockets[worker_key].sendall(msg_json.encode('utf-8'))
print(f"Backend sent to worker {worker_key}") log_message(f"Backend sent to worker {worker_key}")
return Message('success', message.msg_id, {'status': 'forwarded'}) return Message('success', message.msg_id, {'status': 'forwarded'})
else: else:
print(f"Backend: Worker {worker_key} not found in worker_sockets, queuing job") log_message(f"Backend: Worker {worker_key} not found in worker_sockets, queuing job")
if worker_key not in pending_jobs: if worker_key not in pending_jobs:
pending_jobs[worker_key] = [] pending_jobs[worker_key] = []
pending_jobs[worker_key].append(message) pending_jobs[worker_key].append(message)
...@@ -136,8 +136,8 @@ def handle_worker_message(message: Message, client_sock) -> None: ...@@ -136,8 +136,8 @@ def handle_worker_message(message: Message, client_sock) -> None:
worker_type = message.data.get('type') worker_type = message.data.get('type')
if worker_type: if worker_type:
worker_sockets[worker_type] = client_sock worker_sockets[worker_type] = client_sock
print(f"Worker {worker_type} registered") log_message(f"Worker {worker_type} registered")
print(f"DEBUG: worker_sockets keys after register: {list(worker_sockets.keys())}") log_message(f"DEBUG: worker_sockets keys after register: {list(worker_sockets.keys())}")
# Send any pending jobs for this worker # Send any pending jobs for this worker
if worker_type in pending_jobs: if worker_type in pending_jobs:
...@@ -149,12 +149,12 @@ def handle_worker_message(message: Message, client_sock) -> None: ...@@ -149,12 +149,12 @@ def handle_worker_message(message: Message, client_sock) -> None:
'data': pending_msg.data 'data': pending_msg.data
} }
msg_json = json.dumps(msg_dict) + '\n' msg_json = json.dumps(msg_dict) + '\n'
print(f"Backend sending pending job to worker {worker_type}: {msg_json.strip()}") log_message(f"Backend sending pending job to worker {worker_type}: {msg_json.strip()}")
try: try:
worker_sockets[worker_type].sendall(msg_json.encode('utf-8')) worker_sockets[worker_type].sendall(msg_json.encode('utf-8'))
print(f"Backend sent pending job to worker {worker_type}") log_message(f"Backend sent pending job to worker {worker_type}")
except Exception as e: except Exception as e:
print(f"Failed to send pending job to worker {worker_type}: {e}") log_message(f"Failed to send pending job to worker {worker_type}: {e}")
del pending_jobs[worker_type] del pending_jobs[worker_type]
# Notify cluster master that a worker has registered # Notify cluster master that a worker has registered
...@@ -164,31 +164,31 @@ def handle_worker_message(message: Message, client_sock) -> None: ...@@ -164,31 +164,31 @@ def handle_worker_message(message: Message, client_sock) -> None:
notification_sock.connect(('localhost', 5004)) # Cluster master port + 1 notification_sock.connect(('localhost', 5004)) # Cluster master port + 1
notification_sock.sendall(b"worker_registered") notification_sock.sendall(b"worker_registered")
notification_sock.close() notification_sock.close()
print(f"Notified cluster master of worker {worker_type} registration") log_message(f"Notified cluster master of worker {worker_type} registration")
except Exception as e: except Exception as e:
print(f"Failed to notify cluster master: {e}") log_message(f"Failed to notify cluster master: {e}")
elif message.msg_type == 'progress': elif message.msg_type == 'progress':
# Store progress update for web to poll and update progress timestamp # Store progress update for web to poll and update progress timestamp
progress_key = f"progress_{message.data.get('job_id')}" progress_key = f"progress_{message.data.get('job_id')}"
pending_results[progress_key] = message pending_results[progress_key] = message
job_progress_times[message.data.get('job_id')] = time.time() job_progress_times[message.data.get('job_id')] = time.time()
print(f"Progress update for job {message.data.get('job_id')}: {message.data.get('progress', 0)}% - {message.data.get('message', '')}") log_message(f"Progress update for job {message.data.get('job_id')}: {message.data.get('progress', 0)}% - {message.data.get('message', '')}")
# Update job progress in database # Update job progress in database
from .database import update_job_progress from .database import update_job_progress
job_id = message.data.get('job_id') job_id = message.data.get('job_id')
progress = message.data.get('progress', 0) progress = message.data.get('progress', 0)
progress_message = message.data.get('message', '') progress_message = message.data.get('message', '')
if job_id: if job_id:
print(f"DEBUG: calling update_job_progress job_id={job_id}, progress={progress}") log_message(f"DEBUG: calling update_job_progress job_id={job_id}, progress={progress}")
success = update_job_progress(job_id, progress, progress_message) success = update_job_progress(job_id, progress, progress_message)
print(f"DEBUG: update_job_progress success = {success}") log_message(f"DEBUG: update_job_progress success = {success}")
elif message.msg_type == 'ping': elif message.msg_type == 'ping':
# Handle ping messages to keep connection alive and reset timeout # Handle ping messages to keep connection alive and reset timeout
job_id = message.data.get('job_id') job_id = message.data.get('job_id')
if job_id: if job_id:
# Update progress timestamp to reset timeout # Update progress timestamp to reset timeout
job_progress_times[job_id] = time.time() job_progress_times[job_id] = time.time()
print(f"PING received for job {job_id} - resetting timeout") log_message(f"PING received for job {job_id} - resetting timeout")
# Notify cluster master of ping to reset job timeout # Notify cluster master of ping to reset job timeout
try: try:
import socket import socket
...@@ -196,9 +196,9 @@ def handle_worker_message(message: Message, client_sock) -> None: ...@@ -196,9 +196,9 @@ def handle_worker_message(message: Message, client_sock) -> None:
notification_sock.connect(('localhost', 5004)) # Cluster master port + 1 notification_sock.connect(('localhost', 5004)) # Cluster master port + 1
notification_sock.sendall(f"ping:{job_id}".encode()) notification_sock.sendall(f"ping:{job_id}".encode())
notification_sock.close() notification_sock.close()
print(f"Notified cluster master of ping for job {job_id}") log_message(f"Notified cluster master of ping for job {job_id}")
except Exception as e: except Exception as e:
print(f"Failed to notify cluster master of ping: {e}") log_message(f"Failed to notify cluster master of ping: {e}")
elif message.msg_type in ['analyze_response', 'train_response']: elif message.msg_type in ['analyze_response', 'train_response']:
# Store result for web to poll # Store result for web to poll
pending_results[message.msg_id] = message pending_results[message.msg_id] = message
...@@ -213,7 +213,7 @@ def handle_worker_message(message: Message, client_sock) -> None: ...@@ -213,7 +213,7 @@ def handle_worker_message(message: Message, client_sock) -> None:
if job_id.startswith('job_'): if job_id.startswith('job_'):
update_queue_status(job_id, 'completed', used_tokens=tokens_used) update_queue_status(job_id, 'completed', used_tokens=tokens_used)
except Exception as e: except Exception as e:
print(f"Error updating token usage: {e}") log_message(f"Error updating token usage: {e}")
def worker_message_handler(message: Message, client_sock) -> None: def worker_message_handler(message: Message, client_sock) -> None:
...@@ -223,7 +223,7 @@ def worker_message_handler(message: Message, client_sock) -> None: ...@@ -223,7 +223,7 @@ def worker_message_handler(message: Message, client_sock) -> None:
def backend_process() -> None: def backend_process() -> None:
"""Main backend process loop.""" """Main backend process loop."""
print("Starting Video AI Backend...") log_message("Starting Video AI Backend...")
from .config import get_backend_web_port from .config import get_backend_web_port
from .compat import get_socket_path from .compat import get_socket_path
...@@ -240,7 +240,7 @@ def backend_process() -> None: ...@@ -240,7 +240,7 @@ def backend_process() -> None:
while True: while True:
time.sleep(1) time.sleep(1)
except KeyboardInterrupt: except KeyboardInterrupt:
print("Backend shutting down...") log_message("Backend shutting down...")
web_server.stop() web_server.stop()
worker_server.stop() worker_server.stop()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment