"""Nostr backfeed, via long-lived websocket connection(s) to relay(s)."""
from datetime import datetime, timedelta
import logging
import secrets
from threading import Event, Lock, Thread, Timer
import time

from google.cloud.ndb.exceptions import ContextError
from granary.nostr import (
    id_to_uri,
    KIND_DELETE,
    uri_for,
    uri_to_id,
    verify,
)
from oauth_dropins.webutil import util
from oauth_dropins.webutil.appengine_config import ndb_client
from oauth_dropins.webutil.appengine_info import DEBUG
from oauth_dropins.webutil.util import HTTP_TIMEOUT, json_dumps, json_loads
from websockets.exceptions import ConnectionClosed
from websockets.sync.client import connect

from common import (
    create_task,
    NDB_CONTEXT_KWARGS,
    report_error,
    report_exception,
)
from models import PROTOCOLS
import nostr
from nostr import Nostr
from protocol import DELETE_TASK_DELAY

logger = logging.getLogger(__name__)

RECONNECT_DELAY = timedelta(seconds=30)
LOAD_USERS_FREQ = timedelta(seconds=10)

# globals: _load_users populates them, subscribe uses them
nostr_pubkeys = set()
bridged_pubkeys = set()
pubkeys_loaded_at = datetime(1900, 1, 1)
pubkeys_initialized = Event()

# string relay websocket address URIs
subscribed_relays = []
subscribed_relays_lock = Lock()


def init(subscribe=True):
    logger.info('Starting _load_users timer')
    # run in a separate thread since it needs to make its own NDB
    # context when it runs in the timer thread
    Thread(target=_load_users, daemon=True).start()
    pubkeys_initialized.wait()
    pubkeys_initialized.clear()

    if subscribe:
        add_relay(Nostr.DEFAULT_TARGET)


def _load_users():
    global pubkeys_loaded_at

    if not DEBUG:
        Timer(LOAD_USERS_FREQ.total_seconds(), _load_users).start()

    with ndb_client.context(**NDB_CONTEXT_KWARGS):
        try:
            loaded_at = util.now().replace(tzinfo=None)

            new_nostr = Nostr.query(Nostr.status == None,
                                    Nostr.enabled_protocols != None,
                                    Nostr.updated > pubkeys_loaded_at,
                                    ).fetch()
            Nostr.load_multi(new_nostr)
            for user in new_nostr:
                nostr_pubkeys.add(uri_to_id(user.key.id()))
                if target := Nostr.target_for(user.obj):
                    add_relay(target)

            new_bridged = []
            for proto in PROTOCOLS.values():
                if proto and proto != Nostr:
                    # query for this protocol's users who are bridged into
                    # Nostr and have been updated since our last load
                    users = proto.query(proto.status == None,
                                        proto.enabled_protocols == 'nostr',
                                        proto.updated > pubkeys_loaded_at,
                                        ).fetch()
                    new_bridged.extend(users)

            bridged_pubkeys.update(user.hex_pubkey() for user in new_bridged)

            # set *after* we populate bridged_pubkeys and nostr_pubkeys so that
            # if we crash earlier, we re-query from the earlier timestamp
            pubkeys_loaded_at = loaded_at
            pubkeys_initialized.set()

            total = len(nostr_pubkeys) + len(bridged_pubkeys)
            logger.info(f'Total pubkeys: {total}; Nostr {len(nostr_pubkeys)} (+{len(new_nostr)}), bridged {len(bridged_pubkeys)} (+{len(new_bridged)})')

        except BaseException:
            # eg google.cloud.ndb.exceptions.ContextError when we lose the ndb context
            # https://console.cloud.google.com/errors/detail/CLO6nJnRtKXRyQE?project=bridgy-federated
            report_exception()


def add_relay(relay):
    """Subscribes to a new relay if we're not already connected to it.

    Args:
      relay (str): URI, relay websocket address, starting with ``ws://`` or ``wss://``
    """
    if Nostr.is_blocklisted(relay):
        logger.warning(f'Not subscribing to relay {relay}')
        return

    with subscribed_relays_lock:
        if relay not in subscribed_relays:
            subscribed_relays.append(relay)
            Thread(target=subscriber, daemon=True, args=(relay,)).start()


def subscriber(relay):
    """Wrapper around :func:`subscribe` that catches exceptions and reconnects.

    Args:
      relay (str): URI, relay websocket address, starting with ``ws://`` or ``wss://``
    """
    logger.info(f'started thread to subscribe to relay {relay}')

    with ndb_client.context(**NDB_CONTEXT_KWARGS):
        while True:
            try:
                subscribe(relay)
            except (ConnectionClosed, TimeoutError) as err:
                logger.warning(err)
                logger.info(f'disconnected! waiting {RECONNECT_DELAY}, then reconnecting')
                time.sleep(RECONNECT_DELAY.total_seconds())
            except BaseException:
                report_exception()


def subscribe(relay, limit=None):
    """Subscribes to a relay and backfeeds responses to our users' activities.

    Args:
      relay (str): URI, relay websocket address, starting with ``ws://`` or ``wss://``
      limit (int): return after receiving this many messages. Only used in tests.
    """
    with connect(relay, user_agent_header=util.user_agent,
                 open_timeout=util.HTTP_TIMEOUT, close_timeout=util.HTTP_TIMEOUT,
                 ) as ws:
        if not DEBUG:
            assert limit is None

        last_loaded_at = pubkeys_loaded_at
        received = 0
        subscription = secrets.token_urlsafe(16)
        req = json_dumps([
            'REQ', subscription,
            {'#p': sorted(bridged_pubkeys)},
            {'authors': sorted(nostr_pubkeys)},
        ])
        logger.debug(f'{ws.remote_address} <= {req}')
        ws.send(req)
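        # per NIP-01, the REQ we just sent serializes to, eg (placeholder values):
        #   ["REQ", "<subscription id>",
        #    {"#p": ["<hex pubkey>", ...]}, {"authors": ["<hex pubkey>", ...]}]
        # the two filters are ORed: events that p-tag a user bridged into Nostr,
        # plus events authored by a Nostr user who's bridged out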

        while True:
            if pubkeys_loaded_at > last_loaded_at:
                logger.info(f'reconnecting to {relay} to pick up new user(s)')
                return

            try:
                # use timeout to make sure we periodically loop and check whether
                # we've loaded any new users, above, and need to re-query
                msg = ws.recv(timeout=util.HTTP_TIMEOUT)
            except TimeoutError:
                continue

            logger.debug(f'{ws.remote_address} => {msg}')
            resp = json_loads(msg)

            # https://nips.nostr.com/1
            match resp[0]:
                case 'EVENT':
                    handle(resp[2])

                case 'CLOSED':
                    # relay closed our query. reconnect!
                    return

                case 'OK':
                    # TODO: this is a response to an EVENT we sent
                    pass

                case 'EOSE':
                    # switching from stored results to live
                    pass

                case 'NOTICE':
                    # already logged this
                    pass

            received += 1
            if limit and received >= limit:
                return


def handle(event):
    """Handles a Nostr event. Enqueues a receive task for it if necessary.

    Args:
      event (dict): Nostr event
    """
    if not (isinstance(event, dict) and event.get('kind') is not None
            and event.get('pubkey') and event.get('id') and event.get('sig')):
        logger.info(f'ignoring bad event: {event}')
        return

    id = event['id']
    pubkey = event['pubkey']

    mentions = set(tag[1] for tag in event.get('tags', [])
                   # guard against malformed tags, eg a bare ['p'] with no value
                   if len(tag) > 1 and tag[0] == 'p')

    if not (pubkey in nostr_pubkeys          # from a Nostr user who's bridged
            or mentions & bridged_pubkeys):  # mentions a user bridged into Nostr
        return

    if not verify(event):
        logger.debug(f'bad id or sig for {id}')
        return

    try:
        obj_id = uri_for(event)
        npub_uri = id_to_uri('npub', pubkey)
    except (TypeError, ValueError):
        logger.info(f'bad id {id} or pubkey {pubkey}')
        return
    logger.debug(f'Got Nostr event {obj_id} from {pubkey}')

    delay = DELETE_TASK_DELAY if event.get('kind') == KIND_DELETE else None
    try:
        create_task(queue='receive', id=obj_id, source_protocol=Nostr.LABEL,
                    authed_as=npub_uri, nostr=event, delay=delay)
        # when running locally, comment out above and uncomment this
        # logger.info(f'enqueuing receive task for {obj_id}')
    except ContextError:
        raise  # handled in subscriber()
    except BaseException:
        report_error(obj_id, exception=True)
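

# minimal local-run sketch. assumption: in production this module is driven by
# a hub service that imports it and calls init(); this block only exists for
# poking at the subscriber by hand. connects to the default relay, then blocks
# forever while the daemon subscriber threads backfeed events.
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    init()
    Event().wait()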