Now it works!

parent 49e14347
@@ -24,6 +24,8 @@ Request handlers for AISBF.
"""
import asyncio
import re
import uuid
import hashlib
from typing import Dict, List, Optional
from pathlib import Path
from fastapi import HTTPException, Request
@@ -32,6 +34,36 @@ from .models import ChatCompletionRequest, ChatCompletionResponse
from .providers import get_provider_handler
from .config import config
def generate_system_fingerprint(provider_id: str, seed: Optional[int] = None) -> str:
"""
Generate a unique system_fingerprint for OpenAI-compatible responses.
The fingerprint is:
- Consistent across requests for the same provider when no seed is given
- Unique per request when a seed is present (a random UUID is mixed in, so
  even an identical provider_id + seed combination yields a new value)
Args:
provider_id: The provider identifier from configuration
seed: Optional seed from the request (if present, generates unique fingerprint per request)
Returns:
A fingerprint string in format "fp_<hash>"
"""
if seed is not None:
# If seed is provided, generate a unique fingerprint for this specific request
# Combine provider_id, seed, and a random UUID component for uniqueness
unique_data = f"{provider_id}:{seed}:{uuid.uuid4()}"
hash_value = hashlib.md5(unique_data.encode()).hexdigest()[:24]
else:
# Without seed, generate a consistent fingerprint per provider
# This is still unique per provider but consistent across requests
unique_data = f"{provider_id}:aisbf:fingerprint"
hash_value = hashlib.md5(unique_data.encode()).hexdigest()[:24]
return f"fp_{hash_value}"
class RequestHandler:
def __init__(self):
self.config = config
@@ -91,23 +123,8 @@ class RequestHandler:
logger.info(f"Response type: {type(response)}")
logger.info(f"Response: {response}")
# For OpenAI-compatible providers, the response is already a response object
# Just return it as-is without any parsing or modification
handler.record_success()
logger.info(f"=== RequestHandler.handle_chat_completion END ===")
return response
@@ -130,8 +147,15 @@ class RequestHandler:
if handler.is_rate_limited():
raise HTTPException(status_code=503, detail="Provider temporarily unavailable")
# Generate system_fingerprint for this request
# If seed is present in request, generate unique fingerprint per request
seed = request_data.get('seed')
system_fingerprint = generate_system_fingerprint(provider_id, seed)
async def stream_generator():
import logging
import time
import json
logger = logging.getLogger(__name__)
try:
# Apply rate limiting
@@ -147,22 +171,37 @@
tool_choice=request_data.get('tool_choice')
)
# Check if this is a Google streaming response by checking provider type from config
# This is more reliable than checking response iterability which can cause false positives
is_google_stream = provider_config.type == 'google'
logger.info(f"Is Google streaming response: {is_google_stream} (provider type: {provider_config.type})")
if is_google_stream:
# Handle Google's streaming response
# Google provider returns an async generator
# Note: Google returns accumulated text, so we need to track and send only deltas
chunk_id = 0
accumulated_text = "" # Track text we've already sent
last_chunk_id = None # Track the last chunk for finish_reason
created_time = int(time.time())
response_id = f"google-{request_data['model']}-{created_time}"
# Collect all chunks first to know when we're at the last one
chunks_list = []
async for chunk in response:
chunks_list.append(chunk)
total_chunks = len(chunks_list)
chunk_idx = 0
for chunk in chunks_list:
try:
logger.debug(f"Google chunk type: {type(chunk)}")
logger.debug(f"Google chunk: {chunk}")
# Extract text from Google chunk (this is accumulated text)
chunk_text = ""
finish_reason = None
try:
if hasattr(chunk, 'candidates') and chunk.candidates:
candidate = chunk.candidates[0] if chunk.candidates else None
@@ -171,72 +210,120 @@
for part in candidate.content.parts:
if hasattr(part, 'text') and part.text:
chunk_text += part.text
# Check for finish reason in candidate
if hasattr(candidate, 'finish_reason'):
google_finish = str(candidate.finish_reason)
if google_finish in ('STOP', 'END_TURN', 'FINISH_REASON_UNSPECIFIED'):
finish_reason = "stop"
elif google_finish == 'MAX_TOKENS':
finish_reason = "length"
except Exception as e:
logger.error(f"Error extracting text from Google chunk: {e}")
# Calculate the delta (only the new text since last chunk)
delta_text = chunk_text[len(accumulated_text):] if chunk_text.startswith(accumulated_text) else chunk_text
accumulated_text = chunk_text # Update accumulated text for next iteration
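# Illustrative delta computation (values are hypothetical):
#   previous chunk_text "Hello"     -> delta_text sent: "Hello"
#   next chunk_text "Hello, wor"    -> delta_text sent: ", wor"
# If a chunk does not extend the accumulated text, the startswith() guard
# above falls back to sending the full chunk text.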
# Check if this is the last chunk
is_last_chunk = (chunk_idx == total_chunks - 1)
chunk_finish_reason = finish_reason if is_last_chunk else None
# Only send if there's new content or it's the last chunk with finish_reason
if delta_text or is_last_chunk:
# Create OpenAI-compatible chunk with additional fields
openai_chunk = {
"id": response_id,
"object": "chat.completion.chunk",
"created": created_time,
"model": request_data['model'],
"service_tier": None,
"system_fingerprint": system_fingerprint,
"usage": None,
"provider": provider_id,
"choices": [{
"index": 0,
"delta": {
"content": delta_text if delta_text else "",
"refusal": None,
"role": "assistant",
"tool_calls": None
},
"finish_reason": chunk_finish_reason,
"logprobs": None,
"native_finish_reason": chunk_finish_reason
}]
}
chunk_id += 1
logger.debug(f"OpenAI chunk (delta length: {len(delta_text)}, finish: {chunk_finish_reason})")
# Serialize as JSON
yield f"data: {json.dumps(openai_chunk)}\n\n".encode('utf-8')
chunk_idx += 1
except Exception as chunk_error:
error_msg = str(chunk_error)
logger.error(f"Error processing Google chunk: {error_msg}")
logger.error(f"Chunk type: {type(chunk)}")
logger.error(f"Chunk content: {chunk}")
# Skip this chunk and continue
chunk_idx += 1
continue
# Send final chunk with usage statistics (empty content)
final_chunk = {
"id": response_id,
"object": "chat.completion.chunk",
"created": created_time,
"model": request_data['model'],
"service_tier": None,
"system_fingerprint": system_fingerprint,
"usage": {
"prompt_tokens": None,
"completion_tokens": None,
"total_tokens": None
},
"provider": provider_id,
"choices": [{
"index": 0,
"delta": {
"content": "",
"function_call": None,
"refusal": None,
"role": "assistant",
"tool_calls": None
},
"finish_reason": None,
"logprobs": None,
"native_finish_reason": None
}]
}
yield f"data: {json.dumps(final_chunk)}\n\n".encode('utf-8')
else:
# Handle OpenAI/Anthropic streaming responses
# OpenAI SDK returns a sync Stream object, not an async iterator
# So we use a regular for loop, not async for
for chunk in response:
try:
# Debug: Log chunk type and content before serialization
logger.debug(f"Chunk type: {type(chunk)}")
logger.debug(f"Chunk: {chunk}")
# For OpenAI-compatible providers, just pass through the raw chunk
# Convert chunk to dict and serialize as JSON
chunk_dict = chunk.model_dump() if hasattr(chunk, 'model_dump') else chunk
yield f"data: {json.dumps(chunk_dict)}\n\n".encode('utf-8')
except Exception as chunk_error:
# Handle errors during chunk serialization
error_msg = str(chunk_error)
if "tool" in error_msg.lower():
logger.error(f"Tool call error during streaming: {error_msg}")
logger.error(f"Chunk type: {type(chunk)}")
logger.error(f"Chunk content: {chunk}")
# Record this as a provider failure
handler.record_failure()
# Re-raise to trigger retry in rotation handler
raise
else:
logger.warning(f"Error serializing chunk: {error_msg}")
logger.warning(f"Chunk type: {type(chunk)}")
logger.warning(f"Chunk content: {chunk}")
# Skip this chunk and continue with the next one
continue
logger.warning(f"Error serializing chunk: {error_msg}")
logger.warning(f"Chunk type: {type(chunk)}")
logger.warning(f"Chunk content: {chunk}")
# Skip this chunk and continue with the next one
continue
handler.record_success()
except Exception as e:
handler.record_failure()
error_dict = {"error": str(e)}
yield f"data: {json.dumps(error_dict)}\n\n".encode('utf-8')
@@ -266,8 +353,23 @@ class RotationHandler:
def __init__(self):
self.config = config
def _get_provider_type(self, provider_id: str) -> Optional[str]:
"""Get the provider type from configuration"""
provider_config = self.config.get_provider(provider_id)
if provider_config:
return provider_config.type
return None
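# Illustrative behaviour (config entries are hypothetical): for a provider
# registered as "gemini" with type "google", _get_provider_type("gemini")
# returns "google"; an unknown provider id returns None.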
async def handle_rotation_request(self, rotation_id: str, request_data: Dict):
"""
Handle a rotation request.
For streaming requests, returns a StreamingResponse with proper handling
based on the selected provider's type (google vs others).
For non-streaming requests, returns the response dict directly.
"""
import logging
import time
logger = logging.getLogger(__name__)
logger.info(f"=== RotationHandler.handle_rotation_request START ===")
logger.info(f"Rotation ID: {rotation_id}")
@@ -384,17 +486,20 @@ class RotationHandler:
logger.info(f"Model rate limit: {selected_model.get('rate_limit', 'N/A')}")
logger.info(f"=== MODEL SELECTION PROCESS END ===")
# Retry logic: Try up to 5 times, allowing model retries with rate limiting
max_retries = 5
tried_models = [] # Track which models have been tried
model_retry_counts = {} # Track retry count per model
last_error = None
successful_model = None
successful_handler = None
successful_response = None
for attempt in range(max_retries):
logger.info(f"")
logger.info(f"=== ATTEMPT {attempt + 1}/{max_retries} ===")
# Select a model that hasn't been tried yet, or retry a failed model with rate limiting
remaining_models = [m for m in available_models if m not in tried_models]
if not remaining_models:
@@ -405,10 +510,19 @@
# Sort remaining models by weight and select the best one
remaining_models.sort(key=lambda m: m['weight'], reverse=True)
current_model = remaining_models[0]
# Check if this model has been retried too many times
model_key = f"{current_model['provider_id']}:{current_model['name']}"
retry_count = model_retry_counts.get(model_key, 0)
if retry_count >= 2: # Max 2 retries per model
logger.warning(f"Model {current_model['name']} has reached max retry count, skipping")
tried_models.append(current_model)
continue
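# Illustrative retry sequence with two models A and B, A having the higher
# weight (hypothetical names):
#   attempt 1: A selected, fails -> model_retry_counts[A] = 1, A retried
#   attempt 2: A selected, fails -> model_retry_counts[A] = 2, A added to tried_models
#   attempt 3: B selected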
logger.info(f"Trying model: {current_model['name']} (provider: {current_model['provider_id']})")
logger.info(f"Attempt {attempt + 1} of {max_retries}")
logger.info(f"Model retry count: {retry_count}")
provider_id = current_model['provider_id']
api_key = current_model.get('api_key')
@@ -429,12 +543,12 @@
logger.info(f"Temperature: {request_data.get('temperature', 1.0)}")
logger.info(f"Stream: {request_data.get('stream', False)}")
# Apply model-specific rate limiting
rate_limit = current_model.get('rate_limit')
logger.info(f"Model-specific rate limit: {rate_limit}")
logger.info("Applying rate limiting...")
await handler.apply_rate_limit(rate_limit)
logger.info("Rate limiting applied")
logger.info("Applying model-level rate limiting...")
await handler.apply_model_rate_limit(model_name, rate_limit)
logger.info("Model-level rate limiting applied")
logger.info(f"Sending request to provider handler...")
response = await handler.handle_request(
@@ -447,22 +561,54 @@
tool_choice=request_data.get('tool_choice')
)
logger.info(f"Response received from provider")
logger.info(f"Response type: {type(response)}")
handler.record_success()
# Update successful variables to the ones that worked
successful_model = current_model
successful_handler = handler
successful_response = response
logger.info(f"=== RotationHandler.handle_rotation_request END ===")
logger.info(f"Request succeeded on attempt {attempt + 1}")
logger.info(f"Successfully used model: {successful_model['name']} (provider: {successful_model['provider_id']})")
# Check if response is a streaming request
is_streaming = request_data.get('stream', False)
if is_streaming:
# Get provider type from configuration for proper streaming handling
provider_type = self._get_provider_type(provider_id)
logger.info(f"Returning streaming response for provider type: {provider_type}")
return self._create_streaming_response(
response=response,
provider_type=provider_type,
provider_id=provider_id,
model_name=model_name,
handler=handler,
request_data=request_data
)
else:
logger.info("Returning non-streaming response")
return response
except Exception as e:
last_error = str(e)
handler.record_failure()
# Increment retry count for this model
model_retry_counts[model_key] = retry_count + 1
logger.error(f"Attempt {attempt + 1} failed: {str(e)}")
logger.error(f"Error type: {type(e).__name__}")
logger.error(f"Will try next model...")
continue
logger.error(f"Model retry count: {model_retry_counts[model_key]}")
# If this is the first failure for this model, allow retry with rate limiting
if model_retry_counts[model_key] < 2:
logger.info(f"Will retry model {model_name} with rate limiting...")
continue
else:
logger.error(f"Model {model_name} has failed too many times, moving to next model...")
tried_models.append(current_model)
continue
# All retries exhausted
logger.error(f"")
@@ -475,6 +621,184 @@ class RotationHandler:
detail=f"All providers in rotation failed after {max_retries} attempts. Last error: {last_error}"
)
def _create_streaming_response(self, response, provider_type: str, provider_id: str, model_name: str, handler, request_data: Dict):
"""
Create a StreamingResponse with proper handling based on provider type.
Args:
response: The streaming response from the provider handler
provider_type: The type of provider (e.g., 'google', 'openai', 'anthropic')
provider_id: The provider identifier from configuration
model_name: The model name being used
handler: The provider handler (for recording success/failure)
request_data: The original request data
Returns:
StreamingResponse with appropriate generator for the provider type
"""
import logging
import time
import json
logger = logging.getLogger(__name__)
# Check if this is a Google provider based on configuration
is_google_provider = provider_type == 'google'
logger.info(f"Creating streaming response for provider type: {provider_type}, is_google: {is_google_provider}")
# Generate system_fingerprint for this request
# If seed is present in request, generate unique fingerprint per request
seed = request_data.get('seed')
system_fingerprint = generate_system_fingerprint(provider_id, seed)
async def stream_generator():
try:
if is_google_provider:
# Handle Google's streaming response
# Google provider returns an async generator
# Note: Google returns accumulated text, so we need to track and send only deltas
chunk_id = 0
accumulated_text = "" # Track text we've already sent
created_time = int(time.time())
response_id = f"google-{model_name}-{created_time}"
# Collect all chunks first to know when we're at the last one
chunks_list = []
async for chunk in response:
chunks_list.append(chunk)
total_chunks = len(chunks_list)
chunk_idx = 0
for chunk in chunks_list:
try:
logger.debug(f"Google chunk type: {type(chunk)}")
logger.debug(f"Google chunk: {chunk}")
# Extract text from Google chunk (this is accumulated text)
chunk_text = ""
finish_reason = None
try:
if hasattr(chunk, 'candidates') and chunk.candidates:
candidate = chunk.candidates[0] if chunk.candidates else None
if candidate and hasattr(candidate, 'content') and candidate.content:
if hasattr(candidate.content, 'parts') and candidate.content.parts:
for part in candidate.content.parts:
if hasattr(part, 'text') and part.text:
chunk_text += part.text
# Check for finish reason in candidate
if hasattr(candidate, 'finish_reason'):
google_finish = str(candidate.finish_reason)
if google_finish in ('STOP', 'END_TURN', 'FINISH_REASON_UNSPECIFIED'):
finish_reason = "stop"
elif google_finish == 'MAX_TOKENS':
finish_reason = "length"
except Exception as e:
logger.error(f"Error extracting text from Google chunk: {e}")
# Calculate the delta (only the new text since last chunk)
delta_text = chunk_text[len(accumulated_text):] if chunk_text.startswith(accumulated_text) else chunk_text
accumulated_text = chunk_text # Update accumulated text for next iteration
# Check if this is the last chunk
is_last_chunk = (chunk_idx == total_chunks - 1)
chunk_finish_reason = finish_reason if is_last_chunk else None
# Only send if there's new content or it's the last chunk with finish_reason
if delta_text or is_last_chunk:
# Create OpenAI-compatible chunk with additional fields
openai_chunk = {
"id": response_id,
"object": "chat.completion.chunk",
"created": created_time,
"model": model_name,
"service_tier": None,
"system_fingerprint": system_fingerprint,
"usage": None,
"provider": provider_id,
"choices": [{
"index": 0,
"delta": {
"content": delta_text if delta_text else "",
"refusal": None,
"role": "assistant",
"tool_calls": None
},
"finish_reason": chunk_finish_reason,
"logprobs": None,
"native_finish_reason": chunk_finish_reason
}]
}
chunk_id += 1
logger.debug(f"OpenAI chunk (delta length: {len(delta_text)}, finish: {chunk_finish_reason})")
yield f"data: {json.dumps(openai_chunk)}\n\n".encode('utf-8')
chunk_idx += 1
except Exception as chunk_error:
error_msg = str(chunk_error)
logger.error(f"Error processing Google chunk: {error_msg}")
logger.error(f"Chunk type: {type(chunk)}")
logger.error(f"Chunk content: {chunk}")
chunk_idx += 1
continue
# Send final chunk with usage statistics (empty content)
final_chunk = {
"id": response_id,
"object": "chat.completion.chunk",
"created": created_time,
"model": model_name,
"service_tier": None,
"system_fingerprint": system_fingerprint,
"usage": {
"prompt_tokens": None,
"completion_tokens": None,
"total_tokens": None
},
"provider": provider_id,
"choices": [{
"index": 0,
"delta": {
"content": "",
"function_call": None,
"refusal": None,
"role": "assistant",
"tool_calls": None
},
"finish_reason": None,
"logprobs": None,
"native_finish_reason": None
}]
}
yield f"data: {json.dumps(final_chunk)}\n\n".encode('utf-8')
else:
# Handle OpenAI/Anthropic streaming responses
# OpenAI SDK returns a sync Stream object, not an async iterator
# So we use a regular for loop, not async for
for chunk in response:
try:
logger.debug(f"Chunk type: {type(chunk)}")
logger.debug(f"Chunk: {chunk}")
# For OpenAI-compatible providers, just pass through the raw chunk
chunk_dict = chunk.model_dump() if hasattr(chunk, 'model_dump') else chunk
yield f"data: {json.dumps(chunk_dict)}\n\n".encode('utf-8')
except Exception as chunk_error:
error_msg = str(chunk_error)
logger.warning(f"Error serializing chunk: {error_msg}")
logger.warning(f"Chunk type: {type(chunk)}")
logger.warning(f"Chunk content: {chunk}")
continue
handler.record_success()
except Exception as e:
handler.record_failure()
error_dict = {"error": str(e)}
yield f"data: {json.dumps(error_dict)}\n\n".encode('utf-8')
return StreamingResponse(stream_generator(), media_type="text/event-stream")
async def handle_rotation_model_list(self, rotation_id: str) -> List[Dict]:
rotation_config = self.config.get_rotation(rotation_id)
if not rotation_config:
@@ -683,7 +1007,12 @@ class AutoselectHandler:
return response
async def handle_autoselect_streaming_request(self, autoselect_id: str, request_data: Dict):
"""Handle an autoselect streaming request"""
"""
Handle an autoselect streaming request.
The rotation handler handles the streaming conversion internally based on
the selected provider's type, so we just pass through the response.
"""
import logging
logger = logging.getLogger(__name__)
logger.info(f"=== AUTOSELECT STREAMING REQUEST START ===")
@@ -751,107 +1080,20 @@
logger.info(f"Request mode: Streaming")
# Now proxy the actual streaming request to the selected rotation
# The rotation handler will return a StreamingResponse with proper handling
# based on the selected provider's type (google vs others)
logger.info(f"Proxying streaming request to rotation: {selected_model_id}")
rotation_handler = RotationHandler()
# The rotation handler handles streaming internally and returns a StreamingResponse
response = await rotation_handler.handle_rotation_request(
selected_model_id,
{**request_data, "stream": True}
)
logger.info(f"=== AUTOSELECT STREAMING REQUEST END ===")
# Return the StreamingResponse directly - rotation handler already handled the conversion
return response
async def handle_autoselect_model_list(self, autoselect_id: str) -> List[Dict]:
"""List available models for an autoselect endpoint"""
@@ -44,6 +44,8 @@ class BaseProviderHandler:
self.error_tracking = config.error_tracking[provider_id]
self.last_request_time = 0
self.rate_limit = config.providers[provider_id].rate_limit
# Add model-level rate limit tracking
self.model_last_request_time = {} # {model_name: timestamp}
def is_rate_limited(self) -> bool:
if self.error_tracking['disabled_until'] and self.error_tracking['disabled_until'] > time.time():
@@ -65,6 +67,25 @@
self.last_request_time = time.time()
async def apply_model_rate_limit(self, model: str, rate_limit: Optional[float] = None):
"""Apply rate limiting for a specific model"""
if rate_limit is None:
rate_limit = self.rate_limit
if rate_limit and rate_limit > 0:
current_time = time.time()
last_time = self.model_last_request_time.get(model, 0)
time_since_last_request = current_time - last_time
required_wait = rate_limit - time_since_last_request
if required_wait > 0:
import logging
logger = logging.getLogger(__name__)
logger.info(f"Model-level rate limiting: waiting {required_wait:.2f}s for model {model}")
await asyncio.sleep(required_wait)
self.model_last_request_time[model] = time.time()
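# Illustrative timing, treating rate_limit as a minimum interval in seconds
# (model names are hypothetical):
#   t=0.0  await apply_model_rate_limit("model-a", 2.0)  -> no wait
#   t=0.5  await apply_model_rate_limit("model-a", 2.0)  -> sleeps ~1.5s
#   t=0.5  await apply_model_rate_limit("model-b", 2.0)  -> no wait (per-model clock)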
def record_failure(self):
import logging
logger = logging.getLogger(__name__)
@@ -188,6 +209,14 @@ class GoogleProviderHandler(BaseProviderHandler):
logging.info(f"GoogleProviderHandler: Response received: {response}")
self.record_success()
# Dump raw response if AISBF_DEBUG is enabled
if AISBF_DEBUG:
logging.info(f"=== RAW GOOGLE RESPONSE ===")
logging.info(f"Raw response type: {type(response)}")
logging.info(f"Raw response: {response}")
logging.info(f"Raw response dir: {dir(response)}")
logging.info(f"=== END RAW GOOGLE RESPONSE ===")
# Extract content from the nested response structure
# The response has candidates[0].content.parts
response_text = ""
@@ -394,6 +423,13 @@ class GoogleProviderHandler(BaseProviderHandler):
# Pydantic validation might be causing serialization issues
logging.info(f"GoogleProviderHandler: Returning response dict (no validation)")
logging.info(f"Response dict keys: {openai_response.keys()}")
# Dump final response if AISBF_DEBUG is enabled
if AISBF_DEBUG:
logging.info(f"=== FINAL GOOGLE RESPONSE DICT ===")
logging.info(f"Final response: {openai_response}")
logging.info(f"=== END FINAL GOOGLE RESPONSE DICT ===")
return openai_response
except Exception as e:
import logging
@@ -496,10 +532,18 @@ class OpenAIProviderHandler(BaseProviderHandler):
logging.info(f"OpenAIProviderHandler: Response received: {response}")
self.record_success()
# Dump raw response if AISBF_DEBUG is enabled
if AISBF_DEBUG:
logging.info(f"=== RAW OPENAI RESPONSE ===")
logging.info(f"Raw response type: {type(response)}")
logging.info(f"Raw response: {response}")
logging.info(f"=== END RAW OPENAI RESPONSE ===")
# Return raw response without any parsing or modification
# For streaming: return the Stream object as-is
# For non-streaming: return the response object as-is
logging.info(f"OpenAIProviderHandler: Returning raw response without parsing")
return response
except Exception as e:
import logging
logging.error(f"OpenAIProviderHandler: Error: {str(e)}", exc_info=True)
@@ -554,6 +598,14 @@ class AnthropicProviderHandler(BaseProviderHandler):
logging.info(f"AnthropicProviderHandler: Response received: {response}")
self.record_success()
# Dump raw response if AISBF_DEBUG is enabled
if AISBF_DEBUG:
logging.info(f"=== RAW ANTHROPIC RESPONSE ===")
logging.info(f"Raw response type: {type(response)}")
logging.info(f"Raw response: {response}")
logging.info(f"Raw response dir: {dir(response)}")
logging.info(f"=== END RAW ANTHROPIC RESPONSE ===")
logging.info(f"=== ANTHROPIC RESPONSE PARSING START ===")
logging.info(f"Response type: {type(response)}")
logging.info(f"Response attributes: {dir(response)}")
@@ -681,6 +733,13 @@ class AnthropicProviderHandler(BaseProviderHandler):
# Pydantic validation might be causing serialization issues
logging.info(f"AnthropicProviderHandler: Returning response dict (no validation)")
logging.info(f"Response dict keys: {openai_response.keys()}")
# Dump final response dict if AISBF_DEBUG is enabled
if AISBF_DEBUG:
logging.info(f"=== FINAL ANTHROPIC RESPONSE DICT ===")
logging.info(f"Final response: {openai_response}")
logging.info(f"=== END FINAL ANTHROPIC RESPONSE DICT ===")
return openai_response
except Exception as e:
import logging
@@ -818,10 +877,17 @@ class OllamaProviderHandler(BaseProviderHandler):
logger.info(f"Final response: {response_json}")
self.record_success()
# Dump raw response if AISBF_DEBUG is enabled
if AISBF_DEBUG:
logging.info(f"=== RAW OLLAMA RESPONSE ===")
logging.info(f"Raw response JSON: {response_json}")
logging.info(f"=== END RAW OLLAMA RESPONSE ===")
logger.info(f"=== OllamaProviderHandler.handle_request END ===")
# Convert Ollama response to OpenAI-style format
openai_response = {
"id": f"ollama-{model}-{int(time.time())}",
"object": "chat.completion",
"created": int(time.time()),
@@ -840,6 +906,14 @@
"total_tokens": response_json.get("prompt_eval_count", 0) + response_json.get("eval_count", 0)
}
}
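# Illustrative field mapping for the conversion above (the message.content
# path is an assumption based on Ollama's usual /api/chat response shape):
#   response_json["message"]["content"] -> choices[0].message.content
#   response_json["prompt_eval_count"]  -> usage.prompt_tokens
#   response_json["eval_count"]         -> usage.completion_tokens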
# Dump final response dict if AISBF_DEBUG is enabled
if AISBF_DEBUG:
logging.info(f"=== FINAL OLLAMA RESPONSE DICT ===")
logging.info(f"Final response: {openai_response}")
logging.info(f"=== END FINAL OLLAMA RESPONSE DICT ===")
return openai_response
except Exception as e:
self.record_failure()
raise e
@@ -51,6 +51,9 @@ def setup_logging():
# Create log directory if it doesn't exist
log_dir.mkdir(parents=True, exist_ok=True)
# Check if debug mode is enabled
AISBF_DEBUG = os.environ.get('AISBF_DEBUG', '').lower() in ('true', '1', 'yes')
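# Per the check above, "true", "1" and "yes" (case-insensitive) enable debug
# mode; any other value, or an unset variable, leaves it off.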
# Setup rotating file handler for general logs
log_file = log_dir / 'aisbf.log'
file_handler = RotatingFileHandler(
@@ -76,9 +79,16 @@ def setup_logging():
error_handler.setLevel(logging.ERROR)
error_handler.setFormatter(file_formatter)
# Setup console handler - use DEBUG level if AISBF_DEBUG is enabled
console_handler = logging.StreamHandler(sys.stdout)
if AISBF_DEBUG:
console_handler.setLevel(logging.DEBUG)
print("=== AISBF DEBUG MODE ENABLED ===")
print("All debug messages will be shown in console")
print("Raw responses from providers will be logged")
print("=== END AISBF DEBUG MODE ===")
else:
console_handler.setLevel(logging.INFO)
console_formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s'
)
@@ -115,7 +125,7 @@ async def validation_exception_handler(request: Request, exc: RequestValidationE
print(f"Request method: {request.method}")
print(f"Request headers: {dict(request.headers)}")
# Try to get raw body
try:
raw_body = await request.body()
print(f"Raw request body: {raw_body.decode('utf-8')}")
@@ -130,7 +140,7 @@ async def validation_exception_handler(request: Request, exc: RequestValidationE
logger.error(f"Request method: {request.method}")
logger.error(f"Request headers: {dict(request.headers)}")
# Try to get raw body
try:
raw_body = await request.body()
logger.error(f"Raw request body: {raw_body.decode('utf-8')}")
@@ -187,7 +197,12 @@ async def list_rotations():
@app.post("/api/rotations/chat/completions")
async def rotation_chat_completions(request: Request, body: ChatCompletionRequest):
"""Handle chat completions for rotations using model name to select rotation"""
"""
Handle chat completions for rotations using model name to select rotation.
The RotationHandler handles streaming internally based on the selected
provider's type (google vs others), so we just pass through the response.
"""
logger.info(f"=== ROTATION CHAT COMPLETION REQUEST START ===")
logger.info(f"Request path: {request.url.path}")
logger.info(f"Model requested: {body.model}")
@@ -210,118 +225,11 @@ async def rotation_chat_completions(request: Request, body: ChatCompletionReques
logger.debug("Handling rotation request")
try:
# The rotation handler handles streaming internally and returns
# a StreamingResponse for streaming requests or a dict for non-streaming
result = await rotation_handler.handle_rotation_request(body.model, body_dict)
logger.debug(f"Rotation response result type: {type(result)}")
return result
except Exception as e:
logger.error(f"Error handling rotation chat_completions: {str(e)}", exc_info=True)
raise
@@ -477,7 +385,7 @@ async def chat_completions(provider_id: str, request: Request, body: ChatComplet
logger.error(f"Available rotations: {list(config.rotations.keys())}")
logger.error(f"Available autoselect: {list(config.autoselect.keys())}")
raise HTTPException(status_code=400, detail=f"Provider {provider_id} not found")
logger.info(f"Provider ID '{provider_id}' found in providers")
provider_config = config.get_provider(provider_id)
@@ -524,7 +432,7 @@ async def list_models(request: Request, provider_id: str):
logger.error(f"Available rotations: {list(config.rotations.keys())}")
logger.error(f"Available autoselect: {list(config.autoselect.keys())}")
raise HTTPException(status_code=400, detail=f"Provider {provider_id} not found")
logger.info(f"Provider ID '{provider_id}' found in providers")
provider_config = config.get_provider(provider_id)
@@ -548,16 +456,16 @@ async def catch_all_post(provider_id: str, request: Request):
logger.info(f"Available providers: {list(config.providers.keys())}")
logger.info(f"Available rotations: {list(config.rotations.keys())}")
logger.info(f"Available autoselect: {list(config.autoselect.keys())}")
error_msg = f"""
Invalid endpoint: {request.url.path}
The correct endpoint format is: /api/{{provider_id}}/chat/completions
Available providers: {list(config.providers.keys())}
Available rotations: {list(config.rotations.keys())}
Available autoselect: {list(config.autoselect.keys())}
Example: POST /api/ollama/chat/completions
"""
logger.error(error_msg)
@@ -21,6 +21,8 @@
# AISBF - AI Service Broker Framework || AI Should Be Free
# DEVELOPMENT START SCRIPT - For development use only
# For production use, install with: python setup.py install
export AISBF_DEBUG=true
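# With AISBF_DEBUG=true (exported above) the console log level drops to DEBUG
# and raw provider responses are logged (see setup_logging); unset it for
# quieter development runs.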
# Get the directory where this script is located
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
#!/bin/bash
# Copyright (C) 2026 Stefy Lanza <stefy@nexlab.net>
#
# Test script for AISBF proxy
#
PROXY_URL="http://127.0.0.1:17765"
# Test 1: Streaming request to rotations endpoint with googletest model
echo "Test 1: Streaming request to rotations endpoint with googletest model"
echo "----------------------------------------"
echo "Note: Streaming responses will appear as data: lines"
echo ""
curl -X POST "${PROXY_URL}/api/rotations/chat/completions" \
-H "Content-Type: application/json" \
-d '{
"model": "googletest",
"messages": [
{"role": "user", "content": "Hello, how are you?"}
],
"stream": true
}' \
2>/dev/null
echo ""
echo ""
# Test 2: Streaming request to rotations endpoint with kilotest model
echo "Test 2: Streaming request to rotations endpoint with kilotest model"
echo "----------------------------------------"
echo "Note: Streaming responses will appear as data: lines"
echo ""
curl -X POST "${PROXY_URL}/api/rotations/chat/completions" \
-H "Content-Type: application/json" \
-d '{
"model": "kilotest",
"messages": [
{"role": "user", "content": "Hello, how are you?"}
],
"stream": true
}' \
2>/dev/null
echo ""
echo ""