kopia lustrzana https://gitlab.com/marnanel/chapeau
424 wiersze
10 KiB
Python
424 wiersze
10 KiB
Python
# status.py
|
|
#
|
|
# Part of kepi.
|
|
# Copyright (c) 2018-2020 Marnanel Thurman.
|
|
# Licensed under the GNU Public License v2.
|
|
|
|
import logging
|
|
logger = logging.getLogger(name='kepi')
|
|
|
|
from django.db import models
|
|
from django.db.models.constraints import UniqueConstraint
|
|
from django.contrib.auth.models import AbstractUser
|
|
from django.conf import settings
|
|
import kepi.bowler_pub.crypto as crypto
|
|
from kepi.bowler_pub.utils import uri_to_url, is_local
|
|
import kepi.trilby_api.utils as trilby_utils
|
|
import kepi.trilby_api.signals as trilby_signals
|
|
from django.utils.timezone import now
|
|
from django.core.exceptions import ValidationError
|
|
from polymorphic.models import PolymorphicModel
|
|
import markdown
|
|
|
|
PUBLIC = "https://www.w3.org/ns/activitystreams#Public"
|
|
|
|
class Status(PolymorphicModel):
|
|
|
|
class Meta:
|
|
verbose_name_plural = 'Statuses'
|
|
|
|
@classmethod
|
|
def local_form(cls):
|
|
return Status
|
|
|
|
@classmethod
|
|
def remote_form(cls):
|
|
return Status
|
|
|
|
# TODO: The original design has the serial number
|
|
# monotonically but unpredictably increasing.
|
|
|
|
remote_url = models.URLField(
|
|
max_length = 255,
|
|
null = True,
|
|
blank = True,
|
|
unique = True,
|
|
)
|
|
|
|
account = models.ForeignKey(
|
|
'Person',
|
|
related_name = 'poster',
|
|
on_delete = models.DO_NOTHING,
|
|
)
|
|
|
|
in_reply_to = models.ForeignKey(
|
|
'self',
|
|
related_name = 'replies',
|
|
on_delete = models.DO_NOTHING,
|
|
null = True,
|
|
blank = True,
|
|
)
|
|
|
|
content_source = models.TextField(
|
|
help_text = 'Text of the status, as entered',
|
|
)
|
|
|
|
content_as_html_denormed = models.TextField(
|
|
help_text = 'HTML rendering of content_source. Do not edit!',
|
|
editable = False,
|
|
null = True,
|
|
default = None,
|
|
)
|
|
|
|
created_at = models.DateTimeField(
|
|
default = now,
|
|
)
|
|
|
|
# TODO Media
|
|
|
|
sensitive = models.BooleanField(
|
|
default = False,
|
|
)
|
|
|
|
spoiler_source = models.CharField(
|
|
max_length = 255,
|
|
null = True,
|
|
blank = True,
|
|
default = '',
|
|
)
|
|
|
|
spoiler_as_html_denormed = models.CharField(
|
|
max_length = 255,
|
|
null = True,
|
|
editable = False,
|
|
default = None,
|
|
)
|
|
|
|
visibility = models.CharField(
|
|
max_length = 1,
|
|
default = trilby_utils.VISIBILITY_PUBLIC,
|
|
choices = trilby_utils.VISIBILITY_CHOICES,
|
|
help_text = "Visiblity of this status.\n\n"+\
|
|
trilby_utils.VISIBILITY_HELP_TEXT,
|
|
)
|
|
|
|
language = models.CharField(
|
|
max_length = 255,
|
|
null = True,
|
|
default = settings.KEPI['LANGUAGES'][0],
|
|
)
|
|
|
|
reblog_of = models.ForeignKey(
|
|
'self',
|
|
related_name = 'reblogs',
|
|
on_delete = models.CASCADE,
|
|
null = True,
|
|
blank = True,
|
|
)
|
|
|
|
idempotency_key = models.CharField(
|
|
max_length = 255,
|
|
null = True,
|
|
default = None,
|
|
)
|
|
|
|
@property
|
|
def content_as_html(self):
|
|
"""
|
|
Returns an HTML rendition of content_source.
|
|
The return value will be cached.
|
|
Saving the record will clear this cache.
|
|
"""
|
|
|
|
if self.content_as_html_denormed is not None:
|
|
return self.content_as_html_denormed
|
|
|
|
if self.content_source is None:
|
|
result = '<p></p>'
|
|
else:
|
|
result = markdown.markdown(self.content_source)
|
|
|
|
self.content_as_html_denormed = result
|
|
return result
|
|
|
|
@property
|
|
def spoiler_as_html(self):
|
|
"""
|
|
Returns an HTML rendition of spoiler_source.
|
|
The return value will be cached.
|
|
Saving the record will clear this cache.
|
|
"""
|
|
|
|
if self.spoiler_as_html_denormed is not None:
|
|
return self.spoiler_as_html_denormed
|
|
|
|
if self.spoiler_source is None:
|
|
result = '<p></p>'
|
|
else:
|
|
result = markdown.markdown(self.spoiler_source)
|
|
|
|
self.spoiler_as_html_denormed = result
|
|
return result
|
|
|
|
@property
|
|
def emojis(self):
|
|
return [] # TODO
|
|
|
|
@property
|
|
def reblogs_count(self):
|
|
return self.reblogs.count()
|
|
|
|
@property
|
|
def favourites_count(self):
|
|
return 0 # FIXME
|
|
|
|
@property
|
|
def original(self):
|
|
result = self.reblog_of
|
|
|
|
if result is None:
|
|
return self
|
|
|
|
if result.reblog_of is not None:
|
|
# Reblog of reblog, which is invalid
|
|
return self
|
|
|
|
return result
|
|
|
|
@property
|
|
def reblogged(self):
|
|
return self.reblogs.exists()
|
|
|
|
@property
|
|
def favourited(self):
|
|
return False # FIXME
|
|
|
|
@property
|
|
def muted(self):
|
|
return False # FIXME
|
|
|
|
@property
|
|
def pinned(self):
|
|
return False # FIXME
|
|
|
|
@property
|
|
def media_attachments(self):
|
|
return [] # FIXME
|
|
|
|
@property
|
|
def tags(self):
|
|
import kepi.trilby_api.models.person as trilby_person
|
|
|
|
return list(trilby_person.Person.objects.filter(
|
|
mention__status = self,
|
|
))
|
|
|
|
@property
|
|
def card(self):
|
|
return None # FIXME
|
|
|
|
@property
|
|
def poll(self):
|
|
return None # FIXME
|
|
|
|
@property
|
|
def application(self):
|
|
return None # FIXME
|
|
|
|
@property
|
|
def conversation(self):
|
|
return 'conversation' # FIXME
|
|
|
|
@property
|
|
def in_reply_to_account_id(self):
|
|
return self.in_reply_to.account.id
|
|
|
|
@property
|
|
def uri(self):
|
|
# I know this property is called "uri", but
|
|
# this matches the behaviour of Mastodon
|
|
return self.url
|
|
|
|
@property
|
|
def url(self):
|
|
if self.remote_url is not None:
|
|
return self.remote_url
|
|
|
|
return uri_to_url(settings.KEPI['STATUS_LINK'] % {
|
|
'username': self.account.username,
|
|
'id': self.id,
|
|
})
|
|
|
|
@property
|
|
def activity_url(self):
|
|
if self.remote_url is not None:
|
|
raise ValueError(
|
|
"Activity URL is not stored for remote statuses",
|
|
)
|
|
|
|
return uri_to_url(settings.KEPI['STATUS_ACTIVITY_LINK'] % {
|
|
'username': self.account.username,
|
|
'id': self.id,
|
|
})
|
|
|
|
@property
|
|
def ancestors(self):
|
|
|
|
result = []
|
|
parent = self.in_reply_to
|
|
|
|
while parent is not None:
|
|
result.insert(0, parent)
|
|
parent = parent.in_reply_to
|
|
|
|
return result
|
|
|
|
@property
|
|
def descendants(self):
|
|
|
|
result = []
|
|
current = self
|
|
|
|
while True:
|
|
try:
|
|
child = Status.objects.get(
|
|
in_reply_to = current,
|
|
)
|
|
except Status.DoesNotExist:
|
|
break
|
|
|
|
result.append(child)
|
|
current = child
|
|
|
|
return result
|
|
|
|
@property
|
|
def thread(self):
|
|
|
|
result = self.ancestors
|
|
result.append(self)
|
|
result.extend(self.descendants)
|
|
|
|
return result
|
|
|
|
@property
|
|
def to(self):
|
|
return [PUBLIC] # FIXME
|
|
|
|
@property
|
|
def cc(self):
|
|
return [] # FIXME
|
|
|
|
def save(self,
|
|
send_signal = False,
|
|
*args, **kwargs):
|
|
|
|
newly_made = self.pk is None
|
|
|
|
if self.reblog_of == self:
|
|
raise ValueError("Status can't be a reblog of itself")
|
|
|
|
if self.in_reply_to == self:
|
|
raise ValueError("Status can't be a reply to itself")
|
|
|
|
if not newly_made:
|
|
old = self.__class__.objects.get(pk=self.pk)
|
|
|
|
if self.content_source != old.content_source:
|
|
logger.debug("%s: content changed; flushing HTML cache",
|
|
self)
|
|
self.content_as_html_denormed = None
|
|
|
|
if self.spoiler_source != old.spoiler_source:
|
|
logger.debug("%s: spoiler changed; flushing HTML cache",
|
|
self)
|
|
self.spoiler_as_html_denormed = None
|
|
|
|
super().save(*args, **kwargs)
|
|
|
|
if send_signal and newly_made:
|
|
|
|
if self.reblog_of is None:
|
|
trilby_signals.posted.send(sender=self)
|
|
else:
|
|
trilby_signals.reblogged.send(sender=self)
|
|
|
|
def __str__(self):
|
|
return '%s: %s' % (
|
|
self.id,
|
|
self.content_source,
|
|
)
|
|
|
|
@classmethod
|
|
def lookup(cls, url):
|
|
|
|
# TODO: not yet tested
|
|
# FIXME: if url is local, parse and return local <-- current breakage XXX
|
|
# FIXME: if remote is not found, *possibly* create and return?
|
|
|
|
if is_local(url):
|
|
|
|
view = trilby_utils.find_local_view(
|
|
url,
|
|
)
|
|
|
|
view = trilby_utils.find_local_view(
|
|
url,
|
|
which_views = ['StatusView'],
|
|
)
|
|
|
|
if view is None:
|
|
return None
|
|
|
|
statusid = int(view.kwargs['status'])
|
|
|
|
try:
|
|
result = cls.objects.get(
|
|
id=statusid,
|
|
)
|
|
except cls.DoesNotExist:
|
|
logger.debug('%s is local but does not exist',
|
|
url)
|
|
return None
|
|
|
|
if result.account.local_user.username != view.kwargs['username']:
|
|
logger.debug('%s is local but the username doesn\'t match',
|
|
url)
|
|
return None
|
|
|
|
logger.debug('%s is local and exists: %s',
|
|
url, result)
|
|
return result
|
|
|
|
# so, it's remote
|
|
|
|
try:
|
|
result = cls.objects.get(remote_url = url)
|
|
logger.debug('%s is remote and exists: %s',
|
|
url, result)
|
|
|
|
return result
|
|
except cls.DoesNotExist:
|
|
pass
|
|
|
|
logger.debug('%s is unknown',
|
|
url)
|
|
|
|
return None
|
|
|
|
@property
|
|
def is_reply(self):
|
|
return self.in_reply_to is not None
|
|
|
|
@property
|
|
def text(self):
|
|
# XXX It's possible that one of (text, content) is
|
|
# HTML and one is plain text. But the docs don't
|
|
# seem to be forthcoming on this point, so we'll
|
|
# just have to wait until we find out.
|
|
return self.content_source
|
|
|
|
@property
|
|
def is_local(self):
|
|
return self.remote_url is None
|