check_visibility can now take multiple permissions into account

Closes #1829
master
Simon Willison 2022-10-23 19:11:33 -07:00 zatwierdzone przez GitHub
rodzic 6887c12ea3
commit 78dad236df
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 4AEE18F83AFDEB23
10 zmienionych plików z 196 dodań i 82 usunięć

Wyświetl plik

@ -1,5 +1,5 @@
import asyncio
from typing import Sequence, Union, Tuple
from typing import Sequence, Union, Tuple, Optional
import asgi_csrf
import collections
import datetime
@ -707,7 +707,7 @@ class Datasette:
Raises datasette.Forbidden() if any of the checks fail
"""
assert actor is None or isinstance(actor, dict)
assert actor is None or isinstance(actor, dict), "actor must be None or a dict"
for permission in permissions:
if isinstance(permission, str):
action = permission
@ -732,23 +732,34 @@ class Datasette:
else:
raise Forbidden(action)
async def check_visibility(self, actor, action, resource):
async def check_visibility(
self,
actor: dict,
action: Optional[str] = None,
resource: Optional[Union[str, Tuple[str, str]]] = None,
permissions: Optional[
Sequence[Union[Tuple[str, Union[str, Tuple[str, str]]], str]]
] = None,
):
"""Returns (visible, private) - visible = can you see it, private = can others see it too"""
visible = await self.permission_allowed(
actor,
action,
resource=resource,
default=True,
)
if not visible:
if permissions:
assert (
not action and not resource
), "Can't use action= or resource= with permissions="
else:
permissions = [(action, resource)]
try:
await self.ensure_permissions(actor, permissions)
except Forbidden:
return False, False
private = not await self.permission_allowed(
None,
action,
resource=resource,
default=True,
)
return visible, private
# User can see it, but can the anonymous user see it?
try:
await self.ensure_permissions(None, permissions)
except Forbidden:
# It's visible but private
return True, True
# It's visible to everyone
return True, False
async def execute(
self,

Wyświetl plik

@ -20,7 +20,7 @@
{% endblock %}
{% block content %}
<h1 style="padding-left: 10px; border-left: 10px solid #{{ database_color(database) }}">{{ table }}: {{ ', '.join(primary_key_values) }}</h1>
<h1 style="padding-left: 10px; border-left: 10px solid #{{ database_color(database) }}">{{ table }}: {{ ', '.join(primary_key_values) }}{% if private %} 🔒{% endif %}</h1>
{% block description_source_license %}{% include "_description_source_license.html" %}{% endblock %}

Wyświetl plik

@ -40,13 +40,16 @@ class DatabaseView(DataView):
raise NotFound("Database not found: {}".format(database_route))
database = db.name
await self.ds.ensure_permissions(
visible, private = await self.ds.check_visibility(
request.actor,
[
permissions=[
("view-database", database),
"view-instance",
],
)
if not visible:
raise Forbidden("You do not have permission to view this database")
metadata = (self.ds.metadata("databases") or {}).get(database, {})
self.ds.update_with_inherited_metadata(metadata)
@ -63,27 +66,27 @@ class DatabaseView(DataView):
views = []
for view_name in await db.view_names():
visible, private = await self.ds.check_visibility(
view_visible, view_private = await self.ds.check_visibility(
request.actor,
"view-table",
(database, view_name),
)
if visible:
if view_visible:
views.append(
{
"name": view_name,
"private": private,
"private": view_private,
}
)
tables = []
for table in table_counts:
visible, private = await self.ds.check_visibility(
table_visible, table_private = await self.ds.check_visibility(
request.actor,
"view-table",
(database, table),
)
if not visible:
if not table_visible:
continue
table_columns = await db.table_columns(table)
tables.append(
@ -95,7 +98,7 @@ class DatabaseView(DataView):
"hidden": table in hidden_table_names,
"fts_table": await db.fts_table(table),
"foreign_keys": all_foreign_keys[table],
"private": private,
"private": table_private,
}
)
@ -104,13 +107,13 @@ class DatabaseView(DataView):
for query in (
await self.ds.get_canned_queries(database, request.actor)
).values():
visible, private = await self.ds.check_visibility(
query_visible, query_private = await self.ds.check_visibility(
request.actor,
"view-query",
(database, query["name"]),
)
if visible:
canned_queries.append(dict(query, private=private))
if query_visible:
canned_queries.append(dict(query, private=query_private))
async def database_actions():
links = []
@ -130,15 +133,13 @@ class DatabaseView(DataView):
return (
{
"database": database,
"private": private,
"path": self.ds.urls.database(database),
"size": db.size,
"tables": tables,
"hidden_count": len([t for t in tables if t["hidden"]]),
"views": views,
"queries": canned_queries,
"private": not await self.ds.permission_allowed(
None, "view-database", database, default=True
),
"allow_execute_sql": await self.ds.permission_allowed(
request.actor, "execute-sql", database, default=True
),
@ -227,17 +228,17 @@ class QueryView(DataView):
private = False
if canned_query:
# Respect canned query permissions
await self.ds.ensure_permissions(
visible, private = await self.ds.check_visibility(
request.actor,
[
permissions=[
("view-query", (database, canned_query)),
("view-database", database),
"view-instance",
],
)
private = not await self.ds.permission_allowed(
None, "view-query", (database, canned_query), default=True
)
if not visible:
raise Forbidden("You do not have permission to view this query")
else:
await self.ds.ensure_permissions(request.actor, [("execute-sql", database)])

Wyświetl plik

@ -23,25 +23,25 @@ class IndexView(BaseView):
await self.ds.ensure_permissions(request.actor, ["view-instance"])
databases = []
for name, db in self.ds.databases.items():
visible, database_private = await self.ds.check_visibility(
database_visible, database_private = await self.ds.check_visibility(
request.actor,
"view-database",
name,
)
if not visible:
if not database_visible:
continue
table_names = await db.table_names()
hidden_table_names = set(await db.hidden_table_names())
views = []
for view_name in await db.view_names():
visible, private = await self.ds.check_visibility(
view_visible, view_private = await self.ds.check_visibility(
request.actor,
"view-table",
(name, view_name),
)
if visible:
views.append({"name": view_name, "private": private})
if view_visible:
views.append({"name": view_name, "private": view_private})
# Perform counts only for immutable or DBS with <= COUNT_TABLE_LIMIT tables
table_counts = {}

Wyświetl plik

@ -1,4 +1,4 @@
from datasette.utils.asgi import NotFound
from datasette.utils.asgi import NotFound, Forbidden
from datasette.database import QueryInterrupted
from .base import DataView
from datasette.utils import (
@ -21,14 +21,19 @@ class RowView(DataView):
except KeyError:
raise NotFound("Database not found: {}".format(database_route))
database = db.name
await self.ds.ensure_permissions(
# Ensure user has permission to view this row
visible, private = await self.ds.check_visibility(
request.actor,
[
permissions=[
("view-table", (database, table)),
("view-database", database),
"view-instance",
],
)
if not visible:
raise Forbidden("You do not have permission to view this table")
pk_values = urlsafe_components(request.url_vars["pks"])
try:
db = self.ds.get_database(route=database_route)
@ -55,6 +60,7 @@ class RowView(DataView):
for column in display_columns:
column["sortable"] = False
return {
"private": private,
"foreign_key_tables": await self.foreign_key_tables(
database, table, pk_values
),

Wyświetl plik

@ -28,7 +28,7 @@ from datasette.utils import (
urlsafe_components,
value_as_boolean,
)
from datasette.utils.asgi import BadRequest, NotFound
from datasette.utils.asgi import BadRequest, Forbidden, NotFound
from datasette.filters import Filters
from .base import DataView, DatasetteError, ureg
from .database import QueryView
@ -213,18 +213,16 @@ class TableView(DataView):
raise NotFound(f"Table not found: {table_name}")
# Ensure user has permission to view this table
await self.ds.ensure_permissions(
visible, private = await self.ds.check_visibility(
request.actor,
[
permissions=[
("view-table", (database_name, table_name)),
("view-database", database_name),
"view-instance",
],
)
private = not await self.ds.permission_allowed(
None, "view-table", (database_name, table_name), default=True
)
if not visible:
raise Forbidden("You do not have permission to view this table")
# Handle ?_filter_column and redirect, if present
redirect_params = filters_should_redirect(request.args)

Wyświetl plik

@ -349,7 +349,7 @@ await .ensure_permissions(actor, permissions)
``permissions`` - list
A list of permissions to check. Each permission in that list can be a string ``action`` name or a 2-tuple of ``(action, resource)``.
This method allows multiple permissions to be checked at onced. It raises a ``datasette.Forbidden`` exception if any of the checks are denied before one of them is explicitly granted.
This method allows multiple permissions to be checked at once. It raises a ``datasette.Forbidden`` exception if any of the checks are denied before one of them is explicitly granted.
This is useful when you need to check multiple permissions at once. For example, an actor should be able to view a table if either one of the following checks returns ``True`` or not a single one of them returns ``False``:
@ -366,18 +366,21 @@ This is useful when you need to check multiple permissions at once. For example,
.. _datasette_check_visibilty:
await .check_visibility(actor, action, resource=None)
-----------------------------------------------------
await .check_visibility(actor, action=None, resource=None, permissions=None)
----------------------------------------------------------------------------
``actor`` - dictionary
The authenticated actor. This is usually ``request.actor``.
``action`` - string
``action`` - string, optional
The name of the action that is being permission checked.
``resource`` - string or tuple, optional
The resource, e.g. the name of the database, or a tuple of two strings containing the name of the database and the name of the table. Only some permissions apply to a resource.
``permissions`` - list of ``action`` strings or ``(action, resource)`` tuples, optional
Provide this instead of ``action`` and ``resource`` to check multiple permissions at once.
This convenience method can be used to answer the question "should this item be considered private, in that it is visible to me but it is not visible to anonymous users?"
It returns a tuple of two booleans, ``(visible, private)``. ``visible`` indicates if the actor can see this resource. ``private`` will be ``True`` if an anonymous user would not be able to view the resource.
@ -387,7 +390,22 @@ This example checks if the user can access a specific table, and sets ``private`
.. code-block:: python
visible, private = await self.ds.check_visibility(
request.actor, "view-table", (database, table)
request.actor,
action="view-table",
resource=(database, table),
)
The following example runs three checks in a row, similar to :ref:`datasette_ensure_permissions`. If any of the checks are denied before one of them is explicitly granted then ``visible`` will be ``False``. ``private`` will be ``True`` if an anonymous user would not be able to view the resource.
.. code-block:: python
visible, private = await self.ds.check_visibility(
request.actor,
permissions=[
("view-table", (database, table)),
("view-database", database),
"view-instance",
],
)
.. _datasette_get_database:

Wyświetl plik

@ -1,6 +1,7 @@
"""
Tests for the datasette.app.Datasette class
"""
from datasette import Forbidden
from datasette.app import Datasette, Database
from itsdangerous import BadSignature
from .fixtures import app_client
@ -75,3 +76,52 @@ async def test_num_sql_threads_zero():
assert response.json() == {"num_threads": 0, "threads": []}
response2 = await ds.client.get("/test_num_sql_threads_zero/t.json?_shape=array")
assert response2.json() == [{"id": 1}]
ROOT = {"id": "root"}
ALLOW_ROOT = {"allow": {"id": "root"}}
@pytest.mark.asyncio
@pytest.mark.parametrize(
"actor,metadata,permissions,should_allow,expected_private",
(
(None, ALLOW_ROOT, ["view-instance"], False, False),
(ROOT, ALLOW_ROOT, ["view-instance"], True, True),
(
None,
{"databases": {"_memory": ALLOW_ROOT}},
[("view-database", "_memory")],
False,
False,
),
(
ROOT,
{"databases": {"_memory": ALLOW_ROOT}},
[("view-database", "_memory")],
True,
True,
),
# Check private is false for non-protected instance check
(
ROOT,
{"allow": True},
["view-instance"],
True,
False,
),
),
)
async def test_datasette_ensure_permissions_check_visibility(
actor, metadata, permissions, should_allow, expected_private
):
ds = Datasette([], memory=True, metadata=metadata)
if not should_allow:
with pytest.raises(Forbidden):
await ds.ensure_permissions(actor, permissions)
else:
await ds.ensure_permissions(actor, permissions)
# And try check_visibility too:
visible, private = await ds.check_visibility(actor, permissions=permissions)
assert visible == should_allow
assert private == expected_private

Wyświetl plik

@ -5,6 +5,20 @@ import pytest
import urllib
@pytest.fixture(scope="module")
def padlock_client():
with make_app_client(
metadata={
"databases": {
"fixtures": {
"queries": {"two": {"sql": "select 1 + 1"}},
}
}
}
) as client:
yield client
@pytest.mark.parametrize(
"allow,expected_anon,expected_auth",
[
@ -13,27 +27,33 @@ import urllib
({"id": "root"}, 403, 200),
],
)
def test_view_instance(allow, expected_anon, expected_auth):
with make_app_client(metadata={"allow": allow}) as client:
for path in (
"/",
"/fixtures",
"/fixtures/compound_three_primary_keys",
"/fixtures/compound_three_primary_keys/a,a,a",
):
anon_response = client.get(path)
assert expected_anon == anon_response.status
if allow and path == "/" and anon_response.status == 200:
# Should be no padlock
assert "<h1>Datasette 🔒</h1>" not in anon_response.text
auth_response = client.get(
path,
cookies={"ds_actor": client.actor_cookie({"id": "root"})},
)
assert expected_auth == auth_response.status
# Check for the padlock
if allow and path == "/" and expected_anon == 403 and expected_auth == 200:
assert "<h1>Datasette 🔒</h1>" in auth_response.text
@pytest.mark.parametrize(
"path",
(
"/",
"/fixtures",
"/fixtures/compound_three_primary_keys",
"/fixtures/compound_three_primary_keys/a,a,a",
"/fixtures/two", # Query
),
)
def test_view_padlock(allow, expected_anon, expected_auth, path, padlock_client):
padlock_client.ds._metadata_local["allow"] = allow
fragment = "🔒</h1>"
anon_response = padlock_client.get(path)
assert expected_anon == anon_response.status
if allow and anon_response.status == 200:
# Should be no padlock
assert fragment not in anon_response.text
auth_response = padlock_client.get(
path,
cookies={"ds_actor": padlock_client.actor_cookie({"id": "root"})},
)
assert expected_auth == auth_response.status
# Check for the padlock
if allow and expected_anon == 403 and expected_auth == 200:
assert fragment in auth_response.text
del padlock_client.ds._metadata_local["allow"]
@pytest.mark.parametrize(
@ -467,6 +487,10 @@ def test_permissions_cascade(cascade_app_client, path, permissions, expected_sta
path,
cookies={"ds_actor": cascade_app_client.actor_cookie(actor)},
)
assert expected_status == response.status
assert (
response.status == expected_status
), "path: {}, permissions: {}, expected_status: {}, status: {}".format(
path, permissions, expected_status, response.status
)
finally:
cascade_app_client.ds._metadata_local = previous_metadata
cascade_app_client.ds._local_metadata = previous_metadata

Wyświetl plik

@ -823,8 +823,14 @@ def test_hook_forbidden(restore_working_directory):
assert 403 == response.status
response2 = client.get("/data2")
assert 302 == response2.status
assert "/login?message=view-database" == response2.headers["Location"]
assert "view-database" == client.ds._last_forbidden_message
assert (
response2.headers["Location"]
== "/login?message=You do not have permission to view this database"
)
assert (
client.ds._last_forbidden_message
== "You do not have permission to view this database"
)
def test_hook_handle_exception(app_client):