ATProto firehose subscriber: narrow filtering

this drastically reduces the volume of inbound records that we enqueue receive tasks for, from 3-6/s to .2-.4/s. before, we were enqueueing more or less everything from a bridged ATProto user *and* everything that referenced a bridged non-ATProto user. now, it's more like the intersection.

for #1329, #1149
pull/1343/head
Ryan Barrett 2024-09-21 07:27:24 -07:00
rodzic b3c3419cdb
commit 776d2b91e0
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 6BE31FDF4776E9D4
2 zmienionych plików z 62 dodań i 135 usunięć

Wyświetl plik

@ -187,26 +187,23 @@ def subscribe():
# when running locally, comment out put above and uncomment this
# cursor.updated = util.now().replace(tzinfo=None)
if payload['repo'] in bridged_dids:
logger.info(f'Ignoring record from our non-ATProto bridged user {payload["repo"]}')
if payload['repo'] not in atproto_dids:
continue
blocks = {} # maps base32 str CID to dict block
if block_bytes := payload.get('blocks'):
_, blocks = libipld.decode_car(block_bytes)
# detect records that reference an ATProto user, eg replies, likes,
# reposts, mentions
# detect records from bridged ATProto users that we should handle
for p_op in payload.get('ops', []):
op = Op(repo=payload['repo'], action=p_op.get('action'),
path=p_op.get('path'), seq=payload['seq'])
if not op.action or not op.path:
logger.info(
f'bad payload! seq {op.seq} has action {op.action} path {op.path}!')
f'bad payload! seq {op.seq} action {op.action} path {op.path}!')
continue
is_ours = op.repo in atproto_dids
if is_ours and op.action == 'delete':
if op.action == 'delete':
logger.info(f'Got delete from our ATProto user: {op}')
# TODO: also detect deletes of records that *reference* our bridged
# users, eg a delete of a follow or like or repost of them.
@ -230,54 +227,32 @@ def subscribe():
elif type not in ATProto.SUPPORTED_RECORD_TYPES:
continue
if is_ours:
logger.info(f'Got one from our ATProto user: {op.action} {op.repo} {op.path}')
commits.put(op)
continue
def is_ours(ref, also_atproto_users=False):
"""Returns True if the arg is a bridge user."""
if match := AT_URI_PATTERN.match(ref['uri']):
did = match.group('repo')
return did and (did in bridged_dids
or also_atproto_users and did in atproto_dids)
subjects = []
if type == 'app.bsky.feed.repost':
if not is_ours(op.record['subject'], also_atproto_users=True):
continue
def maybe_add(did_or_ref):
if isinstance(did_or_ref, dict):
match = AT_URI_PATTERN.match(did_or_ref['uri'])
if match:
did = match.group('repo')
else:
return
else:
did = did_or_ref
if did and did in bridged_dids:
add(subjects, did)
if type in ('app.bsky.feed.like', 'app.bsky.feed.repost'):
maybe_add(op.record['subject'])
elif type == 'app.bsky.feed.like':
if not is_ours(op.record['subject'], also_atproto_users=False):
continue
elif type in ('app.bsky.graph.block', 'app.bsky.graph.follow'):
maybe_add(op.record['subject'])
if op.record['subject'] not in bridged_dids:
continue
elif type == 'app.bsky.feed.post':
# replies
if reply := op.record.get('reply'):
for ref in 'parent', 'root':
maybe_add(reply[ref])
if not is_ours(reply['parent'], also_atproto_users=True):
continue
# mentions
for facet in op.record.get('facets', []):
for feature in facet['features']:
if feature['$type'] == 'app.bsky.richtext.facet#mention':
maybe_add(feature['did'])
# quote posts
if embed := op.record.get('embed'):
if embed['$type'] == 'app.bsky.embed.record':
maybe_add(embed['record'])
elif embed['$type'] == 'app.bsky.embed.recordWithMedia':
maybe_add(embed['record']['record'])
if subjects:
logger.info(f'Got one re our ATProto users {subjects}: {op.action} {op.repo} {op.path}')
commits.put(op)
logger.info(f'Got one: {op.action} {op.repo} {op.path}')
commits.put(op)
def handler():

Wyświetl plik

@ -84,10 +84,9 @@ class FakeWebsocketClient:
class ATProtoTestCase(TestCase):
"""Utilities used by both test classes."""
def make_bridged_atproto_user(self):
self.store_object(id='did:plc:user', raw=DID_DOC)
return self.make_user('did:plc:user', cls=ATProto,
enabled_protocols=['eefake'],
def make_bridged_atproto_user(self, did='did:plc:user'):
self.store_object(id=did, raw=DID_DOC)
return self.make_user(did, cls=ATProto, enabled_protocols=['eefake'],
obj_bsky=ACTOR_PROFILE_BSKY)
@ -110,6 +109,7 @@ class ATProtoFirehoseSubscribeTest(ATProtoTestCase):
atproto_firehose.bridged_loaded_at = datetime(1900, 1, 1)
atproto_firehose.dids_initialized.clear()
self.make_bridged_atproto_user()
AtpRepo(id='did:alice', head='', signing_key_pem=b'').put()
self.store_object(id='did:plc:bob', raw=DID_DOC)
ATProto(id='did:plc:bob').put()
@ -180,13 +180,10 @@ class ATProtoFirehoseSubscribeTest(ATProtoTestCase):
FakeWebsocketClient.url)
def test_post_by_our_atproto_user(self):
self.make_bridged_atproto_user()
self.assert_enqueues(POST_BSKY, repo='did:plc:user')
self.assert_enqueues(POST_BSKY)
def test_post_by_other(self):
self.store_object(id='did:plc:eve', raw={**DID_DOC, 'id': 'did:plc:eve'})
self.make_user('did:plc:eve', cls=ATProto, enabled_protocols=['eefake'])
self.assert_doesnt_enqueue(POST_BSKY, repo='did:plc:user')
self.assert_doesnt_enqueue(POST_BSKY, repo='did:plc:bob')
def test_skip_post_by_bridged_user(self):
# reply to bridged user, but also from bridged user, so we should skip
@ -199,13 +196,24 @@ class ATProtoFirehoseSubscribeTest(ATProtoTestCase):
},
}, repo='did:alice')
def test_like_by_our_atproto_user(self):
self.assert_enqueues({
'$type': 'app.bsky.feed.like',
'subject': {'uri': 'at://did:alice/app.bsky.feed.post/tid'},
})
def test_like_by_our_atproto_user_of_non_bridged_user(self):
self.assert_doesnt_enqueue({
'$type': 'app.bsky.feed.like',
'subject': {'uri': 'at://did:eve/app.bsky.feed.post/tid'},
})
def test_skip_unsupported_type(self):
self.make_bridged_atproto_user()
self.assert_doesnt_enqueue({
'$type': 'app.bsky.nopey.nope',
}, repo='did:plc:user')
def test_reply_direct_to_our_user(self):
def test_reply_direct_to_atproto_user(self):
self.assert_enqueues({
'$type': 'app.bsky.feed.post',
'reply': {
@ -215,15 +223,24 @@ class ATProtoFirehoseSubscribeTest(ATProtoTestCase):
# test that we handle CIDs
'cid': A_CID.encode(),
},
'root': {
'uri': '-',
},
})
def test_reply_direct_to_bridged_user(self):
self.make_bridged_atproto_user('did:web:carol.com')
self.assert_enqueues({
'$type': 'app.bsky.feed.post',
'reply': {
'$type': 'app.bsky.feed.post#replyRef',
'parent': {
'uri': 'at://did:web:carol.com/app.bsky.feed.post/tid',
'cid': A_CID.encode(),
},
},
})
def test_reply_indirect_to_our_user(self):
self.assert_enqueues({
self.assert_doesnt_enqueue({
'$type': 'app.bsky.feed.post',
'reply': {
'$type': 'app.bsky.feed.post#replyRef',
@ -242,72 +259,6 @@ class ATProtoFirehoseSubscribeTest(ATProtoTestCase):
},
})
def test_mention_our_user(self):
self.assert_enqueues({
'$type': 'app.bsky.feed.post',
'facets': [{
'$type': 'app.bsky.richtext.facet',
'features': [{
'$type': 'app.bsky.richtext.facet#mention',
'did': 'did:alice',
}],
}],
})
def test_mention_other(self):
self.assert_doesnt_enqueue({
'$type': 'app.bsky.feed.post',
'facets': [{
'$type': 'app.bsky.richtext.facet',
'features': [{
'$type': 'app.bsky.richtext.facet#mention',
'did': 'did:eve',
}],
}],
})
def test_quote_of_our_user(self):
self.assert_enqueues({
'$type': 'app.bsky.feed.post',
'embed': {
'$type': 'app.bsky.embed.record',
'record': {'uri': 'at://did:alice/app.bsky.feed.post/tid'},
},
})
def test_quote_of_other(self):
self.assert_doesnt_enqueue({
'$type': 'app.bsky.feed.post',
'embed': {
'$type': 'app.bsky.embed.record',
'record': {'uri': 'at://did:eve/app.bsky.feed.post/tid'},
},
})
def test_quote_of_our_user_with_image(self):
self.assert_enqueues({
'$type': 'app.bsky.feed.post',
'embed': {
'$type': 'app.bsky.embed.recordWithMedia',
'record': {
'record': {'uri': 'at://did:alice/app.bsky.feed.post/tid'},
},
'media': {'$type': 'app.bsky.embed.images'},
},
})
def test_quote_of_other_with_image(self):
self.assert_doesnt_enqueue({
'$type': 'app.bsky.feed.post',
'embed': {
'$type': 'app.bsky.embed.recordWithMedia',
'record': {
'record': {'uri': 'at://did:eve/app.bsky.feed.post/tid'},
},
'media': {'$type': 'app.bsky.embed.images'},
},
})
def test_like_of_our_user(self):
self.assert_enqueues({
'$type': 'app.bsky.feed.like',
@ -357,19 +308,18 @@ class ATProtoFirehoseSubscribeTest(ATProtoTestCase):
})
def test_delete_by_our_atproto_user(self):
self.make_bridged_atproto_user()
path = 'app.bsky.feed.post/abc123'
self.assert_enqueues(path=path, action='delete')
def test_delete_by_other(self):
self.assert_doesnt_enqueue(action='delete')
self.assert_doesnt_enqueue(action='delete', repo='did:plc:carol')
def test_update_by_our_atproto_user(self):
self.make_bridged_atproto_user()
self.assert_enqueues(action='delete')
self.assert_enqueues(action='update', record=POST_BSKY)
def test_update_by_other(self):
self.assert_doesnt_enqueue(action='delete')
self.assert_doesnt_enqueue(action='update', repo='did:plc:carol',
record=POST_BSKY)
def test_update_like_of_our_user(self):
self.assert_enqueues(action='update', record={
@ -381,10 +331,10 @@ class ATProtoFirehoseSubscribeTest(ATProtoTestCase):
self.cursor.cursor = 1
self.cursor.put()
FakeWebsocketClient.setup_receive(
Op(repo='did:x', action='create', path='y', seq=4, record={'foo': 'bar'}))
FakeWebsocketClient.setup_receive(Op(repo='did:plc:user', action='create',
path='y', seq=4, record={'foo': 'bar'}))
with patch('libipld.decode_car', side_effect=RuntimeError('oops')), \
self.assertRaises(RuntimeError):
self.assertRaises(RuntimeError):
self.subscribe()
self.assertTrue(commits.empty())
@ -418,10 +368,12 @@ class ATProtoFirehoseSubscribeTest(ATProtoTestCase):
eve.enabled_protocols = ['eefake']
eve.put()
self.assertGreater(eve.updated, atproto_firehose.atproto_loaded_at)
self.assert_enqueues({'$type': 'app.bsky.feed.post'}, repo='did:plc:eve')
self.assertIn('did:plc:eve', atproto_firehose.atproto_dids)
def test_load_dids_atprepo(self):
FakeWebsocketClient.to_receive = [({'op': 1, 't': '#info'}, {})]
self.subscribe()