takahe/users/views/identity.py

230 wiersze
7.8 KiB
Python

import json
import string
from asgiref.sync import async_to_sync
from django import forms
from django.conf import settings
from django.contrib.auth.decorators import login_required
from django.http import Http404, HttpResponseBadRequest, JsonResponse
from django.shortcuts import redirect
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
from django.views.generic import FormView, TemplateView, View
from core.forms import FormHelper
from core.ld import canonicalise
from core.signatures import HttpSignature
from miniq.models import Task
from users.models import Domain, Identity
from users.shortcuts import by_handle_or_404
class ViewIdentity(TemplateView):
template_name = "identity/view.html"
def get_context_data(self, handle):
identity = by_handle_or_404(self.request, handle, local=False)
statuses = identity.statuses.all()[:100]
if identity.data_age > settings.IDENTITY_MAX_AGE:
Task.submit("identity_fetch", identity.handle)
return {
"identity": identity,
"statuses": statuses,
}
@method_decorator(login_required, name="dispatch")
class SelectIdentity(TemplateView):
template_name = "identity/select.html"
def get_context_data(self):
return {
"identities": Identity.objects.filter(users__pk=self.request.user.pk),
}
@method_decorator(login_required, name="dispatch")
class ActivateIdentity(View):
def get(self, request, handle):
identity = by_handle_or_404(request, handle)
if not identity.users.filter(pk=request.user.pk).exists():
raise Http404()
request.session["identity_id"] = identity.id
# Get next URL, not allowing offsite links
next = request.GET.get("next") or "/"
if ":" in next:
next = "/"
return redirect("/")
@method_decorator(login_required, name="dispatch")
class CreateIdentity(FormView):
template_name = "identity/create.html"
class form_class(forms.Form):
username = forms.CharField()
name = forms.CharField()
helper = FormHelper(submit_text="Create")
def __init__(self, user, *args, **kwargs):
super().__init__(*args, **kwargs)
self.fields["domain"] = forms.ChoiceField(
choices=[
(domain.domain, domain.domain)
for domain in Domain.available_for_user(user)
]
)
def clean_username(self):
# Remove any leading @
value = self.cleaned_data["username"].lstrip("@")
# Validate it's all ascii characters
for character in value:
if character not in string.ascii_letters + string.digits + "_-":
raise forms.ValidationError(
"Only the letters a-z, numbers 0-9, dashes and underscores are allowed."
)
return value
def clean(self):
# Check for existing users
username = self.cleaned_data["username"]
domain = self.cleaned_data["domain"]
if Identity.objects.filter(username=username, domain=domain).exists():
raise forms.ValidationError(f"{username}@{domain} is already taken")
def get_form(self):
form_class = self.get_form_class()
return form_class(user=self.request.user, **self.get_form_kwargs())
def form_valid(self, form):
username = form.cleaned_data["username"]
domain = form.cleaned_data["domain"]
new_identity = Identity.objects.create(
actor_uri=f"https://{domain}/@{username}/actor/",
username=username,
domain_id=domain,
name=form.cleaned_data["name"],
local=True,
)
new_identity.users.add(self.request.user)
new_identity.generate_keypair()
return redirect(new_identity.urls.view)
class Actor(View):
"""
Returns the AP Actor object
"""
def get(self, request, handle):
identity = by_handle_or_404(self.request, handle)
response = {
"@context": [
"https://www.w3.org/ns/activitystreams",
"https://w3id.org/security/v1",
],
"id": identity.urls.actor.full(),
"type": "Person",
"inbox": identity.urls.inbox.full(),
"preferredUsername": identity.username,
"publicKey": {
"id": identity.urls.actor.full() + "#main-key",
"owner": identity.urls.actor.full(),
"publicKeyPem": identity.public_key,
},
"published": identity.created.strftime("%Y-%m-%dT%H:%M:%SZ"),
"url": identity.urls.view_short.full(),
}
if identity.name:
response["name"] = identity.name
if identity.summary:
response["summary"] = identity.summary
return JsonResponse(canonicalise(response, include_security=True))
@method_decorator(csrf_exempt, name="dispatch")
class Inbox(View):
"""
AP Inbox endpoint
"""
def post(self, request, handle):
# Verify body digest
if "HTTP_DIGEST" in request.META:
expected_digest = HttpSignature.calculate_digest(request.body)
if request.META["HTTP_DIGEST"] != expected_digest:
print("Bad digest")
return HttpResponseBadRequest()
# Get the signature details
if "HTTP_SIGNATURE" not in request.META:
print("No signature")
return HttpResponseBadRequest()
signature_details = HttpSignature.parse_signature(
request.META["HTTP_SIGNATURE"]
)
# Reject unknown algorithms
if signature_details["algorithm"] != "rsa-sha256":
print("Unknown algorithm")
return HttpResponseBadRequest()
# Create the signature payload
headers_string = HttpSignature.headers_from_request(
request, signature_details["headers"]
)
# Load the LD
document = canonicalise(json.loads(request.body))
print(signature_details)
print(headers_string)
print(document)
# Find the Identity by the actor on the incoming item
identity = Identity.by_actor_uri(document["actor"], create=True)
if not identity.public_key:
# See if we can fetch it right now
async_to_sync(identity.fetch_actor)()
if not identity.public_key:
print("Cannot retrieve actor")
return HttpResponseBadRequest("Cannot retrieve actor")
if not identity.verify_signature(
signature_details["signature"], headers_string
):
print("Bad signature")
# return HttpResponseBadRequest("Bad signature")
return JsonResponse({"status": "OK"})
class Webfinger(View):
"""
Services webfinger requests
"""
def get(self, request):
resource = request.GET.get("resource")
if not resource.startswith("acct:"):
raise Http404("Not an account resource")
handle = resource[5:].replace("testfedi", "feditest")
identity = by_handle_or_404(request, handle)
return JsonResponse(
{
"subject": f"acct:{identity.handle}",
"aliases": [
identity.urls.view_short.full(),
],
"links": [
{
"rel": "http://webfinger.net/rel/profile-page",
"type": "text/html",
"href": identity.urls.view_short.full(),
},
{
"rel": "self",
"type": "application/activity+json",
"href": identity.urls.actor.full(),
},
],
}
)