kopia lustrzana https://gitlab.com/jaywink/federation
366 wiersze
17 KiB
Python
366 wiersze
17 KiB
Python
import copy
|
|
import importlib
|
|
import json
|
|
import logging
|
|
import traceback
|
|
from typing import List, Dict, Union
|
|
|
|
# noinspection PyPackageRequirements
|
|
from Crypto.PublicKey import RSA
|
|
# noinspection PyPackageRequirements
|
|
from Crypto.PublicKey.RSA import RsaKey
|
|
from iteration_utilities import unique_everseen
|
|
|
|
from federation.entities.activitypub.constants import NAMESPACE_PUBLIC
|
|
from federation.entities.mixins import BaseEntity
|
|
from federation.protocols.activitypub.signing import get_http_authentication
|
|
from federation.types import UserType
|
|
from federation.utils.matrix import get_matrix_configuration
|
|
from federation.utils.network import send_document
|
|
|
|
logger = logging.getLogger("federation")
|
|
|
|
|
|
def handle_create_payload(
    entity: BaseEntity,
    author_user: UserType,
    protocol_name: str,
    to_user_key: RsaKey = None,
    parent_user: UserType = None,
    payload_logger: callable = None,
) -> Union[str, Dict, List[Dict]]:
    """Create a payload with the given protocol.

    This function itself reads ``rsa_private_key`` from the given users and ``id`` from ``author_user``
    (the latter only when a ``payload_logger`` is given). The protocol implementation that builds the payload
    may require further attributes on the sending user (earlier documentation names ``private_key`` and
    ``handle``) - check the protocol in question.

    :arg entity: Entity object to send. Can be a base entity or a protocol specific one.
    :arg author_user: User authoring the object.
    :arg protocol_name: Protocol to create payload for. Used to import
                        ``federation.entities.<protocol_name>.mappers`` and
                        ``federation.protocols.<protocol_name>.protocol``.
    :arg to_user_key: Public key of user private payload is being sent to, required for private payloads.
    :arg parent_user: (Optional) User object of the parent object, if there is one. This must be given for the
                      Diaspora protocol if a parent object exists, so that a proper ``parent_author_signature`` can
                      be generated. If given, the payload will be sent as this user.
    :arg payload_logger: (Optional) Function to log the payloads with. Called as
                         ``payload_logger(data, protocol_name, author_user.id)``. Any exception it raises is
                         logged as a warning and otherwise ignored, so a failing logger never blocks delivery.

    :returns: Built payload(s) (str or dict or list (of payloads))
    :raises ModuleNotFoundError: If no mappers or protocol module exists for ``protocol_name``.
    """
    mappers = importlib.import_module(f"federation.entities.{protocol_name}.mappers")
    protocol_module = importlib.import_module(f"federation.protocols.{protocol_name}.protocol")
    # noinspection PyUnresolvedReferences
    protocol = protocol_module.Protocol()
    # noinspection PyUnresolvedReferences
    outbound_entity = mappers.get_outbound_entity(entity, author_user.rsa_private_key)
    if parent_user:
        outbound_entity.sign_with_parent(parent_user.rsa_private_key)
    # When a parent user is given, the payload is sent in their name instead of the author's
    send_as_user = parent_user if parent_user else author_user
    data = protocol.build_send(entity=outbound_entity, from_user=send_as_user, to_user_key=to_user_key)
    if payload_logger:
        try:
            payload_logger(data, protocol_name, author_user.id)
        except Exception as ex:
            # Payload logging is best-effort only; let the logging framework do the (lazy) formatting
            logger.warning("handle_create_payload | Failed to log payload: %s", ex)
    return data
