wagtail/wagtail/contrib/redirects/views.py

428 wiersze
13 KiB
Python

import os
from typing import List
from django.core.exceptions import PermissionDenied, SuspiciousOperation
from django.db import transaction
from django.shortcuts import get_object_or_404, redirect, render
from django.template.response import TemplateResponse
from django.urls import reverse
from django.utils.encoding import force_str
from django.utils.functional import cached_property
from django.utils.translation import gettext as _
from django.utils.translation import gettext_lazy, ngettext
from django.views.decorators.http import require_http_methods
from wagtail.admin import messages
from wagtail.admin.auth import PermissionPolicyChecker
from wagtail.admin.forms.search import SearchForm
from wagtail.admin.ui.tables import Column, StatusTagColumn, TitleColumn
from wagtail.admin.views import generic
from wagtail.admin.widgets.button import Button
from wagtail.contrib.redirects import models
from wagtail.contrib.redirects.filters import RedirectsReportFilterSet
from wagtail.contrib.redirects.forms import (
ConfirmImportForm,
ConfirmImportManagementForm,
ImportForm,
RedirectForm,
)
from wagtail.contrib.redirects.models import Redirect
from wagtail.contrib.redirects.permissions import permission_policy
from wagtail.contrib.redirects.utils import (
get_file_storage,
get_format_cls_by_extension,
get_import_formats,
get_supported_extensions,
write_to_file_storage,
)
from wagtail.log_actions import log
# Module-level checker built from the redirects permission policy; its
# ``require`` / ``require_any`` methods are used below as view decorators.
permission_checker = PermissionPolicyChecker(permission_policy)
class RedirectTargetColumn(Column):
    """Table column describing the destination of a redirect.

    The cell value is the target page's admin display title when the
    redirect points at a page, otherwise the raw ``redirect_link``. The cell
    context additionally receives a ``url`` entry (possibly ``None``).
    """

    cell_template_name = "wagtailredirects/redirect_target_cell.html"
    url_name = "wagtailadmin_pages:edit"

    def get_value(self, instance):
        """Return the target page's admin display title, or the redirect link."""
        if not instance.redirect_page_id:
            return instance.redirect_link
        return instance.redirect_page.get_admin_display_title()

    def get_url(self, instance):
        """Return the URL reversed from ``url_name`` for the target page.

        Returns ``None`` when the redirect has no target page.
        """
        page_id = instance.redirect_page_id
        return reverse(self.url_name, args=[page_id]) if page_id else None

    def get_cell_context_data(self, instance, parent_context):
        """Extend the inherited cell context with the target ``url``."""
        cell_context = super().get_cell_context_data(instance, parent_context)
        cell_context["url"] = self.get_url(instance)
        return cell_context
