refactor(credentials): move provider validation into handler layer

- Add BaseProviderHandler.validate_credentials() with default api_key check
- Implement provider-specific validate_credentials() overrides:
  - Kiro: validates creds_file/sqlite_db/token persistence (file for admin,
    DB path for users)
  - OpenAI/Anthropic/Google: validate api_key presence/format
  - Claude/Codex/Kilo/Qwen: validate OAuth2 or API key
  - Ollama: always valid (no authentication required)
- get_provider_handler() now calls handler.validate_credentials() after
  instantiation, raising ValueError on failure
- Replace all credential validation in main.py API endpoints with
  handler-level checks, removing duplicate logic
- get_provider_models() now uses get_provider_handler() for unified validation
  instead of scattered inline checks
- Remove obsolete validate_kiro_credentials() function from main.py
- All validation respects user vs admin credential storage (DB vs files)
parent 002ed209
......@@ -136,5 +136,16 @@ def get_provider_handler(provider_id: str, api_key: Optional[str] = None, user_i
handler.provider_config = provider_config
logger.info(f"Handler created: {handler.__class__.__name__}")
# Validate credentials for this provider
try:
if not handler.validate_credentials():
logger.error(f"Provider '{provider_id}' credentials validation failed")
raise ValueError(f"Provider '{provider_id}' credentials not valid or not configured")
logger.info(f"Credentials validated for provider '{provider_id}'")
except Exception as e:
logger.error(f"Error validating credentials for provider '{provider_id}': {e}")
raise
logger.info(f"=== get_provider_handler END ===")
return handler
......@@ -33,6 +33,20 @@ class AnthropicProviderHandler(BaseProviderHandler):
def __init__(self, provider_id: str, api_key: str):
super().__init__(provider_id, api_key)
self.client = Anthropic(api_key=api_key)
def validate_credentials(self) -> bool:
"""Validate Anthropic API key presence."""
if not self.api_key:
logging.error(f"[{self.provider_id}] API key required but not provided")
return False
stripped = self.api_key.strip()
if not stripped or stripped.startswith('YOUR_'):
logging.error(f"[{self.provider_id}] API key appears to be a placeholder")
return False
logging.info(f"[{self.provider_id}] API key validated")
return True
async def handle_request(self, model: str, messages: List[Dict], max_tokens: Optional[int] = None,
temperature: Optional[float] = 1.0, stream: Optional[bool] = False,
......
......@@ -857,6 +857,80 @@ class BaseProviderHandler:
except Exception:
pass
def validate_credentials(self) -> bool:
"""
Validate provider credentials.
Returns:
True if credentials are valid or validation is not needed.
False if credentials are invalid/missing.
Base implementation checks only if api_key_required=True and api_key is missing/empty.
Override in subclasses for provider-specific validation (e.g., Kiro credential files).
"""
import logging
logger = logging.getLogger(__name__)
# Determine which config to use: user-specific or global
is_user_context = self.user_id is not None and hasattr(self, 'user_provider_config') and self.user_provider_config is not None
provider_config = self.user_provider_config if is_user_context else None
if provider_config is None:
from ..config import config
provider_config = config.providers.get(self.provider_id)
if provider_config is None:
logger.error(f"[{self.provider_id}] Provider configuration not found")
return False
# Check if this provider requires authentication
if isinstance(provider_config, dict):
api_key_required = provider_config.get('api_key_required', False)
else:
api_key_required = getattr(provider_config, 'api_key_required', False)
if not api_key_required:
logger.debug(f"[{self.provider_id}] No API key required, skipping credential validation")
return True
# Check if API key is provided (either from config or passed to constructor)
if not self.api_key:
# Also check if it might be in config
if isinstance(provider_config, dict):
api_key_from_config = provider_config.get('api_key')
else:
api_key_from_config = getattr(provider_config, 'api_key', None)
if api_key_from_config:
self.api_key = api_key_from_config
else:
logger.error(f"[{self.provider_id}] API key required but not provided")
return False
# Check for placeholder/empty API key
if isinstance(self.api_key, str):
stripped = self.api_key.strip()
if not stripped or stripped.startswith('YOUR_'):
logger.error(f"[{self.provider_id}] Invalid API key format")
return False
logger.info(f"[{self.provider_id}] API key present, validation passed")
return True
# Check if API key is provided
if not self.api_key:
logger.error(f"[{self.provider_id}] API key required but not provided")
return False
# Check for placeholder/empty API key
if isinstance(self.api_key, str):
stripped = self.api_key.strip()
if not stripped or stripped.startswith('YOUR_') or 'placeholder' in stripped.lower():
logger.error(f"[{self.provider_id}] Invalid API key format")
return False
logger.info(f"[{self.provider_id}] API key present, validation passed")
return True
def parse_429_response(self, response_data: Union[Dict, str], headers: Dict = None) -> Optional[int]:
"""
Parse 429 rate limit response to extract wait time in seconds.
......
This diff is collapsed.
......@@ -92,34 +92,37 @@ class CodexProviderHandler(BaseProviderHandler):
else getattr(provider_config, 'api_key', None)) if provider_config else None
self._use_api_key_mode = bool(api_key or _cfg_api_key)
self._account_id = None # Will be extracted from ID token in OAuth2 mode
# Set base URL from config (default endpoint)
# This will be overridden for OAuth2 mode when credentials are validated
_endpoint = (provider_config.get('endpoint') if isinstance(provider_config, dict)
else getattr(provider_config, 'endpoint', None)) if provider_config else None
self.base_url = _endpoint or "https://api.openai.com/v1"
# API Key Mode: Initialize OpenAI client with configured endpoint
def validate_credentials(self) -> bool:
"""
Validate Codex credentials.
In API key mode: checks if api_key is present and valid.
In OAuth2 mode: checks if OAuth2 is authenticated via is_authenticated().
Returns:
True if credentials are valid, False otherwise.
"""
import logging
logger = logging.getLogger(__name__)
if self._use_api_key_mode:
resolved_api_key = api_key or _cfg_api_key
self.client = OpenAI(
base_url=self.base_url,
api_key=resolved_api_key or "dummy",
default_headers={
"User-Agent": "codex-cli/1.0.0",
}
)
logger.info(f"CodexProviderHandler: Initialized in API Key mode with endpoint: {self.base_url}")
logger.info(f"[{self.provider_id}] Codex using API key mode")
if self.api_key and self.api_key != "placeholder":
logger.debug(f"[{self.provider_id}] Codex API key present")
return True
logger.error(f"[{self.provider_id}] Codex API key missing or placeholder")
return False
else:
# OAuth2 Mode: Check if OAuth2 is authenticated
# If authenticated, use ChatGPT backend; otherwise use configured endpoint
if self.oauth2.is_authenticated():
self.base_url = "https://chatgpt.com/backend-api"
logger.info(f"CodexProviderHandler: Initialized in OAuth2 mode with ChatGPT backend: {self.base_url}")
else:
# Not yet authenticated, keep configured endpoint
logger.info(f"CodexProviderHandler: Initialized in OAuth2 mode (not authenticated yet) with endpoint: {self.base_url}")
self.client = None # Not used in OAuth2 mode
if hasattr(self, 'oauth2') and self.oauth2:
is_auth = self.oauth2.is_authenticated()
if is_auth:
logger.info(f"[{self.provider_id}] Codex OAuth2 credentials are valid")
else:
logger.error(f"[{self.provider_id}] Codex OAuth2 credentials are invalid or missing")
return is_auth
logger.error(f"[{self.provider_id}] No OAuth2 instance configured for Codex")
return False
def _load_oauth2_from_db(self, provider_id: str, credentials_file: str, issuer: str) -> CodexOAuth2:
"""
......
......@@ -37,6 +37,20 @@ class GoogleProviderHandler(BaseProviderHandler):
self.client = genai.Client(api_key=api_key)
# Cache storage for Google Context Caching
self._cached_content_refs = {} # {cache_key: (cached_content_name, expiry_time)}
def validate_credentials(self) -> bool:
"""Validate Google (Gemini) API key presence."""
import logging
logger = logging.getLogger(__name__)
if not self.api_key:
logger.error(f"[{self.provider_id}] API key required but not provided")
return False
stripped = self.api_key.strip()
if not stripped or stripped.startswith('YOUR_'):
logger.error(f"[{self.provider_id}] Invalid API key format")
return False
logger.info(f"[{self.provider_id}] API key validated")
return True
async def handle_request(self, model: str, messages: List[Dict], max_tokens: Optional[int] = None,
temperature: Optional[float] = 1.0, stream: Optional[bool] = False,
......
......@@ -125,7 +125,38 @@ class KiloProviderHandler(BaseProviderHandler):
self._kilo_endpoint = endpoint
self.client = OpenAI(base_url=endpoint, api_key=api_key or "placeholder")
self.client = OpenAI(base_url=endpoint, api_key=api_key or "placeholder")
def validate_credentials(self) -> bool:
"""
Validate Kilo credentials.
In API key mode: checks if api_key is present and valid (not placeholder).
In OAuth2 mode: checks if OAuth2 is authenticated via is_authenticated().
Returns:
True if credentials are valid, False otherwise.
"""
import logging
logger = logging.getLogger(__name__)
if self._use_api_key_auth:
logger.info(f"[{self.provider_id}] Kilo using API key mode")
if self.api_key and self.api_key != "placeholder":
logger.debug(f"[{self.provider_id}] Kilo API key present")
return True
logger.error(f"[{self.provider_id}] Kilo API key missing or placeholder")
return False
else:
if hasattr(self, 'oauth2') and self.oauth2:
is_auth = self.oauth2.is_authenticated()
if is_auth:
logger.info(f"[{self.provider_id}] Kilo OAuth2 credentials are valid")
else:
logger.error(f"[{self.provider_id}] Kilo OAuth2 credentials are invalid or missing")
return is_auth
logger.error(f"[{self.provider_id}] No OAuth2 instance configured for Kilo")
return False
def _load_oauth2_from_db(self, provider_id: str, credentials_file: str, api_base: str):
"""
......
......@@ -52,60 +52,105 @@ class KiroProviderHandler(BaseProviderHandler):
"""
def __init__(self, provider_id: str, api_key: str):
super().__init__(provider_id, api_key)
self.provider_config = config.get_provider(provider_id)
# Don't load provider_config here — get_provider_handler will set it after creation
self.region = "us-east-1" # Default region
# Import AuthType for checking auth type
from ...auth.kiro import AuthType
self.AuthType = AuthType
# Initialize KiroAuthManager with credentials from config
# Initialize KiroAuthManager lazily on first use
self.auth_manager = None
self._init_auth_manager()
self._kiro_config = None # Will be populated from provider_config
# HTTP client for making requests
self.client = httpx.AsyncClient(timeout=httpx.Timeout(300.0, connect=30.0))
def _init_auth_manager(self):
"""Initialize KiroAuthManager with credentials from config"""
def _ensure_auth_manager(self):
"""Initialize auth manager if not already done, using current provider_config."""
if self.auth_manager is not None:
return
from ...auth.kiro import KiroAuthManager
# Get kiro_config from provider_config (set by get_provider_handler)
provider_config = getattr(self, 'provider_config', None) or getattr(self, 'user_provider_config', None)
if not provider_config:
# Fallback to global config (shouldn't normally happen)
from ...config import config
provider_config = config.get_provider(self.provider_id)
# Extract kiro_config (handle dict or object)
kiro_config = getattr(provider_config, 'kiro_config', {}) if hasattr(provider_config, 'kiro_config') else provider_config.get('kiro_config', {})
# Extract credential parameters
refresh_token = kiro_config.get('refresh_token') if isinstance(kiro_config, dict) else None
profile_arn = kiro_config.get('profile_arn') if isinstance(kiro_config, dict) else None
region = kiro_config.get('region', 'us-east-1') if isinstance(kiro_config, dict) else getattr(provider_config, 'region', 'us-east-1')
creds_file = kiro_config.get('creds_file') if isinstance(kiro_config, dict) else getattr(provider_config, 'creds_file', None)
sqlite_db = kiro_config.get('sqlite_db') if isinstance(kiro_config, dict) else getattr(provider_config, 'sqlite_db', None)
client_id = kiro_config.get('client_id') if isinstance(kiro_config, dict) else getattr(provider_config, 'client_id', None)
client_secret = kiro_config.get('client_secret') if isinstance(kiro_config, dict) else getattr(provider_config, 'client_secret', None)
self.region = region
# Initialize auth manager
self.auth_manager = KiroAuthManager(
refresh_token=refresh_token,
profile_arn=profile_arn,
region=region,
creds_file=creds_file,
sqlite_db=sqlite_db,
client_id=client_id,
client_secret=client_secret
)
def validate_credentials(self) -> bool:
"""
Validate Kiro-specific credentials.
Checks that credential files (creds_file or sqlite_db) exist and
that auth manager can successfully load credentials. Also validates
token/profile presence based on storage type.
"""
try:
from ...auth.kiro import KiroAuthManager
# Get Kiro-specific configuration from provider config
kiro_config = getattr(self.provider_config, 'kiro_config', None)
if not kiro_config:
logging.warning(f"No kiro_config found in provider {self.provider_id}, using defaults")
kiro_config = {}
# Extract credentials from provider config
refresh_token = kiro_config.get('refresh_token') if isinstance(kiro_config, dict) else None
profile_arn = kiro_config.get('profile_arn') if isinstance(kiro_config, dict) else None
region = kiro_config.get('region', 'us-east-1') if isinstance(kiro_config, dict) else 'us-east-1'
creds_file = kiro_config.get('creds_file') if isinstance(kiro_config, dict) else None
sqlite_db = kiro_config.get('sqlite_db') if isinstance(kiro_config, dict) else None
client_id = kiro_config.get('client_id') if isinstance(kiro_config, dict) else None
client_secret = kiro_config.get('client_secret') if isinstance(kiro_config, dict) else None
self.region = region
# Initialize auth manager
self.auth_manager = KiroAuthManager(
refresh_token=refresh_token,
profile_arn=profile_arn,
region=region,
creds_file=creds_file,
sqlite_db=sqlite_db,
client_id=client_id,
client_secret=client_secret
)
logging.info(f"KiroProviderHandler: Auth manager initialized for region {region}")
self._ensure_auth_manager()
except Exception as e:
logging.error(f"Failed to initialize KiroAuthManager: {e}")
self.auth_manager = None
logging.error(f"[{self.provider_id}] Failed to initialize auth manager: {e}")
return False
if not self.auth_manager:
logging.error(f"[{self.provider_id}] Auth manager not initialized")
return False
# Check for credential sources
creds_file = getattr(self.auth_manager, 'creds_file', None)
sqlite_db = getattr(self.auth_manager, 'sqlite_db', None)
refresh_token = getattr(self.auth_manager, 'refresh_token', None)
profile_arn = getattr(self.auth_manager, 'profile_arn', None)
has_creds_file = creds_file and Path(creds_file).expanduser().exists()
has_sqlite_db = sqlite_db and Path(sqlite_db).expanduser().exists()
has_token = bool(refresh_token or profile_arn)
if not (has_creds_file or has_sqlite_db or has_token):
logging.error(
f"[{self.provider_id}] No Kiro credentials found. "
f"Need creds_file, sqlite_db, or refresh_token/profile_arn in kiro_config."
)
return False
if creds_file and not has_creds_file:
logging.error(f"[{self.provider_id}] Kiro creds_file not found: {creds_file}")
return False
if sqlite_db and not has_sqlite_db:
logging.error(f"[{self.provider_id}] Kiro sqlite_db not found: {sqlite_db}")
return False
logging.info(f"[{self.provider_id}] Kiro credentials validated successfully")
return True
async def handle_request(self, model: str, messages: List[Dict], max_tokens: Optional[int] = None,
temperature: Optional[float] = 1.0, stream: Optional[bool] = False,
tools: Optional[List[Dict]] = None, tool_choice: Optional[Union[str, Dict]] = None) -> Union[Dict, object]:
......@@ -121,6 +166,9 @@ class KiroProviderHandler(BaseProviderHandler):
logging.info(f"KiroProviderHandler: Messages count: {len(messages)}")
logging.info(f"KiroProviderHandler: Tools count: {len(tools) if tools else 0}")
# Ensure auth manager is initialized and credentials are valid
self._ensure_auth_manager()
if not self.auth_manager:
raise Exception("Kiro authentication not configured. Please set kiro_config in provider configuration.")
......
......@@ -38,7 +38,24 @@ class OllamaProviderHandler(BaseProviderHandler):
pool=60.0
)
self.client = httpx.AsyncClient(base_url=config.providers[provider_id].endpoint, timeout=timeout)
def validate_credentials(self) -> bool:
"""
Validate Ollama credentials.
Ollama typically runs locally without authentication.
If an API key is configured, it's noted but not required for validation.
Returns:
Always True (Ollama doesn't require credential validation).
"""
import logging
logger = logging.getLogger(__name__)
logger.debug(f"[{self.provider_id}] Ollama provider - no credentials required (local or trusted endpoint)")
if self.api_key:
logger.debug(f"[{self.provider_id}] Ollama API key is configured (optional)")
return True
async def handle_request(self, model: str, messages: List[Dict], max_tokens: Optional[int] = None,
temperature: Optional[float] = 1.0, stream: Optional[bool] = False,
tools: Optional[List[Dict]] = None, tool_choice: Optional[Union[str, Dict]] = None) -> Dict:
......
......@@ -33,6 +33,20 @@ class OpenAIProviderHandler(BaseProviderHandler):
def __init__(self, provider_id: str, api_key: str):
super().__init__(provider_id, api_key)
self.client = OpenAI(base_url=config.providers[provider_id].endpoint, api_key=api_key)
def validate_credentials(self) -> bool:
"""Validate OpenAI API key presence."""
if not self.api_key:
logging.error(f"[{self.provider_id}] API key required but not provided")
return False
stripped = self.api_key.strip()
if not stripped or stripped.startswith('YOUR_'):
logging.error(f"[{self.provider_id}] API key appears to be a placeholder")
return False
logging.info(f"[{self.provider_id}] API key validated")
return True
async def handle_request(self, model: str, messages: List[Dict], max_tokens: Optional[int] = None,
temperature: Optional[float] = 1.0, stream: Optional[bool] = False,
......
......@@ -106,6 +106,53 @@ class QwenProviderHandler(BaseProviderHandler):
# OpenAI SDK client (will be configured dynamically with OAuth token)
self._sdk_client = None
def validate_credentials(self) -> bool:
"""
Validate Qwen credentials.
In API key mode: checks if qwen_config.api_key is present and valid.
In OAuth2 mode: checks if OAuth2 is authenticated via is_authenticated().
Note: As of April 2026, Qwen OAuth2 service has been discontinued.
API key authentication is the recommended method.
Returns:
True if credentials are valid, False otherwise.
"""
import logging
logger = logging.getLogger(__name__)
# Check if API key mode is configured
if isinstance(self.provider_config, dict):
qwen_config = self.provider_config.get('qwen_config')
else:
qwen_config = getattr(self.provider_config, 'qwen_config', None)
api_key = qwen_config.get('api_key') if qwen_config and isinstance(qwen_config, dict) else None
if api_key:
logger.info(f"[{self.provider_id}] Qwen using API key authentication")
if api_key and api_key != "placeholder":
logger.debug(f"[{self.provider_id}] Qwen API key present")
return True
logger.error(f"[{self.provider_id}] Qwen API key is placeholder or missing")
return False
else:
# OAuth2 mode (discontinued but code maintained for future)
if hasattr(self, 'auth') and self.auth:
is_auth = self.auth.is_authenticated()
if is_auth:
logger.info(f"[{self.provider_id}] Qwen OAuth2 credentials are valid")
else:
logger.error(f"[{self.provider_id}] Qwen OAuth2 credentials are invalid or missing")
logger.warning(
"Qwen OAuth2 service has been discontinued by Qwen. "
"Tokens obtained from chat.qwen.ai are no longer accepted by DashScope API. "
"Please use API key authentication instead."
)
return is_auth
logger.error(f"[{self.provider_id}] No authentication method configured for Qwen")
return False
def _load_auth_from_db(self, provider_id: str, credentials_file: str):
"""
Load OAuth2 credentials:
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment