datasette/tests/test_table_api.py

1401 wiersze
45 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from datasette.utils import detect_json1
from datasette.utils.sqlite import sqlite_version
from .fixtures import ( # noqa
app_client,
app_client_with_trace,
app_client_returned_rows_matches_page_size,
generate_compound_rows,
generate_sortable_rows,
make_app_client,
)
import json
import pytest
import urllib
@pytest.mark.asyncio
async def test_table_json(ds_client):
response = await ds_client.get("/fixtures/simple_primary_key.json?_extra=query")
assert response.status_code == 200
data = response.json()
assert (
data["query"]["sql"]
== "select id, content from simple_primary_key order by id limit 51"
)
assert data["query"]["params"] == {}
assert data["rows"] == [
{"id": "1", "content": "hello"},
{"id": "2", "content": "world"},
{"id": "3", "content": ""},
{"id": "4", "content": "RENDER_CELL_DEMO"},
{"id": "5", "content": "RENDER_CELL_ASYNC"},
]
@pytest.mark.asyncio
async def test_table_not_exists_json(ds_client):
assert (await ds_client.get("/fixtures/blah.json")).json() == {
"ok": False,
"error": "Table not found: blah",
"status": 404,
"title": None,
}
@pytest.mark.asyncio
async def test_table_shape_arrays(ds_client):
response = await ds_client.get("/fixtures/simple_primary_key.json?_shape=arrays")
assert response.json()["rows"] == [
["1", "hello"],
["2", "world"],
["3", ""],
["4", "RENDER_CELL_DEMO"],
["5", "RENDER_CELL_ASYNC"],
]
@pytest.mark.asyncio
async def test_table_shape_arrayfirst(ds_client):
response = await ds_client.get(
"/fixtures.json?"
+ urllib.parse.urlencode(
{
"sql": "select content from simple_primary_key order by id",
"_shape": "arrayfirst",
}
)
)
assert response.json() == [
"hello",
"world",
"",
"RENDER_CELL_DEMO",
"RENDER_CELL_ASYNC",
]
@pytest.mark.asyncio
async def test_table_shape_objects(ds_client):
response = await ds_client.get("/fixtures/simple_primary_key.json?_shape=objects")
assert response.json()["rows"] == [
{"id": "1", "content": "hello"},
{"id": "2", "content": "world"},
{"id": "3", "content": ""},
{"id": "4", "content": "RENDER_CELL_DEMO"},
{"id": "5", "content": "RENDER_CELL_ASYNC"},
]
@pytest.mark.asyncio
async def test_table_shape_array(ds_client):
response = await ds_client.get("/fixtures/simple_primary_key.json?_shape=array")
assert response.json() == [
{"id": "1", "content": "hello"},
{"id": "2", "content": "world"},
{"id": "3", "content": ""},
{"id": "4", "content": "RENDER_CELL_DEMO"},
{"id": "5", "content": "RENDER_CELL_ASYNC"},
]
@pytest.mark.asyncio
async def test_table_shape_array_nl(ds_client):
response = await ds_client.get(
"/fixtures/simple_primary_key.json?_shape=array&_nl=on"
)
lines = response.text.split("\n")
results = [json.loads(line) for line in lines]
assert [
{"id": "1", "content": "hello"},
{"id": "2", "content": "world"},
{"id": "3", "content": ""},
{"id": "4", "content": "RENDER_CELL_DEMO"},
{"id": "5", "content": "RENDER_CELL_ASYNC"},
] == results
@pytest.mark.asyncio
async def test_table_shape_invalid(ds_client):
response = await ds_client.get("/fixtures/simple_primary_key.json?_shape=invalid")
assert response.json() == {
"ok": False,
"error": "Invalid _shape: invalid",
"status": 400,
"title": None,
}
@pytest.mark.asyncio
async def test_table_shape_object(ds_client):
response = await ds_client.get("/fixtures/simple_primary_key.json?_shape=object")
assert response.json() == {
"1": {"id": "1", "content": "hello"},
"2": {"id": "2", "content": "world"},
"3": {"id": "3", "content": ""},
"4": {"id": "4", "content": "RENDER_CELL_DEMO"},
"5": {"id": "5", "content": "RENDER_CELL_ASYNC"},
}
@pytest.mark.asyncio
async def test_table_shape_object_compound_primary_key(ds_client):
response = await ds_client.get("/fixtures/compound_primary_key.json?_shape=object")
assert response.json() == {
"a,b": {"pk1": "a", "pk2": "b", "content": "c"},
"a~2Fb,~2Ec-d": {"pk1": "a/b", "pk2": ".c-d", "content": "c"},
}
@pytest.mark.asyncio
async def test_table_with_slashes_in_name(ds_client):
response = await ds_client.get(
"/fixtures/table~2Fwith~2Fslashes~2Ecsv.json?_shape=objects"
)
assert response.status_code == 200
data = response.json()
assert data["rows"] == [{"pk": "3", "content": "hey"}]
@pytest.mark.asyncio
async def test_table_with_reserved_word_name(ds_client):
response = await ds_client.get("/fixtures/select.json?_shape=objects")
assert response.status_code == 200
data = response.json()
assert data["rows"] == [
{
"rowid": 1,
"group": "group",
"having": "having",
"and": "and",
"json": '{"href": "http://example.com/", "label":"Example"}',
}
]
@pytest.mark.asyncio
@pytest.mark.parametrize(
"path,expected_rows,expected_pages",
[
("/fixtures/no_primary_key.json", 201, 5),
("/fixtures/paginated_view.json", 201, 9),
("/fixtures/no_primary_key.json?_size=25", 201, 9),
("/fixtures/paginated_view.json?_size=50", 201, 5),
("/fixtures/paginated_view.json?_size=max", 201, 3),
("/fixtures/123_starts_with_digits.json", 0, 1),
# Ensure faceting doesn't break pagination:
("/fixtures/compound_three_primary_keys.json?_facet=pk1", 1001, 21),
# Paginating while sorted by an expanded foreign key should work
(
"/fixtures/roadside_attraction_characteristics.json?_size=2&_sort=attraction_id&_labels=on",
5,
3,
),
],
)
async def test_paginate_tables_and_views(
ds_client, path, expected_rows, expected_pages
):
fetched = []
count = 0
while path:
if "?" in path:
path += "&_extra=next_url"
else:
path += "?_extra=next_url"
response = await ds_client.get(path)
assert response.status_code == 200
count += 1
fetched.extend(response.json()["rows"])
path = response.json()["next_url"]
if path:
assert urllib.parse.urlencode({"_next": response.json()["next"]}) in path
path = path.replace("http://localhost", "")
assert count < 30, "Possible infinite loop detected"
assert expected_rows == len(fetched)
assert expected_pages == count
@pytest.mark.asyncio
@pytest.mark.parametrize(
"path,expected_error",
[
("/fixtures/no_primary_key.json?_size=-4", "_size must be a positive integer"),
("/fixtures/no_primary_key.json?_size=dog", "_size must be a positive integer"),
("/fixtures/no_primary_key.json?_size=1001", "_size must be <= 100"),
],
)
async def test_validate_page_size(ds_client, path, expected_error):
response = await ds_client.get(path)
assert expected_error == response.json()["error"]
assert response.status_code == 400
@pytest.mark.asyncio
async def test_page_size_zero(ds_client):
"""For _size=0 we return the counts, empty rows and no continuation token"""
response = await ds_client.get(
"/fixtures/no_primary_key.json?_size=0&_extra=count,next_url"
)
assert response.status_code == 200
assert [] == response.json()["rows"]
assert 201 == response.json()["count"]
assert None is response.json()["next"]
assert None is response.json()["next_url"]
@pytest.mark.asyncio
async def test_paginate_compound_keys(ds_client):
fetched = []
path = "/fixtures/compound_three_primary_keys.json?_shape=objects&_extra=next_url"
page = 0
while path:
page += 1
response = await ds_client.get(path)
fetched.extend(response.json()["rows"])
path = response.json()["next_url"]
if path:
path = path.replace("http://localhost", "")
assert page < 100
assert 1001 == len(fetched)
assert 21 == page
# Should be correctly ordered
contents = [f["content"] for f in fetched]
expected = [r[3] for r in generate_compound_rows(1001)]
assert expected == contents
@pytest.mark.asyncio
async def test_paginate_compound_keys_with_extra_filters(ds_client):
fetched = []
path = "/fixtures/compound_three_primary_keys.json?content__contains=d&_shape=objects&_extra=next_url"
page = 0
while path:
page += 1
assert page < 100
response = await ds_client.get(path)
fetched.extend(response.json()["rows"])
path = response.json()["next_url"]
if path:
path = path.replace("http://localhost", "")
assert 2 == page
expected = [r[3] for r in generate_compound_rows(1001) if "d" in r[3]]
assert expected == [f["content"] for f in fetched]
@pytest.mark.asyncio
@pytest.mark.parametrize(
"query_string,sort_key,human_description_en",
[
("_sort=sortable", lambda row: row["sortable"], "sorted by sortable"),
(
"_sort_desc=sortable",
lambda row: -row["sortable"],
"sorted by sortable descending",
),
(
"_sort=sortable_with_nulls",
lambda row: (
1 if row["sortable_with_nulls"] is not None else 0,
row["sortable_with_nulls"],
),
"sorted by sortable_with_nulls",
),
(
"_sort_desc=sortable_with_nulls",
lambda row: (
1 if row["sortable_with_nulls"] is None else 0,
(
-row["sortable_with_nulls"]
if row["sortable_with_nulls"] is not None
else 0
),
row["content"],
),
"sorted by sortable_with_nulls descending",
),
# text column contains '$null' - ensure it doesn't confuse pagination:
("_sort=text", lambda row: row["text"], "sorted by text"),
# Still works if sort column removed using _col=
("_sort=text&_col=content", lambda row: row["text"], "sorted by text"),
],
)
async def test_sortable(ds_client, query_string, sort_key, human_description_en):
path = f"/fixtures/sortable.json?_shape=objects&_extra=human_description_en,next_url&{query_string}"
fetched = []
page = 0
while path:
page += 1
assert page < 100
response = await ds_client.get(path)
assert human_description_en == response.json()["human_description_en"]
fetched.extend(response.json()["rows"])
path = response.json()["next_url"]
if path:
path = path.replace("http://localhost", "")
assert page == 5
expected = list(generate_sortable_rows(201))
expected.sort(key=sort_key)
assert [r["content"] for r in expected] == [r["content"] for r in fetched]
@pytest.mark.asyncio
async def test_sortable_and_filtered(ds_client):
path = (
"/fixtures/sortable.json"
"?content__contains=d&_sort_desc=sortable&_shape=objects"
"&_extra=human_description_en,count"
)
response = await ds_client.get(path)
fetched = response.json()["rows"]
assert (
'where content contains "d" sorted by sortable descending'
== response.json()["human_description_en"]
)
expected = [row for row in generate_sortable_rows(201) if "d" in row["content"]]
assert len(expected) == response.json()["count"]
expected.sort(key=lambda row: -row["sortable"])
assert [r["content"] for r in expected] == [r["content"] for r in fetched]
@pytest.mark.asyncio
async def test_sortable_argument_errors(ds_client):
response = await ds_client.get("/fixtures/sortable.json?_sort=badcolumn")
assert "Cannot sort table by badcolumn" == response.json()["error"]
response = await ds_client.get("/fixtures/sortable.json?_sort_desc=badcolumn2")
assert "Cannot sort table by badcolumn2" == response.json()["error"]
response = await ds_client.get(
"/fixtures/sortable.json?_sort=sortable_with_nulls&_sort_desc=sortable"
)
assert (
"Cannot use _sort and _sort_desc at the same time" == response.json()["error"]
)
@pytest.mark.asyncio
async def test_sortable_columns_metadata(ds_client):
response = await ds_client.get("/fixtures/sortable.json?_sort=content")
assert "Cannot sort table by content" == response.json()["error"]
# no_primary_key has ALL sort options disabled
for column in ("content", "a", "b", "c"):
response = await ds_client.get(f"/fixtures/sortable.json?_sort={column}")
assert f"Cannot sort table by {column}" == response.json()["error"]
@pytest.mark.asyncio
@pytest.mark.parametrize(
"path,expected_rows",
[
(
"/fixtures/searchable.json?_shape=arrays&_search=dog",
[
[1, "barry cat", "terry dog", "panther"],
[2, "terry dog", "sara weasel", "puma"],
],
),
(
# Special keyword shouldn't break FTS query
"/fixtures/searchable.json?_shape=arrays&_search=AND",
[],
),
(
# Without _searchmode=raw this should return no results
"/fixtures/searchable.json?_shape=arrays&_search=te*+AND+do*",
[],
),
(
# _searchmode=raw
"/fixtures/searchable.json?_shape=arrays&_search=te*+AND+do*&_searchmode=raw",
[
[1, "barry cat", "terry dog", "panther"],
[2, "terry dog", "sara weasel", "puma"],
],
),
(
# _searchmode=raw combined with _search_COLUMN
"/fixtures/searchable.json?_shape=arrays&_search_text2=te*&_searchmode=raw",
[
[1, "barry cat", "terry dog", "panther"],
],
),
(
"/fixtures/searchable.json?_shape=arrays&_search=weasel",
[[2, "terry dog", "sara weasel", "puma"]],
),
(
"/fixtures/searchable.json?_shape=arrays&_search_text2=dog",
[[1, "barry cat", "terry dog", "panther"]],
),
(
"/fixtures/searchable.json?_shape=arrays&_search_name%20with%20.%20and%20spaces=panther",
[[1, "barry cat", "terry dog", "panther"]],
),
],
)
async def test_searchable(ds_client, path, expected_rows):
response = await ds_client.get(path)
assert expected_rows == response.json()["rows"]
_SEARCHMODE_RAW_RESULTS = [
[1, "barry cat", "terry dog", "panther"],
[2, "terry dog", "sara weasel", "puma"],
]
@pytest.mark.parametrize(
"table_metadata,querystring,expected_rows",
[
(
{},
"_search=te*+AND+do*",
[],
),
(
{"searchmode": "raw"},
"_search=te*+AND+do*",
_SEARCHMODE_RAW_RESULTS,
),
(
{},
"_search=te*+AND+do*&_searchmode=raw",
_SEARCHMODE_RAW_RESULTS,
),
# Can be over-ridden with _searchmode=escaped
(
{"searchmode": "raw"},
"_search=te*+AND+do*&_searchmode=escaped",
[],
),
],
)
def test_searchmode(table_metadata, querystring, expected_rows):
with make_app_client(
metadata={"databases": {"fixtures": {"tables": {"searchable": table_metadata}}}}
) as client:
response = client.get("/fixtures/searchable.json?_shape=arrays&" + querystring)
assert expected_rows == response.json["rows"]
@pytest.mark.asyncio
@pytest.mark.parametrize(
"path,expected_rows",
[
(
"/fixtures/searchable_view_configured_by_metadata.json?_shape=arrays&_search=weasel",
[[2, "terry dog", "sara weasel", "puma"]],
),
# This should return all results because search is not configured:
(
"/fixtures/searchable_view.json?_shape=arrays&_search=weasel",
[
[1, "barry cat", "terry dog", "panther"],
[2, "terry dog", "sara weasel", "puma"],
],
),
(
"/fixtures/searchable_view.json?_shape=arrays&_search=weasel&_fts_table=searchable_fts&_fts_pk=pk",
[[2, "terry dog", "sara weasel", "puma"]],
),
],
)
async def test_searchable_views(ds_client, path, expected_rows):
response = await ds_client.get(path)
assert response.json()["rows"] == expected_rows
@pytest.mark.asyncio
async def test_searchable_invalid_column(ds_client):
response = await ds_client.get("/fixtures/searchable.json?_search_invalid=x")
assert response.status_code == 400
assert response.json() == {
"ok": False,
"error": "Cannot search by that column",
"status": 400,
"title": None,
}
@pytest.mark.asyncio
@pytest.mark.parametrize(
"path,expected_rows",
[
(
"/fixtures/simple_primary_key.json?_shape=arrays&content=hello",
[["1", "hello"]],
),
(
"/fixtures/simple_primary_key.json?_shape=arrays&content__contains=o",
[
["1", "hello"],
["2", "world"],
["4", "RENDER_CELL_DEMO"],
],
),
(
"/fixtures/simple_primary_key.json?_shape=arrays&content__exact=",
[["3", ""]],
),
(
"/fixtures/simple_primary_key.json?_shape=arrays&content__not=world",
[
["1", "hello"],
["3", ""],
["4", "RENDER_CELL_DEMO"],
["5", "RENDER_CELL_ASYNC"],
],
),
],
)
async def test_table_filter_queries(ds_client, path, expected_rows):
response = await ds_client.get(path)
assert response.json()["rows"] == expected_rows
@pytest.mark.asyncio
async def test_table_filter_queries_multiple_of_same_type(ds_client):
response = await ds_client.get(
"/fixtures/simple_primary_key.json?_shape=arrays&content__not=world&content__not=hello"
)
assert [
["3", ""],
["4", "RENDER_CELL_DEMO"],
["5", "RENDER_CELL_ASYNC"],
] == response.json()["rows"]
@pytest.mark.skipif(not detect_json1(), reason="Requires the SQLite json1 module")
@pytest.mark.asyncio
async def test_table_filter_json_arraycontains(ds_client):
response = await ds_client.get(
"/fixtures/facetable.json?_shape=arrays&tags__arraycontains=tag1"
)
assert response.json()["rows"] == [
[
1,
"2019-01-14 08:00:00",
1,
1,
"CA",
1,
"Mission",
'["tag1", "tag2"]',
'[{"foo": "bar"}]',
"one",
"n1",
],
[
2,
"2019-01-14 08:00:00",
1,
1,
"CA",
1,
"Dogpatch",
'["tag1", "tag3"]',
"[]",
"two",
"n2",
],
]
@pytest.mark.skipif(not detect_json1(), reason="Requires the SQLite json1 module")
@pytest.mark.asyncio
async def test_table_filter_json_arraynotcontains(ds_client):
response = await ds_client.get(
"/fixtures/facetable.json?_shape=arrays&tags__arraynotcontains=tag3&tags__not=[]"
)
assert response.json()["rows"] == [
[
1,
"2019-01-14 08:00:00",
1,
1,
"CA",
1,
"Mission",
'["tag1", "tag2"]',
'[{"foo": "bar"}]',
"one",
"n1",
]
]
@pytest.mark.asyncio
async def test_table_filter_extra_where(ds_client):
response = await ds_client.get(
"/fixtures/facetable.json?_shape=arrays&_where=_neighborhood='Dogpatch'"
)
assert [
[
2,
"2019-01-14 08:00:00",
1,
1,
"CA",
1,
"Dogpatch",
'["tag1", "tag3"]',
"[]",
"two",
"n2",
]
] == response.json()["rows"]
@pytest.mark.asyncio
async def test_table_filter_extra_where_invalid(ds_client):
response = await ds_client.get(
"/fixtures/facetable.json?_where=_neighborhood=Dogpatch'"
)
assert response.status_code == 400
assert "Invalid SQL" == response.json()["title"]
def test_table_filter_extra_where_disabled_if_no_sql_allowed():
with make_app_client(config={"allow_sql": {}}) as client:
response = client.get(
"/fixtures/facetable.json?_where=_neighborhood='Dogpatch'"
)
assert response.status_code == 403
assert "_where= is not allowed" == response.json["error"]
@pytest.mark.asyncio
async def test_table_through(ds_client):
# Just the museums:
response = await ds_client.get(
"/fixtures/roadside_attractions.json?_shape=arrays"
'&_through={"table":"roadside_attraction_characteristics","column":"characteristic_id","value":"1"}'
"&_extra=human_description_en"
)
assert response.json()["rows"] == [
[
3,
"Burlingame Museum of PEZ Memorabilia",
"214 California Drive, Burlingame, CA 94010",
None,
37.5793,
-122.3442,
],
[
4,
"Bigfoot Discovery Museum",
"5497 Highway 9, Felton, CA 95018",
"https://www.bigfootdiscoveryproject.com/",
37.0414,
-122.0725,
],
]
assert (
response.json()["human_description_en"]
== 'where roadside_attraction_characteristics.characteristic_id = "1"'
)
@pytest.mark.asyncio
async def test_max_returned_rows(ds_client):
response = await ds_client.get(
"/fixtures.json?sql=select+content+from+no_primary_key"
)
data = response.json()
assert data["truncated"]
assert 100 == len(data["rows"])
@pytest.mark.asyncio
async def test_view(ds_client):
response = await ds_client.get("/fixtures/simple_view.json?_shape=objects")
assert response.status_code == 200
data = response.json()
assert data["rows"] == [
{"upper_content": "HELLO", "content": "hello"},
{"upper_content": "WORLD", "content": "world"},
{"upper_content": "", "content": ""},
{"upper_content": "RENDER_CELL_DEMO", "content": "RENDER_CELL_DEMO"},
{"upper_content": "RENDER_CELL_ASYNC", "content": "RENDER_CELL_ASYNC"},
]
@pytest.mark.xfail
@pytest.mark.asyncio
async def test_unit_filters(ds_client):
response = await ds_client.get(
"/fixtures/units.json?_shape=arrays&distance__lt=75km&frequency__gt=1kHz"
)
assert response.status_code == 200
data = response.json()
assert data["units"]["distance"] == "m"
assert data["units"]["frequency"] == "Hz"
assert len(data["rows"]) == 1
assert data["rows"][0][0] == 2
def test_page_size_matching_max_returned_rows(
app_client_returned_rows_matches_page_size,
):
fetched = []
path = "/fixtures/no_primary_key.json?_extra=next_url"
while path:
response = app_client_returned_rows_matches_page_size.get(path)
fetched.extend(response.json["rows"])
assert len(response.json["rows"]) in (1, 50)
path = response.json["next_url"]
if path:
path = path.replace("http://localhost", "")
assert len(fetched) == 201
@pytest.mark.asyncio
@pytest.mark.parametrize(
"path,expected_facet_results",
[
(
"/fixtures/facetable.json?_facet=state&_facet=_city_id",
{
"state": {
"name": "state",
"hideable": True,
"type": "column",
"toggle_url": "/fixtures/facetable.json?_facet=_city_id",
"results": [
{
"value": "CA",
"label": "CA",
"count": 10,
"toggle_url": "_facet=state&_facet=_city_id&state=CA",
"selected": False,
},
{
"value": "MI",
"label": "MI",
"count": 4,
"toggle_url": "_facet=state&_facet=_city_id&state=MI",
"selected": False,
},
{
"value": "MC",
"label": "MC",
"count": 1,
"toggle_url": "_facet=state&_facet=_city_id&state=MC",
"selected": False,
},
],
"truncated": False,
},
"_city_id": {
"name": "_city_id",
"hideable": True,
"type": "column",
"toggle_url": "/fixtures/facetable.json?_facet=state",
"results": [
{
"value": 1,
"label": "San Francisco",
"count": 6,
"toggle_url": "_facet=state&_facet=_city_id&_city_id__exact=1",
"selected": False,
},
{
"value": 2,
"label": "Los Angeles",
"count": 4,
"toggle_url": "_facet=state&_facet=_city_id&_city_id__exact=2",
"selected": False,
},
{
"value": 3,
"label": "Detroit",
"count": 4,
"toggle_url": "_facet=state&_facet=_city_id&_city_id__exact=3",
"selected": False,
},
{
"value": 4,
"label": "Memnonia",
"count": 1,
"toggle_url": "_facet=state&_facet=_city_id&_city_id__exact=4",
"selected": False,
},
],
"truncated": False,
},
},
),
(
"/fixtures/facetable.json?_facet=state&_facet=_city_id&state=MI",
{
"state": {
"name": "state",
"hideable": True,
"type": "column",
"toggle_url": "/fixtures/facetable.json?_facet=_city_id&state=MI",
"results": [
{
"value": "MI",
"label": "MI",
"count": 4,
"selected": True,
"toggle_url": "_facet=state&_facet=_city_id",
}
],
"truncated": False,
},
"_city_id": {
"name": "_city_id",
"hideable": True,
"type": "column",
"toggle_url": "/fixtures/facetable.json?_facet=state&state=MI",
"results": [
{
"value": 3,
"label": "Detroit",
"count": 4,
"selected": False,
"toggle_url": "_facet=state&_facet=_city_id&state=MI&_city_id__exact=3",
}
],
"truncated": False,
},
},
),
(
"/fixtures/facetable.json?_facet=planet_int",
{
"planet_int": {
"name": "planet_int",
"hideable": True,
"type": "column",
"toggle_url": "/fixtures/facetable.json",
"results": [
{
"value": 1,
"label": 1,
"count": 14,
"selected": False,
"toggle_url": "_facet=planet_int&planet_int=1",
},
{
"value": 2,
"label": 2,
"count": 1,
"selected": False,
"toggle_url": "_facet=planet_int&planet_int=2",
},
],
"truncated": False,
}
},
),
(
# planet_int is an integer field:
"/fixtures/facetable.json?_facet=planet_int&planet_int=1",
{
"planet_int": {
"name": "planet_int",
"hideable": True,
"type": "column",
"toggle_url": "/fixtures/facetable.json?planet_int=1",
"results": [
{
"value": 1,
"label": 1,
"count": 14,
"selected": True,
"toggle_url": "_facet=planet_int",
}
],
"truncated": False,
}
},
),
],
)
async def test_facets(ds_client, path, expected_facet_results):
response = await ds_client.get(path)
facet_results = response.json()["facet_results"]
# We only compare the querystring portion of the taggle_url
for facet_name, facet_info in facet_results["results"].items():
assert facet_name == facet_info["name"]
assert False is facet_info["truncated"]
for facet_value in facet_info["results"]:
facet_value["toggle_url"] = facet_value["toggle_url"].split("?")[1]
assert expected_facet_results == facet_results["results"]
@pytest.mark.asyncio
@pytest.mark.skipif(not detect_json1(), reason="requires JSON1 extension")
async def test_facets_array(ds_client):
response = await ds_client.get("/fixtures/facetable.json?_facet_array=tags")
facet_results = response.json()["facet_results"]
assert facet_results["results"]["tags"]["results"] == [
{
"value": "tag1",
"label": "tag1",
"count": 2,
"toggle_url": "http://localhost/fixtures/facetable.json?_facet_array=tags&tags__arraycontains=tag1",
"selected": False,
},
{
"value": "tag2",
"label": "tag2",
"count": 1,
"toggle_url": "http://localhost/fixtures/facetable.json?_facet_array=tags&tags__arraycontains=tag2",
"selected": False,
},
{
"value": "tag3",
"label": "tag3",
"count": 1,
"toggle_url": "http://localhost/fixtures/facetable.json?_facet_array=tags&tags__arraycontains=tag3",
"selected": False,
},
]
@pytest.mark.asyncio
async def test_suggested_facets(ds_client):
suggestions = [
{
"name": suggestion["name"],
"querystring": suggestion["toggle_url"].split("?")[-1],
}
for suggestion in (
await ds_client.get("/fixtures/facetable.json?_extra=suggested_facets")
).json()["suggested_facets"]
]
expected = [
{"name": "created", "querystring": "_extra=suggested_facets&_facet=created"},
{
"name": "planet_int",
"querystring": "_extra=suggested_facets&_facet=planet_int",
},
{"name": "on_earth", "querystring": "_extra=suggested_facets&_facet=on_earth"},
{"name": "state", "querystring": "_extra=suggested_facets&_facet=state"},
{"name": "_city_id", "querystring": "_extra=suggested_facets&_facet=_city_id"},
{
"name": "_neighborhood",
"querystring": "_extra=suggested_facets&_facet=_neighborhood",
},
{"name": "tags", "querystring": "_extra=suggested_facets&_facet=tags"},
{
"name": "complex_array",
"querystring": "_extra=suggested_facets&_facet=complex_array",
},
{
"name": "created",
"querystring": "_extra=suggested_facets&_facet_date=created",
},
]
if detect_json1():
expected.append(
{"name": "tags", "querystring": "_extra=suggested_facets&_facet_array=tags"}
)
assert expected == suggestions
def test_allow_facet_off():
with make_app_client(settings={"allow_facet": False}) as client:
assert (
client.get(
"/fixtures/facetable.json?_facet=planet_int&_extra=suggested_facets"
).status
== 400
)
data = client.get("/fixtures/facetable.json?_extra=suggested_facets").json
# Should not suggest any facets either:
assert [] == data["suggested_facets"]
def test_suggest_facets_off():
with make_app_client(settings={"suggest_facets": False}) as client:
# Now suggested_facets should be []
assert (
[]
== client.get("/fixtures/facetable.json?_extra=suggested_facets").json[
"suggested_facets"
]
)
@pytest.mark.asyncio
@pytest.mark.parametrize("nofacet", (True, False))
async def test_nofacet(ds_client, nofacet):
path = "/fixtures/facetable.json?_facet=state&_extra=suggested_facets"
if nofacet:
path += "&_nofacet=1"
response = await ds_client.get(path)
if nofacet:
assert response.json()["suggested_facets"] == []
assert response.json()["facet_results"]["results"] == {}
else:
assert response.json()["suggested_facets"] != []
assert response.json()["facet_results"]["results"] != {}
@pytest.mark.asyncio
@pytest.mark.parametrize("nosuggest", (True, False))
async def test_nosuggest(ds_client, nosuggest):
path = "/fixtures/facetable.json?_facet=state&_extra=suggested_facets"
if nosuggest:
path += "&_nosuggest=1"
response = await ds_client.get(path)
if nosuggest:
assert response.json()["suggested_facets"] == []
# But facets should still be returned:
assert response.json()["facet_results"] != {}
else:
assert response.json()["suggested_facets"] != []
assert response.json()["facet_results"] != {}
@pytest.mark.asyncio
@pytest.mark.parametrize("nocount,expected_count", ((True, None), (False, 15)))
async def test_nocount(ds_client, nocount, expected_count):
path = "/fixtures/facetable.json?_extra=count"
if nocount:
path += "&_nocount=1"
response = await ds_client.get(path)
assert response.json()["count"] == expected_count
def test_nocount_nofacet_if_shape_is_object(app_client_with_trace):
response = app_client_with_trace.get(
"/fixtures/facetable.json?_trace=1&_shape=object"
)
assert "count(*)" not in response.text
@pytest.mark.asyncio
async def test_expand_labels(ds_client):
response = await ds_client.get(
"/fixtures/facetable.json?_shape=object&_labels=1&_size=2"
"&_neighborhood__contains=c"
)
assert response.json() == {
"2": {
"pk": 2,
"created": "2019-01-14 08:00:00",
"planet_int": 1,
"on_earth": 1,
"state": "CA",
"_city_id": {"value": 1, "label": "San Francisco"},
"_neighborhood": "Dogpatch",
"tags": '["tag1", "tag3"]',
"complex_array": "[]",
"distinct_some_null": "two",
"n": "n2",
},
"13": {
"pk": 13,
"created": "2019-01-17 08:00:00",
"planet_int": 1,
"on_earth": 1,
"state": "MI",
"_city_id": {"value": 3, "label": "Detroit"},
"_neighborhood": "Corktown",
"tags": "[]",
"complex_array": "[]",
"distinct_some_null": None,
"n": None,
},
}
@pytest.mark.asyncio
async def test_expand_label(ds_client):
response = await ds_client.get(
"/fixtures/foreign_key_references.json?_shape=object"
"&_label=foreign_key_with_label&_size=1"
)
assert response.json() == {
"1": {
"pk": "1",
"foreign_key_with_label": {"value": "1", "label": "hello"},
"foreign_key_with_blank_label": "3",
"foreign_key_with_no_label": "1",
"foreign_key_compound_pk1": "a",
"foreign_key_compound_pk2": "b",
}
}
@pytest.mark.asyncio
@pytest.mark.parametrize(
"path,expected_cache_control",
[
("/fixtures/facetable.json", "max-age=5"),
("/fixtures/facetable.json?_ttl=invalid", "max-age=5"),
("/fixtures/facetable.json?_ttl=10", "max-age=10"),
("/fixtures/facetable.json?_ttl=0", "no-cache"),
],
)
async def test_ttl_parameter(ds_client, path, expected_cache_control):
response = await ds_client.get(path)
assert response.headers["Cache-Control"] == expected_cache_control
@pytest.mark.asyncio
async def test_infinity_returned_as_null(ds_client):
response = await ds_client.get("/fixtures/infinity.json?_shape=array")
assert response.json() == [
{"rowid": 1, "value": None},
{"rowid": 2, "value": None},
{"rowid": 3, "value": 1.5},
]
@pytest.mark.asyncio
async def test_infinity_returned_as_invalid_json_if_requested(ds_client):
response = await ds_client.get(
"/fixtures/infinity.json?_shape=array&_json_infinity=1"
)
assert response.json() == [
{"rowid": 1, "value": float("inf")},
{"rowid": 2, "value": float("-inf")},
{"rowid": 3, "value": 1.5},
]
@pytest.mark.asyncio
async def test_custom_query_with_unicode_characters(ds_client):
# /fixtures/𝐜𝐢𝐭𝐢𝐞𝐬.json
response = await ds_client.get(
"/fixtures/~F0~9D~90~9C~F0~9D~90~A2~F0~9D~90~AD~F0~9D~90~A2~F0~9D~90~9E~F0~9D~90~AC.json?_shape=array"
)
assert response.json() == [{"id": 1, "name": "San Francisco"}]
@pytest.mark.asyncio
async def test_null_and_compound_foreign_keys_are_not_expanded(ds_client):
response = await ds_client.get(
"/fixtures/foreign_key_references.json?_shape=array&_labels=on"
)
assert response.json() == [
{
"pk": "1",
"foreign_key_with_label": {"value": "1", "label": "hello"},
"foreign_key_with_blank_label": {"value": "3", "label": ""},
"foreign_key_with_no_label": {"value": "1", "label": "1"},
"foreign_key_compound_pk1": "a",
"foreign_key_compound_pk2": "b",
},
{
"pk": "2",
"foreign_key_with_label": None,
"foreign_key_with_blank_label": None,
"foreign_key_with_no_label": None,
"foreign_key_compound_pk1": None,
"foreign_key_compound_pk2": None,
},
]
@pytest.mark.asyncio
@pytest.mark.parametrize(
"path,expected_json,expected_text",
[
(
"/fixtures/binary_data.json?_shape=array",
[
{"rowid": 1, "data": {"$base64": True, "encoded": "FRwCx60F/g=="}},
{"rowid": 2, "data": {"$base64": True, "encoded": "FRwDx60F/g=="}},
{"rowid": 3, "data": None},
],
None,
),
(
"/fixtures/binary_data.json?_shape=array&_nl=on",
None,
(
'{"rowid": 1, "data": {"$base64": true, "encoded": "FRwCx60F/g=="}}\n'
'{"rowid": 2, "data": {"$base64": true, "encoded": "FRwDx60F/g=="}}\n'
'{"rowid": 3, "data": null}'
),
),
],
)
async def test_binary_data_in_json(ds_client, path, expected_json, expected_text):
response = await ds_client.get(path)
if expected_json:
assert response.json() == expected_json
else:
assert response.text == expected_text
@pytest.mark.asyncio
@pytest.mark.parametrize(
"qs",
[
"",
"?_shape=arrays",
"?_shape=arrayfirst",
"?_shape=object",
"?_shape=objects",
"?_shape=array",
"?_shape=array&_nl=on",
],
)
async def test_paginate_using_link_header(ds_client, qs):
path = f"/fixtures/compound_three_primary_keys.json{qs}"
num_pages = 0
while path:
response = await ds_client.get(path)
assert response.status_code == 200
num_pages += 1
link = response.headers.get("link")
if link:
assert link.startswith("<")
assert link.endswith('>; rel="next"')
path = link[1:].split(">")[0]
path = path.replace("http://localhost", "")
else:
path = None
assert num_pages == 21
@pytest.mark.skipif(
sqlite_version() < (3, 31, 0),
reason="generated columns were added in SQLite 3.31.0",
)
def test_generated_columns_are_visible_in_datasette():
with make_app_client(
extra_databases={
"generated.db": """
CREATE TABLE generated_columns (
body TEXT,
id INT GENERATED ALWAYS AS (json_extract(body, '$.number')) STORED,
consideration INT GENERATED ALWAYS AS (json_extract(body, '$.string')) STORED
);
INSERT INTO generated_columns (body) VALUES (
'{"number": 1, "string": "This is a string"}'
);"""
}
) as client:
response = client.get("/generated/generated_columns.json?_shape=array")
assert response.json == [
{
"rowid": 1,
"body": '{"number": 1, "string": "This is a string"}',
"id": 1,
"consideration": "This is a string",
}
]
@pytest.mark.asyncio
@pytest.mark.parametrize(
"path,expected_columns",
(
("/fixtures/facetable.json?_col=created", ["pk", "created"]),
(
"/fixtures/facetable.json?_nocol=created",
[
"pk",
"planet_int",
"on_earth",
"state",
"_city_id",
"_neighborhood",
"tags",
"complex_array",
"distinct_some_null",
"n",
],
),
(
"/fixtures/facetable.json?_col=state&_col=created",
["pk", "state", "created"],
),
(
"/fixtures/facetable.json?_col=state&_col=state",
["pk", "state"],
),
(
"/fixtures/facetable.json?_col=state&_col=created&_nocol=created",
["pk", "state"],
),
(
# Ensure faceting doesn't break, https://github.com/simonw/datasette/issues/1345
"/fixtures/facetable.json?_nocol=state&_facet=state",
[
"pk",
"created",
"planet_int",
"on_earth",
"_city_id",
"_neighborhood",
"tags",
"complex_array",
"distinct_some_null",
"n",
],
),
(
"/fixtures/simple_view.json?_nocol=content",
["upper_content"],
),
("/fixtures/simple_view.json?_col=content", ["content"]),
),
)
async def test_col_nocol(ds_client, path, expected_columns):
response = await ds_client.get(path + "&_extra=columns")
assert response.status_code == 200
columns = response.json()["columns"]
assert columns == expected_columns
@pytest.mark.asyncio
@pytest.mark.parametrize(
"path,expected_error",
(
("/fixtures/facetable.json?_col=bad", "_col=bad - invalid columns"),
("/fixtures/facetable.json?_nocol=bad", "_nocol=bad - invalid columns"),
("/fixtures/facetable.json?_nocol=pk", "_nocol=pk - invalid columns"),
("/fixtures/simple_view.json?_col=bad", "_col=bad - invalid columns"),
),
)
async def test_col_nocol_errors(ds_client, path, expected_error):
response = await ds_client.get(path)
assert response.status_code == 400
assert response.json()["error"] == expected_error
@pytest.mark.asyncio
@pytest.mark.parametrize(
"extra,expected_json",
(
(
"columns",
{
"ok": True,
"next": None,
"columns": ["id", "content", "content2"],
"rows": [{"id": "1", "content": "hey", "content2": "world"}],
"truncated": False,
},
),
(
"count",
{
"ok": True,
"next": None,
"rows": [{"id": "1", "content": "hey", "content2": "world"}],
"truncated": False,
"count": 1,
},
),
),
)
async def test_table_extras(ds_client, extra, expected_json):
response = await ds_client.get(
"/fixtures/primary_key_multiple_columns.json?_extra=" + extra
)
assert response.status_code == 200
assert response.json() == expected_json