# datasette/tests/test_restriction_sql.py
#
# 316 lines
# 11 KiB
# Python

import pytest
from datasette.app import Datasette
from datasette.permissions import PermissionSQL
from datasette.resources import TableResource
@pytest.mark.asyncio
async def test_multiple_restriction_sources_intersect():
    """
    Actor ``_r`` restrictions and a plugin's restriction_sql must both pass.

    The actor is granted "vt" on two databases, while a plugin registered for
    the duration of the test returns restriction_sql naming only
    db1_multi_intersect. The restriction sources are expected to be
    INTERSECTed, so only db1_multi_intersect/t1 should be listed.
    """
    from datasette import hookimpl

    class RestrictivePlugin:
        __name__ = "RestrictivePlugin"

        @hookimpl
        def permission_resources_sql(self, datasette, actor, action):
            # Only view-table is narrowed; every other action gets no opinion.
            if action != "view-table":
                return None
            return PermissionSQL(
                restriction_sql="SELECT 'db1_multi_intersect' AS parent, NULL AS child",
                params={},
            )

    restrictive = RestrictivePlugin()
    ds = Datasette()
    await ds.invoke_startup()
    ds.pm.register(restrictive, name="restrictive_plugin")
    try:
        # Register both databases first, then give each one a table t1.
        databases = [
            ds.add_memory_database(name)
            for name in ("db1_multi_intersect", "db2_multi_intersect")
        ]
        for database in databases:
            await database.execute_write("CREATE TABLE t1 (id INTEGER)")
        # Populate the catalog tables before asking for allowed resources.
        await ds._refresh_schemas()

        # The actor's own restrictions cover both databases; the plugin only
        # names db1_multi_intersect.
        database_restrictions = {
            "db1_multi_intersect": ["vt"],
            "db2_multi_intersect": ["vt"],
        }
        actor = {"id": "user", "_r": {"d": database_restrictions}}

        page = await ds.allowed_resources("view-table", actor)
        visible = {(resource.parent, resource.child) for resource in page.resources}

        # Intersection of the two restriction sources: db1 only.
        assert ("db1_multi_intersect", "t1") in visible
        assert ("db2_multi_intersect", "t1") not in visible
    finally:
        # Always remove the plugin again so it cannot affect later tests.
        ds.pm.unregister(name="restrictive_plugin")
@pytest.mark.asyncio
async def test_restriction_sql_with_overlapping_databases_and_tables():
    """
    Actor carries both a database-level and a table-level entry for one database.

    The actor's restrictions contain:
    - a database-level entry allowing "vt" on db1_overlapping
    - a table-level entry allowing "vt" on db1_overlapping/t1

    Entries inside a single actor's restrictions are UNION'd (OR'ed), so the
    database-level entry is expected to make every table visible and the
    table-level entry to be redundant.
    """
    ds = Datasette()
    await ds.invoke_startup()
    db = ds.add_memory_database("db1_overlapping")
    for create_sql in (
        "CREATE TABLE t1 (id INTEGER)",
        "CREATE TABLE t2 (id INTEGER)",
    ):
        await db.execute_write(create_sql)
    await ds._refresh_schemas()

    # Database-level: all tables in db1_overlapping.
    database_level = {"db1_overlapping": ["vt"]}
    # Table-level: only t1 in db1_overlapping.
    table_level = {"db1_overlapping": {"t1": ["vt"]}}
    actor = {"id": "user", "_r": {"d": database_level, "r": table_level}}

    page = await ds.allowed_resources("view-table", actor)
    visible = {(resource.parent, resource.child) for resource in page.resources}

    # (db1_overlapping, NULL) is expected to match every table in the
    # database, so t1 is visible...
    assert ("db1_overlapping", "t1") in visible
    # ...and so is t2, even though no table-level entry mentions it.
    assert ("db1_overlapping", "t2") in visible
@pytest.mark.asyncio
async def test_restriction_sql_empty_allowlist_query():
    """
    An action missing from the actor's allowlist should yield no resources.

    In that situation actor_restrictions_sql() is described as returning
    "SELECT NULL AS parent, NULL AS child WHERE 0"; this test checks that the
    resulting list of allowed resources is empty.
    """
    ds = Datasette()
    await ds.invoke_startup()
    db = ds.add_memory_database("db1_empty_allowlist")
    await db.execute_write("CREATE TABLE t1 (id INTEGER)")
    await ds._refresh_schemas()

    # The allowlist only mentions "vt" on a single table...
    table_restrictions = {"db1_empty_allowlist": {"t1": ["vt"]}}
    actor = {"id": "user", "_r": {"r": table_restrictions}}

    # ...so asking about view-database, which is not listed, must match nothing.
    page = await ds.allowed_resources("view-database", actor)
    assert len(page.resources) == 0
@pytest.mark.asyncio
async def test_restriction_sql_with_pagination():
    """
    Restrictions are expected to hold across keyset-paginated results.

    Ten tables t00..t09 are created, the actor is restricted to the
    odd-numbered ones, and two pages of two results each are fetched.
    """
    ds = Datasette()
    await ds.invoke_startup()
    db = ds.add_memory_database("db1_pagination")
    for index in range(10):
        await db.execute_write(f"CREATE TABLE t{index:02d} (id INTEGER)")
    await ds._refresh_schemas()

    # Allow "vt" only on the odd-numbered tables: t01, t03, ... t09.
    odd_tables = {f"t{index:02d}": ["vt"] for index in range(1, 10, 2)}
    actor = {"id": "user", "_r": {"r": {"db1_pagination": odd_tables}}}

    # First page, deliberately small so that a continuation token is issued.
    first_page = await ds.allowed_resources(
        "view-table", actor, parent="db1_pagination", limit=2
    )
    assert len(first_page.resources) == 2
    assert first_page.next is not None

    # Second page, resumed from the token returned with the first one.
    second_page = await ds.allowed_resources(
        "view-table", actor, parent="db1_pagination", limit=2, next=first_page.next
    )
    assert len(second_page.resources) == 2

    # The two pages must not repeat any table.
    first_names = {resource.child for resource in first_page.resources}
    second_names = {resource.child for resource in second_page.resources}
    assert first_names.isdisjoint(second_names)

    # Every table returned on either page must be odd-numbered.
    for table_id in first_names | second_names:
        table_num = int(table_id[1:])  # "t01" -> 1, "t03" -> 3, ...
        assert table_num % 2 == 1, f"Table {table_id} should be odd-numbered"
