# Mirror of https://github.com/snarfed/bridgy-fed (208 lines, 5.9 KiB, Python)
# coding=utf-8
"""Misc common utilities.
"""
|
|
import copy
|
|
from datetime import timedelta
|
|
import logging
|
|
import re
|
|
import urllib.parse
|
|
|
|
from flask import abort, make_response, request
|
|
from granary import as1, as2, microformats2
|
|
import mf2util
|
|
from oauth_dropins.webutil import util
|
|
from oauth_dropins.webutil.appengine_info import DEBUG
|
|
from oauth_dropins.webutil.util import json_dumps, json_loads
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Loose pattern for a bare domain: two runs of characters other than "/" and
# ":" joined by a literal dot. Not anchored; callers choose match vs fullmatch.
DOMAIN_RE = r'[^/:]+\.[^/:]+'
TLD_BLOCKLIST = (
    '7z', 'asp', 'aspx', 'gif', 'html', 'ico', 'jpg', 'jpeg', 'js', 'json',
    'php', 'png', 'rar', 'txt', 'yaml', 'yml', 'zip',
)

CONTENT_TYPE_HTML = 'text/html; charset=utf-8'

# Domains this app is served from. host_url() rewrites OTHER_DOMAINS (and, when
# DEBUG is off, LOCAL_DOMAINS) to PRIMARY_DOMAIN.
PRIMARY_DOMAIN = 'fed.brid.gy'
OTHER_DOMAINS = (
    'bridgy-federated.appspot.com',
    'bridgy-federated.uc.r.appspot.com',
)
LOCAL_DOMAINS = (
    'localhost',
    'localhost:8080',
    'my.dev.com:8080',
)
DOMAINS = (PRIMARY_DOMAIN, *OTHER_DOMAINS, *LOCAL_DOMAINS)

# TODO: unify with Bridgy's
DOMAIN_BLOCKLIST = frozenset((
    # https://github.com/snarfed/bridgy-fed/issues/348
    'aaronparecki.com',
    'facebook.com',
    'fb.com',
    't.co',
    'twitter.com',
    # our own domains are blocklisted too
    *DOMAINS,
))

CACHE_TIME = timedelta(seconds=60)
|
|
|
|
|
|
def host_url(path_query=None):
    """Returns an absolute URL on this app for the given path and query.

    The base is normally the current request's host URL. It is replaced with
    ``https://PRIMARY_DOMAIN`` when the request host matches ``OTHER_DOMAINS``
    (as decided by ``util.domain_or_parent_in``), or when the request host is
    in ``LOCAL_DOMAINS`` while ``DEBUG`` is off.

    Args:
      path_query: str, optional, joined onto the base with
        :func:`urllib.parse.urljoin`

    Returns: str, URL
    """
    host = request.host
    use_primary = (
        util.domain_or_parent_in(host, OTHER_DOMAINS)
        # when running locally against prod datastore
        or (not DEBUG and host in LOCAL_DOMAINS)
    )
    base = f'https://{PRIMARY_DOMAIN}' if use_primary else request.host_url
    return urllib.parse.urljoin(base, path_query)
|
|
|
|
|
|
def error(msg, status=400):
    """Like flask_util.error, but wraps body in JSON.

    Logs the status and message, then calls flask's ``abort`` with a response
    whose body is ``{'error': msg}``.

    Args:
      msg: str, error message
      status: int, HTTP status code, defaults to 400
    """
    logger.info(f'Returning {status}: {msg}')
    body = {'error': msg}
    resp = make_response(body, status)
    abort(status, response=resp)
|
|
|
|
|
|
def pretty_link(url, text=None, user=None):
    """Wrapper around util.pretty_link() that converts Mastodon user URLs to @-@.

    Eg for URLs like https://mastodon.social/@foo and
    https://mastodon.social/users/foo, defaults text to @foo@mastodon.social if
    it's not provided.

    If ``user`` is provided and ``url`` is their home page, returns the user's
    own page link instead.

    Args:
      url: str
      text: str
      user: :class:`User`, optional, user for the current request

    Returns: whatever ``user.user_page_link()`` or ``util.pretty_link()`` returns
    """
    if user and user.is_homepage(url):
        return user.user_page_link()

    if text is None:
        profile = re.match(r'https?://([^/]+)/(@|users/)([^/]+)$', url)
        if profile:
            # groups are: host, "@" or "users/" separator, username
            host, _, username = profile.groups()
            text = f'@{username}@{host}'

    return util.pretty_link(url, text=text)
|
|
|
|
|
|
def content_type(resp):
    """Returns a :class:`requests.Response`'s Content-Type, without charset suffix.

    Everything from the first ``;`` onward (eg ``; charset=utf-8``) is dropped,
    and whitespace around the remaining media type is stripped, so that
    ``text/html ; charset=utf-8`` yields ``text/html``.

    Args:
      resp: :class:`requests.Response`, or any object with a dict-like
        ``headers`` attribute

    Returns: str media type, or None if the header is missing or empty
    """
    # named `header`, not `type`, to avoid shadowing the builtin
    header = resp.headers.get('Content-Type')
    if header:
        return header.split(';')[0].strip()

    return None
