add task queue for delivering AP Creates and Updates to followers

fixes #335
pull/354/head
Ryan Barrett 2023-01-04 20:48:39 -08:00
rodzic fe5c3947a8
commit 512737cc99
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 6BE31FDF4776E9D4
3 zmienionych plików z 70 dodań i 22 usunięć

Wyświetl plik

@ -29,6 +29,7 @@ google-cloud-core==2.3.2
google-cloud-datastore==2.11.0
google-cloud-logging==3.4.0
google-cloud-ndb==2.1.0
google-cloud-tasks==2.11.0
googleapis-common-protos==1.57.0
grpc-google-iam-v1==0.12.4
grpcio==1.51.1

Wyświetl plik

@ -11,6 +11,8 @@ import feedparser
from granary import as2, atom, microformats2
from httpsig.sign import HeaderSigner
from oauth_dropins.webutil import util
from oauth_dropins.webutil.appengine_config import tasks_client
from oauth_dropins.webutil.appengine_info import APP_ID
from oauth_dropins.webutil.testutil import requests_response
from oauth_dropins.webutil.util import json_dumps, json_loads
import requests
@ -27,6 +29,7 @@ from common import (
)
from models import Follower, User, Activity
import webmention
from webmention import TASKS_LOCATION
from . import testutil
REPOST_HTML = """\
@ -345,7 +348,7 @@ class WebmentionTest(testutil.TestCase):
</body>
</html>""", content_type=CONTENT_TYPE_HTML)
got = self.client.post('/webmention', data={
got = self.client.post('/_ah/queue/webmention', data={
'source': 'http://a/post',
'target': 'https://fed.brid.gy/',
})
@ -658,7 +661,28 @@ class WebmentionTest(testutil.TestCase):
{'type': 'Image', 'url': 'http://orig/pic'}
self.assert_equals(repost_as2, json_loads(kwargs['data']))
def test_activitypub_create_post(self, mock_get, mock_post):
@mock.patch('oauth_dropins.webutil.appengine_config.tasks_client.create_task')
def test_activitypub_create_post_make_task(self, mock_create_task, mock_get, _):
mock_get.side_effect = [self.create, self.actor]
got = self.client.post('/webmention', data={
'source': 'http://orig/post',
'target': 'https://fed.brid.gy/',
})
self.assertEqual(202, got.status_code)
mock_create_task.assert_called_with(
parent=f'projects/{APP_ID}/locations/{TASKS_LOCATION}/queues/webmention',
task={
'app_engine_http_request': {
'http_method': 'POST',
'relative_uri': '/_ah/queue/webmention',
'body': urlencode({'source': 'http://orig/post'}).encode(),
'headers': {'Content-Type': 'application/x-www-form-urlencoded'},
},
},
)
def test_activitypub_create_post_run_task(self, mock_get, mock_post):
mock_get.side_effect = [self.create, self.actor]
mock_post.return_value = requests_response('abc xyz')
@ -708,7 +732,7 @@ class WebmentionTest(testutil.TestCase):
'inbox': 'https://inbox',
}}))
got = self.client.post('/webmention', data={
got = self.client.post('/_ah/queue/webmention', data={
'source': 'http://orig/post',
'target': 'https://fed.brid.gy/',
})
@ -751,7 +775,7 @@ class WebmentionTest(testutil.TestCase):
'orig', 'https://mastodon/aaa',
last_follow=json_dumps({'actor': {'inbox': 'https://inbox'}}))
got = self.client.post('/webmention', data={
got = self.client.post('/_ah/queue/webmention', data={
'source': 'http://orig/post',
'target': 'https://fed.brid.gy/',
})

Wyświetl plik

@ -14,6 +14,8 @@ from google.cloud.ndb import Key
from granary import as1, as2, atom, microformats2
import mf2util
from oauth_dropins.webutil import flask_util, util
from oauth_dropins.webutil.appengine_config import tasks_client
from oauth_dropins.webutil.appengine_info import APP_ID
from oauth_dropins.webutil.flask_util import error
from oauth_dropins.webutil.util import json_dumps, json_loads
import requests
@ -28,9 +30,14 @@ logger = logging.getLogger(__name__)
SKIP_EMAIL_DOMAINS = frozenset(('localhost', 'snarfed.org'))
# https://cloud.google.com/appengine/docs/locations
TASKS_LOCATION = 'us-central1'
class Webmention(View):
"""Handles inbound webmention, converts to ActivityPub."""
IS_TASK = False
source_url = None # string
source_domain = None # string
source_mf2 = None # parsed mf2 dict
@ -88,7 +95,7 @@ class Webmention(View):
self.user = User.get_or_create(self.source_domain)
ret = self.try_activitypub()
return ret or 'No action taken'
return ret or 'No ActivityPub targets'
def try_activitypub(self):
"""Attempts ActivityPub delivery.
@ -159,31 +166,40 @@ class Webmention(View):
else:
return str(error)
def _targets(self):
"""
Returns: list of string URLs, the source's inReplyTos or objects
(if appropriate)
"""
targets = util.get_urls(self.source_obj, 'inReplyTo')
if targets:
logger.info(f'targets from inReplyTo: {targets}')
return targets
if self.source_obj.get('verb') in as1.VERBS_WITH_OBJECT:
targets = util.get_urls(self.source_obj, 'object')
logger.info(f'targets from object: {targets}')
return targets
def _activitypub_targets(self):
"""
Returns: list of (Activity, string inbox URL)
"""
# if there's in-reply-to, like-of, or repost-of, they're the targets.
# otherwise, it's all followers' inboxes.
targets = self._targets()
targets = util.get_urls(self.source_obj, 'inReplyTo')
if targets:
logger.info(f'targets from inReplyTo: {targets}')
elif self.source_obj.get('verb') in as1.VERBS_WITH_OBJECT:
targets = util.get_urls(self.source_obj, 'object')
logger.info(f'targets from object: {targets}')
if not targets:
# interpret this as a Create or Update, deliver it to followers
# interpret this as a Create or Update, deliver it to followers. use
# task queue since we send to each inbox in serial, which can take a
# long time with many followers/instances.
if not self.IS_TASK:
queue_path= tasks_client.queue_path(APP_ID, TASKS_LOCATION, 'webmention')
tasks_client.create_task(
parent=queue_path,
task={
'app_engine_http_request': {
'http_method': 'POST',
'relative_uri': '/_ah/queue/webmention',
'body': urlencode({'source': self.source_url}).encode(),
# https://googleapis.dev/python/cloudtasks/latest/gapic/v2/types.html#google.cloud.tasks_v2.types.AppEngineHttpRequest.headers
'headers': {'Content-Type': 'application/x-www-form-urlencoded'},
},
},
)
# not actually an error
error('Delivering to followers in the background', status=202)
inboxes = set()
for follower in Follower.query().filter(
Follower.key > Key('Follower', self.source_domain + ' '),
@ -259,5 +275,12 @@ class Webmention(View):
return activities_and_inbox_urls
class WebmentionTask(Webmention):
IS_TASK = True
app.add_url_rule('/webmention', view_func=Webmention.as_view('webmention'),
methods=['POST'])
app.add_url_rule('/_ah/queue/webmention',
view_func=WebmentionTask.as_view('webmention-task'),
methods=['POST'])