# coding=utf-8
"""Unit tests for webmention.py."""
# NOTE(review): the code under test is imported below from the ``web`` module
# (``Web``, ``TASKS_LOCATION``) -- confirm whether "webmention.py" is still the
# right module name for the docstring above.
import copy
from unittest.mock import patch
from urllib.parse import urlencode

import feedparser
from flask import g, get_flashed_messages
from granary import as1, as2, atom, microformats2
from httpsig.sign import HeaderSigner
from oauth_dropins.webutil import appengine_config, util
from oauth_dropins.webutil.appengine_config import tasks_client
from oauth_dropins.webutil.appengine_info import APP_ID
from oauth_dropins.webutil.testutil import NOW, requests_response
from oauth_dropins.webutil.util import json_dumps, json_loads
import requests
from requests import HTTPError
from werkzeug.exceptions import BadGateway, BadRequest

# import first so that Fake is defined before URL routes are registered
from . import testutil

from activitypub import ActivityPub
from common import (
    CONTENT_TYPE_HTML,
    redirect_unwrap,
)
from models import Follower, Object, Target, User
from web import TASKS_LOCATION, Web
from . import test_activitypub
from .testutil import TestCase

# ---------------------------------------------------------------------------
# Shared fixtures: canned HTTP responses (built with requests_response) and the
# mf2 / AS1 / AS2 payloads the tests compare against.
#
# NOTE(review): several HTML fixture literals below start with a backslash
# followed by a space or a letter rather than a backslash-newline line
# continuation -- verify the literal contents against the intended markup.
# ---------------------------------------------------------------------------

# 302 response whose redirect target is a webfinger URL on localhost.
FULL_REDIR = requests_response(
    status=302,
    redirected_url='http://localhost/.well-known/webfinger?resource=acct:user.com@user.com')

# user.com's home page and the representations of its actor used by the tests.
ACTOR_HTML = """\
Ms. ☕ Baz """
ACTOR_HTML_RESP = requests_response(ACTOR_HTML, url='https://user.com/',
                                    content_type=CONTENT_TYPE_HTML)
ACTOR_MF2 = {
    'type': ['h-card'],
    'properties': {
        'url': ['https://user.com/'],
        'name': ['Ms. ☕ Baz'],
    },
}
# ACTOR_MF2 plus a top-level 'rel-urls' entry for the home page.
ACTOR_MF2_REL_URLS = {
    **ACTOR_MF2,
    'rel-urls': {'https://user.com/': {'rels': ['me'], 'text': 'Ms. ☕ Baz'}}
}
ACTOR_AS1_UNWRAPPED = {
    'objectType': 'person',
    'displayName': 'Ms. ☕ Baz',
    'url': 'https://user.com/',
    'urls': [{'value': 'https://user.com/', 'displayName': 'Ms. ☕ Baz'}],
}
# AS2 actor whose id/url are http://localhost/... forms of user.com.
ACTOR_AS2 = {
    'type': 'Person',
    'id': 'http://localhost/user.com',
    'url': 'http://localhost/r/https://user.com/',
    'name': 'Ms. ☕ Baz',
    'preferredUsername': 'user.com',
}
ACTOR_AS2_USER = {
    'type': 'Person',
    'url': 'https://user.com/',
    'name': 'Ms. ☕ Baz',
    'attachment': [{
        'name': 'Ms. ☕ Baz',
        'type': 'PropertyValue',
        'value': 'user.com',
    }],
}
# ACTOR_AS2 extended with @context, attachment, and inbox/outbox/etc. URLs.
ACTOR_AS2_FULL = {
    **ACTOR_AS2,
    '@context': [
        'https://www.w3.org/ns/activitystreams',
        'https://w3id.org/security/v1',
    ],
    'attachment': [{
        'name': 'Web site',
        'type': 'PropertyValue',
        'value': 'user.com',
    }],
    'inbox': 'http://localhost/user.com/inbox',
    'outbox': 'http://localhost/user.com/outbox',
    'following': 'http://localhost/user.com/following',
    'followers': 'http://localhost/user.com/followers',
    'endpoints': {
        'sharedInbox': 'http://localhost/inbox',
    },
}

# Repost of https://mas.to/toot/id published at https://user.com/repost.
REPOST_HTML = """\ reposted! Ms. 
☕ Baz """
REPOST = requests_response(REPOST_HTML, content_type=CONTENT_TYPE_HTML,
                           url='https://user.com/repost')
