AP: parallelize inbox delivery with a thread pool

fixes #652. this should make inbox deliveries roughly 10x faster.
pull/663/head
Ryan Barrett 2023-10-07 15:08:02 -07:00
rodzic 5efd97d867
commit d5499acaf3
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 6BE31FDF4776E9D4
2 zmienionych plików z 52 dodań i 26 usunięć

Wyświetl plik

@ -1,13 +1,15 @@
"""Base protocol class and common code."""
from concurrent.futures import ThreadPoolExecutor
import logging
import threading
from urllib.parse import urljoin
from cachetools import LRUCache
from flask import g, request
from flask import copy_current_request_context, g, request
from google.cloud import ndb
from google.cloud.ndb import OR
from granary import as1
from oauth_dropins.webutil.appengine_config import ndb_client
import werkzeug.exceptions
import common
@ -35,6 +37,8 @@ SUPPORTED_TYPES = (
'video',
)
DELIVER_THREADS = 10
# activity ids that we've already handled and can now ignore.
# used in Protocol.receive
seen_ids = LRUCache(100000)
@ -817,25 +821,47 @@ class Protocol:
errors = [] # stores (target URL, code, body) tuples
# deliver!
for target, orig_obj in sorted_targets:
assert target.uri
protocol = PROTOCOLS[target.protocol]
# deliver to all targets, in parallel, with a thread pool
with ThreadPoolExecutor(max_workers=DELIVER_THREADS) as executor:
results = []
try:
sent = protocol.send(obj, target.uri, orig_obj=orig_obj)
if sent:
obj.add('delivered', target)
obj.remove('undelivered', target)
except BaseException as e:
code, body = util.interpret_http_exception(e)
if not code and not body:
raise
obj.add('failed', target)
obj.remove('undelivered', target)
errors.append((target.uri, code, body))
for target, orig_obj in sorted_targets:
@copy_current_request_context
def deliver_one(target, orig_obj, g_user):
"""Runs on a separate thread!
obj.put()
Note that this has to be defined *inside* the loop, once per
target, since @copy_current_request_context copies when the
function is defined, and we need a fresh copy of the request
context for each call/thread.
https://www.kingname.info/2023/01/14/nested-thread-in-flask/
"""
assert target.uri
protocol = PROTOCOLS[target.protocol]
g.user = g_user
with ndb_client.context():
try:
sent = protocol.send(obj, target.uri, orig_obj=orig_obj)
if sent:
obj.add('delivered', target)
obj.remove('undelivered', target)
except BaseException as e:
code, body = util.interpret_http_exception(e)
if not code and not body:
raise
obj.add('failed', target)
obj.remove('undelivered', target)
errors.append((target.uri, code, body))
obj.put()
results.append(executor.submit(deliver_one, target, orig_obj, g.user))
# re-raise any exception that were raised
for r in results:
r.result()
# Pass the response status code and body through as our response
if obj.delivered:

Wyświetl plik

@ -1350,14 +1350,14 @@ class WebTest(TestCase):
self.as2_req('https://mas.to/mr-biff'),
))
calls = mock_post.call_args_list
self.assertEqual('https://mas.to/inbox', calls[0][0][0])
self.assertEqual(FOLLOW_AS2, json_loads(calls[0][1]['data']))
self.assertEqual('https://mas.to/inbox/biff', calls[1][0][0])
self.assertEqual({
**FOLLOW_AS2,
'object': 'https://mas.to/mr-biff',
}, json_loads(calls[1][1]['data']))
self.assertCountEqual([
(('https://mas.to/inbox',), FOLLOW_AS2),
(('https://mas.to/inbox/biff',), {
**FOLLOW_AS2,
'object': 'https://mas.to/mr-biff',
}),
], [(args, json_loads(kwargs['data']))
for args, kwargs in mock_post.call_args_list])
mf2 = util.parse_mf2(html)['items'][0]
mr_biff = ndb.Key(ActivityPub, 'https://mas.to/mr-biff')