Slightly more reliable queue handlers

janus-queue
dgtlmoon 2025-08-10 18:21:56 +02:00
rodzic aa4e182549
commit f0588a9dd1
6 zmienionych plików z 497 dodań i 24 usunięć

Wyświetl plik

@ -35,13 +35,22 @@ def sigshutdown_handler(_signo, _stack_frame):
app.config.exit.set()
datastore.stop_thread = True
# Shutdown workers immediately
# Shutdown workers and queues immediately
try:
from changedetectionio import worker_handler
worker_handler.shutdown_workers()
except Exception as e:
logger.error(f"Error shutting down workers: {str(e)}")
# Close janus queues properly
try:
from changedetectionio.flask_app import update_q, notification_q
update_q.close()
notification_q.close()
logger.debug("Janus queues closed successfully")
except Exception as e:
logger.critical(f"CRITICAL: Failed to close janus queues: {e}")
# Shutdown socketio server fast
from changedetectionio.flask_app import socketio_server
if socketio_server and hasattr(socketio_server, 'shutdown'):

Wyświetl plik

@ -7,6 +7,7 @@ from changedetectionio.flask_app import watch_check_update
import asyncio
import importlib
import os
import queue
import time
from loguru import logger
@ -37,13 +38,23 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
watch = None
try:
# Use asyncio wait_for to make queue.get() cancellable
queued_item_data = await asyncio.wait_for(q.get(), timeout=1.0)
# Use native janus async interface - no threads needed!
queued_item_data = await asyncio.wait_for(q.async_get(), timeout=1.0)
except asyncio.TimeoutError:
# No jobs available, continue loop
continue
except Exception as e:
logger.error(f"Worker {worker_id} error getting queue item: {e}")
logger.critical(f"CRITICAL: Worker {worker_id} failed to get queue item: {type(e).__name__}: {e}")
# Log queue health for debugging
try:
queue_size = q.qsize()
is_empty = q.empty()
logger.critical(f"CRITICAL: Worker {worker_id} queue health - size: {queue_size}, empty: {is_empty}")
except Exception as health_e:
logger.critical(f"CRITICAL: Worker {worker_id} queue health check failed: {health_e}")
await asyncio.sleep(0.1)
continue

Wyświetl plik

@ -12,7 +12,7 @@ from blinker import signal
from changedetectionio.strtobool import strtobool
from threading import Event
from changedetectionio.custom_queue import SignalPriorityQueue, AsyncSignalPriorityQueue, NotificationQueue
from changedetectionio.queue_handlers import ReliablePriorityQueue, ReliableNotificationQueue
from changedetectionio import worker_handler
from flask import (
@ -48,9 +48,9 @@ datastore = None
ticker_thread = None
extra_stylesheets = []
# Use async queue by default, keep sync for backward compatibility
update_q = AsyncSignalPriorityQueue() if worker_handler.USE_ASYNC_WORKERS else SignalPriorityQueue()
notification_q = NotificationQueue()
# Use bulletproof janus-based queues for sync/async reliability
update_q = ReliablePriorityQueue()
notification_q = ReliableNotificationQueue()
MAX_QUEUE_SIZE = 2000
app = Flask(__name__,
@ -844,16 +844,22 @@ def ticker_thread_check_time_launch_checks():
# Use Epoch time as priority, so we get a "sorted" PriorityQueue, but we can still push a priority 1 into it.
priority = int(time.time())
logger.debug(
f"> Queued watch UUID {uuid} "
f"last checked at {watch['last_checked']} "
f"queued at {now:0.2f} priority {priority} "
f"jitter {watch.jitter_seconds:0.2f}s, "
f"{now - watch['last_checked']:0.2f}s since last checked")
# Into the queue with you
worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=priority, item={'uuid': uuid}))
queued_successfully = worker_handler.queue_item_async_safe(update_q,
queuedWatchMetaData.PrioritizedItem(priority=priority,
item={'uuid': uuid})
)
if queued_successfully:
logger.debug(
f"> Queued watch UUID {uuid} "
f"last checked at {watch['last_checked']} "
f"queued at {now:0.2f} priority {priority} "
f"jitter {watch.jitter_seconds:0.2f}s, "
f"{now - watch['last_checked']:0.2f}s since last checked")
else:
logger.critical(f"CRITICAL: Failed to queue watch UUID {uuid} in ticker thread!")
# Reset for next time
watch.jitter_seconds = 0

Wyświetl plik

