CVE-2025-52558 - Fixing XSS in error handling output of watch overview list

pull/3275/head
dgtlmoon 2025-06-21 09:44:06 +02:00
rodzic 4f362385e1
commit 3d5a544ea6
6 zmienionych plików z 61 dodań i 79 usunięć

Wyświetl plik

@ -146,7 +146,7 @@ document.addEventListener('DOMContentLoaded', function() {
{%- if watch.is_pdf -%}<img class="status-icon" src="{{url_for('static_content', group='images', filename='pdf-icon.svg')}}" alt="Converting PDF to text" >{%- endif -%}
{%- if watch.has_browser_steps -%}<img class="status-icon status-browsersteps" src="{{url_for('static_content', group='images', filename='steps.svg')}}" alt="Browser Steps is enabled" >{%- endif -%}
<div class="error-text" style="display:none;">{{ watch.compile_error_texts(has_proxies=datastore.proxy_list)|safe }}</div>
<div class="error-text" style="display:none;">{{ watch.compile_error_texts(has_proxies=datastore.proxy_list) }}</div>
{%- if watch['processor'] == 'text_json_diff' -%}
{%- if watch['has_ldjson_price_data'] and not watch['track_ldjson_price_data'] -%}

Wyświetl plik

@ -8,6 +8,7 @@ import re
from pathlib import Path
from loguru import logger
from .. import safe_jinja
from ..html_tools import TRANSLATE_WHITESPACE_TABLE
# Allowable protocols, protects against javascript: etc
@ -691,11 +692,11 @@ class model(watch_base):
output.append(str(Markup(f"<div class=\"notification-error\"><a href=\"{url_for('settings.notification_logs')}\">{ self.get('last_notification_error') }</a></div>")))
else:
# Lo_Fi version
# Lo_Fi version - no app context, cant rely on Jinja2 Markup
if last_error:
output.append(str(Markup(last_error)))
output.append(safe_jinja.render_fully_escaped(last_error))
if self.get('last_notification_error'):
output.append(str(Markup(self.get('last_notification_error'))))
output.append(safe_jinja.render_fully_escaped(self.get('last_notification_error')))
res = "\n".join(output)
return res

Wyświetl plik

@ -10,9 +10,15 @@ import os
JINJA2_MAX_RETURN_PAYLOAD_SIZE = 1024 * int(os.getenv("JINJA2_MAX_RETURN_PAYLOAD_SIZE_KB", 1024 * 10))
# This is used for notifications etc, so actually it's OK to send custom HTML such as <a href> etc, but it should limit what data is available.
# (Which also limits available functions that could be called)
def render(template_str, **args: t.Any) -> str:
jinja2_env = jinja2.sandbox.ImmutableSandboxedEnvironment(extensions=['jinja2_time.TimeExtension'])
output = jinja2_env.from_string(template_str).render(args)
return output[:JINJA2_MAX_RETURN_PAYLOAD_SIZE]
def render_fully_escaped(content):
env = jinja2.sandbox.ImmutableSandboxedEnvironment(autoescape=True)
template = env.from_string("{{ some_html|e }}")
return template.render(some_html=content)

Wyświetl plik

@ -1,72 +0,0 @@
import asyncio
import socketio
from aiohttp import web
SOCKETIO_URL = 'ws://localhost.localdomain:5005'
SOCKETIO_PATH = "/socket.io"
NUM_CLIENTS = 1
clients = []
shutdown_event = asyncio.Event()
class WatchClient:
def __init__(self, client_id: int):
self.client_id = client_id
self.i_got_watch_update_event = False
self.sio = socketio.AsyncClient(reconnection_attempts=50, reconnection_delay=1)
@self.sio.event
async def connect():
print(f"[Client {self.client_id}] Connected")
@self.sio.event
async def disconnect():
print(f"[Client {self.client_id}] Disconnected")
@self.sio.on("watch_update")
async def on_watch_update(watch):
self.i_got_watch_update_event = True
print(f"[Client {self.client_id}] Received update: {watch}")
async def run(self):
try:
await self.sio.connect(SOCKETIO_URL, socketio_path=SOCKETIO_PATH, transports=["websocket", "polling"])
await self.sio.wait()
except Exception as e:
print(f"[Client {self.client_id}] Connection error: {e}")
async def handle_check(request):
all_received = all(c.i_got_watch_update_event for c in clients)
result = "yes" if all_received else "no"
print(f"Received HTTP check — returning '{result}'")
shutdown_event.set() # Signal shutdown
return web.Response(text=result)
async def start_http_server():
app = web.Application()
app.add_routes([web.get('/did_all_clients_get_watch_update', handle_check)])
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, '0.0.0.0', 6666)
await site.start()
async def main():
#await start_http_server()
for i in range(NUM_CLIENTS):
client = WatchClient(i)
clients.append(client)
asyncio.create_task(client.run())
await shutdown_event.wait()
print("Shutting down...")
# Graceful disconnect
for c in clients:
await c.sio.disconnect()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("Interrupted")

Wyświetl plik

@ -5,8 +5,22 @@ from .util import live_server_setup, wait_for_all_checks
from .. import strtobool
# def test_setup(client, live_server, measure_memory_usage):
# live_server_setup(live_server) # Setup on conftest per function
def set_original_response():
test_return_data = """<html>
<head><title>head title</title></head>
<body>
Some initial text<br>
<p>Which is across multiple lines</p>
<br>
So let's see what happens. <br>
<span class="foobar-detection" style='display:none'></span>
</body>
</html>
"""
with open("test-datastore/endpoint-content.txt", "w") as f:
f.write(test_return_data)
return None
def test_bad_access(client, live_server, measure_memory_usage):
@ -118,3 +132,33 @@ def test_xss(client, live_server, measure_memory_usage):
assert b"<img src=x onerror=alert(" not in res.data
assert b"&lt;img" in res.data
def test_xss_watch_last_error(client, live_server, measure_memory_usage):
set_original_response()
# Add our URL to the import page
res = client.post(
url_for("imports.import_page"),
data={"urls": url_for('test_endpoint', _external=True)},
follow_redirects=True
)
assert b"1 Imported" in res.data
wait_for_all_checks(client)
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
data={
"include_filters": '<a href="https://foobar"></a><script>alert(123);</script>',
"url": url_for('test_endpoint', _external=True),
'fetch_backend': "html_requests"
},
follow_redirects=True
)
assert b"Updated watch." in res.data
wait_for_all_checks(client)
res = client.get(url_for("watchlist.index"))
assert b"<script>alert(123);</script>" not in res.data # this text should be there
assert b'&lt;a href=&#34;https://foobar&#34;&gt;&lt;/a&gt;&lt;script&gt;alert(123);&lt;/script&gt;' in res.data
assert b"https://foobar" in res.data # this text should be there

Wyświetl plik

@ -51,6 +51,9 @@ class TestJinja2SSTI(unittest.TestCase):
for attempt in attempt_list:
self.assertEqual(len(safe_jinja.render(attempt)), 0, f"string test '{attempt}' is correctly empty")
def test_jinja2_escaped_html(self):
x = safe_jinja.render_fully_escaped('woo <a href="https://google.com">dfdfd</a>')
self.assertEqual(x, "woo &lt;a href=&#34;https://google.com&#34;&gt;dfdfd&lt;/a&gt;")
if __name__ == '__main__':