Import more stuff from microblog.pub

pull/1/head
Thomas Sileo 2018-06-13 21:42:03 +02:00
rodzic 0574be3475
commit cf0dc36d3b
9 zmienionych plików z 421 dodań i 5 usunięć

Wyświetl plik

@ -237,10 +237,10 @@ class BaseActivity(object, metaclass=_ActivityMeta):
valid_kwargs[k] = v
self._data.update(**valid_kwargs)
def ctx(self) -> Dict[str, Any]:
def ctx(self) -> Any:
return self.__ctx
def set_ctx(self, ctx: Dict[str, Any]) -> None:
def set_ctx(self, ctx: Any) -> None:
self.__ctx = ctx
def _init(self, **kwargs) -> Optional[List[str]]:

Wyświetl plik

@ -0,0 +1,64 @@
import re
from typing import List
from typing import Tuple
from typing import Dict
from bleach.linkifier import Linker
from markdown import markdown
from .webfinger import get_actor_url
from .activitypub import BACKEND
from .activitypub import UninitializedBackendError
def _set_attrs(attrs, new=False):
attrs[(None, "target")] = "_blank"
attrs[(None, "class")] = "external"
attrs[(None, "rel")] = "noopener"
attrs[(None, "title")] = attrs[(None, "href")]
return attrs
LINKER = Linker(callbacks=[_set_attrs])
HASHTAG_REGEX = re.compile(r"(#[\d\w\.]+)")
MENTION_REGEX = re.compile(r"@[\d\w_.+-]+@[\d\w-]+\.[\d\w\-.]+")
def hashtagify(content: str) -> Tuple[str, List[Dict[str, str]]]:
if BACKEND is None:
raise UninitializedBackendError
base_url = BACKEND.base_url()
tags = []
for hashtag in re.findall(HASHTAG_REGEX, content):
tag = hashtag[1:]
link = f'<a href="{base_url}/tags/{tag}" class="mention hashtag" rel="tag">#<span>{tag}</span></a>'
tags.append(dict(href=f"{base_url}/tags/{tag}", name=hashtag, type="Hashtag"))
content = content.replace(hashtag, link)
return content, tags
def mentionify(content: str) -> Tuple[str, List[Dict[str, str]]]:
if BACKEND is None:
raise UninitializedBackendError
tags = []
for mention in re.findall(MENTION_REGEX, content):
_, username, domain = mention.split("@")
actor_url = get_actor_url(mention)
p = BACKEND.fetch_iri(actor_url)
tags.append(dict(type="Mention", href=p["id"], name=mention))
link = f'<span class="h-card"><a href="{p["url"]}" class="u-url mention">@<span>{username}</span></a></span>'
content = content.replace(mention, link)
return content, tags
def parse_markdown(content: str) -> Tuple[str, List[Dict[str, str]]]:
tags = []
content = LINKER.linkify(content)
content, hashtag_tags = hashtagify(content)
tags.extend(hashtag_tags)
content, mention_tags = mentionify(content)
tags.extend(mention_tags)
content = markdown(content)
return content, tags

Wyświetl plik

@ -0,0 +1,101 @@
"""Implements HTTP signature for Flask requests.
Mastodon instances won't accept requests that are not signed using this scheme.
"""
from datetime import datetime
from urllib.parse import urlparse
from typing import Any, Dict, Optional
import base64
import hashlib
import logging
# FIXME(tsileo): no more Flask
from flask import request
from requests.auth import AuthBase
from Crypto.Signature import PKCS1_v1_5
from Crypto.Hash import SHA256
logger = logging.getLogger(__name__)
def _build_signed_string(
signed_headers: str, method: str, path: str, headers: Any, body_digest: str
) -> str:
out = []
for signed_header in signed_headers.split(" "):
if signed_header == "(request-target)":
out.append("(request-target): " + method.lower() + " " + path)
elif signed_header == "digest":
out.append("digest: " + body_digest)
else:
out.append(signed_header + ": " + headers[signed_header])
return "\n".join(out)
def _parse_sig_header(val: Optional[str]) -> Optional[Dict[str, str]]:
if not val:
return None
out = {}
for data in val.split(","):
k, v = data.split("=", 1)
out[k] = v[1 : len(v) - 1] # noqa: black conflict
return out
def _verify_h(signed_string, signature, pubkey):
signer = PKCS1_v1_5.new(pubkey)
digest = SHA256.new()
digest.update(signed_string.encode("utf-8"))
return signer.verify(digest, signature)
def _body_digest() -> str:
h = hashlib.new("sha256")
h.update(request.data)
return "SHA-256=" + base64.b64encode(h.digest()).decode("utf-8")
def verify_request(actor_service) -> bool:
hsig = _parse_sig_header(request.headers.get("Signature"))
if not hsig:
logger.debug("no signature in header")
return False
logger.debug(f"hsig={hsig}")
signed_string = _build_signed_string(
hsig["headers"], request.method, request.path, request.headers, _body_digest()
)
_, rk = actor_service.get_public_key(hsig["keyId"])
return _verify_h(signed_string, base64.b64decode(hsig["signature"]), rk)
class HTTPSigAuth(AuthBase):
def __init__(self, keyid, privkey):
self.keyid = keyid
self.privkey = privkey
def __call__(self, r):
logger.info(f"keyid={self.keyid}")
host = urlparse(r.url).netloc
bh = hashlib.new("sha256")
bh.update(r.body.encode("utf-8"))
bodydigest = "SHA-256=" + base64.b64encode(bh.digest()).decode("utf-8")
date = datetime.utcnow().strftime("%a, %d %b %Y %H:%M:%S GMT")
r.headers.update({"Digest": bodydigest, "Date": date})
r.headers.update({"Host": host})
sigheaders = "(request-target) user-agent host date digest content-type"
to_be_signed = _build_signed_string(
sigheaders, r.method, r.path_url, r.headers, bodydigest
)
signer = PKCS1_v1_5.new(self.privkey)
digest = SHA256.new()
digest.update(to_be_signed.encode("utf-8"))
sig = base64.b64encode(signer.sign(digest))
sig = sig.decode("utf-8")
headers = {
"Signature": f'keyId="{self.keyid}",algorithm="rsa-sha256",headers="{sigheaders}",signature="{sig}"'
}
logger.info(f"signed request headers={headers}")
r.headers.update(headers)
return r

