funkwhale/api/funkwhale_api/music/models.py

409 lines
12 KiB
Python

import os
import io
import arrow
import datetime
import tempfile
import shutil
import markdown
from django.conf import settings
from django.db import models
from django.contrib.staticfiles.templatetags.staticfiles import static
from django.core.files.base import ContentFile
from django.core.files import File
from django.core.urlresolvers import reverse
from django.utils import timezone
from taggit.managers import TaggableManager
from versatileimagefield.fields import VersatileImageField
from funkwhale_api.taskapp import celery
from funkwhale_api import downloader
from funkwhale_api import musicbrainz
from . import importers
from . import lyrics as lyrics_utils
class APIModelMixin(models.Model):
    """Abstract base for models that mirror a MusicBrainz entity.

    The methods below read ``api``, ``musicbrainz_model`` and
    ``musicbrainz_mapping`` from the class, so concrete subclasses have to
    define them.
    """

    mbid = models.UUIDField(unique=True, db_index=True, null=True, blank=True)
    # Extra ``includes`` sent to the API when a single entity is fetched.
    api_includes = []
    creation_date = models.DateTimeField(default=timezone.now)
    # Callables forwarded to ``importers.load`` by ``create_from_api``.
    import_hooks = []

    class Meta:
        abstract = True
        ordering = ['-creation_date']

    @classmethod
    def get_or_create_from_api(cls, mbid):
        """Return ``(instance, created)`` for the given MusicBrainz id.

        An existing row with that ``mbid`` is returned as is; otherwise the
        entity is built through ``create_from_api``.
        """
        try:
            existing = cls.objects.get(mbid=mbid)
        except cls.DoesNotExist:
            return cls.create_from_api(id=mbid), True
        return existing, False

    def get_api_data(self):
        """Fetch this entity from the API and return its payload."""
        model_cls = self.__class__
        response = model_cls.api.get(id=self.mbid, includes=self.api_includes)
        return response[self.musicbrainz_model]

    @classmethod
    def create_from_api(cls, **kwargs):
        """Fetch an entity from the API and load it via ``importers.load``.

        With a truthy ``id`` keyword the entity is fetched directly; otherwise
        ``kwargs`` are used as search criteria and the first result is taken.
        Returns whatever ``importers.load`` returns.
        """
        entity_id = kwargs.get('id')
        if entity_id:
            response = cls.api.get(id=entity_id, includes=cls.api_includes)
            raw_data = response[cls.musicbrainz_model]
        else:
            search_results = cls.api.search(**kwargs)
            results_key = '{0}-list'.format(cls.musicbrainz_model)
            raw_data = search_results[results_key][0]
        cleaned_data = cls.clean_musicbrainz_data(raw_data)
        return importers.load(cls, cleaned_data, raw_data, cls.import_hooks)

    @classmethod
    def clean_musicbrainz_data(cls, data):
        """Translate a raw API payload into a dict of cleaned key/value pairs.

        Each item goes through ``Mapping.from_musicbrainz``; items for which
        that call raises ``KeyError`` are left out of the result.
        """
        mapping = importers.Mapping(cls.musicbrainz_mapping)
        cleaned_data = {}
        for key, value in data.items():
            try:
                cleaned_key, cleaned_value = mapping.from_musicbrainz(key, value)
            except KeyError:
                # Presumably an unmapped field; skipped on purpose.
                continue
            cleaned_data[cleaned_key] = cleaned_value
        return cleaned_data
class Artist(APIModelMixin):
    """An artist, backed by the MusicBrainz ``artist`` entity."""

    name = models.CharField(max_length=255)

    api = musicbrainz.api.artists
    musicbrainz_model = 'artist'
    musicbrainz_mapping = {
        'mbid': {'musicbrainz_field_name': 'id'},
        'name': {'musicbrainz_field_name': 'name'},
    }

    def __str__(self):
        return self.name

    @property
    def tags(self):
        """Set of every tag found on the albums of this artist."""
        return {
            tag
            for album in self.albums.all()
            for tag in album.tags
        }
def import_artist(v):
    """Converter: turn an ``artist-credit`` value into an ``Artist``.

    Only the first credit is used; the artist is looked up by its id and
    created from the API when it does not exist yet.
    """
    first_credit = v[0]
    artist, _ = Artist.get_or_create_from_api(mbid=first_credit['artist']['id'])
    return artist
def parse_date(v):
    """Converter: turn a date string into a ``datetime.date``.

    A four-character value is treated as a bare year and mapped to January 1st
    of that year; any other value is parsed with ``arrow``.
    """
    is_year_only = len(v) == 4
    if not is_year_only:
        return arrow.get(v).date()
    return datetime.date(int(v), 1, 1)
