Rate limiting and message splitting

parent e5494efd
......@@ -427,12 +427,222 @@ This AI.PROMPT file is automatically updated when significant changes are made t
### Recent Updates
**2026-02-07 - Version 0.2.7**
- Added max_request_tokens support for automatic request splitting
- Updated ProviderConfig to include optional models field with max_request_tokens
- Updated Model class to include max_request_tokens field
- Added token counting utilities using tiktoken and langchain-text-splitters in aisbf/utils.py
- Updated RequestHandler and RotationHandler to automatically split requests exceeding max_request_tokens
- Updated config/providers.json with example models including max_request_tokens
- Updated config/rotations.json with example models including max_request_tokens
- Added tiktoken and langchain-text-splitters dependencies to requirements.txt
**2026-02-06 - Version 0.2.6**
- Bumped version to 0.2.6 in setup.py, pyproject.toml, and aisbf/__init__.py
- Updated README.md and DOCUMENTATION.md with comprehensive API endpoint documentation
- Added detailed sections for General, Provider, Rotation, and Autoselect endpoints
- Documented rotation load balancing and AI-assisted autoselect based on content analysis
## max_request_tokens Feature
### Overview
The `max_request_tokens` feature allows automatic splitting of large requests into multiple smaller requests when the token count exceeds a specified limit. This is useful for models with strict token limits or for handling very large inputs.
### Configuration
#### providers.json
In [`providers.json`](config/providers.json), each provider can optionally include a `models` field containing model definitions:
```json
{
"providers": {
"gemini": {
"id": "gemini",
"name": "Google AI Studio",
"endpoint": "https://generativelanguage.googleapis.com/v1beta",
"type": "google",
"api_key_required": true,
"rate_limit": 0,
"models": [
{
"name": "gemini-2.0-flash",
"rate_limit": 0,
"max_request_tokens": 1000000
}
]
}
}
}
```
#### rotations.json
In [`rotations.json`](config/rotations.json), each model definition can optionally include `max_request_tokens`:
```json
{
"rotations": {
"coding": {
"model_name": "coding",
"providers": [
{
"provider_id": "openai",
"api_key": "YOUR_OPENAI_API_KEY",
"models": [
{
"name": "gpt-4",
"weight": 2,
"rate_limit": 0,
"max_request_tokens": 128000
}
]
}
]
}
}
}
```
## Token Rate Limiting Feature
### Overview
The token rate limiting feature allows you to set limits on token usage per model to control costs and prevent abuse. When a token rate limit is exceeded, the provider is automatically disabled for a specified duration.
### Configuration
#### providers.json
In [`providers.json`](config/providers.json), model definitions can include optional token rate limit fields:
```json
{
"providers": {
"gemini": {
"id": "gemini",
"name": "Google AI Studio",
"endpoint": "https://generativelanguage.googleapis.com/v1beta",
"type": "google",
"api_key_required": true,
"rate_limit": 0,
"models": [
{
"name": "gemini-2.0-flash",
"rate_limit": 0,
"max_request_tokens": 100000,
"rate_limit_TPM": 15000,
"rate_limit_TPH": 100000,
"rate_limit_TPD": 1000000
}
]
}
}
}
```
#### rotations.json
In [`rotations.json`](config/rotations.json), model definitions can include optional token rate limit fields:
```json
{
"rotations": {
"coding": {
"model_name": "coding",
"providers": [
{
"provider_id": "gemini",
"api_key": "YOUR_GEMINI_API_KEY",
"models": [
{
"name": "gemini-2.0-flash",
"weight": 3,
"rate_limit": 0,
"max_request_tokens": 100000,
"rate_limit_TPM": 15000,
"rate_limit_TPH": 100000,
"rate_limit_TPD": 1000000
}
]
}
]
}
}
}
```
### Rate Limit Fields
- **`rate_limit_TPM`**: Maximum tokens allowed per minute (Tokens Per Minute)
- **`rate_limit_TPH`**: Maximum tokens allowed per hour (Tokens Per Hour)
- **`rate_limit_TPD`**: Maximum tokens allowed per day (Tokens Per Day)
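The sliding-window accounting behind these fields can be sketched in a few lines. This is an illustrative stand-in for the handler's internal tracking, not the actual implementation; only the config field names are taken from above:

```python
import time
from collections import deque

# Window length in seconds for each configured limit field
WINDOWS = {"rate_limit_TPM": 60, "rate_limit_TPH": 3600, "rate_limit_TPD": 86400}

class TokenWindow:
    """Hypothetical per-model token budget tracker."""

    def __init__(self, limits):
        self.limits = limits      # e.g. {"rate_limit_TPM": 15000}
        self.usage = deque()      # (timestamp, token_count) entries

    def would_exceed(self, tokens, now=None):
        now = now if now is not None else time.time()
        for field, seconds in WINDOWS.items():
            limit = self.limits.get(field)
            if not limit:
                continue
            # Sum only entries that fall inside this window
            used = sum(tok for ts, tok in self.usage if ts > now - seconds)
            if used + tokens > limit:
                return True
        return False

    def record(self, tokens, now=None):
        self.usage.append((now if now is not None else time.time(), tokens))
```

Old entries simply age out of the sum once they leave the window, which is the behavior described below.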
### Behavior
When a token rate limit is exceeded:
1. The provider is automatically disabled for a duration based on which limit was exceeded:
- TPM limit exceeded: Provider disabled for 1 minute
- TPH limit exceeded: Provider disabled for 1 hour
- TPD limit exceeded: Provider disabled for 1 day
2. The rotation handler will skip the disabled provider and try the next available model
3. Token usage is tracked across all requests and accumulated for each time window
4. Old usage data outside the time window is automatically cleaned up
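The cooldown selection above reduces to a small duration table; a minimal sketch with hypothetical helper names:

```python
# Cooldown windows matching the disable durations described above
COOLDOWNS = {"1m": 60, "1h": 3600, "1d": 86400}

def disable_until(duration: str, now: float) -> float:
    """Return the timestamp until which the provider stays disabled."""
    return now + COOLDOWNS[duration]

def is_disabled(disabled_until: float, now: float) -> bool:
    """The rotation handler skips the provider while the cooldown has not expired."""
    return disabled_until is not None and now < disabled_until
```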
### Implementation
- Token counting uses tiktoken for accurate token estimation
- Usage is tracked per model in the provider handler
- Rate limit checks are performed before making requests
- Providers are automatically re-enabled after the cooldown period expires
### Request Splitting Behavior
When a request is made to a provider or rotation:
1. The system checks if the model has a `max_request_tokens` configured
2. If configured, the system counts the tokens in the request using tiktoken
3. If the request token count exceeds `max_request_tokens`, the request is automatically split into multiple chunks
4. Each chunk is sent sequentially to the provider
5. Responses from all chunks are combined into a single response
6. The combined response includes metadata indicating it was chunked:
- `aisbf_chunked: true`
- `aisbf_total_chunks: <number of chunks>`
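A client consuming the combined response can branch on this metadata; a short, hypothetical consumer-side check:

```python
def describe_response(response: dict) -> str:
    """Report whether a response was assembled from multiple chunks."""
    if response.get("aisbf_chunked"):
        return f"combined from {response.get('aisbf_total_chunks')} chunks"
    return "single request"
```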
### Limitations
- Streaming is not supported for chunked requests (falls back to non-streaming)
- Tools and tool_choice are only sent with the first chunk
- If a chunk fails, the system returns partial results from successful chunks
### Token Counting
The system uses tiktoken for accurate token counting. The encoding is selected automatically from the model name:
- OpenAI-style models (`gpt-*`, `text-*`, `davinci-*`, etc.) use `tiktoken.encoding_for_model`, falling back to `cl100k_base` if the model is unknown
- Other models use `cl100k_base` by default
#### Text Splitting
The system uses the `TokenTextSplitter` from `langchain-text-splitters` for intelligent text splitting:
- **chunk_size**: Set slightly below max_request_tokens to allow room for model output (e.g., max_request_tokens - 200)
- **chunk_overlap**: 100 tokens shared between chunks for context continuity
- This helps the model understand the transition between chunks
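The chunking arithmetic can be sketched over a plain token list. This is a simplified stand-in for `TokenTextSplitter`, assuming `chunk_size = max_request_tokens - 200` and a 100-token overlap as described above:

```python
def split_tokens(tokens, max_request_tokens, margin=200, overlap=100):
    """Split a token sequence into overlapping chunks.

    Each chunk holds at most (max_request_tokens - margin) tokens and
    repeats the last `overlap` tokens of the previous chunk for context.
    """
    chunk_size = max_request_tokens - margin
    stride = chunk_size - overlap
    if len(tokens) <= chunk_size:
        return [tokens]
    return [tokens[i:i + chunk_size] for i in range(0, len(tokens) - overlap, stride)]
```

Under these assumptions a 7,999-token request with a 4,000-token limit splits into three chunks of roughly 3,800, 3,800, and 600 tokens.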
### Example
A request with 7,999 tokens and `max_request_tokens: 4000` (effective chunk size of 3,800 tokens, so a stride of 3,700 tokens after the 100-token overlap) would be split into 3 chunks:
- Chunk 1: ~3,800 tokens
- Chunk 2: ~3,800 tokens (the first 100 tokens overlap with chunk 1)
- Chunk 3: ~600 tokens (the first 100 tokens overlap with chunk 2)
The responses are combined and returned as a single response with metadata indicating the chunking occurred.
### How to Update AI.PROMPT
When making changes that affect the project structure or functionality:
......
......@@ -10,6 +10,9 @@ A modular proxy server for managing multiple AI provider integrations with unifi
- **Streaming Support**: Full support for streaming responses from all providers
- **Error Tracking**: Automatic provider disabling after consecutive failures with cooldown periods
- **Rate Limiting**: Built-in rate limiting and graceful error handling
- **Request Splitting**: Automatic splitting of large requests when exceeding `max_request_tokens` limit
- **Token Rate Limiting**: Per-model token usage tracking with TPM (tokens per minute), TPH (tokens per hour), and TPD (tokens per day) limits
- **Automatic Provider Disabling**: Providers automatically disabled when token rate limits are exceeded
## Author
......@@ -69,8 +72,22 @@ See [`PYPI.md`](PYPI.md) for detailed instructions on publishing to PyPI.
- OpenAI and openai-compatible endpoints (openai)
- Anthropic (anthropic)
- Ollama (direct HTTP)
## Configuration
### Model Configuration
Models can be configured with the following optional fields:
- **`max_request_tokens`**: Maximum tokens allowed per request. Requests exceeding this limit are automatically split into multiple smaller requests.
- **`rate_limit_TPM`**: Maximum tokens allowed per minute (Tokens Per Minute)
- **`rate_limit_TPH`**: Maximum tokens allowed per hour (Tokens Per Hour)
- **`rate_limit_TPD`**: Maximum tokens allowed per day (Tokens Per Day)
When token rate limits are exceeded, providers are automatically disabled:
- TPM limit exceeded: Provider disabled for 1 minute
- TPH limit exceeded: Provider disabled for 1 hour
- TPD limit exceeded: Provider disabled for 1 day
See `config/providers.json` and `config/rotations.json` for configuration examples.
## API Endpoints
......
......@@ -28,6 +28,13 @@ import json
import shutil
from pathlib import Path
class ProviderModelConfig(BaseModel):
"""Model configuration within a provider"""
name: str
rate_limit: Optional[float] = None
max_request_tokens: Optional[int] = None
rate_limit_TPM: Optional[int] = None # Max tokens per minute
rate_limit_TPH: Optional[int] = None # Max tokens per hour
rate_limit_TPD: Optional[int] = None # Max tokens per day
class ProviderConfig(BaseModel):
id: str
name: str
......@@ -35,6 +42,8 @@ class ProviderConfig(BaseModel):
type: str
api_key_required: bool
rate_limit: float = 0.0
api_key: Optional[str] = None # Optional API key in provider config
models: Optional[List[ProviderModelConfig]] = None # Optional list of models with their configs
class RotationConfig(BaseModel):
providers: List[Dict]
......
......@@ -26,13 +26,19 @@ import asyncio
import re
import uuid
import hashlib
from typing import Dict, List, Optional
import time as time_module
from typing import Dict, List, Optional, Union
from pathlib import Path
from fastapi import HTTPException, Request
from fastapi.responses import JSONResponse, StreamingResponse
from .models import ChatCompletionRequest, ChatCompletionResponse
from .providers import get_provider_handler
from .config import config
from .utils import (
count_messages_tokens,
split_messages_into_chunks,
get_max_request_tokens_for_model
)
def generate_system_fingerprint(provider_id: str, seed: Optional[int] = None) -> str:
......@@ -68,6 +74,134 @@ class RequestHandler:
def __init__(self):
self.config = config
async def _handle_chunked_request(
self,
handler,
model: str,
messages: List[Dict],
max_tokens: Optional[int],
temperature: float,
stream: bool,
tools: Optional[List[Dict]],
tool_choice: Optional[Union[str, Dict]],
max_request_tokens: int,
provider_id: str,
logger
) -> Dict:
"""
Handle a request that needs to be split into multiple chunks due to token limits.
This method splits the request into chunks, sends each chunk sequentially,
and combines the responses into a single response.
Args:
handler: The provider handler
model: The model name
messages: The messages to send
max_tokens: Max output tokens
temperature: Temperature setting
stream: Whether to stream (not supported for chunked requests)
tools: Tool definitions
tool_choice: Tool choice setting
max_request_tokens: Maximum tokens per request
provider_id: Provider identifier
logger: Logger instance
Returns:
Combined response from all chunks
"""
logger.info(f"=== CHUNKED REQUEST HANDLING START ===")
logger.info(f"Max request tokens per chunk: {max_request_tokens}")
# Split messages into chunks
message_chunks = split_messages_into_chunks(messages, max_request_tokens, model)
logger.info(f"Split into {len(message_chunks)} message chunks")
if stream:
logger.warning("Streaming is not supported for chunked requests, falling back to non-streaming")
# Process each chunk and collect responses
all_responses = []
combined_content = ""
total_prompt_tokens = 0
total_completion_tokens = 0
created_time = int(time_module.time())
response_id = f"chunked-{provider_id}-{model}-{created_time}"
for chunk_idx, chunk_messages in enumerate(message_chunks):
logger.info(f"Processing chunk {chunk_idx + 1}/{len(message_chunks)}")
logger.info(f"Chunk messages count: {len(chunk_messages)}")
# Apply rate limiting between chunks
if chunk_idx > 0:
await handler.apply_rate_limit()
try:
chunk_response = await handler.handle_request(
model=model,
messages=chunk_messages,
max_tokens=max_tokens,
temperature=temperature,
stream=False, # Always non-streaming for chunked requests
tools=tools if chunk_idx == 0 else None, # Only first chunk uses tools
tool_choice=tool_choice if chunk_idx == 0 else None
)
# Extract content from response
if isinstance(chunk_response, dict):
choices = chunk_response.get('choices', [])
if choices:
content = choices[0].get('message', {}).get('content', '')
combined_content += content
# Accumulate token usage
usage = chunk_response.get('usage', {})
total_prompt_tokens += usage.get('prompt_tokens', 0)
total_completion_tokens += usage.get('completion_tokens', 0)
all_responses.append(chunk_response)
logger.info(f"Chunk {chunk_idx + 1} processed successfully")
except Exception as e:
logger.error(f"Error processing chunk {chunk_idx + 1}: {e}")
# If a chunk fails, we still try to return what we have
if all_responses:
logger.warning("Returning partial results from successful chunks")
break
else:
raise
# Build combined response
combined_response = {
"id": response_id,
"object": "chat.completion",
"created": created_time,
"model": model,
"choices": [{
"index": 0,
"message": {
"role": "assistant",
"content": combined_content
},
"finish_reason": "stop"
}],
"usage": {
"prompt_tokens": total_prompt_tokens,
"completion_tokens": total_completion_tokens,
"total_tokens": total_prompt_tokens + total_completion_tokens
},
"aisbf_chunked": True,
"aisbf_total_chunks": len(message_chunks)
}
logger.info(f"=== CHUNKED REQUEST HANDLING END ===")
logger.info(f"Combined content length: {len(combined_content)} characters")
logger.info(f"Total chunks processed: {len(all_responses)}")
return combined_response
async def handle_chat_completion(self, request: Request, provider_id: str, request_data: Dict) -> Dict:
import logging
logger = logging.getLogger(__name__)
......@@ -98,12 +232,54 @@ class RequestHandler:
raise HTTPException(status_code=503, detail="Provider temporarily unavailable")
try:
logger.info(f"Model requested: {request_data.get('model')}")
logger.info(f"Messages count: {len(request_data.get('messages', []))}")
model = request_data.get('model')
messages = request_data.get('messages', [])
logger.info(f"Model requested: {model}")
logger.info(f"Messages count: {len(messages)}")
logger.info(f"Max tokens: {request_data.get('max_tokens')}")
logger.info(f"Temperature: {request_data.get('temperature', 1.0)}")
logger.info(f"Stream: {request_data.get('stream', False)}")
# Check for max_request_tokens in provider config
max_request_tokens = get_max_request_tokens_for_model(
model_name=model,
provider_config=provider_config,
rotation_model_config=None
)
if max_request_tokens:
# Count tokens in the request
request_tokens = count_messages_tokens(messages, model)
logger.info(f"Request tokens: {request_tokens}, max_request_tokens: {max_request_tokens}")
if request_tokens > max_request_tokens:
logger.info(f"Request exceeds max_request_tokens, will split into chunks")
# Apply rate limiting
logger.info("Applying rate limiting...")
await handler.apply_rate_limit()
logger.info("Rate limiting applied")
# Handle as chunked request
response = await self._handle_chunked_request(
handler=handler,
model=model,
messages=messages,
max_tokens=request_data.get('max_tokens'),
temperature=request_data.get('temperature', 1.0),
stream=request_data.get('stream', False),
tools=request_data.get('tools'),
tool_choice=request_data.get('tool_choice'),
max_request_tokens=max_request_tokens,
provider_id=provider_id,
logger=logger
)
handler.record_success()
logger.info(f"=== RequestHandler.handle_chat_completion END ===")
return response
# Apply rate limiting
logger.info("Applying rate limiting...")
await handler.apply_rate_limit()
......@@ -111,8 +287,8 @@ class RequestHandler:
logger.info(f"Sending request to provider handler...")
response = await handler.handle_request(
model=request_data['model'],
messages=request_data['messages'],
model=model,
messages=messages,
max_tokens=request_data.get('max_tokens'),
temperature=request_data.get('temperature', 1.0),
stream=request_data.get('stream', False),
......@@ -360,6 +536,170 @@ class RotationHandler:
return provider_config.type
return None
async def _handle_chunked_rotation_request(
self,
handler,
model_name: str,
messages: List[Dict],
max_tokens: Optional[int],
temperature: float,
stream: bool,
tools: Optional[List[Dict]],
tool_choice: Optional[Union[str, Dict]],
max_request_tokens: int,
provider_id: str,
logger
) -> Dict:
"""
Handle a rotation request that needs to be split into multiple chunks due to token limits.
This method splits the request into chunks, sends each chunk sequentially,
and combines the responses into a single response.
Args:
handler: The provider handler
model_name: The model name
messages: The messages to send
max_tokens: Max output tokens
temperature: Temperature setting
stream: Whether to stream (not supported for chunked requests)
tools: Tool definitions
tool_choice: Tool choice setting
max_request_tokens: Maximum tokens per request
provider_id: Provider identifier
logger: Logger instance
Returns:
Combined response from all chunks
"""
logger.info(f"=== ROTATION CHUNKED REQUEST HANDLING START ===")
logger.info(f"Max request tokens per chunk: {max_request_tokens}")
# Split messages into chunks
message_chunks = split_messages_into_chunks(messages, max_request_tokens, model_name)
logger.info(f"Split into {len(message_chunks)} message chunks")
if stream:
logger.warning("Streaming is not supported for chunked rotation requests, falling back to non-streaming")
# Process each chunk and collect responses
all_responses = []
combined_content = ""
total_prompt_tokens = 0
total_completion_tokens = 0
created_time = int(time_module.time())
response_id = f"chunked-rotation-{provider_id}-{model_name}-{created_time}"
for chunk_idx, chunk_messages in enumerate(message_chunks):
logger.info(f"Processing chunk {chunk_idx + 1}/{len(message_chunks)}")
logger.info(f"Chunk messages count: {len(chunk_messages)}")
# Apply rate limiting between chunks
if chunk_idx > 0:
await handler.apply_rate_limit()
try:
chunk_response = await handler.handle_request(
model=model_name,
messages=chunk_messages,
max_tokens=max_tokens,
temperature=temperature,
stream=False, # Always non-streaming for chunked requests
tools=tools if chunk_idx == 0 else None, # Only first chunk uses tools
tool_choice=tool_choice if chunk_idx == 0 else None
)
# Extract content from response
if isinstance(chunk_response, dict):
choices = chunk_response.get('choices', [])
if choices:
content = choices[0].get('message', {}).get('content', '')
combined_content += content
# Accumulate token usage
usage = chunk_response.get('usage', {})
chunk_total_tokens = usage.get('total_tokens', 0)
total_prompt_tokens += usage.get('prompt_tokens', 0)
total_completion_tokens += usage.get('completion_tokens', 0)
# Record token usage for rate limit tracking
if chunk_total_tokens > 0:
handler._record_token_usage(model_name, chunk_total_tokens)
logger.info(f"Recorded {chunk_total_tokens} tokens for chunk {chunk_idx + 1}")
all_responses.append(chunk_response)
logger.info(f"Chunk {chunk_idx + 1} processed successfully")
except Exception as e:
logger.error(f"Error processing chunk {chunk_idx + 1}: {e}")
# If a chunk fails, we still try to return what we have
if all_responses:
logger.warning("Returning partial results from successful chunks")
break
else:
raise
# Build combined response
combined_response = {
"id": response_id,
"object": "chat.completion",
"created": created_time,
"model": model_name,
"choices": [{
"index": 0,
"message": {
"role": "assistant",
"content": combined_content
},
"finish_reason": "stop"
}],
"usage": {
"prompt_tokens": total_prompt_tokens,
"completion_tokens": total_completion_tokens,
"total_tokens": total_prompt_tokens + total_completion_tokens
},
"aisbf_chunked": True,
"aisbf_total_chunks": len(message_chunks)
}
logger.info(f"=== ROTATION CHUNKED REQUEST HANDLING END ===")
logger.info(f"Combined content length: {len(combined_content)} characters")
logger.info(f"Total chunks processed: {len(all_responses)}")
return combined_response
def _get_api_key(self, provider_id: str, rotation_api_key: Optional[str] = None) -> Optional[str]:
"""
Get the API key for a provider.
Priority order:
1. API key from provider config (providers.json)
2. API key from rotation config (rotations.json)
Args:
provider_id: The provider identifier
rotation_api_key: Optional API key from rotation configuration
Returns:
The API key to use, or None if not found
"""
import logging
logger = logging.getLogger(__name__)
# First check provider config for api_key
provider_config = self.config.get_provider(provider_id)
if provider_config and hasattr(provider_config, 'api_key') and provider_config.api_key:
logger.info(f"Using API key from provider config for {provider_id}")
return provider_config.api_key
# Fall back to rotation api_key
if rotation_api_key:
logger.info(f"Using API key from rotation config for {provider_id}")
return rotation_api_key
logger.info(f"No API key found for {provider_id}")
return None
async def handle_rotation_request(self, rotation_id: str, request_data: Dict):
"""
Handle a rotation request.
......@@ -405,8 +745,11 @@ class RotationHandler:
skipped_providers.append(provider_id)
continue
# Get API key: first from provider config, then from rotation config
api_key = self._get_api_key(provider_id, provider.get('api_key'))
# Check if provider is rate limited/deactivated
provider_handler = get_provider_handler(provider_id, provider.get('api_key'))
provider_handler = get_provider_handler(provider_id, api_key)
if provider_handler.is_rate_limited():
logger.warning(f" [SKIPPED] Provider {provider_id} is rate limited/deactivated")
logger.warning(f" Reason: Provider has exceeded failure threshold or is in cooldown period")
......@@ -429,9 +772,10 @@ class RotationHandler:
logger.info(f" Rate Limit: {model_rate_limit}")
# Add provider_id and api_key to model for later use
# Use resolved api_key (from provider config or rotation config)
model_with_provider = model.copy()
model_with_provider['provider_id'] = provider_id
model_with_provider['api_key'] = provider.get('api_key')
model_with_provider['api_key'] = api_key
available_models.append(model_with_provider)
logger.info(f"")
......@@ -536,6 +880,23 @@ class RotationHandler:
logger.warning(f"Provider {provider_id} is rate limited, skipping to next model")
continue
# Check token rate limits for this model
request_tokens = count_messages_tokens(request_data['messages'], model_name)
if handler._check_token_rate_limit(model_name, request_tokens):
logger.warning(f"Model {model_name} would exceed token rate limit, skipping to next model")
# Disable the provider based on the first configured limit window (TPM, then TPH, then TPD)
model_config = current_model
if model_config.get('rate_limit_TPM'):
handler._disable_provider_for_duration("1m")
logger.warning(f"Provider {provider_id} disabled for 1 minute due to TPM limit")
elif model_config.get('rate_limit_TPH'):
handler._disable_provider_for_duration("1h")
logger.warning(f"Provider {provider_id} disabled for 1 hour due to TPH limit")
elif model_config.get('rate_limit_TPD'):
handler._disable_provider_for_duration("1d")
logger.warning(f"Provider {provider_id} disabled for 1 day due to TPD limit")
continue
try:
logger.info(f"Model requested: {model_name}")
logger.info(f"Messages count: {len(request_data.get('messages', []))}")
......@@ -543,6 +904,59 @@ class RotationHandler:
logger.info(f"Temperature: {request_data.get('temperature', 1.0)}")
logger.info(f"Stream: {request_data.get('stream', False)}")
# Check for max_request_tokens in rotation model config
max_request_tokens = current_model.get('max_request_tokens')
if max_request_tokens:
# Count tokens in the request
request_tokens = count_messages_tokens(request_data['messages'], model_name)
logger.info(f"Request tokens: {request_tokens}, max_request_tokens: {max_request_tokens}")
if request_tokens > max_request_tokens:
logger.info(f"Request exceeds max_request_tokens, will split into chunks")
# Apply rate limiting
logger.info("Applying rate limiting...")
await handler.apply_rate_limit()
logger.info("Rate limiting applied")
# Handle as chunked request
response = await self._handle_chunked_rotation_request(
handler=handler,
model_name=model_name,
messages=request_data['messages'],
max_tokens=request_data.get('max_tokens'),
temperature=request_data.get('temperature', 1.0),
stream=request_data.get('stream', False),
tools=request_data.get('tools'),
tool_choice=request_data.get('tool_choice'),
max_request_tokens=max_request_tokens,
provider_id=provider_id,
logger=logger
)
handler.record_success()
logger.info(f"=== RotationHandler.handle_rotation_request END ===")
logger.info(f"Request succeeded on attempt {attempt + 1}")
logger.info(f"Successfully used model: {model_name} (provider: {provider_id})")
# Check if response is a streaming request
is_streaming = request_data.get('stream', False)
if is_streaming:
# Get provider type from configuration for proper streaming handling
provider_type = self._get_provider_type(provider_id)
logger.info(f"Returning streaming response for provider type: {provider_type}")
return self._create_streaming_response(
response=response,
provider_type=provider_type,
provider_id=provider_id,
model_name=model_name,
handler=handler,
request_data=request_data
)
else:
logger.info("Returning non-streaming response")
return response
# Apply model-specific rate limiting
rate_limit = current_model.get('rate_limit')
logger.info(f"Model-specific rate limit: {rate_limit}")
......@@ -562,6 +976,15 @@ class RotationHandler:
)
logger.info(f"Response received from provider")
logger.info(f"Response type: {type(response)}")
# Record token usage for rate limit tracking
if isinstance(response, dict):
usage = response.get('usage', {})
total_tokens = usage.get('total_tokens', 0)
if total_tokens > 0:
handler._record_token_usage(model_name, total_tokens)
logger.info(f"Recorded {total_tokens} tokens for model {model_name}")
handler.record_success()
# Update successful variables to the ones that worked
......
......@@ -59,6 +59,10 @@ class Model(BaseModel):
provider_id: str
weight: int = 1
rate_limit: Optional[float] = None
max_request_tokens: Optional[int] = None
rate_limit_TPM: Optional[int] = None # Max tokens per minute
rate_limit_TPH: Optional[int] = None # Max tokens per hour
rate_limit_TPD: Optional[int] = None # Max tokens per day
class Provider(BaseModel):
id: str
......
......@@ -46,12 +46,129 @@ class BaseProviderHandler:
self.rate_limit = config.providers[provider_id].rate_limit
# Add model-level rate limit tracking
self.model_last_request_time = {} # {model_name: timestamp}
# Token usage tracking for rate limits
self.token_usage = {} # {model_name: {"TPM": [], "TPH": [], "TPD": []}}
def is_rate_limited(self) -> bool:
if self.error_tracking['disabled_until'] and self.error_tracking['disabled_until'] > time.time():
return True
return False
def _get_model_config(self, model: str) -> Optional[Dict]:
"""Get model configuration from provider config as a plain dict"""
provider_config = config.providers.get(self.provider_id)
if provider_config and getattr(provider_config, 'models', None):
for model_config in provider_config.models:
# Entries may be plain dicts or pydantic ProviderModelConfig objects
if isinstance(model_config, dict):
if model_config.get('name') == model:
return model_config
elif getattr(model_config, 'name', None) == model:
# Normalize pydantic models to dicts so callers can use .get()
return model_config.model_dump() if hasattr(model_config, 'model_dump') else model_config.dict()
return None
def _check_token_rate_limit(self, model: str, token_count: int) -> bool:
"""
Check if a request would exceed token rate limits.
Returns True if any rate limit would be exceeded, False otherwise.
"""
model_config = self._get_model_config(model)
if not model_config:
return False
import logging
logger = logging.getLogger(__name__)
current_time = time.time()
# Usage entries are (timestamp, token_count) tuples recorded by _record_token_usage
# Check TPM (tokens per minute)
if model_config.get('rate_limit_TPM'):
tpm = model_config['rate_limit_TPM']
# Sum only the tokens recorded in the last 60 seconds
one_minute_ago = current_time - 60
total_tpm = sum(tok for ts, tok in self.token_usage.get(model, {}).get('TPM', []) if ts > one_minute_ago)
if total_tpm + token_count > tpm:
logger.warning(f"TPM limit would be exceeded: {total_tpm + token_count}/{tpm}")
return True
# Check TPH (tokens per hour)
if model_config.get('rate_limit_TPH'):
tph = model_config['rate_limit_TPH']
# Sum only the tokens recorded in the last 3600 seconds
one_hour_ago = current_time - 3600
total_tph = sum(tok for ts, tok in self.token_usage.get(model, {}).get('TPH', []) if ts > one_hour_ago)
if total_tph + token_count > tph:
logger.warning(f"TPH limit would be exceeded: {total_tph + token_count}/{tph}")
return True
# Check TPD (tokens per day)
if model_config.get('rate_limit_TPD'):
tpd = model_config['rate_limit_TPD']
# Sum only the tokens recorded in the last 86400 seconds
one_day_ago = current_time - 86400
total_tpd = sum(tok for ts, tok in self.token_usage.get(model, {}).get('TPD', []) if ts > one_day_ago)
if total_tpd + token_count > tpd:
logger.warning(f"TPD limit would be exceeded: {total_tpd + token_count}/{tpd}")
return True
return False
def _record_token_usage(self, model: str, token_count: int):
"""Record token usage for rate limit tracking"""
import logging
logger = logging.getLogger(__name__)
if model not in self.token_usage:
self.token_usage[model] = {"TPM": [], "TPH": [], "TPD": []}
current_time = time.time()
# Record for all three time windows
self.token_usage[model]["TPM"].append((current_time, token_count))
self.token_usage[model]["TPH"].append((current_time, token_count))
self.token_usage[model]["TPD"].append((current_time, token_count))
logger.debug(f"Recorded token usage for model {model}: {token_count} tokens")
def _disable_provider_for_duration(self, duration: str):
"""
Disable provider for a specific duration.
Args:
duration: "1m" (1 minute), "1h" (1 hour), or "1d" (1 day)
"""
import logging
logger = logging.getLogger(__name__)
duration_map = {
"1m": 60,
"1h": 3600,
"1d": 86400
}
if duration not in duration_map:
logger.error(f"Invalid duration: {duration}")
return
disable_seconds = duration_map[duration]
self.error_tracking['disabled_until'] = time.time() + disable_seconds
logger.error(f"!!! PROVIDER DISABLED !!!")
logger.error(f"Provider: {self.provider_id}")
logger.error(f"Reason: Token rate limit exceeded")
logger.error(f"Disabled for: {duration}")
logger.error(f"Disabled until: {self.error_tracking['disabled_until']}")
logger.error(f"Provider will be automatically re-enabled after cooldown")
async def apply_rate_limit(self, rate_limit: Optional[float] = None):
"""Apply rate limiting by waiting if necessary"""
if rate_limit is None:
......
"""
Copyleft (C) 2026 Stefy Lanza <stefy@nexlab.net>
AISBF - AI Service Broker Framework || AI Should Be Free
Utility functions for AISBF.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
Why did the programmer quit his job? Because he didn't get arrays!
"""
from typing import Dict, List, Optional
from langchain_text_splitters import TokenTextSplitter
from .config import config
def count_messages_tokens(messages: List[Dict], model: str) -> int:
"""
Count the total number of tokens in a list of messages.
This function uses tiktoken for accurate token counting.
Args:
messages: List of message dictionaries with 'role' and 'content' keys
model: Model name to determine encoding
Returns:
Total token count across all messages
"""
import tiktoken
import logging
logger = logging.getLogger(__name__)
# Select encoding based on model
# OpenAI models use cl100k_base encoding
if model.startswith(('gpt-', 'text-', 'davinci-', 'ada-', 'babbage-', 'curie-')):
try:
encoding = tiktoken.encoding_for_model(model)
except KeyError:
# Fallback to cl100k_base if model not found
encoding = tiktoken.get_encoding("cl100k_base")
else:
# Default encoding for other models
encoding = tiktoken.get_encoding("cl100k_base")
total_tokens = 0
for msg in messages:
content = msg.get('content', '')
if content:
if isinstance(content, str):
total_tokens += len(encoding.encode(content))
elif isinstance(content, list):
# Handle complex content (e.g., with images)
for item in content:
if isinstance(item, dict):
text = item.get('text', '')
if text:
total_tokens += len(encoding.encode(text))
elif isinstance(item, str):
total_tokens += len(encoding.encode(item))
logger.debug(f"Token count for model {model}: {total_tokens}")
return total_tokens
def split_messages_into_chunks(messages: List[Dict], max_tokens: int, model: str) -> List[List[Dict]]:
"""
Split messages into chunks based on token limit using langchain-text-splitters.
This function uses TokenTextSplitter to intelligently split text while
maintaining context through overlap between chunks.
Args:
messages: List of message dictionaries with 'role' and 'content' keys
max_tokens: Maximum tokens per chunk
model: Model name (used for logging)
Returns:
List of message chunks, where each chunk is a list of messages
"""
import logging
logger = logging.getLogger(__name__)
# Combine all messages into a single text for splitting
combined_text = ""
for msg in messages:
role = msg.get('role', 'user')
content = msg.get('content', '')
if content:
if isinstance(content, str):
combined_text += f"{role}: {content}\n\n"
elif isinstance(content, list):
# Handle complex content
for item in content:
if isinstance(item, dict):
text = item.get('text', '')
if text:
combined_text += f"{role}: {text}\n\n"
elif isinstance(item, str):
combined_text += f"{role}: {item}\n\n"
# Use langchain-text-splitters for token-based splitting
# chunk_size: set slightly below max_tokens to leave room for model output
# chunk_overlap: shared tokens between chunks for context continuity
text_splitter = TokenTextSplitter(
encoding_name="cl100k_base",  # same encoding family used in count_messages_tokens
chunk_size=max(max_tokens - 200, 1),  # leave headroom; guard against tiny limits
chunk_overlap=100  # helps the model bridge chunk boundaries
)
# Split text into chunks
text_chunks = text_splitter.split_text(combined_text)
logger.info(f"Split text into {len(text_chunks)} chunks using langchain-text-splitters")
logger.info(f"Max tokens per chunk: {max_tokens}, Chunk overlap: 100")
# Convert text chunks back to message format
message_chunks = []
for i, chunk_text in enumerate(text_chunks):
# Create a single user message with the chunk content
chunk_messages = [{"role": "user", "content": chunk_text}]
message_chunks.append(chunk_messages)
logger.debug(f"Chunk {i+1}: {len(chunk_text)} characters")
return message_chunks
def get_max_request_tokens_for_model(
model_name: str,
provider_config,
rotation_model_config: Optional[Dict] = None
) -> Optional[int]:
"""
Get the max_request_tokens for a model from provider or rotation configuration.
Priority order:
1. Check rotation model config (if provided)
2. Check provider models config
Args:
model_name: The model name to look up
provider_config: The provider configuration
rotation_model_config: Optional model config from rotation
Returns:
The max_request_tokens value, or None if not configured
"""
import logging
logger = logging.getLogger(__name__)
# First check rotation model config (highest priority)
if rotation_model_config and 'max_request_tokens' in rotation_model_config:
max_tokens = rotation_model_config['max_request_tokens']
logger.info(f"Found max_request_tokens in rotation model config: {max_tokens}")
return max_tokens
# Then check provider models config
if hasattr(provider_config, 'models') and provider_config.models:
for model in provider_config.models:
if model.get('name') == model_name:
max_tokens = model.get('max_request_tokens')
if max_tokens is not None:
logger.info(f"Found max_request_tokens in provider model config: {max_tokens}")
return max_tokens
logger.debug(f"No max_request_tokens configured for model {model_name}")
return None
\ No newline at end of file
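The `chunk_size`/`chunk_overlap` interplay in `split_messages_into_chunks` can be illustrated without the real dependencies. This sketch uses a plain token list in place of tiktoken output (an assumption made so the example is self-contained); `sliding_chunks` is a hypothetical stand-in, not the library splitter:

```python
# Sketch of the overlapping-window behaviour of TokenTextSplitter,
# with a pre-tokenized list standing in for tiktoken tokens (assumption).

def sliding_chunks(tokens, chunk_size, chunk_overlap):
    """Yield overlapping windows of tokens; each chunk shares
    chunk_overlap tokens with its predecessor for context continuity."""
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + chunk_size])
        if start + chunk_size >= len(tokens):
            break
    return chunks

words = [f"w{i}" for i in range(10)]
chunks = sliding_chunks(words, chunk_size=4, chunk_overlap=1)
# → [w0..w3], [w3..w6], [w6..w9] — the last token of each chunk
# reappears as the first token of the next.
```

The same arithmetic explains why the real code subtracts 200 from `max_tokens`: the window size, not the overlap, is what must stay under the model's limit.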
......@@ -6,7 +6,25 @@
"endpoint": "https://generativelanguage.googleapis.com/v1beta",
"type": "google",
"api_key_required": true,
"rate_limit": 0,
"models": [
{
"name": "gemini-2.0-flash",
"rate_limit": 0,
"max_request_tokens": 1000000,
"rate_limit_TPM": 15000,
"rate_limit_TPH": 100000,
"rate_limit_TPD": 1000000
},
{
"name": "gemini-1.5-pro",
"rate_limit": 0,
"max_request_tokens": 2000000,
"rate_limit_TPM": 15000,
"rate_limit_TPH": 100000,
"rate_limit_TPD": 1000000
}
]
},
"openai": {
"id": "openai",
......
......@@ -10,12 +10,20 @@
{
"name": "gemini-2.0-flash",
"weight": 3,
"rate_limit": 0,
"max_request_tokens": 100000,
"rate_limit_TPM": 15000,
"rate_limit_TPH": 100000,
"rate_limit_TPD": 1000000
},
{
"name": "gemini-1.5-pro",
"weight": 1,
"rate_limit": 0,
"max_request_tokens": 100000,
"rate_limit_TPM": 15000,
"rate_limit_TPH": 100000,
"rate_limit_TPD": 1000000
}
]
},
......@@ -26,12 +34,14 @@
{
"name": "gpt-4",
"weight": 2,
"rate_limit": 0,
"max_request_tokens": 128000
},
{
"name": "gpt-3.5-turbo",
"weight": 1,
"rate_limit": 0,
"max_request_tokens": 4000
}
]
},
......@@ -42,12 +52,14 @@
{
"name": "claude-3-5-sonnet-20241022",
"weight": 2,
"rate_limit": 0,
"max_request_tokens": 200000
},
{
"name": "claude-3-haiku-20240307",
"weight": 1,
"rate_limit": 0,
"max_request_tokens": 200000
}
]
}
......
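The lookup priority that `get_max_request_tokens_for_model` applies to configs like the ones above — rotation model config first, provider models second — can be sketched as follows. `resolve_max_request_tokens` is a simplified stand-in for the real function, and the config objects are illustrative, not the shipped defaults:

```python
# Hypothetical sketch of the max_request_tokens lookup priority:
# a rotation-level setting overrides the provider-level one.
from types import SimpleNamespace

def resolve_max_request_tokens(model_name, provider_config, rotation_model_config=None):
    # 1. Rotation model config wins if it defines the limit.
    if rotation_model_config and "max_request_tokens" in rotation_model_config:
        return rotation_model_config["max_request_tokens"]
    # 2. Otherwise fall back to the provider's models list.
    for model in getattr(provider_config, "models", None) or []:
        if model.get("name") == model_name:
            return model.get("max_request_tokens")
    return None  # not configured anywhere

provider = SimpleNamespace(models=[
    {"name": "gemini-2.0-flash", "max_request_tokens": 1000000},
])
rotation = {"name": "gemini-2.0-flash", "max_request_tokens": 100000}

resolve_max_request_tokens("gemini-2.0-flash", provider, rotation)  # → 100000
resolve_max_request_tokens("gemini-2.0-flash", provider)            # → 1000000
```

This mirrors the configs above, where the rotation entry for `gemini-2.0-flash` sets a tighter 100000-token limit than the provider's 1000000.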
......@@ -9,3 +9,5 @@ python-dotenv
google-genai
openai
anthropic
langchain-text-splitters
tiktoken
\ No newline at end of file
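Putting the pieces together, the splitting decision in the request handlers reduces to comparing a token count against the configured limit. A minimal sketch, assuming whitespace counting in place of tiktoken and a naive split at a message boundary (the real code splits text with TokenTextSplitter):

```python
# Hypothetical end-to-end flow: count tokens, split only when the request
# exceeds max_request_tokens. Whitespace counting stands in for tiktoken.

def approx_tokens(messages):
    """Rough token count: whitespace-separated words across all contents."""
    return sum(len(str(m.get("content", "")).split()) for m in messages)

def maybe_split(messages, max_request_tokens):
    """Return a list of sub-requests; a single element means no split."""
    if max_request_tokens is None or approx_tokens(messages) <= max_request_tokens:
        return [messages]           # under the limit: send as one request
    half = len(messages) // 2       # naive split at a message boundary
    return [messages[:half], messages[half:]]

small = [{"role": "user", "content": "hello there"}]
big = [{"role": "user", "content": "a " * 50},
       {"role": "user", "content": "b " * 50}]

maybe_split(small, 10)      # → one sub-request, unchanged
len(maybe_split(big, 10))   # → 2
```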