diff --git a/atproto_firehose.py b/atproto_firehose.py index 3d1089d..ceb8c1f 100644 --- a/atproto_firehose.py +++ b/atproto_firehose.py @@ -6,20 +6,18 @@ import itertools import logging import os from queue import SimpleQueue -import threading from threading import Event, Lock, Thread, Timer import time from arroba.datastore_storage import AtpRepo from carbox import read_car import dag_json -from flask import Flask from google.cloud import ndb from granary.bluesky import AT_URI_PATTERN from lexrpc.client import Client from oauth_dropins.webutil import util from oauth_dropins.webutil.appengine_config import ndb_client -from oauth_dropins.webutil.appengine_info import DEBUG, LOCAL_SERVER +from oauth_dropins.webutil.appengine_info import DEBUG from oauth_dropins.webutil.util import json_loads from atproto import ATProto, Cursor @@ -33,14 +31,11 @@ from common import ( ) from models import Object, reset_protocol_properties -# all protocols -import activitypub, atproto, web +logger = logging.getLogger(__name__) RECONNECT_DELAY = timedelta(seconds=30) STORE_CURSOR_FREQ = timedelta(seconds=10) -logger = logging.getLogger(__name__) - # a commit operation. similar to arroba.repo.Write. record is None for deletes. Op = namedtuple('Op', ['action', 'repo', 'path', 'seq', 'record'], # record is optional @@ -59,8 +54,6 @@ bridged_dids = set() bridged_loaded_at = datetime(1900, 1, 1) dids_initialized = Event() -reset_protocol_properties() - def load_dids(): # run in a separate thread since it needs to make its own NDB @@ -326,22 +319,3 @@ def handle(limit=None): return assert False, "handle thread shouldn't reach here!" - - -# Flask app -app = Flask(__name__) - - -@app.get('/liveness_check') -@app.get('/readiness_check') -def health_check(): - return 'OK' - - -if LOCAL_SERVER or not DEBUG: - threads = [t.name for t in threading.enumerate()] - assert 'atproto_firehose.subscriber' not in threads - assert 'atproto_firehose.handler' not in threads - - Thread(target=subscriber, name='atproto_firehose.subscriber').start() - Thread(target=handler, name='atproto_firehose.handler').start() diff --git a/hub.py b/atproto_hub.py similarity index 78% rename from hub.py rename to atproto_hub.py index ae2b496..0c6dafa 100644 --- a/hub.py +++ b/atproto_hub.py @@ -1,4 +1,4 @@ -"""Single-instance hub for ATProto and Nostr websocket subscriptions.""" +"""Single-instance hub for ATProto subscription (firehose) server and client.""" import logging import os from pathlib import Path @@ -11,14 +11,11 @@ from flask import Flask import lexrpc.client import lexrpc.flask_server from oauth_dropins.webutil.appengine_info import DEBUG, LOCAL_SERVER -from oauth_dropins.webutil import ( - appengine_config, - flask_util, - util, -) +from oauth_dropins.webutil import appengine_config, flask_util # all protocols import activitypub, atproto, web +import atproto_firehose from common import global_cache, global_cache_timeout_policy, USER_AGENT import models @@ -66,6 +63,16 @@ def atproto_commit(): return 'OK' +# start firehose consumer threads +if LOCAL_SERVER or not DEBUG: + threads = [t.name for t in threading.enumerate()] + assert 'atproto_firehose.subscriber' not in threads + assert 'atproto_firehose.handler' not in threads + + Thread(target=atproto_firehose.subscriber, name='atproto_firehose.subscriber').start() + Thread(target=atproto_firehose.handler, name='atproto_firehose.handler').start() + + # send requestCrawl to relay # delay because we're not up and serving XRPCs at this point yet. not sure why not. if 'GAE_INSTANCE' in os.environ: # prod diff --git a/hub.yaml b/atproto_hub.yaml similarity index 87% rename from hub.yaml rename to atproto_hub.yaml index 8754c3b..ddf6d59 100644 --- a/hub.yaml +++ b/atproto_hub.yaml @@ -4,7 +4,7 @@ # application: bridgy-federated -service: hub +service: atproto-hub env: flex runtime: python runtime_config: @@ -15,7 +15,7 @@ resources: cpu: 1 memory_gb: 1.6 -# can't be internal because Bluesky relay needs to be able to connect externally +# can't be internal because Bluesky relay(s) need to be able to connect to us # over websocket for subscribeRepos network: instance_ip_mode: external @@ -37,6 +37,7 @@ env_variables: MOD_SERVICE_HOST: mod.bsky.app MOD_SERVICE_DID: did:plc:ar7c4by46qjdydhdevvrndac # ...or test against labeler.dholms.xyz / did:plc:vzxheqfwpbi3lxbgdh22js66 + ROLLBACK_WINDOW: 50000 # need only one instance so that new commits can be delivered to subscribeRepos @@ -47,4 +48,4 @@ manual_scaling: # https://cloud.google.com/appengine/docs/flexible/python/runtime#application_startup # https://docs.gunicorn.org/en/latest/settings.html#timeout # TODO: try asyncio w/eventlet workers -entrypoint: gunicorn --workers 1 --threads 20 -b :$PORT hub:app +entrypoint: gunicorn --workers 1 --threads 25 -b :$PORT atproto_hub:app diff --git a/dispatch.yaml b/dispatch.yaml index 32027d6..68d48e8 100644 --- a/dispatch.yaml +++ b/dispatch.yaml @@ -2,10 +2,10 @@ dispatch: - url: "*/queue/atproto-commit" - service: hub + service: atproto-hub - url: "*/queue/*" service: router - url: "*/xrpc/com.atproto.sync.subscribeRepos" - service: hub + service: atproto-hub diff --git a/firehose.yaml b/firehose.yaml deleted file mode 100644 index 8bda83e..0000000 --- a/firehose.yaml +++ /dev/null @@ -1,42 +0,0 @@ -# https://cloud.google.com/appengine/docs/flexible/reference/app-yaml?tab=python -# -# gcloud -q app deploy --project bridgy-federated firehose.yaml - -# application: bridgy-federated - -service: firehose -env: flex -runtime: python -runtime_config: - operating_system: ubuntu22 - runtime_version: "3.11" - -resources: - memory_gb: 1.0 - -env_variables: - PDS_HOST: atproto.brid.gy - - # # sandbox - # PLC_HOST: plc.bsky-sandbox.dev - # APPVIEW_HOST: api.bsky-sandbox.dev - # BGS_HOST: bgs.bsky-sandbox.dev - # MOD_SERVICE_HOST: ? - # MOD_SERVICE_DID: ? - - # prod - PLC_HOST: plc.directory - APPVIEW_HOST: api.bsky.app - BGS_HOST: bsky.network - MOD_SERVICE_HOST: mod.bsky.app - MOD_SERVICE_DID: did:plc:ar7c4by46qjdydhdevvrndac - -manual_scaling: - instances: 1 - -# https://cloud.google.com/appengine/docs/flexible/python/runtime#application_startup -# https://docs.gunicorn.org/en/latest/settings.html#timeout -# -# this service doesn't serve any HTTP requests, but it evidently still needs a -# web server for health checks. -entrypoint: gunicorn --workers 1 --threads 5 -b :$PORT atproto_firehose:app diff --git a/tests/test_atproto.py b/tests/test_atproto.py index 497c7b4..f5f6c49 100644 --- a/tests/test_atproto.py +++ b/tests/test_atproto.py @@ -26,7 +26,6 @@ from werkzeug.exceptions import BadRequest import atproto from atproto import ATProto, DatastoreClient import common -import hub from models import Object, PROTOCOLS, Target import protocol from .testutil import ATPROTO_KEY, Fake, TestCase diff --git a/tests/test_integrations.py b/tests/test_integrations.py index b23f1c0..d0b9c70 100644 --- a/tests/test_integrations.py +++ b/tests/test_integrations.py @@ -18,7 +18,6 @@ from activitypub import ActivityPub import app from atproto import ATProto, Cursor from atproto_firehose import handle, new_commits, Op -import hub from models import Object, Target from web import Web