Merge branch 'matrix-delivery' into 'master'

First parts of the Matrix payload delivery

See merge request jaywink/federation!162
matrix-profile-room-messages
jaywink 2020-12-25 21:07:53 +00:00
commit 7ac452e65c
15 zmienionych plików z 310 dodań i 44 usunięć

Wyświetl plik

@ -4,16 +4,7 @@
### Added
* Add `federation.hostmeta` generators for Matrix client and server well-known files.
Django views and url configuration also included for convenience.
* Add `register_dendrite_user` Matrix protocol utility to register users on Dendrite
homeservers using a shared registration secret.
* Added configuration for a Matrix appservice to be registered with a homeserver.
* Added a Django view to push incoming Matrix appservice transactions into the congigured
payload processing function.
* WIP Matrix support over an appservice.
## [0.21.0] - 2020-12-20

Wyświetl plik

@ -235,12 +235,14 @@ Some settings need to be set in Django settings. An example is below:
"homeserver_base_url": "https://matrix.domain.tld",
# Homeserver domain and port (not server domain)
"homeserver_domain_with_port": "matrix.domain.tld:443",
# Homeserver name
"homeserver_name": "domain.tld",
# Appservice details
"appservice": {
# Unique ID to register with at the homeserver. Don't change this after creating.
"id": "uniqueid",
# Appservice user localpart (lowercase, should ideally start with _)
"sender_localpart": "_myawesomeapp",
# Short code (a-z only), used for various things like namespacing
"shortcode": "federatedapp",
# Secret token for communication
"token": "secret_token",
},

Wyświetl plik

@ -133,6 +133,7 @@ class Profile(CreatedAtMixin, OptionalRawContentMixin, PublicMixin, BaseEntity):
url = ""
username = ""
inboxes: Dict = None
mxid = ""
_allowed_children = (Image,)

Wyświetl plik

@ -0,0 +1,93 @@
import logging
from typing import Dict, List
from federation.entities.base import Post, Profile
from federation.entities.matrix.enums import EventType
from federation.entities.mixins import BaseEntity
from federation.entities.utils import get_base_attributes
from federation.utils.matrix import get_matrix_configuration, appservice_auth_header
from federation.utils.network import fetch_document
logger = logging.getLogger("federation")
class MatrixEntityMixin(BaseEntity):
_event_type: str = None
_txn_id: str = None
@property
def event_type(self) -> str:
return self._event_type
@classmethod
def from_base(cls, entity):
# type: (BaseEntity) -> MatrixEntityMixin
# noinspection PyArgumentList
return cls(**get_base_attributes(entity))
def get_endpoint(self, *args, **kwargs) -> str:
config = get_matrix_configuration()
return f"{config['homeserver_base_url']}/_matrix/client/r0"
# noinspection PyMethodMayBeStatic
def payloads(self) -> List[Dict]:
return []
@property
def txn_id(self) -> str:
return self._txn_id
class MatrixRoomMessage(Post, MatrixEntityMixin):
_event_type = EventType.ROOM_MESSAGE.value
def get_endpoint(self, fid: str, user_id: str) -> str:
endpoint = super().get_endpoint()
return f"{endpoint}/rooms/{fid}/send/{self.event_type}/{self.txn_id}?user_id={user_id}"
class MatrixProfile(Profile, MatrixEntityMixin):
_remote_create_needed = False
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# We always require an mxid
self._required.append('mxid')
@property
def localpart(self) -> str:
config = get_matrix_configuration()
return self.mxid.replace("@", "").replace(f":{config['homeserver_name']}", "")
def payloads(self) -> List[Dict]:
payloads = super().payloads()
if self._remote_create_needed:
payloads.append({
"endpoint": f"{super().get_endpoint()}/register",
"payload": {
"username": f"{self.localpart}",
"type": "m.login.application_service",
},
})
payloads.append({
"endpoint": f"{super().get_endpoint()}/profile/{self.mxid}/displayname?user_id={self.mxid}",
"payload": {
"displayname": self.name,
},
"method": "put",
})
# TODO avatar url in mxc format
return payloads
def pre_send(self):
"""
Check whether we need to create this user.
"""
doc, status, error = fetch_document(
url=f"{super().get_endpoint()}/profile/{self.mxid}",
extra_headers=appservice_auth_header(),
)
if status == 200:
return
self._remote_create_needed = True

