Implement LD signatures for outbound payloads.

ld-signatures
Alain St-Denis 2023-03-14 20:20:44 -04:00
rodzic f787c2f998
commit 0d42bb7018
2 zmienionych plików z 30 dodań i 6 usunięć

Wyświetl plik

@ -8,7 +8,7 @@ from Crypto.PublicKey.RSA import RsaKey
from federation.entities.activitypub.enums import ActorType
from federation.entities.mixins import BaseEntity
from federation.protocols.activitypub.signing import verify_request_signature, verify_ld_signature
from federation.protocols.activitypub.signing import verify_request_signature, verify_ld_signature, create_ld_signature
from federation.types import UserType, RequestType
from federation.utils.text import decode_if_bytes
@ -59,6 +59,7 @@ class Protocol:
rendered = entity.outbound_doc
else:
rendered = entity.to_as2()
create_ld_signature(rendered, from_user)
return rendered
def extract_actor(self):

Wyświetl plik

@ -7,7 +7,7 @@ import datetime
import logging
import math
import re
from base64 import b64decode
from base64 import b64encode, b64decode
from copy import copy
from funcy import omit
from pyld import jsonld
@ -45,10 +45,6 @@ def get_http_authentication(private_key: RsaKey, private_key_id: str, digest: bo
)
def create_ld_signature(payload, private_key):
pass
def verify_request_signature(request: RequestType, required: bool=True):
"""
Verify HTTP signature in request against a public key.
@ -95,6 +91,33 @@ def verify_request_signature(request: RequestType, required: bool=True):
return signer.id
def create_ld_signature(obj, author):
# Use models.Signature? Maybe overkill...
sig = {
'created': datetime.datetime.now(tz=datetime.timezone.utc).isoformat(timespec='seconds'),
'creator': f'{author.id}#main-key',
'@context':'https://w3id.org/security/v1'
}
try:
private_key = import_key(author.private_key)
except ValueError as exc:
logger.warning(f'ld_signature - {exc}')
return None
signer = pkcs1_15.new(private_key)
sig_nquads = normalize(sig, options={'format':'application/nquads','algorithm':'URDNA2015'}).encode('utf-8')
sig_digest = SHA256.new(sig_nquads).hexdigest()
obj_nquads = normalize(obj, options={'format':'application/nquads','algorithm':'URDNA2015'}).encode('utf-8')
obj_digest = SHA256.new(obj_nquads).hexdigest()
digest = (sig_digest + obj_digest).encode('utf-8')
signature = signer.sign(SHA256.new(digest))
sig.update({'type': 'RsaSignature2017', 'signatureValue': b64encode(signature).decode()})
sig.pop('@context')
obj.update({'signature':sig})
def verify_ld_signature(payload):
"""
Verify inbound payload LD signature