Add timestamped logging to all console output

- Created logging_utils.py with a log_message() function that prefixes every console message with the date, time, and process name
- Replaced print() statements with log_message() calls throughout the vidai package (utils.py and logging_utils.py itself are excluded)
- Fixed circular import issues by separating logging from utils.py
- All console output now follows the format: [YYYY-MM-DD HH:MM:SS] [process_name] message

This improves debugging and monitoring by providing clear timestamps and process identification for all log messages.
parent 8cee35a7
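For illustration, a minimal sketch of how the new helper is called and what the resulting console line looks like (the call sites, message text, and timestamps below are hypothetical):

from vidai.logging_utils import log_message

# Explicit process name:
log_message("Failed to send message to backend: timeout", process="web")
# -> [2025-01-01 12:00:00] [web] Failed to send message to backend: timeout

# Without an explicit name, log_message falls back to detecting the process
# from the name of the script that was launched (see logging_utils.py below),
# defaulting to 'main' when nothing matches:
log_message("QueueManager initialized")
# -> [2025-01-01 12:00:00] [main] QueueManager initialized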
#!/usr/bin/env python3
"""
Script to replace all print() statements with log_message() calls
and add proper imports.
"""
import os
import re
import glob


def get_process_name(filepath):
    """Determine process name from file path."""
    filename = os.path.basename(filepath)
    if 'backend' in filename:
        return 'backend'
    elif 'worker_analysis' in filename:
        return 'worker_analysis'
    elif 'worker_training' in filename:
        return 'worker_training'
    elif 'web' in filename:
        return 'web'
    elif 'cluster_master' in filename:
        return 'cluster_master'
    elif 'cluster_client' in filename:
        return 'cluster_client'
    elif 'api' in filename:
        return 'api'
    elif 'admin' in filename:
        return 'admin'
    elif 'queue' in filename:
        return 'queue'
    elif 'database' in filename:
        return 'database'
    elif 'auth' in filename:
        return 'auth'
    elif 'config' in filename:
        return 'config'
    elif 'runpod' in filename:
        return 'runpod'
    elif 'email' in filename:
        return 'email'
    elif 'payments' in filename:
        return 'payments'
    elif 'comm' in filename:
        return 'comm'
    else:
        return 'main'


def process_file(filepath):
    """Process a single Python file."""
    print(f"Processing {filepath}...")
    with open(filepath, 'r') as f:
        content = f.read()

    # Skip if already processed
    if 'from .logging_utils import log_message' in content or 'log_message' in content:
        print(" Already processed, skipping")
        return

    # Add import after existing imports
    import_pattern = r'(from \..* import.*\n)+'
    import_match = re.search(import_pattern, content)
    if import_match:
        # Insert after the last relative import
        insert_pos = import_match.end()
        content = content[:insert_pos] + 'from .logging_utils import log_message\n' + content[insert_pos:]
    else:
        # Add at the beginning, after the module docstring if present
        lines = content.split('\n')
        insert_pos = 0
        for i, line in enumerate(lines):
            if line.strip().startswith('"""') or line.strip().startswith("'''"):
                # Find end of docstring
                quote = line.strip()[0]
                for j in range(i + 1, len(lines)):
                    if lines[j].strip().endswith(quote * 3):
                        insert_pos = j + 1
                        break
                break
            elif line.strip() and not line.startswith('#'):
                insert_pos = i
                break
        lines.insert(insert_pos, 'from .logging_utils import log_message')
        content = '\n'.join(lines)

    # Replace print( with log_message(
    # This is a simple replacement - may need manual review for complex cases
    content = re.sub(r'\bprint\(', 'log_message(', content)

    with open(filepath, 'w') as f:
        f.write(content)
    print(f" Updated {filepath}")


def main():
    """Main function."""
    # Find all Python files in the vidai package
    py_files = glob.glob('vidai/*.py')
    for filepath in py_files:
        if os.path.basename(filepath) in ['__init__.py', 'utils.py', 'logging_utils.py']:
            continue  # Skip these files
        process_file(filepath)
    print("Done!")


if __name__ == '__main__':
    main()
\ No newline at end of file
......@@ -24,6 +24,7 @@ from .auth import require_auth
from .database import get_user_tokens, update_user_tokens, get_user_queue_items, get_default_user_tokens, create_remember_token, validate_remember_token, delete_remember_token, extend_remember_token, get_all_users, update_user_status, update_user_info, delete_user, get_worker_tokens, deactivate_worker_token, activate_worker_token, delete_worker_token, create_user, get_all_models, create_model, update_model, delete_model, get_model_by_id
from .comm import SocketCommunicator, Message
from .utils import get_current_user_session, login_required, admin_required
from .logging_utils import log_message
admin_bp = Blueprint('admin', __name__, url_prefix='/admin')
......@@ -40,7 +41,7 @@ def send_to_backend(msg_type: str, data: dict) -> str:
comm.send_message(message)
return msg_id
except Exception as e:
print(f"Failed to send message to backend: {e}")
log_message(f"Failed to send message to backend: {e}")
return msg_id
def get_result(msg_id: str) -> dict:
......
......@@ -26,6 +26,7 @@ from .auth import login_user, logout_user, get_current_user, register_user, conf
from .database import get_user_tokens, update_user_tokens, get_user_queue_items, get_default_user_tokens, create_remember_token, validate_remember_token, delete_remember_token, extend_remember_token
from .comm import SocketCommunicator, Message
from .utils import get_current_user_session, login_required, admin_required, api_auth_required, admin_api_auth_required
from .logging_utils import log_message
api_bp = Blueprint('api', __name__)
......@@ -45,7 +46,7 @@ def send_to_backend(msg_type: str, data: dict) -> str:
comm.send_message(message)
return msg_id
except Exception as e:
print(f"Failed to send message to backend: {e}")
log_message(f"Failed to send message to backend: {e}")
return msg_id
def get_result(msg_id: str) -> dict:
......@@ -166,7 +167,7 @@ def api_stats():
except ImportError:
# Fallback to PyTorch-only stats if pynvml not available
print("pynvml not available, falling back to PyTorch GPU stats")
log_message("pynvml not available, falling back to PyTorch GPU stats")
if torch.cuda.is_available():
data['gpu_count'] = torch.cuda.device_count()
data['gpus'] = []
......@@ -180,7 +181,7 @@ def api_stats():
}
data['gpus'].append(gpu)
except Exception as e:
print(f"Error getting GPU stats with pynvml: {e}")
log_message(f"Error getting GPU stats with pynvml: {e}")
# Fallback to PyTorch if pynvml fails
if torch.cuda.is_available():
data['gpu_count'] = torch.cuda.device_count()
......
......@@ -24,6 +24,7 @@ import secrets
import json
from typing import Optional, Dict, Any
from .database import authenticate_user, validate_api_token, create_persistent_session, get_persistent_session, destroy_persistent_session
from .logging_utils import log_message
# Redis support
try:
......@@ -58,9 +59,9 @@ class SessionManager:
)
# Test connection
self.redis_client.ping()
print("Redis session storage enabled")
log_message("Redis session storage enabled")
except (redis.ConnectionError, redis.AuthenticationError):
print("Redis not available, falling back to database storage")
log_message("Redis not available, falling back to database storage")
self.redis_client = None
def _use_redis(self) -> bool:
......
......@@ -24,6 +24,7 @@ import threading
from .comm import SocketServer, Message
from .config import get_analysis_backend, get_training_backend, set_analysis_backend, set_training_backend, get_backend_worker_port
from .queue import queue_manager
from .logging_utils import log_message
worker_sockets = {} # type: dict
......@@ -59,9 +60,9 @@ def handle_web_message(message: Message, client_sock=None) -> Message:
from .config import get_analysis_backend
backend = get_analysis_backend()
worker_key = f'analysis_{backend}'
print(f"Backend forwarding analyze_request {message.msg_id} to worker {worker_key}")
print(f"DEBUG: backend = {backend}, worker_key = {worker_key}")
print(f"DEBUG: Checking worker_sockets for {worker_key}, keys: {list(worker_sockets.keys())}")
log_message(f"Backend forwarding analyze_request {message.msg_id} to worker {worker_key}")
log_message(f"DEBUG: backend = {backend}, worker_key = {worker_key}")
log_message(f"DEBUG: Checking worker_sockets for {worker_key}, keys: {list(worker_sockets.keys())}")
if worker_key in worker_sockets:
# Forward to local worker
import json
......
......@@ -32,6 +32,7 @@ import ssl
from typing import Dict, Any, Optional
from .comm import Message
from .config import get_analysis_backend, get_training_backend
from .logging_utils import log_message
class ClusterClient:
......@@ -98,14 +99,14 @@ class ClusterClient:
response = json.loads(response_str)
if response.get('type') == 'auth_success':
self.connected = True
print("Successfully connected to cluster master")
log_message("Successfully connected to cluster master")
return True
else:
print("Authentication failed")
log_message("Authentication failed")
return False
except Exception as e:
print(f"Failed to connect to cluster master: {e}")
log_message(f"Failed to connect to cluster master: {e}")
return False
async def _receive_message(self) -> Optional[Dict[str, Any]]:
......@@ -135,8 +136,8 @@ class ClusterClient:
gpu_info = detect_gpu_backends()
available_backends = get_available_backends()
print(f"Client backend detection: CUDA={gpu_info['cuda']}, ROCm={gpu_info['rocm']}")
print(f"Available backends: {available_backends}")
log_message(f"Client backend detection: CUDA={gpu_info['cuda']}, ROCm={gpu_info['rocm']}")
log_message(f"Available backends: {available_backends}")
# Only start processes if not already started
if not self.local_processes:
......@@ -151,7 +152,7 @@ class ClusterClient:
self.local_processes[proc_name] = subprocess.Popen(cmd)
self.process_weights[proc_name] = 10 # Default weight
self.process_models[proc_name] = 'Qwen/Qwen2.5-VL-7B-Instruct'
print(f"Started analysis worker for {backend}")
log_message(f"Started analysis worker for {backend}")
# Start training workers for available backends (including CPU)
for backend in available_backends:
......@@ -164,7 +165,7 @@ class ClusterClient:
self.local_processes[proc_name] = subprocess.Popen(cmd)
self.process_weights[proc_name] = 5 # Training typically lower weight
self.process_models[proc_name] = 'Qwen/Qwen2.5-VL-7B-Instruct'
print(f"Started training worker for {backend}")
log_message(f"Started training worker for {backend}")
# Register processes with master
await self._send_message({
......@@ -254,7 +255,7 @@ class ClusterClient:
await self._handle_model_shared_file(message)
except Exception as e:
print(f"Error handling master command: {e}")
log_message(f"Error handling master command: {e}")
break
def _start_process(self, process_name: str) -> None:
......@@ -273,9 +274,9 @@ class ClusterClient:
self.local_processes[process_name] = subprocess.Popen(cmd)
self.process_weights[process_name] = 10
self.process_models[process_name] = 'Qwen/Qwen2.5-VL-7B-Instruct'
print(f"Started analysis worker for {backend}")
log_message(f"Started analysis worker for {backend}")
else:
print(f"Cannot start {process_name}: {backend} backend not available")
log_message(f"Cannot start {process_name}: {backend} backend not available")
elif process_name.startswith('training_'):
backend = process_name.split('_')[1]
if backend in available_backends:
......@@ -287,9 +288,9 @@ class ClusterClient:
self.local_processes[process_name] = subprocess.Popen(cmd)
self.process_weights[process_name] = 5
self.process_models[process_name] = 'Qwen/Qwen2.5-VL-7B-Instruct'
print(f"Started training worker for {backend}")
log_message(f"Started training worker for {backend}")
else:
print(f"Cannot start {process_name}: {backend} backend not available")
log_message(f"Cannot start {process_name}: {backend} backend not available")
async def _handle_job_assignment(self, message: Dict[str, Any]) -> None:
"""Handle job assignment from master."""
......@@ -297,7 +298,7 @@ class ClusterClient:
process_type = message.get('process_type', 'analysis')
job_data = message.get('job_data', {})
print(f"Received job assignment: {job_id} for process type: {process_type}")
log_message(f"Received job assignment: {job_id} for process type: {process_type}")
# Forward job to local backend via TCP (web interface), which will route it to the appropriate worker
try:
......@@ -316,13 +317,13 @@ class ClusterClient:
)
backend_comm.send_message(job_message)
print(f"Job {job_id} forwarded to local backend for {process_type} processing")
log_message(f"Job {job_id} forwarded to local backend for {process_type} processing")
# Start monitoring for result
self.pending_jobs[job_id] = asyncio.create_task(self._monitor_job_result(job_id))
except Exception as e:
print(f"Failed to forward job {job_id} to local backend: {e}")
log_message(f"Failed to forward job {job_id} to local backend: {e}")
await self._send_message({
'type': 'job_result',
'job_id': job_id,
......@@ -353,7 +354,7 @@ class ClusterClient:
response = backend_comm.receive_message()
if response and response.get('msg_type') in ['analyze_response', 'train_response']:
result_data = response.get('data', {})
print(f"Received result for job {job_id}")
log_message(f"Received result for job {job_id}")
# Send result back to cluster master
await self._send_message({
......@@ -373,11 +374,11 @@ class ClusterClient:
continue
except Exception as e:
print(f"Error polling for job {job_id} result: {e}")
log_message(f"Error polling for job {job_id} result: {e}")
await asyncio.sleep(1)
# Timeout - job took too long
print(f"Job {job_id} timed out waiting for result")
log_message(f"Job {job_id} timed out waiting for result")
await self._send_message({
'type': 'job_result',
'job_id': job_id,
......@@ -385,7 +386,7 @@ class ClusterClient:
})
except Exception as e:
print(f"Error monitoring job {job_id}: {e}")
log_message(f"Error monitoring job {job_id}: {e}")
finally:
if job_id in self.pending_jobs:
del self.pending_jobs[job_id]
......@@ -414,7 +415,7 @@ class ClusterClient:
'total_size': total_size,
'received_data': b''
}
print(f"Starting model transfer: {model_path} ({total_size} bytes)")
log_message(f"Starting model transfer: {model_path} ({total_size} bytes)")
async def _handle_model_chunk(self, message: Dict[str, Any]) -> None:
"""Handle model data chunk from master."""
......@@ -441,14 +442,14 @@ class ClusterClient:
with open(model_file, 'wb') as f:
f.write(model_data)
print(f"Model {model_path} saved to {model_file}")
log_message(f"Model {model_path} saved to {model_file}")
# Update local model availability
for proc_name in self.process_models:
if proc_name.startswith('analysis_') or proc_name.startswith('training_'):
self.process_models[proc_name] = model_path
else:
print(f"Model transfer failed: received {len(model_data)} bytes, expected {expected_size}")
log_message(f"Model transfer failed: received {len(model_data)} bytes, expected {expected_size}")
# Clean up
delattr(self, '_current_model_transfer')
......@@ -471,7 +472,7 @@ class ClusterClient:
await asyncio.sleep(5)
except Exception as e:
print(f"Error monitoring GPU usage: {e}")
log_message(f"Error monitoring GPU usage: {e}")
await asyncio.sleep(5)
def _collect_gpu_stats(self) -> Dict[str, Any]:
......@@ -521,7 +522,7 @@ class ClusterClient:
pass
except Exception as e:
print(f"Error collecting GPU stats with pynvml: {e}")
log_message(f"Error collecting GPU stats with pynvml: {e}")
# Fallback to PyTorch if pynvml fails
try:
import torch
......@@ -547,10 +548,10 @@ class ClusterClient:
backend = message.get('backend', 'cuda')
if backend not in ['cuda', 'rocm']:
print(f"Invalid backend requested: {backend} - only CUDA and ROCm supported")
log_message(f"Invalid backend requested: {backend} - only CUDA and ROCm supported")
return
print(f"Restarting workers with {backend} backend")
log_message(f"Restarting workers with {backend} backend")
# Terminate existing workers
for proc in self.local_processes.values():
......@@ -580,13 +581,13 @@ class ClusterClient:
available_backends = get_available_backends()
if backend not in available_backends:
print(f"Requested backend {backend} not available, available: {available_backends}")
log_message(f"Requested backend {backend} not available, available: {available_backends}")
# Use first available backend as fallback
if available_backends:
backend = available_backends[0]
print(f"Using fallback backend: {backend}")
log_message(f"Using fallback backend: {backend}")
else:
print("No backends available, cannot restart workers")
log_message("No backends available, cannot restart workers")
return
# Start new workers with the specified backend
......@@ -602,9 +603,9 @@ class ClusterClient:
self.local_processes[f'analysis_{backend}'] = subprocess.Popen(cmd)
self.process_weights[f'analysis_{backend}'] = 10
self.process_models[f'analysis_{backend}'] = 'Qwen/Qwen2.5-VL-7B-Instruct'
print(f"Started analysis worker with {backend}")
log_message(f"Started analysis worker with {backend}")
except Exception as e:
print(f"Failed to start analysis worker: {e}")
log_message(f"Failed to start analysis worker: {e}")
# Start training worker
try:
......@@ -616,9 +617,9 @@ class ClusterClient:
self.local_processes[f'training_{backend}'] = subprocess.Popen(cmd)
self.process_weights[f'training_{backend}'] = 5
self.process_models[f'training_{backend}'] = 'Qwen/Qwen2.5-VL-7B-Instruct'
print(f"Started training worker with {backend}")
log_message(f"Started training worker with {backend}")
except Exception as e:
print(f"Failed to start training worker: {e}")
log_message(f"Failed to start training worker: {e}")
# Re-register processes with master
await self._send_message({
......@@ -640,7 +641,7 @@ class ClusterClient:
total_size = message.get('total_size', 0)
if not self.shared_dir:
print(f"Received shared file message but no shared directory configured: {shared_file_path}")
log_message(f"Received shared file message but no shared directory configured: {shared_file_path}")
return
try:
......@@ -654,7 +655,7 @@ class ClusterClient:
local_model_file = f"models/{model_path.replace('/', '_')}.bin"
shutil.copy2(shared_file_path, local_model_file)
print(f"Model {model_path} copied from shared directory: {shared_file_path} -> {local_model_file}")
log_message(f"Model {model_path} copied from shared directory: {shared_file_path} -> {local_model_file}")
# Update local model availability
for proc_name in self.process_models:
......@@ -664,11 +665,11 @@ class ClusterClient:
# Clean up shared file (optional - master might want to keep it for other clients)
# os.remove(shared_file_path)
else:
print(f"Shared file size mismatch: expected {total_size}, got {actual_size}")
log_message(f"Shared file size mismatch: expected {total_size}, got {actual_size}")
else:
print(f"Shared file not found: {shared_file_path}")
log_message(f"Shared file not found: {shared_file_path}")
except Exception as e:
print(f"Error handling shared model file {shared_file_path}: {e}")
log_message(f"Error handling shared model file {shared_file_path}: {e}")
async def _handle_job_file_shared(self, message: Dict[str, Any]) -> None:
"""Handle job file available in shared directory."""
......@@ -677,7 +678,7 @@ class ClusterClient:
original_path = message.get('original_path')
if not self.shared_dir:
print(f"Received shared job file message but no shared directory configured: {shared_file_path}")
log_message(f"Received shared job file message but no shared directory configured: {shared_file_path}")
return
try:
......@@ -688,15 +689,15 @@ class ClusterClient:
shutil.copy2(shared_file_path, local_temp_file.name)
local_temp_file.close()
print(f"Job file for {job_id} copied from shared directory: {shared_file_path} -> {local_temp_file.name}")
log_message(f"Job file for {job_id} copied from shared directory: {shared_file_path} -> {local_temp_file.name}")
# Update job data with local path
# This would need to be stored and used when processing the job
# For now, just log
print(f"Job {job_id} file ready at: {local_temp_file.name}")
log_message(f"Job {job_id} file ready at: {local_temp_file.name}")
except Exception as e:
print(f"Error handling shared job file {shared_file_path}: {e}")
log_message(f"Error handling shared job file {shared_file_path}: {e}")
async def _handle_job_file_transfer_start(self, message: Dict[str, Any]) -> None:
"""Handle start of job file transfer."""
......@@ -711,7 +712,7 @@ class ClusterClient:
'total_size': total_size,
'received_data': b''
}
print(f"Starting job file transfer for {job_id}: {file_path} ({total_size} bytes)")
log_message(f"Starting job file transfer for {job_id}: {file_path} ({total_size} bytes)")
async def _handle_job_file_chunk(self, message: Dict[str, Any]) -> None:
"""Handle job file data chunk."""
......@@ -739,19 +740,19 @@ class ClusterClient:
temp_file.write(received_data)
temp_file.close()
print(f"Job file for {job_id} saved to: {temp_file.name}")
log_message(f"Job file for {job_id} saved to: {temp_file.name}")
# Clean up
delattr(self, '_current_job_file_transfer')
else:
print(f"Job file transfer failed: received {len(received_data)} bytes, expected {expected_size}")
log_message(f"Job file transfer failed: received {len(received_data)} bytes, expected {expected_size}")
async def run(self) -> None:
"""Main client loop with reconnection."""
reconnect = True
while reconnect: # Keep trying to connect/reconnect
if not await self.connect():
print("Failed to connect, retrying in 5 seconds...")
log_message("Failed to connect, retrying in 5 seconds...")
await asyncio.sleep(5)
continue
......@@ -772,11 +773,11 @@ class ClusterClient:
await self._send_message({'type': 'heartbeat'})
except KeyboardInterrupt:
print("Shutting down cluster client...")
log_message("Shutting down cluster client...")
reconnect = False
except Exception as e:
print(f"Connection lost: {e}, attempting to reconnect...")
log_message(f"Connection lost: {e}, attempting to reconnect...")
connection_lost = True
finally:
......@@ -837,9 +838,9 @@ if __name__ == "__main__":
args = parser.parse_args()
print(f"Starting VidAI Cluster Client")
print(f"Connecting to {args.host}:{args.port}")
log_message(f"Starting VidAI Cluster Client")
log_message(f"Connecting to {args.host}:{args.port}")
if args.shared_dir:
print(f"Using shared directory: {args.shared_dir}")
log_message(f"Using shared directory: {args.shared_dir}")
start_cluster_client(args.host, args.port, args.token, args.optimize, args.flash, args.weight, args.shared_dir)
\ No newline at end of file
......@@ -34,6 +34,7 @@ from typing import Dict, Any, List, Optional
from collections import defaultdict
import socketserver
from .comm import Message
from .logging_utils import log_message
class ClusterMaster:
......@@ -71,7 +72,7 @@ class ClusterMaster:
key_file = 'cluster.key'
if not os.path.exists(cert_file) or not os.path.exists(key_file):
print("Generating self-signed SSL certificate for cluster...")
log_message("Generating self-signed SSL certificate for cluster...")
try:
from cryptography import x509
from cryptography.x509.oid import NameOID
......@@ -122,10 +123,10 @@ class ClusterMaster:
with open(cert_file, "wb") as f:
f.write(cert.public_bytes(serialization.Encoding.PEM))
print(f"SSL certificate generated: {cert_file}, {key_file}")
log_message(f"SSL certificate generated: {cert_file}, {key_file}")
except ImportError:
print("cryptography library not available, cannot generate SSL certificate")
log_message("cryptography library not available, cannot generate SSL certificate")
raise
# Create SSL context
......@@ -137,7 +138,7 @@ class ClusterMaster:
"""Start the cluster master server."""
self.running = True
self.start_time = time.time() # Update start time when server actually starts
print(f"Cluster master started on port {self.port} (secure websocket)")
log_message(f"Cluster master started on port {self.port} (secure websocket)")
# Generate/load SSL certificate
ssl_context = self._generate_ssl_cert()
......@@ -150,7 +151,7 @@ class ClusterMaster:
notification_server = await asyncio.start_server(
self._handle_notification, 'localhost', self.port + 1
)
print(f"Cluster master notification server started on port {self.port + 1}")
log_message(f"Cluster master notification server started on port {self.port + 1}")
# Register local processes if master has weight
if self.weight > 0:
......@@ -206,8 +207,8 @@ class ClusterMaster:
# Debug VRAM info
total_vram = sum(device.get('vram_gb', 0) for device in gpu_info.get('cuda_device_info', []))
total_vram += sum(device.get('vram_gb', 0) for device in gpu_info.get('rocm_device_info', []))
print(f"Local GPU VRAM detected: {total_vram}GB")
print(f"Registered {len([p for p in self.processes if self.processes[p]['client_id'] == client_id])} local processes for backends: {available_backends}")
log_message(f"Local GPU VRAM detected: {total_vram}GB")
log_message(f"Registered {len([p for p in self.processes if self.processes[p]['client_id'] == client_id])} local processes for backends: {available_backends}")
async def _handle_client(self, websocket: websockets.WebSocketServerProtocol) -> None:
"""Handle a client websocket connection."""
......@@ -228,7 +229,7 @@ class ClusterMaster:
continue
except Exception as e:
print(f"Client connection error: {e}")
log_message(f"Client connection error: {e}")
finally:
if client_id:
self._remove_client(client_id)
......@@ -239,16 +240,16 @@ class ClusterMaster:
data = await reader.read(1024)
message = data.decode().strip()
if message == "worker_registered":
print("Received worker registration notification - checking for pending jobs")
log_message("Received worker registration notification - checking for pending jobs")
# Trigger immediate job assignment check
await self._check_pending_jobs()
elif message.startswith("ping:"):
job_id = message.split(":", 1)[1]
if job_id in self.active_jobs:
self.active_jobs[job_id]['last_progress'] = time.time()
print(f"Ping received for job {job_id} - resetting timeout")
log_message(f"Ping received for job {job_id} - resetting timeout")
except Exception as e:
print(f"Notification handling error: {e}")
log_message(f"Notification handling error: {e}")
finally:
writer.close()
await writer.wait_closed()
......@@ -291,7 +292,7 @@ class ClusterMaster:
if job_id and job_id in self.active_jobs:
# Update the job's last activity time to reset timeout
self.active_jobs[job_id]['last_progress'] = time.time()
print(f"Progress update for job {job_id}: {message.data.get('progress', 0)}%")
log_message(f"Progress update for job {job_id}: {message.data.get('progress', 0)}%")
# Update database for consistency (both local and remote workers)
from .database import update_job_progress
progress = message.data.get('progress', 0)
......@@ -303,7 +304,7 @@ class ClusterMaster:
job_id = message.data.get('job_id')
if job_id and job_id in self.active_jobs:
self.active_jobs[job_id]['last_progress'] = time.time()
print(f"Ping received for job {job_id} - resetting timeout")
log_message(f"Ping received for job {job_id} - resetting timeout")
return {'type': 'progress_ack'}
elif msg_type == 'ping':
......@@ -312,7 +313,7 @@ class ClusterMaster:
if job_id and job_id in self.active_jobs:
# Update the job's last activity time to reset timeout
self.active_jobs[job_id]['last_progress'] = time.time()
print(f"Ping received for job {job_id} - keeping alive")
log_message(f"Ping received for job {job_id} - keeping alive")
return {'type': 'pong'}
elif msg_type == 'pong':
......@@ -384,13 +385,13 @@ class ClusterMaster:
}
backend_type = "GPU" if has_gpu else "CPU-only"
print(f"Client {client_id} authenticated ({backend_type}) with backends: {available_backends}")
log_message(f"Client {client_id} authenticated ({backend_type}) with backends: {available_backends}")
# Auto-weight logic: set master weight to 0 when first client connects (unless explicitly set)
if not self.weight_explicit:
non_local_clients = [cid for cid in self.clients if not self.clients[cid].get('local', False)]
if len(non_local_clients) == 1: # This is the first non-local client
print("First client connected, setting master weight to 0 for load balancing")
log_message("First client connected, setting master weight to 0 for load balancing")
self.weight = 0
return {'type': 'auth_success', 'client_id': client_id}
......@@ -429,7 +430,7 @@ class ClusterMaster:
conn.commit()
conn.close()
print(f"Client {client_id} registered {len(processes)} processes")
log_message(f"Client {client_id} registered {len(processes)} processes")
return {'type': 'registration_success'}
def _get_client_by_websocket(self, websocket: websockets.WebSocketServerProtocol) -> Optional[str]:
......@@ -474,7 +475,7 @@ class ClusterMaster:
# No autoweight logic
print(f"Client {client_id} disconnected")
log_message(f"Client {client_id} disconnected")
def get_best_worker_for_job(self, process_type: str, model_path: str, job_data: dict = None) -> Optional[str]:
"""Get the best worker for a job considering VRAM requirements, weight, and current load."""
......@@ -624,12 +625,12 @@ class ClusterMaster:
# Send via websocket synchronously to catch connection errors
await self.client_websockets[client_id].send(json.dumps(job_message))
print(f"Job {job_id} assigned to worker {worker_key} on client {client_id}")
log_message(f"Job {job_id} assigned to worker {worker_key} on client {client_id}")
except Exception as e:
print(f"Failed to send job {job_id} to worker {worker_key}: {e}")
log_message(f"Failed to send job {job_id} to worker {worker_key}: {e}")
# Connection is broken, remove the client
if client_id in self.clients:
print(f"Removing disconnected client {client_id}")
log_message(f"Removing disconnected client {client_id}")
self._remove_client(client_id)
# Clean up the failed assignment
self.worker_jobs[worker_key].remove(job_id)
......@@ -637,7 +638,7 @@ class ClusterMaster:
del self.active_jobs[job_id]
return None
else:
print(f"Client {client_id} not connected, cannot assign job {job_id}")
log_message(f"Client {client_id} not connected, cannot assign job {job_id}")
# Clean up the failed assignment
self.worker_jobs[worker_key].remove(job_id)
self.worker_vram_usage[worker_key] -= vram_required
......@@ -697,7 +698,7 @@ class ClusterMaster:
response = backend_comm.receive_message()
if response and response.msg_type == 'error':
error_msg = response.data.get('error', 'Unknown error')
print(f"Backend refused job {job_id}: {error_msg}")
log_message(f"Backend refused job {job_id}: {error_msg}")
# Re-queue the job
from .database import update_queue_status
update_queue_status(queue_id, 'queued', error=f'Backend refused job: {error_msg}')
......@@ -707,13 +708,13 @@ class ClusterMaster:
del self.active_jobs[job_id]
return None
else:
print(f"Job {job_id} forwarded to local backend for {process_type} processing")
log_message(f"Job {job_id} forwarded to local backend for {process_type} processing")
# Start monitoring for result
self.pending_jobs[job_id] = asyncio.create_task(self._monitor_job_result(job_id, process_type))
return job_id
except Exception as e:
print(f"Failed to forward job {job_id} to local backend: {e}")
log_message(f"Failed to forward job {job_id} to local backend: {e}")
# Clean up
self.worker_jobs[worker_key].remove(job_id)
self.worker_vram_usage[worker_key] -= vram_required
......@@ -745,7 +746,7 @@ class ClusterMaster:
)
backend_comm.send_message(cancel_message)
except Exception as e:
print(f"Failed to send cancel to local backend: {e}")
log_message(f"Failed to send cancel to local backend: {e}")
else:
# Send cancel to remote client
if client_id in self.client_websockets:
......@@ -755,7 +756,7 @@ class ClusterMaster:
'job_id': job_id
}))
except Exception as e:
print(f"Failed to send cancel to client {client_id}: {e}")
log_message(f"Failed to send cancel to client {client_id}: {e}")
if client_id in self.clients:
self._remove_client(client_id)
......@@ -791,7 +792,7 @@ class ClusterMaster:
response = backend_comm.receive_message()
if response and response.msg_type in ['analyze_response', 'train_response']:
result_data = response.data
print(f"Received result for job {job_id}")
log_message(f"Received result for job {job_id}")
# Handle result
await self._handle_job_result({
......@@ -808,7 +809,7 @@ class ClusterMaster:
# Check if it's a worker not available error - re-queue the job
error_msg = response.data.get('error', '')
if 'not available' in error_msg.lower() or 'not found' in error_msg.lower():
print(f"Worker not available for job {job_id}, re-queuing after delay: {error_msg}")
log_message(f"Worker not available for job {job_id}, re-queuing after delay: {error_msg}")
# Wait a bit for worker to register, then re-queue
await asyncio.sleep(2)
from .database import update_queue_status
......@@ -821,7 +822,7 @@ class ClusterMaster:
return
else:
# Other error, fail the job
print(f"Job {job_id} failed with error: {error_msg}")
log_message(f"Job {job_id} failed with error: {error_msg}")
await self._handle_job_result({
'job_id': job_id,
'result': {'status': 'failed', 'error': error_msg}
......@@ -849,19 +850,19 @@ class ClusterMaster:
continue
except Exception as e:
print(f"Error polling for job {job_id} result: {e}")
log_message(f"Error polling for job {job_id} result: {e}")
await asyncio.sleep(1)
# Timeout - job took too long
elapsed = time.time() - start_time
print(f"Job {job_id} timed out waiting for result ({elapsed:.0f} seconds)")
log_message(f"Job {job_id} timed out waiting for result ({elapsed:.0f} seconds)")
await self._handle_job_result({
'job_id': job_id,
'result': {'status': 'failed', 'error': f'Job timed out after {elapsed:.0f} seconds'}
})
except Exception as e:
print(f"Error monitoring job {job_id}: {e}")
log_message(f"Error monitoring job {job_id}: {e}")
finally:
if job_id in self.pending_jobs:
del self.pending_jobs[job_id]
......@@ -892,14 +893,14 @@ class ClusterMaster:
'original_path': media_path
}))
except Exception as e:
print(f"Failed to send shared file notification for job {job_id}: {e}")
log_message(f"Failed to send shared file notification for job {job_id}: {e}")
# Connection is broken, remove the client
if client_id in self.clients:
print(f"Removing disconnected client {client_id}")
log_message(f"Removing disconnected client {client_id}")
self._remove_client(client_id)
return
except Exception as e:
print(f"Failed to use shared directory for job {job_id}: {e}")
log_message(f"Failed to use shared directory for job {job_id}: {e}")
# Fall back to websocket transfer
await self._transfer_file_via_websocket(client_id, media_path, job_id)
else:
......@@ -942,15 +943,15 @@ class ClusterMaster:
'file_path': file_path
}))
except Exception as e:
print(f"Failed to transfer file {file_path} for job {job_id}: {e}")
log_message(f"Failed to transfer file {file_path} for job {job_id}: {e}")
# Connection is broken, remove the client
if client_id in self.clients:
print(f"Removing disconnected client {client_id}")
log_message(f"Removing disconnected client {client_id}")
self._remove_client(client_id)
raise # Re-raise to indicate failure
except Exception as e:
print(f"Failed to transfer file {file_path} for job {job_id}: {e}")
log_message(f"Failed to transfer file {file_path} for job {job_id}: {e}")
def complete_job(self, job_id: str, result: dict = None) -> None:
"""Mark a job as completed and free up resources."""
......@@ -989,11 +990,11 @@ class ClusterMaster:
self.clients[client_id]['consecutive_failures'] += 1
if self.clients[client_id]['consecutive_failures'] >= 3:
self.clients[client_id]['failing'] = True
print(f"Client {client_id} marked as failing after {self.clients[client_id]['consecutive_failures']} consecutive failures")
log_message(f"Client {client_id} marked as failing after {self.clients[client_id]['consecutive_failures']} consecutive failures")
else:
# Reset failure counter on success
if self.clients[client_id]['consecutive_failures'] > 0:
print(f"Client {client_id} failure counter reset (was {self.clients[client_id]['consecutive_failures']})")
log_message(f"Client {client_id} failure counter reset (was {self.clients[client_id]['consecutive_failures']})")
self.clients[client_id]['consecutive_failures'] = 0
self.clients[client_id]['failing'] = False
......@@ -1011,14 +1012,14 @@ class ClusterMaster:
if new_retry_count >= 5:
# Mark as permanently failed after 5 retries
update_queue_status(queue_entry['id'], 'failed', result, error=f'Job failed after {new_retry_count} attempts: {result.get("error", "Unknown error")}', retry_count=new_retry_count)
print(f"Job {job_id} failed permanently after {new_retry_count} attempts")
log_message(f"Job {job_id} failed permanently after {new_retry_count} attempts")
else:
# Re-queue failed jobs with incremented retry count
update_queue_status(queue_entry['id'], 'queued', result, error=f'Job failed on worker (attempt {new_retry_count}/5): {result.get("error", "Unknown error")}', retry_count=new_retry_count)
print(f"Job {job_id} failed, re-queued (attempt {new_retry_count}/5)")
log_message(f"Job {job_id} failed, re-queued (attempt {new_retry_count}/5)")
else:
update_queue_status(queue_entry['id'], 'completed', result)
print(f"Job {job_id} completed")
log_message(f"Job {job_id} completed")
# TODO: Store result for retrieval by web interface
# For now, this would need integration with the queue system
......@@ -1029,7 +1030,7 @@ class ClusterMaster:
job_id = message.get('job_id')
if client_id and job_id:
print(f"Client {client_id} requested to cancel job {job_id}")
log_message(f"Client {client_id} requested to cancel job {job_id}")
# The job status is already set to cancelled in the database
# Clean up any active tracking
if job_id in self.active_jobs:
......@@ -1044,7 +1045,7 @@ class ClusterMaster:
if client_id and job_id and file_path:
# Transfer result file back to master
# This would implement the reverse of job file transfer
print(f"Worker {client_id} requesting to send result file for job {job_id}: {file_path}")
log_message(f"Worker {client_id} requesting to send result file for job {job_id}: {file_path}")
# TODO: Implement file transfer back to master
def get_best_worker(self, process_type: str, prefer_local: bool = False) -> Optional[str]:
......@@ -1093,18 +1094,18 @@ class ClusterMaster:
'total_size': len(model_data)
}))
except Exception as e:
print(f"Failed to send model shared file notification to client {client_id}: {e}")
log_message(f"Failed to send model shared file notification to client {client_id}: {e}")
# Connection is broken, remove the client
if client_id in self.clients:
print(f"Removing disconnected client {client_id}")
log_message(f"Removing disconnected client {client_id}")
self._remove_client(client_id)
return False
print(f"Model {model_path} placed in shared directory for client {client_id}: {shared_file_path}")
log_message(f"Model {model_path} placed in shared directory for client {client_id}: {shared_file_path}")
return True
except Exception as e:
print(f"Failed to use shared directory for model transfer to client {client_id}: {e}")
log_message(f"Failed to use shared directory for model transfer to client {client_id}: {e}")
# Fall back to websocket transfer
else:
# Use websocket transfer (original method)
......@@ -1137,7 +1138,7 @@ class ClusterMaster:
return True
except Exception as e:
print(f"Failed to send model to client {client_id}: {e}")
log_message(f"Failed to send model to client {client_id}: {e}")
return False
def load_model_file(self, model_path: str) -> Optional[bytes]:
......@@ -1155,10 +1156,10 @@ class ClusterMaster:
try:
from huggingface_hub import hf_hub_download
# This would need proper implementation
print(f"Model {model_path} not found locally, would download from HuggingFace")
log_message(f"Model {model_path} not found locally, would download from HuggingFace")
return None # Placeholder
except ImportError:
print("huggingface_hub not available for model download")
log_message("huggingface_hub not available for model download")
return None
async def enable_process(self, process_key: str) -> bool:
......@@ -1176,10 +1177,10 @@ class ClusterMaster:
})
)
except Exception as e:
print(f"Failed to send enable_process command to client {client_id}: {e}")
log_message(f"Failed to send enable_process command to client {client_id}: {e}")
# Connection is broken, remove the client
if client_id in self.clients:
print(f"Removing disconnected client {client_id}")
log_message(f"Removing disconnected client {client_id}")
self._remove_client(client_id)
return False
return True
......@@ -1237,19 +1238,19 @@ class ClusterMaster:
"""Advanced worker selection based on VRAM requirements and job scheduling logic."""
from .models import estimate_model_vram_requirements
print(f"DEBUG: Selecting worker for {process_type}, model {model_path}")
print(f"DEBUG: Available processes: {list(self.processes.keys())}")
print(f"DEBUG: Process queue for {process_type}: {self.process_queue.get(process_type, [])}")
log_message(f"DEBUG: Selecting worker for {process_type}, model {model_path}")
log_message(f"DEBUG: Available processes: {list(self.processes.keys())}")
log_message(f"DEBUG: Process queue for {process_type}: {self.process_queue.get(process_type, [])}")
# Step 1: Determine VRAM required for the model (includes overhead)
required_vram_gb = estimate_model_vram_requirements(model_path)
print(f"DEBUG: Required VRAM: {required_vram_gb}GB (includes model overhead)")
log_message(f"DEBUG: Required VRAM: {required_vram_gb}GB (includes model overhead)")
# Step 2: Determine workers with sufficient GPU memory
available_workers = []
concurrent_workers = []
print(f"DEBUG: Checking {len(self.process_queue.get(process_type, []))} processes for {process_type}")
log_message(f"DEBUG: Checking {len(self.process_queue.get(process_type, []))} processes for {process_type}")
for proc_key, proc_weight in self.process_queue.get(process_type, []):
client_id = self.processes[proc_key]['client_id']
......@@ -1257,13 +1258,13 @@ class ClusterMaster:
# Skip failing clients
if client_info.get('failing', False):
print(f"DEBUG: Skipping failing client {client_id}")
log_message(f"DEBUG: Skipping failing client {client_id}")
continue
gpu_info = client_info.get('gpu_info', {})
print(f"DEBUG: Checking worker {proc_key} on client {client_id}")
print(f"DEBUG: GPU info: {gpu_info}")
log_message(f"DEBUG: Checking worker {proc_key} on client {client_id}")
log_message(f"DEBUG: GPU info: {gpu_info}")
# Get total available VRAM - be more flexible with the data structure
total_vram = 0
......@@ -1279,7 +1280,7 @@ class ClusterMaster:
# Fallback: assume 8GB per CUDA device
cuda_count = gpu_info.get('cuda_devices', 1)
total_vram += cuda_count * 8
print(f"DEBUG: CUDA detected, total VRAM: {total_vram}GB")
log_message(f"DEBUG: CUDA detected, total VRAM: {total_vram}GB")
# Check for ROCm
if gpu_info.get('rocm', False) or gpu_info.get('rocm_devices', 0) > 0:
......@@ -1291,7 +1292,7 @@ class ClusterMaster:
# Fallback: assume 16GB per ROCm device
rocm_count = gpu_info.get('rocm_devices', 1)
total_vram += rocm_count * 16
print(f"DEBUG: ROCm detected, total VRAM: {total_vram}GB")
log_message(f"DEBUG: ROCm detected, total VRAM: {total_vram}GB")
# If no specific GPU info but client has GPU backends, assume sufficient VRAM
available_backends = client_info.get('available_backends', [])
......@@ -1300,21 +1301,21 @@ class ClusterMaster:
has_gpu = True
# Assume at least 8GB for GPU workers
total_vram = max(total_vram, 8)
print(f"DEBUG: GPU backends detected {gpu_backends}, assuming {total_vram}GB VRAM")
log_message(f"DEBUG: GPU backends detected {gpu_backends}, assuming {total_vram}GB VRAM")
# For CPU-only workers, allow them but with lower priority
if not has_gpu and 'cpu' in available_backends:
total_vram = 0 # CPU has no VRAM limit
print(f"DEBUG: CPU-only worker detected")
log_message(f"DEBUG: CPU-only worker detected")
# Check if worker has enough VRAM (skip check for CPU workers)
# Required VRAM already includes model-specific overhead
has_sufficient_vram = total_vram >= required_vram_gb or not has_gpu
if has_sufficient_vram:
available_workers.append((proc_key, client_info['weight'], total_vram))
print(f"DEBUG: Worker {proc_key} accepted (VRAM: {total_vram}GB, required: {required_vram_gb}GB)")
log_message(f"DEBUG: Worker {proc_key} accepted (VRAM: {total_vram}GB, required: {required_vram_gb}GB)")
else:
print(f"DEBUG: Worker {proc_key} rejected - insufficient VRAM ({total_vram}GB < {required_vram_gb}GB)")
log_message(f"DEBUG: Worker {proc_key} rejected - insufficient VRAM ({total_vram}GB < {required_vram_gb}GB)")
if not available_workers:
return None
......@@ -1366,10 +1367,10 @@ class ClusterMaster:
})
)
except Exception as e:
print(f"Failed to send disable_process command to client {client_id}: {e}")
log_message(f"Failed to send disable_process command to client {client_id}: {e}")
# Connection is broken, remove the client
if client_id in self.clients:
print(f"Removing disconnected client {client_id}")
log_message(f"Removing disconnected client {client_id}")
self._remove_client(client_id)
return False
return True
......@@ -1403,10 +1404,10 @@ class ClusterMaster:
})
)
except Exception as e:
print(f"Failed to send update_weight command to client {client_id}: {e}")
log_message(f"Failed to send update_weight command to client {client_id}: {e}")
# Connection is broken, remove the client
if client_id in self.clients:
print(f"Removing disconnected client {client_id}")
log_message(f"Removing disconnected client {client_id}")
self._remove_client(client_id)
return False
return True
......@@ -1418,7 +1419,7 @@ class ClusterMaster:
return False
if backend not in ['cuda', 'rocm', 'cpu']:
print(f"Invalid backend requested: {backend} - only CUDA, ROCm, and CPU supported")
log_message(f"Invalid backend requested: {backend} - only CUDA, ROCm, and CPU supported")
return False
# Send restart command to client
......@@ -1431,10 +1432,10 @@ class ClusterMaster:
)
return True
except Exception as e:
print(f"Failed to send restart_workers command to client {client_id}: {e}")
log_message(f"Failed to send restart_workers command to client {client_id}: {e}")
# Connection is broken, remove the client
if client_id in self.clients:
print(f"Removing disconnected client {client_id}")
log_message(f"Removing disconnected client {client_id}")
self._remove_client(client_id)
return False
......@@ -1444,7 +1445,7 @@ class ClusterMaster:
return False
if backend not in ['cuda', 'rocm', 'cpu']:
print(f"Invalid backend requested: {backend} - only CUDA, ROCm, and CPU supported")
log_message(f"Invalid backend requested: {backend} - only CUDA, ROCm, and CPU supported")
return False
# Send restart command for specific worker to client
......@@ -1458,10 +1459,10 @@ class ClusterMaster:
)
return True
except Exception as e:
print(f"Failed to send restart command for worker {worker_name} to client {client_id}: {e}")
log_message(f"Failed to send restart command for worker {worker_name} to client {client_id}: {e}")
# Connection is broken, remove the client
if client_id in self.clients:
print(f"Removing disconnected client {client_id}")
log_message(f"Removing disconnected client {client_id}")
self._remove_client(client_id)
return False
......@@ -1478,7 +1479,7 @@ class ClusterMaster:
should_print_status = jobs and (current_time - self.last_job_status_print) >= 10
if should_print_status:
print(f"Found {len(jobs)} pending job(s)")
log_message(f"Found {len(jobs)} pending job(s)")
self.last_job_status_print = current_time
for job_row in jobs:
......@@ -1495,7 +1496,7 @@ class ClusterMaster:
job['data'] = json.loads(job['data']) if job['data'] else None
process_type = 'analysis' if job['request_type'] == 'analyze' else ('training' if job['request_type'] == 'train' else job['request_type'])
if should_print_status:
print(f"Job {job['id']} waiting for available workers")
log_message(f"Job {job['id']} waiting for available workers")
worker_key = self.select_worker_for_job(process_type, job['data'].get('model_path', 'Qwen/Qwen2.5-VL-7B-Instruct'), job['data'])
if worker_key:
job_id = await self.assign_job_to_worker(worker_key, job['data'], queue_id=job['id'])
......@@ -1508,12 +1509,12 @@ class ClusterMaster:
'worker_type': self.processes[worker_key]['name']
}
update_queue_status(job['id'], 'processing', worker_info, job_id=job_id)
print(f"Assigned job {job['id']} ({job_id}) to worker {worker_key}")
log_message(f"Assigned job {job['id']} ({job_id}) to worker {worker_key}")
else:
print(f"Failed to assign job {job['id']} to worker {worker_key}, re-queuing")
log_message(f"Failed to assign job {job['id']} to worker {worker_key}, re-queuing")
update_queue_status(job['id'], 'queued', error='Failed to assign to worker, re-queued')
else:
print(f"No suitable worker found for job {job['id']}, re-queuing")
log_message(f"No suitable worker found for job {job['id']}, re-queuing")
update_queue_status(job['id'], 'queued', error='No suitable worker found, re-queued')
async def _management_loop(self) -> None:
......@@ -1580,7 +1581,7 @@ def start_cluster_master(host: str = '0.0.0.0', port: int = 5003, shared_dir: st
weight = int(config_weight)
weight_explicit = True # Config weight counts as explicit
except ValueError:
print(f"Invalid cluster_master_weight in config: {config_weight}, using auto")
log_message(f"Invalid cluster_master_weight in config: {config_weight}, using auto")
weight = 100
weight_explicit = False
else:
......@@ -1627,20 +1628,20 @@ if __name__ == "__main__":
try:
weight_value = int(args.weight)
except ValueError:
print(f"Invalid weight value: {args.weight}, using 'auto'")
log_message(f"Invalid weight value: {args.weight}, using 'auto'")
weight_value = 'auto'
weight_explicit = False
if weight_explicit and weight_value != 'auto':
print(f"Starting VidAI Cluster Master on {args.host}:{args.port} with explicit weight {weight_value}")
log_message(f"Starting VidAI Cluster Master on {args.host}:{args.port} with explicit weight {weight_value}")
else:
print(f"Starting VidAI Cluster Master on {args.host}:{args.port} with {'default ' if not weight_explicit else ''}weight {weight_value} (will auto-adjust when clients connect)")
log_message(f"Starting VidAI Cluster Master on {args.host}:{args.port} with {'default ' if not weight_explicit else ''}weight {weight_value} (will auto-adjust when clients connect)")
if args.shared_dir:
print(f"Using shared directory: {args.shared_dir}")
log_message(f"Using shared directory: {args.shared_dir}")
# Validate shared directory exists
if not os.path.exists(args.shared_dir):
print(f"Warning: Shared directory {args.shared_dir} does not exist. Creating it.")
log_message(f"Warning: Shared directory {args.shared_dir} does not exist. Creating it.")
os.makedirs(args.shared_dir, exist_ok=True)
start_cluster_master(args.host, args.port, args.shared_dir, weight_value, weight_explicit)
\ No newline at end of file
......@@ -28,6 +28,7 @@ from typing import Dict, Any, Optional
from dataclasses import dataclass
from .compat import get_socket_path, is_unix_sockets_supported
from .config import get_debug
from .logging_utils import log_message
@dataclass
......@@ -72,7 +73,7 @@ class SocketCommunicator:
}).encode('utf-8')
full_data = data + b'\n'
if get_debug():
print(f"DEBUG: SocketCommunicator sending: {full_data}")
log_message(f"DEBUG: SocketCommunicator sending: {full_data}")
self.sock.sendall(full_data)
def receive_message(self) -> Optional[Message]:
......@@ -156,7 +157,7 @@ class SocketServer:
for msg_str in messages:
if msg_str.strip():
if get_debug():
print(f"DEBUG: SocketServer processing message: {repr(msg_str)}")
log_message(f"DEBUG: SocketServer processing message: {repr(msg_str)}")
try:
msg_data = json.loads(msg_str)
message = Message(
......@@ -165,7 +166,7 @@ class SocketServer:
data=msg_data['data']
)
if get_debug():
print(f"DEBUG: SocketServer parsed message: {message}")
log_message(f"DEBUG: SocketServer parsed message: {message}")
response = self.message_handler(message, client_sock)
if response:
resp_data = json.dumps({
......@@ -176,7 +177,7 @@ class SocketServer:
client_sock.sendall(resp_data + b'\n')
except json.JSONDecodeError as e:
if get_debug():
print(f"DEBUG: SocketServer JSON decode error: {e}")
log_message(f"DEBUG: SocketServer JSON decode error: {e}")
pass
except:
pass
......
......@@ -18,6 +18,7 @@
Cross-platform compatibility utilities for Video AI.
Handles differences between Linux and Windows platforms.
"""
from .logging_utils import log_message
import os
import sys
......
......@@ -23,6 +23,7 @@ Supports CLI, config file, environment variables, and defaults.
from .config_loader import load_initial_config, DEFAULTS
from .database import get_config, set_config, get_all_config, get_system_prompt, set_system_prompt
from .logging_utils import log_message
def initialize_config(cli_args=None) -> None:
......@@ -37,20 +38,20 @@ def initialize_config(cli_args=None) -> None:
debug_explicitly_set = False
if cli_args and hasattr(cli_args, 'debug') and cli_args.debug:
debug_explicitly_set = True
print(f"DEBUG_CHECK: debug set from CLI args")
log_message(f"DEBUG_CHECK: debug set from CLI args")
elif f'VIDAI_DEBUG' in os.environ:
debug_explicitly_set = True
print(f"DEBUG_CHECK: debug set from environment VIDAI_DEBUG={os.environ['VIDAI_DEBUG']}")
log_message(f"DEBUG_CHECK: debug set from environment VIDAI_DEBUG={os.environ['VIDAI_DEBUG']}")
elif 'debug' in initial_config and initial_config['debug'] != DEFAULTS['debug']:
debug_explicitly_set = True
print(f"DEBUG_CHECK: debug set from config file, initial_config['debug']={initial_config['debug']}, DEFAULTS['debug']={DEFAULTS['debug']}")
log_message(f"DEBUG_CHECK: debug set from config file, initial_config['debug']={initial_config['debug']}, DEFAULTS['debug']={DEFAULTS['debug']}")
if not debug_explicitly_set:
# Reset debug to false if not explicitly set via CLI, env, or config file
initial_config['debug'] = 'false'
print(f"MAIN_DEBUG_SET: debug reset to false (not explicitly set)")
log_message(f"MAIN_DEBUG_SET: debug reset to false (not explicitly set)")
else:
print(f"MAIN_DEBUG_SET: debug kept as {initial_config['debug']} (explicitly set)")
log_message(f"MAIN_DEBUG_SET: debug kept as {initial_config['debug']} (explicitly set)")
# Special handling for debug_web: same logic as debug
debug_web_explicitly_set = False
......
......@@ -18,6 +18,7 @@
Configuration loader for Video AI.
Handles loading from CLI, config file, environment variables, and defaults.
"""
from .logging_utils import log_message
import os
import configparser
......@@ -111,7 +112,7 @@ def load_initial_config(cli_args=None) -> dict:
if custom_config.exists():
config_files.append(custom_config)
else:
print(f"Warning: Specified config file '{args.config}' does not exist. Falling back to default locations.")
log_message(f"Warning: Specified config file '{args.config}' does not exist. Falling back to default locations.")
else:
# Default locations
config_files = [
......
......@@ -23,6 +23,7 @@ import os
import json
from typing import Dict, Any, Optional, List
from .compat import get_user_config_dir, ensure_dir
from .logging_utils import log_message
# Database imports - conditionally import MySQL
try:
......@@ -698,9 +699,9 @@ def init_db(conn) -> None:
cursor.execute('ALTER TABLE processing_queue ADD COLUMN updated_at TIMESTAMP')
# Then set default for future inserts
cursor.execute('UPDATE processing_queue SET updated_at = CURRENT_TIMESTAMP WHERE updated_at IS NULL')
print("Added updated_at column to processing_queue table")
log_message("Added updated_at column to processing_queue table")
except Exception as e:
print(f"Error adding updated_at column: {e}")
log_message(f"Error adding updated_at column: {e}")
pass
# Cluster processes table
......
......@@ -26,6 +26,7 @@ from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from typing import Optional
from .config import get_config
from .logging_utils import log_message
def get_smtp_config() -> dict:
......@@ -45,7 +46,7 @@ def send_email(to_email: str, subject: str, html_content: str, text_content: str
config = get_smtp_config()
if not config['username'] or not config['password']:
print("SMTP not configured. Email sending disabled.")
log_message("SMTP not configured. Email sending disabled.")
return False
try:
......@@ -78,7 +79,7 @@ def send_email(to_email: str, subject: str, html_content: str, text_content: str
return True
except Exception as e:
print(f"Email sending failed: {e}")
log_message(f"Email sending failed: {e}")
return False
......
"""
Logging utilities for Video AI application.
"""
import datetime
import os
def log_message(message: str, process: str = None) -> None:
"""Log a message with timestamp and process information."""
if process is None:
# Auto-detect process type from current script name
script_name = os.path.basename(__file__)
if 'backend' in script_name:
process = 'backend'
elif 'worker_analysis' in script_name:
process = 'worker_analysis'
elif 'worker_training' in script_name:
process = 'worker_training'
elif 'web' in script_name:
process = 'web'
elif 'cluster_master' in script_name:
process = 'cluster_master'
elif 'cluster_client' in script_name:
process = 'cluster_client'
elif 'api' in script_name:
process = 'api'
elif 'admin' in script_name:
process = 'admin'
elif 'queue' in script_name:
process = 'queue'
elif 'database' in script_name:
process = 'database'
elif 'auth' in script_name:
process = 'auth'
elif 'config' in script_name:
process = 'config'
elif 'runpod' in script_name:
process = 'runpod'
elif 'email' in script_name:
process = 'email'
elif 'payments' in script_name:
process = 'payments'
elif 'comm' in script_name:
process = 'comm'
else:
process = 'main'
timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"[{timestamp}] [{process}] {message}")
\ No newline at end of file
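As a quick sanity check of the auto-detection above, assuming the analysis worker is launched as `python vidai/worker_analysis.py` (the exact invocation is illustrative; it only needs to match one of the prefixes checked in log_message):

from vidai.logging_utils import log_message

# When launched that way, sys.argv[0] is 'vidai/worker_analysis.py', so the
# 'worker_analysis' branch matches and the message is tagged accordingly:
log_message("Model loaded")
# -> [2025-01-01 12:00:00] [worker_analysis] Model loaded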
......@@ -336,6 +336,7 @@ def estimate_model_vram_requirements(model_path: str) -> int:
# First, try to get from database
try:
from .database import get_db_connection
from .logging_utils import log_message
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute('SELECT vram_estimate, vram_overhead_gb FROM models WHERE path = ?', (model_path,))
......
......@@ -25,6 +25,7 @@ from typing import Dict, Any, Optional, Tuple, List
from .config import get_config
from .database import update_user_tokens
from .email_utils import send_payment_confirmation
from .logging_utils import log_message
class PaymentProcessor:
......@@ -59,7 +60,7 @@ class StripeProcessor(PaymentProcessor):
stripe.api_key = self.secret_key
except ImportError:
self.enabled = False
print("Stripe not available. Install with: pip install stripe")
log_message("Stripe not available. Install with: pip install stripe")
def process_payment(self, user_id: int, tokens: int, amount: float, currency: str = 'USD') -> Tuple[bool, str, Optional[str]]:
"""Process Stripe payment."""
......@@ -105,7 +106,7 @@ class StripeProcessor(PaymentProcessor):
}
except Exception as e:
print(f"Stripe payment intent creation failed: {e}")
log_message(f"Stripe payment intent creation failed: {e}")
return None
......
......@@ -20,6 +20,7 @@ Queue management for concurrent processing.
from typing import List, Dict, Any, Optional
from .database import (
add_to_queue, update_queue_status,
get_queue_status, get_user_queue_items, get_queue_position
)
from .logging_utils import log_message
......@@ -29,7 +30,7 @@ class QueueManager:
"""Manages job submission to the processing queue."""
def __init__(self):
print("QueueManager initialized", flush=True)
log_message("QueueManager initialized", flush=True)
self.last_status_print = 0 # Timestamp of last status message
def submit_job(self, user_id: int, request_type: str, data: dict, priority: int = 0) -> int:
......@@ -90,9 +91,9 @@ class QueueManager:
update_queue_status(queue_id, 'cancelled')
if cancelled_job_id:
print(f"Job {queue_id} ({cancelled_job_id}) cancelled")
log_message(f"Job {queue_id} ({cancelled_job_id}) cancelled")
else:
print(f"Job {queue_id} cancelled")
log_message(f"Job {queue_id} cancelled")
return True
......
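A hypothetical call against the QueueManager patched above, just to show the submit_job() signature in use; the 'analyze' request_type string and the payload are illustrative, not taken from the diff:

qm = QueueManager()
queue_id = qm.submit_job(user_id=1, request_type='analyze',
                         data={'prompt': 'Describe this image.'})
log_message(f"Submitted queue item {queue_id}")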
......@@ -28,6 +28,7 @@ from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from .config import get_runpod_api_key, set_runpod_api_key, get_runpod_template_id, set_runpod_template_id, get_runpod_gpu_type, set_runpod_gpu_type, get_use_runpod_pods, set_use_runpod_pods
from .compat import get_user_config_dir, ensure_dir
from .logging_utils import log_message
@dataclass
......@@ -162,7 +163,7 @@ class RunPodManager:
return pod
except Exception as e:
print(f"Failed to create pod: {e}")
log_message(f"Failed to create pod: {e}")
return None
......@@ -235,7 +236,7 @@ class RunPodManager:
pods_to_terminate.append(pod_id)
for pod_id in pods_to_terminate:
print(f"Terminating idle pod: {pod_id}")
log_message(f"Terminating idle pod: {pod_id}")
self.terminate_pod(pod_id)
def get_active_pods(self) -> List[RunPodPod]:
......
......@@ -33,6 +33,7 @@ from .database import get_user_tokens, update_user_tokens, get_user_queue_items,
from .api import api_bp
from .admin import admin_bp
from .utils import get_current_user_session, login_required, admin_required
from .logging_utils import log_message
# Determine project root (parent of vidai directory)
current_dir = os.path.dirname(os.path.abspath(__file__))
......@@ -84,7 +85,7 @@ def send_to_backend(msg_type: str, data: dict) -> str:
comm.send_message(message)
return msg_id
except Exception as e:
print(f"Failed to send message to backend: {e}")
log_message(f"Failed to send message to backend: {e}")
return msg_id
def get_progress(job_id: str) -> dict:
......@@ -102,7 +103,7 @@ def get_progress(job_id: str) -> dict:
elif response and response.msg_type == 'progress_pending':
return {'status': 'no_progress'}
except Exception as e:
print(f"Error getting progress: {e}")
log_message(f"Error getting progress: {e}")
return {}
def get_result(msg_id: str) -> dict:
......@@ -1211,7 +1212,7 @@ def detect_local_workers():
continue
except Exception as e:
print(f"Error detecting local workers: {e}")
log_message(f"Error detecting local workers: {e}")
return workers
......@@ -1323,7 +1324,7 @@ def switch_local_worker_backends(new_backend):
available_backends = get_available_backends()
if new_backend not in available_backends:
print(f"Warning: {new_backend} backend not available, available: {available_backends}")
log_message(f"Warning: {new_backend} backend not available, available: {available_backends}")
# Try to start with available backends instead
backends_to_use = [b for b in available_backends if b != new_backend][:1] # Use first available
if not backends_to_use:
......@@ -1334,24 +1335,24 @@ def switch_local_worker_backends(new_backend):
try:
cmd = [sys.executable, '-m', 'vidai.worker_analysis', new_backend]
subprocess.Popen(cmd)
print(f"Started analysis worker with {new_backend} backend")
log_message(f"Started analysis worker with {new_backend} backend")
except Exception as e:
print(f"Failed to start analysis worker: {e}")
log_message(f"Failed to start analysis worker: {e}")
return False
# Start training worker
try:
cmd = [sys.executable, '-m', 'vidai.worker_training', new_backend]
subprocess.Popen(cmd)
print(f"Started training worker with {new_backend} backend")
log_message(f"Started training worker with {new_backend} backend")
except Exception as e:
print(f"Failed to start training worker: {e}")
log_message(f"Failed to start training worker: {e}")
return False
return True
except Exception as e:
print(f"Error switching local worker backends: {e}")
log_message(f"Error switching local worker backends: {e}")
return False
@app.route('/api_tokens')
......@@ -1711,7 +1712,7 @@ if __name__ == "__main__":
server_dir = args.server_dir
if server_dir:
server_dir = os.path.abspath(server_dir)
print(f"Server directory set to: {server_dir}")
log_message(f"Server directory set to: {server_dir}")
# Set server_dir in API module
import vidai.api as api_module
......
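For orientation, the two helpers patched in this file are used as a pair: send_to_backend() returns a message id, and get_progress() is then polled with it (job ids are message ids in the worker hunks below). The calling routes sit outside these hunks, so the following is only a sketch; the payload keys mirror the analysis worker's data.get() calls, and the example values are illustrative:

# Sketch only: how the patched helpers are typically paired (not from the diff).
msg_id = send_to_backend('analyze_request', {
    'local_path': '/tmp/upload.mp4',   # hypothetical media path; key read by the worker
    'prompt': 'Describe this image.',
    'model_path': 'Qwen/Qwen2.5-VL-7B-Instruct',
})
status = get_progress(msg_id)
if status.get('status') == 'no_progress':
    log_message(f"Job {msg_id} accepted, no progress reported yet")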
......@@ -31,6 +31,7 @@ import uuid
from .comm import SocketCommunicator, Message
from .models import get_model
from .config import get_system_prompt_content, get_comm_type, get_backend_worker_port, get_debug
from .logging_utils import log_message
# Set PyTorch CUDA memory management
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'expandable_segments:True'
......@@ -185,7 +186,7 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm
"""Analyze media using dynamic model loading."""
job_id_int = int(job_id.split('_')[1]) if job_id else None
if get_debug():
print(f"DEBUG: Starting analyze_media for job {job_id_int}, media_path={media_path}")
log_message(f"DEBUG: Starting analyze_media for job {job_id_int}, media_path={media_path}")
# Send initial progress update
if comm:
......@@ -197,17 +198,17 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm
'tokens_used': 0
})
comm.send_message(progress_msg)
print(f"PROGRESS: Job {job_id_int} - 5% - Initializing analysis job")
log_message(f"PROGRESS: Job {job_id_int} - 5% - Initializing analysis job")
torch.cuda.empty_cache()
total_tokens = 0
# Get model with reference counting
if get_debug():
print(f"DEBUG: Loading model {model_path} for job {job_id_int}")
log_message(f"DEBUG: Loading model {model_path} for job {job_id_int}")
model = get_or_load_model(model_path)
if get_debug():
print(f"DEBUG: Model loaded for job {job_id_int}")
log_message(f"DEBUG: Model loaded for job {job_id_int}")
# Send progress update after model loading
if comm:
......@@ -218,11 +219,11 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm
'message': f'Model {model_path.split("/")[-1]} loaded successfully'
})
comm.send_message(progress_msg)
print(f"PROGRESS: Job {job_id_int} - 8% - Model loaded successfully")
log_message(f"PROGRESS: Job {job_id_int} - 8% - Model loaded successfully")
# Get system prompt
if get_debug():
print(f"DEBUG: Retrieving system prompt for job {job_id_int}")
log_message(f"DEBUG: Retrieving system prompt for job {job_id_int}")
try:
from .config import get_system_prompt_content
system_prompt = get_system_prompt_content()
......@@ -230,11 +231,11 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm
except:
full_prompt = prompt
if get_debug():
print(f"DEBUG: Full prompt set for job {job_id_int}")
log_message(f"DEBUG: Full prompt set for job {job_id_int}")
if is_video(media_path):
if get_debug():
print(f"DEBUG: Detected video, extracting frames for job {job_id_int}")
log_message(f"DEBUG: Detected video, extracting frames for job {job_id_int}")
frames, output_dir = extract_frames(media_path, interval, optimize=True)
total_frames = len(frames)
......@@ -247,13 +248,13 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm
'message': f'Extracted {total_frames} frames'
})
comm.send_message(progress_msg)
print(f"PROGRESS: Job {job_id_int} - 10% - Extracted {total_frames} frames")
log_message(f"PROGRESS: Job {job_id_int} - 10% - Extracted {total_frames} frames")
descriptions = []
for i, (frame_path, ts) in enumerate(frames):
if get_debug():
print(f"DEBUG: Processing frame {i+1}/{total_frames} at {ts:.2f}s for job {job_id_int}")
log_message(f"DEBUG: Processing frame {i+1}/{total_frames} at {ts:.2f}s for job {job_id_int}")
# Send progress update before processing
if comm:
......@@ -265,12 +266,12 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm
'message': f'Processing frame {i+1}/{total_frames} at {ts:.1f}s'
})
comm.send_message(progress_msg)
print(f"PROGRESS: Job {job_id_int} - {progress_percent}% - Processing frame {i+1}/{total_frames}")
log_message(f"PROGRESS: Job {job_id_int} - {progress_percent}% - Processing frame {i+1}/{total_frames}")
# Check for cancellation
if job_id and check_job_cancelled(job_id):
if get_debug():
print(f"DEBUG: Job {job_id_int} cancelled during frame processing")
log_message(f"DEBUG: Job {job_id_int} cancelled during frame processing")
# Clean up and return cancelled message
for fp, _ in frames[i:]:
try:
......@@ -288,7 +289,7 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm
desc, tokens = analyze_single_image(frame_path, full_prompt, model)
total_tokens += tokens
if get_debug():
print(f"DEBUG: Frame {i+1} analyzed for job {job_id_int}")
log_message(f"DEBUG: Frame {i+1} analyzed for job {job_id_int}")
descriptions.append(f"At {ts:.2f}s: {desc}")
os.unlink(frame_path)
......@@ -303,7 +304,7 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm
'tokens_used': total_tokens
})
comm.send_message(progress_msg)
print(f"PROGRESS: Job {job_id_int} - {progress_percent}% - Completed frame {i+1}/{total_frames}")
log_message(f"PROGRESS: Job {job_id_int} - {progress_percent}% - Completed frame {i+1}/{total_frames}")
# Send ping every 30 seconds to keep connection alive
if comm and (i + 1) % max(1, total_frames // (total_frames // 30 + 1)) == 0:
......@@ -312,14 +313,14 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm
'timestamp': time.time()
})
comm.send_message(ping_msg)
print(f"PING: Job {job_id_int} - Frame {i+1} - Keeping connection alive")
log_message(f"PING: Job {job_id_int} - Frame {i+1} - Keeping connection alive")
if output_dir:
import shutil
shutil.rmtree(output_dir)
if get_debug():
print(f"DEBUG: All frames processed, generating summary for job {job_id_int}")
log_message(f"DEBUG: All frames processed, generating summary for job {job_id_int}")
# Send progress update for summary generation
if comm:
......@@ -330,12 +331,12 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm
'message': 'Generating video summary'
})
comm.send_message(progress_msg)
print(f"PROGRESS: Job {job_id_int} - 85% - Generating video summary")
log_message(f"PROGRESS: Job {job_id_int} - 85% - Generating video summary")
# Check for cancellation before summary
if job_id and check_job_cancelled(job_id):
if get_debug():
print(f"DEBUG: Job {job_id_int} cancelled before summary")
log_message(f"DEBUG: Job {job_id_int} cancelled before summary")
return "Job cancelled by user", total_tokens
# Generate summary
......@@ -374,7 +375,7 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm
total_tokens += summary_tokens
if get_debug():
print(f"DEBUG: Summary generated for job {job_id_int}")
log_message(f"DEBUG: Summary generated for job {job_id_int}")
# Send final progress update
if comm:
......@@ -385,13 +386,13 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm
'message': 'Analysis completed'
})
comm.send_message(progress_msg)
print(f"PROGRESS: Job {job_id_int} - 100% - Analysis completed")
log_message(f"PROGRESS: Job {job_id_int} - 100% - Analysis completed")
result = f"Frame Descriptions:\n" + "\n".join(descriptions) + f"\n\nSummary:\n{summary}"
return result, total_tokens
else:
if get_debug():
print(f"DEBUG: Detected image, analyzing for job {job_id_int}")
log_message(f"DEBUG: Detected image, analyzing for job {job_id_int}")
# Send progress update for image analysis start
if comm:
......@@ -402,12 +403,12 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm
'message': 'Starting image analysis'
})
comm.send_message(progress_msg)
print(f"PROGRESS: Job {job_id_int} - 20% - Starting image analysis")
log_message(f"PROGRESS: Job {job_id_int} - 20% - Starting image analysis")
# Check for cancellation before processing image
if job_id and check_job_cancelled(job_id):
if get_debug():
print(f"DEBUG: Job {job_id_int} cancelled before image analysis")
log_message(f"DEBUG: Job {job_id_int} cancelled before image analysis")
return "Job cancelled by user", total_tokens
# Send progress update before model inference
......@@ -419,12 +420,12 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm
'message': 'Processing image with AI model'
})
comm.send_message(progress_msg)
print(f"PROGRESS: Job {job_id_int} - 50% - Processing image with AI model")
log_message(f"PROGRESS: Job {job_id_int} - 50% - Processing image with AI model")
result, tokens = analyze_single_image(media_path, full_prompt, model)
total_tokens += tokens
if get_debug():
print(f"DEBUG: Image analysis completed for job {job_id_int}")
log_message(f"DEBUG: Image analysis completed for job {job_id_int}")
# Send progress update for completion
if comm:
......@@ -435,7 +436,7 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm
'message': 'Finalizing analysis results'
})
comm.send_message(progress_msg)
print(f"PROGRESS: Job {job_id_int} - 90% - Finalizing analysis results")
log_message(f"PROGRESS: Job {job_id_int} - 90% - Finalizing analysis results")
# Send final progress update
if comm:
......@@ -446,57 +447,57 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None, comm
'message': 'Image analysis completed successfully'
})
comm.send_message(progress_msg)
print(f"PROGRESS: Job {job_id_int} - 100% - Image analysis completed successfully")
log_message(f"PROGRESS: Job {job_id_int} - 100% - Image analysis completed successfully")
torch.cuda.empty_cache()
return result, total_tokens
def worker_process(backend_type: str):
"""Main worker process."""
print(f"WORKER_DEBUG_READ: get_debug() = {get_debug()}")
log_message(f"WORKER_DEBUG_READ: get_debug() = {get_debug()}")
if get_debug():
print(f"DEBUG: Starting Analysis Worker for {backend_type}...")
print(f"DEBUG: Worker PID: {os.getpid()}")
log_message(f"DEBUG: Starting Analysis Worker for {backend_type}...")
log_message(f"DEBUG: Worker PID: {os.getpid()}")
# Workers use TCP for interprocess communication
comm = SocketCommunicator(host='127.0.0.1', port=get_backend_worker_port(), comm_type='tcp')
if get_debug():
print(f"DEBUG: Worker connecting to {comm.host}:{comm.port}")
log_message(f"DEBUG: Worker connecting to {comm.host}:{comm.port}")
comm.connect()
if get_debug():
print(f"Analysis Worker connected to backend")
log_message(f"Analysis Worker connected to backend")
# Register with backend
register_msg = Message('register', 'register', {'type': f'analysis_{backend_type}'})
comm.send_message(register_msg)
if get_debug():
print(f"Analysis Worker registered as analysis_{backend_type}")
log_message(f"Analysis Worker registered as analysis_{backend_type}")
while True:
try:
message = comm.receive_message()
if message and get_debug():
print(f"DEBUG: Worker {os.getpid()} received message: {message}")
log_message(f"DEBUG: Worker {os.getpid()} received message: {message}")
if message and message.msg_type == 'analyze_request':
if get_debug():
print(f"DEBUG: Worker received analyze_request: {message.msg_id}")
log_message(f"DEBUG: Worker received analyze_request: {message.msg_id}")
data = message.data
media_path = data.get('local_path', data.get('file_name', ''))
if not media_path:
result = 'No media path provided'
if get_debug():
print(f"DEBUG: No media path provided for job {message.msg_id}")
log_message(f"DEBUG: No media path provided for job {message.msg_id}")
else:
prompt = data.get('prompt', 'Describe this image.')
model_path = data.get('model_path', 'Qwen/Qwen2.5-VL-7B-Instruct')
interval = data.get('interval', 10)
job_id = message.msg_id # Use message ID for job identification
job_id_int = int(message.msg_id.split('_')[1]) # Extract integer job ID
print(f"PROGRESS: Job {job_id_int} accepted - Starting analysis")
log_message(f"PROGRESS: Job {job_id_int} accepted - Starting analysis")
if get_debug():
print(f"DEBUG: Starting analysis of {media_path} with model {model_path} for job {job_id}")
log_message(f"DEBUG: Starting analysis of {media_path} with model {model_path} for job {job_id}")
result, tokens_used = analyze_media(media_path, prompt, model_path, interval, job_id, comm)
if get_debug():
print(f"DEBUG: Analysis completed for job {message.msg_id}, used {tokens_used} tokens")
log_message(f"DEBUG: Analysis completed for job {message.msg_id}, used {tokens_used} tokens")
# Release model reference (don't unload yet, per requirements)
release_model(model_path)
......@@ -504,14 +505,14 @@ def worker_process(backend_type: str):
# Send result back
response = Message('analyze_response', message.msg_id, {'result': result, 'tokens_used': tokens_used})
if get_debug():
print(f"DEBUG: Sending analyze_response for job {message.msg_id}")
log_message(f"DEBUG: Sending analyze_response for job {message.msg_id}")
comm.send_message(response)
# If in cluster mode, also notify cluster master
# This would be handled by the cluster client receiving the result
time.sleep(0.1)
except Exception as e:
print(f"Worker error: {e}")
log_message(f"Worker error: {e}")
time.sleep(1)
if __name__ == "__main__":
......
......@@ -28,11 +28,12 @@ import json
import time
from .comm import SocketCommunicator, Message
from .config import get_comm_type, get_backend_worker_port, get_debug
from .logging_utils import log_message
def train_model(train_path, output_model, description, comm, job_id):
"""Perform training with progress updates."""
if get_debug():
print(f"DEBUG: Starting training with videotrain for output_model {output_model}")
log_message(f"DEBUG: Starting training with videotrain for output_model {output_model}")
desc_file = os.path.join(train_path, "description.txt")
with open(desc_file, "w") as f:
f.write(description)
......@@ -49,7 +50,7 @@ def train_model(train_path, output_model, description, comm, job_id):
'message': 'Training started'
})
comm.send_message(progress_msg)
print(f"PROGRESS: Job {job_id} - 10% - Training started")
log_message(f"PROGRESS: Job {job_id} - 10% - Training started")
last_ping = time.time()
while proc.poll() is None:
......@@ -60,14 +61,14 @@ def train_model(train_path, output_model, description, comm, job_id):
'timestamp': time.time()
})
comm.send_message(ping_msg)
print(f"PING: Job {job_id} - Keeping connection alive")
log_message(f"PING: Job {job_id} - Keeping connection alive")
last_ping = time.time()
time.sleep(1)
# Get result
stdout, stderr = proc.communicate()
if get_debug():
print(f"DEBUG: Training subprocess completed with returncode {proc.returncode}")
log_message(f"DEBUG: Training subprocess completed with returncode {proc.returncode}")
if proc.returncode == 0:
return "Training completed!"
else:
......@@ -76,7 +77,7 @@ def train_model(train_path, output_model, description, comm, job_id):
def worker_process(backend_type: str):
"""Main worker process."""
if get_debug():
print(f"Starting Training Worker for {backend_type}...")
log_message(f"Starting Training Worker for {backend_type}...")
# Workers use TCP for interprocess communication
comm = SocketCommunicator(host='127.0.0.1', port=get_backend_worker_port(), comm_type='tcp')
......@@ -90,19 +91,19 @@ def worker_process(backend_type: str):
try:
message = comm.receive_message()
if message and get_debug():
print(f"DEBUG: Worker {os.getpid()} received message: {message}")
log_message(f"DEBUG: Worker {os.getpid()} received message: {message}")
if message and message.msg_type == 'train_request':
if get_debug():
print(f"DEBUG: Worker received train_request: {message.msg_id}")
log_message(f"DEBUG: Worker received train_request: {message.msg_id}")
data = message.data
output_model = data.get('output_model', './VideoModel')
description = data.get('description', '')
train_dir = data.get('train_dir', '')
if train_dir and os.path.isdir(train_dir):
print(f"PROGRESS: Job {message.msg_id} accepted - Starting training")
log_message(f"PROGRESS: Job {message.msg_id} accepted - Starting training")
if get_debug():
print(f"DEBUG: Starting training for job {message.msg_id}")
log_message(f"DEBUG: Starting training for job {message.msg_id}")
result = train_model(train_dir, output_model, description, comm, message.msg_id)
# Send final progress
progress_msg = Message('progress', f'progress_{message.msg_id}', {
......@@ -112,21 +113,21 @@ def worker_process(backend_type: str):
'message': 'Training completed'
})
comm.send_message(progress_msg)
print(f"PROGRESS: Job {message.msg_id} - 100% - Training completed")
log_message(f"PROGRESS: Job {message.msg_id} - 100% - Training completed")
if get_debug():
print(f"DEBUG: Training completed for job {message.msg_id}")
log_message(f"DEBUG: Training completed for job {message.msg_id}")
else:
result = "No valid training directory provided"
if get_debug():
print(f"DEBUG: No valid training directory for job {message.msg_id}")
log_message(f"DEBUG: No valid training directory for job {message.msg_id}")
response = Message('train_response', message.msg_id, {'message': result})
if get_debug():
print(f"DEBUG: Sending train_response for job {message.msg_id}")
log_message(f"DEBUG: Sending train_response for job {message.msg_id}")
comm.send_message(response)
time.sleep(0.1)
except Exception as e:
print(f"Worker error: {e}")
log_message(f"Worker error: {e}")
time.sleep(1)
if __name__ == "__main__":
......