# changedetection.io/changedetectionio/realtime/socket_server.py
# (Code-viewer header: 284 lines, 12 KiB, Python; links: Raw / Normal view / History)

import timeago
from flask_socketio import SocketIO
import time
import os
from loguru import logger
from blinker import signal
from changedetectionio import strtobool
class SignalHandler:
    """Receive blinker signals and forward them to Socket.IO clients.

    On construction the handler connects to the ``watch_check_update`` and
    ``queue_length`` signals and spawns a gevent greenlet that periodically
    re-sends ``watch_check_update`` for every watch a worker is currently
    processing.  The greenlet is also stored on the Socket.IO instance as
    ``polling_emitter_thread``.
    """

    def __init__(self, socketio_instance, datastore):
        """Wire up the signal receivers and start the polling greenlet.

        Args:
            socketio_instance: Object used to ``emit`` events to clients.  A
                ``stop_event`` attribute, when present, is used to stop the
                polling greenlet.
            datastore: Object whose ``data['watching']`` mapping is used to
                look watches up by UUID.
        """
        self.socketio_instance = socketio_instance
        self.datastore = datastore

        # Connect to the watch_check_update signal
        from changedetectionio.flask_app import watch_check_update as wcc
        # weak=False: the signal keeps a strong reference to the bound method
        wcc.connect(self.handle_signal, weak=False)
        logger.info("SignalHandler: Connected to signal from direct import")

        # Connect to the queue_length signal
        queue_length_signal = signal('queue_length')
        queue_length_signal.connect(self.handle_queue_length, weak=False)
        logger.info("SignalHandler: Connected to queue_length signal")

        # Create and start the queue update greenlet using gevent
        import gevent
        logger.info("Using gevent for polling thread")
        self.polling_emitter_thread = gevent.spawn(self.polling_emit_running_or_queued_watches)

        # Store the greenlet reference on socketio for clean shutdown
        self.socketio_instance.polling_emitter_thread = self.polling_emitter_thread

    def _stop_requested(self):
        """Return True when the Socket.IO instance carries a set ``stop_event``.

        The event is looked up on every call rather than cached, because it
        may be attached to the Socket.IO object after the polling greenlet
        has been spawned.  A one-shot lookup that found nothing would leave
        the greenlet with no way to ever be stopped.
        """
        stop_event = getattr(self.socketio_instance, 'stop_event', None)
        return stop_event is not None and stop_event.is_set()

    def handle_signal(self, *args, **kwargs):
        """Receiver for ``watch_check_update``; forwards the watch to clients.

        Keyword Args:
            watch_uuid: UUID of the watch that changed state.  Nothing is
                done when it is missing or empty.
            app_context: Optional object providing ``app_context()`` and
                ``test_request_context()`` (the polling greenlet in this
                class passes the Flask app here).
        """
        logger.trace(f"SignalHandler: Signal received with {len(args)} args and {len(kwargs)} kwargs")

        # Safely extract the watch UUID from kwargs
        watch_uuid = kwargs.get('watch_uuid')
        app_context = kwargs.get('app_context')
        if not watch_uuid:
            return

        # Get the watch object from the datastore
        watch = self.datastore.data['watching'].get(watch_uuid)
        if not watch:
            logger.warning(f"Watch UUID {watch_uuid} not found in datastore")
            return

        if app_context:
            # Run inside an application + request context
            # (presumably so URL generation works while building the payload)
            with app_context.app_context(), app_context.test_request_context():
                handle_watch_update(self.socketio_instance, watch=watch, datastore=self.datastore)
        else:
            handle_watch_update(self.socketio_instance, watch=watch, datastore=self.datastore)

        logger.trace(f"Signal handler processed watch UUID {watch_uuid}")

    def handle_queue_length(self, *args, **kwargs):
        """Handle queue_length signal and emit to all clients.

        Keyword Args:
            length: Current queue length; defaults to 0 when absent.

        Any exception raised while emitting is logged, not propagated.
        """
        try:
            queue_length = kwargs.get('length', 0)
            logger.debug(f"SignalHandler: Queue length update received: {queue_length}")

            # Emit the queue size to all connected clients
            self.socketio_instance.emit("queue_size", {
                "q_length": queue_length,
                "event_timestamp": time.time()
            })
        except Exception as e:
            logger.error(f"Socket.IO error in handle_queue_length: {str(e)}")

    def polling_emit_running_or_queued_watches(self):
        """Greenlet that periodically updates the browser/frontend with current state of who is being checked or queued.

        This is because sometimes the browser page could reload (like on clicking on a link) but the data is old.
        The loop runs until the ``stop_event`` on the Socket.IO instance is set.
        """
        logger.info("Queue update greenlet started")

        # Imported here to avoid circular imports
        from changedetectionio.flask_app import app, running_update_threads
        watch_check_update = signal('watch_check_update')

        # Use gevent sleep for non-blocking operation
        from gevent import sleep as gevent_sleep

        # Run until explicitly stopped
        while not self._stop_requested():
            try:
                # For each worker that is processing a watch, send a signal so the UI is updated
                for t in running_update_threads:
                    if hasattr(t, 'current_uuid') and t.current_uuid:
                        logger.trace(f"Sending update for {t.current_uuid}")
                        # Send with app_context to ensure proper URL generation
                        with app.app_context():
                            watch_check_update.send(app_context=app, watch_uuid=t.current_uuid)
                        # Yield control back to gevent after each send to prevent blocking
                        gevent_sleep(0.1)

                    # Check if we need to stop in the middle of processing
                    if self._stop_requested():
                        break

                # Sleep between polling/update cycles
                gevent_sleep(2)

            except Exception as e:
                logger.error(f"Error in queue update greenlet: {str(e)}")
                # Sleep a bit to avoid flooding logs in case of persistent error
                gevent_sleep(0.5)

        logger.info("Queue update greenlet stopped")
def handle_watch_update(socketio, **kwargs):
    """Handle watch update signal from blinker and emit it to all clients.

    Builds a simplified snapshot of one watch plus a few global statistics
    and emits them as a ``watch_update`` Socket.IO event.

    Args:
        socketio: Object whose ``emit`` method is used to send the event.

    Keyword Args:
        watch: The watch object to report on.
        datastore: Object providing ``data['watching']`` and ``has_unviewed``.

    Any exception raised while building or emitting the payload is logged,
    not propagated.
    """
    try:
        watch = kwargs.get('watch')
        datastore = kwargs.get('datastore')
        if watch is None or datastore is None:
            # Fail with a clear message instead of an opaque AttributeError further down
            logger.error("Socket.IO error in handle_watch_update: 'watch' and 'datastore' keyword arguments are required")
            return

        # Imported here rather than at module level (presumably to avoid a circular import)
        from changedetectionio.flask_app import running_update_threads, update_q
        from changedetectionio.flask_app import _jinja2_filter_datetime

        watch_uuid = watch.get('uuid')

        # UUIDs of watches that a worker is processing right now
        running_uuids = {
            t.current_uuid
            for t in running_update_threads
            if hasattr(t, 'current_uuid') and t.current_uuid
        }

        # UUIDs of watches waiting in the queue
        queued_uuids = {
            q_item.item['uuid']
            for q_item in update_q.queue
            if hasattr(q_item, 'item') and 'uuid' in q_item.item
        }

        # Get the error texts from the watch
        error_texts = watch.compile_error_texts()

        # Only show a relative "last changed" time once there is history to compare against
        if watch.history_n >= 2 and int(watch.get('last_changed', 0)) > 0:
            last_changed_text = timeago.format(int(watch['last_changed']), time.time())
        else:
            last_changed_text = 'Not yet'

        # Create a simplified watch data object to send to clients
        watch_data = {
            'checking_now': watch_uuid in running_uuids,
            'fetch_time': watch.get('fetch_time'),
            'has_error': bool(error_texts),
            'last_changed': watch.get('last_changed'),
            'last_checked': watch.get('last_checked'),
            'error_text': error_texts,
            'last_checked_text': _jinja2_filter_datetime(watch),
            'last_changed_text': last_changed_text,
            'queued': watch_uuid in queued_uuids,
            'paused': bool(watch.get('paused')),
            'notification_muted': bool(watch.get('notification_muted')),
            'unviewed': watch.has_unviewed,
            'uuid': watch_uuid,
            'event_timestamp': time.time()
        }

        # Count every watch with an error; a separate loop variable keeps `watch`
        # pointing at the watch this update is about.
        errored_count = sum(
            1 for other_watch in datastore.data['watching'].values()
            if other_watch.get('last_error')
        )

        general_stats = {
            'count_errors': errored_count,
            'has_unviewed': datastore.has_unviewed
        }

        # Emit to all clients (no 'broadcast' parameter needed - it's the default behavior)
        socketio.emit("watch_update", {'watch': watch_data, 'general_stats': general_stats})

    except Exception as e:
        logger.error(f"Socket.IO error in handle_watch_update: {str(e)}")
def init_socketio(app, datastore):
    """Initialize SocketIO with the main Flask app.

    Creates the ``SocketIO`` server, registers the connect/disconnect
    handlers, attaches ``datastore``, ``stop_event`` and ``shutdown`` to the
    server object and starts the ``SignalHandler`` that relays signals to
    clients.

    Args:
        app: The Flask application to attach Socket.IO to.
        datastore: Object providing ``data['settings']['application']`` and
            the watch data used by the signal handler.

    Returns:
        The configured ``SocketIO`` instance.

    Environment:
        SOCKETIO_CORS_ORIGINS: passed as ``cors_allowed_origins`` (unset -> None).
        SOCKETIO_LOGGING: enables Socket.IO / Engine.IO logging when truthy.
        SALTED_PASS: when set, unauthenticated connections are rejected.
    """
    # gevent is used as the async mode; the polling greenlet and the stop
    # event created below are gevent objects as well.
    async_mode = 'gevent'
    logger.info(f"Using {async_mode} mode for Socket.IO")

    # Restrict SocketIO CORS to same origin by default, can be overridden with env var
    cors_origins = os.environ.get('SOCKETIO_CORS_ORIGINS', None)
    socketio_logging = strtobool(os.getenv('SOCKETIO_LOGGING', 'False'))

    socketio = SocketIO(app,
                        async_mode=async_mode,
                        cors_allowed_origins=cors_origins,  # None means same-origin only
                        logger=socketio_logging,
                        engineio_logger=socketio_logging)

    # Set up event handlers
    @socketio.on('connect')
    def handle_connect():
        """Handle client connection; returns False to reject unauthenticated clients."""
        # NOTE(review): login_optionally_required is imported but not used in this handler
        from changedetectionio.auth_decorator import login_optionally_required
        from flask import request
        from flask_login import current_user
        from changedetectionio.flask_app import update_q

        # Access datastore from socketio
        datastore = socketio.datastore

        # Check if authentication is required and user is not authenticated
        has_password_enabled = datastore.data['settings']['application'].get('password') or os.getenv("SALTED_PASS", False)
        if has_password_enabled and not current_user.is_authenticated:
            logger.warning("Socket.IO: Rejecting unauthenticated connection")
            return False  # Reject the connection

        # Send the current queue size to the newly connected client
        try:
            queue_size = update_q.qsize()
            socketio.emit("queue_size", {
                "q_length": queue_size,
                "event_timestamp": time.time()
            }, room=request.sid)  # Send only to this client
            logger.debug(f"Socket.IO: Sent initial queue size {queue_size} to new client")
        except Exception as e:
            logger.error(f"Socket.IO error sending initial queue size: {str(e)}")

        logger.info("Socket.IO: Client connected")

    @socketio.on('disconnect')
    def handle_disconnect():
        """Handle client disconnection"""
        logger.info("Socket.IO: Client disconnected")

    # Attach the datastore and the stop event BEFORE creating the signal
    # handler: the handler spawns the polling greenlet, which looks for
    # `stop_event` on the socketio object to know when to exit.
    import gevent.event
    socketio.datastore = datastore
    socketio.stop_event = gevent.event.Event()

    # Create a dedicated signal handler that will receive signals and emit them to clients.
    # No reference is kept here; the handler registers its bound methods on the
    # signals with weak=False, which keeps it alive.
    SignalHandler(socketio, datastore)

    # Add a shutdown method to the socketio object
    def shutdown():
        """Shutdown the SocketIO server gracefully.

        Sets the stop event and waits up to 5 seconds for the polling
        greenlet to finish.  Errors are logged, not propagated.
        """
        try:
            logger.info("Socket.IO: Shutting down server...")

            # Signal the queue update greenlet to stop
            if hasattr(socketio, 'stop_event'):
                socketio.stop_event.set()
                logger.info("Socket.IO: Signaled queue update thread to stop")

            # Wait for the greenlet to exit (with timeout)
            polling_greenlet = getattr(socketio, 'polling_emitter_thread', None)
            if polling_greenlet is not None:
                try:
                    polling_greenlet.join(timeout=5)
                except Exception as e:
                    logger.error(f"Error joining greenlet: {str(e)}")
                else:
                    # join() returns silently on timeout, so check whether
                    # the greenlet actually finished before reporting success
                    if polling_greenlet.ready():
                        logger.info("Socket.IO: Queue update greenlet joined successfully")
                    else:
                        logger.warning("Socket.IO: Queue update greenlet did not exit in time")

            logger.info("Socket.IO: Server shutdown complete")
        except Exception as e:
            logger.error(f"Socket.IO error during shutdown: {str(e)}")

    # Attach the shutdown method to the socketio object
    socketio.shutdown = shutdown

    logger.info("Socket.IO initialized and attached to main Flask app")
    return socketio