kopia lustrzana https://dev.funkwhale.audio/funkwhale/funkwhale
784 wiersze
27 KiB
Python
784 wiersze
27 KiB
Python
import collections
|
|
import datetime
|
|
import itertools
|
|
import os
|
|
import queue
|
|
import threading
|
|
import time
|
|
import urllib.parse
|
|
|
|
import watchdog.events
|
|
import watchdog.observers
|
|
|
|
from django.conf import settings
|
|
from django.core.files import File
|
|
from django.core.management import call_command
|
|
from django.core.management.base import BaseCommand, CommandError
|
|
from django.db.models import Q
|
|
from django.utils import timezone
|
|
|
|
from rest_framework import serializers
|
|
|
|
from funkwhale_api.common import utils as common_utils
|
|
from funkwhale_api.music import models, tasks, utils
|
|
|
|
|
|
def crawl_dir(dir, extensions, recursive=True, ignored=None):
    """Yield paths of files under ``dir`` whose extension is in ``extensions``.

    :param dir: directory to scan. If it is a regular file, that single path
        is yielded as-is, without extension or ``ignored`` filtering.
    :param extensions: iterable of extensions without the leading dot;
        matching is case-insensitive and each file is yielded at most once.
    :param recursive: when true, descend into sub-directories.
    :param ignored: container of paths to skip (a set gives O(1) lookups);
        ``None`` means nothing is skipped.
    :raises OSError: if ``dir`` cannot be listed (missing, permission denied…).
    """
    if os.path.isfile(dir):
        yield dir
        return

    if ignored is None:
        ignored = ()
    # Normalise once; a set removes duplicates such as "mp3" / "MP3" so a file
    # cannot be yielded twice.
    suffixes = tuple({".{}".format(e.lower()) for e in extensions})

    # ``with`` closes the scandir iterator even if the consumer stops early,
    # and, unlike a try/finally around the call, never touches an unbound
    # name when os.scandir itself raises.
    with os.scandir(dir) as scanner:
        for entry in scanner:
            if entry.is_file():
                if entry.name.lower().endswith(suffixes) and entry.path not in ignored:
                    yield entry.path
            elif recursive and entry.is_dir():
                yield from crawl_dir(
                    entry.path, extensions, recursive=recursive, ignored=ignored
                )
|
|
|
|
|
|
def batch(iterable, n=1):
    """Split ``iterable`` into consecutive lists of at most ``n`` items.

    The last list may be shorter. Nothing is yielded for an empty iterable,
    and no trailing empty list is produced when the length is an exact
    multiple of ``n``.

    :param iterable: any iterable (lists as well as iterators); it is consumed
        lazily, one batch at a time.
    :param n: maximum batch size, must be at least 1.
    :raises ValueError: if ``n`` is lower than 1 (that used to loop forever,
        yielding empty lists).
    """
    if n < 1:
        raise ValueError("batch size must be at least 1, got {}".format(n))
    iterator = iter(iterable)
    while True:
        current = list(itertools.islice(iterator, n))
        if not current:
            return
        yield current
|
|
|
|
|
|
class Command(BaseCommand):
    """Import audio files from the local filesystem into a Funkwhale library.

    Each positional ``path`` is crawled for files whose extension matches
    ``--extension`` (or ``utils.SUPPORTED_EXTENSIONS``), and every new file
    becomes an ``Upload`` of the target library. Files are copied into the
    media storage unless ``--in-place`` is given. In that case they must live
    under ``settings.MUSIC_DIRECTORY_PATH`` and are referenced where they are.
    With ``--watch`` the command instead keeps running and mirrors filesystem
    changes of a single directory into the library.
    """

    help = "Import audio files matching given glob pattern"

    def add_arguments(self, parser):
        parser.add_argument(
            "library_id",
            type=str,
            help=(
                "A local library identifier where the files should be imported. "
                "You can use the full uuid such as e29c5be9-6da3-4d92-b40b-4970edd3ee4b "
                "or only a small portion of it, starting from the beginning, such as "
                "e29c5be9"
            ),
        )
        parser.add_argument("path", nargs="+", type=str)
        parser.add_argument(
            "--recursive",
            action="store_true",
            dest="recursive",
            default=False,
            help="Will match the pattern recursively (including subdirectories)",
        )
        # NOTE(review): --username is parsed but never read by this command.
        parser.add_argument(
            "--username",
            dest="username",
            help="The username of the user you want to be bound to the import",
        )
        parser.add_argument(
            "--async",
            action="store_true",
            dest="async_",
            default=False,
            help="Will launch celery tasks for each file to import instead of doing it synchronously and block the CLI",
        )
        parser.add_argument(
            "--exit",
            "-x",
            action="store_true",
            dest="exit_on_failure",
            default=False,
            help="Use this flag to disable error catching",
        )
        parser.add_argument(
            "--in-place",
            "-i",
            action="store_true",
            dest="in_place",
            default=False,
            help=(
                "Import files without duplicating them into the media directory. "
                "For in-place import to work, the music files must be readable "
                "by the web-server and funkwhale api and celeryworker processes. "
                "You may want to use this if you have a big music library to "
                "import and not much disk space available."
            ),
        )
        parser.add_argument(
            "--replace",
            action="store_true",
            dest="replace",
            default=False,
            help=(
                "Use this flag to replace duplicates (tracks with same "
                "musicbrainz mbid, or same artist, album and title) on import "
                "with their newest version."
            ),
        )
        parser.add_argument(
            "--outbox",
            action="store_true",
            dest="outbox",
            default=False,
            help=(
                "Use this flag to notify library followers of newly imported files. "
                "You'll likely want to keep this disabled for CLI imports, especially if "
                "you plan to import hundreds or thousands of files, as it will cause a lot "
                "of overhead on your server and on servers you are federating with."
            ),
        )
        parser.add_argument(
            "--watch",
            action="store_true",
            dest="watch",
            default=False,
            help=(
                "Start the command in watch mode. Instead of running a full import "
                "and exiting, watch the given path and import new files, remove deleted "
                "files, and update metadata corresponding to updated files."
            ),
        )
        parser.add_argument("-e", "--extension", nargs="+")

        parser.add_argument(
            "--broadcast",
            action="store_true",
            dest="broadcast",
            default=False,
            help=(
                "Use this flag to enable realtime updates about the import in the UI. "
                "This causes some overhead, so it's disabled by default."
            ),
        )
        parser.add_argument(
            "--prune",
            action="store_true",
            dest="prune",
            default=False,
            help=(
                "Once the import is completed, prune tracks, albums and artists that aren't linked to any upload."
            ),
        )

        parser.add_argument(
            "--reference",
            action="store",
            dest="reference",
            default=None,
            help=(
                "A custom reference for the import. Leave this empty to have a random "
                "reference being generated for you."
            ),
        )
        parser.add_argument(
            "--noinput",
            "--no-input",
            action="store_false",
            dest="interactive",
            help="Do NOT prompt the user for input of any kind.",
        )

        parser.add_argument(
            "--batch-size",
            "-s",
            dest="batch_size",
            default=1000,
            type=int,
            help="Size of each batch, only used when crawling large collections",
        )

    def handle(self, *args, **options):
        """Run the import described by ``options`` (see ``add_arguments``).

        :raises CommandError: for an unknown, ambiguous or non-local library,
            invalid in-place paths, ``--watch`` with several paths, or a
            declined confirmation prompt.
        """
        # handle relative directories
        options["path"] = [os.path.abspath(path) for path in options["path"]]
        self.is_confirmed = False
        try:
            library = models.Library.objects.select_related("actor__user").get(
                uuid__startswith=options["library_id"]
            )
        except models.Library.DoesNotExist:
            raise CommandError("Invalid library id")
        except models.Library.MultipleObjectsReturned:
            # a short uuid prefix can match several libraries
            raise CommandError(
                "Library id {} is ambiguous, please use a longer prefix".format(
                    options["library_id"]
                )
            )

        if not library.actor.get_user():
            raise CommandError("Library {} is not a local library".format(library.uuid))

        if options["in_place"]:
            self._check_in_place_paths(options["path"])

        reference = options["reference"] or "cli-{}".format(timezone.now().isoformat())

        import_url = "{}://{}/library/{}/upload?{}".format(
            settings.FUNKWHALE_PROTOCOL,
            settings.FUNKWHALE_HOSTNAME,
            str(library.uuid),
            urllib.parse.urlencode([("import", reference)]),
        )
        self.stdout.write(
            "For details, please refer to import reference '{}' or URL {}".format(
                reference, import_url
            )
        )
        extensions = options.get("extension") or utils.SUPPORTED_EXTENSIONS
        if options["watch"]:
            if len(options["path"]) > 1:
                raise CommandError("Watch only work with a single directory")

            return self.setup_watcher(
                extensions=extensions,
                path=options["path"][0],
                reference=reference,
                library=library,
                in_place=options["in_place"],
                prune=options["prune"],
                recursive=options["recursive"],
                replace=options["replace"],
                dispatch_outbox=options["outbox"],
                broadcast=options["broadcast"],
            )

        checked_paths = set()
        if options["in_place"]:
            checked_paths = self._check_existing_uploads(library, extensions, options)

        # crawl_dir generators are lazy, so files are discovered batch by batch
        crawler = itertools.chain.from_iterable(
            crawl_dir(
                p,
                extensions=extensions,
                recursive=options["recursive"],
                ignored=checked_paths,
            )
            for p in options["path"]
        )
        errors = []
        total = 0
        start_time = time.time()
        batch_duration = None
        self.stdout.write("Starting import of new files…")
        for i, entries in enumerate(batch(crawler, options["batch_size"])):
            total += len(entries)
            batch_start = time.time()
            time_stats = ""
            if i > 0:
                time_stats = " - running for {}s, previous batch took {}s".format(
                    int(time.time() - start_time), int(batch_duration),
                )
            if entries:
                self.stdout.write(
                    "Handling batch {} ({} items){}".format(
                        i + 1, len(entries), time_stats,
                    )
                )
                batch_errors = self.handle_batch(
                    library=library,
                    paths=entries,
                    batch=i + 1,
                    reference=reference,
                    options=options,
                )
                if batch_errors:
                    errors += batch_errors

            batch_duration = time.time() - batch_start

        message = "Successfully imported {} new tracks in {}s"
        if options["async_"]:
            message = "Successfully launched import for {} new tracks in {}s"

        self.stdout.write(
            message.format(total - len(errors), int(time.time() - start_time))
        )
        if len(errors) > 0:
            self.stderr.write("{} tracks could not be imported:".format(len(errors)))

            for path, error in errors:
                self.stderr.write("- {}: {}".format(path, error))

        self.stdout.write(
            "For details, please refer to import reference '{}' or URL {}".format(
                reference, import_url
            )
        )

        if options["prune"]:
            self.stdout.write(
                "Pruning dangling tracks, albums and artists from library…"
            )
            prune()

    def _check_in_place_paths(self, paths):
        """Ensure every absolute path in ``paths`` lies inside MUSIC_DIRECTORY_PATH.

        :raises CommandError: if the setting is empty or a path is outside it.
        """
        self.stdout.write(
            "Checking imported paths against settings.MUSIC_DIRECTORY_PATH"
        )
        configured = settings.MUSIC_DIRECTORY_PATH
        if not configured:
            raise CommandError(
                "Importing in-place requires setting the "
                "MUSIC_DIRECTORY_PATH variable"
            )
        music_dir = os.path.abspath(configured)
        for import_path in paths:
            # Compare whole path components: a plain string prefix check would
            # accept /music2 when MUSIC_DIRECTORY_PATH is /music.
            if os.path.commonpath([music_dir, import_path]) != music_dir:
                raise CommandError(
                    "Importing in-place only works if importing "
                    "from {} (MUSIC_DIRECTORY_PATH), as this directory "
                    "needs to be accessible by the webserver. "
                    "Culprit: {}".format(configured, import_path)
                )

    def _check_existing_uploads(self, library, extensions, options):
        """Optionally re-check already imported in-place files.

        :returns: the set of paths that were checked (to be skipped by the
            crawler), or an empty set if the user declined.
        """
        self.stdout.write("Checking existing files for updates…")
        message = (
            "Are you sure you want to do this?\n\n"
            "Type 'yes' to continue, or 'no' to skip checking for updates in "
            "already imported files: "
        )
        if options["interactive"] and input(message) != "yes":
            return set()
        checked_paths = check_updates(
            stdout=self.stdout,
            paths=options["path"],
            extensions=extensions,
            library=library,
            batch_size=options["batch_size"],
        )
        self.stdout.write("Existing files checked, moving on to next step!")
        return checked_paths

    def handle_batch(self, library, paths, batch, reference, options):
        """Filter one batch of crawled paths and import the new ones.

        Paths that cannot be encoded as UTF-8 are skipped with a warning.
        Unless ``--replace`` is set, paths already imported with status
        "finished" in ``library`` are skipped too. When interactive, the user
        is asked for confirmation once, before the first import.

        :returns: list of ``(path, error)`` tuples from ``do_import``; empty
            when the batch has nothing new to import.
        :raises CommandError: if no path of the batch is usable, or if the
            user declines the confirmation prompt.
        """
        matching = []
        for m in paths:
            # In some situations, the path is encoded incorrectly on the filesystem
            # so we filter out faulty paths and display a warning to the user.
            # see https://dev.funkwhale.audio/funkwhale/funkwhale/issues/138
            try:
                m.encode("utf-8")
            except UnicodeEncodeError:
                previous = matching[-1] if matching else None
                self.stderr.write(
                    self.style.WARNING(
                        "[warning] Ignoring undecodable path. Previous ok file was {}".format(
                            previous
                        )
                    )
                )
            else:
                matching.append(m)

        if not matching:
            raise CommandError("No file matching pattern, aborting")

        if options["replace"]:
            filtered = {"initial": matching, "skipped": [], "new": matching}
            message = " - {} files to be replaced"
            import_paths = matching
        else:
            filtered = self.filter_matching(matching, library)
            message = " - {} files already found in database"
            import_paths = filtered["new"]

        self.stdout.write(" Import summary:")
        self.stdout.write(
            " - {} files found matching this pattern: {}".format(
                len(matching), options["path"]
            )
        )
        self.stdout.write(message.format(len(filtered["skipped"])))

        self.stdout.write(" - {} new files".format(len(filtered["new"])))

        if batch == 1:
            self.stdout.write(
                " Selected options: {}".format(
                    ", ".join(
                        ["in place" if options["in_place"] else "copy music files"]
                    )
                )
            )
        if len(filtered["new"]) == 0:
            self.stdout.write(" Nothing new to import, exiting")
            return []

        if options["interactive"] and not self.is_confirmed:
            message = (
                "Are you sure you want to do this?\n\n"
                "Type 'yes' to continue, or 'no' to cancel: "
            )
            if input(message) != "yes":
                raise CommandError("Import cancelled.")
            self.is_confirmed = True

        return self.do_import(
            import_paths,
            library=library,
            reference=reference,
            batch=batch,
            options=options,
        )

    def filter_matching(self, matching, library):
        """Split ``matching`` into paths already imported in ``library`` and new ones.

        :returns: dict with ``initial`` (input), ``skipped`` and ``new``
            (both sorted lists).
        """
        sources = ["file://{}".format(p) for p in matching]
        # we skip reimport for path that are already found
        # as a Upload.source
        existing = library.uploads.filter(source__in=sources, import_status="finished")
        existing = existing.values_list("source", flat=True)
        existing = {p.replace("file://", "", 1) for p in existing}
        skipped = set(matching) & existing
        return {
            "initial": matching,
            "skipped": sorted(skipped),
            "new": sorted(set(matching) - skipped),
        }

    def do_import(self, paths, library, reference, batch, options):
        """Create an upload for each path, collecting failures.

        :returns: list of ``(path, "ExceptionName message")`` tuples.
        :raises Exception: the first failure, re-raised as-is, when
            ``--exit`` is set.
        """
        message = "[batch {batch}] {i}/{total} Importing {path}..."
        if options["async_"]:
            message = "[batch {batch}] {i}/{total} Launching import for {path}..."

        # we create an upload binded to the library
        async_ = options["async_"]
        errors = []
        for i, path in enumerate(paths, start=1):
            if options["verbosity"] > 1:
                self.stdout.write(
                    message.format(batch=batch, path=path, i=i, total=len(paths))
                )
            try:
                create_upload(
                    path=path,
                    reference=reference,
                    library=library,
                    async_=async_,
                    replace=options["replace"],
                    in_place=options["in_place"],
                    dispatch_outbox=options["outbox"],
                    broadcast=options["broadcast"],
                )
            except Exception as e:
                if options["exit_on_failure"]:
                    raise
                m = "Error while importing {}: {} {}".format(
                    path, e.__class__.__name__, e
                )
                self.stderr.write(m)
                errors.append((path, "{} {}".format(e.__class__.__name__, e)))
        return errors

    def setup_watcher(self, path, extensions, recursive, **kwargs):
        """Watch ``path`` and mirror filesystem changes until interrupted.

        A daemon worker thread consumes watchdog events from a queue. The
        main thread wakes every 10 seconds and prunes the library when files
        were deleted and ``kwargs["prune"]`` is set. The observer is always
        stopped and joined on exit (Ctrl-C or any error).
        """
        watchdog_queue = queue.Queue()
        # Set up a worker thread to process database load; as a daemon it does
        # not keep the process alive once the main thread exits.
        worker = threading.Thread(
            target=process_load_queue(self.stdout, **kwargs),
            args=(watchdog_queue,),
            daemon=True,
        )
        worker.start()

        # setup watchdog to monitor directory for trigger files
        patterns = ["*.{}".format(e) for e in extensions]
        event_handler = Watcher(
            stdout=self.stdout, queue=watchdog_queue, patterns=patterns,
        )
        observer = watchdog.observers.Observer()
        observer.schedule(event_handler, path, recursive=recursive)
        observer.start()

        try:
            while True:
                self.stdout.write(
                    "Watching for changes at {}…".format(path), ending="\r"
                )
                time.sleep(10)
                if kwargs["prune"] and GLOBAL["need_pruning"]:
                    self.stdout.write("Some files were deleted, pruning library…")
                    prune()
                    GLOBAL["need_pruning"] = False
        except KeyboardInterrupt:
            self.stdout.write("Exiting…")
        finally:
            observer.stop()
            observer.join()
|
|
|
|
|
|
# Module-level state shared between the watch-mode worker thread and the main
# thread (no lock is used). handle_deleted() sets "need_pruning" to True after
# removing uploads from the database. Command.setup_watcher() checks it after
# each 10-second sleep; when the prune option is enabled it calls prune() and
# resets the flag to False.
GLOBAL = {"need_pruning": False}
|
|
|
|
|
|
def prune():
    """Remove tracks, albums and artists no longer linked to any upload.

    Thin wrapper around the ``prune_library`` management command. It runs for
    real (not as a dry run) and prunes all three object types.
    """
    prune_options = {
        "dry_run": False,
        "prune_artists": True,
        "prune_albums": True,
        "prune_tracks": True,
    }
    call_command("prune_library", **prune_options)
|
|
|
|
|
|
def create_upload(
    path, reference, library, async_, replace, in_place, dispatch_outbox, broadcast,
):
    """Create an ``Upload`` for the file at ``path`` and start its processing.

    The upload's ``source`` is ``file://`` + ``path``. Its import config
    carries ``replace``, ``dispatch_outbox`` and ``broadcast``. Unless
    ``in_place`` is true, the file content is copied into ``audio_file``.
    ``tasks.process_upload`` is then called directly, or queued through
    ``.delay`` when ``async_`` is true.
    """
    # resolve the handler first, exactly as before any database work
    if async_:
        run_processing = tasks.process_upload.delay
    else:
        run_processing = tasks.process_upload

    upload = models.Upload(library=library, import_reference=reference)
    upload.source = "file://" + path
    import_config = {
        "replace": replace,
        "dispatch_outbox": dispatch_outbox,
        "broadcast": broadcast,
    }
    upload.import_metadata = {"funkwhale": {"config": import_config}}

    if not in_place:
        with open(path, "rb") as source_file:
            upload.audio_file.save(
                os.path.basename(path), File(source_file), save=False
            )

    upload.save()

    run_processing(upload_id=upload.pk)
|
|
|
|
|
|
def process_load_queue(stdout, **kwargs):
    """Build the target function of the watch-mode worker thread.

    The returned ``inner(q)`` loops forever. Each pass drains queue ``q``,
    keeping only the latest event per path. It then handles every stored
    event older than ``flush_delay`` seconds through ``handle_event`` and
    sleeps for one second. ``kwargs`` are forwarded to ``handle_event``.

    An exception raised while handling one event is reported on ``stdout``
    and does not stop the loop. Otherwise a single failure (e.g. a file
    deleted before it could be hashed) would silently end watch-mode
    processing.
    """

    def inner(q):
        # we batch events, to avoid calling same methods multiple times if a file is modified
        # a lot in a really short time
        flush_delay = 2
        batched_events = collections.OrderedDict()
        while True:
            # drain everything currently queued; a newer event for a path
            # replaces the older one
            while True:
                try:
                    event = q.get_nowait()
                except queue.Empty:
                    break
                batched_events[event["path"]] = event

            for path, event in list(batched_events.items()):
                if time.time() - event["time"] <= flush_delay:
                    continue
                now = datetime.datetime.now(datetime.timezone.utc)
                stdout.write(
                    "{} -- Processing {}:{}...\n".format(
                        now.strftime("%Y/%m/%d %H:%M:%S"), event["type"], event["path"]
                    )
                )
                del batched_events[path]
                try:
                    handle_event(event, stdout=stdout, **kwargs)
                except Exception as e:
                    # top-level boundary of the worker thread: report and keep going
                    stdout.write(
                        " Error while processing {}: {} {}".format(
                            path, e.__class__.__name__, e
                        )
                    )
            time.sleep(1)

    return inner
|
|
|
|
|
|
class Watcher(watchdog.events.PatternMatchingEventHandler):
    """Forward matching filesystem events to a queue as plain dicts.

    Only paths matching ``patterns`` reach the handlers. Moved, created,
    deleted and modified events are all serialized by ``enqueue`` and put on
    ``queue``.
    """

    def __init__(self, stdout, queue, patterns):
        self.stdout = stdout
        self.queue = queue
        super().__init__(patterns=patterns)

    def enqueue(self, event):
        """Put a dict snapshot of ``event``, stamped with the current time, on the queue."""
        snapshot = dict(
            is_directory=event.is_directory,
            type=event.event_type,
            path=event.src_path,
            src_path=event.src_path,
            # only move events carry a destination
            dest_path=getattr(event, "dest_path", None),
            time=time.time(),
        )
        self.queue.put(snapshot)

    def on_moved(self, event):
        """Queue a move event."""
        self.enqueue(event)

    def on_created(self, event):
        """Queue a creation event."""
        self.enqueue(event)

    def on_deleted(self, event):
        """Queue a deletion event."""
        self.enqueue(event)

    def on_modified(self, event):
        """Queue a modification event."""
        self.enqueue(event)
|
|
|
|
|
|
def handle_event(event, stdout, **kwargs):
    """Route a queued watch event to the ``handle_*`` function for its type.

    :raises KeyError: if ``event["type"]`` is not one of created, deleted,
        modified or moved.
    """
    dispatch = {
        "created": handle_created,
        "deleted": handle_deleted,
        "modified": handle_modified,
        "moved": handle_moved,
    }
    handler = dispatch[event["type"]]
    handler(event=event, stdout=stdout, **kwargs)
