feat: expand CoderAI broker telemetry

parent 68b2e467
...@@ -16,6 +16,16 @@ _endpoint_model_cache: dict = {} ...@@ -16,6 +16,16 @@ _endpoint_model_cache: dict = {}
_background_tasks: set = set() _background_tasks: set = set()
def _is_broker_only_coderai(provider_config) -> bool:
provider_type = getattr(provider_config, 'type', '')
if provider_type != 'coderai':
return False
coderai_config = getattr(provider_config, 'coderai_config', None) or {}
if not isinstance(coderai_config, dict):
return False
return bool(coderai_config.get('broker_mode', False))
async def fetch_provider_models(provider_id: str, config, user_id: Optional[int] = None) -> list: async def fetch_provider_models(provider_id: str, config, user_id: Optional[int] = None) -> list:
global _model_cache, _model_cache_timestamps, _endpoint_model_cache global _model_cache, _model_cache_timestamps, _endpoint_model_cache
...@@ -172,6 +182,9 @@ async def prefetch_global_provider_models(config): ...@@ -172,6 +182,9 @@ async def prefetch_global_provider_models(config):
total += 1 total += 1
if hasattr(provider_config, 'models') and provider_config.models: if hasattr(provider_config, 'models') and provider_config.models:
continue continue
if _is_broker_only_coderai(provider_config):
logger.info(f"Skipping model prefetch for broker-only CoderAI provider '{provider_id}' until a broker session connects")
continue
provider_type = getattr(provider_config, 'type', '') provider_type = getattr(provider_config, 'type', '')
if provider_type in ('kilo', 'kilocode', 'coderai'): if provider_type in ('kilo', 'kilocode', 'coderai'):
......
This diff is collapsed.
...@@ -68,6 +68,8 @@ class CoderAIProviderHandler(BaseProviderHandler): ...@@ -68,6 +68,8 @@ class CoderAIProviderHandler(BaseProviderHandler):
self._broker_enabled = bool(self._coderai_config.get("broker_enabled", True)) self._broker_enabled = bool(self._coderai_config.get("broker_enabled", True))
self._broker_mode = bool(self._coderai_config.get("broker_mode", False)) self._broker_mode = bool(self._coderai_config.get("broker_mode", False))
self._broker_preferred = bool(self._coderai_config.get("broker_preferred", False)) self._broker_preferred = bool(self._coderai_config.get("broker_preferred", False))
if self._broker_mode:
self._transport = "broker"
self._base_endpoint = self._normalize_http_base(self._raw_endpoint) self._base_endpoint = self._normalize_http_base(self._raw_endpoint)
self._ws_endpoint = self._normalize_ws_endpoint(self._raw_endpoint) self._ws_endpoint = self._normalize_ws_endpoint(self._raw_endpoint)
self._apply_provider_defaults() self._apply_provider_defaults()
...@@ -184,6 +186,12 @@ class CoderAIProviderHandler(BaseProviderHandler): ...@@ -184,6 +186,12 @@ class CoderAIProviderHandler(BaseProviderHandler):
headers["Authorization"] = f"Bearer {token}" headers["Authorization"] = f"Bearer {token}"
return headers return headers
def _is_direct_http_mode(self) -> bool:
return self._transport == "http" and not self._broker_mode
def _is_direct_websocket_mode(self) -> bool:
return self._transport == "websocket" and not self._broker_mode
async def _use_broker(self) -> bool: async def _use_broker(self) -> bool:
if not self._broker_enabled: if not self._broker_enabled:
return False return False
...@@ -264,6 +272,9 @@ class CoderAIProviderHandler(BaseProviderHandler): ...@@ -264,6 +272,9 @@ class CoderAIProviderHandler(BaseProviderHandler):
message = await coderai_broker.wait_for_stream_event(next_request_id, timeout=timeout) message = await coderai_broker.wait_for_stream_event(next_request_id, timeout=timeout)
def validate_credentials(self) -> bool: def validate_credentials(self) -> bool:
if self._broker_mode:
logger.info(f"[{self.provider_id}] CoderAI broker mode enabled; waiting for inbound broker session")
return True
if self._transport == "websocket" and not self._websocket_enabled: if self._transport == "websocket" and not self._websocket_enabled:
logger.error(f"[{self.provider_id}] WebSocket transport selected but disabled") logger.error(f"[{self.provider_id}] WebSocket transport selected but disabled")
return False return False
...@@ -444,7 +455,7 @@ class CoderAIProviderHandler(BaseProviderHandler): ...@@ -444,7 +455,7 @@ class CoderAIProviderHandler(BaseProviderHandler):
raise Exception(message.get("error") or "CoderAI broker request failed") raise Exception(message.get("error") or "CoderAI broker request failed")
self.record_success() self.record_success()
return message.get("payload") or {} return message.get("payload") or {}
if self._transport == "websocket": if self._is_direct_websocket_mode():
if stream: if stream:
return self._ws_stream("chat.completions", payload, timeout=self._request_timeout) return self._ws_stream("chat.completions", payload, timeout=self._request_timeout)
message = await self._ws_roundtrip("chat.completions", payload, timeout=self._request_timeout) message = await self._ws_roundtrip("chat.completions", payload, timeout=self._request_timeout)
...@@ -468,7 +479,7 @@ class CoderAIProviderHandler(BaseProviderHandler): ...@@ -468,7 +479,7 @@ class CoderAIProviderHandler(BaseProviderHandler):
if (message.get("status") or "ok") == "error": if (message.get("status") or "ok") == "error":
raise Exception(message.get("error") or "CoderAI broker model discovery failed") raise Exception(message.get("error") or "CoderAI broker model discovery failed")
return self._extract_models(message.get("payload") or {}) return self._extract_models(message.get("payload") or {})
if self._transport == "websocket": if self._is_direct_websocket_mode():
message = await self._ws_roundtrip("models.list", {}, timeout=self._model_timeout) message = await self._ws_roundtrip("models.list", {}, timeout=self._model_timeout)
if (message.get("status") or "ok") == "error": if (message.get("status") or "ok") == "error":
raise Exception(message.get("error") or "CoderAI model discovery failed") raise Exception(message.get("error") or "CoderAI model discovery failed")
...@@ -488,11 +499,13 @@ class CoderAIProviderHandler(BaseProviderHandler): ...@@ -488,11 +499,13 @@ class CoderAIProviderHandler(BaseProviderHandler):
if (message.get("status") or "ok") == "error": if (message.get("status") or "ok") == "error":
raise Exception(message.get("error") or "CoderAI broker capability discovery failed") raise Exception(message.get("error") or "CoderAI broker capability discovery failed")
return message.get("payload") or {} return message.get("payload") or {}
if self._transport == "websocket": if self._is_direct_websocket_mode():
message = await self._ws_roundtrip("capabilities", {}, timeout=self._model_timeout) message = await self._ws_roundtrip("capabilities", {}, timeout=self._model_timeout)
if (message.get("status") or "ok") == "error": if (message.get("status") or "ok") == "error":
raise Exception(message.get("error") or "CoderAI capability discovery failed") raise Exception(message.get("error") or "CoderAI capability discovery failed")
return message.get("payload") or {} return message.get("payload") or {}
if self._broker_mode:
raise Exception("CoderAI broker mode requires an active broker session")
return await self._http_json("GET", "/coderai/capabilities", timeout=self._model_timeout) return await self._http_json("GET", "/coderai/capabilities", timeout=self._model_timeout)
async def register_client(self) -> Dict[str, Any]: async def register_client(self) -> Dict[str, Any]:
...@@ -507,11 +520,13 @@ class CoderAIProviderHandler(BaseProviderHandler): ...@@ -507,11 +520,13 @@ class CoderAIProviderHandler(BaseProviderHandler):
if (message.get("status") or "ok") == "error": if (message.get("status") or "ok") == "error":
raise Exception(message.get("error") or "CoderAI broker registration failed") raise Exception(message.get("error") or "CoderAI broker registration failed")
return message.get("payload") or {} return message.get("payload") or {}
if self._transport == "websocket": if self._is_direct_websocket_mode():
message = await self._ws_roundtrip("register", payload, timeout=self._model_timeout) message = await self._ws_roundtrip("register", payload, timeout=self._model_timeout)
if (message.get("status") or "ok") == "error": if (message.get("status") or "ok") == "error":
raise Exception(message.get("error") or "CoderAI registration failed") raise Exception(message.get("error") or "CoderAI registration failed")
return message.get("payload") or {} return message.get("payload") or {}
if self._broker_mode:
raise Exception("CoderAI broker mode does not support outbound registration")
return await self._http_json("POST", self._registration_path, payload, timeout=self._model_timeout) return await self._http_json("POST", self._registration_path, payload, timeout=self._model_timeout)
async def proxy_native_request( async def proxy_native_request(
...@@ -549,11 +564,13 @@ class CoderAIProviderHandler(BaseProviderHandler): ...@@ -549,11 +564,13 @@ class CoderAIProviderHandler(BaseProviderHandler):
raise Exception(message.get("error") or "CoderAI broker proxy request failed") raise Exception(message.get("error") or "CoderAI broker proxy request failed")
envelope = message.get("payload") or {} envelope = message.get("payload") or {}
return int(envelope.get("status_code") or 200), envelope return int(envelope.get("status_code") or 200), envelope
if self._transport == "websocket": if self._is_direct_websocket_mode():
message = await self._ws_roundtrip("proxy", payload, timeout=self._request_timeout) message = await self._ws_roundtrip("proxy", payload, timeout=self._request_timeout)
if (message.get("status") or "ok") == "error": if (message.get("status") or "ok") == "error":
raise Exception(message.get("error") or "CoderAI proxy request failed") raise Exception(message.get("error") or "CoderAI proxy request failed")
envelope = message.get("payload") or {} envelope = message.get("payload") or {}
return int(envelope.get("status_code") or 200), envelope return int(envelope.get("status_code") or 200), envelope
if self._broker_mode:
raise Exception("CoderAI broker mode requires an active broker session")
response = await self._http_json(method.upper(), endpoint_path, body or {}, timeout=self._request_timeout) response = await self._http_json(method.upper(), endpoint_path, body or {}, timeout=self._request_timeout)
return 200, response return 200, response
...@@ -62,6 +62,11 @@ async def _coderai_broker_websocket_impl(websocket: WebSocket, scope_name: str): ...@@ -62,6 +62,11 @@ async def _coderai_broker_websocket_impl(websocket: WebSocket, scope_name: str):
"endpoint": payload.get("endpoint"), "endpoint": payload.get("endpoint"),
"transport": payload.get("transport"), "transport": payload.get("transport"),
"studio_endpoints": payload.get("studio_endpoints") or [], "studio_endpoints": payload.get("studio_endpoints") or [],
"hardware": payload.get("hardware") or {},
"gpus": payload.get("gpus") or ((payload.get("hardware") or {}).get("gpus")) or [],
"gpu_count": payload.get("gpu_count") or ((payload.get("hardware") or {}).get("gpu_count")),
"total_vram_mb": payload.get("total_vram_mb") or ((payload.get("hardware") or {}).get("total_vram_mb")),
"available_vram_mb": payload.get("available_vram_mb") or ((payload.get("hardware") or {}).get("available_vram_mb")),
"owner_user_id": owner_user_id, "owner_user_id": owner_user_id,
"username": username, "username": username,
"scope_name": expected_scope, "scope_name": expected_scope,
......
...@@ -65,17 +65,17 @@ ...@@ -65,17 +65,17 @@
}, },
"coderai": { "coderai": {
"id": "coderai", "id": "coderai",
"name": "CoderAI Local Bridge", "name": "CoderAI Broker",
"endpoint": "http://127.0.0.1:11437", "endpoint": "http://127.0.0.1:11437",
"type": "coderai", "type": "coderai",
"api_key_required": false, "api_key_required": false,
"rate_limit": 0, "rate_limit": 0,
"coderai_config": { "coderai_config": {
"transport": "http", "transport": "broker",
"http_enabled": true, "http_enabled": false,
"websocket_enabled": true, "websocket_enabled": false,
"broker_enabled": true, "broker_enabled": true,
"broker_mode": false, "broker_mode": true,
"broker_preferred": true, "broker_preferred": true,
"discovery_enabled": true, "discovery_enabled": true,
"client_id": "aisbf-default", "client_id": "aisbf-default",
......
...@@ -25,12 +25,13 @@ The broker client should: ...@@ -25,12 +25,13 @@ The broker client should:
1. Dial AISBF over `ws://` or `wss://` 1. Dial AISBF over `ws://` or `wss://`
2. Authenticate using the provider-scoped `registration_token` 2. Authenticate using the provider-scoped `registration_token`
3. Register metadata and capabilities after connect 3. Register metadata, hardware inventory, and capabilities after connect
4. Stay connected with heartbeat support 4. Stay connected with heartbeat support
5. Receive queued or direct broker requests from AISBF 5. Receive queued or direct broker requests from AISBF
6. Execute supported operations locally 6. Execute supported operations locally
7. Send back success, error, binary, and streaming envelopes with the same `request_id` 7. Send back success, error, binary, and streaming envelopes with the same `request_id`
8. Automatically reconnect if the connection drops 8. Include performance metrics for completed requests whenever available
9. Automatically reconnect if the connection drops
## AISBF Concepts You Must Match ## AISBF Concepts You Must Match
...@@ -137,7 +138,7 @@ Store: ...@@ -137,7 +138,7 @@ Store:
### 3. Send explicit `register` operation ### 3. Send explicit `register` operation
After the `registered` event, CoderAI must send a `register` message describing its capabilities and advertised endpoints. After the `registered` event, CoderAI must send a `register` message describing its capabilities, hardware inventory, and advertised endpoints.
### 4. Enter long-lived receive loop ### 4. Enter long-lived receive loop
...@@ -164,6 +165,23 @@ CoderAI should send this after receiving the initial AISBF `registered` event. ...@@ -164,6 +165,23 @@ CoderAI should send this after receiving the initial AISBF `registered` event.
"endpoint": "ws://local-coderai-or-descriptive-endpoint", "endpoint": "ws://local-coderai-or-descriptive-endpoint",
"transport": "websocket", "transport": "websocket",
"registration_token": "<same_registration_token>", "registration_token": "<same_registration_token>",
"hardware": {
"hostname": "workstation-01",
"platform": "linux",
"gpus": [
{
"index": 0,
"name": "NVIDIA RTX 4090",
"vendor": "nvidia",
"total_vram_mb": 24576,
"available_vram_mb": 20480,
"used_vram_mb": 4096
}
],
"gpu_count": 1,
"total_vram_mb": 24576,
"available_vram_mb": 20480
},
"studio_endpoints": [ "studio_endpoints": [
"v1/images/generate", "v1/images/generate",
"v1/audio/tts", "v1/audio/tts",
...@@ -215,6 +233,31 @@ CoderAI should send this after receiving the initial AISBF `registered` event. ...@@ -215,6 +233,31 @@ CoderAI should send this after receiving the initial AISBF `registered` event.
AISBF replies with a success envelope. AISBF replies with a success envelope.
### Hardware Reporting Requirements
The `register` payload should include the best hardware view available to the running CoderAI process.
Required if detectable:
- `hardware.gpus`: array of GPU objects visible to the process
- `hardware.gpu_count`: integer count
- `hardware.total_vram_mb`: total usable VRAM across advertised GPUs
- `hardware.available_vram_mb`: currently free or available VRAM across advertised GPUs
Recommended per GPU fields:
- `index`
- `name`
- `vendor`
- `total_vram_mb`
- `available_vram_mb`
- `used_vram_mb`
- backend-specific extras if trivial to expose
If exact values are unavailable, estimate conservatively and include any supporting marker such as `estimated: true`.
AISBF stores this in broker session metadata so dashboards and future routing logic can reason about available hardware.
## Required Heartbeat Support ## Required Heartbeat Support
AISBF may send heartbeat requests, and CoderAI may also proactively keep the socket alive. AISBF may send heartbeat requests, and CoderAI may also proactively keep the socket alive.
...@@ -261,6 +304,30 @@ CoderAI may also periodically send: ...@@ -261,6 +304,30 @@ CoderAI may also periodically send:
} }
``` ```
Heartbeat payloads may also refresh dynamic hardware state such as changing free VRAM:
```json
{
"v": 1,
"op": "heartbeat",
"request_id": "hb-self-2",
"payload": {
"hardware": {
"available_vram_mb": 18432,
"gpus": [
{
"index": 0,
"available_vram_mb": 18432,
"used_vram_mb": 6144
}
]
}
}
}
```
AISBF merges those updates into the broker session metadata.
## Local HTTP Endpoints CoderAI Should Expose ## Local HTTP Endpoints CoderAI Should Expose
### OpenAI-compatible endpoints ### OpenAI-compatible endpoints
...@@ -427,6 +494,16 @@ Response payload should match `GET /v1/models`. ...@@ -427,6 +494,16 @@ Response payload should match `GET /v1/models`.
Request payload matches OpenAI `POST /v1/chat/completions` body. Request payload matches OpenAI `POST /v1/chat/completions` body.
Completed responses should include performance metrics whenever practical:
- `latency_ms`
- `prompt_tokens`
- `completion_tokens`
- `total_tokens`
- `tokens_per_second`
If exact values are unavailable, AISBF estimates latency from broker timing and may estimate throughput from tokens plus latency.
### `op = "capabilities"` ### `op = "capabilities"`
Response payload should match `GET /coderai/capabilities`. Response payload should match `GET /coderai/capabilities`.
...@@ -505,6 +582,8 @@ Semantics: ...@@ -505,6 +582,8 @@ Semantics:
"finish_reason": "stop" "finish_reason": "stop"
} }
], ],
"latency_ms": 842,
"tokens_per_second": 17.8,
"usage": { "usage": {
"prompt_tokens": 10, "prompt_tokens": 10,
"completion_tokens": 5, "completion_tokens": 5,
...@@ -514,6 +593,20 @@ Semantics: ...@@ -514,6 +593,20 @@ Semantics:
} }
``` ```
AISBF keeps a rolling performance window for the latest 100 completed broker requests per connected session.
When exact metrics are present, AISBF stores them directly. Otherwise it estimates:
- latency from broker request start to final reply time
- throughput from `total_tokens / latency_seconds` when token counts are present
The resulting session snapshot tracks:
- average latency
- average throughput
- average total tokens
- success rate
### Error ### Error
```json ```json
......
This diff is collapsed.
This diff is collapsed.
import json import json
import base64 import base64
from unittest.mock import Mock from unittest.mock import Mock
from types import SimpleNamespace
import pytest import pytest
from aisbf.providers.coderai import CoderAIProviderHandler from aisbf.providers.coderai import CoderAIProviderHandler
from aisbf.config import config from aisbf.config import config
from aisbf.config import ProviderConfig from aisbf.config import ProviderConfig
from aisbf.app.model_cache import _is_broker_only_coderai
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
...@@ -190,3 +192,56 @@ async def test_coderai_broker_stream_supports_progress_and_binary_chunks(monkeyp ...@@ -190,3 +192,56 @@ async def test_coderai_broker_stream_supports_progress_and_binary_chunks(monkeyp
assert status_code == 200 assert status_code == 200
assert payload["stream_encoding"] == "base64" assert payload["stream_encoding"] == "base64"
assert base64.b64decode(payload["stream_chunks"][0]).startswith(b"event: progress") assert base64.b64decode(payload["stream_chunks"][0]).startswith(b"event: progress")
def test_coderai_broker_mode_forces_broker_transport_and_skips_outbound_validation():
provider_config = {
"id": "coderai_nat",
"name": "CoderAI NAT",
"endpoint": "http://127.0.0.1:11437",
"type": "coderai",
"api_key_required": False,
"coderai_config": {"broker_mode": True, "registration_token": "nat-token"},
}
handler = CoderAIProviderHandler("coderai_nat", provider_config=provider_config)
assert handler._transport == "broker"
assert handler.validate_credentials() is True
@pytest.mark.asyncio
async def test_coderai_non_broker_mode_uses_openai_compatible_http_api():
provider_config = {
"id": "coderai_local",
"name": "CoderAI",
"endpoint": "http://127.0.0.1:11437",
"type": "coderai",
"api_key_required": False,
"api_key": "local-api-token",
"coderai_config": {"broker_mode": False, "broker_enabled": False, "transport": "http"},
}
handler = CoderAIProviderHandler("coderai_local", provider_config=provider_config)
assert handler._transport == "http"
assert handler._effective_api_key() == "local-api-token"
assert handler.validate_credentials() is True
def test_model_cache_detects_broker_only_coderai_provider():
provider_config = SimpleNamespace(
type="coderai",
coderai_config={"broker_mode": True},
)
assert _is_broker_only_coderai(provider_config) is True
def test_model_cache_allows_direct_http_coderai_provider():
provider_config = SimpleNamespace(
type="coderai",
coderai_config={"broker_mode": False, "broker_enabled": False, "transport": "http"},
)
assert _is_broker_only_coderai(provider_config) is False
...@@ -149,6 +149,27 @@ def test_api_provider_save_rejects_coderai_broker_without_registration_token(mon ...@@ -149,6 +149,27 @@ def test_api_provider_save_rejects_coderai_broker_without_registration_token(mon
assert "requires a registration token" in str(exc) assert "requires a registration token" in str(exc)
def test_api_provider_save_allows_non_broker_coderai_without_registration_token():
provider_config = dashboard_providers._ensure_coderai_token(
{
"id": "coderai-http",
"name": "CoderAI HTTP",
"endpoint": "http://127.0.0.1:11437",
"api_key_required": False,
"api_key": "local-token",
"type": "coderai",
"coderai_config": {
"broker_enabled": False,
"broker_mode": False,
"registration_token": "",
},
}
)
dashboard_providers._validate_coderai_provider_config("coderai-http", provider_config)
assert provider_config["coderai_config"]["registration_token"] == ""
def test_api_provider_save_persists_coderai_token_for_user(monkeypatch): def test_api_provider_save_persists_coderai_token_for_user(monkeypatch):
db = DbStub() db = DbStub()
......
...@@ -54,6 +54,15 @@ def test_broker_registers_websocket_session_and_reports_status(): ...@@ -54,6 +54,15 @@ def test_broker_registers_websocket_session_and_reports_status():
"endpoint": "ws://local-tunnel", "endpoint": "ws://local-tunnel",
"transport": "websocket", "transport": "websocket",
"registration_token": "global-token", "registration_token": "global-token",
"hardware": {
"gpus": [
{
"name": "RTX 4090",
"total_vram_mb": 24576,
"available_vram_mb": 20480
}
]
},
"capabilities": {"studio": {"enabled": True}}, "capabilities": {"studio": {"enabled": True}},
}, },
}) })
...@@ -65,6 +74,9 @@ def test_broker_registers_websocket_session_and_reports_status(): ...@@ -65,6 +74,9 @@ def test_broker_registers_websocket_session_and_reports_status():
payload = response.json() payload = response.json()
assert payload["connected"] is True assert payload["connected"] is True
assert payload["client_id"] == "nat-client" assert payload["client_id"] == "nat-client"
assert payload["metadata"]["gpu_count"] == 1
assert payload["metadata"]["total_vram_mb"] == 24576
assert payload["metadata"]["available_vram_mb"] == 20480
def test_broker_rejects_missing_registration_token(): def test_broker_rejects_missing_registration_token():
...@@ -83,7 +95,7 @@ def test_broker_routes_request_to_registered_session(): ...@@ -83,7 +95,7 @@ def test_broker_routes_request_to_registered_session():
async def send_text(self, payload: str): async def send_text(self, payload: str):
self.sent.append(payload) self.sent.append(payload)
message = json.loads(payload) message = json.loads(payload)
await broker.resolve_response({ await broker.publish_response({
"request_id": message["request_id"], "request_id": message["request_id"],
"status": "ok", "status": "ok",
"payload": {"data": [{"id": "llama3.1:8b"}]}, "payload": {"data": [{"id": "llama3.1:8b"}]},
...@@ -97,6 +109,9 @@ def test_broker_routes_request_to_registered_session(): ...@@ -97,6 +109,9 @@ def test_broker_routes_request_to_registered_session():
assert sent_message["op"] == "models.list" assert sent_message["op"] == "models.list"
assert response["status"] == "ok" assert response["status"] == "ok"
assert response["payload"]["data"][0]["id"] == "llama3.1:8b" assert response["payload"]["data"][0]["id"] == "llama3.1:8b"
snapshot = await broker.get_session_snapshot("coderai", "bridge-client")
assert snapshot["performance"]["sample_count"] == 1
assert snapshot["performance"]["avg_latency_ms"] >= 0
queued = await broker.consume_request(session.session_id, timeout=0) queued = await broker.consume_request(session.session_id, timeout=0)
assert queued is None assert queued is None
finally: finally:
...@@ -163,6 +178,8 @@ def test_broker_stream_events_are_delivered_to_waiter(): ...@@ -163,6 +178,8 @@ def test_broker_stream_events_are_delivered_to_waiter():
try: try:
response = await broker.send_request("coderai", "proxy", {"stream": True}, client_id="stream-client", owner_user_id=None, timeout=3.0) response = await broker.send_request("coderai", "proxy", {"stream": True}, client_id="stream-client", owner_user_id=None, timeout=3.0)
assert response["event"] == "done" assert response["event"] == "done"
snapshot = await broker.get_session_snapshot("coderai", "stream-client")
assert snapshot["performance"]["sample_count"] == 1
finally: finally:
await broker.unregister(session.session_id) await broker.unregister(session.session_id)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment