Readded scheduler tests, added thread tracking

pull/40/head
Piero Toffanin 2016-10-29 19:25:21 -04:00
rodzic 2cf2c62fb5
commit b07be13d69
2 zmienionych plików z 15 dodań i 9 usunięć

Wyświetl plik

@ -10,8 +10,9 @@ import random
logger = logging.getLogger('app.logger') logger = logging.getLogger('app.logger')
scheduler = BackgroundScheduler() scheduler = BackgroundScheduler()
threads = [] # Keep track of alive threads
def job(func): def background(func):
""" """
Adds background={True|False} param to any function Adds background={True|False} param to any function
so that we can call update_nodes_info(background=True) from the outside so that we can call update_nodes_info(background=True) from the outside
@ -19,15 +20,20 @@ def job(func):
def wrapper(*args,**kwargs): def wrapper(*args,**kwargs):
background = kwargs.get('background', False) background = kwargs.get('background', False)
if background: if background:
# Need to initialize reference to variable in order
# to use it from a decorator: http://stackoverflow.com/questions/10913060/local-variable-referenced-before-assignment-for-decorator
threads_local = threads
t = Thread(target=func) t = Thread(target=func)
threads_local.append(t)
threads_local = filter(lambda u: u.is_alive(), threads_local)
t.start() t.start()
return t return t
else: else:
return func(*args, **kwargs) return func(*args, **kwargs)
return wrapper return wrapper
@background
@job
def update_nodes_info(): def update_nodes_info():
processing_nodes = ProcessingNode.objects.all() processing_nodes = ProcessingNode.objects.all()
for processing_node in processing_nodes: for processing_node in processing_nodes:
@ -36,7 +42,7 @@ def update_nodes_info():
tasks_mutex = Lock() tasks_mutex = Lock()
@job @background
def process_pending_tasks(): def process_pending_tasks():
tasks = [] tasks = []
try: try:
@ -76,6 +82,7 @@ def setup():
def teardown(): def teardown():
logger.info("Stopping scheduler...") logger.info("Stopping scheduler...")
try: try:
for t in threads: t.join()
scheduler.shutdown() scheduler.shutdown()
logger.info("Scheduler stopped") logger.info("Scheduler stopped")
except SchedulerNotRunningError: except SchedulerNotRunningError:

Wyświetl plik

@ -139,13 +139,12 @@ class TestApp(BootTestCase):
def test_scheduler(self): def test_scheduler(self):
pass self.assertTrue(scheduler.setup() == None)
#self.assertTrue(scheduler.setup() == None)
# Can call update_nodes_info() # Can call update_nodes_info()
#self.assertTrue(scheduler.update_nodes_info() == None) self.assertTrue(scheduler.update_nodes_info() == None)
# Can call function in background # Can call function in background
#self.assertTrue(scheduler.update_nodes_info(background=True).join() == None) self.assertTrue(scheduler.update_nodes_info(background=True).join() == None)
#self.assertTrue(scheduler.teardown() == None) self.assertTrue(scheduler.teardown() == None)