# First mf2 item parsed out of REPOST_HTML.
REPOST_MF2 = util.parse_mf2(REPOST_HTML)['items'][0]
REPOST_AS2 = {
    '@context': 'https://www.w3.org/ns/activitystreams',
    'type': 'Announce',
    'id': 'http://localhost/r/https://user.com/repost',
    'url': 'http://localhost/r/https://user.com/repost',
    'name': 'reposted!',
    'object': 'https://mas.to/toot/id',
    'to': [as2.PUBLIC_AUDIENCE],
    'cc': [
        'https://mas.to/author',
        'https://mas.to/bystander',
        'https://mas.to/recipient',
        as2.PUBLIC_AUDIENCE,
    ],
    'actor': 'http://localhost/user.com',
}
REPOST_AS1_UNWRAPPED = {
    'objectType': 'activity',
    'verb': 'share',
    'id': 'https://user.com/repost',
    'url': 'https://user.com/repost',
    'displayName': 'reposted!',
    'object': 'https://mas.to/toot/id',
    'actor': ACTOR_AS1_UNWRAPPED,
}

REPOST_HCITE_HTML = """\ reposted! Ms. ☕ Baz """
# NOTE(review): this response is built from REPOST_HTML, not REPOST_HCITE_HTML
# (which is not referenced anywhere else in the visible code) -- confirm that
# is intended.
REPOST_HCITE = requests_response(REPOST_HTML, content_type=CONTENT_TYPE_HTML,
                                 url='https://user.com/repost')

WEBMENTION_REL_LINK = requests_response(
    '')
WEBMENTION_NO_REL_LINK = requests_response('')

# Delete of https://user.com/post, as AS1 and as the expected AS2.
DELETE_AS1 = {
    'objectType': 'activity',
    'verb': 'delete',
    'id': 'https://user.com/post#bridgy-fed-delete',
    'actor': 'http://localhost/user.com',
    'object': 'https://user.com/post',
}
DELETE_AS2 = {
    '@context': 'https://www.w3.org/ns/activitystreams',
    'type': 'Delete',
    'id': 'http://localhost/r/https://user.com/post#bridgy-fed-delete',
    'actor': 'http://localhost/user.com',
    'object': 'http://localhost/r/https://user.com/post',
    'to': [as2.PUBLIC_AUDIENCE],
}

# The remote post (https://mas.to/toot) that replies/reposts target, as an HTML
# response and as an AS2 response.
TOOT_HTML = requests_response("""\ """, url='https://mas.to/toot', content_type=CONTENT_TYPE_HTML)
TOOT_AS2_DATA = {
    '@context': ['https://www.w3.org/ns/activitystreams'],
    'type': 'Article',
    'id': 'https://mas.to/toot/id',
    'content': 'Lots of ☕ words...',
    'actor': {'url': 'https://mas.to/author'},
    'to': ['https://mas.to/recipient', as2.PUBLIC_AUDIENCE],
    'cc': ['https://mas.to/bystander', as2.PUBLIC_AUDIENCE],
}
TOOT_AS2 = requests_response(
    TOOT_AS2_DATA,
    url='https://mas.to/toot/id', content_type=as2.CONTENT_TYPE + '; charset=utf-8')

# Reply published at https://user.com/reply, plus derived mf2 / AS1 / AS2.
REPLY_HTML = """\ """
REPLY = requests_response(REPLY_HTML, content_type=CONTENT_TYPE_HTML,
                          url='https://user.com/reply')
REPLY_MF2 = util.parse_mf2(REPLY_HTML)['items'][0]
REPLY_AS1 = microformats2.json_to_object(REPLY_MF2)
CREATE_REPLY_AS1 = {
    'objectType': 'activity',
    'verb': 'post',
    'id': 'https://user.com/reply#bridgy-fed-create',
    'actor': 'http://localhost/user.com',
    'object': REPLY_AS1,
}
REPLY_AS2 = as2.from_as1(REPLY_AS1)

# Like published at https://user.com/like.
LIKE_HTML = """\ Ms. ☕ Baz """
LIKE = requests_response(LIKE_HTML, content_type=CONTENT_TYPE_HTML,
                         url='https://user.com/like')
LIKE_MF2 = util.parse_mf2(LIKE_HTML)['items'][0]

