OpenDroneMap-WebODM/worker/tasks.py

242 wiersze
8.7 KiB
Python
Czysty Zwykły widok Historia

2019-04-01 20:49:56 +00:00
import os
import shutil
2020-01-17 18:19:03 +00:00
import tempfile
import traceback
2021-11-01 19:51:40 +00:00
import json
2022-12-14 06:19:36 +00:00
import socket
2019-04-01 20:49:56 +00:00
import time
2019-06-14 15:15:24 +00:00
from threading import Event, Thread
from celery.utils.log import get_task_logger
from django.core.exceptions import ObjectDoesNotExist
from django.db.models import Count
from django.db.models import Q
from app.models import Profile
from app.models import Project
from app.models import Task
from nodeodm import status_codes
from nodeodm.models import ProcessingNode
from webodm import settings
2020-01-20 21:52:01 +00:00
import worker
from .celery import app
2021-11-01 19:51:40 +00:00
from app.raster_utils import export_raster as export_raster_sync, extension_for_export_format
2021-11-04 17:27:36 +00:00
from app.pointcloud_utils import export_pointcloud as export_pointcloud_sync
2023-11-06 16:21:10 +00:00
from django.utils import timezone
from datetime import timedelta
import redis
2019-06-19 14:31:51 +00:00
logger = get_task_logger("app.logger")

# Shared Redis connection for distributed task locks (see process_task).
# NOTE(review): reuses the Celery broker URL, so this assumes the broker is
# Redis — confirm against deployment configuration.
redis_client = redis.Redis.from_url(settings.CELERY_BROKER_URL)

# What class to use for async results, since during testing we need to mock it
TestSafeAsyncResult = worker.celery.MockAsyncResult if settings.TESTING else app.AsyncResult
2023-10-04 17:34:31 +00:00
@app.task(ignore_result=True)
def update_nodes_info():
    """Refresh the cached status/capability information of every processing node.

    No-op when NODE_OPTIMISTIC_MODE is enabled (nodes are assumed online).
    """
    if settings.NODE_OPTIMISTIC_MODE:
        return

    processing_nodes = ProcessingNode.objects.all()
    for processing_node in processing_nodes:
        processing_node.update_node_info()

        # Workaround for mysterious "webodm_node-odm-1" or "webodm-node-odm-1" hostname switcharoo on Mac
        # Technically we already check for the correct hostname during setup,
        # but sometimes that doesn't work?
        check_hostname = 'webodm_node-odm-1'
        if processing_node.hostname == check_hostname and not processing_node.is_online():
            try:
                socket.gethostbyname(processing_node.hostname)
            except OSError:
                # Hostname could not be resolved (socket.gaierror is a
                # subclass of OSError); try the dash-separated variant.
                # Narrowed from a bare `except:` which also swallowed
                # KeyboardInterrupt/SystemExit.
                processing_node.hostname = 'webodm-node-odm-1'
                processing_node.update_node_info()
                if processing_node.is_online():
                    logger.info("Found and fixed webodm_node-odm-1 hostname switcharoo")
                else:
                    # Rename didn't help; restore the original hostname
                    processing_node.hostname = check_hostname

                processing_node.save()
2023-10-04 17:34:31 +00:00
@app.task(ignore_result=True)
def cleanup_projects():
    """Purge projects flagged for deletion once all of their tasks are gone.

    Projects are soft-flagged with deleting=True; they may only be removed
    after every task belonging to them has been deleted.
    """
    empty_deleting_projects = Project.objects.filter(deleting=True) \
                                             .annotate(tasks_count=Count('task')) \
                                             .filter(tasks_count=0)
    deleted_total, deleted_by_model = empty_deleting_projects.delete()

    if deleted_total > 0 and 'app.Project' in deleted_by_model:
        logger.info("Deleted {} projects".format(deleted_by_model['app.Project']))
2023-11-06 16:21:10 +00:00
@app.task(ignore_result=True)
def cleanup_tasks():
    """Delete stale partial tasks older than the configured grace period.

    Disabled (returns immediately) when CLEANUP_PARTIAL_TASKS is None.
    """
    if settings.CLEANUP_PARTIAL_TASKS is None:
        return

    # CLEANUP_PARTIAL_TASKS is expressed in hours
    cutoff = timezone.now() - timedelta(hours=settings.CLEANUP_PARTIAL_TASKS)
    for stale_task in Task.objects.filter(partial=True, created_at__lte=cutoff):
        logger.info("Cleaning up partial task {}".format(stale_task))
        stale_task.delete()
