repo2docker/tests/unit/contentproviders/test_swhid.py

158 wiersze
4.1 KiB
Python

import io
import json
import logging
import os
import re
import shutil
import tarfile
import tempfile
import urllib
from os import makedirs
from os.path import join
from unittest.mock import MagicMock, mock_open, patch
from zipfile import ZipFile
import pytest
import requests_mock
from repo2docker.contentproviders.base import ContentProviderException
from repo2docker.contentproviders.swhid import Swhid, parse_swhid
# this is a slightly stripped down copy of swh.model.cli.swhid_of_dir().
# We do not use this later to prevent having to depend on swh.model[cli]
def swhid_of_dir(path):
    """Compute an identifier for the directory found at *path*.

    The directory is loaded with ``Directory.from_disk`` and its data is
    passed, together with ``DIRECTORY``, to ``swhid``.

    NOTE(review): ``Directory``, ``DIRECTORY`` and ``swhid`` are not imported
    in the visible part of this module, so calling this helper as-is would
    presumably raise ``NameError`` -- confirm whether it is reference-only.
    """
    dir_data = Directory.from_disk(path=path).get_data()
    return swhid(DIRECTORY, dir_data)
def test_content_id():
    """A freshly constructed provider reports no content id."""
    provider = Swhid()
    assert provider.content_id is None
# Identifiers that detection is expected to accept: each prefix is followed
# by forty "0" characters.
swhids_ok = [prefix + "0" * 40 for prefix in ("swh:1:dir:", "swh:1:rev:")]

# Identifiers that detection is expected to reject, listed as
# (prefix, number of "0" characters appended).
swhids_invalid = [
    prefix + "0" * length
    for prefix, length in (
        ("swh:1:dir:", 39),  # one character fewer than the accepted form
        ("swh:2:dir:", 40),  # version field is 2 instead of 1
        ("swh:1:rev:", 41),  # one character more than the accepted form
        ("swh:1:cnt:", 40),  # remaining rows: object types other than dir/rev
        ("swh:1:ori:", 40),
        ("swh:1:rel:", 40),
        ("swh:1:snp:", 40),
    )
]
# (input, expected result) pairs for the detection test: accepted identifiers
# map to a spec dict holding the raw string and its parsed form, rejected
# identifiers map to ``None``.
detect_values = [
    (valid, {"swhid": valid, "swhid_obj": parse_swhid(valid)}) for valid in swhids_ok
]
detect_values += [(invalid, None) for invalid in swhids_invalid]
@pytest.mark.parametrize("swhid, expected", detect_values)
def test_detect(swhid, expected):
    """``Swhid.detect`` returns the expected value for each parametrized input."""
    detected = Swhid().detect(swhid)
    assert detected == expected
def fake_urlopen(req):
    """Print *req* and hand back its ``headers`` attribute unchanged."""
    print(req)
    headers = req.headers
    return headers
def test_unresolving_swhid():
    """Construct a provider; the actual assertion is currently disabled."""
    provider = Swhid()
    # NOTE(review): the check below is commented out and nothing is asserted;
    # the reason is not visible in this file.
    #   swhid = "0" * 40
    #   assert provider.swhid2url(swhid) is swhid
# Object id made of forty "0" characters, used as the revision id in the
# mocked API URLs and in the revision SWHID under test.
NULLID = 40 * "0"
@pytest.fixture
def gen_tarfile(tmpdir):
    """Build an in-memory tar archive of a directory holding a single file.

    A ``tmp`` directory is created under *tmpdir* with one file,
    ``file1.txt``, containing ``b"Some content\\n"``. The directory is added
    to an uncompressed tar archive under the name ``dirhash`` and is removed
    from disk again before returning.

    Returns:
        A ``(dirhash, data)`` tuple: the hard-coded directory hash used as
        the archive member name, and the archive content as ``bytes``.
    """
    rootdir = join(tmpdir, "tmp")
    makedirs(rootdir)
    with open(join(rootdir, "file1.txt"), "wb") as fobj:
        fobj.write(b"Some content\n")

    # This directory hash can be computed using the swh.model package, but we
    # do not want to depend on it, both to limit dependencies and because it
    # does not support python 3.6.
    dirhash = "89a3bd29a2c5ae0b1465febbe5df09730a8576fe"

    buf = io.BytesIO()
    # The context manager closes the archive even if ``add`` raises, so the
    # archive is always finalized before ``buf`` is read.
    with tarfile.open(name=dirhash, fileobj=buf, mode="w") as tarf:
        tarf.add(rootdir, arcname=dirhash)

    shutil.rmtree(rootdir)
    return dirhash, buf.getvalue()
