diff --git a/changedetectionio/async_update_worker.py b/changedetectionio/async_update_worker.py index 03807c26..ae0501b0 100644 --- a/changedetectionio/async_update_worker.py +++ b/changedetectionio/async_update_worker.py @@ -25,6 +25,11 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore): app: Flask application instance datastore: Application datastore """ + # Set a descriptive name for this task + task = asyncio.current_task() + if task: + task.set_name(f"async-worker-{worker_id}") + logger.info(f"Starting async worker {worker_id}") while not app.config.exit.is_set(): @@ -387,7 +392,12 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore): if app.config.exit.is_set(): break - logger.info(f"Worker {worker_id} shutting down") + # Check if we're in pytest environment - if so, be more gentle with logging + import sys + in_pytest = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ + + if not in_pytest: + logger.info(f"Worker {worker_id} shutting down") def cleanup_error_artifacts(uuid, datastore): diff --git a/changedetectionio/realtime/socket_server.py b/changedetectionio/realtime/socket_server.py index fb70e585..c9241486 100644 --- a/changedetectionio/realtime/socket_server.py +++ b/changedetectionio/realtime/socket_server.py @@ -139,7 +139,12 @@ class SignalHandler: break time.sleep(0.5) - logger.info("Queue update thread stopped (threading mode)") + # Check if we're in pytest environment - if so, be more gentle with logging + import sys + in_pytest = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ + + if not in_pytest: + logger.info("Queue update thread stopped (threading mode)") def handle_watch_update(socketio, **kwargs): diff --git a/changedetectionio/store.py b/changedetectionio/store.py index f200bef3..d7ef9cd9 100644 --- a/changedetectionio/store.py +++ b/changedetectionio/store.py @@ -407,7 +407,12 @@ class ChangeDetectionStore: # This is a fairly basic strategy to deal with the case that the file is corrupted, # system was out of memory, out of RAM etc with open(self.json_store_path+".tmp", 'w') as json_file: - json.dump(data, json_file, indent=4) + # Use compact JSON in production for better performance + debug_mode = os.environ.get('CHANGEDETECTION_DEBUG', 'false').lower() == 'true' + if debug_mode: + json.dump(data, json_file, indent=4) + else: + json.dump(data, json_file, separators=(',', ':')) os.replace(self.json_store_path+".tmp", self.json_store_path) except Exception as e: logger.error(f"Error writing JSON!! (Main JSON file save was skipped) : {str(e)}") diff --git a/changedetectionio/tests/conftest.py b/changedetectionio/tests/conftest.py index c1195bcb..be32f6ee 100644 --- a/changedetectionio/tests/conftest.py +++ b/changedetectionio/tests/conftest.py @@ -106,8 +106,33 @@ def app(request): app.config['STOP_THREADS'] = True def teardown(): + # Stop all threads and services datastore.stop_thread = True app.config.exit.set() + + # Shutdown workers gracefully before loguru cleanup + try: + from changedetectionio import worker_handler + worker_handler.shutdown_workers() + except Exception: + pass + + # Stop socket server threads + try: + from changedetectionio.flask_app import socketio_server + if socketio_server and hasattr(socketio_server, 'shutdown'): + socketio_server.shutdown() + except Exception: + pass + + # Give threads a moment to finish their shutdown + import time + time.sleep(0.1) + + # Remove all loguru handlers to prevent "closed file" errors + logger.remove() + + # Cleanup files cleanup(app_config['datastore_path']) diff --git a/changedetectionio/worker_handler.py b/changedetectionio/worker_handler.py index 2ebe5ac9..953d2354 100644 --- a/changedetectionio/worker_handler.py +++ b/changedetectionio/worker_handler.py @@ -76,9 +76,16 @@ def start_async_workers(n_workers, update_q, notification_q, app, datastore): logger.info(f"Starting {n_workers} async workers") for i in range(n_workers): try: - task_future = asyncio.run_coroutine_threadsafe( - start_single_async_worker(i, update_q, notification_q, app, datastore), async_loop - ) + # Use a factory function to create named worker coroutines + def create_named_worker(worker_id): + async def named_worker(): + task = asyncio.current_task() + if task: + task.set_name(f"async-worker-{worker_id}") + return await start_single_async_worker(worker_id, update_q, notification_q, app, datastore) + return named_worker() + + task_future = asyncio.run_coroutine_threadsafe(create_named_worker(i), async_loop) running_async_tasks.append(task_future) except RuntimeError as e: logger.error(f"Failed to start async worker {i}: {e}") @@ -89,23 +96,32 @@ async def start_single_async_worker(worker_id, update_q, notification_q, app, da """Start a single async worker with auto-restart capability""" from changedetectionio.async_update_worker import async_update_worker + # Check if we're in pytest environment - if so, be more gentle with logging + import os + in_pytest = "pytest" in os.sys.modules or "PYTEST_CURRENT_TEST" in os.environ + while not app.config.exit.is_set(): try: - logger.info(f"Starting async worker {worker_id}") + if not in_pytest: + logger.info(f"Starting async worker {worker_id}") await async_update_worker(worker_id, update_q, notification_q, app, datastore) # If we reach here, worker exited cleanly - logger.info(f"Async worker {worker_id} exited cleanly") + if not in_pytest: + logger.info(f"Async worker {worker_id} exited cleanly") break except asyncio.CancelledError: # Task was cancelled (normal shutdown) - logger.info(f"Async worker {worker_id} cancelled") + if not in_pytest: + logger.info(f"Async worker {worker_id} cancelled") break except Exception as e: logger.error(f"Async worker {worker_id} crashed: {e}") - logger.info(f"Restarting async worker {worker_id} in 5 seconds...") + if not in_pytest: + logger.info(f"Restarting async worker {worker_id} in 5 seconds...") await asyncio.sleep(5) - logger.info(f"Async worker {worker_id} shutdown complete") + if not in_pytest: + logger.info(f"Async worker {worker_id} shutdown complete") def start_workers(n_workers, update_q, notification_q, app, datastore): @@ -187,12 +203,17 @@ def shutdown_workers(): """Shutdown all async workers fast and aggressively""" global async_loop, async_loop_thread, running_async_tasks - logger.info("Fast shutdown of async workers initiated...") + # Check if we're in pytest environment - if so, be more gentle with logging + import os + in_pytest = "pytest" in os.sys.modules or "PYTEST_CURRENT_TEST" in os.environ + + if not in_pytest: + logger.info("Fast shutdown of async workers initiated...") # Cancel all async tasks immediately for task_future in running_async_tasks: - task_future.cancel() - running_async_tasks.clear() + if not task_future.done(): + task_future.cancel() # Stop the async event loop immediately if async_loop and not async_loop.is_closed(): @@ -201,16 +222,21 @@ def shutdown_workers(): except RuntimeError: # Loop might already be stopped pass - async_loop = None + + running_async_tasks.clear() + async_loop = None # Give async thread minimal time to finish, then continue if async_loop_thread and async_loop_thread.is_alive(): async_loop_thread.join(timeout=1.0) # Only 1 second timeout - if async_loop_thread.is_alive(): + if async_loop_thread.is_alive() and not in_pytest: logger.info("Async thread still running after timeout - continuing with shutdown") async_loop_thread = None - logger.info("Async workers fast shutdown complete") + if not in_pytest: + logger.info("Async workers fast shutdown complete") + + def adjust_async_worker_count(new_count, update_q=None, notification_q=None, app=None, datastore=None):