ATProto firehose: error handling, reconnect in subscribe

in-reply-to-bridged
Ryan Barrett 2024-05-08 20:43:10 -07:00
rodzic b8e0fecc3b
commit 1e5fcfda0b
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 6BE31FDF4776E9D4
2 zmienionych plików z 49 dodań i 19 usunięć

Wyświetl plik

@ -1,22 +1,27 @@
"""Bridgy Fed firehose client. Enqueues receive tasks for events for bridged users.
"""
from collections import namedtuple
from datetime import timedelta
import itertools
import logging
import os
from queue import SimpleQueue
import time
from carbox import read_car
import dag_json
from granary.bluesky import AT_URI_PATTERN
from lexrpc.client import Client
from oauth_dropins.webutil import util
from oauth_dropins.webutil.appengine_config import error_reporting_client
from atproto import ATProto
from common import add, create_task
import models
from models import Object
RECONNECT_DELAY = timedelta(seconds=30)
logger = logging.getLogger(__name__)
# a commit operation. similar to arroba.repo.Write. record is None for deletes.
@ -28,21 +33,45 @@ Op = namedtuple('Op', ['action', 'repo', 'path', 'record'],
new_commits = SimpleQueue()
def subscribe():
def subscribe(reconnect=True):
"""Subscribes to the relay's firehose.
Relay hostname comes from the ``BGS_HOST`` environment variable.
Args:
reconnect (bool): whether to always reconnect after we get disconnected
"""
logger.info(f'starting thread to consume firehose and detect our commits')
query = ATProto.query(ATProto.enabled_protocols != None)
our_atproto_dids = frozenset(key.id() for key in query.iter(keys_only=True))
atproto_dids = frozenset(key.id() for key in query.iter(keys_only=True))
other_queries = itertools.chain(*(
cls.query(cls.copies.protocol == 'atproto').iter()
for cls in set(models.PROTOCOLS.values())
if cls and cls != ATProto))
our_bridged_dids = frozenset(user.get_copy(ATProto) for user in other_queries)
bridged_dids = frozenset(user.get_copy(ATProto) for user in other_queries)
logger.info(f'Loaded {len(our_atproto_dids)} ATProto, {len(our_bridged_dids)} bridged dids')
logger.info(f'Loaded {len(atproto_dids)} ATProto, {len(bridged_dids)} bridged dids')
logger.info(f'Subscribing to {os.environ["BGS_HOST"]} firehose')
while True:
try:
_subscribe(atproto_dids=atproto_dids, bridged_dids=bridged_dids)
except BaseException as err:
logger.error(f'reporting error, atproto_firehose.subscribe: {err}')
error_reporting_client.report_exception()
if not reconnect:
return
logger.info(f'disconnected! waiting {RECONNECT_DELAY} and then reconnecting')
time.sleep(RECONNECT_DELAY.total_seconds())
def _subscribe(atproto_dids=None, bridged_dids=None):
assert atproto_dids is not None and bridged_dids is not None, \
(atproto_dids, bridged_dids)
client = Client(f'https://{os.environ["BGS_HOST"]}')
cursor = None # TODO
@ -58,7 +87,7 @@ def subscribe():
repo = payload.get('repo')
assert repo
if repo in our_bridged_dids: # from a Bridgy Fed non-Bluesky user; ignore
if repo in bridged_dids: # from a Bridgy Fed non-Bluesky user; ignore
# logger.info(f'Ignoring record from our non-ATProto bridged user {repo}')
continue
@ -74,7 +103,7 @@ def subscribe():
assert op.action and op.path, (op.action, op.path)
cid = p_op['cid']
is_ours = repo in our_atproto_dids
is_ours = repo in atproto_dids
if is_ours and op.action == 'delete':
logger.info(f'Got delete from our ATProto user: {op}')
# TODO: also detect deletes of records that *reference* our bridged
@ -113,7 +142,7 @@ def subscribe():
else:
did = did_or_ref
if did and did in our_bridged_dids:
if did and did in bridged_dids:
add(subjects, did)
if type in ('app.bsky.feed.like', 'app.bsky.feed.repost'):
@ -145,8 +174,6 @@ def subscribe():
logger.info(f'Got one re our ATProto users {subjects}: {op.action} {op.repo} {op.path}')
new_commits.put(op)
logger.info('Ran out of events! Relay closed connection?')
def handle(limit=None):
"""Store Objects and create receive tasks for commits as they arrive.
@ -183,11 +210,14 @@ def handle(limit=None):
logger.error(f'Unknown action {action} for {op.repo} {op.path}')
continue
obj = Object.get_or_create(id=obj_id, actor=op.repo, status='new',
users=[ATProto(id=op.repo).key],
source_protocol=ATProto.LABEL, **record_kwarg)
create_task(queue='receive', obj=obj.key.urlsafe(), authed_as=op.repo)
try:
obj = Object.get_or_create(id=obj_id, actor=op.repo, status='new',
users=[ATProto(id=op.repo).key],
source_protocol=ATProto.LABEL, **record_kwarg)
create_task(queue='receive', obj=obj.key.urlsafe(), authed_as=op.repo)
except BaseException as err:
logger.error(f'reporting error, atproto_firehose.handle: {err}')
error_reporting_client.report_exception()
count += 1
if limit is not None and count >= limit:

Wyświetl plik

@ -94,7 +94,7 @@ class ATProtoFirehoseSubscribeTest(TestCase):
path='app.bsky.feed.post/abc123'):
FakeWebsocketClient.setup_receive(
Op(repo=repo, action=action, path=path, record=record))
subscribe()
subscribe(reconnect=False)
op = new_commits.get()
self.assertEqual(repo, op.repo)
@ -107,7 +107,7 @@ class ATProtoFirehoseSubscribeTest(TestCase):
path='app.bsky.feed.post/abc123'):
FakeWebsocketClient.setup_receive(
Op(repo=repo, action=action, path=path, record=record))
subscribe()
subscribe(reconnect=False)
self.assertTrue(new_commits.empty())
def test_error(self):
@ -116,7 +116,7 @@ class ATProtoFirehoseSubscribeTest(TestCase):
{'error': 'ConsumerTooSlow', 'message': 'ketchup!'},
)]
subscribe()
subscribe(reconnect=False)
self.assertTrue(new_commits.empty())
def test_info(self):
@ -125,7 +125,7 @@ class ATProtoFirehoseSubscribeTest(TestCase):
{'name': 'OutdatedCursor'},
)]
subscribe()
subscribe(reconnect=False)
self.assertTrue(new_commits.empty())
def test_non_commit(self):
@ -134,7 +134,7 @@ class ATProtoFirehoseSubscribeTest(TestCase):
{'seq': '123', 'did': 'did:abc', 'handle': 'hi.com'},
)]
subscribe()
subscribe(reconnect=False)
self.assertTrue(new_commits.empty())
def test_post_by_our_atproto_user(self):