# Mirrored from https://github.com/simonw/datasette (tests/test_cli.py)
from .fixtures import (
|
|
app_client,
|
|
make_app_client,
|
|
TestClient as _TestClient,
|
|
EXPECTED_PLUGINS,
|
|
)
|
|
from datasette.app import SETTINGS
|
|
from datasette.plugins import DEFAULT_PLUGINS
|
|
from datasette.cli import cli, serve
|
|
from datasette.version import __version__
|
|
from datasette.utils import tilde_encode
|
|
from datasette.utils.sqlite import sqlite3
|
|
from click.testing import CliRunner
|
|
import io
|
|
import json
|
|
import pathlib
|
|
import pytest
|
|
import sys
|
|
import textwrap
|
|
from unittest import mock
|
|
|
|
|
|
def test_inspect_cli(app_client):
    """`datasette inspect fixtures.db` emits JSON with file, hash and table counts."""
    result = CliRunner().invoke(cli, ["inspect", "fixtures.db"])
    inspected = json.loads(result.output)
    assert list(inspected.keys()) == ["fixtures"]
    db_info = inspected["fixtures"]
    assert db_info["file"] == "fixtures.db"
    # Hash is a SHA-256 hex digest of the database file
    assert isinstance(db_info["hash"], str)
    assert len(db_info["hash"]) == 64
    expected_counts = {"Table With Space In Name": 0, "facetable": 15}
    for name, count in expected_counts.items():
        assert db_info["tables"][name]["count"] == count
|
|
|
|
|
|
def test_inspect_cli_writes_to_file(app_client):
    """`inspect --inspect-file foo.json` writes the inspect JSON to that path."""
    result = CliRunner().invoke(
        cli, ["inspect", "fixtures.db", "--inspect-file", "foo.json"]
    )
    assert result.exit_code == 0, result.output
    data = json.loads(pathlib.Path("foo.json").read_text())
    assert list(data) == ["fixtures"]
|
|
|
|
|
|
def test_serve_with_inspect_file_prepopulates_table_counts_cache():
    """Counts supplied via inspect data should seed the table-counts cache."""
    counts = {"fixtures": {"tables": {"hithere": {"count": 44}}}}
    with make_app_client(inspect_data=counts, is_immutable=True) as client:
        assert client.ds.inspect_data == counts
        fixtures_db = client.ds.databases["fixtures"]
        assert fixtures_db.cached_table_counts == {"hithere": 44}
|
|
|
|
|
|
@pytest.mark.parametrize(
    "spatialite_paths,should_suggest_load_extension",
    (
        ([], False),
        (["/tmp"], True),
    ),
)
def test_spatialite_error_if_attempt_to_open_spatialite(
    spatialite_paths, should_suggest_load_extension
):
    """Serving a SpatiaLite DB without the extension loaded fails with a hint.

    The --load-extension suggestion appears only when SpatiaLite looks
    discoverable on this system (SPATIALITE_PATHS is non-empty).
    """
    with mock.patch("datasette.utils.SPATIALITE_PATHS", spatialite_paths):
        db_file = str(pathlib.Path(__file__).parent / "spatialite.db")
        result = CliRunner().invoke(cli, ["serve", db_file])
    assert result.exit_code != 0
    assert "It looks like you're trying to load a SpatiaLite" in result.output
    hint = "--load-extension=spatialite"
    if should_suggest_load_extension:
        assert hint in result.output
    else:
        assert hint not in result.output
|
|
|
|
|
|
@mock.patch("datasette.utils.SPATIALITE_PATHS", ["/does/not/exist"])
def test_spatialite_error_if_cannot_find_load_extension_spatialite():
    """`--load-extension spatialite` fails cleanly when no extension exists."""
    args = [
        "serve",
        str(pathlib.Path(__file__).parent / "spatialite.db"),
        "--load-extension",
        "spatialite",
    ]
    result = CliRunner().invoke(cli, args)
    assert result.exit_code != 0
    assert "Could not find SpatiaLite extension" in result.output
