From e891c2da42422759dc78fbc233531ae4e01de7a1 Mon Sep 17 00:00:00 2001 From: dgtlmoon Date: Thu, 29 May 2025 15:03:11 +0200 Subject: [PATCH] WIP - switch to python async mode, tweak eventlet --- changedetectionio/__init__.py | 4 + .../blueprint/browser_steps/__init__.py | 87 +++++++--- .../blueprint/browser_steps/browser_steps.py | 146 +++++++++-------- .../blueprint/browser_steps/nonContext.py | 17 -- changedetectionio/content_fetchers/base.py | 12 +- .../content_fetchers/playwright.py | 155 +++++++++++++----- changedetectionio/processors/__init__.py | 46 ++++-- changedetectionio/realtime/socket_server.py | 7 +- 8 files changed, 303 insertions(+), 171 deletions(-) delete mode 100644 changedetectionio/blueprint/browser_steps/nonContext.py diff --git a/changedetectionio/__init__.py b/changedetectionio/__init__.py index a665a5d5..fcdfaf22 100644 --- a/changedetectionio/__init__.py +++ b/changedetectionio/__init__.py @@ -11,6 +11,10 @@ import getopt import platform import signal +import eventlet +# Re-enable eventlet monkey patching now that Playwright is async +eventlet.monkey_patch() + import sys from changedetectionio import store from changedetectionio.flask_app import changedetection_app diff --git a/changedetectionio/blueprint/browser_steps/__init__.py b/changedetectionio/blueprint/browser_steps/__init__.py index f7907c7c..3942393c 100644 --- a/changedetectionio/blueprint/browser_steps/__init__.py +++ b/changedetectionio/blueprint/browser_steps/__init__.py @@ -25,35 +25,65 @@ io_interface_context = None import json import hashlib from flask import Response +import asyncio +import threading + +# Global event loop for browser steps +_browser_steps_loop = None +_loop_thread = None + +def get_browser_steps_loop(): + """Get or create a dedicated event loop for browser steps operations""" + global _browser_steps_loop, _loop_thread + + if _browser_steps_loop is None or _browser_steps_loop.is_closed(): + def run_loop(): + global _browser_steps_loop + _browser_steps_loop = asyncio.new_event_loop() + asyncio.set_event_loop(_browser_steps_loop) + _browser_steps_loop.run_forever() + + _loop_thread = threading.Thread(target=run_loop, daemon=True) + _loop_thread.start() + + # Wait for loop to be ready + import time + while _browser_steps_loop is None: + time.sleep(0.01) + + return _browser_steps_loop + +def run_async_in_browser_loop(coro): + """Run async coroutine in the dedicated browser steps event loop""" + loop = get_browser_steps_loop() + future = asyncio.run_coroutine_threadsafe(coro, loop) + return future.result() def construct_blueprint(datastore: ChangeDetectionStore): browser_steps_blueprint = Blueprint('browser_steps', __name__, template_folder="templates") - def start_browsersteps_session(watch_uuid): - from . import nonContext + async def start_browsersteps_session(watch_uuid): from . import browser_steps import time - global io_interface_context + from playwright.async_api import async_playwright # We keep the playwright session open for many minutes keepalive_seconds = int(os.getenv('BROWSERSTEPS_MINUTES_KEEPALIVE', 10)) * 60 browsersteps_start_session = {'start_time': time.time()} - # You can only have one of these running - # This should be very fine to leave running for the life of the application - # @idea - Make it global so the pool of watch fetchers can use it also - if not io_interface_context: - io_interface_context = nonContext.c_sync_playwright() - # Start the Playwright context, which is actually a nodejs sub-process and communicates over STDIN/STDOUT pipes - io_interface_context = io_interface_context.start() + # Create a new async playwright instance for browser steps + playwright_instance = async_playwright() + playwright_context = await playwright_instance.start() keepalive_ms = ((keepalive_seconds + 3) * 1000) base_url = os.getenv('PLAYWRIGHT_DRIVER_URL', '').strip('"') a = "?" if not '?' in base_url else '&' base_url += a + f"timeout={keepalive_ms}" - browsersteps_start_session['browser'] = io_interface_context.chromium.connect_over_cdp(base_url) + browser = await playwright_context.chromium.connect_over_cdp(base_url, timeout=keepalive_ms) + browsersteps_start_session['browser'] = browser + browsersteps_start_session['playwright_context'] = playwright_context proxy_id = datastore.get_preferred_proxy_for_watch(uuid=watch_uuid) proxy = None @@ -75,15 +105,20 @@ def construct_blueprint(datastore: ChangeDetectionStore): logger.debug(f"Browser Steps: UUID {watch_uuid} selected proxy {proxy_url}") # Tell Playwright to connect to Chrome and setup a new session via our stepper interface - browsersteps_start_session['browserstepper'] = browser_steps.browsersteps_live_ui( - playwright_browser=browsersteps_start_session['browser'], + browserstepper = browser_steps.browsersteps_live_ui( + playwright_browser=browser, proxy=proxy, start_url=datastore.data['watching'][watch_uuid].link, headers=datastore.data['watching'][watch_uuid].get('headers') ) + + # Initialize the async connection + await browserstepper.connect(proxy=proxy) + + browsersteps_start_session['browserstepper'] = browserstepper # For test - #browsersteps_start_session['browserstepper'].action_goto_url(value="http://example.com?time="+str(time.time())) + #await browsersteps_start_session['browserstepper'].action_goto_url(value="http://example.com?time="+str(time.time())) return browsersteps_start_session @@ -92,7 +127,7 @@ def construct_blueprint(datastore: ChangeDetectionStore): @browser_steps_blueprint.route("/browsersteps_start_session", methods=['GET']) def browsersteps_start_session(): # A new session was requested, return sessionID - + import asyncio import uuid browsersteps_session_id = str(uuid.uuid4()) watch_uuid = request.args.get('uuid') @@ -104,7 +139,10 @@ def construct_blueprint(datastore: ChangeDetectionStore): logger.debug("browser_steps.py connecting") try: - browsersteps_sessions[browsersteps_session_id] = start_browsersteps_session(watch_uuid) + # Run the async function in the dedicated browser steps event loop + browsersteps_sessions[browsersteps_session_id] = run_async_in_browser_loop( + start_browsersteps_session(watch_uuid) + ) except Exception as e: if 'ECONNREFUSED' in str(e): return make_response('Unable to start the Playwright Browser session, is sockpuppetbrowser running? Network configuration is OK?', 401) @@ -169,9 +207,14 @@ def construct_blueprint(datastore: ChangeDetectionStore): is_last_step = strtobool(request.form.get('is_last_step')) try: - browsersteps_sessions[browsersteps_session_id]['browserstepper'].call_action(action_name=step_operation, - selector=step_selector, - optional_value=step_optional_value) + # Run the async call_action method in the dedicated browser steps event loop + run_async_in_browser_loop( + browsersteps_sessions[browsersteps_session_id]['browserstepper'].call_action( + action_name=step_operation, + selector=step_selector, + optional_value=step_optional_value + ) + ) except Exception as e: logger.error(f"Exception when calling step operation {step_operation} {str(e)}") @@ -185,7 +228,11 @@ def construct_blueprint(datastore: ChangeDetectionStore): # Screenshots and other info only needed on requesting a step (POST) try: - (screenshot, xpath_data) = browsersteps_sessions[browsersteps_session_id]['browserstepper'].get_current_state() + # Run the async get_current_state method in the dedicated browser steps event loop + (screenshot, xpath_data) = run_async_in_browser_loop( + browsersteps_sessions[browsersteps_session_id]['browserstepper'].get_current_state() + ) + if is_last_step: watch = datastore.data['watching'].get(uuid) u = browsersteps_sessions[browsersteps_session_id]['browserstepper'].page.url diff --git a/changedetectionio/blueprint/browser_steps/browser_steps.py b/changedetectionio/blueprint/browser_steps/browser_steps.py index d380d565..e0a3cb2c 100644 --- a/changedetectionio/blueprint/browser_steps/browser_steps.py +++ b/changedetectionio/blueprint/browser_steps/browser_steps.py @@ -63,7 +63,7 @@ class steppable_browser_interface(): self.start_url = start_url # Convert and perform "Click Button" for example - def call_action(self, action_name, selector=None, optional_value=None): + async def call_action(self, action_name, selector=None, optional_value=None): if self.page is None: logger.warning("Cannot call action on None page object") return @@ -93,73 +93,74 @@ class steppable_browser_interface(): optional_value = jinja_render(template_str=optional_value) - action_handler(selector, optional_value) + await action_handler(selector, optional_value) # Safely wait for timeout - self.page.wait_for_timeout(1.5 * 1000) + await self.page.wait_for_timeout(1.5 * 1000) logger.debug(f"Call action done in {time.time()-now:.2f}s") - def action_goto_url(self, selector=None, value=None): + async def action_goto_url(self, selector=None, value=None): if not value: logger.warning("No URL provided for goto_url action") return None now = time.time() - response = self.page.goto(value, timeout=0, wait_until='load') + response = await self.page.goto(value, timeout=0, wait_until='load') logger.debug(f"Time to goto URL {time.time()-now:.2f}s") return response # Incase they request to go back to the start - def action_goto_site(self, selector=None, value=None): - return self.action_goto_url(value=self.start_url) + async def action_goto_site(self, selector=None, value=None): + return await self.action_goto_url(value=self.start_url) - def action_click_element_containing_text(self, selector=None, value=''): + async def action_click_element_containing_text(self, selector=None, value=''): logger.debug("Clicking element containing text") if not value or not len(value.strip()): return elem = self.page.get_by_text(value) - if elem.count(): - elem.first.click(delay=randint(200, 500), timeout=self.action_timeout) + if await elem.count(): + await elem.first.click(delay=randint(200, 500), timeout=self.action_timeout) - def action_click_element_containing_text_if_exists(self, selector=None, value=''): + async def action_click_element_containing_text_if_exists(self, selector=None, value=''): logger.debug("Clicking element containing text if exists") if not value or not len(value.strip()): return elem = self.page.get_by_text(value) - logger.debug(f"Clicking element containing text - {elem.count()} elements found") - if elem.count(): - elem.first.click(delay=randint(200, 500), timeout=self.action_timeout) + count = await elem.count() + logger.debug(f"Clicking element containing text - {count} elements found") + if count: + await elem.first.click(delay=randint(200, 500), timeout=self.action_timeout) - def action_enter_text_in_field(self, selector, value): + async def action_enter_text_in_field(self, selector, value): if not selector or not len(selector.strip()): return - self.page.fill(selector, value, timeout=self.action_timeout) + await self.page.fill(selector, value, timeout=self.action_timeout) - def action_execute_js(self, selector, value): + async def action_execute_js(self, selector, value): if not value: return None - return self.page.evaluate(value) + return await self.page.evaluate(value) - def action_click_element(self, selector, value): + async def action_click_element(self, selector, value): logger.debug("Clicking element") if not selector or not len(selector.strip()): return - self.page.click(selector=selector, timeout=self.action_timeout + 20 * 1000, delay=randint(200, 500)) + await self.page.click(selector=selector, timeout=self.action_timeout + 20 * 1000, delay=randint(200, 500)) - def action_click_element_if_exists(self, selector, value): + async def action_click_element_if_exists(self, selector, value): import playwright._impl._errors as _api_types logger.debug("Clicking element if exists") if not selector or not len(selector.strip()): return try: - self.page.click(selector, timeout=self.action_timeout, delay=randint(200, 500)) + await self.page.click(selector, timeout=self.action_timeout, delay=randint(200, 500)) except _api_types.TimeoutError: return except _api_types.Error: @@ -167,7 +168,7 @@ class steppable_browser_interface(): return - def action_click_x_y(self, selector, value): + async def action_click_x_y(self, selector, value): if not value or not re.match(r'^\s?\d+\s?,\s?\d+\s?$', value): logger.warning("'Click X,Y' step should be in the format of '100 , 90'") return @@ -177,42 +178,42 @@ class steppable_browser_interface(): x = int(float(x.strip())) y = int(float(y.strip())) - self.page.mouse.click(x=x, y=y, delay=randint(200, 500)) + await self.page.mouse.click(x=x, y=y, delay=randint(200, 500)) except Exception as e: logger.error(f"Error parsing x,y coordinates: {str(e)}") - def action__select_by_option_text(self, selector, value): + async def action__select_by_option_text(self, selector, value): if not selector or not len(selector.strip()): return - self.page.select_option(selector, label=value, timeout=self.action_timeout) + await self.page.select_option(selector, label=value, timeout=self.action_timeout) - def action_scroll_down(self, selector, value): + async def action_scroll_down(self, selector, value): # Some sites this doesnt work on for some reason - self.page.mouse.wheel(0, 600) - self.page.wait_for_timeout(1000) + await self.page.mouse.wheel(0, 600) + await self.page.wait_for_timeout(1000) - def action_wait_for_seconds(self, selector, value): + async def action_wait_for_seconds(self, selector, value): try: seconds = float(value.strip()) if value else 1.0 - self.page.wait_for_timeout(seconds * 1000) + await self.page.wait_for_timeout(seconds * 1000) except (ValueError, TypeError) as e: logger.error(f"Invalid value for wait_for_seconds: {str(e)}") - def action_wait_for_text(self, selector, value): + async def action_wait_for_text(self, selector, value): if not value: return import json v = json.dumps(value) - self.page.wait_for_function( + await self.page.wait_for_function( f'document.querySelector("body").innerText.includes({v});', timeout=30000 ) - def action_wait_for_text_in_element(self, selector, value): + async def action_wait_for_text_in_element(self, selector, value): if not selector or not value: return @@ -220,49 +221,49 @@ class steppable_browser_interface(): s = json.dumps(selector) v = json.dumps(value) - self.page.wait_for_function( + await self.page.wait_for_function( f'document.querySelector({s}).innerText.includes({v});', timeout=30000 ) # @todo - in the future make some popout interface to capture what needs to be set # https://playwright.dev/python/docs/api/class-keyboard - def action_press_enter(self, selector, value): - self.page.keyboard.press("Enter", delay=randint(200, 500)) + async def action_press_enter(self, selector, value): + await self.page.keyboard.press("Enter", delay=randint(200, 500)) - def action_press_page_up(self, selector, value): - self.page.keyboard.press("PageUp", delay=randint(200, 500)) + async def action_press_page_up(self, selector, value): + await self.page.keyboard.press("PageUp", delay=randint(200, 500)) - def action_press_page_down(self, selector, value): - self.page.keyboard.press("PageDown", delay=randint(200, 500)) + async def action_press_page_down(self, selector, value): + await self.page.keyboard.press("PageDown", delay=randint(200, 500)) - def action_check_checkbox(self, selector, value): + async def action_check_checkbox(self, selector, value): if not selector: return - self.page.locator(selector).check(timeout=self.action_timeout) + await self.page.locator(selector).check(timeout=self.action_timeout) - def action_uncheck_checkbox(self, selector, value): + async def action_uncheck_checkbox(self, selector, value): if not selector: return - self.page.locator(selector).uncheck(timeout=self.action_timeout) + await self.page.locator(selector).uncheck(timeout=self.action_timeout) - def action_remove_elements(self, selector, value): + async def action_remove_elements(self, selector, value): """Removes all elements matching the given selector from the DOM.""" if not selector: return - self.page.locator(selector).evaluate_all("els => els.forEach(el => el.remove())") + await self.page.locator(selector).evaluate_all("els => els.forEach(el => el.remove())") - def action_make_all_child_elements_visible(self, selector, value): + async def action_make_all_child_elements_visible(self, selector, value): """Recursively makes all child elements inside the given selector fully visible.""" if not selector: return - self.page.locator(selector).locator("*").evaluate_all(""" + await self.page.locator(selector).locator("*").evaluate_all(""" els => els.forEach(el => { el.style.display = 'block'; // Forces it to be displayed el.style.visibility = 'visible'; // Ensures it's not hidden @@ -307,21 +308,22 @@ class browsersteps_live_ui(steppable_browser_interface): self.playwright_browser = playwright_browser self.start_url = start_url self._is_cleaned_up = False - if self.context is None: - self.connect(proxy=proxy) + self.proxy = proxy + # Note: connect() is now async and must be called separately def __del__(self): # Ensure cleanup happens if object is garbage collected - self.cleanup() + # Note: cleanup is now async, so we can only mark as cleaned up here + self._is_cleaned_up = True # Connect and setup a new context - def connect(self, proxy=None): + async def connect(self, proxy=None): # Should only get called once - test that keep_open = 1000 * 60 * 5 now = time.time() # @todo handle multiple contexts, bind a unique id from the browser on each req? - self.context = self.playwright_browser.new_context( + self.context = await self.playwright_browser.new_context( accept_downloads=False, # Should never be needed bypass_csp=True, # This is needed to enable JavaScript execution on GitHub and others extra_http_headers=self.headers, @@ -332,7 +334,7 @@ class browsersteps_live_ui(steppable_browser_interface): user_agent=manage_user_agent(headers=self.headers), ) - self.page = self.context.new_page() + self.page = await self.context.new_page() # self.page.set_default_navigation_timeout(keep_open) self.page.set_default_timeout(keep_open) @@ -342,13 +344,15 @@ class browsersteps_live_ui(steppable_browser_interface): self.page.on("console", lambda msg: print(f"Browser steps console - {msg.type}: {msg.text} {msg.args}")) logger.debug(f"Time to browser setup {time.time()-now:.2f}s") - self.page.wait_for_timeout(1 * 1000) + await self.page.wait_for_timeout(1 * 1000) def mark_as_closed(self): logger.debug("Page closed, cleaning up..") - self.cleanup() + # Note: This is called from a sync context (event handler) + # so we'll just mark as cleaned up and let __del__ handle the rest + self._is_cleaned_up = True - def cleanup(self): + async def cleanup(self): """Properly clean up all resources to prevent memory leaks""" if self._is_cleaned_up: return @@ -359,7 +363,7 @@ class browsersteps_live_ui(steppable_browser_interface): if hasattr(self, 'page') and self.page is not None: try: # Force garbage collection before closing - self.page.request_gc() + await self.page.request_gc() except Exception as e: logger.debug(f"Error during page garbage collection: {str(e)}") @@ -370,7 +374,7 @@ class browsersteps_live_ui(steppable_browser_interface): logger.debug(f"Error removing event listeners: {str(e)}") try: - self.page.close() + await self.page.close() except Exception as e: logger.debug(f"Error closing page: {str(e)}") @@ -379,7 +383,7 @@ class browsersteps_live_ui(steppable_browser_interface): # Clean up context if hasattr(self, 'context') and self.context is not None: try: - self.context.close() + await self.context.close() except Exception as e: logger.debug(f"Error closing context: {str(e)}") @@ -401,12 +405,12 @@ class browsersteps_live_ui(steppable_browser_interface): return False - def get_current_state(self): + async def get_current_state(self): """Return the screenshot and interactive elements mapping, generally always called after action_()""" import importlib.resources import json # because we for now only run browser steps in playwright mode (not puppeteer mode) - from changedetectionio.content_fetchers.playwright import capture_full_page + from changedetectionio.content_fetchers.playwright import capture_full_page_async # Safety check - don't proceed if resources are cleaned up if self._is_cleaned_up or self.page is None: @@ -416,29 +420,29 @@ class browsersteps_live_ui(steppable_browser_interface): xpath_element_js = importlib.resources.files("changedetectionio.content_fetchers.res").joinpath('xpath_element_scraper.js').read_text() now = time.time() - self.page.wait_for_timeout(1 * 1000) + await self.page.wait_for_timeout(1 * 1000) screenshot = None xpath_data = None try: # Get screenshot first - screenshot = capture_full_page(page=self.page) + screenshot = await capture_full_page_async(page=self.page) logger.debug(f"Time to get screenshot from browser {time.time() - now:.2f}s") # Then get interactive elements now = time.time() - self.page.evaluate("var include_filters=''") - self.page.request_gc() + await self.page.evaluate("var include_filters=''") + await self.page.request_gc() scan_elements = 'a,button,input,select,textarea,i,th,td,p,li,h1,h2,h3,h4,div,span' MAX_TOTAL_HEIGHT = int(os.getenv("SCREENSHOT_MAX_HEIGHT", SCREENSHOT_MAX_HEIGHT_DEFAULT)) - xpath_data = json.loads(self.page.evaluate(xpath_element_js, { + xpath_data = json.loads(await self.page.evaluate(xpath_element_js, { "visualselector_xpath_selectors": scan_elements, "max_height": MAX_TOTAL_HEIGHT })) - self.page.request_gc() + await self.page.request_gc() # Sort elements by size xpath_data['size_pos'] = sorted(xpath_data['size_pos'], key=lambda k: k['width'] * k['height'], reverse=True) @@ -448,13 +452,13 @@ class browsersteps_live_ui(steppable_browser_interface): logger.error(f"Error getting current state: {str(e)}") # Attempt recovery - force garbage collection try: - self.page.request_gc() + await self.page.request_gc() except: pass # Request garbage collection one final time try: - self.page.request_gc() + await self.page.request_gc() except: pass diff --git a/changedetectionio/blueprint/browser_steps/nonContext.py b/changedetectionio/blueprint/browser_steps/nonContext.py deleted file mode 100644 index 93abe269..00000000 --- a/changedetectionio/blueprint/browser_steps/nonContext.py +++ /dev/null @@ -1,17 +0,0 @@ -from playwright.sync_api import PlaywrightContextManager - -# So playwright wants to run as a context manager, but we do something horrible and hacky -# we are holding the session open for as long as possible, then shutting it down, and opening a new one -# So it means we don't get to use PlaywrightContextManager' __enter__ __exit__ -# To work around this, make goodbye() act the same as the __exit__() -# -# But actually I think this is because the context is opened correctly with __enter__() but we timeout the connection -# then theres some lock condition where we cant destroy it without it hanging - -class c_PlaywrightContextManager(PlaywrightContextManager): - - def goodbye(self) -> None: - self.__exit__() - -def c_sync_playwright() -> PlaywrightContextManager: - return c_PlaywrightContextManager() diff --git a/changedetectionio/content_fetchers/base.py b/changedetectionio/content_fetchers/base.py index bfa7e83c..9aa40b40 100644 --- a/changedetectionio/content_fetchers/base.py +++ b/changedetectionio/content_fetchers/base.py @@ -122,7 +122,7 @@ class Fetcher(): return None - def iterate_browser_steps(self, start_url=None): + async def iterate_browser_steps(self, start_url=None): from changedetectionio.blueprint.browser_steps.browser_steps import steppable_browser_interface from playwright._impl._errors import TimeoutError, Error from changedetectionio.safe_jinja import render as jinja_render @@ -136,8 +136,8 @@ class Fetcher(): for step in valid_steps: step_n += 1 logger.debug(f">> Iterating check - browser Step n {step_n} - {step['operation']}...") - self.screenshot_step("before-" + str(step_n)) - self.save_step_html("before-" + str(step_n)) + await self.screenshot_step("before-" + str(step_n)) + await self.save_step_html("before-" + str(step_n)) try: optional_value = step['optional_value'] @@ -148,11 +148,11 @@ class Fetcher(): if '{%' in step['selector'] or '{{' in step['selector']: selector = jinja_render(template_str=step['selector']) - getattr(interface, "call_action")(action_name=step['operation'], + await getattr(interface, "call_action")(action_name=step['operation'], selector=selector, optional_value=optional_value) - self.screenshot_step(step_n) - self.save_step_html(step_n) + await self.screenshot_step(step_n) + await self.save_step_html(step_n) except (Error, TimeoutError) as e: logger.debug(str(e)) # Stop processing here diff --git a/changedetectionio/content_fetchers/playwright.py b/changedetectionio/content_fetchers/playwright.py index bb8ade18..9974579e 100644 --- a/changedetectionio/content_fetchers/playwright.py +++ b/changedetectionio/content_fetchers/playwright.py @@ -9,6 +9,70 @@ from changedetectionio.content_fetchers import SCREENSHOT_MAX_HEIGHT_DEFAULT, vi from changedetectionio.content_fetchers.base import Fetcher, manage_user_agent from changedetectionio.content_fetchers.exceptions import PageUnloadable, Non200ErrorCodeReceived, EmptyReply, ScreenshotUnavailable +async def capture_full_page_async(page): + import os + import time + from multiprocessing import Process, Pipe + + start = time.time() + + page_height = await page.evaluate("document.documentElement.scrollHeight") + page_width = await page.evaluate("document.documentElement.scrollWidth") + original_viewport = page.viewport_size + + logger.debug(f"Playwright viewport size {page.viewport_size} page height {page_height} page width {page_width}") + + # Use an approach similar to puppeteer: set a larger viewport and take screenshots in chunks + step_size = SCREENSHOT_SIZE_STITCH_THRESHOLD # Size that won't cause GPU to overflow + screenshot_chunks = [] + y = 0 + + if page_height > page.viewport_size['height']: + if page_height < step_size: + step_size = page_height # Incase page is bigger than default viewport but smaller than proposed step size + logger.debug(f"Setting bigger viewport to step through large page width W{page.viewport_size['width']}xH{step_size} because page_height > viewport_size") + # Set viewport to a larger size to capture more content at once + await page.set_viewport_size({'width': page.viewport_size['width'], 'height': step_size}) + + # Capture screenshots in chunks up to the max total height + while y < min(page_height, SCREENSHOT_MAX_TOTAL_HEIGHT): + await page.request_gc() + await page.evaluate(f"window.scrollTo(0, {y})") + await page.request_gc() + screenshot_chunks.append(await page.screenshot( + type="jpeg", + full_page=False, + quality=int(os.getenv("SCREENSHOT_QUALITY", 72)) + )) + y += step_size + await page.request_gc() + + # Restore original viewport size + await page.set_viewport_size({'width': original_viewport['width'], 'height': original_viewport['height']}) + + # If we have multiple chunks, stitch them together + if len(screenshot_chunks) > 1: + from changedetectionio.content_fetchers.screenshot_handler import stitch_images_worker + logger.debug(f"Screenshot stitching {len(screenshot_chunks)} chunks together") + parent_conn, child_conn = Pipe() + p = Process(target=stitch_images_worker, args=(child_conn, screenshot_chunks, page_height, SCREENSHOT_MAX_TOTAL_HEIGHT)) + p.start() + screenshot = parent_conn.recv_bytes() + p.join() + logger.debug( + f"Screenshot (chunked/stitched) - Page height: {page_height} Capture height: {SCREENSHOT_MAX_TOTAL_HEIGHT} - Stitched together in {time.time() - start:.2f}s") + # Explicit cleanup + del screenshot_chunks + del p + del parent_conn, child_conn + screenshot_chunks = None + return screenshot + + logger.debug( + f"Screenshot Page height: {page_height} Capture height: {SCREENSHOT_MAX_TOTAL_HEIGHT} - Stitched together in {time.time() - start:.2f}s") + + return screenshot_chunks[0] + def capture_full_page(page): import os import time @@ -124,9 +188,9 @@ class fetcher(Fetcher): self.proxy['username'] = parsed.username self.proxy['password'] = parsed.password - def screenshot_step(self, step_n=''): + async def screenshot_step(self, step_n=''): super().screenshot_step(step_n=step_n) - screenshot = capture_full_page(page=self.page) + screenshot = await capture_full_page_async(page=self.page) if self.browser_steps_screenshot_path is not None: @@ -135,15 +199,15 @@ class fetcher(Fetcher): with open(destination, 'wb') as f: f.write(screenshot) - def save_step_html(self, step_n): + async def save_step_html(self, step_n): super().save_step_html(step_n=step_n) - content = self.page.content() + content = await self.page.content() destination = os.path.join(self.browser_steps_screenshot_path, 'step_{}.html'.format(step_n)) logger.debug(f"Saving step HTML to {destination}") with open(destination, 'w') as f: f.write(content) - def run(self, + async def run(self, url, timeout, request_headers, @@ -154,26 +218,26 @@ class fetcher(Fetcher): is_binary=False, empty_pages_are_a_change=False): - from playwright.sync_api import sync_playwright + from playwright.async_api import async_playwright import playwright._impl._errors import time self.delete_browser_steps_screenshots() response = None - with sync_playwright() as p: + async with async_playwright() as p: browser_type = getattr(p, self.browser_type) # Seemed to cause a connection Exception even tho I can see it connect # self.browser = browser_type.connect(self.command_executor, timeout=timeout*1000) # 60,000 connection timeout only - browser = browser_type.connect_over_cdp(self.browser_connection_url, timeout=60000) + browser = await browser_type.connect_over_cdp(self.browser_connection_url, timeout=60000) # SOCKS5 with authentication is not supported (yet) # https://github.com/microsoft/playwright/issues/10567 # Set user agent to prevent Cloudflare from blocking the browser # Use the default one configured in the App.py model that's passed from fetch_site_status.py - context = browser.new_context( + context = await browser.new_context( accept_downloads=False, # Should never be needed bypass_csp=True, # This is needed to enable JavaScript execution on GitHub and others extra_http_headers=request_headers, @@ -183,7 +247,7 @@ class fetcher(Fetcher): user_agent=manage_user_agent(headers=request_headers), ) - self.page = context.new_page() + self.page = await context.new_page() # Listen for all console events and handle errors self.page.on("console", lambda msg: logger.debug(f"Playwright console: Watch URL: {url} {msg.type}: {msg.text} {msg.args}")) @@ -193,32 +257,37 @@ class fetcher(Fetcher): browsersteps_interface = steppable_browser_interface(start_url=url) browsersteps_interface.page = self.page - response = browsersteps_interface.action_goto_url(value=url) + response = await browsersteps_interface.action_goto_url(value=url) if response is None: - context.close() - browser.close() + await context.close() + await browser.close() logger.debug("Content Fetcher > Response object from the browser communication was none") raise EmptyReply(url=url, status_code=None) - self.headers = response.all_headers() + # In async_playwright, all_headers() returns a coroutine + try: + self.headers = await response.all_headers() + except TypeError: + # Fallback for sync version + self.headers = response.all_headers() try: if self.webdriver_js_execute_code is not None and len(self.webdriver_js_execute_code): - browsersteps_interface.action_execute_js(value=self.webdriver_js_execute_code, selector=None) + await browsersteps_interface.action_execute_js(value=self.webdriver_js_execute_code, selector=None) except playwright._impl._errors.TimeoutError as e: - context.close() - browser.close() + await context.close() + await browser.close() # This can be ok, we will try to grab what we could retrieve pass except Exception as e: logger.debug(f"Content Fetcher > Other exception when executing custom JS code {str(e)}") - context.close() - browser.close() + await context.close() + await browser.close() raise PageUnloadable(url=url, status_code=None, message=str(e)) extra_wait = int(os.getenv("WEBDRIVER_DELAY_BEFORE_CONTENT_READY", 5)) + self.render_extract_delay - self.page.wait_for_timeout(extra_wait * 1000) + await self.page.wait_for_timeout(extra_wait * 1000) try: self.status_code = response.status @@ -226,48 +295,48 @@ class fetcher(Fetcher): # https://github.com/dgtlmoon/changedetection.io/discussions/2122#discussioncomment-8241962 logger.critical(f"Response from the browser/Playwright did not have a status_code! Response follows.") logger.critical(response) - context.close() - browser.close() + await context.close() + await browser.close() raise PageUnloadable(url=url, status_code=None, message=str(e)) if self.status_code != 200 and not ignore_status_codes: - screenshot = capture_full_page(self.page) + screenshot = await capture_full_page_async(self.page) raise Non200ErrorCodeReceived(url=url, status_code=self.status_code, screenshot=screenshot) - if not empty_pages_are_a_change and len(self.page.content().strip()) == 0: + if not empty_pages_are_a_change and len((await self.page.content()).strip()) == 0: logger.debug("Content Fetcher > Content was empty, empty_pages_are_a_change = False") - context.close() - browser.close() + await context.close() + await browser.close() raise EmptyReply(url=url, status_code=response.status) # Run Browser Steps here if self.browser_steps_get_valid_steps(): - self.iterate_browser_steps(start_url=url) + await self.iterate_browser_steps(start_url=url) - self.page.wait_for_timeout(extra_wait * 1000) + await self.page.wait_for_timeout(extra_wait * 1000) now = time.time() # So we can find an element on the page where its selector was entered manually (maybe not xPath etc) if current_include_filters is not None: - self.page.evaluate("var include_filters={}".format(json.dumps(current_include_filters))) + await self.page.evaluate("var include_filters={}".format(json.dumps(current_include_filters))) else: - self.page.evaluate("var include_filters=''") - self.page.request_gc() + await self.page.evaluate("var include_filters=''") + await self.page.request_gc() # request_gc before and after evaluate to free up memory # @todo browsersteps etc MAX_TOTAL_HEIGHT = int(os.getenv("SCREENSHOT_MAX_HEIGHT", SCREENSHOT_MAX_HEIGHT_DEFAULT)) - self.xpath_data = self.page.evaluate(XPATH_ELEMENT_JS, { + self.xpath_data = await self.page.evaluate(XPATH_ELEMENT_JS, { "visualselector_xpath_selectors": visualselector_xpath_selectors, "max_height": MAX_TOTAL_HEIGHT }) - self.page.request_gc() + await self.page.request_gc() - self.instock_data = self.page.evaluate(INSTOCK_DATA_JS) - self.page.request_gc() + self.instock_data = await self.page.evaluate(INSTOCK_DATA_JS) + await self.page.request_gc() - self.content = self.page.content() - self.page.request_gc() + self.content = await self.page.content() + await self.page.request_gc() logger.debug(f"Scrape xPath element data in browser done in {time.time() - now:.2f}s") # Bug 3 in Playwright screenshot handling @@ -279,7 +348,7 @@ class fetcher(Fetcher): # acceptable screenshot quality here try: # The actual screenshot - this always base64 and needs decoding! horrible! huge CPU usage - self.screenshot = capture_full_page(page=self.page) + self.screenshot = await capture_full_page_async(page=self.page) except Exception as e: # It's likely the screenshot was too long/big and something crashed @@ -287,30 +356,30 @@ class fetcher(Fetcher): finally: # Request garbage collection one more time before closing try: - self.page.request_gc() + await self.page.request_gc() except: pass # Clean up resources properly try: - self.page.request_gc() + await self.page.request_gc() except: pass try: - self.page.close() + await self.page.close() except: pass self.page = None try: - context.close() + await context.close() except: pass context = None try: - browser.close() + await browser.close() except: pass browser = None diff --git a/changedetectionio/processors/__init__.py b/changedetectionio/processors/__init__.py index e7c97a16..e62adc53 100644 --- a/changedetectionio/processors/__init__.py +++ b/changedetectionio/processors/__init__.py @@ -147,16 +147,42 @@ class difference_detection_processor(): # And here we go! call the right browser with browser-specific settings empty_pages_are_a_change = self.datastore.data['settings']['application'].get('empty_pages_are_a_change', False) - self.fetcher.run(url=url, - timeout=timeout, - request_headers=request_headers, - request_body=request_body, - request_method=request_method, - ignore_status_codes=ignore_status_codes, - current_include_filters=self.watch.get('include_filters'), - is_binary=is_binary, - empty_pages_are_a_change=empty_pages_are_a_change - ) + # Check if the fetcher run method is async (for playwright) + import asyncio + import inspect + + run_method = getattr(self.fetcher, 'run') + if inspect.iscoroutinefunction(run_method): + # Use asyncio to run the async method + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + loop.run_until_complete( + self.fetcher.run(url=url, + timeout=timeout, + request_headers=request_headers, + request_body=request_body, + request_method=request_method, + ignore_status_codes=ignore_status_codes, + current_include_filters=self.watch.get('include_filters'), + is_binary=is_binary, + empty_pages_are_a_change=empty_pages_are_a_change + ) + ) + finally: + loop.close() + else: + # Synchronous fetcher (requests, etc.) + self.fetcher.run(url=url, + timeout=timeout, + request_headers=request_headers, + request_body=request_body, + request_method=request_method, + ignore_status_codes=ignore_status_codes, + current_include_filters=self.watch.get('include_filters'), + is_binary=is_binary, + empty_pages_are_a_change=empty_pages_are_a_change + ) #@todo .quit here could go on close object, so we can run JS if change-detected self.fetcher.quit(watch=self.watch) diff --git a/changedetectionio/realtime/socket_server.py b/changedetectionio/realtime/socket_server.py index 8cbb50c0..fc781853 100644 --- a/changedetectionio/realtime/socket_server.py +++ b/changedetectionio/realtime/socket_server.py @@ -27,7 +27,6 @@ class SignalHandler: # Create and start the queue update thread using eventlet import eventlet - # logger.info("Using eventlet for polling thread") self.polling_emitter_thread = eventlet.spawn(self.polling_emit_running_or_queued_watches) # Store the thread reference in socketio for clean shutdown @@ -177,7 +176,7 @@ def handle_watch_update(socketio, **kwargs): socketio.emit("watch_update", {'watch': watch_data, 'general_stats': general_stats}) # Log after successful emit - #logger.info(f"Socket.IO: Emitted update for watch {watch.get('uuid')}, Checking now: {watch_data['checking_now']}") + logger.info(f"Socket.IO: Emitted update for watch {watch.get('uuid')}, Checking now: {watch_data['checking_now']}") except Exception as e: logger.error(f"Socket.IO error in handle_watch_update: {str(e)}") @@ -185,8 +184,8 @@ def handle_watch_update(socketio, **kwargs): def init_socketio(app, datastore): """Initialize SocketIO with the main Flask app""" - # Use eventlet async_mode to match the eventlet server - # This is required since the main app uses eventlet.wsgi.server + # Use eventlet async_mode now that Playwright is async and compatible + # Eventlet mode works well with async_playwright async_mode = 'eventlet' logger.info(f"Using {async_mode} mode for Socket.IO")