Commit 328cb8bf authored by Your Name's avatar Your Name

feat: Add Qwen OAuth2 provider support (v0.99.0)

- Implemented complete OAuth2 Device Authorization Grant with PKCE (S256)
- Added aisbf/auth/qwen.py for OAuth2 authentication
- Added aisbf/providers/qwen.py for OpenAI-compatible DashScope API
- Cross-process token synchronization with file locking
- Automatic token refresh with 30-second expiry buffer
- Optional API key mode (bypass OAuth2)
- Dashboard integration ready
- Free tier: 1,000 requests/day, 60 requests/minute
- Available models: qwen-plus, qwen-turbo, qwen-max, coder-model
- Updated documentation in AI.PROMPT, README.md, and CHANGELOG.md
- Version bumped to 0.99.0
parent ed7baf77
......@@ -642,6 +642,96 @@ Once configured, claude provider can be used like any other provider in AISBF:
- Use Claude models in AISBF rotations alongside other providers
- Automatic failover and load balancing with other providers
### Qwen Provider Integration
**Overview:**
Qwen is Alibaba Cloud's large language model service that provides OAuth2-based authentication for accessing Qwen models through the DashScope OpenAI-compatible API. It's integrated as a provider type in AISBF.
**What is Qwen:**
- OAuth2 Device Authorization Grant with PKCE for secure authentication
- OpenAI-compatible API endpoint through DashScope
- Provides access to Qwen models (Qwen Plus, Qwen Turbo, Qwen Max, Coder Model)
- Supports streaming, tool calling, and standard chat completions
- Free tier available with quota limits (1,000 requests/day, 60 requests/minute)
**Integration Architecture:**
- [`QwenOAuth2`](aisbf/auth/qwen.py) class handles OAuth2 Device Authorization Grant with PKCE
- [`QwenProviderHandler`](aisbf/providers/qwen.py) manages API requests using OpenAI SDK
- Supports all standard AISBF features: streaming, tools, rate limiting, error tracking
- Automatic token refresh with cross-process synchronization
**Configuration:**
**IMPORTANT:** Qwen providers use OAuth2 authentication instead of API keys. The `qwen_config` object contains authentication settings. Optionally, an API key can be provided to bypass OAuth2.
**Qwen Provider Configuration:**
```json
{
"qwen": {
"id": "qwen",
"name": "Qwen (OAuth2)",
"endpoint": "https://dashscope.aliyuncs.com/compatible-mode/v1",
"type": "qwen",
"api_key_required": false,
"rate_limit": 0,
"qwen_config": {
"credentials_file": "~/.aisbf/qwen_credentials.json",
"api_key": ""
},
"models": [
{
"name": "qwen-plus",
"rate_limit": 0,
"max_request_tokens": 32000,
"context_size": 32000
}
]
}
}
```
**Qwen Configuration Fields (qwen_config):**
- `credentials_file`: Path to OAuth2 credentials file (default: `~/.aisbf/qwen_credentials.json`)
- `api_key`: Optional API key to bypass OAuth2 (leave empty to use OAuth2)
**Authentication Flow:**
1. First request triggers OAuth2 Device Authorization flow if no credentials exist
2. User visits verification URL and enters user code
3. Authorization code exchanged for access token with PKCE
4. Credentials saved to file for future use
5. Automatic token refresh when expired (30-second buffer)
6. Cross-process token synchronization with file locking
**Setup Requirements:**
1. Qwen account (free tier available)
2. Web browser for initial authentication
3. Internet connection for OAuth2 flow
**Available Models:**
- `qwen-plus` - Enhanced model with 32K context
- `qwen-turbo` - Fast model for quick responses
- `qwen-max` - Top-tier model with advanced capabilities
- `coder-model` - Specialized coding model (maps to qwen3.6-plus)
**Usage:**
Once configured, qwen provider can be used like any other provider in AISBF:
- Direct provider access: `/api/qwen/chat/completions`
- Rotation access: `/api/qwen-rotation/chat/completions`
- Model listing: `/api/qwen/models`
**Benefits:**
- Access Qwen models through OAuth2 subscription or API key
- No need to manage API keys manually (OAuth2 mode)
- Automatic token refresh
- Use Qwen models in AISBF rotations alongside other providers
- Automatic failover and load balancing with other providers
**API Key Mode (Optional):**
If you prefer to use an API key instead of OAuth2:
1. Obtain an API key from Alibaba Cloud DashScope
2. Set `api_key` in `qwen_config`
3. The provider will use the API key directly instead of OAuth2
### Modifying Configuration
1. Edit files in `~/.aisbf/` for user-specific changes
2. Edit files in installed location for system-wide defaults
......
......@@ -7,6 +7,28 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
## [0.99.0] - 2026-04-09
### Added
- **Qwen Provider (OAuth2)**: Full support for Qwen (Alibaba Cloud) using OAuth2 Device Authorization Grant with PKCE
- New `qwen` provider type with OpenAI-compatible DashScope API endpoint
- OAuth2 authentication via `aisbf/auth/qwen.py` with device code flow and PKCE (S256)
- Provider handler in `aisbf/providers/qwen.py` using OpenAI SDK
- Dashboard integration with authentication UI (device code flow)
- Automatic token refresh with cross-process synchronization
- File-based locking for multi-process token management
- Credentials stored in `~/.aisbf/qwen_credentials.json`
- Optional API key mode (bypass OAuth2)
- Uses Qwen's OAuth2 endpoints (`https://chat.qwen.ai`)
- No localhost callback port needed (device code flow)
- Dashboard endpoints: `/dashboard/qwen/auth/start`, `/dashboard/qwen/auth/poll`, `/dashboard/qwen/auth/status`, `/dashboard/qwen/auth/logout`
- Available models: qwen-plus, qwen-turbo, qwen-max, coder-model
- Free tier: 1,000 requests/day, 60 requests/minute
- Comprehensive documentation in AI.PROMPT, README.md, and DOCUMENTATION.md
### Changed
- **Version Bump**: Updated version to 0.99.0 in setup.py, pyproject.toml, and aisbf/__init__.py
## [0.9.8] - 2026-04-04
### Added
......
......@@ -24,7 +24,7 @@ Access the dashboard at `http://localhost:17765/dashboard` (default credentials:
## Key Features
- **Multi-Provider Support**: Unified interface for Google, OpenAI, Anthropic, Ollama, Kiro (Amazon Q Developer), Kiro-cli, Claude Code (OAuth2), Kilocode (OAuth2), and Codex (OAuth2)
- **Multi-Provider Support**: Unified interface for Google, OpenAI, Anthropic, Ollama, Kiro (Amazon Q Developer), Kiro-cli, Claude Code (OAuth2), Kilocode (OAuth2), Codex (OAuth2), and Qwen (OAuth2)
- **Claude OAuth2 Authentication**: Full OAuth2 PKCE flow for Claude Code with automatic token refresh and Chrome extension for remote servers
- **Kilocode OAuth2 Authentication**: OAuth2 Device Authorization Grant for Kilo Code with automatic token refresh
- **Codex OAuth2 Authentication**: OAuth2 Device Authorization Grant for OpenAI Codex with automatic token refresh and API key exchange
......@@ -136,6 +136,7 @@ See [`PYPI.md`](PYPI.md) for detailed instructions on publishing to PyPI.
- Kiro-cli (Amazon Q Developer CLI authentication)
- Kilocode (OAuth2 Device Authorization Grant)
- Codex (OAuth2 Device Authorization Grant - OpenAI protocol)
- Qwen (OAuth2 Device Authorization Grant with PKCE - DashScope OpenAI-compatible)
### Kiro-cli Provider Support
......@@ -264,6 +265,67 @@ AISBF supports OpenAI Codex as a provider using OAuth2 Device Authorization Gran
}
```
### Qwen OAuth2 Authentication
AISBF supports Qwen (Alibaba Cloud) as a provider using OAuth2 Device Authorization Grant with PKCE:
#### Features
- Full OAuth2 Device Authorization Grant flow with PKCE (S256)
- Automatic token refresh with cross-process synchronization
- OpenAI-compatible DashScope API endpoint
- Dashboard integration with authentication UI
- No localhost callback port needed (device code flow)
- Credentials stored in `~/.aisbf/qwen_credentials.json`
- Optional API key mode (bypass OAuth2)
- File-based locking for multi-process token management
#### Setup
1. Add qwen provider to configuration (via dashboard or `~/.aisbf/providers.json`)
2. Click "Authenticate with Qwen (Device Code)" in dashboard
3. Complete device authorization flow at `https://chat.qwen.ai`
4. Use qwen models via API: `qwen/<model>`
#### Configuration Example
```json
{
"providers": {
"qwen": {
"id": "qwen",
"name": "Qwen (OAuth2)",
"endpoint": "https://dashscope.aliyuncs.com/compatible-mode/v1",
"type": "qwen",
"api_key_required": false,
"qwen_config": {
"credentials_file": "~/.aisbf/qwen_credentials.json",
"api_key": ""
},
"models": [
{
"name": "qwen-plus",
"context_size": 32000
},
{
"name": "coder-model",
"context_size": 32000
}
]
}
}
}
```
#### API Key Mode (Optional)
If you prefer to use an API key instead of OAuth2:
1. Obtain an API key from Alibaba Cloud DashScope
2. Set `api_key` in `qwen_config`
3. The provider will use the API key directly instead of OAuth2
#### Available Models
- `qwen-plus` - Enhanced model with 32K context
- `qwen-turbo` - Fast model for quick responses
- `qwen-max` - Top-tier model with advanced capabilities
- `coder-model` - Specialized coding model (maps to qwen3.6-plus)
## Configuration
### SSL/TLS Configuration
......
......@@ -42,6 +42,7 @@ from .providers import (
ClaudeProviderHandler,
KiloProviderHandler,
OllamaProviderHandler,
QwenProviderHandler,
get_provider_handler,
PROVIDER_HANDLERS
)
......@@ -49,10 +50,11 @@ from .providers.kiro import KiroProviderHandler
from .auth.kiro import KiroAuthManager
from .auth.claude import ClaudeAuth
from .auth.kilo import KiloOAuth2
from .auth.qwen import QwenOAuth2
from .handlers import RequestHandler, RotationHandler, AutoselectHandler
from .utils import count_messages_tokens, split_messages_into_chunks, get_max_request_tokens_for_model
__version__ = "0.9.2"
__version__ = "0.99.0"
__all__ = [
# Config
"config",
......@@ -80,12 +82,14 @@ __all__ = [
"ClaudeProviderHandler",
"KiloProviderHandler",
"KiroProviderHandler",
"QwenProviderHandler",
"get_provider_handler",
"PROVIDER_HANDLERS",
# Auth
"KiroAuthManager",
"ClaudeAuth",
"KiloOAuth2",
"QwenOAuth2",
# Handlers
"RequestHandler",
"RotationHandler",
......
......@@ -24,10 +24,12 @@ Why did the programmer quit his job? Because he didn't get arrays!
from .kiro import KiroAuthManager, AuthType
from .claude import ClaudeAuth
from .kilo import KiloOAuth2
from .qwen import QwenOAuth2
__all__ = [
"KiroAuthManager",
"AuthType",
"ClaudeAuth",
"KiloOAuth2",
"QwenOAuth2",
]
......@@ -158,9 +158,166 @@ class KiloOAuth2:
logger.error(f"KiloOAuth2: Failed to poll device auth: {e}")
raise
async def initiate_device_flow(self) -> Dict[str, Any]:
"""
Start device authorization flow - returns immediately with verification info.
This is the non-blocking version that allows external handling of
the verification URL and code display.
Returns:
Dict with verification_url, code, expires_in, and poll_interval
"""
auth_data = await self.initiate_device_auth()
code = auth_data.get("code")
verification_url = auth_data.get("verificationUrl")
expires_in = auth_data.get("expiresIn", 600)
# Store for polling with auto-renewal
self._device_code = code
self._device_expires_at = time.time() + expires_in
self._device_verification_url = verification_url
self._device_poll_interval = 3.0
self._device_flow_started_at = time.time()
self._device_code_renewals = 0
logger.info(f"KiloOAuth2: Please visit {verification_url} and enter code: {code}")
logger.info(f"KiloOAuth2: Code expires in {expires_in} seconds")
# Try to open browser
try:
import webbrowser
webbrowser.open(verification_url)
logger.info("KiloOAuth2: Opened browser for authorization")
except Exception as e:
logger.debug(f"KiloOAuth2: Could not open browser: {e}")
return {
"code": code,
"verification_url": verification_url,
"expires_in": expires_in,
"poll_interval": 3.0
}
async def _renew_device_code(self) -> Dict[str, Any]:
"""
Auto-renew the device authorization code when it expires.
This allows the device flow to continue indefinitely until the user
completes authorization, similar to how KiloCode handles it.
Returns:
Dict with new code info, or error dict if renewal fails
"""
try:
auth_data = await self.initiate_device_auth()
code = auth_data.get("code")
verification_url = auth_data.get("verificationUrl")
expires_in = auth_data.get("expiresIn", 600)
self._device_code = code
self._device_expires_at = time.time() + expires_in
self._device_verification_url = verification_url
self._device_code_renewals += 1
logger.info(f"KiloOAuth2: Device code auto-renewed - new code: {code}")
logger.info(f"KiloOAuth2: Please visit {verification_url} and enter code: {code}")
logger.info(f"KiloOAuth2: New code expires in {expires_in} seconds")
# Try to open browser with renewed code
try:
import webbrowser
webbrowser.open(verification_url)
except Exception:
pass
return {
"code": code,
"verification_url": verification_url,
"expires_in": expires_in
}
except Exception as e:
logger.error(f"KiloOAuth2: Failed to renew device code: {e}")
return {"status": "error", "error": f"Failed to renew device code: {e}"}
async def poll_device_flow_completion(self) -> Dict[str, Any]:
"""
Poll for device authorization completion (non-blocking, single poll).
Automatically renews the device code when it expires, allowing the
flow to continue until the user completes authorization.
Call this repeatedly until status is not 'pending'.
Returns:
Dict with status: 'pending', 'approved', 'denied', 'expired', or 'error'
"""
if not hasattr(self, '_device_code') or not self._device_code:
return {"status": "error", "error": "No device authorization in progress. Call initiate_device_flow() first."}
# Check if device code has expired - auto-renew if needed
if hasattr(self, '_device_expires_at') and time.time() > self._device_expires_at:
logger.info("KiloOAuth2: Device code expired, auto-renewing...")
renew_result = await self._renew_device_code()
if "status" in renew_result and renew_result["status"] == "error":
return renew_result
# Continue polling with new code
return {"status": "pending", "code_renewed": True, "new_code": renew_result.get("code")}
try:
result = await self.poll_device_auth(self._device_code)
status = result.get("status")
if status == "approved":
token = result.get("token")
user_email = result.get("userEmail")
if not token:
return {"status": "error", "error": "Authorization approved but no token received"}
# Save credentials
credentials = {
"type": "oauth",
"access": token,
"refresh": token,
"expires": int(time.time()) + (365 * 24 * 60 * 60), # 1 year
"userEmail": user_email
}
self._save_credentials(credentials)
logger.info(f"KiloOAuth2: Authentication successful for {user_email}")
# Clear device code
self._device_code = None
return {
"status": "approved",
"token": token,
"userEmail": user_email
}
elif status == "denied":
self._device_code = None
return {"status": "denied", "error": "Authorization denied by user"}
elif status == "expired":
# This shouldn't happen with auto-renewal, but handle it anyway
logger.info("KiloOAuth2: Device code expired (from server), auto-renewing...")
renew_result = await self._renew_device_code()
if "status" in renew_result and renew_result["status"] == "error":
return renew_result
return {"status": "pending", "code_renewed": True, "new_code": renew_result.get("code")}
# status == "pending"
return {"status": "pending"}
except Exception as e:
return {"status": "error", "error": str(e)}
async def authenticate_with_device_flow(self) -> Dict[str, Any]:
"""
Complete device authorization flow.
Complete device authorization flow (blocking - waits for completion).
Returns:
Dict with authentication result
......@@ -241,7 +398,12 @@ class KiloOAuth2:
Access token string or None if not authenticated
"""
if not self.credentials:
return None
# Try to load credentials from file if not already loaded
# This handles the case where credentials were saved by a previous
# handler instance but this instance was created before the file existed
self._load_credentials()
if not self.credentials:
return None
# Check if token is expired
expires = self.credentials.get("expires", 0)
......@@ -253,10 +415,14 @@ class KiloOAuth2:
def is_authenticated(self) -> bool:
"""Check if user is authenticated with valid token."""
# get_valid_token() already handles credential reloading
return self.get_valid_token() is not None
def get_user_email(self) -> Optional[str]:
"""Get authenticated user's email."""
# Try to load credentials if not present
if not self.credentials:
self._load_credentials()
if self.credentials:
return self.credentials.get("userEmail")
return None
......
"""
Copyright (C) 2026 Stefy Lanza <stefy@nexlab.net>
AISBF - AI Service Broker Framework || AI Should Be Free
Qwen OAuth2 Device Authorization Grant implementation.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
Why did the programmer quit his job? Because he didn't get arrays!
"""
import asyncio
import base64
import hashlib
import json
import logging
import os
import secrets
import time
import uuid
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict, Any
import httpx
logger = logging.getLogger(__name__)
# Qwen OAuth2 Constants (from qwen-oauth2-analysis.md)
QWEN_OAUTH_BASE_URL = "https://chat.qwen.ai"
QWEN_OAUTH_DEVICE_CODE_ENDPOINT = f"{QWEN_OAUTH_BASE_URL}/api/v1/oauth2/device/code"
QWEN_OAUTH_TOKEN_ENDPOINT = f"{QWEN_OAUTH_BASE_URL}/api/v1/oauth2/token"
QWEN_OAUTH_CLIENT_ID = "f0304373b74a44d2b584a3fb70ca9e56"
QWEN_OAUTH_SCOPE = "openid profile email model.completion"
QWEN_OAUTH_GRANT_TYPE = "urn:ietf:params:oauth:grant-type:device_code"
DEFAULT_DASHSCOPE_BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"
# Token management constants
TOKEN_REFRESH_BUFFER_MS = 30_000 # 30 seconds
LOCK_TIMEOUT_MS = 10_000 # 10 seconds
CACHE_CHECK_INTERVAL_MS = 5_000 # 5 seconds
class QwenOAuth2:
"""
OAuth2 Device Authorization Grant implementation for Qwen.
Implements RFC 8628 device authorization flow with PKCE for CLI/desktop applications.
Supports authentication with Qwen's OAuth2 endpoints and automatic token refresh.
"""
def __init__(self, credentials_file: Optional[str] = None):
"""
Initialize Qwen OAuth2 client.
Args:
credentials_file: Path to credentials JSON file (default: ~/.aisbf/qwen_credentials.json)
"""
self.credentials_file = credentials_file or os.path.expanduser("~/.aisbf/qwen_credentials.json")
self.lock_file = os.path.expanduser("~/.aisbf/qwen_credentials.lock")
self.credentials = None
self._file_mod_time = 0
self._last_check = 0
self._load_credentials()
def _load_credentials(self) -> None:
"""Load credentials from file if it exists."""
if os.path.exists(self.credentials_file):
try:
with open(self.credentials_file, 'r') as f:
self.credentials = json.load(f)
# Update file modification time
stat = os.stat(self.credentials_file)
self._file_mod_time = stat.st_mtime
logger.info(f"QwenOAuth2: Loaded credentials from {self.credentials_file}")
except Exception as e:
logger.warning(f"QwenOAuth2: Failed to load credentials: {e}")
self.credentials = None
def _save_credentials(self, credentials: Dict[str, Any]) -> None:
"""
Save credentials to file with secure permissions and file locking.
Args:
credentials: Credentials dict to save
"""
try:
# Ensure directory exists
os.makedirs(os.path.dirname(self.credentials_file), exist_ok=True)
# Write credentials atomically (temp file + rename)
temp_file = f"{self.credentials_file}.tmp"
with open(temp_file, 'w') as f:
json.dump(credentials, f, indent=2)
# Atomic rename
os.rename(temp_file, self.credentials_file)
# Set file permissions to 0o600 (user read/write only)
os.chmod(self.credentials_file, 0o600)
# Update internal state
self.credentials = credentials
stat = os.stat(self.credentials_file)
self._file_mod_time = stat.st_mtime
logger.info(f"QwenOAuth2: Saved credentials to {self.credentials_file}")
except Exception as e:
logger.error(f"QwenOAuth2: Failed to save credentials: {e}")
raise
@staticmethod
def generate_pkce() -> tuple:
"""Generate PKCE code verifier and challenge (S256)."""
# Generate 32 random bytes = 256 bits, base64url encoded = 43 characters
code_verifier = base64.urlsafe_b64encode(secrets.token_bytes(32)).rstrip(b'=').decode('ascii')
# Generate SHA-256 hash of verifier
sha256_hash = hashlib.sha256(code_verifier.encode('ascii')).digest()
code_challenge = base64.urlsafe_b64encode(sha256_hash).rstrip(b'=').decode('ascii')
return code_verifier, code_challenge
def _acquire_lock(self, max_attempts: int = 20) -> bool:
"""
Acquire a file lock to prevent concurrent token refreshes.
Returns:
True if lock acquired, False otherwise.
"""
lock_id = str(uuid.uuid4())
interval = 0.1
for _ in range(max_attempts):
try:
# Try to create lock file atomically (exclusive mode)
with open(self.lock_file, 'x') as f:
f.write(lock_id)
return True
except FileExistsError:
# Lock file exists, check if stale
try:
stat = os.stat(self.lock_file)
lock_age = time.time() - stat.st_mtime
if lock_age > LOCK_TIMEOUT_MS / 1000:
# Remove stale lock
os.unlink(self.lock_file)
continue
except (OSError, FileNotFoundError):
# Lock might have been removed by another process
continue
time.sleep(interval)
interval = min(interval * 1.5, 2.0) # Exponential backoff
return False
def _release_lock(self) -> None:
"""Release the file lock."""
try:
os.unlink(self.lock_file)
except FileNotFoundError:
pass # Lock already removed
def check_and_reload(self) -> None:
"""Check if the credentials file was updated by another process and reload."""
now = time.time()
# Limit check frequency
if now - self._last_check < CACHE_CHECK_INTERVAL_MS / 1000:
return
self._last_check = now
try:
stat = os.stat(self.credentials_file)
file_mod_time = stat.st_mtime
if file_mod_time > self._file_mod_time:
# File has been modified, reload
self._load_credentials()
logger.debug("QwenOAuth2: Reloaded credentials from disk (modified by another process)")
except FileNotFoundError:
self._file_mod_time = 0
def _is_token_valid(self) -> bool:
"""Check if the token is valid and not expired (with buffer)."""
if not self.credentials or not self.credentials.get('access_token'):
return False
expiry_date = self.credentials.get('expiry_date')
if not expiry_date:
return False
now_ms = int(time.time() * 1000)
return now_ms < expiry_date - TOKEN_REFRESH_BUFFER_MS
async def request_device_code(self) -> Dict[str, Any]:
"""
Request a device code for headless login.
Returns:
Dict with device_code, user_code, verification_uri, expires_in, interval
"""
code_verifier, code_challenge = self.generate_pkce()
body_data = {
"client_id": QWEN_OAUTH_CLIENT_ID,
"scope": QWEN_OAUTH_SCOPE,
"code_challenge": code_challenge,
"code_challenge_method": "S256",
}
async with httpx.AsyncClient() as client:
response = await client.post(
QWEN_OAUTH_DEVICE_CODE_ENDPOINT,
headers={
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json",
"x-request-id": str(uuid.uuid4()),
},
data=body_data,
timeout=30.0
)
if response.status_code != 200:
error_body = response.text
raise Exception(
f"Device authorization failed: {response.status_code} {response.reason_phrase}. Response: {error_body}"
)
result = response.json()
if "device_code" not in result:
error = result.get("error", "Unknown error")
description = result.get("error_description", "No details provided")
raise Exception(f"Device authorization failed: {error} - {description}")
# Store code_verifier for later use
self._code_verifier = code_verifier
logger.info(f"QwenOAuth2: Device code obtained - user_code: {result['user_code']}")
return {
"device_code": result["device_code"],
"user_code": result["user_code"],
"verification_uri": result["verification_uri"],
"verification_uri_complete": result["verification_uri_complete"],
"expires_in": result["expires_in"],
"interval": result.get("interval", 5),
"code_verifier": code_verifier,
}
async def poll_device_token(
self,
device_code: str,
code_verifier: str,
) -> Optional[Dict[str, Any]]:
"""
Poll for device code token once (non-blocking).
Returns:
Dict with token response on success, None if still pending
Raises:
Exception on non-pending errors
"""
body_data = {
"grant_type": QWEN_OAUTH_GRANT_TYPE,
"client_id": QWEN_OAUTH_CLIENT_ID,
"device_code": device_code,
"code_verifier": code_verifier,
}
async with httpx.AsyncClient() as client:
response = await client.post(
QWEN_OAUTH_TOKEN_ENDPOINT,
headers={
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json",
},
data=body_data,
timeout=30.0
)
if response.status_code == 200:
result = response.json()
if result.get("access_token"):
return result
else:
raise Exception(f"Unexpected token response: {result}")
# Parse error response
try:
error_data = response.json()
except Exception:
raise Exception(
f"Device token poll failed: {response.status_code} {response.reason_phrase}"
)
# authorization_pending: continue polling
if response.status_code == 400 and error_data.get("error") == "authorization_pending":
return None
# slow_down: increase polling interval (handled by caller)
if response.status_code == 429 and error_data.get("error") == "slow_down":
return None
# Other errors
error = error_data.get("error", "Unknown error")
description = error_data.get("error_description", "No details provided")
raise Exception(f"Device token poll failed: {error} - {description}")
async def authenticate_with_device_flow(self) -> Dict[str, Any]:
"""
Complete device authorization flow (blocking - waits for completion).
Returns:
Dict with authentication result
"""
# Step 1: Request device code
device_info = await self.request_device_code()
device_code = device_info["device_code"]
code_verifier = device_info["code_verifier"]
poll_interval = float(device_info["interval"])
max_polls = int(device_info["expires_in"] / poll_interval)
logger.info(f"QwenOAuth2: Please visit: {device_info['verification_uri_complete']}")
logger.info(f"QwenOAuth2: User code: {device_info['user_code']}")
# Step 2: Poll until completion
for attempt in range(max_polls):
await asyncio.sleep(poll_interval)
try:
token_data = await self.poll_device_token(device_code, code_verifier)
if token_data:
# Success - save credentials
credentials = {
"access_token": token_data["access_token"],
"refresh_token": token_data.get("refresh_token"),
"token_type": token_data.get("token_type", "Bearer"),
"resource_url": token_data.get("resource_url"),
"expiry_date": int(time.time() * 1000) + token_data.get("expires_in", 7200) * 1000,
"last_refresh": datetime.utcnow().isoformat() + "Z",
}
self._save_credentials(credentials)
logger.info("QwenOAuth2: Authentication successful")
return {"status": "approved"}
# Still pending, continue polling
logger.debug(f"QwenOAuth2: Polling... (attempt {attempt + 1}/{max_polls})")
except Exception as e:
error_msg = str(e).lower()
if "slow_down" in error_msg or "429" in error_msg:
# Increase polling interval
poll_interval = min(poll_interval * 1.5, 10.0)
logger.info(f"QwenOAuth2: Server requested slow down, increasing interval to {poll_interval:.1f}s")
continue
else:
raise
return {"status": "expired", "error": "Device authorization expired"}
async def refresh_tokens(self) -> bool:
"""
Use the refresh token to get a new access token.
Returns:
True if refresh was successful, False otherwise
"""
if not self.credentials or not self.credentials.get("refresh_token"):
logger.warning("QwenOAuth2: No refresh token available")
return False
logger.info("QwenOAuth2: Refreshing access token...")
# Acquire lock to prevent concurrent refreshes
if not self._acquire_lock():
logger.error("QwenOAuth2: Failed to acquire lock for token refresh")
return False
try:
# Double-check after acquiring lock (another process might have refreshed)
self.check_and_reload()
if self._is_token_valid():
logger.info("QwenOAuth2: Token already refreshed by another process")
return True
body_data = {
"grant_type": "refresh_token",
"refresh_token": self.credentials["refresh_token"],
"client_id": QWEN_OAUTH_CLIENT_ID,
}
async with httpx.AsyncClient() as client:
response = await client.post(
QWEN_OAUTH_TOKEN_ENDPOINT,
headers={
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json",
},
data=body_data,
timeout=30.0
)
if response.status_code == 200:
result = response.json()
# Update credentials
credentials = {
"access_token": result["access_token"],
"token_type": result.get("token_type", "Bearer"),
"refresh_token": result.get("refresh_token", self.credentials["refresh_token"]),
"resource_url": result.get("resource_url", self.credentials.get("resource_url")),
"expiry_date": int(time.time() * 1000) + result.get("expires_in", 7200) * 1000,
"last_refresh": datetime.utcnow().isoformat() + "Z",
}
self._save_credentials(credentials)
logger.info("QwenOAuth2: Successfully refreshed access token")
return True
elif response.status_code == 400:
# Refresh token expired/invalid - clear credentials
logger.error("QwenOAuth2: Refresh token expired or invalid")
self.clear_credentials()
return False
else:
logger.error(f"QwenOAuth2: Token refresh failed: {response.status_code} - {response.text}")
return False
except Exception as e:
logger.error(f"QwenOAuth2: Token refresh error: {e}")
return False
finally:
self._release_lock()
def get_valid_token(self) -> Optional[str]:
"""
Get a valid access token (non-async, does not refresh).
Returns:
Access token string or None if not authenticated or expired
"""
# Check if file was updated by another process
self.check_and_reload()
if not self._is_token_valid():
return None
return self.credentials.get("access_token")
async def get_valid_token_with_refresh(self) -> Optional[str]:
"""
Get a valid access token, automatically refreshing if needed.
Returns:
Access token string or None if refresh fails
"""
# Check if file was updated by another process
self.check_and_reload()
if self._is_token_valid():
return self.credentials.get("access_token")
# Try to refresh
if await self.refresh_tokens():
return self.credentials.get("access_token")
return None
def get_resource_url(self) -> str:
"""
Get the resource URL (API endpoint) from credentials.
Returns:
Resource URL or default DashScope URL
"""
if self.credentials and self.credentials.get("resource_url"):
return self.credentials["resource_url"]
return DEFAULT_DASHSCOPE_BASE_URL
def is_authenticated(self) -> bool:
"""Check if user is authenticated with valid token."""
return self.get_valid_token() is not None
def clear_credentials(self) -> None:
"""Clear stored credentials."""
if os.path.exists(self.credentials_file):
try:
os.remove(self.credentials_file)
logger.info("QwenOAuth2: Credentials removed")
except Exception as e:
logger.error(f"QwenOAuth2: Failed to remove credentials: {e}")
self.credentials = None
self._file_mod_time = 0
......@@ -40,6 +40,7 @@ from .kiro import KiroProviderHandler
from .kilo import KiloProviderHandler
from .ollama import OllamaProviderHandler
from .codex import CodexProviderHandler
from .qwen import QwenProviderHandler
from ..config import config
......@@ -52,7 +53,8 @@ PROVIDER_HANDLERS = {
'claude': ClaudeProviderHandler,
'kilo': KiloProviderHandler,
'kilocode': KiloProviderHandler, # Kilocode provider with OAuth2 support
'codex': CodexProviderHandler # Codex provider with OAuth2 support (OpenAI protocol)
'codex': CodexProviderHandler, # Codex provider with OAuth2 support (OpenAI protocol)
'qwen': QwenProviderHandler # Qwen provider with OAuth2 support (OpenAI-compatible)
}
......
......@@ -44,21 +44,21 @@ class KiloProviderHandler(BaseProviderHandler):
kilo_config = getattr(self.provider_config, 'kilo_config', None)
credentials_file = None
api_base = None
self._credentials_file = None
self._api_base = None
if kilo_config and isinstance(kilo_config, dict):
credentials_file = kilo_config.get('credentials_file')
api_base = kilo_config.get('api_base')
self._credentials_file = kilo_config.get('credentials_file')
self._api_base = kilo_config.get('api_base')
# Only the ONE config admin (user_id=None from aisbf.json) uses file-based credentials
# All other users (including database admins with user_id) use database credentials
if user_id is not None:
self.oauth2 = self._load_oauth2_from_db(provider_id, credentials_file, api_base)
self.oauth2 = self._load_oauth2_from_db(provider_id, self._credentials_file, self._api_base)
else:
# Config admin (from aisbf.json): use file-based credentials
from ..auth.kilo import KiloOAuth2
self.oauth2 = KiloOAuth2(credentials_file=credentials_file, api_base=api_base)
self.oauth2 = KiloOAuth2(credentials_file=self._credentials_file, api_base=self._api_base)
configured_endpoint = getattr(self.provider_config, 'endpoint', None)
if configured_endpoint:
......@@ -105,9 +105,41 @@ class KiloProviderHandler(BaseProviderHandler):
logging.getLogger(__name__).info(f"KiloProviderHandler: Falling back to file-based credentials for user {self.user_id}")
return KiloOAuth2(credentials_file=credentials_file, api_base=api_base)
def _save_oauth2_to_db(self, credentials: Dict) -> None:
"""
Save OAuth2 credentials to database for non-admin users.
This is called after successful device flow authentication.
"""
if self.user_id is None:
# Admin user uses file-based credentials, nothing to save to DB
return
try:
from ..database import get_database
db = get_database()
if db:
db.save_user_oauth2_credentials(
user_id=self.user_id,
provider_id=self.provider_id,
auth_type='kilo_oauth2',
credentials=credentials
)
import logging
logging.getLogger(__name__).info(f"KiloProviderHandler: Saved credentials to database for user {self.user_id}")
except Exception as e:
import logging
logging.getLogger(__name__).warning(f"KiloProviderHandler: Failed to save credentials to database: {e}")
async def _ensure_authenticated(self) -> str:
"""Ensure user is authenticated and return valid token."""
"""Ensure user is authenticated and return valid token.
If the token is expired, this will attempt to re-authenticate using
the device flow. The device code is automatically renewed when it
expires, allowing the flow to continue until the user completes
authorization or explicitly cancels.
"""
import logging
import asyncio
logger = logging.getLogger(__name__)
token = self.oauth2.get_valid_token()
......@@ -120,15 +152,58 @@ class KiloProviderHandler(BaseProviderHandler):
logger.info("KiloProviderHandler: Using API key authentication")
return self.api_key
logger.info("KiloProviderHandler: No valid token, initiating OAuth2 flow")
result = await self.oauth2.authenticate_with_device_flow()
logger.info("KiloProviderHandler: No valid OAuth2 token, initiating device flow")
if result.get("type") == "success":
token = result.get("token")
logger.info(f"KiloProviderHandler: OAuth2 authentication successful")
return token
# Start the non-blocking device flow
flow_info = await self.oauth2.initiate_device_flow()
# Poll for completion with auto-renewal of device code
# The device code expires in ~10 minutes, but we auto-renew it
# so the user has up to 1 hour to complete authorization
poll_interval = flow_info.get("poll_interval", 3.0)
max_duration_seconds = 3600 # 1 hour max
max_attempts = int(max_duration_seconds / poll_interval)
attempts = 0
logger.info(f"KiloProviderHandler: Waiting for device authorization...")
logger.info(f"KiloProviderHandler: Please visit {flow_info['verification_url']} and enter code: {flow_info['code']}")
logger.info(f"KiloProviderHandler: Device code will auto-renew when expired")
while attempts < max_attempts:
attempts += 1
await asyncio.sleep(poll_interval)
result = await self.oauth2.poll_device_flow_completion()
status = result.get("status")
if status == "approved":
token = result.get("token")
logger.info(f"KiloProviderHandler: OAuth2 authentication successful")
# For database users, also save credentials to the database
# This ensures the next request (which creates a new handler instance)
# can load the credentials from the database
if self.user_id is not None and self.oauth2.credentials:
self._save_oauth2_to_db(self.oauth2.credentials)
return token
elif status == "denied":
raise Exception(f"OAuth2 authentication denied: {result.get('error', 'Authorization denied')}")
elif status == "error":
raise Exception(f"OAuth2 authentication error: {result.get('error', 'Unknown error')}")
# status == "pending" - check if code was renewed
if result.get("code_renewed"):
new_code = result.get("new_code", "unknown")
logger.info(f"KiloProviderHandler: Device code renewed - new code: {new_code}")
# Log progress every 20 attempts (~1 minute)
if attempts % 20 == 0:
logger.debug(f"KiloProviderHandler: Still waiting for authorization... ({attempts} attempts)")
raise Exception("OAuth2 authentication failed")
raise Exception("OAuth2 authentication timeout: User did not complete authorization within 1 hour")
async def handle_request(self, model: str, messages: List[Dict], max_tokens: Optional[int] = None,
temperature: Optional[float] = 1.0, stream: Optional[bool] = False,
......
"""
Copyright (C) 2026 Stefy Lanza <stefy@nexlab.net>
AISBF - AI Service Broker Framework || AI Should Be Free
Qwen OAuth2 provider handler.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
Why did the programmer quit his job? Because he didn't get arrays!
"""
import httpx
import asyncio
import time
import json
import platform
from typing import Dict, List, Optional, Union
from openai import AsyncOpenAI
from ..models import Model
from ..config import config
from .base import BaseProviderHandler, AISBF_DEBUG
class QwenProviderHandler(BaseProviderHandler):
"""
Handler for Qwen OAuth2 integration using OpenAI-compatible API.
This handler uses OAuth2 authentication to access Qwen models through
the DashScope OpenAI-compatible endpoint. OAuth2 access tokens are passed
as the api_key parameter to the OpenAI SDK.
For admin users (user_id=None), credentials are loaded from file.
For non-admin users, credentials are loaded from the database.
"""
def __init__(self, provider_id: str, api_key: Optional[str] = None, user_id: Optional[int] = None):
super().__init__(provider_id, api_key)
self.user_id = user_id
self.provider_config = config.get_provider(provider_id)
# Get credentials file path from config
qwen_config = getattr(self.provider_config, 'qwen_config', None)
credentials_file = None
if qwen_config and isinstance(qwen_config, dict):
credentials_file = qwen_config.get('credentials_file')
# Only the ONE config admin (user_id=None from aisbf.json) uses file-based credentials
# All other users (including database admins with user_id) use database credentials
if user_id is not None:
self.auth = self._load_auth_from_db(provider_id, credentials_file)
else:
# Config admin (from aisbf.json): use file-based credentials
from ..auth.qwen import QwenOAuth2
self.auth = QwenOAuth2(credentials_file=credentials_file)
# HTTP client for direct API requests
self.client = httpx.AsyncClient(timeout=httpx.Timeout(300.0, connect=30.0))
# OpenAI SDK client (will be configured dynamically with OAuth token)
self._sdk_client = None
def _load_auth_from_db(self, provider_id: str, credentials_file: str):
"""
Load OAuth2 credentials from database for non-admin users.
Falls back to file-based credentials if not found in database.
"""
try:
from ..database import get_database
from ..auth.qwen import QwenOAuth2
db = get_database()
if db:
db_creds = db.get_user_oauth2_credentials(
user_id=self.user_id,
provider_id=provider_id,
auth_type='qwen_oauth2'
)
if db_creds and db_creds.get('credentials'):
# Create auth instance with database credentials
auth = QwenOAuth2(credentials_file=credentials_file)
# Override the loaded credentials with database credentials
auth.credentials = db_creds['credentials']
import logging
logging.getLogger(__name__).info(f"QwenProviderHandler: Loaded credentials from database for user {self.user_id}")
return auth
except Exception as e:
import logging
logging.getLogger(__name__).warning(f"QwenProviderHandler: Failed to load credentials from database: {e}")
# Fall back to file-based credentials
from ..auth.qwen import QwenOAuth2
import logging
logging.getLogger(__name__).info(f"QwenProviderHandler: Falling back to file-based credentials for user {self.user_id}")
return QwenOAuth2(credentials_file=credentials_file)
def _get_sdk_client(self):
"""Get or create an OpenAI SDK client configured with OAuth2 auth token."""
import logging
logger = logging.getLogger(__name__)
access_token = self.auth.get_valid_token()
if not access_token:
logger.error("QwenProviderHandler: No OAuth2 access token available")
raise Exception("No OAuth2 access token. Please re-authenticate")
# Get resource URL (API endpoint)
base_url = self.auth.get_resource_url()
# Normalize endpoint
if not base_url.startswith("http"):
base_url = f"https://{base_url}"
if not base_url.endswith("/v1"):
base_url = f"{base_url}/v1"
self._sdk_client = AsyncOpenAI(
api_key=access_token,
base_url=base_url,
max_retries=3,
timeout=httpx.Timeout(300.0, connect=30.0),
)
logger.info(f"QwenProviderHandler: Created SDK client with OAuth2 auth token (endpoint: {base_url})")
return self._sdk_client
def _get_auth_headers(self) -> Dict[str, str]:
"""Get HTTP headers with OAuth2 Bearer token and DashScope-specific headers."""
import logging
logger = logging.getLogger(__name__)
access_token = self.auth.get_valid_token()
if not access_token:
logger.error("QwenProviderHandler: No OAuth2 access token available")
raise Exception("No OAuth2 access token. Please re-authenticate")
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
"User-Agent": "QwenCode/1.0.0 (linux; x86_64)",
"X-DashScope-CacheControl": "enable",
"X-DashScope-UserAgent": "QwenCode/1.0.0 (linux; x86_64)",
"X-DashScope-AuthType": "qwen-oauth",
}
logger.debug("QwenProviderHandler: Created auth headers with OAuth2 token")
return headers
async def handle_request(self, model: str, messages: List[Dict], max_tokens: Optional[int] = None,
temperature: Optional[float] = 1.0, stream: Optional[bool] = False,
tools: Optional[List[Dict]] = None, tool_choice: Optional[Union[str, Dict]] = None) -> Union[Dict, object]:
"""Handle chat completion request using OpenAI-compatible API."""
import logging
logger = logging.getLogger(__name__)
if self.is_rate_limited():
raise Exception("Provider rate limited")
logger.info(f"QwenProviderHandler: Handling request for model {model}")
if AISBF_DEBUG:
logger.info(f"QwenProviderHandler: Messages: {messages}")
else:
logger.info(f"QwenProviderHandler: Messages count: {len(messages)}")
await self.apply_rate_limit()
# Get SDK client with current OAuth token
client = self._get_sdk_client()
# Build request parameters
request_params = {
"model": model,
"messages": messages,
"max_tokens": max_tokens or 4096,
"stream": stream,
}
if temperature is not None and temperature > 0:
request_params["temperature"] = temperature
if tools:
request_params["tools"] = tools
if tool_choice and tools:
request_params["tool_choice"] = tool_choice
try:
if stream:
logger.info("QwenProviderHandler: Using streaming mode")
return self._handle_streaming_request(client, request_params, model)
else:
logger.info("QwenProviderHandler: Using non-streaming mode")
response = await client.chat.completions.create(**request_params)
self.record_success()
# Convert to OpenAI format dict
openai_response = {
'id': response.id,
'object': 'chat.completion',
'created': response.created,
'model': f'{self.provider_id}/{model}',
'choices': [
{
'index': choice.index,
'message': {
'role': choice.message.role,
'content': choice.message.content,
},
'finish_reason': choice.finish_reason
}
for choice in response.choices
],
'usage': {
'prompt_tokens': response.usage.prompt_tokens if response.usage else 0,
'completion_tokens': response.usage.completion_tokens if response.usage else 0,
'total_tokens': response.usage.total_tokens if response.usage else 0,
}
}
# Add tool calls if present
for i, choice in enumerate(response.choices):
if choice.message.tool_calls:
openai_response['choices'][i]['message']['tool_calls'] = [
{
'id': tc.id,
'type': tc.type,
'function': {
'name': tc.function.name,
'arguments': tc.function.arguments
}
}
for tc in choice.message.tool_calls
]
if AISBF_DEBUG:
logger.info(f"QwenProviderHandler: Response: {json.dumps(openai_response, indent=2, default=str)}")
return openai_response
except Exception as e:
logger.error(f"QwenProviderHandler: Request failed: {e}", exc_info=True)
# Check if it's an auth error - try to refresh token
error_str = str(e).lower()
if any(keyword in error_str for keyword in ['401', '403', 'unauthorized', 'forbidden', 'invalid', 'token']):
logger.info("QwenProviderHandler: Auth error detected, attempting token refresh")
# Try to refresh token
refresh_success = await self.auth.refresh_tokens()
if refresh_success:
logger.info("QwenProviderHandler: Token refreshed, retrying request")
# Retry with new token
client = self._get_sdk_client()
if stream:
return self._handle_streaming_request(client, request_params, model)
else:
response = await client.chat.completions.create(**request_params)
self.record_success()
# Convert to dict (same as above)
openai_response = {
'id': response.id,
'object': 'chat.completion',
'created': response.created,
'model': f'{self.provider_id}/{model}',
'choices': [
{
'index': choice.index,
'message': {
'role': choice.message.role,
'content': choice.message.content,
},
'finish_reason': choice.finish_reason
}
for choice in response.choices
],
'usage': {
'prompt_tokens': response.usage.prompt_tokens if response.usage else 0,
'completion_tokens': response.usage.completion_tokens if response.usage else 0,
'total_tokens': response.usage.total_tokens if response.usage else 0,
}
}
for i, choice in enumerate(response.choices):
if choice.message.tool_calls:
openai_response['choices'][i]['message']['tool_calls'] = [
{
'id': tc.id,
'type': tc.type,
'function': {
'name': tc.function.name,
'arguments': tc.function.arguments
}
}
for tc in choice.message.tool_calls
]
return openai_response
else:
logger.error("QwenProviderHandler: Token refresh failed")
self.record_failure()
raise
async def _handle_streaming_request(self, client, request_params: Dict, model: str):
"""Handle streaming request using OpenAI SDK."""
import logging
logger = logging.getLogger(__name__)
logger.info("QwenProviderHandler: Starting streaming request")
try:
stream = await client.chat.completions.create(**request_params)
completion_id = f"qwen-{int(time.time())}"
created_time = int(time.time())
async for chunk in stream:
# Convert SDK chunk to OpenAI format
openai_chunk = {
'id': chunk.id or completion_id,
'object': 'chat.completion.chunk',
'created': chunk.created or created_time,
'model': f'{self.provider_id}/{model}',
'choices': []
}
for choice in chunk.choices:
choice_dict = {
'index': choice.index,
'delta': {},
'finish_reason': choice.finish_reason
}
if choice.delta.role:
choice_dict['delta']['role'] = choice.delta.role
if choice.delta.content:
choice_dict['delta']['content'] = choice.delta.content
if choice.delta.tool_calls:
choice_dict['delta']['tool_calls'] = [
{
'index': tc.index,
'id': tc.id,
'type': tc.type,
'function': {
'name': tc.function.name if tc.function else None,
'arguments': tc.function.arguments if tc.function else None
}
}
for tc in choice.delta.tool_calls
]
openai_chunk['choices'].append(choice_dict)
yield f"data: {json.dumps(openai_chunk, ensure_ascii=False)}\n\n".encode('utf-8')
# Send final [DONE] marker
yield b"data: [DONE]\n\n"
self.record_success()
logger.info("QwenProviderHandler: Streaming completed successfully")
except Exception as e:
logger.error(f"QwenProviderHandler: Streaming error: {e}", exc_info=True)
self.record_failure()
raise
async def get_models(self) -> List[Model]:
"""Return list of available Qwen models."""
import logging
logger = logging.getLogger(__name__)
logger.info("QwenProviderHandler: Fetching available models")
await self.apply_rate_limit()
try:
# Get SDK client with current OAuth token
client = self._get_sdk_client()
# List models using OpenAI SDK
models_response = await client.models.list()
models = []
for model_data in models_response.data:
model_id = model_data.id
# Extract context size if available
context_size = None
if hasattr(model_data, 'context_window'):
context_size = model_data.context_window
elif hasattr(model_data, 'max_model_len'):
context_size = model_data.max_model_len
models.append(Model(
id=model_id,
name=model_id,
provider_id=self.provider_id,
context_size=context_size,
context_length=context_size,
))
logger.debug(f"QwenProviderHandler: Found model: {model_id}")
if not models:
# Fallback to static model list
logger.warning("QwenProviderHandler: No models returned from API, using static list")
models = [
Model(id="qwen-plus", name="Qwen Plus", provider_id=self.provider_id, context_size=32000),
Model(id="qwen-turbo", name="Qwen Turbo", provider_id=self.provider_id, context_size=8000),
Model(id="qwen-max", name="Qwen Max", provider_id=self.provider_id, context_size=8000),
Model(id="coder-model", name="Qwen Coder", provider_id=self.provider_id, context_size=32000),
]
logger.info(f"QwenProviderHandler: Returning {len(models)} models")
return models
except Exception as e:
logger.error(f"QwenProviderHandler: Failed to fetch models: {e}", exc_info=True)
# Return static fallback list
logger.info("QwenProviderHandler: Using static fallback model list")
return [
Model(id="qwen-plus", name="Qwen Plus", provider_id=self.provider_id, context_size=32000),
Model(id="qwen-turbo", name="Qwen Turbo", provider_id=self.provider_id, context_size=8000),
Model(id="qwen-max", name="Qwen Max", provider_id=self.provider_id, context_size=8000),
Model(id="coder-model", name="Qwen Coder", provider_id=self.provider_id, context_size=32000),
]
......@@ -337,6 +337,59 @@
"credentials_file": "~/.aisbf/codex_credentials.json",
"issuer": "https://auth.openai.com"
}
},
"qwen": {
"id": "qwen",
"name": "Qwen (OAuth2)",
"endpoint": "https://dashscope.aliyuncs.com/compatible-mode/v1",
"type": "qwen",
"api_key_required": false,
"nsfw": false,
"privacy": false,
"rate_limit": 0,
"qwen_config": {
"_comment": "Uses OAuth2 Device Authorization Grant with PKCE (Qwen Code compatible)",
"credentials_file": "~/.aisbf/qwen_credentials.json",
"api_key": ""
},
"models": [
{
"name": "qwen-plus",
"nsfw": false,
"privacy": false,
"rate_limit": 0,
"max_request_tokens": 32000,
"context_size": 32000,
"capabilities": ["t2t", "function_calling"]
},
{
"name": "qwen-turbo",
"nsfw": false,
"privacy": false,
"rate_limit": 0,
"max_request_tokens": 8000,
"context_size": 8000,
"capabilities": ["t2t", "function_calling"]
},
{
"name": "qwen-max",
"nsfw": false,
"privacy": false,
"rate_limit": 0,
"max_request_tokens": 8000,
"context_size": 8000,
"capabilities": ["t2t", "function_calling"]
},
{
"name": "coder-model",
"nsfw": false,
"privacy": false,
"rate_limit": 0,
"max_request_tokens": 32000,
"context_size": 32000,
"capabilities": ["t2t", "function_calling"]
}
]
}
}
}
......@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "aisbf"
version = "0.9.8"
version = "0.99.0"
description = "AISBF - AI Service Broker Framework || AI Should Be Free - A modular proxy server for managing multiple AI provider integrations"
readme = "README.md"
license = "GPL-3.0-or-later"
......
# Qwen Code OAuth2 Flow Analysis
> **Purpose**: This document provides a comprehensive analysis of the Qwen Code OAuth2 initialization flow, token management, and API request patterns. It includes Python reimplementation examples for building an independent OAuth2 client.
---
## Table of Contents
1. [Overview](#overview)
2. [OAuth2 Constants and Configuration](#oauth2-constants-and-configuration)
3. [PKCE (Proof Key for Code Exchange)](#pkce-proof-key-for-code-exchange)
4. [Device Authorization Flow](#device-authorization-flow)
5. [Token Polling](#token-polling)
6. [Token Refresh](#token-refresh)
7. [Credential Storage](#credential-storage)
8. [SharedTokenManager - Cross-Process Token Synchronization](#sharedtokenmanager---cross-process-token-synchronization)
9. [Making API Requests with OAuth Tokens](#making-api-requests-with-oauth-tokens)
10. [Complete Python Implementation](#complete-python-implementation)
11. [Error Handling and Edge Cases](#error-handling-and-edge-cases)
---
## Overview
Qwen Code uses the **OAuth 2.0 Device Authorization Grant** flow ([RFC 8628](https://datatracker.ietf.org/doc/html/rfc8628)) combined with **PKCE** ([RFC 7636](https://datatracker.ietf.org/doc/html/rfc7636)) for secure authentication. This flow is designed for devices that cannot easily open a browser or handle redirects.
### Key Characteristics
- **Grant Type**: `urn:ietf:params:oauth:grant-type:device_code`
- **PKCE Method**: S256 (SHA-256 code challenge)
- **Token Endpoint**: `https://chat.qwen.ai/api/v1/oauth2/token`
- **Device Code Endpoint**: `https://chat.qwen.ai/api/v1/oauth2/device/code`
- **Client ID**: `f0304373b74a44d2b584a3fb70ca9e56`
- **Scopes**: `openid profile email model.completion`
### Flow Summary
1. Generate PKCE code verifier and challenge
2. Request a device code from the authorization server
3. Display the authorization URL to the user
4. Poll the token endpoint until the user approves
5. Store credentials to disk (`~/.qwen/oauth_creds.json`)
6. Use the access token for API requests
7. Refresh the token when it expires (30-second buffer before expiry)
---
## OAuth2 Constants and Configuration
These are the fixed constants used throughout the OAuth2 flow:
```typescript
// Source: packages/core/src/qwen/qwenOAuth2.ts
const QWEN_OAUTH_BASE_URL = 'https://chat.qwen.ai';
const QWEN_OAUTH_DEVICE_CODE_ENDPOINT = `${QWEN_OAUTH_BASE_URL}/api/v1/oauth2/device/code`;
const QWEN_OAUTH_TOKEN_ENDPOINT = `${QWEN_OAUTH_BASE_URL}/api/v1/oauth2/token`;
const QWEN_OAUTH_CLIENT_ID = 'f0304373b74a44d2b584a3fb70ca9e56';
const QWEN_OAUTH_SCOPE = 'openid profile email model.completion';
const QWEN_OAUTH_GRANT_TYPE = 'urn:ietf:params:oauth:grant-type:device_code';
```
### API Endpoints
| Endpoint | URL |
|----------|-----|
| Base URL | `https://chat.qwen.ai` |
| Device Code | `https://chat.qwen.ai/api/v1/oauth2/device/code` |
| Token | `https://chat.qwen.ai/api/v1/oauth2/token` |
### Default Model
- **Model ID**: `coder-model` (maps to `qwen3.6-plus` internally)
- **Default API Base URL**: `https://dashscope.aliyuncs.com/compatible-mode/v1`
---
## PKCE (Proof Key for Code Exchange)
PKCE adds an extra layer of security by generating a code verifier and its SHA-256 hash (code challenge).
### TypeScript Implementation
```typescript
// Source: packages/core/src/qwen/qwenOAuth2.ts (lines 47-77)
import crypto from 'crypto';
export function generateCodeVerifier(): string {
return crypto.randomBytes(32).toString('base64url');
}
export function generateCodeChallenge(codeVerifier: string): string {
const hash = crypto.createHash('sha256');
hash.update(codeVerifier);
return hash.digest('base64url');
}
export function generatePKCEPair(): {
code_verifier: string;
code_challenge: string;
} {
const codeVerifier = generateCodeVerifier();
const codeChallenge = generateCodeChallenge(codeVerifier);
return { code_verifier: codeVerifier, code_challenge: codeChallenge };
}
```
### Python Implementation
```python
import base64
import hashlib
import secrets
def generate_code_verifier() -> str:
"""Generate a random code verifier for PKCE (43-128 characters)."""
# 32 random bytes = 256 bits, base64url encoded = 43 characters
return base64.urlsafe_b64encode(secrets.token_bytes(32)).rstrip(b"=").decode("ascii")
def generate_code_challenge(code_verifier: str) -> str:
"""Generate a code challenge from a code verifier using SHA-256."""
sha256_hash = hashlib.sha256(code_verifier.encode("ascii")).digest()
return base64.urlsafe_b64encode(sha256_hash).rstrip(b"=").decode("ascii")
def generate_pkce_pair() -> tuple[str, str]:
"""Generate PKCE code verifier and challenge pair.
Returns:
Tuple of (code_verifier, code_challenge)
"""
code_verifier = generate_code_verifier()
code_challenge = generate_code_challenge(code_verifier)
return code_verifier, code_challenge
```
---
## Device Authorization Flow
The device authorization flow begins by requesting a device code from the server.
### Request
```http
POST /api/v1/oauth2/device/code HTTP/1.1
Host: chat.qwen.ai
Content-Type: application/x-www-form-urlencoded
Accept: application/json
x-request-id: <UUID>
client_id=f0304373b74a44d2b584a3fb70ca9e56&scope=openid+profile+email+model.completion&code_challenge=<CHALLENGE>&code_challenge_method=S256
```
### Response (Success)
```json
{
"device_code": "abc123...",
"user_code": "ABCD-1234",
"verification_uri": "https://chat.qwen.ai/api/v1/oauth2/device/verify",
"verification_uri_complete": "https://chat.qwen.ai/api/v1/oauth2/device/verify?user_code=ABCD-1234",
"expires_in": 600,
"interval": 5
}
```
### TypeScript Implementation
```typescript
// Source: packages/core/src/qwen/qwenOAuth2.ts (lines 291-332)
async requestDeviceAuthorization(options: {
scope: string;
code_challenge: string;
code_challenge_method: string;
}): Promise<DeviceAuthorizationResponse> {
const bodyData = {
client_id: QWEN_OAUTH_CLIENT_ID,
scope: options.scope,
code_challenge: options.code_challenge,
code_challenge_method: options.code_challenge_method,
};
const response = await fetch(QWEN_OAUTH_DEVICE_CODE_ENDPOINT, {
method: 'POST',
headers: {
'Content-Type': 'application/x-www-form-urlencoded',
Accept: 'application/json',
'x-request-id': randomUUID(),
},
body: objectToUrlEncoded(bodyData),
});
if (!response.ok) {
const errorData = await response.text();
throw new Error(
`Device authorization failed: ${response.status} ${response.statusText}. Response: ${errorData}`,
);
}
const result = (await response.json()) as DeviceAuthorizationResponse;
if (!isDeviceAuthorizationSuccess(result)) {
const errorData = result as ErrorData;
throw new Error(
`Device authorization failed: ${errorData?.error || 'Unknown error'} - ${errorData?.error_description || 'No details provided'}`,
);
}
return result;
}
```
### Python Implementation
```python
import uuid
import urllib.parse
import urllib.request
import json
from dataclasses import dataclass
@dataclass
class DeviceAuthorizationData:
device_code: str
user_code: str
verification_uri: str
verification_uri_complete: str
expires_in: int
interval: int = 5
@dataclass
class ErrorData:
error: str
error_description: str
def url_encode(data: dict[str, str]) -> str:
"""Convert a dictionary to URL-encoded form data."""
return urllib.parse.urlencode(data)
async def request_device_authorization(
code_challenge: str,
scope: str = "openid profile email model.completion",
client_id: str = "f0304373b74a44d2b584a3fb70ca9e56",
base_url: str = "https://chat.qwen.ai",
) -> DeviceAuthorizationData:
"""Request device authorization from the Qwen OAuth2 server.
Args:
code_challenge: PKCE code challenge (S256).
scope: OAuth2 scopes to request.
client_id: OAuth2 client ID.
base_url: Base URL of the OAuth2 server.
Returns:
DeviceAuthorizationData on success.
Raises:
Exception: If the request fails.
"""
endpoint = f"{base_url}/api/v1/oauth2/device/code"
body_data = {
"client_id": client_id,
"scope": scope,
"code_challenge": code_challenge,
"code_challenge_method": "S256",
}
request = urllib.request.Request(
endpoint,
data=url_encode(body_data).encode("utf-8"),
headers={
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json",
"x-request-id": str(uuid.uuid4()),
},
method="POST",
)
try:
with urllib.request.urlopen(request) as response:
result = json.loads(response.read().decode("utf-8"))
except urllib.error.HTTPError as e:
error_body = e.read().decode("utf-8")
raise Exception(
f"Device authorization failed: {e.code} {e.reason}. Response: {error_body}"
) from e
if "device_code" not in result:
error = result.get("error", "Unknown error")
description = result.get("error_description", "No details provided")
raise Exception(f"Device authorization failed: {error} - {description}")
return DeviceAuthorizationData(
device_code=result["device_code"],
user_code=result["user_code"],
verification_uri=result["verification_uri"],
verification_uri_complete=result["verification_uri_complete"],
expires_in=result["expires_in"],
interval=result.get("interval", 5),
)
```
---
## Token Polling
After obtaining the device code, the client polls the token endpoint until the user approves the authorization.
### Request
```http
POST /api/v1/oauth2/token HTTP/1.1
Host: chat.qwen.ai
Content-Type: application/x-www-form-urlencoded
Accept: application/json
grant_type=urn:ietf:params:oauth:grant-type:device_code&client_id=f0304373b74a44d2b584a3fb70ca9e56&device_code=<DEVICE_CODE>&code_verifier=<CODE_VERIFIER>
```
### Response (Success)
```json
{
"access_token": "eyJ...",
"refresh_token": "eyJ...",
"token_type": "Bearer",
"expires_in": 7200,
"scope": "openid profile email model.completion",
"resource_url": "https://dashscope.aliyuncs.com/compatible-mode/v1"
}
```
### Response (Pending - User has not approved yet)
```json
{
"error": "authorization_pending",
"error_description": "The user has not yet approved the authorization request"
}
```
### Response (Slow Down - Polling too frequently)
```json
{
"error": "slow_down",
"error_description": "The client is polling too quickly"
}
```
### TypeScript Implementation
```typescript
// Source: packages/core/src/qwen/qwenOAuth2.ts (lines 334-399)
async pollDeviceToken(options: {
device_code: string;
code_verifier: string;
}): Promise<DeviceTokenResponse> {
const bodyData = {
grant_type: QWEN_OAUTH_GRANT_TYPE,
client_id: QWEN_OAUTH_CLIENT_ID,
device_code: options.device_code,
code_verifier: options.code_verifier,
};
const response = await fetch(QWEN_OAUTH_TOKEN_ENDPOINT, {
method: 'POST',
headers: {
'Content-Type': 'application/x-www-form-urlencoded',
Accept: 'application/json',
},
body: objectToUrlEncoded(bodyData),
});
if (!response.ok) {
const responseText = await response.text();
let errorData: ErrorData | null = null;
try {
errorData = JSON.parse(responseText) as ErrorData;
} catch (_parseError) {
const error = new Error(
`Device token poll failed: ${response.status} ${response.statusText}. Response: ${responseText}`,
);
(error as Error & { status?: number }).status = response.status;
throw error;
}
// OAuth RFC 8628: authorization_pending means continue polling
if (response.status === 400 && errorData.error === 'authorization_pending') {
return { status: 'pending' } as DeviceTokenPendingData;
}
// OAuth RFC 8628: slow_down means increase polling interval
if (response.status === 429 && errorData.error === 'slow_down') {
return { status: 'pending', slowDown: true } as DeviceTokenPendingData;
}
// Other 400 errors are real errors
const error = new Error(
`Device token poll failed: ${errorData.error || 'Unknown error'} - ${errorData.error_description}`,
);
(error as Error & { status?: number }).status = response.status;
throw error;
}
return (await response.json()) as DeviceTokenResponse;
}
```
### Python Implementation
```python
import time
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class DeviceTokenData:
access_token: str
refresh_token: Optional[str]
token_type: str
expires_in: int
scope: Optional[str] = None
resource_url: Optional[str] = None
@dataclass
class DeviceTokenPendingData:
status: str = "pending"
slow_down: bool = False
async def poll_device_token(
device_code: str,
code_verifier: str,
client_id: str = "f0304373b74a44d2b584a3fb70ca9e56",
base_url: str = "https://chat.qwen.ai",
max_attempts: int = 300,
initial_interval: float = 2.0,
max_interval: float = 10.0,
) -> DeviceTokenData:
"""Poll the token endpoint until the user approves the authorization.
Args:
device_code: The device code from the authorization response.
code_verifier: The PKCE code verifier.
client_id: OAuth2 client ID.
base_url: Base URL of the OAuth2 server.
max_attempts: Maximum number of polling attempts.
initial_interval: Initial polling interval in seconds.
max_interval: Maximum polling interval in seconds.
Returns:
DeviceTokenData on success.
Raises:
Exception: If polling fails or times out.
"""
endpoint = f"{base_url}/api/v1/oauth2/token"
poll_interval = initial_interval
for attempt in range(max_attempts):
body_data = {
"grant_type": "urn:ietf:params:oauth:grant-type:device_code",
"client_id": client_id,
"device_code": device_code,
"code_verifier": code_verifier,
}
request = urllib.request.Request(
endpoint,
data=url_encode(body_data).encode("utf-8"),
headers={
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json",
},
method="POST",
)
try:
with urllib.request.urlopen(request) as response:
result = json.loads(response.read().decode("utf-8"))
except urllib.error.HTTPError as e:
error_body = e.read().decode("utf-8")
# Try to parse as JSON for standard OAuth errors
try:
error_data = json.loads(error_body)
except json.JSONDecodeError:
raise Exception(
f"Device token poll failed: {e.code} {e.reason}. Response: {error_body}"
) from e
# authorization_pending: continue polling
if e.code == 400 and error_data.get("error") == "authorization_pending":
print(f"Polling... (attempt {attempt + 1}/{max_attempts})")
time.sleep(poll_interval)
continue
# slow_down: increase polling interval
if e.code == 429 and error_data.get("error") == "slow_down":
poll_interval = min(poll_interval * 1.5, max_interval)
print(f"Server requested slow down, increasing interval to {poll_interval:.1f}s")
time.sleep(poll_interval)
continue
# Other errors
error = error_data.get("error", "Unknown error")
description = error_data.get("error_description", "No details provided")
raise Exception(f"Device token poll failed: {error} - {description}")
# Success - check if we got a token
if "access_token" in result and result["access_token"]:
return DeviceTokenData(
access_token=result["access_token"],
refresh_token=result.get("refresh_token"),
token_type=result.get("token_type", "Bearer"),
expires_in=result.get("expires_in", 7200),
scope=result.get("scope"),
resource_url=result.get("resource_url"),
)
# Unexpected response
raise Exception(f"Unexpected token response: {result}")
raise Exception("Authorization timeout: user did not approve the request in time")
```
---
## Token Refresh
When the access token expires, the client uses the refresh token to obtain a new one.
### Request
```http
POST /api/v1/oauth2/token HTTP/1.1
Host: chat.qwen.ai
Content-Type: application/x-www-form-urlencoded
Accept: application/json
grant_type=refresh_token&refresh_token=<REFRESH_TOKEN>&client_id=f0304373b74a44d2b584a3fb70ca9e56
```
### Response (Success)
```json
{
"access_token": "eyJ...",
"token_type": "Bearer",
"expires_in": 7200,
"refresh_token": "eyJ...",
"resource_url": "https://dashscope.aliyuncs.com/compatible-mode/v1"
}
```
### Response (Error - Refresh Token Expired)
```json
{
"error": "invalid_grant",
"error_description": "The refresh token is invalid or expired"
}
```
### TypeScript Implementation
```typescript
// Source: packages/core/src/qwen/qwenOAuth2.ts (lines 401-463)
async refreshAccessToken(): Promise<TokenRefreshResponse> {
if (!this.credentials.refresh_token) {
throw new Error('No refresh token available');
}
const bodyData = {
grant_type: 'refresh_token',
refresh_token: this.credentials.refresh_token,
client_id: QWEN_OAUTH_CLIENT_ID,
};
const response = await fetch(QWEN_OAUTH_TOKEN_ENDPOINT, {
method: 'POST',
headers: {
'Content-Type': 'application/x-www-form-urlencoded',
Accept: 'application/json',
},
body: objectToUrlEncoded(bodyData),
});
if (!response.ok) {
const errorData = await response.text();
// Handle 400 errors which might indicate refresh token expiry
if (response.status === 400) {
await clearQwenCredentials();
throw new CredentialsClearRequiredError(
"Refresh token expired or invalid. Please use '/auth' to re-authenticate.",
{ status: response.status, response: errorData },
);
}
throw new Error(
`Token refresh failed: ${response.status} ${response.statusText}. Response: ${errorData}`,
);
}
const responseData = (await response.json()) as TokenRefreshResponse;
if (isErrorResponse(responseData)) {
const errorData = responseData as ErrorData;
throw new Error(
`Token refresh failed: ${errorData?.error || 'Unknown error'} - ${errorData?.error_description || 'No details provided'}`,
);
}
const tokenData = responseData as TokenRefreshData;
const tokens: QwenCredentials = {
access_token: tokenData.access_token,
token_type: tokenData.token_type,
refresh_token: tokenData.refresh_token || this.credentials.refresh_token,
resource_url: tokenData.resource_url,
expiry_date: Date.now() + tokenData.expires_in * 1000,
};
this.setCredentials(tokens);
return responseData;
}
```
### Python Implementation
```python
@dataclass
class QwenCredentials:
access_token: Optional[str] = None
refresh_token: Optional[str] = None
id_token: Optional[str] = None
expiry_date: Optional[int] = None # Unix timestamp in milliseconds
token_type: Optional[str] = None
resource_url: Optional[str] = None
async def refresh_access_token(
refresh_token: str,
client_id: str = "f0304373b74a44d2b584a3fb70ca9e56",
base_url: str = "https://chat.qwen.ai",
) -> QwenCredentials:
"""Refresh an access token using a refresh token.
Args:
refresh_token: The refresh token to use.
client_id: OAuth2 client ID.
base_url: Base URL of the OAuth2 server.
Returns:
QwenCredentials with the refreshed tokens.
Raises:
Exception: If the refresh fails.
"""
endpoint = f"{base_url}/api/v1/oauth2/token"
body_data = {
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"client_id": client_id,
}
request = urllib.request.Request(
endpoint,
data=url_encode(body_data).encode("utf-8"),
headers={
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json",
},
method="POST",
)
try:
with urllib.request.urlopen(request) as response:
result = json.loads(response.read().decode("utf-8"))
except urllib.error.HTTPError as e:
error_body = e.read().decode("utf-8")
# 400 error typically means refresh token is expired/invalid
if e.code == 400:
raise Exception(
"Refresh token expired or invalid. Please re-authenticate."
) from e
raise Exception(
f"Token refresh failed: {e.code} {e.reason}. Response: {error_body}"
) from e
if "error" in result:
raise Exception(
f"Token refresh failed: {result['error']} - {result.get('error_description', 'No details provided')}"
)
return QwenCredentials(
access_token=result["access_token"],
token_type=result.get("token_type", "Bearer"),
refresh_token=result.get("refresh_token", refresh_token),
resource_url=result.get("resource_url"),
expiry_date=int(time.time() * 1000) + result.get("expires_in", 7200) * 1000,
)
```
---
## Credential Storage
Credentials are stored in a JSON file at `~/.qwen/oauth_creds.json`.
### File Format
```json
{
"access_token": "eyJhbGciOiJSUzI1NiIs...",
"refresh_token": "eyJhbGciOiJSUzI1NiIs...",
"token_type": "Bearer",
"expiry_date": 1712345678901,
"resource_url": "https://dashscope.aliyuncs.com/compatible-mode/v1"
}
```
### Python Implementation
```python
import os
import json
from pathlib import Path
CREDENTIAL_FILENAME = "oauth_creds.json"
QWEN_DIR = ".qwen"
def get_credential_path() -> Path:
"""Get the path to the credentials file."""
return Path.home() / QWEN_DIR / CREDENTIAL_FILENAME
def save_credentials(credentials: QwenCredentials) -> None:
"""Save credentials to the credentials file.
Args:
credentials: The credentials to save.
"""
file_path = get_credential_path()
file_path.parent.mkdir(parents=True, exist_ok=True, mode=0o700)
# Write to a temporary file first, then rename for atomicity
temp_path = file_path.with_suffix(".tmp")
with open(temp_path, "w", encoding="utf-8") as f:
json.dump(
{
"access_token": credentials.access_token,
"refresh_token": credentials.refresh_token,
"token_type": credentials.token_type,
"expiry_date": credentials.expiry_date,
"resource_url": credentials.resource_url,
},
f,
indent=2,
)
# Atomic rename
temp_path.rename(file_path)
# Set restrictive permissions
os.chmod(file_path, 0o600)
def load_credentials() -> Optional[QwenCredentials]:
"""Load credentials from the credentials file.
Returns:
QwenCredentials if valid credentials exist, None otherwise.
"""
file_path = get_credential_path()
if not file_path.exists():
return None
try:
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
# Validate required fields
required_fields = ["access_token", "refresh_token", "token_type", "expiry_date"]
for field in required_fields:
if field not in data or not data[field]:
return None
return QwenCredentials(
access_token=data["access_token"],
refresh_token=data["refresh_token"],
token_type=data["token_type"],
expiry_date=data["expiry_date"],
resource_url=data.get("resource_url"),
)
except (json.JSONDecodeError, KeyError, TypeError):
return None
def clear_credentials() -> None:
"""Delete the credentials file if it exists."""
file_path = get_credential_path()
if file_path.exists():
file_path.unlink()
```
---
## SharedTokenManager - Cross-Process Token Synchronization
The `SharedTokenManager` is a critical component that ensures OAuth tokens are synchronized across multiple processes. This prevents race conditions when multiple instances of Qwen Code are running simultaneously.
### Key Features
1. **Singleton Pattern**: Single instance per process
2. **Memory Cache**: In-memory cache with file modification time tracking
3. **File-Based Locking**: Distributed lock using `oauth_creds.lock` file
4. **Automatic Reload**: Detects when another process has refreshed the token
5. **Token Validation**: Checks expiry with a 30-second buffer
6. **Atomic File Operations**: Uses temp file + rename for safe writes
### Constants
```typescript
// Source: packages/core/src/qwen/sharedTokenManager.ts
const TOKEN_REFRESH_BUFFER_MS = 30 * 1000; // 30 seconds before expiry
const LOCK_TIMEOUT_MS = 10000; // 10 seconds lock timeout
const CACHE_CHECK_INTERVAL_MS = 5000; // 5 seconds between file checks
```
### Lock Acquisition
The lock file is created atomically using the `wx` flag (exclusive write). If the lock is older than 10 seconds, it's considered stale and removed.
### Python Implementation
```python
import fcntl
import time
import uuid
from pathlib import Path
from typing import Optional
LOCK_FILENAME = "oauth_creds.lock"
TOKEN_REFRESH_BUFFER_MS = 30_000 # 30 seconds
LOCK_TIMEOUT_MS = 10_000 # 10 seconds
CACHE_CHECK_INTERVAL_MS = 5_000 # 5 seconds
class SharedTokenManager:
"""Manages OAuth tokens across multiple processes using file-based caching and locking."""
_instance: Optional["SharedTokenManager"] = None
def __init__(self):
self._credentials: Optional[QwenCredentials] = None
self._file_mod_time: float = 0
self._last_check: float = 0
self._lock_path = Path.home() / QWEN_DIR / LOCK_FILENAME
@classmethod
def get_instance(cls) -> "SharedTokenManager":
"""Get the singleton instance."""
if cls._instance is None:
cls._instance = cls()
return cls._instance
def _get_credential_path(self) -> Path:
return Path.home() / QWEN_DIR / CREDENTIAL_FILENAME
def _is_token_valid(self, credentials: QwenCredentials) -> bool:
"""Check if the token is valid and not expired (with buffer)."""
if not credentials.expiry_date or not credentials.access_token:
return False
now_ms = int(time.time() * 1000)
return now_ms < credentials.expiry_date - TOKEN_REFRESH_BUFFER_MS
def _acquire_lock(self, max_attempts: int = 20, initial_interval: float = 0.1) -> bool:
"""Acquire a file lock to prevent concurrent token refreshes.
Returns:
True if lock acquired, False otherwise.
"""
lock_id = str(uuid.uuid4())
current_interval = initial_interval
for _ in range(max_attempts):
try:
# Try to create lock file atomically (exclusive mode)
with open(self._lock_path, "x") as f:
f.write(lock_id)
return True
except FileExistsError:
# Lock file exists, check if stale
try:
stat = self._lock_path.stat()
lock_age = time.time() - stat.st_mtime
if lock_age > LOCK_TIMEOUT_MS / 1000:
# Remove stale lock
self._lock_path.unlink()
continue
except (OSError, FileNotFoundError):
# Lock might have been removed by another process
continue
time.sleep(current_interval)
current_interval = min(current_interval * 1.5, 2.0) # Exponential backoff
return False
def _release_lock(self) -> None:
"""Release the file lock."""
try:
self._lock_path.unlink()
except FileNotFoundError:
pass # Lock already removed
def check_and_reload(self) -> None:
"""Check if the credentials file was updated by another process and reload."""
now = time.time()
# Limit check frequency
if now - self._last_check < CACHE_CHECK_INTERVAL_MS / 1000:
return
self._last_check = now
file_path = self._get_credential_path()
try:
stat = file_path.stat()
file_mod_time = stat.st_mtime
if file_mod_time > self._file_mod_time:
# File has been modified, reload
creds = load_credentials()
if creds:
self._credentials = creds
self._file_mod_time = file_mod_time
except FileNotFoundError:
self._file_mod_time = 0
def get_valid_credentials(
self,
refresh_func: Optional[callable] = None,
force_refresh: bool = False,
) -> QwenCredentials:
"""Get valid OAuth credentials, refreshing if necessary.
Args:
refresh_func: Function to call for refreshing tokens.
force_refresh: If True, refresh even if token is still valid.
Returns:
Valid QwenCredentials.
Raises:
Exception: If unable to obtain valid credentials.
"""
# Check if file was updated by another process
self.check_and_reload()
# Return cached credentials if valid
if not force_refresh and self._credentials and self._is_token_valid(self._credentials):
return self._credentials
# Check if we have a refresh token
if not self._credentials or not self._credentials.refresh_token:
raise Exception("No refresh token available")
# Acquire lock for refresh
if not self._acquire_lock():
raise Exception("Failed to acquire lock for token refresh")
try:
# Double-check after acquiring lock
self.check_and_reload()
if not force_refresh and self._credentials and self._is_token_valid(self._credentials):
return self._credentials
# Perform refresh
if refresh_func:
new_creds = refresh_func(self._credentials.refresh_token)
self._credentials = new_creds
save_credentials(new_creds)
return new_creds
raise Exception("No refresh function provided")
finally:
self._release_lock()
def clear_cache(self) -> None:
"""Clear all cached data."""
self._credentials = None
self._file_mod_time = 0
self._last_check = 0
```
---
## Making API Requests with OAuth Tokens
The `QwenContentGenerator` class handles API requests with automatic credential management and retry logic.
### Flow
1. Get valid credentials from `SharedTokenManager`
2. Set the access token as the `apiKey` on the OpenAI client
3. Set the `resource_url` as the `baseURL` (default: `https://dashscope.aliyuncs.com/compatible-mode/v1`)
4. Make the API request
5. If a 401/403 error occurs, force-refresh the token and retry
### TypeScript Implementation
```typescript
// Source: packages/core/src/qwen/qwenContentGenerator.ts (lines 87-150)
private async getValidToken(): Promise<{ token: string; endpoint: string }> {
const credentials = await this.sharedManager.getValidCredentials(this.qwenClient);
if (!credentials.access_token) {
throw new Error('No access token available');
}
return {
token: credentials.access_token,
endpoint: this.getCurrentEndpoint(credentials.resource_url),
};
}
private async executeWithCredentialManagement<T>(
operation: () => Promise<T>,
): Promise<T> {
const attemptOperation = async (): Promise<T> => {
const { token, endpoint } = await this.getValidToken();
// Apply dynamic configuration
this.pipeline.client.apiKey = token;
this.pipeline.client.baseURL = endpoint;
return await operation();
};
try {
return await attemptOperation();
} catch (error) {
if (this.isAuthError(error)) {
// Force refresh and retry
await this.sharedManager.getValidCredentials(this.qwenClient, true);
return await attemptOperation();
}
throw error;
}
}
override async generateContent(
request: GenerateContentParameters,
userPromptId: string,
): Promise<GenerateContentResponse> {
return this.executeWithCredentialManagement(() =>
super.generateContent(request, userPromptId),
);
}
```
### DashScope Provider Headers
```typescript
// Source: packages/core/src/core/openaiContentGenerator/provider/dashscope.ts (lines 40-54)
override buildHeaders(): Record<string, string | undefined> {
const version = this.cliConfig.getCliVersion() || 'unknown';
const userAgent = `QwenCode/${version} (${process.platform}; ${process.arch})`;
const { authType, customHeaders } = this.contentGeneratorConfig;
return {
'User-Agent': userAgent,
'X-DashScope-CacheControl': 'enable',
'X-DashScope-UserAgent': userAgent,
'X-DashScope-AuthType': authType, // 'qwen-oauth'
...customHeaders,
};
}
```
### Python Implementation
```python
import urllib.request
import json
from typing import Optional
DEFAULT_DASHSCOPE_BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"
class QwenAPIClient:
"""API client for making requests to the Qwen/DashScope API with OAuth tokens."""
def __init__(
self,
token_manager: SharedTokenManager,
refresh_func: callable,
model: str = "coder-model",
base_url: str = DEFAULT_DASHSCOPE_BASE_URL,
):
self._token_manager = token_manager
self._refresh_func = refresh_func
self._model = model
self._base_url = base_url
self._token: Optional[str] = None
self._endpoint: Optional[str] = None
def _get_valid_token_and_endpoint(self) -> tuple[str, str]:
"""Get a valid token and endpoint, refreshing if necessary."""
credentials = self._token_manager.get_valid_credentials(
refresh_func=self._refresh_func
)
if not credentials.access_token:
raise Exception("No access token available")
# Normalize endpoint
endpoint = credentials.resource_url or self._base_url
if not endpoint.startswith("http"):
endpoint = f"https://{endpoint}"
if not endpoint.endswith("/v1"):
endpoint = f"{endpoint}/v1"
return credentials.access_token, endpoint
def _is_auth_error(self, error: Exception) -> bool:
"""Check if an error is related to authentication."""
error_msg = str(error).lower()
return any(
keyword in error_msg
for keyword in [
"401",
"403",
"unauthorized",
"forbidden",
"invalid api key",
"invalid access token",
"token expired",
"authentication",
"access denied",
]
)
def _make_request(
self,
endpoint: str,
token: str,
payload: dict,
) -> dict:
"""Make an API request with the given token."""
url = f"{endpoint}/chat/completions"
request = urllib.request.Request(
url,
data=json.dumps(payload).encode("utf-8"),
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {token}",
"User-Agent": "QwenCode/1.0.0 (linux; x86_64)",
"X-DashScope-CacheControl": "enable",
"X-DashScope-AuthType": "qwen-oauth",
},
method="POST",
)
try:
with urllib.request.urlopen(request) as response:
return json.loads(response.read().decode("utf-8"))
except urllib.error.HTTPError as e:
error_body = e.read().decode("utf-8")
raise Exception(
f"API request failed: {e.code} {e.reason}. Response: {error_body}"
) from e
def generate_content(
self,
messages: list[dict],
max_retries: int = 1,
) -> dict:
"""Generate content using the Qwen API.
Args:
messages: List of message objects in OpenAI format.
max_retries: Number of retries on auth errors.
Returns:
API response as a dictionary.
"""
payload = {
"model": self._model,
"messages": messages,
}
for attempt in range(max_retries + 1):
try:
token, endpoint = self._get_valid_token_and_endpoint()
return self._make_request(endpoint, token, payload)
except Exception as e:
if self._is_auth_error(e) and attempt < max_retries:
# Force refresh and retry
self._token_manager.clear_cache()
self._token_manager.get_valid_credentials(
refresh_func=self._refresh_func,
force_refresh=True,
)
else:
raise
def generate_content_stream(
self,
messages: list[dict],
):
"""Generate content with streaming.
Args:
messages: List of message objects in OpenAI format.
Yields:
Streaming response chunks.
"""
payload = {
"model": self._model,
"messages": messages,
"stream": True,
}
token, endpoint = self._get_valid_token_and_endpoint()
url = f"{endpoint}/chat/completions"
request = urllib.request.Request(
url,
data=json.dumps(payload).encode("utf-8"),
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {token}",
"User-Agent": "QwenCode/1.0.0 (linux; x86_64)",
"X-DashScope-CacheControl": "enable",
"X-DashScope-AuthType": "qwen-oauth",
},
method="POST",
)
with urllib.request.urlopen(request) as response:
for line in response:
line = line.decode("utf-8").strip()
if line.startswith("data: "):
data = line[6:]
if data == "[DONE]":
break
yield json.loads(data)
```
---
## Complete Python Implementation
Here is a complete, self-contained Python implementation of the Qwen OAuth2 flow:
```python
"""
Qwen OAuth2 Client - Complete Implementation
This module implements the full OAuth2 Device Authorization Grant flow
for Qwen Code, including PKCE, token management, and API requests.
Usage:
from qwen_oauth import QwenOAuthClient
client = QwenOAuthClient()
client.authenticate() # Interactive browser flow
# Make API requests
response = client.generate_content([
{"role": "user", "content": "Hello, world!"}
])
print(response)
"""
import base64
import hashlib
import json
import os
import secrets
import time
import urllib.parse
import urllib.request
import uuid
from dataclasses import dataclass, field, asdict
from pathlib import Path
from typing import Optional, Generator
# ============================================================================
# Constants
# ============================================================================
QWEN_OAUTH_BASE_URL = "https://chat.qwen.ai"
QWEN_OAUTH_DEVICE_CODE_ENDPOINT = f"{QWEN_OAUTH_BASE_URL}/api/v1/oauth2/device/code"
QWEN_OAUTH_TOKEN_ENDPOINT = f"{QWEN_OAUTH_BASE_URL}/api/v1/oauth2/token"
QWEN_OAUTH_CLIENT_ID = "f0304373b74a44d2b584a3fb70ca9e56"
QWEN_OAUTH_SCOPE = "openid profile email model.completion"
QWEN_OAUTH_GRANT_TYPE = "urn:ietf:params:oauth:grant-type:device_code"
DEFAULT_DASHSCOPE_BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"
QWEN_DIR = ".qwen"
CREDENTIAL_FILENAME = "oauth_creds.json"
LOCK_FILENAME = "oauth_creds.lock"
TOKEN_REFRESH_BUFFER_MS = 30_000 # 30 seconds
LOCK_TIMEOUT_MS = 10_000 # 10 seconds
CACHE_CHECK_INTERVAL_MS = 5_000 # 5 seconds
# ============================================================================
# Data Classes
# ============================================================================
@dataclass
class QwenCredentials:
access_token: Optional[str] = None
refresh_token: Optional[str] = None
id_token: Optional[str] = None
expiry_date: Optional[int] = None
token_type: Optional[str] = None
resource_url: Optional[str] = None
@dataclass
class DeviceAuthorizationData:
device_code: str
user_code: str
verification_uri: str
verification_uri_complete: str
expires_in: int
interval: int = 5
@dataclass
class DeviceTokenData:
access_token: str
refresh_token: Optional[str]
token_type: str
expires_in: int
scope: Optional[str] = None
resource_url: Optional[str] = None
# ============================================================================
# PKCE
# ============================================================================
def generate_code_verifier() -> str:
"""Generate a random code verifier for PKCE."""
return base64.urlsafe_b64encode(secrets.token_bytes(32)).rstrip(b"=").decode("ascii")
def generate_code_challenge(code_verifier: str) -> str:
"""Generate a code challenge from a code verifier using SHA-256."""
sha256_hash = hashlib.sha256(code_verifier.encode("ascii")).digest()
return base64.urlsafe_b64encode(sha256_hash).rstrip(b"=").decode("ascii")
# ============================================================================
# Utility Functions
# ============================================================================
def url_encode(data: dict[str, str]) -> str:
"""Convert a dictionary to URL-encoded form data."""
return urllib.parse.urlencode(data)
def get_credential_path() -> Path:
"""Get the path to the credentials file."""
return Path.home() / QWEN_DIR / CREDENTIAL_FILENAME
def get_lock_path() -> Path:
"""Get the path to the lock file."""
return Path.home() / QWEN_DIR / LOCK_FILENAME
# ============================================================================
# Credential Storage
# ============================================================================
def save_credentials(credentials: QwenCredentials) -> None:
"""Save credentials to disk atomically."""
file_path = get_credential_path()
file_path.parent.mkdir(parents=True, exist_ok=True, mode=0o700)
temp_path = file_path.with_suffix(".tmp")
with open(temp_path, "w", encoding="utf-8") as f:
json.dump(
{k: v for k, v in asdict(credentials).items() if v is not None},
f,
indent=2,
)
temp_path.rename(file_path)
os.chmod(file_path, 0o600)
def load_credentials() -> Optional[QwenCredentials]:
"""Load credentials from disk."""
file_path = get_credential_path()
if not file_path.exists():
return None
try:
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
required = ["access_token", "refresh_token", "token_type", "expiry_date"]
if not all(data.get(field) for field in required):
return None
return QwenCredentials(**{k: v for k, v in data.items() if k in QwenCredentials.__dataclass_fields__})
except (json.JSONDecodeError, TypeError):
return None
def clear_credentials() -> None:
"""Delete the credentials file."""
file_path = get_credential_path()
if file_path.exists():
file_path.unlink()
# ============================================================================
# SharedTokenManager
# ============================================================================
class SharedTokenManager:
"""Manages OAuth tokens across processes using file-based caching and locking."""
_instance: Optional["SharedTokenManager"] = None
def __init__(self):
self._credentials: Optional[QwenCredentials] = None
self._file_mod_time: float = 0
self._last_check: float = 0
@classmethod
def get_instance(cls) -> "SharedTokenManager":
if cls._instance is None:
cls._instance = cls()
return cls._instance
def _is_token_valid(self, creds: QwenCredentials) -> bool:
if not creds.expiry_date or not creds.access_token:
return False
return int(time.time() * 1000) < creds.expiry_date - TOKEN_REFRESH_BUFFER_MS
def _acquire_lock(self, max_attempts: int = 20) -> bool:
lock_id = str(uuid.uuid4())
interval = 0.1
for _ in range(max_attempts):
try:
with open(get_lock_path(), "x") as f:
f.write(lock_id)
return True
except FileExistsError:
try:
stat = get_lock_path().stat()
if time.time() - stat.st_mtime > LOCK_TIMEOUT_MS / 1000:
get_lock_path().unlink()
continue
except (OSError, FileNotFoundError):
continue
time.sleep(interval)
interval = min(interval * 1.5, 2.0)
return False
def _release_lock(self) -> None:
try:
get_lock_path().unlink()
except FileNotFoundError:
pass
def check_and_reload(self) -> None:
"""Reload credentials if the file was modified by another process."""
now = time.time()
if now - self._last_check < CACHE_CHECK_INTERVAL_MS / 1000:
return
self._last_check = now
file_path = get_credential_path()
try:
stat = file_path.stat()
if stat.st_mtime > self._file_mod_time:
creds = load_credentials()
if creds:
self._credentials = creds
self._file_mod_time = stat.st_mtime
except FileNotFoundError:
self._file_mod_time = 0
def get_valid_credentials(
self,
refresh_func: callable,
force_refresh: bool = False,
) -> QwenCredentials:
"""Get valid credentials, refreshing if necessary."""
self.check_and_reload()
if not force_refresh and self._credentials and self._is_token_valid(self._credentials):
return self._credentials
if not self._credentials or not self._credentials.refresh_token:
raise Exception("No refresh token available")
if not self._acquire_lock():
raise Exception("Failed to acquire lock for token refresh")
try:
self.check_and_reload()
if not force_refresh and self._credentials and self._is_token_valid(self._credentials):
return self._credentials
new_creds = refresh_func(self._credentials.refresh_token)
self._credentials = new_creds
save_credentials(new_creds)
return new_creds
finally:
self._release_lock()
def clear_cache(self) -> None:
self._credentials = None
self._file_mod_time = 0
self._last_check = 0
# ============================================================================
# OAuth2 Client
# ============================================================================
class QwenOAuthClient:
"""Complete Qwen OAuth2 client implementation."""
def __init__(self):
self._token_manager = SharedTokenManager.get_instance()
self._credentials: Optional[QwenCredentials] = None
def _refresh_token(self, refresh_token: str) -> QwenCredentials:
"""Refresh an access token."""
body_data = {
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"client_id": QWEN_OAUTH_CLIENT_ID,
}
request = urllib.request.Request(
QWEN_OAUTH_TOKEN_ENDPOINT,
data=url_encode(body_data).encode("utf-8"),
headers={
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json",
},
method="POST",
)
try:
with urllib.request.urlopen(request) as response:
result = json.loads(response.read().decode("utf-8"))
except urllib.error.HTTPError as e:
error_body = e.read().decode("utf-8")
if e.code == 400:
clear_credentials()
raise Exception("Refresh token expired. Please re-authenticate.") from e
raise Exception(f"Token refresh failed: {e.code} {e.reason}") from e
if "error" in result:
raise Exception(f"Token refresh failed: {result['error']}")
return QwenCredentials(
access_token=result["access_token"],
token_type=result.get("token_type", "Bearer"),
refresh_token=result.get("refresh_token", refresh_token),
resource_url=result.get("resource_url"),
expiry_date=int(time.time() * 1000) + result.get("expires_in", 7200) * 1000,
)
def _request_device_authorization(self, code_challenge: str) -> DeviceAuthorizationData:
"""Request device authorization."""
body_data = {
"client_id": QWEN_OAUTH_CLIENT_ID,
"scope": QWEN_OAUTH_SCOPE,
"code_challenge": code_challenge,
"code_challenge_method": "S256",
}
request = urllib.request.Request(
QWEN_OAUTH_DEVICE_CODE_ENDPOINT,
data=url_encode(body_data).encode("utf-8"),
headers={
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json",
"x-request-id": str(uuid.uuid4()),
},
method="POST",
)
try:
with urllib.request.urlopen(request) as response:
result = json.loads(response.read().decode("utf-8"))
except urllib.error.HTTPError as e:
error_body = e.read().decode("utf-8")
raise Exception(f"Device authorization failed: {e.code} {e.reason}") from e
if "device_code" not in result:
raise Exception(f"Device authorization failed: {result.get('error', 'Unknown')}")
return DeviceAuthorizationData(
device_code=result["device_code"],
user_code=result["user_code"],
verification_uri=result["verification_uri"],
verification_uri_complete=result["verification_uri_complete"],
expires_in=result["expires_in"],
interval=result.get("interval", 5),
)
def _poll_device_token(self, device_code: str, code_verifier: str) -> DeviceTokenData:
"""Poll for the device token until approved."""
poll_interval = 2.0
max_attempts = 300
for attempt in range(max_attempts):
body_data = {
"grant_type": QWEN_OAUTH_GRANT_TYPE,
"client_id": QWEN_OAUTH_CLIENT_ID,
"device_code": device_code,
"code_verifier": code_verifier,
}
request = urllib.request.Request(
QWEN_OAUTH_TOKEN_ENDPOINT,
data=url_encode(body_data).encode("utf-8"),
headers={
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json",
},
method="POST",
)
try:
with urllib.request.urlopen(request) as response:
result = json.loads(response.read().decode("utf-8"))
except urllib.error.HTTPError as e:
error_body = e.read().decode("utf-8")
try:
error_data = json.loads(error_body)
except json.JSONDecodeError:
raise Exception(f"Token poll failed: {e.code} {e.reason}") from e
if e.code == 400 and error_data.get("error") == "authorization_pending":
time.sleep(poll_interval)
continue
if e.code == 429 and error_data.get("error") == "slow_down":
poll_interval = min(poll_interval * 1.5, 10.0)
time.sleep(poll_interval)
continue
raise Exception(f"Token poll failed: {error_data.get('error', 'Unknown')}") from e
if result.get("access_token"):
return DeviceTokenData(
access_token=result["access_token"],
refresh_token=result.get("refresh_token"),
token_type=result.get("token_type", "Bearer"),
expires_in=result.get("expires_in", 7200),
resource_url=result.get("resource_url"),
)
time.sleep(poll_interval)
raise Exception("Authorization timeout")
def authenticate(self) -> QwenCredentials:
"""Perform the full OAuth2 device authorization flow."""
# Try loading cached credentials first
cached = load_credentials()
if cached:
self._credentials = cached
self._token_manager._credentials = cached
self._token_manager._file_mod_time = get_credential_path().stat().st_mtime if get_credential_path().exists() else 0
if self._token_manager._is_token_valid(cached):
return cached
# PKCE
code_verifier = generate_code_verifier()
code_challenge = generate_code_challenge(code_verifier)
# Device authorization
device_auth = self._request_device_authorization(code_challenge)
# Display authorization URL
print(f"\n{'=' * 60}")
print(f"Qwen OAuth Device Authorization")
print(f"{'=' * 60}")
print(f"Please visit the following URL in your browser:")
print(f" {device_auth.verification_uri_complete}")
print(f"Waiting for authorization to complete...")
print(f"{'=' * 60}\n")
# Try to open browser
try:
import webbrowser
webbrowser.open(device_auth.verification_uri_complete)
except Exception:
pass
# Poll for token
token_data = self._poll_device_token(device_auth.device_code, code_verifier)
# Store credentials
self._credentials = QwenCredentials(
access_token=token_data.access_token,
refresh_token=token_data.refresh_token,
token_type=token_data.token_type,
resource_url=token_data.resource_url,
expiry_date=int(time.time() * 1000) + token_data.expires_in * 1000,
)
save_credentials(self._credentials)
self._token_manager.clear_cache()
print("Authentication successful!")
return self._credentials
def get_access_token(self) -> str:
"""Get a valid access token, refreshing if necessary."""
creds = self._token_manager.get_valid_credentials(
refresh_func=self._refresh_token
)
return creds.access_token
# ============================================================================
# API Client
# ============================================================================
class QwenAPIClient:
"""API client for making requests to the Qwen/DashScope API."""
def __init__(
self,
oauth_client: QwenOAuthClient,
model: str = "coder-model",
):
self._oauth_client = oauth_client
self._model = model
def _get_endpoint(self, resource_url: Optional[str] = None) -> str:
endpoint = resource_url or DEFAULT_DASHSCOPE_BASE_URL
if not endpoint.startswith("http"):
endpoint = f"https://{endpoint}"
if not endpoint.endswith("/v1"):
endpoint = f"{endpoint}/v1"
return endpoint
def _make_request(
self,
endpoint: str,
token: str,
payload: dict,
) -> dict:
"""Make an API request."""
url = f"{endpoint}/chat/completions"
request = urllib.request.Request(
url,
data=json.dumps(payload).encode("utf-8"),
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {token}",
"User-Agent": "QwenCode/1.0.0 (linux; x86_64)",
"X-DashScope-CacheControl": "enable",
"X-DashScope-AuthType": "qwen-oauth",
},
method="POST",
)
try:
with urllib.request.urlopen(request) as response:
return json.loads(response.read().decode("utf-8"))
except urllib.error.HTTPError as e:
error_body = e.read().decode("utf-8")
raise Exception(f"API request failed: {e.code} {e.reason}. Response: {error_body}") from e
def _is_auth_error(self, error: Exception) -> bool:
error_msg = str(error).lower()
return any(
kw in error_msg
for kw in ["401", "403", "unauthorized", "forbidden", "invalid", "token expired"]
)
def generate_content(
self,
messages: list[dict],
max_retries: int = 1,
) -> dict:
"""Generate content with automatic credential management and retry."""
payload = {
"model": self._model,
"messages": messages,
}
for attempt in range(max_retries + 1):
try:
token = self._oauth_client.get_access_token()
creds = SharedTokenManager.get_instance()._credentials
endpoint = self._get_endpoint(creds.resource_url if creds else None)
return self._make_request(endpoint, token, payload)
except Exception as e:
if self._is_auth_error(e) and attempt < max_retries:
SharedTokenManager.get_instance().clear_cache()
self._oauth_client.get_access_token()
else:
raise
def generate_content_stream(
self,
messages: list[dict],
) -> Generator[dict, None, None]:
"""Generate content with streaming."""
payload = {
"model": self._model,
"messages": messages,
"stream": True,
}
token = self._oauth_client.get_access_token()
creds = SharedTokenManager.get_instance()._credentials
endpoint = self._get_endpoint(creds.resource_url if creds else None)
url = f"{endpoint}/chat/completions"
request = urllib.request.Request(
url,
data=json.dumps(payload).encode("utf-8"),
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {token}",
"User-Agent": "QwenCode/1.0.0 (linux; x86_64)",
"X-DashScope-CacheControl": "enable",
"X-DashScope-AuthType": "qwen-oauth",
},
method="POST",
)
with urllib.request.urlopen(request) as response:
for line in response:
line = line.decode("utf-8").strip()
if line.startswith("data: "):
data = line[6:]
if data == "[DONE]":
break
yield json.loads(data)
# ============================================================================
# Example Usage
# ============================================================================
if __name__ == "__main__":
# Step 1: Authenticate
oauth_client = QwenOAuthClient()
credentials = oauth_client.authenticate()
print(f"Access Token: {credentials.access_token[:20]}...")
print(f"Token Type: {credentials.token_type}")
print(f"Resource URL: {credentials.resource_url}")
# Step 2: Make API requests
api_client = QwenAPIClient(oauth_client, model="coder-model")
response = api_client.generate_content([
{"role": "user", "content": "Hello! What can you do?"}
])
print("\nAPI Response:")
for choice in response.get("choices", []):
print(choice.get("message", {}).get("content", ""))
```
---
## Error Handling and Edge Cases
### 1. Credentials Clear Required Error
When a 400 error is returned during token refresh, it indicates the refresh token has expired. The client must clear all credentials and require re-authentication.
```python
class CredentialsClearRequiredError(Exception):
"""Thrown when a refresh token is expired or invalid."""
pass
# In refresh flow:
if response.status == 400:
clear_credentials()
raise CredentialsClearRequiredError(
"Refresh token expired or invalid. Please re-authenticate."
)
```
### 2. Quota Exceeded Errors
Qwen OAuth has a free tier quota of 1,000 requests/day and 60 requests/minute. When exceeded, the API returns a specific error message.
```python
def is_quota_exceeded_error(error: Exception) -> bool:
"""Check if an error is a quota exceeded error."""
error_msg = str(error).lower()
return any(
kw in error_msg
for kw in ["insufficient_quota", "free allocated quota exceeded", "quota exceeded"]
)
```
### 3. Rate Limiting (429)
The device token polling endpoint may return 429 with `slow_down` error. The client should increase the polling interval.
```python
if response.status == 429 and error_data.get("error") == "slow_down":
poll_interval = min(poll_interval * 1.5, 10.0) # Max 10 seconds
```
### 4. Cross-Process Token Synchronization
When multiple instances of the client run simultaneously, the `SharedTokenManager` ensures only one process refreshes the token at a time using file-based locking.
### 5. Token Expiry Buffer
Tokens are considered "expired" 30 seconds before their actual expiry time to prevent race conditions during API requests.
```python
TOKEN_REFRESH_BUFFER_MS = 30_000 # 30 seconds
def is_token_valid(credentials: QwenCredentials) -> bool:
if not credentials.expiry_date:
return False
return time.time() * 1000 < credentials.expiry_date - TOKEN_REFRESH_BUFFER_MS
```
---
## Summary
The Qwen Code OAuth2 flow consists of:
1. **PKCE-based Device Authorization Grant** - Secure authentication without browser redirects
2. **File-based Credential Storage** - Tokens stored in `~/.qwen/oauth_creds.json`
3. **SharedTokenManager** - Cross-process token synchronization with file locking
4. **Dynamic Token Injection** - Access tokens are injected into API requests at request time
5. **Automatic Retry** - 401/403 errors trigger a token refresh and retry
6. **DashScope API** - OpenAI-compatible API endpoint with custom headers
The complete Python implementation above provides a drop-in replacement for the TypeScript OAuth2 client, suitable for use in any Python-based application.
\ No newline at end of file
......@@ -49,7 +49,7 @@ class InstallCommand(_install):
setup(
name="aisbf",
version="0.9.8",
version="0.99.0",
author="AISBF Contributors",
author_email="stefy@nexlab.net",
description="AISBF - AI Service Broker Framework || AI Should Be Free - A modular proxy server for managing multiple AI provider integrations",
......@@ -123,6 +123,7 @@ setup(
'aisbf/providers/kilo.py',
'aisbf/providers/ollama.py',
'aisbf/providers/codex.py',
'aisbf/providers/qwen.py',
]),
# aisbf.providers.kiro subpackage
('share/aisbf/aisbf/providers/kiro', [
......@@ -141,6 +142,7 @@ setup(
'aisbf/auth/claude.py',
'aisbf/auth/kilo.py',
'aisbf/auth/codex.py',
'aisbf/auth/qwen.py',
]),
# Install dashboard templates
('share/aisbf/templates', [
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment