Merge branch 'es-index-atomic-rebuild' of https://github.com/kaedroho/wagtail into kaedroho-es-index-atomic-rebuild

pull/1573/head
Matt Westcott 2015-08-01 10:52:20 +02:00
commit 55c3606b16
5 zmienionych plików z 354 dodań i 63 usunięć

Wyświetl plik

@ -41,6 +41,20 @@ The ``AUTO_UPDATE`` setting allows you to disable this on a per-index basis:
If you have disabled auto update, you must run the :ref:`update_index` command on a regular basis to keep the index in sync with the database.
``ATOMIC_REBUILD``
==================
.. versionadded:: 1.1
By default (when using the Elasticsearch backend), when the ``update_index`` command is run, Wagtail deletes the index and rebuilds it from scratch. This causes the search engine to not return results until the rebuild is complete and is also risky as you can't rollback if an error occurs.
Setting the ``ATOMIC_REBUILD`` setting to ``True`` makes Wagtail rebuild into a separate index while keeping the old index active until the new one is fully built. When the rebuild is finished, the indexes are swapped atomically and the old index is deleted.
.. warning:: Experimental feature
This feature is currently experimental. Please use it with caution.
``BACKEND``
===========

Wyświetl plik

@ -176,6 +176,9 @@ class BaseSearch(object):
def __init__(self, params):
pass
def get_rebuilder(self):
return None
def reset_index(self):
raise NotImplementedError

Wyświetl plik

