diff --git a/app/boot.py b/app/boot.py index 6054924a..7448a704 100644 --- a/app/boot.py +++ b/app/boot.py @@ -1,5 +1,6 @@ import os +import kombu from django.contrib.auth.models import Permission from django.contrib.auth.models import User, Group from django.core.exceptions import ObjectDoesNotExist @@ -93,7 +94,11 @@ def boot(): register_plugins() if not settings.TESTING: - worker_tasks.update_nodes_info.delay() + try: + worker_tasks.update_nodes_info.delay() + except kombu.exceptions.OperationalError as e: + logger.error("Cannot connect to celery broker at {}. Make sure that your redis-server is running at that address: {}".format(settings.CELERY_BROKER_URL, str(e))) + except ProgrammingError: logger.warning("Could not touch the database. If running a migration, this is expected.") \ No newline at end of file diff --git a/app/tests/test_api_task.py b/app/tests/test_api_task.py index 0e8d8afe..c2fd31f7 100644 --- a/app/tests/test_api_task.py +++ b/app/tests/test_api_task.py @@ -1,8 +1,6 @@ import os -import subprocess import time -import shutil import logging from datetime import timedelta @@ -10,11 +8,10 @@ from datetime import timedelta import json import requests from django.contrib.auth.models import User -from django.contrib.gis.gdal import GDALRaster -from django.contrib.gis.gdal import OGRGeometry from rest_framework import status from rest_framework.test import APIClient +import worker from app import pending_actions from django.utils import timezone from app.models import Project, Task, ImageUpload @@ -23,6 +20,7 @@ from app.tests.classes import BootTransactionTestCase from nodeodm import status_codes from nodeodm.models import ProcessingNode, OFFLINE_MINUTES from app.testwatch import testWatch +from .utils import start_processing_node, clear_test_media_root # We need to test the task API in a TransactionTestCase because # task processing happens on a separate thread, and normal TestCases @@ -33,26 +31,10 @@ logger = logging.getLogger('app.logger') DELAY = 2 # time to sleep for during process launch, background processing, etc. -def start_processing_node(*args): - current_dir = os.path.dirname(os.path.realpath(__file__)) - node_odm = subprocess.Popen(['node', 'index.js', '--port', '11223', '--test'] + list(args), shell=False, - cwd=os.path.join(current_dir, "..", "..", "nodeodm", "external", "node-OpenDroneMap")) - time.sleep(DELAY) # Wait for the server to launch - return node_odm - class TestApiTask(BootTransactionTestCase): def setUp(self): super().setUp() - - # We need to clear previous media_root content - # This points to the test directory, but just in case - # we double check that the directory is indeed a test directory - if "_test" in settings.MEDIA_ROOT: - if os.path.exists(settings.MEDIA_ROOT): - logger.info("Cleaning up {}".format(settings.MEDIA_ROOT)) - shutil.rmtree(settings.MEDIA_ROOT) - else: - logger.warning("We did not remove MEDIA_ROOT because we couldn't find a _test suffix in its path.") + clear_test_media_root() def test_task(self): client = APIClient() @@ -204,7 +186,7 @@ class TestApiTask(BootTransactionTestCase): self.assertTrue(res.status_code == status.HTTP_200_OK) # On update scheduler.processing_pending_tasks should have been called in the background - testWatch.wait_until_call("app.scheduler.process_pending_tasks", timeout=5) + testWatch.wait_until_call("worker.tasks.process_pending_tasks", timeout=5) # Processing should have started and a UUID is assigned task.refresh_from_db() @@ -225,7 +207,7 @@ class TestApiTask(BootTransactionTestCase): time.sleep(DELAY) # Calling process pending tasks should finish the process - scheduler.process_pending_tasks() + worker.tasks.process_pending_tasks() task.refresh_from_db() self.assertTrue(task.status == status_codes.COMPLETED) @@ -294,7 +276,7 @@ class TestApiTask(BootTransactionTestCase): testWatch.clear() res = client.post("/api/projects/{}/tasks/{}/restart/".format(project.id, task.id)) self.assertTrue(res.status_code == status.HTTP_200_OK) - testWatch.wait_until_call("app.scheduler.process_pending_tasks", timeout=5) + testWatch.wait_until_call("worker.tasks.process_pending_tasks", timeout=5) task.refresh_from_db() self.assertTrue(task.status in [status_codes.RUNNING, status_codes.COMPLETED]) @@ -303,7 +285,7 @@ class TestApiTask(BootTransactionTestCase): testWatch.clear() res = client.post("/api/projects/{}/tasks/{}/cancel/".format(project.id, task.id)) self.assertTrue(res.status_code == status.HTTP_200_OK) - testWatch.wait_until_call("app.scheduler.process_pending_tasks", timeout=5) + testWatch.wait_until_call("worker.tasks.process_pending_tasks", timeout=5) # Should have been canceled task.refresh_from_db() @@ -312,7 +294,7 @@ class TestApiTask(BootTransactionTestCase): # Remove a task res = client.post("/api/projects/{}/tasks/{}/remove/".format(project.id, task.id)) self.assertTrue(res.status_code == status.HTTP_200_OK) - testWatch.wait_until_call("app.scheduler.process_pending_tasks", 2, timeout=5) + testWatch.wait_until_call("worker.tasks.process_pending_tasks", 2, timeout=5) # Has been removed along with assets self.assertFalse(Task.objects.filter(pk=task.id).exists()) @@ -322,7 +304,7 @@ class TestApiTask(BootTransactionTestCase): self.assertFalse(os.path.exists(task_assets_path)) testWatch.clear() - testWatch.intercept("app.scheduler.process_pending_tasks") + testWatch.intercept("worker.tasks.process_pending_tasks") # Create a task, then kill the processing node res = client.post("/api/projects/{}/tasks/".format(project.id), { @@ -339,7 +321,7 @@ class TestApiTask(BootTransactionTestCase): task.refresh_from_db() self.assertTrue(task.last_error is None) - scheduler.process_pending_tasks() + worker.tasks.process_pending_tasks() # Processing should fail and set an error task.refresh_from_db() @@ -356,20 +338,20 @@ class TestApiTask(BootTransactionTestCase): self.assertTrue(task.pending_action == pending_actions.RESTART) # After processing, the task should have restarted, and have no UUID or status - scheduler.process_pending_tasks() + worker.tasks.process_pending_tasks() task.refresh_from_db() self.assertTrue(task.status is None) self.assertTrue(len(task.uuid) == 0) # Another step and it should have acquired a UUID - scheduler.process_pending_tasks() + worker.tasks.process_pending_tasks() task.refresh_from_db() self.assertTrue(task.status in [status_codes.RUNNING, status_codes.COMPLETED]) self.assertTrue(len(task.uuid) > 0) # Another step and it should be completed time.sleep(DELAY) - scheduler.process_pending_tasks() + worker.tasks.process_pending_tasks() task.refresh_from_db() self.assertTrue(task.status == status_codes.COMPLETED) @@ -392,7 +374,7 @@ class TestApiTask(BootTransactionTestCase): # 4. Check that the rerun_from parameter has been cleared # but the other parameters are still set - scheduler.process_pending_tasks() + worker.tasks.process_pending_tasks() task.refresh_from_db() self.assertTrue(len(task.uuid) == 0) self.assertTrue(len(list(filter(lambda d: d['name'] == 'rerun-from', task.options))) == 0) @@ -403,7 +385,7 @@ class TestApiTask(BootTransactionTestCase): raise requests.exceptions.ConnectTimeout("Simulated timeout") testWatch.intercept("nodeodm.api_client.task_output", connTimeout) - scheduler.process_pending_tasks() + worker.tasks.process_pending_tasks() # Timeout errors should be handled by retrying again at a later time # and not fail @@ -439,9 +421,9 @@ class TestApiTask(BootTransactionTestCase): }, format="multipart") self.assertTrue(res.status_code == status.HTTP_201_CREATED) - scheduler.process_pending_tasks() + worker.tasks.process_pending_tasks() time.sleep(DELAY) - scheduler.process_pending_tasks() + worker.tasks.process_pending_tasks() task = Task.objects.get(pk=res.data['id']) self.assertTrue(task.status == status_codes.COMPLETED) @@ -491,7 +473,7 @@ class TestApiTask(BootTransactionTestCase): task.last_error = "Test error" task.save() - scheduler.process_pending_tasks() + worker.tasks.process_pending_tasks() # A processing node should not have been assigned task.refresh_from_db() @@ -501,7 +483,7 @@ class TestApiTask(BootTransactionTestCase): task.last_error = None task.save() - scheduler.process_pending_tasks() + worker.tasks.process_pending_tasks() # A processing node should not have been assigned because no processing nodes are online task.refresh_from_db() @@ -513,7 +495,7 @@ class TestApiTask(BootTransactionTestCase): self.assertTrue(pnode.is_online()) # A processing node has been assigned - scheduler.process_pending_tasks() + worker.tasks.process_pending_tasks() task.refresh_from_db() self.assertTrue(task.processing_node.id == pnode.id) @@ -532,13 +514,13 @@ class TestApiTask(BootTransactionTestCase): task.status = None task.save() - scheduler.process_pending_tasks() + worker.tasks.process_pending_tasks() # Processing node is now cleared and a new one will be assigned on the next tick task.refresh_from_db() self.assertTrue(task.processing_node is None) - scheduler.process_pending_tasks() + worker.tasks.process_pending_tasks() task.refresh_from_db() self.assertTrue(task.processing_node.id == another_pnode.id) @@ -554,7 +536,7 @@ class TestApiTask(BootTransactionTestCase): pnode.save() self.assertTrue(pnode.is_online()) - scheduler.process_pending_tasks() + worker.tasks.process_pending_tasks() # A processing node should not have been assigned because we asked # not to via auto_processing_node = false diff --git a/app/tests/test_app.py b/app/tests/test_app.py index e9c51b4a..5708c14b 100644 --- a/app/tests/test_app.py +++ b/app/tests/test_app.py @@ -4,7 +4,6 @@ from rest_framework import status from app.models import Project, Task from .classes import BootTestCase -from app import scheduler from django.core.exceptions import ValidationError class TestApp(BootTestCase): @@ -199,9 +198,3 @@ class TestApp(BootTestCase): task.options = [{'name': 'test', 'value': 1}, {"invalid": 1}] self.assertRaises(ValidationError, task.save) - - - def test_worker(self): - self.assertTrue(True) # TODO!!! - - diff --git a/app/tests/test_welcome.py b/app/tests/test_welcome.py index 072ce04a..28d99cd0 100644 --- a/app/tests/test_welcome.py +++ b/app/tests/test_welcome.py @@ -1,11 +1,9 @@ -from django.contrib.auth.models import User, Group +from django.contrib.auth.models import User from django.test import Client -from rest_framework import status -from app.models import Project, Task +from app.models import Project from .classes import BootTestCase -from app import scheduler -from django.core.exceptions import ValidationError + class TestWelcome(BootTestCase): diff --git a/app/tests/test_worker.py b/app/tests/test_worker.py new file mode 100644 index 00000000..b8340cef --- /dev/null +++ b/app/tests/test_worker.py @@ -0,0 +1,51 @@ +import worker +from app import pending_actions +from app.models import Project +from app.models import Task +from nodeodm.models import ProcessingNode +from .classes import BootTestCase +from .utils import start_processing_node + +class TestWelcome(BootTestCase): + def setUp(self): + super().setUp() + + def tearDown(self): + pass + + def test_worker_tasks(self): + project = Project.objects.get(name="User Test Project") + + pnode = ProcessingNode.objects.create(hostname="localhost", port=11223) + self.assertTrue(pnode.api_version is None) + + pnserver = start_processing_node() + + worker.tasks.update_nodes_info() + + pnode.refresh_from_db() + self.assertTrue(pnode.api_version is not None) + + # Create task + task = Task.objects.create(project=project) + + # Delete project + project.deleting = True + project.save() + + worker.tasks.cleanup_projects() + + # Task and project should still be here (since task still exists) + self.assertTrue(Task.objects.filter(pk=task.id).exists()) + self.assertTrue(Project.objects.filter(pk=project.id).exists()) + + # Remove task + task.delete() + + worker.tasks.cleanup_projects() + + # Task and project should have been removed (now that task count is zero) + self.assertFalse(Task.objects.filter(pk=task.id).exists()) + self.assertFalse(Project.objects.filter(pk=project.id).exists()) + + pnserver.terminate() diff --git a/app/tests/utils.py b/app/tests/utils.py new file mode 100644 index 00000000..cd788299 --- /dev/null +++ b/app/tests/utils.py @@ -0,0 +1,29 @@ +import os +import shutil +import time + +import subprocess + +import logging + +from webodm import settings + +logger = logging.getLogger('app.logger') + +def start_processing_node(*args): + current_dir = os.path.dirname(os.path.realpath(__file__)) + node_odm = subprocess.Popen(['node', 'index.js', '--port', '11223', '--test'] + list(args), shell=False, + cwd=os.path.join(current_dir, "..", "..", "nodeodm", "external", "node-OpenDroneMap")) + time.sleep(2) # Wait for the server to launch + return node_odm + +# We need to clear previous media_root content +# This points to the test directory, but just in case +# we double check that the directory is indeed a test directory +def clear_test_media_root(): + if "_test" in settings.MEDIA_ROOT: + if os.path.exists(settings.MEDIA_ROOT): + logger.info("Cleaning up {}".format(settings.MEDIA_ROOT)) + shutil.rmtree(settings.MEDIA_ROOT) + else: + logger.warning("We did not remove MEDIA_ROOT because we couldn't find a _test suffix in its path.") \ No newline at end of file diff --git a/app/testwatch.py b/app/testwatch.py index 46c2de71..1ae0c99a 100644 --- a/app/testwatch.py +++ b/app/testwatch.py @@ -100,19 +100,31 @@ class SharedTestWatch(TestWatch): """ :param redis_url same as celery broker URL, for ex. redis://localhost:1234 """ + def needs_redis(func): + # Lazy evaluator for redis instance + def wrapper(self, *args): + if not hasattr(self, 'r'): + self.r = redis.from_url(self.redis_url) + return func(self, *args) + return wrapper + def __init__(self, redis_url): - self.r = redis.from_url(redis_url) + self.redis_url = redis_url super().__init__() + @needs_redis def clear(self): self.r.delete('testwatch:calls', 'testwatch:intercept_list') + @needs_redis def intercept(self, fname, f = None): self.r.hmset('testwatch:intercept_list', {fname: marshal.dumps(f.__code__) if f is not None else 1}) + @needs_redis def intercept_list_has(self, fname): return self.r.hget('testwatch:intercept_list', fname) is not None + @needs_redis def execute_intercept_function_replacement(self, fname, *args, **kwargs): if self.intercept_list_has(fname) and self.r.hget('testwatch:intercept_list', fname) != b'1': # Rebuild function @@ -120,13 +132,19 @@ class SharedTestWatch(TestWatch): f = types.FunctionType(marshal.loads(fcode), globals()) f(*args, **kwargs) + @needs_redis def get_calls(self, fname): value = self.r.hget('testwatch:calls', fname) if value is None: return [] else: return json.loads(value.decode('utf-8')) + @needs_redis def set_calls(self, fname, value): self.r.hmset('testwatch:calls', {fname: json.dumps(value)}) -testWatch = TestWatch() \ No newline at end of file + def watch(**kwargs): + return TestWatch.watch(tw=sharedTestWatch, **kwargs) + +testWatch = TestWatch() +sharedTestWatch = SharedTestWatch(settings.CELERY_BROKER_URL) \ No newline at end of file diff --git a/app/urls.py b/app/urls.py index 2f3f2e2a..de02963b 100644 --- a/app/urls.py +++ b/app/urls.py @@ -32,6 +32,5 @@ urlpatterns += get_url_patterns() # Test cases call boot() independently # Also don't execute boot with celery workers -celery_running = sys.argv[2:3] == ["worker"] -if not celery_running and not settings.TESTING: +if not settings.WORKER_RUNNING and not settings.TESTING: boot() \ No newline at end of file diff --git a/nodeodm/models.py b/nodeodm/models.py index dc13cec2..1bfd89ff 100644 --- a/nodeodm/models.py +++ b/nodeodm/models.py @@ -11,7 +11,7 @@ from guardian.models import UserObjectPermissionBase from .api_client import ApiClient import json from django.db.models import signals -from datetime import datetime, timedelta +from datetime import timedelta from .exceptions import ProcessingError, ProcessingTimeout import simplejson diff --git a/package.json b/package.json index d1934505..b68ed541 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "WebODM", - "version": "0.4.2", + "version": "0.5.0", "description": "Open Source Drone Image Processing", "main": "index.js", "scripts": { diff --git a/plugins/measure/manifest.json b/plugins/measure/manifest.json index 3859d021..8cc635de 100644 --- a/plugins/measure/manifest.json +++ b/plugins/measure/manifest.json @@ -1,6 +1,6 @@ { "name": "Area/Length Measurements", - "webodmMinVersion": "0.4.2", + "webodmMinVersion": "0.5.0", "description": "A plugin to compute area and length measurements on Leaflet", "version": "0.1.0", "author": "Piero Toffanin", diff --git a/plugins/volume/manifest.json b/plugins/volume/manifest.json index db034358..c4d68fe4 100644 --- a/plugins/volume/manifest.json +++ b/plugins/volume/manifest.json @@ -1,6 +1,6 @@ { "name": "Volume Measurements", - "webodmMinVersion": "0.4.2", + "webodmMinVersion": "0.5.0", "description": "A plugin to compute volume measurements from a DSM", "version": "0.1.0", "author": "Piero Toffanin", diff --git a/webodm/settings.py b/webodm/settings.py index 2a297ddb..41778efb 100644 --- a/webodm/settings.py +++ b/webodm/settings.py @@ -45,6 +45,7 @@ with open(os.path.join(BASE_DIR, 'package.json')) as package_file: VERSION = data['version'] TESTING = sys.argv[1:2] == ['test'] +WORKER_RUNNING = sys.argv[2:3] == ["worker"] # SECURITY WARNING: don't run with debug turned on a public facing server! DEBUG = os.environ.get('WO_DEBUG', 'YES') == 'YES' or TESTING diff --git a/worker/celery.py b/worker/celery.py index 4bdd8fe8..98871334 100644 --- a/worker/celery.py +++ b/worker/celery.py @@ -33,5 +33,12 @@ app.conf.beat_schedule = { }, } +# We need this for tests +@app.task(name='celery.ping') +def ping(): + # type: () -> str + """Simple task that just returns 'pong'.""" + return 'pong' + if __name__ == '__main__': app.start() \ No newline at end of file