# datasette/tests/test_internal_db.py

import pytest
from unittest.mock import patch
from datasette.app import Datasette
from datasette.database import Database
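

# These tests cover Datasette's private /_internal database, which records the
# schemas of attached databases and is only visible to the root actor.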
@pytest.mark.asyncio
async def test_internal_only_available_to_root(ds_client):
    cookie = ds_client.actor_cookie({"id": "root"})
    assert (await ds_client.get("/_internal")).status_code == 403
    assert (
        await ds_client.get("/_internal", cookies={"ds_actor": cookie})
    ).status_code == 200
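

# /_internal/databases.json should list exactly two databases: _internal itself
# (in-memory, no path) and the fixtures test database.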
@pytest.mark.asyncio
async def test_internal_databases(ds_client):
    cookie = ds_client.actor_cookie({"id": "root"})
    databases = (
        await ds_client.get(
            "/_internal/databases.json?_shape=array", cookies={"ds_actor": cookie}
        )
    ).json()
    assert len(databases) == 2
    internal, fixtures = databases
    assert internal["database_name"] == "_internal"
    assert internal["is_memory"] == 1
    assert internal["path"] is None
    assert isinstance(internal["schema_version"], int)
    assert fixtures["database_name"] == "fixtures"
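

# Each row in /_internal/tables.json carries the owning database plus the
# table's name, rootpage and SQL definition.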
@pytest.mark.asyncio
async def test_internal_tables(ds_client):
    cookie = ds_client.actor_cookie({"id": "root"})
    tables = (
        await ds_client.get(
            "/_internal/tables.json?_shape=array", cookies={"ds_actor": cookie}
        )
    ).json()
    assert len(tables) > 5
    table = tables[0]
    assert set(table.keys()) == {"rootpage", "table_name", "database_name", "sql"}
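

# /_internal/indexes.json rows expose the columns of SQLite's
# "PRAGMA index_list()" plus the owning database and table.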
@pytest.mark.asyncio
async def test_internal_indexes(ds_client):
    cookie = ds_client.actor_cookie({"id": "root"})
    indexes = (
        await ds_client.get(
            "/_internal/indexes.json?_shape=array", cookies={"ds_actor": cookie}
        )
    ).json()
    assert len(indexes) > 5
    index = indexes[0]
    assert set(index.keys()) == {
        "partial",
        "name",
        "table_name",
        "unique",
        "seq",
        "database_name",
        "origin",
    }
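

# /_internal/foreign_keys.json rows expose the columns of SQLite's
# "PRAGMA foreign_key_list()" plus the owning database and table.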
@pytest.mark.asyncio
async def test_internal_foreign_keys(ds_client):
    cookie = ds_client.actor_cookie({"id": "root"})
    foreign_keys = (
        await ds_client.get(
            "/_internal/foreign_keys.json?_shape=array", cookies={"ds_actor": cookie}
        )
    ).json()
    assert len(foreign_keys) > 5
    foreign_key = foreign_keys[0]
    assert set(foreign_key.keys()) == {
        "table",
        "seq",
        "on_update",
        "on_delete",
        "to",
        "id",
        "match",
        "database_name",
        "table_name",
        "from",
    }
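

# Parametrized to run once with a working Database.schema_version() and once
# with it patched to return None, so both code paths are exercised.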
@pytest.mark.asyncio
@pytest.mark.parametrize("schema_version_returns_none", (True, False))
async def test_detects_schema_changes(schema_version_returns_none):
    ds = Datasette()
    db_name = "test_detects_schema_changes_{}".format(schema_version_returns_none)
    db = ds.add_memory_database(db_name)
    # Test if Datasette correctly detects schema changes, whether or not
    # the schema_version method is working.
    # https://github.com/simonw/datasette/issues/2058
    _internal = ds.get_database("_internal")

    async def get_tables():
        return [
            dict(r)
            for r in await _internal.execute(
                "select table_name from tables where database_name = ?", [db_name]
            )
        ]

    async def test_it():
        await ds.refresh_schemas()
        initial_hash = await db.schema_hash()
        # _internal should list zero tables
        tables = await get_tables()
        assert tables == []
        # Create a new table
        await db.execute_write("CREATE TABLE test (id INTEGER PRIMARY KEY)")
        await ds.refresh_schemas()
        assert await db.schema_hash() != initial_hash
        # _internal should list one table
        tables = await get_tables()
        assert tables == [
            {"table_name": "test"},
        ]

    async def schema_version_none(self):
        return None

    if schema_version_returns_none:
        with patch(
            "datasette.database.Database.schema_version", new=schema_version_none
        ):
            await test_it()
    else:
        await test_it()