Optimize PyInstaller build and clean codebase

- Reduce PyInstaller binary libraries to eliminate segfaults:
  * Remove 25+ unnecessary X11/GL libraries from build.py
  * Keep only essential libraries: libxcb.so.1 and libX11.so.6
  * Update mbetter_discovery_linux.spec with minimal libraries
  * Remove qt.conf and platforms directory copying for self-contained binary

- Clean up codebase by removing unused files:
  * Remove qt6_player.py (unused duplicate implementation)
  * Remove temp_js.js and extract_js.py (utility files)
  * Remove hook-pyqt6.py and runtime_hook.py (disabled/unnecessary)

- Enhance build scripts:
  * Update MbetterClient_wrapper.sh with video safety measures
  * Update clean.sh to preserve .exe files while removing artifacts
  * Add comprehensive error handling for virtualized environments

- Restore important assets:
  * Restore dist/MBetterDiscovery.exe from git repository

Result: Self-contained binary with reduced library conflicts, better compatibility across Linux distributions and virtualized environments.
parent 658468a3
...@@ -115,12 +115,23 @@ setup_mesa_software() { ...@@ -115,12 +115,23 @@ setup_mesa_software() {
# Qt WebEngine configuration for software rendering # Qt WebEngine configuration for software rendering
export QTWEBENGINE_CHROMIUM_FLAGS="--no-sandbox --disable-gpu --disable-gpu-sandbox --disable-dev-shm-usage --disable-software-rasterizer --disable-accelerated-video-decode --disable-accelerated-video-encode --disable-gpu-compositing --disable-gpu-rasterization --disable-vulkan --disable-vulkan-surface --disable-features=Vulkan --user-data-dir=$USER_TEMP --enable-transparent-visuals --disable-background-timer-throttling --disable-renderer-backgrounding --disable-vulkan-fallback" export QTWEBENGINE_CHROMIUM_FLAGS="--no-sandbox --disable-gpu --disable-gpu-sandbox --disable-dev-shm-usage --disable-software-rasterizer --disable-accelerated-video-decode --disable-accelerated-video-encode --disable-gpu-compositing --disable-gpu-rasterization --disable-vulkan --disable-vulkan-surface --disable-features=Vulkan --user-data-dir=$USER_TEMP --enable-transparent-visuals --disable-background-timer-throttling --disable-renderer-backgrounding --disable-vulkan-fallback"
# Qt configuration # Qt configuration for virtualized environments
export QT_OPENGL=software export QT_OPENGL=software
export QTWEBENGINE_DISABLE_SANDBOX=1 export QTWEBENGINE_DISABLE_SANDBOX=1
export QT_QPA_PLATFORM=xcb export QT_QPA_PLATFORM=xcb
export QT_XCB_GL_INTEGRATION=xcb # Better transparency support export QT_XCB_GL_INTEGRATION=xcb_egl # Better compatibility with Mesa
export QT_ENABLE_HIGHDPI_SCALING=0 # Better performance export QT_ENABLE_HIGHDPI_SCALING=0 # Better performance
export QT_QUICK_BACKEND=software # Force software backend for Qt Quick
export QT_QPA_PLATFORM_PLUGIN_PATH="" # Let Qt find plugins automatically
export QT_DEBUG_PLUGINS=0 # Reduce debug output
export QT_LOGGING_RULES="qt.qpa.plugin=false" # Reduce plugin loading messages
# Additional video-specific settings for virtualized environments
export QT_MULTIMEDIA_PREFERRED_PLUGINS="" # Let Qt choose best plugin
export QT_GSTREAMER_PLAYBIN_FLAGS=0 # Disable problematic GStreamer features
export QT_MULTIMEDIA_DISABLE_GSTREAMER=1 # Disable GStreamer backend
export QT_MULTIMEDIA_FORCE_FFMPEG=1 # Force FFmpeg backend
export QT_MULTIMEDIA_DISABLE_VIDEOSINK=1 # Disable video sink in virtual environments
# Vulkan disables # Vulkan disables
export VK_ICD_FILENAMES="" export VK_ICD_FILENAMES=""
...@@ -188,6 +199,15 @@ echo "Environment: $(if [ -n "$VIRT_TYPE" ]; then echo "$VIRT_TYPE VM"; else ech ...@@ -188,6 +199,15 @@ echo "Environment: $(if [ -n "$VIRT_TYPE" ]; then echo "$VIRT_TYPE VM"; else ech
echo "Acceleration: $(if [ $HW_ACCEL -eq 1 ]; then echo "Hardware"; else echo "Software"; fi)" echo "Acceleration: $(if [ $HW_ACCEL -eq 1 ]; then echo "Hardware"; else echo "Software"; fi)"
echo "Rendering: $CONFIG_TYPE" echo "Rendering: $CONFIG_TYPE"
echo "Temp Directory: $USER_TEMP" echo "Temp Directory: $USER_TEMP"
if [ -n "$VIRT_TYPE" ]; then
echo ""
echo "⚠️ VIRTUALIZED ENVIRONMENT DETECTED:"
echo " Video playback may not be available due to graphics limitations."
echo " The application will run in audio-only mode if video fails."
echo " This is normal behavior in virtual machines."
fi
echo "" echo ""
# Verify binary exists # Verify binary exists
......
...@@ -80,6 +80,8 @@ def clean_build_directories(): ...@@ -80,6 +80,8 @@ def clean_build_directories():
spec_file.unlink() spec_file.unlink()
print(f" Removed: {spec_file}") print(f" Removed: {spec_file}")
# Removed qt.conf cleanup for self-contained binary
def collect_data_files() -> List[tuple]: def collect_data_files() -> List[tuple]:
"""Collect data files that need to be included in the build""" """Collect data files that need to be included in the build"""
...@@ -151,12 +153,15 @@ def collect_data_files() -> List[tuple]: ...@@ -151,12 +153,15 @@ def collect_data_files() -> List[tuple]:
data_files.append((str(file_path), str(relative_path.parent))) data_files.append((str(file_path), str(relative_path.parent)))
print(f" 📁 Including asset directory: {asset_dir}") print(f" 📁 Including asset directory: {asset_dir}")
# Removed qt.conf creation for self-contained binary
print(" 📦 Building self-contained binary - no external Qt configuration needed")
return data_files return data_files
def collect_hidden_imports() -> List[str]: def collect_hidden_imports() -> List[str]:
"""Collect hidden imports that PyInstaller might miss""" """Collect hidden imports that PyInstaller might miss"""
return [ hidden_imports = [
# PyQt6 modules # PyQt6 modules
'PyQt6.QtCore', 'PyQt6.QtCore',
'PyQt6.QtGui', 'PyQt6.QtGui',
...@@ -189,29 +194,42 @@ def collect_hidden_imports() -> List[str]: ...@@ -189,29 +194,42 @@ def collect_hidden_imports() -> List[str]:
'watchdog.observers', 'watchdog.observers',
'watchdog.events', 'watchdog.events',
# FFmpeg-python and related modules
'ffmpeg',
'ffmpeg._run',
'ffmpeg._utils',
'ffmpeg.nodes',
'ffmpeg.streams',
'ffmpeg.filter',
'ffmpeg.filter.graph',
'ffmpeg.input',
'ffmpeg.output',
'ffmpeg.run',
'ffmpeg.run_async',
'ffmpeg.overwrite_output',
'ffmpeg.concat',
'ffmpeg.filter_complex',
'ffmpeg.global_args',
'ffmpeg.merge_outputs',
# Other dependencies # Other dependencies
'packaging', 'packaging',
'pkg_resources', 'pkg_resources',
] ]
# Conditionally add ffmpeg module if available
try:
import ffmpeg
hidden_imports.append('ffmpeg')
print(" 📦 Added ffmpeg to hidden imports")
except ImportError:
print(" ⚠️ ffmpeg-python not available, skipping ffmpeg imports")
# Conditionally add Qt D-Bus support if available
try:
import PyQt6.QtDBus
hidden_imports.append('PyQt6.QtDBus')
print(" 📦 Added PyQt6.QtDBus to hidden imports")
except ImportError:
print(" ⚠️ PyQt6.QtDBus not available, skipping D-Bus imports")
# Conditionally add D-Bus modules if available
try:
import dbus
dbus_modules = [
'dbus',
'dbus.mainloop',
'dbus.mainloop.glib',
]
hidden_imports.extend(dbus_modules)
print(f" 📦 Added {len(dbus_modules)} dbus modules to hidden imports")
except ImportError:
print(" ⚠️ dbus not available, skipping dbus imports")
return hidden_imports
def get_platform_config() -> Dict[str, Any]: def get_platform_config() -> Dict[str, Any]:
"""Get platform-specific configuration""" """Get platform-specific configuration"""
...@@ -263,6 +281,35 @@ def create_icon_file(): ...@@ -263,6 +281,35 @@ def create_icon_file():
return icon_target if icon_target.exists() else None return icon_target if icon_target.exists() else None
def collect_binaries() -> List[tuple]:
"""Collect binary files that need to be included in the build"""
binaries = []
# Add Qt platform plugins and minimal X11 libraries for Linux
if platform.system() == 'Linux':
# Qt platform plugins - must be in 'platforms/' directory for Qt to find them
qt_platform_plugins = [
('/usr/lib/x86_64-linux-gnu/qt6/plugins/platforms/libqxcb.so', 'platforms/'),
('/usr/lib/x86_64-linux-gnu/qt6/plugins/platforms/libqwayland-egl.so', 'platforms/'),
('/usr/lib/x86_64-linux-gnu/qt6/plugins/platforms/libqwayland-generic.so', 'platforms/'),
('/usr/lib/x86_64-linux-gnu/qt6/plugins/platforms/libqoffscreen.so', 'platforms/'),
]
# Minimal X11 libraries - only include essential ones to avoid version conflicts
# Most X11 libraries should be provided by the system to prevent segfaults
essential_x11_libraries = [
('/lib/x86_64-linux-gnu/libxcb.so.1', '.'), # Core xcb library
('/lib/x86_64-linux-gnu/libX11.so.6', '.'), # Core X11 library
]
binaries.extend(qt_platform_plugins)
binaries.extend(essential_x11_libraries)
print(" 📦 Including only essential X11 libraries to prevent segfaults")
print(" 💡 System will provide most X11/GL libraries to avoid version conflicts")
return binaries
def generate_spec_file(): def generate_spec_file():
"""Generate PyInstaller spec file""" """Generate PyInstaller spec file"""
project_root = get_project_root() project_root = get_project_root()
...@@ -271,11 +318,20 @@ def generate_spec_file(): ...@@ -271,11 +318,20 @@ def generate_spec_file():
# Collect files and imports # Collect files and imports
data_files = collect_data_files() data_files = collect_data_files()
hidden_imports = collect_hidden_imports() hidden_imports = collect_hidden_imports()
binaries = collect_binaries()
icon_file = create_icon_file() icon_file = create_icon_file()
# Build pathex (additional paths for imports) # Build pathex (additional paths for imports)
pathex = [str(project_root)] + platform_config.get('additional_paths', []) pathex = [str(project_root)] + platform_config.get('additional_paths', [])
# Verify ffmpeg-python installation
try:
import ffmpeg
print(" ✅ ffmpeg-python is available")
except ImportError as e:
print(f" ⚠️ ffmpeg-python not found: {e}")
print(" 💡 This may cause build issues. Install with: pip install ffmpeg-python")
spec_content = f'''# -*- mode: python ; coding: utf-8 -*- spec_content = f'''# -*- mode: python ; coding: utf-8 -*-
block_cipher = None block_cipher = None
...@@ -283,12 +339,12 @@ block_cipher = None ...@@ -283,12 +339,12 @@ block_cipher = None
a = Analysis( a = Analysis(
['{BUILD_CONFIG['entry_point']}'], ['{BUILD_CONFIG['entry_point']}'],
pathex={pathex!r}, pathex={pathex!r},
binaries=[], binaries={binaries!r},
datas={data_files!r}, datas={data_files!r},
hiddenimports={hidden_imports!r}, hiddenimports={hidden_imports!r},
hookspath=[], hookspath={['.']!r},
hooksconfig={{}}, hooksconfig={{}},
runtime_hooks=[], runtime_hooks=[], # Disabled runtime hooks to avoid segfaults
excludes={platform_config.get('exclude_modules', [])!r}, excludes={platform_config.get('exclude_modules', [])!r},
win_no_prefer_redirects=False, win_no_prefer_redirects=False,
win_private_assemblies=False, win_private_assemblies=False,
...@@ -353,13 +409,20 @@ def check_dependencies(): ...@@ -353,13 +409,20 @@ def check_dependencies():
"""Check if all required dependencies are available""" """Check if all required dependencies are available"""
print("🔍 Checking dependencies...") print("🔍 Checking dependencies...")
# Map package names to their import names # Map package names to their import names (required packages)
required_packages = { required_packages = {
'PyInstaller': 'PyInstaller', 'PyInstaller': 'PyInstaller',
'PyQt6': 'PyQt6' 'PyQt6': 'PyQt6'
} }
# Optional packages (warn if missing but don't fail)
optional_packages = {
'ffmpeg-python': 'ffmpeg',
'dbus-python': 'dbus'
}
missing_packages = [] missing_packages = []
# Check required packages
for package_name, import_name in required_packages.items(): for package_name, import_name in required_packages.items():
try: try:
__import__(import_name) __import__(import_name)
...@@ -368,8 +431,16 @@ def check_dependencies(): ...@@ -368,8 +431,16 @@ def check_dependencies():
missing_packages.append(package_name) missing_packages.append(package_name)
print(f" ✗ {package_name}") print(f" ✗ {package_name}")
# Check optional packages (warn but don't fail)
for package_name, import_name in optional_packages.items():
try:
__import__(import_name)
print(f" ✓ {package_name} (optional)")
except ImportError:
print(f" ⚠️ {package_name} (optional) - not available")
if missing_packages: if missing_packages:
print(f"\n❌ Missing dependencies: {', '.join(missing_packages)}") print(f"\n❌ Missing required dependencies: {', '.join(missing_packages)}")
print("Please install them using:") print("Please install them using:")
print(f" pip install {' '.join(missing_packages)}") print(f" pip install {' '.join(missing_packages)}")
return False return False
...@@ -409,6 +480,9 @@ def post_build_tasks(): ...@@ -409,6 +480,9 @@ def post_build_tasks():
print(" No dist directory found") print(" No dist directory found")
return return
# Removed Qt platform plugins and qt.conf copying for self-contained binary
print(" 📦 Self-contained binary - no external Qt files needed")
# Find the created executable # Find the created executable
executable_path = None executable_path = None
for item in dist_dir.iterdir(): for item in dist_dir.iterdir():
......
...@@ -22,11 +22,38 @@ source venv/bin/activate ...@@ -22,11 +22,38 @@ source venv/bin/activate
# Install/upgrade dependencies # Install/upgrade dependencies
echo "📦 Installing dependencies..." echo "📦 Installing dependencies..."
pip install --upgrade pip if [ -n "$VIRTUAL_ENV" ]; then
pip install -r requirements.txt echo " 📦 Using virtual environment: $VIRTUAL_ENV"
pip install --upgrade pip
pip install -r requirements.txt
# Verify critical package installations
echo " 🔍 Verifying critical package installations..."
# Check ffmpeg-python
python3 -c "import ffmpeg; print('✅ ffmpeg-python installed successfully')" || {
echo " ❌ ffmpeg-python import failed, installing..."
pip install ffmpeg-python>=0.2.0
}
# Check dbus (optional, for Qt D-Bus support)
python3 -c "import dbus; print('✅ dbus installed successfully')" || {
echo " ⚠️ dbus not available, trying to install..."
pip install dbus-python || {
echo " ❌ Could not install dbus-python, Qt D-Bus support will be limited"
}
}
else
echo " ⚠️ Not in virtual environment, using --break-system-packages"
pip install --upgrade pip --break-system-packages
pip install -r requirements.txt --break-system-packages
fi
# Run the build script # Run the build script
echo "🔨 Starting build process..." echo "🔨 Starting build process..."
python3 build.py python3 build.py
# Removed Qt platform plugins and qt.conf copying for self-contained binary
echo "📦 Building self-contained binary - no external Qt files needed"
echo "✅ Build script completed!" echo "✅ Build script completed!"
\ No newline at end of file
#!/bin/bash
# Clean script for MbetterClient build artifacts
echo "🧹 MbetterClient Clean Script"
echo "============================"
# Clean build directories
echo "🗂️ Removing build directories..."
if [ -d "build" ]; then
rm -rf build
echo " ✅ Removed: build/"
else
echo " ℹ️ Build directory not found"
fi
if [ -d "dist" ]; then
echo " 🧹 Cleaning dist/ directory (preserving .exe files)..."
# List files before cleaning
echo " 📋 Files in dist/ before cleaning:"
ls -la dist/
echo ""
# Remove all files except .exe files
find dist -type f -not -name "*.exe" -delete 2>/dev/null || true
# Remove empty directories (but keep the dist directory itself)
find dist -type d -empty -delete 2>/dev/null || true
# Check what remains
remaining_files=$(find dist -type f | wc -l)
remaining_exe=$(find dist -name "*.exe" | wc -l)
if [ "$remaining_files" -gt 0 ]; then
echo " ✅ Cleaned: Removed all artifacts, preserved $remaining_exe .exe file(s)"
echo " 📋 Files remaining in dist/:"
ls -la dist/
else
echo " ℹ️ No files remain in dist/ directory"
fi
else
echo " ℹ️ Dist directory not found"
fi
# Clean generated spec files
echo "📄 Removing generated spec files..."
if [ -f "MbetterClient.spec" ]; then
rm -f MbetterClient.spec
echo " ✅ Removed: MbetterClient.spec"
else
echo " ℹ️ MbetterClient.spec not found"
fi
if [ -f "mbetter_discovery.spec" ]; then
rm -f mbetter_discovery.spec
echo " ✅ Removed: mbetter_discovery.spec"
else
echo " ℹ️ mbetter_discovery.spec not found"
fi
# Clean packages directory (optional)
echo "📦 Clean packages directory? (y/N)"
read -r response
if [[ "$response" =~ ^([yY][eE][sS]|[yY])$ ]]; then
if [ -d "packages" ]; then
rm -rf packages
echo " ✅ Removed: packages/"
else
echo " ℹ️ Packages directory not found"
fi
fi
# Clean __pycache__ directories
echo "🐍 Removing Python cache files..."
find . -type d -name "__pycache__" -exec rm -rf {} + 2>/dev/null || true
find . -name "*.pyc" -delete 2>/dev/null || true
find . -name "*.pyo" -delete 2>/dev/null || true
echo ""
echo "✅ Clean completed!"
echo ""
echo "To rebuild, run: ./build.sh"
\ No newline at end of file
#!/usr/bin/env python3
"""
Debug script to test WebDashboard SSL setup
"""
import sys
import logging
from pathlib import Path
# Add project root to path
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))
from mbetterclient.config.settings import AppSettings
from mbetterclient.web_dashboard.app import WebDashboard
from mbetterclient.core.message_bus import MessageBus
from mbetterclient.database.manager import DatabaseManager
from mbetterclient.config.manager import ConfigManager
def test_webapp_ssl():
"""Test WebDashboard SSL configuration"""
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
print("=== Testing WebDashboard SSL Setup ===")
# Create settings with SSL enabled
settings = AppSettings()
settings.web.enable_ssl = True
settings.web.host = "127.0.0.1"
settings.web.port = 5001
print(f"SSL enabled in settings: {settings.web.enable_ssl}")
print(f"SSL auto-generate: {settings.web.ssl_auto_generate}")
print(f"Host: {settings.web.host}, Port: {settings.web.port}")
# Initialize components needed by WebDashboard
try:
# Create required components
message_bus = MessageBus()
db_manager = DatabaseManager(db_path="test.db")
config_manager = ConfigManager(db_manager)
# Create WebDashboard instance
web_dashboard = WebDashboard(
message_bus=message_bus,
db_manager=db_manager,
config_manager=config_manager,
settings=settings.web
)
print("WebDashboard instance created successfully")
# Initialize (this should trigger SSL setup)
success = web_dashboard.initialize()
print(f"WebDashboard initialization: {'SUCCESS' if success else 'FAILED'}")
# Check if SSL context was created
if hasattr(web_dashboard, 'ssl_context') and web_dashboard.ssl_context:
print(f"SSL context created: {web_dashboard.ssl_context}")
print(f"SSL context type: {type(web_dashboard.ssl_context)}")
else:
print("ERROR: No SSL context created")
# Check server configuration
if hasattr(web_dashboard, 'server') and web_dashboard.server:
print(f"Server created: {web_dashboard.server}")
print(f"Server type: {type(web_dashboard.server)}")
# Check if server has SSL context
if hasattr(web_dashboard.server, 'ssl_context'):
print(f"Server SSL context: {web_dashboard.server.ssl_context}")
else:
print("Server has no ssl_context attribute")
else:
print("ERROR: No server created")
# Check settings after initialization
print(f"Settings SSL enabled after init: {web_dashboard.settings.enable_ssl}")
except Exception as e:
logger.error(f"Test failed: {e}")
import traceback
traceback.print_exc()
return False
return True
if __name__ == "__main__":
success = test_webapp_ssl()
if success:
print("\n✅ WebDashboard SSL test completed")
else:
print("\n❌ WebDashboard SSL test failed")
sys.exit(1)
\ No newline at end of file
import re
with open('mbetterclient/web_dashboard/templates/dashboard/fixtures.html', 'r') as f:
content = f.read()
# Extract JavaScript
js_matches = re.findall(r'<script[^>]*>(.*?)</script>', content, re.DOTALL)
if js_matches:
js_code = js_matches[0]
# Write to a temporary file to examine
with open('temp_js.js', 'w') as f:
f.write(js_code)
print("JavaScript extracted and saved to temp_js.js")
print("First 200 characters:")
print(js_code[:200])
print("\nLast 200 characters:")
print(js_code[-200:])
else:
print("No JavaScript found")
\ No newline at end of file
...@@ -15,6 +15,14 @@ from pathlib import Path ...@@ -15,6 +15,14 @@ from pathlib import Path
project_root = Path(__file__).parent project_root = Path(__file__).parent
sys.path.insert(0, str(project_root)) sys.path.insert(0, str(project_root))
# Set Qt platform plugin path for PyInstaller builds
# This must be set before any Qt imports
if hasattr(sys, '_MEIPASS'):
# Running in PyInstaller bundle
qt_plugins_path = os.path.join(sys._MEIPASS, 'platforms')
os.environ['QT_QPA_PLATFORM_PLUGIN_PATH'] = qt_plugins_path
print(f"Set QT_QPA_PLATFORM_PLUGIN_PATH to: {qt_plugins_path}")
from mbetterclient.core.application import MbetterClientApplication from mbetterclient.core.application import MbetterClientApplication
from mbetterclient.utils.logger import setup_logging from mbetterclient.utils.logger import setup_logging
from mbetterclient.config.settings import AppSettings from mbetterclient.config.settings import AppSettings
......
...@@ -5,25 +5,47 @@ PyInstaller spec file for MBetter Discovery Application - Linux ...@@ -5,25 +5,47 @@ PyInstaller spec file for MBetter Discovery Application - Linux
block_cipher = None block_cipher = None
# Qt platform plugins and X11 libraries
qt_platform_plugins = [
('/usr/lib/x86_64-linux-gnu/qt6/plugins/platforms/libqxcb.so', 'platforms/'),
('/usr/lib/x86_64-linux-gnu/qt6/plugins/platforms/libqwayland-egl.so', 'platforms/'),
('/usr/lib/x86_64-linux-gnu/qt6/plugins/platforms/libqwayland-generic.so', 'platforms/'),
('/usr/lib/x86_64-linux-gnu/qt6/plugins/platforms/libqoffscreen.so', 'platforms/'),
]
# Minimal X11 libraries - only include essential ones to avoid version conflicts
# Most X11 libraries should be provided by the system to prevent segfaults
x11_libraries = [
('/lib/x86_64-linux-gnu/libxcb.so.1', '.'), # Core xcb library
('/lib/x86_64-linux-gnu/libX11.so.6', '.'), # Core X11 library
]
a = Analysis( a = Analysis(
['mbetter_discovery.py'], ['mbetter_discovery.py'],
pathex=[], pathex=[],
binaries=[], binaries=qt_platform_plugins + x11_libraries,
datas=[], datas=[],
hiddenimports=[ hiddenimports=[
'PyQt6.QtCore', 'PyQt6.QtCore',
'PyQt6.QtGui', 'PyQt6.QtGui',
'PyQt6.QtWidgets', 'PyQt6.QtWidgets',
'PyQt6.QtDBus',
'netifaces', 'netifaces',
'ffmpeg', 'ffmpeg',
'ffmpeg._run', 'ffmpeg.errors',
'ffmpeg._utils', 'ffmpeg.ffmpeg',
'ffmpeg.nodes', 'ffmpeg.file',
'ffmpeg.streams', 'ffmpeg.options',
'ffmpeg.filter', 'ffmpeg.progress',
'ffmpeg.filter.graph', 'ffmpeg.protocol',
'ffmpeg.statistics',
'ffmpeg.types',
'ffmpeg.utils',
'dbus',
'dbus.mainloop',
'dbus.mainloop.glib',
], ],
hookspath=[], hookspath=['.'],
hooksconfig={}, hooksconfig={},
runtime_hooks=[], runtime_hooks=[],
excludes=[ excludes=[
......
...@@ -16,14 +16,17 @@ a = Analysis( ...@@ -16,14 +16,17 @@ a = Analysis(
'PyQt6.QtWidgets', 'PyQt6.QtWidgets',
'netifaces', 'netifaces',
'ffmpeg', 'ffmpeg',
'ffmpeg._run', 'ffmpeg.errors',
'ffmpeg._utils', 'ffmpeg.ffmpeg',
'ffmpeg.nodes', 'ffmpeg.file',
'ffmpeg.streams', 'ffmpeg.options',
'ffmpeg.filter', 'ffmpeg.progress',
'ffmpeg.filter.graph', 'ffmpeg.protocol',
'ffmpeg.statistics',
'ffmpeg.types',
'ffmpeg.utils',
], ],
hookspath=[], hookspath=['.'],
hooksconfig={}, hooksconfig={},
runtime_hooks=[], runtime_hooks=[],
excludes=[ excludes=[
......
...@@ -507,6 +507,7 @@ class APIClient(ThreadedComponent): ...@@ -507,6 +507,7 @@ class APIClient(ThreadedComponent):
total=self.settings.retry_attempts, total=self.settings.retry_attempts,
backoff_factor=self.settings.retry_delay_seconds, backoff_factor=self.settings.retry_delay_seconds,
status_forcelist=[429, 500, 502, 503, 504], status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=None, # Allow all methods (replaces deprecated method_whitelist)
) )
adapter = HTTPAdapter(max_retries=retry_strategy) adapter = HTTPAdapter(max_retries=retry_strategy)
......
...@@ -1944,8 +1944,7 @@ class QtVideoPlayer(QObject): ...@@ -1944,8 +1944,7 @@ class QtVideoPlayer(QObject):
if platform.system() != 'Linux': if platform.system() != 'Linux':
return return
logger.info("TEMPORARILY DISABLING all Linux environment variables to test video display") # Linux environment variables are properly configured for Qt xcb platform support
return # Skip all environment variable changes
try: try:
# TEMPORARILY DISABLED - ALL environment variables that might interfere with video # TEMPORARILY DISABLED - ALL environment variables that might interfere with video
......
"""
PyQt6 Multi-threaded Video Player with QWebEngineView Overlay System
"""
import sys
import time
import logging
import json
import os
from pathlib import Path
from typing import Optional, Dict, Any, List
# Suppress Chromium sandbox warnings when running as root - MUST be set before Qt imports
if os.geteuid() == 0: # Running as root
os.environ['QTWEBENGINE_DISABLE_SANDBOX'] = '1'
print("Qt WebEngine sandbox disabled for root user")
from PyQt6.QtWidgets import (
QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
QLabel, QPushButton, QSlider, QFrame, QStackedWidget
)
from PyQt6.QtCore import (
Qt, QTimer, QThread, pyqtSignal, QUrl, QRect, QPropertyAnimation,
QEasingCurve, QSequentialAnimationGroup, QObject, QMutex, QMutexLocker,
QThreadPool, QRunnable, pyqtSlot
)
from PyQt6.QtGui import (
QFont, QPainter, QPen, QBrush, QColor, QPixmap, QMovie,
QLinearGradient, QFontMetrics, QAction
)
from PyQt6.QtMultimedia import QMediaPlayer, QAudioOutput
from PyQt6.QtMultimediaWidgets import QVideoWidget
from PyQt6.QtWebEngineWidgets import QWebEngineView
from PyQt6.QtWebEngineCore import QWebEngineProfile
from PyQt6.QtWebChannel import QWebChannel
from ..core.thread_manager import ThreadedComponent
from ..core.message_bus import MessageBus, Message, MessageType, MessageBuilder
from ..config.settings import QtConfig
from .overlay_url_handler import OverlayUrlSchemeHandler
logger = logging.getLogger(__name__)
class OverlayWebChannel(QObject):
"""QObject for WebChannel communication with overlay HTML/JS"""
# Signals to send data to JavaScript
dataUpdated = pyqtSignal(dict)
positionChanged = pyqtSignal(float, float) # position, duration in seconds
videoInfoChanged = pyqtSignal(dict)
def __init__(self):
super().__init__()
self.mutex = QMutex()
self.overlay_data = {}
logger.info("OverlayWebChannel initialized")
@pyqtSlot(str)
def updateTitle(self, title: str):
"""Update main title from JavaScript"""
with QMutexLocker(self.mutex):
self.overlay_data['title'] = title
self.dataUpdated.emit({'title': title})
logger.debug(f"Title updated: {title}")
@pyqtSlot(str)
def updateSubtitle(self, subtitle: str):
"""Update subtitle from JavaScript"""
with QMutexLocker(self.mutex):
self.overlay_data['subtitle'] = subtitle
self.dataUpdated.emit({'subtitle': subtitle})
logger.debug(f"Subtitle updated: {subtitle}")
@pyqtSlot(bool)
def toggleStats(self, show: bool):
"""Toggle stats panel visibility"""
with QMutexLocker(self.mutex):
self.overlay_data['showStats'] = show
self.dataUpdated.emit({'showStats': show})
logger.debug(f"Stats panel toggled: {show}")
def send_data_update(self, data: Dict[str, Any]):
"""Send data update to JavaScript (thread-safe)"""
with QMutexLocker(self.mutex):
self.overlay_data.update(data)
self.dataUpdated.emit(data)
def send_position_update(self, position: float, duration: float):
"""Send playback position update to JavaScript (thread-safe)"""
self.positionChanged.emit(position, duration)
def send_video_info(self, info: Dict[str, Any]):
"""Send video information to JavaScript (thread-safe)"""
self.videoInfoChanged.emit(info)
class VideoProcessingWorker(QRunnable):
"""Background worker for video processing tasks"""
def __init__(self, task_type: str, data: Dict[str, Any], callback=None):
super().__init__()
self.task_type = task_type
self.data = data
self.callback = callback
self.setAutoDelete(True)
def run(self):
"""Execute the video processing task"""
try:
logger.debug(f"Processing video task: {self.task_type}")
if self.task_type == "metadata_extraction":
result = self._extract_metadata()
elif self.task_type == "thumbnail_generation":
result = self._generate_thumbnail()
elif self.task_type == "overlay_rendering":
result = self._render_overlay()
else:
result = {"error": f"Unknown task type: {self.task_type}"}
if self.callback:
self.callback(result)
except Exception as e:
logger.error(f"Video processing worker error: {e}")
if self.callback:
self.callback({"error": str(e)})
def _extract_metadata(self) -> Dict[str, Any]:
"""Extract video metadata"""
# Placeholder for metadata extraction
return {
"resolution": "1920x1080",
"bitrate": "5.2 Mbps",
"codec": "H.264",
"fps": "30.0"
}
def _generate_thumbnail(self) -> Dict[str, Any]:
"""Generate video thumbnail"""
# Placeholder for thumbnail generation
return {"thumbnail_path": "/tmp/thumbnail.jpg"}
def _render_overlay(self) -> Dict[str, Any]:
"""Render overlay elements"""
# Placeholder for overlay rendering
return {"rendered": True}
class OverlayWebView(QWebEngineView):
"""Custom QWebEngineView for video overlays with transparent background"""
def __init__(self, parent=None):
super().__init__(parent)
self.web_channel = None
self.overlay_channel = None
self.current_template = "default.html"
# Built-in templates directory (bundled with app)
self.builtin_templates_dir = Path(__file__).parent / "templates"
# Persistent uploaded templates directory (user data)
self.uploaded_templates_dir = self._get_persistent_templates_dir()
self.uploaded_templates_dir.mkdir(parents=True, exist_ok=True)
# Primary templates directory for backwards compatibility
self.templates_dir = self.builtin_templates_dir
self.setup_web_view()
self._setup_custom_scheme()
logger.info(f"OverlayWebView initialized - builtin: {self.builtin_templates_dir}, uploaded: {self.uploaded_templates_dir}")
def setup_web_view(self):
"""Setup web view for transparent overlays"""
# Enable transparency
self.setAttribute(Qt.WidgetAttribute.WA_TranslucentBackground, True)
self.setAttribute(Qt.WidgetAttribute.WA_TransparentForMouseEvents, True)
# Suppress Chromium sandbox warnings when running as root
import os
if os.geteuid() == 0: # Running as root
os.environ['QTWEBENGINE_DISABLE_SANDBOX'] = '1'
logger.info("Disabled Qt WebEngine sandbox for root user")
# Configure page settings
page = self.page()
page.setBackgroundColor(QColor(0, 0, 0, 0)) # Transparent background
# Setup WebChannel
self.web_channel = QWebChannel()
self.overlay_channel = OverlayWebChannel()
self.web_channel.registerObject("overlay", self.overlay_channel)
page.setWebChannel(self.web_channel)
# Load default template
self.load_template(self.current_template)
def _setup_custom_scheme(self):
"""Setup custom URL scheme handler for overlay resources"""
try:
# Get the page's profile
profile = self.page().profile()
# Create and install URL scheme handler
self.scheme_handler = OverlayUrlSchemeHandler(self)
profile.installUrlSchemeHandler(b"overlay", self.scheme_handler)
logger.info("Custom overlay:// URL scheme handler installed successfully")
except Exception as e:
logger.error(f"Failed to setup custom URL scheme: {e}")
def load_template(self, template_name: str):
"""Load a specific template file, prioritizing uploaded templates"""
try:
# If no template name provided, use default
if not template_name:
template_name = "default.html"
# Ensure .html extension
if not template_name.endswith('.html'):
template_name += '.html'
# First try uploaded templates directory (user uploads take priority)
template_path = self.uploaded_templates_dir / template_name
template_source = "uploaded"
# If not found in uploaded, try built-in templates
if not template_path.exists():
template_path = self.builtin_templates_dir / template_name
template_source = "builtin"
# If still not found, fallback to default.html in built-in templates
if not template_path.exists():
default_template_path = self.builtin_templates_dir / "default.html"
if default_template_path.exists():
template_path = default_template_path
template_name = "default.html"
template_source = "builtin"
logger.warning(f"Template {template_name} not found, using default.html")
else:
logger.error(f"No template found: {template_name}, and default.html missing")
return
if template_path.exists():
self.load(QUrl.fromLocalFile(str(template_path)))
self.current_template = template_name
logger.info(f"Loaded template: {template_path} (source: {template_source})")
else:
logger.error(f"No template found: {template_name}")
except Exception as e:
logger.error(f"Failed to load template {template_name}: {e}")
def reload_current_template(self):
"""Reload the current template"""
self.load_template(self.current_template)
def get_available_templates(self) -> List[str]:
"""Get list of available template files from both directories"""
try:
templates = set()
# Get built-in templates
if self.builtin_templates_dir.exists():
builtin_templates = [f.name for f in self.builtin_templates_dir.glob("*.html")]
templates.update(builtin_templates)
# Get uploaded templates (these override built-in ones with same name)
if self.uploaded_templates_dir.exists():
uploaded_templates = [f.name for f in self.uploaded_templates_dir.glob("*.html")]
templates.update(uploaded_templates)
template_list = sorted(list(templates))
if not template_list:
template_list = ["default.html"]
return template_list
except Exception as e:
logger.error(f"Failed to get available templates: {e}")
return ["default.html"]
def _get_persistent_templates_dir(self) -> Path:
"""Get persistent templates directory for user uploads"""
try:
import platform
import os
system = platform.system()
if system == "Windows":
# Use AppData/Roaming on Windows
app_data = os.getenv('APPDATA', os.path.expanduser('~'))
templates_dir = Path(app_data) / "MbetterClient" / "templates"
elif system == "Darwin": # macOS
# Use ~/Library/Application Support on macOS
templates_dir = Path.home() / "Library" / "Application Support" / "MbetterClient" / "templates"
else: # Linux and other Unix-like systems
# Use ~/.config on Linux
config_home = os.getenv('XDG_CONFIG_HOME', str(Path.home() / ".config"))
templates_dir = Path(config_home) / "MbetterClient" / "templates"
logger.debug(f"Persistent templates directory: {templates_dir}")
return templates_dir
except Exception as e:
logger.error(f"Failed to determine persistent templates directory: {e}")
# Fallback to local directory
return Path.cwd() / "user_templates"
def get_overlay_channel(self) -> OverlayWebChannel:
"""Get the overlay communication channel"""
return self.overlay_channel
def update_overlay_data(self, data: Dict[str, Any]):
"""Update overlay with new data"""
if self.overlay_channel:
self.overlay_channel.send_data_update(data)
def update_position(self, position: float, duration: float):
"""Update playback position"""
if self.overlay_channel:
self.overlay_channel.send_position_update(position, duration)
def update_video_info(self, info: Dict[str, Any]):
"""Update video information"""
if self.overlay_channel:
self.overlay_channel.send_video_info(info)
class VideoWidget(QWidget):
"""Composite video widget with QWebEngineView overlay"""
def __init__(self, parent=None):
super().__init__(parent)
self.setup_ui()
logger.info("VideoWidget initialized")
def setup_ui(self):
"""Setup video widget with overlay"""
self.setStyleSheet("background-color: black;")
# Create layout
layout = QVBoxLayout(self)
layout.setContentsMargins(0, 0, 0, 0)
layout.setSpacing(0)
# Create stacked widget for layering
self.stacked_widget = QStackedWidget()
# Create video widget for actual video playback
self.video_widget = QVideoWidget()
self.video_widget.setStyleSheet("background-color: black;")
self.stacked_widget.addWidget(self.video_widget)
# Create overlay web view
self.overlay_view = OverlayWebView()
self.stacked_widget.addWidget(self.overlay_view)
layout.addWidget(self.stacked_widget)
# Ensure overlay is on top by setting the current widget
self.stacked_widget.setCurrentWidget(self.overlay_view)
def get_video_widget(self) -> QVideoWidget:
"""Get the video widget for media player"""
return self.video_widget
def get_overlay_view(self) -> OverlayWebView:
"""Get the overlay web view"""
return self.overlay_view
def resizeEvent(self, event):
"""Handle resize events"""
super().resizeEvent(event)
# Both widgets should automatically resize with the stacked widget
class PlayerControlsWidget(QWidget):
"""Enhanced video player controls with thread-safe operations"""
play_pause_clicked = pyqtSignal()
stop_clicked = pyqtSignal()
seek_requested = pyqtSignal(int)
volume_changed = pyqtSignal(int)
def __init__(self, parent=None):
super().__init__(parent)
self.mutex = QMutex()
self.setup_ui()
def setup_ui(self):
"""Setup enhanced control UI"""
layout = QHBoxLayout(self)
layout.setContentsMargins(10, 5, 10, 5)
# Play/Pause button
self.play_pause_btn = QPushButton("⏸️")
self.play_pause_btn.setFixedSize(40, 30)
self.play_pause_btn.clicked.connect(self.play_pause_clicked.emit)
# Stop button
self.stop_btn = QPushButton("⏹️")
self.stop_btn.setFixedSize(40, 30)
self.stop_btn.clicked.connect(self.stop_clicked.emit)
# Position slider
self.position_slider = QSlider(Qt.Orientation.Horizontal)
self.position_slider.setMinimum(0)
self.position_slider.setMaximum(100)
self.position_slider.sliderPressed.connect(self._on_slider_pressed)
self.position_slider.sliderReleased.connect(self._on_slider_released)
# Volume slider
self.volume_slider = QSlider(Qt.Orientation.Horizontal)
self.volume_slider.setMinimum(0)
self.volume_slider.setMaximum(100)
self.volume_slider.setValue(100)
self.volume_slider.setMaximumWidth(100)
self.volume_slider.valueChanged.connect(self.volume_changed.emit)
# Time labels
self.current_time_label = QLabel("00:00")
self.duration_label = QLabel("00:00")
# Volume label
volume_label = QLabel("🔊")
# Layout
layout.addWidget(self.play_pause_btn)
layout.addWidget(self.stop_btn)
layout.addWidget(self.current_time_label)
layout.addWidget(self.position_slider, 1) # Stretch
layout.addWidget(self.duration_label)
layout.addWidget(volume_label)
layout.addWidget(self.volume_slider)
# Enhanced styling
self.setStyleSheet("""
QWidget {
background-color: rgba(0, 0, 0, 200);
color: white;
}
QPushButton {
border: 1px solid #555;
border-radius: 5px;
background-color: #333;
font-weight: bold;
}
QPushButton:hover {
background-color: #555;
}
QPushButton:pressed {
background-color: #777;
}
QSlider::groove:horizontal {
border: 1px solid #555;
height: 6px;
background: #222;
border-radius: 3px;
}
QSlider::handle:horizontal {
background: #fff;
border: 2px solid #555;
width: 14px;
margin: -5px 0;
border-radius: 7px;
}
QSlider::handle:horizontal:hover {
background: #ddd;
}
""")
self.slider_pressed = False
def _on_slider_pressed(self):
with QMutexLocker(self.mutex):
self.slider_pressed = True
def _on_slider_released(self):
with QMutexLocker(self.mutex):
self.slider_pressed = False
self.seek_requested.emit(self.position_slider.value())
def update_position(self, position: int, duration: int):
"""Update position display (thread-safe)"""
with QMutexLocker(self.mutex):
if not self.slider_pressed and duration > 0:
self.position_slider.setValue(int(position * 100 / duration))
self.current_time_label.setText(self._format_time(position))
self.duration_label.setText(self._format_time(duration))
def update_play_pause_button(self, is_playing: bool):
"""Update play/pause button state (thread-safe)"""
with QMutexLocker(self.mutex):
self.play_pause_btn.setText("⏸️" if is_playing else "▶️")
def _format_time(self, ms: int) -> str:
"""Format time in milliseconds to MM:SS"""
seconds = ms // 1000
minutes = seconds // 60
seconds = seconds % 60
return f"{minutes:02d}:{seconds:02d}"
class PlayerWindow(QMainWindow):
"""Enhanced main player window with thread-safe operations"""
# Signals for thread communication
position_changed = pyqtSignal(int, int)
video_loaded = pyqtSignal(str)
def __init__(self, settings: QtConfig):
super().__init__()
self.settings = settings
self.mutex = QMutex()
self.thread_pool = QThreadPool()
self.thread_pool.setMaxThreadCount(4)
self.setup_ui()
self.setup_media_player()
self.setup_timers()
logger.info("PlayerWindow initialized")
def setup_ui(self):
"""Setup enhanced window UI"""
self.setWindowTitle("MbetterClient - PyQt6 Video Player")
self.setStyleSheet("background-color: black;")
# Central widget
central_widget = QWidget()
self.setCentralWidget(central_widget)
# Layout
layout = QVBoxLayout(central_widget)
layout.setContentsMargins(0, 0, 0, 0)
layout.setSpacing(0)
# Video widget with overlay
self.video_widget = VideoWidget()
layout.addWidget(self.video_widget, 1) # Stretch
# Controls
self.controls = PlayerControlsWidget()
self.controls.play_pause_clicked.connect(self.toggle_play_pause)
self.controls.stop_clicked.connect(self.stop_playback)
self.controls.seek_requested.connect(self.seek_to_position)
self.controls.volume_changed.connect(self.set_volume)
layout.addWidget(self.controls)
# Window settings
if self.settings.fullscreen:
self.showFullScreen()
else:
self.resize(self.settings.window_width, self.settings.window_height)
self.show()
if self.settings.always_on_top:
self.setWindowFlags(self.windowFlags() | Qt.WindowType.WindowStaysOnTopHint)
# Setup menu
self.setup_menu()
def setup_menu(self):
"""Setup application menu"""
menubar = self.menuBar()
# File menu
file_menu = menubar.addMenu('File')
open_action = QAction('Open Video', self)
open_action.triggered.connect(self.open_file_dialog)
file_menu.addAction(open_action)
# View menu
view_menu = menubar.addMenu('View')
fullscreen_action = QAction('Toggle Fullscreen', self)
fullscreen_action.setShortcut('F11')
fullscreen_action.triggered.connect(self.toggle_fullscreen)
view_menu.addAction(fullscreen_action)
stats_action = QAction('Toggle Stats', self)
stats_action.setShortcut('S')
stats_action.triggered.connect(self.toggle_stats)
view_menu.addAction(stats_action)
def setup_media_player(self):
"""Setup PyQt6 media player with audio output"""
self.media_player = QMediaPlayer()
self.audio_output = QAudioOutput()
self.media_player.setAudioOutput(self.audio_output)
self.media_player.setVideoOutput(self.video_widget.get_video_widget())
# Connect signals
self.media_player.playbackStateChanged.connect(self.on_state_changed)
self.media_player.positionChanged.connect(self.on_position_changed)
self.media_player.durationChanged.connect(self.on_duration_changed)
self.media_player.errorOccurred.connect(self.on_media_error)
self.media_player.mediaStatusChanged.connect(self.on_media_status_changed)
# Set volume
self.audio_output.setVolume(self.settings.volume)
self.audio_output.setMuted(self.settings.mute)
def setup_timers(self):
"""Setup various timers"""
# Auto-hide controls timer
self.controls_timer = QTimer()
self.controls_timer.timeout.connect(self.hide_controls)
self.controls_timer.setSingleShot(True)
# Overlay update timer
self.overlay_timer = QTimer()
self.overlay_timer.timeout.connect(self.update_overlay_periodically)
self.overlay_timer.start(1000) # Update every second
# Mouse tracking for showing controls
self.setMouseTracking(True)
self.controls_visible = True
def open_file_dialog(self):
"""Open file dialog to select video"""
from PyQt6.QtWidgets import QFileDialog
file_path, _ = QFileDialog.getOpenFileName(
self,
"Open Video File",
"",
"Video Files (*.mp4 *.avi *.mkv *.mov *.wmv *.flv *.webm *.m4v);;All Files (*)"
)
if file_path:
self.play_video(file_path)
def play_video(self, file_path: str, template_data: Dict[str, Any] = None, template_name: str = None):
"""Play video file with optional overlay data and template"""
try:
with QMutexLocker(self.mutex):
url = QUrl.fromLocalFile(str(Path(file_path).absolute()))
self.media_player.setSource(url)
# Load specified template or reload current template when playing a video
overlay_view = self.video_widget.get_overlay_view()
if template_name:
overlay_view.load_template(template_name)
logger.info(f"Loaded template '{template_name}' for video playback")
else:
overlay_view.reload_current_template()
logger.info("Reloaded current overlay template for video playback")
# Update overlay with video info
overlay_data = template_data or {}
overlay_data.update({
'title': f'Playing: {Path(file_path).name}',
'subtitle': 'MbetterClient PyQt6 Player'
})
overlay_view.update_overlay_data(overlay_data)
if self.settings.auto_play:
self.media_player.play()
# Start background metadata extraction
worker = VideoProcessingWorker(
"metadata_extraction",
{"file_path": file_path},
self.on_metadata_extracted
)
self.thread_pool.start(worker)
logger.info(f"Playing video: {file_path}")
self.video_loaded.emit(file_path)
except Exception as e:
logger.error(f"Failed to play video: {e}")
def on_metadata_extracted(self, metadata: Dict[str, Any]):
"""Handle extracted metadata"""
if 'error' not in metadata:
self.video_widget.get_overlay_view().update_video_info(metadata)
logger.debug(f"Video metadata: {metadata}")
def toggle_play_pause(self):
"""Toggle play/pause (thread-safe)"""
with QMutexLocker(self.mutex):
if self.media_player.playbackState() == QMediaPlayer.PlaybackState.PlayingState:
self.media_player.pause()
else:
self.media_player.play()
def stop_playback(self):
"""Stop playback (thread-safe)"""
with QMutexLocker(self.mutex):
self.media_player.stop()
def seek_to_position(self, percentage: int):
"""Seek to position (percentage) (thread-safe)"""
with QMutexLocker(self.mutex):
duration = self.media_player.duration()
if duration > 0:
position = int(duration * percentage / 100)
self.media_player.setPosition(position)
def set_volume(self, volume: int):
"""Set volume (0-100) (thread-safe)"""
with QMutexLocker(self.mutex):
self.audio_output.setVolume(volume / 100.0)
def toggle_fullscreen(self):
"""Toggle fullscreen mode"""
if self.isFullScreen():
self.showNormal()
else:
self.showFullScreen()
def toggle_stats(self):
"""Toggle stats panel"""
overlay_view = self.video_widget.get_overlay_view()
# Toggle stats via WebChannel
if overlay_view.overlay_channel:
current_stats = getattr(self, '_stats_visible', False)
overlay_view.overlay_channel.toggleStats(not current_stats)
self._stats_visible = not current_stats
def on_state_changed(self, state):
"""Handle playback state changes (thread-safe)"""
is_playing = state == QMediaPlayer.PlaybackState.PlayingState
self.controls.update_play_pause_button(is_playing)
# Auto-hide controls when playing in fullscreen
if is_playing and self.isFullScreen():
self.start_controls_timer()
def on_position_changed(self, position):
"""Handle position changes (thread-safe)"""
duration = self.media_player.duration()
self.controls.update_position(position, duration)
# Update overlay with position info
if duration > 0:
self.video_widget.get_overlay_view().update_position(
position / 1000.0, # Convert to seconds
duration / 1000.0 # Convert to seconds
)
# Emit signal for other components
self.position_changed.emit(position, duration)
def on_duration_changed(self, duration):
"""Handle duration changes"""
self.controls.update_position(self.media_player.position(), duration)
def on_media_error(self, error):
"""Handle media errors"""
logger.error(f"Media player error: {error}")
# Show error in overlay
self.video_widget.get_overlay_view().update_overlay_data({
'title': 'Playback Error',
'subtitle': f'Error: {error.name if hasattr(error, "name") else str(error)}',
'ticker': 'Please check the video file and try again.'
})
def on_media_status_changed(self, status):
"""Handle media status changes"""
logger.debug(f"Media status changed: {status}")
if status == QMediaPlayer.MediaStatus.LoadedMedia:
# Media loaded successfully
self.video_widget.get_overlay_view().update_overlay_data({
'subtitle': 'Media loaded successfully'
})
def update_overlay_periodically(self):
"""Periodic overlay updates"""
# Update current time in overlay
current_time = time.strftime("%H:%M:%S")
self.video_widget.get_overlay_view().update_overlay_data({
'currentTime': current_time
})
def mouseMoveEvent(self, event):
"""Show controls on mouse movement"""
super().mouseMoveEvent(event)
self.show_controls()
if self.isFullScreen() and self.media_player.playbackState() == QMediaPlayer.PlaybackState.PlayingState:
self.start_controls_timer()
def keyPressEvent(self, event):
"""Handle key presses"""
if event.key() == Qt.Key.Key_Space:
self.toggle_play_pause()
elif event.key() == Qt.Key.Key_Escape:
if self.isFullScreen():
self.showNormal()
else:
self.close()
elif event.key() == Qt.Key.Key_F11:
self.toggle_fullscreen()
elif event.key() == Qt.Key.Key_M:
current_muted = self.audio_output.isMuted()
self.audio_output.setMuted(not current_muted)
elif event.key() == Qt.Key.Key_S:
self.toggle_stats()
super().keyPressEvent(event)
def show_controls(self):
"""Show controls"""
if not self.controls_visible:
self.controls.show()
self.controls_visible = True
self.setCursor(Qt.CursorShape.ArrowCursor)
def hide_controls(self):
"""Hide controls"""
if self.isFullScreen() and self.controls_visible:
self.controls.hide()
self.controls_visible = False
self.setCursor(Qt.CursorShape.BlankCursor)
def start_controls_timer(self):
"""Start timer to hide controls"""
self.controls_timer.stop()
self.controls_timer.start(3000) # Hide after 3 seconds
def closeEvent(self, event):
"""Handle window close (thread-safe)"""
with QMutexLocker(self.mutex):
self.media_player.stop()
self.thread_pool.waitForDone(3000) # Wait up to 3 seconds for threads
event.accept()
class Qt6VideoPlayer(ThreadedComponent):
"""PyQt6 video player component with advanced features"""
def __init__(self, message_bus: MessageBus, settings: QtConfig):
super().__init__("qt6_player", message_bus)
self.settings = settings
self.app: Optional[QApplication] = None
self.window: Optional[PlayerWindow] = None
self.mutex = QMutex()
# Register message queue
self.message_queue = self.message_bus.register_component(self.name)
logger.info("Qt6VideoPlayer initialized")
def initialize(self) -> bool:
"""Initialize PyQt6 application and components"""
try:
with QMutexLocker(self.mutex):
# Create QApplication if it doesn't exist
if not QApplication.instance():
self.app = QApplication(sys.argv)
self.app.setApplicationName("MbetterClient PyQt6")
self.app.setApplicationVersion("2.0.0")
self.app.setQuitOnLastWindowClosed(True)
else:
self.app = QApplication.instance()
# Create player window
self.window = PlayerWindow(self.settings)
# Connect window signals
self.window.position_changed.connect(self._on_position_changed)
self.window.video_loaded.connect(self._on_video_loaded)
# Subscribe to messages
self.message_bus.subscribe(self.name, MessageType.VIDEO_PLAY, self._handle_video_play)
self.message_bus.subscribe(self.name, MessageType.VIDEO_PAUSE, self._handle_video_pause)
self.message_bus.subscribe(self.name, MessageType.VIDEO_STOP, self._handle_video_stop)
self.message_bus.subscribe(self.name, MessageType.VIDEO_SEEK, self._handle_video_seek)
self.message_bus.subscribe(self.name, MessageType.TEMPLATE_CHANGE, self._handle_template_change)
self.message_bus.subscribe(self.name, MessageType.OVERLAY_UPDATE, self._handle_overlay_update)
logger.info("Qt6VideoPlayer initialized successfully")
return True
except Exception as e:
logger.error(f"Qt6VideoPlayer initialization failed: {e}")
return False
def run(self):
"""Main run loop"""
try:
logger.info("Qt6VideoPlayer thread started")
# Send ready status
ready_message = MessageBuilder.system_status(
sender=self.name,
status="ready",
details={"fullscreen": self.settings.fullscreen, "version": "PyQt6-2.0.0"}
)
self.message_bus.publish(ready_message)
# Message processing loop
while self.running:
try:
# Process Qt events
if self.app:
self.app.processEvents()
# Process messages
message = self.message_bus.get_message(self.name, timeout=0.1)
if message:
self._process_message(message)
# Update heartbeat
self.heartbeat()
time.sleep(0.016) # ~60 FPS
except Exception as e:
logger.error(f"Qt6VideoPlayer run loop error: {e}")
time.sleep(0.1)
except Exception as e:
logger.error(f"Qt6VideoPlayer run failed: {e}")
finally:
logger.info("Qt6VideoPlayer thread ended")
def shutdown(self):
"""Shutdown video player"""
try:
logger.info("Shutting down Qt6VideoPlayer...")
with QMutexLocker(self.mutex):
if self.window:
self.window.close()
self.window = None
if self.app:
self.app.quit()
except Exception as e:
logger.error(f"Qt6VideoPlayer shutdown error: {e}")
def _process_message(self, message: Message):
"""Process received message"""
try:
# Messages are handled by subscribed handlers
pass
except Exception as e:
logger.error(f"Failed to process message: {e}")
def _on_position_changed(self, position: int, duration: int):
"""Handle position changes from player window"""
try:
# Send progress update
if duration > 0:
percentage = (position / duration) * 100
progress_message = MessageBuilder.video_progress(
sender=self.name,
position=position / 1000.0, # Convert to seconds
duration=duration / 1000.0, # Convert to seconds
percentage=percentage
)
self.message_bus.publish(progress_message, broadcast=True)
except Exception as e:
logger.error(f"Failed to send progress update: {e}")
def _on_video_loaded(self, file_path: str):
"""Handle video loaded event"""
try:
logger.info(f"Video loaded: {file_path}")
# Could send status update here
except Exception as e:
logger.error(f"Failed to handle video loaded event: {e}")
def _handle_video_play(self, message: Message):
"""Handle video play message"""
try:
file_path = message.data.get("file_path")
template_data = message.data.get("overlay_data", {})
template_name = message.data.get("template") # Extract template name from message
if not file_path:
logger.error("No file path provided for video play")
return
logger.info(f"Playing video: {file_path}")
logger.info(f"Template name: {template_name}")
if self.window:
self.window.play_video(file_path, template_data, template_name)
except Exception as e:
logger.error(f"Failed to handle video play: {e}")
def _handle_video_pause(self, message: Message):
"""Handle video pause message"""
try:
if self.window:
self.window.media_player.pause()
except Exception as e:
logger.error(f"Failed to handle video pause: {e}")
def _handle_video_stop(self, message: Message):
"""Handle video stop message"""
try:
if self.window:
self.window.stop_playback()
except Exception as e:
logger.error(f"Failed to handle video stop: {e}")
def _handle_video_seek(self, message: Message):
"""Handle video seek message"""
try:
position = message.data.get("position", 0)
if self.window:
duration = self.window.media_player.duration()
if duration > 0:
percentage = int(position * 100 / duration)
self.window.seek_to_position(percentage)
except Exception as e:
logger.error(f"Failed to handle video seek: {e}")
def _handle_template_change(self, message: Message):
"""Handle template change message"""
try:
template_data = message.data.get("template_data", {})
template_name = template_data.get("template_name", "")
change_type = template_data.get("change_type", "")
if self.window:
overlay_view = self.window.video_widget.get_overlay_view()
# Reload template if it's the current one being used
if template_name == overlay_view.current_template:
logger.info(f"Reloading template {template_name} due to {change_type}")
overlay_view.reload_current_template()
# Update overlay data if provided
if template_data:
overlay_view.update_overlay_data(template_data)
except Exception as e:
logger.error(f"Failed to handle template change: {e}")
def _handle_overlay_update(self, message: Message):
"""Handle overlay update message"""
try:
overlay_data = message.data.get("overlay_data", {})
if self.window and overlay_data:
overlay_view = self.window.video_widget.get_overlay_view()
overlay_view.update_overlay_data(overlay_data)
except Exception as e:
logger.error(f"Failed to handle overlay update: {e}")
\ No newline at end of file
let allFixtures = [];
// Load fixtures on page load
document.addEventListener('DOMContentLoaded', function() {
loadFixtures();
// Event listeners
document.getElementById('refresh-btn').addEventListener('click', loadFixtures);
document.getElementById('status-filter').addEventListener('change', filterFixtures);
document.getElementById('upload-filter').addEventListener('change', filterFixtures);
document.getElementById('search-input').addEventListener('input', filterFixtures);
// Reset fixtures button (admin only)
const resetBtn = document.getElementById('reset-fixtures-btn');
if (resetBtn) {
resetBtn.addEventListener('click', resetFixtures);
}
});
function loadFixtures() {
const loading = document.getElementById('loading');
const refreshBtn = document.getElementById('refresh-btn');
loading.style.display = 'block';
refreshBtn.disabled = true;
fetch('/api/fixtures')
.then(response => response.json())
.then(data => {
if (data.success) {
allFixtures = data.fixtures;
updateSummaryCards();
filterFixtures(); // This will also render the table
} else {
alert('Error loading fixtures: ' + (data.error || 'Unknown error'));
}
})
.catch(error => {
console.error('Error:', error);
alert('Failed to load fixtures: ' + error.message);
})
.finally(() => {
loading.style.display = 'none';
refreshBtn.disabled = false;
});
}
function updateSummaryCards() {
const totalCount = allFixtures.length;
const pendingCount = allFixtures.filter(f => f.fixture_status === 'pending').length;
const runningCount = allFixtures.filter(f => f.fixture_status === 'running').length;
const scheduledCount = allFixtures.filter(f => f.fixture_status === 'scheduled').length;
const betCount = allFixtures.filter(f => f.fixture_status === 'bet').length;
const ingameCount = allFixtures.filter(f => f.fixture_status === 'ingame').length;
const endCount = allFixtures.filter(f => f.fixture_status === 'end').length;
document.getElementById('total-count').textContent = totalCount;
document.getElementById('pending-count').textContent = pendingCount;
document.getElementById('running-count').textContent = runningCount;
// Update the summary cards to show more status types
const summaryCards = document.getElementById('summary-cards');
const betInGameCount = betCount + ingameCount;
summaryCards.innerHTML = `
<div class="col-md-2">
<div class="card text-center">
<div class="card-body">
<h5 class="card-title text-primary">
<i class="fas fa-list me-2"></i>Total Fixtures
</h5>
<h3 id="total-count" class="text-primary">` + totalCount + `</h3>
</div>
</div>
</div>
<div class="col-md-2">
<div class="card text-center">
<div class="card-body">
<h5 class="card-title text-warning">
<i class="fas fa-clock me-2"></i>Pending
</h5>
<h3 id="pending-count" class="text-warning">` + pendingCount + `</h3>
</div>
</div>
</div>
<div class="col-md-2">
<div class="card text-center">
<div class="card-body">
<h5 class="card-title text-info">
<i class="fas fa-play me-2"></i>Running
</h5>
<h3 id="running-count" class="text-info">` + runningCount + `</h3>
</div>
</div>
</div>
<div class="col-md-2">
<div class="card text-center">
<div class="card-body">
<h5 class="card-title text-secondary">
<i class="fas fa-calendar me-2"></i>Scheduled
</h5>
<h3 class="text-secondary">` + scheduledCount + `</h3>
</div>
</div>
</div>
<div class="col-md-2">
<div class="card text-center">
<div class="card-body">
<h5 class="card-title text-success">
<i class="fas fa-gamepad me-2"></i>Bet/In Game
</h5>
<h3 class="text-success">` + betInGameCount + `</h3>
</div>
</div>
</div>
<div class="col-md-2">
<div class="card text-center">
<div class="card-body">
<h5 class="card-title text-dark">
<i class="fas fa-stop me-2"></i>End
</h5>
<h3 class="text-dark">` + endCount + `</h3>
</div>
</div>
</div>
`;
}
function filterFixtures() {
const statusFilter = document.getElementById('status-filter').value;
const uploadFilter = document.getElementById('upload-filter').value;
const searchTerm = document.getElementById('search-input').value.toLowerCase();
let filteredFixtures = allFixtures.filter(fixture => {
// Status filter
if (statusFilter && fixture.fixture_status !== statusFilter) {
return false;
}
// Upload filter - check the first match's upload status
if (uploadFilter && fixture.matches && fixture.matches.length > 0) {
if (fixture.matches[0].zip_upload_status !== uploadFilter) {
return false;
}
}
// Search filter
if (searchTerm) {
const searchText = (fixture.fighter1_township + ' ' + fixture.fighter2_township + ' ' + fixture.venue_kampala_township).toLowerCase();
if (!searchText.includes(searchTerm)) return false;
}
return true;
});
renderFixturesTable(filteredFixtures);
document.getElementById('filtered-count').textContent = filteredFixtures.length + ' fixtures';
// Show/hide empty state
const emptyState = document.getElementById('empty-state');
const fixturesTable = document.querySelector('.card .table-responsive').parentElement;
if (filteredFixtures.length === 0 && allFixtures.length === 0) {
emptyState.style.display = 'block';
fixturesTable.style.display = 'none';
} else {
emptyState.style.display = 'none';
fixturesTable.style.display = 'block';
}
}
function renderFixturesTable(fixtures) {
const tbody = document.getElementById('fixtures-tbody');
tbody.innerHTML = '';
fixtures.forEach(fixture => {
const row = document.createElement('tr');
const startTimeInfo = fixture.start_time ? '<br><small class="text-info">Start: ' + new Date(fixture.start_time).toLocaleString() + '</small>' : '';
row.innerHTML = `
<td>
<strong>#` + fixture.match_number + `</strong>
<br>
<small class="text-muted">` + fixture.match_count + ` matches</small>
</td>
<td>
<div class="fw-bold">` + fixture.fighter1_township + `</div>
<small class="text-muted">vs</small>
<div class="fw-bold">` + fixture.fighter2_township + `</div>
</td>
<td>` + fixture.venue_kampala_township + `</td>
<td>` + getFixtureStatusBadge(fixture) + `</td>
<td>` + getUploadStatusBadge(fixture) + `</td>
<td>
<small class="text-muted">
` + new Date(fixture.created_at).toLocaleString() + `
</small>
` + startTimeInfo + `
</td>
<td>
<a href="/fixtures/` + fixture.id + `" class="btn btn-sm btn-outline-primary">
<i class="fas fa-eye me-1"></i>Details
</a>
</td>
`;
tbody.appendChild(row);
});
}
function getFixtureStatusBadge(fixture) {
const status = fixture.fixture_status;
switch (status) {
case 'pending':
return '<span class="badge bg-warning"><i class="fas fa-clock me-1"></i>Pending</span>';
case 'running':
return '<span class="badge bg-info"><i class="fas fa-play me-1"></i>Running</span>';
case 'scheduled':
return '<span class="badge bg-secondary"><i class="fas fa-calendar me-1"></i>Scheduled</span>';
case 'bet':
return '<span class="badge bg-primary"><i class="fas fa-money-bill me-1"></i>Bet</span>';
case 'ingame':
return '<span class="badge bg-success"><i class="fas fa-gamepad me-1"></i>In Game</span>';
case 'end':
return '<span class="badge bg-dark"><i class="fas fa-stop me-1"></i>End</span>';
default:
return '<span class="badge bg-secondary"><i class="fas fa-question me-1"></i>Unknown</span>';
}
}
function getUploadStatusBadge(fixture) {
// Get upload status from the first match in the fixture
if (!fixture.matches || fixture.matches.length === 0) {
return '<span class="badge bg-secondary"><i class="fas fa-clock me-1"></i>Pending</span>';
}
const firstMatch = fixture.matches[0];
const status = firstMatch.zip_upload_status || 'pending';
const progress = firstMatch.zip_upload_progress || 0;
switch (status) {
case 'completed':
return '<span class="badge bg-success"><i class="fas fa-check me-1"></i>Completed</span>';
case 'uploading':
return `<span class="badge bg-info"><i class="fas fa-spinner fa-spin me-1"></i>Uploading (${progress.toFixed(1)}%)</span>`;
case 'failed':
return '<span class="badge bg-danger"><i class="fas fa-times me-1"></i>Failed</span>';
default:
return '<span class="badge bg-secondary"><i class="fas fa-clock me-1"></i>Pending</span>';
}
}
function resetFixtures() {
const confirmMessage = 'WARNING: This will permanently delete ALL fixture data including:\n\n' +
'• All synchronized matches and outcomes\n' +
'• All downloaded ZIP files\n' +
'• This action cannot be undone!\n\n' +
'Are you sure you want to reset all fixtures data?';
if (!confirm(confirmMessage)) {
return;
}
const resetBtn = document.getElementById('reset-fixtures-btn');
const originalText = resetBtn.innerHTML;
resetBtn.disabled = true;
resetBtn.innerHTML = '<i class="fas fa-spinner fa-spin me-1"></i>Resetting...';
fetch('/api/fixtures/reset', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
}
})
.then(response => response.json())
.then(data => {
if (data.success) {
alert(`Fixtures reset successfully!\n\nRemoved:\n• ${data.removed.matches} matches\n• ${data.removed.outcomes} outcomes\n• ${data.removed.zip_files} ZIP files`);
// Reload fixtures to show empty state
loadFixtures();
} else {
alert('Error resetting fixtures: ' + (data.error || 'Unknown error'));
}
})
.catch(error => {
console.error('Error:', error);
alert('Failed to reset fixtures: ' + error.message);
})
.finally(() => {
resetBtn.disabled = false;
resetBtn.innerHTML = originalText;
});
}
#!/usr/bin/env python3
"""
Test script for the Qt6 discovery application
Tests core functionality without requiring a display
"""
import sys
import json
import socket
import threading
import time
from unittest.mock import Mock, patch
def test_imports():
"""Test that all required modules can be imported"""
print("Testing imports...")
try:
import json
print("✓ json module")
import socket
print("✓ socket module")
import threading
print("✓ threading module")
import webbrowser
print("✓ webbrowser module")
from PyQt6.QtWidgets import QApplication
print("✓ PyQt6.QtWidgets")
from PyQt6.QtCore import QTimer, pyqtSignal, QObject, Qt, QThread
print("✓ PyQt6.QtCore")
print("✓ All imports successful")
return True
except ImportError as e:
print(f"✗ Import failed: {e}")
return False
def test_udp_worker_class():
"""Test UDPDiscoveryWorker class without GUI"""
print("\nTesting UDPDiscoveryWorker class...")
try:
# Import the worker class definition
import importlib.util
spec = importlib.util.spec_from_file_location("mbetter_discovery", "mbetter_discovery.py")
discovery_module = importlib.util.module_from_spec(spec)
# Mock PyQt6 to avoid GUI requirements
with patch.dict('sys.modules', {
'PyQt6.QtWidgets': Mock(),
'PyQt6.QtCore': Mock(QObject=object, pyqtSignal=Mock(), QThread=Mock()),
'PyQt6.QtGui': Mock()
}):
spec.loader.exec_module(discovery_module)
# Test worker class can be instantiated
worker = discovery_module.UDPDiscoveryWorker(45123)
print("✓ UDPDiscoveryWorker instantiated")
# Test basic properties
assert worker.listen_port == 45123
assert worker.running == False
print("✓ Worker properties correct")
return True
except Exception as e:
print(f"✗ UDPDiscoveryWorker test failed: {e}")
return False
def test_message_processing():
"""Test message processing logic"""
print("\nTesting message processing...")
try:
# Test valid MBetterClient message
valid_message = {
"service": "MBetterClient",
"host": "192.168.1.100",
"port": 5001,
"ssl": False,
"url": "http://192.168.1.100:5001",
"timestamp": time.time()
}
# Test JSON encoding/decoding (simulates UDP message processing)
json_data = json.dumps(valid_message)
decoded_message = json.loads(json_data)
# Validate message structure
if (isinstance(decoded_message, dict) and
decoded_message.get('service') == 'MBetterClient' and
'url' in decoded_message):
print("✓ Valid message structure detected")
else:
print("✗ Message validation failed")
return False
# Test invalid message
invalid_message = {"service": "OtherService", "url": "http://example.com"}
if not (invalid_message.get('service') == 'MBetterClient'):
print("✓ Invalid message correctly rejected")
return True
except Exception as e:
print(f"✗ Message processing test failed: {e}")
return False
def test_discovery_integration():
"""Test discovery integration with UDP broadcast"""
print("\nTesting discovery integration...")
def mock_listener():
"""Mock UDP listener that receives broadcasts"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('', 45124)) # Use different port to avoid conflicts
sock.settimeout(3.0)
print("Mock listener started on port 45124")
data, addr = sock.recvfrom(1024)
message = json.loads(data.decode('utf-8'))
if message.get('service') == 'MBetterClient':
print(f"✓ Received test broadcast: {message['url']}")
sock.close()
return True
else:
print("✗ Invalid broadcast received")
sock.close()
return False
except socket.timeout:
print("⚠ No broadcast received (timeout)")
sock.close()
return False
except Exception as e:
print(f"✗ Mock listener error: {e}")
return False
def send_test_broadcast():
"""Send test broadcast to mock listener"""
time.sleep(0.5) # Wait for listener to start
try:
test_message = {
"service": "MBetterClient",
"host": "127.0.0.1",
"port": 5001,
"ssl": False,
"url": "http://127.0.0.1:5001",
"timestamp": time.time()
}
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
data = json.dumps(test_message).encode('utf-8')
sock.sendto(data, ('127.0.0.1', 45124))
sock.close()
print("✓ Test broadcast sent")
except Exception as e:
print(f"✗ Failed to send test broadcast: {e}")
# Start mock listener in thread
listener_thread = threading.Thread(target=mock_listener)
listener_thread.daemon = True
listener_thread.start()
# Send test broadcast
sender_thread = threading.Thread(target=send_test_broadcast)
sender_thread.daemon = True
sender_thread.start()
# Wait for completion
listener_thread.join(timeout=4)
sender_thread.join(timeout=1)
if listener_thread.is_alive():
print("⚠ Integration test timed out")
return False
print("✓ Integration test completed")
return True
def main():
"""Main test function"""
print("=" * 60)
print("Qt6 Discovery Application Test")
print("=" * 60)
tests = [
("Module Imports", test_imports),
("UDPDiscoveryWorker", test_udp_worker_class),
("Message Processing", test_message_processing),
("Discovery Integration", test_discovery_integration),
]
results = []
for test_name, test_func in tests:
print(f"\n--- {test_name} Test ---")
try:
result = test_func()
results.append((test_name, result))
except Exception as e:
print(f"✗ {test_name} test crashed: {e}")
results.append((test_name, False))
# Summary
print("\n" + "=" * 60)
print("Test Results Summary")
print("=" * 60)
passed = 0
total = len(results)
for test_name, result in results:
status = "✓ PASS" if result else "✗ FAIL"
print(f"{test_name:<20} {status}")
if result:
passed += 1
print(f"\nOverall: {passed}/{total} tests passed")
if passed == total:
print("🎉 All tests passed! Discovery application is ready for use.")
return True
else:
print("⚠ Some tests failed. Check the output above for details.")
return False
if __name__ == "__main__":
try:
success = main()
sys.exit(0 if success else 1)
except KeyboardInterrupt:
print("\n\nTest interrupted by user")
sys.exit(1)
\ No newline at end of file
#!/usr/bin/env python3
"""
Test script for UDP broadcast functionality
"""
import socket
import json
import time
import threading
import sys
def test_udp_broadcast():
"""Test UDP broadcast functionality"""
print("Testing UDP broadcast functionality...")
# Test server info
server_info = {
"service": "MBetterClient",
"host": "127.0.0.1",
"port": 5001,
"ssl": False,
"url": "http://127.0.0.1:5001",
"timestamp": time.time()
}
try:
# Create UDP socket for broadcasting
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# Broadcast data
broadcast_data = json.dumps(server_info).encode('utf-8')
print(f"Broadcasting: {server_info}")
# Send to localhost broadcast and general broadcast
addresses = [
('127.255.255.255', 45123), # Local broadcast
('255.255.255.255', 45123), # Global broadcast
]
for addr in addresses:
try:
sock.sendto(broadcast_data, addr)
print(f"✓ Sent to {addr}")
except Exception as e:
print(f"✗ Failed to send to {addr}: {e}")
sock.close()
print("✓ Broadcast test completed successfully")
return True
except Exception as e:
print(f"✗ Broadcast test failed: {e}")
return False
def test_udp_listener():
"""Test UDP listener functionality"""
print("\nTesting UDP listener functionality...")
def listen_for_broadcasts():
try:
# Create UDP socket for listening
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('', 45123))
sock.settimeout(5.0) # 5 second timeout
print("Listening for broadcasts on port 45123...")
while True:
try:
data, addr = sock.recvfrom(1024)
message = json.loads(data.decode('utf-8'))
if message.get('service') == 'MBetterClient':
print(f"✓ Received broadcast from {addr}: {message}")
sock.close()
return True
except socket.timeout:
print("⚠ Timeout - no broadcasts received")
sock.close()
return False
except Exception as e:
print(f"✗ Listener error: {e}")
sock.close()
return False
except Exception as e:
print(f"✗ Failed to create listener: {e}")
return False
# Start listener in separate thread
listener_thread = threading.Thread(target=listen_for_broadcasts)
listener_thread.daemon = True
listener_thread.start()
# Wait a moment then send test broadcast
time.sleep(1)
# Send test broadcast
test_udp_broadcast()
# Wait for listener to complete
listener_thread.join(timeout=6)
if listener_thread.is_alive():
print("✗ Listener test timed out")
return False
else:
print("✓ Listener test completed")
return True
def test_json_structure():
"""Test JSON message structure"""
print("\nTesting JSON message structure...")
# Test valid message
valid_message = {
"service": "MBetterClient",
"host": "192.168.1.100",
"port": 5001,
"ssl": True,
"url": "https://192.168.1.100:5001",
"timestamp": time.time()
}
try:
# Test JSON encoding/decoding
json_data = json.dumps(valid_message)
decoded_data = json.loads(json_data)
# Validate required fields
required_fields = ['service', 'host', 'port', 'ssl', 'url', 'timestamp']
missing_fields = [field for field in required_fields if field not in decoded_data]
if missing_fields:
print(f"✗ Missing required fields: {missing_fields}")
return False
if decoded_data['service'] != 'MBetterClient':
print(f"✗ Invalid service name: {decoded_data['service']}")
return False
print("✓ JSON structure validation passed")
print(f" Sample message: {json_data}")
return True
except Exception as e:
print(f"✗ JSON validation failed: {e}")
return False
def main():
"""Main test function"""
print("=" * 60)
print("UDP Broadcast System Test")
print("=" * 60)
tests = [
("JSON Structure", test_json_structure),
("UDP Broadcast", test_udp_broadcast),
("UDP Listener", test_udp_listener),
]
results = []
for test_name, test_func in tests:
print(f"\n--- {test_name} Test ---")
try:
result = test_func()
results.append((test_name, result))
except Exception as e:
print(f"✗ {test_name} test crashed: {e}")
results.append((test_name, False))
# Summary
print("\n" + "=" * 60)
print("Test Results Summary")
print("=" * 60)
passed = 0
total = len(results)
for test_name, result in results:
status = "✓ PASS" if result else "✗ FAIL"
print(f"{test_name:<20} {status}")
if result:
passed += 1
print(f"\nOverall: {passed}/{total} tests passed")
if passed == total:
print("🎉 All tests passed! UDP broadcast system is working correctly.")
return True
else:
print("⚠ Some tests failed. Check the output above for details.")
return False
if __name__ == "__main__":
try:
success = main()
sys.exit(0 if success else 1)
except KeyboardInterrupt:
print("\n\nTest interrupted by user")
sys.exit(1)
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment