Protocol.load: reload objects if our copy is over 30d old

fixes #628

no clue how much this will impact our outbound request load. we'll see!
pull/837/head
Ryan Barrett 2024-02-08 11:22:32 -08:00
parent c8c42ed594
commit c966090912
No known key found for this signature in database
GPG key ID: 6BE31FDF4776E9D4
3 changed files with 50 additions and 27 deletions
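For reference, the gist of the change: Protocol.load now re-fetches an object from its source when the stored copy is older than OBJECT_REFRESH_AGE (30 days) and the caller didn't pass remote=False. A minimal sketch of that staleness check, separate from the real codebase (the needs_refresh helper and the plain datetime comparison are illustrative assumptions, not code from this commit):

from datetime import datetime, timedelta, timezone

OBJECT_REFRESH_AGE = timedelta(days=30)  # matches the constant added in protocol.py

def needs_refresh(updated, remote=None):
    """Decide whether a stored object should be re-fetched.

    Hypothetical helper for illustration only. `updated` is the stored copy's
    last-modified time as a timezone-aware UTC datetime; `remote` mirrors
    Protocol.load's parameter: True forces a fetch, False forbids it, and
    None (the default) fetches only when the stored copy is stale.
    """
    if remote is True:
        return True
    if remote is False or updated is None:
        return False
    cutoff = datetime.now(timezone.utc) - OBJECT_REFRESH_AGE
    return updated < cutoff

# example: a copy last updated 31 days ago gets refreshed under the default
stale = datetime.now(timezone.utc) - timedelta(days=31)
assert needs_refresh(stale)                    # older than 30d, so re-fetch
assert not needs_refresh(stale, remote=False)  # remote=False never fetches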

View file

@@ -1,5 +1,6 @@
 """Base protocol class and common code."""
 import copy
+from datetime import timedelta
 import logging
 from threading import Lock
 from urllib.parse import urljoin
@@ -38,6 +39,8 @@ SUPPORTED_TYPES = (
     'video',
 )
+OBJECT_REFRESH_AGE = timedelta(days=30)
+
 # activity ids that we've already handled and can now ignore.
 # used in Protocol.receive
 seen_ids = LRUCache(100000)
@@ -1099,44 +1102,48 @@ class Protocol:
             requests.HTTPError: anything that :meth:`fetch` raises
         """
         assert local or remote is not False
-        logger.debug(f'Loading Object {id} local={local} remote={remote}')
-        if remote is not True:
-            with objects_cache_lock:
-                cached = objects_cache.get(id)
-            if cached:
-                # make a copy so that if the client modifies this entity in
-                # memory, those modifications aren't applied to the cache
-                # until they explicitly put() the modified entity.
-                # NOTE: keep in sync with Object._post_put_hook!
-                return Object(id=cached.key.id(), **cached.to_dict(
-                    # computed properties
-                    exclude=['as1', 'expire', 'object_ids', 'type']))
         obj = orig_as1 = None
-        if local:
+        with objects_cache_lock:
+            cached = objects_cache.get(id)
+        if cached:
+            # make a copy so that if the client modifies this entity in
+            # memory, those modifications aren't applied to the cache
+            # until they explicitly put() the modified entity.
+            # NOTE: keep in sync with Object._post_put_hook!
+            logger.debug(' got from cache')
+            obj = Object(id=cached.key.id(), **cached.to_dict(
+                # computed properties
+                exclude=['as1', 'expire', 'object_ids', 'type']))
+
+        if local and not obj:
             obj = Object.get_by_id(id)
-            if obj and (obj.as1 or obj.raw or obj.deleted):
-                logger.info(' got from datastore')
+            if not obj:
+                logger.debug(f' not in datastore')
+            elif obj.as1 or obj.raw or obj.deleted:
+                logger.debug(' got from datastore')
                 obj.new = False
-                orig_as1 = obj.as1
                 if remote is not True:
                     with objects_cache_lock:
                         objects_cache[id] = obj
-                    return obj

-        if remote is True:
-            logger.debug(f'Loading Object {id} local={local} remote={remote}, forced refresh requested')
-        elif remote is False:
-            logger.debug(f'Loading Object {id} local={local} remote={remote} {"empty" if obj else "not"} in datastore')
+        if remote is False:
             return obj
+        elif remote is None and obj:
+            if obj.updated < util.as_utc(util.now() - OBJECT_REFRESH_AGE):
+                logger.debug(f' last updated {obj.updated}, refreshing')
+            else:
+                return obj

         if obj:
+            orig_as1 = obj.as1
             obj.clear()
             obj.new = False
         else:
             obj = Object(id=id)
             if local:
-                logger.info(' not in datastore')
+                logger.debug(' not in datastore')
             obj.new = True
             obj.changed = False

View file

@@ -414,7 +414,7 @@ class ObjectTest(TestCase):
         self.assert_entities_equal(obj, Object.get_by_id('ab^^c'))

     def test_get_by_id_uses_cache(self):
-        obj = Object(id='foo', our_as1={'x': 'y'})
+        obj = Object(id='foo', our_as1={'x': 'y'}, updated=util.as_utc(NOW))
         protocol.objects_cache['foo'] = obj
         loaded = Fake.load('foo')
         self.assert_entities_equal(obj, loaded)
@@ -439,7 +439,7 @@ class ObjectTest(TestCase):
         }, Fake.load('foo').our_as1)

     def test_get_by_id_cached_makes_copy(self):
-        obj = Object(id='foo', our_as1={'x': 'y'})
+        obj = Object(id='foo', our_as1={'x': 'y'}, updated=util.as_utc(NOW))
         protocol.objects_cache['foo'] = obj
         loaded = Fake.load('foo')
         self.assert_entities_equal(obj, loaded)

View file

@@ -1,5 +1,6 @@
 """Unit tests for protocol.py."""
 import copy
+from datetime import timedelta
 import logging
 from unittest import skip
 from unittest.mock import patch
@@ -8,9 +9,9 @@ from arroba.tests.testutil import dns_answer
 from flask import g
 from google.cloud import ndb
 from granary import as2
-from oauth_dropins.webutil import appengine_info
+from oauth_dropins.webutil import appengine_info, util
 from oauth_dropins.webutil.flask_util import CLOUD_TASKS_QUEUE_HEADER, NoContent
-from oauth_dropins.webutil.testutil import requests_response
+from oauth_dropins.webutil.testutil import NOW, requests_response
 import requests
 from werkzeug.exceptions import BadRequest
@@ -200,7 +201,7 @@ class ProtocolTest(TestCase):
         self.assertEqual([], Fake.fetched)

     def test_load_cached(self):
-        obj = Object(id='foo', our_as1={'x': 'y'})
+        obj = Object(id='foo', our_as1={'x': 'y'}, updated=util.as_utc(NOW))
         protocol.objects_cache['foo'] = obj
         loaded = Fake.load('foo')
         self.assert_entities_equal(obj, loaded)
@@ -346,6 +347,21 @@ class ProtocolTest(TestCase):
         self.assert_entities_equal(stored, got)
         self.assertEqual([], Fake.fetched)

+    def test_load_refresh(self):
+        Fake.fetchable['foo'] = {'fetched': 'x'}
+
+        too_old = (NOW.replace(tzinfo=None)
+                   - protocol.OBJECT_REFRESH_AGE
+                   - timedelta(days=1))
+        with patch('models.Object.updated._now', return_value=too_old):
+            obj = Object(id='foo', our_as1={'orig': 'y'}, status='in progress')
+            obj.put()
+
+        protocol.objects_cache['foo'] = obj
+
+        loaded = Fake.load('foo')
+        self.assertEqual({'fetched': 'x', 'id': 'foo'}, loaded.our_as1)
+
     def test_actor_key(self):
         user = self.make_user(id='fake:a', cls=Fake)
         a_key = user.key