2024-05-07 23:58:52 +00:00
|
|
|
"""Bridgy Fed firehose client. Enqueues receive tasks for events for bridged users.
|
|
|
|
"""
|
2024-05-08 22:48:39 +00:00
|
|
|
from collections import namedtuple
|
2024-05-07 23:17:44 +00:00
|
|
|
import itertools
|
|
|
|
import logging
|
2024-05-07 21:15:51 +00:00
|
|
|
import os
|
2024-05-08 17:39:03 +00:00
|
|
|
from queue import SimpleQueue
|
2024-05-07 21:15:51 +00:00
|
|
|
|
|
|
|
from carbox import read_car
|
|
|
|
import dag_json
|
|
|
|
from granary.bluesky import AT_URI_PATTERN
|
|
|
|
from lexrpc.client import Client
|
2024-05-08 17:39:03 +00:00
|
|
|
from oauth_dropins.webutil import util
|
2024-05-07 21:15:51 +00:00
|
|
|
|
2024-05-07 23:17:44 +00:00
|
|
|
from atproto import ATProto
|
2024-05-08 17:39:03 +00:00
|
|
|
from common import add, create_task
|
2024-05-07 23:17:44 +00:00
|
|
|
import models
|
2024-05-08 17:39:03 +00:00
|
|
|
from models import Object
|
2024-05-07 23:17:44 +00:00
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
2024-05-07 21:15:51 +00:00
|
|
|
|
2024-05-08 22:48:39 +00:00
|
|
|
# A single commit operation off the firehose, roughly analogous to
# arroba.repo.Write. record is None for deletes (and before the CAR block
# for this op has been decoded).
Op = namedtuple('Op', 'action repo path record',
                defaults=(None,))  # record is optional

# queue of Ops, filled by subscribe() and drained by handle()
new_commits = SimpleQueue()
|
|
|
|
|
2024-05-07 21:15:51 +00:00
|
|
|
|
2024-05-07 23:58:52 +00:00
|
|
|
def subscribe():
    """Subscribes to the relay firehose and enqueues commits for our users.

    Loads the set of our native ATProto user DIDs and the set of DIDs bridged
    from other protocols, then consumes ``com.atproto.sync.subscribeRepos``
    indefinitely, putting interesting commits onto :data:`new_commits` as
    :class:`Op` tuples for :func:`handle` to process. Skips commits from
    bridged (non-ATProto) users, and detects records from other users that
    reference our bridged users, eg replies, likes, reposts, mentions, and
    quote posts.
    """
    logger.info(f'starting thread to consume firehose and detect our commits')

    # native ATProto users that have bridging enabled
    query = ATProto.query(ATProto.enabled_protocols != None)
    our_atproto_dids = frozenset(key.id() for key in query.iter(keys_only=True))

    # users on other protocols that we've bridged into ATProto
    other_queries = itertools.chain(*(
        cls.query(cls.copies.protocol == 'atproto').iter()
        for cls in set(models.PROTOCOLS.values())
        if cls and cls != ATProto))
    our_bridged_dids = frozenset(user.get_copy(ATProto) for user in other_queries)

    logger.info(f'Loaded {len(our_atproto_dids)} ATProto, {len(our_bridged_dids)} bridged dids')
    logger.info(f'Subscribing to {os.environ["BGS_HOST"]} firehose')

    client = Client(f'https://{os.environ["BGS_HOST"]}')
    cursor = None  # TODO

    for header, payload in client.com.atproto.sync.subscribeRepos(cursor=cursor):
        if header['op'] == -1:
            logger.warning(f'Got error from relay! {payload}')
            continue
        elif header['t'] == '#info':
            logger.info(f'Got info from relay: {payload}')
            continue
        elif header['t'] != '#commit':
            continue

        repo = payload.get('repo')
        assert repo
        if repo in our_bridged_dids:  # from a Bridgy Fed non-Bluesky user; ignore
            # logger.info(f'Ignoring record from our non-ATProto bridged user {repo}')
            continue

        blocks = {}
        if payload['blocks']:
            _, blocks = read_car(payload['blocks'])
            blocks = {block.cid: block for block in blocks}

        # detect records that reference an ATProto user, eg replies, likes,
        # reposts, mentions
        for p_op in payload['ops']:
            op = Op(repo=repo, action=p_op['action'], path=p_op['path'])
            assert op.action and op.path, (op.action, op.path)
            cid = p_op['cid']

            is_ours = repo in our_atproto_dids
            if is_ours and op.action == 'delete':
                logger.info(f'Got delete from our ATProto user: {op}')
                # TODO: also detect deletes of records that *reference* our bridged
                # users, eg a delete of a follow or like or repost of them.
                # not easy because we need to getRecord the record to check
                new_commits.put(op)
                continue

            block = blocks.get(cid)
            # our own commits are sometimes missing the record
            # https://github.com/snarfed/bridgy-fed/issues/1016
            if not block:
                continue

            op = Op(*op[:-1], record=block.decoded)
            type = op.record.get('$type')
            if not type:
                # fix: this message was missing its f prefix, so the
                # placeholders were never interpolated
                logger.warning(f'commit record missing $type! {op.action} {op.repo} {op.path} {cid}')
                logger.warning(dag_json.encode(op.record).decode())
                continue

            if is_ours:
                logger.info(f'Got one from our ATProto user: {op.action} {op.repo} {op.path}')
                new_commits.put(op)
                continue

            # DIDs of our bridged users that this record references
            subjects = []

            def maybe_add(did_or_ref):
                """Adds a DID to subjects if it's one of our bridged users.

                Accepts either a bare DID string or a strongRef/at:// dict
                with a 'uri' key.
                """
                if isinstance(did_or_ref, dict):
                    match = AT_URI_PATTERN.match(did_or_ref['uri'])
                    if match:
                        did = match.group('repo')
                    else:
                        return
                else:
                    did = did_or_ref

                if did and did in our_bridged_dids:
                    add(subjects, did)

            # likes, reposts, blocks, and follows all point at their target
            # via a 'subject' field (merged from two previously-duplicate
            # branches)
            if type in ('app.bsky.feed.like', 'app.bsky.feed.repost',
                        'app.bsky.graph.block', 'app.bsky.graph.follow'):
                maybe_add(op.record['subject'])

            elif type == 'app.bsky.feed.post':
                # replies
                if reply := op.record.get('reply'):
                    for ref in 'parent', 'root':
                        maybe_add(reply[ref])

                # mentions
                for facet in op.record.get('facets', []):
                    for feature in facet['features']:
                        if feature['$type'] == 'app.bsky.richtext.facet#mention':
                            maybe_add(feature['did'])

                # quote posts
                if embed := op.record.get('embed'):
                    if embed['$type'] == 'app.bsky.embed.record':
                        maybe_add(embed['record'])
                    elif embed['$type'] == 'app.bsky.embed.recordWithMedia':
                        maybe_add(embed['record']['record'])

            if subjects:
                logger.info(f'Got one re our ATProto users {subjects}: {op.action} {op.repo} {op.path}')
                new_commits.put(op)

    logger.info('Ran out of events! Relay closed connection?')
