2025-06-03 08:17:19 +00:00
#!/usr/bin/env python3
"""
Notification Service Module
Extracted from update_worker . py to provide standalone notification functionality
for both sync and async workers
"""
from loguru import logger
2025-10-14 08:58:53 +00:00
import time
# What is passed around as notification context, also used as the complete list of valid {{ tokens }}
class NotificationContextData ( dict ) :
def __init__ ( self , initial_data = None , * * kwargs ) :
super ( ) . __init__ ( {
' current_snapshot ' : None ,
' diff ' : None ,
' diff_added ' : None ,
' diff_full ' : None ,
' diff_patch ' : None ,
' diff_removed ' : None ,
' notification_timestamp ' : time . time ( ) ,
' screenshot ' : None ,
' triggered_text ' : None ,
' uuid ' : ' XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX ' , # Converted to 'watch_uuid' in create_notification_parameters
' watch_url ' : ' https://WATCH-PLACE-HOLDER/ ' ,
' base_url ' : None ,
' diff_url ' : None ,
' preview_url ' : None ,
' watch_tag ' : None ,
' watch_title ' : None
} )
# Apply any initial data passed in
self . update ( { ' watch_uuid ' : self . get ( ' uuid ' ) } )
if initial_data :
self . update ( initial_data )
2025-06-03 08:17:19 +00:00
2025-10-14 08:58:53 +00:00
# Apply any keyword arguments
if kwargs :
self . update ( kwargs )
def set_random_for_validation ( self ) :
import random , string
""" Randomly fills all dict keys with random strings (for validation/testing). """
for key in self . keys ( ) :
if key in [ ' uuid ' , ' time ' , ' watch_uuid ' ] :
continue
rand_str = ' RANDOM-PLACEHOLDER- ' + ' ' . join ( random . choices ( string . ascii_letters + string . digits , k = 12 ) )
self [ key ] = rand_str
2025-06-03 08:17:19 +00:00
class NotificationService :
"""
Standalone notification service that handles all notification functionality
previously embedded in the update_worker class
"""
def __init__ ( self , datastore , notification_q ) :
self . datastore = datastore
self . notification_q = notification_q
2025-10-14 08:58:53 +00:00
def queue_notification_for_watch ( self , n_object : NotificationContextData , watch ) :
2025-06-03 08:17:19 +00:00
"""
Queue a notification for a watch with full diff rendering and template variables
"""
from changedetectionio import diff
from changedetectionio . notification import default_notification_format_for_watch
2025-10-14 08:58:53 +00:00
if not isinstance ( n_object , NotificationContextData ) :
raise TypeError ( f " Expected NotificationContextData, got { type ( n_object ) } " )
2025-06-03 08:17:19 +00:00
dates = [ ]
trigger_text = ' '
now = time . time ( )
if watch :
watch_history = watch . history
dates = list ( watch_history . keys ( ) )
trigger_text = watch . get ( ' trigger_text ' , [ ] )
# Add text that was triggered
if len ( dates ) :
snapshot_contents = watch . get_history_snapshot ( dates [ - 1 ] )
else :
snapshot_contents = " No snapshot/history available, the watch should fetch atleast once. "
# If we ended up here with "System default"
if n_object . get ( ' notification_format ' ) == default_notification_format_for_watch :
n_object [ ' notification_format ' ] = self . datastore . data [ ' settings ' ] [ ' application ' ] . get ( ' notification_format ' )
html_colour_enable = False
# HTML needs linebreak, but MarkDown and Text can use a linefeed
if n_object . get ( ' notification_format ' ) == ' HTML ' :
line_feed_sep = " <br> "
# Snapshot will be plaintext on the disk, convert to some kind of HTML
snapshot_contents = snapshot_contents . replace ( ' \n ' , line_feed_sep )
elif n_object . get ( ' notification_format ' ) == ' HTML Color ' :
line_feed_sep = " <br> "
# Snapshot will be plaintext on the disk, convert to some kind of HTML
snapshot_contents = snapshot_contents . replace ( ' \n ' , line_feed_sep )
html_colour_enable = True
else :
line_feed_sep = " \n "
triggered_text = ' '
if len ( trigger_text ) :
from . import html_tools
triggered_text = html_tools . get_triggered_text ( content = snapshot_contents , trigger_text = trigger_text )
if triggered_text :
triggered_text = line_feed_sep . join ( triggered_text )
# Could be called as a 'test notification' with only 1 snapshot available
prev_snapshot = " Example text: example test \n Example text: change detection is cool \n Example text: some more examples \n "
current_snapshot = " Example text: example test \n Example text: change detection is fantastic \n Example text: even more examples \n Example text: a lot more examples "
if len ( dates ) > 1 :
prev_snapshot = watch . get_history_snapshot ( dates [ - 2 ] )
current_snapshot = watch . get_history_snapshot ( dates [ - 1 ] )
n_object . update ( {
' current_snapshot ' : snapshot_contents ,
' diff ' : diff . render_diff ( prev_snapshot , current_snapshot , line_feed_sep = line_feed_sep , html_colour = html_colour_enable ) ,
' diff_added ' : diff . render_diff ( prev_snapshot , current_snapshot , include_removed = False , line_feed_sep = line_feed_sep ) ,
' diff_full ' : diff . render_diff ( prev_snapshot , current_snapshot , include_equal = True , line_feed_sep = line_feed_sep , html_colour = html_colour_enable ) ,
' diff_patch ' : diff . render_diff ( prev_snapshot , current_snapshot , line_feed_sep = line_feed_sep , patch_format = True ) ,
' diff_removed ' : diff . render_diff ( prev_snapshot , current_snapshot , include_added = False , line_feed_sep = line_feed_sep ) ,
' screenshot ' : watch . get_screenshot ( ) if watch and watch . get ( ' notification_screenshot ' ) else None ,
' triggered_text ' : triggered_text ,
' uuid ' : watch . get ( ' uuid ' ) if watch else None ,
' watch_url ' : watch . get ( ' url ' ) if watch else None ,
2025-10-14 08:58:53 +00:00
' watch_uuid ' : watch . get ( ' uuid ' ) if watch else None ,
2025-06-03 08:17:19 +00:00
} )
if watch :
n_object . update ( watch . extra_notification_token_values ( ) )
logger . trace ( f " Main rendered notification placeholders (diff_added etc) calculated in { time . time ( ) - now : .3f } s " )
logger . debug ( " Queued notification for sending " )
self . notification_q . put ( n_object )
def _check_cascading_vars ( self , var_name , watch ) :
"""
Check notification variables in cascading priority :
Individual watch settings > Tag settings > Global settings
"""
from changedetectionio . notification import (
default_notification_format_for_watch ,
default_notification_body ,
default_notification_title
)
# Would be better if this was some kind of Object where Watch can reference the parent datastore etc
v = watch . get ( var_name )
if v and not watch . get ( ' notification_muted ' ) :
if var_name == ' notification_format ' and v == default_notification_format_for_watch :
return self . datastore . data [ ' settings ' ] [ ' application ' ] . get ( ' notification_format ' )
return v
tags = self . datastore . get_all_tags_for_watch ( uuid = watch . get ( ' uuid ' ) )
if tags :
for tag_uuid , tag in tags . items ( ) :
v = tag . get ( var_name )
if v and not tag . get ( ' notification_muted ' ) :
return v
if self . datastore . data [ ' settings ' ] [ ' application ' ] . get ( var_name ) :
return self . datastore . data [ ' settings ' ] [ ' application ' ] . get ( var_name )
# Otherwise could be defaults
if var_name == ' notification_format ' :
return default_notification_format_for_watch
if var_name == ' notification_body ' :
return default_notification_body
if var_name == ' notification_title ' :
return default_notification_title
return None
def send_content_changed_notification ( self , watch_uuid ) :
"""
Send notification when content changes are detected
"""
2025-10-14 08:58:53 +00:00
n_object = NotificationContextData ( )
2025-06-03 08:17:19 +00:00
watch = self . datastore . data [ ' watching ' ] . get ( watch_uuid )
if not watch :
return
watch_history = watch . history
dates = list ( watch_history . keys ( ) )
# Theoretically it's possible that this could be just 1 long,
# - In the case that the timestamp key was not unique
if len ( dates ) == 1 :
raise ValueError (
" History index had 2 or more, but only 1 date loaded, timestamps were not unique? maybe two of the same timestamps got written, needs more delay? "
)
# Should be a better parent getter in the model object
# Prefer - Individual watch settings > Tag settings > Global settings (in that order)
n_object [ ' notification_urls ' ] = self . _check_cascading_vars ( ' notification_urls ' , watch )
n_object [ ' notification_title ' ] = self . _check_cascading_vars ( ' notification_title ' , watch )
n_object [ ' notification_body ' ] = self . _check_cascading_vars ( ' notification_body ' , watch )
n_object [ ' notification_format ' ] = self . _check_cascading_vars ( ' notification_format ' , watch )
# (Individual watch) Only prepare to notify if the rules above matched
queued = False
if n_object and n_object . get ( ' notification_urls ' ) :
queued = True
count = watch . get ( ' notification_alert_count ' , 0 ) + 1
self . datastore . update_watch ( uuid = watch_uuid , update_obj = { ' notification_alert_count ' : count } )
self . queue_notification_for_watch ( n_object = n_object , watch = watch )
return queued
def send_filter_failure_notification ( self , watch_uuid ) :
"""
Send notification when CSS / XPath filters fail consecutively
"""
threshold = self . datastore . data [ ' settings ' ] [ ' application ' ] . get ( ' filter_failure_notification_threshold_attempts ' )
watch = self . datastore . data [ ' watching ' ] . get ( watch_uuid )
if not watch :
return
2025-10-14 08:58:53 +00:00
n_object = NotificationContextData ( {
' notification_title ' : ' Changedetection.io - Alert - CSS/xPath filter was not present in the page ' ,
' notification_body ' : " Your configured CSS/xPath filters of ' {} ' for {{ {{ watch_url}}}} did not appear on the page after {} attempts, did the page change layout? \n \n Link: {{ {{ base_url}}}}/edit/ {{ {{ watch_uuid}}}} \n \n Thanks - Your omniscient changedetection.io installation :) \n " . format (
" , " . join ( watch [ ' include_filters ' ] ) ,
threshold ) ,
' notification_format ' : ' text '
} )
2025-06-03 08:17:19 +00:00
if len ( watch [ ' notification_urls ' ] ) :
n_object [ ' notification_urls ' ] = watch [ ' notification_urls ' ]
elif len ( self . datastore . data [ ' settings ' ] [ ' application ' ] [ ' notification_urls ' ] ) :
n_object [ ' notification_urls ' ] = self . datastore . data [ ' settings ' ] [ ' application ' ] [ ' notification_urls ' ]
# Only prepare to notify if the rules above matched
if ' notification_urls ' in n_object :
n_object . update ( {
' watch_url ' : watch [ ' url ' ] ,
' uuid ' : watch_uuid ,
' screenshot ' : None
} )
self . notification_q . put ( n_object )
logger . debug ( f " Sent filter not found notification for { watch_uuid } " )
else :
logger . debug ( f " NOT sending filter not found notification for { watch_uuid } - no notification URLs " )
def send_step_failure_notification ( self , watch_uuid , step_n ) :
"""
Send notification when browser steps fail consecutively
"""
watch = self . datastore . data [ ' watching ' ] . get ( watch_uuid , False )
if not watch :
return
threshold = self . datastore . data [ ' settings ' ] [ ' application ' ] . get ( ' filter_failure_notification_threshold_attempts ' )
2025-10-14 08:58:53 +00:00
n_object = NotificationContextData ( {
' notification_title ' : " Changedetection.io - Alert - Browser step at position {} could not be run " . format ( step_n + 1 ) ,
' notification_body ' : " Your configured browser step at position {} for {{ {{ watch_url}}}} "
" did not appear on the page after {} attempts, did the page change layout? "
" Does it need a delay added? \n \n Link: {{ {{ base_url}}}}/edit/ {{ {{ watch_uuid}}}} \n \n "
" Thanks - Your omniscient changedetection.io installation :) \n " . format ( step_n + 1 , threshold ) ,
' notification_format ' : ' text '
} )
2025-06-03 08:17:19 +00:00
if len ( watch [ ' notification_urls ' ] ) :
n_object [ ' notification_urls ' ] = watch [ ' notification_urls ' ]
elif len ( self . datastore . data [ ' settings ' ] [ ' application ' ] [ ' notification_urls ' ] ) :
n_object [ ' notification_urls ' ] = self . datastore . data [ ' settings ' ] [ ' application ' ] [ ' notification_urls ' ]
# Only prepare to notify if the rules above matched
if ' notification_urls ' in n_object :
n_object . update ( {
' watch_url ' : watch [ ' url ' ] ,
' uuid ' : watch_uuid
} )
self . notification_q . put ( n_object )
logger . error ( f " Sent step not found notification for { watch_uuid } " )
# Convenience functions for creating notification service instances
def create_notification_service ( datastore , notification_q ) :
"""
Factory function to create a NotificationService instance
"""
return NotificationService ( datastore , notification_q )