Identify new style Diaspora protocol public payload

This change is coming out in Diaspora version 0.7 soon.

Refs: #42
merge-requests/130/head
Jason Robinson 2017-05-12 23:59:14 +03:00
rodzic 273b7ba47a
commit dda596affc
4 zmienionych plików z 46 dodań i 20 usunięć

Wyświetl plik

@ -20,14 +20,23 @@ logger = logging.getLogger("federation")
PROTOCOL_NAME = "diaspora"
PROTOCOL_NS = "https://joindiaspora.com/protocol"
MAGIC_ENV_TAG = "{http://salmon-protocol.org/ns/magic-env}env"
def identify_payload(payload):
try:
xml = etree.fromstring(bytes(payload, encoding="utf-8"))
if xml.tag == MAGIC_ENV_TAG:
return True
except Exception:
pass
# Check for legacy
try:
xml = unquote_plus(payload)
return xml.find('xmlns="%s"' % PROTOCOL_NS) > -1
except Exception:
return False
pass
return False
class Protocol(BaseProtocol):

Wyświetl plik

@ -1,4 +1,4 @@
ENCRYPTED_DIASPORA_PAYLOAD = """<?xml version='1.0'?>
ENCRYPTED_LEGACY_DIASPORA_PAYLOAD = """<?xml version='1.0'?>
<diaspora xmlns="https://joindiaspora.com/protocol" xmlns:me="http://salmon-protocol.org/ns/magic-env">
<encrypted_header>{encrypted_header}</encrypted_header>
<me:env>
@ -11,7 +11,7 @@ ENCRYPTED_DIASPORA_PAYLOAD = """<?xml version='1.0'?>
"""
UNENCRYPTED_DIASPORA_PAYLOAD = """<?xml version='1.0'?>
UNENCRYPTED_LEGACY_DIASPORA_PAYLOAD = """<?xml version='1.0'?>
<diaspora xmlns="https://joindiaspora.com/protocol" xmlns:me="http://salmon-protocol.org/ns/magic-env">
<header>
<author_id>bob@example.com</author_id>
@ -26,6 +26,19 @@ UNENCRYPTED_DIASPORA_PAYLOAD = """<?xml version='1.0'?>
"""
DIASPORA_PUBLIC_PAYLOAD = """<?xml version='1.0' encoding='UTF-8'?>
<me:env xmlns:me="http://salmon-protocol.org/ns/magic-env">
<me:encoding>base64url</me:encoding>
<me:alg>RSA-SHA256</me:alg>
<me:data type="application/xml">PHN0YXR1c19tZXNzYWdlPjxmb28-YmFyPC9mb28-PC9zdGF0dXNfbWVzc2FnZT4=</me:data>
<me:sig key_id="Zm9vYmFyQGV4YW1wbGUuY29t">Cmk08MR4Tp8r9eVybD1hORcR_8NLRVxAu0biOfJbkI1xLx1c480zJ720cpVyKaF9""" \
"""CxVjW3lvlvRz5YbswMv0izPzfHpXoWTXH-4UPrXaGYyJnrNvqEB2UWn4iHKJ2Rerto8sJY2b95qbXD6Nq75EoBNub5P7DYc16ENhp3""" \
"""8YwBRnrBEvNOewddpOpEBVobyNB7no_QR8c_xkXie-hUDFNwI0z7vax9HkaBFbvEmzFPMZAAdWyjxeGiWiqY0t2ZdZRCPTezy66X6Q0""" \
"""qc4I8kfT-Mt1ctjGmNMoJ4Lgu-PrO5hSRT4QBAVyxaog5w-B0PIPuC-mUW5SZLsnX3_ZuwJww==</me:sig>
</me:env>
"""
DIASPORA_POST_LEGACY = """<XML>
<post>
<status_message>

Wyświetl plik

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
from base64 import urlsafe_b64decode
from unittest.mock import Mock, patch
from xml.etree.ElementTree import ElementTree
@ -8,10 +7,13 @@ import pytest
from federation.exceptions import EncryptedMessageError, NoSenderKeyFoundError, NoHeaderInMessageError
from federation.protocols.diaspora.protocol import Protocol, identify_payload
from federation.tests.fixtures.payloads import ENCRYPTED_DIASPORA_PAYLOAD, UNENCRYPTED_DIASPORA_PAYLOAD
from federation.tests.fixtures.payloads import (
ENCRYPTED_LEGACY_DIASPORA_PAYLOAD, UNENCRYPTED_LEGACY_DIASPORA_PAYLOAD,
DIASPORA_PUBLIC_PAYLOAD,
)
class MockUser(object):
class MockUser():
private_key = "foobar"
def __init__(self, nokey=False):
@ -27,15 +29,15 @@ def mock_not_found_get_contact_key(contact):
return None
class DiasporaTestBase(object):
class DiasporaTestBase():
def init_protocol(self):
return Protocol()
def get_unencrypted_doc(self):
return etree.fromstring(UNENCRYPTED_DIASPORA_PAYLOAD)
return etree.fromstring(UNENCRYPTED_LEGACY_DIASPORA_PAYLOAD)
def get_encrypted_doc(self):
return etree.fromstring(ENCRYPTED_DIASPORA_PAYLOAD)
return etree.fromstring(ENCRYPTED_LEGACY_DIASPORA_PAYLOAD)
def get_mock_user(self, nokey=False):
return MockUser(nokey)
@ -68,7 +70,7 @@ class TestDiasporaProtocol(DiasporaTestBase):
protocol = self.init_protocol()
user = self.get_mock_user()
protocol.get_message_content = self.mock_get_message_content
sender, content = protocol.receive(UNENCRYPTED_DIASPORA_PAYLOAD, user, mock_get_contact_key,
sender, content = protocol.receive(UNENCRYPTED_LEGACY_DIASPORA_PAYLOAD, user, mock_get_contact_key,
skip_author_verification=True)
assert sender == "bob@example.com"
assert content == "<content />"
@ -80,7 +82,7 @@ class TestDiasporaProtocol(DiasporaTestBase):
return_value="<content><diaspora_handle>bob@example.com</diaspora_handle></content>"
)
protocol.parse_header = Mock(return_value="foobar")
sender, content = protocol.receive(ENCRYPTED_DIASPORA_PAYLOAD, user, mock_get_contact_key,
sender, content = protocol.receive(ENCRYPTED_LEGACY_DIASPORA_PAYLOAD, user, mock_get_contact_key,
skip_author_verification=True)
assert sender == "bob@example.com"
assert content == "<content><diaspora_handle>bob@example.com</diaspora_handle></content>"
@ -88,19 +90,19 @@ class TestDiasporaProtocol(DiasporaTestBase):
def test_receive_raises_on_encrypted_message_and_no_user(self):
protocol = self.init_protocol()
with pytest.raises(EncryptedMessageError):
protocol.receive(ENCRYPTED_DIASPORA_PAYLOAD)
protocol.receive(ENCRYPTED_LEGACY_DIASPORA_PAYLOAD)
def test_receive_raises_on_encrypted_message_and_no_user_key(self):
protocol = self.init_protocol()
user = self.get_mock_user(nokey=True)
with pytest.raises(EncryptedMessageError):
protocol.receive(ENCRYPTED_DIASPORA_PAYLOAD, user)
protocol.receive(ENCRYPTED_LEGACY_DIASPORA_PAYLOAD, user)
def test_receive_raises_if_sender_key_cannot_be_found(self):
protocol = self.init_protocol()
user = self.get_mock_user()
with pytest.raises(NoSenderKeyFoundError):
protocol.receive(UNENCRYPTED_DIASPORA_PAYLOAD, user, mock_not_found_get_contact_key)
protocol.receive(UNENCRYPTED_LEGACY_DIASPORA_PAYLOAD, user, mock_not_found_get_contact_key)
def test_find_header_raises_if_header_cannot_be_found(self):
protocol = self.init_protocol()
@ -115,8 +117,11 @@ class TestDiasporaProtocol(DiasporaTestBase):
body = protocol.get_message_content()
assert body == urlsafe_b64decode("{data}".encode("ascii"))
def test_identify_payload_with_diaspora_payload(self):
assert identify_payload(UNENCRYPTED_DIASPORA_PAYLOAD) == True
def test_identify_payload_with_legacy_diaspora_payload(self):
assert identify_payload(UNENCRYPTED_LEGACY_DIASPORA_PAYLOAD) == True
def test_identify_payload_with_diaspora_public_payload(self):
assert identify_payload(DIASPORA_PUBLIC_PAYLOAD) == True
def test_identify_payload_with_other_payload(self):
assert identify_payload("foobar not a diaspora protocol") == False

Wyświetl plik

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
from unittest.mock import patch
import pytest
@ -6,12 +5,12 @@ import pytest
from federation.exceptions import NoSuitableProtocolFoundError
from federation.inbound import handle_receive
from federation.protocols.diaspora.protocol import Protocol
from federation.tests.fixtures.payloads import UNENCRYPTED_DIASPORA_PAYLOAD
from federation.tests.fixtures.payloads import UNENCRYPTED_LEGACY_DIASPORA_PAYLOAD
class TestHandleReceiveProtocolIdentification(object):
class TestHandleReceiveProtocolIdentification():
def test_handle_receive_routes_to_identified_protocol(self):
payload = UNENCRYPTED_DIASPORA_PAYLOAD
payload = UNENCRYPTED_LEGACY_DIASPORA_PAYLOAD
with patch.object(
Protocol,
'receive',