# bridgy-fed/common.py
# 233 lines, 6.8 KiB, Python

# coding=utf-8
"""Misc common utilities.
"""
import base64
import copy
from datetime import timedelta
import logging
import re
import threading
import urllib.parse
import cachetools
from Crypto.Util import number
from flask import abort, g, make_response, request
from granary import as1, as2, microformats2
import mf2util
from oauth_dropins.webutil import util, webmention
from oauth_dropins.webutil.appengine_info import DEBUG
from oauth_dropins.webutil.util import json_dumps, json_loads
from werkzeug.exceptions import BadRequest
logger = logging.getLogger(__name__)

# Regexp for a domain name. Lets through hostname chars (a-z, 0-9, -) as well
# as arbitrary unicode (eg ☃.net), but rejects specific chars that we'll often
# see in webfinger, AP handles, etc. (@, :)
# https://stackoverflow.com/questions/10306690/what-is-a-regular-expression-which-will-match-a-valid-domain-name-without-a-subd
#
# Ends with $ but deliberately does not start with ^, so that it can be used to
# match just part of a URL path segment, eg for /acct:user.com in webfinger.py.
#
# TODO: preprocess with domain2idna, then narrow this to just [a-z0-9-]
DOMAIN_RE = r'[^/:;@_?!\']+\.[^/:@_?!\']+$'

# NOTE(review): presumably file extensions that should not be treated as TLDs;
# confirm against the callers.
TLD_BLOCKLIST = (
    '7z',
    'asp',
    'aspx',
    'gif',
    'html',
    'ico',
    'jpg',
    'jpeg',
    'js',
    'json',
    'php',
    'png',
    'rar',
    'txt',
    'yaml',
    'yml',
    'zip',
)

CONTENT_TYPE_HTML = 'text/html; charset=utf-8'

PRIMARY_DOMAIN = 'fed.brid.gy'
# protocol-specific subdomains live under this "super"domain
SUPERDOMAIN = '.brid.gy'

# TODO: add a Flask route decorator version of util.canonicalize_domain, then
# use it to canonicalize most UI routes from these to fed.brid.gy.
OTHER_DOMAINS = (
    'ap.brid.gy',
    'atp.brid.gy',
    'bluesky.brid.gy',
    'bsky.brid.gy',
    'bridgy-federated.appspot.com',
    'bridgy-federated.uc.r.appspot.com',
    'nostr.brid.gy',
    'web.brid.gy',
)
LOCAL_DOMAINS = (
    'localhost',
    'localhost:8080',
    'my.dev.com:8080',
)
# every domain listed above: primary first, then the others, then local ones
DOMAINS = (PRIMARY_DOMAIN, *OTHER_DOMAINS, *LOCAL_DOMAINS)

# TODO: unify with Bridgy's
DOMAIN_BLOCKLIST = frozenset([
    # https://github.com/snarfed/bridgy-fed/issues/348
    'aaronparecki.com',
    'facebook.com',
    'fb.com',
    't.co',
    'twitter.com',
    # all of our own domains are blocklisted too
    *DOMAINS,
])

CACHE_TIME = timedelta(minutes=1)
class NoMicroformats(BadRequest):
    """Signals that a page has no microformats2.

    Raised by :meth:`Web.fetch`. Subclasses werkzeug's ``BadRequest``.
    """
def base64_to_long(x):
    """Decodes a URL safe base64 value into a long integer.

    Originally from django_salmon.magicsigs. Used in :meth:`User.public_pem`
    and :meth:`User.private_pem`.

    Args:
      x: URL safe base64 encoded value

    Returns: int
    """
    raw = base64.urlsafe_b64decode(x)
    return number.bytes_to_long(raw)
def long_to_base64(x):
    """Encodes a long integer as URL safe base64.

    Originally from django_salmon.magicsigs. Used in :meth:`User.get_or_create`.

    Args:
      x: int

    Returns: bytes, URL safe base64 encoding of x's byte representation
    """
    raw = number.long_to_bytes(x)
    return base64.urlsafe_b64encode(raw)
def host_url(path_query=None):
    """Returns a URL on the current request's host, or on our primary domain.

    The base is normally ``request.host_url``. It's replaced with
    ``https://PRIMARY_DOMAIN`` when ``util.domain_or_parent_in`` matches the
    request's host against ``OTHER_DOMAINS``, or when ``DEBUG`` is off and the
    host is one of ``LOCAL_DOMAINS``.

    Args:
      path_query: str, optional path and query to join onto the base URL

    Returns: str
    """
    host = request.host
    use_primary = (
        util.domain_or_parent_in(host, OTHER_DOMAINS)
        # when running locally against prod datastore
        or (not DEBUG and host in LOCAL_DOMAINS)
    )
    base = f'https://{PRIMARY_DOMAIN}' if use_primary else request.host_url
    return urllib.parse.urljoin(base, path_query)
def error(msg, status=400, exc_info=None, **kwargs):
    """Logs an error and aborts the request with a JSON body.

    Like flask_util.error, but wraps body in JSON, ie ``{'error': msg}``.

    Args:
      msg: str, error message
      status: int, HTTP status code
      exc_info: passed through to the logging call
      kwargs: passed through to :func:`flask.abort`
    """
    logger.info(f'Returning {status}: {msg}', exc_info=exc_info)
    json_response = make_response({'error': msg}, status)
    abort(status, response=json_response, **kwargs)
