Separate API endpoints into vidai/api.py

- Created vidai/api.py with Flask Blueprint for all /api/* and /admin/api/* routes
- Moved API authentication decorators and helper functions to api.py
- Removed API routes from web.py and registered the API blueprint
- Set server_dir in api.py from web.py arguments
parent ca94f4bb
# Video AI API Endpoints
# Copyright (C) 2024 Stefy Lanza <stefy@sexhack.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
API endpoints for Video AI.
Provides REST API access to analysis, training, and administrative functions.
"""
from flask import Blueprint, request, render_template, json
import os
import uuid
from .auth import login_user, logout_user, get_current_user, register_user, confirm_email, require_auth, require_admin
from .database import get_user_tokens, update_user_tokens, get_user_queue_items, get_default_user_tokens, create_remember_token, validate_remember_token, delete_remember_token, extend_remember_token
from .comm import SocketCommunicator, Message
api_bp = Blueprint('api', __name__)
# Global configuration
server_dir = None
# Communicator to backend (always TCP)
comm = SocketCommunicator(host='localhost', port=5001, comm_type='tcp')
def get_current_user_session():
"""Get current user from session."""
from flask import session
session_id = session.get('session_id')
if session_id:
return get_current_user(session_id)
return None
def api_auth_required(f):
"""Decorator to require authentication via session or API token."""
def decorated_function(*args, **kwargs):
# Check for session auth first
user = get_current_user_session()
if user:
request.api_user = user
return f(*args, **kwargs)
# Check for API token auth
auth_header = request.headers.get('Authorization')
if auth_header and auth_header.startswith('Bearer '):
token = auth_header.split(' ')[1]
from .database import validate_user_api_token
user = validate_user_api_token(token)
if user:
request.api_user = user
return f(*args, **kwargs)
# No valid auth
if request.is_json or request.path.startswith('/api/'):
return json.dumps({'error': 'Authentication required'}), 401
else:
from flask import redirect, url_for
return redirect(url_for('login'))
decorated_function.__name__ = f.__name__
return decorated_function
def admin_api_auth_required(f):
"""Decorator to require admin authentication via session or API token."""
def decorated_function(*args, **kwargs):
# Check for session auth first
user = get_current_user_session()
if user and user.get('role') == 'admin':
request.api_user = user
return f(*args, **kwargs)
# Check for API token auth
auth_header = request.headers.get('Authorization')
if auth_header and auth_header.startswith('Bearer '):
token = auth_header.split(' ')[1]
from .database import validate_user_api_token
user = validate_user_api_token(token)
if user and user.get('role') == 'admin':
request.api_user = user
return f(*args, **kwargs)
# No valid auth
if request.is_json or request.path.startswith('/api/'):
return json.dumps({'error': 'Admin authentication required'}), 401
else:
from flask import redirect, url_for, flash
flash('Admin access required', 'error')
return redirect(url_for('dashboard'))
decorated_function.__name__ = f.__name__
return decorated_function
def send_to_backend(msg_type: str, data: dict) -> str:
"""Send message to backend and return message id."""
msg_id = str(uuid.uuid4())
message = Message(msg_type, msg_id, data)
try:
comm.connect()
comm.send_message(message)
return msg_id
except Exception as e:
print(f"Failed to send message to backend: {e}")
return msg_id
def get_result(msg_id: str) -> dict:
"""Poll for result from backend via socket."""
import time
for _ in range(100): # Poll for 10 seconds
try:
# Send get_result request
result_msg = Message('get_result', str(uuid.uuid4()), {'request_id': msg_id})
comm.send_message(result_msg)
# Try to receive response
response = comm.receive_message()
if response and response.msg_type in ['analyze_response', 'train_response', 'config_response']:
return {
'msg_type': response.msg_type,
'msg_id': response.msg_id,
'data': response.data
}
elif response and response.msg_type == 'result_pending':
time.sleep(0.1) # Wait and try again
continue
except:
time.sleep(0.1)
return {'error': 'Timeout waiting for result'}
@api_bp.route('/api/stats')
@api_auth_required
def api_stats():
"""Get system stats for authenticated users."""
import psutil
import torch
import time
data = {'status': 'Idle'} # Simplified - in real implementation, get from backend
# GPU stats
if torch.cuda.is_available():
data['gpu_count'] = torch.cuda.device_count()
data['gpus'] = []
for i in range(torch.cuda.device_count()):
gpu = {
'name': torch.cuda.get_device_name(i),
'memory_used': torch.cuda.memory_allocated(i) / 1024**3, # GB
'memory_total': torch.cuda.get_device_properties(i).total_memory / 1024**3,
'utilization': 0 # Would need pynvml for actual utilization
}
data['gpus'].append(gpu)
else:
data['gpu_count'] = 0
# CPU and RAM
data['cpu_percent'] = psutil.cpu_percent()
ram = psutil.virtual_memory()
data['ram_used'] = ram.used / 1024**3
data['ram_total'] = ram.total / 1024**3
return json.dumps(data)
@api_bp.route('/api/analyze', methods=['POST'])
@api_auth_required
def api_analyze():
"""API endpoint for analysis using authentication."""
user = request.api_user
# Check token balance (skip for admin users)
if user.get('role') != 'admin':
tokens = get_user_tokens(user['id'])
if tokens <= 0:
return json.dumps({'error': 'Insufficient tokens'}), 402
# Process analysis request
model_path = request.json.get('model_path', 'Qwen/Qwen2.5-VL-7B-Instruct')
prompt = request.json.get('prompt', 'Describe this image.')
file_path = request.json.get('file_path')
interval = request.json.get('interval', 10)
if not file_path:
return json.dumps({'error': 'file_path is required'}), 400
# Send to backend for processing
data = {
'model_path': model_path,
'prompt': prompt,
'local_path': file_path,
'interval': interval,
'user_id': user['id']
}
msg_id = send_to_backend('analyze_request', data)
result_data = get_result(msg_id)
if 'data' in result_data:
result = result_data['data'].get('result', 'Analysis completed')
# Deduct tokens (skip for admin users)
if user.get('role') != 'admin':
update_user_tokens(user['id'], -10)
return json.dumps({
'result': result,
'tokens_used': 10,
'remaining_tokens': get_user_tokens(user['id'])
})
else:
error = result_data.get('error', 'Analysis failed')
return json.dumps({'error': error}), 500
@api_bp.route('/api/api_tokens')
@api_auth_required
def api_api_tokens():
"""API endpoint to list user's API tokens."""
user = request.api_user
from .database import get_user_api_tokens
user_tokens = get_user_api_tokens(user['id'])
# Return token data (without full token for security)
token_list = []
for t in user_tokens:
token_list.append({
'id': t['id'],
'name': t['name'],
'created_at': t['created_at'],
'last_used': t.get('last_used')
})
return json.dumps({'tokens': token_list})
@api_bp.route('/api')
@login_required
def api_docs():
"""API documentation page."""
user = get_current_user_session()
return render_template('api.html', user=user, active_page='api')
@api_bp.route('/admin/api/browse')
@admin_api_auth_required
def admin_api_browse():
"""Browse files in server directory (admin only)."""
path = request.args.get('path', '')
if not server_dir:
return json.dumps({'error': 'Server directory not configured'})
# Ensure path is within server_dir
full_path = os.path.join(server_dir, path)
full_path = os.path.abspath(full_path)
# Security check: ensure path is within server_dir
if not full_path.startswith(server_dir):
return json.dumps({'error': 'Access denied'})
if not os.path.exists(full_path):
return json.dumps({'error': 'Path not found'})
try:
items = []
if os.path.isdir(full_path):
for item in os.listdir(full_path):
item_path = os.path.join(full_path, item)
items.append({
'name': item,
'path': os.path.join(path, item) if path else item,
'is_dir': os.path.isdir(item_path),
'size': os.path.getsize(item_path) if os.path.isfile(item_path) else 0
})
else:
return json.dumps({'error': 'Not a directory'})
return json.dumps({
'current_path': path,
'items': items
})
except Exception as e:
return json.dumps({'error': str(e)})
@api_bp.route('/admin/api/train', methods=['POST'])
@admin_api_auth_required
def admin_api_train():
"""API endpoint for training (admin only)."""
user = request.api_user
# Check token balance (skip for admin users)
if user.get('role') != 'admin':
tokens = get_user_tokens(user['id'])
if tokens < 100:
return json.dumps({'error': 'Insufficient tokens. Training requires 100 tokens.'}), 402
# Process training request
output_model = request.json.get('output_model', 'MyCustomModel')
description = request.json.get('description', '')
train_path = request.json.get('train_path')
if not train_path or not os.path.exists(train_path):
return json.dumps({'error': 'Valid train_path is required'}), 400
# Send to backend for processing
data = {
'output_model': output_model,
'description': description,
'train_path': train_path,
'user_id': user['id']
}
msg_id = send_to_backend('train_request', data)
result_data = get_result(msg_id)
if 'data' in result_data:
message = result_data['data'].get('message', 'Training completed')
# Deduct tokens (skip for admin users)
if user.get('role') != 'admin':
update_user_tokens(user['id'], -100)
return json.dumps({
'message': message,
'tokens_used': 100,
'remaining_tokens': get_user_tokens(user['id'])
})
else:
error = result_data.get('error', 'Training failed')
return json.dumps({'error': error}), 500
@api_bp.route('/admin/api/users')
@admin_api_auth_required
def admin_api_users():
"""API endpoint to list all users (admin only)."""
from .database import get_all_users
users = get_all_users()
# Return user data without sensitive info like passwords
user_list = []
for u in users:
user_list.append({
'id': u['id'],
'username': u['username'],
'email': u['email'],
'role': u['role'],
'tokens': u['tokens'],
'active': u['active'],
'created_at': u['created_at']
})
return json.dumps({'users': user_list})
@api_bp.route('/admin/api/cluster_tokens')
@admin_api_auth_required
def admin_api_cluster_tokens():
"""API endpoint to list cluster tokens (admin only)."""
from .database import get_worker_tokens
worker_tokens = get_worker_tokens()
# Return token data
token_list = []
for t in worker_tokens:
token_list.append({
'id': t['id'],
'name': t['name'],
'token': t['token'], # Note: in real implementation, might not return full token
'active': t['active'],
'created_at': t['created_at']
})
return json.dumps({'tokens': token_list})
@api_bp.route('/admin/api')
@admin_required
def admin_api_docs():
"""Admin API documentation page."""
user = get_current_user_session()
return render_template('admin_api.html', user=user, active_page='admin_api')
\ No newline at end of file
...@@ -29,6 +29,7 @@ from .comm import SocketCommunicator, Message ...@@ -29,6 +29,7 @@ from .comm import SocketCommunicator, Message
from .config import get_all_settings, get_allow_registration from .config import get_all_settings, get_allow_registration
from .auth import login_user, logout_user, get_current_user, register_user, confirm_email, require_auth, require_admin from .auth import login_user, logout_user, get_current_user, register_user, confirm_email, require_auth, require_admin
from .database import get_user_tokens, update_user_tokens, get_user_queue_items, get_default_user_tokens, create_remember_token, validate_remember_token, delete_remember_token, extend_remember_token from .database import get_user_tokens, update_user_tokens, get_user_queue_items, get_default_user_tokens, create_remember_token, validate_remember_token, delete_remember_token, extend_remember_token
from .api import api_bp
app = Flask(__name__, template_folder='../templates') app = Flask(__name__, template_folder='../templates')
app.secret_key = os.environ.get('FLASK_SECRET_KEY', 'dev-secret-key-change-in-production') app.secret_key = os.environ.get('FLASK_SECRET_KEY', 'dev-secret-key-change-in-production')
...@@ -81,61 +82,6 @@ def admin_required(f): ...@@ -81,61 +82,6 @@ def admin_required(f):
decorated_function.__name__ = f.__name__ decorated_function.__name__ = f.__name__
return decorated_function return decorated_function
def api_auth_required(f):
"""Decorator to require authentication via session or API token."""
def decorated_function(*args, **kwargs):
# Check for session auth first
user = get_current_user_session()
if user:
request.api_user = user
return f(*args, **kwargs)
# Check for API token auth
auth_header = request.headers.get('Authorization')
if auth_header and auth_header.startswith('Bearer '):
token = auth_header.split(' ')[1]
from .database import validate_user_api_token
user = validate_user_api_token(token)
if user:
request.api_user = user
return f(*args, **kwargs)
# No valid auth
if request.is_json or request.path.startswith('/api/'):
return json.dumps({'error': 'Authentication required'}), 401
else:
return redirect(url_for('login'))
decorated_function.__name__ = f.__name__
return decorated_function
def admin_api_auth_required(f):
"""Decorator to require admin authentication via session or API token."""
def decorated_function(*args, **kwargs):
# Check for session auth first
user = get_current_user_session()
if user and user.get('role') == 'admin':
request.api_user = user
return f(*args, **kwargs)
# Check for API token auth
auth_header = request.headers.get('Authorization')
if auth_header and auth_header.startswith('Bearer '):
token = auth_header.split(' ')[1]
from .database import validate_user_api_token
user = validate_user_api_token(token)
if user and user.get('role') == 'admin':
request.api_user = user
return f(*args, **kwargs)
# No valid auth
if request.is_json or request.path.startswith('/api/'):
return json.dumps({'error': 'Admin authentication required'}), 401
else:
flash('Admin access required', 'error')
return redirect(url_for('dashboard'))
decorated_function.__name__ = f.__name__
return decorated_function
def send_to_backend(msg_type: str, data: dict) -> str: def send_to_backend(msg_type: str, data: dict) -> str:
"""Send message to backend and return message id.""" """Send message to backend and return message id."""
msg_id = str(uuid.uuid4()) msg_id = str(uuid.uuid4())
...@@ -759,198 +705,13 @@ def admin_delete_user(user_id): ...@@ -759,198 +705,13 @@ def admin_delete_user(user_id):
flash('Deletion not confirmed.', 'error') flash('Deletion not confirmed.', 'error')
return redirect(url_for('admin')) return redirect(url_for('admin'))
@app.route('/api/stats')
@api_auth_required
def api_stats():
"""Get system stats for authenticated users."""
import psutil
import torch
import time
data = {'status': 'Idle'} # Simplified - in real implementation, get from backend
# GPU stats
if torch.cuda.is_available():
data['gpu_count'] = torch.cuda.device_count()
data['gpus'] = []
for i in range(torch.cuda.device_count()):
gpu = {
'name': torch.cuda.get_device_name(i),
'memory_used': torch.cuda.memory_allocated(i) / 1024**3, # GB
'memory_total': torch.cuda.get_device_properties(i).total_memory / 1024**3,
'utilization': 0 # Would need pynvml for actual utilization
}
data['gpus'].append(gpu)
else:
data['gpu_count'] = 0
# CPU and RAM
data['cpu_percent'] = psutil.cpu_percent()
ram = psutil.virtual_memory()
data['ram_used'] = ram.used / 1024**3
data['ram_total'] = ram.total / 1024**3
return json.dumps(data)
@app.route('/api/analyze', methods=['POST'])
@api_auth_required
def api_analyze():
"""API endpoint for analysis using authentication."""
user = request.api_user
# Check token balance (skip for admin users)
if user.get('role') != 'admin':
tokens = get_user_tokens(user['id'])
if tokens <= 0:
return json.dumps({'error': 'Insufficient tokens'}), 402
# Process analysis request
model_path = request.json.get('model_path', 'Qwen/Qwen2.5-VL-7B-Instruct')
prompt = request.json.get('prompt', 'Describe this image.')
file_path = request.json.get('file_path')
interval = request.json.get('interval', 10)
if not file_path:
return json.dumps({'error': 'file_path is required'}), 400
# Send to backend for processing
data = {
'model_path': model_path,
'prompt': prompt,
'local_path': file_path,
'interval': interval,
'user_id': user['id']
}
msg_id = send_to_backend('analyze_request', data)
result_data = get_result(msg_id)
if 'data' in result_data:
result = result_data['data'].get('result', 'Analysis completed')
# Deduct tokens (skip for admin users)
if user.get('role') != 'admin':
update_user_tokens(user['id'], -10)
return json.dumps({
'result': result,
'tokens_used': 10,
'remaining_tokens': get_user_tokens(user['id'])
})
else:
error = result_data.get('error', 'Analysis failed')
return json.dumps({'error': error}), 500
@app.route('/admin/api/train', methods=['POST'])
@admin_api_auth_required
def admin_api_train():
"""API endpoint for training (admin only)."""
user = request.api_user
# Check token balance (skip for admin users)
if user.get('role') != 'admin':
tokens = get_user_tokens(user['id'])
if tokens < 100:
return json.dumps({'error': 'Insufficient tokens. Training requires 100 tokens.'}), 402
# Process training request
output_model = request.json.get('output_model', 'MyCustomModel')
description = request.json.get('description', '')
train_path = request.json.get('train_path')
if not train_path or not os.path.exists(train_path):
return json.dumps({'error': 'Valid train_path is required'}), 400
# Send to backend for processing
data = {
'output_model': output_model,
'description': description,
'train_path': train_path,
'user_id': user['id']
}
msg_id = send_to_backend('train_request', data)
result_data = get_result(msg_id)
if 'data' in result_data:
message = result_data['data'].get('message', 'Training completed')
# Deduct tokens (skip for admin users)
if user.get('role') != 'admin':
update_user_tokens(user['id'], -100)
return json.dumps({
'message': message,
'tokens_used': 100,
'remaining_tokens': get_user_tokens(user['id'])
})
else:
error = result_data.get('error', 'Training failed')
return json.dumps({'error': error}), 500
@app.route('/admin/api/users')
@admin_api_auth_required
def admin_api_users():
"""API endpoint to list all users (admin only)."""
from .database import get_all_users
users = get_all_users()
# Return user data without sensitive info like passwords
user_list = []
for u in users:
user_list.append({
'id': u['id'],
'username': u['username'],
'email': u['email'],
'role': u['role'],
'tokens': u['tokens'],
'active': u['active'],
'created_at': u['created_at']
})
return json.dumps({'users': user_list})
@app.route('/admin/api/cluster_tokens')
@admin_api_auth_required
def admin_api_cluster_tokens():
"""API endpoint to list cluster tokens (admin only)."""
from .database import get_worker_tokens
worker_tokens = get_worker_tokens()
# Return token data
token_list = []
for t in worker_tokens:
token_list.append({
'id': t['id'],
'name': t['name'],
'token': t['token'], # Note: in real implementation, might not return full token
'active': t['active'],
'created_at': t['created_at']
})
return json.dumps({'tokens': token_list})
@app.route('/api/api_tokens')
@api_auth_required
def api_api_tokens():
"""API endpoint to list user's API tokens."""
user = request.api_user
from .database import get_user_api_tokens
user_tokens = get_user_api_tokens(user['id'])
# Return token data (without full token for security)
token_list = []
for t in user_tokens:
token_list.append({
'id': t['id'],
'name': t['name'],
'created_at': t['created_at'],
'last_used': t.get('last_used')
})
return json.dumps({'tokens': token_list})
@app.route('/api')
@login_required
def api_docs():
"""API documentation page."""
user = get_current_user_session()
return render_template('api.html', user=user, active_page='api')
@app.route('/admin/api')
@admin_required
def admin_api_docs():
"""Admin API documentation page."""
user = get_current_user_session()
return render_template('admin_api.html', user=user, active_page='admin_api')
@app.route('/static/<path:filename>') @app.route('/static/<path:filename>')
def serve_static(filename): def serve_static(filename):
...@@ -960,46 +721,6 @@ def serve_static(filename): ...@@ -960,46 +721,6 @@ def serve_static(filename):
def serve_logo(): def serve_logo():
return send_from_directory('..', 'image.jpg') return send_from_directory('..', 'image.jpg')
@app.route('/admin/api/browse')
@admin_api_auth_required
def admin_api_browse():
"""Browse files in server directory (admin only)."""
path = request.args.get('path', '')
if not server_dir:
return json.dumps({'error': 'Server directory not configured'})
# Ensure path is within server_dir
full_path = os.path.join(server_dir, path)
full_path = os.path.abspath(full_path)
# Security check: ensure path is within server_dir
if not full_path.startswith(server_dir):
return json.dumps({'error': 'Access denied'})
if not os.path.exists(full_path):
return json.dumps({'error': 'Path not found'})
try:
items = []
if os.path.isdir(full_path):
for item in os.listdir(full_path):
item_path = os.path.join(full_path, item)
items.append({
'name': item,
'path': os.path.join(path, item) if path else item,
'is_dir': os.path.isdir(item_path),
'size': os.path.getsize(item_path) if os.path.isfile(item_path) else 0
})
else:
return json.dumps({'error': 'Not a directory'})
return json.dumps({
'current_path': path,
'items': items
})
except Exception as e:
return json.dumps({'error': str(e)})
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser(description='VidAI Web Interface') parser = argparse.ArgumentParser(description='VidAI Web Interface')
...@@ -1012,4 +733,11 @@ if __name__ == "__main__": ...@@ -1012,4 +733,11 @@ if __name__ == "__main__":
server_dir = os.path.abspath(server_dir) server_dir = os.path.abspath(server_dir)
print(f"Server directory set to: {server_dir}") print(f"Server directory set to: {server_dir}")
# Set server_dir in API module
import vidai.api as api_module
api_module.server_dir = server_dir
# Register API blueprint
app.register_blueprint(api_bp)
app.run(host='0.0.0.0', debug=True) app.run(host='0.0.0.0', debug=True)
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment