kopia lustrzana https://github.com/wagtail/wagtail
refactor(search/elastic): Added ElasticSearchIndex class
This class represents an individual index in Elasticsearch. This commit also refactors duplicated implementations of the CRUD methods for updating documents and mappings in the database. These now all live in the new index class.pull/2064/merge
rodzic
c08f5b76a5
commit
4441232fe3
|
@ -369,7 +369,7 @@ class ElasticSearchResults(BaseSearchResults):
|
|||
def _do_search(self):
|
||||
# Params for elasticsearch query
|
||||
params = dict(
|
||||
index=self.backend.name,
|
||||
index=self.backend.index_name,
|
||||
body=self._get_es_body(),
|
||||
_source=False,
|
||||
fields='pk',
|
||||
|
@ -400,7 +400,7 @@ class ElasticSearchResults(BaseSearchResults):
|
|||
def _do_count(self):
|
||||
# Get count
|
||||
hit_count = self.backend.es.count(
|
||||
index=self.backend.name,
|
||||
index=self.backend.index_name,
|
||||
body=self._get_es_body(for_count=True),
|
||||
)['count']
|
||||
|
||||
|
@ -412,26 +412,48 @@ class ElasticSearchResults(BaseSearchResults):
|
|||
return max(hit_count, 0)
|
||||
|
||||
|
||||
class ElasticSearchIndexRebuilder(object):
|
||||
def __init__(self, index):
|
||||
self.es = index.es
|
||||
self.index_name = index.name
|
||||
self.mapping_class = index.mapping_class
|
||||
self.index_settings = index.settings
|
||||
class ElasticSearchIndex(object):
|
||||
def __init__(self, backend, name):
|
||||
self.backend = backend
|
||||
self.es = backend.es
|
||||
self.mapping_class = backend.mapping_class
|
||||
self.name = name
|
||||
|
||||
def reset_index(self):
|
||||
# Delete old index
|
||||
def put(self):
|
||||
self.es.indices.create(self.name, self.backend.settings)
|
||||
|
||||
def delete(self):
|
||||
try:
|
||||
self.es.indices.delete(self.index_name)
|
||||
self.es.indices.delete(self.name)
|
||||
except NotFoundError:
|
||||
pass
|
||||
|
||||
# Create new index
|
||||
self.es.indices.create(self.index_name, self.index_settings)
|
||||
def exists(self):
|
||||
return self.es.indices.exists(self.name)
|
||||
|
||||
def start(self):
|
||||
# Reset the index
|
||||
self.reset_index()
|
||||
def is_alias(self):
|
||||
return self.es.indices.exists_alias(self.name)
|
||||
|
||||
def aliased_indices(self):
|
||||
"""
|
||||
If this index object represents an alias (which appear the same in the
|
||||
Elasticsearch API), this method can be used to fetch the list of indices
|
||||
the alias points to.
|
||||
|
||||
Use the is_alias method if you need to find out if this an alias. This
|
||||
returns an empty list if called on an index.
|
||||
"""
|
||||
return [
|
||||
self.backend.index_class(self.backend, index_name)
|
||||
for index_name in self.es.indices.get_alias(name=self.name).keys()
|
||||
]
|
||||
|
||||
def put_alias(self, name):
|
||||
"""
|
||||
Creates a new alias to this index. If the alias already exists it will
|
||||
be repointed to this index.
|
||||
"""
|
||||
self.es.indices.put_alias(name=name, index=self.name)
|
||||
|
||||
def add_model(self, model):
|
||||
# Get mapping
|
||||
|
@ -439,10 +461,23 @@ class ElasticSearchIndexRebuilder(object):
|
|||
|
||||
# Put mapping
|
||||
self.es.indices.put_mapping(
|
||||
index=self.index_name, doc_type=mapping.get_document_type(), body=mapping.get_mapping()
|
||||
index=self.name, doc_type=mapping.get_document_type(), body=mapping.get_mapping()
|
||||
)
|
||||
|
||||
def add_items(self, model, obj_list):
|
||||
def add_item(self, item):
|
||||
# Make sure the object can be indexed
|
||||
if not class_is_indexed(item.__class__):
|
||||
return
|
||||
|
||||
# Get mapping
|
||||
mapping = self.mapping_class(item.__class__)
|
||||
|
||||
# Add document to index
|
||||
self.es.index(
|
||||
self.name, mapping.get_document_type(), mapping.get_document(item), id=mapping.get_document_id(item)
|
||||
)
|
||||
|
||||
def add_items(self, model, items):
|
||||
if not class_is_indexed(model):
|
||||
return
|
||||
|
||||
|
@ -452,85 +487,122 @@ class ElasticSearchIndexRebuilder(object):
|
|||
|
||||
# Create list of actions
|
||||
actions = []
|
||||
for obj in obj_list:
|
||||
for item in items:
|
||||
# Create the action
|
||||
action = {
|
||||
'_index': self.index_name,
|
||||
'_index': self.name,
|
||||
'_type': doc_type,
|
||||
'_id': mapping.get_document_id(obj),
|
||||
'_id': mapping.get_document_id(item),
|
||||
}
|
||||
action.update(mapping.get_document(obj))
|
||||
action.update(mapping.get_document(item))
|
||||
actions.append(action)
|
||||
|
||||
# Run the actions
|
||||
bulk(self.es, actions)
|
||||
|
||||
def delete_item(self, item):
|
||||
# Make sure the object can be indexed
|
||||
if not class_is_indexed(item.__class__):
|
||||
return
|
||||
|
||||
# Get mapping
|
||||
mapping = self.mapping_class(item.__class__)
|
||||
|
||||
# Delete document
|
||||
try:
|
||||
self.es.delete(
|
||||
self.name,
|
||||
mapping.get_document_type(),
|
||||
mapping.get_document_id(item),
|
||||
)
|
||||
except NotFoundError:
|
||||
pass # Document doesn't exist, ignore this exception
|
||||
|
||||
def refresh(self):
|
||||
self.es.indices.refresh(self.name)
|
||||
|
||||
def reset(self):
|
||||
# Delete old index
|
||||
self.delete()
|
||||
|
||||
# Create new index
|
||||
self.put()
|
||||
|
||||
|
||||
class ElasticSearchIndexRebuilder(object):
|
||||
def __init__(self, index):
|
||||
self.index = index
|
||||
|
||||
def reset_index(self):
|
||||
self.index.reset()
|
||||
|
||||
def start(self):
|
||||
# Reset the index
|
||||
self.reset_index()
|
||||
|
||||
def add_model(self, model):
|
||||
self.index.add_model(model)
|
||||
|
||||
def add_items(self, model, obj_list):
|
||||
self.index.add_items(model, obj_list)
|
||||
|
||||
def finish(self):
|
||||
# Refresh index
|
||||
self.es.indices.refresh(self.index_name)
|
||||
self.index.refresh()
|
||||
|
||||
|
||||
class ElasticSearchAtomicIndexRebuilder(ElasticSearchIndexRebuilder):
|
||||
def __init__(self, index):
|
||||
self.es = index.es
|
||||
self.alias_name = index.name
|
||||
self.index_name = self.alias_name + '_' + get_random_string(7).lower()
|
||||
self.mapping_class = index.mapping_class
|
||||
self.index_settings = index.settings
|
||||
self.alias = index
|
||||
self.index = index.backend.index_class(
|
||||
index.backend,
|
||||
self.alias.name + '_' + get_random_string(7).lower()
|
||||
)
|
||||
|
||||
def reset_index(self):
|
||||
# Delete old index using the alias
|
||||
# This should delete both the alias and the index
|
||||
try:
|
||||
self.es.indices.delete(self.alias_name)
|
||||
except NotFoundError:
|
||||
pass
|
||||
self.alias.delete()
|
||||
|
||||
# Create new index
|
||||
self.es.indices.create(self.index_name, self.index_settings)
|
||||
self.index.put()
|
||||
|
||||
# Create a new alias
|
||||
self.es.indices.put_alias(name=self.alias_name, index=self.index_name)
|
||||
self.index.put_alias(self.alias.name)
|
||||
|
||||
def start(self):
|
||||
# Create the new index
|
||||
self.es.indices.create(self.index_name, self.index_settings)
|
||||
self.index.put()
|
||||
|
||||
def finish(self):
|
||||
# Refresh the new index
|
||||
self.es.indices.refresh(self.index_name)
|
||||
self.index.refresh()
|
||||
|
||||
# Create the alias if it doesnt exist yet
|
||||
if not self.es.indices.exists_alias(self.alias_name):
|
||||
# Make sure there isn't currently an index that clashes with alias_name
|
||||
if not self.alias.is_alias():
|
||||
# Make sure there isn't currently an index that clashes with alias
|
||||
# This can happen when the atomic rebuilder is first enabled
|
||||
try:
|
||||
self.es.indices.delete(self.alias_name)
|
||||
except NotFoundError:
|
||||
pass
|
||||
self.alias.delete()
|
||||
|
||||
# Create the alias
|
||||
self.es.indices.put_alias(name=self.alias_name, index=self.index_name)
|
||||
self.index.put_alias(self.alias.name)
|
||||
|
||||
else:
|
||||
# Alias already exists, update it and delete old index
|
||||
|
||||
# Find index that alias currently points to, so we can delete it later
|
||||
old_index = set(self.es.indices.get_alias(name=self.alias_name).keys()) - {self.index_name}
|
||||
old_index = self.alias.aliased_indices()
|
||||
|
||||
# Update alias to point to new index
|
||||
self.es.indices.put_alias(name=self.alias_name, index=self.index_name)
|
||||
self.index.put_alias(self.alias.name)
|
||||
|
||||
# Delete old index
|
||||
# es.indices.get_alias can return multiple indices. Delete them all
|
||||
if old_index:
|
||||
try:
|
||||
self.es.indices.delete(','.join(old_index))
|
||||
except NotFoundError:
|
||||
pass
|
||||
# aliased_indices() can return multiple indices. Delete them all
|
||||
for index in old_index:
|
||||
if index.name != self.index.name:
|
||||
index.delete()
|
||||
|
||||
|
||||
class ElasticSearch(BaseSearch):
|
||||
index_class = ElasticSearchIndex
|
||||
query_class = ElasticSearchQuery
|
||||
results_class = ElasticSearchResults
|
||||
mapping_class = ElasticSearchMapping
|
||||
|
@ -586,7 +658,7 @@ class ElasticSearch(BaseSearch):
|
|||
|
||||
# Get settings
|
||||
self.hosts = params.pop('HOSTS', None)
|
||||
self.name = params.pop('INDEX', 'wagtail')
|
||||
self.index_name = params.pop('INDEX', 'wagtail')
|
||||
self.timeout = params.pop('TIMEOUT', 10)
|
||||
|
||||
if params.pop('ATOMIC_REBUILD', False):
|
||||
|
@ -624,78 +696,30 @@ class ElasticSearch(BaseSearch):
|
|||
timeout=self.timeout,
|
||||
**params)
|
||||
|
||||
def get_index(self):
|
||||
return self.index_class(self, self.index_name)
|
||||
|
||||
def get_rebuilder(self):
|
||||
return self.rebuilder_class(self)
|
||||
return self.rebuilder_class(self.get_index())
|
||||
|
||||
def reset_index(self):
|
||||
# Use the rebuilder to reset the index
|
||||
self.get_rebuilder().reset_index()
|
||||
|
||||
def add_type(self, model):
|
||||
# Get mapping
|
||||
mapping = self.mapping_class(model)
|
||||
|
||||
# Put mapping
|
||||
self.es.indices.put_mapping(
|
||||
index=self.name, doc_type=mapping.get_document_type(), body=mapping.get_mapping()
|
||||
)
|
||||
self.get_index().add_model(model)
|
||||
|
||||
def refresh_index(self):
|
||||
self.es.indices.refresh(self.name)
|
||||
self.get_index().refresh()
|
||||
|
||||
def add(self, obj):
|
||||
# Make sure the object can be indexed
|
||||
if not class_is_indexed(obj.__class__):
|
||||
return
|
||||
|
||||
# Get mapping
|
||||
mapping = self.mapping_class(obj.__class__)
|
||||
|
||||
# Add document to index
|
||||
self.es.index(
|
||||
self.name, mapping.get_document_type(), mapping.get_document(obj), id=mapping.get_document_id(obj)
|
||||
)
|
||||
self.get_index().add_item(obj)
|
||||
|
||||
def add_bulk(self, model, obj_list):
|
||||
if not class_is_indexed(model):
|
||||
return
|
||||
|
||||
# Get mapping
|
||||
mapping = self.mapping_class(model)
|
||||
doc_type = mapping.get_document_type()
|
||||
|
||||
# Create list of actions
|
||||
actions = []
|
||||
for obj in obj_list:
|
||||
# Create the action
|
||||
action = {
|
||||
'_index': self.name,
|
||||
'_type': doc_type,
|
||||
'_id': mapping.get_document_id(obj),
|
||||
}
|
||||
action.update(mapping.get_document(obj))
|
||||
actions.append(action)
|
||||
|
||||
# Run the actions
|
||||
bulk(self.es, actions)
|
||||
self.get_index().add_items(model, obj_list)
|
||||
|
||||
def delete(self, obj):
|
||||
# Make sure the object can be indexed
|
||||
if not class_is_indexed(obj.__class__):
|
||||
return
|
||||
|
||||
# Get mapping
|
||||
mapping = self.mapping_class(obj.__class__)
|
||||
|
||||
# Delete document
|
||||
try:
|
||||
self.es.delete(
|
||||
self.name,
|
||||
mapping.get_document_type(),
|
||||
mapping.get_document_id(obj),
|
||||
)
|
||||
except NotFoundError:
|
||||
pass # Document doesn't exist, ignore this exception
|
||||
self.get_index().delete_item(obj)
|
||||
|
||||
|
||||
SearchBackend = ElasticSearch
|
||||
|
|
|
@ -970,23 +970,23 @@ class TestRebuilder(TestCase):
|
|||
def test_start_creates_index(self):
|
||||
# First, make sure the index is deleted
|
||||
try:
|
||||
self.es.indices.delete(self.backend.name)
|
||||
self.es.indices.delete(self.backend.index_name)
|
||||
except self.NotFoundError:
|
||||
pass
|
||||
|
||||
self.assertFalse(self.es.indices.exists(self.backend.name))
|
||||
self.assertFalse(self.es.indices.exists(self.backend.index_name))
|
||||
|
||||
# Run start
|
||||
self.rebuilder.start()
|
||||
|
||||
# Check the index exists
|
||||
self.assertTrue(self.es.indices.exists(self.backend.name))
|
||||
self.assertTrue(self.es.indices.exists(self.backend.index_name))
|
||||
|
||||
def test_start_deletes_existing_index(self):
|
||||
# Put an alias into the index so we can check it was deleted
|
||||
self.es.indices.put_alias(name='this_index_should_be_deleted', index=self.backend.name)
|
||||
self.es.indices.put_alias(name='this_index_should_be_deleted', index=self.backend.index_name)
|
||||
self.assertTrue(
|
||||
self.es.indices.exists_alias(name='this_index_should_be_deleted', index=self.backend.name)
|
||||
self.es.indices.exists_alias(name='this_index_should_be_deleted', index=self.backend.index_name)
|
||||
)
|
||||
|
||||
# Run start
|
||||
|
@ -994,7 +994,7 @@ class TestRebuilder(TestCase):
|
|||
|
||||
# The alias should be gone (proving the index was deleted and recreated)
|
||||
self.assertFalse(
|
||||
self.es.indices.exists_alias(name='this_index_should_be_deleted', index=self.backend.name)
|
||||
self.es.indices.exists_alias(name='this_index_should_be_deleted', index=self.backend.index_name)
|
||||
)
|
||||
|
||||
def test_add_model(self):
|
||||
|
@ -1005,7 +1005,7 @@ class TestRebuilder(TestCase):
|
|||
|
||||
# Check the mapping went into Elasticsearch correctly
|
||||
mapping = ElasticSearch.mapping_class(models.SearchTest)
|
||||
response = self.es.indices.get_mapping(self.backend.name, mapping.get_document_type())
|
||||
response = self.es.indices.get_mapping(self.backend.index_name, mapping.get_document_type())
|
||||
|
||||
# Make some minor tweaks to the mapping so it matches what is in ES
|
||||
# These are generally minor issues with the way Wagtail is
|
||||
|
@ -1019,7 +1019,7 @@ class TestRebuilder(TestCase):
|
|||
'dateOptionalTime'
|
||||
expected_mapping['searchtests_searchtest']['properties']['published_date_filter'].pop('index')
|
||||
|
||||
self.assertDictEqual(expected_mapping, response[self.backend.name]['mappings'])
|
||||
self.assertDictEqual(expected_mapping, response[self.backend.index_name]['mappings'])
|
||||
|
||||
|
||||
@unittest.skipUnless(os.environ.get('ELASTICSEARCH_URL', False), "ELASTICSEARCH_URL not set")
|
||||
|
@ -1034,17 +1034,17 @@ class TestAtomicRebuilder(TestCase):
|
|||
|
||||
def test_start_creates_new_index(self):
|
||||
# Rebuilder should make up a new index name that doesn't currently exist
|
||||
self.assertFalse(self.es.indices.exists(self.rebuilder.index_name))
|
||||
self.assertFalse(self.es.indices.exists(self.rebuilder.index.name))
|
||||
|
||||
# Run start
|
||||
self.rebuilder.start()
|
||||
|
||||
# Check the index exists
|
||||
self.assertTrue(self.es.indices.exists(self.rebuilder.index_name))
|
||||
self.assertTrue(self.es.indices.exists(self.rebuilder.index.name))
|
||||
|
||||
def test_start_doesnt_delete_current_index(self):
|
||||
# Get current index name
|
||||
current_index_name = list(self.es.indices.get_alias(name=self.rebuilder.alias_name).keys())[0]
|
||||
current_index_name = list(self.es.indices.get_alias(name=self.rebuilder.alias.name).keys())[0]
|
||||
|
||||
# Run start
|
||||
self.rebuilder.start()
|
||||
|
@ -1053,7 +1053,7 @@ class TestAtomicRebuilder(TestCase):
|
|||
self.assertTrue(self.es.indices.exists(current_index_name))
|
||||
|
||||
# And the alias should still point to it
|
||||
self.assertTrue(self.es.indices.exists_alias(name=self.rebuilder.alias_name, index=current_index_name))
|
||||
self.assertTrue(self.es.indices.exists_alias(name=self.rebuilder.alias.name, index=current_index_name))
|
||||
|
||||
def test_finish_updates_alias(self):
|
||||
# Run start
|
||||
|
@ -1061,18 +1061,18 @@ class TestAtomicRebuilder(TestCase):
|
|||
|
||||
# Check that the alias doesn't point to new index
|
||||
self.assertFalse(
|
||||
self.es.indices.exists_alias(name=self.rebuilder.alias_name, index=self.rebuilder.index_name)
|
||||
self.es.indices.exists_alias(name=self.rebuilder.alias.name, index=self.rebuilder.index.name)
|
||||
)
|
||||
|
||||
# Run finish
|
||||
self.rebuilder.finish()
|
||||
|
||||
# Check that the alias now points to the new index
|
||||
self.assertTrue(self.es.indices.exists_alias(name=self.rebuilder.alias_name, index=self.rebuilder.index_name))
|
||||
self.assertTrue(self.es.indices.exists_alias(name=self.rebuilder.alias.name, index=self.rebuilder.index.name))
|
||||
|
||||
def test_finish_deletes_old_index(self):
|
||||
# Get current index name
|
||||
current_index_name = list(self.es.indices.get_alias(name=self.rebuilder.alias_name).keys())[0]
|
||||
current_index_name = list(self.es.indices.get_alias(name=self.rebuilder.alias.name).keys())[0]
|
||||
|
||||
# Run start
|
||||
self.rebuilder.start()
|
||||
|
|
Ładowanie…
Reference in New Issue