diff --git a/.circleci/config.yml b/.circleci/config.yml index 74e2381..c6c0ed4 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -23,7 +23,6 @@ jobs: curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key --keyring /usr/share/keyrings/cloud.google.gpg add - sudo apt-get update sudo apt-get install -y apt-transport-https ca-certificates gnupg google-cloud-sdk google-cloud-sdk-datastore-emulator default-jre - git clone --depth=1 https://github.com/bluesky-social/atproto.git ../atproto - run: name: Python dependencies diff --git a/app.py b/app.py index 2c5f825..43d2a41 100644 --- a/app.py +++ b/app.py @@ -6,4 +6,4 @@ registered. from flask_app import app # import all modules to register their Flask handlers -import activitypub, atproto, follow, pages, redirect, render, superfeedr, webfinger, webmention, xrpc_actor, xrpc_feed, xrpc_graph +import activitypub, follow, pages, redirect, render, superfeedr, webfinger, webmention, xrpc_actor, xrpc_feed, xrpc_graph diff --git a/atproto.py b/atproto.py deleted file mode 100644 index 8d3746a..0000000 --- a/atproto.py +++ /dev/null @@ -1,299 +0,0 @@ -"""com.atproto.sync XRPC methods.""" -from collections import namedtuple -import json -import logging -import random -import re - -from arroba.mst import MST, serialize_node_data -from arroba.util import dag_cbor_cid, sign_commit -from Crypto.PublicKey import ECC -import dag_cbor.encoding -from flask import g -from google.cloud.ndb.query import OR -from granary import bluesky -from multiformats import CID, multibase, multicodec, multihash -from oauth_dropins.webutil import util - -from flask_app import xrpc_server -from models import Follower, Object, PAGE_SIZE, User - -logger = logging.getLogger(__name__) - -# https://atproto.com/specs/atp#repo-data-layout -# TODO: remove? unused? -Commit = namedtuple('Commit', [ - 'did', # CID - 'version', # int, always 2? - 'prev', # CID, previous (root?) commit - 'data', # CID, MST's root node - 'sig', # bytes -]) - - -def build_repo(did=None, user=None, earliest=None, latest=None): - """Builds a single user's repo, including commits, records, and MST. - - Either did or user must be provided. - - Args: - did: str did:web DID - earliest: str, base32-encoded CID - latest: str, base32-encoded CID - - Returns: - ([dict node, ...], :class:`MST`) tuple. First element is the - chain of repo nodes, latest to earliest. - - Raises ValueError if did is not did:web or no user exists with that domain. - """ - assert (did is None) ^ (user is None) - - if did: - domain = util.domain_from_link(bluesky.did_web_to_url(did), minimize=False) - if not domain: - raise ValueError(f'No domain found in {did}') - - user = User.get_by_id(domain) - if not user: - raise ValueError(f'No user found for domain {domain}') - - if earliest: - earliest = CID.decode(earliest) - if latest: - latest = CID.decode(latest) - - inside = (earliest is None) - - # collect Bluesky records - # maps repo path '[collection]/[rkey]' to app.bsky record dict - records = {} - - records['app.bsky.actor.profile/self'] = bluesky.as1_to_profile(user.to_as1()) - - for obj in Object.query(Object.domains == user.key.id(), - Object.labels == 'user'): - if not obj.bsky: - logging.debug(f'Skipping {obj.key}') - path = f'{obj.bsky["$type"]}/{datetime_to_tid(obj.created)}' - records[path] = obj.bsky - - for follower in Follower.query(Follower.status == 'active', - OR(Follower.src == domain, - Follower.dest == domain)): - bsky = bluesky.from_as1(follower.to_as1()) - if not bsky: - logging.debug(f'Skipping {follower.key}') - path = f'{bsky["$type"]}/{datetime_to_tid(follower.created)}' - records[path] = bsky - - # build MST and commit chain - nodes = [] - mst = MST() - commit = None - for path, record in records.items(): - # construct the record - logger.debug(f'Generating node for {path} {record}') - nodes.append(record) - cid = dag_cbor_cid(record) - - # add to MST if we're inside the query range - if inside: - logger.debug(f'Adding to MST: {path} {cid}') - mst = mst.add(path, cid) - if cid == latest: - # latest is inclusive - inside = False - elif cid == earliest: - # earliest is exclusive - inside = True - - # serialize and add all MST nodes - serialized_mst = serialize_node_data(mst.all_nodes())._asdict() - # TODO: subtree and leaf nodes? - nodes.append(serialized_mst) - - # create and sign a commit - # NOTE: prev is the CID of the last *signed* commit, including sig field - prev_cid = dag_cbor_cid(commit) if commit else None - commit = { - 'version': 2, - # TODO: real DID handling, including did:plc - 'did': f'did:web:{domain}', - 'prev': prev_cid, - 'data': dag_cbor_cid(serialized_mst), - } - - sign_commit(commit, ECC.import_key(user.p256_key)) - nodes.append(commit) - - return nodes, mst - - -@xrpc_server.method('com.atproto.sync.getBlob') -def get_blob(input, ): - """ - - Args: - - - Returns: - - """ - - -@xrpc_server.method('com.atproto.sync.getBlocks') -def get_blocks(input, did=None, cids=None): - """Gets blocks from a given repo by their CIDs. - - Ignores any unknown CIDs. - - Args: - did: str - cids: list of str base32-encoded :class:`CID`s - - Returns: - bytes, binary DAG-CBOR, application/vnd.ipld.car - """ - if cids is None: - cids = [] - - cids = [CID.decode(cid) for cid in cids] - blocks, _ = build_repo(did=did) - - return dag_cbor.encoding.encode([blocks[cid] for cid in cids - if cid in blocks]) - - -@xrpc_server.method('com.atproto.sync.getCheckout') -def get_checkout(input, ): - """ - - Args: - - - Returns: - - """ - - -@xrpc_server.method('com.atproto.sync.getCommitPath') -def get_commit_path(input, ): - """ - - Args: - - - Returns: - - """ - - -@xrpc_server.method('com.atproto.sync.getHead') -def get_head(input, did=None): - """ - - Args: - did: str - - Returns: - str, :class:`CID` - """ - _, mst = build_repo(did=did) - return {'root': mst.get_pointer().encode('base32')} - - -@xrpc_server.method('com.atproto.sync.getRecord') -def get_record(input, ): - """ - - Args: - - - Returns: - - """ - - -@xrpc_server.method('com.atproto.sync.getRepo') -def get_repo(input, did, earliest=None, latest=None): - """Gets a repo's current MST. - - Args: - did: str - earliest: optional str, :class:`CID`, exclusive - latest: optional str, :class:`CID`, inclusive - - Returns: - bytes, binary DAG-CBOR, application/vnd.ipld.car - """ - nodes, mst = build_repo(did=did, earliest=earliest, latest=latest) - return dag_cbor.encoding.encode(nodes) - - -@xrpc_server.method('com.atproto.sync.listBlobs') -def list_blobs(input, ): - """ - - Args: - - - Returns: - - """ - - -@xrpc_server.method('com.atproto.sync.listRepos') -def list_repos(input, ): - """List dids and root cids of hosted repos. - - Args: - limit: int - cursor: str, not yet supported. TODO - - Returns: - list of repos (DID + head CID) - """ - return { - # TODO: cursor - 'repos': [{ - 'did': f'did:web:{user.key.id()}', - 'head': build_repo(user=user)[1].get_pointer().encode('base32') - } for user in User.query() if not user.use_instead] - } - - -@xrpc_server.method('com.atproto.sync.notifyOfUpdate') -def notify_of_update(input, ): - """ - - Args: - - - Returns: - - """ - - -@xrpc_server.method('com.atproto.sync.requestCrawl') -def request_crawl(input, ): - """ - - Args: - - - Returns: - - """ - - -@xrpc_server.method('com.atproto.sync.subscribeRepos') -def subscribe_repos(input, ): - """ - - Args: - - - Returns: - - """ diff --git a/tests/test_atproto.py b/tests/test_atproto.py deleted file mode 100644 index 599c490..0000000 --- a/tests/test_atproto.py +++ /dev/null @@ -1,288 +0,0 @@ -"""Unit tests for atproto.py.""" -from base64 import b64encode -import copy -import random -from unittest import skip -from unittest.mock import patch - -import atproto -import arroba.util -from arroba.util import dag_cbor_cid, verify_commit_sig -from Crypto.PublicKey import ECC -import dag_cbor.decoding, dag_cbor.encoding -from granary import as2, bluesky -from granary.tests.test_bluesky import ( - ACTOR_AS, - ACTOR_PROFILE_BSKY, - POST_AS, - POST_BSKY, - REPLY_AS, - REPLY_BSKY, - REPOST_AS, - REPOST_BSKY, -) -from multiformats import CID -from oauth_dropins.webutil import util -from oauth_dropins.webutil.testutil import NOW - -from flask_app import app -from models import Follower, Object, User -from . import testutil - -# # arroba.mst.Data entry for MST with POST_AS, REPLY_AS, and REPOST_AS -# POST_CID = 'bafyreic5xwex7jxqvliumozkoli3qy2hzxrmui6odl7ujrcybqaypacfiy' -# REPLY_CID = 'bafyreib55ro37wasiflouvlfenhzllorcthm7flr2nj4fnk7yjo54cagvm' -# REPOST_CID = 'bafyreiek3jnp6e4sussy4c7pwtbkkf3kepekzycylowwuepmnvq7aeng44' -# ACTOR_CID = dag_cbor_cid(ACTOR_PROFILE_BSKY) -# HEAD_CID = 'bafyreiagk7qmor3gckkm6dts7c32frtnyn4reznclojgjraqwoumecenx4' -# HEAD_CID_EMPTY = 'bafyreie5737gdxlw5i64vzichcalba3z2v5n6icifvx5xytvske7mr3hpm' -# REPO_ENTRIES = { -# 'l': CID.decode(HEAD_CID), -# 'e': [{ -# 'k': b'app.bsky.feed.feedViewPost/baxkjoxgdgnaqbbi', -# 'v': CID.decode(POST_CID), -# 'p': 0, -# 't': None, -# }, { -# 'k': b'babbi', -# 'v': CID.decode(REPLY_CID), -# 'p': 38, -# 't': None, -# }, { -# 'k': b'qbbi', -# 'v': CID.decode(REPOST_CID), -# 'p': 39, -# 't': None, -# }], -# } - - -class AtProtoTest(testutil.TestCase): - - def setUp(self): - super().setUp() - - # used in now(), injected into Object.created so that TIDs are deterministic - self.last_now = NOW.replace(tzinfo=None) - - def now(self): - self.last_now = self.last_now.replace(microsecond=self.last_now.microsecond + 1) - return self.last_now - - @patch('models.Object.created._now') - def make_objects(self, mock_now): - mock_now.side_effect = self.now - - with app.test_request_context('/'): - Object(id='a', domains=['user.com'], labels=['user'], as2=POST_AS).put() - Object(id='b', domains=['user.com'], labels=['user'], our_as1=REPLY_AS).put() - Object(id='c', domains=['user.com'], labels=['user'], our_as1=REPOST_AS).put() - # not outbound from user - Object(id='d', domains=['user.com'], labels=['feed'], our_as1=POST_AS).put() - # other user's - Object(id='f', domains=['bar.org'], labels=['user'], our_as1=POST_AS).put() - - @skip - def test_get_blocks_empty(self): - self.make_user('user.com') - - resp = self.client.get('/xrpc/com.atproto.sync.getBlocks', query_string={ - 'did': 'did:web:user.com', - 'cids': [], - }) - self.assertEqual(200, resp.status_code) - self.assertEqual([], dag_cbor.decoding.decode(resp.get_data())) - - @skip - def test_get_blocks(self): - self.make_user('user.com') - self.make_objects() - - resp = self.client.get('/xrpc/com.atproto.sync.getBlocks', query_string={ - 'did': 'did:web:user.com', - 'cids': [REPLY_CID, REPOST_CID], - }) - self.assertEqual(200, resp.status_code) - self.assertEqual([REPLY_BSKY, REPOST_BSKY], - dag_cbor.decoding.decode(resp.get_data())) - - def test_get_blocks_error_not_did_web(self): - resp = self.client.get('/xrpc/com.atproto.sync.getBlocks', query_string={ - 'did': 'did:plc:foo', - 'cids': [], - }) - self.assertEqual(400, resp.status_code, resp.get_data(as_text=True)) - - def test_get_blocks_error_no_domain_in_did(self): - resp = self.client.get('/xrpc/com.atproto.sync.getBlocks', query_string={ - 'did': 'did:web:', - 'cids': [], - }) - self.assertEqual(400, resp.status_code, resp.get_data(as_text=True)) - - def test_get_blocks_error_no_user(self): - resp = self.client.get('/xrpc/com.atproto.sync.getBlocks', query_string={ - 'did': 'did:web:nope.com', - 'cids': [], - }) - self.assertEqual(400, resp.status_code, resp.get_data(as_text=True)) - - # def test_get_checkout(self): - - - # def test_get_commit_path(self): - - - @skip - def test_get_head_empty(self): - self.make_user('user.com') - - resp = self.client.get('/xrpc/com.atproto.sync.getHead', query_string={ - 'did': 'did:web:user.com', - }) - self.assertEqual(200, resp.status_code) - self.assertEqual({'root': HEAD_CID_EMPTY}, resp.json) - - @skip - def test_get_head(self): - self.make_user('user.com') - self.make_objects() - - resp = self.client.get('/xrpc/com.atproto.sync.getHead', query_string={ - 'did': 'did:web:user.com', - }) - self.assertEqual(200, resp.status_code) - self.assertEqual({'root': HEAD_CID}, resp.json) - - # alone: bafyreidk5xw2dqskokvhioznjhnha5am4nstrrmd2in7w7bmbuzpwnxlhq - # with test_get_repo: bafyreif754hxy2df3hkhzqbwccvd53imxex35zrx4gou3dvjql6mavxo6a - - # def test_get_record(self): - - @skip - def test_get_repo_empty_user(self): - self.make_user('user.com') - - resp = self.client.get('/xrpc/com.atproto.sync.getRepo', - query_string={'did': 'did:web:user.com'}) - self.assertEqual(200, resp.status_code) - - def test_get_repo_profile(self): - user = self.make_user('user.com', actor_as2=as2.from_as1(ACTOR_AS)) - - mst = { - 'e': [{ - 'k': b'app.bsky.actor.profile/self', - 'v': dag_cbor_cid(ACTOR_PROFILE_BSKY), - 'p': 0, - 't': None, - }], - 'l': CID.decode('bafyreiceqwd3lxb3u5pqe5qjp6v7cq7jxinbspknekibbpm4bfv54fedoe'), - } - commit = { - 'version': 2, - 'did': 'did:web:user.com', - 'data': dag_cbor_cid(mst), - 'prev': None, - } - - resp = self.client.get('/xrpc/com.atproto.sync.getRepo', - query_string={'did': 'did:web:user.com'}) - self.assertEqual(200, resp.status_code) - got = dag_cbor.decoding.decode(resp.get_data()) - - # extract and verify signature - commit = got[2] - assert verify_commit_sig(commit, ECC.import_key(user.p256_key)) - del commit['sig'] - - self.assertEqual([ - ACTOR_PROFILE_BSKY, - mst, - commit, - ], got) - - @skip - def test_get_repo(self): - self.make_user('user.com') - self.make_objects() - - resp = self.client.get('/xrpc/com.atproto.sync.getRepo', - query_string={'did': 'did:web:user.com'}) - self.assertEqual(200, resp.status_code) - - decoded = dag_cbor.decoding.decode(resp.get_data()) - self.assertEqual(REPO_ENTRIES, decoded) - - @skip - def test_get_repo_latest_earliest(self): - self.make_user('user.com') - self.make_objects() - - resp = self.client.get('/xrpc/com.atproto.sync.getRepo', query_string={ - 'did': 'did:web:user.com', - 'latest': REPO_ENTRIES['e'][1]['v'].encode('base32'), - 'earliest': REPO_ENTRIES['e'][0]['v'].encode('base32'), - }) - self.assertEqual(200, resp.status_code) - - decoded = dag_cbor.decoding.decode(resp.get_data()) - self.assertEqual({ - 'l': CID.decode( - 'bafyreieohwgp723mmvfsrg3mxle3azuf2u5ly6h3azlslubalqh5thwrxq'), - 'e': [{ - 'k': b'app.bsky.feed.feedViewPost/baxkjoxgdgnbabce', - 'v': CID.decode( - 'bafyreib55ro37wasiflouvlfenhzllorcthm7flr2nj4fnk7yjo54cagvm'), - 'p': 0, - 't': None, - }], - }, decoded) - - def test_get_repo_error_not_did_web(self): - resp = self.client.get('/xrpc/com.atproto.sync.getRepo', - query_string={'did': 'did:plc:foo'}) - self.assertEqual(400, resp.status_code, resp.get_data(as_text=True)) - - def test_get_repo_error_no_domain_in_did(self): - resp = self.client.get('/xrpc/com.atproto.sync.getRepo', - query_string={'did': 'did:web:'}) - self.assertEqual(400, resp.status_code, resp.get_data(as_text=True)) - - def test_get_repo_error_no_user(self): - resp = self.client.get('/xrpc/com.atproto.sync.getRepo', - query_string={'did': 'did:web:nope.com'}) - self.assertEqual(400, resp.status_code, resp.get_data(as_text=True)) - - # def test_list_blobs(self): - - def test_list_repos_empty(self): - resp = self.client.get('/xrpc/com.atproto.sync.listRepos') - self.assertEqual(200, resp.status_code, resp.get_data(as_text=True)) - self.assertEqual({'repos': []}, resp.json) - - @skip - def test_list_repos(self): - self.make_user('user.com') - self.make_objects() - self.make_user('other.com') - - resp = self.client.get('/xrpc/com.atproto.sync.listRepos') - self.assertEqual(200, resp.status_code, resp.get_data(as_text=True)) - self.assertEqual({'repos': [{ - 'did': 'did:web:other.com', - 'head': HEAD_CID_EMPTY, - }, { - 'did': 'did:web:user.com', - 'head': HEAD_CID, - }]}, resp.json) - - # # /Users/ryan/src/atproto/packages/pds/tests/sync/list.test.ts - - # def test_notify_of_update(self): - - - # def test_request_crawl(self): - - - # def test_subscribe_repos(self):