Wyświetl plik

@ -0,0 +1,33 @@
from typing import Optional
from typing import Any
from typing import Dict
from Crypto.PublicKey import RSA
class Key(object):
DEFAULT_KEY_SIZE = 2048
def __init__(self, owner: str) -> None:
self.owner = owner
self.privkey_pem: Optional[str] = None
self.pubkey_pem: Optional[str] = None
self.privkey: Optional[Any] = None
def load(self, privkey_pem: str) -> None:
self.privkey_pem = privkey_pem
self.privkey = RSA.importKey(self.privkey_pem)
self.pubkey_pem = self.privkey.publickey().exportKey("PEM").decode("utf-8")
def new(self) -> None:
k = RSA.generate(self.DEFAULT_KEY_SIZE)
self.privkey_pem = k.exportKey("PEM").decode("utf-8")
self.pubkey_pem = k.publickey().exportKey("PEM").decode("utf-8")
self.privkey = k
def to_dict(self) -> Dict[str, Any]:
return {
"id": f"{self.owner}#main-key",
"owner": self.owner,
"publicKeyPem": self.pubkey_pem,
}

Wyświetl plik

@ -0,0 +1,81 @@
from pyld import jsonld
import hashlib
from datetime import datetime
from Crypto.Signature import PKCS1_v1_5
from Crypto.Hash import SHA256
import base64
import typing
from typing import Any
from typing import Dict
if typing.TYPE_CHECKING:
from .key import Key # noqa: type checking
# cache the downloaded "schemas", otherwise the library is super slow
# (https://github.com/digitalbazaar/pyld/issues/70)
_CACHE: Dict[str, Any] = {}
LOADER = jsonld.requests_document_loader()
def _caching_document_loader(url: str) -> Any:
if url in _CACHE:
return _CACHE[url]
resp = LOADER(url)
_CACHE[url] = resp
return resp
jsonld.set_document_loader(_caching_document_loader)
def _options_hash(doc):
doc = dict(doc["signature"])
for k in ["type", "id", "signatureValue"]:
if k in doc:
del doc[k]
doc["@context"] = "https://w3id.org/identity/v1"
normalized = jsonld.normalize(
doc, {"algorithm": "URDNA2015", "format": "application/nquads"}
)
h = hashlib.new("sha256")
h.update(normalized.encode("utf-8"))
return h.hexdigest()
def _doc_hash(doc):
doc = dict(doc)
if "signature" in doc:
del doc["signature"]
normalized = jsonld.normalize(
doc, {"algorithm": "URDNA2015", "format": "application/nquads"}
)
h = hashlib.new("sha256")
h.update(normalized.encode("utf-8"))
return h.hexdigest()
def verify_signature(doc, pubkey):
to_be_signed = _options_hash(doc) + _doc_hash(doc)
signature = doc["signature"]["signatureValue"]
signer = PKCS1_v1_5.new(pubkey)
digest = SHA256.new()
digest.update(to_be_signed.encode("utf-8"))
return signer.verify(digest, base64.b64decode(signature))
def generate_signature(doc, key: "Key"):
options = {
"type": "RsaSignature2017",
"creator": doc["actor"] + "#main-key",
"created": datetime.utcnow().replace(microsecond=0).isoformat() + "Z",
}
doc["signature"] = options
to_be_signed = _options_hash(doc) + _doc_hash(doc)
signer = PKCS1_v1_5.new(key.privkey)
digest = SHA256.new()
digest.update(to_be_signed.encode("utf-8"))
sig = base64.b64encode(signer.sign(digest))
options["signatureValue"] = sig.decode("utf-8")

