diff --git a/atproto_firehose.py b/atproto_firehose.py index 78f16523..3fcb78cd 100644 --- a/atproto_firehose.py +++ b/atproto_firehose.py @@ -187,26 +187,23 @@ def subscribe(): # when running locally, comment out put above and uncomment this # cursor.updated = util.now().replace(tzinfo=None) - if payload['repo'] in bridged_dids: - logger.info(f'Ignoring record from our non-ATProto bridged user {payload["repo"]}') + if payload['repo'] not in atproto_dids: continue blocks = {} # maps base32 str CID to dict block if block_bytes := payload.get('blocks'): _, blocks = libipld.decode_car(block_bytes) - # detect records that reference an ATProto user, eg replies, likes, - # reposts, mentions + # detect records from bridged ATProto users that we should handle for p_op in payload.get('ops', []): op = Op(repo=payload['repo'], action=p_op.get('action'), path=p_op.get('path'), seq=payload['seq']) if not op.action or not op.path: logger.info( - f'bad payload! seq {op.seq} has action {op.action} path {op.path}!') + f'bad payload! seq {op.seq} action {op.action} path {op.path}!') continue - is_ours = op.repo in atproto_dids - if is_ours and op.action == 'delete': + if op.action == 'delete': logger.info(f'Got delete from our ATProto user: {op}') # TODO: also detect deletes of records that *reference* our bridged # users, eg a delete of a follow or like or repost of them. @@ -230,54 +227,32 @@ def subscribe(): elif type not in ATProto.SUPPORTED_RECORD_TYPES: continue - if is_ours: - logger.info(f'Got one from our ATProto user: {op.action} {op.repo} {op.path}') - commits.put(op) - continue + def is_ours(ref, also_atproto_users=False): + """Returns True if the arg is a bridge user.""" + if match := AT_URI_PATTERN.match(ref['uri']): + did = match.group('repo') + return did and (did in bridged_dids + or also_atproto_users and did in atproto_dids) - subjects = [] + if type == 'app.bsky.feed.repost': + if not is_ours(op.record['subject'], also_atproto_users=True): + continue - def maybe_add(did_or_ref): - if isinstance(did_or_ref, dict): - match = AT_URI_PATTERN.match(did_or_ref['uri']) - if match: - did = match.group('repo') - else: - return - else: - did = did_or_ref - - if did and did in bridged_dids: - add(subjects, did) - - if type in ('app.bsky.feed.like', 'app.bsky.feed.repost'): - maybe_add(op.record['subject']) + elif type == 'app.bsky.feed.like': + if not is_ours(op.record['subject'], also_atproto_users=False): + continue elif type in ('app.bsky.graph.block', 'app.bsky.graph.follow'): - maybe_add(op.record['subject']) + if op.record['subject'] not in bridged_dids: + continue elif type == 'app.bsky.feed.post': - # replies if reply := op.record.get('reply'): - for ref in 'parent', 'root': - maybe_add(reply[ref]) + if not is_ours(reply['parent'], also_atproto_users=True): + continue - # mentions - for facet in op.record.get('facets', []): - for feature in facet['features']: - if feature['$type'] == 'app.bsky.richtext.facet#mention': - maybe_add(feature['did']) - - # quote posts - if embed := op.record.get('embed'): - if embed['$type'] == 'app.bsky.embed.record': - maybe_add(embed['record']) - elif embed['$type'] == 'app.bsky.embed.recordWithMedia': - maybe_add(embed['record']['record']) - - if subjects: - logger.info(f'Got one re our ATProto users {subjects}: {op.action} {op.repo} {op.path}') - commits.put(op) + logger.info(f'Got one: {op.action} {op.repo} {op.path}') + commits.put(op) def handler(): diff --git a/tests/test_atproto_firehose.py b/tests/test_atproto_firehose.py index a63063bd..7a8c5739 100644 --- a/tests/test_atproto_firehose.py +++ b/tests/test_atproto_firehose.py @@ -84,10 +84,9 @@ class FakeWebsocketClient: class ATProtoTestCase(TestCase): """Utilities used by both test classes.""" - def make_bridged_atproto_user(self): - self.store_object(id='did:plc:user', raw=DID_DOC) - return self.make_user('did:plc:user', cls=ATProto, - enabled_protocols=['eefake'], + def make_bridged_atproto_user(self, did='did:plc:user'): + self.store_object(id=did, raw=DID_DOC) + return self.make_user(did, cls=ATProto, enabled_protocols=['eefake'], obj_bsky=ACTOR_PROFILE_BSKY) @@ -110,6 +109,7 @@ class ATProtoFirehoseSubscribeTest(ATProtoTestCase): atproto_firehose.bridged_loaded_at = datetime(1900, 1, 1) atproto_firehose.dids_initialized.clear() + self.make_bridged_atproto_user() AtpRepo(id='did:alice', head='', signing_key_pem=b'').put() self.store_object(id='did:plc:bob', raw=DID_DOC) ATProto(id='did:plc:bob').put() @@ -180,13 +180,10 @@ class ATProtoFirehoseSubscribeTest(ATProtoTestCase): FakeWebsocketClient.url) def test_post_by_our_atproto_user(self): - self.make_bridged_atproto_user() - self.assert_enqueues(POST_BSKY, repo='did:plc:user') + self.assert_enqueues(POST_BSKY) def test_post_by_other(self): - self.store_object(id='did:plc:eve', raw={**DID_DOC, 'id': 'did:plc:eve'}) - self.make_user('did:plc:eve', cls=ATProto, enabled_protocols=['eefake']) - self.assert_doesnt_enqueue(POST_BSKY, repo='did:plc:user') + self.assert_doesnt_enqueue(POST_BSKY, repo='did:plc:bob') def test_skip_post_by_bridged_user(self): # reply to bridged user, but also from bridged user, so we should skip @@ -199,13 +196,24 @@ class ATProtoFirehoseSubscribeTest(ATProtoTestCase): }, }, repo='did:alice') + def test_like_by_our_atproto_user(self): + self.assert_enqueues({ + '$type': 'app.bsky.feed.like', + 'subject': {'uri': 'at://did:alice/app.bsky.feed.post/tid'}, + }) + + def test_like_by_our_atproto_user_of_non_bridged_user(self): + self.assert_doesnt_enqueue({ + '$type': 'app.bsky.feed.like', + 'subject': {'uri': 'at://did:eve/app.bsky.feed.post/tid'}, + }) + def test_skip_unsupported_type(self): - self.make_bridged_atproto_user() self.assert_doesnt_enqueue({ '$type': 'app.bsky.nopey.nope', }, repo='did:plc:user') - def test_reply_direct_to_our_user(self): + def test_reply_direct_to_atproto_user(self): self.assert_enqueues({ '$type': 'app.bsky.feed.post', 'reply': { @@ -215,15 +223,24 @@ class ATProtoFirehoseSubscribeTest(ATProtoTestCase): # test that we handle CIDs 'cid': A_CID.encode(), }, - 'root': { - 'uri': '-', + }, + }) + + def test_reply_direct_to_bridged_user(self): + self.make_bridged_atproto_user('did:web:carol.com') + self.assert_enqueues({ + '$type': 'app.bsky.feed.post', + 'reply': { + '$type': 'app.bsky.feed.post#replyRef', + 'parent': { + 'uri': 'at://did:web:carol.com/app.bsky.feed.post/tid', 'cid': A_CID.encode(), }, }, }) def test_reply_indirect_to_our_user(self): - self.assert_enqueues({ + self.assert_doesnt_enqueue({ '$type': 'app.bsky.feed.post', 'reply': { '$type': 'app.bsky.feed.post#replyRef', @@ -242,72 +259,6 @@ class ATProtoFirehoseSubscribeTest(ATProtoTestCase): }, }) - def test_mention_our_user(self): - self.assert_enqueues({ - '$type': 'app.bsky.feed.post', - 'facets': [{ - '$type': 'app.bsky.richtext.facet', - 'features': [{ - '$type': 'app.bsky.richtext.facet#mention', - 'did': 'did:alice', - }], - }], - }) - - def test_mention_other(self): - self.assert_doesnt_enqueue({ - '$type': 'app.bsky.feed.post', - 'facets': [{ - '$type': 'app.bsky.richtext.facet', - 'features': [{ - '$type': 'app.bsky.richtext.facet#mention', - 'did': 'did:eve', - }], - }], - }) - - def test_quote_of_our_user(self): - self.assert_enqueues({ - '$type': 'app.bsky.feed.post', - 'embed': { - '$type': 'app.bsky.embed.record', - 'record': {'uri': 'at://did:alice/app.bsky.feed.post/tid'}, - }, - }) - - def test_quote_of_other(self): - self.assert_doesnt_enqueue({ - '$type': 'app.bsky.feed.post', - 'embed': { - '$type': 'app.bsky.embed.record', - 'record': {'uri': 'at://did:eve/app.bsky.feed.post/tid'}, - }, - }) - - def test_quote_of_our_user_with_image(self): - self.assert_enqueues({ - '$type': 'app.bsky.feed.post', - 'embed': { - '$type': 'app.bsky.embed.recordWithMedia', - 'record': { - 'record': {'uri': 'at://did:alice/app.bsky.feed.post/tid'}, - }, - 'media': {'$type': 'app.bsky.embed.images'}, - }, - }) - - def test_quote_of_other_with_image(self): - self.assert_doesnt_enqueue({ - '$type': 'app.bsky.feed.post', - 'embed': { - '$type': 'app.bsky.embed.recordWithMedia', - 'record': { - 'record': {'uri': 'at://did:eve/app.bsky.feed.post/tid'}, - }, - 'media': {'$type': 'app.bsky.embed.images'}, - }, - }) - def test_like_of_our_user(self): self.assert_enqueues({ '$type': 'app.bsky.feed.like', @@ -357,19 +308,18 @@ class ATProtoFirehoseSubscribeTest(ATProtoTestCase): }) def test_delete_by_our_atproto_user(self): - self.make_bridged_atproto_user() path = 'app.bsky.feed.post/abc123' self.assert_enqueues(path=path, action='delete') def test_delete_by_other(self): - self.assert_doesnt_enqueue(action='delete') + self.assert_doesnt_enqueue(action='delete', repo='did:plc:carol') def test_update_by_our_atproto_user(self): - self.make_bridged_atproto_user() - self.assert_enqueues(action='delete') + self.assert_enqueues(action='update', record=POST_BSKY) def test_update_by_other(self): - self.assert_doesnt_enqueue(action='delete') + self.assert_doesnt_enqueue(action='update', repo='did:plc:carol', + record=POST_BSKY) def test_update_like_of_our_user(self): self.assert_enqueues(action='update', record={ @@ -381,10 +331,10 @@ class ATProtoFirehoseSubscribeTest(ATProtoTestCase): self.cursor.cursor = 1 self.cursor.put() - FakeWebsocketClient.setup_receive( - Op(repo='did:x', action='create', path='y', seq=4, record={'foo': 'bar'})) + FakeWebsocketClient.setup_receive(Op(repo='did:plc:user', action='create', + path='y', seq=4, record={'foo': 'bar'})) with patch('libipld.decode_car', side_effect=RuntimeError('oops')), \ - self.assertRaises(RuntimeError): + self.assertRaises(RuntimeError): self.subscribe() self.assertTrue(commits.empty()) @@ -418,10 +368,12 @@ class ATProtoFirehoseSubscribeTest(ATProtoTestCase): eve.enabled_protocols = ['eefake'] eve.put() self.assertGreater(eve.updated, atproto_firehose.atproto_loaded_at) + self.assert_enqueues({'$type': 'app.bsky.feed.post'}, repo='did:plc:eve') self.assertIn('did:plc:eve', atproto_firehose.atproto_dids) def test_load_dids_atprepo(self): + FakeWebsocketClient.to_receive = [({'op': 1, 't': '#info'}, {})] self.subscribe()