|
|
|
|
|
|
def handle_modified(event, stdout, library, in_place, **kwargs):
    """Import, or refresh the metadata of, the file behind a watch event.

    1. The file is hashed. If a finished upload of ``library`` already has
       that checksum, nothing else happens.
    2. In in-place mode, a finished in-place upload whose source is this path
       is looked up. If found, its track metadata is re-read from the file and
       the new checksum stored. This is refused when the track is attributed
       to an actor other than the library's. Invalid metadata is reported and
       the checksum left unchanged.
    3. Otherwise a new upload is created synchronously. ``kwargs`` must then
       provide ``reference``, ``replace``, ``dispatch_outbox`` and ``broadcast``.
    """
    path = event["path"]
    finished = library.uploads.filter(import_status="finished")
    with open(path, "rb") as audio:
        checksum = common_utils.get_file_hash(audio)

    if finished.filter(checksum=checksum).first():
        # same content already imported
        stdout.write(" File already imported and metadata is up-to-date")
        return

    if in_place:
        upload = (
            finished.in_place()
            .filter(source="file://{}".format(path))
            .select_related(
                "track__attributed_to", "track__artist", "track__album__artist",
            )
            .first()
        )
        if upload:
            owner = upload.track.attributed_to
            if owner and owner != library.actor:
                stdout.write(
                    " Cannot update track metadata, track belongs to someone else"
                )
                return

            stdout.write(
                " Updating existing file #{} with new metadata…".format(upload.pk)
            )
            new_metadata = upload.get_metadata()
            try:
                tasks.update_track_metadata(new_metadata, upload.track)
            except serializers.ValidationError as e:
                stdout.write(" Invalid metadata: {}".format(e))
            else:
                upload.checksum = checksum
                upload.save(update_fields=["checksum"])
            return

    stdout.write(" Launching import for new file")
    create_upload(
        path=path,
        reference=kwargs["reference"],
        library=library,
        async_=False,
        replace=kwargs["replace"],
        in_place=in_place,
        dispatch_outbox=kwargs["dispatch_outbox"],
        broadcast=kwargs["broadcast"],
    )
|
|
|
|
|
|
def handle_created(event, stdout, **kwargs):
    """Process a creation event exactly like a modification event.

    Copying a file into the watched directory fires one "created" event on
    the initial touch, then many "modified" events while the content is
    written. Both are therefore handled by ``handle_modified``.
    """
    return handle_modified(event=event, stdout=stdout, **kwargs)
|
|
|
|
|
|
def handle_moved(event, stdout, library, in_place, **kwargs):
    """Follow a rename/move of an in-place file by updating its upload source.

    Only applies in in-place mode. The first finished in-place upload whose
    source is the old path gets its source rewritten to the destination path.
    If none exists, nothing happens.
    """
    if not in_place:
        return

    old_source, new_source = (
        "file://{}".format(event[key]) for key in ("src_path", "dest_path")
    )
    upload = (
        library.uploads.filter(import_status="finished")
        .in_place()
        .filter(source=old_source)
        .first()
    )
    if upload is None:
        return

    stdout.write(" Updating path of existing file #{}".format(upload.pk))
    upload.source = new_source
    upload.save(update_fields=["source"])
|
|
|
|
|
|
def handle_deleted(event, stdout, library, in_place, **kwargs):
    """Drop the uploads of an in-place file that was deleted from disk.

    Only applies in in-place mode. When finished in-place uploads point at the
    deleted path, they are removed and the module-level ``need_pruning`` flag
    is raised so the watcher can prune the library.
    """
    if not in_place:
        return

    stale = (
        library.uploads.filter(import_status="finished")
        .in_place()
        .filter(source="file://{}".format(event["path"]))
    )
    if not stale.count():
        return

    stdout.write(" Removing file from DB")
    stale.delete()
    GLOBAL["need_pruning"] = True
|
|
|
|
|
|
def check_updates(stdout, library, extensions, paths, batch_size):
    """Re-check already imported in-place files located under ``paths``.

    Every finished in-place upload of ``library`` with a checksum is
    considered when its source is one of ``paths`` or lies below one of them,
    and ends with one of ``extensions``. Extensions are compared
    case-insensitively, like crawl_dir does. Each such upload is passed to
    ``check_upload``, in batches of ``batch_size``.

    :returns: the set of filesystem paths (without the ``file://`` prefix)
        that were checked, so the caller can skip them when crawling.
    """
    checked_paths = set()
    query = None
    for path in paths:
        # Match the path itself (single-file import) or anything below it as a
        # directory. A bare string prefix would also match siblings such as
        # /music2 when checking /music.
        location = Q(source="file://{}".format(path)) | Q(
            source__startswith="file://{}/".format(path.rstrip("/"))
        )
        for ext in extensions:
            # iendswith: crawl_dir matches extensions case-insensitively, so a
            # file like "Song.MP3" must be checked here as well.
            clause = location & Q(source__iendswith=".{}".format(ext))
            query = clause if query is None else query | clause

    if query is None:
        # no path or no extension given: nothing can match
        stdout.write("Found {} files to check in database!".format(0))
        return checked_paths

    existing = (
        library.uploads.in_place()
        .filter(import_status="finished")
        .exclude(checksum=None)
        .select_related("library", "track")
        .filter(query)
    )
    total = existing.count()
    stdout.write("Found {} files to check in database!".format(total))
    uploads = existing.order_by("source")
    for i, rows in enumerate(batch(uploads.iterator(), batch_size)):
        stdout.write("Handling batch {} ({} items)".format(i + 1, len(rows)))

        for upload in rows:
            check_upload(stdout, upload)
            checked_paths.add(upload.source.replace("file://", "", 1))

    return checked_paths
|
|
|
|
|
|
def check_upload(stdout, upload):
    """Compare one in-place upload with its file on disk and resync it.

    - If ``get_audio_file`` raises FileNotFoundError, the upload is deleted
      and the result of ``upload.delete()`` is returned.
    - If the file checksum equals the stored one, nothing happens.
    - If the checksum changed and the library's actor id differs from the
      track's ``attributed_to_id``, only a message is written.
    - Otherwise the track metadata is refreshed from the file and the new
      checksum saved. On invalid metadata, a message is written and the
      stored checksum is left as-is.
    """
    try:
        audio_file = upload.get_audio_file()
    except FileNotFoundError:
        stdout.write(
            " Removing file #{} missing from disk at {}".format(
                upload.pk, upload.source
            )
        )
        return upload.delete()

    checksum = common_utils.get_file_hash(audio_file)
    if upload.checksum == checksum:
        return

    stdout.write(
        " File #{} at {} was modified, updating metadata…".format(
            upload.pk, upload.source
        )
    )
    if upload.library.actor_id != upload.track.attributed_to_id:
        stdout.write(" Cannot update track metadata, track belongs to someone else")
        return

    track = models.Track.objects.select_related("artist", "album__artist").get(
        pk=upload.track_id
    )
    try:
        tasks.update_track_metadata(upload.get_metadata(), track)
    except serializers.ValidationError as e:
        stdout.write(" Invalid metadata: {}".format(e))
        return

    upload.checksum = checksum
    upload.save(update_fields=["checksum"])
|