|
|
|
|
|
|
def test_plugins_cli(app_client):
    """`datasette plugins` lists installed plugins; --all adds the defaults."""
    runner = CliRunner()
    listed = json.loads(runner.invoke(cli, ["plugins"]).output)
    # TrackEventPlugin is registered by other tests and not part of the fixture set
    without_tracker = [p for p in listed if p["name"] != "TrackEventPlugin"]
    assert sorted(without_tracker, key=lambda p: p["name"]) == EXPECTED_PLUGINS
    # Try with --all
    all_output = runner.invoke(cli, ["plugins", "--all"]).output
    all_names = {p["name"] for p in json.loads(all_output)}
    # Should have all the EXPECTED_PLUGINS
    assert all_names >= {p["name"] for p in EXPECTED_PLUGINS}
    # And the following too:
    assert all_names >= set(DEFAULT_PLUGINS)
    # --requirements should be empty because there are no installed non-plugins-dir plugins
    requirements = runner.invoke(cli, ["plugins", "--requirements"])
    assert requirements.output == ""
|
|
|
|
|
|
def test_metadata_yaml():
    """serve.callback(return_instance=True) should honour a YAML metadata file."""
    yaml_file = io.StringIO(
        textwrap.dedent(
            """
    title: Hello from YAML
    """
        )
    )
    # serve.callback takes no defaults, so every option must be spelled out;
    # gather them in one dict to keep the call readable.
    options = dict(
        immutable=[],
        host="127.0.0.1",
        port=8001,
        uds=None,
        reload=False,
        cors=False,
        sqlite_extensions=[],
        inspect_file=None,
        template_dir=None,
        plugins_dir=None,
        static=[],
        memory=False,
        config=[],
        settings=[],
        secret=None,
        root=False,
        token=None,
        actor=None,
        version_note=None,
        get=None,
        help_settings=False,
        pdb=False,
        crossdb=False,
        nolock=False,
        open_browser=False,
        create=False,
        ssl_keyfile=None,
        ssl_certfile=None,
        internal=None,
    )
    ds = serve.callback([], metadata=yaml_file, return_instance=True, **options)
    client = _TestClient(ds)
    response = client.get("/.json")
    assert response.json["metadata"] == {"title": "Hello from YAML"}
|
|
|
|
|
|
@mock.patch("datasette.cli.run_module")
def test_install(run_module):
    """`datasette install` delegates to pip via run_module with sys.argv set."""
    CliRunner().invoke(
        cli, ["install", "datasette-mock-plugin", "datasette-mock-plugin2"]
    )
    run_module.assert_called_once_with("pip", run_name="__main__")
    expected_argv = [
        "pip",
        "install",
        "datasette-mock-plugin",
        "datasette-mock-plugin2",
    ]
    assert sys.argv == expected_argv
|
|
|
|
|
|
@pytest.mark.parametrize("flag", ["-U", "--upgrade"])
@mock.patch("datasette.cli.run_module")
def test_install_upgrade(run_module, flag):
    """Both -U and --upgrade should map to `pip install --upgrade`."""
    CliRunner().invoke(cli, ["install", flag, "datasette"])
    run_module.assert_called_once_with("pip", run_name="__main__")
    assert sys.argv == ["pip", "install", "--upgrade", "datasette"]
|
|
|
|
|
|
@mock.patch("datasette.cli.run_module")
def test_install_requirements(run_module, tmpdir):
    """`install -r file` passes the requirements file straight through to pip."""
    requirements = tmpdir.join("requirements.txt")
    requirements.write("datasette-mock-plugin\ndatasette-plugin-2")
    CliRunner().invoke(cli, ["install", "-r", str(requirements)])
    run_module.assert_called_once_with("pip", run_name="__main__")
    assert sys.argv == ["pip", "install", "-r", str(requirements)]
|
|
|
|
|
|
def test_install_error_if_no_packages():
    """`datasette install` with no arguments is a usage error (exit code 2)."""
    outcome = CliRunner().invoke(cli, ["install"])
    assert outcome.exit_code == 2
    assert "Error: Please specify at least one package to install" in outcome.output
|
|
|
|
|
|
@mock.patch("datasette.cli.run_module")
def test_uninstall(run_module):
    """`datasette uninstall -y` delegates to `pip uninstall` with -y forwarded."""
    CliRunner().invoke(cli, ["uninstall", "datasette-mock-plugin", "-y"])
    run_module.assert_called_once_with("pip", run_name="__main__")
    assert sys.argv == ["pip", "uninstall", "datasette-mock-plugin", "-y"]
