kopia lustrzana https://github.com/snarfed/bridgy-fed
flask: port /r/, start to port common
rodzic
0b7388fed3
commit
5b8699bec1
1
app.py
1
app.py
|
@ -9,6 +9,7 @@ app.config.from_mapping(
|
|||
ENV='development' if appengine_info.DEBUG else 'PRODUCTION',
|
||||
CACHE_TYPE='SimpleCache',
|
||||
SECRET_KEY=util.read('flask_secret_key'),
|
||||
JSONIFY_PRETTYPRINT_REGULAR=True,
|
||||
)
|
||||
app.wsgi_app = handlers.ndb_context_middleware(
|
||||
app.wsgi_app, client=appengine_config.ndb_client)
|
||||
|
|
321
common.py
321
common.py
|
@ -6,11 +6,12 @@ import logging
|
|||
import re
|
||||
import urllib.parse
|
||||
|
||||
from flask import request
|
||||
from granary import as2
|
||||
from oauth_dropins.webutil import handlers, util, webmention
|
||||
import requests
|
||||
from webob import exc
|
||||
from werkzeug.exceptions import BadRequest
|
||||
from werkzeug.exceptions import abort
|
||||
|
||||
import common
|
||||
from models import Response
|
||||
|
@ -153,25 +154,27 @@ def content_type(resp):
|
|||
|
||||
|
||||
def get_required_param(request, name):
|
||||
try:
|
||||
val = request.args.get(name)
|
||||
except (UnicodeDecodeError, UnicodeEncodeError) as e:
|
||||
raise BadRequest(f"Couldn't decode query parameters as UTF-8: {e}")
|
||||
try:
|
||||
val = request.args.get(name)
|
||||
except (UnicodeDecodeError, UnicodeEncodeError) as e:
|
||||
abort(400, f"Couldn't decode query parameters as UTF-8: {e}")
|
||||
|
||||
if not val:
|
||||
raise BadRequest(f'Missing required parameter: {name}')
|
||||
if not val:
|
||||
abort(400, f'Missing required parameter: {name}')
|
||||
|
||||
return val
|
||||
return val
|
||||
|
||||
|
||||
def error(msg, status=None, exc_info=False):
|
||||
if not status:
|
||||
status = 400
|
||||
logging.info('Returning %s: %s' % (status, msg), exc_info=exc_info)
|
||||
abort(status, msg)
|
||||
|
||||
|
||||
class Handler(handlers.ModernHandler):
|
||||
"""Common request handler base class with lots of utilities."""
|
||||
|
||||
def error(self, msg, status=None, exc_info=False):
|
||||
if not status:
|
||||
status = 400
|
||||
logging.info('Returning %s: %s' % (status, msg), exc_info=exc_info)
|
||||
self.abort(status, msg)
|
||||
error = error
|
||||
|
||||
def send_webmentions(self, activity_wrapped, proxy=None, **response_props):
|
||||
"""Sends webmentions for an incoming Salmon slap or ActivityPub inbox delivery.
|
||||
|
@ -247,177 +250,181 @@ class Handler(handlers.ModernHandler):
|
|||
msg = 'Errors:\n' + '\n'.join(str(e) for e in errors)
|
||||
self.error(msg, status=errors[0].get('http_status'))
|
||||
|
||||
def postprocess_as2(self, activity, target=None, key=None):
|
||||
"""Prepare an AS2 object to be served or sent via ActivityPub.
|
||||
|
||||
Args:
|
||||
activity: dict, AS2 object or activity
|
||||
target: dict, AS2 object, optional. The target of activity's inReplyTo or
|
||||
Like/Announce/etc object, if any.
|
||||
key: MagicKey, optional. populated into publicKey field if provided.
|
||||
"""
|
||||
type = activity.get('type')
|
||||
def postprocess_as2(activity, target=None, key=None):
|
||||
"""Prepare an AS2 object to be served or sent via ActivityPub.
|
||||
|
||||
# actor objects
|
||||
if type == 'Person':
|
||||
self.postprocess_as2_actor(activity)
|
||||
if not activity.get('publicKey'):
|
||||
# underspecified, inferred from this issue and Mastodon's implementation:
|
||||
# https://github.com/w3c/activitypub/issues/203#issuecomment-297553229
|
||||
# https://github.com/tootsuite/mastodon/blob/bc2c263504e584e154384ecc2d804aeb1afb1ba3/app/services/activitypub/process_account_service.rb#L77
|
||||
activity.update({
|
||||
'publicKey': {
|
||||
'id': activity.get('preferredUsername'),
|
||||
'publicKeyPem': key.public_pem().decode(),
|
||||
},
|
||||
'@context': (util.get_list(activity, '@context') +
|
||||
['https://w3id.org/security/v1']),
|
||||
})
|
||||
return activity
|
||||
Args:
|
||||
activity: dict, AS2 object or activity
|
||||
target: dict, AS2 object, optional. The target of activity's inReplyTo or
|
||||
Like/Announce/etc object, if any.
|
||||
key: MagicKey, optional. populated into publicKey field if provided.
|
||||
"""
|
||||
type = activity.get('type')
|
||||
|
||||
for actor in (util.get_list(activity, 'attributedTo') +
|
||||
util.get_list(activity, 'actor')):
|
||||
self.postprocess_as2_actor(actor)
|
||||
# actor objects
|
||||
if type == 'Person':
|
||||
postprocess_as2_actor(activity)
|
||||
if not activity.get('publicKey'):
|
||||
# underspecified, inferred from this issue and Mastodon's implementation:
|
||||
# https://github.com/w3c/activitypub/issues/203#issuecomment-297553229
|
||||
# https://github.com/tootsuite/mastodon/blob/bc2c263504e584e154384ecc2d804aeb1afb1ba3/app/services/activitypub/process_account_service.rb#L77
|
||||
activity.update({
|
||||
'publicKey': {
|
||||
'id': activity.get('preferredUsername'),
|
||||
'publicKeyPem': key.public_pem().decode(),
|
||||
},
|
||||
'@context': (util.get_list(activity, '@context') +
|
||||
['https://w3id.org/security/v1']),
|
||||
})
|
||||
return activity
|
||||
|
||||
# inReplyTo: singly valued, prefer id over url
|
||||
target_id = target.get('id') if target else None
|
||||
in_reply_to = activity.get('inReplyTo')
|
||||
if in_reply_to:
|
||||
if target_id:
|
||||
activity['inReplyTo'] = target_id
|
||||
elif isinstance(in_reply_to, list):
|
||||
if len(in_reply_to) > 1:
|
||||
logging.warning(
|
||||
"AS2 doesn't support multiple inReplyTo URLs! "
|
||||
'Only using the first: %s' % in_reply_to[0])
|
||||
activity['inReplyTo'] = in_reply_to[0]
|
||||
for actor in (util.get_list(activity, 'attributedTo') +
|
||||
util.get_list(activity, 'actor')):
|
||||
postprocess_as2_actor(actor)
|
||||
|
||||
# Mastodon evidently requires a Mention tag for replies to generate a
|
||||
# notification to the original post's author. not required for likes,
|
||||
# reposts, etc. details:
|
||||
# https://github.com/snarfed/bridgy-fed/issues/34
|
||||
if target:
|
||||
for to in (util.get_list(target, 'attributedTo') +
|
||||
util.get_list(target, 'actor')):
|
||||
if isinstance(to, dict):
|
||||
to = to.get('url') or to.get('id')
|
||||
if to:
|
||||
activity.setdefault('tag', []).append({
|
||||
'type': 'Mention',
|
||||
'href': to,
|
||||
})
|
||||
# inReplyTo: singly valued, prefer id over url
|
||||
target_id = target.get('id') if target else None
|
||||
in_reply_to = activity.get('inReplyTo')
|
||||
if in_reply_to:
|
||||
if target_id:
|
||||
activity['inReplyTo'] = target_id
|
||||
elif isinstance(in_reply_to, list):
|
||||
if len(in_reply_to) > 1:
|
||||
logging.warning(
|
||||
"AS2 doesn't support multiple inReplyTo URLs! "
|
||||
'Only using the first: %s' % in_reply_to[0])
|
||||
activity['inReplyTo'] = in_reply_to[0]
|
||||
|
||||
# activity objects (for Like, Announce, etc): prefer id over url
|
||||
obj = activity.get('object')
|
||||
if obj:
|
||||
if isinstance(obj, dict) and not obj.get('id'):
|
||||
obj['id'] = target_id or obj.get('url')
|
||||
elif target_id and obj != target_id:
|
||||
activity['object'] = target_id
|
||||
# Mastodon evidently requires a Mention tag for replies to generate a
|
||||
# notification to the original post's author. not required for likes,
|
||||
# reposts, etc. details:
|
||||
# https://github.com/snarfed/bridgy-fed/issues/34
|
||||
if target:
|
||||
for to in (util.get_list(target, 'attributedTo') +
|
||||
util.get_list(target, 'actor')):
|
||||
if isinstance(to, dict):
|
||||
to = to.get('url') or to.get('id')
|
||||
if to:
|
||||
activity.setdefault('tag', []).append({
|
||||
'type': 'Mention',
|
||||
'href': to,
|
||||
})
|
||||
|
||||
# id is required for most things. default to url if it's not set.
|
||||
if not activity.get('id'):
|
||||
activity['id'] = activity.get('url')
|
||||
# activity objects (for Like, Announce, etc): prefer id over url
|
||||
obj = activity.get('object')
|
||||
if obj:
|
||||
if isinstance(obj, dict) and not obj.get('id'):
|
||||
obj['id'] = target_id or obj.get('url')
|
||||
elif target_id and obj != target_id:
|
||||
activity['object'] = target_id
|
||||
|
||||
# TODO: find a better way to check this, sometimes or always?
|
||||
# removed for now since it fires on posts without u-id or u-url, eg
|
||||
# https://chrisbeckstrom.com/2018/12/27/32551/
|
||||
# assert activity.get('id') or (isinstance(obj, dict) and obj.get('id'))
|
||||
# id is required for most things. default to url if it's not set.
|
||||
if not activity.get('id'):
|
||||
activity['id'] = activity.get('url')
|
||||
|
||||
activity['id'] = self.redirect_wrap(activity.get('id'))
|
||||
activity['url'] = self.redirect_wrap(activity.get('url'))
|
||||
# TODO: find a better way to check this, sometimes or always?
|
||||
# removed for now since it fires on posts without u-id or u-url, eg
|
||||
# https://chrisbeckstrom.com/2018/12/27/32551/
|
||||
# assert activity.get('id') or (isinstance(obj, dict) and obj.get('id'))
|
||||
|
||||
# copy image(s) into attachment(s). may be Mastodon-specific.
|
||||
# https://github.com/snarfed/bridgy-fed/issues/33#issuecomment-440965618
|
||||
obj_or_activity = obj if isinstance(obj, dict) else activity
|
||||
obj_or_activity.setdefault('attachment', []).extend(
|
||||
obj_or_activity.get('image', []))
|
||||
activity['id'] = redirect_wrap(activity.get('id'))
|
||||
activity['url'] = redirect_wrap(activity.get('url'))
|
||||
|
||||
# cc public and target's author(s) and recipients
|
||||
# https://www.w3.org/TR/activitystreams-vocabulary/#audienceTargeting
|
||||
# https://w3c.github.io/activitypub/#delivery
|
||||
if type in as2.TYPE_TO_VERB or type in ('Article', 'Note'):
|
||||
recips = [AS2_PUBLIC_AUDIENCE]
|
||||
if target:
|
||||
recips += itertools.chain(*(util.get_list(target, field) for field in
|
||||
('actor', 'attributedTo', 'to', 'cc')))
|
||||
activity['cc'] = util.dedupe_urls(util.get_url(recip) or recip.get('id')
|
||||
for recip in recips)
|
||||
# copy image(s) into attachment(s). may be Mastodon-specific.
|
||||
# https://github.com/snarfed/bridgy-fed/issues/33#issuecomment-440965618
|
||||
obj_or_activity = obj if isinstance(obj, dict) else activity
|
||||
obj_or_activity.setdefault('attachment', []).extend(
|
||||
obj_or_activity.get('image', []))
|
||||
|
||||
# wrap articles and notes in a Create activity
|
||||
if type in ('Article', 'Note'):
|
||||
activity = {
|
||||
'@context': as2.CONTEXT,
|
||||
'type': 'Create',
|
||||
'id': f'{activity["id"]}#bridgy-fed-create',
|
||||
'object': activity,
|
||||
}
|
||||
# cc public and target's author(s) and recipients
|
||||
# https://www.w3.org/TR/activitystreams-vocabulary/#audienceTargeting
|
||||
# https://w3c.github.io/activitypub/#delivery
|
||||
if type in as2.TYPE_TO_VERB or type in ('Article', 'Note'):
|
||||
recips = [AS2_PUBLIC_AUDIENCE]
|
||||
if target:
|
||||
recips += itertools.chain(*(util.get_list(target, field) for field in
|
||||
('actor', 'attributedTo', 'to', 'cc')))
|
||||
activity['cc'] = util.dedupe_urls(util.get_url(recip) or recip.get('id')
|
||||
for recip in recips)
|
||||
|
||||
return util.trim_nulls(activity)
|
||||
# wrap articles and notes in a Create activity
|
||||
if type in ('Article', 'Note'):
|
||||
activity = {
|
||||
'@context': as2.CONTEXT,
|
||||
'type': 'Create',
|
||||
'id': f'{activity["id"]}#bridgy-fed-create',
|
||||
'object': activity,
|
||||
}
|
||||
|
||||
def postprocess_as2_actor(self, actor):
|
||||
"""Prepare an AS2 actor object to be served or sent via ActivityPub.
|
||||
return util.trim_nulls(activity)
|
||||
|
||||
Args:
|
||||
actor: dict, AS2 actor object
|
||||
"""
|
||||
url = actor.get('url')
|
||||
if url:
|
||||
domain = urllib.parse.urlparse(url).netloc
|
||||
actor.setdefault('preferredUsername', domain)
|
||||
actor['id'] = '%s/%s' % (self.request.host_url, domain)
|
||||
actor['url'] = self.redirect_wrap(url)
|
||||
|
||||
# required by pixelfed. https://github.com/snarfed/bridgy-fed/issues/39
|
||||
actor.setdefault('summary', '')
|
||||
def postprocess_as2_actor(actor):
|
||||
"""Prepare an AS2 actor object to be served or sent via ActivityPub.
|
||||
|
||||
def redirect_wrap(self, url):
|
||||
"""Returns a URL on our domain that redirects to this URL.
|
||||
Args:
|
||||
actor: dict, AS2 actor object
|
||||
"""
|
||||
url = actor.get('url')
|
||||
if url:
|
||||
domain = urllib.parse.urlparse(url).netloc
|
||||
actor.setdefault('preferredUsername', domain)
|
||||
actor['id'] = request.host_url + domain
|
||||
actor['url'] = redirect_wrap(url)
|
||||
|
||||
...to satisfy Mastodon's non-standard domain matching requirement. :(
|
||||
# required by pixelfed. https://github.com/snarfed/bridgy-fed/issues/39
|
||||
actor.setdefault('summary', '')
|
||||
|
||||
Args:
|
||||
url: string
|
||||
|
||||
https://github.com/snarfed/bridgy-fed/issues/16#issuecomment-424799599
|
||||
https://github.com/tootsuite/mastodon/pull/6219#issuecomment-429142747
|
||||
def redirect_wrap(url):
|
||||
"""Returns a URL on our domain that redirects to this URL.
|
||||
|
||||
Returns: string, redirect url
|
||||
"""
|
||||
if not url:
|
||||
return url
|
||||
...to satisfy Mastodon's non-standard domain matching requirement. :(
|
||||
|
||||
prefix = urllib.parse.urljoin(self.request.host_url, '/r/')
|
||||
if url.startswith(prefix):
|
||||
return url
|
||||
Args:
|
||||
url: string
|
||||
|
||||
return prefix + url
|
||||
https://github.com/snarfed/bridgy-fed/issues/16#issuecomment-424799599
|
||||
https://github.com/tootsuite/mastodon/pull/6219#issuecomment-429142747
|
||||
|
||||
def redirect_unwrap(self, val):
|
||||
"""Removes our redirect wrapping from a URL, if it's there.
|
||||
Returns: string, redirect url
|
||||
"""
|
||||
if not url:
|
||||
return url
|
||||
|
||||
url may be a string, dict, or list. dicts and lists are unwrapped
|
||||
recursively.
|
||||
prefix = urllib.parse.urljoin(request.host_url, '/r/')
|
||||
if url.startswith(prefix):
|
||||
return url
|
||||
|
||||
Strings that aren't wrapped URLs are left unchanged.
|
||||
return prefix + url
|
||||
|
||||
Args:
|
||||
url: string
|
||||
|
||||
Returns: string, unwrapped url
|
||||
"""
|
||||
if isinstance(val, dict):
|
||||
return {k: self.redirect_unwrap(v) for k, v in val.items()}
|
||||
def redirect_unwrap(val):
|
||||
"""Removes our redirect wrapping from a URL, if it's there.
|
||||
|
||||
elif isinstance(val, list):
|
||||
return [self.redirect_unwrap(v) for v in val]
|
||||
url may be a string, dict, or list. dicts and lists are unwrapped
|
||||
recursively.
|
||||
|
||||
elif isinstance(val, str):
|
||||
prefix = urllib.parse.urljoin(self.request.host_url, '/r/')
|
||||
if val.startswith(prefix):
|
||||
return val[len(prefix):]
|
||||
elif val.startswith(self.request.host_url):
|
||||
return util.follow_redirects(
|
||||
util.domain_from_link(urllib.parse.urlparse(val).path.strip('/'))).url
|
||||
Strings that aren't wrapped URLs are left unchanged.
|
||||
|
||||
return val
|
||||
Args:
|
||||
url: string
|
||||
|
||||
Returns: string, unwrapped url
|
||||
"""
|
||||
if isinstance(val, dict):
|
||||
return {k: redirect_unwrap(v) for k, v in val.items()}
|
||||
|
||||
elif isinstance(val, list):
|
||||
return [redirect_unwrap(v) for v in val]
|
||||
|
||||
elif isinstance(val, str):
|
||||
prefix = urllib.parse.urljoin(request.host_url, '/r/')
|
||||
if val.startswith(prefix):
|
||||
return val[len(prefix):]
|
||||
elif val.startswith(request.host_url):
|
||||
return util.follow_redirects(
|
||||
util.domain_from_link(urllib.parse.urlparse(val).path.strip('/'))).url
|
||||
|
||||
return val
|
||||
|
|
107
redirect.py
107
redirect.py
|
@ -13,78 +13,71 @@ import logging
|
|||
import re
|
||||
import urllib.parse
|
||||
|
||||
from flask import redirect, request
|
||||
from granary import as2, microformats2
|
||||
import mf2util
|
||||
from oauth_dropins.webutil import util
|
||||
from oauth_dropins.webutil.handlers import cache_response
|
||||
from oauth_dropins.webutil.util import json_dumps
|
||||
import ujson as json
|
||||
import webapp2
|
||||
from werkzeug.exceptions import abort
|
||||
|
||||
from app import app, cache
|
||||
import common
|
||||
from common import error
|
||||
from models import MagicKey
|
||||
|
||||
CACHE_TIME = datetime.timedelta(seconds=15)
|
||||
|
||||
|
||||
class RedirectHandler(common.Handler):
|
||||
"""301 redirects to the embedded fully qualified URL.
|
||||
@app.route(r'/r/<path:to>')
|
||||
@cache.cached(15) # seconds
|
||||
def redir(to=None):
|
||||
"""301 redirect to the embedded fully qualified URL.
|
||||
|
||||
e.g. redirects /r/https://foo.com/bar?baz to https://foo.com/bar?baz
|
||||
"""
|
||||
@cache_response(CACHE_TIME)
|
||||
def get(self):
|
||||
assert self.request.path_qs.startswith('/r/')
|
||||
to = self.request.path_qs[3:]
|
||||
if request.args:
|
||||
to += '?' + urllib.parse.urlencode(request.args)
|
||||
# some browsers collapse repeated /s in the path down to a single slash.
|
||||
# if that happened to this URL, expand it back to two /s.
|
||||
to = re.sub(r'^(https?:/)([^/])', r'\1/\2', to)
|
||||
|
||||
# some browsers collapse repeated /s in the path down to a single slash.
|
||||
# if that happened to this URL, expand it back to two /s.
|
||||
to = re.sub(r'^(https?:/)([^/])', r'\1/\2', to)
|
||||
if not to.startswith('http://') and not to.startswith('https://'):
|
||||
error(f'Expected fully qualified URL; got {to}')
|
||||
|
||||
if not to.startswith('http://') and not to.startswith('https://'):
|
||||
self.error('Expected fully qualified URL; got %s' % to)
|
||||
# check that we've seen this domain before so we're not an open redirect
|
||||
domains = set((util.domain_from_link(to),
|
||||
urllib.parse.urlparse(to).hostname))
|
||||
for domain in domains:
|
||||
if MagicKey.get_by_id(domain):
|
||||
logging.info(f'Found MagicKey for domain {domain}')
|
||||
break
|
||||
else:
|
||||
logging.info(f'No user found for any of {domains}; returning 404')
|
||||
abort(404)
|
||||
|
||||
# check that we've seen this domain before so we're not an open redirect
|
||||
domains = set((util.domain_from_link(to),
|
||||
urllib.parse.urlparse(to).hostname))
|
||||
for domain in domains:
|
||||
if MagicKey.get_by_id(domain):
|
||||
logging.info(f'Found MagicKey for domain {domain}')
|
||||
break
|
||||
else:
|
||||
logging.info(f'No user found for any of {domains}; returning 404')
|
||||
self.abort(404)
|
||||
# poor man's conneg, only handle single Accept values, not multiple with
|
||||
# priorities.
|
||||
if request.headers.get('Accept') in (common.CONTENT_TYPE_AS2,
|
||||
common.CONTENT_TYPE_AS2_LD):
|
||||
return convert_to_as2(to)
|
||||
|
||||
# poor man's conneg, only handle single Accept values, not multiple with
|
||||
# priorities.
|
||||
if self.request.headers.get('Accept') in (common.CONTENT_TYPE_AS2,
|
||||
common.CONTENT_TYPE_AS2_LD):
|
||||
return self.convert_to_as2(to)
|
||||
|
||||
# redirect
|
||||
logging.info('redirecting to %s', to)
|
||||
self.redirect(to, permanent=True)
|
||||
|
||||
def convert_to_as2(self, url):
|
||||
"""Fetch a URL as HTML, convert it to AS2, and return it.
|
||||
|
||||
Currently mainly for Pixelfed.
|
||||
https://github.com/snarfed/bridgy-fed/issues/39
|
||||
"""
|
||||
mf2 = util.fetch_mf2(url)
|
||||
entry = mf2util.find_first_entry(mf2, ['h-entry'])
|
||||
logging.info('Parsed mf2 for %s: %s', mf2['url'], json_dumps(entry, indent=2))
|
||||
|
||||
obj = self.postprocess_as2(as2.from_as1(microformats2.json_to_object(entry)))
|
||||
logging.info('Returning: %s', json_dumps(obj, indent=2))
|
||||
|
||||
self.response.headers.update({
|
||||
'Content-Type': common.CONTENT_TYPE_AS2,
|
||||
'Access-Control-Allow-Origin': '*',
|
||||
})
|
||||
self.response.write(json_dumps(obj, indent=2))
|
||||
# redirect
|
||||
logging.info('redirecting to %s', to)
|
||||
return redirect(to, code=301)
|
||||
|
||||
|
||||
ROUTES = [
|
||||
(r'/r/.+', RedirectHandler),
|
||||
]
|
||||
def convert_to_as2(url):
|
||||
"""Fetch a URL as HTML, convert it to AS2, and return it.
|
||||
|
||||
Currently mainly for Pixelfed.
|
||||
https://github.com/snarfed/bridgy-fed/issues/39
|
||||
"""
|
||||
mf2 = util.fetch_mf2(url)
|
||||
entry = mf2util.find_first_entry(mf2, ['h-entry'])
|
||||
logging.info('Parsed mf2 for %s: %s', mf2['url'], json_dumps(entry, indent=2))
|
||||
|
||||
obj = common.postprocess_as2(as2.from_as1(microformats2.json_to_object(entry)))
|
||||
logging.info('Returning: %s', json_dumps(obj, indent=2))
|
||||
|
||||
return obj, {
|
||||
'Content-Type': common.CONTENT_TYPE_AS2,
|
||||
'Access-Control-Allow-Origin': '*',
|
||||
}
|
||||
|
|
|
@ -5,41 +5,43 @@ from unittest.mock import patch
|
|||
|
||||
from oauth_dropins.webutil.testutil import requests_response
|
||||
|
||||
from app import application
|
||||
from app import app, cache
|
||||
import common
|
||||
from models import MagicKey
|
||||
from redirect import RedirectHandler
|
||||
from .test_webmention import REPOST_HTML, REPOST_AS2
|
||||
from . import testutil
|
||||
|
||||
client = app.test_client()
|
||||
|
||||
|
||||
class RedirectTest(testutil.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(RedirectTest, self).setUp()
|
||||
RedirectHandler.get.cache_clear()
|
||||
app.testing = True
|
||||
cache.clear()
|
||||
MagicKey.get_or_create('foo.com')
|
||||
|
||||
def test_redirect(self):
|
||||
got = application.get_response('/r/https://foo.com/bar?baz=baj&biff')
|
||||
self.assertEqual(301, got.status_int)
|
||||
self.assertEqual('https://foo.com/bar?baz=baj&biff', got.headers['Location'])
|
||||
got = client.get('/r/https://foo.com/bar?baz=baj&biff')
|
||||
self.assertEqual(301, got.status_code)
|
||||
self.assertEqual('https://foo.com/bar?baz=baj&biff=', got.headers['Location'])
|
||||
|
||||
def test_redirect_scheme_missing(self):
|
||||
got = application.get_response('/r/foo.com')
|
||||
self.assertEqual(400, got.status_int)
|
||||
got = client.get('/r/foo.com')
|
||||
self.assertEqual(400, got.status_code)
|
||||
|
||||
def test_redirect_url_missing(self):
|
||||
got = application.get_response('/r/')
|
||||
self.assertEqual(404, got.status_int)
|
||||
got = client.get('/r/')
|
||||
self.assertEqual(404, got.status_code)
|
||||
|
||||
def test_redirect_no_magic_key_for_domain(self):
|
||||
got = application.get_response('/r/http://bar.com/baz')
|
||||
self.assertEqual(404, got.status_int)
|
||||
got = client.get('/r/http://bar.com/baz')
|
||||
self.assertEqual(404, got.status_code)
|
||||
|
||||
def test_redirect_single_slash(self):
|
||||
got = application.get_response('/r/https:/foo.com/bar')
|
||||
self.assertEqual(301, got.status_int)
|
||||
got = client.get('/r/https:/foo.com/bar')
|
||||
self.assertEqual(301, got.status_code)
|
||||
self.assertEqual('https://foo.com/bar', got.headers['Location'])
|
||||
|
||||
def test_as2(self):
|
||||
|
@ -63,10 +65,10 @@ class RedirectTest(testutil.TestCase):
|
|||
mock_get.return_value = requests_response(
|
||||
REPOST_HTML, content_type=common.CONTENT_TYPE_HTML)
|
||||
|
||||
got = application.get_response('/r/https://foo.com/bar', headers={'Accept': accept})
|
||||
got = client.get('/r/https://foo.com/bar', headers={'Accept': accept})
|
||||
|
||||
args, kwargs = mock_get.call_args
|
||||
self.assertEqual(('https://foo.com/bar',), args)
|
||||
|
||||
self.assertEqual(200, got.status_int)
|
||||
self.assertEqual(200, got.status_code)
|
||||
self.assertEqual(as2, got.json)
|
||||
|
|
|
@ -17,7 +17,7 @@ from oauth_dropins.webutil.util import json_dumps, json_loads
|
|||
import requests
|
||||
|
||||
import activitypub
|
||||
from app import application
|
||||
from app import app
|
||||
from common import (
|
||||
AS2_PUBLIC_AUDIENCE,
|
||||
CONNEG_HEADERS_AS2,
|
||||
|
|
Ładowanie…
Reference in New Issue