diff --git a/changedetectionio/__init__.py b/changedetectionio/__init__.py index 57890b7c..65bc3484 100644 --- a/changedetectionio/__init__.py +++ b/changedetectionio/__init__.py @@ -35,13 +35,22 @@ def sigshutdown_handler(_signo, _stack_frame): app.config.exit.set() datastore.stop_thread = True - # Shutdown workers immediately + # Shutdown workers and queues immediately try: from changedetectionio import worker_handler worker_handler.shutdown_workers() except Exception as e: logger.error(f"Error shutting down workers: {str(e)}") + # Close janus queues properly + try: + from changedetectionio.flask_app import update_q, notification_q + update_q.close() + notification_q.close() + logger.debug("Janus queues closed successfully") + except Exception as e: + logger.critical(f"CRITICAL: Failed to close janus queues: {e}") + # Shutdown socketio server fast from changedetectionio.flask_app import socketio_server if socketio_server and hasattr(socketio_server, 'shutdown'): diff --git a/changedetectionio/async_update_worker.py b/changedetectionio/async_update_worker.py index c1f5f338..34c663b7 100644 --- a/changedetectionio/async_update_worker.py +++ b/changedetectionio/async_update_worker.py @@ -7,6 +7,7 @@ from changedetectionio.flask_app import watch_check_update import asyncio import importlib import os +import queue import time from loguru import logger @@ -37,13 +38,23 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore): watch = None try: - # Use asyncio wait_for to make queue.get() cancellable - queued_item_data = await asyncio.wait_for(q.get(), timeout=1.0) + # Use native janus async interface - no threads needed! + queued_item_data = await asyncio.wait_for(q.async_get(), timeout=1.0) + except asyncio.TimeoutError: # No jobs available, continue loop continue except Exception as e: - logger.error(f"Worker {worker_id} error getting queue item: {e}") + logger.critical(f"CRITICAL: Worker {worker_id} failed to get queue item: {type(e).__name__}: {e}") + + # Log queue health for debugging + try: + queue_size = q.qsize() + is_empty = q.empty() + logger.critical(f"CRITICAL: Worker {worker_id} queue health - size: {queue_size}, empty: {is_empty}") + except Exception as health_e: + logger.critical(f"CRITICAL: Worker {worker_id} queue health check failed: {health_e}") + await asyncio.sleep(0.1) continue diff --git a/changedetectionio/flask_app.py b/changedetectionio/flask_app.py index 252c7100..14fd3db6 100644 --- a/changedetectionio/flask_app.py +++ b/changedetectionio/flask_app.py @@ -12,7 +12,7 @@ from blinker import signal from changedetectionio.strtobool import strtobool from threading import Event -from changedetectionio.custom_queue import SignalPriorityQueue, AsyncSignalPriorityQueue, NotificationQueue +from changedetectionio.queue_handlers import ReliablePriorityQueue, ReliableNotificationQueue from changedetectionio import worker_handler from flask import ( @@ -48,9 +48,9 @@ datastore = None ticker_thread = None extra_stylesheets = [] -# Use async queue by default, keep sync for backward compatibility -update_q = AsyncSignalPriorityQueue() if worker_handler.USE_ASYNC_WORKERS else SignalPriorityQueue() -notification_q = NotificationQueue() +# Use bulletproof janus-based queues for sync/async reliability +update_q = ReliablePriorityQueue() +notification_q = ReliableNotificationQueue() MAX_QUEUE_SIZE = 2000 app = Flask(__name__, @@ -844,16 +844,22 @@ def ticker_thread_check_time_launch_checks(): # Use Epoch time as priority, so we get a "sorted" PriorityQueue, but we can still push a priority 1 into it. priority = int(time.time()) - logger.debug( - f"> Queued watch UUID {uuid} " - f"last checked at {watch['last_checked']} " - f"queued at {now:0.2f} priority {priority} " - f"jitter {watch.jitter_seconds:0.2f}s, " - f"{now - watch['last_checked']:0.2f}s since last checked") # Into the queue with you - worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=priority, item={'uuid': uuid})) - + queued_successfully = worker_handler.queue_item_async_safe(update_q, + queuedWatchMetaData.PrioritizedItem(priority=priority, + item={'uuid': uuid}) + ) + if queued_successfully: + logger.debug( + f"> Queued watch UUID {uuid} " + f"last checked at {watch['last_checked']} " + f"queued at {now:0.2f} priority {priority} " + f"jitter {watch.jitter_seconds:0.2f}s, " + f"{now - watch['last_checked']:0.2f}s since last checked") + else: + logger.critical(f"CRITICAL: Failed to queue watch UUID {uuid} in ticker thread!") + # Reset for next time watch.jitter_seconds = 0 diff --git a/changedetectionio/queue_handlers.py b/changedetectionio/queue_handlers.py new file mode 100644 index 00000000..d9554f4a --- /dev/null +++ b/changedetectionio/queue_handlers.py @@ -0,0 +1,407 @@ +import heapq +import threading +from typing import Dict, List, Any, Optional +from blinker import signal +from loguru import logger + +try: + import janus +except ImportError: + logger.critical("CRITICAL: janus library is required. Install with: pip install janus") + raise + + +class ReliablePriorityQueue: + """ + Ultra-reliable priority queue using janus for async/sync bridging. + + Minimal implementation focused on reliability: + - Pure janus for sync/async bridge + - Thread-safe priority ordering + - Bulletproof error handling with critical logging + """ + + def __init__(self, maxsize: int = 0): + try: + self._janus_queue = janus.Queue(maxsize=maxsize) + self.sync_q = self._janus_queue.sync_q # For sync contexts (ticker) + self.async_q = self._janus_queue.async_q # For async contexts (workers) + + # Priority storage - thread-safe + self._priority_items = [] + self._lock = threading.RLock() + + # Signals for UI updates + self.queue_length_signal = signal('queue_length') + + logger.debug("ReliablePriorityQueue initialized successfully") + except Exception as e: + logger.critical(f"CRITICAL: Failed to initialize ReliablePriorityQueue: {e}") + raise + + # SYNC INTERFACE (for ticker thread) + def put(self, item, block: bool = True, timeout: Optional[float] = None): + """Thread-safe sync put with priority ordering""" + try: + # Add to priority storage + with self._lock: + heapq.heappush(self._priority_items, item) + + # Notify via janus sync queue + self.sync_q.put(True, block=block, timeout=timeout) + + # Emit signals + self._emit_put_signals(item) + + logger.debug(f"Successfully queued item: {self._get_item_uuid(item)}") + return True + + except Exception as e: + logger.critical(f"CRITICAL: Failed to put item {self._get_item_uuid(item)}: {e}") + # Remove from priority storage if janus put failed + try: + with self._lock: + if item in self._priority_items: + self._priority_items.remove(item) + heapq.heapify(self._priority_items) + except Exception as cleanup_e: + logger.critical(f"CRITICAL: Failed to cleanup after put failure: {cleanup_e}") + return False + + def get(self, block: bool = True, timeout: Optional[float] = None): + """Thread-safe sync get with priority ordering""" + try: + # Wait for notification + self.sync_q.get(block=block, timeout=timeout) + + # Get highest priority item + with self._lock: + if not self._priority_items: + logger.critical("CRITICAL: Queue notification received but no priority items available") + raise Exception("Priority queue inconsistency") + item = heapq.heappop(self._priority_items) + + # Emit signals + self._emit_get_signals() + + logger.debug(f"Successfully retrieved item: {self._get_item_uuid(item)}") + return item + + except Exception as e: + logger.critical(f"CRITICAL: Failed to get item from queue: {e}") + raise + + # ASYNC INTERFACE (for workers) + async def async_put(self, item): + """Pure async put with priority ordering""" + try: + # Add to priority storage + with self._lock: + heapq.heappush(self._priority_items, item) + + # Notify via janus async queue + await self.async_q.put(True) + + # Emit signals + self._emit_put_signals(item) + + logger.debug(f"Successfully async queued item: {self._get_item_uuid(item)}") + return True + + except Exception as e: + logger.critical(f"CRITICAL: Failed to async put item {self._get_item_uuid(item)}: {e}") + # Remove from priority storage if janus put failed + try: + with self._lock: + if item in self._priority_items: + self._priority_items.remove(item) + heapq.heapify(self._priority_items) + except Exception as cleanup_e: + logger.critical(f"CRITICAL: Failed to cleanup after async put failure: {cleanup_e}") + return False + + async def async_get(self): + """Pure async get with priority ordering""" + try: + # Wait for notification + await self.async_q.get() + + # Get highest priority item + with self._lock: + if not self._priority_items: + logger.critical("CRITICAL: Async queue notification received but no priority items available") + raise Exception("Priority queue inconsistency") + item = heapq.heappop(self._priority_items) + + # Emit signals + self._emit_get_signals() + + logger.debug(f"Successfully async retrieved item: {self._get_item_uuid(item)}") + return item + + except Exception as e: + logger.critical(f"CRITICAL: Failed to async get item from queue: {e}") + raise + + # UTILITY METHODS + def qsize(self) -> int: + """Get current queue size""" + try: + with self._lock: + return len(self._priority_items) + except Exception as e: + logger.critical(f"CRITICAL: Failed to get queue size: {e}") + return 0 + + def empty(self) -> bool: + """Check if queue is empty""" + return self.qsize() == 0 + + def close(self): + """Close the janus queue""" + try: + self._janus_queue.close() + logger.debug("ReliablePriorityQueue closed successfully") + except Exception as e: + logger.critical(f"CRITICAL: Failed to close ReliablePriorityQueue: {e}") + + # COMPATIBILITY METHODS (from original implementation) + @property + def queue(self): + """Provide compatibility with original queue access""" + try: + with self._lock: + return list(self._priority_items) + except Exception as e: + logger.critical(f"CRITICAL: Failed to get queue list: {e}") + return [] + + def get_uuid_position(self, target_uuid: str) -> Dict[str, Any]: + """Find position of UUID in queue""" + try: + with self._lock: + queue_list = list(self._priority_items) + total_items = len(queue_list) + + if total_items == 0: + return {'position': None, 'total_items': 0, 'priority': None, 'found': False} + + # Find target item + for item in queue_list: + if (hasattr(item, 'item') and isinstance(item.item, dict) and + item.item.get('uuid') == target_uuid): + + # Count items with higher priority + position = sum(1 for other in queue_list if other.priority < item.priority) + return { + 'position': position, + 'total_items': total_items, + 'priority': item.priority, + 'found': True + } + + return {'position': None, 'total_items': total_items, 'priority': None, 'found': False} + + except Exception as e: + logger.critical(f"CRITICAL: Failed to get UUID position for {target_uuid}: {e}") + return {'position': None, 'total_items': 0, 'priority': None, 'found': False} + + def get_all_queued_uuids(self, limit: Optional[int] = None, offset: int = 0) -> Dict[str, Any]: + """Get all queued UUIDs with pagination""" + try: + with self._lock: + queue_list = sorted(self._priority_items) # Sort by priority + total_items = len(queue_list) + + if total_items == 0: + return {'items': [], 'total_items': 0, 'returned_items': 0, 'has_more': False} + + # Apply pagination + end_idx = min(offset + limit, total_items) if limit else total_items + items_to_process = queue_list[offset:end_idx] + + result = [] + for position, item in enumerate(items_to_process, start=offset): + if (hasattr(item, 'item') and isinstance(item.item, dict) and + 'uuid' in item.item): + result.append({ + 'uuid': item.item['uuid'], + 'position': position, + 'priority': item.priority + }) + + return { + 'items': result, + 'total_items': total_items, + 'returned_items': len(result), + 'has_more': (offset + len(result)) < total_items + } + + except Exception as e: + logger.critical(f"CRITICAL: Failed to get all queued UUIDs: {e}") + return {'items': [], 'total_items': 0, 'returned_items': 0, 'has_more': False} + + def get_queue_summary(self) -> Dict[str, Any]: + """Get queue summary statistics""" + try: + with self._lock: + queue_list = list(self._priority_items) + total_items = len(queue_list) + + if total_items == 0: + return { + 'total_items': 0, 'priority_breakdown': {}, + 'immediate_items': 0, 'clone_items': 0, 'scheduled_items': 0 + } + + immediate_items = clone_items = scheduled_items = 0 + priority_counts = {} + + for item in queue_list: + priority = item.priority + priority_counts[priority] = priority_counts.get(priority, 0) + 1 + + if priority == 1: + immediate_items += 1 + elif priority == 5: + clone_items += 1 + elif priority > 100: + scheduled_items += 1 + + return { + 'total_items': total_items, + 'priority_breakdown': priority_counts, + 'immediate_items': immediate_items, + 'clone_items': clone_items, + 'scheduled_items': scheduled_items, + 'min_priority': min(priority_counts.keys()) if priority_counts else None, + 'max_priority': max(priority_counts.keys()) if priority_counts else None + } + + except Exception as e: + logger.critical(f"CRITICAL: Failed to get queue summary: {e}") + return {'total_items': 0, 'priority_breakdown': {}, 'immediate_items': 0, + 'clone_items': 0, 'scheduled_items': 0} + + # PRIVATE METHODS + def _get_item_uuid(self, item) -> str: + """Safely extract UUID from item for logging""" + try: + if hasattr(item, 'item') and isinstance(item.item, dict): + return item.item.get('uuid', 'unknown') + except Exception: + pass + return 'unknown' + + def _emit_put_signals(self, item): + """Emit signals when item is added""" + try: + # Watch update signal + if hasattr(item, 'item') and isinstance(item.item, dict) and 'uuid' in item.item: + watch_check_update = signal('watch_check_update') + if watch_check_update: + watch_check_update.send(watch_uuid=item.item['uuid']) + + # Queue length signal + if self.queue_length_signal: + self.queue_length_signal.send(length=self.qsize()) + + except Exception as e: + logger.critical(f"CRITICAL: Failed to emit put signals: {e}") + + def _emit_get_signals(self): + """Emit signals when item is removed""" + try: + if self.queue_length_signal: + self.queue_length_signal.send(length=self.qsize()) + except Exception as e: + logger.critical(f"CRITICAL: Failed to emit get signals: {e}") + + +class ReliableNotificationQueue: + """ + Ultra-reliable notification queue using pure janus. + + Simple wrapper around janus with bulletproof error handling. + """ + + def __init__(self, maxsize: int = 0): + try: + self._janus_queue = janus.Queue(maxsize=maxsize) + self.sync_q = self._janus_queue.sync_q + self.async_q = self._janus_queue.async_q + self.notification_event_signal = signal('notification_event') + logger.debug("ReliableNotificationQueue initialized successfully") + except Exception as e: + logger.critical(f"CRITICAL: Failed to initialize ReliableNotificationQueue: {e}") + raise + + def put(self, item: Dict[str, Any], block: bool = True, timeout: Optional[float] = None): + """Thread-safe sync put with signal emission""" + try: + self.sync_q.put(item, block=block, timeout=timeout) + self._emit_notification_signal(item) + logger.debug(f"Successfully queued notification: {item.get('uuid', 'unknown')}") + return True + except Exception as e: + logger.critical(f"CRITICAL: Failed to put notification {item.get('uuid', 'unknown')}: {e}") + return False + + async def async_put(self, item: Dict[str, Any]): + """Pure async put with signal emission""" + try: + await self.async_q.put(item) + self._emit_notification_signal(item) + logger.debug(f"Successfully async queued notification: {item.get('uuid', 'unknown')}") + return True + except Exception as e: + logger.critical(f"CRITICAL: Failed to async put notification {item.get('uuid', 'unknown')}: {e}") + return False + + def get(self, block: bool = True, timeout: Optional[float] = None): + """Thread-safe sync get""" + try: + return self.sync_q.get(block=block, timeout=timeout) + except Exception as e: + logger.critical(f"CRITICAL: Failed to get notification: {e}") + raise + + async def async_get(self): + """Pure async get""" + try: + return await self.async_q.get() + except Exception as e: + logger.critical(f"CRITICAL: Failed to async get notification: {e}") + raise + + def qsize(self) -> int: + """Get current queue size""" + try: + return self.sync_q.qsize() + except Exception as e: + logger.critical(f"CRITICAL: Failed to get notification queue size: {e}") + return 0 + + def empty(self) -> bool: + """Check if queue is empty""" + return self.qsize() == 0 + + def close(self): + """Close the janus queue""" + try: + self._janus_queue.close() + logger.debug("ReliableNotificationQueue closed successfully") + except Exception as e: + logger.critical(f"CRITICAL: Failed to close ReliableNotificationQueue: {e}") + + def _emit_notification_signal(self, item: Dict[str, Any]): + """Emit notification signal""" + try: + if self.notification_event_signal and isinstance(item, dict): + watch_uuid = item.get('uuid') + if watch_uuid: + self.notification_event_signal.send(watch_uuid=watch_uuid) + else: + self.notification_event_signal.send() + except Exception as e: + logger.critical(f"CRITICAL: Failed to emit notification signal: {e}") \ No newline at end of file diff --git a/changedetectionio/worker_handler.py b/changedetectionio/worker_handler.py index 953d2354..3247bddf 100644 --- a/changedetectionio/worker_handler.py +++ b/changedetectionio/worker_handler.py @@ -188,15 +188,54 @@ def is_watch_running(watch_uuid): def queue_item_async_safe(update_q, item): - """Queue an item for async queue processing""" - if async_loop and not async_loop.is_closed(): + """Bulletproof queue operation with comprehensive error handling""" + item_uuid = 'unknown' + + try: + # Safely extract UUID for logging + if hasattr(item, 'item') and isinstance(item.item, dict): + item_uuid = item.item.get('uuid', 'unknown') + except Exception as uuid_e: + logger.critical(f"CRITICAL: Failed to extract UUID from queue item: {uuid_e}") + + # Validate inputs + if not update_q: + logger.critical(f"CRITICAL: Queue is None/invalid for item {item_uuid}") + return False + + if not item: + logger.critical(f"CRITICAL: Item is None/invalid") + return False + + # Attempt queue operation with multiple fallbacks + try: + # Primary: Use sync interface (thread-safe) + success = update_q.put(item, block=True, timeout=5.0) + if success is False: # Explicit False return means failure + logger.critical(f"CRITICAL: Queue.put() returned False for item {item_uuid}") + return False + + logger.debug(f"Successfully queued item: {item_uuid}") + return True + + except Exception as e: + logger.critical(f"CRITICAL: Exception during queue operation for item {item_uuid}: {type(e).__name__}: {e}") + + # Secondary: Attempt queue health check try: - # For async queue, schedule the put operation - asyncio.run_coroutine_threadsafe(update_q.put(item), async_loop) - except RuntimeError as e: - logger.error(f"Failed to queue item: {e}") - else: - logger.error("Async loop not available or closed for queueing item") + queue_size = update_q.qsize() + is_empty = update_q.empty() + logger.critical(f"CRITICAL: Queue health - size: {queue_size}, empty: {is_empty}") + except Exception as health_e: + logger.critical(f"CRITICAL: Queue health check failed: {health_e}") + + # Log queue type for debugging + try: + logger.critical(f"CRITICAL: Queue type: {type(update_q)}, has sync_q: {hasattr(update_q, 'sync_q')}") + except Exception: + logger.critical(f"CRITICAL: Cannot determine queue type") + + return False def shutdown_workers(): diff --git a/requirements.txt b/requirements.txt index 01e5fecd..4374bedc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,6 +7,7 @@ flask-paginate flask_expects_json~=1.7 flask_restful flask_cors # For the Chrome extension to operate +janus # Thread-safe async/sync queue bridge flask_wtf~=1.2 flask~=2.3 flask-socketio~=5.5.1