kopia lustrzana https://github.com/snarfed/bridgy-fed
remove datastore transactions from User/Object/Follow.get_or_create
...because transactions don't read or write memcache. :/ Fortunately we don't really depend on atomicity for much, last writer wins is pretty much always fine. for #1149pull/1690/head
rodzic
ffd2be6c4b
commit
d32109b281
149
models.py
149
models.py
|
|
@ -273,6 +273,10 @@ class User(StringIdModel, metaclass=ProtocolUserMeta):
|
|||
reload=False, **kwargs):
|
||||
"""Loads and returns a :class:`User`. Creates it if necessary.
|
||||
|
||||
Not transactional because transactions don't read or write memcache. :/
|
||||
Fortunately we don't really depend on atomicity for anything, last
|
||||
writer wins is pretty much always fine.
|
||||
|
||||
Args:
|
||||
propagate (bool): whether to create copies of this user in push-based
|
||||
protocols, eg ATProto and Nostr.
|
||||
|
|
@ -286,90 +290,81 @@ class User(StringIdModel, metaclass=ProtocolUserMeta):
|
|||
"""
|
||||
assert cls != User
|
||||
|
||||
@ndb.transactional()
|
||||
def _run():
|
||||
user = cls.get_by_id(id, allow_opt_out=True)
|
||||
if user:
|
||||
if reload:
|
||||
user.reload_profile(gateway=True, raise_=False)
|
||||
user = cls.get_by_id(id, allow_opt_out=True)
|
||||
if user:
|
||||
if reload:
|
||||
user.reload_profile(gateway=True, raise_=False)
|
||||
|
||||
if user.status and not allow_opt_out:
|
||||
return None
|
||||
user.existing = True
|
||||
if user.status and not allow_opt_out:
|
||||
return None
|
||||
user.existing = True
|
||||
|
||||
# TODO: propagate more fields?
|
||||
changed = False
|
||||
for field in ['obj', 'obj_key']:
|
||||
old_val = getattr(user, field, None)
|
||||
new_val = kwargs.get(field)
|
||||
if old_val is None and new_val is not None:
|
||||
setattr(user, field, new_val)
|
||||
changed = True
|
||||
|
||||
if enabled_protocols := kwargs.get('enabled_protocols'):
|
||||
user.enabled_protocols = (set(user.enabled_protocols)
|
||||
| set(enabled_protocols))
|
||||
# TODO: propagate more fields?
|
||||
changed = False
|
||||
for field in ['obj', 'obj_key']:
|
||||
old_val = getattr(user, field, None)
|
||||
new_val = kwargs.get(field)
|
||||
if old_val is None and new_val is not None:
|
||||
setattr(user, field, new_val)
|
||||
changed = True
|
||||
|
||||
if not propagate:
|
||||
if changed:
|
||||
user.put()
|
||||
return user
|
||||
if enabled_protocols := kwargs.get('enabled_protocols'):
|
||||
user.enabled_protocols = (set(user.enabled_protocols)
|
||||
| set(enabled_protocols))
|
||||
changed = True
|
||||
|
||||
else:
|
||||
if orig_key := get_original_user_key(id):
|
||||
orig = orig_key.get()
|
||||
if orig.status and not allow_opt_out:
|
||||
return None
|
||||
orig.existing = False
|
||||
return orig
|
||||
if not propagate:
|
||||
if changed:
|
||||
user.put()
|
||||
return user
|
||||
|
||||
user = cls(id=id, **kwargs)
|
||||
user.existing = False
|
||||
user.reload_profile(gateway=True, raise_=False)
|
||||
if user.status and not allow_opt_out:
|
||||
else:
|
||||
if orig_key := get_original_user_key(id):
|
||||
orig = orig_key.get()
|
||||
if orig.status and not allow_opt_out:
|
||||
return None
|
||||
orig.existing = False
|
||||
return orig
|
||||
|
||||
if propagate and not user.status:
|
||||
for label in user.enabled_protocols + list(user.DEFAULT_ENABLED_PROTOCOLS):
|
||||
proto = PROTOCOLS[label]
|
||||
if proto == cls:
|
||||
continue
|
||||
elif proto.HAS_COPIES:
|
||||
if not user.get_copy(proto) and user.is_enabled(proto):
|
||||
try:
|
||||
proto.create_for(user)
|
||||
except (ValueError, AssertionError):
|
||||
logger.info(f'failed creating {proto.LABEL} copy',
|
||||
exc_info=True)
|
||||
util.remove(user.enabled_protocols, proto.LABEL)
|
||||
else:
|
||||
logger.debug(f'{proto.LABEL} not enabled or user copy already exists, skipping propagate')
|
||||
user = cls(id=id, **kwargs)
|
||||
user.existing = False
|
||||
user.reload_profile(gateway=True, raise_=False)
|
||||
if user.status and not allow_opt_out:
|
||||
return None
|
||||
|
||||
# generate keys for all protocols _except_ our own
|
||||
#
|
||||
# these can use urandom() and do nontrivial math, so they can take time
|
||||
# depending on the amount of randomness available and compute needed.
|
||||
if not user.existing and cls.LABEL != 'activitypub':
|
||||
key = RSA.generate(KEY_BITS,
|
||||
randfunc=random.randbytes if DEBUG else None)
|
||||
user.mod = long_to_base64(key.n)
|
||||
user.public_exponent = long_to_base64(key.e)
|
||||
user.private_exponent = long_to_base64(key.d)
|
||||
if propagate and not user.status:
|
||||
for label in user.enabled_protocols + list(user.DEFAULT_ENABLED_PROTOCOLS):
|
||||
proto = PROTOCOLS[label]
|
||||
if proto == cls:
|
||||
continue
|
||||
elif proto.HAS_COPIES:
|
||||
if not user.get_copy(proto) and user.is_enabled(proto):
|
||||
try:
|
||||
proto.create_for(user)
|
||||
except (ValueError, AssertionError):
|
||||
logger.info(f'failed creating {proto.LABEL} copy',
|
||||
exc_info=True)
|
||||
util.remove(user.enabled_protocols, proto.LABEL)
|
||||
else:
|
||||
logger.debug(f'{proto.LABEL} not enabled or user copy already exists, skipping propagate')
|
||||
|
||||
try:
|
||||
user.put()
|
||||
except AssertionError as e:
|
||||
error(f'Bad {cls.__name__} id {id} : {e}')
|
||||
# generate keys for all protocols _except_ our own
|
||||
#
|
||||
# these can use urandom() and do nontrivial math, so they can take time
|
||||
# depending on the amount of randomness available and compute needed.
|
||||
if not user.existing and cls.LABEL != 'activitypub':
|
||||
key = RSA.generate(KEY_BITS,
|
||||
randfunc=random.randbytes if DEBUG else None)
|
||||
user.mod = long_to_base64(key.n)
|
||||
user.public_exponent = long_to_base64(key.e)
|
||||
user.private_exponent = long_to_base64(key.d)
|
||||
|
||||
return user
|
||||
|
||||
user = _run()
|
||||
|
||||
# load and propagate user and profile object
|
||||
if user:
|
||||
logger.debug(('Updated ' if user.existing else 'Created new ') + str(user))
|
||||
try:
|
||||
user.put()
|
||||
except AssertionError as e:
|
||||
error(f'Bad {cls.__name__} id {id} : {e}')
|
||||
|
||||
logger.debug(('Updated ' if user.existing else 'Created new ') + str(user))
|
||||
return user
|
||||
|
||||
@property
|
||||
|
|
@ -1055,7 +1050,6 @@ class Object(StringIdModel):
|
|||
logger.debug(f'Wrote {self.key}')
|
||||
|
||||
@classmethod
|
||||
@ndb.transactional()
|
||||
def get_or_create(cls, id, authed_as=None, **props):
|
||||
"""Returns an :class:`Object` with the given property values.
|
||||
|
||||
|
|
@ -1063,6 +1057,10 @@ class Object(StringIdModel):
|
|||
first. Only populates non-False/empty property values in props into the
|
||||
object. Also populates the :attr:`new` and :attr:`changed` properties.
|
||||
|
||||
Not transactional because transactions don't read or write memcache. :/
|
||||
Fortunately we don't really depend on atomicity for anything, last
|
||||
writer wins is pretty much always fine.
|
||||
|
||||
Args:
|
||||
authed_as (str): if a matching :class:`Object` already exists, its
|
||||
`author` or `actor` must contain this actor id. Implements basic
|
||||
|
|
@ -1419,10 +1417,13 @@ class Follower(ndb.Model):
|
|||
logger.debug(f'Wrote {self.key}')
|
||||
|
||||
@classmethod
|
||||
@ndb.transactional()
|
||||
def get_or_create(cls, *, from_, to, **kwargs):
|
||||
"""Returns a Follower with the given ``from_`` and ``to`` users.
|
||||
|
||||
Not transactional because transactions don't read or write memcache. :/
|
||||
Fortunately we don't really depend on atomicity for anything, last
|
||||
writer wins is pretty much always fine.
|
||||
|
||||
If a matching :class:`Follower` doesn't exist in the datastore, creates
|
||||
it first.
|
||||
|
||||
|
|
|
|||
Ładowanie…
Reference in New Issue