|
|
|
|
|
|
def remove_blocklisted(urls):
    """Filters out URLs whose domain is in our domain blocklist.

    A URL is dropped when ``util.domain_or_parent_in`` reports that its domain
    (from ``util.domain_from_link``) matches ``DOMAIN_BLOCKLIST``.

    Args:
      urls: sequence of str

    Returns: list of str, the remaining URLs in their original order
    """
    def is_blocked(url):
        domain = util.domain_from_link(url)
        return util.domain_or_parent_in(domain, DOMAIN_BLOCKLIST)

    return [url for url in urls if not is_blocked(url)]
|
|
|
|
|
|
def redirect_wrap(url):
    """Returns a URL on our domain that redirects to this URL.

    ...to satisfy Mastodon's non-standard domain matching requirement. :(

    Falsy input and URLs that already start with our ``/r/`` prefix are
    returned unchanged.

    https://github.com/snarfed/bridgy-fed/issues/16#issuecomment-424799599
    https://github.com/tootsuite/mastodon/pull/6219#issuecomment-429142747

    Args:
      url: string

    Returns: string, redirect url
    """
    if url:
        redirect_base = host_url('/r/')
        if not url.startswith(redirect_base):
            return redirect_base + url

    return url
|
|
|
|
|
|
def redirect_unwrap(val):
    """Removes our redirect wrapping from a URL, if it's there.

    val may be a string, dict, or list. dicts and lists are unwrapped
    recursively; dict keys are left as is. Values of any other type, and
    strings that aren't wrapped URLs, are returned unchanged.

    Two string forms are unwrapped:

    * ``[host_url]/r/[URL]`` becomes ``[URL]``, if ``util.is_web`` accepts it
    * ``[host_url]/[domain]`` becomes ``https://[domain]/``

    Args:
      val: string or dict or list

    Returns: same type as val, with wrapped URLs replaced by their targets
    """
    if isinstance(val, dict):
        return {key: redirect_unwrap(inner) for key, inner in val.items()}

    if isinstance(val, list):
        return [redirect_unwrap(item) for item in val]

    if not isinstance(val, str):
        return val

    redirect_base = host_url('/r/')
    if val.startswith(redirect_base):
        target = val[len(redirect_base):]
        return target if util.is_web(target) else val

    root = host_url()
    if val.startswith(root):
        rest = val[len(root):]
        # re.match only anchors at the start, so rest may continue past the
        # part that DOMAIN_RE matches
        if re.match(DOMAIN_RE, rest):
            return f'https://{rest}/'

    return val
|
|
|
|
|
|
def actor(user):
    """Fetches a home page, converts its representative h-card to AS2 actor.

    Calls :func:`error` if the home page can't be fetched (``ValueError`` from
    ``util.fetch_mf2``) or if it has no representative h-card.

    Args:
      user: :class:`User`

    Returns: (dict mf2 item, dict AS1 actor, dict AS2 actor)
    """
    assert user

    domain = user.key.id()
    try:
        mf2 = util.fetch_mf2(user.homepage, gateway=True)
    except ValueError as e:
        error(f"Couldn't fetch {user.homepage}: {e}")

    page_url = mf2['url']
    hcard = mf2util.representative_hcard(mf2, page_url)
    logger.info(f'Representative h-card: {json_dumps(hcard, indent=2)}')
    if not hcard:
        error(f"Couldn't find a representative h-card (http://microformats.org/wiki/representative-hcard-parsing) on {page_url}")

    actor_as1 = microformats2.json_to_object(hcard, rel_urls=mf2.get('rel-urls'))

    # TODO: fix circular dependency
    import activitypub
    actor_as2 = activitypub.postprocess_as2(as2.from_as1(actor_as1), user=user)

    # TODO: unify with activitypub.actor()
    overrides = {
        'id': host_url(domain),
        # This has to be the domain for Mastodon etc interop! It seems like it
        # should be the custom username from the acct: u-url in their h-card,
        # but that breaks Mastodon's Webfinger discovery. Background:
        # https://github.com/snarfed/bridgy-fed/issues/302#issuecomment-1324305460
        # https://github.com/snarfed/bridgy-fed/issues/77
        'preferredUsername': domain,
        # per-user collection endpoints, all at [domain]/[name] on our host
        **{name: host_url(f'{domain}/{name}')
           for name in ('inbox', 'outbox', 'following', 'followers')},
        'endpoints': {
            'sharedInbox': host_url('inbox'),
        },
    }
    actor_as2.update(overrides)

    logger.info(f'Generated AS2 actor: {json_dumps(actor_as2, indent=2)}')
    return hcard, actor_as1, actor_as2
|