diff --git a/app.py b/app.py index 66e2fe6b..4cdea6f9 100644 --- a/app.py +++ b/app.py @@ -10,11 +10,3 @@ import activitypub, atproto, convert, follow, pages, redirect, ui, webfinger, we import models models.reset_protocol_properties() - -app.add_url_rule('/queue/atproto-poll-notifs', - view_func=atproto.poll_notifications, - methods=['GET', 'POST']) - -app.add_url_rule('/queue/atproto-poll-posts', - view_func=atproto.poll_posts, - methods=['GET', 'POST']) diff --git a/atproto.py b/atproto.py index 4e1dd41a..45de5281 100644 --- a/atproto.py +++ b/atproto.py @@ -636,155 +636,3 @@ class ATProto(User, Protocol): output = client.com.atproto.moderation.createReport(input) logger.info(f'Created report on {mod_host}: {json_dumps(output)}') return True - -# URL route is registered in hub.py -# TODO: this is now unused. delete it! -def poll_notifications(): - """Fetches and enqueueus new activities from the AppView for our users. - - Uses the ``listNotifications`` endpoint, which is intended for end users. 🤷 - - https://github.com/bluesky-social/atproto/discussions/1538 - - TODO: unify with poll_posts - """ - repos = {r.key.id(): r for r in AtpRepo.query()} - logger.info(f'Got {len(repos)} repos') - if not repos: - return 'Nothing to do ¯\_(ツ)_/¯', 204 - - users = itertools.chain(*(cls.query(cls.copies.uri.IN(list(repos))) - for cls in set(PROTOCOLS.values()) - if cls and cls != ATProto)) - - # this client needs to be request-local because we set its service token - # below per user that we're polling - client = Client(f'https://{os.environ["APPVIEW_HOST"]}', - headers={'User-Agent': USER_AGENT}) - - for user in users: - if not user.is_enabled(ATProto): - logger.info(f'Skipping {user.key.id()}') - continue - - logger.debug(f'Fetching notifs for {user.key.id()}') - - did = user.get_copy(ATProto) - repo = repos[did] - client.session['accessJwt'] = service_jwt(os.environ['APPVIEW_HOST'], - repo_did=did, - privkey=repo.signing_key) - - resp = client.app.bsky.notification.listNotifications( - # higher limit for protocol bot users to try to make sure we don't - # miss any follows - limit=100 if Protocol.for_bridgy_subdomain(user.handle) else 10) - latest_indexed_at = user.atproto_notifs_indexed_at - - for notif in resp['notifications']: - if (user.atproto_notifs_indexed_at - and notif['indexedAt'] <= user.atproto_notifs_indexed_at): - continue - - if not latest_indexed_at or notif['indexedAt'] > latest_indexed_at: - latest_indexed_at = notif['indexedAt'] - - # TODO: verify sig. skipping this for now because we're getting - # these from the AppView, which is trusted, specifically we expect - # the relay and/or the AppView already checked sigs. - actor_did = notif['author']['did'] - obj = Object.get_or_create(id=notif['uri'], bsky=notif['record'], - source_protocol=ATProto.ABBREV, - actor=actor_did) - if obj.status in ('complete', 'ignored'): - continue - - logger.debug(f'Got new {notif["reason"]} from {notif["author"]["handle"]} {notif["uri"]} {notif["cid"]} : {json_dumps(notif, indent=2)}') - - if not obj.status: - obj.status = 'new' - obj.add('notify', user.key) - obj.put() - - common.create_task(queue='receive', obj=obj.key.urlsafe(), - authed_as=actor_did) - - # store indexed_at - @ndb.transactional() - def store_indexed_at(): - u = user.key.get() - u.atproto_notifs_indexed_at = latest_indexed_at - u.put() - store_indexed_at() - - return 'OK' - - -# URL route is registered in hub.py -# TODO: this is now unused. delete it! -def poll_posts(): - """Fetches and enqueueus bridged Bluesky users' new posts from the AppView. - - Uses the ``getAuthorFeed`` endpoint, which is intended for clients. 🤷 - - TODO: unify with poll_notifications - """ - # this client needs to be request-local because we set its service token - # below per user that we're polling - client = Client(f'https://{os.environ["APPVIEW_HOST"]}', - headers={'User-Agent': USER_AGENT}) - - for user in ATProto.query(ATProto.enabled_protocols != None): - if user.status: - continue - - did = user.key.id() - logger.debug(f'Fetching posts for {did} {user.handle}') - - resp = appview.app.bsky.feed.getAuthorFeed( - actor=did, filter='posts_with_replies', limit=10) - latest_indexed_at = user.atproto_feed_indexed_at - - for item in resp['feed']: - # TODO: handle reposts once we have a URI for them - # https://github.com/bluesky-social/atproto/issues/1811 - if item.get('reason'): - continue - - post = item['post'] - - # TODO: use item['reason']['indexedAt'] instead for reposts once - # we're handling them - if (user.atproto_feed_indexed_at - and post['indexedAt'] <= user.atproto_feed_indexed_at): - continue - - if not latest_indexed_at or post['indexedAt'] > latest_indexed_at: - latest_indexed_at = post['indexedAt'] - - # TODO: verify sig. skipping this for now because we're getting - # these from the AppView, which is trusted, specifically we expect - # the relay and/or the AppView already checked sigs. - assert did == post['author']['did'] - obj = Object.get_or_create(id=post['uri'], bsky=post['record'], - source_protocol=ATProto.ABBREV, actor=did) - if obj.status in ('complete', 'ignored'): - continue - - logger.debug(f'Got new post: {post["uri"]} : {json_dumps(item, indent=2)}') - if not obj.status: - obj.status = 'new' - obj.add('feed', user.key) - obj.put() - - common.create_task(queue='receive', obj=obj.key.urlsafe(), authed_as=did) - - # store indexed_at - @ndb.transactional() - def store_indexed_at(): - u = user.key.get() - u.atproto_feed_indexed_at = latest_indexed_at - u.put() - store_indexed_at() - - return 'OK' diff --git a/cron.yaml b/cron.yaml index e89fd93d..fb93a24d 100644 --- a/cron.yaml +++ b/cron.yaml @@ -2,12 +2,3 @@ # docs: https://cloud.google.com/appengine/docs/standard/python3/scheduling-jobs-with-cron-yaml cron: -# - description: ATProto poll posts -# url: /queue/atproto-poll-posts -# schedule: every 1 minutes -# target: default - -# - description: ATProto poll notifications -# url: /queue/atproto-poll-notifs -# schedule: every 1 minutes -# target: default diff --git a/dispatch.yaml b/dispatch.yaml index 3f5a67fe..ed85075f 100644 --- a/dispatch.yaml +++ b/dispatch.yaml @@ -4,11 +4,5 @@ dispatch: - url: "*/queue/atproto-commit" service: hub - # - url: "*/queue/atproto-poll-notifs" - # service: hub - - # - url: "*/queue/atproto-poll-posts" - # service: hub - - url: "*/xrpc/com.atproto.sync.subscribeRepos" service: hub diff --git a/hub.py b/hub.py index 44e027f8..18b8d035 100644 --- a/hub.py +++ b/hub.py @@ -28,9 +28,7 @@ util.set_user_agent(USER_AGENT) models.reset_protocol_properties() -# # Flask app -# app = Flask(__name__) app.json.compact = False app_dir = Path(__file__).parent @@ -55,18 +53,9 @@ def health_check(): return 'OK' -# -# ATProto XRPC server, other URL routes -# +# ATProto XRPC server lexrpc.flask_server.init_flask(arroba.server.server, app) -app.add_url_rule('/queue/atproto-poll-notifs', - view_func=atproto.poll_notifications, - methods=['GET', 'POST']) - -app.add_url_rule('/queue/atproto-poll-posts', - view_func=atproto.poll_posts, - methods=['GET', 'POST']) @app.post('/queue/atproto-commit') @flask_util.cloud_tasks_only @@ -79,25 +68,6 @@ def atproto_commit(): return 'OK' -# -# ATProto firehose consumer -# -# if LOCAL_SERVER or not DEBUG: -# def subscribe(): -# with appengine_config.ndb_client.context(): -# atproto_firehose.subscribe() - -# assert 'atproto_firehose.subscribe' not in [t.name for t in threading.enumerate()] -# Thread(target=subscribe, name='atproto_firehose.subscribe').start() - -# def handle(): -# with appengine_config.ndb_client.context(): -# atproto_firehose.handle() - -# assert 'atproto_firehose.handle' not in [t.name for t in threading.enumerate()] -# Thread(target=handle, name='atproto_firehose.handle').start() - - # send requestCrawl to relay # delay because we're not up and serving XRPCs at this point yet. not sure why not. if 'GAE_INSTANCE' in os.environ: # prod diff --git a/templates/docs.html b/templates/docs.html index cd2f3fbb..33ec30a1 100644 --- a/templates/docs.html +++ b/templates/docs.html @@ -1423,7 +1423,7 @@ I'm Ryan Barrett. I'm just a guy who likes Publish inbound webmention to fed.brid.gy deliver to fed.brid.gy inbox, user or shared - poll posts on AppView via app.bsky.feed.getTimeline + receive post via firehose from relay publish event to BF relay @@ -1438,7 +1438,7 @@ I'm Ryan Barrett. I'm just a guy who likes users: UI on BF user page
code:
webmention with u-follow-of Follow activity delivered to BF user inbox - poll notifications on AppView via app.bsky.notification.listNotifications + receive follow via firehose from relay user's client sends REQ to BF relay @@ -1452,7 +1452,7 @@ I'm Ryan Barrett. I'm just a guy who likes Response inbound webmention to a BF proxy page Create, Like, Announce delivered to BF user inbox - poll notifications on AppView via app.bsky.notification.listNotifications + receive response via firehose from relay NIP-10 response event received at BF relay or other relay diff --git a/tests/test_atproto.py b/tests/test_atproto.py index 167d8d5a..f48a8158 100644 --- a/tests/test_atproto.py +++ b/tests/test_atproto.py @@ -1240,248 +1240,6 @@ class ATProtoTest(TestCase): }, }, data=None, headers=ANY) - @patch.object(tasks_client, 'create_task', return_value=Task(name='my task')) - @patch('requests.get') - def test_poll_notifications(self, mock_get, mock_create_task): - user_a = self.make_user(id='fake:user-a', cls=Fake, - copies=[Target(uri='did:plc:a', protocol='atproto')]) - user_b = self.make_user(id='fake:user-b', cls=Fake, - copies=[Target(uri='did:plc:b', protocol='atproto')]) - user_c = self.make_user(id='fake:user-c', cls=Fake, - atproto_notifs_indexed_at='indexed-c-2', - copies=[Target(uri='did:plc:c', protocol='atproto')]) - - Repo.create(self.storage, 'did:plc:a', signing_key=ATPROTO_KEY) - Repo.create(self.storage, 'did:plc:c', signing_key=ATPROTO_KEY) - - like = { - '$type': 'app.bsky.feed.like', - 'subject': { - 'cid': '...', - 'uri': 'at://did:plc:a/app.bsky.feed.post/999', - }, - } - reply = { - '$type': 'app.bsky.feed.post', - 'text': 'I hereby reply', - 'reply': { - 'root': { - 'cid': '...', - 'uri': 'at://did:plc:a/app.bsky.feed.post/987', - }, - 'parent': { - 'cid': '...', - 'uri': 'at://did:plc:a/app.bsky.feed.post/987', - } - }, - } - follow = { - '$type': 'app.bsky.graph.follow', - 'subject': 'did:plc:c', - } - eve = { - '$type': 'app.bsky.actor.defs#profileView', - 'did': 'did:plc:eve', - 'handle': 'eve.com', - } - alice = { - '$type': 'app.bsky.actor.defs#profileView', - 'did': 'did:plc:a', - 'handle': 'alice', - } - - mock_get.side_effect = [ - requests_response({ - 'notifications': [{ - 'uri': 'at://did:plc:d/app.bsky.feed.like/123', - 'cid': '...', - 'author': eve, - 'record': like, - 'reason': 'like', - 'indexedAt': 'indexed-a-3', - }, { - 'uri': 'at://did:plc:d/app.bsky.feed.post/456', - 'cid': '...', - 'author': eve, - 'record': reply, - 'reason': 'reply', - 'indexedAt': 'indexed-a-1', - }], - }), - requests_response(DID_DOC), - requests_response({ - 'notifications': [{ - 'uri': 'at://did:plc:d/app.bsky.graph.follow/789', - 'cid': '...', - 'author': alice, - 'record': follow, - 'reason': 'follow', - 'indexedAt': 'indexed-c-3', - }, { - 'uri': 'at://did:plc:d/app.bsky.graph.follow/abc', - 'cid': '...', - 'author': eve, - 'record': follow, - 'reason': 'follow', - 'indexedAt': 'indexed-c-1', - }], - }), - ] - - resp = self.post('/queue/atproto-poll-notifs', client=hub.app.test_client()) - self.assertEqual(200, resp.status_code) - - expected_list_notifs = [call(url, json=None, data=None, headers={ - 'Content-Type': 'application/json', - 'User-Agent': common.USER_AGENT, - }) for url in [ - 'https://appview.local/xrpc/app.bsky.notification.listNotifications?limit=10', - 'https://appview.local/xrpc/app.bsky.notification.listNotifications?limit=10', - ]] - - assert mock_get.call_args_list[0].kwargs['headers'].pop('Authorization') - self.assertEqual(expected_list_notifs[0], mock_get.call_args_list[0]) - - assert mock_get.call_args_list[2].kwargs['headers'].pop('Authorization') - self.assertEqual(expected_list_notifs[1], mock_get.call_args_list[2]) - - like_obj = Object.get_by_id('at://did:plc:d/app.bsky.feed.like/123') - self.assertEqual(like, like_obj.bsky) - self.assert_task(mock_create_task, 'receive', '/queue/receive', - obj=like_obj.key.urlsafe(), authed_as='did:plc:eve') - - reply_obj = Object.get_by_id('at://did:plc:d/app.bsky.feed.post/456') - self.assertEqual(reply, reply_obj.bsky) - self.assert_task(mock_create_task, 'receive', '/queue/receive', - obj=reply_obj.key.urlsafe(), authed_as='did:plc:eve') - - follow_obj = Object.get_by_id('at://did:plc:d/app.bsky.graph.follow/789') - self.assertEqual(follow, follow_obj.bsky) - self.assert_task(mock_create_task, 'receive', '/queue/receive', - obj=follow_obj.key.urlsafe(), authed_as='did:plc:a') - - self.assertEqual('indexed-a-3', user_a.key.get().atproto_notifs_indexed_at) - self.assertEqual('indexed-c-3', user_c.key.get().atproto_notifs_indexed_at) - - @patch.object(tasks_client, 'create_task', return_value=Task(name='my task')) - @patch('requests.get') - def test_poll_posts(self, mock_get, mock_create_task): - for i in ['a', 'b', 'c', 'd']: - did = f'did:plc:{i}' - self.store_object(id=did, raw={ - **DID_DOC, - 'id': did, - }) - - user_a = self.make_user( - id='did:plc:a', cls=ATProto, enabled_protocols=['fake'], - copies=[Target(uri='fake:user-a', protocol='fake')], - atproto_feed_indexed_at='2020-01-02T03:04:05.000Z') - user_b = self.make_user(id='did:plc:b', cls=ATProto) # no enabled protocols - user_c = self.make_user( - id='did:plc:c', cls=ATProto, enabled_protocols=['fake'], - copies=[Target(uri='fake:user-c', protocol='fake')]) - user_d = self.make_user( - id='did:plc:d', cls=ATProto, enabled_protocols=['fake'], - copies=[Target(uri='fake:user-d', protocol='fake')]) - - post = { - '$type': 'app.bsky.feed.post', - 'text': 'My original post', - 'createdAt': '2007-07-07T03:04:05', - } - post_view = { - '$type': 'app.bsky.feed.defs#postView', - 'uri': 'at://did:web:alice.com/app.bsky.feed.post/123', - 'cid': '', - 'record': post, - 'author': { - '$type': 'app.bsky.actor.defs#profileViewBasic', - 'did': 'did:plc:a', - 'handle': 'alice.com', - }, - 'indexedAt': '2022-01-02T03:04:05.000Z', - } - - mock_get.side_effect = [ - requests_response({ - 'feed': [{ - '$type': 'app.bsky.feed.defs#feedViewPost', - 'post': post_view, - }, { - '$type': 'app.bsky.feed.defs#feedViewPost', - 'post': { - **post_view, - 'uri': 'at://did:web:alice.com/app.bsky.feed.post/456', - # before atproto_feed_indexed_at, should be ignored - 'indexedAt': '2015-01-02T03:04:05.000Z', - }, - }], - }), - requests_response({ - **DID_DOC, - 'id': 'did:plc:alice.com', - }), - requests_response({ - 'feed': [], - }), - requests_response({ - 'feed': [{ - '$type': 'app.bsky.feed.defs#feedViewPost', - 'post': post_view, - 'reason': { - '$type': 'app.bsky.feed.defs#reasonRepost', - 'by': { - '$type': 'app.bsky.actor.defs#profileViewBasic', - 'did': 'did:web:bob.com', - 'handle': 'bob.com', - }, - 'indexedAt': '2022-01-02T03:04:05.000Z', - }, - }], - }), - ] - - resp = self.post('/queue/atproto-poll-posts', client=hub.app.test_client()) - self.assertEqual(200, resp.status_code) - - get = [call(url, json=None, data=None, headers={ - 'Content-Type': 'application/json', - 'User-Agent': common.USER_AGENT, - }) for url in [ - 'https://appview.local/xrpc/app.bsky.feed.getAuthorFeed?actor=did%3Aplc%3Aa&filter=posts_with_replies&limit=10', - 'https://appview.local/xrpc/app.bsky.feed.getAuthorFeed?actor=did%3Aplc%3Ac&filter=posts_with_replies&limit=10', - 'https://appview.local/xrpc/app.bsky.feed.getAuthorFeed?actor=did%3Aplc%3Ad&filter=posts_with_replies&limit=10', - ]] - self.assertEqual([ - get[0], - self.req('https://alice.com/.well-known/did.json'), - get[1], - get[2], - ], mock_get.call_args_list) - - post_obj = Object.get_by_id('at://did:web:alice.com/app.bsky.feed.post/123') - self.assertEqual(post, post_obj.bsky) - self.assertEqual(1, mock_create_task.call_count) - self.assert_task(mock_create_task, 'receive', '/queue/receive', - obj=post_obj.key.urlsafe(), authed_as='did:plc:a') - - # indexedAt was too old - self.assertIsNone(Object.get_by_id( - 'at://did:web:alice.com/app.bsky.feed.post/456)')) - - self.assertEqual('2022-01-02T03:04:05.000Z', - user_a.key.get().atproto_feed_indexed_at) - self.assertIsNone(user_b.key.get().atproto_feed_indexed_at) - self.assertIsNone(user_c.key.get().atproto_feed_indexed_at) - self.assertIsNone(user_d.key.get().atproto_feed_indexed_at) - - # TODO: https://github.com/snarfed/bridgy-fed/issues/728 - # repost_obj = Object.get_by_id('at://did:plc:d/app.bsky.feed.post/456') - # self.assertEqual(repost, repost_obj.bsky) - # self.assert_task(mock_create_task, 'receive', '/queue/receive', - # obj=repost_obj.key.urlsafe(), authed_as='did:plc:eve') - def test_datastore_client_get_record_datastore(self): self.make_user_and_repo() post = { diff --git a/tests/test_integrations.py b/tests/test_integrations.py index 74341fde..3a68d33b 100644 --- a/tests/test_integrations.py +++ b/tests/test_integrations.py @@ -1,5 +1,6 @@ """Integration tests.""" import copy +from unittest import skip from unittest.mock import patch from arroba.datastore_storage import DatastoreStorage @@ -39,6 +40,8 @@ PROFILE_GETRECORD = { @patch('ids.COPIES_PROTOCOLS', ['atproto']) class IntegrationTests(TestCase): + # TODO: port to firehose + @skip @patch('requests.post') @patch('requests.get') def test_atproto_notify_reply_to_activitypub(self, mock_get, mock_post): @@ -136,6 +139,8 @@ class IntegrationTests(TestCase): }) + # TODO: port to firehose + @skip @patch('requests.post', return_value=requests_response('')) @patch('requests.get') def test_atproto_follow_to_web(self, mock_get, mock_post): @@ -463,6 +468,8 @@ class IntegrationTests(TestCase): self.assertEqual(0, len(user.copies)) + # TODO: port to firehose + @skip @patch('requests.post') @patch('requests.get') def test_atproto_follow_ap_bot_user_enables_protocol(self, mock_get, mock_post):