kopia lustrzana https://github.com/wagtail/wagtail
Merge branch 'issue-1397' of https://github.com/kaedroho/wagtail into kaedroho-issue-1397
commit
2163e29b68
|
@ -0,0 +1,72 @@
|
|||
# This file contains a file storage backend that imitates behaviours of
|
||||
# common external storage backends (S3 boto, libcloud, etc).
|
||||
|
||||
# The following behaviours have been added to this backend:
|
||||
# - Calling .path on the storage or image file raises NotImplementedError
|
||||
# - File.open() after the file has been closed raises an error
|
||||
|
||||
from django.core.files.storage import Storage, FileSystemStorage
|
||||
from django.core.files.base import File
|
||||
from django.utils.deconstruct import deconstructible
|
||||
|
||||
|
||||
@deconstructible
|
||||
class DummyExternalStorage(Storage):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.wrapped = FileSystemStorage(*args, **kwargs)
|
||||
|
||||
def path(self, name):
|
||||
# Overridden to give it the behaviour of the base Storage class
|
||||
# This is what an external storage backend would have
|
||||
raise NotImplementedError("This backend doesn't support absolute paths.")
|
||||
|
||||
def _open(self, name, mode='rb'):
|
||||
# Overridden to return a DummyExternalStorageFile instead of a normal
|
||||
# File object
|
||||
return DummyExternalStorageFile(open(self.wrapped.path(name), mode))
|
||||
|
||||
|
||||
# Wrap all other functions
|
||||
|
||||
def _save(self, name, content):
|
||||
return self.wrapped._save(name, content)
|
||||
|
||||
def delete(self, name):
|
||||
self.wrapped.delete(name)
|
||||
|
||||
def exists(self, name):
|
||||
return self.wrapped.exists(name)
|
||||
|
||||
def listdir(self, path):
|
||||
return self.wrapped.listdir(path)
|
||||
|
||||
def size(self, name):
|
||||
return self.wrapped.size(name)
|
||||
|
||||
def url(self, name):
|
||||
return self.wrapped.url(name)
|
||||
|
||||
def accessed_time(self, name):
|
||||
return self.wrapped.accessed_time(name)
|
||||
|
||||
def created_time(self, name):
|
||||
return self.wrapped.created_time(name)
|
||||
|
||||
def modified_time(self, name):
|
||||
return self.wrapped.modified_time(name)
|
||||
|
||||
|
||||
class DummyExternalStorageFile(File):
|
||||
def open(self, mode=None):
|
||||
# Based on: https://github.com/django/django/blob/2c39f282b8389f47fee4b24e785a58567c6c3629/django/core/files/base.py#L135-L141
|
||||
|
||||
# I've commented out two lines of this function which stops it checking
|
||||
# the filesystem for the file. Making it behave as if it is using an
|
||||
# external file storage.
|
||||
|
||||
if not self.closed:
|
||||
self.seek(0)
|
||||
# elif self.name and os.path.exists(self.name):
|
||||
# self.file = open(self.name, mode or self.mode)
|
||||
else:
|
||||
raise ValueError("The file cannot be reopened.")
|
|
@ -75,6 +75,17 @@ class AbstractImage(models.Model, TagSearchable):
|
|||
|
||||
file_size = models.PositiveIntegerField(null=True, editable=False)
|
||||
|
||||
def is_stored_locally(self):
|
||||
"""
|
||||
Returns True if the image is hosted on the local filesystem
|
||||
"""
|
||||
try:
|
||||
self.file.path
|
||||
|
||||
return True
|
||||
except NotImplementedError:
|
||||
return False
|
||||
|
||||
def get_file_size(self):
|
||||
if self.file_size is None:
|
||||
try:
|
||||
|
@ -107,8 +118,18 @@ class AbstractImage(models.Model, TagSearchable):
|
|||
# Open file if it is closed
|
||||
close_file = False
|
||||
try:
|
||||
image_file = self.file
|
||||
|
||||
if self.file.closed:
|
||||
self.file.open('rb')
|
||||
# Reopen the file
|
||||
if self.is_stored_locally():
|
||||
self.file.open('rb')
|
||||
else:
|
||||
# Some external storage backends don't allow reopening
|
||||
# the file. Get a fresh file instance. #1397
|
||||
storage = self._meta.get_field('file').storage
|
||||
image_file = storage.open(self.file.name, 'rb')
|
||||
|
||||
close_file = True
|
||||
except IOError as e:
|
||||
# re-throw this as a SourceImageIOError so that calling code can distinguish
|
||||
|
@ -116,13 +137,13 @@ class AbstractImage(models.Model, TagSearchable):
|
|||
raise SourceImageIOError(text_type(e))
|
||||
|
||||
# Seek to beginning
|
||||
self.file.seek(0)
|
||||
image_file.seek(0)
|
||||
|
||||
try:
|
||||
yield WillowImage.open(self.file)
|
||||
yield WillowImage.open(image_file)
|
||||
finally:
|
||||
if close_file:
|
||||
self.file.close()
|
||||
image_file.close()
|
||||
|
||||
def get_rect(self):
|
||||
return Rect(0, 0, self.width, self.height)
|
||||
|
|
|
@ -91,6 +91,19 @@ class TestImageAddView(TestCase, WagtailTestUtils):
|
|||
# Test that the file_size field was set
|
||||
self.assertTrue(image.file_size)
|
||||
|
||||
@override_settings(DEFAULT_FILE_STORAGE='wagtail.tests.dummy_external_storage.DummyExternalStorage')
|
||||
def test_add_with_external_file_storage(self):
|
||||
response = self.post({
|
||||
'title': "Test image",
|
||||
'file': SimpleUploadedFile('test.png', get_test_image_file().file.getvalue()),
|
||||
})
|
||||
|
||||
# Should redirect back to index
|
||||
self.assertRedirects(response, reverse('wagtailimages:index'))
|
||||
|
||||
# Check that the image was created
|
||||
self.assertTrue(Image.objects.filter(title="Test image").exists())
|
||||
|
||||
def test_add_no_file_selected(self):
|
||||
response = self.post({
|
||||
'title': "Test image",
|
||||
|
@ -144,6 +157,19 @@ class TestImageEditView(TestCase, WagtailTestUtils):
|
|||
self.assertEqual(response.status_code, 200)
|
||||
self.assertTemplateUsed(response, 'wagtailimages/images/edit.html')
|
||||
|
||||
@override_settings(DEFAULT_FILE_STORAGE='wagtail.tests.dummy_external_storage.DummyExternalStorage')
|
||||
def test_simple_with_external_storage(self):
|
||||
# The view calls get_file_size on the image that closes the file if
|
||||
# file_size wasn't prevously populated.
|
||||
|
||||
# The view then attempts to reopen the file when rendering the template
|
||||
# which caused crashes when certian storage backends were in use.
|
||||
# See #1397
|
||||
|
||||
response = self.get()
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertTemplateUsed(response, 'wagtailimages/images/edit.html')
|
||||
|
||||
def test_edit(self):
|
||||
response = self.post({
|
||||
'title': "Edited",
|
||||
|
@ -175,6 +201,26 @@ class TestImageEditView(TestCase, WagtailTestUtils):
|
|||
image = Image.objects.get(id=self.image.id)
|
||||
self.assertNotEqual(image.file_size, 100000)
|
||||
|
||||
@override_settings(DEFAULT_FILE_STORAGE='wagtail.tests.dummy_external_storage.DummyExternalStorage')
|
||||
def test_edit_with_new_image_file_and_external_storage(self):
|
||||
file_content = get_test_image_file().file.getvalue()
|
||||
|
||||
# Change the file size of the image
|
||||
self.image.file_size = 100000
|
||||
self.image.save()
|
||||
|
||||
response = self.post({
|
||||
'title': "Edited",
|
||||
'file': SimpleUploadedFile('new.png', file_content),
|
||||
})
|
||||
|
||||
# Should redirect back to index
|
||||
self.assertRedirects(response, reverse('wagtailimages:index'))
|
||||
|
||||
# Check that the image file size changed (assume it changed to the correct value)
|
||||
image = Image.objects.get(id=self.image.id)
|
||||
self.assertNotEqual(image.file_size, 100000)
|
||||
|
||||
def test_with_missing_image_file(self):
|
||||
self.image.file.delete(False)
|
||||
|
||||
|
@ -306,6 +352,19 @@ class TestImageChooserUploadView(TestCase, WagtailTestUtils):
|
|||
# The form should have an error
|
||||
self.assertFormError(response, 'uploadform', 'file', "This field is required.")
|
||||
|
||||
@override_settings(DEFAULT_FILE_STORAGE='wagtail.tests.dummy_external_storage.DummyExternalStorage')
|
||||
def test_upload_with_external_storage(self):
|
||||
response = self.client.post(reverse('wagtailimages:chooser_upload'), {
|
||||
'title': "Test image",
|
||||
'file': SimpleUploadedFile('test.png', get_test_image_file().file.getvalue()),
|
||||
})
|
||||
|
||||
# Check response
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
# Check that the image was created
|
||||
self.assertTrue(Image.objects.filter(title="Test image").exists())
|
||||
|
||||
|
||||
class TestMultipleImageUploader(TestCase, WagtailTestUtils):
|
||||
"""
|
||||
|
|
|
@ -79,6 +79,13 @@ class TestImage(TestCase):
|
|||
self.assertEqual(self.image.focal_point_width, None)
|
||||
self.assertEqual(self.image.focal_point_height, None)
|
||||
|
||||
def test_is_stored_locally(self):
|
||||
self.assertTrue(self.image.is_stored_locally())
|
||||
|
||||
@override_settings(DEFAULT_FILE_STORAGE='wagtail.tests.dummy_external_storage.DummyExternalStorage')
|
||||
def test_is_stored_locally_with_external_storage(self):
|
||||
self.assertFalse(self.image.is_stored_locally())
|
||||
|
||||
|
||||
class TestImagePermissions(TestCase):
|
||||
def setUp(self):
|
||||
|
|
|
@ -123,15 +123,9 @@ def edit(request, image_id):
|
|||
except NoReverseMatch:
|
||||
url_generator_enabled = False
|
||||
|
||||
try:
|
||||
local_path = image.file.path
|
||||
except NotImplementedError:
|
||||
# Image is hosted externally (eg, S3)
|
||||
local_path = None
|
||||
|
||||
if local_path:
|
||||
if image.is_stored_locally():
|
||||
# Give error if image file doesn't exist
|
||||
if not os.path.isfile(local_path):
|
||||
if not os.path.isfile(image.file.path):
|
||||
messages.error(request, _("The source image file could not be found. Please change the source or delete the image.").format(image.title), buttons=[
|
||||
messages.button(reverse('wagtailimages:delete', args=(image.id,)), _('Delete'))
|
||||
])
|
||||
|
|
Ładowanie…
Reference in New Issue