Commit 77f5cccc authored by Lisa (AI Assistant)'s avatar Lisa (AI Assistant)

Add Redis backend support with environment variable configuration

- Add optional Redis backend support
- Detect Redis from CLAWPHONE_REDIS_URL environment variable
- Fall back to SQLite if Redis connection fails
- Store agents and jobs in Redis hash structures
- Add Redis connection initialization and error handling
- Display backend type in startup log
parent f97377fd
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
""" """
ClawPhone - MCP Server using FastMCP for both HTTPS streaming and SSE ClawPhone - MCP Server using FastMCP for both HTTPS streaming and SSE
Supports both transport modes on the same port with HTTPS by default Supports both transport modes on the same port with HTTPS by default
Optional Redis backend support via environment variables
""" """
import os import os
...@@ -30,6 +31,10 @@ DEFAULT_LOG_PATH = "/var/log/clawphone" ...@@ -30,6 +31,10 @@ DEFAULT_LOG_PATH = "/var/log/clawphone"
DEFAULT_HOST = "0.0.0.0" DEFAULT_HOST = "0.0.0.0"
DEFAULT_PORT = 8765 DEFAULT_PORT = 8765
# Redis configuration
REDIS_URL = os.getenv("CLAWPHONE_REDIS_URL")
REDIS_ENABLED = bool(REDIS_URL)
# Parse CLI # Parse CLI
parser = argparse.ArgumentParser(description="ClawPhone MCP Server") parser = argparse.ArgumentParser(description="ClawPhone MCP Server")
parser.add_argument("--host", default=None) parser.add_argument("--host", default=None)
...@@ -54,7 +59,7 @@ LOG_DIR = args.log_dir or os.getenv("CLAWPHONE_LOG_DIR", DEFAULT_LOG_PATH) ...@@ -54,7 +59,7 @@ LOG_DIR = args.log_dir or os.getenv("CLAWPHONE_LOG_DIR", DEFAULT_LOG_PATH)
# Ensure DB directory exists # Ensure DB directory exists
db_dir = os.path.dirname(DB_PATH) db_dir = os.path.dirname(DB_PATH)
if db_dir: if db_dir and not REDIS_ENABLED:
os.makedirs(db_dir, exist_ok=True) os.makedirs(db_dir, exist_ok=True)
# Logging # Logging
...@@ -77,23 +82,51 @@ API_TOKEN = "" ...@@ -77,23 +82,51 @@ API_TOKEN = ""
HTTP_CLIENT = httpx.AsyncClient(verify=False, timeout=30.0) HTTP_CLIENT = httpx.AsyncClient(verify=False, timeout=30.0)
AGENTS = {} AGENTS = {}
# Redis client
REDIS_CLIENT = None
async def init_redis():
"""Initialize Redis connection if REDIS_ENABLED"""
global REDIS_CLIENT
if REDIS_ENABLED:
try:
import redis.asyncio as redis
REDIS_CLIENT = redis.from_url(REDIS_URL, decode_responses=True)
# Test connection
await REDIS_CLIENT.ping()
logger.info(f"Connected to Redis: {REDIS_URL}")
except Exception as e:
logger.error(f"Failed to connect to Redis: {e}")
logger.warning("Falling back to SQLite")
REDIS_ENABLED = False
# Create FastMCP server # Create FastMCP server
mcp = FastMCP("ClawPhone", host=HOST, port=PORT) mcp = FastMCP("ClawPhone", host=HOST, port=PORT)
async def init_db(): async def init_db():
async with aiosqlite.connect(DB_PATH) as db: if REDIS_ENABLED:
await db.execute("CREATE TABLE IF NOT EXISTS config (key TEXT PRIMARY KEY, value TEXT NOT NULL)") logger.info("Using Redis backend")
await db.execute("CREATE TABLE IF NOT EXISTS agents (name TEXT PRIMARY KEY, hook TEXT NOT NULL, token TEXT, capability_prompt TEXT, skill_prompt TEXT, registered_at TEXT NOT NULL)") else:
await db.execute("CREATE TABLE IF NOT EXISTS jobs (id TEXT PRIMARY KEY, sender TEXT NOT NULL, target_agent TEXT NOT NULL, title TEXT NOT NULL, description TEXT, status TEXT DEFAULT 'pending', result TEXT, reason TEXT, created_at TEXT NOT NULL, updated_at TEXT NOT NULL, retry_count INTEGER DEFAULT 0, next_retry_at TEXT)") logger.info("Using SQLite backend")
await db.commit() async with aiosqlite.connect(DB_PATH) as db:
await db.execute("CREATE TABLE IF NOT EXISTS config (key TEXT PRIMARY KEY, value TEXT NOT NULL)")
await db.execute("CREATE TABLE IF NOT EXISTS agents (name TEXT PRIMARY KEY, hook TEXT NOT NULL, token TEXT, capability_prompt TEXT, skill_prompt TEXT, registered_at TEXT NOT NULL)")
await db.execute("CREATE TABLE IF NOT EXISTS jobs (id TEXT PRIMARY KEY, sender TEXT NOT NULL, target_agent TEXT NOT NULL, title TEXT NOT NULL, description TEXT, status TEXT DEFAULT 'pending', result TEXT, reason TEXT, created_at TEXT NOT NULL, updated_at TEXT NOT NULL, retry_count INTEGER DEFAULT 0, next_retry_at TEXT)")
await db.commit()
await load_agents() await load_agents()
async def load_agents(): async def load_agents():
global AGENTS global AGENTS
async with aiosqlite.connect(DB_PATH) as db: if REDIS_ENABLED:
cur = await db.execute("SELECT name, hook, token, capability_prompt, skill_prompt FROM agents") # Load agents from Redis
rows = await cur.fetchall() agents = await REDIS_CLIENT.hgetall("clawphone:agents")
AGENTS = {r[0]: {"hook": r[1], "token": r[2], "capability_prompt": r[3] or "", "skill_prompt": r[4] or ""} for r in rows} AGENTS = {name: json.loads(data) for name, data in agents.items()}
else:
# Load agents from SQLite
async with aiosqlite.connect(DB_PATH) as db:
cur = await db.execute("SELECT name, hook, token, capability_prompt, skill_prompt FROM agents")
rows = await cur.fetchall()
AGENTS = {r[0]: {"hook": r[1], "token": r[2], "capability_prompt": r[3] or "", "skill_prompt": r[4] or ""} for r in rows}
logger.info(f"Loaded {len(AGENTS)} agents") logger.info(f"Loaded {len(AGENTS)} agents")
async def notify_hook(agent_name, event, data): async def notify_hook(agent_name, event, data):
...@@ -116,15 +149,23 @@ async def init_token(): ...@@ -116,15 +149,23 @@ async def init_token():
elif os.getenv("CLAWPHONE_TOKEN"): elif os.getenv("CLAWPHONE_TOKEN"):
API_TOKEN = os.getenv("CLAWPHONE_TOKEN") API_TOKEN = os.getenv("CLAWPHONE_TOKEN")
else: else:
async with aiosqlite.connect(DB_PATH) as db: if REDIS_ENABLED:
cur = await db.execute("SELECT value FROM config WHERE key = 'server_token'") # Get token from Redis
row = await cur.fetchone() API_TOKEN = await REDIS_CLIENT.get("clawphone:server_token")
if row: if not API_TOKEN:
API_TOKEN = row[0]
else:
API_TOKEN = secrets.token_hex(32) API_TOKEN = secrets.token_hex(32)
await db.execute("INSERT OR REPLACE INTO config VALUES ('server_token', ?)", (API_TOKEN,)) await REDIS_CLIENT.set("clawphone:server_token", API_TOKEN)
await db.commit() else:
# Get token from SQLite
async with aiosqlite.connect(DB_PATH) as db:
cur = await db.execute("SELECT value FROM config WHERE key = 'server_token'")
row = await cur.fetchone()
if row:
API_TOKEN = row[0]
else:
API_TOKEN = secrets.token_hex(32)
await db.execute("INSERT OR REPLACE INTO config VALUES ('server_token', ?)", (API_TOKEN,))
await db.commit()
logger.info(f"Token: {API_TOKEN[:16]}...") logger.info(f"Token: {API_TOKEN[:16]}...")
# MCP Tools # MCP Tools
...@@ -134,10 +175,23 @@ async def register(hook: str, token: str, name: str = None) -> str: ...@@ -134,10 +175,23 @@ async def register(hook: str, token: str, name: str = None) -> str:
if not name: if not name:
import urllib.parse import urllib.parse
name = urllib.parse.urlparse(hook).netloc.split('.')[0] or "unknown" name = urllib.parse.urlparse(hook).netloc.split('.')[0] or "unknown"
async with aiosqlite.connect(DB_PATH) as db:
await db.execute("INSERT OR REPLACE INTO agents VALUES (?, ?, ?, ?, ?, ?)", agent_data = {
(name, hook, token, "", "", datetime.now().isoformat())) "hook": hook,
await db.commit() "token": token,
"capability_prompt": "",
"skill_prompt": "",
"registered_at": datetime.now().isoformat()
}
if REDIS_ENABLED:
await REDIS_CLIENT.hset("clawphone:agents", name, json.dumps(agent_data))
else:
async with aiosqlite.connect(DB_PATH) as db:
await db.execute("INSERT OR REPLACE INTO agents VALUES (?, ?, ?, ?, ?, ?)",
(name, hook, token, "", "", datetime.now().isoformat()))
await db.commit()
await load_agents() await load_agents()
return json.dumps({"success": True, "agent": name}) return json.dumps({"success": True, "agent": name})
...@@ -160,10 +214,30 @@ async def post_job(sender: str, target_agent: str, title: str, description: str ...@@ -160,10 +214,30 @@ async def post_job(sender: str, target_agent: str, title: str, description: str
return json.dumps({"success": False, "error": "missing required"}) return json.dumps({"success": False, "error": "missing required"})
job_id = str(uuid.uuid4()) job_id = str(uuid.uuid4())
now = datetime.now().isoformat() now = datetime.now().isoformat()
async with aiosqlite.connect(DB_PATH) as db:
await db.execute("INSERT INTO jobs VALUES (?, ?, ?, ?, ?, 'pending', NULL, NULL, ?, ?, 0, NULL)", job = {
(job_id, sender, target_agent, title, description, now, now)) "id": job_id,
await db.commit() "sender": sender,
"target_agent": target_agent,
"title": title,
"description": description,
"status": "pending",
"result": None,
"reason": None,
"created_at": now,
"updated_at": now,
"retry_count": 0,
"next_retry_at": None
}
if REDIS_ENABLED:
await REDIS_CLIENT.hset("clawphone:jobs", job_id, json.dumps(job))
else:
async with aiosqlite.connect(DB_PATH) as db:
await db.execute("INSERT INTO jobs VALUES (?, ?, ?, ?, ?, 'pending', NULL, NULL, ?, ?, 0, NULL)",
(job_id, sender, target_agent, title, description, now, now))
await db.commit()
await notify_hook(target_agent, "new_job", {"job_id": job_id, "sender": sender, "title": title}) await notify_hook(target_agent, "new_job", {"job_id": job_id, "sender": sender, "title": title})
return json.dumps({"success": True, "job_id": job_id}) return json.dumps({"success": True, "job_id": job_id})
...@@ -172,17 +246,32 @@ async def claim_job(job_id: str, agent: str) -> str: ...@@ -172,17 +246,32 @@ async def claim_job(job_id: str, agent: str) -> str:
"""Claim a job for an agent""" """Claim a job for an agent"""
if not job_id or not agent: if not job_id or not agent:
return json.dumps({"success": False, "error": "missing required"}) return json.dumps({"success": False, "error": "missing required"})
async with aiosqlite.connect(DB_PATH) as db:
cur = await db.execute("SELECT status, target_agent FROM jobs WHERE id = ?", (job_id,)) if REDIS_ENABLED:
row = await cur.fetchone() job_data = await REDIS_CLIENT.hget("clawphone:jobs", job_id)
if not row: if not job_data:
return json.dumps({"success": False, "error": "Not found"}) return json.dumps({"success": False, "error": "Not found"})
if row[0] != "pending": job = json.loads(job_data)
return json.dumps({"success": False, "error": f"Job is {row[0]}"}) if job["status"] != "pending":
if row[1] != agent: return json.dumps({"success": False, "error": f"Job is {job['status']}"})
if job["target_agent"] != agent:
return json.dumps({"success": False, "error": "Wrong agent"}) return json.dumps({"success": False, "error": "Wrong agent"})
await db.execute("UPDATE jobs SET status='claimed', updated_at=? WHERE id=?", (datetime.now().isoformat(), job_id)) job["status"] = "claimed"
await db.commit() job["updated_at"] = datetime.now().isoformat()
await REDIS_CLIENT.hset("clawphone:jobs", job_id, json.dumps(job))
else:
async with aiosqlite.connect(DB_PATH) as db:
cur = await db.execute("SELECT status, target_agent FROM jobs WHERE id = ?", (job_id,))
row = await cur.fetchone()
if not row:
return json.dumps({"success": False, "error": "Not found"})
if row[0] != "pending":
return json.dumps({"success": False, "error": f"Job is {row[0]}"})
if row[1] != agent:
return json.dumps({"success": False, "error": "Wrong agent"})
await db.execute("UPDATE jobs SET status='claimed', updated_at=? WHERE id=?", (datetime.now().isoformat(), job_id))
await db.commit()
return json.dumps({"success": True}) return json.dumps({"success": True})
@mcp.tool() @mcp.tool()
...@@ -190,9 +279,21 @@ async def reject_job(job_id: str, agent: str, reason: str = "") -> str: ...@@ -190,9 +279,21 @@ async def reject_job(job_id: str, agent: str, reason: str = "") -> str:
"""Reject a job""" """Reject a job"""
if not job_id or not agent: if not job_id or not agent:
return json.dumps({"success": False, "error": "missing required"}) return json.dumps({"success": False, "error": "missing required"})
async with aiosqlite.connect(DB_PATH) as db:
await db.execute("UPDATE jobs SET status='rejected', reason=?, updated_at=? WHERE id=?", (reason, datetime.now().isoformat(), job_id)) if REDIS_ENABLED:
await db.commit() job_data = await REDIS_CLIENT.hget("clawphone:jobs", job_id)
if not job_data:
return json.dumps({"success": False, "error": "Not found"})
job = json.loads(job_data)
job["status"] = "rejected"
job["reason"] = reason
job["updated_at"] = datetime.now().isoformat()
await REDIS_CLIENT.hset("clawphone:jobs", job_id, json.dumps(job))
else:
async with aiosqlite.connect(DB_PATH) as db:
await db.execute("UPDATE jobs SET status='rejected', reason=?, updated_at=? WHERE id=?", (reason, datetime.now().isoformat(), job_id))
await db.commit()
return json.dumps({"success": True}) return json.dumps({"success": True})
@mcp.tool() @mcp.tool()
...@@ -200,45 +301,77 @@ async def update_job_status(job_id: str, status: str, agent: str, result: str = ...@@ -200,45 +301,77 @@ async def update_job_status(job_id: str, status: str, agent: str, result: str =
"""Update job status""" """Update job status"""
if not all([job_id, status, agent]): if not all([job_id, status, agent]):
return json.dumps({"success": False, "error": "missing required"}) return json.dumps({"success": False, "error": "missing required"})
async with aiosqlite.connect(DB_PATH) as db:
cur = await db.execute("SELECT target_agent FROM jobs WHERE id = ?", (job_id,)) if REDIS_ENABLED:
row = await cur.fetchone() job_data = await REDIS_CLIENT.hget("clawphone:jobs", job_id)
if not row: if not job_data:
return json.dumps({"success": False, "error": "Not found"}) return json.dumps({"success": False, "error": "Not found"})
if row[0] != agent: job = json.loads(job_data)
if job["target_agent"] != agent:
return json.dumps({"success": False, "error": "Wrong agent"}) return json.dumps({"success": False, "error": "Wrong agent"})
await db.execute("UPDATE jobs SET status=?, result=?, updated_at=? WHERE id=?", job["status"] = status
(status, result, datetime.now().isoformat(), job_id)) job["result"] = result
await db.commit() job["updated_at"] = datetime.now().isoformat()
await REDIS_CLIENT.hset("clawphone:jobs", job_id, json.dumps(job))
else:
async with aiosqlite.connect(DB_PATH) as db:
cur = await db.execute("SELECT target_agent FROM jobs WHERE id = ?", (job_id,))
row = await cur.fetchone()
if not row:
return json.dumps({"success": False, "error": "Not found"})
if row[0] != agent:
return json.dumps({"success": False, "error": "Wrong agent"})
await db.execute("UPDATE jobs SET status=?, result=?, updated_at=? WHERE id=?",
(status, result, datetime.now().isoformat(), job_id))
await db.commit()
return json.dumps({"success": True}) return json.dumps({"success": True})
@mcp.tool() @mcp.tool()
async def list_jobs(agent: str = None, status: str = None) -> str: async def list_jobs(agent: str = None, status: str = None) -> str:
"""List jobs, optionally filtered by agent and status""" """List jobs, optionally filtered by agent and status"""
q, p = "SELECT * FROM jobs WHERE 1=1", [] if REDIS_ENABLED:
if agent: # Get jobs from Redis
q += " AND (target_agent=? OR sender=?)" jobs_data = await REDIS_CLIENT.hgetall("clawphone:jobs")
p += [agent, agent] jobs = [json.loads(data) for data in jobs_data.values()]
if status: else:
q += " AND status=?" # Get jobs from SQLite
p += [status] q, p = "SELECT * FROM jobs WHERE 1=1", []
q += " ORDER BY created_at DESC" if agent:
async with aiosqlite.connect(DB_PATH) as db: q += " AND (target_agent=? OR sender=?)"
db.row_factory = sqlite3.Row p += [agent, agent]
cur = await db.execute(q, p) if status:
rows = await cur.fetchall() q += " AND status=?"
return json.dumps({"jobs": [dict(r) for r in rows]}) p += [status]
q += " ORDER BY created_at DESC"
async with aiosqlite.connect(DB_PATH) as db:
db.row_factory = sqlite3.Row
cur = await db.execute(q, p)
jobs = await cur.fetchall()
jobs = [dict(r) for r in jobs]
return json.dumps({"jobs": jobs})
@mcp.tool() @mcp.tool()
async def get_job(job_id: str) -> str: async def get_job(job_id: str) -> str:
"""Get job details""" """Get job details"""
async with aiosqlite.connect(DB_PATH) as db: if REDIS_ENABLED:
db.row_factory = sqlite3.Row job_data = await REDIS_CLIENT.hget("clawphone:jobs", job_id)
cur = await db.execute("SELECT * FROM jobs WHERE id = ?", (job_id,)) if job_data:
row = await cur.fetchone() job = json.loads(job_data)
return json.dumps({"job": dict(row)}) if row else json.dumps({"error": "Not found"}) return json.dumps({"job": job})
return json.dumps({"error": "Not found"})
else:
async with aiosqlite.connect(DB_PATH) as db:
db.row_factory = sqlite3.Row
cur = await db.execute("SELECT * FROM jobs WHERE id = ?", (job_id,))
row = await cur.fetchone()
return json.dumps({"job": dict(row)}) if row else json.dumps({"error": "Not found"})
async def main(): async def main():
# Initialize Redis if configured
await init_redis()
await init_db() await init_db()
await init_token() await init_token()
...@@ -247,6 +380,11 @@ async def main(): ...@@ -247,6 +380,11 @@ async def main():
logger.info(f" SERVER TOKEN: {API_TOKEN}") logger.info(f" SERVER TOKEN: {API_TOKEN}")
logger.info(f" MODE: {'HTTPS' if not args.plain else 'HTTP'}") logger.info(f" MODE: {'HTTPS' if not args.plain else 'HTTP'}")
logger.info(f" PORTS: {PORT}") logger.info(f" PORTS: {PORT}")
logger.info(f" DATABASE: {'Redis' if REDIS_ENABLED else 'SQLite'}")
if REDIS_ENABLED:
logger.info(f" REDIS URL: {REDIS_URL}")
else:
logger.info(f" DB PATH: {DB_PATH}")
logger.info(f" ENDPOINTS: /mcp (streaming), /sse (SSE), /messages/ (messages)") logger.info(f" ENDPOINTS: /mcp (streaming), /sse (SSE), /messages/ (messages)")
logger.info("="*60) logger.info("="*60)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment