Protocol.send: bring back log_data kwarg

pull/684/head
Ryan Barrett 2023-10-13 18:28:04 -07:00
rodzic bd19851d6c
commit f3039fc87a
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 6BE31FDF4776E9D4
5 zmienionych plików z 27 dodań i 13 usunięć

Wyświetl plik

@ -187,7 +187,7 @@ class ActivityPub(User, Protocol):
return actor.get('publicInbox') or actor.get('inbox')
@classmethod
def send(to_cls, obj, url, orig_obj=None):
def send(to_cls, obj, url, orig_obj=None, log_data=True):
"""Delivers an activity to an inbox URL.
If ``obj.recipient_obj`` is set, it's interpreted as the receiving actor
@ -206,7 +206,7 @@ class ActivityPub(User, Protocol):
elif not activity.get('actor'):
logger.warning('Outgoing AP activity has no actor!')
return signed_post(url, data=activity).ok
return signed_post(url, log_data=True, data=activity).ok
@classmethod
def fetch(cls, obj, **kwargs):
@ -418,7 +418,7 @@ def signed_post(url, **kwargs):
return signed_request(util.requests_post, url, **kwargs)
def signed_request(fn, url, data=None, headers=None, **kwargs):
def signed_request(fn, url, data=None, log_data=True, headers=None, **kwargs):
"""Wraps ``requests.*`` and adds HTTP Signature.
If the current session has a user (ie in ``g.user``), signs with that user's
@ -428,6 +428,7 @@ def signed_request(fn, url, data=None, headers=None, **kwargs):
fn (callable): :func:`util.requests_get` or :func:`util.requests_post`
url (str):
data (dict): optional AS2 object
log_data (bool): whether to log full data object
kwargs: passed through to requests
Returns:
@ -440,6 +441,8 @@ def signed_request(fn, url, data=None, headers=None, **kwargs):
user = g.user or default_signature_user()
if data:
if log_data:
logger.info(f'Sending AS2 object: {json_dumps(data, indent=2)}')
data = json_dumps(data).encode()
headers = {
@ -475,7 +478,8 @@ def signed_request(fn, url, data=None, headers=None, **kwargs):
# handle GET redirects manually so that we generate a new HTTP signature
if resp.is_redirect and fn == util.requests_get:
new_url = urljoin(url, resp.headers['Location'])
return signed_request(fn, new_url, data=data, headers=headers, **kwargs)
return signed_request(fn, new_url, data=data, headers=headers,
log_data=log_data, **kwargs)
type = common.content_type(resp)
if (type and type != 'text/html' and

Wyświetl plik

@ -257,7 +257,7 @@ class ATProto(User, Protocol):
user.put()
@classmethod
def send(to_cls, obj, url, orig_obj=None):
def send(to_cls, obj, url, orig_obj=None, log_data=True):
"""Creates a record if we own its repo.
Creates the repo first if it doesn't exist.
@ -311,8 +311,12 @@ class ATProto(User, Protocol):
ndb.transactional()
def write():
tid = next_tid()
record_json = json_dumps(dag_json.encode(record).decode(), indent=2)
logger.info(f'Storing ATProto app.bsky.feed.post {tid}: {record_json}')
log_msg = f'Storing ATProto app.bsky.feed.post {tid}'
if log_data:
log_msg += ': ' + json_dumps(dag_json.encode(record).decode(),
indent=2)
logger.info(log_msg)
repo.apply_writes(
[Write(action=Action.CREATE, collection='app.bsky.feed.post',
rkey=tid, record=record)])

Wyświetl plik

@ -373,7 +373,7 @@ class Protocol:
return g.user.key
@classmethod
def send(to_cls, obj, url, orig_obj=None):
def send(to_cls, obj, url, orig_obj=None, log_data=True):
"""Sends an outgoing activity.
To be implemented by subclasses.
@ -383,6 +383,7 @@ class Protocol:
url (str): destination URL to send to
orig_obj (models.Object): the "original object" that this object
refers to, eg replies to or reposts or likes
log_data (bool): whether to log full data object
Returns:
bool: True if the activity is sent successfully, False if it is
@ -832,16 +833,18 @@ class Protocol:
)
logger.info(f'Delivering to: {obj.undelivered}')
log_data = True
errors = [] # stores (target URL, code, body) tuples
# deliver to all targets, in parallel, with a thread pool
with ThreadPoolExecutor(max_workers=DELIVER_THREADS,
thread_name_prefix='deliver') as executor:
results = []
log_data = True
for target, orig_obj in sorted_targets:
@copy_current_request_context
def deliver_one(target, orig_obj, g_user):
def deliver_one(target, orig_obj, g_user, log_data):
"""Runs on a separate thread!
Note that this has to be defined *inside* the loop, once per
@ -857,7 +860,8 @@ class Protocol:
with ndb_client.context():
try:
sent = protocol.send(obj, target.uri, orig_obj=orig_obj)
sent = protocol.send(obj, target.uri, orig_obj=orig_obj,
log_data=log_data)
if sent:
obj.add('delivered', target)
obj.remove('undelivered', target)
@ -871,7 +875,9 @@ class Protocol:
obj.put()
results.append(executor.submit(deliver_one, target, orig_obj, g.user))
results.append(executor.submit(deliver_one, target, orig_obj,
g.user, log_data))
log_data = False
# re-raise any exception that were raised
for r in results:

Wyświetl plik

@ -989,7 +989,7 @@ class ProtocolReceiveTest(TestCase):
}
sent = []
def send(obj, url, orig_obj=None):
def send(obj, url, orig_obj=None, log_data=True):
self.assertEqual(create_as1, obj.as1)
if not sent:
self.assertEqual('target:1', url)

Wyświetl plik

@ -102,7 +102,7 @@ class Fake(User, protocol.Protocol):
return url.startswith('fake:blocklisted')
@classmethod
def send(cls, obj, url, orig_obj=None):
def send(cls, obj, url, orig_obj=None, log_data=True):
logger.info(f'Fake.send {url}')
cls.sent.append((obj, url))
return True