kopia lustrzana https://github.com/simonw/datasette
576 wiersze
16 KiB
Python
576 wiersze
16 KiB
Python
"""
|
|
Tests for the datasette.database.Database class
|
|
"""
|
|
from datasette.database import Database, Results, MultipleValues
|
|
from datasette.utils.sqlite import sqlite3
|
|
from datasette.utils import Column
|
|
from .fixtures import app_client, app_client_two_attached_databases_crossdb_enabled
|
|
import pytest
|
|
import time
|
|
import uuid
|
|
|
|
|
|
@pytest.fixture
def db(app_client):
    """Provide the ``fixtures`` database registered on the test client's Datasette."""
    datasette = app_client.ds
    return datasette.get_database("fixtures")
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_execute(db):
    """db.execute() returns a Results object whose length is the row count."""
    facetable_rows = await db.execute("select * from facetable")
    assert isinstance(facetable_rows, Results)
    assert len(facetable_rows) == 15
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_results_first(db):
    """Results.first() is None for an empty result, otherwise a sqlite3.Row."""
    no_match = await db.execute("select * from facetable where pk > 100")
    assert no_match.first() is None

    everything = await db.execute("select * from facetable")
    first_row = everything.first()
    assert isinstance(first_row, sqlite3.Row)
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.parametrize("expected", (True, False))
async def test_results_bool(db, expected):
    """bool(Results) is True when rows were returned and False when none were."""
    if expected:
        where = ""
    else:
        # No facetable row matches pk = 0, so this yields an empty result.
        where = "where pk = 0"
    sql = f"select * from facetable {where}"
    outcome = await db.execute(sql)
    assert bool(outcome) is expected
|
|
|
|
|
|
@pytest.mark.parametrize(
    "query,expected",
    [
        ("select 1", 1),
        ("select 1, 2", None),
        ("select 1 as num union select 2 as num", None),
    ],
)
@pytest.mark.asyncio
async def test_results_single_value(db, query, expected):
    """Results.single_value() returns the lone value or raises MultipleValues.

    ``expected`` is the value single_value() should return; ``None`` marks a
    query with more than one column or row, which must raise MultipleValues.
    """
    results = await db.execute(query)
    # Compare against None explicitly so that a falsy expected value (such as
    # 0 or "") would still be checked as a value rather than as an error case.
    if expected is not None:
        assert expected == results.single_value()
    else:
        with pytest.raises(MultipleValues):
            results.single_value()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_execute_fn(db):
    """db.execute_fn() calls the function with a connection and returns its result."""

    def add_ones(conn):
        cursor = conn.execute("select 1 + 1")
        return cursor.fetchall()[0][0]

    outcome = await db.execute_fn(add_ones)
    assert 2 == outcome
|
|
|
|
|
|
@pytest.mark.parametrize(
    "tables,exists",
    (
        (["facetable", "searchable", "tags", "searchable_tags"], True),
        (["foo", "bar", "baz"], False),
    ),
)
@pytest.mark.asyncio
async def test_table_exists(db, tables, exists):
    """db.table_exists() gives the expected answer for every listed table name."""
    for table_name in tables:
        found = await db.table_exists(table_name)
        assert exists == found
|
|
|
|
|
|
@pytest.mark.parametrize(
    "view,expected",
    (
        ("not_a_view", False),
        ("paginated_view", True),
    ),
)
@pytest.mark.asyncio
async def test_view_exists(db, view, expected):
    """db.view_exists() gives the expected answer for each view name."""
    found = await db.view_exists(view)
    assert found == expected
|
|
|
|
|
|
@pytest.mark.parametrize(
    "table,expected",
    (
        (
            "facetable",
            [
                "pk",
                "created",
                "planet_int",
                "on_earth",
                "state",
                "_city_id",
                "_neighborhood",
                "tags",
                "complex_array",
                "distinct_some_null",
                "n",
            ],
        ),
        (
            "sortable",
            [
                "pk1",
                "pk2",
                "content",
                "sortable",
                "sortable_with_nulls",
                "sortable_with_nulls_2",
                "text",
            ],
        ),
    ),
)
@pytest.mark.asyncio
async def test_table_columns(db, table, expected):
    """db.table_columns() returns the column names in the expected order."""
    actual_names = await db.table_columns(table)
    assert actual_names == expected
|
|
|
|
|
|
def _expected_column(cid, name, type_, is_pk=0):
    """Build an expected Column with notnull=0, default_value=None and hidden=0."""
    return Column(
        cid=cid,
        name=name,
        type=type_,
        notnull=0,
        default_value=None,
        is_pk=is_pk,
        hidden=0,
    )


@pytest.mark.parametrize(
    "table,expected",
    (
        (
            "facetable",
            [
                _expected_column(0, "pk", "integer", is_pk=1),
                _expected_column(1, "created", "text"),
                _expected_column(2, "planet_int", "integer"),
                _expected_column(3, "on_earth", "integer"),
                _expected_column(4, "state", "text"),
                _expected_column(5, "_city_id", "integer"),
                _expected_column(6, "_neighborhood", "text"),
                _expected_column(7, "tags", "text"),
                _expected_column(8, "complex_array", "text"),
                _expected_column(9, "distinct_some_null", ""),
                _expected_column(10, "n", "text"),
            ],
        ),
        (
            "sortable",
            [
                _expected_column(0, "pk1", "varchar(30)", is_pk=1),
                _expected_column(1, "pk2", "varchar(30)", is_pk=2),
                _expected_column(2, "content", "text"),
                _expected_column(3, "sortable", "integer"),
                _expected_column(4, "sortable_with_nulls", "real"),
                _expected_column(5, "sortable_with_nulls_2", "real"),
                _expected_column(6, "text", "text"),
            ],
        ),
    ),
)
@pytest.mark.asyncio
async def test_table_column_details(db, table, expected):
    """db.table_column_details() returns the expected Column for each column."""
    details = await db.table_column_details(table)
    # Convert "type" to lowercase before comparison
    # https://github.com/simonw/datasette/issues/1647
    normalized = []
    for detail in details:
        normalized.append(
            Column(
                cid=detail.cid,
                name=detail.name,
                type=detail.type.lower(),
                notnull=detail.notnull,
                default_value=detail.default_value,
                is_pk=detail.is_pk,
                hidden=detail.hidden,
            )
        )
    assert normalized == expected
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_all_foreign_keys(db):
    """db.get_all_foreign_keys() maps tables to their incoming/outgoing keys."""

    def link(other_table, column, other_column):
        # One foreign key entry in the shape get_all_foreign_keys() reports.
        return {
            "other_table": other_table,
            "column": column,
            "other_column": other_column,
        }

    foreign_keys = await db.get_all_foreign_keys()

    expected_characteristics_join = {
        "incoming": [],
        "outgoing": [
            link("attraction_characteristic", "characteristic_id", "pk"),
            link("roadside_attractions", "attraction_id", "pk"),
        ],
    }
    assert (
        foreign_keys["roadside_attraction_characteristics"]
        == expected_characteristics_join
    )

    expected_characteristic = {
        "incoming": [
            link("roadside_attraction_characteristics", "pk", "characteristic_id"),
        ],
        "outgoing": [],
    }
    assert foreign_keys["attraction_characteristic"] == expected_characteristic

    # No incoming because these are compound foreign keys, which we currently ignore
    expected_compound = {"incoming": [], "outgoing": []}
    assert foreign_keys["compound_primary_key"] == expected_compound

    expected_references = {
        "incoming": [],
        "outgoing": [
            link("primary_key_multiple_columns", "foreign_key_with_no_label", "id"),
            link("simple_primary_key", "foreign_key_with_blank_label", "id"),
            link("simple_primary_key", "foreign_key_with_label", "id"),
        ],
    }
    assert foreign_keys["foreign_key_references"] == expected_references
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_table_names(db):
    """db.table_names() lists every table in the fixtures database, in order."""
    expected_names = [
        "simple_primary_key",
        "primary_key_multiple_columns",
        "primary_key_multiple_columns_explicit_label",
        "compound_primary_key",
        "compound_three_primary_keys",
        "foreign_key_references",
        "sortable",
        "no_primary_key",
        "123_starts_with_digits",
        "Table With Space In Name",
        "table/with/slashes.csv",
        "complex_foreign_keys",
        "custom_foreign_key_label",
        "units",
        "tags",
        "searchable",
        "searchable_tags",
        "searchable_fts",
        "searchable_fts_segments",
        "searchable_fts_segdir",
        "searchable_fts_docsize",
        "searchable_fts_stat",
        "select",
        "infinity",
        "facet_cities",
        "facetable",
        "binary_data",
        "roadside_attractions",
        "attraction_characteristic",
        "roadside_attraction_characteristics",
    ]
    actual_names = await db.table_names()
    assert actual_names == expected_names
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_view_names(db):
    """db.view_names() lists every view in the fixtures database, in order."""
    expected_views = [
        "paginated_view",
        "simple_view",
        "searchable_view",
        "searchable_view_configured_by_metadata",
    ]
    actual_views = await db.view_names()
    assert actual_views == expected_views
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_execute_write_block_true(db):
    """An awaited execute_write() update is visible to the next read."""
    update_sql = "update roadside_attractions set name = ? where pk = ?"
    await db.execute_write(update_sql, ["Mystery!", 1])
    result = await db.execute("select name from roadside_attractions where pk = 1")
    first_row = result.rows[0]
    assert "Mystery!" == first_row[0]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_execute_write_block_false(db):
    """A non-blocking execute_write() update is applied shortly afterwards.

    The write is queued with ``block=False`` so the call returns without
    waiting for it; the test then pauses briefly before reading the row back.
    """
    await db.execute_write(
        "update roadside_attractions set name = ? where pk = ?",
        ["Mystery!", 1],
        # Without this the call waits for the write to finish and the test
        # would merely duplicate test_execute_write_block_true.
        block=False,
    )
    # Give the queued write a moment to be applied before reading.
    time.sleep(0.1)
    rows = await db.execute("select name from roadside_attractions where pk = 1")
    assert "Mystery!" == rows.rows[0][0]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_execute_write_script(db):
    """execute_write_script() runs every statement in a multi-statement script."""
    script = "create table foo (id integer primary key); create table bar (id integer primary key);"
    await db.execute_write_script(script)
    names_after = await db.table_names()
    assert {"foo", "bar"} <= set(names_after)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_execute_write_many(db):
    """execute_write_many() inserts one row per parameter tuple."""
    await db.execute_write_script("create table foomany (id integer primary key)")
    id_params = [(1,), (10,), (100,)]
    await db.execute_write_many("insert into foomany (id) values (?)", id_params)
    stored = await db.execute("select * from foomany")
    stored_ids = [row[0] for row in stored.rows]
    assert stored_ids == [1, 10, 100]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_execute_write_has_correctly_prepared_connection(db):
    """The write connection can call the custom sleep() SQL function."""
    # The sleep() function is only available if ds._prepare_connection() was called
    sleep_sql = "select sleep(0.01)"
    await db.execute_write(sleep_sql)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_execute_write_fn_block_false(db):
    """execute_write_fn(..., block=False) returns a UUID task identifier."""

    def delete_then_count(conn):
        with conn:
            conn.execute("delete from roadside_attractions where pk = 1;")
            remaining = conn.execute(
                "select count(*) from roadside_attractions"
            ).fetchone()
        return remaining[0]

    queued_id = await db.execute_write_fn(delete_then_count, block=False)
    assert isinstance(queued_id, uuid.UUID)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_execute_write_fn_block_true(db):
    """An awaited execute_write_fn() returns the value the function returned."""

    def delete_then_count(conn):
        with conn:
            conn.execute("delete from roadside_attractions where pk = 1;")
            remaining = conn.execute(
                "select count(*) from roadside_attractions"
            ).fetchone()
        return remaining[0]

    rows_left = await db.execute_write_fn(delete_then_count)
    assert 3 == rows_left
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_execute_write_fn_exception(db):
    """An exception raised inside the write function reaches the awaiting caller."""

    def always_fails(conn):
        assert False

    with pytest.raises(AssertionError):
        await db.execute_write_fn(always_fails)
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.timeout(1)
async def test_execute_write_fn_connection_exception(tmpdir, app_client):
    """A failing write function on an is_mutable=False database raises promptly.

    The one-second timeout means a call that hangs instead of raising fails
    the test rather than blocking the suite.
    """
    path = str(tmpdir / "immutable.db")
    # Create the database file on disk, then release the handle straight away
    # so no open connection to it outlives this setup step.
    setup_conn = sqlite3.connect(path)
    try:
        setup_conn.execute("vacuum")
    finally:
        setup_conn.close()

    db = Database(app_client.ds, path=path, is_mutable=False)
    app_client.ds.add_database(db, name="immutable-db")

    def write_fn(conn):
        assert False

    try:
        with pytest.raises(AssertionError):
            await db.execute_write_fn(write_fn)
    finally:
        # Always unregister the database, even if the assertion above fails,
        # so it does not stay registered on the app_client fixture.
        app_client.ds.remove_database("immutable-db")
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_mtime_ns(db):
    """The fixtures database reports its mtime_ns as an integer."""
    modified_ns = db.mtime_ns
    assert isinstance(modified_ns, int)
|
|
|
|
|
|
def test_mtime_ns_is_none_for_memory(app_client):
    """A database created with is_memory=True has no mtime_ns."""
    in_memory = Database(app_client.ds, is_memory=True)
    assert in_memory.is_memory is True
    assert in_memory.mtime_ns is None
|
|
|
|
|
|
def test_is_mutable(app_client):
    """is_mutable defaults to True and otherwise mirrors the constructor argument."""
    ds = app_client.ds
    default_db = Database(ds, is_memory=True)
    assert default_db.is_mutable is True

    mutable_db = Database(ds, is_memory=True, is_mutable=True)
    assert mutable_db.is_mutable is True

    immutable_db = Database(ds, is_memory=True, is_mutable=False)
    assert immutable_db.is_mutable is False
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_attached_databases(app_client_two_attached_databases_crossdb_enabled):
    """The _memory database reports both other databases as attached."""
    datasette = app_client_two_attached_databases_crossdb_enabled.ds
    memory_db = datasette.get_database("_memory")
    attachments = await memory_db.attached_databases()
    attached_names = {entry.name for entry in attachments}
    assert attached_names == {"extra database", "fixtures"}
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_database_memory_name(app_client):
    """Databases that share a memory_name see the same tables; other names do not."""
    ds = app_client.ds
    foo_a = ds.add_database(Database(ds, memory_name="foo"))
    foo_b = ds.add_memory_database("foo")
    bar_a = ds.add_database(Database(ds, memory_name="bar"))
    bar_b = ds.add_memory_database("bar")

    # All four start out empty.
    for database in (foo_a, foo_b, bar_a, bar_b):
        assert await database.table_names() == []

    # Now create a table in foo
    await foo_a.execute_write("create table foo (t text)")

    for foo_db in (foo_a, foo_b):
        assert await foo_db.table_names() == ["foo"]
    for bar_db in (bar_a, bar_b):
        assert await bar_db.table_names() == []
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_in_memory_databases_forbid_writes(app_client):
    """db.execute() rejects a create table statement; db.execute_write() accepts it."""
    ds = app_client.ds
    memory_db = ds.add_database(Database(ds, memory_name="test"))
    create_sql = "create table foo (t text)"

    with pytest.raises(sqlite3.OperationalError):
        await memory_db.execute(create_sql)
    names_after_rejection = await memory_db.table_names()
    assert names_after_rejection == []

    # Using db.execute_write() should work:
    await memory_db.execute_write(create_sql)
    names_after_write = await memory_db.table_names()
    assert names_after_write == ["foo"]
|