kopia lustrzana https://github.com/snarfed/bridgy-fed
rodzic
4941f14fed
commit
4f232ac732
67
redirect.py
67
redirect.py
|
@ -25,6 +25,7 @@ import activitypub
|
|||
from flask_app import app, cache
|
||||
from common import CACHE_TIME, CONTENT_TYPE_HTML
|
||||
from models import Object, User
|
||||
from webmention import Webmention
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
@ -38,9 +39,11 @@ _negotiator = ContentNegotiator(acceptable=[
|
|||
@app.get(r'/r/<path:to>')
|
||||
@flask_util.cached(cache, CACHE_TIME, headers=['Accept'])
|
||||
def redir(to):
|
||||
"""301 redirect to the embedded fully qualified URL.
|
||||
"""Either redirect to a given URL or convert it to another format.
|
||||
|
||||
e.g. redirects /r/https://foo.com/bar?baz to https://foo.com/bar?baz
|
||||
E.g. redirects /r/https://foo.com/bar?baz to https://foo.com/bar?baz, or if
|
||||
it's requested with AS2 conneg in the Accept header, fetches and converts
|
||||
and serves it as AS2.
|
||||
"""
|
||||
if request.args:
|
||||
to += '?' + urllib.parse.urlencode(request.args)
|
||||
|
@ -51,20 +54,10 @@ def redir(to):
|
|||
if not util.is_web(to):
|
||||
error(f'Expected fully qualified URL; got {to}')
|
||||
|
||||
# check that we've seen this domain before so we're not an open redirect
|
||||
domains = set((util.domain_from_link(to, minimize=True),
|
||||
util.domain_from_link(to, minimize=False),
|
||||
urllib.parse.urlparse(to).hostname))
|
||||
for domain in domains:
|
||||
if domain:
|
||||
g.user = User.get_by_id(domain)
|
||||
if g.user:
|
||||
logger.info(f'Found User for domain {domain}')
|
||||
break
|
||||
else:
|
||||
return f'No user found for any of {domains}', 404
|
||||
to_domain = urllib.parse.urlparse(to).hostname
|
||||
|
||||
# check conneg, serve AS2 if requested
|
||||
# check conneg
|
||||
accept_as2 = False
|
||||
accept = request.headers.get('Accept')
|
||||
if accept:
|
||||
try:
|
||||
|
@ -73,18 +66,38 @@ def redir(to):
|
|||
# work around https://github.com/CottageLabs/negotiator/issues/6
|
||||
negotiated = None
|
||||
if negotiated:
|
||||
type = str(negotiated.content_type)
|
||||
if type in (as2.CONTENT_TYPE, as2.CONTENT_TYPE_LD):
|
||||
# load from the datastore
|
||||
obj = Object.get_by_id(to)
|
||||
if not obj or obj.deleted:
|
||||
return f'Object not found: {to}', 404
|
||||
ret = activitypub.postprocess_as2(as2.from_as1(obj.as1))
|
||||
logger.info(f'Returning: {json_dumps(ret, indent=2)}')
|
||||
return ret, {
|
||||
'Content-Type': type,
|
||||
'Access-Control-Allow-Origin': '*',
|
||||
}
|
||||
accept_type = str(negotiated.content_type)
|
||||
if accept_type in (as2.CONTENT_TYPE, as2.CONTENT_TYPE_LD):
|
||||
accept_as2 = True
|
||||
|
||||
# check that we've seen this domain before so we're not an open redirect
|
||||
domains = set((util.domain_from_link(to, minimize=True),
|
||||
util.domain_from_link(to, minimize=False),
|
||||
to_domain))
|
||||
for domain in domains:
|
||||
if domain:
|
||||
g.user = User.get_by_id(domain)
|
||||
if g.user:
|
||||
logger.info(f'Found User for domain {domain}')
|
||||
break
|
||||
else:
|
||||
if accept_as2:
|
||||
# TODO: this is a kind of gross hack, should we do it differently?
|
||||
g.user = User(id=to_domain)
|
||||
else:
|
||||
return f'No user found for any of {domains}', 404
|
||||
|
||||
if accept_as2:
|
||||
# AS2 requested, fetch and convert and serve
|
||||
obj = Webmention.load(to)
|
||||
if not obj or obj.deleted:
|
||||
return f'Object not found: {to}', 404
|
||||
ret = activitypub.postprocess_as2(as2.from_as1(obj.as1))
|
||||
logger.info(f'Returning: {json_dumps(ret, indent=2)}')
|
||||
return ret, {
|
||||
'Content-Type': accept_type,
|
||||
'Access-Control-Allow-Origin': '*',
|
||||
}
|
||||
|
||||
# redirect
|
||||
logger.info(f'redirecting to {to}')
|
||||
|
|
|
@ -1,21 +1,29 @@
|
|||
"""Unit tests for redirect.py.
|
||||
"""
|
||||
import copy
|
||||
from unittest.mock import patch
|
||||
|
||||
from granary import as2
|
||||
from oauth_dropins.webutil.testutil import requests_response
|
||||
import requests
|
||||
|
||||
from flask_app import app, cache
|
||||
from flask_app import app, cache, g
|
||||
from common import redirect_unwrap
|
||||
from models import Object, User
|
||||
from .test_webmention import REPOST_AS2
|
||||
from .test_webmention import ACTOR_AS2, REPOST_AS2, REPOST_HTML
|
||||
from . import testutil
|
||||
|
||||
REPOST_AS2 = copy.deepcopy(REPOST_AS2)
|
||||
del REPOST_AS2['cc']
|
||||
|
||||
class RedirectTest(testutil.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.make_user('user.com')
|
||||
self.user = self.make_user('user.com')
|
||||
|
||||
with app.test_request_context('/'):
|
||||
g.user = None
|
||||
|
||||
def test_redirect(self):
|
||||
got = self.client.get('/r/https://user.com/bar?baz=baj&biff')
|
||||
|
@ -30,7 +38,7 @@ class RedirectTest(testutil.TestCase):
|
|||
got = self.client.get('/r/')
|
||||
self.assertEqual(404, got.status_code)
|
||||
|
||||
def test_redirect_no_magic_key_for_domain(self):
|
||||
def test_redirect_html_no_user(self):
|
||||
got = self.client.get('/r/http://bar.com/baz')
|
||||
self.assertEqual(404, got.status_code)
|
||||
|
||||
|
@ -49,6 +57,18 @@ class RedirectTest(testutil.TestCase):
|
|||
def test_as2_ld(self):
|
||||
self._test_as2(as2.CONTENT_TYPE_LD)
|
||||
|
||||
def test_as2_no_user(self):
|
||||
self.user.key.delete()
|
||||
self._test_as2(as2.CONTENT_TYPE)
|
||||
|
||||
@patch('requests.get')
|
||||
def test_as2_fetch_post(self, mock_get):
|
||||
mock_get.return_value = requests_response(REPOST_HTML)
|
||||
self._test_as2(as2.CONTENT_TYPE, stored_object=False, expected={
|
||||
**REPOST_AS2,
|
||||
'actor': ACTOR_AS2,
|
||||
})
|
||||
|
||||
def test_accept_header_cache_key(self):
|
||||
app.config['CACHE_TYPE'] = 'SimpleCache'
|
||||
cache.init_app(app)
|
||||
|
@ -70,16 +90,17 @@ class RedirectTest(testutil.TestCase):
|
|||
self.assertEqual(301, resp.status_code)
|
||||
self.assertEqual('https://user.com/bar', resp.headers['Location'])
|
||||
|
||||
def _test_as2(self, content_type):
|
||||
with app.test_request_context('/'):
|
||||
self.obj = Object(id='https://user.com/repost', as2=REPOST_AS2).put()
|
||||
def _test_as2(self, content_type, stored_object=True, expected=REPOST_AS2):
|
||||
if stored_object:
|
||||
with app.test_request_context('/'):
|
||||
self.obj = Object(id='https://user.com/repost', as2=REPOST_AS2).put()
|
||||
|
||||
resp = self.client.get('/r/https://user.com/repost',
|
||||
headers={'Accept': content_type})
|
||||
self.assertEqual(200, resp.status_code, resp.get_data(as_text=True))
|
||||
self.assertEqual(content_type, resp.content_type)
|
||||
|
||||
self.assert_equals(REPOST_AS2, resp.json)
|
||||
self.assert_equals(expected, resp.json)
|
||||
|
||||
def test_as2_deleted(self):
|
||||
with app.test_request_context('/'):
|
||||
|
|
Ładowanie…
Reference in New Issue