2023-08-24 03:34:32 +00:00
|
|
|
"""ATProto protocol implementation.
|
|
|
|
|
|
|
|
https://atproto.com/
|
|
|
|
"""
|
2023-09-14 16:42:11 +00:00
|
|
|
import itertools
|
2023-08-24 03:34:32 +00:00
|
|
|
import logging
|
2023-09-14 16:42:11 +00:00
|
|
|
import os
|
2023-08-24 03:44:42 +00:00
|
|
|
import re
|
2023-08-24 03:34:32 +00:00
|
|
|
|
2023-08-31 03:59:37 +00:00
|
|
|
from arroba import did
|
2023-09-14 16:42:11 +00:00
|
|
|
from arroba.datastore_storage import AtpRepo, DatastoreStorage
|
2023-09-01 19:07:21 +00:00
|
|
|
from arroba.repo import Repo, Write
|
2023-09-06 03:10:11 +00:00
|
|
|
from arroba.storage import Action, CommitData
|
2023-09-18 22:04:00 +00:00
|
|
|
from arroba.util import next_tid, parse_at_uri, service_jwt
|
2023-08-24 03:34:32 +00:00
|
|
|
from flask import abort, g, request
|
|
|
|
from google.cloud import ndb
|
2023-08-29 19:35:20 +00:00
|
|
|
from granary import as1, bluesky
|
2023-08-31 17:48:28 +00:00
|
|
|
from lexrpc import Client
|
2023-09-14 16:42:11 +00:00
|
|
|
from oauth_dropins.webutil import util
|
2023-08-24 03:34:32 +00:00
|
|
|
import requests
|
|
|
|
|
|
|
|
import common
|
|
|
|
from common import (
|
|
|
|
add,
|
2023-09-09 22:11:52 +00:00
|
|
|
DOMAIN_BLOCKLIST,
|
2023-08-24 03:34:32 +00:00
|
|
|
error,
|
2023-08-31 17:48:28 +00:00
|
|
|
USER_AGENT,
|
2023-08-24 03:34:32 +00:00
|
|
|
)
|
2023-09-14 16:42:11 +00:00
|
|
|
import flask_app
|
|
|
|
import hub
|
2023-09-14 17:20:04 +00:00
|
|
|
from models import Object, PROTOCOLS, Target, User
|
2023-08-24 03:34:32 +00:00
|
|
|
from protocol import Protocol
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)

# Datastore-backed storage shared by all ATProto repos this service hosts.
storage = DatastoreStorage()
|
2023-08-24 03:34:32 +00:00
|
|
|
|
2023-09-04 15:11:19 +00:00
|
|
|
|
2023-08-24 03:34:32 +00:00
|
|
|
class ATProto(User, Protocol):
    """AT Protocol class.

    Key id is DID, currently either did:plc or did:web.
    https://atproto.com/specs/did
    """
    # protocol abbreviation; used in this user's AP address domain (see
    # ap_address) and as the protocol label on copied Targets
    ABBREV = 'atproto'
|
|
|
|
|
2023-08-24 04:04:17 +00:00
|
|
|
@ndb.ComputedProperty
|
|
|
|
def readable_id(self):
|
|
|
|
"""Prefers handle, then DID."""
|
2023-09-07 00:32:35 +00:00
|
|
|
return self.atproto_handle() or self.key.id()
|
2023-08-24 03:34:32 +00:00
|
|
|
|
2023-08-24 03:44:42 +00:00
|
|
|
def _pre_put_hook(self):
|
2023-09-01 19:07:21 +00:00
|
|
|
"""Validate id, require did:plc or non-blocklisted did:web.
|
|
|
|
|
|
|
|
Also check that the atproto_did property isn't set.
|
|
|
|
"""
|
2023-08-24 03:44:42 +00:00
|
|
|
super()._pre_put_hook()
|
|
|
|
id = self.key.id()
|
|
|
|
assert id
|
|
|
|
|
|
|
|
if id.startswith('did:plc:'):
|
|
|
|
assert id.removeprefix('did:plc:')
|
2023-09-01 19:07:21 +00:00
|
|
|
elif id.startswith('did:web:'):
|
2023-08-24 03:44:42 +00:00
|
|
|
domain = id.removeprefix('did:web:')
|
|
|
|
assert (re.match(common.DOMAIN_RE, domain)
|
2023-09-06 23:15:19 +00:00
|
|
|
and not Protocol.is_blocklisted(domain)), domain
|
2023-09-01 19:07:21 +00:00
|
|
|
else:
|
|
|
|
assert False, f'{id} is not valid did:plc or did:web'
|
2023-08-24 03:44:42 +00:00
|
|
|
|
2023-09-01 19:07:21 +00:00
|
|
|
assert not self.atproto_did, \
|
|
|
|
f"{self.key} shouldn't have atproto_did {self.atproto_did}"
|
2023-08-24 03:44:42 +00:00
|
|
|
|
2023-09-07 00:32:35 +00:00
|
|
|
def atproto_handle(self):
|
|
|
|
"""Returns handle if the DID document includes one, otherwise None."""
|
|
|
|
did_obj = ATProto.load(self.key.id(), remote=False)
|
|
|
|
if did_obj:
|
|
|
|
handle, _, _ = parse_at_uri(
|
|
|
|
util.get_first(did_obj.raw, 'alsoKnownAs', ''))
|
|
|
|
if handle:
|
|
|
|
return handle
|
|
|
|
|
2023-08-24 04:04:17 +00:00
|
|
|
def web_url(self):
|
2023-08-31 18:50:36 +00:00
|
|
|
return bluesky.Bluesky.user_url(self.readable_id)
|
2023-08-24 03:34:32 +00:00
|
|
|
|
2023-08-31 18:19:57 +00:00
|
|
|
def ap_address(self):
|
|
|
|
"""Returns this user's AP address, eg '@handle.com@bsky.brid.gy'."""
|
|
|
|
return f'@{self.readable_id}@{self.ABBREV}{common.SUPERDOMAIN}'
|
2023-08-24 03:34:32 +00:00
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def owns_id(cls, id):
|
|
|
|
return (id.startswith('at://')
|
|
|
|
or id.startswith('did:plc:')
|
2023-09-13 04:52:21 +00:00
|
|
|
or id.startswith('did:web:')
|
|
|
|
or id.startswith('https://bsky.app/'))
|
2023-08-24 03:34:32 +00:00
|
|
|
|
2023-08-31 17:48:28 +00:00
|
|
|
    @classmethod
    def target_for(cls, obj, shared=False):
        """Returns the PDS URL for the given object, or None.

        If the repo DID/handle doesn't exist in the PLC directory, defaults to
        returning Bridgy Fed's URL as the PDS.

        Args:
          obj: :class:`Object`
          shared: unused

        Returns:
          str
        """
        id = obj.key.id()
        # DID documents aren't stored in a PDS; no target for them
        if id.startswith('did:'):
            return None

        logger.info(f'Finding ATProto PDS for {id}')

        if id.startswith('https://bsky.app/'):
            # translate bsky.app web URL to its underlying at:// URI, recurse
            return cls.target_for(Object(id=bluesky.web_url_to_at_uri(id)))

        if id.startswith('at://'):
            repo, collection, rkey = parse_at_uri(id)

            if not repo.startswith('did:'):
                # repo is a handle; resolve it
                repo_did = did.resolve_handle(repo, get_fn=util.requests_get)
                if repo_did:
                    # recurse with the handle swapped for its resolved DID
                    return cls.target_for(Object(id=id.replace(
                        f'at://{repo}', f'at://{repo_did}')))
                else:
                    return None

            did_obj = ATProto.load(repo)
            if did_obj:
                return cls._pds_for(did_obj)
            # TODO: what should we do if the DID doesn't exist? should we return
            # None here? or do we need this path to return BF's URL so that we
            # then create the DID for non-ATP users on demand?

        # fall through: if the object's owner is a bridged user with an
        # ATProto DID, recurse on their repo's at:// URI
        if obj.as1:
            owner = as1.get_owner(obj.as1)
            if owner:
                user_key = Protocol.key_for(owner)
                if user_key:
                    user = user_key.get()
                    if user and user.atproto_did:
                        return cls.target_for(Object(id=f'at://{user.atproto_did}'))

        # default: we're the PDS (see docstring note above)
        return common.host_url()
