pass Object data in receive task HTTP payload instead of through datastore

Trying to cut down datastore load and costs. This switches receive tasks from storing Objects in the datastore and fetching them back again to passing their properties (notably as2, our_as1, and bsky) directly in the HTTP request body, as serialized JSON inside form-encoded params.

for #1354, #1149
pull/1355/head
Ryan Barrett 2024-10-01 21:44:12 -07:00
rodzic a0a0e0ced6
commit 788b37279a
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 6BE31FDF4776E9D4
12 zmienionych plików z 307 dodań i 245 usunięć

Wyświetl plik

@ -1110,12 +1110,6 @@ def inbox(protocol=None, id=None):
if not id:
id = f'{actor_id}#{type}-{object.get("id", "")}-{util.now().isoformat()}'
try:
obj = Object.get_or_create(id=id, as2=activity, authed_as=authed_as,
source_protocol=ActivityPub.LABEL)
except AssertionError as e:
error(f'Invalid activity, probably due to id: {e}', status=400)
# automatically bridge server aka instance actors
# https://codeberg.org/fediverse/fep/src/branch/main/fep/d556/fep-d556.md
if as2.is_server_actor(actor):
@ -1127,7 +1121,8 @@ def inbox(protocol=None, id=None):
if user and not user.existing:
logger.info(f'Automatically enabled AP server actor {actor_id} for {user.enabled_protocols}')
return create_task(queue='receive', obj=obj.key.urlsafe(), authed_as=authed_as)
return create_task(queue='receive', id=id, as2=activity,
source_protocol=ActivityPub.LABEL, authed_as=authed_as)
# protocol in subdomain

Wyświetl plik

@ -967,10 +967,8 @@ def poll_chat_task():
**bluesky.to_as1(log['message']),
'to': [bot.key.id()],
}
obj = Object(id=id, source_protocol='atproto', bsky=log['message'],
our_as1=msg_as1)
obj.put()
common.create_task(queue='receive', obj=obj.key.urlsafe(),
common.create_task(queue='receive', id=id, bsky=log['message'],
our_as1=msg_as1, source_protocol=ATProto.LABEL,
authed_as=sender)
# check if we've caught up yet

Wyświetl plik

@ -312,22 +312,22 @@ def handle(limit=None):
if type in ('app.bsky.actor.profile', 'app.bsky.feed.post')
else 'undo')
obj_id = f'{at_uri}#{verb}'
record_kwarg = {'our_as1': {
'objectType': 'activity',
'verb': verb,
'id': obj_id,
'actor': op.repo,
'object': at_uri,
}}
record_kwarg = {
'our_as1': {
'objectType': 'activity',
'verb': verb,
'id': obj_id,
'actor': op.repo,
'object': at_uri,
},
}
else:
logger.error(f'Unknown action {action} for {op.repo} {op.path}')
return
try:
obj = Object.get_or_create(id=obj_id, authed_as=op.repo, status='new',
users=[ATProto(id=op.repo).key],
source_protocol=ATProto.LABEL, **record_kwarg)
create_task(queue='receive', obj=obj.key.urlsafe(), authed_as=op.repo)
create_task(queue='receive', id=obj_id, source_protocol=ATProto.LABEL,
authed_as=op.repo, **record_kwarg)
# when running locally, comment out above and uncomment this
# logger.info(f'enqueuing receive task for {at_uri}')
except ContextError:

Wyświetl plik

@ -22,6 +22,7 @@ from oauth_dropins.webutil.appengine_config import error_reporting_client, tasks
from oauth_dropins.webutil import appengine_info
from oauth_dropins.webutil.appengine_info import DEBUG
from oauth_dropins.webutil import flask_util
from oauth_dropins.webutil.util import json_dumps
import pymemcache.client.base
from pymemcache.test.utils import MockMemcacheClient
@ -332,6 +333,9 @@ def create_task(queue, delay=None, **params):
assert queue
path = f'/queue/{queue}'
params = {k: json_dumps(v, sort_keys=True) if isinstance(v, dict) else v
for k, v in params.items()}
if RUN_TASKS_INLINE or appengine_info.LOCAL_SERVER:
logger.info(f'Running task inline: {queue} {params}')
from router import app
@ -344,12 +348,12 @@ def create_task(queue, delay=None, **params):
# .match(path, method='POST')
# return app.view_functions[endpoint](**args)
body = urllib.parse.urlencode(sorted(params.items()))
body = urllib.parse.urlencode(sorted(params.items())).encode()
task = {
'app_engine_http_request': {
'http_method': 'POST',
'relative_uri': path,
'body': body.encode(),
'body': body,
'headers': {'Content-Type': 'application/x-www-form-urlencoded'},
},
}

Wyświetl plik

@ -800,6 +800,8 @@ class Protocol:
if not id:
error('No id provided')
elif from_cls.owns_id(id) is False:
error(f'Protocol {from_cls.LABEL} does not own id {id}')
elif from_cls.is_blocklisted(id, allow_internal=internal):
error(f'Activity {id} is blocklisted')
# check that this activity is public. only do this for some activities,
@ -1637,8 +1639,10 @@ def receive_task():
Calls :meth:`Protocol.receive` with the form parameters.
Parameters:
obj (url-safe google.cloud.ndb.key.Key): :class:`models.Object` to handle
authed_as (str): passed to :meth:`Protocol.receive`
obj (url-safe google.cloud.ndb.key.Key): :class:`models.Object` to handle
*: If ``obj`` is unset, all other parameters are properties for a new
:class:`models.Object` to handle
TODO: migrate incoming webmentions to this. See how we did it for AP. The
difficulty is that parts of :meth:`protocol.Protocol.receive` depend on
@ -1647,16 +1651,24 @@ def receive_task():
:class:`web.Web`.
"""
form = request.form.to_dict()
logger.info(f'Params: {list(form.items())}')
obj = ndb.Key(urlsafe=form['obj']).get()
assert obj
obj.new = True
authed_as = form.get('authed_as')
logger.info(f'Params:\n' + '\n'.join(f'{k} = {v[:100]}' for k, v in form.items()))
authed_as = form.pop('authed_as', None)
internal = (authed_as == common.PRIMARY_DOMAIN
or authed_as in common.PROTOCOL_DOMAINS)
if obj_key := form.get('obj'):
obj = ndb.Key(urlsafe=obj_key).get()
else:
for json_prop in 'as2', 'bsky', 'mf2', 'our_as1', 'raw':
if val := form.get(json_prop):
form[json_prop] = json_loads(val)
obj = Object(**form)
assert obj
assert obj.source_protocol
obj.new = True
try:
return PROTOCOLS[obj.source_protocol].receive(obj=obj, authed_as=authed_as,
internal=internal)

Wyświetl plik

@ -571,9 +571,9 @@ class ActivityPubTest(TestCase):
def test_inbox_bad_id(self, *_):
user = self.make_user(ACTOR['id'], cls=ActivityPub, obj_as2=ACTOR)
resp = self.post('/ap/sharedInbox', json={**NOTE, 'id': 'abc123'})
self.assertEqual(400, resp.status_code)
self.assertEqual(299, resp.status_code)
self.assertIsNone(Object.get_by_id('abc123'))
@patch('oauth_dropins.webutil.appengine_config.tasks_client.create_task')
def test_inbox_activity_id_on_opted_out_web_domain(self, mock_create_task, *_):
@ -604,8 +604,8 @@ class ActivityPubTest(TestCase):
author = self.make_user(ACTOR['id'], cls=ActivityPub, obj_as2=ACTOR)
resp = self.post('/ap/sharedInbox', json=NOTE)
obj_key = Object(id=NOTE['id']).key.urlsafe()
self.assert_task(mock_create_task, 'receive', obj=obj_key,
self.assert_task(mock_create_task, 'receive', id='http://mas.to/note/as2',
source_protocol='activitypub', as2=NOTE,
authed_as=NOTE['actor'])
def test_inbox_reply_object(self, mock_head, mock_get, mock_post):
@ -943,6 +943,7 @@ class ActivityPubTest(TestCase):
def _test_inbox_with_to_ignored(self, to, mock_head, mock_get, mock_post):
author = self.make_user(ACTOR['id'], cls=ActivityPub, obj_as2=ACTOR)
Follower.get_or_create(to=author, from_=self.user)
orig_obj_count = Object.query().count()
mock_head.return_value = requests_response(url='http://target')
@ -952,10 +953,7 @@ class ActivityPubTest(TestCase):
got = self.post('/user.com/inbox', json=not_public)
self.assertEqual(200, got.status_code, got.get_data(as_text=True))
activity = Object.get_by_id(not_public['id'])
self.assertIsNone(activity.status)
self.assertEqual([], activity.delivered)
self.assertIsNone(Object.get_by_id(not_public['object']['id']))
self.assertEqual(orig_obj_count, Object.query().count())
def test_follow_bot_user_enables_protocol(self, _, mock_get, __):
# bot user

Wyświetl plik

@ -13,7 +13,7 @@ from dns.resolver import NXDOMAIN
import google.cloud.dns.client
from google.cloud.dns.zone import ManagedZone
from google.cloud.tasks_v2.types import Task
from granary.bluesky import NO_AUTHENTICATED_LABEL
from granary import bluesky
from granary.tests.test_bluesky import (
ACTOR_AS,
ACTOR_PROFILE_BSKY,
@ -269,7 +269,7 @@ class ATProtoTest(TestCase):
**ACTOR_PROFILE_BSKY,
'labels': {
'values': [{
'val' : NO_AUTHENTICATED_LABEL,
'val' : bluesky.NO_AUTHENTICATED_LABEL,
'neg' : False,
}],
},
@ -2145,6 +2145,15 @@ Sed tortor neque, aliquet quis posuere aliquam […]
'rev': '123',
'sentAt': NOW.isoformat(),
}
# note that order matters in these for the assert_task calls below!
msg_alice_as1 = {
'objectType': 'note',
'id': 'at://did:al:ice/chat.bsky.convo.defs.messageView/uvw',
'content': 'foo bar',
'published': NOW.isoformat(),
'author': 'did:al:ice',
'to': ['fa.brid.gy'],
}
msg_bob = {
'$type': 'chat.bsky.convo.defs#messageView',
'id': 'xyz',
@ -2153,6 +2162,14 @@ Sed tortor neque, aliquet quis posuere aliquam […]
'rev': '456',
'sentAt': NOW.isoformat(),
}
msg_bob_as1 = {
'objectType': 'note',
'id': 'at://did:bo:b/chat.bsky.convo.defs.messageView/xyz',
'content': 'baz biff',
'published': NOW.isoformat(),
'author': 'did:bo:b',
'to': ['fa.brid.gy'],
}
msg_from_bot = {
'$type': 'chat.bsky.convo.defs#messageView',
'id': 'lmno',
@ -2169,6 +2186,14 @@ Sed tortor neque, aliquet quis posuere aliquam […]
'rev': '000',
'sentAt': NOW.isoformat(),
}
msg_eve_as1 = {
'objectType': 'note',
'id': 'at://did:ev:e/chat.bsky.convo.defs.messageView/rst',
'content': 'boff',
'published': NOW.isoformat(),
'author': 'did:ev:e',
'to': ['fa.brid.gy'],
}
mock_get.side_effect = [
requests_response({
@ -2225,41 +2250,20 @@ Sed tortor neque, aliquet quis posuere aliquam […]
id = 'at://did:al:ice/chat.bsky.convo.defs.messageView/uvw'
self.assert_task(mock_create_task, 'receive', authed_as='did:al:ice',
obj=Object(id=id).key.urlsafe())
self.assert_object(id, source_protocol='atproto', our_as1={
'objectType': 'note',
'id': 'at://did:al:ice/chat.bsky.convo.defs.messageView/uvw',
'author': 'did:al:ice',
'content': 'foo bar',
'to': ['fa.brid.gy'],
'published': NOW.isoformat(),
})
id=id, source_protocol='atproto',
bsky=msg_alice, our_as1=msg_alice_as1)
id = 'at://did:bo:b/chat.bsky.convo.defs.messageView/xyz'
self.assert_task(mock_create_task, 'receive', authed_as='did:bo:b',
obj=Object(id=id).key.urlsafe())
self.assert_object(id, source_protocol='atproto', our_as1={
'objectType': 'note',
'id': 'at://did:bo:b/chat.bsky.convo.defs.messageView/xyz',
'author': 'did:bo:b',
'content': 'baz biff',
'to': ['fa.brid.gy'],
'published': NOW.isoformat(),
})
id=id, source_protocol='atproto',
bsky=msg_bob, our_as1=msg_bob_as1)
id = 'at://did:plc:user/chat.bsky.convo.defs.messageView/lmno'
self.assertIsNone(Object.get_by_id(id))
id = 'at://did:ev:e/chat.bsky.convo.defs.messageView/rst'
self.assert_task(mock_create_task, 'receive', authed_as='did:ev:e',
obj=Object(id=id).key.urlsafe())
self.assert_object(id, source_protocol='atproto', our_as1={
'objectType': 'note',
'id': 'at://did:ev:e/chat.bsky.convo.defs.messageView/rst',
'author': 'did:ev:e',
'content': 'boff',
'to': ['fa.brid.gy'],
'published': NOW.isoformat(),
})
id=id, source_protocol='atproto',
bsky=msg_eve, our_as1=msg_eve_as1)
self.assertEqual('dunn', fa.key.get().atproto_last_chat_log_cursor)

Wyświetl plik

@ -466,11 +466,9 @@ class ATProtoFirehoseHandleTest(ATProtoTestCase):
handle(limit=1)
user_key = ATProto(id='did:plc:user').key
obj = self.assert_object('at://did:plc:user/app.bsky.feed.post/123',
bsky=reply, source_protocol='atproto',
status='new', users=[user_key],
ignore=['our_as1'])
self.assert_task(mock_create_task, 'receive', obj=obj.key.urlsafe(),
self.assert_task(mock_create_task, 'receive',
id='at://did:plc:user/app.bsky.feed.post/123',
bsky=reply, source_protocol='atproto',
authed_as='did:plc:user')
def test_delete_post(self, mock_create_task):
@ -481,15 +479,15 @@ class ATProtoFirehoseHandleTest(ATProtoTestCase):
obj_id = 'at://did:plc:user/app.bsky.feed.post/123'
delete_id = f'{obj_id}#delete'
user_key = ATProto(id='did:plc:user').key
obj = self.assert_object(delete_id, source_protocol='atproto',
status='new', users=[user_key], our_as1={
'objectType': 'activity',
'verb': 'delete',
'id': delete_id,
'actor': 'did:plc:user',
'object': obj_id,
})
self.assert_task(mock_create_task, 'receive', obj=obj.key.urlsafe(),
expected_as1 = {
'objectType': 'activity',
'verb': 'delete',
'id': delete_id,
'actor': 'did:plc:user',
'object': obj_id,
}
self.assert_task(mock_create_task, 'receive', id=delete_id,
our_as1=expected_as1, source_protocol='atproto',
authed_as='did:plc:user')
def test_delete_block_is_undo(self, mock_create_task):
@ -500,15 +498,16 @@ class ATProtoFirehoseHandleTest(ATProtoTestCase):
obj_id = 'at://did:plc:user/app.bsky.graph.block/123'
undo_id = f'{obj_id}#undo'
user_key = ATProto(id='did:plc:user').key
obj = self.assert_object(undo_id, source_protocol='atproto',
status='new', users=[user_key], our_as1={
'objectType': 'activity',
'verb': 'undo',
'id': undo_id,
'actor': 'did:plc:user',
'object': obj_id,
})
self.assert_task(mock_create_task, 'receive', obj=obj.key.urlsafe(),
expected_as1 = {
'objectType': 'activity',
'verb': 'undo',
'id': undo_id,
'actor': 'did:plc:user',
'object': obj_id,
}
self.assert_task(mock_create_task, 'receive', id=undo_id,
our_as1=expected_as1, source_protocol='atproto',
authed_as='did:plc:user')
def test_unsupported_type(self, mock_create_task):

Wyświetl plik

@ -15,6 +15,7 @@ from oauth_dropins.webutil import appengine_info, util
from oauth_dropins.webutil.appengine_config import ndb_client
from oauth_dropins.webutil.flask_util import NoContent
from oauth_dropins.webutil.testutil import NOW, requests_response
from oauth_dropins.webutil.util import json_dumps
import requests
from werkzeug.exceptions import BadRequest
@ -2939,7 +2940,7 @@ class ProtocolReceiveTest(TestCase):
mock_send.assert_not_called()
def test_receive_task_handler(self):
def test_receive_task_handler_obj(self):
note = {
'id': 'fake:post',
'objectType': 'note',
@ -2966,11 +2967,46 @@ class ProtocolReceiveTest(TestCase):
obj = Object.get_by_id('fake:post#bridgy-fed-create')
self.assertEqual('ignored', obj.status)
# Exercises /queue/receive when the Object's properties are sent as form
# params (our_as1 as a JSON string, plus source_protocol) rather than as an
# ``obj`` datastore key. Expects a 204 response and that both the note and a
# ``#bridgy-fed-create`` activity wrapping it end up stored.
def test_receive_task_handler_id(self):
note = {
'id': 'fake:post',
'objectType': 'note',
'author': 'fake:other',
}
# the create activity we expect to find stored for the bare note
create = {
'id': 'fake:post#bridgy-fed-create',
'objectType': 'activity',
'verb': 'post',
'object': note,
'actor': 'fake:other',
}
# no 'obj' or 'id' param here; only the serialized AS1, the protocol label,
# and the authenticated actor.
# NOTE(review): the X-AppEngine-TaskRetryCount header presumably marks this
# as a task queue request - confirm against the handler.
resp = self.post('/queue/receive', data={
'our_as1': json_dumps(note),
'source_protocol': 'fake',
'authed_as': 'fake:other',
}, headers={'X-AppEngine-TaskRetryCount': '0'})
self.assertEqual(204, resp.status_code)
# the note itself was stored with the AS1 we sent
obj = Object.get_by_id('fake:post')
self.assertEqual(note, obj.our_as1)
# the stored create activity matches ``create`` plus a published timestamp,
# and its status is 'ignored'
obj = Object.get_by_id('fake:post#bridgy-fed-create')
self.assertEqual({
**create,
'published': '2022-01-02T03:04:05+00:00',
}, obj.our_as1)
self.assertEqual('ignored', obj.status)
@patch.object(Fake, 'receive', side_effect=requests.ConnectionError('foo'))
def test_receive_task_handler_connection_error(self, _):
obj = self.store_object(id='fake:post', source_protocol='fake')
got = self.post('/queue/receive', data={'obj': obj.key.urlsafe()})
orig_count = Object.query().count()
got = self.post('/queue/receive', data={
'our_as1': json_dumps({'id': 'fake:post'}),
'source_protocol': 'fake',
})
self.assertEqual(304, got.status_code)
self.assertEqual(orig_count, Object.query().count())
def test_receive_task_handler_authed_as(self):
note = {
@ -2978,95 +3014,97 @@ class ProtocolReceiveTest(TestCase):
'objectType': 'note',
'author': 'fake:alice',
}
obj = self.store_object(id='fake:post', our_as1=note,
source_protocol='fake')
got = self.post('/queue/receive', data={
'obj': obj.key.urlsafe(),
'our_as1': json_dumps(note),
'source_protocol': 'fake',
'authed_as': 'fake:alice',
})
self.assertEqual(204, got.status_code)
self.assertIsNotNone(Object.get_by_id('fake:post#bridgy-fed-create'))
self.assertEqual(note, Object.get_by_id('fake:post').our_as1)
def test_receive_task_handler_authed_as_domain_vs_homepage(self):
user = self.make_user('user.com', cls=Web, obj_id='https://user.com/')
obj = self.store_object(id='https://user.com/c', source_protocol='web',
our_as1= {
'id': 'https://user.com/c',
'objectType': 'note',
'author': 'https://user.com/',
})
note = {
'id': 'https://user.com/c',
'objectType': 'note',
'author': 'https://user.com/',
}
got = self.post('/queue/receive', data={
'obj': obj.key.urlsafe(),
'our_as1': json_dumps(note),
'source_protocol': 'web',
'authed_as': 'user.com',
})
self.assertEqual(204, got.status_code)
self.assertIsNotNone(Object.get_by_id('https://user.com/c#bridgy-fed-create'))
self.assertEqual({
**note,
'author': 'user.com',
}, Object.get_by_id('https://user.com/c').our_as1)
@patch('requests.get', return_value=requests_response('<html></html>'))
def test_receive_task_handler_authed_as_www_subdomain(self, _):
obj = self.store_object(id='http://www.foo.com/post', source_protocol='web',
our_as1={
'id': 'http://www.foo.com/post',
'objectType': 'note',
'author': 'http://www.foo.com/bar',
})
note = {
'id': 'http://www.foo.com/post',
'objectType': 'note',
'author': 'http://www.foo.com/bar',
}
got = self.post('/queue/receive', data={
'obj': obj.key.urlsafe(),
'our_as1': json_dumps(note),
'source_protocol': 'web',
'authed_as': 'foo.com',
})
self.assertEqual(204, got.status_code)
self.assertIsNotNone(Object.get_by_id(
'http://www.foo.com/post#bridgy-fed-create'))
self.assertEqual(note, Object.get_by_id('http://www.foo.com/post').our_as1)
@patch('requests.get', return_value=requests_response('<html></html>'))
def test_receive_task_handler_authed_as_mixed_subdomains(self, _):
user = self.make_user('user.com', cls=Web, obj_id='https://user.com/')
obj = self.store_object(id='http://user.com/post', source_protocol='web',
our_as1={
'objectType': 'note',
'author': 'http://m.user.com/',
})
note = {
'objectType': 'note',
'id': 'http://user.com/post',
'author': 'http://m.user.com/',
}
got = self.post('/queue/receive', data={
'obj': obj.key.urlsafe(),
'our_as1': json_dumps(note),
'source_protocol': 'web',
'authed_as': 'www.user.com',
})
self.assertEqual(204, got.status_code)
self.assertIsNotNone(Object.get_by_id(
'http://user.com/post#bridgy-fed-create'))
self.assertEqual(note, Object.get_by_id('http://user.com/post').our_as1)
@patch('requests.get', return_value=requests_response('<html></html>'))
def test_receive_task_handler_authed_as_wrong_domain(self, _):
obj = self.store_object(id='http://bar.com/post', source_protocol='web',
our_as1={
'id': 'http://bar.com/post',
'objectType': 'note',
'author': 'http://bar.com/',
})
note = {
'id': 'http://bar.com/post',
'objectType': 'note',
'author': 'http://bar.com/',
}
got = self.post('/queue/receive', data={
'obj': obj.key.urlsafe(),
'our_as1': json_dumps(note),
'source_protocol': 'web',
'authed_as': 'foo.com',
})
self.assertEqual(299, got.status_code)
self.assertIsNone(Object.get_by_id('http://bar.com/post#bridgy-fed-create'))
self.assertIsNone(Object.get_by_id('https://bar.com/post'))
def test_receive_task_handler_not_authed_as(self):
obj = self.store_object(id='fake:post', source_protocol='fake', our_as1={
note = {
'id': 'fake:post',
'objectType': 'note',
'author': 'fake:other',
})
}
got = self.post('/queue/receive', data={
'obj': obj.key.urlsafe(),
'our_as1': json_dumps(note),
'source_protocol': 'fake',
'authed_as': 'fake:eve',
})
self.assertEqual(299, got.status_code)
self.assertIsNone(Object.get_by_id('fake:post#bridgy-fed-create'))
self.assertIsNone(Object.get_by_id('fake:post'))
def test_like_not_authed_as_actor(self):
Fake.fetchable['fake:post'] = {

Wyświetl plik

@ -1861,6 +1861,22 @@ class WebTest(TestCase):
# fetch post to look for image
WEBMENTION_NO_REL_LINK,
]
# order matters here for assert_task below
post_as1 = {
'objectType': 'activity',
'verb': 'post',
'id': 'https://user.com/post',
'url': 'https://user.com/post',
'object':{
'objectType': 'note',
'id': 'https://user.com/post',
'url': 'https://user.com/post',
'content': 'I hereby ☕ post',
'author': {'id': 'https://user.com/'},
},
'actor': {'id': 'https://user.com/'},
'feed_index': 0,
}
got = self.post('/queue/poll-feed', data={
'domain': 'user.com',
@ -1877,30 +1893,8 @@ class WebTest(TestCase):
mock_get.assert_has_calls((
self.req('https://foo/feed'),
))
obj = self.assert_object('https://user.com/post',
users=[self.user.key],
source_protocol='web',
status='new',
atom=feed,
our_as1={
'objectType': 'activity',
'verb': 'post',
'id': 'https://user.com/post',
'url': 'https://user.com/post',
'actor': {'id': 'https://user.com/'},
'object':{
'objectType': 'note',
'id': 'https://user.com/post',
'url': 'https://user.com/post',
'author': {'id': 'https://user.com/'},
'content': 'I hereby ☕ post',
},
'feed_index': 0,
},
type='post',
labels=['user', 'activity'],
)
self.assert_task(mock_create_task, 'receive', obj=obj.key.urlsafe(),
self.assert_task(mock_create_task, 'receive', id='https://user.com/post',
source_protocol='web', atom=feed, our_as1=post_as1,
authed_as='user.com')
expected_eta = NOW_SECONDS + web.MIN_FEED_POLL_PERIOD.total_seconds()
@ -1963,35 +1957,29 @@ class WebTest(TestCase):
mock_get.assert_has_calls((
self.req('https://foo/rss'),
))
for i, (id, hour) in enumerate([('a', 8), ('b', 5), ('c', 4)]):
url = f'http://po.st/{id}'
obj = self.assert_object(
url,
users=[self.user.key],
source_protocol='web',
status='new',
rss=feed,
our_as1={
'objectType': 'activity',
'verb': 'post',
expected_as1 = {
'objectType': 'activity',
'verb': 'post',
'id': url,
'url': url,
'actor': {'id': 'https://user.com/'},
'object':{
'objectType': 'note',
'id': url,
'url': url,
'actor': {'id': 'https://user.com/'},
'object':{
'objectType': 'note',
'id': url,
'url': url,
'author': {'id': 'https://user.com/'},
'content': f'I hereby ☕ post {id}',
'published': f'2012-12-08T0{hour}:00:00+00:00',
},
'feed_index': i,
'author': {'id': 'https://user.com/'},
'content': f'I hereby ☕ post {id}',
'published': f'2012-12-08T0{hour}:00:00+00:00',
},
type='post',
labels=['user', 'activity'],
)
self.assert_task(mock_create_task, 'receive', obj=obj.key.urlsafe(),
authed_as='user.com')
'feed_index': i,
}
with self.subTest(id=id):
self.assert_task(mock_create_task, 'receive', id=url,
our_as1=expected_as1, rss=feed,
source_protocol='web', authed_as='user.com')
# delay is average of 1h and 3h between posts
expected_eta = NOW_SECONDS + timedelta(hours=2).total_seconds()
@ -2025,7 +2013,10 @@ class WebTest(TestCase):
mock_get.assert_has_calls((
self.req('https://foo/feed'),
))
assert Object.get_by_id('https://user.com/post')
queue, body = self.parse_tasks(mock_create_task)[0]
self.assertEqual('receive', queue)
self.assertEqual('https://user.com/post', body['id'])
@patch('oauth_dropins.webutil.appengine_config.tasks_client.create_task')
def test_poll_feed_use_url_as_id(self, mock_create_task, mock_get, _):
@ -2057,30 +2048,24 @@ class WebTest(TestCase):
mock_get.assert_has_calls((
self.req('https://foo/feed'),
))
obj = self.assert_object('https://user.com/post',
users=[self.user.key],
source_protocol='web',
status='new',
atom=feed,
our_as1={
'objectType': 'activity',
'verb': 'post',
'id': 'https://user.com/post',
'url': 'https://user.com/post',
'actor': {'id': 'https://user.com/'},
'object':{
'objectType': 'note',
'id': 'https://user.com/post',
'url': 'https://user.com/post',
'author': {'id': 'https://user.com/'},
'content': 'I hereby ☕ post',
},
'feed_index': 0,
},
type='post',
labels=['user', 'activity'],
)
self.assert_task(mock_create_task, 'receive', obj=obj.key.urlsafe(),
expected_as1 = {
'objectType': 'activity',
'verb': 'post',
'id': 'https://user.com/post',
'url': 'https://user.com/post',
'actor': {'id': 'https://user.com/'},
'object':{
'objectType': 'note',
'id': 'https://user.com/post',
'url': 'https://user.com/post',
'author': {'id': 'https://user.com/'},
'content': 'I hereby ☕ post',
},
'feed_index': 0,
}
self.assert_task(mock_create_task, 'receive', id='https://user.com/post',
our_as1=expected_as1, atom=feed, source_protocol='web',
authed_as='user.com')
@patch('oauth_dropins.webutil.appengine_config.tasks_client.create_task')
@ -2264,28 +2249,26 @@ class WebTest(TestCase):
self.req('https://foo/feed'),
self.req('https://user.com/post'),
))
self.assert_object(
'https://user.com/post',
source_protocol='web',
atom=feed,
status='new',
users=[self.user.key],
our_as1={
'objectType': 'activity',
'verb': 'post',
expected_as1 = {
'objectType': 'activity',
'verb': 'post',
'id': 'https://user.com/post',
'url': 'https://user.com/post',
'actor': {'id': 'https://user.com/'},
'object': {
'objectType': 'note',
'id': 'https://user.com/post',
'url': 'https://user.com/post',
'actor': {'id': 'https://user.com/'},
'object': {
'objectType': 'note',
'id': 'https://user.com/post',
'url': 'https://user.com/post',
'author': {'id': 'https://user.com/'},
'content': 'I hereby ☕ post',
'image': ['http://example.com/pic.png'],
},
'feed_index': 0,
})
'author': {'id': 'https://user.com/'},
'content': 'I hereby ☕ post',
'image': ['http://example.com/pic.png'],
},
'feed_index': 0,
}
self.assert_task(mock_create_task, 'receive', id='https://user.com/post',
source_protocol='web', atom=feed, our_as1=expected_as1,
authed_as='user.com')
@patch('oauth_dropins.webutil.appengine_config.tasks_client.create_task')
def test_poll_feed_fetch_post_for_image_same_as_user_profile(
@ -2320,9 +2303,11 @@ class WebTest(TestCase):
self.req('https://foo/feed'),
self.req('https://user.com/post'),
))
obj = Object.get_by_id('https://user.com/post')
self.assertNotIn('image', obj.our_as1)
self.assertEqual([], obj.our_as1['object']['image'])
queue, body = self.parse_tasks(mock_create_task)[0]
self.assertEqual('receive', queue)
self.assertNotIn('image', body['our_as1'])
self.assertEqual([], body['our_as1']['object']['image'])
@patch('oauth_dropins.webutil.appengine_config.tasks_client.create_task')
def test_poll_feed_etag_last_modified(self, mock_create_task, mock_get, _):
@ -2389,9 +2374,13 @@ class WebTest(TestCase):
got = self.post('/queue/poll-feed', data={'domain': 'user.com'})
self.assertEqual(200, got.status_code)
self.assertIsNotNone(Object.get_by_id('https://user.com/A'))
self.assertIsNotNone(Object.get_by_id('https://user.com/B'))
self.assertIsNone(Object.get_by_id('https://user.com/C'))
tasks = self.parse_tasks(mock_create_task)
self.assertEqual(3, len(tasks))
self.assertEqual('receive', tasks[0][0])
self.assertEqual('https://user.com/A', tasks[0][1]['id'])
self.assertEqual('receive', tasks[1][0])
self.assertEqual('https://user.com/B', tasks[1][1]['id'])
self.assertEqual('poll-feed', tasks[2][0])
user = self.user.key.get()
self.assertEqual(NOW, user.last_polled_feed)

Wyświetl plik

@ -8,7 +8,7 @@ import random
import re
import unittest
from unittest.mock import ANY, call
from urllib.parse import urlencode
from urllib.parse import parse_qs, urlencode
import warnings
from arroba import did
@ -30,6 +30,7 @@ from oauth_dropins.webutil import flask_util, testutil, util
from oauth_dropins.webutil.appengine_config import ndb_client
from oauth_dropins.webutil import appengine_info
from oauth_dropins.webutil.testutil import requests_response
from oauth_dropins.webutil.util import json_dumps, json_loads
import requests
# other modules are imported _after_ Fake etc classes is defined so that it's in
@ -578,11 +579,23 @@ class TestCase(unittest.TestCase, testutil.Asserts):
return got
def assert_task(self, mock_create_task, queue, eta_seconds=None, **params):
# if only one task was created in this queue, extract and decode HTTP
# body so we can pretty-print a comparison
bodies = [body for task_queue, body in self.parse_tasks(mock_create_task)
if task_queue == queue]
if len(bodies) == 1:
self.assertEqual({k: v.decode() if isinstance(v, bytes) else v
for k, v in params.items()},
bodies[0])
# check for exact task
params = [(k, json_dumps(v, sort_keys=True) if isinstance(v, dict) else v)
for k, v in params.items()]
expected = {
'app_engine_http_request': {
'http_method': 'POST',
'relative_uri': f'/queue/{queue}',
'body': urlencode(sorted(params.items())).encode(),
'body': urlencode(sorted(params)).encode(),
'headers': {'Content-Type': 'application/x-www-form-urlencoded'},
},
}
@ -594,6 +607,19 @@ class TestCase(unittest.TestCase, testutil.Asserts):
task=expected,
)
def parse_tasks(self, mock_create_task):
    """Extracts the queue and decoded form params from each created task.

    Reads every recorded call on ``mock_create_task``, takes the ``task``
    keyword argument's ``app_engine_http_request``, and parses its
    form-encoded ``body``.

    Args:
      mock_create_task: mock whose ``call_args_list`` entries each carry a
        ``task`` keyword argument containing an ``app_engine_http_request``
        dict with ``relative_uri`` (str) and ``body`` (bytes)

    Returns:
      list of (str queue, dict body) tuples, in call order. ``queue`` is the
      relative URI with its ``/queue/`` prefix removed. ``body`` maps each
      param name to its first value; values that start with ``{`` are parsed
      as JSON, all others are left as strings.
    """
    tasks = []

    # named task_call, not call, so that it doesn't shadow unittest.mock.call
    for task_call in mock_create_task.call_args_list:
        request = task_call.kwargs['task']['app_engine_http_request']
        queue = request['relative_uri'].removeprefix('/queue/')

        body = {}
        for name, vals in parse_qs(request['body'].decode()).items():
            # only the first value of a repeated param is used
            first = vals[0]
            # startswith is safe on an empty string, unlike first[0]
            body[name] = json_loads(first) if first.startswith('{') else first

        tasks.append((queue, body))

    return tasks
return tasks
def assert_equals(self, expected, actual, msg=None, ignore=(), **kwargs):
return super().assert_equals(

15
web.py
Wyświetl plik

@ -743,7 +743,7 @@ def poll_feed(user, feed_url, rel_type):
logger.info(f'Got {len(activities)} feed items, only processing the first {MAX_FEED_ITEMS_PER_POLL}')
activities = activities[:MAX_FEED_ITEMS_PER_POLL]
# create Objects and receive tasks
# create receive tasks
for i, activity in enumerate(activities):
# default actor and author to user
activity.setdefault('actor', {}).setdefault('id', user.profile_id())
@ -776,19 +776,18 @@ def poll_feed(user, feed_url, rel_type):
if not obj.get('image'):
# fetch and check the post itself
logger.info(f'No image in {id} , trying metaformats')
post = Web.load(id, metaformats=True, authorship_fetch_mf2=False)
if post and post.as1:
post = Object(id=id)
fetched = Web.fetch(post, metaformats=True, authorship_fetch_mf2=False)
if fetched and post.as1:
profile_images = (as1.get_ids(user.obj.as1, 'image')
if user.obj.as1 else [])
obj['image'] = [img for img in as1.get_ids(post.as1, 'image')
if img not in profile_images]
activity['feed_index'] = i
obj = Object.get_or_create(id=id, authed_as=user.key.id(), our_as1=activity,
status='new', source_protocol=Web.ABBREV,
users=[user.key], **obj_feed_prop)
common.create_task(queue='receive', obj=obj.key.urlsafe(),
authed_as=user.key.id())
common.create_task(queue='receive', id=id, our_as1=activity,
source_protocol=Web.ABBREV, authed_as=user.key.id(),
**obj_feed_prop)
return activities