atproto_firehose consumer: add logging metric for events/s, backlog

for #1295
pull/1310/head
Ryan Barrett 2024-09-06 12:23:52 -07:00
rodzic b0de078732
commit 41aa8c0544
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 6BE31FDF4776E9D4
1 zmienionych plików z 16 dodań i 5 usunięć

Wyświetl plik

@ -180,8 +180,6 @@ def handle(limit=None):
# store object, enqueue receive task
if op.action in ('create', 'update'):
if created_at := op.record.get('createdAt'):
logger.info(f'record createdAt {created_at}')
record_kwarg = {
'bsky': json_loads(record_json),
}
@ -217,6 +215,8 @@ def handle(limit=None):
if limit is not None and seen >= limit:
return
last_stored_cursor = cur_timestamp = None
while frame := events.get():
header = libipld.decode_dag_cbor(frame)
# buf = BytesIO(frame)
@ -245,14 +245,25 @@ def handle(limit=None):
logger.warning(f'Payload missing seq! {payload}')
continue
cur_timestamp = payload['time']
# if we fail processing this commit and raise an exception up to subscriber,
# skip it and start with the next commit when we're restarted
cursor.cursor = seq + 1
elapsed = util.now().replace(tzinfo=None) - cursor.updated
if (threading.current_thread().name == 'atproto_firehose.handler-0'
and util.now().replace(tzinfo=None) - cursor.updated > STORE_CURSOR_FREQ):
# it's been long enough, update our stored cursor
logger.info(f'updating stored cursor to {cursor.cursor}')
and elapsed > STORE_CURSOR_FREQ):
events_s = 0
if last_stored_cursor:
events_s = int((cursor.cursor - last_stored_cursor) /
elapsed.total_seconds())
last_stored_cursor = cursor.cursor
behind = util.now() - util.parse_iso8601(cur_timestamp)
# it's been long enough, update our stored cursor and metrics
logger.info(f'updating stored cursor to {cursor.cursor}, {events_s} events/s, {behind} ({int(behind.total_seconds())} s) behind')
cursor.put()
# when running locally, comment out put above and uncomment this
# cursor.updated = util.now().replace(tzinfo=None)