Revert "temporarily disable tasklet patch and hydrating objects/authors in pages.serve_feed"

This reverts commit 01ba8b5d50.

didn't help with ndb caching

for #1149
ndb-logging
Ryan Barrett 2024-12-19 16:41:55 -08:00
rodzic 01ba8b5d50
commit 572ca2c9ab
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 6BE31FDF4776E9D4
3 zmienionych plików z 55 dodań i 59 usunięć

Wyświetl plik

@ -62,40 +62,40 @@ lexrpc.flask_server.init_flask(arroba.server.server, app)
###########################################
# # https://github.com/googleapis/python-ndb/issues/743#issuecomment-2067590945
# #
# # fixes "RuntimeError: Key has already been set in this batch" errors due to
# # tasklets in pages.serve_feed
# from logging import error as log_error
# from sys import modules
# https://github.com/googleapis/python-ndb/issues/743#issuecomment-2067590945
#
# fixes "RuntimeError: Key has already been set in this batch" errors due to
# tasklets in pages.serve_feed
from logging import error as log_error
from sys import modules
# from google.cloud.datastore_v1.types.entity import Key
# from google.cloud.ndb._cache import (
# _GlobalCacheSetBatch,
# global_compare_and_swap,
# global_set_if_not_exists,
# global_watch,
# )
# from google.cloud.ndb.tasklets import Future, Return, tasklet
from google.cloud.datastore_v1.types.entity import Key
from google.cloud.ndb._cache import (
_GlobalCacheSetBatch,
global_compare_and_swap,
global_set_if_not_exists,
global_watch,
)
from google.cloud.ndb.tasklets import Future, Return, tasklet
# GLOBAL_CACHE_KEY_PREFIX: bytes = modules["google.cloud.ndb._cache"]._PREFIX
# LOCKED_FOR_READ: bytes = modules["google.cloud.ndb._cache"]._LOCKED_FOR_READ
# LOCK_TIME: bytes = modules["google.cloud.ndb._cache"]._LOCK_TIME
GLOBAL_CACHE_KEY_PREFIX: bytes = modules["google.cloud.ndb._cache"]._PREFIX
LOCKED_FOR_READ: bytes = modules["google.cloud.ndb._cache"]._LOCKED_FOR_READ
LOCK_TIME: bytes = modules["google.cloud.ndb._cache"]._LOCK_TIME
# @tasklet
# def custom_global_lock_for_read(key: str, value: str):
# if value is not None:
# yield global_watch(key, value)
# lock_acquired = yield global_compare_and_swap(
# key, LOCKED_FOR_READ, expires=LOCK_TIME
# )
# else:
# lock_acquired = yield global_set_if_not_exists(
# key, LOCKED_FOR_READ, expires=LOCK_TIME
# )
@tasklet
def custom_global_lock_for_read(key: str, value: str):
if value is not None:
yield global_watch(key, value)
lock_acquired = yield global_compare_and_swap(
key, LOCKED_FOR_READ, expires=LOCK_TIME
)
else:
lock_acquired = yield global_set_if_not_exists(
key, LOCKED_FOR_READ, expires=LOCK_TIME
)
# if lock_acquired:
# raise Return(LOCKED_FOR_READ)
if lock_acquired:
raise Return(LOCKED_FOR_READ)
# modules["google.cloud.ndb._cache"].global_lock_for_read = custom_global_lock_for_read
modules["google.cloud.ndb._cache"].global_lock_for_read = custom_global_lock_for_read

Wyświetl plik

@ -260,34 +260,33 @@ def serve_feed(*, objects, format, user, title, as_snippets=False, quiet=False):
else:
activities = [obj.as1 for obj in objects]
# TODO: bring back?
# # hydrate authors, actors, objects from stored Objects
# fields = 'author', 'actor', 'object'
# gets = []
# for a in activities:
# for field in fields:
# val = as1.get_object(a, field)
# if val and val.keys() <= set(['id']):
# def hydrate(a, f):
# def maybe_set(future):
# if future.result() and future.result().as1:
# a[f] = future.result().as1
# return maybe_set
# hydrate authors, actors, objects from stored Objects
fields = 'author', 'actor', 'object'
gets = []
for a in activities:
for field in fields:
val = as1.get_object(a, field)
if val and val.keys() <= set(['id']):
def hydrate(a, f):
def maybe_set(future):
if future.result() and future.result().as1:
a[f] = future.result().as1
return maybe_set
# # TODO: extract a Protocol class method out of User.profile_id,
# # then use that here instead. the catch is that we'd need to
# # determine Protocol for every id, which is expensive.
# #
# # same TODO is in models.fetch_objects
# id = val['id']
# if id.startswith('did:'):
# id = f'at://{id}/app.bsky.actor.profile/self'
# TODO: extract a Protocol class method out of User.profile_id,
# then use that here instead. the catch is that we'd need to
# determine Protocol for every id, which is expensive.
#
# same TODO is in models.fetch_objects
id = val['id']
if id.startswith('did:'):
id = f'at://{id}/app.bsky.actor.profile/self'
# future = Object.get_by_id_async(id)
# future.add_done_callback(hydrate(a, field))
# gets.append(future)
future = Object.get_by_id_async(id)
future.add_done_callback(hydrate(a, field))
gets.append(future)
# tasklets.wait_all(gets)
tasklets.wait_all(gets)
actor = (user.obj.as1 if user.obj and user.obj.as1
else {'displayName': user.readable_id, 'url': user.web_url()})

Wyświetl plik

@ -488,7 +488,6 @@ class PagesTest(TestCase):
self.assert_equals(200, got.status_code)
self.assert_equals([], microformats2.html_to_activities(got.text))
@skip
def test_feed_html(self):
self.add_objects()
@ -525,7 +524,6 @@ class PagesTest(TestCase):
self.user.put()
self.test_feed_atom_empty()
@skip
def test_feed_atom(self):
self.add_objects()
got = self.client.get('/web/user.com/feed?format=atom')
@ -549,7 +547,6 @@ class PagesTest(TestCase):
self.assert_equals(rss.CONTENT_TYPE, got.headers['Content-Type'])
self.assert_equals([], rss.to_activities(got.text))
@skip
def test_feed_rss(self):
self.add_objects()
got = self.client.get('/web/user.com/feed?format=rss')