2020-06-28 02:58:16 +00:00
|
|
|
from bs4 import BeautifulSoup as Soup
|
2020-09-14 20:18:15 +00:00
|
|
|
import json
|
2020-06-03 15:16:50 +00:00
|
|
|
import pytest
|
2020-06-28 02:58:16 +00:00
|
|
|
import re
|
|
|
|
from .fixtures import make_app_client, app_client
|
2020-06-03 15:16:50 +00:00
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
|
2021-12-19 21:11:57 +00:00
|
|
|
def canned_write_client(tmpdir):
|
|
|
|
template_dir = tmpdir / "canned_write_templates"
|
|
|
|
template_dir.mkdir()
|
|
|
|
(template_dir / "query-data-update_name.html").write_text(
|
|
|
|
"""
|
|
|
|
{% extends "query.html" %}
|
|
|
|
{% block content %}!!!CUSTOM_UPDATE_NAME_TEMPLATE!!!{{ super() }}{% endblock %}
|
|
|
|
""",
|
|
|
|
"utf-8",
|
|
|
|
)
|
2020-06-07 21:14:10 +00:00
|
|
|
with make_app_client(
|
2020-06-03 15:16:50 +00:00
|
|
|
extra_databases={"data.db": "create table names (name text)"},
|
2021-12-19 21:11:57 +00:00
|
|
|
template_dir=str(template_dir),
|
2020-06-03 15:16:50 +00:00
|
|
|
metadata={
|
|
|
|
"databases": {
|
|
|
|
"data": {
|
|
|
|
"queries": {
|
2020-08-09 16:03:17 +00:00
|
|
|
"canned_read": {"sql": "select * from names"},
|
2020-06-03 15:16:50 +00:00
|
|
|
"add_name": {
|
|
|
|
"sql": "insert into names (name) values (:name)",
|
|
|
|
"write": True,
|
|
|
|
"on_success_redirect": "/data/add_name?success",
|
|
|
|
},
|
|
|
|
"add_name_specify_id": {
|
|
|
|
"sql": "insert into names (rowid, name) values (:rowid, :name)",
|
|
|
|
"write": True,
|
|
|
|
"on_error_redirect": "/data/add_name_specify_id?error",
|
|
|
|
},
|
|
|
|
"delete_name": {
|
|
|
|
"sql": "delete from names where rowid = :rowid",
|
|
|
|
"write": True,
|
|
|
|
"on_success_message": "Name deleted",
|
2020-06-06 19:05:22 +00:00
|
|
|
"allow": {"id": "root"},
|
2020-06-03 15:16:50 +00:00
|
|
|
},
|
|
|
|
"update_name": {
|
|
|
|
"sql": "update names set name = :name where rowid = :rowid",
|
2020-06-03 21:04:40 +00:00
|
|
|
"params": ["rowid", "name", "extra"],
|
2020-06-03 15:16:50 +00:00
|
|
|
"write": True,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
},
|
2020-06-07 21:14:10 +00:00
|
|
|
) as client:
|
2020-06-03 15:16:50 +00:00
|
|
|
yield client
|
|
|
|
|
|
|
|
|
2020-06-28 02:58:16 +00:00
|
|
|
def test_canned_query_with_named_parameter(app_client):
|
|
|
|
response = app_client.get("/fixtures/neighborhood_search.json?text=town")
|
|
|
|
assert [
|
|
|
|
["Corktown", "Detroit", "MI"],
|
|
|
|
["Downtown", "Los Angeles", "CA"],
|
|
|
|
["Downtown", "Detroit", "MI"],
|
|
|
|
["Greektown", "Detroit", "MI"],
|
|
|
|
["Koreatown", "Los Angeles", "CA"],
|
|
|
|
["Mexicantown", "Detroit", "MI"],
|
|
|
|
] == response.json["rows"]
|
|
|
|
|
|
|
|
|
2020-06-03 15:16:50 +00:00
|
|
|
def test_insert(canned_write_client):
|
|
|
|
response = canned_write_client.post(
|
2020-07-01 03:08:00 +00:00
|
|
|
"/data/add_name",
|
|
|
|
{"name": "Hello"},
|
|
|
|
csrftoken_from=True,
|
|
|
|
cookies={"foo": "bar"},
|
2020-06-03 15:16:50 +00:00
|
|
|
)
|
|
|
|
assert 302 == response.status
|
|
|
|
assert "/data/add_name?success" == response.headers["Location"]
|
|
|
|
messages = canned_write_client.ds.unsign(
|
|
|
|
response.cookies["ds_messages"], "messages"
|
|
|
|
)
|
|
|
|
assert [["Query executed, 1 row affected", 1]] == messages
|
|
|
|
|
|
|
|
|
2020-08-09 16:03:17 +00:00
|
|
|
@pytest.mark.parametrize(
|
|
|
|
"query_name,expect_csrf_hidden_field",
|
2020-09-02 22:24:55 +00:00
|
|
|
[
|
|
|
|
("canned_read", False),
|
|
|
|
("add_name_specify_id", True),
|
|
|
|
("add_name", True),
|
|
|
|
],
|
2020-08-09 16:03:17 +00:00
|
|
|
)
|
|
|
|
def test_canned_query_form_csrf_hidden_field(
|
|
|
|
canned_write_client, query_name, expect_csrf_hidden_field
|
|
|
|
):
|
2020-11-15 23:24:22 +00:00
|
|
|
response = canned_write_client.get(f"/data/{query_name}")
|
2020-08-09 16:03:17 +00:00
|
|
|
html = response.text
|
|
|
|
fragment = '<input type="hidden" name="csrftoken" value="'
|
|
|
|
if expect_csrf_hidden_field:
|
|
|
|
assert fragment in html
|
|
|
|
else:
|
|
|
|
assert fragment not in html
|
|
|
|
|
|
|
|
|
2020-07-01 03:08:00 +00:00
|
|
|
def test_insert_with_cookies_requires_csrf(canned_write_client):
|
|
|
|
response = canned_write_client.post(
|
|
|
|
"/data/add_name",
|
|
|
|
{"name": "Hello"},
|
|
|
|
cookies={"foo": "bar"},
|
|
|
|
)
|
|
|
|
assert 403 == response.status
|
|
|
|
|
|
|
|
|
|
|
|
def test_insert_no_cookies_no_csrf(canned_write_client):
|
2021-10-14 18:03:44 +00:00
|
|
|
response = canned_write_client.post("/data/add_name", {"name": "Hello"})
|
2020-07-01 03:08:00 +00:00
|
|
|
assert 302 == response.status
|
|
|
|
assert "/data/add_name?success" == response.headers["Location"]
|
|
|
|
|
|
|
|
|
2020-06-03 15:16:50 +00:00
|
|
|
def test_custom_success_message(canned_write_client):
|
|
|
|
response = canned_write_client.post(
|
2020-06-06 19:05:22 +00:00
|
|
|
"/data/delete_name",
|
|
|
|
{"rowid": 1},
|
2020-06-10 19:39:54 +00:00
|
|
|
cookies={"ds_actor": canned_write_client.actor_cookie({"id": "root"})},
|
2020-06-06 19:05:22 +00:00
|
|
|
csrftoken_from=True,
|
2020-06-03 15:16:50 +00:00
|
|
|
)
|
|
|
|
assert 302 == response.status
|
|
|
|
messages = canned_write_client.ds.unsign(
|
|
|
|
response.cookies["ds_messages"], "messages"
|
|
|
|
)
|
|
|
|
assert [["Name deleted", 1]] == messages
|
|
|
|
|
|
|
|
|
|
|
|
def test_insert_error(canned_write_client):
|
2020-06-05 19:05:57 +00:00
|
|
|
canned_write_client.post("/data/add_name", {"name": "Hello"}, csrftoken_from=True)
|
2020-06-03 15:16:50 +00:00
|
|
|
response = canned_write_client.post(
|
|
|
|
"/data/add_name_specify_id",
|
|
|
|
{"rowid": 1, "name": "Should fail"},
|
2020-06-05 19:05:57 +00:00
|
|
|
csrftoken_from=True,
|
2020-06-03 15:16:50 +00:00
|
|
|
)
|
|
|
|
assert 302 == response.status
|
|
|
|
assert "/data/add_name_specify_id?error" == response.headers["Location"]
|
|
|
|
messages = canned_write_client.ds.unsign(
|
|
|
|
response.cookies["ds_messages"], "messages"
|
|
|
|
)
|
|
|
|
assert [["UNIQUE constraint failed: names.rowid", 3]] == messages
|
|
|
|
# How about with a custom error message?
|
|
|
|
canned_write_client.ds._metadata["databases"]["data"]["queries"][
|
|
|
|
"add_name_specify_id"
|
|
|
|
]["on_error_message"] = "ERROR"
|
|
|
|
response = canned_write_client.post(
|
|
|
|
"/data/add_name_specify_id",
|
|
|
|
{"rowid": 1, "name": "Should fail"},
|
2020-06-05 19:05:57 +00:00
|
|
|
csrftoken_from=True,
|
2020-06-03 15:16:50 +00:00
|
|
|
)
|
|
|
|
assert [["ERROR", 3]] == canned_write_client.ds.unsign(
|
|
|
|
response.cookies["ds_messages"], "messages"
|
|
|
|
)
|
2020-06-03 21:04:40 +00:00
|
|
|
|
|
|
|
|
|
|
|
def test_custom_params(canned_write_client):
|
|
|
|
response = canned_write_client.get("/data/update_name?extra=foo")
|
|
|
|
assert '<input type="text" id="qp3" name="extra" value="foo">' in response.text
|
2020-06-06 19:05:22 +00:00
|
|
|
|
|
|
|
|
2020-06-06 19:26:19 +00:00
|
|
|
def test_vary_header(canned_write_client):
|
|
|
|
# These forms embed a csrftoken so they should be served with Vary: Cookie
|
|
|
|
assert "vary" not in canned_write_client.get("/data").headers
|
|
|
|
assert "Cookie" == canned_write_client.get("/data/update_name").headers["vary"]
|
|
|
|
|
|
|
|
|
2020-09-14 20:18:15 +00:00
|
|
|
def test_json_post_body(canned_write_client):
|
|
|
|
response = canned_write_client.post(
|
|
|
|
"/data/add_name",
|
2020-09-14 20:25:09 +00:00
|
|
|
body=json.dumps({"name": ["Hello", "there"]}),
|
2020-09-14 20:18:15 +00:00
|
|
|
)
|
|
|
|
assert 302 == response.status
|
|
|
|
assert "/data/add_name?success" == response.headers["Location"]
|
2020-09-14 20:25:09 +00:00
|
|
|
rows = canned_write_client.get("/data/names.json?_shape=array").json
|
|
|
|
assert rows == [{"rowid": 1, "name": "['Hello', 'there']"}]
|
2020-09-14 20:18:15 +00:00
|
|
|
|
|
|
|
|
2020-09-14 21:23:18 +00:00
|
|
|
@pytest.mark.parametrize(
|
|
|
|
"headers,body,querystring",
|
|
|
|
(
|
|
|
|
(None, "name=NameGoesHere", "?_json=1"),
|
|
|
|
({"Accept": "application/json"}, "name=NameGoesHere", None),
|
|
|
|
(None, "name=NameGoesHere&_json=1", None),
|
|
|
|
(None, '{"name": "NameGoesHere", "_json": 1}', None),
|
|
|
|
),
|
|
|
|
)
|
|
|
|
def test_json_response(canned_write_client, headers, body, querystring):
|
|
|
|
response = canned_write_client.post(
|
|
|
|
"/data/add_name" + (querystring or ""),
|
|
|
|
body=body,
|
|
|
|
headers=headers,
|
|
|
|
)
|
|
|
|
assert 200 == response.status
|
|
|
|
assert response.headers["content-type"] == "application/json; charset=utf-8"
|
|
|
|
assert response.json == {
|
|
|
|
"ok": True,
|
|
|
|
"message": "Query executed, 1 row affected",
|
|
|
|
"redirect": "/data/add_name?success",
|
|
|
|
}
|
|
|
|
rows = canned_write_client.get("/data/names.json?_shape=array").json
|
|
|
|
assert rows == [{"rowid": 1, "name": "NameGoesHere"}]
|
|
|
|
|
|
|
|
|
2020-06-06 19:05:22 +00:00
|
|
|
def test_canned_query_permissions_on_database_page(canned_write_client):
|
|
|
|
# Without auth only shows three queries
|
2020-06-18 23:52:06 +00:00
|
|
|
query_names = {
|
2020-06-06 19:05:22 +00:00
|
|
|
q["name"] for q in canned_write_client.get("/data.json").json["queries"]
|
2020-06-18 23:52:06 +00:00
|
|
|
}
|
|
|
|
assert {
|
2020-08-09 16:03:17 +00:00
|
|
|
"canned_read",
|
2020-06-18 23:22:33 +00:00
|
|
|
"add_name",
|
|
|
|
"add_name_specify_id",
|
|
|
|
"update_name",
|
|
|
|
"from_async_hook",
|
|
|
|
"from_hook",
|
2020-06-18 23:52:06 +00:00
|
|
|
} == query_names
|
2020-06-06 19:05:22 +00:00
|
|
|
|
|
|
|
# With auth shows four
|
|
|
|
response = canned_write_client.get(
|
|
|
|
"/data.json",
|
2020-06-10 19:39:54 +00:00
|
|
|
cookies={"ds_actor": canned_write_client.actor_cookie({"id": "root"})},
|
2020-06-06 19:05:22 +00:00
|
|
|
)
|
|
|
|
assert 200 == response.status
|
2020-09-14 21:23:18 +00:00
|
|
|
query_names_and_private = sorted(
|
|
|
|
[
|
|
|
|
{"name": q["name"], "private": q["private"]}
|
|
|
|
for q in response.json["queries"]
|
|
|
|
],
|
|
|
|
key=lambda q: q["name"],
|
|
|
|
)
|
|
|
|
assert query_names_and_private == [
|
2020-06-08 03:50:37 +00:00
|
|
|
{"name": "add_name", "private": False},
|
|
|
|
{"name": "add_name_specify_id", "private": False},
|
2020-08-09 16:03:17 +00:00
|
|
|
{"name": "canned_read", "private": False},
|
2020-06-08 03:50:37 +00:00
|
|
|
{"name": "delete_name", "private": True},
|
2020-06-18 23:22:33 +00:00
|
|
|
{"name": "from_async_hook", "private": False},
|
|
|
|
{"name": "from_hook", "private": False},
|
2020-06-18 23:52:06 +00:00
|
|
|
{"name": "update_name", "private": False},
|
2020-09-14 21:23:18 +00:00
|
|
|
]
|
2020-06-06 19:27:00 +00:00
|
|
|
|
|
|
|
|
|
|
|
def test_canned_query_permissions(canned_write_client):
|
|
|
|
assert 403 == canned_write_client.get("/data/delete_name").status
|
|
|
|
assert 200 == canned_write_client.get("/data/update_name").status
|
2020-06-10 19:39:54 +00:00
|
|
|
cookies = {"ds_actor": canned_write_client.actor_cookie({"id": "root"})}
|
2020-06-06 19:27:00 +00:00
|
|
|
assert 200 == canned_write_client.get("/data/delete_name", cookies=cookies).status
|
|
|
|
assert 200 == canned_write_client.get("/data/update_name", cookies=cookies).status
|
2020-06-28 02:58:16 +00:00
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture(scope="session")
|
|
|
|
def magic_parameters_client():
|
|
|
|
with make_app_client(
|
|
|
|
extra_databases={"data.db": "create table logs (line text)"},
|
|
|
|
metadata={
|
|
|
|
"databases": {
|
|
|
|
"data": {
|
|
|
|
"queries": {
|
|
|
|
"runme_post": {"sql": "", "write": True},
|
|
|
|
"runme_get": {"sql": ""},
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
},
|
|
|
|
) as client:
|
|
|
|
yield client
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
|
|
"magic_parameter,expected_re",
|
|
|
|
[
|
|
|
|
("_actor_id", "root"),
|
|
|
|
("_header_host", "localhost"),
|
2020-06-30 22:00:17 +00:00
|
|
|
("_header_not_a_thing", ""),
|
2020-06-28 02:58:16 +00:00
|
|
|
("_cookie_foo", "bar"),
|
2020-06-28 19:45:34 +00:00
|
|
|
("_now_epoch", r"^\d+$"),
|
|
|
|
("_now_date_utc", r"^\d{4}-\d{2}-\d{2}$"),
|
|
|
|
("_now_datetime_utc", r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$"),
|
2020-06-28 02:58:16 +00:00
|
|
|
("_random_chars_1", r"^\w$"),
|
|
|
|
("_random_chars_10", r"^\w{10}$"),
|
|
|
|
],
|
|
|
|
)
|
|
|
|
def test_magic_parameters(magic_parameters_client, magic_parameter, expected_re):
|
|
|
|
magic_parameters_client.ds._metadata["databases"]["data"]["queries"]["runme_post"][
|
|
|
|
"sql"
|
2020-11-15 23:24:22 +00:00
|
|
|
] = f"insert into logs (line) values (:{magic_parameter})"
|
2020-06-28 02:58:16 +00:00
|
|
|
magic_parameters_client.ds._metadata["databases"]["data"]["queries"]["runme_get"][
|
|
|
|
"sql"
|
2020-11-15 23:24:22 +00:00
|
|
|
] = f"select :{magic_parameter} as result"
|
2020-06-28 02:58:16 +00:00
|
|
|
cookies = {
|
|
|
|
"ds_actor": magic_parameters_client.actor_cookie({"id": "root"}),
|
|
|
|
"foo": "bar",
|
|
|
|
}
|
|
|
|
# Test the GET version
|
|
|
|
get_response = magic_parameters_client.get(
|
|
|
|
"/data/runme_get.json?_shape=array", cookies=cookies
|
|
|
|
)
|
|
|
|
get_actual = get_response.json[0]["result"]
|
|
|
|
assert re.match(expected_re, str(get_actual))
|
|
|
|
# Test the form
|
|
|
|
form_response = magic_parameters_client.get("/data/runme_post")
|
|
|
|
soup = Soup(form_response.body, "html.parser")
|
|
|
|
# The magic parameter should not be represented as a form field
|
|
|
|
assert None is soup.find("input", {"name": magic_parameter})
|
|
|
|
# Submit the form to create a log line
|
|
|
|
response = magic_parameters_client.post(
|
2020-09-15 20:10:25 +00:00
|
|
|
"/data/runme_post?_json=1", {}, csrftoken_from=True, cookies=cookies
|
2020-06-28 02:58:16 +00:00
|
|
|
)
|
2020-09-15 20:10:25 +00:00
|
|
|
assert response.json == {
|
|
|
|
"ok": True,
|
|
|
|
"message": "Query executed, 1 row affected",
|
|
|
|
"redirect": None,
|
|
|
|
}
|
2020-06-28 02:58:16 +00:00
|
|
|
post_actual = magic_parameters_client.get(
|
|
|
|
"/data/logs.json?_sort_desc=rowid&_shape=array"
|
|
|
|
).json[0]["line"]
|
|
|
|
assert re.match(expected_re, post_actual)
|
|
|
|
|
|
|
|
|
2020-09-15 20:10:25 +00:00
|
|
|
@pytest.mark.parametrize("use_csrf", [True, False])
|
|
|
|
@pytest.mark.parametrize("return_json", [True, False])
|
|
|
|
def test_magic_parameters_csrf_json(magic_parameters_client, use_csrf, return_json):
|
|
|
|
magic_parameters_client.ds._metadata["databases"]["data"]["queries"]["runme_post"][
|
|
|
|
"sql"
|
|
|
|
] = "insert into logs (line) values (:_header_host)"
|
|
|
|
qs = ""
|
|
|
|
if return_json:
|
|
|
|
qs = "?_json=1"
|
|
|
|
response = magic_parameters_client.post(
|
2020-11-15 23:24:22 +00:00
|
|
|
f"/data/runme_post{qs}",
|
2020-09-15 20:10:25 +00:00
|
|
|
{},
|
|
|
|
csrftoken_from=use_csrf or None,
|
|
|
|
)
|
|
|
|
if return_json:
|
|
|
|
assert response.status == 200
|
|
|
|
assert response.json["ok"], response.json
|
|
|
|
else:
|
|
|
|
assert response.status == 302
|
|
|
|
messages = magic_parameters_client.ds.unsign(
|
|
|
|
response.cookies["ds_messages"], "messages"
|
|
|
|
)
|
|
|
|
assert [["Query executed, 1 row affected", 1]] == messages
|
|
|
|
post_actual = magic_parameters_client.get(
|
|
|
|
"/data/logs.json?_sort_desc=rowid&_shape=array"
|
|
|
|
).json[0]["line"]
|
|
|
|
assert post_actual == "localhost"
|
|
|
|
|
|
|
|
|
2020-06-28 02:58:16 +00:00
|
|
|
def test_magic_parameters_cannot_be_used_in_arbitrary_queries(magic_parameters_client):
|
|
|
|
response = magic_parameters_client.get(
|
|
|
|
"/data.json?sql=select+:_header_host&_shape=array"
|
|
|
|
)
|
2021-06-02 03:46:20 +00:00
|
|
|
assert 400 == response.status
|
2021-10-09 00:32:52 +00:00
|
|
|
assert response.json["error"].startswith("You did not supply a value for binding")
|
2021-12-19 21:11:57 +00:00
|
|
|
|
|
|
|
|
|
|
|
def test_canned_write_custom_template(canned_write_client):
|
|
|
|
response = canned_write_client.get("/data/update_name")
|
|
|
|
assert response.status == 200
|
|
|
|
assert (
|
|
|
|
"<!-- Templates considered: *query-data-update_name.html, query-data.html, query.html -->"
|
|
|
|
in response.text
|
|
|
|
)
|
|
|
|
assert "!!!CUSTOM_UPDATE_NAME_TEMPLATE!!!" in response.text
|