pull/3220/head
dgtlmoon 2025-05-30 16:09:25 +02:00
rodzic 34fbfa7113
commit 01742dd670
5 zmienionych plików z 124 dodań i 4 usunięć

Wyświetl plik

@ -67,7 +67,32 @@ def construct_blueprint(datastore: ChangeDetectionStore):
del (app_update['password'])
datastore.data['settings']['application'].update(app_update)
# Handle dynamic worker count adjustment
old_worker_count = datastore.data['settings']['requests'].get('workers', 1)
new_worker_count = form.data['requests'].get('workers', 1)
datastore.data['settings']['requests'].update(form.data['requests'])
# Adjust worker count if it changed
if new_worker_count != old_worker_count:
from changedetectionio import worker_handler
from changedetectionio.flask_app import update_q, notification_q, app, datastore as ds
result = worker_handler.adjust_async_worker_count(
new_count=new_worker_count,
update_q=update_q,
notification_q=notification_q,
app=app,
datastore=ds
)
if result['status'] == 'success':
flash(f"Worker count adjusted: {result['message']}", 'notice')
elif result['status'] == 'not_supported':
flash("Dynamic worker adjustment not supported for sync workers", 'warning')
elif result['status'] == 'error':
flash(f"Error adjusting workers: {result['message']}", 'error')
if not os.getenv("SALTED_PASS", False) and len(form.application.form.password.encrypted_password):
datastore.data['settings']['application']['password'] = form.application.form.password.encrypted_password

Wyświetl plik

@ -47,6 +47,10 @@
{{ render_field(form.requests.form.jitter_seconds, class="jitter_seconds") }}
<span class="pure-form-message-inline">Example - 3 seconds random jitter could trigger up to 3 seconds earlier or up to 3 seconds later</span>
</div>
<div class="pure-control-group">
{{ render_field(form.requests.form.workers) }}
<span class="pure-form-message-inline">Number of concurrent workers to process watches. More workers = faster processing but higher memory usage.</span>
</div>
<div class="pure-control-group">
{{ render_field(form.application.form.filter_failure_notification_threshold_attempts, class="filter_failure_notification_threshold_attempts") }}
<span class="pure-form-message-inline">After this many consecutive times that the CSS/xPath filter is missing, send a notification

Wyświetl plik

@ -494,6 +494,12 @@ def changedetection_app(config=None, datastore_o=None):
result = memory_cleanup(app)
return jsonify({"status": "success", "message": "Memory cleanup completed", "result": result})
# Start the async workers during app initialization
# Can be overridden by ENV or use the default settings
n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
logger.info(f"Starting {n_workers} workers during app initialization")
worker_handler.start_workers(n_workers, update_q, notification_q, app, datastore)
# @todo handle ctrl break
ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks).start()
threading.Thread(target=notification_runner).start()
@ -597,10 +603,7 @@ def ticker_thread_check_time_launch_checks():
recheck_time_minimum_seconds = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 3))
logger.debug(f"System env MINIMUM_SECONDS_RECHECK_TIME {recheck_time_minimum_seconds}")
# Spin up Workers that do the fetching
# Can be overriden by ENV or use the default settings
n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
worker_handler.start_workers(n_workers, update_q, notification_q, app, datastore)
# Workers are now started during app initialization, not here
while not app.config.exit.is_set():

Wyświetl plik

@ -719,6 +719,12 @@ class globalSettingsRequestForm(Form):
jitter_seconds = IntegerField('Random jitter seconds ± check',
render_kw={"style": "width: 5em;"},
validators=[validators.NumberRange(min=0, message="Should contain zero or more seconds")])
workers = IntegerField('Number of fetch workers',
render_kw={"style": "width: 5em;"},
validators=[validators.NumberRange(min=1, max=50,
message="Should be between 1 and 50")])
extra_proxies = FieldList(FormField(SingleExtraProxy), min_entries=5)
extra_browsers = FieldList(FormField(SingleExtraBrowser), min_entries=5)

Wyświetl plik

@ -222,6 +222,88 @@ def shutdown_workers():
logger.info("Workers shutdown complete")
def adjust_async_worker_count(new_count, update_q=None, notification_q=None, app=None, datastore=None):
"""
Dynamically adjust the number of async workers.
Args:
new_count: Target number of workers
update_q, notification_q, app, datastore: Required for adding new workers
Returns:
dict: Status of the adjustment operation
"""
global running_async_tasks, running_update_threads
current_count = get_worker_count()
if new_count == current_count:
return {
'status': 'no_change',
'message': f'Worker count already at {current_count}',
'current_count': current_count
}
if USE_ASYNC_WORKERS:
if new_count > current_count:
# Add workers
workers_to_add = new_count - current_count
logger.info(f"Adding {workers_to_add} async workers (from {current_count} to {new_count})")
if not all([update_q, notification_q, app, datastore]):
return {
'status': 'error',
'message': 'Missing required parameters to add workers',
'current_count': current_count
}
for i in range(workers_to_add):
worker_id = len(running_async_tasks)
task_future = asyncio.run_coroutine_threadsafe(
start_single_async_worker(worker_id, update_q, notification_q, app, datastore),
async_loop
)
running_async_tasks.append(task_future)
return {
'status': 'success',
'message': f'Added {workers_to_add} workers',
'previous_count': current_count,
'current_count': new_count
}
else:
# Remove workers
workers_to_remove = current_count - new_count
logger.info(f"Removing {workers_to_remove} async workers (from {current_count} to {new_count})")
removed_count = 0
for _ in range(workers_to_remove):
if running_async_tasks:
task_future = running_async_tasks.pop()
task_future.cancel()
# Wait for the task to actually stop
try:
task_future.result(timeout=5) # 5 second timeout
except Exception:
pass # Task was cancelled, which is expected
removed_count += 1
return {
'status': 'success',
'message': f'Removed {removed_count} workers',
'previous_count': current_count,
'current_count': current_count - removed_count
}
else:
# Sync workers - more complex to adjust dynamically
return {
'status': 'not_supported',
'message': 'Dynamic worker adjustment not supported for sync workers',
'current_count': current_count
}
def get_worker_status():
"""Get status information about workers"""
return {