flask: first pass at porting webfinger; incomplete!

incomplete because I also need to port webutil.handlers.XrdOrJrdHandler
flask
Ryan Barrett 2021-07-11 13:39:19 -07:00
rodzic 4fffc073d2
commit 007f8f16fd
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 6BE31FDF4776E9D4
3 zmienionych plików z 180 dodań i 181 usunięć

23
app.py
Wyświetl plik

@ -1,6 +1,10 @@
"""Main Flask application."""
import logging
from flask import Flask
from flask_caching import Cache
from werkzeug.exceptions import HTTPException
from oauth_dropins.webutil import appengine_info, appengine_config, handlers, util
app = Flask('bridgy-fed')
@ -18,12 +22,17 @@ cache = Cache(app)
@app.errorhandler(Exception)
def handle_exception(e):
"""A Flask error handler that propagates HTTP exceptions into the response."""
code, body = util.interpret_http_exception(e)
if code:
return ((f'Upstream server request failed: {e}' if code in ('502', '504')
else f'HTTP Error {code}: {body}'),
int(code))
return e
"""A Flask error handler that propagates HTTP exceptions into the response."""
code, body = util.interpret_http_exception(e)
if code:
return ((f'Upstream server request failed: {e}' if code in ('502', '504')
else f'HTTP Error {code}: {body}'),
int(code))
logging.error(f'{e.__class__}: {e}')
if isinstance(e, HTTPException):
return e
else:
raise e
import activitypub, add_webmention, logs, redirect, render, salmon, superfeedr, webfinger, webmention

Wyświetl plik

