kopia lustrzana https://github.com/simonw/datasette
Added addressable page per row
Refs #1 - only exists for tables with introspectable primary keys. Still need to link to this page. Also added first unit tests - refs #9magic-columns
rodzic
606ff9e35e
commit
6a9fdcc071
56
app.py
56
app.py
|
@ -6,6 +6,7 @@ from sanic_jinja2 import SanicJinja2
|
|||
import sqlite3
|
||||
from pathlib import Path
|
||||
from functools import wraps
|
||||
import urllib.parse
|
||||
import json
|
||||
import hashlib
|
||||
import sys
|
||||
|
@ -157,8 +158,37 @@ class TableView(BaseView):
|
|||
}
|
||||
|
||||
|
||||
class RowView(BaseView):
|
||||
template = 'table.html'
|
||||
|
||||
def data(self, request, name, hash, table, pk_path):
|
||||
conn = get_conn(name)
|
||||
pk_values = compound_pks_from_path(pk_path)
|
||||
pks = pks_for_table(conn, table)
|
||||
wheres = [
|
||||
'{}=?'.format(pk)
|
||||
for pk in pks
|
||||
]
|
||||
sql = 'select * from "{}" where {}'.format(
|
||||
table, ' AND '.join(wheres)
|
||||
)
|
||||
rows = conn.execute(sql, pk_values)
|
||||
columns = [r[0] for r in rows.description]
|
||||
rows = list(rows)
|
||||
if not rows:
|
||||
raise NotFound('Record not found: {}'.format(pk_values))
|
||||
return {
|
||||
'database': name,
|
||||
'database_hash': hash,
|
||||
'table': table,
|
||||
'rows': rows,
|
||||
'columns': columns,
|
||||
}
|
||||
|
||||
|
||||
app.add_route(DatabaseView.as_view(), '/<db_name:[^/]+?><as_json:(.json)?$>')
|
||||
app.add_route(TableView.as_view(), '/<db_name:[^/]+>/<table:[^/]+?><as_json:(.json)?$>')
|
||||
app.add_route(RowView.as_view(), '/<db_name:[^/]+>/<table:[^/]+?>/<pk_path:[^/]+?><as_json:(.json)?$>')
|
||||
|
||||
|
||||
def resolve_db_name(db_name, **kwargs):
|
||||
|
@ -179,7 +209,7 @@ def resolve_db_name(db_name, **kwargs):
|
|||
try:
|
||||
info = databases[name]
|
||||
except KeyError:
|
||||
raise NotFound()
|
||||
raise NotFound('Database not found: {}'.format(name))
|
||||
expected = info['hash'][:7]
|
||||
if expected != hash:
|
||||
should_redirect = '/{}-{}'.format(
|
||||
|
@ -191,6 +221,30 @@ def resolve_db_name(db_name, **kwargs):
|
|||
return name, expected, None
|
||||
|
||||
|
||||
def compound_pks_from_path(path):
|
||||
return [
|
||||
urllib.parse.unquote_plus(b) for b in path.split(',')
|
||||
]
|
||||
|
||||
|
||||
def pks_for_table(conn, table):
|
||||
rows = [
|
||||
row for row in conn.execute(
|
||||
'PRAGMA table_info("{}")'.format(table)
|
||||
).fetchall()
|
||||
if row[-1]
|
||||
]
|
||||
rows.sort(key=lambda row: row[-1])
|
||||
return [r[1] for r in rows]
|
||||
|
||||
|
||||
def path_from_row_pks(row, pks):
|
||||
bits = []
|
||||
for pk in pks:
|
||||
bits.append(urllib.parse.quote_plus(row[pk]))
|
||||
return ','.join(bits)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
if '--build' in sys.argv:
|
||||
ensure_build_metadata(True)
|
||||
|
|
|
@ -0,0 +1,46 @@
|
|||
import app
|
||||
import pytest
|
||||
import sqlite3
|
||||
|
||||
|
||||
@pytest.mark.parametrize('path,expected', [
|
||||
('foo', ['foo']),
|
||||
('foo,bar', ['foo', 'bar']),
|
||||
('123,433,112', ['123', '433', '112']),
|
||||
('123%2C433,112', ['123,433', '112']),
|
||||
('123%2F433%2F112', ['123/433/112']),
|
||||
])
|
||||
def test_compound_pks_from_path(path, expected):
|
||||
assert expected == app.compound_pks_from_path(path)
|
||||
|
||||
|
||||
@pytest.mark.parametrize('sql,table,expected_keys', [
|
||||
('''
|
||||
CREATE TABLE `Compound` (
|
||||
A varchar(5) NOT NULL,
|
||||
B varchar(10) NOT NULL,
|
||||
PRIMARY KEY (A, B)
|
||||
);
|
||||
''', 'Compound', ['A', 'B']),
|
||||
('''
|
||||
CREATE TABLE `Compound2` (
|
||||
A varchar(5) NOT NULL,
|
||||
B varchar(10) NOT NULL,
|
||||
PRIMARY KEY (B, A)
|
||||
);
|
||||
''', 'Compound2', ['B', 'A']),
|
||||
])
|
||||
def test_pks_for_table(sql, table, expected_keys):
|
||||
conn = sqlite3.connect(':memory:')
|
||||
conn.execute(sql)
|
||||
actual = app.pks_for_table(conn, table)
|
||||
assert expected_keys == actual
|
||||
|
||||
|
||||
@pytest.mark.parametrize('row,pks,expected_path', [
|
||||
({'A': 'foo', 'B': 'bar'}, ['A', 'B'], 'foo,bar'),
|
||||
({'A': 'f,o', 'B': 'bar'}, ['A', 'B'], 'f%2Co,bar'),
|
||||
])
|
||||
def test_path_from_row_pks(row, pks, expected_path):
|
||||
actual_path = app.path_from_row_pks(row, pks)
|
||||
assert expected_path == actual_path
|
Ładowanie…
Reference in New Issue