from asgi_csrf import Errors from bs4 import BeautifulSoup as Soup from datasette.app import Datasette from datasette.utils import allowed_pragmas from .fixtures import ( # noqa app_client, app_client_base_url_prefix, app_client_shorter_time_limit, app_client_two_attached_databases, make_app_client, METADATA, ) from .utils import assert_footer_links, inner_html import copy import json import pathlib import pytest import re import urllib.parse def test_homepage(app_client_two_attached_databases): response = app_client_two_attached_databases.get("/") assert response.status_code == 200 assert "text/html; charset=utf-8" == response.headers["content-type"] # Should have a html lang="en" attribute assert '' in response.text soup = Soup(response.content, "html.parser") assert "Datasette Fixtures" == soup.find("h1").text assert ( "An example SQLite database demonstrating Datasette. Sign in as root user" == soup.select(".metadata-description")[0].text.strip() ) # Should be two attached databases assert [ {"href": "/extra+database", "text": "extra database"}, {"href": "/fixtures", "text": "fixtures"}, ] == [{"href": a["href"], "text": a.text.strip()} for a in soup.select("h2 a")] # Database should show count text and attached tables h2 = soup.select("h2")[0] assert "extra database" == h2.text.strip() counts_p, links_p = h2.find_all_next("p")[:2] assert ( "2 rows in 1 table, 5 rows in 4 hidden tables, 1 view" == counts_p.text.strip() ) # We should only show visible, not hidden tables here: table_links = [ {"href": a["href"], "text": a.text.strip()} for a in links_p.find_all("a") ] assert [ {"href": r"/extra+database/searchable", "text": "searchable"}, {"href": r"/extra+database/searchable_view", "text": "searchable_view"}, ] == table_links @pytest.mark.asyncio @pytest.mark.parametrize("path", ("/", "/-/")) async def test_homepage_alternative_location(path, tmp_path_factory): template_dir = tmp_path_factory.mktemp("templates") (template_dir / "index.html").write_text("Custom homepage", "utf-8") datasette = Datasette(template_dir=str(template_dir)) response = await datasette.client.get(path) assert response.status_code == 200 html = response.text if path == "/": assert html == "Custom homepage" else: assert '' in html @pytest.mark.asyncio async def test_homepage_alternative_redirect(ds_client): response = await ds_client.get("/-") assert response.status_code == 301 @pytest.mark.asyncio async def test_http_head(ds_client): response = await ds_client.head("/") assert response.status_code == 200 @pytest.mark.asyncio async def test_homepage_options(ds_client): response = await ds_client.options("/") assert response.status_code == 200 assert response.text == "ok" @pytest.mark.asyncio async def test_favicon(ds_client): response = await ds_client.get("/favicon.ico") assert response.status_code == 200 assert response.headers["cache-control"] == "max-age=3600, immutable, public" assert int(response.headers["content-length"]) > 100 assert response.headers["content-type"] == "image/png" @pytest.mark.asyncio async def test_static(ds_client): response = await ds_client.get("/-/static/app2.css") assert response.status_code == 404 response = await ds_client.get("/-/static/app.css") assert response.status_code == 200 assert "text/css" == response.headers["content-type"] assert "etag" in response.headers etag = response.headers.get("etag") response = await ds_client.get("/-/static/app.css", headers={"if-none-match": etag}) assert response.status_code == 304 def test_static_mounts(): with make_app_client( static_mounts=[("custom-static", str(pathlib.Path(__file__).parent))] ) as client: response = client.get("/custom-static/test_html.py") assert response.status_code == 200 response = client.get("/custom-static/not_exists.py") assert response.status_code == 404 response = client.get("/custom-static/../LICENSE") assert response.status_code == 404 def test_memory_database_page(): with make_app_client(memory=True) as client: response = client.get("/_memory") assert response.status_code == 200 def test_not_allowed_methods(): with make_app_client(memory=True) as client: for method in ("post", "put", "patch", "delete"): response = client.request(path="/_memory", method=method.upper()) assert response.status_code == 405 @pytest.mark.asyncio async def test_database_page(ds_client): response = await ds_client.get("/fixtures") soup = Soup(response.text, "html.parser") # Should have a for executing SQL assert "Tables', 'sortable', "pk, foreign_key_with_label, foreign_key_with_blank_label, ", ): assert fragment in response.text # And views views_ul = soup.find("h2", string="Views").find_next_sibling("ul") assert views_ul is not None assert [ ("/fixtures/paginated_view", "paginated_view"), ("/fixtures/searchable_view", "searchable_view"), ( "/fixtures/searchable_view_configured_by_metadata", "searchable_view_configured_by_metadata", ), ("/fixtures/simple_view", "simple_view"), ] == sorted([(a["href"], a.text) for a in views_ul.find_all("a")]) # And a list of canned queries queries_ul = soup.find("h2", string="Queries").find_next_sibling("ul") assert queries_ul is not None assert [ ("/fixtures/from_async_hook", "from_async_hook"), ("/fixtures/from_hook", "from_hook"), ("/fixtures/magic_parameters", "magic_parameters"), ("/fixtures/neighborhood_search#fragment-goes-here", "Search neighborhoods"), ("/fixtures/pragma_cache_size", "pragma_cache_size"), ( "/fixtures/~F0~9D~90~9C~F0~9D~90~A2~F0~9D~90~AD~F0~9D~90~A2~F0~9D~90~9E~F0~9D~90~AC", "𝐜𝐢𝐭𝐢𝐞𝐬", ), ] == sorted( [(a["href"], a.text) for a in queries_ul.find_all("a")], key=lambda p: p[0] ) @pytest.mark.asyncio async def test_invalid_custom_sql(ds_client): response = await ds_client.get("/fixtures/-/query?sql=.schema") assert response.status_code == 400 assert "Statement must be a SELECT" in response.text @pytest.mark.asyncio async def test_disallowed_custom_sql_pragma(ds_client): response = await ds_client.get( "/fixtures/-/query?sql=SELECT+*+FROM+pragma_not_on_allow_list('idx52')" ) assert response.status_code == 400 pragmas = ", ".join("pragma_{}()".format(pragma) for pragma in allowed_pragmas) assert ( "Statement contained a disallowed PRAGMA. Allowed pragma functions are {}".format( pragmas ) in response.text ) @pytest.mark.xfail(reason="Sometimes flaky in CI due to timing issues") def test_sql_time_limit(app_client_shorter_time_limit): response = app_client_shorter_time_limit.get( "/fixtures/-/query?sql=select+sleep(0.5)" ) assert 400 == response.status expected_html_fragments = [ """ sql_time_limit_ms """.strip(), 'select sleep(0.5)', ] for expected_html_fragment in expected_html_fragments: assert expected_html_fragment in response.text def test_row_page_does_not_truncate(): with make_app_client(settings={"truncate_cells_html": 5}) as client: response = client.get("/fixtures/facetable/1") assert response.status_code == 200 table = Soup(response.content, "html.parser").find("table") assert table["class"] == ["rows-and-columns"] assert ["Mission"] == [ td.string for td in table.find_all("td", {"class": "col-neighborhood-b352a7"}) ] def test_query_page_truncates(): with make_app_client(settings={"truncate_cells_html": 5}) as client: response = client.get( "/fixtures/-/query?" + urllib.parse.urlencode( { "sql": "select 'this is longer than 5' as a, 'https://example.com/' as b" } ) ) assert response.status_code == 200 table = Soup(response.content, "html.parser").find("table") tds = table.find_all("td") assert [str(td) for td in tds] == [ 'this …', 'http…', ] @pytest.mark.asyncio @pytest.mark.parametrize( "path,expected_classes", [ ("/", ["index"]), ("/fixtures", ["db", "db-fixtures"]), ("/fixtures/-/query?sql=select+1", ["query", "db-fixtures"]), ( "/fixtures/simple_primary_key", ["table", "db-fixtures", "table-simple_primary_key"], ), ( "/fixtures/neighborhood_search", ["query", "db-fixtures", "query-neighborhood_search"], ), ( "/fixtures/table~2Fwith~2Fslashes~2Ecsv", ["table", "db-fixtures", "table-tablewithslashescsv-fa7563"], ), ( "/fixtures/simple_primary_key/1", ["row", "db-fixtures", "table-simple_primary_key"], ), ], ) async def test_css_classes_on_body(ds_client, path, expected_classes): response = await ds_client.get(path) assert response.status_code == 200 classes = re.search(r'', response.text).group(1).split() assert classes == expected_classes templates_considered_re = re.compile(r"") @pytest.mark.asyncio @pytest.mark.parametrize( "path,expected_considered", [ ("/", "*index.html"), ("/fixtures", "database-fixtures.html, *database.html"), ( "/fixtures/simple_primary_key", "table-fixtures-simple_primary_key.html, *table.html", ), ( "/fixtures/table~2Fwith~2Fslashes~2Ecsv", "table-fixtures-tablewithslashescsv-fa7563.html, *table.html", ), ( "/fixtures/simple_primary_key/1", "row-fixtures-simple_primary_key.html, *row.html", ), ], ) async def test_templates_considered(ds_client, path, expected_considered): response = await ds_client.get(path) assert response.status_code == 200 match = templates_considered_re.search(response.text) assert match, "No templates considered comment found" actual_considered = match.group(1) assert actual_considered == expected_considered @pytest.mark.asyncio async def test_row_json_export_link(ds_client): response = await ds_client.get("/fixtures/simple_primary_key/1") assert response.status_code == 200 assert 'json' in response.text @pytest.mark.asyncio async def test_query_json_csv_export_links(ds_client): response = await ds_client.get("/fixtures/-/query?sql=select+1") assert response.status_code == 200 assert 'json' in response.text assert ( 'CSV' in response.text ) @pytest.mark.asyncio async def test_query_parameter_form_fields(ds_client): response = await ds_client.get("/fixtures/-/query?sql=select+:name") assert response.status_code == 200 assert ( 'name ' in response.text ) response2 = await ds_client.get("/fixtures/-/query?sql=select+:name&name=hello") assert response2.status_code == 200 assert ( 'name ' in response2.text ) @pytest.mark.asyncio async def test_row_html_simple_primary_key(ds_client): response = await ds_client.get("/fixtures/simple_primary_key/1") assert response.status_code == 200 table = Soup(response.content, "html.parser").find("table") assert ["id", "content"] == [th.string.strip() for th in table.select("thead th")] assert [ [ '1', 'hello', ] ] == [[str(td) for td in tr.select("td")] for tr in table.select("tbody tr")] @pytest.mark.asyncio async def test_row_html_no_primary_key(ds_client): response = await ds_client.get("/fixtures/no_primary_key/1") assert response.status_code == 200 table = Soup(response.text, "html.parser").find("table") assert ["rowid", "content", "a", "b", "c"] == [ th.string.strip() for th in table.select("thead th") ] expected = [ [ '1', '1', 'a1', 'b1', 'c1', ] ] assert expected == [ [str(td) for td in tr.select("td")] for tr in table.select("tbody tr") ] @pytest.mark.asyncio @pytest.mark.parametrize( "path,expected_text,expected_link", ( ( "/fixtures/facet_cities/1", "6 rows from _city_id in facetable", "/fixtures/facetable?_city_id__exact=1", ), ( "/fixtures/attraction_characteristic/2", "3 rows from characteristic_id in roadside_attraction_characteristics", "/fixtures/roadside_attraction_characteristics?characteristic_id=2", ), ), ) async def test_row_links_from_other_tables( ds_client, path, expected_text, expected_link ): response = await ds_client.get(path) assert response.status_code == 200 soup = Soup(response.text, "html.parser") h2 = soup.find("h2") assert h2.text == "Links from other tables" li = h2.find_next("ul").find("li") text = re.sub(r"\s+", " ", li.text.strip()) assert text == expected_text link = li.find("a")["href"] assert link == expected_link @pytest.mark.asyncio @pytest.mark.parametrize( "path,expected", ( ( "/fixtures/compound_primary_key/a,b", [ [ 'a', 'b', 'c', ] ], ), ( "/fixtures/compound_primary_key/a~2Fb,~2Ec~2Dd", [ [ 'a/b', '.c-d', 'c', ] ], ), ), ) async def test_row_html_compound_primary_key(ds_client, path, expected): response = await ds_client.get(path) assert response.status_code == 200 table = Soup(response.text, "html.parser").find("table") assert ["pk1", "pk2", "content"] == [ th.string.strip() for th in table.select("thead th") ] assert expected == [ [str(td) for td in tr.select("td")] for tr in table.select("tbody tr") ] @pytest.mark.asyncio async def test_index_metadata(ds_client): response = await ds_client.get("/") assert response.status_code == 200 soup = Soup(response.text, "html.parser") assert "Datasette Fixtures" == soup.find("h1").text assert ( 'An example SQLite database demonstrating Datasette. Sign in as root user' == inner_html(soup.find("div", {"class": "metadata-description"})) ) assert_footer_links(soup) @pytest.mark.asyncio async def test_database_metadata(ds_client): response = await ds_client.get("/fixtures") assert response.status_code == 200 soup = Soup(response.text, "html.parser") # Page title should be the default assert "fixtures" == soup.find("h1").text # Description should be custom assert "Test tables description" == inner_html( soup.find("div", {"class": "metadata-description"}) ) # The source/license should be inherited # assert_footer_links(soup) TODO(alex) ensure @pytest.mark.asyncio async def test_database_metadata_with_custom_sql(ds_client): response = await ds_client.get( "/fixtures/-/query?sql=select+*+from+simple_primary_key" ) assert response.status_code == 200 soup = Soup(response.text, "html.parser") # Page title should be the default assert "fixtures" == soup.find("h1").text # Description should be custom assert "Custom SQL query returning" in soup.find("h3").text # The source/license should be inherited # assert_footer_links(soup)TODO(alex) ensure def test_database_download_for_immutable(): with make_app_client(is_immutable=True) as client: assert not client.ds.databases["fixtures"].is_mutable # Regular page should have a download link response = client.get("/fixtures") soup = Soup(response.content, "html.parser") assert len(soup.find_all("a", {"href": re.compile(r"\.db$")})) # Check we can actually download it download_response = client.get("/fixtures.db") assert download_response.status_code == 200 # Check the content-length header exists assert "content-length" in download_response.headers content_length = download_response.headers["content-length"] assert content_length.isdigit() assert int(content_length) > 100 assert "content-disposition" in download_response.headers assert ( download_response.headers["content-disposition"] == 'attachment; filename="fixtures.db"' ) assert download_response.headers["transfer-encoding"] == "chunked" # ETag header should be present and match db.hash assert "etag" in download_response.headers etag = download_response.headers["etag"] assert etag == '"{}"'.format(client.ds.databases["fixtures"].hash) # Try a second download with If-None-Match: current-etag download_response2 = client.get("/fixtures.db", if_none_match=etag) assert download_response2.body == b"" assert download_response2.status == 304 def test_database_download_disallowed_for_mutable(app_client): # Use app_client because we need a file database, not in-memory response = app_client.get("/fixtures") soup = Soup(response.content, "html.parser") assert len(soup.find_all("a", {"href": re.compile(r"\.db$")})) == 0 assert app_client.get("/fixtures.db").status_code == 403 def test_database_download_disallowed_for_memory(): with make_app_client(memory=True) as client: # Memory page should NOT have a download link response = client.get("/_memory") soup = Soup(response.content, "html.parser") assert 0 == len(soup.find_all("a", {"href": re.compile(r"\.db$")})) assert 404 == client.get("/_memory.db").status def test_allow_download_off(): with make_app_client( is_immutable=True, settings={"allow_download": False} ) as client: response = client.get("/fixtures") soup = Soup(response.content, "html.parser") assert not len(soup.find_all("a", {"href": re.compile(r"\.db$")})) # Accessing URL directly should 403 response = client.get("/fixtures.db") assert 403 == response.status def test_allow_sql_off(): with make_app_client(config={"allow_sql": {}}) as client: response = client.get("/fixtures") soup = Soup(response.content, "html.parser") assert not len(soup.find_all("textarea", {"name": "sql"})) # The table page should no longer show "View and edit SQL" response = client.get("/fixtures/sortable") assert b"View and edit SQL" not in response.content @pytest.mark.asyncio @pytest.mark.parametrize("path", ["/404", "/fixtures/404"]) async def test_404(ds_client, path): response = await ds_client.get(path) assert response.status_code == 404 assert ( f' Demonstrating simple like search """.strip() == soup.find("div", {"class": "metadata-description"}).prettify().strip() ) @pytest.mark.asyncio async def test_urlify_custom_queries(ds_client): path = "/fixtures/-/query?" + urllib.parse.urlencode( {"sql": "select ('https://twitter.com/' || 'simonw') as user_url;"} ) response = await ds_client.get(path) assert response.status_code == 200 soup = Soup(response.content, "html.parser") assert ( """ https://twitter.com/simonw """ == soup.find("td", {"class": "col-user_url"}).prettify().strip() ) @pytest.mark.asyncio async def test_show_hide_sql_query(ds_client): path = "/fixtures/-/query?" + urllib.parse.urlencode( {"sql": "select ('https://twitter.com/' || 'simonw') as user_url;"} ) response = await ds_client.get(path) soup = Soup(response.content, "html.parser") span = soup.select(".show-hide-sql")[0] assert span.find("a")["href"].endswith("&_hide_sql=1") assert "(hide)" == span.getText() assert soup.find("textarea") is not None # Now follow the link to hide it response = await ds_client.get(span.find("a")["href"]) soup = Soup(response.content, "html.parser") span = soup.select(".show-hide-sql")[0] assert not span.find("a")["href"].endswith("&_hide_sql=1") assert "(show)" == span.getText() assert soup.find("textarea") is None # The SQL should still be there in a hidden form field hiddens = soup.find("form").select("input[type=hidden]") assert [ ("sql", "select ('https://twitter.com/' || 'simonw') as user_url;"), ("_hide_sql", "1"), ] == [(hidden["name"], hidden["value"]) for hidden in hiddens] @pytest.mark.asyncio async def test_canned_query_with_hide_has_no_hidden_sql(ds_client): # For a canned query the show/hide should NOT have a hidden SQL field # https://github.com/simonw/datasette/issues/1411 response = await ds_client.get("/fixtures/pragma_cache_size?_hide_sql=1") soup = Soup(response.content, "html.parser") hiddens = soup.find("form").select("input[type=hidden]") assert [ ("_hide_sql", "1"), ] == [(hidden["name"], hidden["value"]) for hidden in hiddens] @pytest.mark.parametrize( "hide_sql,querystring,expected_hidden,expected_show_hide_link,expected_show_hide_text", ( (False, "", None, "/_memory/one?_hide_sql=1", "hide"), (False, "?_hide_sql=1", "_hide_sql", "/_memory/one", "show"), (True, "", None, "/_memory/one?_show_sql=1", "show"), (True, "?_show_sql=1", "_show_sql", "/_memory/one", "hide"), ), ) def test_canned_query_show_hide_metadata_option( hide_sql, querystring, expected_hidden, expected_show_hide_link, expected_show_hide_text, ): with make_app_client( config={ "databases": { "_memory": { "queries": { "one": { "sql": "select 1 + 1", "hide_sql": hide_sql, } } } } }, memory=True, ) as client: expected_show_hide_fragment = '({})'.format( expected_show_hide_link, expected_show_hide_text ) response = client.get("/_memory/one" + querystring) html = response.text show_hide_fragment = html.split('')[1].split( "" )[0] assert show_hide_fragment == expected_show_hide_fragment if expected_hidden: assert ( ''.format(expected_hidden) in html ) else: assert '<Binary:\xa07\xa0bytes>' ], [ '<Binary:\xa07\xa0bytes>' ], ['\xa0'], ] assert expected_tds == [ [str(td) for td in tr.select("td")] for tr in table.select("tbody tr") ] @pytest.mark.asyncio @pytest.mark.parametrize( "path,expected_filename", [ ("/fixtures/binary_data/1.blob?_blob_column=data", "binary_data-1-data.blob"), ( "/fixtures/-/query.blob?sql=select+*+from+binary_data&_blob_column=data&_blob_hash=f3088978da8f9aea479ffc7f631370b968d2e855eeb172bea7f6c7a04262bb6d", "data-f30889.blob", ), ], ) async def test_blob_download(ds_client, path, expected_filename): response = await ds_client.get(path) assert response.status_code == 200 assert response.content == b"\x15\x1c\x02\xc7\xad\x05\xfe" assert response.headers["x-content-type-options"] == "nosniff" assert ( response.headers["content-disposition"] == f'attachment; filename="{expected_filename}"' ) assert response.headers["content-type"] == "application/binary" @pytest.mark.asyncio @pytest.mark.parametrize( "path,expected_message", [ ("/fixtures/binary_data/1.blob", "?_blob_column= is required"), ("/fixtures/binary_data/1.blob?_blob_column=foo", "foo is not a valid column"), ( "/fixtures/binary_data/1.blob?_blob_column=data&_blob_hash=x", "Link has expired - the requested binary content has changed or could not be found.", ), ], ) async def test_blob_download_invalid_messages(ds_client, path, expected_message): response = await ds_client.get(path) assert response.status_code == 400 assert expected_message in response.text @pytest.mark.asyncio @pytest.mark.parametrize( "path", [ "/fixtures/-/query?sql=select+*+from+[123_starts_with_digits]", "/fixtures/123_starts_with_digits", ], ) async def test_zero_results(ds_client, path): response = await ds_client.get(path) soup = Soup(response.text, "html.parser") assert 0 == len(soup.select("table")) assert 1 == len(soup.select("p.zero-results")) @pytest.mark.asyncio async def test_query_error(ds_client): response = await ds_client.get("/fixtures/-/query?sql=select+*+from+notatable") html = response.text assert 'no such table: notatable' in html assert 'select * from notatable" in html assert "0 results" not in html def test_config_template_debug_on(): with make_app_client(settings={"template_debug": True}) as client: response = client.get("/fixtures/facetable?_context=1") assert response.status_code == 200 assert response.text.startswith("{") @pytest.mark.asyncio async def test_config_template_debug_off(ds_client): response = await ds_client.get("/fixtures/facetable?_context=1") assert response.status_code == 200 assert not response.text.startswith("{") def test_debug_context_includes_extra_template_vars(): # https://github.com/simonw/datasette/issues/693 with make_app_client(settings={"template_debug": True}) as client: response = client.get("/fixtures/facetable?_context=1") # scope_path is added by PLUGIN1 assert "scope_path" in response.text @pytest.mark.parametrize( "path", [ "/", "/fixtures", "/fixtures/compound_three_primary_keys", "/fixtures/compound_three_primary_keys/a,a,a", "/fixtures/paginated_view", "/fixtures/facetable", "/fixtures/facetable?_facet=state", "/fixtures/-/query?sql=select+1", ], ) @pytest.mark.parametrize("use_prefix", (True, False)) def test_base_url_config(app_client_base_url_prefix, path, use_prefix): client = app_client_base_url_prefix path_to_get = path if use_prefix: path_to_get = "/prefix/" + path.lstrip("/") response = client.get(path_to_get) soup = Soup(response.content, "html.parser") for form in soup.select("form"): assert form["action"].startswith("/prefix") for el in soup.find_all(["a", "link", "script"]): if "href" in el.attrs: href = el["href"] elif "src" in el.attrs: href = el["src"] else: continue # Could be a if ( not href.startswith("#") and href not in { "https://datasette.io/", "https://github.com/simonw/datasette", "https://github.com/simonw/datasette/blob/main/LICENSE", "https://github.com/simonw/datasette/blob/main/tests/fixtures.py", "/login-as-root", # Only used for the latest.datasette.io demo } and not href.startswith("https://plugin-example.datasette.io/") ): # If this has been made absolute it may start http://localhost/ if href.startswith("http://localhost/"): href = href[len("http://localost/") :] assert href.startswith("/prefix/"), json.dumps( { "path": path, "path_to_get": path_to_get, "href_or_src": href, "element_parent": str(el.parent), }, indent=4, default=repr, ) def test_base_url_affects_filter_redirects(app_client_base_url_prefix): path = "/fixtures/binary_data?_filter_column=rowid&_filter_op=exact&_filter_value=1&_sort=rowid" response = app_client_base_url_prefix.get(path) assert response.status_code == 302 assert ( response.headers["location"] == "/prefix/fixtures/binary_data?_sort=rowid&rowid__exact=1" ) def test_base_url_affects_metadata_extra_css_urls(app_client_base_url_prefix): html = app_client_base_url_prefix.get("/").text assert '' in html @pytest.mark.asyncio @pytest.mark.parametrize( "path,expected", [ ( "/fixtures/neighborhood_search", "/fixtures/-/query?sql=%0Aselect+_neighborhood%2C+facet_cities.name%2C+state%0Afrom+facetable%0A++++join+facet_cities%0A++++++++on+facetable._city_id+%3D+facet_cities.id%0Awhere+_neighborhood+like+%27%25%27+%7C%7C+%3Atext+%7C%7C+%27%25%27%0Aorder+by+_neighborhood%3B%0A&text=", ), ( "/fixtures/neighborhood_search?text=ber", "/fixtures/-/query?sql=%0Aselect+_neighborhood%2C+facet_cities.name%2C+state%0Afrom+facetable%0A++++join+facet_cities%0A++++++++on+facetable._city_id+%3D+facet_cities.id%0Awhere+_neighborhood+like+%27%25%27+%7C%7C+%3Atext+%7C%7C+%27%25%27%0Aorder+by+_neighborhood%3B%0A&text=ber", ), ("/fixtures/pragma_cache_size", None), ( # /fixtures/𝐜𝐢𝐭𝐢𝐞𝐬 "/fixtures/~F0~9D~90~9C~F0~9D~90~A2~F0~9D~90~AD~F0~9D~90~A2~F0~9D~90~9E~F0~9D~90~AC", "/fixtures/-/query?sql=select+id%2C+name+from+facet_cities+order+by+id+limit+1%3B", ), ("/fixtures/magic_parameters", None), ], ) async def test_edit_sql_link_on_canned_queries(ds_client, path, expected): response = await ds_client.get(path) assert response.status_code == 200 expected_link = f'Edit SQL' if expected: assert expected_link in response.text else: assert "Edit SQL" not in response.text @pytest.mark.parametrize("permission_allowed", [True, False]) def test_edit_sql_link_not_shown_if_user_lacks_permission(permission_allowed): with make_app_client( config={ "allow_sql": None if permission_allowed else {"id": "not-you"}, "databases": {"fixtures": {"queries": {"simple": "select 1 + 1"}}}, } ) as client: response = client.get("/fixtures/simple") if permission_allowed: assert "Edit SQL" in response.text else: assert "Edit SQL" not in response.text @pytest.mark.asyncio @pytest.mark.parametrize( "actor_id,should_have_links,should_not_have_links", [ (None, None, None), ("test", None, ["/-/permissions"]), ("root", ["/-/permissions", "/-/allow-debug"], None), ], ) async def test_navigation_menu_links( ds_client, actor_id, should_have_links, should_not_have_links ): cookies = {} if actor_id: cookies = {"ds_actor": ds_client.actor_cookie({"id": actor_id})} html = (await ds_client.get("/", cookies=cookies)).text soup = Soup(html, "html.parser") details = soup.find("nav").find("details") if not actor_id: # Should not show a menu assert details is None return # They are logged in: should show a menu assert details is not None # And a logout form assert details.find("form") is not None if should_have_links: for link in should_have_links: assert ( details.find("a", {"href": link}) is not None ), f"{link} expected but missing from nav menu" if should_not_have_links: for link in should_not_have_links: assert ( details.find("a", {"href": link}) is None ), f"{link} found but should not have been in nav menu" @pytest.mark.asyncio async def test_trace_correctly_escaped(ds_client): response = await ds_client.get("/fixtures/-/query?sql=select+'Hello'&_trace=1") assert "select 'Hello" not in response.text assert "select '<h1>Hello" in response.text @pytest.mark.asyncio @pytest.mark.parametrize( "path,expected", ( # Instance index page ("/", "http://localhost/.json"), # Table page ("/fixtures/facetable", "http://localhost/fixtures/facetable.json"), ( "/fixtures/table~2Fwith~2Fslashes~2Ecsv", "http://localhost/fixtures/table~2Fwith~2Fslashes~2Ecsv.json", ), # Row page ( "/fixtures/no_primary_key/1", "http://localhost/fixtures/no_primary_key/1.json", ), # Database index page ( "/fixtures", "http://localhost/fixtures.json", ), # Custom query page ( "/fixtures/-/query?sql=select+*+from+facetable", "http://localhost/fixtures/-/query.json?sql=select+*+from+facetable", ), # Canned query page ( "/fixtures/neighborhood_search?text=town", "http://localhost/fixtures/neighborhood_search.json?text=town", ), # /-/ pages ( "/-/plugins", "http://localhost/-/plugins.json", ), ), ) async def test_alternate_url_json(ds_client, path, expected): response = await ds_client.get(path) assert response.status_code == 200 link = response.headers["link"] assert link == '<{}>; rel="alternate"; type="application/json+datasette"'.format( expected ) assert ( ''.format( expected ) in response.text ) @pytest.mark.asyncio @pytest.mark.parametrize( "path", ("/-/patterns", "/-/messages", "/-/allow-debug", "/fixtures.db"), ) async def test_no_alternate_url_json(ds_client, path): response = await ds_client.get(path) assert "link" not in response.headers assert ( '
pk, foreign_key_with_label, foreign_key_with_blank_label, ", ): assert fragment in response.text # And views views_ul = soup.find("h2", string="Views").find_next_sibling("ul") assert views_ul is not None assert [ ("/fixtures/paginated_view", "paginated_view"), ("/fixtures/searchable_view", "searchable_view"), ( "/fixtures/searchable_view_configured_by_metadata", "searchable_view_configured_by_metadata", ), ("/fixtures/simple_view", "simple_view"), ] == sorted([(a["href"], a.text) for a in views_ul.find_all("a")]) # And a list of canned queries queries_ul = soup.find("h2", string="Queries").find_next_sibling("ul") assert queries_ul is not None assert [ ("/fixtures/from_async_hook", "from_async_hook"), ("/fixtures/from_hook", "from_hook"), ("/fixtures/magic_parameters", "magic_parameters"), ("/fixtures/neighborhood_search#fragment-goes-here", "Search neighborhoods"), ("/fixtures/pragma_cache_size", "pragma_cache_size"), ( "/fixtures/~F0~9D~90~9C~F0~9D~90~A2~F0~9D~90~AD~F0~9D~90~A2~F0~9D~90~9E~F0~9D~90~AC", "𝐜𝐢𝐭𝐢𝐞𝐬", ), ] == sorted( [(a["href"], a.text) for a in queries_ul.find_all("a")], key=lambda p: p[0] ) @pytest.mark.asyncio async def test_invalid_custom_sql(ds_client): response = await ds_client.get("/fixtures/-/query?sql=.schema") assert response.status_code == 400 assert "Statement must be a SELECT" in response.text @pytest.mark.asyncio async def test_disallowed_custom_sql_pragma(ds_client): response = await ds_client.get( "/fixtures/-/query?sql=SELECT+*+FROM+pragma_not_on_allow_list('idx52')" ) assert response.status_code == 400 pragmas = ", ".join("pragma_{}()".format(pragma) for pragma in allowed_pragmas) assert ( "Statement contained a disallowed PRAGMA. Allowed pragma functions are {}".format( pragmas ) in response.text ) @pytest.mark.xfail(reason="Sometimes flaky in CI due to timing issues") def test_sql_time_limit(app_client_shorter_time_limit): response = app_client_shorter_time_limit.get( "/fixtures/-/query?sql=select+sleep(0.5)" ) assert 400 == response.status expected_html_fragments = [ """ sql_time_limit_ms """.strip(), 'select sleep(0.5)', ] for expected_html_fragment in expected_html_fragments: assert expected_html_fragment in response.text def test_row_page_does_not_truncate(): with make_app_client(settings={"truncate_cells_html": 5}) as client: response = client.get("/fixtures/facetable/1") assert response.status_code == 200 table = Soup(response.content, "html.parser").find("table") assert table["class"] == ["rows-and-columns"] assert ["Mission"] == [ td.string for td in table.find_all("td", {"class": "col-neighborhood-b352a7"}) ] def test_query_page_truncates(): with make_app_client(settings={"truncate_cells_html": 5}) as client: response = client.get( "/fixtures/-/query?" + urllib.parse.urlencode( { "sql": "select 'this is longer than 5' as a, 'https://example.com/' as b" } ) ) assert response.status_code == 200 table = Soup(response.content, "html.parser").find("table") tds = table.find_all("td") assert [str(td) for td in tds] == [ 'this …', 'http…', ] @pytest.mark.asyncio @pytest.mark.parametrize( "path,expected_classes", [ ("/", ["index"]), ("/fixtures", ["db", "db-fixtures"]), ("/fixtures/-/query?sql=select+1", ["query", "db-fixtures"]), ( "/fixtures/simple_primary_key", ["table", "db-fixtures", "table-simple_primary_key"], ), ( "/fixtures/neighborhood_search", ["query", "db-fixtures", "query-neighborhood_search"], ), ( "/fixtures/table~2Fwith~2Fslashes~2Ecsv", ["table", "db-fixtures", "table-tablewithslashescsv-fa7563"], ), ( "/fixtures/simple_primary_key/1", ["row", "db-fixtures", "table-simple_primary_key"], ), ], ) async def test_css_classes_on_body(ds_client, path, expected_classes): response = await ds_client.get(path) assert response.status_code == 200 classes = re.search(r'', response.text).group(1).split() assert classes == expected_classes templates_considered_re = re.compile(r"") @pytest.mark.asyncio @pytest.mark.parametrize( "path,expected_considered", [ ("/", "*index.html"), ("/fixtures", "database-fixtures.html, *database.html"), ( "/fixtures/simple_primary_key", "table-fixtures-simple_primary_key.html, *table.html", ), ( "/fixtures/table~2Fwith~2Fslashes~2Ecsv", "table-fixtures-tablewithslashescsv-fa7563.html, *table.html", ), ( "/fixtures/simple_primary_key/1", "row-fixtures-simple_primary_key.html, *row.html", ), ], ) async def test_templates_considered(ds_client, path, expected_considered): response = await ds_client.get(path) assert response.status_code == 200 match = templates_considered_re.search(response.text) assert match, "No templates considered comment found" actual_considered = match.group(1) assert actual_considered == expected_considered @pytest.mark.asyncio async def test_row_json_export_link(ds_client): response = await ds_client.get("/fixtures/simple_primary_key/1") assert response.status_code == 200 assert 'json' in response.text @pytest.mark.asyncio async def test_query_json_csv_export_links(ds_client): response = await ds_client.get("/fixtures/-/query?sql=select+1") assert response.status_code == 200 assert 'json' in response.text assert ( 'CSV' in response.text ) @pytest.mark.asyncio async def test_query_parameter_form_fields(ds_client): response = await ds_client.get("/fixtures/-/query?sql=select+:name") assert response.status_code == 200 assert ( 'name ' in response.text ) response2 = await ds_client.get("/fixtures/-/query?sql=select+:name&name=hello") assert response2.status_code == 200 assert ( 'name ' in response2.text ) @pytest.mark.asyncio async def test_row_html_simple_primary_key(ds_client): response = await ds_client.get("/fixtures/simple_primary_key/1") assert response.status_code == 200 table = Soup(response.content, "html.parser").find("table") assert ["id", "content"] == [th.string.strip() for th in table.select("thead th")] assert [ [ '1', 'hello', ] ] == [[str(td) for td in tr.select("td")] for tr in table.select("tbody tr")] @pytest.mark.asyncio async def test_row_html_no_primary_key(ds_client): response = await ds_client.get("/fixtures/no_primary_key/1") assert response.status_code == 200 table = Soup(response.text, "html.parser").find("table") assert ["rowid", "content", "a", "b", "c"] == [ th.string.strip() for th in table.select("thead th") ] expected = [ [ '1', '1', 'a1', 'b1', 'c1', ] ] assert expected == [ [str(td) for td in tr.select("td")] for tr in table.select("tbody tr") ] @pytest.mark.asyncio @pytest.mark.parametrize( "path,expected_text,expected_link", ( ( "/fixtures/facet_cities/1", "6 rows from _city_id in facetable", "/fixtures/facetable?_city_id__exact=1", ), ( "/fixtures/attraction_characteristic/2", "3 rows from characteristic_id in roadside_attraction_characteristics", "/fixtures/roadside_attraction_characteristics?characteristic_id=2", ), ), ) async def test_row_links_from_other_tables( ds_client, path, expected_text, expected_link ): response = await ds_client.get(path) assert response.status_code == 200 soup = Soup(response.text, "html.parser") h2 = soup.find("h2") assert h2.text == "Links from other tables" li = h2.find_next("ul").find("li") text = re.sub(r"\s+", " ", li.text.strip()) assert text == expected_text link = li.find("a")["href"] assert link == expected_link @pytest.mark.asyncio @pytest.mark.parametrize( "path,expected", ( ( "/fixtures/compound_primary_key/a,b", [ [ 'a', 'b', 'c', ] ], ), ( "/fixtures/compound_primary_key/a~2Fb,~2Ec~2Dd", [ [ 'a/b', '.c-d', 'c', ] ], ), ), ) async def test_row_html_compound_primary_key(ds_client, path, expected): response = await ds_client.get(path) assert response.status_code == 200 table = Soup(response.text, "html.parser").find("table") assert ["pk1", "pk2", "content"] == [ th.string.strip() for th in table.select("thead th") ] assert expected == [ [str(td) for td in tr.select("td")] for tr in table.select("tbody tr") ] @pytest.mark.asyncio async def test_index_metadata(ds_client): response = await ds_client.get("/") assert response.status_code == 200 soup = Soup(response.text, "html.parser") assert "Datasette Fixtures" == soup.find("h1").text assert ( 'An example SQLite database demonstrating Datasette. Sign in as root user' == inner_html(soup.find("div", {"class": "metadata-description"})) ) assert_footer_links(soup) @pytest.mark.asyncio async def test_database_metadata(ds_client): response = await ds_client.get("/fixtures") assert response.status_code == 200 soup = Soup(response.text, "html.parser") # Page title should be the default assert "fixtures" == soup.find("h1").text # Description should be custom assert "Test tables description" == inner_html( soup.find("div", {"class": "metadata-description"}) ) # The source/license should be inherited # assert_footer_links(soup) TODO(alex) ensure @pytest.mark.asyncio async def test_database_metadata_with_custom_sql(ds_client): response = await ds_client.get( "/fixtures/-/query?sql=select+*+from+simple_primary_key" ) assert response.status_code == 200 soup = Soup(response.text, "html.parser") # Page title should be the default assert "fixtures" == soup.find("h1").text # Description should be custom assert "Custom SQL query returning" in soup.find("h3").text # The source/license should be inherited # assert_footer_links(soup)TODO(alex) ensure def test_database_download_for_immutable(): with make_app_client(is_immutable=True) as client: assert not client.ds.databases["fixtures"].is_mutable # Regular page should have a download link response = client.get("/fixtures") soup = Soup(response.content, "html.parser") assert len(soup.find_all("a", {"href": re.compile(r"\.db$")})) # Check we can actually download it download_response = client.get("/fixtures.db") assert download_response.status_code == 200 # Check the content-length header exists assert "content-length" in download_response.headers content_length = download_response.headers["content-length"] assert content_length.isdigit() assert int(content_length) > 100 assert "content-disposition" in download_response.headers assert ( download_response.headers["content-disposition"] == 'attachment; filename="fixtures.db"' ) assert download_response.headers["transfer-encoding"] == "chunked" # ETag header should be present and match db.hash assert "etag" in download_response.headers etag = download_response.headers["etag"] assert etag == '"{}"'.format(client.ds.databases["fixtures"].hash) # Try a second download with If-None-Match: current-etag download_response2 = client.get("/fixtures.db", if_none_match=etag) assert download_response2.body == b"" assert download_response2.status == 304 def test_database_download_disallowed_for_mutable(app_client): # Use app_client because we need a file database, not in-memory response = app_client.get("/fixtures") soup = Soup(response.content, "html.parser") assert len(soup.find_all("a", {"href": re.compile(r"\.db$")})) == 0 assert app_client.get("/fixtures.db").status_code == 403 def test_database_download_disallowed_for_memory(): with make_app_client(memory=True) as client: # Memory page should NOT have a download link response = client.get("/_memory") soup = Soup(response.content, "html.parser") assert 0 == len(soup.find_all("a", {"href": re.compile(r"\.db$")})) assert 404 == client.get("/_memory.db").status def test_allow_download_off(): with make_app_client( is_immutable=True, settings={"allow_download": False} ) as client: response = client.get("/fixtures") soup = Soup(response.content, "html.parser") assert not len(soup.find_all("a", {"href": re.compile(r"\.db$")})) # Accessing URL directly should 403 response = client.get("/fixtures.db") assert 403 == response.status def test_allow_sql_off(): with make_app_client(config={"allow_sql": {}}) as client: response = client.get("/fixtures") soup = Soup(response.content, "html.parser") assert not len(soup.find_all("textarea", {"name": "sql"})) # The table page should no longer show "View and edit SQL" response = client.get("/fixtures/sortable") assert b"View and edit SQL" not in response.content @pytest.mark.asyncio @pytest.mark.parametrize("path", ["/404", "/fixtures/404"]) async def test_404(ds_client, path): response = await ds_client.get(path) assert response.status_code == 404 assert ( f' Demonstrating simple like search """.strip() == soup.find("div", {"class": "metadata-description"}).prettify().strip() ) @pytest.mark.asyncio async def test_urlify_custom_queries(ds_client): path = "/fixtures/-/query?" + urllib.parse.urlencode( {"sql": "select ('https://twitter.com/' || 'simonw') as user_url;"} ) response = await ds_client.get(path) assert response.status_code == 200 soup = Soup(response.content, "html.parser") assert ( """ https://twitter.com/simonw """ == soup.find("td", {"class": "col-user_url"}).prettify().strip() ) @pytest.mark.asyncio async def test_show_hide_sql_query(ds_client): path = "/fixtures/-/query?" + urllib.parse.urlencode( {"sql": "select ('https://twitter.com/' || 'simonw') as user_url;"} ) response = await ds_client.get(path) soup = Soup(response.content, "html.parser") span = soup.select(".show-hide-sql")[0] assert span.find("a")["href"].endswith("&_hide_sql=1") assert "(hide)" == span.getText() assert soup.find("textarea") is not None # Now follow the link to hide it response = await ds_client.get(span.find("a")["href"]) soup = Soup(response.content, "html.parser") span = soup.select(".show-hide-sql")[0] assert not span.find("a")["href"].endswith("&_hide_sql=1") assert "(show)" == span.getText() assert soup.find("textarea") is None # The SQL should still be there in a hidden form field hiddens = soup.find("form").select("input[type=hidden]") assert [ ("sql", "select ('https://twitter.com/' || 'simonw') as user_url;"), ("_hide_sql", "1"), ] == [(hidden["name"], hidden["value"]) for hidden in hiddens] @pytest.mark.asyncio async def test_canned_query_with_hide_has_no_hidden_sql(ds_client): # For a canned query the show/hide should NOT have a hidden SQL field # https://github.com/simonw/datasette/issues/1411 response = await ds_client.get("/fixtures/pragma_cache_size?_hide_sql=1") soup = Soup(response.content, "html.parser") hiddens = soup.find("form").select("input[type=hidden]") assert [ ("_hide_sql", "1"), ] == [(hidden["name"], hidden["value"]) for hidden in hiddens] @pytest.mark.parametrize( "hide_sql,querystring,expected_hidden,expected_show_hide_link,expected_show_hide_text", ( (False, "", None, "/_memory/one?_hide_sql=1", "hide"), (False, "?_hide_sql=1", "_hide_sql", "/_memory/one", "show"), (True, "", None, "/_memory/one?_show_sql=1", "show"), (True, "?_show_sql=1", "_show_sql", "/_memory/one", "hide"), ), ) def test_canned_query_show_hide_metadata_option( hide_sql, querystring, expected_hidden, expected_show_hide_link, expected_show_hide_text, ): with make_app_client( config={ "databases": { "_memory": { "queries": { "one": { "sql": "select 1 + 1", "hide_sql": hide_sql, } } } } }, memory=True, ) as client: expected_show_hide_fragment = '({})'.format( expected_show_hide_link, expected_show_hide_text ) response = client.get("/_memory/one" + querystring) html = response.text show_hide_fragment = html.split('')[1].split( "" )[0] assert show_hide_fragment == expected_show_hide_fragment if expected_hidden: assert ( ''.format(expected_hidden) in html ) else: assert '<Binary:\xa07\xa0bytes>' ], [ '<Binary:\xa07\xa0bytes>' ], ['\xa0'], ] assert expected_tds == [ [str(td) for td in tr.select("td")] for tr in table.select("tbody tr") ] @pytest.mark.asyncio @pytest.mark.parametrize( "path,expected_filename", [ ("/fixtures/binary_data/1.blob?_blob_column=data", "binary_data-1-data.blob"), ( "/fixtures/-/query.blob?sql=select+*+from+binary_data&_blob_column=data&_blob_hash=f3088978da8f9aea479ffc7f631370b968d2e855eeb172bea7f6c7a04262bb6d", "data-f30889.blob", ), ], ) async def test_blob_download(ds_client, path, expected_filename): response = await ds_client.get(path) assert response.status_code == 200 assert response.content == b"\x15\x1c\x02\xc7\xad\x05\xfe" assert response.headers["x-content-type-options"] == "nosniff" assert ( response.headers["content-disposition"] == f'attachment; filename="{expected_filename}"' ) assert response.headers["content-type"] == "application/binary" @pytest.mark.asyncio @pytest.mark.parametrize( "path,expected_message", [ ("/fixtures/binary_data/1.blob", "?_blob_column= is required"), ("/fixtures/binary_data/1.blob?_blob_column=foo", "foo is not a valid column"), ( "/fixtures/binary_data/1.blob?_blob_column=data&_blob_hash=x", "Link has expired - the requested binary content has changed or could not be found.", ), ], ) async def test_blob_download_invalid_messages(ds_client, path, expected_message): response = await ds_client.get(path) assert response.status_code == 400 assert expected_message in response.text @pytest.mark.asyncio @pytest.mark.parametrize( "path", [ "/fixtures/-/query?sql=select+*+from+[123_starts_with_digits]", "/fixtures/123_starts_with_digits", ], ) async def test_zero_results(ds_client, path): response = await ds_client.get(path) soup = Soup(response.text, "html.parser") assert 0 == len(soup.select("table")) assert 1 == len(soup.select("p.zero-results")) @pytest.mark.asyncio async def test_query_error(ds_client): response = await ds_client.get("/fixtures/-/query?sql=select+*+from+notatable") html = response.text assert 'no such table: notatable' in html assert 'select * from notatable" in html assert "0 results" not in html def test_config_template_debug_on(): with make_app_client(settings={"template_debug": True}) as client: response = client.get("/fixtures/facetable?_context=1") assert response.status_code == 200 assert response.text.startswith("{") @pytest.mark.asyncio async def test_config_template_debug_off(ds_client): response = await ds_client.get("/fixtures/facetable?_context=1") assert response.status_code == 200 assert not response.text.startswith("{") def test_debug_context_includes_extra_template_vars(): # https://github.com/simonw/datasette/issues/693 with make_app_client(settings={"template_debug": True}) as client: response = client.get("/fixtures/facetable?_context=1") # scope_path is added by PLUGIN1 assert "scope_path" in response.text @pytest.mark.parametrize( "path", [ "/", "/fixtures", "/fixtures/compound_three_primary_keys", "/fixtures/compound_three_primary_keys/a,a,a", "/fixtures/paginated_view", "/fixtures/facetable", "/fixtures/facetable?_facet=state", "/fixtures/-/query?sql=select+1", ], ) @pytest.mark.parametrize("use_prefix", (True, False)) def test_base_url_config(app_client_base_url_prefix, path, use_prefix): client = app_client_base_url_prefix path_to_get = path if use_prefix: path_to_get = "/prefix/" + path.lstrip("/") response = client.get(path_to_get) soup = Soup(response.content, "html.parser") for form in soup.select("form"): assert form["action"].startswith("/prefix") for el in soup.find_all(["a", "link", "script"]): if "href" in el.attrs: href = el["href"] elif "src" in el.attrs: href = el["src"] else: continue # Could be a if ( not href.startswith("#") and href not in { "https://datasette.io/", "https://github.com/simonw/datasette", "https://github.com/simonw/datasette/blob/main/LICENSE", "https://github.com/simonw/datasette/blob/main/tests/fixtures.py", "/login-as-root", # Only used for the latest.datasette.io demo } and not href.startswith("https://plugin-example.datasette.io/") ): # If this has been made absolute it may start http://localhost/ if href.startswith("http://localhost/"): href = href[len("http://localost/") :] assert href.startswith("/prefix/"), json.dumps( { "path": path, "path_to_get": path_to_get, "href_or_src": href, "element_parent": str(el.parent), }, indent=4, default=repr, ) def test_base_url_affects_filter_redirects(app_client_base_url_prefix): path = "/fixtures/binary_data?_filter_column=rowid&_filter_op=exact&_filter_value=1&_sort=rowid" response = app_client_base_url_prefix.get(path) assert response.status_code == 302 assert ( response.headers["location"] == "/prefix/fixtures/binary_data?_sort=rowid&rowid__exact=1" ) def test_base_url_affects_metadata_extra_css_urls(app_client_base_url_prefix): html = app_client_base_url_prefix.get("/").text assert '' in html @pytest.mark.asyncio @pytest.mark.parametrize( "path,expected", [ ( "/fixtures/neighborhood_search", "/fixtures/-/query?sql=%0Aselect+_neighborhood%2C+facet_cities.name%2C+state%0Afrom+facetable%0A++++join+facet_cities%0A++++++++on+facetable._city_id+%3D+facet_cities.id%0Awhere+_neighborhood+like+%27%25%27+%7C%7C+%3Atext+%7C%7C+%27%25%27%0Aorder+by+_neighborhood%3B%0A&text=", ), ( "/fixtures/neighborhood_search?text=ber", "/fixtures/-/query?sql=%0Aselect+_neighborhood%2C+facet_cities.name%2C+state%0Afrom+facetable%0A++++join+facet_cities%0A++++++++on+facetable._city_id+%3D+facet_cities.id%0Awhere+_neighborhood+like+%27%25%27+%7C%7C+%3Atext+%7C%7C+%27%25%27%0Aorder+by+_neighborhood%3B%0A&text=ber", ), ("/fixtures/pragma_cache_size", None), ( # /fixtures/𝐜𝐢𝐭𝐢𝐞𝐬 "/fixtures/~F0~9D~90~9C~F0~9D~90~A2~F0~9D~90~AD~F0~9D~90~A2~F0~9D~90~9E~F0~9D~90~AC", "/fixtures/-/query?sql=select+id%2C+name+from+facet_cities+order+by+id+limit+1%3B", ), ("/fixtures/magic_parameters", None), ], ) async def test_edit_sql_link_on_canned_queries(ds_client, path, expected): response = await ds_client.get(path) assert response.status_code == 200 expected_link = f'Edit SQL' if expected: assert expected_link in response.text else: assert "Edit SQL" not in response.text @pytest.mark.parametrize("permission_allowed", [True, False]) def test_edit_sql_link_not_shown_if_user_lacks_permission(permission_allowed): with make_app_client( config={ "allow_sql": None if permission_allowed else {"id": "not-you"}, "databases": {"fixtures": {"queries": {"simple": "select 1 + 1"}}}, } ) as client: response = client.get("/fixtures/simple") if permission_allowed: assert "Edit SQL" in response.text else: assert "Edit SQL" not in response.text @pytest.mark.asyncio @pytest.mark.parametrize( "actor_id,should_have_links,should_not_have_links", [ (None, None, None), ("test", None, ["/-/permissions"]), ("root", ["/-/permissions", "/-/allow-debug"], None), ], ) async def test_navigation_menu_links( ds_client, actor_id, should_have_links, should_not_have_links ): cookies = {} if actor_id: cookies = {"ds_actor": ds_client.actor_cookie({"id": actor_id})} html = (await ds_client.get("/", cookies=cookies)).text soup = Soup(html, "html.parser") details = soup.find("nav").find("details") if not actor_id: # Should not show a menu assert details is None return # They are logged in: should show a menu assert details is not None # And a logout form assert details.find("form") is not None if should_have_links: for link in should_have_links: assert ( details.find("a", {"href": link}) is not None ), f"{link} expected but missing from nav menu" if should_not_have_links: for link in should_not_have_links: assert ( details.find("a", {"href": link}) is None ), f"{link} found but should not have been in nav menu" @pytest.mark.asyncio async def test_trace_correctly_escaped(ds_client): response = await ds_client.get("/fixtures/-/query?sql=select+'Hello'&_trace=1") assert "select 'Hello" not in response.text assert "select '<h1>Hello" in response.text @pytest.mark.asyncio @pytest.mark.parametrize( "path,expected", ( # Instance index page ("/", "http://localhost/.json"), # Table page ("/fixtures/facetable", "http://localhost/fixtures/facetable.json"), ( "/fixtures/table~2Fwith~2Fslashes~2Ecsv", "http://localhost/fixtures/table~2Fwith~2Fslashes~2Ecsv.json", ), # Row page ( "/fixtures/no_primary_key/1", "http://localhost/fixtures/no_primary_key/1.json", ), # Database index page ( "/fixtures", "http://localhost/fixtures.json", ), # Custom query page ( "/fixtures/-/query?sql=select+*+from+facetable", "http://localhost/fixtures/-/query.json?sql=select+*+from+facetable", ), # Canned query page ( "/fixtures/neighborhood_search?text=town", "http://localhost/fixtures/neighborhood_search.json?text=town", ), # /-/ pages ( "/-/plugins", "http://localhost/-/plugins.json", ), ), ) async def test_alternate_url_json(ds_client, path, expected): response = await ds_client.get(path) assert response.status_code == 200 link = response.headers["link"] assert link == '<{}>; rel="alternate"; type="application/json+datasette"'.format( expected ) assert ( ''.format( expected ) in response.text ) @pytest.mark.asyncio @pytest.mark.parametrize( "path", ("/-/patterns", "/-/messages", "/-/allow-debug", "/fixtures.db"), ) async def test_no_alternate_url_json(ds_client, path): response = await ds_client.get(path) assert "link" not in response.headers assert ( '
no such table: notatable
{") @pytest.mark.asyncio async def test_config_template_debug_off(ds_client): response = await ds_client.get("/fixtures/facetable?_context=1") assert response.status_code == 200 assert not response.text.startswith("{") def test_debug_context_includes_extra_template_vars(): # https://github.com/simonw/datasette/issues/693 with make_app_client(settings={"template_debug": True}) as client: response = client.get("/fixtures/facetable?_context=1") # scope_path is added by PLUGIN1 assert "scope_path" in response.text @pytest.mark.parametrize( "path", [ "/", "/fixtures", "/fixtures/compound_three_primary_keys", "/fixtures/compound_three_primary_keys/a,a,a", "/fixtures/paginated_view", "/fixtures/facetable", "/fixtures/facetable?_facet=state", "/fixtures/-/query?sql=select+1", ], ) @pytest.mark.parametrize("use_prefix", (True, False)) def test_base_url_config(app_client_base_url_prefix, path, use_prefix): client = app_client_base_url_prefix path_to_get = path if use_prefix: path_to_get = "/prefix/" + path.lstrip("/") response = client.get(path_to_get) soup = Soup(response.content, "html.parser") for form in soup.select("form"): assert form["action"].startswith("/prefix") for el in soup.find_all(["a", "link", "script"]): if "href" in el.attrs: href = el["href"] elif "src" in el.attrs: href = el["src"] else: continue # Could be a if ( not href.startswith("#") and href not in { "https://datasette.io/", "https://github.com/simonw/datasette", "https://github.com/simonw/datasette/blob/main/LICENSE", "https://github.com/simonw/datasette/blob/main/tests/fixtures.py", "/login-as-root", # Only used for the latest.datasette.io demo } and not href.startswith("https://plugin-example.datasette.io/") ): # If this has been made absolute it may start http://localhost/ if href.startswith("http://localhost/"): href = href[len("http://localost/") :] assert href.startswith("/prefix/"), json.dumps( { "path": path, "path_to_get": path_to_get, "href_or_src": href, "element_parent": str(el.parent), }, indent=4, default=repr, ) def test_base_url_affects_filter_redirects(app_client_base_url_prefix): path = "/fixtures/binary_data?_filter_column=rowid&_filter_op=exact&_filter_value=1&_sort=rowid" response = app_client_base_url_prefix.get(path) assert response.status_code == 302 assert ( response.headers["location"] == "/prefix/fixtures/binary_data?_sort=rowid&rowid__exact=1" ) def test_base_url_affects_metadata_extra_css_urls(app_client_base_url_prefix): html = app_client_base_url_prefix.get("/").text assert '' in html @pytest.mark.asyncio @pytest.mark.parametrize( "path,expected", [ ( "/fixtures/neighborhood_search", "/fixtures/-/query?sql=%0Aselect+_neighborhood%2C+facet_cities.name%2C+state%0Afrom+facetable%0A++++join+facet_cities%0A++++++++on+facetable._city_id+%3D+facet_cities.id%0Awhere+_neighborhood+like+%27%25%27+%7C%7C+%3Atext+%7C%7C+%27%25%27%0Aorder+by+_neighborhood%3B%0A&text=", ), ( "/fixtures/neighborhood_search?text=ber", "/fixtures/-/query?sql=%0Aselect+_neighborhood%2C+facet_cities.name%2C+state%0Afrom+facetable%0A++++join+facet_cities%0A++++++++on+facetable._city_id+%3D+facet_cities.id%0Awhere+_neighborhood+like+%27%25%27+%7C%7C+%3Atext+%7C%7C+%27%25%27%0Aorder+by+_neighborhood%3B%0A&text=ber", ), ("/fixtures/pragma_cache_size", None), ( # /fixtures/𝐜𝐢𝐭𝐢𝐞𝐬 "/fixtures/~F0~9D~90~9C~F0~9D~90~A2~F0~9D~90~AD~F0~9D~90~A2~F0~9D~90~9E~F0~9D~90~AC", "/fixtures/-/query?sql=select+id%2C+name+from+facet_cities+order+by+id+limit+1%3B", ), ("/fixtures/magic_parameters", None), ], ) async def test_edit_sql_link_on_canned_queries(ds_client, path, expected): response = await ds_client.get(path) assert response.status_code == 200 expected_link = f'Edit SQL' if expected: assert expected_link in response.text else: assert "Edit SQL" not in response.text @pytest.mark.parametrize("permission_allowed", [True, False]) def test_edit_sql_link_not_shown_if_user_lacks_permission(permission_allowed): with make_app_client( config={ "allow_sql": None if permission_allowed else {"id": "not-you"}, "databases": {"fixtures": {"queries": {"simple": "select 1 + 1"}}}, } ) as client: response = client.get("/fixtures/simple") if permission_allowed: assert "Edit SQL" in response.text else: assert "Edit SQL" not in response.text @pytest.mark.asyncio @pytest.mark.parametrize( "actor_id,should_have_links,should_not_have_links", [ (None, None, None), ("test", None, ["/-/permissions"]), ("root", ["/-/permissions", "/-/allow-debug"], None), ], ) async def test_navigation_menu_links( ds_client, actor_id, should_have_links, should_not_have_links ): cookies = {} if actor_id: cookies = {"ds_actor": ds_client.actor_cookie({"id": actor_id})} html = (await ds_client.get("/", cookies=cookies)).text soup = Soup(html, "html.parser") details = soup.find("nav").find("details") if not actor_id: # Should not show a menu assert details is None return # They are logged in: should show a menu assert details is not None # And a logout form assert details.find("form") is not None if should_have_links: for link in should_have_links: assert ( details.find("a", {"href": link}) is not None ), f"{link} expected but missing from nav menu" if should_not_have_links: for link in should_not_have_links: assert ( details.find("a", {"href": link}) is None ), f"{link} found but should not have been in nav menu" @pytest.mark.asyncio async def test_trace_correctly_escaped(ds_client): response = await ds_client.get("/fixtures/-/query?sql=select+'Hello'&_trace=1") assert "select 'Hello" not in response.text assert "select '<h1>Hello" in response.text @pytest.mark.asyncio @pytest.mark.parametrize( "path,expected", ( # Instance index page ("/", "http://localhost/.json"), # Table page ("/fixtures/facetable", "http://localhost/fixtures/facetable.json"), ( "/fixtures/table~2Fwith~2Fslashes~2Ecsv", "http://localhost/fixtures/table~2Fwith~2Fslashes~2Ecsv.json", ), # Row page ( "/fixtures/no_primary_key/1", "http://localhost/fixtures/no_primary_key/1.json", ), # Database index page ( "/fixtures", "http://localhost/fixtures.json", ), # Custom query page ( "/fixtures/-/query?sql=select+*+from+facetable", "http://localhost/fixtures/-/query.json?sql=select+*+from+facetable", ), # Canned query page ( "/fixtures/neighborhood_search?text=town", "http://localhost/fixtures/neighborhood_search.json?text=town", ), # /-/ pages ( "/-/plugins", "http://localhost/-/plugins.json", ), ), ) async def test_alternate_url_json(ds_client, path, expected): response = await ds_client.get(path) assert response.status_code == 200 link = response.headers["link"] assert link == '<{}>; rel="alternate"; type="application/json+datasette"'.format( expected ) assert ( ''.format( expected ) in response.text ) @pytest.mark.asyncio @pytest.mark.parametrize( "path", ("/-/patterns", "/-/messages", "/-/allow-debug", "/fixtures.db"), ) async def test_no_alternate_url_json(ds_client, path): response = await ds_client.get(path) assert "link" not in response.headers assert ( '
{") def test_debug_context_includes_extra_template_vars(): # https://github.com/simonw/datasette/issues/693 with make_app_client(settings={"template_debug": True}) as client: response = client.get("/fixtures/facetable?_context=1") # scope_path is added by PLUGIN1 assert "scope_path" in response.text @pytest.mark.parametrize( "path", [ "/", "/fixtures", "/fixtures/compound_three_primary_keys", "/fixtures/compound_three_primary_keys/a,a,a", "/fixtures/paginated_view", "/fixtures/facetable", "/fixtures/facetable?_facet=state", "/fixtures/-/query?sql=select+1", ], ) @pytest.mark.parametrize("use_prefix", (True, False)) def test_base_url_config(app_client_base_url_prefix, path, use_prefix): client = app_client_base_url_prefix path_to_get = path if use_prefix: path_to_get = "/prefix/" + path.lstrip("/") response = client.get(path_to_get) soup = Soup(response.content, "html.parser") for form in soup.select("form"): assert form["action"].startswith("/prefix") for el in soup.find_all(["a", "link", "script"]): if "href" in el.attrs: href = el["href"] elif "src" in el.attrs: href = el["src"] else: continue # Could be a if ( not href.startswith("#") and href not in { "https://datasette.io/", "https://github.com/simonw/datasette", "https://github.com/simonw/datasette/blob/main/LICENSE", "https://github.com/simonw/datasette/blob/main/tests/fixtures.py", "/login-as-root", # Only used for the latest.datasette.io demo } and not href.startswith("https://plugin-example.datasette.io/") ): # If this has been made absolute it may start http://localhost/ if href.startswith("http://localhost/"): href = href[len("http://localost/") :] assert href.startswith("/prefix/"), json.dumps( { "path": path, "path_to_get": path_to_get, "href_or_src": href, "element_parent": str(el.parent), }, indent=4, default=repr, ) def test_base_url_affects_filter_redirects(app_client_base_url_prefix): path = "/fixtures/binary_data?_filter_column=rowid&_filter_op=exact&_filter_value=1&_sort=rowid" response = app_client_base_url_prefix.get(path) assert response.status_code == 302 assert ( response.headers["location"] == "/prefix/fixtures/binary_data?_sort=rowid&rowid__exact=1" ) def test_base_url_affects_metadata_extra_css_urls(app_client_base_url_prefix): html = app_client_base_url_prefix.get("/").text assert '' in html @pytest.mark.asyncio @pytest.mark.parametrize( "path,expected", [ ( "/fixtures/neighborhood_search", "/fixtures/-/query?sql=%0Aselect+_neighborhood%2C+facet_cities.name%2C+state%0Afrom+facetable%0A++++join+facet_cities%0A++++++++on+facetable._city_id+%3D+facet_cities.id%0Awhere+_neighborhood+like+%27%25%27+%7C%7C+%3Atext+%7C%7C+%27%25%27%0Aorder+by+_neighborhood%3B%0A&text=", ), ( "/fixtures/neighborhood_search?text=ber", "/fixtures/-/query?sql=%0Aselect+_neighborhood%2C+facet_cities.name%2C+state%0Afrom+facetable%0A++++join+facet_cities%0A++++++++on+facetable._city_id+%3D+facet_cities.id%0Awhere+_neighborhood+like+%27%25%27+%7C%7C+%3Atext+%7C%7C+%27%25%27%0Aorder+by+_neighborhood%3B%0A&text=ber", ), ("/fixtures/pragma_cache_size", None), ( # /fixtures/𝐜𝐢𝐭𝐢𝐞𝐬 "/fixtures/~F0~9D~90~9C~F0~9D~90~A2~F0~9D~90~AD~F0~9D~90~A2~F0~9D~90~9E~F0~9D~90~AC", "/fixtures/-/query?sql=select+id%2C+name+from+facet_cities+order+by+id+limit+1%3B", ), ("/fixtures/magic_parameters", None), ], ) async def test_edit_sql_link_on_canned_queries(ds_client, path, expected): response = await ds_client.get(path) assert response.status_code == 200 expected_link = f'Edit SQL' if expected: assert expected_link in response.text else: assert "Edit SQL" not in response.text @pytest.mark.parametrize("permission_allowed", [True, False]) def test_edit_sql_link_not_shown_if_user_lacks_permission(permission_allowed): with make_app_client( config={ "allow_sql": None if permission_allowed else {"id": "not-you"}, "databases": {"fixtures": {"queries": {"simple": "select 1 + 1"}}}, } ) as client: response = client.get("/fixtures/simple") if permission_allowed: assert "Edit SQL" in response.text else: assert "Edit SQL" not in response.text @pytest.mark.asyncio @pytest.mark.parametrize( "actor_id,should_have_links,should_not_have_links", [ (None, None, None), ("test", None, ["/-/permissions"]), ("root", ["/-/permissions", "/-/allow-debug"], None), ], ) async def test_navigation_menu_links( ds_client, actor_id, should_have_links, should_not_have_links ): cookies = {} if actor_id: cookies = {"ds_actor": ds_client.actor_cookie({"id": actor_id})} html = (await ds_client.get("/", cookies=cookies)).text soup = Soup(html, "html.parser") details = soup.find("nav").find("details") if not actor_id: # Should not show a menu assert details is None return # They are logged in: should show a menu assert details is not None # And a logout form assert details.find("form") is not None if should_have_links: for link in should_have_links: assert ( details.find("a", {"href": link}) is not None ), f"{link} expected but missing from nav menu" if should_not_have_links: for link in should_not_have_links: assert ( details.find("a", {"href": link}) is None ), f"{link} found but should not have been in nav menu" @pytest.mark.asyncio async def test_trace_correctly_escaped(ds_client): response = await ds_client.get("/fixtures/-/query?sql=select+'Hello'&_trace=1") assert "select 'Hello" not in response.text assert "select '<h1>Hello" in response.text @pytest.mark.asyncio @pytest.mark.parametrize( "path,expected", ( # Instance index page ("/", "http://localhost/.json"), # Table page ("/fixtures/facetable", "http://localhost/fixtures/facetable.json"), ( "/fixtures/table~2Fwith~2Fslashes~2Ecsv", "http://localhost/fixtures/table~2Fwith~2Fslashes~2Ecsv.json", ), # Row page ( "/fixtures/no_primary_key/1", "http://localhost/fixtures/no_primary_key/1.json", ), # Database index page ( "/fixtures", "http://localhost/fixtures.json", ), # Custom query page ( "/fixtures/-/query?sql=select+*+from+facetable", "http://localhost/fixtures/-/query.json?sql=select+*+from+facetable", ), # Canned query page ( "/fixtures/neighborhood_search?text=town", "http://localhost/fixtures/neighborhood_search.json?text=town", ), # /-/ pages ( "/-/plugins", "http://localhost/-/plugins.json", ), ), ) async def test_alternate_url_json(ds_client, path, expected): response = await ds_client.get(path) assert response.status_code == 200 link = response.headers["link"] assert link == '<{}>; rel="alternate"; type="application/json+datasette"'.format( expected ) assert ( ''.format( expected ) in response.text ) @pytest.mark.asyncio @pytest.mark.parametrize( "path", ("/-/patterns", "/-/messages", "/-/allow-debug", "/fixtures.db"), ) async def test_no_alternate_url_json(ds_client, path): response = await ds_client.get(path) assert "link" not in response.headers assert ( '