def mocked_provider(tmpdir, dirhash, tarfile_buf):
    """Return a ``Swhid`` provider whose ``mock://`` requests get canned replies.

    A ``requests_mock`` adapter is mounted on the provider's session for the
    ``mock://`` scheme and the provider's ``base_url`` is pointed at it.
    Registered endpoints:

    * ``GET  revision/{NULLID}/`` -- JSON with an author and ``dirhash`` as
      the revision's directory.
    * ``POST vault/directory/{dirhash}/`` -- JSON with status ``"new"``.
    * ``GET  vault/directory/{dirhash}/`` -- a list of two JSON responses,
      status ``"pending"`` then ``"done"``.
    * ``GET  vault/directory/{dirhash}/raw/`` -- *tarfile_buf* as the body.

    Args:
        tmpdir: Not used by this helper.
        dirhash: Directory id used in the vault URLs and the revision reply.
        tarfile_buf: Bytes served as the content of the ``raw/`` endpoint.

    Returns:
        The configured ``Swhid`` instance.
    """
    vault_url = f"mock://api/1/vault/directory/{dirhash}/"
    raw_url = f"mock://api/1/vault/directory/{dirhash}/raw/"

    provider = Swhid()
    adapter = requests_mock.Adapter()
    provider.base_url = "mock://api/1"
    # presumably kept small so the pending -> done polling is quick; confirm
    provider.retry_delay = 0.1
    provider.session.mount("mock://", adapter)

    revision_reply = {
        "author": {"fullname": "John Doe <jdoe@example.com>"},
        "directory": dirhash,
    }
    adapter.register_uri(
        "GET", f"mock://api/1/revision/{NULLID}/", json=revision_reply
    )

    cooking_requested = {"fetch_url": raw_url, "status": "new"}
    adapter.register_uri("POST", vault_url, json=cooking_requested)

    # One response entry per status, in the order they are listed.
    status_replies = [
        {"json": {"fetch_url": raw_url, "status": status}}
        for status in ("pending", "done")
    ]
    adapter.register_uri("GET", vault_url, status_replies)

    adapter.register_uri("GET", raw_url, content=tarfile_buf)
    return provider
def test_fetch_revision(tmpdir, gen_tarfile):
    """Fetching a revision SWHID leaves the directory SWHID as content id."""
    dirhash, archive = gen_tarfile
    provider = mocked_provider(tmpdir, dirhash, archive)

    spec = provider.detect("swh:1:rev:" + NULLID)
    # Iterate ``fetch`` to completion, printing each item it yields.
    for message in provider.fetch(spec, tmpdir):
        print(message)

    expected_id = "swh:1:dir:" + dirhash
    assert provider.content_id == expected_id
def test_fetch_directory(tmpdir, gen_tarfile):
    """Fetching a directory SWHID leaves that same SWHID as content id."""
    dirhash, archive = gen_tarfile
    provider = mocked_provider(tmpdir, dirhash, archive)

    directory_swhid = "swh:1:dir:" + dirhash
    spec = provider.detect(directory_swhid)
    # Iterate ``fetch`` to completion, printing each item it yields.
    for message in provider.fetch(spec, tmpdir):
        print(message)

    assert provider.content_id == directory_swhid