kopia lustrzana https://github.com/snarfed/bridgy-fed
				
				
				
			
		
			
				
	
	
		
			446 wiersze
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			446 wiersze
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
| """Translates user ids, handles, and object ids between protocols.
 | |
| 
 | |
| https://fed.brid.gy/docs#translate
 | |
| """
 | |
| import inspect
 | |
| import logging
 | |
| import re
 | |
| from threading import Lock
 | |
| from urllib.parse import urljoin, urlparse
 | |
| 
 | |
| from arroba.util import parse_at_uri
 | |
| from cachetools import cached, LRUCache
 | |
| from flask import request
 | |
| from google.cloud.ndb.query import FilterNode, Query
 | |
| from granary.bluesky import BSKY_APP_URL_RE, web_url_to_at_uri
 | |
| from oauth_dropins.webutil import util
 | |
| 
 | |
| from common import (
 | |
|     LOCAL_DOMAINS,
 | |
|     PRIMARY_DOMAIN,
 | |
|     PROTOCOL_DOMAINS,
 | |
|     subdomain_wrap,
 | |
|     SUPERDOMAIN,
 | |
|     unwrap,
 | |
| )
 | |
| import models
 | |
| 
 | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Protocols to check User.copies and Object.copies before translating.
# Starts out as None; populated in models.reset_protocol_properties.
COPIES_PROTOCOLS = None

# Webfinger allows all sorts of characters that ATProto handles and Nostr usernames
# don't, notably _ and ~. Map those to -.
# (The colon, :, is mostly just used in the fake protocols in unit tests.)
# https://www.rfc-editor.org/rfc/rfc7565.html#section-7
# https://atproto.com/specs/handle
# https://github.com/snarfed/bridgy-fed/issues/982
# https://github.com/swicg/activitypub-webfinger/issues/9
DASH_CHARS = ('_', '~', ':')

# ActivityPub ids and handles of our bot actors, one per protocol domain.
# We can't use translate_user_id to generate these because Web.owns_id checks
# valid_domain, which doesn't allow our protocol subdomains.
BOT_ACTOR_AP_IDS = tuple(f'https://{domain}/{domain}' for domain in PROTOCOL_DOMAINS)
BOT_ACTOR_AP_HANDLES = tuple(f'@{domain}@{domain}' for domain in PROTOCOL_DOMAINS)

# If the path for a URL on a subdomain starts with this, it's our own web page/post,
# not the subdomain protocol's.
INTERNAL_PATH_PREFIX = '/internal/'

# Domains that we set custom Bluesky subdomain handles for. They redirect their
# /.well-known/atproto-did path to fed.brid.gy for ATProto handle resolution.
# https://github.com/snarfed/bridgy-fed/issues/1305
# https://fed.brid.gy/docs#bluesky-handle-api
ATPROTO_HANDLE_DOMAINS = (
    'faithtree.social',
    'music-social.com',
)
 | |
| 
 | |
| 
 | |
def validate(id, from_, to):
    """Checks translation args and normalizes protocols to classes.

    All three args must be truthy. ``from_`` and ``to`` may each be either a
    class or an instance; instances are replaced with their class.

    Args:
      id (str)
      from_: protocol class or instance
      to: protocol class or instance

    Returns:
      tuple: ``(id, from_, to)``, where ``from_`` and ``to`` are classes

    Raises:
      AssertionError: if any arg is falsy
    """
    assert id and from_ and to, (id, from_, to)

    def as_class(proto):
        # classes pass through untouched; instances collapse to their class
        return proto if inspect.isclass(proto) else proto.__class__

    return id, as_class(from_), as_class(to)
 | |
| 
 | |
| 
 | |
@cached(LRUCache(10000), lock=Lock())
def web_ap_base_domain(user_domain):
    """Returns the base Bridgy Fed URL to use for a given Web user.

    Specifically, returns:

    * the current request's host URL, eg ``http://localhost/``, if we're
      running locally and ``user_domain`` isn't one of our own domains
    * ``https://[ap_subdomain].brid.gy/`` if a Web entity exists for this
      domain
    * ``https://web.brid.gy/`` otherwise

    Results are memoized in an LRU cache of up to 10000 entries.

    Args:
      user_domain (str)

    Returns:
      str: base URL, with trailing slash
    """
    if request.host in LOCAL_DOMAINS:
        is_our_domain = (user_domain == PRIMARY_DOMAIN
                         or user_domain in PROTOCOL_DOMAINS)
        if not is_our_domain:
            return request.host_url

    # imported here rather than at module level
    from web import Web
    web_user = Web.get_by_id(user_domain)
    if not web_user:
        return f'https://web{SUPERDOMAIN}/'

    return f'https://{web_user.ap_subdomain}{SUPERDOMAIN}/'
 | |
