Add a urlopen helper to Zenodo content provider

Use a helper function to inject a default user-agent header into every
request we make.
pull/693/head
Tim Head 2019-05-29 08:17:22 +02:00
rodzic e99c80799d
commit 363c962efd
2 zmienionych plików z 40 dodań i 24 usunięć

Wyświetl plik

@ -4,16 +4,30 @@ import shutil
from os import makedirs
from os import path
from urllib.request import urlopen, Request
from urllib.request import build_opener, urlopen, Request
from zipfile import ZipFile, is_zipfile
from .base import ContentProvider
from ..utils import copytree
from .. import __version__
class Zenodo(ContentProvider):
"""Provide contents of a Zenodo deposit."""
def _urlopen(self, req, headers=None):
    """Open `req` via urllib with a repo2docker User-Agent header attached.

    `req` may be a `Request` object or anything `Request()` accepts as a
    URL (typically a string). `headers`, when given, is a mapping whose
    entries are added to the request after the User-Agent header.

    Returns whatever `urlopen()` returns for the prepared request.
    """
    # wrap bare URLs so that headers can be attached below
    request = req if isinstance(req, Request) else Request(req)

    request.add_header("User-Agent", "repo2docker {}".format(__version__))

    # extra headers are added after the default User-Agent
    extra_headers = {} if headers is None else headers
    for name, content in extra_headers.items():
        request.add_header(name, content)

    return urlopen(request)
def detect(self, doi, ref=None, extra_args=None):
"""Trigger this provider for things that resolve to a Zenodo record"""
# To support Zenodo instances not hosted at zenodo.org we need to
@ -23,14 +37,14 @@ class Zenodo(ContentProvider):
doi = doi.lower()
# 10.5281 is the Zenodo DOI prefix
if doi.startswith("10.5281/"):
resp = urlopen("https://doi.org/{}".format(doi))
resp = self._urlopen("https://doi.org/{}".format(doi))
self.record_id = resp.url.rsplit("/", maxsplit=1)[1]
return {"record": self.record_id}
elif doi.startswith("https://doi.org/10.5281/") or doi.startswith(
"http://doi.org/10.5281/"
):
resp = urlopen(doi)
resp = self._urlopen(doi)
self.record_id = resp.url.rsplit("/", maxsplit=1)[1]
return {"record": self.record_id}
@ -49,14 +63,14 @@ class Zenodo(ContentProvider):
"https://zenodo.org/api/records/{}".format(record_id),
headers={"accept": "application/json"},
)
resp = urlopen(req)
resp = self._urlopen(req)
record = json.loads(resp.read().decode("utf-8"))
def _fetch(file_ref, unzip=False):
# the assumption is that `unzip=True` means that this is the only
# file related to the zenodo record
with urlopen(file_ref["links"]["download"]) as src:
with self._urlopen(file_ref["links"]["download"]) as src:
fname = file_ref["filename"]
if path.dirname(fname):
sub_dir = path.join(output_dir, path.dirname(fname))

Wyświetl plik

@ -12,14 +12,16 @@ from repo2docker.contentproviders import Zenodo
def test_content_id():
zen = Zenodo()
with patch.object(Zenodo, "_urlopen") as fake_urlopen:
fake_urlopen.return_value.url = "https://zenodo.org/record/3232985"
zen = Zenodo()
zen.detect("10.5281/zenodo.3232985")
assert zen.content_id == "3232985"
zen.detect("10.5281/zenodo.3232985")
assert zen.content_id == "3232985"
def test_detect():
with patch("repo2docker.contentproviders.zenodo.urlopen") as fake_urlopen:
with patch.object(Zenodo, "_urlopen") as fake_urlopen:
fake_urlopen.return_value.url = "https://zenodo.org/record/3232985"
# valid Zenodo DOIs trigger this content provider
assert Zenodo().detect("10.5281/zenodo.3232985") == {"record": "3232985"}
@ -29,7 +31,7 @@ def test_detect():
# only two of the three calls above have to resolve a DOI
assert fake_urlopen.call_count == 2
with patch("repo2docker.contentproviders.zenodo.urlopen") as fake_urlopen:
with patch.object(Zenodo, "_urlopen") as fake_urlopen:
# Don't trigger the Zenodo content provider
assert Zenodo().detect("/some/path/here") is None
assert Zenodo().detect("https://example.com/path/here") is None
@ -69,16 +71,16 @@ def test_fetch_software_from_github_archive():
).encode("utf-8")
)
def mock_urlopen(req_or_path):
if isinstance(req_or_path, Request):
def mock_urlopen(self, req):
if isinstance(req, Request):
return mock_response
else:
return urlopen(req_or_path)
return urlopen(req)
with patch.object(Zenodo, '_urlopen', new=mock_urlopen):
zen = Zenodo()
with patch("repo2docker.contentproviders.zenodo.urlopen", new=mock_urlopen):
with TemporaryDirectory() as d:
zen = Zenodo()
output = []
for l in zen.fetch({"record": "1234"}, d):
output.append(l)
@ -108,13 +110,13 @@ def test_fetch_software():
).encode("utf-8")
)
def mock_urlopen(req_or_path):
if isinstance(req_or_path, Request):
def mock_urlopen(self, req):
if isinstance(req, Request):
return mock_response
else:
return urlopen(req_or_path)
return urlopen(req)
with patch("repo2docker.contentproviders.zenodo.urlopen", new=mock_urlopen):
with patch.object(Zenodo, '_urlopen', new=mock_urlopen):
with TemporaryDirectory() as d:
zen = Zenodo()
@ -149,13 +151,13 @@ def test_fetch_data():
).encode("utf-8")
)
def mock_urlopen(req_or_path):
if isinstance(req_or_path, Request):
def mock_urlopen(self, req):
if isinstance(req, Request):
return mock_response
else:
return urlopen(req_or_path)
return urlopen(req)
with patch("repo2docker.contentproviders.zenodo.urlopen", new=mock_urlopen):
with patch.object(Zenodo, '_urlopen', new=mock_urlopen):
with TemporaryDirectory() as d:
zen = Zenodo()