Add unit tests for timestamping_enricher

pull/224/head
Patrick Robertson 2025-01-29 12:20:52 +01:00
rodzic dcd5576f29
commit 4c1c8953ca
3 zmienionych plików z 58 dodań i 8 usunięć

Wyświetl plik

@ -6,11 +6,11 @@ from importlib.metadata import version
from asn1crypto.cms import ContentInfo from asn1crypto.cms import ContentInfo
from certvalidator import CertificateValidator, ValidationContext from certvalidator import CertificateValidator, ValidationContext
from asn1crypto import pem from asn1crypto import pem
from asn1crypto.core import Asn1Value
import certifi import certifi
from auto_archiver.core import Enricher from auto_archiver.core import Enricher
from auto_archiver.core import Metadata, ArchivingContext, Media from auto_archiver.core import Metadata, ArchivingContext, Media
from auto_archiver.core import Extractor
class TimestampingEnricher(Enricher): class TimestampingEnricher(Enricher):
@ -45,13 +45,10 @@ class TimestampingEnricher(Enricher):
from slugify import slugify from slugify import slugify
for tsa_url in self.tsa_urls: for tsa_url in self.tsa_urls:
try: try:
signing_settings = SigningSettings(tsp_server=tsa_url, digest_algorithm=DigestAlgorithm.SHA256)
signer = TSPSigner()
message = bytes(data_to_sign, encoding='utf8') message = bytes(data_to_sign, encoding='utf8')
# send TSQ and get TSR from the TSA server signed = self.sign_data(tsa_url, message)
signed = signer.sign(message=message, signing_settings=signing_settings)
# fail if there's any issue with the certificates, uses certifi list of trusted CAs # fail if there's any issue with the certificates, uses certifi list of trusted CAs
TSPVerifier(certifi.where()).verify(signed, message=message) self.verify_signed(signed, message)
# download and verify timestamping certificate # download and verify timestamping certificate
cert_chain = self.download_and_verify_certificate(signed) cert_chain = self.download_and_verify_certificate(signed)
# continue with saving the timestamp token # continue with saving the timestamp token
@ -72,9 +69,22 @@ class TimestampingEnricher(Enricher):
else: else:
logger.warning(f"No successful timestamps for {url=}") logger.warning(f"No successful timestamps for {url=}")
def verify_signed(self, signed: bytes, message: bytes) -> None:
verifier = TSPVerifier(certifi.where())
verifier.verify(signed, message=message)
def sign_data(self, tsa_url: str, bytes_data: bytes) -> bytes:
signing_settings = SigningSettings(tsp_server=tsa_url, digest_algorithm=DigestAlgorithm.SHA256)
signer = TSPSigner()
# send TSQ and get TSR from the TSA server
return signer.sign(message=bytes_data, signing_settings=signing_settings)
def load_tst_certs(self, signed: bytes) -> list[Asn1Value]:
return ContentInfo.load(signed)["content"]["certificates"]
def download_and_verify_certificate(self, signed: bytes) -> list[Media]: def download_and_verify_certificate(self, signed: bytes) -> list[Media]:
# returns the leaf certificate URL, fails if not set # returns the leaf certificate URL, fails if not set
tst = ContentInfo.load(signed) certificates = self.load_tst_certs(signed)
trust_roots = [] trust_roots = []
with open(certifi.where(), 'rb') as f: with open(certifi.where(), 'rb') as f:
@ -82,7 +92,6 @@ class TimestampingEnricher(Enricher):
trust_roots.append(der_bytes) trust_roots.append(der_bytes)
context = ValidationContext(trust_roots=trust_roots) context = ValidationContext(trust_roots=trust_roots)
certificates = tst["content"]["certificates"]
first_cert = certificates[0].dump() first_cert = certificates[0].dump()
intermediate_certs = [] intermediate_certs = []
for i in range(1, len(certificates)): # cannot use list comprehension [1:] for i in range(1, len(certificates)): # cannot use list comprehension [1:]

Plik binarny nie jest wyświetlany.

Wyświetl plik

@ -0,0 +1,41 @@
import pytest
from auto_archiver.modules.timestamping_enricher.timestamping_enricher import TimestampingEnricher
@pytest.fixture
def digicert():
with open("tests/data/timestamp_token_digicert_com.crt", "rb") as f:
return f.read()
@pytest.mark.download
def test_sign_data(setup_module):
tsa_url = "http://timestamp.digicert.com"
tsp: TimestampingEnricher = setup_module("timestamping_enricher")
data = b"4b7b4e39f12b8c725e6e603e6d4422500316df94211070682ef10260ff5759ef"
result: bytes = tsp.sign_data(tsa_url, data)
assert isinstance(result, bytes)
try:
tsp.verify_signed(result, data)
except Exception as e:
pytest.fail(f"Verification failed: {e}")
def test_tsp_enricher_download_syndication(setup_module, digicert):
tsp: TimestampingEnricher = setup_module("timestamping_enricher")
try:
cert_chain = tsp.download_and_verify_certificate(digicert)
assert len(cert_chain) == 3
assert cert_chain[0].filename == "/var/folders/h7/g67pz_kx67q7qxzzrrhvry5r0000gn/T/74515005589773707779.crt"
assert cert_chain[1].filename == "/var/folders/h7/g67pz_kx67q7qxzzrrhvry5r0000gn/T/95861100433808324400.crt"
assert cert_chain[2].filename == "/var/folders/h7/g67pz_kx67q7qxzzrrhvry5r0000gn/T/15527051335772373346.crt"
except Exception as e:
pytest.fail(f"Verification failed: {e}")
def test_tst_cert_valid(setup_module, digicert):
tsp: TimestampingEnricher = setup_module("timestamping_enricher")
try:
tsp.verify_signed(digicert, b"4b7b4e39f12b8c725e6e603e6d4422500316df94211070682ef10260ff5759ef")
except Exception as e:
pytest.fail(f"Verification failed: {e}")