2017-12-26 20:12:37 +00:00
|
|
|
import os
|
2018-04-21 14:33:15 +00:00
|
|
|
|
2018-06-10 08:55:16 +00:00
|
|
|
import pytest
|
2017-12-27 19:29:26 +00:00
|
|
|
from django.core.management import call_command
|
|
|
|
from django.core.management.base import CommandError
|
2017-12-24 18:15:21 +00:00
|
|
|
|
|
|
|
|
2018-09-22 15:47:17 +00:00
|
|
|
# Absolute path of the "files" directory located next to this module; the
# tests in this file join it with the names of their sample .ogg files.
DATA_DIR = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    "files",
)
|
2017-12-24 18:15:21 +00:00
|
|
|
|
|
|
|
|
2018-09-22 15:47:17 +00:00
|
|
|
def test_management_command_requires_a_valid_library_id(factories):
    """The command aborts with a ``CommandError`` when the library id is invalid."""
    path = os.path.join(DATA_DIR, "dummy_file.ogg")

    with pytest.raises(CommandError) as excinfo:
        call_command("import_files", "wrong_id", path, interactive=False)

    # Check the raised exception itself: str() of the ExceptionInfo wrapper is
    # a pytest-version-dependent representation, not the error message.
    assert "Invalid library id" in str(excinfo.value)
|
2017-12-27 19:29:26 +00:00
|
|
|
|
|
|
|
|
2018-04-21 16:16:43 +00:00
|
|
|
def test_in_place_import_only_from_music_dir(factories, settings):
    """In-place import is refused when MUSIC_DIRECTORY_PATH is set to "/nope"."""
    library = factories["music.Library"](actor__local=True)
    settings.MUSIC_DIRECTORY_PATH = "/nope"
    path = os.path.join(DATA_DIR, "dummy_file.ogg")

    with pytest.raises(CommandError) as excinfo:
        call_command(
            "import_files", str(library.uuid), path, in_place=True, interactive=False
        )

    # Check the raised exception itself: str() of the ExceptionInfo wrapper is
    # a pytest-version-dependent representation, not the error message.
    assert "Importing in-place only works if importing" in str(excinfo.value)
|
|
|
|
|
2018-04-21 16:16:43 +00:00
|
|
|
|
2018-06-19 08:59:40 +00:00
|
|
|
def test_import_with_multiple_argument(factories, mocker):
    """Every path given on the command line is handed to ``filter_matching``."""
    target_library = factories["music.Library"](actor__local=True)
    audio_paths = [
        os.path.join(DATA_DIR, file_name)
        for file_name in ("dummy_file.ogg", "utf8-éà◌.ogg")
    ]
    filter_matching = mocker.patch(
        "funkwhale_api.music.management.commands.import_files.Command.filter_matching",
        return_value={"new": [], "skipped": []},
    )

    call_command(
        "import_files", str(target_library.uuid), *audio_paths, interactive=False
    )

    filter_matching.assert_called_once_with(audio_paths, target_library)
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.parametrize(
    "path",
    [os.path.join(DATA_DIR, name) for name in ("dummy_file.ogg", "utf8-éà◌.ogg")],
)
def test_import_files_stores_proper_data(factories, mocker, now, path):
    """An import with ``async_=False`` stores reference, status, source and config."""
    process_upload = mocker.patch("funkwhale_api.music.tasks.process_upload")
    library = factories["music.Library"](actor__local=True)
    # No replace/outbox/broadcast option is passed, so every flag is False.
    expected_config = {"replace": False, "dispatch_outbox": False, "broadcast": False}

    call_command(
        "import_files", str(library.uuid), path, async_=False, interactive=False
    )

    created = library.uploads.last()
    assert created.import_reference == "cli-{}".format(now.isoformat())
    assert created.import_status == "pending"
    assert created.source == "file://{}".format(path)
    assert created.import_metadata == {"funkwhale": {"config": expected_config}}
    process_upload.assert_called_once_with(upload_id=created.pk)
|
|
|
|
|
|
|
|
|
|
|
|
def test_import_with_outbox_flag(factories, mocker):
    """``outbox=True`` is stored as ``dispatch_outbox`` in the import config."""
    library = factories["music.Library"](actor__local=True)
    audio_path = os.path.join(DATA_DIR, "dummy_file.ogg")
    process_upload = mocker.patch("funkwhale_api.music.tasks.process_upload")

    options = {"outbox": True, "interactive": False}
    call_command("import_files", str(library.uuid), audio_path, **options)

    created = library.uploads.last()
    config = created.import_metadata["funkwhale"]["config"]
    assert config["dispatch_outbox"] is True
    process_upload.assert_called_once_with(upload_id=created.pk)
|
|
|
|
|
|
|
|
|
|
|
|
def test_import_with_broadcast_flag(factories, mocker):
    """``broadcast=True`` is stored as ``broadcast`` in the import config."""
    library = factories["music.Library"](actor__local=True)
    audio_path = os.path.join(DATA_DIR, "dummy_file.ogg")
    process_upload = mocker.patch("funkwhale_api.music.tasks.process_upload")

    options = {"broadcast": True, "interactive": False}
    call_command("import_files", str(library.uuid), audio_path, **options)

    created = library.uploads.last()
    config = created.import_metadata["funkwhale"]["config"]
    assert config["broadcast"] is True
    process_upload.assert_called_once_with(upload_id=created.pk)
|
2018-06-19 08:59:40 +00:00
|
|
|
|
|
|
|
|
2018-06-22 15:12:54 +00:00
|
|
|
def test_import_with_replace_flag(factories, mocker):
    """``replace=True`` is stored as ``replace`` in the import config."""
    library = factories["music.Library"](actor__local=True)
    audio_path = os.path.join(DATA_DIR, "dummy_file.ogg")
    process_upload = mocker.patch("funkwhale_api.music.tasks.process_upload")

    options = {"replace": True, "interactive": False}
    call_command("import_files", str(library.uuid), audio_path, **options)

    created = library.uploads.last()
    config = created.import_metadata["funkwhale"]["config"]
    assert config["replace"] is True
    process_upload.assert_called_once_with(upload_id=created.pk)
|
2017-12-27 19:29:26 +00:00
|
|
|
|
|
|
|
|
2018-09-22 15:47:17 +00:00
|
|
|
def test_import_with_custom_reference(factories, mocker):
    """An explicit ``reference`` option is stored as the upload's import reference."""
    library = factories["music.Library"](actor__local=True)
    audio_path = os.path.join(DATA_DIR, "dummy_file.ogg")
    process_upload = mocker.patch("funkwhale_api.music.tasks.process_upload")

    options = {"reference": "test", "replace": True, "interactive": False}
    call_command("import_files", str(library.uuid), audio_path, **options)

    created = library.uploads.last()
    assert created.import_reference == "test"
    process_upload.assert_called_once_with(upload_id=created.pk)
|
2018-03-25 13:40:37 +00:00
|
|
|
|
|
|
|
|
2018-04-21 14:02:42 +00:00
|
|
|
def test_import_files_skip_if_path_already_imported(factories, mocker):
    """Importing a path whose source already has an upload adds no new upload."""
    library = factories["music.Library"](actor__local=True)
    audio_path = os.path.join(DATA_DIR, "dummy_file.ogg")
    source_url = "file://{}".format(audio_path)

    # Pre-existing finished upload pointing at the very same source.
    factories["music.Upload"](
        library=library,
        import_status="finished",
        source=source_url,
    )

    options = {"async_": False, "interactive": False}
    call_command("import_files", str(library.uuid), audio_path, **options)

    # Still only the upload created above.
    assert library.uploads.count() == 1
|
2018-03-25 13:40:37 +00:00
|
|
|
|
|
|
|
|
2018-04-21 16:16:43 +00:00
|
|
|
def test_import_files_in_place(factories, mocker, settings):
    """With ``in_place=True`` the created upload has no ``audio_file`` attached."""
    settings.MUSIC_DIRECTORY_PATH = DATA_DIR
    process_upload = mocker.patch("funkwhale_api.music.tasks.process_upload")
    library = factories["music.Library"](actor__local=True)
    audio_path = os.path.join(DATA_DIR, "utf8-éà◌.ogg")

    options = {"async_": False, "in_place": True, "interactive": False}
    call_command("import_files", str(library.uuid), audio_path, **options)

    created = library.uploads.last()
    assert not created.audio_file
    process_upload.assert_called_once_with(upload_id=created.pk)
|
2018-04-21 16:16:43 +00:00
|
|
|
|
|
|
|
|
2018-03-25 13:40:37 +00:00
|
|
|
def test_storage_rename_utf_8_files(factories):
    """An upload created with filename "été.ogg" is stored under "...ete.ogg"."""
    created = factories["music.Upload"](audio_file__filename="été.ogg")
    stored_name = created.audio_file.name
    assert stored_name.endswith("ete.ogg")
|