Implement bidirectional socket-based interprocess communication

- Separate communication protocols: TCP for web-backend, Unix sockets for backend-workers
- Remove file-based result passing, implement in-memory result storage in backend
- Web interface polls backend via socket for results instead of reading files
- Workers send results back through Unix socket connection to backend
- Maintain persistent connections for efficient bidirectional communication
parent 99d6e5b9
......@@ -26,6 +26,7 @@ from .config import get_analysis_backend, get_training_backend, set_analysis_bac
worker_sockets = {} # type: dict
pending_results = {} # msg_id -> result message
def handle_web_message(message: Message) -> Message:
......@@ -63,6 +64,14 @@ def handle_web_message(message: Message) -> Message:
'analysis_backend': get_analysis_backend(),
'training_backend': get_training_backend()
})
elif message.msg_type == 'get_result':
# Check for pending result
msg_id = message.data.get('request_id')
if msg_id in pending_results:
result = pending_results.pop(msg_id)
return result
else:
return Message('result_pending', message.msg_id, {'status': 'pending'})
return Message('error', message.msg_id, {'error': 'Unknown message type'})
......@@ -74,25 +83,8 @@ def handle_worker_message(message: Message, client_sock) -> None:
worker_sockets[worker_type] = client_sock
print(f"Worker {worker_type} registered")
elif message.msg_type in ['analyze_response', 'train_response']:
# Forward to web - but since web is connected via different server, need to store or something
# For simplicity, assume web polls for results, but since socket, perhaps have a pending responses dict
# This is getting complex. Perhaps use a shared dict or file for results.
# To keep simple, since web is Flask, it can have a global dict for results, but since separate process, hard.
# Perhaps the backend sends to web via its own connection, but web connects per request.
# For responses, backend can store in a file or database, and web reads from there.
# But to keep self-contained, use a simple JSON file for pending results.
# Web sends request with id, backend processes, stores result in file with id, web polls for result file.
# Yes, that's ad-hoc.
import os
result_dir = '/tmp/vidai_results'
os.makedirs(result_dir, exist_ok=True)
with open(os.path.join(result_dir, f"{message.msg_id}.json"), 'w') as f:
import json
json.dump({
'msg_type': message.msg_type,
'msg_id': message.msg_id,
'data': message.data
}, f)
# Store result for web to poll
pending_results[message.msg_id] = message
def worker_message_handler(message: Message, client_sock) -> None:
......@@ -104,25 +96,16 @@ def backend_process() -> None:
"""Main backend process loop."""
print("Starting Video AI Backend...")
from .config import get_comm_type, get_backend_web_port, get_backend_worker_port
comm_type = get_comm_type()
print(f"Using {comm_type} sockets for communication")
# Start web server
if comm_type == 'unix':
from .compat import get_socket_path
web_socket_path = get_socket_path('web')
web_server = SocketServer(socket_path=web_socket_path, comm_type='unix')
else:
web_server = SocketServer(host='localhost', port=get_backend_web_port(), comm_type='tcp')
from .config import get_backend_web_port
from .compat import get_socket_path
# Start web server (always TCP)
web_server = SocketServer(host='localhost', port=get_backend_web_port(), comm_type='tcp')
web_server.start(handle_web_message)
# Start worker server
if comm_type == 'unix':
worker_socket_path = get_socket_path('worker')
worker_server = SocketServer(socket_path=worker_socket_path, comm_type='unix')
else:
worker_server = SocketServer(host='localhost', port=get_backend_worker_port(), comm_type='tcp')
# Start worker server (always Unix socket)
worker_socket_path = get_socket_path('worker')
worker_server = SocketServer(socket_path=worker_socket_path, comm_type='unix')
worker_server.start(worker_message_handler)
try:
......
......@@ -30,8 +30,8 @@ from .config import get_all_settings
app = Flask(__name__)
os.makedirs('static', exist_ok=True)
# Communicator to backend
comm = SocketCommunicator(host='localhost', port=5001)
# Communicator to backend (always TCP)
comm = SocketCommunicator(host='localhost', port=5001, comm_type='tcp')
def send_to_backend(msg_type: str, data: dict) -> str:
"""Send message to backend and return message id."""
......@@ -46,15 +46,26 @@ def send_to_backend(msg_type: str, data: dict) -> str:
return msg_id
def get_result(msg_id: str) -> dict:
"""Poll for result from backend."""
result_file = f"/tmp/vidai_results/{msg_id}.json"
"""Poll for result from backend via socket."""
for _ in range(100): # Poll for 10 seconds
if os.path.exists(result_file):
with open(result_file, 'r') as f:
data = json.load(f)
os.unlink(result_file)
return data
time.sleep(0.1)
try:
# Send get_result request
result_msg = Message('get_result', str(uuid.uuid4()), {'request_id': msg_id})
comm.send_message(result_msg)
# Try to receive response
response = comm.receive_message()
if response and response.msg_type in ['analyze_response', 'train_response', 'config_response']:
return {
'msg_type': response.msg_type,
'msg_id': response.msg_id,
'data': response.data
}
elif response and response.msg_type == 'result_pending':
time.sleep(0.1) # Wait and try again
continue
except:
time.sleep(0.1)
return {'error': 'Timeout waiting for result'}
@app.route('/', methods=['GET', 'POST'])
......
......@@ -174,13 +174,9 @@ def worker_process(backend_type: str):
"""Main worker process."""
print(f"Starting Analysis Worker for {backend_type}...")
comm_type = get_comm_type()
if comm_type == 'unix':
from .compat import get_socket_path
comm = SocketCommunicator(socket_path=get_socket_path('worker'), comm_type='unix')
else:
from .config import get_backend_worker_port
comm = SocketCommunicator(host='localhost', port=get_backend_worker_port(), comm_type='tcp')
# Workers always use Unix sockets for interprocess communication
from .compat import get_socket_path
comm = SocketCommunicator(socket_path=get_socket_path('worker'), comm_type='unix')
comm.connect()
# Register with backend
......
......@@ -47,13 +47,9 @@ def worker_process(backend_type: str):
"""Main worker process."""
print(f"Starting Training Worker for {backend_type}...")
comm_type = get_comm_type()
if comm_type == 'unix':
from .compat import get_socket_path
comm = SocketCommunicator(socket_path=get_socket_path('worker'), comm_type='unix')
else:
from .config import get_backend_worker_port
comm = SocketCommunicator(host='localhost', port=get_backend_worker_port(), comm_type='tcp')
# Workers always use Unix sockets for interprocess communication
from .compat import get_socket_path
comm = SocketCommunicator(socket_path=get_socket_path('worker'), comm_type='unix')
comm.connect()
# Register with backend
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment