2023-08-24 03:34:32 +00:00
""" ATProto protocol implementation.
https : / / atproto . com /
"""
2024-08-28 05:44:18 +00:00
from datetime import timedelta
2023-09-14 16:42:11 +00:00
import itertools
2023-08-24 03:34:32 +00:00
import logging
2023-09-14 16:42:11 +00:00
import os
2023-08-24 03:44:42 +00:00
import re
2023-08-24 03:34:32 +00:00
2023-08-31 03:59:37 +00:00
from arroba import did
2023-11-03 00:41:31 +00:00
from arroba . datastore_storage import AtpRemoteBlob , AtpRepo , DatastoreStorage
2023-09-01 19:07:21 +00:00
from arroba . repo import Repo , Write
2023-09-28 20:42:16 +00:00
import arroba . server
2023-09-06 03:10:11 +00:00
from arroba . storage import Action , CommitData
2024-06-08 22:16:39 +00:00
from arroba . util import (
at_uri ,
dag_cbor_cid ,
next_tid ,
parse_at_uri ,
service_jwt ,
2024-11-14 19:38:43 +00:00
TOMBSTONED ,
2024-06-08 22:16:39 +00:00
)
2024-06-06 21:02:22 +00:00
import brevity
2023-10-07 20:01:29 +00:00
import dag_json
2023-09-28 20:42:16 +00:00
from flask import abort , request
2023-10-04 20:54:20 +00:00
from google . cloud import dns
2024-07-24 20:41:21 +00:00
from google . cloud . dns . resource_record_set import ResourceRecordSet
2023-08-24 03:34:32 +00:00
from google . cloud import ndb
2024-07-24 20:41:21 +00:00
import googleapiclient . discovery
2023-08-29 19:35:20 +00:00
from granary import as1 , bluesky
2024-07-04 23:58:06 +00:00
from granary . bluesky import Bluesky , FROM_AS1_TYPES
2024-06-10 23:02:18 +00:00
from granary . source import html_to_text , INCLUDE_LINK , Source
2024-09-29 15:57:19 +00:00
from lexrpc import Client , ValidationError
2024-03-12 21:45:48 +00:00
from requests import RequestException
2024-08-28 05:44:18 +00:00
from oauth_dropins . webutil import util
2024-04-01 01:44:38 +00:00
from oauth_dropins . webutil . appengine_config import ndb_client
2023-10-05 22:55:31 +00:00
from oauth_dropins . webutil . appengine_info import DEBUG
2024-08-28 05:44:18 +00:00
from oauth_dropins . webutil . flask_util import cloud_tasks_only , get_required_param
2024-05-09 04:35:03 +00:00
from oauth_dropins . webutil . models import StringIdModel
2024-10-31 00:09:05 +00:00
from oauth_dropins . webutil . util import add , json_dumps , json_loads
2023-08-24 03:34:32 +00:00
import common
from common import (
2023-09-09 22:11:52 +00:00
DOMAIN_BLOCKLIST ,
2023-09-22 19:14:50 +00:00
DOMAIN_RE ,
2023-10-20 05:17:52 +00:00
DOMAINS ,
2023-08-24 03:34:32 +00:00
error ,
2024-06-06 14:50:16 +00:00
PRIMARY_DOMAIN ,
2023-08-31 17:48:28 +00:00
USER_AGENT ,
2023-08-24 03:34:32 +00:00
)
2023-09-14 16:42:11 +00:00
import flask_app
2024-05-29 23:18:15 +00:00
import ids
2024-06-20 02:59:55 +00:00
from models import Follower , Object , PROTOCOLS , Target , User
2023-08-24 03:34:32 +00:00
from protocol import Protocol
logger = logging.getLogger(__name__)

# back the arroba PDS with the Cloud Datastore
arroba.server.storage = DatastoreStorage(ndb_client=ndb_client)

# XRPC client for the Bluesky AppView, shared module-wide
appview = Client(f'https://{os.environ["APPVIEW_HOST"]}',
                 headers={'User-Agent': USER_AGENT})
LEXICONS = appview.defs

# https://atproto.com/guides/applications#record-types
COLLECTION_TO_TYPE = {
    'app.bsky.actor.profile': 'profile',
    'app.bsky.feed.like': 'like',
    'app.bsky.feed.post': 'post',
    'app.bsky.feed.repost': 'repost',
    'app.bsky.graph.follow': 'follow',
}

DNS_GCP_PROJECT = 'brid-gy'
DNS_ZONE = 'brid-gy'
DNS_TTL = 10800  # seconds
logger.info(f'Using GCP DNS project {DNS_GCP_PROJECT} zone {DNS_ZONE}')
# "Cloud DNS API" https://github.com/googleapis/python-dns
dns_client = dns.Client(project=DNS_GCP_PROJECT)
# "Discovery API" https://github.com/googleapis/google-api-python-client
dns_discovery_api = googleapiclient.discovery.build('dns', 'v1')
2023-10-04 20:54:20 +00:00
2023-09-04 15:11:19 +00:00
2024-08-26 19:56:12 +00:00
def chat_client(*, repo, method, **kwargs):
    """Returns a new Bluesky chat :class:`Client` for a given XRPC method.

    Args:
      repo (arroba.repo.Repo): ATProto user
      method (str): XRPC method NSID, eg ``chat.bsky.convo.sendMessage``
      kwargs: passed through to the :class:`lexrpc.Client` constructor

    Returns:
      lexrpc.Client:
    """
    chat_host = os.environ['CHAT_HOST']

    # inter-service JWT authenticating this repo's DID to the chat service,
    # scoped to the one XRPC method
    jwt = service_jwt(host=chat_host,
                      aud=os.environ['CHAT_DID'],
                      repo_did=repo.did,
                      privkey=repo.signing_key,
                      lxm=method)

    headers = kwargs.setdefault('headers', {})
    headers['User-Agent'] = USER_AGENT
    headers['Authorization'] = f'Bearer {jwt}'
    kwargs.setdefault('truncate', True)

    return Client(f'https://{chat_host}', **kwargs)
2024-05-16 19:48:28 +00:00
class DatastoreClient(Client):
    """Bluesky client that uses the datastore as well as remote XRPC calls.

    Overrides ``getRecord`` and ``resolveHandle``. If we have a record or DID
    document stored locally, uses it as is instead of making a remote XRPC call.
    Otherwise, passes through to the server.

    Right now, requires that the server address is the same as
    ``$APPVIEW_HOST``, because ``getRecord`` passes through to ``ATProto.load``
    and then to ``ATProto.fetch``, which uses the ``appview`` global.
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        assert self.address == f'https://{os.environ["APPVIEW_HOST"]}', self.address

    def call(self, nsid, input=None, headers={}, **params):
        # intercept the two methods we can answer locally; everything else
        # falls through to the remote server
        if nsid == 'com.atproto.repo.getRecord':
            return self.get_record(**params)  # may return {}

        if nsid == 'com.atproto.identity.resolveHandle':
            if ret := self.resolve_handle(**params):
                return ret

        return super().call(nsid, input=input, headers=headers, **params)

    def get_record(self, repo=None, collection=None, rkey=None):
        # params default to None only so ``call`` can pass them as **params;
        # all three are required
        assert repo and collection and rkey, (repo, collection, rkey)
        uri = at_uri(did=repo, collection=collection, rkey=rkey)

        record = None
        # local record in a repo we own?
        # NOTE: the walrus rebinds ``repo`` from DID string to a Repo object
        if repo := arroba.server.storage.load_repo(repo):
            record = repo.get_record(collection=collection, rkey=rkey)

        # remote record that we may have a cached copy of
        if not record:
            if obj := ATProto.load(uri, raise_=False):
                record = obj.bsky

        if record:
            return {
                'uri': uri,
                'cid': record.get('cid') or dag_cbor_cid(record).encode('base32'),
                'value': record,
            }
        else:
            return {}

    @staticmethod
    def resolve_handle(handle=None):
        assert handle

        # prefer native Bluesky users, then non-tombstoned bridged repos,
        # then any bridged repo
        got = (ATProto.query(ATProto.handle == handle).get()  # native Bluesky user
               or AtpRepo.query(AtpRepo.handles == handle,    # bridged user,
                                AtpRepo.status == None).get() # non-tombstoned first
               or AtpRepo.query(AtpRepo.handles == handle).get())
        if got:
            return {'did': got.key.id()}
2024-05-16 19:48:28 +00:00
2024-10-29 01:23:21 +00:00
def did_to_handle(did, remote=None):
    """Resolves a DID to a handle.

    Args:
      did (str)
      remote (bool): whether to fetch the object over the network. See
        :meth:`Protocol.load`

    Returns:
      str: handle, or None
    """
    if did_obj := ATProto.load(did, did_doc=True, remote=remote):
        # use first at:// URI in alsoKnownAs
        for aka in util.get_list(did_obj.raw, 'alsoKnownAs'):
            if aka.startswith('at://'):
                handle, _, _ = parse_at_uri(aka)
                if handle:
                    return handle
2024-04-11 22:02:15 +00:00
2024-05-09 04:35:03 +00:00
class Cursor(StringIdModel):
    """The last cursor (sequence number) we've seen for a host and event stream.

    https://atproto.com/specs/event-stream#sequence-numbers

    Key id is ``[HOST] [XRPC]``, where ``[XRPC]`` is the NSID of the XRPC method
    for the event stream. For example, `subscribeRepos` on the production relay
    is ``bsky.network com.atproto.sync.subscribeRepos``.

    ``cursor`` is the latest sequence number that we know we've seen, so when we
    re-subscribe to this event stream, we should send ``cursor + 1``.
    """
    # latest sequence number seen on this stream
    cursor = ndb.IntegerProperty()
    created = ndb.DateTimeProperty(auto_now_add=True)
    updated = ndb.DateTimeProperty(auto_now=True)
2023-08-24 03:34:32 +00:00
class ATProto(User, Protocol):
    """AT Protocol class.

    Key id is DID, currently either did:plc or did:web.
    https://atproto.com/specs/did
    """
    ABBREV = 'bsky'
    PHRASE = 'Bluesky'
    LOGO_HTML = '<img src="/oauth_dropins_static/bluesky.svg">'
    # note that PDS hostname is atproto.brid.gy here, not bsky.brid.gy. Bluesky
    # team currently has our hostname as atproto.brid.gy in their federation
    # test. also note that PDS URL shouldn't include trailing slash.
    # https://atproto.com/specs/did#did-documents
    PDS_URL = f'https://atproto{common.SUPERDOMAIN}'
    CONTENT_TYPE = 'application/json'
    HAS_COPIES = True
    REQUIRES_AVATAR = True
    REQUIRES_NAME = False
    DEFAULT_ENABLED_PROTOCOLS = ('web',)
    # AS1 verbs and object types we can bridge into ATProto
    SUPPORTED_AS1_TYPES = frozenset(
        tuple(as1.ACTOR_TYPES)
        + tuple(as1.POST_TYPES)
        + tuple(as1.CRUD_VERBS)
        + ('block', 'follow', 'flag', 'like', 'share', 'stop-following')
    )
    # lexicon record types we support; excludes fragment ("#") sub-types
    SUPPORTED_RECORD_TYPES = frozenset(
        type for type in itertools.chain(*FROM_AS1_TYPES.values())
        if '#' not in type)
    SUPPORTS_DMS = True
2023-08-24 03:34:32 +00:00
2023-08-24 03:44:42 +00:00
    def _pre_put_hook(self):
        """Validate id, require did:plc or non-blocklisted did:web."""
        super()._pre_put_hook()
        id = self.key.id()
        assert id

        if id.startswith('did:plc:'):
            # must have a non-empty suffix after the prefix
            assert id.removeprefix('did:plc:')
        elif id.startswith('did:web:'):
            # domain part must be a valid, non-blocklisted domain
            domain = id.removeprefix('did:web:')
            assert (re.match(common.DOMAIN_RE, domain)
                    and not Protocol.is_blocklisted(domain)), domain
        else:
            assert False, f'{id} is not valid did:plc or did:web'
2023-08-24 03:44:42 +00:00
2023-09-25 22:08:14 +00:00
    @ndb.ComputedProperty
    def handle(self):
        """Returns handle if the DID document includes one, otherwise None."""
        return did_to_handle(self.key.id())
2023-09-07 00:32:35 +00:00
2023-08-24 04:04:17 +00:00
    def web_url(self):
        """Returns this user's Bluesky profile URL, eg on bsky.app."""
        return Bluesky.user_url(self.handle_or_id())
