Always extract page <title> Re #3402

3402-always-update-page-title
dgtlmoon 2025-09-08 18:04:02 +02:00
rodzic 7576bec66a
commit a7c21c566c
11 zmienionych plików z 83 dodań i 26 usunięć

Wyświetl plik

@ -310,15 +310,6 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
continue
if process_changedetection_results:
# Extract title if needed
if datastore.data['settings']['application'].get('extract_title_as_title') or watch['extract_title_as_title']:
if not watch['title'] or not len(watch['title']):
try:
update_obj['title'] = html_tools.extract_element(find='title', html_content=update_handler.fetcher.content)
logger.info(f"UUID: {uuid} Extract <title> updated title to '{update_obj['title']}")
except Exception as e:
logger.warning(f"UUID: {uuid} Extract <title> as watch title was enabled, but couldn't find a <title>.")
try:
datastore.update_watch(uuid=uuid, update_obj=update_obj)
@ -357,6 +348,14 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
# Always record attempt count
count = watch.get('check_count', 0) + 1
# Always record page title (used in notifications, and can change even when the content is the same)
try:
page_title = html_tools.extract_title(data=update_handler.fetcher.content)
logger.info(f"UUID: {uuid} Page <title> is '{page_title}")
datastore.update_watch(uuid=uuid, update_obj={'title': page_title})
except Exception as e:
logger.warning(f"UUID: {uuid} Exception when extracting <title> - {str(e)}")
# Record server header
try:
server_header = update_handler.fetcher.headers.get('server', '').strip().lower()[:255]

Wyświetl plik

@ -84,7 +84,7 @@
<span class="pure-form-message-inline">Love RSS? Does your reader support HTML? Set it here</span>
</div>
<div class="pure-control-group">
{{ render_checkbox_field(form.application.form.extract_title_as_title) }}
{{ render_checkbox_field(form.application.form.use_page_title_in_list) }}
<span class="pure-form-message-inline">Note: This will automatically apply to all existing watches.</span>
</div>
<div class="pure-control-group">

Wyświetl plik

@ -548,7 +548,7 @@ class commonSettingsForm(Form):
self.notification_title.extra_notification_tokens = kwargs.get('extra_notification_tokens', {})
self.notification_urls.extra_notification_tokens = kwargs.get('extra_notification_tokens', {})
extract_title_as_title = BooleanField('Extract <title> from document and use as watch title', default=False)
use_page_title_in_list = BooleanField('Use page <title> in watch', default=False)
fetch_backend = RadioField(u'Fetch Method', choices=content_fetchers.available_fetchers(), validators=[ValidateContentFetcherIsReady()])
notification_body = TextAreaField('Notification Body', default='{{ watch_url }} had a change.', validators=[validators.Optional(), ValidateJinja2Template()])
notification_format = SelectField('Notification format', choices=valid_notification_formats.keys())

Wyświetl plik

@ -1,6 +1,7 @@
from loguru import logger
from lxml import etree
from typing import List
import html
import json
import re
@ -9,6 +10,11 @@ TEXT_FILTER_LIST_LINE_SUFFIX = "<br>"
TRANSLATE_WHITESPACE_TABLE = str.maketrans('', '', '\r\n\t ')
PERL_STYLE_REGEX = r'^/(.*?)/([a-z]*)?$'
TITLE_RE = re.compile(r"<title[^>]*>(.*?)</title>", re.I | re.S)
META_CS = re.compile(r'<meta[^>]+charset=["\']?\s*([a-z0-9_\-:+.]+)', re.I)
META_CT = re.compile(r'<meta[^>]+http-equiv=["\']?content-type["\']?[^>]*content=["\'][^>]*charset=([a-z0-9_\-:+.]+)', re.I)
# 'price' , 'lowPrice', 'highPrice' are usually under here
# All of those may or may not appear on different websites - I didnt find a way todo case-insensitive searching here
LD_JSON_PRODUCT_OFFER_SELECTORS = ["json:$..offers", "json:$..Offers"]
@ -510,3 +516,41 @@ def get_triggered_text(content, trigger_text):
i += 1
return triggered_text
def extract_title(data: bytes | str, sniff_bytes: int = 2048, scan_chars: int = 8192) -> str | None:
try:
# Only decode/process the prefix we need for title extraction
match data:
case bytes() if data.startswith((b"\xff\xfe", b"\xfe\xff")):
prefix = data[:scan_chars * 2].decode("utf-16", errors="replace")
case bytes() if data.startswith((b"\xff\xfe\x00\x00", b"\x00\x00\xfe\xff")):
prefix = data[:scan_chars * 4].decode("utf-32", errors="replace")
case bytes():
try:
prefix = data[:scan_chars].decode("utf-8")
except UnicodeDecodeError:
try:
head = data[:sniff_bytes].decode("ascii", errors="ignore")
if m := (META_CS.search(head) or META_CT.search(head)):
enc = m.group(1).lower()
else:
enc = "cp1252"
prefix = data[:scan_chars * 2].decode(enc, errors="replace")
except Exception as e:
logger.error(f"Title extraction encoding detection failed: {e}")
return None
case str():
prefix = data[:scan_chars] if len(data) > scan_chars else data
case _:
logger.error(f"Title extraction received unsupported data type: {type(data)}")
return None
# Search only in the prefix
if m := TITLE_RE.search(prefix):
return html.unescape(" ".join(m.group(1).split())).strip()
return None
except Exception as e:
logger.error(f"Title extraction failed: {e}")
return None

