Fix session expiration check to use SQL instead of Python time parsing

- Modified get_persistent_session to use SQL datetime functions for expiration check
- This ensures consistent time handling between database and application
- Fixes potential issues with time parsing
parent 3d875f24
......@@ -1355,32 +1355,38 @@ def create_persistent_session(user_id: int) -> str:
def get_persistent_session(session_id: str) -> Optional[Dict[str, Any]]:
"""Get persistent session from database."""
import time
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute('''
SELECT s.session_id, s.user_id, s.created_at, s.last_activity, u.id, u.username, u.email, u.role, u.active
FROM sessions s
JOIN users u ON s.user_id = u.id
WHERE s.session_id = ? AND u.active = 1
''', (session_id,))
# Check if session exists and is not expired (1 hour), then update last_activity
config = get_db_config()
if config['type'] == 'mysql':
cursor.execute('''
SELECT s.session_id, s.user_id, u.id, u.username, u.email, u.role, u.active
FROM sessions s
JOIN users u ON s.user_id = u.id
WHERE s.session_id = ? AND u.active = 1
AND s.last_activity > DATE_SUB(NOW(), INTERVAL 1 HOUR)
''', (session_id,))
else:
cursor.execute('''
SELECT s.session_id, s.user_id, u.id, u.username, u.email, u.role, u.active
FROM sessions s
JOIN users u ON s.user_id = u.id
WHERE s.session_id = ? AND u.active = 1
AND s.last_activity > datetime('now', '-1 hour')
''', (session_id,))
row = cursor.fetchone()
if row:
# Update last activity
cursor.execute('UPDATE sessions SET last_activity = CURRENT_TIMESTAMP WHERE session_id = ?', (session_id,))
if config['type'] == 'mysql':
cursor.execute('UPDATE sessions SET last_activity = NOW() WHERE session_id = ?', (session_id,))
else:
cursor.execute('UPDATE sessions SET last_activity = CURRENT_TIMESTAMP WHERE session_id = ?', (session_id,))
conn.commit()
# Check if session is expired (1 hour)
import time
last_activity = time.mktime(time.strptime(str(row['last_activity']), '%Y-%m-%d %H:%M:%S'))
if time.time() - last_activity > 3600: # 1 hour
cursor.execute('DELETE FROM sessions WHERE session_id = ?', (session_id,))
conn.commit()
conn.close()
return None
user = {
'id': row['id'],
'username': row['username'],
......@@ -1390,6 +1396,10 @@ def get_persistent_session(session_id: str) -> Optional[Dict[str, Any]]:
}
conn.close()
return {'user': user}
# If session expired or not found, clean it up
cursor.execute('DELETE FROM sessions WHERE session_id = ?', (session_id,))
conn.commit()
conn.close()
return None
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment