diff --git a/datasette/views/database.py b/datasette/views/database.py index 8cd37fdb..c32ff92f 100644 --- a/datasette/views/database.py +++ b/datasette/views/database.py @@ -371,6 +371,11 @@ class MagicParameters(dict): ) ) + def __len__(self): + # Workaround for 'Incorrect number of bindings' error + # https://github.com/simonw/datasette/issues/967#issuecomment-692951144 + return super().__len__() or 1 + def __getitem__(self, key): if key.startswith("_") and key.count("_") >= 2: prefix, suffix = key[1:].split("_", 1) diff --git a/tests/test_canned_queries.py b/tests/test_canned_queries.py index 67e9f822..9620c693 100644 --- a/tests/test_canned_queries.py +++ b/tests/test_canned_queries.py @@ -305,14 +305,49 @@ def test_magic_parameters(magic_parameters_client, magic_parameter, expected_re) assert None is soup.find("input", {"name": magic_parameter}) # Submit the form to create a log line response = magic_parameters_client.post( - "/data/runme_post", {}, csrftoken_from=True, cookies=cookies + "/data/runme_post?_json=1", {}, csrftoken_from=True, cookies=cookies ) + assert response.json == { + "ok": True, + "message": "Query executed, 1 row affected", + "redirect": None, + } post_actual = magic_parameters_client.get( "/data/logs.json?_sort_desc=rowid&_shape=array" ).json[0]["line"] assert re.match(expected_re, post_actual) +@pytest.mark.parametrize("use_csrf", [True, False]) +@pytest.mark.parametrize("return_json", [True, False]) +def test_magic_parameters_csrf_json(magic_parameters_client, use_csrf, return_json): + magic_parameters_client.ds._metadata["databases"]["data"]["queries"]["runme_post"][ + "sql" + ] = "insert into logs (line) values (:_header_host)" + qs = "" + if return_json: + qs = "?_json=1" + response = magic_parameters_client.post( + "/data/runme_post{}".format(qs), + {}, + csrftoken_from=use_csrf or None, + allow_redirects=False, + ) + if return_json: + assert response.status == 200 + assert response.json["ok"], response.json + else: + assert response.status == 302 + messages = magic_parameters_client.ds.unsign( + response.cookies["ds_messages"], "messages" + ) + assert [["Query executed, 1 row affected", 1]] == messages + post_actual = magic_parameters_client.get( + "/data/logs.json?_sort_desc=rowid&_shape=array" + ).json[0]["line"] + assert post_actual == "localhost" + + def test_magic_parameters_cannot_be_used_in_arbitrary_queries(magic_parameters_client): response = magic_parameters_client.get( "/data.json?sql=select+:_header_host&_shape=array"