Test login to registry as well

pull/1421/head
YuviPanda 2025-02-28 13:44:18 -08:00
rodzic c294236063
commit aa948e08af
2 zmienionych plików z 77 dodań i 6 usunięć

Wyświetl plik

@ -5,3 +5,4 @@ pytest-cov
pytest>=7 pytest>=7
pyyaml pyyaml
requests_mock requests_mock
bcrypt

Wyświetl plik

@ -1,9 +1,13 @@
import json
import os import os
import secrets import secrets
import shutil import shutil
import socket import socket
import subprocess import subprocess
from tempfile import TemporaryDirectory
from base64 import b64encode
import time import time
import bcrypt
from pathlib import Path from pathlib import Path
import pytest import pytest
@ -16,8 +20,9 @@ HERE = Path(__file__).parent
@pytest.fixture(scope="session") @pytest.fixture(scope="session")
def dind(registry, host_ip): def dind(registry):
port = get_free_port() port = get_free_port()
registry_host, _, _ = registry
# docker daemon will generate certs here, that we can then use to connect to it. # docker daemon will generate certs here, that we can then use to connect to it.
# put it in current dir than in /tmp because on macos, current dir is likely to # put it in current dir than in /tmp because on macos, current dir is likely to
@ -42,7 +47,7 @@ def dind(registry, host_ip):
"--host", "--host",
"0.0.0.0:2376", "0.0.0.0:2376",
"--insecure-registry", "--insecure-registry",
registry, registry_host,
] ]
proc = subprocess.Popen(cmd) proc = subprocess.Popen(cmd)
time.sleep(5) time.sleep(5)
@ -80,11 +85,27 @@ def host_ip():
@pytest.fixture(scope="session") @pytest.fixture(scope="session")
def registry(host_ip): def registry(host_ip):
port = get_free_port() port = get_free_port()
username = "user"
password = secrets.token_hex(16)
bcrypted_pw = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt(rounds=12)).decode("utf-8")
# We put our password here, and mount it into the container.
# put it in current dir than in /tmp because on macos, current dir is likely to
# shared with docker VM so it can be mounted, unlike /tmp
htpasswd_dir = HERE / f"tmp-certs-{secrets.token_hex(8)}"
htpasswd_dir.mkdir()
(htpasswd_dir / "htpasswd.conf").write_text(f"{username}:{bcrypted_pw}")
# Explicitly pull the image first so it runs on time # Explicitly pull the image first so it runs on time
registry_image = "registry:3.0.0-rc.3" registry_image = "registry:3.0.0-rc.3"
subprocess.check_call(["docker", "pull", registry_image]) subprocess.check_call(["docker", "pull", registry_image])
cmd = ["docker", "run", "--rm", "-p", f"{port}:5000", registry_image] cmd = ["docker", "run", "--rm",
"-e", "REGISTRY_AUTH=htpasswd",
"-e", "REGISTRY_AUTH_HTPASSWD_REALM=basic",
"-e", "REGISTRY_AUTH_HTPASSWD_PATH=/opt/htpasswd/htpasswd.conf",
"--mount", f"type=bind,src={htpasswd_dir},dst=/opt/htpasswd",
"-p", f"{port}:5000", registry_image]
proc = subprocess.Popen(cmd) proc = subprocess.Popen(cmd)
health_url = f"http://{host_ip}:{port}/v2" health_url = f"http://{host_ip}:{port}/v2"
# Wait for the registry to actually come up # Wait for the registry to actually come up
@ -101,14 +122,18 @@ def registry(host_ip):
raise TimeoutError("Test registry did not come up in time") raise TimeoutError("Test registry did not come up in time")
try: try:
yield f"{host_ip}:{port}" yield f"{host_ip}:{port}", username, password
finally: finally:
proc.terminate() proc.terminate()
proc.wait() proc.wait()
def test_registry(registry, dind): def test_registry_explicit_creds(registry, dind):
image_name = f"{registry}/{secrets.token_hex(8)}:latest" """
Test that we can push to registry when given explicit credentials
"""
registry_host, username, password = registry
image_name = f"{registry_host}/{secrets.token_hex(8)}:latest"
r2d = make_r2d(["--image", image_name, "--push", "--no-run", str(HERE)]) r2d = make_r2d(["--image", image_name, "--push", "--no-run", str(HERE)])
docker_host, cert_dir = dind docker_host, cert_dir = dind
@ -119,8 +144,53 @@ def test_registry(registry, dind):
os.environ["DOCKER_HOST"] = docker_host os.environ["DOCKER_HOST"] = docker_host
os.environ["DOCKER_CERT_PATH"] = str(cert_dir / "client") os.environ["DOCKER_CERT_PATH"] = str(cert_dir / "client")
os.environ["DOCKER_TLS_VERIFY"] = "1" os.environ["DOCKER_TLS_VERIFY"] = "1"
os.environ["CONTAINER_ENGINE_REGISTRY_CREDENTIALS"] = json.dumps({
"registry": f"http://{registry_host}",
"username": username,
"password": password
})
r2d.start() r2d.start()
proc = subprocess.run(["docker", "manifest", "inspect", "--insecure", image_name])
assert proc.returncode == 0
# Validate that we didn't leak our registry creds into existing docker config
docker_config_path = Path(os.environ.get("DOCKER_CONFIG", "~/.docker/config.json")).expanduser()
if docker_config_path.exists():
# Just check that our randomly generated password is not in this file
# Can this cause a conflict? Sure, if there's a different randomly generated password in here
# that matches our own randomly generated password. But if you're that unlucky, take cover from the asteroid.
assert password not in docker_config_path.read_text()
finally:
os.environ.clear()
os.environ.update(old_environ)
def test_registry_no_explicit_creds(registry, dind):
"""
Test that we can push to registry *without* explicit credentials but reading from a DOCKER_CONFIG
"""
registry_host, username, password = registry
image_name = f"{registry_host}/{secrets.token_hex(8)}:latest"
r2d = make_r2d(["--image", image_name, "--push", "--no-run", str(HERE)])
docker_host, cert_dir = dind
old_environ = os.environ.copy()
try:
os.environ["DOCKER_HOST"] = docker_host
os.environ["DOCKER_CERT_PATH"] = str(cert_dir / "client")
os.environ["DOCKER_TLS_VERIFY"] = "1"
with TemporaryDirectory() as d:
(Path(d) / "config.json").write_text(json.dumps(
({"auths":{f"http://{registry_host}":{"auth":b64encode(f"{username}:{password}".encode()).decode()}}})
))
os.environ["DOCKER_CONFIG"] = d
r2d.start()
proc = subprocess.run(["docker", "manifest", "inspect", "--insecure", image_name]) proc = subprocess.run(["docker", "manifest", "inspect", "--insecure", image_name])
assert proc.returncode == 0 assert proc.returncode == 0
finally: finally: