Worker unit tests, changed version number

pull/384/head
Piero Toffanin 2018-02-16 15:07:53 -05:00
rodzic 47dad9b370
commit 1aedfd5c7a
14 zmienionych plików z 145 dodań i 62 usunięć

Wyświetl plik

@ -1,5 +1,6 @@
import os import os
import kombu
from django.contrib.auth.models import Permission from django.contrib.auth.models import Permission
from django.contrib.auth.models import User, Group from django.contrib.auth.models import User, Group
from django.core.exceptions import ObjectDoesNotExist from django.core.exceptions import ObjectDoesNotExist
@ -93,7 +94,11 @@ def boot():
register_plugins() register_plugins()
if not settings.TESTING: if not settings.TESTING:
worker_tasks.update_nodes_info.delay() try:
worker_tasks.update_nodes_info.delay()
except kombu.exceptions.OperationalError as e:
logger.error("Cannot connect to celery broker at {}. Make sure that your redis-server is running at that address: {}".format(settings.CELERY_BROKER_URL, str(e)))
except ProgrammingError: except ProgrammingError:
logger.warning("Could not touch the database. If running a migration, this is expected.") logger.warning("Could not touch the database. If running a migration, this is expected.")

Wyświetl plik

@ -1,8 +1,6 @@
import os import os
import subprocess
import time import time
import shutil
import logging import logging
from datetime import timedelta from datetime import timedelta
@ -10,11 +8,10 @@ from datetime import timedelta
import json import json
import requests import requests
from django.contrib.auth.models import User from django.contrib.auth.models import User
from django.contrib.gis.gdal import GDALRaster
from django.contrib.gis.gdal import OGRGeometry
from rest_framework import status from rest_framework import status
from rest_framework.test import APIClient from rest_framework.test import APIClient
import worker
from app import pending_actions from app import pending_actions
from django.utils import timezone from django.utils import timezone
from app.models import Project, Task, ImageUpload from app.models import Project, Task, ImageUpload
@ -23,6 +20,7 @@ from app.tests.classes import BootTransactionTestCase
from nodeodm import status_codes from nodeodm import status_codes
from nodeodm.models import ProcessingNode, OFFLINE_MINUTES from nodeodm.models import ProcessingNode, OFFLINE_MINUTES
from app.testwatch import testWatch from app.testwatch import testWatch
from .utils import start_processing_node, clear_test_media_root
# We need to test the task API in a TransactionTestCase because # We need to test the task API in a TransactionTestCase because
# task processing happens on a separate thread, and normal TestCases # task processing happens on a separate thread, and normal TestCases
@ -33,26 +31,10 @@ logger = logging.getLogger('app.logger')
DELAY = 2 # time to sleep for during process launch, background processing, etc. DELAY = 2 # time to sleep for during process launch, background processing, etc.
def start_processing_node(*args):
current_dir = os.path.dirname(os.path.realpath(__file__))
node_odm = subprocess.Popen(['node', 'index.js', '--port', '11223', '--test'] + list(args), shell=False,
cwd=os.path.join(current_dir, "..", "..", "nodeodm", "external", "node-OpenDroneMap"))
time.sleep(DELAY) # Wait for the server to launch
return node_odm
class TestApiTask(BootTransactionTestCase): class TestApiTask(BootTransactionTestCase):
def setUp(self): def setUp(self):
super().setUp() super().setUp()
clear_test_media_root()
# We need to clear previous media_root content
# This points to the test directory, but just in case
# we double check that the directory is indeed a test directory
if "_test" in settings.MEDIA_ROOT:
if os.path.exists(settings.MEDIA_ROOT):
logger.info("Cleaning up {}".format(settings.MEDIA_ROOT))
shutil.rmtree(settings.MEDIA_ROOT)
else:
logger.warning("We did not remove MEDIA_ROOT because we couldn't find a _test suffix in its path.")
def test_task(self): def test_task(self):
client = APIClient() client = APIClient()
@ -204,7 +186,7 @@ class TestApiTask(BootTransactionTestCase):
self.assertTrue(res.status_code == status.HTTP_200_OK) self.assertTrue(res.status_code == status.HTTP_200_OK)
# On update scheduler.processing_pending_tasks should have been called in the background # On update scheduler.processing_pending_tasks should have been called in the background
testWatch.wait_until_call("app.scheduler.process_pending_tasks", timeout=5) testWatch.wait_until_call("worker.tasks.process_pending_tasks", timeout=5)
# Processing should have started and a UUID is assigned # Processing should have started and a UUID is assigned
task.refresh_from_db() task.refresh_from_db()
@ -225,7 +207,7 @@ class TestApiTask(BootTransactionTestCase):
time.sleep(DELAY) time.sleep(DELAY)
# Calling process pending tasks should finish the process # Calling process pending tasks should finish the process
scheduler.process_pending_tasks() worker.tasks.process_pending_tasks()
task.refresh_from_db() task.refresh_from_db()
self.assertTrue(task.status == status_codes.COMPLETED) self.assertTrue(task.status == status_codes.COMPLETED)
@ -294,7 +276,7 @@ class TestApiTask(BootTransactionTestCase):
testWatch.clear() testWatch.clear()
res = client.post("/api/projects/{}/tasks/{}/restart/".format(project.id, task.id)) res = client.post("/api/projects/{}/tasks/{}/restart/".format(project.id, task.id))
self.assertTrue(res.status_code == status.HTTP_200_OK) self.assertTrue(res.status_code == status.HTTP_200_OK)
testWatch.wait_until_call("app.scheduler.process_pending_tasks", timeout=5) testWatch.wait_until_call("worker.tasks.process_pending_tasks", timeout=5)
task.refresh_from_db() task.refresh_from_db()
self.assertTrue(task.status in [status_codes.RUNNING, status_codes.COMPLETED]) self.assertTrue(task.status in [status_codes.RUNNING, status_codes.COMPLETED])
@ -303,7 +285,7 @@ class TestApiTask(BootTransactionTestCase):
testWatch.clear() testWatch.clear()
res = client.post("/api/projects/{}/tasks/{}/cancel/".format(project.id, task.id)) res = client.post("/api/projects/{}/tasks/{}/cancel/".format(project.id, task.id))
self.assertTrue(res.status_code == status.HTTP_200_OK) self.assertTrue(res.status_code == status.HTTP_200_OK)
testWatch.wait_until_call("app.scheduler.process_pending_tasks", timeout=5) testWatch.wait_until_call("worker.tasks.process_pending_tasks", timeout=5)
# Should have been canceled # Should have been canceled
task.refresh_from_db() task.refresh_from_db()
@ -312,7 +294,7 @@ class TestApiTask(BootTransactionTestCase):
# Remove a task # Remove a task
res = client.post("/api/projects/{}/tasks/{}/remove/".format(project.id, task.id)) res = client.post("/api/projects/{}/tasks/{}/remove/".format(project.id, task.id))
self.assertTrue(res.status_code == status.HTTP_200_OK) self.assertTrue(res.status_code == status.HTTP_200_OK)
testWatch.wait_until_call("app.scheduler.process_pending_tasks", 2, timeout=5) testWatch.wait_until_call("worker.tasks.process_pending_tasks", 2, timeout=5)
# Has been removed along with assets # Has been removed along with assets
self.assertFalse(Task.objects.filter(pk=task.id).exists()) self.assertFalse(Task.objects.filter(pk=task.id).exists())
@ -322,7 +304,7 @@ class TestApiTask(BootTransactionTestCase):
self.assertFalse(os.path.exists(task_assets_path)) self.assertFalse(os.path.exists(task_assets_path))
testWatch.clear() testWatch.clear()
testWatch.intercept("app.scheduler.process_pending_tasks") testWatch.intercept("worker.tasks.process_pending_tasks")
# Create a task, then kill the processing node # Create a task, then kill the processing node
res = client.post("/api/projects/{}/tasks/".format(project.id), { res = client.post("/api/projects/{}/tasks/".format(project.id), {
@ -339,7 +321,7 @@ class TestApiTask(BootTransactionTestCase):
task.refresh_from_db() task.refresh_from_db()
self.assertTrue(task.last_error is None) self.assertTrue(task.last_error is None)
scheduler.process_pending_tasks() worker.tasks.process_pending_tasks()
# Processing should fail and set an error # Processing should fail and set an error
task.refresh_from_db() task.refresh_from_db()
@ -356,20 +338,20 @@ class TestApiTask(BootTransactionTestCase):
self.assertTrue(task.pending_action == pending_actions.RESTART) self.assertTrue(task.pending_action == pending_actions.RESTART)
# After processing, the task should have restarted, and have no UUID or status # After processing, the task should have restarted, and have no UUID or status
scheduler.process_pending_tasks() worker.tasks.process_pending_tasks()
task.refresh_from_db() task.refresh_from_db()
self.assertTrue(task.status is None) self.assertTrue(task.status is None)
self.assertTrue(len(task.uuid) == 0) self.assertTrue(len(task.uuid) == 0)
# Another step and it should have acquired a UUID # Another step and it should have acquired a UUID
scheduler.process_pending_tasks() worker.tasks.process_pending_tasks()
task.refresh_from_db() task.refresh_from_db()
self.assertTrue(task.status in [status_codes.RUNNING, status_codes.COMPLETED]) self.assertTrue(task.status in [status_codes.RUNNING, status_codes.COMPLETED])
self.assertTrue(len(task.uuid) > 0) self.assertTrue(len(task.uuid) > 0)
# Another step and it should be completed # Another step and it should be completed
time.sleep(DELAY) time.sleep(DELAY)
scheduler.process_pending_tasks() worker.tasks.process_pending_tasks()
task.refresh_from_db() task.refresh_from_db()
self.assertTrue(task.status == status_codes.COMPLETED) self.assertTrue(task.status == status_codes.COMPLETED)
@ -392,7 +374,7 @@ class TestApiTask(BootTransactionTestCase):
# 4. Check that the rerun_from parameter has been cleared # 4. Check that the rerun_from parameter has been cleared
# but the other parameters are still set # but the other parameters are still set
scheduler.process_pending_tasks() worker.tasks.process_pending_tasks()
task.refresh_from_db() task.refresh_from_db()
self.assertTrue(len(task.uuid) == 0) self.assertTrue(len(task.uuid) == 0)
self.assertTrue(len(list(filter(lambda d: d['name'] == 'rerun-from', task.options))) == 0) self.assertTrue(len(list(filter(lambda d: d['name'] == 'rerun-from', task.options))) == 0)
@ -403,7 +385,7 @@ class TestApiTask(BootTransactionTestCase):
raise requests.exceptions.ConnectTimeout("Simulated timeout") raise requests.exceptions.ConnectTimeout("Simulated timeout")
testWatch.intercept("nodeodm.api_client.task_output", connTimeout) testWatch.intercept("nodeodm.api_client.task_output", connTimeout)
scheduler.process_pending_tasks() worker.tasks.process_pending_tasks()
# Timeout errors should be handled by retrying again at a later time # Timeout errors should be handled by retrying again at a later time
# and not fail # and not fail
@ -439,9 +421,9 @@ class TestApiTask(BootTransactionTestCase):
}, format="multipart") }, format="multipart")
self.assertTrue(res.status_code == status.HTTP_201_CREATED) self.assertTrue(res.status_code == status.HTTP_201_CREATED)
scheduler.process_pending_tasks() worker.tasks.process_pending_tasks()
time.sleep(DELAY) time.sleep(DELAY)
scheduler.process_pending_tasks() worker.tasks.process_pending_tasks()
task = Task.objects.get(pk=res.data['id']) task = Task.objects.get(pk=res.data['id'])
self.assertTrue(task.status == status_codes.COMPLETED) self.assertTrue(task.status == status_codes.COMPLETED)
@ -491,7 +473,7 @@ class TestApiTask(BootTransactionTestCase):
task.last_error = "Test error" task.last_error = "Test error"
task.save() task.save()
scheduler.process_pending_tasks() worker.tasks.process_pending_tasks()
# A processing node should not have been assigned # A processing node should not have been assigned
task.refresh_from_db() task.refresh_from_db()
@ -501,7 +483,7 @@ class TestApiTask(BootTransactionTestCase):
task.last_error = None task.last_error = None
task.save() task.save()
scheduler.process_pending_tasks() worker.tasks.process_pending_tasks()
# A processing node should not have been assigned because no processing nodes are online # A processing node should not have been assigned because no processing nodes are online
task.refresh_from_db() task.refresh_from_db()
@ -513,7 +495,7 @@ class TestApiTask(BootTransactionTestCase):
self.assertTrue(pnode.is_online()) self.assertTrue(pnode.is_online())
# A processing node has been assigned # A processing node has been assigned
scheduler.process_pending_tasks() worker.tasks.process_pending_tasks()
task.refresh_from_db() task.refresh_from_db()
self.assertTrue(task.processing_node.id == pnode.id) self.assertTrue(task.processing_node.id == pnode.id)
@ -532,13 +514,13 @@ class TestApiTask(BootTransactionTestCase):
task.status = None task.status = None
task.save() task.save()
scheduler.process_pending_tasks() worker.tasks.process_pending_tasks()
# Processing node is now cleared and a new one will be assigned on the next tick # Processing node is now cleared and a new one will be assigned on the next tick
task.refresh_from_db() task.refresh_from_db()
self.assertTrue(task.processing_node is None) self.assertTrue(task.processing_node is None)
scheduler.process_pending_tasks() worker.tasks.process_pending_tasks()
task.refresh_from_db() task.refresh_from_db()
self.assertTrue(task.processing_node.id == another_pnode.id) self.assertTrue(task.processing_node.id == another_pnode.id)
@ -554,7 +536,7 @@ class TestApiTask(BootTransactionTestCase):
pnode.save() pnode.save()
self.assertTrue(pnode.is_online()) self.assertTrue(pnode.is_online())
scheduler.process_pending_tasks() worker.tasks.process_pending_tasks()
# A processing node should not have been assigned because we asked # A processing node should not have been assigned because we asked
# not to via auto_processing_node = false # not to via auto_processing_node = false

Wyświetl plik

@ -4,7 +4,6 @@ from rest_framework import status
from app.models import Project, Task from app.models import Project, Task
from .classes import BootTestCase from .classes import BootTestCase
from app import scheduler
from django.core.exceptions import ValidationError from django.core.exceptions import ValidationError
class TestApp(BootTestCase): class TestApp(BootTestCase):
@ -199,9 +198,3 @@ class TestApp(BootTestCase):
task.options = [{'name': 'test', 'value': 1}, {"invalid": 1}] task.options = [{'name': 'test', 'value': 1}, {"invalid": 1}]
self.assertRaises(ValidationError, task.save) self.assertRaises(ValidationError, task.save)
def test_worker(self):
self.assertTrue(True) # TODO!!!

Wyświetl plik

@ -1,11 +1,9 @@
from django.contrib.auth.models import User, Group from django.contrib.auth.models import User
from django.test import Client from django.test import Client
from rest_framework import status
from app.models import Project, Task from app.models import Project
from .classes import BootTestCase from .classes import BootTestCase
from app import scheduler
from django.core.exceptions import ValidationError
class TestWelcome(BootTestCase): class TestWelcome(BootTestCase):

Wyświetl plik

@ -0,0 +1,51 @@
import worker
from app import pending_actions
from app.models import Project
from app.models import Task
from nodeodm.models import ProcessingNode
from .classes import BootTestCase
from .utils import start_processing_node
class TestWelcome(BootTestCase):
def setUp(self):
super().setUp()
def tearDown(self):
pass
def test_worker_tasks(self):
project = Project.objects.get(name="User Test Project")
pnode = ProcessingNode.objects.create(hostname="localhost", port=11223)
self.assertTrue(pnode.api_version is None)
pnserver = start_processing_node()
worker.tasks.update_nodes_info()
pnode.refresh_from_db()
self.assertTrue(pnode.api_version is not None)
# Create task
task = Task.objects.create(project=project)
# Delete project
project.deleting = True
project.save()
worker.tasks.cleanup_projects()
# Task and project should still be here (since task still exists)
self.assertTrue(Task.objects.filter(pk=task.id).exists())
self.assertTrue(Project.objects.filter(pk=project.id).exists())
# Remove task
task.delete()
worker.tasks.cleanup_projects()
# Task and project should have been removed (now that task count is zero)
self.assertFalse(Task.objects.filter(pk=task.id).exists())
self.assertFalse(Project.objects.filter(pk=project.id).exists())
pnserver.terminate()

