add Object.lock and new add, remove, and put methods

pull/663/head
Ryan Barrett 2023-10-07 13:51:59 -07:00
rodzic dfff201c6f
commit 5efd97d867
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 6BE31FDF4776E9D4
3 zmienionych plików z 51 dodań i 14 usunięć

Wyświetl plik

@ -243,7 +243,7 @@ class ATProto(User, Protocol):
action=Action.CREATE, collection='app.bsky.actor.profile',
rkey='self', record=profile)]
uri = at_uri(user.atproto_did, 'app.bsky.actor.profile', 'self')
add(user.obj.copies, Target(uri=uri, protocol='atproto'))
user.obj.add('copies', Target(uri=uri, protocol='atproto'))
user.obj.put()
repo = Repo.create(
@ -317,7 +317,7 @@ class ATProto(User, Protocol):
rkey=tid, record=record)])
at_uri = f'at://{user.atproto_did}/app.bsky.feed.post/{tid}'
add(obj.copies, Target(uri=at_uri, protocol=to_cls.ABBREV))
obj.add('copies', Target(uri=at_uri, protocol=to_cls.ABBREV))
obj.put()
write()
@ -421,7 +421,7 @@ def poll_notifications():
source_protocol=ATProto.ABBREV)
if not obj.status:
obj.status = 'new'
add(obj.notify, user.key)
obj.add('notify', user.key)
obj.put()
common.create_task(queue='receive', obj=obj.key.urlsafe(),

Wyświetl plik

@ -4,6 +4,7 @@ import itertools
import json
import logging
import random
from threading import Lock
from urllib.parse import quote, urlparse
from arroba.datastore_storage import AtpRemoteBlob
@ -535,9 +536,12 @@ class Object(StringIdModel):
datastore. If either one is None, that means we don't know whether this
:class:`Object` is new/changed.
:attr:`changed` is populated by :meth:`Object.activity_changed()`.
:attr:`changed` is populated by :meth:`activity_changed()`.
"""
lock = None
"""Initialized in __init__, synchronizes property access, :meth:`put`s, etc."""
@ComputedJsonProperty
def as1(self):
# TODO: bring back log or assert? we have prod entities that currently
@ -609,6 +613,10 @@ class Object(StringIdModel):
if self.as1:
return as1.object_type(self.as1)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.lock = Lock()
def _object_ids(self): # id(s) of inner objects
if self.as1:
return redirect_unwrap(as1.get_ids(self.as1, 'object'))
@ -643,8 +651,10 @@ class Object(StringIdModel):
f'at:// URI ids must have DID repos; got {self.key.id()}')
if self.as1 and self.as1.get('objectType') == 'activity':
# can't self.add because we're inside self.put, which has the lock
add(self.labels, 'activity')
elif 'activity' in self.labels:
# ditto
self.labels.remove('activity')
def _post_put_hook(self, future):
@ -719,13 +729,40 @@ class Object(StringIdModel):
obj.put()
return obj
def put(self, **kwargs):
"""Stores this object. Uses ``self.lock``.
"""
with self.lock:
return super().put(**kwargs)
def add(self, prop, val):
"""Adds a value to a multiply-valued property. Uses ``self.lock``.
Args:
prop (str)
val
"""
with self.lock:
add(getattr(self, prop), val)
def remove(self, prop, val):
"""Removes a value from a multiply-valued property. Uses ``self.lock``.
Args:
prop (str)
val
"""
with self.lock:
getattr(self, prop).remove(val)
def clear(self):
"""Clears all data properties."""
for prop in 'our_as1', 'as2', 'bsky', 'mf2', 'raw':
val = getattr(self, prop, None)
if val:
logger.warning(f'Wiping out existing {prop}: {json_dumps(val, indent=2)}')
setattr(self, prop, None)
with self.lock:
setattr(self, prop, None)
def as_as2(self):
"""Returns this object as an AS2 dict."""

Wyświetl plik

@ -528,7 +528,7 @@ class Protocol:
# add owner(s)
actor_key = from_cls.actor_key(obj, default_g_user=False)
if actor_key:
add(obj.users, actor_key)
obj.add('users', actor_key)
inner_obj_as1 = as1.get_object(obj.as1)
if obj.as1.get('verb') in ('post', 'update', 'delete'):
@ -536,7 +536,7 @@ class Protocol:
if inner_actor:
user_key = from_cls.key_for(inner_actor)
if user_key:
add(obj.users, user_key)
obj.add('users', user_key)
obj.source_protocol = from_cls.LABEL
obj.put()
@ -706,7 +706,7 @@ class Protocol:
direct=not to_user.direct)
follower_obj = Follower.get_or_create(to=to_user, from_=from_user,
follow=obj.key, status='active')
add(obj.notify, to_key)
obj.add('notify', to_key)
# send accept. note that this is one accept for the whole follow, even
# if it has multiple followees!
@ -825,14 +825,14 @@ class Protocol:
try:
sent = protocol.send(obj, target.uri, orig_obj=orig_obj)
if sent:
add(obj.delivered, target)
obj.undelivered.remove(target)
obj.add('delivered', target)
obj.remove('undelivered', target)
except BaseException as e:
code, body = util.interpret_http_exception(e)
if not code and not body:
raise
add(obj.failed, target)
obj.undelivered.remove(target)
obj.add('failed', target)
obj.remove('undelivered', target)
errors.append((target.uri, code, body))
obj.put()
@ -918,7 +918,7 @@ class Protocol:
orig_user = protocol.actor_key(orig_obj, default_g_user=False)
if orig_user:
logger.info(f'Recipient is {orig_user}')
add(obj.notify, orig_user)
obj.add('notify', orig_user)
logger.info(f'Direct targets: {targets.keys()}')
@ -953,7 +953,7 @@ class Protocol:
for user in users:
if feed_obj:
add(feed_obj.feed, user.key)
feed_obj.add('feed', user.key)
# TODO: should we pass remote=False through here to Protocol.load?
target = user.target_for(user.obj, shared=True) if user.obj else None