From 3d61ce8df79fd8dc337d268c7a8fb978636f2eff Mon Sep 17 00:00:00 2001 From: dgtlmoon Date: Fri, 30 May 2025 18:13:31 +0200 Subject: [PATCH] WIP --- changedetectionio/async_update_worker.py | 41 +- changedetectionio/flask_app.py | 1 - changedetectionio/notification_service.py | 246 +++++++++ changedetectionio/update_worker.py | 608 ---------------------- changedetectionio/worker_handler.py | 373 ++++++------- 5 files changed, 412 insertions(+), 857 deletions(-) create mode 100644 changedetectionio/notification_service.py delete mode 100644 changedetectionio/update_worker.py diff --git a/changedetectionio/async_update_worker.py b/changedetectionio/async_update_worker.py index c3e17335..54f7e603 100644 --- a/changedetectionio/async_update_worker.py +++ b/changedetectionio/async_update_worker.py @@ -406,50 +406,39 @@ def cleanup_error_artifacts(uuid, datastore): async def send_content_changed_notification(watch_uuid, notification_q, datastore): - """Helper function to queue notifications (kept sync for now)""" - # Note: This uses the original sync notification logic since notifications - # are handled by a separate thread. Could be made async later if needed. + """Helper function to queue notifications using the new notification service""" try: - # Import here to avoid circular imports - from changedetectionio.update_worker import update_worker + from changedetectionio.notification_service import create_notification_service - # Create temporary worker instance just for notification methods - temp_worker = update_worker(None, notification_q, None, datastore) - temp_worker.datastore = datastore - temp_worker.notification_q = notification_q + # Create notification service instance + notification_service = create_notification_service(datastore, notification_q) - temp_worker.send_content_changed_notification(watch_uuid) + notification_service.send_content_changed_notification(watch_uuid) except Exception as e: logger.error(f"Error sending notification for {watch_uuid}: {e}") async def send_filter_failure_notification(watch_uuid, notification_q, datastore): - """Helper function to send filter failure notifications""" + """Helper function to send filter failure notifications using the new notification service""" try: - # Import here to avoid circular imports - from changedetectionio.update_worker import update_worker + from changedetectionio.notification_service import create_notification_service - # Create temporary worker instance just for notification methods - temp_worker = update_worker(None, notification_q, None, datastore) - temp_worker.datastore = datastore - temp_worker.notification_q = notification_q + # Create notification service instance + notification_service = create_notification_service(datastore, notification_q) - temp_worker.send_filter_failure_notification(watch_uuid) + notification_service.send_filter_failure_notification(watch_uuid) except Exception as e: logger.error(f"Error sending filter failure notification for {watch_uuid}: {e}") async def send_step_failure_notification(watch_uuid, step_n, notification_q, datastore): - """Helper function to send step failure notifications""" + """Helper function to send step failure notifications using the new notification service""" try: - # Import here to avoid circular imports - from changedetectionio.update_worker import update_worker + from changedetectionio.notification_service import create_notification_service - # Create temporary worker instance just for notification methods - temp_worker = update_worker(None, notification_q, None, 
datastore) - temp_worker.datastore = datastore - temp_worker.notification_q = notification_q + # Create notification service instance + notification_service = create_notification_service(datastore, notification_q) - temp_worker.send_step_failure_notification(watch_uuid, step_n) + notification_service.send_step_failure_notification(watch_uuid, step_n) except Exception as e: logger.error(f"Error sending step failure notification for {watch_uuid}: {e}") \ No newline at end of file diff --git a/changedetectionio/flask_app.py b/changedetectionio/flask_app.py index 7c2b77a1..7a8c7b30 100644 --- a/changedetectionio/flask_app.py +++ b/changedetectionio/flask_app.py @@ -694,7 +694,6 @@ def notification_runner(): # Threaded runner, look for new watches to feed into the Queue. def ticker_thread_check_time_launch_checks(): import random - from changedetectionio import update_worker proxy_last_called_time = {} last_health_check = 0 diff --git a/changedetectionio/notification_service.py b/changedetectionio/notification_service.py new file mode 100644 index 00000000..5f3136b8 --- /dev/null +++ b/changedetectionio/notification_service.py @@ -0,0 +1,246 @@ +#!/usr/bin/env python3 + +""" +Notification Service Module +Extracted from update_worker.py to provide standalone notification functionality +for both sync and async workers +""" + +import time +from loguru import logger + + +class NotificationService: + """ + Standalone notification service that handles all notification functionality + previously embedded in the update_worker class + """ + + def __init__(self, datastore, notification_q): + self.datastore = datastore + self.notification_q = notification_q + + def queue_notification_for_watch(self, n_object, watch): + """ + Queue a notification for a watch with full diff rendering and template variables + """ + from changedetectionio import diff + from changedetectionio.notification import default_notification_format_for_watch + + dates = [] + trigger_text = '' + + now = time.time() + + if watch: + watch_history = watch.history + dates = list(watch_history.keys()) + trigger_text = watch.get('trigger_text', []) + + # Add text that was triggered + if len(dates): + snapshot_contents = watch.get_history_snapshot(dates[-1]) + else: + snapshot_contents = "No snapshot/history available, the watch should fetch atleast once." + + # If we ended up here with "System default" + if n_object.get('notification_format') == default_notification_format_for_watch: + n_object['notification_format'] = self.datastore.data['settings']['application'].get('notification_format') + + html_colour_enable = False + # HTML needs linebreak, but MarkDown and Text can use a linefeed + if n_object.get('notification_format') == 'HTML': + line_feed_sep = "
" + # Snapshot will be plaintext on the disk, convert to some kind of HTML + snapshot_contents = snapshot_contents.replace('\n', line_feed_sep) + elif n_object.get('notification_format') == 'HTML Color': + line_feed_sep = "
" + # Snapshot will be plaintext on the disk, convert to some kind of HTML + snapshot_contents = snapshot_contents.replace('\n', line_feed_sep) + html_colour_enable = True + else: + line_feed_sep = "\n" + + triggered_text = '' + if len(trigger_text): + from . import html_tools + triggered_text = html_tools.get_triggered_text(content=snapshot_contents, trigger_text=trigger_text) + if triggered_text: + triggered_text = line_feed_sep.join(triggered_text) + + # Could be called as a 'test notification' with only 1 snapshot available + prev_snapshot = "Example text: example test\nExample text: change detection is cool\nExample text: some more examples\n" + current_snapshot = "Example text: example test\nExample text: change detection is fantastic\nExample text: even more examples\nExample text: a lot more examples" + + if len(dates) > 1: + prev_snapshot = watch.get_history_snapshot(dates[-2]) + current_snapshot = watch.get_history_snapshot(dates[-1]) + + n_object.update({ + 'current_snapshot': snapshot_contents, + 'diff': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=line_feed_sep, html_colour=html_colour_enable), + 'diff_added': diff.render_diff(prev_snapshot, current_snapshot, include_removed=False, line_feed_sep=line_feed_sep), + 'diff_full': diff.render_diff(prev_snapshot, current_snapshot, include_equal=True, line_feed_sep=line_feed_sep, html_colour=html_colour_enable), + 'diff_patch': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=line_feed_sep, patch_format=True), + 'diff_removed': diff.render_diff(prev_snapshot, current_snapshot, include_added=False, line_feed_sep=line_feed_sep), + 'notification_timestamp': now, + 'screenshot': watch.get_screenshot() if watch and watch.get('notification_screenshot') else None, + 'triggered_text': triggered_text, + 'uuid': watch.get('uuid') if watch else None, + 'watch_url': watch.get('url') if watch else None, + }) + + if watch: + n_object.update(watch.extra_notification_token_values()) + + logger.trace(f"Main rendered notification placeholders (diff_added etc) calculated in {time.time()-now:.3f}s") + logger.debug("Queued notification for sending") + self.notification_q.put(n_object) + + def _check_cascading_vars(self, var_name, watch): + """ + Check notification variables in cascading priority: + Individual watch settings > Tag settings > Global settings + """ + from changedetectionio.notification import ( + default_notification_format_for_watch, + default_notification_body, + default_notification_title + ) + + # Would be better if this was some kind of Object where Watch can reference the parent datastore etc + v = watch.get(var_name) + if v and not watch.get('notification_muted'): + if var_name == 'notification_format' and v == default_notification_format_for_watch: + return self.datastore.data['settings']['application'].get('notification_format') + + return v + + tags = self.datastore.get_all_tags_for_watch(uuid=watch.get('uuid')) + if tags: + for tag_uuid, tag in tags.items(): + v = tag.get(var_name) + if v and not tag.get('notification_muted'): + return v + + if self.datastore.data['settings']['application'].get(var_name): + return self.datastore.data['settings']['application'].get(var_name) + + # Otherwise could be defaults + if var_name == 'notification_format': + return default_notification_format_for_watch + if var_name == 'notification_body': + return default_notification_body + if var_name == 'notification_title': + return default_notification_title + + return None + + def 
send_content_changed_notification(self, watch_uuid): + """ + Send notification when content changes are detected + """ + n_object = {} + watch = self.datastore.data['watching'].get(watch_uuid) + if not watch: + return + + watch_history = watch.history + dates = list(watch_history.keys()) + # Theoretically it's possible that this could be just 1 long, + # - In the case that the timestamp key was not unique + if len(dates) == 1: + raise ValueError( + "History index had 2 or more, but only 1 date loaded, timestamps were not unique? maybe two of the same timestamps got written, needs more delay?" + ) + + # Should be a better parent getter in the model object + + # Prefer - Individual watch settings > Tag settings > Global settings (in that order) + n_object['notification_urls'] = self._check_cascading_vars('notification_urls', watch) + n_object['notification_title'] = self._check_cascading_vars('notification_title', watch) + n_object['notification_body'] = self._check_cascading_vars('notification_body', watch) + n_object['notification_format'] = self._check_cascading_vars('notification_format', watch) + + # (Individual watch) Only prepare to notify if the rules above matched + queued = False + if n_object and n_object.get('notification_urls'): + queued = True + + count = watch.get('notification_alert_count', 0) + 1 + self.datastore.update_watch(uuid=watch_uuid, update_obj={'notification_alert_count': count}) + + self.queue_notification_for_watch(n_object=n_object, watch=watch) + + return queued + + def send_filter_failure_notification(self, watch_uuid): + """ + Send notification when CSS/XPath filters fail consecutively + """ + threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts') + watch = self.datastore.data['watching'].get(watch_uuid) + if not watch: + return + + n_object = {'notification_title': 'Changedetection.io - Alert - CSS/xPath filter was not present in the page', + 'notification_body': "Your configured CSS/xPath filters of '{}' for {{{{watch_url}}}} did not appear on the page after {} attempts, did the page change layout?\n\nLink: {{{{base_url}}}}/edit/{{{{watch_uuid}}}}\n\nThanks - Your omniscient changedetection.io installation :)\n".format( + ", ".join(watch['include_filters']), + threshold), + 'notification_format': 'text'} + + if len(watch['notification_urls']): + n_object['notification_urls'] = watch['notification_urls'] + + elif len(self.datastore.data['settings']['application']['notification_urls']): + n_object['notification_urls'] = self.datastore.data['settings']['application']['notification_urls'] + + # Only prepare to notify if the rules above matched + if 'notification_urls' in n_object: + n_object.update({ + 'watch_url': watch['url'], + 'uuid': watch_uuid, + 'screenshot': None + }) + self.notification_q.put(n_object) + logger.debug(f"Sent filter not found notification for {watch_uuid}") + else: + logger.debug(f"NOT sending filter not found notification for {watch_uuid} - no notification URLs") + + def send_step_failure_notification(self, watch_uuid, step_n): + """ + Send notification when browser steps fail consecutively + """ + watch = self.datastore.data['watching'].get(watch_uuid, False) + if not watch: + return + threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts') + n_object = {'notification_title': "Changedetection.io - Alert - Browser step at position {} could not be run".format(step_n+1), + 'notification_body': "Your configured browser step at 
position {} for {{{{watch_url}}}} " + "did not appear on the page after {} attempts, did the page change layout? " + "Does it need a delay added?\n\nLink: {{{{base_url}}}}/edit/{{{{watch_uuid}}}}\n\n" + "Thanks - Your omniscient changedetection.io installation :)\n".format(step_n+1, threshold), + 'notification_format': 'text'} + + if len(watch['notification_urls']): + n_object['notification_urls'] = watch['notification_urls'] + + elif len(self.datastore.data['settings']['application']['notification_urls']): + n_object['notification_urls'] = self.datastore.data['settings']['application']['notification_urls'] + + # Only prepare to notify if the rules above matched + if 'notification_urls' in n_object: + n_object.update({ + 'watch_url': watch['url'], + 'uuid': watch_uuid + }) + self.notification_q.put(n_object) + logger.error(f"Sent step not found notification for {watch_uuid}") + + +# Convenience functions for creating notification service instances +def create_notification_service(datastore, notification_q): + """ + Factory function to create a NotificationService instance + """ + return NotificationService(datastore, notification_q) \ No newline at end of file diff --git a/changedetectionio/update_worker.py b/changedetectionio/update_worker.py deleted file mode 100644 index 53d2d798..00000000 --- a/changedetectionio/update_worker.py +++ /dev/null @@ -1,608 +0,0 @@ -from .processors.exceptions import ProcessorException -import changedetectionio.content_fetchers.exceptions as content_fetchers_exceptions -from changedetectionio.processors.text_json_diff.processor import FilterNotFoundInResponse -from changedetectionio import html_tools -from changedetectionio.flask_app import watch_check_update - -import importlib -import os -import queue -import threading -import time - -# A single update worker -# -# Requests for checking on a single site(watch) from a queue of watches -# (another process inserts watches into the queue that are time-ready for checking) - -from loguru import logger - -class update_worker(threading.Thread): - current_uuid = None - - def __init__(self, q, notification_q, app, datastore, *args, **kwargs): - self.q = q - self.app = app - self.notification_q = notification_q - self.datastore = datastore - super().__init__(*args, **kwargs) - - def queue_notification_for_watch(self, notification_q, n_object, watch): - from changedetectionio import diff - from changedetectionio.notification import default_notification_format_for_watch - - dates = [] - trigger_text = '' - - now = time.time() - - if watch: - watch_history = watch.history - dates = list(watch_history.keys()) - trigger_text = watch.get('trigger_text', []) - - # Add text that was triggered - if len(dates): - snapshot_contents = watch.get_history_snapshot(dates[-1]) - else: - snapshot_contents = "No snapshot/history available, the watch should fetch atleast once." - - # If we ended up here with "System default" - if n_object.get('notification_format') == default_notification_format_for_watch: - n_object['notification_format'] = self.datastore.data['settings']['application'].get('notification_format') - - html_colour_enable = False - # HTML needs linebreak, but MarkDown and Text can use a linefeed - if n_object.get('notification_format') == 'HTML': - line_feed_sep = "
" - # Snapshot will be plaintext on the disk, convert to some kind of HTML - snapshot_contents = snapshot_contents.replace('\n', line_feed_sep) - elif n_object.get('notification_format') == 'HTML Color': - line_feed_sep = "
" - # Snapshot will be plaintext on the disk, convert to some kind of HTML - snapshot_contents = snapshot_contents.replace('\n', line_feed_sep) - html_colour_enable = True - else: - line_feed_sep = "\n" - - triggered_text = '' - if len(trigger_text): - from . import html_tools - triggered_text = html_tools.get_triggered_text(content=snapshot_contents, trigger_text=trigger_text) - if triggered_text: - triggered_text = line_feed_sep.join(triggered_text) - - # Could be called as a 'test notification' with only 1 snapshot available - prev_snapshot = "Example text: example test\nExample text: change detection is cool\nExample text: some more examples\n" - current_snapshot = "Example text: example test\nExample text: change detection is fantastic\nExample text: even more examples\nExample text: a lot more examples" - - if len(dates) > 1: - prev_snapshot = watch.get_history_snapshot(dates[-2]) - current_snapshot = watch.get_history_snapshot(dates[-1]) - - n_object.update({ - 'current_snapshot': snapshot_contents, - 'diff': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=line_feed_sep, html_colour=html_colour_enable), - 'diff_added': diff.render_diff(prev_snapshot, current_snapshot, include_removed=False, line_feed_sep=line_feed_sep), - 'diff_full': diff.render_diff(prev_snapshot, current_snapshot, include_equal=True, line_feed_sep=line_feed_sep, html_colour=html_colour_enable), - 'diff_patch': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=line_feed_sep, patch_format=True), - 'diff_removed': diff.render_diff(prev_snapshot, current_snapshot, include_added=False, line_feed_sep=line_feed_sep), - 'notification_timestamp': now, - 'screenshot': watch.get_screenshot() if watch and watch.get('notification_screenshot') else None, - 'triggered_text': triggered_text, - 'uuid': watch.get('uuid') if watch else None, - 'watch_url': watch.get('url') if watch else None, - }) - - if watch: - n_object.update(watch.extra_notification_token_values()) - - logger.trace(f"Main rendered notification placeholders (diff_added etc) calculated in {time.time()-now:.3f}s") - logger.debug("Queued notification for sending") - notification_q.put(n_object) - - # Prefer - Individual watch settings > Tag settings > Global settings (in that order) - def _check_cascading_vars(self, var_name, watch): - - from changedetectionio.notification import ( - default_notification_format_for_watch, - default_notification_body, - default_notification_title - ) - - # Would be better if this was some kind of Object where Watch can reference the parent datastore etc - v = watch.get(var_name) - if v and not watch.get('notification_muted'): - if var_name == 'notification_format' and v == default_notification_format_for_watch: - return self.datastore.data['settings']['application'].get('notification_format') - - return v - - tags = self.datastore.get_all_tags_for_watch(uuid=watch.get('uuid')) - if tags: - for tag_uuid, tag in tags.items(): - v = tag.get(var_name) - if v and not tag.get('notification_muted'): - return v - - if self.datastore.data['settings']['application'].get(var_name): - return self.datastore.data['settings']['application'].get(var_name) - - # Otherwise could be defaults - if var_name == 'notification_format': - return default_notification_format_for_watch - if var_name == 'notification_body': - return default_notification_body - if var_name == 'notification_title': - return default_notification_title - - return None - - def send_content_changed_notification(self, watch_uuid): - - n_object = {} - 
watch = self.datastore.data['watching'].get(watch_uuid) - if not watch: - return - - watch_history = watch.history - dates = list(watch_history.keys()) - # Theoretically it's possible that this could be just 1 long, - # - In the case that the timestamp key was not unique - if len(dates) == 1: - raise ValueError( - "History index had 2 or more, but only 1 date loaded, timestamps were not unique? maybe two of the same timestamps got written, needs more delay?" - ) - - # Should be a better parent getter in the model object - - # Prefer - Individual watch settings > Tag settings > Global settings (in that order) - n_object['notification_urls'] = self._check_cascading_vars('notification_urls', watch) - n_object['notification_title'] = self._check_cascading_vars('notification_title', watch) - n_object['notification_body'] = self._check_cascading_vars('notification_body', watch) - n_object['notification_format'] = self._check_cascading_vars('notification_format', watch) - - # (Individual watch) Only prepare to notify if the rules above matched - queued = False - if n_object and n_object.get('notification_urls'): - queued = True - - count = watch.get('notification_alert_count', 0) + 1 - self.datastore.update_watch(uuid=watch_uuid, update_obj={'notification_alert_count': count}) - - self.queue_notification_for_watch(notification_q=self.notification_q, n_object=n_object, watch=watch) - - return queued - - - def send_filter_failure_notification(self, watch_uuid): - - threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts') - watch = self.datastore.data['watching'].get(watch_uuid) - if not watch: - return - - n_object = {'notification_title': 'Changedetection.io - Alert - CSS/xPath filter was not present in the page', - 'notification_body': "Your configured CSS/xPath filters of '{}' for {{{{watch_url}}}} did not appear on the page after {} attempts, did the page change layout?\n\nLink: {{{{base_url}}}}/edit/{{{{watch_uuid}}}}\n\nThanks - Your omniscient changedetection.io installation :)\n".format( - ", ".join(watch['include_filters']), - threshold), - 'notification_format': 'text'} - - if len(watch['notification_urls']): - n_object['notification_urls'] = watch['notification_urls'] - - elif len(self.datastore.data['settings']['application']['notification_urls']): - n_object['notification_urls'] = self.datastore.data['settings']['application']['notification_urls'] - - # Only prepare to notify if the rules above matched - if 'notification_urls' in n_object: - n_object.update({ - 'watch_url': watch['url'], - 'uuid': watch_uuid, - 'screenshot': None - }) - self.notification_q.put(n_object) - logger.debug(f"Sent filter not found notification for {watch_uuid}") - else: - logger.debug(f"NOT sending filter not found notification for {watch_uuid} - no notification URLs") - - def send_step_failure_notification(self, watch_uuid, step_n): - watch = self.datastore.data['watching'].get(watch_uuid, False) - if not watch: - return - threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts') - n_object = {'notification_title': "Changedetection.io - Alert - Browser step at position {} could not be run".format(step_n+1), - 'notification_body': "Your configured browser step at position {} for {{{{watch_url}}}} " - "did not appear on the page after {} attempts, did the page change layout? 
" - "Does it need a delay added?\n\nLink: {{{{base_url}}}}/edit/{{{{watch_uuid}}}}\n\n" - "Thanks - Your omniscient changedetection.io installation :)\n".format(step_n+1, threshold), - 'notification_format': 'text'} - - if len(watch['notification_urls']): - n_object['notification_urls'] = watch['notification_urls'] - - elif len(self.datastore.data['settings']['application']['notification_urls']): - n_object['notification_urls'] = self.datastore.data['settings']['application']['notification_urls'] - - # Only prepare to notify if the rules above matched - if 'notification_urls' in n_object: - n_object.update({ - 'watch_url': watch['url'], - 'uuid': watch_uuid - }) - self.notification_q.put(n_object) - logger.error(f"Sent step not found notification for {watch_uuid}") - - - def cleanup_error_artifacts(self, uuid): - # All went fine, remove error artifacts - cleanup_files = ["last-error-screenshot.png", "last-error.txt"] - for f in cleanup_files: - full_path = os.path.join(self.datastore.datastore_path, uuid, f) - if os.path.isfile(full_path): - os.unlink(full_path) - - def run(self): - - while not self.app.config.exit.is_set(): - update_handler = None - watch = None - - try: - queued_item_data = self.q.get(block=False) - except queue.Empty: - pass - else: - uuid = queued_item_data.item.get('uuid') - fetch_start_time = round(time.time()) # Also used for a unique history key for now - self.current_uuid = uuid - if uuid in list(self.datastore.data['watching'].keys()) and self.datastore.data['watching'][uuid].get('url'): - changed_detected = False - contents = b'' - process_changedetection_results = True - update_obj = {} - - - # Clear last errors (move to preflight func?) - self.datastore.data['watching'][uuid]['browser_steps_last_error_step'] = None - self.datastore.data['watching'][uuid]['last_checked'] = fetch_start_time - - watch = self.datastore.data['watching'].get(uuid) - - logger.info(f"Processing watch UUID {uuid} Priority {queued_item_data.priority} URL {watch['url']}") - - try: - watch_check_update.send(watch_uuid=uuid) - - # Processor is what we are using for detecting the "Change" - processor = watch.get('processor', 'text_json_diff') - - # Init a new 'difference_detection_processor', first look in processors - processor_module_name = f"changedetectionio.processors.{processor}.processor" - try: - processor_module = importlib.import_module(processor_module_name) - except ModuleNotFoundError as e: - print(f"Processor module '{processor}' not found.") - raise e - - update_handler = processor_module.perform_site_check(datastore=self.datastore, - watch_uuid=uuid - ) - - update_handler.call_browser() - - changed_detected, update_obj, contents = update_handler.run_changedetection(watch=watch) - - # Re #342 - # In Python 3, all strings are sequences of Unicode characters. There is a bytes type that holds raw bytes. 
- # We then convert/.decode('utf-8') for the notification etc -# if not isinstance(contents, (bytes, bytearray)): -# raise Exception("Error - returned data from the fetch handler SHOULD be bytes") - except PermissionError as e: - logger.critical(f"File permission error updating file, watch: {uuid}") - logger.critical(str(e)) - process_changedetection_results = False - - # A generic other-exception thrown by processors - except ProcessorException as e: - if e.screenshot: - watch.save_screenshot(screenshot=e.screenshot) - if e.xpath_data: - watch.save_xpath_data(data=e.xpath_data) - self.datastore.update_watch(uuid=uuid, update_obj={'last_error': e.message}) - process_changedetection_results = False - - except content_fetchers_exceptions.ReplyWithContentButNoText as e: - # Totally fine, it's by choice - just continue on, nothing more to care about - # Page had elements/content but no renderable text - # Backend (not filters) gave zero output - extra_help = "" - if e.has_filters: - # Maybe it contains an image? offer a more helpful link - has_img = html_tools.include_filters(include_filters='img', - html_content=e.html_content) - if has_img: - extra_help = ", it's possible that the filters you have give an empty result or contain only an image." - else: - extra_help = ", it's possible that the filters were found, but contained no usable text." - - self.datastore.update_watch(uuid=uuid, update_obj={ - 'last_error': f"Got HTML content but no text found (With {e.status_code} reply code){extra_help}" - }) - - if e.screenshot: - watch.save_screenshot(screenshot=e.screenshot, as_error=True) - - if e.xpath_data: - watch.save_xpath_data(data=e.xpath_data) - - process_changedetection_results = False - - except content_fetchers_exceptions.Non200ErrorCodeReceived as e: - if e.status_code == 403: - err_text = "Error - 403 (Access denied) received" - elif e.status_code == 404: - err_text = "Error - 404 (Page not found) received" - elif e.status_code == 407: - err_text = "Error - 407 (Proxy authentication required) received, did you need a username and password for the proxy?" - elif e.status_code == 500: - err_text = "Error - 500 (Internal server error) received from the web site" - else: - extra = ' (Access denied or blocked)' if str(e.status_code).startswith('4') else '' - err_text = f"Error - Request returned a HTTP error code {e.status_code}{extra}" - - if e.screenshot: - watch.save_screenshot(screenshot=e.screenshot, as_error=True) - if e.xpath_data: - watch.save_xpath_data(data=e.xpath_data, as_error=True) - if e.page_text: - watch.save_error_text(contents=e.page_text) - - self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text}) - process_changedetection_results = False - - except FilterNotFoundInResponse as e: - if not self.datastore.data['watching'].get(uuid): - continue - - err_text = "Warning, no filters were found, no change detection ran - Did the page change layout? update your Visual Filter if necessary." - self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text}) - - # Filter wasnt found, but we should still update the visual selector so that they can have a chance to set it up again - if e.screenshot: - watch.save_screenshot(screenshot=e.screenshot) - - if e.xpath_data: - watch.save_xpath_data(data=e.xpath_data) - - # Only when enabled, send the notification - if watch.get('filter_failure_notification_send', False): - c = watch.get('consecutive_filter_failures', 0) - c += 1 - # Send notification if we reached the threshold? 
- threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts', 0) - logger.debug(f"Filter for {uuid} not found, consecutive_filter_failures: {c} of threshold {threshold}") - if c >= threshold: - if not watch.get('notification_muted'): - logger.debug(f"Sending filter failed notification for {uuid}") - self.send_filter_failure_notification(uuid) - c = 0 - logger.debug(f"Reset filter failure count back to zero") - - self.datastore.update_watch(uuid=uuid, update_obj={'consecutive_filter_failures': c}) - else: - logger.trace(f"{uuid} - filter_failure_notification_send not enabled, skipping") - - - process_changedetection_results = False - - except content_fetchers_exceptions.checksumFromPreviousCheckWasTheSame as e: - # Yes fine, so nothing todo, don't continue to process. - process_changedetection_results = False - changed_detected = False - except content_fetchers_exceptions.BrowserConnectError as e: - self.datastore.update_watch(uuid=uuid, - update_obj={'last_error': e.msg - } - ) - process_changedetection_results = False - except content_fetchers_exceptions.BrowserFetchTimedOut as e: - self.datastore.update_watch(uuid=uuid, - update_obj={'last_error': e.msg - } - ) - process_changedetection_results = False - except content_fetchers_exceptions.BrowserStepsStepException as e: - - if not self.datastore.data['watching'].get(uuid): - continue - - error_step = e.step_n + 1 - from playwright._impl._errors import TimeoutError, Error - - # Generally enough info for TimeoutError (couldnt locate the element after default seconds) - err_text = f"Browser step at position {error_step} could not run, check the watch, add a delay if necessary, view Browser Steps to see screenshot at that step." - - if e.original_e.name == "TimeoutError": - # Just the first line is enough, the rest is the stack trace - err_text += " Could not find the target." - else: - # Other Error, more info is good. - err_text += " " + str(e.original_e).splitlines()[0] - - logger.debug(f"BrowserSteps exception at step {error_step} {str(e.original_e)}") - - self.datastore.update_watch(uuid=uuid, - update_obj={'last_error': err_text, - 'browser_steps_last_error_step': error_step - } - ) - - if watch.get('filter_failure_notification_send', False): - c = watch.get('consecutive_filter_failures', 0) - c += 1 - # Send notification if we reached the threshold? - threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts', - 0) - logger.error(f"Step for {uuid} not found, consecutive_filter_failures: {c}") - if threshold > 0 and c >= threshold: - if not watch.get('notification_muted'): - self.send_step_failure_notification(watch_uuid=uuid, step_n=e.step_n) - c = 0 - - self.datastore.update_watch(uuid=uuid, update_obj={'consecutive_filter_failures': c}) - - process_changedetection_results = False - - except content_fetchers_exceptions.EmptyReply as e: - # Some kind of custom to-str handler in the exception handler that does this? 
- err_text = "EmptyReply - try increasing 'Wait seconds before extracting text', Status Code {}".format(e.status_code) - self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text, - 'last_check_status': e.status_code}) - process_changedetection_results = False - except content_fetchers_exceptions.ScreenshotUnavailable as e: - err_text = "Screenshot unavailable, page did not render fully in the expected time or page was too long - try increasing 'Wait seconds before extracting text'" - self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text, - 'last_check_status': e.status_code}) - process_changedetection_results = False - except content_fetchers_exceptions.JSActionExceptions as e: - err_text = "Error running JS Actions - Page request - "+e.message - if e.screenshot: - watch.save_screenshot(screenshot=e.screenshot, as_error=True) - self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text, - 'last_check_status': e.status_code}) - process_changedetection_results = False - except content_fetchers_exceptions.PageUnloadable as e: - err_text = "Page request from server didnt respond correctly" - if e.message: - err_text = "{} - {}".format(err_text, e.message) - - if e.screenshot: - watch.save_screenshot(screenshot=e.screenshot, as_error=True) - - self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text, - 'last_check_status': e.status_code, - 'has_ldjson_price_data': None}) - process_changedetection_results = False - except content_fetchers_exceptions.BrowserStepsInUnsupportedFetcher as e: - err_text = "This watch has Browser Steps configured and so it cannot run with the 'Basic fast Plaintext/HTTP Client', either remove the Browser Steps or select a Chrome fetcher." - self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text}) - process_changedetection_results = False - logger.error(f"Exception (BrowserStepsInUnsupportedFetcher) reached processing watch UUID: {uuid}") - - except Exception as e: - logger.error(f"Exception reached processing watch UUID: {uuid}") - logger.error(str(e)) - self.datastore.update_watch(uuid=uuid, update_obj={'last_error': "Exception: " + str(e)}) - # Other serious error - process_changedetection_results = False - - else: - # Crash protection, the watch entry could have been removed by this point (during a slow chrome fetch etc) - if not self.datastore.data['watching'].get(uuid): - continue - - update_obj['content-type'] = update_handler.fetcher.get_all_headers().get('content-type', '').lower() - - # Mark that we never had any failures - if not watch.get('ignore_status_codes'): - update_obj['consecutive_filter_failures'] = 0 - - # Everything ran OK, clean off any previous error - update_obj['last_error'] = False - - self.cleanup_error_artifacts(uuid) - - if not self.datastore.data['watching'].get(uuid): - continue - - # Different exceptions mean that we may or may not want to bump the snapshot, trigger notifications etc - if process_changedetection_results: - - # Extract as title if possible/requested. 
- if self.datastore.data['settings']['application'].get('extract_title_as_title') or watch['extract_title_as_title']: - if not watch['title'] or not len(watch['title']): - try: - update_obj['title'] = html_tools.extract_element(find='title', html_content=update_handler.fetcher.content) - logger.info(f"UUID: {uuid} Extract <title> updated title to '{update_obj['title']}") - except Exception as e: - logger.warning(f"UUID: {uuid} Extract <title> as watch title was enabled, but couldn't find a <title>.") - - try: - self.datastore.update_watch(uuid=uuid, update_obj=update_obj) - - - # Also save the snapshot on the first time checked, "last checked" will always be updated, so we just check history length. - if changed_detected or not watch.history_n: - - if update_handler.screenshot: - watch.save_screenshot(screenshot=update_handler.screenshot) - - if update_handler.xpath_data: - watch.save_xpath_data(data=update_handler.xpath_data) - - # Small hack so that we sleep just enough to allow 1 second between history snapshots - # this is because history.txt indexes/keys snapshots by epoch seconds and we dont want dupe keys - # @also - the keys are one per second at the most (for now) - if watch.newest_history_key and int(fetch_start_time) == int(watch.newest_history_key): - logger.warning( - f"Timestamp {fetch_start_time} already exists, waiting 1 seconds so we have a unique key in history.txt") - fetch_start_time += 1 - time.sleep(1) - - watch.save_history_text(contents=contents, - timestamp=int(fetch_start_time), - snapshot_id=update_obj.get('previous_md5', 'none')) - - - empty_pages_are_a_change = self.datastore.data['settings']['application'].get('empty_pages_are_a_change', False) - if update_handler.fetcher.content or (not update_handler.fetcher.content and empty_pages_are_a_change): - # attribute .last_changed is then based on this data - watch.save_last_fetched_html(contents=update_handler.fetcher.content, timestamp=int(fetch_start_time)) - - # Notifications should only trigger on the second time (first time, we gather the initial snapshot) - if watch.history_n >= 2: - logger.info(f"Change detected in UUID {uuid} - {watch['url']}") - if not watch.get('notification_muted'): - # @todo only run this if notifications exist - self.send_content_changed_notification(watch_uuid=uuid) - - except Exception as e: - # Catch everything possible here, so that if a worker crashes, we don't lose it until restart! - logger.critical("!!!! 
Exception in update_worker while processing process_changedetection_results !!!") - logger.critical(str(e)) - self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)}) - - - # Always record that we atleast tried - count = watch.get('check_count', 0) + 1 - - # Record the 'server' header reply, can be used for actions in the future like cloudflare/akamai workarounds - try: - server_header = update_handler.fetcher.headers.get('server', '').strip().lower()[:255] - self.datastore.update_watch(uuid=uuid, - update_obj={'remote_server_reply': server_header} - ) - except Exception as e: - pass - - self.datastore.update_watch(uuid=uuid, update_obj={'fetch_time': round(time.time() - fetch_start_time, 3), - 'check_count': count - }) - - self.current_uuid = None # Done - self.q.task_done() - - # Send signal for watch check completion with the watch data - if watch: - logger.info(f"Sending watch_check_update signal for UUID {watch['uuid']}") - watch_check_update.send(watch_uuid=watch['uuid']) - - update_handler = None - logger.debug(f"Watch {uuid} done in {time.time()-fetch_start_time:.2f}s") - - - # Give the CPU time to interrupt - time.sleep(0.1) - - self.app.config.exit.wait(1) diff --git a/changedetectionio/worker_handler.py b/changedetectionio/worker_handler.py index d2ed6717..9d517030 100644 --- a/changedetectionio/worker_handler.py +++ b/changedetectionio/worker_handler.py @@ -1,8 +1,8 @@ """ Worker management module for changedetection.io -Handles both synchronous threaded workers and asynchronous workers, -providing a unified interface for dynamic worker scaling. +Handles asynchronous workers for dynamic worker scaling. +Sync worker support has been removed in favor of async-only architecture. """ import asyncio @@ -12,7 +12,6 @@ import time from loguru import logger # Global worker state -running_update_threads = [] running_async_tasks = [] async_loop = None async_loop_thread = None @@ -20,7 +19,7 @@ async_loop_thread = None # Track currently processing UUIDs for async workers currently_processing_uuids = set() -# Configuration +# Configuration - async workers only USE_ASYNC_WORKERS = True @@ -86,96 +85,51 @@ async def start_single_async_worker(worker_id, update_q, notification_q, app, da logger.info(f"Async worker {worker_id} shutdown complete") -def start_sync_workers(n_workers, update_q, notification_q, app, datastore): - """Start traditional threaded workers""" - global running_update_threads - from changedetectionio import update_worker - - logger.info(f"Starting {n_workers} sync workers") - for _ in range(n_workers): - new_worker = update_worker.update_worker(update_q, notification_q, app, datastore) - running_update_threads.append(new_worker) - new_worker.start() - - def start_workers(n_workers, update_q, notification_q, app, datastore): - """Start workers based on configuration""" - if USE_ASYNC_WORKERS: - start_async_workers(n_workers, update_q, notification_q, app, datastore) - else: - start_sync_workers(n_workers, update_q, notification_q, app, datastore) + """Start async workers - sync workers are deprecated""" + start_async_workers(n_workers, update_q, notification_q, app, datastore) def add_worker(update_q, notification_q, app, datastore): - """Add a new worker (for dynamic scaling)""" - global running_async_tasks, running_update_threads + """Add a new async worker (for dynamic scaling)""" + global running_async_tasks - if USE_ASYNC_WORKERS: - if not async_loop: - logger.error("Async loop not running, cannot add worker") - return False - - worker_id = 
len(running_async_tasks) - logger.info(f"Adding async worker {worker_id}") + if not async_loop: + logger.error("Async loop not running, cannot add worker") + return False - task_future = asyncio.run_coroutine_threadsafe( - start_single_async_worker(worker_id, update_q, notification_q, app, datastore), async_loop - ) - running_async_tasks.append(task_future) - return True - else: - # Add sync worker - from changedetectionio import update_worker - logger.info(f"Adding sync worker {len(running_update_threads)}") - - new_worker = update_worker.update_worker(update_q, notification_q, app, datastore) - running_update_threads.append(new_worker) - new_worker.start() - return True + worker_id = len(running_async_tasks) + logger.info(f"Adding async worker {worker_id}") + + task_future = asyncio.run_coroutine_threadsafe( + start_single_async_worker(worker_id, update_q, notification_q, app, datastore), async_loop + ) + running_async_tasks.append(task_future) + return True def remove_worker(): - """Remove a worker (for dynamic scaling)""" - global running_async_tasks, running_update_threads + """Remove an async worker (for dynamic scaling)""" + global running_async_tasks - if USE_ASYNC_WORKERS: - if not running_async_tasks: - return False - - # Cancel the last worker - task_future = running_async_tasks.pop() - task_future.cancel() - logger.info(f"Removed async worker, {len(running_async_tasks)} workers remaining") - return True - else: - if not running_update_threads: - return False - - # Stop the last worker - worker = running_update_threads.pop() - # Note: Graceful shutdown would require adding stop mechanism to update_worker - logger.info(f"Removed sync worker, {len(running_update_threads)} workers remaining") - return True + if not running_async_tasks: + return False + + # Cancel the last worker + task_future = running_async_tasks.pop() + task_future.cancel() + logger.info(f"Removed async worker, {len(running_async_tasks)} workers remaining") + return True def get_worker_count(): - """Get current number of workers""" - if USE_ASYNC_WORKERS: - return len(running_async_tasks) - else: - return len(running_update_threads) + """Get current number of async workers""" + return len(running_async_tasks) def get_running_uuids(): - """Get list of UUIDs currently being processed""" - if USE_ASYNC_WORKERS: - return list(currently_processing_uuids) - else: - running_uuids = [] - for t in running_update_threads: - if hasattr(t, 'current_uuid') and t.current_uuid: - running_uuids.append(t.current_uuid) - return running_uuids + """Get list of UUIDs currently being processed by async workers""" + return list(currently_processing_uuids) def set_uuid_processing(uuid, processing=True): @@ -195,44 +149,36 @@ def is_watch_running(watch_uuid): def queue_item_async_safe(update_q, item): - """Queue an item in a way that works with both sync and async queues""" - if USE_ASYNC_WORKERS and async_loop: + """Queue an item for async queue processing""" + if async_loop: # For async queue, schedule the put operation asyncio.run_coroutine_threadsafe(update_q.put(item), async_loop) else: - # For sync queue, put directly - update_q.put(item) + logger.error("Async loop not available for queueing item") def shutdown_workers(): - """Shutdown all workers gracefully""" - global async_loop, async_loop_thread, running_async_tasks, running_update_threads + """Shutdown all async workers gracefully""" + global async_loop, async_loop_thread, running_async_tasks - logger.info("Shutting down workers...") + logger.info("Shutting down async 
workers...") - if USE_ASYNC_WORKERS: - # Cancel all async tasks - for task_future in running_async_tasks: - task_future.cancel() - running_async_tasks.clear() + # Cancel all async tasks + for task_future in running_async_tasks: + task_future.cancel() + running_async_tasks.clear() + + # Stop the async event loop + if async_loop: + async_loop.call_soon_threadsafe(async_loop.stop) + async_loop = None - # Stop the async event loop - if async_loop: - async_loop.call_soon_threadsafe(async_loop.stop) - async_loop = None - - # Wait for the async thread to finish - if async_loop_thread and async_loop_thread.is_alive(): - async_loop_thread.join(timeout=5) - async_loop_thread = None - else: - # Stop sync workers - for worker in running_update_threads: - # Note: Would need to add proper stop mechanism to update_worker - pass - running_update_threads.clear() + # Wait for the async thread to finish + if async_loop_thread and async_loop_thread.is_alive(): + async_loop_thread.join(timeout=5) + async_loop_thread = None - logger.info("Workers shutdown complete") + logger.info("Async workers shutdown complete") def adjust_async_worker_count(new_count, update_q=None, notification_q=None, app=None, datastore=None): @@ -246,7 +192,7 @@ def adjust_async_worker_count(new_count, update_q=None, notification_q=None, app Returns: dict: Status of the adjustment operation """ - global running_async_tasks, running_update_threads + global running_async_tasks current_count = get_worker_count() @@ -257,79 +203,71 @@ def adjust_async_worker_count(new_count, update_q=None, notification_q=None, app 'current_count': current_count } - if USE_ASYNC_WORKERS: - if new_count > current_count: - # Add workers - workers_to_add = new_count - current_count - logger.info(f"Adding {workers_to_add} async workers (from {current_count} to {new_count})") - - if not all([update_q, notification_q, app, datastore]): - return { - 'status': 'error', - 'message': 'Missing required parameters to add workers', - 'current_count': current_count - } - - for i in range(workers_to_add): - worker_id = len(running_async_tasks) - task_future = asyncio.run_coroutine_threadsafe( - start_single_async_worker(worker_id, update_q, notification_q, app, datastore), - async_loop - ) - running_async_tasks.append(task_future) - + if new_count > current_count: + # Add workers + workers_to_add = new_count - current_count + logger.info(f"Adding {workers_to_add} async workers (from {current_count} to {new_count})") + + if not all([update_q, notification_q, app, datastore]): return { - 'status': 'success', - 'message': f'Added {workers_to_add} workers', - 'previous_count': current_count, - 'current_count': new_count + 'status': 'error', + 'message': 'Missing required parameters to add workers', + 'current_count': current_count } - - else: - # Remove workers - workers_to_remove = current_count - new_count - logger.info(f"Removing {workers_to_remove} async workers (from {current_count} to {new_count})") - - removed_count = 0 - for _ in range(workers_to_remove): - if running_async_tasks: - task_future = running_async_tasks.pop() - task_future.cancel() - # Wait for the task to actually stop - try: - task_future.result(timeout=5) # 5 second timeout - except Exception: - pass # Task was cancelled, which is expected - removed_count += 1 - - return { - 'status': 'success', - 'message': f'Removed {removed_count} workers', - 'previous_count': current_count, - 'current_count': current_count - removed_count - } - else: - # Sync workers - more complex to adjust dynamically + + for i in 
range(workers_to_add): + worker_id = len(running_async_tasks) + task_future = asyncio.run_coroutine_threadsafe( + start_single_async_worker(worker_id, update_q, notification_q, app, datastore), + async_loop + ) + running_async_tasks.append(task_future) + return { - 'status': 'not_supported', - 'message': 'Dynamic worker adjustment not supported for sync workers', - 'current_count': current_count + 'status': 'success', + 'message': f'Added {workers_to_add} workers', + 'previous_count': current_count, + 'current_count': new_count + } + + else: + # Remove workers + workers_to_remove = current_count - new_count + logger.info(f"Removing {workers_to_remove} async workers (from {current_count} to {new_count})") + + removed_count = 0 + for _ in range(workers_to_remove): + if running_async_tasks: + task_future = running_async_tasks.pop() + task_future.cancel() + # Wait for the task to actually stop + try: + task_future.result(timeout=5) # 5 second timeout + except Exception: + pass # Task was cancelled, which is expected + removed_count += 1 + + return { + 'status': 'success', + 'message': f'Removed {removed_count} workers', + 'previous_count': current_count, + 'current_count': current_count - removed_count } def get_worker_status(): - """Get status information about workers""" + """Get status information about async workers""" return { - 'worker_type': 'async' if USE_ASYNC_WORKERS else 'sync', + 'worker_type': 'async', 'worker_count': get_worker_count(), 'running_uuids': get_running_uuids(), - 'async_loop_running': async_loop is not None if USE_ASYNC_WORKERS else None, + 'async_loop_running': async_loop is not None, } def check_worker_health(expected_count, update_q=None, notification_q=None, app=None, datastore=None): """ - Check if the expected number of workers are running and restart any missing ones. + Check if the expected number of async workers are running and restart any missing ones. 
Args: expected_count: Expected number of workers @@ -338,7 +276,7 @@ def check_worker_health(expected_count, update_q=None, notification_q=None, app= Returns: dict: Health check results """ - global running_async_tasks, running_update_threads + global running_async_tasks current_count = get_worker_count() @@ -347,62 +285,53 @@ def check_worker_health(expected_count, update_q=None, notification_q=None, app= 'status': 'healthy', 'expected_count': expected_count, 'actual_count': current_count, - 'message': f'All {expected_count} workers running' + 'message': f'All {expected_count} async workers running' } - if USE_ASYNC_WORKERS: - # Check for crashed async workers - dead_workers = [] - alive_count = 0 + # Check for crashed async workers + dead_workers = [] + alive_count = 0 + + for i, task_future in enumerate(running_async_tasks[:]): + if task_future.done(): + try: + result = task_future.result() + dead_workers.append(i) + logger.warning(f"Async worker {i} completed unexpectedly") + except Exception as e: + dead_workers.append(i) + logger.error(f"Async worker {i} crashed: {e}") + else: + alive_count += 1 + + # Remove dead workers from tracking + for i in reversed(dead_workers): + if i < len(running_async_tasks): + running_async_tasks.pop(i) + + missing_workers = expected_count - alive_count + restarted_count = 0 + + if missing_workers > 0 and all([update_q, notification_q, app, datastore]): + logger.info(f"Restarting {missing_workers} crashed async workers") - for i, task_future in enumerate(running_async_tasks[:]): - if task_future.done(): - try: - result = task_future.result() - dead_workers.append(i) - logger.warning(f"Async worker {i} completed unexpectedly") - except Exception as e: - dead_workers.append(i) - logger.error(f"Async worker {i} crashed: {e}") - else: - alive_count += 1 - - # Remove dead workers from tracking - for i in reversed(dead_workers): - if i < len(running_async_tasks): - running_async_tasks.pop(i) - - missing_workers = expected_count - alive_count - restarted_count = 0 - - if missing_workers > 0 and all([update_q, notification_q, app, datastore]): - logger.info(f"Restarting {missing_workers} crashed async workers") - - for i in range(missing_workers): - worker_id = alive_count + i - try: - task_future = asyncio.run_coroutine_threadsafe( - start_single_async_worker(worker_id, update_q, notification_q, app, datastore), - async_loop - ) - running_async_tasks.append(task_future) - restarted_count += 1 - except Exception as e: - logger.error(f"Failed to restart worker {worker_id}: {e}") - - return { - 'status': 'repaired' if restarted_count > 0 else 'degraded', - 'expected_count': expected_count, - 'actual_count': alive_count, - 'dead_workers': len(dead_workers), - 'restarted_workers': restarted_count, - 'message': f'Found {len(dead_workers)} dead workers, restarted {restarted_count}' - } - else: - # For sync workers, just report the issue (harder to auto-restart) - return { - 'status': 'degraded', - 'expected_count': expected_count, - 'actual_count': current_count, - 'message': f'Worker count mismatch: expected {expected_count}, got {current_count}' - } \ No newline at end of file + for i in range(missing_workers): + worker_id = alive_count + i + try: + task_future = asyncio.run_coroutine_threadsafe( + start_single_async_worker(worker_id, update_q, notification_q, app, datastore), + async_loop + ) + running_async_tasks.append(task_future) + restarted_count += 1 + except Exception as e: + logger.error(f"Failed to restart worker {worker_id}: {e}") + + return { + 'status': 
'repaired' if restarted_count > 0 else 'degraded', + 'expected_count': expected_count, + 'actual_count': alive_count, + 'dead_workers': len(dead_workers), + 'restarted_workers': restarted_count, + 'message': f'Found {len(dead_workers)} dead workers, restarted {restarted_count}' + } \ No newline at end of file
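
Two usage sketches follow, assuming only the APIs this patch introduces; the wrapper function, the concrete update_q/notification_q/app/datastore objects and the example watch UUID are hypothetical stand-ins for the objects flask_app already wires up.

First, the extracted notification service: one NotificationService instance, built via the create_notification_service() factory added in changedetectionio/notification_service.py, replaces the old throwaway update_worker(None, notification_q, None, datastore) pattern for all three notification entry points.

    from changedetectionio.notification_service import create_notification_service

    def notify_watch_events(datastore, notification_q, watch_uuid, step_n=0):
        # Hypothetical wrapper: a lightweight service bound to the live
        # datastore and the shared notification queue; no thread object is
        # constructed just to reach the notification methods.
        service = create_notification_service(datastore, notification_q)

        # The same three entry points the async helpers now delegate to:
        service.send_content_changed_notification(watch_uuid)
        service.send_filter_failure_notification(watch_uuid)
        service.send_step_failure_notification(watch_uuid, step_n)

Second, the async-only worker_handler: adjust_async_worker_count() returns a status dict ('no_change', 'error' or 'success'), so a caller can scale the pool and report the outcome in one step, provided the async loop has been started via start_workers().

    from loguru import logger

    from changedetectionio import worker_handler

    result = worker_handler.adjust_async_worker_count(
        new_count=8,
        update_q=update_q,              # async queue consumed by the workers
        notification_q=notification_q,  # shared notification queue
        app=app,
        datastore=datastore,
    )
    if result['status'] != 'success':
        logger.warning(f"Worker scaling: {result['message']}")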