Mirror of https://github.com/dgtlmoon/changedetection.io

			Fetching - Be sure that content-type detection works when the headers are a mixed case (#1604)

parent ba8d2e0c2d
commit f9387522ee

@@ -147,6 +147,13 @@ class Fetcher():
     def is_ready(self):
         return True
 
+    def get_all_headers(self):
+        """
+        Get all headers but ensure all keys are lowercase
+        :return:
+        """
+        return {k.lower(): v for k, v in self.headers.items()}
+
     def iterate_browser_steps(self):
         from changedetectionio.blueprint.browser_steps.browser_steps import steppable_browser_interface
         from playwright._impl._api_types import TimeoutError
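
The new helper rebuilds the header dict with lowercased keys, so callers no longer depend on how the underlying fetcher happens to case its header names. A minimal usage sketch against a stand-in object (the real Fetcher subclasses need a full fetch setup, so this is illustration only; FakeFetcher and its header values are made up):

# Hypothetical stand-in with the same .headers attribute and helper as above.
class FakeFetcher:
    headers = {'CONTENT-TYPE': 'TEXT/HTML; charset=UTF-8', 'X-Custom': 'abc'}

    def get_all_headers(self):
        # Same behaviour as the method added in the hunk above.
        return {k.lower(): v for k, v in self.headers.items()}

f = FakeFetcher()
print(f.get_all_headers())
# {'content-type': 'TEXT/HTML; charset=UTF-8', 'x-custom': 'abc'}
# Only the keys are lowercased; callers still call .lower() on the value.
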
@@ -139,7 +139,7 @@ class perform_site_check(difference_detection_processor):
         self.xpath_data = fetcher.xpath_data
 
         # Track the content type
-        update_obj['content_type'] = fetcher.headers.get('Content-Type', '')
+        update_obj['content_type'] = fetcher.get_all_headers().get('content-type', '').lower()
 
         # Watches added automatically in the queue manager will skip if its the same checksum as the previous run
         # Saves a lot of CPU
@@ -159,7 +159,7 @@ class perform_site_check(difference_detection_processor):
         # https://stackoverflow.com/questions/41817578/basic-method-chaining ?
         # return content().textfilter().jsonextract().checksumcompare() ?
 
-        is_json = 'application/json' in fetcher.headers.get('Content-Type', '')
+        is_json = 'application/json' in fetcher.get_all_headers().get('content-type', '').lower()
         is_html = not is_json
 
         # source: support, basically treat it as plaintext
@@ -167,7 +167,7 @@ class perform_site_check(difference_detection_processor):
             is_html = False
             is_json = False
 
-        if watch.is_pdf or 'application/pdf' in fetcher.headers.get('Content-Type', '').lower():
+        if watch.is_pdf or 'application/pdf' in fetcher.get_all_headers().get('content-type', '').lower():
             from shutil import which
             tool = os.getenv("PDF_TO_HTML_TOOL", "pdftohtml")
             if not which(tool):
@@ -235,7 +235,7 @@ class perform_site_check(difference_detection_processor):
             html_content = fetcher.content
 
             # If not JSON,  and if it's not text/plain..
-            if 'text/plain' in fetcher.headers.get('Content-Type', '').lower():
+            if 'text/plain' in fetcher.get_all_headers().get('content-type', '').lower():
                 # Don't run get_text or xpath/css filters on plaintext
                 stripped_text_from_html = html_content
             else:
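
With the key lookup normalised by get_all_headers() and the value lowered with .lower(), a Content-Type sent as 'aPPlication/JSon' (as in the test added below) is treated the same as 'application/json'. A small sketch of the two-step normalisation, using a made-up header dict:

# Both the header *name* and the header *value* can arrive in odd casing.
raw_headers = {'CONTENT-TYPE': 'aPPlication/JSon'}

# Key normalisation (what get_all_headers() does) ...
lowered_keys = {k.lower(): v for k, v in raw_headers.items()}

# ... plus value normalisation (the .lower() added in the hunks above).
content_type = lowered_keys.get('content-type', '').lower()

assert 'application/json' in content_type
assert 'text/plain' not in content_type
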
@@ -3,7 +3,7 @@
 
 import time
 from flask import url_for, escape
-from . util import live_server_setup
+from . util import live_server_setup, wait_for_all_checks
 import pytest
 jq_support = True
 
@@ -436,6 +436,32 @@ def test_ignore_json_order(client, live_server):
     res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
     assert b'Deleted' in res.data
 
+def test_correct_header_detect(client, live_server):
+
+    # Like in https://github.com/dgtlmoon/changedetection.io/pull/1593
+    # Specify extra html that JSON is sometimes wrapped in - when using Browserless/Puppeteer etc
+    with open("test-datastore/endpoint-content.txt", "w") as f:
+        f.write('<html><body>{"hello" : 123, "world": 123}')
+
+    # Add our URL to the import page
+    # Check weird casing is cleaned up and detected also
+    test_url = url_for('test_endpoint', content_type="aPPlication/JSon", uppercase_headers=True, _external=True)
+    res = client.post(
+        url_for("import_page"),
+        data={"urls": test_url},
+        follow_redirects=True
+    )
+    assert b"1 Imported" in res.data
+    wait_for_all_checks(client)
+
+
+    res = client.get(url_for("index"))
+    # This will be fixed in #1593
+    assert b'No parsable JSON found in this document' in res.data
+
+    res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
+    assert b'Deleted' in res.data
+
 def test_check_jsonpath_ext_filter(client, live_server):
     check_json_ext_filter('json:$[?(@.status==Sold)]', client, live_server)
 
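
The assertion on 'No parsable JSON found in this document' is expected for now: the endpoint body is JSON wrapped in extra HTML, which strict JSON parsing rejects outright, and handling that wrapping is what #1593 covers. A quick illustration:

import json

# Same body the test writes to test-datastore/endpoint-content.txt
body = '<html><body>{"hello" : 123, "world": 123}'

try:
    json.loads(body)
except json.JSONDecodeError as e:
    print('Not parsable as-is:', e)

# Stripping the HTML wrapper first (the kind of handling #1593 deals with)
# leaves plain JSON that parses fine.
print(json.loads(body.split('<body>', 1)[1]))  # {'hello': 123, 'world': 123}
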
@@ -119,16 +119,26 @@ def live_server_setup(live_server):
         status_code = request.args.get('status_code')
         content = request.args.get('content') or None
 
+        # Used to just try to break the header detection
+        uppercase_headers = request.args.get('uppercase_headers')
+
         try:
             if content is not None:
                 resp = make_response(content, status_code)
-                resp.headers['Content-Type'] = ctype if ctype else 'text/html'
+                if uppercase_headers:
+                    ctype=ctype.upper()
+                    resp.headers['CONTENT-TYPE'] = ctype if ctype else 'text/html'
+                else:
+                    resp.headers['Content-Type'] = ctype if ctype else 'text/html'
                 return resp
 
             # Tried using a global var here but didn't seem to work, so reading from a file instead.
             with open("test-datastore/endpoint-content.txt", "r") as f:
                 resp = make_response(f.read(), status_code)
-                resp.headers['Content-Type'] = ctype if ctype else 'text/html'
+                if uppercase_headers:
+                    resp.headers['CONTENT-TYPE'] = ctype if ctype else 'text/html'
+                else:
+                    resp.headers['Content-Type'] = ctype if ctype else 'text/html'
                 return resp
         except FileNotFoundError:
             return make_response('', status_code)
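
For reference, the same trick can be reproduced outside the test harness with a tiny standalone Flask app: Werkzeug keeps the header-name casing you set, so assigning to 'CONTENT-TYPE' emits an upper-cased header name on the wire. The route, port, and parameter names below are hypothetical; only the casing behaviour mirrors the fixture above.

from flask import Flask, make_response, request

app = Flask(__name__)

@app.route('/endpoint')
def endpoint():
    ctype = request.args.get('content_type') or 'text/html'
    resp = make_response('<html><body>{"hello" : 123, "world": 123}', 200)
    if request.args.get('uppercase_headers'):
        # The raw response will carry e.g. 'CONTENT-TYPE: APPLICATION/JSON'.
        resp.headers['CONTENT-TYPE'] = ctype.upper()
    else:
        resp.headers['Content-Type'] = ctype
    return resp

if __name__ == '__main__':
    app.run(port=5005)  # port chosen arbitrarily for this sketch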