kopia lustrzana https://github.com/simonw/datasette
998 wiersze
34 KiB
Python
998 wiersze
34 KiB
Python
from bs4 import BeautifulSoup as Soup
|
|
from .fixtures import (
|
|
app_client,
|
|
make_app_client,
|
|
TABLES,
|
|
TEMP_PLUGIN_SECRET_FILE,
|
|
TestClient as _TestClient,
|
|
) # noqa
|
|
from click.testing import CliRunner
|
|
from datasette.app import Datasette
|
|
from datasette import cli, hookimpl
|
|
from datasette.filters import FilterArguments
|
|
from datasette.plugins import get_plugins, DEFAULT_PLUGINS, pm
|
|
from datasette.utils.sqlite import sqlite3
|
|
from datasette.utils import CustomRow
|
|
from jinja2.environment import Template
|
|
import base64
|
|
import importlib
|
|
import json
|
|
import os
|
|
import pathlib
|
|
import re
|
|
import textwrap
|
|
import pytest
|
|
import urllib
|
|
|
|
at_memory_re = re.compile(r" at 0x\w+")
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"plugin_hook", [name for name in dir(pm.hook) if not name.startswith("_")]
|
|
)
|
|
def test_plugin_hooks_have_tests(plugin_hook):
|
|
"""Every plugin hook should be referenced in this test module"""
|
|
tests_in_this_module = [t for t in globals().keys() if t.startswith("test_hook_")]
|
|
ok = False
|
|
for test in tests_in_this_module:
|
|
if plugin_hook in test:
|
|
ok = True
|
|
assert ok, f"Plugin hook is missing tests: {plugin_hook}"
|
|
|
|
|
|
def test_hook_plugins_dir_plugin_prepare_connection(app_client):
|
|
response = app_client.get(
|
|
"/fixtures.json?sql=select+convert_units(100%2C+'m'%2C+'ft')"
|
|
)
|
|
assert pytest.approx(328.0839) == response.json["rows"][0][0]
|
|
|
|
|
|
def test_hook_plugin_prepare_connection_arguments(app_client):
|
|
response = app_client.get(
|
|
"/fixtures.json?sql=select+prepare_connection_args()&_shape=arrayfirst"
|
|
)
|
|
assert [
|
|
"database=fixtures, datasette.plugin_config(\"name-of-plugin\")={'depth': 'root'}"
|
|
] == response.json
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"path,expected_decoded_object",
|
|
[
|
|
(
|
|
"/",
|
|
{
|
|
"template": "index.html",
|
|
"database": None,
|
|
"table": None,
|
|
"view_name": "index",
|
|
"request_path": "/",
|
|
"added": 15,
|
|
"columns": None,
|
|
},
|
|
),
|
|
(
|
|
"/fixtures",
|
|
{
|
|
"template": "database.html",
|
|
"database": "fixtures",
|
|
"table": None,
|
|
"view_name": "database",
|
|
"request_path": "/fixtures",
|
|
"added": 15,
|
|
"columns": None,
|
|
},
|
|
),
|
|
(
|
|
"/fixtures/sortable",
|
|
{
|
|
"template": "table.html",
|
|
"database": "fixtures",
|
|
"table": "sortable",
|
|
"view_name": "table",
|
|
"request_path": "/fixtures/sortable",
|
|
"added": 15,
|
|
"columns": [
|
|
"pk1",
|
|
"pk2",
|
|
"content",
|
|
"sortable",
|
|
"sortable_with_nulls",
|
|
"sortable_with_nulls_2",
|
|
"text",
|
|
],
|
|
},
|
|
),
|
|
],
|
|
)
|
|
def test_hook_extra_css_urls(app_client, path, expected_decoded_object):
|
|
response = app_client.get(path)
|
|
assert response.status == 200
|
|
links = Soup(response.body, "html.parser").findAll("link")
|
|
special_href = [
|
|
l for l in links if l.attrs["href"].endswith("/extra-css-urls-demo.css")
|
|
][0]["href"]
|
|
# This link has a base64-encoded JSON blob in it
|
|
encoded = special_href.split("/")[3]
|
|
assert expected_decoded_object == json.loads(
|
|
base64.b64decode(encoded).decode("utf8")
|
|
)
|
|
|
|
|
|
def test_hook_extra_js_urls(app_client):
|
|
response = app_client.get("/")
|
|
scripts = Soup(response.body, "html.parser").findAll("script")
|
|
script_attrs = [s.attrs for s in scripts]
|
|
for attrs in [
|
|
{
|
|
"integrity": "SRIHASH",
|
|
"crossorigin": "anonymous",
|
|
"src": "https://plugin-example.datasette.io/jquery.js",
|
|
},
|
|
{
|
|
"src": "https://plugin-example.datasette.io/plugin.module.js",
|
|
"type": "module",
|
|
},
|
|
]:
|
|
assert any(s == attrs for s in script_attrs), "Expected: {}".format(attrs)
|
|
|
|
|
|
def test_plugins_with_duplicate_js_urls(app_client):
|
|
# If two plugins both require jQuery, jQuery should be loaded only once
|
|
response = app_client.get("/fixtures")
|
|
# This test is a little tricky, as if the user has any other plugins in
|
|
# their current virtual environment those may affect what comes back too.
|
|
# What matters is that https://plugin-example.datasette.io/jquery.js is only there once
|
|
# and it comes before plugin1.js and plugin2.js which could be in either
|
|
# order
|
|
scripts = Soup(response.body, "html.parser").findAll("script")
|
|
srcs = [s["src"] for s in scripts if s.get("src")]
|
|
# No duplicates allowed:
|
|
assert len(srcs) == len(set(srcs))
|
|
# jquery.js loaded once:
|
|
assert 1 == srcs.count("https://plugin-example.datasette.io/jquery.js")
|
|
# plugin1.js and plugin2.js are both there:
|
|
assert 1 == srcs.count("https://plugin-example.datasette.io/plugin1.js")
|
|
assert 1 == srcs.count("https://plugin-example.datasette.io/plugin2.js")
|
|
# jquery comes before them both
|
|
assert srcs.index("https://plugin-example.datasette.io/jquery.js") < srcs.index(
|
|
"https://plugin-example.datasette.io/plugin1.js"
|
|
)
|
|
assert srcs.index("https://plugin-example.datasette.io/jquery.js") < srcs.index(
|
|
"https://plugin-example.datasette.io/plugin2.js"
|
|
)
|
|
|
|
|
|
def test_hook_render_cell_link_from_json(app_client):
|
|
sql = """
|
|
select '{"href": "http://example.com/", "label":"Example"}'
|
|
""".strip()
|
|
path = "/fixtures?" + urllib.parse.urlencode({"sql": sql})
|
|
response = app_client.get(path)
|
|
td = Soup(response.body, "html.parser").find("table").find("tbody").find("td")
|
|
a = td.find("a")
|
|
assert a is not None, str(a)
|
|
assert a.attrs["href"] == "http://example.com/"
|
|
assert a.attrs["data-database"] == "fixtures"
|
|
assert a.text == "Example"
|
|
|
|
|
|
def test_hook_render_cell_demo(app_client):
|
|
response = app_client.get("/fixtures/simple_primary_key?id=4")
|
|
soup = Soup(response.body, "html.parser")
|
|
td = soup.find("td", {"class": "col-content"})
|
|
assert {
|
|
"column": "content",
|
|
"table": "simple_primary_key",
|
|
"database": "fixtures",
|
|
"config": {"depth": "table", "special": "this-is-simple_primary_key"},
|
|
} == json.loads(td.string)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"path", ("/fixtures?sql=select+'RENDER_CELL_ASYNC'", "/fixtures/simple_primary_key")
|
|
)
|
|
def test_hook_render_cell_async(app_client, path):
|
|
response = app_client.get(path)
|
|
assert b"RENDER_CELL_ASYNC_RESULT" in response.body
|
|
|
|
|
|
def test_plugin_config(app_client):
|
|
assert {"depth": "table"} == app_client.ds.plugin_config(
|
|
"name-of-plugin", database="fixtures", table="sortable"
|
|
)
|
|
assert {"depth": "database"} == app_client.ds.plugin_config(
|
|
"name-of-plugin", database="fixtures", table="unknown_table"
|
|
)
|
|
assert {"depth": "database"} == app_client.ds.plugin_config(
|
|
"name-of-plugin", database="fixtures"
|
|
)
|
|
assert {"depth": "root"} == app_client.ds.plugin_config(
|
|
"name-of-plugin", database="unknown_database"
|
|
)
|
|
assert {"depth": "root"} == app_client.ds.plugin_config("name-of-plugin")
|
|
assert None is app_client.ds.plugin_config("unknown-plugin")
|
|
|
|
|
|
def test_plugin_config_env(app_client):
|
|
os.environ["FOO_ENV"] = "FROM_ENVIRONMENT"
|
|
assert {"foo": "FROM_ENVIRONMENT"} == app_client.ds.plugin_config("env-plugin")
|
|
# Ensure secrets aren't visible in /-/metadata.json
|
|
metadata = app_client.get("/-/metadata.json")
|
|
assert {"foo": {"$env": "FOO_ENV"}} == metadata.json["plugins"]["env-plugin"]
|
|
del os.environ["FOO_ENV"]
|
|
|
|
|
|
def test_plugin_config_env_from_list(app_client):
|
|
os.environ["FOO_ENV"] = "FROM_ENVIRONMENT"
|
|
assert [{"in_a_list": "FROM_ENVIRONMENT"}] == app_client.ds.plugin_config(
|
|
"env-plugin-list"
|
|
)
|
|
# Ensure secrets aren't visible in /-/metadata.json
|
|
metadata = app_client.get("/-/metadata.json")
|
|
assert [{"in_a_list": {"$env": "FOO_ENV"}}] == metadata.json["plugins"][
|
|
"env-plugin-list"
|
|
]
|
|
del os.environ["FOO_ENV"]
|
|
|
|
|
|
def test_plugin_config_file(app_client):
|
|
with open(TEMP_PLUGIN_SECRET_FILE, "w") as fp:
|
|
fp.write("FROM_FILE")
|
|
assert {"foo": "FROM_FILE"} == app_client.ds.plugin_config("file-plugin")
|
|
# Ensure secrets aren't visible in /-/metadata.json
|
|
metadata = app_client.get("/-/metadata.json")
|
|
assert {"foo": {"$file": TEMP_PLUGIN_SECRET_FILE}} == metadata.json["plugins"][
|
|
"file-plugin"
|
|
]
|
|
os.remove(TEMP_PLUGIN_SECRET_FILE)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"path,expected_extra_body_script",
|
|
[
|
|
(
|
|
"/",
|
|
{
|
|
"template": "index.html",
|
|
"database": None,
|
|
"table": None,
|
|
"config": {"depth": "root"},
|
|
"view_name": "index",
|
|
"request_path": "/",
|
|
"added": 15,
|
|
"columns": None,
|
|
},
|
|
),
|
|
(
|
|
"/fixtures",
|
|
{
|
|
"template": "database.html",
|
|
"database": "fixtures",
|
|
"table": None,
|
|
"config": {"depth": "database"},
|
|
"view_name": "database",
|
|
"request_path": "/fixtures",
|
|
"added": 15,
|
|
"columns": None,
|
|
},
|
|
),
|
|
(
|
|
"/fixtures/sortable",
|
|
{
|
|
"template": "table.html",
|
|
"database": "fixtures",
|
|
"table": "sortable",
|
|
"config": {"depth": "table"},
|
|
"view_name": "table",
|
|
"request_path": "/fixtures/sortable",
|
|
"added": 15,
|
|
"columns": [
|
|
"pk1",
|
|
"pk2",
|
|
"content",
|
|
"sortable",
|
|
"sortable_with_nulls",
|
|
"sortable_with_nulls_2",
|
|
"text",
|
|
],
|
|
},
|
|
),
|
|
],
|
|
)
|
|
def test_hook_extra_body_script(app_client, path, expected_extra_body_script):
|
|
r = re.compile(r"<script type=\"module\">var extra_body_script = (.*?);</script>")
|
|
json_data = r.search(app_client.get(path).text).group(1)
|
|
actual_data = json.loads(json_data)
|
|
assert expected_extra_body_script == actual_data
|
|
|
|
|
|
def test_hook_asgi_wrapper(app_client):
|
|
response = app_client.get("/fixtures")
|
|
assert "_internal, fixtures" == response.headers["x-databases"]
|
|
|
|
|
|
def test_hook_extra_template_vars(restore_working_directory):
|
|
with make_app_client(
|
|
template_dir=str(pathlib.Path(__file__).parent / "test_templates")
|
|
) as client:
|
|
response = client.get("/-/metadata")
|
|
assert response.status == 200
|
|
extra_template_vars = json.loads(
|
|
Soup(response.body, "html.parser").select("pre.extra_template_vars")[0].text
|
|
)
|
|
assert {
|
|
"template": "show_json.html",
|
|
"scope_path": "/-/metadata",
|
|
"columns": None,
|
|
} == extra_template_vars
|
|
extra_template_vars_from_awaitable = json.loads(
|
|
Soup(response.body, "html.parser")
|
|
.select("pre.extra_template_vars_from_awaitable")[0]
|
|
.text
|
|
)
|
|
assert {
|
|
"template": "show_json.html",
|
|
"awaitable": True,
|
|
"scope_path": "/-/metadata",
|
|
} == extra_template_vars_from_awaitable
|
|
|
|
|
|
def test_plugins_async_template_function(restore_working_directory):
|
|
with make_app_client(
|
|
template_dir=str(pathlib.Path(__file__).parent / "test_templates")
|
|
) as client:
|
|
response = client.get("/-/metadata")
|
|
assert response.status == 200
|
|
extra_from_awaitable_function = (
|
|
Soup(response.body, "html.parser")
|
|
.select("pre.extra_from_awaitable_function")[0]
|
|
.text
|
|
)
|
|
expected = (
|
|
sqlite3.connect(":memory:").execute("select sqlite_version()").fetchone()[0]
|
|
)
|
|
assert expected == extra_from_awaitable_function
|
|
|
|
|
|
def test_default_plugins_have_no_templates_path_or_static_path():
|
|
# The default plugins that ship with Datasette should have their static_path and
|
|
# templates_path all set to None
|
|
plugins = get_plugins()
|
|
for plugin in plugins:
|
|
if plugin["name"] in DEFAULT_PLUGINS:
|
|
assert None is plugin["static_path"]
|
|
assert None is plugin["templates_path"]
|
|
|
|
|
|
@pytest.fixture(scope="session")
|
|
def view_names_client(tmp_path_factory):
|
|
tmpdir = tmp_path_factory.mktemp("test-view-names")
|
|
templates = tmpdir / "templates"
|
|
templates.mkdir()
|
|
plugins = tmpdir / "plugins"
|
|
plugins.mkdir()
|
|
for template in (
|
|
"index.html",
|
|
"database.html",
|
|
"table.html",
|
|
"row.html",
|
|
"show_json.html",
|
|
"query.html",
|
|
):
|
|
(templates / template).write_text("view_name:{{ view_name }}", "utf-8")
|
|
(plugins / "extra_vars.py").write_text(
|
|
textwrap.dedent(
|
|
"""
|
|
from datasette import hookimpl
|
|
@hookimpl
|
|
def extra_template_vars(view_name):
|
|
return {"view_name": view_name}
|
|
"""
|
|
),
|
|
"utf-8",
|
|
)
|
|
db_path = str(tmpdir / "fixtures.db")
|
|
conn = sqlite3.connect(db_path)
|
|
conn.executescript(TABLES)
|
|
return _TestClient(
|
|
Datasette([db_path], template_dir=str(templates), plugins_dir=str(plugins))
|
|
)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"path,view_name",
|
|
(
|
|
("/", "index"),
|
|
("/fixtures", "database"),
|
|
("/fixtures/units", "table"),
|
|
("/fixtures/units/1", "row"),
|
|
("/-/metadata", "json_data"),
|
|
("/fixtures?sql=select+1", "database"),
|
|
),
|
|
)
|
|
def test_view_names(view_names_client, path, view_name):
|
|
response = view_names_client.get(path)
|
|
assert response.status == 200
|
|
assert f"view_name:{view_name}" == response.text
|
|
|
|
|
|
def test_hook_register_output_renderer_no_parameters(app_client):
|
|
response = app_client.get("/fixtures/facetable.testnone")
|
|
assert 200 == response.status
|
|
assert b"Hello" == response.body
|
|
|
|
|
|
def test_hook_register_output_renderer_all_parameters(app_client):
|
|
response = app_client.get("/fixtures/facetable.testall")
|
|
assert 200 == response.status
|
|
# Lots of 'at 0x103a4a690' in here - replace those so we can do
|
|
# an easy comparison
|
|
body = at_memory_re.sub(" at 0xXXX", response.text)
|
|
assert json.loads(body) == {
|
|
"datasette": "<datasette.app.Datasette object at 0xXXX>",
|
|
"columns": [
|
|
"pk",
|
|
"created",
|
|
"planet_int",
|
|
"on_earth",
|
|
"state",
|
|
"_city_id",
|
|
"_neighborhood",
|
|
"tags",
|
|
"complex_array",
|
|
"distinct_some_null",
|
|
],
|
|
"rows": [
|
|
"<sqlite3.Row object at 0xXXX>",
|
|
"<sqlite3.Row object at 0xXXX>",
|
|
"<sqlite3.Row object at 0xXXX>",
|
|
"<sqlite3.Row object at 0xXXX>",
|
|
"<sqlite3.Row object at 0xXXX>",
|
|
"<sqlite3.Row object at 0xXXX>",
|
|
"<sqlite3.Row object at 0xXXX>",
|
|
"<sqlite3.Row object at 0xXXX>",
|
|
"<sqlite3.Row object at 0xXXX>",
|
|
"<sqlite3.Row object at 0xXXX>",
|
|
"<sqlite3.Row object at 0xXXX>",
|
|
"<sqlite3.Row object at 0xXXX>",
|
|
"<sqlite3.Row object at 0xXXX>",
|
|
"<sqlite3.Row object at 0xXXX>",
|
|
"<sqlite3.Row object at 0xXXX>",
|
|
],
|
|
"sql": "select pk, created, planet_int, on_earth, state, _city_id, _neighborhood, tags, complex_array, distinct_some_null from facetable order by pk limit 51",
|
|
"query_name": None,
|
|
"database": "fixtures",
|
|
"table": "facetable",
|
|
"request": '<asgi.Request method="GET" url="http://localhost/fixtures/facetable.testall">',
|
|
"view_name": "table",
|
|
"1+1": 2,
|
|
}
|
|
# Test that query_name is set correctly
|
|
query_response = app_client.get("/fixtures/pragma_cache_size.testall")
|
|
assert "pragma_cache_size" == json.loads(query_response.body)["query_name"]
|
|
|
|
|
|
def test_hook_register_output_renderer_custom_status_code(app_client):
|
|
response = app_client.get("/fixtures/pragma_cache_size.testall?status_code=202")
|
|
assert 202 == response.status
|
|
|
|
|
|
def test_hook_register_output_renderer_custom_content_type(app_client):
|
|
response = app_client.get(
|
|
"/fixtures/pragma_cache_size.testall?content_type=text/blah"
|
|
)
|
|
assert "text/blah" == response.headers["content-type"]
|
|
|
|
|
|
def test_hook_register_output_renderer_custom_headers(app_client):
|
|
response = app_client.get(
|
|
"/fixtures/pragma_cache_size.testall?header=x-wow:1&header=x-gosh:2"
|
|
)
|
|
assert "1" == response.headers["x-wow"]
|
|
assert "2" == response.headers["x-gosh"]
|
|
|
|
|
|
def test_hook_register_output_renderer_returning_response(app_client):
|
|
response = app_client.get("/fixtures/facetable.testresponse")
|
|
assert 200 == response.status
|
|
assert response.json == {"this_is": "json"}
|
|
|
|
|
|
def test_hook_register_output_renderer_returning_broken_value(app_client):
|
|
response = app_client.get("/fixtures/facetable.testresponse?_broken=1")
|
|
assert 500 == response.status
|
|
assert "this should break should be dict or Response" in response.text
|
|
|
|
|
|
def test_hook_register_output_renderer_can_render(app_client):
|
|
response = app_client.get("/fixtures/facetable?_no_can_render=1")
|
|
assert response.status == 200
|
|
links = (
|
|
Soup(response.body, "html.parser")
|
|
.find("p", {"class": "export-links"})
|
|
.findAll("a")
|
|
)
|
|
actual = [l["href"] for l in links]
|
|
# Should not be present because we sent ?_no_can_render=1
|
|
assert "/fixtures/facetable.testall?_labels=on" not in actual
|
|
# Check that it was passed the values we expected
|
|
assert hasattr(app_client.ds, "_can_render_saw")
|
|
assert {
|
|
"datasette": app_client.ds,
|
|
"columns": [
|
|
"pk",
|
|
"created",
|
|
"planet_int",
|
|
"on_earth",
|
|
"state",
|
|
"_city_id",
|
|
"_neighborhood",
|
|
"tags",
|
|
"complex_array",
|
|
"distinct_some_null",
|
|
],
|
|
"sql": "select pk, created, planet_int, on_earth, state, _city_id, _neighborhood, tags, complex_array, distinct_some_null from facetable order by pk limit 51",
|
|
"query_name": None,
|
|
"database": "fixtures",
|
|
"table": "facetable",
|
|
"view_name": "table",
|
|
}.items() <= app_client.ds._can_render_saw.items()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_hook_prepare_jinja2_environment(app_client):
|
|
template = app_client.ds.jinja_env.from_string(
|
|
"Hello there, {{ a|format_numeric }}", {"a": 3412341}
|
|
)
|
|
rendered = await app_client.ds.render_template(template)
|
|
assert "Hello there, 3,412,341" == rendered
|
|
|
|
|
|
def test_hook_publish_subcommand():
|
|
# This is hard to test properly, because publish subcommand plugins
|
|
# cannot be loaded using the --plugins-dir mechanism - they need
|
|
# to be installed using "pip install". So I'm cheating and taking
|
|
# advantage of the fact that cloudrun/heroku use the plugin hook
|
|
# to register themselves as default plugins.
|
|
assert ["cloudrun", "heroku"] == cli.publish.list_commands({})
|
|
|
|
|
|
def test_hook_register_facet_classes(app_client):
|
|
response = app_client.get(
|
|
"/fixtures/compound_three_primary_keys.json?_dummy_facet=1"
|
|
)
|
|
assert [
|
|
{
|
|
"name": "pk1",
|
|
"toggle_url": "http://localhost/fixtures/compound_three_primary_keys.json?_dummy_facet=1&_facet_dummy=pk1",
|
|
"type": "dummy",
|
|
},
|
|
{
|
|
"name": "pk2",
|
|
"toggle_url": "http://localhost/fixtures/compound_three_primary_keys.json?_dummy_facet=1&_facet_dummy=pk2",
|
|
"type": "dummy",
|
|
},
|
|
{
|
|
"name": "pk3",
|
|
"toggle_url": "http://localhost/fixtures/compound_three_primary_keys.json?_dummy_facet=1&_facet_dummy=pk3",
|
|
"type": "dummy",
|
|
},
|
|
{
|
|
"name": "content",
|
|
"toggle_url": "http://localhost/fixtures/compound_three_primary_keys.json?_dummy_facet=1&_facet_dummy=content",
|
|
"type": "dummy",
|
|
},
|
|
{
|
|
"name": "pk1",
|
|
"toggle_url": "http://localhost/fixtures/compound_three_primary_keys.json?_dummy_facet=1&_facet=pk1",
|
|
},
|
|
{
|
|
"name": "pk2",
|
|
"toggle_url": "http://localhost/fixtures/compound_three_primary_keys.json?_dummy_facet=1&_facet=pk2",
|
|
},
|
|
{
|
|
"name": "pk3",
|
|
"toggle_url": "http://localhost/fixtures/compound_three_primary_keys.json?_dummy_facet=1&_facet=pk3",
|
|
},
|
|
] == response.json["suggested_facets"]
|
|
|
|
|
|
def test_hook_actor_from_request(app_client):
|
|
app_client.get("/")
|
|
# Should have no actor
|
|
assert None == app_client.ds._last_request.scope["actor"]
|
|
app_client.get("/?_bot=1")
|
|
# Should have bot actor
|
|
assert {"id": "bot"} == app_client.ds._last_request.scope["actor"]
|
|
|
|
|
|
def test_hook_actor_from_request_async(app_client):
|
|
app_client.get("/")
|
|
# Should have no actor
|
|
assert None == app_client.ds._last_request.scope["actor"]
|
|
app_client.get("/?_bot2=1")
|
|
# Should have bot2 actor
|
|
assert {"id": "bot2", "1+1": 2} == app_client.ds._last_request.scope["actor"]
|
|
|
|
|
|
def test_existing_scope_actor_respected(app_client):
|
|
app_client.get("/?_actor_in_scope=1")
|
|
assert {"id": "from-scope"} == app_client.ds._last_request.scope["actor"]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.parametrize(
|
|
"action,expected",
|
|
[
|
|
("this_is_allowed", True),
|
|
("this_is_denied", False),
|
|
("this_is_allowed_async", True),
|
|
("this_is_denied_async", False),
|
|
("no_match", None),
|
|
],
|
|
)
|
|
async def test_hook_permission_allowed(app_client, action, expected):
|
|
actual = await app_client.ds.permission_allowed(
|
|
{"id": "actor"}, action, default=None
|
|
)
|
|
assert expected == actual
|
|
|
|
|
|
def test_actor_json(app_client):
|
|
assert {"actor": None} == app_client.get("/-/actor.json").json
|
|
assert {"actor": {"id": "bot2", "1+1": 2}} == app_client.get(
|
|
"/-/actor.json?_bot2=1"
|
|
).json
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"path,body",
|
|
[
|
|
("/one/", "2"),
|
|
("/two/Ray?greeting=Hail", "Hail Ray"),
|
|
("/not-async/", "This was not async"),
|
|
],
|
|
)
|
|
def test_hook_register_routes(app_client, path, body):
|
|
response = app_client.get(path)
|
|
assert 200 == response.status
|
|
assert body == response.text
|
|
|
|
|
|
@pytest.mark.parametrize("configured_path", ("path1", "path2"))
|
|
def test_hook_register_routes_with_datasette(configured_path):
|
|
with make_app_client(
|
|
metadata={
|
|
"plugins": {
|
|
"register-route-demo": {
|
|
"path": configured_path,
|
|
}
|
|
}
|
|
}
|
|
) as client:
|
|
response = client.get(f"/{configured_path}/")
|
|
assert response.status == 200
|
|
assert configured_path.upper() == response.text
|
|
# Other one should 404
|
|
other_path = [p for p in ("path1", "path2") if configured_path != p][0]
|
|
assert client.get(f"/{other_path}/", follow_redirects=True).status == 404
|
|
|
|
|
|
def test_hook_register_routes_override():
|
|
"Plugins can over-ride default paths such as /db/table"
|
|
with make_app_client(
|
|
metadata={
|
|
"plugins": {
|
|
"register-route-demo": {
|
|
"path": "blah",
|
|
}
|
|
}
|
|
}
|
|
) as client:
|
|
response = client.get("/db/table")
|
|
assert response.status == 200
|
|
assert (
|
|
response.text
|
|
== "/db/table: [('db_name', 'db'), ('table_and_format', 'table')]"
|
|
)
|
|
|
|
|
|
def test_hook_register_routes_post(app_client):
|
|
response = app_client.post("/post/", {"this is": "post data"}, csrftoken_from=True)
|
|
assert 200 == response.status
|
|
assert "csrftoken" in response.json
|
|
assert "post data" == response.json["this is"]
|
|
|
|
|
|
def test_hook_register_routes_csrftoken(restore_working_directory, tmpdir_factory):
|
|
templates = tmpdir_factory.mktemp("templates")
|
|
(templates / "csrftoken_form.html").write_text(
|
|
"CSRFTOKEN: {{ csrftoken() }}", "utf-8"
|
|
)
|
|
with make_app_client(template_dir=templates) as client:
|
|
response = client.get("/csrftoken-form/")
|
|
expected_token = client.ds._last_request.scope["csrftoken"]()
|
|
assert f"CSRFTOKEN: {expected_token}" == response.text
|
|
|
|
|
|
def test_hook_register_routes_asgi(app_client):
|
|
response = app_client.get("/three/")
|
|
assert {"hello": "world"} == response.json
|
|
assert "1" == response.headers["x-three"]
|
|
|
|
|
|
def test_hook_register_routes_add_message(app_client):
|
|
response = app_client.get("/add-message/")
|
|
assert 200 == response.status
|
|
assert "Added message" == response.text
|
|
decoded = app_client.ds.unsign(response.cookies["ds_messages"], "messages")
|
|
assert [["Hello from messages", 1]] == decoded
|
|
|
|
|
|
def test_hook_register_routes_render_message(restore_working_directory, tmpdir_factory):
|
|
templates = tmpdir_factory.mktemp("templates")
|
|
(templates / "render_message.html").write_text('{% extends "base.html" %}', "utf-8")
|
|
with make_app_client(template_dir=templates) as client:
|
|
response1 = client.get("/add-message/")
|
|
response2 = client.get("/render-message/", cookies=response1.cookies)
|
|
assert 200 == response2.status
|
|
assert "Hello from messages" in response2.text
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_hook_startup(app_client):
|
|
await app_client.ds.invoke_startup()
|
|
assert app_client.ds._startup_hook_fired
|
|
assert 2 == app_client.ds._startup_hook_calculation
|
|
|
|
|
|
def test_hook_canned_queries(app_client):
|
|
queries = app_client.get("/fixtures.json").json["queries"]
|
|
queries_by_name = {q["name"]: q for q in queries}
|
|
assert {
|
|
"sql": "select 2",
|
|
"name": "from_async_hook",
|
|
"private": False,
|
|
} == queries_by_name["from_async_hook"]
|
|
assert {
|
|
"sql": "select 1, 'null' as actor_id",
|
|
"name": "from_hook",
|
|
"private": False,
|
|
} == queries_by_name["from_hook"]
|
|
|
|
|
|
def test_hook_canned_queries_non_async(app_client):
|
|
response = app_client.get("/fixtures/from_hook.json?_shape=array")
|
|
assert [{"1": 1, "actor_id": "null"}] == response.json
|
|
|
|
|
|
def test_hook_canned_queries_async(app_client):
|
|
response = app_client.get("/fixtures/from_async_hook.json?_shape=array")
|
|
assert [{"2": 2}] == response.json
|
|
|
|
|
|
def test_hook_canned_queries_actor(app_client):
|
|
assert [{"1": 1, "actor_id": "bot"}] == app_client.get(
|
|
"/fixtures/from_hook.json?_bot=1&_shape=array"
|
|
).json
|
|
|
|
|
|
def test_hook_register_magic_parameters(restore_working_directory):
|
|
with make_app_client(
|
|
extra_databases={"data.db": "create table logs (line text)"},
|
|
metadata={
|
|
"databases": {
|
|
"data": {
|
|
"queries": {
|
|
"runme": {
|
|
"sql": "insert into logs (line) values (:_request_http_version)",
|
|
"write": True,
|
|
},
|
|
"get_uuid": {
|
|
"sql": "select :_uuid_new",
|
|
},
|
|
}
|
|
}
|
|
}
|
|
},
|
|
) as client:
|
|
response = client.post("/data/runme", {}, csrftoken_from=True)
|
|
assert 302 == response.status
|
|
actual = client.get("/data/logs.json?_sort_desc=rowid&_shape=array").json
|
|
assert [{"rowid": 1, "line": "1.1"}] == actual
|
|
# Now try the GET request against get_uuid
|
|
response_get = client.get("/data/get_uuid.json?_shape=array")
|
|
assert 200 == response_get.status
|
|
new_uuid = response_get.json[0][":_uuid_new"]
|
|
assert 4 == new_uuid.count("-")
|
|
|
|
|
|
def test_hook_forbidden(restore_working_directory):
|
|
with make_app_client(
|
|
extra_databases={"data2.db": "create table logs (line text)"},
|
|
metadata={"allow": {}},
|
|
) as client:
|
|
response = client.get("/")
|
|
assert 403 == response.status
|
|
response2 = client.get("/data2")
|
|
assert 302 == response2.status
|
|
assert "/login?message=view-database" == response2.headers["Location"]
|
|
assert "view-database" == client.ds._last_forbidden_message
|
|
|
|
|
|
def test_hook_menu_links(app_client):
|
|
def get_menu_links(html):
|
|
soup = Soup(html, "html.parser")
|
|
return [
|
|
{"label": a.text, "href": a["href"]} for a in soup.find("nav").select("a")
|
|
]
|
|
|
|
response = app_client.get("/")
|
|
assert get_menu_links(response.text) == []
|
|
|
|
response_2 = app_client.get("/?_bot=1&_hello=BOB")
|
|
assert get_menu_links(response_2.text) == [
|
|
{"label": "Hello, BOB", "href": "/"},
|
|
{"label": "Hello 2", "href": "/"},
|
|
]
|
|
|
|
|
|
@pytest.mark.parametrize("table_or_view", ["facetable", "simple_view"])
|
|
def test_hook_table_actions(app_client, table_or_view):
|
|
def get_table_actions_links(html):
|
|
soup = Soup(html, "html.parser")
|
|
details = soup.find("details", {"class": "actions-menu-links"})
|
|
if details is None:
|
|
return []
|
|
return [{"label": a.text, "href": a["href"]} for a in details.select("a")]
|
|
|
|
response = app_client.get(f"/fixtures/{table_or_view}")
|
|
assert get_table_actions_links(response.text) == []
|
|
|
|
response_2 = app_client.get(f"/fixtures/{table_or_view}?_bot=1&_hello=BOB")
|
|
assert sorted(
|
|
get_table_actions_links(response_2.text), key=lambda l: l["label"]
|
|
) == [
|
|
{"label": "Database: fixtures", "href": "/"},
|
|
{"label": "From async BOB", "href": "/"},
|
|
{"label": f"Table: {table_or_view}", "href": "/"},
|
|
]
|
|
|
|
|
|
def test_hook_database_actions(app_client):
|
|
def get_table_actions_links(html):
|
|
soup = Soup(html, "html.parser")
|
|
details = soup.find("details", {"class": "actions-menu-links"})
|
|
if details is None:
|
|
return []
|
|
return [{"label": a.text, "href": a["href"]} for a in details.select("a")]
|
|
|
|
response = app_client.get("/fixtures")
|
|
assert get_table_actions_links(response.text) == []
|
|
|
|
response_2 = app_client.get("/fixtures?_bot=1&_hello=BOB")
|
|
assert get_table_actions_links(response_2.text) == [
|
|
{"label": "Database: fixtures - BOB", "href": "/"},
|
|
]
|
|
|
|
|
|
def test_hook_skip_csrf(app_client):
|
|
cookie = app_client.actor_cookie({"id": "test"})
|
|
csrf_response = app_client.post(
|
|
"/post/",
|
|
post_data={"this is": "post data"},
|
|
csrftoken_from=True,
|
|
cookies={"ds_actor": cookie},
|
|
)
|
|
assert csrf_response.status == 200
|
|
missing_csrf_response = app_client.post(
|
|
"/post/", post_data={"this is": "post data"}, cookies={"ds_actor": cookie}
|
|
)
|
|
assert missing_csrf_response.status == 403
|
|
# But "/skip-csrf" should allow
|
|
allow_csrf_response = app_client.post(
|
|
"/skip-csrf", post_data={"this is": "post data"}, cookies={"ds_actor": cookie}
|
|
)
|
|
assert allow_csrf_response.status == 405 # Method not allowed
|
|
# /skip-csrf-2 should not
|
|
second_missing_csrf_response = app_client.post(
|
|
"/skip-csrf-2", post_data={"this is": "post data"}, cookies={"ds_actor": cookie}
|
|
)
|
|
assert second_missing_csrf_response.status == 403
|
|
|
|
|
|
def test_hook_get_metadata(app_client):
|
|
app_client.ds._metadata_local = {
|
|
"title": "Testing get_metadata hook!",
|
|
"databases": {"from-local": {"title": "Hello from local metadata"}},
|
|
}
|
|
og_pm_hook_get_metadata = pm.hook.get_metadata
|
|
|
|
def get_metadata_mock(*args, **kwargs):
|
|
return [
|
|
{
|
|
"databases": {
|
|
"from-hook": {"title": "Hello from the plugin hook"},
|
|
"from-local": {"title": "This will be overwritten!"},
|
|
}
|
|
}
|
|
]
|
|
|
|
pm.hook.get_metadata = get_metadata_mock
|
|
meta = app_client.ds.metadata()
|
|
assert "Testing get_metadata hook!" == meta["title"]
|
|
assert "Hello from local metadata" == meta["databases"]["from-local"]["title"]
|
|
assert "Hello from the plugin hook" == meta["databases"]["from-hook"]["title"]
|
|
pm.hook.get_metadata = og_pm_hook_get_metadata
|
|
|
|
|
|
def _extract_commands(output):
|
|
lines = output.split("Commands:\n", 1)[1].split("\n")
|
|
return {line.split()[0].replace("*", "") for line in lines if line.strip()}
|
|
|
|
|
|
def test_hook_register_commands():
|
|
# Without the plugin should have seven commands
|
|
runner = CliRunner()
|
|
result = runner.invoke(cli.cli, "--help")
|
|
commands = _extract_commands(result.output)
|
|
assert commands == {
|
|
"serve",
|
|
"inspect",
|
|
"install",
|
|
"package",
|
|
"plugins",
|
|
"publish",
|
|
"uninstall",
|
|
}
|
|
|
|
# Now install a plugin
|
|
class VerifyPlugin:
|
|
__name__ = "VerifyPlugin"
|
|
|
|
@hookimpl
|
|
def register_commands(self, cli):
|
|
@cli.command()
|
|
def verify():
|
|
pass
|
|
|
|
@cli.command()
|
|
def unverify():
|
|
pass
|
|
|
|
pm.register(VerifyPlugin(), name="verify")
|
|
importlib.reload(cli)
|
|
result2 = runner.invoke(cli.cli, "--help")
|
|
commands2 = _extract_commands(result2.output)
|
|
assert commands2 == {
|
|
"serve",
|
|
"inspect",
|
|
"install",
|
|
"package",
|
|
"plugins",
|
|
"publish",
|
|
"uninstall",
|
|
"verify",
|
|
"unverify",
|
|
}
|
|
pm.unregister(name="verify")
|
|
importlib.reload(cli)
|
|
|
|
|
|
def test_hook_filters_from_request(app_client):
|
|
class ReturnNothingPlugin:
|
|
__name__ = "ReturnNothingPlugin"
|
|
|
|
@hookimpl
|
|
def filters_from_request(self, request):
|
|
if request.args.get("_nothing"):
|
|
return FilterArguments(["1 = 0"], human_descriptions=["NOTHING"])
|
|
|
|
pm.register(ReturnNothingPlugin(), name="ReturnNothingPlugin")
|
|
response = app_client.get("/fixtures/facetable?_nothing=1")
|
|
assert "0 rows\n where NOTHING" in response.text
|
|
json_response = app_client.get("/fixtures/facetable.json?_nothing=1")
|
|
assert json_response.json["rows"] == []
|
|
pm.unregister(name="ReturnNothingPlugin")
|