Implement JWT mapping tokens for shorter API tokens

- Add jwt_token column to user_api_tokens table
- Modify create_user_api_token to generate short tokens with JWT backend
- Update validate_user_api_token to lookup and validate JWTs
- Maintain JWT security while providing short, usable tokens
parent 5f00f0ae
...@@ -381,6 +381,10 @@ def init_db(conn) -> None: ...@@ -381,6 +381,10 @@ def init_db(conn) -> None:
cursor.execute('ALTER TABLE user_api_tokens ADD COLUMN last_used TIMESTAMP') cursor.execute('ALTER TABLE user_api_tokens ADD COLUMN last_used TIMESTAMP')
except sqlite3.OperationalError: except sqlite3.OperationalError:
pass # Column already exists pass # Column already exists
try:
cursor.execute('ALTER TABLE user_api_tokens ADD COLUMN jwt_token TEXT')
except sqlite3.OperationalError:
pass # Column already exists
# Insert default admin user if not exist # Insert default admin user if not exist
import hashlib import hashlib
...@@ -1104,55 +1108,72 @@ def create_user_api_token(user_id: int, name: str) -> str: ...@@ -1104,55 +1108,72 @@ def create_user_api_token(user_id: int, name: str) -> str:
# Use a simple secret key (in production, use environment variable) # Use a simple secret key (in production, use environment variable)
secret_key = os.environ.get('JWT_SECRET_KEY', 'vidai-jwt-secret-key-change-in-production') secret_key = os.environ.get('JWT_SECRET_KEY', 'vidai-jwt-secret-key-change-in-production')
token = jwt.encode(payload, secret_key, algorithm='HS256') jwt_token = jwt.encode(payload, secret_key, algorithm='HS256')
# Generate short public token
short_token = secrets.token_urlsafe(16)
conn = get_db_connection() conn = get_db_connection()
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute(''' cursor.execute('''
INSERT INTO user_api_tokens (user_id, name, token, created_at, active) INSERT INTO user_api_tokens (user_id, name, token, jwt_token, created_at, active)
VALUES (?, ?, ?, datetime('now'), 1) VALUES (?, ?, ?, ?, datetime('now'), 1)
''', (user_id, name, token)) ''', (user_id, name, short_token, jwt_token))
conn.commit() conn.commit()
conn.close() conn.close()
return token return short_token
def validate_user_api_token(token: str) -> Optional[Dict[str, Any]]: def validate_user_api_token(token: str) -> Optional[Dict[str, Any]]:
"""Validate user API token and return user info.""" """Validate user API token and return user info."""
import jwt import jwt
# Find JWT by short token
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute('''
SELECT jwt_token FROM user_api_tokens
WHERE token = ? AND active = 1
''', (token,))
row = cursor.fetchone()
if not row:
conn.close()
return None
jwt_token = row['jwt_token']
# Decode JWT token # Decode JWT token
try: try:
secret_key = os.environ.get('JWT_SECRET_KEY', 'vidai-jwt-secret-key-change-in-production') secret_key = os.environ.get('JWT_SECRET_KEY', 'vidai-jwt-secret-key-change-in-production')
payload = jwt.decode(token, secret_key, algorithms=['HS256']) payload = jwt.decode(jwt_token, secret_key, algorithms=['HS256'])
user_id = payload['user_id'] user_id = payload['user_id']
token_id = payload['token_id'] token_id = payload['token_id']
except jwt.ExpiredSignatureError: except jwt.ExpiredSignatureError:
conn.close()
return None return None
except jwt.InvalidTokenError: except jwt.InvalidTokenError:
conn.close()
return None return None
conn = get_db_connection()
cursor = conn.cursor()
# Update last used timestamp and get user info # Update last used timestamp and get user info
cursor.execute(''' cursor.execute('''
UPDATE user_api_tokens SET last_used = datetime('now') UPDATE user_api_tokens SET last_used = datetime('now')
WHERE user_id = ? AND token = ? AND active = 1 WHERE token = ? AND active = 1
''', (user_id, token)) ''', (token,))
cursor.execute(''' cursor.execute('''
SELECT u.* FROM user_api_tokens t SELECT u.* FROM user_api_tokens t
JOIN users u ON t.user_id = u.id JOIN users u ON t.user_id = u.id
WHERE t.user_id = ? AND t.token = ? AND t.active = 1 AND u.active = 1 WHERE t.token = ? AND t.active = 1 AND u.active = 1
''', (user_id, token)) ''', (token,))
row = cursor.fetchone() user_row = cursor.fetchone()
conn.commit() conn.commit()
conn.close() conn.close()
return dict(row) if row else None return dict(user_row) if user_row else None
def get_user_api_tokens(user_id: int) -> List[Dict[str, Any]]: def get_user_api_tokens(user_id: int) -> List[Dict[str, Any]]:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment