import datetime import os import shutil import tempfile import uuid import arrow import markdown from django.conf import settings from django.core.files import File from django.core.files.base import ContentFile from django.db import models from django.db.models.signals import post_save from django.dispatch import receiver from django.urls import reverse from django.utils import timezone from taggit.managers import TaggableManager from versatileimagefield.fields import VersatileImageField from funkwhale_api import downloader, musicbrainz from funkwhale_api.federation import utils as federation_utils from . import importers, metadata, utils class APIModelMixin(models.Model): mbid = models.UUIDField(unique=True, db_index=True, null=True, blank=True) uuid = models.UUIDField(unique=True, db_index=True, default=uuid.uuid4) api_includes = [] creation_date = models.DateTimeField(default=timezone.now) import_hooks = [] class Meta: abstract = True ordering = ["-creation_date"] @classmethod def get_or_create_from_api(cls, mbid): try: return cls.objects.get(mbid=mbid), False except cls.DoesNotExist: return cls.create_from_api(id=mbid), True def get_api_data(self): return self.__class__.api.get(id=self.mbid, includes=self.api_includes)[ self.musicbrainz_model ] @classmethod def create_from_api(cls, **kwargs): if kwargs.get("id"): raw_data = cls.api.get(id=kwargs["id"], includes=cls.api_includes)[ cls.musicbrainz_model ] else: raw_data = cls.api.search(**kwargs)[ "{0}-list".format(cls.musicbrainz_model) ][0] cleaned_data = cls.clean_musicbrainz_data(raw_data) return importers.load(cls, cleaned_data, raw_data, cls.import_hooks) @classmethod def clean_musicbrainz_data(cls, data): cleaned_data = {} mapping = importers.Mapping(cls.musicbrainz_mapping) for key, value in data.items(): try: cleaned_key, cleaned_value = mapping.from_musicbrainz(key, value) cleaned_data[cleaned_key] = cleaned_value except KeyError as e: pass return cleaned_data @property def musicbrainz_url(self): if self.mbid: return "https://musicbrainz.org/{}/{}".format( self.musicbrainz_model, self.mbid ) class ArtistQuerySet(models.QuerySet): def with_albums_count(self): return self.annotate(_albums_count=models.Count("albums")) def with_albums(self): return self.prefetch_related( models.Prefetch("albums", queryset=Album.objects.with_tracks_count()) ) class Artist(APIModelMixin): name = models.CharField(max_length=255) musicbrainz_model = "artist" musicbrainz_mapping = { "mbid": {"musicbrainz_field_name": "id"}, "name": {"musicbrainz_field_name": "name"}, } api = musicbrainz.api.artists objects = ArtistQuerySet.as_manager() def __str__(self): return self.name @property def tags(self): t = [] for album in self.albums.all(): for tag in album.tags: t.append(tag) return set(t) @classmethod def get_or_create_from_name(cls, name, **kwargs): kwargs.update({"name": name}) return cls.objects.get_or_create(name__iexact=name, defaults=kwargs) def import_artist(v): a = Artist.get_or_create_from_api(mbid=v[0]["artist"]["id"])[0] return a def parse_date(v): if len(v) == 4: return datetime.date(int(v), 1, 1) d = arrow.get(v).date() return d def import_tracks(instance, cleaned_data, raw_data): for track_data in raw_data["medium-list"][0]["track-list"]: track_cleaned_data = Track.clean_musicbrainz_data(track_data["recording"]) track_cleaned_data["album"] = instance track_cleaned_data["position"] = int(track_data["position"]) track = importers.load( Track, track_cleaned_data, track_data, Track.import_hooks ) class AlbumQuerySet(models.QuerySet): def with_tracks_count(self): return self.annotate(_tracks_count=models.Count("tracks")) class Album(APIModelMixin): title = models.CharField(max_length=255) artist = models.ForeignKey(Artist, related_name="albums", on_delete=models.CASCADE) release_date = models.DateField(null=True) release_group_id = models.UUIDField(null=True, blank=True) cover = VersatileImageField( upload_to="albums/covers/%Y/%m/%d", null=True, blank=True ) TYPE_CHOICES = (("album", "Album"),) type = models.CharField(choices=TYPE_CHOICES, max_length=30, default="album") api_includes = ["artist-credits", "recordings", "media", "release-groups"] api = musicbrainz.api.releases musicbrainz_model = "release" musicbrainz_mapping = { "mbid": {"musicbrainz_field_name": "id"}, "position": { "musicbrainz_field_name": "release-list", "converter": lambda v: int(v[0]["medium-list"][0]["position"]), }, "release_group_id": { "musicbrainz_field_name": "release-group", "converter": lambda v: v["id"], }, "title": {"musicbrainz_field_name": "title"}, "release_date": {"musicbrainz_field_name": "date", "converter": parse_date}, "type": {"musicbrainz_field_name": "type", "converter": lambda v: v.lower()}, "artist": { "musicbrainz_field_name": "artist-credit", "converter": import_artist, }, } objects = AlbumQuerySet.as_manager() def get_image(self, data=None): if data: f = ContentFile(data["content"]) extensions = {"image/jpeg": "jpg", "image/png": "png", "image/gif": "gif"} extension = extensions.get(data["mimetype"], "jpg") self.cover.save("{}.{}".format(self.uuid, extension), f) else: image_data = musicbrainz.api.images.get_front(str(self.mbid)) f = ContentFile(image_data) self.cover.save("{0}.jpg".format(self.mbid), f) return self.cover.file def __str__(self): return self.title @property def tags(self): t = [] for track in self.tracks.all(): for tag in track.tags.all(): t.append(tag) return set(t) @classmethod def get_or_create_from_title(cls, title, **kwargs): kwargs.update({"title": title}) return cls.objects.get_or_create(title__iexact=title, defaults=kwargs) def import_tags(instance, cleaned_data, raw_data): MINIMUM_COUNT = 2 tags_to_add = [] for tag_data in raw_data.get("tag-list", []): try: if int(tag_data["count"]) < MINIMUM_COUNT: continue except ValueError: continue tags_to_add.append(tag_data["name"]) instance.tags.add(*tags_to_add) def import_album(v): a = Album.get_or_create_from_api(mbid=v[0]["id"])[0] return a def link_recordings(instance, cleaned_data, raw_data): tracks = [r["target"] for r in raw_data["recording-relation-list"]] Track.objects.filter(mbid__in=tracks).update(work=instance) def import_lyrics(instance, cleaned_data, raw_data): try: url = [ url_data for url_data in raw_data["url-relation-list"] if url_data["type"] == "lyrics" ][0]["target"] except (IndexError, KeyError): return l, _ = Lyrics.objects.get_or_create(work=instance, url=url) return l class Work(APIModelMixin): language = models.CharField(max_length=20) nature = models.CharField(max_length=50) title = models.CharField(max_length=255) api = musicbrainz.api.works api_includes = ["url-rels", "recording-rels"] musicbrainz_model = "work" musicbrainz_mapping = { "mbid": {"musicbrainz_field_name": "id"}, "title": {"musicbrainz_field_name": "title"}, "language": {"musicbrainz_field_name": "language"}, "nature": {"musicbrainz_field_name": "type", "converter": lambda v: v.lower()}, } import_hooks = [import_lyrics, link_recordings] def fetch_lyrics(self): l = self.lyrics.first() if l: return l data = self.api.get(self.mbid, includes=["url-rels"])["work"] l = import_lyrics(self, {}, data) return l class Lyrics(models.Model): uuid = models.UUIDField(unique=True, db_index=True, default=uuid.uuid4) work = models.ForeignKey( Work, related_name="lyrics", null=True, blank=True, on_delete=models.CASCADE ) url = models.URLField(unique=True) content = models.TextField(null=True, blank=True) @property def content_rendered(self): return markdown.markdown( self.content, safe_mode=True, enable_attributes=False, extensions=["markdown.extensions.nl2br"], ) class TrackQuerySet(models.QuerySet): def for_nested_serialization(self): return ( self.select_related() .select_related("album__artist", "artist") .prefetch_related("files") ) def get_artist(release_list): return Artist.get_or_create_from_api( mbid=release_list[0]["artist-credits"][0]["artists"]["id"] )[0] class Track(APIModelMixin): title = models.CharField(max_length=255) artist = models.ForeignKey(Artist, related_name="tracks", on_delete=models.CASCADE) position = models.PositiveIntegerField(null=True, blank=True) album = models.ForeignKey( Album, related_name="tracks", null=True, blank=True, on_delete=models.CASCADE ) work = models.ForeignKey( Work, related_name="tracks", null=True, blank=True, on_delete=models.CASCADE ) musicbrainz_model = "recording" api = musicbrainz.api.recordings api_includes = ["artist-credits", "releases", "media", "tags", "work-rels"] musicbrainz_mapping = { "mbid": {"musicbrainz_field_name": "id"}, "title": {"musicbrainz_field_name": "title"}, "artist": { # we use the artist from the release to avoid #237 "musicbrainz_field_name": "release-list", "converter": get_artist, }, "album": {"musicbrainz_field_name": "release-list", "converter": import_album}, } import_hooks = [import_tags] objects = TrackQuerySet.as_manager() tags = TaggableManager(blank=True) class Meta: ordering = ["album", "position"] def __str__(self): return self.title def save(self, **kwargs): try: self.artist except Artist.DoesNotExist: self.artist = self.album.artist super().save(**kwargs) def get_work(self): if self.work: return self.work data = self.api.get(self.mbid, includes=["work-rels"]) try: work_data = data["recording"]["work-relation-list"][0]["work"] except (IndexError, KeyError): return work, _ = Work.get_or_create_from_api(mbid=work_data["id"]) return work def get_lyrics_url(self): return reverse("api:v1:tracks-lyrics", kwargs={"pk": self.pk}) @property def full_name(self): try: return "{} - {} - {}".format(self.artist.name, self.album.title, self.title) except AttributeError: return "{} - {}".format(self.artist.name, self.title) def get_activity_url(self): if self.mbid: return "https://musicbrainz.org/recording/{}".format(self.mbid) return settings.FUNKWHALE_URL + "/tracks/{}".format(self.pk) @classmethod def get_or_create_from_title(cls, title, **kwargs): kwargs.update({"title": title}) return cls.objects.get_or_create(title__iexact=title, defaults=kwargs) @classmethod def get_or_create_from_release(cls, release_mbid, mbid): release_mbid = str(release_mbid) mbid = str(mbid) try: return cls.objects.get(mbid=mbid), False except cls.DoesNotExist: pass album = Album.get_or_create_from_api(release_mbid)[0] data = musicbrainz.client.api.releases.get( str(album.mbid), includes=Album.api_includes ) tracks = [t for m in data["release"]["medium-list"] for t in m["track-list"]] track_data = None for track in tracks: if track["recording"]["id"] == mbid: track_data = track break if not track_data: raise ValueError("No track found matching this ID") return cls.objects.update_or_create( mbid=mbid, defaults={ "position": int(track["position"]), "title": track["recording"]["title"], "album": album, "artist": album.artist, }, ) class TrackFile(models.Model): uuid = models.UUIDField(unique=True, db_index=True, default=uuid.uuid4) track = models.ForeignKey(Track, related_name="files", on_delete=models.CASCADE) audio_file = models.FileField(upload_to="tracks/%Y/%m/%d", max_length=255) source = models.URLField(null=True, blank=True, max_length=500) creation_date = models.DateTimeField(default=timezone.now) modification_date = models.DateTimeField(auto_now=True) accessed_date = models.DateTimeField(null=True, blank=True) duration = models.IntegerField(null=True, blank=True) size = models.IntegerField(null=True, blank=True) bitrate = models.IntegerField(null=True, blank=True) acoustid_track_id = models.UUIDField(null=True, blank=True) mimetype = models.CharField(null=True, blank=True, max_length=200) library_track = models.OneToOneField( "federation.LibraryTrack", related_name="local_track_file", on_delete=models.CASCADE, null=True, blank=True, ) def download_file(self): # import the track file, since there is not any # we create a tmp dir for the download tmp_dir = tempfile.mkdtemp() data = downloader.download(self.source, target_directory=tmp_dir) self.duration = data.get("duration", None) self.audio_file.save( os.path.basename(data["audio_file_path"]), File(open(data["audio_file_path"], "rb")), ) shutil.rmtree(tmp_dir) return self.audio_file def get_federation_url(self): return federation_utils.full_url("/federation/music/file/{}".format(self.uuid)) @property def path(self): return reverse("api:v1:trackfiles-serve", kwargs={"pk": self.pk}) @property def filename(self): return "{}.{}".format(self.track.full_name, self.extension) @property def extension(self): if not self.audio_file: return return os.path.splitext(self.audio_file.name)[-1].replace(".", "", 1) def get_file_size(self): if self.audio_file: return self.audio_file.size if self.source.startswith("file://"): return os.path.getsize(self.source.replace("file://", "", 1)) if self.library_track and self.library_track.audio_file: return self.library_track.audio_file.size def get_audio_file(self): if self.audio_file: return self.audio_file.open() if self.source.startswith("file://"): return open(self.source.replace("file://", "", 1), "rb") if self.library_track and self.library_track.audio_file: return self.library_track.audio_file.open() def set_audio_data(self): audio_file = self.get_audio_file() if audio_file: with audio_file as f: audio_data = utils.get_audio_file_data(f) if not audio_data: return self.duration = int(audio_data["length"]) self.bitrate = audio_data["bitrate"] self.size = self.get_file_size() else: lt = self.library_track if lt: self.duration = lt.get_metadata("length") self.size = lt.get_metadata("size") self.bitrate = lt.get_metadata("bitrate") def save(self, **kwargs): if not self.mimetype and self.audio_file: self.mimetype = utils.guess_mimetype(self.audio_file) return super().save(**kwargs) def get_metadata(self): audio_file = self.get_audio_file() if not audio_file: return return metadata.Metadata(audio_file) IMPORT_STATUS_CHOICES = ( ("pending", "Pending"), ("finished", "Finished"), ("errored", "Errored"), ("skipped", "Skipped"), ) class ImportBatch(models.Model): uuid = models.UUIDField(unique=True, db_index=True, default=uuid.uuid4) IMPORT_BATCH_SOURCES = [ ("api", "api"), ("shell", "shell"), ("federation", "federation"), ] source = models.CharField( max_length=30, default="api", choices=IMPORT_BATCH_SOURCES ) creation_date = models.DateTimeField(default=timezone.now) submitted_by = models.ForeignKey( "users.User", related_name="imports", null=True, blank=True, on_delete=models.CASCADE, ) status = models.CharField( choices=IMPORT_STATUS_CHOICES, default="pending", max_length=30 ) import_request = models.ForeignKey( "requests.ImportRequest", related_name="import_batches", null=True, blank=True, on_delete=models.CASCADE, ) class Meta: ordering = ["-creation_date"] def __str__(self): return str(self.pk) def update_status(self): old_status = self.status self.status = utils.compute_status(self.jobs.all()) if self.status == old_status: return self.save(update_fields=["status"]) if self.status != old_status and self.status == "finished": from . import tasks tasks.import_batch_notify_followers.delay(import_batch_id=self.pk) def get_federation_url(self): return federation_utils.full_url( "/federation/music/import/batch/{}".format(self.uuid) ) class ImportJob(models.Model): uuid = models.UUIDField(unique=True, db_index=True, default=uuid.uuid4) batch = models.ForeignKey( ImportBatch, related_name="jobs", on_delete=models.CASCADE ) track_file = models.ForeignKey( TrackFile, related_name="jobs", null=True, blank=True, on_delete=models.CASCADE ) source = models.CharField(max_length=500) mbid = models.UUIDField(editable=False, null=True, blank=True) status = models.CharField( choices=IMPORT_STATUS_CHOICES, default="pending", max_length=30 ) audio_file = models.FileField( upload_to="imports/%Y/%m/%d", max_length=255, null=True, blank=True ) library_track = models.ForeignKey( "federation.LibraryTrack", related_name="import_jobs", on_delete=models.SET_NULL, null=True, blank=True, ) class Meta: ordering = ("id",) @receiver(post_save, sender=ImportJob) def update_batch_status(sender, instance, **kwargs): instance.batch.update_status() @receiver(post_save, sender=ImportBatch) def update_request_status(sender, instance, created, **kwargs): update_fields = kwargs.get("update_fields", []) or [] if not instance.import_request: return if not created and not "status" in update_fields: return r_status = instance.import_request.status status = instance.status if status == "pending" and r_status == "pending": # let's mark the request as accepted since we started an import instance.import_request.status = "accepted" return instance.import_request.save(update_fields=["status"]) if status == "finished" and r_status == "accepted": # let's mark the request as imported since the import is over instance.import_request.status = "imported" return instance.import_request.save(update_fields=["status"])