class IndexView(generic.IndexView):
    """Admin listing view for redirects.

    Configures the columns, search fields, filter set, default ordering,
    pagination and spreadsheet export of the redirects index, and adds an
    "Import redirects" entry to the header's extra buttons.
    """

    template_name = "wagtailredirects/index.html"
    results_template_name = "wagtailredirects/index_results.html"
    permission_policy = permission_policy
    model = Redirect
    header_icon = "redirect"
    add_item_label = gettext_lazy("Add redirect")
    context_object_name = "redirects"
    index_url_name = "wagtailredirects:index"
    index_results_url_name = "wagtailredirects:index_results"
    add_url_name = "wagtailredirects:add"
    edit_url_name = "wagtailredirects:edit"
    delete_url_name = "wagtailredirects:delete"
    default_ordering = "old_path"
    paginate_by = 20
    page_title = gettext_lazy("Redirects")
    search_fields = ["old_path", "redirect_page__url_path", "redirect_link"]
    _show_breadcrumbs = True

    # Columns of the listing table, in display order.
    columns = [
        TitleColumn(
            "old_path",
            label=gettext_lazy("From"),
            url_name="wagtailredirects:edit",
            sort_key="old_path",
        ),
        Column(
            "site",
            label=gettext_lazy("Site"),
            width="25%",
            sort_key="site__site_name",
        ),
        RedirectTargetColumn(
            "redirect_page",
            label=gettext_lazy("To"),
            width="30%",
        ),
        StatusTagColumn(
            "is_permanent",
            accessor="get_is_permanent_display",
            label=gettext_lazy("Type"),
            width="10%",
            sort_key="is_permanent",
            primary=lambda r: r.is_permanent,
        ),
    ]
    filterset_class = RedirectsReportFilterSet

    # Attributes written to the spreadsheet export, in column order.
    list_export = [
        "old_path",
        "link",
        "get_is_permanent_display",
        "site",
    ]

    # Headings must be lazy: this dict is built once, when the class body is
    # executed at import time, so an eager gettext() call would freeze every
    # heading in whichever language happened to be active at that moment.
    export_headings = {
        "old_path": gettext_lazy("From"),
        "site": gettext_lazy("Site"),
        "link": gettext_lazy("To"),
        "get_is_permanent_display": gettext_lazy("Type"),
    }

    def get_base_queryset(self):
        """Return the inherited queryset with page and site joined in."""
        return super().get_base_queryset().select_related("redirect_page", "site")

    @cached_property
    def header_more_buttons(self) -> List[Button]:
        """Return the inherited header buttons plus "Import redirects"."""
        # Work on a copy so the list returned by the parent is not mutated.
        buttons = super().header_more_buttons.copy()
        buttons.append(
            Button(
                _("Import redirects"),
                url=reverse("wagtailredirects:start_import"),
                icon_name="doc-full-inverse",
                priority=50,
            )
        )
        return buttons
class EditView(generic.EditView):
    """Admin view for editing a single redirect."""

    # What is edited, through which form, and under which permissions.
    model = Redirect
    form_class = RedirectForm
    pk_url_kwarg = "redirect_id"
    permission_policy = permission_policy

    # Template and URL names used by the view.
    template_name = "wagtailredirects/edit.html"
    index_url_name = "wagtailredirects:index"
    edit_url_name = "wagtailredirects:edit"
    delete_url_name = "wagtailredirects:delete"

    error_message = gettext_lazy("The redirect could not be saved due to errors.")

    def get_success_message(self):
        """Return the translated "updated" message naming the edited redirect."""
        template = _("Redirect '%(redirect_title)s' updated.")
        return template % {"redirect_title": self.object.title}
@permission_checker.require("delete")
def delete(request, redirect_id):
    """Confirm and perform the deletion of a redirect.

    Any non-POST request renders the confirmation template. A POST logs the
    deletion, deletes the redirect, queues a success message and redirects
    to the index.

    Raises ``PermissionDenied`` when the user lacks the "delete" permission
    for this particular redirect; a missing redirect results in a 404.
    """
    target = get_object_or_404(models.Redirect, id=redirect_id)

    may_delete = permission_policy.user_has_permission_for_instance(
        request.user, "delete", target
    )
    if not may_delete:
        raise PermissionDenied

    if request.method != "POST":
        return TemplateResponse(
            request,
            "wagtailredirects/confirm_delete.html",
            {"redirect": target},
        )

    with transaction.atomic():
        # The log entry is written before the row is removed.
        log(instance=target, action="wagtail.delete")
        target.delete()

    success_text = _("Redirect '%(redirect_title)s' deleted.") % {
        "redirect_title": target.title
    }
    messages.success(request, success_text)
    return redirect("wagtailredirects:index")
@permission_checker.require("add")
def add(request):
    """Create a redirect from the add form.

    Non-POST requests render an unbound form. A valid POST saves the
    redirect, logs its creation, queues a success message carrying an
    "Edit" button and redirects to the index. An invalid POST queues an
    error message and re-renders the bound form.
    """
    if request.method != "POST":
        form = RedirectForm()
    else:
        form = RedirectForm(request.POST, request.FILES)
        if form.is_valid():
            with transaction.atomic():
                created = form.save()
                log(instance=created, action="wagtail.create")

            edit_button = messages.button(
                reverse("wagtailredirects:edit", args=(created.id,)),
                _("Edit"),
            )
            messages.success(
                request,
                _("Redirect '%(redirect_title)s' added.")
                % {"redirect_title": created.title},
                buttons=[edit_button],
            )
            return redirect("wagtailredirects:index")

        messages.error(request, _("The redirect could not be created due to errors."))

    return TemplateResponse(request, "wagtailredirects/add.html", {"form": form})
@permission_checker.require_any("add")
def start_import(request):
    """First import step: accept an uploaded file and show a preview.

    Renders the file-chooser template until a valid upload is received.
    The upload is then written to file storage, parsed into a dataset with
    the format class matching its extension, and the confirmation template
    is rendered. Unsupported extensions and read/parse failures queue an
    error message and redirect back to this view.
    """
    extensions = get_supported_extensions()
    from_encoding = "utf-8"
    search_query = request.GET.get("q", "")

    if request.POST or request.FILES:
        form = ImportForm(
            extensions,
            request.POST or None,
            request.FILES or None,
        )
    else:
        form = ImportForm(extensions)

    # Without an uploaded file the form is never validated.
    if not (request.FILES and form.is_valid()):
        search_form = SearchForm(
            data={"q": search_query} if search_query else None,
            placeholder=_("Search redirects"),
        )
        return render(
            request,
            "wagtailredirects/choose_import_file.html",
            {"search_form": search_form, "form": form},
        )

    upload = form.cleaned_data["import_file"]
    extension = os.path.splitext(upload.name)[1].lstrip(".")

    if extension not in extensions:
        messages.error(
            request,
            _('File format of type "%(extension)s" is not supported')
            % {"extension": extension},
        )
        return redirect("wagtailredirects:start_import")

    format_cls = get_format_cls_by_extension(extension)
    input_format = format_cls()
    file_storage = write_to_file_storage(upload, input_format)

    try:
        raw = file_storage.read(input_format.get_read_mode())
        if not input_format.is_binary() and from_encoding:
            raw = force_str(raw, from_encoding)
        dataset = input_format.create_dataset(raw)
    except UnicodeDecodeError as exc:
        messages.error(
            request,
            _("Imported file has a wrong encoding: %(error_message)s")
            % {"error_message": exc},
        )
        return redirect("wagtailredirects:start_import")
    except Exception as exc:  # noqa: BLE001; pragma: no cover
        messages.error(
            request,
            _("%(error)s encountered while trying to read file: %(filename)s")
            % {"error": type(exc).__name__, "filename": upload.name},
        )
        return redirect("wagtailredirects:start_import")

    # This data is needed in the processing step, so it is stored in
    # hidden form fields as signed strings (signing happens in the form).
    confirm_form = ConfirmImportForm(
        dataset.headers,
        initial={
            "import_file_name": os.path.basename(file_storage.name),
            "input_format": get_import_formats().index(format_cls),
        },
    )
    return render(
        request,
        "wagtailredirects/confirm_import.html",
        {"form": confirm_form, "dataset": dataset},
    )
@permission_checker.require_any("add")
@require_http_methods(["POST"])
def process_import(request):
    """Second import step: create redirects from the stored file.

    Validates the management form, re-reads the stored file into a dataset,
    validates the column-mapping form and creates one redirect per row. The
    stored file is removed once the rows have been processed. If any row
    failed, the import summary template is rendered; otherwise a success
    message is queued and the user is redirected to the index.

    Raises ``SuspiciousOperation`` when the management form is invalid.
    """
    supported_extensions = get_supported_extensions()
    from_encoding = "utf-8"

    management_form = ConfirmImportManagementForm(request.POST)
    if not management_form.is_valid():
        # Unable to unsign the hidden form data, or the data is missing, that's suspicious.
        raise SuspiciousOperation(
            f"Invalid management form, data is missing or has been tampered with:\n"
            f"{management_form.errors.as_text()}"
        )

    available_formats = get_import_formats()
    format_cls = available_formats[int(management_form.cleaned_data["input_format"])]
    input_format = format_cls()

    storage_cls = get_file_storage()
    file_storage = storage_cls(name=management_form.cleaned_data["import_file_name"])

    raw = file_storage.read(input_format.get_read_mode())
    if not input_format.is_binary() and from_encoding:
        raw = force_str(raw, from_encoding)
    dataset = input_format.create_dataset(raw)

    # Now check if the rest of the management form is valid
    form = ConfirmImportForm(
        dataset.headers,
        request.POST,
        request.FILES,
        initial=management_form.cleaned_data,
    )
    if not form.is_valid():
        return render(
            request,
            "wagtailredirects/confirm_import.html",
            {"form": form, "dataset": dataset},
        )

    import_config = {
        "from_index": int(form.cleaned_data["from_index"]),
        "to_index": int(form.cleaned_data["to_index"]),
        "permanent": form.cleaned_data["permanent"],
        "site": form.cleaned_data["site"],
    }
    import_summary = create_redirects_from_dataset(dataset, import_config)

    file_storage.remove()

    if import_summary["errors_count"] > 0:
        summary_context = {
            "form": ImportForm(supported_extensions),
            "import_summary": import_summary,
        }
        return render(request, "wagtailredirects/import_summary.html", summary_context)

    total = import_summary["total"]
    success_text = ngettext(
        "Imported %(total)d redirect", "Imported %(total)d redirects", total
    ) % {"total": total}
    messages.success(request, success_text)
    return redirect("wagtailredirects:index")
def create_redirects_from_dataset(dataset, config):
    """Create one redirect per dataset row and summarise the outcome.

    ``config`` supplies ``from_index`` and ``to_index`` (positions of the
    source and target values within a row), ``permanent`` and ``site``
    (skipped when falsy). Each row is validated through ``RedirectForm``;
    valid rows are saved and logged, invalid rows are collected together
    with their readable error text.

    Returns a dict with ``errors`` (a list of ``[from, to, error]``),
    ``errors_count``, ``successes`` and ``total``.
    """
    failures = []
    created = 0

    for row in dataset:
        source = row[config["from_index"]]
        target = row[config["to_index"]]

        form_data = {
            "old_path": source,
            "redirect_link": target,
            "is_permanent": config["permanent"],
        }
        if config["site"]:
            form_data["site"] = config["site"].pk

        form = RedirectForm(form_data)
        if form.is_valid():
            with transaction.atomic():
                new_redirect = form.save()
                log(instance=new_redirect, action="wagtail.create")
            created += 1
        else:
            reason = to_readable_errors(form.errors.as_text())
            failures.append([source, target, reason])

    # Every processed row ends up either created or failed.
    return {
        "errors": failures,
        "errors_count": len(failures),
        "successes": created,
        "total": created + len(failures),
    }
def to_readable_errors(error):
    """Flatten the text form of form errors into one comma-separated string.

    ``error`` is expected in the layout printed by Django's
    ``form.errors.as_text()``: a ``* field`` line per field, followed by one
    ``  * message`` line (indented by two spaces) for each of its errors.

    Only the message lines are kept. They are recognised by their indented
    bullet rather than by position, so a field with several errors
    contributes all of its messages and no field name is mistaken for one.

    Returns the messages joined with ``", "``, or an empty string when
    ``error`` contains no message lines.
    """
    message_prefix = "  * "
    collected = [
        line[len(message_prefix):]
        for line in error.split("\n")
        if line.startswith(message_prefix)
    ]
    return ", ".join(collected)