funkwhale/api/funkwhale_api/common/models.py

261 wiersze
8.5 KiB
Python
Czysty Zwykły widok Historia

import uuid
2019-11-25 08:49:06 +00:00
import magic
import mimetypes
from django.contrib.postgres.fields import JSONField
from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
from django.conf import settings
2019-04-17 14:11:24 +00:00
from django.core.serializers.json import DjangoJSONEncoder
2019-04-19 10:05:13 +00:00
from django.db import connections, models, transaction
from django.db.models import Lookup
from django.db.models.fields import Field
2019-04-19 10:05:13 +00:00
from django.db.models.sql.compiler import SQLCompiler
2019-11-25 08:49:06 +00:00
from django.dispatch import receiver
from django.utils import timezone
from django.urls import reverse
2019-11-25 08:49:06 +00:00
from versatileimagefield.fields import VersatileImageField
from versatileimagefield.image_warmer import VersatileImageFieldWarmer
from funkwhale_api.federation import utils as federation_utils
2019-11-25 08:49:06 +00:00
from . import utils
from . import validators
@Field.register_lookup
class NotEqual(Lookup):
    """Custom ``ne`` lookup, registered on every field, rendering ``lhs <> rhs``."""

    lookup_name = "ne"

    def as_sql(self, compiler, connection):
        """Return the ``(sql, params)`` pair for the inequality comparison."""
        left_sql, left_params = self.process_lhs(compiler, connection)
        right_sql, right_params = self.process_rhs(compiler, connection)
        sql = "{} <> {}".format(left_sql, right_sql)
        # Parameters are concatenated in the order their placeholders appear.
        return sql, left_params + right_params
2019-03-21 09:46:28 +00:00
2019-04-19 10:05:13 +00:00
class NullsLastSQLCompiler(SQLCompiler):
    """SQL compiler that suffixes ORDER BY terms with ``NULLS LAST`` on PostgreSQL."""

    def get_order_by(self):
        """Return the parent's ORDER BY entries, adding ``NULLS LAST`` where missing.

        The parent's result is returned untouched when it is empty or when
        the connection vendor is not ``postgresql``.
        """
        ordering = super().get_order_by()
        if not ordering or self.connection.vendor != "postgresql":
            return ordering

        suffix = " NULLS LAST"
        patched = []
        for expr, (sql, params, is_ref) in ordering:
            # Do not append the suffix twice to a term that already has it.
            if not sql.endswith(suffix):
                sql = sql + suffix
            patched.append((expr, (sql, params, is_ref)))
        return patched
class NullsLastQuery(models.sql.query.Query):
    """Use a custom compiler to inject 'NULLS LAST' (for PostgreSQL)."""

    def get_compiler(self, using=None, connection=None):
        """Return a ``NullsLastSQLCompiler`` bound to the requested connection.

        Raises:
            ValueError: if both ``using`` and ``connection`` are ``None``.
        """
        if connection is None and using is None:
            raise ValueError("Need either using or connection")
        # A truthy alias takes precedence over an explicitly passed connection.
        resolved = connections[using] if using else connection
        return NullsLastSQLCompiler(self, resolved, using)
class NullsLastQuerySet(models.QuerySet):
    """QuerySet whose default query object is a ``NullsLastQuery``."""

    def __init__(self, model=None, query=None, using=None, hints=None):
        """Initialise the queryset, falling back to a ``NullsLastQuery``.

        A falsy ``query`` is replaced by a fresh ``NullsLastQuery`` built for
        ``self.model``.
        """
        super().__init__(model, query, using, hints)
        if not query:
            query = NullsLastQuery(self.model)
        self.query = query
class LocalFromFidQuerySet:
    """Mixin adding a ``local()`` filter based on the ``fid`` prefix."""

    def local(self, include=True):
        """Filter on whether ``fid`` starts with this instance's base URL.

        The base URL is ``settings.FEDERATION_HOSTNAME`` under either the
        ``http`` or ``https`` scheme. A truthy ``include`` keeps the matching
        rows, a falsy one keeps the others.
        """
        hostname = settings.FEDERATION_HOSTNAME
        http_prefix = "http://{}/".format(hostname)
        https_prefix = "https://{}/".format(hostname)
        is_local = models.Q(fid__startswith=http_prefix) | models.Q(
            fid__startswith=https_prefix
        )
        return self.filter(is_local if include else ~is_local)
class GenericTargetQuerySet(models.QuerySet):
    """QuerySet for models with ``target_content_type`` and ``target_id`` fields."""

    def get_for_target(self, target):
        """Return the rows whose content type and id match ``target``."""
        return self.filter(
            target_content_type=ContentType.objects.get_for_model(target),
            target_id=target.pk,
        )
class Mutation(models.Model):
    """A recorded change (type + payload) aimed at a generic target object."""

    fid = models.URLField(max_length=500, unique=True, db_index=True)
    uuid = models.UUIDField(default=uuid.uuid4, unique=True, db_index=True)
    created_by = models.ForeignKey(
        "federation.Actor",
        null=True,
        blank=True,
        on_delete=models.SET_NULL,
        related_name="created_mutations",
    )
    approved_by = models.ForeignKey(
        "federation.Actor",
        null=True,
        blank=True,
        on_delete=models.SET_NULL,
        related_name="approved_mutations",
    )

    type = models.CharField(max_length=100, db_index=True)
    # None = no choice, True = approved, False = refused
    is_approved = models.NullBooleanField(default=None)

    # None = not applied, True = applied, False = failed
    is_applied = models.NullBooleanField(default=None)
    creation_date = models.DateTimeField(default=timezone.now, db_index=True)
    applied_date = models.DateTimeField(null=True, blank=True, db_index=True)
    summary = models.TextField(max_length=2000, null=True, blank=True)

    payload = JSONField(encoder=DjangoJSONEncoder)
    # Value returned by the mutation handler when the mutation was applied.
    previous_state = JSONField(encoder=DjangoJSONEncoder, null=True, default=None)

    # Generic relation to the object this mutation targets.
    target_id = models.IntegerField(null=True)
    target_content_type = models.ForeignKey(
        ContentType,
        null=True,
        related_name="targeting_mutations",
        on_delete=models.CASCADE,
    )
    target = GenericForeignKey("target_content_type", "target_id")

    objects = GenericTargetQuerySet.as_manager()

    def get_federation_id(self):
        """Return ``fid`` when set, else a full URL built from the ``uuid``."""
        if not self.fid:
            detail_path = reverse(
                "federation:edits-detail", kwargs={"uuid": self.uuid}
            )
            return federation_utils.full_url(detail_path)
        return self.fid

    def save(self, **kwargs):
        """Save the mutation, filling in ``fid`` on unsaved rows that lack one."""
        needs_fid = not self.pk and not self.fid
        if needs_fid:
            self.fid = self.get_federation_id()
        return super().save(**kwargs)

    @transaction.atomic
    def apply(self):
        """Apply the mutation to its target and record the outcome.

        Runs inside a transaction. The handler is looked up through
        ``mutations.registry`` using ``type``; its return value is stored in
        ``previous_state`` and returned.

        Raises:
            ValueError: if ``is_applied`` is already truthy.
        """
        # Imported here rather than at module level, as in the original code.
        from . import mutations

        if self.is_applied:
            raise ValueError("Mutation was already applied")

        state_before = mutations.registry.apply(
            type=self.type, obj=self.target, payload=self.payload
        )
        self.previous_state = state_before
        self.is_applied = True
        self.applied_date = timezone.now()
        # Persist only the columns changed by applying the mutation.
        self.save(update_fields=["is_applied", "applied_date", "previous_state"])
        return state_before
2019-11-25 08:49:06 +00:00
def get_file_path(instance, filename):
    """Return the upload path for ``filename`` via ``ChunkedPath("attachments")``."""
    build_path = utils.ChunkedPath("attachments")
    return build_path(instance, filename)
class AttachmentQuerySet(models.QuerySet):
    """QuerySet helpers for ``Attachment`` rows."""

    def attached(self, include=True):
        """Filter attachments by whether a related object references them.

        With the default ``include`` the result keeps rows where at least one
        of the checked relations is not null. Passing exactly ``False``
        negates the condition.
        """
        condition = None
        for relation in ("covered_album",):
            has_relation = ~models.Q(**{relation: None})
            if condition:
                condition = condition | has_relation
            else:
                condition = has_relation

        # Only the literal ``False`` inverts the filter, not other falsy values.
        if include is False:
            return self.filter(~condition)
        return self.filter(condition)
