kopia lustrzana https://github.com/snarfed/bridgy-fed
add Protocol.owns_handle (and in subclasses)
rodzic
6a6a1657a7
commit
0d33b6422d
|
@ -122,6 +122,18 @@ class ActivityPub(User, Protocol):
|
||||||
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def owns_handle(cls, handle):
|
||||||
|
"""Returns True if handle is a WebFinger @-@, False otherwise.
|
||||||
|
|
||||||
|
Example: ``@user@instance.com``. The leading ``@`` is optional.
|
||||||
|
|
||||||
|
https://datatracker.ietf.org/doc/html/rfc7033#section-3.1
|
||||||
|
https://datatracker.ietf.org/doc/html/rfc7033#section-4.5
|
||||||
|
"""
|
||||||
|
parts = handle.lstrip('@').split('@')
|
||||||
|
return len(parts) == 2 and parts[0] and parts[1]
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def target_for(cls, obj, shared=False):
|
def target_for(cls, obj, shared=False):
|
||||||
"""Returns `obj`'s or its author's/actor's inbox, if available."""
|
"""Returns `obj`'s or its author's/actor's inbox, if available."""
|
||||||
|
|
|
@ -23,6 +23,7 @@ import common
|
||||||
from common import (
|
from common import (
|
||||||
add,
|
add,
|
||||||
DOMAIN_BLOCKLIST,
|
DOMAIN_BLOCKLIST,
|
||||||
|
DOMAIN_RE,
|
||||||
error,
|
error,
|
||||||
USER_AGENT,
|
USER_AGENT,
|
||||||
)
|
)
|
||||||
|
@ -93,6 +94,11 @@ class ATProto(User, Protocol):
|
||||||
or id.startswith('did:web:')
|
or id.startswith('did:web:')
|
||||||
or id.startswith('https://bsky.app/'))
|
or id.startswith('https://bsky.app/'))
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def owns_handle(cls, handle):
|
||||||
|
if not re.match(DOMAIN_RE, handle):
|
||||||
|
return False
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def target_for(cls, obj, shared=False):
|
def target_for(cls, obj, shared=False):
|
||||||
"""Returns the PDS URL for the given object, or None.
|
"""Returns the PDS URL for the given object, or None.
|
||||||
|
|
5
ids.py
5
ids.py
|
@ -64,10 +64,7 @@ def convert_handle(*, handle, from_proto, to_proto):
|
||||||
"""
|
"""
|
||||||
assert handle and from_proto and to_proto
|
assert handle and from_proto and to_proto
|
||||||
assert from_proto != to_proto
|
assert from_proto != to_proto
|
||||||
|
assert from_proto.owns_handle(handle) is not False
|
||||||
if from_proto in (Web, ATProto):
|
|
||||||
# Web, ATProto, Nostr handles are all domains
|
|
||||||
assert re.match(DOMAIN_RE, handle)
|
|
||||||
|
|
||||||
match (from_proto.LABEL, to_proto.LABEL):
|
match (from_proto.LABEL, to_proto.LABEL):
|
||||||
case (_, 'activitypub'):
|
case (_, 'activitypub'):
|
||||||
|
|
27
protocol.py
27
protocol.py
|
@ -138,10 +138,33 @@ class Protocol:
|
||||||
Returns False if the id's domain is in :attr:`common.DOMAIN_BLOCKLIST`.
|
Returns False if the id's domain is in :attr:`common.DOMAIN_BLOCKLIST`.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
id: str
|
id (str)
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
boolean or None
|
bool or None
|
||||||
|
"""
|
||||||
|
return False
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def owns_handle(cls, handle):
|
||||||
|
"""Returns whether this protocol owns the handle, or None if it's unclear.
|
||||||
|
|
||||||
|
To be implemented by subclasses.
|
||||||
|
|
||||||
|
Some protocols' handles are more or less deterministic based on the id
|
||||||
|
format, eg ActivityPub (technically WebFinger) handles are
|
||||||
|
``@user@instance.com``. Others, like domains, could be owned by eg Web,
|
||||||
|
ActivityPub, AT Protocol, or others.
|
||||||
|
|
||||||
|
This should be a quick guess without expensive side effects, eg no
|
||||||
|
external HTTP fetches to fetch the id itself or otherwise perform
|
||||||
|
discovery.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
handle (str)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool or None
|
||||||
"""
|
"""
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
|
@ -1497,6 +1497,17 @@ class ActivityPubUtilsTest(TestCase):
|
||||||
self.assertFalse(ActivityPub.owns_id('https://twitter.com/foo'))
|
self.assertFalse(ActivityPub.owns_id('https://twitter.com/foo'))
|
||||||
self.assertFalse(ActivityPub.owns_id('https://fed.brid.gy/foo'))
|
self.assertFalse(ActivityPub.owns_id('https://fed.brid.gy/foo'))
|
||||||
|
|
||||||
|
def test_owns_handle(self):
|
||||||
|
for handle in ('@user@instance', 'user@instance.com', 'user.com@instance.com',
|
||||||
|
'user@instance'):
|
||||||
|
with self.subTest(handle=handle):
|
||||||
|
assert ActivityPub.owns_handle(handle)
|
||||||
|
|
||||||
|
for handle in ('instance', 'instance.com', '@user', '@user.com',
|
||||||
|
'http://user.com'):
|
||||||
|
with self.subTest(handle=handle):
|
||||||
|
self.assertFalse(ActivityPub.owns_handle(handle))
|
||||||
|
|
||||||
def test_postprocess_as2_multiple_in_reply_tos(self):
|
def test_postprocess_as2_multiple_in_reply_tos(self):
|
||||||
self.assert_equals({
|
self.assert_equals({
|
||||||
'id': 'http://localhost/r/xyz',
|
'id': 'http://localhost/r/xyz',
|
||||||
|
|
|
@ -90,6 +90,16 @@ class ATProtoTest(TestCase):
|
||||||
self.assertTrue(ATProto.owns_id(
|
self.assertTrue(ATProto.owns_id(
|
||||||
'https://bsky.app/profile/snarfed.org/post/3k62u4ht77f2z'))
|
'https://bsky.app/profile/snarfed.org/post/3k62u4ht77f2z'))
|
||||||
|
|
||||||
|
def test_owns_handle(self):
|
||||||
|
self.assertIsNone(ATProto.owns_handle('foo.com'))
|
||||||
|
self.assertIsNone(ATProto.owns_handle('foo.bar.com'))
|
||||||
|
|
||||||
|
self.assertFalse(ATProto.owns_handle('foo'))
|
||||||
|
self.assertFalse(ATProto.owns_handle('@foo'))
|
||||||
|
self.assertFalse(ATProto.owns_handle('@foo.com'))
|
||||||
|
self.assertFalse(ATProto.owns_handle('@foo@bar.com'))
|
||||||
|
self.assertFalse(ATProto.owns_handle('foo@bar.com'))
|
||||||
|
|
||||||
def test_target_for_did_doc(self):
|
def test_target_for_did_doc(self):
|
||||||
self.assertIsNone(ATProto.target_for(Object(id='did:plc:foo')))
|
self.assertIsNone(ATProto.target_for(Object(id='did:plc:foo')))
|
||||||
|
|
||||||
|
|
|
@ -1900,6 +1900,16 @@ class WebUtilTest(TestCase):
|
||||||
self.assertFalse(Web.owns_id('https://twitter.com/foo'))
|
self.assertFalse(Web.owns_id('https://twitter.com/foo'))
|
||||||
self.assertFalse(Web.owns_id('https://fed.brid.gy/foo'))
|
self.assertFalse(Web.owns_id('https://fed.brid.gy/foo'))
|
||||||
|
|
||||||
|
def test_owns_handle(self, *_):
|
||||||
|
self.assertIsNone(Web.owns_handle('foo.com'))
|
||||||
|
self.assertIsNone(Web.owns_handle('foo.bar.com'))
|
||||||
|
|
||||||
|
self.assertFalse(Web.owns_handle('foo'))
|
||||||
|
self.assertFalse(Web.owns_handle('@foo'))
|
||||||
|
self.assertFalse(Web.owns_handle('@foo.com'))
|
||||||
|
self.assertFalse(Web.owns_handle('@foo@bar.com'))
|
||||||
|
self.assertFalse(Web.owns_handle('foo@bar.com'))
|
||||||
|
|
||||||
def test_fetch(self, mock_get, __):
|
def test_fetch(self, mock_get, __):
|
||||||
mock_get.return_value = REPOST
|
mock_get.return_value = REPOST
|
||||||
|
|
||||||
|
|
|
@ -89,6 +89,8 @@ class Fake(User, protocol.Protocol):
|
||||||
|
|
||||||
return id.startswith('fake:') or id in cls.fetchable
|
return id.startswith('fake:') or id in cls.fetchable
|
||||||
|
|
||||||
|
owns_handle = owns_id
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def is_blocklisted(cls, url):
|
def is_blocklisted(cls, url):
|
||||||
return url.startswith('fake:blocklisted')
|
return url.startswith('fake:blocklisted')
|
||||||
|
|
11
web.py
11
web.py
|
@ -21,7 +21,7 @@ from requests import HTTPError, RequestException
|
||||||
from werkzeug.exceptions import BadGateway, BadRequest, HTTPException, NotFound
|
from werkzeug.exceptions import BadGateway, BadRequest, HTTPException, NotFound
|
||||||
|
|
||||||
import common
|
import common
|
||||||
from common import add
|
from common import add, DOMAIN_RE
|
||||||
from flask_app import app, cache
|
from flask_app import app, cache
|
||||||
from models import Follower, Object, PROTOCOLS, Target, User
|
from models import Follower, Object, PROTOCOLS, Target, User
|
||||||
from protocol import Protocol
|
from protocol import Protocol
|
||||||
|
@ -67,7 +67,7 @@ class Web(User, Protocol):
|
||||||
"""Validate domain id, don't allow upper case or invalid characters."""
|
"""Validate domain id, don't allow upper case or invalid characters."""
|
||||||
super()._pre_put_hook()
|
super()._pre_put_hook()
|
||||||
id = self.key.id()
|
id = self.key.id()
|
||||||
assert re.match(common.DOMAIN_RE, id)
|
assert re.match(DOMAIN_RE, id)
|
||||||
assert id.lower() == id, f'upper case is not allowed in Web key id: {id}'
|
assert id.lower() == id, f'upper case is not allowed in Web key id: {id}'
|
||||||
assert not self.is_blocklisted(id), f'{id} is a blocked domain'
|
assert not self.is_blocklisted(id), f'{id} is a blocked domain'
|
||||||
|
|
||||||
|
@ -234,7 +234,7 @@ class Web(User, Protocol):
|
||||||
if parsed.path in ('', '/'):
|
if parsed.path in ('', '/'):
|
||||||
id = parsed.netloc
|
id = parsed.netloc
|
||||||
|
|
||||||
if re.match(common.DOMAIN_RE, id):
|
if re.match(DOMAIN_RE, id):
|
||||||
tld = id.split('.')[-1]
|
tld = id.split('.')[-1]
|
||||||
if tld in NON_TLDS:
|
if tld in NON_TLDS:
|
||||||
logger.info(f"{id} looks like a domain but {tld} isn't a TLD")
|
logger.info(f"{id} looks like a domain but {tld} isn't a TLD")
|
||||||
|
@ -260,6 +260,11 @@ class Web(User, Protocol):
|
||||||
|
|
||||||
return None if util.is_web(id) else False
|
return None if util.is_web(id) else False
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def owns_handle(cls, handle):
|
||||||
|
if not re.match(DOMAIN_RE, handle):
|
||||||
|
return False
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def target_for(cls, obj, shared=False):
|
def target_for(cls, obj, shared=False):
|
||||||
"""Returns `obj`'s id, as a URL webmention target."""
|
"""Returns `obj`'s id, as a URL webmention target."""
|
||||||
|
|
Ładowanie…
Reference in New Issue