Prevent race condition between threads and locking

pull/606/head
Andrew Godwin 2023-07-09 00:42:56 -06:00
rodzic 933f6660d5
commit 2523de4249
1 zmienionych plików z 12 dodań i 9 usunięć

Wyświetl plik

@ -71,7 +71,7 @@ class StatorRunner:
self.run_for = run_for self.run_for = run_for
self.minimum_loop_delay = 0.5 self.minimum_loop_delay = 0.5
self.maximum_loop_delay = 5 self.maximum_loop_delay = 5
self.tasks: list[Future] = [] self.tasks: dict[tuple[str, str], Future] = {}
# Set up SIGALRM handler # Set up SIGALRM handler
signal.signal(signal.SIGALRM, self.alarm_handler) signal.signal(signal.SIGALRM, self.alarm_handler)
@ -197,11 +197,15 @@ class StatorRunner:
timezone.now() + datetime.timedelta(seconds=self.lock_expiry) timezone.now() + datetime.timedelta(seconds=self.lock_expiry)
), ),
): ):
key = (model._meta.label_lower, instance.pk)
# Don't run two threads for the same thing
if key in self.tasks:
continue
if call_inline: if call_inline:
task_transition(instance, in_thread=False) task_transition(instance, in_thread=False)
else: else:
self.tasks.append( self.tasks[key] = self.executor.submit(
self.executor.submit(task_transition, instance) task_transition, instance
) )
self.handled[model._meta.label_lower] = ( self.handled[model._meta.label_lower] = (
self.handled.get(model._meta.label_lower, 0) + 1 self.handled.get(model._meta.label_lower, 0) + 1
@ -218,24 +222,23 @@ class StatorRunner:
if call_inline: if call_inline:
task_deletion(model, in_thread=False) task_deletion(model, in_thread=False)
else: else:
self.tasks.append(self.executor.submit(task_deletion, model)) self.tasks[
model._meta.label_lower, "__delete__"
] = self.executor.submit(task_deletion, model)
def clean_tasks(self): def clean_tasks(self):
""" """
Removes any tasks that are done and handles exceptions if they Removes any tasks that are done and handles exceptions if they
raised them. raised them.
""" """
new_tasks = [] for key, task in list(self.tasks.items()):
for task in self.tasks:
if task.done(): if task.done():
del self.tasks[key]
try: try:
task.result() task.result()
except BaseException as e: except BaseException as e:
exceptions.capture_exception(e) exceptions.capture_exception(e)
traceback.print_exc() traceback.print_exc()
else:
new_tasks.append(task)
self.tasks = new_tasks
def run_single_cycle(self): def run_single_cycle(self):
""" """