diff --git a/takahe/urls.py b/takahe/urls.py index cb833ae..63cd0bf 100644 --- a/takahe/urls.py +++ b/takahe/urls.py @@ -153,6 +153,11 @@ urlpatterns = [ admin.FederationEdit.as_view(), name="admin_federation_edit", ), + path( + "admin/relays/", + admin.RelaysRoot.as_view(), + name="admin_relays", + ), path( "admin/users/", admin.UsersRoot.as_view(), diff --git a/templates/admin/_menu.html b/templates/admin/_menu.html index 0b0ec10..07fe06a 100644 --- a/templates/admin/_menu.html +++ b/templates/admin/_menu.html @@ -45,6 +45,10 @@ Federation + + + Relays + Users diff --git a/templates/admin/relays.html b/templates/admin/relays.html new file mode 100644 index 0000000..9038dfd --- /dev/null +++ b/templates/admin/relays.html @@ -0,0 +1,53 @@ +{% extends "admin/base_main.html" %} +{% load activity_tags %} +{% block subtitle %}Relay{% endblock %} +{% block settings_content %} + + + {% for relay in page_obj %} + + + + + + + + {% empty %} + + + + {% endfor %} +
+ {% if relay.state == 'subscribed' %} + + {% elif relay.state == 'failed' or relay.state == 'rejected' or relay.state == 'unsubscribed' %} + + {% else %} + + {% endif %} + {{ relay.inbox_uri }}{{ relay.state }} +
+ + {% csrf_token %} + +
+
+
+ + {% csrf_token %} + +
+
There are no relay yet.
+
+   Use remove only when it's stuck in (un)subscribing state for more than 10 minutes. +
+ {% include "admin/_pagination.html" with nouns="relay,relays" %} +{% endblock %} diff --git a/users/migrations/0023_add_relay.py b/users/migrations/0023_add_relay.py new file mode 100644 index 0000000..d215ea8 --- /dev/null +++ b/users/migrations/0023_add_relay.py @@ -0,0 +1,60 @@ +# Generated by Django 4.2.8 on 2024-01-02 16:20 + +from django.db import migrations, models + +import stator.models +import users.models.relay + + +class Migration(migrations.Migration): + dependencies = [ + ("users", "0022_follow_request"), + ] + + operations = [ + migrations.CreateModel( + name="Relay", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("state_changed", models.DateTimeField(auto_now_add=True)), + ("state_next_attempt", models.DateTimeField(blank=True, null=True)), + ( + "state_locked_until", + models.DateTimeField(blank=True, db_index=True, null=True), + ), + ("inbox_uri", models.CharField(max_length=500, unique=True)), + ( + "state", + stator.models.StateField( + choices=[ + ("new", "new"), + ("subscribed", "subscribed"), + ("unsubscribing", "unsubscribing"), + ("unsubscribed", "unsubscribed"), + ], + default="new", + graph=users.models.relay.RelayStates, + max_length=100, + ), + ), + ("created", models.DateTimeField(auto_now_add=True)), + ("updated", models.DateTimeField(auto_now=True)), + ], + options={ + "indexes": [ + models.Index( + fields=["state", "state_next_attempt", "state_locked_until"], + name="ix_relay_state_next", + ) + ], + }, + ), + ] diff --git a/users/models/__init__.py b/users/models/__init__.py index 8396e42..be5b979 100644 --- a/users/models/__init__.py +++ b/users/models/__init__.py @@ -8,6 +8,7 @@ from .identity import Identity, IdentityStates # noqa from .inbox_message import InboxMessage, InboxMessageStates # noqa from .invite import Invite # noqa from .password_reset import PasswordReset # noqa +from .relay import Relay, RelayStates # noqa from .report import Report # noqa from .system_actor import SystemActor # noqa from .user import User # noqa diff --git a/users/models/inbox_message.py b/users/models/inbox_message.py index 5609a77..61a61ed 100644 --- a/users/models/inbox_message.py +++ b/users/models/inbox_message.py @@ -16,7 +16,7 @@ class InboxMessageStates(StateGraph): @classmethod def handle_received(cls, instance: "InboxMessage"): from activities.models import Post, PostInteraction, TimelineEvent - from users.models import Block, Follow, Identity, Report + from users.models import Block, Follow, Identity, Relay, Report from users.services import IdentityService try: @@ -68,7 +68,10 @@ class InboxMessageStates(StateGraph): case "accept": match instance.message_object_type: case "follow": - Follow.handle_accept_ap(instance.message) + if Relay.is_ap_message_for_relay(instance.message): + Relay.handle_accept_ap(instance.message) + else: + Follow.handle_accept_ap(instance.message) case None: # It's a string object, but these will only be for Follows Follow.handle_accept_ap(instance.message) @@ -77,7 +80,10 @@ class InboxMessageStates(StateGraph): case "reject": match instance.message_object_type: case "follow": - Follow.handle_reject_ap(instance.message) + if Relay.is_ap_message_for_relay(instance.message): + Relay.handle_reject_ap(instance.message) + else: + Follow.handle_reject_ap(instance.message) case None: # It's a string object, but these will only be for Follows Follow.handle_reject_ap(instance.message) diff --git a/users/models/relay.py b/users/models/relay.py new file mode 100644 index 0000000..1bce4b2 --- /dev/null +++ b/users/models/relay.py @@ -0,0 +1,150 @@ +import logging +import re +from typing import Optional + +import httpx +from django.db import models, transaction + +from core.ld import canonicalise, get_str_or_id +from core.snowflake import Snowflake +from stator.models import State, StateField, StateGraph, StatorModel +from users.models.system_actor import SystemActor + +logger = logging.getLogger(__name__) + + +class RelayStates(StateGraph): + new = State(try_interval=600) + subscribing = State(externally_progressed=True) + subscribed = State(externally_progressed=True) + failed = State(externally_progressed=True) + rejected = State(externally_progressed=True) + unsubscribing = State(try_interval=600) + unsubscribed = State(delete_after=1) + + new.transitions_to(subscribing) + new.transitions_to(unsubscribing) + new.transitions_to(failed) + new.times_out_to(failed, seconds=38400) + subscribing.transitions_to(subscribed) + subscribing.transitions_to(unsubscribing) + subscribing.transitions_to(unsubscribed) + subscribing.transitions_to(rejected) + subscribing.transitions_to(failed) + subscribed.transitions_to(unsubscribing) + subscribed.transitions_to(rejected) + failed.transitions_to(unsubscribed) + rejected.transitions_to(unsubscribed) + unsubscribing.transitions_to(failed) + unsubscribing.transitions_to(unsubscribed) + unsubscribing.times_out_to(failed, seconds=38400) + + @classmethod + def handle_new(cls, instance: "Relay"): + system_actor = SystemActor() + try: + response = system_actor.signed_request( + method="post", + uri=instance.inbox_uri, + body=instance.to_follow_ap(), + ) + except: + logger.warning(f"Error sending follow: {instance.inbox_uri}") + return cls.failed + if response.status_code >= 200 and response.status_code < 300: + return cls.subscribing + else: + logger.error(f"Follow {instance.inbox_uri} HTTP {response.status_code}") + return cls.failed + + @classmethod + def handle_unsubscribing(cls, instance: "Relay"): + system_actor = SystemActor() + try: + response = system_actor.signed_request( + method="post", + uri=instance.inbox_uri, + body=instance.to_unfollow_ap(), + ) + except: + logger.error(f"Error sending unfollow {instance.inbox_uri}") + return cls.failed + if response.status_code >= 200 and response.status_code < 300: + return cls.unsubscribed + else: + logger.error(f"Unfollow {instance.inbox_uri} HTTP {response.status_code}") + return cls.failed + + +class Relay(StatorModel): + inbox_uri = models.CharField(max_length=500, unique=True) + + state = StateField(RelayStates) + + created = models.DateTimeField(auto_now_add=True) + updated = models.DateTimeField(auto_now=True) + + class Meta: + indexes: list = [] + + @classmethod + def active_inbox_uris(cls): + return list( + cls.objects.filter(state=RelayStates.subscribed).values_list( + "inbox_uri", flat=True + ) + ) + + @classmethod + def subscribe(cls, inbox_uri: str) -> "Relay": + return cls.objects.get_or_create(inbox_uri=inbox_uri.strip())[0] + + def unsubscribe(self): + self.transition_perform(RelayStates.unsubscribing) + + def force_unsubscribe(self): + self.transition_perform(RelayStates.unsubscribed) + + def to_follow_ap(self): + system_actor = SystemActor() + return { # skip canonicalise here to keep Public addressing as full URI + "@context": ["https://www.w3.org/ns/activitystreams"], + "id": f"{system_actor.actor_uri}relay/{self.pk}/#follow", + "type": "Follow", + "actor": system_actor.actor_uri, + "object": "https://www.w3.org/ns/activitystreams#Public", + } + + def to_unfollow_ap(self): + system_actor = SystemActor() + return { # skip canonicalise here to keep Public addressing as full URI + "@context": ["https://www.w3.org/ns/activitystreams"], + "id": f"{system_actor.actor_uri}relay/{self.pk}/#unfollow", + "type": "Undo", + "actor": system_actor.actor_uri, + "object": self.to_follow_ap(), + } + + @classmethod + def is_ap_message_for_relay(cls, message) -> bool: + return ( + re.match(r".+/relay/(\d+)/#(follow|unfollow)$", message["object"]["id"]) + is not None + ) + + @classmethod + def get_by_ap(cls, message) -> "Relay": + m = re.match(r".+/relay/(\d+)/#(follow|unfollow)$", message["object"]["id"]) + if not m: + raise ValueError("Not a valid relay follow response") + return cls.objects.get(pk=int(m[1])) + + @classmethod + def handle_accept_ap(cls, message): + relay = cls.get_by_ap(message) + relay.transition_perform(RelayStates.subscribed) + + @classmethod + def handle_reject_ap(cls, message): + relay = cls.get_by_ap(message) + relay.transition_perform(RelayStates.rejected) diff --git a/users/views/admin/__init__.py b/users/views/admin/__init__.py index 3330b1b..d66384c 100644 --- a/users/views/admin/__init__.py +++ b/users/views/admin/__init__.py @@ -31,6 +31,7 @@ from users.views.admin.federation import ( # noqa from users.views.admin.hashtags import HashtagEdit, HashtagEnable, Hashtags # noqa from users.views.admin.identities import IdentitiesRoot, IdentityEdit # noqa from users.views.admin.invites import InviteCreate, InvitesRoot, InviteView # noqa +from users.views.admin.relays import RelaysRoot # noqa from users.views.admin.reports import ReportsRoot, ReportView # noqa from users.views.admin.settings import ( # noqa BasicSettings, diff --git a/users/views/admin/relays.py b/users/views/admin/relays.py new file mode 100644 index 0000000..e6ed3d0 --- /dev/null +++ b/users/views/admin/relays.py @@ -0,0 +1,31 @@ +from django.db import models +from django.shortcuts import redirect +from django.utils.decorators import method_decorator +from django.views.generic import ListView + +from users.decorators import admin_required +from users.models import Identity, Relay + + +@method_decorator(admin_required, name="dispatch") +class RelaysRoot(ListView): + template_name = "admin/relays.html" + paginate_by = 30 + + def get(self, request, *args, **kwargs): + self.extra_context = { + "section": "relays", + } + return super().get(request, *args, **kwargs) + + def get_queryset(self): + return Relay.objects.all().order_by("-id") + + def post(self, request, *args, **kwargs): + if "subscribe" in request.GET: + Relay.subscribe(request.POST.get("inbox_uri")) + elif "unsubscribe" in request.GET: + Relay.objects.get(pk=int(request.POST.get("id"))).unsubscribe() + elif "remove" in request.GET: + Relay.objects.get(pk=int(request.POST.get("id"))).force_unsubscribe() + return redirect(".")