Pass actor dict to permission_resources_sql

Refine permissions plumbing to feed full actor dictionaries into permission_resources_sql, removing the app-level cache and adjusting allowed_resources_sql and the debug views to use actor dicts. Teach the default permissions plugin to emit config-derived allow/deny rows through the hook, expand plugin and documentation examples, and add regression tests covering config allow blocks and the new hook behaviour.
new-permissions
Simon Willison 2025-09-28 20:34:51 -07:00
rodzic 9d3abdb37b
commit e738ff1f1a
9 zmienionych plików z 540 dodań i 44 usunięć

Wyświetl plik

@ -1037,7 +1037,7 @@ class Datasette:
)
return result
async def allowed_resources_sql(self, actor_id: str, action: str):
async def allowed_resources_sql(self, actor: Optional[dict], action: str):
"""Combine permission_resources_sql PluginSQL blocks into a UNION query.
Returns a (sql, params) tuple suitable for execution against SQLite.
@ -1045,7 +1045,7 @@ class Datasette:
plugin_blocks: List[PluginSQL] = []
for block in pm.hook.permission_resources_sql(
datasette=self,
actor_id=actor_id,
actor=actor,
action=action,
):
block = await await_me_maybe(block)
@ -1066,7 +1066,11 @@ class Datasette:
continue
plugin_blocks.append(candidate)
sql, params = build_rules_union(actor=actor_id, plugins=plugin_blocks)
actor_id = actor.get("id") if actor else None
sql, params = build_rules_union(
actor=str(actor_id) if actor_id is not None else "",
plugins=plugin_blocks,
)
return sql, params
async def permission_allowed_2(
@ -1077,10 +1081,11 @@ class Datasette:
if default is DEFAULT_NOT_SET and action in self.permissions:
default = self.permissions[action].default
if isinstance(actor, dict):
actor_id = actor.get("id")
if isinstance(actor, dict) or actor is None:
actor_dict = actor
else:
actor_id = actor
actor_dict = {"id": actor}
actor_id = actor_dict.get("id") if actor_dict else None
candidate_parent = None
candidate_child = None
@ -1091,10 +1096,7 @@ class Datasette:
elif resource is not None:
raise TypeError("resource must be None, str, or (parent, child) tuple")
union_sql, union_params = await self.allowed_resources_sql(
actor_id=str(actor_id) if actor_id is not None else None,
action=action,
)
union_sql, union_params = await self.allowed_resources_sql(actor_dict, action)
query = f"""
WITH rules AS (

Wyświetl plik

@ -3,7 +3,7 @@ from datasette.utils.permissions import PluginSQL
from datasette.utils import actor_matches_allow
import itsdangerous
import time
from typing import Union, Tuple
from typing import List, Optional, Tuple, Union
@hookimpl
@ -174,23 +174,160 @@ def permission_allowed_default(datasette, actor, action, resource):
@hookimpl
def permission_resources_sql(datasette, actor_id, action):
async def permission_resources_sql(datasette, actor, action):
rules: List[PluginSQL] = []
config_rules = await _config_permission_rules(datasette, actor, action)
rules.extend(config_rules)
default_allow_actions = {
"view-instance",
"view-database",
"view-table",
"execute-sql",
}
if action not in default_allow_actions:
return None
if action in default_allow_actions:
reason = f"default allow for {action}".replace("'", "''")
sql = (
"SELECT NULL AS parent, NULL AS child, 1 AS allow, " f"'{reason}' AS reason"
)
rules.append(
PluginSQL(
source="default_permissions",
sql=sql,
params={},
)
)
reason = f"default allow for {action}".replace("'", "''")
sql = "SELECT NULL AS parent, NULL AS child, 1 AS allow, " f"'{reason}' AS reason"
return PluginSQL(
source="default_permissions",
sql=sql,
params={},
)
if not rules:
return None
if len(rules) == 1:
return rules[0]
return rules
async def _config_permission_rules(datasette, actor, action) -> List[PluginSQL]:
config = datasette.config or {}
if actor is None:
actor_dict: Optional[dict] = None
elif isinstance(actor, dict):
actor_dict = actor
else:
actor_lookup = await datasette.actors_from_ids([actor])
actor_dict = actor_lookup.get(actor) or {"id": actor}
def evaluate(allow_block):
if allow_block is None:
return None
return actor_matches_allow(actor_dict, allow_block)
rows = []
def add_row(parent, child, result, scope):
if result is None:
return
rows.append(
(
parent,
child,
bool(result),
f"config {'allow' if result else 'deny'} {scope}",
)
)
root_perm = (config.get("permissions") or {}).get(action)
add_row(None, None, evaluate(root_perm), f"permissions for {action}")
for db_name, db_config in (config.get("databases") or {}).items():
db_perm = (db_config.get("permissions") or {}).get(action)
add_row(
db_name, None, evaluate(db_perm), f"permissions for {action} on {db_name}"
)
for table_name, table_config in (db_config.get("tables") or {}).items():
table_perm = (table_config.get("permissions") or {}).get(action)
add_row(
db_name,
table_name,
evaluate(table_perm),
f"permissions for {action} on {db_name}/{table_name}",
)
if action == "view-table":
table_allow = (table_config or {}).get("allow")
add_row(
db_name,
table_name,
evaluate(table_allow),
f"allow for {action} on {db_name}/{table_name}",
)
for query_name, query_config in (db_config.get("queries") or {}).items():
query_perm = (query_config.get("permissions") or {}).get(action)
add_row(
db_name,
query_name,
evaluate(query_perm),
f"permissions for {action} on {db_name}/{query_name}",
)
if action == "view-query":
query_allow = (query_config or {}).get("allow")
add_row(
db_name,
query_name,
evaluate(query_allow),
f"allow for {action} on {db_name}/{query_name}",
)
if action == "view-database":
db_allow = db_config.get("allow")
add_row(
db_name, None, evaluate(db_allow), f"allow for {action} on {db_name}"
)
if action == "execute-sql":
db_allow_sql = db_config.get("allow_sql")
add_row(db_name, None, evaluate(db_allow_sql), f"allow_sql for {db_name}")
if action == "view-instance":
allow_block = config.get("allow")
add_row(None, None, evaluate(allow_block), "allow for view-instance")
if action == "view-table":
# Tables handled in loop
pass
if action == "view-query":
# Queries handled in loop
pass
if action == "execute-sql":
allow_sql = config.get("allow_sql")
add_row(None, None, evaluate(allow_sql), "allow_sql")
if action == "view-database":
# already handled per-database
pass
if not rows:
return []
parts = []
params = {}
for idx, (parent, child, allow, reason) in enumerate(rows):
key = f"cfg_{idx}"
parts.append(
f"SELECT :{key}_parent AS parent, :{key}_child AS child, :{key}_allow AS allow, :{key}_reason AS reason"
)
params[f"{key}_parent"] = parent
params[f"{key}_child"] = child
params[f"{key}_allow"] = 1 if allow else 0
params[f"{key}_reason"] = reason
sql = "\nUNION ALL\n".join(parts)
print(sql, params)
return [PluginSQL(source="config_permissions", sql=sql, params=params)]
async def _resolve_config_permissions_blocks(datasette, actor, action, resource):

Wyświetl plik

@ -116,7 +116,7 @@ def permission_allowed(datasette, actor, action, resource):
@hookspec
def permission_resources_sql(datasette, actor_id, action):
def permission_resources_sql(datasette, actor, action):
"""Return datasette.permissions.PluginSQL()"""

Wyświetl plik

@ -28,25 +28,20 @@ class PluginSQL:
def _namespace_params(i: int, params: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
"""
Rewrite :user, :parent, :child, :action placeholders to distinct names per plugin block.
Returns (rewritten_sql, namespaced_params). Only replaces those names.
Rewrite parameter placeholders to distinct names per plugin block.
Returns (rewritten_sql, namespaced_params).
"""
# We do string replace on *tokens* ":user", ":parent", ":child", ":action"
# This assumes plugin SQL uses those names *verbatim* when needed.
# Anything else is left untouched.
replacements = {key: f"{key}_{i}" for key in params.keys()}
def rewrite(s: str) -> str:
return (
s.replace(":user", f":user_{i}")
.replace(":parent", f":parent_{i}")
.replace(":child", f":child_{i}")
.replace(":action", f":action_{i}")
)
for key in sorted(replacements.keys(), key=len, reverse=True):
s = s.replace(f":{key}", f":{replacements[key]}")
return s
namespaced: Dict[str, Any] = {}
for key in ("user", "parent", "child", "action"):
if key in params and params[key] is not None:
namespaced[f"{key}_{i}"] = params[key]
for key, value in params.items():
namespaced[replacements[key]] = value
return rewrite, namespaced

Wyświetl plik

@ -236,7 +236,7 @@ class AllowedResourcesView(BaseView):
status=400,
)
actor_id = (request.actor or {}).get("id") if request.actor else None
actor = request.actor if isinstance(request.actor, dict) else None
parent_filter = request.args.get("parent")
child_filter = request.args.get("child")
if child_filter and not parent_filter:
@ -278,7 +278,7 @@ class AllowedResourcesView(BaseView):
return Response.json(
{
"action": action,
"actor_id": actor_id,
"actor_id": (actor or {}).get("id") if actor else None,
"page": page,
"page_size": page_size,
"total": 0,
@ -290,7 +290,7 @@ class AllowedResourcesView(BaseView):
plugins = []
for block in pm.hook.permission_resources_sql(
datasette=self.ds,
actor_id=actor_id,
actor=actor,
action=action,
):
block = await await_me_maybe(block)
@ -311,6 +311,7 @@ class AllowedResourcesView(BaseView):
continue
plugins.append(candidate)
actor_id = actor.get("id") if actor else None
rows = await resolve_permissions_from_catalog(
db,
actor=str(actor_id) if actor_id is not None else "",
@ -323,7 +324,9 @@ class AllowedResourcesView(BaseView):
allowed_rows = [row for row in rows if row["allow"] == 1]
if parent_filter is not None:
allowed_rows = [row for row in allowed_rows if row["parent"] == parent_filter]
allowed_rows = [
row for row in allowed_rows if row["parent"] == parent_filter
]
if child_filter is not None:
allowed_rows = [row for row in allowed_rows if row["child"] == child_filter]
total = len(allowed_rows)
@ -384,7 +387,7 @@ class PermissionRulesView(BaseView):
if action not in self.ds.permissions:
return Response.json({"error": f"Unknown action: {action}"}, status=404)
actor_id = (request.actor or {}).get("id") if request.actor else None
actor = request.actor if isinstance(request.actor, dict) else None
try:
page = int(request.args.get("page", "1"))
@ -402,7 +405,7 @@ class PermissionRulesView(BaseView):
page_size = max_page_size
offset = (page - 1) * page_size
union_sql, union_params = await self.ds.allowed_resources_sql(actor_id, action)
union_sql, union_params = await self.ds.allowed_resources_sql(actor, action)
await self.ds.refresh_schemas()
db = self.ds.get_internal_database()
@ -458,7 +461,7 @@ class PermissionRulesView(BaseView):
response = {
"action": action,
"actor_id": actor_id,
"actor_id": (actor or {}).get("id") if actor else None,
"page": page,
"page_size": page_size,
"total": total,

Wyświetl plik

@ -0,0 +1,166 @@
# Permission Plugin Examples
These snippets show how to use the new `permission_resources_sql` hook to
contribute rows to the action-based permission resolver. Each hook receives the
current actor dictionary (or ``None``) and must return an instance of
`datasette.utils.permissions.PluginSQL` (or a coroutine that resolves to one).
All examples assume the plugin lives in `my_permission_plugin/__init__.py` and
is registered using the standard `entry_points` mechanism.
The hook may return a single `PluginSQL`, `None`, or a list/tuple of
`PluginSQL` objects if you need to contribute multiple rows at once.
## Allow Alice To View A Specific Table
This plugin grants the actor with `id == "alice"` permission to perform the
`view-table` action against the `sales` table inside the `accounting` database.
```python
from datasette import hookimpl
from datasette.utils.permissions import PluginSQL
@hookimpl
def permission_resources_sql(datasette, actor, action):
if action != "view-table":
return None
if not actor or actor.get("id") != "alice":
return None
return PluginSQL(
source="alice_sales_allow",
sql="""
SELECT
'accounting' AS parent,
'sales' AS child,
1 AS allow,
'alice can view accounting/sales' AS reason
""",
params={},
)
```
## Restrict Execute-SQL To A Database Prefix
Only allow `execute-sql` against databases whose name begins with
`analytics_`. This shows how to use parameters that the permission resolver
will pass through to the SQL snippet.
```python
from datasette import hookimpl
from datasette.utils.permissions import PluginSQL
@hookimpl
def permission_resources_sql(datasette, actor, action):
if action != "execute-sql":
return None
return PluginSQL(
source="analytics_execute_sql",
sql="""
SELECT
parent,
NULL AS child,
1 AS allow,
'execute-sql allowed for analytics_*' AS reason
FROM catalog_databases
WHERE database_name LIKE :prefix
""",
params={
"prefix": "analytics_%",
},
)
```
## Read Permissions From A Custom Table
This example stores grants in an internal table called `permission_grants`
with columns `(actor_id, action, parent, child, allow, reason)`.
```python
from datasette import hookimpl
from datasette.utils.permissions import PluginSQL
@hookimpl
def permission_resources_sql(datasette, actor, action):
if not actor:
return None
return PluginSQL(
source="permission_grants_table",
sql="""
SELECT
parent,
child,
allow,
COALESCE(reason, 'permission_grants table') AS reason
FROM permission_grants
WHERE actor_id = :actor_id
AND action = :action
""",
params={
"actor_id": actor.get("id"),
"action": action,
},
)
```
## Default Deny With An Exception
Combine a root-level deny with a specific table allow for trusted users.
The resolver will automatically apply the most specific rule.
```python
from datasette import hookimpl
from datasette.utils.permissions import PluginSQL
TRUSTED = {"alice", "bob"}
@hookimpl
def permission_resources_sql(datasette, actor, action):
if action != "view-table":
return None
actor_id = (actor or {}).get("id")
if actor_id not in TRUSTED:
return PluginSQL(
source="view_table_root_deny",
sql="""
SELECT NULL AS parent, NULL AS child, 0 AS allow,
'default deny view-table' AS reason
""",
params={},
)
return PluginSQL(
source="trusted_allow",
sql="""
SELECT NULL AS parent, NULL AS child, 0 AS allow,
'default deny view-table' AS reason
UNION ALL
SELECT 'reports' AS parent, 'daily_metrics' AS child, 1 AS allow,
'trusted user access' AS reason
""",
params={"actor_id": actor_id},
)
```
The `UNION ALL` ensures the deny rule is always present, while the second row
adds the exception for trusted users.
## Using Datasette.allowed_resources_sql()
Within Datasette itself (or a plugin that has access to a `Datasette` instance)
you can inspect the combined rules for debugging:
```python
sql, params = await datasette.allowed_resources_sql(
actor={"id": "alice"},
action="view-table",
)
print(sql)
print(params)
```
The SQL can then be executed directly or embedded in other queries.

Wyświetl plik

@ -0,0 +1,49 @@
from datasette import hookimpl
from datasette.utils.permissions import PluginSQL
@hookimpl
def permission_resources_sql(datasette, actor, action):
if action != "view-table":
return None
actor_id = (actor or {}).get("id")
root_deny = PluginSQL(
source="example_default_deny",
sql="""
SELECT NULL AS parent, NULL AS child, 0 AS allow,
'example plugin default deny' AS reason
""",
params={},
)
pelican_allow = PluginSQL(
source="pelican_content_tables",
sql="""
SELECT
database_name AS parent,
table_name AS child,
1 AS allow,
'pelican allowed all content tables' AS reason
FROM catalog_tables
WHERE database_name = 'content'
AND :actor_id = 'pelican'
""",
params={"actor_id": actor_id},
)
violin_allow = PluginSQL(
source="violin_content_repos",
sql="""
SELECT
'content' AS parent,
'repos' AS child,
1 AS allow,
'violin allowed content/repos' AS reason
WHERE :actor_id = 'violin'
""",
params={"actor_id": actor_id},
)
return [root_deny, pelican_allow, violin_allow]

Wyświetl plik

@ -0,0 +1,120 @@
import pytest
from datasette.app import Datasette
from datasette.database import Database
async def setup_datasette(config=None, databases=None):
ds = Datasette(memory=True, config=config)
for name in databases or []:
ds.add_database(Database(ds, memory_name=f"{name}_memory"), name=name)
await ds.invoke_startup()
await ds.refresh_schemas()
return ds
@pytest.mark.asyncio
async def test_root_permissions_allow():
config = {"permissions": {"execute-sql": {"id": "alice"}}}
ds = await setup_datasette(config=config, databases=["content"])
assert await ds.permission_allowed_2({"id": "alice"}, "execute-sql", "content")
assert not await ds.permission_allowed_2({"id": "bob"}, "execute-sql", "content")
@pytest.mark.asyncio
async def test_database_permission():
config = {
"databases": {
"content": {
"permissions": {
"insert-row": {"id": "alice"},
}
}
}
}
ds = await setup_datasette(config=config, databases=["content"])
assert await ds.permission_allowed_2(
{"id": "alice"}, "insert-row", ("content", "repos")
)
assert not await ds.permission_allowed_2(
{"id": "bob"}, "insert-row", ("content", "repos")
)
@pytest.mark.asyncio
async def test_table_permission():
config = {
"databases": {
"content": {
"tables": {"repos": {"permissions": {"delete-row": {"id": "alice"}}}}
}
}
}
ds = await setup_datasette(config=config, databases=["content"])
assert await ds.permission_allowed_2(
{"id": "alice"}, "delete-row", ("content", "repos")
)
assert not await ds.permission_allowed_2(
{"id": "bob"}, "delete-row", ("content", "repos")
)
@pytest.mark.asyncio
async def test_view_table_allow_block():
config = {
"databases": {"content": {"tables": {"repos": {"allow": {"id": "alice"}}}}}
}
ds = await setup_datasette(config=config, databases=["content"])
assert await ds.permission_allowed_2(
{"id": "alice"}, "view-table", ("content", "repos")
)
assert not await ds.permission_allowed_2(
{"id": "bob"}, "view-table", ("content", "repos")
)
assert await ds.permission_allowed_2(
{"id": "bob"}, "view-table", ("content", "other")
)
@pytest.mark.asyncio
async def test_view_table_allow_false_blocks():
config = {
"databases": {"content": {"tables": {"repos": {"allow": False}}}}
}
ds = await setup_datasette(config=config, databases=["content"])
assert not await ds.permission_allowed_2(
{"id": "alice"}, "view-table", ("content", "repos")
)
@pytest.mark.asyncio
async def test_allow_sql_blocks():
config = {"allow_sql": {"id": "alice"}}
ds = await setup_datasette(config=config, databases=["content"])
assert await ds.permission_allowed_2({"id": "alice"}, "execute-sql", "content")
assert not await ds.permission_allowed_2({"id": "bob"}, "execute-sql", "content")
config = {"databases": {"content": {"allow_sql": {"id": "bob"}}}}
ds = await setup_datasette(config=config, databases=["content"])
assert await ds.permission_allowed_2({"id": "bob"}, "execute-sql", "content")
assert not await ds.permission_allowed_2({"id": "alice"}, "execute-sql", "content")
config = {"allow_sql": False}
ds = await setup_datasette(config=config, databases=["content"])
assert not await ds.permission_allowed_2({"id": "alice"}, "execute-sql", "content")
@pytest.mark.asyncio
async def test_view_instance_allow_block():
config = {"allow": {"id": "alice"}}
ds = await setup_datasette(config=config)
assert await ds.permission_allowed_2({"id": "alice"}, "view-instance")
assert not await ds.permission_allowed_2({"id": "bob"}, "view-instance")

Wyświetl plik

@ -12,8 +12,9 @@ from datasette.app import Datasette
from datasette import cli, hookimpl, Permission
from datasette.filters import FilterArguments
from datasette.plugins import get_plugins, DEFAULT_PLUGINS, pm
from datasette.utils.permissions import PluginSQL
from datasette.utils.sqlite import sqlite3
from datasette.utils import StartupError
from datasette.utils import StartupError, await_me_maybe
from jinja2 import ChoiceLoader, FileSystemLoader
import base64
import datetime
@ -701,6 +702,29 @@ async def test_hook_permission_allowed(action, expected):
pm.unregister(name="undo_register_extras")
@pytest.mark.asyncio
async def test_hook_permission_resources_sql():
ds = Datasette()
await ds.invoke_startup()
collected = []
for block in pm.hook.permission_resources_sql(
datasette=ds,
actor={"id": "alice"},
action="view-table",
):
block = await await_me_maybe(block)
if block is None:
continue
if isinstance(block, (list, tuple)):
collected.extend(block)
else:
collected.append(block)
assert collected
assert all(isinstance(item, PluginSQL) for item in collected)
@pytest.mark.asyncio
async def test_actor_json(ds_client):
assert (await ds_client.get("/-/actor.json")).json() == {"actor": None}