kopia lustrzana https://github.com/snarfed/bridgy-fed
				
				
				
			
		
			
				
	
	
		
			136 wiersze
		
	
	
		
			4.1 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			136 wiersze
		
	
	
		
			4.1 KiB
		
	
	
	
		
			Python
		
	
	
"""Single-instance hub for ATProto subscription (firehose) server and client."""
 | 
						|
from functools import lru_cache
 | 
						|
from ipaddress import ip_address, ip_network
 | 
						|
import logging
 | 
						|
import os
 | 
						|
from pathlib import Path
 | 
						|
import socket
 | 
						|
import threading
 | 
						|
from threading import Thread, Timer
 | 
						|
 | 
						|
import arroba.server
 | 
						|
from arroba import xrpc_sync
 | 
						|
from flask import Flask, render_template
 | 
						|
import lexrpc.client
 | 
						|
import lexrpc.flask_server
 | 
						|
from oauth_dropins.webutil.appengine_info import DEBUG, LOCAL_SERVER
 | 
						|
from oauth_dropins.webutil import appengine_config, flask_util
 | 
						|
import pytz
 | 
						|
 | 
						|
# all protocols
 | 
						|
import activitypub, atproto, web
 | 
						|
import atproto_firehose
 | 
						|
import common
 | 
						|
import models
 | 
						|
import pages
 | 
						|
 | 
						|
# as of 2024-07-10
 | 
						|
# IP networks that gethostbyaddr() below labels as 'bsky' instead of doing a
# reverse DNS lookup.
BSKY_TEAM_CIDRS = tuple(ip_network(cidr) for cidr in (
    # https://discord.com/channels/1097580399187738645/1115973909624397855/1260356452162469969
    '209.249.133.120/29',
    '108.179.139.0/24',
    # https://github.com/bluesky-social/atproto/discussions/3036#discussioncomment-11431550
    '67.213.161.32/29',
))

# Hostnames that may also belong to the Bluesky team.
BSKY_TEAM_HOSTS = (
    # maybe? https://github.com/bluesky-social/atproto/discussions/3036#discussioncomment-11399854
    'zip.zayo.com',
)
 | 
						|
 | 
						|
# Number of atproto_firehose.handler threads this module starts.
#
# WARNING: when this is higher than 1, we start seeing ndb context exceptions,
# "ContextError: No current context," in _handle, even though it has an ndb context
# from handler. No clue why. They happen more often as the number of threads
# increases. Are ndb clients/contexts not thread safe?!
# https://github.com/snarfed/bridgy-fed/issues/1315
# https://console.cloud.google.com/errors/detail/CJrBqKnRzPfNRA;time=PT1H;refresh=true;locations=global?project=bridgy-federated
#
# NOTE(review): the value below is greater than 1 despite the warning above;
# confirm that is intentional.
HANDLE_THREADS = 10
 | 
						|
 | 
						|
# module-level logger, named after this module
logger = logging.getLogger(__name__)

models.reset_protocol_properties()

# Flask app
app = Flask(__name__)
app.json.compact = False
# load Flask settings from config.py, which lives next to this file
app_dir = Path(__file__).parent
app.config.from_pyfile(app_dir / 'config.py')

# wrap the WSGI app in webutil's ndb context middleware, using the shared ndb
# client and this project's context kwargs. presumably this gives each request
# its own ndb context; confirm against flask_util.
app.wsgi_app = flask_util.ndb_context_middleware(
    app.wsgi_app, client=appengine_config.ndb_client, **common.NDB_CONTEXT_KWARGS)
 | 
						|
 | 
						|
 | 
						|
# app.add_url_rule('/hub/eval', view_func=pages.python_eval, methods=['POST'])
 | 
						|
 | 
						|
@app.get('/liveness_check')
@app.get('/readiness_check')
def health_check():
    """App Engine Flex health checks.

    Serves both ``/liveness_check`` and ``/readiness_check``. Does no work of its
    own; it always returns the string ``OK``.

    https://cloud.google.com/appengine/docs/flexible/reference/app-yaml?tab=python#updated_health_checks

    Returns:
      str: ``'OK'``
    """
    return 'OK'
 | 
						|
 | 
						|
 | 
						|
# ATProto XRPC server
 | 
						|
lexrpc.flask_server.init_flask(arroba.server.server, app)
 | 
						|
 | 
						|
 | 
						|
@app.post('/queue/atproto-commit')
@flask_util.cloud_tasks_only(log=False)
def atproto_commit():
    """Handler for atproto-commit tasks.

    Triggers `subscribeRepos` to check for new commits by calling
    :func:`arroba.xrpc_sync.send_events`.

    Wrapped in ``flask_util.cloud_tasks_only``, which presumably rejects requests
    that don't come from Cloud Tasks; confirm against webutil.

    Returns:
      str: ``'OK'``
    """
    xrpc_sync.send_events()
    return 'OK'
 | 
						|
 | 
						|
 | 
						|
@lru_cache
def gethostbyaddr(addr):
    """Cached reverse lookup of an IP address, with a shortcut for Bluesky's networks.

    Addresses inside any of ``BSKY_TEAM_CIDRS`` are labeled ``'bsky'`` without a
    DNS query. Everything else goes through :func:`socket.gethostbyaddr`. Results
    are memoized with :func:`functools.lru_cache`, so each distinct address is
    only resolved once while it stays in the cache.

    Args:
      addr (str): IP address

    Returns:
      str or None: ``'bsky'`` for Bluesky team addresses, otherwise the primary
      hostname from reverse DNS, or None if the lookup raises
      :class:`socket.herror`

    Raises:
      ValueError: if ``addr`` isn't a valid IP address
    """
    # parse once, up front, instead of once per subnet
    ip = ip_address(addr)
    if any(ip in subnet for subnet in BSKY_TEAM_CIDRS):
        return 'bsky'

    try:
        # gethostbyaddr returns (hostname, aliaslist, ipaddrlist)
        return socket.gethostbyaddr(addr)[0]
    except socket.herror:
        return None
 | 
						|
 | 
						|
 | 
						|
@app.get('/admin/atproto')
def atproto_admin():
    """Renders the ATProto admin page from the ``atproto.html`` template.

    The template receives the current ``lexrpc.flask_server.subscribers``, the
    caching :func:`gethostbyaddr` wrapper, and the ``pytz`` module.
    """
    template_vars = {
        'subscribers': lexrpc.flask_server.subscribers,
        'gethostbyaddr': gethostbyaddr,
        'pytz': pytz,
    }
    return render_template('atproto.html', **template_vars)
 | 
						|
 | 
						|
 | 
						|
 | 
						|
# start firehose consumer threads
 | 
						|
if LOCAL_SERVER or not DEBUG:
    # sanity check: this module should only start the firehose threads once per
    # process
    threads = [t.name for t in threading.enumerate()]
    assert 'atproto_firehose.subscriber' not in threads
    # handler threads are named atproto_firehose.handler-N (see below), so match
    # on the prefix; an exact match on 'atproto_firehose.handler' would never hit
    assert not any(name.startswith('atproto_firehose.handler') for name in threads)

    # one subscriber thread, HANDLE_THREADS handler threads
    Thread(target=atproto_firehose.subscriber, name='atproto_firehose.subscriber').start()
    for i in range(HANDLE_THREADS):
        Thread(target=atproto_firehose.handler, name=f'atproto_firehose.handler-{i}').start()
 | 
						|
 | 
						|
 | 
						|
# send requestCrawl to relay every 5m.
 | 
						|
# delay 15s at startup because we're not up and serving XRPCs at this point yet.
 | 
						|
# not sure why not.
 | 
						|
if 'GAE_INSTANCE' in os.environ:  # prod
    def request_crawl():
        """Sends ``com.atproto.sync.requestCrawl`` to the relay, then reschedules itself.

        The relay host comes from the ``BGS_HOST`` environment variable and the
        hostname we ask it to crawl comes from ``PDS_HOST``. The response is
        logged at INFO. The next run is scheduled 5 minutes later whether or not
        this one succeeded; any exception still propagates out of this timer
        thread afterward.
        """
        try:
            bgs = lexrpc.client.Client(f'https://{os.environ["BGS_HOST"]}',
                                       headers={'User-Agent': common.USER_AGENT})
            resp = bgs.com.atproto.sync.requestCrawl({'hostname': os.environ['PDS_HOST']})
            logger.info(resp)
        finally:
            # always schedule the next run. otherwise a single failed request
            # would end the periodic requestCrawl until the process restarts.
            Timer(5 * 60, request_crawl).start()

    Timer(15, request_crawl).start()
    logger.info('Will send relay requestCrawl in 15s')
 |