@ -7,10 +7,57 @@ from django.utils.six.moves.urllib.parse import urlparse
from elasticsearch import Elasticsearch, NotFoundError
from elasticsearch.helpers import bulk
from django.utils.crypto import get_random_string
from wagtail.wagtailsearch.backends.base import BaseSearch, BaseSearchQuery, BaseSearchResults
from wagtail.wagtailsearch.index import SearchField, FilterField, class_is_indexed
# Elasticsearch index configuration shared by all rebuilders.
# Defines custom n-gram and edge n-gram analysis chains used for
# partial-word matching.
INDEX_SETTINGS = {
    'settings': {
        'analysis': {
            'analyzer': {
                # Substring matching: lowercase, strip accents
                # (asciifolding), then split into n-grams.
                'ngram_analyzer': {
                    'type': 'custom',
                    'tokenizer': 'lowercase',
                    'filter': ['asciifolding', 'ngram']
                },
                # Prefix matching: n-grams anchored to the start of tokens.
                'edgengram_analyzer': {
                    'type': 'custom',
                    'tokenizer': 'lowercase',
                    'filter': ['asciifolding', 'edgengram']
                }
            },
            # NOTE(review): the analyzers above use the built-in 'lowercase'
            # tokenizer, so these custom tokenizers appear unreferenced —
            # confirm whether they can be removed.
            'tokenizer': {
                'ngram_tokenizer': {
                    'type': 'nGram',
                    'min_gram': 3,
                    'max_gram': 15,
                },
                'edgengram_tokenizer': {
                    'type': 'edgeNGram',
                    'min_gram': 2,
                    'max_gram': 15,
                    'side': 'front'  # emit grams from the front of each token
                }
            },
            'filter': {
                'ngram': {
                    'type': 'nGram',
                    'min_gram': 3,
                    'max_gram': 15
                },
                'edgengram': {
                    'type': 'edgeNGram',
                    'min_gram': 1,
                    'max_gram': 15
                }
            }
        }
    }
}
class ElasticSearchMapping(object):
TYPE_MAP = {
'AutoField': 'integer',
@ -307,6 +354,118 @@ class ElasticSearchResults(BaseSearchResults):
return max(hit_count, 0)
class ElasticSearchIndexRebuilder(object):
    """Rebuilds a single Elasticsearch index in place.

    The index is deleted and recreated when the rebuild starts, so search
    results are unavailable until the rebuild has finished.
    """

    def __init__(self, es, index_name):
        self.es = es
        self.index_name = index_name

    def reset_index(self):
        """Delete the index (if present) and recreate it empty."""
        try:
            self.es.indices.delete(self.index_name)
        except NotFoundError:
            # Index didn't exist yet; nothing to delete.
            pass

        self.es.indices.create(self.index_name, INDEX_SETTINGS)

    def start(self):
        """Begin the rebuild by resetting the index."""
        self.reset_index()

    def add_model(self, model):
        """Register the Elasticsearch mapping for *model* on the index."""
        mapping = ElasticSearchMapping(model)
        self.es.indices.put_mapping(
            index=self.index_name,
            doc_type=mapping.get_document_type(),
            body=mapping.get_mapping(),
        )

    def add_items(self, model, obj_list):
        """Bulk-index every object in *obj_list* under *model*'s mapping."""
        if not class_is_indexed(model):
            return

        mapping = ElasticSearchMapping(model)
        doc_type = mapping.get_document_type()

        def make_action(item):
            # Bulk-helper action: routing metadata first, then the
            # document's field data merged on top.
            action = {
                '_index': self.index_name,
                '_type': doc_type,
                '_id': mapping.get_document_id(item),
            }
            action.update(mapping.get_document(item))
            return action

        bulk(self.es, [make_action(item) for item in obj_list])

    def finish(self):
        """Refresh the index so newly indexed documents become searchable."""
        self.es.indices.refresh(self.index_name)
class ElasticSearchAtomicIndexRebuilder(ElasticSearchIndexRebuilder):
    """Rebuild into a brand-new index while the old one stays live.

    The public index name is an Elasticsearch *alias*. Each rebuild writes
    to a freshly named real index behind that alias; finish() repoints the
    alias at the new index and deletes the old one, so searchers never see
    an empty index mid-rebuild.
    """

    def __init__(self, es, alias_name):
        self.es = es
        # Name callers search against — an alias, not a real index.
        self.alias_name = alias_name
        # Unique target index for this rebuild run; the random suffix keeps
        # it distinct from whichever index the alias currently points to.
        self.index_name = alias_name + '_' + get_random_string(7).lower()

    def reset_index(self):
        # Delete old index using the alias
        # This should delete both the alias and the index
        try:
            self.es.indices.delete(self.alias_name)
        except NotFoundError:
            pass

        # Create new index
        self.es.indices.create(self.index_name, INDEX_SETTINGS)

        # Create a new alias
        self.es.indices.put_alias(name=self.alias_name, index=self.index_name)

    def start(self):
        # Create the new index only — unlike the base class, the currently
        # live index (and its alias) are left untouched until finish()
        # swaps them over.
        self.es.indices.create(self.index_name, INDEX_SETTINGS)

    def finish(self):
        # Refresh the new index so its documents are searchable before the
        # alias is switched over.
        self.es.indices.refresh(self.index_name)

        # Create the alias if it doesn't exist yet
        if not self.es.indices.exists_alias(self.alias_name):
            # Make sure there isn't currently an index that clashes with alias_name
            # This can happen when the atomic rebuilder is first enabled
            try:
                self.es.indices.delete(self.alias_name)
            except NotFoundError:
                pass

            # Create the alias
            self.es.indices.put_alias(name=self.alias_name, index=self.index_name)

        else:
            # Alias already exists, update it and delete old index

            # Find index that alias currently points to, so we can delete it later
            # (excluding the new index, in case the alias already includes it)
            old_index = set(self.es.indices.get_alias(name=self.alias_name).keys()) - {self.index_name}

            # Update alias to point to new index
            self.es.indices.put_alias(name=self.alias_name, index=self.index_name)

            # Delete old index
            # es.indices.get_alias can return multiple indices. Delete them all
            if old_index:
                try:
                    self.es.indices.delete(','.join(old_index))
                except NotFoundError:
                    pass
class ElasticSearch(BaseSearch):
def __init__(self, params):
super(ElasticSearch, self).__init__(params)
@ -316,6 +475,11 @@ class ElasticSearch(BaseSearch):
self.es_index = params.pop('INDEX', 'wagtail')
self.es_timeout = params.pop('TIMEOUT', 10)
if params.pop('ATOMIC_REBUILD', False):
self.rebuilder_class = ElasticSearchAtomicIndexRebuilder
else:
self.rebuilder_class = ElasticSearchIndexRebuilder
# If HOSTS is not set, convert URLS setting to HOSTS
es_urls = params.pop('URLS', ['http://localhost:9200'])
if self.es_hosts is None:
@ -346,60 +510,12 @@ class ElasticSearch(BaseSearch):
timeout=self.es_timeout,
**params)
def get_rebuilder(self):
return self.rebuilder_class(self.es, self.es_index)
def reset_index(self):
# Delete old index
try:
self.es.indices.delete(self.es_index)
except NotFoundError:
pass
# Settings
INDEX_SETTINGS = {
'settings': {
'analysis': {
'analyzer': {
'ngram_analyzer': {
'type': 'custom',
'tokenizer': 'lowercase',
'filter': ['asciifolding', 'ngram']
},
'edgengram_analyzer': {
'type': 'custom',
'tokenizer': 'lowercase',
'filter': ['asciifolding', 'edgengram']
}
},
'tokenizer': {
'ngram_tokenizer': {
'type': 'nGram',
'min_gram': 3,
'max_gram': 15,
},
'edgengram_tokenizer': {
'type': 'edgeNGram',
'min_gram': 2,
'max_gram': 15,
'side': 'front'
}
},
'filter': {
'ngram': {
'type': 'nGram',
'min_gram': 3,
'max_gram': 15
},
'edgengram': {
'type': 'edgeNGram',
'min_gram': 1,
'max_gram': 15
}
}
}
}
}
# Create new index
self.es.indices.create(self.es_index, INDEX_SETTINGS)
# Use the rebuilder to reset the index
self.get_rebuilder().reset_index()
def add_type(self, model):
# Get mapping

Wyświetl plik

@ -22,9 +22,16 @@ class Command(BaseCommand):
# Get backend
backend = get_search_backend(backend_name)
# Reset the index
self.stdout.write(backend_name + ": Reseting index")
backend.reset_index()
# Get rebuilder
rebuilder = backend.get_rebuilder()
if not rebuilder:
self.stdout.write(backend_name + ": Backend doesn't support rebuild. Skipping")
return
# Start rebuild
self.stdout.write(backend_name + ": Starting rebuild")
rebuilder.start()
for model, queryset in object_list:
self.stdout.write(backend_name + ": Indexing model '%s.%s'" % (
@ -32,15 +39,15 @@ class Command(BaseCommand):
model.__name__,
))
# Add type
backend.add_type(model)
# Add model
rebuilder.add_model(model)
# Add objects
backend.add_bulk(model, queryset)
# Add items
rebuilder.add_items(model, queryset)
# Refresh index
self.stdout.write(backend_name + ": Refreshing index")
backend.refresh_index()
# Finish rebuild
self.stdout.write(backend_name + ": Finishing rebuild")
rebuilder.finish()
option_list = BaseCommand.option_list + (
make_option('--backend',

Wyświetl plik

@ -10,6 +10,8 @@ import mock
from django.test import TestCase
from django.db.models import Q
from wagtail.wagtailsearch.backends import get_search_backend
from wagtail.tests.search import models
from .test_backends import BackendTests
@ -748,3 +750,152 @@ class TestBackendConfiguration(TestCase):
self.assertEqual(backend.es_hosts[3]['port'], 443)
self.assertEqual(backend.es_hosts[3]['use_ssl'], True)
self.assertEqual(backend.es_hosts[3]['url_prefix'], '/hello')
class TestRebuilder(TestCase):
    """Integration tests for the plain (non-atomic) index rebuilder.

    Runs against the live Elasticsearch instance configured as the
    'elasticsearch' search backend; skipped if elasticsearch-py is absent.
    """

    def assertDictEqual(self, a, b):
        # Compare via JSON serialisation using the elasticsearch-py
        # serializer's default hook, so values ES would coerce (e.g. on
        # round-trip) compare equal to their plain-Python counterparts.
        default = self.JSONSerializer().default
        self.assertEqual(json.dumps(a, sort_keys=True, default=default), json.dumps(b, sort_keys=True, default=default))

    def setUp(self):
        # Import using a try-catch block to prevent crashes if the elasticsearch-py
        # module is not installed
        try:
            from wagtail.wagtailsearch.backends.elasticsearch import ElasticSearch
            from wagtail.wagtailsearch.backends.elasticsearch import ElasticSearchMapping
            from elasticsearch import NotFoundError, JSONSerializer
        except ImportError:
            raise unittest.SkipTest("elasticsearch-py not installed")

        self.ElasticSearch = ElasticSearch
        self.ElasticSearchMapping = ElasticSearchMapping
        self.NotFoundError = NotFoundError
        self.JSONSerializer = JSONSerializer

        self.backend = get_search_backend('elasticsearch')
        self.es = self.backend.es
        self.rebuilder = self.backend.get_rebuilder()

        # Start each test from a freshly reset index.
        self.backend.reset_index()

    def test_start_creates_index(self):
        # First, make sure the index is deleted
        try:
            self.es.indices.delete(self.backend.es_index)
        except self.NotFoundError:
            pass

        self.assertFalse(self.es.indices.exists(self.backend.es_index))

        # Run start
        self.rebuilder.start()

        # Check the index exists
        self.assertTrue(self.es.indices.exists(self.backend.es_index))

    def test_start_deletes_existing_index(self):
        # Put an alias into the index so we can check it was deleted
        self.es.indices.put_alias(name='this_index_should_be_deleted', index=self.backend.es_index)
        self.assertTrue(self.es.indices.exists_alias(name='this_index_should_be_deleted', index=self.backend.es_index))

        # Run start
        self.rebuilder.start()

        # The alias should be gone (proving the index was deleted and recreated)
        self.assertFalse(self.es.indices.exists_alias(name='this_index_should_be_deleted', index=self.backend.es_index))

    def test_add_model(self):
        self.rebuilder.start()

        # Add model
        self.rebuilder.add_model(models.SearchTest)

        # Check the mapping went into Elasticsearch correctly
        mapping = self.ElasticSearchMapping(models.SearchTest)
        response = self.es.indices.get_mapping(self.backend.es_index, mapping.get_document_type())

        # Make some minor tweaks to the mapping so it matches what is in ES
        # These are generally minor issues with the way Wagtail is generating the mapping that are being cleaned up by Elasticsearch
        # TODO: Would be nice to fix these
        expected_mapping = mapping.get_mapping()
        expected_mapping['searchtests_searchtest']['properties']['pk']['store'] = True
        expected_mapping['searchtests_searchtest']['properties']['live_filter'].pop('index')
        expected_mapping['searchtests_searchtest']['properties']['live_filter'].pop('include_in_all')
        expected_mapping['searchtests_searchtest']['properties']['published_date_filter']['format'] = 'dateOptionalTime'
        expected_mapping['searchtests_searchtest']['properties']['published_date_filter'].pop('index')

        self.assertDictEqual(expected_mapping, response[self.backend.es_index]['mappings'])
class TestAtomicRebuilder(TestCase):
    """Integration tests for the atomic (alias-swapping) index rebuilder.

    Forces the 'elasticsearch' backend to use
    ElasticSearchAtomicIndexRebuilder, then verifies the alias/index swap
    lifecycle against a live Elasticsearch instance.
    """

    def setUp(self):
        # Import using a try-catch block to prevent crashes if the elasticsearch-py
        # module is not installed
        try:
            from wagtail.wagtailsearch.backends.elasticsearch import ElasticSearch
            from wagtail.wagtailsearch.backends.elasticsearch import ElasticSearchAtomicIndexRebuilder
            from elasticsearch import NotFoundError
        except ImportError:
            raise unittest.SkipTest("elasticsearch-py not installed")

        self.ElasticSearch = ElasticSearch
        self.NotFoundError = NotFoundError

        self.backend = get_search_backend('elasticsearch')
        # Swap in the atomic rebuilder before fetching a rebuilder instance.
        self.backend.rebuilder_class = ElasticSearchAtomicIndexRebuilder
        self.es = self.backend.es
        self.rebuilder = self.backend.get_rebuilder()

        # Reset via the atomic rebuilder so the alias + backing index exist.
        self.backend.reset_index()

    def test_start_creates_new_index(self):
        # Rebuilder should make up a new index name that doesn't currently exist
        self.assertFalse(self.es.indices.exists(self.rebuilder.index_name))

        # Run start
        self.rebuilder.start()

        # Check the index exists
        self.assertTrue(self.es.indices.exists(self.rebuilder.index_name))

    def test_start_doesnt_delete_current_index(self):
        # Get current index name
        current_index_name = list(self.es.indices.get_alias(name=self.rebuilder.alias_name).keys())[0]

        # Run start
        self.rebuilder.start()

        # The index should still exist
        self.assertTrue(self.es.indices.exists(current_index_name))

        # And the alias should still point to it
        self.assertTrue(self.es.indices.exists_alias(name=self.rebuilder.alias_name, index=current_index_name))

    def test_finish_updates_alias(self):
        # Run start
        self.rebuilder.start()

        # Check that the alias doesn't point to new index
        self.assertFalse(self.es.indices.exists_alias(name=self.rebuilder.alias_name, index=self.rebuilder.index_name))

        # Run finish
        self.rebuilder.finish()

        # Check that the alias now points to the new index
        self.assertTrue(self.es.indices.exists_alias(name=self.rebuilder.alias_name, index=self.rebuilder.index_name))

    def test_finish_deletes_old_index(self):
        # Get current index name
        current_index_name = list(self.es.indices.get_alias(name=self.rebuilder.alias_name).keys())[0]

        # Run start
        self.rebuilder.start()

        # Index should still exist
        self.assertTrue(self.es.indices.exists(current_index_name))

        # Run finish
        self.rebuilder.finish()

        # Index should be gone
        self.assertFalse(self.es.indices.exists(current_index_name))