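"""
Real-time updates for changedetection.io via Socket.IO.

A SignalHandler bridges blinker signals (watch check updates, queue length,
watch deletions, favicon bumps, notification events) to Socket.IO events that
are pushed to connected browser clients, and init_socketio() attaches the
Socket.IO server to the main Flask app.
"""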
import os
import time

import timeago
from blinker import signal
from flask_socketio import SocketIO
from loguru import logger

from changedetectionio import strtobool


class SignalHandler:
    """A standalone class to receive signals"""

    def __init__(self, socketio_instance, datastore):
        self.socketio_instance = socketio_instance
        self.datastore = datastore

        # Connect to the watch_check_update signal
        from changedetectionio.flask_app import watch_check_update as wcc
        wcc.connect(self.handle_signal, weak=False)
        # logger.info("SignalHandler: Connected to signal from direct import")

        # Connect to the queue_length signal
        queue_length_signal = signal('queue_length')
        queue_length_signal.connect(self.handle_queue_length, weak=False)
        # logger.info("SignalHandler: Connected to queue_length signal")

        watch_delete_signal = signal('watch_deleted')
        watch_delete_signal.connect(self.handle_deleted_signal, weak=False)

        watch_favicon_bumped_signal = signal('watch_favicon_bump')
        watch_favicon_bumped_signal.connect(self.handle_watch_bumped_favicon_signal, weak=False)

        # Connect to the notification_event signal
        notification_event_signal = signal('notification_event')
        notification_event_signal.connect(self.handle_notification_event, weak=False)
        logger.info("SignalHandler: Connected to notification_event signal")

        # Create and start the queue update thread using standard threading
        import threading
        self.polling_emitter_thread = threading.Thread(
            target=self.polling_emit_running_or_queued_watches_threaded,
            daemon=True
        )
        self.polling_emitter_thread.start()
        logger.info("Started polling thread using threading (eventlet-free)")

        # Store the thread reference in socketio for clean shutdown
        self.socketio_instance.polling_emitter_thread = self.polling_emitter_thread

    def handle_signal(self, *args, **kwargs):
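        """Handle the watch_check_update signal: look up the watch by UUID and forward it to handle_watch_update()."""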
        logger.trace(f"SignalHandler: Signal received with {len(args)} args and {len(kwargs)} kwargs")

        # Safely extract the watch UUID from kwargs
        watch_uuid = kwargs.get('watch_uuid')
        app_context = kwargs.get('app_context')

        if watch_uuid:
            # Get the watch object from the datastore
            watch = self.datastore.data['watching'].get(watch_uuid)
            if watch:
                if app_context:
                    # A Flask app object was passed along, so run the update inside its app and request context
                    with app_context.app_context():
                        with app_context.test_request_context():
                            # Forward to handle_watch_update with the watch parameter
                            handle_watch_update(self.socketio_instance, watch=watch, datastore=self.datastore)
                else:
                    handle_watch_update(self.socketio_instance, watch=watch, datastore=self.datastore)

                logger.trace(f"Signal handler processed watch UUID {watch_uuid}")
            else:
                logger.warning(f"Watch UUID {watch_uuid} not found in datastore")

    def handle_watch_bumped_favicon_signal(self, *args, **kwargs):
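        """Handle the watch_favicon_bump signal and tell all connected clients that a watch's favicon was updated."""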
        watch_uuid = kwargs.get('watch_uuid')
        if watch_uuid:
            # Tell all connected clients that this watch got a new favicon
            self.socketio_instance.emit("watch_bumped_favicon", {
                "uuid": watch_uuid,
                "event_timestamp": time.time()
            })
            logger.debug(f"Watch UUID {watch_uuid} got its favicon updated")

    def handle_deleted_signal(self, *args, **kwargs):
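        """Handle the watch_deleted signal and tell all connected clients that the watch was removed."""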
        watch_uuid = kwargs.get('watch_uuid')
        if watch_uuid:
            # Tell all connected clients that this watch was deleted
            self.socketio_instance.emit("watch_deleted", {
                "uuid": watch_uuid,
                "event_timestamp": time.time()
            })
            logger.debug(f"Watch UUID {watch_uuid} was deleted")

    def handle_queue_length(self, *args, **kwargs):
        """Handle queue_length signal and emit to all clients"""
        try:
            queue_length = kwargs.get('length', 0)
            logger.debug(f"SignalHandler: Queue length update received: {queue_length}")

            # Emit the queue size to all connected clients
            self.socketio_instance.emit("queue_size", {
                "q_length": queue_length,
                "event_timestamp": time.time()
            })

        except Exception as e:
            logger.error(f"Socket.IO error in handle_queue_length: {str(e)}")

    def handle_notification_event(self, *args, **kwargs):
        """Handle notification_event signal and emit to all clients"""
        try:
            watch_uuid = kwargs.get('watch_uuid')
            logger.debug(f"SignalHandler: Notification event received for watch UUID: {watch_uuid}")

            # Emit the notification event to all connected clients
            self.socketio_instance.emit("notification_event", {
                "watch_uuid": watch_uuid,
                "event_timestamp": time.time()
            })

            logger.trace(f"Socket.IO: Emitted notification_event for watch UUID {watch_uuid}")

        except Exception as e:
            logger.error(f"Socket.IO error in handle_notification_event: {str(e)}")

    def polling_emit_running_or_queued_watches_threaded(self):
        """Threading version of polling for Windows compatibility"""
        import time
        import threading
        logger.info("Queue update thread started (threading mode)")

        # Import here to avoid circular imports
        from changedetectionio.flask_app import app
        from changedetectionio import worker_handler
        watch_check_update = signal('watch_check_update')

        # Track previous state to avoid unnecessary emissions
        previous_running_uuids = set()

        # Run until app shutdown - check exit flag more frequently for fast shutdown
        exit_event = getattr(app.config, 'exit', threading.Event())

        while not exit_event.is_set():
            try:
                # Get current running UUIDs from async workers
                running_uuids = set(worker_handler.get_running_uuids())

                # Only send updates for UUIDs that changed state
                newly_running = running_uuids - previous_running_uuids
                no_longer_running = previous_running_uuids - running_uuids

                # Send updates for newly running UUIDs (but exit fast if shutdown requested)
                for uuid in newly_running:
                    if exit_event.is_set():
                        break
                    logger.trace(f"Threading polling: UUID {uuid} started processing")
                    with app.app_context():
                        watch_check_update.send(app_context=app, watch_uuid=uuid)
                    time.sleep(0.01)  # Small yield

                # Send updates for UUIDs that finished processing (but exit fast if shutdown requested)
                if not exit_event.is_set():
                    for uuid in no_longer_running:
                        if exit_event.is_set():
                            break
                        logger.trace(f"Threading polling: UUID {uuid} finished processing")
                        with app.app_context():
                            watch_check_update.send(app_context=app, watch_uuid=uuid)
                        time.sleep(0.01)  # Small yield

                # Update tracking for next iteration
                previous_running_uuids = running_uuids

                # Sleep between polling cycles, but check exit flag every 0.5 seconds for fast shutdown
                for _ in range(20):  # 20 * 0.5 = 10 seconds total
                    if exit_event.is_set():
                        break
                    time.sleep(0.5)

            except Exception as e:
                logger.error(f"Error in threading polling: {str(e)}")
                # Even during error recovery, check for exit quickly
                for _ in range(1):  # 1 * 0.5 = 0.5 seconds
                    if exit_event.is_set():
                        break
                    time.sleep(0.5)

        # Check if we're in pytest environment - if so, be more gentle with logging
        import sys
        in_pytest = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ

        if not in_pytest:
            logger.info("Queue update thread stopped (threading mode)")


def handle_watch_update(socketio, **kwargs):
    """Handle watch update signal from blinker"""
    try:
        watch = kwargs.get('watch')
        datastore = kwargs.get('datastore')

        # Emit the watch update to all connected clients
        from changedetectionio.flask_app import update_q
        from changedetectionio.flask_app import _jinja2_filter_datetime
        from changedetectionio import worker_handler

        # Get list of watches that are currently running
        running_uuids = worker_handler.get_running_uuids()

        # Get list of watches in the queue
        queue_list = []
        for q_item in update_q.queue:
            if hasattr(q_item, 'item') and 'uuid' in q_item.item:
                queue_list.append(q_item.item['uuid'])

        # Get the error texts from the watch
        error_texts = watch.compile_error_texts()

        # Create a simplified watch data object to send to clients
        watch_data = {
            'checking_now': True if watch.get('uuid') in running_uuids else False,
            'error_text': error_texts,
            'event_timestamp': time.time(),
            'fetch_time': watch.get('fetch_time'),
            'has_error': True if error_texts else False,
            'has_favicon': True if watch.get_favicon_filename() else False,
            'history_n': watch.history_n,
            'last_changed_text': timeago.format(int(watch.last_changed), time.time()) if watch.history_n >= 2 and int(watch.last_changed) > 0 else 'Not yet',
            'last_checked': watch.get('last_checked'),
            'last_checked_text': _jinja2_filter_datetime(watch),
            'notification_muted': True if watch.get('notification_muted') else False,
            'paused': True if watch.get('paused') else False,
            'queued': True if watch.get('uuid') in queue_list else False,
            'unviewed': watch.has_unviewed,
            'uuid': watch.get('uuid'),
        }

        errored_count = 0
        for watch_uuid_iter, watch_iter in datastore.data['watching'].items():
            if watch_iter.get('last_error'):
                errored_count += 1

        general_stats = {
            'count_errors': errored_count,
            'has_unviewed': datastore.has_unviewed
        }

        # Debug what's being emitted
        # logger.debug(f"Emitting 'watch_update' event for {watch.get('uuid')}, data: {watch_data}")

        # Emit to all clients (no 'broadcast' parameter needed - it's the default behavior)
        socketio.emit("watch_update", {'watch': watch_data, 'general_stats': general_stats})

        # Log after successful emit - use watch_data['uuid'] to avoid variable shadowing issues
        logger.trace(f"Socket.IO: Emitted update for watch {watch_data['uuid']}, Checking now: {watch_data['checking_now']}")

    except Exception as e:
        logger.error(f"Socket.IO error in handle_watch_update: {str(e)}")


def init_socketio(app, datastore):
    """Initialize SocketIO with the main Flask app"""
    import platform
    import sys

    # Platform-specific async_mode selection for better stability
    system = platform.system().lower()
    python_version = sys.version_info

    # Check for SocketIO mode configuration via environment variable
    # Default is 'threading' for best cross-platform compatibility
    socketio_mode = os.getenv('SOCKETIO_MODE', 'threading').lower()

    if socketio_mode == 'gevent':
        # Use gevent mode (higher concurrency but platform limitations)
        try:
            import gevent
            async_mode = 'gevent'
            logger.info(f"SOCKETIO_MODE=gevent: Using {async_mode} mode for Socket.IO")
        except ImportError:
            async_mode = 'threading'
            logger.warning(f"SOCKETIO_MODE=gevent but gevent not available, falling back to {async_mode} mode")
    elif socketio_mode == 'threading':
        # Use threading mode (default - best compatibility)
        async_mode = 'threading'
        logger.info(f"SOCKETIO_MODE=threading: Using {async_mode} mode for Socket.IO")
    else:
        # Invalid mode specified, use default
        async_mode = 'threading'
        logger.warning(f"Invalid SOCKETIO_MODE='{socketio_mode}', using default {async_mode} mode for Socket.IO")

    # Log platform info for debugging
    logger.info(f"Platform: {system}, Python: {python_version.major}.{python_version.minor}, Socket.IO mode: {async_mode}")

    # Restrict SocketIO CORS to same origin by default, can be overridden with env var
    cors_origins = os.environ.get('SOCKETIO_CORS_ORIGINS', None)

    socketio = SocketIO(app,
                        async_mode=async_mode,
                        cors_allowed_origins=cors_origins,  # None means same-origin only
                        logger=strtobool(os.getenv('SOCKETIO_LOGGING', 'False')),
                        engineio_logger=strtobool(os.getenv('SOCKETIO_LOGGING', 'False')))

    # Set up event handlers
    logger.info("Socket.IO: Registering connect event handler")

    @socketio.on('checkbox-operation')
    def event_checkbox_operations(data):
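        """Handle a 'checkbox-operation' event from the UI: apply the requested operation to the selected watch UUIDs."""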
        from changedetectionio.blueprint.ui import _handle_operations
        from changedetectionio import queuedWatchMetaData
        from changedetectionio import worker_handler
        from changedetectionio.flask_app import update_q, watch_check_update
        logger.trace(f"Got checkbox operations event: {data}")

        datastore = socketio.datastore

        _handle_operations(
            op=data.get('op'),
            uuids=data.get('uuids'),
            datastore=datastore,
            extra_data=data.get('extra_data'),
            worker_handler=worker_handler,
            update_q=update_q,
            queuedWatchMetaData=queuedWatchMetaData,
            watch_check_update=watch_check_update,
            emit_flash=False
        )

    @socketio.on('connect')
    def handle_connect():
        """Handle client connection"""
        # logger.info("Socket.IO: CONNECT HANDLER CALLED - Starting connection process")
        from flask import request
        from flask_login import current_user
        from changedetectionio.flask_app import update_q

        # Access datastore from socketio
        datastore = socketio.datastore
        # logger.info(f"Socket.IO: Current user authenticated: {current_user.is_authenticated if hasattr(current_user, 'is_authenticated') else 'No current_user'}")

        # Check if authentication is required and user is not authenticated
        has_password_enabled = datastore.data['settings']['application'].get('password') or os.getenv("SALTED_PASS", False)
        # logger.info(f"Socket.IO: Password enabled: {has_password_enabled}")
        if has_password_enabled and not current_user.is_authenticated:
            logger.warning("Socket.IO: Rejecting unauthenticated connection")
            return False  # Reject the connection

        # Send the current queue size to the newly connected client
        try:
            queue_size = update_q.qsize()
            socketio.emit("queue_size", {
                "q_length": queue_size,
                "event_timestamp": time.time()
            }, room=request.sid)  # Send only to this client
            logger.debug(f"Socket.IO: Sent initial queue size {queue_size} to new client")
        except Exception as e:
            logger.error(f"Socket.IO error sending initial queue size: {str(e)}")

        logger.info("Socket.IO: Client connected")

    # logger.info("Socket.IO: Registering disconnect event handler")
    @socketio.on('disconnect')
    def handle_disconnect():
        """Handle client disconnection"""
        logger.info("Socket.IO: Client disconnected")

    # Create a dedicated signal handler that will receive signals and emit them to clients
    signal_handler = SignalHandler(socketio, datastore)

    # Register watch operation event handlers
    from .events import register_watch_operation_handlers
    register_watch_operation_handlers(socketio, datastore)

    # Store the datastore reference on the socketio object for later use
    socketio.datastore = datastore

    # No stop event needed for threading mode - threads check app.config.exit directly

    # Add a shutdown method to the socketio object
    def shutdown():
        """Shutdown the SocketIO server fast and aggressively"""
        try:
            logger.info("Socket.IO: Fast shutdown initiated...")

            # For threading mode, give the thread a very short time to exit gracefully
            if hasattr(socketio, 'polling_emitter_thread'):
                if socketio.polling_emitter_thread.is_alive():
                    logger.info("Socket.IO: Waiting 1 second for polling thread to stop...")
                    socketio.polling_emitter_thread.join(timeout=1.0)  # Only 1 second timeout
                    if socketio.polling_emitter_thread.is_alive():
                        logger.info("Socket.IO: Polling thread still running after timeout - continuing with shutdown")
                    else:
                        logger.info("Socket.IO: Polling thread stopped quickly")
                else:
                    logger.info("Socket.IO: Polling thread already stopped")

            logger.info("Socket.IO: Fast shutdown complete")
        except Exception as e:
            logger.error(f"Socket.IO error during shutdown: {str(e)}")

    # Attach the shutdown method to the socketio object
    socketio.shutdown = shutdown

    logger.info("Socket.IO initialized and attached to main Flask app")
    logger.info(f"Socket.IO: Registered event handlers: {socketio.handlers if hasattr(socketio, 'handlers') else 'No handlers found'}")
    return socketio
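
# Usage sketch (illustrative only, not executed here): the main Flask app is
# expected to call init_socketio() once at startup and keep the returned
# SocketIO instance, roughly like the following (the real wiring lives in
# changedetectionio.flask_app and may differ):
#
#   socketio = init_socketio(app, datastore)
#   socketio.run(app, host='0.0.0.0', port=5000)  # Flask-SocketIO's run()
#   ...
#   socketio.shutdown()  # fast-shutdown helper attached in init_socketio()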