kopia lustrzana https://github.com/simonw/datasette
New forbidden() plugin hook, closes #812
rodzic
3ec5b1abf6
commit
549b1c2063
|
@ -975,22 +975,24 @@ class DatasetteRouter:
|
|||
await response.asgi_send(send)
|
||||
return
|
||||
except NotFound as exception:
|
||||
return await self.handle_404(scope, receive, send, exception)
|
||||
return await self.handle_404(request, send, exception)
|
||||
except Exception as exception:
|
||||
return await self.handle_500(scope, receive, send, exception)
|
||||
return await self.handle_404(scope, receive, send)
|
||||
return await self.handle_500(request, send, exception)
|
||||
return await self.handle_404(request, send)
|
||||
|
||||
async def handle_404(self, scope, receive, send, exception=None):
|
||||
async def handle_404(self, request, send, exception=None):
|
||||
# If URL has a trailing slash, redirect to URL without it
|
||||
path = scope.get("raw_path", scope["path"].encode("utf8"))
|
||||
path = request.scope.get("raw_path", request.scope["path"].encode("utf8"))
|
||||
if path.endswith(b"/"):
|
||||
path = path.rstrip(b"/")
|
||||
if scope["query_string"]:
|
||||
path += b"?" + scope["query_string"]
|
||||
if request.scope["query_string"]:
|
||||
path += b"?" + request.scope["query_string"]
|
||||
await asgi_send_redirect(send, path.decode("latin1"))
|
||||
else:
|
||||
# Is there a pages/* template matching this path?
|
||||
template_path = os.path.join("pages", *scope["path"].split("/")) + ".html"
|
||||
template_path = (
|
||||
os.path.join("pages", *request.scope["path"].split("/")) + ".html"
|
||||
)
|
||||
try:
|
||||
template = self.ds.jinja_env.select_template([template_path])
|
||||
except TemplateNotFound:
|
||||
|
@ -1019,7 +1021,7 @@ class DatasetteRouter:
|
|||
"custom_status": custom_status,
|
||||
"custom_redirect": custom_redirect,
|
||||
},
|
||||
request=Request(scope, receive),
|
||||
request=request,
|
||||
view_name="page",
|
||||
)
|
||||
# Pull content-type out into separate parameter
|
||||
|
@ -1035,11 +1037,9 @@ class DatasetteRouter:
|
|||
content_type=content_type,
|
||||
)
|
||||
else:
|
||||
await self.handle_500(
|
||||
scope, receive, send, exception or NotFound("404")
|
||||
)
|
||||
await self.handle_500(request, send, exception or NotFound("404"))
|
||||
|
||||
async def handle_500(self, scope, receive, send, exception):
|
||||
async def handle_500(self, request, send, exception):
|
||||
title = None
|
||||
if isinstance(exception, NotFound):
|
||||
status = 404
|
||||
|
@ -1049,6 +1049,17 @@ class DatasetteRouter:
|
|||
status = 403
|
||||
info = {}
|
||||
message = exception.args[0]
|
||||
# Try the forbidden() plugin hook
|
||||
for custom_response in pm.hook.forbidden(
|
||||
datasette=self.ds, request=request, message=message
|
||||
):
|
||||
if callable(custom_response):
|
||||
custom_response = custom_response()
|
||||
if asyncio.iscoroutine(custom_response):
|
||||
custom_response = await custom_response
|
||||
if custom_response is not None:
|
||||
await custom_response.asgi_send(send)
|
||||
return
|
||||
elif isinstance(exception, DatasetteError):
|
||||
status = exception.status
|
||||
info = exception.error_dict
|
||||
|
@ -1070,7 +1081,7 @@ class DatasetteRouter:
|
|||
headers = {}
|
||||
if self.ds.cors:
|
||||
headers["Access-Control-Allow-Origin"] = "*"
|
||||
if scope["path"].split("?")[0].endswith(".json"):
|
||||
if request.path.split("?")[0].endswith(".json"):
|
||||
await asgi_send_json(send, info, status=status, headers=headers)
|
||||
else:
|
||||
template = self.ds.jinja_env.select_template(templates)
|
||||
|
|
|
@ -88,3 +88,8 @@ def canned_queries(datasette, database, actor):
|
|||
@hookspec
|
||||
def register_magic_parameters(datasette):
|
||||
"Return a list of (name, function) magic parameter functions"
|
||||
|
||||
|
||||
@hookspec
|
||||
def forbidden(datasette, request, message):
|
||||
"Custom response for a 403 forbidden error"
|
||||
|
|
|
@ -10,7 +10,7 @@ from datasette.utils import (
|
|||
path_with_added_args,
|
||||
path_with_removed_args,
|
||||
)
|
||||
from datasette.utils.asgi import AsgiFileDownload, Response
|
||||
from datasette.utils.asgi import AsgiFileDownload, Response, Forbidden
|
||||
from datasette.plugins import pm
|
||||
|
||||
from .base import DatasetteError, DataView
|
||||
|
@ -120,7 +120,7 @@ class DatabaseDownload(DataView):
|
|||
if db.is_memory:
|
||||
raise DatasetteError("Cannot download :memory: database", status=404)
|
||||
if not self.ds.config("allow_download") or db.is_mutable:
|
||||
raise DatasetteError("Database download is forbidden", status=403)
|
||||
raise Forbidden("Database download is forbidden")
|
||||
if not db.path:
|
||||
raise DatasetteError("Cannot download database", status=404)
|
||||
filepath = db.path
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
import json
|
||||
from datasette.utils.asgi import Response
|
||||
from datasette.utils.asgi import Response, Forbidden
|
||||
from .base import BaseView
|
||||
import secrets
|
||||
|
||||
|
@ -60,7 +60,7 @@ class AuthTokenView(BaseView):
|
|||
async def get(self, request):
|
||||
token = request.args.get("token") or ""
|
||||
if not self.ds._root_token:
|
||||
return Response("Root token has already been used", status=403)
|
||||
raise Forbidden("Root token has already been used")
|
||||
if secrets.compare_digest(token, self.ds._root_token):
|
||||
self.ds._root_token = None
|
||||
response = Response.redirect("/")
|
||||
|
@ -69,7 +69,7 @@ class AuthTokenView(BaseView):
|
|||
)
|
||||
return response
|
||||
else:
|
||||
return Response("Invalid token", status=403)
|
||||
raise Forbidden("Invalid token")
|
||||
|
||||
|
||||
class LogoutView(BaseView):
|
||||
|
@ -99,7 +99,7 @@ class PermissionsDebugView(BaseView):
|
|||
async def get(self, request):
|
||||
await self.check_permission(request, "view-instance")
|
||||
if not await self.ds.permission_allowed(request.actor, "permissions-debug"):
|
||||
return Response("Permission denied", status=403)
|
||||
raise Forbidden("Permission denied")
|
||||
return await self.render(
|
||||
["permissions_debug.html"],
|
||||
request,
|
||||
|
|
|
@ -946,3 +946,46 @@ This example registers two new magic parameters: ``:_request_http_version`` retu
|
|||
("request", request),
|
||||
("uuid", uuid),
|
||||
]
|
||||
|
||||
.. _plugin_hook_forbidden:
|
||||
|
||||
forbidden(datasette, request, message)
|
||||
--------------------------------------
|
||||
|
||||
``datasette`` - :ref:`internals_datasette`
|
||||
You can use this to access plugin configuration options via ``datasette.plugin_config(your_plugin_name)``, or to execute SQL queries.
|
||||
|
||||
``request`` - object
|
||||
The current HTTP :ref:`internals_request`.
|
||||
|
||||
``message`` - string
|
||||
A message hinting at why the request was forbidden.
|
||||
|
||||
Plugins can use this to customize how Datasette responds when a 403 Forbidden error occurs - usually because a page failed a permission check, see :authentication_permissions:.
|
||||
|
||||
If a plugin hook wishes to react to the error, it should return a :ref:`Response object <internals_response>`.
|
||||
|
||||
This example returns a redirect to a ``/-/login`` page:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
from datasette import hookimpl
|
||||
from urllib.parse import urlencode
|
||||
|
||||
@hookimpl
|
||||
def forbidden(request, message):
|
||||
return Response.redirect("/-/login?=" + urlencode({"message": message}))
|
||||
|
||||
The function can alternatively return an awaitable function if it needs to make any asynchronous method calls. This example renders a template:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
from datasette import hookimpl
|
||||
from datasette.utils.asgi import Response
|
||||
|
||||
@hookimpl
|
||||
def forbidden(datasette):
|
||||
async def inner():
|
||||
return Response.html(await datasette.render_template("forbidden.html"))
|
||||
|
||||
return inner
|
||||
|
|
|
@ -45,6 +45,7 @@ EXPECTED_PLUGINS = [
|
|||
"extra_css_urls",
|
||||
"extra_js_urls",
|
||||
"extra_template_vars",
|
||||
"forbidden",
|
||||
"permission_allowed",
|
||||
"prepare_connection",
|
||||
"prepare_jinja2_environment",
|
||||
|
|
|
@ -245,3 +245,10 @@ def register_magic_parameters():
|
|||
("request", request),
|
||||
("uuid", uuid),
|
||||
]
|
||||
|
||||
|
||||
@hookimpl
|
||||
def forbidden(datasette, request, message):
|
||||
datasette._last_forbidden_message = message
|
||||
if request.path == "/data2":
|
||||
return Response.redirect("/login?message=" + message)
|
||||
|
|
|
@ -684,3 +684,16 @@ def test_register_magic_parameters(restore_working_directory):
|
|||
assert 200 == response_get.status
|
||||
new_uuid = response_get.json[0][":_uuid_new"]
|
||||
assert 4 == new_uuid.count("-")
|
||||
|
||||
|
||||
def test_forbidden(restore_working_directory):
|
||||
with make_app_client(
|
||||
extra_databases={"data2.db": "create table logs (line text)"},
|
||||
metadata={"allow": {}},
|
||||
) as client:
|
||||
response = client.get("/")
|
||||
assert 403 == response.status
|
||||
response2 = client.get("/data2", allow_redirects=False)
|
||||
assert 302 == response2.status
|
||||
assert "/login?message=view-database" == response2.headers["Location"]
|
||||
assert "view-database" == client.ds._last_forbidden_message
|
||||
|
|
Ładowanie…
Reference in New Issue