kopia lustrzana https://github.com/simonw/datasette
test_create_table_error_if_pk_changed, refs #1927
rodzic
6b27537988
commit
9342b60f14
|
@ -620,6 +620,7 @@ class TableCreateView(BaseView):
|
||||||
if not self._table_name_re.match(table_name):
|
if not self._table_name_re.match(table_name):
|
||||||
return _error(["Invalid table name"])
|
return _error(["Invalid table name"])
|
||||||
|
|
||||||
|
table_exists = await db.table_exists(data["table"])
|
||||||
columns = data.get("columns")
|
columns = data.get("columns")
|
||||||
rows = data.get("rows")
|
rows = data.get("rows")
|
||||||
row = data.get("row")
|
row = data.get("row")
|
||||||
|
@ -676,8 +677,17 @@ class TableCreateView(BaseView):
|
||||||
return _error(["pks must be a list of strings"])
|
return _error(["pks must be a list of strings"])
|
||||||
|
|
||||||
# If table exists already, read pks from that instead
|
# If table exists already, read pks from that instead
|
||||||
if await db.table_exists(table_name):
|
if table_exists:
|
||||||
pks = await db.primary_keys(table_name)
|
actual_pks = await db.primary_keys(table_name)
|
||||||
|
# if pk passed and table already exists check it does not change
|
||||||
|
bad_pks = False
|
||||||
|
if len(actual_pks) == 1 and data.get("pk") and data["pk"] != actual_pks[0]:
|
||||||
|
bad_pks = True
|
||||||
|
elif len(actual_pks) > 1 and data.get("pks") and set(data["pks"]) != set(actual_pks):
|
||||||
|
bad_pks = True
|
||||||
|
if bad_pks:
|
||||||
|
return _error(["pk cannot be changed for existing table"])
|
||||||
|
pks = actual_pks
|
||||||
|
|
||||||
def create_table(conn):
|
def create_table(conn):
|
||||||
table = sqlite_utils.Database(conn)[table_name]
|
table = sqlite_utils.Database(conn)[table_name]
|
||||||
|
|
|
@ -1199,6 +1199,43 @@ async def test_create_table_ignore_replace(ds_write, input, expected_rows_after)
|
||||||
assert rows.json() == expected_rows_after
|
assert rows.json() == expected_rows_after
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_create_table_error_if_pk_changed(ds_write):
|
||||||
|
token = write_token(ds_write)
|
||||||
|
first_response = await ds_write.client.post(
|
||||||
|
"/data/-/create",
|
||||||
|
json={
|
||||||
|
"rows": [{"id": 1, "name": "Row 1"}, {"id": 2, "name": "Row 2"}],
|
||||||
|
"table": "test_insert_replace",
|
||||||
|
"pk": "id",
|
||||||
|
},
|
||||||
|
headers={
|
||||||
|
"Authorization": "Bearer {}".format(token),
|
||||||
|
"Content-Type": "application/json",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert first_response.status_code == 201
|
||||||
|
# Try a second time with a different pk
|
||||||
|
second_response = await ds_write.client.post(
|
||||||
|
"/data/-/create",
|
||||||
|
json={
|
||||||
|
"rows": [{"id": 1, "name": "Row 1"}, {"id": 2, "name": "Row 2"}],
|
||||||
|
"table": "test_insert_replace",
|
||||||
|
"pk": "name",
|
||||||
|
"replace": True,
|
||||||
|
},
|
||||||
|
headers={
|
||||||
|
"Authorization": "Bearer {}".format(token),
|
||||||
|
"Content-Type": "application/json",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert second_response.status_code == 400
|
||||||
|
assert second_response.json() == {
|
||||||
|
"ok": False,
|
||||||
|
"errors": ["pk cannot be changed for existing table"],
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
"path",
|
"path",
|
||||||
|
|
Ładowanie…
Reference in New Issue