Commit a2a816ac authored by Your Name's avatar Your Name

Fix streaming rate limit error handling and retry logic

Analysis of debug.log showed 429 rate limit errors during streaming
were not being caught by the retry logic because:
1. Streaming generators don't raise exceptions until consumed
2. Error message 'Claude API error (429): Error' didn't contain retry keywords

Changes:
1. Added _handle_streaming_request_with_retry() wrapper that catches
   rate limit errors and re-raises with proper keywords
2. Added _wrap_streaming_with_retry() method that consumes streaming
   generator and retries with fallback models on rate limit errors
3. Updated retry logic to check for '429' keyword in error messages
4. Added exponential backoff with jitter before retry attempts
5. Improved error messages to include rate limit context

This ensures that when streaming hits a 429 rate limit, the system
will automatically retry with fallback models instead of failing.
parent df1191b4
...@@ -3121,6 +3121,11 @@ class ClaudeProviderHandler(BaseProviderHandler): ...@@ -3121,6 +3121,11 @@ class ClaudeProviderHandler(BaseProviderHandler):
tools=tools, tools=tools,
tool_choice=tool_choice tool_choice=tool_choice
) )
# For streaming, we get a generator - wrap it to catch errors during consumption
if stream:
return self._wrap_streaming_with_retry(result, current_model, messages, max_tokens, temperature, tools, tool_choice, models_to_try, attempt)
return result return result
except Exception as e: except Exception as e:
...@@ -3131,11 +3136,15 @@ class ClaudeProviderHandler(BaseProviderHandler): ...@@ -3131,11 +3136,15 @@ class ClaudeProviderHandler(BaseProviderHandler):
# Check if we should try next fallback model # Check if we should try next fallback model
error_str = str(e).lower() error_str = str(e).lower()
is_retryable = any(keyword in error_str for keyword in [ is_retryable = any(keyword in error_str for keyword in [
'rate limit', 'overloaded', 'too many requests', '529', '503' 'rate limit', 'overloaded', 'too many requests', '429', '529', '503'
]) ])
if is_retryable and attempt < len(models_to_try) - 1: if is_retryable and attempt < len(models_to_try) - 1:
logger.warning(f"ClaudeProviderHandler: Retryable error with {current_model}, trying next fallback model") logger.warning(f"ClaudeProviderHandler: Retryable error with {current_model}, trying next fallback model")
# Wait before retry with exponential backoff + jitter
wait_time = min(2 ** attempt + random.uniform(0, 1), 30)
logger.info(f"ClaudeProviderHandler: Waiting {wait_time:.1f}s before retry")
await asyncio.sleep(wait_time)
continue continue
# Not retryable or no more fallbacks # Not retryable or no more fallbacks
...@@ -3149,6 +3158,56 @@ class ClaudeProviderHandler(BaseProviderHandler): ...@@ -3149,6 +3158,56 @@ class ClaudeProviderHandler(BaseProviderHandler):
logger.error(f"ClaudeProviderHandler: All models failed (tried: {models_to_try})") logger.error(f"ClaudeProviderHandler: All models failed (tried: {models_to_try})")
raise last_error raise last_error
async def _wrap_streaming_with_retry(self, stream_generator, current_model, messages, max_tokens, temperature, tools, tool_choice, models_to_try, attempt):
"""
Wrapper that consumes the streaming generator and catches errors,
allowing retry with fallback models if rate limited.
"""
import logging
logger = logging.getLogger(__name__)
try:
async for chunk in stream_generator:
yield chunk
except Exception as e:
last_error = e
error_str = str(e).lower()
is_retryable = any(keyword in error_str for keyword in [
'rate limit', 'overloaded', 'too many requests', '429', '529', '503'
])
# Check if we have more fallback models to try
if is_retryable and attempt < len(models_to_try) - 1:
next_model = models_to_try[attempt + 1]
logger.warning(f"ClaudeProviderHandler: Streaming error with {current_model}, retrying with {next_model}")
# Wait before retry
wait_time = min(2 ** (attempt + 1) + random.uniform(0, 1), 30)
logger.info(f"ClaudeProviderHandler: Waiting {wait_time:.1f}s before retry")
await asyncio.sleep(wait_time)
# Retry with next model
try:
result = await self._handle_request_with_model(
model=next_model,
messages=messages,
max_tokens=max_tokens,
temperature=temperature,
stream=True,
tools=tools,
tool_choice=tool_choice
)
async for chunk in self._wrap_streaming_with_retry(result, next_model, messages, max_tokens, temperature, tools, tool_choice, models_to_try, attempt + 1):
yield chunk
return
except Exception as retry_error:
logger.error(f"ClaudeProviderHandler: Retry with {next_model} also failed: {str(retry_error)}")
raise retry_error
# No more fallbacks or not retryable
logger.error(f"ClaudeProviderHandler: Streaming error: {str(e)}", exc_info=True)
raise e
async def _handle_request_with_model(self, model: str, messages: List[Dict], max_tokens: Optional[int] = None, async def _handle_request_with_model(self, model: str, messages: List[Dict], max_tokens: Optional[int] = None,
temperature: Optional[float] = 1.0, stream: Optional[bool] = False, temperature: Optional[float] = 1.0, stream: Optional[bool] = False,
tools: Optional[List[Dict]] = None, tool_choice: Optional[Union[str, Dict]] = None) -> Union[Dict, object]: tools: Optional[List[Dict]] = None, tool_choice: Optional[Union[str, Dict]] = None) -> Union[Dict, object]:
...@@ -3231,7 +3290,9 @@ class ClaudeProviderHandler(BaseProviderHandler): ...@@ -3231,7 +3290,9 @@ class ClaudeProviderHandler(BaseProviderHandler):
logger.info(f"Payload: {json.dumps(payload, indent=2)}") logger.info(f"Payload: {json.dumps(payload, indent=2)}")
logger.info(f"=== END STREAMING REQUEST DEBUG ===") logger.info(f"=== END STREAMING REQUEST DEBUG ===")
return self._handle_streaming_request(api_url, payload, headers, model) # Check for 429 before starting streaming (pre-check rate limit status)
# Note: 429 during streaming will be caught when generator is consumed
return self._handle_streaming_request_with_retry(api_url, payload, headers, model)
# Get auth headers with Bearer token (non-streaming mode) # Get auth headers with Bearer token (non-streaming mode)
headers = self._get_auth_headers(stream=False) headers = self._get_auth_headers(stream=False)
...@@ -3286,6 +3347,25 @@ class ClaudeProviderHandler(BaseProviderHandler): ...@@ -3286,6 +3347,25 @@ class ClaudeProviderHandler(BaseProviderHandler):
return openai_response return openai_response
async def _handle_streaming_request_with_retry(self, api_url: str, payload: Dict, headers: Dict, model: str):
    """
    Wrap the low-level streaming request so rate-limit failures surface
    with keywords the retry logic in handle_request can match.

    Args:
        api_url: Fully-qualified Claude API endpoint URL.
        payload: JSON request body for the streaming call.
        headers: Auth/content headers for the request.
        model: Model name forwarded to the underlying request.

    Yields:
        Streaming chunks from _handle_streaming_request.

    Raises:
        Exception: Re-raised with a "Rate limit error" prefix for 429-style
            failures so keyword matching in handle_request catches them;
            any other exception propagates unchanged.
    """
    import logging
    logger = logging.getLogger(__name__)
    try:
        async for chunk in self._handle_streaming_request(api_url, payload, headers, model):
            yield chunk
    except Exception as e:
        error_str = str(e).lower()
        if '429' in error_str or 'rate limit' in error_str or 'too many requests' in error_str:
            logger.error(f"ClaudeProviderHandler: Streaming rate limit error: {e}")
            # Chain the original exception so the traceback keeps its cause
            # instead of being replaced by a bare Exception.
            raise Exception(f"Rate limit error: {e}") from e
        raise
