kopia lustrzana https://github.com/snarfed/bridgy-fed
279 wiersze
10 KiB
Python
279 wiersze
10 KiB
Python
"""Unit tests for protocol.py."""
|
|
from unittest.mock import patch
|
|
|
|
from flask import g
|
|
from oauth_dropins.webutil.testutil import requests_response
|
|
import requests
|
|
|
|
# import first so that Fake is defined before URL routes are registered
|
|
from .testutil import Fake, TestCase
|
|
|
|
from activitypub import ActivityPub
|
|
from app import app
|
|
from models import Follower, Object, PROTOCOLS, User
|
|
import protocol
|
|
from protocol import Protocol
|
|
from ui import UIProtocol
|
|
from web import Web
|
|
|
|
from .test_activitypub import ACTOR, REPLY
|
|
from .test_web import ACTOR_HTML
|
|
|
|
# Local override of the REPLY fixture imported from .test_activitypub: same
# activity, but with 'actor' set to the full ACTOR object and the inner
# object's 'author' set to ACTOR as well. All other keys are carried over
# unchanged.
REPLY = dict(
    REPLY,
    actor=ACTOR,
    object=dict(REPLY['object'], author=ACTOR),
)
|
|
|
|
|
|
class ProtocolTest(TestCase):
    """Tests for Protocol: class lookup (for_domain, for_request, for_id),
    subdomain_url, receive, and load.
    """

    def setUp(self):
        """Creates the foo.com user and resets flask's g.user to None."""
        super().setUp()
        self.user = self.make_user('foo.com', has_hcard=True)
        g.user = None

    def tearDown(self):
        """Removes the 'greedy' entry from PROTOCOLS, if present.

        Presumably that entry is registered by the Greedy class defined in
        test_for_id_true_overrides_none; popping with a default keeps this a
        no-op for every other test.
        """
        PROTOCOLS.pop('greedy', None)
        super().tearDown()

    @staticmethod
    def store_object(**kwargs):
        """Creates an Object, saves it with put(), and evicts it from the cache.

        Args:
          kwargs: passed through to the Object constructor

        Returns:
          the stored Object
        """
        obj = Object(**kwargs)
        obj.put()
        # Evict so that later loads in the test don't get served from
        # protocol.objects_cache. pop() with a default instead of del so that
        # this helper doesn't raise KeyError if put() didn't populate the cache.
        protocol.objects_cache.pop(obj.key.id(), None)
        return obj

    def test_protocols_global(self):
        """PROTOCOLS maps the 'fake', 'web', and 'webmention' labels to classes."""
        self.assertEqual(Fake, PROTOCOLS['fake'])
        self.assertEqual(Web, PROTOCOLS['web'])
        self.assertEqual(Web, PROTOCOLS['webmention'])

    def test_for_domain_for_request(self):
        """for_domain and for_request agree on each domain => protocol pair."""
        for domain, expected in [
            ('fake.brid.gy', Fake),
            ('ap.brid.gy', ActivityPub),
            ('activitypub.brid.gy', ActivityPub),
            ('web.brid.gy', Web),
            (None, None),
            ('', None),
            ('brid.gy', None),
            ('www.brid.gy', None),
            ('fed.brid.gy', None),
            ('fake.fed.brid.gy', None),
            ('fake', None),
            ('fake.com', None),
        ]:
            with self.subTest(domain=domain, expected=expected):
                self.assertEqual(expected, Protocol.for_domain(domain))
                with app.test_request_context('/foo', base_url=f'https://{domain}/'):
                    self.assertEqual(expected, Protocol.for_request())

    def test_for_domain_for_request_fed(self):
        """Same as above, but with fed=Fake and full URLs as input."""
        for url, expected in [
            ('https://fed.brid.gy/', Fake),
            ('http://localhost/foo', Fake),
            ('https://ap.brid.gy/bar', ActivityPub),
            ('https://baz/biff', None),
        ]:
            with self.subTest(url=url, expected=expected):
                self.assertEqual(expected, Protocol.for_domain(url, fed=Fake))
                with app.test_request_context('/foo', base_url=url):
                    self.assertEqual(expected, Protocol.for_request(fed=Fake))

    def test_subdomain_url(self):
        """subdomain_url returns the protocol's brid.gy subdomain URL."""
        self.assertEqual('https://fa.brid.gy/', Fake.subdomain_url())
        self.assertEqual('https://fa.brid.gy/foo?bar', Fake.subdomain_url('foo?bar'))
        self.assertEqual('https://fed.brid.gy/', UIProtocol.subdomain_url())

    @patch('requests.get')
    def test_receive_reply_not_feed_not_notification(self, mock_get):
        """Receiving a reply labels the stored activity 'activity' only."""
        Follower.get_or_create(to=Fake.get_or_create(id=ACTOR['id']),
                               from_=Fake.get_or_create(id='foo.com'))
        other_user = self.make_user('user.com', cls=Web)

        # user.com webmention discovery
        mock_get.return_value = requests_response('<html></html>')

        Fake.receive(REPLY['id'], as2=REPLY)

        self.assert_object(REPLY['id'],
                           as2=REPLY,
                           type='post',
                           users=[other_user.key],
                           # not feed since it's a reply
                           # not notification since it doesn't involve the user
                           labels=['activity'],
                           status='complete',
                           source_protocol='fake',
                           )
        self.assert_object(REPLY['object']['id'],
                           as2=REPLY['object'],
                           type='comment',
                           source_protocol='fake',
                           )

    def test_for_id(self):
        """for_id maps each id to the expected protocol class, or None."""
        # id_ rather than id to avoid shadowing the builtin.
        for id_, expected in [
            (None, None),
            ('', None),
            ('foo://bar', None),
            ('fake:foo', Fake),
            # TODO
            # ('at://foo', ATProto),
            ('https://ap.brid.gy/foo/bar', ActivityPub),
            ('https://web.brid.gy/foo/bar', Web),
        ]:
            # subTest, like the for_domain tests above, so that one failing
            # row is reported by itself and doesn't hide the rows after it.
            with self.subTest(id=id_, expected=expected):
                self.assertEqual(expected, Protocol.for_id(id_))

    def test_for_id_true_overrides_none(self):
        """for_id returns a protocol whose owns_id returns True."""
        class Greedy(Protocol, User):
            @classmethod
            def owns_id(cls, id):
                return True

        self.assertEqual(Greedy, Protocol.for_id('http://foo'))
        self.assertEqual(Greedy, Protocol.for_id('https://bar/baz'))

    def test_for_id_object(self):
        """for_id uses a stored Object's source_protocol."""
        self.store_object(id='http://ui/obj', source_protocol='ui')
        self.assertEqual(UIProtocol, Protocol.for_id('http://ui/obj'))

    def test_for_id_object_missing_source_protocol(self):
        """for_id returns None for a stored Object with no source_protocol."""
        self.store_object(id='http://bad/obj')
        self.assertIsNone(Protocol.for_id('http://bad/obj'))

    @patch('requests.get')
    def test_for_id_activitypub_fetch(self, mock_get):
        """for_id returns ActivityPub when an AS2 fetch of the id succeeds."""
        mock_get.return_value = self.as2_resp(ACTOR)
        self.assertEqual(ActivityPub, Protocol.for_id('http://ap/actor'))
        self.assertIn(self.as2_req('http://ap/actor'), mock_get.mock_calls)

    @patch('requests.get')
    def test_for_id_web_fetch(self, mock_get):
        """for_id returns Web when the fetched page is ACTOR_HTML."""
        mock_get.return_value = requests_response(ACTOR_HTML)
        self.assertEqual(Web, Protocol.for_id('http://web.site/'))
        self.assertIn(self.req('http://web.site/'), mock_get.mock_calls)

    @patch('requests.get')
    def test_for_id_web_fetch_no_mf2(self, mock_get):
        """for_id returns None when the fetched page is an empty HTML document."""
        mock_get.return_value = requests_response('<html></html>')
        self.assertIsNone(Protocol.for_id('http://web.site/'))
        self.assertIn(self.req('http://web.site/'), mock_get.mock_calls)

    def test_load(self):
        """load fetches an unstored id, marks it new, and stores the Object."""
        Fake.objects['foo'] = {'x': 'y'}

        loaded = Fake.load('foo')
        self.assert_equals({'x': 'y'}, loaded.our_as1)
        self.assertFalse(loaded.changed)
        self.assertTrue(loaded.new)

        self.assertIsNotNone(Object.get_by_id('foo'))
        self.assertEqual(['foo'], Fake.fetched)

    def test_load_existing(self):
        """load returns an already stored Object without fetching."""
        self.store_object(id='foo', our_as1={'x': 'y'})

        loaded = Fake.load('foo')
        self.assert_equals({'x': 'y'}, loaded.our_as1)
        self.assertFalse(loaded.changed)
        self.assertFalse(loaded.new)

        self.assertEqual([], Fake.fetched)

    def test_load_existing_empty_deleted(self):
        """load returns a stored Object marked deleted without fetching."""
        stored = self.store_object(id='foo', deleted=True)

        loaded = Fake.load('foo')
        self.assert_entities_equal(stored, loaded)
        self.assertFalse(loaded.changed)
        self.assertFalse(loaded.new)

        self.assertEqual([], Fake.fetched)

    def test_load_cached(self):
        """load returns a copy of the cached Object, not the cached entity."""
        obj = Object(id='foo', our_as1={'x': 'y'})
        protocol.objects_cache['foo'] = obj
        loaded = Fake.load('foo')
        self.assert_entities_equal(obj, loaded)

        # check that it's a separate copy of the entity in the cache
        # https://github.com/snarfed/bridgy-fed/issues/558#issuecomment-1603203927
        loaded.our_as1 = {'a': 'b'}
        self.assertEqual({'x': 'y'}, Protocol.load('foo').our_as1)

    def test_load_remote_true_existing_empty(self):
        """load(remote=True) fills in an empty stored Object; changed, not new."""
        Fake.objects['foo'] = {'x': 'y'}
        Object(id='foo').put()

        loaded = Fake.load('foo', remote=True)
        self.assertEqual({'x': 'y'}, loaded.as1)
        self.assertTrue(loaded.changed)
        self.assertFalse(loaded.new)
        self.assertEqual(['foo'], Fake.fetched)

    def test_load_remote_true_new_empty(self):
        """load(remote=True) with a None fetch result clears as1; changed, not new."""
        Fake.objects['foo'] = None
        self.store_object(id='foo', our_as1={'x': 'y'})

        loaded = Fake.load('foo', remote=True)
        self.assertIsNone(loaded.as1)
        self.assertTrue(loaded.changed)
        self.assertFalse(loaded.new)
        self.assertEqual(['foo'], Fake.fetched)

    def test_load_remote_true_unchanged(self):
        """load(remote=True) with identical fetched data is not changed or new."""
        obj = self.store_object(id='foo', our_as1={'x': 'stored'})
        Fake.objects['foo'] = {'x': 'stored'}

        loaded = Fake.load('foo', remote=True)
        self.assert_entities_equal(obj, loaded)
        self.assertFalse(loaded.changed)
        self.assertFalse(loaded.new)
        self.assertEqual(['foo'], Fake.fetched)

    def test_load_remote_true_changed(self):
        """load(remote=True) with different fetched data is changed, not new."""
        self.store_object(id='foo', our_as1={'content': 'stored'})
        Fake.objects['foo'] = {'content': 'new'}

        loaded = Fake.load('foo', remote=True)
        self.assert_equals({'content': 'new'}, loaded.our_as1)
        self.assertTrue(loaded.changed)
        self.assertFalse(loaded.new)
        self.assertEqual(['foo'], Fake.fetched)

    def test_load_remote_false(self):
        """load(remote=False) never fetches: None if unstored, else the Object."""
        self.assertIsNone(Fake.load('nope', remote=False))
        self.assertEqual([], Fake.fetched)

        obj = self.store_object(id='foo', our_as1={'content': 'stored'})
        self.assert_entities_equal(obj, Fake.load('foo', remote=False))
        self.assertEqual([], Fake.fetched)

    def test_Protocol_load_remote_false_existing_object_empty(self):
        """Protocol.load(remote=False) returns a stored Object with no data."""
        obj = self.store_object(id='foo')
        self.assert_entities_equal(obj, Protocol.load('foo', remote=False))

    def test_local_false_missing(self):
        """load(local=False) raises HTTPError for an id this test never sets up."""
        with self.assertRaises(requests.HTTPError) as e:
            Fake.load('foo', local=False)

        # The raised exception is on the context manager's .exception
        # attribute, not on the context manager itself, and the check has to
        # run after the with block: statements following the raising call
        # inside the block are never executed.
        # NOTE(review): int() tolerates a fake response that stores
        # status_code as a string - confirm against Fake.fetch.
        self.assertEqual(410, int(e.exception.response.status_code))

        self.assertEqual(['foo'], Fake.fetched)

    def test_local_false_existing(self):
        """load(local=False) refetches and overwrites the stored Object."""
        self.store_object(id='foo', our_as1={'content': 'stored'}, source_protocol='ui')

        Fake.objects['foo'] = {'foo': 'bar'}
        Fake.load('foo', local=False)
        self.assert_object('foo', source_protocol='fake', our_as1={'foo': 'bar'})
        self.assertEqual(['foo'], Fake.fetched)

    def test_remote_false_local_false_assert(self):
        """load with both local=False and remote=False raises AssertionError."""
        with self.assertRaises(AssertionError):
            Fake.load('nope', local=False, remote=False)
|