2024-05-07 23:58:52 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								"""Bridgy Fed firehose client. Enqueues receive tasks for events for bridged users.
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								 """
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 22:48:39 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								from collections import namedtuple
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-14 05:02:09 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								from datetime import datetime, timedelta
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-07 23:17:44 +00:00
										 
									 
								 
							 | 
							
								
							 | 
							
								
							 | 
							
							
								import itertools
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								import logging
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-07 21:15:51 +00:00
										 
									 
								 
							 | 
							
								
							 | 
							
								
							 | 
							
							
								import os
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 17:39:03 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								from queue import SimpleQueue
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-10 04:25:13 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								from threading import Event, Lock, Thread, Timer
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 03:43:10 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								import time
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-07 21:15:51 +00:00
										 
									 
								 
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-10 18:31:47 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								from arroba.datastore_storage import AtpRepo
							 | 
						
					
						
							
								
									
										
										
										
											2024-07-06 22:26:20 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								from arroba.util import parse_at_uri
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-07 21:15:51 +00:00
										 
									 
								 
							 | 
							
								
							 | 
							
								
							 | 
							
							
								from carbox import read_car
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								import dag_json
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 21:31:57 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								from google.cloud import ndb
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-07 21:15:51 +00:00
										 
									 
								 
							 | 
							
								
							 | 
							
								
							 | 
							
							
								from granary.bluesky import AT_URI_PATTERN
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								from lexrpc.client import Client
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 17:39:03 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								from oauth_dropins.webutil import util
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 21:31:57 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								from oauth_dropins.webutil.appengine_config import ndb_client
							 | 
						
					
						
							
								
									
										
										
										
											2024-06-13 00:04:59 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								from oauth_dropins.webutil.appengine_info import DEBUG
							 | 
						
					
						
							
								
									
										
										
										
											2024-07-05 04:11:38 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								from oauth_dropins.webutil.util import json_dumps, json_loads
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-07 21:15:51 +00:00
										 
									 
								 
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 04:35:03 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								from atproto import ATProto, Cursor
							 | 
						
					
						
							
								
									
										
										
										
											2024-06-05 02:10:01 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								from common import (
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    add,
							 | 
						
					
						
							
								
									
										
										
										
											2024-08-01 19:07:20 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    cache_policy,
							 | 
						
					
						
							
								
									
										
										
										
											2024-06-05 02:10:01 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    create_task,
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    global_cache,
							 | 
						
					
						
							
								
									
										
										
										
											2024-08-01 19:42:13 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    global_cache_policy,
							 | 
						
					
						
							
								
									
										
										
										
											2024-06-05 02:10:01 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    global_cache_timeout_policy,
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    report_exception,
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    USER_AGENT,
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								)
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-14 05:02:09 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								from models import Object, reset_protocol_properties
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-06-13 00:04:59 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								logger = logging.getLogger(__name__)
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-07 23:17:44 +00:00
										 
									 
								 
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 03:43:10 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								RECONNECT_DELAY = timedelta(seconds=30)
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-23 05:08:25 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								STORE_CURSOR_FREQ = timedelta(seconds=10)
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 03:43:10 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 22:48:39 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								# a commit operation. similar to arroba.repo.Write. record is None for deletes.
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 06:11:27 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								Op = namedtuple('Op', ['action', 'repo', 'path', 'seq', 'record'],
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 22:48:39 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                # record is optional
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                defaults=[None])
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								# contains Ops
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 17:39:03 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								new_commits = SimpleQueue()
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-19 20:47:13 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								# global so that subscribe can reuse it across calls
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								subscribe_cursor = None
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								# global: _load_dids populates them, subscribe and handle use them
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-14 05:02:09 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								atproto_dids = set()
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								atproto_loaded_at = datetime(1900, 1, 1)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								bridged_dids = set()
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								bridged_loaded_at = datetime(1900, 1, 1)
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-10 04:25:13 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								dids_initialized = Event()
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 21:31:57 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-07 21:15:51 +00:00
										 
									 
								 
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 21:05:05 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								def load_dids():
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-11 02:59:04 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    # run in a separate thread since it needs to make its own NDB
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    # context when it runs in the timer thread
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    Thread(target=_load_dids).start()
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    dids_initialized.wait()
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-25 01:32:32 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    dids_initialized.clear()
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 21:31:57 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								def _load_dids():
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-14 05:02:09 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    global atproto_dids, atproto_loaded_at, bridged_dids, bridged_loaded_at
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 03:43:10 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-08-01 19:07:20 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    with ndb_client.context(cache_policy=cache_policy, global_cache=global_cache,
							 | 
						
					
						
							
								
									
										
										
										
											2024-08-01 19:42:13 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                            global_cache_policy=global_cache_policy,
							 | 
						
					
						
							
								
									
										
										
										
											2024-06-05 02:10:01 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                            global_cache_timeout_policy=global_cache_timeout_policy):
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 22:38:52 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        if not DEBUG:
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-10 04:25:13 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								            Timer(STORE_CURSOR_FREQ.total_seconds(), _load_dids).start()
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 03:43:10 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-14 05:02:09 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        atproto_query = ATProto.query(ATProto.enabled_protocols != None,
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-25 01:32:32 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                                      ATProto.updated > atproto_loaded_at)
							 | 
						
					
						
							
								
									
										
										
										
											2024-07-20 22:19:18 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        atproto_loaded_at = ATProto.query().order(-ATProto.updated).get().updated
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-14 05:02:09 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        new_atproto = [key.id() for key in atproto_query.iter(keys_only=True)]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        atproto_dids.update(new_atproto)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-07-23 05:24:13 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        bridged_query = AtpRepo.query(AtpRepo.status == None,
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                                      AtpRepo.created > bridged_loaded_at)
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-14 05:02:09 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        bridged_loaded_at = AtpRepo.query().order(-AtpRepo.created).get().created
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        new_bridged = [key.id() for key in bridged_query.iter(keys_only=True)]
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        bridged_dids.update(new_bridged)
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-07 21:15:51 +00:00
										 
									 
								 
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-10 04:25:13 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        dids_initialized.set()
							 | 
						
					
						
							
								
									
										
										
										
											2024-07-20 22:19:18 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        total = len(atproto_dids) + len(new_bridged)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        logger.info(f'DIDs: {total} ATProto {len(atproto_dids)} (+{len(new_atproto)}), AtpRepo {len(bridged_dids)} (+{len(new_bridged)})')
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 21:05:05 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-14 05:02:09 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								def subscriber():
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    """Wrapper around :func:`_subscribe` that catches exceptions and reconnects."""
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 21:05:05 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    logger.info(f'started thread to subscribe to {os.environ["BGS_HOST"]} firehose')
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-25 01:32:32 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    load_dids()
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-07 21:15:51 +00:00
										 
									 
								 
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 03:43:10 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    while True:
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        try:
							 | 
						
					
						
							
								
									
										
										
										
											2024-06-05 02:10:01 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								            with ndb_client.context(
							 | 
						
					
						
							
								
									
										
										
										
											2024-08-01 19:07:20 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                    cache_policy=cache_policy, global_cache=global_cache,
							 | 
						
					
						
							
								
									
										
										
										
											2024-08-01 19:42:13 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                    global_cache_policy=global_cache_policy,
							 | 
						
					
						
							
								
									
										
										
										
											2024-08-01 19:07:20 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                    global_cache_timeout_policy=global_cache_timeout_policy):
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-14 05:02:09 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                subscribe()
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            logger.info(f'disconnected! waiting {RECONNECT_DELAY} and then reconnecting')
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            time.sleep(RECONNECT_DELAY.total_seconds())
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-10 04:31:41 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        except BaseException:
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            report_exception()
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 03:43:10 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-14 05:02:09 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								def subscribe():
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-11 02:59:04 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    """Subscribes to the relay's firehose.
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    Relay hostname comes from the ``BGS_HOST`` environment variable.
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    Args:
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								      reconnect (bool): whether to always reconnect after we get disconnected
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    """
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-19 20:47:13 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    global subscribe_cursor
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    if not subscribe_cursor:
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        cursor = Cursor.get_by_id(
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            f'{os.environ["BGS_HOST"]} com.atproto.sync.subscribeRepos')
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        assert cursor
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        subscribe_cursor = cursor.cursor + 1 if cursor.cursor else None
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 06:11:27 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-06-05 02:10:01 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    client = Client(f'https://{os.environ["BGS_HOST"]}',
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                    headers={'User-Agent': USER_AGENT})
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-07 23:58:52 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-19 20:47:13 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    for header, payload in client.com.atproto.sync.subscribeRepos(
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            cursor=subscribe_cursor):
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 06:11:27 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        # parse header
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 20:26:24 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        if header.get('op') == -1:
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 00:36:34 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								            logger.warning(f'Got error from relay! {payload}')
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            continue
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 20:26:24 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        elif header.get('t') != '#commit':
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-25 01:32:32 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								            logger.info(f'Got {header.get("t")} from relay: {payload}')
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-07 21:15:51 +00:00
										 
									 
								 
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            continue
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 06:11:27 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        # parse payload
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-07 23:58:52 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        repo = payload.get('repo')
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 22:48:39 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        assert repo
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-14 05:02:09 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-19 20:47:13 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        seq = payload.get('seq')
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        if not seq:
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            logger.warning(f'Payload missing seq! {payload}')
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            continue
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        # if we fail processing this commit and raise an exception up to subscriber,
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        # skip it and start with the next commit when we're restarted
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        subscribe_cursor = seq + 1
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-14 05:02:09 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        # ops = ' '.join(f'{op.get("action")} {op.get("path")}'
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        #                for op in payload.get('ops', []))
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        # logger.info(f'seeing {payload.get("seq")} {repo} {ops}')
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-23 05:08:25 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        if repo in bridged_dids:  # from a Bridgy Fed non-Bluesky user; ignore
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            logger.info(f'Ignoring record from our non-ATProto bridged user {repo}')
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            continue
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-07 21:15:51 +00:00
										 
									 
								 
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 20:26:36 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        blocks = {}
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 20:26:24 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        if block_bytes := payload.get('blocks'):
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            _, blocks = read_car(block_bytes)
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 20:26:36 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								            blocks = {block.cid: block for block in blocks}
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-07 23:58:52 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        # detect records that reference an ATProto user, eg replies, likes,
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        # reposts, mentions
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 20:26:24 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        for p_op in payload.get('ops', []):
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            op = Op(repo=repo, action=p_op.get('action'), path=p_op.get('path'),
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-19 20:47:13 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                    seq=seq)
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 20:26:24 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								            if not op.action or not op.path:
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                logger.info(
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                    f'bad payload! seq {op.seq} has action {op.action} path {op.path}!')
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                continue
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 20:26:36 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 03:43:10 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								            is_ours = repo in atproto_dids
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 22:48:39 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								            if is_ours and op.action == 'delete':
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                logger.info(f'Got delete from our ATProto user: {op}')
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 20:31:08 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                # TODO: also detect deletes of records that *reference* our bridged
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                # users, eg a delete of a follow or like or repost of them.
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                # not easy because we need to getRecord the record to check
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 22:48:39 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                new_commits.put(op)
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 20:26:36 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                continue
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-07 23:58:52 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 20:26:24 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								            cid = p_op.get('cid')
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 22:48:39 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								            block = blocks.get(cid)
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 20:26:36 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								            # our own commits are sometimes missing the record
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            # https://github.com/snarfed/bridgy-fed/issues/1016
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 20:26:24 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								            if not cid or not block:
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-07 23:58:52 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                continue
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-19 06:17:07 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								            try:
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                op = Op(*op[:-1], record=block.decoded)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            except BaseException:
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                # https://github.com/hashberg-io/dag-cbor/issues/14
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                logger.error(f"Couldn't decode block {cid} seq {op.seq}",
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                             exc_info=True)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                continue
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 22:48:39 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								            type = op.record.get('$type')
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-07 23:58:52 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								            if not type:
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 22:48:39 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                logger.warning('commit record missing $type! {op.action} {op.repo} {op.path} {cid}')
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                logger.warning(dag_json.encode(op.record).decode())
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-07 23:58:52 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                continue
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 22:48:39 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								            if is_ours:
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                logger.info(f'Got one from our ATProto user: {op.action} {op.repo} {op.path}')
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                new_commits.put(op)
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 17:39:03 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                continue
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-07 23:58:52 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								            subjects = []
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 18:18:50 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            def maybe_add(did_or_ref):
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                if isinstance(did_or_ref, dict):
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                    match = AT_URI_PATTERN.match(did_or_ref['uri'])
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                    if match:
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                        did = match.group('repo')
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 18:31:31 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                    else:
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                        return
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 18:18:50 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                else:
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                    did = did_or_ref
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 03:43:10 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                if did and did in bridged_dids:
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-07 23:58:52 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                    add(subjects, did)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            if type in ('app.bsky.feed.like', 'app.bsky.feed.repost'):
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 22:48:39 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                maybe_add(op.record['subject'])
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-07 23:58:52 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            elif type in ('app.bsky.graph.block', 'app.bsky.graph.follow'):
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 22:48:39 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                maybe_add(op.record['subject'])
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-07 23:58:52 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            elif type == 'app.bsky.feed.post':
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                # replies
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 22:48:39 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                if reply := op.record.get('reply'):
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-07 23:58:52 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                    for ref in 'parent', 'root':
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 18:18:50 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                        maybe_add(reply[ref])
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-07 23:58:52 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                # mentions
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 22:48:39 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                for facet in op.record.get('facets', []):
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-07 23:58:52 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                    for feature in facet['features']:
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 18:31:31 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                        if feature['$type'] == 'app.bsky.richtext.facet#mention':
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-07 23:58:52 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                            maybe_add(feature['did'])
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 18:39:45 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                # quote posts
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 22:48:39 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                if embed := op.record.get('embed'):
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 19:13:26 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                    if embed['$type'] == 'app.bsky.embed.record':
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 18:39:45 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                        maybe_add(embed['record'])
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 19:13:26 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                    elif embed['$type'] == 'app.bsky.embed.recordWithMedia':
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                        maybe_add(embed['record']['record'])
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-07 23:58:52 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 17:39:03 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								            if subjects:
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 22:48:39 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                logger.info(f'Got one re our ATProto users {subjects}: {op.action} {op.repo} {op.path}')
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                new_commits.put(op)
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-07 23:58:52 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-14 05:02:09 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								def handler():
							 | 
						
					
						
							
								
									
										
										
										
											2024-07-04 23:58:06 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    """Wrapper around :func:`handle` that catches exceptions and restarts."""
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-11 02:59:04 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    logger.info(f'started handle thread to store objects and enqueue receive tasks')
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 17:39:03 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-11 02:59:04 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    while True:
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        try:
							 | 
						
					
						
							
								
									
										
										
										
											2024-06-05 02:10:01 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								            with ndb_client.context(
							 | 
						
					
						
							
								
									
										
										
										
											2024-08-01 19:07:20 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                    cache_policy=cache_policy, global_cache=global_cache,
							 | 
						
					
						
							
								
									
										
										
										
											2024-08-01 19:42:13 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                    global_cache_policy=global_cache_policy,
							 | 
						
					
						
							
								
									
										
										
										
											2024-08-01 19:07:20 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                    global_cache_timeout_policy=global_cache_timeout_policy):
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-14 05:02:09 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                handle()
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-11 02:59:04 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								            # if we return cleanly, that means we hit the limit
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            break
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-14 05:02:09 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-11 02:59:04 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        except BaseException:
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            report_exception()
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 03:19:18 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 17:39:03 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-14 05:02:09 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								def handle(limit=None):
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 04:35:03 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    cursor = Cursor.get_by_id(
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        f'{os.environ["BGS_HOST"]} com.atproto.sync.subscribeRepos')
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    assert cursor
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-07-04 23:58:06 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    def _handle(op):
							 | 
						
					
						
							
								
									
										
										
										
											2024-07-06 22:26:20 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        type, _ = op.path.strip('/').split('/', maxsplit=1)
							 | 
						
					
						
							
								
									
										
										
										
											2024-08-20 18:38:35 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        record_json = dag_json.encode(op.record, dialect='atproto')
							 | 
						
					
						
							
								
									
										
										
										
											2024-07-06 22:26:20 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        if type not in ATProto.SUPPORTED_RECORD_TYPES:
							 | 
						
					
						
							
								
									
										
										
										
											2024-08-20 18:38:35 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								            logger.info(f'Skipping unsupported type {op.record["$type"]}: {record_json}')
							 | 
						
					
						
							
								
									
										
										
										
											2024-07-04 23:58:06 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								            return
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 03:19:18 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        at_uri = f'at://{op.repo}/{op.path}'
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 17:39:03 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        # store object, enqueue receive task
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 03:19:18 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        if op.action in ('create', 'update'):
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-10 06:08:24 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								            record_kwarg = {
							 | 
						
					
						
							
								
									
										
										
										
											2024-08-20 18:38:35 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                'bsky': json_loads(record_json),
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-10 06:08:24 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								            }
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 03:19:18 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								            obj_id = at_uri
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        elif op.action == 'delete':
							 | 
						
					
						
							
								
									
										
										
										
											2024-07-06 22:26:20 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								            verb = ('delete'
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                    if type in ('app.bsky.actor.profile', 'app.bsky.feed.post')
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                    else 'undo')
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            obj_id = f'{at_uri}#{verb}'
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 03:19:18 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								            record_kwarg = {'our_as1': {
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                'objectType': 'activity',
							 | 
						
					
						
							
								
									
										
										
										
											2024-07-06 22:26:20 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                'verb': verb,
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 03:19:18 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                'id': obj_id,
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                'actor': op.repo,
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                'object': at_uri,
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            }}
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        else:
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            logger.error(f'Unknown action {action} for {op.repo} {op.path}')
							 | 
						
					
						
							
								
									
										
										
										
											2024-07-04 23:58:06 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								            return
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 03:19:18 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 03:43:10 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        try:
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-10 04:25:13 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								            # logger.info(f'enqueuing receive task for {at_uri}')
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-28 23:04:14 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								            obj = Object.get_or_create(id=obj_id, authed_as=op.repo, status='new',
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 03:43:10 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                                       users=[ATProto(id=op.repo).key],
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                                       source_protocol=ATProto.LABEL, **record_kwarg)
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            create_task(queue='receive', obj=obj.key.urlsafe(), authed_as=op.repo)
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-10 04:31:41 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        except BaseException:
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 06:11:27 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								            if DEBUG:
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                raise
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-10 04:31:41 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								            report_exception()
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 06:11:27 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        if util.now().replace(tzinfo=None) - cursor.updated > STORE_CURSOR_FREQ:
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            # it's been long enough, update our stored cursor
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-14 05:02:09 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								            logger.info(f'updating stored cursor to {op.seq}')
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 06:11:27 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								            cursor.cursor = op.seq
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            cursor.put()
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 03:19:18 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-07-05 04:11:38 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    seen = 0
							 | 
						
					
						
							
								
									
										
										
										
											2024-07-04 23:58:06 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    while op := new_commits.get():
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        _handle(op)
							 | 
						
					
						
							
								
									
										
										
										
											2024-07-05 04:11:38 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        seen += 1
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        if limit is not None and seen >= limit:
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 03:19:18 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								            return
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-08 17:39:03 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2024-05-09 13:43:51 +00:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    assert False, "handle thread shouldn't reach here!"
							 |