2025-05-26 18:12:32 +00:00
|
|
|
import queue
|
2025-06-03 08:17:19 +00:00
|
|
|
import asyncio
|
2025-05-26 18:12:32 +00:00
|
|
|
from blinker import signal
|
|
|
|
from loguru import logger
|
|
|
|
|
|
|
|
class SignalPriorityQueue(queue.PriorityQueue):
    """
    Extended PriorityQueue that sends blinker signals as items are added/removed.

    After an item is put into the queue, a ``watch_check_update`` signal is sent
    with the item's UUID when the item carries one (``item.item['uuid']``).
    After every ``put`` and ``get`` a ``queue_length`` signal is sent with the
    current queue size.

    The inspection helpers (``get_uuid_position``, ``get_all_queued_uuids`` and
    ``get_queue_summary``) read ``item.priority`` on queued items, so queued
    items are expected to expose a ``priority`` attribute.
    """

    def __init__(self, maxsize=0):
        super().__init__(maxsize)
        # Default to None so put()/get() simply skip the queue_length signal
        # when the lookup below fails, instead of hitting a missing attribute.
        self.queue_length_signal = None
        try:
            self.queue_length_signal = signal('queue_length')
        except Exception as e:
            logger.critical(f"Exception: {e}")

    @staticmethod
    def _payload(entry):
        """Return ``entry.item`` when it is a dict, otherwise ``None``."""
        payload = getattr(entry, 'item', None)
        return payload if isinstance(payload, dict) else None

    def _send_queue_length(self):
        """Best-effort emission of the ``queue_length`` signal (failures are logged)."""
        try:
            if self.queue_length_signal:
                self.queue_length_signal.send(length=self.qsize())
        except Exception as e:
            logger.critical(f"Exception: {e}")

    def put(self, item, block=True, timeout=None):
        """
        Put ``item`` into the queue, then emit the update signals.

        Args:
            item: The object to enqueue.
            block: Passed through to ``queue.PriorityQueue.put``.
            timeout: Passed through to ``queue.PriorityQueue.put``.
        """
        # Enqueue first so receivers observe the item as already queued.
        super().put(item, block, timeout)

        # Announce the watch UUID when the item carries one.
        payload = self._payload(item)
        if payload is not None and 'uuid' in payload:
            watch_check_update = signal('watch_check_update')
            if watch_check_update:
                watch_check_update.send(watch_uuid=payload['uuid'])

        self._send_queue_length()

    def get(self, block=True, timeout=None):
        """
        Remove and return the next item, then emit the ``queue_length`` signal.

        Args:
            block: Passed through to ``queue.PriorityQueue.get``.
            timeout: Passed through to ``queue.PriorityQueue.get``.

        Returns:
            The item removed from the queue.
        """
        item = super().get(block, timeout)
        self._send_queue_length()
        return item

    def get_uuid_position(self, target_uuid):
        """
        Find the position of a watch UUID in the priority queue.

        Runs in O(n): one pass to locate the item, one pass to count the items
        ahead of it. No sorting is performed.

        Args:
            target_uuid: The UUID to search for

        Returns:
            dict: Always a dict with the keys
                - position: 0-based position in queue (0 = next to be processed),
                  or None when the UUID is not queued. Every other item with the
                  same priority is counted as being ahead, so the value is an
                  upper bound when priorities tie.
                - total_items: total number of items in queue
                - priority: the priority value of the found item (None if not found)
                - found: whether the UUID is currently queued
        """
        # Only the copy needs the lock; the snapshot is analysed afterwards so
        # producers and consumers are not blocked while we scan.
        with self.mutex:
            snapshot = list(self.queue)
        total_items = len(snapshot)

        target_item = None
        for candidate in snapshot:
            payload = self._payload(candidate)
            if payload is not None and payload.get('uuid') == target_uuid:
                target_item = candidate
                break

        if target_item is None:
            return {
                'position': None,
                'total_items': total_items,
                'priority': None,
                'found': False
            }

        target_priority = target_item.priority
        # Lower priority numbers are processed first. The target is excluded by
        # identity: an equality test would also exclude every same-priority item
        # when the item type compares by priority only.
        position = sum(
            1 for other in snapshot
            if other is not target_item and other.priority <= target_priority
        )

        return {
            'position': position,
            'total_items': total_items,
            'priority': target_priority,
            'found': True
        }

    def get_all_queued_uuids(self, limit=None, offset=0):
        """
        Get UUIDs currently in the queue with their positions.
        For large queues, use limit/offset for pagination.

        When ``limit`` is given and is at most 100 the queue snapshot is sorted
        and exact positions are returned (a ``limit`` of 0 is treated as
        "no limit" in this mode). Otherwise items are returned in heap order
        and each position is the number of queued items with a strictly lower
        priority value.

        Args:
            limit: Maximum number of items to return (None = all)
            offset: Number of items to skip (for pagination)

        Returns:
            dict: Contains items and metadata
                - items: List of dicts with uuid, position, and priority
                - total_items: Total number of items in queue
                - returned_items: Number of items returned
                - has_more: Whether there are more items after this page
                - note: Only present when positions are approximate
        """
        import bisect

        with self.mutex:
            snapshot = list(self.queue)
        total_items = len(snapshot)

        if total_items == 0:
            return {
                'items': [],
                'total_items': 0,
                'returned_items': 0,
                'has_more': False
            }

        # For very large queues, warn about performance
        if total_items > 1000 and limit is None:
            logger.warning(f"Getting all {total_items} queued items without limit - this may be slow")

        result = []

        if limit is not None and limit <= 100:
            # Small page: a full sort is affordable and yields exact positions.
            ordered = sorted(snapshot)
            end_idx = min(offset + limit, total_items) if limit else total_items
            for position, entry in enumerate(ordered[offset:end_idx], start=offset):
                payload = self._payload(entry)
                if payload is not None and 'uuid' in payload:
                    result.append({
                        'uuid': payload['uuid'],
                        'position': position,
                        'priority': entry.priority
                    })

            return {
                'items': result,
                'total_items': total_items,
                'returned_items': len(result),
                'has_more': (offset + len(result)) < total_items
            }

        # Large/unbounded request: approximate positions. The priorities are
        # sorted once (lazily, on first use) and each position is a binary
        # search, instead of rescanning the whole queue for every item.
        sorted_priorities = None
        skipped = 0
        for entry in snapshot:
            payload = self._payload(entry)
            if payload is None or 'uuid' not in payload:
                continue
            if skipped < offset:
                skipped += 1
                continue
            if limit and len(result) >= limit:
                break
            if sorted_priorities is None:
                sorted_priorities = sorted(other.priority for other in snapshot)
            result.append({
                'uuid': payload['uuid'],
                # Number of queued items with a strictly lower priority value.
                'position': bisect.bisect_left(sorted_priorities, entry.priority),
                'priority': entry.priority
            })

        return {
            'items': result,
            'total_items': total_items,
            'returned_items': len(result),
            'has_more': (offset + len(result)) < total_items,
            'note': 'Positions are approximate for performance with large queues'
        }

    def get_queue_summary(self):
        """
        Get a quick summary of queue state without expensive operations.
        O(n) complexity - a single pass over a snapshot of the queue.

        Returns:
            dict: Queue summary statistics
                - total_items: number of queued items
                - priority_breakdown: mapping of priority value -> item count
                - immediate_items: items with priority 1
                - clone_items: items with priority 5
                - scheduled_items: items with priority > 100
                - min_priority / max_priority: only present when the queue is
                  not empty
        """
        with self.mutex:
            snapshot = list(self.queue)
        total_items = len(snapshot)

        if total_items == 0:
            return {
                'total_items': 0,
                'priority_breakdown': {},
                'immediate_items': 0,
                'clone_items': 0,
                'scheduled_items': 0
            }

        immediate_items = 0  # priority 1
        clone_items = 0  # priority 5
        scheduled_items = 0  # priority > 100 (timestamps)
        priority_counts = {}

        for entry in snapshot:
            priority = entry.priority
            priority_counts[priority] = priority_counts.get(priority, 0) + 1

            if priority == 1:
                immediate_items += 1
            elif priority == 5:
                clone_items += 1
            elif priority > 100:
                scheduled_items += 1

        return {
            'total_items': total_items,
            'priority_breakdown': priority_counts,
            'immediate_items': immediate_items,
            'clone_items': clone_items,
            'scheduled_items': scheduled_items,
            'min_priority': min(priority_counts),
            'max_priority': max(priority_counts)
        }
|
|
|
|
|
|
|
|
|
|
|
|
class AsyncSignalPriorityQueue(asyncio.PriorityQueue):
    """
    Async priority queue that sends blinker signals when items are added/removed.

    After an item is put into the queue, a ``watch_check_update`` signal is sent
    with the item's UUID when the item carries one (``item.item['uuid']``).
    After every ``put`` and ``get`` a ``queue_length`` signal is sent with the
    current queue size.

    The inspection helpers (``get_uuid_position``, ``get_all_queued_uuids`` and
    ``get_queue_summary``) read ``item.priority`` on queued items, so queued
    items are expected to expose a ``priority`` attribute.
    """

    def __init__(self, maxsize=0):
        super().__init__(maxsize)
        # Default to None so put()/get() simply skip the queue_length signal
        # when the lookup below fails, instead of hitting a missing attribute.
        self.queue_length_signal = None
        try:
            self.queue_length_signal = signal('queue_length')
        except Exception as e:
            logger.critical(f"Exception: {e}")

    @staticmethod
    def _payload(entry):
        """Return ``entry.item`` when it is a dict, otherwise ``None``."""
        payload = getattr(entry, 'item', None)
        return payload if isinstance(payload, dict) else None

    def _send_queue_length(self):
        """Best-effort emission of the ``queue_length`` signal (failures are logged)."""
        try:
            if self.queue_length_signal:
                self.queue_length_signal.send(length=self.qsize())
        except Exception as e:
            logger.critical(f"Exception: {e}")

    async def put(self, item):
        """
        Put ``item`` into the queue, then emit the update signals.

        Args:
            item: The object to enqueue.
        """
        # Enqueue first so receivers observe the item as already queued.
        await super().put(item)

        # Announce the watch UUID when the item carries one.
        payload = self._payload(item)
        if payload is not None and 'uuid' in payload:
            watch_check_update = signal('watch_check_update')
            if watch_check_update:
                watch_check_update.send(watch_uuid=payload['uuid'])

        self._send_queue_length()

    async def get(self):
        """
        Remove and return the next item, then emit the ``queue_length`` signal.

        Returns:
            The item removed from the queue.
        """
        item = await super().get()
        self._send_queue_length()
        return item

    @property
    def queue(self):
        """
        Provide compatibility with sync PriorityQueue.queue access.

        Returns the internal ``_queue`` list, or an empty list when it has not
        been created yet.
        """
        return self._queue if hasattr(self, '_queue') else []

    def get_uuid_position(self, target_uuid):
        """
        Find the position of a watch UUID in the async priority queue.

        Runs in O(n): one pass to locate the item, one pass to count the items
        ahead of it. No sorting is performed.

        Args:
            target_uuid: The UUID to search for

        Returns:
            dict: Always a dict with the keys
                - position: 0-based position in queue (0 = next to be processed),
                  or None when the UUID is not queued. Every other item with the
                  same priority is counted as being ahead, so the value is an
                  upper bound when priorities tie.
                - total_items: total number of items in queue
                - priority: the priority value of the found item (None if not found)
                - found: whether the UUID is currently queued
        """
        # Work on a copy so the result reflects one consistent snapshot.
        snapshot = list(self._queue)
        total_items = len(snapshot)

        target_item = None
        for candidate in snapshot:
            payload = self._payload(candidate)
            if payload is not None and payload.get('uuid') == target_uuid:
                target_item = candidate
                break

        if target_item is None:
            return {
                'position': None,
                'total_items': total_items,
                'priority': None,
                'found': False
            }

        target_priority = target_item.priority
        # Lower priority numbers are processed first. The target is excluded by
        # identity: an equality test would also exclude every same-priority item
        # when the item type compares by priority only.
        position = sum(
            1 for other in snapshot
            if other is not target_item and other.priority <= target_priority
        )

        return {
            'position': position,
            'total_items': total_items,
            'priority': target_priority,
            'found': True
        }

    def get_all_queued_uuids(self, limit=None, offset=0):
        """
        Get UUIDs currently in the async queue with their positions.
        For large queues, use limit/offset for pagination.

        When ``limit`` is given and is at most 100 the queue snapshot is sorted
        and exact positions are returned (a ``limit`` of 0 is treated as
        "no limit" in this mode). Otherwise items are returned in heap order
        and each position is the number of queued items with a strictly lower
        priority value.

        Args:
            limit: Maximum number of items to return (None = all)
            offset: Number of items to skip (for pagination)

        Returns:
            dict: Contains items and metadata
                - items: List of dicts with uuid, position, and priority
                - total_items: Total number of items in queue
                - returned_items: Number of items returned
                - has_more: Whether there are more items after this page
                - note: Only present when positions are approximate
        """
        import bisect

        snapshot = list(self._queue)
        total_items = len(snapshot)

        if total_items == 0:
            return {
                'items': [],
                'total_items': 0,
                'returned_items': 0,
                'has_more': False
            }

        result = []

        if limit is not None and limit <= 100:
            # Small page: a full sort is affordable and yields exact positions.
            ordered = sorted(snapshot)
            end_idx = min(offset + limit, total_items) if limit else total_items
            for position, entry in enumerate(ordered[offset:end_idx], start=offset):
                payload = self._payload(entry)
                if payload is not None and 'uuid' in payload:
                    result.append({
                        'uuid': payload['uuid'],
                        'position': position,
                        'priority': entry.priority
                    })

            return {
                'items': result,
                'total_items': total_items,
                'returned_items': len(result),
                'has_more': (offset + len(result)) < total_items
            }

        # Large/unbounded request: approximate positions. The priorities are
        # sorted once (lazily, on first use) and each position is a binary
        # search, instead of rescanning the whole queue for every item.
        sorted_priorities = None
        skipped = 0
        for entry in snapshot:
            payload = self._payload(entry)
            if payload is None or 'uuid' not in payload:
                continue
            if skipped < offset:
                skipped += 1
                continue
            if limit and len(result) >= limit:
                break
            if sorted_priorities is None:
                sorted_priorities = sorted(other.priority for other in snapshot)
            result.append({
                'uuid': payload['uuid'],
                # Number of queued items with a strictly lower priority value.
                'position': bisect.bisect_left(sorted_priorities, entry.priority),
                'priority': entry.priority
            })

        return {
            'items': result,
            'total_items': total_items,
            'returned_items': len(result),
            'has_more': (offset + len(result)) < total_items,
            'note': 'Positions are approximate for performance with large queues'
        }

    def get_queue_summary(self):
        """
        Get a quick summary of async queue state.
        O(n) complexity - a single pass over a snapshot of the queue.

        Returns:
            dict: Queue summary statistics
                - total_items: number of queued items
                - priority_breakdown: mapping of priority value -> item count
                - immediate_items: items with priority 1
                - clone_items: items with priority 5
                - scheduled_items: items with priority > 100
                - min_priority / max_priority: only present when the queue is
                  not empty
        """
        snapshot = list(self._queue)
        total_items = len(snapshot)

        if total_items == 0:
            return {
                'total_items': 0,
                'priority_breakdown': {},
                'immediate_items': 0,
                'clone_items': 0,
                'scheduled_items': 0
            }

        immediate_items = 0  # priority 1
        clone_items = 0  # priority 5
        scheduled_items = 0  # priority > 100
        priority_counts = {}

        for entry in snapshot:
            priority = entry.priority
            priority_counts[priority] = priority_counts.get(priority, 0) + 1

            if priority == 1:
                immediate_items += 1
            elif priority == 5:
                clone_items += 1
            elif priority > 100:
                scheduled_items += 1

        return {
            'total_items': total_items,
            'priority_breakdown': priority_counts,
            'immediate_items': immediate_items,
            'clone_items': clone_items,
            'scheduled_items': scheduled_items,
            'min_priority': min(priority_counts),
            'max_priority': max(priority_counts)
        }
|