2023-10-04 17:34:31 +00:00
@app.task(ignore_result=True)
def cleanup_tmp_directory():
    """Delete files and folders in the tmp directory that are older than 24 hours."""
    tmpdir = settings.MEDIA_TMP
    time_limit = 60 * 60 * 24  # seconds
    now = time.time()  # hoisted: loop-invariant, was recomputed every iteration

    for f in os.listdir(tmpdir):
        filepath = os.path.join(tmpdir, f)
        try:
            modified = os.stat(filepath).st_mtime
        except FileNotFoundError:
            # Entry vanished between listdir() and stat() (e.g. removed by a
            # concurrent worker); nothing left to clean up
            continue

        if modified < now - time_limit:
            if os.path.isfile(filepath):
                os.remove(filepath)
            else:
                shutil.rmtree(filepath, ignore_errors=True)

            logger.info('Cleaned up: %s (%s)' % (f, modified))
2019-06-14 15:15:24 +00:00
# Based on https://stackoverflow.com/questions/22498038/improve-current-implementation-of-a-setinterval-python/22498708#22498708
def setInterval(interval, func, *args):
    """Invoke func(*args) every `interval` seconds on a daemon thread.

    Returns a zero-argument callable that stops the timer when invoked.
    """
    stop_signal = Event()

    def ticker():
        # wait() doubles as the sleep: it returns False on timeout (keep
        # ticking) and True once the stop callable sets the event.
        while not stop_signal.wait(interval):
            func(*args)

    Thread(target=ticker, daemon=True).start()
    return stop_signal.set
2023-10-04 17:34:31 +00:00
@app.task(ignore_result=True)
def process_task(taskId):
    """Process a single task, guarded by a Redis lock so only one worker
    handles a given task at a time. The lock heartbeat is refreshed every
    5 seconds and considered stale after 30 seconds without updates.
    """
    lock_id = 'task_lock_{}'.format(taskId)
    cancel_monitor = None
    delete_lock = True

    try:
        # Atomically read the previous heartbeat while stamping our own
        previous_heartbeat = redis_client.getset(lock_id, time.time())

        if previous_heartbeat is not None:
            if time.time() - float(previous_heartbeat) <= 30:
                # Another worker holds a fresh lock; back off and make sure
                # we don't delete its lock on the way out
                delete_lock = False
                return

            # Stale heartbeat: the previous holder likely died mid-processing
            logger.warning("Task {} has an expired lock! This could mean that WebODM is running out of memory. Check your server configuration.".format(taskId))

        # Keep the lock alive while we work
        cancel_monitor = setInterval(5, lambda: redis_client.set(lock_id, time.time()))

        try:
            task = Task.objects.get(pk=taskId)
        except ObjectDoesNotExist:
            logger.info("Task {} has already been deleted.".format(taskId))
            return

        try:
            task.process()
        except Exception as e:
            logger.error(
                "Uncaught error! This is potentially bad. Please report it to http://github.com/OpenDroneMap/WebODM/issues: {} {}".format(
                    e, traceback.format_exc()))
            if settings.TESTING:
                raise e
    finally:
        if cancel_monitor is not None:
            cancel_monitor()

        if delete_lock:
            try:
                redis_client.delete(lock_id)
            except redis.exceptions.RedisError:
                # Ignore errors, the lock will expire at some point
                pass
2019-06-14 15:15:24 +00:00
2018-04-08 20:21:09 +00:00
def get_pending_tasks():
    """Return the queryset of non-partial tasks that need worker attention.

    A task qualifies when it needs a processing node auto-assigned, needs a
    status update (queued/running or no status yet on an assigned node), or
    has a pending action to carry out.
    """
    needs_node = Q(processing_node__isnull=True, auto_processing_node=True, partial=False)
    needs_status_update = Q(Q(status=None) | Q(status__in=[status_codes.QUEUED, status_codes.RUNNING]),
                            processing_node__isnull=False, partial=False)
    has_pending_action = Q(pending_action__isnull=False, partial=False)

    return Task.objects.filter(needs_node | needs_status_update | has_pending_action)