class Attachment(models.Model):
    """An image file stored locally and/or referenced by a remote URL."""

    # Remote URL where the attachment can be fetched
    url = models.URLField(max_length=500, unique=True, null=True)
    uuid = models.UUIDField(unique=True, db_index=True, default=uuid.uuid4)
    # Actor associated with the attachment
    actor = models.ForeignKey(
        "federation.Actor",
        related_name="attachments",
        on_delete=models.CASCADE,
        null=True,
    )
    creation_date = models.DateTimeField(default=timezone.now)
    last_fetch_date = models.DateTimeField(null=True, blank=True)
    # File size
    size = models.IntegerField(null=True, blank=True)
    mimetype = models.CharField(null=True, blank=True, max_length=200)

    file = VersatileImageField(
        upload_to=get_file_path,
        max_length=255,
        validators=[
            validators.ImageDimensionsValidator(min_width=50, min_height=50),
            validators.FileValidator(
                allowed_extensions=["png", "jpg", "jpeg"], max_size=1024 * 1024 * 5,
            ),
        ],
    )

    objects = AttachmentQuerySet.as_manager()

    def save(self, **kwargs):
        """Save the attachment, filling in ``size`` and ``mimetype`` if unset.

        All keyword arguments (``update_fields``, ``using``, ...) are passed
        on to ``Model.save``. When the caller gives a non-empty
        ``update_fields``, any field filled in here is added to it so the
        computed value is written too.
        """
        auto_filled = []
        if self.file and not self.size:
            self.size = self.file.size
            auto_filled.append("size")

        if self.file and not self.mimetype:
            self.mimetype = self.guess_mimetype()
            auto_filled.append("mimetype")

        update_fields = kwargs.get("update_fields")
        # An empty ``update_fields`` is left alone: Django treats it as
        # "save nothing", and that choice belongs to the caller.
        if update_fields and auto_filled:
            merged = list(update_fields)
            merged.extend(name for name in auto_filled if name not in merged)
            kwargs["update_fields"] = merged

        return super().save(**kwargs)

    @property
    def is_local(self):
        # NOTE(review): this model declares no ``fid`` attribute in this file,
        # so this access looks like it raises AttributeError. ``url`` may be
        # the intended value -- confirm before relying on this property.
        return federation_utils.is_local(self.fid)

    def guess_mimetype(self):
        """Guess the mimetype from file content, falling back to the file name.

        At most the first 1,000,000 bytes are handed to ``magic``. If the
        detected type is not ``image/*``, the type guessed from the file name
        is used instead, when there is one.
        """
        f = self.file
        b = min(1000000, f.size)
        t = magic.from_buffer(f.read(b), mime=True)
        if not t.startswith("image/"):
            # failure, we try guessing by extension
            mt, _ = mimetypes.guess_type(f.name)
            if mt:
                t = mt
        return t

    @property
    def download_url_original(self):
        """Full URL of the original file, or of the proxy view if no file is set."""
        if self.file:
            return federation_utils.full_url(self.file.url)
        proxy_url = reverse("api:v1:attachments-proxy", kwargs={"uuid": self.uuid})
        return federation_utils.full_url(proxy_url + "?next=original")

    @property
    def download_url_medium_square_crop(self):
        """Full URL of the 200x200 crop, or of the proxy view if no file is set."""
        if self.file:
            return federation_utils.full_url(self.file.crop["200x200"].url)
        proxy_url = reverse("api:v1:attachments-proxy", kwargs={"uuid": self.uuid})
        return federation_utils.full_url(proxy_url + "?next=medium_square_crop")
@receiver(models.signals.post_save, sender=Attachment)
def warm_attachment_thumbnails(sender, instance, **kwargs):
    """Warm the ``attachment_square`` renditions after an attachment is saved.

    Does nothing when the instance has no file or when
    ``settings.CREATE_IMAGE_THUMBNAILS`` is falsy.
    """
    if instance.file and settings.CREATE_IMAGE_THUMBNAILS:
        thumbnail_warmer = VersatileImageFieldWarmer(
            instance_or_queryset=instance,
            rendition_key_set="attachment_square",
            image_attr="file",
        )
        # The (created count, failures) result is unpacked but not used.
        _created_count, _failures = thumbnail_warmer.warm()