Moved @background decorator to a separate module, added proof-of-concept intercept class

pull/94/head
Piero Toffanin 2017-02-05 22:23:02 -05:00
rodzic 792eee94e5
commit 26339e0c32
5 zmienionych plików z 77 dodań i 42 usunięć

55
app/background.py 100644
Wyświetl plik

@ -0,0 +1,55 @@
from threading import Thread
from django import db
from webodm import settings
# TODO: design class such that:
# 1. test cases can choose which functions to intercept (prevent from executing)
# 2. test cases can see how many times a function has been called (and with which parameters)
# 3. test cases can pause until a function has been called
class TestWatch:
    """
    Proof-of-concept call recorder for observing (and eventually
    intercepting) @background functions during testing.

    Recorded calls live in the class-level ``stats`` dict, keyed by
    function name, so every TestWatch instance shares the same record.
    TODO: design class such that:
    1. test cases can choose which functions to intercept (prevent from executing)
    2. test cases can see how many times a function has been called (and with which parameters)
    3. test cases can pause until a function has been called
    """

    # func name --> list of {'f': name, 'args': tuple, 'kwargs': dict} entries
    stats = {}

    def called(self, func, *args, **kwargs):
        """Record one invocation of *func* with the given arguments."""
        # BUG FIX: the original built a fresh list for an unseen func but
        # never stored it back into stats, so those calls were silently
        # dropped. setdefault both creates and registers the list.
        # (Debug print removed; tests can inspect TestWatch.stats directly.)
        calls = TestWatch.stats.setdefault(func, [])
        calls.append({'f': func, 'args': args, 'kwargs': kwargs})

    def clear(self):
        """Drop all recorded calls."""
        TestWatch.stats = {}


testWatch = TestWatch()
def background(func):
    """
    Adds background={True|False} param to any function
    so that we can call update_nodes_info(background=True) from the outside.

    With background=True the function runs on a daemon worker thread and the
    Thread object is returned; with background=False (the default) it runs
    inline and its result is returned. The 'background' keyword is consumed
    by the wrapper and never forwarded to *func*.
    """
    def wrapper(*args,**kwargs):
        # Extract and strip the control keyword so func never sees it.
        background = kwargs.get('background', False)
        if 'background' in kwargs: del kwargs['background']
        if background:
            if settings.TESTING:
                # NOTE(review): this comment's stated intent ("execute them on
                # the same thread") is not what the code below does — only the
                # call is recorded here, and a worker thread is still spawned
                # afterwards. Confirm whether the TESTING branch should
                # short-circuit with a synchronous call instead.
                # During testing, intercept all background requests and execute them on the same thread
                testWatch.called(func.__name__, *args, **kwargs)
            # Create a function that closes all
            # db connections at the end of the thread
            # This is necessary to make sure we don't leave
            # open connections lying around.
            def execute_and_close_db():
                ret = None
                try:
                    ret = func(*args, **kwargs)
                finally:
                    db.connections.close_all()
                return ret
            # Daemon thread: does not block interpreter shutdown.
            t = Thread(target=execute_and_close_db)
            t.daemon = True
            t.start()
            return t
        else:
            # Foreground path: plain synchronous call.
            return func(*args, **kwargs)
    return wrapper

Wyświetl plik

@ -1,15 +1,17 @@
import logging
import traceback
from multiprocessing.dummy import Pool as ThreadPool
from threading import Lock
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers import SchedulerAlreadyRunningError, SchedulerNotRunningError
from threading import Thread, Lock
from multiprocessing.dummy import Pool as ThreadPool
from nodeodm.models import ProcessingNode
from app.models import Task, Project
from django.db.models import Q, Count
from apscheduler.schedulers.background import BackgroundScheduler
from django import db
from django.db.models import Q, Count
from app.models import Task, Project
from nodeodm import status_codes
from nodeodm.models import ProcessingNode
from app.background import background
logger = logging.getLogger('app.logger')
scheduler = BackgroundScheduler({
@ -17,36 +19,6 @@ scheduler = BackgroundScheduler({
'apscheduler.job_defaults.max_instances': '3',
})
def background(func):
    """
    Adds background={True|False} param to any function
    so that we can call update_nodes_info(background=True) from the outside.

    When invoked with background=True the wrapped function runs on a worker
    thread and the Thread object is returned; otherwise it runs inline and
    its own result is returned. The 'background' keyword is consumed here
    and never forwarded to the wrapped function.
    """
    def wrapper(*args, **kwargs):
        # Pull the control flag out of kwargs so func never receives it.
        run_async = kwargs.pop('background', False)
        if not run_async:
            # Foreground path: plain synchronous call.
            return func(*args, **kwargs)

        def run_then_close_connections():
            # Close every db connection once the work is done so the
            # thread doesn't leave open connections lying around.
            try:
                return func(*args, **kwargs)
            finally:
                db.connections.close_all()

        worker = Thread(target=run_then_close_connections)
        worker.start()
        return worker
    return wrapper
@background
def update_nodes_info():
processing_nodes = ProcessingNode.objects.all()
@ -86,6 +58,8 @@ def process_pending_tasks():
task.save()
except Exception as e:
logger.error("Uncaught error: {} {}".format(e, traceback.format_exc()))
finally:
db.connections.close_all()
if tasks.count() > 0:
pool = ThreadPool(tasks.count())

Wyświetl plik

@ -56,6 +56,7 @@ class BootTestCase(TestCase):
def tearDownClass(cls):
super(BootTestCase, cls).tearDownClass()
class BootTransactionTestCase(TransactionTestCase):
'''
Same as above, but inherits from TransactionTestCase

Wyświetl plik

@ -15,8 +15,10 @@ from nodeodm import status_codes
from nodeodm.models import ProcessingNode
# We need to test the task API in a TransactionTestCase because
# processing happens on a separate thread. This is required by Django.
class TestApi(BootTransactionTestCase):
# task processing happens on a separate thread, and normal TestCases
# do not commit changes to the DB, so spawning a new thread will show no
# data in it.
class TestApiTask(BootTransactionTestCase):
def test_task(self):
DELAY = 1 # time to sleep for during process launch, background processing, etc.
client = APIClient()
@ -167,6 +169,10 @@ class TestApi(BootTransactionTestCase):
# On update scheduler.processing_pending_tasks should have been called in the background
time.sleep(DELAY)
print("HERE")
from app.background import testWatch
print(testWatch.stats)
# Processing should have completed
task.refresh_from_db()
self.assertTrue(task.status == status_codes.RUNNING)
@ -190,4 +196,4 @@ class TestApi(BootTransactionTestCase):
# Teardown processing node
node_odm.terminate()
time.sleep(20)
#time.sleep(20)

Wyświetl plik

@ -43,7 +43,7 @@ class ApiClient:
return requests.post(self.url('/task/restart'), data={'uuid': uuid}, timeout=TIMEOUT).json()
def task_download(self, uuid, asset):
res = requests.get(self.url('/task/{}/download/{}').format(uuid, asset), stream=True, timeout=TIMEOUT)
res = requests.get(self.url('/task/{}/download/{}').format(uuid, asset), stream=True)
if "Content-Type" in res.headers and "application/json" in res.headers['Content-Type']:
return res.json()
else:
@ -62,5 +62,4 @@ class ApiClient:
) for image in images]
return requests.post(self.url("/task/new"),
files=files,
data={'name': name, 'options': json.dumps(options)},
timeout=TIMEOUT).json()
data={'name': name, 'options': json.dumps(options)}).json()