kopia lustrzana https://github.com/OpenDroneMap/WebODM
307 wiersze
12 KiB
Python
307 wiersze
12 KiB
Python
import os
|
|
import shutil
|
|
|
|
import sys
|
|
from django.contrib.auth.models import User
|
|
from django.test import Client
|
|
from rest_framework import status
|
|
|
|
from app.models import Plugin
|
|
from app.models import Project
|
|
from app.models import Task
|
|
from app.plugins import UserDataStore, enable_plugin
|
|
from app.plugins import get_plugin_by_name
|
|
from app.plugins import sync_plugin_db, get_plugins_persistent_path
|
|
from app.plugins.data_store import InvalidDataStoreValue
|
|
from app.plugins.pyutils import parse_requirements, compute_file_md5, requirements_installed
|
|
from .classes import BootTestCase
|
|
from app.plugins.grass_engine import grass, GrassEngineException
|
|
|
|
from worker.tasks import execute_grass_script
|
|
|
|
class TestPlugins(BootTestCase):
|
|
def setUp(self):
|
|
pass
|
|
|
|
def tearDown(self):
|
|
pass
|
|
|
|
def test_core_plugins(self):
|
|
client = Client()
|
|
|
|
# We can access public files core plugins (without auth)
|
|
res = client.get('/plugins/test/file.txt')
|
|
self.assertEqual(res.status_code, status.HTTP_200_OK)
|
|
|
|
# We mounted an endpoint
|
|
res = client.get('/plugins/test/app_mountpoint/')
|
|
self.assertEqual(res.status_code, status.HTTP_200_OK)
|
|
self.assertTemplateUsed(res, 'plugins/test/templates/app.html')
|
|
|
|
# No python packages have been installed (plugin is disabled)
|
|
self.assertFalse(os.path.exists(get_plugins_persistent_path("test", "site-packages")))
|
|
|
|
enable_plugin("test")
|
|
|
|
# Form was rendered correctly
|
|
self.assertContains(res, '<input type="text" name="testField" class="form-control" required id="id_testField" />', count=1, status_code=200, html=True)
|
|
|
|
# It uses regex properly
|
|
res = client.get('/plugins/test/app_mountpoint/a')
|
|
self.assertEqual(res.status_code, status.HTTP_404_NOT_FOUND)
|
|
|
|
# Querying a page should show the included CSS/JS files
|
|
client.login(username='testuser', password='test1234')
|
|
res = client.get('/dashboard/')
|
|
self.assertEqual(res.status_code, status.HTTP_200_OK)
|
|
|
|
self.assertContains(res, "<link href='/plugins/test/test.css' rel='stylesheet' type='text/css'>", html=True)
|
|
self.assertContains(res, "<script src='/plugins/test/test.js'></script>", html=True)
|
|
|
|
# And our menu entry
|
|
self.assertContains(res, '<li><a href="/plugins/test/menu_url/"><i class="test-icon"></i> Test</a></li>', html=True)
|
|
|
|
# A node_modules directory has been created as a result of npm install
|
|
# because we have a package.json in the public director
|
|
test_plugin = get_plugin_by_name("test")
|
|
self.assertTrue(os.path.exists(test_plugin.get_path("public/node_modules")))
|
|
|
|
# A webpack file and build directory have been created as a
|
|
# result of the build_jsx_components directive
|
|
self.assertTrue(os.path.exists(test_plugin.get_path("public/webpack.config.js")))
|
|
self.assertTrue(os.path.exists(test_plugin.get_path("public/build")))
|
|
self.assertTrue(os.path.exists(test_plugin.get_path("public/build/component.js")))
|
|
|
|
# Test task view
|
|
user = User.objects.get(username="testuser")
|
|
project = Project.objects.get(owner=user)
|
|
task = Task.objects.create(project=project, name="Test")
|
|
client.logout()
|
|
|
|
# Cannot see the task view without logging-in
|
|
res = client.get('/plugins/test/task/{}/'.format(task.id))
|
|
self.assertEqual(res.status_code, status.HTTP_404_NOT_FOUND)
|
|
|
|
client.login(username='testuser', password='test1234')
|
|
res = client.get('/plugins/test/task/{}/'.format(task.id))
|
|
self.assertEqual(res.status_code, status.HTTP_200_OK)
|
|
self.assertContains(res, str(task.id))
|
|
|
|
# Test dynamic script
|
|
res = client.get('/plugins/test/app_dynamic_script.js')
|
|
self.assertEqual(res.status_code, status.HTTP_200_OK)
|
|
self.assertTrue(res.content.decode('utf-8') == '') # Empty
|
|
|
|
res = client.get('/plugins/test/app_dynamic_script.js?print=1')
|
|
self.assertEqual(res.status_code, status.HTTP_200_OK)
|
|
self.assertTrue(res.content.decode('utf-8') == "console.log('Hello WebODM');") # Empty
|
|
|
|
# Check that the plugins media dirs have been created
|
|
self.assertTrue(os.path.exists(get_plugins_persistent_path()))
|
|
self.assertTrue(os.path.exists(get_plugins_persistent_path("test", "site-packages")))
|
|
self.assertEqual(get_plugins_persistent_path("test", "site-packages"), test_plugin.get_python_packages_path())
|
|
|
|
# Check MD5 install has been created
|
|
self.assertTrue(os.path.exists(test_plugin.get_python_packages_path("install_md5")))
|
|
with open(test_plugin.get_python_packages_path("install_md5"), "r") as f:
|
|
md5 = f.read().strip()
|
|
self.assertTrue(len(md5) > 20)
|
|
self.assertEqual(md5, compute_file_md5(test_plugin.get_path("requirements.txt")))
|
|
|
|
self.assertTrue(requirements_installed(test_plugin.get_path("requirements.txt"), test_plugin.get_python_packages_path()))
|
|
|
|
# Test python imports context
|
|
self.assertFalse(test_plugin.get_python_packages_path() in sys.path)
|
|
with test_plugin.python_imports():
|
|
self.assertTrue(test_plugin.get_python_packages_path() in sys.path)
|
|
self.assertFalse(test_plugin.get_python_packages_path() in sys.path)
|
|
|
|
# Parse requirements test
|
|
self.assertEqual(parse_requirements(test_plugin.get_path("requirements.txt"))[0], "pyodm")
|
|
|
|
# Current plugin test
|
|
self.assertEqual(test_plugin.get_current_plugin_test(), test_plugin)
|
|
|
|
|
|
|
|
def test_grass_engine(self):
|
|
cwd = os.path.dirname(os.path.realpath(__file__))
|
|
grass_scripts_dir = os.path.join(cwd, "grass_scripts")
|
|
|
|
ctx = grass.create_context()
|
|
points = """{
|
|
"type": "FeatureCollection",
|
|
"features": [
|
|
{
|
|
"type": "Feature",
|
|
"properties": {},
|
|
"geometry": {
|
|
"type": "Point",
|
|
"coordinates": [
|
|
13.770675659179686,
|
|
45.655328041141374
|
|
]
|
|
}
|
|
}
|
|
]
|
|
}"""
|
|
ctx.add_file('test.geojson', points)
|
|
ctx.set_location("EPSG:4326")
|
|
|
|
result = execute_grass_script.delay(
|
|
os.path.join(grass_scripts_dir, "simple_test.grass"),
|
|
ctx.serialize()
|
|
).get()
|
|
self.assertTrue("Number of points: 1" in result.get('output'))
|
|
|
|
self.assertTrue(result.get('context') == ctx.serialize())
|
|
|
|
# Context dir has been cleaned up automatically
|
|
self.assertFalse(os.path.exists(ctx.get_cwd()))
|
|
|
|
error = execute_grass_script.delay(
|
|
os.path.join(grass_scripts_dir, "nonexistant_script.grass"),
|
|
ctx.serialize()
|
|
).get()
|
|
self.assertIsInstance(error, dict)
|
|
self.assertIsInstance(error['error'], str)
|
|
|
|
with self.assertRaises(GrassEngineException):
|
|
ctx.execute(os.path.join(grass_scripts_dir, "nonexistant_script.grass"))
|
|
|
|
ctx = grass.create_context({"auto_cleanup": False})
|
|
ctx.add_file('test.geojson', points)
|
|
ctx.set_location("EPSG:4326")
|
|
|
|
result = execute_grass_script.delay(
|
|
os.path.join(grass_scripts_dir, "simple_test.grass"),
|
|
ctx.serialize()
|
|
).get()
|
|
self.assertTrue("Number of points: 1" in result.get('output'))
|
|
|
|
# Path still there
|
|
self.assertTrue(os.path.exists(ctx.get_cwd()))
|
|
|
|
ctx.cleanup()
|
|
|
|
# Cleanup worked
|
|
self.assertFalse(os.path.exists(ctx.get_cwd()))
|
|
|
|
def test_plugin_datastore(self):
|
|
enable_plugin("test")
|
|
test_plugin = get_plugin_by_name("test")
|
|
user = User.objects.get(username='testuser')
|
|
other_user = User.objects.get(username='testuser2')
|
|
|
|
uds = test_plugin.get_user_data_store(user)
|
|
other_uds = test_plugin.get_user_data_store(other_user)
|
|
gds = test_plugin.get_global_data_store()
|
|
|
|
# No key
|
|
self.assertFalse(uds.has_key('mykey'))
|
|
|
|
# Default value works
|
|
self.assertTrue(uds.get_string('mykey', 'default') == 'default')
|
|
|
|
# Still no key should have been added
|
|
self.assertFalse(uds.has_key('mykey'))
|
|
|
|
# Add key
|
|
(object, created) = uds.set_string('mykey', 'value')
|
|
self.assertTrue(object.string_value == 'value')
|
|
self.assertTrue(created)
|
|
self.assertTrue(uds.has_key('mykey'))
|
|
|
|
# Key is not visible in global datastore
|
|
self.assertFalse(gds.has_key('mykey'))
|
|
|
|
# Key is not visible in another user's data store
|
|
self.assertFalse(other_uds.has_key('mykey'))
|
|
|
|
# Key is not visible in another's plugin data store
|
|
# for the same user
|
|
other_puds = UserDataStore('test2', user)
|
|
self.assertFalse(other_puds.has_key('mykey'))
|
|
|
|
# Deleting a non-existing key return False
|
|
self.assertFalse(uds.del_key('nonexistant'))
|
|
|
|
# Deleting a valid key returns True
|
|
self.assertTrue(uds.del_key('mykey'))
|
|
self.assertFalse(uds.has_key('mykey'))
|
|
|
|
# Various data types setter/getter work
|
|
uds.set_int('myint', 5)
|
|
self.assertTrue(uds.get_int('myint') == 5)
|
|
|
|
uds.set_float('myfloat', 10.0)
|
|
self.assertTrue(uds.get_float('myfloat', 50.0) == 10.0)
|
|
|
|
uds.set_bool('mybool', True)
|
|
self.assertTrue(uds.get_bool('mybool'))
|
|
|
|
uds.set_json('myjson', {'test': 123})
|
|
self.assertTrue('test' in uds.get_json('myjson'))
|
|
|
|
# Invalid types
|
|
self.assertRaises(InvalidDataStoreValue, uds.set_bool, 'invalidbool', 5)
|
|
|
|
def test_toggle_plugins(self):
|
|
c = Client()
|
|
c.login(username='testuser', password='test1234')
|
|
|
|
# Cannot toggle plugins as normal user
|
|
res = c.get('/admin/app/plugin/test/disable/', follow=True)
|
|
self.assertRedirects(res, '/admin/login/?next=/admin/app/plugin/test/disable/')
|
|
|
|
c.login(username='testsuperuser', password='test1234')
|
|
|
|
enable_plugin("test")
|
|
|
|
# Test plugin is enabled
|
|
res = c.get('/admin/app/plugin/')
|
|
self.assertContains(res, '<a class="button" href="#" disabled>Enable</a>')
|
|
self.assertContains(res, "<script src='/plugins/test/test.js'></script>")
|
|
|
|
# Disable
|
|
res = c.get('/admin/app/plugin/test/disable/', follow=True)
|
|
self.assertEqual(res.status_code, status.HTTP_200_OK)
|
|
|
|
# Test active vs. non-active flag for get_plugin_by_name
|
|
self.assertTrue(get_plugin_by_name("test") is None)
|
|
self.assertFalse(get_plugin_by_name("test", only_active=False) is None)
|
|
|
|
# Test plugin has been disabled
|
|
self.assertContains(res, '<a class="button" href="#" disabled>Disable</a>')
|
|
self.assertNotContains(res, "<script src='/plugins/test/test.js'></script>")
|
|
|
|
# Re-enable
|
|
res = c.get('/admin/app/plugin/test/enable/', follow=True)
|
|
self.assertEqual(res.status_code, status.HTTP_200_OK)
|
|
|
|
|
|
def test_plugin_functions(self):
|
|
# Check db/fs syncing
|
|
if os.path.exists('plugins/test_copy'):
|
|
print("Removing plugins/test_copy")
|
|
shutil.rmtree('plugins/test_copy')
|
|
|
|
sync_plugin_db()
|
|
self.assertTrue(Plugin.objects.filter(pk='test_copy').count() == 0)
|
|
|
|
shutil.copytree('plugins/test', 'plugins/test_copy')
|
|
|
|
sync_plugin_db()
|
|
self.assertTrue(Plugin.objects.filter(pk='test_copy').count() == 1)
|
|
|
|
shutil.rmtree('plugins/test_copy')
|
|
sync_plugin_db()
|
|
self.assertTrue(Plugin.objects.filter(pk='test_copy').count() == 0)
|
|
|
|
# Get manifest works and parses JSON
|
|
p = get_plugin_by_name("test", only_active=False)
|
|
self.assertEqual(p.get_manifest()['author'], "Piero Toffanin")
|
|
|
|
|
|
|