kopia lustrzana https://dev.funkwhale.audio/funkwhale/funkwhale
263 wiersze
8.7 KiB
Python
263 wiersze
8.7 KiB
Python
import datetime
|
|
import logging
|
|
import os
|
|
import requests
|
|
|
|
from django.conf import settings
|
|
from django.db.models import Q, F
|
|
from django.utils import timezone
|
|
from dynamic_preferences.registries import global_preferences_registry
|
|
from requests.exceptions import RequestException
|
|
|
|
from funkwhale_api.common import preferences
|
|
from funkwhale_api.common import session
|
|
from funkwhale_api.common import utils as common_utils
|
|
from funkwhale_api.music import models as music_models
|
|
from funkwhale_api.taskapp import celery
|
|
|
|
from . import keys
|
|
from . import models, signing
|
|
from . import serializers
|
|
from . import routes
|
|
from . import utils
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@celery.app.task(name="federation.clean_music_cache")
def clean_music_cache():
    """
    Delete cached audio files that were not accessed recently, then remove
    files left in the cache directory that no upload references anymore.

    The retention delay is read from the ``federation__music_cache_duration``
    preference and used as a number of minutes. A value below 1 disables
    the whole cleanup.
    """
    manager = global_preferences_registry.manager()
    cache_duration = manager["federation__music_cache_duration"]
    if cache_duration < 1:
        # cache clearing disabled
        return

    threshold = timezone.now() - datetime.timedelta(minutes=cache_duration)

    # an upload is stale when it was never accessed, or not since the threshold
    has_file = Q(audio_file__isnull=False)
    is_stale = Q(accessed_date__lt=threshold) | Q(accessed_date=None)
    expired_uploads = (
        music_models.Upload.objects.filter(has_file & is_stale)
        .local(False)
        .exclude(audio_file="")
        .only("audio_file", "id")
        .order_by("id")
    )
    for expired in expired_uploads:
        expired.audio_file.delete()

    # files present in the storage but attached to no upload are purged too
    storage = models.LibraryTrack._meta.get_field("audio_file").storage
    stored_paths = get_files(storage, "federation_cache/tracks")
    referenced_paths = music_models.Upload.objects.filter(
        audio_file__in=stored_paths
    ).values_list("audio_file", flat=True)
    for orphan in set(stored_paths) - set(referenced_paths):
        storage.delete(orphan)
|
|
|
|
|
|
def get_files(storage, *parts):
    """
    Recursively list every file available under a directory, using
    django's storage API.

    ``parts`` are the successive path components of the directory to
    explore; they are joined with ``os.path.join`` before being handed to
    ``storage.listdir``.

    Returns a list of paths, each one prefixed with the last component of
    ``parts`` (so the top-level call returns paths prefixed with the
    directory it was given). A directory that does not exist yields an
    empty list.

    Raises ``ValueError`` when no path component is given.
    """
    if not parts:
        raise ValueError("Missing path")
    try:
        dirs, files = storage.listdir(os.path.join(*parts))
    except FileNotFoundError:
        return []

    # Work on a copy: extending the object returned by the storage in place
    # would corrupt a storage that hands out a cached list, and would fail
    # outright if the storage returns a tuple.
    found = list(files)
    for dirname in dirs:
        found.extend(get_files(storage, *parts, dirname))
    return [os.path.join(parts[-1], path) for path in found]
|
|
|
|
|
|
@celery.app.task(name="federation.dispatch_inbox")
@celery.require_instance(models.Activity.objects.select_related(), "activity")
def dispatch_inbox(activity, call_handlers=True):
    """
    Run our internal delivery logic (follow creation, etc.) for the given
    activity instance.

    The activity payload is handed to the inbox routes together with a
    context holding the activity, its actor and its unread inbox items.
    ``call_handlers`` is forwarded untouched to the dispatcher.
    """
    payload = activity.payload
    dispatch_context = {
        "activity": activity,
        "actor": activity.actor,
        "inbox_items": activity.inbox_items.filter(is_read=False),
    }
    routes.inbox.dispatch(
        payload, context=dispatch_context, call_handlers=call_handlers
    )
|
|
|
|
|
|
@celery.app.task(name="federation.dispatch_outbox")
@celery.require_instance(models.Activity.objects.select_related(), "activity")
def dispatch_outbox(activity):
    """
    Deliver a local activity to its recipients, both locally and remotely.

    Local delivery is queued through ``dispatch_inbox`` (with handlers
    disabled) when the activity has unread inbox items. Remote delivery
    queues one ``deliver_to_remote`` task per pending delivery, and only
    happens when the ``federation__enabled`` preference is set.
    """
    unread_items = activity.inbox_items.filter(is_read=False).select_related()
    if unread_items.exists():
        dispatch_inbox.delay(activity_id=activity.pk, call_handlers=False)

    # when federation is disabled, we only deliver to local recipients
    if preferences.get("federation__enabled"):
        pending = activity.deliveries.filter(is_delivered=False)
        for delivery_pk in pending.values_list("pk", flat=True):
            deliver_to_remote.delay(delivery_id=delivery_pk)
|
|
|
|
|
|
@celery.app.task(
    name="federation.deliver_to_remote_inbox",
    autoretry_for=[RequestException],
    retry_backoff=30,
    max_retries=5,
)
@celery.require_instance(
    models.Delivery.objects.filter(is_delivered=False).select_related(
        "activity__actor"
    ),
    "delivery",
)
def deliver_to_remote(delivery):
    """
    POST the payload of a delivery's activity to the delivery inbox URL.

    Nothing is sent when the ``federation__enabled`` preference is off.
    Every attempt, successful or not, updates ``last_attempt_date`` and
    increments ``attempts`` on the delivery; a successful attempt also
    flags the delivery as delivered. Any exception raised while posting
    is re-raised once the attempt has been recorded.
    """

    def record_attempt(delivered):
        # persist the outcome of the attempt, touching only the needed columns
        delivery.last_attempt_date = timezone.now()
        delivery.attempts = F("attempts") + 1
        touched = ["last_attempt_date", "attempts"]
        if delivered:
            delivery.is_delivered = True
            touched.append("is_delivered")
        delivery.save(update_fields=touched)

    if not preferences.get("federation__enabled"):
        # federation is disabled, we only deliver to local recipients
        return

    actor = delivery.activity.actor
    logger.info("Preparing activity delivery to %s", delivery.inbox_url)
    auth = signing.get_auth(actor.private_key, actor.private_key_id)
    try:
        http = session.get_session()
        response = http.post(
            auth=auth,
            json=delivery.activity.payload,
            url=delivery.inbox_url,
            timeout=5,
            verify=settings.EXTERNAL_REQUESTS_VERIFY_SSL,
            headers={"Content-Type": "application/activity+json"},
        )
        logger.debug("Remote answered with %s", response.status_code)
        response.raise_for_status()
    except Exception:
        record_attempt(delivered=False)
        raise
    else:
        record_attempt(delivered=True)
|
|
|
|
|
|
def fetch_nodeinfo(domain_name):
    """
    Fetch the nodeinfo 2.0 document advertised by ``domain_name``.

    The ``/.well-known/nodeinfo`` endpoint of the domain is queried over
    HTTPS and its JSON body validated with ``NodeInfoSerializer``. The
    first link whose ``rel`` is the nodeinfo 2.0 schema is then fetched.

    Returns the decoded JSON body of the nodeinfo document.

    Raises ``RequestException`` when a request fails, when a response has
    an error status, or when the domain advertises no nodeinfo 2.0 link.
    The serializer raises its own validation error when the well-known
    payload is invalid.
    """
    schema_rel = "http://nodeinfo.diaspora.software/ns/schema/2.0"
    s = session.get_session()
    wellknown_url = "https://{}/.well-known/nodeinfo".format(domain_name)
    response = s.get(
        url=wellknown_url, timeout=5, verify=settings.EXTERNAL_REQUESTS_VERIFY_SSL
    )
    response.raise_for_status()
    serializer = serializers.NodeInfoSerializer(data=response.json())
    serializer.is_valid(raise_exception=True)

    nodeinfo_url = None
    for link in serializer.validated_data["links"]:
        if link["rel"] == schema_rel:
            nodeinfo_url = link["href"]
            break

    if nodeinfo_url is None:
        # Fail explicitly instead of requesting url=None and relying on the
        # HTTP client to reject it with an unrelated "invalid URL" message.
        raise RequestException(
            "No nodeinfo 2.0 link advertised at {}".format(wellknown_url)
        )

    response = s.get(
        url=nodeinfo_url, timeout=5, verify=settings.EXTERNAL_REQUESTS_VERIFY_SSL
    )
    response.raise_for_status()
    return response.json()
