Scrapped ElasticUtils, use Elasticsearch-py instead

Conflicts:
	wagtail/wagtailsearch/backends/elasticsearch.py
	wagtail/wagtailsearch/indexed.py
pull/342/head
Karl Hobley 2014-04-05 20:10:58 +01:00 zatwierdzone przez Karl Hobley
rodzic b26744e137
commit 4081206c77
2 zmienionych plików z 142 dodań i 47 usunięć

Wyświetl plik

@ -13,7 +13,7 @@ MEDIA_ROOT = os.path.join(WAGTAIL_ROOT, 'test-media')
if not settings.configured:
try:
import elasticutils
import elasticsearch
has_elasticsearch = True
except ImportError:
has_elasticsearch = False

Wyświetl plik

@ -1,6 +1,8 @@
from __future__ import absolute_import
from django.db import models
from elasticutils import get_es, S
from elasticsearch import Elasticsearch, NotFoundError
from wagtail.wagtailsearch.backends.base import BaseSearch
from wagtail.wagtailsearch.indexed import Indexed
@ -9,16 +11,131 @@ import string
class ElasticSearchResults(object):
def __init__(self, model, query, prefetch_related=[]):
def __init__(self, backend, model, query_string, fields=None, filters={}, prefetch_related=[]):
self.backend = backend
self.model = model
self.query = query
self.count = query.count()
self.query_string = query_string
self.fields = fields
self.filters = filters
self.prefetch_related = prefetch_related
def _get_filters(self):
    """Build the list of Elasticsearch filter clauses for this result set.

    The first clause always restricts hits to documents whose
    ``content_type`` starts with ``self.model.indexed_get_content_type()``.
    Every entry of ``self.filters`` then contributes at most one clause,
    selected by the lookup suffix that follows the last ``__`` in its key:

    * no suffix -- ``term`` (``missing`` when the value is ``None``)
    * ``startswith`` / ``prefix`` -- ``prefix``
    * ``gt`` / ``gte`` / ``lt`` / ``lte`` -- one-sided ``range``
    * ``range`` -- inclusive two-sided ``range``; the value must unpack
      into ``(lower, upper)``

    Keys carrying any other suffix add no clause.

    Returns:
        list: filter dicts, content-type clause first, then one per
        recognised entry of ``self.filters`` in iteration order.
    """
    # Filter by content type
    filters = [{
        'prefix': {
            'content_type': self.model.indexed_get_content_type(),
        },
    }]

    # Extra filters
    for key, value in (self.filters or {}).items():
        # Split on the LAST '__' only: unpacking key.split('__') raised
        # ValueError for any key containing more than one separator.
        field, separator, lookup = key.rpartition('__')
        if not separator:
            field, lookup = key, None

        if lookup is None:
            if value is None:
                clause = {'missing': {'field': field}}
            else:
                clause = {'term': {field: value}}
        elif lookup in ('startswith', 'prefix'):
            clause = {'prefix': {field: value}}
        elif lookup in ('gt', 'gte', 'lt', 'lte'):
            # The lookup name doubles as the range operator name.
            clause = {'range': {field: {lookup: value}}}
        elif lookup == 'range':
            lower, upper = value
            clause = {'range': {field: {'gte': lower, 'lte': upper}}}
        else:
            # Unsupported lookups contribute nothing (unchanged behaviour).
            continue

        filters.append(clause)

    return filters
def _get_query(self):
    """Assemble the request body shared by the search and count calls.

    The body is a ``filtered`` query: a ``query_string`` query on
    ``self.query_string`` (limited to ``self.fields`` when that is
    truthy), combined with an ``and`` over the clauses returned by
    ``self._get_filters()``.

    Returns:
        dict: ``{'query': {'filtered': {'query': ..., 'filter': ...}}}``.
    """
    # Text part of the query; 'fields' is only sent when provided.
    query_string_clause = {'query': self.query_string}
    if self.fields:
        query_string_clause['fields'] = self.fields

    filtered = {
        'query': {'query_string': query_string_clause},
        'filter': {'and': self._get_filters()},
    }
    return {'query': {'filtered': filtered}}
def _get_results_pks(self, offset=0, limit=None):
    """Return the primary keys of one page of search hits.

    Args:
        offset: index of the first hit to return. ``None`` is treated
            as 0 so that an open-ended slice start can be passed through.
        limit: maximum number of hits to return; when ``None`` no
            ``size`` is sent and the server default applies.

    Returns:
        list: the first ``pk`` field value of each hit, in the order the
        hits were returned.
    """
    body = self._get_query()

    # Pagination keys are top-level members of the search request body,
    # siblings of 'query'. They were previously written inside 'query',
    # which is not where the search API documents them.
    body['from'] = offset or 0
    if limit is not None:
        body['size'] = limit

    # Only the stored 'pk' field is requested; _source is not returned.
    response = self.backend.es.search(
        index=self.backend.es_index,
        body=body,
        _source=False,
        fields='pk',
    )

    return [hit['fields']['pk'][0] for hit in response['hits']['hits']]
def _get_count(self):
    """Return the number of documents matching this result set's query.

    Sends the body built by ``self._get_query()`` to the backend's
    ``count`` call and returns the ``'count'`` entry of the response.
    """
    body = self._get_query()
    search_backend = self.backend
    response = search_backend.es.count(index=search_backend.es_index, body=body)
    return response['count']
def __getitem__(self, key):
if isinstance(key, slice):
# Get primary keys
pk_list_unclean = [result._source["pk"] for result in self.query[key]]
pk_list_unclean = self._get_results_pks(key.start, key.stop - key.start)
# Remove duplicate keys (and preserve order)
seen_pks = set()
@ -45,11 +162,11 @@ class ElasticSearchResults(object):
return results_sorted
else:
# Return a single item
pk = self.query[key]._source["pk"]
pk = self._get_results_pks(key, key + 1)[0]
return self.model.objects.get(pk=pk)
def __len__(self):
return self.count
return self._get_count()
class ElasticSearch(BaseSearch):
@ -64,22 +181,17 @@ class ElasticSearch(BaseSearch):
# Get ElasticSearch interface
# Any remaining params are passed into the ElasticSearch constructor
self.es = get_es(
self.es = Elasticsearch(
urls=self.es_urls,
timeout=self.es_timeout,
force_new=self.es_force_new,
**params)
self.s = S().es(
urls=self.es_urls,
timeout=self.es_timeout,
force_new=self.es_force_new,
**params).indexes(self.es_index)
def reset_index(self):
# Delete old index
try:
self.es.delete_index(self.es_index)
except:
self.es.indices.delete(self.es_index)
except NotFoundError:
pass
# Settings
@ -128,7 +240,7 @@ class ElasticSearch(BaseSearch):
}
# Create new index
self.es.create_index(self.es_index, INDEX_SETTINGS)
self.es.indices.create(self.es_index, INDEX_SETTINGS)
def add_type(self, model):
# Get type name
@ -144,14 +256,14 @@ class ElasticSearch(BaseSearch):
}.items() + indexed_fields.items())
# Put mapping
self.es.put_mapping(self.es_index, content_type, {
self.es.indices.put_mapping(index=self.es_index, doc_type=content_type, body={
content_type: {
"properties": fields,
}
})
def refresh_index(self):
self.es.refresh(self.es_index)
self.es.indices.refresh(self.es_index)
def add(self, obj):
# Make sure the object can be indexed
@ -165,6 +277,10 @@ class ElasticSearch(BaseSearch):
self.es.index(self.es_index, obj.indexed_get_content_type(), doc, id=doc["id"])
def add_bulk(self, obj_list):
# TODO: Make this work with new elastic search module
for obj in obj_list:
self.add(obj)
return
# Group all objects by their type
type_set = {}
for obj in obj_list:
@ -194,13 +310,14 @@ class ElasticSearch(BaseSearch):
if not isinstance(obj, Indexed) or not isinstance(obj, models.Model):
return
# Get ID for document
doc_id = obj.indexed_get_document_id()
# Delete document
try:
self.es.delete(self.es_index, obj.indexed_get_content_type(), doc_id)
except:
self.es.delete(
self.es_index,
obj.indexed_get_content_type(),
obj.indexed_get_document_id(),
)
except NotFoundError:
pass # Document doesn't exist, ignore this exception
def search(self, query_string, model, fields=None, filters={}, prefetch_related=[]):
@ -215,27 +332,5 @@ class ElasticSearch(BaseSearch):
if not query_string:
return []
# Query
if fields:
query = self.s.query_raw({
"query_string": {
"query": query_string,
"fields": fields,
}
})
else:
query = self.s.query_raw({
"query_string": {
"query": query_string,
}
})
# Filter results by this content type
query = query.filter(content_type__prefix=model.indexed_get_content_type())
# Extra filters
if filters:
query = query.filter(**filters)
# Return search results
return ElasticSearchResults(model, query, prefetch_related=prefetch_related)
return ElasticSearchResults(self, model, query_string, fields=fields, filters=filters, prefetch_related=prefetch_related)