From f3039fc87a068b0c3ec9c6a517a1b7f9a7448324 Mon Sep 17 00:00:00 2001 From: Ryan Barrett Date: Fri, 13 Oct 2023 18:28:04 -0700 Subject: [PATCH] Protocol.send: bring back log_data kwarg --- activitypub.py | 12 ++++++++---- atproto.py | 10 +++++++--- protocol.py | 14 ++++++++++---- tests/test_protocol.py | 2 +- tests/testutil.py | 2 +- 5 files changed, 27 insertions(+), 13 deletions(-) diff --git a/activitypub.py b/activitypub.py index a393aca..0c3fc8f 100644 --- a/activitypub.py +++ b/activitypub.py @@ -187,7 +187,7 @@ class ActivityPub(User, Protocol): return actor.get('publicInbox') or actor.get('inbox') @classmethod - def send(to_cls, obj, url, orig_obj=None): + def send(to_cls, obj, url, orig_obj=None, log_data=True): """Delivers an activity to an inbox URL. If ``obj.recipient_obj`` is set, it's interpreted as the receiving actor @@ -206,7 +206,7 @@ class ActivityPub(User, Protocol): elif not activity.get('actor'): logger.warning('Outgoing AP activity has no actor!') - return signed_post(url, data=activity).ok + return signed_post(url, log_data=True, data=activity).ok @classmethod def fetch(cls, obj, **kwargs): @@ -418,7 +418,7 @@ def signed_post(url, **kwargs): return signed_request(util.requests_post, url, **kwargs) -def signed_request(fn, url, data=None, headers=None, **kwargs): +def signed_request(fn, url, data=None, log_data=True, headers=None, **kwargs): """Wraps ``requests.*`` and adds HTTP Signature. If the current session has a user (ie in ``g.user``), signs with that user's @@ -428,6 +428,7 @@ def signed_request(fn, url, data=None, headers=None, **kwargs): fn (callable): :func:`util.requests_get` or :func:`util.requests_post` url (str): data (dict): optional AS2 object + log_data (bool): whether to log full data object kwargs: passed through to requests Returns: @@ -440,6 +441,8 @@ def signed_request(fn, url, data=None, headers=None, **kwargs): user = g.user or default_signature_user() if data: + if log_data: + logger.info(f'Sending AS2 object: {json_dumps(data, indent=2)}') data = json_dumps(data).encode() headers = { @@ -475,7 +478,8 @@ def signed_request(fn, url, data=None, headers=None, **kwargs): # handle GET redirects manually so that we generate a new HTTP signature if resp.is_redirect and fn == util.requests_get: new_url = urljoin(url, resp.headers['Location']) - return signed_request(fn, new_url, data=data, headers=headers, **kwargs) + return signed_request(fn, new_url, data=data, headers=headers, + log_data=log_data, **kwargs) type = common.content_type(resp) if (type and type != 'text/html' and diff --git a/atproto.py b/atproto.py index 6ba324d..1b75620 100644 --- a/atproto.py +++ b/atproto.py @@ -257,7 +257,7 @@ class ATProto(User, Protocol): user.put() @classmethod - def send(to_cls, obj, url, orig_obj=None): + def send(to_cls, obj, url, orig_obj=None, log_data=True): """Creates a record if we own its repo. Creates the repo first if it doesn't exist. @@ -311,8 +311,12 @@ class ATProto(User, Protocol): ndb.transactional() def write(): tid = next_tid() - record_json = json_dumps(dag_json.encode(record).decode(), indent=2) - logger.info(f'Storing ATProto app.bsky.feed.post {tid}: {record_json}') + log_msg = f'Storing ATProto app.bsky.feed.post {tid}' + if log_data: + log_msg += ': ' + json_dumps(dag_json.encode(record).decode(), + indent=2) + logger.info(log_msg) + repo.apply_writes( [Write(action=Action.CREATE, collection='app.bsky.feed.post', rkey=tid, record=record)]) diff --git a/protocol.py b/protocol.py index 92ec3d0..165c5c3 100644 --- a/protocol.py +++ b/protocol.py @@ -373,7 +373,7 @@ class Protocol: return g.user.key @classmethod - def send(to_cls, obj, url, orig_obj=None): + def send(to_cls, obj, url, orig_obj=None, log_data=True): """Sends an outgoing activity. To be implemented by subclasses. @@ -383,6 +383,7 @@ class Protocol: url (str): destination URL to send to orig_obj (models.Object): the "original object" that this object refers to, eg replies to or reposts or likes + log_data (bool): whether to log full data object Returns: bool: True if the activity is sent successfully, False if it is @@ -832,16 +833,18 @@ class Protocol: ) logger.info(f'Delivering to: {obj.undelivered}') + log_data = True errors = [] # stores (target URL, code, body) tuples # deliver to all targets, in parallel, with a thread pool with ThreadPoolExecutor(max_workers=DELIVER_THREADS, thread_name_prefix='deliver') as executor: results = [] + log_data = True for target, orig_obj in sorted_targets: @copy_current_request_context - def deliver_one(target, orig_obj, g_user): + def deliver_one(target, orig_obj, g_user, log_data): """Runs on a separate thread! Note that this has to be defined *inside* the loop, once per @@ -857,7 +860,8 @@ class Protocol: with ndb_client.context(): try: - sent = protocol.send(obj, target.uri, orig_obj=orig_obj) + sent = protocol.send(obj, target.uri, orig_obj=orig_obj, + log_data=log_data) if sent: obj.add('delivered', target) obj.remove('undelivered', target) @@ -871,7 +875,9 @@ class Protocol: obj.put() - results.append(executor.submit(deliver_one, target, orig_obj, g.user)) + results.append(executor.submit(deliver_one, target, orig_obj, + g.user, log_data)) + log_data = False # re-raise any exception that were raised for r in results: diff --git a/tests/test_protocol.py b/tests/test_protocol.py index 5871460..b99aaf6 100644 --- a/tests/test_protocol.py +++ b/tests/test_protocol.py @@ -989,7 +989,7 @@ class ProtocolReceiveTest(TestCase): } sent = [] - def send(obj, url, orig_obj=None): + def send(obj, url, orig_obj=None, log_data=True): self.assertEqual(create_as1, obj.as1) if not sent: self.assertEqual('target:1', url) diff --git a/tests/testutil.py b/tests/testutil.py index 229e10b..5706de5 100644 --- a/tests/testutil.py +++ b/tests/testutil.py @@ -102,7 +102,7 @@ class Fake(User, protocol.Protocol): return url.startswith('fake:blocklisted') @classmethod - def send(cls, obj, url, orig_obj=None): + def send(cls, obj, url, orig_obj=None, log_data=True): logger.info(f'Fake.send {url}') cls.sent.append((obj, url)) return True