2023-10-04 17:34:31 +00:00
@app.task(ignore_result=True)
def process_pending_tasks():
    """Fan out one process_task job per task that needs attention."""
    for pending in get_pending_tasks():
        process_task.delay(pending.id)
2020-01-20 21:52:01 +00:00
@app.task(bind=True)
def export_raster(self, input, **opts):
    """Export a raster to a temp file in the requested format.

    :param input: path of the source raster
    :param opts: export options forwarded to export_raster_sync (e.g. 'format')
    :return: {'file': path} on success, {'error': message} on failure
    """
    try:
        logger.info("Exporting raster {} with options: {}".format(input, json.dumps(opts)))
        extension = extension_for_export_format(opts.get('format', 'gtiff'))
        # mkstemp instead of the deprecated, race-prone mktemp: the file is
        # created atomically. Close the fd right away since export_raster_sync
        # writes the file by path.
        handle, tmpfile = tempfile.mkstemp(suffix='_raster.{}'.format(extension), dir=settings.MEDIA_TMP)
        os.close(handle)
        export_raster_sync(input, tmpfile, **opts)
        result = {'file': tmpfile}

        if settings.TESTING:
            TestSafeAsyncResult.set(self.request.id, result)

        return result
    except Exception as e:
        logger.error(str(e))
        return {'error': str(e)}
@app.task(bind=True)
def export_pointcloud(self, input, **opts):
    """Export a point cloud to a temp file in the requested format.

    :param input: path of the source point cloud
    :param opts: export options forwarded to export_pointcloud_sync (e.g. 'format')
    :return: {'file': path} on success, {'error': message} on failure
    """
    try:
        logger.info("Exporting point cloud {} with options: {}".format(input, json.dumps(opts)))
        # mkstemp instead of the deprecated, race-prone mktemp: the file is
        # created atomically. Close the fd right away since
        # export_pointcloud_sync writes the file by path.
        handle, tmpfile = tempfile.mkstemp(suffix='_pointcloud.{}'.format(opts.get('format', 'laz')), dir=settings.MEDIA_TMP)
        os.close(handle)
        export_pointcloud_sync(input, tmpfile, **opts)
        result = {'file': tmpfile}

        if settings.TESTING:
            TestSafeAsyncResult.set(self.request.id, result)

        return result
    except Exception as e:
        logger.error(str(e))
        return {'error': str(e)}
2023-10-04 17:34:31 +00:00
@app.task(ignore_result=True)
def check_quotas():
    """Enforce per-user disk quotas.

    Users over quota get a grace-period deadline; once it expires, their
    newest tasks are deleted until the quota is satisfied. The deadline is
    cleared as soon as a user is back under quota.
    """
    profiles = Profile.objects.filter(quota__gt=-1)
    for p in profiles:
        if p.has_exceeded_quota():
            deadline = p.get_quota_deadline()
            if deadline is None:
                # First time over quota: start the grace period
                deadline = p.set_quota_deadline(settings.QUOTA_EXCEEDED_GRACE_PERIOD)
            now = time.time()
            if now > deadline:
                # deadline passed, delete tasks until quota is met
                logger.info("Quota deadline expired for %s, deleting tasks" % str(p.user.username))
                task_count = Task.objects.filter(project__owner=p.user).count()
                c = 0
                # Initialize before the loop so the except handler below can
                # never hit a NameError if the query itself raises
                last_task = None

                while p.has_exceeded_quota():
                    try:
                        last_task = Task.objects.filter(project__owner=p.user).order_by("-created_at").first()
                        if last_task is None:
                            break
                        logger.info("Deleting %s" % last_task)
                        last_task.delete()
                    except Exception as e:
                        # logger.warn is deprecated in favor of logger.warning
                        logger.warning("Cannot delete %s for %s: %s" % (str(last_task), str(p.user.username), str(e)))

                    c += 1
                    if c >= task_count:
                        # Safety valve: never iterate more times than there
                        # were tasks, even if deletions keep failing
                        break
        else:
            p.clear_quota_deadline()