kopia lustrzana https://github.com/snarfed/bridgy-fed
Activity => Object: use StructuredProperty for Object.delivered etc, add protocol
#286 ndb implements StructuredProperty by hoisting each nested property into a corresponding property on the parent entity, prefixed by the StructuredProperty's name, eg delivered.uri, delivered.protocol, etc. For repeated StructuredPropertys, the hoisted properties are all repeated on the parent entity, and reconstructed into StructuredPropertys based on their order. https://googleapis.dev/python/python-ndb/latest/model.html#google.cloud.ndb.model.StructuredPropertyactivity-redesign
rodzic
f9891b6ef7
commit
532ccb8ac1
|
@ -15,7 +15,7 @@ from oauth_dropins.webutil.util import json_dumps, json_loads
|
|||
from app import app, cache
|
||||
import common
|
||||
from common import CACHE_TIME, redirect_unwrap, redirect_wrap
|
||||
from models import Follower, Object, User
|
||||
from models import Follower, Object, Target, User
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
|
19
common.py
19
common.py
|
@ -22,7 +22,7 @@ from oauth_dropins.webutil.util import json_dumps, json_loads
|
|||
import requests
|
||||
from werkzeug.exceptions import BadGateway
|
||||
|
||||
from models import Follower, Object, User
|
||||
from models import Follower, Object, Target, User
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
@ -295,6 +295,7 @@ def send_webmentions(activity_wrapped, proxy=None, **object_props):
|
|||
# send webmentions and store Objects
|
||||
errors = [] # stores (code, body) tuples
|
||||
domains = []
|
||||
targets = [Target(uri=uri, protocol='activitypub') for uri in targets]
|
||||
|
||||
obj = Object(id=source, labels=['notification'], undelivered=targets, **object_props)
|
||||
if activity.get('objectType') == 'activity':
|
||||
|
@ -302,21 +303,22 @@ def send_webmentions(activity_wrapped, proxy=None, **object_props):
|
|||
obj.put()
|
||||
|
||||
for target in targets:
|
||||
domain = util.domain_from_link(target, minimize=False)
|
||||
if (domain == util.domain_from_link(source, minimize=False)):
|
||||
logger.info(f'Skipping same-domain webmention from {source} to {target}')
|
||||
domain = util.domain_from_link(target.uri, minimize=False)
|
||||
if domain == util.domain_from_link(source, minimize=False):
|
||||
logger.info(f'Skipping same-domain webmention from {source} to {target.uri}')
|
||||
continue
|
||||
|
||||
domains.append(domain)
|
||||
if domain not in obj.domains:
|
||||
obj.domains.append(domain)
|
||||
wm_source = (obj.proxy_url()
|
||||
if verb in ('follow', 'like', 'share') or proxy
|
||||
else source)
|
||||
logger.info(f'Sending webmention from {wm_source} to {target}')
|
||||
logger.info(f'Sending webmention from {wm_source} to {target.uri}')
|
||||
|
||||
try:
|
||||
endpoint = webmention.discover(target).endpoint
|
||||
endpoint = webmention.discover(target.uri).endpoint
|
||||
if endpoint:
|
||||
webmention.send(endpoint, wm_source, target)
|
||||
webmention.send(endpoint, wm_source, target.uri)
|
||||
logger.info('Success!')
|
||||
obj.delivered.append(target)
|
||||
else:
|
||||
|
@ -332,7 +334,6 @@ def send_webmentions(activity_wrapped, proxy=None, **object_props):
|
|||
obj.put()
|
||||
|
||||
obj.status = 'complete' if obj.delivered else 'failed' if obj.failed else 'ignored'
|
||||
obj.domains = domains
|
||||
obj.put()
|
||||
|
||||
if errors:
|
||||
|
|
30
models.py
30
models.py
|
@ -23,7 +23,7 @@ import common
|
|||
WWW_DOMAINS = frozenset((
|
||||
'www.jvt.me',
|
||||
))
|
||||
|
||||
PROTOCOLS = ('activitypub', 'bluesky', 'ostatus', 'webmention', 'ui')
|
||||
KEY_BITS = 2048
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
@ -227,13 +227,32 @@ class User(StringIdModel):
|
|||
return self
|
||||
|
||||
|
||||
class Target(ndb.Model):
|
||||
"""Delivery destinations. ActivityPub inboxes, webmention targets, etc.
|
||||
|
||||
Used in StructuredPropertys inside Object; not stored directly in the
|
||||
datastore.
|
||||
|
||||
ndb implements this by hoisting each property here into a corresponding
|
||||
property on the parent entity, prefixed by the StructuredProperty name
|
||||
below, eg delivered.uri, delivered.protocol, etc.
|
||||
|
||||
For repeated StructuredPropertys, the hoisted properties are all
|
||||
repeated on the parent entity, and reconstructed into
|
||||
StructuredPropertys based on their order.
|
||||
|
||||
https://googleapis.dev/python/python-ndb/latest/model.html#google.cloud.ndb.model.StructuredProperty
|
||||
"""
|
||||
uri = ndb.StringProperty(required=True)
|
||||
protocol = ndb.StringProperty(choices=PROTOCOLS, required=True)
|
||||
|
||||
|
||||
class Object(StringIdModel):
|
||||
"""An activity or other object, eg actor.
|
||||
|
||||
Key name is the id. We synthesize ids if necessary.
|
||||
"""
|
||||
STATUSES = ('new', 'in progress', 'complete', 'failed', 'ignored')
|
||||
PROTOCOLS = ('activitypub', 'bluesky', 'ostatus', 'webmention', 'ui')
|
||||
LABELS = ('activity', 'feed', 'notification', 'user')
|
||||
|
||||
# domains of the Bridgy Fed users this activity is to or from
|
||||
|
@ -254,10 +273,9 @@ class Object(StringIdModel):
|
|||
deleted = ndb.BooleanProperty()
|
||||
object_ids = ndb.StringProperty(repeated=True) # id(s) of inner objects
|
||||
|
||||
# URLs we deliver(ed) this to. ActivityPub inboxes, webmention targets, etc.
|
||||
delivered = ndb.StringProperty(repeated=True)
|
||||
undelivered = ndb.StringProperty(repeated=True)
|
||||
failed = ndb.StringProperty(repeated=True)
|
||||
delivered = ndb.StructuredProperty(Target, repeated=True)
|
||||
undelivered = ndb.StructuredProperty(Target, repeated=True)
|
||||
failed = ndb.StructuredProperty(Target, repeated=True)
|
||||
|
||||
created = ndb.DateTimeProperty(auto_now_add=True)
|
||||
updated = ndb.DateTimeProperty(auto_now=True)
|
||||
|
|
|
@ -1,8 +1,5 @@
|
|||
# coding=utf-8
|
||||
"""Unit tests for activitypub.py.
|
||||
|
||||
TODO: test error handling
|
||||
"""
|
||||
"""Unit tests for activitypub.py."""
|
||||
import copy
|
||||
from datetime import datetime, timedelta
|
||||
from unittest.mock import ANY, call, patch
|
||||
|
@ -16,7 +13,7 @@ from urllib3.exceptions import ReadTimeoutError
|
|||
|
||||
import activitypub
|
||||
import common
|
||||
from models import Follower, Object, User
|
||||
from models import Follower, Object, Target, User
|
||||
from . import testutil
|
||||
|
||||
REPLY_OBJECT = {
|
||||
|
|
|
@ -1,8 +1,5 @@
|
|||
# coding=utf-8
|
||||
"""Unit tests for webmention.py.
|
||||
|
||||
TODO: test error handling
|
||||
"""
|
||||
"""Unit tests for webmention.py."""
|
||||
import copy
|
||||
from unittest import mock
|
||||
from urllib.parse import urlencode
|
||||
|
@ -24,7 +21,7 @@ from common import (
|
|||
default_signature_user,
|
||||
redirect_unwrap,
|
||||
)
|
||||
from models import Follower, Object, User
|
||||
from models import Follower, Object, Target, User
|
||||
import webmention
|
||||
from webmention import TASKS_LOCATION
|
||||
from . import testutil
|
||||
|
@ -537,7 +534,8 @@ class WebmentionTest(testutil.TestCase):
|
|||
"""https://github.com/snarfed/bridgy-fed/issues/78"""
|
||||
Object(id='http://a/reply', status='complete',
|
||||
as1=json_dumps(self.reply_as1),
|
||||
delivered=['https://foo.com/inbox']).put()
|
||||
delivered=[Target(uri='https://foo.com/inbox', protocol='activitypub')]
|
||||
).put()
|
||||
mock_get.side_effect = self.activitypub_gets
|
||||
|
||||
got = self.client.post('/webmention', data={
|
||||
|
@ -783,9 +781,9 @@ class WebmentionTest(testutil.TestCase):
|
|||
|
||||
Object(id='https://orig/post', domains=['orig'], status='in progress',
|
||||
as1=json_dumps(self.create_as1),
|
||||
delivered=['https://skipped/inbox'],
|
||||
undelivered=['https://shared/inbox'],
|
||||
failed=['https://public/inbox'],
|
||||
delivered=[Target(uri='https://skipped/inbox', protocol='activitypub')],
|
||||
undelivered=[Target(uri='https://shared/inbox', protocol='activitypub')],
|
||||
failed=[Target(uri='https://public/inbox', protocol='activitypub')],
|
||||
).put()
|
||||
|
||||
self.make_followers()
|
||||
|
@ -828,9 +826,9 @@ class WebmentionTest(testutil.TestCase):
|
|||
|
||||
Object(id='https://orig/post', domains=['orig'], status='in progress',
|
||||
as1=json_dumps(different_create_as1),
|
||||
delivered=['https://delivered/inbox'],
|
||||
undelivered=['https://shared/inbox'],
|
||||
failed=['https://public/inbox'],
|
||||
delivered=[Target(uri='https://delivered/inbox', protocol='activitypub')],
|
||||
undelivered=[Target(uri='https://shared/inbox', protocol='activitypub')],
|
||||
failed=[Target(uri='https://public/inbox', protocol='activitypub')],
|
||||
).put()
|
||||
|
||||
self.make_followers()
|
||||
|
|
|
@ -14,7 +14,7 @@ import requests
|
|||
|
||||
from app import app, cache
|
||||
import common
|
||||
from models import Object
|
||||
from models import Object, Target
|
||||
|
||||
|
||||
class TestCase(unittest.TestCase, testutil.Asserts):
|
||||
|
@ -72,6 +72,12 @@ class TestCase(unittest.TestCase, testutil.Asserts):
|
|||
got = Object.get_by_id(id)
|
||||
assert got, id
|
||||
|
||||
# right now we only do ActivityPub
|
||||
# TODO: revisit as soon as we launch the next protocol, eg bluesky
|
||||
for field in 'delivered', 'undelivered', 'failed':
|
||||
props[field] = [Target(uri=uri, protocol='activitypub')
|
||||
for uri in props.get(field, [])]
|
||||
|
||||
# sort keys in JSON properties
|
||||
for prop in 'as1', 'as2', 'bsky', 'mf2':
|
||||
if prop in props:
|
||||
|
|
|
@ -24,7 +24,7 @@ from werkzeug.exceptions import BadGateway, HTTPException
|
|||
import activitypub
|
||||
from app import app
|
||||
import common
|
||||
from models import Follower, Object, User
|
||||
from models import Follower, Object, Target, User
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
@ -141,11 +141,12 @@ class Webmention(View):
|
|||
if obj:
|
||||
logging.info(f'Resuming existing {obj}')
|
||||
obj.failed = []
|
||||
seen = obj.delivered + obj.undelivered + obj.failed
|
||||
seen = [t.uri for t in obj.delivered + obj.undelivered + obj.failed]
|
||||
new_inboxes = [i for i in inboxes_to_targets.keys() if i not in seen]
|
||||
if new_inboxes:
|
||||
logging.info(f'Adding new inboxes: {new_inboxes}')
|
||||
obj.undelivered += new_inboxes
|
||||
obj.undelivered += [Target(uri=uri, protocol='activitypub')
|
||||
for uri in new_inboxes]
|
||||
if type in ('note', 'article', 'comment'):
|
||||
changed = as1.activity_changed(json_loads(obj.as1), self.source_as1)
|
||||
if changed:
|
||||
|
@ -154,8 +155,9 @@ class Webmention(View):
|
|||
logger.info(f'Content has changed from last time at {obj.updated}! Redelivering to all inboxes: {obj.undelivered}')
|
||||
|
||||
else:
|
||||
obj = Object(id=self.source_url, undelivered=list(inboxes_to_targets.keys()),
|
||||
delivered=[], failed=[])
|
||||
obj = Object(id=self.source_url, delivered=[], failed=[],
|
||||
undelivered=[Target(uri=uri, protocol='activitypub')
|
||||
for uri in inboxes_to_targets.keys()])
|
||||
logging.info(f'Storing new {obj}')
|
||||
|
||||
obj.domains = [self.source_domain]
|
||||
|
@ -172,8 +174,9 @@ class Webmention(View):
|
|||
# TODO: collect by inbox, add 'to' fields, de-dupe inboxes and recipients
|
||||
#
|
||||
# make copy of undelivered because we modify it below
|
||||
logger.info(f'Delivering to inboxes: {sorted(obj.undelivered)}')
|
||||
for inbox in list(obj.undelivered):
|
||||
logger.info(f'Delivering to inboxes: {sorted(t.uri for t in obj.undelivered)}')
|
||||
for target in list(obj.undelivered):
|
||||
inbox = target.uri
|
||||
if inbox in inboxes_to_targets:
|
||||
target_as2 = inboxes_to_targets[inbox]
|
||||
else:
|
||||
|
@ -208,18 +211,23 @@ class Webmention(View):
|
|||
try:
|
||||
last = common.signed_post(inbox, data=self.source_as2,
|
||||
log_data=log_data, user=self.user)
|
||||
obj.delivered.append(inbox)
|
||||
obj.delivered.append(target)
|
||||
last_success = last
|
||||
except BaseException as e:
|
||||
obj.failed.append(inbox)
|
||||
code, body = util.interpret_http_exception(e)
|
||||
if not code and not body:
|
||||
raise
|
||||
obj.failed.append(target)
|
||||
error = e
|
||||
finally:
|
||||
log_data = False
|
||||
|
||||
obj.undelivered.remove(inbox)
|
||||
obj.undelivered.remove(target)
|
||||
obj.put()
|
||||
|
||||
obj.status = 'complete' if obj.delivered else 'failed'
|
||||
obj.status = ('complete' if obj.delivered
|
||||
else 'failed' if obj.failed
|
||||
else 'ignored')
|
||||
obj.put()
|
||||
|
||||
# Pass the AP response status code and body through as our response
|
||||
|
|
Ładowanie…
Reference in New Issue