Commit 835dab76 authored by Lisa (AI Assistant)'s avatar Lisa (AI Assistant)

Add HTTP webhook endpoints mirroring all MCP tools at /hook/<tool>

parent 796f7ea5
Pipeline #276 canceled with stages
...@@ -2,6 +2,8 @@ ...@@ -2,6 +2,8 @@
""" """
ClawPhone - Centralized Job Queue MCP Server for OpenClaw Agents ClawPhone - Centralized Job Queue MCP Server for OpenClaw Agents
All webhooks use HTTPS with verify=False All webhooks use HTTPS with verify=False
HTTP endpoints mirror MCP tools at /hook/<tool_name>
""" """
import os import os
...@@ -14,7 +16,7 @@ from pathlib import Path ...@@ -14,7 +16,7 @@ from pathlib import Path
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from typing import Optional from typing import Optional
from fastapi import FastAPI, HTTPException, Header, Body from fastapi import FastAPI, HTTPException, Header, Body, Request
from fastapi.responses import PlainTextResponse from fastapi.responses import PlainTextResponse
import uvicorn import uvicorn
import httpx import httpx
...@@ -213,27 +215,27 @@ async def lifespan(app): ...@@ -213,27 +215,27 @@ async def lifespan(app):
app = FastAPI(lifespan=lifespan) app = FastAPI(lifespan=lifespan)
# ============================================================
# MCP TOOLS ENDPOINTS (JSON-RPC style)
# ============================================================
@app.get("/tools") @app.get("/tools")
async def list_tools(authorization: str = Header(None)): async def list_tools(authorization: str = Header(None)):
verify_token(authorization) verify_token(authorization)
return {"tools": [ return {"tools": [
{"name": "register", "description": "Register this agent with the MCP server. Sends own token, hook URL, capability_prompt and skill_prompt", {"name": "register", "description": "Register this agent with the MCP server",
"inputSchema": {"type": "object", "properties": { "inputSchema": {"type": "object", "properties": {
"hook": {"type": "string", "description": "Your webhook URL (https://your-server/hooks/agent)"}, "hook": {}, "token": {}, "name": {}, "capability_prompt": {}, "skill_prompt": {}
"token": {"type": "string", "description": "Your OpenClaw hook token"},
"capability_prompt": {"type": "string", "description": "Short description of what this agent can do"},
"skill_prompt": {"type": "string", "description": "Detailed instructions on how to make requests to this agent (can be large)"}
}, "required": ["hook", "token"]}}, }, "required": ["hook", "token"]}},
{"name": "list_hosts", "description": "List all registered agents with their capability prompts", {"name": "list_hosts", "description": "List all registered agents", "inputSchema": {"type": "object", "properties": {}}},
"inputSchema": {"type": "object", "properties": {}}}, {"name": "get_host_info", "description": "Get info about a specific agent",
{"name": "get_host_info", "description": "Get detailed info about a specific agent including skill prompt",
"inputSchema": {"type": "object", "properties": {"name": {}}, "required": ["name"]}}, "inputSchema": {"type": "object", "properties": {"name": {}}, "required": ["name"]}},
{"name": "post_job", "description": "Post a new job for an agent", {"name": "post_job", "description": "Post a new job",
"inputSchema": {"type": "object", "properties": {"sender": {}, "target_agent": {}, "title": {}, "description": {}}, "required": ["sender", "target_agent", "title"]}}, "inputSchema": {"type": "object", "properties": {"sender": {}, "target_agent": {}, "title": {}, "description": {}}, "required": ["sender", "target_agent", "title"]}},
{"name": "claim_job", "description": "Claim a pending job", {"name": "claim_job", "description": "Claim a pending job",
"inputSchema": {"type": "object", "properties": {"job_id": {}, "agent": {}}, "required": ["job_id", "agent"]}}, "inputSchema": {"type": "object", "properties": {"job_id": {}, "agent": {}}, "required": ["job_id", "agent"]}},
{"name": "update_job_status", "description": "Update job status", {"name": "update_job_status", "description": "Update job status",
"inputSchema": {"type": "object", "properties": {"job_id": {}, "status": {"enum": ["working", "done", "failed"]}, "agent": {}, "result": {}}, "required": ["job_id", "status", "agent"]}}, "inputSchema": {"type": "object", "properties": {"job_id": {}, "status": {}, "agent": {}, "result": {}}, "required": ["job_id", "status", "agent"]}},
{"name": "list_jobs", "description": "List jobs", {"name": "list_jobs", "description": "List jobs",
"inputSchema": {"type": "object", "properties": {"agent": {}, "status": {}}}}, "inputSchema": {"type": "object", "properties": {"agent": {}, "status": {}}}},
{"name": "get_job", "description": "Get job details", {"name": "get_job", "description": "Get job details",
...@@ -243,52 +245,25 @@ async def list_tools(authorization: str = Header(None)): ...@@ -243,52 +245,25 @@ async def list_tools(authorization: str = Header(None)):
@app.get("/tools/list_hosts") @app.get("/tools/list_hosts")
async def list_hosts(authorization: str = Header(None)): async def list_hosts(authorization: str = Header(None)):
"""List all registered agents with their capability prompts"""
verify_token(authorization) verify_token(authorization)
await load_agents() # Refresh from DB await load_agents()
hosts = {name: {"hook": info.get("hook"), "capability_prompt": info.get("capability_prompt", "")[:500], "registered": True} for name, info in AGENTS.items()}
# Return agents with name, hook, and capability_prompt (not the full skill_prompt for listing)
hosts = {}
for name, info in AGENTS.items():
hosts[name] = {
"hook": info.get("hook"),
"capability_prompt": info.get("capability_prompt", "")[:500], # Truncate for listing
"registered": True
}
return {"hosts": hosts} return {"hosts": hosts}
@app.get("/tools/get_host_info") @app.get("/tools/get_host_info")
async def get_host_info(name: str, authorization: str = Header(None)): async def get_host_info(name: str, authorization: str = Header(None)):
"""Get detailed info about a specific agent including full skill prompt"""
verify_token(authorization) verify_token(authorization)
await load_agents() # Refresh from DB await load_agents()
if name not in AGENTS: if name not in AGENTS:
return {"error": f"Agent '{name}' not found"} return {"error": f"Agent '{name}' not found"}
info = AGENTS[name] info = AGENTS[name]
return { return {"name": name, "hook": info.get("hook"), "capability_prompt": info.get("capability_prompt", ""), "skill_prompt": info.get("skill_prompt", ""), "registered": True}
"name": name,
"hook": info.get("hook"),
"capability_prompt": info.get("capability_prompt", ""),
"skill_prompt": info.get("skill_prompt", ""),
"registered": True
}
@app.post("/tools/register") @app.post("/tools/register")
async def tool_register(data: dict = Body(...), authorization: str = Header(None)): async def tool_register(data: dict = Body(...), authorization: str = Header(None)):
"""
Agent self-registration. The agent sends:
- hook: its webhook URL
- token: its own hook token for callbacks
- capability_prompt: short description of capabilities
- skill_prompt: detailed instructions (can be large)
"""
verify_token(authorization) verify_token(authorization)
hook = data.get("hook") hook = data.get("hook")
token = data.get("token", "") token = data.get("token", "")
capability_prompt = data.get("capability_prompt", "") capability_prompt = data.get("capability_prompt", "")
...@@ -297,41 +272,30 @@ async def tool_register(data: dict = Body(...), authorization: str = Header(None ...@@ -297,41 +272,30 @@ async def tool_register(data: dict = Body(...), authorization: str = Header(None
if not hook: if not hook:
return {"success": False, "error": "hook URL is required"} return {"success": False, "error": "hook URL is required"}
# Extract agent name from hook URL path or use a default
# The agent identifies itself - we'll use the hook URL's hostname as name if not provided
# Or we can require a "name" field
name = data.get("name") name = data.get("name")
if not name: if not name:
# Try to extract from hook URL
import urllib.parse import urllib.parse
try: try:
parsed = urllib.parse.urlparse(hook) parsed = urllib.parse.urlparse(hook)
name = parsed.netloc.split('.')[0] # e.g., "lisa" from "lisa.nexlab.net" name = parsed.netloc.split('.')[0]
except: except:
name = "unknown" name = "unknown"
# Ensure HTTPS
if not hook.startswith("https://"): if not hook.startswith("https://"):
hook = hook.replace("http://", "https://") hook = hook.replace("http://", "https://")
async with aiosqlite.connect(DB_PATH) as db: async with aiosqlite.connect(DB_PATH) as db:
await db.execute( await db.execute("""INSERT OR REPLACE INTO agents (name, hook, token, capability_prompt, skill_prompt, registered_at) VALUES (?, ?, ?, ?, ?, ?)""",
"""INSERT OR REPLACE INTO agents
(name, hook, token, capability_prompt, skill_prompt, registered_at)
VALUES (?, ?, ?, ?, ?, ?)""",
(name, hook, token, capability_prompt, skill_prompt, datetime.now().isoformat())) (name, hook, token, capability_prompt, skill_prompt, datetime.now().isoformat()))
await db.commit() await db.commit()
await load_agents() await load_agents()
return {"success": True, "agent": name, "hook": hook} return {"success": True, "agent": name, "hook": hook}
@app.post("/tools/post_job") @app.post("/tools/post_job")
async def tool_post_job(data: dict, authorization: str = Header(None)): async def tool_post_job(data: dict, authorization: str = Header(None)):
"""Post a new job"""
verify_token(authorization) verify_token(authorization)
sender = data.get("sender") sender = data.get("sender")
target_agent = data.get("target_agent") target_agent = data.get("target_agent")
title = data.get("title") title = data.get("title")
...@@ -345,27 +309,17 @@ async def tool_post_job(data: dict, authorization: str = Header(None)): ...@@ -345,27 +309,17 @@ async def tool_post_job(data: dict, authorization: str = Header(None)):
first_retry = (datetime.now() + timedelta(minutes=RETRY_INTERVALS[0])).isoformat() first_retry = (datetime.now() + timedelta(minutes=RETRY_INTERVALS[0])).isoformat()
async with aiosqlite.connect(DB_PATH) as db: async with aiosqlite.connect(DB_PATH) as db:
await db.execute( await db.execute("INSERT INTO jobs VALUES (?, ?, ?, ?, ?, 'pending', NULL, ?, ?, 0, ?)",
"INSERT INTO jobs VALUES (?, ?, ?, ?, ?, 'pending', NULL, ?, ?, 0, ?)",
(job_id, sender, target_agent, title, description, now, now, first_retry)) (job_id, sender, target_agent, title, description, now, now, first_retry))
await db.commit() await db.commit()
# Notify target agent via webhook (HTTPS) await notify_hook(target_agent, "new_job", {"job_id": job_id, "sender": sender, "title": title, "description": description})
await notify_hook(target_agent, "new_job", {
"job_id": job_id,
"sender": sender,
"title": title,
"description": description
})
return {"success": True, "job_id": job_id} return {"success": True, "job_id": job_id}
@app.post("/tools/claim_job") @app.post("/tools/claim_job")
async def tool_claim_job(data: dict, authorization: str = Header(None)): async def tool_claim_job(data: dict, authorization: str = Header(None)):
"""Claim a job"""
verify_token(authorization) verify_token(authorization)
job_id = data.get("job_id") job_id = data.get("job_id")
agent = data.get("agent") agent = data.get("agent")
...@@ -378,30 +332,23 @@ async def tool_claim_job(data: dict, authorization: str = Header(None)): ...@@ -378,30 +332,23 @@ async def tool_claim_job(data: dict, authorization: str = Header(None)):
if not row: if not row:
return {"success": False, "error": "Job not found"} return {"success": False, "error": "Job not found"}
if row[0] != "pending": if row[0] != "pending":
return {"success": False, "error": f"Job is {row[0]}"} return {"success": False, "error": f"Job is {row[0]}"}
if row[1] != agent: if row[1] != agent:
return {"success": False, "error": "Job is for a different agent"} return {"success": False, "error": "Job is for a different agent"}
await db.execute("UPDATE jobs SET status='claimed', updated_at=?, next_retry_at=NULL WHERE id=?", await db.execute("UPDATE jobs SET status='claimed', updated_at=?, next_retry_at=NULL WHERE id=?", (datetime.now().isoformat(), job_id))
(datetime.now().isoformat(), job_id))
await db.commit() await db.commit()
# Notify sender
sender = row[2] sender = row[2]
if sender: if sender:
await notify_hook(sender, "job_claimed", {"job_id": job_id, "agent": agent, "title": "Job claimed"}) await notify_hook(sender, "job_claimed", {"job_id": job_id, "agent": agent})
return {"success": True} return {"success": True}
@app.post("/tools/update_job_status") @app.post("/tools/update_job_status")
async def tool_update_job_status(data: dict, authorization: str = Header(None)): async def tool_update_job_status(data: dict, authorization: str = Header(None)):
"""Update job status"""
verify_token(authorization) verify_token(authorization)
job_id = data.get("job_id") job_id = data.get("job_id")
status = data.get("status") status = data.get("status")
agent = data.get("agent") agent = data.get("agent")
...@@ -416,31 +363,21 @@ async def tool_update_job_status(data: dict, authorization: str = Header(None)): ...@@ -416,31 +363,21 @@ async def tool_update_job_status(data: dict, authorization: str = Header(None)):
if not row: if not row:
return {"success": False, "error": "Job not found"} return {"success": False, "error": "Job not found"}
if row[1] != agent: if row[1] != agent:
return {"success": False, "error": "Job is for a different agent"} return {"success": False, "error": "Job is for a different agent"}
await db.execute("UPDATE jobs SET status=?, result=?, updated_at=? WHERE id=?", await db.execute("UPDATE jobs SET status=?, result=?, updated_at=? WHERE id=?", (status, result, datetime.now().isoformat(), job_id))
(status, result, datetime.now().isoformat(), job_id))
await db.commit() await db.commit()
# Notify sender
sender = row[2] sender = row[2]
if sender: if sender:
await notify_hook(sender, "job_status_changed", { await notify_hook(sender, "job_status_changed", {"job_id": job_id, "status": status, "result": result})
"job_id": job_id,
"status": status,
"result": result
})
return {"success": True} return {"success": True}
@app.get("/tools/list_jobs") @app.get("/tools/list_jobs")
async def tool_list_jobs(agent: str = None, status: str = None, authorization: str = Header(None)): async def tool_list_jobs(agent: str = None, status: str = None, authorization: str = Header(None)):
"""List jobs"""
verify_token(authorization) verify_token(authorization)
q, p = "SELECT * FROM jobs WHERE 1=1", [] q, p = "SELECT * FROM jobs WHERE 1=1", []
if agent: if agent:
q += " AND (target_agent=? OR sender=?)" q += " AND (target_agent=? OR sender=?)"
...@@ -454,23 +391,91 @@ async def tool_list_jobs(agent: str = None, status: str = None, authorization: s ...@@ -454,23 +391,91 @@ async def tool_list_jobs(agent: str = None, status: str = None, authorization: s
db.row_factory = sqlite3.Row db.row_factory = sqlite3.Row
cur = await db.execute(q, p) cur = await db.execute(q, p)
rows = await cur.fetchall() rows = await cur.fetchall()
return {"jobs": [dict(r) for r in rows]} return {"jobs": [dict(r) for r in rows]}
@app.get("/tools/get_job") @app.get("/tools/get_job")
async def tool_get_job(job_id: str, authorization: str = Header(None)): async def tool_get_job(job_id: str, authorization: str = Header(None)):
"""Get job details"""
verify_token(authorization) verify_token(authorization)
async with aiosqlite.connect(DB_PATH) as db: async with aiosqlite.connect(DB_PATH) as db:
db.row_factory = sqlite3.Row db.row_factory = sqlite3.Row
cur = await db.execute("SELECT * FROM jobs WHERE id = ?", (job_id,)) cur = await db.execute("SELECT * FROM jobs WHERE id = ?", (job_id,))
row = await cur.fetchone() row = await cur.fetchone()
return {"job": dict(row)} if row else {"error": "Not found"} return {"job": dict(row)} if row else {"error": "Not found"}
# ============================================================
# HTTP WEBHOOK ENDPOINTS (mirror MCP tools)
# These replicate all MCP actions over HTTP POST
# ============================================================
@app.post("/hook/register")
async def http_register(request: Request, authorization: str = Header(None)):
"""HTTP webhook for register - mirrors /tools/register"""
verify_token(authorization)
data = await request.json()
return await tool_register(data, authorization)
@app.post("/hook/list_hosts")
async def http_list_hosts(authorization: str = Header(None)):
"""HTTP webhook for list_hosts - mirrors /tools/list_hosts"""
verify_token(authorization)
return await list_hosts(authorization)
@app.post("/hook/get_host_info")
async def http_get_host_info(request: Request, authorization: str = Header(None)):
"""HTTP webhook for get_host_info - mirrors /tools/get_host_info"""
verify_token(authorization)
data = await request.json()
name = data.get("name")
return await get_host_info(name, authorization)
@app.post("/hook/post_job")
async def http_post_job(request: Request, authorization: str = Header(None)):
"""HTTP webhook for post_job - mirrors /tools/post_job"""
verify_token(authorization)
data = await request.json()
return await tool_post_job(data, authorization)
@app.post("/hook/claim_job")
async def http_claim_job(request: Request, authorization: str = Header(None)):
"""HTTP webhook for claim_job - mirrors /tools/claim_job"""
verify_token(authorization)
data = await request.json()
return await tool_claim_job(data, authorization)
@app.post("/hook/update_job_status")
async def http_update_job_status(request: Request, authorization: str = Header(None)):
"""HTTP webhook for update_job_status - mirrors /tools/update_job_status"""
verify_token(authorization)
data = await request.json()
return await tool_update_job_status(data, authorization)
@app.post("/hook/list_jobs")
async def http_list_jobs(request: Request, authorization: str = Header(None)):
"""HTTP webhook for list_jobs - mirrors /tools/list_jobs"""
verify_token(authorization)
data = await request.json()
agent = data.get("agent")
status = data.get("status")
return await tool_list_jobs(agent, status, authorization)
@app.post("/hook/get_job")
async def http_get_job(request: Request, authorization: str = Header(None)):
"""HTTP webhook for get_job - mirrors /tools/get_job"""
verify_token(authorization)
data = await request.json()
job_id = data.get("job_id")
return await tool_get_job(job_id, authorization)
@app.get("/health") @app.get("/health")
async def health(): async def health():
return {"status": "ok", "agents": len(AGENTS)} return {"status": "ok", "agents": len(AGENTS)}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment