Add ability to import redirects from a file wagtail.contrib.redirects

* Add support for importing redirects via tsv, csv, xls and xlsx files
* Add import_redirects management command to redirects documentation
pull/6114/head
Martin Sandström 2020-05-11 19:19:20 +02:00 zatwierdzone przez LB
rodzic 6e9bcef5db
commit 301d1bc7f5
31 zmienionych plików z 1967 dodań i 7 usunięć

Wyświetl plik

@ -38,6 +38,7 @@ Changelog
* Remove markup around rich text rendering by default, provide a way to use old behaviour via `wagtail.contrib.legacy.richtext` (Coen van der Kamp, Dan Braghis)
* Apply title length normalisation to improve ranking on PostgreSQL search (Karl Hobley)
* Add `WAGTAIL_TIME_FORMAT` setting (Jacob Topp-Mugglestone)
* Add ability to import redirects from an uploaded file (CSV, TSV, XLX, and XLXS) (Martin Sandström)
* Fix: Support IPv6 domain (Alex Gleason, Coen van der Kamp)
* Fix: Ensure link to add a new user works when no users are visible in the users list (LB (Ben Johnston))
* Fix: `AbstractEmailForm` saved submission fields are now aligned with the email content fields, `form.cleaned_data` will be used instead of `form.fields` (Haydn Greatnews)

Wyświetl plik

@ -21,10 +21,10 @@ The ``redirects`` module is not enabled by default. To install it, add ``wagtail
'wagtail.contrib.redirects',
]
MIDDLEWARE = [
# ...
# all other django middlware first
# all other django middlware first
'wagtail.contrib.redirects.middleware.RedirectMiddleware',
]
@ -40,6 +40,44 @@ Page model recipe of to have redirects created automatically when changing a pag
For an editor's guide to the interface, see :ref:`managing_redirects`.
Management commands
===================
import_redirects
----------------
.. code-block:: console
$ ./manage.py import_redirects
This command imports and creates redirects from a file supplied by the user.
Options:
- **src**
This is the path to the file you wish to import redirects from.
- **site**
This is the **site** for the site you wish to save redirects to.
- **permanent**
If the redirects imported should be **permanent** (True) or not (False). It's True by default.
- **from**
The column index you want to use as redirect from value.
- **to**
The column index you want to use as redirect to value.
- **dry_run**
Lets you run a import without doing any changes.
- **ask**
Lets you inspect and approve each redirect before it is created.
The ``Redirect`` class
======================

Wyświetl plik

@ -51,6 +51,7 @@ Other features
* Remove markup around rich text rendering by default, provide a way to use old behaviour via ``wagtail.contrib.legacy.richtext``. See :ref:`legacy_richtext`. (Coen van der Kamp, Dan Braghis)
* Add ``WAGTAIL_TIME_FORMAT`` setting (Jacob Topp-Mugglestone)
* Apply title length normalisation to improve ranking on PostgreSQL search (Karl Hobley)
* Add ability to import redirects from an uploaded file (CSV, TSV, XLX, and XLXS) (Martin Sandström)
Bug fixes

Wyświetl plik

@ -36,6 +36,7 @@ install_requires = [
"requests>=2.11.1,<3.0",
"l18n>=2018.5",
"xlsxwriter>=1.2.8,<2.0",
"tablib[xls,xlsx]>=0.14.0",
]
# Testing dependencies

Wyświetl plik