|
2024-05-07 23:58:52 +00:00
|
|
|
|
|
|
|
|
2024-05-09 03:19:18 +00:00
|
|
|
def handle(limit=None):
    """Store Objects and create receive tasks for commits as they arrive.

    :meth:`Object.get_or_create` makes network calls, via eg :meth:`Object.as1`
    => :meth:`ATProto.pds_for` and :meth:`ATProto.handle`, so we don't want to
    do those in the critical path in :func:`subscribe`.

    Args:
      limit (int): return after handling this many commits
    """
    logger.info(f'started thread to store objects and enqueue receive tasks')

    count = 0
    # blocks until subscribe() puts the next Op onto the queue
    while op := new_commits.get():
        at_uri = f'at://{op.repo}/{op.path}'

        # store object, enqueue receive task
        # TODO: for Object.bsky, does record have CIDs etc? how do we store?
        # dag-json? how are polls doing this?
        if op.action in ('create', 'update'):
            record_kwarg = {'bsky': op.record}
            obj_id = at_uri
        elif op.action == 'delete':
            obj_id = f'{at_uri}#delete'
            # synthesize an AS1 delete activity, since deletes have no record
            record_kwarg = {'our_as1': {
                'objectType': 'activity',
                'verb': 'delete',
                'id': obj_id,
                'actor': op.repo,
                'object': at_uri,
            }}
        else:
            # fix: originally referenced undefined name `action`, which would
            # have raised NameError instead of logging; use op.action
            logger.error(f'Unknown action {op.action} for {op.repo} {op.path}')
            continue

        obj = Object.get_or_create(id=obj_id, actor=op.repo, status='new',
                                   users=[ATProto(id=op.repo).key],
                                   source_protocol=ATProto.LABEL, **record_kwarg)

        create_task(queue='receive', obj=obj.key.urlsafe(), authed_as=op.repo)

        count += 1
        if limit is not None and count >= limit:
            return

    # new_commits.get() blocks forever, so the loop should never exit normally
    assert False, "enqueue thread shouldn't reach here!"
|
2024-05-08 00:36:34 +00:00
|
|
|
|
2024-05-07 23:58:52 +00:00
|
|
|
|
|
|
|
# Standalone entry point: run the firehose subscriber directly.
if __name__ == '__main__':
    from oauth_dropins.webutil import appengine_config
    # NOTE(review): activitypub and web appear to be imported only for their
    # side effects — presumably registering protocols that subscribe() finds
    # via models.PROTOCOLS — confirm, since neither name is used below
    import activitypub, web

    # ndb client context is required for the datastore queries in subscribe()
    with appengine_config.ndb_client.context():
        subscribe()
|