diff --git a/atproto.py b/atproto.py index 131e773c..710f9a57 100644 --- a/atproto.py +++ b/atproto.py @@ -415,9 +415,13 @@ def poll_notifications(): Uses the ``listNotifications`` endpoint, which is intended for end users. 🤷 https://github.com/bluesky-social/atproto/discussions/1538 + + TODO: unify with poll_posts """ repos = {r.key.id(): r for r in AtpRepo.query()} logger.info(f'Got {len(repos)} repos') + if not repos: + return # TODO: switch from atproto_did to copies users = itertools.chain(*(cls.query(cls.atproto_did.IN(list(repos))) @@ -459,3 +463,62 @@ def poll_notifications(): # User yet. return 'OK' + + +# URL route is registered in hub.py +def poll_posts(): + """Fetches and enqueueus new posts from the AppView for our users. + + Uses the ``getTimeline`` endpoint, which is intended for end users. 🤷 + + TODO: unify with poll_notifications + """ + repos = {r.key.id(): r for r in AtpRepo.query()} + logger.info(f'Got {len(repos)} repos') + if not repos: + return + + # TODO: switch from atproto_did to copies + users = itertools.chain(*(cls.query(cls.atproto_did.IN(list(repos))) + for cls in set(PROTOCOLS.values()) + if cls and cls != ATProto)) + + # TODO: convert to Session for connection pipelining! + client = Client(f'https://{os.environ["APPVIEW_HOST"]}', + headers={'User-Agent': USER_AGENT}) + + for user in users: + logging.debug(f'Fetching notifs for {user.key.id()}') + + # TODO: store and use cursor + # seenAt would be easier, but they don't support it yet + # https://github.com/bluesky-social/atproto/issues/1636 + repo = repos[user.atproto_did] + client.session['accessJwt'] = service_jwt(os.environ['APPVIEW_HOST'], + repo_did=user.atproto_did, + privkey=repo.signing_key) + resp = client.app.bsky.feed.getTimeline() + for item in resp['feed']: + uri = item['post']['uri'] + logger.debug(f'Got {uri}: {json_dumps(item, indent=2)}') + + # TODO: handle reposts once we have a URI for them + # https://github.com/bluesky-social/atproto/issues/1811 + # + # TODO: verify sig. skipping this for now because we're getting + # these from the AppView, which is trusted, specifically we expect + # the BGS and/or the AppView already checked sigs. + obj = Object.get_or_create(id=uri, bsky=item['post'], + source_protocol=ATProto.ABBREV) + if not obj.status: + obj.status = 'new' + obj.add('feed', user.key) + obj.put() + + common.create_task(queue='receive', obj=obj.key.urlsafe(), + authed_as=user.atproto_did) + # note that we don't pass a user param above. it's the acting user, + # which is different for every notif, and may not actually have a BF + # User yet. + + return 'OK' diff --git a/hub.py b/hub.py index ce25c496..4b098ca4 100644 --- a/hub.py +++ b/hub.py @@ -70,6 +70,10 @@ app.add_url_rule('/queue/atproto-poll-notifs', view_func=atproto.poll_notifications, methods=['POST']) +app.add_url_rule('/queue/atproto-poll-posts', + view_func=atproto.poll_posts, + methods=['POST']) + @app.post('/queue/atproto-commit') @flask_util.cloud_tasks_only def atproto_commit(): diff --git a/tests/test_atproto.py b/tests/test_atproto.py index 55f47aad..3b041184 100644 --- a/tests/test_atproto.py +++ b/tests/test_atproto.py @@ -3,7 +3,7 @@ import base64 import copy import logging from unittest import skip -from unittest.mock import call, MagicMock, patch +from unittest.mock import ANY, call, MagicMock, patch from arroba.datastore_storage import AtpBlock, AtpRemoteBlob, AtpRepo, DatastoreStorage from arroba.did import encode_did_key @@ -704,8 +704,8 @@ class ATProtoTest(TestCase): @patch('requests.get') def test_poll_notifications(self, mock_get, mock_create_task): user_a = self.make_user(id='fake:user-a', cls=Fake, atproto_did=f'did:plc:a') - user_b = self.make_user(id='fake:user-c', cls=Fake, atproto_did=f'did:plc:b') - user_c = self.make_user(id='fake:user-b', cls=Fake, atproto_did=f'did:plc:c') + user_b = self.make_user(id='fake:user-b', cls=Fake, atproto_did=f'did:plc:b') + user_c = self.make_user(id='fake:user-c', cls=Fake, atproto_did=f'did:plc:c') Repo.create(self.storage, 'did:plc:a', signing_key=ATPROTO_KEY) Repo.create(self.storage, 'did:plc:c', signing_key=ATPROTO_KEY) @@ -809,3 +809,92 @@ class ATProtoTest(TestCase): self.assertEqual(follow, follow_obj.bsky) self.assert_task(mock_create_task, 'receive', '/queue/receive', obj=follow_obj.key.urlsafe(), authed_as='did:plc:a') + + @patch.object(tasks_client, 'create_task', return_value=Task(name='my task')) + @patch('requests.get') + def test_poll_posts(self, mock_get, mock_create_task): + user_a = self.make_user(id='fake:user-a', cls=Fake, atproto_did=f'did:plc:a') + user_b = self.make_user(id='fake:user-b', cls=Fake, atproto_did=f'did:plc:b') + user_c = self.make_user(id='fake:user-c', cls=Fake, atproto_did=f'did:plc:c') + Repo.create(self.storage, 'did:plc:a', signing_key=ATPROTO_KEY) + Repo.create(self.storage, 'did:plc:b', signing_key=ATPROTO_KEY) + Repo.create(self.storage, 'did:plc:c', signing_key=ATPROTO_KEY) + + post_view = { + '$type': 'app.bsky.feed.defs#postView', + 'uri': 'at://did:web:alice.com/app.bsky.feed.post/123', + 'cid': 'TODO', + 'record': { + '$type': 'app.bsky.feed.post', + 'text': 'My original post', + 'createdAt': '2007-07-07T03:04:05', + }, + 'author': { + '$type': 'app.bsky.actor.defs#profileViewBasic', + 'did': 'did:web:alice.com', + 'handle': 'alice.com', + }, + } + + mock_get.side_effect = [ + requests_response({ + 'cursor': '...', + 'feed': [{ + '$type': 'app.bsky.feed.defs#feedViewPost', + 'post': post_view, + }], + }), + requests_response({ + **DID_DOC, + 'id': 'did:plc:alice.com', + }), + requests_response({ + 'cursor': '...', + 'feed': [], + }), + requests_response({ + 'cursor': '...', + 'feed': [{ + '$type': 'app.bsky.feed.defs#feedViewPost', + 'post': post_view, + 'reason': { + '$type': 'app.bsky.feed.defs#reasonRepost', + 'by': { + '$type': 'app.bsky.actor.defs#profileViewBasic', + 'did': 'did:web:bob.com', + 'handle': 'bob.com', + }, + 'indexedAt': '2022-01-02T03:04:05+00:00', + }, + }], + }), + ] + + resp = self.post('/queue/atproto-poll-posts', client=hub.app.test_client()) + self.assertEqual(200, resp.status_code) + + get_timeline = call( + 'https://api.bsky-sandbox.dev/xrpc/app.bsky.feed.getTimeline', + json=None, + headers={ + 'Content-Type': 'application/json', + 'User-Agent': common.USER_AGENT, + 'Authorization': ANY, + }) + self.assertEqual([ + get_timeline, + self.req('https://alice.com/.well-known/did.json'), + get_timeline, + get_timeline, + ], mock_get.call_args_list) + + post_obj = Object.get_by_id('at://did:web:alice.com/app.bsky.feed.post/123') + self.assertEqual(post_view, post_obj.bsky) + self.assert_task(mock_create_task, 'receive', '/queue/receive', + obj=post_obj.key.urlsafe(), authed_as='did:plc:a') + + # TODO + # repost_obj = Object.get_by_id('at://did:plc:d/app.bsky.feed.post/456') + # self.assertEqual(repost, repost_obj.bsky) + # self.assert_task(mock_create_task, 'receive', '/queue/receive', + # obj=repost_obj.key.urlsafe(), authed_as='did:plc:eve')