use ndb in-memory context cache for DID doc Objects

for #1149
pull/1230/head
Ryan Barrett 2024-08-01 12:07:20 -07:00
rodzic 0f12c755a4
commit 4097fe71fb
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 6BE31FDF4776E9D4
8 zmienionych plików z 66 dodań i 28 usunięć

Wyświetl plik

@ -24,6 +24,7 @@ from oauth_dropins.webutil.util import json_dumps, json_loads
from atproto import ATProto, Cursor
from common import (
add,
cache_policy,
create_task,
global_cache,
global_cache_timeout_policy,
@ -67,7 +68,7 @@ def load_dids():
def _load_dids():
global atproto_dids, atproto_loaded_at, bridged_dids, bridged_loaded_at
with ndb_client.context(global_cache=global_cache,
with ndb_client.context(cache_policy=cache_policy, global_cache=global_cache,
global_cache_timeout_policy=global_cache_timeout_policy):
if not DEBUG:
Timer(STORE_CURSOR_FREQ.total_seconds(), _load_dids).start()
@ -97,9 +98,8 @@ def subscriber():
while True:
try:
with ndb_client.context(
global_cache=global_cache,
global_cache_timeout_policy=global_cache_timeout_policy,
cache_policy=lambda key: False):
cache_policy=cache_policy, global_cache=global_cache,
global_cache_timeout_policy=global_cache_timeout_policy):
subscribe()
logger.info(f'disconnected! waiting {RECONNECT_DELAY} and then reconnecting')
@ -260,9 +260,8 @@ def handler():
while True:
try:
with ndb_client.context(
global_cache=global_cache,
global_cache_timeout_policy=global_cache_timeout_policy,
cache_policy=lambda key: False):
cache_policy=cache_policy, global_cache=global_cache,
global_cache_timeout_policy=global_cache_timeout_policy):
handle()
# if we return cleanly, that means we hit the limit

Wyświetl plik

@ -20,7 +20,7 @@ import pytz
# all protocols
import activitypub, atproto, web
import atproto_firehose
from common import global_cache, global_cache_timeout_policy, USER_AGENT
from common import cache_policy, global_cache, global_cache_timeout_policy, USER_AGENT
import models
# as of 2024-07-10
@ -42,11 +42,11 @@ app.config.from_pyfile(app_dir / 'config.py')
app.wsgi_app = flask_util.ndb_context_middleware(
app.wsgi_app, client=appengine_config.ndb_client,
# limited context-local cache. avoid full one due to this bug:
# https://github.com/googleapis/python-ndb/issues/888
cache_policy=cache_policy,
global_cache=global_cache,
global_cache_timeout_policy=global_cache_timeout_policy,
# disable context-local cache due to this bug:
# https://github.com/googleapis/python-ndb/issues/888
cache_policy=lambda key: False)
@app.get('/liveness_check')

Wyświetl plik

@ -386,6 +386,22 @@ def report_error(msg, *, exception=False, **kwargs):
logger.warning(f'Failed to report error! {kwargs}', exc_info=exception)
def cache_policy(key):
"""In memory ndb cache, only DID docs right now.
https://github.com/snarfed/bridgy-fed/issues/1149#issuecomment-2261383697
Args:
key (google.cloud.datastore.key.Key): not ``google.cloud.ndb.key.Key``!
see https://github.com/googleapis/python-ndb/issues/987
Returns:
bool: whether to cache this object
"""
logger.info(f'ndb-cache-key {key.__class__}')
return key and key.kind == 'Object' and key.name.startswith('did:')
PROFILE_ID_RE = re.compile(
fr"""
/users?/[^/]+$ |
@ -395,7 +411,16 @@ PROFILE_ID_RE = re.compile(
""", re.VERBOSE)
def global_cache_timeout_policy(key):
"""Cache users and profile objects longer than other objects."""
"""Cache users and profile objects longer than other objects.
Args:
key (google.cloud.datastore.key.Key): not ``google.cloud.ndb.key.Key``!
see https://github.com/googleapis/python-ndb/issues/987
Returns:
int: cache expiration for this object, in seconds
"""
logger.info(f'ndb-cache-key {key.__class__}')
if (key and
(key.kind in ('ActivityPub', 'ATProto', 'Follower', 'MagicKey')
or key.kind == 'Object' and PROFILE_ID_RE.search(key.name))):

Wyświetl plik

@ -16,7 +16,7 @@ from oauth_dropins.webutil import (
flask_util,
)
from common import global_cache, global_cache_timeout_policy
from common import cache_policy, global_cache, global_cache_timeout_policy
logger = logging.getLogger(__name__)
# logging.getLogger('lexrpc').setLevel(logging.INFO)
@ -43,11 +43,11 @@ app.url_map.redirect_defaults = False
app.wsgi_app = flask_util.ndb_context_middleware(
app.wsgi_app, client=appengine_config.ndb_client,
global_cache=global_cache,
global_cache_timeout_policy=global_cache_timeout_policy,
# disable context-local cache due to this bug:
# limited context-local cache. avoid full one due to this bug:
# https://github.com/googleapis/python-ndb/issues/888
cache_policy=lambda key: False)
cache_policy=cache_policy,
global_cache=global_cache,
global_cache_timeout_policy=global_cache_timeout_policy)
# deregister XRPC methods we don't support
for nsid in (

Wyświetl plik

@ -11,7 +11,7 @@ from oauth_dropins.webutil import (
# all protocols
import activitypub, atproto, web
from common import global_cache, global_cache_timeout_policy
from common import cache_policy, global_cache, global_cache_timeout_policy
import models
import protocol
@ -24,11 +24,11 @@ app.config.from_pyfile(app_dir / 'config.py')
app.wsgi_app = flask_util.ndb_context_middleware(
app.wsgi_app, client=appengine_config.ndb_client,
global_cache=global_cache,
global_cache_timeout_policy=global_cache_timeout_policy,
# disable context-local cache due to this bug:
# limited context-local cache. avoid full one due to this bug:
# https://github.com/googleapis/python-ndb/issues/888
cache_policy=lambda key: False)
cache_policy=cache_policy,
global_cache=global_cache,
global_cache_timeout_policy=global_cache_timeout_policy)
app.add_url_rule('/queue/poll-feed', view_func=web.poll_feed_task, methods=['POST'])
app.add_url_rule('/queue/receive', view_func=protocol.receive_task, methods=['POST'])

Wyświetl plik

@ -111,7 +111,21 @@ class CommonTest(TestCase):
with app.test_request_context(base_url='https://bsky.brid.gy', path='/foo'):
self.assertEqual('https://bsky.brid.gy/asdf', common.host_url('asdf'))
def test_global_cache_policy(self):
def test_cache_policy(self):
for id in 'did:plc:foo', 'did:web:foo':
self.assertTrue(common.cache_policy(Object(id=id).key._key))
for obj in (
ATProto(id='alice'),
ActivityPub(id='alice'),
Web(id='alice'),
Object(id='https://mastodon.social/users/alice'),
Object(id='at://did:plc:user/app.bsky.actor.profile/self'),
Follower(id='abc'),
):
self.assertFalse(common.cache_policy(obj.key._key))
def test_global_cache_timeout_policy(self):
for good in (
ATProto(id='alice'),
ActivityPub(id='alice'),

Wyświetl plik

@ -2604,9 +2604,9 @@ class ProtocolReceiveTest(TestCase):
def receive():
with app.test_request_context('/'), \
ndb_client.context(
global_cache=_InProcessGlobalCache(),
global_cache_timeout_policy=common.global_cache_timeout_policy,
cache_policy=lambda key: False):
cache_policy=common.cache_policy,
global_cache=_InProcessGlobalCache(),
global_cache_timeout_policy=common.global_cache_timeout_policy):
try:
Fake.receive_as1(note)
except NoContent: # raised by the second thread

Wyświetl plik

@ -285,9 +285,9 @@ class TestCase(unittest.TestCase, testutil.Asserts):
# clear datastore
requests.post(f'http://{ndb_client.host}/reset')
self.ndb_context = ndb_client.context(
cache_policy=common.cache_policy,
global_cache=_InProcessGlobalCache(),
global_cache_timeout_policy=global_cache_timeout_policy,
cache_policy=lambda key: False)
global_cache_timeout_policy=global_cache_timeout_policy)
self.ndb_context.__enter__()
util.now = lambda **kwargs: testutil.NOW