kopia lustrzana https://github.com/bellingcat/auto-archiver
Ruff fixes
rodzic
b8da7607e8
commit
17ae75fb95
|
@ -18,12 +18,12 @@
|
|||
],
|
||||
"help": "List of OpenTimestamps calendar servers to use for timestamping. See here for a list of calendars maintained by opentimestamps:\
|
||||
https://opentimestamps.org/#calendars",
|
||||
"type": "list"
|
||||
"type": "list",
|
||||
},
|
||||
"calendar_whitelist": {
|
||||
"default": [],
|
||||
"help": "Optional whitelist of calendar servers. Override this if you are using your own calendar servers. e.g. ['https://mycalendar.com']",
|
||||
"type": "list"
|
||||
"type": "list",
|
||||
},
|
||||
},
|
||||
"description": """
|
||||
|
@ -96,5 +96,5 @@ Calendar https://alice.btc.calendar.opentimestamps.org: Timestamped by transacti
|
|||
if you want to use your own calendars, then you can override this setting in the `calendar_whitelist` configuration option.
|
||||
|
||||
|
||||
"""
|
||||
}
|
||||
""",
|
||||
}
|
||||
|
|
|
@ -11,8 +11,8 @@ from auto_archiver.core import Enricher
|
|||
from auto_archiver.core import Metadata, Media
|
||||
from auto_archiver.utils.misc import get_current_timestamp
|
||||
|
||||
class OpentimestampsEnricher(Enricher):
|
||||
|
||||
class OpentimestampsEnricher(Enricher):
|
||||
def enrich(self, to_enrich: Metadata) -> None:
|
||||
url = to_enrich.get_url()
|
||||
logger.debug(f"OpenTimestamps timestamping files for {url=}")
|
||||
|
@ -31,42 +31,42 @@ class OpentimestampsEnricher(Enricher):
|
|||
if not os.path.exists(file_path):
|
||||
logger.warning(f"File not found: {file_path}")
|
||||
continue
|
||||
|
||||
|
||||
# Create timestamp for the file - hash is SHA256
|
||||
# Note: hash is hard-coded to SHA256 and does not use hash_enricher to set it.
|
||||
# SHA256 is the recommended hash, ref: https://github.com/bellingcat/auto-archiver/pull/247#discussion_r1992433181
|
||||
logger.debug(f"Creating timestamp for {file_path}")
|
||||
file_hash = None
|
||||
with open(file_path, 'rb') as f:
|
||||
with open(file_path, "rb") as f:
|
||||
file_hash = OpSHA256().hash_fd(f)
|
||||
|
||||
if not file_hash:
|
||||
logger.warning(f"Failed to hash file for timestamping, skipping: {file_path}")
|
||||
continue
|
||||
|
||||
|
||||
# Create a timestamp with the file hash
|
||||
timestamp = Timestamp(file_hash)
|
||||
|
||||
|
||||
# Create a detached timestamp file with the hash operation and timestamp
|
||||
detached_timestamp = DetachedTimestampFile(OpSHA256(), timestamp)
|
||||
|
||||
|
||||
# Submit to calendar servers
|
||||
submitted_to_calendar = False
|
||||
|
||||
logger.debug(f"Submitting timestamp to calendar servers for {file_path}")
|
||||
calendars = []
|
||||
whitelist = DEFAULT_CALENDAR_WHITELIST
|
||||
|
||||
|
||||
if self.calendar_whitelist:
|
||||
whitelist = set(self.calendar_whitelist)
|
||||
|
||||
|
||||
# Create calendar instances
|
||||
calendar_urls = []
|
||||
for url in self.calendar_urls:
|
||||
if url in whitelist:
|
||||
calendars.append(RemoteCalendar(url))
|
||||
calendar_urls.append(url)
|
||||
|
||||
|
||||
# Submit the hash to each calendar
|
||||
for calendar in calendars:
|
||||
try:
|
||||
|
@ -76,17 +76,19 @@ class OpentimestampsEnricher(Enricher):
|
|||
submitted_to_calendar = True
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to submit to calendar {calendar.url}: {e}")
|
||||
|
||||
|
||||
# If all calendar submissions failed, add pending attestations
|
||||
if not submitted_to_calendar and not timestamp.attestations:
|
||||
logger.error(f"Failed to submit to any calendar for {file_path}. **This file will not be timestamped.**")
|
||||
logger.error(
|
||||
f"Failed to submit to any calendar for {file_path}. **This file will not be timestamped.**"
|
||||
)
|
||||
media.set("opentimestamps", False)
|
||||
continue
|
||||
|
||||
|
||||
# Save the timestamp proof to a file
|
||||
timestamp_path = os.path.join(self.tmp_dir, f"{os.path.basename(file_path)}.ots")
|
||||
try:
|
||||
with open(timestamp_path, 'wb') as f:
|
||||
with open(timestamp_path, "wb") as f:
|
||||
# Create a serialization context and write to the file
|
||||
ctx = serialize.BytesSerializationContext()
|
||||
detached_timestamp.serialize(ctx)
|
||||
|
@ -94,25 +96,25 @@ class OpentimestampsEnricher(Enricher):
|
|||
except Exception as e:
|
||||
logger.warning(f"Failed to serialize timestamp file: {e}")
|
||||
continue
|
||||
|
||||
|
||||
# Create media for the timestamp file
|
||||
timestamp_media = Media(filename=timestamp_path)
|
||||
# explicitly set the mimetype, normally .ots files are 'application/vnd.oasis.opendocument.spreadsheet-template'
|
||||
timestamp_media.mimetype = "application/vnd.opentimestamps"
|
||||
timestamp_media.set("opentimestamps_version", opentimestamps.__version__)
|
||||
|
||||
|
||||
verification_info = self.verify_timestamp(detached_timestamp)
|
||||
for key, value in verification_info.items():
|
||||
timestamp_media.set(key, value)
|
||||
|
||||
|
||||
media.set("opentimestamp_files", [timestamp_media])
|
||||
timestamp_files.append(timestamp_media.filename)
|
||||
# Update the original media to indicate it's been timestamped
|
||||
media.set("opentimestamps", True)
|
||||
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Error while timestamping {media.filename}: {e}")
|
||||
|
||||
|
||||
# Add timestamp files to the metadata
|
||||
if timestamp_files:
|
||||
to_enrich.set("opentimestamped", True)
|
||||
|
@ -121,43 +123,43 @@ class OpentimestampsEnricher(Enricher):
|
|||
else:
|
||||
to_enrich.set("opentimestamped", False)
|
||||
logger.warning(f"No successful timestamps created for {url=}")
|
||||
|
||||
|
||||
def verify_timestamp(self, detached_timestamp):
|
||||
"""
|
||||
Verify a timestamp and extract verification information.
|
||||
|
||||
|
||||
Args:
|
||||
detached_timestamp: The detached timestamp to verify.
|
||||
|
||||
|
||||
Returns:
|
||||
dict: Information about the verification result.
|
||||
"""
|
||||
result = {}
|
||||
|
||||
|
||||
# Check if we have attestations
|
||||
attestations = list(detached_timestamp.timestamp.all_attestations())
|
||||
result["attestation_count"] = len(attestations)
|
||||
|
||||
|
||||
if attestations:
|
||||
attestation_info = []
|
||||
for msg, attestation in attestations:
|
||||
info = {}
|
||||
|
||||
|
||||
# Process different types of attestations
|
||||
if isinstance(attestation, PendingAttestation):
|
||||
info["status"] = "pending"
|
||||
info["uri"] = attestation.uri
|
||||
|
||||
|
||||
elif isinstance(attestation, BitcoinBlockHeaderAttestation):
|
||||
info["status"] = "confirmed"
|
||||
info["block_height"] = attestation.height
|
||||
|
||||
info["last_check"] = get_current_timestamp()
|
||||
|
||||
|
||||
attestation_info.append(info)
|
||||
|
||||
|
||||
result["attestations"] = attestation_info
|
||||
|
||||
|
||||
# For at least one confirmed attestation
|
||||
if any("confirmed" in a.get("status") for a in attestation_info):
|
||||
result["verified"] = True
|
||||
|
@ -166,5 +168,5 @@ class OpentimestampsEnricher(Enricher):
|
|||
else:
|
||||
result["verified"] = False
|
||||
result["last_updated"] = get_current_timestamp()
|
||||
|
||||
return result
|
||||
|
||||
return result
|
||||
|
|
|
@ -1,7 +1,4 @@
|
|||
from pathlib import Path
|
||||
import pytest
|
||||
import os
|
||||
import tempfile
|
||||
import hashlib
|
||||
|
||||
from opentimestamps.core.timestamp import Timestamp, DetachedTimestampFile
|
||||
|
@ -26,136 +23,146 @@ def sample_file_path(tmp_path):
|
|||
tmp_file.write_text("This is a test file content for OpenTimestamps")
|
||||
return str(tmp_file)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def detached_timestamp_file():
|
||||
"""Create a simple detached timestamp file for testing"""
|
||||
file_hash = hashlib.sha256(b"Test content").digest()
|
||||
from opentimestamps.core.op import OpSHA256
|
||||
|
||||
file_hash_op = OpSHA256()
|
||||
timestamp = Timestamp(file_hash)
|
||||
|
||||
|
||||
# Add a pending attestation
|
||||
pending = PendingAttestation("https://example.calendar.com")
|
||||
timestamp.attestations.add(pending)
|
||||
|
||||
|
||||
# Add a bitcoin attestation
|
||||
bitcoin = BitcoinBlockHeaderAttestation(783000) # Some block height
|
||||
timestamp.attestations.add(bitcoin)
|
||||
|
||||
|
||||
return DetachedTimestampFile(file_hash_op, timestamp)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def verified_timestamp_file():
|
||||
"""Create a timestamp file with a Bitcoin attestation"""
|
||||
file_hash = hashlib.sha256(b"Verified content").digest()
|
||||
from opentimestamps.core.op import OpSHA256
|
||||
|
||||
file_hash_op = OpSHA256()
|
||||
timestamp = Timestamp(file_hash)
|
||||
|
||||
|
||||
# Add only a Bitcoin attestation
|
||||
bitcoin = BitcoinBlockHeaderAttestation(783000) # Some block height
|
||||
timestamp.attestations.add(bitcoin)
|
||||
|
||||
|
||||
return DetachedTimestampFile(file_hash_op, timestamp)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def pending_timestamp_file():
|
||||
"""Create a timestamp file with only pending attestations"""
|
||||
file_hash = hashlib.sha256(b"Pending content").digest()
|
||||
from opentimestamps.core.op import OpSHA256
|
||||
|
||||
file_hash_op = OpSHA256()
|
||||
timestamp = Timestamp(file_hash)
|
||||
|
||||
|
||||
# Add only pending attestations
|
||||
pending1 = PendingAttestation("https://example1.calendar.com")
|
||||
pending2 = PendingAttestation("https://example2.calendar.com")
|
||||
timestamp.attestations.add(pending1)
|
||||
timestamp.attestations.add(pending2)
|
||||
|
||||
|
||||
return DetachedTimestampFile(file_hash_op, timestamp)
|
||||
|
||||
|
||||
@pytest.mark.download
|
||||
def test_download_tsr(setup_module, mocker):
|
||||
"""Test submitting a hash to calendar servers"""
|
||||
# Mock the RemoteCalendar submit method
|
||||
mock_submit = mocker.patch.object(RemoteCalendar, 'submit')
|
||||
mock_submit = mocker.patch.object(RemoteCalendar, "submit")
|
||||
test_timestamp = Timestamp(hashlib.sha256(b"test").digest())
|
||||
mock_submit.return_value = test_timestamp
|
||||
|
||||
|
||||
ots = setup_module("opentimestamps_enricher")
|
||||
|
||||
|
||||
# Create a calendar
|
||||
calendar = RemoteCalendar("https://alice.btc.calendar.opentimestamps.org")
|
||||
|
||||
|
||||
# Test submission
|
||||
file_hash = hashlib.sha256(b"Test file content").digest()
|
||||
result = calendar.submit(file_hash)
|
||||
|
||||
|
||||
assert mock_submit.called
|
||||
assert isinstance(result, Timestamp)
|
||||
assert result == test_timestamp
|
||||
|
||||
|
||||
def test_verify_timestamp(setup_module, detached_timestamp_file):
|
||||
"""Test the verification of timestamp attestations"""
|
||||
ots = setup_module("opentimestamps_enricher")
|
||||
|
||||
|
||||
# Test verification
|
||||
verification_info = ots.verify_timestamp(detached_timestamp_file)
|
||||
|
||||
|
||||
# Check verification results
|
||||
assert verification_info["attestation_count"] == 2
|
||||
assert verification_info["verified"] == True
|
||||
assert len(verification_info["attestations"]) == 2
|
||||
|
||||
|
||||
# Check attestation types
|
||||
assertion_types = [a["status"] for a in verification_info["attestations"]]
|
||||
assert "pending" in assertion_types
|
||||
assert "confirmed" in assertion_types
|
||||
|
||||
|
||||
# Check Bitcoin attestation details
|
||||
bitcoin_attestation = next(a for a in verification_info["attestations"] if a["status"] == "confirmed")
|
||||
assert bitcoin_attestation["block_height"] == 783000
|
||||
|
||||
|
||||
def test_verify_pending_only(setup_module, pending_timestamp_file):
|
||||
"""Test verification of timestamps with only pending attestations"""
|
||||
ots = setup_module("opentimestamps_enricher")
|
||||
|
||||
|
||||
verification_info = ots.verify_timestamp(pending_timestamp_file)
|
||||
|
||||
|
||||
assert verification_info["attestation_count"] == 2
|
||||
assert verification_info["verified"] == False
|
||||
|
||||
|
||||
# All attestations should be of type "pending"
|
||||
assert all(a["status"] == "pending" for a in verification_info["attestations"])
|
||||
|
||||
|
||||
# Check URIs of pending attestations
|
||||
uris = [a["uri"] for a in verification_info["attestations"]]
|
||||
assert "https://example1.calendar.com" in uris
|
||||
assert "https://example2.calendar.com" in uris
|
||||
|
||||
|
||||
def test_verify_bitcoin_completed(setup_module, verified_timestamp_file):
|
||||
"""Test verification of timestamps with completed Bitcoin attestations"""
|
||||
|
||||
ots = setup_module("opentimestamps_enricher")
|
||||
|
||||
|
||||
verification_info = ots.verify_timestamp(verified_timestamp_file)
|
||||
|
||||
|
||||
assert verification_info["attestation_count"] == 1
|
||||
assert verification_info["verified"] == True
|
||||
assert "pending" not in verification_info
|
||||
|
||||
|
||||
# Check that the attestation is a Bitcoin attestation
|
||||
attestation = verification_info["attestations"][0]
|
||||
assert attestation["status"] == "confirmed"
|
||||
assert attestation["block_height"] == 783000
|
||||
|
||||
|
||||
def test_full_enriching(setup_module, sample_file_path, sample_media, mocker):
|
||||
"""Test the complete enrichment process"""
|
||||
|
||||
# Mock the calendar submission to avoid network requests
|
||||
mock_calendar = mocker.patch.object(RemoteCalendar, 'submit')
|
||||
|
||||
mock_calendar = mocker.patch.object(RemoteCalendar, "submit")
|
||||
|
||||
# Create a function that returns a new timestamp for each call
|
||||
def side_effect(digest):
|
||||
test_timestamp = Timestamp(digest)
|
||||
|
@ -163,97 +170,109 @@ def test_full_enriching(setup_module, sample_file_path, sample_media, mocker):
|
|||
bitcoin = BitcoinBlockHeaderAttestation(783000)
|
||||
test_timestamp.attestations.add(bitcoin)
|
||||
return test_timestamp
|
||||
|
||||
|
||||
mock_calendar.side_effect = side_effect
|
||||
|
||||
ots = setup_module("opentimestamps_enricher")
|
||||
|
||||
|
||||
# Create test metadata with sample file
|
||||
metadata = Metadata().set_url("https://example.com")
|
||||
sample_media.filename = sample_file_path
|
||||
metadata.add_media(sample_media)
|
||||
|
||||
|
||||
# Run enrichment
|
||||
ots.enrich(metadata)
|
||||
|
||||
|
||||
# Verify results
|
||||
assert metadata.get("opentimestamped") == True
|
||||
assert metadata.get("opentimestamps_count") == 1
|
||||
|
||||
|
||||
# Check that we have one parent media item: the original
|
||||
assert len(metadata.media) == 1
|
||||
|
||||
|
||||
# Check that the original media was updated
|
||||
assert metadata.media[0].get("opentimestamps") == True
|
||||
|
||||
|
||||
# Check the timestamp file media is a child of the original
|
||||
assert len(metadata.media[0].get("opentimestamp_files")) == 1
|
||||
|
||||
timestamp_media = metadata.media[0].get("opentimestamp_files")[0]
|
||||
|
||||
assert timestamp_media.get("opentimestamps_version") is not None
|
||||
|
||||
|
||||
# Check verification results on the timestamp media
|
||||
assert timestamp_media.get("verified") == True
|
||||
assert timestamp_media.get("attestation_count") == 1
|
||||
|
||||
def test_full_enriching_one_calendar_error(setup_module, sample_file_path, sample_media, mocker, pending_timestamp_file):
|
||||
|
||||
def test_full_enriching_one_calendar_error(
|
||||
setup_module, sample_file_path, sample_media, mocker, pending_timestamp_file
|
||||
):
|
||||
"""Test enrichment when one calendar server returns an error"""
|
||||
# Mock the calendar submission to raise an exception
|
||||
mock_calendar = mocker.patch.object(RemoteCalendar, 'submit')
|
||||
|
||||
mock_calendar = mocker.patch.object(RemoteCalendar, "submit")
|
||||
|
||||
test_timestamp = Timestamp(bytes.fromhex("583988e03646c26fa290c5c2408540a2f4e2aa9be087aa4546aefb531385b935"))
|
||||
# Add a bitcoin attestation to the test timestamp
|
||||
# Add a bitcoin attestation to the test timestamp
|
||||
bitcoin = BitcoinBlockHeaderAttestation(783000)
|
||||
test_timestamp.attestations.add(bitcoin)
|
||||
|
||||
mock_calendar.side_effect = [test_timestamp, Exception("Calendar server error")]
|
||||
|
||||
ots = setup_module("opentimestamps_enricher", {"calendar_urls": ["https://alice.btc.calendar.opentimestamps.org", "https://bob.btc.calendar.opentimestamps.org"]})
|
||||
|
||||
ots = setup_module(
|
||||
"opentimestamps_enricher",
|
||||
{
|
||||
"calendar_urls": [
|
||||
"https://alice.btc.calendar.opentimestamps.org",
|
||||
"https://bob.btc.calendar.opentimestamps.org",
|
||||
]
|
||||
},
|
||||
)
|
||||
|
||||
# Create test metadata with sample file
|
||||
metadata = Metadata().set_url("https://example.com")
|
||||
sample_media.filename = sample_file_path
|
||||
metadata.add_media(sample_media)
|
||||
|
||||
|
||||
# Run enrichment (should complete despite calendar errors)
|
||||
ots.enrich(metadata)
|
||||
|
||||
|
||||
# Verify results
|
||||
assert metadata.get("opentimestamped") == True
|
||||
assert metadata.get("opentimestamps_count") == 1 # only alice worked, not bob
|
||||
assert metadata.get("opentimestamps_count") == 1 # only alice worked, not bob
|
||||
|
||||
|
||||
def test_full_enriching_calendar_error(setup_module, sample_file_path, sample_media, mocker):
|
||||
"""Test enrichment when calendar servers return errors"""
|
||||
# Mock the calendar submission to raise an exception
|
||||
mock_calendar = mocker.patch.object(RemoteCalendar, 'submit')
|
||||
mock_calendar = mocker.patch.object(RemoteCalendar, "submit")
|
||||
mock_calendar.side_effect = Exception("Calendar server error")
|
||||
|
||||
|
||||
ots = setup_module("opentimestamps_enricher")
|
||||
|
||||
|
||||
# Create test metadata with sample file
|
||||
metadata = Metadata().set_url("https://example.com")
|
||||
sample_media.filename = sample_file_path
|
||||
metadata.add_media(sample_media)
|
||||
|
||||
|
||||
# Run enrichment (should complete despite calendar errors)
|
||||
ots.enrich(metadata)
|
||||
|
||||
|
||||
# Verify results
|
||||
assert metadata.get("opentimestamped") == False
|
||||
assert metadata.get("opentimestamps_count") is None
|
||||
|
||||
|
||||
def test_no_files_to_stamp(setup_module):
|
||||
"""Test enrichment with no files to timestamp"""
|
||||
ots = setup_module("opentimestamps_enricher")
|
||||
|
||||
|
||||
# Create empty metadata
|
||||
metadata = Metadata().set_url("https://example.com")
|
||||
|
||||
|
||||
# Run enrichment
|
||||
ots.enrich(metadata)
|
||||
|
||||
|
||||
# Verify no timestamping occurred
|
||||
assert metadata.get("opentimestamped") is None
|
||||
assert len(metadata.media) == 0
|
||||
assert len(metadata.media) == 0
|
||||
|
|
Ładowanie…
Reference in New Issue