Extract facet code out into a new plugin hook, closes #427 (#445)

Datasette previously only supported one type of faceting: exact column value counting.

With this change, faceting logic is extracted out into one or more separate classes which can implement other patterns of faceting - this is discussed in #427, but potential upcoming facet types include facet-by-date, facet-by-JSON-array, facet-by-many-2-many and more.

A new plugin hook, register_facet_classes, can be used by plugins to add in additional facet classes.

Each class must implement two methods: suggest(), which scans columns in the table to decide if they might be worth suggesting for faceting, and facet_results(), which executes the facet operation and returns results ready to be displayed in the UI.
pull/434/head^2
Simon Willison 2019-05-02 17:11:26 -07:00 zatwierdzone przez GitHub
rodzic efc93b8ab5
commit ea66c45df9
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 4AEE18F83AFDEB23
10 zmienionych plików z 599 dodań i 131 usunięć

Wyświetl plik

@ -676,6 +676,7 @@ class Datasette:
truncate=False,
custom_time_limit=None,
page_size=None,
log_sql_errors=True,
):
"""Executes sql against db_name in a thread"""
page_size = page_size or self.page_size
@ -701,12 +702,13 @@ class Datasette:
truncated = False
except sqlite3.OperationalError as e:
if e.args == ('interrupted',):
raise InterruptedError(e)
print(
"ERROR: conn={}, sql = {}, params = {}: {}".format(
conn, repr(sql), params, e
raise InterruptedError(e, sql, params)
if log_sql_errors:
print(
"ERROR: conn={}, sql = {}, params = {}: {}".format(
conn, repr(sql), params, e
)
)
)
raise
if truncate:

251
datasette/facets.py 100644
Wyświetl plik

@ -0,0 +1,251 @@
import json
import urllib
import re
from datasette import hookimpl
from datasette.utils import (
escape_sqlite,
get_all_foreign_keys,
path_with_added_args,
path_with_removed_args,
detect_json1,
InterruptedError,
InvalidSql,
sqlite3,
)
def load_facet_configs(request, table_metadata):
# Given a request and the metadata configuration for a table, return
# a dictionary of selected facets, their lists of configs and for each
# config whether it came from the request or the metadata.
#
# return {type: [
# {"source": "metadata", "config": config1},
# {"source": "request", "config": config2}]}
facet_configs = {}
table_metadata = table_metadata or {}
metadata_facets = table_metadata.get("facets", [])
for metadata_config in metadata_facets:
if isinstance(metadata_config, str):
type = "column"
metadata_config = {"simple": metadata_config}
else:
# This should have a single key and a single value
assert len(metadata_config.values()) == 1, "Metadata config dicts should be {type: config}"
type, metadata_config = metadata_config.items()[0]
if isinstance(metadata_config, str):
metadata_config = {"simple": metadata_config}
facet_configs.setdefault(type, []).append({
"source": "metadata",
"config": metadata_config
})
qs_pairs = urllib.parse.parse_qs(request.query_string, keep_blank_values=True)
for key, values in qs_pairs.items():
if key.startswith("_facet"):
# Figure out the facet type
if key == "_facet":
type = "column"
elif key.startswith("_facet_"):
type = key[len("_facet_") :]
for value in values:
# The value is the config - either JSON or not
if value.startswith("{"):
config = json.loads(value)
else:
config = {"simple": value}
facet_configs.setdefault(type, []).append({
"source": "request",
"config": config
})
return facet_configs
@hookimpl
def register_facet_classes():
return [ColumnFacet]
class Facet:
type = None
def __init__(
self,
ds,
request,
database,
sql=None,
table=None,
params=None,
metadata=None,
row_count=None,
):
assert table or sql, "Must provide either table= or sql="
self.ds = ds
self.request = request
self.database = database
# For foreign key expansion. Can be None for e.g. canned SQL queries:
self.table = table
self.sql = sql or "select * from [{}]".format(table)
self.params = params or []
self.metadata = metadata
# row_count can be None, in which case we calculate it ourselves:
self.row_count = row_count
def get_configs(self):
configs = load_facet_configs(self.request, self.metadata)
return configs.get(self.type) or []
def get_querystring_pairs(self):
# ?_foo=bar&_foo=2&empty= becomes:
# [('_foo', 'bar'), ('_foo', '2'), ('empty', '')]
return urllib.parse.parse_qsl(self.request.query_string, keep_blank_values=True)
async def suggest(self):
return []
async def facet_results(self):
# returns ([results], [timed_out])
# TODO: Include "hideable" with each one somehow, which indicates if it was
# defined in metadata (in which case you cannot turn it off)
raise NotImplementedError
async def get_columns(self, sql, params=None):
# Detect column names using the "limit 0" trick
return (
await self.ds.execute(
self.database, "select * from ({}) limit 0".format(sql), params or []
)
).columns
async def get_row_count(self):
if self.row_count is None:
self.row_count = (
await self.ds.execute(
self.database,
"select count(*) from ({})".format(self.sql),
self.params,
)
).rows[0][0]
return self.row_count
class ColumnFacet(Facet):
type = "column"
async def suggest(self):
row_count = await self.get_row_count()
columns = await self.get_columns(self.sql, self.params)
facet_size = self.ds.config("default_facet_size")
suggested_facets = []
already_enabled = [c["config"]["simple"] for c in self.get_configs()]
for column in columns:
if column in already_enabled:
continue
suggested_facet_sql = """
select distinct {column} from (
{sql}
) where {column} is not null
limit {limit}
""".format(
column=escape_sqlite(column), sql=self.sql, limit=facet_size + 1
)
distinct_values = None
try:
distinct_values = await self.ds.execute(
self.database,
suggested_facet_sql,
self.params,
truncate=False,
custom_time_limit=self.ds.config("facet_suggest_time_limit_ms"),
)
num_distinct_values = len(distinct_values)
if (
num_distinct_values
and num_distinct_values > 1
and num_distinct_values <= facet_size
and num_distinct_values < row_count
):
suggested_facets.append(
{
"name": column,
"toggle_url": self.ds.absolute_url(
self.request,
path_with_added_args(self.request, {"_facet": column}),
),
}
)
except InterruptedError:
continue
return suggested_facets
async def facet_results(self):
facet_results = {}
facets_timed_out = []
qs_pairs = self.get_querystring_pairs()
facet_size = self.ds.config("default_facet_size")
for source_and_config in self.get_configs():
config = source_and_config["config"]
source = source_and_config["source"]
column = config.get("column") or config["simple"]
facet_sql = """
select {col} as value, count(*) as count from (
{sql}
)
where {col} is not null
group by {col} order by count desc limit {limit}
""".format(
col=escape_sqlite(column), sql=self.sql, limit=facet_size + 1
)
try:
facet_rows_results = await self.ds.execute(
self.database,
facet_sql,
self.params,
truncate=False,
custom_time_limit=self.ds.config("facet_time_limit_ms"),
)
facet_results_values = []
facet_results[column] = {
"name": column,
"type": self.type,
"hideable": source != "metadata",
"toggle_url": path_with_removed_args(self.request, {"_facet": column}),
"results": facet_results_values,
"truncated": len(facet_rows_results) > facet_size,
}
facet_rows = facet_rows_results.rows[:facet_size]
if self.table:
# Attempt to expand foreign keys into labels
values = [row["value"] for row in facet_rows]
expanded = await self.ds.expand_foreign_keys(
self.database, self.table, column, values
)
else:
expanded = {}
for row in facet_rows:
selected = (column, str(row["value"])) in qs_pairs
if selected:
toggle_path = path_with_removed_args(
self.request, {column: str(row["value"])}
)
else:
toggle_path = path_with_added_args(
self.request, {column: row["value"]}
)
facet_results_values.append(
{
"value": row["value"],
"label": expanded.get((column, row["value"]), row["value"]),
"count": row["count"],
"toggle_url": self.ds.absolute_url(
self.request, toggle_path
),
"selected": selected,
}
)
except InterruptedError:
facets_timed_out.append(column)
return facet_results, facets_timed_out

Wyświetl plik

@ -43,3 +43,8 @@ def render_cell(value, column, table, database, datasette):
@hookspec
def register_output_renderer(datasette):
"Register a renderer to output data in a different format"
@hookspec
def register_facet_classes():
"Register Facet subclasses"

Wyświetl plik

@ -3,7 +3,11 @@ import pluggy
import sys
from . import hookspecs
DEFAULT_PLUGINS = ("datasette.publish.heroku", "datasette.publish.now")
DEFAULT_PLUGINS = (
"datasette.publish.heroku",
"datasette.publish.now",
"datasette.facets",
)
pm = pluggy.PluginManager("datasette")
pm.add_hookspecs(hookspecs)

Wyświetl plik

@ -110,7 +110,7 @@
{% if suggested_facets %}
<p class="suggested-facets">
Suggested facets: {% for facet in suggested_facets %}<a href="{{ facet.toggle_url }}#facet-{{ facet.name|to_css_class }}">{{ facet.name }}</a>{% if not loop.last %}, {% endif %}{% endfor %}
Suggested facets: {% for facet in suggested_facets %}<a href="{{ facet.toggle_url }}#facet-{{ facet.name|to_css_class }}">{{ facet.name }}</a>{% if facet.type %} ({{ facet.type }}){% endif %}{% if not loop.last %}, {% endif %}{% endfor %}
</p>
{% endif %}
@ -123,9 +123,9 @@
{% for facet_info in sorted_facet_results %}
<div class="facet-info facet-{{ database|to_css_class }}-{{ table|to_css_class }}-{{ facet_info.name|to_css_class }}" id="facet-{{ facet_info.name|to_css_class }}">
<p class="facet-info-name">
<strong>{{ facet_info.name }}</strong>
{% if facet_hideable(facet_info.name) %}
<a href="{{ path_with_removed_args(request, {'_facet': facet_info['name']}) }}" class="cross">&#x2716;</a>
<strong>{{ facet_info.name }}{% if facet_info.type != "column" %} ({{ facet_info.type }}){% endif %}</strong>
{% if facet_info.hideable %}
<a href="{{ facet_info.toggle_url }}" class="cross">&#x2716;</a>
{% endif %}
</p>
<ul>

Wyświetl plik

@ -1,9 +1,11 @@
import urllib
import itertools
import jinja2
from sanic.exceptions import NotFound
from sanic.request import RequestParameters
from datasette.facets import load_facet_configs
from datasette.plugins import pm
from datasette.utils import (
CustomRow,
@ -344,9 +346,8 @@ class TableView(RowTableShared):
"where {} ".format(" and ".join(where_clauses))
) if where_clauses else "",
)
# Store current params and where_clauses for later:
# Copy of params so we can mutate them later:
from_sql_params = dict(**params)
from_sql_where_clauses = where_clauses[:]
count_sql = "select count(*) {}".format(from_sql)
@ -458,11 +459,14 @@ class TableView(RowTableShared):
else:
page_size = self.ds.page_size
sql = "select {select} from {table_name} {where}{order_by}limit {limit}{offset}".format(
sql_no_limit = "select {select} from {table_name} {where}{order_by}".format(
select=select,
table_name=escape_sqlite(table),
where=where_clause,
order_by=order_by,
)
sql = "{sql_no_limit} limit {limit}{offset}".format(
sql_no_limit=sql_no_limit.rstrip(),
limit=page_size + 1,
offset=offset,
)
@ -474,72 +478,46 @@ class TableView(RowTableShared):
database, sql, params, truncate=True, **extra_args
)
# Number of filtered rows in whole set:
filtered_table_rows_count = None
if count_sql:
try:
count_rows = list(await self.ds.execute(
database, count_sql, from_sql_params
))
filtered_table_rows_count = count_rows[0][0]
except InterruptedError:
pass
# facets support
facet_size = self.ds.config("default_facet_size")
metadata_facets = table_metadata.get("facets", [])
facets = metadata_facets[:]
if request.args.get("_facet") and not self.ds.config("allow_facet"):
if not self.ds.config("allow_facet") and any(arg.startswith("_facet") for arg in request.args):
raise DatasetteError("_facet= is not allowed", status=400)
try:
facets.extend(request.args["_facet"])
except KeyError:
pass
# pylint: disable=no-member
facet_classes = list(
itertools.chain.from_iterable(pm.hook.register_facet_classes())
)
facet_results = {}
facets_timed_out = []
for column in facets:
if _next:
continue
facet_sql = """
select {col} as value, count(*) as count
{from_sql} {and_or_where} {col} is not null
group by {col} order by count desc limit {limit}
""".format(
col=escape_sqlite(column),
from_sql=from_sql,
and_or_where='and' if from_sql_where_clauses else 'where',
limit=facet_size+1,
)
try:
facet_rows_results = await self.ds.execute(
database, facet_sql, params,
truncate=False,
custom_time_limit=self.ds.config("facet_time_limit_ms"),
)
facet_results_values = []
facet_results[column] = {
"name": column,
"results": facet_results_values,
"truncated": len(facet_rows_results) > facet_size,
}
facet_rows = facet_rows_results.rows[:facet_size]
# Attempt to expand foreign keys into labels
values = [row["value"] for row in facet_rows]
expanded = (await self.ds.expand_foreign_keys(
database, table, column, values
))
for row in facet_rows:
selected = (column, str(row["value"])) in other_args
if selected:
toggle_path = path_with_removed_args(
request, {column: str(row["value"])}
)
else:
toggle_path = path_with_added_args(
request, {column: row["value"]}
)
facet_results_values.append({
"value": row["value"],
"label": expanded.get(
(column, row["value"]),
row["value"]
),
"count": row["count"],
"toggle_url": self.ds.absolute_url(request, toggle_path),
"selected": selected,
})
except InterruptedError:
facets_timed_out.append(column)
facet_instances = []
for klass in facet_classes:
facet_instances.append(klass(
self.ds,
request,
database,
sql=sql_no_limit,
params=params,
table=table,
metadata=table_metadata,
row_count=filtered_table_rows_count,
))
for facet in facet_instances:
instance_facet_results, instance_facets_timed_out = await facet.facet_results()
facet_results.update(instance_facet_results)
facets_timed_out.extend(instance_facets_timed_out)
# Figure out columns and rows for the query
columns = [r[0] for r in results.description]
rows = list(results.rows)
@ -623,61 +601,14 @@ class TableView(RowTableShared):
)
rows = rows[:page_size]
# Number of filtered rows in whole set:
filtered_table_rows_count = None
if count_sql:
try:
count_rows = list(await self.ds.execute(
database, count_sql, from_sql_params
))
filtered_table_rows_count = count_rows[0][0]
except InterruptedError:
pass
# Detect suggested facets
suggested_facets = []
# Detect suggested facets
suggested_facets = []
if self.ds.config("suggest_facets") and self.ds.config("allow_facet"):
for facet_column in columns:
if facet_column in facets:
continue
if _next:
continue
if not self.ds.config("suggest_facets"):
continue
suggested_facet_sql = '''
select distinct {column} {from_sql}
{and_or_where} {column} is not null
limit {limit}
'''.format(
column=escape_sqlite(facet_column),
from_sql=from_sql,
and_or_where='and' if from_sql_where_clauses else 'where',
limit=facet_size+1
)
distinct_values = None
try:
distinct_values = await self.ds.execute(
database, suggested_facet_sql, from_sql_params,
truncate=False,
custom_time_limit=self.ds.config("facet_suggest_time_limit_ms"),
)
num_distinct_values = len(distinct_values)
if (
num_distinct_values and
num_distinct_values > 1 and
num_distinct_values <= facet_size and
num_distinct_values < filtered_table_rows_count
):
suggested_facets.append({
'name': facet_column,
'toggle_url': self.ds.absolute_url(
request, path_with_added_args(
request, {"_facet": facet_column}
)
),
})
except InterruptedError:
pass
if self.ds.config("suggest_facets") and self.ds.config("allow_facet") and not _next:
for facet in facet_instances:
# TODO: ensure facet is not suggested if it is already active
# used to use 'if facet_column in facets' for this
suggested_facets.extend(await facet.suggest())
# human_description_en combines filters AND search, if provided
human_description_en = filters.human_description_en(extra=search_descriptions)
@ -725,7 +656,6 @@ class TableView(RowTableShared):
),
"extra_wheres_for_ui": extra_wheres_for_ui,
"form_hidden_args": form_hidden_args,
"facet_hideable": lambda facet: facet not in metadata_facets,
"is_sortable": any(c["sortable"] for c in display_columns),
"path_with_replaced_args": path_with_replaced_args,
"path_with_removed_args": path_with_removed_args,

