Tidy up async worker names and cleanups when in test mode

pull/3220/head
dgtlmoon 2025-06-02 19:18:03 +02:00
rodzic 6c3e88e261
commit 03e751b57f
5 zmienionych plików z 88 dodań i 17 usunięć

Wyświetl plik

@ -25,6 +25,11 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
app: Flask application instance
datastore: Application datastore
"""
# Set a descriptive name for this task
task = asyncio.current_task()
if task:
task.set_name(f"async-worker-{worker_id}")
logger.info(f"Starting async worker {worker_id}")
while not app.config.exit.is_set():
@ -387,7 +392,12 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
if app.config.exit.is_set():
break
logger.info(f"Worker {worker_id} shutting down")
# Check if we're in pytest environment - if so, be more gentle with logging
import sys
in_pytest = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ
if not in_pytest:
logger.info(f"Worker {worker_id} shutting down")
def cleanup_error_artifacts(uuid, datastore):

Wyświetl plik

@ -139,7 +139,12 @@ class SignalHandler:
break
time.sleep(0.5)
logger.info("Queue update thread stopped (threading mode)")
# Check if we're in pytest environment - if so, be more gentle with logging
import sys
in_pytest = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ
if not in_pytest:
logger.info("Queue update thread stopped (threading mode)")
def handle_watch_update(socketio, **kwargs):

Wyświetl plik

@ -407,7 +407,12 @@ class ChangeDetectionStore:
# This is a fairly basic strategy to deal with the case that the file is corrupted,
# system was out of memory, out of RAM etc
with open(self.json_store_path+".tmp", 'w') as json_file:
json.dump(data, json_file, indent=4)
# Use compact JSON in production for better performance
debug_mode = os.environ.get('CHANGEDETECTION_DEBUG', 'false').lower() == 'true'
if debug_mode:
json.dump(data, json_file, indent=4)
else:
json.dump(data, json_file, separators=(',', ':'))
os.replace(self.json_store_path+".tmp", self.json_store_path)
except Exception as e:
logger.error(f"Error writing JSON!! (Main JSON file save was skipped) : {str(e)}")

Wyświetl plik

@ -106,8 +106,33 @@ def app(request):
app.config['STOP_THREADS'] = True
def teardown():
# Stop all threads and services
datastore.stop_thread = True
app.config.exit.set()
# Shutdown workers gracefully before loguru cleanup
try:
from changedetectionio import worker_handler
worker_handler.shutdown_workers()
except Exception:
pass
# Stop socket server threads
try:
from changedetectionio.flask_app import socketio_server
if socketio_server and hasattr(socketio_server, 'shutdown'):
socketio_server.shutdown()
except Exception:
pass
# Give threads a moment to finish their shutdown
import time
time.sleep(0.1)
# Remove all loguru handlers to prevent "closed file" errors
logger.remove()
# Cleanup files
cleanup(app_config['datastore_path'])

Wyświetl plik

@ -76,9 +76,16 @@ def start_async_workers(n_workers, update_q, notification_q, app, datastore):
logger.info(f"Starting {n_workers} async workers")
for i in range(n_workers):
try:
task_future = asyncio.run_coroutine_threadsafe(
start_single_async_worker(i, update_q, notification_q, app, datastore), async_loop
)
# Use a factory function to create named worker coroutines
def create_named_worker(worker_id):
async def named_worker():
task = asyncio.current_task()
if task:
task.set_name(f"async-worker-{worker_id}")
return await start_single_async_worker(worker_id, update_q, notification_q, app, datastore)
return named_worker()
task_future = asyncio.run_coroutine_threadsafe(create_named_worker(i), async_loop)
running_async_tasks.append(task_future)
except RuntimeError as e:
logger.error(f"Failed to start async worker {i}: {e}")
@ -89,23 +96,32 @@ async def start_single_async_worker(worker_id, update_q, notification_q, app, da
"""Start a single async worker with auto-restart capability"""
from changedetectionio.async_update_worker import async_update_worker
# Check if we're in pytest environment - if so, be more gentle with logging
import os
in_pytest = "pytest" in os.sys.modules or "PYTEST_CURRENT_TEST" in os.environ
while not app.config.exit.is_set():
try:
logger.info(f"Starting async worker {worker_id}")
if not in_pytest:
logger.info(f"Starting async worker {worker_id}")
await async_update_worker(worker_id, update_q, notification_q, app, datastore)
# If we reach here, worker exited cleanly
logger.info(f"Async worker {worker_id} exited cleanly")
if not in_pytest:
logger.info(f"Async worker {worker_id} exited cleanly")
break
except asyncio.CancelledError:
# Task was cancelled (normal shutdown)
logger.info(f"Async worker {worker_id} cancelled")
if not in_pytest:
logger.info(f"Async worker {worker_id} cancelled")
break
except Exception as e:
logger.error(f"Async worker {worker_id} crashed: {e}")
logger.info(f"Restarting async worker {worker_id} in 5 seconds...")
if not in_pytest:
logger.info(f"Restarting async worker {worker_id} in 5 seconds...")
await asyncio.sleep(5)
logger.info(f"Async worker {worker_id} shutdown complete")
if not in_pytest:
logger.info(f"Async worker {worker_id} shutdown complete")
def start_workers(n_workers, update_q, notification_q, app, datastore):
@ -187,12 +203,17 @@ def shutdown_workers():
"""Shutdown all async workers fast and aggressively"""
global async_loop, async_loop_thread, running_async_tasks
logger.info("Fast shutdown of async workers initiated...")
# Check if we're in pytest environment - if so, be more gentle with logging
import os
in_pytest = "pytest" in os.sys.modules or "PYTEST_CURRENT_TEST" in os.environ
if not in_pytest:
logger.info("Fast shutdown of async workers initiated...")
# Cancel all async tasks immediately
for task_future in running_async_tasks:
task_future.cancel()
running_async_tasks.clear()
if not task_future.done():
task_future.cancel()
# Stop the async event loop immediately
if async_loop and not async_loop.is_closed():
@ -201,16 +222,21 @@ def shutdown_workers():
except RuntimeError:
# Loop might already be stopped
pass
async_loop = None
running_async_tasks.clear()
async_loop = None
# Give async thread minimal time to finish, then continue
if async_loop_thread and async_loop_thread.is_alive():
async_loop_thread.join(timeout=1.0) # Only 1 second timeout
if async_loop_thread.is_alive():
if async_loop_thread.is_alive() and not in_pytest:
logger.info("Async thread still running after timeout - continuing with shutdown")
async_loop_thread = None
logger.info("Async workers fast shutdown complete")
if not in_pytest:
logger.info("Async workers fast shutdown complete")
def adjust_async_worker_count(new_count, update_q=None, notification_q=None, app=None, datastore=None):