"""Protocol-independent code for sending and receiving DMs aka chat messages."""
from datetime import timedelta
import logging
from granary import as1, source
from oauth_dropins.webutil.flask_util import error
from oauth_dropins.webutil import util
from collections import namedtuple
from common import create_task, DOMAINS
import ids
import memcache
import models
from models import Object, PROTOCOLS
import protocol
logger = logging.getLogger(__name__)
REQUESTS_LIMIT_EXPIRE = timedelta(days=1)
REQUESTS_LIMIT_USER = 10
# populated by the command() decorator
_commands = {}
def command(names, arg=False, user_bridged=None, handle_bridged=None):
    """Function decorator. Defines and registers a DM command.
    Args:
      names (sequence of str): the command strings that trigger this command, or
        ``None`` if this command has no command string
      arg: whether this command takes an argument. ``False`` for no, ``True``
        for yes, anything, ``'handle'`` for yes, a handle in the bot account's
        protocol for a user that must not already be bridged.
      user_bridged (bool): whether the user sending the DM should be
        bridged. ``True`` for yes, ``False`` for no, ``None` for either.
      handle_bridged (bool): whether the handle arg should be bridged. ``True``
        for yes, ``False`` for no, ``None` for either.
    The decorated function should have the signature:
      (from_user, to_proto, arg=None, to_user=None) => (str, None)
    If it returns a string, that text is sent to the user as a reply to their DM.
    Args for the decorated function:
      from_user (models.User): the user who sent the DM
      to_proto (protocol.Protocol): the protocol bot account they sent it to
      arg (str or None): the argument to the command, if any
      to_user (models.User or None): the user for the argument, if it's a handle
    The decorated function returns:
      str: text to reply to the user in a DM, if any
    """
    assert arg in (False, True, 'handle'), arg
    if handle_bridged is not None:
        assert arg == 'handle', arg
    def decorator(fn):
        def wrapped(from_user, to_proto, cmd, cmd_arg, dm_as1):
            def reply(text, type=None):
                maybe_send(from_proto=to_proto, to_user=from_user, text=text,
                           type=type, in_reply_to=dm_as1.get('id'))
                return 'OK', 200
            if arg and not cmd_arg:
                return reply(f'{cmd} command needs an argument
{help_text(from_user, to_proto)}')
            if arg == 'handle':
                if not to_proto.owns_handle(cmd_arg) and cmd_arg.startswith('@'):
                    logging.info(f"doesn't look like a handle, trying without leading @")
                    cmd_arg = cmd_arg.removeprefix('@')
                to_user = load_user(to_proto, cmd_arg)
                from_proto = from_user.__class__
                if not to_user:
                    return reply(f"Couldn't find user {cmd_arg} on {to_proto.PHRASE}")
                elif (handle_bridged is not None
                      and handle_bridged != to_user.is_enabled(from_proto)):
                    return reply(f'{to_user.user_link(proto=from_proto)} is {"not" if handle_bridged else "already"} bridged into {from_proto.PHRASE}.')
            from_user_enabled = from_user.is_enabled(to_proto)
            if user_bridged is True and not from_user_enabled:
                return reply(f"Looks like you're not bridged to {to_proto.PHRASE} yet! Please bridge your account first by following this account.")
            elif user_bridged is False and from_user_enabled:
                return reply(f"Looks like you're already bridged to {to_proto.PHRASE}!")
            # dispatch!
            kwargs = {}
            if arg and cmd_arg:
                kwargs['arg'] = cmd_arg
            if arg == 'handle':
                kwargs['to_user'] = to_user
            reply_text = fn(from_user, to_proto, **kwargs)
            if reply_text:
                reply(reply_text)
            return 'OK', 200
        if names is None:
            assert None not in _commands
            _commands[None] = wrapped
        else:
            assert isinstance(names, (tuple, list))
            for name in names:
                _commands[name] = wrapped
        return wrapped
    return decorator
def help_text(from_user, to_proto):
    extra = ''
    if to_proto.LABEL == 'atproto':
        extra = """
Hi! I'm a friendly bot that can help you bridge your account into {to_proto.PHRASE}. Here are some commands I respond to:
{from_user.get_copy(PROTOCOLS["atproto"])}'
@command(['username', 'handle'], arg=True, user_bridged=True)
def username(from_user, to_proto, arg):
    try:
        to_proto.set_username(from_user, arg)
    except NotImplementedError:
        return f"Sorry, Bridgy Fed doesn't support custom usernames for {to_proto.PHRASE} yet."
    except (ValueError, RuntimeError) as e:
        return str(e)
    return f"Your username in {to_proto.PHRASE} has been set to {from_user.user_link(proto=to_proto, name=False, handle=True)}. It should appear soon!"
@command(['block'], arg='handle', user_bridged=True)
def block(from_user, to_proto, arg, to_user):
    id = f'{from_user.key.id()}#bridgy-fed-block-{util.now().isoformat()}'
    obj = Object(id=id, source_protocol=from_user.LABEL, our_as1={
        'objectType': 'activity',
        'verb': 'block',
        'id': id,
        'actor': from_user.key.id(),
        'object': to_user.key.id(),
    })
    obj.put()
    from_user.deliver(obj, from_user=from_user)
    return f"""OK, you're now blocking {to_user.user_link()} on {to_proto.PHRASE}."""
@command(['unblock'], arg='handle', user_bridged=True)
def unblock(from_user, to_proto, arg, to_user):
    id = f'{from_user.key.id()}#bridgy-fed-unblock-{util.now().isoformat()}'
    obj = Object(id=id, source_protocol=from_user.LABEL, our_as1={
        'objectType': 'activity',
        'verb': 'undo',
        'id': id,
        'actor': from_user.key.id(),
        'object': {
            'objectType': 'activity',
            'verb': 'block',
            'actor': from_user.key.id(),
            'object': to_user.key.id(),
        },
    })
    obj.put()
    from_user.deliver(obj, from_user=from_user)
    return f"""OK, you're not blocking {to_user.user_link()} on {to_proto.PHRASE}."""
@command(['migrate-to'], arg='handle', user_bridged=True)
def migrate_to(from_user, to_proto, arg, to_user):
    try:
        to_proto.migrate_out(from_user, to_user.key.id())
    except ValueError as e:
        return str(e)
    return f"OK, we'll migrate your bridged account on {to_proto.PHRASE} to {to_user.user_link()}."
@command(None, arg='handle', user_bridged=True)  # no command, just the handle, alone
def prompt(from_user, to_proto, arg, to_user):
    from_proto = from_user.__class__
    try:
        ids.translate_handle(handle=arg, from_=to_proto, to=from_user, enhanced=False)
    except ValueError as e:
        logger.warning(e)
        return f"Sorry, Bridgy Fed doesn't yet support bridging handle {arg} from {to_proto.PHRASE} to {from_proto.PHRASE}."
    if to_user.is_enabled(from_proto):
        # already bridged
        return f'{to_user.user_link(proto=from_proto)} is already bridged into {from_proto.PHRASE}.'
    elif (models.DM(protocol=from_proto.LABEL, type='request_bridging')
          in to_user.sent_dms):
        # already requested
        return f"We've already sent {to_user.user_link()} a DM. Fingers crossed!"
    # check and update rate limits
    attempts_key = f'dm-user-requests-{from_user.LABEL}-{from_user.key.id()}'
    # incr leaves existing expiration as is, doesn't change it
    # https://stackoverflow.com/a/4084043/186123
    attempts = memcache.memcache.incr(attempts_key, 1)
    if not attempts:
        memcache.memcache.add(
            attempts_key, 1,
            expire=int(REQUESTS_LIMIT_EXPIRE.total_seconds()))
    elif attempts > REQUESTS_LIMIT_USER:
        return f"Sorry, you've hit your limit of {REQUESTS_LIMIT_USER} requests per day. Try again tomorrow!"
    # send the DM request!
    maybe_send(from_proto=from_proto, to_user=to_user, type='request_bridging', text=f"""\
Hi! {from_user.user_link(proto=to_proto, proto_fallback=True)} is using Bridgy Fed to bridge their account from {from_proto.PHRASE} into {to_proto.PHRASE}, and they'd like to follow you. You can bridge your account into {from_proto.PHRASE} by following this account. See the docs for more information.
If you do nothing, your account won't be bridged, and users on {from_proto.PHRASE} won't be able to see or interact with you.
Bridgy Fed will only send you this message once.""") return f"Got it! We'll send {to_user.user_link()} a message and say that you hope they'll enable the bridge. Fingers crossed!" def maybe_send(*, from_proto, to_user, text, type=None, in_reply_to=None): """Sends a DM. Creates a task to send the DM asynchronously. If ``type`` is provided, and we've already sent this user a DM of this type from this protocol, does nothing. Args: from_proto (protocol.Protocol) to_user (models.User) text (str): message content. May be HTML. type (str): optional, one of DM.TYPES in_reply_to (str): optional, ``id`` of a DM to reply to """ if type: dm = models.DM(protocol=from_proto.LABEL, type=type) if dm in to_user.sent_dms: return from web import Web bot = Web.get_by_id(from_proto.bot_user_id()) logger.info(f'Sending DM from {bot.key.id()} to {to_user.key.id()} : {text}') if not to_user.obj or not to_user.obj.as1: logger.info(" can't send DM, recipient has no profile obj") return dm_id = f'{bot.profile_id()}#bridgy-fed-dm-{type or "?"}-{to_user.key.id()}-{util.now().isoformat()}' dm_as1 = { 'objectType': 'note', 'id': dm_id, 'author': bot.key.id(), 'content': text, 'inReplyTo': in_reply_to, 'tags': [{ 'objectType': 'mention', 'url': to_user.key.id(), }], 'to': [to_user.key.id()], } Object(id=dm_id, our_as1=dm_as1, source_protocol='web').put() create_id = f'{dm_id}-create' create_as1 = { 'objectType': 'activity', 'verb': 'post', 'id': create_id, 'actor': bot.key.id(), 'object': dm_as1, 'to': [to_user.key.id()], } target_uri = to_user.target_for(to_user.obj, shared=False) target = models.Target(protocol=to_user.LABEL, uri=target_uri) create_task(queue='send', id=create_id, our_as1=create_as1, source_protocol='web', protocol=to_user.LABEL, url=target.uri, user=bot.key.urlsafe()) if type: to_user.sent_dms.append(dm) to_user.put() def receive(*, from_user, obj): """Handles a DM that a user sent to one of our protocol bot users. Args: from_user (models.User) obj (Object): DM Returns: (str, int) tuple: (response body, HTTP status code) Flask response """ recip = as1.recipient_if_dm(obj.as1) assert recip to_proto = protocol.Protocol.for_bridgy_subdomain(recip) assert to_proto # already checked in check_supported call in Protocol.receive inner_as1 = (as1.get_object(obj.as1) if as1.object_type(obj.as1) == 'post' else obj.as1) logger.info(f'got DM from {from_user.key.id()} to {to_proto.LABEL}: {inner_as1.get("content")}') # parse message text = source.html_to_text(inner_as1.get('content', '')) tokens = text.strip().lower().split() logger.info(f' tokens: {tokens}') # remove @-mention of bot, if any bot_handles = (DOMAINS + ids.BOT_ACTOR_AP_IDS + tuple(h.lstrip('@') for h in ids.BOT_ACTOR_AP_HANDLES)) if tokens and tokens[0].lstrip('@') in bot_handles: logger.debug(f' first token is bot mention, removing') tokens = tokens[1:] if not tokens or len(tokens) > 2: return r'¯\_(ツ)_/¯', 204 if fn := _commands.get(tokens[0]): return fn(from_user, to_proto, dm_as1=inner_as1, cmd=tokens[0], cmd_arg=tokens[1] if len(tokens) == 2 else None) elif len(tokens) == 1: fn = _commands.get(None) assert fn, tokens[0] return fn(from_user, to_proto, dm_as1=inner_as1, cmd=None, cmd_arg=tokens[0]) return r'¯\_(ツ)_/¯', 204 def load_user(proto, handle): """ Args: proto (protocol.Protocol) handle (str) Returns: models.User or None """ if proto.owns_handle(handle) is False: return None if id := proto.handle_to_id(handle): if user := proto.get_or_create(id): if user.obj: return user