Further WIP - currently working on verify_signed

pull/224/head
Patrick Robertson 2025-02-25 12:08:08 +00:00
rodzic 6987a4827e
commit 898faf6fe4
3 zmienionych plików z 38 dodań i 50 usunięć

Wyświetl plik

@ -36,6 +36,11 @@
"http://tss.accv.es:8318/tsa",
],
"help": "List of RFC3161 Time Stamp Authorities to use, separate with commas if passed via the command line.",
},
"cert_authorities": {
"default": None,
"help": "Path to a file containing trusted Certificate Authorities (CAs) in PEM format. If empty, the default system authorities are used.",
"type": "str",
}
},
"description": """

Wyświetl plik

@ -12,6 +12,8 @@ from rfc3161_client import (
)
from rfc3161_client import VerificationError as Rfc3161VerificationError
from rfc3161_client.base import HashAlgorithm
from rfc3161_client.tsp import SignedData
from cryptography import x509
import certifi
from auto_archiver.core import Enricher
from auto_archiver.core import Metadata, Media
@ -69,7 +71,7 @@ class TimestampingEnricher(Enricher):
# fail if there's any issue with the certificates, uses certifi list of trusted CAs
self.verify_signed(signed, message)
# download and verify timestamping certificate
cert_chain = self.download_and_verify_certificate(signed)
cert_chain = self.download_certificate(signed)
# continue with saving the timestamp token
tst_fn = os.path.join(self.tmp_dir, f"timestamp_token_{slugify(tsa_url)}")
with open(tst_fn, "wb") as f:
@ -93,46 +95,31 @@ class TimestampingEnricher(Enricher):
"""
Verify a Signed Timestamp using the TSA provided by the Trusted Root.
"""
cert_authorities = self._trusted_root.get_timestamp_authorities()
valid = False
for certificate_authority in cert_authorities:
certificates = certificate_authority.certificates(allow_expired=True)
trusted_root_path = self.cert_authorities or certifi.where()
cert_authorities = []
with open(trusted_root_path, 'rb') as f:
cert_authorities = x509.load_pem_x509_certificates(f.read())
if not cert_authorities:
raise ValueError(f"No trusted roots found in {trusted_root_path}.")
valid = False
for certificate in cert_authorities:
builder = VerifierBuilder()
for certificate in certificates:
builder.add_root_certificate(certificate)
builder.add_root_certificate(certificate)
verifier = builder.build()
try:
verifier.verify(timestamp_response, signature)
return certificate
except Rfc3161VerificationError as e:
logger.debug("Unable to verify Timestamp with CA.")
logger.exception(e)
logger.debug(f"Unable to verify Timestamp with CA {certificate.subject}: {e}")
continue
if (
certificate_authority.validity_period_start
and certificate_authority.validity_period_end
):
if (
certificate_authority.validity_period_start
<= timestamp_response.tst_info.gen_time
< certificate_authority.validity_period_end
):
return TimestampVerificationResult(
source=TimestampSource.TIMESTAMP_AUTHORITY,
time=timestamp_response.tst_info.gen_time,
)
logger.debug(
"Unable to verify Timestamp because not in CA time range."
)
else:
logger.debug(
"Unable to verify Timestamp because no validity provided."
)
return None
return False
def sign_data(self, tsa_url: str, bytes_data: bytes) -> TimeStampResponse:
# see https://github.com/sigstore/sigstore-python/blob/99948d5b80525a5a104e904ffea58169dc6e0629/sigstore/_internal/timestamp.py#L84-L121
@ -155,26 +142,16 @@ class TimestampingEnricher(Enricher):
raise
return timestamp_response
def load_tst_certs(self, signed: bytes):
return ContentInfo.load(signed)["content"]["certificates"]
def load_tst_certs(self, tsp_response: TimeStampResponse):
signed_data: SignedData = tsp_response.signed_data
certs = signed_data.certificates
def download_and_verify_certificate(self, signed: bytes) -> list[Media]:
def download_certificate(self, tsp_response: TimeStampResponse) -> list[Media]:
# returns the leaf certificate URL, fails if not set
certificates = self.load_tst_certs(signed)
trust_roots = []
with open(certifi.where(), 'rb') as f:
for _, _, der_bytes in pem.unarmor(f.read(), multiple=True):
trust_roots.append(der_bytes)
context = ValidationContext(trust_roots=trust_roots)
certificates = self.load_tst_certs(tsp_response)
first_cert = certificates[0].dump()
intermediate_certs = []
for i in range(1, len(certificates)): # cannot use list comprehension [1:]
intermediate_certs.append(certificates[i].dump())
validator = CertificateValidator(first_cert, intermediate_certs=intermediate_certs, validation_context=context)
path = validator.validate_usage({'digital_signature'}, extended_key_usage={'time_stamping'})
cert_chain = []
for cert in path:

Wyświetl plik

@ -15,15 +15,21 @@ def digicert():
def test_sign_data(setup_module):
tsa_url = "http://timestamp.identrust.com"
tsp: TimestampingEnricher = setup_module("timestamping_enricher")
data = b"4b7b4e39f12b8c725e6e603e6d4422500316df94211070682ef10260ff5759ef"
result: TimeStampResponse = tsp.sign_data(tsa_url, data)
assert isinstance(result, TimeStampResponse)
try:
tsp.verify_signed(result, data)
valid_root = tsp.verify_signed(result, data)
assert valid_root.subject == "CN=Entrust Root Certification Authority - G2, OU=(c) 2009 Entrust, Inc. - for authorized use only, OU=See www.entrust.net/legal-terms, O=Entrust, Inc., C="
except Exception as e:
pytest.fail(f"Verification failed: {e}")
# test downloading the cert
cert_chain = tsp.download_and_verify_certificate(result)
def test_tsp_enricher_download_syndication(setup_module, digicert):
tsp: TimestampingEnricher = setup_module("timestamping_enricher")