def pretty_link(url, text=None, **kwargs):
    """Wrapper around util.pretty_link() that converts Mastodon user URLs to @-@.

    Eg for URLs like https://mastodon.social/@foo and
    https://mastodon.social/users/foo, the link text defaults to
    @foo@mastodon.social if text isn't provided.

    If ``g.user`` is set and its ``is_web_url(url)`` is truthy, returns
    ``g.user.user_page_link()`` instead.

    Args:
      url: str
      text: str
      kwargs: passed through to :func:`webutil.util.pretty_link`
    """
    user = g.user
    if user and user.is_web_url(url):
        return user.user_page_link()

    # only derive a handle when the caller didn't supply link text
    if text is None and (
            handle := re.match(r'https?://([^/]+)/(@|users/)([^/]+)$', url)):
        text = handle.expand(r'@\3@\1')

    return util.pretty_link(url, text=text, **kwargs)
def content_type(resp):
    """Returns a :class:`requests.Response`'s Content-Type, without charset suffix.

    Everything from the first ``;`` on is dropped, and surrounding whitespace
    is stripped, so ``text/html ; charset=utf-8`` yields ``text/html``.

    Args:
      resp: object with a ``headers`` mapping, eg :class:`requests.Response`

    Returns: str, or None if the header is missing or empty
    """
    header = resp.headers.get('Content-Type')
    if not header:
        return None

    # optional whitespace is allowed around the ; that separates parameters
    return header.split(';')[0].strip()
def remove_blocklisted(urls):
    """Filters out URLs whose domains are in our domain blocklist.

    Each URL's domain comes from ``util.domain_from_link`` and is checked
    against ``DOMAIN_BLOCKLIST`` with ``util.domain_or_parent_in``.

    Args:
      urls: sequence of str

    Returns: list of str, the input URLs that aren't blocklisted, in order
    """
    allowed = []
    for url in urls:
        domain = util.domain_from_link(url)
        if not util.domain_or_parent_in(domain, DOMAIN_BLOCKLIST):
            allowed.append(url)
    return allowed
def redirect_wrap(url):
    """Returns a URL on our domain that redirects to this URL.

    ...to satisfy Mastodon's non-standard domain matching requirement. :(

    * https://github.com/snarfed/bridgy-fed/issues/16#issuecomment-424799599
    * https://github.com/tootsuite/mastodon/pull/6219#issuecomment-429142747

    Falsy values, and URLs that already start with :func:`host_url`, are
    returned unchanged.

    Args:
      url: string

    Returns: string, redirect url
    """
    if not url:
        return url

    if url.startswith(host_url()):
        # already on our domain
        return url

    wrapper = host_url('/r/')
    return wrapper + url
def redirect_unwrap(val):
    """Removes our redirect wrapping from a URL, if it's there.

    val may be a string, dict, or list. dicts and lists are unwrapped
    recursively; dict keys are left alone and only values are unwrapped.

    Two string forms are unwrapped:

    * ``[host_url]/r/[url]`` becomes ``[url]``, if ``util.is_web`` accepts it
    * ``[host_url][domain]``, where the remainder matches ``DOMAIN_RE``,
      becomes ``https://[domain]/``

    Strings that aren't wrapped URLs, and values of any other type, are
    returned unchanged.

    Args:
      val: string or dict or list

    Returns: same type as val, with wrapped URLs unwrapped
    """
    if isinstance(val, dict):
        return {key: redirect_unwrap(item) for key, item in val.items()}

    if isinstance(val, list):
        return [redirect_unwrap(item) for item in val]

    if not isinstance(val, str):
        return val

    wrapped_prefix = host_url('/r/')
    if val.startswith(wrapped_prefix):
        target = val.removeprefix(wrapped_prefix)
        return target if util.is_web(target) else val

    if val.startswith(host_url()):
        remainder = val.removeprefix(host_url())
        if re.match(DOMAIN_RE, remainder):
            return f'https://{remainder}/'

    return val
def webmention_endpoint_cache_key(url, **kwargs):
    """Returns cache key for a cached webmention endpoint for a given URL.

    Just the domain by default. If the URL is the home page, ie path is / or
    empty, the key includes a / at the end, so that we cache webmention
    endpoints for home pages separate from other pages.
    https://github.com/snarfed/bridgy/issues/701

    Example: 'snarfed.org /'

    https://github.com/snarfed/bridgy-fed/issues/423

    Adapted from bridgy/util.py.

    Args:
      url: str
      kwargs: ignored. Accepted because cachetools calls the key function with
        all of the cached function's arguments, and
        :func:`webmention_discover` takes keyword arguments.

    Returns: str
    """
    parsed = urllib.parse.urlparse(url)
    if parsed.path in ('', '/'):
        return f'{parsed.netloc} /'

    return parsed.netloc
@cachetools.cached(
    cache=cachetools.TTLCache(maxsize=50000, ttl=2 * 60 * 60),  # 2h expiration
    key=webmention_endpoint_cache_key,
    lock=threading.Lock(),
    info=True,
)
def webmention_discover(url, **kwargs):
    """Thin caching wrapper around webutil's :func:`webmention.discover`.

    Results are cached for two hours, keyed by
    :func:`webmention_endpoint_cache_key`.

    Args:
      url: str
      kwargs: passed through to :func:`webmention.discover`
    """
    return webmention.discover(url, **kwargs)