ATProto: delete polling code and config

for #978. end of a (brief) era!
pull/1081/head
Ryan Barrett 2024-05-21 13:14:16 -07:00
rodzic d2ba81a211
commit 7f72a62081
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 6BE31FDF4776E9D4
8 zmienionych plików z 11 dodań i 451 usunięć

8
app.py
Wyświetl plik

@ -10,11 +10,3 @@ import activitypub, atproto, convert, follow, pages, redirect, ui, webfinger, we
import models
models.reset_protocol_properties()
app.add_url_rule('/queue/atproto-poll-notifs',
view_func=atproto.poll_notifications,
methods=['GET', 'POST'])
app.add_url_rule('/queue/atproto-poll-posts',
view_func=atproto.poll_posts,
methods=['GET', 'POST'])

Wyświetl plik

@ -636,155 +636,3 @@ class ATProto(User, Protocol):
output = client.com.atproto.moderation.createReport(input)
logger.info(f'Created report on {mod_host}: {json_dumps(output)}')
return True
# URL route is registered in hub.py
# TODO: this is now unused. delete it!
def poll_notifications():
    """Fetches and enqueues new activities from the AppView for our users.

    Uses the ``listNotifications`` endpoint, which is intended for end users. 🤷

    https://github.com/bluesky-social/atproto/discussions/1538

    For each notification newer than the user's stored
    ``atproto_notifs_indexed_at``, gets or creates the corresponding
    :class:`Object`, adds the user to its ``notify`` list, and creates a
    ``receive`` task. Afterward, stores the newest ``indexedAt`` seen back
    onto the user.

    TODO: unify with poll_posts

    Returns:
      ``'OK'`` after polling, or a ``(body, 204)`` tuple if there are no
      ``AtpRepo`` entities.
    """
    repos = {r.key.id(): r for r in AtpRepo.query()}
    logger.info(f'Got {len(repos)} repos')
    if not repos:
        # the backslash is escaped explicitly: a bare ``\_`` is an invalid
        # escape sequence that newer Pythons warn about. the resulting text is
        # unchanged.
        return 'Nothing to do ¯\\_(ツ)_/¯', 204

    # users in every non-ATProto protocol that have a copy in one of our repos
    users = itertools.chain(*(cls.query(cls.copies.uri.IN(list(repos)))
                              for cls in set(PROTOCOLS.values())
                              if cls and cls != ATProto))

    # this client needs to be request-local because we set its service token
    # below per user that we're polling
    client = Client(f'https://{os.environ["APPVIEW_HOST"]}',
                    headers={'User-Agent': USER_AGENT})

    for user in users:
        if not user.is_enabled(ATProto):
            logger.info(f'Skipping {user.key.id()}')
            continue

        logger.debug(f'Fetching notifs for {user.key.id()}')

        did = user.get_copy(ATProto)
        repo = repos[did]
        client.session['accessJwt'] = service_jwt(os.environ['APPVIEW_HOST'],
                                                  repo_did=did,
                                                  privkey=repo.signing_key)
        resp = client.app.bsky.notification.listNotifications(
            # higher limit for protocol bot users to try to make sure we don't
            # miss any follows
            limit=100 if Protocol.for_bridgy_subdomain(user.handle) else 10)

        latest_indexed_at = user.atproto_notifs_indexed_at

        for notif in resp['notifications']:
            # skip anything at or before the stored high-water mark
            if (user.atproto_notifs_indexed_at
                    and notif['indexedAt'] <= user.atproto_notifs_indexed_at):
                continue

            if not latest_indexed_at or notif['indexedAt'] > latest_indexed_at:
                latest_indexed_at = notif['indexedAt']

            # TODO: verify sig. skipping this for now because we're getting
            # these from the AppView, which is trusted, specifically we expect
            # the relay and/or the AppView already checked sigs.
            actor_did = notif['author']['did']
            obj = Object.get_or_create(id=notif['uri'], bsky=notif['record'],
                                       source_protocol=ATProto.ABBREV,
                                       actor=actor_did)
            if obj.status in ('complete', 'ignored'):
                continue

            logger.debug(f'Got new {notif["reason"]} from {notif["author"]["handle"]} {notif["uri"]} {notif["cid"]} : {json_dumps(notif, indent=2)}')

            if not obj.status:
                obj.status = 'new'
            obj.add('notify', user.key)
            obj.put()

            common.create_task(queue='receive', obj=obj.key.urlsafe(),
                               authed_as=actor_did)

        # store indexed_at. re-fetches the user inside the transaction instead
        # of writing the entity we loaded above.
        @ndb.transactional()
        def store_indexed_at():
            u = user.key.get()
            u.atproto_notifs_indexed_at = latest_indexed_at
            u.put()
        store_indexed_at()

    return 'OK'
# URL route is registered in hub.py
# TODO: this is now unused. delete it!
def poll_posts():
    """Fetches and enqueues bridged Bluesky users' new posts from the AppView.

    Uses the ``getAuthorFeed`` endpoint, which is intended for clients. 🤷

    For each feed item newer than the user's stored
    ``atproto_feed_indexed_at``, gets or creates the corresponding
    :class:`Object`, adds the user to its ``feed`` list, and creates a
    ``receive`` task. Afterward, stores the newest ``indexedAt`` seen back
    onto the user. Reposts (items with a ``reason``) are skipped.

    TODO: unify with poll_notifications

    Returns:
      ``'OK'`` once every queried user has been handled.
    """
    # NOTE: requests below go through ``appview``, which is defined outside
    # this function. No per-user service token is set here, so there is no
    # request-local client.

    # ``!= None`` is kept as is: presumably this builds a datastore query
    # filter, not a plain Python comparison, so ``is not None`` would not work.
    for user in ATProto.query(ATProto.enabled_protocols != None):
        if user.status:
            continue

        did = user.key.id()
        logger.debug(f'Fetching posts for {did} {user.handle}')

        resp = appview.app.bsky.feed.getAuthorFeed(
            actor=did, filter='posts_with_replies', limit=10)

        latest_indexed_at = user.atproto_feed_indexed_at

        for item in resp['feed']:
            # TODO: handle reposts once we have a URI for them
            # https://github.com/bluesky-social/atproto/issues/1811
            if item.get('reason'):
                continue

            post = item['post']

            # TODO: use item['reason']['indexedAt'] instead for reposts once
            # we're handling them
            if (user.atproto_feed_indexed_at
                    and post['indexedAt'] <= user.atproto_feed_indexed_at):
                continue

            if not latest_indexed_at or post['indexedAt'] > latest_indexed_at:
                latest_indexed_at = post['indexedAt']

            # TODO: verify sig. skipping this for now because we're getting
            # these from the AppView, which is trusted, specifically we expect
            # the relay and/or the AppView already checked sigs.
            assert did == post['author']['did']
            obj = Object.get_or_create(id=post['uri'], bsky=post['record'],
                                       source_protocol=ATProto.ABBREV, actor=did)
            if obj.status in ('complete', 'ignored'):
                continue

            logger.debug(f'Got new post: {post["uri"]} : {json_dumps(item, indent=2)}')

            if not obj.status:
                obj.status = 'new'
            obj.add('feed', user.key)
            obj.put()

            common.create_task(queue='receive', obj=obj.key.urlsafe(), authed_as=did)

        # store indexed_at. re-fetches the user inside the transaction instead
        # of writing the entity we loaded above.
        @ndb.transactional()
        def store_indexed_at():
            u = user.key.get()
            u.atproto_feed_indexed_at = latest_indexed_at
            u.put()
        store_indexed_at()

    return 'OK'

Wyświetl plik

@ -2,12 +2,3 @@
# docs: https://cloud.google.com/appengine/docs/standard/python3/scheduling-jobs-with-cron-yaml
cron:
# - description: ATProto poll posts
# url: /queue/atproto-poll-posts
# schedule: every 1 minutes
# target: default
# - description: ATProto poll notifications
# url: /queue/atproto-poll-notifs
# schedule: every 1 minutes
# target: default

Wyświetl plik

@ -4,11 +4,5 @@ dispatch:
- url: "*/queue/atproto-commit"
service: hub
# - url: "*/queue/atproto-poll-notifs"
# service: hub
# - url: "*/queue/atproto-poll-posts"
# service: hub
- url: "*/xrpc/com.atproto.sync.subscribeRepos"
service: hub