@ -0,0 +1,228 @@
"""
Copyright (c) Bojan Mihelac and individual contributors.
All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
# https://raw.githubusercontent.com/django-import-export/django-import-export/master/import_export/formats/base_formats.py
from importlib import import_module
import tablib
class Format:
def get_title(self):
return type(self)
def create_dataset(self, in_stream):
"""
Create dataset from given string.
"""
raise NotImplementedError()
def export_data(self, dataset, **kwargs):
"""
Returns format representation for given dataset.
"""
raise NotImplementedError()
def is_binary(self):
"""
Returns if this format is binary.
"""
return True
def get_read_mode(self):
"""
Returns mode for opening files.
"""
return 'rb'
def get_extension(self):
"""
Returns extension for this format files.
"""
return ""
def get_content_type(self):
# For content types see
# https://www.iana.org/assignments/media-types/media-types.xhtml
return 'application/octet-stream'
@classmethod
def is_available(cls):
return True
def can_import(self):
return False
def can_export(self):
return False
class TablibFormat(Format):
TABLIB_MODULE = None
CONTENT_TYPE = 'application/octet-stream'
def get_format(self):
"""
Import and returns tablib module.
"""
try:
# Available since tablib 1.0
from tablib.formats import registry
except ImportError:
return import_module(self.TABLIB_MODULE)
else:
key = self.TABLIB_MODULE.split('.')[-1].replace('_', '')
return registry.get_format(key)
@classmethod
def is_available(cls):
try:
cls().get_format()
except (tablib.core.UnsupportedFormat, ImportError):
return False
return True
def get_title(self):
return self.get_format().title
def create_dataset(self, in_stream, **kwargs):
return tablib.import_set(in_stream, format=self.get_title())
def export_data(self, dataset, **kwargs):
return dataset.export(self.get_title(), **kwargs)
def get_extension(self):
return self.get_format().extensions[0]
def get_content_type(self):
return self.CONTENT_TYPE
def can_import(self):
return hasattr(self.get_format(), 'import_set')
def can_export(self):
return hasattr(self.get_format(), 'export_set')
class TextFormat(TablibFormat):
def get_read_mode(self):
return 'r'
def is_binary(self):
return False
class CSV(TextFormat):
TABLIB_MODULE = 'tablib.formats._csv'
CONTENT_TYPE = 'text/csv'
def create_dataset(self, in_stream, **kwargs):
return super().create_dataset(in_stream, **kwargs)
class JSON(TextFormat):
TABLIB_MODULE = 'tablib.formats._json'
CONTENT_TYPE = 'application/json'
class YAML(TextFormat):
TABLIB_MODULE = 'tablib.formats._yaml'
# See https://stackoverflow.com/questions/332129/yaml-mime-type
CONTENT_TYPE = 'text/yaml'
class TSV(TextFormat):
TABLIB_MODULE = 'tablib.formats._tsv'
CONTENT_TYPE = 'text/tab-separated-values'
def create_dataset(self, in_stream, **kwargs):
return super().create_dataset(in_stream, **kwargs)
class ODS(TextFormat):
TABLIB_MODULE = 'tablib.formats._ods'
CONTENT_TYPE = 'application/vnd.oasis.opendocument.spreadsheet'
class HTML(TextFormat):
TABLIB_MODULE = 'tablib.formats._html'
CONTENT_TYPE = 'text/html'
class XLS(TablibFormat):
TABLIB_MODULE = 'tablib.formats._xls'
CONTENT_TYPE = 'application/vnd.ms-excel'
def create_dataset(self, in_stream):
"""
Create dataset from first sheet.
"""
import xlrd
xls_book = xlrd.open_workbook(file_contents=in_stream)
dataset = tablib.Dataset()
sheet = xls_book.sheets()[0]
dataset.headers = sheet.row_values(0)
for i in range(1, sheet.nrows):
dataset.append(sheet.row_values(i))
return dataset
class XLSX(TablibFormat):
TABLIB_MODULE = 'tablib.formats._xlsx'
CONTENT_TYPE = 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
def create_dataset(self, in_stream):
"""
Create dataset from first sheet.
"""
from io import BytesIO
import openpyxl
xlsx_book = openpyxl.load_workbook(BytesIO(in_stream), read_only=True)
dataset = tablib.Dataset()
sheet = xlsx_book.active
# obtain generator
rows = sheet.rows
dataset.headers = [cell.value for cell in next(rows)]
for row in rows:
row_values = [cell.value for cell in row]
dataset.append(row_values)
return dataset
#: These are the default formats for import and export. Whether they can be
#: used or not is depending on their implementation in the tablib library.
DEFAULT_FORMATS = [fmt for fmt in (
CSV,
XLS,
XLSX,
TSV,
ODS,
JSON,
YAML,
HTML,
) if fmt.is_available()]

Wyświetl plik

@ -1,3 +1,5 @@
import os
from django import forms
from django.utils.translation import gettext_lazy as _
@ -42,3 +44,60 @@ class RedirectForm(forms.ModelForm):
class Meta:
model = Redirect
fields = ('old_path', 'site', 'is_permanent', 'redirect_page', 'redirect_link')
class ImportForm(forms.Form):
import_file = forms.FileField(
label=_("File to import"),
)
def __init__(self, allowed_extensions, *args, **kwargs):
super().__init__(*args, **kwargs)
accept = ",".join(
[".{}".format(x) for x in allowed_extensions]
)
self.fields["import_file"].widget = forms.FileInput(
attrs={"accept": accept}
)
uppercased_extensions = [x.upper() for x in allowed_extensions]
allowed_extensions_text = ", ".join(uppercased_extensions)
help_text = _(
"Supported formats: %(supported_formats)s."
) % {
'supported_formats': allowed_extensions_text,
}
self.fields["import_file"].help_text = help_text
class ConfirmImportForm(forms.Form):
from_index = forms.ChoiceField(label=_("From field"), choices=(),)
to_index = forms.ChoiceField(label=_("To field"), choices=(),)
site = forms.ModelChoiceField(
label=_("From site"),
queryset=Site.objects.all(),
required=False,
empty_label=_("All sites"),
)
permanent = forms.BooleanField(initial=True, required=False)
import_file_name = forms.CharField(widget=forms.HiddenInput())
original_file_name = forms.CharField(widget=forms.HiddenInput())
input_format = forms.CharField(widget=forms.HiddenInput())
def __init__(self, headers, *args, **kwargs):
super().__init__(*args, **kwargs)
choices = []
for i, f in enumerate(headers):
choices.append([str(i), f])
if len(headers) > 1:
choices.insert(0, ("", "---"))
self.fields["from_index"].choices = choices
self.fields["to_index"].choices = choices
def clean_import_file_name(self):
data = self.cleaned_data["import_file_name"]
data = os.path.basename(data)
return data

Wyświetl plik

@ -0,0 +1,186 @@
import os
import tablib
from django.core.management.base import BaseCommand
from wagtail.contrib.redirects.forms import RedirectForm
from wagtail.contrib.redirects.utils import get_format_cls_by_extension, get_supported_extensions
from wagtail.core.models import Site
class Command(BaseCommand):
help = "Imports redirects from .csv, .xls, .xlsx"
def add_arguments(self, parser):
parser.add_argument(
"--src", help="Path to file", type=str, required=True,
)
parser.add_argument(
"--site", help="The site where redirects will be associated", type=int,
)
parser.add_argument(
"--permanent",
help="Save redirects as permanent redirects",
type=bool,
default=True,
)
parser.add_argument(
"--from",
help="The column where to read from link",
default=0,
type=int,
)
parser.add_argument(
"--to", help="The column where to read to link", default=1, type=int,
)
parser.add_argument(
"--dry_run",
action="store_true",
help="Run only in test mode, will not create redirects",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Run only in test mode, will not create redirects",
)
parser.add_argument(
"--ask",
help="Ask before creating",
action="store_true",
)
parser.add_argument(
"--format",
help="Source file format (example: .csv, .xls etc)",
choices=get_supported_extensions(),
type=str,
)
parser.add_argument(
"--offset", help="Import starting with index", type=int, default=None
)
parser.add_argument(
"--limit", help="Limit import to num items", type=int, default=None
)
def handle(self, *args, **options):
src = options["src"]
from_index = options.pop("from")
to_index = options.pop("to")
site_id = options.pop("site", None)
permament = options.pop("permanent")
dry_run = options.pop("dry_run", False) or options.pop("dry-run", False)
format_ = options.pop("format", None)
ask = options.pop("ask")
offset = options.pop("offset")
limit = options.pop("limit")
errors = []
successes = 0
skipped = 0
total = 0
site = None
if site_id:
site = Site.objects.get(id=site_id)
if not os.path.exists(src):
raise Exception("Missing file '{0}'".format(src))
if not os.path.getsize(src) > 0:
raise Exception("File '{0}' is empty".format(src))
_, extension = os.path.splitext(src)
extension = extension.lstrip(".")
if not format_:
format_ = extension
if not get_format_cls_by_extension(format_):
raise Exception("Invalid format '{0}'".format(extension))
if extension in ["xls", "xlsx"]:
mode = "rb"
else:
mode = "r"
with open(src, mode) as fh:
imported_data = tablib.Dataset().load(fh.read(), format=format_)
sample_data = tablib.Dataset(
*imported_data[: min(len(imported_data), 4)],
headers=imported_data.headers
)
try:
self.stdout.write("Sample data:")
self.stdout.write(str(sample_data))
except Exception:
self.stdout.write("Warning: Cannot display sample data")
self.stdout.write("--------------")
if site:
self.stdout.write("Using site: {0}".format(site.hostname))
self.stdout.write("Importing redirects:")
if offset:
imported_data = imported_data[offset:]
if limit:
imported_data = imported_data[:limit]
for row in imported_data:
total += 1
from_link = row[from_index]
to_link = row[to_index]
data = {
"old_path": from_link,
"redirect_link": to_link,
"is_permanent": permament,
}
if site:
data["site"] = site.pk
form = RedirectForm(data)
if not form.is_valid():
error = form.errors.as_text().replace("\n", "")
self.stdout.write(
"{}. Error: {} -> {} (Reason: {})".format(
total, from_link, to_link, error,
)
)
errors.append(error)
continue
if ask:
answer = get_input(
"{}. Found {} -> {} Create? Y/n: ".format(
total, from_link, to_link,
)
)
if answer != "Y":
skipped += 1
continue
else:
self.stdout.write("{}. {} -> {}".format(total, from_link, to_link,))
if dry_run:
successes += 1
continue
form.save()
successes += 1
self.stdout.write("\n")
self.stdout.write("Found: {}".format(total))
self.stdout.write("Created: {}".format(successes))
self.stdout.write("Skipped : {}".format(skipped))
self.stdout.write("Errors: {}".format(len(errors)))
def get_input(msg): # pragma: no cover
return input(msg)

Wyświetl plik

@ -0,0 +1,17 @@
.listing-with-x-scroll {
display: block;
overflow-x: auto;
white-space: nowrap;
}
header .has-multiple-actions {
display: -webkit-box;
display: -moz-box;
display: -webkit-flex;
display: -ms-flexbox;
display: flex;
}
header .has-multiple-actions .actionbutton {
margin-left: 10px;
}

Wyświetl plik

@ -0,0 +1,52 @@
{% extends "wagtailadmin/base.html" %}
{% load i18n %}
{% block titletag %}{% trans "Redirects" %}{% endblock %}
{% block extra_js %}
{{ block.super }}
<script>
window.headerSearch = {
url: "{% url 'wagtailredirects:index' %}",
termInput: "#id_q",
targetOutput: "#redirects-results"
}
</script>
{% endblock %}
{% block content %}
{% trans "Import redirects" as header_title %}
{% include "wagtailadmin/shared/header.html" with title=header_title icon="redirect" %}
<div class="nice-padding">
<div class="help-block help-info">
{% blocktrans %}
<p>Select a file where redirects are separated into rows and contains the columns representing <code>from</code> and <code>to</code> (they can be named anything).</p>
<p>After submitting you will be taken to a confirmation view where you can customize your redirects before import.</p>
{% endblocktrans %}
</div>
</div>
{% if form.non_field_errors %}
<div class="messages">
<ul>
{% for error in form.non_field_errors %}
<li class="error">{{ error }}</li>
{% endfor %}
</ul>
</div>
{% endif %}
<form action="" method="POST" class="nice-padding" novalidate enctype="multipart/form-data">
{% csrf_token %}
<ul class="fields">
{% for field in form.visible_fields %}
{% include "wagtailadmin/shared/field_as_li.html" %}
{% endfor %}
<li>
<input type="submit" value="{% trans 'Import' %}" class="button" />
</li>
</ul>
</form>
{% endblock %}

Wyświetl plik

@ -0,0 +1,58 @@
{% extends "wagtailadmin/base.html" %}
{% load i18n %}
{% block titletag %}{% trans "Confirm import" %}{% endblock %}
{% block content %}
{% trans "Import redirects" as header_title %}
{% trans "Confirm import" as header_subtitle %}
{% include "wagtailadmin/shared/header.html" with title=header_title subtitle=header_subtitle icon="redirect" %}
{% if form.non_field_errors %}
<div class="messages">
<ul>
{% for error in form.non_field_errors %}
<li class="error">{{ error }}</li>
{% endfor %}
</ul>
</div>
{% endif %}
<form action="{% url 'wagtailredirects:process_import' %}" method="POST" class="nice-padding" novalidate enctype="multipart/form-data">
{% csrf_token %}
{% for field in form.hidden_fields %}{{ field }}{% endfor %}
<ul class="fields">
{% for field in form.visible_fields %}
{% include "wagtailadmin/shared/field_as_li.html" %}
{% endfor %}
<li>
<input type="submit" value="{% trans 'Confirm' %}" class="button" />
<a href="{% url 'wagtailredirects:index' %}" class="button button-secondary">
{% trans 'Cancel' %}
</a>
</li>
</ul>
<h2>{% trans "Preview" %}</h2>
<table class="listing listing-with-x-scroll">
<thead>
<tr>
{% for column in dataset.headers %}
<td>{{ column }}</td>
{% endfor %}
</tr>
</thead>
<tbody>
{% for row in dataset %}
<tr>
{% for column in row %}
<td>{{ column }}</td>
{% endfor %}
</tr>
{% endfor %}
</tbody>
</table>
</form>
{% endblock %}

Wyświetl plik

@ -0,0 +1,34 @@
{% extends "wagtailadmin/base.html" %}
{% load i18n %}
{% block titletag %}{% trans "Summary" %}{% endblock %}
{% block content %}
{% trans "Import redirects" as header_title %}
{% trans "Summary" as header_subtitle %}
{% include "wagtailadmin/shared/header.html" with title=header_title subtitle=header_subtitle icon="redirect" %}
<section id="summary" class="nice-padding">
<p class="help-block help-warning">
{% blocktrans with total=import_summary.total successes=import_summary.successes errors=import_summary.errors_count %}Found {{ total }} redirects, created {{ successes }} and found {{ errors }} errors.{% endblocktrans %}
</p>
<table class="listing">
<thead>
<tr>
<th>{% trans "From" %}</th>
<th>{% trans "To" %}</th>
<th>{% trans "Error" %}</th>
</tr>
</thead>
<tbody>
{% for error in import_summary.errors %}
<tr>
{% for value in error %}
<td>{{ value }}</td>
{% endfor %}
</tr>
{% endfor %}
</tbody>
</table>
<a href="{% url 'wagtailredirects:index' %}" class="button">{% trans "Continue" %}</a>
</section>
{% endblock %}

Wyświetl plik

@ -1,5 +1,7 @@
{% extends "wagtailadmin/base.html" %}
{% load i18n %}
{% load static %}
{% block titletag %}{% trans "Redirects" %}{% endblock %}
{% block extra_js %}
@ -14,11 +16,41 @@
{% endblock %}
{% block content %}
<link rel="stylesheet" type="text/css" href="{% static 'wagtailredirects/css/index.css' %}">
{% trans "Redirects" as redirects_str %}
{% trans "Add redirect" as add_str %}
{% if user_can_add %}
{% url "wagtailredirects:add" as add_link %}
{% include "wagtailadmin/shared/header.html" with title=redirects_str icon="redirect" action_url=add_link action_text=add_str search_url="wagtailredirects:index" %}
{% trans "Add redirect" as add_str %}
{% url "wagtailredirects:start_import" as import_link %}
{% trans "Import redirects" as import_str %}
<header class="hasform">
{% block breadcrumb %}{% endblock %}
<div class="row nice-padding">
<div class="left">
<div class="col header-title">
<h1 class="icon icon-redirect">{{ redirects_str }}</h1>
</div>
<form class="col search-form" action="{% url "wagtailredirects:index" %}{% if query_parameters %}?{{ query_parameters }}{% endif %}" method="get" novalidate role="search">
<ul class="fields">
{% for field in search_form %}
{% include "wagtailadmin/shared/field_as_li.html" with field=field field_classes="field-small iconfield" input_classes="icon-search" %}
{% endfor %}
<li class="submit visuallyhidden"><input type="submit" value="Search" class="button" /></li>
</ul>
</form>
</div>
<div class="right has-multiple-actions">
<div class="actionbutton">
<a href="{{ add_link }}" class="button bicolor icon icon-plus">{{ add_str }}</a>
</div>
<div class="actionbutton">
<a href="{{ import_link }}" class="button bicolor icon icon-doc-full-inverse">{{ import_str }}</a>
</div>
</div>
</div>
</header>
{% else %}
{% include "wagtailadmin/shared/header.html" with title=redirects_str icon="redirect" search_url="wagtailredirects:index" %}
{% endif %}
@ -28,5 +60,5 @@
{% include "wagtailredirects/results.html" %}
</div>
</div>
{% endblock %}

Wyświetl plik

@ -0,0 +1,4 @@
from,to
/hello,http://hello.com/random/
/goodbye,http://hello.com/goodbye/
/goodbye,/cake/
1 from to
2 /hello http://hello.com/random/
3 /goodbye http://hello.com/goodbye/
4 /goodbye /cake/

Wyświetl plik

@ -0,0 +1,14 @@
[
{
"from": "/hello-from-json",
"to": "http://hello.com/random/"
},
{
"from": "/goodbye-from-json",
"to": "http://hello.com/goodbye/"
},
{
"from": "/goodbye-from-json",
"to": "/goodbye-not-working/"
}
]

Plik binarny nie jest wyświetlany.

Wyświetl plik

@ -0,0 +1,4 @@
from to
/hello http://hello.com/random/
/goodbye http://hello.com/goodbye/
/goodbye /cake/
1 from to
2 /hello http://hello.com/random/
3 /goodbye http://hello.com/goodbye/
4 /goodbye /cake/

Plik binarny nie jest wyświetlany.

Plik binarny nie jest wyświetlany.

Wyświetl plik

@ -0,0 +1,5 @@
---
items:
- one
- two
- three

Plik binarny nie jest wyświetlany.
Nie można renderować tego pliku, ponieważ zawiera nieoczekiwany znak w wierszu 2 i kolumnie 108.

Wyświetl plik

@ -0,0 +1,415 @@
import os
from django.core.files.uploadedfile import SimpleUploadedFile
from django.test import TestCase, override_settings
from django.urls import reverse
from wagtail.contrib.redirects.models import Redirect
from wagtail.core.models import Site
from wagtail.tests.utils import WagtailTestUtils
TEST_ROOT = os.path.abspath(os.path.dirname(__file__))
@override_settings(
ALLOWED_HOSTS=["testserver", "localhost", "test.example.com", "other.example.com"]
)
class TestImportAdminViews(TestCase, WagtailTestUtils):
def setUp(self):
self.login()
def get(self, params={}):
return self.client.get(reverse("wagtailredirects:start_import"), params)
def post(self, post_data={}, follow=False):
return self.client.post(
reverse("wagtailredirects:start_import"), post_data, follow=follow
)
def post_import(self, post_data={}, follow=False):
return self.client.post(
reverse("wagtailredirects:process_import"), post_data, follow=follow
)
def test_request_start_with_get_returns_initial_form(self):
response = self.get()
self.assertEqual(
response.templates[0].name, "wagtailredirects/choose_import_file.html",
)
def test_empty_import_file_returns_error(self):
response = self.post({
"import_file": "",
"input_format": "0",
})
self.assertTrue("import_file" in response.context["form"].errors)
def test_non_valid_format_returns_error(self):
f = "{}/files/example.yaml".format(TEST_ROOT)
(_, filename) = os.path.split(f)
with open(f, "rb") as infile:
upload_file = SimpleUploadedFile(filename, infile.read())
response = self.post(
{
"import_file": upload_file,
},
follow=True,
)
self.assertContains(
response, 'File format of type &quot;yaml&quot; is not supported'
)
def test_valid_csv_triggers_confirm_view(self):
f = "{}/files/example.csv".format(TEST_ROOT)
(_, filename) = os.path.split(f)
with open(f, "rb") as infile:
upload_file = SimpleUploadedFile(filename, infile.read())
response = self.post(
{
"import_file": upload_file,
}
)
self.assertEqual(
response.templates[0].name,
"wagtailredirects/confirm_import.html",
)
self.assertEqual(len(response.context["dataset"]), 3)
def test_import_step(self):
f = "{}/files/example.csv".format(TEST_ROOT)
(_, filename) = os.path.split(f)
with open(f, "rb") as infile:
upload_file = SimpleUploadedFile(filename, infile.read())
self.assertEqual(Redirect.objects.all().count(), 0)
response = self.post(
{
"import_file": upload_file,
}
)
import_response = self.post_import(
{
**response.context["form"].initial,
"from_index": 0,
"to_index": 1,
"permanent": True,
}
)
self.assertEqual(
import_response.templates[0].name,
"wagtailredirects/import_summary.html",
)
self.assertEqual(Redirect.objects.all().count(), 2)
def test_permanent_setting(self):
f = "{}/files/example.csv".format(TEST_ROOT)
(_, filename) = os.path.split(f)
with open(f, "rb") as infile:
upload_file = SimpleUploadedFile(filename, infile.read())
self.assertEqual(Redirect.objects.all().count(), 0)
response = self.post(
{
"import_file": upload_file,
}
)
import_response = self.post_import(
{
**response.context["form"].initial,
"from_index": 0,
"to_index": 1,
"permanent": False,
}
)
self.assertEqual(
import_response.templates[0].name,
"wagtailredirects/import_summary.html",
)
self.assertFalse(Redirect.objects.first().is_permanent)
def test_site_setting(self):
f = "{}/files/example.csv".format(TEST_ROOT)
(_, filename) = os.path.split(f)
default_site = Site.objects.first()
new_site = Site.objects.create(
hostname="hello.dev", root_page=default_site.root_page,
)
with open(f, "rb") as infile:
upload_file = SimpleUploadedFile(filename, infile.read())
self.assertEqual(Redirect.objects.all().count(), 0)
response = self.post(
{
"import_file": upload_file,
}
)
import_response = self.post_import(
{
**response.context["form"].initial,
"from_index": 0,
"to_index": 1,
"permanent": False,
"site": new_site.pk,
}
)
self.assertEqual(
import_response.templates[0].name,
"wagtailredirects/import_summary.html",
)
self.assertEqual(Redirect.objects.count(), 2)
self.assertEqual(Redirect.objects.first().site, new_site)
def test_import_xls(self):
f = "{}/files/example.xls".format(TEST_ROOT)
(_, filename) = os.path.split(f)
with open(f, "rb") as infile:
upload_file = SimpleUploadedFile(filename, infile.read())
self.assertEqual(Redirect.objects.all().count(), 0)
response = self.post(
{
"import_file": upload_file,
}
)
self.assertEqual(
response.templates[0].name,
"wagtailredirects/confirm_import.html",
)
import_response = self.post_import(
{
**response.context["form"].initial,
"from_index": 0,
"to_index": 1,
"permanent": True,
},
follow=True,
)
self.assertEqual(
import_response.templates[0].name,
"wagtailredirects/index.html",
)
self.assertEqual(Redirect.objects.all().count(), 3)
def test_import_xlsx(self):
f = "{}/files/example.xlsx".format(TEST_ROOT)
(_, filename) = os.path.split(f)
with open(f, "rb") as infile:
upload_file = SimpleUploadedFile(filename, infile.read())
self.assertEqual(Redirect.objects.all().count(), 0)
response = self.post(
{
"import_file": upload_file,
}
)
self.assertEqual(
response.templates[0].name,
"wagtailredirects/confirm_import.html",
)
import_response = self.post_import(
{
**response.context["form"].initial,
"from_index": 0,
"to_index": 1,
"permanent": True,
},
follow=True,
)
self.assertEqual(
import_response.templates[0].name,
"wagtailredirects/index.html",
)
self.assertEqual(Redirect.objects.all().count(), 3)
def test_unicode_error_when_importing(self):
f = "{}/files/example_faulty.csv".format(TEST_ROOT)
(_, filename) = os.path.split(f)
with open(f, "rb") as infile:
upload_file = SimpleUploadedFile(filename, infile.read())
self.assertEqual(Redirect.objects.all().count(), 0)
response = self.post(
{
"import_file": upload_file,
},
follow=True,
)
self.assertTrue(
b"Imported file has a wrong encoding:" in response.content
)
def test_not_valid_method_for_import_file(self):
response = self.client.get(reverse("wagtailredirects:process_import"))
self.assertEqual(response.status_code, 405)
def test_error_in_data_renders_confirm_view_on_import(self):
f = "{}/files/example.csv".format(TEST_ROOT)
(_, filename) = os.path.split(f)
with open(f, "rb") as infile:
upload_file = SimpleUploadedFile(filename, infile.read())
response = self.post(
{
"import_file": upload_file,
}
)
self.post_import(
{
**response.context["form"].initial,
"from_index": 0,
"to_index": 1,
"permanent": True,
"site": 99,
}
)
self.assertEqual(
response.templates[0].name,
"wagtailredirects/confirm_import.html",
)
def test_import_tsv(self):
f = "{}/files/example.tsv".format(TEST_ROOT)
(_, filename) = os.path.split(f)
with open(f, "rb") as infile:
upload_file = SimpleUploadedFile(filename, infile.read())
self.assertEqual(Redirect.objects.all().count(), 0)
response = self.post(
{
"import_file": upload_file,
}
)
self.assertEqual(
response.templates[0].name,
"wagtailredirects/confirm_import.html",
)
import_response = self.post_import(
{
**response.context["form"].initial,
"from_index": 0,
"to_index": 1,
"permanent": True,
}
)
self.assertEqual(
import_response.templates[0].name,
"wagtailredirects/import_summary.html",
)
self.assertEqual(Redirect.objects.all().count(), 2)
@override_settings(WAGTAIL_REDIRECTS_FILE_STORAGE='cache')
def test_import_xlsx_with_cache_store_engine(self):
f = "{}/files/example.xlsx".format(TEST_ROOT)
(_, filename) = os.path.split(f)
with open(f, "rb") as infile:
upload_file = SimpleUploadedFile(filename, infile.read())
self.assertEqual(Redirect.objects.all().count(), 0)
response = self.post(
{
"import_file": upload_file,
}
)
self.assertEqual(
response.templates[0].name,
"wagtailredirects/confirm_import.html",
)
import_response = self.post_import(
{
**response.context["form"].initial,
"from_index": 0,
"to_index": 1,
"permanent": True,
},
follow=True,
)
self.assertEqual(
import_response.templates[0].name,
"wagtailredirects/index.html",
)
self.assertEqual(Redirect.objects.all().count(), 3)
@override_settings(WAGTAIL_REDIRECTS_FILE_STORAGE='cache')
def test_process_validation_works_when_using_plaintext_files_and_cache(self):
f = "{}/files/example.csv".format(TEST_ROOT)
(_, filename) = os.path.split(f)
with open(f, "rb") as infile:
upload_file = SimpleUploadedFile(filename, infile.read())
self.assertEqual(Redirect.objects.all().count(), 0)
response = self.post(
{
"import_file": upload_file,
}
)
self.assertEqual(
response.templates[0].name,
"wagtailredirects/confirm_import.html",
)
import_response = self.post_import(
{
**response.context["form"].initial,
"permanent": True,
}
)
self.assertEqual(
import_response.templates[0].name,
"wagtailredirects/confirm_import.html",
)

Wyświetl plik

@ -0,0 +1,335 @@
import os
import tempfile
from io import StringIO
from unittest.mock import patch
from django.core.management import call_command
from django.core.management.base import CommandError
from django.test import TestCase
from wagtail.contrib.redirects.models import Redirect
from wagtail.core.models import Site
TEST_ROOT = os.path.abspath(os.path.dirname(__file__))
class TestImportCommand(TestCase):
def test_empty_command_raises_errors(self):
with self.assertRaises(CommandError):
out = StringIO()
call_command("import_redirects", stdout=out)
def test_missing_file_raises_error(self):
with self.assertRaisesMessage(Exception, "Missing file 'random'"):
out = StringIO()
call_command("import_redirects", src="random", stdout=out)
def test_invalid_extension_raises_error(self):
f = "{}/files/example.numbers".format(TEST_ROOT)
with self.assertRaisesMessage(Exception, "Invalid format 'numbers'"):
out = StringIO()
call_command("import_redirects", src=f, stdout=out)
def test_empty_file_raises_error(self):
empty_file = tempfile.NamedTemporaryFile()
with self.assertRaisesMessage(
Exception, "File '{}' is empty".format(empty_file.name)
):
out = StringIO()
call_command("import_redirects", src=empty_file.name, stdout=out)
def test_header_are_not_imported(self):
invalid_file = tempfile.NamedTemporaryFile(mode="w+", encoding="utf-8")
invalid_file.write("from,to")
invalid_file.seek(0)
out = StringIO()
call_command(
"import_redirects", src=invalid_file.name, stdout=out, format="csv"
)
self.assertEqual(Redirect.objects.count(), 0)
def test_format_gets_picked_up_from_file_extension(self):
f = "{}/files/example.csv".format(TEST_ROOT)
out = StringIO()
call_command("import_redirects", src=f, stdout=out)
self.assertEqual(Redirect.objects.count(), 2)
def test_binary_formats_are_supported(self):
f = "{}/files/example.xls".format(TEST_ROOT)
out = StringIO()
call_command("import_redirects", src=f, stdout=out)
self.assertEqual(Redirect.objects.count(), 3)
def test_redirect_gets_imported(self):
invalid_file = tempfile.NamedTemporaryFile(mode="w+", encoding="utf-8")
invalid_file.write("from,to\n")
invalid_file.write("/alpha,http://omega.test/")
invalid_file.seek(0)
out = StringIO()
call_command(
"import_redirects", src=invalid_file.name, stdout=out, format="csv"
)
self.assertEqual(Redirect.objects.count(), 1)
redirect = Redirect.objects.first()
self.assertEqual(redirect.old_path, "/alpha")
self.assertEqual(redirect.redirect_link, "http://omega.test/")
self.assertEqual(redirect.is_permanent, True)
def test_trailing_slash_gets_stripped(self):
invalid_file = tempfile.NamedTemporaryFile(mode="w+", encoding="utf-8")
invalid_file.write("from,to\n")
invalid_file.write("/alpha/,http://omega.test/")
invalid_file.seek(0)
out = StringIO()
call_command(
"import_redirects", src=invalid_file.name, stdout=out, format="csv"
)
redirect = Redirect.objects.first()
self.assertEqual(redirect.old_path, "/alpha")
self.assertEqual(redirect.redirect_link, "http://omega.test/")
def test_site_id_does_not_exist(self):
with self.assertRaisesMessage(Exception, "Site matching query does not exist"):
out = StringIO()
call_command("import_redirects", src="random", site=5, stdout=out)
def test_redirect_gets_added_to_site(self):
invalid_file = tempfile.NamedTemporaryFile(mode="w+", encoding="utf-8")
invalid_file.write("from,to\n")
invalid_file.write("/alpha/,http://omega.test/")
invalid_file.seek(0)
current_site = Site.objects.first()
site = Site.objects.create(
hostname="random.test", root_page=current_site.root_page
)
out = StringIO()
call_command(
"import_redirects",
src=invalid_file.name,
site=site.pk,
stdout=out,
format="csv",
)
redirect = Redirect.objects.first()
self.assertEqual(redirect.old_path, "/alpha")
self.assertEqual(redirect.redirect_link, "http://omega.test/")
self.assertEqual(redirect.site, site)
def test_temporary_redirect(self):
invalid_file = tempfile.NamedTemporaryFile(mode="w+", encoding="utf-8")
invalid_file.write("from,to\n")
invalid_file.write("/alpha/,http://omega.test/")
invalid_file.seek(0)
out = StringIO()
call_command(
"import_redirects",
src=invalid_file.name,
permanent=False,
stdout=out,
format="csv",
)
redirect = Redirect.objects.first()
self.assertEqual(redirect.old_path, "/alpha")
self.assertEqual(redirect.redirect_link, "http://omega.test/")
self.assertEqual(redirect.is_permanent, False)
def test_duplicate_from_links_get_skipped(self):
invalid_file = tempfile.NamedTemporaryFile(mode="w+", encoding="utf-8")
invalid_file.write("from,to\n")
invalid_file.write("/alpha/,http://omega.test/\n")
invalid_file.write("/alpha/,http://omega2.test/\n")
invalid_file.seek(0)
out = StringIO()
call_command(
"import_redirects",
src=invalid_file.name,
permanent=False,
format="csv",
stdout=out,
)
self.assertEqual(Redirect.objects.count(), 1)
def test_non_absolute_to_links_get_skipped(self):
invalid_file = tempfile.NamedTemporaryFile(mode="w+", encoding="utf-8")
invalid_file.write("from,to\n")
invalid_file.write("/alpha/,/omega.test/\n")
invalid_file.seek(0)
out = StringIO()
call_command(
"import_redirects",
src=invalid_file.name,
permanent=False,
stdout=out,
format="csv",
)
self.assertEqual(Redirect.objects.count(), 0)
self.assertIn("Errors: 1", out.getvalue())
def test_from_links_are_converted_to_relative(self):
invalid_file = tempfile.NamedTemporaryFile(mode="w+", encoding="utf-8")
invalid_file.write("from,to\n")
invalid_file.write("http://alpha.test/alpha/,http://omega.test/\n")
invalid_file.seek(0)
out = StringIO()
call_command(
"import_redirects", src=invalid_file.name, format="csv", stdout=out
)
self.assertEqual(Redirect.objects.count(), 1)
redirect = Redirect.objects.first()
self.assertEqual(redirect.old_path, "/alpha")
self.assertEqual(redirect.redirect_link, "http://omega.test/")
def test_column_index_are_used(self):
invalid_file = tempfile.NamedTemporaryFile(mode="w+", encoding="utf-8")
invalid_file.write("priority,from,year,to\n")
invalid_file.write("5,/alpha,2020,http://omega.test/")
invalid_file.seek(0)
out = StringIO()
call_command(
"import_redirects",
"--src={}".format(invalid_file.name),
"--from=1",
"--to=3",
"--format=csv",
stdout=out,
)
self.assertEqual(Redirect.objects.count(), 1)
redirect = Redirect.objects.first()
self.assertEqual(redirect.old_path, "/alpha")
self.assertEqual(redirect.redirect_link, "http://omega.test/")
self.assertEqual(redirect.is_permanent, True)
def test_nothing_gets_saved_on_dry_run(self):
invalid_file = tempfile.NamedTemporaryFile(mode="w+", encoding="utf-8")
invalid_file.write("from,to\n")
invalid_file.write("/alpha,http://omega.test/")
invalid_file.seek(0)
out = StringIO()
call_command(
"import_redirects",
src=invalid_file.name,
format="csv",
dry_run=True,
stdout=out,
)
self.assertEqual(Redirect.objects.count(), 0)
@patch(
"wagtail.contrib.redirects.management.commands.import_redirects.get_input",
return_value="Y",
)
def test_successfull_ask_imports_redirect(self, get_input):
invalid_file = tempfile.NamedTemporaryFile(mode="w+", encoding="utf-8")
invalid_file.write("from,to\n")
invalid_file.write("/alpha,http://omega.test/")
invalid_file.seek(0)
out = StringIO()
call_command(
"import_redirects",
src=invalid_file.name,
format="csv",
ask=True,
stdout=out,
)
self.assertEqual(Redirect.objects.count(), 1)
@patch(
"wagtail.contrib.redirects.management.commands.import_redirects.get_input",
return_value="N",
)
def test_native_ask_imports_redirect(self, get_input):
invalid_file = tempfile.NamedTemporaryFile(mode="w+", encoding="utf-8")
invalid_file.write("from,to\n")
invalid_file.write("/alpha,http://omega.test/")
invalid_file.seek(0)
out = StringIO()
call_command(
"import_redirects",
src=invalid_file.name,
format="csv",
ask=True,
stdout=out,
)
self.assertEqual(Redirect.objects.count(), 0)
def test_offset_parameter(self):
invalid_file = tempfile.NamedTemporaryFile(mode="w+", encoding="utf-8")
invalid_file.write("from,to\n")
invalid_file.write("/one,http://one.test/\n")
invalid_file.write("/two,http://two.test/\n")
invalid_file.write("/three,http://three.test/\n")
invalid_file.write("/four,http://four.test/")
invalid_file.seek(0)
out = StringIO()
call_command(
"import_redirects",
src=invalid_file.name,
format="csv",
offset=2,
stdout=out,
)
redirects = Redirect.objects.all()
self.assertEqual(len(redirects), 2)
self.assertEqual(redirects[0].old_path, "/three")
self.assertEqual(redirects[0].redirect_link, "http://three.test/")
self.assertEqual(redirects[0].is_permanent, True)
self.assertEqual(redirects[1].old_path, "/four")
self.assertEqual(redirects[1].redirect_link, "http://four.test/")
def test_limit_parameter(self):
invalid_file = tempfile.NamedTemporaryFile(mode="w+", encoding="utf-8")
invalid_file.write("from,to\n")
invalid_file.write("/one,http://one.test/\n")
invalid_file.write("/two,http://two.test/\n")
invalid_file.write("/three,http://three.test/\n")
invalid_file.write("/four,http://four.test/")
invalid_file.seek(0)
out = StringIO()
call_command(
"import_redirects",
src=invalid_file.name,
format="csv",
limit=1,
stdout=out,
)
redirects = Redirect.objects.all()
self.assertEqual(len(redirects), 1)
self.assertEqual(redirects[0].old_path, "/one")
self.assertEqual(redirects[0].redirect_link, "http://one.test/")
self.assertEqual(redirects[0].is_permanent, True)

Wyświetl plik

@ -0,0 +1,23 @@
from django.test import TestCase
from wagtail.contrib.redirects.forms import ConfirmImportForm, ImportForm
class TestImportForm(TestCase):
def test_file_input_generates_proper_accept(self):
form = ImportForm(["csv", "tsv"])
self.assertIn('accept=".csv,.tsv"', form.as_table())
class TestConfirmImportForm(TestCase):
def test_choices_get_appended_with_intro_label_if_multiple(self):
form = ConfirmImportForm(headers=[(0, "From"), (1, "To")])
first_choice = form.fields["from_index"].choices[0]
self.assertEqual(first_choice[0], "")
self.assertEqual(first_choice[1], "---")
def test_choices_does_not_get_generated_label_if_single_choice(self):
form = ConfirmImportForm(headers=[(1, "Hi")])
first_choice = form.fields["from_index"].choices[0]
self.assertNotEqual(first_choice[0], "")
self.assertNotEqual(first_choice[1], "---")

Wyświetl plik

@ -0,0 +1,59 @@
import hashlib
import os
from django.core.files.base import ContentFile
from django.test import TestCase, override_settings
from wagtail.contrib.redirects.utils import (
get_file_storage, get_import_formats, write_to_file_storage
)
TEST_ROOT = os.path.abspath(os.path.dirname(__file__))
class TestImportUtils(TestCase):
def test_writing_file_with_format(self):
f = "{}/files/example.csv".format(TEST_ROOT)
(_, filename) = os.path.split(f)
with open(f, "rb") as infile:
content_orig = infile.read()
upload_file = ContentFile(content_orig)
import_formats = get_import_formats()
import_formats = [
x for x in import_formats if x.__name__ == "CSV"
]
input_format = import_formats[0]()
file_storage = write_to_file_storage(upload_file, input_format)
self.assertEqual(type(file_storage).__name__, "TempFolderStorage")
file_orig_checksum = hashlib.md5()
file_orig_checksum.update(content_orig)
file_orig_checksum = file_orig_checksum.hexdigest()
file_new_checksum = hashlib.md5()
with open(file_storage.get_full_path(), "rb") as file_new:
file_new_checksum.update(file_new.read())
file_new_checksum = file_new_checksum.hexdigest()
self.assertEqual(file_orig_checksum, file_new_checksum)
@override_settings(WAGTAIL_REDIRECTS_FILE_STORAGE='cache')
def test_that_cache_storage_are_returned(self):
FileStorage = get_file_storage()
self.assertEqual(FileStorage.__name__, "RedirectsCacheStorage")
def test_that_temp_folder_storage_are_returned_as_default(self):
FileStorage = get_file_storage()
self.assertEqual(FileStorage.__name__, "TempFolderStorage")
@override_settings(WAGTAIL_REDIRECTS_FILE_STORAGE='INVALID')
def test_invalid_file_storage_raises_errors(self):
with self.assertRaisesMessage(
Exception,
"Invalid file storage, must be either 'tmp_file' or 'cache'"
):
get_file_storage()

Wyświetl plik

@ -0,0 +1,117 @@
"""
Copyright (c) Bojan Mihelac and individual contributors.
All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
# Copied from: https://raw.githubusercontent.com/django-import-export/django-import-export/5795e114210adf250ac6e146db2fa413f38875de/import_export/tmp_storages.py
import os
import tempfile
from uuid import uuid4
from django.core.cache import cache
from django.core.files.base import ContentFile
from django.core.files.storage import default_storage
class BaseStorage:
def __init__(self, name=None):
self.name = name
def save(self, data, mode='w'):
raise NotImplementedError
def read(self, read_mode='r'):
raise NotImplementedError
def remove(self):
raise NotImplementedError
class TempFolderStorage(BaseStorage):
def open(self, mode='r'):
if self.name:
return open(self.get_full_path(), mode)
else:
tmp_file = tempfile.NamedTemporaryFile(delete=False)
self.name = tmp_file.name
return tmp_file
def save(self, data, mode='w'):
with self.open(mode=mode) as file:
file.write(data)
def read(self, mode='r'):
with self.open(mode=mode) as file:
return file.read()
def remove(self):
os.remove(self.get_full_path())
def get_full_path(self):
return os.path.join(
tempfile.gettempdir(),
self.name
)
class CacheStorage(BaseStorage):
"""
By default memcache maximum size per key is 1MB, be careful with large files.
"""
CACHE_LIFETIME = 86400
CACHE_PREFIX = 'django-import-export-'
def save(self, data, mode=None):
if not self.name:
self.name = uuid4().hex
cache.set(self.CACHE_PREFIX + self.name, data, self.CACHE_LIFETIME)
def read(self, read_mode='r'):
return cache.get(self.CACHE_PREFIX + self.name)
def remove(self):
cache.delete(self.name)
class MediaStorage(BaseStorage):
MEDIA_FOLDER = 'django-import-export'
def save(self, data, mode=None):
if not self.name:
self.name = uuid4().hex
default_storage.save(self.get_full_path(), ContentFile(data))
def read(self, read_mode='rb'):
with default_storage.open(self.get_full_path(), mode=read_mode) as f:
return f.read()
def remove(self):
default_storage.delete(self.get_full_path())
def get_full_path(self):
return os.path.join(
self.MEDIA_FOLDER,
self.name
)

Wyświetl plik

@ -8,4 +8,6 @@ urlpatterns = [
url(r'^add/$', views.add, name='add'),
url(r'^(\d+)/$', views.edit, name='edit'),
url(r'^(\d+)/delete/$', views.delete, name='delete'),
url(r"^import/$", views.start_import, name="start_import"),
url(r"^import/process/$", views.process_import, name="process_import"),
]

Wyświetl plik

@ -0,0 +1,54 @@
from django.conf import settings
from wagtail.contrib.redirects.base_formats import DEFAULT_FORMATS
from wagtail.contrib.redirects.tmp_storages import CacheStorage, TempFolderStorage
def write_to_file_storage(import_file, input_format):
FileStorage = get_file_storage()
file_storage = FileStorage()
data = bytes()
for chunk in import_file.chunks():
data += chunk
file_storage.save(data, input_format.get_read_mode())
return file_storage
def get_supported_extensions():
return ("csv", "tsv", "xls", "xlsx")
def get_format_cls_by_extension(extension):
formats = get_import_formats()
available_formats = [x for x in formats if x.__name__ == extension.upper()]
if not available_formats:
return None
return available_formats[0]
def get_import_formats():
formats = [f for f in DEFAULT_FORMATS if f().can_import()]
return formats
def get_file_storage():
file_storage = getattr(
settings, 'WAGTAIL_REDIRECTS_FILE_STORAGE', 'tmp_file'
)
if file_storage == 'tmp_file':
return TempFolderStorage
if file_storage == 'cache':
return RedirectsCacheStorage
raise Exception(
"Invalid file storage, must be either 'tmp_file' or 'cache'"
)
class RedirectsCacheStorage(CacheStorage):
CACHE_PREFIX = 'wagtail-redirects-'

Wyświetl plik

@ -1,17 +1,26 @@
import os
from django.core.paginator import Paginator
from django.db.models import Q
from django.shortcuts import get_object_or_404, redirect
from django.shortcuts import get_object_or_404, redirect, render
from django.template.response import TemplateResponse
from django.urls import reverse
from django.utils.encoding import force_str
from django.utils.translation import gettext as _
from django.utils.translation import ngettext
from django.views.decorators.http import require_http_methods
from django.views.decorators.vary import vary_on_headers
from wagtail.admin import messages
from wagtail.admin.auth import PermissionPolicyChecker, permission_denied
from wagtail.admin.forms.search import SearchForm
from wagtail.contrib.redirects import models
from wagtail.contrib.redirects.forms import RedirectForm
from wagtail.contrib.redirects.base_formats import DEFAULT_FORMATS
from wagtail.contrib.redirects.forms import ConfirmImportForm, ImportForm, RedirectForm
from wagtail.contrib.redirects.permissions import permission_policy
from wagtail.contrib.redirects.utils import (
get_file_storage, get_format_cls_by_extension, get_import_formats, get_supported_extensions,
write_to_file_storage)
permission_checker = PermissionPolicyChecker(permission_policy)
@ -126,3 +135,215 @@ def add(request):
return TemplateResponse(request, "wagtailredirects/add.html", {
'form': form,
})
@permission_checker.require_any("add")
def start_import(request):
supported_extensions = get_supported_extensions()
from_encoding = "utf-8"
query_string = request.GET.get('q', "")
if request.POST or request.FILES:
form_kwargs = {}
form = ImportForm(
supported_extensions,
request.POST or None,
request.FILES or None,
**form_kwargs
)
else:
form = ImportForm(supported_extensions)
if not request.FILES or not form.is_valid():
return render(
request,
"wagtailredirects/choose_import_file.html",
{
'search_form': SearchForm(
data=dict(q=query_string) if query_string else None, placeholder=_("Search redirects")
),
"form": form,
},
)
import_file = form.cleaned_data["import_file"]
_name, extension = os.path.splitext(import_file.name)
extension = extension.lstrip(".")
if extension not in supported_extensions:
messages.error(
request,
_('File format of type "{}" is not supported'.format(extension))
)
return redirect('wagtailredirects:start_import')
import_format_cls = get_format_cls_by_extension(extension)
input_format = import_format_cls()
file_storage = write_to_file_storage(import_file, input_format)
try:
data = file_storage.read(input_format.get_read_mode())
if not input_format.is_binary() and from_encoding:
data = force_str(data, from_encoding)
dataset = input_format.create_dataset(data)
except UnicodeDecodeError as e:
messages.error(
request,
_(u"Imported file has a wrong encoding: %s" % e)
)
return redirect('wagtailredirects:start_import')
except Exception as e: # pragma: no cover
messages.error(
request,
_(
u"%s encountered while trying to read file: %s"
% (type(e).__name__, import_file.name)
)
)
return redirect('wagtailredirects:start_import')
initial = {
"import_file_name": file_storage.name,
"original_file_name": import_file.name,
"input_format": get_import_formats().index(import_format_cls),
}
return render(
request,
"wagtailredirects/confirm_import.html",
{
"form": ConfirmImportForm(dataset.headers, initial=initial),
"dataset": dataset,
},
)
@permission_checker.require_any("add")
@require_http_methods(["POST"])
def process_import(request):
supported_extensions = get_supported_extensions()
from_encoding = "utf-8"
form_kwargs = {}
form = ConfirmImportForm(
DEFAULT_FORMATS, request.POST or None, request.FILES or None, **form_kwargs
)
is_confirm_form_valid = form.is_valid()
import_formats = get_import_formats()
input_format = import_formats[int(form.cleaned_data["input_format"])]()
FileStorage = get_file_storage()
file_storage = FileStorage(name=form.cleaned_data["import_file_name"])
if not is_confirm_form_valid:
data = file_storage.read(input_format.get_read_mode())
if not input_format.is_binary() and from_encoding:
data = force_str(data, from_encoding)
dataset = input_format.create_dataset(data)
initial = {
"import_file_name": file_storage.name,
"original_file_name": form.cleaned_data["import_file_name"],
}
return render(
request,
"wagtailredirects/confirm_import.html",
{
"form": ConfirmImportForm(
dataset.headers,
request.POST or None,
request.FILES or None,
initial=initial,
),
"dataset": dataset,
},
)
data = file_storage.read(input_format.get_read_mode())
if not input_format.is_binary() and from_encoding:
data = force_str(data, from_encoding)
dataset = input_format.create_dataset(data)
import_summary = create_redirects_from_dataset(
dataset,
{
"from_index": int(form.cleaned_data["from_index"]),
"to_index": int(form.cleaned_data["to_index"]),
"permanent": form.cleaned_data["permanent"],
"site": form.cleaned_data["site"],
},
)
file_storage.remove()
if import_summary["errors_count"] > 0:
return render(
request,
"wagtailredirects/import_summary.html",
{
"form": ImportForm(supported_extensions),
"import_summary": import_summary,
},
)
total = import_summary["total"]
messages.success(
request,
ngettext(
"Imported %(total)d redirect",
"Imported %(total)d redirects",
total
) % {'total': total}
)
return redirect('wagtailredirects:index')
def create_redirects_from_dataset(dataset, config):
errors = []
successes = 0
total = 0
for row in dataset:
total += 1
from_link = row[config["from_index"]]
to_link = row[config["to_index"]]
data = {
"old_path": from_link,
"redirect_link": to_link,
"is_permanent": config["permanent"],
}
if config["site"]:
data["site"] = config["site"].pk
form = RedirectForm(data)
if not form.is_valid():
error = to_readable_errors(form.errors.as_text())
errors.append([from_link, to_link, error])
continue
form.save()
successes += 1
return {
"errors": errors,
"errors_count": len(errors),
"successes": successes,
"total": total,
}
def to_readable_errors(error):
errors = error.split("\n")
errors = errors[1::2]
errors = [x.lstrip('* ') for x in errors]
errors = ", ".join(errors)
return errors