ensure workers are running

socketio-tweaks
dgtlmoon 2025-05-30 16:14:31 +02:00
parent 01742dd670
commit e5aba3b2f0
3 changed files with 158 additions and 9 deletions

View file

@@ -247,12 +247,22 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
                 await asyncio.sleep(0.01)
         except Exception as e:
-            logger.error(f"Worker {worker_id} unexpected error: {e}")
+            logger.error(f"Worker {worker_id} unexpected error processing {uuid}: {e}")
+            logger.error(f"Worker {worker_id} traceback:", exc_info=True)
             # Make sure to mark UUID as completed even on error
             if uuid:
-                worker_handler.set_uuid_processing(uuid, processing=False)
+                try:
+                    worker_handler.set_uuid_processing(uuid, processing=False)
+                    # Also update the watch with error information
+                    if datastore and uuid in datastore.data['watching']:
+                        datastore.update_watch(uuid=uuid, update_obj={'last_error': f"Worker error: {str(e)}"})
+                except Exception as cleanup_error:
+                    logger.error(f"Worker {worker_id} error during cleanup: {cleanup_error}")
             current_uuid = None
-            await asyncio.sleep(0.1)
+            # Brief pause before continuing to avoid tight error loops
+            await asyncio.sleep(1.0)
         # Check if we should exit
         if app.config.exit.is_set():
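
Note on the cleanup path above: the worker now records the failure on the watch itself via 'last_error', so it surfaces like any other fetch error. A minimal sketch of reading that value back, assuming a loaded datastore; the UUID below is only a placeholder:

    # Sketch only: 'datastore' is the loaded datastore object and the UUID is a
    # placeholder for a real key in datastore.data['watching'].
    uuid = "00000000-0000-0000-0000-000000000000"
    watch = datastore.data['watching'].get(uuid, {})
    if watch.get('last_error'):
        print(f"Watch {uuid} failed: {watch['last_error']}")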

View file

@@ -494,6 +494,33 @@ def changedetection_app(config=None, datastore_o=None):
         result = memory_cleanup(app)
         return jsonify({"status": "success", "message": "Memory cleanup completed", "result": result})
+    # Worker health check endpoint
+    @app.route('/worker-health', methods=['GET'])
+    @login_optionally_required
+    def worker_health():
+        from flask import jsonify
+        expected_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
+        # Get basic status
+        status = worker_handler.get_worker_status()
+        # Perform health check
+        health_result = worker_handler.check_worker_health(
+            expected_count=expected_workers,
+            update_q=update_q,
+            notification_q=notification_q,
+            app=app,
+            datastore=datastore
+        )
+        return jsonify({
+            "status": "success",
+            "worker_status": status,
+            "health_check": health_result,
+            "expected_workers": expected_workers
+        })
     # Start the async workers during app initialization
     # Can be overridden by ENV or use the default settings
     n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
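
For manually exercising the new endpoint, a sketch using the requests library, assuming a local instance on the default port 5000; if login is enforced, authentication is needed since the route sits behind @login_optionally_required:

    import requests

    # Hypothetical local instance; adjust the base URL for your deployment.
    resp = requests.get("http://localhost:5000/worker-health", timeout=10)
    resp.raise_for_status()
    data = resp.json()
    print(data["expected_workers"], data["worker_status"]["worker_count"], data["health_check"]["status"])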
@@ -599,6 +626,7 @@ def ticker_thread_check_time_launch_checks():
     import random
     from changedetectionio import update_worker
     proxy_last_called_time = {}
+    last_health_check = 0
     recheck_time_minimum_seconds = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 3))
     logger.debug(f"System env MINIMUM_SECONDS_RECHECK_TIME {recheck_time_minimum_seconds}")
@@ -607,6 +635,23 @@
     while not app.config.exit.is_set():
+        # Periodic worker health check (every 60 seconds)
+        now = time.time()
+        if now - last_health_check > 60:
+            expected_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
+            health_result = worker_handler.check_worker_health(
+                expected_count=expected_workers,
+                update_q=update_q,
+                notification_q=notification_q,
+                app=app,
+                datastore=datastore
+            )
+            if health_result['status'] != 'healthy':
+                logger.warning(f"Worker health check: {health_result['message']}")
+            last_health_check = now
         # Get a list of watches by UUID that are currently fetching data
         running_uuids = worker_handler.get_running_uuids()

View file

@@ -64,13 +64,26 @@ def start_async_workers(n_workers, update_q, notification_q, app, datastore):
 async def start_single_async_worker(worker_id, update_q, notification_q, app, datastore):
-    """Start a single async worker"""
+    """Start a single async worker with auto-restart capability"""
     from changedetectionio.async_update_worker import async_update_worker
-    try:
-        await async_update_worker(worker_id, update_q, notification_q, app, datastore)
-    except Exception as e:
-        logger.error(f"Async worker {worker_id} crashed: {e}")
+    while not app.config.exit.is_set():
+        try:
+            logger.info(f"Starting async worker {worker_id}")
+            await async_update_worker(worker_id, update_q, notification_q, app, datastore)
+            # If we reach here, worker exited cleanly
+            logger.info(f"Async worker {worker_id} exited cleanly")
+            break
+        except asyncio.CancelledError:
+            # Task was cancelled (normal shutdown)
+            logger.info(f"Async worker {worker_id} cancelled")
+            break
+        except Exception as e:
+            logger.error(f"Async worker {worker_id} crashed: {e}")
+            logger.info(f"Restarting async worker {worker_id} in 5 seconds...")
+            await asyncio.sleep(5)
+    logger.info(f"Async worker {worker_id} shutdown complete")
 def start_sync_workers(n_workers, update_q, notification_q, app, datastore):
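
The restart loop above is a supervise-with-backoff pattern; a self-contained sketch of the same idea, with a dummy coroutine standing in for async_update_worker() and a short delay so it can be run directly:

    import asyncio

    # Dummy worker that always fails, standing in for async_update_worker().
    async def flaky_worker():
        await asyncio.sleep(0.1)
        raise RuntimeError("simulated crash")

    async def supervise(stop_event, restart_delay=0.5):
        # Keep restarting the worker until shutdown is requested
        # (start_single_async_worker() above uses a 5 second delay).
        while not stop_event.is_set():
            try:
                await flaky_worker()
                break  # clean exit
            except asyncio.CancelledError:
                break  # normal shutdown
            except Exception as e:
                print(f"worker crashed: {e}; restarting in {restart_delay}s")
                await asyncio.sleep(restart_delay)

    async def main():
        stop = asyncio.Event()
        task = asyncio.create_task(supervise(stop))
        await asyncio.sleep(2)   # let it crash and restart a few times
        stop.set()               # stop restarting after the current iteration
        await task

    asyncio.run(main())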
@@ -311,4 +324,85 @@ def get_worker_status():
         'worker_count': get_worker_count(),
         'running_uuids': get_running_uuids(),
         'async_loop_running': async_loop is not None if USE_ASYNC_WORKERS else None,
-    }
+    }
+def check_worker_health(expected_count, update_q=None, notification_q=None, app=None, datastore=None):
+    """
+    Check if the expected number of workers are running and restart any missing ones.
+    Args:
+        expected_count: Expected number of workers
+        update_q, notification_q, app, datastore: Required for restarting workers
+    Returns:
+        dict: Health check results
+    """
+    global running_async_tasks, running_update_threads
+    current_count = get_worker_count()
+    if current_count == expected_count:
+        return {
+            'status': 'healthy',
+            'expected_count': expected_count,
+            'actual_count': current_count,
+            'message': f'All {expected_count} workers running'
+        }
+    if USE_ASYNC_WORKERS:
+        # Check for crashed async workers
+        dead_workers = []
+        alive_count = 0
+        for i, task_future in enumerate(running_async_tasks[:]):
+            if task_future.done():
+                try:
+                    result = task_future.result()
+                    dead_workers.append(i)
+                    logger.warning(f"Async worker {i} completed unexpectedly")
+                except Exception as e:
+                    dead_workers.append(i)
+                    logger.error(f"Async worker {i} crashed: {e}")
+            else:
+                alive_count += 1
+        # Remove dead workers from tracking
+        for i in reversed(dead_workers):
+            if i < len(running_async_tasks):
+                running_async_tasks.pop(i)
+        missing_workers = expected_count - alive_count
+        restarted_count = 0
+        if missing_workers > 0 and all([update_q, notification_q, app, datastore]):
+            logger.info(f"Restarting {missing_workers} crashed async workers")
+            for i in range(missing_workers):
+                worker_id = alive_count + i
+                try:
+                    task_future = asyncio.run_coroutine_threadsafe(
+                        start_single_async_worker(worker_id, update_q, notification_q, app, datastore),
+                        async_loop
+                    )
+                    running_async_tasks.append(task_future)
+                    restarted_count += 1
+                except Exception as e:
+                    logger.error(f"Failed to restart worker {worker_id}: {e}")
+        return {
+            'status': 'repaired' if restarted_count > 0 else 'degraded',
+            'expected_count': expected_count,
+            'actual_count': alive_count,
+            'dead_workers': len(dead_workers),
+            'restarted_workers': restarted_count,
+            'message': f'Found {len(dead_workers)} dead workers, restarted {restarted_count}'
+        }
+    else:
+        # For sync workers, just report the issue (harder to auto-restart)
+        return {
+            'status': 'degraded',
+            'expected_count': expected_count,
+            'actual_count': current_count,
+            'message': f'Worker count mismatch: expected {expected_count}, got {current_count}'
+        }