kopia lustrzana https://github.com/snarfed/bridgy-fed
Protocol.receive: detect and prevent duplicate calls for the same object
for https://github.com/snarfed/bridgy-fed/issues/1063, should fix it. coming up with the test for this was fun!pull/1175/head
rodzic
8307b38024
commit
89e2372922
|
@ -739,6 +739,8 @@ class Protocol:
|
|||
assert isinstance(obj, Object), obj
|
||||
logger.info(f'From {from_cls.LABEL}: {obj.type} {obj.key} AS1: {json_dumps(obj.as1, indent=2)}')
|
||||
|
||||
# TODO: return 204 for all of these errors so we don't retry them
|
||||
# https://cloud.google.com/tasks/docs/creating-appengine-handlers
|
||||
if not obj.as1:
|
||||
error('No object data provided')
|
||||
|
||||
|
@ -755,10 +757,17 @@ class Protocol:
|
|||
elif from_cls.is_blocklisted(id, allow_internal=internal):
|
||||
error(f'Activity {id} is blocklisted')
|
||||
|
||||
# lease this object atomically
|
||||
lease_memcache_key = f'receive-{id}'
|
||||
if not common.memcache.add(lease_memcache_key, 'leased',
|
||||
noreply=False, expire=5 * 60): # 5 min
|
||||
error('This object is already being received elsewhere', status=204)
|
||||
|
||||
# short circuit if we've already seen this activity id.
|
||||
# (don't do this for bare objects since we need to check further down
|
||||
# whether they've been updated since we saw them last.)
|
||||
if obj.as1.get('objectType') == 'activity' and 'force' not in request.values:
|
||||
# TODO: switch to memcache
|
||||
with seen_ids_lock:
|
||||
already_seen = id in seen_ids
|
||||
seen_ids[id] = True
|
||||
|
|
|
@ -2,14 +2,18 @@
|
|||
import copy
|
||||
from datetime import timedelta
|
||||
import logging
|
||||
from threading import Condition, Thread
|
||||
from unittest import skip
|
||||
from unittest.mock import patch
|
||||
|
||||
from arroba.tests.testutil import dns_answer
|
||||
from cachetools import LRUCache, TTLCache
|
||||
from google.cloud import ndb
|
||||
from google.cloud.ndb.global_cache import _InProcessGlobalCache
|
||||
from granary import as2
|
||||
from granary.tests.test_bluesky import ACTOR_PROFILE_BSKY
|
||||
from oauth_dropins.webutil import appengine_info, models, util
|
||||
from oauth_dropins.webutil.appengine_config import ndb_client
|
||||
from oauth_dropins.webutil.flask_util import NoContent
|
||||
from oauth_dropins.webutil.testutil import NOW, requests_response
|
||||
import requests
|
||||
|
@ -2257,6 +2261,8 @@ class ProtocolReceiveTest(TestCase):
|
|||
copies=[Target(uri='fake:bob', protocol='fake')])
|
||||
|
||||
protocol.seen_ids.clear()
|
||||
common.memcache.clear()
|
||||
|
||||
obj.new = True
|
||||
OtherFake.fetchable = {
|
||||
'other:bob': {},
|
||||
|
@ -2288,7 +2294,9 @@ class ProtocolReceiveTest(TestCase):
|
|||
copies=[Target(uri='fake:post', protocol='fake')])
|
||||
|
||||
protocol.seen_ids.clear()
|
||||
common.memcache.clear()
|
||||
obj.new = True
|
||||
|
||||
_, code = Fake.receive(obj, authed_as='fake:alice')
|
||||
self.assertEqual(204, code)
|
||||
|
||||
|
@ -2336,6 +2344,8 @@ class ProtocolReceiveTest(TestCase):
|
|||
copies=[Target(uri='other:post', protocol='other')])
|
||||
|
||||
protocol.seen_ids.clear()
|
||||
common.memcache.clear()
|
||||
|
||||
obj.new = True
|
||||
self.assertEqual(('OK', 202),
|
||||
OtherFake.receive(obj, authed_as='other:eve'))
|
||||
|
@ -2535,6 +2545,66 @@ class ProtocolReceiveTest(TestCase):
|
|||
self.assertTrue(user.is_enabled(Fake))
|
||||
self.assertEqual(['eefake:user'], ExplicitEnableFake.fetched)
|
||||
|
||||
def test_receive_activity_lease(self):
|
||||
def reset_seen_ids():
|
||||
protocol.seen_ids = LRUCache(100000)
|
||||
|
||||
self.addCleanup(reset_seen_ids)
|
||||
|
||||
# pretend like the two threads below are in different processes
|
||||
protocol.seen_ids = TTLCache(maxsize=10, ttl=0)
|
||||
|
||||
Follower.get_or_create(to=self.user, from_=self.alice)
|
||||
|
||||
note = {
|
||||
'objectType': 'activity',
|
||||
'verb': 'post',
|
||||
'id': 'fake:post',
|
||||
'actor': 'fake:user',
|
||||
'object': {
|
||||
'id': 'fake:note',
|
||||
'objectType': 'note',
|
||||
'author': 'fake:user',
|
||||
},
|
||||
}
|
||||
|
||||
orig_fake_send = Fake.send
|
||||
at_send = Condition()
|
||||
continue_send = Condition()
|
||||
def send(*args, **kwargs):
|
||||
with at_send:
|
||||
at_send.notify()
|
||||
with continue_send:
|
||||
continue_send.wait()
|
||||
return orig_fake_send(*args, **kwargs)
|
||||
|
||||
def receive():
|
||||
with app.test_request_context('/'), \
|
||||
ndb_client.context(
|
||||
global_cache=_InProcessGlobalCache(),
|
||||
global_cache_timeout_policy=common.global_cache_timeout_policy,
|
||||
cache_policy=lambda key: False):
|
||||
try:
|
||||
Fake.receive_as1(note)
|
||||
except NoContent: # raised by the second thread
|
||||
pass
|
||||
|
||||
first = Thread(target=receive)
|
||||
second = Thread(target=receive)
|
||||
|
||||
with patch.object(Fake, 'send', side_effect=send):
|
||||
first.start()
|
||||
second.start()
|
||||
with at_send:
|
||||
at_send.wait()
|
||||
with continue_send:
|
||||
continue_send.notify(1)
|
||||
first.join()
|
||||
second.join()
|
||||
|
||||
# only one receive call should try to send
|
||||
self.assertEqual([('fake:post', 'fake:shared:target')], Fake.sent)
|
||||
|
||||
def test_dm_no_yes_sets_enabled_protocols(self):
|
||||
# bot user
|
||||
self.make_user('fa.brid.gy', cls=Web)
|
||||
|
|
Ładowanie…
Reference in New Issue