2023-08-24 03:34:32 +00:00
2024-10-02 22:01:07 +00:00
def id_uri ( self ) :
return f ' at:// { self . key . id ( ) } '
2023-08-24 03:34:32 +00:00
@classmethod
def owns_id ( cls , id ) :
return ( id . startswith ( ' at:// ' )
or id . startswith ( ' did:plc: ' )
2023-09-13 04:52:21 +00:00
or id . startswith ( ' did:web: ' )
or id . startswith ( ' https://bsky.app/ ' ) )
2023-08-24 03:34:32 +00:00
2023-09-22 19:14:50 +00:00
    @classmethod
    def owns_handle(cls, handle, allow_internal=False):
        # TODO: implement allow_internal
        if not did.HANDLE_RE.fullmatch(handle):
            return False
        # falls through to return None, ie "maybe": syntactically valid handle,
        # but not necessarily one of ours
2023-09-22 20:11:15 +00:00
    @classmethod
    def handle_to_id(cls, handle):
        """Resolves an ATProto handle to its DID.

        Args:
          handle (str)

        Returns:
          str: DID, or None
        """
        if not handle or cls.owns_handle(handle) is False:
            return None

        # check the local datastore first: native users and bridged repos
        if resp := DatastoreClient.resolve_handle(handle):
            return resp['did']

        # fall back to resolving over the network
        return did.resolve_handle(handle, get_fn=util.requests_get)
2023-09-22 20:11:15 +00:00
2024-10-04 00:06:46 +00:00
    def reload_profile(self):
        """Reloads this user's DID doc along with their profile object."""
        super().reload_profile()
        # remote=True forces a network fetch of the DID document
        self.load(self.key.id(), did_doc=True, remote=True)
2024-04-12 15:46:59 +00:00
    @classmethod
    def bridged_web_url_for(cls, user, fallback=False):
        """Returns a bridged user's profile URL on bsky.app.

        For example, returns ``https://bsky.app/profile/alice.com.web.brid.gy``
        for Web user ``alice.com``.
        """
        if not isinstance(user, ATProto):
            # NOTE: this walrus shadows the arroba ``did`` module locally
            if did := user.get_copy(ATProto):
                # prefer the handle if the DID doc has one, else the raw DID
                return Bluesky.user_url(did_to_handle(did) or did)
2024-04-12 15:46:59 +00:00
2023-08-31 17:48:28 +00:00
@classmethod
def target_for ( cls , obj , shared = False ) :
2023-10-20 05:17:52 +00:00
""" Returns our PDS URL as the target for the given object.
2023-10-18 19:01:17 +00:00
ATProto delivery is indirect . We write all records to the user ' s local
2024-05-09 13:43:51 +00:00
repo that we host , then relays and other subscribers receive them via the
2023-10-18 19:01:17 +00:00
subscribeRepos event streams . So , we use a single target , our base URL
2024-05-09 13:43:51 +00:00
( eg ` ` https : / / atproto . brid . gy ` ` ) as the PDS URL , for all activities .
2023-10-18 19:01:17 +00:00
"""
if cls . owns_id ( obj . key . id ( ) ) is not False :
2023-10-20 05:17:52 +00:00
return cls . PDS_URL
2023-08-31 17:48:28 +00:00
2023-10-18 19:01:17 +00:00
    @classmethod
    def pds_for(cls, obj):
        """Returns the PDS URL for the given object, or None.

        Args:
          obj (Object)

        Returns:
          str:
        """
        id = obj.key.id()
        # logger.debug(f'Finding ATProto PDS for {id}')

        if id.startswith('did:'):
            # DID doc: look up the #atproto_pds service endpoint
            if obj.raw:
                for service in obj.raw.get('service', []):
                    if service.get('id') in ('#atproto_pds', f'{id}#atproto_pds'):
                        return service.get('serviceEndpoint')

            logger.info(f"{id}'s DID doc has no ATProto PDS")
            return None

        if id.startswith('https://bsky.app/'):
            # convert bsky.app URL to an at:// URI and recurse
            return cls.pds_for(Object(id=bluesky.web_url_to_at_uri(id)))

        if id.startswith('at://'):
            repo, collection, rkey = parse_at_uri(id)

            if not repo.startswith('did:'):
                # repo is a handle; resolve it
                repo_did = cls.handle_to_id(repo)
                if repo_did:
                    # recurse with the handle swapped for its DID
                    return cls.pds_for(Object(id=id.replace(
                        f'at://{repo}', f'at://{repo_did}')))
                else:
                    return None

            did_obj = ATProto.load(repo, did_doc=True)
            if did_obj:
                return cls.pds_for(did_obj)
            # TODO: what should we do if the DID doesn't exist? should we return
            # None here? or do we need this path to return BF's URL so that we
            # then create the DID for non-ATP users on demand?

        # don't use Object.as1 if bsky is set, since that conversion calls
        # pds_for, which would infinite loop
        if not obj.bsky and obj.as1:
            # fall back to the object's owner's bridged ATProto copy, if any
            if owner := as1.get_owner(obj.as1):
                if user_key := Protocol.key_for(owner):
                    if user := user_key.get():
                        if owner_did := user.get_copy(ATProto):
                            return cls.pds_for(Object(id=f'at://{owner_did}'))

        return None
2024-04-22 20:24:24 +00:00
    # NOTE(review): defined without @classmethod/@staticmethod and without a
    # cls/self param, unlike the sibling classmethods here — confirm this is
    # intentional and matches how Protocol invokes is_blocklisted
    def is_blocklisted(url, allow_internal=False):
        # don't block common.DOMAINS since we want ourselves, ie our own PDS, to
        # be a valid domain to send to
        return util.domain_or_parent_in(util.domain_from_link(url), DOMAIN_BLOCKLIST)
2023-10-04 19:44:14 +00:00
    @classmethod
    def create_for(cls, user):
        """Creates an ATProto repo and profile for a non-ATProto user.

        If the repo already exists, reactivates it by emitting an #account event
        with active: True.

        Args:
          user (models.User)

        Raises:
          ValueError: if the user's handle is invalid, eg begins or ends with an
            underscore or dash
        """
        assert not isinstance(user, ATProto)

        if copy_did := user.get_copy(ATProto):
            # already bridged and inactive
            repo = arroba.server.storage.load_repo(copy_did)
            assert repo.status
            if repo.status == TOMBSTONED:
                # tombstoned repos can't be reactivated, have to wipe and start fresh
                user.copies = []
                if user.obj:
                    user.obj.copies = []
                    user.obj.put()
                # fall through to create new DID, repo
            elif repo.status:
                # deactivated or deleted
                arroba.server.storage.activate_repo(repo)
                common.create_task(queue='atproto-commit')
                return

        # create new DID, repo
        # PDS URL shouldn't include trailing slash!
        # https://atproto.com/specs/did#did-documents
        pds_url = common.host_url().rstrip('/') if DEBUG else cls.PDS_URL

        handle = user.handle_as('atproto')
        logger.info(f'Creating new did:plc for {user.key.id()} {handle} {pds_url}')
        did_plc = did.create_plc(handle, pds_url=pds_url, post_fn=util.requests_post,
                                 also_known_as=user.profile_id())

        Object.get_or_create(did_plc.did, raw=did_plc.doc, authed_as=did_plc)
        # TODO: move this to ATProto.get_or_create?
        add(user.copies, Target(uri=did_plc.did, protocol='atproto'))

        # point the handle's _atproto DNS TXT record at the new DID
        cls.set_dns(handle=handle, did=did_plc.did)

        # fetch and store profile
        if not user.obj or not user.obj.as1:
            user.reload_profile()

        initial_writes = []
        if user.obj and user.obj.as1:
            # create user profile
            profile = cls.convert(user.obj, fetch_blobs=True, from_user=user)
            logger.info(f'Storing ATProto app.bsky.actor.profile self')
            initial_writes.append(
                Write(action=Action.CREATE, collection='app.bsky.actor.profile',
                      rkey='self', record=profile))

            uri = at_uri(did_plc.did, 'app.bsky.actor.profile', 'self')
            user.obj.add('copies', Target(uri=uri, protocol='atproto'))
            user.obj.put()

        # create chat declaration
        logger.info(f'Storing ATProto chat declaration record')
        chat_declaration = {
            "$type": "chat.bsky.actor.declaration",
            "allowIncoming": "none",
        }
        initial_writes.append(
            Write(action=Action.CREATE, collection='chat.bsky.actor.declaration',
                  rkey='self', record=chat_declaration))

        repo = Repo.create(
            arroba.server.storage, did_plc.did, handle=handle,
            callback=lambda _: common.create_task(queue='atproto-commit'),
            initial_writes=initial_writes,
            signing_key=did_plc.signing_key,
            rotation_key=did_plc.rotation_key)

        user.put()
2024-07-24 15:17:06 +00:00
    @staticmethod
    def set_dns(handle, did):
        """Create _atproto DNS record for handle resolution.

        https://atproto.com/specs/handle#handle-resolution

        If the DNS record already exists, or if we're not in prod, does nothing.
        If the DNS record exists with a different DID, deletes it and recreates
        it with this DID.

        Args:
          handle (str): Bluesky handle, eg ``snarfed.org.web.brid.gy``
          did (str): DID to map the handle to
        """
        name = f'_atproto.{handle}.'
        val = f'"did={did}"'
        logger.info(f'adding GCP DNS TXT record for {name} {val}')
        if DEBUG:
            logger.info('skipped since DEBUG is true')
            return

        # https://cloud.google.com/python/docs/reference/dns/latest
        # https://cloud.google.com/dns/docs/reference/rest/v1/
        zone = dns_client.zone(DNS_ZONE)
        changes = zone.changes()

        # sadly can't check if the record exists with the google.cloud.dns API
        # because it doesn't support list_resource_record_sets's name param.
        # need to use the generic discovery-based API instead.
        # https://cloud.google.com/python/docs/reference/dns/latest/zone#listresourcerecordsetsmaxresultsnone-pagetokennone-clientnone
        # https://github.com/googleapis/python-dns/issues/31#issuecomment-1595105412
        # https://cloud.google.com/apis/docs/client-libraries-explained
        # https://googleapis.github.io/google-api-python-client/docs/dyn/dns_v1.resourceRecordSets.html
        logger.info('Checking for existing record')
        resp = dns_discovery_api.resourceRecordSets().list(
            project=DNS_GCP_PROJECT, managedZone=DNS_ZONE, type='TXT', name=name,
        ).execute()
        for existing in resp.get('rrsets', []):
            # stale record, eg pointing at an old DID; remove before re-adding
            logger.info(f'deleting {existing}')
            changes.delete_record_set(ResourceRecordSet.from_api_repr(existing, zone=zone))

        changes.add_record_set(zone.resource_record_set(name=name, record_type='TXT',
                                                        ttl=DNS_TTL, rrdatas=[val]))
        changes.create()
        logger.info('done!')
2024-07-24 15:17:06 +00:00
2024-10-27 17:50:01 +00:00
    @classmethod
    def set_username(to_cls, user, username):
        """Sets a bridged user's ATProto handle to a custom domain.

        The domain must already resolve (via DNS or HTTP) to the user's bridged
        DID before this is called.

        Args:
          user (models.User)
          username (str): domain to use as the handle; a leading ``@`` is
            stripped

        Raises:
          ValueError: if the user isn't bridged into Bluesky
          RuntimeError: if the domain doesn't resolve to the user's bridged DID
        """
        if not user.is_enabled(ATProto):
            raise ValueError("First, you'll need to bridge your account into Bluesky by following this account.")

        copy_did = user.get_copy(ATProto)
        username = username.removeprefix('@')

        # resolve_handle checks that username is a valid domain
        resolved = did.resolve_handle(username, get_fn=util.requests_get)
        if resolved != copy_did:
            raise RuntimeError(f"""<p>You'll need to connect that domain to your bridged Bluesky account, either <a href="https://bsky.social/about/blog/4-28-2023-domain-handle-tutorial">with DNS</a> <a href="https://atproto.com/specs/handle#handle-resolution">or HTTP</a>. Your DID is: <code>{copy_did}</code><p>Once you're done, <a href="https://bsky-debug.app/handle?handle={username}">check your work here</a>, then DM me <em>username {username}</em> again.""")

        repo = arroba.server.storage.load_repo(copy_did)
        assert repo
        if repo.status:
            # deactivated/deleted/tombstoned repos don't get handle updates
            logger.info(f'{repo.did} is {repo.status}, giving up')
            return False

        logger.info(f'Setting ATProto handle for {user.key.id()} to {username}')
        repo.callback = lambda _: common.create_task(queue='atproto-commit')
        did.update_plc(did=copy_did, handle=username,
                       signing_key=repo.signing_key, rotation_key=repo.rotation_key,
                       get_fn=util.requests_get, post_fn=util.requests_post)
        # emit an #identity event so subscribers see the new handle
        arroba.server.storage.write_event(repo=repo, type='identity', handle=username)

        # refresh our stored DID doc
        to_cls.load(copy_did, did_doc=True, remote=True)
2024-10-29 01:23:21 +00:00
2023-09-01 19:07:21 +00:00
@classmethod
2024-10-11 18:34:31 +00:00
def send ( to_cls , obj , url , from_user = None , orig_obj_id = None ) :
2023-09-01 19:07:21 +00:00
""" Creates a record if we own its repo.
If the repo ' s DID doc doesn ' t say we ' re its PDS, does nothing and
returns False .
2024-05-09 13:43:51 +00:00
Doesn ' t deliver anywhere externally! Relays will receive this record
2023-10-06 15:22:50 +00:00
through ` ` subscribeRepos ` ` and then deliver it to AppView ( s ) , which will
2023-09-01 19:07:21 +00:00
notify recipients as necessary .
2024-08-08 18:42:28 +00:00
Exceptions :
* ` ` flag ` ` s are translated to ` ` createReport ` ` to the mod service
* DMs are translated to ` ` sendMessage ` ` to the chat service
2023-09-01 19:07:21 +00:00
"""
2023-10-20 05:17:52 +00:00
if util . domain_from_link ( url ) not in DOMAINS :
2023-09-01 19:07:21 +00:00
logger . info ( f ' Target PDS { url } is not us ' )
return False
2023-11-15 03:55:20 +00:00
# determine "base" object, if any
2023-09-01 19:07:21 +00:00
type = as1 . object_type ( obj . as1 )
2023-11-09 22:06:59 +00:00
base_obj = obj
2024-06-22 23:41:23 +00:00
base_obj_as1 = obj . as1
2024-09-17 01:27:04 +00:00
allow_opt_out = ( type == ' delete ' )
2024-06-30 04:57:22 +00:00
if type in ( ' post ' , ' update ' , ' delete ' , ' undo ' ) :
2024-06-22 23:41:23 +00:00
base_obj_as1 = as1 . get_object ( obj . as1 )
2024-06-25 01:17:58 +00:00
base_id = base_obj_as1 [ ' id ' ]
base_obj = PROTOCOLS [ obj . source_protocol ] . load ( base_id , remote = False )
2024-06-30 04:57:22 +00:00
if type not in ( ' delete ' , ' undo ' ) :
2024-06-25 01:17:58 +00:00
if not base_obj : # probably a new repo
base_obj = Object ( id = base_id , source_protocol = obj . source_protocol )
base_obj . our_as1 = base_obj_as1
2023-11-09 22:06:59 +00:00
2024-06-20 02:59:55 +00:00
elif type == ' stop-following ' :
assert from_user
to_id = as1 . get_object ( obj . as1 ) . get ( ' id ' )
assert to_id
to_key = Protocol . key_for ( to_id )
2024-11-08 04:52:03 +00:00
if not to_key :
logger . info ( f ' Skipping, { to_id } is opted out ' )
return False
2024-06-20 02:59:55 +00:00
follower = Follower . query ( Follower . from_ == from_user . key ,
Follower . to == to_key ) . get ( )
if not follower or not follower . follow :
logger . info ( f " Skipping, can ' t find Follower for { from_user . key . id ( ) } => { to_key . id ( ) } with follow " )
return False
base_obj = follower . follow . get ( )
2024-03-13 21:40:31 +00:00
# convert to Bluesky record; short circuits on error
2024-06-22 23:41:23 +00:00
record = to_cls . convert ( base_obj , fetch_blobs = True , from_user = from_user )
2024-03-13 21:40:31 +00:00
2023-11-15 03:55:20 +00:00
# find user
2023-09-28 20:42:16 +00:00
from_cls = PROTOCOLS [ obj . source_protocol ]
2024-09-17 01:27:04 +00:00
from_key = from_cls . actor_key ( obj , allow_opt_out = allow_opt_out )
2023-09-28 20:42:16 +00:00
if not from_key :
2024-08-02 15:02:36 +00:00
logger . info ( f " Couldn ' t find { obj . source_protocol } user for { obj . key . id ( ) } " )
2023-09-01 19:07:21 +00:00
return False
2023-09-28 20:42:16 +00:00
# load user
2024-09-17 01:27:04 +00:00
user = from_cls . get_or_create ( from_key . id ( ) , allow_opt_out = allow_opt_out , propagate = True )
2023-11-16 03:08:06 +00:00
did = user . get_copy ( ATProto )
assert did
2024-08-02 15:02:36 +00:00
logger . info ( f ' { user . key . id ( ) } is { did } ' )
2024-03-13 23:08:08 +00:00
did_doc = to_cls . load ( did , did_doc = True )
2023-10-18 19:01:17 +00:00
pds = to_cls . pds_for ( did_doc )
2023-10-20 05:17:52 +00:00
if not pds or util . domain_from_link ( pds ) not in DOMAINS :
2024-08-02 15:02:36 +00:00
logger . warning ( f ' PDS { pds } is not us ' )
2023-09-28 20:42:16 +00:00
return False
# load repo
2024-11-14 19:38:43 +00:00
repo = arroba . server . storage . load_repo ( did )
assert repo
if repo . status :
logger . info ( f ' { repo . did } is { repo . status } , giving up ' )
2024-06-08 22:16:39 +00:00
return False
2023-09-28 20:42:16 +00:00
repo . callback = lambda _ : common . create_task ( queue = ' atproto-commit ' )
2023-09-06 03:10:11 +00:00
2024-05-22 22:13:36 +00:00
# non-commit operations:
2024-10-22 20:49:18 +00:00
# * delete actor => deactivate repo
2024-05-22 22:13:36 +00:00
# * flag => send report to mod service
2024-06-25 01:17:58 +00:00
# * stop-following => delete follow record (prepared above)
2024-08-08 18:42:28 +00:00
# * dm => chat message
2024-06-04 19:31:38 +00:00
verb = obj . as1 . get ( ' verb ' )
2024-06-22 23:41:23 +00:00
if verb == ' delete ' :
2024-06-25 01:17:58 +00:00
atp_base_id = ( base_id if ATProto . owns_id ( base_id )
else ids . translate_user_id ( from_ = from_cls , to = to_cls ,
id = base_id ) )
2024-06-22 23:41:23 +00:00
if atp_base_id == did :
2024-10-22 20:49:18 +00:00
logger . info ( f ' Deactivating bridged ATProto account { did } ! ' )
arroba . server . storage . deactivate_repo ( repo )
2024-06-22 23:41:23 +00:00
return True
2024-05-22 22:13:36 +00:00
2024-08-08 18:42:28 +00:00
if not record :
# _convert already logged
return False
if verb == ' flag ' :
2024-06-25 01:17:58 +00:00
logger . info ( f ' flag => createReport with { record } ' )
2024-08-26 19:59:01 +00:00
return create_report ( input = record , from_user = user )
2024-04-26 20:59:04 +00:00
2024-06-20 02:59:55 +00:00
elif verb == ' stop-following ' :
2024-06-25 01:17:58 +00:00
logger . info ( f ' stop-following => delete of { base_obj . key . id ( ) } ' )
2024-06-20 02:59:55 +00:00
assert base_obj and base_obj . type == ' follow ' , base_obj
verb = ' delete '
2024-08-15 00:06:58 +00:00
elif recip := as1 . recipient_if_dm ( obj . as1 ) :
assert recip . startswith ( ' did: ' ) , recip
2024-08-26 19:59:01 +00:00
return send_chat ( msg = record , from_repo = repo , to_did = recip )
2024-06-19 23:42:41 +00:00
2024-08-08 18:42:28 +00:00
# write commit
2023-10-03 23:52:21 +00:00
type = record [ ' $type ' ]
lex_type = LEXICONS [ type ] [ ' type ' ]
assert lex_type == ' record ' , f " Can ' t store { type } object of type { lex_type } "
2024-06-08 23:52:34 +00:00
# only modify objects that we've bridged
rkey = None
2024-06-30 04:57:22 +00:00
if verb in ( ' update ' , ' delete ' , ' undo ' ) :
2024-06-08 23:52:34 +00:00
# check that they're updating the object we have
copy = base_obj . get_copy ( to_cls )
if not copy :
2024-06-25 01:17:58 +00:00
logger . info ( f " Can ' t { verb } { base_obj . key . id ( ) } { type } , we didn ' t create it originally " )
2024-06-08 23:52:34 +00:00
return False
2024-08-11 14:42:10 +00:00
2024-06-08 23:52:34 +00:00
copy_did , coll , rkey = parse_at_uri ( copy )
2024-08-11 14:42:10 +00:00
if copy_did != did or coll != type :
logger . info ( f " Can ' t { verb } { base_obj . key . id ( ) } { type } , original { copy } is in a different repo or collection " )
return False
2024-06-08 23:52:34 +00:00
2023-09-14 17:20:04 +00:00
ndb . transactional ( )
def write ( ) :
2024-06-08 23:52:34 +00:00
nonlocal rkey
2024-03-13 21:47:48 +00:00
match verb :
case ' update ' :
action = Action . UPDATE
2024-06-30 04:57:22 +00:00
case ' delete ' | ' undo ' :
2024-03-13 21:47:48 +00:00
action = Action . DELETE
case _ :
action = Action . CREATE
rkey = next_tid ( )
2024-10-11 22:24:25 +00:00
logger . info ( f ' Storing ATProto { action } { type } { rkey } { dag_json . encode ( record , dialect = " atproto " ) } ' )
2024-06-20 04:01:50 +00:00
try :
repo . apply_writes ( [ Write ( action = action , collection = type , rkey = rkey ,
record = record ) ] )
except KeyError as e :
# raised by update and delete if no record exists for this
# collection/rkey
logger . warning ( e )
return False
2023-10-14 01:28:04 +00:00
2024-06-30 04:57:22 +00:00
if verb not in ( ' delete ' , ' undo ' ) :
2024-06-20 02:59:55 +00:00
at_uri = f ' at:// { did } / { type } / { rkey } '
base_obj . add ( ' copies ' , Target ( uri = at_uri , protocol = to_cls . LABEL ) )
base_obj . put ( )
2023-09-14 17:20:04 +00:00
2024-06-20 04:01:50 +00:00
return True
return write ( )
2023-08-24 03:34:32 +00:00
2024-03-13 23:08:08 +00:00
@classmethod
def load(cls, id, did_doc=False, **kwargs):
    """Thin wrapper that converts DIDs and bsky.app URLs to at:// URIs.

    Args:
      did_doc (bool): if True, loads and returns a DID document object
        instead of an ``app.bsky.actor.profile/self``.
    """
    if id.startswith('did:') and not did_doc:
        # a bare DID means "this user's profile record," not the DID doc
        id = ids.profile_id(id=id, proto=cls)

    elif id.startswith('https://bsky.app/'):
        # map the Bluesky web URL onto the underlying at:// URI
        try:
            id = bluesky.web_url_to_at_uri(id)
        except ValueError as e:
            logger.warning(f"Couldn't convert {id} to at:// URI: {e}")
            return None

    return super().load(id, **kwargs)
2023-08-31 03:59:37 +00:00
@classmethod
def fetch(cls, obj, **kwargs):
    """Tries to fetch a ATProto object.

    Args:
      obj (models.Object): with the id to fetch. Fills data into the ``as2``
        property.
      kwargs: ignored

    Returns:
      bool: True if the object was fetched and populated successfully,
      False otherwise
    """
    id = obj.key.id()
    if not cls.owns_id(id):
        logger.info(f"ATProto can't fetch {id}")
        return False

    assert not id.startswith('https://bsky.app/')  # handled in load

    # did:plc, did:web: resolve the DID document itself
    if id.startswith('did:'):
        try:
            obj.raw = did.resolve(id, get_fn=util.requests_get)
            return True
        except (ValueError, RequestException) as e:
            # logs 4xx/5xx details; re-raises nothing, we just report failure
            util.interpret_http_exception(e)
            return False

    # at:// URI. if it has a handle, resolve and replace with DID.
    # examples:
    # at://did:plc:s2koow7r6t7tozgd4slc3dsg/app.bsky.feed.post/3jqcpv7bv2c2q
    # https://bsky.social/xrpc/com.atproto.repo.getRecord?repo=did:plc:s2koow7r6t7tozgd4slc3dsg&collection=app.bsky.feed.post&rkey=3jqcpv7bv2c2q
    repo, collection, rkey = parse_at_uri(id)
    if not repo or not collection or not rkey:
        # malformed or partial at:// URI; nothing to fetch
        return False

    if not repo.startswith('did:'):
        # repo is a handle; resolve it to a DID and rewrite the object's key
        # so it's stored under the canonical DID-based at:// URI
        handle = repo
        repo = cls.handle_to_id(repo)
        if not repo:
            return False
        assert repo.startswith('did:')
        obj.key = ndb.Key(Object, id.replace(f'at://{handle}', f'at://{repo}'))

    try:
        appview.address = f'https://{os.environ["APPVIEW_HOST"]}'
        ret = appview.com.atproto.repo.getRecord(
            repo=repo, collection=collection, rkey=rkey)
    except RequestException as e:
        util.interpret_http_exception(e)
        return False
    except ValidationError as e:
        # record failed lexicon validation; log and give up
        logger.warning(e)
        return False

    # TODO: verify sig?
    obj.bsky = {
        **ret['value'],
        'cid': ret.get('cid'),
    }
    return True