async def _handle_streaming_request(self, api_url: str, payload: Dict, headers: Dict, model: str): async def _handle_streaming_request(self, api_url: str, payload: Dict, headers: Dict, model: str):
"""Handle streaming request to Claude API using direct HTTP (kilocode method).""" """Handle streaming request to Claude API using direct HTTP (kilocode method)."""
import logging import logging
...@@ -3322,10 +3402,20 @@ class ClaudeProviderHandler(BaseProviderHandler): ...@@ -3322,10 +3402,20 @@ class ClaudeProviderHandler(BaseProviderHandler):
error_type = error_json.get('error', {}).get('type', 'unknown') error_type = error_json.get('error', {}).get('type', 'unknown')
logger.error(f"ClaudeProviderHandler: Error type: {error_type}") logger.error(f"ClaudeProviderHandler: Error type: {error_type}")
logger.error(f"ClaudeProviderHandler: Error message: {error_message}") logger.error(f"ClaudeProviderHandler: Error message: {error_message}")
raise Exception(f"Claude API error ({response.status_code}): {error_message}")
except (json.JSONDecodeError, Exception) as e: # Raise a descriptive exception that includes rate limit keywords
logger.error(f"ClaudeProviderHandler: Could not parse error response: {e}") # so the retry logic in handle_request can catch it
raise Exception(f"Claude API error: {response.status_code} - {error_text}") if response.status_code == 429:
raise Exception(f"Rate limit error (429): {error_type} - {error_message}")
else:
raise Exception(f"Claude API error ({response.status_code}): {error_message}")
except json.JSONDecodeError:
logger.error(f"ClaudeProviderHandler: Could not parse error response as JSON")
# Raise a descriptive exception for rate limits
if response.status_code == 429:
raise Exception(f"Rate limit error (429): Too Many Requests - {error_text.decode() if isinstance(error_text, bytes) else error_text}")
else:
raise Exception(f"Claude API error: {response.status_code} - {error_text.decode() if isinstance(error_text, bytes) else error_text}")
# Generate completion ID and timestamps # Generate completion ID and timestamps
completion_id = f"claude-{int(time.time())}" completion_id = f"claude-{int(time.time())}"
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment