#!/usr/bin/env python3
"""Tests for the RSS feed: token access, CDATA handling, XPath filters, bad input.

NOTE(review): this module was restored from a copy in which line breaks were
collapsed and everything between ``<`` and ``>`` had been stripped.  The
Python structure and all visible text are preserved; the element tags inside
the fixtures and the few statements flagged ``NOTE(review)`` below were
reconstructed and should be confirmed against version control.
"""

import time

from flask import url_for

from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, \
    extract_UUID_from_client


def set_original_cdata_xml():
    """Write an RSS document with CDATA-wrapped fields to the test endpoint file.

    The first item carries its title and description inside CDATA sections;
    the second item uses plain text.
    """
    # NOTE(review): element tags reconstructed around the surviving text;
    # any elements whose text did not survive the stripping are absent.
    test_return_data = """<rss version="2.0">
    <channel>
    <title>Gizi</title>
    <link>https://test.com</link>
    <language>en</language>
    <item>
    <title>
    <![CDATA[ <img src="https://testsite.com/hacked.jpg"> Hackers can access your computer ]]>
    </title>
    <link>https://testsite.com/news/12341234234</link>
    <description>
    <![CDATA[ <p>The days of Terminator and The Matrix could be closer. But be positive.</p><p><a href="https://testsite.com/news/12341234234">Read more link...</a></p> ]]>
    </description>
    <category>cybernetics</category>
    <category>rand corporation</category>
    <pubDate>Tue, 17 Oct 2023 15:10:00 GMT</pubDate>
    <guid isPermaLink="false">1850933241</guid>
    </item>

    <item>
        <title>    Some other title    </title>
        <link>https://testsite.com/news/12341234236</link>
        <description>
        Some other description
        </description>
    </item>
    </channel>
    </rss>
    """

    with open("test-datastore/endpoint-content.txt", "w") as f:
        f.write(test_return_data)


def set_html_content(content):
    """Write a small HTML page embedding ``content`` to the test endpoint file.

    Args:
        content: Text interpolated into the page body.
    """
    # NOTE(review): HTML tags reconstructed around the surviving text.
    test_return_data = f"""<html>
       <body>
     Some initial text<br>
     <p>{content}</p>
     <br>
     So let's see what happens.  <br>
     </body>
     </html>
    """

    # Write as UTF-8 encoded bytes
    with open("test-datastore/endpoint-content.txt", "wb") as f:
        f.write(test_return_data.encode('utf-8'))


def test_setup(client, live_server, measure_memory_usage):
    """Run ``live_server_setup`` on the live server fixture."""
    live_server_setup(live_server)


def test_rss_and_token(client, live_server, measure_memory_usage):
    """The RSS feed rejects a bad token and serves content for the real one."""
    #    live_server_setup(live_server)

    set_original_response()
    rss_token = extract_rss_token_from_UI(client)

    # Add our URL to the import page
    res = client.post(
        url_for("imports.import_page"),
        data={"urls": url_for('test_random_content_endpoint', _external=True)},
        follow_redirects=True
    )

    assert b"1 Imported" in res.data

    wait_for_all_checks(client)
    set_modified_response()
    time.sleep(1)
    client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
    wait_for_all_checks(client)

    # A wrong token must be refused
    res = client.get(
        url_for("rss.feed", token="bad token", _external=True),
        follow_redirects=True
    )

    assert b"Access denied, bad token" in res.data

    # The token taken from the UI must be accepted
    res = client.get(
        url_for("rss.feed", token=rss_token, _external=True),
        follow_redirects=True
    )
    assert b"Access denied, bad token" not in res.data
    assert b"Random content" in res.data

    client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)


def test_basic_cdata_rss_markup(client, live_server, measure_memory_usage):
    """CDATA wrappers must not appear in the preview of an RSS watch."""
    #live_server_setup(live_server)

    set_original_cdata_xml()

    test_url = url_for('test_endpoint', content_type="application/xml", _external=True)

    # Add our URL to the import page
    res = client.post(
        url_for("imports.import_page"),
        data={"urls": test_url},
        follow_redirects=True
    )

    assert b"1 Imported" in res.data

    wait_for_all_checks(client)

    res = client.get(
        url_for("ui.ui_views.preview_page", uuid="first"),
        follow_redirects=True
    )
    assert b'CDATA' not in res.data
    # NOTE(review): everything from here to the end of this function was lost
    # in the recovered copy and is reconstructed - confirm.
    assert b'<![' not in res.data
    assert b'Hackers can access your computer' in res.data
    assert b'The days of Terminator' in res.data
    res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)


# NOTE(review): the function name, the fixture setup and the quick-add request
# below (up to the "Watch added in Paused state" assert) were lost in the
# recovered copy and are reconstructed - confirm the name and endpoint.
def test_rss_xpath_filtering(client, live_server, measure_memory_usage):
    """An ``//item/title`` include filter keeps the item titles only."""
    #live_server_setup(live_server)

    set_original_cdata_xml()

    test_url = url_for('test_endpoint', content_type="application/xml", _external=True)

    res = client.post(
        url_for("ui.ui_views.form_quick_watch_add"),
        data={"url": test_url, "tags": '', 'edit_and_watch_submit_button': 'Edit > Watch'},
        follow_redirects=True
    )
    assert b"Watch added in Paused state, saving will unpause" in res.data

    uuid = next(iter(live_server.app.config['DATASTORE'].data['watching']))
    res = client.post(
        url_for("ui.ui_edit.edit_page", uuid=uuid, unpause_on_save=1),
        data={
            "include_filters": "//item/title",
            "fetch_backend": "html_requests",
            "headers": "",
            "proxy": "no-proxy",
            "tags": "",
            "url": test_url,
        },
        follow_redirects=True
    )
    assert b"unpaused" in res.data

    wait_for_all_checks(client)

    res = client.get(
        url_for("ui.ui_views.preview_page", uuid="first"),
        follow_redirects=True
    )
    assert b'CDATA' not in res.data
    # NOTE(review): this assert and the start of the comment that followed it
    # were lost in the recovered copy; the surviving comment tail reads
    # "... to be sure title renders".
    assert b'<![' not in res.data

    assert b'Hackers can access your computer' in res.data  # Should ONLY be selected by the xpath
    assert b'Some other title' in res.data  # Should ONLY be selected by the xpath
    assert b'The days of Terminator' not in res.data  # Should NOT be selected by the xpath
    assert b'Some other description' not in res.data  # Should NOT be selected by the xpath

    res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)


def test_rss_bad_chars_breaking(client, live_server):
    """This should absolutely trigger the RSS builder to go into worst state mode

    - source: prefix means no html conversion (which kinda filters out the bad stuff)
    - Binary data
    - Very long so that the saving is performed by Brotli (and decoded back to bytes)

    Otherwise feedgen should support regular unicode
    """
    #live_server_setup(live_server)

    with open("test-datastore/endpoint-content.txt", "w") as f:
        ten_kb_string = "A" * 10_000
        f.write(ten_kb_string)

    test_url = url_for('test_endpoint', _external=True)
    res = client.post(
        url_for("imports.import_page"),
        data={"urls": "source:"+test_url},
        follow_redirects=True
    )
    assert b"1 Imported" in res.data
    wait_for_all_checks(client)

    # Set the bad content
    # The file is opened in text mode, so these code points are encoded with
    # the platform default encoding rather than written as raw bytes.
    with open("test-datastore/endpoint-content.txt", "w") as f:
        jpeg_bytes = "\xff\xd8\xff\xe0\x00\x10XXXXXXXX\x00\x01\x02\x00\x00\x01\x00\x01\x00\x00"  # JPEG header
        jpeg_bytes += "A" * 10_000

        f.write(jpeg_bytes)

    res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
    assert b'Queued 1 watch for rechecking.' in res.data
    wait_for_all_checks(client)
    rss_token = extract_rss_token_from_UI(client)

    uuid = next(iter(live_server.app.config['DATASTORE'].data['watching']))

    # Both fetches (the plain one and the bad one) must have been recorded
    assert live_server.app.config['DATASTORE'].data['watching'][uuid].history_n == 2

    # Check RSS feed is still working
    res = client.get(
        url_for("rss.feed", uuid=uuid, token=rss_token),
        follow_redirects=False  # Important! leave this off! it should not redirect
    )
    assert res.status_code == 200