Rework stator to avoid deadlocks on slow databases

Refs #424
pull/472/head^2
Andrew Godwin 2023-02-03 21:51:24 -07:00
rodzic d8fc81a9a6
commit 36676fad59
6 zmienionych plików z 197 dodań i 35 usunięć

Wyświetl plik

@ -263,6 +263,7 @@ class Migration(migrations.Migration):
("undo_interaction", "Undo Interaction"),
("identity_edited", "Identity Edited"),
("identity_deleted", "Identity Deleted"),
("identity_created", "Identity Created"),
],
max_length=100,
),
@ -325,6 +326,7 @@ class Migration(migrations.Migration):
("followed", "Followed"),
("boosted", "Boosted"),
("announcement", "Announcement"),
("identity_created", "Identity Created"),
],
max_length=100,
),

Wyświetl plik

@ -0,0 +1,55 @@
# Generated by Django 4.1.4 on 2023-02-04 01:05
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("activities", "0009_alter_timelineevent_index_together"),
]
operations = [
migrations.AlterField(
model_name="emoji",
name="state_ready",
field=models.BooleanField(db_index=True, default=True),
),
migrations.AlterField(
model_name="fanout",
name="state_ready",
field=models.BooleanField(db_index=True, default=True),
),
migrations.AlterField(
model_name="hashtag",
name="state_ready",
field=models.BooleanField(db_index=True, default=True),
),
migrations.AlterField(
model_name="post",
name="state_ready",
field=models.BooleanField(db_index=True, default=True),
),
migrations.AlterField(
model_name="postattachment",
name="state_ready",
field=models.BooleanField(db_index=True, default=True),
),
migrations.AlterField(
model_name="postinteraction",
name="state_ready",
field=models.BooleanField(db_index=True, default=True),
),
migrations.AlterIndexTogether(
name="fanout",
index_together={("state_ready", "state_locked_until", "state")},
),
migrations.AlterIndexTogether(
name="hashtag",
index_together={("state_ready", "state_locked_until", "state")},
),
migrations.AlterIndexTogether(
name="postattachment",
index_together={("state_ready", "state_locked_until", "state")},
),
]

Wyświetl plik

@ -84,7 +84,7 @@ class StatorModel(models.Model):
state: StateField
# If this row is up for transition attempts (which it always is on creation!)
state_ready = models.BooleanField(default=True)
state_ready = models.BooleanField(default=True, db_index=True)
# When the state last actually changed, or the date of instance creation
state_changed = models.DateTimeField(auto_now_add=True)
@ -102,6 +102,7 @@ class StatorModel(models.Model):
class Meta:
abstract = True
index_together = ["state_ready", "state_locked_until", "state"]
# Need this empty indexes to ensure child Models have a Meta.indexes
# that will look to add indexes (that we inject with class_prepared)
indexes: list = []

Wyświetl plik

@ -6,7 +6,7 @@ import time
import traceback
import uuid
from asgiref.sync import async_to_sync, sync_to_async
from asgiref.sync import ThreadSensitiveContext, async_to_sync, sync_to_async
from django.conf import settings
from django.utils import timezone
@ -15,6 +15,28 @@ from core.models import Config
from stator.models import StatorModel, Stats
class LoopingTask:
"""
Wrapper for having a coroutine go in the background and only have one
copy running at a time.
"""
def __init__(self, callable):
self.callable = callable
self.task: asyncio.Task | None = None
def run(self) -> bool:
# If we have a task object, see if we can clear it up
if self.task is not None:
if self.task.done():
self.task = None
else:
return False
# OK, launch a new task
self.task = asyncio.create_task(self.callable())
return True
class StatorRunner:
"""
Runs tasks on models that are looking for state changes.
@ -26,7 +48,7 @@ class StatorRunner:
models: list[type[StatorModel]],
concurrency: int = getattr(settings, "STATOR_CONCURRENCY", 50),
concurrency_per_model: int = getattr(
settings, "STATOR_CONCURRENCY_PER_MODEL", 20
settings, "STATOR_CONCURRENCY_PER_MODEL", 15
),
liveness_file: str | None = None,
schedule_interval: int = 30,
@ -53,6 +75,9 @@ class StatorRunner:
self.last_clean = time.monotonic() - self.schedule_interval
self.tasks = []
self.loop_delay = self.minimum_loop_delay
self.schedule_task = LoopingTask(self.run_scheduling)
self.fetch_task = LoopingTask(self.fetch_and_process_tasks)
self.config_task = LoopingTask(self.load_config)
# For the first time period, launch tasks
print("Running main task loop")
try:
@ -64,22 +89,25 @@ class StatorRunner:
# previous one is cancelled)
signal.alarm(self.schedule_interval * 2)
# Refresh the config
Config.system = await Config.aload_system()
print("Tasks processed this loop:")
for label, number in self.handled.items():
print(f" {label}: {number}")
print("Running cleaning and scheduling")
await self.run_scheduling()
self.config_task.run()
if self.schedule_task.run():
print("Running cleaning and scheduling")
else:
print("Previous scheduling still running...!")
# Write liveness file if configured
if self.liveness_file:
with open(self.liveness_file, "w") as fh:
fh.write(str(int(time.time())))
self.last_clean = time.monotonic()
# Clear the cleaning breadcrumbs/extra for the main part of the loop
sentry.scope_clear(scope)
self.remove_completed_tasks()
await self.fetch_and_process_tasks()
# Fetching is kind of blocking, so we need to do this
# as a separate coroutine
self.fetch_task.run()
# Are we in limited run mode?
if (
@ -122,17 +150,28 @@ class StatorRunner:
print("Watchdog timeout exceeded")
os._exit(2)
async def load_config(self):
"""
Refreshes config from the DB
"""
Config.system = await Config.aload_system()
async def run_scheduling(self):
"""
Do any transition cleanup tasks
"""
if self.handled:
print("Tasks processed since last flush:")
for label, number in self.handled.items():
print(f" {label}: {number}")
else:
print("No tasks handled since last flush.")
with sentry.start_transaction(op="task", name="stator.run_scheduling"):
for model in self.models:
asyncio.create_task(self.submit_stats(model))
asyncio.create_task(model.atransition_clean_locks())
asyncio.create_task(model.atransition_schedule_due())
asyncio.create_task(model.atransition_delete_due())
self.last_clean = time.monotonic()
await self.submit_stats(model)
await model.atransition_clean_locks()
await model.atransition_schedule_due()
await model.atransition_delete_due()
async def submit_stats(self, model):
"""
@ -171,25 +210,26 @@ class StatorRunner:
Wrapper for atransition_attempt with fallback error handling
"""
task_name = f"stator.run_transition:{instance._meta.label_lower}#{{id}} from {instance.state}"
with sentry.start_transaction(op="task", name=task_name):
sentry.set_context(
"instance",
{
"model": instance._meta.label_lower,
"pk": instance.pk,
"state": instance.state,
"state_age": instance.state_age,
},
)
try:
print(
f"Attempting transition on {instance._meta.label_lower}#{instance.pk} from state {instance.state}"
async with ThreadSensitiveContext():
with sentry.start_transaction(op="task", name=task_name):
sentry.set_context(
"instance",
{
"model": instance._meta.label_lower,
"pk": instance.pk,
"state": instance.state,
"state_age": instance.state_age,
},
)
await instance.atransition_attempt()
except BaseException as e:
await exceptions.acapture_exception(e)
traceback.print_exc()
try:
print(
f"Attempting transition on {instance._meta.label_lower}#{instance.pk} from state {instance.state}"
)
await instance.atransition_attempt()
except BaseException as e:
await exceptions.acapture_exception(e)
traceback.print_exc()
def remove_completed_tasks(self):
"""

Wyświetl plik

@ -143,8 +143,8 @@ class Settings(BaseSettings):
CACHES_DEFAULT: CacheBackendUrl | None = None
# Stator tuning
STATOR_CONCURRENCY: int = 100
STATOR_CONCURRENCY_PER_MODEL: int = 40
STATOR_CONCURRENCY: int = 50
STATOR_CONCURRENCY_PER_MODEL: int = 15
PGHOST: str | None = None
PGPORT: int | None = 5432

Wyświetl plik

@ -0,0 +1,64 @@
# Generated by Django 4.1.4 on 2023-02-04 01:05
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("users", "0012_block_states"),
]
operations = [
migrations.AlterField(
model_name="block",
name="state_ready",
field=models.BooleanField(db_index=True, default=True),
),
migrations.AlterField(
model_name="domain",
name="state_ready",
field=models.BooleanField(db_index=True, default=True),
),
migrations.AlterField(
model_name="follow",
name="state_ready",
field=models.BooleanField(db_index=True, default=True),
),
migrations.AlterField(
model_name="identity",
name="state_ready",
field=models.BooleanField(db_index=True, default=True),
),
migrations.AlterField(
model_name="inboxmessage",
name="state_ready",
field=models.BooleanField(db_index=True, default=True),
),
migrations.AlterField(
model_name="passwordreset",
name="state_ready",
field=models.BooleanField(db_index=True, default=True),
),
migrations.AlterField(
model_name="report",
name="state_ready",
field=models.BooleanField(db_index=True, default=True),
),
migrations.AlterIndexTogether(
name="domain",
index_together={("state_ready", "state_locked_until", "state")},
),
migrations.AlterIndexTogether(
name="inboxmessage",
index_together={("state_ready", "state_locked_until", "state")},
),
migrations.AlterIndexTogether(
name="passwordreset",
index_together={("state_ready", "state_locked_until", "state")},
),
migrations.AlterIndexTogether(
name="report",
index_together={("state_ready", "state_locked_until", "state")},
),
]