Commit 3b722898 authored by nextime's avatar nextime

Add qbrowser test

parent ee7ebecd
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
qbrowser.py - A Qt6-based browser that wraps and extends the Playwright browser class
This module provides a Qt6-based browser interface that wraps the Playwright browser
API and launches Chromium browser instances without decorations using the --app flag.
Each new page is opened as a new tab in the Qt6 window, and the interface includes
URL input, home button, and browser extension management.
"""
import os
import sys
import subprocess
import tempfile
import json
import shutil
import asyncio
import platform
from pathlib import Path
from typing import Dict, List, Optional, Union, Callable, Any
from PyQt6.QtCore import Qt, QUrl, QProcess, pyqtSignal, QSize, QRect, QTimer
from PyQt6.QtGui import QIcon, QAction, QKeySequence
from PyQt6.QtWidgets import (
QApplication, QMainWindow, QTabWidget, QToolBar, QLineEdit,
QPushButton, QWidget, QVBoxLayout, QHBoxLayout, QDialog,
QLabel, QListWidget, QListWidgetItem, QCheckBox, QMenu,
QSizePolicy, QStyle, QFrame, QSplitter, QMessageBox
)
from PyQt6.QtWebEngineWidgets import QWebEngineView
from PyQt6.QtWebEngineCore import QWebEngineProfile, QWebEngineSettings, QWebEnginePage
# Import Playwright for API compatibility
from playwright.async_api import async_playwright, Browser as PlaywrightBrowser
from playwright.async_api import Page as PlaywrightPage
from playwright.async_api import BrowserContext as PlaywrightBrowserContext
class BrowserTab(QWidget):
"""
A tab widget that contains a web view and manages a Chromium process.
"""
urlChanged = pyqtSignal(str)
titleChanged = pyqtSignal(str)
def __init__(self, parent=None, url="about:blank", user_data_dir=None):
super().__init__(parent)
self.parent = parent
self.url = url
self.user_data_dir = user_data_dir or tempfile.mkdtemp(prefix="qbrowser_")
self.process = None
self.page = None # Playwright page object
# Create a container for the web view
self.layout = QVBoxLayout(self)
self.layout.setContentsMargins(0, 0, 0, 0)
self.layout.setSpacing(0)
# Create a placeholder widget until the Chromium process is ready
self.placeholder = QLabel("Loading browser...")
self.placeholder.setAlignment(Qt.AlignmentFlag.AlignCenter)
self.layout.addWidget(self.placeholder)
# Start the Chromium process
self.start_browser()
def start_browser(self):
"""Initialize the QWebEngineView for browsing."""
# Create a QWebEngineView for the Qt UI
# This will be the actual browser, no separate Chromium process
self.web_view = QWebEngineView()
self.web_view.setEnabled(True) # Enable interaction with this view
# Set up a custom profile for this tab
profile = QWebEngineProfile(self.user_data_dir, self.web_view)
page = QWebEnginePage(profile, self.web_view)
self.web_view.setPage(page)
# Connect signals
self.web_view.loadFinished.connect(self.on_load_finished)
self.web_view.urlChanged.connect(lambda url: self.urlChanged.emit(url.toString()))
self.web_view.titleChanged.connect(lambda title: self.titleChanged.emit(title))
# Load the URL
self.web_view.load(QUrl(self.url))
# Replace the placeholder with the web view
self.layout.removeWidget(self.placeholder)
self.placeholder.deleteLater()
self.layout.addWidget(self.web_view)
print(f"Loading URL in QWebEngineView: {self.url}")
def navigate(self, url):
"""Navigate to the specified URL."""
self.url = url
if hasattr(self, 'web_view'):
print(f"Navigating to: {url}")
self.web_view.load(QUrl(url))
# If we have a Playwright page, navigate it as well for API compatibility
if self.page:
asyncio.create_task(self.page.goto(url))
def on_load_finished(self, success):
"""Handle the page load finishing."""
if success:
print(f"Page loaded successfully: {self.web_view.url().toString()}")
# Update the title if available
title = self.web_view.title()
if title:
self.titleChanged.emit(title)
else:
print(f"Failed to load page: {self.url}")
# We no longer need process handling methods since we're using QWebEngineView directly
def close(self):
"""Close the tab."""
# Clean up the user data directory
try:
if os.path.exists(self.user_data_dir) and self.user_data_dir.startswith(tempfile.gettempdir()):
shutil.rmtree(self.user_data_dir)
except Exception as e:
print(f"Error cleaning up user data directory: {e}")
# Close the web view
if hasattr(self, 'web_view'):
self.web_view.close()
super().close()
class ExtensionDialog(QDialog):
"""
Dialog for managing browser extensions.
"""
def __init__(self, parent=None, extensions_dir=None):
super().__init__(parent)
self.setWindowTitle("Browser Extensions")
self.setMinimumSize(500, 400)
self.extensions_dir = extensions_dir
# Create layout
layout = QVBoxLayout(self)
# Create extensions list
self.extensions_list = QListWidget()
layout.addWidget(self.extensions_list)
# Create buttons
button_layout = QHBoxLayout()
self.install_button = QPushButton("Install New Extension")
self.remove_button = QPushButton("Remove Extension")
self.enable_button = QPushButton("Enable/Disable")
self.close_button = QPushButton("Close")
button_layout.addWidget(self.install_button)
button_layout.addWidget(self.remove_button)
button_layout.addWidget(self.enable_button)
button_layout.addWidget(self.close_button)
layout.addLayout(button_layout)
# Connect signals
self.close_button.clicked.connect(self.accept)
self.install_button.clicked.connect(self.install_extension)
self.remove_button.clicked.connect(self.remove_extension)
self.enable_button.clicked.connect(self.toggle_extension)
# Load extensions
self.load_extensions()
def load_extensions(self):
"""Load and display installed extensions."""
self.extensions_list.clear()
if not self.extensions_dir or not os.path.exists(self.extensions_dir):
self.extensions_list.addItem("No extensions directory found")
return
# Find all extension directories
for ext_dir in os.listdir(self.extensions_dir):
ext_path = os.path.join(self.extensions_dir, ext_dir)
if os.path.isdir(ext_path):
# Try to read the manifest
manifest_path = os.path.join(ext_path, "manifest.json")
if os.path.exists(manifest_path):
try:
with open(manifest_path, 'r') as f:
manifest = json.load(f)
name = manifest.get("name", ext_dir)
version = manifest.get("version", "unknown")
item = QListWidgetItem(f"{name} (v{version})")
item.setData(Qt.ItemDataRole.UserRole, ext_path)
# Check if extension is enabled
enabled = True # TODO: Implement proper extension state checking
item.setCheckState(
Qt.CheckState.Checked if enabled else Qt.CheckState.Unchecked
)
self.extensions_list.addItem(item)
except Exception as e:
print(f"Error loading extension manifest: {e}")
item = QListWidgetItem(f"{ext_dir} (Error loading manifest)")
item.setData(Qt.ItemDataRole.UserRole, ext_path)
self.extensions_list.addItem(item)
def install_extension(self):
"""Install a new extension."""
# TODO: Implement extension installation
print("Extension installation not implemented yet")
QMessageBox.information(self, "Not Implemented", "Extension installation is not implemented yet.")
def remove_extension(self):
"""Remove the selected extension."""
selected_items = self.extensions_list.selectedItems()
if not selected_items:
return
for item in selected_items:
ext_path = item.data(Qt.ItemDataRole.UserRole)
if ext_path and os.path.exists(ext_path):
try:
shutil.rmtree(ext_path)
print(f"Removed extension: {ext_path}")
except Exception as e:
print(f"Error removing extension: {e}")
self.load_extensions()
def toggle_extension(self):
"""Toggle the enabled state of the selected extension."""
selected_items = self.extensions_list.selectedItems()
if not selected_items:
return
for item in selected_items:
current_state = item.checkState()
new_state = Qt.CheckState.Unchecked if current_state == Qt.CheckState.Checked else Qt.CheckState.Checked
item.setCheckState(new_state)
# TODO: Implement proper extension state saving
ext_path = item.data(Qt.ItemDataRole.UserRole)
enabled = new_state == Qt.CheckState.Checked
print(f"Extension {ext_path} {'enabled' if enabled else 'disabled'}")
class Browser(QMainWindow):
"""
Main browser window that wraps the Playwright browser class.
"""
def __init__(self):
super().__init__()
self.setWindowTitle("QBrowser")
self.resize(1024, 768)
# Create a central widget and layout
self.central_widget = QWidget()
self.setCentralWidget(self.central_widget)
self.layout = QVBoxLayout(self.central_widget)
self.layout.setContentsMargins(0, 0, 0, 0)
self.layout.setSpacing(0)
# Create toolbar
self.toolbar = QToolBar()
self.toolbar.setMovable(False)
self.toolbar.setIconSize(QSize(16, 16))
# Add home button
self.home_button = QPushButton()
self.home_button.setIcon(self.style().standardIcon(QStyle.StandardPixmap.SP_ArrowBack))
self.home_button.setToolTip("Home")
self.home_button.clicked.connect(self.navigate_home)
self.toolbar.addWidget(self.home_button)
# Add URL input
self.url_input = QLineEdit()
self.url_input.setPlaceholderText("Enter URL...")
self.url_input.returnPressed.connect(self.navigate_to_url)
self.toolbar.addWidget(self.url_input)
# Add extensions button
self.extensions_button = QPushButton()
self.extensions_button.setIcon(self.style().standardIcon(QStyle.StandardPixmap.SP_FileDialogDetailedView))
self.extensions_button.setToolTip("Extensions")
self.extensions_button.clicked.connect(self.show_extensions)
self.toolbar.addWidget(self.extensions_button)
self.layout.addWidget(self.toolbar)
# Create tab widget
self.tabs = QTabWidget()
self.tabs.setTabsClosable(True)
self.tabs.setMovable(True)
self.tabs.setDocumentMode(True)
self.tabs.tabCloseRequested.connect(self.close_tab)
self.tabs.currentChanged.connect(self.tab_changed)
self.layout.addWidget(self.tabs)
# Add a new tab button
self.tabs.setCornerWidget(self._create_new_tab_button(), Qt.Corner.TopRightCorner)
# Set the home URL
self.home_url = "https://www.google.com"
# Create a new tab on startup
self.new_page(self.home_url)
# Set up keyboard shortcuts
self._setup_shortcuts()
# Store Playwright objects
self.playwright = None
self.playwright_browser = None
self.browser_contexts = {}
def _create_new_tab_button(self):
"""Create a button for adding new tabs."""
button = QPushButton("+")
button.setToolTip("New Tab")
button.clicked.connect(lambda: self.new_page(self.home_url))
return button
def _setup_shortcuts(self):
"""Set up keyboard shortcuts."""
# Ctrl+T: New Tab
new_tab_shortcut = QAction("New Tab", self)
new_tab_shortcut.setShortcut(QKeySequence("Ctrl+T"))
new_tab_shortcut.triggered.connect(lambda: self.new_page(self.home_url))
self.addAction(new_tab_shortcut)
# Ctrl+W: Close Tab
close_tab_shortcut = QAction("Close Tab", self)
close_tab_shortcut.setShortcut(QKeySequence("Ctrl+W"))
close_tab_shortcut.triggered.connect(lambda: self.close_tab(self.tabs.currentIndex()))
self.addAction(close_tab_shortcut)
# Ctrl+L: Focus URL bar
focus_url_shortcut = QAction("Focus URL", self)
focus_url_shortcut.setShortcut(QKeySequence("Ctrl+L"))
focus_url_shortcut.triggered.connect(self.url_input.setFocus)
self.addAction(focus_url_shortcut)
def new_page(self, url="about:blank"):
"""Create a new browser tab with the specified URL."""
# Create a new tab
tab = BrowserTab(parent=self.tabs, url=url)
# Store a reference to the Browser instance
tab.browser = self
tab.urlChanged.connect(self.update_url)
tab.titleChanged.connect(lambda title, tab=tab: self.update_title(title, tab))
# Add the tab to the tab widget
index = self.tabs.addTab(tab, "New Tab")
self.tabs.setCurrentIndex(index)
return tab
def close_tab(self, index):
"""Close the tab at the specified index."""
if index < 0 or index >= self.tabs.count():
return
# Get the tab widget
tab = self.tabs.widget(index)
# Close the tab
self.tabs.removeTab(index)
# Close the tab widget
if tab:
tab.close()
# If no tabs are left, create a new one
if self.tabs.count() == 0:
self.new_page(self.home_url)
def tab_changed(self, index):
"""Handle tab change events."""
if index < 0:
return
# Update the URL input
tab = self.tabs.widget(index)
if hasattr(tab, 'url'):
self.url_input.setText(tab.url)
def update_url(self, url):
"""Update the URL input when the page URL changes."""
self.url_input.setText(url)
# Update the tab's stored URL
current_tab = self.tabs.currentWidget()
if current_tab:
current_tab.url = url
def update_title(self, title, tab):
"""Update the tab title when the page title changes."""
index = self.tabs.indexOf(tab)
if index >= 0:
self.tabs.setTabText(index, title or "New Tab")
def navigate_to_url(self):
"""Navigate to the URL in the URL input."""
url = self.url_input.text().strip()
# Add http:// if no protocol is specified
if url and not url.startswith(("http://", "https://", "file://", "about:")):
url = "http://" + url
# Navigate the current tab
current_tab = self.tabs.currentWidget()
if current_tab:
current_tab.navigate(url)
def navigate_home(self):
"""Navigate to the home URL."""
current_tab = self.tabs.currentWidget()
if current_tab:
current_tab.navigate(self.home_url)
def show_extensions(self):
"""Show the extensions dialog."""
# Find the extensions directory
# This is a simplified approach; in reality, you'd need to find the actual extensions directory
user_data_dir = None
current_tab = self.tabs.currentWidget()
if current_tab and hasattr(current_tab, 'user_data_dir'):
user_data_dir = current_tab.user_data_dir
extensions_dir = None
if user_data_dir:
possible_ext_dirs = [
os.path.join(user_data_dir, "Extensions"),
os.path.join(user_data_dir, "Default", "Extensions")
]
for dir_path in possible_ext_dirs:
if os.path.exists(dir_path) and os.path.isdir(dir_path):
extensions_dir = dir_path
break
dialog = ExtensionDialog(self, extensions_dir)
dialog.exec()
class QPlaywrightBrowser:
"""
A class that wraps the Playwright browser API and uses Qt6Browser internally.
This class exposes the Playwright API while using the Qt6 browser for display.
"""
def __init__(self):
self.app = QApplication.instance() or QApplication(sys.argv)
self.browser_ui = None
self.playwright = None
self.browser = None
self.contexts = {}
self.event_loop = None
async def launch(self, **kwargs):
"""Launch a new browser instance."""
# Initialize Playwright if not already done
if not self.playwright:
self.playwright = await async_playwright().start()
# Create the Qt browser UI
if not self.browser_ui:
self.browser_ui = Browser()
self.browser_ui.show()
# Launch the actual Playwright browser (hidden)
# We'll use this for API compatibility but display in Qt
self.browser = await self.playwright.chromium.launch(
headless=True, # Run headless since we're displaying in Qt
**kwargs
)
# Create a default context
default_context = await self.browser.new_context()
self.contexts["default"] = default_context
return self
async def new_context(self, **kwargs):
"""Create a new browser context."""
if not self.browser:
await self.launch()
# Create a new context in the Playwright browser
context_id = f"context_{len(self.contexts)}"
context = await self.browser.new_context(**kwargs)
self.contexts[context_id] = context
# Return a wrapper that provides both Playwright API and Qt UI
return QPlaywrightBrowserContext(self, context, context_id)
async def new_page(self, url="about:blank"):
"""Create a new page in the default context."""
if "default" not in self.contexts:
await self.launch()
# Create a new page in the Playwright browser
pw_page = await self.contexts["default"].new_page()
await pw_page.goto(url)
# Create a new tab in the Qt browser
qt_tab = self.browser_ui.new_page(url)
qt_tab.page = pw_page
# Return a wrapper that provides both Playwright API and Qt UI
return QPlaywrightPage(pw_page, qt_tab)
async def close(self):
"""Close the browser."""
# Close all Playwright contexts and browser
if self.browser:
for context_id, context in self.contexts.items():
await context.close()
await self.browser.close()
self.contexts = {}
self.browser = None
# Close the Qt browser UI
if self.browser_ui:
self.browser_ui.close()
self.browser_ui = None
# Close Playwright
if self.playwright:
await self.playwright.stop()
self.playwright = None
def run(self):
"""Run the application event loop."""
return self.app.exec()
def run_async(self, coro):
"""Run an async coroutine in the Qt event loop."""
if not self.event_loop:
self.event_loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.event_loop)
# Create a QTimer to process asyncio events
timer = QTimer()
timer.timeout.connect(lambda: self.event_loop.run_until_complete(asyncio.sleep(0)))
timer.start(10) # 10ms interval
# Run the coroutine
future = asyncio.run_coroutine_threadsafe(coro, self.event_loop)
return future
class QPlaywrightBrowserContext:
"""
A wrapper around a Playwright BrowserContext that provides both
Playwright API and Qt UI integration.
"""
def __init__(self, qbrowser, context, context_id):
self.qbrowser = qbrowser
self.context = context
self.context_id = context_id
self.pages = []
async def new_page(self):
"""Create a new page in this context."""
# Create a new page in the Playwright context
pw_page = await self.context.new_page()
# Create a new tab in the Qt browser
qt_tab = self.qbrowser.browser_ui.new_page("about:blank")
qt_tab.page = pw_page
# Create a wrapper page
page = QPlaywrightPage(pw_page, qt_tab)
self.pages.append(page)
return page
async def close(self):
"""Close this context."""
# Close all pages
for page in self.pages:
await page.close()
# Close the Playwright context
await self.context.close()
# Remove from the qbrowser contexts
if self.context_id in self.qbrowser.contexts:
del self.qbrowser.contexts[self.context_id]
class QPlaywrightPage:
"""
A wrapper around a Playwright Page that provides both
Playwright API and Qt UI integration.
"""
def __init__(self, pw_page, qt_tab):
self.pw_page = pw_page
self.qt_tab = qt_tab
# Forward Playwright page methods and properties
self.goto = pw_page.goto
self.click = pw_page.click
self.fill = pw_page.fill
self.type = pw_page.type
self.press = pw_page.press
self.wait_for_selector = pw_page.wait_for_selector
self.wait_for_navigation = pw_page.wait_for_navigation
self.wait_for_load_state = pw_page.wait_for_load_state
self.evaluate = pw_page.evaluate
self.screenshot = pw_page.screenshot
self.content = pw_page.content
self.title = pw_page.title
self.url = pw_page.url
async def close(self):
"""Close this page."""
# Close the Playwright page
await self.pw_page.close()
# Close the Qt tab
# We need to use the Qt event loop for this
index = self.qt_tab.parent.indexOf(self.qt_tab)
if index >= 0:
self.qt_tab.parent.close_tab(index)
async def main_async():
"""Async main function to run the browser."""
browser = QPlaywrightBrowser()
await browser.launch()
# Open a page
url = "https://www.google.com"
if len(sys.argv) > 1:
url = sys.argv[1]
await browser.new_page(url)
# Keep the browser running
while True:
await asyncio.sleep(0.1)
def main():
"""Main function to run the browser as a standalone application."""
app = QApplication.instance() or QApplication(sys.argv)
# Create a simple browser directly without Playwright for now
# This ensures we at least get a window showing
browser = Browser()
browser.show()
# If a URL is provided as a command-line argument, navigate to it
if len(sys.argv) > 1:
url = sys.argv[1]
browser.new_page(url)
print("Browser window should be visible now")
# Run the Qt event loop
return app.exec()
if __name__ == "__main__":
sys.exit(main())
\ No newline at end of file
......@@ -7,6 +7,9 @@ stream_url = https://192.168.42.1/HLS/record/Live.m3u8
rtmp_url = rtmp://192.168.42.1/record/Live
port = 5000
[Browser]
auto_launch = true
[Tkinter]
window_title = SHM Cam Studio
button_width = 20
......
......@@ -6,49 +6,111 @@ import os
from datetime import datetime
from pathlib import Path
import logging
from typing import List, Optional
import uuid
from playwright_helper import BASE_PROFILE_DIR, BASE_STATE_DIR, PersistentBrowser
logging.getLogger(__name__)
async def new_context(url, persistent_browser: PersistentBrowser, context_id: str, extension_paths: List[Path], **kwargs):
logging.info(kwargs)
# Create a new context with the pre-set extensions
context = await persistent_browser.new_context(
context_id=context_id,
extension_paths=extension_paths,
**kwargs
)
# Open a new page in the context
page = await context.new_page()
# Navigate to a sample website
await page.goto(url)
# Print the page title as an example action
logging.info(f"Page title for context {context_id}: {await page.title()}")
return context
async def monitor_stream(log_path, domain):
async with async_playwright() as p:
browser_is_pw = False
try:
browser = await p.chromium.connect_over_cdp("http://localhost:9222")
except:
user_data_dir = './user-data'
cont = await p.chromium.launch_persistent_context(user_data_dir,
#viewport=None,
headless=False,
#try:
# browser = await p.chromium.connect_over_cdp("http://localhost:9222")
#except:
# user_data_dir = './user-data'
# cont = await p.chromium.launch_persistent_context(user_data_dir,
# #viewport=None,
# headless=False,
# args=[
# "--disable-infobars",
# #"--disable-features=Antaniu"
# ],
# ignore_default_args=["--enable-automation", "--no-sandbox"],)
# pages = cont.pages
# browser_is_pw = True
## if pages:
# page = pages[0]
# else:
# page = await cont.new_page()
# logger.info(page.url)
# await page.goto('http://localhost:5000/chat')
# logger.info("Browser session retained")
# #await browser.close()
# browser = cont.browser
# Ensure base directories exist
BASE_PROFILE_DIR.mkdir(parents=True, exist_ok=True)
BASE_STATE_DIR.mkdir(parents=True, exist_ok=True)
use_persistent_context = False
# Define pre-set extensions to be copied to all contexts
EXTENSION_PATHS = [
Path("./my_extension1"), # Example: use forward slashes for cross-platform compatibility
Path("./my_extension2") # Replace with actual paths to unpacked extension directories
]
# Create simulated browser with the chosen mode and extensions
browser = PersistentBrowser(p, BASE_PROFILE_DIR, use_persistent_context, EXTENSION_PATHS,
args=[
"--disable-infobars",
#"--disable-features=Antaniu"
"--disable-features=Antaniu"
],
ignore_default_args=["--enable-automation", "--no-sandbox"],)
pages = cont.pages
browser_is_pw = True
if pages:
page = pages[0]
else:
page = await cont.new_page()
logger.info(page.url)
await page.goto('http://localhost:5000/chat')
logger.info("Browser session retained")
#await browser.close()
browser = cont.browser
ignore_default_args=["--enable-automation", "--no-sandbox"])
await browser.launch()
# Create tasks for multiple contexts
tasks = []
context_id = str(uuid.uuid4()) # Unique ID for each context
##tasks.append(new_context("http://localhost:5000/chat", browser, context_id, EXTENSION_PATHS))
##await asyncio.gather(*tasks)
await new_context("http://localhost:5000/chat", browser, context_id, EXTENSION_PATHS,viewport=None)
bw = 0
dw = 0
bh = 0
bw = 0
while True:
browser = cont.browser
if browser:
contexts = browser.contexts
else:
contexts = [cont]
while len(browser.contexts()) > 0:
#browser = cont.browser
#if browser:
# contexts = browser.contexts
#else:
# contexts = [cont]
contexts = browser.contexts()
logging.info("MUMBER OF CONTEXTS: %d", len(browser.contexts()))
for context in contexts:
logger.debug("@@@@@@@@@@@ NEW CONTEXT @@@@@@@@")
if browser_is_pw:
sizedone=False
logging.info("NUMBER OF PAGES IN THIS CONTEXT: %d", len(context.pages))
for page in context.pages:
logger.info("newpagess----------------")
if browser_is_pw and not sizedone:
......@@ -76,10 +138,17 @@ async def monitor_stream(log_path, domain):
# XXX THIS IS FUCKONG BLOCKING, TODO: RAKE IT OUT AND PUT THE
# WHOLE THING IN A STATE MACHINE
await monitor_requests(page, log_path)
if len(context.pages) < 1 and len(browser.contexts()) == 1:
#await context.close()
logging.info("CLOOOOOOSSSINNNNNNGGGG")
await browser.close()
# XXX TODO: I ALREADY SAID MAKE A FUCKING STATE MACHINE?
logger.info("PORCDIOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOO\b\b")
await asyncio.sleep(3) # Check every 0.1 seconds
#logger.info("PORCDIOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOO\b\b")
await asyncio.sleep(.1) # Check every 0.1 seconds
# Close the simulated browser, saving states and backing up profiles
#await browser.close()
async def monitor_requests(page, log_path):
async def log_request(request):
......@@ -122,12 +191,13 @@ async def monitor_requests(page, log_path):
def run_browser():
lpath=str(Path(Path.home(), Path('streamon.log')))
#parser = argparse.ArgumentParser(description="Monitor Streamate requests")
#parser.add_argument("--log_path", type=str, default=lpath,
# help="Path to the log file")
#parser.add_argument("--domain", type=str, default="performerclient.streamatemodels.com",
# help="URL to monitor")
#args = parser.parse_args()
# Check if browser should be launched automatically
auto_launch = config.getboolean('Browser', 'auto_launch', fallback=False)
if not auto_launch:
logging.info("Browser auto-launch is disabled. Set auto_launch=true in [Browser] section of shmcamstudio.conf to enable.")
return
logging.info("Starting browser...")
log_dir = os.path.dirname(lpath)
if log_dir and not os.path.exists(log_dir):
......
# -*- coding: utf-8 -*-
from playwright.async_api import async_playwright, Browser, BrowserContext
import json
import shutil
import errno
from pathlib import Path
import asyncio
import uuid
from typing import List, Optional
# Base directories for storing profile data and storage states (cross-platform)
BASE_PROFILE_DIR = Path("chromium_profiles") # Directory for user data/profile directories
BASE_STATE_DIR = Path("browser_states") # Directory for storage state JSON files
def copytree_ignore_errors(src: Path, dst: Path, ignore_patterns: list = ["SingletonLock", "SingletonCookie", "SingletonSocket"]):
"""Copy a directory tree, ignoring specified files and handling errors gracefully."""
# Define a function to ignore files matching specified patterns (e.g., Singleton* files)
def ignore_files(directory, files):
return [f for f in files if any(pattern in f for pattern in ignore_patterns)]
import json
from playwright.async_api import async_playwright, Browser, BrowserContext, Page
from PyQt6.QtWidgets import QApplication, QMainWindow, QTabWidget, QWidget, QPushButton, QHBoxLayout, QVBoxLayout
from PyQt6.QtWebEngineWidgets import QWebEngineView
from PyQt6.QtCore import QUrl, QTimer, Qt
import sys
import threading
import time
from typing import Optional, List
class CloseableTabWidget(QTabWidget):
def __init__(self, parent=None):
super().__init__(parent)
self.setTabsClosable(False) # We'll manage close buttons manually
def addTab(self, widget: QWebEngineView, title: str) -> int:
# Create a container widget for the tab
container = QWidget()
layout = QVBoxLayout()
layout.setContentsMargins(0, 0, 0, 0)
# Add web view
layout.addWidget(widget)
# Create header with title and close button
header = QWidget()
header_layout = QHBoxLayout()
header_layout.setContentsMargins(0, 0, 0, 0)
# Close button
close_button = QPushButton("X")
close_button.setFixedSize(20, 20)
close_button.setStyleSheet("font-weight: bold; color: red; border: none;")
close_button.clicked.connect(lambda: self._close_tab(self.indexOf(container)))
header_layout.addStretch()
header_layout.addWidget(close_button)
header.setLayout(header_layout)
layout.addWidget(header)
container.setLayout(layout)
# Add to tab widget
index = super().addTab(container, title)
self.setTabText(index, title)
return index
def _close_tab(self, index: int):
if index >= 0:
widget = self.widget(index)
web_view = widget.findChild(QWebEngineView)
if web_view:
web_view.deleteLater()
self.removeTab(index)
widget.deleteLater()
class BrowserApp(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("Embedded Chromium Tabs")
self.setGeometry(100, 100, 800, 600)
# Create tab widget
self.tab_widget = CloseableTabWidget(self)
self.setCentralWidget(self.tab_widget)
# Store browser instances, views, and pages
self.browsers: List[Browser] = []
self.web_views: List[QWebEngineView] = []
self.pages: List[Page] = []
self.contexts: List[BrowserContext] = []
# Asyncio event loop
self.loop = asyncio.get_event_loop()
def add_tab(self, url: str, page: Page, context: BrowserContext, browser: Browser, tab_idx: int):
# Create QWebEngineView for the tab
web_view = QWebEngineView()
web_view.setUrl(QUrl(url))
self.tab_widget.addTab(web_view, f"Tab {tab_idx+1}")
self.web_views.append(web_view)
self.pages.append(page)
self.contexts.append(context)
self.browsers.append(browser)
# Sync Playwright with QWebEngineView
QTimer.singleShot(1000, lambda: self.loop.create_task(self._sync_viewport(page, web_view, tab_idx)))
# Register event handlers
page.on("console", self._make_console_handler(page, tab_idx))
context.on("page", self._make_page_handler(tab_idx))
async def _sync_viewport(self, page: Page, web_view: QWebEngineView, tab_idx: int):
try:
width = web_view.width()
height = web_view.height()
await page.set_viewport_size({"width": width, "height": height})
print(f"Tab {tab_idx+1}: Synced viewport to {width}x{height}")
except Exception as e:
print(f"Tab {tab_idx+1}: Error syncing viewport: {e}")
def _make_console_handler(self, page: Page, tab_idx: int):
async def on_console(msg):
if msg.text.startswith("Viewport resized:"):
try:
# Copy the directory tree, ignoring specified files
shutil.copytree(src, dst, ignore=ignore_files, dirs_exist_ok=False)
except shutil.Error as e:
# Log non-critical copy errors (e.g., permission issues)
print(f"Non-critical errors during copy: {e}")
except OSError as e:
# Handle specific OS errors (e.g., file not found, device busy) gracefully
if e.errno not in (errno.ENOENT, errno.EAGAIN, errno.EBUSY):
raise
print(f"Skipped problematic file during copy: {e}")
class SimulatedBrowser:
"""Simulates a Playwright Browser object using either launch_persistent_context or browser.launch."""
def __init__(self, playwright, base_profile_dir: Path, use_persistent_context: bool, extension_paths: List[Path] = None):
"""Initialize the simulated browser."""
# Store Playwright instance for browser operations
viewport_data = json.loads(msg.text.replace("Viewport resized:", ""))
width = int(viewport_data["width"])
height = int(viewport_data["height"])
pixel_ratio = viewport_data["devicePixelRatio"]
print(f"Tab {tab_idx+1}: Real viewport size: {width}x{height} (DPR: {pixel_ratio})")
await page.set_viewport_size({"width": width, "height": height})
except Exception as e:
print(f"Tab {tab_idx+1}: Error parsing viewport data: {e}")
return on_console
def _make_page_handler(self, tab_idx: int):
async def on_new_page(new_page: Page):
print(f"Tab {tab_idx+1}: New page created")
new_url = await new_page.evaluate("window.location.href")
new_web_view = QWebEngineView()
new_web_view.setUrl(QUrl(new_url))
new_tab_idx = len(self.web_views)
self.tab_widget.addTab(new_web_view, f"Tab {new_tab_idx+1}")
self.web_views.append(new_web_view)
self.pages.append(new_page)
new_page.on("console", self._make_console_handler(new_page, tab_idx))
self.loop.create_task(self._sync_viewport(new_page, new_web_view, tab_idx))
return on_new_page
def _on_resize(self, event):
for i, (page, web_view) in enumerate(zip(self.pages, self.web_views)):
if self.tab_widget.currentIndex() == i or len(self.web_views) == 1:
self.loop.create_task(self._sync_viewport(page, web_view, i))
def closeEvent(self, event):
for browser in self.browsers:
self.loop.create_task(browser.close())
event.accept()
class QtPlaywrightBrowser:
def __init__(self, playwright):
self._playwright = playwright
# Base directory for profile data
self._base_profile_dir = base_profile_dir
# Flag to choose between launch_persistent_context and browser.launch
self._use_persistent_context = use_persistent_context
# List of extension paths to copy to each context (converted to Path objects)
self._extension_paths = [Path(p) for p in (extension_paths or [])]
# Dictionary to store context_id -> BrowserContext
self._contexts = {}
# Browser instance for browser.launch mode
self._browser = None
# Flag to track if the browser is closed
self._is_closed = False
async def launch(self):
"""Simulate browser.launch() by preparing the environment."""
# Create base profile directory if it doesn't exist
self._base_profile_dir.mkdir(parents=True, exist_ok=True)
# If using browser.launch mode, launch a single Chromium instance
if not self._use_persistent_context:
self._browser = await self._playwright.chromium.launch(headless=False)
print("Simulated browser launched.")
return self
async def new_context(self, context_id: str = None, **kwargs) -> BrowserContext:
"""Create a new context with its own persistent state."""
# Check if the browser is closed
if self._is_closed:
raise RuntimeError("Browser is closed.")
# Generate a unique context ID if not provided
context_id = context_id or str(uuid.uuid4())
# Create or restore profile directory for this context
profile_dir = self._get_profile_dir(context_id)
if not profile_dir.exists():
restore_profile_dir(context_id, self._base_profile_dir)
create_profile_dir(context_id, self._base_profile_dir)
# Create context based on mode
if self._use_persistent_context:
# Use launch_persistent_context for native persistence via user_data_dir
context = await self._playwright.chromium.launch_persistent_context(
user_data_dir=profile_dir,
self._app = QApplication.instance() or QApplication(sys.argv)
self._window = BrowserApp()
self._window.show()
self._loop = asyncio.get_event_loop()
self._contexts: List[BrowserContext] = []
self._tab_idx = 0
async def new_context(self, **kwargs) -> BrowserContext:
browser = await self._playwright.chromium.launch(
headless=False,
viewport=kwargs.get("viewport", {"width": 1280, "height": 720}),
**{k: v for k, v in kwargs.items() if k != "extension_paths"} # Filter out extension_paths
)
else:
# Use browser.launch with manual persistence
if not self._browser:
raise RuntimeError("Browser not launched.")
context = await self._browser.new_context(
viewport=kwargs.get("viewport", {"width": 1280, "height": 720}),
**{k: v for k, v in kwargs.items() if k != "extension_paths"}
args=["--app=https://www.google.com"]
)
context = await browser.new_context(**kwargs)
self._contexts.append(context)
# Inject JavaScript for resize events
await context.add_init_script("""
let resizeTimeout;
window.addEventListener('resize', () => {
clearTimeout(resizeTimeout);
resizeTimeout = setTimeout(() => {
const viewportSize = {
width: window.innerWidth,
height: window.innerHeight,
devicePixelRatio: window.devicePixelRatio
};
console.log('Viewport resized:', JSON.stringify(viewportSize));
}, 100);
});
window.dispatchEvent(new Event('resize'));
""")
# Load storage state manually if it exists
state_file = BASE_STATE_DIR / f"browser_state_{context_id}.json"
if state_file.exists():
with open(state_file, 'r', encoding='utf-8') as f:
storage_state = json.load(f)
# Load cookies from storage state
if "cookies" in storage_state:
await context.add_cookies(storage_state["cookies"])
# Load local storage from storage state
if "origins" in storage_state:
for origin in storage_state["origins"]:
for item in origin.get("localStorage", []):
await context.evaluate(
"""
({name, value}) => {
window.localStorage.setItem(name, value);
}
""",
{"name": item["name"], "value": item["value"]}
)
print(f"Loaded browser state for context {context_id} from {state_file}")
# Store the context
self._contexts[context_id] = context
print(f"Created new context with ID {context_id} using profile {profile_dir}")
return context
# Install pre-set extensions for this context
for ext_path in self._extension_paths:
await install_extension(context, ext_path, context_id)
async def new_page(self, url: str = "https://about:blank") -> Page:
context = self._contexts[-1] if self._contexts else await self.new_context()
for _ in range(10):
page = await context.new_page()
if page:
break
print(f"Tab {self._tab_idx+1}: Waiting for page to initialize...")
await asyncio.sleep(0.5)
else:
raise RuntimeError(f"Failed to initialize page for {url}")
return context
await page.goto(url)
await page.set_viewport_size({"width": 800, "height": 600})
self._window.add_tab(url, page, context, context.browser, self._tab_idx)
self._tab_idx += 1
return page
async def close(self):
"""Close all contexts and mark the browser as closed."""
if not self._is_closed:
for context_id, context in self._contexts.items():
# Save storage state before closing
await save_browser_state(context, context_id)
# Close context to release Singleton* files
for context in self._contexts:
await context.close()
# Backup profile directory after closing (only for persistent context mode)
if self._use_persistent_context:
backup_profile_dir(context_id, self._base_profile_dir)
# Close the browser if in browser.launch mode
if self._browser:
await self._browser.close()
self._contexts.clear()
self._is_closed = True
print("Simulated browser closed.")
def _get_profile_dir(self, context_id: str) -> Path:
"""Get the profile directory for a context."""
return self._base_profile_dir / f"chromium_profile_{context_id}"
async def save_browser_state(context: BrowserContext, context_id: str):
"""Save the browser's storage state for a specific context."""
state_file = BASE_STATE_DIR / f"browser_state_{context_id}.json"
await context.storage_state(path=state_file)
print(f"Saved browser state for context {context_id} to {state_file}")
def create_profile_dir(context_id: str, base_profile_dir: Path):
"""Create a profile directory for a specific context if it doesn't exist."""
profile_dir = base_profile_dir / f"chromium_profile_{context_id}"
profile_dir.mkdir(parents=True, exist_ok=True)
print(f"Created profile directory for context {context_id} at {profile_dir}")
def backup_profile_dir(context_id: str, base_profile_dir: Path):
"""Copy the profile directory for a specific context to a backup location."""
profile_dir = base_profile_dir / f"chromium_profile_{context_id}"
backup_dir = base_profile_dir / f"chromium_profile_backup_{context_id}"
if profile_dir.exists():
if backup_dir.exists():
shutil.rmtree(backup_dir) # Remove old backup
copytree_ignore_errors(profile_dir, backup_dir)
print(f"Backed up profile directory for context {context_id} to {backup_dir}")
def restore_profile_dir(context_id: str, base_profile_dir: Path):
"""Restore the profile directory for a specific context from the backup."""
profile_dir = base_profile_dir / f"chromium_profile_{context_id}"
backup_dir = base_profile_dir / f"chromium_profile_backup_{context_id}"
if backup_dir.exists():
if profile_dir.exists():
shutil.rmtree(profile_dir) # Remove current profile dir
copytree_ignore_errors(backup_dir, profile_dir)
print(f"Restored profile directory for context {context_id} from {backup_dir}")
else:
print(f"No backup profile directory found for context {context_id}.")
async def install_extension(context: BrowserContext, extension_path: Path, context_id: str):
"""Load an extension for a specific context."""
extension_path = Path(extension_path)
profile_dir = BASE_PROFILE_DIR / f"chromium_profile_{context_id}"
if extension_path.exists():
# Copy extension to the user data directory's Extensions folder
extension_dest = profile_dir / "Extensions" / extension_path.name
if not extension_dest.exists():
shutil.copytree(extension_path, extension_dest)
print(f"Copied extension to {extension_dest} for context {context_id}")
# Add a script to simulate extension interaction (optional)
await context.add_init_script(
script="""chrome.runtime.sendMessage({ message: 'Extension loaded' });"""
)
print(f"Extension loaded from {extension_path} for context {context_id}")
else:
print(f"Extension path {extension_path} does not exist for context {context_id}.")
async def run_context(simulated_browser: SimulatedBrowser, context_id: str, extension_paths: List[Path]):
"""Run a single browser context with its own persistent state."""
# Create a new context with the pre-set extensions
context = await simulated_browser.new_context(
context_id=context_id,
extension_paths=extension_paths
)
self._window.close()
# Allow Qt event loop to process close events
QTimer.singleShot(0, self._app.quit)
# Open a new page in the context
page = await context.new_page()
@property
def contexts(self) -> List[BrowserContext]:
return self._contexts
# Navigate to a sample website
await page.goto("https://example.com")
# Print the page title as an example action
print(f"Page title for context {context_id}: {await page.title()}")
async def run(playwright, num_contexts=3, use_persistent_context=True):
"""Run multiple browser contexts with persistent states."""
# Ensure base directories exist
BASE_PROFILE_DIR.mkdir(parents=True, exist_ok=True)
BASE_STATE_DIR.mkdir(parents=True, exist_ok=True)
# Define pre-set extensions to be copied to all contexts
EXTENSION_PATHS = [
Path("./my_extension1"), # Example: use forward slashes for cross-platform compatibility
Path("./my_extension2") # Replace with actual paths to unpacked extension directories
]
# Create simulated browser with the chosen mode and extensions
simulated_browser = SimulatedBrowser(playwright, BASE_PROFILE_DIR, use_persistent_context, EXTENSION_PATHS)
await simulated_browser.launch()
# Create tasks for multiple contexts
tasks = []
for _ in range(num_contexts):
context_id = str(uuid.uuid4()) # Unique ID for each context
tasks.append(run_context(simulated_browser, context_id, EXTENSION_PATHS))
# Run all contexts concurrently
await asyncio.gather(*tasks)
while True:
asyncio.sleep(0.1)
# Close the simulated browser, saving states and backing up profiles
await simulated_browser.close()
async def main():
"""Main entry point to test both modes."""
"""
HOW TO USE
----------
This script provides a unified solution for managing multiple Playwright browser contexts with persistent states (session data, extensions, cache) using two approaches:
1. launch_persistent_context mode: Uses Playwright's launch_persistent_context() for native persistence via user data directories.
2. browser.launch mode: Uses Playwright's browser.launch() with manual persistence via storage state files and profile directories.
Prerequisites:
- Install Playwright: Run "pip install playwright" in your terminal.
- Install Chromium: Run "playwright install chromium" to download the Chromium browser used by Playwright.
- Ensure Python 3.7+ is installed to support async/await syntax.
Setting Up Extensions:
- Update the EXTENSION_PATHS list in the run() function with paths to unpacked extension directories, each containing a manifest.json file.
- Example:
EXTENSION_PATHS = [
Path("./my_extension1"),
Path("./my_extension2")
]
- To obtain unpacked extensions:
- On Linux/macOS: Extract a .crx file using "unzip <extension>.crx".
- On Windows: Use 7-Zip to extract a .crx file (it is a ZIP archive).
- Alternatively, pre-install extensions in a Chromium browser and copy its profile directory (e.g., ~/.config/chromium on Linux, ~/Library/Application Support/Chromium on macOS, %LocalAppData%\\Chromium\\User Data on Windows) to chromium_profile_<context_id> for each context.
- Note: Extensions load natively in launch_persistent_context mode but may not load dynamically in browser.launch mode without additional setup (e.g., automating chrome://extensions/).
Running the Script:
- Save the script as script.py.
- Run it from the terminal: "python script.py".
- The script will:
- Create chromium_profiles and browser_states directories in the script's working directory.
- Run two tests:
1. launch_persistent_context mode: Creates 3 contexts, each with a unique user data directory (chromium_profile_<context_id>) and storage state file (browser_state_<context_id>.json).
2. browser.launch mode: Creates 3 contexts using a single browser instance, with manual persistence.
- For each context:
- Copies all extensions from EXTENSION_PATHS to the context's profile directory (chromium_profile_<context_id>/Extensions).
- Loads session data (cookies, local storage) from browser_state_<context_id>.json if it exists.
- Navigates to https://example.com and prints the page title as a sample action.
- Saves session data and backs up the profile directory (in launch_persistent_context mode) when closing.
- Output will include logs for directory creation, extension copying, state loading/saving, and page titles.
Subsequent Runs:
- The script restores profile directories and session states for each context using unique context IDs (UUIDs), ensuring persistence across runs.
- Backed-up profile directories (chromium_profile_backup_<context_id>) are used to restore chromium_profile_<context_id> if needed.
- Session data persists in browser_state_<context_id>.json files.
Customization:
- Change the number of contexts by modifying num_contexts in run().
- Switch modes by setting use_persistent_context=True (launch_persistent_context) or False (browser.launch) in run().
- Update EXTENSION_PATHS with your extension directories.
- Add context options (e.g., user_agent, locale) by passing them to new_context(**kwargs) in run_context().
DEBUGGING TIPS
--------------
If you encounter issues while running the script, consider the following debugging strategies:
Extension Not Loading:
- Verify each path in EXTENSION_PATHS points to a valid unpacked extension directory containing a manifest.json file.
- Check the chromium_profile_<context_id>/Extensions directory to ensure extensions were copied correctly.
- In launch_persistent_context mode:
- Open the browser (since headless=False) and navigate to chrome://extensions/ to confirm extensions are loaded.
- Ensure extensions are compatible with Chromium and have a valid manifest.
- In browser.launch mode:
- Extensions may not load dynamically due to Playwright's limitations.
- To enable extensions, pre-install them in a Chromium browser and copy the profile to chromium_profile_<context_id>.
- Alternatively, automate extension installation by navigating to chrome://extensions/ and enabling developer mode (not implemented in this script).
- If extensions fail to load, check console logs for errors related to install_extension().
Session Data Issues:
- Ensure browser_state_<context_id>.json files in the browser_states directory contain valid JSON.
- Open these files to verify cookies and local storage data are correctly formatted.
- Check console output for messages like "Loaded browser state..." or "Saved browser state..." to confirm state handling.
- If session data does not persist, verify write permissions for the browser_states directory.
- Test by logging into a website in one run and checking if the login persists in the next run.
Backup Errors:
- The copytree_ignore_errors function prevents errors related to SingletonLock, SingletonCookie, and SingletonSocket files by ignoring them during backups.
- If backup issues persist, check console logs for messages like "Skipped problematic file..." or "Non-critical errors during copy...".
- Ensure no other processes (e.g., lingering Chromium instances) are accessing chromium_profile_<context_id> directories.
- Verify write permissions for the chromium_profiles directory.
- If backups fail, manually inspect chromium_profile_backup_<context_id> to ensure critical files (e.g., extensions, cache) were copied.
Indentation Errors:
- The code uses consistent 4-space indentation per PEP 8. If syntax errors occur, ensure no tabs are mixed with spaces.
- Copy the code directly to avoid indentation issues introduced by text editors or copy-paste operations.
- Use an editor with Python linting (e.g., VS Code with Pylance) to detect indentation problems.
Resource Usage:
- In launch_persistent_context mode, each context runs a separate Chromium instance, increasing memory and CPU usage. Monitor system resources if using many contexts.
- In browser.launch mode, a single browser instance is used, but extension/cache persistence is limited.
- If performance is an issue, reduce num_contexts or switch to browser.launch mode.
Other Errors:
- If you encounter runtime errors (e.g., network issues, Playwright exceptions), check the full traceback printed by the script.
- Share the error message and traceback for specific debugging assistance.
- Common issues include:
- Network errors: Ensure internet connectivity and that https://example.com is accessible.
- Playwright errors: Verify Playwright and Chromium are installed correctly.
- File permission errors: Run the script with appropriate permissions (e.g., as administrator on Windows if needed).
- To isolate issues, try running with num_contexts=1 and a single extension.
Logging:
- The script includes verbose logging for key actions (e.g., directory creation, extension copying, state loading/saving).
- Review console output to trace execution and identify where errors occur.
- Add custom print statements in functions like new_context() or install_extension() for deeper debugging.
Testing:
- Test with a small number of contexts (e.g., num_contexts=1) to verify basic functionality.
- Test each mode separately by commenting out one call in main().
- Test with and without extensions to isolate extension-related issues.
- Run on different platforms (Linux, macOS, Windows) to confirm cross-platform compatibility.
"""
async with async_playwright() as playwright:
try:
# Test with launch_persistent_context mode
print("Running with launch_persistent_context...")
await run(playwright, num_contexts=3, use_persistent_context=True)
def __aenter__(self):
return self
# Test with browser.launch mode
print("\nRunning with browser.launch...")
await run(playwright, num_contexts=3, use_persistent_context=False)
except Exception as e:
print(f"An error occurred: {e}")
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
def main():
async def run():
async with async_playwright() as p:
browser = QtPlaywrightBrowser(p)
page = await browser.new_page("https://example.com")
page2 = await browser.new_page("https://www.google.com")
page3 = await browser.new_page("https://www.wikipedia.org")
await page.evaluate("window.open('https://www.bing.com')")
await asyncio.sleep(60) # Keep GUI open
await browser.close()
asyncio.run(run())
if __name__ == "__main__":
asyncio.run(main())
if __name__ == '__main__':
main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment