kopia lustrzana https://github.com/OpenDroneMap/WebODM
				
				
				
			
		
			
				
	
	
		
			83 wiersze
		
	
	
		
			2.7 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			83 wiersze
		
	
	
		
			2.7 KiB
		
	
	
	
		
			Python
		
	
	
| import time
 | |
| 
 | |
| import logging
 | |
| 
 | |
| from webodm import settings
 | |
| 
 | |
| logger = logging.getLogger('app.logger')
 | |
| 
 | |
| class TestWatch:
 | |
|     def __init__(self):
 | |
|         self.clear()
 | |
| 
 | |
|     def clear(self):
 | |
|         self._calls = {}
 | |
|         self._intercept_list = {}
 | |
| 
 | |
|     def func_to_name(f):
 | |
|         return "{}.{}".format(f.__module__, f.__name__)
 | |
| 
 | |
|     def intercept(self, fname, f = None):
 | |
|         self._intercept_list[fname] = f if f is not None else True
 | |
| 
 | |
|     def execute_intercept_function_replacement(self, fname, *args, **kwargs):
 | |
|         if fname in self._intercept_list and callable(self._intercept_list[fname]):
 | |
|             (self._intercept_list[fname])(*args, **kwargs)
 | |
| 
 | |
|     def should_prevent_execution(self, func):
 | |
|         return TestWatch.func_to_name(func) in self._intercept_list
 | |
| 
 | |
|     def get_calls(self, fname):
 | |
|         return self._calls[fname] if fname in self._calls else []
 | |
| 
 | |
|     def get_calls_count(self, fname):
 | |
|         return len(self.get_calls(fname))
 | |
| 
 | |
|     def wait_until_call(self, fname, count = 1, timeout = 30):
 | |
|         SLEEP_INTERVAL = 0.125
 | |
|         TIMEOUT_LIMIT = timeout / SLEEP_INTERVAL
 | |
|         c = 0
 | |
|         while self.get_calls_count(fname) < count and c < TIMEOUT_LIMIT:
 | |
|             time.sleep(SLEEP_INTERVAL)
 | |
|             c += 1
 | |
| 
 | |
|         if c >= TIMEOUT_LIMIT:
 | |
|             raise TimeoutError("wait_until_call has timed out waiting for {}".format(fname))
 | |
| 
 | |
|         return self.get_calls(fname)
 | |
| 
 | |
|     def log_call(self, func, *args, **kwargs):
 | |
|         fname = TestWatch.func_to_name(func)
 | |
|         logger.info("{} called".format(fname))
 | |
|         list = self._calls[fname] if fname in self._calls else []
 | |
|         list.append({'f': fname, 'args': args, 'kwargs': kwargs})
 | |
|         self._calls[fname] = list
 | |
| 
 | |
|     def hook_pre(self, func, *args, **kwargs):
 | |
|         if settings.TESTING and self.should_prevent_execution(func):
 | |
|             fname = TestWatch.func_to_name(func)
 | |
|             logger.info(fname + " intercepted")
 | |
|             self.execute_intercept_function_replacement(fname, *args, **kwargs)
 | |
|             self.log_call(func, *args, **kwargs)
 | |
|             return True # Intercept
 | |
|         return False # Do not intercept
 | |
| 
 | |
|     def hook_post(self, func, *args, **kwargs):
 | |
|         if settings.TESTING:
 | |
|             self.log_call(func, *args, **kwargs)
 | |
| 
 | |
|     def watch(**kwargs):
 | |
|         """
 | |
|         Decorator that adds pre/post hook calls
 | |
|         """
 | |
|         tw = kwargs.get('testWatch', testWatch)
 | |
|         def outer(func):
 | |
|             def wrapper(*args, **kwargs):
 | |
|                 if tw.hook_pre(func, *args, **kwargs): return
 | |
|                 ret = func(*args, **kwargs)
 | |
|                 tw.hook_post(func, *args, **kwargs)
 | |
|                 return ret
 | |
|             return wrapper
 | |
|         return outer
 | |
| 
 | |
| testWatch = TestWatch() |