Wyświetl plik

@ -601,3 +601,68 @@ A simple example of an output renderer callback function:
return {
'body': 'Hello World'
}
.. _plugin_register_facet_classes:
register_facet_classes()
~~~~~~~~~~~~~~~~~~~~~~~~
Return a list of additional Facet subclasses to be registered.
Each Facet subclass implements a new type of facet operation. The class should look like this:
.. code-block:: python
class SpecialFacet(Facet):
# This key must be unique across all facet classes:
type = "special"
async def suggest(self, sql, params, filtered_table_rows_count):
suggested_facets = []
# Perform calculations to suggest facets
suggested_facets.append({
"name": column, # Or other unique name
# Construct the URL that will enable this facet:
"toggle_url": self.ds.absolute_url(
self.request, path_with_added_args(
self.request, {"_facet": column}
)
),
})
return suggested_facets
async def facet_results(self, sql, params):
# This should execute the facet operation and return results
facet_results = {}
facets_timed_out = []
# Do some calculations here...
for column in columns_selected_for_facet:
try:
facet_results_values = []
# More calculations...
facet_results_values.append({
"value": value,
"label": label,
"count": count,
"toggle_url": self.ds.absolute_url(self.request, toggle_path),
"selected": selected,
})
facet_results[column] = {
"name": column,
"results": facet_results_values,
"truncated": len(facet_rows_results) > facet_size,
}
except InterruptedError:
facets_timed_out.append(column)
return facet_results, facets_timed_out
See ``datasette/facets.py`` for examples of how these classes can work.
The plugin hook can then be used to register the new facet class like this:
.. code-block:: python
@hookimpl
def register_facet_classes():
return [SpecialFacet]

Wyświetl plik

@ -1129,6 +1129,9 @@ def test_page_size_matching_max_returned_rows(app_client_returned_rows_matches_p
{
"state": {
"name": "state",
"hideable": True,
"type": "column",
"toggle_url": "/fixtures/facetable.json?_facet=city_id",
"results": [
{
"value": "CA",
@ -1156,6 +1159,9 @@ def test_page_size_matching_max_returned_rows(app_client_returned_rows_matches_p
},
"city_id": {
"name": "city_id",
"hideable": True,
"type": "column",
"toggle_url": "/fixtures/facetable.json?_facet=state",
"results": [
{
"value": 1,
@ -1194,6 +1200,9 @@ def test_page_size_matching_max_returned_rows(app_client_returned_rows_matches_p
{
"state": {
"name": "state",
"hideable": True,
"type": "column",
"toggle_url": "/fixtures/facetable.json?_facet=city_id&state=MI",
"results": [
{
"value": "MI",
@ -1207,6 +1216,9 @@ def test_page_size_matching_max_returned_rows(app_client_returned_rows_matches_p
},
"city_id": {
"name": "city_id",
"hideable": True,
"type": "column",
"toggle_url": "/fixtures/facetable.json?_facet=state&state=MI",
"results": [
{
"value": 3,
@ -1224,6 +1236,9 @@ def test_page_size_matching_max_returned_rows(app_client_returned_rows_matches_p
{
"planet_int": {
"name": "planet_int",
"hideable": True,
"type": "column",
"toggle_url": "/fixtures/facetable.json",
"results": [
{
"value": 1,
@ -1249,6 +1264,9 @@ def test_page_size_matching_max_returned_rows(app_client_returned_rows_matches_p
{
"planet_int": {
"name": "planet_int",
"hideable": True,
"type": "column",
"toggle_url": "/fixtures/facetable.json?planet_int=1",
"results": [
{
"value": 1,
@ -1276,9 +1294,20 @@ def test_facets(app_client, path, expected_facet_results):
def test_suggested_facets(app_client):
assert len(app_client.get(
suggestions = [{
"name": suggestion["name"],
"querystring": suggestion["toggle_url"].split("?")[-1]
} for suggestion in app_client.get(
"/fixtures/facetable.json"
).json["suggested_facets"]) > 0
).json["suggested_facets"]]
assert [
{"name": "planet_int", "querystring": "_facet=planet_int"},
{"name": "on_earth", "querystring": "_facet=on_earth"},
{"name": "state", "querystring": "_facet=state"},
{"name": "city_id", "querystring": "_facet=city_id"},
{"name": "neighborhood", "querystring": "_facet=neighborhood"},
{"name": "tags", "querystring": "_facet=tags"}
] == suggestions
def test_allow_facet_off():

Wyświetl plik

@ -0,0 +1,174 @@
from datasette.facets import ColumnFacet
from .fixtures import app_client # noqa
from .utils import MockRequest
from collections import namedtuple
import pytest
@pytest.mark.asyncio
async def test_column_facet_suggest(app_client):
facet = ColumnFacet(
app_client.ds,
MockRequest("http://localhost/"),
database="fixtures",
sql="select * from facetable",
table="facetable",
)
suggestions = await facet.suggest()
assert [
{"name": "planet_int", "toggle_url": "http://localhost/?_facet=planet_int"},
{"name": "on_earth", "toggle_url": "http://localhost/?_facet=on_earth"},
{"name": "state", "toggle_url": "http://localhost/?_facet=state"},
{"name": "city_id", "toggle_url": "http://localhost/?_facet=city_id"},
{"name": "neighborhood", "toggle_url": "http://localhost/?_facet=neighborhood"},
{"name": "tags", "toggle_url": "http://localhost/?_facet=tags"},
] == suggestions
@pytest.mark.asyncio
async def test_column_facet_suggest_skip_if_already_selected(app_client):
facet = ColumnFacet(
app_client.ds,
MockRequest("http://localhost/?_facet=planet_int&_facet=on_earth"),
database="fixtures",
sql="select * from facetable",
table="facetable",
)
suggestions = await facet.suggest()
assert [
{
"name": "state",
"toggle_url": "http://localhost/?_facet=planet_int&_facet=on_earth&_facet=state",
},
{
"name": "city_id",
"toggle_url": "http://localhost/?_facet=planet_int&_facet=on_earth&_facet=city_id",
},
{
"name": "neighborhood",
"toggle_url": "http://localhost/?_facet=planet_int&_facet=on_earth&_facet=neighborhood",
},
{
"name": "tags",
"toggle_url": "http://localhost/?_facet=planet_int&_facet=on_earth&_facet=tags",
},
] == suggestions
@pytest.mark.asyncio
async def test_column_facet_suggest_skip_if_enabled_by_metadata(app_client):
facet = ColumnFacet(
app_client.ds,
MockRequest("http://localhost/"),
database="fixtures",
sql="select * from facetable",
table="facetable",
metadata={"facets": ["city_id"]},
)
suggestions = [s["name"] for s in await facet.suggest()]
assert ["planet_int", "on_earth", "state", "neighborhood", "tags"] == suggestions
@pytest.mark.asyncio
async def test_column_facet_results(app_client):
facet = ColumnFacet(
app_client.ds,
MockRequest("http://localhost/?_facet=city_id"),
database="fixtures",
sql="select * from facetable",
table="facetable",
)
buckets, timed_out = await facet.facet_results()
assert [] == timed_out
assert {
"city_id": {
"name": "city_id",
"type": "column",
"hideable": True,
"toggle_url": "/",
"results": [
{
"value": 1,
"label": "San Francisco",
"count": 6,
"toggle_url": "http://localhost/?_facet=city_id&city_id=1",
"selected": False,
},
{
"value": 2,
"label": "Los Angeles",
"count": 4,
"toggle_url": "http://localhost/?_facet=city_id&city_id=2",
"selected": False,
},
{
"value": 3,
"label": "Detroit",
"count": 4,
"toggle_url": "http://localhost/?_facet=city_id&city_id=3",
"selected": False,
},
{
"value": 4,
"label": "Memnonia",
"count": 1,
"toggle_url": "http://localhost/?_facet=city_id&city_id=4",
"selected": False,
},
],
"truncated": False,
}
} == buckets
@pytest.mark.asyncio
async def test_column_facet_from_metadata_cannot_be_hidden(app_client):
facet = ColumnFacet(
app_client.ds,
MockRequest("http://localhost/"),
database="fixtures",
sql="select * from facetable",
table="facetable",
metadata={"facets": ["city_id"]},
)
buckets, timed_out = await facet.facet_results()
assert [] == timed_out
assert {
"city_id": {
"name": "city_id",
"type": "column",
"hideable": False,
"toggle_url": "/",
"results": [
{
"value": 1,
"label": "San Francisco",
"count": 6,
"toggle_url": "http://localhost/?city_id=1",
"selected": False,
},
{
"value": 2,
"label": "Los Angeles",
"count": 4,
"toggle_url": "http://localhost/?city_id=2",
"selected": False,
},
{
"value": 3,
"label": "Detroit",
"count": 4,
"toggle_url": "http://localhost/?city_id=3",
"selected": False,
},
{
"value": 4,
"label": "Memnonia",
"count": 1,
"toggle_url": "http://localhost/?city_id=4",
"selected": False,
},
],
"truncated": False,
}
} == buckets

8
tests/utils.py 100644
Wyświetl plik

@ -0,0 +1,8 @@
class MockRequest:
def __init__(self, url):
self.url = url
self.path = "/" + url.split("://")[1].split("/", 1)[1]
self.query_string = ""
if "?" in url:
self.query_string = url.split("?", 1)[1]
self.path = self.path.split("?")[0]