2022-08-15 16:56:53 +00:00
import os
2021-05-08 01:29:41 +00:00
import threading
import queue
2021-08-12 10:05:59 +00:00
import time
2021-05-08 01:29:41 +00:00
2022-04-19 21:15:32 +00:00
from changedetectionio import content_fetcher
2023-03-18 19:36:26 +00:00
from . processors . text_json_diff import FilterNotFoundInResponse
2022-07-23 15:15:27 +00:00
2022-01-02 13:11:04 +00:00
# A single update worker
#
# Requests for checking on a single site(watch) from a queue of watches
# (another process inserts watches into the queue that are time-ready for checking)
2022-09-08 07:10:04 +00:00
import logging
import sys
2022-01-02 13:11:04 +00:00
2021-05-08 01:29:41 +00:00
class update_worker(threading.Thread):
    """Worker thread that checks queued watches for changes, one at a time.

    Items are taken from ``q``; each item carries the UUID of a watch stored in
    ``datastore``. The watch is checked with the processor configured on it and
    the outcome (new snapshot, error text, counters, screenshots) is written
    back to the datastore. Notification payloads are placed on
    ``notification_q``.
    """

    # UUID of the watch currently being processed, or None while idle
    current_uuid = None

    def __init__(self, q, notification_q, app, datastore, *args, **kwargs):
        """Store the collaborators and initialise the underlying thread.

        :param q: queue of items to check; each item exposes ``.item`` (a dict
            with at least ``uuid``) and ``.priority``.
        :param notification_q: queue that receives notification payload dicts.
        :param app: application object; ``app.config.exit`` is used as the
            stop signal and ``app.logger`` for error logging.
        :param datastore: store exposing ``data``, ``update_watch`` and the
            ``save_*`` helpers used by this class.
        """
        logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
        self.q = q
        self.app = app
        self.notification_q = notification_q
        self.datastore = datastore
        super().__init__(*args, **kwargs)

    def _failure_notification_urls(self, watch):
        """Return the watch's notification URLs, else the application-wide ones, else None."""
        if watch['notification_urls']:
            return watch['notification_urls']
        app_urls = self.datastore.data['settings']['application']['notification_urls']
        return app_urls if app_urls else None

    def send_content_changed_notification(self, t, watch_uuid):
        """Queue a "content changed" notification for ``watch_uuid``.

        Watch-level notification settings are used when set, otherwise the
        application-level ones. Nothing is queued when the watch no longer
        exists or when no notification URL is configured anywhere.

        :param t: unused; kept so existing callers keep working.
        :param watch_uuid: UUID of the watch whose two newest snapshots are diffed.
        :raises ValueError: when fewer than two history entries are available,
            because a diff needs a previous and a current snapshot.
        """
        watch = self.datastore.data['watching'].get(watch_uuid, False)
        if not watch:
            return

        watch_history = watch.history
        dates = list(watch_history.keys())
        # Theoretically it's possible that this could be just 1 long,
        # - In the case that the timestamp key was not unique
        # An empty history is rejected too, otherwise dates[-1] below would
        # fail with a far less helpful IndexError.
        if len(dates) < 2:
            raise ValueError(
                "History index had 2 or more, but only {} date loaded, timestamps were not unique? maybe two of the same timestamps got written, needs more delay?".format(len(dates))
            )

        app_settings = self.datastore.data['settings']['application']
        notification_urls = watch['notification_urls'] if len(watch['notification_urls']) else \
            app_settings['notification_urls']

        # Only prepare to notify if a destination was found
        if not notification_urls:
            logging.info(">> NO Notification sent, notification_url was empty in both watch and system")
            return

        # Imported lazily (function scope), as the original code did
        from changedetectionio import diff
        from changedetectionio.notification import (
            default_notification_format_for_watch
        )

        n_object = {
            'notification_urls': notification_urls,
            'notification_title': watch['notification_title'] if watch['notification_title'] else
            app_settings['notification_title'],
            'notification_body': watch['notification_body'] if watch['notification_body'] else
            app_settings['notification_body'],
            'notification_format': watch['notification_format']
            if watch['notification_format'] != default_notification_format_for_watch else
            app_settings['notification_format'],
        }

        # HTML needs linebreak, but MarkDown and Text can use a linefeed
        line_feed_sep = "<br>" if n_object['notification_format'] == 'HTML' else "\n"

        previous_snapshot = watch_history[dates[-2]]
        current_snapshot = watch_history[dates[-1]]

        with open(current_snapshot, 'rb') as f:
            snapshot_contents = f.read()

        def render(**options):
            # Same two snapshots and separator for every diff flavour
            return diff.render_diff(previous_snapshot, current_snapshot,
                                    line_feed_sep=line_feed_sep, **options)

        n_object.update({
            'watch_url': watch['url'],
            'uuid': watch_uuid,
            'screenshot': watch.get_screenshot_as_jpeg() if watch.get('notification_screenshot') else None,
            'current_snapshot': snapshot_contents.decode('utf-8'),
            'diff': render(),
            'diff_added': render(include_removed=False),
            'diff_removed': render(include_added=False),
            'diff_full': render(include_equal=True),
        })
        logging.info(">> SENDING NOTIFICATION")
        self.notification_q.put(n_object)

    def send_filter_failure_notification(self, watch_uuid):
        """Queue a plain-text alert saying the watch's include filters were not found.

        Nothing is queued when the watch no longer exists or when no
        notification URL is configured on the watch or the application.
        """
        threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts')
        watch = self.datastore.data['watching'].get(watch_uuid, False)
        if not watch:
            return

        # The quadruple braces survive .format() as double braces, leaving
        # {{watch_url}} style placeholders in the queued body.
        n_object = {'notification_title': 'Changedetection.io - Alert - CSS/xPath filter was not present in the page',
                    'notification_body': "Your configured CSS/xPath filters of '{}' for {{{{watch_url}}}} did not appear on the page after {} attempts, did the page change layout?\n\nLink: {{{{base_url}}}}/edit/{{{{watch_uuid}}}}\n\nThanks - Your omniscient changedetection.io installation :)\n".format(
                        ", ".join(watch['include_filters']),
                        threshold),
                    'notification_format': 'text'}

        notification_urls = self._failure_notification_urls(watch)
        # Only prepare to notify if a destination was found
        if notification_urls is not None:
            n_object['notification_urls'] = notification_urls
            n_object.update({
                'watch_url': watch['url'],
                'uuid': watch_uuid,
                'screenshot': None
            })
            self.notification_q.put(n_object)
            print("Sent filter not found notification for {}".format(watch_uuid))

    def send_step_failure_notification(self, watch_uuid, step_n):
        """Queue a plain-text alert saying a browser step could not be run.

        :param watch_uuid: UUID of the affected watch.
        :param step_n: zero-based index of the failing step (reported one-based).
        """
        watch = self.datastore.data['watching'].get(watch_uuid, False)
        if not watch:
            return
        threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts')

        # Placeholders use the same {{watch_url}} / {{base_url}} / {{watch_uuid}}
        # form as send_filter_failure_notification; the previous text collapsed
        # to single-brace tokens and referenced watch['url'] instead of watch_url.
        n_object = {'notification_title': "Changedetection.io - Alert - Browser step at position {} could not be run".format(step_n + 1),
                    'notification_body': "Your configured browser step at position {} for {{{{watch_url}}}} "
                                         "did not appear on the page after {} attempts, did the page change layout? "
                                         "Does it need a delay added?\n\nLink: {{{{base_url}}}}/edit/{{{{watch_uuid}}}}\n\n"
                                         "Thanks - Your omniscient changedetection.io installation :)\n".format(step_n + 1, threshold),
                    'notification_format': 'text'}

        notification_urls = self._failure_notification_urls(watch)
        # Only prepare to notify if a destination was found
        if notification_urls is not None:
            n_object['notification_urls'] = notification_urls
            n_object.update({
                'watch_url': watch['url'],
                'uuid': watch_uuid
            })
            self.notification_q.put(n_object)
            print("Sent step not found notification for {}".format(watch_uuid))

    def cleanup_error_artifacts(self, uuid):
        """Delete the stored last-error screenshot and text of a watch, if present."""
        # All went fine, remove error artifacts
        for filename in ("last-error-screenshot.png", "last-error.txt"):
            full_path = os.path.join(self.datastore.datastore_path, uuid, filename)
            if os.path.isfile(full_path):
                os.unlink(full_path)

    def _record_consecutive_failure(self, uuid, label, notify):
        """Bump the watch's consecutive failure counter and alert at the threshold.

        Does nothing unless the watch has ``filter_failure_notification_send``
        enabled. When the configured threshold is reached the counter is reset
        and ``notify()`` is called, unless notifications are muted on the watch.

        :param label: word used in the console message ("Filter" or "Step").
        :param notify: zero-argument callable that queues the alert.
        """
        watch = self.datastore.data['watching'][uuid]
        # Only when enabled, send the notification
        if not watch.get('filter_failure_notification_send', False):
            return

        c = watch.get('consecutive_filter_failures', 5)
        c += 1
        # Send notification if we reached the threshold?
        threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts',
                                                                       0)
        print("{} for {} not found, consecutive_filter_failures: {}".format(label, uuid, c))
        if threshold > 0 and c >= threshold:
            if not watch.get('notification_muted'):
                notify()
            c = 0

        self.datastore.update_watch(uuid=uuid, update_obj={'consecutive_filter_failures': c})

    def _process_queued_item(self, queued_item_data, text_json_diff, restock_diff):
        """Run one check for the watch referenced by ``queued_item_data``.

        Returns early, without recording anything, when the watch is unknown or
        is found to have been removed while the check was running. Queue
        accounting (``task_done``) is the caller's responsibility.

        :param text_json_diff: processor module used by default.
        :param restock_diff: processor module used when the watch's
            ``processor`` is ``'restock_diff'``.
        """
        uuid = queued_item_data.item.get('uuid')
        self.current_uuid = uuid

        if uuid not in self.datastore.data['watching']:
            return

        changed_detected = False
        contents = b''
        process_changedetection_results = True
        update_obj = {}
        # Reset per item: if building the handler fails, a handler left over
        # from a previous watch must never be used for this one.
        update_handler = None
        print("> Processing UUID {} Priority {} URL {}".format(uuid, queued_item_data.priority,
                                                               self.datastore.data['watching'][uuid]['url']))
        now = time.time()

        try:
            processor = self.datastore.data['watching'][uuid].get('processor', 'text_json_diff')
            # @todo some way to switch by name
            if processor == 'restock_diff':
                update_handler = restock_diff.perform_site_check(datastore=self.datastore)
            else:
                # Used as a default and also by some tests
                update_handler = text_json_diff.perform_site_check(datastore=self.datastore)

            changed_detected, update_obj, contents = update_handler.run(
                uuid, skip_when_checksum_same=queued_item_data.item.get('skip_when_checksum_same'))

            # Re #342
            # In Python 3, all strings are sequences of Unicode characters. There is a bytes type that holds raw bytes.
            # We then convert/.decode('utf-8') for the notification etc
            if not isinstance(contents, (bytes, bytearray)):
                raise Exception("Error - returned data from the fetch handler SHOULD be bytes")

        except PermissionError as e:
            # %s placeholders are required, otherwise logging cannot merge the
            # extra arguments and the message is lost.
            self.app.logger.error("File permission error updating %s - %s", uuid, str(e))
            process_changedetection_results = False

        except content_fetcher.ReplyWithContentButNoText as e:
            # Totally fine, it's by choice - just continue on, nothing more to care about
            # Page had elements/content but no renderable text
            # Backend (not filters) gave zero output
            self.datastore.update_watch(uuid=uuid, update_obj={
                'last_error': "Got HTML content but no text found (With {} reply code).".format(e.status_code)})
            if e.screenshot:
                self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot)
            process_changedetection_results = False

        except content_fetcher.Non200ErrorCodeReceived as e:
            if e.status_code == 403:
                err_text = "Error - 403 (Access denied) received"
            elif e.status_code == 404:
                err_text = "Error - 404 (Page not found) received"
            elif e.status_code == 500:
                err_text = "Error - 500 (Internal server Error) received"
            else:
                err_text = "Error - Request returned a HTTP error code {}".format(str(e.status_code))

            if e.screenshot:
                self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot, as_error=True)
            if e.xpath_data:
                self.datastore.save_xpath_data(watch_uuid=uuid, data=e.xpath_data, as_error=True)
            if e.page_text:
                self.datastore.save_error_text(watch_uuid=uuid, contents=e.page_text)

            self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text})
            process_changedetection_results = False

        except FilterNotFoundInResponse as e:
            # The watch may have been removed while the check was running
            if not self.datastore.data['watching'].get(uuid):
                return

            err_text = "Warning, no filters were found, no change detection ran."
            self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text})
            self._record_consecutive_failure(
                uuid, "Filter",
                lambda: self.send_filter_failure_notification(uuid))
            process_changedetection_results = False

        except content_fetcher.checksumFromPreviousCheckWasTheSame as e:
            # Yes fine, so nothing todo, don't continue to process.
            process_changedetection_results = False
            changed_detected = False

        except content_fetcher.BrowserStepsStepTimout as e:
            # The watch may have been removed while the check was running
            if not self.datastore.data['watching'].get(uuid):
                return

            err_text = "Warning, browser step at position {} could not run, target not found, check the watch, add a delay if necessary.".format(e.step_n + 1)
            self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text})
            failed_step = e.step_n
            self._record_consecutive_failure(
                uuid, "Step",
                lambda: self.send_step_failure_notification(watch_uuid=uuid, step_n=failed_step))
            process_changedetection_results = False

        except content_fetcher.EmptyReply as e:
            # Some kind of custom to-str handler in the exception handler that does this?
            err_text = "EmptyReply - try increasing 'Wait seconds before extracting text', Status Code {}".format(e.status_code)
            self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
                                                               'last_check_status': e.status_code})
            process_changedetection_results = False

        except content_fetcher.ScreenshotUnavailable as e:
            err_text = "Screenshot unavailable, page did not render fully in the expected time - try increasing 'Wait seconds before extracting text'"
            self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
                                                               'last_check_status': e.status_code})
            process_changedetection_results = False

        except content_fetcher.JSActionExceptions as e:
            err_text = "Error running JS Actions - Page request - " + e.message
            if e.screenshot:
                self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot, as_error=True)
            self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
                                                               'last_check_status': e.status_code})
            process_changedetection_results = False

        except content_fetcher.PageUnloadable as e:
            err_text = "Page request from server didnt respond correctly"
            if e.message:
                err_text = "{} - {}".format(err_text, e.message)
            if e.screenshot:
                self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot, as_error=True)
            self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
                                                               'last_check_status': e.status_code})
            process_changedetection_results = False

        except Exception as e:
            self.app.logger.error("Exception reached processing watch UUID: %s - %s", uuid, str(e))
            self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)})
            # Other serious error
            process_changedetection_results = False

        else:
            # Crash protection, the watch entry could have been removed by this point (during a slow chrome fetch etc)
            if not self.datastore.data['watching'].get(uuid):
                return

            # Mark that we never had any failures
            if not self.datastore.data['watching'][uuid].get('ignore_status_codes'):
                update_obj['consecutive_filter_failures'] = 0

            # NOTE(review): placed at this level (not under the check above) when
            # rebuilding the lost indentation - confirm against the repository.
            self.cleanup_error_artifacts(uuid)

        # Different exceptions mean that we may or may not want to bump the snapshot, trigger notifications etc
        if process_changedetection_results:
            try:
                watch = self.datastore.data['watching'].get(uuid)
                self.datastore.update_watch(uuid=uuid, update_obj=update_obj)

                # Also save the snapshot on the first time checked
                if changed_detected or not watch['last_checked']:
                    watch.save_history_text(contents=contents,
                                            timestamp=str(round(time.time())),
                                            snapshot_id=update_obj.get('previous_md5', 'none'))

                # A change was detected
                if changed_detected:
                    print(">> Change detected in UUID {} - {}".format(uuid, watch['url']))

                    # Notifications should only trigger on the second time (first time, we gather the initial snapshot)
                    if watch.history_n >= 2:
                        if not self.datastore.data['watching'][uuid].get('notification_muted'):
                            self.send_content_changed_notification(t=self, watch_uuid=uuid)

            except Exception as e:
                # Catch everything possible here, so that if a worker crashes, we don't lose it until restart!
                print("!!!! Exception in update_worker !!!\n", e)
                self.app.logger.error("Exception reached processing watch UUID: %s - %s", uuid, str(e))
                self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)})

        if self.datastore.data['watching'].get(uuid):
            # Always record that we atleast tried
            count = self.datastore.data['watching'][uuid].get('check_count', 0) + 1
            self.datastore.update_watch(uuid=uuid, update_obj={'fetch_time': round(time.time() - now, 3),
                                                               'last_checked': round(time.time()),
                                                               'check_count': count
                                                               })

            # Always save the screenshot if it's available
            if update_handler is not None:
                if update_handler.screenshot:
                    self.datastore.save_screenshot(watch_uuid=uuid, screenshot=update_handler.screenshot)
                if update_handler.xpath_data:
                    self.datastore.save_xpath_data(watch_uuid=uuid, data=update_handler.xpath_data)

    def run(self):
        """Poll the queue and check watches until ``app.config.exit`` is set."""
        from .processors import text_json_diff, restock_diff

        while not self.app.config.exit.is_set():
            try:
                queued_item_data = self.q.get(block=False)
            except queue.Empty:
                pass
            else:
                try:
                    self._process_queued_item(queued_item_data, text_json_diff, restock_diff)
                finally:
                    # Every get() is matched by exactly one task_done(), also
                    # when the item was abandoned early or processing failed.
                    self.current_uuid = None  # Done
                    self.q.task_done()

                # Give the CPU time to interrupt
                time.sleep(0.1)

            self.app.config.exit.wait(1)