switch ATProto polling from cursor to indexedAt

for #954
pull/1020/head
Ryan Barrett 2024-05-05 07:27:47 -07:00
rodzic 6f0f53bdb8
commit c98d8fce00
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 6BE31FDF4776E9D4
4 zmienionych plików z 72 dodań i 36 usunięć

Wyświetl plik

@ -629,16 +629,23 @@ def poll_notifications():
privkey=repo.signing_key)
resp = client.app.bsky.notification.listNotifications(
cursor=user.atproto_notifs_cursor,
# higher limit for protocol bot users to try to make sure we don't
# miss any follows
limit=100 if Protocol.for_bridgy_subdomain(user.handle) else 10)
latest_indexed_at = user.atproto_notifs_indexed_at
for notif in resp['notifications']:
actor_did = notif['author']['did']
if (user.atproto_notifs_indexed_at
and notif['indexedAt'] <= user.atproto_notifs_indexed_at):
continue
if not latest_indexed_at or notif['indexedAt'] > latest_indexed_at:
latest_indexed_at = notif['indexedAt']
# TODO: verify sig. skipping this for now because we're getting
# these from the AppView, which is trusted, specifically we expect
# the BGS and/or the AppView already checked sigs.
actor_did = notif['author']['did']
obj = Object.get_or_create(id=notif['uri'], bsky=notif['record'],
source_protocol=ATProto.ABBREV,
actor=actor_did)
@ -655,15 +662,13 @@ def poll_notifications():
common.create_task(queue='receive', obj=obj.key.urlsafe(),
authed_as=actor_did)
# store cursor, even if it's unset, since then we want to switch to the
# latest, not keep reading starting at the old cursor.
cursor = resp.get('cursor')
# store indexed_at
@ndb.transactional()
def store_cursor():
def store_indexed_at():
u = user.key.get()
u.atproto_notifs_cursor = cursor
u.atproto_notifs_indexed_at = latest_indexed_at
u.put()
store_cursor()
store_indexed_at()
return 'OK'
@ -689,9 +694,8 @@ def poll_posts():
logger.debug(f'Fetching posts for {did} {user.handle}')
resp = appview.app.bsky.feed.getAuthorFeed(
actor=did, filter='posts_no_replies',
# cursor=user.atproto_feed_cursor,
limit=10)
actor=did, filter='posts_no_replies', limit=10)
latest_indexed_at = user.atproto_feed_indexed_at
for item in resp['feed']:
# TODO: handle reposts once we have a URI for them
@ -701,6 +705,15 @@ def poll_posts():
post = item['post']
# TODO: use item['reason']['indexedAt'] instead for reposts once
# we're handling them
if (user.atproto_feed_indexed_at
and post['indexedAt'] <= user.atproto_feed_indexed_at):
continue
if not latest_indexed_at or post['indexedAt'] > latest_indexed_at:
latest_indexed_at = post['indexedAt']
# TODO: verify sig. skipping this for now because we're getting
# these from the AppView, which is trusted, specifically we expect
# the BGS and/or the AppView already checked sigs.
@ -718,14 +731,12 @@ def poll_posts():
common.create_task(queue='receive', obj=obj.key.urlsafe(), authed_as=did)
# # store cursor, even if it's unset, since then we want to switch to the
# # latest, not keep reading starting at the old cursor.
# cursor = resp.get('cursor')
# @ndb.transactional()
# def store_cursor():
# u = user.key.get()
# u.atproto_feed_cursor = cursor
# u.put()
# store_cursor()
# store indexed_at
# Persist the newest indexedAt value seen during this poll onto the user so
# the next poll can skip posts at or before it. Runs in an ndb transaction.
@ndb.transactional()
def store_indexed_at():
# re-read the user entity inside the transaction instead of writing the
# `user` object that was loaded before the poll
u = user.key.get()
# latest_indexed_at starts as the stored value, so an empty feed leaves it
# unchanged
u.atproto_feed_indexed_at = latest_indexed_at
u.put()
store_indexed_at()
return 'OK'

Wyświetl plik

@ -168,8 +168,8 @@ class User(StringIdModel, metaclass=ProtocolUserMeta):
enabled_protocols = ndb.StringProperty(repeated=True, choices=[])
# protocol-specific state
atproto_notifs_cursor = ndb.TextProperty()
atproto_feed_cursor = ndb.TextProperty()
atproto_notifs_indexed_at = ndb.TextProperty()
atproto_feed_indexed_at = ndb.TextProperty()
created = ndb.DateTimeProperty(auto_now_add=True)
updated = ndb.DateTimeProperty(auto_now=True)

Wyświetl plik

@ -1170,7 +1170,7 @@ class ATProtoTest(TestCase):
user_b = self.make_user(id='fake:user-b', cls=Fake,
copies=[Target(uri='did:plc:b', protocol='atproto')])
user_c = self.make_user(id='fake:user-c', cls=Fake,
atproto_notifs_cursor='kursor-c-before',
atproto_notifs_indexed_at='indexed-c-2',
copies=[Target(uri='did:plc:c', protocol='atproto')])
Repo.create(self.storage, 'did:plc:a', signing_key=ATPROTO_KEY)
@ -1214,19 +1214,20 @@ class ATProtoTest(TestCase):
mock_get.side_effect = [
requests_response({
'cursor': 'kursor-a-after',
'notifications': [{
'uri': 'at://did:plc:d/app.bsky.feed.like/123',
'cid': '...',
'author': eve,
'record': like,
'reason': 'like',
'indexedAt': 'indexed-a-3',
}, {
'uri': 'at://did:plc:d/app.bsky.feed.post/456',
'cid': '...',
'author': eve,
'record': reply,
'reason': 'reply',
'indexedAt': 'indexed-a-1',
}],
}),
requests_response(DID_DOC),
@ -1237,6 +1238,14 @@ class ATProtoTest(TestCase):
'author': alice,
'record': follow,
'reason': 'follow',
'indexedAt': 'indexed-c-3',
}, {
'uri': 'at://did:plc:d/app.bsky.graph.follow/abc',
'cid': '...',
'author': eve,
'record': follow,
'reason': 'follow',
'indexedAt': 'indexed-c-1',
}],
}),
]
@ -1249,7 +1258,7 @@ class ATProtoTest(TestCase):
'User-Agent': common.USER_AGENT,
}) for url in [
'https://appview.local/xrpc/app.bsky.notification.listNotifications?limit=10',
'https://appview.local/xrpc/app.bsky.notification.listNotifications?cursor=kursor-c-before&limit=10',
'https://appview.local/xrpc/app.bsky.notification.listNotifications?limit=10',
]]
assert mock_get.call_args_list[0].kwargs['headers'].pop('Authorization')
@ -1273,8 +1282,8 @@ class ATProtoTest(TestCase):
self.assert_task(mock_create_task, 'receive', '/queue/receive',
obj=follow_obj.key.urlsafe(), authed_as='did:plc:a')
self.assertEqual('kursor-a-after', user_a.key.get().atproto_notifs_cursor)
self.assertIsNone(user_c.key.get().atproto_notifs_cursor)
self.assertEqual('indexed-a-3', user_a.key.get().atproto_notifs_indexed_at)
self.assertEqual('indexed-c-3', user_c.key.get().atproto_notifs_indexed_at)
@patch.object(tasks_client, 'create_task', return_value=Task(name='my task'))
@patch('requests.get')
@ -1288,12 +1297,12 @@ class ATProtoTest(TestCase):
user_a = self.make_user(
id='did:plc:a', cls=ATProto, enabled_protocols=['fake'],
copies=[Target(uri='fake:user-a', protocol='fake')])
copies=[Target(uri='fake:user-a', protocol='fake')],
atproto_feed_indexed_at='2020-01-02T03:04:05.000Z')
user_b = self.make_user(id='did:plc:b', cls=ATProto) # no enabled protocols
user_c = self.make_user(
id='did:plc:c', cls=ATProto, enabled_protocols=['fake'],
copies=[Target(uri='fake:user-c', protocol='fake')],
atproto_feed_cursor='kursor-c-before')
copies=[Target(uri='fake:user-c', protocol='fake')])
user_d = self.make_user(
id='did:plc:d', cls=ATProto, enabled_protocols=['fake'],
copies=[Target(uri='fake:user-d', protocol='fake')])
@ -1313,14 +1322,22 @@ class ATProtoTest(TestCase):
'did': 'did:plc:a',
'handle': 'alice.com',
},
'indexedAt': '2022-01-02T03:04:05.000Z',
}
mock_get.side_effect = [
requests_response({
'cursor': 'kursor-a-after',
'feed': [{
'$type': 'app.bsky.feed.defs#feedViewPost',
'post': post_view,
}, {
'$type': 'app.bsky.feed.defs#feedViewPost',
'post': {
**post_view,
'uri': 'at://did:web:alice.com/app.bsky.feed.post/456',
# before atproto_feed_indexed_at, should be ignored
'indexedAt': '2015-01-02T03:04:05.000Z',
},
}],
}),
requests_response({
@ -1331,7 +1348,6 @@ class ATProtoTest(TestCase):
'feed': [],
}),
requests_response({
'cursor': 'kursor-d-after',
'feed': [{
'$type': 'app.bsky.feed.defs#feedViewPost',
'post': post_view,
@ -1368,13 +1384,19 @@ class ATProtoTest(TestCase):
post_obj = Object.get_by_id('at://did:web:alice.com/app.bsky.feed.post/123')
self.assertEqual(post, post_obj.bsky)
self.assertEqual(1, mock_create_task.call_count)
self.assert_task(mock_create_task, 'receive', '/queue/receive',
obj=post_obj.key.urlsafe(), authed_as='did:plc:a')
self.assertIsNone(user_a.key.get().atproto_feed_cursor)
self.assertIsNone(user_b.key.get().atproto_feed_cursor)
# self.assertIsNone(user_c.key.get().atproto_feed_cursor)
self.assertIsNone(user_d.key.get().atproto_feed_cursor)
# The 456 post's indexedAt is older than user_a's atproto_feed_indexed_at, so
# polling must skip it and never store an Object for it. The id here has to
# match the feed item's URI exactly; otherwise this assertion passes vacuously.
self.assertIsNone(Object.get_by_id(
    'at://did:web:alice.com/app.bsky.feed.post/456'))
self.assertEqual('2022-01-02T03:04:05.000Z',
user_a.key.get().atproto_feed_indexed_at)
self.assertIsNone(user_b.key.get().atproto_feed_indexed_at)
self.assertIsNone(user_c.key.get().atproto_feed_indexed_at)
self.assertIsNone(user_d.key.get().atproto_feed_indexed_at)
# TODO: https://github.com/snarfed/bridgy-fed/issues/728
# repost_obj = Object.get_by_id('at://did:plc:d/app.bsky.feed.post/456')

Wyświetl plik

@ -101,6 +101,7 @@ class IntegrationTests(TestCase):
}
},
},
'indexedAt': '...',
}],
}),
# ATProto getRecord of alice's profile
@ -171,6 +172,7 @@ class IntegrationTests(TestCase):
'subject': 'did:plc:bob',
'createdAt': '2022-01-02T03:04:05.000Z',
},
'indexedAt': '...',
}],
}),
# ATProto getRecord of alice's profile
@ -488,6 +490,7 @@ class IntegrationTests(TestCase):
'$type': 'app.bsky.graph.follow',
'subject': 'did:plc:ap',
},
'indexedAt': '...',
}],
}),
# alice DID