Mirror of https://github.com/jupyterhub/repo2docker
commit
e33d5f86ca
|
@ -83,4 +83,3 @@ time there is no active plan for an item. The project would like to find the
|
|||
resources and time to discuss and then execute these ideas.
|
||||
* support execution on a remote host (with more resources than available locally) via the command-line
|
||||
* add support for using ZIP files as the repo (`repo2docker https://example.com/an-archive.zip`) this will give us access to several archives (like Zenodo) that expose things as ZIP files.
|
||||
* add support for Zenodo (`repo2docker 10.5281/zenodo.1476680`) so Zenodo software archives can be used as the source in addition to a git repository
|
||||
|
|
|
@ -12,7 +12,7 @@ Using ``repo2docker``
|
|||
|
||||
``repo2docker`` can build a reproducible computational environment for any repository that
|
||||
follows :ref:`specification`. repo2docker is called with the URL of a Git repository,
|
||||
a Zenodo DOI or a path to a local directory. It then
|
||||
a DOI from Zenodo or Figshare, or a path to a local directory. It then
|
||||
performs these steps:
|
||||
|
||||
1. Inspects the repository for :ref:`configuration files <config-files>`. These will be used to build
|
||||
|
|
|
@ -142,7 +142,12 @@ class Repo2Docker(Application):
|
|||
# detecting if something will successfully `git clone` is very hard if all
|
||||
# you can do is look at the path/URL to it.
|
||||
content_providers = List(
|
||||
[contentproviders.Local, contentproviders.Zenodo, contentproviders.Git],
|
||||
[
|
||||
contentproviders.Local,
|
||||
contentproviders.Zenodo,
|
||||
contentproviders.Figshare,
|
||||
contentproviders.Git,
|
||||
],
|
||||
config=True,
|
||||
help="""
|
||||
Ordered list by priority of ContentProviders to try in turn to fetch
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
from .git import Git
|
||||
from .base import Local
|
||||
from .zenodo import Zenodo
|
||||
from .figshare import Figshare
|
||||
|
|
|
@ -0,0 +1,90 @@
|
|||
import os
|
||||
import json
|
||||
import shutil
|
||||
import logging
|
||||
|
||||
from os import makedirs
|
||||
from os import path
|
||||
from urllib import request # urlopen, Request
|
||||
from urllib.error import HTTPError
|
||||
from zipfile import ZipFile, is_zipfile
|
||||
|
||||
from .base import ContentProvider
|
||||
from ..utils import copytree, deep_get
|
||||
from ..utils import normalize_doi, is_doi
|
||||
from .. import __version__
|
||||
|
||||
|
||||
class DoiProvider(ContentProvider):
    """Provide contents of a repository identified by a DOI and some helper functions."""

    def urlopen(self, req, headers=None):
        """A urlopen() helper that tags requests with a repo2docker User-Agent.

        `req` may be a URL string or a `urllib.request.Request`; entries of
        the optional `headers` dict are added to the request before opening.
        Returns the open response object.
        """
        # someone passed a string, not a request
        if not isinstance(req, request.Request):
            req = request.Request(req)

        req.add_header("User-Agent", "repo2docker {}".format(__version__))
        if headers is not None:
            for key, value in headers.items():
                req.add_header(key, value)

        return request.urlopen(req)

    def doi2url(self, doi):
        """Transform a DOI to the URL it resolves to.

        If `doi` is not a DOI, assume it is already a URL and return it
        unchanged.  If the DOI fails to resolve, return the normalized DOI.
        """
        if is_doi(doi):
            doi = normalize_doi(doi)

            try:
                resp = self.urlopen("https://doi.org/{}".format(doi))
            # If the DOI doesn't resolve, just return URL
            except HTTPError:
                return doi
            return resp.url
        else:
            # Just return what is actually just a URL
            return doi

    def fetch_file(self, file_ref, host, output_dir, unzip=False):
        """Fetch one file of a record into `output_dir`, yielding progress messages.

        `file_ref` is the per-file metadata dict of a record; `host` maps the
        logical keys "download" and "filename" to their paths inside that
        metadata (resolved with `deep_get`).
        """
        # the assumption is that `unzip=True` means that this is the only
        # file related to a record
        file_url = deep_get(file_ref, host["download"])
        fname = deep_get(file_ref, host["filename"])
        logging.debug("Downloading file {} as {}\n".format(file_url, fname))
        with self.urlopen(file_url) as src:
            if path.dirname(fname):
                sub_dir = path.join(output_dir, path.dirname(fname))
                if not path.exists(sub_dir):
                    yield "Creating {}\n".format(sub_dir)
                    makedirs(sub_dir, exist_ok=True)

            dst_fname = path.join(output_dir, fname)
            with open(dst_fname, "wb") as dst:
                yield "Fetching {}\n".format(fname)
                shutil.copyfileobj(src, dst)
            # first close the newly written file, then continue
            # processing it
            if unzip and is_zipfile(dst_fname):
                yield "Extracting {}\n".format(fname)
                # context manager guarantees the archive handle is closed
                # even if extraction raises
                with ZipFile(dst_fname) as zfile:
                    zfile.extractall(path=output_dir)

                # delete downloaded file ...
                os.remove(dst_fname)
                # ... and any directories we might have created,
                # in which case sub_dir will be defined
                if path.dirname(fname):
                    shutil.rmtree(sub_dir)

                new_subdirs = os.listdir(output_dir)
                # if there is only one new subdirectory move its contents
                # to the top level directory
                if len(new_subdirs) == 1:
                    d = new_subdirs[0]
                    copytree(path.join(output_dir, d), output_dir)
                    shutil.rmtree(path.join(output_dir, d))

        yield "Fetched files: {}\n".format(os.listdir(output_dir))
|
|
@ -0,0 +1,97 @@
|
|||
import os
|
||||
import re
|
||||
import json
|
||||
import shutil
|
||||
|
||||
from os import makedirs
|
||||
from os import path
|
||||
from urllib.request import Request
|
||||
from urllib.error import HTTPError
|
||||
from zipfile import is_zipfile
|
||||
|
||||
from .doi import DoiProvider
|
||||
from ..utils import copytree, deep_get
|
||||
|
||||
|
||||
class Figshare(DoiProvider):
    """Provide contents of a Figshare article.

    See https://docs.figshare.com/#public_article for API docs.

    Examples:
      - https://doi.org/10.6084/m9.figshare.9782777
      - https://doi.org/10.6084/m9.figshare.9782777.v2
      - https://figshare.com/articles/binder-examples_requirements/9784088 (only one zipfile, no DOI)
    """

    def __init__(self):
        """Configure the single known Figshare host (article URLs + API endpoint)."""
        self.hosts = [
            {
                "hostname": [
                    "https://figshare.com/articles/",
                    "http://figshare.com/articles/",
                    "https://figshare.com/account/articles/",
                ],
                "api": "https://api.figshare.com/v2/articles/",
                "filepath": "files",
                "filename": "name",
                "download": "download_url",
            }
        ]

    # groups: (prefix)/articles/(title)/(article id)[(/)(version)]
    url_regex = re.compile(r"(.*)/articles/([^/]+)/(\d+)(/)?(\d+)?")

    def detect(self, doi, ref=None, extra_args=None):
        """Trigger this provider for things that resolve to a Figshare article"""
        # We need the hostname (url where records are), api url (for metadata),
        # filepath (path to files in metadata), filename (path to filename in
        # metadata), download (path to file download URL), and type (path to item type in metadata)

        url = self.doi2url(doi)

        for host in self.hosts:
            if any([url.startswith(s) for s in host["hostname"]]):
                match = self.url_regex.match(url)
                if match:
                    # group 2 = article id, group 4 = optional version
                    self.article_id = match.groups()[2]
                    self.article_version = match.groups()[4]
                    if not self.article_version:
                        # URLs without an explicit version mean version 1
                        self.article_version = "1"
                    return {
                        "article": self.article_id,
                        "host": host,
                        "version": self.article_version,
                    }
                else:
                    # hostname matched but URL shape didn't: not an article page
                    return None
        # falls through (implicitly returning None) when no hostname matched

    def fetch(self, spec, output_dir, yield_output=False):
        """Fetch and unpack a Figshare article"""
        article_id = spec["article"]
        article_version = spec["version"]
        host = spec["host"]

        yield "Fetching Figshare article {} in version {}.\n".format(
            article_id, article_version
        )
        # ask the Figshare API for the metadata of this article version
        req = Request(
            "{}{}/versions/{}".format(host["api"], article_id, article_version),
            headers={"accept": "application/json"},
        )
        resp = self.urlopen(req)

        article = json.loads(resp.read().decode("utf-8"))

        files = deep_get(article, host["filepath"])
        # only fetch files where is_link_only: False
        files = [file for file in files if not file["is_link_only"]]
        only_one_file = len(files) == 1
        for file_ref in files:
            # a lone ZIP file is treated as the article content and unpacked
            unzip = file_ref["name"].endswith(".zip") and only_one_file
            for line in self.fetch_file(file_ref, host, output_dir, unzip):
                yield line

    @property
    def content_id(self):
        """The Figshare article ID"""
        # requires a prior successful detect() which sets these attributes
        return "{}.v{}".format(self.article_id, self.article_version)
|
|
@ -4,54 +4,21 @@ import shutil
|
|||
|
||||
from os import makedirs
|
||||
from os import path
|
||||
from urllib.request import urlopen, Request
|
||||
from urllib.request import Request
|
||||
from urllib.error import HTTPError
|
||||
from zipfile import ZipFile, is_zipfile
|
||||
|
||||
from .base import ContentProvider
|
||||
from .doi import DoiProvider
|
||||
from ..utils import copytree, deep_get
|
||||
from ..utils import normalize_doi, is_doi
|
||||
from .. import __version__
|
||||
|
||||
|
||||
class Zenodo(ContentProvider):
|
||||
class Zenodo(DoiProvider):
|
||||
"""Provide contents of a Zenodo deposit."""
|
||||
|
||||
def _urlopen(self, req, headers=None):
|
||||
"""A urlopen() helper"""
|
||||
# someone passed a string, not a request
|
||||
if not isinstance(req, Request):
|
||||
req = Request(req)
|
||||
|
||||
req.add_header("User-Agent", "repo2docker {}".format(__version__))
|
||||
if headers is not None:
|
||||
for key, value in headers.items():
|
||||
req.add_header(key, value)
|
||||
|
||||
return urlopen(req)
|
||||
|
||||
def _doi2url(self, doi):
|
||||
# Transform a DOI to a URL
|
||||
# If not a doi, assume we have a URL and return
|
||||
if is_doi(doi):
|
||||
doi = normalize_doi(doi)
|
||||
|
||||
try:
|
||||
resp = self._urlopen("https://doi.org/{}".format(doi))
|
||||
# If the DOI doesn't resolve, just return URL
|
||||
except HTTPError:
|
||||
return doi
|
||||
return resp.url
|
||||
else:
|
||||
# Just return what is actulally just a URL
|
||||
return doi
|
||||
|
||||
def detect(self, doi, ref=None, extra_args=None):
|
||||
"""Trigger this provider for things that resolve to a Zenodo/Invenio record"""
|
||||
def __init__(self):
|
||||
# We need the hostname (url where records are), api url (for metadata),
|
||||
# filepath (path to files in metadata), filename (path to filename in
|
||||
# metadata), download (path to file download URL), and type (path to item type in metadata)
|
||||
hosts = [
|
||||
self.hosts = [
|
||||
{
|
||||
"hostname": ["https://zenodo.org/record/", "http://zenodo.org/record/"],
|
||||
"api": "https://zenodo.org/api/records/",
|
||||
|
@ -73,9 +40,11 @@ class Zenodo(ContentProvider):
|
|||
},
|
||||
]
|
||||
|
||||
url = self._doi2url(doi)
|
||||
def detect(self, doi, ref=None, extra_args=None):
|
||||
"""Trigger this provider for things that resolve to a Zenodo/Invenio record"""
|
||||
url = self.doi2url(doi)
|
||||
|
||||
for host in hosts:
|
||||
for host in self.hosts:
|
||||
if any([url.startswith(s) for s in host["hostname"]]):
|
||||
self.record_id = url.rsplit("/", maxsplit=1)[1]
|
||||
return {"record": self.record_id, "host": host}
|
||||
|
@ -90,53 +59,17 @@ class Zenodo(ContentProvider):
|
|||
"{}{}".format(host["api"], record_id),
|
||||
headers={"accept": "application/json"},
|
||||
)
|
||||
resp = self._urlopen(req)
|
||||
resp = self.urlopen(req)
|
||||
|
||||
record = json.loads(resp.read().decode("utf-8"))
|
||||
|
||||
def _fetch(file_ref, unzip=False):
|
||||
# the assumption is that `unzip=True` means that this is the only
|
||||
# file related to the zenodo record
|
||||
with self._urlopen(deep_get(file_ref, host["download"])) as src:
|
||||
fname = deep_get(file_ref, host["filename"])
|
||||
if path.dirname(fname):
|
||||
sub_dir = path.join(output_dir, path.dirname(fname))
|
||||
if not path.exists(sub_dir):
|
||||
yield "Creating {}\n".format(sub_dir)
|
||||
makedirs(sub_dir, exist_ok=True)
|
||||
|
||||
dst_fname = path.join(output_dir, fname)
|
||||
with open(dst_fname, "wb") as dst:
|
||||
yield "Fetching {}\n".format(fname)
|
||||
shutil.copyfileobj(src, dst)
|
||||
# first close the newly written file, then continue
|
||||
# processing it
|
||||
if unzip and is_zipfile(dst_fname):
|
||||
yield "Extracting {}\n".format(fname)
|
||||
zfile = ZipFile(dst_fname)
|
||||
zfile.extractall(path=output_dir)
|
||||
zfile.close()
|
||||
|
||||
# delete downloaded file ...
|
||||
os.remove(dst_fname)
|
||||
# ... and any directories we might have created,
|
||||
# in which case sub_dir will be defined
|
||||
if path.dirname(fname):
|
||||
shutil.rmtree(sub_dir)
|
||||
|
||||
new_subdirs = os.listdir(output_dir)
|
||||
# if there is only one new subdirectory move its contents
|
||||
# to the top level directory
|
||||
if len(new_subdirs) == 1:
|
||||
d = new_subdirs[0]
|
||||
copytree(path.join(output_dir, d), output_dir)
|
||||
shutil.rmtree(path.join(output_dir, d))
|
||||
|
||||
is_software = deep_get(record, host["type"]).lower() == "software"
|
||||
files = deep_get(record, host["filepath"])
|
||||
only_one_file = len(files) == 1
|
||||
for file_ref in files:
|
||||
for line in _fetch(file_ref, unzip=is_software and only_one_file):
|
||||
for line in self.fetch_file(
|
||||
file_ref, host, output_dir, is_software and only_one_file
|
||||
):
|
||||
yield line
|
||||
|
||||
@property
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
import json
|
||||
import os
|
||||
import re
|
||||
import urllib
|
||||
import pytest
|
||||
import tempfile
|
||||
import logging
|
||||
|
||||
from unittest.mock import patch, MagicMock, mock_open
|
||||
from zipfile import ZipFile
|
||||
|
||||
from repo2docker.contentproviders.doi import DoiProvider
|
||||
from repo2docker.contentproviders.base import ContentProviderException
|
||||
|
||||
|
||||
def test_content_id():
    """A bare DoiProvider exposes no content id of its own."""
    provider = DoiProvider()
    assert provider.content_id is None
|
||||
|
||||
|
||||
def fake_urlopen(req):
    """Test double for urlopen(): log the request and hand back its headers."""
    print(req)
    captured_headers = req.headers
    return captured_headers
|
||||
|
||||
|
||||
@patch("urllib.request.urlopen", fake_urlopen)
|
||||
def test_url_headers():
|
||||
doi = DoiProvider()
|
||||
|
||||
headers = {"test1": "value1", "Test2": "value2"}
|
||||
result = doi.urlopen("https://mybinder.org", headers=headers)
|
||||
assert "Test1" in result
|
||||
assert "Test2" in result
|
||||
assert len(result) is 3 # User-agent is also set
|
||||
|
||||
|
||||
def test_unresolving_doi():
    """doi2url() returns the DOI unchanged when it does not resolve."""
    doi = DoiProvider()

    fakedoi = "10.1/1234"
    # compare by value, not object identity (`is` only held because the
    # same string object happened to be passed through)
    assert doi.doi2url(fakedoi) == fakedoi
|
|
@ -0,0 +1,178 @@
|
|||
import json
|
||||
import os
|
||||
import re
|
||||
import pytest
|
||||
|
||||
from contextlib import contextmanager
|
||||
from io import BytesIO
|
||||
from tempfile import TemporaryDirectory, NamedTemporaryFile
|
||||
from unittest.mock import patch
|
||||
from urllib.request import urlopen, Request
|
||||
from zipfile import ZipFile
|
||||
|
||||
from repo2docker.contentproviders import Figshare
|
||||
from repo2docker.__main__ import make_r2d
|
||||
|
||||
|
||||
test_content_ids = [
|
||||
("https://figshare.com/articles/title/9782777", "9782777.v1"),
|
||||
("https://figshare.com/articles/title/9782777/2", "9782777.v2"),
|
||||
("https://figshare.com/articles/title/9782777/1234", "9782777.v1234"),
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.parametrize("link,expected", test_content_ids)
|
||||
def test_content_id(link, expected):
|
||||
with patch.object(Figshare, "urlopen") as fake_urlopen:
|
||||
fake_urlopen.return_value.url = link
|
||||
fig = Figshare()
|
||||
fig.detect("10.6084/m9.figshare.9782777")
|
||||
assert fig.content_id == expected
|
||||
|
||||
|
||||
# Shared fixture: a Figshare provider with a known article id and version,
# used by the spec/fetch tests below.
test_fig = Figshare()
test_fig.article_id = "123456"
test_fig.article_version = "42"
|
||||
|
||||
test_dois_links = [
|
||||
(
|
||||
"10.6084/m9.figshare.9782777",
|
||||
{"host": test_fig.hosts[0], "article": "9782777", "version": "1"},
|
||||
),
|
||||
(
|
||||
"10.6084/m9.figshare.9782777.v1",
|
||||
{"host": test_fig.hosts[0], "article": "9782777", "version": "1"},
|
||||
),
|
||||
(
|
||||
"10.6084/m9.figshare.9782777.v2",
|
||||
{"host": test_fig.hosts[0], "article": "9782777", "version": "2"},
|
||||
),
|
||||
(
|
||||
"https://doi.org/10.6084/m9.figshare.9782777.v1",
|
||||
{"host": test_fig.hosts[0], "article": "9782777", "version": "1"},
|
||||
),
|
||||
(
|
||||
"https://doi.org/10.6084/m9.figshare.9782777.v3",
|
||||
{"host": test_fig.hosts[0], "article": "9782777", "version": "3"},
|
||||
),
|
||||
(
|
||||
"https://figshare.com/articles/title/97827771234",
|
||||
{"host": test_fig.hosts[0], "article": "97827771234", "version": "1"},
|
||||
),
|
||||
(
|
||||
"https://figshare.com/articles/title/9782777/1",
|
||||
{"host": test_fig.hosts[0], "article": "9782777", "version": "1"},
|
||||
),
|
||||
(
|
||||
"https://figshare.com/articles/title/9782777/2",
|
||||
{"host": test_fig.hosts[0], "article": "9782777", "version": "2"},
|
||||
),
|
||||
(
|
||||
"https://figshare.com/articles/title/9782777/",
|
||||
{"host": test_fig.hosts[0], "article": "9782777", "version": "1"},
|
||||
),
|
||||
(
|
||||
"https://figshare.com/articles/title/9782777/1234",
|
||||
{"host": test_fig.hosts[0], "article": "9782777", "version": "1234"},
|
||||
),
|
||||
]
|
||||
|
||||
test_spec = {"host": test_fig.hosts[0], "article": "123456", "version": "42"}
|
||||
|
||||
|
||||
@pytest.mark.parametrize("test_input,expected", test_dois_links)
|
||||
def test_detect_figshare(test_input, expected):
|
||||
assert Figshare().detect(test_input) == expected
|
||||
|
||||
|
||||
def test_detect_not_figshare():
    """Non-Figshare paths, URLs and DOIs must not trigger this provider."""
    non_figshare_refs = [
        "/some/path/here",
        "https://example.com/path/here",
        "10.21105/joss.01277",
        "10.5281/zenodo.3232985",
        "https://doi.org/10.21105/joss.01277",
    ]
    for ref in non_figshare_refs:
        assert Figshare().detect(ref) is None
|
||||
|
||||
|
||||
@contextmanager
def figshare_archive(prefix="a_directory"):
    """Yield the path of a temporary ZIP file with two text files under `prefix`/.

    The file is deleted when the context exits.
    """
    with NamedTemporaryFile(suffix=".zip") as zfile:
        # `archive`, not `zip`: don't shadow the builtin
        with ZipFile(zfile.name, mode="w") as archive:
            archive.writestr("{}/some-file.txt".format(prefix), "some content")
            archive.writestr("{}/some-other-file.txt".format(prefix), "some more content")

        yield zfile.name
|
||||
|
||||
|
||||
def test_fetch_zip():
    """A single ZIP file in an article is unpacked into the output directory."""
    # see test_zenodo.py/test_fetch_software
    with figshare_archive() as fig_path:
        # fake API metadata: one non-link ZIP file pointing at the local archive
        mock_response = BytesIO(
            json.dumps(
                {
                    "files": [
                        {
                            "name": "afake.zip",
                            "is_link_only": False,
                            "download_url": "file://{}".format(fig_path),
                        }
                    ]
                }
            ).encode("utf-8")
        )

        def mock_urlopen(self, req):
            # Request objects are API metadata calls -> serve the fake JSON;
            # plain URLs are file downloads -> pass through to urlopen
            if isinstance(req, Request):
                return mock_response
            else:
                return urlopen(req)

        with patch.object(Figshare, "urlopen", new=mock_urlopen):
            with TemporaryDirectory() as d:
                output = []
                for l in test_fig.fetch(test_spec, d):
                    output.append(l)

                # the archive's top-level directory was flattened away
                unpacked_files = set(os.listdir(d))
                expected = set(["some-other-file.txt", "some-file.txt"])
                assert expected == unpacked_files
|
||||
|
||||
|
||||
def test_fetch_data():
    """Multiple data files are downloaded as-is; link-only entries are skipped."""
    with figshare_archive() as a_path:
        with figshare_archive() as b_path:
            # fake API metadata: two downloadable files plus one link-only entry
            mock_response = BytesIO(
                json.dumps(
                    {
                        "files": [
                            {
                                "name": "afake.file",
                                "download_url": "file://{}".format(a_path),
                                "is_link_only": False,
                            },
                            {
                                "name": "bfake.data",
                                "download_url": "file://{}".format(b_path),
                                "is_link_only": False,
                            },
                            {"name": "cfake.link", "is_link_only": True},
                        ]
                    }
                ).encode("utf-8")
            )

            def mock_urlopen(self, req):
                # Request objects are API metadata calls -> fake JSON;
                # plain URLs are file downloads -> real urlopen
                if isinstance(req, Request):
                    return mock_response
                else:
                    return urlopen(req)

            with patch.object(Figshare, "urlopen", new=mock_urlopen):
                with TemporaryDirectory() as d:
                    output = []
                    for l in test_fig.fetch(test_spec, d):
                        output.append(l)

                    unpacked_files = set(os.listdir(d))
                    # ZIP files shouldn't have been unpacked
                    expected = {"bfake.data", "afake.file"}
                    assert expected == unpacked_files
|
|
@ -13,7 +13,7 @@ from repo2docker.contentproviders import Zenodo
|
|||
|
||||
|
||||
def test_content_id():
|
||||
with patch.object(Zenodo, "_urlopen") as fake_urlopen:
|
||||
with patch.object(Zenodo, "urlopen") as fake_urlopen:
|
||||
fake_urlopen.return_value.url = "https://zenodo.org/record/3232985"
|
||||
zen = Zenodo()
|
||||
|
||||
|
@ -21,6 +21,7 @@ def test_content_id():
|
|||
assert zen.content_id == "3232985"
|
||||
|
||||
|
||||
test_zen = Zenodo()
|
||||
test_hosts = [
|
||||
(
|
||||
[
|
||||
|
@ -28,17 +29,7 @@ test_hosts = [
|
|||
"10.5281/zenodo.3232985",
|
||||
"https://doi.org/10.5281/zenodo.3232985",
|
||||
],
|
||||
{
|
||||
"host": {
|
||||
"hostname": ["https://zenodo.org/record/", "http://zenodo.org/record/"],
|
||||
"api": "https://zenodo.org/api/records/",
|
||||
"filepath": "files",
|
||||
"filename": "filename",
|
||||
"download": "links.download",
|
||||
"type": "metadata.upload_type",
|
||||
},
|
||||
"record": "3232985",
|
||||
},
|
||||
{"host": test_zen.hosts[0], "record": "3232985"},
|
||||
),
|
||||
(
|
||||
[
|
||||
|
@ -46,27 +37,14 @@ test_hosts = [
|
|||
"10.22002/d1.1235",
|
||||
"https://doi.org/10.22002/d1.1235",
|
||||
],
|
||||
{
|
||||
"host": {
|
||||
"hostname": [
|
||||
"https://data.caltech.edu/records/",
|
||||
"http://data.caltech.edu/records/",
|
||||
],
|
||||
"api": "https://data.caltech.edu/api/record/",
|
||||
"filepath": "metadata.electronic_location_and_access",
|
||||
"filename": "electronic_name.0",
|
||||
"download": "uniform_resource_identifier",
|
||||
"type": "metadata.resourceType.resourceTypeGeneral",
|
||||
},
|
||||
"record": "1235",
|
||||
},
|
||||
{"host": test_zen.hosts[1], "record": "1235"},
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.parametrize("test_input,expected", test_hosts)
|
||||
def test_detect_zenodo(test_input, expected):
|
||||
with patch.object(Zenodo, "_urlopen") as fake_urlopen:
|
||||
with patch.object(Zenodo, "urlopen") as fake_urlopen:
|
||||
fake_urlopen.return_value.url = test_input[0]
|
||||
# valid Zenodo DOIs trigger this content provider
|
||||
assert Zenodo().detect(test_input[0]) == expected
|
||||
|
@ -75,7 +53,7 @@ def test_detect_zenodo(test_input, expected):
|
|||
# only two of the three calls above have to resolve a DOI
|
||||
assert fake_urlopen.call_count == 2
|
||||
|
||||
with patch.object(Zenodo, "_urlopen") as fake_urlopen:
|
||||
with patch.object(Zenodo, "urlopen") as fake_urlopen:
|
||||
# Don't trigger the Zenodo content provider
|
||||
assert Zenodo().detect("/some/path/here") is None
|
||||
assert Zenodo().detect("https://example.com/path/here") is None
|
||||
|
@ -120,22 +98,9 @@ def test_fetch_software_from_github_archive():
|
|||
else:
|
||||
return urlopen(req)
|
||||
|
||||
with patch.object(Zenodo, "_urlopen", new=mock_urlopen):
|
||||
with patch.object(Zenodo, "urlopen", new=mock_urlopen):
|
||||
zen = Zenodo()
|
||||
spec = {
|
||||
"host": {
|
||||
"hostname": [
|
||||
"https://zenodo.org/record/",
|
||||
"http://zenodo.org/record/",
|
||||
],
|
||||
"api": "https://zenodo.org/api/records/",
|
||||
"filepath": "files",
|
||||
"filename": "filename",
|
||||
"download": "links.download",
|
||||
"type": "metadata.upload_type",
|
||||
},
|
||||
"record": "1234",
|
||||
}
|
||||
spec = {"host": test_zen.hosts[0], "record": "1234"}
|
||||
|
||||
with TemporaryDirectory() as d:
|
||||
output = []
|
||||
|
@ -173,23 +138,10 @@ def test_fetch_software():
|
|||
else:
|
||||
return urlopen(req)
|
||||
|
||||
with patch.object(Zenodo, "_urlopen", new=mock_urlopen):
|
||||
with patch.object(Zenodo, "urlopen", new=mock_urlopen):
|
||||
with TemporaryDirectory() as d:
|
||||
zen = Zenodo()
|
||||
spec = spec = {
|
||||
"host": {
|
||||
"hostname": [
|
||||
"https://zenodo.org/record/",
|
||||
"http://zenodo.org/record/",
|
||||
],
|
||||
"api": "https://zenodo.org/api/records/",
|
||||
"filepath": "files",
|
||||
"filename": "filename",
|
||||
"download": "links.download",
|
||||
"type": "metadata.upload_type",
|
||||
},
|
||||
"record": "1234",
|
||||
}
|
||||
spec = spec = {"host": test_zen.hosts[0], "record": "1234"}
|
||||
output = []
|
||||
for l in zen.fetch(spec, d):
|
||||
output.append(l)
|
||||
|
@ -227,23 +179,10 @@ def test_fetch_data():
|
|||
else:
|
||||
return urlopen(req)
|
||||
|
||||
with patch.object(Zenodo, "_urlopen", new=mock_urlopen):
|
||||
with patch.object(Zenodo, "urlopen", new=mock_urlopen):
|
||||
with TemporaryDirectory() as d:
|
||||
zen = Zenodo()
|
||||
spec = {
|
||||
"host": {
|
||||
"hostname": [
|
||||
"https://zenodo.org/record/",
|
||||
"http://zenodo.org/record/",
|
||||
],
|
||||
"api": "https://zenodo.org/api/records/",
|
||||
"filepath": "files",
|
||||
"filename": "filename",
|
||||
"download": "links.download",
|
||||
"type": "metadata.upload_type",
|
||||
},
|
||||
"record": "1234",
|
||||
}
|
||||
spec = {"host": test_zen.hosts[0], "record": "1234"}
|
||||
output = []
|
||||
for l in zen.fetch(spec, d):
|
||||
output.append(l)
|
||||
|
|
Loading…
Reference in New Issue