import collections from datasette.app import Datasette from .fixtures import app_client, assert_permissions_checked, make_app_client from bs4 import BeautifulSoup as Soup import copy import json import pytest_asyncio import pytest import re import urllib @pytest.fixture(scope="module") def padlock_client(): with make_app_client( metadata={ "databases": { "fixtures": { "queries": {"two": {"sql": "select 1 + 1"}}, } } } ) as client: yield client @pytest_asyncio.fixture async def perms_ds(): ds = Datasette() await ds.invoke_startup() one = ds.add_memory_database("perms_ds_one") two = ds.add_memory_database("perms_ds_two") await one.execute_write("create table if not exists t1 (id integer primary key)") await one.execute_write("create table if not exists t2 (id integer primary key)") await two.execute_write("create table if not exists t1 (id integer primary key)") return ds @pytest.mark.parametrize( "allow,expected_anon,expected_auth", [ (None, 200, 200), ({}, 403, 403), ({"id": "root"}, 403, 200), ], ) @pytest.mark.parametrize( "path", ( "/", "/fixtures", "/fixtures/compound_three_primary_keys", "/fixtures/compound_three_primary_keys/a,a,a", "/fixtures/two", # Query ), ) def test_view_padlock(allow, expected_anon, expected_auth, path, padlock_client): padlock_client.ds._metadata_local["allow"] = allow fragment = "🔒" anon_response = padlock_client.get(path) assert expected_anon == anon_response.status if allow and anon_response.status == 200: # Should be no padlock assert fragment not in anon_response.text auth_response = padlock_client.get( path, cookies={"ds_actor": padlock_client.actor_cookie({"id": "root"})}, ) assert expected_auth == auth_response.status # Check for the padlock if allow and expected_anon == 403 and expected_auth == 200: assert fragment in auth_response.text del padlock_client.ds._metadata_local["allow"] @pytest.mark.parametrize( "allow,expected_anon,expected_auth", [ (None, 200, 200), ({}, 403, 403), ({"id": "root"}, 403, 200), ], ) def test_view_database(allow, expected_anon, expected_auth): with make_app_client( metadata={"databases": {"fixtures": {"allow": allow}}} ) as client: for path in ( "/fixtures", "/fixtures/compound_three_primary_keys", "/fixtures/compound_three_primary_keys/a,a,a", ): anon_response = client.get(path) assert expected_anon == anon_response.status, path if allow and path == "/fixtures" and anon_response.status == 200: # Should be no padlock assert ">fixtures 🔒" not in anon_response.text auth_response = client.get( path, cookies={"ds_actor": client.actor_cookie({"id": "root"})}, ) assert expected_auth == auth_response.status if ( allow and path == "/fixtures" and expected_anon == 403 and expected_auth == 200 ): assert ">fixtures 🔒" in auth_response.text def test_database_list_respects_view_database(): with make_app_client( metadata={"databases": {"fixtures": {"allow": {"id": "root"}}}}, extra_databases={"data.db": "create table names (name text)"}, ) as client: anon_response = client.get("/") assert 'data' in anon_response.text assert 'fixtures' not in anon_response.text auth_response = client.get( "/", cookies={"ds_actor": client.actor_cookie({"id": "root"})}, ) assert 'data' in auth_response.text assert 'fixtures 🔒' in auth_response.text def test_database_list_respects_view_table(): with make_app_client( metadata={ "databases": { "data": { "tables": { "names": {"allow": {"id": "root"}}, "v": {"allow": {"id": "root"}}, } } } }, extra_databases={ "data.db": "create table names (name text); create view v as select * from names" }, ) as client: html_fragments = [ ">names 🔒", ">v 🔒", ] anon_response_text = client.get("/").text assert "0 rows in 0 tables" in anon_response_text for html_fragment in html_fragments: assert html_fragment not in anon_response_text auth_response_text = client.get( "/", cookies={"ds_actor": client.actor_cookie({"id": "root"})}, ).text for html_fragment in html_fragments: assert html_fragment in auth_response_text @pytest.mark.parametrize( "allow,expected_anon,expected_auth", [ (None, 200, 200), ({}, 403, 403), ({"id": "root"}, 403, 200), ], ) def test_view_table(allow, expected_anon, expected_auth): with make_app_client( metadata={ "databases": { "fixtures": { "tables": {"compound_three_primary_keys": {"allow": allow}} } } } ) as client: anon_response = client.get("/fixtures/compound_three_primary_keys") assert expected_anon == anon_response.status if allow and anon_response.status == 200: # Should be no padlock assert ">compound_three_primary_keys 🔒" not in anon_response.text auth_response = client.get( "/fixtures/compound_three_primary_keys", cookies={"ds_actor": client.actor_cookie({"id": "root"})}, ) assert expected_auth == auth_response.status if allow and expected_anon == 403 and expected_auth == 200: assert ">compound_three_primary_keys 🔒" in auth_response.text def test_table_list_respects_view_table(): with make_app_client( metadata={ "databases": { "fixtures": { "tables": { "compound_three_primary_keys": {"allow": {"id": "root"}}, # And a SQL view too: "paginated_view": {"allow": {"id": "root"}}, } } } } ) as client: html_fragments = [ ">compound_three_primary_keys 🔒", ">paginated_view 🔒", ] anon_response = client.get("/fixtures") for html_fragment in html_fragments: assert html_fragment not in anon_response.text auth_response = client.get( "/fixtures", cookies={"ds_actor": client.actor_cookie({"id": "root"})} ) for html_fragment in html_fragments: assert html_fragment in auth_response.text @pytest.mark.parametrize( "allow,expected_anon,expected_auth", [ (None, 200, 200), ({}, 403, 403), ({"id": "root"}, 403, 200), ], ) def test_view_query(allow, expected_anon, expected_auth): with make_app_client( metadata={ "databases": { "fixtures": {"queries": {"q": {"sql": "select 1 + 1", "allow": allow}}} } } ) as client: anon_response = client.get("/fixtures/q") assert expected_anon == anon_response.status if allow and anon_response.status == 200: # Should be no padlock assert "🔒" not in anon_response.text auth_response = client.get( "/fixtures/q", cookies={"ds_actor": client.actor_cookie({"id": "root"})} ) assert expected_auth == auth_response.status if allow and expected_anon == 403 and expected_auth == 200: assert ">fixtures: q 🔒" in auth_response.text @pytest.mark.parametrize( "metadata", [ {"allow_sql": {"id": "root"}}, {"databases": {"fixtures": {"allow_sql": {"id": "root"}}}}, ], ) def test_execute_sql(metadata): schema_re = re.compile("const schema = ({.*?});", re.DOTALL) with make_app_client(metadata=metadata) as client: form_fragment = '
123_starts_with_digits" in response.text assert ">Table With Space In Name 🔒" in response.text # Queries assert ">from_async_hook 🔒" in response.text assert ">query_two" in response.text # Views assert ">paginated_view 🔒" in response.text assert ">simple_view" in response.text finally: cascade_app_client.ds._metadata_local = previous_metadata DEF = "USE_DEFAULT" @pytest.mark.asyncio @pytest.mark.parametrize( "actor,permission,resource_1,resource_2,expected_result", ( # Without restrictions the defaults apply ({"id": "t"}, "view-instance", None, None, DEF), ({"id": "t"}, "view-database", "one", None, DEF), ({"id": "t"}, "view-table", "one", "t1", DEF), # If there is an _r block, everything gets denied unless explicitly allowed ({"id": "t", "_r": {}}, "view-instance", None, None, False), ({"id": "t", "_r": {}}, "view-database", "one", None, False), ({"id": "t", "_r": {}}, "view-table", "one", "t1", False), # Explicit allowing works at the "a" for all level: ({"id": "t", "_r": {"a": ["vi"]}}, "view-instance", None, None, DEF), ({"id": "t", "_r": {"a": ["vd"]}}, "view-database", "one", None, DEF), ({"id": "t", "_r": {"a": ["vt"]}}, "view-table", "one", "t1", DEF), # But not if it's the wrong permission ({"id": "t", "_r": {"a": ["vd"]}}, "view-instance", None, None, False), ({"id": "t", "_r": {"a": ["vi"]}}, "view-database", "one", None, False), ({"id": "t", "_r": {"a": ["vd"]}}, "view-table", "one", "t1", False), # Works at the "d" for database level: ({"id": "t", "_r": {"d": {"one": ["vd"]}}}, "view-database", "one", None, DEF), ( {"id": "t", "_r": {"d": {"one": ["vdd"]}}}, "view-database-download", "one", None, DEF, ), ({"id": "t", "_r": {"d": {"one": ["es"]}}}, "execute-sql", "one", None, DEF), # Works at the "t" for table level: ( {"id": "t", "_r": {"t": {"one": {"t1": ["vt"]}}}}, "view-table", "one", "t1", DEF, ), ( {"id": "t", "_r": {"t": {"one": {"t1": ["vt"]}}}}, "view-table", "one", "t2", False, ), ), ) async def test_actor_restricted_permissions( perms_ds, actor, permission, resource_1, resource_2, expected_result ): cookies = {"ds_actor": perms_ds.sign({"a": {"id": "root"}}, "actor")} csrftoken = (await perms_ds.client.get("/-/permissions", cookies=cookies)).cookies[ "ds_csrftoken" ] cookies["ds_csrftoken"] = csrftoken response = await perms_ds.client.post( "/-/permissions", data={ "actor": json.dumps(actor), "permission": permission, "resource_1": resource_1, "resource_2": resource_2, "csrftoken": csrftoken, }, cookies=cookies, ) expected_resource = [] if resource_1: expected_resource.append(resource_1) if resource_2: expected_resource.append(resource_2) if len(expected_resource) == 1: expected_resource = expected_resource[0] expected = { "actor": actor, "permission": permission, "resource": expected_resource, "result": expected_result, } assert response.json() == expected PermMetadataTestCase = collections.namedtuple( "PermMetadataTestCase", "metadata,actor,action,resource,default,expected_result", ) @pytest.mark.asyncio @pytest.mark.xfail(reason="Not implemented yet") @pytest.mark.parametrize( "metadata,actor,action,resource,default,expected_result", ( # Simple view-instance default=True example PermMetadataTestCase( metadata={}, actor=None, action="view-instance", resource=None, default=True, expected_result=True, ), # debug-menu on root PermMetadataTestCase( metadata={"permissions": {"debug-menu": {"id": "user"}}}, actor={"id": "user"}, action="debug-menu", resource=None, default=False, expected_result=True, ), ), ) async def test_permissions_in_metadata( perms_ds, metadata, actor, action, resource, default, expected_result ): previous_metadata = perms_ds.metadata() updated_metadata = copy.deepcopy(previous_metadata) updated_metadata.update(metadata) try: result = await perms_ds.permission_allowed( actor, action, resource, default=default ) assert result == expected_result finally: perms_ds._metadata_local = previous_metadata