|
|
|
|
|
|
def test_version():
    """--version prints the package version and exits."""
    outcome = CliRunner().invoke(cli, ["--version"])
    assert outcome.output == f"cli, version {__version__}\n"
|
|
|
|
|
|
@pytest.mark.parametrize("invalid_port", ["-1", "0.5", "dog", "65536"])
def test_serve_invalid_ports(invalid_port):
    """Ports outside 1-65535 (or non-numeric) are rejected as a usage error."""
    runner = CliRunner(mix_stderr=False)
    outcome = runner.invoke(cli, ["--port", invalid_port])
    assert outcome.exit_code == 2
    assert "Invalid value for '-p'" in outcome.stderr
|
|
|
|
|
|
@pytest.mark.parametrize(
    "args",
    (
        ["--setting", "default_page_size", "5"],
        ["--setting", "settings.default_page_size", "5"],
        ["-s", "settings.default_page_size", "5"],
    ),
)
def test_setting(args):
    """--setting/-s accept both bare and settings.-prefixed setting names."""
    outcome = CliRunner().invoke(cli, ["--get", "/-/settings.json"] + args)
    assert outcome.exit_code == 0, outcome.output
    assert json.loads(outcome.output)["default_page_size"] == 5
|
|
|
|
|
|
def test_plugin_s_overwrite():
    """-s plugins.name-of-plugin should override that plugin's configuration."""
    runner = CliRunner()
    plugins_dir = str(pathlib.Path(__file__).parent / "plugins")

    def prepare_connection_args(extra_args):
        # Run datasette with the test plugins dir and return the value the
        # plugin's prepare_connection hook observed.
        result = runner.invoke(
            cli,
            [
                "--plugins-dir",
                plugins_dir,
                "--get",
                "/_memory.json?sql=select+prepare_connection_args()",
            ]
            + extra_args,
        )
        assert result.exit_code == 0, result.output
        return json.loads(result.output).get("rows")[0].get(
            "prepare_connection_args()"
        )

    # Without -s the plugin sees no configuration
    assert prepare_connection_args([]) == (
        'database=_memory, datasette.plugin_config("name-of-plugin")=None'
    )
    # With -s the configuration is overridden
    assert prepare_connection_args(
        ["-s", "plugins.name-of-plugin", "OVERRIDE"]
    ) == 'database=_memory, datasette.plugin_config("name-of-plugin")=OVERRIDE'
|
|
|
|
|
|
def test_setting_type_validation():
    """A non-integer value for an integer setting is rejected with a clear error."""
    runner = CliRunner(mix_stderr=False)
    outcome = runner.invoke(cli, ["--setting", "default_page_size", "dog"])
    assert outcome.exit_code == 2
    assert '"settings.default_page_size" should be an integer' in outcome.stderr
|
|
|
|
|
|
@pytest.mark.parametrize("default_allow_sql", (True, False))
def test_setting_default_allow_sql(default_allow_sql):
    """default_allow_sql=off should block arbitrary SQL with a Forbidden error."""
    setting_value = "on" if default_allow_sql else "off"
    outcome = CliRunner().invoke(
        cli,
        [
            "--setting",
            "default_allow_sql",
            setting_value,
            "--get",
            "/_memory.json?sql=select+21&_shape=objects",
        ],
    )
    if default_allow_sql:
        assert outcome.exit_code == 0, outcome.output
        assert json.loads(outcome.output)["rows"][0] == {"21": 21}
    else:
        assert outcome.exit_code == 1, outcome.output
        # This isn't JSON at the moment, maybe it should be though
        assert "Forbidden" in outcome.output
|
|
|
|
|
|
def test_sql_errors_logged_to_stderr():
    """SQL errors hit during --get should be reported on stderr, not stdout."""
    runner = CliRunner(mix_stderr=False)
    outcome = runner.invoke(cli, ["--get", "/_memory.json?sql=select+blah"])
    assert outcome.exit_code == 1
    assert "sql = 'select blah', params = {}: no such column: blah\n" in outcome.stderr