29
app/tests/utils.py 100644
Wyświetl plik

@ -0,0 +1,29 @@
import os
import shutil
import time
import subprocess
import logging
from webodm import settings
logger = logging.getLogger('app.logger')
def start_processing_node(*args):
current_dir = os.path.dirname(os.path.realpath(__file__))
node_odm = subprocess.Popen(['node', 'index.js', '--port', '11223', '--test'] + list(args), shell=False,
cwd=os.path.join(current_dir, "..", "..", "nodeodm", "external", "node-OpenDroneMap"))
time.sleep(2) # Wait for the server to launch
return node_odm
# We need to clear previous media_root content
# This points to the test directory, but just in case
# we double check that the directory is indeed a test directory
def clear_test_media_root():
if "_test" in settings.MEDIA_ROOT:
if os.path.exists(settings.MEDIA_ROOT):
logger.info("Cleaning up {}".format(settings.MEDIA_ROOT))
shutil.rmtree(settings.MEDIA_ROOT)
else:
logger.warning("We did not remove MEDIA_ROOT because we couldn't find a _test suffix in its path.")

Wyświetl plik

@ -100,19 +100,31 @@ class SharedTestWatch(TestWatch):
""" """
:param redis_url same as celery broker URL, for ex. redis://localhost:1234 :param redis_url same as celery broker URL, for ex. redis://localhost:1234
""" """
def needs_redis(func):
# Lazy evaluator for redis instance
def wrapper(self, *args):
if not hasattr(self, 'r'):
self.r = redis.from_url(self.redis_url)
return func(self, *args)
return wrapper
def __init__(self, redis_url): def __init__(self, redis_url):
self.r = redis.from_url(redis_url) self.redis_url = redis_url
super().__init__() super().__init__()
@needs_redis
def clear(self): def clear(self):
self.r.delete('testwatch:calls', 'testwatch:intercept_list') self.r.delete('testwatch:calls', 'testwatch:intercept_list')
@needs_redis
def intercept(self, fname, f = None): def intercept(self, fname, f = None):
self.r.hmset('testwatch:intercept_list', {fname: marshal.dumps(f.__code__) if f is not None else 1}) self.r.hmset('testwatch:intercept_list', {fname: marshal.dumps(f.__code__) if f is not None else 1})
@needs_redis
def intercept_list_has(self, fname): def intercept_list_has(self, fname):
return self.r.hget('testwatch:intercept_list', fname) is not None return self.r.hget('testwatch:intercept_list', fname) is not None
@needs_redis
def execute_intercept_function_replacement(self, fname, *args, **kwargs): def execute_intercept_function_replacement(self, fname, *args, **kwargs):
if self.intercept_list_has(fname) and self.r.hget('testwatch:intercept_list', fname) != b'1': if self.intercept_list_has(fname) and self.r.hget('testwatch:intercept_list', fname) != b'1':
# Rebuild function # Rebuild function
@ -120,13 +132,19 @@ class SharedTestWatch(TestWatch):
f = types.FunctionType(marshal.loads(fcode), globals()) f = types.FunctionType(marshal.loads(fcode), globals())
f(*args, **kwargs) f(*args, **kwargs)
@needs_redis
def get_calls(self, fname): def get_calls(self, fname):
value = self.r.hget('testwatch:calls', fname) value = self.r.hget('testwatch:calls', fname)
if value is None: return [] if value is None: return []
else: else:
return json.loads(value.decode('utf-8')) return json.loads(value.decode('utf-8'))
@needs_redis
def set_calls(self, fname, value): def set_calls(self, fname, value):
self.r.hmset('testwatch:calls', {fname: json.dumps(value)}) self.r.hmset('testwatch:calls', {fname: json.dumps(value)})
testWatch = TestWatch() def watch(**kwargs):
return TestWatch.watch(tw=sharedTestWatch, **kwargs)
testWatch = TestWatch()
sharedTestWatch = SharedTestWatch(settings.CELERY_BROKER_URL)

