# OpenDroneMap-WebODM/worker/tasks.py

import os
import shutil
import tempfile
import traceback
import json
import socket
import time
from threading import Event, Thread
from celery.utils.log import get_task_logger
from django.core.exceptions import ObjectDoesNotExist
from django.db.models import Count
from django.db.models import Q
from app.models import Profile
from app.models import Project
from app.models import Task
from nodeodm import status_codes
from nodeodm.models import ProcessingNode
from webodm import settings
import worker
from .celery import app
from app.raster_utils import export_raster as export_raster_sync, extension_for_export_format
from app.pointcloud_utils import export_pointcloud as export_pointcloud_sync
from django.utils import timezone
from datetime import timedelta
import redis
logger = get_task_logger("app.logger")

redis_client = redis.Redis.from_url(settings.CELERY_BROKER_URL)

# What class to use for async results, since during testing we need to mock it
TestSafeAsyncResult = worker.celery.MockAsyncResult if settings.TESTING else app.AsyncResult

@app.task(ignore_result=True)
def update_nodes_info():
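    # Refresh the stored information for each registered processing node and,
    # on Mac setups, try to recover from the hostname switcharoo described below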
    if settings.NODE_OPTIMISTIC_MODE:
        return

    processing_nodes = ProcessingNode.objects.all()
    for processing_node in processing_nodes:
        processing_node.update_node_info()

        # Workaround for mysterious "webodm_node-odm-1" or "webodm-node-odm-1" hostname switcharoo on Mac
        # Technically we already check for the correct hostname during setup,
        # but sometimes that doesn't work?
        check_hostname = 'webodm_node-odm-1'
        if processing_node.hostname == check_hostname and not processing_node.is_online():
            try:
                socket.gethostbyname(processing_node.hostname)
            except:
                # Hostname was invalid, try renaming
                processing_node.hostname = 'webodm-node-odm-1'
                processing_node.update_node_info()

                if processing_node.is_online():
                    logger.info("Found and fixed webodm_node-odm-1 hostname switcharoo")
                else:
                    processing_node.hostname = check_hostname

                processing_node.save()

@app.task(ignore_result=True)
def cleanup_projects():
    # Delete all projects that are marked for deletion
    # and that have no tasks left
    total, count_dict = Project.objects.filter(deleting=True).annotate(
        tasks_count=Count('task')
    ).filter(tasks_count=0).delete()

    if total > 0 and 'app.Project' in count_dict:
        logger.info("Deleted {} projects".format(count_dict['app.Project']))

@app.task(ignore_result=True)
def cleanup_tasks():
    # Delete partial tasks that were created more than
    # CLEANUP_PARTIAL_TASKS hours ago
    if settings.CLEANUP_PARTIAL_TASKS is None:
        return

    tasks_to_delete = Task.objects.filter(partial=True, created_at__lte=timezone.now() - timedelta(hours=settings.CLEANUP_PARTIAL_TASKS))
    for t in tasks_to_delete:
        logger.info("Cleaning up partial task {}".format(t))
        t.delete()

@app.task(ignore_result=True)
def cleanup_tmp_directory():
    # Delete files and folders in the tmp directory that are
    # older than 24 hours
    tmpdir = settings.MEDIA_TMP
    time_limit = 60 * 60 * 24

    for f in os.listdir(tmpdir):
        now = time.time()
        filepath = os.path.join(tmpdir, f)
        modified = os.stat(filepath).st_mtime
        if modified < now - time_limit:
            if os.path.isfile(filepath):
                os.remove(filepath)
            else:
                shutil.rmtree(filepath, ignore_errors=True)

            logger.info('Cleaned up: %s (%s)' % (f, modified))

# Based on https://stackoverflow.com/questions/22498038/improve-current-implementation-of-a-setinterval-python/22498708#22498708
def setInterval(interval, func, *args):
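    # Calls func(*args) every `interval` seconds on a daemon thread.
    # Returns a function that stops the timer when invoked.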
    stopped = Event()

    def loop():
        while not stopped.wait(interval):
            func(*args)

    t = Thread(target=loop)
    t.daemon = True
    t.start()

    return stopped.set

@app.task(ignore_result=True)
def process_task(taskId):
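    # Runs Task.process() for the given task id. A Redis key is used as a lock
    # so that two workers don't process the same task concurrently; a background
    # timer refreshes the lock every 5 seconds while processing runs.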
    lock_id = 'task_lock_{}'.format(taskId)
    cancel_monitor = None
    delete_lock = True

    try:
        task_lock_last_update = redis_client.getset(lock_id, time.time())
        if task_lock_last_update is not None:
            # Check if lock has expired
            if time.time() - float(task_lock_last_update) <= 30:
                # Locked
                delete_lock = False
                return
            else:
                # Expired
                logger.warning("Task {} has an expired lock! This could mean that WebODM is running out of memory. Check your server configuration.".format(taskId))

        # Set lock
        def update_lock():
            redis_client.set(lock_id, time.time())
        cancel_monitor = setInterval(5, update_lock)

        try:
            task = Task.objects.get(pk=taskId)
        except ObjectDoesNotExist:
            logger.info("Task {} has already been deleted.".format(taskId))
            return

        try:
            task.process()
        except Exception as e:
            logger.error(
                "Uncaught error! This is potentially bad. Please report it to http://github.com/OpenDroneMap/WebODM/issues: {} {}".format(
                    e, traceback.format_exc()))
            if settings.TESTING: raise e
    finally:
        if cancel_monitor is not None:
            cancel_monitor()

        if delete_lock:
            try:
                redis_client.delete(lock_id)
            except redis.exceptions.RedisError:
                # Ignore errors, the lock will expire at some point
                pass

def get_pending_tasks():
    # All tasks that have a processing node assigned
    # Or that need one assigned (via auto)
    # or tasks that need a status update
    # or tasks that have a pending action
    # no partial tasks allowed
    return Task.objects.filter(Q(processing_node__isnull=True, auto_processing_node=True, partial=False) |
                               Q(Q(status=None) | Q(status__in=[status_codes.QUEUED, status_codes.RUNNING]),
                                 processing_node__isnull=False, partial=False) |
                               Q(pending_action__isnull=False, partial=False))

@app.task(ignore_result=True)
def process_pending_tasks():
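    # Queue a process_task job for every task returned by get_pending_tasks()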
    tasks = get_pending_tasks()
    for task in tasks:
        process_task.delay(task.id)

@app.task(bind=True)
def export_raster(self, input, **opts):
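    # Export the raster at `input` to a temporary file in the requested format
    # and return its path, or an error message on failure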
    try:
        logger.info("Exporting raster {} with options: {}".format(input, json.dumps(opts)))
        tmpfile = tempfile.mktemp('_raster.{}'.format(extension_for_export_format(opts.get('format', 'gtiff'))), dir=settings.MEDIA_TMP)
        export_raster_sync(input, tmpfile, **opts)
        result = {'file': tmpfile}

        if settings.TESTING:
            TestSafeAsyncResult.set(self.request.id, result)

        return result
    except Exception as e:
        logger.error(str(e))
        return {'error': str(e)}

@app.task(bind=True)
def export_pointcloud(self, input, **opts):
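    # Export the point cloud at `input` to a temporary file in the requested format
    # and return its path, or an error message on failure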
    try:
        logger.info("Exporting point cloud {} with options: {}".format(input, json.dumps(opts)))
        tmpfile = tempfile.mktemp('_pointcloud.{}'.format(opts.get('format', 'laz')), dir=settings.MEDIA_TMP)
        export_pointcloud_sync(input, tmpfile, **opts)
        result = {'file': tmpfile}

        if settings.TESTING:
            TestSafeAsyncResult.set(self.request.id, result)

        return result
    except Exception as e:
        logger.error(str(e))
        return {'error': str(e)}

@app.task(ignore_result=True)
def check_quotas():
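    # For each profile with a quota set, delete the newest tasks once the quota
    # has been exceeded for longer than the grace period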
    profiles = Profile.objects.filter(quota__gt=-1)
    for p in profiles:
        if p.has_exceeded_quota():
            deadline = p.get_quota_deadline()
            if deadline is None:
                deadline = p.set_quota_deadline(settings.QUOTA_EXCEEDED_GRACE_PERIOD)

            now = time.time()
            if now > deadline:
                # Deadline passed, delete tasks until the quota is met
                logger.info("Quota deadline expired for %s, deleting tasks" % str(p.user.username))
                task_count = Task.objects.filter(project__owner=p.user).count()
                c = 0

                while p.has_exceeded_quota():
                    try:
                        last_task = Task.objects.filter(project__owner=p.user).order_by("-created_at").first()
                        if last_task is None:
                            break
                        logger.info("Deleting %s" % last_task)
                        last_task.delete()
                    except Exception as e:
                        logger.warning("Cannot delete %s for %s: %s" % (str(last_task), str(p.user.username), str(e)))

                    c += 1
                    if c >= task_count:
                        break
        else:
            p.clear_quota_deadline()