|
|
|
|
|
|
def test_serve_create(tmpdir):
    """--create makes a missing database file and serves it as mutable."""
    new_db = tmpdir / "does_not_exist_yet.db"
    assert not new_db.exists()
    outcome = CliRunner().invoke(
        cli, [str(new_db), "--create", "--get", "/-/databases.json"]
    )
    assert outcome.exit_code == 0, outcome.output
    first_db = json.loads(outcome.output)[0]
    expected_subset = {
        "name": "does_not_exist_yet",
        "is_mutable": True,
        "is_memory": False,
        "hash": None,
    }
    assert expected_subset.items() <= first_db.items()
    assert new_db.exists()
|
|
|
|
|
|
@pytest.mark.parametrize("argument", ("-c", "--config"))
@pytest.mark.parametrize("format_", ("json", "yaml"))
def test_serve_config(tmpdir, argument, format_):
    """-c/--config accept both JSON and YAML configuration files."""
    config_path = tmpdir / "datasette.{}".format(format_)
    if format_ == "yaml":
        body = "settings:\n  default_page_size: 5\n"
    else:
        body = '{"settings": {"default_page_size": 5}}'
    config_path.write_text(body, "utf-8")
    outcome = CliRunner().invoke(
        cli,
        [
            argument,
            str(config_path),
            "--get",
            "/-/settings.json",
        ],
    )
    assert outcome.exit_code == 0, outcome.output
    assert json.loads(outcome.output)["default_page_size"] == 5
|
|
|
|
|
|
def test_serve_duplicate_database_names(tmpdir):
    "'datasette db.db nested/db.db' should attach two databases, /db and /db_2"
    (tmpdir / "nested").mkdir()
    first_path = str(tmpdir / "db.db")
    second_path = str(tmpdir / "nested" / "db.db")
    # Create two empty SQLite files that share a filename
    for db_file in (first_path, second_path):
        sqlite3.connect(db_file).execute("vacuum")
    outcome = CliRunner().invoke(
        cli, [first_path, second_path, "--get", "/-/databases.json"]
    )
    assert outcome.exit_code == 0, outcome.output
    attached = json.loads(outcome.output)
    assert {db["name"] for db in attached} == {"db", "db_2"}
|
|
|
|
|
|
def test_serve_deduplicate_same_database_path(tmpdir):
    "'datasette db.db db.db' should only attach one database, /db"
    db_file = str(tmpdir / "db.db")
    sqlite3.connect(db_file).execute("vacuum")
    # Pass the identical path twice; only one attachment should result
    outcome = CliRunner().invoke(cli, [db_file, db_file, "--get", "/-/databases.json"])
    assert outcome.exit_code == 0, outcome.output
    attached = json.loads(outcome.output)
    assert {db["name"] for db in attached} == {"db"}
|
|
|
|
|
|
@pytest.mark.parametrize(
    "filename", ["test-database (1).sqlite", "database (1).sqlite"]
)
def test_weird_database_names(tmpdir, filename):
    """Filenames with spaces/parens should be tilde-encoded in links and URLs.

    Regression test for https://github.com/simonw/datasette/issues/1181
    """
    db_file = str(tmpdir / filename)
    sqlite3.connect(db_file).execute("vacuum")
    runner = CliRunner()
    index_result = runner.invoke(cli, [db_file, "--get", "/"])
    assert index_result.exit_code == 0, index_result.output
    db_name = filename.rsplit(".", 1)[0]
    expected_link = '<a href="/{}">{}</a>'.format(tilde_encode(db_name), db_name)
    assert expected_link in index_result.output
    # Now try hitting that database page
    page_result = runner.invoke(
        cli, [db_file, "--get", "/{}".format(tilde_encode(db_name))]
    )
    assert page_result.exit_code == 0, page_result.output
|
|
|
|
|
|
def test_help_settings():
    """--help-settings should mention every known setting by name."""
    outcome = CliRunner().invoke(cli, ["--help-settings"])
    missing = [s.name for s in SETTINGS if s.name not in outcome.output]
    assert not missing
|
|
|
|
|
|
def test_internal_db(tmpdir):
    """--internal should create the internal database file on startup."""
    internal_db = tmpdir / "internal.db"
    assert not internal_db.exists()
    outcome = CliRunner().invoke(
        cli, ["--memory", "--internal", str(internal_db), "--get", "/"]
    )
    assert outcome.exit_code == 0
    assert internal_db.exists()
|