Implement multi-process Video AI architecture

- Add socket-based inter-process communication
- Implement backend process for request routing
- Create separate web interface process
- Add CUDA/ROCm worker processes for analysis and training
- Add configuration system for backend selection
- Update build scripts for multi-component executables
- Add startup scripts for process orchestration (a minimal sketch follows this list)
- Include GPLv3 license and copyright notices
- Add comprehensive documentation and README
- Create CHANGELOG for version tracking
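
The components below communicate over sockets, so bring-up order matters. Here is a minimal orchestration sketch; the module entry points (vidai.backend, vidai.web, vidai.worker) are assumptions of mine, and the startup scripts added by this commit are the authoritative version:

#!/usr/bin/env python3
# Sketch only: launch the three processes from one script. The module entry
# points below are hypothetical; see the commit's startup scripts for the
# real orchestration.
import subprocess
import sys

COMMANDS = [
    [sys.executable, '-m', 'vidai.backend'],  # request routing (hypothetical entry point)
    [sys.executable, '-m', 'vidai.web'],      # web interface (hypothetical entry point)
    [sys.executable, '-m', 'vidai.worker'],   # CUDA/ROCm worker (hypothetical entry point)
]

procs = [subprocess.Popen(cmd) for cmd in COMMANDS]
try:
    for proc in procs:
        proc.wait()
except KeyboardInterrupt:
    for proc in procs:
        proc.terminate()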
#!/usr/bin/env python3
# Simple test for socket communication
import time
import threading

from vidai.comm import SocketServer, SocketCommunicator, Message


def test_handler(message: Message) -> Message:
    print(f"Server received: {message.msg_type} - {message.data}")
    return Message('response', message.msg_id, {'result': 'success'})


def test_server():
    print("Starting test server...")
    server = SocketServer(host='localhost', port=5001, comm_type='tcp')
    server.start(test_handler)
    time.sleep(5)  # Run for 5 seconds
    server.stop()
    print("Server stopped")


def test_client():
    time.sleep(1)  # Wait for server to start
    print("Starting test client...")
    comm = SocketCommunicator(host='localhost', port=5001, comm_type='tcp')
    comm.connect()
    msg = Message('test', '123', {'data': 'hello'})
    comm.send_message(msg)
    response = comm.receive_message()
    print(f"Client received: {response}")
    comm.close()


if __name__ == "__main__":
    # Start server in background
    server_thread = threading.Thread(target=test_server, daemon=True)
    server_thread.start()

    # Start client
    test_client()

    # Wait for server to finish
    server_thread.join()
    print("Test completed")
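The test above drives the protocol by hand. As a usage sketch, the same round trip can be wrapped in a small request/response helper; only Message, SocketCommunicator, and the methods exercised in the test are taken as given, and the request() helper itself is my own:

# Sketch: a thin request/response wrapper over the comm API exercised above.
# The helper is hypothetical; it assumes a server is already listening on
# localhost:5001 as in the test.
import uuid

from vidai.comm import SocketCommunicator, Message


def request(comm: SocketCommunicator, msg_type: str, data: dict) -> Message:
    # Correlate the reply with a fresh message id and block for one response.
    msg = Message(msg_type, str(uuid.uuid4()), data)
    comm.send_message(msg)
    return comm.receive_message()


if __name__ == "__main__":
    comm = SocketCommunicator(host='localhost', port=5001, comm_type='tcp')
    comm.connect()
    try:
        print(request(comm, 'test', {'data': 'hello'}))
    finally:
        comm.close()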
@@ -22,7 +22,7 @@ Manages request routing between web interface and worker processes.
 import time
 import threading
 from .comm import SocketServer, Message
-from .config import get_analysis_backend, get_training_backend, set_analysis_backend, set_training_backend, get_comm_type, get_config_value
+from .config import get_analysis_backend, get_training_backend, set_analysis_backend, set_training_backend, get_comm_type, get_use_runpod_pods, set_use_runpod_pods
 from .compat import get_socket_path, get_default_comm_type
 from .queue import queue_manager
 from .runpod import runpod_manager, is_runpod_enabled, create_analysis_pod, create_training_pod, RunPodPod
@@ -85,7 +85,7 @@ def handle_web_message(message: Message) -> Message:
     """Handle messages from web interface."""
     if message.msg_type == 'analyze_request':
         # Check if we should use RunPod pods
-        if get_config_value('use_runpod_pods', False) and is_runpod_enabled():
+        if get_use_runpod_pods() and is_runpod_enabled():
             pod = get_available_pod('analysis')
             if pod:
                 # Send job to pod
@@ -97,7 +97,7 @@ def handle_web_message(message: Message) -> Message:
         return Message('ack', message.msg_id, {'status': 'queued'})
     elif message.msg_type == 'train_request':
-        if get_config_value('use_runpod_pods', False) and is_runpod_enabled():
+        if get_use_runpod_pods() and is_runpod_enabled():
             pod = get_available_pod('training')
             if pod:
                 return Message('ack', message.msg_id, {'status': 'pod_assigned', 'pod_id': pod.pod_id})
@@ -113,15 +113,14 @@ def handle_web_message(message: Message) -> Message:
         if 'training_backend' in data:
             set_training_backend(data['training_backend'])
         if 'use_runpod_pods' in data:
-            from .config import set_config_value
-            set_config_value('use_runpod_pods', data['use_runpod_pods'])
+            set_use_runpod_pods(data['use_runpod_pods'])
         return Message('config_response', message.msg_id, {'status': 'updated'})
     elif message.msg_type == 'get_config':
         return Message('config_response', message.msg_id, {
             'analysis_backend': get_analysis_backend(),
             'training_backend': get_training_backend(),
-            'use_runpod_pods': get_config_value('use_runpod_pods', False),
+            'use_runpod_pods': get_use_runpod_pods(),
             'runpod_enabled': is_runpod_enabled()
         })
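The hunks above route 'set_config'/'get_config' through the new typed accessors. A hedged sketch of the web-side round trip follows; the backend host and port are placeholders of mine, while the message types and payload keys come from the diff:

# Sketch: toggling RunPod pod usage through the backend's config messages.
# host/port are assumptions; only the message types and payload keys are
# taken from the diff above.
from vidai.comm import SocketCommunicator, Message

comm = SocketCommunicator(host='localhost', port=5000, comm_type='tcp')
comm.connect()

comm.send_message(Message('set_config', 'cfg-1', {'use_runpod_pods': True}))
print(comm.receive_message())  # expect config_response: {'status': 'updated'}

comm.send_message(Message('get_config', 'cfg-2', {}))
print(comm.receive_message())  # expect backends plus use_runpod_pods/runpod_enabled

comm.close()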
@@ -65,7 +65,12 @@ def set_runpod_gpu_type(gpu_type: str) -> None:
 def set_use_runpod_pods(use_pods: bool) -> None:
     """Enable or disable automatic pod creation for jobs."""
-    set_config('use_runpod_pods', use_pods)
+    set_config('use_runpod_pods', 'true' if use_pods else 'false')
+
+
+def get_use_runpod_pods() -> bool:
+    """Check if RunPod pods should be used for jobs."""
+    return get_config('use_runpod_pods', 'false').lower() == 'true'


 def get_default_model() -> str:
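The config change above is subtle: values are persisted as strings, so set_use_runpod_pods() serializes the boolean to 'true'/'false' and get_use_runpod_pods() parses it back. A self-contained sketch of that round trip, where the in-memory dict is merely a stand-in for whatever store set_config/get_config actually write to:

# Sketch: boolean round trip through string-backed config storage. The
# dict below is a stand-in for the real backing store.
_store = {}

def set_config(key: str, value: str) -> None:
    _store[key] = value

def get_config(key: str, default: str = '') -> str:
    return _store.get(key, default)

def set_use_runpod_pods(use_pods: bool) -> None:
    set_config('use_runpod_pods', 'true' if use_pods else 'false')

def get_use_runpod_pods() -> bool:
    return get_config('use_runpod_pods', 'false').lower() == 'true'

set_use_runpod_pods(True)
assert get_use_runpod_pods() is True
set_use_runpod_pods(False)
assert get_use_runpod_pods() is False

Serializing to 'true'/'false' keeps the store string-only; the earlier code passed a raw bool to set_config, which a string-backed store could not round-trip reliably.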
@@ -26,7 +26,7 @@ from .database import (
     get_queue_status, get_user_queue_items, get_queue_position
 )
 from .config import get_max_concurrent_jobs
-from .comm import Message, send_message
+from .comm import Message


 class QueueManager:
@@ -26,7 +26,7 @@ import requests
 import threading
 from typing import Dict, List, Optional, Any
 from dataclasses import dataclass
-from .config import get_config_value, set_config_value
+from .config import get_runpod_api_key, set_runpod_api_key, get_runpod_template_id, set_runpod_template_id, get_runpod_gpu_type, set_runpod_gpu_type, get_use_runpod_pods, set_use_runpod_pods
 from .compat import get_user_config_dir, ensure_dir
@@ -46,7 +46,7 @@ class RunPodManager:
     """Manages RunPod pods for dynamic scaling."""

     def __init__(self):
-        self.api_key = get_config_value('runpod_api_key')
+        self.api_key = get_runpod_api_key()
        self.base_url = "https://api.runpod.io/v1"
        self.headers = {
            'Authorization': f'Bearer {self.api_key}',
@@ -122,7 +122,7 @@ class RunPodManager:
         if not self.is_configured():
             return None

-        template_id = get_config_value('runpod_template_id', 'vidai-analysis-latest')
+        template_id = get_runpod_template_id()

         pod_config = {
             "templateId": template_id,
@@ -250,14 +250,14 @@ runpod_manager = RunPodManager()
 def configure_runpod(api_key: str, template_id: str = "vidai-analysis-latest"):
     """Configure RunPod integration."""
-    set_config_value('runpod_api_key', api_key)
-    set_config_value('runpod_template_id', template_id)
+    set_runpod_api_key(api_key)
+    set_runpod_template_id(template_id)
     runpod_manager.__init__()  # Reinitialize with new config


 def is_runpod_enabled() -> bool:
     """Check if RunPod integration is enabled and configured."""
-    return get_config_value('runpod_enabled', False) and runpod_manager.is_configured()
+    return get_runpod_api_key() and runpod_manager.is_configured()


 def create_analysis_pod() -> Optional[RunPodPod]:
@@ -265,7 +265,7 @@ def create_analysis_pod() -> Optional[RunPodPod]:
     if not is_runpod_enabled():
         return None

-    gpu_type = get_config_value('runpod_gpu_type', 'NVIDIA RTX A4000')
+    gpu_type = get_runpod_gpu_type()
     return runpod_manager.create_pod("analysis", gpu_type)
@@ -274,5 +274,5 @@ def create_training_pod() -> Optional[RunPodPod]:
     if not is_runpod_enabled():
         return None

-    gpu_type = get_config_value('runpod_gpu_type', 'NVIDIA RTX A5000')
+    gpu_type = get_runpod_gpu_type()
     return runpod_manager.create_pod("training", gpu_type)
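Taken together, the RunPod hunks replace generic get_config_value calls with typed accessors. A usage sketch built only from the functions visible above; the API key and template id are placeholders, and note that the new is_runpod_enabled() returns the truthy API-key string rather than a strict bool, so callers needing a real bool should wrap it in bool():

# Sketch: configure RunPod and request an analysis pod, using only functions
# visible in the diff. Key and template id are placeholders.
from vidai.runpod import configure_runpod, is_runpod_enabled, create_analysis_pod

configure_runpod(api_key='rp_XXXX', template_id='vidai-analysis-latest')

if is_runpod_enabled():  # truthy once an API key is configured
    pod = create_analysis_pod()  # Optional[RunPodPod]; None if creation fails
    if pod:
        print(f"Analysis pod ready: {pod.pod_id}")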