"""Base protocol class and common code."""
import logging
import threading

from cachetools import cached, LRUCache
from flask import g
from google.cloud import ndb
from google.cloud.ndb import OR
from granary import as1, as2

import common
from common import error
from models import Follower, Object, Target
from oauth_dropins.webutil import util, webmention
from oauth_dropins.webutil.util import json_dumps, json_loads

# AS1 activity/object types that Protocol.receive will process; anything else
# is rejected with HTTP 501.
SUPPORTED_TYPES = (
    'accept',
    'article',
    'audio',
    'comment',
    'create',
    'delete',
    'follow',
    'image',
    'like',
    'note',
    'post',
    'share',
    'stop-following',
    'undo',
    'update',
    'video',
)

# activity ids that we've already handled and can now ignore.
# used in Protocol.receive
seen_ids = LRUCache(100000)
seen_ids_lock = threading.Lock()

# objects that have been loaded in Protocol.load
objects_cache = LRUCache(5000)
objects_cache_lock = threading.Lock()

logger = logging.getLogger(__name__)


class Protocol:
    """Base protocol class. Not to be instantiated; classmethods only.

    Attributes:
      LABEL: str, label used in `Object.source_protocol`
    """
    LABEL = None

    def __init__(self):
        # Raise explicitly instead of `assert False`: assert statements are
        # removed when Python runs with -O, which would silently allow
        # instantiation.
        raise AssertionError('Protocol classes are not meant to be instantiated')

    @classmethod
    def send(cls, obj, url, log_data=True):
        """Sends an outgoing activity.

        To be implemented by subclasses.

        Args:
          obj: :class:`Object` with activity to send
          url: str, destination URL to send to
          log_data: boolean, whether to log full data object

        Returns:
          True if the activity was sent successfully, False if it was
          discarded or ignored due to protocol logic, ie not network or other
          failures. (Failures are raised as exceptions.)

        Raises:
          :class:`werkzeug.HTTPException` if the request fails
        """
        raise NotImplementedError()

    @classmethod
    def fetch(cls, obj):
        """Fetches a protocol-specific object and returns it in an :class:`Object`.

        To be implemented by subclasses. The returned :class:`Object` is loaded
        from the datastore, if it exists there, then updated in memory but not
        yet written back to the datastore.

        Args:
          obj: :class:`Object` with the id to fetch. Data is filled into one of
            the protocol-specific properties, eg as2, mf2, bsky.

        Raises:
          :class:`werkzeug.HTTPException` if the fetch fails
        """
        raise NotImplementedError()

    @classmethod
    def serve(cls, obj):
        """Returns this protocol's Flask response for a given :class:`Object`.

        For example, an HTML string and `'text/html'` for :class:`Web`, or a
        dict with AS2 JSON and `'application/activity+json'` for
        :class:`ActivityPub`.

        To be implemented by subclasses.

        Args:
          obj: :class:`Object`

        Returns:
          (response body, dict with HTTP headers) tuple appropriate to be
          returned from a Flask handler
        """
        raise NotImplementedError()

    @classmethod
    def receive(cls, id, **props):
        """Handles an incoming activity.

        Stores the activity as an :class:`Object`, applies type-specific
        handling (accept, stop-following, update, delete, follow), delivers it
        to its targets via :meth:`deliver`, and finally adds active followers'
        domains to the object so it shows up in their feeds.

        Args:
          id: str, activity id
          props: property values to populate into the :class:`Object`

        Returns:
          Flask response: the str `'OK'` once the activity has been handled, or
          a (response body, HTTP status code) tuple if the activity was already
          handled earlier

        Raises:
          :class:`werkzeug.HTTPException` if the request is invalid
        """
        if not id:
            error('Activity has no id')

        # short circuit if we've already seen this activity id. only the cache
        # test-and-set needs the lock, not the datastore lookup.
        with seen_ids_lock:
            already_seen = id in seen_ids
            seen_ids[id] = True

        if already_seen or Object.get_by_id(id):
            msg = f'Already handled this activity {id}'
            logger.info(msg)
            return msg, 200

        obj = Object.get_or_insert(id)
        obj.clear()
        obj.populate(source_protocol=cls.LABEL, **props)
        obj.put()

        logger.info(f'Got AS1: {json_dumps(obj.as1, indent=2)}')

        if obj.type not in SUPPORTED_TYPES:
            error(f'Sorry, {obj.type} activities are not supported yet.', status=501)

        # store inner object, but only if it has more than just an id
        inner_obj = as1.get_object(obj.as1)
        inner_obj_id = inner_obj.get('id')
        if obj.type in ('post', 'create', 'update') and inner_obj.keys() > {'id'}:
            to_update = (Object.get_by_id(inner_obj_id)
                         or Object(id=inner_obj_id))
            to_update.populate(as2=obj.as2['object'], source_protocol=cls.LABEL)
            to_update.put()

        actor = as1.get_object(obj.as1, 'actor')
        actor_id = actor.get('id')

        # handle activity!
        if obj.type == 'accept':  # eg in response to a Follow
            return 'OK'  # noop

        elif obj.type == 'stop-following':
            if not actor_id or not inner_obj_id:
                error(f'Undo of Follow requires actor id and object id. Got: {actor_id} {inner_obj_id} {obj.as1}')

            # deactivate Follower
            followee_domain = util.domain_from_link(inner_obj_id, minimize=False)
            follower = Follower.get_by_id(
                Follower._id(dest=followee_domain, src=actor_id))
            if follower:
                logger.info(f'Marking {follower} inactive')
                follower.status = 'inactive'
                follower.put()
            else:
                logger.warning(f'No Follower found for {followee_domain} {actor_id}')

            # TODO send webmention with 410 of u-follow

            obj.status = 'complete'
            obj.put()
            return 'OK'

        elif obj.type == 'update':
            if not inner_obj_id:
                error("Couldn't find id of object to update")

            # the inner object itself was already stored above
            obj.status = 'complete'
            obj.put()
            return 'OK'

        elif obj.type == 'delete':
            if not inner_obj_id:
                error("Couldn't find id of object to delete")

            to_delete = Object.get_by_id(inner_obj_id)
            if to_delete:
                logger.info(f'Marking Object {inner_obj_id} deleted')
                to_delete.deleted = True
                to_delete.put()

            # assume this is an actor
            # https://github.com/snarfed/bridgy-fed/issues/63
            logger.info(f'Deactivating Followers with src or dest = {inner_obj_id}')
            followers = Follower.query(OR(Follower.src == inner_obj_id,
                                          Follower.dest == inner_obj_id)
                                       ).fetch()
            for f in followers:
                f.status = 'inactive'
            obj.status = 'complete'
            ndb.put_multi(followers + [obj])
            return 'OK'

        # fetch actor if necessary so we have name, profile photo, etc
        if actor and actor.keys() == {'id'}:
            actor = obj.as2['actor'] = cls.load(actor['id']).as2

        # fetch object if necessary so we can render it in feeds
        if obj.type == 'share' and inner_obj.keys() == {'id'}:
            inner_obj = obj.as2['object'] = as2.from_as1(
                cls.load(inner_obj_id).as1)

        if obj.type == 'follow':
            cls.accept_follow(obj)

        # deliver to each target
        cls.deliver(obj)

        # deliver original posts and reposts to followers
        is_reply = (obj.type == 'comment' or
                    (inner_obj and inner_obj.get('inReplyTo')))
        if (actor and actor_id and
                (obj.type == 'share' or obj.type in ('create', 'post') and not is_reply)):
            logger.info(f'Delivering to followers of {actor_id}')
            for f in Follower.query(Follower.dest == actor_id,
                                    Follower.status == 'active',
                                    projection=[Follower.src]):
                if f.src not in obj.domains:
                    obj.domains.append(f.src)
            if obj.domains and 'feed' not in obj.labels:
                obj.labels.append('feed')

        obj.put()
        return 'OK'

    @classmethod
    def accept_follow(cls, obj):
        """Replies to an AP Follow request with an Accept request.

        Also stores (or reactivates) the :class:`Follower` for the current
        user, `g.user`.

        Args:
          obj: :class:`Object`, follow activity

        Returns:
          whatever :meth:`send` returns for the Accept activity
        """
        logger.info('Replying to Follow with Accept')

        followee = as1.get_object(obj.as1)
        followee_id = followee.get('id')
        follower = as1.get_object(obj.as1, 'actor')
        if not followee or not followee_id or not follower:
            error(f'Follow activity requires object and actor. Got: {obj.as1}')

        inbox = follower.get('inbox')
        follower_id = follower.get('id')
        if not inbox or not follower_id:
            error(f'Follow actor requires id and inbox. Got: {follower}')

        # store Follower
        follower_obj = Follower.get_or_create(
            dest=g.user.key.id(), src=follower_id, last_follow=obj.as2)
        follower_obj.status = 'active'
        follower_obj.put()

        # send AP Accept
        followee_actor_url = g.user.actor_id()
        accept = {
            '@context': 'https://www.w3.org/ns/activitystreams',
            'id': common.host_url(f'/user/{g.user.key.id()}/followers#accept-{obj.key.id()}'),
            'type': 'Accept',
            'actor': followee_actor_url,
            'object': as2.from_as1(obj.as1),
        }

        return cls.send(Object(as2=accept), inbox)

    @classmethod
    def deliver(cls, obj):
        """Delivers an activity to its external recipients.

        Targets are collected from inReplyTo, mention tags, and (for follows,
        likes, and shares) the inner object. Each one is sent via `Web.send`,
        and the :class:`Object`'s undelivered/delivered/failed lists, domains,
        labels, and status are updated along the way.

        Args:
          obj: :class:`Object`, activity to deliver

        Raises:
          :class:`werkzeug.HTTPException` if no source URL can be found or if
          any delivery failed with an HTTP error. Exceptions that can't be
          interpreted as HTTP errors are re-raised as is.
        """
        # extract source and targets
        source = obj.as1.get('url') or obj.as1.get('id')
        inner_obj = as1.get_object(obj.as1)
        obj_url = util.get_url(inner_obj) or inner_obj.get('id')

        if not source or obj.type in ('create', 'post', 'update'):
            source = obj_url
        if not source:
            error("Couldn't find source post URL")

        targets = util.get_list(obj.as1, 'inReplyTo')
        targets.extend(util.get_list(inner_obj, 'inReplyTo'))

        for tag in (util.get_list(obj.as1, 'tags') +
                    util.get_list(inner_obj, 'tags')):
            if tag.get('objectType') == 'mention':
                url = tag.get('url')
                if url:
                    targets.append(url)

        if obj.type in ('follow', 'like', 'share'):
            targets.append(obj_url)

        targets = util.dedupe_urls(util.get_url(t) for t in targets)
        targets = common.remove_blocklisted(t.lower() for t in targets)
        if not targets:
            logger.info("Couldn't find any IndieWeb target URLs in inReplyTo, object, or mention tags")
            return

        logger.info(f'targets: {targets}')

        # send webmentions and update Object
        errors = []  # stores (code, body) tuples
        targets = [Target(uri=uri, protocol='web') for uri in targets]

        obj.populate(
            undelivered=targets,
            status='in progress',
        )

        # loop invariants, computed once
        source_domain = util.domain_from_link(source, minimize=False)
        # TODO: fix
        from web import Web

        while obj.undelivered:
            target = obj.undelivered.pop()
            domain = util.domain_from_link(target.uri, minimize=False)
            if (g.user and domain == g.user.key.id()
                    and 'notification' not in obj.labels):
                obj.labels.append('notification')

            if domain == source_domain:
                logger.info(f'Skipping same-domain webmention from {source} to {target.uri}')
                continue

            if domain not in obj.domains:
                obj.domains.append(domain)

            try:
                if Web.send(obj, target.uri):
                    obj.delivered.append(target)
                    if 'notification' not in obj.labels:
                        obj.labels.append('notification')
            except BaseException as e:
                code, body = util.interpret_http_exception(e)
                if not code and not body:
                    # not an HTTP error we know how to report; propagate
                    raise
                errors.append((code, body))
                obj.failed.append(target)

            # save progress after every attempt
            obj.put()

        obj.status = ('complete' if obj.delivered or obj.domains
                      else 'failed' if obj.failed
                      else 'ignored')

        if errors:
            # error() aborts the request by raising, so store the final status
            # first. otherwise the Object would stay 'in progress'.
            obj.put()
            msg = 'Errors: ' + ', '.join(f'{code} {body}' for code, body in errors)
            error(msg, status=int(errors[0][0] or 502))

    @classmethod
    def load(cls, id, refresh=False, **kwargs):
        """Loads and returns an Object from memory cache, datastore, or HTTP fetch.

        Assumes id is a URL. Any fragment at the end is stripped before loading.
        This is currently underspecified and somewhat inconsistent across AP
        implementations:

        https://socialhub.activitypub.rocks/t/problems-posting-to-mastodon-inbox/801/11
        https://socialhub.activitypub.rocks/t/problems-posting-to-mastodon-inbox/801/23
        https://socialhub.activitypub.rocks/t/s2s-create-activity/1647/5
        https://github.com/mastodon/mastodon/issues/13879 (open!)
        https://github.com/w3c/activitypub/issues/224

        NOTE(review): this method itself doesn't strip the fragment; presumably
        callers or :meth:`fetch` do. TODO confirm.

        Note that :meth:`Object._post_put_hook` updates the cache.

        The returned object has `new` set (True if it wasn't stored before),
        and `changed` set when it was fetched remotely.

        Args:
          id: str
          refresh: boolean, whether to fetch the object remotely even if we have
            it stored
          kwargs: passed through to fetch()

        Returns:
          :class:`Object`

        Raises:
          :class:`requests.HTTPError`, anything else that :meth:`fetch` raises
        """
        if not refresh:
            with objects_cache_lock:
                cached_obj = objects_cache.get(id)
            if cached_obj:
                return cached_obj

        logger.info(f'Loading Object {id}')
        orig_as1 = None
        obj = Object.get_by_id(id)
        if obj and (obj.as1 or obj.deleted):
            logger.info(' got from datastore')
            obj.new = False
            # remembered so that we can detect changes after a refresh
            orig_as1 = obj.as1
            if not refresh:
                with objects_cache_lock:
                    objects_cache[id] = obj
                return obj

        if refresh:
            logger.info(' forced refresh requested')

        if obj:
            obj.clear()
        else:
            logger.info(' not in datastore')
            obj = Object(id=id)
            obj.new = True
            obj.changed = False

        cls.fetch(obj, **kwargs)
        if orig_as1:
            obj.new = False
            obj.changed = as1.activity_changed(orig_as1, obj.as1)

        obj.source_protocol = cls.LABEL
        obj.put()

        with objects_cache_lock:
            objects_cache[id] = obj
        return obj