diff --git a/atproto_firehose.py b/atproto_firehose.py index 36b7b929..6f301331 100644 --- a/atproto_firehose.py +++ b/atproto_firehose.py @@ -218,13 +218,11 @@ def handle(limit=None): return while frame := events.get(): - # header, payload = libipld.decode_dag_cbor_multi(frame) + header, payload = libipld.decode_dag_cbor_multi(frame) buf = BytesIO(frame) - header = dag_cbor.decode(buf, allow_concat=True) # parse header if header.get('op') == -1: - payload = dag_cbor.decode(buf) logger.warning(f'Got error from relay! {payload}') continue @@ -235,7 +233,6 @@ def handle(limit=None): continue # parse payload - payload = dag_cbor.decode(buf) repo = payload.get('repo') if not repo: logger.warning(f'Payload missing repo! {payload}') @@ -264,8 +261,9 @@ def handle(limit=None): blocks = {} if block_bytes := payload.get('blocks'): - _, blocks = read_car(block_bytes) - blocks = {block.cid: block for block in blocks} + # _, blocks = read_car(block_bytes) + # blocks = {block.cid: block for block in blocks} + _, blocks = libipld.decode_car(block_bytes) # detect records that reference an ATProto user, eg replies, likes, # reposts, mentions @@ -294,7 +292,8 @@ def handle(limit=None): continue try: - op = Op(*op[:-1], record=block.decoded) + # op = Op(*op[:-1], record=block) + op = Op(*op[:-1], record=block) except BaseException: # https://github.com/hashberg-io/dag-cbor/issues/14 logger.error(f"Couldn't decode block {cid} seq {op.seq}",