merge hub and atproto_firehose services into a single service, atproto_hub

atproto_firehose is currently averaging 20-40% CPU and hub 5-10%, and both sit at 500-600 MB of memory at steady state, so they can easily fit together.
pull/1133/head
Ryan Barrett 2024-06-12 17:04:59 -07:00
rodzic 590021926c
commit 616073f788
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 6BE31FDF4776E9D4
7 zmienionych plików z 21 dodań i 83 usunięć

Wyświetl plik

@ -6,20 +6,18 @@ import itertools
import logging
import os
from queue import SimpleQueue
import threading
from threading import Event, Lock, Thread, Timer
import time
from arroba.datastore_storage import AtpRepo
from carbox import read_car
import dag_json
from flask import Flask
from google.cloud import ndb
from granary.bluesky import AT_URI_PATTERN
from lexrpc.client import Client
from oauth_dropins.webutil import util
from oauth_dropins.webutil.appengine_config import ndb_client
from oauth_dropins.webutil.appengine_info import DEBUG, LOCAL_SERVER
from oauth_dropins.webutil.appengine_info import DEBUG
from oauth_dropins.webutil.util import json_loads
from atproto import ATProto, Cursor
@ -33,14 +31,11 @@ from common import (
)
from models import Object, reset_protocol_properties
# all protocols
import activitypub, atproto, web
logger = logging.getLogger(__name__)
RECONNECT_DELAY = timedelta(seconds=30)
STORE_CURSOR_FREQ = timedelta(seconds=10)
logger = logging.getLogger(__name__)
# a commit operation. similar to arroba.repo.Write. record is None for deletes.
Op = namedtuple('Op', ['action', 'repo', 'path', 'seq', 'record'],
# record is optional
@ -59,8 +54,6 @@ bridged_dids = set()
bridged_loaded_at = datetime(1900, 1, 1)
dids_initialized = Event()
reset_protocol_properties()
def load_dids():
# run in a separate thread since it needs to make its own NDB
@ -326,22 +319,3 @@ def handle(limit=None):
return
assert False, "handle thread shouldn't reach here!"
# Flask app
app = Flask(__name__)
@app.get('/liveness_check')
@app.get('/readiness_check')
def health_check():
return 'OK'
if LOCAL_SERVER or not DEBUG:
threads = [t.name for t in threading.enumerate()]
assert 'atproto_firehose.subscriber' not in threads
assert 'atproto_firehose.handler' not in threads
Thread(target=subscriber, name='atproto_firehose.subscriber').start()
Thread(target=handler, name='atproto_firehose.handler').start()

Wyświetl plik

@ -1,4 +1,4 @@
"""Single-instance hub for ATProto and Nostr websocket subscriptions."""
"""Single-instance hub for ATProto subscription (firehose) server and client."""
import logging
import os
from pathlib import Path
@ -11,14 +11,11 @@ from flask import Flask
import lexrpc.client
import lexrpc.flask_server
from oauth_dropins.webutil.appengine_info import DEBUG, LOCAL_SERVER
from oauth_dropins.webutil import (
appengine_config,
flask_util,
util,
)
from oauth_dropins.webutil import appengine_config, flask_util
# all protocols
import activitypub, atproto, web
import atproto_firehose
from common import global_cache, global_cache_timeout_policy, USER_AGENT
import models
@ -66,6 +63,16 @@ def atproto_commit():
return 'OK'
# start firehose consumer threads
if LOCAL_SERVER or not DEBUG:
threads = [t.name for t in threading.enumerate()]
assert 'atproto_firehose.subscriber' not in threads
assert 'atproto_firehose.handler' not in threads
Thread(target=atproto_firehose.subscriber, name='atproto_firehose.subscriber').start()
Thread(target=atproto_firehose.handler, name='atproto_firehose.handler').start()
# send requestCrawl to relay
# delay because we're not up and serving XRPCs at this point yet. not sure why not.
if 'GAE_INSTANCE' in os.environ: # prod

Wyświetl plik

@ -4,7 +4,7 @@
# application: bridgy-federated
service: hub
service: atproto-hub
env: flex
runtime: python
runtime_config:
@ -15,7 +15,7 @@ resources:
cpu: 1
memory_gb: 1.6
# can't be internal because Bluesky relay needs to be able to connect externally
# can't be internal because Bluesky relay(s) need to be able to connect to us
# over websocket for subscribeRepos
network:
instance_ip_mode: external
@ -37,6 +37,7 @@ env_variables:
MOD_SERVICE_HOST: mod.bsky.app
MOD_SERVICE_DID: did:plc:ar7c4by46qjdydhdevvrndac
# ...or test against labeler.dholms.xyz / did:plc:vzxheqfwpbi3lxbgdh22js66
ROLLBACK_WINDOW: 50000
# need only one instance so that new commits can be delivered to subscribeRepos
@ -47,4 +48,4 @@ manual_scaling:
# https://cloud.google.com/appengine/docs/flexible/python/runtime#application_startup
# https://docs.gunicorn.org/en/latest/settings.html#timeout
# TODO: try asyncio w/eventlet workers
entrypoint: gunicorn --workers 1 --threads 20 -b :$PORT hub:app
entrypoint: gunicorn --workers 1 --threads 25 -b :$PORT atproto_hub:app

Wyświetl plik

@ -2,10 +2,10 @@
dispatch:
- url: "*/queue/atproto-commit"
service: hub
service: atproto-hub
- url: "*/queue/*"
service: router
- url: "*/xrpc/com.atproto.sync.subscribeRepos"
service: hub
service: atproto-hub

Wyświetl plik

@ -1,42 +0,0 @@
# https://cloud.google.com/appengine/docs/flexible/reference/app-yaml?tab=python
#
# gcloud -q app deploy --project bridgy-federated firehose.yaml
# application: bridgy-federated
service: firehose
env: flex
runtime: python
runtime_config:
operating_system: ubuntu22
runtime_version: "3.11"
resources:
memory_gb: 1.0
env_variables:
PDS_HOST: atproto.brid.gy
# # sandbox
# PLC_HOST: plc.bsky-sandbox.dev
# APPVIEW_HOST: api.bsky-sandbox.dev
# BGS_HOST: bgs.bsky-sandbox.dev
# MOD_SERVICE_HOST: ?
# MOD_SERVICE_DID: ?
# prod
PLC_HOST: plc.directory
APPVIEW_HOST: api.bsky.app
BGS_HOST: bsky.network
MOD_SERVICE_HOST: mod.bsky.app
MOD_SERVICE_DID: did:plc:ar7c4by46qjdydhdevvrndac
manual_scaling:
instances: 1
# https://cloud.google.com/appengine/docs/flexible/python/runtime#application_startup
# https://docs.gunicorn.org/en/latest/settings.html#timeout
#
# this service doesn't serve any HTTP requests, but it evidently still needs a
# web server for health checks.
entrypoint: gunicorn --workers 1 --threads 5 -b :$PORT atproto_firehose:app

Wyświetl plik

@ -26,7 +26,6 @@ from werkzeug.exceptions import BadRequest
import atproto
from atproto import ATProto, DatastoreClient
import common
import hub
from models import Object, PROTOCOLS, Target
import protocol
from .testutil import ATPROTO_KEY, Fake, TestCase

Wyświetl plik

@ -18,7 +18,6 @@ from activitypub import ActivityPub
import app
from atproto import ATProto, Cursor
from atproto_firehose import handle, new_commits, Op
import hub
from models import Object, Target
from web import Web