kopia lustrzana https://github.com/dgtlmoon/changedetection.io
395 wiersze
13 KiB
Python
395 wiersze
13 KiB
Python
"""
|
|
Worker management module for changedetection.io
|
|
|
|
Handles asynchronous workers for dynamic worker scaling.
|
|
Sync worker support has been removed in favor of async-only architecture.
|
|
"""
|
|
|
|
import asyncio
|
|
import os
|
|
import threading
|
|
import time
|
|
from loguru import logger
|
|
|
|
# Global worker state
|
|
running_async_tasks = []
|
|
async_loop = None
|
|
async_loop_thread = None
|
|
|
|
# Track currently processing UUIDs for async workers
|
|
currently_processing_uuids = set()
|
|
|
|
# Configuration - async workers only
|
|
USE_ASYNC_WORKERS = True
|
|
|
|
|
|
def start_async_event_loop():
|
|
"""Start a dedicated event loop for async workers in a separate thread"""
|
|
global async_loop
|
|
logger.info("Starting async event loop for workers")
|
|
|
|
try:
|
|
# Create a new event loop for this thread
|
|
async_loop = asyncio.new_event_loop()
|
|
# Set it as the event loop for this thread
|
|
asyncio.set_event_loop(async_loop)
|
|
|
|
logger.debug(f"Event loop created and set: {async_loop}")
|
|
|
|
# Run the event loop forever
|
|
async_loop.run_forever()
|
|
except Exception as e:
|
|
logger.error(f"Async event loop error: {e}")
|
|
finally:
|
|
# Clean up
|
|
if async_loop and not async_loop.is_closed():
|
|
async_loop.close()
|
|
async_loop = None
|
|
logger.info("Async event loop stopped")
|
|
|
|
|
|
def start_async_workers(n_workers, update_q, notification_q, app, datastore):
|
|
"""Start the async worker management system"""
|
|
global async_loop_thread, async_loop, running_async_tasks, currently_processing_uuids
|
|
|
|
# Clear any stale UUID tracking state
|
|
currently_processing_uuids.clear()
|
|
|
|
# Start the event loop in a separate thread
|
|
async_loop_thread = threading.Thread(target=start_async_event_loop, daemon=True)
|
|
async_loop_thread.start()
|
|
|
|
# Wait for the loop to be available (with timeout for safety)
|
|
max_wait_time = 5.0
|
|
wait_start = time.time()
|
|
while async_loop is None and (time.time() - wait_start) < max_wait_time:
|
|
time.sleep(0.1)
|
|
|
|
if async_loop is None:
|
|
logger.error("Failed to start async event loop within timeout")
|
|
return
|
|
|
|
# Additional brief wait to ensure loop is running
|
|
time.sleep(0.2)
|
|
|
|
# Start async workers
|
|
logger.info(f"Starting {n_workers} async workers")
|
|
for i in range(n_workers):
|
|
try:
|
|
# Use a factory function to create named worker coroutines
|
|
def create_named_worker(worker_id):
|
|
async def named_worker():
|
|
task = asyncio.current_task()
|
|
if task:
|
|
task.set_name(f"async-worker-{worker_id}")
|
|
return await start_single_async_worker(worker_id, update_q, notification_q, app, datastore)
|
|
return named_worker()
|
|
|
|
task_future = asyncio.run_coroutine_threadsafe(create_named_worker(i), async_loop)
|
|
running_async_tasks.append(task_future)
|
|
except RuntimeError as e:
|
|
logger.error(f"Failed to start async worker {i}: {e}")
|
|
continue
|
|
|
|
|
|
async def start_single_async_worker(worker_id, update_q, notification_q, app, datastore):
|
|
"""Start a single async worker with auto-restart capability"""
|
|
from changedetectionio.async_update_worker import async_update_worker
|
|
|
|
# Check if we're in pytest environment - if so, be more gentle with logging
|
|
import os
|
|
in_pytest = "pytest" in os.sys.modules or "PYTEST_CURRENT_TEST" in os.environ
|
|
|
|
while not app.config.exit.is_set():
|
|
try:
|
|
if not in_pytest:
|
|
logger.info(f"Starting async worker {worker_id}")
|
|
await async_update_worker(worker_id, update_q, notification_q, app, datastore)
|
|
# If we reach here, worker exited cleanly
|
|
if not in_pytest:
|
|
logger.info(f"Async worker {worker_id} exited cleanly")
|
|
break
|
|
except asyncio.CancelledError:
|
|
# Task was cancelled (normal shutdown)
|
|
if not in_pytest:
|
|
logger.info(f"Async worker {worker_id} cancelled")
|
|
break
|
|
except Exception as e:
|
|
logger.error(f"Async worker {worker_id} crashed: {e}")
|
|
if not in_pytest:
|
|
logger.info(f"Restarting async worker {worker_id} in 5 seconds...")
|
|
await asyncio.sleep(5)
|
|
|
|
if not in_pytest:
|
|
logger.info(f"Async worker {worker_id} shutdown complete")
|
|
|
|
|
|
def start_workers(n_workers, update_q, notification_q, app, datastore):
|
|
"""Start async workers - sync workers are deprecated"""
|
|
start_async_workers(n_workers, update_q, notification_q, app, datastore)
|
|
|
|
|
|
def add_worker(update_q, notification_q, app, datastore):
|
|
"""Add a new async worker (for dynamic scaling)"""
|
|
global running_async_tasks
|
|
|
|
if not async_loop:
|
|
logger.error("Async loop not running, cannot add worker")
|
|
return False
|
|
|
|
worker_id = len(running_async_tasks)
|
|
logger.info(f"Adding async worker {worker_id}")
|
|
|
|
task_future = asyncio.run_coroutine_threadsafe(
|
|
start_single_async_worker(worker_id, update_q, notification_q, app, datastore), async_loop
|
|
)
|
|
running_async_tasks.append(task_future)
|
|
return True
|
|
|
|
|
|
def remove_worker():
|
|
"""Remove an async worker (for dynamic scaling)"""
|
|
global running_async_tasks
|
|
|
|
if not running_async_tasks:
|
|
return False
|
|
|
|
# Cancel the last worker
|
|
task_future = running_async_tasks.pop()
|
|
task_future.cancel()
|
|
logger.info(f"Removed async worker, {len(running_async_tasks)} workers remaining")
|
|
return True
|
|
|
|
|
|
def get_worker_count():
|
|
"""Get current number of async workers"""
|
|
return len(running_async_tasks)
|
|
|
|
|
|
def get_running_uuids():
|
|
"""Get list of UUIDs currently being processed by async workers"""
|
|
return list(currently_processing_uuids)
|
|
|
|
|
|
def set_uuid_processing(uuid, processing=True):
|
|
"""Mark a UUID as being processed or completed"""
|
|
global currently_processing_uuids
|
|
if processing:
|
|
currently_processing_uuids.add(uuid)
|
|
logger.debug(f"Started processing UUID: {uuid}")
|
|
else:
|
|
currently_processing_uuids.discard(uuid)
|
|
logger.debug(f"Finished processing UUID: {uuid}")
|
|
|
|
|
|
def is_watch_running(watch_uuid):
|
|
"""Check if a specific watch is currently being processed"""
|
|
return watch_uuid in get_running_uuids()
|
|
|
|
|
|
def queue_item_async_safe(update_q, item):
|
|
"""Queue an item for async queue processing"""
|
|
if async_loop and not async_loop.is_closed():
|
|
try:
|
|
# For async queue, schedule the put operation
|
|
asyncio.run_coroutine_threadsafe(update_q.put(item), async_loop)
|
|
except RuntimeError as e:
|
|
logger.error(f"Failed to queue item: {e}")
|
|
else:
|
|
logger.error("Async loop not available or closed for queueing item")
|
|
|
|
|
|
def shutdown_workers():
|
|
"""Shutdown all async workers fast and aggressively"""
|
|
global async_loop, async_loop_thread, running_async_tasks
|
|
|
|
# Check if we're in pytest environment - if so, be more gentle with logging
|
|
import os
|
|
in_pytest = "pytest" in os.sys.modules or "PYTEST_CURRENT_TEST" in os.environ
|
|
|
|
if not in_pytest:
|
|
logger.info("Fast shutdown of async workers initiated...")
|
|
|
|
# Cancel all async tasks immediately
|
|
for task_future in running_async_tasks:
|
|
if not task_future.done():
|
|
task_future.cancel()
|
|
|
|
# Stop the async event loop immediately
|
|
if async_loop and not async_loop.is_closed():
|
|
try:
|
|
async_loop.call_soon_threadsafe(async_loop.stop)
|
|
except RuntimeError:
|
|
# Loop might already be stopped
|
|
pass
|
|
|
|
running_async_tasks.clear()
|
|
async_loop = None
|
|
|
|
# Give async thread minimal time to finish, then continue
|
|
if async_loop_thread and async_loop_thread.is_alive():
|
|
async_loop_thread.join(timeout=1.0) # Only 1 second timeout
|
|
if async_loop_thread.is_alive() and not in_pytest:
|
|
logger.info("Async thread still running after timeout - continuing with shutdown")
|
|
async_loop_thread = None
|
|
|
|
if not in_pytest:
|
|
logger.info("Async workers fast shutdown complete")
|
|
|
|
|
|
|
|
|
|
def adjust_async_worker_count(new_count, update_q=None, notification_q=None, app=None, datastore=None):
|
|
"""
|
|
Dynamically adjust the number of async workers.
|
|
|
|
Args:
|
|
new_count: Target number of workers
|
|
update_q, notification_q, app, datastore: Required for adding new workers
|
|
|
|
Returns:
|
|
dict: Status of the adjustment operation
|
|
"""
|
|
global running_async_tasks
|
|
|
|
current_count = get_worker_count()
|
|
|
|
if new_count == current_count:
|
|
return {
|
|
'status': 'no_change',
|
|
'message': f'Worker count already at {current_count}',
|
|
'current_count': current_count
|
|
}
|
|
|
|
if new_count > current_count:
|
|
# Add workers
|
|
workers_to_add = new_count - current_count
|
|
logger.info(f"Adding {workers_to_add} async workers (from {current_count} to {new_count})")
|
|
|
|
if not all([update_q, notification_q, app, datastore]):
|
|
return {
|
|
'status': 'error',
|
|
'message': 'Missing required parameters to add workers',
|
|
'current_count': current_count
|
|
}
|
|
|
|
for i in range(workers_to_add):
|
|
worker_id = len(running_async_tasks)
|
|
task_future = asyncio.run_coroutine_threadsafe(
|
|
start_single_async_worker(worker_id, update_q, notification_q, app, datastore),
|
|
async_loop
|
|
)
|
|
running_async_tasks.append(task_future)
|
|
|
|
return {
|
|
'status': 'success',
|
|
'message': f'Added {workers_to_add} workers',
|
|
'previous_count': current_count,
|
|
'current_count': new_count
|
|
}
|
|
|
|
else:
|
|
# Remove workers
|
|
workers_to_remove = current_count - new_count
|
|
logger.info(f"Removing {workers_to_remove} async workers (from {current_count} to {new_count})")
|
|
|
|
removed_count = 0
|
|
for _ in range(workers_to_remove):
|
|
if running_async_tasks:
|
|
task_future = running_async_tasks.pop()
|
|
task_future.cancel()
|
|
# Wait for the task to actually stop
|
|
try:
|
|
task_future.result(timeout=5) # 5 second timeout
|
|
except Exception:
|
|
pass # Task was cancelled, which is expected
|
|
removed_count += 1
|
|
|
|
return {
|
|
'status': 'success',
|
|
'message': f'Removed {removed_count} workers',
|
|
'previous_count': current_count,
|
|
'current_count': current_count - removed_count
|
|
}
|
|
|
|
|
|
def get_worker_status():
|
|
"""Get status information about async workers"""
|
|
return {
|
|
'worker_type': 'async',
|
|
'worker_count': get_worker_count(),
|
|
'running_uuids': get_running_uuids(),
|
|
'async_loop_running': async_loop is not None,
|
|
}
|
|
|
|
|
|
def check_worker_health(expected_count, update_q=None, notification_q=None, app=None, datastore=None):
|
|
"""
|
|
Check if the expected number of async workers are running and restart any missing ones.
|
|
|
|
Args:
|
|
expected_count: Expected number of workers
|
|
update_q, notification_q, app, datastore: Required for restarting workers
|
|
|
|
Returns:
|
|
dict: Health check results
|
|
"""
|
|
global running_async_tasks
|
|
|
|
current_count = get_worker_count()
|
|
|
|
if current_count == expected_count:
|
|
return {
|
|
'status': 'healthy',
|
|
'expected_count': expected_count,
|
|
'actual_count': current_count,
|
|
'message': f'All {expected_count} async workers running'
|
|
}
|
|
|
|
# Check for crashed async workers
|
|
dead_workers = []
|
|
alive_count = 0
|
|
|
|
for i, task_future in enumerate(running_async_tasks[:]):
|
|
if task_future.done():
|
|
try:
|
|
result = task_future.result()
|
|
dead_workers.append(i)
|
|
logger.warning(f"Async worker {i} completed unexpectedly")
|
|
except Exception as e:
|
|
dead_workers.append(i)
|
|
logger.error(f"Async worker {i} crashed: {e}")
|
|
else:
|
|
alive_count += 1
|
|
|
|
# Remove dead workers from tracking
|
|
for i in reversed(dead_workers):
|
|
if i < len(running_async_tasks):
|
|
running_async_tasks.pop(i)
|
|
|
|
missing_workers = expected_count - alive_count
|
|
restarted_count = 0
|
|
|
|
if missing_workers > 0 and all([update_q, notification_q, app, datastore]):
|
|
logger.info(f"Restarting {missing_workers} crashed async workers")
|
|
|
|
for i in range(missing_workers):
|
|
worker_id = alive_count + i
|
|
try:
|
|
task_future = asyncio.run_coroutine_threadsafe(
|
|
start_single_async_worker(worker_id, update_q, notification_q, app, datastore),
|
|
async_loop
|
|
)
|
|
running_async_tasks.append(task_future)
|
|
restarted_count += 1
|
|
except Exception as e:
|
|
logger.error(f"Failed to restart worker {worker_id}: {e}")
|
|
|
|
return {
|
|
'status': 'repaired' if restarted_count > 0 else 'degraded',
|
|
'expected_count': expected_count,
|
|
'actual_count': alive_count,
|
|
'dead_workers': len(dead_workers),
|
|
'restarted_workers': restarted_count,
|
|
'message': f'Found {len(dead_workers)} dead workers, restarted {restarted_count}'
|
|
} |