kopia lustrzana https://dev.funkwhale.audio/funkwhale/funkwhale
569 wiersze
17 KiB
Python
569 wiersze
17 KiB
Python
import os
|
|
import io
|
|
import arrow
|
|
import datetime
|
|
import tempfile
|
|
import shutil
|
|
import markdown
|
|
import uuid
|
|
|
|
from django.conf import settings
|
|
from django.db import models
|
|
from django.core.files.base import ContentFile
|
|
from django.core.files import File
|
|
from django.db.models.signals import post_save
|
|
from django.dispatch import receiver
|
|
from django.urls import reverse
|
|
from django.utils import timezone
|
|
|
|
from taggit.managers import TaggableManager
|
|
from versatileimagefield.fields import VersatileImageField
|
|
|
|
from funkwhale_api import downloader
|
|
from funkwhale_api import musicbrainz
|
|
from funkwhale_api.federation import utils as federation_utils
|
|
from . import importers
|
|
from . import utils
|
|
|
|
|
|
class APIModelMixin(models.Model):
|
|
mbid = models.UUIDField(unique=True, db_index=True, null=True, blank=True)
|
|
uuid = models.UUIDField(
|
|
unique=True, db_index=True, default=uuid.uuid4)
|
|
api_includes = []
|
|
creation_date = models.DateTimeField(default=timezone.now)
|
|
import_hooks = []
|
|
|
|
class Meta:
|
|
abstract = True
|
|
ordering = ['-creation_date']
|
|
|
|
@classmethod
|
|
def get_or_create_from_api(cls, mbid):
|
|
try:
|
|
return cls.objects.get(mbid=mbid), False
|
|
except cls.DoesNotExist:
|
|
return cls.create_from_api(id=mbid), True
|
|
|
|
def get_api_data(self):
|
|
return self.__class__.api.get(id=self.mbid, includes=self.api_includes)[self.musicbrainz_model]
|
|
|
|
@classmethod
|
|
def create_from_api(cls, **kwargs):
|
|
if kwargs.get('id'):
|
|
raw_data = cls.api.get(id=kwargs['id'], includes=cls.api_includes)[cls.musicbrainz_model]
|
|
else:
|
|
raw_data = cls.api.search(**kwargs)['{0}-list'.format(cls.musicbrainz_model)][0]
|
|
cleaned_data = cls.clean_musicbrainz_data(raw_data)
|
|
return importers.load(cls, cleaned_data, raw_data, cls.import_hooks)
|
|
|
|
@classmethod
|
|
def clean_musicbrainz_data(cls, data):
|
|
cleaned_data = {}
|
|
mapping = importers.Mapping(cls.musicbrainz_mapping)
|
|
for key, value in data.items():
|
|
try:
|
|
cleaned_key, cleaned_value = mapping.from_musicbrainz(key, value)
|
|
cleaned_data[cleaned_key] = cleaned_value
|
|
except KeyError as e:
|
|
pass
|
|
return cleaned_data
|
|
|
|
@property
|
|
def musicbrainz_url(self):
|
|
if self.mbid:
|
|
return 'https://musicbrainz.org/{}/{}'.format(
|
|
self.musicbrainz_model, self.mbid)
|
|
|
|
|
|
class Artist(APIModelMixin):
|
|
name = models.CharField(max_length=255)
|
|
|
|
musicbrainz_model = 'artist'
|
|
musicbrainz_mapping = {
|
|
'mbid': {
|
|
'musicbrainz_field_name': 'id'
|
|
},
|
|
'name': {
|
|
'musicbrainz_field_name': 'name'
|
|
}
|
|
}
|
|
api = musicbrainz.api.artists
|
|
|
|
def __str__(self):
|
|
return self.name
|
|
|
|
@property
|
|
def tags(self):
|
|
t = []
|
|
for album in self.albums.all():
|
|
for tag in album.tags:
|
|
t.append(tag)
|
|
return set(t)
|
|
|
|
@classmethod
|
|
def get_or_create_from_name(cls, name, **kwargs):
|
|
kwargs.update({'name': name})
|
|
return cls.objects.get_or_create(
|
|
name__iexact=name,
|
|
defaults=kwargs)[0]
|
|
|
|
|
|
def import_artist(v):
|
|
a = Artist.get_or_create_from_api(mbid=v[0]['artist']['id'])[0]
|
|
return a
|
|
|
|
|
|
def parse_date(v):
|
|
if len(v) == 4:
|
|
return datetime.date(int(v), 1, 1)
|
|
d = arrow.get(v).date()
|
|
return d
|
|
|
|
|
|
def import_tracks(instance, cleaned_data, raw_data):
|
|
for track_data in raw_data['medium-list'][0]['track-list']:
|
|
track_cleaned_data = Track.clean_musicbrainz_data(track_data['recording'])
|
|
track_cleaned_data['album'] = instance
|
|
track_cleaned_data['position'] = int(track_data['position'])
|
|
track = importers.load(Track, track_cleaned_data, track_data, Track.import_hooks)
|
|
|
|
|
|
class Album(APIModelMixin):
|
|
title = models.CharField(max_length=255)
|
|
artist = models.ForeignKey(
|
|
Artist, related_name='albums', on_delete=models.CASCADE)
|
|
release_date = models.DateField(null=True)
|
|
release_group_id = models.UUIDField(null=True, blank=True)
|
|
cover = VersatileImageField(upload_to='albums/covers/%Y/%m/%d', null=True, blank=True)
|
|
TYPE_CHOICES = (
|
|
('album', 'Album'),
|
|
)
|
|
type = models.CharField(choices=TYPE_CHOICES, max_length=30, default='album')
|
|
|
|
api_includes = ['artist-credits', 'recordings', 'media', 'release-groups']
|
|
api = musicbrainz.api.releases
|
|
musicbrainz_model = 'release'
|
|
musicbrainz_mapping = {
|
|
'mbid': {
|
|
'musicbrainz_field_name': 'id',
|
|
},
|
|
'position': {
|
|
'musicbrainz_field_name': 'release-list',
|
|
'converter': lambda v: int(v[0]['medium-list'][0]['position']),
|
|
},
|
|
'release_group_id': {
|
|
'musicbrainz_field_name': 'release-group',
|
|
'converter': lambda v: v['id'],
|
|
},
|
|
'title': {
|
|
'musicbrainz_field_name': 'title',
|
|
},
|
|
'release_date': {
|
|
'musicbrainz_field_name': 'date',
|
|
'converter': parse_date,
|
|
|
|
},
|
|
'type': {
|
|
'musicbrainz_field_name': 'type',
|
|
'converter': lambda v: v.lower(),
|
|
},
|
|
'artist': {
|
|
'musicbrainz_field_name': 'artist-credit',
|
|
'converter': import_artist,
|
|
}
|
|
}
|
|
|
|
def get_image(self):
|
|
image_data = musicbrainz.api.images.get_front(str(self.mbid))
|
|
f = ContentFile(image_data)
|
|
self.cover.save('{0}.jpg'.format(self.mbid), f)
|
|
return self.cover.file
|
|
|
|
def __str__(self):
|
|
return self.title
|
|
|
|
@property
|
|
def tags(self):
|
|
t = []
|
|
for track in self.tracks.all():
|
|
for tag in track.tags.all():
|
|
t.append(tag)
|
|
return set(t)
|
|
|
|
@classmethod
|
|
def get_or_create_from_title(cls, title, **kwargs):
|
|
kwargs.update({'title': title})
|
|
return cls.objects.get_or_create(
|
|
title__iexact=title,
|
|
defaults=kwargs)[0]
|
|
|
|
|
|
def import_tags(instance, cleaned_data, raw_data):
|
|
MINIMUM_COUNT = 2
|
|
tags_to_add = []
|
|
for tag_data in raw_data.get('tag-list', []):
|
|
try:
|
|
if int(tag_data['count']) < MINIMUM_COUNT:
|
|
continue
|
|
except ValueError:
|
|
continue
|
|
tags_to_add.append(tag_data['name'])
|
|
instance.tags.add(*tags_to_add)
|
|
|
|
|
|
def import_album(v):
|
|
a = Album.get_or_create_from_api(mbid=v[0]['id'])[0]
|
|
return a
|
|
|
|
|
|
def link_recordings(instance, cleaned_data, raw_data):
|
|
tracks = [
|
|
r['target']
|
|
for r in raw_data['recording-relation-list']
|
|
]
|
|
Track.objects.filter(mbid__in=tracks).update(work=instance)
|
|
|
|
|
|
def import_lyrics(instance, cleaned_data, raw_data):
|
|
try:
|
|
url = [
|
|
url_data
|
|
for url_data in raw_data['url-relation-list']
|
|
if url_data['type'] == 'lyrics'
|
|
][0]['target']
|
|
except (IndexError, KeyError):
|
|
return
|
|
l, _ = Lyrics.objects.get_or_create(work=instance, url=url)
|
|
|
|
return l
|
|
|
|
|
|
class Work(APIModelMixin):
|
|
language = models.CharField(max_length=20)
|
|
nature = models.CharField(max_length=50)
|
|
title = models.CharField(max_length=255)
|
|
|
|
api = musicbrainz.api.works
|
|
api_includes = ['url-rels', 'recording-rels']
|
|
musicbrainz_model = 'work'
|
|
musicbrainz_mapping = {
|
|
'mbid': {
|
|
'musicbrainz_field_name': 'id'
|
|
},
|
|
'title': {
|
|
'musicbrainz_field_name': 'title'
|
|
},
|
|
'language': {
|
|
'musicbrainz_field_name': 'language',
|
|
},
|
|
'nature': {
|
|
'musicbrainz_field_name': 'type',
|
|
'converter': lambda v: v.lower(),
|
|
},
|
|
}
|
|
import_hooks = [
|
|
import_lyrics,
|
|
link_recordings
|
|
]
|
|
|
|
def fetch_lyrics(self):
|
|
l = self.lyrics.first()
|
|
if l:
|
|
return l
|
|
data = self.api.get(self.mbid, includes=['url-rels'])['work']
|
|
l = import_lyrics(self, {}, data)
|
|
|
|
return l
|
|
|
|
|
|
class Lyrics(models.Model):
|
|
uuid = models.UUIDField(
|
|
unique=True, db_index=True, default=uuid.uuid4)
|
|
work = models.ForeignKey(
|
|
Work,
|
|
related_name='lyrics',
|
|
null=True,
|
|
blank=True,
|
|
on_delete=models.CASCADE)
|
|
url = models.URLField(unique=True)
|
|
content = models.TextField(null=True, blank=True)
|
|
|
|
@property
|
|
def content_rendered(self):
|
|
return markdown.markdown(
|
|
self.content,
|
|
safe_mode=True,
|
|
enable_attributes=False,
|
|
extensions=['markdown.extensions.nl2br'])
|
|
|
|
|
|
class TrackQuerySet(models.QuerySet):
|
|
def for_nested_serialization(self):
|
|
return (self.select_related()
|
|
.select_related('album__artist')
|
|
.prefetch_related(
|
|
'tags',
|
|
'files',
|
|
'artist__albums__tracks__tags'))
|
|
|
|
|
|
class Track(APIModelMixin):
|
|
title = models.CharField(max_length=255)
|
|
artist = models.ForeignKey(
|
|
Artist, related_name='tracks', on_delete=models.CASCADE)
|
|
position = models.PositiveIntegerField(null=True, blank=True)
|
|
album = models.ForeignKey(
|
|
Album,
|
|
related_name='tracks',
|
|
null=True,
|
|
blank=True,
|
|
on_delete=models.CASCADE)
|
|
work = models.ForeignKey(
|
|
Work,
|
|
related_name='tracks',
|
|
null=True,
|
|
blank=True,
|
|
on_delete=models.CASCADE)
|
|
|
|
musicbrainz_model = 'recording'
|
|
api = musicbrainz.api.recordings
|
|
api_includes = ['artist-credits', 'releases', 'media', 'tags', 'work-rels']
|
|
musicbrainz_mapping = {
|
|
'mbid': {
|
|
'musicbrainz_field_name': 'id'
|
|
},
|
|
'title': {
|
|
'musicbrainz_field_name': 'title'
|
|
},
|
|
'artist': {
|
|
'musicbrainz_field_name': 'artist-credit',
|
|
'converter': lambda v: Artist.get_or_create_from_api(mbid=v[0]['artist']['id'])[0],
|
|
},
|
|
'album': {
|
|
'musicbrainz_field_name': 'release-list',
|
|
'converter': import_album,
|
|
},
|
|
}
|
|
import_hooks = [
|
|
import_tags
|
|
]
|
|
objects = TrackQuerySet.as_manager()
|
|
tags = TaggableManager()
|
|
|
|
class Meta:
|
|
ordering = ['album', 'position']
|
|
|
|
def __str__(self):
|
|
return self.title
|
|
|
|
def save(self, **kwargs):
|
|
try:
|
|
self.artist
|
|
except Artist.DoesNotExist:
|
|
self.artist = self.album.artist
|
|
super().save(**kwargs)
|
|
|
|
def get_work(self):
|
|
if self.work:
|
|
return self.work
|
|
data = self.api.get(self.mbid, includes=['work-rels'])
|
|
try:
|
|
work_data = data['recording']['work-relation-list'][0]['work']
|
|
except (IndexError, KeyError):
|
|
return
|
|
work, _ = Work.get_or_create_from_api(mbid=work_data['id'])
|
|
return work
|
|
|
|
def get_lyrics_url(self):
|
|
return reverse('api:v1:tracks-lyrics', kwargs={'pk': self.pk})
|
|
|
|
@property
|
|
def full_name(self):
|
|
try:
|
|
return '{} - {} - {}'.format(
|
|
self.artist.name,
|
|
self.album.title,
|
|
self.title,
|
|
)
|
|
except AttributeError:
|
|
return '{} - {}'.format(
|
|
self.artist.name,
|
|
self.title,
|
|
)
|
|
|
|
def get_activity_url(self):
|
|
if self.mbid:
|
|
return 'https://musicbrainz.org/recording/{}'.format(
|
|
self.mbid)
|
|
return settings.FUNKWHALE_URL + '/tracks/{}'.format(self.pk)
|
|
|
|
@classmethod
|
|
def get_or_create_from_title(cls, title, **kwargs):
|
|
kwargs.update({'title': title})
|
|
return cls.objects.get_or_create(
|
|
title__iexact=title,
|
|
defaults=kwargs)[0]
|
|
|
|
|
|
class TrackFile(models.Model):
|
|
uuid = models.UUIDField(
|
|
unique=True, db_index=True, default=uuid.uuid4)
|
|
track = models.ForeignKey(
|
|
Track, related_name='files', on_delete=models.CASCADE)
|
|
audio_file = models.FileField(upload_to='tracks/%Y/%m/%d', max_length=255)
|
|
source = models.URLField(null=True, blank=True)
|
|
creation_date = models.DateTimeField(default=timezone.now)
|
|
modification_date = models.DateTimeField(auto_now=True)
|
|
duration = models.IntegerField(null=True, blank=True)
|
|
acoustid_track_id = models.UUIDField(null=True, blank=True)
|
|
mimetype = models.CharField(null=True, blank=True, max_length=200)
|
|
|
|
library_track = models.OneToOneField(
|
|
'federation.LibraryTrack',
|
|
related_name='local_track_file',
|
|
on_delete=models.CASCADE,
|
|
null=True,
|
|
blank=True,
|
|
)
|
|
|
|
def download_file(self):
|
|
# import the track file, since there is not any
|
|
# we create a tmp dir for the download
|
|
tmp_dir = tempfile.mkdtemp()
|
|
data = downloader.download(
|
|
self.source,
|
|
target_directory=tmp_dir)
|
|
self.duration = data.get('duration', None)
|
|
self.audio_file.save(
|
|
os.path.basename(data['audio_file_path']),
|
|
File(open(data['audio_file_path'], 'rb'))
|
|
)
|
|
shutil.rmtree(tmp_dir)
|
|
return self.audio_file
|
|
|
|
def get_federation_url(self):
|
|
return federation_utils.full_url(
|
|
'/federation/music/file/{}'.format(self.uuid)
|
|
)
|
|
|
|
@property
|
|
def path(self):
|
|
return reverse(
|
|
'api:v1:trackfiles-serve', kwargs={'pk': self.pk})
|
|
|
|
@property
|
|
def filename(self):
|
|
return '{}{}'.format(
|
|
self.track.full_name,
|
|
os.path.splitext(self.audio_file.name)[-1])
|
|
|
|
def save(self, **kwargs):
|
|
if not self.mimetype and self.audio_file:
|
|
self.mimetype = utils.guess_mimetype(self.audio_file)
|
|
return super().save(**kwargs)
|
|
|
|
|
|
IMPORT_STATUS_CHOICES = (
|
|
('pending', 'Pending'),
|
|
('finished', 'Finished'),
|
|
('errored', 'Errored'),
|
|
('skipped', 'Skipped'),
|
|
)
|
|
|
|
|
|
class ImportBatch(models.Model):
|
|
uuid = models.UUIDField(
|
|
unique=True, db_index=True, default=uuid.uuid4)
|
|
IMPORT_BATCH_SOURCES = [
|
|
('api', 'api'),
|
|
('shell', 'shell'),
|
|
('federation', 'federation'),
|
|
]
|
|
source = models.CharField(
|
|
max_length=30, default='api', choices=IMPORT_BATCH_SOURCES)
|
|
creation_date = models.DateTimeField(default=timezone.now)
|
|
submitted_by = models.ForeignKey(
|
|
'users.User',
|
|
related_name='imports',
|
|
null=True,
|
|
blank=True,
|
|
on_delete=models.CASCADE)
|
|
status = models.CharField(
|
|
choices=IMPORT_STATUS_CHOICES, default='pending', max_length=30)
|
|
import_request = models.ForeignKey(
|
|
'requests.ImportRequest',
|
|
related_name='import_batches',
|
|
null=True,
|
|
blank=True,
|
|
on_delete=models.CASCADE)
|
|
|
|
class Meta:
|
|
ordering = ['-creation_date']
|
|
|
|
def __str__(self):
|
|
return str(self.pk)
|
|
|
|
def update_status(self):
|
|
self.status = utils.compute_status(self.jobs.all())
|
|
self.save(update_fields=['status'])
|
|
|
|
|
|
class ImportJob(models.Model):
|
|
uuid = models.UUIDField(
|
|
unique=True, db_index=True, default=uuid.uuid4)
|
|
batch = models.ForeignKey(
|
|
ImportBatch, related_name='jobs', on_delete=models.CASCADE)
|
|
track_file = models.ForeignKey(
|
|
TrackFile,
|
|
related_name='jobs',
|
|
null=True,
|
|
blank=True,
|
|
on_delete=models.CASCADE)
|
|
source = models.CharField(max_length=500)
|
|
mbid = models.UUIDField(editable=False, null=True, blank=True)
|
|
|
|
status = models.CharField(
|
|
choices=IMPORT_STATUS_CHOICES, default='pending', max_length=30)
|
|
audio_file = models.FileField(
|
|
upload_to='imports/%Y/%m/%d', max_length=255, null=True, blank=True)
|
|
|
|
library_track = models.ForeignKey(
|
|
'federation.LibraryTrack',
|
|
related_name='import_jobs',
|
|
on_delete=models.SET_NULL,
|
|
null=True,
|
|
blank=True
|
|
)
|
|
|
|
class Meta:
|
|
ordering = ('id', )
|
|
|
|
|
|
@receiver(post_save, sender=ImportJob)
|
|
def update_batch_status(sender, instance, **kwargs):
|
|
instance.batch.update_status()
|
|
|
|
|
|
@receiver(post_save, sender=ImportBatch)
|
|
def update_request_status(sender, instance, created, **kwargs):
|
|
update_fields = kwargs.get('update_fields', []) or []
|
|
if not instance.import_request:
|
|
return
|
|
|
|
if not created and not 'status' in update_fields:
|
|
return
|
|
|
|
r_status = instance.import_request.status
|
|
status = instance.status
|
|
|
|
if status == 'pending' and r_status == 'pending':
|
|
# let's mark the request as accepted since we started an import
|
|
instance.import_request.status = 'accepted'
|
|
return instance.import_request.save(update_fields=['status'])
|
|
|
|
if status == 'finished' and r_status == 'accepted':
|
|
# let's mark the request as imported since the import is over
|
|
instance.import_request.status = 'imported'
|
|
return instance.import_request.save(update_fields=['status'])
|