kopia lustrzana https://github.com/kartoza/docker-osm
refactoring imposm
rodzic
c3692553c2
commit
eb67fae10f
|
@ -28,211 +28,256 @@ from subprocess import call
|
|||
from time import sleep
|
||||
from sys import stderr
|
||||
|
||||
# All these default values can be overwritten by env vars
|
||||
default = {
|
||||
'TIME': 120,
|
||||
'USER': 'docker',
|
||||
'PASSWORD': 'docker',
|
||||
'DATABASE': 'gis',
|
||||
'HOST': 'db',
|
||||
'PORT': '5432',
|
||||
'SETTINGS': 'settings',
|
||||
'CACHE': 'cache',
|
||||
'IMPORT_DONE': 'import_done',
|
||||
'IMPORT_QUEUE': 'import_queue',
|
||||
'SRID': '4326',
|
||||
'OPTIMIZE': 'false',
|
||||
'DBSCHEMA_PRODUCTION': 'public',
|
||||
'DBSCHEMA_IMPORT': 'import',
|
||||
'DBSCHEMA_BACKUP': 'backup',
|
||||
'QGIS_STYLE': 'yes'
|
||||
}
|
||||
|
||||
# Check if we overwrite default values.
|
||||
for key in environ.keys():
|
||||
if key in default.keys():
|
||||
default[key] = environ[key]
|
||||
class Importer(object):
|
||||
|
||||
# Check valid SRID.
|
||||
if default['SRID'] not in ['4326', '3857']:
|
||||
print >> stderr, 'SRID not supported : %s' % default['SRID']
|
||||
exit()
|
||||
def __init__(self):
|
||||
# Default values which can be overwritten.
|
||||
self.default = {
|
||||
'TIME': 120,
|
||||
'USER': 'docker',
|
||||
'PASSWORD': 'docker',
|
||||
'DATABASE': 'gis',
|
||||
'HOST': 'db',
|
||||
'PORT': '5432',
|
||||
'SETTINGS': 'settings',
|
||||
'CACHE': 'cache',
|
||||
'IMPORT_DONE': 'import_done',
|
||||
'IMPORT_QUEUE': 'import_queue',
|
||||
'SRID': '4326',
|
||||
'OPTIMIZE': 'false',
|
||||
'DBSCHEMA_PRODUCTION': 'public',
|
||||
'DBSCHEMA_IMPORT': 'import',
|
||||
'DBSCHEMA_BACKUP': 'backup',
|
||||
'QGIS_STYLE': 'yes'
|
||||
}
|
||||
self.osm_file = None
|
||||
self.mapping_file = None
|
||||
self.post_import_file = None
|
||||
self.qgis_style = None
|
||||
|
||||
# Check valid QGIS_STYLE.
|
||||
if default['QGIS_STYLE'] not in ['yes', 'no']:
|
||||
print >> stderr, 'QGIS_STYLE not supported : %s' % default['QGIS_STYLE']
|
||||
exit()
|
||||
self.cursor = None
|
||||
self.postgis_uri = None
|
||||
|
||||
# Check folders.
|
||||
folders = ['IMPORT_QUEUE', 'IMPORT_DONE', 'SETTINGS', 'CACHE']
|
||||
for folder in folders:
|
||||
if not isabs(default[folder]):
|
||||
# Get the absolute path.
|
||||
default[folder] = abspath(default[folder])
|
||||
@staticmethod
|
||||
def info(message):
|
||||
print message
|
||||
|
||||
# Test the folder
|
||||
if not exists(default[folder]):
|
||||
print >> stderr, 'The folder %s does not exist.' % default[folder]
|
||||
@staticmethod
|
||||
def error(message):
|
||||
print >> stderr, message
|
||||
exit()
|
||||
|
||||
# Test files
|
||||
osm_file = None
|
||||
mapping_file = None
|
||||
post_import_file = None
|
||||
qgis_style = None
|
||||
for f in listdir(default['SETTINGS']):
|
||||
def overwrite_environment(self):
|
||||
"""Overwrite default values from the environment."""
|
||||
for key in environ.keys():
|
||||
if key in self.default.keys():
|
||||
self.default[key] = environ[key]
|
||||
|
||||
if f.endswith('.pbf'):
|
||||
osm_file = join(default['SETTINGS'], f)
|
||||
def check_settings(self):
|
||||
"""Perform various checking."""
|
||||
|
||||
if f.endswith('.json'):
|
||||
mapping_file = join(default['SETTINGS'], f)
|
||||
# Check valid SRID.
|
||||
if self.default['SRID'] not in ['4326', '3857']:
|
||||
msg = 'SRID not supported : %s' % self.default['SRID']
|
||||
self.error(msg)
|
||||
|
||||
if f == 'post-pbf-import.sql':
|
||||
post_import_file = join(default['SETTINGS'], f)
|
||||
# Check valid QGIS_STYLE.
|
||||
if self.default['QGIS_STYLE'] not in ['yes', 'no']:
|
||||
msg = 'QGIS_STYLE not supported : %s' % self.default['QGIS_STYLE']
|
||||
self.error(msg)
|
||||
|
||||
if f == 'qgis_style.sql':
|
||||
qgis_style = join(default['SETTINGS'], f)
|
||||
# Check folders.
|
||||
folders = ['IMPORT_QUEUE', 'IMPORT_DONE', 'SETTINGS', 'CACHE']
|
||||
for folder in folders:
|
||||
if not isabs(self.default[folder]):
|
||||
# Get the absolute path.
|
||||
self.default[folder] = abspath(self.default[folder])
|
||||
|
||||
if not osm_file:
|
||||
print >> stderr, 'OSM file *.pbf is missing in %s' % default['SETTINGS']
|
||||
exit()
|
||||
# Test the folder
|
||||
if not exists(self.default[folder]):
|
||||
msg = 'The folder %s does not exist.' % self.default[folder]
|
||||
self.error(msg)
|
||||
|
||||
if not mapping_file:
|
||||
print >> stderr, 'Mapping file *.json is missing in %s' % default['SETTINGS']
|
||||
exit()
|
||||
# Test files
|
||||
for f in listdir(self.default['SETTINGS']):
|
||||
|
||||
if not post_import_file:
|
||||
print 'No *.sql detected in %s' % default['SETTINGS']
|
||||
else:
|
||||
print '%s detected for post import.' % post_import_file
|
||||
if f.endswith('.pbf'):
|
||||
self.osm_file = join(self.default['SETTINGS'], f)
|
||||
|
||||
if not qgis_style and default['QGIS_STYLE'] == 'yes':
|
||||
print >> stderr, 'qgis_style.sql is missing in %s and QGIS_STYLE = yes.' % default['SETTINGS']
|
||||
exit()
|
||||
elif qgis_style and default['QGIS_STYLE']:
|
||||
print '%s detected for QGIS styling.' % qgis_style
|
||||
else:
|
||||
print 'Not using QGIS default styles.'
|
||||
if f.endswith('.json'):
|
||||
self.mapping_file = join(self.default['SETTINGS'], f)
|
||||
|
||||
# Create the timestamp file
|
||||
file_path = join(default['SETTINGS'], 'timestamp.txt')
|
||||
timestamp_file = open(file_path, 'w')
|
||||
timestamp_file.write('UNDEFINED\n')
|
||||
timestamp_file.close()
|
||||
if f == 'post-pbf-import.sql':
|
||||
self.post_import_file = join(self.default['SETTINGS'], f)
|
||||
|
||||
# In docker-compose, we should wait for the DB is ready.
|
||||
print 'The checkup is OK. The container will continue soon, after the database.'
|
||||
sleep(45)
|
||||
if f == 'qgis_style.sql':
|
||||
self.qgis_style = join(self.default['SETTINGS'], f)
|
||||
|
||||
# Check postgis.
|
||||
try:
|
||||
connection = connect(
|
||||
"dbname='%s' user='%s' host='%s' password='%s'" % (
|
||||
default['DATABASE'],
|
||||
default['USER'],
|
||||
default['HOST'],
|
||||
default['PASSWORD']))
|
||||
cursor = connection.cursor()
|
||||
except OperationalError as e:
|
||||
print >> stderr, e
|
||||
exit()
|
||||
if not self.osm_file:
|
||||
msg = 'OSM file *.pbf is missing in %s' % self.default['SETTINGS']
|
||||
self.error(msg)
|
||||
|
||||
postgis_uri = 'postgis://%s:%s@%s/%s' % (
|
||||
default['USER'],
|
||||
default['PASSWORD'],
|
||||
default['HOST'],
|
||||
default['DATABASE'])
|
||||
if not self.mapping_file:
|
||||
msg = 'Mapping file *.json is missing in %s' % self.default['SETTINGS']
|
||||
self.error(msg)
|
||||
|
||||
if not self.post_import_file:
|
||||
self.info('No *.sql detected in %s' % self.default['SETTINGS'])
|
||||
else:
|
||||
self.info('%s detected for post import.' % self.post_import_file)
|
||||
|
||||
if not self.qgis_style and self.default['QGIS_STYLE'] == 'yes':
|
||||
msg = 'qgis_style.sql is missing in %s and QGIS_STYLE = yes.' % self.default['SETTINGS']
|
||||
self.error(msg)
|
||||
elif self.qgis_style and self.default['QGIS_STYLE']:
|
||||
self.info('%s detected for QGIS styling.' % self.qgis_style)
|
||||
else:
|
||||
self.info('Not using QGIS default styles.')
|
||||
|
||||
# Check if there is a table starting with 'osm_'
|
||||
sql = 'select count(*) ' \
|
||||
'from information_schema.tables ' \
|
||||
'where table_name like \'osm_%\';'
|
||||
# noinspection PyUnboundLocalVariable
|
||||
cursor.execute(sql)
|
||||
osm_tables = cursor.fetchone()[0]
|
||||
if osm_tables < 1:
|
||||
# It means that the DB is empty. Let's import the PBF file.
|
||||
command = ['imposm3', 'import', '-diff', '-deployproduction']
|
||||
command += ['-overwritecache', '-cachedir', default['CACHE']]
|
||||
command += ['-srid', default['SRID']]
|
||||
command += ['-dbschema-production', default['DBSCHEMA_PRODUCTION']]
|
||||
command += ['-dbschema-import', default['DBSCHEMA_IMPORT']]
|
||||
command += ['-dbschema-backup', default['DBSCHEMA_BACKUP']]
|
||||
command += ['-diffdir', default['SETTINGS']]
|
||||
command += ['-mapping', mapping_file]
|
||||
command += ['-read', osm_file]
|
||||
command += ['-write', '-connection', postgis_uri]
|
||||
# In docker-compose, we should wait for the DB is ready.
|
||||
self.info('The checkup is OK. The container will continue soon, after the database.')
|
||||
sleep(45)
|
||||
|
||||
print 'The database is empty. Let\'s import the PBF : %s' % osm_file
|
||||
print ' '.join(command)
|
||||
if not call(command) == 0:
|
||||
print >> stderr, 'An error occured in imposm with the original file.'
|
||||
exit()
|
||||
else:
|
||||
print 'Import PBF successful : %s' % osm_file
|
||||
def create_timestamp(self):
|
||||
file_path = join(self.default['SETTINGS'], 'timestamp.txt')
|
||||
timestamp_file = open(file_path, 'w')
|
||||
timestamp_file.write('UNDEFINED\n')
|
||||
timestamp_file.close()
|
||||
|
||||
if post_import_file or qgis_style:
|
||||
# Set the password for psql
|
||||
environ['PGPASSWORD'] = default['PASSWORD']
|
||||
def update_timestamp(self, database_timestamp):
|
||||
file_path = join(self.default['SETTINGS'], 'timestamp.txt')
|
||||
timestamp_file = open(file_path, 'w')
|
||||
timestamp_file.write('%s\n' % database_timestamp)
|
||||
timestamp_file.close()
|
||||
|
||||
if post_import_file:
|
||||
print 'Running the post import SQL file.'
|
||||
def check_postgis(self):
|
||||
try:
|
||||
connection = connect(
|
||||
"dbname='%s' user='%s' host='%s' password='%s'" % (
|
||||
self.default['DATABASE'],
|
||||
self.default['USER'],
|
||||
self.default['HOST'],
|
||||
self.default['PASSWORD']))
|
||||
self.cursor = connection.cursor()
|
||||
except OperationalError as e:
|
||||
print >> stderr, e
|
||||
exit()
|
||||
|
||||
self.postgis_uri = 'postgis://%s:%s@%s/%s' % (
|
||||
self.default['USER'],
|
||||
self.default['PASSWORD'],
|
||||
self.default['HOST'],
|
||||
self.default['DATABASE'])
|
||||
|
||||
def import_custom_sql(self):
|
||||
self.info('Running the post import SQL file.')
|
||||
command = ['psql']
|
||||
command += ['-h', default['HOST']]
|
||||
command += ['-U', default['USER']]
|
||||
command += ['-d', default['DATABASE']]
|
||||
command += ['-f', post_import_file]
|
||||
command += ['-h', self.default['HOST']]
|
||||
command += ['-U', self.default['USER']]
|
||||
command += ['-d', self.default['DATABASE']]
|
||||
command += ['-f', self.post_import_file]
|
||||
call(command)
|
||||
|
||||
if qgis_style:
|
||||
'Installing QGIS styles.'
|
||||
def import_qgis_styles(self):
|
||||
self.info('Installing QGIS styles.')
|
||||
command = ['psql']
|
||||
command += ['-h', default['HOST']]
|
||||
command += ['-U', default['USER']]
|
||||
command += ['-d', default['DATABASE']]
|
||||
command += ['-f', qgis_style]
|
||||
command += ['-h', self.default['HOST']]
|
||||
command += ['-U', self.default['USER']]
|
||||
command += ['-d', self.default['DATABASE']]
|
||||
command += ['-f', self.qgis_style]
|
||||
call(command)
|
||||
else:
|
||||
print 'The database is not empty. Let\'s import only diff files.'
|
||||
|
||||
# Finally launch the listening process.
|
||||
while True:
|
||||
import_queue = sorted(listdir(default['IMPORT_QUEUE']))
|
||||
if len(import_queue) > 0:
|
||||
for diff in import_queue:
|
||||
print 'Importing diff %s' % diff
|
||||
command = ['imposm3', 'diff']
|
||||
command += ['-cachedir', default['CACHE']]
|
||||
command += ['-dbschema-production', default['DBSCHEMA_PRODUCTION']]
|
||||
command += ['-dbschema-import', default['DBSCHEMA_IMPORT']]
|
||||
command += ['-dbschema-backup', default['DBSCHEMA_BACKUP']]
|
||||
command += ['-srid', default['SRID']]
|
||||
command += ['-diffdir', default['SETTINGS']]
|
||||
command += ['-mapping', mapping_file]
|
||||
command += ['-connection', postgis_uri]
|
||||
command += [join(default['IMPORT_QUEUE'], diff)]
|
||||
def count_table(self, name):
|
||||
"""Check if there is a table starting with name."""
|
||||
sql = 'select count(*) ' \
|
||||
'from information_schema.tables ' \
|
||||
'where table_name like \'%s\';' % name
|
||||
# noinspection PyUnboundLocalVariable
|
||||
self.cursor.execute(sql)
|
||||
return self.cursor.fetchone()[0]
|
||||
|
||||
print ' '.join(command)
|
||||
if call(command) == 0:
|
||||
move(
|
||||
join(default['IMPORT_QUEUE'], diff),
|
||||
join(default['IMPORT_DONE'], diff))
|
||||
def run(self):
|
||||
osm_tables = self.count_table('osm_%')
|
||||
if osm_tables < 1:
|
||||
# It means that the DB is empty. Let's import the PBF file.
|
||||
self._first_pbf_import()
|
||||
else:
|
||||
self.info('The database is not empty. Let\'s import only diff files.')
|
||||
|
||||
# Update the timestamp in the file.
|
||||
database_timestamp = diff.split('.')[0].split('->-')[1]
|
||||
file_path = join(default['SETTINGS'], 'timestamp.txt')
|
||||
timestamp_file = open(file_path, 'w')
|
||||
timestamp_file.write('%s\n' % database_timestamp)
|
||||
timestamp_file.close()
|
||||
self._import_diff()
|
||||
|
||||
print 'Import diff successful : %s' % diff
|
||||
else:
|
||||
print >> stderr, 'An error occured in imposm with a diff.'
|
||||
exit()
|
||||
def _first_pbf_import(self):
|
||||
command = ['imposm3', 'import', '-diff', '-deployproduction']
|
||||
command += ['-overwritecache', '-cachedir', self.default['CACHE']]
|
||||
command += ['-srid', self.default['SRID']]
|
||||
command += ['-dbschema-production',
|
||||
self.default['DBSCHEMA_PRODUCTION']]
|
||||
command += ['-dbschema-import', self.default['DBSCHEMA_IMPORT']]
|
||||
command += ['-dbschema-backup', self.default['DBSCHEMA_BACKUP']]
|
||||
command += ['-diffdir', self.default['SETTINGS']]
|
||||
command += ['-mapping', self.mapping_file]
|
||||
command += ['-read', self.osm_file]
|
||||
command += ['-write', '-connection', self.postgis_uri]
|
||||
self.info('The database is empty. Let\'s import the PBF : %s' % self.osm_file)
|
||||
self.info(' '.join(command))
|
||||
if not call(command) == 0:
|
||||
msg = 'An error occured in imposm with the original file.'
|
||||
self.error(msg)
|
||||
else:
|
||||
self.info('Import PBF successful : %s' % self.osm_file)
|
||||
|
||||
if len(listdir(default['IMPORT_QUEUE'])) == 0:
|
||||
print 'Sleeping for %s seconds.' % default['TIME']
|
||||
sleep(float(default['TIME']))
|
||||
if self.post_import_file or self.qgis_style:
|
||||
# Set the password for psql
|
||||
environ['PGPASSWORD'] = self.default['PASSWORD']
|
||||
|
||||
if self.post_import_file:
|
||||
self.import_custom_sql()
|
||||
|
||||
if self.qgis_style:
|
||||
self.import_qgis_styles()
|
||||
|
||||
def _import_diff(self):
|
||||
# Finally launch the listening process.
|
||||
while True:
|
||||
import_queue = sorted(listdir(self.default['IMPORT_QUEUE']))
|
||||
if len(import_queue) > 0:
|
||||
for diff in import_queue:
|
||||
self.info('Importing diff %s' % diff)
|
||||
command = ['imposm3', 'diff']
|
||||
command += ['-cachedir', self.default['CACHE']]
|
||||
command += ['-dbschema-production', self.default['DBSCHEMA_PRODUCTION']]
|
||||
command += ['-dbschema-import', self.default['DBSCHEMA_IMPORT']]
|
||||
command += ['-dbschema-backup', self.default['DBSCHEMA_BACKUP']]
|
||||
command += ['-srid', self.default['SRID']]
|
||||
command += ['-diffdir', self.default['SETTINGS']]
|
||||
command += ['-mapping', self.mapping_file]
|
||||
command += ['-connection', self.postgis_uri]
|
||||
command += [join(self.default['IMPORT_QUEUE'], diff)]
|
||||
|
||||
self.info(' '.join(command))
|
||||
if call(command) == 0:
|
||||
move(
|
||||
join(self.default['IMPORT_QUEUE'], diff),
|
||||
join(self.default['IMPORT_DONE'], diff))
|
||||
|
||||
# Update the timestamp in the file.
|
||||
database_timestamp = diff.split('.')[0].split('->-')[1]
|
||||
self.update_timestamp(database_timestamp)
|
||||
|
||||
self.info('Import diff successful : %s' % diff)
|
||||
else:
|
||||
msg = 'An error occured in imposm with a diff.'
|
||||
self.error(msg)
|
||||
|
||||
if len(listdir(self.default['IMPORT_QUEUE'])) == 0:
|
||||
self.info('Sleeping for %s seconds.' % self.default['TIME'])
|
||||
sleep(float(self.default['TIME']))
|
||||
|
||||
if __name__ == '__main__':
|
||||
importer = Importer()
|
||||
importer.overwrite_environment()
|
||||
importer.check_settings()
|
||||
importer.create_timestamp()
|
||||
importer.check_postgis()
|
||||
importer.run()
|
||||
|
|
Ładowanie…
Reference in New Issue