Removed scheduler code, background decorator, added Celery workers, migrated code, added SharedTestWatch

pull/384/head
Piero Toffanin 2018-02-15 16:23:29 -05:00
parent d9f4f1527c
commit 22c3e66c02
20 changed files with 256 additions and 254 deletions

.gitignore
View file

@@ -75,6 +75,7 @@ target/
# celery beat schedule file
celerybeat-schedule
celerybeat.pid
# dotenv
.env

View file

@@ -17,8 +17,9 @@ from rest_framework.views import APIView
from nodeodm import status_codes
from .common import get_and_check_project, get_tile_json, path_traversal_check
from app import models, scheduler, pending_actions
from app import models, pending_actions
from nodeodm.models import ProcessingNode
from worker import tasks as worker_tasks
class TaskIDsSerializer(serializers.BaseSerializer):
@@ -84,8 +85,8 @@ class TaskViewSet(viewsets.ViewSet):
task.last_error = None
task.save()
# Call the scheduler (speed things up)
scheduler.process_pending_tasks(background=True)
# Process pending tasks without waiting for the scheduler (speed things up)
worker_tasks.process_pending_tasks.delay()
return Response({'success': True})
@@ -180,8 +181,8 @@ class TaskViewSet(viewsets.ViewSet):
serializer.is_valid(raise_exception=True)
serializer.save()
# Call the scheduler (speed things up)
scheduler.process_pending_tasks(background=True)
# Process pending tasks without waiting for the scheduler (speed things up)
worker_tasks.process_pending_tasks.delay()
return Response(serializer.data)
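For reference, .delay() is Celery's shorthand for apply_async() with default options, so both endpoints above now enqueue the work on the broker and return immediately instead of running it in-process. A minimal sketch of the call pattern, using only names from this diff:

from worker import tasks as worker_tasks

# Enqueue on the broker; returns an AsyncResult right away while a
# separate Celery worker process picks the job up.
result = worker_tasks.process_pending_tasks.delay()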

View file

@@ -1,40 +0,0 @@
from threading import Thread
import logging
from django import db
from app.testwatch import testWatch
logger = logging.getLogger('app.logger')
def background(func):
"""
Adds background={True|False} param to any function
so that we can call update_nodes_info(background=True) from the outside
"""
def wrapper(*args,**kwargs):
background = kwargs.get('background', False)
if 'background' in kwargs: del kwargs['background']
if background:
if testWatch.hook_pre(func, *args, **kwargs): return
# Create a function that closes all
# db connections at the end of the thread
# This is necessary to make sure we don't leave
# open connections lying around.
def execute_and_close_db():
ret = None
try:
ret = func(*args, **kwargs)
finally:
db.connections.close_all()
testWatch.hook_post(func, *args, **kwargs)
return ret
t = Thread(target=execute_and_close_db)
t.daemon = True
t.start()
return t
else:
return func(*args, **kwargs)
return wrapper
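The @background decorator removed above provided thread-based fire-and-forget execution with manual DB connection cleanup. The rest of this commit replaces that pattern with Celery tasks: a function is registered with @app.task and callers enqueue it with .delay(). A minimal sketch of the equivalent, following the worker/tasks.py names introduced later in this diff:

# sketch of the replacement pattern (see worker/tasks.py below)
from worker.celery import app

@app.task
def update_nodes_info():
    # Runs in a Celery worker process; Celery's Django integration
    # closes DB connections after each task, so the close_all()
    # bookkeeping above is no longer needed.
    pass

# caller side, replacing update_nodes_info(background=True):
# worker_tasks.update_nodes_info.delay()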

View file

@@ -7,13 +7,14 @@ from django.core.files import File
from django.db.utils import ProgrammingError
from guardian.shortcuts import assign_perm
from worker import tasks as worker_tasks
from app.models import Preset
from app.models import Theme
from app.plugins import register_plugins
from nodeodm.models import ProcessingNode
# noinspection PyUnresolvedReferences
from webodm.settings import MEDIA_ROOT
from . import scheduler, signals
from . import signals
import logging
from .models import Task, Setting
from webodm import settings
@@ -22,7 +23,7 @@ from webodm.wsgi import booted
def boot():
# booted is a shared memory variable to keep track of boot status
# as multiple workers could trigger the boot sequence twice
# as multiple gunicorn workers could trigger the boot sequence twice
if not settings.DEBUG and booted.value: return
booted.value = True
@@ -92,10 +93,7 @@ def boot():
register_plugins()
if not settings.TESTING:
# Setup and start scheduler
scheduler.setup()
scheduler.update_nodes_info(background=True)
worker_tasks.update_nodes_info.delay()
except ProgrammingError:
logger.warning("Could not touch the database. If running a migration, this is expected.")

View file

@@ -32,7 +32,7 @@ class Project(models.Model):
super().delete(*args)
else:
# Need to remove all tasks before we can remove this project
# which will be deleted on the scheduler after pending actions
# which will be deleted by workers after pending actions
# have been completed
self.task_set.update(pending_action=pending_actions.REMOVE)
self.deleting = True

View file

@@ -109,7 +109,7 @@ class Task(models.Model):
# mission
created_at = models.DateTimeField(default=timezone.now, help_text="Creation date")
pending_action = models.IntegerField(choices=PENDING_ACTIONS, db_index=True, null=True, blank=True, help_text="A requested action to be performed on the task. The selected action will be performed by the scheduler at the next iteration.")
pending_action = models.IntegerField(choices=PENDING_ACTIONS, db_index=True, null=True, blank=True, help_text="A requested action to be performed on the task. The selected action will be performed by the worker at the next iteration.")
public = models.BooleanField(default=False, help_text="A flag indicating whether this task is available to the public")
@@ -221,7 +221,7 @@ class Task(models.Model):
def process(self):
"""
This method contains the logic for processing tasks asynchronously
from a background thread or from the scheduler. Here tasks that are
from a background thread or from a worker. Here tasks that are
ready to be processed execute some logic. This could be communication
with a processing node or executing a pending action.
"""

View file

@@ -1,96 +0,0 @@
import logging
import traceback
from multiprocessing.dummy import Pool as ThreadPool
from threading import Lock
from apscheduler.schedulers import SchedulerAlreadyRunningError, SchedulerNotRunningError
from apscheduler.schedulers.background import BackgroundScheduler
from django import db
from django.db.models import Q, Count
from webodm import settings
from app.models import Task, Project
from nodeodm import status_codes
from nodeodm.models import ProcessingNode
from app.background import background
logger = logging.getLogger('app.logger')
scheduler = BackgroundScheduler({
'apscheduler.job_defaults.coalesce': 'true',
'apscheduler.job_defaults.max_instances': '3',
})
@background
def update_nodes_info():
processing_nodes = ProcessingNode.objects.all()
for processing_node in processing_nodes:
processing_node.update_node_info()
tasks_mutex = Lock()
@background
def process_pending_tasks():
tasks = []
try:
tasks_mutex.acquire()
# All tasks that have a processing node assigned
# Or that need one assigned (via auto)
# or tasks that need a status update
# or tasks that have a pending action
# and that are not locked (being processed by another thread)
tasks = Task.objects.filter(Q(processing_node__isnull=True, auto_processing_node=True) |
Q(Q(status=None) | Q(status__in=[status_codes.QUEUED, status_codes.RUNNING]), processing_node__isnull=False) |
Q(pending_action__isnull=False)).exclude(Q(processing_lock=True))
for task in tasks:
task.processing_lock = True
task.save()
finally:
tasks_mutex.release()
def process(task):
try:
task.process()
except Exception as e:
logger.error("Uncaught error! This is potentially bad. Please report it to http://github.com/OpenDroneMap/WebODM/issues: {} {}".format(e, traceback.format_exc()))
if settings.TESTING: raise e
finally:
# Might have been deleted
if task.pk is not None:
task.processing_lock = False
task.save()
db.connections.close_all()
if tasks.count() > 0:
pool = ThreadPool(tasks.count())
pool.map(process, tasks, chunksize=1)
pool.close()
pool.join()
def cleanup_projects():
# Delete all projects that are marked for deletion
# and that have no tasks left
total, count_dict = Project.objects.filter(deleting=True).annotate(
tasks_count=Count('task')
).filter(tasks_count=0).delete()
if total > 0 and 'app.Project' in count_dict:
logger.info("Deleted {} projects".format(count_dict['app.Project']))
def setup():
try:
scheduler.start()
scheduler.add_job(update_nodes_info, 'interval', seconds=30)
scheduler.add_job(process_pending_tasks, 'interval', seconds=5)
scheduler.add_job(cleanup_projects, 'interval', seconds=60)
except SchedulerAlreadyRunningError:
logger.warning("Scheduler already running (this is OK while testing)")
def teardown():
logger.info("Stopping scheduler...")
try:
scheduler.shutdown()
logger.info("Scheduler stopped")
except SchedulerNotRunningError:
logger.warning("Scheduler not running")

View file

@@ -16,7 +16,6 @@ from rest_framework import status
from rest_framework.test import APIClient
from app import pending_actions
from app import scheduler
from django.utils import timezone
from app.models import Project, Task, ImageUpload
from app.models.task import task_directory_path, full_task_directory_path

View file

@@ -201,15 +201,7 @@ class TestApp(BootTestCase):
self.assertRaises(ValidationError, task.save)
def test_scheduler(self):
self.assertTrue(scheduler.setup() is None)
# Can call update_nodes_info()
self.assertTrue(scheduler.update_nodes_info() is None)
# Can call function in background
self.assertTrue(scheduler.update_nodes_info(background=True).join() is None)
self.assertTrue(scheduler.teardown() is None)
def test_worker(self):
self.assertTrue(True) # TODO!!!

View file

@@ -1,6 +1,7 @@
from django.test import TestCase
from webodm.settings import CELERY_BROKER_URL
from app.testwatch import TestWatch
from app.testwatch import TestWatch, SharedTestWatch
def test(a, b):
@@ -8,50 +9,51 @@ def test(a, b):
class TestTestWatch(TestCase):
def test_methods(self):
tw = TestWatch()
self.assertTrue(tw.get_calls_count("app.tests.test_testwatch.test") == 0)
self.assertTrue(tw.get_calls_count("app.tests.test_testwatch.nonexistent") == 0)
def test_watch_instance(tw):
self.assertTrue(tw.get_calls_count("app.tests.test_testwatch.test") == 0)
self.assertTrue(tw.get_calls_count("app.tests.test_testwatch.nonexistent") == 0)
# Test watch count
tw.hook_pre(test, 1, 2)
test(1, 2)
tw.hook_post(test, 1, 2)
# Test watch count
tw.hook_pre(test, 1, 2)
test(1, 2)
tw.hook_post(test, 1, 2)
self.assertTrue(tw.get_calls_count("app.tests.test_testwatch.test") == 1)
self.assertTrue(tw.get_calls_count("app.tests.test_testwatch.test") == 1)
tw.hook_pre(test, 1, 2)
test(1, 2)
tw.hook_post(test, 1, 2)
tw.hook_pre(test, 1, 2)
test(1, 2)
tw.hook_post(test, 1, 2)
self.assertTrue(tw.get_calls_count("app.tests.test_testwatch.test") == 2)
self.assertTrue(tw.get_calls_count("app.tests.test_testwatch.test") == 2)
@TestWatch.watch(testWatch=tw)
def test2(d):
d['flag'] = not d['flag']
@TestWatch.watch(testWatch=tw)
def test2(d):
d['flag'] = not d['flag']
# Test intercept
tw.intercept("app.tests.test_testwatch.test2")
d = {'flag': True}
test2(d)
self.assertTrue(d['flag'])
# Test intercept
tw.intercept("app.tests.test_testwatch.test2")
d = {'flag': True}
test2(d)
self.assertTrue(d['flag'])
# Test function replacement intercept
d = {
'a': False,
'b': False
}
@TestWatch.watch(testWatch=tw)
def test3(d):
d['a'] = True
# Test function replacement intercept
d = {
'a': False,
'b': False
}
@TestWatch.watch(testWatch=tw)
def test3(d):
d['a'] = True
def replacement(d):
d['b'] = True
tw.intercept("app.tests.test_testwatch.test3", replacement)
test3(d)
self.assertFalse(d['a'])
self.assertTrue(d['b'])
def replacement(d):
d['b'] = True
tw.intercept("app.tests.test_testwatch.test3", replacement)
test3(d)
self.assertFalse(d['a'])
self.assertTrue(d['b'])
test_watch_instance(TestWatch())
test_watch_instance(SharedTestWatch(CELERY_BROKER_URL))

View file

@@ -1,7 +1,12 @@
import time
import time, redis
import logging
import marshal
import types
import json
from webodm import settings
logger = logging.getLogger('app.logger')
@@ -10,26 +15,32 @@ class TestWatch:
def __init__(self):
self.clear()
def func_to_name(f):
return "{}.{}".format(f.__module__, f.__name__)
def clear(self):
self._calls = {}
self._intercept_list = {}
def func_to_name(f):
return "{}.{}".format(f.__module__, f.__name__)
def intercept(self, fname, f = None):
self._intercept_list[fname] = f if f is not None else True
def execute_intercept_function_replacement(self, fname, *args, **kwargs):
if fname in self._intercept_list and callable(self._intercept_list[fname]):
(self._intercept_list[fname])(*args, **kwargs)
def intercept_list_has(self, fname):
return fname in self._intercept_list
def should_prevent_execution(self, func):
return TestWatch.func_to_name(func) in self._intercept_list
def execute_intercept_function_replacement(self, fname, *args, **kwargs):
if self.intercept_list_has(fname) and callable(self._intercept_list[fname]):
(self._intercept_list[fname])(*args, **kwargs)
def get_calls(self, fname):
return self._calls[fname] if fname in self._calls else []
def set_calls(self, fname, value):
self._calls[fname] = value
def should_prevent_execution(self, func):
return self.intercept_list_has(TestWatch.func_to_name(func))
def get_calls_count(self, fname):
return len(self.get_calls(fname))
@@ -49,9 +60,9 @@
def log_call(self, func, *args, **kwargs):
fname = TestWatch.func_to_name(func)
logger.info("{} called".format(fname))
list = self._calls[fname] if fname in self._calls else []
list = self.get_calls(fname)
list.append({'f': fname, 'args': args, 'kwargs': kwargs})
self._calls[fname] = list
self.set_calls(fname, list)
def hook_pre(self, func, *args, **kwargs):
if settings.TESTING and self.should_prevent_execution(func):
@@ -80,4 +91,42 @@ class TestWatch:
return wrapper
return outer
"""
Redis-backed test watch
suitable for cross-machine/cross-process
test watching
"""
class SharedTestWatch(TestWatch):
"""
:param redis_url: same as the Celery broker URL, e.g. redis://localhost:1234
"""
def __init__(self, redis_url):
self.r = redis.from_url(redis_url)
super().__init__()
def clear(self):
self.r.delete('testwatch:calls', 'testwatch:intercept_list')
def intercept(self, fname, f = None):
self.r.hmset('testwatch:intercept_list', {fname: marshal.dumps(f.__code__) if f is not None else 1})
def intercept_list_has(self, fname):
return self.r.hget('testwatch:intercept_list', fname) is not None
def execute_intercept_function_replacement(self, fname, *args, **kwargs):
if self.intercept_list_has(fname) and self.r.hget('testwatch:intercept_list', fname) != b'1':
# Rebuild function
fcode = self.r.hget('testwatch:intercept_list', fname)
f = types.FunctionType(marshal.loads(fcode), globals())
f(*args, **kwargs)
def get_calls(self, fname):
value = self.r.hget('testwatch:calls', fname)
if value is None: return []
else:
return json.loads(value.decode('utf-8'))
def set_calls(self, fname, value):
self.r.hmset('testwatch:calls', {fname: json.dumps(value)})
testWatch = TestWatch()
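A short usage sketch of the Redis-backed variant, constructed the same way the new test above builds it (CELERY_BROKER_URL as defined in webodm.settings):

from webodm.settings import CELERY_BROKER_URL
from app.testwatch import SharedTestWatch

stw = SharedTestWatch(CELERY_BROKER_URL)  # e.g. redis://broker
# The intercept flag is stored in the testwatch:intercept_list hash,
# so any process sharing the broker sees it.
stw.intercept("app.tests.test_testwatch.test2")
# Call counts are read back from the testwatch:calls hash
stw.get_calls_count("app.tests.test_testwatch.test2")  # 0 until a hook_post logs a call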

View file

@@ -1,3 +1,4 @@
import sys
from django.conf.urls import url, include
from .views import app as app_views, public as public_views
@@ -30,5 +31,7 @@ urlpatterns = [
urlpatterns += get_url_patterns()
# Test cases call boot() independently
if not settings.TESTING:
# Also don't execute boot with celery workers
celery_running = sys.argv[2:3] == ["worker"]
if not celery_running and not settings.TESTING:
boot()

View file

@@ -37,6 +37,8 @@ services:
image: opendronemap/webodm_worker
container_name: worker
entrypoint: /bin/bash -c "/webodm/wait-for-it.sh broker:6379 -- /broker/start.sh"
volumes:
- ${WO_MEDIA_DIR}:/worker/app/media
depends_on:
- broker
environment:

View file

@@ -313,6 +313,16 @@ LIBSASS_CUSTOM_FUNCTIONS = {
'scalebyiv': scalebyiv
}
# Celery
CELERY_BROKER_URL = os.environ.get('WO_BROKER', 'redis://localhost')
CELERY_RESULT_BACKEND = os.environ.get('WO_BROKER', 'redis://localhost')
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_INCLUDE=['worker.tasks']
if TESTING:
MEDIA_ROOT = os.path.join(BASE_DIR, 'app', 'media_test')

View file

@@ -3,6 +3,16 @@ set -eo pipefail
__dirname=$(cd $(dirname "$0"); pwd -P)
cd ${__dirname}
usage(){
echo "Usage: $0 <command>"
echo
echo "This program manages the background worker processes. WebODM requires at least one background process worker to be running at all times."
echo
echo "Command list:"
echo " start Start background worker"
exit
}
check_command(){
check_msg_prefix="Checking for $1... "
check_msg_result="\033[92m\033[1m OK\033[0m\033[39m"
@@ -36,11 +46,17 @@ environment_check(){
fi
}
environment_check
echo "Starting worker using broker at $WO_BROKER"
# Switch to parent directory
# so that celery recognizes the package name
cd ${__dirname}/../
start(){
action=$1
celery -A worker worker --loglevel=info
echo "Starting worker using broker at $WO_BROKER"
celery -A worker worker --loglevel=info
}
if [[ $1 = "start" ]]; then
environment_check
start
else
usage
fi

View file

@@ -1,21 +0,0 @@
FROM ubuntu:16.04
MAINTAINER Piero Toffanin <pt@masseranolabs.com>
ENV WO_BROKER=redis://broker
RUN apt-get update && \
apt-get install -y software-properties-common && \
add-apt-repository -y ppa:ubuntugis/ubuntugis-unstable && \
apt-get update && \
apt-get install -y grass-core python-pip
COPY requirements.txt /worker/
COPY ../wait-for-it.sh /worker/
WORKDIR /worker
RUN pip install -U pip && pip install -r requirements.txt
RUN apt-get clean && \
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
RUN chmod 644 /docker-entrypoint-initdb.d/init-db.sql

View file

@@ -1,8 +1,37 @@
from celery import Celery
import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'webodm.settings')
app = Celery('tasks')
app.config_from_object('worker.celeryconfig');
app.config_from_object('django.conf:settings', namespace='CELERY')
app.conf.beat_schedule = {
'update-nodes-info': {
'task': 'worker.tasks.update_nodes_info',
'schedule': 30,
'options': {
'expires': 14,
'retry': False
}
},
'cleanup-projects': {
'task': 'worker.tasks.cleanup_projects',
'schedule': 60,
'options': {
'expires': 29,
'retry': False
}
},
'process-pending-tasks': {
'task': 'worker.tasks.process_pending_tasks',
'schedule': 5,
'options': {
'expires': 2,
'retry': False
}
},
}
if __name__ == '__main__':
app.start()
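Note that the beat_schedule above only produces periodic runs when a Celery beat process runs alongside the workers, e.g. started with celery -A worker beat (an assumption; the start.sh in this commit launches only a worker). Each entry's expires value is kept below its schedule interval (14 < 30, 29 < 60, 2 < 5) so that, if workers fall behind, stale periodic dispatches are discarded rather than piling up on the broker.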

View file

@@ -1,9 +0,0 @@
import os
broker_url = os.environ.get('WO_BROKER', 'redis://localhost')
result_backend = os.environ.get('WO_BROKER', 'redis://localhost')
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
include=['worker.tasks']

View file

@@ -1,2 +0,0 @@
celery
redis

View file

@@ -1,15 +1,83 @@
import traceback
from django.core.exceptions import ObjectDoesNotExist
from django.db.models import Count
from django.db.models import Q
from app.models import Project
from app.models import Task
from webodm import settings
from nodeodm import status_codes
from nodeodm.models import ProcessingNode
from .celery import app
from celery.utils.log import get_task_logger
from django.db import transaction
logger = get_task_logger(__name__)
@app.task
def add(x, y):
return x + y
def update_nodes_info():
processing_nodes = ProcessingNode.objects.all()
for processing_node in processing_nodes:
processing_node.update_node_info()
@app.task
def mul(x, y):
return x * y
def cleanup_projects():
# Delete all projects that are marked for deletion
# and that have no tasks left
total, count_dict = Project.objects.filter(deleting=True).annotate(
tasks_count=Count('task')
).filter(tasks_count=0).delete()
if total > 0 and 'app.Project' in count_dict:
logger.info("Deleted {} projects".format(count_dict['app.Project']))
@app.task
def xsum(numbers):
return sum(numbers)
def process_task(taskId):
# TODO: would a redis lock perform better here?
with transaction.atomic():
try:
task = Task.objects.filter(pk=taskId).select_for_update().get()
except ObjectDoesNotExist:
logger.info("Task id {} has already been deleted.".format(taskId))
return
if not task.processing_lock:
task.processing_lock = True
task.save()
else:
return # Another worker beat us to it
try:
task.process()
except Exception as e:
logger.error(
"Uncaught error! This is potentially bad. Please report it to http://github.com/OpenDroneMap/WebODM/issues: {} {}".format(
e, traceback.format_exc()))
if settings.TESTING: raise e
finally:
# Might have been deleted
if task.pk is not None:
task.processing_lock = False
task.save()
@app.task
def process_pending_tasks():
# All tasks that have a processing node assigned
# Or that need one assigned (via auto)
# or tasks that need a status update
# or tasks that have a pending action
# and that are not locked (being processed by another worker)
qs = Task.objects.filter(Q(processing_node__isnull=True, auto_processing_node=True) |
Q(Q(status=None) | Q(status__in=[status_codes.QUEUED, status_codes.RUNNING]),
processing_node__isnull=False) |
Q(pending_action__isnull=False)).exclude(Q(processing_lock=True))
tasks = list(qs)
if len(qs) > 0:
for task in tasks:
process_task.delay(task.id)
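The fan-out above replaces the removed scheduler's thread pool: process_pending_tasks only selects candidate rows, and each process_task acquires a row lock (select_for_update() inside transaction.atomic()) before flipping processing_lock, so if two workers receive the same id, the second sees the flag already set and returns. A hedged usage sketch, matching how the API views earlier in this commit trigger it:

from worker import tasks as worker_tasks

# Fire and forget: the coordinator task enqueues one
# process_task per matching Task row.
worker_tasks.process_pending_tasks.delay()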