kopia lustrzana https://dev.funkwhale.audio/funkwhale/funkwhale
See #344: query parsing
rodzic
b0c9eb8cef
commit
d713ad177b
|
@ -1,7 +1,7 @@
|
|||
import django_filters
|
||||
from django.db import models
|
||||
|
||||
from funkwhale_api.music import utils
|
||||
from . import search
|
||||
|
||||
PRIVACY_LEVEL_CHOICES = [
|
||||
("me", "Only me"),
|
||||
|
@ -34,5 +34,17 @@ class SearchFilter(django_filters.CharFilter):
|
|||
def filter(self, qs, value):
    """Restrict ``qs`` to rows matching the free-text ``value``.

    An empty/falsy ``value`` means "no search": the queryset is returned
    untouched. Otherwise the value is turned into a combined lookup over
    ``self.search_fields`` by ``search.get_query`` and applied with
    ``qs.filter``.
    """
    if not value:
        return qs
    # Only the ``search`` implementation is used: a preceding
    # ``utils.get_query`` call produced a result that was immediately
    # overwritten, so it was dropped as dead work.
    query = search.get_query(value, self.search_fields)
    return qs.filter(query)
|
||||
|
||||
|
||||
class SmartSearchFilter(django_filters.CharFilter):
    """Char filter that delegates query interpretation to a config object.

    The filter must be instantiated with a ``config`` keyword argument. That
    object is expected to expose a ``clean(value)`` method whose result is
    handed to ``search.apply`` together with the queryset.
    """

    def __init__(self, *args, **kwargs):
        # ``config`` is mandatory (KeyError if absent) and is removed from
        # kwargs before the parent constructor sees them.
        config = kwargs.pop("config")
        self.config = config
        super().__init__(*args, **kwargs)

    def filter(self, qs, value):
        """Return ``qs`` narrowed by ``value``; falsy values leave it as is."""
        if value:
            return search.apply(qs, self.config.clean(value))
        return qs
|
||||
|
|
|
@ -0,0 +1,130 @@
|
|||
import re
|
||||
|
||||
from django.db.models import Q
|
||||
|
||||
|
||||
# One query token: an optional ``key:`` prefix followed by either a
# double-quoted value (quotes included in the match) or a run of
# non-whitespace characters. Written as a raw string so that ``\w`` and ``\S``
# reach the regex engine without relying on invalid string escapes.
QUERY_REGEX = re.compile(r'(((?P<key>\w+):)?(?P<value>"[^"]+"|[\S]+))')
|
||||
|
||||
|
||||
def parse_query(query):
    """
    Given a search query such as "hello is:issue status:opened",
    return a list of dictionaries describing each query token.

    The query is lowercased before matching. Each dictionary has a "key"
    entry (None when the token has no ``key:`` prefix) and a "value" entry;
    surrounding double quotes are removed from the value.
    """
    tokens = []
    for match in QUERY_REGEX.finditer(query.lower()):
        token = match.groupdict()
        raw_value = token["value"]
        if raw_value.startswith('"') and raw_value.endswith('"'):
            # Strip the enclosing quotes, keep the inner text as-is.
            token["value"] = raw_value[1:-1]
        tokens.append(token)
    return tokens
|
||||
|
||||
|
||||
def normalize_query(
    query_string,
    findterms=re.compile(r'"([^"]+)"|(\S+)').findall,
    normspace=re.compile(r"\s{2,}").sub,
):
    """Split the query string into individual keywords.

    Double-quoted groups are kept together as a single keyword, leading and
    trailing whitespace is stripped from each keyword, and any run of two or
    more whitespace characters inside a keyword is collapsed to one space.

    Example:

    >>> normalize_query('  some random  words "with   quotes  " and   spaces')
    ['some', 'random', 'words', 'with quotes', 'and', 'spaces']

    """
    keywords = []
    for groups in findterms(query_string):
        # groups[0] is the quoted capture, groups[1] the bare word; exactly
        # one of them is non-empty for any given match.
        keyword = (groups[0] or groups[1]).strip()
        keywords.append(normspace(" ", keyword))
    return keywords
|
||||
|
||||
|
||||
def get_query(query_string, search_fields):
    """Build a combination of Q objects matching every keyword of the query.

    The query string is split with ``normalize_query``. For each keyword, an
    ``icontains`` lookup is created per search field and the lookups are ORed
    together; the per-keyword groups are then ANDed. Returns None when no
    lookup was built (no keywords, or no search fields).
    """
    combined = None  # AND of the per-keyword groups
    for keyword in normalize_query(query_string):
        any_field = None  # OR of this keyword across every field
        for field_name in search_fields:
            lookup = Q(**{"%s__icontains" % field_name: keyword})
            any_field = lookup if any_field is None else any_field | lookup
        combined = any_field if combined is None else combined & any_field
    return combined
|
||||
|
||||
|
||||
def filter_tokens(tokens, valid):
    """Return the tokens whose "key" entry is contained in ``valid``.

    Input order is preserved.
    """
    kept = []
    for token in tokens:
        if token["key"] in valid:
            kept.append(token)
    return kept
|
||||
|
||||
|
||||
def apply(qs, config_data):
    """Apply the cleaned queries found in ``config_data`` to ``qs``.

    The "filter_query" entry is applied first, then "search_query". Entries
    that are missing or falsy are skipped. Returns the resulting queryset.
    """
    narrowed = qs
    for name in ("filter_query", "search_query"):
        condition = config_data.get(name)
        if not condition:
            continue
        narrowed = narrowed.filter(condition)
    return narrowed
|
||||
|
||||
|
||||
class SearchConfig:
    """Describe how a textual query is turned into types and Q objects.

    Attributes:
        search_fields: mapping of public field name -> ``{"to": lookup_path}``
            used for free-text search and for ``in:<name>[,<name>...]``.
        filter_fields: mapping of token key -> ``{"to": lookup}`` used for
            ``key:value`` tokens.
        types: sequence of ``(key, type)`` pairs selectable with ``is:<key>``.
    """

    def __init__(self, search_fields=None, filter_fields=None, types=None):
        # None sentinels rather than mutable ``{}`` / ``[]`` defaults, so that
        # instances created without arguments never share (and cannot
        # accidentally mutate) the same containers.
        self.filter_fields = {} if filter_fields is None else filter_fields
        self.search_fields = {} if search_fields is None else search_fields
        self.types = [] if types is None else types

    def clean(self, query):
        """Parse ``query`` and return a dict of cleaned data.

        The returned dict has three keys: "types" (list), "search_query" and
        "filter_query" (each a Q object or None).
        """
        tokens = parse_query(query)
        cleaned_data = {}

        cleaned_data["types"] = self.clean_types(filter_tokens(tokens, ["is"]))
        cleaned_data["search_query"] = self.clean_search_query(
            filter_tokens(tokens, [None, "in"])
        )
        # Everything that is neither free text, ``is:`` nor ``in:`` is a
        # candidate filter token.
        unhandled_tokens = [t for t in tokens if t["key"] not in [None, "is", "in"]]
        cleaned_data["filter_query"] = self.clean_filter_query(unhandled_tokens)
        return cleaned_data

    def clean_search_query(self, tokens):
        """Build the free-text query from un-keyed and ``in:`` tokens.

        Returns None when no search fields are configured or no tokens are
        given; otherwise returns whatever ``get_query`` builds.
        """
        if not self.search_fields or not tokens:
            return None

        known = set(self.search_fields)
        requested = {
            name
            for t in filter_tokens(tokens, ["in"])
            for name in t["value"].split(",")
        }
        # Without any ``in:`` token every configured field is searched;
        # requested names that are not configured are ignored.
        selected = known & requested if requested else known
        to_fields = [self.search_fields[name]["to"] for name in selected]
        query_string = " ".join(t["value"] for t in filter_tokens(tokens, [None]))
        # Sorted so the generated query does not depend on set ordering.
        return get_query(query_string, sorted(to_fields))

    def clean_filter_query(self, tokens):
        """AND together one Q per token whose key is a configured filter.

        Tokens with an unknown key are ignored. Returns None when no filter
        fields are configured, no tokens are given, or none of them match.
        """
        if not self.filter_fields or not tokens:
            return None

        query = None
        for token in tokens:
            if token["key"] not in self.filter_fields:
                continue
            q = Q(**{self.filter_fields[token["key"]]["to"]: token["value"]})
            query = q if query is None else query & q
        return query

    def clean_types(self, tokens):
        """Return the types selected by ``is:`` tokens.

        With no configured types the result is an empty list; with no tokens
        every configured type is returned. Keys are compared lowercased
        against the token value.
        """
        if not self.types:
            return []

        if not tokens:
            # no filtering on type, we return all types
            return [t for key, t in self.types]

        types = []
        for token in tokens:
            for key, t in self.types:
                if key.lower() == token["value"]:
                    types.append(t)
        return types
|
|
@ -1,47 +1,9 @@
|
|||
import mimetypes
|
||||
import re
|
||||
|
||||
import magic
|
||||
import mutagen
|
||||
from django.db.models import Q
|
||||
|
||||
|
||||
def normalize_query(
    query_string,
    findterms=re.compile(r'"([^"]+)"|(\S+)').findall,
    normspace=re.compile(r"\s{2,}").sub,
):
    """Split the query string into individual keywords.

    Double-quoted groups are kept together as a single keyword, leading and
    trailing whitespace is stripped from each keyword, and any run of two or
    more whitespace characters inside a keyword is collapsed to one space.

    Example:

    >>> normalize_query('  some random  words "with   quotes  " and   spaces')
    ['some', 'random', 'words', 'with quotes', 'and', 'spaces']

    """
    keywords = []
    for groups in findterms(query_string):
        # groups[0] is the quoted capture, groups[1] the bare word.
        keyword = (groups[0] or groups[1]).strip()
        keywords.append(normspace(" ", keyword))
    return keywords
|
||||
|
||||
|
||||
def get_query(query_string, search_fields):
    """Build a combination of Q objects matching every keyword of the query.

    The query string is split with ``normalize_query``. For each keyword, an
    ``icontains`` lookup is created per search field and the lookups are ORed
    together; the per-keyword groups are then ANDed. Returns None when no
    lookup was built (no keywords, or no search fields).
    """
    combined = None  # AND of the per-keyword groups
    for keyword in normalize_query(query_string):
        any_field = None  # OR of this keyword across every field
        for field_name in search_fields:
            lookup = Q(**{"%s__icontains" % field_name: keyword})
            any_field = lookup if any_field is None else any_field | lookup
        combined = any_field if combined is None else combined & any_field
    return combined
|
||||
from funkwhale_api.common.search import normalize_query, get_query # noqa
|
||||
|
||||
|
||||
def guess_mimetype(f):
|
||||
|
|
|
@ -0,0 +1,83 @@
|
|||
import pytest
|
||||
|
||||
from django.db.models import Q
|
||||
|
||||
from funkwhale_api.common import search
|
||||
from funkwhale_api.music import models as music_models
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "query,expected",
    [
        ("", [music_models.Album, music_models.Artist]),
        ("is:album", [music_models.Album]),
        ("is:artist is:album", [music_models.Artist, music_models.Album]),
    ],
)
def test_search_config_is(query, expected):
    """``is:<key>`` tokens select types; no token selects all of them."""
    known_types = [("album", music_models.Album), ("artist", music_models.Artist)]
    config = search.SearchConfig(types=known_types)

    result = config.clean(query)

    assert result["types"] == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "query,expected",
    [
        ("", None),
        ("hello world", search.get_query("hello world", ["f1", "f2", "f3"])),
        ("hello in:field2", search.get_query("hello", ["f2"])),
        ("hello in:field1,field2", search.get_query("hello", ["f1", "f2"])),
    ],
)
def test_search_config_query(query, expected):
    """Free text searches all fields unless ``in:`` restricts them."""
    fields = {
        "field1": {"to": "f1"},
        "field2": {"to": "f2"},
        "field3": {"to": "f3"},
    }
    config = search.SearchConfig(search_fields=fields)

    result = config.clean(query)

    assert result["search_query"] == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "query,expected",
    [
        ("", None),
        ("status:pending", Q(status="pending")),
        ('user:"silent bob"', Q(user__username__iexact="silent bob")),
        (
            "user:me status:pending",
            Q(user__username__iexact="me") & Q(status="pending"),
        ),
    ],
)
def test_search_config_filter(query, expected):
    """``key:value`` tokens become Q lookups on the configured targets."""
    filters = {
        "user": {"to": "user__username__iexact"},
        "status": {"to": "status"},
    }
    config = search.SearchConfig(filter_fields=filters)

    result = config.clean(query)

    assert result["filter_query"] == expected
|
||||
|
||||
|
||||
def test_apply():
    """``search.apply`` filters with both the filter and the search query."""
    cleaned = {
        "filter_query": Q(batch__submitted_by__username__iexact="me"),
        "search_query": Q(source="test"),
    }
    base = music_models.ImportJob.objects.all()

    result = search.apply(base, cleaned)

    # Compare the generated SQL rather than evaluating either queryset.
    expected = music_models.ImportJob.objects.filter(
        Q(batch__submitted_by__username__iexact="me"), Q(source="test")
    )
    assert str(result.query) == str(expected.query)
|
Ładowanie…
Reference in New Issue