bluesky: remove sync XRPC method implementations

they're moving to arroba
pull/510/head
Ryan Barrett 2023-05-22 12:56:21 -07:00
rodzic a580470e2e
commit 259e982986
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 6BE31FDF4776E9D4
4 zmienionych plików z 1 dodań i 589 usunięć

Wyświetl plik

@ -23,7 +23,6 @@ jobs:
curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key --keyring /usr/share/keyrings/cloud.google.gpg add -
sudo apt-get update
sudo apt-get install -y apt-transport-https ca-certificates gnupg google-cloud-sdk google-cloud-sdk-datastore-emulator default-jre
git clone --depth=1 https://github.com/bluesky-social/atproto.git ../atproto
- run:
name: Python dependencies

2
app.py
Wyświetl plik

@ -6,4 +6,4 @@ registered.
from flask_app import app
# import all modules to register their Flask handlers
import activitypub, atproto, follow, pages, redirect, render, superfeedr, webfinger, webmention, xrpc_actor, xrpc_feed, xrpc_graph
import activitypub, follow, pages, redirect, render, superfeedr, webfinger, webmention, xrpc_actor, xrpc_feed, xrpc_graph

Wyświetl plik

@ -1,299 +0,0 @@
"""com.atproto.sync XRPC methods."""
from collections import namedtuple
import json
import logging
import random
import re
from arroba.mst import MST, serialize_node_data
from arroba.util import dag_cbor_cid, sign_commit
from Crypto.PublicKey import ECC
import dag_cbor.encoding
from flask import g
from google.cloud.ndb.query import OR
from granary import bluesky
from multiformats import CID, multibase, multicodec, multihash
from oauth_dropins.webutil import util
from flask_app import xrpc_server
from models import Follower, Object, PAGE_SIZE, User
logger = logging.getLogger(__name__)
# https://atproto.com/specs/atp#repo-data-layout
# TODO: remove? unused?
Commit = namedtuple('Commit', [
'did', # CID
'version', # int, always 2?
'prev', # CID, previous (root?) commit
'data', # CID, MST's root node
'sig', # bytes
])
def build_repo(did=None, user=None, earliest=None, latest=None):
"""Builds a single user's repo, including commits, records, and MST.
Either did or user must be provided.
Args:
did: str did:web DID
earliest: str, base32-encoded CID
latest: str, base32-encoded CID
Returns:
([dict node, ...], :class:`MST`) tuple. First element is the
chain of repo nodes, latest to earliest.
Raises ValueError if did is not did:web or no user exists with that domain.
"""
assert (did is None) ^ (user is None)
if did:
domain = util.domain_from_link(bluesky.did_web_to_url(did), minimize=False)
if not domain:
raise ValueError(f'No domain found in {did}')
user = User.get_by_id(domain)
if not user:
raise ValueError(f'No user found for domain {domain}')
if earliest:
earliest = CID.decode(earliest)
if latest:
latest = CID.decode(latest)
inside = (earliest is None)
# collect Bluesky records
# maps repo path '[collection]/[rkey]' to app.bsky record dict
records = {}
records['app.bsky.actor.profile/self'] = bluesky.as1_to_profile(user.to_as1())
for obj in Object.query(Object.domains == user.key.id(),
Object.labels == 'user'):
if not obj.bsky:
logging.debug(f'Skipping {obj.key}')
path = f'{obj.bsky["$type"]}/{datetime_to_tid(obj.created)}'
records[path] = obj.bsky
for follower in Follower.query(Follower.status == 'active',
OR(Follower.src == domain,
Follower.dest == domain)):
bsky = bluesky.from_as1(follower.to_as1())
if not bsky:
logging.debug(f'Skipping {follower.key}')
path = f'{bsky["$type"]}/{datetime_to_tid(follower.created)}'
records[path] = bsky
# build MST and commit chain
nodes = []
mst = MST()
commit = None
for path, record in records.items():
# construct the record
logger.debug(f'Generating node for {path} {record}')
nodes.append(record)
cid = dag_cbor_cid(record)
# add to MST if we're inside the query range
if inside:
logger.debug(f'Adding to MST: {path} {cid}')
mst = mst.add(path, cid)
if cid == latest:
# latest is inclusive
inside = False
elif cid == earliest:
# earliest is exclusive
inside = True
# serialize and add all MST nodes
serialized_mst = serialize_node_data(mst.all_nodes())._asdict()
# TODO: subtree and leaf nodes?
nodes.append(serialized_mst)
# create and sign a commit
# NOTE: prev is the CID of the last *signed* commit, including sig field
prev_cid = dag_cbor_cid(commit) if commit else None
commit = {
'version': 2,
# TODO: real DID handling, including did:plc
'did': f'did:web:{domain}',
'prev': prev_cid,
'data': dag_cbor_cid(serialized_mst),
}
sign_commit(commit, ECC.import_key(user.p256_key))
nodes.append(commit)
return nodes, mst
@xrpc_server.method('com.atproto.sync.getBlob')
def get_blob(input, ):
"""
Args:
Returns:
"""
@xrpc_server.method('com.atproto.sync.getBlocks')
def get_blocks(input, did=None, cids=None):
"""Gets blocks from a given repo by their CIDs.
Ignores any unknown CIDs.
Args:
did: str
cids: list of str base32-encoded :class:`CID`s
Returns:
bytes, binary DAG-CBOR, application/vnd.ipld.car
"""
if cids is None:
cids = []
cids = [CID.decode(cid) for cid in cids]
blocks, _ = build_repo(did=did)
return dag_cbor.encoding.encode([blocks[cid] for cid in cids
if cid in blocks])
@xrpc_server.method('com.atproto.sync.getCheckout')
def get_checkout(input, ):
"""
Args:
Returns:
"""
@xrpc_server.method('com.atproto.sync.getCommitPath')
def get_commit_path(input, ):
"""
Args:
Returns:
"""
@xrpc_server.method('com.atproto.sync.getHead')
def get_head(input, did=None):
"""
Args:
did: str
Returns:
str, :class:`CID`
"""
_, mst = build_repo(did=did)
return {'root': mst.get_pointer().encode('base32')}
@xrpc_server.method('com.atproto.sync.getRecord')
def get_record(input, ):
"""
Args:
Returns:
"""
@xrpc_server.method('com.atproto.sync.getRepo')
def get_repo(input, did, earliest=None, latest=None):
"""Gets a repo's current MST.
Args:
did: str
earliest: optional str, :class:`CID`, exclusive
latest: optional str, :class:`CID`, inclusive
Returns:
bytes, binary DAG-CBOR, application/vnd.ipld.car
"""
nodes, mst = build_repo(did=did, earliest=earliest, latest=latest)
return dag_cbor.encoding.encode(nodes)
@xrpc_server.method('com.atproto.sync.listBlobs')
def list_blobs(input, ):
"""
Args:
Returns:
"""
@xrpc_server.method('com.atproto.sync.listRepos')
def list_repos(input, ):
"""List dids and root cids of hosted repos.
Args:
limit: int
cursor: str, not yet supported. TODO
Returns:
list of repos (DID + head CID)
"""
return {
# TODO: cursor
'repos': [{
'did': f'did:web:{user.key.id()}',
'head': build_repo(user=user)[1].get_pointer().encode('base32')
} for user in User.query() if not user.use_instead]
}
@xrpc_server.method('com.atproto.sync.notifyOfUpdate')
def notify_of_update(input, ):
"""
Args:
Returns:
"""
@xrpc_server.method('com.atproto.sync.requestCrawl')
def request_crawl(input, ):
"""
Args:
Returns:
"""
@xrpc_server.method('com.atproto.sync.subscribeRepos')
def subscribe_repos(input, ):
"""
Args:
Returns:
"""

