kopia lustrzana https://github.com/dgtlmoon/changedetection.io
Make browsersteps UI a little more resilient
rodzic
ffde79ecac
commit
0d1366dfb9
|
@ -1,6 +1,8 @@
|
||||||
import os
|
import os
|
||||||
import time
|
import time
|
||||||
import re
|
import re
|
||||||
|
import sys
|
||||||
|
import traceback
|
||||||
from random import randint
|
from random import randint
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
|
|
||||||
|
@ -54,14 +56,34 @@ browser_step_ui_config = {'Choose one': '0 0',
|
||||||
class steppable_browser_interface():
|
class steppable_browser_interface():
|
||||||
page = None
|
page = None
|
||||||
start_url = None
|
start_url = None
|
||||||
|
|
||||||
action_timeout = 10 * 1000
|
action_timeout = 10 * 1000
|
||||||
|
|
||||||
def __init__(self, start_url):
|
def __init__(self, start_url):
|
||||||
self.start_url = start_url
|
self.start_url = start_url
|
||||||
|
|
||||||
|
def safe_page_operation(self, operation_fn, default_return=None):
|
||||||
|
"""Safely execute a page operation with error handling"""
|
||||||
|
if self.page is None:
|
||||||
|
logger.warning("Attempted operation on None page object")
|
||||||
|
return default_return
|
||||||
|
|
||||||
|
try:
|
||||||
|
return operation_fn()
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"Page operation failed: {str(e)}")
|
||||||
|
# Try to reclaim memory if possible
|
||||||
|
try:
|
||||||
|
self.page.request_gc()
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
return default_return
|
||||||
|
|
||||||
# Convert and perform "Click Button" for example
|
# Convert and perform "Click Button" for example
|
||||||
def call_action(self, action_name, selector=None, optional_value=None):
|
def call_action(self, action_name, selector=None, optional_value=None):
|
||||||
|
if self.page is None:
|
||||||
|
logger.warning("Cannot call action on None page object")
|
||||||
|
return
|
||||||
|
|
||||||
now = time.time()
|
now = time.time()
|
||||||
call_action_name = re.sub('[^0-9a-zA-Z]+', '_', action_name.lower())
|
call_action_name = re.sub('[^0-9a-zA-Z]+', '_', action_name.lower())
|
||||||
if call_action_name == 'choose_one':
|
if call_action_name == 'choose_one':
|
||||||
|
@ -72,28 +94,46 @@ class steppable_browser_interface():
|
||||||
if selector and selector.startswith('/') and not selector.startswith('//'):
|
if selector and selector.startswith('/') and not selector.startswith('//'):
|
||||||
selector = "xpath=" + selector
|
selector = "xpath=" + selector
|
||||||
|
|
||||||
|
# Check if action handler exists
|
||||||
|
if not hasattr(self, "action_" + call_action_name):
|
||||||
|
logger.warning(f"Action handler for '{call_action_name}' not found")
|
||||||
|
return
|
||||||
|
|
||||||
action_handler = getattr(self, "action_" + call_action_name)
|
action_handler = getattr(self, "action_" + call_action_name)
|
||||||
|
|
||||||
# Support for Jinja2 variables in the value and selector
|
# Support for Jinja2 variables in the value and selector
|
||||||
|
|
||||||
if selector and ('{%' in selector or '{{' in selector):
|
if selector and ('{%' in selector or '{{' in selector):
|
||||||
selector = jinja_render(template_str=selector)
|
selector = jinja_render(template_str=selector)
|
||||||
|
|
||||||
if optional_value and ('{%' in optional_value or '{{' in optional_value):
|
if optional_value and ('{%' in optional_value or '{{' in optional_value):
|
||||||
optional_value = jinja_render(template_str=optional_value)
|
optional_value = jinja_render(template_str=optional_value)
|
||||||
|
|
||||||
action_handler(selector, optional_value)
|
try:
|
||||||
self.page.wait_for_timeout(1.5 * 1000)
|
action_handler(selector, optional_value)
|
||||||
logger.debug(f"Call action done in {time.time()-now:.2f}s")
|
# Safely wait for timeout
|
||||||
|
def wait_timeout():
|
||||||
|
self.page.wait_for_timeout(1.5 * 1000)
|
||||||
|
self.safe_page_operation(wait_timeout)
|
||||||
|
logger.debug(f"Call action done in {time.time()-now:.2f}s")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error executing action '{call_action_name}': {str(e)}")
|
||||||
|
# Request garbage collection to free up resources after error
|
||||||
|
try:
|
||||||
|
self.page.request_gc()
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
def action_goto_url(self, selector=None, value=None):
|
def action_goto_url(self, selector=None, value=None):
|
||||||
# self.page.set_viewport_size({"width": 1280, "height": 5000})
|
if not value:
|
||||||
|
logger.warning("No URL provided for goto_url action")
|
||||||
|
return None
|
||||||
|
|
||||||
now = time.time()
|
now = time.time()
|
||||||
response = self.page.goto(value, timeout=0, wait_until='load')
|
|
||||||
# Should be the same as the puppeteer_fetch.js methods, means, load with no timeout set (skip timeout)
|
def goto_operation():
|
||||||
#and also wait for seconds ?
|
return self.page.goto(value, timeout=0, wait_until='load')
|
||||||
#await page.waitForTimeout(1000);
|
|
||||||
#await page.waitForTimeout(extra_wait_ms);
|
response = self.safe_page_operation(goto_operation)
|
||||||
logger.debug(f"Time to goto URL {time.time()-now:.2f}s")
|
logger.debug(f"Time to goto URL {time.time()-now:.2f}s")
|
||||||
return response
|
return response
|
||||||
|
|
||||||
|
@ -103,116 +143,209 @@ class steppable_browser_interface():
|
||||||
|
|
||||||
def action_click_element_containing_text(self, selector=None, value=''):
|
def action_click_element_containing_text(self, selector=None, value=''):
|
||||||
logger.debug("Clicking element containing text")
|
logger.debug("Clicking element containing text")
|
||||||
if not len(value.strip()):
|
if not value or not len(value.strip()):
|
||||||
return
|
return
|
||||||
elem = self.page.get_by_text(value)
|
|
||||||
if elem.count():
|
def click_operation():
|
||||||
elem.first.click(delay=randint(200, 500), timeout=self.action_timeout)
|
elem = self.page.get_by_text(value)
|
||||||
|
if elem.count():
|
||||||
|
elem.first.click(delay=randint(200, 500), timeout=self.action_timeout)
|
||||||
|
|
||||||
|
self.safe_page_operation(click_operation)
|
||||||
|
|
||||||
def action_click_element_containing_text_if_exists(self, selector=None, value=''):
|
def action_click_element_containing_text_if_exists(self, selector=None, value=''):
|
||||||
logger.debug("Clicking element containing text if exists")
|
logger.debug("Clicking element containing text if exists")
|
||||||
if not len(value.strip()):
|
if not value or not len(value.strip()):
|
||||||
return
|
|
||||||
elem = self.page.get_by_text(value)
|
|
||||||
logger.debug(f"Clicking element containing text - {elem.count()} elements found")
|
|
||||||
if elem.count():
|
|
||||||
elem.first.click(delay=randint(200, 500), timeout=self.action_timeout)
|
|
||||||
else:
|
|
||||||
return
|
return
|
||||||
|
|
||||||
|
def click_if_exists_operation():
|
||||||
|
elem = self.page.get_by_text(value)
|
||||||
|
logger.debug(f"Clicking element containing text - {elem.count()} elements found")
|
||||||
|
if elem.count():
|
||||||
|
elem.first.click(delay=randint(200, 500), timeout=self.action_timeout)
|
||||||
|
|
||||||
|
self.safe_page_operation(click_if_exists_operation)
|
||||||
|
|
||||||
def action_enter_text_in_field(self, selector, value):
|
def action_enter_text_in_field(self, selector, value):
|
||||||
if not len(selector.strip()):
|
if not selector or not len(selector.strip()):
|
||||||
return
|
return
|
||||||
|
|
||||||
self.page.fill(selector, value, timeout=self.action_timeout)
|
def fill_operation():
|
||||||
|
self.page.fill(selector, value, timeout=self.action_timeout)
|
||||||
|
|
||||||
|
self.safe_page_operation(fill_operation)
|
||||||
|
|
||||||
def action_execute_js(self, selector, value):
|
def action_execute_js(self, selector, value):
|
||||||
response = self.page.evaluate(value)
|
if not value:
|
||||||
return response
|
return None
|
||||||
|
|
||||||
|
def evaluate_operation():
|
||||||
|
return self.page.evaluate(value)
|
||||||
|
|
||||||
|
return self.safe_page_operation(evaluate_operation)
|
||||||
|
|
||||||
def action_click_element(self, selector, value):
|
def action_click_element(self, selector, value):
|
||||||
logger.debug("Clicking element")
|
logger.debug("Clicking element")
|
||||||
if not len(selector.strip()):
|
if not selector or not len(selector.strip()):
|
||||||
return
|
return
|
||||||
|
|
||||||
self.page.click(selector=selector, timeout=self.action_timeout + 20 * 1000, delay=randint(200, 500))
|
def click_operation():
|
||||||
|
self.page.click(selector=selector, timeout=self.action_timeout + 20 * 1000, delay=randint(200, 500))
|
||||||
|
|
||||||
|
self.safe_page_operation(click_operation)
|
||||||
|
|
||||||
def action_click_element_if_exists(self, selector, value):
|
def action_click_element_if_exists(self, selector, value):
|
||||||
import playwright._impl._errors as _api_types
|
import playwright._impl._errors as _api_types
|
||||||
logger.debug("Clicking element if exists")
|
logger.debug("Clicking element if exists")
|
||||||
if not len(selector.strip()):
|
if not selector or not len(selector.strip()):
|
||||||
return
|
|
||||||
try:
|
|
||||||
self.page.click(selector, timeout=self.action_timeout, delay=randint(200, 500))
|
|
||||||
except _api_types.TimeoutError as e:
|
|
||||||
return
|
|
||||||
except _api_types.Error as e:
|
|
||||||
# Element was there, but page redrew and now its long long gone
|
|
||||||
return
|
return
|
||||||
|
|
||||||
|
def click_if_exists_operation():
|
||||||
|
try:
|
||||||
|
self.page.click(selector, timeout=self.action_timeout, delay=randint(200, 500))
|
||||||
|
except _api_types.TimeoutError:
|
||||||
|
return
|
||||||
|
except _api_types.Error:
|
||||||
|
# Element was there, but page redrew and now its long long gone
|
||||||
|
return
|
||||||
|
|
||||||
|
self.safe_page_operation(click_if_exists_operation)
|
||||||
|
|
||||||
def action_click_x_y(self, selector, value):
|
def action_click_x_y(self, selector, value):
|
||||||
if not re.match(r'^\s?\d+\s?,\s?\d+\s?$', value):
|
if not value or not re.match(r'^\s?\d+\s?,\s?\d+\s?$', value):
|
||||||
raise Exception("'Click X,Y' step should be in the format of '100 , 90'")
|
logger.warning("'Click X,Y' step should be in the format of '100 , 90'")
|
||||||
|
return
|
||||||
|
|
||||||
x, y = value.strip().split(',')
|
try:
|
||||||
x = int(float(x.strip()))
|
x, y = value.strip().split(',')
|
||||||
y = int(float(y.strip()))
|
x = int(float(x.strip()))
|
||||||
self.page.mouse.click(x=x, y=y, delay=randint(200, 500))
|
y = int(float(y.strip()))
|
||||||
|
|
||||||
|
def click_xy_operation():
|
||||||
|
self.page.mouse.click(x=x, y=y, delay=randint(200, 500))
|
||||||
|
|
||||||
|
self.safe_page_operation(click_xy_operation)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error parsing x,y coordinates: {str(e)}")
|
||||||
|
|
||||||
def action_scroll_down(self, selector, value):
|
def action_scroll_down(self, selector, value):
|
||||||
# Some sites this doesnt work on for some reason
|
def scroll_operation():
|
||||||
self.page.mouse.wheel(0, 600)
|
# Some sites this doesnt work on for some reason
|
||||||
self.page.wait_for_timeout(1000)
|
self.page.mouse.wheel(0, 600)
|
||||||
|
self.page.wait_for_timeout(1000)
|
||||||
|
|
||||||
|
self.safe_page_operation(scroll_operation)
|
||||||
|
|
||||||
def action_wait_for_seconds(self, selector, value):
|
def action_wait_for_seconds(self, selector, value):
|
||||||
self.page.wait_for_timeout(float(value.strip()) * 1000)
|
try:
|
||||||
|
seconds = float(value.strip()) if value else 1.0
|
||||||
|
|
||||||
|
def wait_operation():
|
||||||
|
self.page.wait_for_timeout(seconds * 1000)
|
||||||
|
|
||||||
|
self.safe_page_operation(wait_operation)
|
||||||
|
except (ValueError, TypeError) as e:
|
||||||
|
logger.error(f"Invalid value for wait_for_seconds: {str(e)}")
|
||||||
|
|
||||||
def action_wait_for_text(self, selector, value):
|
def action_wait_for_text(self, selector, value):
|
||||||
|
if not value:
|
||||||
|
return
|
||||||
|
|
||||||
import json
|
import json
|
||||||
v = json.dumps(value)
|
v = json.dumps(value)
|
||||||
self.page.wait_for_function(f'document.querySelector("body").innerText.includes({v});', timeout=30000)
|
|
||||||
|
def wait_for_text_operation():
|
||||||
|
self.page.wait_for_function(
|
||||||
|
f'document.querySelector("body").innerText.includes({v});',
|
||||||
|
timeout=30000
|
||||||
|
)
|
||||||
|
|
||||||
|
self.safe_page_operation(wait_for_text_operation)
|
||||||
|
|
||||||
def action_wait_for_text_in_element(self, selector, value):
|
def action_wait_for_text_in_element(self, selector, value):
|
||||||
|
if not selector or not value:
|
||||||
|
return
|
||||||
|
|
||||||
import json
|
import json
|
||||||
s = json.dumps(selector)
|
s = json.dumps(selector)
|
||||||
v = json.dumps(value)
|
v = json.dumps(value)
|
||||||
self.page.wait_for_function(f'document.querySelector({s}).innerText.includes({v});', timeout=30000)
|
|
||||||
|
def wait_for_text_in_element_operation():
|
||||||
|
self.page.wait_for_function(
|
||||||
|
f'document.querySelector({s}).innerText.includes({v});',
|
||||||
|
timeout=30000
|
||||||
|
)
|
||||||
|
|
||||||
|
self.safe_page_operation(wait_for_text_in_element_operation)
|
||||||
|
|
||||||
# @todo - in the future make some popout interface to capture what needs to be set
|
# @todo - in the future make some popout interface to capture what needs to be set
|
||||||
# https://playwright.dev/python/docs/api/class-keyboard
|
# https://playwright.dev/python/docs/api/class-keyboard
|
||||||
def action_press_enter(self, selector, value):
|
def action_press_enter(self, selector, value):
|
||||||
self.page.keyboard.press("Enter", delay=randint(200, 500))
|
def press_operation():
|
||||||
|
self.page.keyboard.press("Enter", delay=randint(200, 500))
|
||||||
|
|
||||||
|
self.safe_page_operation(press_operation)
|
||||||
|
|
||||||
def action_press_page_up(self, selector, value):
|
def action_press_page_up(self, selector, value):
|
||||||
self.page.keyboard.press("PageUp", delay=randint(200, 500))
|
def press_operation():
|
||||||
|
self.page.keyboard.press("PageUp", delay=randint(200, 500))
|
||||||
|
|
||||||
|
self.safe_page_operation(press_operation)
|
||||||
|
|
||||||
def action_press_page_down(self, selector, value):
|
def action_press_page_down(self, selector, value):
|
||||||
self.page.keyboard.press("PageDown", delay=randint(200, 500))
|
def press_operation():
|
||||||
|
self.page.keyboard.press("PageDown", delay=randint(200, 500))
|
||||||
|
|
||||||
|
self.safe_page_operation(press_operation)
|
||||||
|
|
||||||
def action_check_checkbox(self, selector, value):
|
def action_check_checkbox(self, selector, value):
|
||||||
self.page.locator(selector).check(timeout=self.action_timeout)
|
if not selector:
|
||||||
|
return
|
||||||
|
|
||||||
|
def check_operation():
|
||||||
|
self.page.locator(selector).check(timeout=self.action_timeout)
|
||||||
|
|
||||||
|
self.safe_page_operation(check_operation)
|
||||||
|
|
||||||
def action_uncheck_checkbox(self, selector, value):
|
def action_uncheck_checkbox(self, selector, value):
|
||||||
self.page.locator(selector).uncheck(timeout=self.action_timeout)
|
if not selector:
|
||||||
|
return
|
||||||
|
|
||||||
|
def uncheck_operation():
|
||||||
|
self.page.locator(selector).uncheck(timeout=self.action_timeout)
|
||||||
|
|
||||||
|
self.safe_page_operation(uncheck_operation)
|
||||||
|
|
||||||
def action_remove_elements(self, selector, value):
|
def action_remove_elements(self, selector, value):
|
||||||
"""Removes all elements matching the given selector from the DOM."""
|
"""Removes all elements matching the given selector from the DOM."""
|
||||||
self.page.locator(selector).evaluate_all("els => els.forEach(el => el.remove())")
|
if not selector:
|
||||||
|
return
|
||||||
|
|
||||||
|
def remove_operation():
|
||||||
|
self.page.locator(selector).evaluate_all("els => els.forEach(el => el.remove())")
|
||||||
|
|
||||||
|
self.safe_page_operation(remove_operation)
|
||||||
|
|
||||||
def action_make_all_child_elements_visible(self, selector, value):
|
def action_make_all_child_elements_visible(self, selector, value):
|
||||||
"""Recursively makes all child elements inside the given selector fully visible."""
|
"""Recursively makes all child elements inside the given selector fully visible."""
|
||||||
self.page.locator(selector).locator("*").evaluate_all("""
|
if not selector:
|
||||||
els => els.forEach(el => {
|
return
|
||||||
el.style.display = 'block'; // Forces it to be displayed
|
|
||||||
el.style.visibility = 'visible'; // Ensures it's not hidden
|
def make_visible_operation():
|
||||||
el.style.opacity = '1'; // Fully opaque
|
self.page.locator(selector).locator("*").evaluate_all("""
|
||||||
el.style.position = 'relative'; // Avoids 'absolute' hiding
|
els => els.forEach(el => {
|
||||||
el.style.height = 'auto'; // Expands collapsed elements
|
el.style.display = 'block'; // Forces it to be displayed
|
||||||
el.style.width = 'auto'; // Ensures full visibility
|
el.style.visibility = 'visible'; // Ensures it's not hidden
|
||||||
el.removeAttribute('hidden'); // Removes hidden attribute
|
el.style.opacity = '1'; // Fully opaque
|
||||||
el.classList.remove('hidden', 'd-none'); // Removes common CSS hidden classes
|
el.style.position = 'relative'; // Avoids 'absolute' hiding
|
||||||
})
|
el.style.height = 'auto'; // Expands collapsed elements
|
||||||
""")
|
el.style.width = 'auto'; // Ensures full visibility
|
||||||
|
el.removeAttribute('hidden'); // Removes hidden attribute
|
||||||
|
el.classList.remove('hidden', 'd-none'); // Removes common CSS hidden classes
|
||||||
|
})
|
||||||
|
""")
|
||||||
|
|
||||||
|
self.safe_page_operation(make_visible_operation)
|
||||||
|
|
||||||
# Responsible for maintaining a live 'context' with the chrome CDP
|
# Responsible for maintaining a live 'context' with the chrome CDP
|
||||||
# @todo - how long do contexts live for anyway?
|
# @todo - how long do contexts live for anyway?
|
||||||
|
@ -224,7 +357,9 @@ class browsersteps_live_ui(steppable_browser_interface):
|
||||||
# bump and kill this if idle after X sec
|
# bump and kill this if idle after X sec
|
||||||
age_start = 0
|
age_start = 0
|
||||||
headers = {}
|
headers = {}
|
||||||
|
# Track if resources are properly cleaned up
|
||||||
|
_is_cleaned_up = False
|
||||||
|
|
||||||
# use a special driver, maybe locally etc
|
# use a special driver, maybe locally etc
|
||||||
command_executor = os.getenv(
|
command_executor = os.getenv(
|
||||||
"PLAYWRIGHT_BROWSERSTEPS_DRIVER_URL"
|
"PLAYWRIGHT_BROWSERSTEPS_DRIVER_URL"
|
||||||
|
@ -243,9 +378,14 @@ class browsersteps_live_ui(steppable_browser_interface):
|
||||||
self.age_start = time.time()
|
self.age_start = time.time()
|
||||||
self.playwright_browser = playwright_browser
|
self.playwright_browser = playwright_browser
|
||||||
self.start_url = start_url
|
self.start_url = start_url
|
||||||
|
self._is_cleaned_up = False
|
||||||
if self.context is None:
|
if self.context is None:
|
||||||
self.connect(proxy=proxy)
|
self.connect(proxy=proxy)
|
||||||
|
|
||||||
|
def __del__(self):
|
||||||
|
# Ensure cleanup happens if object is garbage collected
|
||||||
|
self.cleanup()
|
||||||
|
|
||||||
# Connect and setup a new context
|
# Connect and setup a new context
|
||||||
def connect(self, proxy=None):
|
def connect(self, proxy=None):
|
||||||
# Should only get called once - test that
|
# Should only get called once - test that
|
||||||
|
@ -264,31 +404,74 @@ class browsersteps_live_ui(steppable_browser_interface):
|
||||||
user_agent=manage_user_agent(headers=self.headers),
|
user_agent=manage_user_agent(headers=self.headers),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
self.page = self.context.new_page()
|
self.page = self.context.new_page()
|
||||||
|
|
||||||
# self.page.set_default_navigation_timeout(keep_open)
|
# self.page.set_default_navigation_timeout(keep_open)
|
||||||
self.page.set_default_timeout(keep_open)
|
self.page.set_default_timeout(keep_open)
|
||||||
# @todo probably this doesnt work
|
# Set event handlers
|
||||||
self.page.on(
|
self.page.on("close", self.mark_as_closed)
|
||||||
"close",
|
|
||||||
self.mark_as_closed,
|
|
||||||
)
|
|
||||||
# Listen for all console events and handle errors
|
# Listen for all console events and handle errors
|
||||||
self.page.on("console", lambda msg: print(f"Browser steps console - {msg.type}: {msg.text} {msg.args}"))
|
self.page.on("console", lambda msg: print(f"Browser steps console - {msg.type}: {msg.text} {msg.args}"))
|
||||||
|
|
||||||
logger.debug(f"Time to browser setup {time.time()-now:.2f}s")
|
logger.debug(f"Time to browser setup {time.time()-now:.2f}s")
|
||||||
self.page.wait_for_timeout(1 * 1000)
|
self.page.wait_for_timeout(1 * 1000)
|
||||||
|
|
||||||
|
|
||||||
def mark_as_closed(self):
|
def mark_as_closed(self):
|
||||||
logger.debug("Page closed, cleaning up..")
|
logger.debug("Page closed, cleaning up..")
|
||||||
|
self.cleanup()
|
||||||
|
|
||||||
|
def cleanup(self):
|
||||||
|
"""Properly clean up all resources to prevent memory leaks"""
|
||||||
|
if self._is_cleaned_up:
|
||||||
|
return
|
||||||
|
|
||||||
|
logger.debug("Cleaning up browser steps resources")
|
||||||
|
|
||||||
|
# Clean up page
|
||||||
|
if hasattr(self, 'page') and self.page is not None:
|
||||||
|
try:
|
||||||
|
# Force garbage collection before closing
|
||||||
|
self.page.request_gc()
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"Error during page garbage collection: {str(e)}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Remove event listeners before closing
|
||||||
|
self.page.remove_listener("close", self.mark_as_closed)
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"Error removing event listeners: {str(e)}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.page.close()
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"Error closing page: {str(e)}")
|
||||||
|
|
||||||
|
self.page = None
|
||||||
|
|
||||||
|
# Clean up context
|
||||||
|
if hasattr(self, 'context') and self.context is not None:
|
||||||
|
try:
|
||||||
|
self.context.close()
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"Error closing context: {str(e)}")
|
||||||
|
|
||||||
|
self.context = None
|
||||||
|
|
||||||
|
self._is_cleaned_up = True
|
||||||
|
logger.debug("Browser steps resources cleanup complete")
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def has_expired(self):
|
def has_expired(self):
|
||||||
if not self.page:
|
if not self.page or self._is_cleaned_up:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
# Check if session has expired based on age
|
||||||
|
max_age_seconds = int(os.getenv("BROWSER_STEPS_MAX_AGE_SECONDS", 60 * 10)) # Default 10 minutes
|
||||||
|
if (time.time() - self.age_start) > max_age_seconds:
|
||||||
|
logger.debug(f"Browser steps session expired after {max_age_seconds} seconds")
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
def get_current_state(self):
|
def get_current_state(self):
|
||||||
"""Return the screenshot and interactive elements mapping, generally always called after action_()"""
|
"""Return the screenshot and interactive elements mapping, generally always called after action_()"""
|
||||||
|
@ -297,36 +480,55 @@ class browsersteps_live_ui(steppable_browser_interface):
|
||||||
# because we for now only run browser steps in playwright mode (not puppeteer mode)
|
# because we for now only run browser steps in playwright mode (not puppeteer mode)
|
||||||
from changedetectionio.content_fetchers.playwright import capture_full_page
|
from changedetectionio.content_fetchers.playwright import capture_full_page
|
||||||
|
|
||||||
|
# Safety check - don't proceed if resources are cleaned up
|
||||||
|
if self._is_cleaned_up or self.page is None:
|
||||||
|
logger.warning("Attempted to get current state after cleanup")
|
||||||
|
return (None, None)
|
||||||
|
|
||||||
xpath_element_js = importlib.resources.files("changedetectionio.content_fetchers.res").joinpath('xpath_element_scraper.js').read_text()
|
xpath_element_js = importlib.resources.files("changedetectionio.content_fetchers.res").joinpath('xpath_element_scraper.js').read_text()
|
||||||
|
|
||||||
now = time.time()
|
now = time.time()
|
||||||
self.page.wait_for_timeout(1 * 1000)
|
self.page.wait_for_timeout(1 * 1000)
|
||||||
|
|
||||||
screenshot = capture_full_page(page=self.page)
|
screenshot = None
|
||||||
|
xpath_data = None
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Get screenshot first
|
||||||
|
screenshot = capture_full_page(page=self.page)
|
||||||
|
logger.debug(f"Time to get screenshot from browser {time.time() - now:.2f}s")
|
||||||
|
|
||||||
logger.debug(f"Time to get screenshot from browser {time.time() - now:.2f}s")
|
# Then get interactive elements
|
||||||
|
now = time.time()
|
||||||
|
self.page.evaluate("var include_filters=''")
|
||||||
|
self.page.request_gc()
|
||||||
|
|
||||||
now = time.time()
|
scan_elements = 'a,button,input,select,textarea,i,th,td,p,li,h1,h2,h3,h4,div,span'
|
||||||
self.page.evaluate("var include_filters=''")
|
|
||||||
# Go find the interactive elements
|
|
||||||
# @todo in the future, something smarter that can scan for elements with .click/focus etc event handlers?
|
|
||||||
|
|
||||||
self.page.request_gc()
|
MAX_TOTAL_HEIGHT = int(os.getenv("SCREENSHOT_MAX_HEIGHT", SCREENSHOT_MAX_HEIGHT_DEFAULT))
|
||||||
|
xpath_data = json.loads(self.page.evaluate(xpath_element_js, {
|
||||||
|
"visualselector_xpath_selectors": scan_elements,
|
||||||
|
"max_height": MAX_TOTAL_HEIGHT
|
||||||
|
}))
|
||||||
|
self.page.request_gc()
|
||||||
|
|
||||||
scan_elements = 'a,button,input,select,textarea,i,th,td,p,li,h1,h2,h3,h4,div,span'
|
# Sort elements by size
|
||||||
|
xpath_data['size_pos'] = sorted(xpath_data['size_pos'], key=lambda k: k['width'] * k['height'], reverse=True)
|
||||||
MAX_TOTAL_HEIGHT = int(os.getenv("SCREENSHOT_MAX_HEIGHT", SCREENSHOT_MAX_HEIGHT_DEFAULT))
|
logger.debug(f"Time to scrape xPath element data in browser {time.time()-now:.2f}s")
|
||||||
xpath_data = json.loads(self.page.evaluate(xpath_element_js, {
|
|
||||||
"visualselector_xpath_selectors": scan_elements,
|
except Exception as e:
|
||||||
"max_height": MAX_TOTAL_HEIGHT
|
logger.error(f"Error getting current state: {str(e)}")
|
||||||
}))
|
# Attempt recovery - force garbage collection
|
||||||
self.page.request_gc()
|
try:
|
||||||
|
self.page.request_gc()
|
||||||
# So the JS will find the smallest one first
|
except:
|
||||||
xpath_data['size_pos'] = sorted(xpath_data['size_pos'], key=lambda k: k['width'] * k['height'], reverse=True)
|
pass
|
||||||
logger.debug(f"Time to scrape xPath element data in browser {time.time()-now:.2f}s")
|
|
||||||
|
# Request garbage collection one final time
|
||||||
# playwright._impl._api_types.Error: Browser closed.
|
try:
|
||||||
# @todo show some countdown timer?
|
self.page.request_gc()
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
return (screenshot, xpath_data)
|
return (screenshot, xpath_data)
|
||||||
|
|
||||||
|
|
Ładowanie…
Reference in New Issue