Added scheduler, automatically updating processing nodes, online filter for API

pull/39/head
Piero Toffanin 2016-10-24 14:14:35 -04:00
rodzic 0e59155a0f
commit f97f6431cf
13 zmienionych plików z 145 dodań i 12 usunięć

Wyświetl plik

@ -3,13 +3,36 @@ from rest_framework.response import Response
from rest_framework.decorators import permission_classes
from rest_framework.permissions import DjangoModelPermissions
from rest_framework.filters import DjangoFilterBackend
from django_filters.rest_framework import FilterSet
from nodeodm.models import ProcessingNode
import django_filters
from django.utils import timezone
from datetime import timedelta
from django.db.models import Q
class ProcessingNodeSerializer(serializers.ModelSerializer):
class Meta:
model = ProcessingNode
fields = '__all__'
class ProcessingNodeFilter(FilterSet):
online = django_filters.MethodFilter()
def filter_online(self, queryset, value):
online_threshold = timezone.now() - timedelta(minutes=5)
if value.lower() in ['true', '1']:
return queryset.filter(last_refreshed__isnull=False, last_refreshed__gte=online_threshold)
elif value.lower() in ['false', '0']:
return queryset.filter(Q(last_refreshed__isnull=True) | Q(last_refreshed__lt=online_threshold))
return queryset
class Meta:
model = ProcessingNode
fields = ['online', 'id', 'hostname', 'port', 'api_version', 'queue_count', ]
class ProcessingNodeViewSet(viewsets.ModelViewSet):
"""
Processing nodes available. Processing nodes are associated with
@ -18,7 +41,10 @@ class ProcessingNodeViewSet(viewsets.ModelViewSet):
# Don't need a "view node" permission. If you are logged-in, you can view nodes.
permission_classes = (DjangoModelPermissions, )
filter_backends = (DjangoFilterBackend, )
filter_class = ProcessingNodeFilter
pagination_class = None
serializer_class = ProcessingNodeSerializer
queryset = ProcessingNode.objects.all()

Wyświetl plik

@ -3,7 +3,7 @@ from django.core.exceptions import ObjectDoesNotExist
from rest_framework import status, serializers, viewsets, filters, exceptions, permissions, parsers
from rest_framework.response import Response
from rest_framework.decorators import parser_classes, api_view
from app import models
from app import models, scheduler
from nodeodm.models import ProcessingNode
class TaskIDsSerializer(serializers.BaseSerializer):
@ -84,6 +84,10 @@ class TaskViewSet(viewsets.ViewSet):
serializer = TaskSerializer(task, data=request.data, partial=partial)
serializer.is_valid(raise_exception=True)
serializer.save()
# Call the scheduler (speed things up)
scheduler.process_pending_tasks(background=True)
return Response(serializer.data)
def partial_update(self, request, *args, **kwargs):

Wyświetl plik

@ -2,10 +2,13 @@ from __future__ import unicode_literals
from django.apps import AppConfig
from .boot import boot
from webodm import settings
class MainConfig(AppConfig):
name = 'app'
verbose_name = 'Application'
def ready(self):
# Test cases call boot() independently
if not settings.TESTING:
boot()

Wyświetl plik

@ -3,8 +3,8 @@ def boot():
from django.contrib.auth.models import Permission
from django.contrib.auth.models import User, Group
from django.db.utils import ProgrammingError
from . import signals
import logging
from . import signals, scheduler
import logging, os
logger = logging.getLogger('app.logger')
@ -26,3 +26,9 @@ def boot():
logger.info("Created superuser")
except ProgrammingError:
logger.warn("Could not create default group/user. If running a migration, this is expected.")
# Run only on the main runserver process
# (do not start again on the auto-reloader process)
if os.environ.get('RUN_MAIN') != 'true':
# Setup and start scheduler
scheduler.setup()

54
app/scheduler.py 100644
Wyświetl plik

@ -0,0 +1,54 @@
import logging
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers import SchedulerAlreadyRunningError, SchedulerNotRunningError
import threading
from nodeodm.models import ProcessingNode
from app.models import Task
import random
logger = logging.getLogger('app.logger')
scheduler = None
# Adds background={True|False} param to any function
# So that we can call update_nodes_info(background=True) from the outside
def job(func):
def wrapper(*args,**kwargs):
if (kwargs.get('background', False)):
thread = (threading.Thread(target=func))
thread.start()
return thread
else:
return func(*args, **kwargs)
return wrapper
@job
def update_nodes_info():
processing_nodes = ProcessingNode.objects.all()
for processing_node in processing_nodes:
processing_node.update_node_info()
@job
def process_pending_tasks():
tasks = Task.objects.filter(uuid=None).exclude(processing_node=None)
for task in tasks:
print("Need to process: {}".format(task))
def setup():
global scheduler
logger.info("Starting background scheduler...")
try:
scheduler = BackgroundScheduler()
scheduler.start()
scheduler.add_job(update_nodes_info, 'interval', seconds=30)
scheduler.add_job(process_pending_tasks, 'interval', seconds=15)
except SchedulerAlreadyRunningError:
logger.warn("Scheduler already running (this is OK while testing)")
def teardown():
if scheduler != None:
logger.info("Stopping scheduler...")
try:
scheduler.shutdown(wait=False)
except SchedulerNotRunningError:
logger.warn("Scheduler not running")

Wyświetl plik

@ -36,9 +36,9 @@ class EditTaskPanel extends React.Component {
setTimeout(loadProcessingNodes, 1000);
}
this.nodesRequest = $.getJSON("/api/processingnodes/", json => {
this.nodesRequest =
$.getJSON("/api/processingnodes/?online=True", json => {
if (Array.isArray(json)){
let nodes = json.map(node => {
return {
id: node.id,

Wyświetl plik

@ -16,7 +16,8 @@ class ProjectList extends React.Component {
componentDidMount(){
// Load projects from API
this.serverRequest = $.getJSON(this.props.source, json => {
this.serverRequest =
$.getJSON(this.props.source, json => {
if (json.results){
this.setState({
projects: json.results,

Wyświetl plik

@ -133,7 +133,8 @@ class ProjectListItem extends React.Component {
this.setUploadState({showEditTask: false});
this.setState({updatingTask: true});
this.updateTaskRequest = $.ajax({
this.updateTaskRequest =
$.ajax({
url: `/api/projects/${this.props.data.id}/tasks/${this.state.upload.taskId}/`,
contentType: 'application/json',
data: JSON.stringify({

Wyświetl plik

@ -2,6 +2,7 @@ from django.test import TestCase
from django.contrib.auth.models import User, Group
from app.models import Project
from app.boot import boot
from app import scheduler
class BootTestCase(TestCase):
'''
@ -10,7 +11,7 @@ class BootTestCase(TestCase):
module should derive from this class instead of TestCase.
We don't use fixtures because we have signal initialization login
for some models, which doesn't play well with them, and this: http://blog.namis.me/2012/04/21/burn-your-fixtures/
for some models, which doesn't play well with them.
'''
@classmethod
def setUpClass(cls):
@ -46,3 +47,8 @@ class BootTestCase(TestCase):
boot()
setupUsers()
setupProjects()
@classmethod
def tearDownClass(cls):
super(BootTestCase, cls).tearDownClass()
scheduler.teardown()

Wyświetl plik

@ -137,11 +137,26 @@ class TestApi(BootTestCase):
self.assertTrue(len(res.data) == 1)
self.assertTrue(res.data[0]["hostname"] == "localhost")
# Can use filters
res = client.get('/api/processingnodes/?id={}'.format(pnode.id))
self.assertEqual(res.status_code, status.HTTP_200_OK)
self.assertTrue(len(res.data) == 1)
# Can filter online
res = client.get('/api/processingnodes/?online=true')
self.assertEqual(res.status_code, status.HTTP_200_OK)
self.assertTrue(len(res.data) == 0)
res = client.get('/api/processingnodes/?online=false')
self.assertEqual(res.status_code, status.HTTP_200_OK)
self.assertTrue(len(res.data) == 1)
# Can get single processing node as normal user
res = client.get('/api/processingnodes/{}/'.format(pnode.id))
self.assertEqual(res.status_code, status.HTTP_200_OK)
self.assertTrue(res.data["hostname"] == "localhost")
# Cannot delete a processing node as normal user
res = client.delete('/api/processingnodes/{}/'.format(pnode.id))
self.assertTrue(res.status_code, status.HTTP_403_FORBIDDEN)

Wyświetl plik

@ -4,6 +4,7 @@ from django.test import Client
from app.models import Project, Task
from .classes import BootTestCase
from app import scheduler
class TestApp(BootTestCase):
fixtures = ['test_processingnodes', ]
@ -119,3 +120,10 @@ class TestApp(BootTestCase):
# Should not have permission
self.assertFalse(anotherUser.has_perm("delete_project", p))
def test_scheduler(self):
# Can call update_nodes_info()
self.assertTrue(scheduler.update_nodes_info() == None)
# Can call function in background
self.assertTrue(scheduler.update_nodes_info(background=True).join() == None)

Wyświetl plik

@ -1,4 +1,5 @@
anyjson==0.3.3
APScheduler==3.2.0
attrs==16.2.0
bravado==8.3.0
bravado-core==4.5.0
@ -6,7 +7,6 @@ cffi==1.8.3
crochet==1.5.0
cryptography==1.5
Django==1.10
django-background-task==0.1.8
django-common-helpers==0.8.0
django-filter==0.15.3
django-guardian==1.4.6
@ -15,7 +15,9 @@ djangorestframework==3.5.1
drf-nested-routers==0.11.1
enum34==1.1.6
fido==3.2.0
funcsigs==1.0.2
functools32==3.2.3.post2
futures==3.0.5
idna==2.1
ipaddress==1.0.17
jsonschema==2.5.1
@ -35,6 +37,7 @@ simplejson==3.8.2
six==1.10.0
swagger-spec-validator==2.0.2
Twisted==16.4.1
tzlocal==1.3
yelp-bytes==0.3.0
yelp-encodings==0.1.3
zope.interface==4.3.2

Wyświetl plik

@ -10,7 +10,7 @@ For the full list of settings and their values, see
https://docs.djangoproject.com/en/1.10/ref/settings/
"""
import os
import os, sys
from django.contrib.messages import constants as messages
# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
@ -181,6 +181,10 @@ LOGGING = {
'app.logger': {
'handlers': ['console'],
'level': 'INFO',
},
'apscheduler.executors.default': {
'handlers': ['console'],
'level': 'WARN',
}
}
}
@ -212,6 +216,8 @@ REST_FRAMEWORK = {
'PAGE_SIZE': 10,
}
TESTING = sys.argv[1:2] == ['test']
try:
from .local_settings import *
except ImportError: