Identify new style Diaspora encrypted payload

Refs: #42
merge-requests/130/head
Jason Robinson 2017-05-13 00:12:35 +03:00
rodzic dda596affc
commit 474db3744b
3 zmienionych plików z 29 dodań i 6 usunięć

Wyświetl plik

@ -1,7 +1,7 @@
import json
import logging
import warnings
from base64 import b64decode, urlsafe_b64decode, b64encode, urlsafe_b64encode
from json import loads, dumps
from urllib.parse import unquote_plus
from Crypto.Cipher import AES, PKCS1_v1_5
@ -24,13 +24,25 @@ MAGIC_ENV_TAG = "{http://salmon-protocol.org/ns/magic-env}env"
def identify_payload(payload):
"""Try to identify whether this is a Diaspora payload.
Try first public message. Then private message. The check if this is a legacy payload.
"""
# Private encrypted JSON payload
try:
data = json.loads(payload)
if "encrypted_magic_envelope" in data:
return True
except Exception:
pass
# Public XML payload
try:
xml = etree.fromstring(bytes(payload, encoding="utf-8"))
if xml.tag == MAGIC_ENV_TAG:
return True
except Exception:
pass
# Check for legacy
# Legacy XML payload
try:
xml = unquote_plus(payload)
return xml.find('xmlns="%s"' % PROTOCOL_NS) > -1
@ -160,7 +172,7 @@ class Protocol(BaseProtocol):
key and hence the passphrase for the key.
"""
decoded_json = b64decode(b64data.encode("ascii"))
rep = loads(decoded_json.decode("ascii"))
rep = json.loads(decoded_json.decode("ascii"))
outer_key_details = self.decrypt_outer_aes_key_bundle(
rep["aes_key"], key)
header = self.get_decrypted_header(
@ -182,7 +194,7 @@ class Protocol(BaseProtocol):
b64decode(data.encode("ascii")),
sentinel=None
)
return loads(decoded_json.decode("ascii"))
return json.loads(decoded_json.decode("ascii"))
def get_decrypted_header(self, ciphertext, key, iv):
"""
@ -282,7 +294,7 @@ class Protocol(BaseProtocol):
"""
Record the information on the key used to encrypt the header.
"""
d = dumps({
d = json.dumps({
"iv": b64encode(self.outer_iv).decode("ascii"),
"key": b64encode(self.outer_key).decode("ascii")
})
@ -306,7 +318,7 @@ class Protocol(BaseProtocol):
public_key)).decode("ascii")
ciphertext = b64encode(self.create_ciphertext()).decode("ascii")
d = dumps({
d = json.dumps({
"aes_key": aes_key,
"ciphertext": ciphertext
})

Wyświetl plik

@ -39,6 +39,13 @@ DIASPORA_PUBLIC_PAYLOAD = """<?xml version='1.0' encoding='UTF-8'?>
"""
DIASPORA_ENCRYPTED_PAYLOAD = """{
"aes_key": "...",
"encrypted_magic_envelope": "..."
}
"""
DIASPORA_POST_LEGACY = """<XML>
<post>
<status_message>

Wyświetl plik

@ -10,6 +10,7 @@ from federation.protocols.diaspora.protocol import Protocol, identify_payload
from federation.tests.fixtures.payloads import (
ENCRYPTED_LEGACY_DIASPORA_PAYLOAD, UNENCRYPTED_LEGACY_DIASPORA_PAYLOAD,
DIASPORA_PUBLIC_PAYLOAD,
DIASPORA_ENCRYPTED_PAYLOAD,
)
@ -123,6 +124,9 @@ class TestDiasporaProtocol(DiasporaTestBase):
def test_identify_payload_with_diaspora_public_payload(self):
assert identify_payload(DIASPORA_PUBLIC_PAYLOAD) == True
def test_identify_payload_with_diaspora_encrypted_payload(self):
assert identify_payload(DIASPORA_ENCRYPTED_PAYLOAD) == True
def test_identify_payload_with_other_payload(self):
assert identify_payload("foobar not a diaspora protocol") == False