Wyświetl plik

@ -0,0 +1,76 @@
from urllib.parse import urlparse
from typing import Dict, Any
from typing import Optional
import logging
import requests
from .urlutils import check_url
logger = logging.getLogger(__name__)
def webfinger(resource: str) -> Optional[Dict[str, Any]]:
"""Mastodon-like WebFinger resolution to retrieve the activity stream Actor URL.
"""
logger.info(f"performing webfinger resolution for {resource}")
protos = ["https", "http"]
if resource.startswith("http://"):
protos.reverse()
host = urlparse(resource).netloc
elif resource.startswith("https://"):
host = urlparse(resource).netloc
else:
if resource.startswith("acct:"):
resource = resource[5:]
if resource.startswith("@"):
resource = resource[1:]
_, host = resource.split("@", 1)
resource = "acct:" + resource
# Security check on the url (like not calling localhost)
check_url(f"https://{host}")
for i, proto in enumerate(protos):
try:
url = f"{proto}://{host}/.well-known/webfinger"
# FIXME(tsileo): BACKEND.do_req so we can set a UserAgent
resp = requests.get(url, {"resource": resource})
except requests.ConnectionError:
# If we tried https first and the domain is "http only"
if i == 0:
continue
break
if resp.status_code == 404:
return None
resp.raise_for_status()
return resp.json()
def get_remote_follow_template(resource: str) -> Optional[str]:
data = webfinger(resource)
if data is None:
return None
for link in data["links"]:
if link.get("rel") == "http://ostatus.org/schema/1.0/subscribe":
return link.get("template")
return None
def get_actor_url(resource: str) -> Optional[str]:
"""Mastodon-like WebFinger resolution to retrieve the activity stream Actor URL.
Returns:
the Actor URL or None if the resolution failed.
"""
data = webfinger(resource)
if data is None:
return None
for link in data["links"]:
if (
link.get("rel") == "self"
and link.get("type") == "application/activity+json"
):
return link.get("href")
return None

Wyświetl plik

@ -20,7 +20,7 @@ REQUIRES_PYTHON = ">=3.6.0"
VERSION = None
REQUIRED = ["requests", "markdown", "pyld", "pycryptodome", "html2text"]
REQUIRED = ["requests", "markdown", "bleach", "pyld", "pycryptodome", "html2text"]
DEPENDENCY_LINKS = []

Wyświetl plik

@ -8,9 +8,8 @@ from little_boxes.backend import Backend
import little_boxes.activitypub as ap
# FIXME(tsileo): keeps differents list of each `as_actor`, and uses `as_actor` as first
# arg for everything.
def track_call(f):
"""Method decorator used to track the events fired during tests."""
fname = f.__name__
def wrapper(*args, **kwargs):

Wyświetl plik

@ -375,3 +375,65 @@ def test_little_boxes_follow_and_new_note_to_followers_and_single_actor_dedup():
lambda create: _assert_eq(create.get_object().id, note.id),
),
)
def test_little_boxes_follow_and_new_create_note():
back, f = test_little_boxes_follow()
me = back.get_user("tom")
other = back.get_user("tom2")
outbox = ap.Outbox(other)
note = ap.Note(
to=[ap.AS_PUBLIC], cc=[other.followers], attributedTo=other.id, content="Hello"
)
create = note.build_create()
outbox.post(create)
back.assert_called_methods(
other,
(
"an Create activity is published",
"outbox_new",
lambda as_actor: _assert_eq(as_actor.id, other.id),
lambda activity: _assert_eq(activity.id, create.id),
),
(
'"outbox_create" hook is called',
"outbox_create",
lambda as_actor: _assert_eq(as_actor.id, other.id),
lambda _create: _assert_eq(_create.id, create.id),
),
(
"the Undo activity is posted to the followee",
"post_to_remote_inbox",
lambda as_actor: _assert_eq(as_actor.id, other.id),
lambda payload: None,
lambda recipient: _assert_eq(recipient, me.inbox),
),
)
back.assert_called_methods(
me,
(
"receiving the Undo, ensure we check the actor is not blocked",
"outbox_is_blocked",
lambda as_actor: _assert_eq(as_actor.id, me.id),
lambda remote_actor: _assert_eq(remote_actor, other.id),
),
(
"receiving the Create activity",
"inbox_new",
lambda as_actor: _assert_eq(as_actor.id, me.id),
lambda activity: _assert_eq(activity.id, create.id),
),
(
'"inbox_create" hook is called',
"inbox_create",
lambda as_actor: _assert_eq(as_actor.id, me.id),
lambda _create: _assert_eq(_create.id, create.id),
),
)
return back, create