Mirror of https://github.com/dgtlmoon/changedetection.io
socketio-tweaks
parent f7695f59d3
commit 3d61ce8df7
@@ -406,50 +406,39 @@ def cleanup_error_artifacts(uuid, datastore):

 async def send_content_changed_notification(watch_uuid, notification_q, datastore):
-    """Helper function to queue notifications (kept sync for now)"""
-    # Note: This uses the original sync notification logic since notifications
-    # are handled by a separate thread. Could be made async later if needed.
+    """Helper function to queue notifications using the new notification service"""
     try:
         # Import here to avoid circular imports
-        from changedetectionio.update_worker import update_worker
+        from changedetectionio.notification_service import create_notification_service

-        # Create temporary worker instance just for notification methods
-        temp_worker = update_worker(None, notification_q, None, datastore)
-        temp_worker.datastore = datastore
-        temp_worker.notification_q = notification_q
+        # Create notification service instance
+        notification_service = create_notification_service(datastore, notification_q)

-        temp_worker.send_content_changed_notification(watch_uuid)
+        notification_service.send_content_changed_notification(watch_uuid)
     except Exception as e:
         logger.error(f"Error sending notification for {watch_uuid}: {e}")


 async def send_filter_failure_notification(watch_uuid, notification_q, datastore):
-    """Helper function to send filter failure notifications"""
+    """Helper function to send filter failure notifications using the new notification service"""
     try:
         # Import here to avoid circular imports
-        from changedetectionio.update_worker import update_worker
+        from changedetectionio.notification_service import create_notification_service

-        # Create temporary worker instance just for notification methods
-        temp_worker = update_worker(None, notification_q, None, datastore)
-        temp_worker.datastore = datastore
-        temp_worker.notification_q = notification_q
+        # Create notification service instance
+        notification_service = create_notification_service(datastore, notification_q)

-        temp_worker.send_filter_failure_notification(watch_uuid)
+        notification_service.send_filter_failure_notification(watch_uuid)
     except Exception as e:
         logger.error(f"Error sending filter failure notification for {watch_uuid}: {e}")


 async def send_step_failure_notification(watch_uuid, step_n, notification_q, datastore):
-    """Helper function to send step failure notifications"""
+    """Helper function to send step failure notifications using the new notification service"""
     try:
         # Import here to avoid circular imports
-        from changedetectionio.update_worker import update_worker
+        from changedetectionio.notification_service import create_notification_service

-        # Create temporary worker instance just for notification methods
-        temp_worker = update_worker(None, notification_q, None, datastore)
-        temp_worker.datastore = datastore
-        temp_worker.notification_q = notification_q
+        # Create notification service instance
+        notification_service = create_notification_service(datastore, notification_q)

-        temp_worker.send_step_failure_notification(watch_uuid, step_n)
+        notification_service.send_step_failure_notification(watch_uuid, step_n)
     except Exception as e:
         logger.error(f"Error sending step failure notification for {watch_uuid}: {e}")
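A minimal usage sketch (not from the commit): the refactored helpers are coroutines, so callers await them. The stub datastore and UUID below are hypothetical, and an unknown UUID simply makes the underlying service return without queueing anything.

import asyncio
import queue

class StubDatastore:
    # Illustrative stand-in: just enough shape for the watch lookup to miss cleanly.
    data = {'watching': {}}

async def demo(send_content_changed_notification):
    # Pass in the helper defined above (its import path depends on which module it lives in).
    notification_q = queue.Queue()
    await send_content_changed_notification('not-a-real-uuid', notification_q, StubDatastore())

# asyncio.run(demo(send_content_changed_notification))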

@@ -694,7 +694,6 @@ def notification_runner():

 # Threaded runner, look for new watches to feed into the Queue.
 def ticker_thread_check_time_launch_checks():
     import random
-    from changedetectionio import update_worker
     proxy_last_called_time = {}
     last_health_check = 0

@@ -0,0 +1,246 @@
#!/usr/bin/env python3

"""
Notification Service Module
Extracted from update_worker.py to provide standalone notification functionality
for both sync and async workers
"""

import time
from loguru import logger


class NotificationService:
    """
    Standalone notification service that handles all notification functionality
    previously embedded in the update_worker class
    """

    def __init__(self, datastore, notification_q):
        self.datastore = datastore
        self.notification_q = notification_q

    def queue_notification_for_watch(self, n_object, watch):
        """
        Queue a notification for a watch with full diff rendering and template variables
        """
        from changedetectionio import diff
        from changedetectionio.notification import default_notification_format_for_watch

        dates = []
        trigger_text = ''

        now = time.time()

        if watch:
            watch_history = watch.history
            dates = list(watch_history.keys())
            trigger_text = watch.get('trigger_text', [])

        # Add text that was triggered
        if len(dates):
            snapshot_contents = watch.get_history_snapshot(dates[-1])
        else:
            snapshot_contents = "No snapshot/history available, the watch should fetch at least once."

        # If we ended up here with "System default"
        if n_object.get('notification_format') == default_notification_format_for_watch:
            n_object['notification_format'] = self.datastore.data['settings']['application'].get('notification_format')

        html_colour_enable = False
        # HTML needs linebreak, but Markdown and Text can use a linefeed
        if n_object.get('notification_format') == 'HTML':
            line_feed_sep = "<br>"
            # Snapshot will be plaintext on the disk, convert to some kind of HTML
            snapshot_contents = snapshot_contents.replace('\n', line_feed_sep)
        elif n_object.get('notification_format') == 'HTML Color':
            line_feed_sep = "<br>"
            # Snapshot will be plaintext on the disk, convert to some kind of HTML
            snapshot_contents = snapshot_contents.replace('\n', line_feed_sep)
            html_colour_enable = True
        else:
            line_feed_sep = "\n"

        triggered_text = ''
        if len(trigger_text):
            from . import html_tools
            triggered_text = html_tools.get_triggered_text(content=snapshot_contents, trigger_text=trigger_text)
            if triggered_text:
                triggered_text = line_feed_sep.join(triggered_text)

        # Could be called as a 'test notification' with only 1 snapshot available
        prev_snapshot = "Example text: example test\nExample text: change detection is cool\nExample text: some more examples\n"
        current_snapshot = "Example text: example test\nExample text: change detection is fantastic\nExample text: even more examples\nExample text: a lot more examples"

        if len(dates) > 1:
            prev_snapshot = watch.get_history_snapshot(dates[-2])
            current_snapshot = watch.get_history_snapshot(dates[-1])

        n_object.update({
            'current_snapshot': snapshot_contents,
            'diff': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=line_feed_sep, html_colour=html_colour_enable),
            'diff_added': diff.render_diff(prev_snapshot, current_snapshot, include_removed=False, line_feed_sep=line_feed_sep),
            'diff_full': diff.render_diff(prev_snapshot, current_snapshot, include_equal=True, line_feed_sep=line_feed_sep, html_colour=html_colour_enable),
            'diff_patch': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=line_feed_sep, patch_format=True),
            'diff_removed': diff.render_diff(prev_snapshot, current_snapshot, include_added=False, line_feed_sep=line_feed_sep),
            'notification_timestamp': now,
            'screenshot': watch.get_screenshot() if watch and watch.get('notification_screenshot') else None,
            'triggered_text': triggered_text,
            'uuid': watch.get('uuid') if watch else None,
            'watch_url': watch.get('url') if watch else None,
        })

        if watch:
            n_object.update(watch.extra_notification_token_values())

        logger.trace(f"Main rendered notification placeholders (diff_added etc) calculated in {time.time()-now:.3f}s")
        logger.debug("Queued notification for sending")
        self.notification_q.put(n_object)

    def _check_cascading_vars(self, var_name, watch):
        """
        Check notification variables in cascading priority:
        Individual watch settings > Tag settings > Global settings
        """
        from changedetectionio.notification import (
            default_notification_format_for_watch,
            default_notification_body,
            default_notification_title
        )

        # Would be better if this was some kind of Object where Watch can reference the parent datastore etc
        v = watch.get(var_name)
        if v and not watch.get('notification_muted'):
            if var_name == 'notification_format' and v == default_notification_format_for_watch:
                return self.datastore.data['settings']['application'].get('notification_format')

            return v

        tags = self.datastore.get_all_tags_for_watch(uuid=watch.get('uuid'))
        if tags:
            for tag_uuid, tag in tags.items():
                v = tag.get(var_name)
                if v and not tag.get('notification_muted'):
                    return v

        if self.datastore.data['settings']['application'].get(var_name):
            return self.datastore.data['settings']['application'].get(var_name)

        # Otherwise could be defaults
        if var_name == 'notification_format':
            return default_notification_format_for_watch
        if var_name == 'notification_body':
            return default_notification_body
        if var_name == 'notification_title':
            return default_notification_title

        return None

    def send_content_changed_notification(self, watch_uuid):
        """
        Send notification when content changes are detected
        """
        n_object = {}
        watch = self.datastore.data['watching'].get(watch_uuid)
        if not watch:
            return

        watch_history = watch.history
        dates = list(watch_history.keys())
        # Theoretically it's possible that this could be just 1 long,
        # - In the case that the timestamp key was not unique
        if len(dates) == 1:
            raise ValueError(
                "History index had 2 or more, but only 1 date loaded, timestamps were not unique? maybe two of the same timestamps got written, needs more delay?"
            )

        # Should be a better parent getter in the model object

        # Prefer - Individual watch settings > Tag settings > Global settings (in that order)
        n_object['notification_urls'] = self._check_cascading_vars('notification_urls', watch)
        n_object['notification_title'] = self._check_cascading_vars('notification_title', watch)
        n_object['notification_body'] = self._check_cascading_vars('notification_body', watch)
        n_object['notification_format'] = self._check_cascading_vars('notification_format', watch)

        # (Individual watch) Only prepare to notify if the rules above matched
        queued = False
        if n_object and n_object.get('notification_urls'):
            queued = True

            count = watch.get('notification_alert_count', 0) + 1
            self.datastore.update_watch(uuid=watch_uuid, update_obj={'notification_alert_count': count})

            self.queue_notification_for_watch(n_object=n_object, watch=watch)

        return queued

    def send_filter_failure_notification(self, watch_uuid):
        """
        Send notification when CSS/XPath filters fail consecutively
        """
        threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts')
        watch = self.datastore.data['watching'].get(watch_uuid)
        if not watch:
            return

        n_object = {'notification_title': 'Changedetection.io - Alert - CSS/xPath filter was not present in the page',
                    'notification_body': "Your configured CSS/xPath filters of '{}' for {{{{watch_url}}}} did not appear on the page after {} attempts, did the page change layout?\n\nLink: {{{{base_url}}}}/edit/{{{{watch_uuid}}}}\n\nThanks - Your omniscient changedetection.io installation :)\n".format(
                        ", ".join(watch['include_filters']),
                        threshold),
                    'notification_format': 'text'}

        if len(watch['notification_urls']):
            n_object['notification_urls'] = watch['notification_urls']

        elif len(self.datastore.data['settings']['application']['notification_urls']):
            n_object['notification_urls'] = self.datastore.data['settings']['application']['notification_urls']

        # Only prepare to notify if the rules above matched
        if 'notification_urls' in n_object:
            n_object.update({
                'watch_url': watch['url'],
                'uuid': watch_uuid,
                'screenshot': None
            })
            self.notification_q.put(n_object)
            logger.debug(f"Sent filter not found notification for {watch_uuid}")
        else:
            logger.debug(f"NOT sending filter not found notification for {watch_uuid} - no notification URLs")

    def send_step_failure_notification(self, watch_uuid, step_n):
        """
        Send notification when browser steps fail consecutively
        """
        watch = self.datastore.data['watching'].get(watch_uuid, False)
        if not watch:
            return
        threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts')
        n_object = {'notification_title': "Changedetection.io - Alert - Browser step at position {} could not be run".format(step_n+1),
                    'notification_body': "Your configured browser step at position {} for {{{{watch_url}}}} "
                                         "did not appear on the page after {} attempts, did the page change layout? "
                                         "Does it need a delay added?\n\nLink: {{{{base_url}}}}/edit/{{{{watch_uuid}}}}\n\n"
                                         "Thanks - Your omniscient changedetection.io installation :)\n".format(step_n+1, threshold),
                    'notification_format': 'text'}

        if len(watch['notification_urls']):
            n_object['notification_urls'] = watch['notification_urls']

        elif len(self.datastore.data['settings']['application']['notification_urls']):
            n_object['notification_urls'] = self.datastore.data['settings']['application']['notification_urls']

        # Only prepare to notify if the rules above matched
        if 'notification_urls' in n_object:
            n_object.update({
                'watch_url': watch['url'],
                'uuid': watch_uuid
            })
            self.notification_q.put(n_object)
            logger.error(f"Sent step not found notification for {watch_uuid}")


# Convenience function for creating notification service instances
def create_notification_service(datastore, notification_q):
    """
    Factory function to create a NotificationService instance
    """
    return NotificationService(datastore, notification_q)
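The _check_cascading_vars() method above encodes the precedence rule the diff repeats in several comments: individual watch settings, then tag settings, then global settings, then the built-in defaults. A self-contained restatement of that rule (a sketch only; plain dicts stand in for the real Watch/Tag models, and mute handling is omitted):

def resolve_setting(var_name, watch, tags, app_settings, defaults):
    if watch.get(var_name):          # 1. the individual watch wins
        return watch[var_name]
    for tag in tags:                 # 2. then any tag attached to the watch
        if tag.get(var_name):
            return tag[var_name]
    if app_settings.get(var_name):   # 3. then the global application setting
        return app_settings[var_name]
    return defaults.get(var_name)    # 4. finally the built-in default

print(resolve_setting('notification_title',
                      watch={},      # nothing set on the watch itself
                      tags=[{'notification_title': 'Tag-level title'}],
                      app_settings={'notification_title': 'Global title'},
                      defaults={'notification_title': 'Built-in default'}))
# -> 'Tag-level title'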
@@ -1,608 +0,0 @@
from .processors.exceptions import ProcessorException
import changedetectionio.content_fetchers.exceptions as content_fetchers_exceptions
from changedetectionio.processors.text_json_diff.processor import FilterNotFoundInResponse
from changedetectionio import html_tools
from changedetectionio.flask_app import watch_check_update

import importlib
import os
import queue
import threading
import time

# A single update worker
#
# Requests for checking on a single site(watch) from a queue of watches
# (another process inserts watches into the queue that are time-ready for checking)

from loguru import logger

class update_worker(threading.Thread):
    current_uuid = None

    def __init__(self, q, notification_q, app, datastore, *args, **kwargs):
        self.q = q
        self.app = app
        self.notification_q = notification_q
        self.datastore = datastore
        super().__init__(*args, **kwargs)

    def queue_notification_for_watch(self, notification_q, n_object, watch):
        from changedetectionio import diff
        from changedetectionio.notification import default_notification_format_for_watch

        dates = []
        trigger_text = ''

        now = time.time()

        if watch:
            watch_history = watch.history
            dates = list(watch_history.keys())
            trigger_text = watch.get('trigger_text', [])

        # Add text that was triggered
        if len(dates):
            snapshot_contents = watch.get_history_snapshot(dates[-1])
        else:
            snapshot_contents = "No snapshot/history available, the watch should fetch at least once."

        # If we ended up here with "System default"
        if n_object.get('notification_format') == default_notification_format_for_watch:
            n_object['notification_format'] = self.datastore.data['settings']['application'].get('notification_format')

        html_colour_enable = False
        # HTML needs linebreak, but Markdown and Text can use a linefeed
        if n_object.get('notification_format') == 'HTML':
            line_feed_sep = "<br>"
            # Snapshot will be plaintext on the disk, convert to some kind of HTML
            snapshot_contents = snapshot_contents.replace('\n', line_feed_sep)
        elif n_object.get('notification_format') == 'HTML Color':
            line_feed_sep = "<br>"
            # Snapshot will be plaintext on the disk, convert to some kind of HTML
            snapshot_contents = snapshot_contents.replace('\n', line_feed_sep)
            html_colour_enable = True
        else:
            line_feed_sep = "\n"

        triggered_text = ''
        if len(trigger_text):
            from . import html_tools
            triggered_text = html_tools.get_triggered_text(content=snapshot_contents, trigger_text=trigger_text)
            if triggered_text:
                triggered_text = line_feed_sep.join(triggered_text)

        # Could be called as a 'test notification' with only 1 snapshot available
        prev_snapshot = "Example text: example test\nExample text: change detection is cool\nExample text: some more examples\n"
        current_snapshot = "Example text: example test\nExample text: change detection is fantastic\nExample text: even more examples\nExample text: a lot more examples"

        if len(dates) > 1:
            prev_snapshot = watch.get_history_snapshot(dates[-2])
            current_snapshot = watch.get_history_snapshot(dates[-1])

        n_object.update({
            'current_snapshot': snapshot_contents,
            'diff': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=line_feed_sep, html_colour=html_colour_enable),
            'diff_added': diff.render_diff(prev_snapshot, current_snapshot, include_removed=False, line_feed_sep=line_feed_sep),
            'diff_full': diff.render_diff(prev_snapshot, current_snapshot, include_equal=True, line_feed_sep=line_feed_sep, html_colour=html_colour_enable),
            'diff_patch': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=line_feed_sep, patch_format=True),
            'diff_removed': diff.render_diff(prev_snapshot, current_snapshot, include_added=False, line_feed_sep=line_feed_sep),
            'notification_timestamp': now,
            'screenshot': watch.get_screenshot() if watch and watch.get('notification_screenshot') else None,
            'triggered_text': triggered_text,
            'uuid': watch.get('uuid') if watch else None,
            'watch_url': watch.get('url') if watch else None,
        })

        if watch:
            n_object.update(watch.extra_notification_token_values())

        logger.trace(f"Main rendered notification placeholders (diff_added etc) calculated in {time.time()-now:.3f}s")
        logger.debug("Queued notification for sending")
        notification_q.put(n_object)

    # Prefer - Individual watch settings > Tag settings > Global settings (in that order)
    def _check_cascading_vars(self, var_name, watch):

        from changedetectionio.notification import (
            default_notification_format_for_watch,
            default_notification_body,
            default_notification_title
        )

        # Would be better if this was some kind of Object where Watch can reference the parent datastore etc
        v = watch.get(var_name)
        if v and not watch.get('notification_muted'):
            if var_name == 'notification_format' and v == default_notification_format_for_watch:
                return self.datastore.data['settings']['application'].get('notification_format')

            return v

        tags = self.datastore.get_all_tags_for_watch(uuid=watch.get('uuid'))
        if tags:
            for tag_uuid, tag in tags.items():
                v = tag.get(var_name)
                if v and not tag.get('notification_muted'):
                    return v

        if self.datastore.data['settings']['application'].get(var_name):
            return self.datastore.data['settings']['application'].get(var_name)

        # Otherwise could be defaults
        if var_name == 'notification_format':
            return default_notification_format_for_watch
        if var_name == 'notification_body':
            return default_notification_body
        if var_name == 'notification_title':
            return default_notification_title

        return None

    def send_content_changed_notification(self, watch_uuid):

        n_object = {}
        watch = self.datastore.data['watching'].get(watch_uuid)
        if not watch:
            return

        watch_history = watch.history
        dates = list(watch_history.keys())
        # Theoretically it's possible that this could be just 1 long,
        # - In the case that the timestamp key was not unique
        if len(dates) == 1:
            raise ValueError(
                "History index had 2 or more, but only 1 date loaded, timestamps were not unique? maybe two of the same timestamps got written, needs more delay?"
            )

        # Should be a better parent getter in the model object

        # Prefer - Individual watch settings > Tag settings > Global settings (in that order)
        n_object['notification_urls'] = self._check_cascading_vars('notification_urls', watch)
        n_object['notification_title'] = self._check_cascading_vars('notification_title', watch)
        n_object['notification_body'] = self._check_cascading_vars('notification_body', watch)
        n_object['notification_format'] = self._check_cascading_vars('notification_format', watch)

        # (Individual watch) Only prepare to notify if the rules above matched
        queued = False
        if n_object and n_object.get('notification_urls'):
            queued = True

            count = watch.get('notification_alert_count', 0) + 1
            self.datastore.update_watch(uuid=watch_uuid, update_obj={'notification_alert_count': count})

            self.queue_notification_for_watch(notification_q=self.notification_q, n_object=n_object, watch=watch)

        return queued

    def send_filter_failure_notification(self, watch_uuid):

        threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts')
        watch = self.datastore.data['watching'].get(watch_uuid)
        if not watch:
            return

        n_object = {'notification_title': 'Changedetection.io - Alert - CSS/xPath filter was not present in the page',
                    'notification_body': "Your configured CSS/xPath filters of '{}' for {{{{watch_url}}}} did not appear on the page after {} attempts, did the page change layout?\n\nLink: {{{{base_url}}}}/edit/{{{{watch_uuid}}}}\n\nThanks - Your omniscient changedetection.io installation :)\n".format(
                        ", ".join(watch['include_filters']),
                        threshold),
                    'notification_format': 'text'}

        if len(watch['notification_urls']):
            n_object['notification_urls'] = watch['notification_urls']

        elif len(self.datastore.data['settings']['application']['notification_urls']):
            n_object['notification_urls'] = self.datastore.data['settings']['application']['notification_urls']

        # Only prepare to notify if the rules above matched
        if 'notification_urls' in n_object:
            n_object.update({
                'watch_url': watch['url'],
                'uuid': watch_uuid,
                'screenshot': None
            })
            self.notification_q.put(n_object)
            logger.debug(f"Sent filter not found notification for {watch_uuid}")
        else:
            logger.debug(f"NOT sending filter not found notification for {watch_uuid} - no notification URLs")

    def send_step_failure_notification(self, watch_uuid, step_n):
        watch = self.datastore.data['watching'].get(watch_uuid, False)
        if not watch:
            return
        threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts')
        n_object = {'notification_title': "Changedetection.io - Alert - Browser step at position {} could not be run".format(step_n+1),
                    'notification_body': "Your configured browser step at position {} for {{{{watch_url}}}} "
                                         "did not appear on the page after {} attempts, did the page change layout? "
                                         "Does it need a delay added?\n\nLink: {{{{base_url}}}}/edit/{{{{watch_uuid}}}}\n\n"
                                         "Thanks - Your omniscient changedetection.io installation :)\n".format(step_n+1, threshold),
                    'notification_format': 'text'}

        if len(watch['notification_urls']):
            n_object['notification_urls'] = watch['notification_urls']

        elif len(self.datastore.data['settings']['application']['notification_urls']):
            n_object['notification_urls'] = self.datastore.data['settings']['application']['notification_urls']

        # Only prepare to notify if the rules above matched
        if 'notification_urls' in n_object:
            n_object.update({
                'watch_url': watch['url'],
                'uuid': watch_uuid
            })
            self.notification_q.put(n_object)
            logger.error(f"Sent step not found notification for {watch_uuid}")


    def cleanup_error_artifacts(self, uuid):
        # All went fine, remove error artifacts
        cleanup_files = ["last-error-screenshot.png", "last-error.txt"]
        for f in cleanup_files:
            full_path = os.path.join(self.datastore.datastore_path, uuid, f)
            if os.path.isfile(full_path):
                os.unlink(full_path)

    def run(self):

        while not self.app.config.exit.is_set():
            update_handler = None
            watch = None

            try:
                queued_item_data = self.q.get(block=False)
            except queue.Empty:
                pass
            else:
                uuid = queued_item_data.item.get('uuid')
                fetch_start_time = round(time.time())  # Also used for a unique history key for now
                self.current_uuid = uuid
                if uuid in list(self.datastore.data['watching'].keys()) and self.datastore.data['watching'][uuid].get('url'):
                    changed_detected = False
                    contents = b''
                    process_changedetection_results = True
                    update_obj = {}

                    # Clear last errors (move to preflight func?)
                    self.datastore.data['watching'][uuid]['browser_steps_last_error_step'] = None
                    self.datastore.data['watching'][uuid]['last_checked'] = fetch_start_time

                    watch = self.datastore.data['watching'].get(uuid)

                    logger.info(f"Processing watch UUID {uuid} Priority {queued_item_data.priority} URL {watch['url']}")

                    try:
                        watch_check_update.send(watch_uuid=uuid)

                        # Processor is what we are using for detecting the "Change"
                        processor = watch.get('processor', 'text_json_diff')

                        # Init a new 'difference_detection_processor', first look in processors
                        processor_module_name = f"changedetectionio.processors.{processor}.processor"
                        try:
                            processor_module = importlib.import_module(processor_module_name)
                        except ModuleNotFoundError as e:
                            print(f"Processor module '{processor}' not found.")
                            raise e

                        update_handler = processor_module.perform_site_check(datastore=self.datastore,
                                                                             watch_uuid=uuid
                                                                             )

                        update_handler.call_browser()

                        changed_detected, update_obj, contents = update_handler.run_changedetection(watch=watch)

                        # Re #342
                        # In Python 3, all strings are sequences of Unicode characters. There is a bytes type that holds raw bytes.
                        # We then convert/.decode('utf-8') for the notification etc
                        # if not isinstance(contents, (bytes, bytearray)):
                        #     raise Exception("Error - returned data from the fetch handler SHOULD be bytes")
                    except PermissionError as e:
                        logger.critical(f"File permission error updating file, watch: {uuid}")
                        logger.critical(str(e))
                        process_changedetection_results = False

                    # A generic other-exception thrown by processors
                    except ProcessorException as e:
                        if e.screenshot:
                            watch.save_screenshot(screenshot=e.screenshot)
                        if e.xpath_data:
                            watch.save_xpath_data(data=e.xpath_data)
                        self.datastore.update_watch(uuid=uuid, update_obj={'last_error': e.message})
                        process_changedetection_results = False

                    except content_fetchers_exceptions.ReplyWithContentButNoText as e:
                        # Totally fine, it's by choice - just continue on, nothing more to care about
                        # Page had elements/content but no renderable text
                        # Backend (not filters) gave zero output
                        extra_help = ""
                        if e.has_filters:
                            # Maybe it contains an image? offer a more helpful link
                            has_img = html_tools.include_filters(include_filters='img',
                                                                 html_content=e.html_content)
                            if has_img:
                                extra_help = ", it's possible that the filters you have give an empty result or contain only an image."
                            else:
                                extra_help = ", it's possible that the filters were found, but contained no usable text."

                        self.datastore.update_watch(uuid=uuid, update_obj={
                            'last_error': f"Got HTML content but no text found (With {e.status_code} reply code){extra_help}"
                        })

                        if e.screenshot:
                            watch.save_screenshot(screenshot=e.screenshot, as_error=True)

                        if e.xpath_data:
                            watch.save_xpath_data(data=e.xpath_data)

                        process_changedetection_results = False

                    except content_fetchers_exceptions.Non200ErrorCodeReceived as e:
                        if e.status_code == 403:
                            err_text = "Error - 403 (Access denied) received"
                        elif e.status_code == 404:
                            err_text = "Error - 404 (Page not found) received"
                        elif e.status_code == 407:
                            err_text = "Error - 407 (Proxy authentication required) received, did you need a username and password for the proxy?"
                        elif e.status_code == 500:
                            err_text = "Error - 500 (Internal server error) received from the web site"
                        else:
                            extra = ' (Access denied or blocked)' if str(e.status_code).startswith('4') else ''
                            err_text = f"Error - Request returned a HTTP error code {e.status_code}{extra}"

                        if e.screenshot:
                            watch.save_screenshot(screenshot=e.screenshot, as_error=True)
                        if e.xpath_data:
                            watch.save_xpath_data(data=e.xpath_data, as_error=True)
                        if e.page_text:
                            watch.save_error_text(contents=e.page_text)

                        self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text})
                        process_changedetection_results = False

                    except FilterNotFoundInResponse as e:
                        if not self.datastore.data['watching'].get(uuid):
                            continue

                        err_text = "Warning, no filters were found, no change detection ran - Did the page change layout? update your Visual Filter if necessary."
                        self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text})

                        # Filter wasn't found, but we should still update the visual selector so that they can have a chance to set it up again
                        if e.screenshot:
                            watch.save_screenshot(screenshot=e.screenshot)

                        if e.xpath_data:
                            watch.save_xpath_data(data=e.xpath_data)

                        # Only when enabled, send the notification
                        if watch.get('filter_failure_notification_send', False):
                            c = watch.get('consecutive_filter_failures', 0)
                            c += 1
                            # Send notification if we reached the threshold?
                            threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts', 0)
                            logger.debug(f"Filter for {uuid} not found, consecutive_filter_failures: {c} of threshold {threshold}")
                            if c >= threshold:
                                if not watch.get('notification_muted'):
                                    logger.debug(f"Sending filter failed notification for {uuid}")
                                    self.send_filter_failure_notification(uuid)
                                c = 0
                                logger.debug(f"Reset filter failure count back to zero")

                            self.datastore.update_watch(uuid=uuid, update_obj={'consecutive_filter_failures': c})
                        else:
                            logger.trace(f"{uuid} - filter_failure_notification_send not enabled, skipping")

                        process_changedetection_results = False

                    except content_fetchers_exceptions.checksumFromPreviousCheckWasTheSame as e:
                        # Yes fine, so nothing to do, don't continue to process.
                        process_changedetection_results = False
                        changed_detected = False
                    except content_fetchers_exceptions.BrowserConnectError as e:
                        self.datastore.update_watch(uuid=uuid,
                                                    update_obj={'last_error': e.msg
                                                                }
                                                    )
                        process_changedetection_results = False
                    except content_fetchers_exceptions.BrowserFetchTimedOut as e:
                        self.datastore.update_watch(uuid=uuid,
                                                    update_obj={'last_error': e.msg
                                                                }
                                                    )
                        process_changedetection_results = False
                    except content_fetchers_exceptions.BrowserStepsStepException as e:

                        if not self.datastore.data['watching'].get(uuid):
                            continue

                        error_step = e.step_n + 1
                        from playwright._impl._errors import TimeoutError, Error

                        # Generally enough info for TimeoutError (couldn't locate the element after default seconds)
                        err_text = f"Browser step at position {error_step} could not run, check the watch, add a delay if necessary, view Browser Steps to see screenshot at that step."

                        if e.original_e.name == "TimeoutError":
                            # Just the first line is enough, the rest is the stack trace
                            err_text += " Could not find the target."
                        else:
                            # Other Error, more info is good.
                            err_text += " " + str(e.original_e).splitlines()[0]

                        logger.debug(f"BrowserSteps exception at step {error_step} {str(e.original_e)}")

                        self.datastore.update_watch(uuid=uuid,
                                                    update_obj={'last_error': err_text,
                                                                'browser_steps_last_error_step': error_step
                                                                }
                                                    )

                        if watch.get('filter_failure_notification_send', False):
                            c = watch.get('consecutive_filter_failures', 0)
                            c += 1
                            # Send notification if we reached the threshold?
                            threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts',
                                                                                           0)
                            logger.error(f"Step for {uuid} not found, consecutive_filter_failures: {c}")
                            if threshold > 0 and c >= threshold:
                                if not watch.get('notification_muted'):
                                    self.send_step_failure_notification(watch_uuid=uuid, step_n=e.step_n)
                                c = 0

                            self.datastore.update_watch(uuid=uuid, update_obj={'consecutive_filter_failures': c})

                        process_changedetection_results = False

                    except content_fetchers_exceptions.EmptyReply as e:
                        # Some kind of custom to-str handler in the exception handler that does this?
                        err_text = "EmptyReply - try increasing 'Wait seconds before extracting text', Status Code {}".format(e.status_code)
                        self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
                                                                           'last_check_status': e.status_code})
                        process_changedetection_results = False
                    except content_fetchers_exceptions.ScreenshotUnavailable as e:
                        err_text = "Screenshot unavailable, page did not render fully in the expected time or page was too long - try increasing 'Wait seconds before extracting text'"
                        self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
                                                                           'last_check_status': e.status_code})
                        process_changedetection_results = False
                    except content_fetchers_exceptions.JSActionExceptions as e:
                        err_text = "Error running JS Actions - Page request - "+e.message
                        if e.screenshot:
                            watch.save_screenshot(screenshot=e.screenshot, as_error=True)
                        self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
                                                                           'last_check_status': e.status_code})
                        process_changedetection_results = False
                    except content_fetchers_exceptions.PageUnloadable as e:
                        err_text = "Page request from server didn't respond correctly"
                        if e.message:
                            err_text = "{} - {}".format(err_text, e.message)

                        if e.screenshot:
                            watch.save_screenshot(screenshot=e.screenshot, as_error=True)

                        self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
                                                                           'last_check_status': e.status_code,
                                                                           'has_ldjson_price_data': None})
                        process_changedetection_results = False
                    except content_fetchers_exceptions.BrowserStepsInUnsupportedFetcher as e:
                        err_text = "This watch has Browser Steps configured and so it cannot run with the 'Basic fast Plaintext/HTTP Client', either remove the Browser Steps or select a Chrome fetcher."
                        self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text})
                        process_changedetection_results = False
                        logger.error(f"Exception (BrowserStepsInUnsupportedFetcher) reached processing watch UUID: {uuid}")

                    except Exception as e:
                        logger.error(f"Exception reached processing watch UUID: {uuid}")
                        logger.error(str(e))
                        self.datastore.update_watch(uuid=uuid, update_obj={'last_error': "Exception: " + str(e)})
                        # Other serious error
                        process_changedetection_results = False

                    else:
                        # Crash protection, the watch entry could have been removed by this point (during a slow chrome fetch etc)
                        if not self.datastore.data['watching'].get(uuid):
                            continue

                        update_obj['content-type'] = update_handler.fetcher.get_all_headers().get('content-type', '').lower()

                        # Mark that we never had any failures
                        if not watch.get('ignore_status_codes'):
                            update_obj['consecutive_filter_failures'] = 0

                        # Everything ran OK, clean off any previous error
                        update_obj['last_error'] = False

                        self.cleanup_error_artifacts(uuid)

                    if not self.datastore.data['watching'].get(uuid):
                        continue

                    # Different exceptions mean that we may or may not want to bump the snapshot, trigger notifications etc
                    if process_changedetection_results:

                        # Extract <title> as title if possible/requested.
                        if self.datastore.data['settings']['application'].get('extract_title_as_title') or watch['extract_title_as_title']:
                            if not watch['title'] or not len(watch['title']):
                                try:
                                    update_obj['title'] = html_tools.extract_element(find='title', html_content=update_handler.fetcher.content)
                                    logger.info(f"UUID: {uuid} Extract <title> updated title to '{update_obj['title']}")
                                except Exception as e:
                                    logger.warning(f"UUID: {uuid} Extract <title> as watch title was enabled, but couldn't find a <title>.")

                        try:
                            self.datastore.update_watch(uuid=uuid, update_obj=update_obj)

                            # Also save the snapshot on the first time checked, "last checked" will always be updated, so we just check history length.
                            if changed_detected or not watch.history_n:

                                if update_handler.screenshot:
                                    watch.save_screenshot(screenshot=update_handler.screenshot)

                                if update_handler.xpath_data:
                                    watch.save_xpath_data(data=update_handler.xpath_data)

                                # Small hack so that we sleep just enough to allow 1 second between history snapshots
                                # this is because history.txt indexes/keys snapshots by epoch seconds and we don't want dupe keys
                                # @also - the keys are one per second at the most (for now)
                                if watch.newest_history_key and int(fetch_start_time) == int(watch.newest_history_key):
                                    logger.warning(
                                        f"Timestamp {fetch_start_time} already exists, waiting 1 second so we have a unique key in history.txt")
                                    fetch_start_time += 1
                                    time.sleep(1)

                                watch.save_history_text(contents=contents,
                                                        timestamp=int(fetch_start_time),
                                                        snapshot_id=update_obj.get('previous_md5', 'none'))

                                empty_pages_are_a_change = self.datastore.data['settings']['application'].get('empty_pages_are_a_change', False)
                                if update_handler.fetcher.content or (not update_handler.fetcher.content and empty_pages_are_a_change):
                                    # attribute .last_changed is then based on this data
                                    watch.save_last_fetched_html(contents=update_handler.fetcher.content, timestamp=int(fetch_start_time))

                                # Notifications should only trigger on the second time (first time, we gather the initial snapshot)
                                if watch.history_n >= 2:
                                    logger.info(f"Change detected in UUID {uuid} - {watch['url']}")
                                    if not watch.get('notification_muted'):
                                        # @todo only run this if notifications exist
                                        self.send_content_changed_notification(watch_uuid=uuid)

                        except Exception as e:
                            # Catch everything possible here, so that if a worker crashes, we don't lose it until restart!
                            logger.critical("!!!! Exception in update_worker while processing process_changedetection_results !!!")
                            logger.critical(str(e))
                            self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)})

                    # Always record that we at least tried
                    count = watch.get('check_count', 0) + 1

                    # Record the 'server' header reply, can be used for actions in the future like cloudflare/akamai workarounds
                    try:
                        server_header = update_handler.fetcher.headers.get('server', '').strip().lower()[:255]
                        self.datastore.update_watch(uuid=uuid,
                                                    update_obj={'remote_server_reply': server_header}
                                                    )
                    except Exception as e:
                        pass

                    self.datastore.update_watch(uuid=uuid, update_obj={'fetch_time': round(time.time() - fetch_start_time, 3),
                                                                       'check_count': count
                                                                       })

                self.current_uuid = None  # Done
                self.q.task_done()

                # Send signal for watch check completion with the watch data
                if watch:
                    logger.info(f"Sending watch_check_update signal for UUID {watch['uuid']}")
                    watch_check_update.send(watch_uuid=watch['uuid'])

                update_handler = None
                logger.debug(f"Watch {uuid} done in {time.time()-fetch_start_time:.2f}s")

                # Give the CPU time to interrupt
                time.sleep(0.1)

            self.app.config.exit.wait(1)
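The net effect of deleting update_worker.py is visible in the first hunk of this commit: callers that previously built a throwaway update_worker thread object just to reach its notification methods now go through the extracted service. Condensed as a before/after sketch (notify_change is an illustrative wrapper, not a function from the commit):

def notify_change(datastore, notification_q, watch_uuid):
    # Before (deleted code path):
    #   temp_worker = update_worker(None, notification_q, None, datastore)
    #   temp_worker.send_content_changed_notification(watch_uuid)
    # After - the standalone service, no threading machinery required:
    from changedetectionio.notification_service import create_notification_service
    service = create_notification_service(datastore, notification_q)
    return service.send_content_changed_notification(watch_uuid)  # True if queued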
@@ -1,8 +1,8 @@
 """
 Worker management module for changedetection.io

-Handles both synchronous threaded workers and asynchronous workers,
-providing a unified interface for dynamic worker scaling.
+Handles asynchronous workers for dynamic worker scaling.
+Sync worker support has been removed in favor of async-only architecture.
 """

 import asyncio
@@ -12,7 +12,6 @@ import time
 from loguru import logger

 # Global worker state
-running_update_threads = []
 running_async_tasks = []
 async_loop = None
 async_loop_thread = None
@@ -20,7 +19,7 @@ async_loop_thread = None
 # Track currently processing UUIDs for async workers
 currently_processing_uuids = set()

-# Configuration
+# Configuration - async workers only
 USE_ASYNC_WORKERS = True

@@ -86,96 +85,51 @@ async def start_single_async_worker(worker_id, update_q, notification_q, app, datastore):
     logger.info(f"Async worker {worker_id} shutdown complete")


-def start_sync_workers(n_workers, update_q, notification_q, app, datastore):
-    """Start traditional threaded workers"""
-    global running_update_threads
-    from changedetectionio import update_worker
-
-    logger.info(f"Starting {n_workers} sync workers")
-    for _ in range(n_workers):
-        new_worker = update_worker.update_worker(update_q, notification_q, app, datastore)
-        running_update_threads.append(new_worker)
-        new_worker.start()
-
-
 def start_workers(n_workers, update_q, notification_q, app, datastore):
-    """Start workers based on configuration"""
-    if USE_ASYNC_WORKERS:
-        start_async_workers(n_workers, update_q, notification_q, app, datastore)
-    else:
-        start_sync_workers(n_workers, update_q, notification_q, app, datastore)
+    """Start async workers - sync workers are deprecated"""
+    start_async_workers(n_workers, update_q, notification_q, app, datastore)


 def add_worker(update_q, notification_q, app, datastore):
-    """Add a new worker (for dynamic scaling)"""
-    global running_async_tasks, running_update_threads
+    """Add a new async worker (for dynamic scaling)"""
+    global running_async_tasks

-    if USE_ASYNC_WORKERS:
-        if not async_loop:
-            logger.error("Async loop not running, cannot add worker")
-            return False
-
-        worker_id = len(running_async_tasks)
-        logger.info(f"Adding async worker {worker_id}")
-
-        task_future = asyncio.run_coroutine_threadsafe(
-            start_single_async_worker(worker_id, update_q, notification_q, app, datastore), async_loop
-        )
-        running_async_tasks.append(task_future)
-        return True
-    else:
-        # Add sync worker
-        from changedetectionio import update_worker
-        logger.info(f"Adding sync worker {len(running_update_threads)}")
-
-        new_worker = update_worker.update_worker(update_q, notification_q, app, datastore)
-        running_update_threads.append(new_worker)
-        new_worker.start()
-        return True
+    if not async_loop:
+        logger.error("Async loop not running, cannot add worker")
+        return False
+
+    worker_id = len(running_async_tasks)
+    logger.info(f"Adding async worker {worker_id}")
+
+    task_future = asyncio.run_coroutine_threadsafe(
+        start_single_async_worker(worker_id, update_q, notification_q, app, datastore), async_loop
+    )
+    running_async_tasks.append(task_future)
+    return True


 def remove_worker():
-    """Remove a worker (for dynamic scaling)"""
-    global running_async_tasks, running_update_threads
+    """Remove an async worker (for dynamic scaling)"""
+    global running_async_tasks

-    if USE_ASYNC_WORKERS:
-        if not running_async_tasks:
-            return False
-
-        # Cancel the last worker
-        task_future = running_async_tasks.pop()
-        task_future.cancel()
-        logger.info(f"Removed async worker, {len(running_async_tasks)} workers remaining")
-        return True
-    else:
-        if not running_update_threads:
-            return False
-
-        # Stop the last worker
-        worker = running_update_threads.pop()
-        # Note: Graceful shutdown would require adding stop mechanism to update_worker
-        logger.info(f"Removed sync worker, {len(running_update_threads)} workers remaining")
-        return True
+    if not running_async_tasks:
+        return False
+
+    # Cancel the last worker
+    task_future = running_async_tasks.pop()
+    task_future.cancel()
+    logger.info(f"Removed async worker, {len(running_async_tasks)} workers remaining")
+    return True


 def get_worker_count():
-    """Get current number of workers"""
-    if USE_ASYNC_WORKERS:
-        return len(running_async_tasks)
-    else:
-        return len(running_update_threads)
+    """Get current number of async workers"""
+    return len(running_async_tasks)


 def get_running_uuids():
-    """Get list of UUIDs currently being processed"""
-    if USE_ASYNC_WORKERS:
-        return list(currently_processing_uuids)
-    else:
-        running_uuids = []
-        for t in running_update_threads:
-            if hasattr(t, 'current_uuid') and t.current_uuid:
-                running_uuids.append(t.current_uuid)
-        return running_uuids
+    """Get list of UUIDs currently being processed by async workers"""
+    return list(currently_processing_uuids)


 def set_uuid_processing(uuid, processing=True):
@@ -195,44 +149,36 @@ def is_watch_running(watch_uuid):


 def queue_item_async_safe(update_q, item):
-    """Queue an item in a way that works with both sync and async queues"""
-    if USE_ASYNC_WORKERS and async_loop:
+    """Queue an item for async queue processing"""
+    if async_loop:
         # For async queue, schedule the put operation
         asyncio.run_coroutine_threadsafe(update_q.put(item), async_loop)
     else:
-        # For sync queue, put directly
-        update_q.put(item)
+        logger.error("Async loop not available for queueing item")


 def shutdown_workers():
-    """Shutdown all workers gracefully"""
-    global async_loop, async_loop_thread, running_async_tasks, running_update_threads
+    """Shutdown all async workers gracefully"""
+    global async_loop, async_loop_thread, running_async_tasks

-    logger.info("Shutting down workers...")
+    logger.info("Shutting down async workers...")

-    if USE_ASYNC_WORKERS:
-        # Cancel all async tasks
-        for task_future in running_async_tasks:
-            task_future.cancel()
-        running_async_tasks.clear()
+    # Cancel all async tasks
+    for task_future in running_async_tasks:
+        task_future.cancel()
+    running_async_tasks.clear()

-        # Stop the async event loop
-        if async_loop:
-            async_loop.call_soon_threadsafe(async_loop.stop)
-            async_loop = None
+    # Stop the async event loop
+    if async_loop:
+        async_loop.call_soon_threadsafe(async_loop.stop)
+        async_loop = None

-        # Wait for the async thread to finish
-        if async_loop_thread and async_loop_thread.is_alive():
-            async_loop_thread.join(timeout=5)
-            async_loop_thread = None
-    else:
-        # Stop sync workers
-        for worker in running_update_threads:
-            # Note: Would need to add proper stop mechanism to update_worker
-            pass
-        running_update_threads.clear()
+    # Wait for the async thread to finish
+    if async_loop_thread and async_loop_thread.is_alive():
+        async_loop_thread.join(timeout=5)
+        async_loop_thread = None

-    logger.info("Workers shutdown complete")
+    logger.info("Async workers shutdown complete")


 def adjust_async_worker_count(new_count, update_q=None, notification_q=None, app=None, datastore=None):
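queue_item_async_safe() and the scaling functions above all lean on asyncio.run_coroutine_threadsafe, the standard way to hand work from an ordinary thread to an event loop running in another thread. A self-contained sketch of the pattern (illustration only, no changedetection.io code involved):

import asyncio
import threading

loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

async def make_queue():
    return asyncio.Queue()  # create the queue on the loop's own thread

q = asyncio.run_coroutine_threadsafe(make_queue(), loop).result(timeout=5)

# From a plain (non-async) thread: schedule the coroutine, block until it ran.
asyncio.run_coroutine_threadsafe(q.put("item"), loop).result(timeout=5)
print(asyncio.run_coroutine_threadsafe(q.get(), loop).result(timeout=5))  # -> item

loop.call_soon_threadsafe(loop.stop)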
@@ -246,7 +192,7 @@ def adjust_async_worker_count(new_count, update_q=None, notification_q=None, app=None, datastore=None):
     Returns:
         dict: Status of the adjustment operation
     """
-    global running_async_tasks, running_update_threads
+    global running_async_tasks

     current_count = get_worker_count()

@@ -257,79 +203,71 @@ def adjust_async_worker_count(new_count, update_q=None, notification_q=None, app=None, datastore=None):
             'current_count': current_count
         }

-    if USE_ASYNC_WORKERS:
-        if new_count > current_count:
-            # Add workers
-            workers_to_add = new_count - current_count
-            logger.info(f"Adding {workers_to_add} async workers (from {current_count} to {new_count})")
-
-            if not all([update_q, notification_q, app, datastore]):
-                return {
-                    'status': 'error',
-                    'message': 'Missing required parameters to add workers',
-                    'current_count': current_count
-                }
-
-            for i in range(workers_to_add):
-                worker_id = len(running_async_tasks)
-                task_future = asyncio.run_coroutine_threadsafe(
-                    start_single_async_worker(worker_id, update_q, notification_q, app, datastore),
-                    async_loop
-                )
-                running_async_tasks.append(task_future)
-
-            return {
-                'status': 'success',
-                'message': f'Added {workers_to_add} workers',
-                'previous_count': current_count,
-                'current_count': new_count
-            }
-        else:
-            # Remove workers
-            workers_to_remove = current_count - new_count
-            logger.info(f"Removing {workers_to_remove} async workers (from {current_count} to {new_count})")
-
-            removed_count = 0
-            for _ in range(workers_to_remove):
-                if running_async_tasks:
-                    task_future = running_async_tasks.pop()
-                    task_future.cancel()
-                    # Wait for the task to actually stop
-                    try:
-                        task_future.result(timeout=5)  # 5 second timeout
-                    except Exception:
-                        pass  # Task was cancelled, which is expected
-                    removed_count += 1
-
-            return {
-                'status': 'success',
-                'message': f'Removed {removed_count} workers',
-                'previous_count': current_count,
-                'current_count': current_count - removed_count
-            }
-    else:
-        # Sync workers - more complex to adjust dynamically
-        return {
-            'status': 'not_supported',
-            'message': 'Dynamic worker adjustment not supported for sync workers',
-            'current_count': current_count
-        }
+    if new_count > current_count:
+        # Add workers
+        workers_to_add = new_count - current_count
+        logger.info(f"Adding {workers_to_add} async workers (from {current_count} to {new_count})")
+
+        if not all([update_q, notification_q, app, datastore]):
+            return {
+                'status': 'error',
+                'message': 'Missing required parameters to add workers',
+                'current_count': current_count
+            }
+
+        for i in range(workers_to_add):
+            worker_id = len(running_async_tasks)
+            task_future = asyncio.run_coroutine_threadsafe(
+                start_single_async_worker(worker_id, update_q, notification_q, app, datastore),
+                async_loop
+            )
+            running_async_tasks.append(task_future)
+
+        return {
+            'status': 'success',
+            'message': f'Added {workers_to_add} workers',
+            'previous_count': current_count,
+            'current_count': new_count
+        }
+
+    else:
+        # Remove workers
+        workers_to_remove = current_count - new_count
+        logger.info(f"Removing {workers_to_remove} async workers (from {current_count} to {new_count})")
+
+        removed_count = 0
+        for _ in range(workers_to_remove):
+            if running_async_tasks:
+                task_future = running_async_tasks.pop()
+                task_future.cancel()
+                # Wait for the task to actually stop
+                try:
+                    task_future.result(timeout=5)  # 5 second timeout
+                except Exception:
+                    pass  # Task was cancelled, which is expected
+                removed_count += 1
+
+        return {
+            'status': 'success',
+            'message': f'Removed {removed_count} workers',
+            'previous_count': current_count,
+            'current_count': current_count - removed_count
+        }


 def get_worker_status():
-    """Get status information about workers"""
+    """Get status information about async workers"""
     return {
-        'worker_type': 'async' if USE_ASYNC_WORKERS else 'sync',
+        'worker_type': 'async',
         'worker_count': get_worker_count(),
         'running_uuids': get_running_uuids(),
-        'async_loop_running': async_loop is not None if USE_ASYNC_WORKERS else None,
+        'async_loop_running': async_loop is not None,
     }


 def check_worker_health(expected_count, update_q=None, notification_q=None, app=None, datastore=None):
     """
-    Check if the expected number of workers are running and restart any missing ones.
+    Check if the expected number of async workers are running and restart any missing ones.

     Args:
         expected_count: Expected number of workers
@@ -338,7 +276,7 @@ def check_worker_health(expected_count, update_q=None, notification_q=None, app=None, datastore=None):
     Returns:
         dict: Health check results
     """
-    global running_async_tasks, running_update_threads
+    global running_async_tasks

     current_count = get_worker_count()

@@ -347,62 +285,53 @@
             'status': 'healthy',
             'expected_count': expected_count,
             'actual_count': current_count,
-            'message': f'All {expected_count} workers running'
+            'message': f'All {expected_count} async workers running'
         }

-    if USE_ASYNC_WORKERS:
-        # Check for crashed async workers
-        dead_workers = []
-        alive_count = 0
-
-        for i, task_future in enumerate(running_async_tasks[:]):
-            if task_future.done():
-                try:
-                    result = task_future.result()
-                    dead_workers.append(i)
-                    logger.warning(f"Async worker {i} completed unexpectedly")
-                except Exception as e:
-                    dead_workers.append(i)
-                    logger.error(f"Async worker {i} crashed: {e}")
-            else:
-                alive_count += 1
-
-        # Remove dead workers from tracking
-        for i in reversed(dead_workers):
-            if i < len(running_async_tasks):
-                running_async_tasks.pop(i)
-
-        missing_workers = expected_count - alive_count
-        restarted_count = 0
-
-        if missing_workers > 0 and all([update_q, notification_q, app, datastore]):
-            logger.info(f"Restarting {missing_workers} crashed async workers")
-
-            for i in range(missing_workers):
-                worker_id = alive_count + i
-                try:
-                    task_future = asyncio.run_coroutine_threadsafe(
-                        start_single_async_worker(worker_id, update_q, notification_q, app, datastore),
-                        async_loop
-                    )
-                    running_async_tasks.append(task_future)
-                    restarted_count += 1
-                except Exception as e:
-                    logger.error(f"Failed to restart worker {worker_id}: {e}")
-
-        return {
-            'status': 'repaired' if restarted_count > 0 else 'degraded',
-            'expected_count': expected_count,
-            'actual_count': alive_count,
-            'dead_workers': len(dead_workers),
-            'restarted_workers': restarted_count,
-            'message': f'Found {len(dead_workers)} dead workers, restarted {restarted_count}'
-        }
-    else:
-        # For sync workers, just report the issue (harder to auto-restart)
-        return {
-            'status': 'degraded',
-            'expected_count': expected_count,
-            'actual_count': current_count,
-            'message': f'Worker count mismatch: expected {expected_count}, got {current_count}'
-        }
+    # Check for crashed async workers
+    dead_workers = []
+    alive_count = 0
+
+    for i, task_future in enumerate(running_async_tasks[:]):
+        if task_future.done():
+            try:
+                result = task_future.result()
+                dead_workers.append(i)
+                logger.warning(f"Async worker {i} completed unexpectedly")
+            except Exception as e:
+                dead_workers.append(i)
+                logger.error(f"Async worker {i} crashed: {e}")
+        else:
+            alive_count += 1
+
+    # Remove dead workers from tracking
+    for i in reversed(dead_workers):
+        if i < len(running_async_tasks):
+            running_async_tasks.pop(i)
+
+    missing_workers = expected_count - alive_count
+    restarted_count = 0
+
+    if missing_workers > 0 and all([update_q, notification_q, app, datastore]):
+        logger.info(f"Restarting {missing_workers} crashed async workers")
+
+        for i in range(missing_workers):
+            worker_id = alive_count + i
+            try:
+                task_future = asyncio.run_coroutine_threadsafe(
+                    start_single_async_worker(worker_id, update_q, notification_q, app, datastore),
+                    async_loop
+                )
+                running_async_tasks.append(task_future)
+                restarted_count += 1
+            except Exception as e:
+                logger.error(f"Failed to restart worker {worker_id}: {e}")
+
+    return {
+        'status': 'repaired' if restarted_count > 0 else 'degraded',
+        'expected_count': expected_count,
+        'actual_count': alive_count,
+        'dead_workers': len(dead_workers),
+        'restarted_workers': restarted_count,
+        'message': f'Found {len(dead_workers)} dead workers, restarted {restarted_count}'
+    }
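The health check above works because asyncio.run_coroutine_threadsafe returns a concurrent.futures.Future: done() becomes True when the task ends for any reason, and result() re-raises an exception the task died with. A self-contained sketch of detecting a crashed background task the same way (illustration only):

import asyncio
import threading
import time

loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

async def doomed_worker():
    raise RuntimeError("worker crashed")

future = asyncio.run_coroutine_threadsafe(doomed_worker(), loop)
time.sleep(0.1)  # give the loop time to run the coroutine

if future.done():
    try:
        future.result()  # re-raises the exception captured by the task
    except Exception as e:
        print(f"worker needs restarting: {e}")  # -> worker needs restarting: worker crashed

loop.call_soon_threadsafe(loop.stop)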