kopia lustrzana https://github.com/OpenDroneMap/WebODM
				
				
				
			
		
			
				
	
	
		
			242 wiersze
		
	
	
		
			8.7 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			242 wiersze
		
	
	
		
			8.7 KiB
		
	
	
	
		
			Python
		
	
	
| import os
 | |
| import shutil
 | |
| import tempfile
 | |
| import traceback
 | |
| import json
 | |
| import socket
 | |
| 
 | |
| import time
 | |
| from threading import Event, Thread
 | |
| from celery.utils.log import get_task_logger
 | |
| from django.core.exceptions import ObjectDoesNotExist
 | |
| from django.db.models import Count
 | |
| from django.db.models import Q
 | |
| from app.models import Profile
 | |
| 
 | |
| from app.models import Project
 | |
| from app.models import Task
 | |
| from nodeodm import status_codes
 | |
| from nodeodm.models import ProcessingNode
 | |
| from webodm import settings
 | |
| import worker
 | |
| from .celery import app
 | |
| from app.raster_utils import export_raster as export_raster_sync, extension_for_export_format
 | |
| from app.pointcloud_utils import export_pointcloud as export_pointcloud_sync
 | |
| from django.utils import timezone
 | |
| from datetime import timedelta
 | |
| import redis
 | |
| 
 | |
| logger = get_task_logger("app.logger")
 | |
| redis_client = redis.Redis.from_url(settings.CELERY_BROKER_URL)
 | |
| 
 | |
| # What class to use for async results, since during testing we need to mock it
 | |
| TestSafeAsyncResult = worker.celery.MockAsyncResult if settings.TESTING else app.AsyncResult
 | |
| 
 | |
| @app.task(ignore_result=True)
 | |
| def update_nodes_info():
 | |
|     if settings.NODE_OPTIMISTIC_MODE:
 | |
|         return
 | |
|     
 | |
|     processing_nodes = ProcessingNode.objects.all()
 | |
|     for processing_node in processing_nodes:
 | |
|         processing_node.update_node_info()
 | |
| 
 | |
|         # Workaround for mysterious "webodm_node-odm-1" or "webodm-node-odm-1" hostname switcharoo on Mac
 | |
|         # Technically we already check for the correct hostname during setup, 
 | |
|         # but sometimes that doesn't work?
 | |
|         check_hostname = 'webodm_node-odm-1'
 | |
|         if processing_node.hostname == check_hostname and not processing_node.is_online():
 | |
|             try:
 | |
|                 socket.gethostbyname(processing_node.hostname)
 | |
|             except:
 | |
|                 # Hostname was invalid, try renaming
 | |
|                 processing_node.hostname = 'webodm-node-odm-1'
 | |
|                 processing_node.update_node_info()
 | |
|                 if processing_node.is_online():
 | |
|                     logger.info("Found and fixed webodm_node-odm-1 hostname switcharoo")
 | |
|                 else:
 | |
|                     processing_node.hostname = check_hostname
 | |
|                 processing_node.save()
 | |
| 
 | |
| @app.task(ignore_result=True)
 | |
| def cleanup_projects():
 | |
|     # Delete all projects that are marked for deletion
 | |
|     # and that have no tasks left
 | |
|     total, count_dict = Project.objects.filter(deleting=True).annotate(
 | |
|         tasks_count=Count('task')
 | |
|     ).filter(tasks_count=0).delete()
 | |
|     if total > 0 and 'app.Project' in count_dict:
 | |
|         logger.info("Deleted {} projects".format(count_dict['app.Project']))
 | |
| 
 | |
| @app.task(ignore_result=True)
 | |
| def cleanup_tasks():
 | |
|     # Delete tasks that are older than 
 | |
|     if settings.CLEANUP_PARTIAL_TASKS is None:
 | |
|         return
 | |
|     
 | |
