Bluesky: implement com.atproto.sync.getRepo

first com.atproto.sync XRPC! 🎉
pull/494/head
Ryan Barrett 2023-04-20 15:04:00 -07:00
rodzic a051248ff6
commit 3493aafca8
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 6BE31FDF4776E9D4
7 zmienionych plików z 383 dodań i 42 usunięć

2
app.py
Wyświetl plik

@ -6,4 +6,4 @@ registered.
from flask_app import app
# import all modules to register their Flask handlers
import activitypub, follow, pages, redirect, render, superfeedr, webfinger, webmention, xrpc_actor, xrpc_feed, xrpc_graph
import activitypub, atproto, follow, pages, redirect, render, superfeedr, webfinger, webmention, xrpc_actor, xrpc_feed, xrpc_graph

246
atproto.py 100644
Wyświetl plik

@ -0,0 +1,246 @@
"""com.atproto.sync XRPC methods."""
import json
import logging
import random
import re
import dag_cbor.encoding
from flask import g
from granary import bluesky
from multiformats import CID, multibase, multicodec, multihash
from oauth_dropins.webutil import util
from atproto_mst import MST, serialize_node_data
from flask_app import xrpc_server
from models import Object, PAGE_SIZE, User
logger = logging.getLogger(__name__)
# the bottom 32 clock ids can be randomized & are not guaranteed to be collision
# resistant. we use the same clockid for all TIDs coming from this runtime.
_clockid = random.randint(0, 31)
# _tid_last = time.time_ns() // 1000 # microseconds
# def next_tid():
# global _tid_last
# # enforce that we're at least 1us after the last TID to prevent TIDs moving
# # backwards if system clock drifts backwards
# now = time.time_ns() // 1000
# if now > _tid_last:
# _tid_last = now
# else:
# _tid_last += 1
# now = _tid_last
def datetime_to_tid(dt):
"""Converts a datetime to an ATProto TID.
https://atproto.com/guides/data-repos#identifier-types
Args:
dt: :class:`datetime.datetime`
Returns:
str, base32-encoded TID
"""
base32 = multibase.get('base32')
def base32_int_bytes(val):
return base32.encode(val.to_bytes((val.bit_length() + 7) // 8, byteorder='big'))
# util.d(base32_int_bytes(int(dt.timestamp() * 1000)),
# base32_int_bytes(_clockid).ljust(2, '2'))
tid = (base32_int_bytes(int(dt.timestamp() * 1000 * 1000)) +
base32_int_bytes(_clockid).ljust(2, '2'))
# TODO
# assert len(tid) == 13, tid
return tid
# return f'{encoded[:4]}-{encoded[4:7]}-{encoded[7:11]}-{encoded[11:]}'
@xrpc_server.method('com.atproto.sync.getBlob')
def get_blob(input, ):
"""
Args:
Returns:
"""
@xrpc_server.method('com.atproto.sync.getBlocks')
def get_blocks(input, ):
"""
Args:
Returns:
"""
@xrpc_server.method('com.atproto.sync.getCheckout')
def get_checkout(input, ):
"""
Args:
Returns:
"""
@xrpc_server.method('com.atproto.sync.getCommitPath')
def get_commit_path(input, ):
"""
Args:
Returns:
"""
@xrpc_server.method('com.atproto.sync.getHead')
def get_head(input, ):
"""
Args:
Returns:
"""
@xrpc_server.method('com.atproto.sync.getRecord')
def get_record(input, ):
"""
Args:
Returns:
"""
@xrpc_server.method('com.atproto.sync.getRepo')
def get_repo(input, did, earliest=None, latest=None):
"""
Args:
did: str
earliest: optional str, :class:`CID`, exclusive
latest: optional str, :class:`CID`, inclusive
Returns:
bytes, binary DAG-CBOR, application/vnd.ipld.car
"""
domain = util.domain_from_link(bluesky.did_web_to_url(did), minimize=False)
if not domain:
raise ValueError(f'No domain found in {did}')
g.user = User.get_by_id(domain)
if not g.user:
raise ValueError(f'No user found for domain {domain}')
blocks = {} # maps CID to DAG-CBOR block bytes
mst = MST()
for obj in Object.query(Object.domains == domain, Object.labels == 'user'):
if not obj.as1:
continue
logger.debug(f'Generating block for {obj.as1}')
bs = bluesky.from_as1(obj.as1)
cbor_bytes = dag_cbor.encoding.encode(bs)
digest = multihash.digest(cbor_bytes, 'sha2-256')
cid = CID('base58btc', 1, multicodec.get('dag-cbor'), digest)
blocks[cid] = cbor_bytes
tid = datetime_to_tid(obj.created)
rkey = f'{bs["$type"]}/{tid}'
logger.debug(f'Adding to MST: {rkey} {cid}')
mst = mst.add(rkey, cid)
return dag_cbor.encoding.encode(serialize_node_data(mst.all_nodes())._asdict())
@xrpc_server.method('com.atproto.sync.listBlobs')
def list_blobs(input, ):
"""
Args:
Returns:
"""
@xrpc_server.method('com.atproto.sync.listRepos')
def list_repos(input, ):
"""
Args:
Returns:
"""
# {
# "cursor" : "1677477924875::did:plc:ipj7dlzp3tjj2sypftxoalht",
# "repos" : [
# {
# "did" : "did:plc:ragtjsm2j2vknwkz3zp4oxrd",
# "head" : "bafyreiegluhfow7tb4jjlxwfjwtzeu6ho53uwrwcxrspt5ejt44a6ydqcy"
# },
# ...
# }
@xrpc_server.method('com.atproto.sync.notifyOfUpdate')
def notify_of_update(input, ):
"""
Args:
Returns:
"""
@xrpc_server.method('com.atproto.sync.requestCrawl')
def request_crawl(input, ):
"""
Args:
Returns:
"""
@xrpc_server.method('com.atproto.sync.subscribeRepos')
def subscribe_repos(input, ):
"""
Args:
Returns:
"""

Wyświetl plik

@ -854,10 +854,9 @@ def layer_for_entries(entries):
Returns:
number | None
"""
# first_leaf = entries.find((entry) => isinstance(entry, Leaf))
# if not first_leaf or isinstance(first_leaf, MST):
# return None
# return leading_zeros_on_hash(first_leaf.key)
for entry in entries:
if isinstance(entry, Leaf):
return leading_zeros_on_hash(first_leaf.key)
# def deserialize_node_data = (

Wyświetl plik

@ -50,7 +50,7 @@ util.set_user_agent('Bridgy Fed (https://fed.brid.gy/)')
# XRPC server
lexicons = []
for filename in (app_dir / 'lexicons/app/bsky').glob('**/*.json'):
for filename in (app_dir / 'lexicons').glob('**/*.json'):
with open(filename) as f:
lexicons.append(json.load(f))

Wyświetl plik

@ -0,0 +1,119 @@
"""Unit tests for atproto.py."""
from unittest.mock import patch
import dag_cbor.decoding
from granary import as2, bluesky
from granary.tests.test_bluesky import (
POST_BSKY,
POST_AS,
REPLY_BSKY,
REPLY_AS,
REPOST_BSKY,
REPOST_AS,
)
from multiformats import CID, multibase
from oauth_dropins.webutil import util
from oauth_dropins.webutil.testutil import NOW
from flask_app import app
from models import Object, User
from . import testutil
last_now = NOW.replace(tzinfo=None)
def next_now():
global last_now
last_now = last_now.replace(microsecond=last_now.microsecond + 1)
return last_now
class AtProtoTest(testutil.TestCase):
# def test_get_blob(input, ):
# def test_get_blocks(self):
# def test_get_checkout(self):
# def test_get_commit_path(self):
# def test_get_head(self):
# def test_get_record(self):
@patch('models.Object.created._now', side_effect=next_now)
def test_get_repo(self, _):
self.make_user('user.com')
with app.test_request_context('/'):
Object(id='a', domains=['user.com'], labels=['user'], as2=POST_AS).put()
Object(id='b', domains=['user.com'], labels=['user'], our_as1=REPLY_AS).put()
Object(id='c', domains=['user.com'], labels=['user'], our_as1=REPOST_AS).put()
# not outbound from user
Object(id='d', domains=['user.com'], labels=['feed'], our_as1=POST_AS).put()
# other user's
Object(id='f', domains=['bar.org'], labels=['user'], our_as1=POST_AS).put()
resp = self.client.get('/xrpc/com.atproto.sync.getRepo',
query_string={'did': 'did:web:user.com'})
self.assertEqual(200, resp.status_code)
decoded = dag_cbor.decoding.decode(resp.get_data())
self.assertEqual({
'l': CID.decode(multibase.decode(
'bafyreie5737gdxlw5i64vzichcalba3z2v5n6icifvx5xytvske7mr3hpm')),
'e': [{
'k': b'app.bsky.feed.feedViewPost/baxkjoxgdgnaqbbi',
'v': CID.decode(multibase.decode(
'bafyreic5xwex7jxqvliumozkoli3qy2hzxrmui6odl7ujrcybqaypacfiy')),
'p': 0,
't': None,
}, {
'k': b'babbi',
'v': CID.decode(multibase.decode(
'bafyreib55ro37wasiflouvlfenhzllorcthm7flr2nj4fnk7yjo54cagvm')),
'p': 38,
't': None,
}, {
'k': b'qbbi',
'v': CID.decode(multibase.decode(
'bafyreiek3jnp6e4sussy4c7pwtbkkf3kepekzycylowwuepmnvq7aeng44')),
'p': 39,
't': None,
}],
}, decoded)
def test_get_repo_error_not_did_web(self):
resp = self.client.get('/xrpc/com.atproto.sync.getRepo',
query_string={'did': 'did:plc:foo'})
self.assertEqual(400, resp.status_code, resp.get_data(as_text=True))
def test_get_repo_error_no_domain_in_did(self):
resp = self.client.get('/xrpc/com.atproto.sync.getRepo',
query_string={'did': 'did:web:'})
self.assertEqual(400, resp.status_code, resp.get_data(as_text=True))
def test_get_repo_error_no_user(self):
resp = self.client.get('/xrpc/com.atproto.sync.getRepo',
query_string={'did': 'did:web:nope.com'})
self.assertEqual(400, resp.status_code, resp.get_data(as_text=True))
# def test_list_blobs(self):
# def test_list_repos(self):
# # /Users/ryan/src/atproto/packages/pds/tests/sync/list.test.ts
# def test_notify_of_update(self):
# def test_request_crawl(self):
# def test_subscribe_repos(self):

Wyświetl plik

@ -16,50 +16,21 @@ from unittest import skip
import dag_cbor.random
from multiformats import CID, multibase
from atproto import datetime_to_tid
from atproto_mst import common_prefix_len, ensure_valid_key, MST
from . import testutil
# make random test data deterministic
random.seed(1234567890)
dag_cbor.random.set_options(seed=1234567890)
CID1 = CID.decode(multibase.decode(
'bafyreie5cvv4h45feadgeuwhbcutmh6t2ceseocckahdoe6uat64zmz454'))
# _tid_last = time.time_ns() // 1000 # microseconds
_tid_clockid = random.randint(0, 31)
# def next_tid():
# global _tid_last
# # enforce that we're at least 1us after the last TID to prevent TIDs moving
# # backwards if system clock drifts backwards
# now = time.time_ns() // 1000
# if now > _tid_last:
# _tid_last = now
# else:
# _tid_last += 1
# now = _tid_last
def next_tid():
ms = random.randint(datetime(2020, 1, 1).timestamp() * 1000,
datetime(2024, 1, 1).timestamp() * 1000)
# the bottom 32 clock ids can be randomized & are not guaranteed to be
# collision resistant. we use the same clockid for all TIDs coming from this
# machine
base32 = multibase.get('base32')
def base32_int_bytes(val):
return base32.encode(val.to_bytes((val.bit_length() + 7) // 8, byteorder='big'))
encoded = base32_int_bytes(ms) + base32_int_bytes(_tid_clockid).ljust(2, '2')
return f'{encoded[:4]}-{encoded[4:7]}-{encoded[7:11]}-{encoded[11:]}'
def make_data(num):
return [(f'com.example.record/{next_tid()}', cid)
def tid():
ms = random.randint(datetime(2020, 1, 1).timestamp() * 1000,
datetime(2024, 1, 1).timestamp() * 1000)
return datetime_to_tid(datetime.fromtimestamp(float(ms) / 1000))
return [(f'com.example.record/{tid()}', cid)
for cid in dag_cbor.random.rand_cid(num)]

Wyświetl plik

@ -1,9 +1,11 @@
"""Common test utility code."""
import copy
import datetime
import random
import unittest
from unittest.mock import ANY, call
import dag_cbor.random
from flask import g
from google.cloud import ndb
from granary import as2
@ -18,6 +20,10 @@ from oauth_dropins.webutil.appengine_config import ndb_client
from oauth_dropins.webutil.testutil import requests_response
import requests
# make random test data deterministic
random.seed(1234567890)
dag_cbor.random.set_options(seed=1234567890)
# load all Flask handlers
import app
from flask_app import app, cache