|
|
|
|
|
|
@celery.app.task(name="federation.update_domain_nodeinfo")
@celery.require_instance(
    models.Domain.objects.external(), "domain", id_kwarg_name="domain_name"
)
def update_domain_nodeinfo(domain):
    """
    Refresh the nodeinfo stored on a domain, along with its service actor.

    The fetch outcome is stored as a dict: ``{"status": "ok", "payload": ...}``
    on success, ``{"status": "error", "error": ...}`` when the request or the
    payload validation fails. When the payload exposes ``metadata.actorId``,
    the matching actor is retrieved and attached to the domain; a failure
    there is only logged.
    """
    now = timezone.now()
    try:
        payload = fetch_nodeinfo(domain.name)
    except (requests.RequestException, serializers.serializers.ValidationError) as e:
        nodeinfo = {"status": "error", "error": str(e)}
    else:
        nodeinfo = {"status": "ok", "payload": payload}

    service_actor_id = common_utils.recursive_getattr(
        nodeinfo, "payload.metadata.actorId", permissive=True
    )
    try:
        if service_actor_id:
            domain.service_actor = utils.retrieve_ap_object(
                service_actor_id,
                queryset=models.Actor,
                serializer_class=serializers.ActorSerializer,
            )
        else:
            domain.service_actor = None
    except (serializers.serializers.ValidationError, RequestException) as e:
        logger.warning(
            "Cannot fetch system actor for domain %s: %s", domain.name, str(e)
        )

    domain.nodeinfo_fetch_date = now
    domain.nodeinfo = nodeinfo
    domain.save(update_fields=["nodeinfo", "nodeinfo_fetch_date", "service_actor"])
|
|
|
|
|
|
def delete_qs(qs):
    """
    Delete every object of the given queryset and log what was removed.

    The log line reports the first element returned by ``qs.delete()``,
    the label of the queryset model, and the sum of the per-model counts
    found in the second element.
    """
    model_label = qs.model._meta.label
    outcome = qs.delete()
    total = outcome[0]
    related_count = sum(outcome[1].values())

    logger.info(
        "Purged %s %s objects (and %s related entities)",
        total,
        model_label,
        related_count,
    )
|
|
|
|
|
|
def handle_purge_actors(ids, only=()):
    """
    Purge data attached to the actors whose ids are given.

    Empty only means we purge everything
    Otherwise, we purge only the requested bits: media

    ``ids`` is a collection of actor ids. ``only`` is a collection of
    strings; its default is an immutable empty tuple so that no mutable
    object is shared between calls.
    """
    purge_all = not only

    # purge follows (received emitted)
    if purge_all:
        delete_qs(models.LibraryFollow.objects.filter(target__actor_id__in=ids))
        delete_qs(models.Follow.objects.filter(actor_id__in=ids))

    # purge audio content
    if purge_all or "media" in only:
        delete_qs(models.LibraryFollow.objects.filter(actor_id__in=ids))
        delete_qs(models.Follow.objects.filter(target_id__in=ids))
        delete_qs(music_models.Upload.objects.filter(library__actor_id__in=ids))
        delete_qs(music_models.Library.objects.filter(actor_id__in=ids))

    # purge remaining activities / deliveries
    if purge_all:
        delete_qs(models.InboxItem.objects.filter(actor_id__in=ids))
        delete_qs(models.Activity.objects.filter(actor_id__in=ids))
|
|
|
|
|
|
@celery.app.task(name="federation.purge_actors")
def purge_actors(ids=(), domains=(), only=()):
    """
    Purge data of the actors matching the given ids or domains.

    ``ids`` is a collection of actor ids and ``domains`` a collection of
    domain ids; an actor matching either one is selected. ``only`` is
    forwarded to ``handle_purge_actors`` to restrict what gets purged.

    Defaults are immutable empty tuples rather than lists, so that no
    mutable object is shared between calls.
    """
    actors = models.Actor.objects.filter(
        Q(id__in=ids) | Q(domain_id__in=domains)
    ).order_by("id")
    found_ids = list(actors.values_list("id", flat=True))
    logger.info("Starting purging %s accounts", len(found_ids))
    handle_purge_actors(ids=found_ids, only=only)
|
|
|
|
|
|
@celery.app.task(name="federation.rotate_actor_key")
@celery.require_instance(models.Actor.objects.local(), "actor")
def rotate_actor_key(actor):
    """
    Replace the key pair of an actor with a freshly generated one.

    The pair returned by ``keys.get_key_pair()`` is read as
    (private, public); both values are decoded to text before being
    stored, and only those two columns are saved.
    """
    key_pair = keys.get_key_pair()
    private_pem = key_pair[0].decode()
    public_pem = key_pair[1].decode()
    actor.private_key = private_pem
    actor.public_key = public_pem
    actor.save(update_fields=["private_key", "public_key"])
|