| 
 | |
| 
 | |
def translate_user_id(*, id, from_, to):
    """Translate a user id from one protocol to another.

    *NOTE*: unlike :func:`translate_object_id`, if ``to`` is a ``HAS_COPIES`` protocol
    and has no copy object for ``id``, this function returns None, not ``id``!

    TODO: unify with :func:`translate_object_id`.

    Args:
      id (str)
      from_ (protocol.Protocol): class or instance
      to (protocol.Protocol): class or instance

    Returns:
      str: the corresponding id in ``to``, or None if ``to`` is ATProto or
      Nostr and no id could be determined, eg there's no copy there yet or a
      bsky.app profile URL's handle couldn't be resolved

    Raises:
      AssertionError: if any arg is falsy, if ``from_`` doesn't own ``id``
        (and isn't the ``ui`` protocol), or if the protocol pair isn't handled
    """
    id, from_, to = validate(id, from_, to)

    # check for and handle our own subdomain-wrapped ids, eg
    # https://bsky.brid.gy/ap/did:plc:456
    from protocol import Protocol
    if domain_proto := Protocol.for_bridgy_subdomain(id, fed='web'):
        path = urlparse(id).path.strip('/').split('/')
        # only unwrap if the first path segment is from_'s abbreviation, or
        # if this is a single-segment path on a web subdomain and from_ is AP
        if (path[0] == from_.ABBREV
                or (from_.ABBREV == 'ap' and domain_proto.ABBREV == 'web'
                    and len(path) == 1)):
            id = unwrap(id)
            # from here on, treat the id as belonging to the subdomain's protocol
            from_ = domain_proto

    # owns_id may return something other than True/False; only an explicit
    # False fails here
    assert from_.owns_id(id) is not False or from_.LABEL == 'ui', \
        (id, from_.LABEL, to.LABEL)

    parsed = urlparse(id)
    if from_.LABEL == 'web' and parsed.path.strip('/') == '':
        # home page; replace with domain
        id = parsed.netloc

    # bsky.app profile URL to DID
    if to.LABEL == 'atproto':
        if match := BSKY_APP_URL_RE.match(id):
            repo = match.group('id')
            if repo.startswith('did:'):
                return repo

            # otherwise repo is a handle; resolve it to a DID
            from atproto import ATProto
            try:
                return ATProto.handle_to_id(repo)
            except (AssertionError, ValueError) as e:
                logger.warning(e)
                return None

    # same protocol on both sides: nothing more to translate. note that this
    # happens after the home page and bsky.app normalization above
    if from_ == to:
        return id

    # follow use_instead
    user = from_.get_by_id(id, allow_opt_out=True)
    if user:
        id = user.key.id()
        # prefer an existing copy of this user in the destination protocol
        if to.LABEL in COPIES_PROTOCOLS:
            if copy := user.get_copy(to):
                return copy

    # id may itself be a copy; if so, and the original user is in the
    # destination protocol, return the original's id
    if from_.LABEL in COPIES_PROTOCOLS:
        if orig := models.get_original_user_key(id):
            if orig.kind() == to._get_kind():
                return orig.id()

    # case order matters here: earlier cases take precedence over the later,
    # more general wildcard cases
    match from_.LABEL, to.LABEL:
        case _, 'atproto' | 'nostr':
            logger.debug(f"Can't translate user id {id} to {to.LABEL} , haven't copied it there yet!")
            return None

        case 'web', 'activitypub':
            return urljoin(web_ap_base_domain(id), id)

        case 'activitypub', 'web':
            return id

        case _, 'activitypub' | 'web':
            from activitypub import ActivityPub
            # known user who isn't enabled for ActivityPub: use their web URL
            if user and not user.is_enabled(ActivityPub):
                return user.web_url()
            return subdomain_wrap(from_, f'/{to.ABBREV}/{id}')

        # only for unit tests
        case _, 'fake' | 'other' | 'efake':
            return f'{to.LABEL}:u:{id}'
        case 'fake' | 'other', _:
            return id

    # no case matched; unsupported protocol pair
    assert False, (id, from_.LABEL, to.LABEL)
 | |
| 
 | |
| 
 | |
def normalize_user_id(*, id, proto):
    """Returns the canonical form of a user id within a single protocol.

    Examples:

    * Web:
      * user.com => user.com
      * www.user.com => user.com
      * https://user.com/ => user.com
    * ATProto:
      * did:plc:123 => did:plc:123
      * https://bsky.app/profile/did:plc:123 => did:plc:123

    Note that :func:`profile_id` is a narrower inverse of this; it converts
    user ids to profile ids.

    Args:
      id (str)
      proto (protocol.Protocol)

    Returns:
      str: the normalized user id
    """
    # same-protocol translation handles the protocol-independent cleanup
    normalized = translate_user_id(id=id, from_=proto, to=proto)

    label = proto.LABEL
    if label == 'web':
        return util.domain_from_link(normalized)

    if label == 'atproto' and id.startswith('at://'):
        # at:// URI; keep just its first component
        repo, _, _ = parse_at_uri(id)
        return repo

    if label == 'nostr':
        return id.removeprefix('nostr:')

    # only for unit tests
    if label in ('fake', 'efake', 'other'):
        return normalized.replace(':profile:', ':')

    return normalized
 | |
| 
 | |
| 
 | |
def profile_id(*, id, proto):
    """Converts a user id to the id of that user's profile object.

    Examples:

    * Web: user.com => https://user.com/
    * ActivityPub: https://inst.ance/alice => https://inst.ance/alice
    * ATProto: did:plc:123 => at://did:plc:123/app.bsky.actor.profile/self

    Note that :func:`normalize_user_id` does the inverse of this, ie converts
    profile ids to user ids.

    Args:
      id (str)
      proto (protocol.Protocol)

    Returns:
      str: the profile id

    Raises:
      AssertionError: if ``proto.owns_id(id)`` is False
    """
    assert proto.owns_id(id) is not False, (id, proto.LABEL)

    label = proto.LABEL

    if label == 'atproto':
        return f'at://{id}/app.bsky.actor.profile/self'

    # bare web domain; ids that are already URLs fall through unchanged
    if label == 'web' and not id.startswith(('https://', 'http://')):
        return f'https://{id}/'

    # only for unit tests
    if label == 'fake' and not id.startswith('fake:profile:'):
        return id.replace('fake:', 'fake:profile:')

    return id
 | |
| 
 | |
| 
 | |
def translate_handle(*, handle, from_, to, enhanced):
    """Translates a user handle from one protocol to another.

    Args:
      handle (str)
      from_ (protocol.Protocol): class or instance
      to (protocol.Protocol): class or instance
      enhanced (bool): whether to convert to an "enhanced" handle based on the
        user's domain

    TODO: drop enhanced arg, always use if available?

    Returns:
      str: the corresponding handle in ``to``

    Raises:
      ValueError: if ``handle`` isn't a valid handle for ``from_``, or if the
        translated handle isn't a valid handle for ``to``
      AssertionError: if any of ``handle``, ``from_``, ``to`` is falsy, or if
        the protocol pair isn't handled
    """
    handle, from_, to = validate(handle, from_, to)

    # same protocol on both sides: return as is, without validating
    if from_ == to:
        return handle

    # the ui protocol's handles aren't checked; for everything else, only an
    # explicit False from owns_handle is a failure
    if from_.LABEL != 'ui':
        if from_.owns_handle(handle, allow_internal=True) is False:
            raise ValueError(f'input handle {handle} is not valid for {from_.LABEL}')

    if from_.LABEL == 'nostr':
        # _ username is NIP-05 shortcut for just the domain itself
        # https://nips.nostr.com/5#showing-just-the-domain-as-an-identifier
        handle = handle.removeprefix('_@')

    # "flatten" [@]user@domain handles to just domain-like, eg user.domain,
    # and then append @[protocol domain], so we end up with user.domain@proto.brid.gy
    flattened = handle.lstrip('@').replace('@', '.')
    for from_char in DASH_CHARS:
        flattened = flattened.replace(from_char, '-')

    def flattened_user_at_domain():
        # default domain is from_'s subdomain of ours, eg web.brid.gy
        domain = f'{from_.ABBREV}{SUPERDOMAIN}'
        # enhanced handles, and our own domains, use the flattened handle
        # itself as the domain, eg user.com@user.com
        if enhanced or handle == PRIMARY_DOMAIN or handle in PROTOCOL_DOMAINS:
            domain = flattened
        return f'{flattened}@{domain}'

    output = None
    # case order matters here: the specific 'activitypub', 'web' case has to
    # come before the general _, 'web' case
    match from_.LABEL, to.LABEL:
        case _, 'activitypub':
            output = '@' + flattened_user_at_domain()

        case _, 'atproto':
            # our own domains are returned unchanged, skipping the
            # to.owns_handle check below
            if handle == PRIMARY_DOMAIN or handle in PROTOCOL_DOMAINS:
                return handle

            # domains with custom Bluesky handles use the flattened handle as is
            if util.domain_or_parent_in(flattened, ATPROTO_HANDLE_DOMAINS):
                output = flattened
            else:
                output = flattened_user_at_domain().replace('@', '.')

        case _, 'nostr':
            # our own domains use the NIP-05 _ shortcut username; this also
            # skips the to.owns_handle check below
            if handle == PRIMARY_DOMAIN or handle in PROTOCOL_DOMAINS:
                return f'_@{handle}'

            output = flattened_user_at_domain()

        case 'activitypub', 'web':
            # NOTE(review): this unpacking raises ValueError if the handle
            # doesn't have exactly one @ after the leading ones are stripped
            user, instance = handle.lstrip('@').split('@')
            # TODO: get this from the actor object's url field?
            output = (f'https://{user}' if user == instance
                    else f'https://{instance}/@{user}')

        case _, 'web':
            output = handle

        # only for unit tests
        case _, 'fake' | 'other' | 'efake':
            output = f'{to.LABEL}:handle:{handle}'

    # output is still None if no case matched, ie unsupported protocol pair
    assert output, (handle, from_.LABEL, to.LABEL)
    # don't check Web handles because they're sometimes URLs, eg
    # @user@instance => https://instance/@user
    if to.LABEL != 'web' and to.owns_handle(output, allow_internal=True) is False:
        raise ValueError(f"handle {handle} translated to {to.PHRASE} is {output}, which isn't supported there")

    return output
 | |
| 
 | |
| 
 | |
def translate_object_id(*, id, from_, to):
    """Translates an object id from one protocol to another.

    *NOTE*: unlike :func:`translate_user_id`, if ``to`` is a ``HAS_COPIES`` protocol
    and has no copy object for ``id``, this function returns ``id``, not None!

    TODO: unify with :func:`translate_user_id`.

    Args:
      id (str)
      from_ (protocol.Protocol): class or instance
      to (protocol.Protocol): class or instance

    Returns:
      str: the corresponding id in ``to``, or None if ``id`` is a bsky.app URL
      whose handle couldn't be resolved

    Raises:
      AssertionError: if any arg is falsy, if ``from_`` doesn't own ``id``
        (and isn't the ``ui`` protocol), or if the protocol pair isn't handled
    """
    id, from_, to = validate(id, from_, to)
    assert from_.owns_id(id) is not False or from_.LABEL == 'ui', (from_.LABEL, id)

    # bsky.app URL to at:// URI
    if to.LABEL == 'atproto' and (bsky_match := BSKY_APP_URL_RE.match(id)):
        did = bsky_match.group('id')
        handle = None
        if not did.startswith('did:'):
            # URL has a handle, not a DID; resolve it
            handle = did
            from atproto import ATProto
            try:
                did = ATProto.handle_to_id(handle)
            except (AssertionError, ValueError) as e:
                logger.warning(e)
                return None

        return web_url_to_at_uri(id, handle=handle, did=did)

    if from_ == to:
        return id

    # prefer an existing copy of this object in the destination protocol
    if to.LABEL in COPIES_PROTOCOLS:
        obj = from_.load(id, remote=False)
        if obj:
            copy = obj.get_copy(to)
            if copy:
                return copy

    # id may itself be a copy; if so, return the original object's id
    if from_.LABEL in COPIES_PROTOCOLS:
        orig = models.get_original_object_key(id)
        if orig:
            return orig.id()

    source, dest = from_.LABEL, to.LABEL

    if dest in ('atproto', 'nostr'):
        logger.debug(f"Can't translate object id {id} to {to.LABEL} , haven't copied it there yet!")
        return id

    if source == 'web' and dest == 'activitypub':
        return urljoin(web_ap_base_domain(util.domain_from_link(id)), f'/r/{id}')

    if dest in ('activitypub', 'web'):
        return subdomain_wrap(from_, f'/convert/{to.ABBREV}/{id}')

    # only for unit tests
    if dest in ('fake', 'other', 'efake'):
        return f'{to.LABEL}:o:{from_.ABBREV}:{id}'

    # unsupported protocol pair
    assert False, (id, from_.LABEL, to.LABEL)
 | |
| 
 | |
| 
 | |
def handle_as_domain(handle):
    """Converts a handle to a lower case, domain-like string.

    Lower cases the handle, strips leading @s, replaces remaining @s with .,
    and replaces each of the characters in ``DASH_CHARS`` (_ ~ :) with -.

    For example:
    * ``@user@instance.com`` => ``user.instance.com``
    * ``user_name@instance.com`` => ``user-name.instance.com``
    * ``@alice@inst~test.com`` => ``alice.inst-test.com``

    Args:
      handle (str or None)

    Returns:
      str or None: the domain-like handle, or None if ``handle`` is None or
      empty
    """
    if handle:
        domain = handle.lower().lstrip('@')
        domain = domain.replace('@', '.')
        for unsafe in DASH_CHARS:
            domain = domain.replace(unsafe, '-')
        return domain

    return None
 |