Commit d7882404 authored by Lisa (Hermes AI)'s avatar Lisa (Hermes AI)

fix: proxy child gateway sessions over HTTP

parent 565e2a88
...@@ -63,6 +63,8 @@ import json ...@@ -63,6 +63,8 @@ import json
import logging import logging
import os
import ssl as ssl_lib import ssl as ssl_lib
import threading import threading
...@@ -71,6 +73,10 @@ import time ...@@ -71,6 +73,10 @@ import time
import uuid import uuid
from urllib import error as urllib_error
from urllib import request as urllib_request
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
...@@ -172,6 +178,188 @@ class CommandExecution: ...@@ -172,6 +178,188 @@ class CommandExecution:
class HttpProxyGateway:
"""Thin proxy used by child Hermes sessions to reuse the primary node gateway."""
def __init__(self, base_url: str, config: Optional[Dict[str, Any]] = None):
self.base_url = base_url.rstrip('/')
self.config = config or {}
self.bind_address = self.config.get('bind_address', '127.0.0.1')
self.http_port = int(self.config.get('http_port', 8766) or 8766)
self.websocket_port = int(self.config.get('websocket_port', 8765) or 8765)
self.tokens = self.config.get('tokens', {})
self.nodes = {}
self.commands = {}
self.command_waiters = {}
self._nodes_lock = threading.Lock()
self._commands_lock = threading.Lock()
self._running = False
self._loop = None
self._websocket_thread = None
self._websocket_server = None
self._http_runner = None
self._http_site = None
def start(self):
return None
def stop(self):
return None
close = stop
def _request(self, method: str, path: str, payload: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
url = f"{self.base_url}{path}"
data = None
headers = {}
if payload is not None:
data = json.dumps(payload).encode('utf-8')
headers['Content-Type'] = 'application/json'
req = urllib_request.Request(url, data=data, headers=headers, method=method.upper())
try:
with urllib_request.urlopen(req, timeout=300) as resp:
body = resp.read().decode('utf-8')
return json.loads(body) if body else {}
except urllib_error.HTTPError as e:
body = e.read().decode('utf-8', errors='replace') if hasattr(e, 'read') else ''
try:
parsed = json.loads(body) if body else {}
except Exception:
parsed = {'error': body or str(e)}
message = parsed.get('error') or str(e)
if e.code == 404:
raise ValueError(message)
if e.code == 403:
raise PermissionError(message)
raise RuntimeError(message)
except urllib_error.URLError as e:
raise RuntimeError(f"Primary node gateway unavailable at {url}: {e}") from e
def list_nodes(self) -> List[Dict[str, Any]]:
result = self._request('GET', '/nodes')
if isinstance(result, dict) and 'nodes' in result:
return result['nodes']
if isinstance(result, list):
return result
return []
def get_node_status(self, node_name: str) -> Dict[str, Any]:
return self._request('GET', f'/nodes/{node_name}/status')
async def execute_command(self, node_name: str, command: List[str], timeout: int = 30, approved: bool = False) -> Dict[str, Any]:
return self.execute_command_sync(node_name, command, timeout, approved)
def execute_command_sync(self, node_name: str, command: List[str], timeout: int = 30, approved: bool = False) -> Dict[str, Any]:
return self._request('POST', f'/nodes/{node_name}/exec', {
'command': command,
'timeout': timeout,
'approved': approved,
})
def execute_browser_command_sync(self, node_name: str, payload: Dict[str, Any], timeout: int = 30) -> Dict[str, Any]:
req = dict(payload)
req.setdefault('timeout', timeout)
return self._request('POST', f'/nodes/{node_name}/browser', req)
def execute_computer_command_sync(self, node_name: str, payload: Dict[str, Any], timeout: int = 30) -> Dict[str, Any]:
req = dict(payload)
req.setdefault('timeout', timeout)
return self._request('POST', f'/nodes/{node_name}/computer', req)
def execute_desktop_observe_sync(self, node_name: str, payload: Dict[str, Any], timeout: int = 30) -> Dict[str, Any]:
req = dict(payload)
req.setdefault('timeout', timeout)
return self._request('POST', f'/nodes/{node_name}/observe', req)
def execute_desktop_observe_command_sync(self, node_name: str, payload: Dict[str, Any], timeout: int = 30) -> Dict[str, Any]:
return self.execute_desktop_observe_sync(node_name, payload, timeout)
def execute_audio_command_sync(self, node_name: str, payload: Dict[str, Any], timeout: int = 30) -> Dict[str, Any]:
req = dict(payload)
req.setdefault('timeout', timeout)
return self._request('POST', f'/nodes/{node_name}/audio', req)
def execute_camera_command_sync(self, node_name: str, payload: Dict[str, Any], timeout: int = 30) -> Dict[str, Any]:
req = dict(payload)
req.setdefault('timeout', timeout)
return self._request('POST', f'/nodes/{node_name}/camera', req)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Node Gateway (embedded WebSocket server) # Node Gateway (embedded WebSocket server)
...@@ -728,7 +916,7 @@ class NodeGateway: ...@@ -728,7 +916,7 @@ class NodeGateway:
})) }))
elif msg_type == 'browser_control_response': elif msg_type in ('browser_control_response', 'browser_control_result'):
await self._handle_browser_control_response(msg) await self._handle_browser_control_response(msg)
...@@ -757,10 +945,12 @@ class NodeGateway: ...@@ -757,10 +945,12 @@ class NodeGateway:
async def _handle_browser_control_response(self, msg: dict): async def _handle_browser_control_response(self, msg: dict):
"""Handle browser control response from node""" """Handle browser control response/result from node"""
cmd_id = msg.get("id") cmd_id = msg.get("id")
success = msg.get("success")
result_type = msg.get("result") result_type = msg.get("result")
...@@ -783,7 +973,11 @@ class NodeGateway: ...@@ -783,7 +973,11 @@ class NodeGateway:
if result_type == "ok": if success is None:
success = (result_type == "ok")
if success:
cmd.status = "completed" cmd.status = "completed"
...@@ -809,7 +1003,7 @@ class NodeGateway: ...@@ -809,7 +1003,7 @@ class NodeGateway:
logger.info(f"Browser command {cmd_id} completed: {result_type}") logger.info(f"Browser command {cmd_id} completed: {'ok' if success else 'error'}")
...@@ -1006,7 +1200,9 @@ class NodeGateway: ...@@ -1006,7 +1200,9 @@ class NodeGateway:
# Check if node has browser control capability # Check if node has browser control capability
if "browser_control" not in node.capabilities: node_tools = self._get_node_tools(node)
if "browser_control" not in node_tools:
raise ValueError(f"Node '{node_name}' does not support browser control") raise ValueError(f"Node '{node_name}' does not support browser control")
...@@ -1054,22 +1250,18 @@ class NodeGateway: ...@@ -1054,22 +1250,18 @@ class NodeGateway:
# Register waiter before yielding control so fast node replies can't race us
future = asyncio.Future()
self.command_waiters[cmd_id] = future
await node.socket.send(json.dumps(msg)) await node.socket.send(json.dumps(msg))
cmd.status = 'running' cmd.status = 'running'
logger.info(f"Sent browser command {cmd_id} to node '{node_name}': {command.get('command')}") logger.info(f"Sent browser command {cmd_id} to node '{node_name}': {command.get('command')}")
# Wait for completion # Wait for completion
future = asyncio.Future()
self.command_waiters[cmd_id] = future
try: try:
...@@ -2275,6 +2467,34 @@ _gateway: Optional[NodeGateway] = None ...@@ -2275,6 +2467,34 @@ _gateway: Optional[NodeGateway] = None
def _running_inside_gateway_session() -> bool:
"""Return True when this Hermes process is a child session spawned from the gateway."""
return bool(os.getenv("HERMES_SESSION_KEY"))
def _gateway_http_base_url(config: Dict[str, Any]) -> str:
bind_address = str(config.get('bind_address', '127.0.0.1') or '127.0.0.1')
if bind_address in {'0.0.0.0', '::', ''}:
bind_address = '127.0.0.1'
http_port = int(config.get('http_port', 8766) or 8766)
return f"http://{bind_address}:{http_port}"
def _build_http_proxy_gateway(config: Dict[str, Any]) -> "HttpProxyGateway":
return HttpProxyGateway(_gateway_http_base_url(config), config=config)
def _get_gateway() -> NodeGateway: def _get_gateway() -> NodeGateway:
"""Get the singleton gateway instance""" """Get the singleton gateway instance"""
...@@ -2297,9 +2517,15 @@ def _init_gateway(config: Dict[str, Any]) -> NodeGateway: ...@@ -2297,9 +2517,15 @@ def _init_gateway(config: Dict[str, Any]) -> NodeGateway:
if _gateway is None: if _gateway is None:
_gateway = NodeGateway(config) if _running_inside_gateway_session():
_gateway.start() _gateway = _build_http_proxy_gateway(config)
else:
_gateway = NodeGateway(config)
_gateway.start()
return _gateway return _gateway
...@@ -2888,6 +3114,9 @@ def tool_node_exec(*args, **kwargs) -> Dict[str, Any]: ...@@ -2888,6 +3114,9 @@ def tool_node_exec(*args, **kwargs) -> Dict[str, Any]:
if not command: if not command:
raise ValueError(f"Missing required parameter: 'command' (got keys: {sorted(params.keys())})") raise ValueError(f"Missing required parameter: 'command' (got keys: {sorted(params.keys())})")
if isinstance(gw, HttpProxyGateway):
return gw.execute_command_sync(node_name, command, timeout, approved)
with gw._nodes_lock: with gw._nodes_lock:
if node_name not in gw.nodes: if node_name not in gw.nodes:
available = list(gw.nodes.keys()) available = list(gw.nodes.keys())
...@@ -3031,6 +3260,7 @@ def register(ctx): ...@@ -3031,6 +3260,7 @@ def register(ctx):
node_config = { node_config = {
'bind_address': hermes_config.get('node_gateway', {}).get('bind_address', '0.0.0.0'), 'bind_address': hermes_config.get('node_gateway', {}).get('bind_address', '0.0.0.0'),
'websocket_port': hermes_config.get('node_gateway', {}).get('websocket_port', 8765), 'websocket_port': hermes_config.get('node_gateway', {}).get('websocket_port', 8765),
'http_port': hermes_config.get('node_gateway', {}).get('http_port', 8766),
'use_tls': hermes_config.get('node_gateway', {}).get('use_tls', True), 'use_tls': hermes_config.get('node_gateway', {}).get('use_tls', True),
'cert_dir': hermes_config.get('node_gateway', {}).get('cert_dir', 'cert_dir': hermes_config.get('node_gateway', {}).get('cert_dir',
'/home/lisa/.config/hermes-node-gateway/certs'), '/home/lisa/.config/hermes-node-gateway/certs'),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment