diff --git a/stator/admin.py b/stator/admin.py index 2d001ea..d22fd1b 100644 --- a/stator/admin.py +++ b/stator/admin.py @@ -1,20 +1,15 @@ from django.contrib import admin -from stator.models import StatorError +from stator.models import Stats -@admin.register(StatorError) +@admin.register(Stats) class DomainAdmin(admin.ModelAdmin): list_display = [ - "id", - "date", "model_label", - "instance_pk", - "state", - "error", + "updated", ] - list_filter = ["model_label", "date"] - ordering = ["-date"] + ordering = ["model_label"] def has_add_permission(self, request, obj=None): return False diff --git a/stator/migrations/0002_stats_delete_statorerror.py b/stator/migrations/0002_stats_delete_statorerror.py new file mode 100644 index 0000000..5d22003 --- /dev/null +++ b/stator/migrations/0002_stats_delete_statorerror.py @@ -0,0 +1,31 @@ +# Generated by Django 4.1.4 on 2022-12-15 18:38 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("stator", "0001_initial"), + ] + + operations = [ + migrations.CreateModel( + name="Stats", + fields=[ + ( + "model_label", + models.CharField(max_length=200, primary_key=True, serialize=False), + ), + ("statistics", models.JSONField()), + ("created", models.DateTimeField(auto_now_add=True)), + ("updated", models.DateTimeField(auto_now=True)), + ], + options={ + "verbose_name_plural": "Stats", + }, + ), + migrations.DeleteModel( + name="StatorError", + ), + ] diff --git a/stator/models.py b/stator/models.py index 261584c..c6e777a 100644 --- a/stator/models.py +++ b/stator/models.py @@ -1,5 +1,4 @@ import datetime -import pprint import traceback from typing import ClassVar, cast @@ -127,6 +126,19 @@ class StatorModel(models.Model): ) -> list["StatorModel"]: return await sync_to_async(cls.transition_get_with_lock)(number, lock_expiry) + @classmethod + async def atransition_ready_count(cls) -> int: + """ + Returns how many instances are "queued" + """ + return await ( + cls.objects.filter( + state_locked_until__isnull=True, + state_ready=True, + state__in=cls.state_graph.automatic_states, + ).acount() + ) + @classmethod async def atransition_clean_locks(cls): await cls.objects.filter(state_locked_until__lte=timezone.now()).aupdate( @@ -158,7 +170,6 @@ class StatorModel(models.Model): try: next_state = await current_state.handler(self) except BaseException as e: - await StatorError.acreate_from_instance(self, e) await exceptions.acapture_exception(e) traceback.print_exc() else: @@ -209,46 +220,126 @@ class StatorModel(models.Model): atransition_perform = sync_to_async(transition_perform) -class StatorError(models.Model): +class Stats(models.Model): """ - Tracks any errors running the transitions. - Meant to be cleaned out regularly. Should probably be a log. + Tracks summary statistics of each model over time. """ # appname.modelname (lowercased) label for the model this represents - model_label = models.CharField(max_length=200) + model_label = models.CharField(max_length=200, primary_key=True) - # The primary key of that model (probably int or str) - instance_pk = models.CharField(max_length=200) + statistics = models.JSONField() - # The state we were on - state = models.CharField(max_length=200) + created = models.DateTimeField(auto_now_add=True) + updated = models.DateTimeField(auto_now=True) - # When it happened - date = models.DateTimeField(auto_now_add=True) - - # Error name - error = models.TextField() - - # Error details - error_details = models.TextField(blank=True, null=True) + class Meta: + verbose_name_plural = "Stats" @classmethod - async def acreate_from_instance( - cls, - instance: StatorModel, - exception: BaseException | None = None, - ): - detail = traceback.format_exc() - if exception and len(exception.args) > 1: - detail += "\n\n" + "\n\n".join( - pprint.pformat(arg) for arg in exception.args - ) + def get_for_model(cls, model: type[StatorModel]) -> "Stats": + instance = cls.objects.filter(model_label=model._meta.label_lower).first() + if instance is None: + instance = cls(model_label=model._meta.label_lower) + if not instance.statistics: + instance.statistics = {} + # Ensure there are the right keys + for key in ["queued", "hourly", "daily", "monthly"]: + if key not in instance.statistics: + instance.statistics[key] = {} + return instance - return await cls.objects.acreate( - model_label=instance._meta.label_lower, - instance_pk=str(instance.pk), - state=instance.state, - error=str(exception), - error_details=detail, + @classmethod + async def aget_for_model(cls, model: type[StatorModel]) -> "Stats": + return await sync_to_async(cls.get_for_model)(model) + + def set_queued(self, number: int): + """ + Sets the current queued amount. + + The queue is an instantaneous value (a "gauge") rather than a + sum ("counter"). It's mostly used for reporting what things are right + now, but basic trend analysis is also used to see if we think the + queue is backing up. + """ + self.statistics["queued"][ + int(timezone.now().replace(second=0, microsecond=0).timestamp()) + ] = number + + def add_handled(self, number: int): + """ + Adds the "handled" number to the current stats. + """ + hour = timezone.now().replace(minute=0, second=0, microsecond=0) + day = hour.replace(hour=0) + hour_timestamp = str(int(hour.timestamp())) + day_timestamp = str(int(day.timestamp())) + month_timestamp = str(int(day.replace(day=1).timestamp())) + self.statistics["hourly"][hour_timestamp] = ( + self.statistics["hourly"].get(hour_timestamp, 0) + number + ) + self.statistics["daily"][day_timestamp] = ( + self.statistics["daily"].get(day_timestamp, 0) + number + ) + self.statistics["monthly"][month_timestamp] = ( + self.statistics["monthly"].get(month_timestamp, 0) + number + ) + + def trim_data(self): + """ + Removes excessively old data from the field + """ + queued_horizon = int((timezone.now() - datetime.timedelta(hours=2)).timestamp()) + hourly_horizon = int( + (timezone.now() - datetime.timedelta(hours=50)).timestamp() + ) + daily_horizon = int((timezone.now() - datetime.timedelta(days=62)).timestamp()) + monthly_horizon = int( + (timezone.now() - datetime.timedelta(days=3653)).timestamp() + ) + self.statistics["queued"] = { + ts: v + for ts, v in self.statistics["queued"].items() + if int(ts) >= queued_horizon + } + self.statistics["hourly"] = { + ts: v + for ts, v in self.statistics["hourly"].items() + if int(ts) >= hourly_horizon + } + self.statistics["daily"] = { + ts: v + for ts, v in self.statistics["daily"].items() + if int(ts) >= daily_horizon + } + self.statistics["monthly"] = { + ts: v + for ts, v in self.statistics["monthly"].items() + if int(ts) >= monthly_horizon + } + + def most_recent_queued(self) -> int: + """ + Returns the most recent number of how many were queued + """ + queued = [(int(ts), v) for ts, v in self.statistics["queued"].items()] + queued.sort(reverse=True) + if queued: + return queued[0][1] + else: + return 0 + + def most_recent_handled(self) -> tuple[int, int, int]: + """ + Returns the current handling numbers for hour, day, month + """ + hour = timezone.now().replace(minute=0, second=0, microsecond=0) + day = hour.replace(hour=0) + hour_timestamp = str(int(hour.timestamp())) + day_timestamp = str(int(day.timestamp())) + month_timestamp = str(int(day.replace(day=1).timestamp())) + return ( + self.statistics["hourly"].get(hour_timestamp, 0), + self.statistics["daily"].get(day_timestamp, 0), + self.statistics["monthly"].get(month_timestamp, 0), ) diff --git a/stator/runner.py b/stator/runner.py index 7305a6e..ad3f660 100644 --- a/stator/runner.py +++ b/stator/runner.py @@ -4,12 +4,12 @@ import time import traceback import uuid -from asgiref.sync import async_to_sync +from asgiref.sync import async_to_sync, sync_to_async from django.utils import timezone from core import exceptions, sentry from core.models import Config -from stator.models import StatorModel +from stator.models import StatorModel, Stats class StatorRunner: @@ -39,7 +39,7 @@ class StatorRunner: async def run(self): sentry.set_takahe_app("stator") - self.handled = 0 + self.handled = {} self.started = time.monotonic() self.last_clean = time.monotonic() - self.schedule_interval self.tasks = [] @@ -52,7 +52,9 @@ class StatorRunner: if (time.monotonic() - self.last_clean) >= self.schedule_interval: # Refresh the config Config.system = await Config.aload_system() - print(f"{self.handled} tasks processed so far") + print("Tasks processed this loop:") + for label, number in self.handled.items(): + print(f" {label}: {number}") print("Running cleaning and scheduling") await self.run_scheduling() @@ -91,10 +93,23 @@ class StatorRunner: """ with sentry.start_transaction(op="task", name="stator.run_scheduling"): for model in self.models: + asyncio.create_task(self.submit_stats(model)) asyncio.create_task(model.atransition_clean_locks()) asyncio.create_task(model.atransition_schedule_due()) self.last_clean = time.monotonic() + async def submit_stats(self, model): + """ + Pop some statistics into the database + """ + stats_instance = await Stats.aget_for_model(model) + if stats_instance.model_label in self.handled: + stats_instance.add_handled(self.handled[stats_instance.model_label]) + del self.handled[stats_instance.model_label] + stats_instance.set_queued(await model.atransition_ready_count()) + stats_instance.trim_data() + await sync_to_async(stats_instance.save)() + async def fetch_and_process_tasks(self): # Calculate space left for tasks space_remaining = self.concurrency - len(self.tasks) @@ -110,7 +125,9 @@ class StatorRunner: self.tasks.append( asyncio.create_task(self.run_transition(instance)) ) - self.handled += 1 + self.handled[model._meta.label_lower] = ( + self.handled.get(model._meta.label_lower, 0) + 1 + ) space_remaining -= 1 async def run_transition(self, instance: StatorModel): diff --git a/takahe/urls.py b/takahe/urls.py index dfdf4da..6bb239b 100644 --- a/takahe/urls.py +++ b/takahe/urls.py @@ -127,6 +127,11 @@ urlpatterns = [ "admin/hashtags//delete/", admin.HashtagDelete.as_view(), ), + path( + "admin/stator/", + admin.Stator.as_view(), + name="admin_stator", + ), # Identity views path("@/", identity.ViewIdentity.as_view()), path("@/inbox/", activitypub.Inbox.as_view()), diff --git a/templates/admin/stator.html b/templates/admin/stator.html new file mode 100644 index 0000000..64bd432 --- /dev/null +++ b/templates/admin/stator.html @@ -0,0 +1,13 @@ +{% extends "settings/base.html" %} + +{% block subtitle %}Stator{% endblock %} + +{% block content %} + {% for model, stats in model_stats.items %} +
+ {{ model }} +

Pending: {{ stats.most_recent_queued }}

+

Processed today: {{ stats.most_recent_handled.1 }}

+
+ {% endfor %} +{% endblock %} diff --git a/templates/settings/_menu.html b/templates/settings/_menu.html index bcb404d..0a6062d 100644 --- a/templates/settings/_menu.html +++ b/templates/settings/_menu.html @@ -42,6 +42,9 @@ Tuning + + Stator + Django Admin diff --git a/users/views/admin/__init__.py b/users/views/admin/__init__.py index b8ebc40..bb70ff7 100644 --- a/users/views/admin/__init__.py +++ b/users/views/admin/__init__.py @@ -22,6 +22,7 @@ from users.views.admin.settings import ( # noqa PoliciesSettings, TuningSettings, ) +from users.views.admin.stator import Stator # noqa @method_decorator(admin_required, name="dispatch") diff --git a/users/views/admin/stator.py b/users/views/admin/stator.py new file mode 100644 index 0000000..c3ce01d --- /dev/null +++ b/users/views/admin/stator.py @@ -0,0 +1,20 @@ +from django.utils.decorators import method_decorator +from django.views.generic import TemplateView + +from stator.models import StatorModel, Stats +from users.decorators import admin_required + + +@method_decorator(admin_required, name="dispatch") +class Stator(TemplateView): + + template_name = "admin/stator.html" + + def get_context_data(self): + return { + "model_stats": { + model._meta.verbose_name_plural.title(): Stats.get_for_model(model) + for model in StatorModel.subclasses + }, + "section": "stator", + }