OpenDroneMap-ODM/opendm/grass_engine.py

140 wiersze
4.7 KiB
Python
Czysty Zwykły widok Historia

2019-04-24 22:33:12 +00:00
import shutil
import tempfile
import subprocess
import os
import sys
import time
2019-04-24 22:33:12 +00:00
from opendm import log
from opendm import system
from string import Template
class GrassEngine:
    """
    Finds a GRASS 7 launcher at construction time and creates
    GrassContext objects bound to it.
    """

    def __init__(self):
        # Probe the known launcher names in order and keep the first truthy
        # result; if none is found, the last lookup result is kept as-is.
        located = None
        for launcher in ('grass7', 'grass72', 'grass74', 'grass76', 'grass78'):
            located = system.which(launcher)
            if located:
                break
        self.grass_binary = located

        if self.grass_binary is not None:
            log.ODM_INFO("Initializing GRASS engine using {}".format(self.grass_binary))
        else:
            log.ODM_WARNING("Could not find a GRASS 7 executable. GRASS scripts will not work.")

    def create_context(self, serialized_context = {}):
        """
        :param serialized_context: keyword arguments forwarded to the GrassContext
            constructor (the same keys GrassContext.serialize() returns)
        :return: a GrassContext using this engine's GRASS executable
        :raises GrassEngineException: if no GRASS executable was found
        """
        if self.grass_binary is not None:
            return GrassContext(self.grass_binary, **serialized_context)
        raise GrassEngineException("GRASS engine is unavailable")
class GrassContext:
    """
    Working directory, template parameters and location used to run a
    single GRASS script through the GRASS executable.
    """

    def __init__(self, grass_binary, tmpdir=None, template_args=None, location=None, auto_cleanup=True):
        """
        :param grass_binary: path to the GRASS executable
        :param tmpdir: working directory; a new temporary directory is created when None
        :param template_args: dict of values substituted into the script template
        :param location: either a "epsg:XXXXX" string or a path to a geospatial file
        :param auto_cleanup: remove the working directory when this object is destroyed
        """
        self.grass_binary = grass_binary
        if tmpdir is None:
            tmpdir = tempfile.mkdtemp('_grass_engine')
        self.tmpdir = tmpdir
        # Never use a shared `{}` default here: add_file()/add_param() mutate
        # this dict, so a shared default would leak parameters between contexts.
        self.template_args = {} if template_args is None else template_args
        self.location = location
        self.auto_cleanup = auto_cleanup

    def get_cwd(self):
        """
        :return: the working directory of this context
        """
        return self.tmpdir

    def add_file(self, filename, source, use_as_location=False):
        """
        Write a file into the working directory and expose its path as a
        template parameter named after the file (without extension).

        :param filename: name of the file to create inside the working directory
        :param source: text content to write
        :param use_as_location: also use the written file as the GRASS location
        :return: absolute path of the written file
        """
        param = os.path.splitext(filename)[0]  # filename without extension

        dst_path = os.path.abspath(os.path.join(self.get_cwd(), filename))
        with open(dst_path, 'w') as f:
            f.write(source)
        self.template_args[param] = dst_path

        if use_as_location:
            self.set_location(self.template_args[param])

        return dst_path

    def add_param(self, param, value):
        """
        :param param: template parameter name
        :param value: value substituted for the parameter
        """
        self.template_args[param] = value

    def set_location(self, location):
        """
        :param location: either a "epsg:XXXXX" string or a path to a geospatial file defining the location
        """
        if not location.lower().startswith('epsg:'):
            location = os.path.abspath(location)
        self.location = location

    @staticmethod
    def _drain(reader):
        """Read whatever has been appended to the log since the last call and return it as text."""
        # errors='replace' because a read may end in the middle of a multi-byte character
        return reader.read().decode('utf-8', 'replace')

    def execute(self, script):
        """
        :param script: path to .grass script
        :return: script output
        :raises GrassEngineException: if the location is not set, the script file
            does not exist, or GRASS exits with a non-zero return code
        """
        if self.location is None:
            raise GrassEngineException("Location is not set")

        script = os.path.abspath(script)

        # Create grass script via template substitution
        try:
            with open(script) as f:
                script_content = f.read()
        except FileNotFoundError:
            raise GrassEngineException("Script does not exist: {}".format(script))

        tmpl = Template(script_content)

        # Write script to disk
        cwd = self.get_cwd()
        os.makedirs(cwd, exist_ok=True)

        with open(os.path.join(cwd, 'script.sh'), 'w') as f:
            f.write(tmpl.substitute(self.template_args))

        # Execute it
        log.ODM_INFO("Executing grass script from {}: {} --tmp-location {} --exec bash script.sh".format(cwd, self.grass_binary, self.location))

        env = os.environ.copy()
        addons_path = os.path.abspath(os.path.join("scripts/grass_addons"))
        inherited = env.get("GRASS_ADDON_PATH", "")
        # Entries must be separated by os.pathsep, otherwise an inherited
        # value and our addons directory get glued into one bogus path.
        env["GRASS_ADDON_PATH"] = inherited + os.pathsep + addons_path if inherited else addons_path

        filename = os.path.join(cwd, 'output.log')
        stderr_chunks = []

        # stdout is captured in a temporary file rather than a pipe: nothing
        # reads the pipe while we poll, so a script printing more than the
        # pipe buffer would block forever.
        with open(filename, 'wb') as writer, \
                open(filename, 'rb') as reader, \
                tempfile.TemporaryFile() as captured_stdout:
            p = subprocess.Popen([self.grass_binary, '--tmp-location', self.location, '--exec', 'bash', 'script.sh'],
                                 cwd=cwd, stdout=captured_stdout, stderr=writer, env=env)

            # Echo stderr (written to output.log by the child) while the process runs
            while p.poll() is None:
                chunk = self._drain(reader)
                stderr_chunks.append(chunk)
                sys.stdout.write(chunk)
                time.sleep(0.5)

            # Read the remaining
            chunk = self._drain(reader)
            stderr_chunks.append(chunk)
            sys.stdout.write(chunk)

            # The child shares our file offset, so rewind before reading
            captured_stdout.seek(0)
            out = captured_stdout.read().decode('utf-8').strip()

        if p.returncode == 0:
            return out

        err = "".join(stderr_chunks).strip()
        raise GrassEngineException("Could not execute GRASS script {} from {}: {}".format(script, cwd, err))

    def serialize(self):
        """
        :return: dict of constructor keyword arguments (everything except the
            GRASS executable) describing this context
        """
        return {
            'tmpdir': self.tmpdir,
            'template_args': self.template_args,
            'location': self.location,
            'auto_cleanup': self.auto_cleanup
        }

    def cleanup(self):
        """Remove the working directory and everything in it, if it exists."""
        if os.path.exists(self.get_cwd()):
            shutil.rmtree(self.get_cwd())

    def __del__(self):
        # getattr: __init__ may have failed before auto_cleanup was assigned
        if getattr(self, 'auto_cleanup', False):
            self.cleanup()
class GrassEngineException(Exception):
    """Error raised by the GRASS engine and its contexts."""
def cleanup_grass_context(serialized_context):
    """
    Rebuild a context from its serialized form and remove its working directory.

    :param serialized_context: dict of GrassContext constructor keyword arguments
    """
    grass.create_context(serialized_context).cleanup()
# Module-level engine instance, created when this module is imported;
# cleanup_grass_context() creates its contexts through it.
grass = GrassEngine()