add new atproto-commit task queue, enqueue tasks for new commits

hub will consume these and emit them to subscribeRepos subscribers
pull/634/head
Ryan Barrett 2023-09-05 20:10:11 -07:00
rodzic f6d8f3a741
commit f91486db75
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 6BE31FDF4776E9D4
4 zmienionych plików z 83 dodań i 11 usunięć

Wyświetl plik

@ -14,7 +14,7 @@ import re
from arroba import did
from arroba.datastore_storage import DatastoreStorage
from arroba.repo import Repo, Write
from arroba.storage import Action
from arroba.storage import Action, CommitData
from arroba.util import next_tid, new_key, parse_at_uri
from flask import abort, g, request
from google.cloud import ndb
@ -178,7 +178,6 @@ class ATProto(User, Protocol):
user.put()
update()
repo = storage.load_repo(did=user.atproto_did)
writes = []
if repo is None:
@ -191,10 +190,14 @@ class ATProto(User, Protocol):
collection='app.bsky.actor.profile',
rkey='self', record=user.obj.as_bsky()))
repo.callback = lambda commit_data: common.create_task(
queue='atproto-commit', seq=commit_data.commit.seq)
# create record
writes.append(Write(action=Action.CREATE, collection='app.bsky.feed.post',
rkey=next_tid(), record=obj.as_bsky()))
repo.apply_writes(writes, privkey)
return True
@classmethod

Wyświetl plik

@ -12,7 +12,8 @@ import cachetools
from Crypto.Util import number
from flask import abort, g, make_response, request
from oauth_dropins.webutil import util, webmention
from oauth_dropins.webutil.appengine_info import DEBUG
from oauth_dropins.webutil.appengine_config import tasks_client
from oauth_dropins.webutil.appengine_info import APP_ID, DEBUG
logger = logging.getLogger(__name__)
@ -62,6 +63,8 @@ CACHE_TIME = timedelta(seconds=60)
USER_AGENT = 'Bridgy Fed (https://fed.brid.gy/)'
TASKS_LOCATION = 'us-central1'
def base64_to_long(x):
"""Converts x from URL safe base64 encoding to a long integer.
@ -227,3 +230,24 @@ def add(seq, val):
"""
if val not in seq:
seq.append(val)
def create_task(queue, **params):
"""Adds a Cloud Tasks task.
Args:
queue: string, queue name
params: form-encoded and included in the task request body
"""
assert queue
task = tasks_client.create_task(
parent=tasks_client.queue_path(APP_ID, TASKS_LOCATION, queue),
task={
'app_engine_http_request': {
'http_method': 'POST',
'relative_uri': f'/_ah/queue/{queue}',
'body': urllib.parse.urlencode(params).encode(),
'headers': {'Content-Type': 'application/x-www-form-urlencoded'},
},
})
logger.info(f'Added {queue} task {task.name} : {params}')

20
queue.yaml 100644
Wyświetl plik

@ -0,0 +1,20 @@
# https://cloud.google.com/appengine/docs/standard/python/config/queueref
# https://cloud.google.com/tasks/docs/queue-yaml
queue:
- name: atproto-commit
target: hub
rate: 10/s
max_concurrent_requests: 10
retry_parameters:
task_retry_limit: 3
min_backoff_seconds: 5
max_doublings: 5
- name: webmention
target: default
rate: 5/s
max_concurrent_requests: 5
retry_parameters:
task_retry_limit: 3
min_backoff_seconds: 120
max_doublings: 3

Wyświetl plik

@ -1,5 +1,6 @@
"""Unit tests for atproto.py."""
import copy
from google.cloud.tasks_v2.types import Task
import logging
from unittest import skip
from unittest.mock import patch
@ -14,12 +15,14 @@ from granary.tests.test_bluesky import (
POST_AS,
POST_BSKY,
)
from oauth_dropins.webutil.appengine_config import tasks_client
from oauth_dropins.webutil.testutil import requests_response
from oauth_dropins.webutil.util import json_dumps, json_loads
import requests
import atproto
from atproto import ATProto
from common import USER_AGENT
import common
from models import Object
import protocol
from .testutil import Fake, TestCase
@ -36,7 +39,7 @@ DID_DOC = {
}
},
'prev': None,
'sig': '...'
'sig': '...',
}
class ATProtoTest(TestCase):
@ -143,7 +146,7 @@ class ATProtoTest(TestCase):
json=None,
headers={
'Content-Type': 'application/json',
'User-Agent': USER_AGENT,
'User-Agent': common.USER_AGENT,
},
)
@ -175,9 +178,11 @@ class ATProtoTest(TestCase):
self.store_object(id='did:plc:foo', raw=DID_DOC)
self.assertEqual('@han.dull@atproto.brid.gy', user.ap_address())
@patch('common.APP_ID', 'my-app')
@patch.object(tasks_client, 'create_task', return_value=Task(name='my task'))
@patch('requests.post',
return_value=requests_response('OK')) # create DID on PLC
def test_send_new_repo(self, mock_post):
def test_send_new_repo(self, mock_post, mock_create_task):
user = self.make_user(id='fake:user', cls=Fake)
obj = self.store_object(id='fake:post', source_protocol='fake', our_as1={
**POST_AS,
@ -199,9 +204,20 @@ class ATProtoTest(TestCase):
record = repo.get_record('app.bsky.feed.post', arroba.util._tid_last)
self.assertEqual(POST_BSKY, record)
# check atproto-commit task
mock_create_task.assert_called_with(
parent='projects/my-app/locations/us-central1/queues/atproto-commit',
task={'app_engine_http_request': {
'http_method': 'POST',
'relative_uri': '/_ah/queue/atproto-commit',
'body': b'seq=2',
'headers': {'Content-Type': 'application/x-www-form-urlencoded'},
}})
@patch.object(tasks_client, 'create_task', return_value=Task(name='my task'))
@patch('requests.post',
return_value=requests_response('OK')) # create DID on PLC
def test_send_new_repo_includes_user_profile(self, mock_post):
def test_send_new_repo_includes_user_profile(self, mock_post, mock_create_task):
user = self.make_user(id='fake:user', cls=Fake, obj_as1=ACTOR_AS)
obj = self.store_object(id='fake:post', source_protocol='fake', our_as1={
**POST_AS,
@ -216,7 +232,10 @@ class ATProtoTest(TestCase):
record = repo.get_record('app.bsky.feed.post', arroba.util._tid_last)
self.assertEqual(POST_BSKY, record)
def test_send_existing_repo(self):
mock_create_task.assert_called()
@patch.object(tasks_client, 'create_task', return_value=Task(name='my task'))
def test_send_existing_repo(self, mock_create_task):
user = self.make_user(id='fake:user', cls=Fake, atproto_did='did:plc:foo')
did_doc = copy.deepcopy(DID_DOC)
@ -234,12 +253,17 @@ class ATProtoTest(TestCase):
record = repo.get_record('app.bsky.feed.post', arroba.util._tid_last)
self.assertEqual(POST_BSKY, record)
def test_send_not_our_repo(self):
mock_create_task.assert_called()
@patch.object(tasks_client, 'create_task')
def test_send_not_our_repo(self, mock_create_task):
self.assertFalse(ATProto.send(Object(id='fake:post'), 'http://other.pds/'))
self.assertEqual(0, AtpBlock.query().count())
self.assertEqual(0, AtpRepo.query().count())
mock_create_task.assert_not_called()
def test_send_did_doc_not_our_repo(self):
@patch.object(tasks_client, 'create_task')
def test_send_did_doc_not_our_repo(self, mock_create_task):
self.store_object(id='did:plc:foo', raw=DID_DOC) # uses https://some.pds
user = self.make_user(id='fake:user', cls=Fake, atproto_did='did:plc:foo')
obj = self.store_object(id='fake:post', source_protocol='fake', our_as1={
@ -250,3 +274,4 @@ class ATProtoTest(TestCase):
self.assertFalse(ATProto.send(obj, 'http://localhost/'))
self.assertEqual(0, AtpBlock.query().count())
self.assertEqual(0, AtpRepo.query().count())
mock_create_task.assert_not_called()