Commit 61ecc606 authored by Your Name's avatar Your Name

Now all provider types work correctly

parent fc8036ad
......@@ -141,6 +141,26 @@ class DatabaseManager:
)
''')
# Migration: Add user_id column to token_usage if it doesn't exist
# This handles databases created before the user_id column was added
try:
if self.db_type == 'sqlite':
cursor.execute("PRAGMA table_info(token_usage)")
columns = [row[1] for row in cursor.fetchall()]
if 'user_id' not in columns:
cursor.execute('ALTER TABLE token_usage ADD COLUMN user_id INTEGER')
logger.info("Migration: Added user_id column to token_usage table")
else: # mysql
cursor.execute("""
SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = 'token_usage' AND COLUMN_NAME = 'user_id'
""")
if not cursor.fetchone():
cursor.execute('ALTER TABLE token_usage ADD COLUMN user_id INTEGER')
logger.info("Migration: Added user_id column to token_usage table")
except Exception as e:
logger.warning(f"Migration check for token_usage.user_id: {e}")
# Create indexes for token_usage
try:
cursor.execute('''
......
......@@ -589,10 +589,84 @@ class RequestHandler:
# This is more reliable than checking response iterability which can cause false positives
is_google_stream = provider_config.type == 'google'
is_kiro_stream = provider_config.type == 'kiro'
is_kilo_stream = provider_config.type in ('kilo', 'kilocode')
logger.info(f"Is Google streaming response: {is_google_stream} (provider type: {provider_config.type})")
logger.info(f"Is Kiro streaming response: {is_kiro_stream} (provider type: {provider_config.type})")
logger.info(f"Is Kilo streaming response: {is_kilo_stream} (provider type: {provider_config.type})")
if is_kiro_stream:
if is_kilo_stream:
# Handle Kilo/KiloCode streaming response
# Kilo returns an async generator that yields OpenAI-compatible SSE bytes directly
# We parse these and pass through with minimal processing
accumulated_response_text = "" # Track full response for token counting
chunk_count = 0
tool_calls_from_stream = [] # Track tool calls from stream
async for chunk in response:
chunk_count += 1
try:
logger.debug(f"Kilo chunk type: {type(chunk)}")
# Parse SSE chunk to extract JSON data
chunk_data = None
if isinstance(chunk, bytes):
try:
chunk_str = chunk.decode('utf-8')
# May contain multiple SSE lines
for sse_line in chunk_str.split('\n'):
sse_line = sse_line.strip()
if sse_line.startswith('data: '):
data_str = sse_line[6:].strip()
if data_str and data_str != '[DONE]':
try:
chunk_data = json.loads(data_str)
except json.JSONDecodeError:
pass
except (UnicodeDecodeError, Exception) as e:
logger.warning(f"Failed to parse Kilo bytes chunk: {e}")
elif isinstance(chunk, str):
if chunk.startswith('data: '):
data_str = chunk[6:].strip()
if data_str and data_str != '[DONE]':
try:
chunk_data = json.loads(data_str)
except json.JSONDecodeError:
pass
if chunk_data:
# Extract content and tool calls from chunk
choices = chunk_data.get('choices', [])
if choices:
delta = choices[0].get('delta', {})
# Track content
delta_content = delta.get('content', '')
if delta_content:
accumulated_response_text += delta_content
# Track tool calls
delta_tool_calls = delta.get('tool_calls', [])
if delta_tool_calls:
for tc in delta_tool_calls:
tool_calls_from_stream.append(tc)
# Pass through the chunk as-is
if isinstance(chunk, bytes):
yield chunk
elif isinstance(chunk, str):
yield chunk.encode('utf-8')
else:
yield f"data: {json.dumps(chunk)}\n\n".encode('utf-8')
except Exception as chunk_error:
logger.warning(f"Error processing Kilo chunk: {chunk_error}")
continue
logger.info(f"Kilo streaming processed {chunk_count} chunks total")
elif is_kiro_stream:
# Handle Kiro streaming response
# Kiro returns an async generator that yields OpenAI-compatible SSE strings directly
# We need to parse these and handle tool calls properly
......@@ -2753,9 +2827,10 @@ class RotationHandler:
import json
logger = logging.getLogger(__name__)
# Check if this is a Google provider based on configuration
# Check if this is a Google or Kilo provider based on configuration
is_google_provider = provider_type == 'google'
logger.info(f"Creating streaming response for provider type: {provider_type}, is_google: {is_google_provider}")
is_kilo_provider = provider_type in ('kilo', 'kilocode')
logger.info(f"Creating streaming response for provider type: {provider_type}, is_google: {is_google_provider}, is_kilo: {is_kilo_provider}")
# Generate system_fingerprint for this request
# If seed is present in request, generate unique fingerprint per request
......@@ -3031,6 +3106,47 @@ class RotationHandler:
}]
}
yield f"data: {json.dumps(final_chunk)}\n\n".encode('utf-8')
elif is_kilo_provider:
# Handle Kilo/KiloCode streaming response
# Kilo returns an async generator that yields OpenAI-compatible SSE bytes
accumulated_response_text = ""
chunk_count = 0
async for chunk in response:
chunk_count += 1
try:
# Pass through the chunk as-is (already in SSE format)
if isinstance(chunk, bytes):
# Parse to track content for token counting
try:
chunk_str = chunk.decode('utf-8')
for sse_line in chunk_str.split('\n'):
sse_line = sse_line.strip()
if sse_line.startswith('data: '):
data_str = sse_line[6:].strip()
if data_str and data_str != '[DONE]':
try:
chunk_data = json.loads(data_str)
choices = chunk_data.get('choices', [])
if choices:
delta = choices[0].get('delta', {})
delta_content = delta.get('content', '')
if delta_content:
accumulated_response_text += delta_content
except json.JSONDecodeError:
pass
except (UnicodeDecodeError, Exception):
pass
yield chunk
elif isinstance(chunk, str):
yield chunk.encode('utf-8')
else:
yield f"data: {json.dumps(chunk)}\n\n".encode('utf-8')
except Exception as chunk_error:
logger.warning(f"Error processing Kilo chunk: {chunk_error}")
continue
logger.info(f"Kilo streaming processed {chunk_count} chunks total")
else:
# Handle OpenAI/Anthropic/Kiro streaming responses
# Some providers return async generators, others return sync iterables
......
......@@ -6163,8 +6163,17 @@ class KiloProviderHandler(BaseProviderHandler):
from .kilo_oauth2 import KiloOAuth2
self.oauth2 = KiloOAuth2(credentials_file=credentials_file, api_base=api_base)
# Get endpoint from config
endpoint = getattr(self.provider_config, 'endpoint', 'https://api.kilo.ai')
# Use the configured endpoint, falling back to the canonical kilo.ai/api/openrouter/v1
configured_endpoint = getattr(self.provider_config, 'endpoint', None)
if configured_endpoint:
# Ensure endpoint ends with /v1 for OpenAI SDK compatibility
endpoint = configured_endpoint.rstrip('/')
if not endpoint.endswith('/v1'):
endpoint = endpoint + '/v1'
else:
endpoint = 'https://kilo.ai/api/openrouter/v1'
self._kilo_endpoint = endpoint
# Initialize OpenAI client (will use OAuth2 token as API key)
self.client = OpenAI(base_url=endpoint, api_key=api_key or "placeholder")
......@@ -6213,6 +6222,7 @@ class KiloProviderHandler(BaseProviderHandler):
try:
import logging
import json
logging.info(f"KiloProviderHandler: Handling request for model {model}")
if AISBF_DEBUG:
logging.info(f"KiloProviderHandler: Messages: {messages}")
......@@ -6270,6 +6280,13 @@ class KiloProviderHandler(BaseProviderHandler):
if tool_choice is not None:
request_params["tool_choice"] = tool_choice
# For streaming requests, use httpx async streaming directly
# to avoid blocking the event loop with the synchronous OpenAI SDK
if stream:
logging.info(f"KiloProviderHandler: Using async httpx streaming mode")
return await self._handle_streaming_request(request_params, token, model)
# Non-streaming: use the synchronous OpenAI SDK client
response = self.client.chat.completions.create(**request_params)
logging.info(f"KiloProviderHandler: Response received: {response}")
self.record_success()
......@@ -6290,42 +6307,234 @@ class KiloProviderHandler(BaseProviderHandler):
self.record_failure()
raise e
async def _handle_streaming_request(self, request_params: Dict, token: str, model: str):
    """
    Handle a streaming request to the Kilo API using httpx async streaming.

    Pre-validates the upstream response status BEFORE returning the streaming
    generator, so errors (404, 400, etc.) are raised immediately and can be
    returned as proper error responses to the client instead of being swallowed
    after a 200 OK has already been sent.

    Uses direct async HTTP instead of the synchronous OpenAI SDK to avoid
    blocking the event loop and to provide better control over SSE parsing.

    Args:
        request_params: The OpenAI-compatible request parameters (with stream=True)
        token: The OAuth2/API key token for authentication
        model: The model name being used

    Returns:
        Async generator that yields OpenAI-compatible SSE chunks as bytes

    Raises:
        Exception: If the upstream provider returns an error response
    """
    import logging
    import json
    logger = logging.getLogger(__name__)
    logger.info(f"KiloProviderHandler: Starting async streaming request to {self._kilo_endpoint}")
    # Build the full URL for chat completions
    api_url = f"{self._kilo_endpoint}/chat/completions"
    # Build headers
    headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json',
        'Accept': 'text/event-stream',
    }
    if AISBF_DEBUG:
        logger.info(f"=== KILO STREAMING REQUEST DETAILS ===")
        logger.info(f"URL: {api_url}")
        logger.info(f"Payload: {json.dumps(request_params, indent=2)}")
        logger.info(f"=== END KILO STREAMING REQUEST DETAILS ===")
    # Phase 1: Open connection and validate status BEFORE returning generator.
    # This ensures errors are raised immediately (before 200 OK is sent to client),
    # not lazily when the generator is consumed.
    streaming_client = httpx.AsyncClient(timeout=httpx.Timeout(300.0, connect=30.0))
    try:
        request = streaming_client.build_request("POST", api_url, headers=headers, json=request_params)
        response = await streaming_client.send(request, stream=True)
        logger.info(f"KiloProviderHandler: Streaming response status: {response.status_code}")
        if response.status_code >= 400:
            # Drain the error body before closing; the client itself is closed
            # by the `except Exception` cleanup below when we raise.
            error_text = await response.aread()
            await response.aclose()
            logger.error(f"KiloProviderHandler: Streaming error response: {error_text}")
            # Initialize explicitly instead of probing locals() later — error_json
            # stays None when the body is not valid JSON.
            error_json = None
            try:
                error_json = json.loads(error_text)
                error_message = error_json.get('error', {}).get('message', 'Unknown error') if isinstance(error_json.get('error'), dict) else str(error_json.get('error', 'Unknown error'))
            except Exception:
                # Body was not JSON; fall back to the raw text.
                error_message = error_text.decode('utf-8') if isinstance(error_text, bytes) else str(error_text)
            if response.status_code == 429:
                # Prefer the structured error payload when we managed to parse one.
                self.handle_429_error(
                    error_json if error_json is not None else error_message,
                    dict(response.headers)
                )
            self.record_failure()
            raise Exception(f"Kilo API streaming error ({response.status_code}): {error_message}")
    except Exception:
        # Ensure client is closed on any error during connection setup
        await streaming_client.aclose()
        raise
    # Phase 2: Connection is validated (2xx status), return the streaming generator.
    # The generator takes ownership of streaming_client and response and will close
    # them when done.
    return self._stream_kilo_response(streaming_client, response, model)
