kopia lustrzana https://github.com/simonw/datasette
Trying to get pytest-dist working on multiple cores
rodzic
7d7f5f61fd
commit
be018ce47f
|
@ -1,9 +1,11 @@
|
|||
from datasette.app import Datasette
|
||||
from sanic.testing import PORT as PORT_BASE, SanicTestClient
|
||||
import itertools
|
||||
import json
|
||||
import os
|
||||
import pytest
|
||||
import random
|
||||
import re
|
||||
import sqlite3
|
||||
import sys
|
||||
import string
|
||||
|
@ -11,7 +13,7 @@ import tempfile
|
|||
import time
|
||||
|
||||
|
||||
class TestClient:
|
||||
class MyTestClient:
|
||||
def __init__(self, sanic_test_client):
|
||||
self.sanic_test_client = sanic_test_client
|
||||
|
||||
|
@ -24,7 +26,7 @@ class TestClient:
|
|||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def app_client(sql_time_limit_ms=None, max_returned_rows=None, config=None, filename="fixtures.db"):
|
||||
def app_client(worker_id, sql_time_limit_ms=None, max_returned_rows=None, config=None, filename="fixtures.db"):
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
filepath = os.path.join(tmpdir, filename)
|
||||
conn = sqlite3.connect(filepath)
|
||||
|
@ -49,38 +51,51 @@ def app_client(sql_time_limit_ms=None, max_returned_rows=None, config=None, file
|
|||
ds.sqlite_functions.append(
|
||||
('sleep', 1, lambda n: time.sleep(float(n))),
|
||||
)
|
||||
client = TestClient(ds.app().test_client)
|
||||
m = re.search(r'[0-9]+', worker_id)
|
||||
if m:
|
||||
num_id = m.group(0)
|
||||
else:
|
||||
num_id = 0
|
||||
port = PORT_BASE + int(num_id)
|
||||
client = MyTestClient(SanicTestClient(ds.app(), port))
|
||||
client.ds = ds
|
||||
yield client
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def app_client_shorter_time_limit():
|
||||
yield from app_client(20)
|
||||
def app_client_factory(worker_id):
|
||||
def factory(**kwargs):
|
||||
yield from app_client(worker_id, **kwargs)
|
||||
return factory
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def app_client_returned_rows_matches_page_size():
|
||||
yield from app_client(max_returned_rows=50)
|
||||
def app_client_shorter_time_limit(worker_id):
|
||||
yield from app_client(worker_id, 20)
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def app_client_larger_cache_size():
|
||||
yield from app_client(config={
|
||||
def app_client_returned_rows_matches_page_size(worker_id):
|
||||
yield from app_client(worker_id, max_returned_rows=50)
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def app_client_larger_cache_size(worker_id):
|
||||
yield from app_client(worker_id, config={
|
||||
'cache_size_kb': 2500,
|
||||
})
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def app_client_csv_max_mb_one():
|
||||
yield from app_client(config={
|
||||
def app_client_csv_max_mb_one(worker_id):
|
||||
yield from app_client(worker_id, config={
|
||||
'max_csv_mb': 1,
|
||||
})
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def app_client_with_dot():
|
||||
yield from app_client(filename="fixtures.dot.db")
|
||||
def app_client_with_dot(worker_id):
|
||||
yield from app_client(worker_id, filename="fixtures.dot.db")
|
||||
|
||||
|
||||
def generate_compound_rows(num):
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
from .fixtures import ( # noqa
|
||||
app_client,
|
||||
app_client_factory,
|
||||
app_client_shorter_time_limit,
|
||||
app_client_larger_cache_size,
|
||||
app_client_returned_rows_matches_page_size,
|
||||
|
@ -424,8 +425,8 @@ def test_invalid_custom_sql(app_client):
|
|||
assert 'Statement must be a SELECT' == response.json['error']
|
||||
|
||||
|
||||
def test_allow_sql_off():
|
||||
for client in app_client(config={
|
||||
def test_allow_sql_off(worker_id):
|
||||
for client in app_client(worker_id, config={
|
||||
'allow_sql': False,
|
||||
}):
|
||||
assert 400 == client.get(
|
||||
|
@ -971,11 +972,12 @@ def test_config_json(app_client):
|
|||
} == response.json
|
||||
|
||||
|
||||
def test_page_size_matching_max_returned_rows(app_client_returned_rows_matches_page_size):
|
||||
def test_page_size_matching_max_returned_rows(app_client_factory):
|
||||
app_client = app_client_factory(max_returned_rows=50)
|
||||
fetched = []
|
||||
path = '/fixtures/no_primary_key.json'
|
||||
while path:
|
||||
response = app_client_returned_rows_matches_page_size.get(path)
|
||||
response = app_client.get(path)
|
||||
fetched.extend(response.json['rows'])
|
||||
assert len(response.json['rows']) in (1, 50)
|
||||
path = response.json['next_url']
|
||||
|
@ -1140,8 +1142,8 @@ def test_suggested_facets(app_client):
|
|||
).json["suggested_facets"]) > 0
|
||||
|
||||
|
||||
def test_allow_facet_off():
|
||||
for client in app_client(config={
|
||||
def test_allow_facet_off(worker_id):
|
||||
for client in app_client(worker_id, config={
|
||||
'allow_facet': False,
|
||||
}):
|
||||
assert 400 == client.get(
|
||||
|
@ -1153,8 +1155,8 @@ def test_allow_facet_off():
|
|||
).json["suggested_facets"]
|
||||
|
||||
|
||||
def test_suggest_facets_off():
|
||||
for client in app_client(config={
|
||||
def test_suggest_facets_off(worker_id):
|
||||
for client in app_client(worker_id, config={
|
||||
'suggest_facets': False,
|
||||
}):
|
||||
# Now suggested_facets should be []
|
||||
|
|
|
@ -30,6 +30,7 @@ pk,planet_int,on_earth,state,city_id,city_id_label,neighborhood
|
|||
15,2,0,MC,4,Memnonia,Arcadia Planitia
|
||||
'''.lstrip().replace('\n', '\r\n')
|
||||
|
||||
|
||||
def test_table_csv(app_client):
|
||||
response = app_client.get('/fixtures/simple_primary_key.csv')
|
||||
assert response.status == 200
|
||||
|
|
|
@ -639,8 +639,8 @@ def test_allow_download_on(app_client):
|
|||
assert len(soup.findAll('a', {'href': re.compile('\.db$')}))
|
||||
|
||||
|
||||
def test_allow_download_off():
|
||||
for client in app_client(config={
|
||||
def test_allow_download_off(worker_id):
|
||||
for client in app_client(worker_id, config={
|
||||
'allow_download': False,
|
||||
}):
|
||||
response = client.get(
|
||||
|
@ -665,8 +665,8 @@ def test_allow_sql_on(app_client):
|
|||
assert len(soup.findAll('textarea', {'name': 'sql'}))
|
||||
|
||||
|
||||
def test_allow_sql_off():
|
||||
for client in app_client(config={
|
||||
def test_allow_sql_off(worker_id):
|
||||
for client in app_client(worker_id, config={
|
||||
'allow_sql': False,
|
||||
}):
|
||||
response = client.get(
|
||||
|
|
Ładowanie…
Reference in New Issue