2023-08-24 04:04:17 +00:00
@classmethod
def _convert(cls, obj, fetch_blobs=False, from_user=None):
    r"""Converts a :class:`models.Object` to ``app.bsky.*`` lexicon JSON.

    Args:
      obj (models.Object)
      fetch_blobs (bool): whether to fetch images and other blobs, store
        them in :class:`arroba.datastore_storage.AtpRemoteBlob`\s if they
        don't already exist, and fill them into the returned object.
      from_user (models.User): user (actor) this activity/object is from

    Returns:
      dict: JSON object
    """
    from_proto = PROTOCOLS.get(obj.source_protocol)

    # already in lexicon format? pass it through unchanged
    if obj.bsky:
        return obj.bsky
    if not obj.as1:
        return {}

    blobs = {}  # maps str URL to dict blob object
    if fetch_blobs:
        def fetch_blob(url, blob_field, name, check_size=True, check_type=True):
            # fetch url and register its blob in blobs, skipping (with a log
            # line) on any fetch/validation error; size/type limits come from
            # the lexicon field definition when the check_* flags are on
            if url and url not in blobs:
                max_size = blob_field[name].get('maxSize') if check_size else None
                accept = blob_field[name].get('accept') if check_type else None
                try:
                    blob = AtpRemoteBlob.get_or_create(
                        url=url, get_fn=util.requests_get, max_size=max_size,
                        accept_types=accept)
                    blobs[url] = blob.as_object()
                except (RequestException, ValidationError) as e:
                    logger.info(f'failed, skipping {url} : {e}')

        # look at both the activity and its inner object for media
        for o in obj.as1, as1.get_object(obj.as1):
            for url in util.get_urls(o, 'image'):
                # TODO: maybe eventually check size and type? the current
                # 1MB limit feels too small though, and the AppView doesn't
                # seem to validate, it's happily allowing bigger image blobs
                # and different types as of 9/29/2024:
                # https://github.com/snarfed/bridgy-fed/issues/1348#issuecomment-2381056468
                fetch_blob(url, appview.defs['app.bsky.embed.images#image']['properties'],
                           name='image', check_size=False, check_type=False)

            for att in util.get_list(o, 'attachments'):
                if isinstance(att, dict):
                    fetch_blob(att.get('stream', {}).get('url'),
                               appview.defs['app.bsky.embed.video']['properties'],
                               name='video', check_size=True, check_type=True)

    inner_obj = as1.get_object(obj.as1) or obj.as1
    orig_url = as1.get_url(inner_obj) or inner_obj.get('id')

    # convert! using our records in the datastore and fetching code instead
    # of granary's
    client = DatastoreClient(f'https://{os.environ["APPVIEW_HOST"]}')
    as_embed = obj.atom or obj.rss
    try:
        ret = bluesky.from_as1(cls.translate_ids(obj.as1), blobs=blobs,
                               client=client, original_fields_prefix='bridgy',
                               as_embed=as_embed)
    except (ValueError, RequestException):
        logger.info(f"Couldn't convert to ATProto", exc_info=True)
        return {}

    if from_proto != ATProto:
        if ret['$type'] == 'app.bsky.actor.profile':
            # populated by Protocol.convert
            if orig_summary := obj.as1.get('bridgyOriginalSummary'):
                ret['bridgyOriginalDescription'] = orig_summary
            else:
                # don't use granary's since it will include source links
                ret.pop('bridgyOriginalDescription', None)

            # bridged actors get a self label
            label_val = 'bridged-from-bridgy-fed'
            if from_proto:
                label_val += f'-{from_proto.LABEL}'
            ret.setdefault('labels', {'$type': 'com.atproto.label.defs#selfLabels'})
            ret['labels'].setdefault('values', []).append({'val': label_val})

        if (ret['$type'] in ('app.bsky.actor.profile', 'app.bsky.feed.post')
                and orig_url):
            # record the original (non-ATProto) URL on the bridged record
            ret['bridgyOriginalUrl'] = orig_url

    return ret
2023-09-14 16:42:11 +00:00
2024-06-06 14:50:16 +00:00
@classmethod
def add_source_links(cls, actor, obj, from_user):
    """Adds "bridged from ... by Bridgy Fed" text to ``obj.our_as1``.

    Overrides the default :meth:`protocol.Protocol.add_source_links`
    implementation to use plain text URLs because ``app.bsky.actor.profile``
    has no ``descriptionFacets`` for the ``description`` field.

    TODO: much of this duplicates
    :meth:`protocol.Protocol.add_source_links`. Refactor somehow.

    Args:
      obj (models.Object):
      from_user (models.User): user (actor) this activity/object is from
    """
    assert obj.our_as1
    assert from_user

    orig_summary = obj.our_as1.setdefault('summary', '')
    summary = html_to_text(orig_summary, ignore_links=True)
    # already has a source link? don't add another
    if 'fed.brid.gy ]' in summary or 'Bridgy Fed]' in summary:
        return

    # consumed by _convert above
    actor.setdefault('bridgyOriginalSummary', orig_summary)

    id = obj.key.id() if obj.key else obj.our_as1.get('id')
    proto_phrase = (PROTOCOLS[obj.source_protocol].PHRASE
                    if obj.source_protocol else '')
    if proto_phrase:
        proto_phrase = f' on {proto_phrase}'

    # prefer the user's own web URL when this object is the user/profile itself
    if from_user.key and id in (from_user.key.id(), from_user.profile_id()):
        url = from_user.web_url()
    else:
        url = as1.get_url(obj.our_as1) or id
    url = util.pretty_link(url) if url else '?'

    if from_user.LABEL == 'web':
        # link web users to their user pages
        source_links = f'[bridged from {url}{proto_phrase}: https://{PRIMARY_DOMAIN}{from_user.user_page_path()}]'
    else:
        source_links = f'[bridged from {url}{proto_phrase} by https://{PRIMARY_DOMAIN}/]'

    if summary:
        source_links = '\n\n' + source_links

    # truncate so that summary + source links fit Bluesky's length limits
    obj.our_as1['summary'] = Bluesky('unused').truncate(
        summary, url=source_links, punctuation=('', ''), type=obj.type)
2024-08-26 19:59:01 +00:00
def create_report(*, input, from_user):
    """Sends a ``createReport`` for a ``flag`` activity.

    Args:
      input (dict): ``createReport`` input
      from_user (models.User): user (actor) this flag is from

    Returns:
      bool: True if the report was sent successfully, False if the flag's
        actor is not bridged into ATProto
    """
    assert input['$type'] == 'com.atproto.moderation.createReport#input'

    # the reporting user must already be bridged into ATProto
    repo_did = from_user.get_copy(ATProto)
    if not repo_did:
        return False

    repo = arroba.server.storage.load_repo(repo_did)
    assert repo
    if repo.status:
        logger.info(f'{repo.did} is {repo.status}, giving up')
        return False

    # authenticate to the mod service with a service JWT signed by the
    # reporter's repo signing key
    mod_host = os.environ['MOD_SERVICE_HOST']
    token = service_jwt(host=mod_host, aud=os.environ['MOD_SERVICE_DID'],
                        repo_did=repo_did, privkey=repo.signing_key)

    headers = {
        'User-Agent': USER_AGENT,
        'Authorization': f'Bearer {token}',
    }
    mod_client = Client(f'https://{mod_host}', truncate=True, headers=headers)
    output = mod_client.com.atproto.moderation.createReport(input)
    logger.info(f'Created report on {mod_host}: {json_dumps(output)}')
    return True
2024-08-07 00:42:12 +00:00
2024-08-26 19:59:01 +00:00
def send_chat(*, msg, from_repo, to_did):
    """Sends a chat message to this user.

    Args:
      msg (dict): ``chat.bsky.convo.defs#messageInput``
      from_repo (arroba.repo.Repo)
      to_did (str)

    Returns:
      bool: True if the message was sent successfully, False otherwise, eg
        if the recipient has disabled chat
    """
    assert msg['$type'] == 'chat.bsky.convo.defs#messageInput'

    # find (or create) the 1:1 conversation with the recipient
    convo_client = chat_client(repo=from_repo,
                               method='chat.bsky.convo.getConvoForMembers')
    try:
        convo = convo_client.chat.bsky.convo.getConvoForMembers(members=[to_did])
    except RequestException as e:
        util.interpret_http_exception(e)
        resp = e.response
        if resp is not None and resp.status_code == 400:
            # recipient opted out of chat: report failure instead of raising
            body = resp.json()
            disabled = (body.get('error') == 'InvalidRequest'
                        and body.get('message') == 'recipient has disabled incoming messages')
            if disabled:
                return False
        raise

    send_client = chat_client(repo=from_repo, method='chat.bsky.convo.sendMessage')
    sent = send_client.chat.bsky.convo.sendMessage({
        'convoId': convo['convo']['id'],
        'message': msg,
    })
    logger.info(f'Sent chat message from {from_repo.handle} to {to_did}: {json_dumps(sent)}')
    return True
2024-08-28 05:44:18 +00:00
@cloud_tasks_only
def poll_chat_task():
    """Polls for incoming chat messages for our protocol bot users.

    Params:
      proto: protocol label, eg ``activitypub``
    """
    proto = PROTOCOLS[get_required_param('proto')]
    logger.info(f'Polling incoming chat messages for {proto.LABEL}')

    # imported here, not at module level — presumably to avoid an import
    # cycle; TODO confirm
    from web import Web
    bot = Web.get_by_id(proto.bot_user_id())
    assert bot.atproto_last_chat_log_cursor
    repo = arroba.server.storage.load_repo(bot.get_copy(ATProto))

    client = chat_client(repo=repo, method='chat.bsky.convo.getLog')
    while True:
        # getLog returns logs in ascending order, starting from cursor
        # https://github.com/bluesky-social/atproto/issues/2760
        #
        # we could use rev for idempotence, but we don't yet, since cursor alone
        # should mostly avoid dupes, and we also de-dupe on chat message id, so
        # we should hopefully be ok as is
        logs = client.chat.bsky.convo.getLog(cursor=bot.atproto_last_chat_log_cursor)
        for log in logs['logs']:
            if (log['$type'] == 'chat.bsky.convo.defs#logCreateMessage'
                    and log['message']['$type'] == 'chat.bsky.convo.defs#messageView'):
                sender = log['message']['sender']['did']
                # ignore messages the bot itself sent
                if sender != repo.did:
                    # generate synthetic at:// URI for this message
                    id = at_uri(did=sender,
                                collection='chat.bsky.convo.defs.messageView',
                                rkey=log['message']['id'])
                    msg_as1 = {
                        **bluesky.to_as1(log['message']),
                        'to': [bot.key.id()],
                    }
                    # hand the message off to the normal receive pipeline
                    common.create_task(queue='receive', id=id, bsky=log['message'],
                                       our_as1=msg_as1, source_protocol=ATProto.LABEL,
                                       authed_as=sender,
                                       received_at=log['message']['sentAt'])

        # check if we've caught up yet
        cursor = logs.get('cursor')
        if cursor:
            # advance; only persisted via bot.put() once fully caught up
            bot.atproto_last_chat_log_cursor = cursor
        if not logs['logs'] or not cursor:
            break

    # done!
    bot.put()
    return 'OK'