kopia lustrzana https://github.com/snarfed/bridgy-fed
ATProto firehose: encode Object.bsky as compact DAG-JSON
fixes https://console.cloud.google.com/errors/detail/CIGJnbnCq7fm1gE;time=P30D?project=bridgy-federated and similarpull/1049/head
rodzic
b49fe13e59
commit
741146eb8d
|
|
@ -17,6 +17,7 @@ from lexrpc.client import Client
|
||||||
from oauth_dropins.webutil import util
|
from oauth_dropins.webutil import util
|
||||||
from oauth_dropins.webutil.appengine_config import ndb_client
|
from oauth_dropins.webutil.appengine_config import ndb_client
|
||||||
from oauth_dropins.webutil.appengine_info import DEBUG
|
from oauth_dropins.webutil.appengine_info import DEBUG
|
||||||
|
from oauth_dropins.webutil.util import json_loads
|
||||||
|
|
||||||
from atproto import ATProto, Cursor
|
from atproto import ATProto, Cursor
|
||||||
from common import add, create_task, report_exception
|
from common import add, create_task, report_exception
|
||||||
|
|
@ -236,10 +237,10 @@ def handle(limit=None):
|
||||||
at_uri = f'at://{op.repo}/{op.path}'
|
at_uri = f'at://{op.repo}/{op.path}'
|
||||||
|
|
||||||
# store object, enqueue receive task
|
# store object, enqueue receive task
|
||||||
# TODO: for Object.bsky, does record have CIDs etc? how do we store?
|
|
||||||
# dag-json? how are polls doing this?
|
|
||||||
if op.action in ('create', 'update'):
|
if op.action in ('create', 'update'):
|
||||||
record_kwarg = {'bsky': op.record}
|
record_kwarg = {
|
||||||
|
'bsky': json_loads(dag_json.encode(op.record, compact=True)),
|
||||||
|
}
|
||||||
obj_id = at_uri
|
obj_id = at_uri
|
||||||
elif op.action == 'delete':
|
elif op.action == 'delete':
|
||||||
obj_id = f'{at_uri}#delete'
|
obj_id = f'{at_uri}#delete'
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,5 @@
|
||||||
"""Unit tests for atproto_firehose.py."""
|
"""Unit tests for atproto_firehose.py."""
|
||||||
|
import copy
|
||||||
from datetime import timedelta, timezone
|
from datetime import timedelta, timezone
|
||||||
from unittest import skip
|
from unittest import skip
|
||||||
from unittest.mock import patch
|
from unittest.mock import patch
|
||||||
|
|
@ -31,6 +32,8 @@ import protocol
|
||||||
from .testutil import ExplicitEnableFake, Fake, TestCase
|
from .testutil import ExplicitEnableFake, Fake, TestCase
|
||||||
from .test_atproto import DID_DOC
|
from .test_atproto import DID_DOC
|
||||||
|
|
||||||
|
A_CID = CID.decode('bafkreicqpqncshdd27sgztqgzocd3zhhqnnsv6slvzhs5uz6f57cq6lmtq')
|
||||||
|
|
||||||
|
|
||||||
class FakeWebsocketClient:
|
class FakeWebsocketClient:
|
||||||
"""Fake of :class:`simple_websocket.Client`."""
|
"""Fake of :class:`simple_websocket.Client`."""
|
||||||
|
|
@ -50,19 +53,18 @@ class FakeWebsocketClient:
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def setup_receive(cls, op):
|
def setup_receive(cls, op):
|
||||||
cid = CID.decode('bafkreicqpqncshdd27sgztqgzocd3zhhqnnsv6slvzhs5uz6f57cq6lmtq')
|
|
||||||
if op.action == 'delete':
|
if op.action == 'delete':
|
||||||
block_bytes = b''
|
block_bytes = b''
|
||||||
else:
|
else:
|
||||||
block = Block(decoded=op.record)
|
block = Block(decoded=op.record)
|
||||||
block_bytes = write_car([cid], [block])
|
block_bytes = write_car([A_CID], [block])
|
||||||
|
|
||||||
cls.to_receive = [({
|
cls.to_receive = [({
|
||||||
'op': 1,
|
'op': 1,
|
||||||
't': '#commit',
|
't': '#commit',
|
||||||
}, {
|
}, {
|
||||||
'blocks': block_bytes,
|
'blocks': block_bytes,
|
||||||
'commit': cid,
|
'commit': A_CID,
|
||||||
'ops': [{
|
'ops': [{
|
||||||
'action': op.action,
|
'action': op.action,
|
||||||
'cid': None if op.action == 'delete' else block.cid,
|
'cid': None if op.action == 'delete' else block.cid,
|
||||||
|
|
@ -176,8 +178,15 @@ class ATProtoFirehoseSubscribeTest(TestCase):
|
||||||
'$type': 'app.bsky.feed.post',
|
'$type': 'app.bsky.feed.post',
|
||||||
'reply': {
|
'reply': {
|
||||||
'$type': 'app.bsky.feed.post#replyRef',
|
'$type': 'app.bsky.feed.post#replyRef',
|
||||||
'parent': {'uri': 'at://did:alice/app.bsky.feed.post/tid'},
|
'parent': {
|
||||||
'root': {'uri': '-'},
|
'uri': 'at://did:alice/app.bsky.feed.post/tid',
|
||||||
|
# test that we encode CIDs and bytes as JSON
|
||||||
|
'cid': A_CID,
|
||||||
|
},
|
||||||
|
'root': {
|
||||||
|
'uri': '-',
|
||||||
|
'cid': A_CID,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
@ -364,14 +373,21 @@ class ATProtoFirehoseHandleTest(TestCase):
|
||||||
atproto_firehose.dids_initialized.clear()
|
atproto_firehose.dids_initialized.clear()
|
||||||
|
|
||||||
def test_handle_create(self, mock_create_task):
|
def test_handle_create(self, mock_create_task):
|
||||||
|
reply = copy.deepcopy(REPLY_BSKY)
|
||||||
|
# test that we encode CIDs and bytes as JSON
|
||||||
|
reply['reply']['root']['cid'] = reply['reply']['parent']['cid'] = A_CID
|
||||||
|
|
||||||
new_commits.put(Op(repo='did:plc:user', action='create', seq=789,
|
new_commits.put(Op(repo='did:plc:user', action='create', seq=789,
|
||||||
path='app.bsky.feed.post/123', record=POST_BSKY))
|
path='app.bsky.feed.post/123', record=reply))
|
||||||
|
|
||||||
handle(limit=1)
|
handle(limit=1)
|
||||||
|
|
||||||
|
expected = copy.deepcopy(REPLY_BSKY)
|
||||||
|
expected['reply']['root']['cid'] = expected['reply']['parent']['cid'] = \
|
||||||
|
A_CID.encode()
|
||||||
user_key = ATProto(id='did:plc:user').key
|
user_key = ATProto(id='did:plc:user').key
|
||||||
obj = self.assert_object('at://did:plc:user/app.bsky.feed.post/123',
|
obj = self.assert_object('at://did:plc:user/app.bsky.feed.post/123',
|
||||||
bsky=POST_BSKY, source_protocol='atproto',
|
bsky=expected, source_protocol='atproto',
|
||||||
status='new', users=[user_key],
|
status='new', users=[user_key],
|
||||||
ignore=['our_as1'])
|
ignore=['our_as1'])
|
||||||
self.assert_task(mock_create_task, 'receive', '/queue/receive',
|
self.assert_task(mock_create_task, 'receive', '/queue/receive',
|
||||||
|
|
|
||||||
Ładowanie…
Reference in New Issue