@pytest.mark.asyncio
async def test_also_requires_with_restrictions():
    """
    Check that an action with an also_requires dependency honours restrictions.

    execute-sql requires view-database, so with actor restrictions in place
    both actions must be permitted on a database before execute-sql is allowed.
    """
    ds = Datasette()
    await ds.invoke_startup()
    # The returned database handles are never used afterwards, so they are
    # not bound to local names.
    ds.add_memory_database("db1_also_requires")
    ds.add_memory_database("db2_also_requires")
    await ds._refresh_schemas()

    # db1_also_requires carries both view-database ("vd") and execute-sql
    # ("es"); db2_also_requires carries execute-sql only.
    actor = {
        "id": "user",
        "_r": {
            "d": {
                "db1_also_requires": ["vd", "es"],
                "db2_also_requires": [
                    "es"
                ],  # They have execute-sql but not view-database
            }
        },
    }

    # NOTE(review): the database-level resource is expressed here as a
    # TableResource whose child is None - confirm this is the intended way to
    # address a whole database.
    # db1_also_requires has both actions, so execute-sql should be allowed.
    result = await ds.allowed(
        action="execute-sql",
        resource=TableResource("db1_also_requires", None),
        actor=actor,
    )
    assert result is True

    # db2_also_requires lacks view-database, so execute-sql should be denied
    # even though "es" itself is listed.
    result = await ds.allowed(
        action="execute-sql",
        resource=TableResource("db2_also_requires", None),
        actor=actor,
    )
    assert result is False
@pytest.mark.asyncio
async def test_restriction_abbreviations_and_full_names():
    """
    Restrictions may name actions by abbreviation, by full name, or a mix.

    Each variant below is expected to grant view-table on db1_abbrev/t1.
    """
    ds = Datasette()
    await ds.invoke_startup()
    db = ds.add_memory_database("db1_abbrev")
    await db.execute_write("CREATE TABLE t1 (id INTEGER)")
    await ds._refresh_schemas()

    async def view_table_allowed(restrictions):
        # Ask whether an actor carrying these restrictions may view
        # db1_abbrev/t1.
        return await ds.allowed(
            action="view-table",
            resource=TableResource("db1_abbrev", "t1"),
            actor={"id": "user", "_r": restrictions},
        )

    # Abbreviated action name at table level.
    result = await view_table_allowed({"r": {"db1_abbrev": {"t1": ["vt"]}}})
    assert result is True

    # Full action name at table level.
    result = await view_table_allowed({"r": {"db1_abbrev": {"t1": ["view-table"]}}})
    assert result is True

    # Full name and abbreviation mixed in one database-level list.
    result = await view_table_allowed({"d": {"db1_abbrev": ["view-database", "vt"]}})
    assert result is True
@pytest.mark.asyncio
async def test_permission_resources_sql_multiple_restriction_sources_intersect():
    """
    restriction_sql from a plugin is INTERSECTed with the actor's ``_r`` block.

    The actor's restrictions allow "vt" on both db1_multi_restrictions and
    db2_multi_restrictions, but the plugin registered for this test only names
    db1_multi_restrictions. Both sources must pass, so only
    db1_multi_restrictions/t1 is expected in the result.
    """
    from datasette import hookimpl

    class RestrictivePlugin:
        __name__ = "RestrictivePlugin"

        @hookimpl
        def permission_resources_sql(self, datasette, actor, action):
            # Contribute a restriction for view-table only.
            if action != "view-table":
                return None
            return PermissionSQL(
                restriction_sql="SELECT 'db1_multi_restrictions' AS parent, NULL AS child",
                params={},
            )

    restrictive = RestrictivePlugin()
    ds = Datasette()
    await ds.invoke_startup()
    ds.pm.register(restrictive, name="restrictive_plugin")
    try:
        # Register both databases first, then give each one a table t1.
        databases = [
            ds.add_memory_database(name)
            for name in ("db1_multi_restrictions", "db2_multi_restrictions")
        ]
        for database in databases:
            await database.execute_write("CREATE TABLE t1 (id INTEGER)")
        # Populate the catalog tables before asking for allowed resources.
        await ds._refresh_schemas()

        # Actor side: both databases allowed. Plugin side: only
        # db1_multi_restrictions.
        database_restrictions = {
            "db1_multi_restrictions": ["vt"],
            "db2_multi_restrictions": ["vt"],
        }
        actor = {"id": "user", "_r": {"d": database_restrictions}}

        page = await ds.allowed_resources("view-table", actor)
        visible = {(resource.parent, resource.child) for resource in page.resources}

        # Only the database named by both restriction sources survives.
        assert ("db1_multi_restrictions", "t1") in visible
        assert ("db2_multi_restrictions", "t1") not in visible
    finally:
        # Always remove the plugin again so it cannot affect later tests.
        ds.pm.unregister(name="restrictive_plugin")