Wyświetl plik

@ -1,288 +0,0 @@
"""Unit tests for atproto.py."""
from base64 import b64encode
import copy
import random
from unittest import skip
from unittest.mock import patch
import atproto
import arroba.util
from arroba.util import dag_cbor_cid, verify_commit_sig
from Crypto.PublicKey import ECC
import dag_cbor.decoding, dag_cbor.encoding
from granary import as2, bluesky
from granary.tests.test_bluesky import (
ACTOR_AS,
ACTOR_PROFILE_BSKY,
POST_AS,
POST_BSKY,
REPLY_AS,
REPLY_BSKY,
REPOST_AS,
REPOST_BSKY,
)
from multiformats import CID
from oauth_dropins.webutil import util
from oauth_dropins.webutil.testutil import NOW
from flask_app import app
from models import Follower, Object, User
from . import testutil
# # arroba.mst.Data entry for MST with POST_AS, REPLY_AS, and REPOST_AS
# POST_CID = 'bafyreic5xwex7jxqvliumozkoli3qy2hzxrmui6odl7ujrcybqaypacfiy'
# REPLY_CID = 'bafyreib55ro37wasiflouvlfenhzllorcthm7flr2nj4fnk7yjo54cagvm'
# REPOST_CID = 'bafyreiek3jnp6e4sussy4c7pwtbkkf3kepekzycylowwuepmnvq7aeng44'
# ACTOR_CID = dag_cbor_cid(ACTOR_PROFILE_BSKY)
# HEAD_CID = 'bafyreiagk7qmor3gckkm6dts7c32frtnyn4reznclojgjraqwoumecenx4'
# HEAD_CID_EMPTY = 'bafyreie5737gdxlw5i64vzichcalba3z2v5n6icifvx5xytvske7mr3hpm'
# REPO_ENTRIES = {
# 'l': CID.decode(HEAD_CID),
# 'e': [{
# 'k': b'app.bsky.feed.feedViewPost/baxkjoxgdgnaqbbi',
# 'v': CID.decode(POST_CID),
# 'p': 0,
# 't': None,
# }, {
# 'k': b'babbi',
# 'v': CID.decode(REPLY_CID),
# 'p': 38,
# 't': None,
# }, {
# 'k': b'qbbi',
# 'v': CID.decode(REPOST_CID),
# 'p': 39,
# 't': None,
# }],
# }
class AtProtoTest(testutil.TestCase):
def setUp(self):
super().setUp()
# used in now(), injected into Object.created so that TIDs are deterministic
self.last_now = NOW.replace(tzinfo=None)
def now(self):
self.last_now = self.last_now.replace(microsecond=self.last_now.microsecond + 1)
return self.last_now
@patch('models.Object.created._now')
def make_objects(self, mock_now):
mock_now.side_effect = self.now
with app.test_request_context('/'):
Object(id='a', domains=['user.com'], labels=['user'], as2=POST_AS).put()
Object(id='b', domains=['user.com'], labels=['user'], our_as1=REPLY_AS).put()
Object(id='c', domains=['user.com'], labels=['user'], our_as1=REPOST_AS).put()
# not outbound from user
Object(id='d', domains=['user.com'], labels=['feed'], our_as1=POST_AS).put()
# other user's
Object(id='f', domains=['bar.org'], labels=['user'], our_as1=POST_AS).put()
@skip
def test_get_blocks_empty(self):
self.make_user('user.com')
resp = self.client.get('/xrpc/com.atproto.sync.getBlocks', query_string={
'did': 'did:web:user.com',
'cids': [],
})
self.assertEqual(200, resp.status_code)
self.assertEqual([], dag_cbor.decoding.decode(resp.get_data()))
@skip
def test_get_blocks(self):
self.make_user('user.com')
self.make_objects()
resp = self.client.get('/xrpc/com.atproto.sync.getBlocks', query_string={
'did': 'did:web:user.com',
'cids': [REPLY_CID, REPOST_CID],
})
self.assertEqual(200, resp.status_code)
self.assertEqual([REPLY_BSKY, REPOST_BSKY],
dag_cbor.decoding.decode(resp.get_data()))
def test_get_blocks_error_not_did_web(self):
resp = self.client.get('/xrpc/com.atproto.sync.getBlocks', query_string={
'did': 'did:plc:foo',
'cids': [],
})
self.assertEqual(400, resp.status_code, resp.get_data(as_text=True))
def test_get_blocks_error_no_domain_in_did(self):
resp = self.client.get('/xrpc/com.atproto.sync.getBlocks', query_string={
'did': 'did:web:',
'cids': [],
})
self.assertEqual(400, resp.status_code, resp.get_data(as_text=True))
def test_get_blocks_error_no_user(self):
resp = self.client.get('/xrpc/com.atproto.sync.getBlocks', query_string={
'did': 'did:web:nope.com',
'cids': [],
})
self.assertEqual(400, resp.status_code, resp.get_data(as_text=True))
# def test_get_checkout(self):
# def test_get_commit_path(self):
@skip
def test_get_head_empty(self):
self.make_user('user.com')
resp = self.client.get('/xrpc/com.atproto.sync.getHead', query_string={
'did': 'did:web:user.com',
})
self.assertEqual(200, resp.status_code)
self.assertEqual({'root': HEAD_CID_EMPTY}, resp.json)
@skip
def test_get_head(self):
self.make_user('user.com')
self.make_objects()
resp = self.client.get('/xrpc/com.atproto.sync.getHead', query_string={
'did': 'did:web:user.com',
})
self.assertEqual(200, resp.status_code)
self.assertEqual({'root': HEAD_CID}, resp.json)
# alone: bafyreidk5xw2dqskokvhioznjhnha5am4nstrrmd2in7w7bmbuzpwnxlhq
# with test_get_repo: bafyreif754hxy2df3hkhzqbwccvd53imxex35zrx4gou3dvjql6mavxo6a
# def test_get_record(self):
@skip
def test_get_repo_empty_user(self):
self.make_user('user.com')
resp = self.client.get('/xrpc/com.atproto.sync.getRepo',
query_string={'did': 'did:web:user.com'})
self.assertEqual(200, resp.status_code)
def test_get_repo_profile(self):
user = self.make_user('user.com', actor_as2=as2.from_as1(ACTOR_AS))
mst = {
'e': [{
'k': b'app.bsky.actor.profile/self',
'v': dag_cbor_cid(ACTOR_PROFILE_BSKY),
'p': 0,
't': None,
}],
'l': CID.decode('bafyreiceqwd3lxb3u5pqe5qjp6v7cq7jxinbspknekibbpm4bfv54fedoe'),
}
commit = {
'version': 2,
'did': 'did:web:user.com',
'data': dag_cbor_cid(mst),
'prev': None,
}
resp = self.client.get('/xrpc/com.atproto.sync.getRepo',
query_string={'did': 'did:web:user.com'})
self.assertEqual(200, resp.status_code)
got = dag_cbor.decoding.decode(resp.get_data())
# extract and verify signature
commit = got[2]
assert verify_commit_sig(commit, ECC.import_key(user.p256_key))
del commit['sig']
self.assertEqual([
ACTOR_PROFILE_BSKY,
mst,
commit,
], got)
@skip
def test_get_repo(self):
self.make_user('user.com')
self.make_objects()
resp = self.client.get('/xrpc/com.atproto.sync.getRepo',
query_string={'did': 'did:web:user.com'})
self.assertEqual(200, resp.status_code)
decoded = dag_cbor.decoding.decode(resp.get_data())
self.assertEqual(REPO_ENTRIES, decoded)
@skip
def test_get_repo_latest_earliest(self):
self.make_user('user.com')
self.make_objects()
resp = self.client.get('/xrpc/com.atproto.sync.getRepo', query_string={
'did': 'did:web:user.com',
'latest': REPO_ENTRIES['e'][1]['v'].encode('base32'),
'earliest': REPO_ENTRIES['e'][0]['v'].encode('base32'),
})
self.assertEqual(200, resp.status_code)
decoded = dag_cbor.decoding.decode(resp.get_data())
self.assertEqual({
'l': CID.decode(
'bafyreieohwgp723mmvfsrg3mxle3azuf2u5ly6h3azlslubalqh5thwrxq'),
'e': [{
'k': b'app.bsky.feed.feedViewPost/baxkjoxgdgnbabce',
'v': CID.decode(
'bafyreib55ro37wasiflouvlfenhzllorcthm7flr2nj4fnk7yjo54cagvm'),
'p': 0,
't': None,
}],
}, decoded)
def test_get_repo_error_not_did_web(self):
resp = self.client.get('/xrpc/com.atproto.sync.getRepo',
query_string={'did': 'did:plc:foo'})
self.assertEqual(400, resp.status_code, resp.get_data(as_text=True))
def test_get_repo_error_no_domain_in_did(self):
resp = self.client.get('/xrpc/com.atproto.sync.getRepo',
query_string={'did': 'did:web:'})
self.assertEqual(400, resp.status_code, resp.get_data(as_text=True))
def test_get_repo_error_no_user(self):
resp = self.client.get('/xrpc/com.atproto.sync.getRepo',
query_string={'did': 'did:web:nope.com'})
self.assertEqual(400, resp.status_code, resp.get_data(as_text=True))
# def test_list_blobs(self):
def test_list_repos_empty(self):
resp = self.client.get('/xrpc/com.atproto.sync.listRepos')
self.assertEqual(200, resp.status_code, resp.get_data(as_text=True))
self.assertEqual({'repos': []}, resp.json)
@skip
def test_list_repos(self):
self.make_user('user.com')
self.make_objects()
self.make_user('other.com')
resp = self.client.get('/xrpc/com.atproto.sync.listRepos')
self.assertEqual(200, resp.status_code, resp.get_data(as_text=True))
self.assertEqual({'repos': [{
'did': 'did:web:other.com',
'head': HEAD_CID_EMPTY,
}, {
'did': 'did:web:user.com',
'head': HEAD_CID,
}]}, resp.json)
# # /Users/ryan/src/atproto/packages/pds/tests/sync/list.test.ts
# def test_notify_of_update(self):
# def test_request_crawl(self):
# def test_subscribe_repos(self):