# Remote actor response with an inbox at https://mas.to/inbox.
# NOTE(review): the keys below are AS1-style ('objectType', 'displayName') even
# though the response is built with as2_resp -- confirm this is intended.
ACTOR = TestCase.as2_resp({
    'objectType' : 'Person',
    'displayName': 'Mrs. ☕ Foo',
    'id': 'https://mas.to/mrs-foo',
    'inbox': 'https://mas.to/inbox',
})

# Expected AS2 Create wrapping the reply Note.
AS2_CREATE = {
    '@context': 'https://www.w3.org/ns/activitystreams',
    'type': 'Create',
    'id': 'http://localhost/r/https://user.com/reply#bridgy-fed-create',
    'actor': 'http://localhost/user.com',
    'object': {
        'type': 'Note',
        'id': 'http://localhost/r/https://user.com/reply',
        'url': 'http://localhost/r/https://user.com/reply',
        'name': 'foo ☕ bar',
        'content': """\ foo ☕ bar """,
        'inReplyTo': 'https://mas.to/toot/id',
        'to': [as2.PUBLIC_AUDIENCE],
        'cc': [
            'https://mas.to/author',
            'https://mas.to/bystander',
            'https://mas.to/recipient',
            as2.PUBLIC_AUDIENCE,
        ],
        'attributedTo': ACTOR_AS2,
        'tag': [{
            'type': 'Mention',
            'href': 'https://mas.to/author',
        }],
    },
    'to': [as2.PUBLIC_AUDIENCE],
}
# Deep copy so that mutating AS2_UPDATE['object'] below leaves AS2_CREATE alone.
AS2_UPDATE = copy.deepcopy(AS2_CREATE)
AS2_UPDATE.update({
    'id': 'http://localhost/r/https://user.com/reply#bridgy-fed-update-2022-01-02T03:04:05+00:00',
    'type': 'Update',
})
# we should generate this if it's not already in mf2 because Mastodon
# requires it for updates
AS2_UPDATE['object']['updated'] = NOW.isoformat()

# Follow of https://mas.to/mrs-foo published at https://user.com/follow.
FOLLOW_HTML = """\ Ms. 
☕ Baz """
FOLLOW = requests_response(
    FOLLOW_HTML, url='https://user.com/follow', content_type=CONTENT_TYPE_HTML)
FOLLOW_MF2 = util.parse_mf2(FOLLOW_HTML)['items'][0]
FOLLOW_AS1 = microformats2.json_to_object(FOLLOW_MF2)
FOLLOW_AS2 = {
    '@context': 'https://www.w3.org/ns/activitystreams',
    'type': 'Follow',
    'id': 'http://localhost/r/https://user.com/follow',
    'url': 'http://localhost/r/https://user.com/follow',
    'object': 'https://mas.to/mrs-foo',
    'actor': 'http://localhost/user.com',
    'to': [as2.PUBLIC_AUDIENCE],
}

FOLLOW_FRAGMENT_HTML = """\hello i am a post
Ms. ☕ Baz
"""
# NOTE(review): NOTE_HTML is used below but is not defined anywhere in the
# visible text of this file -- confirm it is defined before this point.
NOTE = requests_response(NOTE_HTML, url='https://user.com/post',
                         content_type=CONTENT_TYPE_HTML)
NOTE_MF2 = util.parse_mf2(NOTE_HTML)['items'][0]
NOTE_AS1 = microformats2.json_to_object(NOTE_MF2)
NOTE_AS2 = {
    'type': 'Note',
    'id': 'http://localhost/r/https://user.com/post',
    'url': 'http://localhost/r/https://user.com/post',
    'attributedTo': ACTOR_AS2,
    'name': 'hello i am a post',
    'content': 'hello i am a post',
    'to': [as2.PUBLIC_AUDIENCE],
}
CREATE_AS1 = {
    'objectType': 'activity',
    'verb': 'post',
    'id': 'https://user.com/post#bridgy-fed-create',
    'actor': 'http://localhost/user.com',
    'object': NOTE_AS1,
}
CREATE_AS2 = {
    '@context': 'https://www.w3.org/ns/activitystreams',
    'type': 'Create',
    'id': 'http://localhost/r/https://user.com/post#bridgy-fed-create',
    'actor': 'http://localhost/user.com',
    'object': NOTE_AS2,
    'to': [as2.PUBLIC_AUDIENCE],
}
# Deep copy so that mutating UPDATE_AS2['object'] below does not touch
# CREATE_AS2 / NOTE_AS2.
UPDATE_AS2 = copy.deepcopy(CREATE_AS2)
UPDATE_AS2.update({
    'type': 'Update',
    'id': 'http://localhost/r/https://user.com/post#bridgy-fed-update-2022-01-02T03:04:05+00:00',
})
UPDATE_AS2['object']['updated'] = NOW.isoformat()

