2022-08-15 16:56:53 +00:00
import os
2021-05-08 01:29:41 +00:00
import threading
import queue
2021-08-12 10:05:59 +00:00
import time
2021-05-08 01:29:41 +00:00
2022-04-19 21:15:32 +00:00
from changedetectionio import content_fetcher
2022-07-23 15:15:27 +00:00
from changedetectionio . html_tools import FilterNotFoundInResponse
2022-01-02 13:11:04 +00:00
# A single update worker
#
# Requests for checking on a single site(watch) from a queue of watches
# (another process inserts watches into the queue that are time-ready for checking)
2022-09-08 07:10:04 +00:00
import logging
import sys
2022-01-02 13:11:04 +00:00
2021-05-08 01:29:41 +00:00
class update_worker(threading.Thread):
    """A single update worker thread.

    Takes ``(priority, uuid)`` items from the queue ``q`` (something else
    inserts the watches that are time-ready for checking), runs the site check
    for that watch, records the outcome through ``datastore`` and places
    notification payloads (plain dicts) on ``notification_q``.
    """

    # UUID of the watch currently being processed by this worker, None while idle
    current_uuid = None

    def __init__(self, q, notification_q, app, datastore, *args, **kwargs):
        """Store the collaborators and initialise the underlying thread.

        :param q: queue yielding ``(priority, uuid)`` tuples of watches to check
        :param notification_q: queue that receives notification payload dicts
        :param app: application object; ``app.config.exit`` (an event-like object)
            and ``app.logger`` are used by :meth:`run`
        :param datastore: object exposing ``data``, ``datastore_path``,
            ``update_watch()`` and the ``save_*()`` helpers used below
        :param args: passed through to ``threading.Thread``
        :param kwargs: passed through to ``threading.Thread``
        """
        logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
        self.q = q
        self.app = app
        self.notification_q = notification_q
        self.datastore = datastore
        super().__init__(*args, **kwargs)

    def send_content_changed_notification(self, t, watch_uuid):
        """Queue a "content changed" notification for a watch.

        Watch-level notification settings win; the application-wide settings
        are the fallback. Nothing is queued when no notification URL is
        configured at either level, or when the watch no longer exists.

        :param t: unused; kept for interface compatibility (``run`` passes the
            worker itself in this position)
        :param watch_uuid: UUID of the watch whose content changed
        :raises ValueError: when the history holds exactly one snapshot
        """
        from changedetectionio import diff
        from changedetectionio.notification import (
            default_notification_format_for_watch
        )

        watch = self.datastore.data['watching'].get(watch_uuid, False)
        if not watch:
            return

        watch_history = watch.history
        dates = list(watch_history.keys())
        # Theoretically it's possible that this could be just 1 long,
        # - In the case that the timestamp key was not unique
        if len(dates) == 1:
            raise ValueError(
                "History index had 2 or more, but only 1 date loaded, timestamps were not unique? maybe two of the same timestamps got written, needs more delay?"
            )

        app_settings = self.datastore.data['settings']['application']
        n_object = {
            'notification_urls': watch['notification_urls'] or app_settings['notification_urls'],
            'notification_title': watch['notification_title'] or app_settings['notification_title'],
            'notification_body': watch['notification_body'] or app_settings['notification_body'],
            # The watch only overrides the format when it was moved off the "use default" marker
            'notification_format': watch['notification_format']
            if watch['notification_format'] != default_notification_format_for_watch
            else app_settings['notification_format'],
        }

        # Only prepare to notify if a notification URL was found above
        if not n_object['notification_urls']:
            logging.info(">> NO Notification sent, notification_url was empty in both watch and system")
            return

        # HTML needs linebreak, but MarkDown and Text can use a linefeed
        if n_object['notification_format'] == 'HTML':
            line_feed_sep = "</br>"
        else:
            line_feed_sep = "\n"

        previous_snapshot = watch_history[dates[-2]]
        latest_snapshot = watch_history[dates[-1]]
        with open(latest_snapshot, 'rb') as f:
            snapshot_contents = f.read()

        n_object.update({
            'watch_url': watch['url'],
            'uuid': watch_uuid,
            'current_snapshot': snapshot_contents.decode('utf-8'),
            'diff': diff.render_diff(previous_snapshot, latest_snapshot, line_feed_sep=line_feed_sep),
            'diff_full': diff.render_diff(previous_snapshot, latest_snapshot, True, line_feed_sep=line_feed_sep)
        })
        logging.info(">> SENDING NOTIFICATION")
        self.notification_q.put(n_object)

    def send_filter_failure_notification(self, watch_uuid):
        """Queue a plain-text alert saying the watch's CSS/xPath filter was not found.

        Uses the watch's notification URLs, falling back to the application-wide
        ones. Nothing is queued when neither is set or the watch no longer exists.

        :param watch_uuid: UUID of the watch whose filter was missing
        """
        app_settings = self.datastore.data['settings']['application']
        threshold = app_settings.get('filter_failure_notification_threshold_attempts')
        watch = self.datastore.data['watching'].get(watch_uuid, False)
        if not watch:
            return

        # The doubled braces survive .format() as literal {watch_url}, {base_url} and
        # {watch_uuid} tokens - presumably filled in later by the notification sender.
        n_object = {'notification_title': 'Changedetection.io - Alert - CSS/xPath filter was not present in the page',
                    'notification_body': "Your configured CSS/xPath filter of '{}' for {{watch_url}} did not appear on the page after {} attempts, did the page change layout?\n\nLink: {{base_url}}/edit/{{watch_uuid}}\n\nThanks - Your omniscient changedetection.io installation :)\n".format(
                        watch['css_filter'],
                        threshold),
                    'notification_format': 'text'}

        if len(watch['notification_urls']):
            n_object['notification_urls'] = watch['notification_urls']
        elif len(app_settings['notification_urls']):
            n_object['notification_urls'] = app_settings['notification_urls']

        # Only prepare to notify if the rules above matched
        if 'notification_urls' in n_object:
            n_object.update({
                'watch_url': watch['url'],
                'uuid': watch_uuid
            })
            self.notification_q.put(n_object)
            print("Sent filter not found notification for {}".format(watch_uuid))

    def cleanup_error_artifacts(self, uuid):
        """Delete the saved error screenshot/text of a watch, if they exist.

        :param uuid: UUID of the watch; also the name of its directory under
            ``datastore.datastore_path``
        """
        # All went fine, remove error artifacts
        cleanup_files = ["last-error-screenshot.png", "last-error.txt"]
        for f in cleanup_files:
            full_path = os.path.join(self.datastore.datastore_path, uuid, f)
            if os.path.isfile(full_path):
                os.unlink(full_path)

    def run(self):
        """Thread main loop: process queued watches until the app's exit event is set."""
        from changedetectionio import fetch_site_status

        update_handler = fetch_site_status.perform_site_check(datastore=self.datastore)

        while not self.app.config.exit.is_set():
            try:
                priority, uuid = self.q.get(block=False)
            except queue.Empty:
                pass
            else:
                self._process_queue_item(update_handler, priority, uuid)
                # Give the CPU time to interrupt
                time.sleep(0.1)

            self.app.config.exit.wait(1)

    def _process_queue_item(self, update_handler, priority, uuid):
        """Check one dequeued watch and always mark the queue item as done."""
        self.current_uuid = uuid
        try:
            if uuid in self.datastore.data['watching']:
                self._check_watch(update_handler, uuid, priority)
        finally:
            # Must run on every path (including the early exits taken when the watch
            # was deleted mid-check), otherwise the queue's unfinished-task count
            # never drops and current_uuid keeps pointing at a finished watch.
            self.current_uuid = None  # Done
            self.q.task_done()

    def _check_watch(self, update_handler, uuid, priority):
        """Fetch one watch, record errors or results, and trigger notifications."""
        changed_detected = False
        contents = b''
        update_obj = {}
        process_changedetection_results = True
        print("> Processing UUID {} Priority {} URL {}".format(uuid, priority, self.datastore.data['watching'][uuid]['url']))
        now = time.time()

        try:
            changed_detected, update_obj, contents = update_handler.run(uuid)
            # Re #342
            # In Python 3, all strings are sequences of Unicode characters. There is a bytes type that holds raw bytes.
            # We then convert/.decode('utf-8') for the notification etc
            if not isinstance(contents, (bytes, bytearray)):
                raise Exception("Error - returned data from the fetch handler SHOULD be bytes")
        except PermissionError as e:
            self.app.logger.error("File permission error updating %s - %s", uuid, str(e))
            process_changedetection_results = False
        except content_fetcher.ReplyWithContentButNoText as e:
            # Totally fine, it's by choice - just continue on, nothing more to care about
            # Page had elements/content but no renderable text
            # Backend (not filters) gave zero output
            self.datastore.update_watch(uuid=uuid, update_obj={'last_error': "Got HTML content but no text found (With {} reply code).".format(e.status_code)})
            if e.screenshot:
                self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot)
            process_changedetection_results = False
        except content_fetcher.Non200ErrorCodeReceived as e:
            known_status_errors = {
                403: "Error - 403 (Access denied) received",
                404: "Error - 404 (Page not found) received",
                500: "Error - 500 (Internal server Error) received",
            }
            if e.status_code in known_status_errors:
                err_text = known_status_errors[e.status_code]
            else:
                err_text = "Error - Request returned a HTTP error code {}".format(str(e.status_code))

            if e.screenshot:
                self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot, as_error=True)
            if e.xpath_data:
                self.datastore.save_xpath_data(watch_uuid=uuid, data=e.xpath_data, as_error=True)
            if e.page_text:
                self.datastore.save_error_text(watch_uuid=uuid, contents=e.page_text)

            self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
                                                               # So that we get a trigger when the content is added again
                                                               'previous_md5': ''})
            process_changedetection_results = False
        except FilterNotFoundInResponse as e:
            # The watch entry could have been removed while the fetch was running
            if not self.datastore.data['watching'].get(uuid):
                return

            err_text = "Warning, filter '{}' not found".format(str(e))
            self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
                                                               # So that we get a trigger when the content is added again
                                                               'previous_md5': ''})

            # Only when enabled, send the notification
            if self.datastore.data['watching'][uuid].get('filter_failure_notification_send', False):
                c = self.datastore.data['watching'][uuid].get('consecutive_filter_failures', 5)
                c += 1
                # Send notification if we reached the threshold?
                threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts',
                                                                               0)
                print("Filter for {} not found, consecutive_filter_failures: {}".format(uuid, c))
                if threshold > 0 and c >= threshold:
                    if not self.datastore.data['watching'][uuid].get('notification_muted'):
                        self.send_filter_failure_notification(uuid)
                    c = 0

                self.datastore.update_watch(uuid=uuid, update_obj={'consecutive_filter_failures': c})

            process_changedetection_results = True
        except content_fetcher.EmptyReply as e:
            # Some kind of custom to-str handler in the exception handler that does this?
            err_text = "EmptyReply - try increasing 'Wait seconds before extracting text', Status Code {}".format(e.status_code)
            self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
                                                               'last_check_status': e.status_code})
            # Nothing was fetched, so there is no snapshot/result worth recording
            process_changedetection_results = False
        except content_fetcher.ScreenshotUnavailable as e:
            err_text = "Screenshot unavailable, page did not render fully in the expected time - try increasing 'Wait seconds before extracting text'"
            self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
                                                               'last_check_status': e.status_code})
            process_changedetection_results = False
        except content_fetcher.JSActionExceptions as e:
            err_text = "Error running JS Actions - Page request - " + e.message
            if e.screenshot:
                self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot, as_error=True)
            self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
                                                               'last_check_status': e.status_code})
            # Nothing was fetched, so there is no snapshot/result worth recording
            process_changedetection_results = False
        except content_fetcher.PageUnloadable as e:
            err_text = "Page request from server didnt respond correctly"
            if e.message:
                err_text = "{} - {}".format(err_text, e.message)
            if e.screenshot:
                self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot, as_error=True)
            self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
                                                               'last_check_status': e.status_code})
            # Nothing was fetched, so there is no snapshot/result worth recording
            process_changedetection_results = False
        except Exception as e:
            self.app.logger.error("Exception reached processing watch UUID: %s - %s", uuid, str(e))
            self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)})
            # Other serious error
            process_changedetection_results = False
        else:
            # Crash protection, the watch entry could have been removed by this point (during a slow chrome fetch etc)
            if not self.datastore.data['watching'].get(uuid):
                return

            # Mark that we never had any failures
            if not self.datastore.data['watching'][uuid].get('ignore_status_codes'):
                update_obj['consecutive_filter_failures'] = 0

            self.cleanup_error_artifacts(uuid)

        # Different exceptions mean that we may or may not want to bump the snapshot, trigger notifications etc
        if process_changedetection_results:
            try:
                watch = self.datastore.data['watching'][uuid]

                # For the FIRST time we check a site, or a change detected, save the snapshot.
                if changed_detected or not watch['last_checked']:
                    watch.save_history_text(contents=contents, timestamp=str(round(time.time())))

                self.datastore.update_watch(uuid=uuid, update_obj=update_obj)

                # A change was detected
                if changed_detected:
                    print(">> Change detected in UUID {} - {}".format(uuid, watch['url']))

                    # Notifications should only trigger on the second time (first time, we gather the initial snapshot)
                    if watch.history_n >= 2:
                        if not self.datastore.data['watching'][uuid].get('notification_muted'):
                            self.send_content_changed_notification(self, watch_uuid=uuid)

            except Exception as e:
                # Catch everything possible here, so that if a worker crashes, we don't lose it until restart!
                print("!!!! Exception in update_worker !!!\n", e)
                self.app.logger.error("Exception reached processing watch UUID: %s - %s", uuid, str(e))
                self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)})

        # Always record that we at least tried
        self.datastore.update_watch(uuid=uuid, update_obj={'fetch_time': round(time.time() - now, 3),
                                                           'last_checked': round(time.time())})

        # Always save the screenshot if it's available
        if update_handler.screenshot:
            self.datastore.save_screenshot(watch_uuid=uuid, screenshot=update_handler.screenshot)
        if update_handler.xpath_data:
            self.datastore.save_xpath_data(watch_uuid=uuid, data=update_handler.xpath_data)