diff --git a/runtests.py b/runtests.py index 96653b40d3..efa66910f0 100755 --- a/runtests.py +++ b/runtests.py @@ -13,7 +13,7 @@ MEDIA_ROOT = os.path.join(WAGTAIL_ROOT, 'test-media') if not settings.configured: try: - import elasticutils + import elasticsearch has_elasticsearch = True except ImportError: has_elasticsearch = False diff --git a/wagtail/wagtailsearch/backends/elasticsearch.py b/wagtail/wagtailsearch/backends/elasticsearch.py index d2b58d564f..7571db5ac3 100644 --- a/wagtail/wagtailsearch/backends/elasticsearch.py +++ b/wagtail/wagtailsearch/backends/elasticsearch.py @@ -1,6 +1,8 @@ +from __future__ import absolute_import + from django.db import models -from elasticutils import get_es, S +from elasticsearch import Elasticsearch, NotFoundError from wagtail.wagtailsearch.backends.base import BaseSearch from wagtail.wagtailsearch.indexed import Indexed @@ -9,16 +11,131 @@ import string class ElasticSearchResults(object): - def __init__(self, model, query, prefetch_related=[]): + def __init__(self, backend, model, query_string, fields=None, filters={}, prefetch_related=[]): + self.backend = backend self.model = model - self.query = query - self.count = query.count() + self.query_string = query_string + self.fields = fields + self.filters = filters self.prefetch_related = prefetch_related + def _get_filters(self): + # Filters + filters = [] + + # Filter by content type + filters.append({ + 'prefix': { + 'content_type': self.model.indexed_get_content_type() + } + }) + + # Extra filters + if self.filters: + for key, value in self.filters.items(): + if '__' in key: + field, lookup = key.split('__') + else: + field = key + lookup = None + + if lookup is None: + if value is None: + filters.append({ + 'missing': { + 'field': field, + } + }) + else: + filters.append({ + 'term': { + field: value + } + }) + + if lookup in ['startswith', 'prefix']: + filters.append({ + 'prefix': { + field: value + } + }) + + if lookup in ['gt', 'gte', 'lt', 'lte']: + filters.append({ + 'range': { + field: { + lookup: value, + } + } + }) + + if lookup == 'range': + lower, upper = value + filters.append({ + 'range': { + field: { + 'gte': lower, + 'lte': upper, + } + } + }) + + return filters + + def _get_query(self): + # Query + query = { + 'query_string': { + 'query': self.query_string, + } + } + + # Fields + if self.fields: + query['query_string']['fields'] = self.fields + + # Filters + filters = self._get_filters() + + return { + 'query': { + 'filtered': { + 'query': query, + 'filter': { + 'and': filters, + } + } + } + } + + def _get_results_pks(self, offset=0, limit=None): + query = self._get_query() + query['query']['from'] = offset + if limit is not None: + query['query']['size'] = limit + + hits = self.backend.es.search( + index=self.backend.es_index, + body=query, + _source=False, + fields='pk', + ) + + return [hit['fields']['pk'][0] for hit in hits['hits']['hits']] + + def _get_count(self): + query = self._get_query() + count = self.backend.es.count( + index=self.backend.es_index, + body=query, + ) + + return count['count'] + def __getitem__(self, key): if isinstance(key, slice): # Get primary keys - pk_list_unclean = [result._source["pk"] for result in self.query[key]] + pk_list_unclean = self._get_results_pks(key.start, key.stop - key.start) # Remove duplicate keys (and preserve order) seen_pks = set() @@ -45,11 +162,11 @@ class ElasticSearchResults(object): return results_sorted else: # Return a single item - pk = self.query[key]._source["pk"] + pk = self._get_results_pks(key, key + 1)[0] return self.model.objects.get(pk=pk) def __len__(self): - return self.count + return self._get_count() class ElasticSearch(BaseSearch): @@ -64,22 +181,17 @@ class ElasticSearch(BaseSearch): # Get ElasticSearch interface # Any remaining params are passed into the ElasticSearch constructor - self.es = get_es( + self.es = Elasticsearch( urls=self.es_urls, timeout=self.es_timeout, force_new=self.es_force_new, **params) - self.s = S().es( - urls=self.es_urls, - timeout=self.es_timeout, - force_new=self.es_force_new, - **params).indexes(self.es_index) def reset_index(self): # Delete old index try: - self.es.delete_index(self.es_index) - except: + self.es.indices.delete(self.es_index) + except NotFoundError: pass # Settings @@ -128,7 +240,7 @@ class ElasticSearch(BaseSearch): } # Create new index - self.es.create_index(self.es_index, INDEX_SETTINGS) + self.es.indices.create(self.es_index, INDEX_SETTINGS) def add_type(self, model): # Get type name @@ -144,14 +256,14 @@ class ElasticSearch(BaseSearch): }.items() + indexed_fields.items()) # Put mapping - self.es.put_mapping(self.es_index, content_type, { + self.es.indices.put_mapping(index=self.es_index, doc_type=content_type, body={ content_type: { "properties": fields, } }) def refresh_index(self): - self.es.refresh(self.es_index) + self.es.indices.refresh(self.es_index) def add(self, obj): # Make sure the object can be indexed @@ -165,6 +277,10 @@ class ElasticSearch(BaseSearch): self.es.index(self.es_index, obj.indexed_get_content_type(), doc, id=doc["id"]) def add_bulk(self, obj_list): + # TODO: Make this work with new elastic search module + for obj in obj_list: + self.add(obj) + return # Group all objects by their type type_set = {} for obj in obj_list: @@ -194,13 +310,14 @@ class ElasticSearch(BaseSearch): if not isinstance(obj, Indexed) or not isinstance(obj, models.Model): return - # Get ID for document - doc_id = obj.indexed_get_document_id() - # Delete document try: - self.es.delete(self.es_index, obj.indexed_get_content_type(), doc_id) - except: + self.es.delete( + self.es_index, + obj.indexed_get_content_type(), + obj.indexed_get_document_id(), + ) + except NotFoundError: pass # Document doesn't exist, ignore this exception def search(self, query_string, model, fields=None, filters={}, prefetch_related=[]): @@ -215,27 +332,5 @@ class ElasticSearch(BaseSearch): if not query_string: return [] - # Query - if fields: - query = self.s.query_raw({ - "query_string": { - "query": query_string, - "fields": fields, - } - }) - else: - query = self.s.query_raw({ - "query_string": { - "query": query_string, - } - }) - - # Filter results by this content type - query = query.filter(content_type__prefix=model.indexed_get_content_type()) - - # Extra filters - if filters: - query = query.filter(**filters) - # Return search results - return ElasticSearchResults(model, query, prefetch_related=prefetch_related) + return ElasticSearchResults(self, model, query_string, fields=fields, filters=filters, prefetch_related=prefetch_related)