|     tasks_to_delete = Task.objects.filter(partial=True, created_at__lte=timezone.now() - timedelta(hours=settings.CLEANUP_PARTIAL_TASKS))
 | |
|     for t in tasks_to_delete:
 | |
|         logger.info("Cleaning up partial task {}".format(t))
 | |
|         t.delete()
 | |
| 
 | |
| @app.task(ignore_result=True)
 | |
| def cleanup_tmp_directory():
 | |
|     # Delete files and folder in the tmp directory that are
 | |
|     # older than 24 hours
 | |
|     tmpdir = settings.MEDIA_TMP
 | |
|     time_limit = 60 * 60 * 24
 | |
| 
 | |
|     for f in os.listdir(tmpdir):
 | |
|         now = time.time()
 | |
|         filepath = os.path.join(tmpdir, f)
 | |
|         modified = os.stat(filepath).st_mtime
 | |
|         if modified < now - time_limit:
 | |
|             if os.path.isfile(filepath):
 | |
|                 os.remove(filepath)
 | |
|             else:
 | |
|                 shutil.rmtree(filepath, ignore_errors=True)
 | |
| 
 | |
|             logger.info('Cleaned up: %s (%s)' % (f, modified))
 | |
| 
 | |
| 
 | |
| # Based on https://stackoverflow.com/questions/22498038/improve-current-implementation-of-a-setinterval-python/22498708#22498708
 | |
| def setInterval(interval, func, *args):
 | |
|     stopped = Event()
 | |
|     def loop():
 | |
|         while not stopped.wait(interval):
 | |
|             func(*args)
 | |
|     t = Thread(target=loop)
 | |
|     t.daemon = True
 | |
|     t.start()
 | |
|     return stopped.set
 | |
| 
 | |
| @app.task(ignore_result=True)
 | |
| def process_task(taskId):
 | |
|     lock_id = 'task_lock_{}'.format(taskId)
 | |
|     cancel_monitor = None
 | |
|     delete_lock = True
 | |
| 
 | |
|     try:
 | |
|         task_lock_last_update = redis_client.getset(lock_id, time.time())
 | |
|         if task_lock_last_update is not None:
 | |
|             # Check if lock has expired
 | |
|             if time.time() - float(task_lock_last_update) <= 30:
 | |
|                 # Locked
 | |
|                 delete_lock = False
 | |
|                 return
 | |
|             else:
 | |
|                 # Expired
 | |
|                 logger.warning("Task {} has an expired lock! This could mean that WebODM is running out of memory. Check your server configuration.".format(taskId))
 | |
| 
 | |
|         # Set lock
 | |
|         def update_lock():
 | |
|             redis_client.set(lock_id, time.time())
 | |
|         cancel_monitor = setInterval(5, update_lock)
 | |
| 
 | |
|         try:
 | |
|             task = Task.objects.get(pk=taskId)
 | |
|         except ObjectDoesNotExist:
 | |
|             logger.info("Task {} has already been deleted.".format(taskId))
 | |
|             return
 | |
| 
 | |
|         try:
 | |
|             task.process()
 | |
|         except Exception as e:
 | |
|             logger.error(
 | |
|                 "Uncaught error! This is potentially bad. Please report it to http://github.com/OpenDroneMap/WebODM/issues: {} {}".format(
 | |
|                     e, traceback.format_exc()))
 | |
|             if settings.TESTING: raise e
 | |
|     finally:
 | |
|         if cancel_monitor is not None:
 | |
|             cancel_monitor()
 | |
| 
 | |
|         if delete_lock:
 | |
|             try:
 | |
|                 redis_client.delete(lock_id)
 | |
|             except redis.exceptions.RedisError:
 | |
|                 # Ignore errors, the lock will expire at some point
 | |
|                 pass
 | |
| 
 | |
| 
 | |
| 
 | |
| def get_pending_tasks():
 | |
|     # All tasks that have a processing node assigned
 | |
|     # Or that need one assigned (via auto)
 | |
|     # or tasks that need a status update
 | |
|     # or tasks that have a pending action
 | |
|     # no partial tasks allowed
 | |
|     return Task.objects.filter(Q(processing_node__isnull=True, auto_processing_node=True, partial=False) |
 | |
|                                 Q(Q(status=None) | Q(status__in=[status_codes.QUEUED, status_codes.RUNNING]),
 | |
|                                   processing_node__isnull=False, partial=False) |
 | |
|                                 Q(pending_action__isnull=False, partial=False))
 | |
| 
 | |
| @app.task(ignore_result=True)
 | |
| def process_pending_tasks():
 | |
|     tasks = get_pending_tasks()
 | |
|     for task in tasks:
 | |
|         process_task.delay(task.id)
 | |
| 
 | |
| 
 | |
| @app.task(bind=True)
 | |
| def export_raster(self, input, **opts):
 | |
|     try:
 | |
|         logger.info("Exporting raster {} with options: {}".format(input, json.dumps(opts)))
 | |
|         tmpfile = tempfile.mktemp('_raster.{}'.format(extension_for_export_format(opts.get('format', 'gtiff'))), dir=settings.MEDIA_TMP)
 | |
|         export_raster_sync(input, tmpfile, **opts)
 | |
|         result = {'file': tmpfile}
 | |
| 
 | |
|         if settings.TESTING:
 | |
|             TestSafeAsyncResult.set(self.request.id, result)
 | |
| 
 | |
|         return result
 | |
|     except Exception as e:
 | |
|         logger.error(str(e))
 | |
|         return {'error': str(e)}
 | |
| 
 | |
| @app.task(bind=True)
 | |
| def export_pointcloud(self, input, **opts):
 | |
|     try:
 | |
|         logger.info("Exporting point cloud {} with options: {}".format(input, json.dumps(opts)))
 | |
|         tmpfile = tempfile.mktemp('_pointcloud.{}'.format(opts.get('format', 'laz')), dir=settings.MEDIA_TMP)
 | |
|         export_pointcloud_sync(input, tmpfile, **opts)
 | |
|         result = {'file': tmpfile}
 | |
| 
 | |
|         if settings.TESTING:
 | |
|             TestSafeAsyncResult.set(self.request.id, result)
 | |
| 
 | |
|         return result
 | |
|     except Exception as e:
 | |
|         logger.error(str(e))
 | |
|         return {'error': str(e)}
 | |
| 
 | |
| @app.task(ignore_result=True)
 | |
| def check_quotas():
 | |
|     profiles = Profile.objects.filter(quota__gt=-1)
 | |
|     for p in profiles:
 | |
|         if p.has_exceeded_quota():
 | |
|             deadline = p.get_quota_deadline()
 | |
|             if deadline is None:
 | |
|                 deadline = p.set_quota_deadline(settings.QUOTA_EXCEEDED_GRACE_PERIOD)
 | |
|             now = time.time()
 | |
|             if now > deadline:
 | |
|                 # deadline passed, delete tasks until quota is met
 | |
|                 logger.info("Quota deadline expired for %s, deleting tasks" % str(p.user.username))
 | |
|                 task_count = Task.objects.filter(project__owner=p.user).count()
 | |
|                 c = 0
 | |
| 
 | |
|                 while p.has_exceeded_quota():
 | |
|                     try:
 | |
|                         last_task = Task.objects.filter(project__owner=p.user).order_by("-created_at").first()
 | |
|                         if last_task is None:
 | |
|                             break
 | |
|                         logger.info("Deleting %s" % last_task)
 | |
|                         last_task.delete()
 | |
|                     except Exception as e:
 | |
|                         logger.warn("Cannot delete %s for %s: %s" % (str(last_task), str(p.user.username), str(e)))
 | |
|                     
 | |
|                     c += 1
 | |
|                     if c >= task_count:
 | |
|                         break
 | |
|         else:
 | |
|             p.clear_quota_deadline()
 |