"""Integration tests.""" import copy from datetime import datetime from unittest import skip from unittest.mock import ANY, patch from arroba.datastore_storage import AtpSequence, DatastoreStorage from arroba import firehose from arroba.repo import Repo from arroba.storage import SUBSCRIBE_REPOS_NSID import arroba.util from dns.resolver import NXDOMAIN import google.cloud.dns.client from granary import as2, bluesky from granary.tests.test_bluesky import ACTOR_PROFILE_BSKY, POST_BSKY from oauth_dropins.webutil.flask_util import NoContent from oauth_dropins.webutil.testutil import NOW, requests_response from oauth_dropins.webutil import util from oauth_dropins.webutil.util import json_dumps, json_loads from activitypub import ActivityPub import app from atproto import ATProto, Cursor import atproto_firehose import common from models import DM, Follower, Object, Target import simple_websocket from web import Web from .testutil import ATPROTO_KEY, TestCase from .test_activitypub import ACTOR, add_key, sign from .test_atproto_firehose import FakeWebsocketClient, setup_firehose from . import test_atproto from . import test_web DID_DOC = { **test_atproto.DID_DOC, 'id': 'did:plc:alice', 'alsoKnownAs': ['at://alice.com'], } PROFILE_GETRECORD = { 'uri': 'at://did:plc:alice/app.bsky.actor.profile/self', 'cid': 'alice+sidd', 'value': test_atproto.ACTOR_PROFILE_BSKY, } BSKY_GET_CONVO_RESP = requests_response({ # getConvoForMembers 'convo': { 'id': 'convo123', 'rev': '22222222fuozt', 'members': [], 'muted': False, 'unreadCount': 0, }, }) BSKY_SEND_MESSAGE_RESP = requests_response({ # sendMessage 'id': 'chat456', 'rev': '22222222tef2d', # ... }) @patch('ids.COPIES_PROTOCOLS', ['atproto']) class IntegrationTests(TestCase): def setUp(self): super().setUp() self.storage = DatastoreStorage() def make_ap_user(self, ap_id, did=None, **props): user = self.make_user(id=ap_id, cls=ActivityPub, obj_as2=add_key({ 'type': 'Person', 'id': ap_id, 'name': 'My Name', 'image': 'http://pic', 'inbox': f'{ap_id}/inbox', }), **props) if did: self.make_atproto_copy(user, did) return user def make_atproto_user(self, did, enabled_protocols=['activitypub']): self.store_object(id=did, raw=DID_DOC) user = self.make_user(id=did, cls=ATProto, obj_bsky=test_atproto.ACTOR_PROFILE_BSKY, enabled_protocols=enabled_protocols) return user def make_web_user(self, domain, did=None, enabled_protocols=['activitypub']): ap_subdomain = (domain.removesuffix('.brid.gy') if domain.endswith('.brid.gy') else None) user = self.make_user(id=domain, cls=Web, ap_subdomain=ap_subdomain, enabled_protocols=enabled_protocols, obj_as1={ 'objectType': 'person', 'id': f'https://{domain}/', }) if did: self.make_atproto_copy(user, did) return user def make_atproto_copy(self, user, did): user.enabled_protocols = ['atproto'] user.copies = [Target(uri=did, protocol='atproto')] user.put() Repo.create(self.storage, did, signing_key=ATPROTO_KEY) did_doc = copy.deepcopy(test_atproto.DID_DOC) did_doc['service'][0]['serviceEndpoint'] = ATProto.DEFAULT_TARGET did_doc['id'] = did self.store_object(id=did, raw=did_doc) if user.obj.as1: profile_id = f'at://{did}/app.bsky.actor.profile/self' self.store_object(id=profile_id, bsky=bluesky.from_as1(user.obj.as1)) user.obj.copies = [Target(uri=profile_id, protocol='atproto')] user.obj.put() def firehose(self, limit=1, **op): setup_firehose() FakeWebsocketClient.setup_receive(atproto_firehose.Op(**op)) atproto_firehose.load_dids() atproto_firehose.subscribe() if limit: atproto_firehose.handle(limit=limit) assert atproto_firehose.commits.empty() @patch('requests.post') def test_atproto_post_to_activitypub(self, mock_post): """ATProto post, from firehose to ActivityPub. ATProto original post at://did:plc:alice/app.bsky.feed.post/123 ActivityPub follower http://inst/bob """ self.store_object(id='did:plc:alice', raw=DID_DOC) alice = self.make_user( id='did:plc:alice', cls=ATProto, enabled_protocols=['activitypub'], obj_bsky=test_atproto.ACTOR_PROFILE_BSKY, # make sure we handle our_as1 with profile id ok obj_as1={ **test_atproto.ACTOR_AS, 'id': 'at://did:plc:alice/app.bsky.actor.profile/self', }) bob = self.make_ap_user('http://inst/bob') Follower.get_or_create(to=alice, from_=bob) # need at least one repo for firehose subscriber to load DIDs and run Repo.create(self.storage, 'did:unused', signing_key=ATPROTO_KEY) post = { '$type': 'app.bsky.feed.post', 'text': 'I hereby post', } self.firehose(repo='did:plc:alice', action='create', seq=123, path='app.bsky.feed.post/123', record=post) self.assert_ap_deliveries(mock_post, ['http://inst/bob/inbox'], from_user=alice, data={ '@context': 'https://www.w3.org/ns/activitystreams', 'type': 'Create', 'id': 'https://bsky.brid.gy/convert/ap/at://did:plc:alice/app.bsky.feed.post/123#bridgy-fed-create', 'actor': 'https://bsky.brid.gy/ap/did:plc:alice', 'published': '2022-01-02T03:04:05+00:00', 'object': { 'type': 'Note', 'id': 'https://bsky.brid.gy/convert/ap/at://did:plc:alice/app.bsky.feed.post/123', 'url': 'http://localhost/r/https://bsky.app/profile/did:plc:alice/post/123', 'attributedTo': 'https://bsky.brid.gy/ap/did:plc:alice', 'content': '

I hereby post

', 'contentMap': {'en': '

I hereby post

'}, 'to': ['https://www.w3.org/ns/activitystreams#Public'], }, 'to': ['https://www.w3.org/ns/activitystreams#Public'], }) @patch('requests.post') def test_atproto_profile_update_to_activitypub(self, mock_post): """ATProto profile update, from firehose to ActivityPub. ATProto user did:plc:alice ActivityPub follower http://inst/bob """ alice = self.make_atproto_user('did:plc:alice') bob = self.make_ap_user('http://inst/bob') Follower.get_or_create(to=alice, from_=bob) # need at least one repo for firehose subscriber to load DIDs and run Repo.create(self.storage, 'did:unused', signing_key=ATPROTO_KEY) new_profile = { **ACTOR_PROFILE_BSKY, 'displayName': 'Ms New Alice', } self.firehose(repo='did:plc:alice', action='update', seq=123, path='app.bsky.actor.profile/self', record=new_profile) expected_actor = ActivityPub.convert( Object(id='at://did:plc:alice/app.bsky.actor.profile/self', bsky=new_profile, source_protocol='atproto'), from_user=alice) self.assert_ap_deliveries(mock_post, ['http://inst/bob/inbox'], from_user=alice, data={ '@context': 'https://www.w3.org/ns/activitystreams', 'type': 'Update', 'id': 'https://bsky.brid.gy/convert/ap/at://did:plc:alice/app.bsky.actor.profile/self#bridgy-fed-update-2022-01-02T03:04:05+00:00', 'actor': 'https://bsky.brid.gy/ap/did:plc:alice', 'object': { **expected_actor, 'updated': '2022-01-02T03:04:05+00:00', 'url': ['http://localhost/r/https://bsky.app/profile/alice.com', 'http://localhost/r/https://alice.com/'], }, }, ignore=('attachment', 'publicKey', 'to')) @patch('requests.post') def test_atproto_reply_to_activitypub(self, mock_post): """ATProto reply, from firehose to ActivityPub. ActivityPub original post http://inst/post by bob ATProto reply 123 by alice.com (did:plc:alice) https://github.com/snarfed/bridgy-fed/issues/720 """ alice = self.make_atproto_user('did:plc:alice') bob = self.make_ap_user('http://inst/bob', 'did:plc:bob') self.store_object(id='http://inst/post', source_protocol='activitypub', our_as1={ 'objectType': 'note', 'author': 'http://inst/bob', }, copies=[ Target(uri='at://did:plc:bob/app.bsky.feed.post/123', protocol='atproto'), ]) reply = { '$type': 'app.bsky.feed.post', 'text': 'I hereby reply', 'reply': { 'root': { 'cid': '...', 'uri': 'at://did:plc:bob/app.bsky.feed.post/123', }, 'parent': { 'cid': '...', 'uri': 'at://did:plc:bob/app.bsky.feed.post/123', }, }, } self.firehose(repo='did:plc:alice', action='create', seq=456, path='app.bsky.feed.post/456', record=reply) self.assert_ap_deliveries(mock_post, ['http://inst/bob/inbox'], from_user=alice, data={ '@context': 'https://www.w3.org/ns/activitystreams', 'type': 'Create', 'id': 'https://bsky.brid.gy/convert/ap/at://did:plc:alice/app.bsky.feed.post/456#bridgy-fed-create', 'actor': 'https://bsky.brid.gy/ap/did:plc:alice', 'published': '2022-01-02T03:04:05+00:00', 'object': { 'type': 'Note', 'id': 'https://bsky.brid.gy/convert/ap/at://did:plc:alice/app.bsky.feed.post/456', 'url': 'http://localhost/r/https://bsky.app/profile/did:plc:alice/post/456', 'attributedTo': 'https://bsky.brid.gy/ap/did:plc:alice', 'content': '

I hereby reply

', 'contentMap': {'en': '

I hereby reply

'}, 'inReplyTo': 'http://inst/post', 'tag': [{'type': 'Mention', 'href': 'http://inst/bob'}], 'to': ['https://www.w3.org/ns/activitystreams#Public'], 'cc': ['http://inst/bob'], }, 'to': ['https://www.w3.org/ns/activitystreams#Public'], 'cc': ['http://inst/bob'], }) @patch('requests.post', return_value=BSKY_SEND_MESSAGE_RESP) @patch('requests.get', return_value=BSKY_GET_CONVO_RESP) def test_atproto_not_bridged_reply_to_activitypub(self, mock_get, mock_post): """ATProto reply from a non-bridged user, from firehose to ActivityPub. Should be enqueued, shouldn't be delivered, should send a DM notif to the non-bridged user, should enqueue a notif task (which is run inline). ActivityPub original post http://inst/post by bob ATProto reply 123 by alice.com (did:plc:alice) """ alice = self.make_atproto_user('did:plc:alice', enabled_protocols=[]) bob = self.make_ap_user('http://inst/bob', 'did:plc:bob') self.make_web_user('ap.brid.gy', did='did:plc:ap') self.make_web_user('bsky.brid.gy') self.store_object( id='http://inst/post', source_protocol='activitypub', our_as1={'objectType': 'note', 'author': 'http://inst/bob'}, copies=[Target(uri='at://did:plc:bob/app.bsky.feed.post/123', protocol='atproto')] ) reply = { '$type': 'app.bsky.feed.post', 'text': 'I hereby reply', 'reply': { 'root': { 'cid': '...', 'uri': 'at://did:plc:bob/app.bsky.feed.post/123', }, 'parent': { 'cid': '...', 'uri': 'at://did:plc:bob/app.bsky.feed.post/123', }, }, } self.firehose(limit=1, repo='did:plc:alice', action='create', seq=456, path='app.bsky.feed.post/456', record=reply) self.assertEqual(2, len(mock_post.call_args_list)) # DM notif to bob of reply args, kwargs = mock_post.call_args_list[0] self.assertEqual(('http://inst/bob/inbox',), args) content = json_loads(kwargs['data'])['object']['content'] self.assertTrue(content.startswith("""\

Hi! Here are your recent interactions from people who aren't bridged into the fediverse: