Commit 29a308f6 authored by Lisa

feat(node-agent): add computer_control desktop automation and capability-based tool discovery

Major integration of desktop automation and structured capability reporting for Hermes Node Gateway.
This enables a unified node execution framework with optional tool support that is
automatically detected and advertised.

Features:
- ComputerController class (Linux/X11) using xdotool and import (ImageMagick)
  - Screenshot: full screen, returned as base64 or saved to an optional path
  - Mouse: move(x,y), click(button), position()
  - Keyboard: type_text(text), key_press(key), e.g. 'Return' or 'Ctrl+c'
  - Window: active_window() returns focused window info
- Capability-based registration to gateway
  - Agent sends its tool list, e.g. ["exec", "browser_control", "computer_control"], on connect
  - Gateway filters tools based on the declared capabilities
  - Missing dependencies are handled: the agent checks for xdotool/import and for the browser extension before advertising a capability
- Universal installer updated:
  - Prompts: enable browser control? enable computer control?
  - Optional quick edit of per-node sexec permissions (comma-separated allow/deny/ask patterns)
  - Writes config.json with enable_browser, enable_computer_control and permissions JSON
- Fixes:
  - Agent previously advertised the capability key 'browser' while the gateway expects 'browser_control'; the keys are now aligned
  - Installer embeds the agent as a compressed, base64-encoded payload in the shell script, so no external files are needed
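
The ComputerController features above map onto xdotool and ImageMagick commands. Here is a minimal sketch, assuming each operation is a single subprocess call. The function names and the left/middle/right to 1/2/3 button mapping are hypothetical illustrations, not the agent's actual code.

```python
import subprocess
from typing import Dict, List

# Hypothetical mapping from button names to xdotool button numbers
_BUTTONS = {"left": "1", "middle": "2", "right": "3"}


def mouse_move_cmd(x: int, y: int) -> List[str]:
    return ["xdotool", "mousemove", str(int(x)), str(int(y))]


def click_cmd(button: str = "left") -> List[str]:
    return ["xdotool", "click", _BUTTONS[button]]


def type_text_cmd(text: str) -> List[str]:
    return ["xdotool", "type", text]


def key_press_cmd(key: str) -> List[str]:
    return ["xdotool", "key", key]


def active_window_cmd() -> List[str]:
    # xdotool chains commands: getactivewindow passes its window to getwindowname
    return ["xdotool", "getactivewindow", "getwindowname"]


def screenshot_cmd(path: str) -> List[str]:
    # ImageMagick's `import -window root` captures the whole X11 screen
    return ["import", "-window", "root", path]


def parse_mouse_location(shell_output: str) -> Dict[str, int]:
    """Parse `xdotool getmouselocation --shell` output (X=.., Y=.., SCREEN=.., WINDOW=..)."""
    values = dict(line.split("=", 1) for line in shell_output.splitlines() if "=" in line)
    return {"x": int(values["X"]), "y": int(values["Y"])}


def run(cmd: List[str]) -> str:
    """Run one command without a shell and return stdout; raises CalledProcessError on failure."""
    return subprocess.run(cmd, check=True, capture_output=True, text=True).stdout
```

Passing argument lists instead of shell strings means text given to `type_text` never needs shell quoting.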

Files:
- node-agent/: new ComputerController class; enhanced hermes_node_agent.py
- ~/.hermes/plugins/hermes-node-gateway/: added COMPUTER_CONTROL_SCHEMA + handlers + routing
- install_hermes_node_universal.sh (merged into the repo's dist/ directory)

Node agent: accepts "computer_control" commands, which the gateway forwards to it
Gateway: registers the 'computer_control' tool with its schema and handles the agent's cc_result responses

Deployed nodes can now:
  - Execute shell commands via sexec (preserves existing allow/ask/deny)
  - Control browsers (if extension installed)
  - Control desktop (if X11 + xdotool + ImageMagick installed)
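
The capability rules above can be sketched as a small function that builds the tool list sent on connect. Desktop control requires X11, xdotool and ImageMagick. Browser control requires the extension. The name `advertised_tools`, its parameters, and the use of `DISPLAY` as the X11 check are assumptions for illustration. The `enable_*` flags mirror the installer's config.json keys.

```python
import os
import shutil
from typing import Callable, List, Mapping, Optional


def advertised_tools(enable_browser: bool,
                     enable_computer_control: bool,
                     browser_extension_present: bool,
                     which: Callable[[str], Optional[str]] = shutil.which,
                     env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the tool list a node would advertise to the gateway (hypothetical helper)."""
    tools = ["exec"]  # sexec is always offered; its allow/ask/deny rules still apply
    if enable_browser and browser_extension_present:
        tools.append("browser_control")
    # Assumption: a non-empty DISPLAY is used as the "X11 is available" check
    has_x11 = bool(env.get("DISPLAY"))
    if enable_computer_control and has_x11 and all(which(b) for b in ("xdotool", "import")):
        tools.append("computer_control")
    return tools
```

`which` and `env` are injectable, so the detection logic can be exercised without the binaries or a display present.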

Hermes Gateway plugin now exposes 5 tools:
  node_list, node_status, node_exec, browser_control, computer_control ✓

Co-authored-by: Lisa <lisa@nexlab.net>
Signed-off-by: Lisa <lisa@nexlab.net>
# Browser Control Protocol Schema
## Message Format
All browser control messages follow this structure:
```json
{
  "type": "browser_control",
  "id": "unique-command-id",
  "layer": "high_level|playwright|cdp",
  "command": "command_name",
  "page_id": "page_1",
  "params": {
    // Command-specific parameters
  }
}
```
## Response Format
```json
{
  "type": "browser_control_response",
  "id": "unique-command-id",
  "result": "ok|error",
  // Result-specific fields
}
```
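A client builds messages in this shape and pairs each response with its request by `id`. This is a minimal sketch. `make_command`, `is_reply_to` and the `cmd_N` counter are hypothetical helpers modeled on the documented fields.

```python
import itertools
from typing import Any, Dict, Optional

LAYERS = ("high_level", "playwright", "cdp")
_next_id = itertools.count(1)  # yields 1, 2, 3, ... for cmd_1, cmd_2, ...


def make_command(layer: str, command: str, page_id: Optional[str] = None,
                 params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Build a browser_control message in the documented shape."""
    if layer not in LAYERS:
        raise ValueError(f"unknown layer {layer!r}; expected one of {LAYERS}")
    msg: Dict[str, Any] = {
        "type": "browser_control",
        "id": f"cmd_{next(_next_id)}",
        "layer": layer,
        "command": command,
        "params": params or {},
    }
    if page_id is not None:
        msg["page_id"] = page_id
    return msg


def is_reply_to(msg: Dict[str, Any], reply: Dict[str, Any]) -> bool:
    """True if `reply` is the browser_control_response for `msg` (matched by id)."""
    return reply.get("type") == "browser_control_response" and reply.get("id") == msg["id"]
```

Serialize with `json.dumps(make_command(...))` before sending. Any incoming message for which `is_reply_to` is false belongs to another request.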
---
## Layer 1: High-Level Commands
### Launch Browser
**Command:** `launch`
```json
{
  "type": "browser_control",
  "id": "cmd_1",
  "layer": "high_level",
  "command": "launch",
  "params": {
    "config": {
      "mode": "headless", // "headless", "headed", "attach"
      "cdp_url": null, // For attach mode: "http://localhost:9222"
      "viewport": {"width": 1920, "height": 1080},
      "timeout": 30000
    }
  }
}
```
**Response:**
```json
{
  "type": "browser_control_response",
  "id": "cmd_1",
  "result": "ok",
  "mode": "headless",
  "browser_version": "120.0.6099.109"
}
```
### Create Context
**Command:** `create_context`
```json
{
  "type": "browser_control",
  "id": "cmd_2",
  "layer": "high_level",
  "command": "create_context",
  "params": {
    "config": {
      "name": "my_context",
      "persistent": false,
      "incognito": true,
      "user_data_dir": null, // For persistent: "/path/to/profile"
      "viewport": {"width": 1920, "height": 1080},
      "user_agent": "Custom User Agent",
      "locale": "en-US",
      "timezone": "America/New_York",
      "geolocation": {"latitude": 40.7128, "longitude": -74.0060},
      "permissions": ["geolocation", "notifications"],
      "extra_http_headers": {"X-Custom": "value"},
      "ignore_https_errors": false,
      "java_script_enabled": true,
      "bypass_csp": false
    }
  }
}
```
**Response:**
```json
{
  "type": "browser_control_response",
  "id": "cmd_2",
  "result": "ok",
  "context_name": "my_context",
  "persistent": false
}
```
### New Page
**Command:** `new_page`
```json
{
  "type": "browser_control",
  "id": "cmd_3",
  "layer": "high_level",
  "command": "new_page",
  "params": {
    "context_name": "my_context" // Optional, uses "default" if not specified
  }
}
```
**Response:**
```json
{
  "type": "browser_control_response",
  "id": "cmd_3",
  "result": "ok",
  "page_id": "page_1",
  "context_name": "my_context"
}
```
### Navigate
**Command:** `navigate`
```json
{
  "type": "browser_control",
  "id": "cmd_4",
  "layer": "high_level",
  "command": "navigate",
  "page_id": "page_1",
  "params": {
    "url": "https://example.com",
    "wait_until": "load" // "load", "domcontentloaded", "networkidle"
  }
}
```
**Response:**
```json
{
  "type": "browser_control_response",
  "id": "cmd_4",
  "result": "ok",
  "url": "https://example.com",
  "status": 200,
  "title": "Example Domain"
}
```
### Click
**Command:** `click`
```json
{
  "type": "browser_control",
  "id": "cmd_5",
  "layer": "high_level",
  "command": "click",
  "page_id": "page_1",
  "params": {
    "selector": "#submit-button",
    "button": "left", // Optional: "left", "right", "middle"
    "click_count": 1, // Optional: number of clicks
    "delay": 0 // Optional: delay between mousedown and mouseup in ms
  }
}
```
**Response:**
```json
{
  "type": "browser_control_response",
  "id": "cmd_5",
  "result": "ok"
}
```
### Fill Input
**Command:** `fill`
```json
{
  "type": "browser_control",
  "id": "cmd_6",
  "layer": "high_level",
  "command": "fill",
  "page_id": "page_1",
  "params": {
    "selector": "#username",
    "value": "myusername"
  }
}
```
**Response:**
```json
{
  "type": "browser_control_response",
  "id": "cmd_6",
  "result": "ok"
}
```
### Type Text
**Command:** `type_text`
```json
{
  "type": "browser_control",
  "id": "cmd_7",
  "layer": "high_level",
  "command": "type_text",
  "page_id": "page_1",
  "params": {
    "selector": "#search",
    "text": "search query",
    "delay": 100 // Optional: delay between keystrokes in ms
  }
}
```
### Wait for Selector
**Command:** `wait_for_selector`
```json
{
  "type": "browser_control",
  "id": "cmd_8",
  "layer": "high_level",
  "command": "wait_for_selector",
  "page_id": "page_1",
  "params": {
    "selector": ".result",
    "state": "visible", // "attached", "detached", "visible", "hidden"
    "timeout": 30000
  }
}
```
### Screenshot
**Command:** `screenshot`
```json
{
  "type": "browser_control",
  "id": "cmd_9",
  "layer": "high_level",
  "command": "screenshot",
  "page_id": "page_1",
  "params": {
    "full_page": false,
    "path": "/tmp/screenshot.png" // Optional: save to file
  }
}
```
**Response:**
```json
{
  "type": "browser_control_response",
  "id": "cmd_9",
  "result": "ok",
  "screenshot": "iVBORw0KGgoAAAANSUhEUgAA...", // Base64 encoded PNG
  "path": "/tmp/screenshot.png"
}
```
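The `screenshot` field is a base64-encoded PNG. Here is a minimal client-side sketch for decoding and sanity-checking it. `decode_screenshot` is a hypothetical helper; the 8-byte PNG signature check is standard.

```python
import base64
from typing import Any, Dict

PNG_SIGNATURE = b"\x89PNG\r\n\x1a\n"  # first 8 bytes of every PNG file


def decode_screenshot(response: Dict[str, Any]) -> bytes:
    """Return the PNG bytes carried by a screenshot response."""
    if response.get("result") != "ok":
        raise RuntimeError(response.get("error", "screenshot failed"))
    # validate=True rejects non-base64 characters instead of silently skipping them
    data = base64.b64decode(response["screenshot"], validate=True)
    if not data.startswith(PNG_SIGNATURE):
        raise ValueError("screenshot payload is not a PNG")
    return data
```

To save the result, use `Path("/tmp/shot.png").write_bytes(decode_screenshot(resp))`.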
### Execute Script
**Command:** `execute_script`
```json
{
  "type": "browser_control",
  "id": "cmd_10",
  "layer": "high_level",
  "command": "execute_script",
  "page_id": "page_1",
  "params": {
    "script": "document.title"
  }
}
```
**Response:**
```json
{
  "type": "browser_control_response",
  "id": "cmd_10",
  "result": "ok" // execute_script returns no value; use `evaluate` to read one
}
```
### Evaluate Expression
**Command:** `evaluate`
```json
{
"type": "browser_control",
"id": "cmd_11",
"layer": "high_level",
"command": "evaluate",
"page_id": "page_1",
"params": {
"expression": "() => document.querySelectorAll('a').length",
"arg": null // Optional: argument to pass to function
}
}
```
### Get Content
**Command:** `get_content`
```json
{
"type": "browser_control",
"id": "cmd_12",
"layer": "high_level",
"command": "get_content",
"page_id": "page_1",
"params": {}
}
```
**Response:**
```json
{
"type": "browser_control_response",
"id": "cmd_12",
"result": "ok",
"content": "<!DOCTYPE html><html>..."
}
```
### Get Title
**Command:** `get_title`
```json
{
"type": "browser_control",
"id": "cmd_13",
"layer": "high_level",
"command": "get_title",
"page_id": "page_1",
"params": {}
}
```
**Response:**
```json
{
"type": "browser_control_response",
"id": "cmd_13",
"result": "ok",
"title": "Example Domain"
}
```
### List Pages
**Command:** `list_pages`
```json
{
"type": "browser_control",
"id": "cmd_14",
"layer": "high_level",
"command": "list_pages",
"params": {}
}
```
**Response:**
```json
{
"type": "browser_control_response",
"id": "cmd_14",
"result": "ok",
"pages": [
{
"page_id": "page_1",
"url": "https://example.com",
"title": "Example Domain"
}
]
}
```
### List Contexts
**Command:** `list_contexts`
```json
{
"type": "browser_control",
"id": "cmd_15",
"layer": "high_level",
"command": "list_contexts",
"params": {}
}
```
**Response:**
```json
{
"type": "browser_control_response",
"id": "cmd_15",
"result": "ok",
"contexts": [
{
"name": "default",
"page_count": 2
}
]
}
```
### Close Page
**Command:** `close_page`
```json
{
"type": "browser_control",
"id": "cmd_16",
"layer": "high_level",
"command": "close_page",
"page_id": "page_1",
"params": {}
}
```
### Close Context
**Command:** `close_context`
```json
{
"type": "browser_control",
"id": "cmd_17",
"layer": "high_level",
"command": "close_context",
"params": {
"context_name": "my_context"
}
}
```
### Close Browser
**Command:** `close`
```json
{
"type": "browser_control",
"id": "cmd_18",
"layer": "high_level",
"command": "close",
"params": {}
}
```
---
## Layer 2: Playwright API Commands
Direct access to Playwright Page API methods.
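**Layer:** `playwright` (the `command` field names the Page method; `params.args` and `params.kwargs` are its positional and keyword arguments)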
```json
{
  "type": "browser_control",
  "id": "cmd_19",
  "layer": "playwright",
  "command": "locator",
  "page_id": "page_1",
  "params": {
    "args": [".my-class"],
    "kwargs": {}
  }
}
```
**Examples:**
- `locator(selector)` - Create a locator for the selector
- `get_by_text(text)` - Locate elements by their text
- `get_by_role(role)` - Locate elements by ARIA role
- `get_by_test_id(test_id)` - Locate elements by test ID
- `query_selector(selector)` - Return the first matching element handle
- `query_selector_all(selector)` - Return all matching element handles
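An agent could dispatch this layer with `getattr` behind an allowlist, so arbitrary Page methods cannot be invoked remotely. This is a minimal sketch. `call_page_method` and the allowlist are hypothetical, not the agent's actual dispatcher. In Playwright's async API, `locator()` and `get_by_*()` return a Locator synchronously, while `query_selector*()` are coroutines, so the result is awaited only when needed.

```python
import inspect
from typing import Any, Dict

# Hypothetical allowlist, limited to the Page methods listed above
ALLOWED_PAGE_METHODS = frozenset({
    "locator", "get_by_text", "get_by_role",
    "get_by_test_id", "query_selector", "query_selector_all",
})


async def call_page_method(page: Any, command: str, params: Dict[str, Any]) -> Any:
    """Invoke page.<command>(*args, **kwargs) for an allowlisted method."""
    if command not in ALLOWED_PAGE_METHODS:
        raise ValueError(f"Page method not allowed: {command}")
    method = getattr(page, command)
    result = method(*params.get("args", []), **params.get("kwargs", {}))
    # Await coroutine results (query_selector*) but pass Locators through unchanged
    if inspect.isawaitable(result):
        result = await result
    return result
```

Locators and element handles are not JSON-serializable. The agent still has to convert them, for example by calling `count()` or `inner_text()`, before sending a response.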
---
## Layer 3: CDP (Chrome DevTools Protocol) Commands
Direct access to Chrome DevTools Protocol.
**Layer:** `cdp` (the `command` field is the CDP method name and `params` holds that method's parameters)
```json
{
  "type": "browser_control",
  "id": "cmd_20",
  "layer": "cdp",
  "command": "Network.enable",
  "page_id": "page_1",
  "params": {}
}
```
**Common CDP Commands:**
- `Network.enable` - Enable network tracking
- `Network.getResponseBody` - Get response body
- `Performance.getMetrics` - Get performance metrics
- `Runtime.evaluate` - Evaluate JavaScript
- `Page.captureScreenshot` - Capture screenshot
- `DOM.getDocument` - Get DOM document
**Example with parameters:**
```json
{
  "type": "browser_control",
  "id": "cmd_21",
  "layer": "cdp",
  "command": "Runtime.evaluate",
  "page_id": "page_1",
  "params": {
    "expression": "window.location.href",
    "returnByValue": true
  }
}
```
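With Playwright, a raw CDP command can be sent through a `CDPSession` from `BrowserContext.new_cdp_session(page)`, which works for Chromium only. This is a minimal sketch, assuming one session is cached per page. CDP domain state such as `Network.enable` belongs to a session, so opening a fresh session per command would lose it. `CdpSessions` is a hypothetical helper.

```python
from typing import Any, Dict, Optional


class CdpSessions:
    """Keep one Playwright CDPSession per page (hypothetical helper)."""

    def __init__(self) -> None:
        # Keyed by id(page); a real agent should drop entries when pages close
        self._sessions: Dict[int, Any] = {}

    async def send(self, page: Any, method: str,
                   params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        key = id(page)
        if key not in self._sessions:
            # BrowserContext.new_cdp_session() is only supported on Chromium
            self._sessions[key] = await page.context.new_cdp_session(page)
        return await self._sessions[key].send(method, params or {})
```

For example, `await cdp.send(page, "Runtime.evaluate", {"expression": "window.location.href", "returnByValue": True})` sends the request shown above.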
---
## Error Responses
```json
{
  "type": "browser_control_response",
  "id": "cmd_x",
  "result": "error",
  "error": "Error message describing what went wrong"
}
```
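Clients can turn error responses into exceptions so that failures are not silently ignored. This is a minimal sketch. `BrowserControlError` and `unwrap` are hypothetical names.

```python
from typing import Any, Dict


class BrowserControlError(RuntimeError):
    """Raised when a browser_control_response reports "result": "error"."""

    def __init__(self, command_id: str, message: str) -> None:
        super().__init__(f"{command_id}: {message}")
        self.command_id = command_id


def unwrap(response: Dict[str, Any]) -> Dict[str, Any]:
    """Return a successful response unchanged, or raise BrowserControlError."""
    if response.get("type") != "browser_control_response":
        raise ValueError(f"not a browser_control_response: {response.get('type')!r}")
    if response.get("result") == "error":
        raise BrowserControlError(response.get("id", "?"), response.get("error", "unknown error"))
    return response
```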
---
## Launch Modes
### 1. Headless Mode (default)
Browser runs without UI, ideal for automation and servers.
```json
{"mode": "headless"}
```
### 2. Headed Mode
Browser runs with visible UI, useful for debugging.
```json
{"mode": "headed"}
```
### 3. Attach Mode
Attach to existing browser instance via CDP.
```json
{
  "mode": "attach",
  "cdp_url": "http://localhost:9222"
}
```
To launch Chrome with remote debugging:
```bash
google-chrome --remote-debugging-port=9222
```
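The three modes map onto two Playwright calls: `chromium.launch(headless=...)` for headless and headed, and `chromium.connect_over_cdp(endpoint_url)` for attach. CDP attach is supported for Chromium-based browsers only. This is a minimal sketch of that mapping. `launch_plan` is a hypothetical helper that returns a description of the call rather than making it.

```python
from typing import Any, Dict


def launch_plan(config: Dict[str, Any]) -> Dict[str, Any]:
    """Translate a launch "config" into the Playwright call to make (hypothetical helper)."""
    mode = config.get("mode", "headless")  # headless is the documented default
    if mode == "attach":
        cdp_url = config.get("cdp_url")
        if not cdp_url:
            raise ValueError("attach mode requires cdp_url, e.g. http://localhost:9222")
        # -> await playwright.chromium.connect_over_cdp(cdp_url)
        return {"call": "connect_over_cdp", "endpoint_url": cdp_url}
    if mode in ("headless", "headed"):
        # -> await playwright.chromium.launch(headless=...)
        return {"call": "launch", "headless": mode == "headless"}
    raise ValueError(f"unknown mode {mode!r}")
```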
---
## Context Types
### 1. Incognito Context (default)
Isolated context with no persistent state.
```json
{
  "name": "incognito_ctx",
  "incognito": true,
  "persistent": false
}
```
### 2. Persistent Context
Context with saved cookies, localStorage, etc.
```json
{
  "name": "persistent_ctx",
  "persistent": true,
  "user_data_dir": "/path/to/profile"
}
```
### 3. Named Context
Multiple isolated contexts in same browser.
```json
{
  "name": "user1_context",
  "incognito": true
}
```
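In Playwright, incognito and named contexts are both plain `Browser.new_context(**options)` calls. A persistent profile instead needs `BrowserType.launch_persistent_context(user_data_dir, ...)`, which starts its own browser rather than adding a context to the running one. Here is a minimal sketch of choosing between them. `context_call` is hypothetical, and it assumes the protocol's `timezone` key maps to Playwright's `timezone_id`.

```python
from typing import Any, Dict, Tuple

# Protocol key -> Playwright keyword (only "timezone" is renamed)
_RENAMES = {"timezone": "timezone_id"}
_OPTION_KEYS = ("viewport", "user_agent", "locale", "timezone", "geolocation",
                "permissions", "extra_http_headers", "ignore_https_errors",
                "java_script_enabled", "bypass_csp")


def context_call(config: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
    """Pick the Playwright call for a context config and build its keyword arguments."""
    kwargs = {_RENAMES.get(k, k): config[k] for k in _OPTION_KEYS if config.get(k) is not None}
    if config.get("persistent"):
        user_data_dir = config.get("user_data_dir")
        if not user_data_dir:
            raise ValueError("persistent contexts need user_data_dir")
        # -> await playwright.chromium.launch_persistent_context(user_data_dir, **kwargs)
        return "launch_persistent_context", {"user_data_dir": user_data_dir, **kwargs}
    # "name" is only the agent's lookup key and is not passed to Playwright
    return "new_context", kwargs
```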
#!/usr/bin/env python3
"""
Browser Control Example Usage
Demonstrates how to use the browser control capability
via Hermes Node Protocol.
This example shows:
1. Launching browser in different modes
2. Creating contexts (incognito, persistent, named)
3. High-level commands (navigate, click, fill, screenshot)
4. Playwright API access
5. CDP (Chrome DevTools Protocol) access
Author: Lisa (Hermes AI)
Date: 2026-04-29
"""
import asyncio
import json
import base64
from pathlib import Path


class BrowserControlExample:
    """Example browser control client"""

    def __init__(self, ws):
        """
        Args:
            ws: WebSocket connection to Hermes Gateway
        """
        self.ws = ws
        self.cmd_counter = 0

    def _next_id(self):
        """Generate unique command ID"""
        self.cmd_counter += 1
        return f"cmd_{self.cmd_counter}"

    async def send_command(self, layer, command, page_id=None, params=None):
        """
        Send browser control command and wait for response
        Args:
            layer: Command layer (high_level, playwright, cdp)
            command: Command name
            page_id: Page identifier (optional)
            params: Command parameters (optional)
        Returns:
            Response dict
        """
        cmd_id = self._next_id()
        msg = {
            "type": "browser_control",
            "id": cmd_id,
            "layer": layer,
            "command": command,
        }
        if page_id:
            msg["page_id"] = page_id
        if params:
            msg["params"] = params
        # Send command
        await self.ws.send(json.dumps(msg))
        print(f"→ Sent: {layer}/{command} (id={cmd_id})")
        # Wait for the response with the matching id; other messages are skipped
        while True:
            response = await self.ws.recv()
            resp_data = json.loads(response)
            if resp_data.get("type") == "browser_control_response" and resp_data.get("id") == cmd_id:
                print(f"← Received: {resp_data.get('result')}")
                return resp_data

    async def example_basic_navigation(self):
        """Example 1: Basic navigation and screenshot"""
        print("\n=== Example 1: Basic Navigation ===\n")
        # Launch browser in headless mode
        result = await self.send_command(
            "high_level", "launch",
            params={"config": {"mode": "headless"}}
        )
        print(f"Browser launched: {result.get('browser_version')}")
        # Create new page
        result = await self.send_command(
            "high_level", "new_page"
        )
        page_id = result.get("page_id")
        print(f"Page created: {page_id}")
        # Navigate to URL
        result = await self.send_command(
            "high_level", "navigate",
            page_id=page_id,
            params={"url": "https://example.com"}
        )
        print(f"Navigated to: {result.get('url')}")
        print(f"Page title: {result.get('title')}")
        # Take screenshot
        result = await self.send_command(
            "high_level", "screenshot",
            page_id=page_id,
            params={"full_page": True}
        )
        if result.get("result") == "ok":
            # Save screenshot
            screenshot_data = base64.b64decode(result.get("screenshot"))
            screenshot_path = Path("/tmp/example_screenshot.png")
            screenshot_path.write_bytes(screenshot_data)
            print(f"Screenshot saved to: {screenshot_path}")
        # Close browser
        await self.send_command("high_level", "close")
        print("Browser closed")

    async def example_form_interaction(self):
        """Example 2: Form filling and interaction"""
        print("\n=== Example 2: Form Interaction ===\n")
        # Launch browser
        await self.send_command(
            "high_level", "launch",
            params={"config": {"mode": "headless"}}
        )
        # Create page
        result = await self.send_command("high_level", "new_page")
        page_id = result.get("page_id")
        # Navigate to form page
        await self.send_command(
            "high_level", "navigate",
            page_id=page_id,
            params={"url": "https://httpbin.org/forms/post"}
        )
        # Fill form fields
        await self.send_command(
            "high_level", "fill",
            page_id=page_id,
            params={"selector": "input[name='custname']", "value": "John Doe"}
        )
        await self.send_command(
            "high_level", "fill",
            page_id=page_id,
            params={"selector": "input[name='custtel']", "value": "555-1234"}
        )
        # Click submit button
        await self.send_command(
            "high_level", "click",
            page_id=page_id,
            params={"selector": "button[type='submit']"}
        )
        # Wait for navigation
        await asyncio.sleep(2)
        # Get page content
        result = await self.send_command(
            "high_level", "get_content",
            page_id=page_id
        )
        print(f"Page content length: {len(result.get('content', ''))} bytes")
        # Close
        await self.send_command("high_level", "close")

    async def example_multiple_contexts(self):
        """Example 3: Multiple contexts (simulate multiple users)"""
        print("\n=== Example 3: Multiple Contexts ===\n")
        # Launch browser
        await self.send_command(
            "high_level", "launch",
            params={"config": {"mode": "headless"}}
        )
        # Create context for user 1
        await self.send_command(
            "high_level", "create_context",
            params={"config": {"name": "user1", "incognito": True}}
        )
        # Create context for user 2
        await self.send_command(
            "high_level", "create_context",
            params={"config": {"name": "user2", "incognito": True}}
        )
        # Create page in user1 context
        result = await self.send_command(
            "high_level", "new_page",
            params={"context_name": "user1"}
        )
        page1_id = result.get("page_id")
        # Create page in user2 context
        result = await self.send_command(
            "high_level", "new_page",
            params={"context_name": "user2"}
        )
        page2_id = result.get("page_id")
        # Navigate both pages to different URLs
        await self.send_command(
            "high_level", "navigate",
            page_id=page1_id,
            params={"url": "https://example.com"}
        )
        await self.send_command(
            "high_level", "navigate",
            page_id=page2_id,
            params={"url": "https://httpbin.org"}
        )
        # List all pages
        result = await self.send_command("high_level", "list_pages")
        print(f"Open pages: {len(result.get('pages', []))}")
        for page in result.get("pages", []):
            print(f"  - {page['page_id']}: {page['url']}")
        # List all contexts
        result = await self.send_command("high_level", "list_contexts")
        print(f"Open contexts: {len(result.get('contexts', []))}")
        for ctx in result.get("contexts", []):
            print(f"  - {ctx['name']}: {ctx['page_count']} pages")
        # Close
        await self.send_command("high_level", "close")

    async def example_javascript_execution(self):
        """Example 4: JavaScript execution"""
        print("\n=== Example 4: JavaScript Execution ===\n")
        # Launch and create page
        await self.send_command(
            "high_level", "launch",
            params={"config": {"mode": "headless"}}
        )
        result = await self.send_command("high_level", "new_page")
        page_id = result.get("page_id")
        # Navigate
        await self.send_command(
            "high_level", "navigate",
            page_id=page_id,
            params={"url": "https://example.com"}
        )
        # Execute a script for its side effects (execute_script returns no value)
        result = await self.send_command(
            "high_level", "execute_script",
            page_id=page_id,
            params={"script": "document.title"}
        )
        print(f"execute_script status: {result.get('result')}")
        # Evaluate expression
        result = await self.send_command(
            "high_level", "evaluate",
            page_id=page_id,
            params={"expression": "() => document.querySelectorAll('a').length"}
        )
        print(f"Number of links: {result.get('result')}")
        # Execute complex script
        script = """
        () => {
            return {
                url: window.location.href,
                title: document.title,
                links: document.querySelectorAll('a').length,
                images: document.querySelectorAll('img').length,
                viewport: {
                    width: window.innerWidth,
                    height: window.innerHeight
                }
            };
        }
        """
        result = await self.send_command(
            "high_level", "evaluate",
            page_id=page_id,
            params={"expression": script}
        )
        print(f"Page info: {json.dumps(result.get('result'), indent=2)}")
        # Close
        await self.send_command("high_level", "close")

    async def example_playwright_api(self):
        """Example 5: Direct Playwright API access"""
        print("\n=== Example 5: Playwright API ===\n")
        # Launch and create page
        await self.send_command(
            "high_level", "launch",
            params={"config": {"mode": "headless"}}
        )
        result = await self.send_command("high_level", "new_page")
        page_id = result.get("page_id")
        # Navigate
        await self.send_command(
            "high_level", "navigate",
            page_id=page_id,
            params={"url": "https://example.com"}
        )
        # Use Playwright locator API
        result = await self.send_command(
            "playwright", "locator",
            page_id=page_id,
            params={"args": ["h1"], "kwargs": {}}
        )
        print(f"Locator result: {result.get('result')}")
        # Use get_by_text
        result = await self.send_command(
            "playwright", "get_by_text",
            page_id=page_id,
            params={"args": ["Example Domain"], "kwargs": {}}
        )
        print(f"get_by_text result: {result.get('result')}")
        # Close
        await self.send_command("high_level", "close")

    async def example_cdp_access(self):
        """Example 6: Chrome DevTools Protocol access"""
        print("\n=== Example 6: CDP Access ===\n")
        # Launch and create page
        await self.send_command(
            "high_level", "launch",
            params={"config": {"mode": "headless"}}
        )
        result = await self.send_command("high_level", "new_page")
        page_id = result.get("page_id")
        # Enable network tracking via CDP
        result = await self.send_command(
            "cdp", "Network.enable",
            page_id=page_id,
            params={}
        )
        print(f"Network tracking enabled: {result.get('result')}")
        # Navigate
        await self.send_command(
            "high_level", "navigate",
            page_id=page_id,
            params={"url": "https://example.com"}
        )
        # Get performance metrics via CDP
        result = await self.send_command(
            "cdp", "Performance.getMetrics",
            page_id=page_id,
            params={}
        )
        print(f"Performance metrics: {result.get('result')}")
        # Evaluate via CDP
        result = await self.send_command(
            "cdp", "Runtime.evaluate",
            page_id=page_id,
            params={
                "expression": "navigator.userAgent",
                "returnByValue": True
            }
        )
        print(f"User agent: {result.get('result')}")
        # Close
        await self.send_command("high_level", "close")

    async def example_headed_mode(self):
        """Example 7: Headed mode (visible browser)"""
        print("\n=== Example 7: Headed Mode ===\n")
        # Launch in headed mode
        result = await self.send_command(
            "high_level", "launch",
            params={"config": {"mode": "headed"}}
        )
        print(f"Browser launched in headed mode: {result.get('browser_version')}")
        # Create page
        result = await self.send_command("high_level", "new_page")
        page_id = result.get("page_id")
        # Navigate
        await self.send_command(
            "high_level", "navigate",
            page_id=page_id,
            params={"url": "https://example.com"}
        )
        print("Browser is visible. Waiting 5 seconds...")
        await asyncio.sleep(5)
        # Close
        await self.send_command("high_level", "close")

    async def example_persistent_context(self):
        """Example 8: Persistent context (saved profile)"""
        print("\n=== Example 8: Persistent Context ===\n")
        # Launch browser
        await self.send_command(
            "high_level", "launch",
            params={"config": {"mode": "headless"}}
        )
        # Create persistent context
        profile_dir = "/tmp/browser_profile"
        await self.send_command(
            "high_level", "create_context",
            params={
                "config": {
                    "name": "persistent",
                    "persistent": True,
                    "user_data_dir": profile_dir
                }
            }
        )
        print(f"Created persistent context at: {profile_dir}")
        # Create page in persistent context
        result = await self.send_command(
            "high_level", "new_page",
            params={"context_name": "persistent"}
        )
        page_id = result.get("page_id")
        # Navigate and interact
        await self.send_command(
            "high_level", "navigate",
            page_id=page_id,
            params={"url": "https://example.com"}
        )
        # Set localStorage via JavaScript
        await self.send_command(
            "high_level", "execute_script",
            page_id=page_id,
            params={"script": "localStorage.setItem('test', 'persistent_data')"}
        )
        print("Set localStorage item")
        # Close
        await self.send_command("high_level", "close")
        print(f"Profile saved to: {profile_dir}")


async def main():
    """Main entry point"""
    # This is a demonstration of the API
    # In real usage, you would connect to the Hermes Gateway WebSocket
    print("Browser Control Example Usage")
    print("=" * 50)
    print()
    print("This script demonstrates the browser control API.")
    print("To use it, you need to:")
    print("1. Start Hermes Gateway")
    print("2. Start Hermes Node Agent with browser control enabled")
    print("3. Connect to the gateway WebSocket")
    print()
    print("Example connection code:")
    print()
    print("import websockets")
    print("async with websockets.connect('ws://gateway:8765') as ws:")
    print("    example = BrowserControlExample(ws)")
    print("    await example.example_basic_navigation()")
    print()
    print("See BROWSER_PROTOCOL.md for full protocol documentation.")


if __name__ == "__main__":
    asyncio.run(main())
#!/usr/bin/env python3
"""
Browser control module for Hermes Node Protocol.
Implements the interface expected by hermes_node_agent.py.
"""
import asyncio
import base64
import logging
from typing import Dict, Any, Optional
from pathlib import Path
try:
from playwright.async_api import async_playwright, Browser, BrowserContext, Page, Error as PlaywrightError
HAS_PLAYWRIGHT = True
except ImportError:
HAS_PLAYWRIGHT = False
Browser = BrowserContext = Page = None
logger = logging.getLogger(__name__)
class BrowserController:
"""Manages browser instances and contexts for remote control."""
def __init__(self):
self.playwright = None
self.browser: Optional[Browser] = None
self.contexts: Dict[str, BrowserContext] = {} # name -> context
self.default_context: Optional[BrowserContext] = None
self.pages: Dict[str, Page] = {} # page_id -> page
self.lock = asyncio.Lock()
async def initialize(self):
"""Initialize Playwright."""
if not HAS_PLAYWRIGHT:
raise RuntimeError("Playwright not installed. Run: pip install playwright && playwright install chromium")
self.playwright = await async_playwright().start()
logger.info("Playwright initialized")
async def shutdown(self):
"""Clean shutdown of all browser resources."""
try:
# Close all contexts
for ctx in self.contexts.values():
await ctx.close()
if self.default_context:
await self.default_context.close()
# Close browser
if self.browser:
await self.browser.close()
# Stop playwright
if self.playwright:
await self.playwright.stop()
logger.info("Browser controller shutdown complete")
except Exception as e:
logger.error(f"Error during shutdown: {e}")
# Launch and context management
async def launch(self, config: Dict = None) -> Dict[str, Any]:
"""Launch the browser."""
config = config or {}
headless = config.get("headless", False)
attach = config.get("attach", False)
cdp_url = config.get("cdp_url", "http://localhost:9222")
browser_type = config.get("browser_type", "chromium")
try:
if attach:
# Connect to existing browser
endpoints = {
"chromium": f"{cdp_url}",
"firefox": f"{cdp_url.replace(':9222', ':9200') if ':9222' in cdp_url else cdp_url}",
"webkit": f"{cdp_url.replace(':9222', ':9223') if ':9222' in cdp_url else cdp_url}"
}
endpoint = endpoints.get(browser_type, cdp_url)
if browser_type == "chromium":
self.browser = await self.playwright.chromium.connect_over_cdp(endpoint)
elif browser_type == "firefox":
self.browser = await self.playwright.firefox.connect_over_cdp(endpoint)
elif browser_type == "webkit":
self.browser = await self.playwright.webkit.connect_over_cdp(endpoint)
else:
self.browser = await self.playwright.chromium.connect_over_cdp(endpoint)
logger.info(f"Attached to existing {browser_type} browser at {endpoint}")
else:
# Launch new browser
browser_types = {
"chromium": self.playwright.chromium,
"firefox": self.playwright.firefox,
"webkit": self.playwright.webkit
}
launch_browser = browser_types.get(browser_type, self.playwright.chromium)
args = config.get("args", ['--no-sandbox', '--disable-setuid-sandbox'])
# Add extension paths if provided (Chromium only)
extension_paths = config.get("extension_paths", [])
if extension_paths and browser_type == "chromium":
args.append('--load-extension=' + ','.join(extension_paths))
self.browser = await launch_browser.launch(
headless=headless,
args=args
)
logger.info(f"Launched {browser_type} browser (headless={headless})")
return {"success": True, "browser": browser_type, "headless": headless, "mode": "attach" if attach else "launch"}
except Exception as e:
logger.error(f"Failed to launch/attach browser: {e}")
return {"success": False, "error": str(e)}
async def create_context(self, config: Dict = None) -> Dict[str, Any]:
"""Create a new browser context."""
if not self.browser:
return {"success": False, "error": "Browser not launched"}
config = config or {}
context_name = config.get("name")
async with self.lock:
try:
# Check for incognito
incognito = config.get("incognito", False)
if incognito:
ctx = await self.browser.new_context()
ctx_name = f"_incognito_{id(ctx)}"
else:
ctx = await self.browser.new_context()
ctx_name = context_name or f"ctx_{len(self.contexts)}"
self.contexts[ctx_name] = ctx
# Create initial page if requested
create_page = config.get("create_page", True)
page_id = None
if create_page:
page = await ctx.new_page()
page_id = f"page_{len(self.pages)}"
self.pages[page_id] = page
logger.info(f"Created context: {ctx_name} (incognito={incognito})")
return {
"success": True,
"context_name": ctx_name,
"page_id": page_id,
"incognito": incognito
}
except Exception as e:
logger.error(f"Failed to create context: {e}")
return {"success": False, "error": str(e)}
async def new_page(self, context_name: Optional[str] = None) -> Dict[str, Any]:
"""Create a new page in specified context (or default)."""
try:
ctx = await self._get_context(context_name)
page = await ctx.new_page()
page_id = f"page_{len(self.pages)}"
self.pages[page_id] = page
return {"success": True, "page_id": page_id, "context": context_name or "default"}
except Exception as e:
logger.error(f"Failed to create page: {e}")
return {"success": False, "error": str(e)}
# Navigation
async def navigate(self, page_id: str, url: str, wait_until: str = "load",
timeout: int = 30000) -> Dict[str, Any]:
"""Navigate to a URL."""
page = await self._get_page(page_id)
await page.goto(url, wait_until=wait_until, timeout=timeout)
return {
"success": True,
"url": page.url,
"title": await page.title()
}
# Interaction
async def click(self, page_id: str, selector: str, timeout: int = 30000) -> Dict[str, Any]:
"""Click an element."""
page = await self._get_page(page_id)
await page.click(selector, timeout=timeout)
return {"success": True, "selector": selector}
async def fill(self, page_id: str, selector: str, value: str, timeout: int = 30000) -> Dict[str, Any]:
"""Fill a form field."""
page = await self._get_page(page_id)
await page.fill(selector, value, timeout=timeout)
return {"success": True, "selector": selector, "value": value}
async def type_text(self, page_id: str, selector: str, text: str,
delay: int = 0, timeout: int = 30000) -> Dict[str, Any]:
"""Type text character by character."""
page = await self._get_page(page_id)
await page.type(selector, text, delay=delay, timeout=timeout)
return {"success": True, "selector": selector, "text": text}
async def wait_for_selector(self, page_id: str, selector: str,
state: str = "visible", timeout: int = 30000) -> Dict[str, Any]:
"""Wait for an element to be in a state."""
page = await self._get_page(page_id)
await page.wait_for_selector(selector, state=state, timeout=timeout)
return {"success": True, "selector": selector, "state": state}
# Script execution
async def execute_script(self, page_id: str, script: str) -> Dict[str, Any]:
"""Execute JavaScript in page context (no return)."""
page = await self._get_page(page_id)
await page.evaluate(script)
return {"success": True}
async def evaluate(self, page_id: str, expression: str, arg: Any = None) -> Dict[str, Any]:
"""Evaluate JavaScript and return result."""
page = await self._get_page(page_id)
result = await page.evaluate(expression, arg)
return {"success": True, "result": result}
# Inspection
async def screenshot(self, page_id: str, full_page: bool = False,
path: Optional[str] = None) -> Dict[str, Any]:
"""Take a screenshot."""
page = await self._get_page(page_id)
if path:
await page.screenshot(path=path, full_page=full_page)
return {"success": True, "path": path, "format": "png"}
else:
screenshot_bytes = await page.screenshot(full_page=full_page)
screenshot_b64 = base64.b64encode(screenshot_bytes).decode('utf-8')
return {"success": True, "screenshot": screenshot_b64, "format": "png", "encoding": "base64"}
async def get_content(self, page_id: str) -> Dict[str, Any]:
"""Get page HTML content."""
page = await self._get_page(page_id)
content = await page.content()
return {"success": True, "content": content}
async def get_title(self, page_id: str) -> Dict[str, Any]:
"""Get page title."""
page = await self._get_page(page_id)
title = await page.title()
return {"success": True, "title": title}
async def get_url(self, page_id: str) -> Dict[str, Any]:
"""Get current page URL."""
page = await self._get_page(page_id)
return {"success": True, "url": page.url}
# Cleanup
async def close_page(self, page_id: str) -> Dict[str, Any]:
"""Close a specific page."""
if page_id not in self.pages:
return {"success": False, "error": f"Page not found: {page_id}"}
page = self.pages[page_id]
await page.close()
del self.pages[page_id]
return {"success": True, "page_id": page_id}

    async def close_context(self, context_name: str) -> Dict[str, Any]:
        """Close a named context and all its pages."""
        if context_name not in self.contexts:
            return {"success": False, "error": f"Context not found: {context_name}"}
        ctx = self.contexts[context_name]
        await ctx.close()
        del self.contexts[context_name]
        # Drop tracked pages that belonged to this context. Playwright's
        # BrowserContext has no `name` attribute, so compare by identity.
        self.pages = {pid: p for pid, p in self.pages.items()
                      if p.context is not ctx}
        logger.info(f"Closed context: {context_name}")
        return {"success": True, "context": context_name}

    # Browser extension support
    async def load_extension(self, extension_path: str) -> Dict[str, Any]:
        """Load a Chrome/Chromium unpacked extension (not supported at runtime)."""
        if not self.browser:
            return {"success": False, "error": "Browser not launched"}
        if not Path(extension_path).exists():
            return {"success": False, "error": f"Extension path not found: {extension_path}"}
        # Extensions are loaded through launch arguments, so a browser that is
        # already running cannot pick one up here.
        return {"success": False, "error": "Extension must be passed in launch config with 'extension_paths'"}

    async def execute_extension_script(self, extension_id: str, script: str) -> Dict[str, Any]:
        """Execute script in extension context via CDP (not implemented)."""
        # Would use CDP to inject into the extension's background page
        return {"success": False, "error": "Use CDP layer directly with Runtime.evaluate"}

    async def list_extensions(self) -> Dict[str, Any]:
        """List installed extensions via CDP (not implemented)."""
        return {"success": False, "error": "Use CDP: Browser.getBrowserCommandLine and querying"}

    async def close(self) -> Dict[str, Any]:
        """Close all contexts and the browser."""
        await self.shutdown()
        self.contexts = {}
        self.pages = {}
        self.default_context = None
        self.browser = None
        return {"success": True, "message": "Browser closed"}

    # Listing
    async def list_pages(self) -> Dict[str, Any]:
        """List all active pages."""
        pages_info = []
        for page_id, page in self.pages.items():
            pages_info.append({
                "page_id": page_id,
                "url": page.url,
                "title": await page.title()
            })
        return {"success": True, "pages": pages_info}

    async def list_contexts(self) -> Dict[str, Any]:
        """List all contexts."""
        contexts_info = []
        for name, ctx in self.contexts.items():
            contexts_info.append({
                "name": name,
                "pages_count": len(ctx.pages)
            })
        # Include the default context if it exists
        if self.default_context:
            contexts_info.append({
                "name": "default",
                "pages_count": len(self.default_context.pages)
            })
        return {"success": True, "contexts": contexts_info}

    # Advanced APIs
    async def playwright_command(self, page_id: str, command: str,
                                 args: Optional[list] = None,
                                 kwargs: Optional[dict] = None) -> Dict[str, Any]:
        """Execute arbitrary Playwright API."""
        import inspect

        args = list(args or [])
        kwargs = kwargs or {}
        page = await self._get_page(page_id)
        # Parse command path: "locator.click" -> page.locator(args[0]).click(*rest, **kwargs)
        parts = command.split(".")
        obj = page
        for part in parts[:-1]:
            attr = getattr(obj, part)
            # Intermediate methods consume one positional argument each
            # (e.g. the selector for `locator`); plain attributes are used as-is
            if callable(attr):
                if args:
                    obj = attr(args[0])
                    args = args[1:]
                else:
                    obj = attr()
            else:
                obj = attr
        # Call the final method. Some Playwright calls are synchronous
        # (e.g. `page.locator`), so only await when an awaitable comes back.
        final_method = getattr(obj, parts[-1])
        if callable(final_method):
            result = final_method(*args, **kwargs)
            if inspect.isawaitable(result):
                result = await result
        else:
            result = final_method
        return {"success": True, "result": result}

    async def cdp_command(self, page_id: str, method: str, params: dict = None) -> Dict[str, Any]:
        """Send raw CDP command."""
        page = await self._get_page(page_id)
        cdp = await page.context.new_cdp_session(page)
        result = await cdp.send(method, params or {})
        return {"success": True, "result": result}

    # Helpers
    async def _get_context(self, context_name: Optional[str] = None) -> BrowserContext:
        """Get context by name or default."""
        if context_name:
            if context_name not in self.contexts:
                raise ValueError(f"Context not found: {context_name}")
            return self.contexts[context_name]
        if not self.default_context:
            self.default_context = await self.browser.new_context()
        return self.default_context

    async def _get_page(self, page_id: str) -> Page:
        """Get page by ID."""
        if page_id not in self.pages:
            raise ValueError(f"Page not found: {page_id}")
        return self.pages[page_id]
#!/usr/bin/env python3
"""
Browser control module for Hermes Node Protocol.
Provides Playwright-based browser automation with CDP fallback.
"""
import asyncio
import base64
import logging
from typing import Dict, Any, Optional, List
from pathlib import Path
try:
    from playwright.async_api import async_playwright, Browser, BrowserContext, Page, Error as PlaywrightError
    HAS_PLAYWRIGHT = True
except ImportError:
    HAS_PLAYWRIGHT = False
    Browser = BrowserContext = Page = None

logger = logging.getLogger(__name__)


class BrowserController:
    """Manages browser instances and contexts for remote control."""

    def __init__(self):
        self.playwright = None
        self.browser: Optional[Browser] = None
        self.contexts: Dict[str, BrowserContext] = {}  # Named contexts
        self.default_context: Optional[BrowserContext] = None
        self.pages: Dict[str, Page] = {}  # context_name -> active page
        self.cdp_sessions: Dict[str, Any] = {}

    async def initialize(self):
        """Initialize Playwright."""
        if not HAS_PLAYWRIGHT:
            raise RuntimeError("Playwright not installed. Run: pip install playwright && playwright install chromium")
        self.playwright = await async_playwright().start()
        logger.info("Playwright initialized")

    async def shutdown(self):
        """Clean shutdown of all browser resources."""
        try:
            # Close all contexts
            for ctx in self.contexts.values():
                await ctx.close()
            if self.default_context:
                await self.default_context.close()
            # Close browser
            if self.browser:
                await self.browser.close()
            # Stop playwright
            if self.playwright:
                await self.playwright.stop()
            logger.info("Browser controller shutdown complete")
        except Exception as e:
            logger.error(f"Error during shutdown: {e}")
        finally:
            # Drop stale handles so a later launch_browser() starts from scratch
            self.contexts = {}
            self.default_context = None
            self.browser = None
            self.playwright = None

    async def launch_browser(self, headless: bool = False, attach: bool = False,
                             cdp_url: Optional[str] = None) -> Dict[str, Any]:
        """
        Launch or attach to a browser.

        Args:
            headless: Run without visible window
            attach: Connect to existing browser (requires cdp_url or default port)
            cdp_url: CDP endpoint URL (e.g., http://localhost:9222)
        """
        try:
            # shutdown() stops Playwright; (re)start it on demand
            if self.playwright is None:
                await self.initialize()
            if attach:
                # Connect to existing browser
                cdp_endpoint = cdp_url or "http://localhost:9222"
                self.browser = await self.playwright.chromium.connect_over_cdp(cdp_endpoint)
                logger.info(f"Attached to existing browser at {cdp_endpoint}")
            else:
                # Launch new browser
                self.browser = await self.playwright.chromium.launch(
                    headless=headless,
                    args=['--no-sandbox', '--disable-setuid-sandbox']
                )
                logger.info(f"Launched browser (headless={headless})")
            return {"success": True, "mode": "attach" if attach else "launch", "headless": headless}
        except Exception as e:
            logger.error(f"Failed to launch/attach browser: {e}")
            return {"success": False, "error": str(e)}

    async def get_or_create_context(self, context_name: Optional[str] = None,
                                    incognito: bool = False) -> BrowserContext:
        """
        Get existing context or create new one.

        Args:
            context_name: Named context (kept for reuse). None = default context.
            incognito: Create temporary incognito context (not saved)
        """
        if not self.browser:
            raise RuntimeError("Browser not launched. Call launch_browser() first.")
        # Incognito: always create fresh. The context is not tracked in
        # self.contexts, so it stays open until the caller closes it or the
        # browser shuts down.
        if incognito:
            ctx = await self.browser.new_context()
            logger.info("Created incognito context")
            return ctx
        # Named context
        if context_name:
            if context_name not in self.contexts:
                self.contexts[context_name] = await self.browser.new_context()
                logger.info(f"Created named context: {context_name}")
            return self.contexts[context_name]
        # Default persistent context
        if not self.default_context:
            self.default_context = await self.browser.new_context()
            logger.info("Created default context")
        return self.default_context

    async def get_page(self, context_name: Optional[str] = None,
                       incognito: bool = False) -> Page:
        """Get or create a page in the specified context."""
        ctx = await self.get_or_create_context(context_name, incognito)
        # Return existing page or create new one. With incognito=True every
        # call gets a brand-new context, so successive commands do not share a page.
        pages = ctx.pages
        if pages:
            return pages[0]
        page = await ctx.new_page()
        return page

    async def execute_command(self, command: Dict[str, Any]) -> Dict[str, Any]:
        """
        Execute a browser control command.

        Command structure:
        {
            "action": "navigate" | "click" | "fill" | "screenshot" | "execute_js" |
                      "evaluate" | "playwright" | "cdp" | "close_context" |
                      "close_browser" | "get_url" | "get_title",
            "context": "name" (optional),
            "incognito": true/false,
            "headless": true/false (only used when the browser is launched),
            "attach": true/false (only used when the browser is launched),
            "cdp_url": CDP endpoint used with attach (optional),
            ... action-specific params
        }
        """
        action = command.get("action")
        context_name = command.get("context")
        incognito = command.get("incognito", False)
        try:
            # Ensure browser is running
            if not self.browser:
                headless = command.get("headless", False)
                attach = command.get("attach", False)
                cdp_url = command.get("cdp_url")
                result = await self.launch_browser(headless, attach, cdp_url)
                if not result["success"]:
                    return result
            # Route to handler
            if action == "navigate":
                return await self._navigate(command, context_name, incognito)
            elif action == "click":
                return await self._click(command, context_name, incognito)
            elif action == "fill":
                return await self._fill(command, context_name, incognito)
            elif action == "screenshot":
                return await self._screenshot(command, context_name, incognito)
            elif action == "execute_js":
                return await self._execute_js(command, context_name, incognito)
            elif action == "evaluate":
                return await self._evaluate(command, context_name, incognito)
            elif action == "playwright":
                return await self._playwright_api(command, context_name, incognito)
            elif action == "cdp":
                return await self._cdp_command(command, context_name, incognito)
            elif action == "close_context":
                return await self._close_context(command)
            elif action == "close_browser":
                return await self._close_browser()
            elif action == "get_url":
                return await self._get_url(context_name, incognito)
            elif action == "get_title":
                return await self._get_title(context_name, incognito)
            else:
                return {"success": False, "error": f"Unknown action: {action}"}
        except Exception as e:
            logger.error(f"Command execution failed: {e}", exc_info=True)
            return {"success": False, "error": str(e)}

    # High-level actions
    async def _navigate(self, cmd: Dict, context_name: Optional[str], incognito: bool) -> Dict:
        """Navigate to URL."""
        url = cmd.get("url")
        if not url:
            return {"success": False, "error": "Missing 'url' parameter"}
        wait_until = cmd.get("wait_until", "load")  # load, domcontentloaded, networkidle
        timeout = cmd.get("timeout", 30000)
        page = await self.get_page(context_name, incognito)
        await page.goto(url, wait_until=wait_until, timeout=timeout)
        return {
            "success": True,
            "url": page.url,
            "title": await page.title()
        }

    async def _click(self, cmd: Dict, context_name: Optional[str], incognito: bool) -> Dict:
        """Click an element."""
        selector = cmd.get("selector")
        if not selector:
            return {"success": False, "error": "Missing 'selector' parameter"}
        page = await self.get_page(context_name, incognito)
        await page.click(selector, timeout=cmd.get("timeout", 30000))
        return {"success": True, "selector": selector}

    async def _fill(self, cmd: Dict, context_name: Optional[str], incognito: bool) -> Dict:
        """Fill a form field."""
        selector = cmd.get("selector")
        text = cmd.get("text", "")
        if not selector:
            return {"success": False, "error": "Missing 'selector' parameter"}
        page = await self.get_page(context_name, incognito)
        await page.fill(selector, text, timeout=cmd.get("timeout", 30000))
        return {"success": True, "selector": selector, "text": text}

    async def _screenshot(self, cmd: Dict, context_name: Optional[str], incognito: bool) -> Dict:
        """Take a screenshot."""
        page = await self.get_page(context_name, incognito)
        full_page = cmd.get("full_page", False)
        screenshot_bytes = await page.screenshot(full_page=full_page)
        # Return as base64
        screenshot_b64 = base64.b64encode(screenshot_bytes).decode('utf-8')
        return {
            "success": True,
            "screenshot": screenshot_b64,
            "format": "png",
            "encoding": "base64"
        }

    async def _execute_js(self, cmd: Dict, context_name: Optional[str], incognito: bool) -> Dict:
        """Execute JavaScript in page context (no return value)."""
        script = cmd.get("script")
        if not script:
            return {"success": False, "error": "Missing 'script' parameter"}
        page = await self.get_page(context_name, incognito)
        await page.evaluate(script)
        return {"success": True}

    async def _evaluate(self, cmd: Dict, context_name: Optional[str], incognito: bool) -> Dict:
        """Evaluate JavaScript and return result."""
        script = cmd.get("script")
        if not script:
            return {"success": False, "error": "Missing 'script' parameter"}
        page = await self.get_page(context_name, incognito)
        result = await page.evaluate(script)
        return {"success": True, "result": result}

    async def _get_url(self, context_name: Optional[str], incognito: bool) -> Dict:
        """Get current page URL."""
        page = await self.get_page(context_name, incognito)
        return {"success": True, "url": page.url}

    async def _get_title(self, context_name: Optional[str], incognito: bool) -> Dict:
        """Get current page title."""
        page = await self.get_page(context_name, incognito)
        title = await page.title()
        return {"success": True, "title": title}

    # Advanced APIs
    async def _playwright_api(self, cmd: Dict, context_name: Optional[str], incognito: bool) -> Dict:
        """Execute arbitrary Playwright API call."""
        import inspect

        method = cmd.get("method")  # e.g. "page.goto", "page.locator.click"
        args = list(cmd.get("args", []))
        kwargs = cmd.get("kwargs", {})
        if not method:
            return {"success": False, "error": "Missing 'method' parameter"}
        page = await self.get_page(context_name, incognito)
        # Parse the method path; a leading "page" segment refers to the page itself
        parts = method.split(".")
        if parts[0] == "page":
            parts = parts[1:]
        if not parts:
            return {"success": False, "error": f"Invalid method path: {method}"}
        obj = page
        for part in parts[:-1]:
            obj = getattr(obj, part)
            if callable(obj):
                obj = obj(*args[:1])  # Intermediate calls consume the first arg
                args = args[1:]
        # Call the final attribute, awaiting only if it returns an awaitable;
        # properties such as "page.url" are returned as-is
        final_attr = getattr(obj, parts[-1])
        result = final_attr(*args, **kwargs) if callable(final_attr) else final_attr
        if inspect.isawaitable(result):
            result = await result
        return {"success": True, "result": str(result)}

    async def _cdp_command(self, cmd: Dict, context_name: Optional[str], incognito: bool) -> Dict:
        """Send raw CDP command."""
        method = cmd.get("method")
        params = cmd.get("params", {})
        if not method:
            return {"success": False, "error": "Missing 'method' parameter"}
        page = await self.get_page(context_name, incognito)
        cdp = await page.context.new_cdp_session(page)
        result = await cdp.send(method, params)
        return {"success": True, "result": result}

    async def _close_context(self, cmd: Dict) -> Dict:
        """Close a named context."""
        context_name = cmd.get("context")
        if not context_name:
            return {"success": False, "error": "Missing 'context' parameter"}
        if context_name in self.contexts:
            await self.contexts[context_name].close()
            del self.contexts[context_name]
            logger.info(f"Closed context: {context_name}")
            return {"success": True, "context": context_name}
        return {"success": False, "error": f"Context not found: {context_name}"}

    async def _close_browser(self) -> Dict:
        """Close the browser and all contexts."""
        await self.shutdown()
        self.browser = None
        self.contexts = {}
        self.default_context = None
        return {"success": True, "message": "Browser closed"}
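

# Sketch (illustration only, not called by the controller): how a dotted
# "method" path such as "page.locator.click" can be resolved, mirroring the
# loop in _playwright_api. Intermediate callables consume one positional
# argument each (the selector for `locator`); the final attribute receives
# the remaining args. `resolve_method_path` is a hypothetical name, and this
# version is synchronous, unlike Playwright's async API.
def resolve_method_path(root, path, args=(), kwargs=None):
    """Resolve `path` against `root` and call (or read) the final attribute."""
    kwargs = kwargs or {}
    parts = path.split(".")
    if parts and parts[0] == "page":
        parts = parts[1:]  # "page" refers to the root object itself
    if not parts:
        raise ValueError(f"Invalid method path: {path}")
    args = list(args)
    obj = root
    for part in parts[:-1]:
        attr = getattr(obj, part)
        if callable(attr):
            # Intermediate call consumes at most one positional argument
            obj = attr(*args[:1])
            args = args[1:]
        else:
            obj = attr
    final = getattr(obj, parts[-1])
    # Properties (e.g. "page.url") are returned without calling them
    return final(*args, **kwargs) if callable(final) else final
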
# Global instance
_controller: Optional[BrowserController] = None
async def get_controller() -> BrowserController:
    """Get or create the global browser controller instance."""
    global _controller
    if _controller is None:
        _controller = BrowserController()
        await _controller.initialize()
    return _controller


async def handle_browser_command(command: Dict[str, Any]) -> Dict[str, Any]:
    """Main entry point for browser control commands."""
    controller = await get_controller()
    return await controller.execute_command(command)
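

# Sketch (hypothetical helper, not part of the original module): the required
# parameters per action, as read from the handlers above. A caller such as the
# gateway could check a command dict before forwarding it to
# handle_browser_command(). `REQUIRED_PARAMS` and `validate_browser_command`
# are illustrative names; the falsy check matches the handlers' `if not x`.
REQUIRED_PARAMS = {
    "navigate": ["url"],
    "click": ["selector"],
    "fill": ["selector"],
    "screenshot": [],
    "execute_js": ["script"],
    "evaluate": ["script"],
    "playwright": ["method"],
    "cdp": ["method"],
    "close_context": ["context"],
    "close_browser": [],
    "get_url": [],
    "get_title": [],
}


def validate_browser_command(command):
    """Return an error string, or None if the command looks well-formed."""
    action = command.get("action")
    if action not in REQUIRED_PARAMS:
        return f"Unknown action: {action}"
    missing = [p for p in REQUIRED_PARAMS[action] if not command.get(p)]
    if missing:
        return f"Missing parameter(s): {', '.join(missing)}"
    return None
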
#!/bin/sh
# /etc/init.d/hermes-node-agent
# SysVinit script for Hermes Node Agent
#
# chkconfig: 2345 95 05
# description: Hermes Node Agent reverse-connected WebSocket client
### BEGIN INIT INFO
# Provides: hermes-node-agent
# Required-Start: $network $remote_fs $syslog
# Required-Stop: $network $remote_fs $syslog
# Default-Start: 2 3 4 5
# Default-Stop: 0 1 6
# Short-Description: Hermes Node Agent
# Description: Reverse-connected WebSocket node agent.
### END INIT INFO
NAME="hermes-node-agent"
DAEMON="/usr/bin/python3"
SCRIPT_DIR="/usr/local/bin"
DAEMON_SCRIPT="${SCRIPT_DIR}/hermes_node_agent.py"
PIDFILE="/var/run/${NAME}.pid"
LOGFILE="/var/log/${NAME}.log"
USER="root"
GROUP="root"
# Check daemon exists
if [ ! -x "$DAEMON" ]; then
    echo "$DAEMON not found or not executable."
    exit 5
fi
if [ ! -f "$DAEMON_SCRIPT" ]; then
    echo "$DAEMON_SCRIPT not found."
    exit 5
fi
# Ensure config exists
if [ ! -f "/etc/hermes-node/config.json" ]; then
    echo "/etc/hermes-node/config.json not found."
    exit 6
fi
. /lib/lsb/init-functions 2>/dev/null || true
start() {
    echo "Starting $NAME..."
    if [ -f "$PIDFILE" ]; then
        PID=$(cat "$PIDFILE")
        if kill -0 "$PID" 2>/dev/null; then
            echo "$NAME is already running (PID $PID)."
            return 0
        else
            rm -f "$PIDFILE"
        fi
    fi
    touch "$LOGFILE"
    chown "$USER:$GROUP" "$LOGFILE" 2>/dev/null || chmod 644 "$LOGFILE"
    # Pass the config path explicitly: without --config the agent reads
    # ~/.config/hermes-node/config.json, not the file checked above.
    "$DAEMON" "$DAEMON_SCRIPT" --config /etc/hermes-node/config.json >> "$LOGFILE" 2>&1 &
    echo $! > "$PIDFILE"
    sleep 1
    if kill -0 "$(cat "$PIDFILE")" 2>/dev/null; then
        echo "$NAME started (PID $(cat "$PIDFILE"))."
    else
        echo "$NAME failed to start. Check $LOGFILE"
        exit 1
    fi
}

stop() {
    echo "Stopping $NAME..."
    if [ -f "$PIDFILE" ]; then
        PID=$(cat "$PIDFILE")
        if kill -0 "$PID" 2>/dev/null; then
            kill "$PID" 2>/dev/null
            # Wait up to ~15 s (30 x 0.5 s) for a graceful exit
            for i in $(seq 1 30); do
                if ! kill -0 "$PID" 2>/dev/null; then
                    break
                fi
                sleep 0.5
            done
            if kill -0 "$PID" 2>/dev/null; then
                echo "Force killing..."
                kill -9 "$PID" 2>/dev/null
                sleep 1
            fi
        fi
        rm -f "$PIDFILE"
        echo "$NAME stopped."
    else
        echo "$NAME is not running."
    fi
    # Also stop instances that were started without a PID file
    pkill -f "hermes_node_agent.py" 2>/dev/null || true
}
case "$1" in
    start)
        start
        ;;
    stop)
        stop
        ;;
    restart)
        stop
        sleep 1
        start
        ;;
    reload|force-reload)
        echo "Reload not supported, restarting..."
        stop
        sleep 1
        start
        ;;
    status)
        # LSB status codes: 0 = running, 1 = dead but PID file exists, 3 = not running
        if [ -f "$PIDFILE" ]; then
            PID=$(cat "$PIDFILE")
            if kill -0 "$PID" 2>/dev/null; then
                echo "$NAME is running (PID $PID)."
                exit 0
            fi
            echo "$NAME is not running (stale PID file)."
            exit 1
        fi
        PID=$(pgrep -f "hermes_node_agent.py" | head -1)
        if [ -n "$PID" ]; then
            echo "$NAME is running (PID $PID) but no PID file."
            exit 0
        fi
        echo "$NAME is not running."
        exit 3
        ;;
    *)
        echo "Usage: $0 {start|stop|restart|reload|force-reload|status}"
        exit 1
        ;;
esac
exit 0
[Unit]
Description=Hermes Node Agent
After=network-online.target
Wants=network-online.target
[Service]
Type=simple
ExecStart=/usr/bin/python3 /usr/local/bin/hermes-node-agent
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal
# Security hardening
NoNewPrivileges=true
PrivateTmp=true
ProtectSystem=strict
ProtectHome=read-only
ReadWritePaths=/tmp
[Install]
WantedBy=default.target
// Hermes Extension Background Service Worker
// Provides bidirectional communication between CDP and content scripts
const PORT_NAME = 'hermes_agent_port';
const ports = new Map(); // tabId -> Port opened by that tab's content script

chrome.runtime.onConnect.addListener((port) => {
  if (port.name === PORT_NAME) {
    const tabId = port.sender?.tab?.id;
    console.log('[Hermes] Port connected from tab:', tabId);
    if (tabId) ports.set(tabId, port);
    port.onMessage.addListener((msg) => {
      console.log('[Hermes] Message from tab', tabId, ':', msg);
      if (msg.type === 'hermes_content_ready') {
        // content.js announces itself with 'hermes_content_ready' on this port
      }
    });
    port.onDisconnect.addListener(() => {
      console.log('[Hermes] Port disconnected from tab:', tabId);
      ports.delete(tabId);
    });
  }
});

// Messages from outside the extension (e.g. code run in a tab via CDP
// Runtime.evaluate). Web pages can only send these if the manifest lists
// them under "externally_connectable".
chrome.runtime.onMessageExternal.addListener((msg, sender, sendResponse) => {
  if (msg && typeof msg.type === 'string' && msg.type.startsWith('hermes_')) {
    handleHermesMessage(msg, sender, sendResponse);
    return true; // keep the channel open for the async sendResponse
  }
});

function handleHermesMessage(msg, sender, sendResponse) {
  switch (msg.type) {
    case 'hermes_eval_in_page':
      // Forward the script to every tab that has a connected content script
      chrome.tabs.query({}, (tabs) => {
        tabs.forEach(tab => {
          if (ports.has(tab.id)) {
            ports.get(tab.id).postMessage({
              type: 'hermes_exec',
              script: msg.script,
              id: msg.id
            });
          }
        });
        sendResponse({status: 'sent'});
      });
      break;
    case 'hermes_get_info':
      sendResponse({
        status: 'ok',
        extension: 'hermes_browser_agent',
        version: '1.0',
        tabs: Array.from(ports.keys())
      });
      break;
    default:
      sendResponse({status: 'unknown'});
  }
}

// CDP can call chrome.runtime.sendMessage via Runtime.evaluate
// This allows remote commands to trigger extension actions
console.log('[Hermes] Background service worker initialized');
// Hermes Content Script - runs in every page
// Opens a port to the background service worker
const PORT_NAME = 'hermes_agent_port';
const port = chrome.runtime.connect({name: PORT_NAME});

port.onMessage.addListener((msg) => {
  if (msg.type === 'hermes_exec') {
    try {
      // Runs in the content script's isolated world, not the page's own JS
      // context: the DOM is shared, but page globals are not visible here.
      eval(msg.script);
      // The result is not sent back over the port; use CDP Runtime.evaluate
      // when a return value is needed.
    } catch (e) {
      console.error('[Hermes] Script execution error:', e);
    }
  }
});

// Signal that the content script is loaded
port.postMessage({type: 'hermes_content_ready'});
console.log('[Hermes] Content script loaded');
// Hermes Injected API - exposed to page JavaScript context
// Allows page scripts to communicate with the extension and agent
window.HermesAgent = {
  version: '1.0',

  // Placeholder: does not run `script`. CDP Runtime.evaluate is the primary
  // execution channel and the only one that returns results.
  execute: async function(script) {
    console.log('[Hermes] execute called:', script);
    return {ok: true, note: 'use CDP Runtime.evaluate for results'};
  },

  // Get page info
  getInfo: async function() {
    return {
      url: window.location.href,
      title: document.title,
      domain: window.location.hostname,
      referrer: document.referrer,
      timestamp: Date.now()
    };
  },

  // Helper: wait for selector (polls once per animation frame)
  waitForSelector: function(selector, timeout = 5000) {
    return new Promise((resolve, reject) => {
      const start = Date.now();
      const check = () => {
        const el = document.querySelector(selector);
        if (el) {
          resolve(el);
        } else if (Date.now() - start > timeout) {
          reject(new Error('Timeout waiting for: ' + selector));
        } else {
          requestAnimationFrame(check);
        }
      };
      check();
    });
  },

  // Helper: set a field's value and fire input/change so listeners see it
  fillForm: function(selector, value) {
    const el = document.querySelector(selector);
    if (el) {
      el.value = value;
      el.dispatchEvent(new Event('input', {bubbles: true}));
      el.dispatchEvent(new Event('change', {bubbles: true}));
      return true;
    }
    return false;
  }
};

console.log('[Hermes] Injected API loaded - window.HermesAgent available');

// Notify the extension. In a page context chrome.runtime is only exposed when
// the page matches "externally_connectable", and sendMessage then needs the
// extension ID, so the call is guarded and may be a no-op.
if (typeof chrome !== 'undefined' && chrome.runtime && chrome.runtime.sendMessage) {
  try {
    chrome.runtime.sendMessage({
      type: 'hermes_injected_ready',
      url: window.location.href
    });
  } catch (e) {
    console.warn('[Hermes] Could not notify extension:', e);
  }
}
{
  "manifest_version": 3,
  "name": "Hermes Node Agent Extension",
  "version": "1.0",
  "description": "Hermes agent helper - provides CDP communication and JS injection utilities for remote browser automation.",
  "permissions": [
    "storage",
    "scripting",
    "activeTab",
    "tabs",
    "webNavigation"
  ],
  "host_permissions": [
    "<all_urls>"
  ],
  "background": {
    "service_worker": "background.js",
    "type": "module"
  },
  "content_scripts": [
    {
      "matches": [
        "<all_urls>"
      ],
      "js": [
        "content.js"
      ],
      "run_at": "document_start"
    }
  ],
  "web_accessible_resources": [
    {
      "resources": [
        "injected.js"
      ],
      "matches": [
        "<all_urls>"
      ]
    }
  ]
}
#!/usr/bin/env python3
"""
Hermes Node Agent - Reverse-connection node executor
Connects to Hermes Gateway via WebSocket and executes commands.
Supports optional tools: browser control, computer_control.
Author: Lisa (Hermes AI)
Date: 2026-04-30 (enhanced)
"""
import asyncio
import base64
import json
import logging
import os
import shutil
import subprocess
import sys
import time
import argparse
from pathlib import Path
from typing import Optional, Dict, Any, List

try:
    import websockets
except ImportError:
    print("ERROR: websockets library not found. Install with: pip install websockets")
    sys.exit(1)

try:
    from browser_controller import BrowserController
    HAS_BROWSER = True
except ImportError:
    HAS_BROWSER = False

# ── Computer control dependencies ──────────────────────────────────────────
try:
    from PIL import ImageGrab
    HAS_PIL = True
except ImportError:
    HAS_PIL = False

# ── Logging ─────────────────────────────────────────────────────────────────
logging.basicConfig(
    level=logging.INFO,
    format='[%(asctime)s] [%(levelname)s] %(message)s',
    datefmt='%H:%M:%S'
)
logger = logging.getLogger(__name__)

# ── Default config ──────────────────────────────────────────────────────────
DEFAULT_CONFIG = {
    "gateway_url": "wss://localhost:8765",
    "node_name": None,  # Filled at install time
    "token": None,  # Filled at install time
    "sexec_path": str(Path.home() / ".openclaw/skills/sexec/sexec.sh"),
    "reconnect_interval": 5,
    "heartbeat_interval": 30,
    # Capabilities: the installer sets these based on user choice & system deps
    "enable_browser": False,  # Chrome/Edge extension present
    "enable_computer_control": False,  # Has X11 + required tools
}


# ── Computer control: Linux/X11 implementation ─────────────────────────────
class ComputerController:
    """Desktop automation via X11 tools (xdotool, import/scrot)."""

    def __init__(self):
        self.display = os.environ.get('DISPLAY', ':0')
        self._check_deps()

    def _check_deps(self):
        """Verify required command-line tools are present."""
        self.has_xdotool = shutil.which('xdotool') is not None
        self.has_import = shutil.which('import') is not None
        self.has_scrot = shutil.which('scrot') is not None
        if not (self.has_xdotool and (self.has_import or self.has_scrot)):
            logger.warning("computer_control: missing xdotool or screenshot tool")

    def _run(self, argv: List[str]) -> Dict[str, Any]:
        """Run a command without a shell and return a success/error dict.

        Arguments are passed as a list, so gateway-supplied text, keys and
        paths cannot inject shell syntax.
        """
        try:
            result = subprocess.run(
                argv,
                env={**os.environ, 'DISPLAY': self.display},
                capture_output=True, text=True,
                timeout=30
            )
            return {
                "success": result.returncode == 0,
                "stdout": result.stdout.strip(),
                "stderr": result.stderr.strip(),
                "exit_code": result.returncode
            }
        except Exception as e:
            return {"success": False, "error": str(e)}

    # ── Screenshot ──────────────────────────────────────────────────────────
    def screenshot(self, output_path: Optional[str] = None) -> Dict[str, Any]:
        """Take a screenshot of the entire desktop."""
        if output_path:
            # Save to file
            if self.has_import:
                return self._run(['import', '-display', self.display, '-window', 'root', output_path])
            if self.has_scrot:
                return self._run(['scrot', '-d', '1', output_path])
            return {"success": False, "error": "No screenshot tool available"}
        # Return base64-encoded PNG data (ImageMagick `import` can write to stdout)
        if not self.has_import:
            return {"success": False, "error": "Base64 screenshots require ImageMagick 'import'"}
        try:
            result = subprocess.run(
                ['import', '-display', self.display, '-window', 'root', 'png:-'],
                capture_output=True, timeout=30
            )
        except Exception as e:
            return {"success": False, "error": str(e)}
        if result.returncode != 0:
            return {
                "success": False,
                "error": result.stderr.decode(errors='replace').strip(),
                "exit_code": result.returncode
            }
        return {
            "success": True,
            "format": "png",
            "data": base64.b64encode(result.stdout).decode('ascii'),
            "size": len(result.stdout)
        }

    # ── Mouse ───────────────────────────────────────────────────────────────
    def mouse_move(self, x: int, y: int) -> Dict[str, Any]:
        try:
            x, y = int(x), int(y)
        except (TypeError, ValueError):
            return {"success": False, "error": f"Invalid coordinates: {x!r}, {y!r}"}
        return self._run(['xdotool', 'mousemove', str(x), str(y)])

    def mouse_click(self, button: int = 1) -> Dict[str, Any]:
        """button: 1=left, 2=middle, 3=right"""
        return self._run(['xdotool', 'click', str(button)])

    def mouse_position(self) -> Dict[str, Any]:
        out = self._run(['xdotool', 'getmouselocation', '--shell'])
        if out['success']:
            info = {}
            for line in out['stdout'].splitlines():
                if '=' in line:
                    k, v = line.split('=', 1)
                    info[k] = int(v)
            return {"success": True, "position": info}
        return out

    # ── Keyboard ─────────────────────────────────────────────────────────────
    def type_text(self, text: str) -> Dict[str, Any]:
        # Passed as a single argv element, so no shell quoting is needed
        return self._run(['xdotool', 'type', '--delay', '1', text])

    def key_press(self, key: str) -> Dict[str, Any]:
        """Press a key or key combination (e.g. 'Return', 'ctrl+c', 'alt+Tab')."""
        return self._run(['xdotool', 'key', key])

    def get_active_window(self) -> Dict[str, Any]:
        """Get currently-focused window info."""
        win_id = self._run(['xdotool', 'getactivewindow'])
        if win_id['success']:
            title = self._run(['xdotool', 'getwindowname', win_id['stdout']])
            return {
                "success": True,
                "window_id": win_id['stdout'],
                "title": title.get('stdout', '')
            }
        return win_id
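

# Sketch (illustration only, not called by the agent): why ComputerController
# is best driven with argument lists rather than shell strings. Text such as
# "it's; echo pwned" would need careful quoting under shell=True, but as one
# argv element it reaches xdotool verbatim. `build_type_argv` and
# `describe_for_log` are hypothetical helpers; shlex.join (Python 3.8+) only
# renders the equivalent, safely quoted shell line, e.g. for logging.
import shlex


def build_type_argv(text: str, delay_ms: int = 1) -> list:
    """Build the argv for `xdotool type` without involving a shell."""
    return ["xdotool", "type", "--delay", str(delay_ms), text]


def describe_for_log(argv: list) -> str:
    """Render argv as a quoted shell line that splits back to the same list."""
    return shlex.join(argv)
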
# ── Node Agent ───────────────────────────────────────────────────────────────
class NodeAgent:
    def __init__(self, config_path: Optional[str] = None):
        self.config = self._load_config(config_path)
        self.capabilities = self._detect_capabilities()
        self.computer = None
        if self.capabilities.get('enable_computer_control'):
            self.computer = ComputerController()
        self.browser = None
        if self.capabilities.get('enable_browser') and HAS_BROWSER:
            try:
                self.browser = BrowserController()
            except Exception as e:
                logger.warning(f"BrowserController init failed: {e}")

    def _load_config(self, path: Optional[str]) -> Dict[str, Any]:
        """Load node configuration from JSON."""
        cfg_path = Path(path).expanduser() if path else Path.home() / '.config' / 'hermes-node' / 'config.json'
        if not cfg_path.exists():
            logger.error(f"Config not found: {cfg_path}")
            sys.exit(1)
        with open(cfg_path) as f:
            data = json.load(f)
        # Merge defaults
        merged = {**DEFAULT_CONFIG, **data}
        if not merged['token']:
            logger.error("Token missing from config")
            sys.exit(1)
        return merged

    def _detect_capabilities(self) -> Dict[str, Any]:
        """Detect which optional tools are available on this machine."""
        caps = {
            'enable_browser': self.config.get('enable_browser', False),
            'enable_computer_control': self.config.get('enable_computer_control', False),
        }
        # Browser detection: extension installed signals availability via separate channel
        # Computer control: check system tools
        if caps['enable_computer_control']:
            cc_info = {
                'display': os.environ.get('DISPLAY', ':0'),
                'has_xdotool': subprocess.run(['which', 'xdotool'], capture_output=True).returncode == 0,
                'has_import': subprocess.run(['which', 'import'], capture_output=True).returncode == 0,
                'has_scrot': subprocess.run(['which', 'scrot'], capture_output=True).returncode == 0,
            }
            caps['computer_control'] = cc_info
        return caps

    async def connect_and_run(self):
        """Main loop: connect to gateway and process commands."""
        url = f"{self.config['gateway_url']}?node_name={self.config['node_name']}&token={self.config['token']}"
        # Log without the token so credentials do not end up in log files
        logger.info(f"Connecting to {self.config['gateway_url']} as {self.config['node_name']}")
        while True:
            try:
                async with websockets.connect(url, ping_interval=20, ping_timeout=10) as ws:
                    logger.info("Connected to gateway, awaiting commands")
                    # Announce tools and capabilities so the gateway can register them
                    await ws.send(json.dumps({
                        "type": "capabilities",
                        "node_name": self.config['node_name'],
                        "tools": self._get_available_tools(),
                        "capabilities": self.capabilities
                    }))
                    # Listen for commands
                    async for raw in ws:
                        msg = json.loads(raw)
                        await self._handle_message(ws, msg)
            except Exception as e:
                logger.error(f"Connection error: {e}")
            logger.info(f"Reconnecting in {self.config['reconnect_interval']}s...")
            await asyncio.sleep(self.config['reconnect_interval'])

    def _get_available_tools(self) -> list:
        """Return list of capability strings for gateway registration."""
        tools = ['exec']  # always present
        if self.capabilities.get('enable_browser'):
            tools.append('browser_control')
        if self.capabilities.get('enable_computer_control'):
            tools.append('computer_control')
        return tools

    async def _handle_message(self, ws, msg: Dict[str, Any]):
        req_type = msg.get('type')
        if req_type == 'exec':
            cmd_id = msg['id']
            command = msg['command']
            await self._handle_exec(ws, cmd_id, command)
        elif req_type == 'computer_control':
            action = msg['action']
            params = msg.get('params', {})
            await self._handle_cc(ws, msg.get('id'), action, params)
        else:
            logger.warning(f"Unknown message type: {req_type}")

    async def _handle_exec(self, ws, cmd_id: str, command: str):
        """Execute a shell command via sexec (respecting permissions)."""
        logger.info(f"Exec: {command}")
        sexec = Path(self.config['sexec_path']).expanduser()
        if not sexec.exists():
            await ws.send(json.dumps({
                "type": "exec_result",
                "id": cmd_id,
                "error": f"sexec not found at {sexec}",
                "stdout": "", "stderr": "", "exit_code": 127
            }))
            return
        env = os.environ.copy()
        env['SEXEC_COMMAND'] = command
        try:
            # Async subprocess, so a long-running command does not block the
            # event loop (and with it the websocket keepalive pings)
            proc = await asyncio.create_subprocess_exec(
                str(sexec),
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                env=env
            )
            try:
                stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=300)
            except asyncio.TimeoutError:
                # Kill and reap the child instead of leaving it running
                proc.kill()
                await proc.wait()
                raise TimeoutError("command timed out after 300s")
            await ws.send(json.dumps({
                "type": "exec_result",
                "id": cmd_id,
                "command": command,
                "stdout": stdout.decode(errors='replace'),
                "stderr": stderr.decode(errors='replace'),
                "exit_code": proc.returncode
            }))
        except Exception as e:
            await ws.send(json.dumps({
                "type": "exec_result",
                "id": cmd_id,
                "error": str(e),
                "exit_code": -1
            }))
    async def _handle_cc(self, ws, cmd_id: str, action: str, params: Dict[str, Any]):
        """Computer control action."""
        logger.info(f"CC: {action} params={params}")
        if not self.computer:
            await ws.send(json.dumps({
                "type": "cc_result",
                "id": cmd_id,
                "success": False,
                "error": "computer_control not available on this node"
            }))
            return
        try:
            # Each branch uses the controller's own result as-is, so a placeholder
            # "unknown action" error can never leak into a successful response
            if action == 'screenshot':
                out_path = params.get('path')
                result = self.computer.screenshot(output_path=out_path)
                if result.get('success') and not out_path:
                    # Inline (base64) screenshot: make sure data/format are present
                    result.setdefault('data', '')
                    result.setdefault('format', 'png')
            elif action == 'mouse_move':
                result = self.computer.mouse_move(params.get('x', 0), params.get('y', 0))
            elif action == 'mouse_click':
                result = self.computer.mouse_click(params.get('button', 1))
            elif action == 'mouse_position':
                result = self.computer.mouse_position()
            elif action == 'type':
                result = self.computer.type_text(params.get('text', ''))
            elif action == 'key':
                result = self.computer.key_press(params.get('key', ''))
            elif action == 'active_window':
                result = self.computer.get_active_window()
            else:
                result = {"success": False, "error": f"Unknown action: {action}"}
        except Exception as e:
            result = {"success": False, "error": str(e)}
        # Envelope fields go last so the controller result cannot overwrite them
        await ws.send(json.dumps({
            **result,
            "type": "cc_result",
            "id": cmd_id,
            "action": action,
        }))
def main():
    parser = argparse.ArgumentParser(description="Hermes Node Agent")
    parser.add_argument('--config', type=str, help='Path to config JSON')
    parser.add_argument('--debug', action='store_true', help='Debug logging')
    args = parser.parse_args()
    if args.debug:
        logging.getLogger().setLevel(logging.DEBUG)
    # Build the agent once and reuse the config it loaded, instead of
    # constructing separate NodeAgent instances just to read config and tools
    agent = NodeAgent(args.config)
    config = agent.config
    if config['token'] in (DEFAULT_GATEWAY_TOKEN, 'GATEWAY_TOKEN_MUST_BE_PROVIDED'):
        logger.error("ERROR: Token not set in config. Edit ~/.config/hermes-node/config.json")
        sys.exit(1)
    logger.info(f"Node '{config['node_name']}' starting — tools: {agent._get_available_tools()}")
    try:
        asyncio.run(agent.connect_and_run())
    except KeyboardInterrupt:
        logger.info("Shutting down")
if __name__ == '__main__':
    main()
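# --- Sketch: dependency-aware tool discovery (hypothetical, not part of the agent) ---
# _get_available_tools() above advertises computer_control whenever the config
# flag is set. The commit also describes checking for xdotool/import. Below is a
# minimal way to fold that check into the advertised list. It assumes
# computer_control needs an X11 session (DISPLAY set) plus the `xdotool` and
# ImageMagick `import` binaries on PATH. detect_computer_control() and
# advertised_tools() are illustrative names only.
import os
import shutil
from typing import Callable, Dict, List, Mapping, Optional

REQUIRED_CC_BINARIES = ("xdotool", "import")


def detect_computer_control(
    env: Mapping[str, str] = os.environ,
    which: Callable[[str], Optional[str]] = shutil.which,
) -> Dict[str, object]:
    """Report whether desktop automation can work on this host, and if not, why."""
    missing = [name for name in REQUIRED_CC_BINARIES if which(name) is None]
    has_display = bool(env.get("DISPLAY"))
    return {
        "available": has_display and not missing,
        "missing_binaries": missing,
        "has_display": has_display,
    }


def advertised_tools(enable_browser: bool, enable_computer_control: bool,
                     cc_probe: Dict[str, object]) -> List[str]:
    """Like _get_available_tools(), but computer_control also needs a passing probe."""
    tools = ["exec"]  # always present
    if enable_browser:
        tools.append("browser_control")
    if enable_computer_control and cc_probe["available"]:
        tools.append("computer_control")
    return tools

# Usage: with ImageMagick missing, computer_control is not advertised.
#   probe = detect_computer_control({"DISPLAY": ":0"},
#                                   lambda b: "/usr/bin/xdotool" if b == "xdotool" else None)
#   advertised_tools(True, True, probe)  # -> ["exec", "browser_control"]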
#!/usr/bin/env python3
"""
Hermes Node Agent - Reverse-connection node executor
Connects to Hermes Gateway via WebSocket and executes commands
via local sexec.sh, preserving the existing permission system.
Author: Lisa (Hermes AI)
Date: 2026-04-29
"""
import asyncio
import json
import logging
import os
import subprocess
import sys
import time
import argparse
from pathlib import Path
from typing import Optional, Dict, Any
try:
    import websockets
except ImportError:
    print("ERROR: websockets library not found. Install with: pip install websockets")
    sys.exit(1)
try:
    from browser_controller import BrowserController
    BROWSER_CONTROL_ENABLED = True
except ImportError:
    BROWSER_CONTROL_ENABLED = False
    BrowserController = None
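# Configure logging: always to stderr, and to /var/log only when that file is
# writable (the installer runs the agent as a non-root user)
_log_handlers = [logging.StreamHandler()]
try:
    _log_handlers.append(logging.FileHandler('/var/log/hermes-node-agent.log'))
except OSError:
    pass
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=_log_handlers
)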
logger = logging.getLogger(__name__)
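# --- Sketch: building the gateway URL safely (hypothetical helper) ---
# connect() below interpolates the token directly into the query string. A
# token containing characters such as '&', '#', '+' or '%' would therefore
# reach the gateway mangled. Percent-encoding it with urllib.parse avoids that.
# This assumes, as connect() implies, that the gateway reads the token from the
# `token` query parameter on the /nodes path.
from urllib.parse import urlencode, urlsplit, urlunsplit


def build_gateway_url(gateway_url: str, token: str) -> str:
    """Return <gateway_url>/nodes?token=<percent-encoded token>."""
    parts = urlsplit(gateway_url)
    path = parts.path.rstrip("/") + "/nodes"
    return urlunsplit((parts.scheme, parts.netloc, path, urlencode({"token": token}), ""))

# Usage:
#   build_gateway_url("ws://192.168.42.115:8765", "a&b#c")
#   # -> "ws://192.168.42.115:8765/nodes?token=a%26b%23c"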
class NodeAgent:
    """Hermes Node Agent - connects to gateway and executes commands"""
    def __init__(self, config_path: str = "/etc/hermes-node/config.json"):
        self.config_path = config_path
        self.config = self._load_config()
        self.ws = None
        self.running = False
        self.reconnect_delay = 5  # Start with 5 seconds
        self.max_reconnect_delay = 60
        self.last_heartbeat = 0
    def _load_config(self) -> dict:
        """Load configuration from file"""
        try:
            with open(self.config_path) as f:
                config = json.load(f)
            # Validate required fields
            required = ["gateway_url", "node_name", "token", "sexec_path"]
            for field in required:
                if field not in config:
                    raise ValueError(f"Missing required config field: {field}")
            # Set defaults
            config.setdefault("reconnect_interval", 5)
            config.setdefault("heartbeat_interval", 30)
            logger.info(f"Loaded config for node '{config['node_name']}'")
            return config
        except FileNotFoundError:
            logger.error(f"Config file not found: {self.config_path}")
            logger.info("Create config file with:")
            logger.info(json.dumps({
                "gateway_url": "ws://192.168.42.115:8765",
                "node_name": "example-node",
                "token": "your-secret-token",
                "sexec_path": "/home/openclaw/.openclaw/skills/sexec/sexec.sh",
                "reconnect_interval": 5,
                "heartbeat_interval": 30
            }, indent=2))
            sys.exit(1)
        except json.JSONDecodeError as e:
            logger.error(f"Invalid JSON in config file: {e}")
            sys.exit(1)
        except Exception as e:
            logger.error(f"Error loading config: {e}")
            sys.exit(1)
    async def connect(self):
        """Connect to gateway with authentication"""
        url = f"{self.config['gateway_url']}/nodes?token={self.config['token']}"
        try:
            logger.info(f"Connecting to gateway: {self.config['gateway_url']}")
            self.ws = await websockets.connect(url, ping_interval=20, ping_timeout=10)
            logger.info("Connected to gateway")
            # Send registration
            await self._register()
            # Reset reconnect delay on successful connection
            self.reconnect_delay = self.config["reconnect_interval"]
            return True
        except Exception as e:
            logger.error(f"Connection failed: {e}")
            return False
    async def _register(self):
        """Send registration message to gateway"""
        registration = {
            "type": "register",
            "node_name": self.config["node_name"],
            "version": "1.0",
            "capabilities": ["exec", "sysinfo"],
            "sexec_path": self.config["sexec_path"]
        }
        await self.ws.send(json.dumps(registration))
        logger.info(f"Sent registration for node '{self.config['node_name']}'")
        # Wait for ack
        response = await self.ws.recv()
        msg = json.loads(response)
        if msg.get("type") == "register_ack" and msg.get("status") == "ok":
            logger.info(f"Registration acknowledged by gateway (version {msg.get('gateway_version')})")
        else:
            logger.warning(f"Unexpected registration response: {msg}")
    async def _send_heartbeat(self):
        """Send periodic heartbeat to gateway"""
        while self.running and self.ws:
            try:
                await asyncio.sleep(self.config["heartbeat_interval"])
                if not self.ws or self.ws.state.name == "CLOSED":
                    break
                heartbeat = {
                    "type": "heartbeat",
                    "timestamp": int(time.time())
                }
                await self.ws.send(json.dumps(heartbeat))
                self.last_heartbeat = time.time()
                logger.debug("Heartbeat sent")
            except Exception as e:
                logger.error(f"Heartbeat error: {e}")
                break
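# --- Sketch: config validation as a pure function (hypothetical helper) ---
# _load_config() above mixes file I/O, validation and sys.exit(). Pulling the
# checks into a pure function makes them unit-testable. The required fields and
# defaults match _load_config(). Rejecting non-positive intervals is an added
# assumption, not something the agent currently does.
from typing import Any, Dict

REQUIRED_FIELDS = ("gateway_url", "node_name", "token", "sexec_path")
DEFAULTS = {"reconnect_interval": 5, "heartbeat_interval": 30}


def validate_config(raw: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of raw with defaults applied; raise ValueError on problems."""
    missing = [f for f in REQUIRED_FIELDS if f not in raw]
    if missing:
        raise ValueError(f"Missing required config field(s): {', '.join(missing)}")
    config = {**DEFAULTS, **raw}
    for key in DEFAULTS:
        value = config[key]
        # bool is a subclass of int, so exclude it explicitly
        if isinstance(value, bool) or not isinstance(value, (int, float)) or value <= 0:
            raise ValueError(f"{key} must be a positive number, got {value!r}")
    return config

# Usage:
#   validate_config({"gateway_url": "ws://gw:8765", "node_name": "n",
#                    "token": "t", "sexec_path": "/opt/sexec.sh"})["heartbeat_interval"]  # -> 30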
import asyncio
import json
import logging
import os
import subprocess
import sys
import time
import argparse
from pathlib import Path
from typing import Optional, Dict, Any
try:
    import websockets
except ImportError:
    print("ERROR: websockets library not found. Install with: pip install websockets")
    sys.exit(1)
try:
    from browser_controller import BrowserController
    BROWSER_CONTROL_ENABLED = True
except ImportError:
    BROWSER_CONTROL_ENABLED = False
    BrowserController = None
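# Configure logging: always to stderr, and to /var/log only when that file is
# writable (the installer runs the agent as a non-root user)
_log_handlers = [logging.StreamHandler()]
try:
    _log_handlers.append(logging.FileHandler('/var/log/hermes-node-agent.log'))
except OSError:
    pass
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=_log_handlers
)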
logger = logging.getLogger(__name__)
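# --- Sketch: table-driven message routing (hypothetical, not the agent's code) ---
# _handle_message() below routes on msg["type"] with an if/elif chain. An
# equivalent router keeps the type -> coroutine mapping in one dict. That also
# lets the set of supported message types be listed, e.g. when advertising
# capabilities. MessageRouter and demo() are illustrative names only.
import asyncio
import json
import logging
from typing import Any, Awaitable, Callable, Dict, List

Handler = Callable[[Dict[str, Any]], Awaitable[None]]
router_log = logging.getLogger("message-router-sketch")


class MessageRouter:
    def __init__(self) -> None:
        self._handlers: Dict[str, Handler] = {}

    def register(self, msg_type: str, handler: Handler) -> None:
        self._handlers[msg_type] = handler

    def types(self) -> List[str]:
        return sorted(self._handlers)

    async def dispatch(self, raw: str) -> bool:
        """Decode one JSON frame and await its handler; return False if unhandled."""
        try:
            msg = json.loads(raw)
        except json.JSONDecodeError as e:
            router_log.error("Invalid JSON received: %s", e)
            return False
        if not isinstance(msg, dict):
            router_log.error("Expected a JSON object, got %s", type(msg).__name__)
            return False
        msg_type = msg.get("type")
        handler = self._handlers.get(msg_type) if isinstance(msg_type, str) else None
        if handler is None:
            router_log.warning("Unknown message type: %s", msg_type)
            return False
        await handler(msg)
        return True


def demo() -> List[Any]:
    """Route three frames through a router that only handles 'exec'."""
    seen: List[str] = []

    async def on_exec(msg: Dict[str, Any]) -> None:
        seen.append(msg["id"])

    router = MessageRouter()
    router.register("exec", on_exec)
    frames = ('{"type": "exec", "id": "c1"}', '{"type": "nope"}', "not json")

    async def run_all() -> List[bool]:
        return [await router.dispatch(f) for f in frames]

    return [asyncio.run(run_all()), seen]

# Usage: demo() -> [[True, False, False], ["c1"]]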
class NodeAgent:
    """Hermes Node Agent - connects to gateway and executes commands"""
    def __init__(self, config_path: str = "/etc/hermes-node/config.json"):
        self.config_path = config_path
        self.config = self._load_config()
        self.ws = None
        self.running = False
        self.reconnect_delay = 5  # Start with 5 seconds
        self.max_reconnect_delay = 60
        self.last_heartbeat = 0
        # Browser controller
        if BROWSER_CONTROL_ENABLED:
            self.browser_controller = BrowserController()
        else:
            self.browser_controller = None
    async def _handle_message(self, message: str):
        """Handle incoming message from gateway"""
        try:
            msg = json.loads(message)
            msg_type = msg.get("type")
            if msg_type == "heartbeat_ack":
                logger.debug("Heartbeat acknowledged")
            elif msg_type == "exec":
                await self._handle_exec(msg)
            elif msg_type == "exec_cancel":
                await self._handle_cancel(msg)
            elif msg_type == "browser_control":
                await self._handle_browser_control(msg)
            elif msg_type == "disconnect":
                logger.info(f"Gateway requested disconnect: {msg.get('reason')}")
                self.running = False
            else:
                logger.warning(f"Unknown message type: {msg_type}")
        except json.JSONDecodeError as e:
            logger.error(f"Invalid JSON received: {e}")
        except Exception as e:
            logger.error(f"Error handling message: {e}")
    async def _handle_browser_control(self, msg: Dict[str, Any]):
        """
        Handle browser control commands with 3-layer interface:
        Layer 1 - HIGH LEVEL: navigate, click, fill, screenshot, execute_script, close
        Layer 2 - PLAYWRIGHT: Direct Playwright API access
        Layer 3 - CDP: Chrome DevTools Protocol access
        """
        if not self.browser_controller:
            await self._send_response(msg.get("id"), "error",
                                      error="Playwright not installed. Install with: pip install playwright && playwright install chromium")
            return
        cmd_id = msg.get("id")
        command = msg.get("command")
        layer = msg.get("layer", "high_level")  # high_level, playwright, cdp
        logger.info(f"Browser control command: {layer}/{command} for cmd {cmd_id}")
        try:
            if layer == "high_level":
                result = await self._handle_high_level_command(msg)
            elif layer == "playwright":
                result = await self._handle_playwright_command(msg)
            elif layer == "cdp":
                result = await self._handle_cdp_command(msg)
            else:
                result = {"success": False, "error": f"Unknown layer: {layer}"}
            result_type = "ok" if result.get("success") else "error"
            result.pop("success", None)
            await self._send_response(cmd_id, result_type, **result)
        except Exception as e:
            logger.error(f"Browser control error: {e}", exc_info=True)
            await self._send_response(cmd_id, "error", error=str(e))
    async def _handle_high_level_command(self, msg: Dict[str, Any]) -> Dict[str, Any]:
        """
        Handle high-level browser commands
        Commands: launch, create_context, navigate, click, fill, type_text,
        screenshot, execute_script, evaluate, wait_for_selector,
        get_content, get_title, close, close_page, close_context,
        list_pages, list_contexts, new_page
        """
        command = msg.get("command")
        params = msg.get("params", {})
        page_id = msg.get("page_id")
        cmd_map = {
            # Launch and context
            "launch": (self.browser_controller.launch, [params.get("config")], {}),
            "create_context": (self.browser_controller.create_context, [params.get("config")], {}),
            "new_page": (self.browser_controller.new_page, [params.get("context_name")], {}),
            # Navigation
            "navigate": (self.browser_controller.navigate, [page_id, params.get("url")],
                         {"wait_until": params.get("wait_until", "load")}),
            # Interaction
            "click": (self.browser_controller.click, [page_id, params.get("selector")],
                      {k: v for k, v in params.items() if k not in ["selector", "command"]}),
            "fill": (self.browser_controller.fill, [page_id, params.get("selector"), params.get("value")], {}),
            "type_text": (self.browser_controller.type_text, [page_id, params.get("selector"), params.get("text")],
                          {"delay": params.get("delay", 0)}),
            "wait_for_selector": (self.browser_controller.wait_for_selector, [page_id, params.get("selector")],
                                  {"state": params.get("state", "visible"),
                                   "timeout": params.get("timeout", 30000)}),
            # Evaluation
            "execute_script": (self.browser_controller.execute_script,
                               [page_id, params.get("script")], {}),
            # Like every other command, read "arg" from params; fall back to the
            # top-level field for senders that put it there
            "evaluate": (self.browser_controller.evaluate,
                         [page_id, params.get("expression")],
                         {"arg": params.get("arg", msg.get("arg"))}),
            # Inspection
            "screenshot": (self.browser_controller.screenshot, [page_id],
                           {"full_page": params.get("full_page", False),
                            "path": params.get("path")}),
            "get_content": (self.browser_controller.get_content, [page_id], {}),
            "get_title": (self.browser_controller.get_title, [page_id], {}),
            # Cleanup
            "close_page": (self.browser_controller.close_page, [page_id], {}),
            "close_context": (self.browser_controller.close_context, [params.get("context_name")], {}),
            "close": (self.browser_controller.close, [], {}),
            "list_pages": (self.browser_controller.list_pages, [], {}),
            "list_contexts": (self.browser_controller.list_contexts, [], {}),
        }
        handler = cmd_map.get(command)
        if not handler:
            return {"success": False, "error": f"Unknown command: {command}"}
        func, args, kwargs = handler
        result = await func(*args, **kwargs)
        return result
    async def _handle_playwright_command(self, msg: Dict[str, Any]) -> Dict[str, Any]:
        """
        Handle direct Playwright API commands
        Commands: locator, get_by_text, get_by_role, get_by_test_id,
        query_selector, query_selector_all, etc.
        """
        command = msg.get("command")
        params = msg.get("params", {})
        page_id = msg.get("page_id")
        args = params.get("args", [])
        kwargs = params.get("kwargs", {})
        return await self.browser_controller.playwright_command(
            page_id, command, args, kwargs
        )
    async def _handle_cdp_command(self, msg: Dict[str, Any]) -> Dict[str, Any]:
        """
        Handle Chrome DevTools Protocol commands
        Commands: Network.enable, Network.getResponseBody,
        Runtime.evaluate, Page.navigate, etc.
        """
        command = msg.get("command")
        params = msg.get("params", {})
        page_id = msg.get("page_id")
        return await self.browser_controller.cdp_command(
            page_id, command, params
        )
    async def _send_response(self, cmd_id: str, result_type: str, **kwargs):
        """
        Send response back to gateway
        """
        if not self.ws or self.ws.state.name == "CLOSED":
            logger.error("Cannot send response: WebSocket closed")
            return
        response = {
            "type": "browser_control_response",
            "id": cmd_id,
            "result": result_type,
            **kwargs
        }
        await self.ws.send(json.dumps(response))
        logger.debug(f"Sent response for cmd {cmd_id}: {result_type}")
    async def _handle_exec(self, msg: dict):
        """Execute command via sexec.sh, streaming output back to the gateway"""
        cmd_id = msg.get("id")
        command = msg.get("command", [])
        if isinstance(command, str):
            # A bare string would otherwise be joined character by character
            command = [command]
        timeout = msg.get("timeout", 30)
        approved = msg.get("approved", False)
        command_str = " ".join(command)
        logger.info(f"Executing command {cmd_id}: {command_str}")
        # Build sexec command
        sexec_cmd = [
            self.config["sexec_path"],
            "run",
            "--command",
            command_str
        ]
        if approved:
            sexec_cmd.append("--approved")
        process = None
        try:
            # Execute command
            process = await asyncio.create_subprocess_exec(
                *sexec_cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            # Stream output
            async def stream_output(stream, stream_name):
                while True:
                    line = await stream.readline()
                    if not line:
                        break
                    output_msg = {
                        "type": "exec_output",
                        "id": cmd_id,
                        "stream": stream_name,
                        "data": line.decode('utf-8', errors='replace')
                    }
                    if self.ws and self.ws.state.name != "CLOSED":
                        await self.ws.send(json.dumps(output_msg))
            async def stream_and_wait():
                # Stream stdout and stderr concurrently, then wait for completion
                await asyncio.gather(
                    stream_output(process.stdout, "stdout"),
                    stream_output(process.stderr, "stderr")
                )
                return await process.wait()
            # Enforce the requested timeout (asyncio.TimeoutError is handled below)
            exit_code = await asyncio.wait_for(stream_and_wait(), timeout=timeout)
            # Send completion message
            complete_msg = {
                "type": "exec_complete",
                "id": cmd_id,
                "exit_code": exit_code
            }
            if self.ws and self.ws.state.name != "CLOSED":
                await self.ws.send(json.dumps(complete_msg))
            logger.info(f"Command {cmd_id} completed with exit code {exit_code}")
        except asyncio.TimeoutError:
            logger.error(f"Command {cmd_id} timed out after {timeout}s")
            if process is not None and process.returncode is None:
                try:
                    process.kill()  # don't leave the timed-out command running
                except ProcessLookupError:
                    pass  # it exited in the meantime
                await process.wait()
            timeout_msg = {
                "type": "exec_complete",
                "id": cmd_id,
                "exit_code": -1,
                "error": "timeout"
            }
            if self.ws and self.ws.state.name != "CLOSED":
                await self.ws.send(json.dumps(timeout_msg))
        except Exception as e:
            logger.error(f"Command {cmd_id} failed: {e}")
            error_msg = {
                "type": "exec_complete",
                "id": cmd_id,
                "exit_code": -1,
                "error": str(e)
            }
            if self.ws and self.ws.state.name != "CLOSED":
                await self.ws.send(json.dumps(error_msg))
    async def _handle_cancel(self, msg: dict):
        """Handle command cancellation (not implemented yet)"""
        cmd_id = msg.get("id")
        logger.warning(f"Command cancellation not yet implemented for {cmd_id}")
    async def run(self):
        """Main run loop with auto-reconnect"""
        self.running = True
        while self.running:
            if await self.connect():
                # Start heartbeat task
                heartbeat_task = asyncio.create_task(self._send_heartbeat())
                try:
                    # Message receive loop
                    async for message in self.ws:
                        await self._handle_message(message)
                    # Connection closed
                    logger.warning("Connection closed by gateway")
                except websockets.exceptions.ConnectionClosed as e:
                    logger.warning(f"Connection closed: {e}")
                except Exception as e:
                    logger.error(f"Error in run loop: {e}")
                finally:
                    # Stop the heartbeat on every exit path, not only a clean close
                    heartbeat_task.cancel()
            if self.running:
                logger.info(f"Reconnecting in {self.reconnect_delay}s...")
                await asyncio.sleep(self.reconnect_delay)
                # Exponential backoff
                self.reconnect_delay = min(
                    self.reconnect_delay * 2,
                    self.max_reconnect_delay
                )
    def stop(self):
        """Stop the agent"""
        logger.info("Stopping agent...")
        self.running = False
        if self.ws:
            asyncio.create_task(self.ws.close())
def main():
    """Main entry point"""
    import signal
    # Parse command line args
    config_path = sys.argv[1] if len(sys.argv) > 1 else "/etc/hermes-node/config.json"
    # Create agent
    agent = NodeAgent(config_path)
    # Handle signals
    def signal_handler(sig, frame):
        logger.info(f"Received signal {sig}, shutting down...")
        agent.stop()
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
    # Run agent
    try:
        asyncio.run(agent.run())
    except KeyboardInterrupt:
        logger.info("Interrupted by user")
    except Exception as e:
        logger.error(f"Fatal error: {e}")
        sys.exit(1)
if __name__ == "__main__":
    main()
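# --- Sketch: streaming a child's output while enforcing a timeout (hypothetical) ---
# _handle_exec() above streams stdout/stderr line by line and reads a `timeout`
# from the message. Below is a standalone version of that pattern: wrap the
# whole read-and-wait in asyncio.wait_for, and kill the child when it expires.
# `emit` stands in for sending an exec_output frame over the WebSocket.
# run_streaming() and collect() are illustrative names only.
import asyncio
import sys  # used by the usage example below
from typing import Awaitable, Callable, List, Tuple

Emit = Callable[[str, str], Awaitable[None]]


async def run_streaming(argv: List[str], timeout: float, emit: Emit) -> Tuple[int, bool]:
    """Run argv, emit (stream_name, line) per output line; return (exit_code, timed_out)."""
    proc = await asyncio.create_subprocess_exec(
        *argv, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )

    async def pump(stream: asyncio.StreamReader, name: str) -> None:
        while True:
            line = await stream.readline()
            if not line:
                break
            await emit(name, line.decode("utf-8", errors="replace"))

    async def pump_and_wait() -> int:
        await asyncio.gather(pump(proc.stdout, "stdout"), pump(proc.stderr, "stderr"))
        return await proc.wait()

    try:
        return await asyncio.wait_for(pump_and_wait(), timeout), False
    except asyncio.TimeoutError:
        try:
            proc.kill()  # wait_for already cancelled the readers; now stop the child
        except ProcessLookupError:
            pass  # it exited on its own in the meantime
        return await proc.wait(), True


def collect(argv: List[str], timeout: float) -> Tuple[int, bool, List[Tuple[str, str]]]:
    """Synchronous wrapper: run argv and return (exit_code, timed_out, emitted lines)."""
    lines: List[Tuple[str, str]] = []

    async def emit(name: str, line: str) -> None:
        lines.append((name, line))

    code, timed_out = asyncio.run(run_streaming(argv, timeout, emit))
    return code, timed_out, lines

# Usage:
#   collect([sys.executable, "-c", "print('hi')"], timeout=5)  # -> (0, False, [("stdout", "hi\n")])
#   collect([sys.executable, "-c", "import time; time.sleep(30)"], timeout=0.5)[1]  # -> True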
#!/bin/bash
# Hermes Node Agent Installation Script
# Installs the node agent on a remote machine
set -e

echo "=== Hermes Node Agent Installer ==="
echo ""

# Check if running as root
if [ "$EUID" -eq 0 ]; then
    echo "ERROR: Do not run as root. Run as the user who will run the agent."
    exit 1
fi

# Check for Python 3
if ! command -v python3 &> /dev/null; then
    echo "ERROR: Python 3 is required but not found."
    exit 1
fi

# Check for pip
if ! command -v pip3 &> /dev/null; then
    echo "ERROR: pip3 is required but not found."
    echo "Install with: sudo apt install python3-pip"
    exit 1
fi

# Install websockets library (the script refuses to run as root, so apt needs sudo)
echo "[1/6] Installing Python dependencies..."
sudo apt-get update
sudo apt-get install -y python3-websockets

# Create config directory
echo "[2/6] Creating config directory..."
sudo mkdir -p /etc/hermes-node
sudo chown "$USER:$USER" /etc/hermes-node
# Copy agent script
echo "[3/6] Installing agent script..."
sudo cp hermes_node_agent.py /usr/local/bin/hermes-node-agent
sudo chmod +x /usr/local/bin/hermes-node-agent

# Create example config if it doesn't exist
if [ ! -f /etc/hermes-node/config.json ]; then
    echo "[4/6] Creating example config..."
    cat > /etc/hermes-node/config.json <<EOF
{
  "gateway_url": "ws://192.168.42.115:8765",
  "node_name": "$(hostname)",
  "token": "CHANGE-ME-$(openssl rand -hex 16)",
  "sexec_path": "$HOME/.openclaw/skills/sexec/sexec.sh",
  "reconnect_interval": 5,
  "heartbeat_interval": 30
}
EOF
    echo " ⚠️ Config created at /etc/hermes-node/config.json"
    echo " ⚠️ EDIT THIS FILE: Set gateway_url, node_name, and token"
else
    echo "[4/6] Config already exists, skipping..."
fi
# Install SysV init service
echo "[5/6] Installing SysV init service..."
sudo cp hermes-node-agent.init.d /etc/init.d/hermes-node-agent
sudo chmod +x /etc/init.d/hermes-node-agent
sudo update-rc.d hermes-node-agent defaults 2>/dev/null || true
# Enable but don't start (user needs to configure first)
echo "[6/6] Service configured..."
echo ""
echo "✅ Installation complete!"
echo ""
echo "Next steps:"
echo " 1. Edit /etc/hermes-node/config.json with your gateway URL and token"
echo " 2. Ensure sexec.sh is installed at the configured path"
echo " 3. Start the agent: /etc/init.d/hermes-node-agent start"
echo " 4. Check status: /etc/init.d/hermes-node-agent status"
echo " 5. View logs: tail -f /var/log/hermes-node-agent.log"
echo ""
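The installer leaves a placeholder token (`CHANGE-ME-…`) and asks the user to edit `gateway_url`, `node_name` and `token` before starting the service. A pre-start check can catch a config that was never edited. This is a minimal sketch under that assumption; `config_problems`, `REQUIRED_KEYS` and `PLACEHOLDER_PREFIX` are hypothetical names, and the required-key set is taken only from the installer's "EDIT THIS FILE" message.

```python
import json
from typing import List
from urllib.parse import urlparse

# Keys the installer asks the user to edit before starting the agent (assumption).
REQUIRED_KEYS = ("gateway_url", "node_name", "token")
PLACEHOLDER_PREFIX = "CHANGE-ME"


def config_problems(raw: str) -> List[str]:
    """Return human-readable problems found in a config.json string."""
    cfg = json.loads(raw)
    problems = [f"missing or empty key: {key}" for key in REQUIRED_KEYS if not cfg.get(key)]
    url = str(cfg.get("gateway_url", ""))
    # The agent talks WebSocket, so only ws:// and wss:// URLs make sense
    if url and urlparse(url).scheme not in ("ws", "wss"):
        problems.append(f"gateway_url must start with ws:// or wss://, got {url!r}")
    # The installer-generated token is a placeholder and must be replaced
    if str(cfg.get("token", "")).startswith(PLACEHOLDER_PREFIX):
        problems.append("token still holds the installer placeholder")
    return problems
```

Running this against `/etc/hermes-node/config.json` before `/etc/init.d/hermes-node-agent start` would turn a silent authentication failure into an explicit message.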
#!/usr/bin/env python3
"""Test browser controller"""
import asyncio
import traceback

from browser_controller import BrowserController


async def test():
    controller = BrowserController()
    try:
        print("Initializing Playwright...")
        await controller.initialize()
        print("✅ Playwright initialized")

        print("\nLaunching browser (headless)...")
        result = await controller.launch({"headless": True})
        print(f"✅ {result}")

        print("\nCreating context...")
        result = await controller.create_context({"name": "test_ctx"})
        print(f"✅ {result}")
        page_id = result.get("page_id")

        print(f"\nNavigating to example.com (page_id={page_id})...")
        result = await controller.navigate(page_id, "https://example.com")
        print(f"✅ {result}")

        print("\nGetting title...")
        result = await controller.get_title(page_id)
        print(f"✅ {result}")

        print("\nTaking screenshot...")
        result = await controller.screenshot(page_id, full_page=True)
        print(f"✅ Screenshot captured ({len(result.get('screenshot', ''))} chars base64)")

        print("\nClosing browser...")
        await controller.close()
        print("✅ Browser closed")

        print("\n🎉 All tests passed!")
    except Exception as e:
        print(f"❌ Error: {e}")
        traceback.print_exc()


if __name__ == "__main__":
    asyncio.run(test())
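The commit advertises capabilities (`exec`, `browser_control`, `computer_control`) on connect and says desktop control depends on X11, `xdotool` and ImageMagick's `import`. A capability check along those lines might look like the sketch below. `detect_capabilities` is a hypothetical name; the `which` and `env` parameters exist only so the check can be tested without those tools installed, and browser support is gated on the config flag alone because the extension-presence check is not shown here.

```python
import os
import shutil
from typing import Callable, List, Mapping, Optional


def detect_capabilities(
    enable_browser: bool,
    enable_computer_control: bool,
    which: Callable[[str], Optional[str]] = shutil.which,
    env: Optional[Mapping[str, str]] = None,
) -> List[str]:
    """Build the capability list a node would advertise on connect."""
    env = os.environ if env is None else env
    caps = ["exec"]  # shell execution via sexec is always offered
    if enable_browser:
        caps.append("browser_control")
    # Desktop control needs an X11 display plus xdotool and ImageMagick's import
    has_display = bool(env.get("DISPLAY"))
    has_tools = all(which(tool) for tool in ("xdotool", "import"))
    if enable_computer_control and has_display and has_tools:
        caps.append("computer_control")
    return caps
```

Dropping `computer_control` when a dependency is missing, instead of failing later at call time, lets the gateway filter the tool out before a model ever tries to use it.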