kopia lustrzana https://github.com/simonw/datasette
rodzic
bdf59eb7db
commit
527cec66b0
|
@ -421,6 +421,7 @@ def uninstall(packages, yes):
|
|||
help="Path to JSON/YAML Datasette configuration file",
|
||||
)
|
||||
@click.option(
|
||||
"-s",
|
||||
"--setting",
|
||||
"settings",
|
||||
type=Setting(),
|
||||
|
|
|
@ -1219,3 +1219,52 @@ async def row_sql_params_pks(db, table, pk_values):
|
|||
for i, pk_value in enumerate(pk_values):
|
||||
params[f"p{i}"] = pk_value
|
||||
return sql, params, pks
|
||||
|
||||
|
||||
def _handle_pair(key: str, value: str) -> dict:
|
||||
"""
|
||||
Turn a key-value pair into a nested dictionary.
|
||||
foo, bar => {'foo': 'bar'}
|
||||
foo.bar, baz => {'foo': {'bar': 'baz'}}
|
||||
foo.bar, [1, 2, 3] => {'foo': {'bar': [1, 2, 3]}}
|
||||
foo.bar, "baz" => {'foo': {'bar': 'baz'}}
|
||||
foo.bar, '{"baz": "qux"}' => {'foo': {'bar': "{'baz': 'qux'}"}}
|
||||
"""
|
||||
try:
|
||||
value = json.loads(value)
|
||||
except json.JSONDecodeError:
|
||||
# If it doesn't parse as JSON, treat it as a string
|
||||
pass
|
||||
|
||||
keys = key.split(".")
|
||||
result = current_dict = {}
|
||||
|
||||
for k in keys[:-1]:
|
||||
current_dict[k] = {}
|
||||
current_dict = current_dict[k]
|
||||
|
||||
current_dict[keys[-1]] = value
|
||||
return result
|
||||
|
||||
|
||||
def _combine(base: dict, update: dict) -> dict:
|
||||
"""
|
||||
Recursively merge two dictionaries.
|
||||
"""
|
||||
for key, value in update.items():
|
||||
if isinstance(value, dict) and key in base and isinstance(base[key], dict):
|
||||
base[key] = _combine(base[key], value)
|
||||
else:
|
||||
base[key] = value
|
||||
return base
|
||||
|
||||
|
||||
def pairs_to_nested_config(pairs: typing.List[typing.Tuple[str, typing.Any]]) -> dict:
|
||||
"""
|
||||
Parse a list of key-value pairs into a nested dictionary.
|
||||
"""
|
||||
result = {}
|
||||
for key, value in pairs:
|
||||
parsed_pair = _handle_pair(key, value)
|
||||
result = _combine(result, parsed_pair)
|
||||
return result
|
||||
|
|
|
@ -655,3 +655,53 @@ def test_tilde_encoding(original, expected):
|
|||
def test_truncate_url(url, length, expected):
|
||||
actual = utils.truncate_url(url, length)
|
||||
assert actual == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"pairs,expected",
|
||||
(
|
||||
# Simple nested objects
|
||||
([("a", "b")], {"a": "b"}),
|
||||
([("a.b", "c")], {"a": {"b": "c"}}),
|
||||
# JSON literals
|
||||
([("a.b", "true")], {"a": {"b": True}}),
|
||||
([("a.b", "false")], {"a": {"b": False}}),
|
||||
([("a.b", "null")], {"a": {"b": None}}),
|
||||
([("a.b", "1")], {"a": {"b": 1}}),
|
||||
([("a.b", "1.1")], {"a": {"b": 1.1}}),
|
||||
# Nested JSON literals
|
||||
([("a.b", '{"foo": "bar"}')], {"a": {"b": {"foo": "bar"}}}),
|
||||
([("a.b", "[1, 2, 3]")], {"a": {"b": [1, 2, 3]}}),
|
||||
# JSON strings are preserved
|
||||
([("a.b", '"true"')], {"a": {"b": "true"}}),
|
||||
([("a.b", '"[1, 2, 3]"')], {"a": {"b": "[1, 2, 3]"}}),
|
||||
# Later keys over-ride the previous
|
||||
(
|
||||
[
|
||||
("a", "b"),
|
||||
("a.b", "c"),
|
||||
],
|
||||
{"a": {"b": "c"}},
|
||||
),
|
||||
(
|
||||
[
|
||||
("settings.trace_debug", "true"),
|
||||
("plugins.datasette-ripgrep.path", "/etc"),
|
||||
("settings.trace_debug", "false"),
|
||||
],
|
||||
{
|
||||
"settings": {
|
||||
"trace_debug": False,
|
||||
},
|
||||
"plugins": {
|
||||
"datasette-ripgrep": {
|
||||
"path": "/etc",
|
||||
}
|
||||
},
|
||||
},
|
||||
),
|
||||
),
|
||||
)
|
||||
def test_pairs_to_nested_config(pairs, expected):
|
||||
actual = utils.pairs_to_nested_config(pairs)
|
||||
assert actual == expected
|
||||
|
|
Ładowanie…
Reference in New Issue