Wyświetl plik

@ -32,6 +32,5 @@ urlpatterns += get_url_patterns()
# Test cases call boot() independently # Test cases call boot() independently
# Also don't execute boot with celery workers # Also don't execute boot with celery workers
celery_running = sys.argv[2:3] == ["worker"] if not settings.WORKER_RUNNING and not settings.TESTING:
if not celery_running and not settings.TESTING:
boot() boot()

Wyświetl plik

@ -11,7 +11,7 @@ from guardian.models import UserObjectPermissionBase
from .api_client import ApiClient from .api_client import ApiClient
import json import json
from django.db.models import signals from django.db.models import signals
from datetime import datetime, timedelta from datetime import timedelta
from .exceptions import ProcessingError, ProcessingTimeout from .exceptions import ProcessingError, ProcessingTimeout
import simplejson import simplejson

Wyświetl plik

@ -1,6 +1,6 @@
{ {
"name": "WebODM", "name": "WebODM",
"version": "0.4.2", "version": "0.5.0",
"description": "Open Source Drone Image Processing", "description": "Open Source Drone Image Processing",
"main": "index.js", "main": "index.js",
"scripts": { "scripts": {

Wyświetl plik

@ -1,6 +1,6 @@
{ {
"name": "Area/Length Measurements", "name": "Area/Length Measurements",
"webodmMinVersion": "0.4.2", "webodmMinVersion": "0.5.0",
"description": "A plugin to compute area and length measurements on Leaflet", "description": "A plugin to compute area and length measurements on Leaflet",
"version": "0.1.0", "version": "0.1.0",
"author": "Piero Toffanin", "author": "Piero Toffanin",

Wyświetl plik

@ -1,6 +1,6 @@
{ {
"name": "Volume Measurements", "name": "Volume Measurements",
"webodmMinVersion": "0.4.2", "webodmMinVersion": "0.5.0",
"description": "A plugin to compute volume measurements from a DSM", "description": "A plugin to compute volume measurements from a DSM",
"version": "0.1.0", "version": "0.1.0",
"author": "Piero Toffanin", "author": "Piero Toffanin",

Wyświetl plik

@ -45,6 +45,7 @@ with open(os.path.join(BASE_DIR, 'package.json')) as package_file:
VERSION = data['version'] VERSION = data['version']
TESTING = sys.argv[1:2] == ['test'] TESTING = sys.argv[1:2] == ['test']
WORKER_RUNNING = sys.argv[2:3] == ["worker"]
# SECURITY WARNING: don't run with debug turned on a public facing server! # SECURITY WARNING: don't run with debug turned on a public facing server!
DEBUG = os.environ.get('WO_DEBUG', 'YES') == 'YES' or TESTING DEBUG = os.environ.get('WO_DEBUG', 'YES') == 'YES' or TESTING

Wyświetl plik

@ -33,5 +33,12 @@ app.conf.beat_schedule = {
}, },
} }
# We need this for tests
@app.task(name='celery.ping')
def ping():
# type: () -> str
"""Simple task that just returns 'pong'."""
return 'pong'
if __name__ == '__main__': if __name__ == '__main__':
app.start() app.start()