def import_tracks(instance, cleaned_data, raw_data):
    """Create a ``Track`` for every entry of the first medium's track list.

    ``instance`` becomes the album of each track, and the entry's ``position``
    is stored as an integer. Only ``raw_data['medium-list'][0]`` is read.
    """
    first_medium = raw_data['medium-list'][0]
    for entry in first_medium['track-list']:
        fields = Track.clean_musicbrainz_data(entry['recording'])
        fields['album'] = instance
        fields['position'] = int(entry['position'])
        importers.load(Track, fields, entry, Track.import_hooks)
class Album(APIModelMixin):
    """An album, backed by the MusicBrainz ``release`` entity."""

    title = models.CharField(max_length=255)
    artist = models.ForeignKey(Artist, related_name='albums')
    release_date = models.DateField(null=True)
    cover = VersatileImageField(
        upload_to='albums/covers/%Y/%m/%d',
        null=True,
        blank=True,
    )
    TYPE_CHOICES = (
        ('album', 'Album'),
    )
    type = models.CharField(
        choices=TYPE_CHOICES,
        max_length=30,
        default='album',
    )

    api = musicbrainz.api.releases
    api_includes = ['artist-credits', 'recordings', 'media']
    musicbrainz_model = 'release'
    # Maps model attribute -> source field in the API payload, with an
    # optional converter applied to the raw value.
    musicbrainz_mapping = {
        'mbid': {
            'musicbrainz_field_name': 'id',
        },
        # NOTE(review): no ``position`` field is declared on this model in
        # the visible code; confirm this entry is still needed.
        'position': {
            'musicbrainz_field_name': 'release-list',
            'converter': lambda v: int(v[0]['medium-list'][0]['position']),
        },
        'title': {
            'musicbrainz_field_name': 'title',
        },
        'release_date': {
            'musicbrainz_field_name': 'date',
            'converter': parse_date,
        },
        'type': {
            'musicbrainz_field_name': 'type',
            'converter': lambda v: v.lower(),
        },
        'artist': {
            'musicbrainz_field_name': 'artist-credit',
            'converter': import_artist,
        },
    }

    def get_image(self):
        """Fetch the front cover for this release, store it and return the file."""
        cover_bytes = musicbrainz.api.images.get_front(str(self.mbid))
        filename = '{0}.jpg'.format(self.mbid)
        self.cover.save(filename, ContentFile(cover_bytes))
        return self.cover.file

    def __str__(self):
        return self.title

    @property
    def tags(self):
        """Set of every tag attached to the tracks of this album."""
        return {
            tag
            for track in self.tracks.all()
            for tag in track.tags.all()
        }
def import_tags(instance, cleaned_data, raw_data):
    """Import hook: attach sufficiently popular tags to ``instance``.

    Reads ``raw_data['tag-list']`` (treated as empty when absent) and adds the
    name of every tag whose ``count`` is at least ``MINIMUM_COUNT`` through
    ``instance.tags.add``.

    Malformed entries are skipped instead of aborting the whole import: a
    missing ``count`` or ``name`` key, or a ``count`` that cannot be
    converted to an integer.
    """
    MINIMUM_COUNT = 2
    tags_to_add = []
    for tag_data in raw_data.get('tag-list', []):
        try:
            if int(tag_data['count']) < MINIMUM_COUNT:
                continue
            name = tag_data['name']
        except (KeyError, TypeError, ValueError):
            # One bad tag entry must not make the entire import fail.
            continue
        tags_to_add.append(name)
    instance.tags.add(*tags_to_add)
def import_album(v):
    """Converter: turn a ``release-list`` value into an ``Album``.

    Only the first release is used; the album is looked up by its id and
    created from the API when it does not exist yet.
    """
    first_release = v[0]
    album, _ = Album.get_or_create_from_api(mbid=first_release['id'])
    return album
def link_recordings(instance, cleaned_data, raw_data):
    """Import hook: point already-imported tracks at the work ``instance``.

    Collects the ``target`` of every entry in
    ``raw_data['recording-relation-list']`` and sets ``work=instance`` on the
    tracks whose ``mbid`` is among them.

    A payload without that key is treated as having no relations, the same
    way ``import_lyrics`` tolerates a missing ``url-relation-list``, so a
    work without recording relations no longer fails with ``KeyError``.
    """
    relations = raw_data.get('recording-relation-list', [])
    track_mbids = [relation['target'] for relation in relations]
    if not track_mbids:
        # Nothing to link: skip the UPDATE query altogether.
        return
    Track.objects.filter(mbid__in=track_mbids).update(work=instance)