async def _stream_kilo_response(self, streaming_client, response, model: str):
    """
    Yield SSE chunks from an already-validated Kilo streaming response.

    Only called after the upstream status has been verified as 2xx, so this
    generator handles just the happy path of relaying stream data. It takes
    ownership of both `streaming_client` and `response` and closes them when
    the stream ends or the consumer stops iterating.

    Args:
        streaming_client: The httpx.AsyncClient that owns the connection
        response: The already-opened httpx streaming response (status validated)
        model: The model name being used

    Yields:
        OpenAI-compatible SSE chunks as bytes
    """
    import logging
    import json
    logger = logging.getLogger(__name__)
    try:
        # Relay the upstream SSE stream one line at a time.
        async for raw_line in response.aiter_lines():
            # Blank keep-alive separators and SSE comment lines (":...") carry
            # no data — ignore both.
            if not raw_line or raw_line.startswith(':'):
                continue
            # Only "data: ..." lines are forwarded; anything else is dropped.
            if not raw_line.startswith('data: '):
                continue
            payload = raw_line[6:]
            if payload.strip() == '[DONE]':
                # Terminal sentinel — forward it and stop reading.
                yield b"data: [DONE]\n\n"
                break
            try:
                parsed = json.loads(payload)
            except json.JSONDecodeError as e:
                logger.warning(f"KiloProviderHandler: Failed to parse streaming chunk: {e}")
                continue
            # Chunk is already in OpenAI format; re-emit it as a normalized SSE frame.
            yield f"data: {json.dumps(parsed, ensure_ascii=False)}\n\n".encode('utf-8')
        logger.info(f"KiloProviderHandler: Streaming completed successfully")
        self.record_success()
    finally:
        # Always release the response and its owning client, whether the stream
        # finished, errored, or the consumer abandoned the generator.
        await response.aclose()
        await streaming_client.aclose()