Wyświetl plik

@ -0,0 +1,11 @@
from enum import Enum
class EnumBase(Enum):
@classmethod
def values(cls):
return [value.value for value in cls.__members__.values()]
class EventType(EnumBase):
ROOM_MESSAGE = "m.room.message"

Wyświetl plik

@ -0,0 +1,39 @@
import logging
from federation.entities.base import Profile, Post
from federation.entities.matrix.entities import MatrixRoomMessage, MatrixProfile
from federation.entities.mixins import BaseEntity
logger = logging.getLogger("federation")
def get_outbound_entity(entity: BaseEntity, private_key):
"""Get the correct outbound entity for this protocol.
:arg entity: An entity instance which can be of a base or protocol entity class.
:arg private_key: Private key of sender in str format
:returns: Protocol specific entity class instance.
:raises ValueError: If conversion cannot be done.
"""
if getattr(entity, "outbound_doc", None):
# If the entity already has an outbound doc, just return the entity as is
return entity
outbound = None
cls = entity.__class__
if cls in [
MatrixProfile,
MatrixRoomMessage,
]:
# Already fine
outbound = entity
elif cls == Post:
outbound = MatrixRoomMessage.from_base(entity)
elif cls == Profile:
outbound = MatrixProfile.from_base(entity)
if not outbound:
raise ValueError("Don't know how to convert this base entity to Matrix protocol entities.")
if hasattr(outbound, "pre_send"):
outbound.pre_send()
# Validate the entity
outbound.validate(direction="outbound")
return outbound

Wyświetl plik

