kopia lustrzana https://github.com/simonw/datasette
ds_client for test_plugins.py, refs #1959
rodzic
b998c2793f
commit
30f1a0705b
|
@ -1,5 +1,6 @@
|
|||
from bs4 import BeautifulSoup as Soup
|
||||
from .fixtures import (
|
||||
app_client,
|
||||
app_client,
|
||||
make_app_client,
|
||||
TABLES,
|
||||
|
@ -41,22 +42,28 @@ def test_plugin_hooks_have_tests(plugin_hook):
|
|||
assert ok, f"Plugin hook is missing tests: {plugin_hook}"
|
||||
|
||||
|
||||
def test_hook_plugins_dir_plugin_prepare_connection(app_client):
|
||||
response = app_client.get(
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
async def test_hook_plugins_dir_plugin_prepare_connection(ds_client):
|
||||
response = await ds_client.get(
|
||||
"/fixtures.json?sql=select+convert_units(100%2C+'m'%2C+'ft')"
|
||||
)
|
||||
assert pytest.approx(328.0839) == response.json["rows"][0][0]
|
||||
assert pytest.approx(328.0839) == response.json()["rows"][0][0]
|
||||
|
||||
|
||||
def test_hook_plugin_prepare_connection_arguments(app_client):
|
||||
response = app_client.get(
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
async def test_hook_plugin_prepare_connection_arguments(ds_client):
|
||||
response = await ds_client.get(
|
||||
"/fixtures.json?sql=select+prepare_connection_args()&_shape=arrayfirst"
|
||||
)
|
||||
assert [
|
||||
"database=fixtures, datasette.plugin_config(\"name-of-plugin\")={'depth': 'root'}"
|
||||
] == response.json
|
||||
] == response.json()
|
||||
|
||||
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize(
|
||||
"path,expected_decoded_object",
|
||||
[
|
||||
|
@ -106,10 +113,10 @@ def test_hook_plugin_prepare_connection_arguments(app_client):
|
|||
),
|
||||
],
|
||||
)
|
||||
def test_hook_extra_css_urls(app_client, path, expected_decoded_object):
|
||||
response = app_client.get(path)
|
||||
assert response.status == 200
|
||||
links = Soup(response.body, "html.parser").findAll("link")
|
||||
async def test_hook_extra_css_urls(ds_client, path, expected_decoded_object):
|
||||
response = await ds_client.get(path)
|
||||
assert response.status_code == 200
|
||||
links = Soup(response.text, "html.parser").findAll("link")
|
||||
special_href = [
|
||||
l for l in links if l.attrs["href"].endswith("/extra-css-urls-demo.css")
|
||||
][0]["href"]
|
||||
|
@ -120,9 +127,11 @@ def test_hook_extra_css_urls(app_client, path, expected_decoded_object):
|
|||
)
|
||||
|
||||
|
||||
def test_hook_extra_js_urls(app_client):
|
||||
response = app_client.get("/")
|
||||
scripts = Soup(response.body, "html.parser").findAll("script")
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
async def test_hook_extra_js_urls(ds_client):
|
||||
response = await ds_client.get("/")
|
||||
scripts = Soup(response.text, "html.parser").findAll("script")
|
||||
script_attrs = [s.attrs for s in scripts]
|
||||
for attrs in [
|
||||
{
|
||||
|
@ -138,15 +147,17 @@ def test_hook_extra_js_urls(app_client):
|
|||
assert any(s == attrs for s in script_attrs), "Expected: {}".format(attrs)
|
||||
|
||||
|
||||
def test_plugins_with_duplicate_js_urls(app_client):
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
async def test_plugins_with_duplicate_js_urls(ds_client):
|
||||
# If two plugins both require jQuery, jQuery should be loaded only once
|
||||
response = app_client.get("/fixtures")
|
||||
response = await ds_client.get("/fixtures")
|
||||
# This test is a little tricky, as if the user has any other plugins in
|
||||
# their current virtual environment those may affect what comes back too.
|
||||
# What matters is that https://plugin-example.datasette.io/jquery.js is only there once
|
||||
# and it comes before plugin1.js and plugin2.js which could be in either
|
||||
# order
|
||||
scripts = Soup(response.body, "html.parser").findAll("script")
|
||||
scripts = Soup(response.text, "html.parser").findAll("script")
|
||||
srcs = [s["src"] for s in scripts if s.get("src")]
|
||||
# No duplicates allowed:
|
||||
assert len(srcs) == len(set(srcs))
|
||||
|
@ -164,13 +175,15 @@ def test_plugins_with_duplicate_js_urls(app_client):
|
|||
)
|
||||
|
||||
|
||||
def test_hook_render_cell_link_from_json(app_client):
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
async def test_hook_render_cell_link_from_json(ds_client):
|
||||
sql = """
|
||||
select '{"href": "http://example.com/", "label":"Example"}'
|
||||
""".strip()
|
||||
path = "/fixtures?" + urllib.parse.urlencode({"sql": sql})
|
||||
response = app_client.get(path)
|
||||
td = Soup(response.body, "html.parser").find("table").find("tbody").find("td")
|
||||
response = await ds_client.get(path)
|
||||
td = Soup(response.text, "html.parser").find("table").find("tbody").find("td")
|
||||
a = td.find("a")
|
||||
assert a is not None, str(a)
|
||||
assert a.attrs["href"] == "http://example.com/"
|
||||
|
@ -178,9 +191,11 @@ def test_hook_render_cell_link_from_json(app_client):
|
|||
assert a.text == "Example"
|
||||
|
||||
|
||||
def test_hook_render_cell_demo(app_client):
|
||||
response = app_client.get("/fixtures/simple_primary_key?id=4")
|
||||
soup = Soup(response.body, "html.parser")
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
async def test_hook_render_cell_demo(ds_client):
|
||||
response = await ds_client.get("/fixtures/simple_primary_key?id=4")
|
||||
soup = Soup(response.text, "html.parser")
|
||||
td = soup.find("td", {"class": "col-content"})
|
||||
assert json.loads(td.string) == {
|
||||
"row": {"id": "4", "content": "RENDER_CELL_DEMO"},
|
||||
|
@ -191,60 +206,70 @@ def test_hook_render_cell_demo(app_client):
|
|||
}
|
||||
|
||||
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize(
|
||||
"path", ("/fixtures?sql=select+'RENDER_CELL_ASYNC'", "/fixtures/simple_primary_key")
|
||||
)
|
||||
def test_hook_render_cell_async(app_client, path):
|
||||
response = app_client.get(path)
|
||||
assert b"RENDER_CELL_ASYNC_RESULT" in response.body
|
||||
async def test_hook_render_cell_async(ds_client, path):
|
||||
response = await ds_client.get(path)
|
||||
assert b"RENDER_CELL_ASYNC_RESULT" in response.content
|
||||
|
||||
|
||||
def test_plugin_config(app_client):
|
||||
assert {"depth": "table"} == app_client.ds.plugin_config(
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
async def test_plugin_config(ds_client):
|
||||
assert {"depth": "table"} == ds_client.ds.plugin_config(
|
||||
"name-of-plugin", database="fixtures", table="sortable"
|
||||
)
|
||||
assert {"depth": "database"} == app_client.ds.plugin_config(
|
||||
assert {"depth": "database"} == ds_client.ds.plugin_config(
|
||||
"name-of-plugin", database="fixtures", table="unknown_table"
|
||||
)
|
||||
assert {"depth": "database"} == app_client.ds.plugin_config(
|
||||
assert {"depth": "database"} == ds_client.ds.plugin_config(
|
||||
"name-of-plugin", database="fixtures"
|
||||
)
|
||||
assert {"depth": "root"} == app_client.ds.plugin_config(
|
||||
assert {"depth": "root"} == ds_client.ds.plugin_config(
|
||||
"name-of-plugin", database="unknown_database"
|
||||
)
|
||||
assert {"depth": "root"} == app_client.ds.plugin_config("name-of-plugin")
|
||||
assert None is app_client.ds.plugin_config("unknown-plugin")
|
||||
assert {"depth": "root"} == ds_client.ds.plugin_config("name-of-plugin")
|
||||
assert None is ds_client.ds.plugin_config("unknown-plugin")
|
||||
|
||||
|
||||
def test_plugin_config_env(app_client):
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
async def test_plugin_config_env(ds_client):
|
||||
os.environ["FOO_ENV"] = "FROM_ENVIRONMENT"
|
||||
assert {"foo": "FROM_ENVIRONMENT"} == app_client.ds.plugin_config("env-plugin")
|
||||
assert {"foo": "FROM_ENVIRONMENT"} == ds_client.ds.plugin_config("env-plugin")
|
||||
# Ensure secrets aren't visible in /-/metadata.json
|
||||
metadata = app_client.get("/-/metadata.json")
|
||||
assert {"foo": {"$env": "FOO_ENV"}} == metadata.json["plugins"]["env-plugin"]
|
||||
metadata = await ds_client.get("/-/metadata.json")
|
||||
assert {"foo": {"$env": "FOO_ENV"}} == metadata.json()["plugins"]["env-plugin"]
|
||||
del os.environ["FOO_ENV"]
|
||||
|
||||
|
||||
def test_plugin_config_env_from_list(app_client):
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
async def test_plugin_config_env_from_list(ds_client):
|
||||
os.environ["FOO_ENV"] = "FROM_ENVIRONMENT"
|
||||
assert [{"in_a_list": "FROM_ENVIRONMENT"}] == app_client.ds.plugin_config(
|
||||
assert [{"in_a_list": "FROM_ENVIRONMENT"}] == ds_client.ds.plugin_config(
|
||||
"env-plugin-list"
|
||||
)
|
||||
# Ensure secrets aren't visible in /-/metadata.json
|
||||
metadata = app_client.get("/-/metadata.json")
|
||||
assert [{"in_a_list": {"$env": "FOO_ENV"}}] == metadata.json["plugins"][
|
||||
metadata = await ds_client.get("/-/metadata.json")
|
||||
assert [{"in_a_list": {"$env": "FOO_ENV"}}] == metadata.json()["plugins"][
|
||||
"env-plugin-list"
|
||||
]
|
||||
del os.environ["FOO_ENV"]
|
||||
|
||||
|
||||
def test_plugin_config_file(app_client):
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
async def test_plugin_config_file(ds_client):
|
||||
with open(TEMP_PLUGIN_SECRET_FILE, "w") as fp:
|
||||
fp.write("FROM_FILE")
|
||||
assert {"foo": "FROM_FILE"} == app_client.ds.plugin_config("file-plugin")
|
||||
assert {"foo": "FROM_FILE"} == ds_client.ds.plugin_config("file-plugin")
|
||||
# Ensure secrets aren't visible in /-/metadata.json
|
||||
metadata = app_client.get("/-/metadata.json")
|
||||
assert {"foo": {"$file": TEMP_PLUGIN_SECRET_FILE}} == metadata.json["plugins"][
|
||||
metadata = await ds_client.get("/-/metadata.json")
|
||||
assert {"foo": {"$file": TEMP_PLUGIN_SECRET_FILE}} == metadata.json()["plugins"][
|
||||
"file-plugin"
|
||||
]
|
||||
os.remove(TEMP_PLUGIN_SECRET_FILE)
|
||||
|
@ -309,8 +334,10 @@ def test_hook_extra_body_script(app_client, path, expected_extra_body_script):
|
|||
assert expected_extra_body_script == actual_data
|
||||
|
||||
|
||||
def test_hook_asgi_wrapper(app_client):
|
||||
response = app_client.get("/fixtures")
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
async def test_hook_asgi_wrapper(ds_client):
|
||||
response = await ds_client.get("/fixtures")
|
||||
assert "_internal, fixtures" == response.headers["x-databases"]
|
||||
|
||||
|
||||
|
@ -319,9 +346,9 @@ def test_hook_extra_template_vars(restore_working_directory):
|
|||
template_dir=str(pathlib.Path(__file__).parent / "test_templates")
|
||||
) as client:
|
||||
response = client.get("/-/metadata")
|
||||
assert response.status == 200
|
||||
assert response.status_code == 200
|
||||
extra_template_vars = json.loads(
|
||||
Soup(response.body, "html.parser").select("pre.extra_template_vars")[0].text
|
||||
Soup(response.text, "html.parser").select("pre.extra_template_vars")[0].text
|
||||
)
|
||||
assert {
|
||||
"template": "show_json.html",
|
||||
|
@ -329,7 +356,7 @@ def test_hook_extra_template_vars(restore_working_directory):
|
|||
"columns": None,
|
||||
} == extra_template_vars
|
||||
extra_template_vars_from_awaitable = json.loads(
|
||||
Soup(response.body, "html.parser")
|
||||
Soup(response.text, "html.parser")
|
||||
.select("pre.extra_template_vars_from_awaitable")[0]
|
||||
.text
|
||||
)
|
||||
|
@ -345,9 +372,9 @@ def test_plugins_async_template_function(restore_working_directory):
|
|||
template_dir=str(pathlib.Path(__file__).parent / "test_templates")
|
||||
) as client:
|
||||
response = client.get("/-/metadata")
|
||||
assert response.status == 200
|
||||
assert response.status_code == 200
|
||||
extra_from_awaitable_function = (
|
||||
Soup(response.body, "html.parser")
|
||||
Soup(response.text, "html.parser")
|
||||
.select("pre.extra_from_awaitable_function")[0]
|
||||
.text
|
||||
)
|
||||
|
@ -415,19 +442,23 @@ def view_names_client(tmp_path_factory):
|
|||
)
|
||||
def test_view_names(view_names_client, path, view_name):
|
||||
response = view_names_client.get(path)
|
||||
assert response.status == 200
|
||||
assert response.status_code == 200
|
||||
assert f"view_name:{view_name}" == response.text
|
||||
|
||||
|
||||
def test_hook_register_output_renderer_no_parameters(app_client):
|
||||
response = app_client.get("/fixtures/facetable.testnone")
|
||||
assert 200 == response.status
|
||||
assert b"Hello" == response.body
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
async def test_hook_register_output_renderer_no_parameters(ds_client):
|
||||
response = await ds_client.get("/fixtures/facetable.testnone")
|
||||
assert response.status_code == 200
|
||||
assert b"Hello" == response.content
|
||||
|
||||
|
||||
def test_hook_register_output_renderer_all_parameters(app_client):
|
||||
response = app_client.get("/fixtures/facetable.testall")
|
||||
assert 200 == response.status
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
async def test_hook_register_output_renderer_all_parameters(ds_client):
|
||||
response = await ds_client.get("/fixtures/facetable.testall")
|
||||
assert response.status_code == 200
|
||||
# Lots of 'at 0x103a4a690' in here - replace those so we can do
|
||||
# an easy comparison
|
||||
body = at_memory_re.sub(" at 0xXXX", response.text)
|
||||
|
@ -472,47 +503,61 @@ def test_hook_register_output_renderer_all_parameters(app_client):
|
|||
"1+1": 2,
|
||||
}
|
||||
# Test that query_name is set correctly
|
||||
query_response = app_client.get("/fixtures/pragma_cache_size.testall")
|
||||
assert "pragma_cache_size" == json.loads(query_response.body)["query_name"]
|
||||
query_response = await ds_client.get("/fixtures/pragma_cache_size.testall")
|
||||
assert query_response.json()["query_name"] == "pragma_cache_size"
|
||||
|
||||
|
||||
def test_hook_register_output_renderer_custom_status_code(app_client):
|
||||
response = app_client.get("/fixtures/pragma_cache_size.testall?status_code=202")
|
||||
assert 202 == response.status
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
async def test_hook_register_output_renderer_custom_status_code(ds_client):
|
||||
response = await ds_client.get(
|
||||
"/fixtures/pragma_cache_size.testall?status_code=202"
|
||||
)
|
||||
assert response.status_code == 202
|
||||
|
||||
|
||||
def test_hook_register_output_renderer_custom_content_type(app_client):
|
||||
response = app_client.get(
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
async def test_hook_register_output_renderer_custom_content_type(ds_client):
|
||||
response = await ds_client.get(
|
||||
"/fixtures/pragma_cache_size.testall?content_type=text/blah"
|
||||
)
|
||||
assert "text/blah" == response.headers["content-type"]
|
||||
|
||||
|
||||
def test_hook_register_output_renderer_custom_headers(app_client):
|
||||
response = app_client.get(
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
async def test_hook_register_output_renderer_custom_headers(ds_client):
|
||||
response = await ds_client.get(
|
||||
"/fixtures/pragma_cache_size.testall?header=x-wow:1&header=x-gosh:2"
|
||||
)
|
||||
assert "1" == response.headers["x-wow"]
|
||||
assert "2" == response.headers["x-gosh"]
|
||||
|
||||
|
||||
def test_hook_register_output_renderer_returning_response(app_client):
|
||||
response = app_client.get("/fixtures/facetable.testresponse")
|
||||
assert 200 == response.status
|
||||
assert response.json == {"this_is": "json"}
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
async def test_hook_register_output_renderer_returning_response(ds_client):
|
||||
response = await ds_client.get("/fixtures/facetable.testresponse")
|
||||
assert response.status_code == 200
|
||||
assert response.json() == {"this_is": "json"}
|
||||
|
||||
|
||||
def test_hook_register_output_renderer_returning_broken_value(app_client):
|
||||
response = app_client.get("/fixtures/facetable.testresponse?_broken=1")
|
||||
assert 500 == response.status
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
async def test_hook_register_output_renderer_returning_broken_value(ds_client):
|
||||
response = await ds_client.get("/fixtures/facetable.testresponse?_broken=1")
|
||||
assert response.status_code == 500
|
||||
assert "this should break should be dict or Response" in response.text
|
||||
|
||||
|
||||
def test_hook_register_output_renderer_can_render(app_client):
|
||||
response = app_client.get("/fixtures/facetable?_no_can_render=1")
|
||||
assert response.status == 200
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
async def test_hook_register_output_renderer_can_render(ds_client):
|
||||
response = await ds_client.get("/fixtures/facetable?_no_can_render=1")
|
||||
assert response.status_code == 200
|
||||
links = (
|
||||
Soup(response.body, "html.parser")
|
||||
Soup(response.text, "html.parser")
|
||||
.find("p", {"class": "export-links"})
|
||||
.findAll("a")
|
||||
)
|
||||
|
@ -520,9 +565,9 @@ def test_hook_register_output_renderer_can_render(app_client):
|
|||
# Should not be present because we sent ?_no_can_render=1
|
||||
assert "/fixtures/facetable.testall?_labels=on" not in actual
|
||||
# Check that it was passed the values we expected
|
||||
assert hasattr(app_client.ds, "_can_render_saw")
|
||||
assert hasattr(ds_client.ds, "_can_render_saw")
|
||||
assert {
|
||||
"datasette": app_client.ds,
|
||||
"datasette": ds_client.ds,
|
||||
"columns": [
|
||||
"pk",
|
||||
"created",
|
||||
|
@ -541,18 +586,19 @@ def test_hook_register_output_renderer_can_render(app_client):
|
|||
"database": "fixtures",
|
||||
"table": "facetable",
|
||||
"view_name": "table",
|
||||
}.items() <= app_client.ds._can_render_saw.items()
|
||||
}.items() <= ds_client.ds._can_render_saw.items()
|
||||
|
||||
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
async def test_hook_prepare_jinja2_environment(app_client):
|
||||
app_client.ds._HELLO = "HI"
|
||||
await app_client.ds.invoke_startup()
|
||||
template = app_client.ds.jinja_env.from_string(
|
||||
async def test_hook_prepare_jinja2_environment(ds_client):
|
||||
ds_client.ds._HELLO = "HI"
|
||||
await ds_client.ds.invoke_startup()
|
||||
template = ds_client.ds.jinja_env.from_string(
|
||||
"Hello there, {{ a|format_numeric }}, {{ a|to_hello }}, {{ b|select_times_three }}",
|
||||
{"a": 3412341, "b": 5},
|
||||
)
|
||||
rendered = await app_client.ds.render_template(template)
|
||||
rendered = await ds_client.ds.render_template(template)
|
||||
assert "Hello there, 3,412,341, HI, 15" == rendered
|
||||
|
||||
|
||||
|
@ -565,8 +611,10 @@ def test_hook_publish_subcommand():
|
|||
assert ["cloudrun", "heroku"] == cli.publish.list_commands({})
|
||||
|
||||
|
||||
def test_hook_register_facet_classes(app_client):
|
||||
response = app_client.get(
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
async def test_hook_register_facet_classes(ds_client):
|
||||
response = await ds_client.get(
|
||||
"/fixtures/compound_three_primary_keys.json?_dummy_facet=1"
|
||||
)
|
||||
assert [
|
||||
|
@ -602,30 +650,36 @@ def test_hook_register_facet_classes(app_client):
|
|||
"name": "pk3",
|
||||
"toggle_url": "http://localhost/fixtures/compound_three_primary_keys.json?_dummy_facet=1&_facet=pk3",
|
||||
},
|
||||
] == response.json["suggested_facets"]
|
||||
] == response.json()["suggested_facets"]
|
||||
|
||||
|
||||
def test_hook_actor_from_request(app_client):
|
||||
app_client.get("/")
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
async def test_hook_actor_from_request(ds_client):
|
||||
await ds_client.get("/")
|
||||
# Should have no actor
|
||||
assert None == app_client.ds._last_request.scope["actor"]
|
||||
app_client.get("/?_bot=1")
|
||||
assert ds_client.ds._last_request.scope["actor"] is None
|
||||
await ds_client.get("/?_bot=1")
|
||||
# Should have bot actor
|
||||
assert {"id": "bot"} == app_client.ds._last_request.scope["actor"]
|
||||
assert ds_client.ds._last_request.scope["actor"] == {"id": "bot"}
|
||||
|
||||
|
||||
def test_hook_actor_from_request_async(app_client):
|
||||
app_client.get("/")
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
async def test_hook_actor_from_request_async(ds_client):
|
||||
await ds_client.get("/")
|
||||
# Should have no actor
|
||||
assert None == app_client.ds._last_request.scope["actor"]
|
||||
app_client.get("/?_bot2=1")
|
||||
assert ds_client.ds._last_request.scope["actor"] is None
|
||||
await ds_client.get("/?_bot2=1")
|
||||
# Should have bot2 actor
|
||||
assert {"id": "bot2", "1+1": 2} == app_client.ds._last_request.scope["actor"]
|
||||
assert ds_client.ds._last_request.scope["actor"] == {"id": "bot2", "1+1": 2}
|
||||
|
||||
|
||||
def test_existing_scope_actor_respected(app_client):
|
||||
app_client.get("/?_actor_in_scope=1")
|
||||
assert {"id": "from-scope"} == app_client.ds._last_request.scope["actor"]
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
async def test_existing_scope_actor_respected(ds_client):
|
||||
await ds_client.get("/?_actor_in_scope=1")
|
||||
assert ds_client.ds._last_request.scope["actor"] == {"id": "from-scope"}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
@ -664,13 +718,17 @@ async def test_hook_permission_allowed(action, expected):
|
|||
pm.unregister(name="undo_register_extras")
|
||||
|
||||
|
||||
def test_actor_json(app_client):
|
||||
assert {"actor": None} == app_client.get("/-/actor.json").json
|
||||
assert {"actor": {"id": "bot2", "1+1": 2}} == app_client.get(
|
||||
"/-/actor.json?_bot2=1"
|
||||
).json
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
async def test_actor_json(ds_client):
|
||||
assert (await ds_client.get("/-/actor.json")).json() == {"actor": None}
|
||||
assert (await ds_client.get("/-/actor.json?_bot2=1")).json() == {
|
||||
"actor": {"id": "bot2", "1+1": 2}
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize(
|
||||
"path,body",
|
||||
[
|
||||
|
@ -679,10 +737,10 @@ def test_actor_json(app_client):
|
|||
("/not-async/", "This was not async"),
|
||||
],
|
||||
)
|
||||
def test_hook_register_routes(app_client, path, body):
|
||||
response = app_client.get(path)
|
||||
assert 200 == response.status
|
||||
assert body == response.text
|
||||
async def test_hook_register_routes(ds_client, path, body):
|
||||
response = await ds_client.get(path)
|
||||
assert response.status_code == 200
|
||||
assert response.text == body
|
||||
|
||||
|
||||
@pytest.mark.parametrize("configured_path", ("path1", "path2"))
|
||||
|
@ -697,11 +755,11 @@ def test_hook_register_routes_with_datasette(configured_path):
|
|||
}
|
||||
) as client:
|
||||
response = client.get(f"/{configured_path}/")
|
||||
assert response.status == 200
|
||||
assert response.status_code == 200
|
||||
assert configured_path.upper() == response.text
|
||||
# Other one should 404
|
||||
other_path = [p for p in ("path1", "path2") if configured_path != p][0]
|
||||
assert client.get(f"/{other_path}/", follow_redirects=True).status == 404
|
||||
assert client.get(f"/{other_path}/", follow_redirects=True).status_code == 404
|
||||
|
||||
|
||||
def test_hook_register_routes_override():
|
||||
|
@ -716,7 +774,7 @@ def test_hook_register_routes_override():
|
|||
}
|
||||
) as client:
|
||||
response = client.get("/db/table")
|
||||
assert response.status == 200
|
||||
assert response.status_code == 200
|
||||
assert (
|
||||
response.text
|
||||
== "/db/table: [('db_name', 'db'), ('table_and_format', 'table')]"
|
||||
|
@ -725,9 +783,9 @@ def test_hook_register_routes_override():
|
|||
|
||||
def test_hook_register_routes_post(app_client):
|
||||
response = app_client.post("/post/", {"this is": "post data"}, csrftoken_from=True)
|
||||
assert 200 == response.status
|
||||
assert response.status_code == 200
|
||||
assert "csrftoken" in response.json
|
||||
assert "post data" == response.json["this is"]
|
||||
assert response.json["this is"] == "post data"
|
||||
|
||||
|
||||
def test_hook_register_routes_csrftoken(restore_working_directory, tmpdir_factory):
|
||||
|
@ -741,18 +799,22 @@ def test_hook_register_routes_csrftoken(restore_working_directory, tmpdir_factor
|
|||
assert f"CSRFTOKEN: {expected_token}" == response.text
|
||||
|
||||
|
||||
def test_hook_register_routes_asgi(app_client):
|
||||
response = app_client.get("/three/")
|
||||
assert {"hello": "world"} == response.json
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
async def test_hook_register_routes_asgi(ds_client):
|
||||
response = await ds_client.get("/three/")
|
||||
assert {"hello": "world"} == response.json()
|
||||
assert "1" == response.headers["x-three"]
|
||||
|
||||
|
||||
def test_hook_register_routes_add_message(app_client):
|
||||
response = app_client.get("/add-message/")
|
||||
assert 200 == response.status
|
||||
assert "Added message" == response.text
|
||||
decoded = app_client.ds.unsign(response.cookies["ds_messages"], "messages")
|
||||
assert [["Hello from messages", 1]] == decoded
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
async def test_hook_register_routes_add_message(ds_client):
|
||||
response = await ds_client.get("/add-message/")
|
||||
assert response.status_code == 200
|
||||
assert response.text == "Added message"
|
||||
decoded = ds_client.ds.unsign(response.cookies["ds_messages"], "messages")
|
||||
assert decoded == [["Hello from messages", 1]]
|
||||
|
||||
|
||||
def test_hook_register_routes_render_message(restore_working_directory, tmpdir_factory):
|
||||
|
@ -765,15 +827,18 @@ def test_hook_register_routes_render_message(restore_working_directory, tmpdir_f
|
|||
assert "Hello from messages" in response2.text
|
||||
|
||||
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
async def test_hook_startup(app_client):
|
||||
await app_client.ds.invoke_startup()
|
||||
assert app_client.ds._startup_hook_fired
|
||||
assert 2 == app_client.ds._startup_hook_calculation
|
||||
async def test_hook_startup(ds_client):
|
||||
await ds_client.ds.invoke_startup()
|
||||
assert ds_client.ds._startup_hook_fired
|
||||
assert 2 == ds_client.ds._startup_hook_calculation
|
||||
|
||||
|
||||
def test_hook_canned_queries(app_client):
|
||||
queries = app_client.get("/fixtures.json").json["queries"]
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
async def test_hook_canned_queries(ds_client):
|
||||
queries = (await ds_client.get("/fixtures.json")).json()["queries"]
|
||||
queries_by_name = {q["name"]: q for q in queries}
|
||||
assert {
|
||||
"sql": "select 2",
|
||||
|
@ -787,20 +852,26 @@ def test_hook_canned_queries(app_client):
|
|||
} == queries_by_name["from_hook"]
|
||||
|
||||
|
||||
def test_hook_canned_queries_non_async(app_client):
|
||||
response = app_client.get("/fixtures/from_hook.json?_shape=array")
|
||||
assert [{"1": 1, "actor_id": "null"}] == response.json
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
async def test_hook_canned_queries_non_async(ds_client):
|
||||
response = await ds_client.get("/fixtures/from_hook.json?_shape=array")
|
||||
assert [{"1": 1, "actor_id": "null"}] == response.json()
|
||||
|
||||
|
||||
def test_hook_canned_queries_async(app_client):
|
||||
response = app_client.get("/fixtures/from_async_hook.json?_shape=array")
|
||||
assert [{"2": 2}] == response.json
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
async def test_hook_canned_queries_async(ds_client):
|
||||
response = await ds_client.get("/fixtures/from_async_hook.json?_shape=array")
|
||||
assert [{"2": 2}] == response.json()
|
||||
|
||||
|
||||
def test_hook_canned_queries_actor(app_client):
|
||||
assert [{"1": 1, "actor_id": "bot"}] == app_client.get(
|
||||
"/fixtures/from_hook.json?_bot=1&_shape=array"
|
||||
).json
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
async def test_hook_canned_queries_actor(ds_client):
|
||||
assert (
|
||||
await ds_client.get("/fixtures/from_hook.json?_bot=1&_shape=array")
|
||||
).json() == [{"1": 1, "actor_id": "bot"}]
|
||||
|
||||
|
||||
def test_hook_register_magic_parameters(restore_working_directory):
|
||||
|
@ -823,7 +894,7 @@ def test_hook_register_magic_parameters(restore_working_directory):
|
|||
},
|
||||
) as client:
|
||||
response = client.post("/data/runme", {}, csrftoken_from=True)
|
||||
assert 302 == response.status
|
||||
assert response.status_code == 302
|
||||
actual = client.get("/data/logs.json?_sort_desc=rowid&_shape=array").json
|
||||
assert [{"rowid": 1, "line": "1.1"}] == actual
|
||||
# Now try the GET request against get_uuid
|
||||
|
@ -839,7 +910,7 @@ def test_hook_forbidden(restore_working_directory):
|
|||
metadata={"allow": {}},
|
||||
) as client:
|
||||
response = client.get("/")
|
||||
assert 403 == response.status
|
||||
assert response.status_code == 403
|
||||
response2 = client.get("/data2")
|
||||
assert 302 == response2.status
|
||||
assert (
|
||||
|
@ -852,39 +923,47 @@ def test_hook_forbidden(restore_working_directory):
|
|||
)
|
||||
|
||||
|
||||
def test_hook_handle_exception(app_client):
|
||||
app_client.get("/trigger-error?x=123")
|
||||
assert hasattr(app_client.ds, "_exception_hook_fired")
|
||||
request, exception = app_client.ds._exception_hook_fired
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
async def test_hook_handle_exception(ds_client):
|
||||
await ds_client.get("/trigger-error?x=123")
|
||||
assert hasattr(ds_client.ds, "_exception_hook_fired")
|
||||
request, exception = ds_client.ds._exception_hook_fired
|
||||
assert request.url == "http://localhost/trigger-error?x=123"
|
||||
assert isinstance(exception, ZeroDivisionError)
|
||||
|
||||
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize("param", ("_custom_error", "_custom_error_async"))
|
||||
def test_hook_handle_exception_custom_response(app_client, param):
|
||||
response = app_client.get("/trigger-error?{}=1".format(param))
|
||||
async def test_hook_handle_exception_custom_response(ds_client, param):
|
||||
response = await ds_client.get("/trigger-error?{}=1".format(param))
|
||||
assert response.text == param
|
||||
|
||||
|
||||
def test_hook_menu_links(app_client):
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
async def test_hook_menu_links(ds_client):
|
||||
def get_menu_links(html):
|
||||
soup = Soup(html, "html.parser")
|
||||
return [
|
||||
{"label": a.text, "href": a["href"]} for a in soup.select(".nav-menu a")
|
||||
]
|
||||
|
||||
response = app_client.get("/")
|
||||
response = await ds_client.get("/")
|
||||
assert get_menu_links(response.text) == []
|
||||
|
||||
response_2 = app_client.get("/?_bot=1&_hello=BOB")
|
||||
response_2 = await ds_client.get("/?_bot=1&_hello=BOB")
|
||||
assert get_menu_links(response_2.text) == [
|
||||
{"label": "Hello, BOB", "href": "/"},
|
||||
{"label": "Hello 2", "href": "/"},
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize("table_or_view", ["facetable", "simple_view"])
|
||||
def test_hook_table_actions(app_client, table_or_view):
|
||||
async def test_hook_table_actions(ds_client, table_or_view):
|
||||
def get_table_actions_links(html):
|
||||
soup = Soup(html, "html.parser")
|
||||
details = soup.find("details", {"class": "actions-menu-links"})
|
||||
|
@ -892,10 +971,10 @@ def test_hook_table_actions(app_client, table_or_view):
|
|||
return []
|
||||
return [{"label": a.text, "href": a["href"]} for a in details.select("a")]
|
||||
|
||||
response = app_client.get(f"/fixtures/{table_or_view}")
|
||||
response = await ds_client.get(f"/fixtures/{table_or_view}")
|
||||
assert get_table_actions_links(response.text) == []
|
||||
|
||||
response_2 = app_client.get(f"/fixtures/{table_or_view}?_bot=1&_hello=BOB")
|
||||
response_2 = await ds_client.get(f"/fixtures/{table_or_view}?_bot=1&_hello=BOB")
|
||||
assert sorted(
|
||||
get_table_actions_links(response_2.text), key=lambda l: l["label"]
|
||||
) == [
|
||||
|
@ -905,7 +984,9 @@ def test_hook_table_actions(app_client, table_or_view):
|
|||
]
|
||||
|
||||
|
||||
def test_hook_database_actions(app_client):
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
async def test_hook_database_actions(ds_client):
|
||||
def get_table_actions_links(html):
|
||||
soup = Soup(html, "html.parser")
|
||||
details = soup.find("details", {"class": "actions-menu-links"})
|
||||
|
@ -913,10 +994,10 @@ def test_hook_database_actions(app_client):
|
|||
return []
|
||||
return [{"label": a.text, "href": a["href"]} for a in details.select("a")]
|
||||
|
||||
response = app_client.get("/fixtures")
|
||||
response = await ds_client.get("/fixtures")
|
||||
assert get_table_actions_links(response.text) == []
|
||||
|
||||
response_2 = app_client.get("/fixtures?_bot=1&_hello=BOB")
|
||||
response_2 = await ds_client.get("/fixtures?_bot=1&_hello=BOB")
|
||||
assert get_table_actions_links(response_2.text) == [
|
||||
{"label": "Database: fixtures - BOB", "href": "/"},
|
||||
]
|
||||
|
@ -930,25 +1011,27 @@ def test_hook_skip_csrf(app_client):
|
|||
csrftoken_from=True,
|
||||
cookies={"ds_actor": cookie},
|
||||
)
|
||||
assert csrf_response.status == 200
|
||||
assert csrf_response.status_code == 200
|
||||
missing_csrf_response = app_client.post(
|
||||
"/post/", post_data={"this is": "post data"}, cookies={"ds_actor": cookie}
|
||||
)
|
||||
assert missing_csrf_response.status == 403
|
||||
assert missing_csrf_response.status_code == 403
|
||||
# But "/skip-csrf" should allow
|
||||
allow_csrf_response = app_client.post(
|
||||
"/skip-csrf", post_data={"this is": "post data"}, cookies={"ds_actor": cookie}
|
||||
)
|
||||
assert allow_csrf_response.status == 405 # Method not allowed
|
||||
assert allow_csrf_response.status_code == 405 # Method not allowed
|
||||
# /skip-csrf-2 should not
|
||||
second_missing_csrf_response = app_client.post(
|
||||
"/skip-csrf-2", post_data={"this is": "post data"}, cookies={"ds_actor": cookie}
|
||||
)
|
||||
assert second_missing_csrf_response.status == 403
|
||||
assert second_missing_csrf_response.status_code == 403
|
||||
|
||||
|
||||
def test_hook_get_metadata(app_client):
|
||||
app_client.ds._metadata_local = {
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
async def test_hook_get_metadata(ds_client):
|
||||
ds_client.ds._metadata_local = {
|
||||
"title": "Testing get_metadata hook!",
|
||||
"databases": {"from-local": {"title": "Hello from local metadata"}},
|
||||
}
|
||||
|
@ -965,7 +1048,7 @@ def test_hook_get_metadata(app_client):
|
|||
]
|
||||
|
||||
pm.hook.get_metadata = get_metadata_mock
|
||||
meta = app_client.ds.metadata()
|
||||
meta = ds_client.ds.metadata()
|
||||
assert "Testing get_metadata hook!" == meta["title"]
|
||||
assert "Hello from local metadata" == meta["databases"]["from-local"]["title"]
|
||||
assert "Hello from the plugin hook" == meta["databases"]["from-hook"]["title"]
|
||||
|
@ -1027,7 +1110,9 @@ def test_hook_register_commands():
|
|||
importlib.reload(cli)
|
||||
|
||||
|
||||
def test_hook_filters_from_request(app_client):
|
||||
@pytest.mark.ds_client
|
||||
@pytest.mark.asyncio
|
||||
async def test_hook_filters_from_request(ds_client):
|
||||
class ReturnNothingPlugin:
|
||||
__name__ = "ReturnNothingPlugin"
|
||||
|
||||
|
@ -1037,10 +1122,10 @@ def test_hook_filters_from_request(app_client):
|
|||
return FilterArguments(["1 = 0"], human_descriptions=["NOTHING"])
|
||||
|
||||
pm.register(ReturnNothingPlugin(), name="ReturnNothingPlugin")
|
||||
response = app_client.get("/fixtures/facetable?_nothing=1")
|
||||
response = await ds_client.get("/fixtures/facetable?_nothing=1")
|
||||
assert "0 rows\n where NOTHING" in response.text
|
||||
json_response = app_client.get("/fixtures/facetable.json?_nothing=1")
|
||||
assert json_response.json["rows"] == []
|
||||
json_response = await ds_client.get("/fixtures/facetable.json?_nothing=1")
|
||||
assert json_response.json()["rows"] == []
|
||||
pm.unregister(name="ReturnNothingPlugin")
|
||||
|
||||
|
||||
|
|
Ładowanie…
Reference in New Issue