Queuing system and lazy profile fetch

pull/1/head
Andrew Godwin 2022-11-05 22:49:25 -06:00
rodzic 56de2362a0
commit a2404e01cd
13 zmienionych plików z 246 dodań i 6 usunięć

Wyświetl plik

21
miniq/admin.py 100644
Wyświetl plik

@ -0,0 +1,21 @@
from django.contrib import admin
from miniq.models import Task
@admin.register(Task)
class TaskAdmin(admin.ModelAdmin):
list_display = ["id", "created", "type", "subject", "completed", "failed"]
ordering = ["-created"]
actions = ["reset"]
@admin.action(description="Reset Task")
def reset(self, request, queryset):
queryset.update(
failed=None,
completed=None,
locked=None,
locked_by=None,
error=None,
)

6
miniq/apps.py 100644
Wyświetl plik

@ -0,0 +1,6 @@
from django.apps import AppConfig
class MiniqConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "miniq"

Wyświetl plik

@ -0,0 +1,37 @@
# Generated by Django 4.1.3 on 2022-11-06 03:59
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = []
operations = [
migrations.CreateModel(
name="Task",
fields=[
(
"id",
models.BigAutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
("type", models.CharField(max_length=500)),
("priority", models.IntegerField(default=0)),
("subject", models.TextField()),
("payload", models.JSONField(blank=True, null=True)),
("error", models.TextField(blank=True, null=True)),
("created", models.DateTimeField(auto_now_add=True)),
("completed", models.DateTimeField(blank=True, null=True)),
("failed", models.DateTimeField(blank=True, null=True)),
("locked", models.DateTimeField(blank=True, null=True)),
("locked_by", models.CharField(blank=True, max_length=500, null=True)),
],
),
]

Wyświetl plik

68
miniq/models.py 100644
Wyświetl plik

@ -0,0 +1,68 @@
from typing import Optional
from django.db import models, transaction
from django.utils import timezone
class Task(models.Model):
"""
A task that must be done by a queue processor
"""
class TypeChoices(models.TextChoices):
identity_fetch = "identity_fetch"
type = models.CharField(max_length=500, choices=TypeChoices.choices)
priority = models.IntegerField(default=0)
subject = models.TextField()
payload = models.JSONField(blank=True, null=True)
error = models.TextField(blank=True, null=True)
created = models.DateTimeField(auto_now_add=True)
completed = models.DateTimeField(blank=True, null=True)
failed = models.DateTimeField(blank=True, null=True)
locked = models.DateTimeField(blank=True, null=True)
locked_by = models.CharField(max_length=500, blank=True, null=True)
def __str__(self):
return f"{self.id}/{self.type}({self.subject})"
@classmethod
def get_one_available(cls, processor_id) -> Optional["Task"]:
"""
Gets one task off the list while reserving it, atomically.
"""
with transaction.atomic():
next_task = cls.objects.filter(locked__isnull=True).first()
if next_task is None:
return None
next_task.locked = timezone.now()
next_task.locked_by = processor_id
next_task.save()
return next_task
@classmethod
def submit(cls, type, subject, payload=None, deduplicate=True):
# Deduplication is done against tasks that have not started yet only,
# and only on tasks without payloads
if deduplicate and not payload:
if cls.objects.filter(
type=type,
subject=subject,
completed__isnull=True,
failed__isnull=True,
locked__isnull=True,
).exists():
return
cls.objects.create(type=type, subject=subject, payload=payload)
async def complete(self):
await self.__class__.objects.filter(id=self.id).aupdate(
completed=timezone.now()
)
async def fail(self, error):
await self.__class__.objects.filter(id=self.id).aupdate(
failed=timezone.now(),
error=error,
)

68
miniq/views.py 100644
Wyświetl plik

@ -0,0 +1,68 @@
import asyncio
import time
import traceback
import uuid
from asgiref.sync import sync_to_async
from django.http import HttpResponse
from django.views import View
from miniq.models import Task
from users.models import Identity
class QueueProcessor(View):
"""
A view that takes some items off the queue and processes them.
Tries to limit its own runtime so it's within HTTP timeout limits.
"""
START_TIMEOUT = 30
TOTAL_TIMEOUT = 60
MAX_TASKS = 10
async def get(self, request):
start_time = time.monotonic()
processor_id = uuid.uuid4().hex
handled = 0
self.tasks = []
# For the first time period, launch tasks
while (time.monotonic() - start_time) < self.START_TIMEOUT:
# Remove completed tasks
self.tasks = [t for t in self.tasks if not t.done()]
# See if there's a new task
if len(self.tasks) < self.MAX_TASKS:
# Pop a task off the queue and run it
task = await sync_to_async(Task.get_one_available)(processor_id)
if task is not None:
self.tasks.append(asyncio.create_task(self.run_task(task)))
handled += 1
# Prevent busylooping
await asyncio.sleep(0.01)
# Then wait for tasks to finish
while (time.monotonic() - start_time) < self.TOTAL_TIMEOUT:
# Remove completed tasks
self.tasks = [t for t in self.tasks if not t.done()]
if not self.tasks:
break
# Prevent busylooping
await asyncio.sleep(1)
return HttpResponse(f"{handled} tasks handled")
async def run_task(self, task):
try:
print(f"Task {task}: Starting")
handler = getattr(self, f"handle_{task.type}", None)
if handler is None:
raise ValueError(f"Cannot handle type {task.type}")
await handler(task.subject, task.payload)
await task.complete()
print(f"Task {task}: Complete")
except BaseException as e:
print(f"Task {task}: Error {e}")
traceback.print_exc()
await task.fail(f"{e}\n\n" + traceback.format_exc())
async def handle_identity_fetch(self, subject, payload):
identity = await sync_to_async(Identity.by_handle)(subject)
await identity.fetch_details()

Wyświetl plik

@ -100,7 +100,7 @@ header h1 {
font-family: "Raleway";
font-weight: normal;
background: var(--color-fg2);
padding: 10px 7px 7px 7px;
padding: 10px 7px 7px 0;
font-size: 130%;
height: 2.2em;
color: var(--color-fg1);

Wyświetl plik

@ -25,6 +25,7 @@ INSTALLED_APPS = [
"core",
"statuses",
"users",
"miniq",
]
MIDDLEWARE = [
@ -113,3 +114,4 @@ CRISPY_FAIL_SILENTLY = not DEBUG
SITE_NAME = "takahē"
DEFAULT_DOMAIN = "feditest.aeracode.org"
ALLOWED_DOMAINS = ["feditest.aeracode.org"]
IDENTITY_MAX_AGE = 24 * 60 * 60

Wyświetl plik

@ -2,6 +2,7 @@ from django.contrib import admin
from django.urls import path
from core import views as core
from miniq import views as miniq
from users.views import auth, identity
urlpatterns = [
@ -19,6 +20,8 @@ urlpatterns = [
path("identity/create/", identity.CreateIdentity.as_view()),
# Well-known endpoints
path(".well-known/webfinger", identity.Webfinger.as_view()),
# Task runner
path(".queue/process/", miniq.QueueProcessor.as_view()),
# Django admin
path("djadmin/", admin.site.urls),
]

Wyświetl plik

@ -14,10 +14,16 @@
</h1>
{% if not identity.local %}
<p class="system-note">
This user is a member of another server.
<a href="{{ identity.profile_uri }}">See their original profile</a>
</p>
{% if not identity.actor_uri %}
<p class="system-note">
The system is still fetching this profile. Refresh to see updates.
</p>
{% else %}
<p class="system-note">
This is a member of another server.
<a href="{{ identity.profile_uri }}">See their original profile</a>
</p>
{% endif %}
{% endif %}
{% for status in statuses %}

Wyświetl plik

@ -59,6 +59,19 @@ class Identity(models.Model):
fetched = models.DateTimeField(null=True, blank=True)
deleted = models.DateTimeField(null=True, blank=True)
@classmethod
def by_handle(cls, handle, create=True):
if handle.startswith("@"):
raise ValueError("Handle must not start with @")
if "@" not in handle:
raise ValueError("Handle must contain domain")
try:
return cls.objects.filter(handle=handle).get()
except cls.DoesNotExist:
if create:
return cls.objects.create(handle=handle, local=False)
return None
@property
def short_handle(self):
if self.handle.endswith("@" + settings.DEFAULT_DOMAIN):
@ -69,6 +82,17 @@ class Identity(models.Model):
def domain(self):
return self.handle.split("@", 1)[1]
@property
def data_age(self) -> float:
"""
How old our copy of this data is, in seconds
"""
if self.local:
return 0
if self.fetched is None:
return 10000000000
return (timezone.now() - self.fetched).total_seconds()
def generate_keypair(self):
private_key = rsa.generate_private_key(
public_exponent=65537,
@ -104,6 +128,7 @@ class Identity(models.Model):
response = await client.get(
f"https://{self.domain}/.well-known/webfinger?resource=acct:{self.handle}",
headers={"Accept": "application/json"},
follow_redirects=True,
)
if response.status_code >= 400:
return False
@ -126,6 +151,7 @@ class Identity(models.Model):
response = await client.get(
self.actor_uri,
headers={"Accept": "application/json"},
follow_redirects=True,
)
if response.status_code >= 400:
return False

Wyświetl plik

@ -10,6 +10,7 @@ from django.views.decorators.csrf import csrf_exempt
from django.views.generic import FormView, TemplateView, View
from core.forms import FormHelper
from miniq.models import Task
from users.models import Identity
from users.shortcuts import by_handle_or_404
@ -19,8 +20,10 @@ class ViewIdentity(TemplateView):
template_name = "identity/view.html"
def get_context_data(self, handle):
identity = by_handle_or_404(self.request, handle, local=False)
identity = Identity.by_handle(handle=handle)
statuses = identity.statuses.all()[:100]
if identity.data_age > settings.IDENTITY_MAX_AGE:
Task.submit("identity_fetch", identity.handle)
return {
"identity": identity,
"statuses": statuses,