Merge pull request #28 from Gustry/refactoring

Refactoring osmupdate and imposm
pull/31/head
Etienne Trimaille 2015-12-29 14:10:23 +01:00
commit 86340505c6
2 changed files with 331 additions and 264 deletions

View file

@@ -28,211 +28,256 @@ from subprocess import call
from time import sleep
from sys import stderr


class Importer(object):

    def __init__(self):
        # Default values which can be overwritten.
        self.default = {
            'TIME': 120,
            'USER': 'docker',
            'PASSWORD': 'docker',
            'DATABASE': 'gis',
            'HOST': 'db',
            'PORT': '5432',
            'SETTINGS': 'settings',
            'CACHE': 'cache',
            'IMPORT_DONE': 'import_done',
            'IMPORT_QUEUE': 'import_queue',
            'SRID': '4326',
            'OPTIMIZE': 'false',
            'DBSCHEMA_PRODUCTION': 'public',
            'DBSCHEMA_IMPORT': 'import',
            'DBSCHEMA_BACKUP': 'backup',
            'QGIS_STYLE': 'yes'
        }
        self.osm_file = None
        self.mapping_file = None
        self.post_import_file = None
        self.qgis_style = None
        self.cursor = None
        self.postgis_uri = None

    @staticmethod
    def info(message):
        print message

    @staticmethod
    def error(message):
        print >> stderr, message
        exit()

    def overwrite_environment(self):
        """Overwrite default values from the environment."""
        for key in environ.keys():
            if key in self.default.keys():
                self.default[key] = environ[key]
    def check_settings(self):
        """Perform various checking."""
        # Check valid SRID.
        if self.default['SRID'] not in ['4326', '3857']:
            msg = 'SRID not supported : %s' % self.default['SRID']
            self.error(msg)

        # Check valid QGIS_STYLE.
        if self.default['QGIS_STYLE'] not in ['yes', 'no']:
            msg = 'QGIS_STYLE not supported : %s' % self.default['QGIS_STYLE']
            self.error(msg)

        # Check folders.
        folders = ['IMPORT_QUEUE', 'IMPORT_DONE', 'SETTINGS', 'CACHE']
        for folder in folders:
            if not isabs(self.default[folder]):
                # Get the absolute path.
                self.default[folder] = abspath(self.default[folder])

            # Test the folder
            if not exists(self.default[folder]):
                msg = 'The folder %s does not exist.' % self.default[folder]
                self.error(msg)

        # Test files
        for f in listdir(self.default['SETTINGS']):

            if f.endswith('.pbf'):
                self.osm_file = join(self.default['SETTINGS'], f)

            if f.endswith('.json'):
                self.mapping_file = join(self.default['SETTINGS'], f)

            if f == 'post-pbf-import.sql':
                self.post_import_file = join(self.default['SETTINGS'], f)

            if f == 'qgis_style.sql':
                self.qgis_style = join(self.default['SETTINGS'], f)

        if not self.osm_file:
            msg = 'OSM file *.pbf is missing in %s' % self.default['SETTINGS']
            self.error(msg)

        if not self.mapping_file:
            msg = 'Mapping file *.json is missing in %s' % self.default['SETTINGS']
            self.error(msg)

        if not self.post_import_file:
            self.info('No *.sql detected in %s' % self.default['SETTINGS'])
        else:
            self.info('%s detected for post import.' % self.post_import_file)

        if not self.qgis_style and self.default['QGIS_STYLE'] == 'yes':
            msg = 'qgis_style.sql is missing in %s and QGIS_STYLE = yes.' % self.default['SETTINGS']
            self.error(msg)
        elif self.qgis_style and self.default['QGIS_STYLE']:
            self.info('%s detected for QGIS styling.' % self.qgis_style)
        else:
            self.info('Not using QGIS default styles.')

        # In docker-compose, we should wait until the DB is ready.
        self.info('The checkup is OK. The container will continue soon, after the database.')
        sleep(45)

    def create_timestamp(self):
        file_path = join(self.default['SETTINGS'], 'timestamp.txt')
        timestamp_file = open(file_path, 'w')
        timestamp_file.write('UNDEFINED\n')
        timestamp_file.close()

    def update_timestamp(self, database_timestamp):
        file_path = join(self.default['SETTINGS'], 'timestamp.txt')
        timestamp_file = open(file_path, 'w')
        timestamp_file.write('%s\n' % database_timestamp)
        timestamp_file.close()

    def check_postgis(self):
        try:
            connection = connect(
                "dbname='%s' user='%s' host='%s' password='%s'" % (
                    self.default['DATABASE'],
                    self.default['USER'],
                    self.default['HOST'],
                    self.default['PASSWORD']))
            self.cursor = connection.cursor()
        except OperationalError as e:
            print >> stderr, e
            exit()

        self.postgis_uri = 'postgis://%s:%s@%s/%s' % (
            self.default['USER'],
            self.default['PASSWORD'],
            self.default['HOST'],
            self.default['DATABASE'])
    def import_custom_sql(self):
        self.info('Running the post import SQL file.')
        command = ['psql']
        command += ['-h', self.default['HOST']]
        command += ['-U', self.default['USER']]
        command += ['-d', self.default['DATABASE']]
        command += ['-f', self.post_import_file]
        call(command)

    def import_qgis_styles(self):
        self.info('Installing QGIS styles.')
        command = ['psql']
        command += ['-h', self.default['HOST']]
        command += ['-U', self.default['USER']]
        command += ['-d', self.default['DATABASE']]
        command += ['-f', self.qgis_style]
        call(command)

    def count_table(self, name):
        """Check if there is a table starting with name."""
        sql = 'select count(*) ' \
              'from information_schema.tables ' \
              'where table_name like \'%s\';' % name
        # noinspection PyUnboundLocalVariable
        self.cursor.execute(sql)
        return self.cursor.fetchone()[0]
    def run(self):
        osm_tables = self.count_table('osm_%')
        if osm_tables < 1:
            # It means that the DB is empty. Let's import the PBF file.
            self._first_pbf_import()
        else:
            self.info('The database is not empty. Let\'s import only diff files.')

        self._import_diff()
    def _first_pbf_import(self):
        command = ['imposm3', 'import', '-diff', '-deployproduction']
        command += ['-overwritecache', '-cachedir', self.default['CACHE']]
        command += ['-srid', self.default['SRID']]
        command += ['-dbschema-production',
                    self.default['DBSCHEMA_PRODUCTION']]
        command += ['-dbschema-import', self.default['DBSCHEMA_IMPORT']]
        command += ['-dbschema-backup', self.default['DBSCHEMA_BACKUP']]
        command += ['-diffdir', self.default['SETTINGS']]
        command += ['-mapping', self.mapping_file]
        command += ['-read', self.osm_file]
        command += ['-write', '-connection', self.postgis_uri]

        self.info('The database is empty. Let\'s import the PBF : %s' % self.osm_file)
        self.info(' '.join(command))
        if not call(command) == 0:
            msg = 'An error occurred in imposm with the original file.'
            self.error(msg)
        else:
            self.info('Import PBF successful : %s' % self.osm_file)

        if self.post_import_file or self.qgis_style:
            # Set the password for psql
            environ['PGPASSWORD'] = self.default['PASSWORD']

        if self.post_import_file:
            self.import_custom_sql()

        if self.qgis_style:
            self.import_qgis_styles()
    def _import_diff(self):
        # Finally launch the listening process.
        while True:
            import_queue = sorted(listdir(self.default['IMPORT_QUEUE']))
            if len(import_queue) > 0:
                for diff in import_queue:
                    self.info('Importing diff %s' % diff)
                    command = ['imposm3', 'diff']
                    command += ['-cachedir', self.default['CACHE']]
                    command += ['-dbschema-production', self.default['DBSCHEMA_PRODUCTION']]
                    command += ['-dbschema-import', self.default['DBSCHEMA_IMPORT']]
                    command += ['-dbschema-backup', self.default['DBSCHEMA_BACKUP']]
                    command += ['-srid', self.default['SRID']]
                    command += ['-diffdir', self.default['SETTINGS']]
                    command += ['-mapping', self.mapping_file]
                    command += ['-connection', self.postgis_uri]
                    command += [join(self.default['IMPORT_QUEUE'], diff)]

                    self.info(' '.join(command))
                    if call(command) == 0:
                        move(
                            join(self.default['IMPORT_QUEUE'], diff),
                            join(self.default['IMPORT_DONE'], diff))

                        # Update the timestamp in the file.
                        database_timestamp = diff.split('.')[0].split('->-')[1]
                        self.update_timestamp(database_timestamp)
                        self.info('Import diff successful : %s' % diff)
                    else:
                        msg = 'An error occurred in imposm with a diff.'
                        self.error(msg)

            if len(listdir(self.default['IMPORT_QUEUE'])) == 0:
                self.info('Sleeping for %s seconds.' % self.default['TIME'])
                sleep(float(self.default['TIME']))
if __name__ == '__main__':
    importer = Importer()
    importer.overwrite_environment()
    importer.check_settings()
    importer.create_timestamp()
    importer.check_postgis()
    importer.run()
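A minimal sketch of how the defaults above are meant to be overridden: overwrite_environment() copies any environment variable whose name matches a key of Importer.default, so the same keys can be set from docker-compose or a shell. This snippet is not part of the commit, it assumes the Importer class above is on the path and the values shown are only examples.

from os import environ

environ['DATABASE'] = 'gis'   # the PostGIS database to write to
environ['SRID'] = '3857'      # must be '4326' or '3857', enforced by check_settings()
environ['QGIS_STYLE'] = 'no'  # skip loading qgis_style.sql

importer = Importer()
importer.overwrite_environment()  # copies the matching env vars into importer.default
assert importer.default['SRID'] == '3857'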

View file

@@ -27,113 +27,135 @@ from datetime import datetime
from time import sleep
from sys import stderr


class Downloader(object):

    def __init__(self):
        # Default values which can be overwritten.
        self.default = {
            'MAX_DAYS': '100',
            'DIFF': 'sporadic',
            'MAX_MERGE': '7',
            'COMPRESSION_LEVEL': '1',
            'BASE_URL': 'http://planet.openstreetmap.org/replication/',
            'IMPORT_QUEUE': 'import_queue',
            'IMPORT_DONE': 'import_done',
            'SETTINGS': 'settings',
            'TIME': 120,
        }
        self.osm_file = None

    @staticmethod
    def info(message):
        print message

    @staticmethod
    def error(message):
        print >> stderr, message
        exit()

    def overwrite_environment(self):
        """Overwrite default values from the environment."""
        for key in environ.keys():
            if key in self.default.keys():
                self.default[key] = environ[key]
    def check_settings(self):
        """Perform various checking."""
        # Folders
        folders = ['IMPORT_QUEUE', 'IMPORT_DONE', 'SETTINGS']
        for folder in folders:
            if not isabs(self.default[folder]):
                # Get the absolute path.
                self.default[folder] = abspath(self.default[folder])

            # Test the folder
            if not exists(self.default[folder]):
                msg = 'The folder %s does not exist.' % self.default[folder]
                self.error(msg)

        # Test files
        for f in listdir(self.default['SETTINGS']):

            if f.endswith('.pbf'):
                self.osm_file = join(self.default['SETTINGS'], f)

            """
            # Todo : need to fix custom URL and sporadic diff : daily, hourly and minutely
            if f == 'custom_url_diff.txt':
                with open(join(default['SETTINGS'], f), 'r') as content_file:
                    default['BASE_URL'] = content_file.read()
            """

        if not self.osm_file:
            msg = 'OSM file *.osm.pbf is missing in %s' % self.default['SETTINGS']
            self.error(msg)

        # In docker-compose, we should wait until the DB is ready.
        self.info('The checkup is OK. The container will continue soon, after the database.')
        sleep(45)
    def _check_latest_timestamp(self):
        """Fetch the latest timestamp."""
        # Check if diff to be imported is empty. If not, take the latest diff.
        diff_to_be_imported = sorted(listdir(self.default['IMPORT_QUEUE']))
        if len(diff_to_be_imported):
            file_name = diff_to_be_imported[-1].split('.')[0]
            timestamp = file_name.split('->-')[1]
            self.info('Timestamp from the latest not imported diff : %s' % timestamp)
        else:
            # Check if imported diff is empty. If not, take the latest diff.
            imported_diff = sorted(listdir(self.default['IMPORT_DONE']))
            if len(imported_diff):
                file_name = imported_diff[-1].split('.')[0]
                timestamp = file_name.split('->-')[1]
                self.info('Timestamp from the latest imported diff : %s' % timestamp)
            else:
                # Take the timestamp from the original file.
                command = ['osmconvert', self.osm_file, '--out-timestamp']
                processus = Popen(
                    command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
                timestamp, err = processus.communicate()

                # Remove new line
                timestamp = timestamp.strip()

                # Removing some \ in the timestamp.
                timestamp = timestamp.replace('\\', '')

                self.info('Timestamp from the original state file : %s' % timestamp)

        return timestamp
    def download(self):
        """Infinite loop to download diff files on a regular interval."""
        while True:
            timestamp = self._check_latest_timestamp()

            # Save time
            current_time = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
            self.info('Old time : %s' % timestamp)
            self.info('Current time : %s' % current_time)

            # Destination
            file_name = '%s->-%s.osc.gz' % (timestamp, current_time)
            file_path = join(self.default['IMPORT_QUEUE'], file_name)

            # Command
            command = ['osmupdate', '-v']
            command += ['--max-days=' + self.default['MAX_DAYS']]
            command += [self.default['DIFF']]
            command += ['--max-merge=' + self.default['MAX_MERGE']]
            command += ['--compression-level=' + self.default['COMPRESSION_LEVEL']]
            command += ['--base-url=' + self.default['BASE_URL']]
            command.append(timestamp)
            command.append(file_path)

            self.info(' '.join(command))
            if call(command) != 0:
                self.info('An error occurred in osmupdate. Let\'s try again.')
                # Sleep less.
                self.info('Sleeping for 2 seconds.')
                sleep(2.0)
            else:
                # Everything was fine, let's sleep.
                self.info('Sleeping for %s seconds.' % self.default['TIME'])
                sleep(float(self.default['TIME']))
if __name__ == '__main__':
    downloader = Downloader()
    downloader.overwrite_environment()
    downloader.check_settings()
    downloader.download()
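Both scripts are tied together by the diff file name convention: Downloader.download() writes '<from>->-<to>.osc.gz' into IMPORT_QUEUE, and Importer._import_diff() as well as Downloader._check_latest_timestamp() read the '<to>' timestamp back out of that name. A minimal sketch of the parsing, with example timestamps that are not taken from the commit:

file_name = '2015-12-28T10:00:00Z->-2015-12-29T14:00:00Z.osc.gz'  # example name

# Everything before the first dot, then the part after the '->-' separator,
# i.e. the newest timestamp covered by the diff.
database_timestamp = file_name.split('.')[0].split('->-')[1]
print database_timestamp  # 2015-12-29T14:00:00Z, written to settings/timestamp.txt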