def import_lyrics(instance, cleaned_data, raw_data):
    """Import hook: create or fetch the ``Lyrics`` row for a work.

    Uses the ``target`` of the first entry of ``raw_data['url-relation-list']``
    whose ``type`` is ``'lyrics'``. Returns ``None`` when the list is missing,
    has no such entry, or a required key is absent; otherwise returns the
    ``Lyrics`` instance tied to ``instance`` and that URL.
    """
    try:
        lyrics_relations = [
            relation
            for relation in raw_data['url-relation-list']
            if relation['type'] == 'lyrics'
        ]
        url = lyrics_relations[0]['target']
    except (IndexError, KeyError):
        return None
    lyrics, _ = Lyrics.objects.get_or_create(work=instance, url=url)
    return lyrics
class Work(APIModelMixin):
    """A musical work, backed by the MusicBrainz ``work`` entity."""

    language = models.CharField(max_length=20)
    nature = models.CharField(max_length=50)
    title = models.CharField(max_length=255)

    api = musicbrainz.api.works
    api_includes = ['url-rels', 'recording-rels']
    musicbrainz_model = 'work'
    musicbrainz_mapping = {
        'mbid': {'musicbrainz_field_name': 'id'},
        'title': {'musicbrainz_field_name': 'title'},
        'language': {'musicbrainz_field_name': 'language'},
        'nature': {
            'musicbrainz_field_name': 'type',
            'converter': lambda v: v.lower(),
        },
    }
    # Passed to ``importers.load`` when a work is created from the API.
    import_hooks = [
        import_lyrics,
        link_recordings,
    ]

    def fetch_lyrics(self):
        """Return the lyrics of this work, importing them when none exist.

        The first related ``Lyrics`` row wins. Without one, the work's URL
        relations are fetched from the API and handed to ``import_lyrics``,
        whose result (possibly ``None``) is returned.
        """
        existing = self.lyrics.first()
        if existing:
            return existing
        response = self.api.get(self.mbid, includes=['url-rels'])
        return import_lyrics(self, {}, response['work'])
class Lyrics(models.Model):
    """Lyrics found at a URL, optionally attached to a ``Work``."""

    work = models.ForeignKey(Work, related_name='lyrics', null=True, blank=True)
    url = models.URLField(unique=True)
    # Nullable: rows created by ``import_lyrics`` only set ``work`` and
    # ``url``, so ``content`` is empty until ``fetch_content`` has run.
    content = models.TextField(null=True, blank=True)

    @celery.app.task(name='Lyrics.fetch_content', filter=celery.task_method)
    def fetch_content(self):
        """Download the page at ``url``, extract and clean the lyrics, and save."""
        html = lyrics_utils._get_html(self.url)
        content = lyrics_utils.extract_content(html)
        cleaned_content = lyrics_utils.clean_content(content)
        self.content = cleaned_content
        self.save()

    @property
    def content_rendered(self):
        """Return ``content`` rendered from markdown to HTML.

        Returns an empty string when there is no content yet, instead of
        handing ``None`` to the markdown renderer.
        """
        if not self.content:
            return ''
        return markdown.markdown(
            self.content,
            safe_mode=True,
            enable_attributes=False,
            extensions=['markdown.extensions.nl2br'])
class Track(APIModelMixin):
    """A track, backed by the MusicBrainz ``recording`` entity."""

    title = models.CharField(max_length=255)
    artist = models.ForeignKey(Artist, related_name='tracks')
    position = models.PositiveIntegerField(null=True, blank=True)
    album = models.ForeignKey(Album, related_name='tracks', null=True, blank=True)
    work = models.ForeignKey(Work, related_name='tracks', null=True, blank=True)

    api = musicbrainz.api.recordings
    api_includes = ['artist-credits', 'releases', 'media', 'tags', 'work-rels']
    musicbrainz_model = 'recording'
    musicbrainz_mapping = {
        'mbid': {'musicbrainz_field_name': 'id'},
        'title': {'musicbrainz_field_name': 'title'},
        'artist': {
            'musicbrainz_field_name': 'artist-credit',
            # Same lookup as the module-level ``import_artist`` converter.
            'converter': import_artist,
        },
        'album': {
            'musicbrainz_field_name': 'release-list',
            'converter': import_album,
        },
    }
    # Passed to ``importers.load`` when a track is created from the API.
    import_hooks = [
        import_tags,
    ]
    tags = TaggableManager()

    def __str__(self):
        return self.title

    def save(self, **kwargs):
        """Save the track, defaulting its artist to the album's artist."""
        try:
            # Reading the relation is only a probe for a missing artist.
            self.artist
        except Artist.DoesNotExist:
            self.artist = self.album.artist
        super().save(**kwargs)

    def get_work(self):
        """Return the work of this track, looking it up in the API if unset.

        Returns ``None`` when the recording has no work relation. The
        work found this way is returned but not assigned to ``self.work``.
        """
        if self.work:
            return self.work
        response = self.api.get(self.mbid, includes=['work-rels'])
        try:
            relations = response['recording']['work-relation-list']
            work_data = relations[0]['work']
        except (IndexError, KeyError):
            return None
        work, _ = Work.get_or_create_from_api(mbid=work_data['id'])
        return work

    def get_lyrics_url(self):
        """Return the API URL serving the lyrics of this track."""
        return reverse('api:tracks-lyrics', kwargs={'pk': self.pk})

    @property
    def full_name(self):
        """``artist - album - title``, or ``artist - title`` without an album."""
        try:
            album_title = self.album.title
        except AttributeError:
            # No album set: fall back to the short form.
            return '{} - {}'.format(
                self.artist.name,
                self.title,
            )
        return '{} - {} - {}'.format(
            self.artist.name,
            album_title,
            self.title,
        )
class TrackFile(models.Model):
    """An audio file attached to a ``Track``, optionally downloaded from ``source``."""

    track = models.ForeignKey(Track, related_name='files')
    audio_file = models.FileField(upload_to='tracks/%Y/%m/%d', max_length=255)
    source = models.URLField(null=True, blank=True)
    duration = models.IntegerField(null=True, blank=True)

    def download_file(self):
        """Download ``source`` and store the result in ``audio_file``.

        The download goes to a temporary directory. That directory is always
        removed and the downloaded file handle always closed, even when the
        download or the save raises. ``duration`` is set from the
        downloader's result when it provides one.

        Returns the ``audio_file`` field file.
        """
        tmp_dir = tempfile.mkdtemp()
        try:
            data = downloader.download(
                self.source,
                target_directory=tmp_dir)
            self.duration = data.get('duration', None)
            audio_file_path = data['audio_file_path']
            # The handle must stay open while the storage copies the content.
            with open(audio_file_path, 'rb') as downloaded:
                self.audio_file.save(
                    os.path.basename(audio_file_path),
                    File(downloaded)
                )
        finally:
            shutil.rmtree(tmp_dir)
        return self.audio_file

    @property
    def path(self):
        """URL of the audio file, or of the static sample track.

        The sample track is served whenever ``settings.USE_SAMPLE_TRACK`` is
        truthy.
        """
        if settings.USE_SAMPLE_TRACK:
            return static('music/sample1.ogg')
        return self.audio_file.url
class ImportBatch(models.Model):
    """A group of import jobs submitted together by one user."""

    creation_date = models.DateTimeField(default=timezone.now)
    submitted_by = models.ForeignKey('users.User', related_name='imports')

    class Meta:
        ordering = ['-creation_date']

    def __str__(self):
        return str(self.pk)

    @property
    def status(self):
        """``'pending'`` while any job of the batch is pending, else ``'finished'``."""
        has_pending_job = any(
            job.status == 'pending' for job in self.jobs.all()
        )
        return 'pending' if has_pending_job else 'finished'
class ImportJob(models.Model):
    """One track to import: a MusicBrainz recording id plus an audio source URL."""

    batch = models.ForeignKey(ImportBatch, related_name='jobs')
    source = models.URLField()
    mbid = models.UUIDField(editable=False)
    STATUS_CHOICES = (
        ('pending', 'Pending'),
        ('finished', 'finished'),
    )
    status = models.CharField(
        choices=STATUS_CHOICES,
        default='pending',
        max_length=30,
    )

    @celery.app.task(name='ImportJob.run', filter=celery.task_method)
    def run(self, replace=False):
        """Import the track and download its audio file.

        With ``replace`` the track's first existing file (if any) is reused
        for the download; without it, a track that already has a file is left
        alone and the method returns ``None``. On success the job is marked
        ``'finished'`` and the track's primary key is returned.

        Any exception is re-raised as is when ``settings.DEBUG`` is on;
        otherwise a task retry is requested.
        """
        try:
            track, _ = Track.get_or_create_from_api(mbid=self.mbid)
            if replace:
                track_file = track.files.first()
            else:
                if track.files.count() > 0:
                    # NOTE(review): the job keeps its current status on this
                    # early exit; confirm that is intended.
                    return None
                track_file = None
            if not track_file:
                track_file = TrackFile(track=track, source=self.source)
            track_file.download_file()
            track_file.save()
            self.status = 'finished'
            self.save()
            return track.pk
        except Exception as exc:
            if settings.DEBUG:
                raise
            raise ImportJob.run.retry(args=[self], exc=exc, countdown=30, max_retries=3)