ATProto firehose: fix race conditions in loading DIDs

for #978
in-reply-to-bridged
Ryan Barrett 2024-05-09 15:38:52 -07:00
rodzic 5ef4111b4d
commit d56f405c96
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 6BE31FDF4776E9D4
3 zmienionych plików z 20 dodań i 16 usunięć

Wyświetl plik

@ -38,24 +38,32 @@ new_commits = SimpleQueue()
atproto_dids = None
bridged_dids = None
loaded_dids_at = None
load_dids_thread = None
load_dids_lock = Lock()
def load_dids():
# start in a a separate thread since it needs to make its own NDB context
# when it runs in the timer thread
thread = Thread(target=_load_dids)
thread.start()
thread.join()
global load_dids_thread
with load_dids_lock:
if load_dids_thread:
return
# run in a separate thread since it needs to make its own NDB
# context when it runs in the timer thread
load_dids_thread = Thread(target=_load_dids)
load_dids_thread.start()
load_dids_thread.join()
def _load_dids():
global atproto_dids, bridged_dids, loaded_dids_at
global atproto_dids, bridged_dids, load_dids_thread
with load_dids_lock, ndb_client.context():
if loaded_dids_at and loaded_dids_at > util.now() - RECONNECT_DELAY:
return
if not DEBUG:
load_dids_thread = Timer(STORE_CURSOR_FREQ.total_seconds(), _load_dids)
load_dids_thread.start()
atproto_query = ATProto.query(ATProto.enabled_protocols != None)
atproto_dids = frozenset(key.id() for key in atproto_query.iter(keys_only=True))
@ -67,9 +75,6 @@ def _load_dids():
bridged_dids = frozenset(user.get_copy(ATProto) for user in others_queries)
logger.info(f'Loaded {len(atproto_dids)} ATProto, {len(bridged_dids)} bridged dids')
loaded_dids_at = util.now()
if not DEBUG:
Timer(STORE_CURSOR_FREQ.total_seconds(), _load_dids).start()
def subscribe(reconnect=True):
@ -253,7 +258,6 @@ def handle(limit=None):
continue
try:
logger.info(f'existing {obj_id}: {Object.get_by_id(obj_id)}')
obj = Object.get_or_create(id=obj_id, actor=op.repo, status='new',
users=[ATProto(id=op.repo).key],
source_protocol=ATProto.LABEL, **record_kwarg)

4
hub.py
Wyświetl plik

@ -88,14 +88,14 @@ if LOCAL_SERVER or not DEBUG:
atproto_firehose.subscribe()
assert 'atproto_firehose.subscribe' not in [t.name for t in threading.enumerate()]
Thread(target=subscribe, name='atproto_firehose.subscribe', daemon=True).start()
Thread(target=subscribe, name='atproto_firehose.subscribe').start()
def handle():
with appengine_config.ndb_client.context():
atproto_firehose.handle()
assert 'atproto_firehose.handle' not in [t.name for t in threading.enumerate()]
Thread(target=handle, name='atproto_firehose.handle', daemon=True).start()
Thread(target=handle, name='atproto_firehose.handle').start()
# send requestCrawl to relay

Wyświetl plik

@ -93,7 +93,7 @@ class ATProtoFirehoseSubscribeTest(TestCase):
atproto_firehose.atproto_dids = None
atproto_firehose.bridged_dids = None
atproto_firehose.loaded_dids_at = None
atproto_firehose.load_dids_thread = None
self.alice = self.make_user(
'eefake:alice', cls=ExplicitEnableFake,