2017-11-10 18:48:16 +00:00
"""
Tests for various datasette helper functions .
"""
2017-11-10 19:25:54 +00:00
from datasette import utils
2019-06-24 03:13:09 +00:00
from datasette . utils . asgi import Request
2019-04-15 21:51:20 +00:00
from datasette . filters import Filters
2017-12-08 16:06:24 +00:00
import json
import os
2020-02-15 17:56:48 +00:00
import pathlib
2017-10-24 05:54:58 +00:00
import pytest
2019-04-07 01:58:51 +00:00
import sqlite3
2017-12-08 16:06:24 +00:00
import tempfile
from unittest . mock import patch
2017-10-24 05:54:58 +00:00
2019-05-04 02:15:14 +00:00
@pytest.mark.parametrize (
" path,expected " ,
[
( " foo " , [ " foo " ] ) ,
( " foo,bar " , [ " foo " , " bar " ] ) ,
( " 123,433,112 " , [ " 123 " , " 433 " , " 112 " ] ) ,
( " 123 % 2C433,112 " , [ " 123,433 " , " 112 " ] ) ,
( " 123 %2F 433 %2F 112 " , [ " 123/433/112 " ] ) ,
] ,
)
2018-04-09 00:06:10 +00:00
def test_urlsafe_components ( path , expected ) :
assert expected == utils . urlsafe_components ( path )
2017-10-24 05:54:58 +00:00
2019-05-04 02:15:14 +00:00
@pytest.mark.parametrize (
" path,added_args,expected " ,
[
( " /foo " , { " bar " : 1 } , " /foo?bar=1 " ) ,
( " /foo?bar=1 " , { " baz " : 2 } , " /foo?bar=1&baz=2 " ) ,
( " /foo?bar=1&bar=2 " , { " baz " : 3 } , " /foo?bar=1&bar=2&baz=3 " ) ,
( " /foo?bar=1 " , { " bar " : None } , " /foo " ) ,
# Test order is preserved
(
" /?_facet=prim_state&_facet=area_name " ,
( ( " prim_state " , " GA " ) , ) ,
" /?_facet=prim_state&_facet=area_name&prim_state=GA " ,
) ,
(
" /?_facet=state&_facet=city&state=MI " ,
( ( " city " , " Detroit " ) , ) ,
" /?_facet=state&_facet=city&state=MI&city=Detroit " ,
) ,
(
" /?_facet=state&_facet=city " ,
( ( " _facet " , " planet_int " ) , ) ,
" /?_facet=state&_facet=city&_facet=planet_int " ,
) ,
] ,
)
2018-05-12 21:35:25 +00:00
def test_path_with_added_args ( path , added_args , expected ) :
2019-06-24 03:13:09 +00:00
request = Request . fake ( path )
2018-05-12 21:35:25 +00:00
actual = utils . path_with_added_args ( request , added_args )
assert expected == actual
2019-05-04 02:15:14 +00:00
@pytest.mark.parametrize (
" path,args,expected " ,
[
( " /foo?bar=1 " , { " bar " } , " /foo " ) ,
( " /foo?bar=1&baz=2 " , { " bar " } , " /foo?baz=2 " ) ,
( " /foo?bar=1&bar=2&bar=3 " , { " bar " : " 2 " } , " /foo?bar=1&bar=3 " ) ,
] ,
)
2018-05-14 20:42:10 +00:00
def test_path_with_removed_args ( path , args , expected ) :
2019-06-24 03:13:09 +00:00
request = Request . fake ( path )
2018-05-14 20:42:10 +00:00
actual = utils . path_with_removed_args ( request , args )
assert expected == actual
2019-03-17 22:55:04 +00:00
# Run the test again but this time use the path= argument
2019-06-24 03:13:09 +00:00
request = Request . fake ( " / " )
2019-03-17 22:55:04 +00:00
actual = utils . path_with_removed_args ( request , args , path = path )
assert expected == actual
2018-05-14 20:42:10 +00:00
2019-05-04 02:15:14 +00:00
@pytest.mark.parametrize (
" path,args,expected " ,
[
( " /foo?bar=1 " , { " bar " : 2 } , " /foo?bar=2 " ) ,
( " /foo?bar=1&baz=2 " , { " bar " : None } , " /foo?baz=2 " ) ,
] ,
)
2018-05-15 09:34:45 +00:00
def test_path_with_replaced_args ( path , args , expected ) :
2019-06-24 03:13:09 +00:00
request = Request . fake ( path )
2018-05-15 09:34:45 +00:00
actual = utils . path_with_replaced_args ( request , args )
assert expected == actual
2018-06-21 14:56:28 +00:00
@pytest.mark.parametrize (
" row,pks,expected_path " ,
[
( { " A " : " foo " , " B " : " bar " } , [ " A " , " B " ] , " foo,bar " ) ,
( { " A " : " f,o " , " B " : " bar " } , [ " A " , " B " ] , " f % 2Co,bar " ) ,
( { " A " : 123 } , [ " A " ] , " 123 " ) ,
(
utils . CustomRow (
[ " searchable_id " , " tag " ] ,
[
2019-05-04 02:15:14 +00:00
( " searchable_id " , { " value " : 1 , " label " : " 1 " } ) ,
( " tag " , { " value " : " feline " , " label " : " feline " } ) ,
2018-06-21 14:56:28 +00:00
] ,
) ,
[ " searchable_id " , " tag " ] ,
" 1,feline " ,
) ,
] ,
)
2017-10-24 05:54:58 +00:00
def test_path_from_row_pks ( row , pks , expected_path ) :
2017-11-10 19:25:54 +00:00
actual_path = utils . path_from_row_pks ( row , pks , False )
2017-10-24 05:54:58 +00:00
assert expected_path == actual_path
2017-10-24 14:58:41 +00:00
2019-05-04 02:15:14 +00:00
@pytest.mark.parametrize (
" obj,expected " ,
[
(
{
" Description " : " Soft drinks " ,
" Picture " : b " \x15 \x1c \x02 \xc7 \xad \x05 \xfe " ,
" CategoryID " : 1 ,
} ,
"""
2017-10-24 14:58:41 +00:00
{ " CategoryID " : 1 , " Description " : " Soft drinks " , " Picture " : { " $base64 " : true , " encoded " : " FRwCx60F/g== " } }
2019-05-04 02:15:14 +00:00
""" .strip(),
)
] ,
)
2017-10-24 14:58:41 +00:00
def test_custom_json_encoder ( obj , expected ) :
2019-05-04 02:15:14 +00:00
actual = json . dumps ( obj , cls = utils . CustomJSONEncoder , sort_keys = True )
2017-10-24 14:58:41 +00:00
assert expected == actual
2017-10-25 00:06:23 +00:00
2019-05-04 02:15:14 +00:00
@pytest.mark.parametrize (
" bad_sql " ,
[
" update blah; " ,
2020-02-05 02:13:24 +00:00
" -- sql comment to skip \n update blah; " ,
" update blah set some_column= ' # Hello there \n \n * This is a list \n * of items \n -- \n [And a link](https://github.com/simonw/datasette-render-markdown). ' \n as demo_markdown " ,
2020-05-06 17:18:31 +00:00
" PRAGMA case_sensitive_like = true " ,
" SELECT * FROM pragma_not_on_allow_list( ' idx52 ' ) " ,
2019-05-04 02:15:14 +00:00
] ,
)
2017-11-05 02:49:18 +00:00
def test_validate_sql_select_bad ( bad_sql ) :
2017-11-10 19:25:54 +00:00
with pytest . raises ( utils . InvalidSql ) :
utils . validate_sql_select ( bad_sql )
2017-11-05 02:49:18 +00:00
2019-05-04 02:15:14 +00:00
@pytest.mark.parametrize (
" good_sql " ,
[
" select count(*) from airports " ,
" select foo from bar " ,
2020-02-05 02:13:24 +00:00
" --sql comment to skip \n select foo from bar " ,
" select ' # Hello there \n \n * This is a list \n * of items \n -- \n [And a link](https://github.com/simonw/datasette-render-markdown). ' \n as demo_markdown " ,
2019-05-04 02:15:14 +00:00
" select 1 + 1 " ,
2019-10-06 17:23:58 +00:00
" explain select 1 + 1 " ,
" explain query plan select 1 + 1 " ,
2019-05-04 02:15:14 +00:00
" SELECT \n blah FROM foo " ,
" WITH RECURSIVE cnt(x) AS (SELECT 1 UNION ALL SELECT x+1 FROM cnt LIMIT 10) SELECT x FROM cnt; " ,
2019-10-06 17:23:58 +00:00
" explain WITH RECURSIVE cnt(x) AS (SELECT 1 UNION ALL SELECT x+1 FROM cnt LIMIT 10) SELECT x FROM cnt; " ,
" explain query plan WITH RECURSIVE cnt(x) AS (SELECT 1 UNION ALL SELECT x+1 FROM cnt LIMIT 10) SELECT x FROM cnt; " ,
2020-05-06 17:18:31 +00:00
" SELECT * FROM pragma_index_info( ' idx52 ' ) " ,
" select * from pragma_table_xinfo( ' table ' ) " ,
2019-05-04 02:15:14 +00:00
] ,
)
2017-11-05 02:49:18 +00:00
def test_validate_sql_select_good ( good_sql ) :
2017-11-10 19:25:54 +00:00
utils . validate_sql_select ( good_sql )
2017-11-19 16:59:26 +00:00
2019-09-03 00:32:27 +00:00
@pytest.mark.parametrize ( " open_quote,close_quote " , [ ( ' " ' , ' " ' ) , ( " [ " , " ] " ) ] )
def test_detect_fts ( open_quote , close_quote ) :
2019-05-04 02:15:14 +00:00
sql = """
2017-11-19 16:59:26 +00:00
CREATE TABLE " Dumb_Table " (
" TreeID " INTEGER ,
" qSpecies " TEXT
) ;
CREATE TABLE " Street_Tree_List " (
" TreeID " INTEGER ,
" qSpecies " TEXT ,
" qAddress " TEXT ,
" SiteOrder " INTEGER ,
" qSiteInfo " TEXT ,
" PlantType " TEXT ,
" qCaretaker " TEXT
) ;
2017-11-24 22:51:00 +00:00
CREATE VIEW Test_View AS SELECT * FROM Dumb_Table ;
2019-09-03 00:32:27 +00:00
CREATE VIRTUAL TABLE { open } Street_Tree_List_fts { close } USING FTS4 ( " qAddress " , " qCaretaker " , " qSpecies " , content = { open } Street_Tree_List { close } ) ;
2017-12-07 04:54:25 +00:00
CREATE VIRTUAL TABLE r USING rtree ( a , b , c ) ;
2019-09-03 00:32:27 +00:00
""" .format(
open = open_quote , close = close_quote
)
2019-05-04 02:15:14 +00:00
conn = utils . sqlite3 . connect ( " :memory: " )
2017-11-19 16:59:26 +00:00
conn . executescript ( sql )
2019-05-04 02:15:14 +00:00
assert None is utils . detect_fts ( conn , " Dumb_Table " )
assert None is utils . detect_fts ( conn , " Test_View " )
assert None is utils . detect_fts ( conn , " r " )
assert " Street_Tree_List_fts " == utils . detect_fts ( conn , " Street_Tree_List " )
@pytest.mark.parametrize (
" url,expected " ,
[
( " http://www.google.com/ " , True ) ,
( " https://example.com/ " , True ) ,
( " www.google.com " , False ) ,
( " http://www.google.com/ is a search engine " , False ) ,
] ,
)
2017-11-29 17:05:24 +00:00
def test_is_url ( url , expected ) :
assert expected == utils . is_url ( url )
2017-11-30 07:09:54 +00:00
2019-05-04 02:15:14 +00:00
@pytest.mark.parametrize (
" s,expected " ,
[
( " simple " , " simple " ) ,
( " MixedCase " , " MixedCase " ) ,
( " -no-leading-hyphens " , " no-leading-hyphens-65bea6 " ) ,
( " _no-leading-underscores " , " no-leading-underscores-b921bc " ) ,
( " no spaces " , " no-spaces-7088d7 " ) ,
( " - " , " 336d5e " ) ,
( " no $ characters " , " no--characters-59e024 " ) ,
] ,
)
2017-11-30 07:09:54 +00:00
def test_to_css_class ( s , expected ) :
assert expected == utils . to_css_class ( s )
2017-12-08 16:06:24 +00:00
def test_temporary_docker_directory_uses_hard_link ( ) :
with tempfile . TemporaryDirectory ( ) as td :
os . chdir ( td )
2019-05-04 02:15:14 +00:00
open ( " hello " , " w " ) . write ( " world " )
2017-12-08 16:06:24 +00:00
# Default usage of this should use symlink
with utils . temporary_docker_directory (
2019-05-04 02:15:14 +00:00
files = [ " hello " ] ,
name = " t " ,
2017-12-08 16:06:24 +00:00
metadata = None ,
2017-12-09 18:38:04 +00:00
extra_options = None ,
branch = None ,
template_dir = None ,
2018-04-16 05:22:01 +00:00
plugins_dir = None ,
2017-12-09 18:38:04 +00:00
static = [ ] ,
2018-04-18 14:48:34 +00:00
install = [ ] ,
2018-05-31 14:47:22 +00:00
spatialite = False ,
2018-06-17 20:14:55 +00:00
version_note = None ,
2017-12-08 16:06:24 +00:00
) as temp_docker :
2019-05-04 02:15:14 +00:00
hello = os . path . join ( temp_docker , " hello " )
assert " world " == open ( hello ) . read ( )
2017-12-08 16:06:24 +00:00
# It should be a hard link
assert 2 == os . stat ( hello ) . st_nlink
2019-05-04 02:15:14 +00:00
@patch ( " os.link " )
2017-12-08 16:06:24 +00:00
def test_temporary_docker_directory_uses_copy_if_hard_link_fails ( mock_link ) :
# Copy instead if os.link raises OSError (normally due to different device)
mock_link . side_effect = OSError
with tempfile . TemporaryDirectory ( ) as td :
os . chdir ( td )
2019-05-04 02:15:14 +00:00
open ( " hello " , " w " ) . write ( " world " )
2017-12-08 16:06:24 +00:00
# Default usage of this should use symlink
with utils . temporary_docker_directory (
2019-05-04 02:15:14 +00:00
files = [ " hello " ] ,
name = " t " ,
2017-12-08 16:06:24 +00:00
metadata = None ,
2017-12-09 18:38:04 +00:00
extra_options = None ,
branch = None ,
template_dir = None ,
2018-04-16 05:22:01 +00:00
plugins_dir = None ,
2017-12-09 18:38:04 +00:00
static = [ ] ,
2018-04-18 14:48:34 +00:00
install = [ ] ,
2018-05-31 14:47:22 +00:00
spatialite = False ,
2018-06-17 20:14:55 +00:00
version_note = None ,
2017-12-08 16:06:24 +00:00
) as temp_docker :
2019-05-04 02:15:14 +00:00
hello = os . path . join ( temp_docker , " hello " )
assert " world " == open ( hello ) . read ( )
2017-12-08 16:06:24 +00:00
# It should be a copy, not a hard link
assert 1 == os . stat ( hello ) . st_nlink
2018-03-30 05:10:09 +00:00
2019-05-03 13:59:01 +00:00
def test_temporary_docker_directory_quotes_args ( ) :
2019-05-04 02:15:14 +00:00
with tempfile . TemporaryDirectory ( ) as td :
2019-05-03 13:59:01 +00:00
os . chdir ( td )
2019-05-04 02:15:14 +00:00
open ( " hello " , " w " ) . write ( " world " )
2019-05-03 13:59:01 +00:00
with utils . temporary_docker_directory (
2019-05-04 02:15:14 +00:00
files = [ " hello " ] ,
name = " t " ,
2019-05-03 13:59:01 +00:00
metadata = None ,
2019-05-04 02:15:14 +00:00
extra_options = " --$HOME " ,
2019-05-03 13:59:01 +00:00
branch = None ,
template_dir = None ,
plugins_dir = None ,
static = [ ] ,
install = [ ] ,
spatialite = False ,
2019-05-04 02:15:14 +00:00
version_note = " $PWD " ,
2019-05-03 13:59:01 +00:00
) as temp_docker :
2019-05-04 02:15:14 +00:00
df = os . path . join ( temp_docker , " Dockerfile " )
2019-05-03 13:59:01 +00:00
df_contents = open ( df ) . read ( )
assert " ' $PWD ' " in df_contents
assert " ' --$HOME ' " in df_contents
2018-03-30 05:10:09 +00:00
def test_compound_keys_after_sql ( ) :
2019-05-04 02:15:14 +00:00
assert " ((a > :p0)) " == utils . compound_keys_after_sql ( [ " a " ] )
assert """
2018-04-03 13:39:50 +00:00
( ( a > : p0 )
2018-03-30 05:10:09 +00:00
or
2018-04-03 13:39:50 +00:00
( a = : p0 and b > : p1 ) )
2019-05-04 02:15:14 +00:00
""" .strip() == utils.compound_keys_after_sql(
[ " a " , " b " ]
)
assert """
2018-04-03 13:39:50 +00:00
( ( a > : p0 )
2018-03-30 05:10:09 +00:00
or
2018-04-03 13:39:50 +00:00
( a = : p0 and b > : p1 )
2018-03-30 05:10:09 +00:00
or
2018-04-03 13:39:50 +00:00
( a = : p0 and b = : p1 and c > : p2 ) )
2019-05-04 02:15:14 +00:00
""" .strip() == utils.compound_keys_after_sql(
[ " a " , " b " , " c " ]
)
2018-06-15 06:51:23 +00:00
2019-03-31 18:02:22 +00:00
async def table_exists ( table ) :
2018-06-15 06:51:23 +00:00
return table == " exists.csv "
2019-03-31 18:02:22 +00:00
@pytest.mark.asyncio
2018-06-15 06:51:23 +00:00
@pytest.mark.parametrize (
" table_and_format,expected_table,expected_format " ,
[
( " blah " , " blah " , None ) ,
( " blah.csv " , " blah " , " csv " ) ,
( " blah.json " , " blah " , " json " ) ,
( " blah.baz " , " blah.baz " , None ) ,
( " exists.csv " , " exists.csv " , None ) ,
] ,
)
2019-03-31 18:02:22 +00:00
async def test_resolve_table_and_format (
2018-06-15 06:51:23 +00:00
table_and_format , expected_table , expected_format
) :
2019-03-31 18:02:22 +00:00
actual_table , actual_format = await utils . resolve_table_and_format (
2019-05-04 02:15:14 +00:00
table_and_format , table_exists , [ " json " ]
2018-06-15 06:51:23 +00:00
)
assert expected_table == actual_table
assert expected_format == actual_format
2019-04-07 01:58:51 +00:00
def test_table_columns ( ) :
conn = sqlite3 . connect ( " :memory: " )
2019-05-04 02:15:14 +00:00
conn . executescript (
"""
2019-04-07 01:58:51 +00:00
create table places ( id integer primary key , name text , bob integer )
2019-05-04 02:15:14 +00:00
"""
)
2019-04-07 01:58:51 +00:00
assert [ " id " , " name " , " bob " ] == utils . table_columns ( conn , " places " )
2018-06-15 06:51:23 +00:00
@pytest.mark.parametrize (
" path,format,extra_qs,expected " ,
[
( " /foo?sql=select+1 " , " csv " , { } , " /foo.csv?sql=select+1 " ) ,
( " /foo?sql=select+1 " , " json " , { } , " /foo.json?sql=select+1 " ) ,
( " /foo/bar " , " json " , { } , " /foo/bar.json " ) ,
( " /foo/bar " , " csv " , { } , " /foo/bar.csv " ) ,
( " /foo/bar.csv " , " json " , { } , " /foo/bar.csv?_format=json " ) ,
( " /foo/bar " , " csv " , { " _dl " : 1 } , " /foo/bar.csv?_dl=1 " ) ,
( " /foo/b.csv " , " json " , { " _dl " : 1 } , " /foo/b.csv?_dl=1&_format=json " ) ,
(
" /sf-trees/Street_Tree_List?_search=cherry&_size=1000 " ,
" csv " ,
{ " _dl " : 1 } ,
" /sf-trees/Street_Tree_List.csv?_search=cherry&_size=1000&_dl=1 " ,
) ,
] ,
)
def test_path_with_format ( path , format , extra_qs , expected ) :
2019-06-24 03:13:09 +00:00
request = Request . fake ( path )
2018-06-15 06:51:23 +00:00
actual = utils . path_with_format ( request , format , extra_qs )
assert expected == actual
2019-02-06 04:53:44 +00:00
@pytest.mark.parametrize (
" bytes,expected " ,
[
2019-05-04 02:15:14 +00:00
( 120 , " 120 bytes " ) ,
( 1024 , " 1.0 KB " ) ,
( 1024 * 1024 , " 1.0 MB " ) ,
( 1024 * 1024 * 1024 , " 1.0 GB " ) ,
( 1024 * 1024 * 1024 * 1.3 , " 1.3 GB " ) ,
( 1024 * 1024 * 1024 * 1024 , " 1.0 TB " ) ,
] ,
2019-02-06 04:53:44 +00:00
)
def test_format_bytes ( bytes , expected ) :
assert expected == utils . format_bytes ( bytes )
2019-12-29 18:48:13 +00:00
@pytest.mark.parametrize (
" query,expected " ,
[
( " dog " , ' " dog " ' ) ,
( " cat, " , ' " cat, " ' ) ,
( " cat dog " , ' " cat " " dog " ' ) ,
# If a phrase is already double quoted, leave it so
( ' " cat dog " ' , ' " cat dog " ' ) ,
( ' " cat dog " fish ' , ' " cat dog " " fish " ' ) ,
# Sensibly handle unbalanced double quotes
( ' cat " ' , ' " cat " ' ) ,
( ' " cat dog " " fish ' , ' " cat dog " " fish " ' ) ,
] ,
)
def test_escape_fts ( query , expected ) :
assert expected == utils . escape_fts ( query )
2020-02-15 17:56:48 +00:00
def test_check_connection_spatialite_raises ( ) :
path = str ( pathlib . Path ( __file__ ) . parent / " spatialite.db " )
conn = sqlite3 . connect ( path )
with pytest . raises ( utils . SpatialiteConnectionProblem ) :
utils . check_connection ( conn )
def test_check_connection_passes ( ) :
conn = sqlite3 . connect ( " :memory: " )
utils . check_connection ( conn )
2020-03-17 02:47:37 +00:00
2020-05-27 19:25:52 +00:00
def test_call_with_supported_arguments ( ) :
def foo ( a , b ) :
return " {} + {} " . format ( a , b )
assert " 1+2 " == utils . call_with_supported_arguments ( foo , a = 1 , b = 2 )
assert " 1+2 " == utils . call_with_supported_arguments ( foo , a = 1 , b = 2 , c = 3 )
with pytest . raises ( TypeError ) :
utils . call_with_supported_arguments ( foo , a = 1 )
2020-06-05 17:52:50 +00:00
2020-06-05 19:05:57 +00:00
@pytest.mark.parametrize (
" data,should_raise " ,
[
( [ [ " foo " , " bar " ] , [ " foo " , " baz " ] ] , False ) ,
( [ ( " foo " , " bar " ) , ( " foo " , " baz " ) ] , False ) ,
( ( [ " foo " , " bar " ] , [ " foo " , " baz " ] ) , False ) ,
( [ [ " foo " , " bar " ] , [ " foo " , " baz " , " bax " ] ] , True ) ,
( { " foo " : [ " bar " , " baz " ] } , False ) ,
( { " foo " : ( " bar " , " baz " ) } , False ) ,
( { " foo " : " bar " } , True ) ,
]
)
2020-06-05 18:01:06 +00:00
def test_multi_params ( data , should_raise ) :
if should_raise :
with pytest . raises ( AssertionError ) :
utils . MultiParams ( data )
return
p1 = utils . MultiParams ( data )
2020-06-05 17:52:50 +00:00
assert " bar " == p1 [ " foo " ]
2020-06-05 18:01:06 +00:00
assert [ " bar " , " baz " ] == list ( p1 . getlist ( " foo " ) )