WIP - switch to python async mode, tweak eventlet

socketio-tweaks
dgtlmoon 2025-05-29 15:03:11 +02:00
rodzic b535339e94
commit e891c2da42
8 zmienionych plików z 303 dodań i 171 usunięć

Wyświetl plik

@ -11,6 +11,10 @@ import getopt
import platform
import signal
import eventlet
# Re-enable eventlet monkey patching now that Playwright is async
eventlet.monkey_patch()
import sys
from changedetectionio import store
from changedetectionio.flask_app import changedetection_app

Wyświetl plik

@ -25,35 +25,65 @@ io_interface_context = None
import json
import hashlib
from flask import Response
import asyncio
import threading
# Global event loop for browser steps
_browser_steps_loop = None
_loop_thread = None
def get_browser_steps_loop():
"""Get or create a dedicated event loop for browser steps operations"""
global _browser_steps_loop, _loop_thread
if _browser_steps_loop is None or _browser_steps_loop.is_closed():
def run_loop():
global _browser_steps_loop
_browser_steps_loop = asyncio.new_event_loop()
asyncio.set_event_loop(_browser_steps_loop)
_browser_steps_loop.run_forever()
_loop_thread = threading.Thread(target=run_loop, daemon=True)
_loop_thread.start()
# Wait for loop to be ready
import time
while _browser_steps_loop is None:
time.sleep(0.01)
return _browser_steps_loop
def run_async_in_browser_loop(coro):
"""Run async coroutine in the dedicated browser steps event loop"""
loop = get_browser_steps_loop()
future = asyncio.run_coroutine_threadsafe(coro, loop)
return future.result()
def construct_blueprint(datastore: ChangeDetectionStore):
browser_steps_blueprint = Blueprint('browser_steps', __name__, template_folder="templates")
def start_browsersteps_session(watch_uuid):
from . import nonContext
async def start_browsersteps_session(watch_uuid):
from . import browser_steps
import time
global io_interface_context
from playwright.async_api import async_playwright
# We keep the playwright session open for many minutes
keepalive_seconds = int(os.getenv('BROWSERSTEPS_MINUTES_KEEPALIVE', 10)) * 60
browsersteps_start_session = {'start_time': time.time()}
# You can only have one of these running
# This should be very fine to leave running for the life of the application
# @idea - Make it global so the pool of watch fetchers can use it also
if not io_interface_context:
io_interface_context = nonContext.c_sync_playwright()
# Start the Playwright context, which is actually a nodejs sub-process and communicates over STDIN/STDOUT pipes
io_interface_context = io_interface_context.start()
# Create a new async playwright instance for browser steps
playwright_instance = async_playwright()
playwright_context = await playwright_instance.start()
keepalive_ms = ((keepalive_seconds + 3) * 1000)
base_url = os.getenv('PLAYWRIGHT_DRIVER_URL', '').strip('"')
a = "?" if not '?' in base_url else '&'
base_url += a + f"timeout={keepalive_ms}"
browsersteps_start_session['browser'] = io_interface_context.chromium.connect_over_cdp(base_url)
browser = await playwright_context.chromium.connect_over_cdp(base_url, timeout=keepalive_ms)
browsersteps_start_session['browser'] = browser
browsersteps_start_session['playwright_context'] = playwright_context
proxy_id = datastore.get_preferred_proxy_for_watch(uuid=watch_uuid)
proxy = None
@ -75,15 +105,20 @@ def construct_blueprint(datastore: ChangeDetectionStore):
logger.debug(f"Browser Steps: UUID {watch_uuid} selected proxy {proxy_url}")
# Tell Playwright to connect to Chrome and setup a new session via our stepper interface
browsersteps_start_session['browserstepper'] = browser_steps.browsersteps_live_ui(
playwright_browser=browsersteps_start_session['browser'],
browserstepper = browser_steps.browsersteps_live_ui(
playwright_browser=browser,
proxy=proxy,
start_url=datastore.data['watching'][watch_uuid].link,
headers=datastore.data['watching'][watch_uuid].get('headers')
)
# Initialize the async connection
await browserstepper.connect(proxy=proxy)
browsersteps_start_session['browserstepper'] = browserstepper
# For test
#browsersteps_start_session['browserstepper'].action_goto_url(value="http://example.com?time="+str(time.time()))
#await browsersteps_start_session['browserstepper'].action_goto_url(value="http://example.com?time="+str(time.time()))
return browsersteps_start_session
@ -92,7 +127,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
@browser_steps_blueprint.route("/browsersteps_start_session", methods=['GET'])
def browsersteps_start_session():
# A new session was requested, return sessionID
import asyncio
import uuid
browsersteps_session_id = str(uuid.uuid4())
watch_uuid = request.args.get('uuid')
@ -104,7 +139,10 @@ def construct_blueprint(datastore: ChangeDetectionStore):
logger.debug("browser_steps.py connecting")
try:
browsersteps_sessions[browsersteps_session_id] = start_browsersteps_session(watch_uuid)
# Run the async function in the dedicated browser steps event loop
browsersteps_sessions[browsersteps_session_id] = run_async_in_browser_loop(
start_browsersteps_session(watch_uuid)
)
except Exception as e:
if 'ECONNREFUSED' in str(e):
return make_response('Unable to start the Playwright Browser session, is sockpuppetbrowser running? Network configuration is OK?', 401)
@ -169,9 +207,14 @@ def construct_blueprint(datastore: ChangeDetectionStore):
is_last_step = strtobool(request.form.get('is_last_step'))
try:
browsersteps_sessions[browsersteps_session_id]['browserstepper'].call_action(action_name=step_operation,
selector=step_selector,
optional_value=step_optional_value)
# Run the async call_action method in the dedicated browser steps event loop
run_async_in_browser_loop(
browsersteps_sessions[browsersteps_session_id]['browserstepper'].call_action(
action_name=step_operation,
selector=step_selector,
optional_value=step_optional_value
)
)
except Exception as e:
logger.error(f"Exception when calling step operation {step_operation} {str(e)}")
@ -185,7 +228,11 @@ def construct_blueprint(datastore: ChangeDetectionStore):
# Screenshots and other info only needed on requesting a step (POST)
try:
(screenshot, xpath_data) = browsersteps_sessions[browsersteps_session_id]['browserstepper'].get_current_state()
# Run the async get_current_state method in the dedicated browser steps event loop
(screenshot, xpath_data) = run_async_in_browser_loop(
browsersteps_sessions[browsersteps_session_id]['browserstepper'].get_current_state()
)
if is_last_step:
watch = datastore.data['watching'].get(uuid)
u = browsersteps_sessions[browsersteps_session_id]['browserstepper'].page.url

Wyświetl plik

@ -63,7 +63,7 @@ class steppable_browser_interface():
self.start_url = start_url
# Convert and perform "Click Button" for example
def call_action(self, action_name, selector=None, optional_value=None):
async def call_action(self, action_name, selector=None, optional_value=None):
if self.page is None:
logger.warning("Cannot call action on None page object")
return
@ -93,73 +93,74 @@ class steppable_browser_interface():
optional_value = jinja_render(template_str=optional_value)
action_handler(selector, optional_value)
await action_handler(selector, optional_value)
# Safely wait for timeout
self.page.wait_for_timeout(1.5 * 1000)
await self.page.wait_for_timeout(1.5 * 1000)
logger.debug(f"Call action done in {time.time()-now:.2f}s")
def action_goto_url(self, selector=None, value=None):
async def action_goto_url(self, selector=None, value=None):
if not value:
logger.warning("No URL provided for goto_url action")
return None
now = time.time()
response = self.page.goto(value, timeout=0, wait_until='load')
response = await self.page.goto(value, timeout=0, wait_until='load')
logger.debug(f"Time to goto URL {time.time()-now:.2f}s")
return response
# Incase they request to go back to the start
def action_goto_site(self, selector=None, value=None):
return self.action_goto_url(value=self.start_url)
async def action_goto_site(self, selector=None, value=None):
return await self.action_goto_url(value=self.start_url)
def action_click_element_containing_text(self, selector=None, value=''):
async def action_click_element_containing_text(self, selector=None, value=''):
logger.debug("Clicking element containing text")
if not value or not len(value.strip()):
return
elem = self.page.get_by_text(value)
if elem.count():
elem.first.click(delay=randint(200, 500), timeout=self.action_timeout)
if await elem.count():
await elem.first.click(delay=randint(200, 500), timeout=self.action_timeout)
def action_click_element_containing_text_if_exists(self, selector=None, value=''):
async def action_click_element_containing_text_if_exists(self, selector=None, value=''):
logger.debug("Clicking element containing text if exists")
if not value or not len(value.strip()):
return
elem = self.page.get_by_text(value)
logger.debug(f"Clicking element containing text - {elem.count()} elements found")
if elem.count():
elem.first.click(delay=randint(200, 500), timeout=self.action_timeout)
count = await elem.count()
logger.debug(f"Clicking element containing text - {count} elements found")
if count:
await elem.first.click(delay=randint(200, 500), timeout=self.action_timeout)
def action_enter_text_in_field(self, selector, value):
async def action_enter_text_in_field(self, selector, value):
if not selector or not len(selector.strip()):
return
self.page.fill(selector, value, timeout=self.action_timeout)
await self.page.fill(selector, value, timeout=self.action_timeout)
def action_execute_js(self, selector, value):
async def action_execute_js(self, selector, value):
if not value:
return None
return self.page.evaluate(value)
return await self.page.evaluate(value)
def action_click_element(self, selector, value):
async def action_click_element(self, selector, value):
logger.debug("Clicking element")
if not selector or not len(selector.strip()):
return
self.page.click(selector=selector, timeout=self.action_timeout + 20 * 1000, delay=randint(200, 500))
await self.page.click(selector=selector, timeout=self.action_timeout + 20 * 1000, delay=randint(200, 500))
def action_click_element_if_exists(self, selector, value):
async def action_click_element_if_exists(self, selector, value):
import playwright._impl._errors as _api_types
logger.debug("Clicking element if exists")
if not selector or not len(selector.strip()):
return
try:
self.page.click(selector, timeout=self.action_timeout, delay=randint(200, 500))
await self.page.click(selector, timeout=self.action_timeout, delay=randint(200, 500))
except _api_types.TimeoutError:
return
except _api_types.Error:
@ -167,7 +168,7 @@ class steppable_browser_interface():
return
def action_click_x_y(self, selector, value):
async def action_click_x_y(self, selector, value):
if not value or not re.match(r'^\s?\d+\s?,\s?\d+\s?$', value):
logger.warning("'Click X,Y' step should be in the format of '100 , 90'")
return
@ -177,42 +178,42 @@ class steppable_browser_interface():
x = int(float(x.strip()))
y = int(float(y.strip()))
self.page.mouse.click(x=x, y=y, delay=randint(200, 500))
await self.page.mouse.click(x=x, y=y, delay=randint(200, 500))
except Exception as e:
logger.error(f"Error parsing x,y coordinates: {str(e)}")
def action__select_by_option_text(self, selector, value):
async def action__select_by_option_text(self, selector, value):
if not selector or not len(selector.strip()):
return
self.page.select_option(selector, label=value, timeout=self.action_timeout)
await self.page.select_option(selector, label=value, timeout=self.action_timeout)
def action_scroll_down(self, selector, value):
async def action_scroll_down(self, selector, value):
# Some sites this doesnt work on for some reason
self.page.mouse.wheel(0, 600)
self.page.wait_for_timeout(1000)
await self.page.mouse.wheel(0, 600)
await self.page.wait_for_timeout(1000)
def action_wait_for_seconds(self, selector, value):
async def action_wait_for_seconds(self, selector, value):
try:
seconds = float(value.strip()) if value else 1.0
self.page.wait_for_timeout(seconds * 1000)
await self.page.wait_for_timeout(seconds * 1000)
except (ValueError, TypeError) as e:
logger.error(f"Invalid value for wait_for_seconds: {str(e)}")
def action_wait_for_text(self, selector, value):
async def action_wait_for_text(self, selector, value):
if not value:
return
import json
v = json.dumps(value)
self.page.wait_for_function(
await self.page.wait_for_function(
f'document.querySelector("body").innerText.includes({v});',
timeout=30000
)
def action_wait_for_text_in_element(self, selector, value):
async def action_wait_for_text_in_element(self, selector, value):
if not selector or not value:
return
@ -220,49 +221,49 @@ class steppable_browser_interface():
s = json.dumps(selector)
v = json.dumps(value)
self.page.wait_for_function(
await self.page.wait_for_function(
f'document.querySelector({s}).innerText.includes({v});',
timeout=30000
)
# @todo - in the future make some popout interface to capture what needs to be set
# https://playwright.dev/python/docs/api/class-keyboard
def action_press_enter(self, selector, value):
self.page.keyboard.press("Enter", delay=randint(200, 500))
async def action_press_enter(self, selector, value):
await self.page.keyboard.press("Enter", delay=randint(200, 500))
def action_press_page_up(self, selector, value):
self.page.keyboard.press("PageUp", delay=randint(200, 500))
async def action_press_page_up(self, selector, value):
await self.page.keyboard.press("PageUp", delay=randint(200, 500))
def action_press_page_down(self, selector, value):
self.page.keyboard.press("PageDown", delay=randint(200, 500))
async def action_press_page_down(self, selector, value):
await self.page.keyboard.press("PageDown", delay=randint(200, 500))
def action_check_checkbox(self, selector, value):
async def action_check_checkbox(self, selector, value):
if not selector:
return
self.page.locator(selector).check(timeout=self.action_timeout)
await self.page.locator(selector).check(timeout=self.action_timeout)
def action_uncheck_checkbox(self, selector, value):
async def action_uncheck_checkbox(self, selector, value):
if not selector:
return
self.page.locator(selector).uncheck(timeout=self.action_timeout)
await self.page.locator(selector).uncheck(timeout=self.action_timeout)
def action_remove_elements(self, selector, value):
async def action_remove_elements(self, selector, value):
"""Removes all elements matching the given selector from the DOM."""
if not selector:
return
self.page.locator(selector).evaluate_all("els => els.forEach(el => el.remove())")
await self.page.locator(selector).evaluate_all("els => els.forEach(el => el.remove())")
def action_make_all_child_elements_visible(self, selector, value):
async def action_make_all_child_elements_visible(self, selector, value):
"""Recursively makes all child elements inside the given selector fully visible."""
if not selector:
return
self.page.locator(selector).locator("*").evaluate_all("""
await self.page.locator(selector).locator("*").evaluate_all("""
els => els.forEach(el => {
el.style.display = 'block'; // Forces it to be displayed
el.style.visibility = 'visible'; // Ensures it's not hidden
@ -307,21 +308,22 @@ class browsersteps_live_ui(steppable_browser_interface):
self.playwright_browser = playwright_browser
self.start_url = start_url
self._is_cleaned_up = False
if self.context is None:
self.connect(proxy=proxy)
self.proxy = proxy
# Note: connect() is now async and must be called separately
def __del__(self):
# Ensure cleanup happens if object is garbage collected
self.cleanup()
# Note: cleanup is now async, so we can only mark as cleaned up here
self._is_cleaned_up = True
# Connect and setup a new context
def connect(self, proxy=None):
async def connect(self, proxy=None):
# Should only get called once - test that
keep_open = 1000 * 60 * 5
now = time.time()
# @todo handle multiple contexts, bind a unique id from the browser on each req?
self.context = self.playwright_browser.new_context(
self.context = await self.playwright_browser.new_context(
accept_downloads=False, # Should never be needed
bypass_csp=True, # This is needed to enable JavaScript execution on GitHub and others
extra_http_headers=self.headers,
@ -332,7 +334,7 @@ class browsersteps_live_ui(steppable_browser_interface):
user_agent=manage_user_agent(headers=self.headers),
)
self.page = self.context.new_page()
self.page = await self.context.new_page()
# self.page.set_default_navigation_timeout(keep_open)
self.page.set_default_timeout(keep_open)
@ -342,13 +344,15 @@ class browsersteps_live_ui(steppable_browser_interface):
self.page.on("console", lambda msg: print(f"Browser steps console - {msg.type}: {msg.text} {msg.args}"))
logger.debug(f"Time to browser setup {time.time()-now:.2f}s")
self.page.wait_for_timeout(1 * 1000)
await self.page.wait_for_timeout(1 * 1000)
def mark_as_closed(self):
logger.debug("Page closed, cleaning up..")
self.cleanup()
# Note: This is called from a sync context (event handler)
# so we'll just mark as cleaned up and let __del__ handle the rest
self._is_cleaned_up = True
def cleanup(self):
async def cleanup(self):
"""Properly clean up all resources to prevent memory leaks"""
if self._is_cleaned_up:
return
@ -359,7 +363,7 @@ class browsersteps_live_ui(steppable_browser_interface):
if hasattr(self, 'page') and self.page is not None:
try:
# Force garbage collection before closing
self.page.request_gc()
await self.page.request_gc()
except Exception as e:
logger.debug(f"Error during page garbage collection: {str(e)}")
@ -370,7 +374,7 @@ class browsersteps_live_ui(steppable_browser_interface):
logger.debug(f"Error removing event listeners: {str(e)}")
try:
self.page.close()
await self.page.close()
except Exception as e:
logger.debug(f"Error closing page: {str(e)}")
@ -379,7 +383,7 @@ class browsersteps_live_ui(steppable_browser_interface):
# Clean up context
if hasattr(self, 'context') and self.context is not None:
try:
self.context.close()
await self.context.close()
except Exception as e:
logger.debug(f"Error closing context: {str(e)}")
@ -401,12 +405,12 @@ class browsersteps_live_ui(steppable_browser_interface):
return False
def get_current_state(self):
async def get_current_state(self):
"""Return the screenshot and interactive elements mapping, generally always called after action_()"""
import importlib.resources
import json
# because we for now only run browser steps in playwright mode (not puppeteer mode)
from changedetectionio.content_fetchers.playwright import capture_full_page
from changedetectionio.content_fetchers.playwright import capture_full_page_async
# Safety check - don't proceed if resources are cleaned up
if self._is_cleaned_up or self.page is None:
@ -416,29 +420,29 @@ class browsersteps_live_ui(steppable_browser_interface):
xpath_element_js = importlib.resources.files("changedetectionio.content_fetchers.res").joinpath('xpath_element_scraper.js').read_text()
now = time.time()
self.page.wait_for_timeout(1 * 1000)
await self.page.wait_for_timeout(1 * 1000)
screenshot = None
xpath_data = None
try:
# Get screenshot first
screenshot = capture_full_page(page=self.page)
screenshot = await capture_full_page_async(page=self.page)
logger.debug(f"Time to get screenshot from browser {time.time() - now:.2f}s")
# Then get interactive elements
now = time.time()
self.page.evaluate("var include_filters=''")
self.page.request_gc()
await self.page.evaluate("var include_filters=''")
await self.page.request_gc()
scan_elements = 'a,button,input,select,textarea,i,th,td,p,li,h1,h2,h3,h4,div,span'
MAX_TOTAL_HEIGHT = int(os.getenv("SCREENSHOT_MAX_HEIGHT", SCREENSHOT_MAX_HEIGHT_DEFAULT))
xpath_data = json.loads(self.page.evaluate(xpath_element_js, {
xpath_data = json.loads(await self.page.evaluate(xpath_element_js, {
"visualselector_xpath_selectors": scan_elements,
"max_height": MAX_TOTAL_HEIGHT
}))
self.page.request_gc()
await self.page.request_gc()
# Sort elements by size
xpath_data['size_pos'] = sorted(xpath_data['size_pos'], key=lambda k: k['width'] * k['height'], reverse=True)
@ -448,13 +452,13 @@ class browsersteps_live_ui(steppable_browser_interface):
logger.error(f"Error getting current state: {str(e)}")
# Attempt recovery - force garbage collection
try:
self.page.request_gc()
await self.page.request_gc()
except:
pass
# Request garbage collection one final time
try:
self.page.request_gc()
await self.page.request_gc()
except:
pass

Wyświetl plik

@ -1,17 +0,0 @@
from playwright.sync_api import PlaywrightContextManager
# So playwright wants to run as a context manager, but we do something horrible and hacky
# we are holding the session open for as long as possible, then shutting it down, and opening a new one
# So it means we don't get to use PlaywrightContextManager' __enter__ __exit__
# To work around this, make goodbye() act the same as the __exit__()
#
# But actually I think this is because the context is opened correctly with __enter__() but we timeout the connection
# then theres some lock condition where we cant destroy it without it hanging
class c_PlaywrightContextManager(PlaywrightContextManager):
def goodbye(self) -> None:
self.__exit__()
def c_sync_playwright() -> PlaywrightContextManager:
return c_PlaywrightContextManager()

Wyświetl plik

@ -122,7 +122,7 @@ class Fetcher():
return None
def iterate_browser_steps(self, start_url=None):
async def iterate_browser_steps(self, start_url=None):
from changedetectionio.blueprint.browser_steps.browser_steps import steppable_browser_interface
from playwright._impl._errors import TimeoutError, Error
from changedetectionio.safe_jinja import render as jinja_render
@ -136,8 +136,8 @@ class Fetcher():
for step in valid_steps:
step_n += 1
logger.debug(f">> Iterating check - browser Step n {step_n} - {step['operation']}...")
self.screenshot_step("before-" + str(step_n))
self.save_step_html("before-" + str(step_n))
await self.screenshot_step("before-" + str(step_n))
await self.save_step_html("before-" + str(step_n))
try:
optional_value = step['optional_value']
@ -148,11 +148,11 @@ class Fetcher():
if '{%' in step['selector'] or '{{' in step['selector']:
selector = jinja_render(template_str=step['selector'])
getattr(interface, "call_action")(action_name=step['operation'],
await getattr(interface, "call_action")(action_name=step['operation'],
selector=selector,
optional_value=optional_value)
self.screenshot_step(step_n)
self.save_step_html(step_n)
await self.screenshot_step(step_n)
await self.save_step_html(step_n)
except (Error, TimeoutError) as e:
logger.debug(str(e))
# Stop processing here

Wyświetl plik

@ -9,6 +9,70 @@ from changedetectionio.content_fetchers import SCREENSHOT_MAX_HEIGHT_DEFAULT, vi
from changedetectionio.content_fetchers.base import Fetcher, manage_user_agent
from changedetectionio.content_fetchers.exceptions import PageUnloadable, Non200ErrorCodeReceived, EmptyReply, ScreenshotUnavailable
async def capture_full_page_async(page):
import os
import time
from multiprocessing import Process, Pipe
start = time.time()
page_height = await page.evaluate("document.documentElement.scrollHeight")
page_width = await page.evaluate("document.documentElement.scrollWidth")
original_viewport = page.viewport_size
logger.debug(f"Playwright viewport size {page.viewport_size} page height {page_height} page width {page_width}")
# Use an approach similar to puppeteer: set a larger viewport and take screenshots in chunks
step_size = SCREENSHOT_SIZE_STITCH_THRESHOLD # Size that won't cause GPU to overflow
screenshot_chunks = []
y = 0
if page_height > page.viewport_size['height']:
if page_height < step_size:
step_size = page_height # Incase page is bigger than default viewport but smaller than proposed step size
logger.debug(f"Setting bigger viewport to step through large page width W{page.viewport_size['width']}xH{step_size} because page_height > viewport_size")
# Set viewport to a larger size to capture more content at once
await page.set_viewport_size({'width': page.viewport_size['width'], 'height': step_size})
# Capture screenshots in chunks up to the max total height
while y < min(page_height, SCREENSHOT_MAX_TOTAL_HEIGHT):
await page.request_gc()
await page.evaluate(f"window.scrollTo(0, {y})")
await page.request_gc()
screenshot_chunks.append(await page.screenshot(
type="jpeg",
full_page=False,
quality=int(os.getenv("SCREENSHOT_QUALITY", 72))
))
y += step_size
await page.request_gc()
# Restore original viewport size
await page.set_viewport_size({'width': original_viewport['width'], 'height': original_viewport['height']})
# If we have multiple chunks, stitch them together
if len(screenshot_chunks) > 1:
from changedetectionio.content_fetchers.screenshot_handler import stitch_images_worker
logger.debug(f"Screenshot stitching {len(screenshot_chunks)} chunks together")
parent_conn, child_conn = Pipe()
p = Process(target=stitch_images_worker, args=(child_conn, screenshot_chunks, page_height, SCREENSHOT_MAX_TOTAL_HEIGHT))
p.start()
screenshot = parent_conn.recv_bytes()
p.join()
logger.debug(
f"Screenshot (chunked/stitched) - Page height: {page_height} Capture height: {SCREENSHOT_MAX_TOTAL_HEIGHT} - Stitched together in {time.time() - start:.2f}s")
# Explicit cleanup
del screenshot_chunks
del p
del parent_conn, child_conn
screenshot_chunks = None
return screenshot
logger.debug(
f"Screenshot Page height: {page_height} Capture height: {SCREENSHOT_MAX_TOTAL_HEIGHT} - Stitched together in {time.time() - start:.2f}s")
return screenshot_chunks[0]
def capture_full_page(page):
import os
import time
@ -124,9 +188,9 @@ class fetcher(Fetcher):
self.proxy['username'] = parsed.username
self.proxy['password'] = parsed.password
def screenshot_step(self, step_n=''):
async def screenshot_step(self, step_n=''):
super().screenshot_step(step_n=step_n)
screenshot = capture_full_page(page=self.page)
screenshot = await capture_full_page_async(page=self.page)
if self.browser_steps_screenshot_path is not None:
@ -135,15 +199,15 @@ class fetcher(Fetcher):
with open(destination, 'wb') as f:
f.write(screenshot)
def save_step_html(self, step_n):
async def save_step_html(self, step_n):
super().save_step_html(step_n=step_n)
content = self.page.content()
content = await self.page.content()
destination = os.path.join(self.browser_steps_screenshot_path, 'step_{}.html'.format(step_n))
logger.debug(f"Saving step HTML to {destination}")
with open(destination, 'w') as f:
f.write(content)
def run(self,
async def run(self,
url,
timeout,
request_headers,
@ -154,26 +218,26 @@ class fetcher(Fetcher):
is_binary=False,
empty_pages_are_a_change=False):
from playwright.sync_api import sync_playwright
from playwright.async_api import async_playwright
import playwright._impl._errors
import time
self.delete_browser_steps_screenshots()
response = None
with sync_playwright() as p:
async with async_playwright() as p:
browser_type = getattr(p, self.browser_type)
# Seemed to cause a connection Exception even tho I can see it connect
# self.browser = browser_type.connect(self.command_executor, timeout=timeout*1000)
# 60,000 connection timeout only
browser = browser_type.connect_over_cdp(self.browser_connection_url, timeout=60000)
browser = await browser_type.connect_over_cdp(self.browser_connection_url, timeout=60000)
# SOCKS5 with authentication is not supported (yet)
# https://github.com/microsoft/playwright/issues/10567
# Set user agent to prevent Cloudflare from blocking the browser
# Use the default one configured in the App.py model that's passed from fetch_site_status.py
context = browser.new_context(
context = await browser.new_context(
accept_downloads=False, # Should never be needed
bypass_csp=True, # This is needed to enable JavaScript execution on GitHub and others
extra_http_headers=request_headers,
@ -183,7 +247,7 @@ class fetcher(Fetcher):
user_agent=manage_user_agent(headers=request_headers),
)
self.page = context.new_page()
self.page = await context.new_page()
# Listen for all console events and handle errors
self.page.on("console", lambda msg: logger.debug(f"Playwright console: Watch URL: {url} {msg.type}: {msg.text} {msg.args}"))
@ -193,32 +257,37 @@ class fetcher(Fetcher):
browsersteps_interface = steppable_browser_interface(start_url=url)
browsersteps_interface.page = self.page
response = browsersteps_interface.action_goto_url(value=url)
response = await browsersteps_interface.action_goto_url(value=url)
if response is None:
context.close()
browser.close()
await context.close()
await browser.close()
logger.debug("Content Fetcher > Response object from the browser communication was none")
raise EmptyReply(url=url, status_code=None)
self.headers = response.all_headers()
# In async_playwright, all_headers() returns a coroutine
try:
self.headers = await response.all_headers()
except TypeError:
# Fallback for sync version
self.headers = response.all_headers()
try:
if self.webdriver_js_execute_code is not None and len(self.webdriver_js_execute_code):
browsersteps_interface.action_execute_js(value=self.webdriver_js_execute_code, selector=None)
await browsersteps_interface.action_execute_js(value=self.webdriver_js_execute_code, selector=None)
except playwright._impl._errors.TimeoutError as e:
context.close()
browser.close()
await context.close()
await browser.close()
# This can be ok, we will try to grab what we could retrieve
pass
except Exception as e:
logger.debug(f"Content Fetcher > Other exception when executing custom JS code {str(e)}")
context.close()
browser.close()
await context.close()
await browser.close()
raise PageUnloadable(url=url, status_code=None, message=str(e))
extra_wait = int(os.getenv("WEBDRIVER_DELAY_BEFORE_CONTENT_READY", 5)) + self.render_extract_delay
self.page.wait_for_timeout(extra_wait * 1000)
await self.page.wait_for_timeout(extra_wait * 1000)
try:
self.status_code = response.status
@ -226,48 +295,48 @@ class fetcher(Fetcher):
# https://github.com/dgtlmoon/changedetection.io/discussions/2122#discussioncomment-8241962
logger.critical(f"Response from the browser/Playwright did not have a status_code! Response follows.")
logger.critical(response)
context.close()
browser.close()
await context.close()
await browser.close()
raise PageUnloadable(url=url, status_code=None, message=str(e))
if self.status_code != 200 and not ignore_status_codes:
screenshot = capture_full_page(self.page)
screenshot = await capture_full_page_async(self.page)
raise Non200ErrorCodeReceived(url=url, status_code=self.status_code, screenshot=screenshot)
if not empty_pages_are_a_change and len(self.page.content().strip()) == 0:
if not empty_pages_are_a_change and len((await self.page.content()).strip()) == 0:
logger.debug("Content Fetcher > Content was empty, empty_pages_are_a_change = False")
context.close()
browser.close()
await context.close()
await browser.close()
raise EmptyReply(url=url, status_code=response.status)
# Run Browser Steps here
if self.browser_steps_get_valid_steps():
self.iterate_browser_steps(start_url=url)
await self.iterate_browser_steps(start_url=url)
self.page.wait_for_timeout(extra_wait * 1000)
await self.page.wait_for_timeout(extra_wait * 1000)
now = time.time()
# So we can find an element on the page where its selector was entered manually (maybe not xPath etc)
if current_include_filters is not None:
self.page.evaluate("var include_filters={}".format(json.dumps(current_include_filters)))
await self.page.evaluate("var include_filters={}".format(json.dumps(current_include_filters)))
else:
self.page.evaluate("var include_filters=''")
self.page.request_gc()
await self.page.evaluate("var include_filters=''")
await self.page.request_gc()
# request_gc before and after evaluate to free up memory
# @todo browsersteps etc
MAX_TOTAL_HEIGHT = int(os.getenv("SCREENSHOT_MAX_HEIGHT", SCREENSHOT_MAX_HEIGHT_DEFAULT))
self.xpath_data = self.page.evaluate(XPATH_ELEMENT_JS, {
self.xpath_data = await self.page.evaluate(XPATH_ELEMENT_JS, {
"visualselector_xpath_selectors": visualselector_xpath_selectors,
"max_height": MAX_TOTAL_HEIGHT
})
self.page.request_gc()
await self.page.request_gc()
self.instock_data = self.page.evaluate(INSTOCK_DATA_JS)
self.page.request_gc()
self.instock_data = await self.page.evaluate(INSTOCK_DATA_JS)
await self.page.request_gc()
self.content = self.page.content()
self.page.request_gc()
self.content = await self.page.content()
await self.page.request_gc()
logger.debug(f"Scrape xPath element data in browser done in {time.time() - now:.2f}s")
# Bug 3 in Playwright screenshot handling
@ -279,7 +348,7 @@ class fetcher(Fetcher):
# acceptable screenshot quality here
try:
# The actual screenshot - this always base64 and needs decoding! horrible! huge CPU usage
self.screenshot = capture_full_page(page=self.page)
self.screenshot = await capture_full_page_async(page=self.page)
except Exception as e:
# It's likely the screenshot was too long/big and something crashed
@ -287,30 +356,30 @@ class fetcher(Fetcher):
finally:
# Request garbage collection one more time before closing
try:
self.page.request_gc()
await self.page.request_gc()
except:
pass
# Clean up resources properly
try:
self.page.request_gc()
await self.page.request_gc()
except:
pass
try:
self.page.close()
await self.page.close()
except:
pass
self.page = None
try:
context.close()
await context.close()
except:
pass
context = None
try:
browser.close()
await browser.close()
except:
pass
browser = None

Wyświetl plik

@ -147,16 +147,42 @@ class difference_detection_processor():
# And here we go! call the right browser with browser-specific settings
empty_pages_are_a_change = self.datastore.data['settings']['application'].get('empty_pages_are_a_change', False)
self.fetcher.run(url=url,
timeout=timeout,
request_headers=request_headers,
request_body=request_body,
request_method=request_method,
ignore_status_codes=ignore_status_codes,
current_include_filters=self.watch.get('include_filters'),
is_binary=is_binary,
empty_pages_are_a_change=empty_pages_are_a_change
)
# Check if the fetcher run method is async (for playwright)
import asyncio
import inspect
run_method = getattr(self.fetcher, 'run')
if inspect.iscoroutinefunction(run_method):
# Use asyncio to run the async method
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(
self.fetcher.run(url=url,
timeout=timeout,
request_headers=request_headers,
request_body=request_body,
request_method=request_method,
ignore_status_codes=ignore_status_codes,
current_include_filters=self.watch.get('include_filters'),
is_binary=is_binary,
empty_pages_are_a_change=empty_pages_are_a_change
)
)
finally:
loop.close()
else:
# Synchronous fetcher (requests, etc.)
self.fetcher.run(url=url,
timeout=timeout,
request_headers=request_headers,
request_body=request_body,
request_method=request_method,
ignore_status_codes=ignore_status_codes,
current_include_filters=self.watch.get('include_filters'),
is_binary=is_binary,
empty_pages_are_a_change=empty_pages_are_a_change
)
#@todo .quit here could go on close object, so we can run JS if change-detected
self.fetcher.quit(watch=self.watch)

Wyświetl plik

@ -27,7 +27,6 @@ class SignalHandler:
# Create and start the queue update thread using eventlet
import eventlet
# logger.info("Using eventlet for polling thread")
self.polling_emitter_thread = eventlet.spawn(self.polling_emit_running_or_queued_watches)
# Store the thread reference in socketio for clean shutdown
@ -177,7 +176,7 @@ def handle_watch_update(socketio, **kwargs):
socketio.emit("watch_update", {'watch': watch_data, 'general_stats': general_stats})
# Log after successful emit
#logger.info(f"Socket.IO: Emitted update for watch {watch.get('uuid')}, Checking now: {watch_data['checking_now']}")
logger.info(f"Socket.IO: Emitted update for watch {watch.get('uuid')}, Checking now: {watch_data['checking_now']}")
except Exception as e:
logger.error(f"Socket.IO error in handle_watch_update: {str(e)}")
@ -185,8 +184,8 @@ def handle_watch_update(socketio, **kwargs):
def init_socketio(app, datastore):
"""Initialize SocketIO with the main Flask app"""
# Use eventlet async_mode to match the eventlet server
# This is required since the main app uses eventlet.wsgi.server
# Use eventlet async_mode now that Playwright is async and compatible
# Eventlet mode works well with async_playwright
async_mode = 'eventlet'
logger.info(f"Using {async_mode} mode for Socket.IO")