async def get_models(self) -> List[Model]:
try:
import logging
import json
logging.info("KiloProviderHandler: Getting models list")
# Ensure we have a valid token
token = await self._ensure_authenticated()
# Update client with valid token
self.client.api_key = token
# Apply rate limiting
await self.apply_rate_limit()
models = self.client.models.list()
logging.info(f"KiloProviderHandler: Models received: {models}")
# Use the correct Kilo models endpoint directly
# The OpenAI SDK appends /models to the base_url (which includes /v1),
# but Kilo's models endpoint is at the base path /models (without /v1)
# Derive models_url from the configured endpoint by stripping /v1
base_endpoint = self._kilo_endpoint.rstrip('/')
if base_endpoint.endswith('/v1'):
models_url = base_endpoint[:-3] + '/models'
else:
models_url = base_endpoint + '/models'
logging.info(f"KiloProviderHandler: Fetching models from {models_url}")
headers = {
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json',
}
async with httpx.AsyncClient(timeout=httpx.Timeout(30.0, connect=10.0)) as client:
response = await client.get(models_url, headers=headers)
logging.info(f"KiloProviderHandler: Models response status: {response.status_code}")
if response.status_code != 200:
logging.warning(f"KiloProviderHandler: Models endpoint returned {response.status_code}")
try:
error_body = response.json()
logging.warning(f"KiloProviderHandler: Error response: {error_body}")
except Exception:
logging.warning(f"KiloProviderHandler: Error response (text): {response.text[:200]}")
response.raise_for_status()
models_data = response.json()
logging.info(f"KiloProviderHandler: Models received: {models_data}")
# Parse response - expect OpenAI-compatible format with 'data' array
models_list = models_data.get('data', []) if isinstance(models_data, dict) else models_data
result = []
for model in models:
# Extract context size if available
context_size = None
if hasattr(model, 'context_window') and model.context_window:
context_size = model.context_window
elif hasattr(model, 'context_length') and model.context_length:
context_size = model.context_length
elif hasattr(model, 'max_context_length') and model.max_context_length:
context_size = model.max_context_length
result.append(Model(
id=model.id,
name=model.id,
provider_id=self.provider_id,
context_size=context_size,
context_length=context_size
))
for model_entry in models_list:
if isinstance(model_entry, dict):
model_id = model_entry.get('id', '')
model_name = model_entry.get('name', model_id) or model_id
# Extract context size if available
context_size = (
model_entry.get('context_window') or
model_entry.get('context_length') or
model_entry.get('max_context_length')
)
if model_id:
result.append(Model(
id=model_id,
name=model_name,
provider_id=self.provider_id,
context_size=context_size,
context_length=context_size
))
elif hasattr(model_entry, 'id'):
# Handle OpenAI SDK model objects (fallback)
context_size = None
if hasattr(model_entry, 'context_window') and model_entry.context_window:
context_size = model_entry.context_window
elif hasattr(model_entry, 'context_length') and model_entry.context_length:
context_size = model_entry.context_length
result.append(Model(
id=model_entry.id,
name=model_entry.id,
provider_id=self.provider_id,
context_size=context_size,
context_length=context_size
))
logging.info(f"KiloProviderHandler: Parsed {len(result)} models")
return result
except Exception as e:
import logging
......
......@@ -174,13 +174,18 @@
"kilo": {
"id": "kilo",
"name": "KiloCode",
"endpoint": "https://kilocode.ai/api/openrouter",
"type": "openai",
"api_key_required": true,
"api_key": "YOUR_KILO_API_KEY",
"endpoint": "https://kilo.ai/api/openrouter/v1",
"type": "kilo",
"api_key_required": false,
"api_key": "",
"nsfw": false,
"privacy": false,
"rate_limit": 0
"rate_limit": 0,
"kilo_config": {
"_comment": "Uses Kilo OAuth2 Device Authorization Grant flow",
"credentials_file": "~/.kilo_credentials.json",
"api_base": "https://api.kilo.ai"
}
},
"perplexity": {
"id": "perplexity",
......
......@@ -42,11 +42,14 @@ import time
import logging
import sys
import os
import signal
import atexit
import argparse
import secrets
import hashlib
import asyncio
import httpx
import multiprocessing
from logging.handlers import RotatingFileHandler
from datetime import datetime, timedelta
from collections import defaultdict
......@@ -1040,6 +1043,44 @@ async def startup_event():
logger.info(f"Available rotations: {list(config.rotations.keys()) if config else []}")
logger.info(f"Available autoselect: {list(config.autoselect.keys()) if config else []}")
def _cleanup_multiprocessing_children():
"""Terminate any lingering multiprocessing child processes."""
try:
active_children = multiprocessing.active_children()
if active_children:
logger.info(f"Terminating {len(active_children)} multiprocessing child process(es)...")
for child in active_children:
logger.debug(f" Terminating child process: {child.name} (PID {child.pid})")
child.terminate()
# Give them a moment to terminate gracefully
for child in active_children:
child.join(timeout=2)
# Force kill any still alive
for child in multiprocessing.active_children():
logger.warning(f" Force killing child process: {child.name} (PID {child.pid})")
child.kill()
except Exception as e:
logger.warning(f"Error cleaning up multiprocessing children: {e}")
def _signal_handler(signum, frame):
"""Handle SIGINT/SIGTERM for clean shutdown including multiprocessing children."""
sig_name = signal.Signals(signum).name
logger.info(f"Received {sig_name}, shutting down...")
_cleanup_multiprocessing_children()
# Re-raise the signal so uvicorn can handle its own shutdown
signal.signal(signum, signal.SIG_DFL)
os.kill(os.getpid(), signum)
# Register signal handlers for clean shutdown
signal.signal(signal.SIGINT, _signal_handler)
signal.signal(signal.SIGTERM, _signal_handler)
# Also register atexit handler as a safety net
atexit.register(_cleanup_multiprocessing_children)
@app.on_event("shutdown")
async def shutdown_event():
"""Cleanup on shutdown"""
......@@ -1050,6 +1091,9 @@ async def shutdown_event():
logger.info("Shutting down TOR hidden service...")
tor_service.disconnect()
logger.info("TOR hidden service shutdown complete")
# Cleanup multiprocessing children (sentence-transformers, torch, etc.)
_cleanup_multiprocessing_children()
# Authentication middleware
@app.middleware("http")
......
......@@ -152,7 +152,7 @@ function renderProviderDetails(key) {
if (isKiloProvider && !provider.kilo_config) {
provider.kilo_config = {
credentials_file: '~/.kilo_credentials.json',
api_base: 'https://kilocode.ai/api/openrouter'
api_base: 'https://api.kilo.ai/api/gateway'
};
}
......@@ -253,7 +253,7 @@ function renderProviderDetails(key) {
<div class="form-group">
<label>API Base URL</label>
<input type="text" value="${kiloConfig.api_base || 'https://kilocode.ai/api/openrouter'}" readonly style="background: #0f2840; cursor: not-allowed;" placeholder="https://kilocode.ai/api/openrouter">
<input type="text" value="${kiloConfig.api_base || 'https://api.kilo.ai/api/gateway'}" readonly style="background: #0f2840; cursor: not-allowed;" placeholder="https://api.kilo.ai/api/gateway">
<small style="color: #a0a0a0; display: block; margin-top: 5px;">Kilocode API base URL (fixed)</small>
</div>
......@@ -591,7 +591,7 @@ function updateNewProviderDefaults() {
'ollama': 'Ollama local provider. No API key required by default. Endpoint: http://localhost:11434/api',
'kiro': 'Kiro (Amazon Q Developer) provider. Uses Kiro credentials (IDE, CLI, or direct tokens). Endpoint: https://q.us-east-1.amazonaws.com',
'claude': 'Claude Code provider. Uses OAuth2 authentication (browser-based login). Endpoint: https://api.anthropic.com/v1',
'kilocode': 'Kilocode provider. Uses OAuth2 Device Authorization Grant. Endpoint: https://kilocode.ai/api/openrouter'
'kilocode': 'Kilocode provider. Uses OAuth2 Device Authorization Grant. Endpoint: https://api.kilo.ai/api/gateway'
};
descriptionEl.textContent = descriptions[providerType] || 'Standard provider configuration.';
......@@ -653,11 +653,11 @@ function confirmAddProvider() {
credentials_file: '~/.claude_credentials.json'
};
} else if (providerType === 'kilocode') {
newProvider.endpoint = 'https://kilocode.ai/api/openrouter';
newProvider.endpoint = 'https://api.kilo.ai/api/gateway';
newProvider.name = key + ' (Kilocode OAuth2)';
newProvider.kilo_config = {
credentials_file: '~/.kilo_credentials.json',
api_base: 'https://kilocode.ai/api/openrouter'
api_base: 'https://api.kilo.ai/api/gateway'
};
} else if (providerType === 'openai') {
newProvider.endpoint = 'https://api.openai.com/v1';
......@@ -772,12 +772,12 @@ function updateProviderType(key, newType) {
providersData[key].api_key_required = false;
providersData[key].kilo_config = {
credentials_file: '~/.kilo_credentials.json',
api_base: 'https://kilocode.ai/api/openrouter'
api_base: 'https://api.kilo.ai/api/gateway'
};
delete providersData[key].kiro_config;
delete providersData[key].claude_config;
// Set endpoint for kilocode (fixed, not modifiable)
providersData[key].endpoint = 'https://kilocode.ai/api/openrouter';
providersData[key].endpoint = 'https://api.kilo.ai/api/gateway';
} else if (newType !== 'kiro' && newType !== 'claude' && newType !== 'kilocode' && (oldType === 'kiro' || oldType === 'claude' || oldType === 'kilocode')) {
// Transitioning FROM kiro/claude/kilocode: remove special configs, set api_key_required to true
providersData[key].api_key_required = true;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment