atproto_firehose: delay parsing payload until we know it's a commit

again, this time for libipld

for #1295
pull/1310/head
Ryan Barrett 2024-09-06 12:23:15 -07:00
rodzic 48c9b5c7e7
commit b0de078732
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 6BE31FDF4776E9D4
1 zmienionych plików z 5 dodań i 3 usunięć

Wyświetl plik

@ -218,21 +218,23 @@ def handle(limit=None):
return return
while frame := events.get(): while frame := events.get():
header, payload = libipld.decode_dag_cbor_multi(frame) header = libipld.decode_dag_cbor(frame)
buf = BytesIO(frame) # buf = BytesIO(frame)
# parse header # parse header
if header.get('op') == -1: if header.get('op') == -1:
_, payload = libipld.decode_dag_cbor_multi(frame)
logger.warning(f'Got error from relay! {payload}') logger.warning(f'Got error from relay! {payload}')
continue continue
t = header.get('t') t = header.get('t')
if t != '#commit': if t != '#commit':
if t not in ('#account', '#identity', '#handle'): if t not in ('#account', '#identity', '#handle', '#tombstone'):
logger.info(f'Got {t} from relay') logger.info(f'Got {t} from relay')
continue continue
# parse payload # parse payload
_, payload = libipld.decode_dag_cbor_multi(frame)
repo = payload.get('repo') repo = payload.get('repo')
if not repo: if not repo:
logger.warning(f'Payload missing repo! {payload}') logger.warning(f'Payload missing repo! {payload}')