feat: expand CoderAI broker protocol support

parent 42b07048
......@@ -1370,16 +1370,18 @@ AISBF now includes an AISBF-side broker for NAT-friendly CoderAI integration.
- `GET /api/coderai/broker/sessions` - list active broker sessions
- `GET /api/coderai/broker/providers/{provider_id}/status` - inspect active session status for a provider/client
- `WS /api/coderai/broker/ws` - persistent outbound WebSocket endpoint used by CoderAI behind NAT
- `WS /api/coderai/wss` - persistent outbound WebSocket endpoint for global-scope CoderAI clients
- `WS /api/u/{username}/coderai/wss` - persistent outbound WebSocket endpoint for user-owned CoderAI clients
### Broker Request Flow
1. CoderAI opens outbound WebSocket to AISBF broker
2. AISBF registers a live session keyed by `provider_id` and `client_id`
2. AISBF registers a live session keyed by `provider_id` and `client_id` and scoped either to a `username` or to the `global` scope
3. CoderAI sends `register` metadata/capabilities over that socket
4. `CoderAIProviderHandler` prefers the live broker session when `coderai_config.broker_preferred=true`
5. AISBF forwards `models.list`, `chat.completions`, `capabilities`, `register`, and `proxy` operations through the active session
6. Response envelopes are correlated by `request_id` and returned to the original AISBF request
5. In multi-node deployments, AISBF forwards `models.list`, `chat.completions`, `capabilities`, `register`, and `proxy` operations through cache-backed broker queues keyed by the active session id
6. The AISBF node holding the actual WebSocket consumes queued requests, sends them to CoderAI, and publishes replies back through cache-backed reply keys
7. Response envelopes are correlated by `request_id` and returned to the original AISBF request from any AISBF node
### Configuration Notes
......@@ -1408,7 +1410,7 @@ Token rotation is performed inline from the provider editor and updates the owni
### Important Limitation
Active broker routing still requires a live WebSocket, but AISBF now persists broker session metadata snapshots in `~/.aisbf/coderai_broker_sessions.json` so the dashboard can show last-known broker status after restart. Remote CoderAI must still reconnect before requests can be routed again.
Active broker routing still requires a live WebSocket. AISBF now persists broker session metadata snapshots in `~/.aisbf/coderai_broker_sessions.json` and mirrors live broker state into the configured cache backend, so clustered AISBF nodes can route through a single connected broker client. After a total disconnect, the remote CoderAI client must still reconnect before requests can be routed again.
### Security Considerations
......
include README.md
include LICENSE.txt
include DOCUMENTATION.md
include AI.PROMPT
include requirements.txt
include main.py
include pyproject.toml
include aisbf.sh
include cli.py
recursive-include docs *.md
recursive-include config *.json
recursive-include config *.md
recursive-include aisbf *.py
......@@ -17,4 +19,4 @@ recursive-include templates *.js
recursive-include static *.zip
recursive-include static *.js
recursive-include static/i18n *.json
recursive-include static/extension *.js *.json *.html *.md *.png *.svg
\ No newline at end of file
recursive-include static/extension *.js *.json *.html *.md *.png *.svg
......@@ -52,6 +52,7 @@ def _cache_decode(data: bytes) -> any:
from typing import Any, Optional, Dict, List
from pathlib import Path
import time
import uuid
logger = logging.getLogger(__name__)
......@@ -850,6 +851,94 @@ class CacheManager:
"""Clear all cache entries"""
self.backend.clear()
def broker_backend_type(self) -> str:
    """Return the configured cache type name in lower case."""
    configured_type = self.cache_type
    return configured_type.lower()
def broker_supports_distributed(self) -> bool:
    """Report whether the cache backend type is redis, sqlite or mysql."""
    shared_backend_types = {'redis', 'sqlite', 'mysql'}
    active_type = self.broker_backend_type()
    return active_type in shared_backend_types
def broker_key(self, suffix: str) -> str:
    """Build the namespaced cache key for a broker entry.

    The prefix comes from the ``redis_key_prefix`` entry when ``self.config``
    is a dict and is ``'aisbf:'`` otherwise; ``broker:`` and ``suffix`` are
    appended to it.
    """
    if isinstance(self.config, dict):
        namespace = self.config.get('redis_key_prefix', 'aisbf:')
    else:
        namespace = 'aisbf:'
    return f"{namespace}broker:{suffix}"
def broker_set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
    """Store ``value`` under the broker-namespaced form of ``key``.

    ``ttl`` is forwarded unchanged to ``self.set``.
    """
    namespaced_key = self.broker_key(key)
    self.set(namespaced_key, value, ttl=ttl)
def broker_get(self, key: str) -> Optional[Any]:
    """Fetch the value stored under the broker-namespaced form of ``key``."""
    namespaced_key = self.broker_key(key)
    return self.get(namespaced_key)
def broker_delete(self, key: str) -> None:
    """Delete the entry stored under the broker-namespaced form of ``key``."""
    namespaced_key = self.broker_key(key)
    self.delete(namespaced_key)
def broker_blocking_pop(self, key: str, timeout: int = 1) -> Optional[Any]:
    """Pop the oldest queued item for ``key``, waiting up to ``timeout`` seconds.

    On a Redis backend the wait is delegated to ``BLPOP``; every other backend
    polls ``broker_pop_nowait`` roughly every 100 ms. A ``timeout`` of zero or
    less performs one non-blocking attempt on every backend.

    Args:
        key: Un-prefixed broker queue name.
        timeout: Maximum number of seconds to wait.

    Returns:
        The popped item, or ``None`` when nothing arrived in time.

    Note:
        The polling path uses ``time.sleep`` and therefore blocks the calling
        thread for the duration of the wait.
    """
    wait_seconds = max(timeout, 0)
    if wait_seconds == 0:
        # Redis treats a BLPOP timeout of 0 as "block forever" and rejects
        # negative values, whereas the polling path returns immediately.
        # Use the non-blocking pop so both backends behave the same.
        return self.broker_pop_nowait(key)

    backend_type = self.broker_backend_type()
    if backend_type == 'redis' and hasattr(self.backend, 'redis'):
        item = self.backend.redis.blpop(self.broker_key(key), timeout=wait_seconds)
        if not item:
            return None
        # blpop returns a (key, value) pair; only the value is decoded.
        return _cache_decode(item[1])

    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + wait_seconds
    while True:
        item = self.broker_pop_nowait(key)
        if item is not None:
            return item
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return None
        # Never sleep past the deadline.
        time.sleep(min(0.1, remaining))
def broker_push(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
    """Append ``value`` to the tail of the broker queue stored under ``key``.

    Redis backends use ``rpush`` on the namespaced key and, when ``ttl`` is
    truthy, ``expire``. Other backends keep the whole queue as a list in a
    single cache entry: the list is read, extended and written back with
    ``ttl``. A stored value that is not a list is replaced by a fresh queue.
    """
    if self.broker_backend_type() == 'redis' and hasattr(self.backend, 'redis'):
        full_key = self.broker_key(key)
        self.backend.redis.rpush(full_key, _cache_encode(value))
        if ttl:
            self.backend.redis.expire(full_key, ttl)
        return

    # NOTE(review): this read-modify-write is not atomic; concurrent writers
    # could overwrite each other's items - confirm callers serialize access.
    stored = self.broker_get(key) or []
    pending = stored if isinstance(stored, list) else []
    pending.append(value)
    self.broker_set(key, pending, ttl=ttl)
def broker_pop_nowait(self, key: str) -> Optional[Any]:
    """Remove and return the head of the broker queue for ``key`` without waiting.

    Redis backends use ``lpop`` and decode the raw value. Other backends read
    the list stored under ``key``, take its first element and either write the
    remainder back or delete the entry when the queue became empty.

    Returns:
        The popped item, or ``None`` when the queue is missing, empty or not a
        list.
    """
    if self.broker_backend_type() == 'redis' and hasattr(self.backend, 'redis'):
        raw = self.backend.redis.lpop(self.broker_key(key))
        return None if raw is None else _cache_decode(raw)

    pending = self.broker_get(key)
    if not (isinstance(pending, list) and pending):
        return None

    head = pending.pop(0)
    if not pending:
        self.broker_delete(key)
        return head
    # NOTE(review): the remainder is written back without a ttl, so a ttl
    # given to broker_push is not carried over here - confirm this is intended.
    self.broker_set(key, pending)
    return head
def broker_list_keys(self, pattern: str) -> list[str]:
    """List the broker keys matching a glob-style ``pattern``.

    Only Redis backends can enumerate keys; every other backend yields an
    empty list.

    Args:
        pattern: Un-prefixed pattern; it is namespaced with ``broker_key``
            before being sent to Redis.

    Returns:
        The matching keys as ``str``, including the namespace prefix, without
        duplicates.
    """
    if self.broker_backend_type() != 'redis' or not hasattr(self.backend, 'redis'):
        return []

    full_pattern = self.broker_key(pattern)
    # SCAN walks the keyspace incrementally, whereas KEYS blocks the Redis
    # server until the whole keyspace has been examined. SCAN may report a key
    # more than once, so results are de-duplicated while preserving order.
    unique_keys: dict[str, None] = {}
    for key in self.backend.redis.scan_iter(match=full_pattern):
        if isinstance(key, bytes):
            key = key.decode('utf-8')
        unique_keys[str(key)] = None
    return list(unique_keys)
def broker_node_id(self) -> str:
    """Return this cache manager's node identifier, creating it on first use.

    The identifier is a random UUID4 string kept on the instance. It is
    deliberately not stored in the cache backend: with a backend shared
    between nodes, a fixed cache key would hand every node the same value.

    Returns:
        The identifier; repeated calls on one instance return the same string.
    """
    node_id = getattr(self, '_broker_node_id', None)
    if not node_id:
        node_id = str(uuid.uuid4())
        self._broker_node_id = node_id
    return node_id
# Numpy-specific methods
def save_numpy_array(self, key: str, array: Any, metadata: Optional[Dict] = None) -> None:
"""Save numpy array (fallback to file-based even with Redis)"""
......@@ -1676,4 +1765,4 @@ def initialize_response_cache(config: Optional[Dict] = None):
"""Initialize the response cache system"""
global _response_cache
_response_cache = ResponseCache(config)
logger.info("Response cache initialized")
\ No newline at end of file
logger.info("Response cache initialized")
This diff is collapsed.
......@@ -47,23 +47,51 @@ def _load_user_provider_config(user_id: int, provider_id: str) -> Optional[Dict[
return None
def resolve_coderai_provider_owner(provider_id: str) -> Tuple[Optional[int], Optional[Dict[str, Any]]]:
global_config = _load_global_provider_config(provider_id)
if global_config and global_config.get('type') == 'coderai':
return None, global_config
def _iter_user_coderai_matches(provider_id: str) -> list[Tuple[int, Dict[str, Any]]]:
try:
db = DatabaseRegistry.get_config_database()
if not db:
return None, None
return []
matches: list[Tuple[int, Dict[str, Any]]] = []
for row in db.get_all_user_providers():
if row.get('provider_id') != provider_id:
continue
provider_config = row.get('config')
if isinstance(provider_config, dict) and provider_config.get('type') == 'coderai':
return row.get('user_id'), provider_config
user_id = row.get('user_id')
if user_id is not None:
matches.append((user_id, provider_config))
return matches
except Exception as e:
logger.debug(f"Failed to resolve user owner for provider={provider_id}: {e}")
return []
def resolve_coderai_provider_owner(provider_id: str, username: Optional[str] = None) -> Tuple[Optional[int], Optional[Dict[str, Any]]]:
user_matches = _iter_user_coderai_matches(provider_id)
if username and username != 'global':
try:
db = DatabaseRegistry.get_config_database()
user = db.get_user_by_username(username) if db else None
if user:
requested_user_id = user.get('id')
for user_id, provider_config in user_matches:
if user_id == requested_user_id:
return user_id, provider_config
return None, None
except Exception as e:
logger.debug(f"Failed to resolve user by username for provider={provider_id} username={username}: {e}")
return None, None
if user_matches:
if len(user_matches) == 1:
return user_matches[0]
logger.warning(f"Ambiguous user-scoped coderai provider owner for provider_id={provider_id}; refusing global fallback")
return None, None
global_config = _load_global_provider_config(provider_id)
if global_config and global_config.get('type') == 'coderai':
return None, global_config
return None, None
......@@ -80,8 +108,8 @@ def resolve_coderai_provider_for_user(user_id: Optional[int], provider_id: str)
return None
def resolve_coderai_registration(provider_id: str) -> Tuple[Optional[int], Optional[Dict[str, Any]], Optional[str]]:
owner_user_id, provider_config = resolve_coderai_provider_owner(provider_id)
def resolve_coderai_registration(provider_id: str, username: Optional[str] = None) -> Tuple[Optional[int], Optional[Dict[str, Any]], Optional[str]]:
owner_user_id, provider_config = resolve_coderai_provider_owner(provider_id, username=username)
if not provider_config:
return None, None, None
coderai_config = provider_config.get('coderai_config') or {}
......@@ -91,8 +119,8 @@ def resolve_coderai_registration(provider_id: str) -> Tuple[Optional[int], Optio
return owner_user_id, provider_config, registration_token
def validate_coderai_registration_token(provider_id: str, presented_token: Optional[str]) -> Tuple[bool, Optional[int], Optional[Dict[str, Any]], Optional[str]]:
owner_user_id, provider_config, expected_token = resolve_coderai_registration(provider_id)
def validate_coderai_registration_token(provider_id: str, presented_token: Optional[str], username: Optional[str] = None) -> Tuple[bool, Optional[int], Optional[Dict[str, Any]], Optional[str]]:
owner_user_id, provider_config, expected_token = resolve_coderai_registration(provider_id, username=username)
if not provider_config:
return False, None, None, 'Provider not found or not a coderai provider'
if expected_token and presented_token != expected_token:
......
......@@ -23,6 +23,7 @@ Why did the programmer quit his job? Because he didn't get arrays!
Request handlers for AISBF.
"""
import asyncio
import base64
import re
import uuid
import hashlib
......@@ -33,7 +34,7 @@ from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Optional, Union
from pathlib import Path
from fastapi import HTTPException, Request
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.responses import JSONResponse, StreamingResponse, Response
from .models import ChatCompletionRequest, ChatCompletionResponse
from .providers import get_provider_handler, RateLimitError
from .config import config
......@@ -1744,6 +1745,31 @@ class RequestHandler:
import logging
logger = logging.getLogger(__name__)
body = self._adapt_studio_workflow_payload(provider_id, endpoint_path, body)
incoming_headers = {
key: value for key, value in request.headers.items()
if key.lower() not in {"host", "content-length", "connection"}
}
query_params = dict(request.query_params)
content_type = request.headers.get('content-type', '')
multipart_payload = None
if 'multipart/form-data' in content_type.lower():
form = await request.form()
fields = []
files = []
for key, value in form.multi_items():
filename = getattr(value, 'filename', None)
if filename is not None:
content = await value.read()
files.append({
'name': key,
'filename': filename,
'content_type': getattr(value, 'content_type', None),
'data_base64': base64.b64encode(content).decode('ascii'),
})
else:
fields.append({'name': key, 'value': str(value)})
multipart_payload = {'fields': fields, 'files': files}
# Support user-defined providers (dict format) and global providers (object format)
if self.user_id and provider_id in self.user_providers:
......@@ -1773,13 +1799,50 @@ class RequestHandler:
logger.info(f"Generic proxy [{method}]: {provider_id} -> {url}")
try:
provider_type = provider_config.get('type') if isinstance(provider_config, dict) else getattr(provider_config, 'type', None)
if provider_type == 'coderai':
provider_handler = get_provider_handler(provider_id, config_api_key, user_id=self.user_id)
wants_stream = 'text/event-stream' in request.headers.get('accept', '').lower() or endpoint_path in {"v1/audio/progress", "v1/video/progress", "v1/images/progress"}
status_code, payload = await provider_handler.proxy_native_request(
endpoint_path,
body,
method=method,
headers=incoming_headers,
query_params=query_params,
content_type=content_type,
multipart=multipart_payload,
stream=wants_stream,
)
response_headers = payload.get('headers') or {}
if payload.get('stream_chunks'):
media_type = payload.get('content_type') or 'text/event-stream'
async def iter_stream():
for chunk in payload.get('stream_chunks', []):
yield base64.b64decode(chunk)
return StreamingResponse(iter_stream(), status_code=status_code, media_type=media_type, headers=response_headers)
if payload.get('body_base64'):
media_type = payload.get('content_type') or 'application/octet-stream'
return Response(content=base64.b64decode(payload['body_base64']), status_code=status_code, media_type=media_type, headers=response_headers)
return JSONResponse(status_code=status_code, content=payload.get('body', payload), headers=response_headers)
async with httpx.AsyncClient(timeout=300) as client:
if method == "GET":
resp = await client.get(url, headers=headers)
resp = await client.get(url, headers=headers, params=query_params)
elif method == "DELETE":
resp = await client.delete(url, headers=headers)
resp = await client.delete(url, headers=headers, params=query_params)
else:
resp = await client.post(url, json=body, headers=headers)
if multipart_payload is not None:
files = []
data = []
for field in multipart_payload['fields']:
data.append((field['name'], field['value']))
for file_entry in multipart_payload['files']:
files.append((file_entry['name'], (file_entry['filename'], base64.b64decode(file_entry['data_base64']), file_entry.get('content_type') or 'application/octet-stream')))
resp = await client.post(url, data=data, files=files, headers={k: v for k, v in headers.items() if k.lower() != 'content-type'}, params=query_params)
else:
resp = await client.post(url, json=body, headers=headers, params=query_params)
try:
content = resp.json()
except Exception:
......
......@@ -28,6 +28,7 @@ import json
import logging
import time
from typing import Any, AsyncIterator, Dict, List, Optional, Tuple, Union
import base64
from urllib.parse import urlparse
import httpx
......@@ -36,6 +37,7 @@ from openai import OpenAI
from ..coderai_broker import broker as coderai_broker
from ..config import config
from ..models import Model
from ..app.templates import get_base_url
from .base import AISBF_DEBUG, BaseProviderHandler
......@@ -52,8 +54,10 @@ class CoderAIProviderHandler(BaseProviderHandler):
self._coderai_config = self._resolve_coderai_config()
self._transport = str(self._coderai_config.get("transport") or self._infer_transport(self._raw_endpoint)).lower()
self._client_id = self._coderai_config.get("client_id") or provider_id
self._username = self._coderai_config.get("username") or ("global" if user_id is None else None)
self._bridge_path = str(self._coderai_config.get("bridge_path") or "/coderai/ws").strip() or "/coderai/ws"
self._registration_path = str(self._coderai_config.get("registration_path") or "/coderai/register").strip() or "/coderai/register"
self._broker_ws_path = str(self._coderai_config.get("broker_ws_path") or "/api/coderai/wss").strip() or "/api/coderai/wss"
self._request_timeout = float(self._coderai_config.get("request_timeout") or 300.0)
self._model_timeout = float(self._coderai_config.get("model_timeout") or 30.0)
self._websocket_enabled = bool(self._coderai_config.get("websocket_enabled", True))
......@@ -159,6 +163,8 @@ class CoderAIProviderHandler(BaseProviderHandler):
"x-coderai-client-id": self._client_id,
"x-coderai-provider-id": self.provider_id,
}
if self._username:
headers["x-coderai-username"] = self._username
token = self._bridge_token or self._registration_token or self.api_key
if token:
headers["authorization"] = f"Bearer {token}"
......@@ -171,6 +177,8 @@ class CoderAIProviderHandler(BaseProviderHandler):
"X-CoderAI-Client-Id": self._client_id,
"X-CoderAI-Provider-Id": self.provider_id,
}
if self._username:
headers["X-CoderAI-Username"] = self._username
token = self._bridge_token or self._registration_token or self.api_key
if token:
headers["Authorization"] = f"Bearer {token}"
......@@ -211,21 +219,49 @@ class CoderAIProviderHandler(BaseProviderHandler):
status = message.get("status") or "ok"
if status == "error":
raise Exception(message.get("error") or "CoderAI broker bridge error")
payload_data = message.get("payload") or {}
chunk = payload_data.get("chunk")
if message.get("event") == "chunk" and chunk is not None:
if isinstance(chunk, str):
yield chunk.encode("utf-8")
elif isinstance(chunk, bytes):
yield chunk
elif isinstance(chunk, dict):
yield f"data: {json.dumps(chunk)}\n\n".encode("utf-8")
return
for item in payload_data.get("chunks", []):
if isinstance(item, str):
yield item.encode("utf-8")
elif isinstance(item, dict):
yield f"data: {json.dumps(item)}\n\n".encode("utf-8")
async for chunk in self._iter_broker_stream_chunks(message, timeout):
yield chunk
@staticmethod
def _decode_broker_chunk(chunk: Any) -> bytes:
if isinstance(chunk, bytes):
return chunk
if isinstance(chunk, str):
return chunk.encode("utf-8")
if isinstance(chunk, dict):
if isinstance(chunk.get("data_base64"), str):
return base64.b64decode(chunk["data_base64"])
if chunk.get("encoding") == "base64" and isinstance(chunk.get("data"), str):
return base64.b64decode(chunk["data"])
if "chunk" in chunk:
return CoderAIProviderHandler._decode_broker_chunk(chunk.get("chunk"))
return f"data: {json.dumps(chunk)}\n\n".encode("utf-8")
return str(chunk).encode("utf-8")
async def _iter_broker_stream_chunks(self, initial_message: Dict[str, Any], timeout: float) -> AsyncIterator[bytes]:
message = initial_message
while True:
status = message.get("status") or "ok"
if status == "error":
raise Exception(message.get("error") or "CoderAI broker bridge error")
event = message.get("event")
payload_data = message.get("payload") or {}
if event in {"chunk", "progress", "output", "log", "data"}:
chunk = payload_data.get("chunk", payload_data)
yield self._decode_broker_chunk(chunk)
elif isinstance(payload_data.get("chunks"), list):
for item in payload_data.get("chunks", []):
yield self._decode_broker_chunk(item)
if event in {None, "done", "completed"}:
return
next_request_id = message.get("request_id")
if not next_request_id:
return
message = await coderai_broker.wait_for_stream_event(next_request_id, timeout=timeout)
def validate_credentials(self) -> bool:
if self._transport == "websocket" and not self._websocket_enabled:
......@@ -478,23 +514,46 @@ class CoderAIProviderHandler(BaseProviderHandler):
return message.get("payload") or {}
return await self._http_json("POST", self._registration_path, payload, timeout=self._model_timeout)
async def proxy_native_request(self, endpoint_path: str, body: Optional[Dict[str, Any]] = None, method: str = "POST") -> Tuple[int, Dict[str, Any]]:
async def proxy_native_request(
self,
endpoint_path: str,
body: Optional[Dict[str, Any]] = None,
method: str = "POST",
headers: Optional[Dict[str, str]] = None,
query_params: Optional[Dict[str, Any]] = None,
content_type: Optional[str] = None,
multipart: Optional[Dict[str, Any]] = None,
stream: bool = False,
) -> Tuple[int, Dict[str, Any]]:
payload = {
"endpoint_path": endpoint_path,
"method": method.upper(),
"body": body or {},
"headers": headers or {},
"query_params": query_params or {},
}
if content_type:
payload["content_type"] = content_type
if multipart is not None:
payload["multipart"] = multipart
if stream:
payload["stream"] = True
if await self._use_broker():
if stream:
chunks = []
async for chunk in self._broker_stream("proxy", payload, timeout=self._request_timeout):
chunks.append(chunk)
return 200, {"stream_chunks": [base64.b64encode(chunk).decode("ascii") for chunk in chunks], "stream_encoding": "base64"}
message = await self._broker_request("proxy", payload, timeout=self._request_timeout)
if (message.get("status") or "ok") == "error":
raise Exception(message.get("error") or "CoderAI broker proxy request failed")
envelope = message.get("payload") or {}
return int(envelope.get("status_code") or 200), envelope.get("body") or {}
return int(envelope.get("status_code") or 200), envelope
if self._transport == "websocket":
message = await self._ws_roundtrip("proxy", payload, timeout=self._request_timeout)
if (message.get("status") or "ok") == "error":
raise Exception(message.get("error") or "CoderAI proxy request failed")
envelope = message.get("payload") or {}
return int(envelope.get("status_code") or 200), envelope.get("body") or {}
return int(envelope.get("status_code") or 200), envelope
response = await self._http_json(method.upper(), endpoint_path, body or {}, timeout=self._request_timeout)
return 200, response
......@@ -461,6 +461,18 @@ async def _generic_proxy(request: Request, body: dict, endpoint_path: str, metho
return await handler.handle_generic_proxy(request, provider_id, endpoint_path, body, method=method)
async def _generic_progress_proxy(request: Request, endpoint_path: str) -> JSONResponse:
    """Forward a studio progress poll to the provider named by the query string.

    The provider is taken from the ``provider`` query parameter or, when that
    is empty, resolved from the ``model`` parameter. With a provider the request
    is proxied as a GET; without one an idle progress document is returned.
    """
    user_id = getattr(request.state, 'user_id', None)
    handler = _get_user_handler('request', user_id)
    params = request.query_params
    provider = params.get('provider', '').strip()
    model = params.get('model', '').strip()
    if model and not provider:
        provider, _ = _resolve_provider(model, user_id=user_id, handler=handler)
    if not provider:
        return {"active": False, "current": 0, "total": 0, "pct": 0, "elapsed": 0}
    return await handler.handle_generic_proxy(request, provider, endpoint_path, {}, method="GET")
# ── Images ────────────────────────────────────────────────────────────────────
@router.post("/api/v1/images/edits")
......@@ -649,20 +661,20 @@ def _studio_scope(request: Request) -> tuple[str, Optional[int]]:
@router.get("/v1/audio/progress")
@router.get("/api/v1/audio/progress")
async def studio_audio_progress():
return {"active": False, "current": 0, "total": 0, "pct": 0, "elapsed": 0}
async def studio_audio_progress(request: Request):
return await _generic_progress_proxy(request, "v1/audio/progress")
@router.get("/v1/video/progress")
@router.get("/api/v1/video/progress")
async def studio_video_progress():
return {"active": False, "current": 0, "total": 0, "pct": 0, "elapsed": 0}
async def studio_video_progress(request: Request):
return await _generic_progress_proxy(request, "v1/video/progress")
@router.get("/v1/images/progress")
@router.get("/api/v1/images/progress")
async def studio_images_progress():
return {"active": False, "current": 0, "total": 0, "pct": 0, "elapsed": 0}
async def studio_images_progress(request: Request):
return await _generic_progress_proxy(request, "v1/images/progress")
@router.get("/v1/archive")
......
......@@ -3,6 +3,7 @@ from __future__ import annotations
import json
import logging
import time
import asyncio
from typing import Optional
from fastapi import APIRouter, HTTPException, Query, WebSocket, WebSocketDisconnect
......@@ -16,17 +17,18 @@ router = APIRouter()
logger = logging.getLogger(__name__)
@router.websocket("/api/coderai/broker/ws")
async def coderai_broker_websocket(websocket: WebSocket):
async def _coderai_broker_websocket_impl(websocket: WebSocket, scope_name: str):
provider_id = websocket.query_params.get("provider_id") or websocket.headers.get("x-coderai-provider-id") or "coderai"
client_id = websocket.query_params.get("client_id") or websocket.headers.get("x-coderai-client-id") or f"anon-{int(time.time())}"
username = websocket.query_params.get("username") or websocket.headers.get("x-coderai-username") or scope_name
presented_token = websocket.query_params.get("registration_token") or websocket.headers.get("x-coderai-registration-token")
valid, owner_user_id, provider_config, error = validate_coderai_registration_token(provider_id, presented_token)
valid, owner_user_id, provider_config, error = validate_coderai_registration_token(provider_id, presented_token, username=username)
if not valid:
await websocket.close(code=1008, reason=error or "registration rejected")
return
await websocket.accept()
session = await broker.register(websocket, provider_id, client_id, metadata={"source": "websocket", "owner_user_id": owner_user_id})
expected_scope = scope_name
session = await broker.register(websocket, provider_id, client_id, metadata={"source": "websocket", "owner_user_id": owner_user_id, "username": username, "scope_name": expected_scope, "proxy_scheme": websocket.url.scheme})
try:
await websocket.send_text(json.dumps({
......@@ -35,59 +37,71 @@ async def coderai_broker_websocket(websocket: WebSocket):
"session_id": session.session_id,
"provider_id": session.provider_id,
"client_id": session.client_id,
"username": username,
"scope_name": expected_scope,
"accepted": True,
}))
while True:
raw = await websocket.receive_text()
message = json.loads(raw)
op = message.get("op")
if op == "register":
payload = message.get("payload") or {}
payload_token = payload.get("registration_token") or message.get("registration_token")
if payload_token and payload_token != presented_token:
try:
raw = await asyncio.wait_for(websocket.receive_text(), timeout=1.0)
message = json.loads(raw)
op = message.get("op")
if op == "register":
payload = message.get("payload") or {}
payload_token = payload.get("registration_token") or message.get("registration_token")
if payload_token and payload_token != presented_token:
await websocket.send_text(json.dumps({
"v": 1,
"request_id": message.get("request_id"),
"status": "error",
"error": "Registration token mismatch",
}))
continue
capabilities = payload.get("capabilities") or message.get("capabilities") or {}
metadata = {
"endpoint": payload.get("endpoint"),
"transport": payload.get("transport"),
"studio_endpoints": payload.get("studio_endpoints") or [],
"owner_user_id": owner_user_id,
"username": username,
"scope_name": expected_scope,
"proxy_scheme": websocket.url.scheme,
}
await broker.touch(session.session_id, metadata=metadata, capabilities=capabilities)
await websocket.send_text(json.dumps({
"v": 1,
"request_id": message.get("request_id"),
"status": "error",
"error": "Registration token mismatch",
"status": "ok",
"payload": {
"accepted": True,
"session_id": session.session_id,
"provider_id": session.provider_id,
"client_id": session.client_id,
"owner_user_id": owner_user_id,
"username": username,
"scope_name": expected_scope,
"expires_at": int(time.time()) + 86400,
},
}))
continue
capabilities = payload.get("capabilities") or message.get("capabilities") or {}
metadata = {
"endpoint": payload.get("endpoint"),
"transport": payload.get("transport"),
"studio_endpoints": payload.get("studio_endpoints") or [],
"owner_user_id": owner_user_id,
}
await broker.touch(session.session_id, metadata=metadata, capabilities=capabilities)
await websocket.send_text(json.dumps({
"v": 1,
"request_id": message.get("request_id"),
"status": "ok",
"payload": {
"accepted": True,
"session_id": session.session_id,
"provider_id": session.provider_id,
"client_id": session.client_id,
"owner_user_id": owner_user_id,
"expires_at": int(time.time()) + 86400,
},
}))
continue
if op == "heartbeat":
await broker.touch(session.session_id, metadata=message.get("payload") or {})
await websocket.send_text(json.dumps({
"v": 1,
"request_id": message.get("request_id"),
"status": "ok",
"event": "heartbeat",
"payload": {"ts": int(time.time())},
}))
if op == "heartbeat":
await broker.touch(session.session_id, metadata=message.get("payload") or {})
await websocket.send_text(json.dumps({
"v": 1,
"request_id": message.get("request_id"),
"status": "ok",
"event": "heartbeat",
"payload": {"ts": int(time.time())},
}))
continue
await broker.touch(session.session_id)
await broker.publish_response(message)
except asyncio.TimeoutError:
queued = await broker.consume_request(session.session_id, timeout=1)
if queued is not None:
await websocket.send_text(json.dumps(queued))
await broker.touch(session.session_id, metadata={"proxy_scheme": websocket.url.scheme, "username": username, "scope_name": expected_scope})
continue
await broker.touch(session.session_id)
resolved = await broker.resolve_response(message)
if not resolved:
logger.debug(f"CoderAI broker received unmatched message from provider={provider_id} client={client_id}: {message.get('request_id')}")
except WebSocketDisconnect:
logger.info(f"CoderAI broker disconnected provider={provider_id} client={client_id}")
except Exception as e:
......@@ -97,6 +111,16 @@ async def coderai_broker_websocket(websocket: WebSocket):
await broker.fail_session_requests(session.session_id, f"CoderAI session '{session.session_id}' disconnected")
@router.websocket("/api/coderai/wss")
async def coderai_broker_websocket_global(websocket: WebSocket):
    """Broker WebSocket route for CoderAI clients connecting in the ``global`` scope."""
    scope_name = "global"
    await _coderai_broker_websocket_impl(websocket, scope_name)
@router.websocket("/api/u/{username}/coderai/wss")
async def coderai_broker_websocket_user(websocket: WebSocket, username: str):
    """Broker WebSocket route for CoderAI clients connecting under a user's path.

    The ``username`` path segment is handed to the shared implementation as the
    scope name.
    """
    scope_name = username
    await _coderai_broker_websocket_impl(websocket, scope_name)
@router.get("/api/coderai/broker/sessions")
async def coderai_broker_sessions():
return {"sessions": await broker.list_sessions()}
......
......@@ -469,6 +469,18 @@ async def _user_delete_proxy(request: Request, username: str, provider: str, end
return await handler.handle_generic_proxy(request, provider, endpoint_path, {}, method="DELETE")
async def _user_progress_proxy(request: Request, username: str, endpoint_path: str) -> JSONResponse:
    """Forward a user-scoped studio progress poll to the selected provider.

    Access is checked for ``username`` first. The provider comes from the
    ``provider`` query parameter or, when that is empty, is parsed out of the
    ``model`` parameter. With a provider the request is proxied as a GET;
    without one an idle progress document is returned.
    """
    user_id = _check_user_access(request, username)
    handler = _get_user_handler('request', user_id)
    params = request.query_params
    provider = params.get('provider', '').strip()
    model = params.get('model', '').strip()
    if model and not provider:
        provider, _ = parse_provider_from_model(model)
    if not provider:
        return {"active": False, "current": 0, "total": 0, "pct": 0, "elapsed": 0}
    return await handler.handle_generic_proxy(request, provider, endpoint_path, {}, method="GET")
# ── Audio ─────────────────────────────────────────────────────────────────────
@router.post("/api/u/{username}/audio/transcriptions")
......@@ -511,6 +523,11 @@ async def user_audio_clone(request: Request, username: str, body: dict):
async def user_audio_convert(request: Request, username: str, body: dict):
return await _user_generic_proxy(request, username, body, "v1/audio/convert")
@router.get("/api/u/{username}/audio/progress")
async def user_audio_progress(request: Request, username: str):
    """User-scoped audio progress route; proxies ``v1/audio/progress``."""
    upstream_path = "v1/audio/progress"
    return await _user_progress_proxy(request, username, upstream_path)
@router.post("/api/u/{username}/audio/identify")
async def user_audio_identify(request: Request, username: str, body: dict):
return await _user_generic_proxy(request, username, body, "v1/audio/identify")
......@@ -615,6 +632,11 @@ async def user_image_from3d(request: Request, username: str, body: dict):
return await _user_generic_proxy(request, username, body, "v1/images/from3d")
@router.get("/api/u/{username}/images/progress")
async def user_images_progress(request: Request, username: str):
    """User-scoped image progress route; proxies ``v1/images/progress``."""
    upstream_path = "v1/images/progress"
    return await _user_progress_proxy(request, username, upstream_path)
# ── Video ─────────────────────────────────────────────────────────────────────
@router.post("/api/u/{username}/video/generations")
......@@ -662,6 +684,11 @@ async def user_video_from3d(request: Request, username: str, body: dict):
return await _user_generic_proxy(request, username, body, "v1/video/from3d")
@router.get("/api/u/{username}/video/progress")
async def user_video_progress(request: Request, username: str):
    """User-scoped video progress route; proxies ``v1/video/progress``."""
    upstream_path = "v1/video/progress"
    return await _user_progress_proxy(request, username, upstream_path)
# ── Embeddings ────────────────────────────────────────────────────────────────
@router.post("/api/u/{username}/embeddings")
......
This diff is collapsed.
# CoderAI <-> AISBF Integration Contract
# CoderAI Broker Implementation Reference
## Goal
## Purpose
Enable `coderai` to appear in AISBF as a first-class provider with two modes:
This document is the single source of truth for implementing the CoderAI side of the AISBF broker and bridge integration.
1. **Direct local HTTP mode**: AISBF talks to a local OpenAI-compatible CoderAI server.
2. **NAT-friendly WebSocket bridge mode**: CoderAI connects outward and exchanges OpenAI-compatible requests/responses through framed WebSocket messages.
The target audience is another LLM or engineer implementing CoderAI, not AISBF.
CoderAI is the upstream implementation source for Studio-native endpoints, so capability and endpoint discovery should be explicit and machine-readable.
This document is mirrored in `docs/coderai-broker-implementation-reference.md` and should be kept identical in purpose and protocol coverage.
## AISBF broker mode
AISBF now includes a public broker-side WebSocket endpoint for outbound-only NAT traversal.
- Broker WebSocket endpoint: `/api/coderai/broker/ws`
- Broker WebSocket endpoints:
- global scope: `/api/coderai/wss`
- user scope: `/api/u/{username}/coderai/wss`
- Broker session status endpoint: `/api/coderai/broker/providers/{provider_id}/status`
- Broker session listing endpoint: `/api/coderai/broker/sessions`
......@@ -28,6 +30,13 @@ Registration tokens are resolved from the owning provider configuration. This me
- each user configures the token for their own user-scoped `coderai` providers
- a broker session is only usable by requests belonging to the same owner principal
Broker registration is now scope-aware:
- global providers register with `username=global`
- user-owned providers register with `username=<aisbf_username>`
- the same scoped path must be used by the CoderAI client when connecting over WebSocket
- deployments behind TLS termination or reverse proxies must connect with the externally visible `wss://...` URL and preserve proxy headers so AISBF can remain scheme-aware
The AISBF dashboard now exposes this token directly inside each `coderai` provider configuration:
- token input is stored in `coderai_config.registration_token`
......@@ -83,6 +92,7 @@ Use provider type:
- For `transport=http`, AISBF uses the OpenAI Python client against `endpoint + /v1`.
- For `transport=websocket`, AISBF uses a WebSocket bridge and sends framed JSON envelopes.
- AISBF uses `models.list`, `chat.completions`, `capabilities`, `register`, and `proxy` bridge operations.
- `proxy` now supports arbitrary forwarded request headers, query params, multipart form payloads, binary/base64 bodies, progress polling endpoints, and non-chat streaming event envelopes for long-running jobs.
- AISBF treats `coderai` like an OpenAI-style Studio adapter family.
- AISBF can also forward arbitrary Studio-native endpoints through `proxy` when the provider transport is WebSocket.
- AISBF validates that broker-enabled `coderai` providers have a non-empty `registration_token`.
......@@ -189,11 +199,18 @@ When CoderAI dials AISBF broker directly, it should connect using:
- `provider_id=<provider_id>`
- `client_id=<client_id>`
- `username=<username-or-global>`
Example:
```text
wss://your-aisbf.example/api/coderai/broker/ws?provider_id=coderai&client_id=workstation-01&registration_token=<owner-configured-token>
wss://your-aisbf.example/api/coderai/wss?provider_id=coderai&client_id=workstation-01&username=global&registration_token=<owner-configured-token>
```
User-scoped example:
```text
wss://your-aisbf.example/api/u/alice/coderai/wss?provider_id=my-coderai&client_id=workstation-01&username=alice&registration_token=<owner-configured-token>
```
### Envelope format
......@@ -297,7 +314,7 @@ Important:
- This keeps AISBF transport-simple and lets CoderAI own protocol correctness.
- Include `data: [DONE]\n\n` as one of the streamed chunks when the upstream semantics require it.
## Broker session visibility and persistence
## Broker session visibility, persistence, and multi-node routing
AISBF now tracks two broker states:
......@@ -306,6 +323,15 @@ AISBF now tracks two broker states:
Persisted metadata is dashboard-facing only. It is used to show the last known session details after restart, but it is not treated as an active transport path until CoderAI reconnects.
For multi-node AISBF deployments behind a reverse proxy / load balancer:
- session status and ownership metadata are stored in the configured AISBF cache backend
- requests are enqueued into cache-backed broker queues keyed by broker session id
- the AISBF node holding the live WebSocket consumes queued requests and forwards them to CoderAI
- replies are written back through cache-backed reply keys so the AISBF node that originated the request can receive the result
Redis is the preferred backend for this distributed mode. SQLite/MySQL can operate as polling-based fallbacks. Memory/file cache backends are not suitable for cross-node broker routing.
Expected behavior:
- after reconnect, the persisted snapshot is refreshed with the new live session details
......@@ -393,10 +419,22 @@ Request payload:
{
"endpoint_path": "v1/video/dub",
"method": "POST",
"headers": {
"x-request-id": "studio-job-123",
"accept": "text/event-stream"
},
"query_params": {
"job_id": "dub_123"
},
"body": {
"model": "local-video-model",
"input": "Dub this clip to Italian"
}
},
"multipart": {
"fields": [{"name": "model", "value": "whisper-large"}],
"files": [{"name": "file", "filename": "sample.wav", "content_type": "audio/wav", "data_base64": "<base64>"}]
},
"stream": true
}
```
......@@ -405,6 +443,9 @@ Response payload:
```json
{
"status_code": 200,
"headers": {
"content-type": "application/json"
},
"body": {
"job_id": "dub_123",
"status": "queued"
......@@ -412,6 +453,37 @@ Response payload:
}
```
Binary response payloads may instead use:
```json
{
"status_code": 200,
"content_type": "audio/mpeg",
"body_base64": "<base64>",
"headers": {
"content-disposition": "attachment; filename=preview.mp3"
}
}
```
Streaming and progress responses may emit multiple envelopes with `event` values like `progress`, `output`, `log`, `data`, `chunk`, and finally `done` or `completed`.
Recommended progress chunk payload:
```json
{
"v": 1,
"request_id": "coderai-1746960000000",
"status": "ok",
"event": "progress",
"payload": {
"chunk": "event: progress\ndata: {\"active\":true,\"current\":5,\"total\":20,\"pct\":25,\"elapsed\":12}\n\n"
}
}
```
Capability advertisements should include endpoint metadata for custom pipelines, including supported methods, streaming mode, expected input/output modalities, and whether multipart or binary transport is required.
## Recommended CoderAI architecture
### Server components
......
......@@ -48,8 +48,8 @@ class build_py(_build_py):
in site-packages/aisbf/_share/ and can be extracted by cli.py on first run.
"""
_SHARE_FILES = ['main.py', 'requirements.txt', 'aisbf.sh', 'DOCUMENTATION.md', 'README.md', 'LICENSE.txt']
_SHARE_DIRS = ['templates', 'static', 'config', 'aisbf']
_SHARE_FILES = ['main.py', 'requirements.txt', 'aisbf.sh', 'DOCUMENTATION.md', 'README.md', 'LICENSE.txt', 'AI.PROMPT']
_SHARE_DIRS = ['templates', 'static', 'config', 'aisbf', 'docs']
def run(self):
self._populate_share()
......@@ -149,6 +149,7 @@ setup(
'DOCUMENTATION.md',
'README.md',
'LICENSE.txt',
'AI.PROMPT',
'config/providers.json',
'config/rotations.json',
'config/autoselect.json',
......@@ -158,6 +159,9 @@ setup(
'config/STUDIO_SYSTEM.md',
'config/aisbf.json',
]),
('share/aisbf/docs', [
'docs/coderai-integration.md',
]),
# Install aisbf package to share directory for venv installation
# Main aisbf module files
('share/aisbf/aisbf', [
......
import json
import base64
from unittest.mock import Mock
import pytest
......@@ -150,3 +151,42 @@ async def test_coderai_websocket_stream_emits_sse_bytes(monkeypatch):
assert chunks[0].startswith(b"data: ")
assert chunks[-1] == b"data: [DONE]\n\n"
@pytest.mark.asyncio
async def test_coderai_broker_stream_supports_progress_and_binary_chunks(monkeypatch):
    """A streamed broker ``proxy`` call returns base64-encoded stream chunks.

    The broker is stubbed out: the first envelope is a ``progress`` event whose
    chunk carries a base64-encoded SSE frame, and the follow-up stream event is
    ``done``. The handler is expected to report HTTP 200, mark the stream
    encoding as ``base64``, and expose the progress frame as the first chunk.
    """
    provider_config = {
        "id": "coderai_nat",
        "name": "CoderAI NAT",
        "endpoint": "wss://broker.example.test/coderai/ws",
        "type": "coderai",
        "api_key_required": False,
        "coderai_config": {"transport": "websocket", "bridge_path": "/coderai/ws"},
    }
    handler = CoderAIProviderHandler("coderai_nat", provider_config=provider_config)

    # A well-formed SSE frame uses real newlines as field/frame separators.
    # The previous fixture escaped them twice (``\\n``), which encoded literal
    # backslash-n characters instead of an actual progress frame.
    sse_frame = b"event: progress\ndata: {\"pct\":25}\n\n"
    encoded_frame = base64.b64encode(sse_frame).decode("ascii")

    async def fake_broker_request(op, payload, timeout=None):
        # Streaming native requests must travel through the ``proxy`` operation.
        assert op == "proxy"
        return {
            "status": "ok",
            "event": "progress",
            "request_id": "req-1",
            "payload": {"chunk": {"data_base64": encoded_frame}},
        }

    async def fake_wait_for_stream_event(request_id, timeout=300.0):
        # The handler must keep waiting on the request id of the first envelope.
        assert request_id == "req-1"
        return {"status": "ok", "event": "done", "request_id": "req-1", "payload": {}}

    async def fake_use_broker():
        return True

    monkeypatch.setattr(handler, "_broker_request", fake_broker_request)
    monkeypatch.setattr(handler, "_use_broker", fake_use_broker)
    monkeypatch.setattr("aisbf.providers.coderai.coderai_broker.wait_for_stream_event", fake_wait_for_stream_event)

    status_code, payload = await handler.proxy_native_request("v1/video/progress", method="GET", stream=True)

    assert status_code == 200
    assert payload["stream_encoding"] == "base64"
    assert base64.b64decode(payload["stream_chunks"][0]).startswith(b"event: progress")
......@@ -238,7 +238,7 @@ def test_dashboard_providers_page_includes_broker_status_for_coderai(monkeypatch
assert response.status_code == 200
assert "Broker Session Status" in response.text
assert "workstation-01" in response.text
assert "v1/images/generate" in response.text
assert "workstation-01" in response.text
finally:
asyncio.run(_clear_broker_sessions())
if original_provider is None:
......@@ -264,7 +264,9 @@ def test_dashboard_api_coderai_broker_sessions_filters_by_owner(monkeypatch):
response = asyncio.run(dashboard_providers.api_coderai_broker_sessions(RequestStub()))
sessions = json.loads(response.body)["sessions"]
assert len(sessions) == 1
assert sessions[0]["provider_id"] == "user-coderai"
connected_sessions = [session for session in sessions if session.get("connected")]
assert any(session["client_id"] == "user-client" for session in connected_sessions)
assert not any(session.get("client_id") == "global-client" for session in connected_sessions)
assert any(session["provider_id"] == "user-coderai" for session in sessions)
finally:
asyncio.run(_clear_broker_sessions())
......@@ -42,9 +42,10 @@ async def _clear_broker_sessions():
def test_broker_registers_websocket_session_and_reports_status():
with TestClient(app) as client:
with client.websocket_connect("/api/coderai/broker/ws?provider_id=coderai&client_id=nat-client&registration_token=global-token") as websocket:
with client.websocket_connect("/api/coderai/wss?provider_id=coderai&client_id=nat-client&registration_token=global-token&username=global") as websocket:
registered = websocket.receive_json()
assert registered["event"] == "registered"
assert registered["scope_name"] == "global"
websocket.send_json({
"op": "register",
......@@ -69,7 +70,7 @@ def test_broker_registers_websocket_session_and_reports_status():
def test_broker_rejects_missing_registration_token():
with TestClient(app) as client:
with pytest.raises(Exception):
with client.websocket_connect("/api/coderai/broker/ws?provider_id=coderai&client_id=bad-client"):
with client.websocket_connect("/api/coderai/wss?provider_id=coderai&client_id=bad-client&username=global"):
pass
......@@ -96,7 +97,97 @@ def test_broker_routes_request_to_registered_session():
assert sent_message["op"] == "models.list"
assert response["status"] == "ok"
assert response["payload"]["data"][0]["id"] == "llama3.1:8b"
queued = await broker.consume_request(session.session_id, timeout=0)
assert queued is None
finally:
await broker.unregister(session.session_id)
asyncio.run(scenario())
def test_broker_uses_queue_for_remote_node_session():
    """Requests for a session tagged with another node id go through the queue.

    The stub socket fails the test if it is written to directly. A concurrent
    responder consumes the queued request and publishes the reply that
    ``send_request`` must return.
    """

    async def scenario():
        class StubWebSocket:
            async def send_text(self, payload: str):
                raise AssertionError("Remote-node session should not use direct websocket fast path")

        stub_socket = StubWebSocket()
        session = await broker.register(
            stub_socket,
            "coderai",
            "remote-client",
            metadata={"owner_user_id": None},
        )
        # Mark the session as owned by a different broker node.
        await broker.touch(session.session_id, metadata={"broker_node_id": "remote-node"})
        try:
            async def responder():
                queued = await broker.consume_request(session.session_id, timeout=1)
                assert queued is not None
                await asyncio.sleep(0)
                reply = {
                    "request_id": queued["request_id"],
                    "status": "ok",
                    "payload": {"data": [{"id": "queue-model"}]},
                }
                await broker.publish_response(reply)

            pending_request = broker.send_request(
                "coderai",
                "models.list",
                {},
                client_id="remote-client",
                owner_user_id=None,
                timeout=3.0,
            )
            response, _ = await asyncio.gather(pending_request, responder())
            first_model = response["payload"]["data"][0]
            assert first_model["id"] == "queue-model"
        finally:
            await broker.unregister(session.session_id)

    asyncio.run(scenario())
def test_broker_stream_events_are_delivered_to_waiter():
    """``send_request`` resolves with the ``done`` envelope of a stream.

    The stub socket answers every outgoing request by publishing a
    ``progress`` envelope followed by a ``done`` envelope for the same
    request id.
    """

    async def scenario():
        class StubWebSocket:
            def __init__(self):
                self.sent = []

            async def send_text(self, payload: str):
                self.sent.append(payload)
                request_id = json.loads(payload)["request_id"]
                stream_events = (
                    ("progress", {"chunk": "event: progress\ndata: {\"pct\": 50}\n\n"}),
                    ("done", {}),
                )
                for event_name, event_payload in stream_events:
                    await broker.publish_response({
                        "request_id": request_id,
                        "status": "ok",
                        "event": event_name,
                        "payload": event_payload,
                    })

        stub_socket = StubWebSocket()
        session = await broker.register(
            stub_socket,
            "coderai",
            "stream-client",
            metadata={"owner_user_id": None},
        )
        try:
            response = await broker.send_request(
                "coderai",
                "proxy",
                {"stream": True},
                client_id="stream-client",
                owner_user_id=None,
                timeout=3.0,
            )
            assert response["event"] == "done"
        finally:
            await broker.unregister(session.session_id)

    asyncio.run(scenario())
def test_user_scoped_broker_websocket_path_registers_username():
    """The user-scoped WebSocket path reports the user's name and scope.

    A temporary ``coderai-user`` provider is installed for the duration of the
    test; whatever entry was there before is restored afterwards.
    """
    provider_key = "coderai-user"
    previous_provider = config.providers.get(provider_key)
    config.providers[provider_key] = ProviderConfig(
        id="coderai-user",
        name="CoderAI User",
        endpoint="http://127.0.0.1:11437",
        type="coderai",
        api_key_required=False,
        rate_limit=0,
        coderai_config={"registration_token": "user-token"},
    )
    ws_url = "/api/u/alice/coderai/wss?provider_id=coderai-user&client_id=user-client&registration_token=user-token&username=alice"
    try:
        with TestClient(app) as client, client.websocket_connect(ws_url) as websocket:
            registered = websocket.receive_json()
            assert registered["username"] == "alice"
            assert registered["scope_name"] == "alice"
    finally:
        # Put the provider registry back exactly as it was found.
        if previous_provider is not None:
            config.providers[provider_key] = previous_provider
        else:
            config.providers.pop(provider_key, None)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment