Commit 4bba04b4 authored by Your Name's avatar Your Name

feat: Add Codex provider with OAuth2 Device Authorization Grant support (v0.9.8)

- New codex provider type using OpenAI-compatible protocol
- OAuth2 authentication via Device Authorization Grant flow
- Provider handler in aisbf/providers/codex.py
- OAuth2 handler in aisbf/auth/codex.py
- Dashboard integration with authentication UI
- Token refresh with automatic retry
- API key exchange from ID token
- Updated version to 0.9.8 in setup.py and pyproject.toml
- Updated CHANGELOG.md, README.md, PYPI.md with Codex documentation
- Added codex provider configuration to config/providers.json
parent 1491d963
...@@ -7,6 +7,24 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ...@@ -7,6 +7,24 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
## [0.9.8] - 2026-04-04
### Added
- **Codex Provider (OAuth2)**: Full support for OpenAI Codex using OAuth2 Device Authorization Grant
- New `codex` provider type with OpenAI-compatible API protocol
- OAuth2 authentication via `aisbf/auth/codex.py` with device code flow
- Provider handler in `aisbf/providers/codex.py` extending OpenAI protocol
- Dashboard integration with authentication UI (device code flow)
- Token refresh with automatic retry
- API key exchange from ID token for direct API access
- Credentials stored in `~/.aisbf/codex_credentials.json`
- Uses OpenAI's OAuth2 endpoints (`https://auth.openai.com`)
- No localhost callback port needed (device code flow like Kilo)
- Dashboard endpoints: `/dashboard/codex/auth/start`, `/dashboard/codex/auth/poll`, `/dashboard/codex/auth/status`, `/dashboard/codex/auth/logout`
### Changed
- **Version Bump**: Updated version to 0.9.8 in setup.py and pyproject.toml
## [0.9.7] - 2026-04-03 ## [0.9.7] - 2026-04-03
### Fixed ### Fixed
......
...@@ -156,6 +156,11 @@ AISBF supports OAuth2 authentication for several providers: ...@@ -156,6 +156,11 @@ AISBF supports OAuth2 authentication for several providers:
- Device Authorization Grant OAuth2 flow - Device Authorization Grant OAuth2 flow
- Seamless integration with Kilocode services - Seamless integration with Kilocode services
### Codex (OpenAI)
- Device Authorization Grant OAuth2 flow (same protocol as OpenAI)
- Automatic token refresh and API key exchange
- Dashboard integration for easy authentication
**Setup Instructions:** **Setup Instructions:**
1. Start AISBF: `aisbf` 1. Start AISBF: `aisbf`
2. Access dashboard: `http://localhost:17765/dashboard` 2. Access dashboard: `http://localhost:17765/dashboard`
......
...@@ -24,9 +24,10 @@ Access the dashboard at `http://localhost:17765/dashboard` (default credentials: ...@@ -24,9 +24,10 @@ Access the dashboard at `http://localhost:17765/dashboard` (default credentials:
## Key Features ## Key Features
- **Multi-Provider Support**: Unified interface for Google, OpenAI, Anthropic, Ollama, Kiro (Amazon Q Developer), Kiro-cli, Claude Code (OAuth2), and Kilocode (OAuth2) - **Multi-Provider Support**: Unified interface for Google, OpenAI, Anthropic, Ollama, Kiro (Amazon Q Developer), Kiro-cli, Claude Code (OAuth2), Kilocode (OAuth2), and Codex (OAuth2)
- **Claude OAuth2 Authentication**: Full OAuth2 PKCE flow for Claude Code with automatic token refresh and Chrome extension for remote servers - **Claude OAuth2 Authentication**: Full OAuth2 PKCE flow for Claude Code with automatic token refresh and Chrome extension for remote servers
- **Kilocode OAuth2 Authentication**: OAuth2 Device Authorization Grant for Kilo Code with automatic token refresh - **Kilocode OAuth2 Authentication**: OAuth2 Device Authorization Grant for Kilo Code with automatic token refresh
- **Codex OAuth2 Authentication**: OAuth2 Device Authorization Grant for OpenAI Codex with automatic token refresh and API key exchange
- **Rotation Models**: Weighted load balancing across multiple providers with automatic failover - **Rotation Models**: Weighted load balancing across multiple providers with automatic failover
- **Autoselect Models**: AI-powered model selection based on content analysis and request characteristics - **Autoselect Models**: AI-powered model selection based on content analysis and request characteristics
- **Semantic Classification**: Fast hybrid BM25 + semantic model selection using sentence transformers (optional) - **Semantic Classification**: Fast hybrid BM25 + semantic model selection using sentence transformers (optional)
...@@ -134,6 +135,7 @@ See [`PYPI.md`](PYPI.md) for detailed instructions on publishing to PyPI. ...@@ -134,6 +135,7 @@ See [`PYPI.md`](PYPI.md) for detailed instructions on publishing to PyPI.
- Kiro (Amazon Q Developer / AWS CodeWhisperer) - Kiro (Amazon Q Developer / AWS CodeWhisperer)
- Kiro-cli (Amazon Q Developer CLI authentication) - Kiro-cli (Amazon Q Developer CLI authentication)
- Kilocode (OAuth2 Device Authorization Grant) - Kilocode (OAuth2 Device Authorization Grant)
- Codex (OAuth2 Device Authorization Grant - OpenAI protocol)
### Kiro-cli Provider Support ### Kiro-cli Provider Support
...@@ -218,6 +220,50 @@ AISBF supports Kilo Code as a provider using OAuth2 Device Authorization Grant: ...@@ -218,6 +220,50 @@ AISBF supports Kilo Code as a provider using OAuth2 Device Authorization Grant:
``` ```
See [`KILO_OAUTH2.md`](KILO_OAUTH2.md) for detailed setup instructions. See [`KILO_OAUTH2.md`](KILO_OAUTH2.md) for detailed setup instructions.
### Codex OAuth2 Authentication
AISBF supports OpenAI Codex as a provider using OAuth2 Device Authorization Grant:
#### Features
- Full OAuth2 Device Authorization Grant flow (same protocol as OpenAI)
- Automatic token refresh with refresh token rotation
- API key exchange from ID token for direct API access
- Dashboard integration with authentication UI
- No localhost callback port needed (device code flow)
- Credentials stored in `~/.aisbf/codex_credentials.json`
#### Setup
1. Add codex provider to configuration (via dashboard or `~/.aisbf/providers.json`)
2. Click "Authenticate with Codex (Device Code)" in dashboard
3. Complete device authorization flow at `https://auth.openai.com/codex/device`
4. Use codex models via API: `codex/<model>`
#### Configuration Example
```json
{
"providers": {
"codex": {
"id": "codex",
"name": "Codex (OpenAI OAuth2)",
"endpoint": "https://api.openai.com/v1",
"type": "codex",
"api_key_required": false,
"codex_config": {
"credentials_file": "~/.aisbf/codex_credentials.json",
"issuer": "https://auth.openai.com"
},
"models": [
{
"name": "gpt-4o",
"context_size": 128000
}
]
}
}
}
```
## Configuration ## Configuration
### SSL/TLS Configuration ### SSL/TLS Configuration
......
"""
Copyright (C) 2026 Stefy Lanza <stefy@nexlab.net>
AISBF - AI Service Broker Framework || AI Should Be Free
OAuth2 Device Authorization Grant implementation for Codex (OpenAI).
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
Why did the programmer quit his job? Because he didn't get arrays!
"""
import asyncio
import base64
import hashlib
import json
import logging
import os
import secrets
import time
import urllib.parse
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, Dict, Any
import httpx
logger = logging.getLogger(__name__)
# Codex OAuth2 Constants (from codex-oauth-implementation-guide.md)
CLIENT_ID = "app_EMoamEEZ73f0CkXaXp7hrann"
DEFAULT_ISSUER = "https://auth.openai.com"
DEFAULT_PORT = 1455
SCOPES = "openid profile email offline_access api.connectors.read api.connectors.invoke"
class CodexOAuth2:
"""
OAuth2 Device Authorization Grant implementation for Codex (OpenAI).
Implements RFC 8628 device authorization flow for CLI/desktop applications.
Supports authentication with OpenAI's Codex OAuth2 endpoints.
"""
def __init__(self, credentials_file: Optional[str] = None, issuer: Optional[str] = None):
"""
Initialize Codex OAuth2 client.
Args:
credentials_file: Path to credentials JSON file (default: ~/.aisbf/codex_credentials.json)
issuer: OAuth2 issuer URL (default: https://auth.openai.com)
"""
self.credentials_file = credentials_file or os.path.expanduser("~/.aisbf/codex_credentials.json")
self.issuer = (issuer or DEFAULT_ISSUER).rstrip("/")
self.credentials = None
self._load_credentials()
def _load_credentials(self) -> None:
"""Load credentials from file if it exists."""
if os.path.exists(self.credentials_file):
try:
with open(self.credentials_file, 'r') as f:
self.credentials = json.load(f)
logger.info(f"CodexOAuth2: Loaded credentials from {self.credentials_file}")
except Exception as e:
logger.warning(f"CodexOAuth2: Failed to load credentials: {e}")
self.credentials = None
def _save_credentials(self, credentials: Dict[str, Any]) -> None:
"""
Save credentials to file with secure permissions.
Args:
credentials: Credentials dict to save
"""
try:
# Ensure directory exists
os.makedirs(os.path.dirname(self.credentials_file), exist_ok=True)
# Write credentials
with open(self.credentials_file, 'w') as f:
json.dump(credentials, f, indent=2)
# Set file permissions to 0o600 (user read/write only)
os.chmod(self.credentials_file, 0o600)
self.credentials = credentials
logger.info(f"CodexOAuth2: Saved credentials to {self.credentials_file}")
except Exception as e:
logger.error(f"CodexOAuth2: Failed to save credentials: {e}")
raise
@staticmethod
def generate_pkce() -> tuple:
"""Generate PKCE code verifier and challenge."""
code_verifier = base64.urlsafe_b64encode(secrets.token_bytes(64)).rstrip(b'=').decode()
code_challenge = base64.urlsafe_b64encode(
hashlib.sha256(code_verifier.encode()).digest()
).rstrip(b'=').decode()
return code_verifier, code_challenge
@staticmethod
def generate_state() -> str:
"""Generate CSRF state parameter."""
return base64.urlsafe_b64encode(secrets.token_bytes(32)).rstrip(b'=').decode()
def build_authorization_url(
self,
redirect_uri: str,
code_challenge: str,
state: str,
workspace_id: Optional[str] = None,
) -> str:
"""Build the OAuth authorization URL."""
params = {
"response_type": "code",
"client_id": CLIENT_ID,
"redirect_uri": redirect_uri,
"scope": SCOPES,
"code_challenge": code_challenge,
"code_challenge_method": "S256",
"id_token_add_organizations": "true",
"codex_cli_simplified_flow": "true",
"state": state,
"originator": "codex_cli",
}
if workspace_id:
params["allowed_workspace_id"] = workspace_id
query = urllib.parse.urlencode(params)
return f"{self.issuer}/oauth/authorize?{query}"
async def exchange_code_for_tokens(
self,
code: str,
redirect_uri: str,
code_verifier: str,
) -> dict:
"""Exchange authorization code for tokens."""
token_url = f"{self.issuer}/oauth/token"
body = (
f"grant_type=authorization_code"
f"&code={urllib.parse.quote(code)}"
f"&redirect_uri={urllib.parse.quote(redirect_uri)}"
f"&client_id={urllib.parse.quote(CLIENT_ID)}"
f"&code_verifier={urllib.parse.quote(code_verifier)}"
)
async with httpx.AsyncClient() as client:
response = await client.post(
token_url,
headers={"Content-Type": "application/x-www-form-urlencoded"},
data=body,
timeout=30.0
)
response.raise_for_status()
return response.json()
async def refresh_tokens(self, refresh_token: str) -> dict:
"""Refresh tokens using the refresh token."""
token_url = f"{self.issuer}/oauth/token"
async with httpx.AsyncClient() as client:
response = await client.post(
token_url,
headers={"Content-Type": "application/json"},
json={
"client_id": CLIENT_ID,
"grant_type": "refresh_token",
"refresh_token": refresh_token,
},
timeout=30.0
)
if response.status_code == 401:
body = response.json()
error = body.get("error", {})
error_code = error.get("code", "unknown") if isinstance(error, dict) else str(error)
messages = {
"refresh_token_expired": "Your refresh token has expired. Please log out and sign in again.",
"refresh_token_reused": "Your refresh token was already used. Please log out and sign in again.",
"refresh_token_invalidated": "Your refresh token was revoked. Please log out and sign in again.",
}
raise Exception(messages.get(error_code, "Your access token could not be refreshed. Please log out and sign in again."))
response.raise_for_status()
return response.json()
async def obtain_api_key(self, id_token: str) -> str:
"""Exchange ID token for an OpenAI API key."""
token_url = f"{self.issuer}/oauth/token"
body = (
f"grant_type={urllib.parse.quote('urn:ietf:params:oauth:grant-type:token-exchange')}"
f"&client_id={urllib.parse.quote(CLIENT_ID)}"
f"&requested_token={urllib.parse.quote('openai-api-key')}"
f"&subject_token={urllib.parse.quote(id_token)}"
f"&subject_token_type={urllib.parse.quote('urn:ietf:params:oauth:token-type:id_token')}"
)
async with httpx.AsyncClient() as client:
response = await client.post(
token_url,
headers={"Content-Type": "application/x-www-form-urlencoded"},
data=body,
timeout=30.0
)
response.raise_for_status()
return response.json()["access_token"]
async def request_device_code(self) -> dict:
"""Request a device code for headless login."""
url = f"{self.issuer}/api/accounts/deviceauth/usercode"
async with httpx.AsyncClient() as client:
response = await client.post(
url,
headers={"Content-Type": "application/json"},
json={"client_id": CLIENT_ID},
timeout=30.0
)
response.raise_for_status()
return response.json()
async def poll_device_code_token(
self,
device_auth_id: str,
user_code: str,
interval: int = 5,
) -> dict:
"""Poll for device code token."""
url = f"{self.issuer}/api/accounts/deviceauth/token"
max_wait = timedelta(minutes=15)
start = datetime.now()
async with httpx.AsyncClient() as client:
while True:
response = await client.post(
url,
headers={"Content-Type": "application/json"},
json={
"device_auth_id": device_auth_id,
"user_code": user_code,
},
timeout=30.0
)
if response.status_code == 200:
return response.json()
if response.status_code in (403, 404):
if datetime.now() - start > max_wait:
raise TimeoutError("Device auth timed out after 15 minutes")
await asyncio.sleep(interval)
continue
raise Exception(f"Device auth failed with status {response.status_code}")
async def authenticate_with_device_flow(self) -> Dict[str, Any]:
"""
Complete device authorization flow.
Returns:
Dict with authentication result
"""
# Step 1: Request device code
device_resp = await self.request_device_code()
device_auth_id = device_resp["device_auth_id"]
user_code = device_resp["user_code"]
interval = int(device_resp.get("interval", 5))
logger.info(f"CodexOAuth2: Device code initiated - user_code: {user_code}")
# Step 2: Poll for token
token_resp = await self.poll_device_code_token(device_auth_id, user_code, interval)
# Step 3: Exchange for tokens
redirect_uri = f"{self.issuer}/deviceauth/callback"
tokens = await self.exchange_code_for_tokens(
code=token_resp["authorization_code"],
redirect_uri=redirect_uri,
code_verifier=token_resp["code_verifier"],
)
# Step 4: Optionally obtain API key
api_key = None
try:
api_key = await self.obtain_api_key(tokens["id_token"])
except Exception as e:
logger.warning(f"CodexOAuth2: Failed to obtain API key: {e}")
# Step 5: Save credentials
credentials = {
"auth_mode": "codex",
"tokens": {
"id_token": tokens["id_token"],
"access_token": tokens["access_token"],
"refresh_token": tokens["refresh_token"],
"account_id": tokens.get("account_id"),
},
"openai_api_key": api_key,
"last_refresh": datetime.utcnow().isoformat() + "Z",
}
self._save_credentials(credentials)
return {
"type": "success",
"provider": "codex",
"user_code": user_code,
"verification_uri": f"{self.issuer}/codex/device",
}
async def authenticate_with_browser_flow(self, port: int = DEFAULT_PORT) -> Dict[str, Any]:
"""
Complete browser-based OAuth2 flow with PKCE.
Returns:
Dict with auth_url, state, code_verifier for external handling
"""
code_verifier, code_challenge = self.generate_pkce()
state = self.generate_state()
redirect_uri = f"http://localhost:{port}/auth/callback"
auth_url = self.build_authorization_url(
redirect_uri=redirect_uri,
code_challenge=code_challenge,
state=state,
)
return {
"auth_url": auth_url,
"state": state,
"code_verifier": code_verifier,
"redirect_uri": redirect_uri,
"port": port,
}
async def complete_browser_flow(
self,
code: str,
state: str,
code_verifier: str,
redirect_uri: str,
) -> Dict[str, Any]:
"""
Complete browser OAuth2 flow after receiving callback.
Args:
code: Authorization code from callback
state: State parameter (for CSRF verification)
code_verifier: PKCE code verifier
redirect_uri: Redirect URI used in authorization
Returns:
Dict with authentication result
"""
# Exchange code for tokens
tokens = await self.exchange_code_for_tokens(
code=code,
redirect_uri=redirect_uri,
code_verifier=code_verifier,
)
# Optionally obtain API key
api_key = None
try:
api_key = await self.obtain_api_key(tokens["id_token"])
except Exception as e:
logger.warning(f"CodexOAuth2: Failed to obtain API key: {e}")
# Save credentials
credentials = {
"auth_mode": "codex",
"tokens": {
"id_token": tokens["id_token"],
"access_token": tokens["access_token"],
"refresh_token": tokens["refresh_token"],
"account_id": tokens.get("account_id"),
},
"openai_api_key": api_key,
"last_refresh": datetime.utcnow().isoformat() + "Z",
}
self._save_credentials(credentials)
return {
"type": "success",
"provider": "codex",
}
def get_valid_token(self) -> Optional[str]:
"""
Get a valid access token, refreshing if needed.
Returns:
Access token string or None if not authenticated
"""
if not self.credentials or not self.credentials.get("tokens"):
return None
tokens = self.credentials.get("tokens", {})
access_token = tokens.get("access_token")
refresh_token = tokens.get("refresh_token")
if not access_token or not refresh_token:
return None
# Check if access token is expired (with 5 minute buffer)
try:
claims = self._parse_jwt_claims(access_token)
exp = claims.get("exp", 0)
if exp < time.time() + 300:
# Token expired or about to expire - refresh
return None
except Exception:
return None
return access_token
async def get_valid_token_with_refresh(self) -> Optional[str]:
"""
Get a valid access token, automatically refreshing if needed.
Returns:
Access token string or None if refresh fails
"""
token = self.get_valid_token()
if token:
return token
# Try to refresh
if not self.credentials or not self.credentials.get("tokens"):
return None
refresh_token = self.credentials["tokens"].get("refresh_token")
if not refresh_token:
return None
try:
new_tokens = await self.refresh_tokens(refresh_token)
# Optionally obtain new API key
api_key = None
try:
api_key = await self.obtain_api_key(new_tokens["id_token"])
except Exception:
api_key = self.credentials.get("openai_api_key")
# Update credentials
self.credentials["tokens"] = {
"id_token": new_tokens["id_token"],
"access_token": new_tokens["access_token"],
"refresh_token": new_tokens["refresh_token"],
"account_id": new_tokens.get("account_id"),
}
self.credentials["openai_api_key"] = api_key
self.credentials["last_refresh"] = datetime.utcnow().isoformat() + "Z"
self._save_credentials(self.credentials)
return new_tokens["access_token"]
except Exception as e:
logger.error(f"CodexOAuth2: Token refresh failed: {e}")
return None
def is_authenticated(self) -> bool:
"""Check if user is authenticated with valid token."""
return self.get_valid_token() is not None
def get_user_email(self) -> Optional[str]:
"""Get authenticated user's email from ID token."""
if not self.credentials or not self.credentials.get("tokens"):
return None
id_token = self.credentials["tokens"].get("id_token")
if not id_token:
return None
try:
claims = self._parse_jwt_claims(id_token)
email = claims.get("email")
profile = claims.get("https://api.openai.com/profile", {})
if not email and profile:
email = profile.get("email")
return email
except Exception:
return None
def logout(self) -> None:
"""Clear stored credentials."""
if os.path.exists(self.credentials_file):
try:
os.remove(self.credentials_file)
logger.info("CodexOAuth2: Credentials removed")
except Exception as e:
logger.error(f"CodexOAuth2: Failed to remove credentials: {e}")
self.credentials = None
@staticmethod
def _parse_jwt_claims(jwt_token: str) -> dict:
"""Parse the payload from a JWT token."""
parts = jwt_token.split(".")
if len(parts) != 3:
raise ValueError("Invalid JWT format")
# Add padding if needed
payload = parts[1]
padding = 4 - len(payload) % 4
if padding != 4:
payload += "=" * padding
decoded = base64.urlsafe_b64decode(payload)
return json.loads(decoded)
...@@ -39,6 +39,7 @@ from .claude import ClaudeProviderHandler ...@@ -39,6 +39,7 @@ from .claude import ClaudeProviderHandler
from .kiro import KiroProviderHandler from .kiro import KiroProviderHandler
from .kilo import KiloProviderHandler from .kilo import KiloProviderHandler
from .ollama import OllamaProviderHandler from .ollama import OllamaProviderHandler
from .codex import CodexProviderHandler
from ..config import config from ..config import config
...@@ -50,7 +51,8 @@ PROVIDER_HANDLERS = { ...@@ -50,7 +51,8 @@ PROVIDER_HANDLERS = {
'kiro': KiroProviderHandler, 'kiro': KiroProviderHandler,
'claude': ClaudeProviderHandler, 'claude': ClaudeProviderHandler,
'kilo': KiloProviderHandler, 'kilo': KiloProviderHandler,
'kilocode': KiloProviderHandler # Kilocode provider with OAuth2 support 'kilocode': KiloProviderHandler, # Kilocode provider with OAuth2 support
'codex': CodexProviderHandler # Codex provider with OAuth2 support (OpenAI protocol)
} }
......
"""
Copyright (C) 2026 Stefy Lanza <stefy@nexlab.net>
AISBF - AI Service Broker Framework || AI Should Be Free
Codex provider handler.
Uses the same protocol as OpenAI but with OAuth2 authentication.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
Why did the programmer quit his job? Because he didn't get arrays!
"""
import logging
import time
from typing import Dict, List, Optional, Union
from openai import OpenAI
from ..models import Model
from ..config import config
from ..utils import count_messages_tokens
from .base import BaseProviderHandler, AISBF_DEBUG
from ..auth.codex import CodexOAuth2
class CodexProviderHandler(BaseProviderHandler):
"""
Codex provider handler.
Uses the same OpenAI-compatible protocol but authenticates via OAuth2
using the Codex OAuth2 flow (device code or browser-based PKCE).
"""
def __init__(self, provider_id: str, api_key: Optional[str] = None):
super().__init__(provider_id, api_key)
# Get provider config
provider_config = config.providers.get(provider_id)
endpoint = provider_config.endpoint if provider_config else "https://api.openai.com/v1"
# Initialize OAuth2 client
codex_config = getattr(provider_config, 'codex_config', {}) if provider_config else {}
credentials_file = codex_config.get('credentials_file', '~/.aisbf/codex_credentials.json')
issuer = codex_config.get('issuer', 'https://auth.openai.com')
self.oauth2 = CodexOAuth2(
credentials_file=credentials_file,
issuer=issuer,
)
# Resolve API key: use provided key, or get from OAuth2, or use stored API key
resolved_api_key = api_key
if not resolved_api_key:
# Try to get OAuth2 access token
resolved_api_key = self.oauth2.get_valid_token()
if not resolved_api_key:
# Fall back to provider config API key
if provider_config and provider_config.api_key:
resolved_api_key = provider_config.api_key
self.client = OpenAI(base_url=endpoint, api_key=resolved_api_key or "dummy")
self._oauth2_enabled = not api_key and provider_config and not provider_config.api_key_required
async def _get_valid_api_key(self) -> str:
"""Get a valid API key, refreshing OAuth2 if needed."""
# If we have an API key from config, use it
provider_config = config.providers.get(self.provider_id)
if provider_config and provider_config.api_key:
return provider_config.api_key
# Try OAuth2 token
token = await self.oauth2.get_valid_token_with_refresh()
if token:
return token
raise Exception("Codex authentication required. Please authenticate via dashboard or provide API key.")
async def handle_request(
self,
model: str,
messages: List[Dict],
max_tokens: Optional[int] = None,
temperature: Optional[float] = 1.0,
stream: Optional[bool] = False,
tools: Optional[List[Dict]] = None,
tool_choice: Optional[Union[str, Dict]] = None,
) -> Union[Dict, object]:
if self.is_rate_limited():
raise Exception("Provider rate limited")
try:
logger = logging.getLogger(__name__)
logger.info(f"CodexProviderHandler: Handling request for model {model}")
if AISBF_DEBUG:
logger.info(f"CodexProviderHandler: Messages: {messages}")
else:
logger.info(f"CodexProviderHandler: Messages count: {len(messages)}")
# Apply rate limiting
await self.apply_rate_limit()
# Get valid API key (with OAuth2 refresh if needed)
api_key = await self._get_valid_api_key()
# Re-initialize client with fresh token if OAuth2 is enabled
if self._oauth2_enabled:
provider_config = config.providers.get(self.provider_id)
endpoint = provider_config.endpoint if provider_config else "https://api.openai.com/v1"
self.client = OpenAI(base_url=endpoint, api_key=api_key)
# Check if native caching is enabled for this provider
provider_config = config.providers.get(self.provider_id)
enable_native_caching = getattr(provider_config, 'enable_native_caching', False)
min_cacheable_tokens = getattr(provider_config, 'min_cacheable_tokens', 1024)
prompt_cache_key = getattr(provider_config, 'prompt_cache_key', None)
# Build request parameters
request_params = {
"model": model,
"messages": [],
"temperature": temperature,
"stream": stream
}
# Only add max_tokens if it's not None
if max_tokens is not None:
request_params["max_tokens"] = max_tokens
# Add prompt_cache_key if provided
if enable_native_caching and prompt_cache_key:
request_params["prompt_cache_key"] = prompt_cache_key
# Build messages with all fields
if enable_native_caching:
cumulative_tokens = 0
for i, msg in enumerate(messages):
message_tokens = count_messages_tokens([msg], model)
cumulative_tokens += message_tokens
message = {"role": msg["role"]}
if msg["role"] == "tool":
if "tool_call_id" in msg and msg["tool_call_id"] is not None:
message["tool_call_id"] = msg["tool_call_id"]
else:
logger.warning(f"Skipping tool message without tool_call_id: {msg}")
continue
if "content" in msg and msg["content"] is not None:
message["content"] = msg["content"]
if "tool_calls" in msg and msg["tool_calls"] is not None:
message["tool_calls"] = msg["tool_calls"]
if "name" in msg and msg["name"] is not None:
message["name"] = msg["name"]
if (msg["role"] == "system" or
(i < len(messages) - 2 and cumulative_tokens >= min_cacheable_tokens)):
message["cache_control"] = {"type": "ephemeral"}
request_params["messages"].append(message)
else:
for msg in messages:
message = {"role": msg["role"]}
if msg["role"] == "tool":
if "tool_call_id" in msg and msg["tool_call_id"] is not None:
message["tool_call_id"] = msg["tool_call_id"]
else:
logger.warning(f"Skipping tool message without tool_call_id: {msg}")
continue
if "content" in msg and msg["content"] is not None:
message["content"] = msg["content"]
if "tool_calls" in msg and msg["tool_calls"] is not None:
message["tool_calls"] = msg["tool_calls"]
if "name" in msg and msg["name"] is not None:
message["name"] = msg["name"]
request_params["messages"].append(message)
if tools is not None:
request_params["tools"] = tools
if tool_choice is not None:
request_params["tool_choice"] = tool_choice
response = self.client.chat.completions.create(**request_params)
logger.info(f"CodexProviderHandler: Response received")
self.record_success()
if AISBF_DEBUG:
logger.info(f"=== RAW CODEX RESPONSE ===")
logger.info(f"Raw response type: {type(response)}")
logger.info(f"Raw response: {response}")
logger.info(f"=== END RAW CODEX RESPONSE ===")
return response
except Exception as e:
logger = logging.getLogger(__name__)
logger.error(f"CodexProviderHandler: Error: {str(e)}", exc_info=True)
self.record_failure()
raise e
async def get_models(self) -> List[Model]:
try:
logger = logging.getLogger(__name__)
logger.info("CodexProviderHandler: Getting models list")
# Apply rate limiting
await self.apply_rate_limit()
# Get valid API key for models list
api_key = await self._get_valid_api_key()
provider_config = config.providers.get(self.provider_id)
endpoint = provider_config.endpoint if provider_config else "https://api.openai.com/v1"
# Create temporary client with fresh token
temp_client = OpenAI(base_url=endpoint, api_key=api_key)
models = temp_client.models.list()
logger.info(f"CodexProviderHandler: Models received")
result = []
for model in models:
context_size = None
if hasattr(model, 'context_window') and model.context_window:
context_size = model.context_window
elif hasattr(model, 'context_length') and model.context_length:
context_size = model.context_length
elif hasattr(model, 'max_context_length') and model.max_context_length:
context_size = model.max_context_length
pricing = None
if hasattr(model, 'pricing') and model.pricing:
pricing = model.pricing
elif hasattr(model, 'top_provider') and model.top_provider:
top_provider = model.top_provider
if hasattr(top_provider, 'dict'):
top_provider = top_provider.dict()
if isinstance(top_provider, dict):
tp_pricing = top_provider.get('pricing')
if tp_pricing:
pricing = tp_pricing
result.append(Model(
id=model.id,
name=model.id,
provider_id=self.provider_id,
context_size=context_size,
context_length=context_size,
pricing=pricing
))
return result
except Exception as e:
logger = logging.getLogger(__name__)
logger.error(f"CodexProviderHandler: Error getting models: {str(e)}", exc_info=True)
raise e
# Codex OAuth 2.1 Implementation Guide
This document explains how the Codex CLI performs OAuth 2.1 authentication, how tokens are managed, and how they are used to make API requests. It includes Python examples so you can reimplement the flow in your own code.
---
## Table of Contents
1. [Overview](#overview)
2. [Authentication Flows](#authentication-flows)
3. [Browser-Based Login (Authorization Code + PKCE)](#browser-based-login-authorization-code--pkce)
4. [Device Code Login](#device-code-login)
5. [Token Refresh](#token-refresh)
6. [Token Exchange for API Key](#token-exchange-for-api-key)
7. [How Tokens Are Used for API Requests](#how-tokens-are-used-for-api-requests)
8. [Credential Storage](#credential-storage)
9. [Complete Python Implementation](#complete-python-implementation)
10. [Key Constants and Endpoints](#key-constants-and-endpoints)
---
## Overview
Codex uses OAuth 2.1 with the following characteristics:
- **Public client**: No client secret is used (CLI apps cannot securely store secrets)
- **PKCE (Proof Key for Code Exchange)**: Required for the authorization code flow to prevent authorization code interception attacks
- **Two login methods**:
1. **Browser-based login**: Standard OAuth 2.1 Authorization Code flow with PKCE
2. **Device Code login**: For headless/remote environments where a browser is not available
- **Token types**: The OAuth flow returns three tokens:
- `id_token`: JWT containing user identity claims
- `access_token`: JWT used for API authentication (Bearer token)
- `refresh_token`: Long-lived token used to obtain new access tokens
- **Token exchange**: The `id_token` can be exchanged for an OpenAI API key via a separate endpoint
### Key Endpoints
| Endpoint | URL |
|----------|-----|
| Authorization | `https://auth.openai.com/oauth/authorize` |
| Token | `https://auth.openai.com/oauth/token` |
| Device Code User Code | `https://auth.openai.com/api/accounts/deviceauth/usercode` |
| Device Code Token Poll | `https://auth.openai.com/api/accounts/deviceauth/token` |
| Token Refresh | `https://auth.openai.com/oauth/token` |
### Client ID
```
CLIENT_ID = "app_EMoamEEZ73f0CkXaXp7hrann"
```
---
## Authentication Flows
### 1. Browser-Based Login (Authorization Code + PKCE)
This is the default flow when running `codex login` in a terminal with browser access.
#### Step 1: Generate PKCE Codes
PKCE requires two values:
- `code_verifier`: A cryptographically random string (43-128 characters)
- `code_challenge`: `BASE64URL(SHA256(code_verifier))`
```python
import secrets
import hashlib
import base64
def generate_pkce():
# Generate a random 64-byte verifier
code_verifier = base64.urlsafe_b64encode(secrets.token_bytes(64)).rstrip(b'=').decode()
# Create the challenge: BASE64URL(SHA256(verifier))
code_challenge = base64.urlsafe_b64encode(
hashlib.sha256(code_verifier.encode()).digest()
).rstrip(b'=').decode()
return code_verifier, code_challenge
```
#### Step 2: Generate State Parameter
The `state` parameter prevents CSRF attacks. It should be a cryptographically random value.
```python
import secrets
import base64
def generate_state():
return base64.urlsafe_b64encode(secrets.token_bytes(32)).rstrip(b'=').decode()
```
#### Step 3: Build Authorization URL
```python
import urllib.parse
def build_authorization_url(
issuer: str,
client_id: str,
redirect_uri: str,
code_challenge: str,
state: str,
) -> str:
params = {
"response_type": "code",
"client_id": client_id,
"redirect_uri": redirect_uri,
"scope": "openid profile email offline_access api.connectors.read api.connectors.invoke",
"code_challenge": code_challenge,
"code_challenge_method": "S256",
"id_token_add_organizations": "true",
"codex_cli_simplified_flow": "true",
"state": state,
"originator": "codex_cli",
}
query_string = urllib.parse.urlencode(params)
return f"{issuer}/oauth/authorize?{query_string}"
```
#### Step 4: Start Local Callback Server
Codex starts a local HTTP server on `localhost:1455` to receive the OAuth callback. The redirect URI is `http://localhost:{port}/auth/callback`.
```python
import http.server
import threading
from urllib.parse import urlparse, parse_qs
class OAuthCallbackHandler(http.server.BaseHTTPRequestHandler):
def do_GET(self):
parsed = urlparse(self.path)
params = parse_qs(parsed.query)
if parsed.path == "/auth/callback":
# Store the callback data
self.server.auth_code = params.get("code", [None])[0]
self.server.state = params.get("state", [None])[0]
self.server.error = params.get("error", [None])[0]
# Send response
self.send_response(302)
self.send_header("Location", f"http://localhost:{self.server.server_port}/success")
self.end_headers()
elif parsed.path == "/success":
self.send_response(200)
self.send_header("Content-Type", "text/html")
self.end_headers()
self.wfile.write(b"<h1>Login successful! You can close this window.</h1>")
elif parsed.path == "/cancel":
self.send_response(200)
self.wfile.write(b"Login cancelled")
def log_message(self, format, *args):
pass # Suppress logging
def start_callback_server(port=1455):
server = http.server.HTTPServer(("127.0.0.1", port), OAuthCallbackHandler)
server.auth_code = None
server.state = None
server.error = None
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
return server
```
#### Step 5: Open Browser and Wait for Callback
```python
import webbrowser
def run_browser_login(issuer, client_id, port=1455):
code_verifier, code_challenge = generate_pkce()
state = generate_state()
redirect_uri = f"http://localhost:{port}/auth/callback"
auth_url = build_authorization_url(
issuer, client_id, redirect_uri, code_challenge, state
)
# Start callback server
server = start_callback_server(port)
# Open browser
webbrowser.open(auth_url)
# Wait for callback (with timeout)
import time
timeout = 300 # 5 minutes
start = time.time()
while server.auth_code is None and server.error is None:
if time.time() - start > timeout:
server.shutdown()
raise TimeoutError("Login timed out")
time.sleep(0.5)
if server.error:
server.shutdown()
raise Exception(f"OAuth error: {server.error}")
server.shutdown()
return server.auth_code, state, code_verifier, redirect_uri
```
#### Step 6: Exchange Authorization Code for Tokens
```python
import requests
import urllib.parse
def exchange_code_for_tokens(
issuer: str,
client_id: str,
redirect_uri: str,
code_verifier: str,
code: str,
) -> dict:
token_url = f"{issuer}/oauth/token"
body = (
f"grant_type=authorization_code"
f"&code={urllib.parse.quote(code)}"
f"&redirect_uri={urllib.parse.quote(redirect_uri)}"
f"&client_id={urllib.parse.quote(client_id)}"
f"&code_verifier={urllib.parse.quote(code_verifier)}"
)
response = requests.post(
token_url,
headers={"Content-Type": "application/x-www-form-urlencoded"},
data=body,
)
response.raise_for_status()
return response.json()
# Returns: {"id_token": "...", "access_token": "...", "refresh_token": "..."}
```
---
### 2. Device Code Login
Used for headless environments where a browser is not available.
#### Step 1: Request Device Code
```python
import requests
def request_device_code(issuer: str, client_id: str) -> dict:
url = f"{issuer}/api/accounts/deviceauth/usercode"
response = requests.post(
url,
headers={"Content-Type": "application/json"},
json={"client_id": client_id},
)
response.raise_for_status()
return response.json()
# Returns: {"device_auth_id": "...", "user_code": "...", "interval": "5"}
```
#### Step 2: Display Instructions to User
```python
def display_device_instructions(issuer: str, device_code_resp: dict):
verification_url = f"{issuer}/codex/device"
user_code = device_code_resp["user_code"]
print(f"\nFollow these steps to sign in with ChatGPT using device code authorization:\n")
print(f"1. Open this link in your browser and sign in to your account")
print(f" {verification_url}")
print(f"\n2. Enter this one-time code (expires in 15 minutes)")
print(f" {user_code}")
print(f"\nDevice codes are a common phishing target. Never share this code.\n")
```
#### Step 3: Poll for Token
```python
import time
from datetime import datetime, timedelta
def poll_for_token(
issuer: str,
device_auth_id: str,
user_code: str,
interval: int = 5,
) -> dict:
url = f"{issuer}/api/accounts/deviceauth/token"
max_wait = timedelta(minutes=15)
start = datetime.now()
while True:
response = requests.post(
url,
headers={"Content-Type": "application/json"},
json={
"device_auth_id": device_auth_id,
"user_code": user_code,
},
)
if response.status_code == 200:
return response.json()
# Returns: {"authorization_code": "...", "code_challenge": "...", "code_verifier": "..."}
if response.status_code in (403, 404):
if datetime.now() - start > max_wait:
raise TimeoutError("Device auth timed out after 15 minutes")
time.sleep(interval)
continue
raise Exception(f"Device auth failed with status {response.status_code}")
```
#### Step 4: Exchange for Tokens
The device code flow returns PKCE codes directly, so you use them to exchange for tokens:
```python
def complete_device_code_login(issuer: str, client_id: str, token_resp: dict) -> dict:
redirect_uri = f"{issuer}/deviceauth/callback"
return exchange_code_for_tokens(
issuer=issuer,
client_id=client_id,
redirect_uri=redirect_uri,
code_verifier=token_resp["code_verifier"],
code=token_resp["authorization_code"],
)
```
---
## Token Refresh
Access tokens expire. Use the refresh token to obtain new tokens without re-authenticating.
```python
def refresh_tokens(issuer: str, client_id: str, refresh_token: str) -> dict:
token_url = f"{issuer}/oauth/token"
response = requests.post(
token_url,
headers={"Content-Type": "application/json"},
json={
"client_id": client_id,
"grant_type": "refresh_token",
"refresh_token": refresh_token,
},
)
if response.status_code == 401:
# Refresh token expired or invalid - user must re-authenticate
body = response.json()
error_code = body.get("error", {})
if isinstance(error_code, dict):
error_code = error_code.get("code", "unknown")
if error_code == "refresh_token_expired":
raise Exception("Your refresh token has expired. Please log out and sign in again.")
elif error_code == "refresh_token_reused":
raise Exception("Your refresh token was already used. Please log out and sign in again.")
elif error_code == "refresh_token_invalidated":
raise Exception("Your refresh token was revoked. Please log out and sign in again.")
else:
raise Exception("Your access token could not be refreshed. Please log out and sign in again.")
response.raise_for_status()
return response.json()
# Returns: {"id_token": "...", "access_token": "...", "refresh_token": "..."}
```
**Important**: The refresh token may change with each refresh (token rotation). Always save the new refresh token if one is returned.
---
## Token Exchange for API Key
Codex can exchange the `id_token` for an OpenAI API key:
```python
def obtain_api_key(issuer: str, client_id: str, id_token: str) -> str:
token_url = f"{issuer}/oauth/token"
body = (
f"grant_type={urllib.parse.quote('urn:ietf:params:oauth:grant-type:token-exchange')}"
f"&client_id={urllib.parse.quote(client_id)}"
f"&requested_token={urllib.parse.quote('openai-api-key')}"
f"&subject_token={urllib.parse.quote(id_token)}"
f"&subject_token_type={urllib.parse.quote('urn:ietf:params:oauth:token-type:id_token')}"
)
response = requests.post(
token_url,
headers={"Content-Type": "application/x-www-form-urlencoded"},
data=body,
)
response.raise_for_status()
return response.json()["access_token"]
```
---
## How Tokens Are Used for API Requests
### Bearer Token Authentication
The `access_token` is used as a Bearer token in the `Authorization` header:
```python
import requests
def make_api_request(access_token: str, url: str, method: str = "GET", **kwargs):
headers = kwargs.pop("headers", {})
headers["Authorization"] = f"Bearer {access_token}"
response = requests.request(method, url, headers=headers, **kwargs)
if response.status_code == 401:
# Token may have expired - refresh and retry
raise TokenExpiredError("Access token expired")
return response
```
### Token Data Structure
Codex stores the following token data:
```python
from dataclasses import dataclass
from typing import Optional
from datetime import datetime
@dataclass
class IdTokenInfo:
email: Optional[str]
chatgpt_plan_type: Optional[str] # "free", "plus", "pro", "business", "enterprise", "edu"
chatgpt_user_id: Optional[str]
chatgpt_account_id: Optional[str] # Workspace/organization ID
raw_jwt: str
@dataclass
class TokenData:
id_token: IdTokenInfo
access_token: str
refresh_token: str
account_id: Optional[str]
```
### Parsing JWT Claims
```python
import base64
import json
def parse_jwt_claims(jwt_token: str) -> dict:
"""Parse the payload from a JWT token."""
parts = jwt_token.split(".")
if len(parts) != 3:
raise ValueError("Invalid JWT format")
# Add padding if needed
payload = parts[1]
padding = 4 - len(payload) % 4
if padding != 4:
payload += "=" * padding
decoded = base64.urlsafe_b64decode(payload)
return json.loads(decoded)
def parse_chatgpt_jwt_claims(jwt_token: str) -> IdTokenInfo:
claims = parse_jwt_claims(jwt_token)
# Extract email from top-level or profile claim
email = claims.get("email")
profile = claims.get("https://api.openai.com/profile", {})
if not email and profile:
email = profile.get("email")
# Extract auth claims
auth = claims.get("https://api.openai.com/auth", {})
return IdTokenInfo(
email=email,
chatgpt_plan_type=auth.get("chatgpt_plan_type"),
chatgpt_user_id=auth.get("chatgpt_user_id") or auth.get("user_id"),
chatgpt_account_id=auth.get("chatgpt_account_id"),
raw_jwt=jwt_token,
)
```
---
## Credential Storage
Codex stores credentials in `~/.codex/auth.json`:
```json
{
"auth_mode": "chatgpt",
"tokens": {
"id_token": "<JWT>",
"access_token": "<JWT>",
"refresh_token": "<string>",
"account_id": "<string>"
},
"last_refresh": "2024-01-01T00:00:00Z"
}
```
Alternatively, credentials can be stored in the system keyring (macOS Keychain, Windows Credential Manager, etc.) when configured.
---
## Complete Python Implementation
Here's a complete, reusable Python class that implements the full OAuth flow:
```python
"""
Codex OAuth 2.1 Client Implementation
This module provides a complete implementation of the OAuth 2.1 flows used by Codex CLI,
including browser-based login with PKCE, device code login, token refresh, and API key exchange.
"""
import base64
import hashlib
import json
import secrets
import time
import urllib.parse
import webbrowser
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
import requests
# Constants
CLIENT_ID = "app_EMoamEEZ73f0CkXaXp7hrann"
DEFAULT_ISSUER = "https://auth.openai.com"
DEFAULT_PORT = 1455
SCOPES = "openid profile email offline_access api.connectors.read api.connectors.invoke"
@dataclass
class IdTokenInfo:
"""Parsed ID token claims."""
email: Optional[str] = None
chatgpt_plan_type: Optional[str] = None
chatgpt_user_id: Optional[str] = None
chatgpt_account_id: Optional[str] = None
raw_jwt: str = ""
@dataclass
class TokenData:
"""Complete token data stored after authentication."""
id_token: IdTokenInfo
access_token: str
refresh_token: str
account_id: Optional[str] = None
@dataclass
class AuthData:
"""Full authentication data stored to disk."""
auth_mode: str = "chatgpt"
tokens: Optional[TokenData] = None
openai_api_key: Optional[str] = None
last_refresh: Optional[str] = None
class CodexOAuthClient:
"""OAuth 2.1 client for Codex authentication."""
def __init__(
self,
issuer: str = DEFAULT_ISSUER,
client_id: str = CLIENT_ID,
codex_home: Optional[Path] = None,
):
self.issuer = issuer.rstrip("/")
self.client_id = client_id
self.codex_home = codex_home or Path.home() / ".codex"
self.session = requests.Session()
# -------------------------------------------------------------------------
# PKCE Helpers
# -------------------------------------------------------------------------
@staticmethod
def generate_pkce() -> tuple[str, str]:
"""Generate PKCE code verifier and challenge."""
code_verifier = base64.urlsafe_b64encode(secrets.token_bytes(64)).rstrip(b"=").decode()
code_challenge = base64.urlsafe_b64encode(
hashlib.sha256(code_verifier.encode()).digest()
).rstrip(b"=").decode()
return code_verifier, code_challenge
@staticmethod
def generate_state() -> str:
"""Generate CSRF state parameter."""
return base64.urlsafe_b64encode(secrets.token_bytes(32)).rstrip(b"=").decode()
# -------------------------------------------------------------------------
# Authorization URL
# -------------------------------------------------------------------------
def build_authorization_url(
self,
redirect_uri: str,
code_challenge: str,
state: str,
workspace_id: Optional[str] = None,
) -> str:
"""Build the OAuth authorization URL."""
params = {
"response_type": "code",
"client_id": self.client_id,
"redirect_uri": redirect_uri,
"scope": SCOPES,
"code_challenge": code_challenge,
"code_challenge_method": "S256",
"id_token_add_organizations": "true",
"codex_cli_simplified_flow": "true",
"state": state,
"originator": "codex_cli",
}
if workspace_id:
params["allowed_workspace_id"] = workspace_id
query = urllib.parse.urlencode(params)
return f"{self.issuer}/oauth/authorize?{query}"
# -------------------------------------------------------------------------
# Token Exchange
# -------------------------------------------------------------------------
def exchange_code_for_tokens(
self,
code: str,
redirect_uri: str,
code_verifier: str,
) -> dict:
"""Exchange authorization code for tokens."""
token_url = f"{self.issuer}/oauth/token"
body = (
f"grant_type=authorization_code"
f"&code={urllib.parse.quote(code)}"
f"&redirect_uri={urllib.parse.quote(redirect_uri)}"
f"&client_id={urllib.parse.quote(self.client_id)}"
f"&code_verifier={urllib.parse.quote(code_verifier)}"
)
response = self.session.post(
token_url,
headers={"Content-Type": "application/x-www-form-urlencoded"},
data=body,
)
response.raise_for_status()
return response.json()
# -------------------------------------------------------------------------
# Token Refresh
# -------------------------------------------------------------------------
def refresh_tokens(self, refresh_token: str) -> dict:
"""Refresh tokens using the refresh token."""
token_url = f"{self.issuer}/oauth/token"
response = self.session.post(
token_url,
headers={"Content-Type": "application/json"},
json={
"client_id": self.client_id,
"grant_type": "refresh_token",
"refresh_token": refresh_token,
},
)
if response.status_code == 401:
body = response.json()
error = body.get("error", {})
error_code = error.get("code", "unknown") if isinstance(error, dict) else str(error)
messages = {
"refresh_token_expired": "Your refresh token has expired. Please log out and sign in again.",
"refresh_token_reused": "Your refresh token was already used. Please log out and sign in again.",
"refresh_token_invalidated": "Your refresh token was revoked. Please log out and sign in again.",
}
raise Exception(messages.get(error_code, "Your access token could not be refreshed. Please log out and sign in again."))
response.raise_for_status()
return response.json()
# -------------------------------------------------------------------------
# API Key Exchange
# -------------------------------------------------------------------------
def obtain_api_key(self, id_token: str) -> str:
"""Exchange ID token for an OpenAI API key."""
token_url = f"{self.issuer}/oauth/token"
body = (
f"grant_type={urllib.parse.quote('urn:ietf:params:oauth:grant-type:token-exchange')}"
f"&client_id={urllib.parse.quote(self.client_id)}"
f"&requested_token={urllib.parse.quote('openai-api-key')}"
f"&subject_token={urllib.parse.quote(id_token)}"
f"&subject_token_type={urllib.parse.quote('urn:ietf:params:oauth:token-type:id_token')}"
)
response = self.session.post(
token_url,
headers={"Content-Type": "application/x-www-form-urlencoded"},
data=body,
)
response.raise_for_status()
return response.json()["access_token"]
# -------------------------------------------------------------------------
# Device Code Flow
# -------------------------------------------------------------------------
def request_device_code(self) -> dict:
"""Request a device code for headless login."""
url = f"{self.issuer}/api/accounts/deviceauth/usercode"
response = self.session.post(
url,
headers={"Content-Type": "application/json"},
json={"client_id": self.client_id},
)
response.raise_for_status()
return response.json()
def poll_device_code_token(
self,
device_auth_id: str,
user_code: str,
interval: int = 5,
) -> dict:
"""Poll for device code token."""
url = f"{self.issuer}/api/accounts/deviceauth/token"
max_wait = timedelta(minutes=15)
start = datetime.now()
while True:
response = self.session.post(
url,
headers={"Content-Type": "application/json"},
json={
"device_auth_id": device_auth_id,
"user_code": user_code,
},
)
if response.status_code == 200:
return response.json()
if response.status_code in (403, 404):
if datetime.now() - start > max_wait:
raise TimeoutError("Device auth timed out after 15 minutes")
time.sleep(interval)
continue
raise Exception(f"Device auth failed with status {response.status_code}")
def run_device_code_login(self) -> TokenData:
"""Complete device code login flow."""
# Step 1: Request device code
device_resp = self.request_device_code()
device_auth_id = device_resp["device_auth_id"]
user_code = device_resp["user_code"]
interval = int(device_resp.get("interval", 5))
# Step 2: Display instructions
verification_url = f"{self.issuer}/codex/device"
print(f"\nFollow these steps to sign in:\n")
print(f"1. Open this link and sign in:")
print(f" {verification_url}")
print(f"\n2. Enter this code (expires in 15 minutes):")
print(f" {user_code}")
print(f"\nNever share this code.\n")
# Step 3: Poll for token
token_resp = self.poll_device_code_token(device_auth_id, user_code, interval)
# Step 4: Exchange for tokens
redirect_uri = f"{self.issuer}/deviceauth/callback"
tokens = self.exchange_code_for_tokens(
code=token_resp["authorization_code"],
redirect_uri=redirect_uri,
code_verifier=token_resp["code_verifier"],
)
return self._build_token_data(tokens)
# -------------------------------------------------------------------------
# JWT Parsing
# -------------------------------------------------------------------------
@staticmethod
def parse_jwt_claims(jwt_token: str) -> dict:
"""Parse JWT payload."""
parts = jwt_token.split(".")
if len(parts) != 3:
raise ValueError("Invalid JWT format")
payload = parts[1]
padding = 4 - len(payload) % 4
if padding != 4:
payload += "=" * padding
decoded = base64.urlsafe_b64decode(payload)
return json.loads(decoded)
def parse_id_token(self, jwt_token: str) -> IdTokenInfo:
"""Parse ID token into structured claims."""
claims = self.parse_jwt_claims(jwt_token)
email = claims.get("email")
profile = claims.get("https://api.openai.com/profile", {})
if not email and profile:
email = profile.get("email")
auth = claims.get("https://api.openai.com/auth", {})
return IdTokenInfo(
email=email,
chatgpt_plan_type=auth.get("chatgpt_plan_type"),
chatgpt_user_id=auth.get("chatgpt_user_id") or auth.get("user_id"),
chatgpt_account_id=auth.get("chatgpt_account_id"),
raw_jwt=jwt_token,
)
# -------------------------------------------------------------------------
# Token Data Building
# -------------------------------------------------------------------------
def _build_token_data(self, tokens: dict) -> TokenData:
"""Build TokenData from token response."""
id_token_info = self.parse_id_token(tokens["id_token"])
account_id = id_token_info.chatgpt_account_id
return TokenData(
id_token=id_token_info,
access_token=tokens["access_token"],
refresh_token=tokens["refresh_token"],
account_id=account_id,
)
# -------------------------------------------------------------------------
# Credential Storage
# -------------------------------------------------------------------------
def save_auth(self, auth_data: AuthData) -> None:
"""Save authentication data to auth.json."""
self.codex_home.mkdir(parents=True, exist_ok=True)
auth_path = self.codex_home / "auth.json"
with open(auth_path, "w") as f:
json.dump(
{
"auth_mode": auth_data.auth_mode,
"openai_api_key": auth_data.openai_api_key,
"tokens": {
"id_token": auth_data.tokens.id_token.raw_jwt if auth_data.tokens else None,
"access_token": auth_data.tokens.access_token if auth_data.tokens else None,
"refresh_token": auth_data.tokens.refresh_token if auth_data.tokens else None,
"account_id": auth_data.tokens.account_id if auth_data.tokens else None,
} if auth_data.tokens else None,
"last_refresh": auth_data.last_refresh,
},
f,
indent=2,
)
# Set restrictive permissions on Unix
import os
if os.name == "posix":
os.chmod(auth_path, 0o600)
def load_auth(self) -> Optional[AuthData]:
"""Load authentication data from auth.json."""
auth_path = self.codex_home / "auth.json"
if not auth_path.exists():
return None
with open(auth_path) as f:
data = json.load(f)
tokens = None
if data.get("tokens"):
t = data["tokens"]
tokens = TokenData(
id_token=self.parse_id_token(t["id_token"]),
access_token=t["access_token"],
refresh_token=t["refresh_token"],
account_id=t.get("account_id"),
)
return AuthData(
auth_mode=data.get("auth_mode", "chatgpt"),
tokens=tokens,
openai_api_key=data.get("openai_api_key"),
last_refresh=data.get("last_refresh"),
)
# -------------------------------------------------------------------------
# Making Authenticated API Requests
# -------------------------------------------------------------------------
def get_bearer_token(self) -> str:
"""Get the current access token, refreshing if needed."""
auth = self.load_auth()
if not auth or not auth.tokens:
raise Exception("Not authenticated. Please run login first.")
# Check if token is expired (with 5 minute buffer)
claims = self.parse_jwt_claims(auth.tokens.access_token)
exp = claims.get("exp", 0)
if exp < time.time() + 300:
# Token expired or about to expire - refresh
new_tokens = self.refresh_tokens(auth.tokens.refresh_token)
token_data = self._build_token_data(new_tokens)
auth.tokens = token_data
auth.last_refresh = datetime.utcnow().isoformat() + "Z"
self.save_auth(auth)
return token_data.access_token
return auth.tokens.access_token
def make_api_request(
self,
method: str,
url: str,
max_retries: int = 1,
**kwargs,
) -> requests.Response:
"""Make an authenticated API request with automatic token refresh."""
headers = kwargs.pop("headers", {})
for attempt in range(max_retries + 1):
token = self.get_bearer_token()
headers["Authorization"] = f"Bearer {token}"
response = self.session.request(method, url, headers=headers, **kwargs)
if response.status_code == 401 and attempt < max_retries:
# Force refresh and retry
auth = self.load_auth()
if auth and auth.tokens:
new_tokens = self.refresh_tokens(auth.tokens.refresh_token)
token_data = self._build_token_data(new_tokens)
auth.tokens = token_data
auth.last_refresh = datetime.utcnow().isoformat() + "Z"
self.save_auth(auth)
continue
return response
response.raise_for_status()
return response
# -------------------------------------------------------------------------
# Usage Examples
# -------------------------------------------------------------------------
if __name__ == "__main__":
import sys
client = CodexOAuthClient()
if len(sys.argv) < 2:
print("Usage:")
print(" python oauth_client.py login # Browser login")
print(" python oauth_client.py device-login # Device code login")
print(" python oauth_client.py status # Check auth status")
print(" python oauth_client.py refresh # Refresh tokens")
print(" python oauth_client.py api-key # Get API key")
print(" python oauth_client.py request <URL> # Make API request")
sys.exit(1)
command = sys.argv[1]
if command == "login":
print("Browser login is best implemented with a local callback server.")
print("Use 'device-login' for headless environments.")
elif command == "device-login":
token_data = client.run_device_code_login()
# Optionally obtain an API key
try:
api_key = client.obtain_api_key(token_data.id_token.raw_jwt)
except Exception:
api_key = None
auth_data = AuthData(
auth_mode="chatgpt",
tokens=token_data,
openai_api_key=api_key,
last_refresh=datetime.utcnow().isoformat() + "Z",
)
client.save_auth(auth_data)
print("Successfully logged in!")
elif command == "status":
auth = client.load_auth()
if auth and auth.tokens:
print(f"Logged in as: {auth.tokens.id_token.email}")
print(f"Plan: {auth.tokens.id_token.chatgpt_plan_type}")
print(f"Account: {auth.tokens.id_token.chatgpt_account_id}")
if auth.openai_api_key:
key = auth.openai_api_key
masked = f"{key[:8]}***{key[-5:]}" if len(key) > 13 else "***"
print(f"API Key: {masked}")
else:
print("Not logged in.")
sys.exit(1)
elif command == "refresh":
auth = client.load_auth()
if not auth or not auth.tokens:
print("Not logged in.")
sys.exit(1)
new_tokens = client.refresh_tokens(auth.tokens.refresh_token)
token_data = client._build_token_data(new_tokens)
auth.tokens = token_data
auth.last_refresh = datetime.utcnow().isoformat() + "Z"
client.save_auth(auth)
print("Tokens refreshed successfully!")
elif command == "api-key":
auth = client.load_auth()
if not auth or not auth.tokens:
print("Not logged in.")
sys.exit(1)
api_key = client.obtain_api_key(auth.tokens.id_token.raw_jwt)
print(f"API Key: {api_key}")
elif command == "request":
if len(sys.argv) < 3:
print("Usage: python oauth_client.py request <URL>")
sys.exit(1)
url = sys.argv[2]
response = client.make_api_request("GET", url)
print(response.text)
```
---
## Key Constants and Endpoints
| Constant | Value |
|----------|-------|
| Client ID | `app_EMoamEEZ73f0CkXaXp7hrann` |
| Default Issuer | `https://auth.openai.com` |
| Default Callback Port | `1455` |
| Scopes | `openid profile email offline_access api.connectors.read api.connectors.invoke` |
| Originator | `codex_cli` |
| Refresh Token URL | `https://auth.openai.com/oauth/token` |
| Token Rotation | Yes - refresh token may change on each refresh |
### Auth JSON File Location
- **Default**: `~/.codex/auth.json`
- **Permissions**: `0600` (owner read/write only on Unix)
### Error Codes
| Error Code | Meaning |
|------------|---------|
| `refresh_token_expired` | Refresh token has expired |
| `refresh_token_reused` | Refresh token was already used (single-use) |
| `refresh_token_invalidated` | Refresh token was revoked |
| `access_denied` + `missing_codex_entitlement` | User doesn't have Codex access |
---
## Summary of the OAuth 2.1 Flow
1. **PKCE Generation**: Generate `code_verifier` and `code_challenge` (SHA256 hash, base64url encoded)
2. **Authorization Request**: Redirect user to `/oauth/authorize` with PKCE challenge
3. **User Authentication**: User signs in at the OpenAI auth provider
4. **Callback**: Auth server redirects to `localhost:1455/auth/callback` with `code` and `state`
5. **Token Exchange**: Exchange `code` + `code_verifier` for `id_token`, `access_token`, `refresh_token`
6. **API Key Exchange** (optional): Exchange `id_token` for an OpenAI API key
7. **Storage**: Save tokens to `~/.codex/auth.json`
8. **Usage**: Use `access_token` as Bearer token for API requests
9. **Refresh**: Use `refresh_token` to get new tokens when `access_token` expires
...@@ -322,6 +322,21 @@ ...@@ -322,6 +322,21 @@
"capabilities": ["t2t", "vision", "function_calling"] "capabilities": ["t2t", "vision", "function_calling"]
} }
] ]
},
"codex": {
"id": "codex",
"name": "Codex (OpenAI OAuth2)",
"endpoint": "https://api.openai.com/v1",
"type": "codex",
"api_key_required": false,
"nsfw": false,
"privacy": false,
"rate_limit": 0,
"codex_config": {
"_comment": "Uses OAuth2 Device Authorization Grant (Codex CLI compatible)",
"credentials_file": "~/.aisbf/codex_credentials.json",
"issuer": "https://auth.openai.com"
}
} }
} }
} }
...@@ -6568,5 +6568,198 @@ async def dashboard_kilo_auth_logout(request: Request): ...@@ -6568,5 +6568,198 @@ async def dashboard_kilo_auth_logout(request: Request):
) )
# Codex OAuth2 authentication endpoints
@app.post("/dashboard/codex/auth/start")
async def dashboard_codex_auth_start(request: Request):
"""Start Codex OAuth2 Device Authorization Grant flow"""
auth_check = require_dashboard_auth(request)
if auth_check:
return auth_check
try:
data = await request.json()
provider_key = data.get('provider_key')
credentials_file = data.get('credentials_file', '~/.aisbf/codex_credentials.json')
issuer = data.get('issuer', 'https://auth.openai.com')
if not provider_key:
return JSONResponse(
status_code=400,
content={"success": False, "error": "Provider key is required"}
)
# Import CodexOAuth2
from aisbf.auth.codex import CodexOAuth2
# Create auth instance
auth = CodexOAuth2(credentials_file=credentials_file, issuer=issuer)
# Initiate device authorization (async method)
device_auth = await auth.authenticate_with_device_flow()
if not device_auth:
return JSONResponse(
status_code=500,
content={"success": False, "error": "Failed to initiate device authorization"}
)
# Store device code in session for polling
request.session['codex_device_code'] = device_auth.get('user_code')
request.session['codex_provider'] = provider_key
request.session['codex_credentials_file'] = credentials_file
request.session['codex_issuer'] = issuer
return JSONResponse({
"success": True,
"user_code": device_auth.get('user_code'),
"verification_uri": device_auth.get('verification_uri', f'{issuer}/codex/device'),
"expires_in": 900, # 15 minutes
"interval": 5,
"message": f"Please visit {device_auth.get('verification_uri', f'{issuer}/codex/device')} and enter code: {device_auth.get('user_code')}"
})
except Exception as e:
logger.error(f"Error starting Codex auth: {e}")
return JSONResponse(
status_code=500,
content={"success": False, "error": str(e)}
)
@app.post("/dashboard/codex/auth/poll")
async def dashboard_codex_auth_poll(request: Request):
"""Poll Codex OAuth2 device authorization status"""
auth_check = require_dashboard_auth(request)
if auth_check:
return auth_check
try:
# Check if authentication was completed
credentials_file = request.session.get('codex_credentials_file', '~/.aisbf/codex_credentials.json')
issuer = request.session.get('codex_issuer', 'https://auth.openai.com')
# Import CodexOAuth2
from aisbf.auth.codex import CodexOAuth2
# Create auth instance
auth = CodexOAuth2(credentials_file=credentials_file, issuer=issuer)
# Check if authenticated
if auth.is_authenticated():
# Clear session
request.session.pop('codex_device_code', None)
request.session.pop('codex_provider', None)
request.session.pop('codex_credentials_file', None)
request.session.pop('codex_issuer', None)
return JSONResponse({
"success": True,
"status": "approved",
"message": "Authentication completed successfully"
})
else:
return JSONResponse({
"success": True,
"status": "pending",
"message": "Waiting for user authorization"
})
except Exception as e:
logger.error(f"Error polling Codex auth: {e}")
return JSONResponse(
status_code=500,
content={"success": False, "status": "error", "error": str(e)}
)
@app.post("/dashboard/codex/auth/status")
async def dashboard_codex_auth_status(request: Request):
"""Check Codex authentication status"""
auth_check = require_dashboard_auth(request)
if auth_check:
return auth_check
try:
data = await request.json()
provider_key = data.get('provider_key')
credentials_file = data.get('credentials_file', '~/.aisbf/codex_credentials.json')
if not provider_key:
return JSONResponse(
status_code=400,
content={"authenticated": False, "error": "Provider key is required"}
)
# Import CodexOAuth2
from aisbf.auth.codex import CodexOAuth2
# Create auth instance
auth = CodexOAuth2(credentials_file=credentials_file)
# Check if authenticated
if auth.is_authenticated():
# Try to get a valid token (will refresh if needed)
token = auth.get_valid_token()
if token:
# Get user email from ID token
email = auth.get_user_email()
return JSONResponse({
"authenticated": True,
"email": email
})
return JSONResponse({
"authenticated": False
})
except Exception as e:
logger.error(f"Error checking Codex auth status: {e}")
return JSONResponse(
status_code=500,
content={"authenticated": False, "error": str(e)}
)
@app.post("/dashboard/codex/auth/logout")
async def dashboard_codex_auth_logout(request: Request):
"""Logout from Codex OAuth2 (clear stored credentials)"""
auth_check = require_dashboard_auth(request)
if auth_check:
return auth_check
try:
data = await request.json()
provider_key = data.get('provider_key')
credentials_file = data.get('credentials_file', '~/.aisbf/codex_credentials.json')
if not provider_key:
return JSONResponse(
status_code=400,
content={"success": False, "error": "Provider key is required"}
)
# Import CodexOAuth2
from aisbf.auth.codex import CodexOAuth2
# Create auth instance
auth = CodexOAuth2(credentials_file=credentials_file)
# Logout (clear credentials)
auth.logout()
return JSONResponse({
"success": True,
"message": "Logged out successfully"
})
except Exception as e:
logger.error(f"Error logging out from Codex: {e}")
return JSONResponse(
status_code=500,
content={"success": False, "error": str(e)}
)
if __name__ == "__main__": if __name__ == "__main__":
main() main()
...@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" ...@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project] [project]
name = "aisbf" name = "aisbf"
version = "0.9.7" version = "0.9.8"
description = "AISBF - AI Service Broker Framework || AI Should Be Free - A modular proxy server for managing multiple AI provider integrations" description = "AISBF - AI Service Broker Framework || AI Should Be Free - A modular proxy server for managing multiple AI provider integrations"
readme = "README.md" readme = "README.md"
license = "GPL-3.0-or-later" license = "GPL-3.0-or-later"
...@@ -49,7 +49,7 @@ Documentation = "https://git.nexlab.net/nexlab/aisbf.git" ...@@ -49,7 +49,7 @@ Documentation = "https://git.nexlab.net/nexlab/aisbf.git"
[tool.setuptools] [tool.setuptools]
packages = ["aisbf", "aisbf.auth", "aisbf.providers", "aisbf.providers.kiro"] packages = ["aisbf", "aisbf.auth", "aisbf.providers", "aisbf.providers.kiro"]
# Note: Provider handler modules (base, google, openai, anthropic, claude, kilo, ollama) are in aisbf.providers package # Note: Provider handler modules (base, google, openai, anthropic, claude, kilo, ollama, codex) are in aisbf.providers package
py-modules = ["cli"] py-modules = ["cli"]
[tool.setuptools.package-data] [tool.setuptools.package-data]
......
...@@ -49,7 +49,7 @@ class InstallCommand(_install): ...@@ -49,7 +49,7 @@ class InstallCommand(_install):
setup( setup(
name="aisbf", name="aisbf",
version="0.9.7", version="0.9.8",
author="AISBF Contributors", author="AISBF Contributors",
author_email="stefy@nexlab.net", author_email="stefy@nexlab.net",
description="AISBF - AI Service Broker Framework || AI Should Be Free - A modular proxy server for managing multiple AI provider integrations", description="AISBF - AI Service Broker Framework || AI Should Be Free - A modular proxy server for managing multiple AI provider integrations",
...@@ -122,6 +122,7 @@ setup( ...@@ -122,6 +122,7 @@ setup(
'aisbf/providers/claude.py', 'aisbf/providers/claude.py',
'aisbf/providers/kilo.py', 'aisbf/providers/kilo.py',
'aisbf/providers/ollama.py', 'aisbf/providers/ollama.py',
'aisbf/providers/codex.py',
]), ]),
# aisbf.providers.kiro subpackage # aisbf.providers.kiro subpackage
('share/aisbf/aisbf/providers/kiro', [ ('share/aisbf/aisbf/providers/kiro', [
...@@ -139,6 +140,7 @@ setup( ...@@ -139,6 +140,7 @@ setup(
'aisbf/auth/kiro.py', 'aisbf/auth/kiro.py',
'aisbf/auth/claude.py', 'aisbf/auth/claude.py',
'aisbf/auth/kilo.py', 'aisbf/auth/kilo.py',
'aisbf/auth/codex.py',
]), ]),
# Install dashboard templates # Install dashboard templates
('share/aisbf/templates', [ ('share/aisbf/templates', [
......
...@@ -49,6 +49,7 @@ along with this program. If not, see <https://www.gnu.org/licenses/>. ...@@ -49,6 +49,7 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
<option value="kiro">Kiro (Amazon Q Developer)</option> <option value="kiro">Kiro (Amazon Q Developer)</option>
<option value="claude">Claude (OAuth2)</option> <option value="claude">Claude (OAuth2)</option>
<option value="kilocode">Kilocode (OAuth2)</option> <option value="kilocode">Kilocode (OAuth2)</option>
<option value="codex">Codex (OpenAI OAuth2)</option>
</select> </select>
<small style="color: #a0a0a0; display: block; margin-top: 5px;">Select the type of provider to configure appropriate settings</small> <small style="color: #a0a0a0; display: block; margin-top: 5px;">Select the type of provider to configure appropriate settings</small>
</div> </div>
...@@ -127,6 +128,7 @@ function renderProviderDetails(key) { ...@@ -127,6 +128,7 @@ function renderProviderDetails(key) {
const isKiroProvider = provider.type === 'kiro'; const isKiroProvider = provider.type === 'kiro';
const isClaudeProvider = provider.type === 'claude'; const isClaudeProvider = provider.type === 'claude';
const isKiloProvider = provider.type === 'kilocode'; const isKiloProvider = provider.type === 'kilocode';
const isCodexProvider = provider.type === 'codex';
// Initialize kiro_config if this is a kiro provider and doesn't have it // Initialize kiro_config if this is a kiro provider and doesn't have it
if (isKiroProvider && !provider.kiro_config) { if (isKiroProvider && !provider.kiro_config) {
...@@ -156,9 +158,18 @@ function renderProviderDetails(key) { ...@@ -156,9 +158,18 @@ function renderProviderDetails(key) {
}; };
} }
// Initialize codex_config if this is a codex provider and doesn't have it
if (isCodexProvider && !provider.codex_config) {
provider.codex_config = {
credentials_file: '~/.aisbf/codex_credentials.json',
issuer: 'https://auth.openai.com'
};
}
const kiroConfig = provider.kiro_config || {}; const kiroConfig = provider.kiro_config || {};
const claudeConfig = provider.claude_config || {}; const claudeConfig = provider.claude_config || {};
const kiloConfig = provider.kilo_config || {}; const kiloConfig = provider.kilo_config || {};
const codexConfig = provider.codex_config || {};
// Build authentication fields based on provider type // Build authentication fields based on provider type
let authFieldsHtml = ''; let authFieldsHtml = '';
...@@ -322,6 +333,55 @@ function renderProviderDetails(key) { ...@@ -322,6 +333,55 @@ function renderProviderDetails(key) {
<div id="claude-upload-status-${key}" style="margin-top: 10px;"></div> <div id="claude-upload-status-${key}" style="margin-top: 10px;"></div>
</div> </div>
`; `;
} else if (isCodexProvider) {
// Codex OAuth2 authentication fields
authFieldsHtml = `
<div style="background: #0f2840; padding: 15px; border-radius: 5px; margin-bottom: 15px; border-left: 3px solid #4a9eff;">
<h4 style="margin: 0 0 15px 0; color: #4a9eff;">Codex OAuth2 Authentication</h4>
<small style="color: #a0a0a0; display: block; margin-bottom: 15px;">
Codex uses OAuth2 Device Authorization Grant (same protocol as OpenAI). Click "Authenticate" to start the OAuth2 flow.
</small>
<div class="form-group">
<label>API Key (optional - if provided, used instead of OAuth2)</label>
<input type="password" value="${provider.api_key || ''}" onchange="updateProvider('${key}', 'api_key', this.value)" placeholder="Enter OpenAI API key (optional)">
<small style="color: #a0a0a0; display: block; margin-top: 5px;">If provided, API key authentication will be used instead of OAuth2</small>
</div>
<div class="form-group">
<label>OAuth2 Issuer URL</label>
<input type="text" value="${codexConfig.issuer || 'https://auth.openai.com'}" onchange="updateCodexConfig('${key}', 'issuer', this.value)" placeholder="https://auth.openai.com">
<small style="color: #a0a0a0; display: block; margin-top: 5px;">OAuth2 issuer URL (default: https://auth.openai.com)</small>
</div>
<div class="form-group">
<label>Credentials File Path</label>
<input type="text" value="${codexConfig.credentials_file || '~/.aisbf/codex_credentials.json'}" onchange="updateCodexConfig('${key}', 'credentials_file', this.value)" placeholder="~/.aisbf/codex_credentials.json">
<small style="color: #a0a0a0; display: block; margin-top: 5px;">Path where OAuth2 credentials will be stored</small>
</div>
<div style="margin-top: 15px;">
<button type="button" class="btn" onclick="authenticateCodex('${key}')" style="background: #4a9eff;">
🔐 Authenticate with Codex (Device Code)
</button>
<button type="button" class="btn btn-secondary" onclick="checkCodexAuth('${key}')" style="margin-left: 10px;">
Check Status
</button>
</div>
<div id="codex-auth-status-${key}" style="margin-top: 10px; padding: 10px; border-radius: 3px; display: none;">
<!-- Auth status will be displayed here -->
</div>
<h5 style="margin: 20px 0 10px 0; color: #8ec8ff;">Or Upload Credentials File</h5>
<div class="form-group">
<label>Upload Credentials File</label>
<input type="file" id="codex-creds-file-${key}" accept=".json" onchange="uploadCodexFile('${key}', this.files[0])">
<small style="color: #a0a0a0; display: block; margin-top: 5px;">Upload Codex OAuth2 credentials JSON file</small>
</div>
<div id="codex-upload-status-${key}" style="margin-top: 10px;"></div>
</div>
`;
} else { } else {
// Standard API key authentication fields // Standard API key authentication fields
authFieldsHtml = ` authFieldsHtml = `
...@@ -355,6 +415,7 @@ function renderProviderDetails(key) { ...@@ -355,6 +415,7 @@ function renderProviderDetails(key) {
<input type="text" value="${provider.endpoint}" onchange="updateProvider('${key}', 'endpoint', this.value)" ${provider.type === 'kilocode' ? 'readonly style="background: #0f2840; cursor: not-allowed;"' : ''} required> <input type="text" value="${provider.endpoint}" onchange="updateProvider('${key}', 'endpoint', this.value)" ${provider.type === 'kilocode' ? 'readonly style="background: #0f2840; cursor: not-allowed;"' : ''} required>
${isKiroProvider ? '<small style="color: #a0a0a0; display: block; margin-top: 5px;">Typically: https://q.us-east-1.amazonaws.com</small>' : ''} ${isKiroProvider ? '<small style="color: #a0a0a0; display: block; margin-top: 5px;">Typically: https://q.us-east-1.amazonaws.com</small>' : ''}
${provider.type === 'kilocode' ? '<small style="color: #a0a0a0; display: block; margin-top: 5px;">Fixed endpoint for Kilocode provider</small>' : ''} ${provider.type === 'kilocode' ? '<small style="color: #a0a0a0; display: block; margin-top: 5px;">Fixed endpoint for Kilocode provider</small>' : ''}
${isCodexProvider ? '<small style="color: #a0a0a0; display: block; margin-top: 5px;">Typically: https://api.openai.com/v1 (OpenAI-compatible)</small>' : ''}
</div> </div>
<div class="form-group"> <div class="form-group">
...@@ -367,6 +428,7 @@ function renderProviderDetails(key) { ...@@ -367,6 +428,7 @@ function renderProviderDetails(key) {
<option value="kiro" ${provider.type === 'kiro' ? 'selected' : ''}>Kiro (Amazon Q Developer)</option> <option value="kiro" ${provider.type === 'kiro' ? 'selected' : ''}>Kiro (Amazon Q Developer)</option>
<option value="claude" ${provider.type === 'claude' ? 'selected' : ''}>Claude (OAuth2)</option> <option value="claude" ${provider.type === 'claude' ? 'selected' : ''}>Claude (OAuth2)</option>
<option value="kilocode" ${provider.type === 'kilocode' ? 'selected' : ''}>Kilocode (OAuth2)</option> <option value="kilocode" ${provider.type === 'kilocode' ? 'selected' : ''}>Kilocode (OAuth2)</option>
<option value="codex" ${provider.type === 'codex' ? 'selected' : ''}>Codex (OpenAI OAuth2)</option>
</select> </select>
</div> </div>
...@@ -591,7 +653,8 @@ function updateNewProviderDefaults() { ...@@ -591,7 +653,8 @@ function updateNewProviderDefaults() {
'ollama': 'Ollama local provider. No API key required by default. Endpoint: http://localhost:11434/api', 'ollama': 'Ollama local provider. No API key required by default. Endpoint: http://localhost:11434/api',
'kiro': 'Kiro (Amazon Q Developer) provider. Uses Kiro credentials (IDE, CLI, or direct tokens). Endpoint: https://q.us-east-1.amazonaws.com', 'kiro': 'Kiro (Amazon Q Developer) provider. Uses Kiro credentials (IDE, CLI, or direct tokens). Endpoint: https://q.us-east-1.amazonaws.com',
'claude': 'Claude Code provider. Uses OAuth2 authentication (browser-based login). Endpoint: https://api.anthropic.com/v1', 'claude': 'Claude Code provider. Uses OAuth2 authentication (browser-based login). Endpoint: https://api.anthropic.com/v1',
'kilocode': 'Kilocode provider. Uses OAuth2 Device Authorization Grant. Endpoint: https://api.kilo.ai/api/gateway' 'kilocode': 'Kilocode provider. Uses OAuth2 Device Authorization Grant. Endpoint: https://api.kilo.ai/api/gateway',
'codex': 'Codex provider. Uses OAuth2 Device Authorization Grant (same protocol as OpenAI). Endpoint: https://api.openai.com/v1'
}; };
descriptionEl.textContent = descriptions[providerType] || 'Standard provider configuration.'; descriptionEl.textContent = descriptions[providerType] || 'Standard provider configuration.';
...@@ -627,7 +690,7 @@ function confirmAddProvider() { ...@@ -627,7 +690,7 @@ function confirmAddProvider() {
name: key, name: key,
endpoint: '', endpoint: '',
type: providerType, type: providerType,
api_key_required: providerType !== 'kiro' && providerType !== 'ollama' && providerType !== 'claude' && providerType !== 'kilocode', api_key_required: providerType !== 'kiro' && providerType !== 'ollama' && providerType !== 'claude' && providerType !== 'kilocode' && providerType !== 'codex',
rate_limit: 0, rate_limit: 0,
default_rate_limit: 0, default_rate_limit: 0,
models: [] models: []
...@@ -659,6 +722,13 @@ function confirmAddProvider() { ...@@ -659,6 +722,13 @@ function confirmAddProvider() {
credentials_file: '~/.kilo_credentials.json', credentials_file: '~/.kilo_credentials.json',
api_base: 'https://api.kilo.ai/api/gateway' api_base: 'https://api.kilo.ai/api/gateway'
}; };
} else if (providerType === 'codex') {
newProvider.endpoint = 'https://api.openai.com/v1';
newProvider.name = key + ' (Codex OAuth2)';
newProvider.codex_config = {
credentials_file: '~/.aisbf/codex_credentials.json',
issuer: 'https://auth.openai.com'
};
} else if (providerType === 'openai') { } else if (providerType === 'openai') {
newProvider.endpoint = 'https://api.openai.com/v1'; newProvider.endpoint = 'https://api.openai.com/v1';
newProvider.api_key = ''; newProvider.api_key = '';
...@@ -776,14 +846,30 @@ function updateProviderType(key, newType) { ...@@ -776,14 +846,30 @@ function updateProviderType(key, newType) {
}; };
delete providersData[key].kiro_config; delete providersData[key].kiro_config;
delete providersData[key].claude_config; delete providersData[key].claude_config;
delete providersData[key].codex_config;
// Set endpoint for kilocode (fixed, not modifiable) // Set endpoint for kilocode (fixed, not modifiable)
providersData[key].endpoint = 'https://api.kilo.ai/api/gateway'; providersData[key].endpoint = 'https://api.kilo.ai/api/gateway';
} else if (newType !== 'kiro' && newType !== 'claude' && newType !== 'kilocode' && (oldType === 'kiro' || oldType === 'claude' || oldType === 'kilocode')) { } else if (newType === 'codex' && oldType !== 'codex') {
// Transitioning FROM kiro/claude/kilocode: remove special configs, set api_key_required to true // Transitioning TO codex: initialize codex_config, set api_key_required to false
providersData[key].api_key_required = false;
providersData[key].codex_config = {
credentials_file: '~/.aisbf/codex_credentials.json',
issuer: 'https://auth.openai.com'
};
delete providersData[key].kiro_config;
delete providersData[key].claude_config;
delete providersData[key].kilo_config;
// Set default endpoint for codex
if (!providersData[key].endpoint || providersData[key].endpoint === '') {
providersData[key].endpoint = 'https://api.openai.com/v1';
}
} else if (newType !== 'kiro' && newType !== 'claude' && newType !== 'kilocode' && newType !== 'codex' && (oldType === 'kiro' || oldType === 'claude' || oldType === 'kilocode' || oldType === 'codex')) {
// Transitioning FROM kiro/claude/kilocode/codex: remove special configs, set api_key_required to true
providersData[key].api_key_required = true; providersData[key].api_key_required = true;
delete providersData[key].kiro_config; delete providersData[key].kiro_config;
delete providersData[key].claude_config; delete providersData[key].claude_config;
delete providersData[key].kilo_config; delete providersData[key].kilo_config;
delete providersData[key].codex_config;
} }
// Re-render to show appropriate fields // Re-render to show appropriate fields
...@@ -811,6 +897,184 @@ function updateKiloConfig(key, field, value) { ...@@ -811,6 +897,184 @@ function updateKiloConfig(key, field, value) {
providersData[key].kilo_config[field] = value; providersData[key].kilo_config[field] = value;
} }
function updateCodexConfig(key, field, value) {
if (!providersData[key].codex_config) {
providersData[key].codex_config = {};
}
providersData[key].codex_config[field] = value;
}
async function authenticateCodex(key) {
const statusEl = document.getElementById(`codex-auth-status-${key}`);
statusEl.style.display = 'block';
statusEl.style.background = '#0f2840';
statusEl.style.border = '1px solid #4a9eff';
statusEl.innerHTML = '<p style="margin: 0; color: #4a9eff;">🔄 Starting Codex OAuth2 Device Authorization flow...</p>';
try {
const response = await fetch('/dashboard/codex/auth/start', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
provider_key: key,
credentials_file: providersData[key].codex_config?.credentials_file || '~/.aisbf/codex_credentials.json',
issuer: providersData[key].codex_config?.issuer || 'https://auth.openai.com'
})
});
const data = await response.json();
if (!data.success) {
statusEl.style.background = '#402010';
statusEl.style.border = '1px solid #ff4a4a';
statusEl.innerHTML = `<p style="margin: 0; color: #ff4a4a;">✗ Failed to start authentication: ${data.error || 'Unknown error'}</p>`;
return;
}
statusEl.style.background = '#0f2840';
statusEl.style.border = '1px solid #4a9eff';
statusEl.innerHTML = `
<div style="margin: 0;">
<p style="margin: 0 0 10px 0; color: #4a9eff; font-weight: bold;">🔐 Codex Device Authorization</p>
<p style="margin: 0 0 10px 0; color: #e0e0e0;">
Please visit: <a href="${data.verification_uri}" target="_blank" style="color: #4eff9e; text-decoration: underline;">${data.verification_uri}</a>
</p>
<p style="margin: 0 0 10px 0; color: #e0e0e0;">
Enter code: <strong style="color: #4eff9e; font-size: 18px; letter-spacing: 2px;">${data.user_code}</strong>
</p>
<p style="margin: 0; color: #a0a0a0; font-size: 13px;">
Waiting for authorization... (expires in ${Math.floor(data.expires_in / 60)} minutes)
</p>
</div>
`;
try {
window.open(data.verification_uri, 'codex-auth', 'width=600,height=700');
} catch (e) {
console.error('Could not open auth window:', e);
}
let pollCount = 0;
const maxPolls = Math.floor(data.expires_in / data.interval);
const pollInterval = setInterval(async () => {
pollCount++;
try {
const pollResponse = await fetch('/dashboard/codex/auth/poll', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
}
});
const pollData = await pollResponse.json();
if (pollData.status === 'approved') {
clearInterval(pollInterval);
statusEl.style.background = '#0f4020';
statusEl.style.border = '1px solid #4eff9e';
statusEl.innerHTML = '<p style="margin: 0; color: #4eff9e;">✓ Codex authentication successful! Credentials saved.</p>';
} else if (pollData.status === 'denied') {
clearInterval(pollInterval);
statusEl.style.background = '#402010';
statusEl.style.border = '1px solid #ff4a4a';
statusEl.innerHTML = '<p style="margin: 0; color: #ff4a4a;">✗ Authorization denied by user.</p>';
} else if (pollData.status === 'expired') {
clearInterval(pollInterval);
statusEl.style.background = '#402010';
statusEl.style.border = '1px solid #ff4a4a';
statusEl.innerHTML = '<p style="margin: 0; color: #ff4a4a;">✗ Authorization code expired. Please try again.</p>';
}
} catch (error) {
console.error('Error polling Codex auth:', error);
}
if (pollCount >= maxPolls) {
clearInterval(pollInterval);
statusEl.style.background = '#402010';
statusEl.style.border = '1px solid #ff4a4a';
statusEl.innerHTML = '<p style="margin: 0; color: #ff4a4a;">✗ Authentication timeout. Please try again.</p>';
}
}, data.interval * 1000);
} catch (error) {
statusEl.style.background = '#402010';
statusEl.style.border = '1px solid #ff4a4a';
statusEl.innerHTML = `<p style="margin: 0; color: #ff4a4a;">✗ Error: ${error.message}</p>`;
}
}
async function checkCodexAuth(key) {
const statusEl = document.getElementById(`codex-auth-status-${key}`);
statusEl.style.display = 'block';
statusEl.style.background = '#0f2840';
statusEl.style.border = '1px solid #4a9eff';
statusEl.innerHTML = '<p style="margin: 0; color: #4a9eff;">🔄 Checking Codex authentication status...</p>';
try {
const response = await fetch('/dashboard/codex/auth/status', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
provider_key: key,
credentials_file: providersData[key].codex_config?.credentials_file || '~/.aisbf/codex_credentials.json'
})
});
const data = await response.json();
if (data.authenticated) {
statusEl.style.background = '#0f4020';
statusEl.style.border = '1px solid #4eff9e';
const expiresIn = data.expires_in ? ` (expires in ${Math.floor(data.expires_in / (24 * 60 * 60))} days)` : '';
statusEl.innerHTML = `<p style="margin: 0; color: #4eff9e;">✓ Codex authenticated${expiresIn}</p>`;
} else {
statusEl.style.background = '#402010';
statusEl.style.border = '1px solid #ff4a4a';
statusEl.innerHTML = '<p style="margin: 0; color: #ff4a4a;">✗ Not authenticated. Click "Authenticate with Codex" to log in.</p>';
}
} catch (error) {
statusEl.style.background = '#402010';
statusEl.style.border = '1px solid #ff4a4a';
statusEl.innerHTML = `<p style="margin: 0; color: #ff4a4a;">✗ Error: ${error.message}</p>`;
}
}
async function uploadCodexFile(providerKey, file) {
if (!file) return;
const statusEl = document.getElementById(`codex-upload-status-${providerKey}`);
statusEl.innerHTML = '<p style="margin: 0; color: #4a9eff;">🔄 Uploading file...</p>';
const formData = new FormData();
formData.append('file', file);
formData.append('provider_key', providerKey);
formData.append('file_type', 'credentials');
try {
const response = await fetch('/dashboard/providers/upload-auth-file', {
method: 'POST',
body: formData
});
const data = await response.json();
if (data.success) {
statusEl.innerHTML = `<p style="margin: 0; color: #4eff9e;">✓ File uploaded successfully! Path: ${data.file_path}</p>`;
updateCodexConfig(providerKey, 'credentials_file', data.file_path);
} else {
statusEl.innerHTML = `<p style="margin: 0; color: #ff4a4a;">✗ Upload failed: ${data.error}</p>`;
}
} catch (error) {
statusEl.innerHTML = `<p style="margin: 0; color: #ff4a4a;">✗ Error: ${error.message}</p>`;
}
}
async function authenticateKilo(key) { async function authenticateKilo(key) {
const statusEl = document.getElementById(`kilo-auth-status-${key}`); const statusEl = document.getElementById(`kilo-auth-status-${key}`);
statusEl.style.display = 'block'; statusEl.style.display = 'block';
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment