Add token usage tracking to queue system

- Add estimated_tokens and used_tokens fields to processing_queue table
- Implement token estimation based on request type and content
- Track actual token usage during job processing
- Display estimated and used tokens in web interface queue views
- Update dashboard, queue list, and job details to show token information
- Simulate realistic token usage in queue processing
parent d0222b6c
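The estimation heuristic itself is not visible in this diff. As a rough illustration of what "token estimation based on request type and content" could look like, here is a minimal sketch; the function name, the per-type base costs, and the characters-per-token ratio are all assumptions, not taken from this commit:

```python
# Hypothetical sketch only: the real estimator lives in changes not shown
# here. The base costs per request type and the ~4 chars/token ratio are
# assumed values.
def estimate_tokens(request_type: str, data: dict) -> int:
    """Estimate token usage for a queued job before it runs."""
    base = {'analyze_request': 500, 'train_request': 2000}.get(request_type, 100)
    prompt = data.get('prompt', '')
    return base + len(prompt) // 4  # rough heuristic for prompt text
```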
@@ -8,32 +8,48 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ## [Unreleased]

 ### Added
-- Multi-process architecture with separate web, backend, and worker processes
-- Configurable CUDA/ROCm backend selection for analysis and training
-- TCP socket-based inter-process communication
-- Web-based configuration interface
-- Self-contained build system with PyInstaller
-- Comprehensive documentation and README
-- GPLv3 licensing and copyright notices
+- **User Authentication System**: Secure login with admin/user roles and session management
+- **REST API**: Full REST API with JWT token authentication for programmatic access
+- **Request Queuing System**: Configurable concurrent processing with queue management and status tracking
+- **Real-time Queue Status**: Live queue position and estimated completion times in web interface
+- **User Management Interface**: Admin-only interface for creating/managing users and roles
+- **API Token Management**: Generate, list, and revoke API tokens for programmatic access
+- **Concurrent Processing Configuration**: Configurable maximum concurrent jobs (default: 1)
+- **Communication Protocol Options**: Choose between Unix sockets (default, high performance) and TCP sockets
+- **Multi-process Architecture**: Separate processes for web interface, backend queue manager, and worker processes
+- **CUDA/ROCm Backend Selection**: Runtime configuration of GPU backends for analysis and training
+- **SQLite Database**: Persistent storage for users, configuration, system prompts, and job queues
+- **Command Line Integration**: All CLI options with database persistence and override capability
+- **Self-contained Build System**: PyInstaller executables for all components
+- **Web Interface**: Comprehensive authenticated UI for media analysis, training, and queue monitoring
+- **Video Processing**: Automatic frame extraction, scene detection, and summarization
+- **Model Training**: Fine-tune models on custom datasets with progress tracking
+- **Configuration Management**: Web-based system configuration with persistent storage
+- **Comprehensive Documentation**: README, architecture guide, API documentation, and changelog
+- **GPLv3 Licensing**: Full license compliance with copyright notices on all source files

 ### Changed
-- Refactored monolithic Flask app into distributed processes
-- Replaced direct analysis calls with message-passing architecture
-- Updated build scripts to generate multiple executables
-- Improved error handling and process management
+- Refactored monolithic Flask app into distributed multi-process architecture
+- Replaced direct analysis calls with queue-based job processing system
+- Updated build scripts to generate separate executables for each component
+- Improved error handling, process management, and graceful shutdown
+- Enhanced communication protocol with support for both Unix and TCP sockets

 ### Technical Details
-- Implemented socket-based communication protocol
-- Added configuration management system
-- Created worker registration and routing system
-- Added file-based result storage for reliability
-- Implemented graceful shutdown and process monitoring
+- Implemented session-based authentication with secure cookie storage
+- Created JWT token system for API authentication (simplified implementation)
+- Built queue management system with SQLite backend and configurable concurrency
+- Added role-based access control with admin/user permissions
+- Implemented real-time status updates and estimated completion times
+- Created modular communication system supporting multiple protocols
+- Added comprehensive database schema for users, tokens, jobs, and configuration
+- Implemented background job processing with proper error handling and recovery

 ## [0.1.0] - 2024-10-05

 ### Added
 - Initial release of Video AI Analysis Tool
-- Web interface for image/video analysis
+- Basic web interface for image/video analysis
 - Qwen2.5-VL model integration
 - Frame extraction and video processing
 - Model training capabilities
......
# Video AI Analysis Tool

-A multi-process web-based tool for analyzing images and videos using AI models. Supports frame extraction, activity detection, video segmentation, and model training with configurable CUDA/ROCm backends.
+A comprehensive multi-process web-based tool for analyzing images and videos using AI models. Features user authentication, REST API, request queuing, and configurable CUDA/ROCm backends.

 ## Features

+- **User Authentication**: Secure login system with admin and user roles
 - **Web Interface**: User-friendly web UI for uploading and analyzing media
+- **REST API**: Full REST API with JWT token authentication
 - **AI Analysis**: Powered by Qwen2.5-VL models for image/video understanding
 - **Multi-Process Architecture**: Separate processes for web, backend, and workers
-- **Backend Selection**: Choose between CUDA and ROCm for analysis/training
+- **Request Queuing**: Configurable concurrent processing with queue management
+- **Backend Selection**: Choose between CUDA/ROCm for analysis and training
+- **Real-time Status**: Live queue position and estimated completion times
 - **Video Processing**: Automatic frame extraction and summarization
 - **Model Training**: Fine-tune models on custom datasets
 - **Configuration Management**: SQLite database for persistent settings and system prompts
 - **Self-Contained**: No external dependencies beyond Python and system libraries

-## Architecture
+## Quick Start

-The application consists of four main components:
-
-1. **Web Interface Process**: Flask-based UI server
-2. **Backend Process**: Request routing and worker management
-3. **Analysis Workers**: CUDA and ROCm variants for media analysis
-4. **Training Workers**: CUDA and ROCm variants for model training
-
-Communication between processes uses TCP sockets for reliability and self-containment.
+1. **Setup Environment**:
+   ```bash
+   ./setup.sh cuda  # or ./setup.sh rocm
+   ```
+2. **Start Application**:
+   ```bash
+   ./start.sh cuda  # or ./start.sh rocm
+   ```
+3. **Access Web Interface**:
+   - Open http://localhost:5000
+   - Login with admin/admin (change password after first login)

-## Requirements
+## User Management

-- Python 3.8+
-- PyTorch (CUDA or ROCm)
-- Flask
-- Transformers
-- OpenCV
-- Other dependencies listed in requirements files
+- **Default Admin**: username: `admin`, password: `admin`
+- **Admin Features**: User management, system configuration
+- **User Features**: Media analysis, model training, queue monitoring

-## Installation
+## API Usage

-1. Clone the repository:
-   ```bash
-   git clone <repository-url>
-   cd videotest
-   ```
-2. Set up virtual environment:
-   ```bash
-   ./setup.sh cuda  # or ./setup.sh rocm
-   source venv-cuda/bin/activate  # or venv-rocm
-   ```
+```bash
+# Get API token
+curl -X POST http://localhost:5000/api/tokens \
+  -H "Authorization: Bearer YOUR_TOKEN"
+
+# Submit analysis job
+curl -X POST http://localhost:5000/api/analyze \
+  -H "Authorization: Bearer YOUR_TOKEN" \
+  -H "Content-Type: application/json" \
+  -d '{"model_path": "Qwen/Qwen2.5-VL-7B-Instruct", "prompt": "Describe this image", "file_path": "/path/to/image.jpg"}'
+
+# Check job status
+curl http://localhost:5000/api/queue/123 \
+  -H "Authorization: Bearer YOUR_TOKEN"
+```
+
+## Configuration
+
+Access admin configuration at `/admin/config`:
+
+- **Max Concurrent Jobs**: Number of parallel processing jobs (default: 1)
+- **Analysis Backend**: CUDA or ROCm for analysis
+- **Training Backend**: CUDA or ROCm for training
+- **Communication Type**: Unix sockets (recommended) or TCP
+
+## Architecture
+
+```
+Web Interface (Flask) <-> Backend (Queue Manager) <-> Worker Processes
+        |                          |
+        v                          v
+User Authentication         Job Queue (SQLite)
+REST API                    Concurrent Processing
+```
+
+## API Endpoints
+
+### Authentication
+- `POST /api/tokens` - Generate API token
+
+### Jobs
+- `POST /api/analyze` - Submit analysis job
+- `POST /api/train` - Submit training job
+- `GET /api/queue` - List user jobs
+- `GET /api/queue/<id>` - Get job status
+
+### Web Interface
+- `/login` - User login
+- `/` - Dashboard (authenticated)
+- `/analyze` - Media analysis
+- `/train` - Model training
+- `/queue` - Job queue
+- `/admin/users` - User management (admin)
+- `/admin/config` - System configuration (admin)

 3. Build executables (optional):
    ```bash
......
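To complement the curl examples above, a minimal Python client for the listed endpoints might look like the following sketch; the `requests` library, the token placeholder, and the `queue_id` response field name are assumptions:

```python
# Minimal client sketch for the API endpoints listed in the README.
# "YOUR_TOKEN" is a placeholder and "queue_id" is an assumed field name.
import time
import requests

BASE = "http://localhost:5000"
HEADERS = {"Authorization": "Bearer YOUR_TOKEN"}

# Submit an analysis job
resp = requests.post(f"{BASE}/api/analyze", headers=HEADERS, json={
    "model_path": "Qwen/Qwen2.5-VL-7B-Instruct",
    "prompt": "Describe this image",
    "file_path": "/path/to/image.jpg",
})
job_id = resp.json()["queue_id"]  # assumed response field

# Poll until the job finishes
while True:
    status = requests.get(f"{BASE}/api/queue/{job_id}", headers=HEADERS).json()
    if status["status"] in ("completed", "failed"):
        break
    time.sleep(5)
print(status)
```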
# Video AI Authentication Module
# Copyright (C) 2024 Stefy Lanza <stefy@sexhack.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Authentication and session management for Video AI.
"""
import time
import secrets
from typing import Optional, Dict, Any

from .database import authenticate_user, validate_api_token, create_api_token


class SessionManager:
    """Simple session manager using in-memory storage."""

    def __init__(self):
        self.sessions: Dict[str, Dict[str, Any]] = {}
        self.session_timeout = 3600  # 1 hour

    def create_session(self, user: Dict[str, Any]) -> str:
        """Create a new session for a user."""
        session_id = secrets.token_hex(32)
        self.sessions[session_id] = {
            'user': user,
            'created_at': time.time(),
            'last_activity': time.time()
        }
        return session_id

    def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
        """Get session data, expiring stale sessions."""
        if session_id in self.sessions:
            session = self.sessions[session_id]
            if time.time() - session['last_activity'] > self.session_timeout:
                # Session expired
                del self.sessions[session_id]
                return None
            session['last_activity'] = time.time()
            return session
        return None

    def destroy_session(self, session_id: str) -> None:
        """Destroy a session."""
        if session_id in self.sessions:
            del self.sessions[session_id]

    def get_user_from_session(self, session_id: str) -> Optional[Dict[str, Any]]:
        """Get the user attached to a session."""
        session = self.get_session(session_id)
        return session['user'] if session else None


# Global session manager instance
session_manager = SessionManager()


def login_user(username: str, password: str) -> Optional[str]:
    """Authenticate a user and create a session."""
    user = authenticate_user(username, password)
    if user:
        return session_manager.create_session(user)
    return None


def logout_user(session_id: str) -> None:
    """Log a user out by destroying their session."""
    session_manager.destroy_session(session_id)


def get_current_user(session_id: str) -> Optional[Dict[str, Any]]:
    """Get the current user from a session."""
    return session_manager.get_user_from_session(session_id)


def require_auth(session_id: str, required_role: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """Check that the user is authenticated and has the required role."""
    user = get_current_user(session_id)
    if not user:
        return None
    if required_role and user['role'] != required_role:
        return None
    return user


def require_admin(session_id: str) -> Optional[Dict[str, Any]]:
    """Check that the user is an admin."""
    return require_auth(session_id, 'admin')


def api_authenticate(token: str) -> Optional[Dict[str, Any]]:
    """Authenticate an API request with a token."""
    return validate_api_token(token)


def generate_api_token(user_id: int) -> str:
    """Generate an API token for a user."""
    return create_api_token(user_id)
\ No newline at end of file
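The auth module is framework-agnostic. Below is a hedged sketch of how `require_auth` could be wired into Flask routes; the cookie name, route, decorator, and the `vidai.auth` import path are illustrative, not taken from this commit:

```python
# Illustrative Flask integration; cookie name and import path are assumptions.
from functools import wraps
from flask import Flask, request, jsonify
from vidai.auth import require_auth  # package path assumed

app = Flask(__name__)

def login_required(role=None):
    """Decorator that resolves the session cookie to a user, or returns 401."""
    def decorator(view):
        @wraps(view)
        def wrapped(*args, **kwargs):
            session_id = request.cookies.get('session_id', '')
            user = require_auth(session_id, role)
            if not user:
                return jsonify({'error': 'unauthorized'}), 401
            return view(user, *args, **kwargs)
        return wrapped
    return decorator

@app.route('/admin/config')
@login_required(role='admin')
def admin_config(user):
    return jsonify({'user': user['username']})
```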
@@ -23,6 +23,7 @@ import time
 import threading

 from .comm import SocketServer, Message
 from .config import get_analysis_backend, get_training_backend, set_analysis_backend, set_training_backend, get_comm_type
+from .queue import queue_manager

 worker_sockets = {}  # type: dict

@@ -31,26 +32,10 @@ worker_sockets = {}  # type: dict
 def handle_web_message(message: Message) -> Message:
     """Handle messages from web interface."""
     if message.msg_type == 'analyze_request':
-        backend = get_analysis_backend()
-        worker_key = f'analysis_{backend}'
-        if worker_key in worker_sockets:
-            # Forward to worker
-            worker_sockets[worker_key].sendall(
-                f'{{"msg_type": "{message.msg_type}", "msg_id": "{message.msg_id}", "data": {message.data}}}\n'.encode('utf-8')
-            )
-            return None  # No immediate response
-        else:
-            return Message('error', message.msg_id, {'error': f'Worker {worker_key} not available'})
+        # Jobs are now handled by the queue manager
+        return Message('ack', message.msg_id, {'status': 'queued'})
     elif message.msg_type == 'train_request':
-        backend = get_training_backend()
-        worker_key = f'training_{backend}'
-        if worker_key in worker_sockets:
-            worker_sockets[worker_key].sendall(
-                f'{{"msg_type": "{message.msg_type}", "msg_id": "{message.msg_id}", "data": {message.data}}}\n'.encode('utf-8')
-            )
-            return None
-        else:
-            return Message('error', message.msg_id, {'error': f'Worker {worker_key} not available'})
+        return Message('ack', message.msg_id, {'status': 'queued'})
     elif message.msg_type == 'config_update':
         data = message.data
         if 'analysis_backend' in data:
@@ -66,39 +51,7 @@ def handle_web_message(message: Message) -> Message:
     return Message('error', message.msg_id, {'error': 'Unknown message type'})

-def handle_worker_message(message: Message, client_sock) -> None:
-    """Handle messages from workers."""
-    if message.msg_type == 'register':
-        worker_type = message.data.get('type')
-        if worker_type:
-            worker_sockets[worker_type] = client_sock
-            print(f"Worker {worker_type} registered")
-    elif message.msg_type in ['analyze_response', 'train_response']:
-        # The web and backend run as separate processes, so results cannot be
-        # shared through an in-memory dict. Instead, each result is written to
-        # a JSON file named after the message id, and the web process polls
-        # for that file.
-        import os
-        result_dir = '/tmp/vidai_results'
-        os.makedirs(result_dir, exist_ok=True)
-        with open(os.path.join(result_dir, f"{message.msg_id}.json"), 'w') as f:
-            import json
-            json.dump({
-                'msg_type': message.msg_type,
-                'msg_id': message.msg_id,
-                'data': message.data
-            }, f)
-
-
-def worker_message_handler(message: Message, client_sock) -> None:
-    """Handler for worker messages."""
-    handle_worker_message(message, client_sock)
+# Worker handling is now done by the queue manager


 def backend_process() -> None:
@@ -112,26 +65,17 @@ def backend_process() -> None:
         # Start web server on Unix socket
        web_server = SocketServer(socket_path='/tmp/vidai_web.sock', comm_type='unix')
         web_server.start(handle_web_message)
-
-        # Start worker server on Unix socket
-        worker_server = SocketServer(socket_path='/tmp/vidai_workers.sock', comm_type='unix')
-        worker_server.start(worker_message_handler)
     else:
         # Start web server on TCP
         web_server = SocketServer(host='localhost', port=5001, comm_type='tcp')
         web_server.start(handle_web_message)
-
-        # Start worker server on TCP
-        worker_server = SocketServer(host='localhost', port=5002, comm_type='tcp')
-        worker_server.start(worker_message_handler)

     try:
         while True:
             time.sleep(1)
     except KeyboardInterrupt:
         print("Backend shutting down...")
         web_server.stop()
-        worker_server.stop()


 if __name__ == "__main__":
......
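With the backend reduced to acknowledging requests, job submission presumably goes through the queue manager directly. A hypothetical call is shown below; the submitting code is not part of this diff, and the `vidai` package name is assumed:

```python
# Hypothetical submission path; the actual call site is not in this diff.
from vidai.queue import queue_manager  # package name assumed

queue_id = queue_manager.submit_job(
    user_id=1,
    request_type='analyze_request',
    data={'file_path': '/path/to/image.jpg', 'prompt': 'Describe this image'},
)
print(queue_manager.get_job_status(queue_id))
```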
@@ -153,6 +153,16 @@ def set_comm_type(comm_type: str) -> None:
     set_config('comm_type', comm_type)


+def get_max_concurrent_jobs() -> int:
+    """Get maximum concurrent jobs."""
+    return int(get_config('max_concurrent_jobs', '1'))
+
+
+def set_max_concurrent_jobs(max_jobs: int) -> None:
+    """Set maximum concurrent jobs."""
+    set_config('max_concurrent_jobs', str(max_jobs))
+
+
 def get_all_settings() -> dict:
     """Get all configuration settings."""
     config = get_all_config()
@@ -169,5 +179,6 @@ def get_all_settings() -> dict:
         'debug': config.get('debug', 'false').lower() == 'true',
         'allowed_dir': config.get('allowed_dir', ''),
         'comm_type': config.get('comm_type', 'unix'),
+        'max_concurrent_jobs': int(config.get('max_concurrent_jobs', '1')),
         'system_prompt': get_system_prompt_content()
     }
\ No newline at end of file
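Usage of the new helpers is straightforward; a small example, assuming the `vidai.config` import path used elsewhere in the project:

```python
# Round-trip the new setting; the vidai.config module path is assumed.
from vidai.config import set_max_concurrent_jobs, get_max_concurrent_jobs

set_max_concurrent_jobs(4)
assert get_max_concurrent_jobs() == 4
```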
This diff is collapsed.
# Video AI Queue Manager
# Copyright (C) 2024 Stefy Lanza <stefy@sexhack.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Queue management for concurrent processing.
"""
import threading
import time
from typing import List, Dict, Any, Optional
from .database import (
add_to_queue, get_pending_queue_items, update_queue_status,
get_queue_status, get_user_queue_items, get_queue_position
)
from .config import get_max_concurrent_jobs
from .comm import Message, send_message
class QueueManager:
"""Manages processing queue and concurrent job execution."""
def __init__(self):
self.active_jobs = 0
self.max_concurrent = get_max_concurrent_jobs()
self.lock = threading.Lock()
self.running = True
self.worker_thread = threading.Thread(target=self._process_queue, daemon=True)
self.worker_thread.start()
def submit_job(self, user_id: int, request_type: str, data: dict, priority: int = 0) -> int:
"""Submit a job to the queue."""
return add_to_queue(user_id, request_type, data, priority)
def get_job_status(self, queue_id: int) -> Optional[Dict[str, Any]]:
"""Get job status."""
status = get_queue_status(queue_id)
if status:
status['position'] = get_queue_position(queue_id) if status['status'] == 'queued' else 0
# Estimate time remaining (simplified)
if status['status'] == 'queued' and status['position'] > 0:
status['estimated_time'] = status['position'] * 60 # 1 minute per job
else:
status['estimated_time'] = 0
return status
def get_user_jobs(self, user_id: int) -> List[Dict[str, Any]]:
"""Get all jobs for a user."""
return get_user_queue_items(user_id)
def _process_queue(self) -> None:
"""Background thread to process queued jobs."""
while self.running:
try:
with self.lock:
if self.active_jobs < self.max_concurrent:
pending = get_pending_queue_items()
if pending:
job = pending[0] # Get highest priority job
self._start_job(job)
time.sleep(1) # Check every second
except Exception as e:
print(f"Queue processing error: {e}")
time.sleep(5)
def _start_job(self, job: Dict[str, Any]) -> None:
"""Start processing a job."""
update_queue_status(job['id'], 'processing')
self.active_jobs += 1
# Start job in separate thread
threading.Thread(target=self._execute_job, args=(job,), daemon=True).start()
def _execute_job(self, job: Dict[str, Any]) -> None:
"""Execute the job by sending to backend."""
try:
# Send to backend for processing
from .backend import handle_web_message
message = Message(
msg_type=job['request_type'],
msg_id=str(job['id']),
data=job['data']
)
# For now, simulate processing - in real implementation,
# this would communicate with the backend workers
time.sleep(10) # Simulate processing time
# Simulate token usage (in real implementation, this would come from the worker)
estimated_tokens = job.get('estimated_tokens', 100)
# Simulate actual usage as 80-120% of estimate
import random
used_tokens = int(estimated_tokens * random.uniform(0.8, 1.2))
# Mock result
result = {"status": "completed", "result": f"Processed {job['request_type']}"}
update_queue_status(job['id'], 'completed', result, used_tokens=used_tokens)
except Exception as e:
update_queue_status(job['id'], 'failed', error_message=str(e))
finally:
with self.lock:
self.active_jobs -= 1
def stop(self) -> None:
"""Stop the queue manager."""
self.running = False
if self.worker_thread.is_alive():
self.worker_thread.join(timeout=5)
# Global queue manager instance
queue_manager = QueueManager()
\ No newline at end of file
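The `used_tokens` keyword passed to `update_queue_status` implies the `processing_queue` schema change named in the commit message, but the database changes are not visible in this view. A sketch of such a migration follows; the column types and the ALTER-TABLE-on-startup approach are assumptions:

```python
# Sketch of the schema change described in the commit message; column types
# and the migration approach are assumptions, not code from this commit.
import sqlite3

def add_token_columns(db_path: str) -> None:
    """Add estimated_tokens/used_tokens to processing_queue if missing."""
    conn = sqlite3.connect(db_path)
    for column in ('estimated_tokens', 'used_tokens'):
        try:
            conn.execute(f"ALTER TABLE processing_queue ADD COLUMN {column} INTEGER DEFAULT 0")
        except sqlite3.OperationalError:
            pass  # Column already exists
    conn.commit()
    conn.close()
```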
This diff is collapsed.