Implement multi-process architecture with CUDA/ROCm backend selection

- Refactor monolithic Flask app into separate processes:
  * Web interface process (vidai/web.py)
  * Backend routing process (vidai/backend.py)
  * Analysis worker processes (vidai/worker_analysis.py)
  * Training worker processes (vidai/worker_training.py)

- Add self-contained inter-process communication using TCP sockets
- Implement configuration system for backend selection (CUDA/ROCm)
- Add GPLv3 licensing and copyright notices
- Create comprehensive documentation and build scripts
- Update main launcher to manage all processes

This architecture provides better scalability, allows independent GPU backend selection, and maintains clean separation of concerns.
parent 56e233bb
...@@ -249,23 +249,37 @@ Examples: ...@@ -249,23 +249,37 @@ Examples:
print("Press Ctrl+C to stop") print("Press Ctrl+C to stop")
# Start backend process # Start backend process
backend_cmd = get_python_command(module='vidai.backend') backend_cmd = [sys.executable, '-m', 'vidai.backend']
backend_proc = subprocess.Popen(backend_cmd) backend_proc = subprocess.Popen(backend_cmd)
# Start analysis worker
analysis_cmd = [sys.executable, '-m', 'vidai.worker_analysis', args.analysis_backend]
analysis_proc = subprocess.Popen(analysis_cmd)
# Start training worker
training_cmd = [sys.executable, '-m', 'vidai.worker_training', args.training_backend]
training_proc = subprocess.Popen(training_cmd)
# Start web process # Start web process
web_cmd = get_python_command(module='vidai.web') web_cmd = [sys.executable, '-m', 'vidai.web']
web_proc = subprocess.Popen(web_cmd) web_proc = subprocess.Popen(web_cmd)
try: try:
# Wait for processes # Wait for processes
backend_proc.wait() backend_proc.wait()
analysis_proc.wait()
training_proc.wait()
web_proc.wait() web_proc.wait()
except KeyboardInterrupt: except KeyboardInterrupt:
print("Shutting down...") print("Shutting down...")
backend_proc.terminate()
web_proc.terminate() web_proc.terminate()
backend_proc.wait() training_proc.terminate()
analysis_proc.terminate()
backend_proc.terminate()
web_proc.wait() web_proc.wait()
training_proc.wait()
analysis_proc.wait()
backend_proc.wait()
if __name__ == "__main__": if __name__ == "__main__":
main() main()
\ No newline at end of file
...@@ -22,164 +22,103 @@ Manages request routing between web interface and worker processes. ...@@ -22,164 +22,103 @@ Manages request routing between web interface and worker processes.
import time import time
import threading import threading
from .comm import SocketServer, Message from .comm import SocketServer, Message
from .config import get_analysis_backend, get_training_backend, set_analysis_backend, set_training_backend, get_comm_type, get_use_runpod_pods, set_use_runpod_pods from .config import get_analysis_backend, get_training_backend, set_analysis_backend, set_training_backend
from .compat import get_socket_path, get_default_comm_type
from .queue import queue_manager
from .runpod import runpod_manager, is_runpod_enabled, create_analysis_pod, create_training_pod, RunPodPod
worker_sockets = {} # type: dict worker_sockets = {} # type: dict
active_pods = {} # type: dict[str, RunPodPod]
pod_workers = {} # type: dict[str, str] # pod_id -> worker_type
def create_worker_pod(worker_type: str) -> Optional[RunPodPod]:
"""Create a new worker pod on RunPod."""
if not is_runpod_enabled():
return None
try:
if worker_type == 'analysis':
pod = create_analysis_pod()
elif worker_type == 'training':
pod = create_training_pod()
else:
return None
if pod and runpod_manager.wait_for_pod_ready(pod):
active_pods[pod.pod_id] = pod
pod_workers[pod.pod_id] = worker_type
print(f"Created and ready pod {pod.pod_id} for {worker_type}")
return pod
else:
print(f"Failed to create or start pod for {worker_type}")
except Exception as e:
print(f"Error creating pod: {e}")
return None
def get_available_pod(worker_type: str) -> Optional[RunPodPod]:
"""Get an available pod for the worker type, creating one if needed."""
# First, check for existing available pods
for pod_id, pod in active_pods.items():
if pod.worker_type == worker_type and pod.status == 'RUNNING' and pod_id not in worker_sockets:
return pod
# No available pod, create a new one
return create_worker_pod(worker_type)
def cleanup_idle_pods():
"""Clean up idle pods periodically."""
while True:
try:
runpod_manager.cleanup_idle_pods(max_age=1800) # 30 minutes
time.sleep(300) # Check every 5 minutes
except Exception as e:
print(f"Error cleaning up pods: {e}")
time.sleep(60)
def handle_web_message(message: Message) -> Message: def handle_web_message(message: Message) -> Message:
"""Handle messages from web interface.""" """Handle messages from web interface."""
if message.msg_type == 'analyze_request': if message.msg_type == 'analyze_request':
# Check if we should use RunPod pods backend = get_analysis_backend()
if get_use_runpod_pods() and is_runpod_enabled(): worker_key = f'analysis_{backend}'
pod = get_available_pod('analysis') if worker_key in worker_sockets:
if pod: # Forward to worker
# Send job to pod worker_sockets[worker_key].sendall(
return Message('ack', message.msg_id, {'status': 'pod_assigned', 'pod_id': pod.pod_id}) f'{{"msg_type": "{message.msg_type}", "msg_id": "{message.msg_id}", "data": {message.data}}}\n'.encode('utf-8')
)
return None # No immediate response
else: else:
return Message('error', message.msg_id, {'error': 'No pods available'}) return Message('error', message.msg_id, {'error': f'Worker {worker_key} not available'})
else:
# Use local workers or queue
return Message('ack', message.msg_id, {'status': 'queued'})
elif message.msg_type == 'train_request': elif message.msg_type == 'train_request':
if get_use_runpod_pods() and is_runpod_enabled(): backend = get_training_backend()
pod = get_available_pod('training') worker_key = f'training_{backend}'
if pod: if worker_key in worker_sockets:
return Message('ack', message.msg_id, {'status': 'pod_assigned', 'pod_id': pod.pod_id}) worker_sockets[worker_key].sendall(
else: f'{{"msg_type": "{message.msg_type}", "msg_id": "{message.msg_id}", "data": {message.data}}}\n'.encode('utf-8')
return Message('error', message.msg_id, {'error': 'No pods available'}) )
return None
else: else:
return Message('ack', message.msg_id, {'status': 'queued'}) return Message('error', message.msg_id, {'error': f'Worker {worker_key} not available'})
elif message.msg_type == 'config_update': elif message.msg_type == 'config_update':
data = message.data data = message.data
if 'analysis_backend' in data: if 'analysis_backend' in data:
set_analysis_backend(data['analysis_backend']) set_analysis_backend(data['analysis_backend'])
if 'training_backend' in data: if 'training_backend' in data:
set_training_backend(data['training_backend']) set_training_backend(data['training_backend'])
if 'use_runpod_pods' in data:
set_use_runpod_pods(data['use_runpod_pods'])
return Message('config_response', message.msg_id, {'status': 'updated'}) return Message('config_response', message.msg_id, {'status': 'updated'})
elif message.msg_type == 'get_config': elif message.msg_type == 'get_config':
return Message('config_response', message.msg_id, { return Message('config_response', message.msg_id, {
'analysis_backend': get_analysis_backend(), 'analysis_backend': get_analysis_backend(),
'training_backend': get_training_backend(), 'training_backend': get_training_backend()
'use_runpod_pods': get_use_runpod_pods(),
'runpod_enabled': is_runpod_enabled()
}) })
return Message('error', message.msg_id, {'error': 'Unknown message type'}) return Message('error', message.msg_id, {'error': 'Unknown message type'})
# Worker handling is now done by the queue manager def handle_worker_message(message: Message, client_sock) -> None:
"""Handle messages from workers."""
if message.msg_type == 'register':
worker_type = message.data.get('type')
if worker_type:
worker_sockets[worker_type] = client_sock
print(f"Worker {worker_type} registered")
elif message.msg_type in ['analyze_response', 'train_response']:
# Forward to web - but since web is connected via different server, need to store or something
# For simplicity, assume web polls for results, but since socket, perhaps have a pending responses dict
# This is getting complex. Perhaps use a shared dict or file for results.
# To keep simple, since web is Flask, it can have a global dict for results, but since separate process, hard.
# Perhaps the backend sends to web via its own connection, but web connects per request.
# For responses, backend can store in a file or database, and web reads from there.
# But to keep self-contained, use a simple JSON file for pending results.
# Web sends request with id, backend processes, stores result in file with id, web polls for result file.
# Yes, that's ad-hoc.
import os
result_dir = '/tmp/vidai_results'
os.makedirs(result_dir, exist_ok=True)
with open(os.path.join(result_dir, f"{message.msg_id}.json"), 'w') as f:
import json
json.dump({
'msg_type': message.msg_type,
'msg_id': message.msg_id,
'data': message.data
}, f)
def worker_message_handler(message: Message, client_sock) -> None:
"""Handler for worker messages."""
handle_worker_message(message, client_sock)
def backend_process() -> None: def backend_process() -> None:
"""Main backend process loop.""" """Main backend process loop."""
print("Starting Video AI Backend...") print("Starting Video AI Backend...")
from .config import is_cluster_client, get_cluster_port # Start web server on port 5001
from .cluster_master import start_cluster_master web_server = SocketServer(port=5001)
if is_cluster_client():
print("Running as cluster client - backend not needed")
return
# Start cluster master
cluster_thread = threading.Thread(target=start_cluster_master, args=(get_cluster_port(),), daemon=True)
cluster_thread.start()
# Start pod cleanup thread if RunPod is enabled
if is_runpod_enabled():
cleanup_thread = threading.Thread(target=cleanup_idle_pods, daemon=True)
cleanup_thread.start()
print("RunPod pod cleanup thread started")
comm_type = get_comm_type()
print(f"Using {comm_type} sockets for communication")
if comm_type == 'unix':
# Start web server on Unix socket
socket_path = get_socket_path('web')
if socket_path:
web_server = SocketServer(socket_path=socket_path, comm_type='unix')
web_server.start(handle_web_message)
else:
# Fall back to TCP on Windows
web_server = SocketServer(host='localhost', port=5001, comm_type='tcp')
web_server.start(handle_web_message)
else:
# Start web server on TCP
from .config import get_backend_web_port
web_server = SocketServer(host='localhost', port=get_backend_web_port(), comm_type='tcp')
web_server.start(handle_web_message) web_server.start(handle_web_message)
# Start worker server on port 5002
worker_server = SocketServer(port=5002)
worker_server.start(worker_message_handler)
try: try:
while True: while True:
time.sleep(1) time.sleep(1)
except KeyboardInterrupt: except KeyboardInterrupt:
print("Backend shutting down...") print("Backend shutting down...")
# Clean up all active pods
for pod_id in list(active_pods.keys()):
print(f"Terminating pod {pod_id}...")
runpod_manager.terminate_pod(pod_id)
web_server.stop() web_server.stop()
worker_server.stop()
if __name__ == "__main__": if __name__ == "__main__":
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment