From 2523de4249d27e1a85101eb5495889c508a5dbdc Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Sun, 9 Jul 2023 00:42:56 -0600 Subject: [PATCH] Prevent race condition between threads and locking --- stator/runner.py | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/stator/runner.py b/stator/runner.py index 3212c1c..35a810e 100644 --- a/stator/runner.py +++ b/stator/runner.py @@ -71,7 +71,7 @@ class StatorRunner: self.run_for = run_for self.minimum_loop_delay = 0.5 self.maximum_loop_delay = 5 - self.tasks: list[Future] = [] + self.tasks: dict[tuple[str, str], Future] = {} # Set up SIGALRM handler signal.signal(signal.SIGALRM, self.alarm_handler) @@ -197,11 +197,15 @@ class StatorRunner: timezone.now() + datetime.timedelta(seconds=self.lock_expiry) ), ): + key = (model._meta.label_lower, instance.pk) + # Don't run two threads for the same thing + if key in self.tasks: + continue if call_inline: task_transition(instance, in_thread=False) else: - self.tasks.append( - self.executor.submit(task_transition, instance) + self.tasks[key] = self.executor.submit( + task_transition, instance ) self.handled[model._meta.label_lower] = ( self.handled.get(model._meta.label_lower, 0) + 1 @@ -218,24 +222,23 @@ class StatorRunner: if call_inline: task_deletion(model, in_thread=False) else: - self.tasks.append(self.executor.submit(task_deletion, model)) + self.tasks[ + model._meta.label_lower, "__delete__" + ] = self.executor.submit(task_deletion, model) def clean_tasks(self): """ Removes any tasks that are done and handles exceptions if they raised them. """ - new_tasks = [] - for task in self.tasks: + for key, task in list(self.tasks.items()): if task.done(): + del self.tasks[key] try: task.result() except BaseException as e: exceptions.capture_exception(e) traceback.print_exc() - else: - new_tasks.append(task) - self.tasks = new_tasks def run_single_cycle(self): """