From f97f6431cf3b7706f75fc05b5656e96cc9a39a8d Mon Sep 17 00:00:00 2001 From: Piero Toffanin Date: Mon, 24 Oct 2016 14:14:35 -0400 Subject: [PATCH] Added scheduler, automatically updating processing nodes, online filter for API --- app/api/processingnodes.py | 28 +++++++++- app/api/tasks.py | 6 ++- app/apps.py | 5 +- app/boot.py | 10 +++- app/scheduler.py | 54 +++++++++++++++++++ .../app/js/components/EditTaskPanel.jsx | 4 +- app/static/app/js/components/ProjectList.jsx | 3 +- .../app/js/components/ProjectListItem.jsx | 3 +- app/tests/classes.py | 8 ++- app/tests/test_api.py | 15 ++++++ app/tests/test_app.py | 8 +++ requirements.txt | 5 +- webodm/settings.py | 8 ++- 13 files changed, 145 insertions(+), 12 deletions(-) create mode 100644 app/scheduler.py diff --git a/app/api/processingnodes.py b/app/api/processingnodes.py index 8c84c59b..6e53e9f5 100644 --- a/app/api/processingnodes.py +++ b/app/api/processingnodes.py @@ -3,13 +3,36 @@ from rest_framework.response import Response from rest_framework.decorators import permission_classes from rest_framework.permissions import DjangoModelPermissions from rest_framework.filters import DjangoFilterBackend +from django_filters.rest_framework import FilterSet from nodeodm.models import ProcessingNode +import django_filters +from django.utils import timezone +from datetime import timedelta +from django.db.models import Q + class ProcessingNodeSerializer(serializers.ModelSerializer): class Meta: model = ProcessingNode fields = '__all__' +class ProcessingNodeFilter(FilterSet): + online = django_filters.MethodFilter() + + def filter_online(self, queryset, value): + online_threshold = timezone.now() - timedelta(minutes=5) + + if value.lower() in ['true', '1']: + return queryset.filter(last_refreshed__isnull=False, last_refreshed__gte=online_threshold) + elif value.lower() in ['false', '0']: + return queryset.filter(Q(last_refreshed__isnull=True) | Q(last_refreshed__lt=online_threshold)) + + return queryset + + class Meta: + model = ProcessingNode + fields = ['online', 'id', 'hostname', 'port', 'api_version', 'queue_count', ] + class ProcessingNodeViewSet(viewsets.ModelViewSet): """ Processing nodes available. Processing nodes are associated with @@ -18,7 +41,10 @@ class ProcessingNodeViewSet(viewsets.ModelViewSet): # Don't need a "view node" permission. If you are logged-in, you can view nodes. permission_classes = (DjangoModelPermissions, ) + filter_backends = (DjangoFilterBackend, ) + filter_class = ProcessingNodeFilter + pagination_class = None serializer_class = ProcessingNodeSerializer - queryset = ProcessingNode.objects.all() + queryset = ProcessingNode.objects.all() \ No newline at end of file diff --git a/app/api/tasks.py b/app/api/tasks.py index 68065041..1c439678 100644 --- a/app/api/tasks.py +++ b/app/api/tasks.py @@ -3,7 +3,7 @@ from django.core.exceptions import ObjectDoesNotExist from rest_framework import status, serializers, viewsets, filters, exceptions, permissions, parsers from rest_framework.response import Response from rest_framework.decorators import parser_classes, api_view -from app import models +from app import models, scheduler from nodeodm.models import ProcessingNode class TaskIDsSerializer(serializers.BaseSerializer): @@ -84,6 +84,10 @@ class TaskViewSet(viewsets.ViewSet): serializer = TaskSerializer(task, data=request.data, partial=partial) serializer.is_valid(raise_exception=True) serializer.save() + + # Call the scheduler (speed things up) + scheduler.process_pending_tasks(background=True) + return Response(serializer.data) def partial_update(self, request, *args, **kwargs): diff --git a/app/apps.py b/app/apps.py index 1d26e888..2471be8b 100644 --- a/app/apps.py +++ b/app/apps.py @@ -2,10 +2,13 @@ from __future__ import unicode_literals from django.apps import AppConfig from .boot import boot +from webodm import settings class MainConfig(AppConfig): name = 'app' verbose_name = 'Application' def ready(self): - boot() \ No newline at end of file + # Test cases call boot() independently + if not settings.TESTING: + boot() \ No newline at end of file diff --git a/app/boot.py b/app/boot.py index 721a6f8a..b0a3b9ca 100644 --- a/app/boot.py +++ b/app/boot.py @@ -3,8 +3,8 @@ def boot(): from django.contrib.auth.models import Permission from django.contrib.auth.models import User, Group from django.db.utils import ProgrammingError - from . import signals - import logging + from . import signals, scheduler + import logging, os logger = logging.getLogger('app.logger') @@ -26,3 +26,9 @@ def boot(): logger.info("Created superuser") except ProgrammingError: logger.warn("Could not create default group/user. If running a migration, this is expected.") + + # Run only on the main runserver process + # (do not start again on the auto-reloader process) + if os.environ.get('RUN_MAIN') != 'true': + # Setup and start scheduler + scheduler.setup() \ No newline at end of file diff --git a/app/scheduler.py b/app/scheduler.py new file mode 100644 index 00000000..5b213ab2 --- /dev/null +++ b/app/scheduler.py @@ -0,0 +1,54 @@ +import logging +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.schedulers import SchedulerAlreadyRunningError, SchedulerNotRunningError +import threading +from nodeodm.models import ProcessingNode +from app.models import Task +import random + +logger = logging.getLogger('app.logger') +scheduler = None + +# Adds background={True|False} param to any function +# So that we can call update_nodes_info(background=True) from the outside +def job(func): + def wrapper(*args,**kwargs): + if (kwargs.get('background', False)): + thread = (threading.Thread(target=func)) + thread.start() + return thread + else: + return func(*args, **kwargs) + return wrapper + +@job +def update_nodes_info(): + processing_nodes = ProcessingNode.objects.all() + for processing_node in processing_nodes: + processing_node.update_node_info() + +@job +def process_pending_tasks(): + tasks = Task.objects.filter(uuid=None).exclude(processing_node=None) + for task in tasks: + print("Need to process: {}".format(task)) + +def setup(): + global scheduler + + logger.info("Starting background scheduler...") + try: + scheduler = BackgroundScheduler() + scheduler.start() + scheduler.add_job(update_nodes_info, 'interval', seconds=30) + scheduler.add_job(process_pending_tasks, 'interval', seconds=15) + except SchedulerAlreadyRunningError: + logger.warn("Scheduler already running (this is OK while testing)") + +def teardown(): + if scheduler != None: + logger.info("Stopping scheduler...") + try: + scheduler.shutdown(wait=False) + except SchedulerNotRunningError: + logger.warn("Scheduler not running") diff --git a/app/static/app/js/components/EditTaskPanel.jsx b/app/static/app/js/components/EditTaskPanel.jsx index 0c8dadf0..b98db466 100644 --- a/app/static/app/js/components/EditTaskPanel.jsx +++ b/app/static/app/js/components/EditTaskPanel.jsx @@ -36,9 +36,9 @@ class EditTaskPanel extends React.Component { setTimeout(loadProcessingNodes, 1000); } - this.nodesRequest = $.getJSON("/api/processingnodes/", json => { + this.nodesRequest = + $.getJSON("/api/processingnodes/?online=True", json => { if (Array.isArray(json)){ - let nodes = json.map(node => { return { id: node.id, diff --git a/app/static/app/js/components/ProjectList.jsx b/app/static/app/js/components/ProjectList.jsx index c6bec728..85fe6aba 100644 --- a/app/static/app/js/components/ProjectList.jsx +++ b/app/static/app/js/components/ProjectList.jsx @@ -16,7 +16,8 @@ class ProjectList extends React.Component { componentDidMount(){ // Load projects from API - this.serverRequest = $.getJSON(this.props.source, json => { + this.serverRequest = + $.getJSON(this.props.source, json => { if (json.results){ this.setState({ projects: json.results, diff --git a/app/static/app/js/components/ProjectListItem.jsx b/app/static/app/js/components/ProjectListItem.jsx index 07040959..e354f85b 100644 --- a/app/static/app/js/components/ProjectListItem.jsx +++ b/app/static/app/js/components/ProjectListItem.jsx @@ -133,7 +133,8 @@ class ProjectListItem extends React.Component { this.setUploadState({showEditTask: false}); this.setState({updatingTask: true}); - this.updateTaskRequest = $.ajax({ + this.updateTaskRequest = + $.ajax({ url: `/api/projects/${this.props.data.id}/tasks/${this.state.upload.taskId}/`, contentType: 'application/json', data: JSON.stringify({ diff --git a/app/tests/classes.py b/app/tests/classes.py index 3f89e63e..f3196aad 100644 --- a/app/tests/classes.py +++ b/app/tests/classes.py @@ -2,6 +2,7 @@ from django.test import TestCase from django.contrib.auth.models import User, Group from app.models import Project from app.boot import boot +from app import scheduler class BootTestCase(TestCase): ''' @@ -10,7 +11,7 @@ class BootTestCase(TestCase): module should derive from this class instead of TestCase. We don't use fixtures because we have signal initialization login - for some models, which doesn't play well with them, and this: http://blog.namis.me/2012/04/21/burn-your-fixtures/ + for some models, which doesn't play well with them. ''' @classmethod def setUpClass(cls): @@ -46,3 +47,8 @@ class BootTestCase(TestCase): boot() setupUsers() setupProjects() + + @classmethod + def tearDownClass(cls): + super(BootTestCase, cls).tearDownClass() + scheduler.teardown() diff --git a/app/tests/test_api.py b/app/tests/test_api.py index 27d4b1d3..b65e1d9e 100644 --- a/app/tests/test_api.py +++ b/app/tests/test_api.py @@ -137,11 +137,26 @@ class TestApi(BootTestCase): self.assertTrue(len(res.data) == 1) self.assertTrue(res.data[0]["hostname"] == "localhost") + # Can use filters + res = client.get('/api/processingnodes/?id={}'.format(pnode.id)) + self.assertEqual(res.status_code, status.HTTP_200_OK) + self.assertTrue(len(res.data) == 1) + + # Can filter online + res = client.get('/api/processingnodes/?online=true') + self.assertEqual(res.status_code, status.HTTP_200_OK) + self.assertTrue(len(res.data) == 0) + + res = client.get('/api/processingnodes/?online=false') + self.assertEqual(res.status_code, status.HTTP_200_OK) + self.assertTrue(len(res.data) == 1) + # Can get single processing node as normal user res = client.get('/api/processingnodes/{}/'.format(pnode.id)) self.assertEqual(res.status_code, status.HTTP_200_OK) self.assertTrue(res.data["hostname"] == "localhost") + # Cannot delete a processing node as normal user res = client.delete('/api/processingnodes/{}/'.format(pnode.id)) self.assertTrue(res.status_code, status.HTTP_403_FORBIDDEN) diff --git a/app/tests/test_app.py b/app/tests/test_app.py index cc4fc3ae..88a20187 100644 --- a/app/tests/test_app.py +++ b/app/tests/test_app.py @@ -4,6 +4,7 @@ from django.test import Client from app.models import Project, Task from .classes import BootTestCase +from app import scheduler class TestApp(BootTestCase): fixtures = ['test_processingnodes', ] @@ -119,3 +120,10 @@ class TestApp(BootTestCase): # Should not have permission self.assertFalse(anotherUser.has_perm("delete_project", p)) + + def test_scheduler(self): + # Can call update_nodes_info() + self.assertTrue(scheduler.update_nodes_info() == None) + + # Can call function in background + self.assertTrue(scheduler.update_nodes_info(background=True).join() == None) diff --git a/requirements.txt b/requirements.txt index f6255cc1..46223255 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ anyjson==0.3.3 +APScheduler==3.2.0 attrs==16.2.0 bravado==8.3.0 bravado-core==4.5.0 @@ -6,7 +7,6 @@ cffi==1.8.3 crochet==1.5.0 cryptography==1.5 Django==1.10 -django-background-task==0.1.8 django-common-helpers==0.8.0 django-filter==0.15.3 django-guardian==1.4.6 @@ -15,7 +15,9 @@ djangorestframework==3.5.1 drf-nested-routers==0.11.1 enum34==1.1.6 fido==3.2.0 +funcsigs==1.0.2 functools32==3.2.3.post2 +futures==3.0.5 idna==2.1 ipaddress==1.0.17 jsonschema==2.5.1 @@ -35,6 +37,7 @@ simplejson==3.8.2 six==1.10.0 swagger-spec-validator==2.0.2 Twisted==16.4.1 +tzlocal==1.3 yelp-bytes==0.3.0 yelp-encodings==0.1.3 zope.interface==4.3.2 diff --git a/webodm/settings.py b/webodm/settings.py index c5f05ddc..c81b6a51 100644 --- a/webodm/settings.py +++ b/webodm/settings.py @@ -10,7 +10,7 @@ For the full list of settings and their values, see https://docs.djangoproject.com/en/1.10/ref/settings/ """ -import os +import os, sys from django.contrib.messages import constants as messages # Build paths inside the project like this: os.path.join(BASE_DIR, ...) @@ -181,6 +181,10 @@ LOGGING = { 'app.logger': { 'handlers': ['console'], 'level': 'INFO', + }, + 'apscheduler.executors.default': { + 'handlers': ['console'], + 'level': 'WARN', } } } @@ -212,6 +216,8 @@ REST_FRAMEWORK = { 'PAGE_SIZE': 10, } +TESTING = sys.argv[1:2] == ['test'] + try: from .local_settings import * except ImportError: