fix: persist admin sessions for 30-day inactivity window

parent aac83493
......@@ -21,4 +21,5 @@ debug.log
# Test files
test_*.py
!tests/
!tests/test_admin_auth.py
!tests/test_whisper_server_local_models.py
......@@ -25,12 +25,25 @@ import threading
import time
from pathlib import Path
from typing import Any, Dict, Optional
from datetime import datetime, timedelta
from datetime import UTC, datetime, timedelta
SECRET_KEY_FILE = "secret_key"
def utc_now() -> datetime:
"""Return the current timezone-aware UTC datetime."""
return datetime.now(UTC)
def parse_session_timestamp(timestamp: str) -> datetime:
"""Parse persisted session timestamps, normalizing legacy naive values to UTC."""
parsed = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
if parsed.tzinfo is None:
return parsed.replace(tzinfo=UTC)
return parsed.astimezone(UTC)
def get_or_create_secret(config_dir: Path) -> bytes:
"""Get or create a secret key for session signing."""
secret_path = config_dir / SECRET_KEY_FILE
......@@ -107,10 +120,10 @@ def verify_password(password: str, password_hash: str) -> bool:
class SessionManager:
"""Manages user sessions."""
def __init__(self, config_dir: Path, session_timeout_minutes: int = 120):
def __init__(self, config_dir: Path, session_timeout_days: int = 30):
self.config_dir = config_dir
self.secret = get_or_create_secret(config_dir)
self.session_timeout = timedelta(minutes=session_timeout_minutes)
self.session_timeout = timedelta(days=session_timeout_days)
self._lock = threading.Lock()
def _load_auth_data(self) -> Dict[str, Any]:
......@@ -150,7 +163,7 @@ class SessionManager:
Session ID cookie value
"""
session_id = secrets.token_urlsafe(32)
expires_at = datetime.utcnow() + self.session_timeout
expires_at = utc_now() + self.session_timeout
auth_data = self._load_auth_data()
......@@ -158,7 +171,7 @@ class SessionManager:
sessions = auth_data.get("sessions", {})
sessions[session_id] = {
"username": username,
"created_at": datetime.utcnow().isoformat(),
"created_at": utc_now().isoformat(),
"expires_at": expires_at.isoformat()
}
auth_data["sessions"] = sessions
......@@ -198,8 +211,8 @@ class SessionManager:
return None
# Check expiration
expires_at = datetime.fromisoformat(session["expires_at"].replace('Z', '+00:00'))
if datetime.utcnow() > expires_at:
expires_at = parse_session_timestamp(session["expires_at"])
if utc_now() > expires_at:
# Clean up expired session
del sessions[session_id]
auth_data["sessions"] = sessions
......@@ -207,7 +220,7 @@ class SessionManager:
return None
# Extend session (sliding expiration)
new_expires = datetime.utcnow() + self.session_timeout
new_expires = utc_now() + self.session_timeout
session["expires_at"] = new_expires.isoformat()
auth_data["sessions"] = sessions
self._save_auth_data(auth_data)
......@@ -283,7 +296,7 @@ class SessionManager:
if user["username"] == username:
user["password_hash"] = hash_password(new_password)
user["must_change_password"] = False
user["last_changed_at"] = datetime.utcnow().isoformat()
user["last_changed_at"] = utc_now().isoformat()
self._save_auth_data(auth_data)
return True
......@@ -336,7 +349,7 @@ class SessionManager:
"username": username,
"password_hash": hash_password(password),
"role": role,
"created_at": datetime.utcnow().isoformat(),
"created_at": utc_now().isoformat(),
"must_change_password": False
}
......@@ -374,4 +387,4 @@ class SessionManager:
}
self._save_auth_data(auth_data)
return True
\ No newline at end of file
return True
......@@ -150,7 +150,7 @@ async def login(
httponly=True,
secure=False, # Set to True if using HTTPS
samesite="strict",
max_age=7200 # 2 hours
max_age=30 * 24 * 60 * 60
)
return response
......
from datetime import UTC, datetime, timedelta
import importlib.util
import json
import sys
AUTH_MODULE_PATH = "/storage/coderai/codai/admin/auth.py"
spec = importlib.util.spec_from_file_location("test_admin_auth_module", AUTH_MODULE_PATH)
auth_module = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = auth_module
spec.loader.exec_module(auth_module)
SessionManager = auth_module.SessionManager
def _write_auth_file(config_dir, auth_data):
(config_dir / "auth.json").write_text(json.dumps(auth_data))
def _read_auth_file(config_dir):
return json.loads((config_dir / "auth.json").read_text())
def test_session_manager_defaults_to_thirty_day_sliding_timeout(tmp_path):
manager = SessionManager(tmp_path)
assert manager.session_timeout == timedelta(days=30)
def test_create_session_stores_timezone_aware_utc_timestamps(tmp_path):
manager = SessionManager(tmp_path)
cookie_value = manager.create_session("admin")
session_id = cookie_value.rsplit('.', 1)[0]
session = _read_auth_file(tmp_path)["sessions"][session_id]
created_at = datetime.fromisoformat(session["created_at"])
expires_at = datetime.fromisoformat(session["expires_at"])
assert created_at.tzinfo == UTC
assert expires_at.tzinfo == UTC
def test_validate_session_refreshes_expiration_by_thirty_days(tmp_path):
manager = SessionManager(tmp_path)
cookie_value = manager.create_session("admin")
session_id = cookie_value.rsplit('.', 1)[0]
before_validation = _read_auth_file(tmp_path)["sessions"][session_id]["expires_at"]
before_expiry = datetime.fromisoformat(before_validation)
username = manager.validate_session(cookie_value)
after_validation = _read_auth_file(tmp_path)["sessions"][session_id]["expires_at"]
after_expiry = datetime.fromisoformat(after_validation)
assert username == "admin"
assert after_expiry > before_expiry
assert after_expiry - datetime.now(UTC) > timedelta(days=29, hours=23)
def test_validate_session_rejects_and_cleans_up_expired_session(tmp_path):
manager = SessionManager(tmp_path)
cookie_value = manager.create_session("admin")
session_id = cookie_value.rsplit('.', 1)[0]
auth_data = _read_auth_file(tmp_path)
auth_data["sessions"][session_id]["expires_at"] = (datetime.now(UTC) - timedelta(seconds=1)).isoformat()
_write_auth_file(tmp_path, auth_data)
username = manager.validate_session(cookie_value)
stored_auth_data = _read_auth_file(tmp_path)
assert username is None
assert session_id not in stored_auth_data["sessions"]
def test_validate_session_accepts_legacy_naive_expiration_timestamp(tmp_path):
manager = SessionManager(tmp_path)
cookie_value = manager.create_session("admin")
session_id = cookie_value.rsplit('.', 1)[0]
auth_data = _read_auth_file(tmp_path)
auth_data["sessions"][session_id]["expires_at"] = (
datetime.now(UTC).replace(tzinfo=None) + timedelta(days=7)
).isoformat()
_write_auth_file(tmp_path, auth_data)
username = manager.validate_session(cookie_value)
stored_session = _read_auth_file(tmp_path)["sessions"][session_id]
refreshed_expiry = datetime.fromisoformat(stored_session["expires_at"])
assert username == "admin"
assert refreshed_expiry.tzinfo == UTC
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment