2025-05-26 18:12:32 +00:00
import timeago
from flask_socketio import SocketIO
import time
import os
from loguru import logger
from blinker import signal
from changedetectionio import strtobool
2025-06-03 08:17:19 +00:00
2025-05-26 18:12:32 +00:00
class SignalHandler :
""" A standalone class to receive signals """
2025-06-03 08:17:19 +00:00
2025-05-26 18:12:32 +00:00
def __init__ ( self , socketio_instance , datastore ) :
self . socketio_instance = socketio_instance
self . datastore = datastore
# Connect to the watch_check_update signal
from changedetectionio . flask_app import watch_check_update as wcc
wcc . connect ( self . handle_signal , weak = False )
2025-06-03 08:17:19 +00:00
# logger.info("SignalHandler: Connected to signal from direct import")
2025-05-26 18:12:32 +00:00
# Connect to the queue_length signal
queue_length_signal = signal ( ' queue_length ' )
queue_length_signal . connect ( self . handle_queue_length , weak = False )
2025-06-03 08:17:19 +00:00
# logger.info("SignalHandler: Connected to queue_length signal")
2025-05-26 18:12:32 +00:00
2025-06-03 12:54:13 +00:00
watch_delete_signal = signal ( ' watch_deleted ' )
watch_delete_signal . connect ( self . handle_deleted_signal , weak = False )
2025-06-04 10:03:11 +00:00
# Connect to the notification_event signal
notification_event_signal = signal ( ' notification_event ' )
notification_event_signal . connect ( self . handle_notification_event , weak = False )
logger . info ( " SignalHandler: Connected to notification_event signal " )
2025-06-03 08:17:19 +00:00
# Create and start the queue update thread using standard threading
import threading
self . polling_emitter_thread = threading . Thread (
2025-07-09 13:16:22 +00:00
target = self . polling_emit_running_or_queued_watches_threaded ,
2025-06-03 08:17:19 +00:00
daemon = True
)
self . polling_emitter_thread . start ( )
logger . info ( " Started polling thread using threading (eventlet-free) " )
2025-05-26 18:12:32 +00:00
# Store the thread reference in socketio for clean shutdown
self . socketio_instance . polling_emitter_thread = self . polling_emitter_thread
def handle_signal ( self , * args , * * kwargs ) :
logger . trace ( f " SignalHandler: Signal received with { len ( args ) } args and { len ( kwargs ) } kwargs " )
# Safely extract the watch UUID from kwargs
watch_uuid = kwargs . get ( ' watch_uuid ' )
app_context = kwargs . get ( ' app_context ' )
if watch_uuid :
# Get the watch object from the datastore
watch = self . datastore . data [ ' watching ' ] . get ( watch_uuid )
if watch :
if app_context :
2025-06-03 08:17:19 +00:00
# note
2025-05-26 18:12:32 +00:00
with app_context . app_context ( ) :
with app_context . test_request_context ( ) :
# Forward to handle_watch_update with the watch parameter
handle_watch_update ( self . socketio_instance , watch = watch , datastore = self . datastore )
else :
handle_watch_update ( self . socketio_instance , watch = watch , datastore = self . datastore )
2025-05-28 07:25:23 +00:00
logger . trace ( f " Signal handler processed watch UUID { watch_uuid } " )
2025-05-26 18:12:32 +00:00
else :
logger . warning ( f " Watch UUID { watch_uuid } not found in datastore " )
2025-06-03 12:54:13 +00:00
def handle_deleted_signal ( self , * args , * * kwargs ) :
watch_uuid = kwargs . get ( ' watch_uuid ' )
if watch_uuid :
# Emit the queue size to all connected clients
self . socketio_instance . emit ( " watch_deleted " , {
" uuid " : watch_uuid ,
" event_timestamp " : time . time ( )
} )
logger . debug ( f " Watch UUID { watch_uuid } was deleted " )
2025-05-26 18:12:32 +00:00
def handle_queue_length ( self , * args , * * kwargs ) :
""" Handle queue_length signal and emit to all clients """
try :
queue_length = kwargs . get ( ' length ' , 0 )
logger . debug ( f " SignalHandler: Queue length update received: { queue_length } " )
2025-06-03 08:17:19 +00:00
2025-05-26 18:12:32 +00:00
# Emit the queue size to all connected clients
self . socketio_instance . emit ( " queue_size " , {
" q_length " : queue_length ,
" event_timestamp " : time . time ( )
} )
2025-06-03 08:17:19 +00:00
2025-05-26 18:12:32 +00:00
except Exception as e :
logger . error ( f " Socket.IO error in handle_queue_length: { str ( e ) } " )
2025-06-04 10:03:11 +00:00
def handle_notification_event ( self , * args , * * kwargs ) :
""" Handle notification_event signal and emit to all clients """
try :
watch_uuid = kwargs . get ( ' watch_uuid ' )
logger . debug ( f " SignalHandler: Notification event received for watch UUID: { watch_uuid } " )
# Emit the notification event to all connected clients
self . socketio_instance . emit ( " notification_event " , {
" watch_uuid " : watch_uuid ,
" event_timestamp " : time . time ( )
} )
2025-07-09 13:16:22 +00:00
2025-06-04 10:03:11 +00:00
logger . trace ( f " Socket.IO: Emitted notification_event for watch UUID { watch_uuid } " )
except Exception as e :
logger . error ( f " Socket.IO error in handle_notification_event: { str ( e ) } " )
2025-06-03 08:17:19 +00:00
def polling_emit_running_or_queued_watches_threaded ( self ) :
""" Threading version of polling for Windows compatibility """
import time
import threading
logger . info ( " Queue update thread started (threading mode) " )
2025-07-09 13:16:22 +00:00
2025-06-03 08:17:19 +00:00
# Import here to avoid circular imports
from changedetectionio . flask_app import app
from changedetectionio import worker_handler
2025-05-26 18:12:32 +00:00
watch_check_update = signal ( ' watch_check_update ' )
2025-07-09 13:16:22 +00:00
2025-06-03 08:17:19 +00:00
# Track previous state to avoid unnecessary emissions
previous_running_uuids = set ( )
2025-07-09 13:16:22 +00:00
2025-06-03 08:17:19 +00:00
# Run until app shutdown - check exit flag more frequently for fast shutdown
exit_event = getattr ( app . config , ' exit ' , threading . Event ( ) )
2025-07-09 13:16:22 +00:00
2025-06-03 08:17:19 +00:00
while not exit_event . is_set ( ) :
2025-05-26 18:12:32 +00:00
try :
2025-06-03 08:17:19 +00:00
# Get current running UUIDs from async workers
running_uuids = set ( worker_handler . get_running_uuids ( ) )
2025-07-09 13:16:22 +00:00
2025-06-03 08:17:19 +00:00
# Only send updates for UUIDs that changed state
newly_running = running_uuids - previous_running_uuids
no_longer_running = previous_running_uuids - running_uuids
2025-07-09 13:16:22 +00:00
2025-06-03 08:17:19 +00:00
# Send updates for newly running UUIDs (but exit fast if shutdown requested)
for uuid in newly_running :
if exit_event . is_set ( ) :
break
logger . trace ( f " Threading polling: UUID { uuid } started processing " )
with app . app_context ( ) :
watch_check_update . send ( app_context = app , watch_uuid = uuid )
time . sleep ( 0.01 ) # Small yield
2025-07-09 13:16:22 +00:00
2025-06-03 08:17:19 +00:00
# Send updates for UUIDs that finished processing (but exit fast if shutdown requested)
if not exit_event . is_set ( ) :
for uuid in no_longer_running :
if exit_event . is_set ( ) :
break
logger . trace ( f " Threading polling: UUID { uuid } finished processing " )
2025-05-26 18:12:32 +00:00
with app . app_context ( ) :
2025-06-03 08:17:19 +00:00
watch_check_update . send ( app_context = app , watch_uuid = uuid )
time . sleep ( 0.01 ) # Small yield
2025-07-09 13:16:22 +00:00
2025-06-03 08:17:19 +00:00
# Update tracking for next iteration
previous_running_uuids = running_uuids
2025-07-09 13:16:22 +00:00
2025-06-03 08:17:19 +00:00
# Sleep between polling cycles, but check exit flag every 0.5 seconds for fast shutdown
for _ in range ( 20 ) : # 20 * 0.5 = 10 seconds total
if exit_event . is_set ( ) :
2025-05-26 18:12:32 +00:00
break
2025-06-03 08:17:19 +00:00
time . sleep ( 0.5 )
2025-07-09 13:16:22 +00:00
2025-05-26 18:12:32 +00:00
except Exception as e :
2025-06-03 08:17:19 +00:00
logger . error ( f " Error in threading polling: { str ( e ) } " )
# Even during error recovery, check for exit quickly
for _ in range ( 1 ) : # 1 * 0.5 = 0.5 seconds
if exit_event . is_set ( ) :
break
time . sleep ( 0.5 )
2025-07-09 13:16:22 +00:00
2025-06-03 08:17:19 +00:00
# Check if we're in pytest environment - if so, be more gentle with logging
import sys
in_pytest = " pytest " in sys . modules or " PYTEST_CURRENT_TEST " in os . environ
2025-07-09 13:16:22 +00:00
2025-06-03 08:17:19 +00:00
if not in_pytest :
logger . info ( " Queue update thread stopped (threading mode) " )
2025-05-26 18:12:32 +00:00
def handle_watch_update ( socketio , * * kwargs ) :
""" Handle watch update signal from blinker """
try :
watch = kwargs . get ( ' watch ' )
datastore = kwargs . get ( ' datastore ' )
# Emit the watch update to all connected clients
2025-06-03 08:17:19 +00:00
from changedetectionio . flask_app import update_q
2025-05-26 18:12:32 +00:00
from changedetectionio . flask_app import _jinja2_filter_datetime
2025-06-03 08:17:19 +00:00
from changedetectionio import worker_handler
2025-05-26 18:12:32 +00:00
# Get list of watches that are currently running
2025-06-03 08:17:19 +00:00
running_uuids = worker_handler . get_running_uuids ( )
2025-05-26 18:12:32 +00:00
# Get list of watches in the queue
queue_list = [ ]
for q_item in update_q . queue :
if hasattr ( q_item , ' item ' ) and ' uuid ' in q_item . item :
queue_list . append ( q_item . item [ ' uuid ' ] )
# Get the error texts from the watch
error_texts = watch . compile_error_texts ( )
# Create a simplified watch data object to send to clients
2025-06-03 08:44:15 +00:00
2025-05-26 18:12:32 +00:00
watch_data = {
2025-06-03 08:44:15 +00:00
' checking_now ' : True if watch . get ( ' uuid ' ) in running_uuids else False ,
2025-07-09 13:16:22 +00:00
' error_text ' : error_texts ,
' event_timestamp ' : time . time ( ) ,
2025-05-26 18:12:32 +00:00
' fetch_time ' : watch . get ( ' fetch_time ' ) ,
' has_error ' : True if error_texts else False ,
2025-07-09 13:16:22 +00:00
' has_favicon ' : True if watch . get_favicon_filename ( ) else False ,
2025-06-03 08:17:19 +00:00
' history_n ' : watch . history_n ,
2025-06-03 08:44:15 +00:00
' last_changed_text ' : timeago . format ( int ( watch . last_changed ) , time . time ( ) ) if watch . history_n > = 2 and int ( watch . last_changed ) > 0 else ' Not yet ' ,
2025-07-09 13:16:22 +00:00
' last_checked ' : watch . get ( ' last_checked ' ) ,
' last_checked_text ' : _jinja2_filter_datetime ( watch ) ,
2025-05-26 18:12:32 +00:00
' notification_muted ' : True if watch . get ( ' notification_muted ' ) else False ,
2025-07-09 13:16:22 +00:00
' paused ' : True if watch . get ( ' paused ' ) else False ,
' queued ' : True if watch . get ( ' uuid ' ) in queue_list else False ,
2025-05-26 18:12:32 +00:00
' unviewed ' : watch . has_unviewed ,
2025-06-03 08:44:15 +00:00
' uuid ' : watch . get ( ' uuid ' ) ,
2025-05-26 18:12:32 +00:00
}
2025-06-03 08:17:19 +00:00
errored_count = 0
for watch_uuid_iter , watch_iter in datastore . data [ ' watching ' ] . items ( ) :
if watch_iter . get ( ' last_error ' ) :
2025-05-26 18:12:32 +00:00
errored_count + = 1
general_stats = {
' count_errors ' : errored_count ,
' has_unviewed ' : datastore . has_unviewed
}
# Debug what's being emitted
2025-06-03 08:17:19 +00:00
# logger.debug(f"Emitting 'watch_update' event for {watch.get('uuid')}, data: {watch_data}")
2025-05-26 18:12:32 +00:00
# Emit to all clients (no 'broadcast' parameter needed - it's the default behavior)
socketio . emit ( " watch_update " , { ' watch ' : watch_data , ' general_stats ' : general_stats } )
2025-06-03 08:17:19 +00:00
# Log after successful emit - use watch_data['uuid'] to avoid variable shadowing issues
logger . trace ( f " Socket.IO: Emitted update for watch { watch_data [ ' uuid ' ] } , Checking now: { watch_data [ ' checking_now ' ] } " )
2025-05-26 18:12:32 +00:00
except Exception as e :
logger . error ( f " Socket.IO error in handle_watch_update: { str ( e ) } " )
def init_socketio ( app , datastore ) :
""" Initialize SocketIO with the main Flask app """
2025-06-03 08:17:19 +00:00
import platform
import sys
2025-07-09 13:16:22 +00:00
2025-06-03 08:17:19 +00:00
# Platform-specific async_mode selection for better stability
system = platform . system ( ) . lower ( )
python_version = sys . version_info
2025-07-09 13:16:22 +00:00
2025-06-03 08:17:19 +00:00
# Check for SocketIO mode configuration via environment variable
# Default is 'threading' for best cross-platform compatibility
socketio_mode = os . getenv ( ' SOCKETIO_MODE ' , ' threading ' ) . lower ( )
2025-07-09 13:16:22 +00:00
2025-06-03 08:17:19 +00:00
if socketio_mode == ' gevent ' :
# Use gevent mode (higher concurrency but platform limitations)
try :
import gevent
async_mode = ' gevent '
logger . info ( f " SOCKETIO_MODE=gevent: Using { async_mode } mode for Socket.IO " )
except ImportError :
async_mode = ' threading '
logger . warning ( f " SOCKETIO_MODE=gevent but gevent not available, falling back to { async_mode } mode " )
elif socketio_mode == ' threading ' :
# Use threading mode (default - best compatibility)
async_mode = ' threading '
logger . info ( f " SOCKETIO_MODE=threading: Using { async_mode } mode for Socket.IO " )
else :
# Invalid mode specified, use default
async_mode = ' threading '
logger . warning ( f " Invalid SOCKETIO_MODE= ' { socketio_mode } ' , using default { async_mode } mode for Socket.IO " )
2025-07-09 13:16:22 +00:00
2025-06-03 08:17:19 +00:00
# Log platform info for debugging
logger . info ( f " Platform: { system } , Python: { python_version . major } . { python_version . minor } , Socket.IO mode: { async_mode } " )
2025-05-26 18:12:32 +00:00
# Restrict SocketIO CORS to same origin by default, can be overridden with env var
cors_origins = os . environ . get ( ' SOCKETIO_CORS_ORIGINS ' , None )
2025-06-03 08:17:19 +00:00
2025-05-26 18:12:32 +00:00
socketio = SocketIO ( app ,
2025-06-03 08:17:19 +00:00
async_mode = async_mode ,
cors_allowed_origins = cors_origins , # None means same-origin only
logger = strtobool ( os . getenv ( ' SOCKETIO_LOGGING ' , ' False ' ) ) ,
engineio_logger = strtobool ( os . getenv ( ' SOCKETIO_LOGGING ' , ' False ' ) ) )
2025-05-26 18:12:32 +00:00
# Set up event handlers
2025-06-03 08:17:19 +00:00
logger . info ( " Socket.IO: Registering connect event handler " )
2025-06-03 12:54:13 +00:00
@socketio.on ( ' checkbox-operation ' )
def event_checkbox_operations ( data ) :
from changedetectionio . blueprint . ui import _handle_operations
from changedetectionio import queuedWatchMetaData
from changedetectionio import worker_handler
from changedetectionio . flask_app import update_q , watch_check_update
logger . trace ( f " Got checkbox operations event: { data } " )
datastore = socketio . datastore
_handle_operations (
op = data . get ( ' op ' ) ,
uuids = data . get ( ' uuids ' ) ,
datastore = datastore ,
extra_data = data . get ( ' extra_data ' ) ,
worker_handler = worker_handler ,
update_q = update_q ,
queuedWatchMetaData = queuedWatchMetaData ,
watch_check_update = watch_check_update ,
emit_flash = False
)
2025-05-26 18:12:32 +00:00
@socketio.on ( ' connect ' )
def handle_connect ( ) :
""" Handle client connection """
2025-06-03 08:17:19 +00:00
# logger.info("Socket.IO: CONNECT HANDLER CALLED - Starting connection process")
2025-05-26 18:12:32 +00:00
from flask import request
from flask_login import current_user
from changedetectionio . flask_app import update_q
# Access datastore from socketio
datastore = socketio . datastore
2025-06-03 08:17:19 +00:00
# logger.info(f"Socket.IO: Current user authenticated: {current_user.is_authenticated if hasattr(current_user, 'is_authenticated') else 'No current_user'}")
2025-05-26 18:12:32 +00:00
# Check if authentication is required and user is not authenticated
has_password_enabled = datastore . data [ ' settings ' ] [ ' application ' ] . get ( ' password ' ) or os . getenv ( " SALTED_PASS " , False )
2025-06-03 08:17:19 +00:00
# logger.info(f"Socket.IO: Password enabled: {has_password_enabled}")
2025-05-26 18:12:32 +00:00
if has_password_enabled and not current_user . is_authenticated :
logger . warning ( " Socket.IO: Rejecting unauthenticated connection " )
return False # Reject the connection
# Send the current queue size to the newly connected client
try :
queue_size = update_q . qsize ( )
socketio . emit ( " queue_size " , {
" q_length " : queue_size ,
" event_timestamp " : time . time ( )
} , room = request . sid ) # Send only to this client
logger . debug ( f " Socket.IO: Sent initial queue size { queue_size } to new client " )
except Exception as e :
logger . error ( f " Socket.IO error sending initial queue size: { str ( e ) } " )
logger . info ( " Socket.IO: Client connected " )
2025-06-03 08:17:19 +00:00
# logger.info("Socket.IO: Registering disconnect event handler")
2025-05-26 18:12:32 +00:00
@socketio.on ( ' disconnect ' )
def handle_disconnect ( ) :
""" Handle client disconnection """
logger . info ( " Socket.IO: Client disconnected " )
# Create a dedicated signal handler that will receive signals and emit them to clients
signal_handler = SignalHandler ( socketio , datastore )
2025-06-03 08:17:19 +00:00
# Register watch operation event handlers
from . events import register_watch_operation_handlers
register_watch_operation_handlers ( socketio , datastore )
2025-05-26 18:12:32 +00:00
# Store the datastore reference on the socketio object for later use
socketio . datastore = datastore
2025-06-03 08:17:19 +00:00
# No stop event needed for threading mode - threads check app.config.exit directly
2025-05-26 18:12:32 +00:00
# Add a shutdown method to the socketio object
def shutdown ( ) :
2025-06-03 08:17:19 +00:00
""" Shutdown the SocketIO server fast and aggressively """
2025-05-26 18:12:32 +00:00
try :
2025-06-03 08:17:19 +00:00
logger . info ( " Socket.IO: Fast shutdown initiated... " )
# For threading mode, give the thread a very short time to exit gracefully
2025-05-26 18:12:32 +00:00
if hasattr ( socketio , ' polling_emitter_thread ' ) :
2025-06-03 08:17:19 +00:00
if socketio . polling_emitter_thread . is_alive ( ) :
logger . info ( " Socket.IO: Waiting 1 second for polling thread to stop... " )
socketio . polling_emitter_thread . join ( timeout = 1.0 ) # Only 1 second timeout
if socketio . polling_emitter_thread . is_alive ( ) :
logger . info ( " Socket.IO: Polling thread still running after timeout - continuing with shutdown " )
else :
logger . info ( " Socket.IO: Polling thread stopped quickly " )
else :
logger . info ( " Socket.IO: Polling thread already stopped " )
logger . info ( " Socket.IO: Fast shutdown complete " )
2025-05-26 18:12:32 +00:00
except Exception as e :
logger . error ( f " Socket.IO error during shutdown: { str ( e ) } " )
2025-06-03 08:17:19 +00:00
2025-05-26 18:12:32 +00:00
# Attach the shutdown method to the socketio object
socketio . shutdown = shutdown
logger . info ( " Socket.IO initialized and attached to main Flask app " )
2025-06-03 08:17:19 +00:00
logger . info ( f " Socket.IO: Registered event handlers: { socketio . handlers if hasattr ( socketio , ' handlers ' ) else ' No handlers found ' } " )
2025-07-09 13:16:22 +00:00
return socketio