@ -13,19 +13,21 @@ from oauth_dropins.webutil.testutil import requests_response
from oauth_dropins.webutil.util import json_loads
import requests
from app import application
from app import app, cache
import common
import models
from webfinger import UserHandler, WebfingerHandler
from . import testutil
client = app.test_client()
class WebfingerTest(testutil.TestCase):
def setUp(self):
super(WebfingerTest, self).setUp()
UserHandler.get.cache_clear()
WebfingerHandler.get.cache_clear()
app.testing = True
cache.clear()
self.html = """
<body class="h-card">
<a class="u-url" rel="me" href="/about-me">
@ -82,45 +84,41 @@ class WebfingerTest(testutil.TestCase):
}
def test_host_meta_handler_xrd(self):
got = application.get_response('/.well-known/host-meta')
self.assertEqual(200, got.status_int)
got = client.get('/.well-known/host-meta')
self.assertEqual(200, got.status_code)
self.assertEqual('application/xrd+xml; charset=utf-8',
got.headers['Content-Type'])
body = got.body.decode()
self.assertTrue(body.startswith('<?xml'), body)
self.assertTrue(body.startswith('<?xml'), got.get_data(as_text=True))
def test_host_meta_handler_xrds(self):
got = application.get_response('/.well-known/host-meta.xrds')
self.assertEqual(200, got.status_int)
got = client.get('/.well-known/host-meta.xrds')
self.assertEqual(200, got.status_code)
self.assertEqual('application/xrds+xml; charset=utf-8',
got.headers['Content-Type'])
body = got.body.decode()
self.assertTrue(body.startswith('<XRDS'), body)
self.assertTrue(body.startswith('<XRDS'), got.get_data(as_text=True))
def test_host_meta_handler_jrd(self):
got = application.get_response('/.well-known/host-meta.json')
self.assertEqual(200, got.status_int)
got = client.get('/.well-known/host-meta.json')
self.assertEqual(200, got.status_code)
self.assertEqual('application/jrd+json; charset=utf-8',
got.headers['Content-Type'])
body = got.body.decode()
self.assertTrue(body.startswith('{'), body)
self.assertTrue(body.startswith('{'), got.get_data(as_text=True))
@mock.patch('requests.get')
def test_user_handler(self, mock_get):
mock_get.return_value = requests_response(self.html, url = 'https://foo.com/')
mock_get.return_value = requests_response(self.html, url='https://foo.com/')
got = application.get_response('/acct:foo.com',
headers={'Accept': 'application/json'})
self.assertEqual(200, got.status_int)
got = client.get('/acct:foo.com', headers={'Accept': 'application/json'})
self.assertEqual(200, got.status_code)
self.assertEqual('application/jrd+json; charset=utf-8',
got.headers['Content-Type'])
mock_get.assert_called_once_with('http://foo.com/', headers=common.HEADERS,
stream=True, timeout=util.HTTP_TIMEOUT)
self.assertEqual(self.expected_webfinger, json_loads(got.body.decode()))
self.assertEqual(self.expected_webfinger, got.json)
# check that magic key is persistent
again = json_loads(application.get_response(
again = json_loads(client.get(
'/acct:foo.com', headers={'Accept': 'application/json'}).body.decode())
self.assertEqual(self.key.href(), again['magic_keys'][0]['value'])
@ -139,14 +137,13 @@ class WebfingerTest(testutil.TestCase):
""" + self.html
mock_get.return_value = requests_response(html, url = 'https://foo.com/')
got = application.get_response('/acct:foo.com',
headers={'Accept': 'application/json'})
self.assertEqual(200, got.status_int)
got = client.get('/acct:foo.com', headers={'Accept': 'application/json'})
self.assertEqual(200, got.status_code)
self.assertIn({
'rel': 'http://schemas.google.com/g/2010#updates-from',
'type': 'application/atom+xml',
'href': 'https://foo.com/use-this',
}, json_loads(got.body.decode())['links'])
}, got.json['links'])
@mock.patch('requests.get')
def test_user_handler_with_push_header(self, mock_get):
@ -157,13 +154,12 @@ class WebfingerTest(testutil.TestCase):
'<http://a.custom.hub/>; rel="hub"',
})
got = application.get_response('/acct:foo.com',
headers={'Accept': 'application/json'})
self.assertEqual(200, got.status_int)
got = client.get('/acct:foo.com', headers={'Accept': 'application/json'})
self.assertEqual(200, got.status_code)
self.assertIn({
'rel': 'hub',
'href': 'http://a.custom.hub/',
}, json_loads(got.body.decode())['links'])
}, got.json['links'])
@mock.patch('requests.get')
def test_user_handler_no_hcard(self, mock_get):
@ -174,16 +170,16 @@ class WebfingerTest(testutil.TestCase):
</div>
</body>
""")
got = application.get_response('/acct:foo.com')
got = client.get('/acct:foo.com')
mock_get.assert_called_once_with('http://foo.com/', headers=common.HEADERS,
stream=True, timeout=util.HTTP_TIMEOUT)
self.assertEqual(400, got.status_int)
self.assertIn('representative h-card', got.body.decode())
self.assertEqual(400, got.status_code)
self.assertIn('representative h-card', got.get_data(as_text=True))
def test_user_handler_bad_tld(self):
got = application.get_response('/acct:foo.json')
self.assertEqual(404, got.status_int)
self.assertIn("doesn't look like a domain", got.body.decode())
got = client.get('/acct:foo.json')
self.assertEqual(404, got.status_code)
self.assertIn("doesn't look like a domain", got.get_data(as_text=True))
@mock.patch('requests.get')
def test_webfinger_handler(self, mock_get):
@ -193,12 +189,11 @@ class WebfingerTest(testutil.TestCase):
'foo.com', 'http://foo.com/', 'https://foo.com/'):
url = '/.well-known/webfinger?%s' % urllib.parse.urlencode(
{'resource': resource})
got = application.get_response(url, headers={'Accept': 'application/json'})
body = got.body.decode()
self.assertEqual(200, got.status_int, body)
got = client.get(url, headers={'Accept': 'application/json'})
self.assertEqual(200, got.status_code, got.get_data(as_text=True))
self.assertEqual('application/jrd+json; charset=utf-8',
got.headers['Content-Type'])
self.assertEqual(self.expected_webfinger, json_loads(body))
self.assertEqual(self.expected_webfinger, got.json)
@mock.patch('requests.get')
def test_webfinger_handler_custom_username(self, mock_get):
@ -238,9 +233,8 @@ class WebfingerTest(testutil.TestCase):
):
url = '/.well-known/webfinger?%s' % urllib.parse.urlencode(
{'resource': resource})
got = application.get_response(url, headers={'Accept': 'application/json'})
body = got.body.decode()
self.assertEqual(200, got.status_int, body)
got = client.get(url, headers={'Accept': 'application/json'})
self.assertEqual(200, got.status_code, got.get_data(as_text=True))
self.assertEqual('application/jrd+json; charset=utf-8',
got.headers['Content-Type'])
self.assertEqual(self.expected_webfinger, json_loads(body))

Wyświetl plik

@ -7,146 +7,149 @@ Largely based on webfinger-unofficial/user.py.
"""
import datetime
import logging
import urllib.parse
import re
import urllib.parse
from flask import render_template, request
from granary.microformats2 import get_text
import mf2util
from oauth_dropins.webutil import handlers, util
from oauth_dropins.webutil.util import json_dumps
import webapp2
from app import app, cache
import common
from common import error
import models
CACHE_TIME = datetime.timedelta(seconds=15)
NON_TLDS = frozenset(('html', 'json', 'php', 'xml'))
class UserHandler(handlers.XrdOrJrdHandler):
@app.route('/acct:<string:domain>')
@cache.cached(
CACHE_TIME.total_seconds(),
make_cache_key=lambda domain: f'{request.path} {request.headers.get("Accept")}')
def user(domain):
"""Fetches a site's home page, converts its mf2 to WebFinger, and serves."""
JRD_TEMPLATE = False
return _user(domain, None)
@handlers.cache_response(CACHE_TIME, headers=['Accept'])
def get(self, *args, **kwargs):
logging.debug(f'Headers: {list(self.request.headers.items())}')
return super(UserHandler, self).get(*args, **kwargs)
def template_prefix(self):
return 'templates/webfinger_user'
def _user(domain, url):
if not re.match(common.DOMAIN_RE, domain):
return error(f'{domain} is not a domain', status=404)
def template_vars(self, domain, url=None):
assert domain
logging.debug(f'Headers: {list(request.headers.items())}')
if domain.split('.')[-1] in NON_TLDS:
self.error("%s doesn't look like a domain" % domain, status=404)
if domain.split('.')[-1] in NON_TLDS:
return error(f"{domain} doesn't look like a domain", status=404)
# find representative h-card. try url, then url's home page, then domain
urls = ['http://%s/' % domain]
if url:
urls = [url, urllib.parse.urljoin(url, '/')] + urls
# find representative h-card. try url, then url's home page, then domain
urls = [f'http://{domain}/']
if url:
urls = [url, urllib.parse.urljoin(url, '/')] + urls
for candidate in urls:
resp = common.requests_get(candidate)
parsed = util.parse_html(resp)
mf2 = util.parse_mf2(parsed, url=resp.url)
# logging.debug('Parsed mf2 for %s: %s', resp.url, json_dumps(mf2, indent=2))
hcard = mf2util.representative_hcard(mf2, resp.url)
if hcard:
logging.info('Representative h-card: %s', json_dumps(hcard, indent=2))
for candidate in urls:
resp = common.requests_get(candidate)
parsed = util.parse_html(resp)
mf2 = util.parse_mf2(parsed, url=resp.url)
# logging.debug('Parsed mf2 for %s: %s', resp.url, json_dumps(mf2, indent=2))
hcard = mf2util.representative_hcard(mf2, resp.url)
if hcard:
logging.info(f'Representative h-card: {json_dumps(hcard, indent=2)}')
break
else:
return error(f"didn't find a representative h-card (http://microformats.org/wiki/representative-hcard-parsing) on {resp.url}")
logging.info(f'Generating WebFinger data for {domain}')
key = models.MagicKey.get_or_create(domain)
props = hcard.get('properties', {})
urls = util.dedupe_urls(props.get('url', []) + [resp.url])
canonical_url = urls[0]
acct = f'{domain}@{domain}'
for url in urls:
if url.startswith('acct:'):
urluser, urldomain = util.parse_acct_uri(url)
if urldomain == domain:
acct = f'{urluser}@{domain}'
logging.info(f'Found custom username: acct:{acct}')
break
else:
self.error("""\
Couldn't find a representative h-card (http://microformats.org/wiki/representative-hcard-parsing) on %s""" % resp.url)
logging.info('Generating WebFinger data for %s', domain)
key = models.MagicKey.get_or_create(domain)
props = hcard.get('properties', {})
urls = util.dedupe_urls(props.get('url', []) + [resp.url])
canonical_url = urls[0]
acct = '%s@%s' % (domain, domain)
for url in urls:
if url.startswith('acct:'):
urluser, urldomain = util.parse_acct_uri(url)
if urldomain == domain:
acct = '%s@%s' % (urluser, domain)
logging.info('Found custom username: acct:%s', acct)
break
# discover atom feed, if any
atom = parsed.find('link', rel='alternate', type=common.CONTENT_TYPE_ATOM)
if atom and atom['href']:
atom = urllib.parse.urljoin(resp.url, atom['href'])
else:
atom = 'https://granary.io/url?' + urllib.parse.urlencode({
'input': 'html',
'output': 'atom',
'url': resp.url,
'hub': resp.url,
})
# discover PuSH, if any
for link in resp.headers.get('Link', '').split(','):
match = common.LINK_HEADER_RE.match(link)
if match and match.group(2) == 'hub':
hub = match.group(1)
else:
hub = 'https://bridgy-fed.superfeedr.com/'
# generate webfinger content
data = util.trim_nulls({
'subject': 'acct:' + acct,
'aliases': urls,
'magic_keys': [{'value': key.href()}],
'links': sum(([{
'rel': 'http://webfinger.net/rel/profile-page',
'type': 'text/html',
'href': url,
}] for url in urls if url.startswith("http")), []) + [{
'rel': 'http://webfinger.net/rel/avatar',
'href': get_text(url),
} for url in props.get('photo', [])] + [{
'rel': 'canonical_uri',
'type': 'text/html',
'href': canonical_url,
},
# ActivityPub
{
'rel': 'self',
'type': common.CONTENT_TYPE_AS2,
# WARNING: in python 2 sometimes request.host_url lost port,
# http://localhost:8080 would become just http://localhost. no
# clue how or why. pay attention here if that happens again.
'href': f'{self.request.host_url}{domain}',
}, {
'rel': 'inbox',
'type': common.CONTENT_TYPE_AS2,
'href': f'{self.request.host_url}{domain}/inbox',
},
# OStatus
{
'rel': 'http://schemas.google.com/g/2010#updates-from',
'type': common.CONTENT_TYPE_ATOM,
'href': atom,
}, {
'rel': 'hub',
'href': hub,
}, {
'rel': 'magic-public-key',
'href': key.href(),
}, {
'rel': 'salmon',
'href': f'{self.request.host_url}{domain}/salmon',
}]
# discover atom feed, if any
atom = parsed.find('link', rel='alternate', type=common.CONTENT_TYPE_ATOM)
if atom and atom['href']:
atom = urllib.parse.urljoin(resp.url, atom['href'])
else:
atom = 'https://granary.io/url?' + urllib.parse.urlencode({
'input': 'html',
'output': 'atom',
'url': resp.url,
'hub': resp.url,
})
logging.info('Returning WebFinger data: %s', json_dumps(data, indent=2))
return data
# discover PuSH, if any
for link in resp.headers.get('Link', '').split(','):
match = common.LINK_HEADER_RE.match(link)
if match and match.group(2) == 'hub':
hub = match.group(1)
else:
hub = 'https://bridgy-fed.superfeedr.com/'
# generate webfinger content
data = util.trim_nulls({
'subject': 'acct:' + acct,
'aliases': urls,
'magic_keys': [{'value': key.href()}],
'links': sum(([{
'rel': 'http://webfinger.net/rel/profile-page',
'type': 'text/html',
'href': url,
}] for url in urls if url.startswith("http")), []) + [{
'rel': 'http://webfinger.net/rel/avatar',
'href': get_text(url),
} for url in props.get('photo', [])] + [{
'rel': 'canonical_uri',
'type': 'text/html',
'href': canonical_url,
},
# ActivityPub
{
'rel': 'self',
'type': common.CONTENT_TYPE_AS2,
# WARNING: in python 2 sometimes request.host_url lost port,
# http://localhost:8080 would become just http://localhost. no
# clue how or why. pay attention here if that happens again.
'href': f'{request.host_url}{domain}',
}, {
'rel': 'inbox',
'type': common.CONTENT_TYPE_AS2,
'href': f'{request.host_url}{domain}/inbox',
},
# OStatus
{
'rel': 'http://schemas.google.com/g/2010#updates-from',
'type': common.CONTENT_TYPE_ATOM,
'href': atom,
}, {
'rel': 'hub',
'href': hub,
}, {
'rel': 'magic-public-key',
'href': key.href(),
}, {
'rel': 'salmon',
'href': f'{request.host_url}{domain}/salmon',
}]
})
logging.info(f'Returning WebFinger data: {json_dumps(data, indent=2)}')
return render_template('webfinger_user.xrd', **data)
class WebfingerHandler(UserHandler):
@app.route('/.well-known/webfinger')
def webfinger():
"""Handles Webfinger requests.
https://webfinger.net/
@ -154,23 +157,16 @@ class WebfingerHandler(UserHandler):
Supports both JRD and XRD; defaults to JRD.
https://tools.ietf.org/html/rfc7033#section-4
"""
def template_vars(self):
resource = common.get_required_param('resource')
try:
user, domain = util.parse_acct_uri(resource)
if domain in common.DOMAINS:
domain = user
except ValueError:
domain = urllib.parse.urlparse(resource).netloc or resource
resource = common.get_required_param('resource')
try:
user, domain = util.parse_acct_uri(resource)
if domain in common.DOMAINS:
domain = user
except ValueError:
domain = urllib.parse.urlparse(resource).netloc or resource
url = None
if resource.startswith('http://') or resource.startswith('https://'):
url = resource
url = None
if resource.startswith('http://') or resource.startswith('https://'):
url = resource
return super(WebfingerHandler, self).template_vars(domain, url=url)
ROUTES = [
(r'/acct:%s/?' % common.DOMAIN_RE, UserHandler),
('/.well-known/webfinger', WebfingerHandler),
] + handlers.HOST_META_ROUTES
return _user(domain, url)