Add failing test for race condition in Filter.get_or_create

pull/844/merge
Matt Westcott 2014-12-02 17:40:32 +00:00 zatwierdzone przez Karl Hobley
rodzic f39a95ab4f
commit 8e7015b9c7
2 zmienionych plików z 52 dodań i 2 usunięć

Wyświetl plik

@ -1,5 +1,6 @@
from contextlib import contextmanager
import warnings
import threading
from django.contrib.auth import get_user_model
from django.utils import six
@ -39,3 +40,34 @@ class WagtailTestUtils(object):
for w in warning_list:
if not issubclass(w.category, DeprecationWarning):
warnings.showwarning(message=w.message, category=w.category, filename=w.filename, lineno=w.lineno, file=w.file, line=w.line)
# from http://www.caktusgroup.com/blog/2009/05/26/testing-django-views-for-concurrency-issues/
def test_concurrently(times):
"""
Add this decorator to small pieces of code that you want to test
concurrently to make sure they don't raise exceptions when run at the
same time. E.g., some Django views that do a SELECT and then a subsequent
INSERT might fail when the INSERT assumes that the data has not changed
since the SELECT.
"""
def test_concurrently_decorator(test_func):
def wrapper(*args, **kwargs):
exceptions = []
def call_test_func():
try:
test_func(*args, **kwargs)
except Exception, e:
exceptions.append(e)
raise
threads = []
for i in range(times):
threads.append(threading.Thread(target=call_test_func))
for t in threads:
t.start()
for t in threads:
t.join()
if exceptions:
raise Exception('test_concurrently intercepted %s exceptions: %s' % (len(exceptions), exceptions))
return wrapper
return test_concurrently_decorator

Wyświetl plik

@ -7,10 +7,10 @@ from django.contrib.auth.models import Group, Permission
from django.core.files.uploadedfile import SimpleUploadedFile
from django.db.utils import IntegrityError
from wagtail.tests.utils import WagtailTestUtils, unittest
from wagtail.tests.utils import WagtailTestUtils, unittest, test_concurrently
from wagtail.wagtailcore.models import Page
from wagtail.tests.models import EventPage, EventPageCarouselItem
from wagtail.wagtailimages.models import Rendition
from wagtail.wagtailimages.models import Rendition, Filter
from wagtail.wagtailimages.backends import get_image_backend
from wagtail.wagtailimages.backends.pillow import PillowBackend
from wagtail.wagtailimages.rect import Rect
@ -405,3 +405,21 @@ class TestIssue312(TestCase):
height=rend1.height,
focal_point_key=rend1.focal_point_key,
)
def test_duplicate_filters(self):
# get renditions concurrently, using various filters that are unlikely to exist already
@test_concurrently(10)
def get_renditions():
# Create an image
image = Image.objects.create(
title="Concurrency test image",
file=get_test_image_file(),
)
for width in range(10, 100, 10):
image.get_rendition('width-%d' % width)
get_renditions()
# if the above has completed with no race conditions, there should be precisely one
# of each of the above filters in the database
for width in range(10, 100, 10):
self.assertEqual(Filter.objects.filter(spec='width-%d' % width).count(), 1)