kopia lustrzana https://github.com/jupyterhub/repo2docker
Porównaj commity
4 Commity
391b9bc5ba
...
0808018da7
Autor | SHA1 | Data |
---|---|---|
Sol Lee | 0808018da7 | |
Sol Lee | 1039d8dcb9 | |
Sol Lee | 7712751c8f | |
Sol Lee | 472c5cdc29 |
|
@ -1,4 +1,3 @@
|
|||
import re
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from os import path
|
||||
from urllib.parse import urlparse
|
||||
|
@ -37,7 +36,6 @@ class CKAN(ContentProvider):
|
|||
return self.session.get(url, **kwargs)
|
||||
|
||||
urlopen = _request
|
||||
url_regex = r"/dataset/[a-z0-9_\\-]*$"
|
||||
|
||||
def detect(self, source, ref=None, extra_args=None):
|
||||
"""Trigger this provider for things that resolve to a CKAN dataset."""
|
||||
|
@ -45,14 +43,20 @@ class CKAN(ContentProvider):
|
|||
if not parsed_url.netloc:
|
||||
return None
|
||||
|
||||
url_parts = parsed_url.path.split("/")
|
||||
if url_parts[-2] == "dataset":
|
||||
self.dataset_id = url_parts[-1]
|
||||
else:
|
||||
return None
|
||||
|
||||
api_url_path = "/api/3/action/"
|
||||
api_url = parsed_url._replace(
|
||||
path=re.sub(self.url_regex, "/api/3/action/", parsed_url.path)
|
||||
path="/".join(url_parts[:-2]) + api_url_path
|
||||
).geturl()
|
||||
|
||||
status_show_url = f"{api_url}status_show"
|
||||
resp = self.urlopen(status_show_url)
|
||||
if resp.status_code == 200:
|
||||
self.dataset_id = parsed_url.path.rsplit("/", maxsplit=1)[1]
|
||||
self.version = self._fetch_version(api_url)
|
||||
return {
|
||||
"dataset_id": self.dataset_id,
|
||||
|
|
|
@ -2,29 +2,26 @@ import os
|
|||
from contextlib import contextmanager
|
||||
from tempfile import NamedTemporaryFile, TemporaryDirectory
|
||||
|
||||
import pytest
|
||||
|
||||
from repo2docker.contentproviders import CKAN
|
||||
|
||||
test_ckan = CKAN()
|
||||
test_hosts = [
|
||||
(
|
||||
[
|
||||
"http://demo.ckan.org/dataset/sample-dataset-1",
|
||||
],
|
||||
{
|
||||
"dataset_id": "sample-dataset-1",
|
||||
"api_url": "http://demo.ckan.org/api/3/action/",
|
||||
"version": "1707387710",
|
||||
},
|
||||
|
||||
def test_detect_ckan(requests_mock):
|
||||
mock_response = {"result": {"metadata_modified": "2024-02-27T14:15:54.573058"}}
|
||||
requests_mock.get("http://demo.ckan.org/api/3/action/status_show", status_code=200)
|
||||
requests_mock.get(
|
||||
"http://demo.ckan.org/api/3/action/package_show?id=1234", json=mock_response
|
||||
)
|
||||
]
|
||||
|
||||
expected = {
|
||||
"dataset_id": "1234",
|
||||
"api_url": "http://demo.ckan.org/api/3/action/",
|
||||
"version": "1709043354",
|
||||
}
|
||||
|
||||
assert CKAN().detect("http://demo.ckan.org/dataset/1234") == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize("test_input, expected", test_hosts)
|
||||
def test_detect_ckan(test_input, expected):
|
||||
assert CKAN().detect(test_input[0]) == expected
|
||||
|
||||
def test_detect_not_ckan():
|
||||
# Don't trigger the CKAN content provider
|
||||
assert CKAN().detect("/some/path/here") is None
|
||||
assert CKAN().detect("https://example.com/path/here") is None
|
||||
|
|
Ładowanie…
Reference in New Issue