@ -7,6 +7,7 @@ from typing import List, Dict, Union
# noinspection PyPackageRequirements
from Crypto.PublicKey import RSA
# noinspection PyPackageRequirements
from Crypto.PublicKey.RSA import RsaKey
from iteration_utilities import unique_everseen
@ -14,6 +15,7 @@ from federation.entities.activitypub.constants import NAMESPACE_PUBLIC
from federation.entities.mixins import BaseEntity
from federation.protocols.activitypub.signing import get_http_authentication
from federation.types import UserType
from federation.utils.matrix import get_matrix_configuration
from federation.utils.network import send_document
logger = logging.getLogger("federation")
@ -26,7 +28,7 @@ def handle_create_payload(
to_user_key: RsaKey = None,
parent_user: UserType = None,
payload_logger: callable = None,
) -> Union[str, dict]:
) -> Union[str, Dict, List[Dict]]:
"""Create a payload with the given protocol.
Any given user arguments must have ``private_key`` and ``handle`` attributes.
@ -40,11 +42,13 @@ def handle_create_payload(
be generated. If given, the payload will be sent as this user.
:arg payload_logger: (Optional) Function to log the payloads with.
:returns: Built payload (str or dict)
:returns: Built payload(s) (str or dict or list (of payloads))
"""
mappers = importlib.import_module(f"federation.entities.{protocol_name}.mappers")
protocol = importlib.import_module(f"federation.protocols.{protocol_name}.protocol")
# noinspection PyUnresolvedReferences
protocol = protocol.Protocol()
# noinspection PyUnresolvedReferences
outbound_entity = mappers.get_outbound_entity(entity, author_user.rsa_private_key)
if parent_user:
outbound_entity.sign_with_parent(parent_user.rsa_private_key)
@ -119,6 +123,12 @@ def handle_send(
"protocol": "activitypub",
"public": False,
},
{
"endpoint": "https://matrix.domain.tld",
"fid": "#@user:domain.tld",
"protocol": "matrix",
"public": True,
}
]
:arg parent_user: (Optional) User object of the parent object, if there is one. This must be given for the
Diaspora protocol if a parent object exists, so that a proper ``parent_author_signature`` can
@ -129,11 +139,19 @@ def handle_send(
ready_payloads = {
"activitypub": {
"auth": None,
"headers": {},
"payload": None,
"urls": set(),
},
"diaspora": {
"auth": None,
"headers": {},
"payload": None,
"urls": set(),
},
"matrix": {
"auth": None,
"headers": {},
"payload": None,
"urls": set(),
},
@ -141,12 +159,15 @@ def handle_send(
skip_ready_payload = {
"activitypub": False,
"diaspora": False,
"matrix": False,
}
# Flatten to unique recipients
# TODO supply a callable that empties "fid" in the case that public=True
unique_recipients = unique_everseen(recipients)
matrix_config = None
# Generate payloads and collect urls
for recipient in unique_recipients:
payload = None
@ -161,10 +182,11 @@ def handle_send(
if protocol == "activitypub":
if skip_ready_payload["activitypub"]:
continue
if entity.__class__.__name__.startswith("Diaspora"):
# Don't try to do anything with Diaspora entities currently
if entity.__class__.__name__.startswith("Diaspora") or entity.__class__.__name__.startswith("Matrix"):
# Don't try to do anything with these entities currently
skip_ready_payload["activitypub"] = True
continue
# noinspection PyBroadException
try:
if not ready_payloads[protocol]["payload"]:
try:
@ -191,7 +213,8 @@ def handle_send(
rendered_payload = json.dumps(payload).encode("utf-8")
except Exception:
logger.error(
"handle_send - failed to generate payload for %s, %s: %s", fid, endpoint, traceback.format_exc(),
"handle_send - failed to generate activitypub payload for %s, %s: %s",
fid, endpoint, traceback.format_exc(),
extra={
"recipient": recipient,
"unique_recipients": list(unique_recipients),
@ -206,13 +229,15 @@ def handle_send(
continue
payloads.append({
"auth": get_http_authentication(author_user.rsa_private_key, f"{author_user.id}#main-key"),
"headers": {
"Content-Type": 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"',
},
"payload": rendered_payload,
"content_type": 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"',
"urls": {endpoint},
})
elif protocol == "diaspora":
if entity.__class__.__name__.startswith("Activitypub"):
# Don't try to do anything with Activitypub entities currently
if entity.__class__.__name__.startswith("Activitypub") or entity.__class__.__name__.startswith("Matrix"):
# Don't try to do anything with these entities currently
skip_ready_payload["diaspora"] = True
continue
if public:
@ -249,14 +274,73 @@ def handle_send(
logger.error("handle_send - failed to generate private payload for %s: %s", endpoint, ex)
continue
payloads.append({
"urls": {endpoint}, "payload": payload, "content_type": "application/json", "auth": None,
"auth": None,
"headers": {
"Content-Type": "application/json",
},
"payload": payload,
"urls": {endpoint},
})
elif protocol == "matrix":
if skip_ready_payload["matrix"]:
continue
if entity.__class__.__name__.startswith("Activitypub") or entity.__class__.__name__.startswith("Diaspora"):
# Don't try to do anything with these entities currently
skip_ready_payload["matrix"] = True
continue
payload_info = []
# noinspection PyBroadException
try:
try:
# For matrix we actually might get multiple payloads and endpoints
payload_info = handle_create_payload(
entity, author_user, protocol, parent_user=parent_user, payload_logger=payload_logger,
)
except ValueError as ex:
# No point continuing for this protocol
skip_ready_payload["matrix"] = True
logger.warning("handle_send - skipping matrix due to failure to generate payload: %s", ex)
continue
if not matrix_config:
matrix_config = get_matrix_configuration()
for payload in payload_info:
rendered_payload = json.dumps(payload["payload"]).encode("utf-8")
payloads.append({
"auth": None,
"headers": {
"Authorization": f"Bearer {matrix_config['appservice']['token']}",
"Content-Type": "application/json",
},
"payload": rendered_payload,
"urls": {payload["endpoint"]},
"method": payload["method"],
})
except Exception:
logger.error(
"handle_send - failed to generate matrix payload for %s, %s: %s",
fid, endpoint, traceback.format_exc(),
extra={
"recipient": recipient,
"unique_recipients": list(unique_recipients),
"payload_info": payload_info,
"payloads": payloads,
"ready_payloads": ready_payloads,
"entity": entity,
"author": author_user.id,
"parent_user": parent_user.id,
}
)
continue
# Add public diaspora payload
if ready_payloads["diaspora"]["payload"]:
payloads.append({
"urls": ready_payloads["diaspora"]["urls"], "payload": ready_payloads["diaspora"]["payload"],
"content_type": "application/magic-envelope+xml", "auth": None,
"auth": None,
"headers": {
"Content-Type": "application/magic-envelope+xml",
},
"payload": ready_payloads["diaspora"]["payload"],
"urls": ready_payloads["diaspora"]["urls"],
})
logger.debug("handle_send - %s", payloads)
@ -265,11 +349,13 @@ def handle_send(
for payload in payloads:
for url in payload["urls"]:
try:
# TODO send_document and fetch_document need to handle rate limits
send_document(
url,
payload["payload"],
auth=payload["auth"],
headers={"Content-Type": payload["content_type"]},
auth=payload.get("auth"),
headers=payload.get("headers"),
method=payload.get("method"),
)
except Exception as ex:
logger.error("handle_send - failed to send payload to %s: %s, payload: %s", url, ex, payload["payload"])

Wyświetl plik

@ -23,19 +23,30 @@ def get_registration_config() -> Dict:
"url": f"{config['base_url']}/matrix",
"as_token": matrix_config["appservice"]["token"],
"hs_token": matrix_config["appservice"]["token"],
"sender_localpart": matrix_config["appservice"]["sender_localpart"],
"sender_localpart": f'_{matrix_config["appservice"]["shortcode"]}',
"namespaces": {
# We reserve two namespaces
# One is not exclusive, since we're interested in events of "real" users
# One is exclusive, the ones that represent "remote to us but managed by us towards Matrix"
"users": [
{
"exclusive": False,
"regex": "@.*",
},
{
"exclusive": True,
"regex": f"@_{matrix_config['appservice']['shortcode']}_.*"
},
],
"aliases": [
{
"exclusive": False,
"regex": "#.*",
}
},
{
"exclusive": True,
"regex": f"#_{matrix_config['appservice']['shortcode']}_.*"
},
],
"rooms": [],
}

Wyświetl plik

@ -1,12 +1,9 @@
import json
import logging
import re
from typing import Callable, Tuple, Union, Dict
from typing import Callable, Tuple, List, Dict
# noinspection PyPackageRequirements
from Crypto.PublicKey.RSA import RsaKey
from federation.entities.mixins import BaseEntity
from federation.entities.matrix.entities import MatrixEntityMixin
from federation.types import UserType, RequestType
from federation.utils.text import decode_if_bytes
@ -45,17 +42,16 @@ class Protocol:
request = None
user = None
def build_send(self, entity: BaseEntity, from_user: UserType, to_user_key: RsaKey = None) -> Union[str, Dict]:
# noinspection PyUnusedLocal
@staticmethod
def build_send(entity: MatrixEntityMixin, *args, **kwargs) -> List[Dict]:
"""
Build POST data for sending out to the homeserver.
:param entity: The outbound ready entity for this protocol.
:param from_user: The user sending this payload. Must have ``private_key`` and ``id`` properties.
:param to_user_key: (Optional) Public key of user we're sending a private payload to.
:returns: dict or string depending on if private or public payload.
:returns: list of payloads
"""
# TODO TBD
return {}
return entity.payloads()
def extract_actor(self):
# TODO TBD

Wyświetl plik

@ -34,9 +34,10 @@ def matrix_config_func() -> Dict:
return {
"homeserver_base_url": "https://matrix.domain.tld",
"homeserver_domain_with_port": "matrix.domain.tld:443",
"homeserver_name": "domain.tld",
"appservice": {
"id": "uniqueid",
"sender_localpart": "_myawesomeapp",
"shortcode": "myawesomeapp",
"token": "secret_token",
},
"identity_server_base_url": "https://id.domain.tld",

Wyświetl plik

@ -27,7 +27,7 @@ class TestGetBaseAttributes:
"created_at", "name", "email", "gender", "raw_content", "location", "public",
"nsfw", "public_key", "image_urls", "tag_list", "signature", "url", "atom_url",
"base_url", "id", "actor_id", "handle", "handle", "guid", "activity", "activity_id", "username",
"inboxes",
"inboxes", "mxid",
}

Wyświetl plik

@ -15,12 +15,20 @@ def test_get_registration():
"exclusive": False,
"regex": "@.*",
},
{
"exclusive": True,
"regex": "@_myawesomeapp_.*",
},
],
"aliases": [
{
"exclusive": False,
"regex": "#.*",
}
},
{
"exclusive": True,
"regex": "#_myawesomeapp_.*",
},
],
"rooms": [],
}

Wyświetl plik

@ -2,7 +2,9 @@ from enum import Enum
from typing import Optional, Dict, Union
import attr
# noinspection PyPackageRequirements
from Crypto.PublicKey import RSA
# noinspection PyPackageRequirements
from Crypto.PublicKey.RSA import RsaKey
@ -26,6 +28,15 @@ class ReceiverVariant(Enum):
FOLLOWERS = "followers"
# TODO needed?
class UserVariant(Enum):
"""
Indicates whether the user is local or remote.
"""
LOCAL = "local"
REMOTE = "remote"
@attr.s(frozen=True)
class UserType:
id: str = attr.ib()
@ -36,6 +47,11 @@ class UserType:
handle: Optional[str] = attr.ib(default=None)
guid: Optional[str] = attr.ib(default=None)
# Required only if sending to Matrix protocol
mxid: Optional[str] = attr.ib(default=None)
# TODO needed?
variant: Optional[UserVariant] = attr.ib(default=None)
@property
def rsa_private_key(self) -> RsaKey:
if isinstance(self.private_key, str):

Wyświetl plik

@ -8,6 +8,13 @@ import requests
from federation.utils.django import get_function_from_config
def appservice_auth_header() -> Dict:
config = get_matrix_configuration()
return {
"Authorization": f"Bearer {config['appservice']['token']}",
}
def generate_dendrite_mac(shared_secret: str, username: str, password: str, admin: bool) -> str:
"""
Generate a MAC for using in registering users with Dendrite.

Wyświetl plik

@ -157,7 +157,7 @@ def parse_http_date(date):
raise ValueError("%r is not a valid date" % date) from exc
def send_document(url, data, timeout=10, *args, **kwargs):
def send_document(url, data, timeout=10, method="post", *args, **kwargs):
"""Helper method to send a document via POST.
Additional ``*args`` and ``**kwargs`` will be passed on to ``requests.post``.
@ -165,9 +165,12 @@ def send_document(url, data, timeout=10, *args, **kwargs):
:arg url: Full url to send to, including protocol
:arg data: Dictionary (will be form-encoded), bytes, or file-like object to send in the body
:arg timeout: Seconds to wait for response (defaults to 10)
:arg method: Method to use, defaults to post
:returns: Tuple of status code (int or None) and error (exception class instance or None)
"""
logger.debug("send_document: url=%s, data=%s, timeout=%s", url, data, timeout)
logger.debug("send_document: url=%s, data=%s, timeout=%s, method=%s", url, data, timeout, method)
if not method:
method = "post"
headers = CaseInsensitiveDict({
'User-Agent': USER_AGENT,
})
@ -177,8 +180,9 @@ def send_document(url, data, timeout=10, *args, **kwargs):
kwargs.update({
"data": data, "timeout": timeout, "headers": headers
})
request_func = getattr(requests, method)
try:
response = requests.post(url, *args, **kwargs)
response = request_func(url, *args, **kwargs)
logger.debug("send_document: response status code %s", response.status_code)
return response.status_code, None
except RequestException as ex: