Commit 5c357f98 authored by Your Name's avatar Your Name

Implement Phase 3 Claude provider improvements

Add three robustness improvements to ClaudeProviderHandler:

1. Message Role Validation (_validate_messages):
   - Validate roles are one of: user, assistant, system, tool
   - Auto-fix unknown roles to 'user'
   - Ensure system messages only appear at start
   - Insert synthetic assistant messages between consecutive user messages
   - Merge consecutive assistant messages
   - Validate tool messages have tool_call_id
   - Reference: vendors/kilocode normalizeMessages() + ensure_alternating_roles()

2. Tool Result Size Validation (_truncate_tool_result):
   - Truncate oversized tool results with configurable limit (default 100k chars)
   - Add truncation notice with original length info
   - Reference: vendors/claude applyToolResultBudget

3. Model Fallback Support (handle_request refactoring):
   - Add _get_fallback_models() to read fallback list from config
   - Retry with fallback models on retryable errors (rate limit, overloaded)
   - Split into handle_request() (with retry) and _handle_request_with_model() (actual logic)
   - Log fallback attempts for debugging

All methods integrated into handle_request() for automatic application.
parent 920fff5a
......@@ -2492,6 +2492,161 @@ class ClaudeProviderHandler(BaseProviderHandler):
logger.info(f"ClaudeProviderHandler: Applied cache_control to {len(cache_indices)} messages for prompt caching")
return anthropic_messages
def _validate_messages(self, messages: List[Dict]) -> List[Dict]:
"""
Validate and normalize message roles for Claude API compatibility.
Validates:
- Message roles are one of: user, assistant, system, tool
- System messages only appear at start
- Alternating user/assistant roles (after system)
- Tool messages have tool_call_id
Auto-fixes:
- Unknown roles → 'user'
- Consecutive user messages → inserts synthetic assistant
- Consecutive assistant messages → merges content
Reference: vendors/kilocode normalizeMessages() + ensure_alternating_roles()
Args:
messages: OpenAI format messages
Returns:
Validated and normalized messages
"""
import logging
logger = logging.getLogger(__name__)
if not messages:
return messages
valid_roles = {'user', 'assistant', 'system', 'tool'}
normalized = []
issues_found = 0
for i, msg in enumerate(messages):
role = msg.get('role', '')
content = msg.get('content', '')
# Validate and normalize role
if role not in valid_roles:
logger.warning(f"ClaudeProviderHandler: Unknown message role '{role}' at index {i}, treating as 'user'")
msg['role'] = 'user'
role = 'user'
issues_found += 1
# Validate system messages only at start
if role == 'system' and i > 0:
logger.warning(f"ClaudeProviderHandler: System message at index {i} (not at start), converting to user")
msg['role'] = 'user'
role = 'user'
issues_found += 1
# Validate tool messages have tool_call_id
if role == 'tool':
tool_call_id = msg.get('tool_call_id') or msg.get('name')
if not tool_call_id:
logger.warning(f"ClaudeProviderHandler: Tool message at index {i} missing tool_call_id, adding placeholder")
msg['tool_call_id'] = f"placeholder_{i}"
issues_found += 1
# Check for consecutive same-role messages
if normalized:
last_role = normalized[-1].get('role', '')
if role == 'user' and last_role == 'user':
# Insert synthetic assistant message
logger.debug(f"ClaudeProviderHandler: Inserting synthetic assistant message between consecutive user messages at index {i}")
normalized.append({
'role': 'assistant',
'content': '(empty)'
})
issues_found += 1
elif role == 'assistant' and last_role == 'assistant':
# Merge with previous assistant message
logger.debug(f"ClaudeProviderHandler: Merging consecutive assistant messages at index {i}")
prev_content = normalized[-1].get('content', '')
if isinstance(prev_content, str) and isinstance(content, str):
normalized[-1]['content'] = f"{prev_content}\n{content}"
else:
normalized[-1]['content'] = content
issues_found += 1
continue # Skip adding this message, already merged
normalized.append(msg.copy())
if issues_found:
logger.info(f"ClaudeProviderHandler: Message validation fixed {issues_found} issue(s)")
return normalized
def _truncate_tool_result(self, content: str, max_chars: int = 100000) -> tuple[str, bool]:
"""
Truncate tool result content if it exceeds the size limit.
Claude API has limits on tool result sizes. This truncates oversized
results and adds a truncation notice.
Reference: vendors/claude applyToolResultBudget
Args:
content: Tool result content string
max_chars: Maximum allowed characters (default 100000)
Returns:
Tuple of (truncated_content, was_truncated)
"""
import logging
logger = logging.getLogger(__name__)
if not content or len(content) <= max_chars:
return content, False
# Truncate and add notice
truncation_notice = f"\n\n[Tool result truncated: exceeded {max_chars} character limit. Original length: {len(content)} characters.]"
truncated = content[:max_chars - len(truncation_notice)] + truncation_notice
logger.warning(f"ClaudeProviderHandler: Tool result truncated from {len(content)} to {max_chars} characters")
return truncated, True
def _get_cache_config(self) -> Dict:
"""
Get prompt caching configuration from provider config.
Returns:
Dict with caching settings (enabled, min_messages, etc.)
"""
cache_config = {
'enabled': False,
'min_messages': 4, # Minimum messages before enabling cache
}
if self.provider_config:
claude_config = getattr(self.provider_config, 'claude_config', None)
if claude_config and isinstance(claude_config, dict):
cache_config['enabled'] = claude_config.get('enable_prompt_caching', False)
cache_config['min_messages'] = claude_config.get('cache_min_messages', 4)
return cache_config
def _get_fallback_models(self) -> List[str]:
"""
Get list of fallback models from provider config.
Returns:
List of model IDs to try as fallbacks
"""
fallback_models = []
if self.provider_config:
claude_config = getattr(self.provider_config, 'claude_config', None)
if claude_config and isinstance(claude_config, dict):
fallback_models = claude_config.get('fallback_models', [])
return fallback_models
def _convert_tool_choice_to_anthropic(self, tool_choice: Optional[Union[str, Dict]]) -> Optional[Dict]:
"""
Convert OpenAI tool_choice format to Anthropic format.
......@@ -2815,21 +2970,84 @@ class ClaudeProviderHandler(BaseProviderHandler):
if self.is_rate_limited():
raise Exception("Provider rate limited")
try:
import logging
import json
logging.info(f"ClaudeProviderHandler: Handling request for model {model}")
if AISBF_DEBUG:
logging.info(f"ClaudeProviderHandler: Messages: {messages}")
else:
logging.info(f"ClaudeProviderHandler: Messages count: {len(messages)}")
# Get fallback models from config (Phase 3.3)
fallback_models = self._get_fallback_models()
models_to_try = [model] + fallback_models
last_error = None
for attempt, current_model in enumerate(models_to_try):
try:
if attempt > 0:
import logging
logger = logging.getLogger(__name__)
logger.warning(f"ClaudeProviderHandler: Retrying with fallback model: {current_model} (original: {model})")
result = await self._handle_request_with_model(
model=current_model,
messages=messages,
max_tokens=max_tokens,
temperature=temperature,
stream=stream,
tools=tools,
tool_choice=tool_choice
)
return result
except Exception as e:
last_error = e
import logging
logger = logging.getLogger(__name__)
# Check if we should try next fallback model
error_str = str(e).lower()
is_retryable = any(keyword in error_str for keyword in [
'rate limit', 'overloaded', 'too many requests', '529', '503'
])
if is_retryable and attempt < len(models_to_try) - 1:
logger.warning(f"ClaudeProviderHandler: Retryable error with {current_model}, trying next fallback model")
continue
# Not retryable or no more fallbacks
logger.error(f"ClaudeProviderHandler: Error with model {current_model}: {str(e)}", exc_info=True)
self.record_failure()
raise e
# All models failed
import logging
logger = logging.getLogger(__name__)
logger.error(f"ClaudeProviderHandler: All models failed (tried: {models_to_try})")
raise last_error
async def _handle_request_with_model(self, model: str, messages: List[Dict], max_tokens: Optional[int] = None,
temperature: Optional[float] = 1.0, stream: Optional[bool] = False,
tools: Optional[List[Dict]] = None, tool_choice: Optional[Union[str, Dict]] = None) -> Union[Dict, object]:
"""
Handle request with a specific model, including validation and truncation.
This is the internal method that does the actual request handling.
The public handle_request() wraps this with fallback retry logic.
"""
import logging
import json
logger = logging.getLogger(__name__)
logger.info(f"ClaudeProviderHandler: Handling request for model {model}")
if AISBF_DEBUG:
logger.info(f"ClaudeProviderHandler: Messages: {messages}")
else:
logger.info(f"ClaudeProviderHandler: Messages count: {len(messages)}")
# Apply rate limiting
await self.apply_rate_limit()
# Convert messages to Anthropic format (handles tool messages properly)
system_message, anthropic_messages = self._convert_messages_to_anthropic(messages)
# Apply rate limiting
await self.apply_rate_limit()
# Validate and normalize messages (Phase 3.1)
validated_messages = self._validate_messages(messages)
# Convert messages to Anthropic format (handles tool messages properly)
system_message, anthropic_messages = self._convert_messages_to_anthropic(validated_messages)
# Build request payload for direct HTTP request (kilocode method)
# IMPORTANT: OAuth2 API uses its own model naming scheme (e.g., claude-sonnet-4-5-20250929)
......@@ -2848,102 +3066,96 @@ class ClaudeProviderHandler(BaseProviderHandler):
if system_message:
payload['system'] = system_message
# Convert OpenAI tools to Anthropic format
if tools:
anthropic_tools = self._convert_tools_to_anthropic(tools)
if anthropic_tools:
payload['tools'] = anthropic_tools
# Convert OpenAI tool_choice format to Anthropic format
if tool_choice and tools: # Only add tool_choice if we have tools
anthropic_tool_choice = self._convert_tool_choice_to_anthropic(tool_choice)
if anthropic_tool_choice:
payload['tool_choice'] = anthropic_tool_choice
# Add stream parameter
payload['stream'] = stream
# TEMPORARY: Always log payload for debugging 400 errors
logging.info(f"ClaudeProviderHandler: Request payload: {json.dumps(payload, indent=2)}")
# Use api.anthropic.com endpoint (correct endpoint for OAuth2 tokens)
api_url = 'https://api.anthropic.com/v1/messages'
logging.info(f"ClaudeProviderHandler: Making request to {api_url}")
# Make request using direct HTTP (kilocode method)
if stream:
logging.info(f"ClaudeProviderHandler: Using streaming mode with direct HTTP")
# Get auth headers with Bearer token (streaming mode)
headers = self._get_auth_headers(stream=True)
# Log the full request for debugging
if AISBF_DEBUG:
logging.info(f"=== STREAMING REQUEST DEBUG ===")
logging.info(f"URL: {api_url}")
logging.info(f"Headers: {json.dumps({k: v for k, v in headers.items() if k.lower() != 'authorization'}, indent=2)}")
logging.info(f"Payload: {json.dumps(payload, indent=2)}")
logging.info(f"=== END STREAMING REQUEST DEBUG ===")
return self._handle_streaming_request(api_url, payload, headers, model)
# Convert OpenAI tools to Anthropic format
if tools:
anthropic_tools = self._convert_tools_to_anthropic(tools)
if anthropic_tools:
payload['tools'] = anthropic_tools
# Convert OpenAI tool_choice format to Anthropic format
if tool_choice and tools: # Only add tool_choice if we have tools
anthropic_tool_choice = self._convert_tool_choice_to_anthropic(tool_choice)
if anthropic_tool_choice:
payload['tool_choice'] = anthropic_tool_choice
# Add stream parameter
payload['stream'] = stream
# TEMPORARY: Always log payload for debugging 400 errors
logger.info(f"ClaudeProviderHandler: Request payload: {json.dumps(payload, indent=2)}")
# Get auth headers with Bearer token (non-streaming mode)
headers = self._get_auth_headers(stream=False)
# Use api.anthropic.com endpoint (correct endpoint for OAuth2 tokens)
api_url = 'https://api.anthropic.com/v1/messages'
logger.info(f"ClaudeProviderHandler: Making request to {api_url}")
# Make request using direct HTTP (kilocode method)
if stream:
logger.info(f"ClaudeProviderHandler: Using streaming mode with direct HTTP")
# Get auth headers with Bearer token (streaming mode)
headers = self._get_auth_headers(stream=True)
# Log the full request for debugging
if AISBF_DEBUG:
logging.info(f"=== NON-STREAMING REQUEST DEBUG ===")
logging.info(f"URL: {api_url}")
logging.info(f"Headers (auth redacted): {json.dumps({k: v for k, v in headers.items() if k.lower() != 'authorization'}, indent=2)}")
logging.info(f"Payload: {json.dumps(payload, indent=2)}")
logging.info(f"=== END NON-STREAMING REQUEST DEBUG ===")
# Non-streaming request
response = await self.client.post(api_url, headers=headers, json=payload)
logger.info(f"=== STREAMING REQUEST DEBUG ===")
logger.info(f"URL: {api_url}")
logger.info(f"Headers: {json.dumps({k: v for k, v in headers.items() if k.lower() != 'authorization'}, indent=2)}")
logger.info(f"Payload: {json.dumps(payload, indent=2)}")
logger.info(f"=== END STREAMING REQUEST DEBUG ===")
logging.info(f"ClaudeProviderHandler: Response status: {response.status_code}")
# Check for 429 rate limit error before raising
if response.status_code == 429:
try:
response_data = response.json()
except Exception:
response_data = response.text
self.handle_429_error(response_data, dict(response.headers))
response.raise_for_status()
# Log error details for non-2xx responses
if response.status_code >= 400:
try:
error_body = response.json()
error_message = error_body.get('error', {}).get('message', 'Unknown error')
error_type = error_body.get('error', {}).get('type', 'unknown')
logging.error(f"ClaudeProviderHandler: API error response: {json.dumps(error_body, indent=2)}")
logging.error(f"ClaudeProviderHandler: Error type: {error_type}")
logging.error(f"ClaudeProviderHandler: Error message: {error_message}")
except Exception:
logging.error(f"ClaudeProviderHandler: API error response (text): {response.text}")
return self._handle_streaming_request(api_url, payload, headers, model)
# Get auth headers with Bearer token (non-streaming mode)
headers = self._get_auth_headers(stream=False)
# Log the full request for debugging
if AISBF_DEBUG:
logger.info(f"=== NON-STREAMING REQUEST DEBUG ===")
logger.info(f"URL: {api_url}")
logger.info(f"Headers (auth redacted): {json.dumps({k: v for k, v in headers.items() if k.lower() != 'authorization'}, indent=2)}")
logger.info(f"Payload: {json.dumps(payload, indent=2)}")
logger.info(f"=== END NON-STREAMING REQUEST DEBUG ===")
# Non-streaming request
response = await self.client.post(api_url, headers=headers, json=payload)
logger.info(f"ClaudeProviderHandler: Response status: {response.status_code}")
# Check for 429 rate limit error before raising
if response.status_code == 429:
try:
response_data = response.json()
except Exception:
response_data = response.text
self.handle_429_error(response_data, dict(response.headers))
response.raise_for_status()
claude_response = response.json()
if AISBF_DEBUG:
logging.info(f"ClaudeProviderHandler: API response: {json.dumps(claude_response, indent=2)}")
logging.info(f"ClaudeProviderHandler: Response received successfully via direct HTTP")
self.record_success()
# Convert Claude API response to OpenAI format
openai_response = self._convert_to_openai_format(claude_response, model)
return openai_response
except Exception as e:
import logging
logging.error(f"ClaudeProviderHandler: Error: {str(e)}", exc_info=True)
self.record_failure()
raise e
# Log error details for non-2xx responses
if response.status_code >= 400:
try:
error_body = response.json()
error_message = error_body.get('error', {}).get('message', 'Unknown error')
error_type = error_body.get('error', {}).get('type', 'unknown')
logger.error(f"ClaudeProviderHandler: API error response: {json.dumps(error_body, indent=2)}")
logger.error(f"ClaudeProviderHandler: Error type: {error_type}")
logger.error(f"ClaudeProviderHandler: Error message: {error_message}")
except Exception:
logger.error(f"ClaudeProviderHandler: API error response (text): {response.text}")
response.raise_for_status()
claude_response = response.json()
if AISBF_DEBUG:
logger.info(f"ClaudeProviderHandler: API response: {json.dumps(claude_response, indent=2)}")
logger.info(f"ClaudeProviderHandler: Response received successfully via direct HTTP")
self.record_success()
# Convert Claude API response to OpenAI format
openai_response = self._convert_to_openai_format(claude_response, model)
return openai_response
async def _handle_streaming_request(self, api_url: str, payload: Dict, headers: Dict, model: str):
"""Handle streaming request to Claude API using direct HTTP (kilocode method)."""
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment