atproto_firehose: switch CAR decoding from carbox to libipld

for #1295
pull/1310/head
Ryan Barrett 2024-09-06 11:55:54 -07:00
rodzic 3823379334
commit 48c9b5c7e7
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 6BE31FDF4776E9D4
1 zmienionych plików z 6 dodań i 7 usunięć

Wyświetl plik

@ -218,13 +218,11 @@ def handle(limit=None):
return
while frame := events.get():
# header, payload = libipld.decode_dag_cbor_multi(frame)
header, payload = libipld.decode_dag_cbor_multi(frame)
buf = BytesIO(frame)
header = dag_cbor.decode(buf, allow_concat=True)
# parse header
if header.get('op') == -1:
payload = dag_cbor.decode(buf)
logger.warning(f'Got error from relay! {payload}')
continue
@ -235,7 +233,6 @@ def handle(limit=None):
continue
# parse payload
payload = dag_cbor.decode(buf)
repo = payload.get('repo')
if not repo:
logger.warning(f'Payload missing repo! {payload}')
@ -264,8 +261,9 @@ def handle(limit=None):
blocks = {}
if block_bytes := payload.get('blocks'):
_, blocks = read_car(block_bytes)
blocks = {block.cid: block for block in blocks}
# _, blocks = read_car(block_bytes)
# blocks = {block.cid: block for block in blocks}
_, blocks = libipld.decode_car(block_bytes)
# detect records that reference an ATProto user, eg replies, likes,
# reposts, mentions
@ -294,7 +292,8 @@ def handle(limit=None):
continue
try:
op = Op(*op[:-1], record=block.decoded)
# op = Op(*op[:-1], record=block)
op = Op(*op[:-1], record=block)
except BaseException:
# https://github.com/hashberg-io/dag-cbor/issues/14
logger.error(f"Couldn't decode block {cid} seq {op.seq}",