feat(auth): add get_valid_token_with_refresh to KiloOAuth2

parent 507b6150
...@@ -479,6 +479,25 @@ class KiloOAuth2: ...@@ -479,6 +479,25 @@ class KiloOAuth2:
return self.credentials.get("access") return self.credentials.get("access")
async def get_valid_token_with_refresh(self) -> Optional[str]:
"""
Get a valid access token, attempting refresh if expired.
Note: Kilo uses long-lived tokens (1 year) with the same value for
access and refresh. There is no separate refresh endpoint - when tokens
expire, users must complete device flow again.
Returns:
Access token string or None if expired/not authenticated
"""
self._load_credentials()
if self.credentials and self.credentials.get('expires', 0) > time.time():
return self.credentials.get('access')
logger.error("KiloOAuth2: Token expired, re-authentication required")
return None
def is_authenticated(self) -> bool: def is_authenticated(self) -> bool:
"""Check if user is authenticated with valid token.""" """Check if user is authenticated with valid token."""
# get_valid_token() already handles credential reloading # get_valid_token() already handles credential reloading
......
import pytest
import time
import json
import tempfile
import os
from aisbf.auth.kilo import KiloOAuth2
@pytest.fixture
def temp_credentials_file():
"""Create a temporary credentials file."""
fd, path = tempfile.mkstemp(suffix='.json')
os.close(fd)
yield path
if os.path.exists(path):
os.remove(path)
@pytest.fixture
def valid_credentials():
"""Valid credentials with future expiry."""
return {
"type": "oauth",
"access": "valid_access_token",
"refresh": "valid_refresh_token",
"expires": int(time.time()) + 3600,
"userEmail": "test@example.com"
}
@pytest.fixture
def expired_credentials():
"""Expired credentials."""
return {
"type": "oauth",
"access": "expired_access_token",
"refresh": "expired_refresh_token",
"expires": int(time.time()) - 3600,
"userEmail": "test@example.com"
}
@pytest.mark.asyncio
async def test_get_valid_token_with_refresh_returns_valid_token(temp_credentials_file, valid_credentials):
"""Test that get_valid_token_with_refresh returns token when valid."""
with open(temp_credentials_file, 'w') as f:
json.dump(valid_credentials, f)
oauth2 = KiloOAuth2(credentials_file=temp_credentials_file)
token = await oauth2.get_valid_token_with_refresh()
assert token == "valid_access_token"
@pytest.mark.asyncio
async def test_get_valid_token_with_refresh_returns_none_when_expired(temp_credentials_file, expired_credentials):
"""Test that get_valid_token_with_refresh returns None when token expired."""
with open(temp_credentials_file, 'w') as f:
json.dump(expired_credentials, f)
oauth2 = KiloOAuth2(credentials_file=temp_credentials_file)
token = await oauth2.get_valid_token_with_refresh()
assert token is None
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment