"""ATProto firehose client. Enqueues receive tasks for events for bridged users.

https://atproto.com/specs/event-stream
https://atproto.com/specs/sync#firehose
"""
from collections import namedtuple
from datetime import datetime, timedelta
from io import BytesIO
import itertools
import logging
import os
from queue import Queue
from threading import Event, Lock, Thread, Timer
import threading
import time

from arroba.datastore_storage import AtpRepo
from arroba.util import parse_at_uri
import dag_cbor
import dag_json
from google.cloud import ndb
from google.cloud.ndb.exceptions import ContextError
from granary.bluesky import AT_URI_PATTERN
from lexrpc.client import Client
import libipld
from oauth_dropins.webutil import util
from oauth_dropins.webutil.appengine_config import ndb_client
from oauth_dropins.webutil.appengine_info import DEBUG
from oauth_dropins.webutil.util import json_dumps, json_loads

from atproto import ATProto, Cursor
from common import (
    create_task,
    NDB_CONTEXT_KWARGS,
    PROTOCOL_DOMAINS,
    report_error,
    report_exception,
    USER_AGENT,
)
from protocol import DELETE_TASK_DELAY
from web import Web

logger = logging.getLogger(__name__)

RECONNECT_DELAY = timedelta(seconds=30)
STORE_CURSOR_FREQ = timedelta(seconds=10)

# a commit operation. similar to arroba.repo.Write. record is None for deletes.
Op = namedtuple('Op', ['action', 'repo', 'path', 'seq', 'record', 'time'],
                # last four fields are optional
                defaults=[None, None, None, None])
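
# example Op as produced by subscribe() below (a sketch: the DID, rkey, and
# timestamp are made up for illustration):
#   Op(action='create', repo='did:plc:abc123', path='app.bsky.feed.post/3kxyz',
#      seq=123456, record={'$type': 'app.bsky.feed.post', ...},
#      time='2024-01-01T00:00:00.000Z')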

# contains Ops
#
# maxsize is important here! if we hit this limit, subscribe will block when it
# tries to add more commits until handle consumes some. this keeps subscribe
# from getting too far ahead of handle and using too much memory in this queue.
commits = Queue(maxsize=1000)

# global so that subscribe can reuse it across calls
cursor = None

# globals: _load_dids populates them, subscribe and handle use them
atproto_dids = set()
atproto_loaded_at = datetime(1900, 1, 1)
bridged_dids = set()
bridged_loaded_at = datetime(1900, 1, 1)
protocol_bot_dids = set()
dids_initialized = Event()


def load_dids():
    """Runs the initial :func:`_load_dids` and blocks until it finishes."""
    # run in a separate thread since it needs to make its own NDB
    # context when it runs in the timer thread
    Thread(target=_load_dids).start()
    dids_initialized.wait()
    dids_initialized.clear()


def _load_dids():
    """Loads new and updated DIDs from the datastore into the module-level sets.

    Re-schedules itself to run again every ``STORE_CURSOR_FREQ`` unless ``DEBUG``
    is set.
    """
    global atproto_dids, atproto_loaded_at, bridged_dids, bridged_loaded_at

    with ndb_client.context(**NDB_CONTEXT_KWARGS):
        if not DEBUG:
            Timer(STORE_CURSOR_FREQ.total_seconds(), _load_dids).start()

        atproto_query = ATProto.query(ATProto.status == None,
                                      ATProto.enabled_protocols != None,
                                      ATProto.updated > atproto_loaded_at)
        loaded_at = ATProto.query().order(-ATProto.updated).get().updated
        new_atproto = [key.id() for key in atproto_query.iter(keys_only=True)]
        atproto_dids.update(new_atproto)
        # set *after* we populate atproto_dids so that if we crash earlier, we
        # re-query from the earlier timestamp
        atproto_loaded_at = loaded_at

        bridged_query = AtpRepo.query(AtpRepo.status == None,
                                      AtpRepo.created > bridged_loaded_at)
        loaded_at = AtpRepo.query().order(-AtpRepo.created).get().created
        new_bridged = [key.id() for key in bridged_query.iter(keys_only=True)]
        bridged_dids.update(new_bridged)
        # set *after* we populate bridged_dids so that if we crash earlier, we
        # re-query from the earlier timestamp
        bridged_loaded_at = loaded_at

        if not protocol_bot_dids:
            bot_keys = [Web(id=domain).key for domain in PROTOCOL_DOMAINS]
            for bot in ndb.get_multi(bot_keys):
                if bot:
                    if did := bot.get_copy(ATProto):
                        logger.info(f'Loaded protocol bot user {bot.key.id()} {did}')
                        protocol_bot_dids.add(did)

        dids_initialized.set()
        total = len(atproto_dids) + len(bridged_dids)
        logger.info(f'DIDs: {total} ATProto {len(atproto_dids)} (+{len(new_atproto)}), AtpRepo {len(bridged_dids)} (+{len(new_bridged)}); commits {commits.qsize()}')
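

# subscriber() and handler() below are meant to run as long-lived threads:
# subscriber fills the commits queue, handler drains it. A minimal sketch of how
# a caller might start them (the atproto_firehose module name is assumed here):
#
#   from threading import Thread
#   import atproto_firehose
#
#   Thread(target=atproto_firehose.subscriber, daemon=True).start()
#   Thread(target=atproto_firehose.handler, daemon=True).start()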


def subscriber():
    """Wrapper around :func:`subscribe` that catches exceptions and reconnects."""
    logger.info(f'started thread to subscribe to {os.environ["BGS_HOST"]} firehose')
    load_dids()

    with ndb_client.context(**NDB_CONTEXT_KWARGS):
        while True:
            try:
                subscribe()
            except BaseException:
                report_exception()

            logger.info(f'disconnected! waiting {RECONNECT_DELAY} and then reconnecting')
            time.sleep(RECONNECT_DELAY.total_seconds())


def subscribe():
    """Subscribes to the relay's firehose.

    Relay hostname comes from the ``BGS_HOST`` environment variable.
    """
    global cursor
    if not cursor:
        cursor = Cursor.get_or_insert(
            f'{os.environ["BGS_HOST"]} com.atproto.sync.subscribeRepos')
        # TODO: remove? does this make us skip events? if we remove it, will we
        # infinite loop when we fail on an event?
        if cursor.cursor:
            cursor.cursor += 1

    last_stored_cursor = cur_timestamp = None

    client = Client(f'https://{os.environ["BGS_HOST"]}',
                    headers={'User-Agent': USER_AGENT})
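
    # each frame from subscribeRepos is two concatenated DAG-CBOR objects: a
    # header followed by a payload (https://atproto.com/specs/event-stream)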

    for frame in client.com.atproto.sync.subscribeRepos(decode=False,
                                                        cursor=cursor.cursor):
        # parse header
        header = libipld.decode_dag_cbor(frame)
        if header.get('op') == -1:
            _, payload = libipld.decode_dag_cbor_multi(frame)
            logger.warning(f'Got error from relay! {payload}')
            continue

        t = header.get('t')

        if t not in ('#commit', '#account', '#identity'):
            if t not in ('#handle', '#tombstone'):
                logger.info(f'Got {t} from relay')
            continue

        # parse payload
        _, payload = libipld.decode_dag_cbor_multi(frame)
        repo = payload.get('repo') or payload.get('did')
        if not repo:
            logger.warning(f'Payload missing repo! {payload}')
            continue

        seq = payload.get('seq')
        if not seq:
            logger.warning(f'Payload missing seq! {payload}')
            continue

        cur_timestamp = payload['time']

        # if we fail processing this commit and raise an exception up to subscriber,
        # skip it and start with the next commit when we're restarted
        cursor.cursor = seq + 1

        elapsed = util.now().replace(tzinfo=None) - cursor.updated
        if elapsed > STORE_CURSOR_FREQ:
            events_s = 0
            if last_stored_cursor:
                events_s = int((cursor.cursor - last_stored_cursor) /
                               elapsed.total_seconds())
            last_stored_cursor = cursor.cursor

            behind = util.now() - util.parse_iso8601(cur_timestamp)

            # it's been long enough, update our stored cursor and metrics
            logger.info(f'updating stored cursor to {cursor.cursor}, {events_s} events/s, {behind} ({int(behind.total_seconds())} s) behind')
            cursor.put()
            # when running locally, comment out put above and uncomment this
            # cursor.updated = util.now().replace(tzinfo=None)

        if t in ('#account', '#identity'):
            if repo in atproto_dids or repo in bridged_dids:
                logger.debug(f'Got {t[1:]} {repo}')
                commits.put(Op(action='account', repo=repo, seq=seq,
                               time=cur_timestamp))
            continue
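
        # for #commit events, the payload fields used below look roughly like
        # this (a sketch per the sync spec; values are made up):
        #   {'repo': 'did:plc:abc123', 'seq': 123456, 'time': '...',
        #    'blocks': b'<CAR bytes>',
        #    'ops': [{'action': 'create', 'path': 'app.bsky.feed.post/3kxyz',
        #             'cid': ...}]}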

        blocks = {}  # maps base32 str CID to dict block
        if block_bytes := payload.get('blocks'):
            _, blocks = libipld.decode_car(block_bytes)

        # detect records from bridged ATProto users that we should handle
        for p_op in payload.get('ops', []):
            op = Op(repo=payload['repo'], action=p_op.get('action'),
                    path=p_op.get('path'), seq=payload['seq'], time=payload['time'])
            if not op.action or not op.path:
                logger.info(
                    f'bad payload! seq {op.seq} action {op.action} path {op.path}!')
                continue

            if op.repo in atproto_dids and op.action == 'delete':
                # TODO: also detect deletes of records that *reference* our bridged
                # users, eg a delete of a follow or like or repost of them.
                # not easy because we need to getRecord the record to check
                commits.put(op)
                continue

            cid = p_op.get('cid')
            block = blocks.get(cid)
            # our own commits are sometimes missing the record
            # https://github.com/snarfed/bridgy-fed/issues/1016
            if not cid or not block:
                continue

            op = op._replace(record=block)
            type = op.record.get('$type')
            if not type:
                logger.warning(f'commit record missing $type! {op.action} {op.repo} {op.path} {cid}')
                logger.warning(dag_json.encode(op.record).decode())
                continue
            elif type not in ATProto.SUPPORTED_RECORD_TYPES:
                continue

            # generally we only want records from bridged Bluesky users. the one
            # exception is follows of protocol bot users.
            if (op.repo not in atproto_dids
                and not (type == 'app.bsky.graph.follow'
                         and op.record['subject'] in protocol_bot_dids)):
                continue

            def is_ours(ref, also_atproto_users=False):
                """Returns True if the arg is a bridged user."""
                if match := AT_URI_PATTERN.match(ref['uri']):
                    did = match.group('repo')
                    return did and (did in bridged_dids
                                    or also_atproto_users and did in atproto_dids)

            if type == 'app.bsky.feed.repost':
                if not is_ours(op.record['subject'], also_atproto_users=True):
                    continue

            elif type == 'app.bsky.feed.like':
                if not is_ours(op.record['subject'], also_atproto_users=False):
                    continue

            elif type in ('app.bsky.graph.block', 'app.bsky.graph.follow'):
                if op.record['subject'] not in bridged_dids:
                    continue

            elif type == 'app.bsky.feed.post':
                if reply := op.record.get('reply'):
                    if not is_ours(reply['parent'], also_atproto_users=True):
                        continue

            commits.put(op)


def handler():
    """Wrapper around :func:`handle` that catches exceptions and restarts."""
    logger.info(f'started handle thread to store objects and enqueue receive tasks')

    while True:
        with ndb_client.context(**NDB_CONTEXT_KWARGS):
            try:
                handle()
                # if we return cleanly, that means we hit the limit
                break
            except BaseException:
                report_exception()
                # fall through to loop to create new ndb context in case this is
                # a ContextError
                # https://console.cloud.google.com/errors/detail/CIvwj_7MmsfOWw;time=P1D;locations=global?project=bridgy-federated


def handle(limit=None):
    """Consumes Ops from the commits queue and enqueues receive tasks for them.

    Args:
      limit (int): return after handling this many ops, eg in tests. If None,
        runs forever.
    """
    def _handle_account(op):
        # reload DID doc to fetch new changes
        ATProto.load(op.repo, did_doc=True, remote=True)

    def _handle(op):
        at_uri = f'at://{op.repo}/{op.path}'

        type, _ = op.path.strip('/').split('/', maxsplit=1)
        if type not in ATProto.SUPPORTED_RECORD_TYPES:
            logger.info(f'Skipping unsupported type {type}: {at_uri}')
            return

        # store object, enqueue receive task
        verb = None
        if op.action in ('create', 'update'):
            record_kwarg = {
                'bsky': op.record,
            }
            obj_id = at_uri
        elif op.action == 'delete':
            verb = (
                'delete' if type in ('app.bsky.actor.profile', 'app.bsky.feed.post')
                else 'stop-following' if type == 'app.bsky.graph.follow'
                else 'undo')
            obj_id = f'{at_uri}#{verb}'
            record_kwarg = {
                'our_as1': {
                    'objectType': 'activity',
                    'verb': verb,
                    'id': obj_id,
                    'actor': op.repo,
                    'object': at_uri,
                },
            }
        else:
            logger.error(f'Unknown action {op.action} for {op.repo} {op.path}')
            return

        if verb and verb not in ATProto.SUPPORTED_AS1_TYPES:
            return

        logger.debug(f'Got {op.action} {op.repo} {op.path}')
        delay = DELETE_TASK_DELAY if op.action == 'delete' else None
        try:
            create_task(queue='receive', id=obj_id, source_protocol=ATProto.LABEL,
                        authed_as=op.repo, received_at=op.time, delay=delay,
                        **record_kwarg)
            # when running locally, comment out above and uncomment this
            # logger.info(f'enqueuing receive task for {at_uri}')
        except ContextError:
            raise  # handled in handler()
        except BaseException:
            report_error(obj_id, exception=True)

    seen = 0
    while op := commits.get():
        match op.action:
            case 'account':
                _handle_account(op)
            case _:
                _handle(op)

        seen += 1
        if limit is not None and seen >= limit:
            return

    assert False, "handle thread shouldn't reach here!"