ATProto firehose: add handle wrapper that catches exceptions

also refactor and simplify load_dids, error reporting. #978
pull/1049/head
Ryan Barrett 2024-05-10 19:59:04 -07:00
rodzic 206b7a344c
commit f2b5f79489
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 6BE31FDF4776E9D4
2 zmienionych plików z 27 dodań i 27 usunięć

Wyświetl plik

@ -40,18 +40,13 @@ new_commits = SimpleQueue()
atproto_dids = None
bridged_dids = None
dids_initialized = Event()
load_dids_lock = Lock()
def load_dids():
with load_dids_lock:
if dids_initialized.is_set():
return
# run in a separate thread since it needs to make its own NDB
# context when it runs in the timer thread
Thread(target=_load_dids).start()
dids_initialized.wait()
# run in a separate thread since it needs to make its own NDB
# context when it runs in the timer thread
Thread(target=_load_dids).start()
dids_initialized.wait()
def _load_dids():
@ -72,21 +67,13 @@ def _load_dids():
def subscribe(reconnect=True):
"""Subscribes to the relay's firehose.
Relay hostname comes from the ``BGS_HOST`` environment variable.
Args:
reconnect (bool): whether to always reconnect after we get disconnected
"""
"""Wrapper around :func:`_subscribe` that catches exceptions and restarts."""
logger.info(f'started thread to subscribe to {os.environ["BGS_HOST"]} firehose')
while True:
try:
_subscribe()
except BaseException:
if DEBUG:
raise
report_exception()
if not reconnect:
@ -96,6 +83,13 @@ def subscribe(reconnect=True):
def _subscribe():
"""Subscribes to the relay's firehose.
Relay hostname comes from the ``BGS_HOST`` environment variable.
Args:
reconnect (bool): whether to always reconnect after we get disconnected
"""
load_dids()
cursor = Cursor.get_by_id(
@ -212,18 +206,19 @@ def _subscribe():
def handle(limit=None):
"""Store Objects and create receive tasks for commits as they arrive.
"""Wrapper around :func:`_handle` that catches exceptions and restarts."""
logger.info(f'started handle thread to store objects and enqueue receive tasks')
:meth:`Object.get_or_create` makes network calls, via eg :meth:`Object.as1`
=> :meth:`ATProto.pds_for` and :meth:`ATProto.handle`, so we don't want to
do those in the critical path in :func:`subscribe`.
while True:
try:
_handle(limit=limit)
# if we return cleanly, that means we hit the limit
break
except BaseException:
report_exception()
Args:
limit (int): return after handling this many commits
"""
logger.info(f'started thread to store objects and enqueue receive tasks')
load_dids()
def _handle(limit=None):
cursor = Cursor.get_by_id(
f'{os.environ["BGS_HOST"]} com.atproto.sync.subscribeRepos')
assert cursor

Wyświetl plik

@ -336,8 +336,13 @@ def report_exception(**kwargs):
https://cloud.google.com/error-reporting/docs/reference/libraries#client-libraries-install-python
If ``DEBUG`` is ``True``, re-raises the exception instead.
Duplicated in ``bridgy.util``.
"""
if DEBUG:
raise
try:
error_reporting_client.report_exception(**kwargs)
except BaseException: