kopia lustrzana https://github.com/snarfed/bridgy-fed
rodzic
0f12c755a4
commit
4097fe71fb
|
@ -24,6 +24,7 @@ from oauth_dropins.webutil.util import json_dumps, json_loads
|
||||||
from atproto import ATProto, Cursor
|
from atproto import ATProto, Cursor
|
||||||
from common import (
|
from common import (
|
||||||
add,
|
add,
|
||||||
|
cache_policy,
|
||||||
create_task,
|
create_task,
|
||||||
global_cache,
|
global_cache,
|
||||||
global_cache_timeout_policy,
|
global_cache_timeout_policy,
|
||||||
|
@ -67,7 +68,7 @@ def load_dids():
|
||||||
def _load_dids():
|
def _load_dids():
|
||||||
global atproto_dids, atproto_loaded_at, bridged_dids, bridged_loaded_at
|
global atproto_dids, atproto_loaded_at, bridged_dids, bridged_loaded_at
|
||||||
|
|
||||||
with ndb_client.context(global_cache=global_cache,
|
with ndb_client.context(cache_policy=cache_policy, global_cache=global_cache,
|
||||||
global_cache_timeout_policy=global_cache_timeout_policy):
|
global_cache_timeout_policy=global_cache_timeout_policy):
|
||||||
if not DEBUG:
|
if not DEBUG:
|
||||||
Timer(STORE_CURSOR_FREQ.total_seconds(), _load_dids).start()
|
Timer(STORE_CURSOR_FREQ.total_seconds(), _load_dids).start()
|
||||||
|
@ -97,9 +98,8 @@ def subscriber():
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
with ndb_client.context(
|
with ndb_client.context(
|
||||||
global_cache=global_cache,
|
cache_policy=cache_policy, global_cache=global_cache,
|
||||||
global_cache_timeout_policy=global_cache_timeout_policy,
|
global_cache_timeout_policy=global_cache_timeout_policy):
|
||||||
cache_policy=lambda key: False):
|
|
||||||
subscribe()
|
subscribe()
|
||||||
|
|
||||||
logger.info(f'disconnected! waiting {RECONNECT_DELAY} and then reconnecting')
|
logger.info(f'disconnected! waiting {RECONNECT_DELAY} and then reconnecting')
|
||||||
|
@ -260,9 +260,8 @@ def handler():
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
with ndb_client.context(
|
with ndb_client.context(
|
||||||
global_cache=global_cache,
|
cache_policy=cache_policy, global_cache=global_cache,
|
||||||
global_cache_timeout_policy=global_cache_timeout_policy,
|
global_cache_timeout_policy=global_cache_timeout_policy):
|
||||||
cache_policy=lambda key: False):
|
|
||||||
handle()
|
handle()
|
||||||
|
|
||||||
# if we return cleanly, that means we hit the limit
|
# if we return cleanly, that means we hit the limit
|
||||||
|
|
|
@ -20,7 +20,7 @@ import pytz
|
||||||
# all protocols
|
# all protocols
|
||||||
import activitypub, atproto, web
|
import activitypub, atproto, web
|
||||||
import atproto_firehose
|
import atproto_firehose
|
||||||
from common import global_cache, global_cache_timeout_policy, USER_AGENT
|
from common import cache_policy, global_cache, global_cache_timeout_policy, USER_AGENT
|
||||||
import models
|
import models
|
||||||
|
|
||||||
# as of 2024-07-10
|
# as of 2024-07-10
|
||||||
|
@ -42,11 +42,11 @@ app.config.from_pyfile(app_dir / 'config.py')
|
||||||
|
|
||||||
app.wsgi_app = flask_util.ndb_context_middleware(
|
app.wsgi_app = flask_util.ndb_context_middleware(
|
||||||
app.wsgi_app, client=appengine_config.ndb_client,
|
app.wsgi_app, client=appengine_config.ndb_client,
|
||||||
|
# limited context-local cache. avoid full one due to this bug:
|
||||||
|
# https://github.com/googleapis/python-ndb/issues/888
|
||||||
|
cache_policy=cache_policy,
|
||||||
global_cache=global_cache,
|
global_cache=global_cache,
|
||||||
global_cache_timeout_policy=global_cache_timeout_policy,
|
global_cache_timeout_policy=global_cache_timeout_policy,
|
||||||
# disable context-local cache due to this bug:
|
|
||||||
# https://github.com/googleapis/python-ndb/issues/888
|
|
||||||
cache_policy=lambda key: False)
|
|
||||||
|
|
||||||
|
|
||||||
@app.get('/liveness_check')
|
@app.get('/liveness_check')
|
||||||
|
|
27
common.py
27
common.py
|
@ -386,6 +386,22 @@ def report_error(msg, *, exception=False, **kwargs):
|
||||||
logger.warning(f'Failed to report error! {kwargs}', exc_info=exception)
|
logger.warning(f'Failed to report error! {kwargs}', exc_info=exception)
|
||||||
|
|
||||||
|
|
||||||
|
def cache_policy(key):
|
||||||
|
"""In memory ndb cache, only DID docs right now.
|
||||||
|
|
||||||
|
https://github.com/snarfed/bridgy-fed/issues/1149#issuecomment-2261383697
|
||||||
|
|
||||||
|
Args:
|
||||||
|
key (google.cloud.datastore.key.Key): not ``google.cloud.ndb.key.Key``!
|
||||||
|
see https://github.com/googleapis/python-ndb/issues/987
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: whether to cache this object
|
||||||
|
"""
|
||||||
|
logger.info(f'ndb-cache-key {key.__class__}')
|
||||||
|
return key and key.kind == 'Object' and key.name.startswith('did:')
|
||||||
|
|
||||||
|
|
||||||
PROFILE_ID_RE = re.compile(
|
PROFILE_ID_RE = re.compile(
|
||||||
fr"""
|
fr"""
|
||||||
/users?/[^/]+$ |
|
/users?/[^/]+$ |
|
||||||
|
@ -395,7 +411,16 @@ PROFILE_ID_RE = re.compile(
|
||||||
""", re.VERBOSE)
|
""", re.VERBOSE)
|
||||||
|
|
||||||
def global_cache_timeout_policy(key):
|
def global_cache_timeout_policy(key):
|
||||||
"""Cache users and profile objects longer than other objects."""
|
"""Cache users and profile objects longer than other objects.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
key (google.cloud.datastore.key.Key): not ``google.cloud.ndb.key.Key``!
|
||||||
|
see https://github.com/googleapis/python-ndb/issues/987
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
int: cache expiration for this object, in seconds
|
||||||
|
"""
|
||||||
|
logger.info(f'ndb-cache-key {key.__class__}')
|
||||||
if (key and
|
if (key and
|
||||||
(key.kind in ('ActivityPub', 'ATProto', 'Follower', 'MagicKey')
|
(key.kind in ('ActivityPub', 'ATProto', 'Follower', 'MagicKey')
|
||||||
or key.kind == 'Object' and PROFILE_ID_RE.search(key.name))):
|
or key.kind == 'Object' and PROFILE_ID_RE.search(key.name))):
|
||||||
|
|
10
flask_app.py
10
flask_app.py
|
@ -16,7 +16,7 @@ from oauth_dropins.webutil import (
|
||||||
flask_util,
|
flask_util,
|
||||||
)
|
)
|
||||||
|
|
||||||
from common import global_cache, global_cache_timeout_policy
|
from common import cache_policy, global_cache, global_cache_timeout_policy
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
# logging.getLogger('lexrpc').setLevel(logging.INFO)
|
# logging.getLogger('lexrpc').setLevel(logging.INFO)
|
||||||
|
@ -43,11 +43,11 @@ app.url_map.redirect_defaults = False
|
||||||
|
|
||||||
app.wsgi_app = flask_util.ndb_context_middleware(
|
app.wsgi_app = flask_util.ndb_context_middleware(
|
||||||
app.wsgi_app, client=appengine_config.ndb_client,
|
app.wsgi_app, client=appengine_config.ndb_client,
|
||||||
global_cache=global_cache,
|
# limited context-local cache. avoid full one due to this bug:
|
||||||
global_cache_timeout_policy=global_cache_timeout_policy,
|
|
||||||
# disable context-local cache due to this bug:
|
|
||||||
# https://github.com/googleapis/python-ndb/issues/888
|
# https://github.com/googleapis/python-ndb/issues/888
|
||||||
cache_policy=lambda key: False)
|
cache_policy=cache_policy,
|
||||||
|
global_cache=global_cache,
|
||||||
|
global_cache_timeout_policy=global_cache_timeout_policy)
|
||||||
|
|
||||||
# deregister XRPC methods we don't support
|
# deregister XRPC methods we don't support
|
||||||
for nsid in (
|
for nsid in (
|
||||||
|
|
10
router.py
10
router.py
|
@ -11,7 +11,7 @@ from oauth_dropins.webutil import (
|
||||||
|
|
||||||
# all protocols
|
# all protocols
|
||||||
import activitypub, atproto, web
|
import activitypub, atproto, web
|
||||||
from common import global_cache, global_cache_timeout_policy
|
from common import cache_policy, global_cache, global_cache_timeout_policy
|
||||||
import models
|
import models
|
||||||
import protocol
|
import protocol
|
||||||
|
|
||||||
|
@ -24,11 +24,11 @@ app.config.from_pyfile(app_dir / 'config.py')
|
||||||
|
|
||||||
app.wsgi_app = flask_util.ndb_context_middleware(
|
app.wsgi_app = flask_util.ndb_context_middleware(
|
||||||
app.wsgi_app, client=appengine_config.ndb_client,
|
app.wsgi_app, client=appengine_config.ndb_client,
|
||||||
global_cache=global_cache,
|
# limited context-local cache. avoid full one due to this bug:
|
||||||
global_cache_timeout_policy=global_cache_timeout_policy,
|
|
||||||
# disable context-local cache due to this bug:
|
|
||||||
# https://github.com/googleapis/python-ndb/issues/888
|
# https://github.com/googleapis/python-ndb/issues/888
|
||||||
cache_policy=lambda key: False)
|
cache_policy=cache_policy,
|
||||||
|
global_cache=global_cache,
|
||||||
|
global_cache_timeout_policy=global_cache_timeout_policy)
|
||||||
|
|
||||||
app.add_url_rule('/queue/poll-feed', view_func=web.poll_feed_task, methods=['POST'])
|
app.add_url_rule('/queue/poll-feed', view_func=web.poll_feed_task, methods=['POST'])
|
||||||
app.add_url_rule('/queue/receive', view_func=protocol.receive_task, methods=['POST'])
|
app.add_url_rule('/queue/receive', view_func=protocol.receive_task, methods=['POST'])
|
||||||
|
|
|
@ -111,7 +111,21 @@ class CommonTest(TestCase):
|
||||||
with app.test_request_context(base_url='https://bsky.brid.gy', path='/foo'):
|
with app.test_request_context(base_url='https://bsky.brid.gy', path='/foo'):
|
||||||
self.assertEqual('https://bsky.brid.gy/asdf', common.host_url('asdf'))
|
self.assertEqual('https://bsky.brid.gy/asdf', common.host_url('asdf'))
|
||||||
|
|
||||||
def test_global_cache_policy(self):
|
def test_cache_policy(self):
|
||||||
|
for id in 'did:plc:foo', 'did:web:foo':
|
||||||
|
self.assertTrue(common.cache_policy(Object(id=id).key._key))
|
||||||
|
|
||||||
|
for obj in (
|
||||||
|
ATProto(id='alice'),
|
||||||
|
ActivityPub(id='alice'),
|
||||||
|
Web(id='alice'),
|
||||||
|
Object(id='https://mastodon.social/users/alice'),
|
||||||
|
Object(id='at://did:plc:user/app.bsky.actor.profile/self'),
|
||||||
|
Follower(id='abc'),
|
||||||
|
):
|
||||||
|
self.assertFalse(common.cache_policy(obj.key._key))
|
||||||
|
|
||||||
|
def test_global_cache_timeout_policy(self):
|
||||||
for good in (
|
for good in (
|
||||||
ATProto(id='alice'),
|
ATProto(id='alice'),
|
||||||
ActivityPub(id='alice'),
|
ActivityPub(id='alice'),
|
||||||
|
|
|
@ -2604,9 +2604,9 @@ class ProtocolReceiveTest(TestCase):
|
||||||
def receive():
|
def receive():
|
||||||
with app.test_request_context('/'), \
|
with app.test_request_context('/'), \
|
||||||
ndb_client.context(
|
ndb_client.context(
|
||||||
|
cache_policy=common.cache_policy,
|
||||||
global_cache=_InProcessGlobalCache(),
|
global_cache=_InProcessGlobalCache(),
|
||||||
global_cache_timeout_policy=common.global_cache_timeout_policy,
|
global_cache_timeout_policy=common.global_cache_timeout_policy):
|
||||||
cache_policy=lambda key: False):
|
|
||||||
try:
|
try:
|
||||||
Fake.receive_as1(note)
|
Fake.receive_as1(note)
|
||||||
except NoContent: # raised by the second thread
|
except NoContent: # raised by the second thread
|
||||||
|
|
|
@ -285,9 +285,9 @@ class TestCase(unittest.TestCase, testutil.Asserts):
|
||||||
# clear datastore
|
# clear datastore
|
||||||
requests.post(f'http://{ndb_client.host}/reset')
|
requests.post(f'http://{ndb_client.host}/reset')
|
||||||
self.ndb_context = ndb_client.context(
|
self.ndb_context = ndb_client.context(
|
||||||
|
cache_policy=common.cache_policy,
|
||||||
global_cache=_InProcessGlobalCache(),
|
global_cache=_InProcessGlobalCache(),
|
||||||
global_cache_timeout_policy=global_cache_timeout_policy,
|
global_cache_timeout_policy=global_cache_timeout_policy)
|
||||||
cache_policy=lambda key: False)
|
|
||||||
self.ndb_context.__enter__()
|
self.ndb_context.__enter__()
|
||||||
|
|
||||||
util.now = lambda **kwargs: testutil.NOW
|
util.now = lambda **kwargs: testutil.NOW
|
||||||
|
|
Ładowanie…
Reference in New Issue