|
2023-08-24 03:34:32 +00:00
|
|
|
|
2023-09-12 18:49:57 +00:00
|
|
|
@classmethod
|
|
|
|
def _pds_for(cls, did_obj):
|
|
|
|
"""
|
|
|
|
Args:
|
|
|
|
did_obj: :class:`Object`
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
str, PDS URL, or None
|
|
|
|
"""
|
|
|
|
assert did_obj.key.id().startswith('did:')
|
|
|
|
|
|
|
|
for service in did_obj.raw.get('service', []):
|
|
|
|
if service.get('id') in ('#atproto_pds',
|
|
|
|
f'{did_obj.key.id()}#atproto_pds'):
|
|
|
|
return service.get('serviceEndpoint')
|
|
|
|
|
|
|
|
logger.info(f"{did_obj.key.id()}'s DID doc has no ATProto PDS")
|
|
|
|
return None
|
|
|
|
|
2023-09-06 23:15:19 +00:00
|
|
|
def is_blocklisted(url):
|
|
|
|
# don't block common.DOMAINS since we want ourselves, ie our own PDS, to
|
|
|
|
# be a valid domain to send to
|
|
|
|
return util.domain_or_parent_in(util.domain_from_link(url), DOMAIN_BLOCKLIST)
|
|
|
|
|
2023-09-01 19:07:21 +00:00
|
|
|
@classmethod
|
2023-09-09 21:17:47 +00:00
|
|
|
def send(to_cls, obj, url, log_data=True):
|
2023-09-01 19:07:21 +00:00
|
|
|
"""Creates a record if we own its repo.
|
|
|
|
|
|
|
|
Creates the repo first if it doesn't exist.
|
|
|
|
|
|
|
|
If the repo's DID doc doesn't say we're its PDS, does nothing and
|
|
|
|
returns False.
|
|
|
|
|
|
|
|
Doesn't deliver anywhere externally! BGS(es) will receive this record
|
|
|
|
through subscribeRepos and then deliver it to AppView(s), which will
|
|
|
|
notify recipients as necessary.
|
|
|
|
"""
|
|
|
|
if url.rstrip('/') != common.host_url().rstrip('/'):
|
|
|
|
logger.info(f'Target PDS {url} is not us')
|
|
|
|
return False
|
|
|
|
|
|
|
|
type = as1.object_type(obj.as1)
|
|
|
|
if type == 'post':
|
|
|
|
type = as1.object_type(as1.get_object(obj.as1))
|
|
|
|
assert type in ('note', 'article')
|
|
|
|
|
|
|
|
user_key = PROTOCOLS[obj.source_protocol].actor_key(obj)
|
|
|
|
if not user_key:
|
|
|
|
logger.info(f"Couldn't find {obj.source_protocol} user for {obj.key}")
|
|
|
|
return False
|
|
|
|
|
2023-09-09 14:50:53 +00:00
|
|
|
def create_atproto_commit_task(commit_data):
|
2023-09-10 23:44:05 +00:00
|
|
|
common.create_task(queue='atproto-commit')
|
2023-09-09 14:50:53 +00:00
|
|
|
|
2023-09-09 04:08:12 +00:00
|
|
|
writes = []
|
2023-09-01 19:07:21 +00:00
|
|
|
user = user_key.get()
|
2023-09-09 04:08:12 +00:00
|
|
|
repo = None
|
2023-09-01 19:07:21 +00:00
|
|
|
if user.atproto_did:
|
2023-09-09 04:08:12 +00:00
|
|
|
# existing DID and repo
|
2023-09-09 21:17:47 +00:00
|
|
|
did_doc = to_cls.load(user.atproto_did)
|
2023-09-12 18:49:57 +00:00
|
|
|
pds = to_cls._pds_for(did_doc)
|
|
|
|
if not pds or pds.rstrip('/') != url.rstrip('/'):
|
2023-09-01 19:07:21 +00:00
|
|
|
logger.warning(f'{user_key} {user.atproto_did} PDS {pds} is not us')
|
|
|
|
return False
|
2023-09-09 04:08:12 +00:00
|
|
|
repo = storage.load_repo(user.atproto_did)
|
2023-09-09 14:50:53 +00:00
|
|
|
repo.callback = create_atproto_commit_task
|
2023-09-09 04:08:12 +00:00
|
|
|
|
2023-09-01 19:07:21 +00:00
|
|
|
else:
|
2023-09-09 04:08:12 +00:00
|
|
|
# create new DID, repo
|
2023-09-07 00:32:35 +00:00
|
|
|
logger.info(f'Creating new did:plc for {user.key}')
|
2023-09-09 04:08:12 +00:00
|
|
|
did_plc = did.create_plc(user.atproto_handle(),
|
2023-09-09 14:50:53 +00:00
|
|
|
pds_url=common.host_url(),
|
2023-09-01 19:07:21 +00:00
|
|
|
post_fn=util.requests_post)
|
|
|
|
|
|
|
|
ndb.transactional()
|
2023-09-09 04:08:12 +00:00
|
|
|
def update_user_create_repo():
|
2023-09-01 19:07:21 +00:00
|
|
|
Object.get_or_create(did_plc.did, raw=did_plc.doc)
|
|
|
|
user.atproto_did = did_plc.did
|
2023-09-19 23:07:11 +00:00
|
|
|
add(user.copies, Target(uri=did_plc.did, protocol=to_cls.LABEL))
|
2023-09-01 19:07:21 +00:00
|
|
|
user.put()
|
|
|
|
|
2023-09-09 04:08:12 +00:00
|
|
|
assert not storage.load_repo(user.atproto_did)
|
|
|
|
nonlocal repo
|
|
|
|
repo = Repo.create(storage, user.atproto_did,
|
|
|
|
handle=user.atproto_handle(),
|
2023-09-09 14:50:53 +00:00
|
|
|
callback=create_atproto_commit_task,
|
2023-09-09 04:08:12 +00:00
|
|
|
signing_key=did_plc.signing_key,
|
|
|
|
rotation_key=did_plc.rotation_key)
|
|
|
|
if user.obj and user.obj.as1:
|
|
|
|
# create user profile
|
|
|
|
writes.append(Write(action=Action.CREATE,
|
|
|
|
collection='app.bsky.actor.profile',
|
|
|
|
rkey='self', record=user.obj.as_bsky()))
|
|
|
|
update_user_create_repo()
|
2023-09-01 21:18:50 +00:00
|
|
|
|
2023-09-09 04:08:12 +00:00
|
|
|
logger.info(f'{user.key} is {user.atproto_did}')
|
|
|
|
assert repo
|
2023-09-06 03:10:11 +00:00
|
|
|
|
2023-09-01 21:18:50 +00:00
|
|
|
# create record
|
2023-09-14 17:20:04 +00:00
|
|
|
ndb.transactional()
|
|
|
|
def write():
|
|
|
|
tid = next_tid()
|
|
|
|
writes.append(Write(action=Action.CREATE, collection='app.bsky.feed.post',
|
|
|
|
rkey=tid, record=obj.as_bsky()))
|
|
|
|
repo.apply_writes(writes)
|
|
|
|
|
|
|
|
at_uri = f'at://{user.atproto_did}/app.bsky.feed.post/{tid}'
|
|
|
|
add(obj.copies, Target(uri=at_uri, protocol=to_cls.ABBREV))
|
|
|
|
obj.put()
|
|
|
|
|
|
|
|
write()
|
2023-09-06 03:10:11 +00:00
|
|
|
|
2023-09-01 19:07:21 +00:00
|
|
|
return True
|
2023-08-24 03:34:32 +00:00
|
|
|
|
2023-08-31 03:59:37 +00:00
|
|
|
    @classmethod
    def fetch(cls, obj, **kwargs):
        """Tries to fetch a ATProto object.

        Args:
          obj: :class:`Object` with the id to fetch. Fills data into the raw
            property for DID documents, or the bsky property for at:// records.
          kwargs: ignored

        Returns:
          True if the object was fetched and populated successfully,
          False otherwise

        Raises:
          TODO
        """
        id = obj.key.id()
        if not cls.owns_id(id):
            logger.info(f"ATProto can't fetch {id}")
            return False

        # did:plc, did:web
        if id.startswith('did:'):
            try:
                obj.raw = did.resolve(id, get_fn=util.requests_get)
                return True
            except (ValueError, requests.RequestException) as e:
                # logs the error; treated as fetch failure, not fatal
                util.interpret_http_exception(e)
                return False

        # find the PDS that hosts this record's repo
        pds = cls.target_for(obj)
        if not pds:
            return False

        # at:// URI
        # examples:
        # at://did:plc:s2koow7r6t7tozgd4slc3dsg/app.bsky.feed.post/3jqcpv7bv2c2q
        # https://bsky.social/xrpc/com.atproto.repo.getRecord?repo=did:plc:s2koow7r6t7tozgd4slc3dsg&collection=app.bsky.feed.post&rkey=3jqcpv7bv2c2q
        repo, collection, rkey = parse_at_uri(obj.key.id())
        client = Client(pds, headers={'User-Agent': USER_AGENT})
        obj.bsky = client.com.atproto.repo.getRecord(
            repo=repo, collection=collection, rkey=rkey)
        # TODO: verify sig?
        return True
|
|
|
|
|
2023-08-24 04:04:17 +00:00
|
|
|
    @classmethod
    def serve(cls, obj):
        """Serves an :class:`Object` as an app.bsky.* lexicon (Bluesky) record.

        This is minimally implemented to serve app.bsky.* lexicon data, but
        BGSes and other clients will generally receive ATProto commits via
        `com.atproto.sync.subscribeRepos` subscriptions, not BF-specific
        /convert/... HTTP requests, so this should never be used in practice.

        Returns:
          (dict record, dict HTTP headers) tuple
        """
        return bluesky.from_as1(obj.as1), {'Content-Type': 'application/json'}
|
2023-09-14 16:42:11 +00:00
|
|
|
|
|
|
|
|
|
|
|
@hub.app.get('/_ah/queue/atproto-poll-notifs')
def poll_notifications():
    """Fetches and enqueues new activities from the AppView for our users.

    Uses the `listNotifications` endpoint, which is intended for end users. 🤷

    Returns:
      str, 'OK'
    """
    repos = {r.key.id(): r for r in AtpRepo.query()}
    logger.info(f'Got {len(repos)} repos')

    # users on other protocols whose bridged ATProto DIDs have repos here.
    # (removed an unused `repo_dids = []` local that was never read.)
    users = itertools.chain(*(cls.query(cls.atproto_did.IN(list(repos)))
                              for cls in set(PROTOCOLS.values())
                              if cls and cls != ATProto))

    # TODO: convert to Session for connection pipelining!
    client = Client(f'https://{os.environ["APPVIEW_HOST"]}',
                    headers={'User-Agent': USER_AGENT})

    for user in users:
        # TODO: store and use cursor
        # seenAt would be easier, but they don't support it yet
        # https://github.com/bluesky-social/atproto/issues/1636
        repo = repos[user.atproto_did]
        # authenticate to the AppView as this user's repo DID
        client.access_token = service_jwt(os.environ['APPVIEW_HOST'],
                                          repo_did=user.atproto_did,
                                          privkey=repo.signing_key)
        resp = client.app.bsky.notification.listNotifications()
        for notif in resp['notifications']:
            logger.info(f'Got {notif["reason"]} from {notif["author"]["handle"]} {notif["uri"]} {notif["cid"]}')

            # TODO: verify sig. skipping this for now because we're getting
            # these from the AppView, which is trusted, specifically we expect
            # the BGS and/or the AppView already checked sigs.
            obj = Object.get_or_create(id=notif['uri'], bsky=notif['record'],
                                       source_protocol=ATProto.ABBREV)
            if not obj.status:
                obj.status = 'new'
            add(obj.notify, user.key)
            obj.put()

            common.create_task(queue='receive', obj=obj.key.urlsafe(),
                               # TODO: should this be the receiving user?
                               # or the sending user?
                               user=user.key.urlsafe())

    return 'OK'
|