@ -0,0 +1,407 @@
import heapq
import threading
from typing import Dict, List, Any, Optional
from blinker import signal
from loguru import logger
try:
import janus
except ImportError:
logger.critical("CRITICAL: janus library is required. Install with: pip install janus")
raise
class ReliablePriorityQueue:
"""
Ultra-reliable priority queue using janus for async/sync bridging.
Minimal implementation focused on reliability:
- Pure janus for sync/async bridge
- Thread-safe priority ordering
- Bulletproof error handling with critical logging
"""
def __init__(self, maxsize: int = 0):
try:
self._janus_queue = janus.Queue(maxsize=maxsize)
self.sync_q = self._janus_queue.sync_q # For sync contexts (ticker)
self.async_q = self._janus_queue.async_q # For async contexts (workers)
# Priority storage - thread-safe
self._priority_items = []
self._lock = threading.RLock()
# Signals for UI updates
self.queue_length_signal = signal('queue_length')
logger.debug("ReliablePriorityQueue initialized successfully")
except Exception as e:
logger.critical(f"CRITICAL: Failed to initialize ReliablePriorityQueue: {e}")
raise
# SYNC INTERFACE (for ticker thread)
def put(self, item, block: bool = True, timeout: Optional[float] = None):
"""Thread-safe sync put with priority ordering"""
try:
# Add to priority storage
with self._lock:
heapq.heappush(self._priority_items, item)
# Notify via janus sync queue
self.sync_q.put(True, block=block, timeout=timeout)
# Emit signals
self._emit_put_signals(item)
logger.debug(f"Successfully queued item: {self._get_item_uuid(item)}")
return True
except Exception as e:
logger.critical(f"CRITICAL: Failed to put item {self._get_item_uuid(item)}: {e}")
# Remove from priority storage if janus put failed
try:
with self._lock:
if item in self._priority_items:
self._priority_items.remove(item)
heapq.heapify(self._priority_items)
except Exception as cleanup_e:
logger.critical(f"CRITICAL: Failed to cleanup after put failure: {cleanup_e}")
return False
def get(self, block: bool = True, timeout: Optional[float] = None):
"""Thread-safe sync get with priority ordering"""
try:
# Wait for notification
self.sync_q.get(block=block, timeout=timeout)
# Get highest priority item
with self._lock:
if not self._priority_items:
logger.critical("CRITICAL: Queue notification received but no priority items available")
raise Exception("Priority queue inconsistency")
item = heapq.heappop(self._priority_items)
# Emit signals
self._emit_get_signals()
logger.debug(f"Successfully retrieved item: {self._get_item_uuid(item)}")
return item
except Exception as e:
logger.critical(f"CRITICAL: Failed to get item from queue: {e}")
raise
# ASYNC INTERFACE (for workers)
async def async_put(self, item):
"""Pure async put with priority ordering"""
try:
# Add to priority storage
with self._lock:
heapq.heappush(self._priority_items, item)
# Notify via janus async queue
await self.async_q.put(True)
# Emit signals
self._emit_put_signals(item)
logger.debug(f"Successfully async queued item: {self._get_item_uuid(item)}")
return True
except Exception as e:
logger.critical(f"CRITICAL: Failed to async put item {self._get_item_uuid(item)}: {e}")
# Remove from priority storage if janus put failed
try:
with self._lock:
if item in self._priority_items:
self._priority_items.remove(item)
heapq.heapify(self._priority_items)
except Exception as cleanup_e:
logger.critical(f"CRITICAL: Failed to cleanup after async put failure: {cleanup_e}")
return False
async def async_get(self):
"""Pure async get with priority ordering"""
try:
# Wait for notification
await self.async_q.get()
# Get highest priority item
with self._lock:
if not self._priority_items:
logger.critical("CRITICAL: Async queue notification received but no priority items available")
raise Exception("Priority queue inconsistency")
item = heapq.heappop(self._priority_items)
# Emit signals
self._emit_get_signals()
logger.debug(f"Successfully async retrieved item: {self._get_item_uuid(item)}")
return item
except Exception as e:
logger.critical(f"CRITICAL: Failed to async get item from queue: {e}")
raise
# UTILITY METHODS
def qsize(self) -> int:
"""Get current queue size"""
try:
with self._lock:
return len(self._priority_items)
except Exception as e:
logger.critical(f"CRITICAL: Failed to get queue size: {e}")
return 0
def empty(self) -> bool:
"""Check if queue is empty"""
return self.qsize() == 0
def close(self):
"""Close the janus queue"""
try:
self._janus_queue.close()
logger.debug("ReliablePriorityQueue closed successfully")
except Exception as e:
logger.critical(f"CRITICAL: Failed to close ReliablePriorityQueue: {e}")
# COMPATIBILITY METHODS (from original implementation)
@property
def queue(self):
"""Provide compatibility with original queue access"""
try:
with self._lock:
return list(self._priority_items)
except Exception as e:
logger.critical(f"CRITICAL: Failed to get queue list: {e}")
return []
def get_uuid_position(self, target_uuid: str) -> Dict[str, Any]:
"""Find position of UUID in queue"""
try:
with self._lock:
queue_list = list(self._priority_items)
total_items = len(queue_list)
if total_items == 0:
return {'position': None, 'total_items': 0, 'priority': None, 'found': False}
# Find target item
for item in queue_list:
if (hasattr(item, 'item') and isinstance(item.item, dict) and
item.item.get('uuid') == target_uuid):
# Count items with higher priority
position = sum(1 for other in queue_list if other.priority < item.priority)
return {
'position': position,
'total_items': total_items,
'priority': item.priority,
'found': True
}
return {'position': None, 'total_items': total_items, 'priority': None, 'found': False}
except Exception as e:
logger.critical(f"CRITICAL: Failed to get UUID position for {target_uuid}: {e}")
return {'position': None, 'total_items': 0, 'priority': None, 'found': False}
def get_all_queued_uuids(self, limit: Optional[int] = None, offset: int = 0) -> Dict[str, Any]:
"""Get all queued UUIDs with pagination"""
try:
with self._lock:
queue_list = sorted(self._priority_items) # Sort by priority
total_items = len(queue_list)
if total_items == 0:
return {'items': [], 'total_items': 0, 'returned_items': 0, 'has_more': False}
# Apply pagination
end_idx = min(offset + limit, total_items) if limit else total_items
items_to_process = queue_list[offset:end_idx]
result = []
for position, item in enumerate(items_to_process, start=offset):
if (hasattr(item, 'item') and isinstance(item.item, dict) and
'uuid' in item.item):
result.append({
'uuid': item.item['uuid'],
'position': position,
'priority': item.priority
})
return {
'items': result,
'total_items': total_items,
'returned_items': len(result),
'has_more': (offset + len(result)) < total_items
}
except Exception as e:
logger.critical(f"CRITICAL: Failed to get all queued UUIDs: {e}")
return {'items': [], 'total_items': 0, 'returned_items': 0, 'has_more': False}
def get_queue_summary(self) -> Dict[str, Any]:
"""Get queue summary statistics"""
try:
with self._lock:
queue_list = list(self._priority_items)
total_items = len(queue_list)
if total_items == 0:
return {
'total_items': 0, 'priority_breakdown': {},
'immediate_items': 0, 'clone_items': 0, 'scheduled_items': 0
}
immediate_items = clone_items = scheduled_items = 0
priority_counts = {}
for item in queue_list:
priority = item.priority
priority_counts[priority] = priority_counts.get(priority, 0) + 1
if priority == 1:
immediate_items += 1
elif priority == 5:
clone_items += 1
elif priority > 100:
scheduled_items += 1
return {
'total_items': total_items,
'priority_breakdown': priority_counts,
'immediate_items': immediate_items,
'clone_items': clone_items,
'scheduled_items': scheduled_items,
'min_priority': min(priority_counts.keys()) if priority_counts else None,
'max_priority': max(priority_counts.keys()) if priority_counts else None
}
except Exception as e:
logger.critical(f"CRITICAL: Failed to get queue summary: {e}")
return {'total_items': 0, 'priority_breakdown': {}, 'immediate_items': 0,
'clone_items': 0, 'scheduled_items': 0}
# PRIVATE METHODS
def _get_item_uuid(self, item) -> str:
"""Safely extract UUID from item for logging"""
try:
if hasattr(item, 'item') and isinstance(item.item, dict):
return item.item.get('uuid', 'unknown')
except Exception:
pass
return 'unknown'
def _emit_put_signals(self, item):
"""Emit signals when item is added"""
try:
# Watch update signal
if hasattr(item, 'item') and isinstance(item.item, dict) and 'uuid' in item.item:
watch_check_update = signal('watch_check_update')
if watch_check_update:
watch_check_update.send(watch_uuid=item.item['uuid'])
# Queue length signal
if self.queue_length_signal:
self.queue_length_signal.send(length=self.qsize())
except Exception as e:
logger.critical(f"CRITICAL: Failed to emit put signals: {e}")
def _emit_get_signals(self):
"""Emit signals when item is removed"""
try:
if self.queue_length_signal:
self.queue_length_signal.send(length=self.qsize())
except Exception as e:
logger.critical(f"CRITICAL: Failed to emit get signals: {e}")
class ReliableNotificationQueue:
"""
Ultra-reliable notification queue using pure janus.
Simple wrapper around janus with bulletproof error handling.
"""
def __init__(self, maxsize: int = 0):
try:
self._janus_queue = janus.Queue(maxsize=maxsize)
self.sync_q = self._janus_queue.sync_q
self.async_q = self._janus_queue.async_q
self.notification_event_signal = signal('notification_event')
logger.debug("ReliableNotificationQueue initialized successfully")
except Exception as e:
logger.critical(f"CRITICAL: Failed to initialize ReliableNotificationQueue: {e}")
raise
def put(self, item: Dict[str, Any], block: bool = True, timeout: Optional[float] = None):
"""Thread-safe sync put with signal emission"""
try:
self.sync_q.put(item, block=block, timeout=timeout)
self._emit_notification_signal(item)
logger.debug(f"Successfully queued notification: {item.get('uuid', 'unknown')}")
return True
except Exception as e:
logger.critical(f"CRITICAL: Failed to put notification {item.get('uuid', 'unknown')}: {e}")
return False
async def async_put(self, item: Dict[str, Any]):
"""Pure async put with signal emission"""
try:
await self.async_q.put(item)
self._emit_notification_signal(item)
logger.debug(f"Successfully async queued notification: {item.get('uuid', 'unknown')}")
return True
except Exception as e:
logger.critical(f"CRITICAL: Failed to async put notification {item.get('uuid', 'unknown')}: {e}")
return False
def get(self, block: bool = True, timeout: Optional[float] = None):
"""Thread-safe sync get"""
try:
return self.sync_q.get(block=block, timeout=timeout)
except Exception as e:
logger.critical(f"CRITICAL: Failed to get notification: {e}")
raise
async def async_get(self):
"""Pure async get"""
try:
return await self.async_q.get()
except Exception as e:
logger.critical(f"CRITICAL: Failed to async get notification: {e}")
raise
def qsize(self) -> int:
"""Get current queue size"""
try:
return self.sync_q.qsize()
except Exception as e:
logger.critical(f"CRITICAL: Failed to get notification queue size: {e}")
return 0
def empty(self) -> bool:
"""Check if queue is empty"""
return self.qsize() == 0
def close(self):
"""Close the janus queue"""
try:
self._janus_queue.close()
logger.debug("ReliableNotificationQueue closed successfully")
except Exception as e:
logger.critical(f"CRITICAL: Failed to close ReliableNotificationQueue: {e}")
def _emit_notification_signal(self, item: Dict[str, Any]):
"""Emit notification signal"""
try:
if self.notification_event_signal and isinstance(item, dict):
watch_uuid = item.get('uuid')
if watch_uuid:
self.notification_event_signal.send(watch_uuid=watch_uuid)
else:
self.notification_event_signal.send()
except Exception as e:
logger.critical(f"CRITICAL: Failed to emit notification signal: {e}")

Wyświetl plik

@ -188,15 +188,54 @@ def is_watch_running(watch_uuid):
def queue_item_async_safe(update_q, item):
"""Queue an item for async queue processing"""
if async_loop and not async_loop.is_closed():
"""Bulletproof queue operation with comprehensive error handling"""
item_uuid = 'unknown'
try:
# Safely extract UUID for logging
if hasattr(item, 'item') and isinstance(item.item, dict):
item_uuid = item.item.get('uuid', 'unknown')
except Exception as uuid_e:
logger.critical(f"CRITICAL: Failed to extract UUID from queue item: {uuid_e}")
# Validate inputs
if not update_q:
logger.critical(f"CRITICAL: Queue is None/invalid for item {item_uuid}")
return False
if not item:
logger.critical(f"CRITICAL: Item is None/invalid")
return False
# Attempt queue operation with multiple fallbacks
try:
# Primary: Use sync interface (thread-safe)
success = update_q.put(item, block=True, timeout=5.0)
if success is False: # Explicit False return means failure
logger.critical(f"CRITICAL: Queue.put() returned False for item {item_uuid}")
return False
logger.debug(f"Successfully queued item: {item_uuid}")
return True
except Exception as e:
logger.critical(f"CRITICAL: Exception during queue operation for item {item_uuid}: {type(e).__name__}: {e}")
# Secondary: Attempt queue health check
try:
# For async queue, schedule the put operation
asyncio.run_coroutine_threadsafe(update_q.put(item), async_loop)
except RuntimeError as e:
logger.error(f"Failed to queue item: {e}")
else:
logger.error("Async loop not available or closed for queueing item")
queue_size = update_q.qsize()
is_empty = update_q.empty()
logger.critical(f"CRITICAL: Queue health - size: {queue_size}, empty: {is_empty}")
except Exception as health_e:
logger.critical(f"CRITICAL: Queue health check failed: {health_e}")
# Log queue type for debugging
try:
logger.critical(f"CRITICAL: Queue type: {type(update_q)}, has sync_q: {hasattr(update_q, 'sync_q')}")
except Exception:
logger.critical(f"CRITICAL: Cannot determine queue type")
return False
def shutdown_workers():

Wyświetl plik

@ -7,6 +7,7 @@ flask-paginate
flask_expects_json~=1.7
flask_restful
flask_cors # For the Chrome extension to operate
janus # Thread-safe async/sync queue bridge
flask_wtf~=1.2
flask~=2.3
flask-socketio~=5.5.1