Wyświetl plik

@ -39,7 +39,6 @@ class model(dict):
'api_access_token_enabled': True,
'base_url' : None,
'empty_pages_are_a_change': False,
'extract_title_as_title': False,
'fetch_backend': getenv("DEFAULT_FETCH_BACKEND", "html_requests"),
'filter_failure_notification_threshold_attempts': _FILTER_FAILURE_THRESHOLD_ATTEMPTS_DEFAULT,
'global_ignore_text': [], # List of text to ignore when calculating the comparison checksum
@ -57,9 +56,10 @@ class model(dict):
'rss_hide_muted_watches': True,
'schema_version' : 0,
'shared_diff_access': False,
'webdriver_delay': None , # Extra delay in seconds before extracting text
'tags': {}, #@todo use Tag.model initialisers
'timezone': None, # Default IANA timezone name
'use_page_title_in_list': False,
'webdriver_delay': None , # Extra delay in seconds before extracting text
'ui': {
'open_diff_in_new_tab': True,
'socket_io_enabled': True,

Wyświetl plik

@ -24,7 +24,6 @@ class watch_base(dict):
'content-type': None,
'date_created': None,
'extract_text': [], # Extract text by regex after filters
'extract_title_as_title': False,
'fetch_backend': 'system', # plaintext, playwright etc
'fetch_time': 0.0,
'filter_failure_notification_send': strtobool(os.getenv('FILTER_FAILURE_NOTIFICATION_SEND_DEFAULT', 'True')),
@ -128,6 +127,7 @@ class watch_base(dict):
'remove_duplicate_lines': False,
'trigger_text': [], # List of text or regex to wait for until a change is detected
'url': '',
'use_page_title_in_list': False,
'uuid': str(uuid.uuid4()),
'webdriver_delay': None,
'webdriver_js_execute_code': None, # Run before change-detection

Wyświetl plik

@ -262,11 +262,6 @@ class ChangeDetectionStore:
extras = deepcopy(self.data['watching'][uuid])
new_uuid = self.add_watch(url=url, extras=extras)
watch = self.data['watching'][new_uuid]
if self.data['settings']['application'].get('extract_title_as_title') or watch['extract_title_as_title']:
# Because it will be recalculated on the next fetch
self.data['watching'][new_uuid]['title'] = None
return new_uuid
def url_exists(self, url):
@ -308,7 +303,6 @@ class ChangeDetectionStore:
'browser_steps',
'css_filter',
'extract_text',
'extract_title_as_title',
'headers',
'ignore_text',
'include_filters',
@ -323,6 +317,7 @@ class ChangeDetectionStore:
'title',
'trigger_text',
'url',
'use_page_title_in_list',
'webdriver_js_execute_code',
]:
if res.get(k):
@ -973,6 +968,16 @@ class ChangeDetectionStore:
f_d.write(zlib.compress(f_j.read()))
os.unlink(json_path)
def update_20(self):
for uuid, watch in self.data['watching'].items():
if self.data['watching'][uuid].get('extract_title_as_title'):
self.data['watching'][uuid]['use_page_title_in_list'] = self.data['watching'][uuid].get('extract_title_as_title')
del self.data['watching'][uuid]['extract_title_as_title']
if self.data['settings']['application'].get('extract_title_as_title'):
self.data['settings']['application']['use_page_title_in_list'] = self.data['settings']['application'].get('extract_title_as_title')
def add_notification_url(self, notification_url):
logger.debug(f">>> Adding new notification_url - '{notification_url}'")

Wyświetl plik

@ -70,7 +70,7 @@
</tr>
<tr>
<td><code>{{ '{{watch_title}}' }}</code></td>
<td>The title of the watch.</td>
<td>The page title of the watch.</td>
</tr>
<tr>
<td><code>{{ '{{watch_tag}}' }}</code></td>

Wyświetl plik

@ -102,7 +102,7 @@
<br>
</div>
<div class="pure-control-group">
{{ render_checkbox_field(form.extract_title_as_title) }}
{{ render_checkbox_field(form.use_page_title_in_list) }}
</div>
<div class="pure-control-group">
{{ render_checkbox_field(form.filter_failure_notification_send) }}

Wyświetl plik

@ -123,7 +123,7 @@ def test_check_basic_change_detection_functionality(client, live_server, measure
# Enable auto pickup of <title> in settings
res = client.post(
url_for("settings.settings_page"),
data={"application-extract_title_as_title": "1", "requests-time_between_check-minutes": 180,
data={"application-use_page_title_in_list": "1", "requests-time_between_check-minutes": 180,
'application-fetch_backend': "html_requests"},
follow_redirects=True
)
@ -138,6 +138,15 @@ def test_check_basic_change_detection_functionality(client, live_server, measure
# It should have picked up the <title>
assert b'head title' in res.data
# Recheck it but only with a title change
set_original_response(extra_title=" and more")
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.get(url_for("watchlist.index"))
assert b'head title and more' in res.data
# Be sure the last_viewed is going to be greater than the last snapshot
time.sleep(1)

Wyświetl plik

@ -6,9 +6,9 @@ from flask import url_for
import logging
import time
def set_original_response():
test_return_data = """<html>
<head><title>head title</title></head>
def set_original_response(extra_title=''):
test_return_data = f"""<html>
<head><title>head title{extra_title}</title></head>
<body>
Some initial text<br>
<p>Which is across multiple lines</p>