|
|
|
|
|
|
def handle_send(
    entity: BaseEntity,
    author_user: UserType,
    recipients: List[Dict],
    parent_user: UserType = None,
    payload_logger: callable = None,
) -> None:
    """Send an entity to remote servers.

    Using this we will build a list of payloads per protocol. After that, each recipient will get the generated
    protocol payload delivered. Recipients are de-duplicated by their "endpoint" value (the first recipient seen
    for an endpoint wins), so it's ok to include the same endpoint as a receiver multiple times.

    This function itself reads ``rsa_private_key`` and ``id`` from the given user arguments. The protocol
    implementations may require further attributes - check the protocol in question.

    Failures to build or deliver a payload for a single recipient are logged and do not stop delivery to the
    remaining recipients.

    :arg entity: Entity object to send. Can be a base entity or a protocol specific one.
    :arg author_user: User authoring the object.
    :arg recipients: A list of recipients to deliver to. Each recipient is a dict
                     containing at minimum the "endpoint", "fid", "public" and "protocol" keys.

                     For ActivityPub and Diaspora payloads, "endpoint" should be an URL of the endpoint to deliver to.

                     The "fid" can be empty for Diaspora payloads. For ActivityPub it should be the recipient
                     federation ID should the delivery be non-private.

                     The "protocol" should be a protocol name that is known for this recipient.
                     Recipients with a protocol other than "activitypub", "diaspora" or "matrix" are ignored.

                     The "public" value should be a boolean to indicate whether the payload should be flagged as a
                     public payload.

                     TODO: support guessing the protocol over networks? Would need caching of results

                     For private deliveries to Diaspora protocol recipients, "public_key" is also required.
                     It can be either a key object or a string; a string is imported with ``RSA.importKey``
                     just before the private payload is built.

                     For example
                     [
                        {
                            "endpoint": "https://domain.tld/receive/users/1234-5678-0123-4567",
                            "fid": "",
                            "protocol": "diaspora",
                            "public": False,
                            "public_key": <RSAPublicKey object> | str,
                        },
                        {
                            "endpoint": "https://domain2.tld/receive/public",
                            "fid": "",
                            "protocol": "diaspora",
                            "public": True,
                        },
                        {
                            "endpoint": "https://domain4.tld/sharedinbox/",
                            "fid": "https://domain4.tld/profiles/jack/",
                            "protocol": "activitypub",
                            "public": True,
                        },
                        {
                            "endpoint": "https://domain4.tld/profiles/jill/inbox",
                            "fid": "https://domain4.tld/profiles/jill",
                            "protocol": "activitypub",
                            "public": False,
                        },
                        {
                            "endpoint": "https://matrix.domain.tld",
                            "fid": "#@user:domain.tld",
                            "protocol": "matrix",
                            "public": True,
                        }
                     ]
    :arg parent_user: (Optional) User object of the parent object, if there is one. This must be given for the
                      Diaspora protocol if a parent object exists, so that a proper ``parent_author_signature`` can
                      be generated. If given, the payload will be sent as this user. For Activitypub, the
                      parent_user's private key will be used to generate the http signature if the author_user
                      is not a local user (ie has no ``rsa_private_key``). If the author has no private key and
                      no parent_user is given, ActivityPub delivery is skipped with a warning.

    :arg payload_logger: (Optional) Function to log the payloads with.
    """
    payloads = []
    ready_payloads = {
        "activitypub": {
            "auth": None,
            "headers": {},
            "payload": None,
            "urls": set(),
        },
        "diaspora": {
            "auth": None,
            "headers": {},
            "payload": None,
            "urls": set(),
        },
        "matrix": {
            "auth": None,
            "headers": {},
            "payload": None,
            "urls": set(),
        },
    }
    # Once set for a protocol, no further payloads are attempted for it during this call
    skip_ready_payload = {
        "activitypub": False,
        "diaspora": False,
        "matrix": False,
    }
    logger.debug('handle_send - length of recipients: %s', len(recipients))
    # Flatten to unique recipients
    # TODO supply a callable that empties "fid" in the case that public=True
    unique_recipients = list(unique_everseen(recipients, key=lambda val: val['endpoint']))
    logger.debug('handle_send - length of unique_recipients: %s', len(unique_recipients))
    logger.debug('handle_send / unique_recipients - %s', unique_recipients)

    # Fetched lazily, only when the first matrix payload is rendered
    matrix_config = None

    # The entity class name decides which protocols the entity can be sent with; it does not change per recipient
    entity_class_name = entity.__class__.__name__

    # Generate payloads and collect urls
    for recipient in unique_recipients:
        payload = None
        endpoint = recipient["endpoint"]
        fid = recipient["fid"]
        # May be a key object or a string. Strings are imported only where the key is actually needed
        # (private Diaspora delivery), so that a malformed key cannot abort delivery to other recipients.
        public_key = recipient.get("public_key")
        protocol = recipient["protocol"]
        public = recipient["public"]

        if protocol == "activitypub":
            if skip_ready_payload["activitypub"]:
                logger.debug('Skipping activitypub payload as skip_ready_payload set')
                continue
            if entity_class_name.startswith(("Diaspora", "Matrix")):
                # Don't try to do anything with these entities currently
                skip_ready_payload["activitypub"] = True
                logger.debug('Skipping activitypub payload as payload is diaspora or matrix')
                continue
            # noinspection PyBroadException
            try:
                # The payload is generated once and then reused for every ActivityPub recipient
                if not ready_payloads[protocol]["payload"]:
                    try:
                        # noinspection PyTypeChecker
                        ready_payloads[protocol]["payload"] = handle_create_payload(
                            entity, author_user, protocol, parent_user=parent_user, payload_logger=payload_logger,
                        )
                    except ValueError as ex:
                        # No point continuing for this protocol
                        skip_ready_payload["activitypub"] = True
                        logger.warning("handle_send - skipping activitypub due to failure to generate payload: %s", ex)
                        continue
                payload = copy.copy(ready_payloads[protocol]["payload"])
                rendered_payload = json.dumps(payload).encode("utf-8")
            except Exception:
                logger.error(
                    "handle_send - failed to generate activitypub payload for %s, %s: %s",
                    fid, endpoint, traceback.format_exc(),
                    extra={
                        "recipient": recipient,
                        "unique_recipients": list(unique_recipients),
                        "payload": payload,
                        "payloads": payloads,
                        "ready_payloads": ready_payloads,
                        "entity": entity,
                        "author": author_user.id,
                        "parent_user": parent_user.id if parent_user else None,
                    }
                )
                continue
            # The parent_user MUST be local
            local_user = author_user if author_user.rsa_private_key else parent_user
            if local_user is None:
                # The author has no private key and there is no parent user to sign as. Without this guard
                # the attribute access below would raise and abort delivery to all recipients.
                skip_ready_payload["activitypub"] = True
                logger.warning(
                    "handle_send - skipping activitypub as author has no private key and no parent user was given"
                )
                continue
            payloads.append({
                "auth": get_http_authentication(local_user.rsa_private_key, f"{local_user.id}#main-key"),
                "headers": {
                    "Content-Type": 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"',
                },
                "payload": rendered_payload,
                "urls": {endpoint},
            })
        elif protocol == "diaspora":
            if entity_class_name.startswith(("Activitypub", "Matrix")):
                # Don't try to do anything with these entities currently
                skip_ready_payload["diaspora"] = True
                logger.debug('Skipping diaspora payload as payload is activitypub or matrix')
                continue
            if public:
                if skip_ready_payload["diaspora"]:
                    logger.debug('Skipping diaspora payload as skip_ready_payload set')
                    continue
                if public_key:
                    logger.warning("handle_send - Diaspora recipient cannot be public and use encrypted delivery")
                    continue
                # The public payload is generated once; here we only collect the endpoints to send it to
                if not ready_payloads[protocol]["payload"]:
                    try:
                        # noinspection PyTypeChecker
                        ready_payloads[protocol]["payload"] = handle_create_payload(
                            entity, author_user, protocol, parent_user=parent_user, payload_logger=payload_logger,
                        )
                    except Exception as ex:
                        # No point continuing for this protocol
                        skip_ready_payload["diaspora"] = True
                        logger.warning("handle_send - skipping diaspora due to failure to generate payload: %s", ex)
                        continue
                ready_payloads["diaspora"]["urls"].add(endpoint)
            else:
                if not public_key:
                    logger.warning("handle_send - Diaspora recipient cannot be private without a public key for "
                                   "encrypted delivery")
                    continue
                # Private payload
                try:
                    # Key parsing is done inside the try so that a malformed key only fails this recipient
                    if isinstance(public_key, str):
                        public_key = RSA.importKey(public_key)
                    payload = handle_create_payload(
                        entity, author_user, "diaspora", to_user_key=public_key, parent_user=parent_user,
                        payload_logger=payload_logger,
                    )
                    payload = json.dumps(payload)
                except Exception as ex:
                    logger.error("handle_send - failed to generate private payload for %s: %s", endpoint, ex)
                    continue
                payloads.append({
                    "auth": None,
                    "headers": {
                        "Content-Type": "application/json",
                    },
                    "payload": payload,
                    "urls": {endpoint},
                })
        elif protocol == "matrix":
            if skip_ready_payload["matrix"]:
                logger.debug('Skipping matrix payload as skip_ready_payload set')
                continue
            if entity_class_name.startswith(("Activitypub", "Diaspora")):
                # Don't try to do anything with these entities currently
                skip_ready_payload["matrix"] = True
                logger.debug('Skipping matrix payload as payload is activitypub or diaspora')
                continue
            payload_info = []
            # noinspection PyBroadException
            try:
                try:
                    # For matrix we actually might get multiple payloads and endpoints
                    payload_info = handle_create_payload(
                        entity, author_user, protocol, parent_user=parent_user, payload_logger=payload_logger,
                    )
                except ValueError as ex:
                    # No point continuing for this protocol
                    skip_ready_payload["matrix"] = True
                    logger.warning("handle_send - skipping matrix due to failure to generate payload: %s", ex)
                    continue
                if not matrix_config:
                    matrix_config = get_matrix_configuration()
                # Each generated payload carries its own endpoint (and optionally HTTP method)
                for payload in payload_info:
                    rendered_payload = json.dumps(payload["payload"]).encode("utf-8")
                    payloads.append({
                        "auth": None,
                        "headers": {
                            "Authorization": f"Bearer {matrix_config['appservice']['token']}",
                            "Content-Type": "application/json",
                        },
                        "payload": rendered_payload,
                        "urls": {payload["endpoint"]},
                        "method": payload.get("method"),
                    })
            except Exception:
                logger.error(
                    "handle_send - failed to generate matrix payload for %s, %s: %s",
                    fid, endpoint, traceback.format_exc(),
                    extra={
                        "recipient": recipient,
                        "unique_recipients": list(unique_recipients),
                        "payload_info": payload_info,
                        "payloads": payloads,
                        "ready_payloads": ready_payloads,
                        "entity": entity,
                        "author": author_user.id,
                        "parent_user": parent_user.id if parent_user else None,
                    }
                )
                logger.debug('Continuing from matrix payload after error')
                continue

    # Add public diaspora payload
    if ready_payloads["diaspora"]["payload"]:
        payloads.append({
            "auth": None,
            "headers": {
                "Content-Type": "application/magic-envelope+xml",
            },
            "payload": ready_payloads["diaspora"]["payload"],
            "urls": ready_payloads["diaspora"]["urls"],
        })

    logger.debug("handle_send - %s", payloads)

    # Do actual sending
    for payload in payloads:
        for url in payload["urls"]:
            try:
                # TODO send_document and fetch_document need to handle rate limits
                send_document(
                    url,
                    payload["payload"],
                    auth=payload.get("auth"),
                    headers=payload.get("headers"),
                    method=payload.get("method"),
                )
            except Exception as ex:
                # A failed delivery to one endpoint must not prevent delivery to the others
                logger.error("handle_send - failed to send payload to %s: %s, payload: %s", url, ex, payload["payload"])
|