More unit tests

pull/846/head
Piero Toffanin 2020-04-02 16:58:58 -04:00
rodzic 652bf1438e
commit b4b0ff775e
9 zmienionych plików z 137 dodań i 7 usunięć

Wyświetl plik

@ -152,7 +152,6 @@ class PluginAdmin(admin.ModelAdmin):
return HttpResponseRedirect(reverse('admin:app_plugin_changelist')) return HttpResponseRedirect(reverse('admin:app_plugin_changelist'))
def plugin_upload(self, request, *args, **kwargs): def plugin_upload(self, request, *args, **kwargs):
# messages.info(request, "YAY")
file = request.FILES.get('file') file = request.FILES.get('file')
if file is not None: if file is not None:
# Save to tmp dir # Save to tmp dir

Wyświetl plik

@ -1,6 +1,6 @@
from django.core.exceptions import ObjectDoesNotExist, SuspiciousFileOperation from django.core.exceptions import ObjectDoesNotExist, SuspiciousFileOperation
from rest_framework import exceptions from rest_framework import exceptions
import os, zipfile import os
from app import models from app import models

Wyświetl plik

@ -19,7 +19,8 @@ from app import models, pending_actions
from nodeodm import status_codes from nodeodm import status_codes
from nodeodm.models import ProcessingNode from nodeodm.models import ProcessingNode
from worker import tasks as worker_tasks from worker import tasks as worker_tasks
from .common import get_and_check_project, path_traversal_check from .common import get_and_check_project
from app.security import path_traversal_check
def flatten_files(request_files): def flatten_files(request_files):

Plik binarny nie jest wyświetlany.

Plik binarny nie jest wyświetlany.

Plik binarny nie jest wyświetlany.

Wyświetl plik

@ -18,6 +18,7 @@ from django.http import HttpResponse
from app.models import Plugin from app.models import Plugin
from app.models import Setting from app.models import Setting
from django.conf import settings from django.conf import settings
from app.security import path_traversal_check
logger = logging.getLogger('app.logger') logger = logging.getLogger('app.logger')
@ -152,9 +153,10 @@ def register_plugins():
logger.warning("Cannot register {}: {}".format(plugin, str(e))) logger.warning("Cannot register {}: {}".format(plugin, str(e)))
def valid_plugin(plugin_path): def valid_plugin(plugin_path):
initpy_path = os.path.join(plugin_path, "__init__.py")
pluginpy_path = os.path.join(plugin_path, "plugin.py") pluginpy_path = os.path.join(plugin_path, "plugin.py")
manifest_path = os.path.join(plugin_path, "manifest.json") manifest_path = os.path.join(plugin_path, "manifest.json")
return os.path.isfile(manifest_path) and os.path.isfile(pluginpy_path) return os.path.isfile(initpy_path) and os.path.isfile(manifest_path) and os.path.isfile(pluginpy_path)
plugins = None plugins = None
def get_plugins(): def get_plugins():
@ -191,9 +193,13 @@ def get_plugins():
# Instantiate the plugin # Instantiate the plugin
try: try:
try: try:
if settings.TESTING:
module = importlib.import_module("app.media_test.plugins.{}".format(dir))
else:
module = importlib.import_module("app.media.plugins.{}".format(dir)) module = importlib.import_module("app.media.plugins.{}".format(dir))
plugin = (getattr(module, "Plugin"))() plugin = (getattr(module, "Plugin"))()
except (ModuleNotFoundError, AttributeError): except (ModuleNotFoundError, AttributeError) as e:
module = importlib.import_module("plugins.{}".format(dir)) module = importlib.import_module("plugins.{}".format(dir))
plugin = (getattr(module, "Plugin"))() plugin = (getattr(module, "Plugin"))()
@ -284,7 +290,7 @@ def get_plugins_paths():
] ]
def get_plugins_persistent_path(*paths): def get_plugins_persistent_path(*paths):
return os.path.join(settings.MEDIA_ROOT, "plugins", *paths) return path_traversal_check(os.path.join(settings.MEDIA_ROOT, "plugins", *paths), os.path.join(settings.MEDIA_ROOT, "plugins"))
def get_dynamic_script_handler(script_path, callback=None, **kwargs): def get_dynamic_script_handler(script_path, callback=None, **kwargs):
def handleRequest(request): def handleRequest(request):

12
app/security.py 100644
Wyświetl plik

@ -0,0 +1,12 @@
from django.core.exceptions import SuspiciousFileOperation
import os
def path_traversal_check(unsafe_path, known_safe_path):
known_safe_path = os.path.abspath(known_safe_path)
unsafe_path = os.path.abspath(unsafe_path)
if (os.path.commonprefix([known_safe_path, unsafe_path]) != known_safe_path):
raise SuspiciousFileOperation("{} is not safe".format(unsafe_path))
# Passes the check
return unsafe_path

Wyświetl plik

@ -2,6 +2,7 @@ import os
import shutil import shutil
import sys import sys
from django.contrib.auth.models import User from django.contrib.auth.models import User
from django.test import Client from django.test import Client
from rest_framework import status from rest_framework import status
@ -79,6 +80,9 @@ class TestPlugins(BootTestCase):
test_plugin = get_plugin_by_name("test") test_plugin = get_plugin_by_name("test")
self.assertTrue(os.path.exists(test_plugin.get_path("public/node_modules"))) self.assertTrue(os.path.exists(test_plugin.get_path("public/node_modules")))
# This is a persistent plugin
self.assertTrue(test_plugin.is_persistent())
# A webpack file and build directory have been created as a # A webpack file and build directory have been created as a
# result of the build_jsx_components directive # result of the build_jsx_components directive
self.assertTrue(os.path.exists(test_plugin.get_path("public/webpack.config.js"))) self.assertTrue(os.path.exists(test_plugin.get_path("public/webpack.config.js")))
@ -317,4 +321,112 @@ class TestPlugins(BootTestCase):
self.assertEqual(p.get_manifest()['author'], "Piero Toffanin") self.assertEqual(p.get_manifest()['author'], "Piero Toffanin")
def test_plugin_loading(self):
c = Client()
plugin_file = open("app/fixtures/testabc_plugin.zip", 'rb')
bad_dir_plugin_file = open("app/fixtures/bad_dir_plugin.zip", 'rb')
missing_manifest_plugin_file = open("app/fixtures/missing_manifest_plugin.zip", 'rb')
# Cannot upload new plugins anonymously
res = c.post('/admin/app/plugin/actions/upload/', {'file': plugin_file}, follow=True)
self.assertRedirects(res, '/admin/login/?next=/admin/app/plugin/actions/upload/')
self.assertFalse(os.path.exists(get_plugins_persistent_path("testabc")))
plugin_file.seek(0)
# Cannot upload plugins as a normal user
c.login(username='testuser', password='test1234')
res = c.post('/admin/app/plugin/actions/upload/', {'file': plugin_file}, follow=True)
self.assertRedirects(res, '/admin/login/?next=/admin/app/plugin/actions/upload/')
self.assertFalse(os.path.exists(get_plugins_persistent_path("testabc")))
self.assertEqual(Plugin.objects.filter(pk='testabc').count(), 0)
plugin_file.seek(0)
# Can upload plugin as an admin
c.login(username='testsuperuser', password='test1234')
res = c.post('/admin/app/plugin/actions/upload/', {'file': plugin_file}, follow=True)
self.assertRedirects(res, '/admin/app/plugin/')
messages = list(res.context['messages'])
self.assertTrue('Plugin added successfully' in str(messages[0]))
self.assertTrue(os.path.exists(get_plugins_persistent_path("testabc")))
plugin_file.seek(0)
# Plugin has been added to db
self.assertEqual(Plugin.objects.filter(pk='testabc').count(), 1)
# This is not a persistent plugin
self.assertFalse(get_plugin_by_name('testabc').is_persistent())
# Cannot upload the same plugin again (same name)
res = c.post('/admin/app/plugin/actions/upload/', {'file': plugin_file}, follow=True)
self.assertRedirects(res, '/admin/app/plugin/')
messages = list(res.context['messages'])
self.assertTrue('already exist' in str(messages[0]))
plugin_file.seek(0)
# Can access paths (while being logged in)
res = c.get('/plugins/testabc/hello/')
self.assertEqual(res.status_code, status.HTTP_200_OK)
res = c.get('/api/plugins/testabc/hello/')
self.assertEqual(res.status_code, status.HTTP_200_OK)
# Can access public paths as logged-in, (per plugin directive)
res = c.get('/plugins/testabc/file.txt')
self.assertEqual(res.status_code, status.HTTP_200_OK)
c.logout()
# Can still access the paths as anonymous
res = c.get('/plugins/testabc/hello/')
self.assertEqual(res.status_code, status.HTTP_200_OK)
res = c.get('/api/plugins/testabc/hello/')
self.assertEqual(res.status_code, status.HTTP_200_OK)
# But not the public paths as anonymous (per plugin directive)
res = c.get('/plugins/testabc/file.txt')
self.assertEqual(res.status_code, status.HTTP_404_NOT_FOUND)
# Cannot delete plugin as normal user
c.login(username='testuser', password='test1234')
res = c.get('/admin/app/plugin/testabc/delete/', follow=True)
self.assertRedirects(res, '/admin/login/?next=/admin/app/plugin/testabc/delete/')
# Can delete plugin as admin
c.login(username='testsuperuser', password='test1234')
res = c.get('/admin/app/plugin/testabc/delete/', follow=True)
self.assertRedirects(res, '/admin/app/plugin/')
messages = list(res.context['messages'])
# No errors
self.assertEqual(len(messages), 0)
# Directories have been removed
self.assertFalse(os.path.exists(get_plugins_persistent_path("testabc")))
# Cannot access the paths as anonymous
res = c.get('/plugins/testabc/hello/')
self.assertEqual(res.status_code, status.HTTP_404_NOT_FOUND)
res = c.get('/api/plugins/testabc/hello/')
self.assertEqual(res.status_code, status.HTTP_404_NOT_FOUND)
res = c.get('/plugins/testabc/file.txt')
self.assertEqual(res.status_code, status.HTTP_404_NOT_FOUND)
# Try to add malformed plugins files
res = c.post('/admin/app/plugin/actions/upload/', {'file': missing_manifest_plugin_file}, follow=True)
self.assertRedirects(res, '/admin/app/plugin/')
messages = list(res.context['messages'])
self.assertTrue('Cannot load plugin' in str(messages[0]))
self.assertFalse(os.path.exists(get_plugins_persistent_path("test123")))
self.assertEqual(Plugin.objects.filter(pk='test123').count(), 0)
missing_manifest_plugin_file.seek(0)
res = c.post('/admin/app/plugin/actions/upload/', {'file': bad_dir_plugin_file}, follow=True)
self.assertRedirects(res, '/admin/app/plugin/')
messages = list(res.context['messages'])
self.assertTrue('Cannot load plugin' in str(messages[0]))
missing_manifest_plugin_file.seek(0)
plugin_file.close()
missing_manifest_plugin_file.close()
bad_dir_plugin_file.close()