NOT_FEDIVERSE = requests_response("""\ foo """, url='http://not/fediverse', content_type=CONTENT_TYPE_HTML)
# Ordered list of canned responses; not referenced by the tests visible here.
ACTIVITYPUB_GETS = [REPLY, NOT_FEDIVERSE, TOOT_AS2, ACTOR]


@patch('requests.post')
@patch('requests.get')
class WebTest(TestCase):
    """Tests for the ``Web`` model and the webmention endpoints.

    ``requests.get`` and ``requests.post`` are patched for the whole class;
    each test method receives the mocks as trailing positional arguments,
    ``mock_get`` first and then ``mock_post``.
    """

    def setUp(self):
        """Create the user.com test user, store it on ``g``, push the request context."""
        super().setUp()
        g.user = self.make_user('user.com')
        self.request_context.push()

    def assert_deliveries(self, mock_post, inboxes, data, ignore=()):
        """Check that exactly one signed AS2 POST was made to each inbox.

        Args:
          mock_post: the ``requests.post`` mock whose recorded calls are checked
          inboxes: sequence of inbox URLs; each must be the first positional
            argument of one recorded call, and the number of recorded calls
            must equal ``len(inboxes)``
          data: expected JSON payload, compared against each call's decoded
            ``data`` keyword argument
          ignore: passed through to ``assert_equals``
        """
        self.assertEqual(len(inboxes), len(mock_post.call_args_list))

        calls = {}  # maps inbox URL to JSON data
        for args, kwargs in mock_post.call_args_list:
            self.assertEqual(as2.CONTENT_TYPE, kwargs['headers']['Content-Type'])
            # the request must be signed with g.user's private key
            rsa_key = kwargs['auth'].header_signer._rsa._key
            self.assertEqual(g.user.private_pem(), rsa_key.exportKey())
            calls[args[0]] = json_loads(kwargs['data'])

        for inbox in inboxes:
            got = calls[inbox]
            # drop 'publicKey' from the activity's object before comparing
            as1.get_object(got).pop('publicKey', None)
            self.assert_equals(data, got, inbox, ignore=ignore)

    def assert_object(self, id, **props):
        """Delegate to the parent ``assert_object`` with ``delivered_protocol='activitypub'``."""
        return super().assert_object(id, delivered_protocol='activitypub', **props)

    def test_put_validates_domain_id(self, *_):
        """``Web(id=...).put()`` raises AssertionError for each of these ids."""
        for bad in (
            'AbC.cOm',
            'foo',
            '@user.com',
            '@user.com@user.com',
            'acct:user.com',
            'acct:@user.com@user.com',
            'acc:me@user.com',
        ):
            with self.assertRaises(AssertionError):
                Web(id=bad).put()

    def test_get_or_create_lower_cases_domain(self, *_):
        """``Web.get_or_create`` stores a mixed-case domain under its lower-cased id."""
        user = Web.get_or_create('AbC.oRg')
        self.assertEqual('abc.org', user.key.id())
        self.assert_entities_equal(user, Web.get_by_id('abc.org'))
        self.assertIsNone(Web.get_by_id('AbC.oRg'))

    def test_get_or_create_unicode_domain(self, *_):
        """``Web.get_or_create`` keeps a non-ASCII domain as the key id unchanged."""
        user = Web.get_or_create('☃.net')
        self.assertEqual('☃.net', user.key.id())
        self.assert_entities_equal(user, Web.get_by_id('☃.net'))

    def test_bad_source_url(self, mock_get, mock_post):
        """POST /webmention with an empty body or unusable ``source`` returns 400, stores no Object."""
        for data in b'', {'source': 'bad'}, {'source': 'https://'}:
            got = self.client.post('/webmention', data=data)
            self.assertEqual(400, got.status_code)
            self.assertEqual(0, Object.query().count())

    @patch('oauth_dropins.webutil.appengine_config.tasks_client.create_task')
    def test_make_task(self, mock_create_task, mock_get, mock_post):
        """POST /webmention returns 202 and creates a task on the webmention queue.

        The task is expected to POST the same form-encoded params to
        ``/_ah/queue/webmention``.
        """
        mock_get.side_effect = [NOTE, ACTOR]

        params = {
            'source': 'https://user.com/post',
            'target': 'https://fed.brid.gy/',
        }
        got = self.client.post('/webmention', data=params)

        self.assertEqual(202, got.status_code)
        mock_create_task.assert_called_with(
            parent=f'projects/{APP_ID}/locations/{TASKS_LOCATION}/queues/webmention',
            task={
                'app_engine_http_request': {
                    'http_method': 'POST',
                    'relative_uri': '/_ah/queue/webmention',
                    'body': urlencode(params).encode(),
                    'headers': {'Content-Type': 'application/x-www-form-urlencoded'},
                },
            },
        )

    def test_no_user(self, mock_get, mock_post):
        """POST /webmention with a ``source`` on nope.com returns 400 and stores no Object."""
        got = self.client.post('/webmention', data={'source': 'https://nope.com/post'})
        self.assertEqual(400, got.status_code)
        self.assertEqual(0, Object.query().count())

    def test_source_fetch_fails(self, mock_get, mock_post):
        """A 405 response when fetching the source makes the task handler return 502."""
        mock_get.side_effect = (
            requests_response(REPLY_HTML,
                              status=405, content_type=CONTENT_TYPE_HTML),
        )
        got = self.client.post('/_ah/queue/webmention',
                               data={'source': 'https://user.com/post'})
        self.assertEqual(502, got.status_code)
        self.assertEqual(0, Object.query().count())

    def test_no_source_entry(self, mock_get, mock_post):
        """With the stub source page below, the task handler returns 304 and stores no Object."""
        mock_get.return_value = requests_response("""nothing to see here except link
""", url='https://user.com/post', content_type=CONTENT_TYPE_HTML)

        got = self.client.post('/_ah/queue/webmention', data={
            'source': 'https://user.com/post',
            'target': 'https://fed.brid.gy/',
        })
        self.assertEqual(304, got.status_code)
        self.assertEqual(0, Object.query().count())

        mock_get.assert_has_calls((self.req('https://user.com/post'),))

    def test_source_homepage_no_mf2(self, mock_get, mock_post):
        """Same stub page served from the user's home page URL: the handler returns 304."""
        mock_get.return_value = requests_response("""nothing to see here except link
""", url='https://user.com/', content_type=CONTENT_TYPE_HTML)

        got = self.client.post('/_ah/queue/webmention', data={
            'source': 'https://user.com/',
            'target': 'https://fed.brid.gy/',
        })
        self.assertEqual(304, got.status_code)

        mock_get.assert_has_calls((self.req('https://user.com/'),))

    def test_no_targets(self, mock_get, mock_post):
        """With the stub source page below, the task handler fetches the source and returns 200."""
        mock_get.return_value = requests_response(""" """, url='https://user.com/post', content_type=CONTENT_TYPE_HTML)

        got = self.client.post('/_ah/queue/webmention', data={
            'source': 'https://user.com/post',
            'target': 'https://fed.brid.gy/',
        })
        self.assertEqual(200, got.status_code)

        mock_get.assert_has_calls((self.req('https://user.com/post'),))

    def test_bad_target_url(self, mock_get, mock_post):
        """A ValueError raised by the second ``requests.get`` call yields a 400."""
        mock_get.side_effect = (
            # source page: REPLY_HTML with 'https://mas.to/toot' replaced by 'bad'
            requests_response(
                REPLY_HTML.replace('https://mas.to/toot', 'bad'),
                content_type=CONTENT_TYPE_HTML, url='https://user.com/reply'),
            ValueError('foo bar'),
        )

        got = self.client.post('/_ah/queue/webmention',
                               data={'source': 'https://user.com/reply'})
        self.assertEqual(400, got.status_code)

    def test_target_fetch_fails(self, mock_get, mock_post):
        """A ``requests.Timeout`` raised by the second ``requests.get`` call yields a 502."""
        # NOTE(review): the stubbed source response uses url .../post while the
        # request below posts source .../reply -- confirm this is intended.
        mock_get.side_effect = (
            requests_response(
                REPLY_HTML.replace('https://mas.to/toot', 'bad'),
                url='https://user.com/post', content_type=CONTENT_TYPE_HTML),
            requests.Timeout('foo bar'))

        got = self.client.post('/_ah/queue/webmention',
                               data={'source': 'https://user.com/reply'})
        self.assertEqual(502, got.status_code)

    def test_target_fetch_has_no_content_type(self, mock_get, mock_post):
        html = REPLY_HTML.replace(
            '', " reposted! Ms. ☕ Baz