kopia lustrzana https://github.com/simonw/datasette
Let datasette inspect method more similar to the original one
rodzic
4e7d3a8ac7
commit
9c9fd7fdc5
225
datasette/app.py
225
datasette/app.py
|
@ -211,8 +211,17 @@ class Datasette:
|
|||
conn.execute("SELECT load_extension('{}')".format(extension))
|
||||
pm.hook.prepare_connection(conn=conn)
|
||||
|
||||
def _is_sqlite3_file(self, path):
|
||||
try:
|
||||
with sqlite3.connect(
|
||||
"file:{}?immutable=1".format(path), uri=True
|
||||
) as conn:
|
||||
conn.execute('select * from sqlite_master where type="table"')
|
||||
return True
|
||||
except:
|
||||
return False
|
||||
|
||||
def inspect(self):
|
||||
dbtype = 'sqlite3'
|
||||
if not self._inspect:
|
||||
self._inspect = {}
|
||||
for filename in self.files:
|
||||
|
@ -230,129 +239,139 @@ class Datasette:
|
|||
break
|
||||
|
||||
m.update(data)
|
||||
|
||||
# If it isn't a sqlite3 file, use other connectors
|
||||
if not self._is_sqlite3_file(path):
|
||||
tables, views, dbtype = connectors.inspect(path)
|
||||
self._inspect[name] = {
|
||||
"hash": m.hexdigest(),
|
||||
"file": str(path),
|
||||
"dbtype": dbtype,
|
||||
"tables": tables,
|
||||
"views": views,
|
||||
}
|
||||
continue
|
||||
|
||||
# List tables and their row counts
|
||||
database_metadata = self.metadata.get("databases", {}).get(name, {})
|
||||
tables = {}
|
||||
views = []
|
||||
try:
|
||||
with sqlite3.connect(
|
||||
"file:{}?immutable=1".format(path), uri=True
|
||||
) as conn:
|
||||
self.prepare_connection(conn)
|
||||
table_names = [
|
||||
r["name"]
|
||||
with sqlite3.connect(
|
||||
"file:{}?immutable=1".format(path), uri=True
|
||||
) as conn:
|
||||
self.prepare_connection(conn)
|
||||
table_names = [
|
||||
r["name"]
|
||||
for r in conn.execute(
|
||||
'select * from sqlite_master where type="table"'
|
||||
)
|
||||
]
|
||||
views = [
|
||||
v[0]
|
||||
for v in conn.execute(
|
||||
'select name from sqlite_master where type = "view"'
|
||||
)
|
||||
]
|
||||
for table in table_names:
|
||||
try:
|
||||
count = conn.execute(
|
||||
"select count(*) from {}".format(escape_sqlite(table))
|
||||
).fetchone()[
|
||||
0
|
||||
]
|
||||
except sqlite3.OperationalError:
|
||||
# This can happen when running against a FTS virtual tables
|
||||
# e.g. "select count(*) from some_fts;"
|
||||
count = 0
|
||||
# Does this table have a FTS table?
|
||||
fts_table = detect_fts(conn, table)
|
||||
|
||||
# Figure out primary keys
|
||||
table_info_rows = [
|
||||
row
|
||||
for row in conn.execute(
|
||||
'PRAGMA table_info("{}")'.format(table)
|
||||
).fetchall()
|
||||
if row[-1]
|
||||
]
|
||||
table_info_rows.sort(key=lambda row: row[-1])
|
||||
primary_keys = [str(r[1]) for r in table_info_rows]
|
||||
label_column = None
|
||||
# If table has two columns, one of which is ID, then label_column is the other one
|
||||
column_names = [
|
||||
r[1]
|
||||
for r in conn.execute(
|
||||
'select * from sqlite_master where type="table"'
|
||||
)
|
||||
"PRAGMA table_info({});".format(escape_sqlite(table))
|
||||
).fetchall()
|
||||
]
|
||||
views = [
|
||||
v[0]
|
||||
for v in conn.execute(
|
||||
'select name from sqlite_master where type = "view"'
|
||||
)
|
||||
]
|
||||
for table in table_names:
|
||||
try:
|
||||
count = conn.execute(
|
||||
"select count(*) from {}".format(escape_sqlite(table))
|
||||
).fetchone()[
|
||||
0
|
||||
]
|
||||
except sqlite3.OperationalError:
|
||||
# This can happen when running against a FTS virtual tables
|
||||
# e.g. "select count(*) from some_fts;"
|
||||
count = 0
|
||||
# Does this table have a FTS table?
|
||||
fts_table = detect_fts(conn, table)
|
||||
if (
|
||||
column_names
|
||||
and len(column_names) == 2
|
||||
and "id" in column_names
|
||||
):
|
||||
label_column = [c for c in column_names if c != "id"][0]
|
||||
table_metadata = database_metadata.get("tables", {}).get(
|
||||
table, {}
|
||||
)
|
||||
tables[table] = {
|
||||
"name": table,
|
||||
"columns": column_names,
|
||||
"primary_keys": primary_keys,
|
||||
"count": count,
|
||||
"label_column": label_column,
|
||||
"hidden": table_metadata.get("hidden") or False,
|
||||
"fts_table": fts_table,
|
||||
}
|
||||
|
||||
# Figure out primary keys
|
||||
table_info_rows = [
|
||||
row
|
||||
for row in conn.execute(
|
||||
'PRAGMA table_info("{}")'.format(table)
|
||||
).fetchall()
|
||||
if row[-1]
|
||||
]
|
||||
table_info_rows.sort(key=lambda row: row[-1])
|
||||
primary_keys = [str(r[1]) for r in table_info_rows]
|
||||
label_column = None
|
||||
# If table has two columns, one of which is ID, then label_column is the other one
|
||||
column_names = [
|
||||
r[1]
|
||||
for r in conn.execute(
|
||||
"PRAGMA table_info({});".format(escape_sqlite(table))
|
||||
).fetchall()
|
||||
]
|
||||
if (
|
||||
column_names
|
||||
and len(column_names) == 2
|
||||
and "id" in column_names
|
||||
):
|
||||
label_column = [c for c in column_names if c != "id"][0]
|
||||
table_metadata = database_metadata.get("tables", {}).get(
|
||||
table, {}
|
||||
)
|
||||
tables[table] = {
|
||||
"name": table,
|
||||
"columns": column_names,
|
||||
"primary_keys": primary_keys,
|
||||
"count": count,
|
||||
"label_column": label_column,
|
||||
"hidden": table_metadata.get("hidden") or False,
|
||||
"fts_table": fts_table,
|
||||
}
|
||||
foreign_keys = get_all_foreign_keys(conn)
|
||||
for table, info in foreign_keys.items():
|
||||
tables[table]["foreign_keys"] = info
|
||||
|
||||
foreign_keys = get_all_foreign_keys(conn)
|
||||
for table, info in foreign_keys.items():
|
||||
tables[table]["foreign_keys"] = info
|
||||
# Mark tables 'hidden' if they relate to FTS virtual tables
|
||||
hidden_tables = [
|
||||
r["name"]
|
||||
for r in conn.execute(
|
||||
"""
|
||||
select name from sqlite_master
|
||||
where rootpage = 0
|
||||
and sql like '%VIRTUAL TABLE%USING FTS%'
|
||||
"""
|
||||
)
|
||||
]
|
||||
|
||||
# Mark tables 'hidden' if they relate to FTS virtual tables
|
||||
hidden_tables = [
|
||||
if detect_spatialite(conn):
|
||||
# Also hide Spatialite internal tables
|
||||
hidden_tables += [
|
||||
"ElementaryGeometries",
|
||||
"SpatialIndex",
|
||||
"geometry_columns",
|
||||
"spatial_ref_sys",
|
||||
"spatialite_history",
|
||||
"sql_statements_log",
|
||||
"sqlite_sequence",
|
||||
"views_geometry_columns",
|
||||
"virts_geometry_columns",
|
||||
] + [
|
||||
r["name"]
|
||||
for r in conn.execute(
|
||||
"""
|
||||
select name from sqlite_master
|
||||
where rootpage = 0
|
||||
and sql like '%VIRTUAL TABLE%USING FTS%'
|
||||
where name like "idx_%"
|
||||
and type = "table"
|
||||
"""
|
||||
)
|
||||
]
|
||||
|
||||
if detect_spatialite(conn):
|
||||
# Also hide Spatialite internal tables
|
||||
hidden_tables += [
|
||||
"ElementaryGeometries",
|
||||
"SpatialIndex",
|
||||
"geometry_columns",
|
||||
"spatial_ref_sys",
|
||||
"spatialite_history",
|
||||
"sql_statements_log",
|
||||
"sqlite_sequence",
|
||||
"views_geometry_columns",
|
||||
"virts_geometry_columns",
|
||||
] + [
|
||||
r["name"]
|
||||
for r in conn.execute(
|
||||
"""
|
||||
select name from sqlite_master
|
||||
where name like "idx_%"
|
||||
and type = "table"
|
||||
"""
|
||||
)
|
||||
]
|
||||
|
||||
for t in tables.keys():
|
||||
for hidden_table in hidden_tables:
|
||||
if t == hidden_table or t.startswith(hidden_table):
|
||||
tables[t]["hidden"] = True
|
||||
continue
|
||||
except:
|
||||
tables, views, dbtype = connectors.inspect(path)
|
||||
for t in tables.keys():
|
||||
for hidden_table in hidden_tables:
|
||||
if t == hidden_table or t.startswith(hidden_table):
|
||||
tables[t]["hidden"] = True
|
||||
continue
|
||||
|
||||
self._inspect[name] = {
|
||||
"hash": m.hexdigest(),
|
||||
"file": str(path),
|
||||
"dbtype": dbtype,
|
||||
"dbtype": "sqlite3",
|
||||
"tables": tables,
|
||||
"views": views,
|
||||
}
|
||||
|
|
Ładowanie…
Reference in New Issue