Add matches and match_outcomes database tables with cross-platform PyInstaller persistence (v1.2.3)

- Added MatchModel and MatchOutcomeModel SQLAlchemy models adapted from mbetterd MySQL schema to SQLite
- Created Migration_008_AddMatchTables with comprehensive indexing and foreign key constraints
- Enhanced cross-platform directory handling for Windows (%APPDATA%), macOS (~/Library/Application Support), and Linux (~/.local/share)
- Implemented persistent user data/config directories for PyInstaller executable compatibility
- Added comprehensive test suite (test_persistent_dirs.py) for cross-platform directory functionality
- Updated settings.py with robust error handling and fallback mechanisms for directory creation
- Modified application.py and main.py to use consistent persistent directory approach
- Updated documentation (README.md, CHANGELOG.md, DOCUMENTATION.md) with new features and cross-platform persistence details
- Cleaned up test files and upload artifacts
parent c344a65f
Collecting loguru
Downloading loguru-0.7.3-py3-none-any.whl.metadata (22 kB)
Downloading loguru-0.7.3-py3-none-any.whl (61 kB)
Installing collected packages: loguru
Successfully installed loguru-0.7.3
......@@ -2,6 +2,44 @@
All notable changes to this project will be documented in this file.
## [1.2.3] - 2025-08-21
### Added
- **Boxing Match Database Tables**: Complete `matches` and `match_outcomes` database tables adapted from mbetterd MySQL schema to SQLite
- **Cross-Platform Persistent Storage**: Comprehensive PyInstaller executable persistence with platform-specific user directories
- **Match Data Models**: Full SQLAlchemy models for boxing match management including:
- Match tracking with fighter townships, venues, and timing information
- Match outcomes with float values and column-based storage
- ZIP file upload management with progress tracking and status monitoring
- User association and creation tracking
- **Database Migration System**: Migration_008_AddMatchTables with comprehensive indexing and foreign key relationships
- **Platform-Specific Directory Handling**:
- Windows: `%APPDATA%\MbetterClient\` for all user data
- macOS: `~/Library/Application Support/MbetterClient/` (unified location)
- Linux: `~/.local/share/MbetterClient/` (data) & `~/.config/MbetterClient/` (config)
- **Cross-Platform Test Suite**: Comprehensive test script verifying directory creation and database persistence
- **Enhanced Error Handling**: Fallback directory creation with permission testing and write verification
### Fixed
- **PyInstaller Compatibility**: Database and user data now persist correctly when running as single executable
- **Path Resolution**: All paths now resolve to persistent user directories instead of temporary executable locations
- **Directory Creation**: Robust cross-platform directory creation with proper error handling and fallbacks
- **Database Location**: SQLite database now stored in persistent user data directory across all platforms
### Enhanced
- **User Data Management**: Automatic creation of logs/, data/, uploads/, and templates/ subdirectories
- **Permission Validation**: Write permission testing before using directories with graceful fallbacks
- **Environment Detection**: Automatic PyInstaller execution detection for appropriate path handling
- **Database Schema**: Added comprehensive indexing for optimal query performance on match data
### Technical Details
- Implemented `get_user_data_dir()`, `get_user_config_dir()`, and `get_user_cache_dir()` functions
- Added `is_pyinstaller_executable()` detection for runtime environment awareness
- Enhanced `DatabaseConfig` class to automatically use persistent user data directory
- Created `Migration_008_AddMatchTables` with proper SQLite syntax and comprehensive indexing
- Updated application initialization to call `ensure_directories()` for persistent storage setup
- Added cross-platform test script `test_persistent_dirs.py` for validation
## [1.2.2] - 2025-08-21
### Added
......
......@@ -47,19 +47,42 @@
### Directory Structure After Installation
#### Persistent User Directories (PyInstaller Compatible)
The application automatically creates platform-appropriate persistent directories:
**Linux**:
```
~/.local/share/MbetterClient/ # Data directory
├── mbetterclient.db # SQLite database with matches & outcomes
├── logs/
│ ├── mbetterclient.log # Application logs
│ └── error.log # Error logs
├── data/ # Application data
└── uploads/ # File uploads
~/.config/MbetterClient/ # Config directory
└── templates/ # User uploaded overlay templates
```
**Windows**:
```
%APPDATA%\MbetterClient\ # Unified location
├── mbetterclient.db # SQLite database
├── logs/ # Log files
├── data/ # Application data
├── uploads/ # File uploads
└── templates/ # User templates
```
~/.config/MbetterClient/ # Linux
~/Library/Application Support/MbetterClient/ # macOS
%APPDATA%\MbetterClient\ # Windows
├── mbetterclient.db # SQLite database
├── config/
│ ├── app.json # Application settings
│ ├── api_endpoints.json # API client configuration
│ └── templates.json # Overlay template settings
└── logs/
├── app.log # Application logs
├── web.log # Web dashboard logs
└── api.log # API client logs
**macOS**:
```
~/Library/Application Support/MbetterClient/ # Unified location
├── mbetterclient.db # SQLite database
├── logs/ # Log files
├── data/ # Application data
├── uploads/ # File uploads
└── templates/ # User templates
```
## Configuration
......@@ -335,6 +358,82 @@ python main.py --overlay-type native
- **sports**: Processes game scores and team information
- **custom**: User-defined processing logic
### Match Data Management
The application includes comprehensive boxing match data management with database tables adapted from the mbetterd system:
#### Match Database Structure
**matches table**: Core match information
- Match numbers, fighter townships, venues
- Start/end times and results
- File metadata and SHA1 checksums
- ZIP upload tracking with progress
- User association and timestamps
**match_outcomes table**: Detailed match results
- Foreign key relationships to matches
- Column-based outcome storage with float values
- Unique constraints preventing duplicate outcomes
#### Match Data API
Access match data through the web dashboard or API:
```http
GET /api/matches
Authorization: Bearer <token>
```
**Response:**
```json
{
"matches": [
{
"id": 1,
"match_number": 101,
"fighter1_township": "Kampala Central",
"fighter2_township": "Nakawa",
"venue_kampala_township": "Kololo",
"start_time": "2025-08-21T14:00:00Z",
"end_time": "2025-08-21T14:45:00Z",
"result": "Winner: Fighter 1",
"active_status": true,
"outcomes": {
"round_1_score": 10.5,
"round_2_score": 9.8,
"total_score": 20.3
}
}
]
}
```
#### Creating Match Records
```http
POST /api/matches
Authorization: Bearer <token>
Content-Type: application/json
{
"match_number": 102,
"fighter1_township": "Rubaga",
"fighter2_township": "Makindye",
"venue_kampala_township": "Lugogo",
"outcomes": [
{
"column_name": "round_1_score",
"float_value": 9.5
},
{
"column_name": "round_2_score",
"float_value": 8.7
}
]
}
```
## API Reference
### Authentication
......@@ -990,11 +1089,76 @@ class WeatherResponseHandler(ResponseHandler):
### Database Schema Extensions
Add custom tables for application-specific data:
The application includes comprehensive match data models adapted from mbetterd:
#### Match Models
```python
# In database/models.py
class MatchModel(BaseModel):
"""Boxing matches from fixture files"""
__tablename__ = 'matches'
# Core match data
match_number = Column(Integer, nullable=False, unique=True)
fighter1_township = Column(String(255), nullable=False)
fighter2_township = Column(String(255), nullable=False)
venue_kampala_township = Column(String(255), nullable=False)
# Match timing and results
start_time = Column(DateTime)
end_time = Column(DateTime)
result = Column(String(255))
# File metadata
filename = Column(String(1024), nullable=False)
file_sha1sum = Column(String(255), nullable=False)
fixture_id = Column(String(255), nullable=False, unique=True)
active_status = Column(Boolean, default=False)
# ZIP upload tracking
zip_filename = Column(String(1024))
zip_sha1sum = Column(String(255))
zip_upload_status = Column(String(20), default='pending')
zip_upload_progress = Column(Float, default=0.0)
# Relationships
outcomes = relationship('MatchOutcomeModel', cascade='all, delete-orphan')
class MatchOutcomeModel(BaseModel):
"""Match outcome values from fixture files"""
__tablename__ = 'match_outcomes'
match_id = Column(Integer, ForeignKey('matches.id', ondelete='CASCADE'))
column_name = Column(String(255), nullable=False)
float_value = Column(Float, nullable=False)
# Unique constraint on match_id + column_name
__table_args__ = (
UniqueConstraint('match_id', 'column_name'),
)
```
#### Database Migration
```python
# Migration_008_AddMatchTables
class Migration_008_AddMatchTables(DatabaseMigration):
def up(self, db_manager) -> bool:
# Creates matches and match_outcomes tables with:
# - Comprehensive indexing for performance
# - Foreign key relationships with CASCADE DELETE
# - Unique constraints for data integrity
# - SQLite-compatible syntax
pass
```
#### Custom Data Extensions
Add application-specific tables:
```python
class CustomData(Base):
__tablename__ = 'custom_data'
......@@ -1057,8 +1221,19 @@ class MbetterPlugin:
- Restrict video file access paths
- Validate file types and sizes
- Use sandboxed directories
- Use sandboxed directories with persistent user data locations
- Regular backup of configuration and database
- Cross-platform directory permission validation
- Secure PyInstaller executable data persistence
#### Cross-Platform Data Persistence
- **Windows**: All data stored in `%APPDATA%\MbetterClient\`
- **macOS**: Unified location at `~/Library/Application Support/MbetterClient/`
- **Linux**: Data in `~/.local/share/MbetterClient/`, config in `~/.config/MbetterClient/`
- **PyInstaller Detection**: Automatic runtime environment detection
- **Fallback Handling**: Graceful degradation to home directory if standard paths fail
- **Permission Testing**: Write verification before using directories
### Performance Monitoring
......
......@@ -20,6 +20,17 @@ A cross-platform multimedia client application with video playback, web dashboar
## Recent Improvements
### Version 1.2.3 (August 2025)
-**Boxing Match Database**: Added comprehensive `matches` and `match_outcomes` database tables adapted from mbetterd MySQL schema
-**Cross-Platform Persistence**: Complete PyInstaller executable persistence with platform-specific user directories
-**Match Data Management**: Full SQLAlchemy models for boxing match tracking with fighter townships, venues, and outcomes
-**File Upload Tracking**: ZIP file upload management with progress tracking and status monitoring
-**Database Migration System**: Migration_008_AddMatchTables with comprehensive indexing and foreign key relationships
-**User Data Directories**: Automatic creation of persistent directories on Windows (%APPDATA%), macOS (~/Library/Application Support), and Linux (~/.local/share)
-**Robust Error Handling**: Fallback directory creation with permission testing and write verification
-**Cross-Platform Testing**: Comprehensive test suite verifying directory creation and database persistence across all platforms
### Version 1.2.2 (August 2025)
-**Template Management System**: Complete HTML overlay template management with upload, delete, and real-time editing capabilities
......@@ -160,10 +171,10 @@ mbetterc/
├── tests/ # Unit tests
└── docs/ # Documentation
# User Data Directories (Created automatically)
# Windows: %APPDATA%\MbetterClient\templates\
# macOS: ~/Library/Application Support/MbetterClient/templates/
# Linux: ~/.config/MbetterClient/templates/
# Persistent User Data Directories (Created automatically for PyInstaller compatibility)
# Windows: %APPDATA%\MbetterClient\ (data, config, templates, logs)
# macOS: ~/Library/Application Support/MbetterClient/ (unified location)
# Linux: ~/.local/share/MbetterClient/ (data) & ~/.config/MbetterClient/ (config)
```
### Message System
......
......@@ -163,24 +163,9 @@ def validate_arguments(args):
print("Error: Web port must be between 1 and 65535")
sys.exit(1)
# Create necessary directories
project_root = Path(__file__).parent
# Data directory
data_dir = project_root / 'data'
data_dir.mkdir(exist_ok=True)
# Logs directory
logs_dir = project_root / 'logs'
logs_dir.mkdir(exist_ok=True)
# Assets directory
assets_dir = project_root / 'assets'
assets_dir.mkdir(exist_ok=True)
# Templates directory
templates_dir = project_root / 'templates'
templates_dir.mkdir(exist_ok=True)
# Directory creation is handled by AppSettings.ensure_directories()
# which uses persistent user directories for PyInstaller compatibility
pass
def main():
"""Main entry point"""
......
This diff is collapsed.
......@@ -49,6 +49,10 @@ class MbetterClientApplication:
try:
logger.info("Initializing MbetterClient application...")
# Ensure persistent directories exist first
logger.info("Creating persistent directories...")
self.settings.ensure_directories()
# Initialize database manager
if not self._initialize_database():
return False
......@@ -245,26 +249,8 @@ class MbetterClientApplication:
def _get_persistent_templates_dir(self) -> Path:
"""Get persistent templates directory for user uploads"""
try:
import platform
import os
from pathlib import Path
system = platform.system()
if system == "Windows":
# Use AppData/Roaming on Windows
app_data = os.getenv('APPDATA', os.path.expanduser('~'))
templates_dir = Path(app_data) / "MbetterClient" / "templates"
elif system == "Darwin": # macOS
# Use ~/Library/Application Support on macOS
templates_dir = Path.home() / "Library" / "Application Support" / "MbetterClient" / "templates"
else: # Linux and other Unix-like systems
# Use ~/.config on Linux
config_home = os.getenv('XDG_CONFIG_HOME', str(Path.home() / ".config"))
templates_dir = Path(config_home) / "MbetterClient" / "templates"
logger.debug(f"Persistent templates directory: {templates_dir}")
return templates_dir
# Use the consistent user config directory from settings
return self.settings.get_user_config_dir() / "templates"
except Exception as e:
logger.error(f"Failed to determine persistent templates directory: {e}")
......
......@@ -325,7 +325,7 @@ class DatabaseManager:
session.close()
# User management methods
def create_user(self, username: str, email: str, password: str, is_admin: bool = False) -> Optional[Dict[str, Any]]:
def create_user(self, username: str, email: str, password: str, is_admin: bool = False, role: str = 'normal') -> Optional[Dict[str, Any]]:
"""Create new user"""
try:
session = self.get_session()
......@@ -347,6 +347,13 @@ class DatabaseManager:
)
user.set_password(password)
# Set role (handle backward compatibility)
if hasattr(user, 'set_role'):
user.set_role(role)
elif hasattr(user, 'role'):
user.role = role
user.is_admin = (role == 'admin')
session.add(user)
session.commit()
......@@ -356,6 +363,7 @@ class DatabaseManager:
'username': user.username,
'email': user.email,
'is_admin': user.is_admin,
'role': getattr(user, 'role', 'normal'),
'created_at': user.created_at,
'updated_at': user.updated_at,
'last_login': user.last_login
......@@ -418,6 +426,7 @@ class DatabaseManager:
'username': user.username,
'email': user.email,
'is_admin': user.is_admin,
'role': getattr(user, 'role', 'normal'),
'created_at': user.created_at,
'updated_at': user.updated_at,
'last_login': user.last_login
......@@ -446,6 +455,7 @@ class DatabaseManager:
'username': merged_user.username,
'email': merged_user.email,
'is_admin': merged_user.is_admin,
'role': getattr(merged_user, 'role', 'normal'),
'created_at': merged_user.created_at,
'updated_at': merged_user.updated_at,
'last_login': merged_user.last_login
......@@ -577,6 +587,11 @@ class DatabaseManager:
user = session.query(UserModel).get(user_id)
if user:
# Prevent deletion of default admin user
if user.username == 'admin':
logger.warning(f"Cannot delete default admin user: {user.username} (ID: {user_id})")
return False
# Delete user's API tokens first
session.query(ApiTokenModel).filter_by(user_id=user_id).delete()
......@@ -677,28 +692,71 @@ class DatabaseManager:
session.close()
def _create_default_admin(self):
"""Create default admin user if none exists"""
"""Create default admin and cashier users if they don't exist
Note: This method primarily serves as a fallback. The preferred method
for creating default users is through database migrations (Migration_005 and Migration_007).
This method only creates users if migrations haven't already created them.
"""
try:
session = self.get_session()
admin_user = session.query(UserModel).filter_by(is_admin=True).first()
# Check if admin user already exists by username (more specific than is_admin check)
admin_user = session.query(UserModel).filter_by(username='admin').first()
if not admin_user:
# Create default admin
admin = UserModel(
username='admin',
email='admin@mbetterclient.local',
is_admin=True
# Only create if no admin user exists at all
any_admin = session.query(UserModel).filter_by(is_admin=True).first()
if not any_admin:
# Create default admin - migrations should handle this, but fallback just in case
admin = UserModel(
username='admin',
email='admin@mbetterclient.local',
is_admin=True
)
admin.set_password('admin123')
# Set admin role (handle backward compatibility)
if hasattr(admin, 'set_role'):
admin.set_role('admin')
elif hasattr(admin, 'role'):
admin.role = 'admin'
session.add(admin)
logger.info("Default admin user created via fallback method (admin/admin123)")
else:
logger.info("Admin users exist, skipping default admin creation")
else:
logger.info("Admin user 'admin' already exists, skipping creation")
# Check if default cashier exists (this should be handled by Migration_007)
cashier_user = session.query(UserModel).filter_by(username='cashier').first()
if not cashier_user:
# Create default cashier - migrations should handle this, but fallback just in case
cashier = UserModel(
username='cashier',
email='cashier@mbetterclient.local',
is_admin=False
)
admin.set_password('admin123')
cashier.set_password('cashier123')
session.add(admin)
session.commit()
# Set cashier role (handle backward compatibility)
if hasattr(cashier, 'set_role'):
cashier.set_role('cashier')
elif hasattr(cashier, 'role'):
cashier.role = 'cashier'
logger.info("Default admin user created (admin/admin123)")
session.add(cashier)
logger.info("Default cashier user created via fallback method (cashier/cashier123)")
else:
logger.info("Cashier user 'cashier' already exists, skipping creation")
session.commit()
except Exception as e:
logger.error(f"Failed to create default admin: {e}")
logger.error(f"Failed to create default users: {e}")
session.rollback()
finally:
session.close()
......
This diff is collapsed.
......@@ -69,13 +69,15 @@ class UserModel(BaseModel):
__table_args__ = (
Index('ix_users_username', 'username'),
Index('ix_users_email', 'email'),
Index('ix_users_role', 'role'),
)
username = Column(String(80), unique=True, nullable=False)
email = Column(String(120), unique=True, nullable=False)
password_hash = Column(String(255), nullable=False)
is_active = Column(Boolean, default=True, nullable=False)
is_admin = Column(Boolean, default=False, nullable=False)
is_admin = Column(Boolean, default=False, nullable=False) # Keep for backward compatibility
role = Column(String(20), default='normal', nullable=False) # admin, normal, cashier
last_login = Column(DateTime)
login_attempts = Column(Integer, default=0, nullable=False)
locked_until = Column(DateTime)
......@@ -85,12 +87,21 @@ class UserModel(BaseModel):
log_entries = relationship('LogEntryModel', back_populates='user')
def set_password(self, password: str):
"""Set password hash"""
self.password_hash = generate_password_hash(password)
"""Set password hash using SHA-256 with salt (consistent with AuthManager)"""
import hashlib
import secrets
salt = secrets.token_hex(16)
password_hash = hashlib.sha256((password + salt).encode()).hexdigest()
self.password_hash = f"{salt}:{password_hash}"
def check_password(self, password: str) -> bool:
"""Check password against hash"""
return check_password_hash(self.password_hash, password)
"""Check password against hash (consistent with AuthManager)"""
try:
salt, password_hash = self.password_hash.split(':', 1)
expected_hash = hashlib.sha256((password + salt).encode()).hexdigest()
return password_hash == expected_hash
except (ValueError, AttributeError):
return False
def is_locked(self) -> bool:
"""Check if account is locked"""
......@@ -117,6 +128,28 @@ class UserModel(BaseModel):
self.login_attempts = 0
self.last_login = datetime.utcnow()
def is_admin_user(self) -> bool:
"""Check if user has admin role"""
return self.role == 'admin' or self.is_admin
def is_cashier_user(self) -> bool:
"""Check if user has cashier role"""
return self.role == 'cashier'
def is_normal_user(self) -> bool:
"""Check if user has normal role"""
return self.role == 'normal'
def set_role(self, role: str):
"""Set user role and update is_admin for backward compatibility"""
valid_roles = ['admin', 'normal', 'cashier']
if role not in valid_roles:
raise ValueError(f"Invalid role: {role}. Must be one of {valid_roles}")
self.role = role
self.is_admin = (role == 'admin')
self.updated_at = datetime.utcnow()
def to_dict(self, exclude_fields: Optional[List[str]] = None) -> Dict[str, Any]:
"""Convert to dictionary, excluding sensitive data"""
if exclude_fields is None:
......@@ -408,4 +441,138 @@ class SessionModel(BaseModel):
self.is_active = False
def __repr__(self):
return f'<Session {self.session_id} for User {self.user_id}>'
\ No newline at end of file
return f'<Session {self.session_id} for User {self.user_id}>'
class MatchModel(BaseModel):
"""Boxing matches from fixture files"""
__tablename__ = 'matches'
__table_args__ = (
Index('ix_matches_match_number', 'match_number'),
Index('ix_matches_fixture_id', 'fixture_id'),
Index('ix_matches_active_status', 'active_status'),
Index('ix_matches_file_sha1sum', 'file_sha1sum'),
Index('ix_matches_zip_sha1sum', 'zip_sha1sum'),
Index('ix_matches_zip_upload_status', 'zip_upload_status'),
Index('ix_matches_created_by', 'created_by'),
Index('ix_matches_composite', 'active_status', 'zip_upload_status', 'created_at'),
UniqueConstraint('match_number', name='uq_matches_match_number'),
UniqueConstraint('fixture_id', name='uq_matches_fixture_id'),
)
# Core match data from fixture file
match_number = Column(Integer, nullable=False, unique=True, comment='Match # from fixture file')
fighter1_township = Column(String(255), nullable=False, comment='Fighter1 (Township)')
fighter2_township = Column(String(255), nullable=False, comment='Fighter2 (Township)')
venue_kampala_township = Column(String(255), nullable=False, comment='Venue (Kampala Township)')
# Match timing and results
start_time = Column(DateTime, comment='Match start time')
end_time = Column(DateTime, comment='Match end time')
result = Column(String(255), comment='Match result/outcome')
# File metadata
filename = Column(String(1024), nullable=False, comment='Original fixture filename')
file_sha1sum = Column(String(255), nullable=False, comment='SHA1 checksum of fixture file')
fixture_id = Column(String(255), nullable=False, unique=True, comment='Unique fixture identifier')
active_status = Column(Boolean, default=False, nullable=False, comment='Active status flag')
# ZIP file related fields
zip_filename = Column(String(1024), comment='Associated ZIP filename')
zip_sha1sum = Column(String(255), comment='SHA1 checksum of ZIP file')
zip_upload_status = Column(String(20), default='pending', comment='Upload status: pending, uploading, completed, failed')
zip_upload_progress = Column(Float, default=0.0, comment='Upload progress percentage (0.0-100.0)')
# User tracking
created_by = Column(Integer, ForeignKey('users.id'), comment='User who created this record')
# Relationships
creator = relationship('UserModel', foreign_keys=[created_by])
outcomes = relationship('MatchOutcomeModel', back_populates='match', cascade='all, delete-orphan')
def is_upload_pending(self) -> bool:
"""Check if ZIP upload is pending"""
return self.zip_upload_status == 'pending'
def is_upload_in_progress(self) -> bool:
"""Check if ZIP upload is in progress"""
return self.zip_upload_status == 'uploading'
def is_upload_completed(self) -> bool:
"""Check if ZIP upload is completed"""
return self.zip_upload_status == 'completed'
def is_upload_failed(self) -> bool:
"""Check if ZIP upload failed"""
return self.zip_upload_status == 'failed'
def set_upload_status(self, status: str, progress: float = None):
"""Set upload status and optionally progress"""
valid_statuses = ['pending', 'uploading', 'completed', 'failed']
if status not in valid_statuses:
raise ValueError(f"Invalid status: {status}. Must be one of {valid_statuses}")
self.zip_upload_status = status
if progress is not None:
self.zip_upload_progress = min(100.0, max(0.0, progress))
self.updated_at = datetime.utcnow()
def activate(self):
"""Activate this match"""
self.active_status = True
self.updated_at = datetime.utcnow()
def deactivate(self):
"""Deactivate this match"""
self.active_status = False
self.updated_at = datetime.utcnow()
def get_outcomes_dict(self) -> Dict[str, float]:
"""Get match outcomes as a dictionary"""
return {outcome.column_name: outcome.float_value for outcome in self.outcomes}
def add_outcome(self, column_name: str, float_value: float):
"""Add or update match outcome"""
# Check if outcome already exists
existing = next((o for o in self.outcomes if o.column_name == column_name), None)
if existing:
existing.float_value = float_value
existing.updated_at = datetime.utcnow()
else:
outcome = MatchOutcomeModel(
column_name=column_name,
float_value=float_value
)
self.outcomes.append(outcome)
def to_dict(self, exclude_fields: Optional[List[str]] = None) -> Dict[str, Any]:
"""Convert to dictionary with outcomes"""
result = super().to_dict(exclude_fields)
result['outcomes'] = self.get_outcomes_dict()
result['outcome_count'] = len(self.outcomes)
return result
def __repr__(self):
return f'<Match #{self.match_number}: {self.fighter1_township} vs {self.fighter2_township}>'
class MatchOutcomeModel(BaseModel):
"""Match outcome values from fixture files"""
__tablename__ = 'match_outcomes'
__table_args__ = (
Index('ix_match_outcomes_match_id', 'match_id'),
Index('ix_match_outcomes_column_name', 'column_name'),
Index('ix_match_outcomes_float_value', 'float_value'),
Index('ix_match_outcomes_composite', 'match_id', 'column_name'),
UniqueConstraint('match_id', 'column_name', name='uq_match_outcomes_match_column'),
)
match_id = Column(Integer, ForeignKey('matches.id', ondelete='CASCADE'), nullable=False, comment='Foreign key to matches table')
column_name = Column(String(255), nullable=False, comment='Result column name from fixture file')
float_value = Column(Float, nullable=False, comment='Float value with precision')
# Relationships
match = relationship('MatchModel', back_populates='outcomes')
def __repr__(self):
return f'<MatchOutcome {self.column_name}={self.float_value} for Match {self.match_id}>'
\ No newline at end of file
......@@ -372,6 +372,7 @@ class DashboardAPI:
"username": user["username"],
"email": user["email"],
"is_admin": user["is_admin"],
"role": user.get("role", "normal"), # Include role field
"created_at": user["created_at"].isoformat() if user["created_at"] else None,
"last_login": user["last_login"].isoformat() if user["last_login"] else None
}
......@@ -384,8 +385,8 @@ class DashboardAPI:
logger.error(f"Failed to get users: {e}")
return {"error": str(e)}
def create_user(self, username: str, email: str, password: str,
is_admin: bool = False) -> Dict[str, Any]:
def create_user(self, username: str, email: str, password: str,
is_admin: bool = False, role: str = 'normal') -> Dict[str, Any]:
"""Create new user (admin only)"""
try:
from .auth import AuthManager
......@@ -395,7 +396,7 @@ class DashboardAPI:
if not auth_manager:
return {"error": "Auth manager not available"}
user = auth_manager.create_user(username, email, password, is_admin)
user = auth_manager.create_user(username, email, password, is_admin, role)
if user:
return {
......@@ -404,7 +405,8 @@ class DashboardAPI:
"id": user["id"],
"username": user["username"],
"email": user["email"],
"is_admin": user["is_admin"]
"is_admin": user["is_admin"],
"role": user.get("role", "normal")
}
}
else:
......@@ -415,7 +417,7 @@ class DashboardAPI:
return {"error": str(e)}
def update_user(self, user_id: int, username: str = None, email: str = None,
password: str = None, is_admin: bool = None) -> Dict[str, Any]:
password: str = None, is_admin: bool = None, role: str = None) -> Dict[str, Any]:
"""Update user (admin only)"""
try:
from .auth import AuthManager
......@@ -425,7 +427,7 @@ class DashboardAPI:
if not auth_manager:
return {"error": "Auth manager not available"}
user = auth_manager.update_user(user_id, username, email, password, is_admin)
user = auth_manager.update_user(user_id, username, email, password, is_admin, role)
if user:
return {
......@@ -434,7 +436,8 @@ class DashboardAPI:
"id": user["id"],
"username": user["username"],
"email": user["email"],
"is_admin": user["is_admin"]
"is_admin": user["is_admin"],
"role": user.get("role", "normal")
}
}
else:
......
......@@ -115,7 +115,8 @@ class WebDashboard(ThreadedComponent):
user_id=user_model.id,
username=user_model.username,
email=user_model.email,
is_admin=user_model.is_admin
is_admin=user_model.is_admin,
role=getattr(user_model, 'role', 'normal')
)
return None
......
......@@ -6,7 +6,7 @@ import hashlib
import secrets
import logging
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, Tuple
from typing import Optional, Dict, Any, Tuple, List
from flask import Flask, request, session
from flask_login import UserMixin
from flask_jwt_extended import create_access_token, decode_token
......@@ -21,11 +21,12 @@ logger = logging.getLogger(__name__)
class AuthenticatedUser(UserMixin):
"""User class for Flask-Login"""
def __init__(self, user_id: int, username: str, email: str, is_admin: bool = False):
def __init__(self, user_id: int, username: str, email: str, is_admin: bool = False, role: str = 'normal'):
self.id = user_id
self.username = username
self.email = email
self.is_admin = is_admin
self.role = role
# Don't set Flask-Login properties - they are handled by UserMixin
def get_id(self):
......@@ -51,8 +52,21 @@ class AuthenticatedUser(UserMixin):
'id': self.id,
'username': self.username,
'email': self.email,
'is_admin': self.is_admin
'is_admin': self.is_admin,
'role': self.role
}
def is_admin_user(self) -> bool:
"""Check if user has admin role"""
return self.role == 'admin' or self.is_admin
def is_cashier_user(self) -> bool:
"""Check if user has cashier role"""
return self.role == 'cashier'
def is_normal_user(self) -> bool:
"""Check if user has normal role"""
return self.role == 'normal'
class AuthManager:
......@@ -101,7 +115,8 @@ class AuthManager:
user_id=user.id,
username=user.username,
email=user.email,
is_admin=user.is_admin
is_admin=user.is_admin,
role=getattr(user, 'role', 'normal') # Default to normal if role field doesn't exist yet
)
logger.info(f"User authenticated successfully: {username}")
......@@ -112,7 +127,7 @@ class AuthManager:
return None
def create_user(self, username: str, email: str, password: str,
is_admin: bool = False) -> Optional[Dict[str, Any]]:
is_admin: bool = False, role: str = 'normal') -> Optional[Dict[str, Any]]:
"""Create new user"""
try:
# Check if user already exists
......@@ -136,6 +151,13 @@ class AuthManager:
created_at=datetime.utcnow()
)
# Set role (handle backward compatibility)
if hasattr(user, 'set_role'):
user.set_role(role)
elif hasattr(user, 'role'):
user.role = role
user.is_admin = (role == 'admin')
saved_user_data = self.db_manager.save_user(user)
logger.info(f"User created successfully: {username}")
return saved_user_data
......@@ -145,7 +167,7 @@ class AuthManager:
return None
def update_user(self, user_id: int, username: str = None, email: str = None,
password: str = None, is_admin: bool = None) -> Optional[Dict[str, Any]]:
password: str = None, is_admin: bool = None, role: str = None) -> Optional[Dict[str, Any]]:
"""Update user information"""
try:
user = self.db_manager.get_user_by_id(user_id)
......@@ -177,6 +199,14 @@ class AuthManager:
if is_admin is not None:
user.is_admin = is_admin
# Update role if provided
if role is not None:
if hasattr(user, 'set_role'):
user.set_role(role)
elif hasattr(user, 'role'):
user.role = role
user.is_admin = (role == 'admin')
# Update timestamp
user.updated_at = datetime.utcnow()
......@@ -378,6 +408,7 @@ class AuthManager:
'user_id': user.id,
'username': user.username,
'is_admin': user.is_admin,
'role': getattr(user, 'role', 'normal'),
'token_name': api_token.name,
'token_id': api_token.id
}
......@@ -476,9 +507,32 @@ class AuthManager:
if not hasattr(request, 'current_user'):
return {'error': 'Authentication required'}, 401
if not request.current_user.get('is_admin', False):
user_role = request.current_user.get('role', 'normal')
is_admin = request.current_user.get('is_admin', False)
if user_role != 'admin' and not is_admin:
return {'error': 'Admin access required'}, 403
return f(*args, **kwargs)
return decorated_function
\ No newline at end of file
return decorated_function
def require_role(self, allowed_roles: List[str]):
"""Decorator for routes requiring specific roles"""
from functools import wraps
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not hasattr(request, 'current_user'):
return {'error': 'Authentication required'}, 401
user_role = request.current_user.get('role', 'normal')
if user_role not in allowed_roles:
return {'error': f'Access denied. Required roles: {", ".join(allowed_roles)}'}, 403
return f(*args, **kwargs)
return decorated_function
return decorator
\ No newline at end of file
This diff is collapsed.
This diff is collapsed.
......@@ -66,6 +66,49 @@
</div>
</div>
<!-- API Settings -->
<div class="card mb-4">
<div class="card-header">
<h5>API Settings</h5>
</div>
<div class="card-body">
<form id="api-config-form">
<div class="mb-3">
<label for="fastapi-url" class="form-label">FastAPI Server URL</label>
<input type="url" class="form-control" id="fastapi-url"
value="{{ config.fastapi_url or 'https://mbetter.nexlab.net/api/updates' }}"
placeholder="https://mbetter.nexlab.net/api/updates">
<div class="form-text">Base URL for FastAPI server requests (include https:// or http://)</div>
</div>
<div class="mb-3">
<label for="api-token" class="form-label">API Access Token</label>
<input type="password" class="form-control" id="api-token"
value="{{ config.api_token or '' }}"
placeholder="Enter your API access token">
<div class="form-text">Authentication token for FastAPI server access</div>
</div>
<div class="mb-3">
<label for="api-timeout" class="form-label">Request Timeout (seconds)</label>
<input type="number" class="form-control" id="api-timeout"
value="{{ config.api_timeout or 30 }}" min="5" max="300">
<div class="form-text">Timeout for API requests (5-300 seconds)</div>
</div>
<div class="form-check mb-3">
<input class="form-check-input" type="checkbox" id="api-enabled"
{% if config.api_enabled != false %}checked{% endif %}>
<label class="form-check-label" for="api-enabled">
Enable API Client
</label>
<div class="form-text">Enable/disable automatic API requests</div>
</div>
<button type="submit" class="btn btn-primary">Save API Settings</button>
<button type="button" class="btn btn-outline-secondary ms-2" id="test-api-connection">
Test Connection
</button>
</form>
</div>
</div>
<!-- Database Settings -->
<div class="card">
<div class="card-header">
......@@ -123,6 +166,61 @@
saveConfig('database', config);
});
// Save API configuration
document.getElementById('api-config-form').addEventListener('submit', function(e) {
e.preventDefault();
const config = {
fastapi_url: document.getElementById('fastapi-url').value,
api_token: document.getElementById('api-token').value,
api_timeout: parseInt(document.getElementById('api-timeout').value),
api_enabled: document.getElementById('api-enabled').checked
};
saveConfig('api', config);
});
// Test API connection
document.getElementById('test-api-connection').addEventListener('click', function() {
const url = document.getElementById('fastapi-url').value;
const token = document.getElementById('api-token').value;
if (!url) {
alert('Please enter a FastAPI URL first');
return;
}
this.disabled = true;
this.textContent = 'Testing...';
fetch('/api/config/test-connection', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
url: url,
token: token
})
})
.then(response => response.json())
.then(data => {
if (data.success) {
alert('Connection successful! Response: ' + (data.message || 'OK'));
} else {
alert('Connection failed: ' + (data.error || 'Unknown error'));
}
})
.catch(error => {
console.error('Error:', error);
alert('Connection test failed: ' + error.message);
})
.finally(() => {
this.disabled = false;
this.textContent = 'Test Connection';
});
});
// Generic config save function
function saveConfig(section, config) {
fetch('/api/config/' + section, {
......
......@@ -10,72 +10,6 @@
</div>
</div>
<!-- System Status Cards -->
<div class="row mb-4">
<div class="col-md-3 col-sm-6">
<div class="card bg-primary text-white">
<div class="card-body">
<div class="d-flex justify-content-between">
<div>
<h6 class="card-title">System Status</h6>
<h4 id="system-status-text">Online</h4>
</div>
<div class="align-self-center">
<i class="fas fa-server fa-2x opacity-75"></i>
</div>
</div>
</div>
</div>
</div>
<div class="col-md-3 col-sm-6">
<div class="card bg-success text-white">
<div class="card-body">
<div class="d-flex justify-content-between">
<div>
<h6 class="card-title">Video Player</h6>
<h4 id="video-status-text">Ready</h4>
</div>
<div class="align-self-center">
<i class="fas fa-video fa-2x opacity-75"></i>
</div>
</div>
</div>
</div>
</div>
<div class="col-md-3 col-sm-6">
<div class="card bg-info text-white">
<div class="card-body">
<div class="d-flex justify-content-between">
<div>
<h6 class="card-title">Active Template</h6>
<h4 id="active-template">News</h4>
</div>
<div class="align-self-center">
<i class="fas fa-layer-group fa-2x opacity-75"></i>
</div>
</div>
</div>
</div>
</div>
<div class="col-md-3 col-sm-6">
<div class="card bg-warning text-white">
<div class="card-body">
<div class="d-flex justify-content-between">
<div>
<h6 class="card-title">API Tokens</h6>
<h4 id="token-count">0</h4>
</div>
<div class="align-self-center">
<i class="fas fa-key fa-2x opacity-75"></i>
</div>
</div>
</div>
</div>
</div>
</div>
<!-- Quick Actions -->
<div class="row mb-4">
......@@ -525,4 +459,4 @@ function loadAvailableTemplates() {
});
}
</script>
{% endblock %}
\ No newline at end of file
{% endblock %}
......@@ -66,11 +66,18 @@
<label for="confirm-password" class="form-label">Confirm Password</label>
<input type="password" class="form-control" id="confirm-password" required>
</div>
<div class="form-check mb-3">
<input class="form-check-input" type="checkbox" id="is-admin">
<label class="form-check-label" for="is-admin">
Administrator
</label>
<div class="mb-3">
<label for="user-role" class="form-label">User Role</label>
<select class="form-select" id="user-role" required>
<option value="normal">Normal User</option>
<option value="cashier">Cashier</option>
<option value="admin">Administrator</option>
</select>
<div class="form-text">
<strong>Normal User:</strong> Full dashboard access<br>
<strong>Cashier:</strong> Limited access to video/overlay controls only<br>
<strong>Administrator:</strong> Full system access including user management
</div>
</div>
</form>
</div>
......@@ -98,15 +105,43 @@
const users = data.users || [];
users.forEach(user => {
// Determine role display
let roleDisplay = 'Normal User';
let roleBadgeClass = 'bg-primary';
if (user.role) {
if (user.role === 'admin') {
roleDisplay = 'Administrator';
roleBadgeClass = 'bg-danger';
} else if (user.role === 'cashier') {
roleDisplay = 'Cashier';
roleBadgeClass = 'bg-info';
} else {
roleDisplay = 'Normal User';
roleBadgeClass = 'bg-primary';
}
} else if (user.is_admin) {
// Backward compatibility
roleDisplay = 'Administrator';
roleBadgeClass = 'bg-danger';
}
const row = document.createElement('tr');
// Disable delete for the default admin user
const isDefaultAdmin = user.username === 'admin';
const deleteButton = isDefaultAdmin
? `<button class="btn btn-sm btn-danger" disabled title="Cannot delete default admin user">Delete</button>`
: `<button class="btn btn-sm btn-danger delete-user" data-id="${user.id}">Delete</button>`;
row.innerHTML = `
<td>${user.username}</td>
<td>${user.email}</td>
<td>${user.is_admin ? 'Administrator' : 'User'}</td>
<td><span class="badge ${roleBadgeClass}">${roleDisplay}</span></td>
<td>${user.last_login || 'Never'}</td>
<td>
<button class="btn btn-sm btn-primary edit-user" data-id="${user.id}">Edit</button>
<button class="btn btn-sm btn-danger delete-user" data-id="${user.id}">Delete</button>
<button class="btn btn-sm btn-primary edit-user" data-id="${user.id}" data-role="${user.role || (user.is_admin ? 'admin' : 'normal')}">Edit</button>
${deleteButton}
</td>
`;
tbody.appendChild(row);
......@@ -135,11 +170,12 @@
// Edit user
function editUser(userId) {
// Get user data first
const userRow = document.querySelector(`[data-id="${userId}"]`).closest('tr');
const editButton = document.querySelector(`[data-id="${userId}"].edit-user`);
const userRow = editButton.closest('tr');
const cells = userRow.querySelectorAll('td');
const currentUsername = cells[0].textContent;
const currentEmail = cells[1].textContent;
const currentIsAdmin = cells[2].textContent === 'Administrator';
const currentRole = editButton.getAttribute('data-role') || 'normal';
// Create edit modal dynamically
const editModal = `
......@@ -164,11 +200,18 @@
<label for="edit-password" class="form-label">New Password (leave empty to keep current)</label>
<input type="password" class="form-control" id="edit-password">
</div>
<div class="form-check mb-3">
<input class="form-check-input" type="checkbox" id="edit-is-admin" ${currentIsAdmin ? 'checked' : ''}>
<label class="form-check-label" for="edit-is-admin">
Administrator
</label>
<div class="mb-3">
<label for="edit-user-role" class="form-label">User Role</label>
<select class="form-select" id="edit-user-role" required>
<option value="normal" ${currentRole === 'normal' ? 'selected' : ''}>Normal User</option>
<option value="cashier" ${currentRole === 'cashier' ? 'selected' : ''}>Cashier</option>
<option value="admin" ${currentRole === 'admin' ? 'selected' : ''}>Administrator</option>
</select>
<div class="form-text">
<strong>Normal User:</strong> Full dashboard access<br>
<strong>Cashier:</strong> Limited access to video/overlay controls only<br>
<strong>Administrator:</strong> Full system access including user management
</div>
</div>
</form>
</div>
......@@ -199,17 +242,18 @@
const username = document.getElementById('edit-username').value;
const email = document.getElementById('edit-email').value;
const password = document.getElementById('edit-password').value;
const isAdmin = document.getElementById('edit-is-admin').checked;
const role = document.getElementById('edit-user-role').value;
if (!username || !email) {
alert('Username and email are required');
if (!username || !email || !role) {
alert('Username, email and role are required');
return;
}
const updateData = {
username: username,
email: email,
is_admin: isAdmin
role: role,
is_admin: role === 'admin' // For backward compatibility
};
// Only include password if it's provided
......@@ -276,9 +320,9 @@
const email = document.getElementById('email').value;
const password = document.getElementById('password').value;
const confirmPassword = document.getElementById('confirm-password').value;
const isAdmin = document.getElementById('is-admin').checked;
const role = document.getElementById('user-role').value;
if (!username || !email || !password || !confirmPassword) {
if (!username || !email || !password || !confirmPassword || !role) {
alert('All fields are required');
return;
}
......@@ -298,7 +342,8 @@
username: username,
email: email,
password: password,
is_admin: isAdmin
role: role,
is_admin: role === 'admin' // For backward compatibility
})
})
.then(response => response.json())
......
#!/usr/bin/env python3
from PyQt6.QtWidgets import QApplication, QMainWindow
import sys
app = QApplication(sys.argv)
window = QMainWindow()
window.show()
sys.exit(app.exec())
#!/usr/bin/env python3
"""
Cross-platform test script for persistent directory functionality
Tests MbetterClient directory creation on Windows, macOS, and Linux
"""
import sys
import platform
from pathlib import Path
# Add project root to path
sys.path.insert(0, str(Path(__file__).parent))
from mbetterclient.config.settings import get_user_data_dir, get_user_config_dir, get_user_cache_dir, is_pyinstaller_executable
def test_directory_creation():
"""Test directory creation and permissions across platforms"""
print("=" * 60)
print(f"MbetterClient Cross-Platform Directory Test")
print("=" * 60)
print(f"Platform: {platform.system()} {platform.release()}")
print(f"Python: {sys.version}")
print(f"PyInstaller mode: {is_pyinstaller_executable()}")
print()
# Test each directory type
directories = {
"Data Directory": get_user_data_dir,
"Config Directory": get_user_config_dir,
"Cache Directory": get_user_cache_dir
}
results = {}
for dir_name, dir_func in directories.items():
print(f"Testing {dir_name}...")
try:
# Get directory path
dir_path = dir_func()
print(f" Path: {dir_path}")
# Check if directory exists
if dir_path.exists():
print(f" ✅ Directory exists")
else:
print(f" ❌ Directory does not exist")
continue
# Check if directory is writable
test_file = dir_path / f'.test_write_{dir_name.lower().replace(" ", "_")}'
try:
test_file.write_text('MbetterClient test file')
test_file.unlink()
print(f" ✅ Directory is writable")
writable = True
except (OSError, PermissionError) as e:
print(f" ❌ Directory not writable: {e}")
writable = False
# Test subdirectory creation
test_subdir = dir_path / 'test_subdir'
try:
test_subdir.mkdir(exist_ok=True)
test_subdir.rmdir()
print(f" ✅ Can create subdirectories")
can_create_subdirs = True
except (OSError, PermissionError) as e:
print(f" ❌ Cannot create subdirectories: {e}")
can_create_subdirs = False
results[dir_name] = {
'path': str(dir_path),
'exists': dir_path.exists(),
'writable': writable,
'can_create_subdirs': can_create_subdirs
}
except Exception as e:
print(f" ❌ Error testing {dir_name}: {e}")
results[dir_name] = {
'path': 'ERROR',
'exists': False,
'writable': False,
'can_create_subdirs': False,
'error': str(e)
}
print()
return results
def test_database_path():
"""Test database path resolution"""
print("Testing Database Path Resolution...")
try:
from mbetterclient.config.settings import DatabaseConfig
# Test default database config
db_config = DatabaseConfig()
db_path = db_config.get_absolute_path()
print(f" Database path: {db_path}")
print(f" Parent directory: {db_path.parent}")
print(f" Parent exists: {db_path.parent.exists()}")
# Try to create parent directory
try:
db_path.parent.mkdir(parents=True, exist_ok=True)
print(f" ✅ Can create database parent directory")
except Exception as e:
print(f" ❌ Cannot create database parent directory: {e}")
return str(db_path)
except Exception as e:
print(f" ❌ Error testing database path: {e}")
return None
def test_application_directories():
"""Test actual application directory structure"""
print("Testing Application Directory Structure...")
try:
from mbetterclient.config.settings import AppSettings
settings = AppSettings()
settings.ensure_directories()
# Check directories that should be created
data_dir = get_user_data_dir()
config_dir = get_user_config_dir()
required_dirs = [
data_dir / "logs",
data_dir / "data",
data_dir / "uploads",
config_dir / "templates"
]
all_good = True
for req_dir in required_dirs:
if req_dir.exists():
print(f" ✅ {req_dir}")
else:
print(f" ❌ Missing: {req_dir}")
all_good = False
return all_good
except Exception as e:
print(f" ❌ Error testing application directories: {e}")
return False
def main():
"""Main test function"""
print("Starting MbetterClient cross-platform directory tests...")
print()
# Test basic directory creation
dir_results = test_directory_creation()
# Test database path
db_path = test_database_path()
print()
# Test application directory structure
app_dirs_ok = test_application_directories()
print()
# Summary
print("=" * 60)
print("TEST SUMMARY")
print("=" * 60)
all_tests_passed = True
for dir_name, result in dir_results.items():
status = "✅ PASS" if (result['exists'] and result['writable'] and result['can_create_subdirs']) else "❌ FAIL"
print(f"{dir_name}: {status}")
if 'error' in result:
print(f" Error: {result['error']}")
else:
print(f" Path: {result['path']}")
if status == "❌ FAIL":
all_tests_passed = False
print()
print(f"Database Path: {'✅ OK' if db_path else '❌ ERROR'}")
if db_path:
print(f" {db_path}")
print(f"Application Structure: {'✅ OK' if app_dirs_ok else '❌ ERROR'}")
print()
if all_tests_passed and db_path and app_dirs_ok:
print("🎉 ALL TESTS PASSED - Cross-platform persistence ready!")
return 0
else:
print("⚠️ SOME TESTS FAILED - Check errors above")
return 1
if __name__ == "__main__":
sys.exit(main())
\ No newline at end of file
#!/usr/bin/env python3
"""
Standalone test application for PyQt6 Video Player with QWebEngineView overlay
"""
import sys
import logging
import time
from pathlib import Path
from dataclasses import dataclass
from PyQt6.QtWidgets import QApplication
from PyQt6.QtCore import QTimer
# Add project path for imports
project_path = Path(__file__).parent
sys.path.insert(0, str(project_path))
from mbetterclient.qt_player.qt6_player import Qt6VideoPlayer, PlayerWindow
from mbetterclient.core.message_bus import MessageBus, MessageBuilder
from mbetterclient.config.settings import QtConfig
@dataclass
class TestQtConfig:
"""Test configuration for Qt player"""
fullscreen: bool = False
window_width: int = 1280
window_height: int = 720
always_on_top: bool = False
auto_play: bool = True
volume: float = 0.8
mute: bool = False
def setup_logging():
"""Setup logging for the test application"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler('qt6_player_test.log')
]
)
def test_standalone_player():
"""Test the standalone PyQt6 player window"""
print("Testing Standalone PyQt6 Player...")
app = QApplication(sys.argv)
config = TestQtConfig()
# Create player window directly
window = PlayerWindow(config)
# Show window
window.show()
# Test overlay updates
overlay_view = window.video_widget.get_overlay_view()
def update_overlay_demo():
"""Demo function to update overlay periodically"""
current_time = time.strftime("%H:%M:%S")
overlay_data = {
'title': f'PyQt6 Demo - {current_time}',
'subtitle': 'Multi-threaded Video Player with WebEngine Overlay',
'ticker': 'Real-time JavaScript ↔ Python Communication • Hardware Accelerated Video • Professional Animations'
}
overlay_view.update_overlay_data(overlay_data)
print(f"Updated overlay at {current_time}")
# Setup periodic overlay updates
timer = QTimer()
timer.timeout.connect(update_overlay_demo)
timer.start(2000) # Update every 2 seconds
# Initial overlay update
update_overlay_demo()
print("PyQt6 Player Window created successfully!")
print("Features demonstrated:")
print("- QMediaPlayer + QVideoWidget for hardware-accelerated video")
print("- QWebEngineView overlay with transparent background")
print("- QWebChannel bidirectional Python ↔ JavaScript communication")
print("- CSS3 animations with GSAP integration")
print("- Thread-safe signal/slot mechanisms")
print("- QTimer integration for real-time updates")
print("- Professional UI with responsive design")
print("- Cross-platform compatibility")
print("\nControls:")
print("- Space: Play/Pause")
print("- F11: Toggle Fullscreen")
print("- S: Toggle Stats Panel")
print("- M: Toggle Mute")
print("- Escape: Exit")
print("\nClose the window to exit the test.")
return app.exec()
def test_threaded_player():
"""Test the full threaded PyQt6 player component"""
print("Testing Threaded PyQt6 Player Component...")
# Create message bus
message_bus = MessageBus()
# Create Qt config
config = TestQtConfig()
# Create Qt6 player component
player = Qt6VideoPlayer(message_bus, config)
# Initialize player
if not player.initialize():
print("Failed to initialize Qt6VideoPlayer!")
return 1
# Start player in separate thread (simulation)
print("Qt6VideoPlayer initialized successfully!")
# Test sending messages
def send_test_messages():
"""Send test messages to player"""
time.sleep(2)
# Test overlay update
overlay_message = MessageBuilder.template_change(
sender="test_app",
template_name="demo_template",
template_data={
'title': 'Threaded Player Demo',
'subtitle': 'Message Bus Communication Test',
'ticker': 'Successfully communicating via MessageBus • Multi-threaded Architecture • Real-time Updates'
}
)
overlay_message.recipient = "qt6_player"
message_bus.publish(overlay_message)
print("Sent overlay update message")
# Test video info update
time.sleep(2)
video_info_message = MessageBuilder.system_status(
sender="test_app",
status="demo",
details={
'videoInfo': {
'resolution': '1920x1080',
'bitrate': '8.5 Mbps',
'codec': 'H.265/HEVC',
'fps': '60.0'
}
}
)
video_info_message.recipient = "qt6_player"
message_bus.publish(video_info_message)
print("Sent video info update")
# Setup test message timer
timer = QTimer()
timer.timeout.connect(send_test_messages)
timer.setSingleShot(True)
timer.start(1000) # Start after 1 second
print("Threaded player test started. Close the player window to exit.")
# Run the player (this would normally be in a separate thread)
try:
player.run()
except KeyboardInterrupt:
print("Stopping player...")
player.shutdown()
return 0
def main():
"""Main test function"""
setup_logging()
print("PyQt6 Multi-threaded Video Player Test Suite")
print("=" * 50)
if len(sys.argv) > 1:
test_mode = sys.argv[1]
else:
print("Available test modes:")
print("1. standalone - Test standalone player window")
print("2. threaded - Test full threaded player component")
print()
test_mode = input("Select test mode (1 or 2): ").strip()
if test_mode == "1":
test_mode = "standalone"
elif test_mode == "2":
test_mode = "threaded"
else:
test_mode = "standalone"
try:
if test_mode == "standalone":
return test_standalone_player()
elif test_mode == "threaded":
return test_threaded_player()
else:
print(f"Unknown test mode: {test_mode}")
return 1
except Exception as e:
print(f"Test failed with error: {e}")
import traceback
traceback.print_exc()
return 1
if __name__ == "__main__":
sys.exit(main())
\ No newline at end of file
#!/usr/bin/env python3
"""
Test script for Qt player functionality
"""
import sys
import os
import logging
import time
import threading
from pathlib import Path
# Add the project root to Python path
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))
from mbetterclient.config.settings import AppSettings
from mbetterclient.core.message_bus import MessageBus, MessageBuilder, MessageType
from mbetterclient.qt_player.player import QtVideoPlayer
def setup_logging():
"""Setup logging for the test"""
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
return logging.getLogger(__name__)
def test_qt_player_standalone():
"""Test Qt player in standalone mode"""
logger = setup_logging()
logger.info("Starting Qt player standalone test")
# Create settings
settings = AppSettings()
settings.qt.fullscreen = False
settings.qt.window_width = 800
settings.qt.window_height = 600
# Create message bus
message_bus = MessageBus()
# Create Qt player
qt_player = QtVideoPlayer(message_bus, settings.qt)
# Initialize Qt player
if not qt_player.initialize():
logger.error("Failed to initialize Qt player")
return 1
logger.info("Qt player initialized successfully")
# Start message processing in a separate thread
qt_player.start_message_processing()
# Send a test message to display default overlay
test_message = MessageBuilder.template_change(
sender="test",
template_data={
"title": "Qt Player Test",
"subtitle": "Standalone Mode Test",
"ticker": "This is a test of the Qt player in standalone mode"
}
)
message_bus.publish(test_message)
# Run Qt event loop (this will block until window is closed)
logger.info("Running Qt event loop - close the window to exit")
exit_code = qt_player.run()
# Cleanup
qt_player.shutdown()
logger.info("Qt player test completed")
return exit_code
def test_qt_player_with_message_bus():
"""Test Qt player with message bus communication"""
logger = setup_logging()
logger.info("Starting Qt player message bus test")
# Create settings
settings = AppSettings()
settings.qt.fullscreen = False
settings.qt.window_width = 800
settings.qt.window_height = 600
# Create message bus
message_bus = MessageBus()
# Create Qt player
qt_player = QtVideoPlayer(message_bus, settings.qt)
# Initialize Qt player
if not qt_player.initialize():
logger.error("Failed to initialize Qt player")
return 1
logger.info("Qt player initialized successfully")
# Start message processing in a separate thread
qt_player.start_message_processing()
# Send test messages
def send_test_messages():
time.sleep(2) # Wait for window to be ready
# Send overlay update
overlay_message = MessageBuilder.overlay_update(
sender="test",
overlay_data={
"title": "Message Bus Test",
"subtitle": "Testing message bus communication",
"showStats": True
}
)
message_bus.publish(overlay_message)
time.sleep(3)
# Send another overlay update
overlay_message2 = MessageBuilder.overlay_update(
sender="test",
overlay_data={
"title": "Message Bus Test Continued",
"subtitle": "Second message bus test",
"ticker": "Testing continuous updates through message bus"
}
)
message_bus.publish(overlay_message2)
logger.info("Test messages sent")
# Start message sending in a separate thread
message_thread = threading.Thread(target=send_test_messages)
message_thread.start()
# Run Qt event loop (this will block until window is closed)
logger.info("Running Qt event loop with message bus test - close the window to exit")
exit_code = qt_player.run()
# Wait for message thread to finish
message_thread.join()
# Cleanup
qt_player.shutdown()
logger.info("Qt player message bus test completed")
return exit_code
if __name__ == "__main__":
if len(sys.argv) > 1 and sys.argv[1] == "standalone":
exit_code = test_qt_player_standalone()
elif len(sys.argv) > 1 and sys.argv[1] == "message_bus":
exit_code = test_qt_player_with_message_bus()
else:
print("Usage: python test_qt_player.py [standalone|message_bus]")
print(" standalone: Test Qt player in standalone mode")
print(" message_bus: Test Qt player with message bus communication")
sys.exit(1)
sys.exit(exit_code)
\ No newline at end of file
#!/usr/bin/env python3
"""
Debug script specifically for testing video playback visibility in Qt player
Tests both native and WebEngine overlays to isolate the video blocking issue
"""
import sys
import logging
import time
from pathlib import Path
from dataclasses import dataclass
from PyQt6.QtWidgets import QApplication
from PyQt6.QtCore import QTimer
# Add project path for imports
project_path = Path(__file__).parent
sys.path.insert(0, str(project_path))
from mbetterclient.qt_player.player import PlayerWindow
@dataclass
class DebugQtConfig:
"""Debug configuration for Qt player"""
fullscreen: bool = False
window_width: int = 800
window_height: int = 600
always_on_top: bool = False
auto_play: bool = True
volume: float = 0.8
mute: bool = False
use_native_overlay: bool = True # Start with native overlay for testing
def setup_debug_logging():
"""Setup debug logging"""
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler('video_debug.log')
]
)
def test_video_playback_native():
"""Test video playback with native overlay (should not block video)"""
print("Testing Video Playback with NATIVE overlay...")
app = QApplication(sys.argv)
config = DebugQtConfig()
config.use_native_overlay = True # Force native overlay
# Create player window with native overlay
window = PlayerWindow(config)
window.show()
# Test with our generated test video
test_video_path = "test_video.mp4"
def play_test_video():
"""Play the test video after a short delay"""
print(f"Playing test video: {test_video_path}")
window.play_video(test_video_path)
# Update overlay to confirm it's working
overlay_data = {
'title': 'DEBUG: Native Overlay Test',
'subtitle': 'Video should be VISIBLE underneath this overlay',
'ticker': 'If you can see moving colors/patterns, video is working! Native overlay should not block video.'
}
# Use the new separate window overlay
if hasattr(window, 'window_overlay') and window.window_overlay:
window.window_overlay.update_overlay_data(overlay_data)
else:
print("Warning: No window overlay available")
# Play video after 2 seconds
QTimer.singleShot(2000, play_test_video)
print("Native Overlay Test Window created!")
print("Expected behavior:")
print("- You should see a test pattern video (moving colors/gradients)")
print("- Native overlay text should appear ON TOP of the video")
print("- If video is NOT visible, the issue is deeper than overlay blocking")
print("\nControls:")
print("- Space: Play/Pause")
print("- Escape: Exit")
return app.exec()
def test_video_playback_webengine():
"""Test video playback with WebEngine overlay (may block video)"""
print("Testing Video Playback with WEBENGINE overlay...")
app = QApplication(sys.argv)
config = DebugQtConfig()
config.use_native_overlay = False # Force WebEngine overlay
# Create player window with WebEngine overlay
window = PlayerWindow(config)
window.show()
# Test with our generated test video
test_video_path = "test_video.mp4"
def play_test_video():
"""Play the test video after a short delay"""
print(f"Playing test video: {test_video_path}")
window.play_video(test_video_path)
# Update overlay to confirm it's working
overlay_data = {
'title': 'DEBUG: WebEngine Overlay Test',
'subtitle': 'Video may be BLOCKED by this overlay',
'ticker': 'If you CANNOT see moving colors/patterns, WebEngine overlay is blocking the video!'
}
# Wait for WebEngine to be ready before updating
def update_overlay_when_ready():
# Use the new separate window overlay
if hasattr(window, 'window_overlay') and window.window_overlay:
overlay_view = window.window_overlay
if hasattr(overlay_view, 'overlay_channel') and overlay_view.overlay_channel:
if window._is_webengine_ready(overlay_view):
overlay_view.update_overlay_data(overlay_data)
print("WebEngine overlay updated")
else:
print("WebEngine not ready, retrying...")
QTimer.singleShot(1000, update_overlay_when_ready)
else:
overlay_view.update_overlay_data(overlay_data)
else:
print("Warning: No window overlay available")
QTimer.singleShot(3000, update_overlay_when_ready)
# Play video after 2 seconds
QTimer.singleShot(2000, play_test_video)
print("WebEngine Overlay Test Window created!")
print("Expected behavior:")
print("- You should see a test pattern video (moving colors/gradients)")
print("- WebEngine overlay text should appear ON TOP of the video")
print("- If video is NOT visible, WebEngine overlay is blocking it")
print("\nControls:")
print("- Space: Play/Pause")
print("- Escape: Exit")
return app.exec()
def test_uploaded_video():
"""Test with an actual uploaded video file"""
print("Testing with uploaded video files...")
# Look for uploaded videos
uploads_dir = Path("uploads")
if uploads_dir.exists():
video_files = list(uploads_dir.glob("*.mp4"))
if video_files:
video_path = video_files[0] # Use first video found
print(f"Found uploaded video: {video_path}")
app = QApplication(sys.argv)
config = DebugQtConfig()
config.use_native_overlay = True # Start with native
window = PlayerWindow(config)
window.show()
def play_uploaded_video():
print(f"Playing uploaded video: {video_path}")
window.play_video(str(video_path))
overlay_data = {
'title': f'Playing: {video_path.name}',
'subtitle': 'Testing uploaded video with native overlay',
'ticker': 'This is a real uploaded video file. Video should be visible with native overlay.'
}
# Use the new separate window overlay
if hasattr(window, 'window_overlay') and window.window_overlay:
window.window_overlay.update_overlay_data(overlay_data)
else:
print("Warning: No window overlay available")
QTimer.singleShot(2000, play_uploaded_video)
print(f"Testing uploaded video: {video_path.name}")
print("This tests with a real uploaded video file")
return app.exec()
else:
print("No video files found in uploads directory")
return 1
else:
print("Uploads directory not found")
return 1
def main():
"""Main debug function"""
setup_debug_logging()
print("Qt Video Player Debug Suite")
print("=" * 40)
if len(sys.argv) > 1:
test_mode = sys.argv[1]
else:
print("Available test modes:")
print("1. native - Test with native Qt overlay (should show video)")
print("2. webengine - Test with WebEngine overlay (may block video)")
print("3. uploaded - Test with uploaded video file")
print()
choice = input("Select test mode (1, 2, or 3): ").strip()
if choice == "1":
test_mode = "native"
elif choice == "2":
test_mode = "webengine"
elif choice == "3":
test_mode = "uploaded"
else:
test_mode = "native"
try:
if test_mode == "native":
return test_video_playback_native()
elif test_mode == "webengine":
return test_video_playback_webengine()
elif test_mode == "uploaded":
return test_uploaded_video()
else:
print(f"Unknown test mode: {test_mode}")
return 1
except Exception as e:
print(f"Test failed with error: {e}")
import traceback
traceback.print_exc()
return 1
if __name__ == "__main__":
sys.exit(main())
\ No newline at end of file
#!/usr/bin/env python3
"""
Minimal video test - NO overlays at all to test pure QVideoWidget rendering
"""
import sys
import logging
from pathlib import Path
from PyQt6.QtWidgets import QApplication, QMainWindow, QVBoxLayout, QWidget
from PyQt6.QtCore import QUrl
from PyQt6.QtMultimedia import QMediaPlayer, QAudioOutput
from PyQt6.QtMultimediaWidgets import QVideoWidget
# Add project path for imports
project_path = Path(__file__).parent
sys.path.insert(0, str(project_path))
def setup_logging():
"""Setup basic logging"""
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler(sys.stdout)]
)
class MinimalVideoWindow(QMainWindow):
"""Absolute minimal video player - NO overlays, just pure video"""
def __init__(self):
super().__init__()
self.setup_ui()
self.setup_media_player()
def setup_ui(self):
"""Setup minimal UI - just video widget"""
self.setWindowTitle("MINIMAL Video Test - NO Overlays")
self.setGeometry(100, 100, 800, 600)
# PURE BLACK BACKGROUND - no transparency anywhere
self.setStyleSheet("QMainWindow { background-color: black; }")
# Central widget - completely opaque
central_widget = QWidget()
central_widget.setStyleSheet("background-color: black;")
self.setCentralWidget(central_widget)
# Layout
layout = QVBoxLayout(central_widget)
layout.setContentsMargins(0, 0, 0, 0)
# ONLY QVideoWidget - no overlays at all
self.video_widget = QVideoWidget()
self.video_widget.setStyleSheet("QVideoWidget { background-color: black; }")
layout.addWidget(self.video_widget)
print("Minimal video window created - PURE QVideoWidget only")
def setup_media_player(self):
"""Setup media player"""
self.media_player = QMediaPlayer()
self.audio_output = QAudioOutput()
self.media_player.setAudioOutput(self.audio_output)
self.media_player.setVideoOutput(self.video_widget)
# Connect signals for debugging
self.media_player.playbackStateChanged.connect(self.on_state_changed)
self.media_player.mediaStatusChanged.connect(self.on_status_changed)
self.media_player.errorOccurred.connect(self.on_error)
print("Media player setup completed")
def play_video(self, file_path):
"""Play video file"""
path_obj = Path(file_path)
if not path_obj.exists():
print(f"ERROR: File not found: {file_path}")
return
print(f"Loading video: {file_path}")
print(f"File size: {path_obj.stat().st_size} bytes")
url = QUrl.fromLocalFile(str(path_obj.absolute()))
print(f"QUrl: {url.toString()}")
self.media_player.setSource(url)
self.media_player.play()
print("Video play command sent")
def on_state_changed(self, state):
"""Debug state changes"""
print(f"MEDIA STATE: {state}")
def on_status_changed(self, status):
"""Debug status changes"""
print(f"MEDIA STATUS: {status}")
def on_error(self, error):
"""Debug errors"""
print(f"MEDIA ERROR: {error}")
def keyPressEvent(self, event):
"""Handle keys"""
if event.key() == 32: # Space
if self.media_player.playbackState() == QMediaPlayer.PlaybackState.PlayingState:
self.media_player.pause()
print("PAUSED")
else:
self.media_player.play()
print("PLAYING")
def main():
"""Test minimal video rendering"""
setup_logging()
print("MINIMAL VIDEO TEST - NO OVERLAYS")
print("=" * 40)
print("This test uses ONLY QVideoWidget with NO overlays")
print("If video is not visible here, the issue is with QVideoWidget itself")
print("")
app = QApplication(sys.argv)
window = MinimalVideoWindow()
window.show()
# Play test video after delay
from PyQt6.QtCore import QTimer
QTimer.singleShot(1000, lambda: window.play_video("test_video.mp4"))
print("Window shown. Video should start playing in 1 second.")
print("Expected: You should see moving test pattern (countdown)")
print("Controls: Space = Play/Pause, Escape = Exit")
return app.exec()
if __name__ == "__main__":
sys.exit(main())
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment