From c5d30b58a1cd1c66bbddcf3561db005543ecaf25 Mon Sep 17 00:00:00 2001 From: Simon Willison Date: Mon, 12 Dec 2022 18:40:45 -0800 Subject: [PATCH] Implemented metadata permissions: property, closes #1636 --- datasette/default_permissions.py | 55 ++++++++++- tests/test_permissions.py | 151 +++++++++++++++++++++++++++++-- 2 files changed, 195 insertions(+), 11 deletions(-) diff --git a/datasette/default_permissions.py b/datasette/default_permissions.py index 27e6d61b..9c274c93 100644 --- a/datasette/default_permissions.py +++ b/datasette/default_permissions.py @@ -79,7 +79,60 @@ def permission_allowed_default(datasette, actor, action, resource): async def _resolve_metadata_permissions_blocks(datasette, actor, action, resource): - # Check custom permissions: blocks - not yet implemented + # Check custom permissions: blocks + metadata = datasette.metadata() + root_block = (metadata.get("permissions", None) or {}).get(action) + if root_block: + root_result = actor_matches_allow(actor, root_block) + if root_result is not None: + return root_result + # Now try database-specific blocks + if not resource: + return None + if isinstance(resource, str): + database = resource + else: + database = resource[0] + database_block = ( + (metadata.get("databases", {}).get(database, {}).get("permissions", None)) or {} + ).get(action) + if database_block: + database_result = actor_matches_allow(actor, database_block) + if database_result is not None: + return database_result + # Finally try table/query specific blocks + if not isinstance(resource, tuple): + return None + database, table_or_query = resource + table_block = ( + ( + metadata.get("databases", {}) + .get(database, {}) + .get("tables", {}) + .get(table_or_query, {}) + .get("permissions", None) + ) + or {} + ).get(action) + if table_block: + table_result = actor_matches_allow(actor, table_block) + if table_result is not None: + return table_result + # Finally the canned queries + query_block = ( + ( + metadata.get("databases", {}) + .get(database, {}) + .get("queries", {}) + .get(table_or_query, {}) + .get("permissions", None) + ) + or {} + ).get(action) + if query_block: + query_result = actor_matches_allow(actor, query_block) + if query_result is not None: + return query_result return None diff --git a/tests/test_permissions.py b/tests/test_permissions.py index 50237ea0..8ee80889 100644 --- a/tests/test_permissions.py +++ b/tests/test_permissions.py @@ -4,6 +4,7 @@ from .fixtures import app_client, assert_permissions_checked, make_app_client from bs4 import BeautifulSoup as Soup import copy import json +from pprint import pprint import pytest_asyncio import pytest import re @@ -645,14 +646,13 @@ async def test_actor_restricted_permissions( PermMetadataTestCase = collections.namedtuple( "PermMetadataTestCase", - "metadata,actor,action,resource,default,expected_result", + "metadata,actor,action,resource,expected_result", ) @pytest.mark.asyncio -@pytest.mark.xfail(reason="Not implemented yet") @pytest.mark.parametrize( - "metadata,actor,action,resource,default,expected_result", + "metadata,actor,action,resource,expected_result", ( # Simple view-instance default=True example PermMetadataTestCase( @@ -660,7 +660,6 @@ PermMetadataTestCase = collections.namedtuple( actor=None, action="view-instance", resource=None, - default=True, expected_result=True, ), # debug-menu on root @@ -669,21 +668,153 @@ PermMetadataTestCase = collections.namedtuple( actor={"id": "user"}, action="debug-menu", resource=None, - default=False, + expected_result=True, + ), + # debug-menu on root, wrong actor + PermMetadataTestCase( + metadata={"permissions": {"debug-menu": {"id": "user"}}}, + actor={"id": "user2"}, + action="debug-menu", + resource=None, + expected_result=False, + ), + # create-table on root + PermMetadataTestCase( + metadata={"permissions": {"create-table": {"id": "user"}}}, + actor={"id": "user"}, + action="create-table", + resource=None, + expected_result=True, + ), + # create-table on database - no resource specified + PermMetadataTestCase( + metadata={ + "databases": {"db1": {"permissions": {"create-table": {"id": "user"}}}} + }, + actor={"id": "user"}, + action="create-table", + resource=None, + expected_result=False, + ), + # create-table on database + PermMetadataTestCase( + metadata={ + "databases": {"db1": {"permissions": {"create-table": {"id": "user"}}}} + }, + actor={"id": "user"}, + action="create-table", + resource="db1", + expected_result=True, + ), + # insert-row on root, wrong actor + PermMetadataTestCase( + metadata={"permissions": {"insert-row": {"id": "user"}}}, + actor={"id": "user2"}, + action="insert-row", + resource=("db1", "t1"), + expected_result=False, + ), + # insert-row on root, right actor + PermMetadataTestCase( + metadata={"permissions": {"insert-row": {"id": "user"}}}, + actor={"id": "user"}, + action="insert-row", + resource=("db1", "t1"), + expected_result=True, + ), + # insert-row on database + PermMetadataTestCase( + metadata={ + "databases": {"db1": {"permissions": {"insert-row": {"id": "user"}}}} + }, + actor={"id": "user"}, + action="insert-row", + resource="db1", + expected_result=True, + ), + # insert-row on table, wrong table + PermMetadataTestCase( + metadata={ + "databases": { + "db1": { + "tables": { + "t1": {"permissions": {"insert-row": {"id": "user"}}} + } + } + } + }, + actor={"id": "user"}, + action="insert-row", + resource=("db1", "t2"), + expected_result=False, + ), + # insert-row on table, right table + PermMetadataTestCase( + metadata={ + "databases": { + "db1": { + "tables": { + "t1": {"permissions": {"insert-row": {"id": "user"}}} + } + } + } + }, + actor={"id": "user"}, + action="insert-row", + resource=("db1", "t1"), + expected_result=True, + ), + # view-query on canned query, wrong actor + PermMetadataTestCase( + metadata={ + "databases": { + "db1": { + "queries": { + "q1": { + "sql": "select 1 + 1", + "permissions": {"view-query": {"id": "user"}}, + } + } + } + } + }, + actor={"id": "user2"}, + action="view-query", + resource=("db1", "q1"), + expected_result=False, + ), + # view-query on canned query, right actor + PermMetadataTestCase( + metadata={ + "databases": { + "db1": { + "queries": { + "q1": { + "sql": "select 1 + 1", + "permissions": {"view-query": {"id": "user"}}, + } + } + } + } + }, + actor={"id": "user"}, + action="view-query", + resource=("db1", "q1"), expected_result=True, ), ), ) async def test_permissions_in_metadata( - perms_ds, metadata, actor, action, resource, default, expected_result + perms_ds, metadata, actor, action, resource, expected_result ): previous_metadata = perms_ds.metadata() updated_metadata = copy.deepcopy(previous_metadata) updated_metadata.update(metadata) + perms_ds._metadata_local = updated_metadata try: - result = await perms_ds.permission_allowed( - actor, action, resource, default=default - ) - assert result == expected_result + result = await perms_ds.permission_allowed(actor, action, resource) + if result != expected_result: + pprint(perms_ds._permission_checks) + assert result == expected_result finally: perms_ds._metadata_local = previous_metadata