32
hub.py
Wyświetl plik

@ -28,9 +28,7 @@ util.set_user_agent(USER_AGENT)
models.reset_protocol_properties()
#
# Flask app
#
app = Flask(__name__)
app.json.compact = False
app_dir = Path(__file__).parent
@ -55,18 +53,9 @@ def health_check():
return 'OK'
#
# ATProto XRPC server, other URL routes
#
# ATProto XRPC server
lexrpc.flask_server.init_flask(arroba.server.server, app)
app.add_url_rule('/queue/atproto-poll-notifs',
view_func=atproto.poll_notifications,
methods=['GET', 'POST'])
app.add_url_rule('/queue/atproto-poll-posts',
view_func=atproto.poll_posts,
methods=['GET', 'POST'])
@app.post('/queue/atproto-commit')
@flask_util.cloud_tasks_only
@ -79,25 +68,6 @@ def atproto_commit():
return 'OK'
#
# ATProto firehose consumer
#
# if LOCAL_SERVER or not DEBUG:
# def subscribe():
# with appengine_config.ndb_client.context():
# atproto_firehose.subscribe()
# assert 'atproto_firehose.subscribe' not in [t.name for t in threading.enumerate()]
# Thread(target=subscribe, name='atproto_firehose.subscribe').start()
# def handle():
# with appengine_config.ndb_client.context():
# atproto_firehose.handle()
# assert 'atproto_firehose.handle' not in [t.name for t in threading.enumerate()]
# Thread(target=handle, name='atproto_firehose.handle').start()
# send requestCrawl to relay
# delay because we're not up and serving XRPCs at this point yet. not sure why not.
if 'GAE_INSTANCE' in os.environ: # prod

Wyświetl plik

@ -1423,7 +1423,7 @@ I'm <a href="https://snarfed.org/">Ryan Barrett</a>. I'm just a guy who likes <a
<th>Publish inbound</th>
<td class="done"><a href="https://webmention.net/">webmention</a> to fed.brid.gy</td>
<td class="done">deliver to fed.brid.gy <a href="https://www.w3.org/TR/activitypub/#inbox">inbox</a>, user or shared</td>
<td class="done">poll posts on AppView via <code>app.bsky.feed.getTimeline</code></td>
<td class="done">receive post via <a href="https://docs.bsky.app/docs/advanced-guides/firehose">firehose</a> from relay</td>
<td>publish <a href="https://github.com/nostr-protocol/nips/blob/master/01.md#events-and-signatures">event</a> to BF relay</td>
</tr>
<tr>
@ -1438,7 +1438,7 @@ I'm <a href="https://snarfed.org/">Ryan Barrett</a>. I'm just a guy who likes <a
<td class="done"><em>users</em>: UI on BF user page<br>
<em>code</em>: <a href="https://webmention.net/">webmention</a> with <code><a href="https://indieweb.org/follow#How_to_publish">u-follow-of</a></code></td>
<td class="done"><code><a href="https://www.w3.org/TR/activitystreams-vocabulary/#dfn-follow">Follow</a></code> activity delivered to BF user <a href="https://www.w3.org/TR/activitypub/#inbox">inbox</a></td>
<td class="done">poll notifications on AppView via <code>app.bsky.notification.listNotifications</code></td>
<td class="done">receive follow via <a href="https://docs.bsky.app/docs/advanced-guides/firehose">firehose</a> from relay</td>
<td>user's client sends <code><a href="https://github.com/nostr-protocol/nips/blob/master/01.md#from-client-to-relay-sending-events-and-creating-subscriptions">REQ</a></code> to BF relay</td>
</tr>
<tr>
@ -1452,7 +1452,7 @@ I'm <a href="https://snarfed.org/">Ryan Barrett</a>. I'm just a guy who likes <a
<th>Response inbound</th>
<td class="done"><a href="https://webmention.net/">webmention</a> to a BF proxy page</td>
<td class="done"><code><a href="https://www.w3.org/TR/activitystreams-vocabulary/#dfn-create">Create</a></code>, <code><a href="https://www.w3.org/TR/activitystreams-vocabulary/#dfn-like">Like</a></code>, <code><a href="https://www.w3.org/TR/activitystreams-vocabulary/#dfn-announce">Announce</a></code> delivered to BF user <a href="https://www.w3.org/TR/activitypub/#inbox">inbox</a></td>
<td class="done">poll notifications on AppView via <code>app.bsky.notification.listNotifications</code></td>
<td class="done">receive response via <a href="https://docs.bsky.app/docs/advanced-guides/firehose">firehose</a> from relay</td>
<td><a href="https://github.com/nostr-protocol/nips/blob/master/10.md">NIP-10</a> response <a href="https://github.com/nostr-protocol/nips/blob/master/01.md#events-and-signatures">event</a> received at BF relay or other relay</td>
</tr>
<tr>

Wyświetl plik

@ -1240,248 +1240,6 @@ class ATProtoTest(TestCase):
},
}, data=None, headers=ANY)
@patch.object(tasks_client, 'create_task', return_value=Task(name='my task'))
@patch('requests.get')
def test_poll_notifications(self, mock_get, mock_create_task):
    """Polling notifications stores objects, enqueues tasks, saves indexedAt.

    Users a and c have repos; user b does not. User c starts with
    ``atproto_notifs_indexed_at`` set to ``indexed-c-2``. The mocked AppView
    returns a like and a reply for the first fetch and two follows for the
    second; the test expects receive tasks for the like, the reply, and the
    newer follow, and the newest ``indexedAt`` stored on users a and c.
    """
    def bridged_user(letter, **props):
        # Fake user with an ATProto copy at did:plc:<letter>
        return self.make_user(
            id=f'fake:user-{letter}', cls=Fake,
            copies=[Target(uri=f'did:plc:{letter}', protocol='atproto')],
            **props)

    def notification(uri, author, record, reason, indexed_at):
        # one entry of a listNotifications response
        return {
            'uri': uri,
            'cid': '...',
            'author': author,
            'record': record,
            'reason': reason,
            'indexedAt': indexed_at,
        }

    user_a = bridged_user('a')
    bridged_user('b')  # never gets a repo below
    user_c = bridged_user('c', atproto_notifs_indexed_at='indexed-c-2')

    for repo_did in ('did:plc:a', 'did:plc:c'):
        Repo.create(self.storage, repo_did, signing_key=ATPROTO_KEY)

    # records
    like_record = {
        '$type': 'app.bsky.feed.like',
        'subject': {
            'cid': '...',
            'uri': 'at://did:plc:a/app.bsky.feed.post/999',
        },
    }
    replied_to = {
        'cid': '...',
        'uri': 'at://did:plc:a/app.bsky.feed.post/987',
    }
    reply_record = {
        '$type': 'app.bsky.feed.post',
        'text': 'I hereby reply',
        'reply': {
            'root': dict(replied_to),
            'parent': dict(replied_to),
        },
    }
    follow_record = {
        '$type': 'app.bsky.graph.follow',
        'subject': 'did:plc:c',
    }

    # authors
    eve_profile = {
        '$type': 'app.bsky.actor.defs#profileView',
        'did': 'did:plc:eve',
        'handle': 'eve.com',
    }
    alice_profile = {
        '$type': 'app.bsky.actor.defs#profileView',
        'did': 'did:plc:a',
        'handle': 'alice',
    }

    first_fetch = requests_response({'notifications': [
        notification('at://did:plc:d/app.bsky.feed.like/123', eve_profile,
                     like_record, 'like', 'indexed-a-3'),
        notification('at://did:plc:d/app.bsky.feed.post/456', eve_profile,
                     reply_record, 'reply', 'indexed-a-1'),
    ]})
    second_fetch = requests_response({'notifications': [
        notification('at://did:plc:d/app.bsky.graph.follow/789', alice_profile,
                     follow_record, 'follow', 'indexed-c-3'),
        notification('at://did:plc:d/app.bsky.graph.follow/abc', eve_profile,
                     follow_record, 'follow', 'indexed-c-1'),
    ]})
    mock_get.side_effect = [
        first_fetch,
        requests_response(DID_DOC),
        second_fetch,
    ]

    resp = self.post('/queue/atproto-poll-notifs', client=hub.app.test_client())
    self.assertEqual(200, resp.status_code)

    list_notifs_call = call(
        'https://appview.local/xrpc/app.bsky.notification.listNotifications?limit=10',
        json=None, data=None, headers={
            'Content-Type': 'application/json',
            'User-Agent': common.USER_AGENT,
        })
    # GETs 0 and 2 are the listNotifications fetches; the mocked response in
    # between is DID_DOC. each fetch must carry an Authorization header, which
    # is popped before comparing the remaining call args.
    for index in (0, 2):
        actual_call = mock_get.call_args_list[index]
        assert actual_call.kwargs['headers'].pop('Authorization')
        self.assertEqual(list_notifs_call, actual_call)

    expected_objects = (
        ('at://did:plc:d/app.bsky.feed.like/123', like_record, 'did:plc:eve'),
        ('at://did:plc:d/app.bsky.feed.post/456', reply_record, 'did:plc:eve'),
        ('at://did:plc:d/app.bsky.graph.follow/789', follow_record, 'did:plc:a'),
    )
    for uri, record, actor in expected_objects:
        stored = Object.get_by_id(uri)
        self.assertEqual(record, stored.bsky)
        self.assert_task(mock_create_task, 'receive', '/queue/receive',
                         obj=stored.key.urlsafe(), authed_as=actor)

    self.assertEqual('indexed-a-3', user_a.key.get().atproto_notifs_indexed_at)
    self.assertEqual('indexed-c-3', user_c.key.get().atproto_notifs_indexed_at)
@patch.object(tasks_client, 'create_task', return_value=Task(name='my task'))
@patch('requests.get')
def test_poll_posts(self, mock_get, mock_create_task):
    """Polling author feeds enqueues only new, non-repost posts.

    User a has ``atproto_feed_indexed_at`` set and gets one newer post and
    one older post; user b has no enabled protocols; user c's feed is empty;
    user d's feed contains only a repost. The test expects exactly one
    receive task, for user a's newer post, and only user a's
    ``atproto_feed_indexed_at`` to be updated.
    """
    for i in ['a', 'b', 'c', 'd']:
        did = f'did:plc:{i}'
        self.store_object(id=did, raw={
            **DID_DOC,
            'id': did,
        })

    user_a = self.make_user(
        id='did:plc:a', cls=ATProto, enabled_protocols=['fake'],
        copies=[Target(uri='fake:user-a', protocol='fake')],
        atproto_feed_indexed_at='2020-01-02T03:04:05.000Z')
    user_b = self.make_user(id='did:plc:b', cls=ATProto)  # no enabled protocols
    user_c = self.make_user(
        id='did:plc:c', cls=ATProto, enabled_protocols=['fake'],
        copies=[Target(uri='fake:user-c', protocol='fake')])
    user_d = self.make_user(
        id='did:plc:d', cls=ATProto, enabled_protocols=['fake'],
        copies=[Target(uri='fake:user-d', protocol='fake')])

    post = {
        '$type': 'app.bsky.feed.post',
        'text': 'My original post',
        'createdAt': '2007-07-07T03:04:05',
    }
    post_view = {
        '$type': 'app.bsky.feed.defs#postView',
        'uri': 'at://did:web:alice.com/app.bsky.feed.post/123',
        'cid': '',
        'record': post,
        'author': {
            '$type': 'app.bsky.actor.defs#profileViewBasic',
            'did': 'did:plc:a',
            'handle': 'alice.com',
        },
        'indexedAt': '2022-01-02T03:04:05.000Z',
    }

    # mocked GET responses, in the order the test expects them to be consumed
    mock_get.side_effect = [
        requests_response({
            'feed': [{
                '$type': 'app.bsky.feed.defs#feedViewPost',
                'post': post_view,
            }, {
                '$type': 'app.bsky.feed.defs#feedViewPost',
                'post': {
                    **post_view,
                    'uri': 'at://did:web:alice.com/app.bsky.feed.post/456',
                    # before atproto_feed_indexed_at, should be ignored
                    'indexedAt': '2015-01-02T03:04:05.000Z',
                },
            }],
        }),
        requests_response({
            **DID_DOC,
            'id': 'did:plc:alice.com',
        }),
        requests_response({
            'feed': [],
        }),
        requests_response({
            'feed': [{
                '$type': 'app.bsky.feed.defs#feedViewPost',
                'post': post_view,
                'reason': {
                    '$type': 'app.bsky.feed.defs#reasonRepost',
                    'by': {
                        '$type': 'app.bsky.actor.defs#profileViewBasic',
                        'did': 'did:web:bob.com',
                        'handle': 'bob.com',
                    },
                    'indexedAt': '2022-01-02T03:04:05.000Z',
                },
            }],
        }),
    ]

    resp = self.post('/queue/atproto-poll-posts', client=hub.app.test_client())
    self.assertEqual(200, resp.status_code)

    get = [call(url, json=None, data=None, headers={
        'Content-Type': 'application/json',
        'User-Agent': common.USER_AGENT,
    }) for url in [
        'https://appview.local/xrpc/app.bsky.feed.getAuthorFeed?actor=did%3Aplc%3Aa&filter=posts_with_replies&limit=10',
        'https://appview.local/xrpc/app.bsky.feed.getAuthorFeed?actor=did%3Aplc%3Ac&filter=posts_with_replies&limit=10',
        'https://appview.local/xrpc/app.bsky.feed.getAuthorFeed?actor=did%3Aplc%3Ad&filter=posts_with_replies&limit=10',
    ]]
    self.assertEqual([
        get[0],
        self.req('https://alice.com/.well-known/did.json'),
        get[1],
        get[2],
    ], mock_get.call_args_list)

    post_obj = Object.get_by_id('at://did:web:alice.com/app.bsky.feed.post/123')
    self.assertEqual(post, post_obj.bsky)
    self.assertEqual(1, mock_create_task.call_count)
    self.assert_task(mock_create_task, 'receive', '/queue/receive',
                     obj=post_obj.key.urlsafe(), authed_as='did:plc:a')

    # indexedAt was too old. (this id used to end with a stray ')' inside the
    # string, so the lookup could never match and the check passed vacuously.)
    self.assertIsNone(Object.get_by_id(
        'at://did:web:alice.com/app.bsky.feed.post/456'))

    self.assertEqual('2022-01-02T03:04:05.000Z',
                     user_a.key.get().atproto_feed_indexed_at)
    self.assertIsNone(user_b.key.get().atproto_feed_indexed_at)
    self.assertIsNone(user_c.key.get().atproto_feed_indexed_at)
    self.assertIsNone(user_d.key.get().atproto_feed_indexed_at)

    # TODO: https://github.com/snarfed/bridgy-fed/issues/728
    # repost_obj = Object.get_by_id('at://did:plc:d/app.bsky.feed.post/456')
    # self.assertEqual(repost, repost_obj.bsky)
    # self.assert_task(mock_create_task, 'receive', '/queue/receive',
    #                  obj=repost_obj.key.urlsafe(), authed_as='did:plc:eve')
def test_datastore_client_get_record_datastore(self):
self.make_user_and_repo()
post = {

Wyświetl plik

@ -1,5 +1,6 @@
"""Integration tests."""
import copy
from unittest import skip
from unittest.mock import patch
from arroba.datastore_storage import DatastoreStorage
@ -39,6 +40,8 @@ PROFILE_GETRECORD = {
@patch('ids.COPIES_PROTOCOLS', ['atproto'])
class IntegrationTests(TestCase):
# TODO: port to firehose
@skip
@patch('requests.post')
@patch('requests.get')
def test_atproto_notify_reply_to_activitypub(self, mock_get, mock_post):
@ -136,6 +139,8 @@ class IntegrationTests(TestCase):
})
# TODO: port to firehose
@skip
@patch('requests.post', return_value=requests_response(''))
@patch('requests.get')
def test_atproto_follow_to_web(self, mock_get, mock_post):
@ -463,6 +468,8 @@ class IntegrationTests(TestCase):
self.assertEqual(0, len(user.copies))
# TODO: port to firehose
@skip
@patch('requests.post')
@patch('requests.get')
def test_atproto_follow_ap_bot_user_enables_protocol(self, mock_get, mock_post):