OpenDroneMap-ODM/opendm/concurrency.py

100 wiersze
2.9 KiB
Python

from vmem import virtual_memory
import os
import sys
try:
import Queue as queue
except:
import queue
import threading
import time
from opendm import log
def get_max_memory(minimum = 5, use_at_most = 0.5):
"""
:param minimum minimum value to return (return value will never be lower than this)
:param use_at_most use at most this fraction of the available memory. 0.5 = use at most 50% of available memory
:return percentage value of memory to use (75 = 75%).
"""
return max(minimum, (100 - virtual_memory().percent) * use_at_most)
def get_max_memory_mb(minimum = 100, use_at_most = 0.5):
"""
:param minimum minimum value to return (return value will never be lower than this)
:param use_at_most use at most this fraction of the available memory. 0.5 = use at most 50% of available memory
:return value of memory to use in megabytes.
"""
return max(minimum, (virtual_memory().available / 1024 / 1024) * use_at_most)
def parallel_map(func, items, max_workers=1, single_thread_fallback=True):
"""
Our own implementation for parallel processing
which handles gracefully CTRL+C and reverts to
single thread processing in case of errors
:param items list of objects
:param func function to execute on each object
"""
global error
error = None
def process_one(q):
func(q)
def worker():
global error
while True:
(num, q) = pq.get()
if q is None or error is not None:
pq.task_done()
break
try:
process_one(q)
except Exception as e:
error = e
finally:
pq.task_done()
if max_workers > 1:
use_single_thread = False
pq = queue.PriorityQueue()
threads = []
for i in range(max_workers):
t = threading.Thread(target=worker)
t.start()
threads.append(t)
i = 1
for t in items:
pq.put((i, t))
i += 1
def stop_workers():
for i in range(len(threads)):
pq.put((-1, None))
for t in threads:
t.join()
# block until all tasks are done
try:
while pq.unfinished_tasks > 0:
time.sleep(0.5)
except KeyboardInterrupt:
print("CTRL+C terminating...")
stop_workers()
sys.exit(1)
stop_workers()
if error is not None and single_thread_fallback:
# Try to reprocess using a single thread
# in case this was a memory error
log.ODM_WARNING("Failed to run process in parallel, retrying with a single thread...")
use_single_thread = True
else:
use_single_thread = True
if use_single_thread:
# Boring, single thread processing
for q in items:
process_one(q)