2020-05-30 14:28:29 +00:00
|
|
|
"""
|
|
|
|
Tests for the datasette.app.Datasette class
|
|
|
|
"""
|
2022-05-02 20:15:27 +00:00
|
|
|
from datasette.app import Datasette, Database
|
2020-05-31 22:42:08 +00:00
|
|
|
from itsdangerous import BadSignature
|
2020-05-30 14:28:29 +00:00
|
|
|
from .fixtures import app_client
|
|
|
|
import pytest
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
def datasette(app_client):
    """Expose the ``ds`` attribute of the ``app_client`` fixture to tests."""
    ds = app_client.ds
    return ds
|
|
|
|
|
|
|
|
|
|
|
|
def test_get_database(datasette):
    """``get_database`` returns the named database and raises ``KeyError`` otherwise."""
    fixtures_db = datasette.get_database("fixtures")
    assert fixtures_db.name == "fixtures"

    # A name that was never registered must not resolve to anything.
    with pytest.raises(KeyError):
        datasette.get_database("missing")
|
|
|
|
|
|
|
|
|
|
|
|
def test_get_database_no_argument(datasette):
    """``get_database()`` called with no name resolves to ``fixtures`` for this client."""
    # With no name given, the first available database is expected back.
    default_db = datasette.get_database()
    assert default_db.name == "fixtures"
|
2020-05-31 22:42:08 +00:00
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.parametrize("value", ["hello", 123, {"key": "value"}])
@pytest.mark.parametrize("namespace", [None, "two"])
def test_sign_unsign(datasette, value, namespace):
    """Round-trip a value through ``sign``/``unsign`` and reject a tampered token.

    Runs for several value types, both without a namespace argument and
    with an explicit one. The namespace is passed positionally only when
    it is set, so the ``None`` case exercises the method's own default.
    """
    extra_args = [namespace] if namespace else []
    signed = datasette.sign(value, *extra_args)
    assert value != signed
    assert value == datasette.unsign(signed, *extra_args)

    # Corrupt only the final character, choosing a replacement that is
    # guaranteed to differ from the character that was there.
    tampered = signed[:-1] + ("!" if signed[-1] != "!" else ":")

    # Verify with the same namespace used for signing: otherwise the
    # namespaced case would fail on the namespace mismatch alone and the
    # tampering itself would never be what triggers BadSignature.
    with pytest.raises(BadSignature):
        datasette.unsign(tampered, *extra_args)
|
2020-11-24 22:06:32 +00:00
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.parametrize(
    "setting,expected",
    [
        ("base_url", "/"),
        ("max_csv_mb", 100),
        ("allow_csv_stream", True),
    ],
)
def test_datasette_setting(datasette, setting, expected):
    """``Datasette.setting(name)`` returns the expected value for each listed setting."""
    actual = datasette.setting(setting)
    assert actual == expected
|
2021-12-18 02:09:00 +00:00
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_datasette_constructor():
    """A ``Datasette()`` built with no arguments lists a single ``_memory`` database."""
    ds = Datasette()
    response = await ds.client.get("/-/databases.json")

    # The only entry expected from /-/databases.json for a bare instance.
    expected_memory_db = {
        "name": "_memory",
        "route": "_memory",
        "path": None,
        "size": 0,
        "is_mutable": True,
        "is_memory": True,
        "hash": None,
    }
    assert response.json() == [expected_memory_db]
|
2022-05-02 20:15:27 +00:00
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_num_sql_threads_zero():
    """With ``num_sql_threads`` set to 0, writes and table reads still succeed.

    Also checks that ``/-/threads.json`` reports zero threads.
    """
    ds = Datasette([], memory=True, settings={"num_sql_threads": 0})
    memory_db = Database(ds, memory_name="test_num_sql_threads_zero")
    db = ds.add_database(memory_db)

    # Create the table, then insert the single row read back below.
    for statement in (
        "create table t(id integer primary key)",
        "insert into t (id) values (1)",
    ):
        await db.execute_write(statement)

    threads_response = await ds.client.get("/-/threads.json")
    assert threads_response.json() == {"num_threads": 0, "threads": []}

    rows_response = await ds.client.get(
        "/test_num_sql_threads_zero/t.json?_shape=array"
    )
    assert rows_response.json() == [{"id": 1}]
|