Commit 1c8fb397 authored by Lisa (AI Assistant)'s avatar Lisa (AI Assistant)

Add reject_job tool and HTTP endpoint with reason field

parent a1b7e9cc
Pipeline #279 canceled with stages
......@@ -2,8 +2,6 @@
"""
ClawPhone - Centralized Job Queue MCP Server for OpenClaw Agents
All webhooks use HTTPS with verify=False
HTTP endpoints mirror MCP tools at /hook/<tool_name>
"""
import os
......@@ -56,7 +54,7 @@ async def init_db():
registered_at TEXT NOT NULL
)
""")
# Jobs table
# Jobs table - added reason field
await db.execute("""
CREATE TABLE IF NOT EXISTS jobs (
id TEXT PRIMARY KEY,
......@@ -66,6 +64,7 @@ async def init_db():
description TEXT,
status TEXT DEFAULT 'pending',
result TEXT,
reason TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
retry_count INTEGER DEFAULT 0,
......@@ -234,6 +233,8 @@ async def list_tools(authorization: str = Header(None)):
"inputSchema": {"type": "object", "properties": {"sender": {}, "target_agent": {}, "title": {}, "description": {}}, "required": ["sender", "target_agent", "title"]}},
{"name": "claim_job", "description": "Claim a pending job",
"inputSchema": {"type": "object", "properties": {"job_id": {}, "agent": {}}, "required": ["job_id", "agent"]}},
{"name": "reject_job", "description": "Reject a pending job with a reason",
"inputSchema": {"type": "object", "properties": {"job_id": {}, "agent": {}, "reason": {}}, "required": ["job_id", "agent", "reason"]}},
{"name": "update_job_status", "description": "Update job status",
"inputSchema": {"type": "object", "properties": {"job_id": {}, "status": {}, "agent": {}, "result": {}}, "required": ["job_id", "status", "agent"]}},
{"name": "list_jobs", "description": "List jobs",
......@@ -309,7 +310,7 @@ async def tool_post_job(data: dict, authorization: str = Header(None)):
first_retry = (datetime.now() + timedelta(minutes=RETRY_INTERVALS[0])).isoformat()
async with aiosqlite.connect(DB_PATH) as db:
await db.execute("INSERT INTO jobs VALUES (?, ?, ?, ?, ?, 'pending', NULL, ?, ?, 0, ?)",
await db.execute("INSERT INTO jobs VALUES (?, ?, ?, ?, ?, 'pending', NULL, NULL, ?, ?, 0, ?)",
(job_id, sender, target_agent, title, description, now, now, first_retry))
await db.commit()
......@@ -346,6 +347,41 @@ async def tool_claim_job(data: dict, authorization: str = Header(None)):
return {"success": True}
@app.post("/tools/reject_job")
async def tool_reject_job(data: dict, authorization: str = Header(None)):
"""Reject a pending job with a reason"""
verify_token(authorization)
job_id = data.get("job_id")
agent = data.get("agent")
reason = data.get("reason", "")
if not job_id or not agent:
return {"success": False, "error": "job_id and agent are required"}
if not reason:
return {"success": False, "error": "reason is required when rejecting a job"}
async with aiosqlite.connect(DB_PATH) as db:
cur = await db.execute("SELECT status, target_agent, sender FROM jobs WHERE id = ?", (job_id,))
row = await cur.fetchone()
if not row:
return {"success": False, "error": "Job not found"}
if row[0] != "pending":
return {"success": False, "error": f"Job is {row[0]}, can only reject pending jobs"}
if row[1] != agent:
return {"success": False, "error": "Job is for a different agent"}
await db.execute("UPDATE jobs SET status='rejected', reason=?, updated_at=? WHERE id=?", (reason, datetime.now().isoformat(), job_id))
await db.commit()
# Notify sender about rejection
sender = row[2]
if sender:
await notify_hook(sender, "job_rejected", {"job_id": job_id, "agent": agent, "reason": reason})
return {"success": True}
@app.post("/tools/update_job_status")
async def tool_update_job_status(data: dict, authorization: str = Header(None)):
verify_token(authorization)
......@@ -406,12 +442,10 @@ async def tool_get_job(job_id: str, authorization: str = Header(None)):
# ============================================================
# HTTP WEBHOOK ENDPOINTS (mirror MCP tools)
# These replicate all MCP actions over HTTP POST
# ============================================================
@app.post("/hook/register")
async def http_register(request: Request, authorization: str = Header(None)):
"""HTTP webhook for register - mirrors /tools/register"""
verify_token(authorization)
data = await request.json()
return await tool_register(data, authorization)
......@@ -419,14 +453,12 @@ async def http_register(request: Request, authorization: str = Header(None)):
@app.post("/hook/list_hosts")
async def http_list_hosts(authorization: str = Header(None)):
"""HTTP webhook for list_hosts - mirrors /tools/list_hosts"""
verify_token(authorization)
return await list_hosts(authorization)
@app.post("/hook/get_host_info")
async def http_get_host_info(request: Request, authorization: str = Header(None)):
"""HTTP webhook for get_host_info - mirrors /tools/get_host_info"""
verify_token(authorization)
data = await request.json()
name = data.get("name")
......@@ -435,7 +467,6 @@ async def http_get_host_info(request: Request, authorization: str = Header(None)
@app.post("/hook/post_job")
async def http_post_job(request: Request, authorization: str = Header(None)):
"""HTTP webhook for post_job - mirrors /tools/post_job"""
verify_token(authorization)
data = await request.json()
return await tool_post_job(data, authorization)
......@@ -443,15 +474,21 @@ async def http_post_job(request: Request, authorization: str = Header(None)):
@app.post("/hook/claim_job")
async def http_claim_job(request: Request, authorization: str = Header(None)):
"""HTTP webhook for claim_job - mirrors /tools/claim_job"""
verify_token(authorization)
data = await request.json()
return await tool_claim_job(data, authorization)
@app.post("/hook/reject_job")
async def http_reject_job(request: Request, authorization: str = Header(None)):
"""HTTP webhook for reject_job"""
verify_token(authorization)
data = await request.json()
return await tool_reject_job(data, authorization)
@app.post("/hook/update_job_status")
async def http_update_job_status(request: Request, authorization: str = Header(None)):
"""HTTP webhook for update_job_status - mirrors /tools/update_job_status"""
verify_token(authorization)
data = await request.json()
return await tool_update_job_status(data, authorization)
......@@ -459,7 +496,6 @@ async def http_update_job_status(request: Request, authorization: str = Header(N
@app.post("/hook/list_jobs")
async def http_list_jobs(request: Request, authorization: str = Header(None)):
"""HTTP webhook for list_jobs - mirrors /tools/list_jobs"""
verify_token(authorization)
data = await request.json()
agent = data.get("agent")
......@@ -469,7 +505,6 @@ async def http_list_jobs(request: Request, authorization: str = Header(None)):
@app.post("/hook/get_job")
async def http_get_job(request: Request, authorization: str = Header(None)):
"""HTTP webhook for get_job - mirrors /tools/get_job"""
verify_token(authorization)
data = await request.json()
job_id = data.get("job_id")
......
......@@ -38,13 +38,20 @@ mcporter call clawphone --tool post_job --params '{
"description": "Run uptime on ganeti1"
}'
# Claim a job
# Claim a job (accept it)
mcporter call clawphone --tool claim_job --params '{
"job_id": "uuid-here",
"agent": "nimpho"
}'
# Update job status
# Reject a job with a reason
mcporter call clawphone --tool reject_job --params '{
"job_id": "uuid-here",
"agent": "nimpho",
"reason": "Cannot perform this action - insufficient permissions"
}'
# Update job status (working/done/failed)
mcporter call clawphone --tool update_job_status --params '{
"job_id": "uuid-here",
"status": "done",
......@@ -106,6 +113,12 @@ curl -k -X POST "$CLAWPHONE_URL/hook/claim_job" \
-H "Authorization: Bearer $TOKEN" \
-d '{"job_id": "uuid-here", "agent": "nimpho"}'
# Reject a job with a reason
curl -k -X POST "$CLAWPHONE_URL/hook/reject_job" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $TOKEN" \
-d '{"job_id": "uuid-here", "agent": "nimpho", "reason": "Cannot perform - insufficient permissions"}'
# Update job status
curl -k -X POST "$CLAWPHONE_URL/hook/update_job_status" \
-H "Content-Type: application/json" \
......@@ -134,10 +147,34 @@ curl -k -X POST "$CLAWPHONE_URL/hook/get_job" \
| `get_host_info` | Get details about a specific agent | `/hook/get_host_info` |
| `post_job` | Post a job for another agent | `/hook/post_job` |
| `claim_job` | Claim a pending job | `/hook/claim_job` |
| `reject_job` | Reject a pending job with a reason | `/hook/reject_job` |
| `update_job_status` | Update job status (working/done/failed) | `/hook/update_job_status` |
| `list_jobs` | List jobs (filter by agent/status) | `/hook/list_jobs` |
| `get_job` | Get job details | `/hook/get_job` |
## Job Status Flow
```
pending → claimed → done
rejected (with reason)
failed (after max retries)
```
## Webhook Events
When a job is rejected, the sender receives a `job_rejected` webhook:
```json
{
"event": "job_rejected",
"agent": "nimpho",
"job_id": "uuid-here",
"reason": "Cannot perform - insufficient permissions",
"timestamp": "2026-03-12T21:00:00"
}
```
## Notes
- All HTTPS endpoints use `verify=False` (self-signed certs)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment