diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/fixtures.py b/tests/fixtures.py new file mode 100644 index 00000000..92c38482 --- /dev/null +++ b/tests/fixtures.py @@ -0,0 +1,92 @@ +from datasette.app import Datasette +import os +import sqlite3 +import tempfile +import time + + +def app_client(): + with tempfile.TemporaryDirectory() as tmpdir: + filepath = os.path.join(tmpdir, 'test_tables.db') + conn = sqlite3.connect(filepath) + conn.executescript(TABLES) + os.chdir(os.path.dirname(filepath)) + ds = Datasette( + [filepath], + page_size=50, + max_returned_rows=100, + sql_time_limit_ms=20, + ) + ds.sqlite_functions.append( + ('sleep', 1, lambda n: time.sleep(float(n))), + ) + yield ds.app().test_client + + +TABLES = ''' +CREATE TABLE simple_primary_key ( + pk varchar(30) primary key, + content text +); + +CREATE TABLE compound_primary_key ( + pk1 varchar(30), + pk2 varchar(30), + content text, + PRIMARY KEY (pk1, pk2) +); + +INSERT INTO compound_primary_key VALUES ('a', 'b', 'c'); + +CREATE TABLE no_primary_key ( + content text, + a text, + b text, + c text +); + +CREATE TABLE [123_starts_with_digits] ( + content text +); + +CREATE VIEW paginated_view AS + SELECT + content, + '- ' || content || ' -' AS content_extra + FROM no_primary_key; + +CREATE TABLE "Table With Space In Name" ( + pk varchar(30) primary key, + content text +); + +CREATE TABLE "table/with/slashes.csv" ( + pk varchar(30) primary key, + content text +); + +CREATE TABLE "complex_foreign_keys" ( + pk varchar(30) primary key, + f1 text, + f2 text, + f3 text, + FOREIGN KEY ("f1") REFERENCES [simple_primary_key](id), + FOREIGN KEY ("f2") REFERENCES [simple_primary_key](id), + FOREIGN KEY ("f3") REFERENCES [simple_primary_key](id) +); + +INSERT INTO simple_primary_key VALUES (1, 'hello'); +INSERT INTO simple_primary_key VALUES (2, 'world'); +INSERT INTO simple_primary_key VALUES (3, ''); + +INSERT INTO complex_foreign_keys VALUES (1, 1, 2, 1); + +INSERT INTO [table/with/slashes.csv] VALUES (3, 'hey'); + +CREATE VIEW simple_view AS + SELECT content, upper(content) AS upper_content FROM simple_primary_key; + +''' + '\n'.join([ + 'INSERT INTO no_primary_key VALUES ({i}, "a{i}", "b{i}", "c{i}");'.format(i=i + 1) + for i in range(201) +]) diff --git a/tests/test_api.py b/tests/test_api.py new file mode 100644 index 00000000..d717d9d4 --- /dev/null +++ b/tests/test_api.py @@ -0,0 +1,279 @@ +from .fixtures import app_client +import pytest + +pytest.fixture(scope='module')(app_client) + + +def test_homepage(app_client): + _, response = app_client.get('/.json') + assert response.status == 200 + assert response.json.keys() == {'test_tables': 0}.keys() + d = response.json['test_tables'] + assert d['name'] == 'test_tables' + assert d['tables_count'] == 7 + + +def test_database_page(app_client): + response = app_client.get('/test_tables.json', gather_request=False) + data = response.json + assert 'test_tables' == data['database'] + assert [{ + 'columns': ['content'], + 'name': '123_starts_with_digits', + 'count': 0, + 'hidden': False, + 'foreign_keys': {'incoming': [], 'outgoing': []}, + 'label_column': None, + }, { + 'columns': ['pk', 'content'], + 'name': 'Table With Space In Name', + 'count': 0, + 'hidden': False, + 'foreign_keys': {'incoming': [], 'outgoing': []}, + 'label_column': None, + }, { + 'columns': ['pk', 'f1', 'f2', 'f3'], + 'name': 'complex_foreign_keys', + 'count': 1, + 'foreign_keys': { + 'incoming': [], + 'outgoing': [{ + 'column': 'f3', + 'other_column': 'id', + 'other_table': 'simple_primary_key' + }, { + 'column': 'f2', + 'other_column': 'id', + 'other_table': 'simple_primary_key' + }, { + 'column': 'f1', + 'other_column': 'id', + 'other_table': 'simple_primary_key' + }], + }, + 'hidden': False, + 'label_column': None, + }, { + 'columns': ['pk1', 'pk2', 'content'], + 'name': 'compound_primary_key', + 'count': 1, + 'hidden': False, + 'foreign_keys': {'incoming': [], 'outgoing': []}, + 'label_column': None, + }, { + 'columns': ['content', 'a', 'b', 'c'], + 'name': 'no_primary_key', + 'count': 201, + 'hidden': False, + 'foreign_keys': {'incoming': [], 'outgoing': []}, + 'label_column': None, + }, { + 'columns': ['pk', 'content'], + 'name': 'simple_primary_key', + 'count': 3, + 'hidden': False, + 'foreign_keys': { + 'incoming': [{ + 'column': 'id', + 'other_column': 'f3', + 'other_table': 'complex_foreign_keys' + }, { + 'column': 'id', + 'other_column': 'f2', + 'other_table': 'complex_foreign_keys' + }, { + 'column': 'id', + 'other_column': 'f1', + 'other_table': 'complex_foreign_keys' + }], + 'outgoing': [], + }, + 'label_column': None, + }, { + 'columns': ['pk', 'content'], + 'name': 'table/with/slashes.csv', + 'count': 1, + 'hidden': False, + 'foreign_keys': {'incoming': [], 'outgoing': []}, + 'label_column': None, + }] == data['tables'] + + +def test_custom_sql(app_client): + response = app_client.get( + '/test_tables.jsono?sql=select+content+from+simple_primary_key', + gather_request=False + ) + data = response.json + assert { + 'sql': 'select content from simple_primary_key', + 'params': {} + } == data['query'] + assert [ + {'content': 'hello'}, + {'content': 'world'}, + {'content': ''} + ] == data['rows'] + assert ['content'] == data['columns'] + assert 'test_tables' == data['database'] + assert not data['truncated'] + + +def test_sql_time_limit(app_client): + response = app_client.get( + '/test_tables.jsono?sql=select+sleep(0.5)', + gather_request=False + ) + assert 400 == response.status + assert 'interrupted' == response.json['error'] + + +def test_custom_sql_time_limit(app_client): + response = app_client.get( + '/test_tables.jsono?sql=select+sleep(0.01)', + gather_request=False + ) + assert 200 == response.status + response = app_client.get( + '/test_tables.jsono?sql=select+sleep(0.01)&_sql_time_limit_ms=5', + gather_request=False + ) + assert 400 == response.status + assert 'interrupted' == response.json['error'] + + +def test_invalid_custom_sql(app_client): + response = app_client.get( + '/test_tables.json?sql=.schema', + gather_request=False + ) + assert response.status == 400 + assert response.json['ok'] is False + assert 'Statement must be a SELECT' == response.json['error'] + + +def test_table_json(app_client): + response = app_client.get('/test_tables/simple_primary_key.jsono', gather_request=False) + assert response.status == 200 + data = response.json + assert data['query']['sql'] == 'select * from simple_primary_key order by pk limit 51' + assert data['query']['params'] == {} + assert data['rows'] == [{ + 'pk': '1', + 'content': 'hello', + }, { + 'pk': '2', + 'content': 'world', + }, { + 'pk': '3', + 'content': '', + }] + + +def test_table_with_slashes_in_name(app_client): + response = app_client.get('/test_tables/table%2Fwith%2Fslashes.csv.jsono', gather_request=False) + assert response.status == 200 + data = response.json + assert data['rows'] == [{ + 'pk': '3', + 'content': 'hey', + }] + + +@pytest.mark.parametrize('path,expected_rows,expected_pages', [ + ('/test_tables/no_primary_key.jsono', 201, 5), + ('/test_tables/paginated_view.jsono', 201, 5), + ('/test_tables/123_starts_with_digits.jsono', 0, 1), +]) +def test_paginate_tables_and_views(app_client, path, expected_rows, expected_pages): + fetched = [] + count = 0 + while path: + response = app_client.get(path, gather_request=False) + count += 1 + fetched.extend(response.json['rows']) + path = response.json['next_url'] + if path: + assert response.json['next'] and path.endswith(response.json['next']) + assert count < 10, 'Possible infinite loop detected' + + assert expected_rows == len(fetched) + assert expected_pages == count + + +@pytest.mark.parametrize('path,expected_rows', [ + ('/test_tables/simple_primary_key.json?content=hello', [ + ['1', 'hello'], + ]), + ('/test_tables/simple_primary_key.json?content__contains=o', [ + ['1', 'hello'], + ['2', 'world'], + ]), + ('/test_tables/simple_primary_key.json?content__exact=', [ + ['3', ''], + ]), + ('/test_tables/simple_primary_key.json?content__not=world', [ + ['1', 'hello'], + ['3', ''], + ]), +]) +def test_table_filter_queries(app_client, path, expected_rows): + response = app_client.get(path, gather_request=False) + assert expected_rows == response.json['rows'] + + +def test_max_returned_rows(app_client): + response = app_client.get( + '/test_tables.jsono?sql=select+content+from+no_primary_key', + gather_request=False + ) + data = response.json + assert { + 'sql': 'select content from no_primary_key', + 'params': {} + } == data['query'] + assert data['truncated'] + assert 100 == len(data['rows']) + + +def test_view(app_client): + response = app_client.get('/test_tables/simple_view.jsono', gather_request=False) + assert response.status == 200 + data = response.json + assert data['rows'] == [{ + 'upper_content': 'HELLO', + 'content': 'hello', + }, { + 'upper_content': 'WORLD', + 'content': 'world', + }, { + 'upper_content': '', + 'content': '', + }] + + +def test_row(app_client): + response = app_client.get('/test_tables/simple_primary_key/1.jsono', gather_request=False) + assert response.status == 200 + assert [{'pk': '1', 'content': 'hello'}] == response.json['rows'] + + +def test_row_foreign_key_tables(app_client): + response = app_client.get('/test_tables/simple_primary_key/1.json?_extras=foreign_key_tables', gather_request=False) + assert response.status == 200 + assert [{ + 'column': 'id', + 'count': 1, + 'other_column': 'f3', + 'other_table': 'complex_foreign_keys' + }, { + 'column': 'id', + 'count': 0, + 'other_column': 'f2', + 'other_table': 'complex_foreign_keys' + }, { + 'column': 'id', + 'count': 1, + 'other_column': 'f1', + 'other_table': 'complex_foreign_keys' + }] == response.json['foreign_key_tables'] diff --git a/tests/test_app.py b/tests/test_app.py deleted file mode 100644 index 473cc8d3..00000000 --- a/tests/test_app.py +++ /dev/null @@ -1,637 +0,0 @@ -from bs4 import BeautifulSoup as Soup -from datasette.app import Datasette -import os -import pytest -import re -import sqlite3 -import tempfile -import time -import urllib.parse - - -@pytest.fixture(scope='module') -def app_client(): - with tempfile.TemporaryDirectory() as tmpdir: - filepath = os.path.join(tmpdir, 'test_tables.db') - conn = sqlite3.connect(filepath) - conn.executescript(TABLES) - os.chdir(os.path.dirname(filepath)) - ds = Datasette( - [filepath], - page_size=50, - max_returned_rows=100, - sql_time_limit_ms=20, - ) - ds.sqlite_functions.append( - ('sleep', 1, lambda n: time.sleep(float(n))), - ) - yield ds.app().test_client - - -def test_homepage(app_client): - response = app_client.get('/', gather_request=False) - assert response.status == 200 - assert 'test_tables' in response.text - - # Now try the JSON - _, response = app_client.get('/.json') - assert response.status == 200 - assert response.json.keys() == {'test_tables': 0}.keys() - d = response.json['test_tables'] - assert d['name'] == 'test_tables' - assert d['tables_count'] == 7 - - -def test_database_page(app_client): - response = app_client.get('/test_tables', allow_redirects=False, gather_request=False) - assert response.status == 302 - response = app_client.get('/test_tables', gather_request=False) - assert 'test_tables' in response.text - # Test JSON list of tables - response = app_client.get('/test_tables.json', gather_request=False) - data = response.json - assert 'test_tables' == data['database'] - assert [{ - 'columns': ['content'], - 'name': '123_starts_with_digits', - 'count': 0, - 'hidden': False, - 'foreign_keys': {'incoming': [], 'outgoing': []}, - 'label_column': None, - }, { - 'columns': ['pk', 'content'], - 'name': 'Table With Space In Name', - 'count': 0, - 'hidden': False, - 'foreign_keys': {'incoming': [], 'outgoing': []}, - 'label_column': None, - }, { - 'columns': ['pk', 'f1', 'f2', 'f3'], - 'name': 'complex_foreign_keys', - 'count': 1, - 'foreign_keys': { - 'incoming': [], - 'outgoing': [{ - 'column': 'f3', - 'other_column': 'id', - 'other_table': 'simple_primary_key' - }, { - 'column': 'f2', - 'other_column': 'id', - 'other_table': 'simple_primary_key' - }, { - 'column': 'f1', - 'other_column': 'id', - 'other_table': 'simple_primary_key' - }], - }, - 'hidden': False, - 'label_column': None, - }, { - 'columns': ['pk1', 'pk2', 'content'], - 'name': 'compound_primary_key', - 'count': 1, - 'hidden': False, - 'foreign_keys': {'incoming': [], 'outgoing': []}, - 'label_column': None, - }, { - 'columns': ['content', 'a', 'b', 'c'], - 'name': 'no_primary_key', - 'count': 201, - 'hidden': False, - 'foreign_keys': {'incoming': [], 'outgoing': []}, - 'label_column': None, - }, { - 'columns': ['pk', 'content'], - 'name': 'simple_primary_key', - 'count': 3, - 'hidden': False, - 'foreign_keys': { - 'incoming': [{ - 'column': 'id', - 'other_column': 'f3', - 'other_table': 'complex_foreign_keys' - }, { - 'column': 'id', - 'other_column': 'f2', - 'other_table': 'complex_foreign_keys' - }, { - 'column': 'id', - 'other_column': 'f1', - 'other_table': 'complex_foreign_keys' - }], - 'outgoing': [], - }, - 'label_column': None, - }, { - 'columns': ['pk', 'content'], - 'name': 'table/with/slashes.csv', - 'count': 1, - 'hidden': False, - 'foreign_keys': {'incoming': [], 'outgoing': []}, - 'label_column': None, - }] == data['tables'] - - -def test_custom_sql(app_client): - response = app_client.get( - '/test_tables.jsono?sql=select+content+from+simple_primary_key', - gather_request=False - ) - data = response.json - assert { - 'sql': 'select content from simple_primary_key', - 'params': {} - } == data['query'] - assert [ - {'content': 'hello'}, - {'content': 'world'}, - {'content': ''} - ] == data['rows'] - assert ['content'] == data['columns'] - assert 'test_tables' == data['database'] - assert not data['truncated'] - - -def test_sql_time_limit(app_client): - response = app_client.get( - '/test_tables.jsono?sql=select+sleep(0.5)', - gather_request=False - ) - assert 400 == response.status - assert 'interrupted' == response.json['error'] - - -def test_custom_sql_time_limit(app_client): - response = app_client.get( - '/test_tables.jsono?sql=select+sleep(0.01)', - gather_request=False - ) - assert 200 == response.status - response = app_client.get( - '/test_tables.jsono?sql=select+sleep(0.01)&_sql_time_limit_ms=5', - gather_request=False - ) - assert 400 == response.status - assert 'interrupted' == response.json['error'] - - -def test_invalid_custom_sql(app_client): - response = app_client.get( - '/test_tables?sql=.schema', - gather_request=False - ) - assert response.status == 400 - assert 'Statement must be a SELECT' in response.text - response = app_client.get( - '/test_tables.json?sql=.schema', - gather_request=False - ) - assert response.status == 400 - assert response.json['ok'] is False - assert 'Statement must be a SELECT' == response.json['error'] - - -def test_table_json(app_client): - response = app_client.get('/test_tables/simple_primary_key.jsono', gather_request=False) - assert response.status == 200 - data = response.json - assert data['query']['sql'] == 'select * from simple_primary_key order by pk limit 51' - assert data['query']['params'] == {} - assert data['rows'] == [{ - 'pk': '1', - 'content': 'hello', - }, { - 'pk': '2', - 'content': 'world', - }, { - 'pk': '3', - 'content': '', - }] - - -def test_table_with_slashes_in_name(app_client): - response = app_client.get('/test_tables/table%2Fwith%2Fslashes.csv', gather_request=False) - assert response.status == 200 - response = app_client.get('/test_tables/table%2Fwith%2Fslashes.csv.jsono', gather_request=False) - assert response.status == 200 - data = response.json - assert data['rows'] == [{ - 'pk': '3', - 'content': 'hey', - }] - - -@pytest.mark.parametrize('path,expected_rows,expected_pages', [ - ('/test_tables/no_primary_key.jsono', 201, 5), - ('/test_tables/paginated_view.jsono', 201, 5), - ('/test_tables/123_starts_with_digits.jsono', 0, 1), -]) -def test_paginate_tables_and_views(app_client, path, expected_rows, expected_pages): - fetched = [] - count = 0 - while path: - response = app_client.get(path, gather_request=False) - count += 1 - fetched.extend(response.json['rows']) - path = response.json['next_url'] - if path: - assert response.json['next'] and path.endswith(response.json['next']) - assert count < 10, 'Possible infinite loop detected' - - assert expected_rows == len(fetched) - assert expected_pages == count - - -@pytest.mark.parametrize('path,expected_rows', [ - ('/test_tables/simple_primary_key.json?content=hello', [ - ['1', 'hello'], - ]), - ('/test_tables/simple_primary_key.json?content__contains=o', [ - ['1', 'hello'], - ['2', 'world'], - ]), - ('/test_tables/simple_primary_key.json?content__exact=', [ - ['3', ''], - ]), - ('/test_tables/simple_primary_key.json?content__not=world', [ - ['1', 'hello'], - ['3', ''], - ]), -]) -def test_table_filter_queries(app_client, path, expected_rows): - response = app_client.get(path, gather_request=False) - assert expected_rows == response.json['rows'] - - -def test_max_returned_rows(app_client): - response = app_client.get( - '/test_tables.jsono?sql=select+content+from+no_primary_key', - gather_request=False - ) - data = response.json - assert { - 'sql': 'select content from no_primary_key', - 'params': {} - } == data['query'] - assert data['truncated'] - assert 100 == len(data['rows']) - - -def test_view(app_client): - response = app_client.get('/test_tables/simple_view', gather_request=False) - assert response.status == 200 - response = app_client.get('/test_tables/simple_view.jsono', gather_request=False) - assert response.status == 200 - data = response.json - assert data['rows'] == [{ - 'upper_content': 'HELLO', - 'content': 'hello', - }, { - 'upper_content': 'WORLD', - 'content': 'world', - }, { - 'upper_content': '', - 'content': '', - }] - - -def test_row(app_client): - response = app_client.get( - '/test_tables/simple_primary_key/1', - allow_redirects=False, - gather_request=False - ) - assert response.status == 302 - assert response.headers['Location'].endswith('/1') - response = app_client.get('/test_tables/simple_primary_key/1', gather_request=False) - assert response.status == 200 - response = app_client.get('/test_tables/simple_primary_key/1.jsono', gather_request=False) - assert response.status == 200 - assert [{'pk': '1', 'content': 'hello'}] == response.json['rows'] - - -def test_row_foreign_key_tables(app_client): - response = app_client.get('/test_tables/simple_primary_key/1.json?_extras=foreign_key_tables', gather_request=False) - assert response.status == 200 - assert [{ - 'column': 'id', - 'count': 1, - 'other_column': 'f3', - 'other_table': 'complex_foreign_keys' - }, { - 'column': 'id', - 'count': 0, - 'other_column': 'f2', - 'other_table': 'complex_foreign_keys' - }, { - 'column': 'id', - 'count': 1, - 'other_column': 'f1', - 'other_table': 'complex_foreign_keys' - }] == response.json['foreign_key_tables'] - - -def test_add_filter_redirects(app_client): - filter_args = urllib.parse.urlencode({ - '_filter_column': 'content', - '_filter_op': 'startswith', - '_filter_value': 'x' - }) - # First we need to resolve the correct path before testing more redirects - path_base = app_client.get( - '/test_tables/simple_primary_key', allow_redirects=False, gather_request=False - ).headers['Location'] - path = path_base + '?' + filter_args - response = app_client.get(path, allow_redirects=False, gather_request=False) - assert response.status == 302 - assert response.headers['Location'].endswith('?content__startswith=x') - - # Adding a redirect to an existing querystring: - path = path_base + '?foo=bar&' + filter_args - response = app_client.get(path, allow_redirects=False, gather_request=False) - assert response.status == 302 - assert response.headers['Location'].endswith('?content__startswith=x&foo=bar') - - # Test that op with a __x suffix overrides the filter value - path = path_base + '?' + urllib.parse.urlencode({ - '_filter_column': 'content', - '_filter_op': 'isnull__5', - '_filter_value': 'x' - }) - response = app_client.get(path, allow_redirects=False, gather_request=False) - assert response.status == 302 - assert response.headers['Location'].endswith('?content__isnull=5') - - -def test_existing_filter_redirects(app_client): - filter_args = { - '_filter_column_1': 'name', - '_filter_op_1': 'contains', - '_filter_value_1': 'hello', - '_filter_column_2': 'age', - '_filter_op_2': 'gte', - '_filter_value_2': '22', - '_filter_column_3': 'age', - '_filter_op_3': 'lt', - '_filter_value_3': '30', - '_filter_column_4': 'name', - '_filter_op_4': 'contains', - '_filter_value_4': 'world', - } - path_base = app_client.get( - '/test_tables/simple_primary_key', allow_redirects=False, gather_request=False - ).headers['Location'] - path = path_base + '?' + urllib.parse.urlencode(filter_args) - response = app_client.get(path, allow_redirects=False, gather_request=False) - assert response.status == 302 - assert response.headers['Location'].endswith( - '?age__gte=22&age__lt=30&name__contains=hello&name__contains=world' - ) - - # Setting _filter_column_3 to empty string should remove *_3 entirely - filter_args['_filter_column_3'] = '' - path = path_base + '?' + urllib.parse.urlencode(filter_args) - response = app_client.get(path, allow_redirects=False, gather_request=False) - assert response.status == 302 - assert response.headers['Location'].endswith( - '?age__gte=22&name__contains=hello&name__contains=world' - ) - - # ?_filter_op=exact should be removed if unaccompanied by _fiter_column - response = app_client.get(path_base + '?_filter_op=exact', allow_redirects=False, gather_request=False) - assert response.status == 302 - assert '?' not in response.headers['Location'] - - -def test_empty_search_parameter_gets_removed(app_client): - path_base = app_client.get( - '/test_tables/simple_primary_key', allow_redirects=False, gather_request=False - ).headers['Location'] - path = path_base + '?' + urllib.parse.urlencode({ - '_search': '', - '_filter_column': 'name', - '_filter_op': 'exact', - '_filter_value': 'chidi', - }) - response = app_client.get(path, allow_redirects=False, gather_request=False) - assert response.status == 302 - assert response.headers['Location'].endswith( - '?name__exact=chidi' - ) - - -@pytest.mark.parametrize('path,expected_classes', [ - ('/', ['index']), - ('/test_tables', ['db', 'db-test_tables']), - ('/test_tables/simple_primary_key', [ - 'table', 'db-test_tables', 'table-simple_primary_key' - ]), - ('/test_tables/table%2Fwith%2Fslashes.csv', [ - 'table', 'db-test_tables', 'table-tablewithslashescsv-fa7563' - ]), - ('/test_tables/simple_primary_key/1', [ - 'row', 'db-test_tables', 'table-simple_primary_key' - ]), -]) -def test_css_classes_on_body(app_client, path, expected_classes): - response = app_client.get(path, gather_request=False) - classes = re.search(r'', response.text).group(1).split() - assert classes == expected_classes - - -def test_table_html_simple_primary_key(app_client): - response = app_client.get('/test_tables/simple_primary_key', gather_request=False) - table = Soup(response.body, 'html.parser').find('table') - assert [ - 'Link', 'pk', 'content' - ] == [th.string for th in table.select('thead th')] - assert [ - [ - '1', - '1', - 'hello' - ], [ - '2', - '2', - 'world' - ], [ - '3', - '3', - '' - ] - ] == [[str(td) for td in tr.select('td')] for tr in table.select('tbody tr')] - - -def test_row_html_simple_primary_key(app_client): - response = app_client.get('/test_tables/simple_primary_key/1', gather_request=False) - table = Soup(response.body, 'html.parser').find('table') - assert [ - 'pk', 'content' - ] == [th.string for th in table.select('thead th')] - assert [ - [ - '1', - 'hello' - ] - ] == [[str(td) for td in tr.select('td')] for tr in table.select('tbody tr')] - - -def test_table_html_no_primary_key(app_client): - response = app_client.get('/test_tables/no_primary_key', gather_request=False) - table = Soup(response.body, 'html.parser').find('table') - assert [ - 'Link', 'rowid', 'content', 'a', 'b', 'c' - ] == [th.string for th in table.select('thead th')] - expected = [ - [ - '{}'.format(i, i), - '{}'.format(i), - '{}'.format(i), - 'a{}'.format(i), - 'b{}'.format(i), - 'c{}'.format(i), - ] for i in range(1, 51) - ] - assert expected == [[str(td) for td in tr.select('td')] for tr in table.select('tbody tr')] - - -def test_row_html_no_primary_key(app_client): - response = app_client.get('/test_tables/no_primary_key/1', gather_request=False) - table = Soup(response.body, 'html.parser').find('table') - assert [ - 'rowid', 'content', 'a', 'b', 'c' - ] == [th.string for th in table.select('thead th')] - expected = [ - [ - '1', - '1', - 'a1', - 'b1', - 'c1', - ] - ] - assert expected == [[str(td) for td in tr.select('td')] for tr in table.select('tbody tr')] - - -def test_table_html_compound_primary_key(app_client): - response = app_client.get('/test_tables/compound_primary_key', gather_request=False) - table = Soup(response.body, 'html.parser').find('table') - assert [ - 'Link', 'pk1', 'pk2', 'content' - ] == [th.string for th in table.select('thead th')] - expected = [ - [ - 'a,b', - 'a', - 'b', - 'c', - ] - ] - assert expected == [[str(td) for td in tr.select('td')] for tr in table.select('tbody tr')] - - -def test_row_html_compound_primary_key(app_client): - response = app_client.get('/test_tables/compound_primary_key/a,b', gather_request=False) - table = Soup(response.body, 'html.parser').find('table') - assert [ - 'pk1', 'pk2', 'content' - ] == [th.string for th in table.select('thead th')] - expected = [ - [ - 'a', - 'b', - 'c', - ] - ] - assert expected == [[str(td) for td in tr.select('td')] for tr in table.select('tbody tr')] - - -def test_view_html(app_client): - response = app_client.get('/test_tables/simple_view', gather_request=False) - table = Soup(response.body, 'html.parser').find('table') - assert [ - 'content', 'upper_content' - ] == [th.string for th in table.select('thead th')] - expected = [ - [ - 'hello', - 'HELLO' - ], [ - 'world', - 'WORLD' - ], [ - '', - '' - ] - ] - assert expected == [[str(td) for td in tr.select('td')] for tr in table.select('tbody tr')] - - -TABLES = ''' -CREATE TABLE simple_primary_key ( - pk varchar(30) primary key, - content text -); - -CREATE TABLE compound_primary_key ( - pk1 varchar(30), - pk2 varchar(30), - content text, - PRIMARY KEY (pk1, pk2) -); - -INSERT INTO compound_primary_key VALUES ('a', 'b', 'c'); - -CREATE TABLE no_primary_key ( - content text, - a text, - b text, - c text -); - -CREATE TABLE [123_starts_with_digits] ( - content text -); - -CREATE VIEW paginated_view AS - SELECT - content, - '- ' || content || ' -' AS content_extra - FROM no_primary_key; - -CREATE TABLE "Table With Space In Name" ( - pk varchar(30) primary key, - content text -); - -CREATE TABLE "table/with/slashes.csv" ( - pk varchar(30) primary key, - content text -); - -CREATE TABLE "complex_foreign_keys" ( - pk varchar(30) primary key, - f1 text, - f2 text, - f3 text, - FOREIGN KEY ("f1") REFERENCES [simple_primary_key](id), - FOREIGN KEY ("f2") REFERENCES [simple_primary_key](id), - FOREIGN KEY ("f3") REFERENCES [simple_primary_key](id) -); - -INSERT INTO simple_primary_key VALUES (1, 'hello'); -INSERT INTO simple_primary_key VALUES (2, 'world'); -INSERT INTO simple_primary_key VALUES (3, ''); - -INSERT INTO complex_foreign_keys VALUES (1, 1, 2, 1); - -INSERT INTO [table/with/slashes.csv] VALUES (3, 'hey'); - -CREATE VIEW simple_view AS - SELECT content, upper(content) AS upper_content FROM simple_primary_key; - -''' + '\n'.join([ - 'INSERT INTO no_primary_key VALUES ({i}, "a{i}", "b{i}", "c{i}");'.format(i=i + 1) - for i in range(201) -]) diff --git a/tests/test_html.py b/tests/test_html.py new file mode 100644 index 00000000..50d26703 --- /dev/null +++ b/tests/test_html.py @@ -0,0 +1,282 @@ +from bs4 import BeautifulSoup as Soup +from .fixtures import app_client +import pytest +import re +import urllib.parse + +pytest.fixture(scope='module')(app_client) + + +def test_homepage(app_client): + response = app_client.get('/', gather_request=False) + assert response.status == 200 + assert 'test_tables' in response.text + + +def test_database_page(app_client): + response = app_client.get('/test_tables', allow_redirects=False, gather_request=False) + assert response.status == 302 + response = app_client.get('/test_tables', gather_request=False) + assert 'test_tables' in response.text + + +def test_invalid_custom_sql(app_client): + response = app_client.get( + '/test_tables?sql=.schema', + gather_request=False + ) + assert response.status == 400 + assert 'Statement must be a SELECT' in response.text + + +def test_view(app_client): + response = app_client.get('/test_tables/simple_view', gather_request=False) + assert response.status == 200 + + +def test_row(app_client): + response = app_client.get( + '/test_tables/simple_primary_key/1', + allow_redirects=False, + gather_request=False + ) + assert response.status == 302 + assert response.headers['Location'].endswith('/1') + response = app_client.get('/test_tables/simple_primary_key/1', gather_request=False) + assert response.status == 200 + + +def test_add_filter_redirects(app_client): + filter_args = urllib.parse.urlencode({ + '_filter_column': 'content', + '_filter_op': 'startswith', + '_filter_value': 'x' + }) + # First we need to resolve the correct path before testing more redirects + path_base = app_client.get( + '/test_tables/simple_primary_key', allow_redirects=False, gather_request=False + ).headers['Location'] + path = path_base + '?' + filter_args + response = app_client.get(path, allow_redirects=False, gather_request=False) + assert response.status == 302 + assert response.headers['Location'].endswith('?content__startswith=x') + + # Adding a redirect to an existing querystring: + path = path_base + '?foo=bar&' + filter_args + response = app_client.get(path, allow_redirects=False, gather_request=False) + assert response.status == 302 + assert response.headers['Location'].endswith('?content__startswith=x&foo=bar') + + # Test that op with a __x suffix overrides the filter value + path = path_base + '?' + urllib.parse.urlencode({ + '_filter_column': 'content', + '_filter_op': 'isnull__5', + '_filter_value': 'x' + }) + response = app_client.get(path, allow_redirects=False, gather_request=False) + assert response.status == 302 + assert response.headers['Location'].endswith('?content__isnull=5') + + +def test_existing_filter_redirects(app_client): + filter_args = { + '_filter_column_1': 'name', + '_filter_op_1': 'contains', + '_filter_value_1': 'hello', + '_filter_column_2': 'age', + '_filter_op_2': 'gte', + '_filter_value_2': '22', + '_filter_column_3': 'age', + '_filter_op_3': 'lt', + '_filter_value_3': '30', + '_filter_column_4': 'name', + '_filter_op_4': 'contains', + '_filter_value_4': 'world', + } + path_base = app_client.get( + '/test_tables/simple_primary_key', allow_redirects=False, gather_request=False + ).headers['Location'] + path = path_base + '?' + urllib.parse.urlencode(filter_args) + response = app_client.get(path, allow_redirects=False, gather_request=False) + assert response.status == 302 + assert response.headers['Location'].endswith( + '?age__gte=22&age__lt=30&name__contains=hello&name__contains=world' + ) + + # Setting _filter_column_3 to empty string should remove *_3 entirely + filter_args['_filter_column_3'] = '' + path = path_base + '?' + urllib.parse.urlencode(filter_args) + response = app_client.get(path, allow_redirects=False, gather_request=False) + assert response.status == 302 + assert response.headers['Location'].endswith( + '?age__gte=22&name__contains=hello&name__contains=world' + ) + + # ?_filter_op=exact should be removed if unaccompanied by _fiter_column + response = app_client.get(path_base + '?_filter_op=exact', allow_redirects=False, gather_request=False) + assert response.status == 302 + assert '?' not in response.headers['Location'] + + +def test_empty_search_parameter_gets_removed(app_client): + path_base = app_client.get( + '/test_tables/simple_primary_key', allow_redirects=False, gather_request=False + ).headers['Location'] + path = path_base + '?' + urllib.parse.urlencode({ + '_search': '', + '_filter_column': 'name', + '_filter_op': 'exact', + '_filter_value': 'chidi', + }) + response = app_client.get(path, allow_redirects=False, gather_request=False) + assert response.status == 302 + assert response.headers['Location'].endswith( + '?name__exact=chidi' + ) + + +@pytest.mark.parametrize('path,expected_classes', [ + ('/', ['index']), + ('/test_tables', ['db', 'db-test_tables']), + ('/test_tables/simple_primary_key', [ + 'table', 'db-test_tables', 'table-simple_primary_key' + ]), + ('/test_tables/table%2Fwith%2Fslashes.csv', [ + 'table', 'db-test_tables', 'table-tablewithslashescsv-fa7563' + ]), + ('/test_tables/simple_primary_key/1', [ + 'row', 'db-test_tables', 'table-simple_primary_key' + ]), +]) +def test_css_classes_on_body(app_client, path, expected_classes): + response = app_client.get(path, gather_request=False) + classes = re.search(r'', response.text).group(1).split() + assert classes == expected_classes + + +def test_table_html_simple_primary_key(app_client): + response = app_client.get('/test_tables/simple_primary_key', gather_request=False) + table = Soup(response.body, 'html.parser').find('table') + assert [ + 'Link', 'pk', 'content' + ] == [th.string for th in table.select('thead th')] + assert [ + [ + '1', + '1', + 'hello' + ], [ + '2', + '2', + 'world' + ], [ + '3', + '3', + '' + ] + ] == [[str(td) for td in tr.select('td')] for tr in table.select('tbody tr')] + + +def test_row_html_simple_primary_key(app_client): + response = app_client.get('/test_tables/simple_primary_key/1', gather_request=False) + table = Soup(response.body, 'html.parser').find('table') + assert [ + 'pk', 'content' + ] == [th.string for th in table.select('thead th')] + assert [ + [ + '1', + 'hello' + ] + ] == [[str(td) for td in tr.select('td')] for tr in table.select('tbody tr')] + + +def test_table_html_no_primary_key(app_client): + response = app_client.get('/test_tables/no_primary_key', gather_request=False) + table = Soup(response.body, 'html.parser').find('table') + assert [ + 'Link', 'rowid', 'content', 'a', 'b', 'c' + ] == [th.string for th in table.select('thead th')] + expected = [ + [ + '{}'.format(i, i), + '{}'.format(i), + '{}'.format(i), + 'a{}'.format(i), + 'b{}'.format(i), + 'c{}'.format(i), + ] for i in range(1, 51) + ] + assert expected == [[str(td) for td in tr.select('td')] for tr in table.select('tbody tr')] + + +def test_row_html_no_primary_key(app_client): + response = app_client.get('/test_tables/no_primary_key/1', gather_request=False) + table = Soup(response.body, 'html.parser').find('table') + assert [ + 'rowid', 'content', 'a', 'b', 'c' + ] == [th.string for th in table.select('thead th')] + expected = [ + [ + '1', + '1', + 'a1', + 'b1', + 'c1', + ] + ] + assert expected == [[str(td) for td in tr.select('td')] for tr in table.select('tbody tr')] + + +def test_table_html_compound_primary_key(app_client): + response = app_client.get('/test_tables/compound_primary_key', gather_request=False) + table = Soup(response.body, 'html.parser').find('table') + assert [ + 'Link', 'pk1', 'pk2', 'content' + ] == [th.string for th in table.select('thead th')] + expected = [ + [ + 'a,b', + 'a', + 'b', + 'c', + ] + ] + assert expected == [[str(td) for td in tr.select('td')] for tr in table.select('tbody tr')] + + +def test_row_html_compound_primary_key(app_client): + response = app_client.get('/test_tables/compound_primary_key/a,b', gather_request=False) + table = Soup(response.body, 'html.parser').find('table') + assert [ + 'pk1', 'pk2', 'content' + ] == [th.string for th in table.select('thead th')] + expected = [ + [ + 'a', + 'b', + 'c', + ] + ] + assert expected == [[str(td) for td in tr.select('td')] for tr in table.select('tbody tr')] + + +def test_view_html(app_client): + response = app_client.get('/test_tables/simple_view', gather_request=False) + table = Soup(response.body, 'html.parser').find('table') + assert [ + 'content', 'upper_content' + ] == [th.string for th in table.select('thead th')] + expected = [ + [ + 'hello', + 'HELLO' + ], [ + 'world', + 'WORLD' + ], [ + '', + '' + ] + ] + assert expected == [[str(td) for td in tr.select('td')] for tr in table.select('tbody tr')]