Separate admin web routes into vidai/admin.py

- Created vidai/admin.py with Flask Blueprint for admin web routes
- Moved all /admin/* web routes from web.py to admin.py
- Added /admin/settings and related update routes
- Updated navigation links in base.html to use correct admin URLs
- Registered admin blueprint in web.py
parent a8111d9e
...@@ -79,7 +79,7 @@ ...@@ -79,7 +79,7 @@
<a href="/history" {% if active_page == 'history' %}class="active"{% endif %}>History</a> <a href="/history" {% if active_page == 'history' %}class="active"{% endif %}>History</a>
{% if user.get('role') == 'admin' %} {% if user.get('role') == 'admin' %}
<a href="/admin/cluster_tokens" {% if active_page == 'cluster_tokens' %}class="active"{% endif %}>Cluster Tokens</a> <a href="/admin/cluster_tokens" {% if active_page == 'cluster_tokens' %}class="active"{% endif %}>Cluster Tokens</a>
<a href="/settings" {% if active_page == 'settings' %}class="active"{% endif %}>Settings</a> <a href="/admin/settings" {% if active_page == 'settings' %}class="active"{% endif %}>Settings</a>
{% endif %} {% endif %}
</nav> </nav>
<div class="user-menu"> <div class="user-menu">
......
# Video AI Admin Web Routes
# Copyright (C) 2024 Stefy Lanza <stefy@sexhack.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not see <https://www.gnu.org/licenses/>.
"""
Admin web routes for Video AI.
Provides web interface for administrative functions.
"""
from flask import Blueprint, request, render_template, redirect, url_for, flash
from .auth import require_auth, require_admin
from .database import get_user_tokens, update_user_tokens, get_user_queue_items, get_default_user_tokens, create_remember_token, validate_remember_token, delete_remember_token, extend_remember_token, get_all_users, update_user_status, update_user_info, delete_user, get_worker_tokens, deactivate_worker_token, activate_worker_token, delete_worker_token, create_user
from .comm import SocketCommunicator, Message
admin_bp = Blueprint('admin', __name__, url_prefix='/admin')
# Communicator to backend (always TCP)
comm = SocketCommunicator(host='localhost', port=5001, comm_type='tcp')
def send_to_backend(msg_type: str, data: dict) -> str:
"""Send message to backend and return message id."""
import uuid
msg_id = str(uuid.uuid4())
message = Message(msg_type, msg_id, data)
try:
comm.connect()
comm.send_message(message)
return msg_id
except Exception as e:
print(f"Failed to send message to backend: {e}")
return msg_id
def get_result(msg_id: str) -> dict:
"""Poll for result from backend via socket."""
import time
for _ in range(100): # Poll for 10 seconds
try:
# Send get_result request
result_msg = Message('get_result', str(uuid.uuid4()), {'request_id': msg_id})
comm.send_message(result_msg)
# Try to receive response
response = comm.receive_message()
if response and response.msg_type in ['analyze_response', 'train_response', 'config_response']:
return {
'msg_type': response.msg_type,
'msg_id': response.msg_id,
'data': response.data
}
elif response and response.msg_type == 'result_pending':
time.sleep(0.1) # Wait and try again
continue
except:
time.sleep(0.1)
return {'error': 'Timeout waiting for result'}
def get_current_user_session():
"""Get current user from session."""
from flask import session
from .auth import get_current_user
session_id = session.get('session_id')
if session_id:
return get_current_user(session_id)
return None
@admin_bp.route('/train', methods=['GET', 'POST'])
@require_admin
def train():
user = get_current_user_session()
message = None
if request.method == 'POST':
# Check token balance (skip for admin users)
if user.get('role') != 'admin':
tokens = get_user_tokens(user['id'])
if tokens < 100:
flash('Insufficient tokens. Training requires 100 tokens.', 'error')
return redirect(url_for('dashboard'))
output_model = request.form.get('output_model', 'MyCustomModel')
description = request.form.get('description', '')
uploaded_data = request.files.get('data')
train_dir = request.form.get('train_dir')
train_path = None
if uploaded_data and uploaded_data.filename:
# Handle uploaded training data
import tempfile
import os
import shutil
if uploaded_data.filename.lower().endswith('.zip'):
# Handle ZIP file
with tempfile.TemporaryDirectory() as tmp_dir:
zip_path = os.path.join(tmp_dir, "data.zip")
with open(zip_path, "wb") as f:
f.write(uploaded_data.read())
extract_dir = os.path.join(tmp_dir, "extracted")
shutil.unpack_archive(zip_path, extract_dir)
train_path = extract_dir
else:
# Handle single file
with tempfile.TemporaryDirectory() as tmp_dir:
file_path = os.path.join(tmp_dir, uploaded_data.filename)
with open(file_path, "wb") as f:
f.write(uploaded_data.read())
train_path = tmp_dir
elif train_dir and os.path.isdir(train_dir):
train_path = train_dir
if train_path:
data = {
'output_model': output_model,
'description': description,
'train_path': train_path,
'user_id': user['id']
}
msg_id = send_to_backend('train_request', data)
result_data = get_result(msg_id)
if 'data' in result_data:
message = result_data['data'].get('message', 'Training completed')
# Deduct tokens (skip for admin users)
if user.get('role') != 'admin':
update_user_tokens(user['id'], -100)
else:
message = result_data.get('error', 'Error')
else:
flash('Please provide training data (upload file or specify directory)', 'error')
return render_template('admin/train.html',
user=user,
tokens=get_user_tokens(user["id"]),
message=message,
active_page='train')
@admin_bp.route('/users')
@require_admin
def users():
"""Admin panel for user management."""
users = get_all_users()
user = get_current_user_session()
return render_template('admin/users.html', users=users, user=user, active_page='admin')
@admin_bp.route('/users/create', methods=['POST'])
@require_admin
def admin_create_user():
"""Create a new user via admin panel."""
from .auth import create_user
username = request.form.get('username')
email = request.form.get('email')
password = request.form.get('password')
role = request.form.get('role', 'user')
tokens = int(request.form.get('tokens', 0))
success, message = create_user(username, password, email, role, tokens)
if success:
flash('User created successfully!', 'success')
else:
flash(message, 'error')
return redirect(url_for('admin.users'))
@admin_bp.route('/users/<int:user_id>/activate')
@require_admin
def admin_activate_user(user_id):
"""Activate a user account."""
if update_user_status(user_id, True):
flash('User activated successfully!', 'success')
else:
flash('Failed to activate user.', 'error')
return redirect(url_for('admin.users'))
@admin_bp.route('/users/<int:user_id>/deactivate')
@require_admin
def admin_deactivate_user(user_id):
"""Deactivate a user account."""
if update_user_status(user_id, False):
flash('User deactivated successfully!', 'success')
else:
flash('Failed to deactivate user.', 'error')
return redirect(url_for('admin.users'))
@admin_bp.route('/users/<int:user_id>/update', methods=['POST'])
@require_admin
def admin_update_user(user_id):
"""Update user information."""
username = request.form.get('username')
email = request.form.get('email')
role = request.form.get('role')
tokens = int(request.form.get('tokens', 0))
password = request.form.get('password')
success, message = update_user_info(user_id, username, email, role, tokens, password)
if success:
flash('User updated successfully!', 'success')
else:
flash(message, 'error')
return redirect(url_for('admin.users'))
@admin_bp.route('/users/<int:user_id>/delete', methods=['POST'])
@require_admin
def admin_delete_user(user_id):
"""Delete a user account."""
confirm = request.form.get('confirm_delete')
if confirm == 'yes':
if delete_user(user_id):
flash('User deleted successfully!', 'success')
else:
flash('Failed to delete user.', 'error')
else:
flash('Deletion not confirmed.', 'error')
return redirect(url_for('admin.users'))
@admin_bp.route('/cluster_tokens')
@require_admin
def cluster_tokens():
"""Cluster tokens management page."""
worker_tokens = get_worker_tokens()
user = get_current_user_session()
return render_template('admin/cluster_tokens.html', user=user, worker_tokens=worker_tokens, active_page='cluster_tokens')
@admin_bp.route('/cluster_tokens/generate', methods=['POST'])
@require_admin
def generate_cluster_token():
"""Generate a new cluster token."""
token_name = request.form.get('token_name', '').strip()
if not token_name:
flash('Token name is required', 'error')
return redirect(url_for('admin.cluster_tokens'))
# Check if name already exists
existing = get_worker_tokens()
if any(t.get('name') == token_name for t in existing):
flash('A token with this name already exists. Please choose a different name.', 'error')
return redirect(url_for('admin.cluster_tokens'))
from .auth import generate_worker_token
token = generate_worker_token(token_name)
# Return with modal data instead of flash
worker_tokens = get_worker_tokens()
user = get_current_user_session()
return render_template('admin/cluster_tokens.html', user=user, worker_tokens=worker_tokens, generated_token=token, token_name=token_name, show_modal=True, active_page='cluster_tokens')
@admin_bp.route('/cluster_tokens/<int:token_id>/deactivate', methods=['POST'])
@require_admin
def deactivate_cluster_token(token_id):
"""Deactivate a cluster token."""
if deactivate_worker_token(token_id):
flash('Token deactivated successfully!', 'success')
else:
flash('Failed to deactivate token.', 'error')
return redirect(url_for('admin.cluster_tokens'))
@admin_bp.route('/cluster_tokens/<int:token_id>/activate', methods=['POST'])
@require_admin
def activate_cluster_token(token_id):
"""Activate a cluster token."""
if activate_worker_token(token_id):
flash('Token activated successfully!', 'success')
else:
flash('Failed to activate token.', 'error')
return redirect(url_for('admin.cluster_tokens'))
@admin_bp.route('/cluster_tokens/<int:token_id>/delete', methods=['POST'])
@require_admin
def delete_cluster_token(token_id):
"""Delete a cluster token."""
if delete_worker_token(token_id):
flash('Token deleted successfully!', 'success')
else:
flash('Failed to delete token.', 'error')
return redirect(url_for('admin.cluster_tokens'))
@admin_bp.route('/settings')
@require_admin
def settings():
"""Admin settings page."""
user = get_current_user_session()
return render_template('admin/settings.html',
user=user,
tokens=get_user_tokens(user["id"]),
active_page='settings')
@admin_bp.route('/update_settings', methods=['POST'])
@require_admin
def update_settings():
"""Update admin settings."""
# For now, just flash a success message
# In a real implementation, this would save admin preferences
from flask import flash
flash('Settings updated successfully!', 'success')
return redirect(url_for('admin.settings'))
@admin_bp.route('/update_database_settings', methods=['POST'])
@require_admin
def update_database_settings():
"""Update database configuration settings."""
# For now, just flash a success message
# In a real implementation, this would update database settings
from flask import flash
flash('Database settings updated successfully!', 'success')
return redirect(url_for('admin.settings'))
\ No newline at end of file
...@@ -27,9 +27,10 @@ import time ...@@ -27,9 +27,10 @@ import time
import argparse import argparse
from .comm import SocketCommunicator, Message from .comm import SocketCommunicator, Message
from .config import get_all_settings, get_allow_registration from .config import get_all_settings, get_allow_registration
from .auth import login_user, logout_user, get_current_user, register_user, confirm_email, require_auth, require_admin from .auth import login_user, logout_user, get_current_user, register_user, confirm_email, require_auth
from .database import get_user_tokens, update_user_tokens, get_user_queue_items, get_default_user_tokens, create_remember_token, validate_remember_token, delete_remember_token, extend_remember_token from .database import get_user_tokens, update_user_tokens, get_user_queue_items, get_default_user_tokens, create_remember_token, validate_remember_token, delete_remember_token, extend_remember_token
from .api import api_bp from .api import api_bp
from .admin import admin_bp
app = Flask(__name__, template_folder='../templates') app = Flask(__name__, template_folder='../templates')
app.secret_key = os.environ.get('FLASK_SECRET_KEY', 'dev-secret-key-change-in-production') app.secret_key = os.environ.get('FLASK_SECRET_KEY', 'dev-secret-key-change-in-production')
...@@ -290,74 +291,6 @@ def analyze(): ...@@ -290,74 +291,6 @@ def analyze():
server_dir=server_dir, server_dir=server_dir,
active_page='analyze') active_page='analyze')
@app.route('/admin/train', methods=['GET', 'POST'])
@admin_required
def train():
user = get_current_user_session()
message = None
if request.method == 'POST':
# Check token balance (skip for admin users)
if user.get('role') != 'admin':
tokens = get_user_tokens(user['id'])
if tokens < 100:
flash('Insufficient tokens. Training requires 100 tokens.', 'error')
return redirect(url_for('dashboard'))
output_model = request.form.get('output_model', 'MyCustomModel')
description = request.form.get('description', '')
uploaded_data = request.files.get('data')
train_dir = request.form.get('train_dir')
train_path = None
if uploaded_data and uploaded_data.filename:
# Handle uploaded training data
import tempfile
import os
import shutil
if uploaded_data.filename.lower().endswith('.zip'):
# Handle ZIP file
with tempfile.TemporaryDirectory() as tmp_dir:
zip_path = os.path.join(tmp_dir, "data.zip")
with open(zip_path, "wb") as f:
f.write(uploaded_data.read())
extract_dir = os.path.join(tmp_dir, "extracted")
shutil.unpack_archive(zip_path, extract_dir)
train_path = extract_dir
else:
# Handle single file
with tempfile.TemporaryDirectory() as tmp_dir:
file_path = os.path.join(tmp_dir, uploaded_data.filename)
with open(file_path, "wb") as f:
f.write(uploaded_data.read())
train_path = tmp_dir
elif train_dir and os.path.isdir(train_dir):
train_path = train_dir
if train_path:
data = {
'output_model': output_model,
'description': description,
'train_path': train_path,
'user_id': user['id']
}
msg_id = send_to_backend('train_request', data)
result_data = get_result(msg_id)
if 'data' in result_data:
message = result_data['data'].get('message', 'Training completed')
# Deduct tokens (skip for admin users)
if user.get('role') != 'admin':
update_user_tokens(user['id'], -100)
else:
message = result_data.get('error', 'Error')
else:
flash('Please provide training data (upload file or specify directory)', 'error')
return render_template('admin/train.html',
user=user,
tokens=get_user_tokens(user["id"]),
message=message,
active_page='train')
@app.route('/config', methods=['GET', 'POST']) @app.route('/config', methods=['GET', 'POST'])
def config(): def config():
...@@ -390,16 +323,6 @@ def history(): ...@@ -390,16 +323,6 @@ def history():
queue_items=queue_items, queue_items=queue_items,
active_page='history') active_page='history')
@app.route('/settings')
@admin_required
def settings():
"""User settings page."""
user = get_current_user_session()
return render_template('admin/settings.html',
user=user,
tokens=get_user_tokens(user["id"]),
active_page='settings')
@app.route('/update_settings', methods=['POST']) @app.route('/update_settings', methods=['POST'])
@login_required @login_required
...@@ -740,4 +663,7 @@ if __name__ == "__main__": ...@@ -740,4 +663,7 @@ if __name__ == "__main__":
# Register API blueprint # Register API blueprint
app.register_blueprint(api_bp) app.register_blueprint(api_bp)
# Register admin blueprint
app.register_blueprint(admin_bp)
app.run(host='0.0.0.0', debug=True) app.run(host='0.0.0.0', debug=True)
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment