Remove Elasticsearch6Mapping and make Elasticsearch7Mapping the base implementation

pull/11138/head
Matt Westcott 2023-10-24 17:34:20 +01:00
rodzic 7cc955048c
commit 098a4086b7
2 zmienionych plików z 276 dodań i 292 usunięć

Wyświetl plik

@ -1,283 +0,0 @@
from django.db import models
from wagtail.search.backends.base import (
get_model_root,
)
from wagtail.search.index import (
AutocompleteField,
FilterField,
Indexed,
RelatedFields,
SearchField,
)
class Field:
    """A field name together with a boost value (the boost defaults to 1)."""

    def __init__(self, field_name, boost=1):
        # Plain value holder: both arguments are stored unchanged.
        self.field_name, self.boost = field_name, boost
class Elasticsearch6Mapping:
    """
    Describes how one indexed model is represented in Elasticsearch.

    Wraps a model class and produces both the index mapping for that model
    (get_mapping) and the document to index for an instance (get_document).
    """

    # Catch-all text field; searchable fields are copied into it with "copy_to"
    all_field_name = "_all_text"
    # Document field holding the values of every autocomplete field
    edgengrams_field_name = "_edgengrams"

    # Maps the type name reported by field.get_type() to a mapping type.
    # "string" is a placeholder that get_field_mapping() replaces with
    # text_type or keyword_type for search, autocomplete and filter fields.
    type_map = {
        "AutoField": "integer",
        "BinaryField": "binary",
        "BooleanField": "boolean",
        "CharField": "string",
        "CommaSeparatedIntegerField": "string",
        "DateField": "date",
        "DateTimeField": "date",
        "DecimalField": "double",
        "FileField": "string",
        "FilePathField": "string",
        "FloatField": "double",
        "IntegerField": "integer",
        "BigIntegerField": "long",
        "IPAddressField": "string",
        "GenericIPAddressField": "string",
        "NullBooleanField": "boolean",
        "PositiveIntegerField": "integer",
        "PositiveSmallIntegerField": "integer",
        "SlugField": "string",
        "SmallIntegerField": "integer",
        "TextField": "string",
        "TimeField": "date",
    }

    # Concrete Elasticsearch types substituted for the "string" placeholder
    keyword_type = "keyword"
    text_type = "text"

    # Analyzer settings merged into the mapping of every edge-ngram field
    edgengram_analyzer_config = {
        "analyzer": "edgengram_analyzer",
        "search_analyzer": "standard",
    }

    def __init__(self, model):
        self.model = model

    def get_parent(self):
        """
        Return a mapping (of the same class as this one) for the first direct
        base of the model that is both Indexed and a Django model, or None
        when no base qualifies.
        """
        indexed_bases = (
            base
            for base in self.model.__bases__
            if issubclass(base, Indexed) and issubclass(base, models.Model)
        )
        parent_model = next(indexed_bases, None)
        return None if parent_model is None else type(self)(parent_model)

    def get_document_type(self):
        """Return the document type name, which is always "doc"."""
        return "doc"

    def get_field_column_name(self, field):
        """
        Return the name under which ``field`` is stored in the index.

        Fields defined on a model other than the root model are prefixed with
        "<app_label>_<model name>__" (lower-cased) so that two derived models
        can declare a field with the same name but a different type without
        their mappings clashing. Fields of the root model get no prefix.

        Returns None for a field that is none of the known field kinds.
        """
        root_model = get_model_root(self.model)
        definition_model = field.get_definition_model(self.model)

        prefix = (
            f"{definition_model._meta.app_label.lower()}"
            f"_{definition_model.__name__.lower()}__"
            if definition_model != root_model
            else ""
        )

        # Checked in this order; the first matching kind decides the suffix
        suffix_by_kind = (
            (FilterField, "_filter"),
            (AutocompleteField, "_edgengrams"),
            (SearchField, ""),
        )
        for field_kind, suffix in suffix_by_kind:
            if isinstance(field, field_kind):
                return prefix + field.get_attname(self.model) + suffix

        if isinstance(field, RelatedFields):
            return prefix + field.field_name

        return None

    def get_boost_field_name(self, boost):
        """
        Return the name of the catch-all field for the given boost value,
        e.g. "_all_text_boost_2_0" for a boost of 2.
        """
        # Normalise through float and swap "." for "_", because a "." inside
        # a field name causes problems
        suffix = str(float(boost)).replace(".", "_")
        return f"{self.all_field_name}_boost_{suffix}"

    def get_content_type(self):
        """
        Returns the content type as a string for the model.

        For example: "wagtailcore.Page"
                     "myapp.MyModel"
        """
        model = self.model
        return f"{model._meta.app_label}.{model.__name__}"

    def get_all_content_types(self):
        """
        Returns all the content type strings that apply to this model.
        This includes the model's own content type followed by those of the
        ancestors reachable through get_parent().

        For example: ["myapp.MyPageModel", "wagtailcore.Page"]
                     ["myapp.MyModel"]
        """
        content_types = []
        mapping = self

        # Start with this model, then walk up the chain of parent mappings
        while True:
            content_types.append(mapping.get_content_type())
            mapping = mapping.get_parent()
            if not mapping:
                break

        return content_types

    def get_field_mapping(self, field):
        """
        Return a ``(column name, mapping dict)`` pair for a single field.

        RelatedFields become a "nested" mapping whose properties are built
        from the related model. Other fields start from type_map, and the
        "string" placeholder is replaced by text_type or keyword_type. Search
        fields are flagged with "include_in_all" (and "boost" when set);
        get_mapping() later rewrites those flags into "copy_to" targets.
        Entries in the field's "es_extra" kwarg are applied last, so they
        override anything set before.
        """
        if isinstance(field, RelatedFields):
            related_model = field.get_field(self.model).related_model
            related_mapping = type(self)(related_model)
            nested = {
                "type": "nested",
                "properties": dict(
                    related_mapping.get_field_mapping(child) for child in field.fields
                ),
            }
            return self.get_field_column_name(field), nested

        mapping = {"type": self.type_map.get(field.get_type(self.model), "string")}

        if isinstance(field, SearchField):
            if mapping["type"] == "string":
                mapping["type"] = self.text_type

            boost = field.boost
            if boost:
                mapping["boost"] = boost

            mapping["include_in_all"] = True

        if isinstance(field, AutocompleteField):
            # Autocomplete fields are always text, analysed with edge n-grams
            mapping["type"] = self.text_type
            mapping.update(self.edgengram_analyzer_config)
        elif isinstance(field, FilterField):
            if mapping["type"] == "string":
                mapping["type"] = self.keyword_type

        if "es_extra" in field.kwargs:
            mapping.update(field.kwargs["es_extra"].items())

        return self.get_field_column_name(field), mapping

    def get_mapping(self):
        """
        Build the complete mapping for the model.

        The result has the form ``{<document type>: {"properties": {...}}}``.
        The properties contain the fixed "pk", "content_type" and edge-ngram
        fields, one entry per search field of the model, the catch-all text
        field, and one extra catch-all field per distinct boost value.
        """
        properties = {
            "pk": {"type": self.keyword_type, "store": True},
            "content_type": {"type": self.keyword_type},
            self.edgengrams_field_name: {
                "type": self.text_type,
                **self.edgengram_analyzer_config,
            },
        }

        properties.update(
            self.get_field_mapping(field) for field in self.model.get_search_fields()
        )

        # Add _all_text field
        properties[self.all_field_name] = {"type": "text"}

        # A set, so each boost yields one field and 2 and 2.0 are not duplicated
        boosts = set()

        # Replace {"include_in_all": true} with
        # {"copy_to": ["_all_text", "_all_text_boost_2"]}
        def convert_include_in_all(field_mappings):
            for field_mapping in field_mappings.values():
                # The flag is always removed; it is only acted on when truthy
                if field_mapping.pop("include_in_all", False):
                    if "boost" in field_mapping:
                        boost = field_mapping["boost"]
                        boosts.add(boost)
                        field_mapping["copy_to"] = [
                            self.all_field_name,
                            self.get_boost_field_name(boost),
                        ]
                        del field_mapping["boost"]
                    else:
                        field_mapping["copy_to"] = self.all_field_name

                if field_mapping["type"] == "nested":
                    convert_include_in_all(field_mapping["properties"])

        convert_include_in_all(properties)

        for boost in boosts:
            properties[self.get_boost_field_name(boost)] = {"type": "text"}

        return {self.get_document_type(): {"properties": properties}}

    def get_document_id(self, obj):
        """Return the document id for ``obj``: its primary key as a string."""
        return str(obj.pk)

    def _get_nested_document(self, fields, obj):
        """
        Build the nested document for a related object.

        Returns ``(document, edgengrams)``: a dict of column name -> value
        for each of ``fields`` (column names come from a mapping of the
        object's own type), and the list of values of the autocomplete
        fields among them.
        """
        nested_mapping = type(self)(type(obj))
        document = {}
        partials = []

        for nested_field in fields:
            field_value = nested_field.get_value(obj)
            document[nested_mapping.get_field_column_name(nested_field)] = field_value

            # Autocomplete values are also collected for _edgengrams
            if isinstance(nested_field, AutocompleteField):
                partials.append(field_value)

        return document, partials

    def get_document(self, obj):
        """
        Build the document to index for ``obj``.

        The document holds "pk", the list of all applicable content types,
        one entry per search field of the model, and the edge-ngram field
        with the values of all autocomplete fields (including those found in
        nested documents).
        """
        document = {"pk": str(obj.pk), "content_type": self.get_all_content_types()}
        partials = []

        for field in self.model.get_search_fields():
            value = field.get_value(obj)

            if isinstance(field, RelatedFields):
                if isinstance(value, (models.Manager, models.QuerySet)):
                    # To-many relation: one nested document per related object
                    related_documents = []
                    for related_obj in value.all():
                        related_document, related_partials = (
                            self._get_nested_document(field.fields, related_obj)
                        )
                        related_documents.append(related_document)
                        partials.extend(related_partials)
                    value = related_documents
                elif isinstance(value, models.Model):
                    # To-one relation: a single nested document
                    value, related_partials = self._get_nested_document(
                        field.fields, value
                    )
                    partials.extend(related_partials)
            elif isinstance(field, FilterField):
                # Filter values that are model objects are reduced to their pks
                if isinstance(value, (models.Manager, models.QuerySet)):
                    value = list(value.values_list("pk", flat=True))
                elif isinstance(value, models.Model):
                    value = value.pk
                elif isinstance(value, (list, tuple)):
                    value = [
                        entry.pk if isinstance(entry, models.Model) else entry
                        for entry in value
                    ]

            document[self.get_field_column_name(field)] = value

            # Check if this field should be added into _edgengrams
            if isinstance(field, AutocompleteField):
                partials.append(value)

        # Add partials to document
        document[self.edgengrams_field_name] = partials

        return document

    def __repr__(self):
        return "<ElasticsearchMapping: {}>".format(self.model.__name__)

Wyświetl plik

@ -3,7 +3,7 @@ from collections import OrderedDict
from copy import deepcopy
from urllib.parse import urlparse
from django.db import DEFAULT_DB_ALIAS
from django.db import DEFAULT_DB_ALIAS, models
from django.db.models.sql import Query
from django.db.models.sql.constants import MULTI
from django.utils.crypto import get_random_string
@ -18,21 +18,288 @@ from wagtail.search.backends.base import (
FilterFieldError,
get_model_root,
)
from wagtail.search.backends.elasticsearch6 import (
Elasticsearch6Mapping,
Field,
from wagtail.search.index import (
AutocompleteField,
FilterField,
Indexed,
RelatedFields,
SearchField,
class_is_indexed,
get_indexed_models,
)
from wagtail.search.index import class_is_indexed, get_indexed_models
from wagtail.search.query import And, Boost, Fuzzy, MatchAll, Not, Or, Phrase, PlainText
from wagtail.utils.utils import deep_update
# True when ELASTICSEARCH_VERSION is at least (7, 15).
# NOTE(review): presumably selects the newer client API available from the
# 7.15 Elasticsearch client onwards - confirm against the code reading this flag.
use_new_elasticsearch_api = ELASTICSEARCH_VERSION >= (7, 15)
class Elasticsearch7Mapping(Elasticsearch6Mapping):
class Field:
    """A field name together with a boost value (the boost defaults to 1)."""

    def __init__(self, field_name, boost=1):
        # Plain value holder: both arguments are stored unchanged.
        self.field_name, self.boost = field_name, boost
class Elasticsearch7Mapping:
    """
    Describes how one indexed model is represented in Elasticsearch.

    Wraps a model class and produces both the index mapping for that model
    (get_mapping) and the document to index for an instance (get_document).
    This is a standalone base implementation: it does not inherit any
    behaviour from another mapping class.
    """

    # Catch-all text field; searchable fields are copied into it with "copy_to"
    all_field_name = "_all_text"
    # Document field holding the values of every autocomplete field
    edgengrams_field_name = "_edgengrams"

    # Maps the type name reported by field.get_type() to a mapping type.
    # "string" is a placeholder that get_field_mapping() replaces with
    # text_type or keyword_type for search, autocomplete and filter fields.
    type_map = {
        "AutoField": "integer",
        "BinaryField": "binary",
        "BooleanField": "boolean",
        "CharField": "string",
        "CommaSeparatedIntegerField": "string",
        "DateField": "date",
        "DateTimeField": "date",
        "DecimalField": "double",
        "FileField": "string",
        "FilePathField": "string",
        "FloatField": "double",
        "IntegerField": "integer",
        "BigIntegerField": "long",
        "IPAddressField": "string",
        "GenericIPAddressField": "string",
        "NullBooleanField": "boolean",
        "PositiveIntegerField": "integer",
        "PositiveSmallIntegerField": "integer",
        "SlugField": "string",
        "SmallIntegerField": "integer",
        "TextField": "string",
        "TimeField": "date",
    }

    # Concrete Elasticsearch types substituted for the "string" placeholder
    keyword_type = "keyword"
    text_type = "text"

    # Analyzer settings merged into the mapping of every edge-ngram field
    edgengram_analyzer_config = {
        "analyzer": "edgengram_analyzer",
        "search_analyzer": "standard",
    }

    def __init__(self, model):
        self.model = model

    def get_parent(self):
        """
        Return a mapping (of the same class as this one) for the first direct
        base of the model that is both Indexed and a Django model, or None
        when no base qualifies.
        """
        indexed_bases = (
            base
            for base in self.model.__bases__
            if issubclass(base, Indexed) and issubclass(base, models.Model)
        )
        parent_model = next(indexed_bases, None)
        return None if parent_model is None else type(self)(parent_model)

    def get_document_type(self):
        """
        Return the document type name, which is always "doc".

        get_mapping() does not key its result by this name.
        """
        return "doc"

    def get_field_column_name(self, field):
        """
        Return the name under which ``field`` is stored in the index.

        Fields defined on a model other than the root model are prefixed with
        "<app_label>_<model name>__" (lower-cased) so that two derived models
        can declare a field with the same name but a different type without
        their mappings clashing. Fields of the root model get no prefix.

        Returns None for a field that is none of the known field kinds.
        """
        root_model = get_model_root(self.model)
        definition_model = field.get_definition_model(self.model)

        prefix = (
            f"{definition_model._meta.app_label.lower()}"
            f"_{definition_model.__name__.lower()}__"
            if definition_model != root_model
            else ""
        )

        # Checked in this order; the first matching kind decides the suffix
        suffix_by_kind = (
            (FilterField, "_filter"),
            (AutocompleteField, "_edgengrams"),
            (SearchField, ""),
        )
        for field_kind, suffix in suffix_by_kind:
            if isinstance(field, field_kind):
                return prefix + field.get_attname(self.model) + suffix

        if isinstance(field, RelatedFields):
            return prefix + field.field_name

        return None

    def get_boost_field_name(self, boost):
        """
        Return the name of the catch-all field for the given boost value,
        e.g. "_all_text_boost_2_0" for a boost of 2.
        """
        # Normalise through float and swap "." for "_", because a "." inside
        # a field name causes problems
        suffix = str(float(boost)).replace(".", "_")
        return f"{self.all_field_name}_boost_{suffix}"

    def get_content_type(self):
        """
        Returns the content type as a string for the model.

        For example: "wagtailcore.Page"
                     "myapp.MyModel"
        """
        model = self.model
        return f"{model._meta.app_label}.{model.__name__}"

    def get_all_content_types(self):
        """
        Returns all the content type strings that apply to this model.
        This includes the model's own content type followed by those of the
        ancestors reachable through get_parent().

        For example: ["myapp.MyPageModel", "wagtailcore.Page"]
                     ["myapp.MyModel"]
        """
        content_types = []
        mapping = self

        # Start with this model, then walk up the chain of parent mappings
        while True:
            content_types.append(mapping.get_content_type())
            mapping = mapping.get_parent()
            if not mapping:
                break

        return content_types

    def get_field_mapping(self, field):
        """
        Return a ``(column name, mapping dict)`` pair for a single field.

        RelatedFields become a "nested" mapping whose properties are built
        from the related model. Other fields start from type_map, and the
        "string" placeholder is replaced by text_type or keyword_type. Search
        fields are flagged with "include_in_all" (and "boost" when set);
        get_mapping() later rewrites those flags into "copy_to" targets.
        Entries in the field's "es_extra" kwarg are applied last, so they
        override anything set before.
        """
        if isinstance(field, RelatedFields):
            related_model = field.get_field(self.model).related_model
            related_mapping = type(self)(related_model)
            nested = {
                "type": "nested",
                "properties": dict(
                    related_mapping.get_field_mapping(child) for child in field.fields
                ),
            }
            return self.get_field_column_name(field), nested

        mapping = {"type": self.type_map.get(field.get_type(self.model), "string")}

        if isinstance(field, SearchField):
            if mapping["type"] == "string":
                mapping["type"] = self.text_type

            boost = field.boost
            if boost:
                mapping["boost"] = boost

            mapping["include_in_all"] = True

        if isinstance(field, AutocompleteField):
            # Autocomplete fields are always text, analysed with edge n-grams
            mapping["type"] = self.text_type
            mapping.update(self.edgengram_analyzer_config)
        elif isinstance(field, FilterField):
            if mapping["type"] == "string":
                mapping["type"] = self.keyword_type

        if "es_extra" in field.kwargs:
            mapping.update(field.kwargs["es_extra"].items())

        return self.get_field_column_name(field), mapping

    def get_mapping(self):
        """
        Build the complete mapping for the model.

        The result has the form ``{"properties": {...}}``; it is not wrapped
        in a document type key. The properties contain the fixed "pk",
        "content_type" and edge-ngram fields, one entry per search field of
        the model, the catch-all text field, and one extra catch-all field
        per distinct boost value.
        """
        # The mapping is built here from scratch: this class has no parent
        # implementation of get_mapping() to delegate to.
        properties = {
            "pk": {"type": self.keyword_type, "store": True},
            "content_type": {"type": self.keyword_type},
            self.edgengrams_field_name: {
                "type": self.text_type,
                **self.edgengram_analyzer_config,
            },
        }

        properties.update(
            self.get_field_mapping(field) for field in self.model.get_search_fields()
        )

        # Add _all_text field
        properties[self.all_field_name] = {"type": "text"}

        # A set, so each boost yields one field and 2 and 2.0 are not duplicated
        boosts = set()

        # Replace {"include_in_all": true} with
        # {"copy_to": ["_all_text", "_all_text_boost_2"]}
        def convert_include_in_all(field_mappings):
            for field_mapping in field_mappings.values():
                # The flag is always removed; it is only acted on when truthy
                if field_mapping.pop("include_in_all", False):
                    if "boost" in field_mapping:
                        boost = field_mapping["boost"]
                        boosts.add(boost)
                        field_mapping["copy_to"] = [
                            self.all_field_name,
                            self.get_boost_field_name(boost),
                        ]
                        del field_mapping["boost"]
                    else:
                        field_mapping["copy_to"] = self.all_field_name

                if field_mapping["type"] == "nested":
                    convert_include_in_all(field_mapping["properties"])

        convert_include_in_all(properties)

        for boost in boosts:
            properties[self.get_boost_field_name(boost)] = {"type": "text"}

        return {"properties": properties}

    def get_document_id(self, obj):
        """Return the document id for ``obj``: its primary key as a string."""
        return str(obj.pk)

    def _get_nested_document(self, fields, obj):
        """
        Build the nested document for a related object.

        Returns ``(document, edgengrams)``: a dict of column name -> value
        for each of ``fields`` (column names come from a mapping of the
        object's own type), and the list of values of the autocomplete
        fields among them.
        """
        nested_mapping = type(self)(type(obj))
        document = {}
        partials = []

        for nested_field in fields:
            field_value = nested_field.get_value(obj)
            document[nested_mapping.get_field_column_name(nested_field)] = field_value

            # Autocomplete values are also collected for _edgengrams
            if isinstance(nested_field, AutocompleteField):
                partials.append(field_value)

        return document, partials

    def get_document(self, obj):
        """
        Build the document to index for ``obj``.

        The document holds "pk", the list of all applicable content types,
        one entry per search field of the model, and the edge-ngram field
        with the values of all autocomplete fields (including those found in
        nested documents).
        """
        document = {"pk": str(obj.pk), "content_type": self.get_all_content_types()}
        partials = []

        for field in self.model.get_search_fields():
            value = field.get_value(obj)

            if isinstance(field, RelatedFields):
                if isinstance(value, (models.Manager, models.QuerySet)):
                    # To-many relation: one nested document per related object
                    related_documents = []
                    for related_obj in value.all():
                        related_document, related_partials = (
                            self._get_nested_document(field.fields, related_obj)
                        )
                        related_documents.append(related_document)
                        partials.extend(related_partials)
                    value = related_documents
                elif isinstance(value, models.Model):
                    # To-one relation: a single nested document
                    value, related_partials = self._get_nested_document(
                        field.fields, value
                    )
                    partials.extend(related_partials)
            elif isinstance(field, FilterField):
                # Filter values that are model objects are reduced to their pks
                if isinstance(value, (models.Manager, models.QuerySet)):
                    value = list(value.values_list("pk", flat=True))
                elif isinstance(value, models.Model):
                    value = value.pk
                elif isinstance(value, (list, tuple)):
                    value = [
                        entry.pk if isinstance(entry, models.Model) else entry
                        for entry in value
                    ]

            document[self.get_field_column_name(field)] = value

            # Check if this field should be added into _edgengrams
            if isinstance(field, AutocompleteField):
                partials.append(value)

        # Add partials to document
        document[self.edgengrams_field_name] = partials

        return document

    def __repr__(self):
        return "<ElasticsearchMapping: {}>".format(self.model.__name__)
class Elasticsearch7Index:
@ -464,7 +731,7 @@ class Elasticsearch7SearchQueryCompiler(BaseSearchQueryCompiler):
def get_content_type_filter(self):
    """
    Return the query clause that restricts results to the queryset's model.

    The clause is a "match" query on the "content_type" field. See
    Elasticsearch7Mapping.get_document, which stores the list of all content
    types that apply to an object under that field - presumably so that
    matching one content type string also selects objects of derived models.
    """
    mapping = self.mapping_class(self.queryset.model)
    return {"match": {"content_type": mapping.get_content_type()}}