kopia lustrzana https://github.com/simonw/datasette
spike to fix half of #1099
rodzic
e4ebef082d
commit
41beef009f
|
@ -523,30 +523,21 @@ def detect_primary_keys(conn, table):
|
|||
|
||||
def get_outbound_foreign_keys(conn, table):
|
||||
infos = conn.execute(f"PRAGMA foreign_key_list([{table}])").fetchall()
|
||||
fks = []
|
||||
fks = {}
|
||||
for info in infos:
|
||||
if info is not None:
|
||||
id, seq, table_name, from_, to_, on_update, on_delete, match = info
|
||||
fks.append(
|
||||
{
|
||||
"column": from_,
|
||||
if id in fks:
|
||||
fk_info = fks[id]
|
||||
fk_info["columns"] += (from_,)
|
||||
fk_info["other_columns"] += (to_,)
|
||||
else:
|
||||
fks[id] = {
|
||||
"other_table": table_name,
|
||||
"other_column": to_,
|
||||
"id": id,
|
||||
"seq": seq,
|
||||
"columns": (from_,),
|
||||
"other_columns": (to_,),
|
||||
}
|
||||
)
|
||||
# Filter out compound foreign keys by removing any where "id" is not unique
|
||||
id_counts = Counter(fk["id"] for fk in fks)
|
||||
return [
|
||||
{
|
||||
"column": fk["column"],
|
||||
"other_table": fk["other_table"],
|
||||
"other_column": fk["other_column"],
|
||||
}
|
||||
for fk in fks
|
||||
if id_counts[fk["id"]] == 1
|
||||
]
|
||||
return list(fks.values())
|
||||
|
||||
|
||||
def get_all_foreign_keys(conn):
|
||||
|
@ -560,17 +551,17 @@ def get_all_foreign_keys(conn):
|
|||
fks = get_outbound_foreign_keys(conn, table)
|
||||
for fk in fks:
|
||||
table_name = fk["other_table"]
|
||||
from_ = fk["column"]
|
||||
to_ = fk["other_column"]
|
||||
from_ = fk["columns"]
|
||||
to_ = fk["other_columns"]
|
||||
if table_name not in table_to_foreign_keys:
|
||||
# Weird edge case where something refers to a table that does
|
||||
# not actually exist
|
||||
continue
|
||||
table_to_foreign_keys[table_name]["incoming"].append(
|
||||
{"other_table": table, "column": to_, "other_column": from_}
|
||||
{"other_table": table, "columns": to_, "other_columns": from_}
|
||||
)
|
||||
table_to_foreign_keys[table]["outgoing"].append(
|
||||
{"other_table": table_name, "column": from_, "other_column": to_}
|
||||
{"other_table": table_name, "columns": from_, "other_columns": to_}
|
||||
)
|
||||
|
||||
return table_to_foreign_keys
|
||||
|
|
|
@ -97,8 +97,6 @@ class RowView(DataView):
|
|||
)
|
||||
|
||||
async def foreign_key_tables(self, database, table, pk_values):
|
||||
if len(pk_values) != 1:
|
||||
return []
|
||||
db = self.ds.databases[database]
|
||||
all_foreign_keys = await db.get_all_foreign_keys()
|
||||
foreign_keys = all_foreign_keys[table]["incoming"]
|
||||
|
@ -107,37 +105,44 @@ class RowView(DataView):
|
|||
|
||||
sql = "select " + ", ".join(
|
||||
[
|
||||
"(select count(*) from {table} where {column}=:id)".format(
|
||||
"(select count(*) from {table} where {condition})".format(
|
||||
table=escape_sqlite(fk["other_table"]),
|
||||
column=escape_sqlite(fk["other_column"]),
|
||||
condition=" and ".join(
|
||||
"{column}=:id{i}".format(column=column, i=i)
|
||||
for i, column in enumerate(fk["other_columns"])
|
||||
),
|
||||
)
|
||||
for fk in foreign_keys
|
||||
]
|
||||
)
|
||||
try:
|
||||
rows = list(await db.execute(sql, {"id": pk_values[0]}))
|
||||
rows = list(
|
||||
await db.execute(
|
||||
sql, {"id{i}".format(i=i): pk for i, pk in enumerate(pk_values)}
|
||||
)
|
||||
)
|
||||
except QueryInterrupted:
|
||||
# Almost certainly hit the timeout
|
||||
return []
|
||||
|
||||
foreign_table_counts = dict(
|
||||
zip(
|
||||
[(fk["other_table"], fk["other_column"]) for fk in foreign_keys],
|
||||
[(fk["other_table"], fk["other_columns"]) for fk in foreign_keys],
|
||||
list(rows[0]),
|
||||
)
|
||||
)
|
||||
foreign_key_tables = []
|
||||
for fk in foreign_keys:
|
||||
count = (
|
||||
foreign_table_counts.get((fk["other_table"], fk["other_column"])) or 0
|
||||
foreign_table_counts.get((fk["other_table"], fk["other_columns"])) or 0
|
||||
)
|
||||
key = fk["other_column"]
|
||||
if key.startswith("_"):
|
||||
key += "__exact"
|
||||
link = "{}?{}={}".format(
|
||||
self.ds.urls.table(database, fk["other_table"]),
|
||||
key,
|
||||
",".join(pk_values),
|
||||
query_pairs = zip(fk["other_columns"], pk_values)
|
||||
query = "&".join(
|
||||
"{}={}".format(col + "__exact" if col.startswith("_") else col, pk)
|
||||
for col, pk in query_pairs
|
||||
)
|
||||
link = "{}?{}".format(
|
||||
self.ds.urls.table(database, fk["other_table"]), query
|
||||
)
|
||||
foreign_key_tables.append({**fk, **{"count": count, "link": link}})
|
||||
return foreign_key_tables
|
||||
|
|
|
@ -644,7 +644,11 @@ class TableView(DataView):
|
|||
columns_to_expand = request.args.getlist("_label")
|
||||
if columns_to_expand is None and all_labels:
|
||||
# expand all columns with foreign keys
|
||||
columns_to_expand = [fk["column"] for fk, _ in expandable_columns]
|
||||
columns_to_expand = [
|
||||
fk["columns"][0]
|
||||
for fk, _ in expandable_columns
|
||||
if len(fk["columns"]) == 1
|
||||
]
|
||||
|
||||
if columns_to_expand:
|
||||
expanded_labels = {}
|
||||
|
@ -920,8 +924,9 @@ async def display_columns_and_rows(
|
|||
)
|
||||
|
||||
column_to_foreign_key_table = {
|
||||
fk["column"]: fk["other_table"]
|
||||
fk["columns"][0]: fk["other_table"]
|
||||
for fk in await db.foreign_keys_for_table(table_name)
|
||||
if len(fk["columns"]) == 1
|
||||
}
|
||||
|
||||
cell_rows = []
|
||||
|
|
Ładowanie…
Reference in New Issue